1use crate::{
5 crypto::{
6 block::{Algorithm, BlockPayload, EncryptedBlock, Header, BLOCK_MULTICODEC},
7 secret::Secret,
8 },
9 AlgorithmError, BlockStorageContentMapping, ExtendedBlock, ExtendedBlockStorage, StorageError,
10};
11use anyhow::anyhow;
12use async_trait::async_trait;
13use cid::Cid;
14use co_primitives::{
15 from_cbor, AnyBlockStorage, Block, BlockLinks, BlockStat, BlockStorage, BlockStorageCloneSettings,
16 CloneWithBlockStorageSettings, DefaultNodeSerializer, KnownMultiCodec, Link, MappedCid, MultiCodec, Node,
17 NodeBuilder, NodeBuilderError, NodeSerializer,
18};
19use futures::{
20 stream::{self, FuturesOrdered},
21 StreamExt, TryStreamExt,
22};
23use serde::Serialize;
24use std::{
25 collections::{BTreeMap, BTreeSet},
26 sync::{Arc, RwLock},
27};
28
/// Block storage decorator that encrypts blocks before handing them to the wrapped storage `S`.
///
/// Callers address blocks by the CID of the unencrypted block; `mapping` translates those CIDs to
/// the CIDs of the encrypted blocks that are actually stored in `next`.
#[derive(Debug, Clone)]
pub struct EncryptedBlockStorage<S> {
    /// Secret used to encrypt and decrypt blocks.
    key: Secret,
    /// Algorithm used when encrypting new blocks.
    algorithm: Algorithm,
    /// Wrapped storage that holds the encrypted blocks.
    next: S,
    /// Mapping from unencrypted CIDs (keys) to encrypted CIDs (values).
    mapping: EncryptedBlockStorageMapping,
    /// Link extraction used by `set_block` to find the CIDs a block refers to.
    links: BlockLinks,
    /// Policy applied by `set_block` to links that have no known encrypted CID.
    reference_mode: EncryptionReferenceMode,
    /// When `true`, `get` called with an encrypted-block CID decrypts that block directly.
    transform: bool,
}
39impl<S> EncryptedBlockStorage<S>
40where
41 S: AnyBlockStorage,
42{
43 pub fn new(next: S, key: Secret, algorithm: Algorithm, mapping: EncryptedBlockStorageMapping) -> Self {
44 Self {
45 algorithm,
46 key,
47 mapping,
48 next,
49 links: Default::default(),
50 reference_mode: Default::default(),
51 transform: false,
52 }
53 }
54
55 pub fn with_encryption_reference_mode(mut self, mode: EncryptionReferenceMode) -> Self {
56 self.reference_mode = mode;
57 self
58 }
59
60 pub fn storage(&self) -> &S {
62 &self.next
63 }
64
65 pub fn set_storage(&mut self, next: S) {
67 self.next = next;
68 }
69
70 pub async fn insert_mappings(&self, mappings: impl IntoIterator<Item = MappedCid>) {
72 self.mapping.extend(mappings).await;
73 }
74
75 pub async fn load_mapping(&self, map: &Cid) -> Result<(), StorageError> {
78 self.mapping.load_mapping(self, map).await?;
79 Ok(())
80 }
81
82 pub async fn flush_mapping(&self) -> Result<Option<Cid>, StorageError> {
86 let node_serializer = EncryptedNodeSerializer { algorithm: self.algorithm, key: self.key.clone() };
88
89 let (root, blocks) = self
91 .mapping
92 .to_blocks(node_serializer, WriteOptions::new(self.max_block_size()))
93 .await?;
94
95 for block in blocks {
97 self.next.set(block).await?;
98 }
100
101 #[cfg(debug_assertions)]
103 tracing::trace!(?root, "storage-flush-mapping");
104
105 Ok(root)
107 }
108
109 pub async fn clear_mapping(&self, keep: impl IntoIterator<Item = Cid>) {
111 let mapping = self.mapping.get_mapping(keep).await;
112
113 self.mapping.clear().await;
115
116 self.mapping
118 .extend(
119 mapping
120 .into_iter()
121 .filter_map(|(key, value)| value.map(|value| MappedCid(key, value))),
122 )
123 .await;
124 }
125
126 pub async fn regenerate_mapping(&mut self, cids: impl Iterator<Item = Cid>) -> Result<Option<Cid>, StorageError> {
128 self.mapping
129 .set_mapping(
130 BlockMapping::from_cids(self, cids)
131 .await
132 .map_err(|e| StorageError::Internal(e.into()))?,
133 )
134 .await;
135 self.flush_mapping().await
136 }
137
138 #[deprecated]
141 pub async fn insert_mapping(&self, encrypted: &Cid, plain: Option<&Cid>) -> Result<bool, StorageError> {
142 if MultiCodec::is(encrypted, KnownMultiCodec::CoEncryptedBlock) {
143 let plain = match plain {
144 Some(plain) => *plain,
145 None => *self.get_unencrypted(encrypted).await?.cid(),
146 };
147 let old = self.mapping.insert(plain, *encrypted).await;
148 Ok(old.is_none() || old.as_ref() != Some(encrypted))
149 } else {
150 Ok(false)
151 }
152 }
153
154 pub async fn get_unencrypted(&self, cid: &Cid) -> Result<Block, StorageError> {
156 Ok(if MultiCodec::is(cid, KnownMultiCodec::CoEncryptedBlock) {
157 let mut block =
159 EncryptedBlock::try_from(self.next.get(cid).await?).map_err(|e| StorageError::Internal(e.into()))?;
160
161 if let Some(blocks) = block.payload.blocks() {
163 let blocks = stream::iter(blocks.iter().cloned())
164 .map(|cid| {
165 let next = self.next.clone();
166 async move { next.get(&cid).await }
167 })
168 .buffered(10)
169 .try_collect::<Vec<_>>()
170 .await?
171 .into_iter()
172 .map(|block| block.into_inner());
173 block
174 .payload
175 .try_inline_blocks(blocks)
176 .map_err(|_| StorageError::Internal(anyhow!("Inline blocks failed")))?;
177 }
178
179 let plain = block.block(&self.key).map_err(|e| StorageError::Internal(e.into()))?;
181
182 self.mapping
184 .extend(
185 [(plain.cid, *cid)]
186 .into_iter()
187 .chain(plain.references.clone().into_iter())
188 .map(|(internal, external)| MappedCid::new(internal, external)),
189 )
190 .await;
191
192 plain.into()
194 } else {
195 self.next.get(cid).await?
196 })
197 }
198
199 pub async fn set_encrypted(&self, block: Block) -> Result<Cid, StorageError> {
205 if MultiCodec::is(block.cid(), KnownMultiCodec::CoEncryptedBlock) {
206 let plain = EncryptedBlock::try_from(block.clone())
208 .map_err(|e| StorageError::InvalidArgument(e.into()))?
209 .block(&self.key)
210 .map_err(|e| StorageError::InvalidArgument(e.into()))?;
211
212 let encrypted_cid = self.next.set(block).await?;
214
215 {
217 self.mapping
218 .extend(
219 [(*plain.cid(), encrypted_cid)]
220 .into_iter()
221 .chain(plain.references.clone())
222 .map(|(internal, external)| MappedCid::new(internal, external)),
223 )
224 .await;
225 }
226
227 Ok(encrypted_cid)
229 } else {
230 self.next.set(block).await
231 }
232 }
233
234 #[tracing::instrument(level = tracing::Level::TRACE, err(Debug), skip(self, extended_block), fields(cid = ?extended_block.block.cid()))]
235 async fn set_block(&self, extended_block: ExtendedBlock) -> Result<Cid, StorageError> {
236 let block = extended_block.block;
237 let cid = *block.cid();
238
239 #[cfg(feature = "logging-verbose")]
241 {
242 if co_primitives::MultiCodec::is_cbor(block.cid()) {
243 tracing::trace!(cid = ?block.cid(), ipld = ?from_cbor::<ipld_core::ipld::Ipld>(block.data()), "set");
244 } else {
245 tracing::trace!(cid = ?block.cid(), "set");
246 }
247 }
248
249 let mut references = extended_block.options.references.unwrap_or_default();
258 if self.links.has_links(cid) {
259 let links = self.links.links(&block)?.filter(|link| !references.contains_key(link));
262
263 for (plain_cid, encrypted_cid) in self.mapping.get_mapping(links).await {
265 match encrypted_cid {
266 Some(encrypted_cid) => {
268 references.insert(plain_cid, encrypted_cid);
269 },
270 None => {
272 if let EncryptionReferenceMode::Warning = self.reference_mode {
274 tracing::trace!(unmapped_cid = ?plain_cid, ?cid, all_references = ?references, all_links = ?self.links.links(&block).map(|i| i.collect::<Vec<Cid>>()), "unmapped-reference");
275 }
276
277 if !self.reference_mode.is_reference_allowed(&self.next, plain_cid, cid).await {
279 return Err(StorageError::InvalidArgument(anyhow!("Unmapped reference found {} while storing {}. Are you sure you stored all children nodes?", plain_cid, cid)));
280 }
281 },
282 }
283 }
284 }
285
286 let mut block: BlockPayload = block.into();
288 block.references = references;
289 let mut encrypted =
290 EncryptedBlock::encrypt(self.algorithm, &self.key, block).map_err(|e| StorageError::Internal(e.into()))?;
291
292 let extra_encrypted_blocks = encrypted
294 .payload
295 .fit_into_blocks(self.next.max_block_size(), Some(Header::encoded_size(encrypted.header.algorithm)));
296 let encrypted_block: Block = encrypted
297 .try_into()
298 .map_err(|e: AlgorithmError| StorageError::Internal(e.into()))?;
299
300 for extra_encrypted_block in extra_encrypted_blocks {
302 self.next.set(extra_encrypted_block).await?;
303 }
304 let encrypted_cid = self.next.set(encrypted_block).await?;
305
306 self.mapping.insert(cid, encrypted_cid).await;
308
309 #[cfg(debug_assertions)]
311 tracing::trace!(?cid, ?encrypted_cid, cid_bytes = ?cid.to_bytes(), encrypted_cid_bytes = ?encrypted_cid.to_bytes(), "storage-set");
312
313 Ok(cid)
315 }
316}
317#[async_trait]
318impl<S> BlockStorage for EncryptedBlockStorage<S>
319where
320 S: AnyBlockStorage,
321{
322 async fn get(&self, cid: &Cid) -> Result<Block, StorageError> {
324 if self.transform && MultiCodec::is(cid, KnownMultiCodec::CoEncryptedBlock) {
326 return self.get_unencrypted(cid).await;
327 }
328
329 let encrypted_cid = self.mapping.get(cid).await;
331 #[cfg(feature = "logging-verbose")]
332 tracing::trace!(?encrypted_cid, ?cid, "encrypted-storage-get");
333 match encrypted_cid {
334 Some(encrypted_cid) => self.get_unencrypted(&encrypted_cid).await,
335 None => match self.next.get(cid).await {
336 Err(err @ StorageError::NotFound(_, _)) => {
337 #[cfg(feature = "logging-verbose")]
339 {
340 let mapping = self.mapping.mapping.read().unwrap().map.clone();
341 let parent_mapping =
342 self.mapping.parent.as_ref().map(|parent| parent.read().unwrap().map.clone());
343 tracing::warn!(?mapping, ?parent_mapping, ?err, ?cid, "encrypted-storage-get-not-found");
344 }
345
346 Err(err)
348 },
349 i => i,
350 },
351 }
352 }
353
354 async fn set(&self, block: Block) -> Result<Cid, StorageError> {
355 self.set_block(block.into()).await
356 }
357
358 async fn remove(&self, cid: &Cid) -> Result<(), StorageError> {
359 match self.mapping.remove(cid).await {
360 Some(encrypted_cid) => {
361 #[cfg(feature = "logging-verbose")]
363 tracing::trace!(?encrypted_cid, ?cid, "encrypted-storage-remove");
364
365 self.next.remove(&encrypted_cid).await
367 },
368 None => self.next.remove(cid).await,
369 }
370 }
371
372 async fn stat(&self, cid: &Cid) -> Result<BlockStat, StorageError> {
373 self.get(cid).await.map(|v| BlockStat { size: v.data().len() as u64 })
374 }
375
376 fn max_block_size(&self) -> usize {
377 self.next.max_block_size()
378 }
379}
380#[async_trait]
381impl<S> ExtendedBlockStorage for EncryptedBlockStorage<S>
382where
383 S: ExtendedBlockStorage + Clone + 'static,
384{
385 async fn set_extended(&self, extended_block: ExtendedBlock) -> Result<Cid, StorageError> {
386 self.set_block(extended_block).await
387 }
388
389 async fn exists(&self, cid: &Cid) -> Result<bool, StorageError> {
390 match self.mapping.get(cid).await {
391 Some(encrypted_cid) => self.next.exists(&encrypted_cid).await,
392 None => self.next.exists(cid).await,
393 }
394 }
395
396 async fn clear(&self) -> Result<(), StorageError> {
397 self.next.clear().await
398 }
399}
400impl<S> CloneWithBlockStorageSettings for EncryptedBlockStorage<S>
401where
402 S: CloneWithBlockStorageSettings,
403{
404 fn clone_with_settings(&self, settings: BlockStorageCloneSettings) -> Self {
405 EncryptedBlockStorage {
406 key: self.key.clone(),
407 algorithm: self.algorithm,
408 links: self.links.clone(),
409 reference_mode: self.reference_mode.clone(),
410 mapping: if settings.clear {
411 Default::default()
412 } else if settings.detached {
413 self.mapping.child()
414 } else {
415 self.mapping.clone()
416 },
417 transform: settings.transform,
418 next: self.next.clone_with_settings(settings),
419 }
420 }
421}
422#[async_trait]
423impl<S> BlockStorageContentMapping for EncryptedBlockStorage<S>
424where
425 S: BlockStorage + Clone + Send + Sync + 'static,
426{
427 async fn is_content_mapped(&self) -> bool {
428 true
429 }
430
431 async fn to_plain(&self, mapped: &Cid) -> Option<Cid> {
432 self.mapping.get(mapped).await
433 }
434
435 async fn to_mapped(&self, plain: &Cid) -> Option<Cid> {
436 if let Some(mapped) = self.mapping.get_first_by_value(plain).await {
438 return Some(mapped);
439 }
440
441 if MultiCodec::is(plain, KnownMultiCodec::CoEncryptedBlock) {
443 if let Ok(block) = self.get_unencrypted(plain).await {
444 return Some(*block.cid());
445 }
446 }
447
448 None
450 }
451
452 async fn insert_mappings(&self, mappings: BTreeSet<MappedCid>) {
453 self.mapping.extend(mappings).await;
454 }
455}
456
457#[derive(Debug, Default, Clone)]
458pub enum EncryptionReferenceMode {
459 #[default]
465 DisallowPlain,
466
467 DisallowPlainExcept(BTreeSet<Cid>),
474
475 DisallowExcept(BTreeSet<Cid>),
481
482 AllowPlain,
488
489 AllowPlainIfExists,
495
496 Warning,
502}
503impl EncryptionReferenceMode {
504 pub async fn is_reference_allowed<S>(&self, next: &S, reference: Cid, parent: Cid) -> bool
508 where
509 S: BlockStorage,
510 {
511 let is_unreleated_encrypted = MultiCodec::is(reference, KnownMultiCodec::CoEncryptedBlock);
513 let is_co_reference = MultiCodec::is(parent, KnownMultiCodec::CoReference);
514
515 match &self {
517 EncryptionReferenceMode::DisallowPlain => is_co_reference || is_unreleated_encrypted,
518 EncryptionReferenceMode::DisallowPlainExcept(allowed) => {
519 is_co_reference || is_unreleated_encrypted || allowed.contains(&reference)
520 },
521 EncryptionReferenceMode::DisallowExcept(allowed) => is_co_reference || allowed.contains(&reference),
522 EncryptionReferenceMode::AllowPlain => true,
523 EncryptionReferenceMode::AllowPlainIfExists => {
524 is_co_reference || is_unreleated_encrypted || next.stat(&reference).await.is_ok()
525 },
526 EncryptionReferenceMode::Warning => true,
527 }
528 }
529}
530
531#[derive(Debug, Clone, Default)]
532pub struct EncryptedBlockStorageMapping {
533 mapping: Arc<RwLock<BlockMapping>>,
534
535 parent: Option<Arc<RwLock<BlockMapping>>>,
539}
540impl EncryptedBlockStorageMapping {
541 fn child(&self) -> EncryptedBlockStorageMapping {
543 EncryptedBlockStorageMapping { parent: Some(self.mapping.clone()), mapping: Default::default() }
544 }
545
546 pub async fn load_mapping<S>(&self, storage: &EncryptedBlockStorage<S>, map: &Cid) -> Result<(), StorageError>
549 where
550 S: BlockStorage + Clone + Send + Sync + 'static,
551 {
552 let mut mapping = BlockMapping::new();
554 mapping.read_mappings(storage, map).await?;
555
556 self.mapping.write().unwrap().append(&mut mapping);
559
560 Ok(())
562 }
563
564 pub async fn set_mapping(&mut self, mapping: BlockMapping) {
566 self.parent = None;
567 self.mapping = Arc::new(RwLock::new(mapping));
568 }
569
570 pub async fn clear(&self) {
571 self.mapping.write().unwrap().clear();
572 }
573
574 pub async fn get(&self, key: &Cid) -> Option<Cid> {
575 match self.mapping.read().unwrap().get(key) {
576 Some(cid) => Some(cid),
577 None => {
578 if let Some(parent) = &self.parent {
579 parent.read().unwrap().get(key)
580 } else {
581 None
582 }
583 },
584 }
585 }
586
587 pub async fn remove(&self, key: &Cid) -> Option<Cid> {
588 let mut result = self.mapping.write().unwrap().remove(key);
589 if let Some(parent) = &self.parent {
590 let parent_result = parent.write().unwrap().remove(key);
591 if result.is_none() {
592 result = parent_result;
593 }
594 }
595 result
596 }
597
598 pub async fn get_mapping(&self, keys: impl IntoIterator<Item = Cid>) -> BTreeMap<Cid, Option<Cid>> {
600 let mapping = self.mapping.read().unwrap();
601 let parent = self.parent.as_ref().map(|parent| parent.read().unwrap());
602 keys.into_iter()
603 .map(|key| {
604 let value = match mapping.get(&key) {
605 Some(cid) => Some(cid),
606 None => {
607 if let Some(parent) = &parent {
608 parent.get(&key)
609 } else {
610 None
611 }
612 },
613 };
614 (key, value)
615 })
616 .collect()
617 }
618
619 pub async fn get_first_by_value(&self, key: &Cid) -> Option<Cid> {
620 match self.mapping.read().unwrap().get_first_by_value(key) {
621 Some(cid) => Some(cid),
622 None => {
623 if let Some(parent) = &self.parent {
624 parent.read().unwrap().get_first_by_value(key)
625 } else {
626 None
627 }
628 },
629 }
630 }
631
632 pub async fn insert(&self, key: Cid, value: Cid) -> Option<Cid> {
633 self.mapping.write().unwrap().insert(key, value)
634 }
635
636 pub async fn extend(&self, items: impl IntoIterator<Item = MappedCid>) {
637 self.mapping
638 .write()
639 .unwrap()
640 .extend(items.into_iter().map(|MappedCid(internal, external)| (internal, external)));
641 }
642
643 pub async fn to_blocks<S>(
644 &self,
645 serializer: S,
646 options: WriteOptions,
647 ) -> Result<(Option<Cid>, Vec<Block>), StorageError>
648 where
649 S: NodeSerializer<Node<(Cid, Cid)>, (Cid, Cid)>,
650 {
651 let mapping = {
653 let mut map = self.mapping.read().unwrap().clone();
654 if let Some(parent) = &self.parent {
655 let mut parent_map = parent.read().unwrap().clone();
656 map.append(&mut parent_map);
657 };
658 map
659 };
660
661 mapping.to_blocks(serializer, options)
663 }
664}
665#[async_trait]
666impl BlockStorageContentMapping for EncryptedBlockStorageMapping {
667 async fn is_content_mapped(&self) -> bool {
668 true
669 }
670
671 async fn to_plain(&self, mapped: &Cid) -> Option<Cid> {
673 self.get(mapped).await
674 }
675
676 async fn to_mapped(&self, plain: &Cid) -> Option<Cid> {
678 self.get_first_by_value(plain).await
679 }
680
681 async fn insert_mappings(&self, _mappings: BTreeSet<MappedCid>) {
682 unimplemented!("use storage directly");
683 }
684}
685
686#[derive(Debug, thiserror::Error)]
687pub enum BlockMappingError {
688 #[error("Storage Error")]
689 Storage(#[from] StorageError),
690
691 #[error("Algorithm Error")]
692 Algorithm(#[from] AlgorithmError),
693}
694impl From<BlockMappingError> for StorageError {
695 fn from(val: BlockMappingError) -> Self {
696 match val {
697 BlockMappingError::Storage(e) => e,
698 BlockMappingError::Algorithm(e) => match e {
699 AlgorithmError::Cipher => StorageError::InvalidArgument(e.into()), AlgorithmError::InvalidArguments(_) => StorageError::InvalidArgument(e.into()),
702 AlgorithmError::Decoding => StorageError::Internal(e.into()),
703 AlgorithmError::Encoding => StorageError::Internal(e.into()),
704 AlgorithmError::Size => StorageError::Internal(e.into()),
705 },
706 }
707 }
708}
709
710#[derive(Clone, Debug)]
713pub struct BlockMapping {
714 map: BTreeMap<Cid, Cid>,
716}
717impl BlockMapping {
718 pub fn new() -> Self {
719 Self { map: BTreeMap::new() }
720 }
721
722 pub fn get(&self, key: &Cid) -> Option<Cid> {
723 self.map.get(key).cloned()
724 }
725
726 pub fn insert(&mut self, key: Cid, value: Cid) -> Option<Cid> {
727 self.map.insert(key, value)
728 }
729
730 pub fn extend(&mut self, items: impl IntoIterator<Item = (Cid, Cid)>) {
731 self.map.extend(items);
732 }
733
734 pub fn append(&mut self, other: &mut BlockMapping) {
735 self.map.append(&mut other.map);
736 }
737
738 pub fn remove(&mut self, key: &Cid) -> Option<Cid> {
739 self.map.remove(key)
740 }
741
742 pub fn get_first_by_value(&self, value: &Cid) -> Option<Cid> {
743 self.map.iter().find_map(|(k, v)| {
744 if v == value {
745 return Some(*k);
746 }
747 None
748 })
749 }
750
751 pub fn clear(&mut self) {
752 self.map.clear();
753 }
754
755 pub fn iter(&self) -> impl Iterator<Item = (&Cid, &Cid)> {
756 self.map.iter()
757 }
758
759 pub fn into_iter(self) -> impl Iterator<Item = (Cid, Cid)> {
760 self.map.into_iter()
761 }
762
763 pub async fn from_cids<S>(
764 storage: &EncryptedBlockStorage<S>,
765 cids: impl Iterator<Item = Cid>,
766 ) -> Result<Self, BlockMappingError>
767 where
768 S: BlockStorage,
769 {
770 let mut mapping = BlockMapping::new();
771 for cid in cids {
772 if cid.codec() == BLOCK_MULTICODEC {
773 let encrypted_block: EncryptedBlock = storage.next.get(&cid).await?.try_into()?;
774 let block = encrypted_block.block(&storage.key)?;
775 mapping.insert(block.cid, cid);
776 }
777 }
778 Ok(mapping)
779 }
780
781 pub async fn read_mappings<S>(
784 &mut self,
785 storage: &EncryptedBlockStorage<S>,
786 cid: &Cid,
787 ) -> Result<usize, StorageError>
788 where
789 S: BlockStorage + Clone + Send + Sync + 'static,
790 {
791 let mut count = 0;
792 let mut tasks = FuturesOrdered::new();
793
794 let read = |cid: Cid| async move { storage.get_unencrypted(&cid).await };
796 tasks.push_back(read(*cid));
797
798 while let Some(block) = tasks.next().await {
800 let block = block?;
801
802 MultiCodec::with_cbor(block.cid())?;
804
805 let node: Node<(Cid, Cid)> =
807 from_cbor(block.data()).map_err(|e| StorageError::InvalidArgument(e.into()))?;
808
809 match node {
811 Node::Node(links) => {
812 for link in links {
813 tasks.push_back(read(link.into()));
814 }
815 },
816 Node::Leaf(entries) => {
817 for (key, value) in entries.into_iter() {
818 self.insert(key, value);
819 count += 1;
820 }
821 },
822 }
823 }
824
825 Ok(count)
827 }
828
829 pub fn to_blocks<S>(&self, serializer: S, options: WriteOptions) -> Result<(Option<Cid>, Vec<Block>), StorageError>
833 where
834 S: NodeSerializer<Node<(Cid, Cid)>, (Cid, Cid)>,
835 {
836 let mut builder = NodeBuilder::<(Cid, Cid), Node<(Cid, Cid)>, S>::new(
838 options.max_block_size,
839 options.max_children,
840 serializer,
841 );
842 for (key, value) in self.map.iter() {
843 builder.push((*key, *value)).map_err(|e| StorageError::Internal(e.into()))?;
844 }
845 let (root, blocks) = builder.into_blocks().map_err(|e| StorageError::Internal(e.into()))?;
846
847 Ok((*root.cid(), blocks))
849 }
850}
851impl Default for BlockMapping {
852 fn default() -> Self {
853 Self::new()
854 }
855}
856
857struct EncryptedNodeSerializer {
859 key: Secret,
860 algorithm: Algorithm,
861}
862impl<T> NodeSerializer<Node<T>, T> for EncryptedNodeSerializer
863where
864 T: Clone + Serialize,
865{
866 fn nodes(&mut self, nodes: Vec<Link<Node<T>>>) -> Result<Node<T>, NodeBuilderError> {
867 Ok(Node::Node(nodes))
868 }
869
870 fn leaf(&mut self, entries: Vec<T>) -> Result<Node<T>, NodeBuilderError> {
871 Ok(Node::Leaf(entries))
872 }
873
874 fn serialize(&mut self, max_block_size: usize, node: Node<T>) -> Result<Block, NodeBuilderError> {
875 let block: Block = DefaultNodeSerializer::new().serialize(max_block_size, node)?;
876 let encrypted = EncryptedBlock::encrypt(self.algorithm, &self.key, block)?;
877 let encrypted_block: Block = encrypted.try_into()?;
878 Ok(encrypted_block)
879 }
880}
881impl From<AlgorithmError> for NodeBuilderError {
882 fn from(err: AlgorithmError) -> Self {
883 NodeBuilderError::Encoding(err.into())
884 }
885}
886
/// Limits applied when a mapping is serialized into blocks.
///
/// Required by the public `to_blocks` methods, so the constructor is public as well.
#[derive(Debug, Clone)]
pub struct WriteOptions {
    /// Maximum size of a produced block, in bytes as passed to the node builder.
    max_block_size: usize,

    /// Maximum number of children per node, as passed to the node builder.
    max_children: usize,
}
impl WriteOptions {
    /// Children per node used by [`WriteOptions::new`].
    // NOTE(review): the rationale for 174 is not visible in this file — confirm before changing.
    const DEFAULT_MAX_CHILDREN: usize = 174;

    /// Create options with the given block size limit and the default child limit.
    pub fn new(max_block_size: usize) -> Self {
        Self { max_block_size, max_children: Self::DEFAULT_MAX_CHILDREN }
    }
}
899
#[cfg(test)]
mod tests {
    use crate::{
        crypto::{
            block::{Algorithm, BLOCK_MULTICODEC},
            secret::Secret,
        },
        BlockStorage, EncryptedBlockStorage, MemoryBlockStorage,
    };
    use cid::Cid;
    use co_primitives::{BlockSerializer, DefaultParams, StorageError, StoreParams};
    use serde::{Deserialize, Serialize};
    use std::iter::repeat_n;

    #[derive(Debug, Serialize, Deserialize)]
    struct Test {
        hello: String,
    }

    /// Fixed test key of the size `algorithm` requires.
    fn test_key(algorithm: Algorithm) -> Secret {
        Secret::new(repeat_n(42, algorithm.key_size()).collect())
    }

    /// A block written through the encrypted storage reads back unchanged, while the backing
    /// storage does not contain it under its unencrypted CID.
    #[tokio::test]
    async fn roundtrip() {
        let memory = MemoryBlockStorage::default();
        let algorithm = Algorithm::default();
        let encryption =
            EncryptedBlockStorage::new(memory.clone(), test_key(algorithm), algorithm, Default::default());

        let block = BlockSerializer::default()
            .serialize(&Test { hello: "world".to_owned() })
            .unwrap();

        // `set` reports the unencrypted CID.
        let stored_cid = encryption.set(block.clone()).await.unwrap();
        assert_eq!(&stored_cid, block.cid());

        // Reading through the encrypted storage yields the original block.
        assert_eq!(encryption.get(block.cid()).await.unwrap(), block);

        // The backing storage has no block under the unencrypted CID.
        assert!(matches!(memory.get(block.cid()).await, Err(StorageError::NotFound(_, _))));
    }

    /// A flushed mapping can be loaded by a fresh storage instance over the same backing store.
    #[tokio::test]
    async fn store_mapping() {
        let memory = MemoryBlockStorage::new();
        let algorithm = Algorithm::default();
        let key = test_key(algorithm);
        let encryption = EncryptedBlockStorage::new(memory.clone(), key.clone(), algorithm, Default::default());

        // Store 1024 distinct blocks and remember their unencrypted CIDs.
        let mut cids: Vec<Cid> = Vec::new();
        for i in 0..1024 {
            let block = BlockSerializer::default()
                .serialize(&Test { hello: format!("Hi {i}!") })
                .unwrap();
            cids.push(*block.cid());
            encryption.set(block.clone()).await.unwrap();
        }

        // The mapping root is itself an encrypted block.
        let mapping_cid = encryption
            .flush_mapping()
            .await
            .unwrap()
            .expect("Mappings if we have items");
        assert_eq!(mapping_cid.codec(), BLOCK_MULTICODEC);

        // 1024 data blocks plus 7 mapping blocks (presumably 6 leaves and 1 root at 174
        // entries per node).
        let memory_cids: Vec<Cid> = memory.entries().await.map(|block| *block.cid()).collect();
        assert_eq!(memory_cids.len(), 7 + 1024);

        // Everything in the backing storage is encrypted and below the block size limit.
        for memory_cid in &memory_cids {
            let memory_block = memory.get(memory_cid).await.unwrap();
            assert_eq!(memory_cid.codec(), BLOCK_MULTICODEC);
            assert!(DefaultParams::MAX_BLOCK_SIZE > memory_block.data().len());
        }

        // A fresh instance with an empty mapping can resolve every CID after loading.
        let encryption = EncryptedBlockStorage::new(memory, key, algorithm, Default::default());
        encryption.load_mapping(&mapping_cid).await.unwrap();
        for cid in cids {
            encryption.get(&cid).await.unwrap();
        }
    }
}