1pub mod libc;
26
27#[cfg(feature = "testing")]
28pub mod testing;
29
30use alloc::{boxed::Box, collections::BTreeMap, string::String, vec, vec::Vec};
31
32use aranya_crypto::{csprng::rand::Rng as _, Csprng, Rng};
33use buggy::{bug, Bug, BugExt};
34use serde::{Deserialize, Serialize};
35use vec1::Vec1;
36
37use crate::{
38 Address, Checkpoint, Command, CommandId, Fact, FactIndex, FactPerspective, GraphId, Keys,
39 Location, Perspective, PolicyId, Prior, Priority, Query, QueryMut, Revertable, Segment,
40 Storage, StorageError, StorageProvider,
41};
42
43pub mod io;
44pub use io::*;
45
/// Upper bound on the `depth` of a chain of fact indices.
///
/// When a new fact index would be stacked on a prior index that has already
/// reached this depth, the prior chain is compacted into a single index
/// first (see `write_facts`).
const MAX_FACT_INDEX_DEPTH: usize = 16;
58
59pub struct LinearStorageProvider<FM: IoManager> {
60 manager: FM,
61 storage: BTreeMap<GraphId, LinearStorage<FM::Writer>>,
62}
63
/// Storage for a single graph; all reads and appends go through `writer`.
pub struct LinearStorage<W> {
    /// Append handle for the underlying storage; read-only handles are
    /// derived from it via `readonly()`.
    writer: W,
}
67
68#[derive(Debug)]
69pub struct LinearSegment<R> {
70 repr: SegmentRepr,
71 reader: R,
72}
73
74#[derive(Debug, Serialize, Deserialize)]
75struct SegmentRepr {
76 offset: usize,
78 prior: Prior<Location>,
79 parents: Prior<Address>,
80 policy: PolicyId,
81 facts: usize,
83 commands: Vec1<CommandData>,
84 max_cut: usize,
85 skip_list: Vec<(Location, usize)>,
86}
87
88#[derive(Debug, Serialize, Deserialize)]
89struct CommandData {
90 id: CommandId,
91 priority: Priority,
92 policy: Option<Bytes>,
93 data: Bytes,
94 updates: Vec<Update>,
95}
96
97pub struct LinearCommand<'a> {
98 id: &'a CommandId,
99 parent: Prior<Address>,
100 priority: Priority,
101 policy: Option<&'a [u8]>,
102 data: &'a [u8],
103 max_cut: usize,
104}
105
106type Bytes = Box<[u8]>;
107
108type Update = (String, Keys, Option<Bytes>);
109type FactMap = BTreeMap<Keys, Option<Box<[u8]>>>;
110type NamedFactMap = BTreeMap<String, FactMap>;
111
112#[derive(Debug)]
113pub struct LinearFactIndex<R> {
114 repr: FactIndexRepr,
115 reader: R,
116}
117
118#[derive(Debug, Serialize, Deserialize)]
119struct FactIndexRepr {
120 offset: usize,
122 prior: Option<usize>,
124 depth: usize,
128 facts: NamedFactMap,
130}
131
132#[derive(Debug)]
133pub struct LinearPerspective<R> {
134 prior: Prior<Location>,
135 parents: Prior<Address>,
136 policy: PolicyId,
137 facts: LinearFactPerspective<R>,
138 commands: Vec<CommandData>,
139 current_updates: Vec<Update>,
140 max_cut: usize,
141 last_common_ancestor: Option<(Location, usize)>,
142}
143
144impl<R> LinearPerspective<R> {
145 fn new(
146 prior: Prior<Location>,
147 parents: Prior<Address>,
148 policy: PolicyId,
149 prior_facts: FactPerspectivePrior<R>,
150 max_cut: usize,
151 last_common_ancestor: Option<(Location, usize)>,
152 ) -> Self {
153 Self {
154 prior,
155 parents,
156 policy,
157 facts: LinearFactPerspective::new(prior_facts),
158 commands: Vec::new(),
159 current_updates: Vec::new(),
160 max_cut,
161 last_common_ancestor,
162 }
163 }
164}
165
166#[derive(Debug)]
167pub struct LinearFactPerspective<R> {
168 map: BTreeMap<String, BTreeMap<Keys, Option<Bytes>>>,
169 prior: FactPerspectivePrior<R>,
170}
171
172impl<R> LinearFactPerspective<R> {
173 fn new(prior: FactPerspectivePrior<R>) -> Self {
174 Self {
175 map: BTreeMap::new(),
176 prior,
177 }
178 }
179}
180
181#[derive(Debug)]
182enum FactPerspectivePrior<R> {
183 None,
184 FactPerspective(Box<LinearFactPerspective<R>>),
185 FactIndex { offset: usize, reader: R },
186}
187
188impl<R> FactPerspectivePrior<R> {
189 fn is_none(&self) -> bool {
190 matches!(self, Self::None)
191 }
192}
193
194impl<FM: IoManager + Default> Default for LinearStorageProvider<FM> {
195 fn default() -> Self {
196 Self {
197 manager: FM::default(),
198 storage: BTreeMap::new(),
199 }
200 }
201}
202
203impl<FM: IoManager> LinearStorageProvider<FM> {
204 pub fn new(manager: FM) -> Self {
205 Self {
206 manager,
207 storage: BTreeMap::new(),
208 }
209 }
210}
211
212impl<FM: IoManager> StorageProvider for LinearStorageProvider<FM> {
213 type Perspective = LinearPerspective<<FM::Writer as Write>::ReadOnly>;
214 type Segment = LinearSegment<<FM::Writer as Write>::ReadOnly>;
215 type Storage = LinearStorage<FM::Writer>;
216
217 fn new_perspective(&mut self, policy_id: PolicyId) -> Self::Perspective {
218 LinearPerspective::new(
219 Prior::None,
220 Prior::None,
221 policy_id,
222 FactPerspectivePrior::None,
223 0,
224 None,
225 )
226 }
227
228 fn new_storage(
229 &mut self,
230 init: Self::Perspective,
231 ) -> Result<(GraphId, &mut Self::Storage), StorageError> {
232 use alloc::collections::btree_map::Entry;
233
234 if init.commands.is_empty() {
235 return Err(StorageError::EmptyPerspective);
236 }
237 let graph_id = GraphId::from(init.commands[0].id.into_id());
238 let Entry::Vacant(entry) = self.storage.entry(graph_id) else {
239 return Err(StorageError::StorageExists);
240 };
241
242 let file = self.manager.create(graph_id)?;
243 Ok((graph_id, entry.insert(LinearStorage::create(file, init)?)))
244 }
245
246 fn get_storage(&mut self, graph: GraphId) -> Result<&mut Self::Storage, StorageError> {
247 use alloc::collections::btree_map::Entry;
248
249 let entry = match self.storage.entry(graph) {
250 Entry::Vacant(v) => v,
251 Entry::Occupied(o) => return Ok(o.into_mut()),
252 };
253
254 let file = self
255 .manager
256 .open(graph)?
257 .ok_or(StorageError::NoSuchStorage)?;
258 Ok(entry.insert(LinearStorage::open(file)?))
259 }
260}
261
262impl<W: Write> LinearStorage<W> {
263 fn get_skip(
264 &self,
265 segment: <LinearStorage<W> as Storage>::Segment,
266 max_cut: usize,
267 ) -> Result<Option<(Location, usize)>, StorageError> {
268 let mut head = segment;
269 let mut current = None;
270 'outer: loop {
271 if max_cut > head.longest_max_cut()? {
272 return Ok(current);
273 }
274 current = Some((head.first_location(), head.shortest_max_cut()));
275 if max_cut >= head.shortest_max_cut() {
276 return Ok(current);
277 }
278 for (skip, skip_max_cut) in head.skip_list() {
281 if skip_max_cut <= &max_cut {
282 head = self.get_segment(*skip)?;
283 continue 'outer;
284 }
285 }
286 head = match head.prior() {
287 Prior::None | Prior::Merge(_, _) => {
288 return Ok(current);
289 }
290 Prior::Single(l) => self.get_segment(l)?,
291 }
292 }
293 }
294}
295
296impl<W: Write> LinearStorage<W> {
297 fn create(mut writer: W, init: LinearPerspective<W::ReadOnly>) -> Result<Self, StorageError> {
298 assert!(matches!(init.prior, Prior::None));
299 assert!(matches!(init.parents, Prior::None));
300 assert!(matches!(init.facts.prior, FactPerspectivePrior::None));
301
302 let mut map = init.facts.map;
303 map.retain(|_, kv| !kv.is_empty());
304
305 let facts = writer
306 .append(|offset| FactIndexRepr {
307 offset,
308 prior: None,
309 depth: 1,
310 facts: map,
311 })?
312 .offset;
313
314 let commands = init
315 .commands
316 .try_into()
317 .map_err(|_| StorageError::EmptyPerspective)?;
318 let segment = writer.append(|offset| SegmentRepr {
319 offset,
320 prior: Prior::None,
321 parents: Prior::None,
322 policy: init.policy,
323 facts,
324 commands,
325 max_cut: 0,
326 skip_list: vec![],
327 })?;
328
329 let head = Location::new(
330 segment.offset,
331 segment
332 .commands
333 .len()
334 .checked_sub(1)
335 .assume("vec1 length >= 1")?,
336 );
337
338 writer.commit(head)?;
339
340 let storage = Self { writer };
341
342 Ok(storage)
343 }
344
345 fn open(writer: W) -> Result<Self, StorageError> {
346 Ok(Self { writer })
347 }
348
349 fn compact(&mut self, mut repr: FactIndexRepr) -> Result<FactIndexRepr, StorageError> {
350 let mut map = NamedFactMap::new();
351 let reader = self.writer.readonly();
352 loop {
353 for (name, kv) in repr.facts {
354 let sub = map.entry(name).or_default();
355 for (k, v) in kv {
356 sub.entry(k).or_insert(v);
357 }
358 }
359 let Some(offset) = repr.prior else { break };
360 repr = reader.fetch(offset)?;
361 }
362
363 map.retain(|_, kv| {
365 kv.retain(|_, v| v.is_some());
366 !kv.is_empty()
367 });
368
369 Ok(self
370 .write_facts(LinearFactPerspective {
371 map,
372 prior: FactPerspectivePrior::None,
373 })?
374 .repr)
375 }
376}
377
378impl<F: Write> Storage for LinearStorage<F> {
379 type Perspective = LinearPerspective<F::ReadOnly>;
380 type FactPerspective = LinearFactPerspective<F::ReadOnly>;
381 type Segment = LinearSegment<F::ReadOnly>;
382 type FactIndex = LinearFactIndex<F::ReadOnly>;
383
384 fn get_command_id(&self, location: Location) -> Result<CommandId, StorageError> {
385 let seg = self.get_segment(location)?;
386 let cmd = seg
387 .get_command(location)
388 .ok_or(StorageError::CommandOutOfBounds(location))?;
389 Ok(cmd.id())
390 }
391
392 fn get_linear_perspective(
393 &self,
394 parent: Location,
395 ) -> Result<Option<Self::Perspective>, StorageError> {
396 let segment = self.get_segment(parent)?;
397 let command = segment
398 .get_command(parent)
399 .ok_or(StorageError::CommandOutOfBounds(parent))?;
400 let policy = segment.repr.policy;
401 let prior_facts: FactPerspectivePrior<F::ReadOnly> = if parent == segment.head_location() {
402 FactPerspectivePrior::FactIndex {
403 offset: segment.repr.facts,
404 reader: self.writer.readonly(),
405 }
406 } else {
407 let prior = match segment.facts()?.repr.prior {
408 Some(offset) => FactPerspectivePrior::FactIndex {
409 offset,
410 reader: self.writer.readonly(),
411 },
412 None => FactPerspectivePrior::None,
413 };
414 let mut facts = LinearFactPerspective::new(prior);
415 for data in &segment.repr.commands[..=parent.command] {
416 facts.apply_updates(&data.updates);
417 }
418 if facts.prior.is_none() {
419 facts.map.retain(|_, kv| !kv.is_empty());
420 }
421 if facts.map.is_empty() {
422 facts.prior
423 } else {
424 FactPerspectivePrior::FactPerspective(Box::new(facts))
425 }
426 };
427 let prior = Prior::Single(parent);
428
429 let perspective = LinearPerspective::new(
430 prior,
431 Prior::Single(command.address()?),
432 policy,
433 prior_facts,
434 command
435 .max_cut()?
436 .checked_add(1)
437 .assume("must not overflow")?,
438 None,
439 );
440
441 Ok(Some(perspective))
442 }
443
444 fn get_fact_perspective(
445 &self,
446 location: Location,
447 ) -> Result<Self::FactPerspective, StorageError> {
448 let segment = self.get_segment(location)?;
449
450 if location == segment.head_location()
453 || segment
454 .repr
455 .commands
456 .iter()
457 .all(|cmd| cmd.updates.is_empty())
458 {
459 return Ok(LinearFactPerspective::new(
460 FactPerspectivePrior::FactIndex {
461 offset: segment.repr.facts,
462 reader: self.writer.readonly(),
463 },
464 ));
465 }
466
467 let prior = match segment.facts()?.repr.prior {
468 Some(offset) => FactPerspectivePrior::FactIndex {
469 offset,
470 reader: self.writer.readonly(),
471 },
472 None => FactPerspectivePrior::None,
473 };
474 let mut facts = LinearFactPerspective::new(prior);
475 for data in &segment.repr.commands[..=location.command] {
476 facts.apply_updates(&data.updates);
477 }
478
479 Ok(facts)
480 }
481
482 fn new_merge_perspective(
483 &self,
484 left: Location,
485 right: Location,
486 last_common_ancestor: (Location, usize),
487 policy_id: PolicyId,
488 braid: Self::FactIndex,
489 ) -> Result<Option<Self::Perspective>, StorageError> {
490 let left_segment = self.get_segment(left)?;
493 let left_command = left_segment
494 .get_command(left)
495 .ok_or(StorageError::CommandOutOfBounds(left))?;
496 let right_segment = self.get_segment(right)?;
497 let right_command = right_segment
498 .get_command(right)
499 .ok_or(StorageError::CommandOutOfBounds(right))?;
500
501 let parent = Prior::Merge(left_command.address()?, right_command.address()?);
502
503 if policy_id != left_segment.policy() && policy_id != right_segment.policy() {
504 return Err(StorageError::PolicyMismatch);
505 }
506
507 let prior = Prior::Merge(left, right);
508
509 let perspective = LinearPerspective::new(
510 prior,
511 parent,
512 policy_id,
513 FactPerspectivePrior::FactIndex {
514 offset: braid.repr.offset,
515 reader: braid.reader,
516 },
517 left_command
518 .max_cut()?
519 .max(right_command.max_cut()?)
520 .checked_add(1)
521 .assume("must not overflow")?,
522 Some(last_common_ancestor),
523 );
524
525 Ok(Some(perspective))
526 }
527
528 fn get_segment(&self, location: Location) -> Result<Self::Segment, StorageError> {
529 let reader = self.writer.readonly();
530 let repr = reader.fetch(location.segment)?;
531 let seg = LinearSegment { repr, reader };
532
533 Ok(seg)
534 }
535
536 fn get_head(&self) -> Result<Location, StorageError> {
537 self.writer.head()
538 }
539
540 fn commit(&mut self, segment: Self::Segment) -> Result<(), StorageError> {
541 if !self.is_ancestor(self.get_head()?, &segment)? {
542 return Err(StorageError::HeadNotAncestor);
543 }
544
545 self.writer.commit(segment.head_location())
546 }
547
548 fn write(&mut self, perspective: Self::Perspective) -> Result<Self::Segment, StorageError> {
549 let facts = self.write_facts(perspective.facts)?.repr.offset;
552
553 let commands: Vec1<CommandData> = perspective
554 .commands
555 .try_into()
556 .map_err(|_| StorageError::EmptyPerspective)?;
557
558 let get_skips =
559 |l: Location, count: usize| -> Result<Vec<(Location, usize)>, StorageError> {
560 let mut rng = &mut Rng as &mut dyn Csprng;
561 let mut skips = vec![];
562 for _ in 0..count {
563 let segment = self.get_segment(l)?;
564 let l_max_cut = segment
565 .get_command(l)
566 .assume("location must exist")?
567 .max_cut;
568 if l_max_cut > 0 {
569 let max_cut = rng.gen_range(0..l_max_cut);
570 if let Some(skip) = self.get_skip(segment, max_cut)? {
571 if !skips.contains(&skip) {
572 skips.push(skip);
573 }
574 } else {
575 break;
576 }
577 }
578 }
579 Ok(skips)
580 };
581
582 let skip_list = match perspective.prior {
583 Prior::None => vec![],
584 Prior::Merge(_, _) => {
585 let (lca, max_cut) = perspective.last_common_ancestor.assume("lca must exist")?;
586 let mut skips = get_skips(lca, 2)?;
587 if !skips.contains(&(lca, max_cut)) {
588 skips.push((lca, max_cut));
589 }
590 skips.sort();
591 skips
592 }
593 Prior::Single(l) => {
594 let mut skips = get_skips(l, 3)?;
595 skips.sort();
596 skips
597 }
598 };
599 let repr = self.writer.append(|offset| SegmentRepr {
600 offset,
601 prior: perspective.prior,
602 parents: perspective.parents,
603 policy: perspective.policy,
604 facts,
605 commands,
606 max_cut: perspective.max_cut,
607 skip_list,
608 })?;
609
610 Ok(LinearSegment {
611 repr,
612 reader: self.writer.readonly(),
613 })
614 }
615
616 fn write_facts(
617 &mut self,
618 facts: Self::FactPerspective,
619 ) -> Result<Self::FactIndex, StorageError> {
620 let mut prior = match facts.prior {
621 FactPerspectivePrior::None => None,
622 FactPerspectivePrior::FactPerspective(prior) => {
623 let prior = self.write_facts(*prior)?;
624 if facts.map.is_empty() {
625 return Ok(prior);
626 }
627 Some(prior.repr)
628 }
629 FactPerspectivePrior::FactIndex { offset, reader } => {
630 let repr = reader.fetch(offset)?;
631 if facts.map.is_empty() {
632 return Ok(LinearFactIndex { repr, reader });
633 }
634 Some(repr)
635 }
636 };
637
638 let depth = if let Some(mut p) = prior.take() {
639 if p.depth > MAX_FACT_INDEX_DEPTH - 1 {
640 p = self.compact(p)?;
641 }
642 prior.insert(p).depth
643 } else {
644 0
645 };
646
647 let depth = depth.checked_add(1).assume("depth won't overflow")?;
648
649 if depth > MAX_FACT_INDEX_DEPTH {
650 bug!("fact index too deep");
651 }
652
653 let repr = self.writer.append(|offset| FactIndexRepr {
654 offset,
655 prior: prior.map(|p| p.offset),
656 depth,
657 facts: facts.map,
658 })?;
659
660 Ok(LinearFactIndex {
661 repr,
662 reader: self.writer.readonly(),
663 })
664 }
665}
666
667impl<R: Read> Segment for LinearSegment<R> {
668 type FactIndex = LinearFactIndex<R>;
669 type Command<'a>
670 = LinearCommand<'a>
671 where
672 R: 'a;
673
674 fn head(&self) -> Result<Self::Command<'_>, StorageError> {
675 let data = self.repr.commands.last();
676 let parent = if let Some(prev) = usize::checked_sub(self.repr.commands.len(), 2) {
677 Prior::Single(Address {
678 id: self.repr.commands[prev].id,
679 max_cut: self
680 .repr
681 .max_cut
682 .checked_add(prev)
683 .assume("must not overflow")?,
684 })
685 } else {
686 self.repr.parents
687 };
688 Ok(LinearCommand {
689 id: &data.id,
690 parent,
691 priority: data.priority.clone(),
692 policy: data.policy.as_deref(),
693 data: &data.data,
694 max_cut: self
695 .repr
696 .max_cut
697 .checked_add(self.repr.commands.len())
698 .assume("must not overflow")?
699 .checked_sub(1)
700 .assume("must not overflow")?,
701 })
702 }
703
704 fn first(&self) -> Self::Command<'_> {
705 let data = self.repr.commands.first();
706 LinearCommand {
707 id: &data.id,
708 parent: self.repr.parents,
709 priority: data.priority.clone(),
710 policy: data.policy.as_deref(),
711 data: &data.data,
712 max_cut: self.repr.max_cut,
713 }
714 }
715
716 fn head_location(&self) -> Location {
717 #[allow(clippy::arithmetic_side_effects)]
719 Location::new(self.repr.offset, self.repr.commands.len() - 1)
720 }
721
722 fn first_location(&self) -> Location {
723 Location::new(self.repr.offset, 0)
724 }
725
726 fn contains(&self, location: Location) -> bool {
727 location.segment == self.repr.offset && location.command < self.repr.commands.len()
728 }
729
730 fn policy(&self) -> PolicyId {
731 self.repr.policy
732 }
733
734 fn prior(&self) -> Prior<Location> {
735 self.repr.prior
736 }
737
738 fn get_command(&self, location: Location) -> Option<Self::Command<'_>> {
739 if self.repr.offset != location.segment {
740 return None;
741 }
742 let data = self.repr.commands.get(location.command)?;
743 let parent = if let Some(prev) = usize::checked_sub(location.command, 1) {
744 if let Some(max_cut) = self.repr.max_cut.checked_add(prev) {
745 Prior::Single(Address {
746 id: self.repr.commands[prev].id,
747 max_cut,
748 })
749 } else {
750 return None;
751 }
752 } else {
753 self.repr.parents
754 };
755 self.repr
756 .max_cut
757 .checked_add(location.command)
758 .map(|max_cut| LinearCommand {
759 id: &data.id,
760 parent,
761 priority: data.priority.clone(),
762 policy: data.policy.as_deref(),
763 data: &data.data,
764 max_cut,
765 })
766 }
767
768 fn get_from(&self, location: Location) -> Vec<Self::Command<'_>> {
769 if self.repr.offset != location.segment {
770 return Vec::new();
772 }
773
774 (location.command..self.repr.commands.len())
776 .map(|c| Location::new(location.segment, c))
777 .map(|loc| {
778 self.get_command(loc)
779 .expect("constructed location is valid")
780 })
781 .collect()
782 }
783
784 fn get_from_max_cut(&self, max_cut: usize) -> Result<Option<Location>, StorageError> {
785 if max_cut >= self.repr.max_cut
786 && max_cut
787 <= self
788 .repr
789 .max_cut
790 .checked_add(self.repr.commands.len())
791 .assume("must not overflow")?
792 {
793 return Ok(Some(Location::new(
794 self.repr.offset,
795 max_cut
796 .checked_sub(self.repr.max_cut)
797 .assume("must not overflow")?,
798 )));
799 }
800 Ok(None)
801 }
802
803 fn facts(&self) -> Result<Self::FactIndex, StorageError> {
804 Ok(LinearFactIndex {
805 repr: self.reader.fetch(self.repr.facts)?,
806 reader: self.reader.clone(),
807 })
808 }
809
810 fn skip_list(&self) -> &[(Location, usize)] {
811 &self.repr.skip_list
812 }
813
814 fn shortest_max_cut(&self) -> usize {
815 self.repr.max_cut
816 }
817
818 fn longest_max_cut(&self) -> Result<usize, StorageError> {
819 Ok(self
820 .repr
821 .max_cut
822 .checked_add(self.repr.commands.len())
823 .assume("must not overflow")?
824 .checked_sub(1)
825 .assume("must not overflow")?)
826 }
827}
828
829impl<R: Read> FactIndex for LinearFactIndex<R> {}
830
831type MapIter = alloc::collections::btree_map::IntoIter<Keys, Option<Bytes>>;
832pub struct QueryIterator {
833 it: MapIter,
834}
835
836impl QueryIterator {
837 fn new(it: MapIter) -> Self {
838 Self { it }
839 }
840}
841
842impl Iterator for QueryIterator {
843 type Item = Result<Fact, StorageError>;
844 fn next(&mut self) -> Option<Self::Item> {
845 loop {
846 if let (key, Some(value)) = self.it.next()? {
848 return Some(Ok(Fact { key, value }));
849 }
850 }
851 }
852}
853
854impl<R: Read> Query for LinearFactIndex<R> {
855 fn query(&self, name: &str, keys: &[Box<[u8]>]) -> Result<Option<Box<[u8]>>, StorageError> {
856 let mut prior = Some(&self.repr);
857 let mut slot; while let Some(facts) = prior {
859 if let Some(v) = facts.facts.get(name).and_then(|m| m.get(keys)) {
860 return Ok(v.as_ref().cloned());
861 }
862 slot = facts.prior.map(|p| self.reader.fetch(p)).transpose()?;
863 prior = slot.as_ref();
864 }
865 Ok(None)
866 }
867
868 type QueryIterator = QueryIterator;
869 fn query_prefix(
870 &self,
871 name: &str,
872 prefix: &[Box<[u8]>],
873 ) -> Result<QueryIterator, StorageError> {
874 Ok(QueryIterator::new(
875 self.query_prefix_inner(name, prefix)?.into_iter(),
876 ))
877 }
878}
879
880impl<R: Read> LinearFactIndex<R> {
881 fn query_prefix_inner(
882 &self,
883 name: &str,
884 prefix: &[Box<[u8]>],
885 ) -> Result<FactMap, StorageError> {
886 let mut matches = BTreeMap::new();
887 let mut prior = Some(&self.repr);
888 let mut slot; while let Some(facts) = prior {
890 if let Some(map) = facts.facts.get(name) {
891 for (k, v) in super::memory::find_prefixes(map, prefix) {
892 if !matches.contains_key(k) {
894 matches.insert(k.clone(), v.map(Into::into));
895 }
896 }
897 }
898 slot = facts.prior.map(|p| self.reader.fetch(p)).transpose()?;
899 prior = slot.as_ref();
900 }
901 Ok(matches)
902 }
903}
904
905impl<R> LinearFactPerspective<R> {
906 fn clear(&mut self) {
907 self.map.clear();
908 }
909
910 fn apply_updates(&mut self, updates: &[Update]) {
911 for (name, key, value) in updates {
912 if self.prior.is_none() {
913 if let Some(value) = value {
914 self.map
915 .entry(name.clone())
916 .or_default()
917 .insert(key.clone(), Some(value.clone()));
918 } else if let Some(e) = self.map.get_mut(name) {
919 e.remove(key);
920 }
921 } else {
922 self.map
923 .entry(name.clone())
924 .or_default()
925 .insert(key.clone(), value.clone());
926 }
927 }
928 }
929}
930
931impl<R: Read> FactPerspective for LinearFactPerspective<R> {}
932
933impl<R: Read> Query for LinearFactPerspective<R> {
934 fn query(&self, name: &str, keys: &[Box<[u8]>]) -> Result<Option<Box<[u8]>>, StorageError> {
935 if let Some(wrapped) = self.map.get(name).and_then(|m| m.get(keys)) {
936 return Ok(wrapped.as_deref().map(Box::from));
937 }
938 match &self.prior {
939 FactPerspectivePrior::None => Ok(None),
940 FactPerspectivePrior::FactPerspective(prior) => prior.query(name, keys),
941 FactPerspectivePrior::FactIndex { offset, reader } => {
942 let repr: FactIndexRepr = reader.fetch(*offset)?;
943 let prior = LinearFactIndex {
944 repr,
945 reader: reader.clone(),
946 };
947 prior.query(name, keys)
948 }
949 }
950 }
951
952 type QueryIterator = QueryIterator;
953 fn query_prefix(
954 &self,
955 name: &str,
956 prefix: &[Box<[u8]>],
957 ) -> Result<QueryIterator, StorageError> {
958 Ok(QueryIterator::new(
959 self.query_prefix_inner(name, prefix)?.into_iter(),
960 ))
961 }
962}
963
964impl<R: Read> LinearFactPerspective<R> {
965 fn query_prefix_inner(
966 &self,
967 name: &str,
968 prefix: &[Box<[u8]>],
969 ) -> Result<FactMap, StorageError> {
970 let mut matches = match &self.prior {
971 FactPerspectivePrior::None => BTreeMap::new(),
972 FactPerspectivePrior::FactPerspective(prior) => {
973 prior.query_prefix_inner(name, prefix)?
974 }
975 FactPerspectivePrior::FactIndex { offset, reader } => {
976 let repr: FactIndexRepr = reader.fetch(*offset)?;
977 let prior = LinearFactIndex {
978 repr,
979 reader: reader.clone(),
980 };
981 prior.query_prefix_inner(name, prefix)?
982 }
983 };
984 if let Some(map) = self.map.get(name) {
985 for (k, v) in super::memory::find_prefixes(map, prefix) {
986 matches.insert(k.clone(), v.map(Into::into));
988 }
989 }
990 Ok(matches)
991 }
992}
993
994impl<R: Read> QueryMut for LinearFactPerspective<R> {
995 fn insert(&mut self, name: String, keys: Keys, value: Bytes) {
996 self.map.entry(name).or_default().insert(keys, Some(value));
997 }
998
999 fn delete(&mut self, name: String, keys: Keys) {
1000 if self.prior.is_none() {
1001 if let Some(kv) = self.map.get_mut(&name) {
1003 kv.remove(&keys);
1004 }
1005 } else {
1006 self.map.entry(name).or_default().insert(keys, None);
1007 }
1008 }
1009}
1010
1011impl<R: Read> FactPerspective for LinearPerspective<R> {}
1012
1013impl<R: Read> Query for LinearPerspective<R> {
1014 fn query(&self, name: &str, keys: &[Box<[u8]>]) -> Result<Option<Box<[u8]>>, StorageError> {
1015 self.facts.query(name, keys)
1016 }
1017
1018 type QueryIterator = QueryIterator;
1019 fn query_prefix(
1020 &self,
1021 name: &str,
1022 prefix: &[Box<[u8]>],
1023 ) -> Result<QueryIterator, StorageError> {
1024 self.facts.query_prefix(name, prefix)
1025 }
1026}
1027
1028impl<R: Read> QueryMut for LinearPerspective<R> {
1029 fn insert(&mut self, name: String, keys: Keys, value: Bytes) {
1030 self.facts.insert(name.clone(), keys.clone(), value.clone());
1031 self.current_updates.push((name, keys, Some(value)));
1032 }
1033
1034 fn delete(&mut self, name: String, keys: Keys) {
1035 self.facts.delete(name.clone(), keys.clone());
1036 self.current_updates.push((name, keys, None))
1037 }
1038}
1039
1040impl<R: Read> Revertable for LinearPerspective<R> {
1041 fn checkpoint(&self) -> Checkpoint {
1042 Checkpoint {
1043 index: self.commands.len(),
1044 }
1045 }
1046
1047 fn revert(&mut self, checkpoint: Checkpoint) -> Result<(), Bug> {
1048 if checkpoint.index == self.commands.len() {
1049 return Ok(());
1050 }
1051
1052 if checkpoint.index > self.commands.len() {
1053 bug!("A checkpoint's index should always be less than or equal to the length of a perspective's command history!");
1054 }
1055
1056 self.commands.truncate(checkpoint.index);
1057 self.facts.clear();
1058 self.current_updates.clear();
1059 for data in &self.commands {
1060 self.facts.apply_updates(&data.updates);
1061 }
1062
1063 Ok(())
1064 }
1065}
1066
1067impl<R: Read> Perspective for LinearPerspective<R> {
1068 fn policy(&self) -> PolicyId {
1069 self.policy
1070 }
1071
1072 fn add_command(&mut self, command: &impl Command) -> Result<usize, StorageError> {
1073 if command.parent() != self.head_address()? {
1074 return Err(StorageError::PerspectiveHeadMismatch);
1075 }
1076
1077 self.commands.push(CommandData {
1078 id: command.id(),
1079 priority: command.priority(),
1080 policy: command.policy().map(Box::from),
1081 data: command.bytes().into(),
1082 updates: core::mem::take(&mut self.current_updates),
1083 });
1084 Ok(self.commands.len())
1085 }
1086
1087 fn includes(&self, id: CommandId) -> bool {
1088 self.commands.iter().any(|cmd| cmd.id == id)
1089 }
1090
1091 fn head_address(&self) -> Result<Prior<Address>, Bug> {
1092 Ok(if let Some(last) = self.commands.last() {
1093 Prior::Single(Address {
1094 id: last.id,
1095 max_cut: self
1096 .max_cut
1097 .checked_add(self.commands.len())
1098 .assume("must not overflow")?
1099 .checked_sub(1)
1100 .assume("must not overflow")?,
1101 })
1102 } else {
1103 self.parents
1104 })
1105 }
1106}
1107
1108impl From<Prior<Address>> for Prior<CommandId> {
1109 fn from(p: Prior<Address>) -> Self {
1110 match p {
1111 Prior::None => Prior::None,
1112 Prior::Single(l) => Prior::Single(l.id),
1113 Prior::Merge(l, r) => Prior::Merge(l.id, r.id),
1114 }
1115 }
1116}
1117
1118impl Command for LinearCommand<'_> {
1119 fn priority(&self) -> Priority {
1120 self.priority.clone()
1121 }
1122
1123 fn id(&self) -> CommandId {
1124 *self.id
1125 }
1126
1127 fn parent(&self) -> Prior<Address> {
1128 self.parent
1129 }
1130
1131 fn policy(&self) -> Option<&[u8]> {
1132 self.policy
1133 }
1134
1135 fn bytes(&self) -> &[u8] {
1136 self.data
1137 }
1138
1139 fn max_cut(&self) -> Result<usize, Bug> {
1140 Ok(self.max_cut)
1141 }
1142}
1143
#[cfg(test)]
mod test {
    use testing::Manager;

