1extern crate alloc;
2use core::cmp;
3use core::mem;
4use crate::heap::*;
5use crate::sharedmutex::*;
6use alloc::collections::VecDeque;
7
8#[cfg(feature="no_std_support")]
9use alloc::string::{String, ToString};
10#[cfg(feature="no_std_support")]
11use alloc::vec::Vec;
12#[cfg(not(feature="no_std_support"))]
13use std::println;
14
15
/// Errors reported by the fallible (`try_*`) `DataBytes` operations.
///
/// The type is `Clone`, `PartialEq` and `Eq` so that callers and tests can
/// store error values and compare them directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NDataError {
    /// The handle's `data_ref` is not present in the byte heap.
    InvalidBytesRef,
    /// The stream's read side is closed.
    StreamNotReadable,
    /// The stream's write side is closed.
    StreamNotWritable,
}

impl core::fmt::Display for NDataError {
    /// Writes the fixed, human-readable description of the error variant.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let message = match self {
            NDataError::InvalidBytesRef => {
                "DataBytes reference is invalid or points to deallocated memory"
            }
            NDataError::StreamNotReadable => "Stream is not open for reading",
            NDataError::StreamNotWritable => "Stream is not open for writing",
        };
        f.write_str(message)
    }
}

// `std::error::Error` only exists when the standard library is available.
#[cfg(not(feature = "no_std_support"))]
impl std::error::Error for NDataError {}
36
37
/// Process-wide heap of byte streams; every `DataBytes` handle stores a key
/// (`data_ref`) into it. Set up by `DataBytes::init` or `DataBytes::mirror`.
static mut BH:SharedMutex<Heap<DataStream>> = SharedMutex::new();

/// Queue of `data_ref`s pushed by `DataBytes::drop`; `DataBytes::gc` later
/// takes the queue and calls `decr` on the heap for each entry.
static mut BD:SharedMutex<Vec<usize>> = SharedMutex::new();
43
/// A queue of bytes with independent read/write "open" flags, a declared
/// length and an optional MIME type.
///
/// The derived `Default` leaves both flags `false`; use [`DataStream::new`]
/// for a stream that is open on both sides.
#[derive(Debug, Default)]
pub struct DataStream {
    /// Bytes that have been written and not yet consumed.
    data: VecDeque<u8>,
    /// Declared stream length; stored separately from `data.len()`.
    len: usize,
    /// Whether the stream may still be read from.
    read_open: bool,
    /// Whether the stream may still be written to.
    write_open: bool,
    /// Optional MIME type attached to the stream.
    mime_type: Option<String>,
}

impl DataStream {
    /// Creates an empty stream that is open for both reading and writing.
    pub fn new() -> Self {
        Self {
            read_open: true,
            write_open: true,
            ..Self::default()
        }
    }

    /// Creates a read-only stream pre-filled with `buf`.
    ///
    /// The declared length is set to `buf.len()` and the write side starts closed.
    pub fn from_bytes(buf: Vec<u8>) -> DataStream {
        let declared_len = buf.len();
        let mut stream = DataStream::new();
        stream.data = VecDeque::from(buf);
        stream.len = declared_len;
        stream.write_open = false;
        stream
    }

