1pub mod libc;
26
27#[cfg(feature = "testing")]
28pub mod testing;
29
30use alloc::{boxed::Box, collections::BTreeMap, string::String, vec, vec::Vec};
31
32use aranya_crypto::{Csprng, Rng, dangerous::spideroak_crypto::csprng::rand::Rng as _};
33use buggy::{Bug, BugExt, bug};
34use serde::{Deserialize, Serialize};
35use vec1::Vec1;
36
37use crate::{
38 Address, Checkpoint, Command, CommandId, Fact, FactIndex, FactPerspective, GraphId, Keys,
39 Location, Perspective, PolicyId, Prior, Priority, Query, QueryMut, Revertable, Segment,
40 Storage, StorageError, StorageProvider,
41};
42
43pub mod io;
44pub use io::*;
45
/// Upper bound on the `depth` of a chain of fact index records.
///
/// `write_facts` compacts the prior index before appending on top of it
/// whenever the prior's depth has already reached this value.
const MAX_FACT_INDEX_DEPTH: usize = 16;
58
/// A [`StorageProvider`] that keeps one [`LinearStorage`] per graph and
/// obtains the underlying writers from an [`IoManager`].
pub struct LinearStorageProvider<FM: IoManager> {
    /// Creates, opens, removes and lists the per-graph writers.
    manager: FM,
    /// Storages that have already been created or opened, keyed by graph.
    storage: BTreeMap<GraphId, LinearStorage<FM::Writer>>,
}
63
/// Storage for a single graph, backed by a writer `W`.
pub struct LinearStorage<W> {
    /// Appends records, commits the head and hands out read-only views.
    writer: W,
}
67
/// A segment record together with a reader that can fetch related records.
#[derive(Debug)]
pub struct LinearSegment<R> {
    /// The deserialized segment record.
    repr: SegmentRepr,
    /// Reader used to fetch the segment's fact index.
    reader: R,
}
73
/// Serialized form of a segment: a non-empty run of commands.
#[derive(Debug, Serialize, Deserialize)]
struct SegmentRepr {
    /// Offset handed out by the writer when this record was appended; it is
    /// the `segment` component of every `Location` inside this segment.
    offset: usize,
    /// Location(s) this segment was built on top of.
    prior: Prior<Location>,
    /// Address(es) reported as the parent of the first command.
    parents: Prior<Address>,
    /// Policy associated with the segment.
    policy: PolicyId,
    /// Offset of the fact index record written together with this segment.
    facts: usize,
    /// The commands in this segment (never empty).
    commands: Vec1<CommandData>,
    /// Max cut of the first command; command `i` has max cut `max_cut + i`.
    max_cut: usize,
    /// `(location, max cut)` pairs of earlier points in the graph.
    skip_list: Vec<(Location, usize)>,
}
87
/// Serialized form of a single command inside a segment.
#[derive(Debug, Serialize, Deserialize)]
struct CommandData {
    /// The command's id.
    id: CommandId,
    /// The command's priority.
    priority: Priority,
    /// Optional policy bytes carried by the command.
    policy: Option<Bytes>,
    /// The command's serialized bytes.
    data: Bytes,
    /// Fact updates that were pending on the perspective when this command
    /// was added.
    updates: Vec<Update>,
}
96
/// A borrowed view of one command stored in a [`LinearSegment`].
pub struct LinearCommand<'a> {
    /// Id of the command.
    id: &'a CommandId,
    /// Address(es) of the command's parent(s).
    parent: Prior<Address>,
    /// Priority of the command.
    priority: Priority,
    /// Optional policy bytes.
    policy: Option<&'a [u8]>,
    /// Serialized command bytes.
    data: &'a [u8],
    /// Max cut of the command.
    max_cut: usize,
}
105
/// Owned byte string.
type Bytes = Box<[u8]>;

