1pub mod libc;
26
27#[cfg(feature = "testing")]
28pub mod testing;
29
30use alloc::{boxed::Box, collections::BTreeMap, string::String, vec, vec::Vec};
31
32use aranya_crypto::{Csprng, Rng, dangerous::spideroak_crypto::csprng::rand::Rng as _};
33use buggy::{Bug, BugExt as _, bug};
34use serde::{Deserialize, Serialize};
35use vec1::Vec1;
36
37use crate::{
38 Address, Checkpoint, CmdId, Command, Fact, FactIndex, FactPerspective, GraphId, Keys, Location,
39 Perspective, PolicyId, Prior, Priority, Query, QueryMut, Revertable, Segment, Storage,
40 StorageError, StorageProvider,
41};
42
43pub mod io;
44pub use io::*;
45
/// Maximum `depth` a fact index may have. `write_facts` compacts the prior
/// chain once it has reached this depth, and reports a bug if a deeper index
/// would still be written.
const MAX_FACT_INDEX_DEPTH: usize = 16;
58
/// Storage provider that keeps one [`LinearStorage`] per graph and obtains the
/// underlying writers from an [`IoManager`].
pub struct LinearStorageProvider<FM: IoManager> {
    /// I/O manager used to create, open, remove and list per-graph writers.
    manager: FM,
    /// Storages created or opened through this provider, keyed by graph ID.
    storage: BTreeMap<GraphId, LinearStorage<FM::Writer>>,
}
63
/// Storage for a single graph, backed by the writer `W`.
pub struct LinearStorage<W> {
    /// Handle used to append records, commit the head and hand out readers.
    writer: W,
}
67
/// A segment record together with the reader it was fetched through.
#[derive(Debug)]
pub struct LinearSegment<R> {
    /// Deserialized segment record.
    repr: SegmentRepr,
    /// Reader used to fetch the segment's fact index (see `facts`).
    reader: R,
}
73
/// Serialized form of a segment.
#[derive(Debug, Serialize, Deserialize)]
struct SegmentRepr {
    /// Offset handed out by the writer when this record was appended; it is
    /// the `segment` part of every `Location` inside this segment.
    offset: usize,
    /// Location(s) this segment follows: none, a single one, or a merge of two.
    prior: Prior<Location>,
    /// Address(es) of the parent(s) of the first command in this segment.
    parents: Prior<Address>,
    /// Policy the segment was written under.
    policy: PolicyId,
    /// Offset of the fact index written alongside this segment.
    facts: usize,
    /// Commands of the segment, in order; `Vec1` guarantees at least one.
    commands: Vec1<CommandData>,
    /// Max cut of the first command; command `i` has max cut `max_cut + i`.
    max_cut: usize,
    /// `(location, max_cut)` entries consulted by `get_skip`; `write` stores
    /// them sorted.
    skip_list: Vec<(Location, usize)>,
}
87
/// Serialized form of a single command inside a segment.
#[derive(Debug, Serialize, Deserialize)]
struct CommandData {
    /// Command ID.
    id: CmdId,
    /// Command priority.
    priority: Priority,
    /// Policy bytes reported by the command, if any.
    policy: Option<Bytes>,
    /// Raw bytes of the command.
    data: Bytes,
    /// Fact updates that were pending in the perspective when the command was
    /// added.
    updates: Vec<Update>,
}
96
/// Borrowed view of a command stored in a [`LinearSegment`].
pub struct LinearCommand<'a> {
    /// Command ID.
    id: &'a CmdId,
    /// Address(es) of the command's parent(s).
    parent: Prior<Address>,
    /// Command priority.
    priority: Priority,
    /// Policy bytes carried by the command, if any.
    policy: Option<&'a [u8]>,
    /// Raw bytes of the command.
    data: &'a [u8],
    /// Max cut of the command.
    max_cut: usize,
}
105
106type Bytes = Box<[u8]>;
107
108type Update = (String, Keys, Option<Bytes>);
109type FactMap = BTreeMap<Keys, Option<Box<[u8]>>>;
110type NamedFactMap = BTreeMap<String, FactMap>;
111
/// A fact index record together with the reader used to follow its `prior`
/// chain.
#[derive(Debug)]
pub struct LinearFactIndex<R> {
    /// Deserialized fact index record.
    repr: FactIndexRepr,
    /// Reader used to fetch earlier indexes in the chain.
    reader: R,
}
117
/// Serialized form of a fact index.
#[derive(Debug, Serialize, Deserialize)]
struct FactIndexRepr {
    /// Offset handed out by the writer when this record was appended.
    offset: usize,
    /// Offset of the previous fact index in the chain, if any.
    prior: Option<usize>,
    /// Length of the chain ending at this index: `1` without a prior,
    /// otherwise the prior's depth plus one.
    depth: usize,
    /// Facts recorded by this index; lookups fall back to `prior` for keys
    /// that are absent here.
    facts: NamedFactMap,
}
131
/// In-memory perspective: commands and fact changes that have not yet been
/// written as a segment.
#[derive(Debug)]
pub struct LinearPerspective<R> {
    /// Location(s) the perspective builds on.
    prior: Prior<Location>,
    /// Address(es) the first added command must name as its parent.
    parents: Prior<Address>,
    /// Policy the perspective was created with.
    policy: PolicyId,
    /// Fact state of the perspective.
    facts: LinearFactPerspective<R>,
    /// Commands added so far.
    commands: Vec<CommandData>,
    /// Fact updates recorded since the last `add_command`.
    current_updates: Vec<Update>,
    /// Max cut of the first command; command `i` gets `max_cut + i`.
    max_cut: usize,
    /// Last common ancestor and its max cut; only set for merge perspectives,
    /// where `write` requires it to build the skip list.
    last_common_ancestor: Option<(Location, usize)>,
}
143
144impl<R> LinearPerspective<R> {
145 fn new(
146 prior: Prior<Location>,
147 parents: Prior<Address>,
148 policy: PolicyId,
149 prior_facts: FactPerspectivePrior<R>,
150 max_cut: usize,
151 last_common_ancestor: Option<(Location, usize)>,
152 ) -> Self {
153 Self {
154 prior,
155 parents,
156 policy,
157 facts: LinearFactPerspective::new(prior_facts),
158 commands: Vec::new(),
159 current_updates: Vec::new(),
160 max_cut,
161 last_common_ancestor,
162 }
163 }
164}
165
166#[derive(Debug)]
167pub struct LinearFactPerspective<R> {
168 map: BTreeMap<String, BTreeMap<Keys, Option<Bytes>>>,
169 prior: FactPerspectivePrior<R>,
170}
171
172impl<R> LinearFactPerspective<R> {
173 fn new(prior: FactPerspectivePrior<R>) -> Self {
174 Self {
175 map: BTreeMap::new(),
176 prior,
177 }
178 }
179}
180
/// What a [`LinearFactPerspective`] falls back to for keys it has not changed.
#[derive(Debug)]
enum FactPerspectivePrior<R> {
    /// Nothing: the perspective's own map is the complete fact state.
    None,
    /// Another in-memory fact perspective.
    FactPerspective(Box<LinearFactPerspective<R>>),
    /// A stored fact index, identified by its offset and fetched via `reader`.
    FactIndex { offset: usize, reader: R },
}
187
188impl<R> FactPerspectivePrior<R> {
189 fn is_none(&self) -> bool {
190 matches!(self, Self::None)
191 }
192}
193
194impl<FM: IoManager + Default> Default for LinearStorageProvider<FM> {
195 fn default() -> Self {
196 Self {
197 manager: FM::default(),
198 storage: BTreeMap::new(),
199 }
200 }
201}
202
203impl<FM: IoManager> LinearStorageProvider<FM> {
204 pub fn new(manager: FM) -> Self {
205 Self {
206 manager,
207 storage: BTreeMap::new(),
208 }
209 }
210}
211
212impl<FM: IoManager> StorageProvider for LinearStorageProvider<FM> {
213 type Perspective = LinearPerspective<<FM::Writer as Write>::ReadOnly>;
214 type Segment = LinearSegment<<FM::Writer as Write>::ReadOnly>;
215 type Storage = LinearStorage<FM::Writer>;
216
217 fn new_perspective(&mut self, policy_id: PolicyId) -> Self::Perspective {
218 LinearPerspective::new(
219 Prior::None,
220 Prior::None,
221 policy_id,
222 FactPerspectivePrior::None,
223 0,
224 None,
225 )
226 }
227
228 fn new_storage(
229 &mut self,
230 init: Self::Perspective,
231 ) -> Result<(GraphId, &mut Self::Storage), StorageError> {
232 use alloc::collections::btree_map::Entry;
233
234 if init.commands.is_empty() {
235 return Err(StorageError::EmptyPerspective);
236 }
237 let graph_id = GraphId::transmute(init.commands[0].id);
238 let Entry::Vacant(entry) = self.storage.entry(graph_id) else {
239 return Err(StorageError::StorageExists);
240 };
241
242 let file = self.manager.create(graph_id)?;
243 Ok((graph_id, entry.insert(LinearStorage::create(file, init)?)))
244 }
245
246 fn get_storage(&mut self, graph: GraphId) -> Result<&mut Self::Storage, StorageError> {
247 use alloc::collections::btree_map::Entry;
248
249 let entry = match self.storage.entry(graph) {
250 Entry::Vacant(v) => v,
251 Entry::Occupied(o) => return Ok(o.into_mut()),
252 };
253
254 let file = self
255 .manager
256 .open(graph)?
257 .ok_or(StorageError::NoSuchStorage)?;
258 Ok(entry.insert(LinearStorage::open(file)?))
259 }
260
261 fn remove_storage(&mut self, graph: GraphId) -> Result<(), StorageError> {
262 self.manager.remove(graph)?;
263
264 self.storage
265 .remove(&graph)
266 .ok_or(StorageError::NoSuchStorage)?;
267
268 Ok(())
269 }
270
271 fn list_graph_ids(
272 &mut self,
273 ) -> Result<impl Iterator<Item = Result<GraphId, StorageError>>, StorageError> {
274 self.manager.list()
275 }
276}
277
278impl<W: Write> LinearStorage<W> {
279 fn get_skip(
280 &self,
281 segment: <Self as Storage>::Segment,
282 max_cut: usize,
283 ) -> Result<Option<(Location, usize)>, StorageError> {
284 let mut head = segment;
285 let mut current = None;
286 'outer: loop {
287 if max_cut > head.longest_max_cut()? {
288 return Ok(current);
289 }
290 current = Some((head.first_location(), head.shortest_max_cut()));
291 if max_cut >= head.shortest_max_cut() {
292 return Ok(current);
293 }
294 for (skip, skip_max_cut) in head.skip_list() {
297 if skip_max_cut <= &max_cut {
298 head = self.get_segment(*skip)?;
299 continue 'outer;
300 }
301 }
302 head = match head.prior() {
303 Prior::None | Prior::Merge(_, _) => {
304 return Ok(current);
305 }
306 Prior::Single(l) => self.get_segment(l)?,
307 }
308 }
309 }
310}
311
impl<W: Write> LinearStorage<W> {
    /// Initializes a new storage on `writer` from the root perspective `init`.
    ///
    /// Appends the initial fact index and then the first segment, and commits
    /// the last command of that segment as the head.
    ///
    /// # Panics
    ///
    /// Panics if `init` has a prior, parents, or prior facts; i.e. if it is
    /// not a root perspective.
    ///
    /// # Errors
    ///
    /// [`StorageError::EmptyPerspective`] if `init` has no commands (checked
    /// after the fact index has already been appended), or any writer error.
    fn create(mut writer: W, init: LinearPerspective<W::ReadOnly>) -> Result<Self, StorageError> {
        assert!(matches!(init.prior, Prior::None));
        assert!(matches!(init.parents, Prior::None));
        assert!(matches!(init.facts.prior, FactPerspectivePrior::None));

        // Drop fact names that ended up with no entries.
        let mut map = init.facts.map;
        map.retain(|_, kv| !kv.is_empty());

        // The root fact index has no prior, so its chain depth is 1.
        let facts = writer
            .append(|offset| FactIndexRepr {
                offset,
                prior: None,
                depth: 1,
                facts: map,
            })?
            .offset;

        let commands = init
            .commands
            .try_into()
            .map_err(|_| StorageError::EmptyPerspective)?;
        let segment = writer.append(|offset| SegmentRepr {
            offset,
            prior: Prior::None,
            parents: Prior::None,
            policy: init.policy,
            facts,
            commands,
            max_cut: 0,
            skip_list: vec![],
        })?;

        // The head is the last command of the segment just written.
        let head = Location::new(
            segment.offset,
            segment
                .commands
                .len()
                .checked_sub(1)
                .assume("vec1 length >= 1")?,
        );

        writer.commit(head)?;

        let storage = Self { writer };

        Ok(storage)
    }

