Skip to main content

co_core_storage/
lib.rs

1// SPDX-License-Identifier: AGPL-3.0-only
2// Copyright (C) 2026 1io BRANDGUARDIAN GmbH
3
4use anyhow::anyhow;
5use co_api::{
6	co, BlockStorage, BlockStorageExt, CoList, CoListTransaction, CoMap, CoMapTransaction, CoSet, CoreBlockStorage,
7	IsDefault, LazyTransaction, Link, OptionLink, Reducer, ReducerAction, StorageError, Tags, WeakCid,
8};
9use futures::{pin_mut, FutureExt, TryStreamExt};
10use std::collections::{BTreeMap, BTreeSet};
11
/// Persistent state of the storage core: named pins, per-block metadata and two bookkeeping indexes.
///
/// Every field is omitted from the serialized form while it is empty and defaults when missing.
#[co(state)]
pub struct Storage {
	/// Named pins, keyed by pin name.
	#[serde(rename = "p", default, skip_serializing_if = "CoMap::is_empty")]
	pub pins: CoMap<String, Pin>,

	/// Block metadata, keyed by the block's [`WeakCid`].
	#[serde(rename = "b", default, skip_serializing_if = "CoMap::is_empty")]
	pub blocks: CoMap<WeakCid, BlockMetadata>,

	/// Index of block metadata entries that are unreferenced (reference count of zero and children resolved).
	/// The value is the [`BlockInfo`] of the operation that made the entry removable.
	/// See: [`BlockMetadata::is_removable`]
	#[serde(rename = "bu", default, skip_serializing_if = "CoMap::is_empty")]
	pub blocks_index_unreferenced: CoMap<WeakCid, BlockInfo>,

	/// Blocks that have been recursively added but whose children are still pending.
	/// Blocks that have been recursively deleted but whose children have not been unreferenced yet.
	// NOTE(review): `BlockStructurePending` currently only has a `Reference` variant — confirm whether the
	// "recursively deleted" case is still tracked here.
	#[serde(rename = "bs", default, skip_serializing_if = "CoMap::is_empty")]
	pub block_structure_pending: CoMap<WeakCid, BlockStructurePending>,
}
32
/// Reason why a block is listed in [`Storage::block_structure_pending`].
#[co]
pub enum BlockStructurePending {
	/// Recursively added, but the children have not been referenced yet.
	/// Carries the [`BlockInfo`] of the operation that created the entry.
	Reference(BlockInfo),
}
38impl BlockStructurePending {
39	pub fn info(&self) -> &BlockInfo {
40		match self {
41			BlockStructurePending::Reference(block_info) => block_info,
42		}
43	}
44}
45
/// Whether the children of a referenced block have been referenced as well.
#[co]
#[derive(Default)]
pub enum ReferenceMode {
	/// Reference is shallow. Children are not referenced yet.
	#[default]
	#[serde(rename = "s")]
	Shallow,

	/// All direct children have been referenced by this reference.
	#[serde(rename = "r")]
	Recursive,
}
58impl ReferenceMode {
59	pub fn is_recursive(&self) -> bool {
60		match self {
61			ReferenceMode::Shallow => false,
62			ReferenceMode::Recursive => true,
63		}
64	}
65}
66
/// Causality data attached to reference operations: which pins triggered them and the kind of block.
#[co]
pub struct BlockInfo {
	/// Pinning keys that reference this block.
	#[serde(rename = "p", default, skip_serializing_if = "CoSet::is_empty")]
	pub pins: CoSet<String>,

	/// Type of the block. [`BlockType::Root`] marks a root reference; omitted from the serialized form
	/// while it is [`BlockType::Unknown`].
	#[serde(rename = "t", default, skip_serializing_if = "BlockType::is_unknown")]
	pub block_type: BlockType,
}
77impl BlockInfo {
78	pub async fn new<S>(storage: &S, pin: String, block_type: BlockType) -> Result<Self, StorageError>
79	where
80		S: BlockStorage + Clone + 'static,
81	{
82		let mut pins = CoSet::default();
83		pins.insert(storage, pin).await?;
84		Ok(Self { pins, block_type })
85	}
86
87	pub fn with_block_type(mut self, block_type: BlockType) -> Self {
88		self.block_type = block_type;
89		self
90	}
91}
92
/// Kind of a block as recorded in [`BlockInfo::block_type`].
#[co]
#[derive(Default)]
pub enum BlockType {
	/// No specific type is known for the block.
	#[default]
	Unknown,

	/// Block type will be set to root if it is caused by a pin operation (create/remove).
	Root,
}
102impl BlockType {
103	pub fn is_unknown(&self) -> bool {
104		matches!(self, BlockType::Unknown)
105	}
106
107	pub fn is_root(&self) -> bool {
108		matches!(self, BlockType::Root)
109	}
110}
111
/// Bookkeeping stored per block: reference count, reference mode and free-form tags.
#[co]
#[derive(Default)]
pub struct BlockMetadata {
	/// Current reference count on this node.
	#[serde(rename = "r")]
	pub references: u32,

	/// Reference mode. Omitted from the serialized form while it is the default ([`ReferenceMode::Shallow`]).
	#[serde(rename = "m", default, skip_serializing_if = "IsDefault::is_default")]
	pub mode: ReferenceMode,

	/// Additional metadata. Omitted from the serialized form while empty.
	#[serde(rename = "t", default, skip_serializing_if = "Tags::is_empty")]
	pub tags: Tags,
}
127impl BlockMetadata {
128	pub fn is_removable(&self) -> bool {
129		self.references == 0 && self.mode.is_recursive()
130	}
131}
132
/// A named pin: an ordered list of pinned blocks plus the strategy that limits it.
#[co]
#[derive(Default)]
pub struct Pin {
	/// Free strategy.
	#[serde(rename = "s")]
	pub strategy: PinStrategy,

	/// Pinned references.
	/// Sorted by insertion (oldest is first).
	/// Every pinned item will automatically maintain a reference count.
	#[serde(rename = "r", default, skip_serializing_if = "CoList::is_empty")]
	pub references: CoList<WeakCid>,