    /// Returns an independent copy of this stream, including its flags,
    /// declared length and MIME type.
    pub fn deep_copy(&self) -> DataStream {
        let DataStream {
            data,
            len,
            read_open,
            write_open,
            mime_type,
        } = self;
        DataStream {
            data: data.clone(),
            len: *len,
            read_open: *read_open,
            write_open: *write_open,
            mime_type: mime_type.clone(),
        }
    }
}
94
95pub fn bheap() -> &'static mut SharedMutex<Heap<DataStream>> {
96 #[allow(static_mut_refs)]
97 unsafe { &mut BH }
98}
99
100fn bdrop() -> &'static mut SharedMutex<Vec<usize>> {
101 #[allow(static_mut_refs)]
102 unsafe { &mut BD }
103}
104
/// Handle to a `DataStream` stored in the global byte heap.
///
/// NOTE(review): the derived `Default` yields `data_ref == 0` without touching
/// the heap, yet dropping that value still queues ref 0 for `gc` — confirm
/// this is intended.
#[derive(Debug, Default)]
pub struct DataBytes {
    /// Key of the backing stream in the heap returned by `bheap()`.
    pub data_ref: usize,
}
109
110impl Clone for DataBytes{
111 fn clone(&self) -> Self {
112 let _ = bheap().lock().incr(self.data_ref);
113 DataBytes{
114 data_ref: self.data_ref,
115 }
116 }
117}
118
119impl DataBytes {
120 #[allow(static_mut_refs)]
121 pub fn init() -> ((u64, u64),(u64, u64)){
122 unsafe {
123 if !BH.is_initialized() {
124 BH.set(Heap::new());
125 BD.set(Vec::new());
126 }
127 }
128 DataBytes::share()
129 }
130
131 #[allow(static_mut_refs)]
132 pub fn share() -> ((u64, u64), (u64, u64)){
133 unsafe{
134 let q = BH.share();
135 let r = BD.share();
136 (q, r)
137 }
138 }
139
140 #[allow(static_mut_refs)]
141 pub fn mirror(q:(u64, u64), r:(u64, u64)){
142 unsafe {
143 BH.mirror(q.0, q.1);
144 BD.mirror(r.0, r.1);
145 }
146 }
147
148 pub fn new() -> DataBytes {
149 let data_ref = bheap().lock().push(DataStream::new());
150 DataBytes { data_ref }
151 }
152
153 pub fn from_bytes(buf:&Vec<u8>) -> DataBytes {
154 let data_ref = bheap().lock().push(DataStream::from_bytes(buf.clone()));
156 DataBytes { data_ref }
157 }
158
159 pub fn get_data(&self) -> Vec<u8> {
162 let mut heap_guard = bheap().lock();
163 if !heap_guard.contains_key(self.data_ref) {
164 panic!("DataBytes::get_data called on invalid data_ref: {}", self.data_ref);
165 }
166 let stream = heap_guard.get(self.data_ref);
167 Vec::from(stream.data.clone())
169 }
170
171 pub fn write(&self, buf:&[u8]) -> bool {
172 let mut heap_guard = bheap().lock();
176 if !heap_guard.contains_key(self.data_ref) {
177 if cfg!(debug_assertions) {
178 #[cfg(not(feature="no_std_support"))]
179 println!("Warning: DataBytes::write called on invalid data_ref: {}", self.data_ref);
180 }
181 return false;
182 }
183 let stream = heap_guard.get(self.data_ref);
184 if !stream.write_open || !stream.read_open { return false; }
185
186 stream.data.extend(buf.iter().copied());
188 true
189 }
190
191 pub fn read(&self, n:usize) -> Vec<u8> {
192 let mut heap_guard = bheap().lock();
193 if !heap_guard.contains_key(self.data_ref) {
194 panic!("DataBytes::read called on invalid data_ref: {}", self.data_ref);
195 }
196 let stream = heap_guard.get(self.data_ref);
197 if !stream.read_open {
198 panic!("Attempt to read from closed data stream: ref {}", self.data_ref);
199 }
200 let num_to_read = cmp::min(n, stream.data.len());
201
202 let d: Vec<u8> = stream.data.drain(0..num_to_read).collect();
204
205 if !stream.write_open && stream.data.is_empty() {
206 stream.read_open = false;
207 }
208
209 if stream.data.is_empty() && stream.data.capacity() > 1024 * 64 {
212 stream.data.shrink_to_fit();
213 }
214
215 d
216 }
217
218 pub fn set_data(&self, buf:&Vec<u8>) {
219 let mut heap_guard = bheap().lock();
220 if !heap_guard.contains_key(self.data_ref) {
221 panic!("DataBytes::set_data called on invalid data_ref: {}", self.data_ref);
222 }
223 let stream = heap_guard.get(self.data_ref);
224 let len = buf.len();
225 stream.data.clear();
226 stream.data.extend(buf.iter().copied());
227
228 stream.len = len;
229 stream.write_open = false;
230 }
231
232 pub fn current_len(&self) -> usize {
233 let mut heap_guard = bheap().lock();
234 if !heap_guard.contains_key(self.data_ref) {
235 panic!("DataBytes::current_len called on invalid data_ref: {}", self.data_ref);
236 }
237 let stream = heap_guard.get(self.data_ref);
238 stream.data.len()
239 }
240
241 pub fn stream_len(&self) -> usize {
242 let mut heap_guard = bheap().lock();
243 if !heap_guard.contains_key(self.data_ref) {
244 panic!("DataBytes::stream_len called on invalid data_ref: {}", self.data_ref);
245 }
246 let stream = heap_guard.get(self.data_ref);
247 stream.len
248 }
249
250 pub fn set_stream_len(&self, len: usize) {
251 let mut heap_guard = bheap().lock();
252 if !heap_guard.contains_key(self.data_ref) {
253 panic!("DataBytes::set_stream_len called on invalid data_ref: {}", self.data_ref);
254 }
255 let stream = heap_guard.get(self.data_ref);
256 stream.len = len;
257 }
258
259 pub fn is_write_open(&self) -> bool {
260 let mut heap_guard = bheap().lock();
261 if !heap_guard.contains_key(self.data_ref) {
262 panic!("DataBytes::is_write_open called on invalid data_ref: {}", self.data_ref);
263 }
264 let stream = heap_guard.get(self.data_ref);
265 stream.write_open
266 }
267
268 pub fn is_read_open(&self) -> bool {
269 let mut heap_guard = bheap().lock();
270 if !heap_guard.contains_key(self.data_ref) {
271 panic!("DataBytes::is_read_open called on invalid data_ref: {}", self.data_ref);
272 }
273 let stream = heap_guard.get(self.data_ref);
274 stream.read_open
275 }
276
277 pub fn close_write(&self) {
278 let mut heap_guard = bheap().lock();
279 if !heap_guard.contains_key(self.data_ref) {
280 panic!("DataBytes::close_write called on invalid data_ref: {}", self.data_ref);
281 }
282 let stream = heap_guard.get(self.data_ref);
283 stream.write_open = false;
284 }
285
286 pub fn close_read(&self) {
287 let mut heap_guard = bheap().lock();
288 if !heap_guard.contains_key(self.data_ref) {
289 panic!("DataBytes::close_read called on invalid data_ref: {}", self.data_ref);
290 }
291 let stream = heap_guard.get(self.data_ref);
292 stream.read_open = false;
293 }
294
295 pub fn set_mime_type(&self, mime:Option<String>) {
296 let mut heap_guard = bheap().lock();
297 if !heap_guard.contains_key(self.data_ref) {
298 panic!("DataBytes::set_mime_type called on invalid data_ref: {}", self.data_ref);
299 }
300 let stream = heap_guard.get(self.data_ref);
301 stream.mime_type = mime;
302 }
303
304 pub fn get_mime_type(&self) -> Option<String> {
305 let mut heap_guard = bheap().lock();
306 if !heap_guard.contains_key(self.data_ref) {
307 panic!("DataBytes::get_mime_type called on invalid data_ref: {}", self.data_ref);
308 }
309 let stream = heap_guard.get(self.data_ref);
310 stream.mime_type.as_ref().map(|s| s.to_string())
311 }
312
313 pub fn to_hex_string(&self) -> String {
314 let mut heap_guard = bheap().lock();
315 if !heap_guard.contains_key(self.data_ref) {
316 panic!("DataBytes::to_hex_string called on invalid data_ref: {}", self.data_ref);
317 }
318 let stream = heap_guard.get(self.data_ref);
319 let strs: Vec<String> = stream.data.iter()
320 .map(|b| format!("{:02X}", b))
321 .collect();
322 strs.join(" ")
323 }
324
325 pub fn deep_copy(&self) -> DataBytes {
326 let mut heap_guard = bheap().lock();
327 if !heap_guard.contains_key(self.data_ref) {
328 panic!("DataBytes::deep_copy called on invalid data_ref: {}", self.data_ref);
329 }
330 let stream_to_copy = heap_guard.get(self.data_ref);
331 let new_stream = stream_to_copy.deep_copy();
332
333 let new_data_ref = heap_guard.push(new_stream);
334 DataBytes {
335 data_ref: new_data_ref,
336 }
337 }
338
339 pub fn read_into(&self, buf: &mut [u8]) -> usize {
344 let mut heap_guard = bheap().lock();
345 if !heap_guard.contains_key(self.data_ref) {
346 panic!("DataBytes::read_into called on invalid data_ref: {}", self.data_ref);
347 }
348 let stream = heap_guard.get(self.data_ref);
349 if !stream.read_open {
350 panic!("Attempt to read from closed data stream: ref {}", self.data_ref);
351 }
352
353 let num_to_read = cmp::min(buf.len(), stream.data.len());
354
355 for (i, byte) in stream.data.drain(0..num_to_read).enumerate() {
357 buf[i] = byte;
358 }
359
360 if !stream.write_open && stream.data.is_empty() {
361 stream.read_open = false;
362 }
363
364 if stream.data.is_empty() && stream.data.capacity() > 1024 * 64 {
366 stream.data.shrink_to_fit();
367 }
368
369 num_to_read
370 }
371
372 pub fn peek(&self, n: usize) -> Vec<u8> {
374 let mut heap_guard = bheap().lock();
375 if !heap_guard.contains_key(self.data_ref) {
376 panic!("DataBytes::peek called on invalid data_ref: {}", self.data_ref);
377 }
378 let stream = heap_guard.get(self.data_ref);
379 let num_to_peek = cmp::min(n, stream.data.len());
380 stream.data.iter().take(num_to_peek).copied().collect()
381 }
382
383 pub fn pipe(&self, dest: &DataBytes) -> usize {
386 let chunk = self.read(usize::MAX);
391 let len = chunk.len();
392
393 if len > 0 {
394 dest.write(&chunk);
395 }
396 len
397 }
398
399 pub fn try_get_data(&self) -> Result<Vec<u8>, NDataError> {
402 let mut heap_guard = bheap().lock();
403 if !heap_guard.contains_key(self.data_ref) {
404 return Err(NDataError::InvalidBytesRef);
405 }
406 let stream = heap_guard.get(self.data_ref);
407 Ok(Vec::from(stream.data.clone()))
408 }
409
410 pub fn try_write(&mut self, buf:&[u8]) -> Result<(), NDataError> {
411 let mut heap_guard = bheap().lock();
412 if !heap_guard.contains_key(self.data_ref) {
413 return Err(NDataError::InvalidBytesRef);
414 }
415 let stream = heap_guard.get(self.data_ref);
416 if !stream.write_open {
417 return Err(NDataError::StreamNotWritable);
418 }
419 stream.data.extend(buf.iter().copied());
420 Ok(())
421 }
422
423 pub fn try_read(&mut self, n:usize) -> Result<Vec<u8>, NDataError> {
424 let mut heap_guard = bheap().lock();
425 if !heap_guard.contains_key(self.data_ref) {
426 return Err(NDataError::InvalidBytesRef);
427 }
428 let stream = heap_guard.get(self.data_ref);
429 if !stream.read_open {
430 return Err(NDataError::StreamNotReadable);
431 }
432 let num_to_read = cmp::min(n, stream.data.len());
433 let d: Vec<u8> = stream.data.drain(0..num_to_read).collect();
434
435 if !stream.write_open && stream.data.is_empty() {
436 stream.read_open = false;
437 }
438
439 if stream.data.is_empty() && stream.data.capacity() > 1024 * 64 {
441 stream.data.shrink_to_fit();
442 }
443
444 Ok(d)
445 }
446
447 pub fn try_set_data(&mut self, buf:&Vec<u8>) -> Result<(), NDataError> {
448 let mut heap_guard = bheap().lock();
449 if !heap_guard.contains_key(self.data_ref) {
450 return Err(NDataError::InvalidBytesRef);
451 }
452 let stream = heap_guard.get(self.data_ref);
453 let len = buf.len();
454 stream.data.clear();
455 stream.data.extend(buf.iter().copied());
456
457 stream.len = len;
458 stream.write_open = false;
459 Ok(())
460 }
461
462 pub fn try_current_len(&self) -> Result<usize, NDataError> {
463 let mut heap_guard = bheap().lock();
464 if !heap_guard.contains_key(self.data_ref) {
465 return Err(NDataError::InvalidBytesRef);
466 }
467 let stream = heap_guard.get(self.data_ref);
468 Ok(stream.data.len())
469 }
470
471 pub fn try_stream_len(&self) -> Result<usize, NDataError> {
472 let mut heap_guard = bheap().lock();
473 if !heap_guard.contains_key(self.data_ref) {
474 return Err(NDataError::InvalidBytesRef);
475 }
476 let stream = heap_guard.get(self.data_ref);
477 Ok(stream.len)
478 }
479
480 pub fn try_set_stream_len(&mut self, len: usize) -> Result<(), NDataError> {
481 let mut heap_guard = bheap().lock();
482 if !heap_guard.contains_key(self.data_ref) {
483 return Err(NDataError::InvalidBytesRef);
484 }
485 let stream = heap_guard.get(self.data_ref);
486 stream.len = len;
487 Ok(())
488 }
489
490 pub fn try_is_write_open(&self) -> Result<bool, NDataError> {
491 let mut heap_guard = bheap().lock();
492 if !heap_guard.contains_key(self.data_ref) {
493 return Err(NDataError::InvalidBytesRef);
494 }
495 let stream = heap_guard.get(self.data_ref);
496 Ok(stream.write_open)
497 }
498
499 pub fn try_is_read_open(&self) -> Result<bool, NDataError> {
500 let mut heap_guard = bheap().lock();
501 if !heap_guard.contains_key(self.data_ref) {
502 return Err(NDataError::InvalidBytesRef);
503 }
504 let stream = heap_guard.get(self.data_ref);
505 Ok(stream.read_open)
506 }
507
508 pub fn try_close_write(&mut self) -> Result<(), NDataError> {
509 let mut heap_guard = bheap().lock();
510 if !heap_guard.contains_key(self.data_ref) {
511 return Err(NDataError::InvalidBytesRef);
512 }
513 let stream = heap_guard.get(self.data_ref);
514 stream.write_open = false;
515 Ok(())
516 }
517
518 pub fn try_close_read(&mut self) -> Result<(), NDataError> {
519 let mut heap_guard = bheap().lock();
520 if !heap_guard.contains_key(self.data_ref) {
521 return Err(NDataError::InvalidBytesRef);
522 }
523 let stream = heap_guard.get(self.data_ref);
524 stream.read_open = false;
525 Ok(())
526 }
527
528 pub fn try_set_mime_type(&mut self, mime:Option<String>) -> Result<(), NDataError> {
529 let mut heap_guard = bheap().lock();
530 if !heap_guard.contains_key(self.data_ref) {
531 return Err(NDataError::InvalidBytesRef);
532 }
533 let stream = heap_guard.get(self.data_ref);
534 stream.mime_type = mime;
535 Ok(())
536 }
537
538 pub fn try_get_mime_type(&self) -> Result<Option<String>, NDataError> {
539 let mut heap_guard = bheap().lock();
540 if !heap_guard.contains_key(self.data_ref) {
541 return Err(NDataError::InvalidBytesRef);
542 }
543 let stream = heap_guard.get(self.data_ref);
544 Ok(stream.mime_type.as_ref().map(|s| s.to_string()))
545 }
546
547 pub fn get(data_ref: usize) -> DataBytes {
549 let _ = bheap().lock().incr(data_ref);
550 DataBytes{
551 data_ref,
552 }
553 }
554
555 pub fn incr(&self) {
556 let _ = bheap().lock().incr(self.data_ref);
557 }
558
559 pub fn decr(&self) {
560 let _ = bheap().lock().decr(self.data_ref);
561 }
562
563 #[deprecated(since="0.3.0", note="please use `clone` instead")]
564 pub fn duplicate(&self) -> DataBytes {
565 self.clone()
566 }
567
/// Prints the keys currently present in the byte heap to stdout, prefixed
/// with `bytes`. Only compiled when the standard library is available.
#[cfg(not(feature="no_std_support"))]
pub fn print_heap() {
    println!("bytes {:?}", bheap().lock().keys());
}
572
573 pub fn gc() {
583 let to_process = {
586 let mut bdrop_guard = bdrop().lock();
587 if bdrop_guard.is_empty() {
588 return;
589 }
590 mem::take(&mut *bdrop_guard)
591 }; let mut bheap_guard = bheap().lock();
596 for data_ref in to_process {
597 if bheap_guard.contains_key(data_ref) {
598 bheap_guard.decr(data_ref);
599 } else {
600 #[cfg(not(feature = "no_std_support"))]
601 if cfg!(debug_assertions) {
602 println!("Warning: DataBytes::gc trying to decr non-existent ref {}", data_ref);
603 }
604 }
605 }
606 }
607}
608
609impl Drop for DataBytes {
610 fn drop(&mut self) {
611 bdrop().lock().push(self.data_ref);
612 }
613}