    /// Wraps an already-initialized `writer`; performs no I/O.
    fn open(writer: W) -> Result<Self, StorageError> {
        Ok(Self { writer })
    }

    /// Flattens the fact index chain ending at `repr` into a single index with
    /// no prior, writes it, and returns its record.
    fn compact(&mut self, mut repr: FactIndexRepr) -> Result<FactIndexRepr, StorageError> {
        let mut map = NamedFactMap::new();
        let reader = self.writer.readonly();
        loop {
            // The walk goes from the newest index to the oldest, so
            // `or_insert` keeps the newest entry for every key.
            for (name, kv) in repr.facts {
                let sub = map.entry(name).or_default();
                for (k, v) in kv {
                    sub.entry(k).or_insert(v);
                }
            }
            let Some(offset) = repr.prior else { break };
            repr = reader.fetch(offset)?;
        }

        // With no prior left to hide, deletion markers and empty names can go.
        map.retain(|_, kv| {
            kv.retain(|_, v| v.is_some());
            !kv.is_empty()
        });

        Ok(self
            .write_facts(LinearFactPerspective {
                map,
                prior: FactPerspectivePrior::None,
            })?
            .repr)
    }
}
393
impl<F: Write> Storage for LinearStorage<F> {
    type Perspective = LinearPerspective<F::ReadOnly>;
    type FactPerspective = LinearFactPerspective<F::ReadOnly>;
    type Segment = LinearSegment<F::ReadOnly>;
    type FactIndex = LinearFactIndex<F::ReadOnly>;

