ndata/
databytes.rs

1extern crate alloc;
2use core::cmp;
3use core::mem; 
4use crate::heap::*;
5use crate::sharedmutex::*;
6use alloc::collections::VecDeque; 
7
8#[cfg(feature="no_std_support")]
9use alloc::string::{String, ToString};
10#[cfg(feature="no_std_support")]
11use alloc::vec::Vec;
12#[cfg(not(feature="no_std_support"))]
13use std::println;
14
15
// --- NDataError Definition ---

/// Errors reported by the non-panicking `try_*` accessors of `DataBytes`.
///
/// The type is a field-less enum, so it is cheap to copy and can be compared
/// directly (`err == NDataError::InvalidBytesRef`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NDataError {
    /// The handle's `data_ref` is not present in the byte heap.
    InvalidBytesRef,
    /// The stream's read side is closed.
    StreamNotReadable,
    /// The stream's write side is closed.
    StreamNotWritable,
}

impl core::fmt::Display for NDataError {
    /// Writes the fixed, human-readable description of the error.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let message = match self {
            NDataError::InvalidBytesRef => {
                "DataBytes reference is invalid or points to deallocated memory"
            }
            NDataError::StreamNotReadable => "Stream is not open for reading",
            NDataError::StreamNotWritable => "Stream is not open for writing",
        };
        f.write_str(message)
    }
}

// `std::error::Error` only exists when the standard library is available.
#[cfg(not(feature = "no_std_support"))]
impl std::error::Error for NDataError {}
36
37
38/// Storage for runtime byte buffer values
39static mut BH:SharedMutex<Heap<DataStream>> = SharedMutex::new();
40
41/// Storage for runtime reference count reductions
42static mut BD:SharedMutex<Vec<usize>> = SharedMutex::new();
43
/// A byte stream with independent read/write "open" flags.
///
/// Bytes live in a `VecDeque` so that consuming from the front does not
/// require shifting the remaining contents.
///
/// Note: the derived `Default` yields a stream with both flags `false`,
/// whereas [`DataStream::new`] opens both sides.
#[derive(Debug, Default)]
pub struct DataStream {
    /// Bytes currently buffered in the stream.
    data: VecDeque<u8>,
    /// Declared total length of the stream; zero means "unset".
    len: usize,
    /// Whether the stream may still be read from.
    read_open: bool,
    /// Whether the stream may still be written to.
    write_open: bool,
    /// Optional MIME type describing the payload.
    mime_type: Option<String>,
}

impl DataStream {
    /// Creates an empty stream that is open for both reading and writing.
    pub fn new() -> Self {
        Self {
            data: VecDeque::new(),
            len: 0,
            read_open: true,
            write_open: true,
            mime_type: None,
        }
    }

    /// Creates a read-only stream pre-filled with `buf`.
    ///
    /// The declared length is set to `buf.len()` and the write side is closed.
    pub fn from_bytes(buf: Vec<u8>) -> DataStream {
        let len = buf.len();
        let data = VecDeque::from(buf);
        Self {
            data,
            len,
            read_open: true,
            write_open: false,
            mime_type: None,
        }
    }