	/// Pinned references count.
	/// Incremented for every reference pushed to the pin and decremented when the pin strategy drops one.
	#[serde(rename = "c")]
	pub references_count: u32,
}
150
/// Strategy that decides how many references a [`Pin`] may keep.
#[co]
#[derive(Default)]
pub enum PinStrategy {
	/// Unlimited pins.
	#[default]
	#[serde(rename = "u")]
	Unlimited,

	/// Maximum count of references.
	/// When the pin holds more, the oldest references are dropped first.
	#[serde(rename = "h")]
	MaxCount(u32),
}
163
/// A list of references.
/// A single [`WeakCid`] is stored once; repeated inserts increase its reference count
/// ([`BlockMetadata::references`]) instead of adding another entry.
/// Serialized transparently as the inner list.
#[co]
#[derive(Default)]
#[serde(transparent)]
pub struct References(Vec<(WeakCid, BlockMetadata)>);
170impl References {
171	pub fn new() -> Self {
172		Self::default()
173	}
174
175	pub fn insert(&mut self, reference: impl Into<WeakCid>) {
176		let reference = reference.into();
177		let (_, block) = match self.0.iter_mut().find(|(cid, _block)| *cid == reference) {
178			Some(block) => block,
179			None => {
180				self.0.push((reference, Default::default()));
181				// SAFETY: Just created.
182				self.0.last_mut().unwrap()
183			},
184		};
185		block.references += 1;
186	}
187
188	pub fn insert_with_tags(&mut self, reference: impl Into<WeakCid>, tags: Tags) {
189		let reference = reference.into();
190		let (_, block) = match self.0.iter_mut().find(|(cid, _block)| *cid == reference) {
191			Some(block) => block,
192			None => {
193				self.0.push((reference, Default::default()));
194				// SAFETY: Just created.
195				self.0.last_mut().unwrap()
196			},
197		};
198		block.references += 1;
199		block.tags.extend(tags);
200	}
201
202	pub fn extend(&mut self, references: impl IntoIterator<Item = WeakCid>) {
203		for item in references.into_iter() {
204			self.insert(item);
205		}
206	}
207
208	pub fn extend_with_tags(&mut self, references: impl IntoIterator<Item = WeakCid>, tags: Tags) {
209		for item in references.into_iter() {
210			self.insert_with_tags(item, tags.clone());
211		}
212	}
213
214	pub fn len(&self) -> usize {
215		self.0.len()
216	}
217
218	pub fn is_empty(&self) -> bool {
219		self.0.is_empty()
220	}
221
222	pub fn iter(&self) -> impl Iterator<Item = WeakCid> + use<'_> {
223		self.0.iter().map(|(cid, _)| *cid)
224	}
225
226	pub fn iter_with_tags(&self) -> impl Iterator<Item = (WeakCid, &'_ Tags)> + use<'_> {
227		self.0.iter().map(|(cid, tags)| (*cid, &tags.tags))
228	}
229}
230impl FromIterator<WeakCid> for References {
231	fn from_iter<T: IntoIterator<Item = WeakCid>>(iter: T) -> Self {
232		let mut result = Self::default();
233		result.extend(iter);
234		result
235	}
236}
237impl<const N: usize> From<[WeakCid; N]> for References {
238	fn from(arr: [WeakCid; N]) -> Self {
239		if N == 0 {
240			return Self::default();
241		}
242		Self::from_iter(arr)
243	}
244}
245
/// Actions understood by the storage reducer.
#[co]
pub enum StorageAction {
	/// Increase the reference count of each listed block by the count carried in [`References`].
	/// References are created on-the-fly if they do not exist.
	/// Shallow: links of the blocks are not added automatically (not recursive).
	///
	/// # Arguments
	/// - `0`: BlockInfo contains causality data about the reference.
	/// - `1`: The blocks to reference. Their tags will be merged with [`BlockMetadata::tags`].
	#[serde(rename = "r")]
	Reference(BlockInfo, References),

	/// Decrease the reference count of each listed block by the count carried in [`References`].
	#[serde(rename = "u")]
	Unreference(BlockInfo, References),