    /// Creates a perspective whose first command will follow the command at
    /// `parent`.
    ///
    /// # Errors
    ///
    /// [`StorageError::CommandOutOfBounds`] if `parent` does not name a command
    /// in its segment; otherwise errors from reading storage or from max-cut
    /// arithmetic.
    fn get_linear_perspective(&self, parent: Location) -> Result<Self::Perspective, StorageError> {
        let segment = self.get_segment(parent)?;
        let command = segment
            .get_command(parent)
            .ok_or(StorageError::CommandOutOfBounds(parent))?;
        let policy = segment.repr.policy;
        let prior_facts: FactPerspectivePrior<F::ReadOnly> = if parent == segment.head_location() {
            // At the segment head the segment's own fact index is used as is.
            FactPerspectivePrior::FactIndex {
                offset: segment.repr.facts,
                reader: self.writer.readonly(),
            }
        } else {
            // Mid-segment: start from the index before the segment's own and
            // replay the updates of the commands up to and including `parent`.
            let prior = match segment.facts()?.repr.prior {
                Some(offset) => FactPerspectivePrior::FactIndex {
                    offset,
                    reader: self.writer.readonly(),
                },
                None => FactPerspectivePrior::None,
            };
            let mut facts = LinearFactPerspective::new(prior);
            for data in &segment.repr.commands[..=parent.command] {
                facts.apply_updates(&data.updates);
            }
            if facts.prior.is_none() {
                facts.map.retain(|_, kv| !kv.is_empty());
            }
            // Avoid an empty intermediate layer when nothing was replayed.
            if facts.map.is_empty() {
                facts.prior
            } else {
                FactPerspectivePrior::FactPerspective(Box::new(facts))
            }
        };
        let prior = Prior::Single(parent);

        let perspective = LinearPerspective::new(
            prior,
            Prior::Single(command.address()?),
            policy,
            prior_facts,
            command
                .max_cut()?
                .checked_add(1)
                .assume("must not overflow")?,
            None,
        );

        Ok(perspective)
    }

