Skip to main content

co_storage/storage/
encrypted.rs

1// SPDX-License-Identifier: AGPL-3.0-only
2// Copyright (C) 2026 1io BRANDGUARDIAN GmbH
3
4use crate::{
5	crypto::{
6		block::{Algorithm, BlockPayload, EncryptedBlock, Header, BLOCK_MULTICODEC},
7		secret::Secret,
8	},
9	AlgorithmError, BlockStorageContentMapping, ExtendedBlock, ExtendedBlockStorage, StorageError,
10};
11use anyhow::anyhow;
12use async_trait::async_trait;
13use cid::Cid;
14use co_primitives::{
15	from_cbor, AnyBlockStorage, Block, BlockLinks, BlockStat, BlockStorage, BlockStorageCloneSettings,
16	CloneWithBlockStorageSettings, DefaultNodeSerializer, KnownMultiCodec, Link, MappedCid, MultiCodec, Node,
17	NodeBuilder, NodeBuilderError, NodeSerializer,
18};
19use futures::{
20	stream::{self, FuturesOrdered},
21	StreamExt, TryStreamExt,
22};
23use serde::Serialize;
24use std::{
25	collections::{BTreeMap, BTreeSet},
26	sync::{Arc, RwLock},
27};
28
/// Block storage layer that encrypts blocks before writing them to the `next` storage.
///
/// Users of this storage work with the plain (unencrypted) block CIDs; the layer keeps a mapping
/// from those CIDs to the encrypted CIDs under which the blocks are stored in `next`.
#[derive(Debug, Clone)]
pub struct EncryptedBlockStorage<S> {
	/// Key used to encrypt and decrypt blocks.
	key: Secret,
	/// Algorithm used to encrypt new blocks.
	algorithm: Algorithm,
	/// Underlying storage that receives the encrypted blocks.
	next: S,
	/// Plain CID -> encrypted CID mapping (optionally with a read-through parent).
	mapping: EncryptedBlockStorageMapping,
	/// Used to extract the links of a block that is being stored.
	links: BlockLinks,
	/// Policy for links that cannot be resolved through the mapping when storing a block.
	reference_mode: EncryptionReferenceMode,
	/// If set, `get` on an encrypted CID returns the decrypted block.
	transform: bool,
}
39impl<S> EncryptedBlockStorage<S>
40where
41	S: AnyBlockStorage,
42{
43	pub fn new(next: S, key: Secret, algorithm: Algorithm, mapping: EncryptedBlockStorageMapping) -> Self {
44		Self {
45			algorithm,
46			key,
47			mapping,
48			next,
49			links: Default::default(),
50			reference_mode: Default::default(),
51			transform: false,
52		}
53	}
54
55	pub fn with_encryption_reference_mode(mut self, mode: EncryptionReferenceMode) -> Self {
56		self.reference_mode = mode;
57		self
58	}
59
60	/// Get next storage.
61	pub fn storage(&self) -> &S {
62		&self.next
63	}
64
65	/// Set next storage.
66	pub fn set_storage(&mut self, next: S) {
67		self.next = next;
68	}
69
70	/// Add known mappings.
71	pub async fn insert_mappings(&self, mappings: impl IntoIterator<Item = MappedCid>) {
72		self.mapping.extend(mappings).await;
73	}
74
75	/// Load mapping from CID.
76	/// This will add the mappings to the existing.
77	pub async fn load_mapping(&self, map: &Cid) -> Result<(), StorageError> {
78		self.mapping.load_mapping(self, map).await?;
79		Ok(())
80	}
81
82	/// Flush mapping to (parent) storage.
83	/// Returns the encrypted mapping CID.
84	/// The mapping tree will also only link to encrypted CIDs.
85	pub async fn flush_mapping(&self) -> Result<Option<Cid>, StorageError> {
86		// serializer
87		let node_serializer = EncryptedNodeSerializer { algorithm: self.algorithm, key: self.key.clone() };
88
89		// blocks
90		let (root, blocks) = self
91			.mapping
92			.to_blocks(node_serializer, WriteOptions::new(self.max_block_size()))
93			.await?;
94
95		// store
96		for block in blocks {
97			self.next.set(block).await?;
98			// TODO: PIN/UNPIN
99		}
100
101		// log
102		#[cfg(debug_assertions)]
103		tracing::trace!(?root, "storage-flush-mapping");
104
105		// result
106		Ok(root)
107	}
108
109	/// Clear encryption mapping.
110	pub async fn clear_mapping(&self, keep: impl IntoIterator<Item = Cid>) {
111		let mapping = self.mapping.get_mapping(keep).await;
112
113		// clear
114		self.mapping.clear().await;
115
116		// add
117		self.mapping
118			.extend(
119				mapping
120					.into_iter()
121					.filter_map(|(key, value)| value.map(|value| MappedCid(key, value))),
122			)
123			.await;
124	}
125
126	/// This will regenerate and flush the encryption block mapping using supplied CIDs.
127	pub async fn regenerate_mapping(&mut self, cids: impl Iterator<Item = Cid>) -> Result<Option<Cid>, StorageError> {
128		self.mapping
129			.set_mapping(
130				BlockMapping::from_cids(self, cids)
131					.await
132					.map_err(|e| StorageError::Internal(e.into()))?,
133			)
134			.await;
135		self.flush_mapping().await
136	}
137
138	/// Insert mapping for encrypted block.
139	/// Returns true if mapping has been changed.
140	#[deprecated]
141	pub async fn insert_mapping(&self, encrypted: &Cid, plain: Option<&Cid>) -> Result<bool, StorageError> {
142		if MultiCodec::is(encrypted, KnownMultiCodec::CoEncryptedBlock) {
143			let plain = match plain {
144				Some(plain) => *plain,
145				None => *self.get_unencrypted(encrypted).await?.cid(),
146			};
147			let old = self.mapping.insert(plain, *encrypted).await;
148			Ok(old.is_none() || old.as_ref() != Some(encrypted))
149		} else {
150			Ok(false)
151		}
152	}
153
154	/// Get encrypted cid as unencrypted block.
155	pub async fn get_unencrypted(&self, cid: &Cid) -> Result<Block, StorageError> {
156		Ok(if MultiCodec::is(cid, KnownMultiCodec::CoEncryptedBlock) {
157			// get block
158			let mut block =
159				EncryptedBlock::try_from(self.next.get(cid).await?).map_err(|e| StorageError::Internal(e.into()))?;
160
161			// make inline
162			if let Some(blocks) = block.payload.blocks() {
163				let blocks = stream::iter(blocks.iter().cloned())
164					.map(|cid| {
165						let next = self.next.clone();
166						async move { next.get(&cid).await }
167					})
168					.buffered(10)
169					.try_collect::<Vec<_>>()
170					.await?
171					.into_iter()
172					.map(|block| block.into_inner());
173				block
174					.payload
175					.try_inline_blocks(blocks)
176					.map_err(|_| StorageError::Internal(anyhow!("Inline blocks failed")))?;
177			}
178
179			// decrypt
180			let plain = block.block(&self.key).map_err(|e| StorageError::Internal(e.into()))?;
181
182			// apply mappings
183			self.mapping
184				.extend(
185					[(plain.cid, *cid)]
186						.into_iter()
187						.chain(plain.references.clone().into_iter())
188						.map(|(internal, external)| MappedCid::new(internal, external)),
189				)
190				.await;
191
192			// result
193			plain.into()
194		} else {
195			self.next.get(cid).await?
196		})
197	}
198
199	/// Set encrypted block.
200	/// Expects the encrypted block belongs to our key.
201	///
202	/// Errors:
203	/// - [`StorageError::InvalidArgument`]: Block can not be decrypted.
204	pub async fn set_encrypted(&self, block: Block) -> Result<Cid, StorageError> {
205		if MultiCodec::is(block.cid(), KnownMultiCodec::CoEncryptedBlock) {
206			// decrypt the block to update the mapping
207			let plain = EncryptedBlock::try_from(block.clone())
208				.map_err(|e| StorageError::InvalidArgument(e.into()))?
209				.block(&self.key)
210				.map_err(|e| StorageError::InvalidArgument(e.into()))?;
211
212			// write
213			let encrypted_cid = self.next.set(block).await?;
214
215			// map
216			{
217				self.mapping
218					.extend(
219						[(*plain.cid(), encrypted_cid)]
220							.into_iter()
221							.chain(plain.references.clone())
222							.map(|(internal, external)| MappedCid::new(internal, external)),
223					)
224					.await;
225			}
226
227			// result
228			Ok(encrypted_cid)
229		} else {
230			self.next.set(block).await
231		}
232	}
233
234	#[tracing::instrument(level = tracing::Level::TRACE, err(Debug), skip(self, extended_block), fields(cid = ?extended_block.block.cid()))]
235	async fn set_block(&self, extended_block: ExtendedBlock) -> Result<Cid, StorageError> {
236		let block = extended_block.block;
237		let cid = *block.cid();
238
239		// log
240		#[cfg(feature = "logging-verbose")]
241		{
242			if co_primitives::MultiCodec::is_cbor(block.cid()) {
243				tracing::trace!(cid = ?block.cid(), ipld = ?from_cbor::<ipld_core::ipld::Ipld>(block.data()), "set");
244			} else {
245				tracing::trace!(cid = ?block.cid(), "set");
246			}
247		}
248
249		// references
250		//  try to resolve all children references using the mapping
251		//   the node creator has either:
252		//    the mapping from loading the original node before
253		//    all children nodes as he created it from sratch
254		//  as a fallback check if the Cid exists in the parent so we know this is a unencrypted reference
255		//  Question: are there any valid cases for not unencrypted and not known reference?
256		// 	  Yes: encrypted local CO stores references to unencrypted shared CO.
257		let mut references = extended_block.options.references.unwrap_or_default();
258		if self.links.has_links(cid) {
259			// links
260			//  filter out already mapped links
261			let links = self.links.links(&block)?.filter(|link| !references.contains_key(link));
262
263			// references
264			for (plain_cid, encrypted_cid) in self.mapping.get_mapping(links).await {
265				match encrypted_cid {
266					// reference mapping
267					Some(encrypted_cid) => {
268						references.insert(plain_cid, encrypted_cid);
269					},
270					// reference mode
271					None => {
272						// log
273						if let EncryptionReferenceMode::Warning = self.reference_mode {
274							tracing::trace!(unmapped_cid = ?plain_cid, ?cid, all_references = ?references, all_links = ?self.links.links(&block).map(|i| i.collect::<Vec<Cid>>()), "unmapped-reference");
275						}
276
277						// error
278						if !self.reference_mode.is_reference_allowed(&self.next, plain_cid, cid).await {
279							return Err(StorageError::InvalidArgument(anyhow!("Unmapped reference found {} while storing {}. Are you sure you stored all children nodes?", plain_cid, cid)));
280						}
281					},
282				}
283			}
284		}
285
286		// encrypt
287		let mut block: BlockPayload = block.into();
288		block.references = references;
289		let mut encrypted =
290			EncryptedBlock::encrypt(self.algorithm, &self.key, block).map_err(|e| StorageError::Internal(e.into()))?;
291
292		// fit into block size limit
293		let extra_encrypted_blocks = encrypted
294			.payload
295			.fit_into_blocks(self.next.max_block_size(), Some(Header::encoded_size(encrypted.header.algorithm)));
296		let encrypted_block: Block = encrypted
297			.try_into()
298			.map_err(|e: AlgorithmError| StorageError::Internal(e.into()))?;
299
300		// store
301		for extra_encrypted_block in extra_encrypted_blocks {
302			self.next.set(extra_encrypted_block).await?;
303		}
304		let encrypted_cid = self.next.set(encrypted_block).await?;
305
306		// map
307		self.mapping.insert(cid, encrypted_cid).await;
308
309		// trace (only in debug because this has security implications)
310		#[cfg(debug_assertions)]
311		tracing::trace!(?cid, ?encrypted_cid, cid_bytes = ?cid.to_bytes(), encrypted_cid_bytes = ?encrypted_cid.to_bytes(), "storage-set");
312
313		// result
314		Ok(cid)
315	}
316}
317#[async_trait]
318impl<S> BlockStorage for EncryptedBlockStorage<S>
319where
320	S: AnyBlockStorage,
321{
322	/// Get block.
323	async fn get(&self, cid: &Cid) -> Result<Block, StorageError> {
324		// transform?
325		if self.transform && MultiCodec::is(cid, KnownMultiCodec::CoEncryptedBlock) {
326			return self.get_unencrypted(cid).await;
327		}
328
329		// default
330		let encrypted_cid = self.mapping.get(cid).await;
331		#[cfg(feature = "logging-verbose")]
332		tracing::trace!(?encrypted_cid, ?cid, "encrypted-storage-get");
333		match encrypted_cid {
334			Some(encrypted_cid) => self.get_unencrypted(&encrypted_cid).await,
335			None => match self.next.get(cid).await {
336				Err(err @ StorageError::NotFound(_, _)) => {
337					// log
338					#[cfg(feature = "logging-verbose")]
339					{
340						let mapping = self.mapping.mapping.read().unwrap().map.clone();
341						let parent_mapping =
342							self.mapping.parent.as_ref().map(|parent| parent.read().unwrap().map.clone());
343						tracing::warn!(?mapping, ?parent_mapping, ?err, ?cid, "encrypted-storage-get-not-found");
344					}
345
346					// forward
347					Err(err)
348				},
349				i => i,
350			},
351		}
352	}
353
354	async fn set(&self, block: Block) -> Result<Cid, StorageError> {
355		self.set_block(block.into()).await
356	}
357
358	async fn remove(&self, cid: &Cid) -> Result<(), StorageError> {
359		match self.mapping.remove(cid).await {
360			Some(encrypted_cid) => {
361				// log
362				#[cfg(feature = "logging-verbose")]
363				tracing::trace!(?encrypted_cid, ?cid, "encrypted-storage-remove");
364
365				// remove
366				self.next.remove(&encrypted_cid).await
367			},
368			None => self.next.remove(cid).await,
369		}
370	}
371
372	async fn stat(&self, cid: &Cid) -> Result<BlockStat, StorageError> {
373		self.get(cid).await.map(|v| BlockStat { size: v.data().len() as u64 })
374	}
375
376	fn max_block_size(&self) -> usize {
377		self.next.max_block_size()
378	}
379}
380#[async_trait]
381impl<S> ExtendedBlockStorage for EncryptedBlockStorage<S>
382where
383	S: ExtendedBlockStorage + Clone + 'static,
384{
385	async fn set_extended(&self, extended_block: ExtendedBlock) -> Result<Cid, StorageError> {
386		self.set_block(extended_block).await
387	}
388
389	async fn exists(&self, cid: &Cid) -> Result<bool, StorageError> {
390		match self.mapping.get(cid).await {
391			Some(encrypted_cid) => self.next.exists(&encrypted_cid).await,
392			None => self.next.exists(cid).await,
393		}
394	}
395
396	async fn clear(&self) -> Result<(), StorageError> {
397		self.next.clear().await
398	}
399}
400impl<S> CloneWithBlockStorageSettings for EncryptedBlockStorage<S>
401where
402	S: CloneWithBlockStorageSettings,
403{
404	fn clone_with_settings(&self, settings: BlockStorageCloneSettings) -> Self {
405		EncryptedBlockStorage {
406			key: self.key.clone(),
407			algorithm: self.algorithm,
408			links: self.links.clone(),
409			reference_mode: self.reference_mode.clone(),
410			mapping: if settings.clear {
411				Default::default()
412			} else if settings.detached {
413				self.mapping.child()
414			} else {
415				self.mapping.clone()
416			},
417			transform: settings.transform,
418			next: self.next.clone_with_settings(settings),
419		}
420	}
421}
422#[async_trait]
423impl<S> BlockStorageContentMapping for EncryptedBlockStorage<S>
424where
425	S: BlockStorage + Clone + Send + Sync + 'static,
426{
427	async fn is_content_mapped(&self) -> bool {
428		true
429	}
430
431	async fn to_plain(&self, mapped: &Cid) -> Option<Cid> {
432		self.mapping.get(mapped).await
433	}
434
435	async fn to_mapped(&self, plain: &Cid) -> Option<Cid> {
436		// try to read from mapping
437		if let Some(mapped) = self.mapping.get_first_by_value(plain).await {
438			return Some(mapped);
439		}
440
441		// try to decrypt
442		if MultiCodec::is(plain, KnownMultiCodec::CoEncryptedBlock) {
443			if let Ok(block) = self.get_unencrypted(plain).await {
444				return Some(*block.cid());
445			}
446		}
447
448		// none
449		None
450	}
451
452	async fn insert_mappings(&self, mappings: BTreeSet<MappedCid>) {
453		self.mapping.extend(mappings).await;
454	}
455}
456
/// Policy for links of a stored block that cannot be resolved through the encryption mapping.
///
/// Terms used below (see [`EncryptionReferenceMode::is_reference_allowed`]):
/// - Plain: the reference is not an encrypted block CID.
/// - Unrelated encrypted: the reference has the encrypted block codec but is not in our mapping.
/// - CoReference: the block being stored (the parent) has the CoReference codec.
#[derive(Debug, Default, Clone)]
pub enum EncryptionReferenceMode {
	/// Disallow any references that are not encrypted.
	/// Allowed references:
	/// - Plain: NO
	/// - Unrelated encrypted: YES
	/// - CoReference: YES
	#[default]
	DisallowPlain,