    /// Returns an independent copy of this stream: buffered bytes, declared
    /// length, both open flags and the MIME type are all duplicated.
    pub fn deep_copy(&self) -> DataStream {
        let data = self.data.clone();
        let mime_type = self.mime_type.clone();
        Self {
            data,
            len: self.len,
            read_open: self.read_open,
            write_open: self.write_open,
            mime_type,
        }
    }
}
94
95pub fn bheap() -> &'static mut SharedMutex<Heap<DataStream>> {
96    #[allow(static_mut_refs)]
97    unsafe { &mut BH }
98}
99
100fn bdrop() -> &'static mut SharedMutex<Vec<usize>> {
101    #[allow(static_mut_refs)]
102    unsafe { &mut BD }
103}
104
/// Handle to a `DataStream` stored in the global byte heap.
///
/// The handle itself only carries the key of the stream; all data lives in
/// the heap returned by `bheap()`.
#[derive(Default, Debug)]
pub struct DataBytes {
    /// Key of the referenced stream inside the global byte heap.
    pub data_ref: usize,
}
109
110impl Clone for DataBytes{
111    fn clone(&self) -> Self {
112        let _ = bheap().lock().incr(self.data_ref);
113        DataBytes{
114            data_ref: self.data_ref,
115        }
116    }
117}
118
119impl DataBytes {
120    #[allow(static_mut_refs)]
121    pub fn init() -> ((u64, u64),(u64, u64)){
122        unsafe {
123            if !BH.is_initialized() {
124                BH.set(Heap::new());
125                BD.set(Vec::new());
126            }
127        }
128        DataBytes::share()
129    }
130
131    #[allow(static_mut_refs)]
132    pub fn share() -> ((u64, u64), (u64, u64)){
133        unsafe{
134            let q = BH.share();
135            let r = BD.share();
136            (q, r)
137        }
138    }
139
140    #[allow(static_mut_refs)]
141    pub fn mirror(q:(u64, u64), r:(u64, u64)){
142        unsafe {
143            BH.mirror(q.0, q.1);
144            BD.mirror(r.0, r.1);
145        }
146    }
147
148    pub fn new() -> DataBytes {
149        let data_ref = bheap().lock().push(DataStream::new());
150        DataBytes { data_ref }
151    }
152
153    pub fn from_bytes(buf:&Vec<u8>) -> DataBytes {
154        // Clone the input vec into the deque
155        let data_ref = bheap().lock().push(DataStream::from_bytes(buf.clone()));
156        DataBytes { data_ref }
157    }
158
159    // --- Original Public Methods (panicking on error) ---
160
161    pub fn get_data(&self) -> Vec<u8> {
162        let mut heap_guard = bheap().lock();
163        if !heap_guard.contains_key(self.data_ref) {
164            panic!("DataBytes::get_data called on invalid data_ref: {}", self.data_ref);
165        }
166        let stream = heap_guard.get(self.data_ref);
167        // Convert VecDeque to Vec
168        Vec::from(stream.data.clone())
169    }
170
171    pub fn write(&self, buf:&[u8]) -> bool {
172        // WARNING - No guard on writing too much too fast and running out of RAM
173        //if self.current_len() > 100 * 1024 * 1024 { return false; }
174
175        let mut heap_guard = bheap().lock();
176         if !heap_guard.contains_key(self.data_ref) {
177             if cfg!(debug_assertions) {
178                #[cfg(not(feature="no_std_support"))]
179                println!("Warning: DataBytes::write called on invalid data_ref: {}", self.data_ref);
180             }
181            return false;
182        }
183        let stream = heap_guard.get(self.data_ref);
184        if !stream.write_open || !stream.read_open { return false; }
185
186        // Efficiently extend the VecDeque
187        stream.data.extend(buf.iter().copied());
188        true
189    }
190
191    pub fn read(&self, n:usize) -> Vec<u8> {
192        let mut heap_guard = bheap().lock();
193        if !heap_guard.contains_key(self.data_ref) {
194            panic!("DataBytes::read called on invalid data_ref: {}", self.data_ref);
195        }
196        let stream = heap_guard.get(self.data_ref);
197        if !stream.read_open {
198            panic!("Attempt to read from closed data stream: ref {}", self.data_ref);
199        }
200        let num_to_read = cmp::min(n, stream.data.len());
201
202        // Optimized drain on VecDeque (O(1) pointer move, followed by collection)
203        let d: Vec<u8> = stream.data.drain(0..num_to_read).collect();
204
205        if !stream.write_open && stream.data.is_empty() {
206            stream.read_open = false;
207        }
208        
209        // MEMORY RECOVERY:
210        // If the stream was large but is now empty, release the backing memory.
211        if stream.data.is_empty() && stream.data.capacity() > 1024 * 64 {
212            stream.data.shrink_to_fit();
213        }
214
215        d
216    }
217
218    pub fn set_data(&self, buf:&Vec<u8>) {
219        let mut heap_guard = bheap().lock();
220        if !heap_guard.contains_key(self.data_ref) {
221            panic!("DataBytes::set_data called on invalid data_ref: {}", self.data_ref);
222        }
223        let stream = heap_guard.get(self.data_ref);
224        let len = buf.len();
225        stream.data.clear();
226        stream.data.extend(buf.iter().copied());
227
228        stream.len = len;
229        stream.write_open = false;
230    }
231
232    pub fn current_len(&self) -> usize {
233        let mut heap_guard = bheap().lock();
234        if !heap_guard.contains_key(self.data_ref) {
235            panic!("DataBytes::current_len called on invalid data_ref: {}", self.data_ref);
236        }
237        let stream = heap_guard.get(self.data_ref);
238        stream.data.len()
239    }
240
241    pub fn stream_len(&self) -> usize {
242        let mut heap_guard = bheap().lock();
243        if !heap_guard.contains_key(self.data_ref) {
244            panic!("DataBytes::stream_len called on invalid data_ref: {}", self.data_ref);
245        }
246        let stream = heap_guard.get(self.data_ref);
247        stream.len
248    }
249
250    pub fn set_stream_len(&self, len: usize) {
251        let mut heap_guard = bheap().lock();
252        if !heap_guard.contains_key(self.data_ref) {
253            panic!("DataBytes::set_stream_len called on invalid data_ref: {}", self.data_ref);
254        }
255        let stream = heap_guard.get(self.data_ref);
256        stream.len = len;
257    }
258
259    pub fn is_write_open(&self) -> bool {
260        let mut heap_guard = bheap().lock();
261        if !heap_guard.contains_key(self.data_ref) {
262            panic!("DataBytes::is_write_open called on invalid data_ref: {}", self.data_ref);
263        }
264        let stream = heap_guard.get(self.data_ref);
265        stream.write_open
266    }
267
268    pub fn is_read_open(&self) -> bool {
269        let mut heap_guard = bheap().lock();
270        if !heap_guard.contains_key(self.data_ref) {
271            panic!("DataBytes::is_read_open called on invalid data_ref: {}", self.data_ref);
272        }
273        let stream = heap_guard.get(self.data_ref);
274        stream.read_open
275    }
276
277    pub fn close_write(&self) {
278        let mut heap_guard = bheap().lock();
279        if !heap_guard.contains_key(self.data_ref) {
280            panic!("DataBytes::close_write called on invalid data_ref: {}", self.data_ref);
281        }
282        let stream = heap_guard.get(self.data_ref);
283        stream.write_open = false;
284    }
285
286    pub fn close_read(&self) {
287        let mut heap_guard = bheap().lock();
288        if !heap_guard.contains_key(self.data_ref) {
289            panic!("DataBytes::close_read called on invalid data_ref: {}", self.data_ref);
290        }
291        let stream = heap_guard.get(self.data_ref);
292        stream.read_open = false;
293    }
294
295    pub fn set_mime_type(&self, mime:Option<String>) {
296        let mut heap_guard = bheap().lock();
297        if !heap_guard.contains_key(self.data_ref) {
298            panic!("DataBytes::set_mime_type called on invalid data_ref: {}", self.data_ref);
299        }
300        let stream = heap_guard.get(self.data_ref);
301        stream.mime_type = mime;
302    }
303
304    pub fn get_mime_type(&self) -> Option<String> {
305        let mut heap_guard = bheap().lock();
306        if !heap_guard.contains_key(self.data_ref) {
307            panic!("DataBytes::get_mime_type called on invalid data_ref: {}", self.data_ref);
308        }
309        let stream = heap_guard.get(self.data_ref);
310        stream.mime_type.as_ref().map(|s| s.to_string())
311    }
312
313    pub fn to_hex_string(&self) -> String {
314        let mut heap_guard = bheap().lock();
315         if !heap_guard.contains_key(self.data_ref) {
316            panic!("DataBytes::to_hex_string called on invalid data_ref: {}", self.data_ref);
317        }
318        let stream = heap_guard.get(self.data_ref);
319        let strs: Vec<String> = stream.data.iter()
320            .map(|b| format!("{:02X}", b))
321            .collect();
322        strs.join(" ")
323    }
324
325    pub fn deep_copy(&self) -> DataBytes {
326        let mut heap_guard = bheap().lock();
327        if !heap_guard.contains_key(self.data_ref) {
328            panic!("DataBytes::deep_copy called on invalid data_ref: {}", self.data_ref);
329        }
330        let stream_to_copy = heap_guard.get(self.data_ref);
331        let new_stream = stream_to_copy.deep_copy();
332
333        let new_data_ref = heap_guard.push(new_stream);
334        DataBytes {
335            data_ref: new_data_ref,
336        }
337    }
338
339    // --- New High-Performance Methods ---
340
341    /// Reads up to `buf.len()` bytes into the provided buffer.
342    /// Returns the number of bytes read.
343    pub fn read_into(&self, buf: &mut [u8]) -> usize {
344        let mut heap_guard = bheap().lock();
345        if !heap_guard.contains_key(self.data_ref) {
346            panic!("DataBytes::read_into called on invalid data_ref: {}", self.data_ref);
347        }
348        let stream = heap_guard.get(self.data_ref);
349        if !stream.read_open {
350             panic!("Attempt to read from closed data stream: ref {}", self.data_ref);
351        }
352
353        let num_to_read = cmp::min(buf.len(), stream.data.len());
354
355        // Optimized copy from VecDeque
356        for (i, byte) in stream.data.drain(0..num_to_read).enumerate() {
357            buf[i] = byte;
358        }
359
360        if !stream.write_open && stream.data.is_empty() {
361            stream.read_open = false;
362        }
363
364        // MEMORY RECOVERY
365        if stream.data.is_empty() && stream.data.capacity() > 1024 * 64 {
366            stream.data.shrink_to_fit();
367        }
368
369        num_to_read
370    }
371
372    /// Peeks at the first `n` bytes of the stream without consuming them.
373    pub fn peek(&self, n: usize) -> Vec<u8> {
374        let mut heap_guard = bheap().lock();
375        if !heap_guard.contains_key(self.data_ref) {
376            panic!("DataBytes::peek called on invalid data_ref: {}", self.data_ref);
377        }
378        let stream = heap_guard.get(self.data_ref);
379        let num_to_peek = cmp::min(n, stream.data.len());
380        stream.data.iter().take(num_to_peek).copied().collect()
381    }
382
383    /// Moves all available data from this stream to the destination stream.
384    /// Returns the number of bytes moved.
385    pub fn pipe(&self, dest: &DataBytes) -> usize {
386        // Reads all data and writes to dest.
387        // NOTE: This allocates a temporary Vec. 
388        // For zero-allocation piping, we would need a new method in Heap/SharedMutex 
389        // that allows locking two items simultaneously, which is complex.
390        let chunk = self.read(usize::MAX);
391        let len = chunk.len();
392
393        if len > 0 {
394            dest.write(&chunk);
395        }
396        len
397    }
398
399    // --- New `try_` Methods (non-panicking, return Result) ---
400
401    pub fn try_get_data(&self) -> Result<Vec<u8>, NDataError> {
402        let mut heap_guard = bheap().lock();
403        if !heap_guard.contains_key(self.data_ref) {
404            return Err(NDataError::InvalidBytesRef);
405        }
406        let stream = heap_guard.get(self.data_ref);
407        Ok(Vec::from(stream.data.clone()))
408    }
409
410    pub fn try_write(&mut self, buf:&[u8]) -> Result<(), NDataError> {
411        let mut heap_guard = bheap().lock();
412        if !heap_guard.contains_key(self.data_ref) {
413            return Err(NDataError::InvalidBytesRef);
414        }
415        let stream = heap_guard.get(self.data_ref);
416        if !stream.write_open {
417            return Err(NDataError::StreamNotWritable);
418        }
419        stream.data.extend(buf.iter().copied());
420        Ok(())
421    }
422
423    pub fn try_read(&mut self, n:usize) -> Result<Vec<u8>, NDataError> {
424        let mut heap_guard = bheap().lock();
425        if !heap_guard.contains_key(self.data_ref) {
426            return Err(NDataError::InvalidBytesRef);
427        }
428        let stream = heap_guard.get(self.data_ref);
429        if !stream.read_open {
430            return Err(NDataError::StreamNotReadable);
431        }
432        let num_to_read = cmp::min(n, stream.data.len());
433        let d: Vec<u8> = stream.data.drain(0..num_to_read).collect();
434
435        if !stream.write_open && stream.data.is_empty() {
436            stream.read_open = false;
437        }
438
439        // MEMORY RECOVERY
440        if stream.data.is_empty() && stream.data.capacity() > 1024 * 64 {
441            stream.data.shrink_to_fit();
442        }
443
444        Ok(d)
445    }
446
447    pub fn try_set_data(&mut self, buf:&Vec<u8>) -> Result<(), NDataError> {
448        let mut heap_guard = bheap().lock();
449        if !heap_guard.contains_key(self.data_ref) {
450            return Err(NDataError::InvalidBytesRef);
451        }
452        let stream = heap_guard.get(self.data_ref);
453        let len = buf.len();
454        stream.data.clear();
455        stream.data.extend(buf.iter().copied());
456
457        stream.len = len;
458        stream.write_open = false;
459        Ok(())
460    }
461
462    pub fn try_current_len(&self) -> Result<usize, NDataError> {
463        let mut heap_guard = bheap().lock();
464        if !heap_guard.contains_key(self.data_ref) {
465            return Err(NDataError::InvalidBytesRef);
466        }
467        let stream = heap_guard.get(self.data_ref);
468        Ok(stream.data.len())
469    }
470
471    pub fn try_stream_len(&self) -> Result<usize, NDataError> {
472        let mut heap_guard = bheap().lock();
473        if !heap_guard.contains_key(self.data_ref) {
474            return Err(NDataError::InvalidBytesRef);
475        }
476        let stream = heap_guard.get(self.data_ref);
477        Ok(stream.len)
478    }
479
480    pub fn try_set_stream_len(&mut self, len: usize) -> Result<(), NDataError> {
481        let mut heap_guard = bheap().lock();
482        if !heap_guard.contains_key(self.data_ref) {
483            return Err(NDataError::InvalidBytesRef);
484        }
485        let stream = heap_guard.get(self.data_ref);
486        stream.len = len;
487        Ok(())
488    }
489
490    pub fn try_is_write_open(&self) -> Result<bool, NDataError> {
491        let mut heap_guard = bheap().lock();
492        if !heap_guard.contains_key(self.data_ref) {
493            return Err(NDataError::InvalidBytesRef);
494        }
495        let stream = heap_guard.get(self.data_ref);
496        Ok(stream.write_open)
497    }
498
499    pub fn try_is_read_open(&self) -> Result<bool, NDataError> {
500        let mut heap_guard = bheap().lock();
501        if !heap_guard.contains_key(self.data_ref) {
502            return Err(NDataError::InvalidBytesRef);
503        }
504        let stream = heap_guard.get(self.data_ref);
505        Ok(stream.read_open)
506    }
507
508    pub fn try_close_write(&mut self) -> Result<(), NDataError> {
509        let mut heap_guard = bheap().lock();
510        if !heap_guard.contains_key(self.data_ref) {
511            return Err(NDataError::InvalidBytesRef);
512        }
513        let stream = heap_guard.get(self.data_ref);
514        stream.write_open = false;
515        Ok(())
516    }
517
518    pub fn try_close_read(&mut self) -> Result<(), NDataError> {
519        let mut heap_guard = bheap().lock();
520        if !heap_guard.contains_key(self.data_ref) {
521            return Err(NDataError::InvalidBytesRef);
522        }
523        let stream = heap_guard.get(self.data_ref);
524        stream.read_open = false;
525        Ok(())
526    }
527
528    pub fn try_set_mime_type(&mut self, mime:Option<String>) -> Result<(), NDataError> {
529        let mut heap_guard = bheap().lock();
530        if !heap_guard.contains_key(self.data_ref) {
531            return Err(NDataError::InvalidBytesRef);
532        }
533        let stream = heap_guard.get(self.data_ref);
534        stream.mime_type = mime;
535        Ok(())
536    }
537
538    pub fn try_get_mime_type(&self) -> Result<Option<String>, NDataError> {
539        let mut heap_guard = bheap().lock();
540        if !heap_guard.contains_key(self.data_ref) {
541            return Err(NDataError::InvalidBytesRef);
542        }
543        let stream = heap_guard.get(self.data_ref);
544        Ok(stream.mime_type.as_ref().map(|s| s.to_string()))
545    }
546
547    // --- Static and other existing methods ---
548    pub fn get(data_ref: usize) -> DataBytes {
549        let _ = bheap().lock().incr(data_ref);
550        DataBytes{
551            data_ref,
552        }
553    }
554
555    pub fn incr(&self) {
556        let _ = bheap().lock().incr(self.data_ref);
557    }
558
559    pub fn decr(&self) {
560        let _ = bheap().lock().decr(self.data_ref);
561    }
562
563    #[deprecated(since="0.3.0", note="please use `clone` instead")]
564    pub fn duplicate(&self) -> DataBytes {
565        self.clone()
566    }
567
568    #[cfg(not(feature="no_std_support"))]
569    pub fn print_heap() {
570        println!("bytes {:?}", bheap().lock().keys());
571    }
572
573    /// Garbage collects dropped streams.
574    ///
575    /// # Deadlock Safety
576    /// This method uses a "swap-and-drop" strategy to avoid deadlock risks.
577    /// 1. We acquire the drop list (`BD`), efficiently swap its contents with an empty list (O(1)), and release the lock immediately.
578    /// 2. We then acquire the heap lock (`BH`) to process the decrements.
579    ///
580    /// By never holding `BD` and `BH` simultaneously, we prevent any "Hold and Wait" deadlock cycles
581    /// involving these two resources.
582    pub fn gc() {
583        // Step 1: Atomic Swap (O(1))
584        // We take the list of refs to decrement and release the BD lock immediately.
585        let to_process = {
586            let mut bdrop_guard = bdrop().lock();
587            if bdrop_guard.is_empty() {
588                return;
589            }
590            mem::take(&mut *bdrop_guard)
591        }; // BD lock is released here.
592
593        // Step 2: Process Decrements
594        // We now hold only the BH lock.
595        let mut bheap_guard = bheap().lock();
596        for data_ref in to_process {
597            if bheap_guard.contains_key(data_ref) {
598                 bheap_guard.decr(data_ref);
599            } else {
600                #[cfg(not(feature = "no_std_support"))]
601                if cfg!(debug_assertions) {
602                    println!("Warning: DataBytes::gc trying to decr non-existent ref {}", data_ref);
603                }
604            }
605        }
606    }
607}
608
609impl Drop for DataBytes {
610    fn drop(&mut self) {
611        bdrop().lock().push(self.data_ref);
612    }
613}