    /// Returns the fact state as of the command at `location`.
    fn get_fact_perspective(
        &self,
        location: Location,
    ) -> Result<Self::FactPerspective, StorageError> {
        let segment = self.get_segment(location)?;

        // The segment's own fact index can be used directly at the segment
        // head, or when no command in the segment recorded any update.
        if location == segment.head_location()
            || segment
                .repr
                .commands
                .iter()
                .all(|cmd| cmd.updates.is_empty())
        {
            return Ok(LinearFactPerspective::new(
                FactPerspectivePrior::FactIndex {
                    offset: segment.repr.facts,
                    reader: self.writer.readonly(),
                },
            ));
        }

        // Otherwise replay the updates up to and including `location` on top
        // of the index before the segment's own.
        let prior = match segment.facts()?.repr.prior {
            Some(offset) => FactPerspectivePrior::FactIndex {
                offset,
                reader: self.writer.readonly(),
            },
            None => FactPerspectivePrior::None,
        };
        let mut facts = LinearFactPerspective::new(prior);
        for data in &segment.repr.commands[..=location.command] {
            facts.apply_updates(&data.updates);
        }

        Ok(facts)
    }

    /// Creates a perspective for a merge of the commands at `left` and
    /// `right`, using `braid` as its prior facts.
    ///
    /// # Errors
    ///
    /// - [`StorageError::CommandOutOfBounds`] if either location does not name
    ///   a command.
    /// - [`StorageError::PolicyMismatch`] if `policy_id` equals neither
    ///   segment's policy.
    fn new_merge_perspective(
        &self,
        left: Location,
        right: Location,
        last_common_ancestor: (Location, usize),
        policy_id: PolicyId,
        braid: Self::FactIndex,
    ) -> Result<Self::Perspective, StorageError> {
        let left_segment = self.get_segment(left)?;
        let left_command = left_segment
            .get_command(left)
            .ok_or(StorageError::CommandOutOfBounds(left))?;
        let right_segment = self.get_segment(right)?;
        let right_command = right_segment
            .get_command(right)
            .ok_or(StorageError::CommandOutOfBounds(right))?;

        let parent = Prior::Merge(left_command.address()?, right_command.address()?);

        if policy_id != left_segment.policy() && policy_id != right_segment.policy() {
            return Err(StorageError::PolicyMismatch);
        }

        let prior = Prior::Merge(left, right);

        // The merge command's max cut is one more than the larger of its
        // parents' max cuts.
        let perspective = LinearPerspective::new(
            prior,
            parent,
            policy_id,
            FactPerspectivePrior::FactIndex {
                offset: braid.repr.offset,
                reader: braid.reader,
            },
            left_command
                .max_cut()?
                .max(right_command.max_cut()?)
                .checked_add(1)
                .assume("must not overflow")?,
            Some(last_common_ancestor),
        );

        Ok(perspective)
    }

    /// Fetches the segment record at `location.segment`; `location.command`
    /// is not checked here.
    fn get_segment(&self, location: Location) -> Result<Self::Segment, StorageError> {
        let reader = self.writer.readonly();
        let repr = reader.fetch(location.segment)?;
        let seg = LinearSegment { repr, reader };

        Ok(seg)
    }

    /// Returns the head location reported by the writer.
    fn get_head(&self) -> Result<Location, StorageError> {
        self.writer.head()
    }

    /// Makes the head of `segment` the new head of the storage.
    ///
    /// # Errors
    ///
    /// [`StorageError::HeadNotAncestor`] if the current head is not an
    /// ancestor of `segment`.
    fn commit(&mut self, segment: Self::Segment) -> Result<(), StorageError> {
        if !self.is_ancestor(self.get_head()?, &segment)? {
            return Err(StorageError::HeadNotAncestor);
        }

        self.writer.commit(segment.head_location())
    }

    /// Writes `perspective` as a new segment (and, if needed, a new fact
    /// index) and returns the segment. The head is not moved; see `commit`.
    ///
    /// # Errors
    ///
    /// [`StorageError::EmptyPerspective`] if the perspective has no commands
    /// (its facts have already been written at that point), or any error from
    /// reading or appending records.
    fn write(&mut self, perspective: Self::Perspective) -> Result<Self::Segment, StorageError> {
        let facts = self.write_facts(perspective.facts)?.repr.offset;

        let commands: Vec1<CommandData> = perspective
            .commands
            .try_into()
            .map_err(|_| StorageError::EmptyPerspective)?;

        // Makes `count` attempts at picking a skip target among the ancestors
        // of `l`: each attempt draws a random max cut below `l`'s and asks
        // `get_skip` for the matching entry. Duplicates are dropped.
        let get_skips =
            |l: Location, count: usize| -> Result<Vec<(Location, usize)>, StorageError> {
                let mut rng = &mut Rng as &mut dyn Csprng;
                let mut skips = vec![];
                for _ in 0..count {
                    // `get_skip` consumes the segment, so it is re-fetched
                    // on every attempt.
                    let segment = self.get_segment(l)?;
                    let l_max_cut = segment
                        .get_command(l)
                        .assume("location must exist")?
                        .max_cut;
                    if l_max_cut > 0 {
                        let max_cut = rng.gen_range(0..l_max_cut);
                        if let Some(skip) = self.get_skip(segment, max_cut)? {
                            if !skips.contains(&skip) {
                                skips.push(skip);
                            }
                        } else {
                            break;
                        }
                    }
                }
                Ok(skips)
            };

        let skip_list = match perspective.prior {
            Prior::None => vec![],
            Prior::Merge(_, _) => {
                // A merge segment always lists its last common ancestor.
                let (lca, max_cut) = perspective.last_common_ancestor.assume("lca must exist")?;
                let mut skips = get_skips(lca, 2)?;
                if !skips.contains(&(lca, max_cut)) {
                    skips.push((lca, max_cut));
                }
                skips.sort();
                skips
            }
            Prior::Single(l) => {
                let mut skips = get_skips(l, 3)?;
                skips.sort();
                skips
            }
        };
        let repr = self.writer.append(|offset| SegmentRepr {
            offset,
            prior: perspective.prior,
            parents: perspective.parents,
            policy: perspective.policy,
            facts,
            commands,
            max_cut: perspective.max_cut,
            skip_list,
        })?;

        Ok(LinearSegment {
            repr,
            reader: self.writer.readonly(),
        })
    }