/// A single fact change: `(fact name, keys, new value)`, where a `None`
/// value records a deletion.
type Update = (String, Keys, Option<Bytes>);
/// Facts of one name, keyed by `Keys`; a `None` value marks a deleted fact.
type FactMap = BTreeMap<Keys, Option<Box<[u8]>>>;
/// Fact maps keyed by fact name.
type NamedFactMap = BTreeMap<String, FactMap>;
111
/// A fact index record together with a reader for following its `prior` chain.
#[derive(Debug)]
pub struct LinearFactIndex<R> {
    /// The deserialized fact index record.
    repr: FactIndexRepr,
    /// Reader used to fetch prior fact index records.
    reader: R,
}
117
/// Serialized form of one layer of the fact index.
#[derive(Debug, Serialize, Deserialize)]
struct FactIndexRepr {
    /// Offset handed out by the writer when this record was appended.
    offset: usize,
    /// Offset of the next older layer, if any.
    prior: Option<usize>,
    /// Number of layers in the chain ending at this record (1 when there is
    /// no prior).
    depth: usize,
    /// Facts recorded in this layer; lookups consult this layer before
    /// following `prior`, and a `None` value hides the fact in older layers.
    facts: NamedFactMap,
}
131
/// An in-memory perspective: commands and fact changes not yet written as a
/// segment.
#[derive(Debug)]
pub struct LinearPerspective<R> {
    /// Location(s) the perspective builds on.
    prior: Prior<Location>,
    /// Address(es) the first added command must name as its parent.
    parents: Prior<Address>,
    /// Policy of the perspective.
    policy: PolicyId,
    /// Current view of the facts.
    facts: LinearFactPerspective<R>,
    /// Commands added so far.
    commands: Vec<CommandData>,
    /// Fact updates made since the last `add_command`.
    current_updates: Vec<Update>,
    /// Max cut of the first command of this perspective.
    max_cut: usize,
    /// For merge perspectives, the last common ancestor and its max cut.
    last_common_ancestor: Option<(Location, usize)>,
}
143
144impl<R> LinearPerspective<R> {
145 fn new(
146 prior: Prior<Location>,
147 parents: Prior<Address>,
148 policy: PolicyId,
149 prior_facts: FactPerspectivePrior<R>,
150 max_cut: usize,
151 last_common_ancestor: Option<(Location, usize)>,
152 ) -> Self {
153 Self {
154 prior,
155 parents,
156 policy,
157 facts: LinearFactPerspective::new(prior_facts),
158 commands: Vec::new(),
159 current_updates: Vec::new(),
160 max_cut,
161 last_common_ancestor,
162 }
163 }
164}
165
/// A mutable layer of fact changes on top of an optional prior source.
#[derive(Debug)]
pub struct LinearFactPerspective<R> {
    /// Changes in this layer, by fact name and keys; `None` marks a deletion.
    map: BTreeMap<String, BTreeMap<Keys, Option<Bytes>>>,
    /// Where lookups continue when this layer has no entry.
    prior: FactPerspectivePrior<R>,
}
171
172impl<R> LinearFactPerspective<R> {
173 fn new(prior: FactPerspectivePrior<R>) -> Self {
174 Self {
175 map: BTreeMap::new(),
176 prior,
177 }
178 }
179}
180
/// What a [`LinearFactPerspective`] is layered on.
#[derive(Debug)]
enum FactPerspectivePrior<R> {
    /// Nothing: the perspective holds the complete set of facts.
    None,
    /// Another in-memory perspective.
    FactPerspective(Box<LinearFactPerspective<R>>),
    /// A fact index record at `offset`, fetched through `reader`.
    FactIndex { offset: usize, reader: R },
}
187
188impl<R> FactPerspectivePrior<R> {
189 fn is_none(&self) -> bool {
190 matches!(self, Self::None)
191 }
192}
193
194impl<FM: IoManager + Default> Default for LinearStorageProvider<FM> {
195 fn default() -> Self {
196 Self {
197 manager: FM::default(),
198 storage: BTreeMap::new(),
199 }
200 }
201}
202
203impl<FM: IoManager> LinearStorageProvider<FM> {
204 pub fn new(manager: FM) -> Self {
205 Self {
206 manager,
207 storage: BTreeMap::new(),
208 }
209 }
210}
211
212impl<FM: IoManager> StorageProvider for LinearStorageProvider<FM> {
213 type Perspective = LinearPerspective<<FM::Writer as Write>::ReadOnly>;
214 type Segment = LinearSegment<<FM::Writer as Write>::ReadOnly>;
215 type Storage = LinearStorage<FM::Writer>;
216
217 fn new_perspective(&mut self, policy_id: PolicyId) -> Self::Perspective {
218 LinearPerspective::new(
219 Prior::None,
220 Prior::None,
221 policy_id,
222 FactPerspectivePrior::None,
223 0,
224 None,
225 )
226 }
227
228 fn new_storage(
229 &mut self,
230 init: Self::Perspective,
231 ) -> Result<(GraphId, &mut Self::Storage), StorageError> {
232 use alloc::collections::btree_map::Entry;
233
234 if init.commands.is_empty() {
235 return Err(StorageError::EmptyPerspective);
236 }
237 let graph_id = GraphId::from(init.commands[0].id.into_id());
238 let Entry::Vacant(entry) = self.storage.entry(graph_id) else {
239 return Err(StorageError::StorageExists);
240 };
241
242 let file = self.manager.create(graph_id)?;
243 Ok((graph_id, entry.insert(LinearStorage::create(file, init)?)))
244 }
245
246 fn get_storage(&mut self, graph: GraphId) -> Result<&mut Self::Storage, StorageError> {
247 use alloc::collections::btree_map::Entry;
248
249 let entry = match self.storage.entry(graph) {
250 Entry::Vacant(v) => v,
251 Entry::Occupied(o) => return Ok(o.into_mut()),
252 };
253
254 let file = self
255 .manager
256 .open(graph)?
257 .ok_or(StorageError::NoSuchStorage)?;
258 Ok(entry.insert(LinearStorage::open(file)?))
259 }
260
261 fn remove_storage(&mut self, graph: GraphId) -> Result<(), StorageError> {
262 self.manager.remove(graph)?;
263
264 self.storage
265 .remove(&graph)
266 .ok_or(StorageError::NoSuchStorage)?;
267
268 Ok(())
269 }
270
271 fn list_graph_ids(
272 &mut self,
273 ) -> Result<impl Iterator<Item = Result<GraphId, StorageError>>, StorageError> {
274 self.manager.list()
275 }
276}
277
impl<W: Write> LinearStorage<W> {
    /// Walks back from `segment` looking for a segment start to record as a
    /// skip-list entry for the target `max_cut`.
    ///
    /// Returns the `(first location, first max cut)` of the last segment
    /// visited whose longest max cut is not below `max_cut`, or `None` if
    /// the starting segment already ends before `max_cut`.
    fn get_skip(
        &self,
        segment: <LinearStorage<W> as Storage>::Segment,
        max_cut: usize,
    ) -> Result<Option<(Location, usize)>, StorageError> {
        let mut head = segment;
        let mut current = None;
        'outer: loop {
            // This segment ends before the target: keep the previous candidate.
            if max_cut > head.longest_max_cut()? {
                return Ok(current);
            }
            current = Some((head.first_location(), head.shortest_max_cut()));
            // The target lies inside this segment: it is the answer.
            if max_cut >= head.shortest_max_cut() {
                return Ok(current);
            }
            // Jump to the first skip-list entry whose max cut does not exceed
            // the target.
            // NOTE(review): such a jump may land on a segment that ends before
            // the target, in which case the next iteration returns the
            // previous `current`; confirm `<=` (rather than `>=`) is intended.
            for (skip, skip_max_cut) in head.skip_list() {
                if skip_max_cut <= &max_cut {
                    head = self.get_segment(*skip)?;
                    continue 'outer;
                }
            }
            // No usable skip: step to the single prior segment, or stop at a
            // merge or at the start of the graph.
            head = match head.prior() {
                Prior::None | Prior::Merge(_, _) => {
                    return Ok(current);
                }
                Prior::Single(l) => self.get_segment(l)?,
            }
        }
    }
}
311
impl<W: Write> LinearStorage<W> {
    /// Initializes a new storage in `writer` from the init perspective.
    ///
    /// Appends a fact index record (depth 1, no prior) and then the first
    /// segment, and commits the segment's last command as head.
    ///
    /// # Panics
    ///
    /// Panics if `init` has a prior, parents, or prior facts.
    ///
    /// # Errors
    ///
    /// `EmptyPerspective` if `init` has no commands (the fact index has
    /// already been appended at that point), or any writer error.
    fn create(mut writer: W, init: LinearPerspective<W::ReadOnly>) -> Result<Self, StorageError> {
        assert!(matches!(init.prior, Prior::None));
        assert!(matches!(init.parents, Prior::None));
        assert!(matches!(init.facts.prior, FactPerspectivePrior::None));

        // Drop fact names that ended up with no entries.
        let mut map = init.facts.map;
        map.retain(|_, kv| !kv.is_empty());

        let facts = writer
            .append(|offset| FactIndexRepr {
                offset,
                prior: None,
                depth: 1,
                facts: map,
            })?
            .offset;

        let commands = init
            .commands
            .try_into()
            .map_err(|_| StorageError::EmptyPerspective)?;
        let segment = writer.append(|offset| SegmentRepr {
            offset,
            prior: Prior::None,
            parents: Prior::None,
            policy: init.policy,
            facts,
            commands,
            max_cut: 0,
            skip_list: vec![],
        })?;

        // Head is the last command of the segment just written.
        let head = Location::new(
            segment.offset,
            segment
                .commands
                .len()
                .checked_sub(1)
                .assume("vec1 length >= 1")?,
        );

        writer.commit(head)?;

        let storage = Self { writer };

        Ok(storage)
    }