    use super::*;
    use crate::testing::dsl::{test_suite, StorageBackend};

    /// Inserts a handful of facts into a fresh perspective and checks that
    /// prefix queries return exactly the matching keys, in sorted order, with
    /// the values that were inserted.
    #[test]
    fn test_query_prefix() {
        let mut provider = LinearStorageProvider::new(Manager);
        let mut fp = provider.new_perspective(PolicyId::new(0));

        let name = "x";

        let raw_keys: &[&[&str]] = &[
            &["aa", "xy", "123"],
            &["aa", "xz", "123"],
            &["bb", "ccc"],
            &["bc", ""],
        ];
        let keys: Vec<Keys> = raw_keys
            .iter()
            .map(|ks| ks.iter().map(|k| k.as_bytes()).collect())
            .collect();

        // Each fact's value is the debug rendering of its own keys.
        for ks in &keys {
            let value = format!("{ks:?}").into_bytes().into();
            fp.insert(name.into(), ks.clone(), value);
        }

        let prefixes: &[&[&str]] = &[
            &["aa", "xy", "12"],
            &["aa", "xy"],
            &["aa", "xz"],
            &["aa", "x"],
            &["bb", ""],
            &["bb", "ccc"],
            &["bc", ""],
            &["bc", "", ""],
        ];

        for raw_prefix in prefixes {
            let prefix: Keys = raw_prefix.iter().map(|k| k.as_bytes()).collect();
            let found: Vec<_> = fp.query_prefix(name, &prefix).unwrap().collect();
            let mut expected: Vec<_> = keys.iter().filter(|k| k.starts_with(&prefix)).collect();
            expected.sort();
            assert_eq!(found.len(), expected.len());
            for (got, want) in std::iter::zip(found, expected) {
                let got = got.unwrap();
                assert_eq!(&got.key, want);
                assert_eq!(got.value.as_ref(), format!("{want:?}").as_bytes());
            }
        }
    }

    /// Backend that runs the shared storage test suite against linear
    /// storage using the testing `Manager`.
    struct LinearBackend;

    impl StorageBackend for LinearBackend {
        type StorageProvider = LinearStorageProvider<Manager>;

        fn provider(&mut self, _client_id: u64) -> Self::StorageProvider {
            LinearStorageProvider::new(Manager)
        }
    }

    test_suite!(|| LinearBackend);
}