    /// Persists `facts` and returns the resulting fact index.
    ///
    /// If `facts` has a prior and no changes of its own, the prior index is
    /// returned and nothing new is appended for this layer. A prior chain
    /// that has reached `MAX_FACT_INDEX_DEPTH` is compacted first.
    fn write_facts(
        &mut self,
        facts: Self::FactPerspective,
    ) -> Result<Self::FactIndex, StorageError> {
        let mut prior = match facts.prior {
            FactPerspectivePrior::None => None,
            FactPerspectivePrior::FactPerspective(prior) => {
                // In-memory priors are written first, oldest layer first.
                let prior = self.write_facts(*prior)?;
                if facts.map.is_empty() {
                    return Ok(prior);
                }
                Some(prior.repr)
            }
            FactPerspectivePrior::FactIndex { offset, reader } => {
                let repr = reader.fetch(offset)?;
                if facts.map.is_empty() {
                    return Ok(LinearFactIndex { repr, reader });
                }
                Some(repr)
            }
        };

        // Depth of the prior chain (0 without a prior), compacting it when one
        // more layer would exceed the maximum.
        let depth = if let Some(mut p) = prior.take() {
            if p.depth > MAX_FACT_INDEX_DEPTH - 1 {
                p = self.compact(p)?;
            }
            prior.insert(p).depth
        } else {
            0
        };

        let depth = depth.checked_add(1).assume("depth won't overflow")?;

        if depth > MAX_FACT_INDEX_DEPTH {
            bug!("fact index too deep");
        }

        let repr = self.writer.append(|offset| FactIndexRepr {
            offset,
            prior: prior.map(|p| p.offset),
            depth,
            facts: facts.map,
        })?;

        Ok(LinearFactIndex {
            repr,
            reader: self.writer.readonly(),
        })
    }
}
671
impl<R: Read> Segment for LinearSegment<R> {
    type FactIndex = LinearFactIndex<R>;
    type Command<'a>
        = LinearCommand<'a>
    where
        R: 'a;