	/// Structurally reference/delete blocks.
	/// Expects all children references to be passed for a parent even if they do not exist on disk.
	///
	/// # Map Arguments
	/// - key: The parent reference.
	/// - value: The links of the parent reference.
	#[serde(rename = "s")]
	Structure(#[serde(with = "co_api::serde_map_as_list")] BTreeMap<WeakCid, References>),

	/// Create references with a ref count of zero if the reference does not exist yet.
	/// This is normally used to track newly created blocks.
	///
	/// # Arguments
	/// - `0`: The BlockInfo recorded for the created entries.
	/// - `1`: The entries to create.
	#[serde(rename = "c")]
	ReferenceCreate(BlockInfo, References),

	/// Mark blocks to remove: decrease their reference count and eventually schedule them to delete.
	///
	/// # Note
	/// This is basically the same as Unreference.
	///
	/// # Arguments
	/// - `0`: The BlockInfo of the blocks to remove.
	/// - `1`: The entries to remove.
	/// - `2`: Force remove all instances (reference count is set to zero instead of decreased by one).
	#[serde(rename = "ud")]
	Remove(BlockInfo, BTreeSet<WeakCid>, bool),

	/// Delete references.
	///
	/// # Arguments
	/// - `0`: The BlockInfo used when unreferencing the children.
	/// - `1`: The entries to remove, each with all of its direct children.
	/// - `2`: Force delete. If false only references with a zero ref count will be removed.
	#[serde(rename = "d")]
	Delete(BlockInfo, BTreeMap<WeakCid, BTreeSet<WeakCid>>, bool),

	/// Append tags to references.
	#[serde(rename = "ti")]
	TagsInsert(BTreeSet<WeakCid>, Tags),

	/// Remove tags from references.
	#[serde(rename = "tr")]
	TagsRemove(BTreeSet<WeakCid>, Tags),

	/// Create a named pin and reference all specified blocks.
	/// Fails if a pin with the same name already exists.
	#[serde(rename = "pc")]
	PinCreate(String, PinStrategy, References),

	/// Update a named pin by setting the [`PinStrategy`].
	/// The new strategy is enforced immediately.
	#[serde(rename = "pu")]
	PinUpdate(String, PinStrategy),

	/// Insert references to a named pin and reference all specified blocks.
	#[serde(rename = "pr")]
	PinReference(String, References),

	/// Remove a named pin and unreference all of its blocks.
	#[serde(rename = "pd")]
	PinRemove(String),

	/// Batch process actions, in list order.
	#[serde(rename = "b")]
	Batch(CoList<StorageAction>),
}
329impl Storage {
330	/// Create inital state.
331	pub async fn initial_state<S: BlockStorage + Clone + 'static>(
332		storage: &S,
333		actions: Vec<StorageAction>,
334	) -> Result<OptionLink<Self>, anyhow::Error> {
335		let mut state = Storage::default();
336		let mut transaction = StorageTransaction::open(storage.clone(), &state).await?;
337		for action in actions {
338			reduce(&mut transaction, action).await?;
339		}
340		transaction.store(&mut state).await?;
341		Ok(storage.set_value(&state).await?.into())
342	}
343}
344impl Reducer<StorageAction> for Storage {
345	async fn reduce(
346		state: OptionLink<Self>,
347		event: Link<ReducerAction<StorageAction>>,
348		storage: &CoreBlockStorage,
349	) -> Result<Link<Self>, anyhow::Error> {
350		let event = storage.get_value(&event).await?;
351		let mut state = storage.get_value_or_default(&state).await?;
352		let mut transaction = StorageTransaction::open(storage.clone(), &state).await?;
353		reduce(&mut transaction, event.payload).await?;
354		transaction.store(&mut state).await?;
355		Ok(storage.set_value(&state).await?)
356	}
357}
358
/// Working set used while reducing: one lazily opened transaction per [`Storage`] collection,
/// each paired with a flag recording whether a mutable accessor has been requested.
struct StorageTransaction<S>
where
	S: BlockStorage + Clone + 'static,
{
	/// Block storage the transactions operate on.
	storage: S,
	/// Set once [`Self::pins_mut`] has been called.
	pins_changed: bool,
	pins: LazyTransaction<S, CoMap<String, Pin>>,
	/// Set once [`Self::blocks_mut`] has been called.
	blocks_changed: bool,
	blocks: LazyTransaction<S, CoMap<WeakCid, BlockMetadata>>,
	/// Set once [`Self::blocks_index_unreferenced_mut`] has been called.
	blocks_index_unreferenced_changed: bool,
	blocks_index_unreferenced: LazyTransaction<S, CoMap<WeakCid, BlockInfo>>,
	/// Set once [`Self::block_structure_pending_mut`] has been called.
	block_structure_pending_changed: bool,
	block_structure_pending: LazyTransaction<S, CoMap<WeakCid, BlockStructurePending>>,
}
373impl<S> StorageTransaction<S>
374where
375	S: BlockStorage + Clone + 'static,
376{
377	async fn open(storage: S, state: &Storage) -> Result<Self, anyhow::Error> {
378		Ok(Self {
379			pins_changed: false,
380			pins: state.pins.open_lazy(&storage).await?,
381			blocks_changed: false,
382			blocks: state.blocks.open_lazy(&storage).await?,
383			blocks_index_unreferenced_changed: false,
384			blocks_index_unreferenced: state.blocks_index_unreferenced.open_lazy(&storage).await?,
385			block_structure_pending_changed: false,
386			block_structure_pending: state.block_structure_pending.open_lazy(&storage).await?,
387			storage,
388		})
389	}
390
391	async fn store(&mut self, state: &mut Storage) -> Result<(), anyhow::Error> {
392		if let Some(pins) = self.pins.opt_mut() {
393			if self.pins_changed {
394				state.pins = pins.store().await?;
395				self.pins_changed = false;
396			}
397		}
398		if let Some(blocks) = self.blocks.opt_mut() {
399			if self.blocks_changed {
400				state.blocks = blocks.store().await?;
401				self.blocks_changed = false;
402			}
403		}
404		if let Some(blocks_index_unreferenced) = self.blocks_index_unreferenced.opt_mut() {
405			if self.blocks_index_unreferenced_changed {
406				state.blocks_index_unreferenced = blocks_index_unreferenced.store().await?;
407				self.blocks_index_unreferenced_changed = false;
408			}
409		}
410		if let Some(block_structure_pending) = self.block_structure_pending.opt_mut() {
411			if self.block_structure_pending_changed {
412				state.block_structure_pending = block_structure_pending.store().await?;
413				self.block_structure_pending_changed = false;
414			}
415		}
416		Ok(())
417	}
418
419	fn storage(&self) -> &S {
420		&self.storage
421	}
422
423	async fn pins(&mut self) -> Result<&CoMapTransaction<S, String, Pin>, StorageError> {
424		self.pins.get().await
425	}
426
427	async fn pins_mut(&mut self) -> Result<&mut CoMapTransaction<S, String, Pin>, StorageError> {
428		self.pins_changed = true;
429		self.pins.get_mut().await
430	}
431
432	async fn blocks(&mut self) -> Result<&CoMapTransaction<S, WeakCid, BlockMetadata>, StorageError> {
433		self.blocks.get().await
434	}
435
436	async fn blocks_mut(&mut self) -> Result<&mut CoMapTransaction<S, WeakCid, BlockMetadata>, StorageError> {
437		self.blocks_changed = true;
438		self.blocks.get_mut().await
439	}
440
441	// async fn blocks_index_unreferenced(&mut self) -> Result<&CoSetTransaction<S, WeakCid>, StorageError> {
442	// 	blocks_index_unreferenced.get().await
443	// }
444
445	async fn blocks_index_unreferenced_mut(
446		&mut self,
447	) -> Result<&mut CoMapTransaction<S, WeakCid, BlockInfo>, StorageError> {
448		self.blocks_index_unreferenced_changed = true;
449		self.blocks_index_unreferenced.get_mut().await
450	}
451
452	async fn block_structure_pending_mut(
453		&mut self,
454	) -> Result<&mut CoMapTransaction<S, WeakCid, BlockStructurePending>, StorageError> {
455		self.block_structure_pending_changed = true;
456		self.block_structure_pending.get_mut().await
457	}
458}
459
460async fn reduce<S>(transaction: &mut StorageTransaction<S>, action: StorageAction) -> Result<(), anyhow::Error>
461where
462	S: BlockStorage + Clone + 'static,
463{
464	match action {
465		StorageAction::Reference(info, references) => reduce_reference(transaction, info, references).boxed().await?,
466		StorageAction::Unreference(info, references) => {
467			reduce_unreference(transaction, info, references).boxed().await?
468		},
469		StorageAction::Structure(structure) => reduce_structure(transaction, structure).boxed().await?,
470		StorageAction::ReferenceCreate(info, references) => {
471			reduce_reference_create(transaction, info, references).boxed().await?
472		},
473		StorageAction::Remove(info, cids, zero) => reduce_remove(transaction, cids, zero, info).boxed().await?,
474		StorageAction::Delete(info, cids, force) => reduce_delete(transaction, cids, force, info).boxed().await?,
475		StorageAction::TagsInsert(cids, tags) => reduce_tags_insert(transaction, cids, tags).boxed().await?,
476		StorageAction::TagsRemove(cids, tags) => reduce_tags_remove(transaction, cids, tags).boxed().await?,
477		StorageAction::PinCreate(key, strategy, references) => {
478			reduce_pin_create(transaction, key, strategy, references).boxed().await?
479		},
480		StorageAction::PinUpdate(key, strategy) => reduce_pin_update(transaction, key, strategy).boxed().await?,
481		StorageAction::PinReference(key, references) => {
482			reduce_pin_reference(transaction, key, references).boxed().await?
483		},
484		StorageAction::PinRemove(key) => reduce_pin_remove(transaction, key).boxed().await?,
485		StorageAction::Batch(actions) => {
486			let actions_stream = actions.stream(transaction.storage());
487			pin_mut!(actions_stream);
488			while let Some((_, action)) = actions_stream.try_next().await? {
489				Box::pin(reduce(transaction, action)).await?;
490			}
491		},
492	}
493	Ok(())
494}
495
496/// See: [`StorageAction::ReferenceStructure`]
497async fn reduce_structure<S>(
498	transaction: &mut StorageTransaction<S>,
499	structure: BTreeMap<WeakCid, References>,
500) -> Result<(), anyhow::Error>
501where
502	S: BlockStorage + Clone + 'static,
503{
504	for (parent, children) in structure {
505		reference_structure_cid(transaction, parent, children).await?;
506	}
507	Ok(())
508}
509
510/// Reference/Unreference a children of a recursive reference.
511/// When this gets called for Unreference the `parent` block already has been deleted.
512async fn reference_structure_cid<S>(
513	transaction: &mut StorageTransaction<S>,
514	parent: WeakCid,
515	children: References,
516) -> Result<(), anyhow::Error>
517where
518	S: BlockStorage + Clone + 'static,
519{
520	// remove pending flag and ignore if not pending
521	let pending = match transaction.block_structure_pending_mut().await?.remove(parent).await? {
522		Some(info) => info,
523		None => {
524			return Ok(());
525		},
526	};
527	match pending {
528		BlockStructurePending::Reference(info) => {
529			// get block
530			let mut block = transaction
531				.blocks()
532				.await?
533				.get(&parent)
534				.await?
535				.ok_or(anyhow!("Reference not found: {:?}", parent))?;
536
537			// reference children
538			let info = info.clone().with_block_type(BlockType::Unknown);
539			for (child_cid, child_tags) in children.0.into_iter() {
540				reference_cid(transaction, &info, child_cid, child_tags.references, &child_tags.tags).await?;
541			}
542
543			// mode
544			block.mode = ReferenceMode::Recursive;
545
546			// remove
547			if block.is_removable() {
548				transaction
549					.blocks_index_unreferenced_mut()
550					.await?
551					.insert(parent, info.clone())
552					.await?;
553			}
554
555			// store
556			transaction.blocks_mut().await?.insert(parent, block).await?;
557		},
558	}
559	Ok(())
560}
561
562async fn reduce_pin_remove<S>(transaction: &mut StorageTransaction<S>, key: String) -> Result<(), anyhow::Error>
563where
564	S: BlockStorage + Clone + 'static,
565{
566	// pin
567	let pin = transaction
568		.pins_mut()
569		.await?
570		.remove(key.clone())
571		.await?
572		.ok_or(anyhow!("Pin not found: {}", key))?;
573	let info = BlockInfo::new(transaction.storage(), key.clone(), BlockType::Root).await?;
574
575	// references
576	let cids = pin.references.stream(transaction.storage()).map_ok(|(_key, value)| value);
577	pin_mut!(cids);
578	while let Some(reference) = cids.try_next().await? {
579		unreference_cid(transaction, &info, reference, Unreference::ByOne).await?;
580	}
581
582	// result
583	Ok(())
584}
585
586async fn reduce_pin_reference<S>(
587	transaction: &mut StorageTransaction<S>,
588	key: String,
589	references: References,
590) -> Result<(), anyhow::Error>
591where
592	S: BlockStorage + Clone + 'static,
593{
594	// apply
595	pin_reference(transaction, key, references).await?;
596
597	// result
598	Ok(())
599}
600
601async fn pin_reference<S>(
602	transaction: &mut StorageTransaction<S>,
603	key: String,
604	references: References,
605) -> Result<(), anyhow::Error>
606where
607	S: BlockStorage + Clone + 'static,
608{
609	let mut pin = transaction
610		.pins()
611		.await?
612		.get(&key)
613		.await?
614		.ok_or(anyhow!("Pin not found: {}", key))?;
615	let mut pin_references = pin.references.open(transaction.storage()).await?;
616	let info = BlockInfo::new(transaction.storage(), key.clone(), BlockType::Root).await?;
617
618	// insert references
619	for (reference, block_metadata) in references.0 {
620		pin_references.push(reference).await?;
621		pin.references_count += 1;
622		reference_cid(transaction, &info, reference, block_metadata.references, &block_metadata.tags).await?;
623	}
624
625	// apply pin strategy
626	apply_pin_strategy(transaction, &mut pin, &mut pin_references, info.clone()).await?;
627
628	// store pin
629	pin.references = pin_references.store().await?;
630	transaction.pins_mut().await?.insert(key, pin).await?;
631
632	Ok(())
633}
634
635/// Apply pin strategy on pin.
636async fn apply_pin_strategy<S>(
637	transaction: &mut StorageTransaction<S>,
638	pin: &mut Pin,
639	references: &mut CoListTransaction<S, WeakCid>,
640	info: BlockInfo,
641) -> Result<bool, anyhow::Error>
642where
643	S: BlockStorage + Clone + 'static,
644{
645	let mut changed = false;
646	match &pin.strategy {
647		PinStrategy::Unlimited => {},
648		PinStrategy::MaxCount(count) => {
649			while pin.references_count > *count {
650				if let Some((_, remove)) = references.pop_front().await? {
651					unreference_cid(transaction, &info, remove, Unreference::ByOne).await?;
652				}
653				pin.references_count -= 1;
654				changed = true;
655			}
656		},
657	}
658	Ok(changed)
659}
660
661async fn reduce_pin_create<S>(
662	transaction: &mut StorageTransaction<S>,
663	key: String,
664	strategy: PinStrategy,
665	references: References,
666) -> Result<(), anyhow::Error>
667where
668	S: BlockStorage + Clone + 'static,
669{
670	// validate
671	if transaction.pins().await?.contains_key(&key).await? {
672		return Err(anyhow::anyhow!("Pin already exists: {}", key));
673	}
674
675	// insert pin
676	let pin = Pin { strategy, references: Default::default(), references_count: 0 };
677	transaction.pins_mut().await?.insert(key.clone(), pin).await?;
678
679	// initial
680	if !references.0.is_empty() {
681		pin_reference(transaction, key, references).await?;
682	}
683
684	// result
685	Ok(())
686}
687
688async fn reduce_pin_update<S>(
689	transaction: &mut StorageTransaction<S>,
690	key: String,
691	strategy: PinStrategy,
692) -> Result<(), anyhow::Error>
693where
694	S: BlockStorage + Clone + 'static,
695{
696	// get
697	let Some(mut pin) = transaction.pins().await?.get(&key).await? else {
698		return Err(anyhow::anyhow!("Pin not exists: {}", key));
699	};
700	let info = BlockInfo::new(transaction.storage(), key.clone(), BlockType::Root).await?;
701
702	// update pin strategy
703	pin.strategy = strategy;
704
705	// enfore pin strategy
706	let mut references = pin.references.open(transaction.storage()).await?;
707	apply_pin_strategy(transaction, &mut pin, &mut references, info).await?;
708
709	// store pin
710	pin.references = references.store().await?;
711	transaction.pins_mut().await?.insert(key, pin).await?;
712
713	// result
714	Ok(())
715}
716
/// See: [`StorageAction::TagsRemove`]
///
/// For every listed block, clears `tags` from the tags of its metadata entry.
// NOTE(review): judging by its name, `try_update_or_insert_async` presumably creates a default entry
// for unknown blocks — confirm that removing tags from a missing block should create one.
async fn reduce_tags_remove<S>(
	transaction: &mut StorageTransaction<S>,
	cids: BTreeSet<WeakCid>,
	tags: Tags,
) -> Result<(), anyhow::Error>
where
	S: BlockStorage + Clone + 'static,
{
	for cid in cids {
		transaction
			.blocks_mut()
			.await?
			.try_update_or_insert_async(cid, |mut block| async {
				// presumably removes only the given tags — verify against `Tags::clear`
				block.tags.clear(Some(&tags));
				Ok(block)
			})
			.await?;
	}
	Ok(())
}
737
/// See: [`StorageAction::TagsInsert`]
///
/// For every listed block, appends a copy of `tags` to the tags of its metadata entry.
// NOTE(review): judging by its name, `try_update_or_insert_async` presumably creates a default entry
// for unknown blocks — confirm.
async fn reduce_tags_insert<S>(
	transaction: &mut StorageTransaction<S>,
	cids: BTreeSet<WeakCid>,
	tags: Tags,
) -> Result<(), anyhow::Error>
where
	S: BlockStorage + Clone + 'static,
{
	for cid in cids {
		transaction
			.blocks_mut()
			.await?
			.try_update_or_insert_async(cid, |mut block| {
				// each block gets its own copy because `append` takes the tags by mutable reference
				let mut tags = tags.clone();
				async move {
					block.tags.append(&mut tags);
					Ok(block)
				}
			})
			.await?;
	}
	Ok(())
}
761
762async fn reduce_remove<S>(
763	transaction: &mut StorageTransaction<S>,
764	cids: impl IntoIterator<Item = WeakCid>,
765	zero: bool,
766	info: BlockInfo,
767) -> Result<(), anyhow::Error>
768where
769	S: BlockStorage + Clone + 'static,
770{
771	for cid in cids {
772		unreference_cid(transaction, &info, cid, if zero { Unreference::ToZero } else { Unreference::ByOne }).await?;
773	}
774	Ok(())
775}
776
777/// Delete block references from storage state.
778/// After this call the parent blocks can be deleted.
779async fn reduce_delete<S>(
780	transaction: &mut StorageTransaction<S>,
781	cids: impl IntoIterator<Item = (WeakCid, BTreeSet<WeakCid>)>,
782	force: bool,
783	info: BlockInfo,
784) -> Result<(), anyhow::Error>
785where
786	S: BlockStorage + Clone + 'static,
787{
788	// remove
789	let info = info.clone().with_block_type(BlockType::Unknown);
790	for (cid, links) in cids {
791		// structure
792		let mut references = References::default();
793		references.extend(links.iter().cloned());
794		reference_structure_cid(transaction, cid, references).await?;
795
796		// remove block
797		let block = match transaction.blocks().await?.get(&cid).await? {
798			Some(block) if (block.references == 0 || force) => transaction.blocks_mut().await?.remove(cid).await?,
799			_ => None,
800		};
801		if let Some(block) = block {
802			// remove from index
803			transaction.blocks_index_unreferenced_mut().await?.remove(cid).await?;
804
805			// unreference links
806			match block.mode {
807				ReferenceMode::Shallow => {},
808				ReferenceMode::Recursive => {
809					for link in links.iter() {
810						unreference_cid(transaction, &info, *link, Unreference::ByOne).await?;
811					}
812				},
813			}
814		}
815	}
816
817	// result
818	Ok(())
819}
820
821async fn reduce_reference<S>(
822	transaction: &mut StorageTransaction<S>,
823	info: BlockInfo,
824	references: References,
825) -> Result<(), anyhow::Error>
826where
827	S: BlockStorage + Clone + 'static,
828{
829	for (reference, data) in references.0.into_iter() {
830		reference_cid(transaction, &info, reference, data.references, &data.tags).await?;
831	}
832	Ok(())
833}
834
835async fn reduce_reference_create<S>(
836	transaction: &mut StorageTransaction<S>,
837	info: BlockInfo,
838	references: References,
839) -> Result<(), anyhow::Error>
840where
841	S: BlockStorage + Clone + 'static,
842{
843	for (reference, block_metadata) in references.0 {
844		if transaction.blocks().await?.get(&reference).await?.is_none() {
845			// we only want to reuse tags here
846			// other data is managed internally by the core
847			let block = BlockMetadata { tags: block_metadata.tags, ..Default::default() };
848
849			// block
850			transaction.blocks_mut().await?.insert(reference, block).await?;
851
852			// shallow
853			transaction
854				.block_structure_pending_mut()
855				.await?
856				.insert(reference, BlockStructurePending::Reference(info.clone()))
857				.await?;
858		}
859	}
860	Ok(())
861}
862
863async fn reference_cid<S>(
864	transaction: &mut StorageTransaction<S>,
865	info: &BlockInfo,
866	cid: WeakCid,
867	references: u32,
868	tags: &Tags,
869) -> Result<(), anyhow::Error>
870where
871	S: BlockStorage + Clone + 'static,
872{
873	let block = transaction.blocks().await?.get(&cid).await?;
874
875	// new block?
876	if let Some(block) = &block {
877		// remove from index as we have references now
878		if block.references == 0 {
879			transaction.blocks_index_unreferenced_mut().await?.remove(cid).await?;
880		}
881	} else {
882		// add to pending as we are about to create the block
883		transaction
884			.block_structure_pending_mut()
885			.await?
886			.insert(cid, BlockStructurePending::Reference(info.clone()))
887			.await?;
888	}
889
890	// increment
891	let mut block = block.unwrap_or_default();
892	block.references += references;
893	block.tags.extend(tags.iter().cloned());
894	transaction.blocks_mut().await?.insert(cid, block).await?;
895
896	// result
897	Ok(())
898}
899
900async fn reduce_unreference<S>(
901	transaction: &mut StorageTransaction<S>,
902	info: BlockInfo,
903	references: References,
904) -> Result<(), anyhow::Error>
905where
906	S: BlockStorage + Clone + 'static,
907{
908	for (reference, data) in references.0.into_iter() {
909		unreference_cid(transaction, &info, reference, Unreference::By(data.references)).await?;
910	}
911	Ok(())
912}
913
/// How far [`unreference_cid`] lowers the reference count of a block.
enum Unreference {
	/// Decrease the count by one.
	ByOne,
	/// Decrease the count by the given amount.
	By(u32),
	/// Set the count to zero.
	ToZero,
	// To(u32),
}
920
921async fn unreference_cid<S>(
922	transaction: &mut StorageTransaction<S>,
923	info: &BlockInfo,
924	cid: WeakCid,
925	reference: Unreference,
926) -> Result<bool, anyhow::Error>
927where
928	S: BlockStorage + Clone + 'static,
929{
930	Ok(match transaction.blocks().await?.get(&cid).await? {
931		Some(mut block) if block.references > 0 => {
932			// decrement
933			match reference {
934				Unreference::ByOne => {
935					block.references -= 1;
936				},
937				Unreference::By(by) => {
938					block.references -= by;
939				},
940				Unreference::ToZero => {
941					block.references = 0;
942				},
943				// Unreference::To(to) => {
944				// 	block.references = to;
945				// },
946			}
947
948			// index
949			if block.is_removable() {
950				transaction
951					.blocks_index_unreferenced_mut()
952					.await?
953					.insert(cid, info.clone())
954					.await?;
955			}
956
957			// store
958			transaction.blocks_mut().await?.insert(cid, block).await?;
959
960			// result
961			true
962		},
963		_ => false,
964	})
965}
966
967#[cfg(test)]
968mod tests {
969	use crate::{PinStrategy, References, Storage, StorageAction};
970	use cid::Cid;
971	use co_api::{BlockSerializer, BlockStorageExt, CoreBlockStorage, OptionLink, Reducer, ReducerAction, WeakCid};
972	use co_storage::MemoryBlockStorage;
973	use futures::TryStreamExt;
974	use ipld_core::{ipld::Ipld, serde::to_ipld};
975	use std::{collections::BTreeMap, str::FromStr};
976
977	#[test]
978	fn test_serialize_storage_action() {
979		let cid1 = *BlockSerializer::default().serialize(&1).unwrap().cid();
980		let cid2 = *BlockSerializer::default().serialize(&2).unwrap().cid();
981		let cid3 = *BlockSerializer::default().serialize(&2).unwrap().cid();
982		let mut map = BTreeMap::<WeakCid, References>::new();
983		map.entry(cid1.into()).or_default().insert(cid2);
984		map.entry(cid1.into()).or_default().insert(cid3);
985
986		// action
987		let action = StorageAction::Structure(map.into_iter().collect());
988		let block = BlockSerializer::default().serialize(&action).unwrap();
989		let action_deserialize: StorageAction = BlockSerializer::default().deserialize(&block).unwrap();
990		assert_eq!(action_deserialize, action);
991
992		// reducer action
993		let reducer_action: ReducerAction<StorageAction> =
994			ReducerAction { core: "storage".to_owned(), from: "test".to_owned(), payload: action.clone(), time: 123 };
995		let block = BlockSerializer::default().serialize(&reducer_action).unwrap();
996		let reducer_action_deserialize: ReducerAction<StorageAction> =
997			BlockSerializer::default().deserialize(&block).unwrap();
998		assert_eq!(reducer_action_deserialize, reducer_action);
999
1000		// reducer action ipld
1001		let reducer_action_ipld: ReducerAction<Ipld> = ReducerAction {
1002			core: "storage".to_owned(),
1003			from: "test".to_owned(),
1004			payload: to_ipld(action).unwrap(),
1005			time: 123,
1006		};
1007		let reducer_action_ipld_deserialize: ReducerAction<Ipld> =
1008			BlockSerializer::default().deserialize(&block).unwrap();
1009
1010		assert_eq!(reducer_action_ipld_deserialize, reducer_action_ipld);
1011	}
1012
1013	/// This is data gatered from storage_cleanup test which failed.
1014	#[tokio::test]
1015	async fn test_blocks_index_unreferenced_is_correct() {
1016		fn cid(s: &str) -> co_api::WeakCid {
1017			Cid::from_str(s).unwrap().into()
1018		}
1019		let storage = CoreBlockStorage::new(MemoryBlockStorage::default(), true);
1020
1021		// actions
1022		let actions = [
1023			ReducerAction {
1024				from: "did:local:device".into(),
1025				time: 0,
1026				core: "storage".into(),
1027				payload: StorageAction::PinCreate("co.local.state".into(), PinStrategy::MaxCount(100), [].into()),
1028			},
1029			ReducerAction {
1030				from: "did:local:device".into(),
1031				time: 0,
1032				core: "storage".into(),
1033				payload: StorageAction::PinCreate("co.local.log".into(), PinStrategy::MaxCount(100), [].into()),
1034			},
1035			ReducerAction {
1036				from: "did:local:device".into(),
1037				time: 0,
1038				core: "storage".into(),
1039				payload: StorageAction::PinReference(
1040					"co.local.state".into(),
1041					[(cid("bagakbqabdyqar5vlsfqd3g4mxngt3yl7nx2na2kb4jybylzn5bktwnihjhih42a"))].into(),
1042				),
1043			},
1044			ReducerAction {
1045				from: "did:local:device".into(),
1046				time: 0,
1047				core: "storage".into(),
1048				payload: StorageAction::Structure(
1049					[
1050						(
1051							(cid("bagakbqabdyqar5vlsfqd3g4mxngt3yl7nx2na2kb4jybylzn5bktwnihjhih42a")),
1052							[
1053								(cid("QmUDCqxH2vm9MBb2mLsGmHsoCMXBBnd4iWDruZdcSGaN7d")),
1054								(cid("QmY8fStJQWVsfY4ae7KzfgeJQKqcXEbp1THut3Uz4aBBP6")),
1055								(cid("QmcS1eGNuBM3a4pf8hw4hEWwdALXEEnimqZBhSBo8aHS7K")),
1056								(cid("bagakbqabdyqcgkbe7hbegknbemf73xlnooct2g35zzrbdkus6z342bir46k5zgq")),
1057							]
1058							.into(),
1059						),
1060						(
1061							(cid("bagakbqabdyqcgkbe7hbegknbemf73xlnooct2g35zzrbdkus6z342bir46k5zgq")),
1062							[(cid("bagakbqabdyqfomt5rhne4gqclpbi7t2emjthzcm4frymppcndo27rxum6tugwoi"))].into(),
1063						),
1064						(
1065							(cid("bagakbqabdyqfomt5rhne4gqclpbi7t2emjthzcm4frymppcndo27rxum6tugwoi")),
1066							[(cid("bagakbqabdyqdyybl3osmbp4ckybdvmwccje5kxa6bhy6yz7p3ftrsngh4r6lg5a"))].into(),
1067						),
1068					]
1069					.into(),
1070				),
1071			},
1072			ReducerAction {
1073				from: "did:local:device".into(),
1074				time: 0,
1075				core: "storage".into(),
1076				payload: StorageAction::PinReference(
1077					"co.local.state".into(),
1078					[(cid("bagakbqabdyqldyp7kxv6p5wb3edrywc74xfkgauqzlumlxncdlzncbwt36y7iby"))].into(),
1079				),
1080			},
1081			ReducerAction {
1082				from: "did:local:device".into(),
1083				time: 1745513086640,
1084				core: "storage".into(),
1085				payload: StorageAction::PinUpdate("co.local.state".into(), PinStrategy::MaxCount(1)),
1086			},
1087			ReducerAction {
1088				from: "did:local:device".into(),
1089				time: 0,
1090				core: "storage".into(),
1091				payload: StorageAction::PinReference(
1092					"co.local.state".into(),
1093					[(cid("bagakbqabdyqklkdo5hv4smstsuv2t347nnonrdgylyrb3qepc2rh5p2qtntmbba"))].into(),
1094				),
1095			},
1096			ReducerAction {
1097				from: "did:local:device".into(),
1098				time: 0,
1099				core: "storage".into(),
1100				payload: StorageAction::Structure(
1101					[
1102						(
1103							(cid("bagakbqabdyqklkdo5hv4smstsuv2t347nnonrdgylyrb3qepc2rh5p2qtntmbba")),
1104							[(cid("bagakbqabdyqh7c4dgnjexzftz5aethy36hwi4q6iosiwy32e6lortxcp3l6et3a"))].into(),
1105						),
1106						(
1107							(cid("bagakbqabdyqh7c4dgnjexzftz5aethy36hwi4q6iosiwy32e6lortxcp3l6et3a")),
1108							[
1109								(cid("bagakbqabdyqc63i6iuxec7qgmzor4a554ihpznnbmnonh2l5l2h6w4vcvyn2zia")),
1110								(cid("bagakbqabdyqodqxbpakp23ngiqce4hhif2w5n54ujalwomt5lravwfezkdgyica")),
1111								(cid("bagakbqabdyqosx7w5aag3uid3tgh6w3g7p5vmtykf4cqefg7zwbpkf27bfvqlby")),
1112							]
1113							.into(),
1114						),
1115						(
1116							(cid("bagakbqabdyqc63i6iuxec7qgmzor4a554ihpznnbmnonh2l5l2h6w4vcvyn2zia")),
1117							[(cid("bagakbqabdyqpr2imdfe2lch4cqf7e4cjd5i26yrjsqai2gbwipxdesgukfupu7q"))].into(),
1118						),
1119						(
1120							(cid("bagakbqabdyqodqxbpakp23ngiqce4hhif2w5n54ujalwomt5lravwfezkdgyica")),
1121							[(cid("bagakbqabdyqjf3zpgq5jg7fjnnxo3pybvf63f7n73now5pvnednflv4ezgahadq"))].into(),
1122						),
1123						(
1124							(cid("bagakbqabdyqosx7w5aag3uid3tgh6w3g7p5vmtykf4cqefg7zwbpkf27bfvqlby")),
1125							[(cid("bagakbqabdyqebeu7wndmyhr63zfriwlaoddqy3sygd5it7xagora7xreqbbjk3q"))].into(),
1126						),
1127						(
1128							(cid("bagakbqabdyqjf3zpgq5jg7fjnnxo3pybvf63f7n73now5pvnednflv4ezgahadq")),
1129							[(cid("bagakbqabdyqocquirj4gdy2vvismgm52awzdgf66sqevvrswwvyalg57pt5bboy"))].into(),
1130						),
1131						(
1132							(cid("bagakbqabdyqocquirj4gdy2vvismgm52awzdgf66sqevvrswwvyalg57pt5bboy")),
1133							[(cid("bagakbqabdyqjamecznbm6ninfi5dryyvshenwnzbiunh7v6qrqy2ydlfkobjakq"))].into(),
1134						),
1135					]
1136					.into(),
1137				),
1138			},
1139		];
1140		let mut state_reference = OptionLink::none();
1141		for action in actions {
1142			let action_link = storage.set_value(&action).await.unwrap();
1143			state_reference = Storage::reduce(state_reference, action_link, &storage).await.unwrap().into();
1144		}
1145
1146		// validate
1147		let state = storage.get_value(&state_reference.unwrap()).await.unwrap();
1148		assert!(state
1149			.blocks_index_unreferenced
1150			.contains(&storage, &cid("bagakbqabdyqar5vlsfqd3g4mxngt3yl7nx2na2kb4jybylzn5bktwnihjhih42a"))
1151			.await
1152			.unwrap());
1153		assert!(!state
1154			.blocks_index_unreferenced
1155			.contains(&storage, &cid("bagakbqabdyqldyp7kxv6p5wb3edrywc74xfkgauqzlumlxncdlzncbwt36y7iby"))
1156			.await
1157			.unwrap());
1158	}
1159
1160	/// This is data gatered from storage_cleanup test which failed.
1161	#[tokio::test]
1162	async fn test_pin_strategy_max() {
1163		fn cid(s: &str) -> co_api::WeakCid {
1164			Cid::from_str(s).unwrap().into()
1165		}
1166		fn action(s: StorageAction) -> ReducerAction<StorageAction> {
1167			ReducerAction { from: "did:local:device".into(), time: 0, core: "storage".into(), payload: s }
1168		}
1169		let storage = CoreBlockStorage::new(MemoryBlockStorage::default(), true);
1170
1171		// actions
1172		let actions = [
1173			action(StorageAction::PinCreate("co:local".into(), PinStrategy::MaxCount(100), [].into())),
1174			action(StorageAction::PinReference(
1175				"co:local".into(),
1176				[(cid("bafyr4idmz6tdkhmdwhis4w2yov4g7ctjs72bcixzk2q7m3ioihhm4lvnky"))].into(),
1177			)),
1178			action(StorageAction::PinReference(
1179				"co:local".into(),
1180				[
1181					(cid("bafyr4id6ivgo6penzkew6tv2jsnncuq7a3zm7ajqd4nfmuxry7tq6xawbq")),
1182					(cid("bafyr4ih47p3rp5ppftduphy2fikph63iey5fwv42du6eyjccyu3ygvqvzy")),
1183				]
1184				.into(),
1185			)),
1186			action(StorageAction::PinUpdate("co:local".into(), PinStrategy::MaxCount(1))),
1187			action(StorageAction::PinReference(
1188				"co:local".into(),
1189				[
1190					(cid("bafyr4igconcuuuokydue7wglesze5vdyahzprgpkn7ukajd76besyhw2mi")),
1191					(cid("bafyr4iegqvwuhpdfp6vdfyxpxbm4qfjjo5y4rko34j6s7eqf2xfijo5chy")),
1192				]
1193				.into(),
1194			)),
1195		];
1196		let mut state_reference = OptionLink::none();
1197		for action in actions {
1198			let action_link = storage.set_value(&action).await.unwrap();
1199			state_reference = Storage::reduce(state_reference, action_link, &storage).await.unwrap().into();
1200		}
1201
1202		// validate
1203		let state = storage.get_value(&state_reference.unwrap()).await.unwrap();
1204		let pin = state.pins.get(&storage, &"co:local".to_owned()).await.unwrap().unwrap();
1205		let pin = pin
1206			.references
1207			.stream(&storage)
1208			.map_ok(|(_index, value)| value)
1209			.try_collect::<Vec<_>>()
1210			.await
1211			.unwrap();
1212		assert_eq!(pin, vec![cid("bafyr4iegqvwuhpdfp6vdfyxpxbm4qfjjo5y4rko34j6s7eqf2xfijo5chy")]);
1213	}
1214}