	/// Disallow any references that are not encrypted except specific plain references.
	/// Unrelated encrypted references are allowed.
	/// Allowed references:
	/// - Plain: SPECIFIC
	/// - Unrelated encrypted: YES
	/// - CoReference: YES
	DisallowPlainExcept(BTreeSet<Cid>),

	/// Disallow any references that are not encrypted except specific references.
	/// Allowed references:
	/// - Plain: SPECIFIC
	/// - Unrelated encrypted: SPECIFIC
	/// - CoReference: YES
	DisallowExcept(BTreeSet<Cid>),

	/// Allow any plain references.
	/// Allowed references:
	/// - Plain: YES
	/// - Unrelated encrypted: YES
	/// - CoReference: YES
	AllowPlain,

	/// Allow plain references only if they exist in the next (parent) storage.
	/// Allowed references:
	/// - Plain: IF EXISTS
	/// - Unrelated encrypted: YES
	/// - CoReference: YES
	AllowPlainIfExists,

	/// Allow any plain references but warn (log) about unencrypted references.
	/// Allowed references:
	/// - Plain: YES, WITH WARNING
	/// - Unrelated encrypted: YES
	/// - CoReference: YES
	Warning,
}
503impl EncryptionReferenceMode {
504	/// Test if reference is allowed in parent.
505	///
506	/// Note: For mode Warning, the caller is responsible for the warning.
507	pub async fn is_reference_allowed<S>(&self, next: &S, reference: Cid, parent: Cid) -> bool
508	where
509		S: BlockStorage,
510	{
511		// encrypted block reference in plain data
512		let is_unreleated_encrypted = MultiCodec::is(reference, KnownMultiCodec::CoEncryptedBlock);
513		let is_co_reference = MultiCodec::is(parent, KnownMultiCodec::CoReference);
514
515		// evaluate
516		match &self {
517			EncryptionReferenceMode::DisallowPlain => is_co_reference || is_unreleated_encrypted,
518			EncryptionReferenceMode::DisallowPlainExcept(allowed) => {
519				is_co_reference || is_unreleated_encrypted || allowed.contains(&reference)
520			},
521			EncryptionReferenceMode::DisallowExcept(allowed) => is_co_reference || allowed.contains(&reference),
522			EncryptionReferenceMode::AllowPlain => true,
523			EncryptionReferenceMode::AllowPlainIfExists => {
524				is_co_reference || is_unreleated_encrypted || next.stat(&reference).await.is_ok()
525			},
526			EncryptionReferenceMode::Warning => true,
527		}
528	}
529}
530
/// Shared, lock-protected CID mapping with an optional read-through parent.
///
/// Clones share the same underlying mappings (`Arc`). Lookups check the own mapping first and then the parent.
#[derive(Debug, Clone, Default)]
pub struct EncryptedBlockStorageMapping {
	/// Own mapping. Inserts, extends, loads and clears only modify this one.
	mapping: Arc<RwLock<BlockMapping>>,

