1use ipfrs_core::error::{Error, Result};
31use ipfrs_core::{Block, Cid};
32use ipfrs_storage::traits::BlockStore;
33use serde::{Deserialize, Serialize};
34use std::collections::{HashMap, HashSet, VecDeque};
35use std::sync::Arc;
36use tokio::sync::RwLock;
37
/// IPLD-style selector describing which parts of a DAG a traversal visits.
///
/// Serialized as JSON with a lowercase `"type"` tag, e.g. `{"type":"all"}`
/// or `{"type":"recursivedepth","max_depth":5}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
#[derive(Default)]
pub enum Selector {
    /// Select every node (the default).
    #[default]
    All,
    /// Select only the named fields of a node.
    Fields { fields: Vec<String> },
    /// Recursively select children down to `max_depth` levels.
    RecursiveDepth { max_depth: usize },
    /// Recursively select all reachable nodes with no depth limit.
    RecursiveAll,
    /// Select the child at a specific index.
    Index { index: usize },
    /// Apply a list of selectors in order.
    Sequence { selectors: Vec<Selector> },
    /// Marker selector matching the current node.
    Matcher,
}
61
62impl Selector {
63 pub fn from_json(json: &str) -> Result<Self> {
65 serde_json::from_str(json)
66 .map_err(|e| Error::InvalidInput(format!("Failed to parse selector: {}", e)))
67 }
68
69 pub fn validate(&self) -> Result<()> {
71 match self {
72 Selector::RecursiveDepth { max_depth } => {
73 if *max_depth == 0 {
74 return Err(Error::InvalidInput(
75 "max_depth must be greater than 0".to_string(),
76 ));
77 }
78 }
79 Selector::Sequence { selectors } => {
80 for sel in selectors {
81 sel.validate()?;
82 }
83 }
84 _ => {}
85 }
86 Ok(())
87 }
88
89 pub fn matches_all(&self) -> bool {
91 matches!(self, Selector::All | Selector::RecursiveAll)
92 }
93}
94
/// Order in which queued DAG nodes are visited.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TraversalMode {
    /// Visit level by level (pops from the front of the queue).
    BreadthFirst,
    /// Visit deepest-first (pops from the back of the queue).
    DepthFirst,
}
103
104#[derive(Debug, Clone)]
106pub struct TraversalState {
107 pub root: Cid,
109 pub visited: HashSet<Cid>,
111 pub queue: VecDeque<(Cid, usize)>, pub current_depth: usize,
115 pub max_depth: Option<usize>,
117 pub blocks_fetched: usize,
119 pub bytes_fetched: u64,
121}
122
123impl TraversalState {
124 pub fn new(root: Cid, max_depth: Option<usize>) -> Self {
126 let mut queue = VecDeque::new();
127 queue.push_back((root, 0));
128
129 Self {
130 root,
131 visited: HashSet::new(),
132 queue,
133 current_depth: 0,
134 max_depth,
135 blocks_fetched: 0,
136 bytes_fetched: 0,
137 }
138 }
139
140 pub fn is_complete(&self) -> bool {
142 self.queue.is_empty()
143 }
144
145 pub fn next(&mut self, mode: TraversalMode) -> Option<(Cid, usize)> {
147 match mode {
148 TraversalMode::BreadthFirst => self.queue.pop_front(),
149 TraversalMode::DepthFirst => self.queue.pop_back(),
150 }
151 }
152
153 pub fn enqueue(&mut self, cid: Cid, depth: usize) {
155 if let Some(max) = self.max_depth {
156 if depth > max {
157 return;
158 }
159 }
160
161 if !self.visited.contains(&cid) {
162 self.queue.push_back((cid, depth));
163 }
164 }
165
166 pub fn mark_visited(&mut self, cid: Cid, size: u64) {
168 self.visited.insert(cid);
169 self.blocks_fetched += 1;
170 self.bytes_fetched += size;
171 }
172
173 pub fn checkpoint(&self) -> TraversalCheckpoint {
175 TraversalCheckpoint {
176 root: self.root,
177 visited: self.visited.clone(),
178 queue: self.queue.clone(),
179 max_depth: self.max_depth,
180 blocks_fetched: self.blocks_fetched,
181 bytes_fetched: self.bytes_fetched,
182 }
183 }
184
185 pub fn from_checkpoint(checkpoint: TraversalCheckpoint) -> Self {
187 Self {
188 root: checkpoint.root,
189 visited: checkpoint.visited,
190 queue: checkpoint.queue,
191 current_depth: 0,
192 max_depth: checkpoint.max_depth,
193 blocks_fetched: checkpoint.blocks_fetched,
194 bytes_fetched: checkpoint.bytes_fetched,
195 }
196 }
197}
198
/// Serializable snapshot of a `TraversalState`, used to pause and resume.
#[derive(Debug, Clone)]
pub struct TraversalCheckpoint {
    /// Root CID of the traversal.
    pub root: Cid,
    /// CIDs already processed at snapshot time.
    pub visited: HashSet<Cid>,
    /// Pending `(cid, depth)` entries at snapshot time.
    pub queue: VecDeque<(Cid, usize)>,
    /// Depth limit, if any.
    pub max_depth: Option<usize>,
    /// Blocks fetched before the snapshot.
    pub blocks_fetched: usize,
    /// Bytes fetched before the snapshot.
    pub bytes_fetched: u64,
}
215
216impl TraversalCheckpoint {
217 pub fn to_json(&self) -> Result<String> {
219 #[derive(Serialize)]
220 struct SerializableCheckpoint {
221 root: String,
222 visited: Vec<String>,
223 queue: Vec<(String, usize)>,
224 max_depth: Option<usize>,
225 blocks_fetched: usize,
226 bytes_fetched: u64,
227 }
228
229 let serializable = SerializableCheckpoint {
230 root: self.root.to_string(),
231 visited: self.visited.iter().map(|c| c.to_string()).collect(),
232 queue: self
233 .queue
234 .iter()
235 .map(|(c, d)| (c.to_string(), *d))
236 .collect(),
237 max_depth: self.max_depth,
238 blocks_fetched: self.blocks_fetched,
239 bytes_fetched: self.bytes_fetched,
240 };
241
242 serde_json::to_string(&serializable)
243 .map_err(|e| Error::Internal(format!("Failed to serialize checkpoint: {}", e)))
244 }
245
246 pub fn from_json(json: &str) -> Result<Self> {
248 #[derive(Deserialize)]
249 struct SerializableCheckpoint {
250 root: String,
251 visited: Vec<String>,
252 queue: Vec<(String, usize)>,
253 max_depth: Option<usize>,
254 blocks_fetched: usize,
255 bytes_fetched: u64,
256 }
257
258 let serializable: SerializableCheckpoint = serde_json::from_str(json)
259 .map_err(|e| Error::Internal(format!("Failed to deserialize checkpoint: {}", e)))?;
260
261 let root: Cid = serializable
262 .root
263 .parse()
264 .map_err(|e| Error::InvalidInput(format!("Invalid root CID: {}", e)))?;
265
266 let visited: Result<HashSet<Cid>> = serializable
267 .visited
268 .iter()
269 .map(|s| {
270 s.parse()
271 .map_err(|e| Error::InvalidInput(format!("Invalid CID: {}", e)))
272 })
273 .collect();
274
275 let queue: Result<VecDeque<(Cid, usize)>> = serializable
276 .queue
277 .iter()
278 .map(|(s, d)| {
279 s.parse()
280 .map(|c| (c, *d))
281 .map_err(|e| Error::InvalidInput(format!("Invalid CID: {}", e)))
282 })
283 .collect();
284
285 Ok(Self {
286 root,
287 visited: visited?,
288 queue: queue?,
289 max_depth: serializable.max_depth,
290 blocks_fetched: serializable.blocks_fetched,
291 bytes_fetched: serializable.bytes_fetched,
292 })
293 }
294}
295
/// Stateful, resumable traversal of a DAG stored in a `BlockStore`.
pub struct DagTraversal<S: BlockStore> {
    /// Backing store blocks are fetched from.
    store: Arc<S>,
    /// Queue discipline (breadth-first or depth-first).
    mode: TraversalMode,
    // Retained for future link filtering; extract_links does not consult it yet.
    #[allow(dead_code)]
    selector: Selector,
    /// Shared mutable traversal bookkeeping.
    state: Arc<RwLock<TraversalState>>,
}
308
309impl<S: BlockStore> DagTraversal<S> {
310 pub fn new(store: Arc<S>, root: Cid, selector: Selector, mode: TraversalMode) -> Result<Self> {
312 selector.validate()?;
313
314 let max_depth = match &selector {
315 Selector::RecursiveDepth { max_depth } => Some(*max_depth),
316 _ => None,
317 };
318
319 let state = TraversalState::new(root, max_depth);
320
321 Ok(Self {
322 store,
323 mode,
324 selector,
325 state: Arc::new(RwLock::new(state)),
326 })
327 }
328
329 pub fn from_checkpoint(
331 store: Arc<S>,
332 checkpoint: TraversalCheckpoint,
333 selector: Selector,
334 mode: TraversalMode,
335 ) -> Result<Self> {
336 selector.validate()?;
337 let state = TraversalState::from_checkpoint(checkpoint);
338
339 Ok(Self {
340 store,
341 mode,
342 selector,
343 state: Arc::new(RwLock::new(state)),
344 })
345 }
346
347 pub async fn next_block(&self) -> Result<Option<Block>> {
349 let mut state = self.state.write().await;
350
351 let (cid, depth) = match state.next(self.mode) {
353 Some(item) => item,
354 None => return Ok(None),
355 };
356
357 let block = match self.store.get(&cid).await? {
359 Some(b) => b,
360 None => return Err(Error::NotFound(format!("Block not found for CID: {}", cid))),
361 };
362
363 state.mark_visited(cid, block.data().len() as u64);
365 state.current_depth = depth;
366
367 if let Ok(links) = self.extract_links(&block) {
369 for link_cid in links {
370 state.enqueue(link_cid, depth + 1);
371 }
372 }
373
374 Ok(Some(block))
375 }
376
377 fn extract_links(&self, _block: &Block) -> Result<Vec<Cid>> {
379 Ok(Vec::new())
389 }
390
391 pub async fn is_complete(&self) -> bool {
393 self.state.read().await.is_complete()
394 }
395
396 pub async fn stats(&self) -> TraversalStats {
398 let state = self.state.read().await;
399 TraversalStats {
400 blocks_fetched: state.blocks_fetched,
401 bytes_fetched: state.bytes_fetched,
402 blocks_remaining: state.queue.len(),
403 current_depth: state.current_depth,
404 }
405 }
406
407 pub async fn checkpoint(&self) -> TraversalCheckpoint {
409 self.state.read().await.checkpoint()
410 }
411
412 pub async fn collect_all(&self) -> Result<Vec<Block>> {
414 let mut blocks = Vec::new();
415
416 while let Some(block) = self.next_block().await? {
417 blocks.push(block);
418 }
419
420 Ok(blocks)
421 }
422}
423
/// Progress counters reported by `DagTraversal::stats`.
#[derive(Debug, Clone)]
pub struct TraversalStats {
    /// Blocks fetched so far.
    pub blocks_fetched: usize,
    /// Bytes fetched so far.
    pub bytes_fetched: u64,
    /// Entries still queued for fetching.
    pub blocks_remaining: usize,
    /// Depth of the most recently fetched block.
    pub current_depth: usize,
}
436
/// Coordinator managing multiple concurrent DAG traversals, keyed by root CID.
pub struct GraphSync<S: BlockStore> {
    /// Store shared by all traversals.
    store: Arc<S>,
    /// Active traversals indexed by their root CID.
    traversals: Arc<RwLock<HashMap<Cid, Arc<DagTraversal<S>>>>>,
}
444
445impl<S: BlockStore> GraphSync<S> {
446 pub fn new(store: Arc<S>) -> Result<Self> {
448 Ok(Self {
449 store,
450 traversals: Arc::new(RwLock::new(HashMap::new())),
451 })
452 }
453
454 pub async fn start_traversal(
456 &self,
457 root: Cid,
458 selector: Selector,
459 mode: TraversalMode,
460 ) -> Result<Arc<DagTraversal<S>>> {
461 let traversal = Arc::new(DagTraversal::new(self.store.clone(), root, selector, mode)?);
462
463 let mut traversals = self.traversals.write().await;
464 traversals.insert(root, traversal.clone());
465
466 Ok(traversal)
467 }
468
469 pub async fn resume_traversal(
471 &self,
472 checkpoint: TraversalCheckpoint,
473 selector: Selector,
474 mode: TraversalMode,
475 ) -> Result<Arc<DagTraversal<S>>> {
476 let root = checkpoint.root;
477 let traversal = Arc::new(DagTraversal::from_checkpoint(
478 self.store.clone(),
479 checkpoint,
480 selector,
481 mode,
482 )?);
483
484 let mut traversals = self.traversals.write().await;
485 traversals.insert(root, traversal.clone());
486
487 Ok(traversal)
488 }
489
490 pub async fn get_traversal(&self, root: &Cid) -> Option<Arc<DagTraversal<S>>> {
492 self.traversals.read().await.get(root).cloned()
493 }
494
495 pub async fn remove_traversal(&self, root: &Cid) {
497 self.traversals.write().await.remove(root);
498 }
499
500 pub async fn active_count(&self) -> usize {
502 self.traversals.read().await.len()
503 }
504}
505
/// A serialized tensor gradient exchanged between training peers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GradientMessage {
    /// Identifier of the layer/tensor this gradient belongs to.
    pub id: String,
    /// Raw gradient payload bytes.
    pub data: Vec<u8>,
    /// Tensor dimensions, e.g. `[rows, cols]`.
    pub shape: Vec<usize>,
    /// Element dtype label, e.g. `"f32"` — informational only in this module.
    pub dtype: String,
    /// FNV-1a checksum of `data` for integrity checking.
    pub checksum: u64,
    /// Free-form metadata (e.g. `"samples"` used for weighted averaging).
    pub metadata: HashMap<String, String>,
}
522
523impl GradientMessage {
524 pub fn new(
526 id: impl Into<String>,
527 data: Vec<u8>,
528 shape: Vec<usize>,
529 dtype: impl Into<String>,
530 ) -> Self {
531 let checksum = Self::compute_checksum(&data);
532 Self {
533 id: id.into(),
534 data,
535 shape,
536 dtype: dtype.into(),
537 checksum,
538 metadata: HashMap::new(),
539 }
540 }
541
542 fn compute_checksum(data: &[u8]) -> u64 {
544 let mut hash: u64 = 0xcbf29ce484222325;
546 for &byte in data {
547 hash ^= byte as u64;
548 hash = hash.wrapping_mul(0x100000001b3);
549 }
550 hash
551 }
552
553 pub fn verify(&self) -> bool {
555 Self::compute_checksum(&self.data) == self.checksum
556 }
557
558 pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
560 self.metadata.insert(key.into(), value.into());
561 self
562 }
563
564 pub fn num_elements(&self) -> usize {
566 self.shape.iter().product()
567 }
568
569 pub fn size_bytes(&self) -> usize {
571 self.data.len()
572 }
573}
574
/// How gradients from multiple contributors are combined.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AggregationStrategy {
    /// Unweighted element-wise mean.
    Average,
    /// Mean weighted by each contributor's `"samples"` metadata entry.
    WeightedAverage,
    /// Element-wise median (not yet implemented).
    Median,
    /// Federated averaging; currently handled identically to `Average`.
    FederatedAvg,
}
587
/// Collects per-layer gradients and combines them once enough arrive.
pub struct GradientAggregator {
    /// Combination strategy applied in `aggregate`.
    strategy: AggregationStrategy,
    /// Pending gradients grouped by layer id.
    gradients: Arc<RwLock<HashMap<String, Vec<GradientMessage>>>>,
    /// Contributor count required before a layer is considered ready.
    expected_contributors: usize,
    /// When true, `add_gradient` rejects payloads with bad checksums.
    verify_checksums: bool,
}
599
600impl GradientAggregator {
601 pub fn new(strategy: AggregationStrategy, expected_contributors: usize) -> Self {
603 Self {
604 strategy,
605 gradients: Arc::new(RwLock::new(HashMap::new())),
606 expected_contributors,
607 verify_checksums: true,
608 }
609 }
610
611 pub async fn add_gradient(&self, gradient: GradientMessage) -> Result<()> {
613 if self.verify_checksums && !gradient.verify() {
615 return Err(Error::InvalidInput(format!(
616 "Gradient checksum verification failed for {}",
617 gradient.id
618 )));
619 }
620
621 if gradient.num_elements() == 0 {
623 return Err(Error::InvalidInput(
624 "Gradient has zero elements".to_string(),
625 ));
626 }
627
628 let mut gradients = self.gradients.write().await;
629 gradients
630 .entry(gradient.id.clone())
631 .or_insert_with(Vec::new)
632 .push(gradient);
633
634 Ok(())
635 }
636
637 pub async fn is_ready(&self, layer_id: &str) -> bool {
639 let gradients = self.gradients.read().await;
640 gradients
641 .get(layer_id)
642 .map(|g| g.len() >= self.expected_contributors)
643 .unwrap_or(false)
644 }
645
646 pub async fn aggregate(&self, layer_id: &str) -> Result<GradientMessage> {
648 let gradients = self.gradients.read().await;
649 let layer_gradients = gradients
650 .get(layer_id)
651 .ok_or_else(|| Error::NotFound(format!("No gradients for layer: {}", layer_id)))?;
652
653 if layer_gradients.is_empty() {
654 return Err(Error::InvalidInput("No gradients to aggregate".to_string()));
655 }
656
657 let first_shape = &layer_gradients[0].shape;
659 for grad in layer_gradients.iter().skip(1) {
660 if &grad.shape != first_shape {
661 return Err(Error::InvalidInput("Gradient shape mismatch".to_string()));
662 }
663 }
664
665 match self.strategy {
666 AggregationStrategy::Average | AggregationStrategy::FederatedAvg => {
667 self.aggregate_average(layer_id, layer_gradients)
668 }
669 AggregationStrategy::WeightedAverage => {
670 self.aggregate_weighted(layer_id, layer_gradients)
671 }
672 AggregationStrategy::Median => self.aggregate_median(layer_id, layer_gradients),
673 }
674 }
675
676 fn aggregate_average(
678 &self,
679 layer_id: &str,
680 gradients: &[GradientMessage],
681 ) -> Result<GradientMessage> {
682 let n = gradients.len();
683 let size = gradients[0].data.len();
684
685 let mut sum = vec![0u8; size];
687 for grad in gradients {
688 for (i, &byte) in grad.data.iter().enumerate() {
689 sum[i] = sum[i].saturating_add(byte / n as u8);
690 }
691 }
692
693 Ok(GradientMessage::new(
694 layer_id,
695 sum,
696 gradients[0].shape.clone(),
697 gradients[0].dtype.clone(),
698 ))
699 }
700
701 fn aggregate_weighted(
703 &self,
704 layer_id: &str,
705 gradients: &[GradientMessage],
706 ) -> Result<GradientMessage> {
707 let weights: Vec<f32> = gradients
709 .iter()
710 .map(|g| {
711 g.metadata
712 .get("samples")
713 .and_then(|s| s.parse::<f32>().ok())
714 .unwrap_or(1.0)
715 })
716 .collect();
717
718 let total_weight: f32 = weights.iter().sum();
719 let size = gradients[0].data.len();
720
721 let mut weighted_sum = vec![0u8; size];
723 for (grad, &weight) in gradients.iter().zip(weights.iter()) {
724 let normalized_weight = weight / total_weight;
725 for (i, &byte) in grad.data.iter().enumerate() {
726 weighted_sum[i] =
727 weighted_sum[i].saturating_add((byte as f32 * normalized_weight) as u8);
728 }
729 }
730
731 Ok(GradientMessage::new(
732 layer_id,
733 weighted_sum,
734 gradients[0].shape.clone(),
735 gradients[0].dtype.clone(),
736 ))
737 }
738
739 fn aggregate_median(
741 &self,
742 _layer_id: &str,
743 _gradients: &[GradientMessage],
744 ) -> Result<GradientMessage> {
745 Err(Error::NotImplemented(
748 "Median aggregation not yet implemented".to_string(),
749 ))
750 }
751
752 pub async fn clear(&self, layer_id: &str) {
754 let mut gradients = self.gradients.write().await;
755 gradients.remove(layer_id);
756 }
757
758 pub async fn stats(&self) -> GradientAggregatorStats {
760 let gradients = self.gradients.read().await;
761 let total_gradients: usize = gradients.values().map(|v| v.len()).sum();
762 let layers_count = gradients.len();
763
764 GradientAggregatorStats {
765 total_gradients,
766 layers_count,
767 expected_contributors: self.expected_contributors,
768 }
769 }
770}
771
/// Summary counters reported by `GradientAggregator::stats`.
#[derive(Debug, Clone)]
pub struct GradientAggregatorStats {
    /// Total gradients buffered across all layers.
    pub total_gradients: usize,
    /// Number of distinct layers with buffered gradients.
    pub layers_count: usize,
    /// Configured contributor threshold.
    pub expected_contributors: usize,
}
782
/// Bidirectional gradient channel: queues outgoing messages and feeds
/// incoming ones into a shared aggregator.
pub struct GradientStream {
    /// Destination for received gradients.
    aggregator: Arc<GradientAggregator>,
    /// Bounded FIFO of gradients waiting to be sent.
    outgoing: Arc<RwLock<VecDeque<GradientMessage>>>,
    /// Capacity limit for `outgoing`.
    max_queue_size: usize,
}
792
793impl GradientStream {
794 pub fn new(aggregator: Arc<GradientAggregator>, max_queue_size: usize) -> Self {
796 Self {
797 aggregator,
798 outgoing: Arc::new(RwLock::new(VecDeque::new())),
799 max_queue_size,
800 }
801 }
802
803 pub async fn push_gradient(&self, gradient: GradientMessage) -> Result<()> {
805 let mut outgoing = self.outgoing.write().await;
806 if outgoing.len() >= self.max_queue_size {
807 return Err(Error::Internal("Gradient queue is full".to_string()));
808 }
809 outgoing.push_back(gradient);
810 Ok(())
811 }
812
813 pub async fn pop_gradient(&self) -> Option<GradientMessage> {
815 self.outgoing.write().await.pop_front()
816 }
817
818 pub async fn receive_gradient(&self, gradient: GradientMessage) -> Result<()> {
820 self.aggregator.add_gradient(gradient).await
821 }
822
823 pub async fn queue_size(&self) -> usize {
825 self.outgoing.read().await.len()
826 }
827}
828
#[cfg(test)]
mod tests {
    use super::*;

