1use crate::error::Result;
5use crate::format::ChildRef;
6use crate::graph::{JobConfig, SerializationJob};
7use crate::reader::{ChunkNode, ParcodeItem};
8use serde::de::DeserializeOwned;
9use std::borrow::Cow;
10use std::collections::HashMap;
11use std::hash::Hash;
12use std::io::Cursor;
13use std::marker::PhantomData;
14use std::vec::IntoIter;
15
/// A [`SerializationJob`] paired with an explicit [`JobConfig`] override.
///
/// Execution is delegated to the wrapped job; only the configuration
/// reported through `config()` is replaced.
#[derive(Debug)]
pub struct ConfiguredJob<'a, J: ?Sized> {
    // Configuration returned from `config()` instead of the inner job's own.
    config: JobConfig,
    // The wrapped job; boxed so unsized (`?Sized`) trait objects work.
    inner: Box<J>,
    // Ties the wrapper to the `'a` lifetime without storing a reference.
    _marker: PhantomData<&'a ()>,
}
23
24impl<'a, J: SerializationJob<'a> + ?Sized> ConfiguredJob<'a, J> {
25 pub fn new(inner: Box<J>, config: JobConfig) -> Self {
26 Self {
27 inner,
28 config,
29 _marker: PhantomData,
30 }
31 }
32}
33
impl<'a, J: SerializationJob<'a> + ?Sized> SerializationJob<'a> for ConfiguredJob<'a, J> {
    // `execute` and `estimated_size` are pure delegation to the inner job.
    fn execute(&self, children_refs: &[ChildRef]) -> Result<Vec<u8>> {
        self.inner.execute(children_refs)
    }

    fn estimated_size(&self) -> usize {
        self.inner.estimated_size()
    }