	/// Parent Mapping.
	///
	/// Note: To prevent deadlocks, when locking both, always lock `mapping` first.
	parent: Option<Arc<RwLock<BlockMapping>>>,
}
540impl EncryptedBlockStorageMapping {
541	/// Create a child instance.
542	fn child(&self) -> EncryptedBlockStorageMapping {
543		EncryptedBlockStorageMapping { parent: Some(self.mapping.clone()), mapping: Default::default() }
544	}
545
546	/// Load mapping from CID.
547	/// This will add the mappings to the existing.
548	pub async fn load_mapping<S>(&self, storage: &EncryptedBlockStorage<S>, map: &Cid) -> Result<(), StorageError>
549	where
550		S: BlockStorage + Clone + Send + Sync + 'static,
551	{
552		// load
553		let mut mapping = BlockMapping::new();
554		mapping.read_mappings(storage, map).await?;
555
556		// insert
557		//  we dont use read_mappings directly because of possible deadlocks and because it involves IO.
558		self.mapping.write().unwrap().append(&mut mapping);
559
560		// done
561		Ok(())
562	}
563
564	/// Replace the mapping.
565	pub async fn set_mapping(&mut self, mapping: BlockMapping) {
566		self.parent = None;
567		self.mapping = Arc::new(RwLock::new(mapping));
568	}
569
570	pub async fn clear(&self) {
571		self.mapping.write().unwrap().clear();
572	}
573
574	pub async fn get(&self, key: &Cid) -> Option<Cid> {
575		match self.mapping.read().unwrap().get(key) {
576			Some(cid) => Some(cid),
577			None => {
578				if let Some(parent) = &self.parent {
579					parent.read().unwrap().get(key)
580				} else {
581					None
582				}
583			},
584		}
585	}
586
587	pub async fn remove(&self, key: &Cid) -> Option<Cid> {
588		let mut result = self.mapping.write().unwrap().remove(key);
589		if let Some(parent) = &self.parent {
590			let parent_result = parent.write().unwrap().remove(key);
591			if result.is_none() {
592				result = parent_result;
593			}
594		}
595		result
596	}
597
598	/// Map multiple Cids into an Map.
599	pub async fn get_mapping(&self, keys: impl IntoIterator<Item = Cid>) -> BTreeMap<Cid, Option<Cid>> {
600		let mapping = self.mapping.read().unwrap();
601		let parent = self.parent.as_ref().map(|parent| parent.read().unwrap());
602		keys.into_iter()
603			.map(|key| {
604				let value = match mapping.get(&key) {
605					Some(cid) => Some(cid),
606					None => {
607						if let Some(parent) = &parent {
608							parent.get(&key)
609						} else {
610							None
611						}
612					},
613				};
614				(key, value)
615			})
616			.collect()
617	}
618
619	pub async fn get_first_by_value(&self, key: &Cid) -> Option<Cid> {
620		match self.mapping.read().unwrap().get_first_by_value(key) {
621			Some(cid) => Some(cid),
622			None => {
623				if let Some(parent) = &self.parent {
624					parent.read().unwrap().get_first_by_value(key)
625				} else {
626					None
627				}
628			},
629		}
630	}
631
632	pub async fn insert(&self, key: Cid, value: Cid) -> Option<Cid> {
633		self.mapping.write().unwrap().insert(key, value)
634	}
635
636	pub async fn extend(&self, items: impl IntoIterator<Item = MappedCid>) {
637		self.mapping
638			.write()
639			.unwrap()
640			.extend(items.into_iter().map(|MappedCid(internal, external)| (internal, external)));
641	}
642
643	pub async fn to_blocks<S>(
644		&self,
645		serializer: S,
646		options: WriteOptions,
647	) -> Result<(Option<Cid>, Vec<Block>), StorageError>
648	where
649		S: NodeSerializer<Node<(Cid, Cid)>, (Cid, Cid)>,
650	{
651		// copy items
652		let mapping = {
653			let mut map = self.mapping.read().unwrap().clone();
654			if let Some(parent) = &self.parent {
655				let mut parent_map = parent.read().unwrap().clone();
656				map.append(&mut parent_map);
657			};
658			map
659		};
660
661		// blocks
662		mapping.to_blocks(serializer, options)
663	}
664}
665#[async_trait]
666impl BlockStorageContentMapping for EncryptedBlockStorageMapping {
667	async fn is_content_mapped(&self) -> bool {
668		true
669	}
670
671	/// Convert the mapped [`Cid`] to an plain storage [`Cid`].
672	async fn to_plain(&self, mapped: &Cid) -> Option<Cid> {
673		self.get(mapped).await
674	}
675
676	/// Convert the plain storage [`Cid`] to a mapped [`Cid`].
677	async fn to_mapped(&self, plain: &Cid) -> Option<Cid> {
678		self.get_first_by_value(plain).await
679	}
680
681	async fn insert_mappings(&self, _mappings: BTreeSet<MappedCid>) {
682		unimplemented!("use storage directly");
683	}
684}
685
/// Error type of [`BlockMapping::from_cids`].
#[derive(Debug, thiserror::Error)]
pub enum BlockMappingError {
	/// Reading a block from the next storage failed.
	#[error("Storage Error")]
	Storage(#[from] StorageError),

	/// Converting or decrypting an encrypted block failed.
	#[error("Algorithm Error")]
	Algorithm(#[from] AlgorithmError),
}
694impl From<BlockMappingError> for StorageError {
695	fn from(val: BlockMappingError) -> Self {
696		match val {
697			BlockMappingError::Storage(e) => e,
698			BlockMappingError::Algorithm(e) => match e {
699				AlgorithmError::Cipher => StorageError::InvalidArgument(e.into()), /* likely wrong key supplied for */
700				// given CID.
701				AlgorithmError::InvalidArguments(_) => StorageError::InvalidArgument(e.into()),
702				AlgorithmError::Decoding => StorageError::Internal(e.into()),
703				AlgorithmError::Encoding => StorageError::Internal(e.into()),
704				AlgorithmError::Size => StorageError::Internal(e.into()),
705			},
706		}
707	}
708}
709
/// Serializable block mapping.
/// This is used to store the mapping itself as blocks (see [`BlockMapping::to_blocks`]).
#[derive(Clone, Debug)]
pub struct BlockMapping {
	/// Mapping from mapped/internal to plain/external.
	///
	/// For [`EncryptedBlockStorage`] the key is the unencrypted block CID and the value the encrypted CID.
	map: BTreeMap<Cid, Cid>,
}
717impl BlockMapping {
718	pub fn new() -> Self {
719		Self { map: BTreeMap::new() }
720	}
721
722	pub fn get(&self, key: &Cid) -> Option<Cid> {
723		self.map.get(key).cloned()
724	}
725
726	pub fn insert(&mut self, key: Cid, value: Cid) -> Option<Cid> {
727		self.map.insert(key, value)
728	}
729
730	pub fn extend(&mut self, items: impl IntoIterator<Item = (Cid, Cid)>) {
731		self.map.extend(items);
732	}
733
734	pub fn append(&mut self, other: &mut BlockMapping) {
735		self.map.append(&mut other.map);
736	}
737
738	pub fn remove(&mut self, key: &Cid) -> Option<Cid> {
739		self.map.remove(key)
740	}
741
742	pub fn get_first_by_value(&self, value: &Cid) -> Option<Cid> {
743		self.map.iter().find_map(|(k, v)| {
744			if v == value {
745				return Some(*k);
746			}
747			None
748		})
749	}
750
751	pub fn clear(&mut self) {
752		self.map.clear();
753	}
754
755	pub fn iter(&self) -> impl Iterator<Item = (&Cid, &Cid)> {
756		self.map.iter()
757	}
758
759	pub fn into_iter(self) -> impl Iterator<Item = (Cid, Cid)> {
760		self.map.into_iter()
761	}
762
763	pub async fn from_cids<S>(
764		storage: &EncryptedBlockStorage<S>,
765		cids: impl Iterator<Item = Cid>,
766	) -> Result<Self, BlockMappingError>
767	where
768		S: BlockStorage,
769	{
770		let mut mapping = BlockMapping::new();
771		for cid in cids {
772			if cid.codec() == BLOCK_MULTICODEC {
773				let encrypted_block: EncryptedBlock = storage.next.get(&cid).await?.try_into()?;
774				let block = encrypted_block.block(&storage.key)?;
775				mapping.insert(block.cid, cid);
776			}
777		}
778		Ok(mapping)
779	}
780
781	/// Read block mappings from `cid` via an block storage.
782	/// Idempotency: Yes
783	pub async fn read_mappings<S>(
784		&mut self,
785		storage: &EncryptedBlockStorage<S>,
786		cid: &Cid,
787	) -> Result<usize, StorageError>
788	where
789		S: BlockStorage + Clone + Send + Sync + 'static,
790	{
791		let mut count = 0;
792		let mut tasks = FuturesOrdered::new();
793
794		// first
795		let read = |cid: Cid| async move { storage.get_unencrypted(&cid).await };
796		tasks.push_back(read(*cid));
797
798		// work
799		while let Some(block) = tasks.next().await {
800			let block = block?;
801
802			// validate
803			MultiCodec::with_cbor(block.cid())?;
804
805			// get node
806			let node: Node<(Cid, Cid)> =
807				from_cbor(block.data()).map_err(|e| StorageError::InvalidArgument(e.into()))?;
808
809			// read
810			match node {
811				Node::Node(links) => {
812					for link in links {
813						tasks.push_back(read(link.into()));
814					}
815				},
816				Node::Leaf(entries) => {
817					for (key, value) in entries.into_iter() {
818						self.insert(key, value);
819						count += 1;
820					}
821				},
822			}
823		}
824
825		// result
826		Ok(count)
827	}
828
829	/// Encode mapping into blocks.
830	///
831	/// Returns the root cid and all blocks.
832	pub fn to_blocks<S>(&self, serializer: S, options: WriteOptions) -> Result<(Option<Cid>, Vec<Block>), StorageError>
833	where
834		S: NodeSerializer<Node<(Cid, Cid)>, (Cid, Cid)>,
835	{
836		// blocks
837		let mut builder = NodeBuilder::<(Cid, Cid), Node<(Cid, Cid)>, S>::new(
838			options.max_block_size,
839			options.max_children,
840			serializer,
841		);
842		for (key, value) in self.map.iter() {
843			builder.push((*key, *value)).map_err(|e| StorageError::Internal(e.into()))?;
844		}
845		let (root, blocks) = builder.into_blocks().map_err(|e| StorageError::Internal(e.into()))?;
846
847		// result
848		Ok((*root.cid(), blocks))
849	}
850}
851impl Default for BlockMapping {
852	fn default() -> Self {
853		Self::new()
854	}
855}
856
/// Node serializer that encrypts every serialized node block (used to store the mapping tree).
struct EncryptedNodeSerializer {
	/// Encryption key.
	key: Secret,
	/// Encryption algorithm.
	algorithm: Algorithm,
}
862impl<T> NodeSerializer<Node<T>, T> for EncryptedNodeSerializer
863where
864	T: Clone + Serialize,
865{
866	fn nodes(&mut self, nodes: Vec<Link<Node<T>>>) -> Result<Node<T>, NodeBuilderError> {
867		Ok(Node::Node(nodes))
868	}
869
870	fn leaf(&mut self, entries: Vec<T>) -> Result<Node<T>, NodeBuilderError> {
871		Ok(Node::Leaf(entries))
872	}
873
874	fn serialize(&mut self, max_block_size: usize, node: Node<T>) -> Result<Block, NodeBuilderError> {
875		let block: Block = DefaultNodeSerializer::new().serialize(max_block_size, node)?;
876		let encrypted = EncryptedBlock::encrypt(self.algorithm, &self.key, block)?;
877		let encrypted_block: Block = encrypted.try_into()?;
878		Ok(encrypted_block)
879	}
880}
881impl From<AlgorithmError> for NodeBuilderError {
882	fn from(err: AlgorithmError) -> Self {
883		NodeBuilderError::Encoding(err.into())
884	}
885}
886
/// Options for encoding a [`BlockMapping`] into blocks.
pub struct WriteOptions {
	/// Max byte size for each block.
	max_block_size: usize,

	/// Max children for each block.
	max_children: usize,
}
impl WriteOptions {
	/// Default maximum number of children per node.
	const DEFAULT_MAX_CHILDREN: usize = 174;

	/// Options with the given block size limit and the default number of children per node.
	fn new(max_block_size: usize) -> Self {
		Self { max_children: Self::DEFAULT_MAX_CHILDREN, max_block_size }
	}
}
899
#[cfg(test)]
mod tests {
	use crate::{
		crypto::{
			block::{Algorithm, BLOCK_MULTICODEC},
			secret::Secret,
		},
		BlockStorage, EncryptedBlockStorage, MemoryBlockStorage,
	};
	use cid::Cid;
	use co_primitives::{BlockSerializer, DefaultParams, StorageError, StoreParams};
	use serde::{Deserialize, Serialize};
	use std::iter::repeat_n;

	#[derive(Debug, Serialize, Deserialize)]
	struct Test {
		hello: String,
	}

	/// Deterministic test key matching the key size of `algorithm`.
	fn test_key(algorithm: Algorithm) -> Secret {
		Secret::new(repeat_n(42, algorithm.key_size()).collect())
	}

	#[tokio::test]
	async fn roundtrip() {
		let memory = MemoryBlockStorage::default();
		let algorithm = Algorithm::default();
		let encryption = EncryptedBlockStorage::new(memory.clone(), test_key(algorithm), algorithm, Default::default());

		let block = BlockSerializer::default().serialize(&Test { hello: "world".to_owned() }).unwrap();

		// set returns the plain cid
		let stored = encryption.set(block.clone()).await.unwrap();
		assert_eq!(&stored, block.cid());

		// get returns the original block
		assert_eq!(encryption.get(block.cid()).await.unwrap(), block);

		// the plain cid must not exist in the underlying storage
		assert!(matches!(memory.get(block.cid()).await, Err(StorageError::NotFound(_, _))));
	}

	#[tokio::test]
	async fn store_mapping() {
		let memory = MemoryBlockStorage::new();
		let algorithm = Algorithm::default();
		let key = test_key(algorithm);
		let encryption = EncryptedBlockStorage::new(memory.clone(), key.clone(), algorithm, Default::default());

		// store 1024 distinct blocks
		let mut cids: Vec<Cid> = Vec::with_capacity(1024);
		for i in 0..1024 {
			let block = BlockSerializer::default().serialize(&Test { hello: format!("Hi {i}!") }).unwrap();
			cids.push(*block.cid());
			encryption.set(block).await.unwrap();
		}

		// the flushed mapping root is encrypted
		let mapping_cid = encryption.flush_mapping().await.unwrap().expect("Mappings if we have items");
		assert_eq!(mapping_cid.codec(), BLOCK_MULTICODEC);

		// underlying storage: 7 (merkle) mapping blocks + 1024 data blocks, all encrypted and within size
		let memory_cids: Vec<Cid> = memory.entries().await.map(|block| *block.cid()).collect();
		assert_eq!(memory_cids.len(), 7 + 1024);
		for memory_cid in &memory_cids {
			assert_eq!(memory_cid.codec(), BLOCK_MULTICODEC);
			let memory_block = memory.get(memory_cid).await.unwrap();
			assert!(DefaultParams::MAX_BLOCK_SIZE > memory_block.data().len());
		}

		// a fresh storage can read all blocks again after loading the mapping
		let reloaded = EncryptedBlockStorage::new(memory, key, algorithm, Default::default());
		reloaded.load_mapping(&mapping_cid).await.unwrap();
		for cid in &cids {
			reloaded.get(cid).await.unwrap();
		}
	}
}