    // Selector JSON parsing: serde lowercases variant names for the tag.
    #[test]
    fn test_selector_parse() {
        let json = r#"{"type":"all"}"#;
        let selector = Selector::from_json(json).unwrap();
        assert!(selector.matches_all());

        let json2 = r#"{"type":"recursivedepth","max_depth":5}"#;
        let selector2 = Selector::from_json(json2).unwrap();
        match selector2 {
            Selector::RecursiveDepth { max_depth } => assert_eq!(max_depth, 5),
            _ => panic!("Wrong selector type"),
        }
    }

    // A zero max_depth is rejected; positive depths validate cleanly.
    #[test]
    fn test_selector_validate() {
        let selector = Selector::RecursiveDepth { max_depth: 0 };
        assert!(selector.validate().is_err());

        let selector2 = Selector::RecursiveDepth { max_depth: 5 };
        assert!(selector2.validate().is_ok());
    }

    // Basic BFS lifecycle: root is queued at depth 0, counters advance on
    // visit, and the traversal completes once the queue drains.
    #[test]
    fn test_traversal_state() {
        let cid: Cid = "bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi"
            .parse()
            .unwrap();

        let mut state = TraversalState::new(cid, Some(3));
        assert!(!state.is_complete());

        let (root_cid, depth) = state.next(TraversalMode::BreadthFirst).unwrap();
        assert_eq!(root_cid, cid);
        assert_eq!(depth, 0);

        state.mark_visited(cid, 1024);
        assert_eq!(state.blocks_fetched, 1);
        assert_eq!(state.bytes_fetched, 1024);

        assert!(state.is_complete());
    }