    /// Returns the last command of the segment.
    ///
    /// Its parent is the preceding command in the segment, or the segment's
    /// `parents` when the segment holds a single command.
    fn head(&self) -> Result<Self::Command<'_>, StorageError> {
        let data = self.repr.commands.last();
        // `len - 2` is the index of the command before the last one, if any.
        let parent = if let Some(prev) = usize::checked_sub(self.repr.commands.len(), 2) {
            Prior::Single(Address {
                id: self.repr.commands[prev].id,
                max_cut: self
                    .repr
                    .max_cut
                    .checked_add(prev)
                    .assume("must not overflow")?,
            })
        } else {
            self.repr.parents
        };
        Ok(LinearCommand {
            id: &data.id,
            parent,
            priority: data.priority.clone(),
            policy: data.policy.as_deref(),
            data: &data.data,
            max_cut: self
                .repr
                .max_cut
                .checked_add(self.repr.commands.len())
                .assume("must not overflow")?
                .checked_sub(1)
                .assume("must not overflow")?,
        })
    }

    /// Returns the first command of the segment; its parent is the segment's
    /// `parents` and its max cut is the segment's `max_cut`.
    fn first(&self) -> Self::Command<'_> {
        let data = self.repr.commands.first();
        LinearCommand {
            id: &data.id,
            parent: self.repr.parents,
            priority: data.priority.clone(),
            policy: data.policy.as_deref(),
            data: &data.data,
            max_cut: self.repr.max_cut,
        }
    }

    /// Location of the last command in the segment.
    fn head_location(&self) -> Location {
        // `commands` is a `Vec1`, so its length is at least 1.
        #[allow(clippy::arithmetic_side_effects)]
        Location::new(self.repr.offset, self.repr.commands.len() - 1)
    }

    /// Location of the first command in the segment.
    fn first_location(&self) -> Location {
        Location::new(self.repr.offset, 0)
    }

    /// Returns whether `location` names a command in this segment.
    fn contains(&self, location: Location) -> bool {
        location.segment == self.repr.offset && location.command < self.repr.commands.len()
    }

    /// Policy the segment was written under.
    fn policy(&self) -> PolicyId {
        self.repr.policy
    }

    /// Location(s) this segment follows.
    fn prior(&self) -> Prior<Location> {
        self.repr.prior
    }

    /// Returns the command at `location`.
    ///
    /// `None` is returned if `location` belongs to another segment, the
    /// command index is out of range, or a max cut would overflow.
    fn get_command(&self, location: Location) -> Option<Self::Command<'_>> {
        if self.repr.offset != location.segment {
            return None;
        }
        let data = self.repr.commands.get(location.command)?;
        // The parent is the preceding command in the segment, or the
        // segment's `parents` for the first command.
        let parent = if let Some(prev) = usize::checked_sub(location.command, 1) {
            if let Some(max_cut) = self.repr.max_cut.checked_add(prev) {
                Prior::Single(Address {
                    id: self.repr.commands[prev].id,
                    max_cut,
                })
            } else {
                return None;
            }
        } else {
            self.repr.parents
        };
        self.repr
            .max_cut
            .checked_add(location.command)
            .map(|max_cut| LinearCommand {
                id: &data.id,
                parent,
                priority: data.priority.clone(),
                policy: data.policy.as_deref(),
                data: &data.data,
                max_cut,
            })
    }

    /// Returns the commands from `location` to the end of the segment, or an
    /// empty vector if `location` belongs to another segment.
    ///
    /// # Panics
    ///
    /// Panics if `get_command` returns `None` for one of the in-range
    /// indexes, which it only does on max-cut overflow.
    fn get_from(&self, location: Location) -> Vec<Self::Command<'_>> {
        if self.repr.offset != location.segment {
            return Vec::new();
        }

        (location.command..self.repr.commands.len())
            .map(|c| Location::new(location.segment, c))
            .map(|loc| {
                self.get_command(loc)
                    .expect("constructed location is valid")
            })
            .collect()
    }

    /// Maps `address` to a location in this segment.
    ///
    /// The command index is `address.max_cut - max_cut`; `None` is returned
    /// if that index is out of range or the command there has a different ID.
    fn get_by_address(&self, address: Address) -> Option<Location> {
        if address.max_cut < self.repr.max_cut {
            return None;
        }
        let idx = address.max_cut.checked_sub(self.repr.max_cut)?;
        let cmd = self.repr.commands.get(idx)?;
        if cmd.id != address.id {
            return None;
        }
        Some(Location::new(self.repr.offset, idx))
    }

    /// Fetches the fact index recorded for this segment.
    fn facts(&self) -> Result<Self::FactIndex, StorageError> {
        Ok(LinearFactIndex {
            repr: self.reader.fetch(self.repr.facts)?,
            reader: self.reader.clone(),
        })
    }

    /// Skip entries stored with the segment.
    fn skip_list(&self) -> &[(Location, usize)] {
        &self.repr.skip_list
    }

    /// Max cut of the first command in the segment.
    fn shortest_max_cut(&self) -> usize {
        self.repr.max_cut
    }

    /// Max cut of the last command in the segment (`max_cut + len - 1`).
    fn longest_max_cut(&self) -> Result<usize, StorageError> {
        Ok(self
            .repr
            .max_cut
            .checked_add(self.repr.commands.len())
            .assume("must not overflow")?
            .checked_sub(1)
            .assume("must not overflow")?)
    }
}
826
// Marker impl: `FactIndex` requires no methods here; lookups come from the
// `Query` impl for `LinearFactIndex`.
impl<R: Read> FactIndex for LinearFactIndex<R> {}
828
/// Owning iterator over the entries of a [`FactMap`].
type MapIter = alloc::collections::btree_map::IntoIter<Keys, Option<Bytes>>;
/// Iterator returned by prefix queries; yields the facts of an already
/// collected map and skips entries that are marked as deleted.
pub struct QueryIterator {
    /// Remaining map entries, in key order.
    it: MapIter,
}
833
834impl QueryIterator {
835 fn new(it: MapIter) -> Self {
836 Self { it }
837 }
838}
839
840impl Iterator for QueryIterator {
841 type Item = Result<Fact, StorageError>;
842 fn next(&mut self) -> Option<Self::Item> {
843 loop {
844 if let (key, Some(value)) = self.it.next()? {
846 return Some(Ok(Fact { key, value }));
847 }
848 }
849 }
850}
851
852impl<R: Read> Query for LinearFactIndex<R> {
853 fn query(&self, name: &str, keys: &[Box<[u8]>]) -> Result<Option<Box<[u8]>>, StorageError> {
854 let mut prior = Some(&self.repr);
855 let mut slot; while let Some(facts) = prior {
857 if let Some(v) = facts.facts.get(name).and_then(|m| m.get(keys)) {
858 return Ok(v.clone());
859 }
860 slot = facts.prior.map(|p| self.reader.fetch(p)).transpose()?;
861 prior = slot.as_ref();
862 }
863 Ok(None)
864 }
865
866 type QueryIterator = QueryIterator;
867 fn query_prefix(
868 &self,
869 name: &str,
870 prefix: &[Box<[u8]>],
871 ) -> Result<QueryIterator, StorageError> {
872 Ok(QueryIterator::new(
873 self.query_prefix_inner(name, prefix)?.into_iter(),
874 ))
875 }
876}
877
878impl<R: Read> LinearFactIndex<R> {
879 fn query_prefix_inner(
880 &self,
881 name: &str,
882 prefix: &[Box<[u8]>],
883 ) -> Result<FactMap, StorageError> {
884 let mut matches = BTreeMap::new();
885 let mut prior = Some(&self.repr);
886 let mut slot; while let Some(facts) = prior {
888 if let Some(map) = facts.facts.get(name) {
889 for (k, v) in super::memory::find_prefixes(map, prefix) {
890 if !matches.contains_key(k) {
892 matches.insert(k.clone(), v.map(Into::into));
893 }
894 }
895 }
896 slot = facts.prior.map(|p| self.reader.fetch(p)).transpose()?;
897 prior = slot.as_ref();
898 }
899 Ok(matches)
900 }
901}
902
903impl<R> LinearFactPerspective<R> {
904 fn clear(&mut self) {
905 self.map.clear();
906 }
907
908 fn apply_updates(&mut self, updates: &[Update]) {
909 for (name, key, value) in updates {
910 if self.prior.is_none() {
911 if let Some(value) = value {
912 self.map
913 .entry(name.clone())
914 .or_default()
915 .insert(key.clone(), Some(value.clone()));
916 } else if let Some(e) = self.map.get_mut(name) {
917 e.remove(key);
918 }
919 } else {
920 self.map
921 .entry(name.clone())
922 .or_default()
923 .insert(key.clone(), value.clone());
924 }
925 }
926 }
927}
928
// Marker impl: `FactPerspective` requires no methods here; behaviour comes
// from the `Query` and `QueryMut` impls for `LinearFactPerspective`.
impl<R: Read> FactPerspective for LinearFactPerspective<R> {}
930
931impl<R: Read> Query for LinearFactPerspective<R> {
932 fn query(&self, name: &str, keys: &[Box<[u8]>]) -> Result<Option<Box<[u8]>>, StorageError> {
933 if let Some(wrapped) = self.map.get(name).and_then(|m| m.get(keys)) {
934 return Ok(wrapped.as_deref().map(Box::from));
935 }
936 match &self.prior {
937 FactPerspectivePrior::None => Ok(None),
938 FactPerspectivePrior::FactPerspective(prior) => prior.query(name, keys),
939 FactPerspectivePrior::FactIndex { offset, reader } => {
940 let repr: FactIndexRepr = reader.fetch(*offset)?;
941 let prior = LinearFactIndex {
942 repr,
943 reader: reader.clone(),
944 };
945 prior.query(name, keys)
946 }
947 }
948 }
949
950 type QueryIterator = QueryIterator;
951 fn query_prefix(
952 &self,
953 name: &str,
954 prefix: &[Box<[u8]>],
955 ) -> Result<QueryIterator, StorageError> {
956 Ok(QueryIterator::new(
957 self.query_prefix_inner(name, prefix)?.into_iter(),
958 ))
959 }
960}
961
962impl<R: Read> LinearFactPerspective<R> {
963 fn query_prefix_inner(
964 &self,
965 name: &str,
966 prefix: &[Box<[u8]>],
967 ) -> Result<FactMap, StorageError> {
968 let mut matches = match &self.prior {
969 FactPerspectivePrior::None => BTreeMap::new(),
970 FactPerspectivePrior::FactPerspective(prior) => {
971 prior.query_prefix_inner(name, prefix)?
972 }
973 FactPerspectivePrior::FactIndex { offset, reader } => {
974 let repr: FactIndexRepr = reader.fetch(*offset)?;
975 let prior = LinearFactIndex {
976 repr,
977 reader: reader.clone(),
978 };
979 prior.query_prefix_inner(name, prefix)?
980 }
981 };
982 if let Some(map) = self.map.get(name) {
983 for (k, v) in super::memory::find_prefixes(map, prefix) {
984 matches.insert(k.clone(), v.map(Into::into));
986 }
987 }
988 Ok(matches)
989 }
990}
991
992impl<R: Read> QueryMut for LinearFactPerspective<R> {
993 fn insert(&mut self, name: String, keys: Keys, value: Bytes) {
994 self.map.entry(name).or_default().insert(keys, Some(value));
995 }
996
997 fn delete(&mut self, name: String, keys: Keys) {
998 if self.prior.is_none() {
999 if let Some(kv) = self.map.get_mut(&name) {
1001 kv.remove(&keys);
1002 }
1003 } else {
1004 self.map.entry(name).or_default().insert(keys, None);
1005 }
1006 }
1007}
1008
// Marker impl: `FactPerspective` requires no methods here; behaviour comes
// from the `Query` and `QueryMut` impls for `LinearPerspective`.
impl<R: Read> FactPerspective for LinearPerspective<R> {}
1010
1011impl<R: Read> Query for LinearPerspective<R> {
1012 fn query(&self, name: &str, keys: &[Box<[u8]>]) -> Result<Option<Box<[u8]>>, StorageError> {
1013 self.facts.query(name, keys)
1014 }
1015
1016 type QueryIterator = QueryIterator;
1017 fn query_prefix(
1018 &self,
1019 name: &str,
1020 prefix: &[Box<[u8]>],
1021 ) -> Result<QueryIterator, StorageError> {
1022 self.facts.query_prefix(name, prefix)
1023 }
1024}
1025
1026impl<R: Read> QueryMut for LinearPerspective<R> {
1027 fn insert(&mut self, name: String, keys: Keys, value: Bytes) {
1028 self.facts.insert(name.clone(), keys.clone(), value.clone());
1029 self.current_updates.push((name, keys, Some(value)));
1030 }
1031
1032 fn delete(&mut self, name: String, keys: Keys) {
1033 self.facts.delete(name.clone(), keys.clone());
1034 self.current_updates.push((name, keys, None));
1035 }
1036}
1037
1038impl<R: Read> Revertable for LinearPerspective<R> {
1039 fn checkpoint(&self) -> Checkpoint {
1040 Checkpoint {
1041 index: self.commands.len(),
1042 }
1043 }
1044
1045 fn revert(&mut self, checkpoint: Checkpoint) -> Result<(), Bug> {
1046 if checkpoint.index == self.commands.len() {
1047 return Ok(());
1048 }
1049
1050 if checkpoint.index > self.commands.len() {
1051 bug!(
1052 "A checkpoint's index should always be less than or equal to the length of a perspective's command history!"
1053 );
1054 }
1055
1056 self.commands.truncate(checkpoint.index);
1057 self.facts.clear();
1058 self.current_updates.clear();
1059 for data in &self.commands {
1060 self.facts.apply_updates(&data.updates);
1061 }
1062
1063 Ok(())
1064 }
1065}
1066
1067impl<R: Read> Perspective for LinearPerspective<R> {
1068 fn policy(&self) -> PolicyId {
1069 self.policy
1070 }
1071
1072 fn add_command(&mut self, command: &impl Command) -> Result<usize, StorageError> {
1073 if command.parent() != self.head_address()? {
1074 return Err(StorageError::PerspectiveHeadMismatch);
1075 }
1076
1077 self.commands.push(CommandData {
1078 id: command.id(),
1079 priority: command.priority(),
1080 policy: command.policy().map(Box::from),
1081 data: command.bytes().into(),
1082 updates: core::mem::take(&mut self.current_updates),
1083 });
1084 Ok(self.commands.len())
1085 }
1086
1087 fn includes(&self, id: CmdId) -> bool {
1088 self.commands.iter().any(|cmd| cmd.id == id)
1089 }
1090
1091 fn head_address(&self) -> Result<Prior<Address>, Bug> {
1092 Ok(if let Some(last) = self.commands.last() {
1093 Prior::Single(Address {
1094 id: last.id,
1095 max_cut: self
1096 .max_cut
1097 .checked_add(self.commands.len())
1098 .assume("must not overflow")?
1099 .checked_sub(1)
1100 .assume("must not overflow")?,
1101 })
1102 } else {
1103 self.parents
1104 })
1105 }
1106}
1107
1108impl From<Prior<Address>> for Prior<CmdId> {
1109 fn from(p: Prior<Address>) -> Self {
1110 match p {
1111 Prior::None => Self::None,
1112 Prior::Single(l) => Self::Single(l.id),
1113 Prior::Merge(l, r) => Self::Merge(l.id, r.id),
1114 }
1115 }
1116}
1117
impl Command for LinearCommand<'_> {
    /// Priority of the command (cloned from the stored value).
    fn priority(&self) -> Priority {
        self.priority.clone()
    }

    /// ID of the command.
    fn id(&self) -> CmdId {
        *self.id
    }

    /// Address(es) of the command's parent(s).
    fn parent(&self) -> Prior<Address> {
        self.parent
    }

    /// Policy bytes carried by the command, if any.
    fn policy(&self) -> Option<&[u8]> {
        self.policy
    }

    /// Raw bytes of the command.
    fn bytes(&self) -> &[u8] {
        self.data
    }

    /// Max cut of the command; this implementation never fails.
    fn max_cut(&self) -> Result<usize, Bug> {
        Ok(self.max_cut)
    }
}
1143
#[cfg(test)]
mod test {
    use testing::Manager;