    /// Wraps an already existing `writer`; nothing is read or written.
    fn open(writer: W) -> Result<Self, StorageError> {
        Ok(Self { writer })
    }

    /// Flattens the fact index chain ending at `repr` into a single new
    /// record with no prior and returns that record.
    fn compact(&mut self, mut repr: FactIndexRepr) -> Result<FactIndexRepr, StorageError> {
        let mut map = NamedFactMap::new();
        let reader = self.writer.readonly();
        // Walk from the newest layer to the oldest; `or_insert` keeps the
        // value from the newest layer that mentions a key.
        loop {
            for (name, kv) in repr.facts {
                let sub = map.entry(name).or_default();
                for (k, v) in kv {
                    sub.entry(k).or_insert(v);
                }
            }
            let Some(offset) = repr.prior else { break };
            repr = reader.fetch(offset)?;
        }

        // With no prior left, deletion markers and empty names carry no
        // information.
        map.retain(|_, kv| {
            kv.retain(|_, v| v.is_some());
            !kv.is_empty()
        });

        Ok(self
            .write_facts(LinearFactPerspective {
                map,
                prior: FactPerspectivePrior::None,
            })?
            .repr)
    }
}
393
394impl<F: Write> Storage for LinearStorage<F> {
395 type Perspective = LinearPerspective<F::ReadOnly>;
396 type FactPerspective = LinearFactPerspective<F::ReadOnly>;
397 type Segment = LinearSegment<F::ReadOnly>;
398 type FactIndex = LinearFactIndex<F::ReadOnly>;
399
400 fn get_command_id(&self, location: Location) -> Result<CommandId, StorageError> {
401 let seg = self.get_segment(location)?;
402 let cmd = seg
403 .get_command(location)
404 .ok_or(StorageError::CommandOutOfBounds(location))?;
405 Ok(cmd.id())
406 }
407
    /// Builds a perspective whose single prior is the command at `parent`.
    ///
    /// The new perspective's max cut is the parent's max cut plus one, and
    /// its prior facts reflect the updates of the segment's commands up to
    /// and including `parent`.
    ///
    /// # Errors
    ///
    /// `CommandOutOfBounds` if the segment has no command at `parent`.
    fn get_linear_perspective(
        &self,
        parent: Location,
    ) -> Result<Option<Self::Perspective>, StorageError> {
        let segment = self.get_segment(parent)?;
        let command = segment
            .get_command(parent)
            .ok_or(StorageError::CommandOutOfBounds(parent))?;
        let policy = segment.repr.policy;
        let prior_facts: FactPerspectivePrior<F::ReadOnly> = if parent == segment.head_location() {
            // Branching from the segment head: the segment's own fact index
            // can be used directly.
            FactPerspectivePrior::FactIndex {
                offset: segment.repr.facts,
                reader: self.writer.readonly(),
            }
        } else {
            // Branching from the middle: start from the index before the
            // segment's own and replay the updates up to `parent`.
            let prior = match segment.facts()?.repr.prior {
                Some(offset) => FactPerspectivePrior::FactIndex {
                    offset,
                    reader: self.writer.readonly(),
                },
                None => FactPerspectivePrior::None,
            };
            let mut facts = LinearFactPerspective::new(prior);
            for data in &segment.repr.commands[..=parent.command] {
                facts.apply_updates(&data.updates);
            }
            if facts.prior.is_none() {
                facts.map.retain(|_, kv| !kv.is_empty());
            }
            // Nothing replayed: skip the extra in-memory layer.
            if facts.map.is_empty() {
                facts.prior
            } else {
                FactPerspectivePrior::FactPerspective(Box::new(facts))
            }
        };
        let prior = Prior::Single(parent);

        let perspective = LinearPerspective::new(
            prior,
            Prior::Single(command.address()?),
            policy,
            prior_facts,
            command
                .max_cut()?
                .checked_add(1)
                .assume("must not overflow")?,
            None,
        );

        Ok(Some(perspective))
    }
459
460 fn get_fact_perspective(
461 &self,
462 location: Location,
463 ) -> Result<Self::FactPerspective, StorageError> {
464 let segment = self.get_segment(location)?;
465
466 if location == segment.head_location()
469 || segment
470 .repr
471 .commands
472 .iter()
473 .all(|cmd| cmd.updates.is_empty())
474 {
475 return Ok(LinearFactPerspective::new(
476 FactPerspectivePrior::FactIndex {
477 offset: segment.repr.facts,
478 reader: self.writer.readonly(),
479 },
480 ));
481 }
482
483 let prior = match segment.facts()?.repr.prior {
484 Some(offset) => FactPerspectivePrior::FactIndex {
485 offset,
486 reader: self.writer.readonly(),
487 },
488 None => FactPerspectivePrior::None,
489 };
490 let mut facts = LinearFactPerspective::new(prior);
491 for data in &segment.repr.commands[..=location.command] {
492 facts.apply_updates(&data.updates);
493 }
494
495 Ok(facts)
496 }
497
    /// Builds a perspective for a merge of the commands at `left` and
    /// `right`.
    ///
    /// The perspective's facts are layered on `braid`, its max cut is one
    /// more than the larger of the two parents' max cuts, and
    /// `last_common_ancestor` is remembered for building the skip list in
    /// `write`.
    ///
    /// # Errors
    ///
    /// * `CommandOutOfBounds` if either location names no command.
    /// * `PolicyMismatch` if `policy_id` matches neither the left nor the
    ///   right segment's policy.
    fn new_merge_perspective(
        &self,
        left: Location,
        right: Location,
        last_common_ancestor: (Location, usize),
        policy_id: PolicyId,
        braid: Self::FactIndex,
    ) -> Result<Option<Self::Perspective>, StorageError> {
        let left_segment = self.get_segment(left)?;
        let left_command = left_segment
            .get_command(left)
            .ok_or(StorageError::CommandOutOfBounds(left))?;
        let right_segment = self.get_segment(right)?;
        let right_command = right_segment
            .get_command(right)
            .ok_or(StorageError::CommandOutOfBounds(right))?;

        let parent = Prior::Merge(left_command.address()?, right_command.address()?);

        // Rejected only when the policy differs from both sides.
        if policy_id != left_segment.policy() && policy_id != right_segment.policy() {
            return Err(StorageError::PolicyMismatch);
        }

        let prior = Prior::Merge(left, right);

        let perspective = LinearPerspective::new(
            prior,
            parent,
            policy_id,
            FactPerspectivePrior::FactIndex {
                offset: braid.repr.offset,
                reader: braid.reader,
            },
            left_command
                .max_cut()?
                .max(right_command.max_cut()?)
                .checked_add(1)
                .assume("must not overflow")?,
            Some(last_common_ancestor),
        );

        Ok(Some(perspective))
    }
543
544 fn get_segment(&self, location: Location) -> Result<Self::Segment, StorageError> {
545 let reader = self.writer.readonly();
546 let repr = reader.fetch(location.segment)?;
547 let seg = LinearSegment { repr, reader };
548
549 Ok(seg)
550 }
551
    /// Returns the head location reported by the writer.
    fn get_head(&self) -> Result<Location, StorageError> {
        self.writer.head()
    }
555
556 fn commit(&mut self, segment: Self::Segment) -> Result<(), StorageError> {
557 if !self.is_ancestor(self.get_head()?, &segment)? {
558 return Err(StorageError::HeadNotAncestor);
559 }
560
561 self.writer.commit(segment.head_location())
562 }
563
    /// Writes `perspective` as a new segment and returns it.
    ///
    /// The perspective's facts are written first, then the skip list is
    /// built, then the segment record is appended. The head is not changed;
    /// see `commit`.
    ///
    /// # Errors
    ///
    /// `EmptyPerspective` if the perspective has no commands (its facts have
    /// already been written at that point), or any writer error.
    fn write(&mut self, perspective: Self::Perspective) -> Result<Self::Segment, StorageError> {
        let facts = self.write_facts(perspective.facts)?.repr.offset;

        let commands: Vec1<CommandData> = perspective
            .commands
            .try_into()
            .map_err(|_| StorageError::EmptyPerspective)?;

        // Collects up to `count` distinct skip entries, each found by picking
        // a random max cut below that of the command at `l` and asking
        // `get_skip` for a segment start near it.
        let get_skips =
            |l: Location, count: usize| -> Result<Vec<(Location, usize)>, StorageError> {
                let mut rng = &mut Rng as &mut dyn Csprng;
                let mut skips = vec![];
                for _ in 0..count {
                    let segment = self.get_segment(l)?;
                    let l_max_cut = segment
                        .get_command(l)
                        .assume("location must exist")?
                        .max_cut;
                    // A max cut of zero leaves nothing earlier to skip to.
                    if l_max_cut > 0 {
                        let max_cut = rng.gen_range(0..l_max_cut);
                        if let Some(skip) = self.get_skip(segment, max_cut)? {
                            if !skips.contains(&skip) {
                                skips.push(skip);
                            }
                        } else {
                            break;
                        }
                    }
                }
                Ok(skips)
            };

        let skip_list = match perspective.prior {
            Prior::None => vec![],
            Prior::Merge(_, _) => {
                // Merges always record the last common ancestor, plus up to
                // two random skips found from it.
                let (lca, max_cut) = perspective.last_common_ancestor.assume("lca must exist")?;
                let mut skips = get_skips(lca, 2)?;
                if !skips.contains(&(lca, max_cut)) {
                    skips.push((lca, max_cut));
                }
                skips.sort();
                skips
            }
            Prior::Single(l) => {
                let mut skips = get_skips(l, 3)?;
                skips.sort();
                skips
            }
        };
        let repr = self.writer.append(|offset| SegmentRepr {
            offset,
            prior: perspective.prior,
            parents: perspective.parents,
            policy: perspective.policy,
            facts,
            commands,
            max_cut: perspective.max_cut,
            skip_list,
        })?;

        Ok(LinearSegment {
            repr,
            reader: self.writer.readonly(),
        })
    }
631
    /// Persists a fact perspective and returns the resulting fact index.
    ///
    /// Prior in-memory perspectives are written recursively. If this layer
    /// has no changes and there is a prior, the prior's index is returned
    /// and nothing new is appended. A prior chain that has reached
    /// `MAX_FACT_INDEX_DEPTH` is compacted before the new layer is appended.
    fn write_facts(
        &mut self,
        facts: Self::FactPerspective,
    ) -> Result<Self::FactIndex, StorageError> {
        let mut prior = match facts.prior {
            FactPerspectivePrior::None => None,
            FactPerspectivePrior::FactPerspective(prior) => {
                let prior = self.write_facts(*prior)?;
                if facts.map.is_empty() {
                    return Ok(prior);
                }
                Some(prior.repr)
            }
            FactPerspectivePrior::FactIndex { offset, reader } => {
                let repr = reader.fetch(offset)?;
                if facts.map.is_empty() {
                    return Ok(LinearFactIndex { repr, reader });
                }
                Some(repr)
            }
        };

        // Depth of the (possibly compacted) prior chain, or 0 without one.
        let depth = if let Some(mut p) = prior.take() {
            if p.depth > MAX_FACT_INDEX_DEPTH - 1 {
                p = self.compact(p)?;
            }
            prior.insert(p).depth
        } else {
            0
        };

        let depth = depth.checked_add(1).assume("depth won't overflow")?;

        if depth > MAX_FACT_INDEX_DEPTH {
            bug!("fact index too deep");
        }

        let repr = self.writer.append(|offset| FactIndexRepr {
            offset,
            prior: prior.map(|p| p.offset),
            depth,
            facts: facts.map,
        })?;

        Ok(LinearFactIndex {
            repr,
            reader: self.writer.readonly(),
        })
    }
681}
682
683impl<R: Read> Segment for LinearSegment<R> {
684 type FactIndex = LinearFactIndex<R>;
685 type Command<'a>
686 = LinearCommand<'a>
687 where
688 R: 'a;
689
690 fn head(&self) -> Result<Self::Command<'_>, StorageError> {
691 let data = self.repr.commands.last();
692 let parent = if let Some(prev) = usize::checked_sub(self.repr.commands.len(), 2) {
693 Prior::Single(Address {
694 id: self.repr.commands[prev].id,
695 max_cut: self
696 .repr
697 .max_cut
698 .checked_add(prev)
699 .assume("must not overflow")?,
700 })
701 } else {
702 self.repr.parents
703 };
704 Ok(LinearCommand {
705 id: &data.id,
706 parent,
707 priority: data.priority.clone(),
708 policy: data.policy.as_deref(),
709 data: &data.data,
710 max_cut: self
711 .repr
712 .max_cut
713 .checked_add(self.repr.commands.len())
714 .assume("must not overflow")?
715 .checked_sub(1)
716 .assume("must not overflow")?,
717 })
718 }
719
720 fn first(&self) -> Self::Command<'_> {
721 let data = self.repr.commands.first();
722 LinearCommand {
723 id: &data.id,
724 parent: self.repr.parents,
725 priority: data.priority.clone(),
726 policy: data.policy.as_deref(),
727 data: &data.data,
728 max_cut: self.repr.max_cut,
729 }
730 }
731
732 fn head_location(&self) -> Location {
733 #[allow(clippy::arithmetic_side_effects)]
735 Location::new(self.repr.offset, self.repr.commands.len() - 1)
736 }
737
738 fn first_location(&self) -> Location {
739 Location::new(self.repr.offset, 0)
740 }
741
742 fn contains(&self, location: Location) -> bool {
743 location.segment == self.repr.offset && location.command < self.repr.commands.len()
744 }
745
746 fn policy(&self) -> PolicyId {
747 self.repr.policy
748 }
749
750 fn prior(&self) -> Prior<Location> {
751 self.repr.prior
752 }
753
754 fn get_command(&self, location: Location) -> Option<Self::Command<'_>> {
755 if self.repr.offset != location.segment {
756 return None;
757 }
758 let data = self.repr.commands.get(location.command)?;
759 let parent = if let Some(prev) = usize::checked_sub(location.command, 1) {
760 if let Some(max_cut) = self.repr.max_cut.checked_add(prev) {
761 Prior::Single(Address {
762 id: self.repr.commands[prev].id,
763 max_cut,
764 })
765 } else {
766 return None;
767 }
768 } else {
769 self.repr.parents
770 };
771 self.repr
772 .max_cut
773 .checked_add(location.command)
774 .map(|max_cut| LinearCommand {
775 id: &data.id,
776 parent,
777 priority: data.priority.clone(),
778 policy: data.policy.as_deref(),
779 data: &data.data,
780 max_cut,
781 })
782 }
783
784 fn get_from(&self, location: Location) -> Vec<Self::Command<'_>> {
785 if self.repr.offset != location.segment {
786 return Vec::new();
788 }
789
790 (location.command..self.repr.commands.len())
792 .map(|c| Location::new(location.segment, c))
793 .map(|loc| {
794 self.get_command(loc)
795 .expect("constructed location is valid")
796 })
797 .collect()
798 }
799
800 fn get_from_max_cut(&self, max_cut: usize) -> Result<Option<Location>, StorageError> {
801 if max_cut >= self.repr.max_cut
802 && max_cut
803 <= self
804 .repr
805 .max_cut
806 .checked_add(self.repr.commands.len())
807 .assume("must not overflow")?
808 {
809 return Ok(Some(Location::new(
810 self.repr.offset,
811 max_cut
812 .checked_sub(self.repr.max_cut)
813 .assume("must not overflow")?,
814 )));
815 }
816 Ok(None)
817 }
818
819 fn facts(&self) -> Result<Self::FactIndex, StorageError> {
820 Ok(LinearFactIndex {
821 repr: self.reader.fetch(self.repr.facts)?,
822 reader: self.reader.clone(),
823 })
824 }
825
826 fn skip_list(&self) -> &[(Location, usize)] {
827 &self.repr.skip_list
828 }
829
830 fn shortest_max_cut(&self) -> usize {
831 self.repr.max_cut
832 }
833
834 fn longest_max_cut(&self) -> Result<usize, StorageError> {
835 Ok(self
836 .repr
837 .max_cut
838 .checked_add(self.repr.commands.len())
839 .assume("must not overflow")?
840 .checked_sub(1)
841 .assume("must not overflow")?)
842 }
843}
844
// `FactIndex` is implemented with an empty body here; the query behaviour
// comes from the `Query` impl for `LinearFactIndex`.
impl<R: Read> FactIndex for LinearFactIndex<R> {}
846
/// Owning iterator over a [`FactMap`].
type MapIter = alloc::collections::btree_map::IntoIter<Keys, Option<Bytes>>;
/// Iterator returned by prefix queries; yields the facts of a collected
/// [`FactMap`] that have a value.
pub struct QueryIterator {
    /// Remaining entries, including deletion markers that are skipped.
    it: MapIter,
}
851
852impl QueryIterator {
853 fn new(it: MapIter) -> Self {
854 Self { it }
855 }
856}
857
858impl Iterator for QueryIterator {
859 type Item = Result<Fact, StorageError>;
860 fn next(&mut self) -> Option<Self::Item> {
861 loop {
862 if let (key, Some(value)) = self.it.next()? {
864 return Some(Ok(Fact { key, value }));
865 }
866 }
867 }
868}
869
870impl<R: Read> Query for LinearFactIndex<R> {
871 fn query(&self, name: &str, keys: &[Box<[u8]>]) -> Result<Option<Box<[u8]>>, StorageError> {
872 let mut prior = Some(&self.repr);
873 let mut slot; while let Some(facts) = prior {
875 if let Some(v) = facts.facts.get(name).and_then(|m| m.get(keys)) {
876 return Ok(v.as_ref().cloned());
877 }
878 slot = facts.prior.map(|p| self.reader.fetch(p)).transpose()?;
879 prior = slot.as_ref();
880 }
881 Ok(None)
882 }
883
884 type QueryIterator = QueryIterator;
885 fn query_prefix(
886 &self,
887 name: &str,
888 prefix: &[Box<[u8]>],
889 ) -> Result<QueryIterator, StorageError> {
890 Ok(QueryIterator::new(
891 self.query_prefix_inner(name, prefix)?.into_iter(),
892 ))
893 }
894}
895
896impl<R: Read> LinearFactIndex<R> {
897 fn query_prefix_inner(
898 &self,
899 name: &str,
900 prefix: &[Box<[u8]>],
901 ) -> Result<FactMap, StorageError> {
902 let mut matches = BTreeMap::new();
903 let mut prior = Some(&self.repr);
904 let mut slot; while let Some(facts) = prior {
906 if let Some(map) = facts.facts.get(name) {
907 for (k, v) in super::memory::find_prefixes(map, prefix) {
908 if !matches.contains_key(k) {
910 matches.insert(k.clone(), v.map(Into::into));
911 }
912 }
913 }
914 slot = facts.prior.map(|p| self.reader.fetch(p)).transpose()?;
915 prior = slot.as_ref();
916 }
917 Ok(matches)
918 }
919}
920
921impl<R> LinearFactPerspective<R> {
922 fn clear(&mut self) {
923 self.map.clear();
924 }
925
926 fn apply_updates(&mut self, updates: &[Update]) {
927 for (name, key, value) in updates {
928 if self.prior.is_none() {
929 if let Some(value) = value {
930 self.map
931 .entry(name.clone())
932 .or_default()
933 .insert(key.clone(), Some(value.clone()));
934 } else if let Some(e) = self.map.get_mut(name) {
935 e.remove(key);
936 }
937 } else {
938 self.map
939 .entry(name.clone())
940 .or_default()
941 .insert(key.clone(), value.clone());
942 }
943 }
944 }
945}
946
// `FactPerspective` is implemented with an empty body here; behaviour comes
// from the `Query` and `QueryMut` impls for `LinearFactPerspective`.
impl<R: Read> FactPerspective for LinearFactPerspective<R> {}
948
949impl<R: Read> Query for LinearFactPerspective<R> {
950 fn query(&self, name: &str, keys: &[Box<[u8]>]) -> Result<Option<Box<[u8]>>, StorageError> {
951 if let Some(wrapped) = self.map.get(name).and_then(|m| m.get(keys)) {
952 return Ok(wrapped.as_deref().map(Box::from));
953 }
954 match &self.prior {
955 FactPerspectivePrior::None => Ok(None),
956 FactPerspectivePrior::FactPerspective(prior) => prior.query(name, keys),
957 FactPerspectivePrior::FactIndex { offset, reader } => {
958 let repr: FactIndexRepr = reader.fetch(*offset)?;
959 let prior = LinearFactIndex {
960 repr,
961 reader: reader.clone(),
962 };
963 prior.query(name, keys)
964 }
965 }
966 }
967
968 type QueryIterator = QueryIterator;
969 fn query_prefix(
970 &self,
971 name: &str,
972 prefix: &[Box<[u8]>],
973 ) -> Result<QueryIterator, StorageError> {
974 Ok(QueryIterator::new(
975 self.query_prefix_inner(name, prefix)?.into_iter(),
976 ))
977 }
978}
979
980impl<R: Read> LinearFactPerspective<R> {
981 fn query_prefix_inner(
982 &self,
983 name: &str,
984 prefix: &[Box<[u8]>],
985 ) -> Result<FactMap, StorageError> {
986 let mut matches = match &self.prior {
987 FactPerspectivePrior::None => BTreeMap::new(),
988 FactPerspectivePrior::FactPerspective(prior) => {
989 prior.query_prefix_inner(name, prefix)?
990 }
991 FactPerspectivePrior::FactIndex { offset, reader } => {
992 let repr: FactIndexRepr = reader.fetch(*offset)?;
993 let prior = LinearFactIndex {
994 repr,
995 reader: reader.clone(),
996 };
997 prior.query_prefix_inner(name, prefix)?
998 }
999 };
1000 if let Some(map) = self.map.get(name) {
1001 for (k, v) in super::memory::find_prefixes(map, prefix) {
1002 matches.insert(k.clone(), v.map(Into::into));
1004 }
1005 }
1006 Ok(matches)
1007 }
1008}
1009
1010impl<R: Read> QueryMut for LinearFactPerspective<R> {
1011 fn insert(&mut self, name: String, keys: Keys, value: Bytes) {
1012 self.map.entry(name).or_default().insert(keys, Some(value));
1013 }
1014
1015 fn delete(&mut self, name: String, keys: Keys) {
1016 if self.prior.is_none() {
1017 if let Some(kv) = self.map.get_mut(&name) {
1019 kv.remove(&keys);
1020 }
1021 } else {
1022 self.map.entry(name).or_default().insert(keys, None);
1023 }
1024 }
1025}
1026
// `FactPerspective` is implemented with an empty body here; behaviour comes
// from the `Query` and `QueryMut` impls for `LinearPerspective`.
impl<R: Read> FactPerspective for LinearPerspective<R> {}
1028
impl<R: Read> Query for LinearPerspective<R> {
    /// Looks up a single fact in the perspective's fact layer.
    fn query(&self, name: &str, keys: &[Box<[u8]>]) -> Result<Option<Box<[u8]>>, StorageError> {
        self.facts.query(name, keys)
    }