    // Checkpoint round-trip preserves root and fetch counters.
    #[test]
    fn test_checkpoint() {
        let cid: Cid = "bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi"
            .parse()
            .unwrap();

        let mut state = TraversalState::new(cid, Some(3));
        state.mark_visited(cid, 1024);

        let checkpoint = state.checkpoint();
        assert_eq!(checkpoint.root, cid);
        assert_eq!(checkpoint.blocks_fetched, 1);
        assert_eq!(checkpoint.bytes_fetched, 1024);

        let restored = TraversalState::from_checkpoint(checkpoint);
        assert_eq!(restored.blocks_fetched, 1);
        assert_eq!(restored.bytes_fetched, 1024);
    }

    // Constructor populates all fields and the checksum verifies.
    #[test]
    fn test_gradient_message() {
        let data = vec![1, 2, 3, 4, 5];
        let shape = vec![5];
        let gradient = GradientMessage::new("layer1", data.clone(), shape.clone(), "f32");

        assert_eq!(gradient.id, "layer1");
        assert_eq!(gradient.data, data);
        assert_eq!(gradient.shape, shape);
        assert_eq!(gradient.dtype, "f32");
        assert!(gradient.verify());
        assert_eq!(gradient.num_elements(), 5);
        assert_eq!(gradient.size_bytes(), 5);
    }