    /// Returns the stored override rather than `self.inner.config()` —
    /// this is the entire purpose of the wrapper.
    fn config(&self) -> JobConfig {
        self.config
    }
}
47
/// Types that can be referenced lazily from a parcode chunk tree.
///
/// `Lazy` is the handle/promise type that defers decoding until the caller
/// actually asks for the value.
pub trait ParcodeLazyRef<'a>: Sized {
    /// The lazy handle produced for this type (e.g. a promise wrapper).
    type Lazy;

    /// Creates a lazy handle directly from a standalone chunk node.
    fn create_lazy(node: ChunkNode<'a>) -> Result<Self::Lazy>;

    /// Creates a lazy handle while streaming through a shard payload.
    ///
    /// Implementations must consume exactly one value's worth of bytes from
    /// `reader` and/or one node from `children`, so repeated calls walk the
    /// shard item by item (callers rely on this to skip entries).
    fn read_lazy_from_stream(
        reader: &mut Cursor<&[u8]>,
        children: &mut IntoIter<ChunkNode<'a>>,
    ) -> Result<Self::Lazy>;
}
68
/// Deferred handle to a single serialized value of type `T`.
///
/// Holds only the [`ChunkNode`]; nothing is decoded until [`ParcodePromise::load`].
#[derive(Debug, Clone)]
pub struct ParcodePromise<'a, T> {
    node: ChunkNode<'a>,
    // Marks the promised value type without storing a `T`.
    _m: PhantomData<T>,
}
77
78impl<'a, T: DeserializeOwned> ParcodePromise<'a, T> {
79 pub fn new(node: ChunkNode<'a>) -> Self {
81 Self {
82 node,
83 _m: PhantomData,
84 }
85 }
86
87 pub fn load(&self) -> Result<T> {
89 self.node.decode()
90 }
91}
92
/// Deferred handle to a sharded collection of `T` values.
///
/// Supports whole-collection decoding, indexed access, and iteration —
/// eager or lazy — without loading the entire collection up front.
#[derive(Debug, Clone)]
pub struct ParcodeCollectionPromise<'a, T> {
    node: ChunkNode<'a>,
    // Marks the item type without storing a `T`.
    _m: PhantomData<T>,
}
102
103impl<'a, T: ParcodeItem + Send + Sync + 'a> ParcodeCollectionPromise<'a, T> {
104 pub fn new(node: ChunkNode<'a>) -> Self {
106 Self {
107 node,
108 _m: PhantomData,
109 }
110 }
111
112 pub fn load(&self) -> Result<Vec<T>> {
114 self.node.decode_parallel_collection()
115 }
116
117 pub fn get(&self, index: usize) -> Result<T> {
120 self.node.get(index)
121 }
122
123 pub fn len(&self) -> usize {
127 usize::try_from(self.node.len()).unwrap_or(usize::MAX)
128 }
129
130 pub fn is_empty(&self) -> bool {
132 self.node.is_empty()
133 }
134
135 pub fn iter(&self) -> Result<impl Iterator<Item = Result<T>> + 'a> {
137 self.node.clone().iter() }
139}
140
impl<'a, T> ParcodeCollectionPromise<'a, T>
where
    T: ParcodeItem + Send + Sync + ParcodeLazyRef<'a> + 'a,
{
    /// Returns a lazy handle for the item at `index` without decoding it.
    ///
    /// Items inside a shard are stored back-to-back with no offset table for
    /// lazy fields, so the only way to reach item `index_in_shard` is to
    /// replay the stream: each `read_lazy_from_stream` call consumes exactly
    /// one item's bytes/child node, keeping cursor and child iterator in sync.
    pub fn get_lazy(&self, index: usize) -> Result<T::Lazy> {
        let (shard_node, index_in_shard) = self.node.locate_shard_item(index)?;

        let payload = shard_node.read_raw()?;
        let mut cursor = Cursor::new(payload.as_ref());
        let children_vec = shard_node.children()?;
        let mut children_iter = children_vec.into_iter();

        // Shard payloads start with a bincode-encoded item count; decode it
        // (discarded) to position the cursor at the first item.
        let _len: u64 =
            bincode::serde::decode_from_std_read(&mut cursor, bincode::config::standard())
                .map_err(|e| crate::ParcodeError::Serialization(e.to_string()))?;

        // Skip the preceding items; each call advances both the byte cursor
        // and the child-node iterator by exactly one item.
        for _ in 0..index_in_shard {
            let _ = T::read_lazy_from_stream(&mut cursor, &mut children_iter)?;
        }

        T::read_lazy_from_stream(&mut cursor, &mut children_iter)
    }

    /// Lazy handle for the first item, or `None` when the collection is empty.
    pub fn first(&self) -> Result<Option<T::Lazy>> {
        if self.is_empty() {
            Ok(None)
        } else {
            Ok(Some(self.get_lazy(0)?))
        }
    }

    /// Lazy handle for the last item, or `None` when the collection is empty.
    pub fn last(&self) -> Result<Option<T::Lazy>> {
        let len = self.len();
        if len == 0 {
            Ok(None)
        } else {
            Ok(Some(self.get_lazy(len - 1)?))
        }
    }

    /// Streaming lazy iterator over all items, shard by shard.
    pub fn iter_lazy(&self) -> Result<ParcodeLazyIterator<'a, T>> {
        ParcodeLazyIterator::new(self.node.clone())
    }
}
202
/// Deferred handle to a sharded `HashMap` container.
///
/// Supports full decoding ([`ParcodeMapPromise::load`]) or per-key lookups
/// that touch only the shard the key hashes into.
#[derive(Debug)]
pub struct ParcodeMapPromise<'a, K, V> {
    node: ChunkNode<'a>,
    // Marks the key/value types without storing them.
    _m: PhantomData<(K, V)>,
}
213
214impl<'a, K, V> ParcodeMapPromise<'a, K, V>
215where
216 K: Hash + Eq + DeserializeOwned,
217 V: DeserializeOwned,
218{
219 pub fn new(node: ChunkNode<'a>) -> Self {
221 Self {
222 node,
223 _m: PhantomData,
224 }
225 }
226
227 pub fn load(&self) -> Result<HashMap<K, V>> {
234 let container_payload = self.node.read_raw()?;
236 let num_shards = if container_payload.len() >= 4 {
237 u32::from_le_bytes(
238 container_payload
239 .get(0..4)
240 .ok_or_else(|| crate::ParcodeError::Format("Payload too short".into()))?
241 .try_into()
242 .map_err(|_| crate::ParcodeError::Format("Failed to read num_shards".into()))?,
243 )
244 } else {
245 0
246 };
247
248 let mut map = HashMap::new();
249 if num_shards == 0 {
250 return Ok(map);
251 }
252
253 let shards = self.node.children()?;
256 for shard in shards {
257 let payload = shard.read_raw()?;
258 if payload.len() < 8 {
259 continue;
260 }
261
262 let count = u32::from_le_bytes(
264 payload
265 .get(0..4)
266 .ok_or_else(|| {
267 crate::ParcodeError::Format("Payload too short for count".into())
268 })?
269 .try_into()
270 .map_err(|_| crate::ParcodeError::Format("Failed to read count".into()))?,
271 ) as usize;
272
273 let offsets_start = 8 + (count * 8);
275 let data_start = offsets_start + (count * 4);
276
277 let offsets_bytes = payload
279 .get(offsets_start..data_start)
280 .ok_or_else(|| crate::ParcodeError::Format("Offsets out of bounds".into()))?;
281
282 for i in 0..count {
283 let off_bytes = offsets_bytes.get(i * 4..(i + 1) * 4).ok_or_else(|| {
284 crate::ParcodeError::Format("Offset index out of bounds".into())
285 })?;
286 let offset = u32::from_le_bytes(
287 off_bytes
288 .try_into()
289 .map_err(|_| crate::ParcodeError::Format("Failed to read offset".into()))?,
290 ) as usize;
291 let data_slice = payload.get(data_start + offset..).ok_or_else(|| {
292 crate::ParcodeError::Format("Data slice out of bounds".into())
293 })?;
294
295 let (k, v): (K, V) =
297 bincode::serde::decode_from_slice(data_slice, bincode::config::standard())
298 .map_err(|e| crate::ParcodeError::Serialization(e.to_string()))?
299 .0;
300
301 map.insert(k, v);
302 }
303 }
304 Ok(map)
305 }
306
307 pub fn get(&self, key: &K) -> Result<Option<V>> {
321 let container_payload = self.node.read_raw()?;
323 if container_payload.len() < 4 {
324 return Ok(None);
325 } let num_shards = u32::from_le_bytes(
327 container_payload
328 .get(0..4)
329 .ok_or_else(|| crate::ParcodeError::Format("Payload too short".into()))?
330 .try_into()
331 .map_err(|_| crate::ParcodeError::Format("Failed to read num_shards".into()))?,
332 );
333
334 let target_hash = crate::map::hash_key(key);
336 let shard_idx = usize::try_from(target_hash).unwrap_or(0) % (num_shards as usize);
337
338 let shard = self.node.get_child_by_index(shard_idx)?;
340 let payload = shard.read_raw()?;
341
342 if payload.len() < 8 {
343 return Ok(None);
344 } let count = u32::from_le_bytes(
347 payload
348 .get(0..4)
349 .ok_or_else(|| crate::ParcodeError::Format("Payload too short for count".into()))?
350 .try_into()
351 .map_err(|_| crate::ParcodeError::Format("Failed to read count".into()))?,
352 ) as usize;
353 let hashes_start = 8;
356 let hashes_end = hashes_start + (count * 8);
357 let offsets_start = hashes_end;
358 let data_start = offsets_start + (count * 4);
359
360 let hashes_slice = payload
362 .get(hashes_start..hashes_end)
363 .ok_or_else(|| crate::ParcodeError::Format("Hashes slice out of bounds".into()))?;
364
365 for (i, chunk) in hashes_slice.chunks_exact(8).enumerate() {
366 let h = u64::from_le_bytes(
367 chunk
368 .try_into()
369 .map_err(|_| crate::ParcodeError::Format("Failed to read hash".into()))?,
370 );
371
372 if h == target_hash {
373 let offset_bytes = payload
375 .get(offsets_start + (i * 4)..offsets_start + (i * 4) + 4)
376 .ok_or_else(|| {
377 crate::ParcodeError::Format("Offset bytes out of bounds".into())
378 })?;
379 let offset = u32::from_le_bytes(
380 offset_bytes
381 .try_into()
382 .map_err(|_| crate::ParcodeError::Format("Failed to read offset".into()))?,
383 ) as usize;
384
385 let data_slice = payload.get(data_start + offset..).ok_or_else(|| {
386 crate::ParcodeError::Format("Data slice out of bounds".into())
387 })?;
388
389 let (stored_key, stored_val): (K, V) =
392 bincode::serde::decode_from_slice(data_slice, bincode::config::standard())
393 .map_err(|e| crate::ParcodeError::Serialization(e.to_string()))?
394 .0;
395
396 if &stored_key == key {
397 return Ok(Some(stored_val));
398 }
399 }
401 }
402
403 Ok(None)
404 }
405}
406
407impl<'a, K, V> ParcodeLazyRef<'a> for HashMap<K, V>
408where
409 K: Hash + Eq + DeserializeOwned + Send + Sync + 'static,
410 V: DeserializeOwned + Send + Sync + 'static,
411{
412 type Lazy = ParcodeMapPromise<'a, K, V>;
413 fn create_lazy(node: ChunkNode<'a>) -> Result<Self::Lazy> {
414 Ok(ParcodeMapPromise::new(node))
415 }
416
417 fn read_lazy_from_stream(
418 _: &mut Cursor<&[u8]>,
419 children: &mut IntoIter<ChunkNode<'a>>,
420 ) -> Result<Self::Lazy> {
421 let child_node = children.next().ok_or_else(|| {
422 crate::ParcodeError::Format("Missing child node for HashMap field".into())
423 })?;
424 Ok(ParcodeMapPromise::new(child_node))
425 }
426}
427
428impl<'a, K, V> ParcodeMapPromise<'a, K, V>
429where
430 K: Hash + Eq + DeserializeOwned + Send + Sync + 'static,
431 V: ParcodeItem + Send + Sync + ParcodeLazyRef<'a> + 'a,
432{
433 pub fn get_lazy(&self, key: &K) -> Result<Option<V::Lazy>> {
438 let container_payload = self.node.read_raw()?;
440 if container_payload.len() < 4 {
441 return Ok(None);
442 } let num_shards = u32::from_le_bytes(
444 container_payload
445 .get(0..4)
446 .ok_or_else(|| crate::ParcodeError::Format("Payload too short".into()))?
447 .try_into()
448 .map_err(|_| crate::ParcodeError::Format("Failed to read num_shards".into()))?,
449 );
450
451 let target_hash = crate::map::hash_key(key);
453 let shard_idx = usize::try_from(target_hash).unwrap_or(0) % (num_shards as usize);
454
455 let shard = self.node.get_child_by_index(shard_idx)?;
457 let payload = shard.read_raw()?;
458
459 if payload.len() < 8 {
460 return Ok(None);
461 } let count = u32::from_le_bytes(
464 payload
465 .get(0..4)
466 .ok_or_else(|| crate::ParcodeError::Format("Payload too short for count".into()))?
467 .try_into()
468 .map_err(|_| crate::ParcodeError::Format("Failed to read count".into()))?,
469 ) as usize;
470 let hashes_start = 8;
473 let hashes_end = hashes_start + (count * 8);
474 let offsets_start = hashes_end;
475 let data_start = offsets_start + (count * 4);
476
477 let hashes_slice = payload
479 .get(hashes_start..hashes_end)
480 .ok_or_else(|| crate::ParcodeError::Format("Hashes slice out of bounds".into()))?;
481
482 let mut target_index = None;
484
485 for (i, chunk) in hashes_slice.chunks_exact(8).enumerate() {
486 let h = u64::from_le_bytes(
487 chunk
488 .try_into()
489 .map_err(|_| crate::ParcodeError::Format("Failed to read hash".into()))?,
490 );
491
492 if h == target_hash {
493 let offset_bytes = payload
495 .get(offsets_start + (i * 4)..offsets_start + (i * 4) + 4)
496 .ok_or_else(|| {
497 crate::ParcodeError::Format("Offset bytes out of bounds".into())
498 })?;
499 let offset = u32::from_le_bytes(
500 offset_bytes
501 .try_into()
502 .map_err(|_| crate::ParcodeError::Format("Failed to read offset".into()))?,
503 ) as usize;
504
505 let data_slice = payload.get(data_start + offset..).ok_or_else(|| {
506 crate::ParcodeError::Format("Data slice out of bounds".into())
507 })?;
508
509 let (stored_key, _): (K, usize) =
512 bincode::serde::decode_from_slice(data_slice, bincode::config::standard())
513 .map_err(|e| crate::ParcodeError::Serialization(e.to_string()))?;
514
515 if &stored_key == key {
516 target_index = Some(i);
517 break;
518 }
519 }
520 }
521
522 let target_index = match target_index {
523 Some(idx) => idx,
524 None => return Ok(None),
525 };
526
527 let children = shard.children()?;
532 let mut child_iter = children.into_iter();
533
534 for i in 0..target_index {
536 let offset_bytes = payload
537 .get(offsets_start + (i * 4)..offsets_start + (i * 4) + 4)
538 .ok_or_else(|| crate::ParcodeError::Format("Offset bytes out of bounds".into()))?;
539 let offset = u32::from_le_bytes(
540 offset_bytes
541 .try_into()
542 .map_err(|_| crate::ParcodeError::Format("Failed to read offset".into()))?,
543 ) as usize;
544
545 let data_slice = payload
546 .get(data_start + offset..)
547 .ok_or_else(|| crate::ParcodeError::Format("Data slice out of bounds".into()))?;
548
549 let mut cursor = Cursor::new(data_slice);
550
551 let _: K =
553 bincode::serde::decode_from_std_read(&mut cursor, bincode::config::standard())
554 .map_err(|e| crate::ParcodeError::Serialization(e.to_string()))?;
555
556 let _ = V::read_lazy_from_stream(&mut cursor, &mut child_iter)?;
558 }
559
560 let offset_bytes = payload
562 .get(offsets_start + (target_index * 4)..offsets_start + (target_index * 4) + 4)
563 .ok_or_else(|| crate::ParcodeError::Format("Offset bytes out of bounds".into()))?;
564 let offset = u32::from_le_bytes(
565 offset_bytes
566 .try_into()
567 .map_err(|_| crate::ParcodeError::Format("Failed to read offset".into()))?,
568 ) as usize;
569
570 let data_slice = payload
571 .get(data_start + offset..)
572 .ok_or_else(|| crate::ParcodeError::Format("Data slice out of bounds".into()))?;
573
574 let mut cursor = Cursor::new(data_slice);
575
576 let _: K = bincode::serde::decode_from_std_read(&mut cursor, bincode::config::standard())
578 .map_err(|e| crate::ParcodeError::Serialization(e.to_string()))?;
579
580 let lazy_val = V::read_lazy_from_stream(&mut cursor, &mut child_iter)?;
582
583 Ok(Some(lazy_val))
584 }
585}
586
/// Streaming iterator that yields lazy handles for each item of a sharded
/// collection, loading one shard payload at a time.
#[derive(Debug)]
pub struct ParcodeLazyIterator<'a, T: ParcodeLazyRef<'a>> {
    // Remaining shard nodes, consumed front to back.
    shards: std::vec::IntoIter<ChunkNode<'a>>,