    type QueryIterator = QueryIterator;
    /// Runs a prefix query against the perspective's fact layer.
    fn query_prefix(
        &self,
        name: &str,
        prefix: &[Box<[u8]>],
    ) -> Result<QueryIterator, StorageError> {
        self.facts.query_prefix(name, prefix)
    }
}
1043
1044impl<R: Read> QueryMut for LinearPerspective<R> {
1045 fn insert(&mut self, name: String, keys: Keys, value: Bytes) {
1046 self.facts.insert(name.clone(), keys.clone(), value.clone());
1047 self.current_updates.push((name, keys, Some(value)));
1048 }
1049
1050 fn delete(&mut self, name: String, keys: Keys) {
1051 self.facts.delete(name.clone(), keys.clone());
1052 self.current_updates.push((name, keys, None))
1053 }
1054}
1055
1056impl<R: Read> Revertable for LinearPerspective<R> {
1057 fn checkpoint(&self) -> Checkpoint {
1058 Checkpoint {
1059 index: self.commands.len(),
1060 }
1061 }
1062
1063 fn revert(&mut self, checkpoint: Checkpoint) -> Result<(), Bug> {
1064 if checkpoint.index == self.commands.len() {
1065 return Ok(());
1066 }
1067
1068 if checkpoint.index > self.commands.len() {
1069 bug!(
1070 "A checkpoint's index should always be less than or equal to the length of a perspective's command history!"
1071 );
1072 }
1073
1074 self.commands.truncate(checkpoint.index);
1075 self.facts.clear();
1076 self.current_updates.clear();
1077 for data in &self.commands {
1078 self.facts.apply_updates(&data.updates);
1079 }
1080
1081 Ok(())
1082 }
1083}
1084
1085impl<R: Read> Perspective for LinearPerspective<R> {
1086 fn policy(&self) -> PolicyId {
1087 self.policy
1088 }
1089
1090 fn add_command(&mut self, command: &impl Command) -> Result<usize, StorageError> {
1091 if command.parent() != self.head_address()? {
1092 return Err(StorageError::PerspectiveHeadMismatch);
1093 }
1094
1095 self.commands.push(CommandData {
1096 id: command.id(),
1097 priority: command.priority(),
1098 policy: command.policy().map(Box::from),
1099 data: command.bytes().into(),
1100 updates: core::mem::take(&mut self.current_updates),
1101 });
1102 Ok(self.commands.len())
1103 }
1104
1105 fn includes(&self, id: CommandId) -> bool {
1106 self.commands.iter().any(|cmd| cmd.id == id)
1107 }
1108
1109 fn head_address(&self) -> Result<Prior<Address>, Bug> {
1110 Ok(if let Some(last) = self.commands.last() {
1111 Prior::Single(Address {
1112 id: last.id,
1113 max_cut: self
1114 .max_cut
1115 .checked_add(self.commands.len())
1116 .assume("must not overflow")?
1117 .checked_sub(1)
1118 .assume("must not overflow")?,
1119 })
1120 } else {
1121 self.parents
1122 })
1123 }
1124}
1125
1126impl From<Prior<Address>> for Prior<CommandId> {
1127 fn from(p: Prior<Address>) -> Self {
1128 match p {
1129 Prior::None => Prior::None,
1130 Prior::Single(l) => Prior::Single(l.id),
1131 Prior::Merge(l, r) => Prior::Merge(l.id, r.id),
1132 }
1133 }
1134}
1135
impl Command for LinearCommand<'_> {
    /// Returns a clone of the stored priority.
    fn priority(&self) -> Priority {
        self.priority.clone()
    }

    /// Returns a copy of the command id.
    fn id(&self) -> CommandId {
        *self.id
    }

    /// Returns the address(es) of the command's parent(s).
    fn parent(&self) -> Prior<Address> {
        self.parent
    }

    /// Returns the policy bytes, if the command carries any.
    fn policy(&self) -> Option<&[u8]> {
        self.policy
    }

    /// Returns the serialized command bytes.
    fn bytes(&self) -> &[u8] {
        self.data
    }

    /// Returns the stored max cut; this implementation never fails.
    fn max_cut(&self) -> Result<usize, Bug> {
        Ok(self.max_cut)
    }
}
1161
#[cfg(test)]
mod test {
    use testing::Manager;