    use super::*;
    use crate::testing::dsl::{StorageBackend, test_suite};

    /// Inserts a handful of facts into a fresh perspective and checks that
    /// `query_prefix` returns exactly the keys starting with each prefix, in
    /// sorted order and with the expected values.
    #[test]
    fn test_query_prefix() {
        let mut provider = LinearStorageProvider::new(Manager::new());
        let mut fp = provider.new_perspective(PolicyId::new(0));

        let name = "x";

        let keys: &[&[&str]] = &[
            &["aa", "xy", "123"],
            &["aa", "xz", "123"],
            &["bb", "ccc"],
            &["bc", ""],
        ];
        let keys: Vec<Keys> = keys
            .iter()
            .map(|ks| ks.iter().map(|k| k.as_bytes()).collect())
            .collect();

        // Each fact's value is the debug rendering of its own keys, so the
        // expected value can be recomputed from the key below.
        for ks in &keys {
            fp.insert(
                name.into(),
                ks.clone(),
                format!("{ks:?}").into_bytes().into(),
            );
        }

        let prefixes: &[&[&str]] = &[
            &["aa", "xy", "12"],
            &["aa", "xy"],
            &["aa", "xz"],
            &["aa", "x"],
            &["bb", ""],
            &["bb", "ccc"],
            &["bc", ""],
            &["bc", "", ""],
        ];

        for prefix in prefixes {
            let prefix: Keys = prefix.iter().map(|k| k.as_bytes()).collect();
            let found: Vec<_> = fp.query_prefix(name, &prefix).unwrap().collect();
            // Reference result: the inserted keys that start with the prefix.
            let mut expected: Vec<_> = keys.iter().filter(|k| k.starts_with(&prefix)).collect();
            expected.sort();
            assert_eq!(found.len(), expected.len());
            for (a, b) in std::iter::zip(found, expected) {
                let a = a.unwrap();
                assert_eq!(&a.key, b);
                assert_eq!(a.value.as_ref(), format!("{b:?}").as_bytes());
            }
        }
    }

    /// Backend that runs the shared storage test suite against
    /// `LinearStorageProvider` with the testing `Manager`.
    struct LinearBackend;
    impl StorageBackend for LinearBackend {
        type StorageProvider = LinearStorageProvider<Manager>;

        // Every client gets a provider with its own fresh manager.
        fn provider(&mut self, _client_id: u64) -> Self::StorageProvider {
            LinearStorageProvider::new(Manager::new())
        }
    }
    test_suite!(|| LinearBackend);
}