    // Raw payload of the shard currently being walked (None before the first
    // shard is loaded).
    current_payload: Option<Cow<'a, [u8]>>,
    // Byte position inside `current_payload` where the next item starts.
    current_pos: u64,
    // Child nodes of the current shard, consumed in lockstep with the items.
    current_children: std::vec::IntoIter<ChunkNode<'a>>,
    // Items not yet yielded from the current shard.
    items_left_in_shard: u64,

    // Totals across all shards, used for size_hint/ExactSizeIterator.
    total_items: usize,
    items_yielded: usize,

    _marker: PhantomData<T>,
}
610
611impl<'a, T: ParcodeLazyRef<'a>> ParcodeLazyIterator<'a, T> {
612 pub fn new(node: ChunkNode<'a>) -> Result<Self> {
613 let total_items = usize::try_from(node.len()).unwrap_or(usize::MAX);
614 let shards = node.children()?.into_iter();
615
616 Ok(Self {
617 shards,
618 current_payload: None,
619 current_pos: 0,
620 current_children: Vec::new().into_iter(),
621 items_left_in_shard: 0,
622 total_items,
623 items_yielded: 0,
624 _marker: PhantomData,
625 })
626 }
627
628 fn ensure_shard_loaded(&mut self) -> Result<bool> {
630 if self.items_left_in_shard > 0 {
631 return Ok(true);
632 }
633
634 if let Some(shard_node) = self.shards.next() {
636 let payload = shard_node.read_raw()?;
638
639 let children = shard_node.children()?;
641
642 let mut cursor = Cursor::new(payload.as_ref());
645 let len: u64 =
646 bincode::serde::decode_from_std_read(&mut cursor, bincode::config::standard())
647 .map_err(|e| crate::ParcodeError::Serialization(e.to_string()))?;
648
649 self.current_pos = cursor.position();
650 self.items_left_in_shard = len;
651 self.current_payload = Some(payload);
652 self.current_children = children.into_iter();
653
654 Ok(true)
655 } else {
656 Ok(false) }
658 }
659}
660
661impl<'a, T: ParcodeLazyRef<'a>> Iterator for ParcodeLazyIterator<'a, T> {
662 type Item = Result<T::Lazy>;
663
664 fn next(&mut self) -> Option<Self::Item> {
665 match self.ensure_shard_loaded() {
667 Ok(true) => {}
668 Ok(false) => return None, Err(e) => return Some(Err(e)),
670 }
671
672 let payload = self
674 .current_payload
675 .as_ref()
676 .expect("ensure_shard_loaded guaranteed payload");
677 let mut cursor = Cursor::new(payload.as_ref());
678 cursor.set_position(self.current_pos);
679
680 let result = T::read_lazy_from_stream(&mut cursor, &mut self.current_children);
683
684 self.current_pos = cursor.position();
686 self.items_left_in_shard -= 1;
687 self.items_yielded += 1;
688
689 Some(result)
690 }
691
692 fn size_hint(&self) -> (usize, Option<usize>) {
693 let remaining = self.total_items - self.items_yielded;
694 (remaining, Some(remaining))
695 }
696}
697
698impl<'a, T: ParcodeLazyRef<'a>> ExactSizeIterator for ParcodeLazyIterator<'a, T> {
699 fn len(&self) -> usize {
700 self.total_items - self.items_yielded
701 }
702}
703
/// Implements [`ParcodeLazyRef`] for leaf types whose lazy form is a plain
/// [`ParcodePromise`] over a dedicated child chunk: the value is never
/// inlined in the parent payload, so reading it lazily just takes the next
/// child node and leaves the byte cursor untouched.
macro_rules! impl_lazy_primitive {
    ($($t:ty),*) => {
        $(
            impl<'a> ParcodeLazyRef<'a> for $t {
                type Lazy = ParcodePromise<'a, $t>;
                fn create_lazy(node: ChunkNode<'a>) -> Result<Self::Lazy> {
                    Ok(ParcodePromise::new(node))
                }

                fn read_lazy_from_stream(
                    _: &mut Cursor<&[u8]>,
                    children: &mut IntoIter<ChunkNode<'a>>,
                ) -> Result<Self::Lazy> {
                    let child_node = children.next().ok_or_else(|| {
                        crate::ParcodeError::Format("Missing child node for chunkable primitive field".into())
                    })?;
                    Ok(ParcodePromise::new(child_node))
                }
            }
        )*
    }
}

impl_lazy_primitive!(u8, u16, u32, u64, i8, i16, i32, i64, f32, f64, bool, String);
730
731impl<'a, T: ParcodeItem + Send + Sync + 'static> ParcodeLazyRef<'a> for Vec<T> {
734 type Lazy = ParcodeCollectionPromise<'a, T>;
735 fn create_lazy(node: ChunkNode<'a>) -> Result<Self::Lazy> {
736 Ok(ParcodeCollectionPromise::new(node))
737 }
738
739 fn read_lazy_from_stream(
740 _: &mut Cursor<&[u8]>,
741 children: &mut IntoIter<ChunkNode<'a>>,
742 ) -> Result<Self::Lazy> {
743 let child_node = children.next().ok_or_else(|| {
744 crate::ParcodeError::Format("Missing child node for Vec field".into())
745 })?;
746
747 Ok(ParcodeCollectionPromise::new(child_node))
748 }
749}