    // Tampering with the payload must invalidate the stored checksum.
    #[test]
    fn test_gradient_checksum() {
        let data = vec![1, 2, 3, 4, 5];
        let mut gradient = GradientMessage::new("layer1", data, vec![5], "f32");

        assert!(gradient.verify());

        gradient.data[0] = 99;

        assert!(!gradient.verify());
    }

    // Two contributors reach the readiness threshold; aggregation preserves
    // the layer id and shape.
    #[tokio::test]
    async fn test_gradient_aggregator() {
        let aggregator = GradientAggregator::new(AggregationStrategy::Average, 2);

        let grad1 = GradientMessage::new("layer1", vec![10, 20, 30], vec![3], "f32");
        let grad2 = GradientMessage::new("layer1", vec![20, 30, 40], vec![3], "f32");

        aggregator.add_gradient(grad1).await.unwrap();
        aggregator.add_gradient(grad2).await.unwrap();

        assert!(aggregator.is_ready("layer1").await);

        let aggregated = aggregator.aggregate("layer1").await.unwrap();
        assert_eq!(aggregated.shape, vec![3]);
        assert_eq!(aggregated.id, "layer1");
    }

    // Push/pop cycle through the outgoing queue, then deliver to the
    // aggregator via receive_gradient.
    #[tokio::test]
    async fn test_gradient_stream() {
        let aggregator = Arc::new(GradientAggregator::new(AggregationStrategy::Average, 1));
        let stream = GradientStream::new(aggregator, 10);

        let grad = GradientMessage::new("layer1", vec![1, 2, 3], vec![3], "f32");

        stream.push_gradient(grad.clone()).await.unwrap();
        assert_eq!(stream.queue_size().await, 1);

        let popped = stream.pop_gradient().await.unwrap();
        assert_eq!(popped.id, "layer1");
        assert_eq!(stream.queue_size().await, 0);

        stream.receive_gradient(grad).await.unwrap();
    }
}