1use anyhow::anyhow;
5use co_api::{
6 co, BlockStorage, BlockStorageExt, CoList, CoListTransaction, CoMap, CoMapTransaction, CoSet, CoreBlockStorage,
7 IsDefault, LazyTransaction, Link, OptionLink, Reducer, ReducerAction, StorageError, Tags, WeakCid,
8};
9use futures::{pin_mut, FutureExt, TryStreamExt};
10use std::collections::{BTreeMap, BTreeSet};
11
12#[co(state)]
13pub struct Storage {
14 #[serde(rename = "p", default, skip_serializing_if = "CoMap::is_empty")]
16 pub pins: CoMap<String, Pin>,
17
18 #[serde(rename = "b", default, skip_serializing_if = "CoMap::is_empty")]
20 pub blocks: CoMap<WeakCid, BlockMetadata>,
21
22 #[serde(rename = "bu", default, skip_serializing_if = "CoMap::is_empty")]
25 pub blocks_index_unreferenced: CoMap<WeakCid, BlockInfo>,
26
27 #[serde(rename = "bs", default, skip_serializing_if = "CoMap::is_empty")]
30 pub block_structure_pending: CoMap<WeakCid, BlockStructurePending>,
31}
32
33#[co]
34pub enum BlockStructurePending {
35 Reference(BlockInfo),
37}
38impl BlockStructurePending {
39 pub fn info(&self) -> &BlockInfo {
40 match self {
41 BlockStructurePending::Reference(block_info) => block_info,
42 }
43 }
44}
45
46#[co]
47#[derive(Default)]
48pub enum ReferenceMode {
49 #[default]
51 #[serde(rename = "s")]
52 Shallow,
53
54 #[serde(rename = "r")]
56 Recursive,
57}
58impl ReferenceMode {
59 pub fn is_recursive(&self) -> bool {
60 match self {
61 ReferenceMode::Shallow => false,
62 ReferenceMode::Recursive => true,
63 }
64 }
65}
66
67#[co]
68pub struct BlockInfo {
69 #[serde(rename = "p", default, skip_serializing_if = "CoSet::is_empty")]
71 pub pins: CoSet<String>,
72
73 #[serde(rename = "t", default, skip_serializing_if = "BlockType::is_unknown")]
75 pub block_type: BlockType,
76}
77impl BlockInfo {
78 pub async fn new<S>(storage: &S, pin: String, block_type: BlockType) -> Result<Self, StorageError>
79 where
80 S: BlockStorage + Clone + 'static,
81 {
82 let mut pins = CoSet::default();
83 pins.insert(storage, pin).await?;
84 Ok(Self { pins, block_type })
85 }
86
87 pub fn with_block_type(mut self, block_type: BlockType) -> Self {
88 self.block_type = block_type;
89 self
90 }
91}
92
93#[co]
94#[derive(Default)]
95pub enum BlockType {
96 #[default]
97 Unknown,
98
99 Root,
101}
102impl BlockType {
103 pub fn is_unknown(&self) -> bool {
104 matches!(self, BlockType::Unknown)
105 }
106
107 pub fn is_root(&self) -> bool {
108 matches!(self, BlockType::Root)
109 }
110}
111
112#[co]
113#[derive(Default)]
114pub struct BlockMetadata {
115 #[serde(rename = "r")]
117 pub references: u32,
118
119 #[serde(rename = "m", default, skip_serializing_if = "IsDefault::is_default")]
121 pub mode: ReferenceMode,
122
123 #[serde(rename = "t", default, skip_serializing_if = "Tags::is_empty")]
125 pub tags: Tags,
126}
127impl BlockMetadata {
128 pub fn is_removable(&self) -> bool {
129 self.references == 0 && self.mode.is_recursive()
130 }
131}
132
133#[co]
134#[derive(Default)]
135pub struct Pin {
136 #[serde(rename = "s")]
138 pub strategy: PinStrategy,
139
140 #[serde(rename = "r", default, skip_serializing_if = "CoList::is_empty")]
144 pub references: CoList<WeakCid>,
145
146 #[serde(rename = "c")]
148 pub references_count: u32,
149}
150
151#[co]
152#[derive(Default)]
153pub enum PinStrategy {
154 #[default]
156 #[serde(rename = "u")]
157 Unlimited,
158
159 #[serde(rename = "h")]
161 MaxCount(u32),
162}
163
164#[co]
167#[derive(Default)]
168#[serde(transparent)]
169pub struct References(Vec<(WeakCid, BlockMetadata)>);
170impl References {
171 pub fn new() -> Self {
172 Self::default()
173 }
174
175 pub fn insert(&mut self, reference: impl Into<WeakCid>) {
176 let reference = reference.into();
177 let (_, block) = match self.0.iter_mut().find(|(cid, _block)| *cid == reference) {
178 Some(block) => block,
179 None => {
180 self.0.push((reference, Default::default()));
181 self.0.last_mut().unwrap()
183 },
184 };
185 block.references += 1;
186 }
187
188 pub fn insert_with_tags(&mut self, reference: impl Into<WeakCid>, tags: Tags) {
189 let reference = reference.into();
190 let (_, block) = match self.0.iter_mut().find(|(cid, _block)| *cid == reference) {
191 Some(block) => block,
192 None => {
193 self.0.push((reference, Default::default()));
194 self.0.last_mut().unwrap()
196 },
197 };
198 block.references += 1;
199 block.tags.extend(tags);
200 }
201
202 pub fn extend(&mut self, references: impl IntoIterator<Item = WeakCid>) {
203 for item in references.into_iter() {
204 self.insert(item);
205 }
206 }
207
208 pub fn extend_with_tags(&mut self, references: impl IntoIterator<Item = WeakCid>, tags: Tags) {
209 for item in references.into_iter() {
210 self.insert_with_tags(item, tags.clone());
211 }
212 }
213
214 pub fn len(&self) -> usize {
215 self.0.len()
216 }
217
218 pub fn is_empty(&self) -> bool {
219 self.0.is_empty()
220 }
221
222 pub fn iter(&self) -> impl Iterator<Item = WeakCid> + use<'_> {
223 self.0.iter().map(|(cid, _)| *cid)
224 }
225
226 pub fn iter_with_tags(&self) -> impl Iterator<Item = (WeakCid, &'_ Tags)> + use<'_> {
227 self.0.iter().map(|(cid, tags)| (*cid, &tags.tags))
228 }
229}
230impl FromIterator<WeakCid> for References {
231 fn from_iter<T: IntoIterator<Item = WeakCid>>(iter: T) -> Self {
232 let mut result = Self::default();
233 result.extend(iter);
234 result
235 }
236}
237impl<const N: usize> From<[WeakCid; N]> for References {
238 fn from(arr: [WeakCid; N]) -> Self {
239 if N == 0 {
240 return Self::default();
241 }
242 Self::from_iter(arr)
243 }
244}
245
246#[co]
247pub enum StorageAction {
248 #[serde(rename = "r")]
257 Reference(BlockInfo, References),
258
259 #[serde(rename = "u")]
261 Unreference(BlockInfo, References),
262
263 #[serde(rename = "s")]
270 Structure(#[serde(with = "co_api::serde_map_as_list")] BTreeMap<WeakCid, References>),
271
272 #[serde(rename = "c")]
278 ReferenceCreate(BlockInfo, References),
279
280 #[serde(rename = "ud")]
291 Remove(BlockInfo, BTreeSet<WeakCid>, bool),
292
293 #[serde(rename = "d")]
299 Delete(BlockInfo, BTreeMap<WeakCid, BTreeSet<WeakCid>>, bool),
300
301 #[serde(rename = "ti")]
303 TagsInsert(BTreeSet<WeakCid>, Tags),
304
305 #[serde(rename = "tr")]
307 TagsRemove(BTreeSet<WeakCid>, Tags),
308
309 #[serde(rename = "pc")]
311 PinCreate(String, PinStrategy, References),
312
313 #[serde(rename = "pu")]
315 PinUpdate(String, PinStrategy),
316
317 #[serde(rename = "pr")]
319 PinReference(String, References),
320
321 #[serde(rename = "pd")]
323 PinRemove(String),
324
325 #[serde(rename = "b")]
327 Batch(CoList<StorageAction>),
328}
329impl Storage {
330 pub async fn initial_state<S: BlockStorage + Clone + 'static>(
332 storage: &S,
333 actions: Vec<StorageAction>,
334 ) -> Result<OptionLink<Self>, anyhow::Error> {
335 let mut state = Storage::default();
336 let mut transaction = StorageTransaction::open(storage.clone(), &state).await?;
337 for action in actions {
338 reduce(&mut transaction, action).await?;
339 }
340 transaction.store(&mut state).await?;
341 Ok(storage.set_value(&state).await?.into())
342 }
343}
344impl Reducer<StorageAction> for Storage {
345 async fn reduce(
346 state: OptionLink<Self>,
347 event: Link<ReducerAction<StorageAction>>,
348 storage: &CoreBlockStorage,
349 ) -> Result<Link<Self>, anyhow::Error> {
350 let event = storage.get_value(&event).await?;
351 let mut state = storage.get_value_or_default(&state).await?;
352 let mut transaction = StorageTransaction::open(storage.clone(), &state).await?;
353 reduce(&mut transaction, event.payload).await?;
354 transaction.store(&mut state).await?;
355 Ok(storage.set_value(&state).await?)
356 }
357}
358
359struct StorageTransaction<S>
360where
361 S: BlockStorage + Clone + 'static,
362{
363 storage: S,
364 pins_changed: bool,
365 pins: LazyTransaction<S, CoMap<String, Pin>>,
366 blocks_changed: bool,
367 blocks: LazyTransaction<S, CoMap<WeakCid, BlockMetadata>>,
368 blocks_index_unreferenced_changed: bool,
369 blocks_index_unreferenced: LazyTransaction<S, CoMap<WeakCid, BlockInfo>>,
370 block_structure_pending_changed: bool,
371 block_structure_pending: LazyTransaction<S, CoMap<WeakCid, BlockStructurePending>>,
372}
373impl<S> StorageTransaction<S>
374where
375 S: BlockStorage + Clone + 'static,
376{
377 async fn open(storage: S, state: &Storage) -> Result<Self, anyhow::Error> {
378 Ok(Self {
379 pins_changed: false,
380 pins: state.pins.open_lazy(&storage).await?,
381 blocks_changed: false,
382 blocks: state.blocks.open_lazy(&storage).await?,
383 blocks_index_unreferenced_changed: false,
384 blocks_index_unreferenced: state.blocks_index_unreferenced.open_lazy(&storage).await?,
385 block_structure_pending_changed: false,
386 block_structure_pending: state.block_structure_pending.open_lazy(&storage).await?,
387 storage,
388 })
389 }
390
391 async fn store(&mut self, state: &mut Storage) -> Result<(), anyhow::Error> {
392 if let Some(pins) = self.pins.opt_mut() {
393 if self.pins_changed {
394 state.pins = pins.store().await?;
395 self.pins_changed = false;
396 }
397 }
398 if let Some(blocks) = self.blocks.opt_mut() {
399 if self.blocks_changed {
400 state.blocks = blocks.store().await?;
401 self.blocks_changed = false;
402 }
403 }
404 if let Some(blocks_index_unreferenced) = self.blocks_index_unreferenced.opt_mut() {
405 if self.blocks_index_unreferenced_changed {
406 state.blocks_index_unreferenced = blocks_index_unreferenced.store().await?;
407 self.blocks_index_unreferenced_changed = false;
408 }
409 }
410 if let Some(block_structure_pending) = self.block_structure_pending.opt_mut() {
411 if self.block_structure_pending_changed {
412 state.block_structure_pending = block_structure_pending.store().await?;
413 self.block_structure_pending_changed = false;
414 }
415 }
416 Ok(())
417 }
418
419 fn storage(&self) -> &S {
420 &self.storage
421 }
422
423 async fn pins(&mut self) -> Result<&CoMapTransaction<S, String, Pin>, StorageError> {
424 self.pins.get().await
425 }
426
427 async fn pins_mut(&mut self) -> Result<&mut CoMapTransaction<S, String, Pin>, StorageError> {
428 self.pins_changed = true;
429 self.pins.get_mut().await
430 }
431
432 async fn blocks(&mut self) -> Result<&CoMapTransaction<S, WeakCid, BlockMetadata>, StorageError> {
433 self.blocks.get().await
434 }
435
436 async fn blocks_mut(&mut self) -> Result<&mut CoMapTransaction<S, WeakCid, BlockMetadata>, StorageError> {
437 self.blocks_changed = true;
438 self.blocks.get_mut().await
439 }
440
441 async fn blocks_index_unreferenced_mut(
446 &mut self,
447 ) -> Result<&mut CoMapTransaction<S, WeakCid, BlockInfo>, StorageError> {
448 self.blocks_index_unreferenced_changed = true;
449 self.blocks_index_unreferenced.get_mut().await
450 }
451
452 async fn block_structure_pending_mut(
453 &mut self,
454 ) -> Result<&mut CoMapTransaction<S, WeakCid, BlockStructurePending>, StorageError> {
455 self.block_structure_pending_changed = true;
456 self.block_structure_pending.get_mut().await
457 }
458}
459
460async fn reduce<S>(transaction: &mut StorageTransaction<S>, action: StorageAction) -> Result<(), anyhow::Error>
461where
462 S: BlockStorage + Clone + 'static,
463{
464 match action {
465 StorageAction::Reference(info, references) => reduce_reference(transaction, info, references).boxed().await?,
466 StorageAction::Unreference(info, references) => {
467 reduce_unreference(transaction, info, references).boxed().await?
468 },
469 StorageAction::Structure(structure) => reduce_structure(transaction, structure).boxed().await?,
470 StorageAction::ReferenceCreate(info, references) => {
471 reduce_reference_create(transaction, info, references).boxed().await?
472 },
473 StorageAction::Remove(info, cids, zero) => reduce_remove(transaction, cids, zero, info).boxed().await?,
474 StorageAction::Delete(info, cids, force) => reduce_delete(transaction, cids, force, info).boxed().await?,
475 StorageAction::TagsInsert(cids, tags) => reduce_tags_insert(transaction, cids, tags).boxed().await?,
476 StorageAction::TagsRemove(cids, tags) => reduce_tags_remove(transaction, cids, tags).boxed().await?,
477 StorageAction::PinCreate(key, strategy, references) => {
478 reduce_pin_create(transaction, key, strategy, references).boxed().await?
479 },
480 StorageAction::PinUpdate(key, strategy) => reduce_pin_update(transaction, key, strategy).boxed().await?,
481 StorageAction::PinReference(key, references) => {
482 reduce_pin_reference(transaction, key, references).boxed().await?
483 },
484 StorageAction::PinRemove(key) => reduce_pin_remove(transaction, key).boxed().await?,
485 StorageAction::Batch(actions) => {
486 let actions_stream = actions.stream(transaction.storage());
487 pin_mut!(actions_stream);
488 while let Some((_, action)) = actions_stream.try_next().await? {
489 Box::pin(reduce(transaction, action)).await?;
490 }
491 },
492 }
493 Ok(())
494}
495
496async fn reduce_structure<S>(
498 transaction: &mut StorageTransaction<S>,
499 structure: BTreeMap<WeakCid, References>,
500) -> Result<(), anyhow::Error>
501where
502 S: BlockStorage + Clone + 'static,
503{
504 for (parent, children) in structure {
505 reference_structure_cid(transaction, parent, children).await?;
506 }
507 Ok(())
508}
509
510async fn reference_structure_cid<S>(
513 transaction: &mut StorageTransaction<S>,
514 parent: WeakCid,
515 children: References,
516) -> Result<(), anyhow::Error>
517where
518 S: BlockStorage + Clone + 'static,
519{
520 let pending = match transaction.block_structure_pending_mut().await?.remove(parent).await? {
522 Some(info) => info,
523 None => {
524 return Ok(());
525 },
526 };
527 match pending {
528 BlockStructurePending::Reference(info) => {
529 let mut block = transaction
531 .blocks()
532 .await?
533 .get(&parent)
534 .await?
535 .ok_or(anyhow!("Reference not found: {:?}", parent))?;
536
537 let info = info.clone().with_block_type(BlockType::Unknown);
539 for (child_cid, child_tags) in children.0.into_iter() {
540 reference_cid(transaction, &info, child_cid, child_tags.references, &child_tags.tags).await?;
541 }
542
543 block.mode = ReferenceMode::Recursive;
545
546 if block.is_removable() {
548 transaction
549 .blocks_index_unreferenced_mut()
550 .await?
551 .insert(parent, info.clone())
552 .await?;
553 }
554
555 transaction.blocks_mut().await?.insert(parent, block).await?;
557 },
558 }
559 Ok(())
560}
561
562async fn reduce_pin_remove<S>(transaction: &mut StorageTransaction<S>, key: String) -> Result<(), anyhow::Error>
563where
564 S: BlockStorage + Clone + 'static,
565{
566 let pin = transaction
568 .pins_mut()
569 .await?
570 .remove(key.clone())
571 .await?
572 .ok_or(anyhow!("Pin not found: {}", key))?;
573 let info = BlockInfo::new(transaction.storage(), key.clone(), BlockType::Root).await?;
574
575 let cids = pin.references.stream(transaction.storage()).map_ok(|(_key, value)| value);
577 pin_mut!(cids);
578 while let Some(reference) = cids.try_next().await? {
579 unreference_cid(transaction, &info, reference, Unreference::ByOne).await?;
580 }
581
582 Ok(())
584}
585
586async fn reduce_pin_reference<S>(
587 transaction: &mut StorageTransaction<S>,
588 key: String,
589 references: References,
590) -> Result<(), anyhow::Error>
591where
592 S: BlockStorage + Clone + 'static,
593{
594 pin_reference(transaction, key, references).await?;
596
597 Ok(())
599}
600
601async fn pin_reference<S>(
602 transaction: &mut StorageTransaction<S>,
603 key: String,
604 references: References,
605) -> Result<(), anyhow::Error>
606where
607 S: BlockStorage + Clone + 'static,
608{
609 let mut pin = transaction
610 .pins()
611 .await?
612 .get(&key)
613 .await?
614 .ok_or(anyhow!("Pin not found: {}", key))?;
615 let mut pin_references = pin.references.open(transaction.storage()).await?;
616 let info = BlockInfo::new(transaction.storage(), key.clone(), BlockType::Root).await?;
617
618 for (reference, block_metadata) in references.0 {
620 pin_references.push(reference).await?;
621 pin.references_count += 1;
622 reference_cid(transaction, &info, reference, block_metadata.references, &block_metadata.tags).await?;
623 }
624
625 apply_pin_strategy(transaction, &mut pin, &mut pin_references, info.clone()).await?;
627
628 pin.references = pin_references.store().await?;
630 transaction.pins_mut().await?.insert(key, pin).await?;
631
632 Ok(())
633}
634
635async fn apply_pin_strategy<S>(
637 transaction: &mut StorageTransaction<S>,
638 pin: &mut Pin,
639 references: &mut CoListTransaction<S, WeakCid>,
640 info: BlockInfo,
641) -> Result<bool, anyhow::Error>
642where
643 S: BlockStorage + Clone + 'static,
644{
645 let mut changed = false;
646 match &pin.strategy {
647 PinStrategy::Unlimited => {},
648 PinStrategy::MaxCount(count) => {
649 while pin.references_count > *count {
650 if let Some((_, remove)) = references.pop_front().await? {
651 unreference_cid(transaction, &info, remove, Unreference::ByOne).await?;
652 }
653 pin.references_count -= 1;
654 changed = true;
655 }
656 },
657 }
658 Ok(changed)
659}
660
661async fn reduce_pin_create<S>(
662 transaction: &mut StorageTransaction<S>,
663 key: String,
664 strategy: PinStrategy,
665 references: References,
666) -> Result<(), anyhow::Error>
667where
668 S: BlockStorage + Clone + 'static,
669{
670 if transaction.pins().await?.contains_key(&key).await? {
672 return Err(anyhow::anyhow!("Pin already exists: {}", key));
673 }
674
675 let pin = Pin { strategy, references: Default::default(), references_count: 0 };
677 transaction.pins_mut().await?.insert(key.clone(), pin).await?;
678
679 if !references.0.is_empty() {
681 pin_reference(transaction, key, references).await?;
682 }
683
684 Ok(())
686}
687
688async fn reduce_pin_update<S>(
689 transaction: &mut StorageTransaction<S>,
690 key: String,
691 strategy: PinStrategy,
692) -> Result<(), anyhow::Error>
693where
694 S: BlockStorage + Clone + 'static,
695{
696 let Some(mut pin) = transaction.pins().await?.get(&key).await? else {
698 return Err(anyhow::anyhow!("Pin not exists: {}", key));
699 };
700 let info = BlockInfo::new(transaction.storage(), key.clone(), BlockType::Root).await?;
701
702 pin.strategy = strategy;
704
705 let mut references = pin.references.open(transaction.storage()).await?;
707 apply_pin_strategy(transaction, &mut pin, &mut references, info).await?;
708
709 pin.references = references.store().await?;
711 transaction.pins_mut().await?.insert(key, pin).await?;
712
713 Ok(())
715}
716
717async fn reduce_tags_remove<S>(
718 transaction: &mut StorageTransaction<S>,
719 cids: BTreeSet<WeakCid>,
720 tags: Tags,
721) -> Result<(), anyhow::Error>
722where
723 S: BlockStorage + Clone + 'static,
724{
725 for cid in cids {
726 transaction
727 .blocks_mut()
728 .await?
729 .try_update_or_insert_async(cid, |mut block| async {
730 block.tags.clear(Some(&tags));
731 Ok(block)
732 })
733 .await?;
734 }
735 Ok(())
736}
737
738async fn reduce_tags_insert<S>(
739 transaction: &mut StorageTransaction<S>,
740 cids: BTreeSet<WeakCid>,
741 tags: Tags,
742) -> Result<(), anyhow::Error>
743where
744 S: BlockStorage + Clone + 'static,
745{
746 for cid in cids {
747 transaction
748 .blocks_mut()
749 .await?
750 .try_update_or_insert_async(cid, |mut block| {
751 let mut tags = tags.clone();
752 async move {
753 block.tags.append(&mut tags);
754 Ok(block)
755 }
756 })
757 .await?;
758 }
759 Ok(())
760}
761
762async fn reduce_remove<S>(
763 transaction: &mut StorageTransaction<S>,
764 cids: impl IntoIterator<Item = WeakCid>,
765 zero: bool,
766 info: BlockInfo,
767) -> Result<(), anyhow::Error>
768where
769 S: BlockStorage + Clone + 'static,
770{
771 for cid in cids {
772 unreference_cid(transaction, &info, cid, if zero { Unreference::ToZero } else { Unreference::ByOne }).await?;
773 }
774 Ok(())
775}
776
777async fn reduce_delete<S>(
780 transaction: &mut StorageTransaction<S>,
781 cids: impl IntoIterator<Item = (WeakCid, BTreeSet<WeakCid>)>,
782 force: bool,
783 info: BlockInfo,
784) -> Result<(), anyhow::Error>
785where
786 S: BlockStorage + Clone + 'static,
787{
788 let info = info.clone().with_block_type(BlockType::Unknown);
790 for (cid, links) in cids {
791 let mut references = References::default();
793 references.extend(links.iter().cloned());
794 reference_structure_cid(transaction, cid, references).await?;
795
796 let block = match transaction.blocks().await?.get(&cid).await? {
798 Some(block) if (block.references == 0 || force) => transaction.blocks_mut().await?.remove(cid).await?,
799 _ => None,
800 };
801 if let Some(block) = block {
802 transaction.blocks_index_unreferenced_mut().await?.remove(cid).await?;
804
805 match block.mode {
807 ReferenceMode::Shallow => {},
808 ReferenceMode::Recursive => {
809 for link in links.iter() {
810 unreference_cid(transaction, &info, *link, Unreference::ByOne).await?;
811 }
812 },
813 }
814 }
815 }
816
817 Ok(())
819}
820
821async fn reduce_reference<S>(
822 transaction: &mut StorageTransaction<S>,
823 info: BlockInfo,
824 references: References,
825) -> Result<(), anyhow::Error>
826where
827 S: BlockStorage + Clone + 'static,
828{
829 for (reference, data) in references.0.into_iter() {
830 reference_cid(transaction, &info, reference, data.references, &data.tags).await?;
831 }
832 Ok(())
833}
834
835async fn reduce_reference_create<S>(
836 transaction: &mut StorageTransaction<S>,
837 info: BlockInfo,
838 references: References,
839) -> Result<(), anyhow::Error>
840where
841 S: BlockStorage + Clone + 'static,
842{
843 for (reference, block_metadata) in references.0 {
844 if transaction.blocks().await?.get(&reference).await?.is_none() {
845 let block = BlockMetadata { tags: block_metadata.tags, ..Default::default() };
848
849 transaction.blocks_mut().await?.insert(reference, block).await?;
851
852 transaction
854 .block_structure_pending_mut()
855 .await?
856 .insert(reference, BlockStructurePending::Reference(info.clone()))
857 .await?;
858 }
859 }
860 Ok(())
861}
862
863async fn reference_cid<S>(
864 transaction: &mut StorageTransaction<S>,
865 info: &BlockInfo,
866 cid: WeakCid,
867 references: u32,
868 tags: &Tags,
869) -> Result<(), anyhow::Error>
870where
871 S: BlockStorage + Clone + 'static,
872{
873 let block = transaction.blocks().await?.get(&cid).await?;
874
875 if let Some(block) = &block {
877 if block.references == 0 {
879 transaction.blocks_index_unreferenced_mut().await?.remove(cid).await?;
880 }
881 } else {
882 transaction
884 .block_structure_pending_mut()
885 .await?
886 .insert(cid, BlockStructurePending::Reference(info.clone()))
887 .await?;
888 }
889
890 let mut block = block.unwrap_or_default();
892 block.references += references;
893 block.tags.extend(tags.iter().cloned());
894 transaction.blocks_mut().await?.insert(cid, block).await?;
895
896 Ok(())
898}
899
900async fn reduce_unreference<S>(
901 transaction: &mut StorageTransaction<S>,
902 info: BlockInfo,
903 references: References,
904) -> Result<(), anyhow::Error>
905where
906 S: BlockStorage + Clone + 'static,
907{
908 for (reference, data) in references.0.into_iter() {
909 unreference_cid(transaction, &info, reference, Unreference::By(data.references)).await?;
910 }
911 Ok(())
912}
913
914enum Unreference {
915 ByOne,
916 By(u32),
917 ToZero,
918 }
920
921async fn unreference_cid<S>(
922 transaction: &mut StorageTransaction<S>,
923 info: &BlockInfo,
924 cid: WeakCid,
925 reference: Unreference,
926) -> Result<bool, anyhow::Error>
927where
928 S: BlockStorage + Clone + 'static,
929{
930 Ok(match transaction.blocks().await?.get(&cid).await? {
931 Some(mut block) if block.references > 0 => {
932 match reference {
934 Unreference::ByOne => {
935 block.references -= 1;
936 },
937 Unreference::By(by) => {
938 block.references -= by;
939 },
940 Unreference::ToZero => {
941 block.references = 0;
942 },
943 }
947
948 if block.is_removable() {
950 transaction
951 .blocks_index_unreferenced_mut()
952 .await?
953 .insert(cid, info.clone())
954 .await?;
955 }
956
957 transaction.blocks_mut().await?.insert(cid, block).await?;
959
960 true
962 },
963 _ => false,
964 })
965}
966
967#[cfg(test)]
968mod tests {
969 use crate::{PinStrategy, References, Storage, StorageAction};
970 use cid::Cid;
971 use co_api::{BlockSerializer, BlockStorageExt, CoreBlockStorage, OptionLink, Reducer, ReducerAction, WeakCid};
972 use co_storage::MemoryBlockStorage;
973 use futures::TryStreamExt;
974 use ipld_core::{ipld::Ipld, serde::to_ipld};
975 use std::{collections::BTreeMap, str::FromStr};
976
977 #[test]
978 fn test_serialize_storage_action() {
979 let cid1 = *BlockSerializer::default().serialize(&1).unwrap().cid();
980 let cid2 = *BlockSerializer::default().serialize(&2).unwrap().cid();
981 let cid3 = *BlockSerializer::default().serialize(&2).unwrap().cid();
982 let mut map = BTreeMap::<WeakCid, References>::new();
983 map.entry(cid1.into()).or_default().insert(cid2);
984 map.entry(cid1.into()).or_default().insert(cid3);
985
986 let action = StorageAction::Structure(map.into_iter().collect());
988 let block = BlockSerializer::default().serialize(&action).unwrap();
989 let action_deserialize: StorageAction = BlockSerializer::default().deserialize(&block).unwrap();
990 assert_eq!(action_deserialize, action);
991
992 let reducer_action: ReducerAction<StorageAction> =
994 ReducerAction { core: "storage".to_owned(), from: "test".to_owned(), payload: action.clone(), time: 123 };
995 let block = BlockSerializer::default().serialize(&reducer_action).unwrap();
996 let reducer_action_deserialize: ReducerAction<StorageAction> =
997 BlockSerializer::default().deserialize(&block).unwrap();
998 assert_eq!(reducer_action_deserialize, reducer_action);
999
1000 let reducer_action_ipld: ReducerAction<Ipld> = ReducerAction {
1002 core: "storage".to_owned(),
1003 from: "test".to_owned(),
1004 payload: to_ipld(action).unwrap(),
1005 time: 123,
1006 };
1007 let reducer_action_ipld_deserialize: ReducerAction<Ipld> =
1008 BlockSerializer::default().deserialize(&block).unwrap();
1009
1010 assert_eq!(reducer_action_ipld_deserialize, reducer_action_ipld);
1011 }
1012
1013 #[tokio::test]
1015 async fn test_blocks_index_unreferenced_is_correct() {
1016 fn cid(s: &str) -> co_api::WeakCid {
1017 Cid::from_str(s).unwrap().into()
1018 }
1019 let storage = CoreBlockStorage::new(MemoryBlockStorage::default(), true);
1020
1021 let actions = [
1023 ReducerAction {
1024 from: "did:local:device".into(),
1025 time: 0,
1026 core: "storage".into(),
1027 payload: StorageAction::PinCreate("co.local.state".into(), PinStrategy::MaxCount(100), [].into()),
1028 },
1029 ReducerAction {
1030 from: "did:local:device".into(),
1031 time: 0,
1032 core: "storage".into(),
1033 payload: StorageAction::PinCreate("co.local.log".into(), PinStrategy::MaxCount(100), [].into()),
1034 },
1035 ReducerAction {
1036 from: "did:local:device".into(),
1037 time: 0,
1038 core: "storage".into(),
1039 payload: StorageAction::PinReference(
1040 "co.local.state".into(),
1041 [(cid("bagakbqabdyqar5vlsfqd3g4mxngt3yl7nx2na2kb4jybylzn5bktwnihjhih42a"))].into(),
1042 ),
1043 },
1044 ReducerAction {
1045 from: "did:local:device".into(),
1046 time: 0,
1047 core: "storage".into(),
1048 payload: StorageAction::Structure(
1049 [
1050 (
1051 (cid("bagakbqabdyqar5vlsfqd3g4mxngt3yl7nx2na2kb4jybylzn5bktwnihjhih42a")),
1052 [
1053 (cid("QmUDCqxH2vm9MBb2mLsGmHsoCMXBBnd4iWDruZdcSGaN7d")),
1054 (cid("QmY8fStJQWVsfY4ae7KzfgeJQKqcXEbp1THut3Uz4aBBP6")),
1055 (cid("QmcS1eGNuBM3a4pf8hw4hEWwdALXEEnimqZBhSBo8aHS7K")),
1056 (cid("bagakbqabdyqcgkbe7hbegknbemf73xlnooct2g35zzrbdkus6z342bir46k5zgq")),
1057 ]
1058 .into(),
1059 ),
1060 (
1061 (cid("bagakbqabdyqcgkbe7hbegknbemf73xlnooct2g35zzrbdkus6z342bir46k5zgq")),
1062 [(cid("bagakbqabdyqfomt5rhne4gqclpbi7t2emjthzcm4frymppcndo27rxum6tugwoi"))].into(),
1063 ),
1064 (
1065 (cid("bagakbqabdyqfomt5rhne4gqclpbi7t2emjthzcm4frymppcndo27rxum6tugwoi")),
1066 [(cid("bagakbqabdyqdyybl3osmbp4ckybdvmwccje5kxa6bhy6yz7p3ftrsngh4r6lg5a"))].into(),
1067 ),
1068 ]
1069 .into(),
1070 ),
1071 },
1072 ReducerAction {
1073 from: "did:local:device".into(),
1074 time: 0,
1075 core: "storage".into(),
1076 payload: StorageAction::PinReference(
1077 "co.local.state".into(),
1078 [(cid("bagakbqabdyqldyp7kxv6p5wb3edrywc74xfkgauqzlumlxncdlzncbwt36y7iby"))].into(),
1079 ),
1080 },
1081 ReducerAction {
1082 from: "did:local:device".into(),
1083 time: 1745513086640,
1084 core: "storage".into(),
1085 payload: StorageAction::PinUpdate("co.local.state".into(), PinStrategy::MaxCount(1)),
1086 },
1087 ReducerAction {
1088 from: "did:local:device".into(),
1089 time: 0,
1090 core: "storage".into(),
1091 payload: StorageAction::PinReference(
1092 "co.local.state".into(),
1093 [(cid("bagakbqabdyqklkdo5hv4smstsuv2t347nnonrdgylyrb3qepc2rh5p2qtntmbba"))].into(),
1094 ),
1095 },
1096 ReducerAction {
1097 from: "did:local:device".into(),
1098 time: 0,
1099 core: "storage".into(),
1100 payload: StorageAction::Structure(
1101 [
1102 (
1103 (cid("bagakbqabdyqklkdo5hv4smstsuv2t347nnonrdgylyrb3qepc2rh5p2qtntmbba")),
1104 [(cid("bagakbqabdyqh7c4dgnjexzftz5aethy36hwi4q6iosiwy32e6lortxcp3l6et3a"))].into(),
1105 ),
1106 (
1107 (cid("bagakbqabdyqh7c4dgnjexzftz5aethy36hwi4q6iosiwy32e6lortxcp3l6et3a")),
1108 [
1109 (cid("bagakbqabdyqc63i6iuxec7qgmzor4a554ihpznnbmnonh2l5l2h6w4vcvyn2zia")),
1110 (cid("bagakbqabdyqodqxbpakp23ngiqce4hhif2w5n54ujalwomt5lravwfezkdgyica")),
1111 (cid("bagakbqabdyqosx7w5aag3uid3tgh6w3g7p5vmtykf4cqefg7zwbpkf27bfvqlby")),
1112 ]
1113 .into(),
1114 ),
1115 (
1116 (cid("bagakbqabdyqc63i6iuxec7qgmzor4a554ihpznnbmnonh2l5l2h6w4vcvyn2zia")),
1117 [(cid("bagakbqabdyqpr2imdfe2lch4cqf7e4cjd5i26yrjsqai2gbwipxdesgukfupu7q"))].into(),
1118 ),
1119 (
1120 (cid("bagakbqabdyqodqxbpakp23ngiqce4hhif2w5n54ujalwomt5lravwfezkdgyica")),
1121 [(cid("bagakbqabdyqjf3zpgq5jg7fjnnxo3pybvf63f7n73now5pvnednflv4ezgahadq"))].into(),
1122 ),
1123 (
1124 (cid("bagakbqabdyqosx7w5aag3uid3tgh6w3g7p5vmtykf4cqefg7zwbpkf27bfvqlby")),
1125 [(cid("bagakbqabdyqebeu7wndmyhr63zfriwlaoddqy3sygd5it7xagora7xreqbbjk3q"))].into(),
1126 ),
1127 (
1128 (cid("bagakbqabdyqjf3zpgq5jg7fjnnxo3pybvf63f7n73now5pvnednflv4ezgahadq")),
1129 [(cid("bagakbqabdyqocquirj4gdy2vvismgm52awzdgf66sqevvrswwvyalg57pt5bboy"))].into(),
1130 ),
1131 (
1132 (cid("bagakbqabdyqocquirj4gdy2vvismgm52awzdgf66sqevvrswwvyalg57pt5bboy")),
1133 [(cid("bagakbqabdyqjamecznbm6ninfi5dryyvshenwnzbiunh7v6qrqy2ydlfkobjakq"))].into(),
1134 ),
1135 ]
1136 .into(),
1137 ),
1138 },
1139 ];
1140 let mut state_reference = OptionLink::none();
1141 for action in actions {
1142 let action_link = storage.set_value(&action).await.unwrap();
1143 state_reference = Storage::reduce(state_reference, action_link, &storage).await.unwrap().into();
1144 }
1145
1146 let state = storage.get_value(&state_reference.unwrap()).await.unwrap();
1148 assert!(state
1149 .blocks_index_unreferenced
1150 .contains(&storage, &cid("bagakbqabdyqar5vlsfqd3g4mxngt3yl7nx2na2kb4jybylzn5bktwnihjhih42a"))
1151 .await
1152 .unwrap());
1153 assert!(!state
1154 .blocks_index_unreferenced
1155 .contains(&storage, &cid("bagakbqabdyqldyp7kxv6p5wb3edrywc74xfkgauqzlumlxncdlzncbwt36y7iby"))
1156 .await
1157 .unwrap());
1158 }
1159
1160 #[tokio::test]
1162 async fn test_pin_strategy_max() {
1163 fn cid(s: &str) -> co_api::WeakCid {
1164 Cid::from_str(s).unwrap().into()
1165 }
1166 fn action(s: StorageAction) -> ReducerAction<StorageAction> {
1167 ReducerAction { from: "did:local:device".into(), time: 0, core: "storage".into(), payload: s }
1168 }
1169 let storage = CoreBlockStorage::new(MemoryBlockStorage::default(), true);
1170
1171 let actions = [
1173 action(StorageAction::PinCreate("co:local".into(), PinStrategy::MaxCount(100), [].into())),
1174 action(StorageAction::PinReference(
1175 "co:local".into(),
1176 [(cid("bafyr4idmz6tdkhmdwhis4w2yov4g7ctjs72bcixzk2q7m3ioihhm4lvnky"))].into(),
1177 )),
1178 action(StorageAction::PinReference(
1179 "co:local".into(),
1180 [
1181 (cid("bafyr4id6ivgo6penzkew6tv2jsnncuq7a3zm7ajqd4nfmuxry7tq6xawbq")),
1182 (cid("bafyr4ih47p3rp5ppftduphy2fikph63iey5fwv42du6eyjccyu3ygvqvzy")),
1183 ]
1184 .into(),
1185 )),
1186 action(StorageAction::PinUpdate("co:local".into(), PinStrategy::MaxCount(1))),
1187 action(StorageAction::PinReference(
1188 "co:local".into(),
1189 [
1190 (cid("bafyr4igconcuuuokydue7wglesze5vdyahzprgpkn7ukajd76besyhw2mi")),
1191 (cid("bafyr4iegqvwuhpdfp6vdfyxpxbm4qfjjo5y4rko34j6s7eqf2xfijo5chy")),
1192 ]
1193 .into(),
1194 )),
1195 ];
1196 let mut state_reference = OptionLink::none();
1197 for action in actions {
1198 let action_link = storage.set_value(&action).await.unwrap();
1199 state_reference = Storage::reduce(state_reference, action_link, &storage).await.unwrap().into();
1200 }
1201
1202 let state = storage.get_value(&state_reference.unwrap()).await.unwrap();
1204 let pin = state.pins.get(&storage, &"co:local".to_owned()).await.unwrap().unwrap();
1205 let pin = pin
1206 .references
1207 .stream(&storage)
1208 .map_ok(|(_index, value)| value)
1209 .try_collect::<Vec<_>>()
1210 .await
1211 .unwrap();
1212 assert_eq!(pin, vec![cid("bafyr4iegqvwuhpdfp6vdfyxpxbm4qfjjo5y4rko34j6s7eqf2xfijo5chy")]);
1213 }
1214}