    use super::*;
    use crate::testing::dsl::{StorageBackend, test_suite};

    /// Inserts a handful of facts into a fresh perspective and checks that
    /// prefix queries return exactly the keys starting with each prefix, in
    /// sorted order and with the expected values.
    #[test]
    fn test_query_prefix() {
        let mut provider = LinearStorageProvider::new(Manager::new());
        let mut fp = provider.new_perspective(PolicyId::new(0));

        let name = "x";

        let keys: &[&[&str]] = &[
            &["aa", "xy", "123"],
            &["aa", "xz", "123"],
            &["bb", "ccc"],
            &["bc", ""],
        ];
        let keys: Vec<Keys> = keys
            .iter()
            .map(|ks| ks.iter().map(|k| k.as_bytes()).collect())
            .collect();

        // Each fact's value is the debug rendering of its own keys.
        for ks in &keys {
            fp.insert(
                name.into(),
                ks.clone(),
                format!("{ks:?}").into_bytes().into(),
            );
        }

        let prefixes: &[&[&str]] = &[
            &["aa", "xy", "12"],
            &["aa", "xy"],
            &["aa", "xz"],
            &["aa", "x"],
            &["bb", ""],
            &["bb", "ccc"],
            &["bc", ""],
            &["bc", "", ""],
        ];

        for prefix in prefixes {
            let prefix: Keys = prefix.iter().map(|k| k.as_bytes()).collect();
            let found: Vec<_> = fp.query_prefix(name, &prefix).unwrap().collect();
            // Expected result: the inserted keys that start with the prefix.
            let mut expected: Vec<_> = keys.iter().filter(|k| k.starts_with(&prefix)).collect();
            expected.sort();
            assert_eq!(found.len(), expected.len());
            for (a, b) in std::iter::zip(found, expected) {
                let a = a.unwrap();
                assert_eq!(&a.key, b);
                assert_eq!(a.value.as_ref(), format!("{b:?}").as_bytes());
            }
        }
    }

    /// Backend that hands the shared storage test suite a fresh linear
    /// provider for every client.
    struct LinearBackend;
    impl StorageBackend for LinearBackend {
        type StorageProvider = LinearStorageProvider<Manager>;

        fn provider(&mut self, _client_id: u64) -> Self::StorageProvider {
            LinearStorageProvider::new(Manager::new())
        }
    }
    test_suite!(|| LinearBackend);
}