1pub mod libc;
26
27#[cfg(feature = "testing")]
28pub mod testing;
29
30use alloc::{boxed::Box, collections::BTreeMap, string::String, vec, vec::Vec};
31
32use aranya_crypto::{csprng::rand::Rng as _, Csprng, Rng};
33use buggy::{bug, Bug, BugExt};
34use serde::{Deserialize, Serialize};
35use vec1::Vec1;
36
37use crate::{
38 Address, Checkpoint, Command, CommandId, Fact, FactIndex, FactPerspective, GraphId, Keys,
39 Location, Perspective, PolicyId, Prior, Priority, Query, QueryMut, Revertable, Segment,
40 Storage, StorageError, StorageProvider,
41};
42
43pub mod io;
44pub use io::*;
45
/// Upper bound on the `depth` of a chain of fact indices linked through `prior`.
///
/// `write_facts` compacts the prior chain once it has reached this depth and reports a bug if a
/// newly written index would still exceed it.
const MAX_FACT_INDEX_DEPTH: usize = 16;
58
/// Storage provider that keeps one [`LinearStorage`] per graph on top of an [`IoManager`].
pub struct LinearStorageProvider<FM: IoManager> {
    /// Used to create, open and list the per-graph writers.
    manager: FM,
    /// Storages that were already created or opened through this provider, keyed by graph ID.
    storage: BTreeMap<GraphId, LinearStorage<FM::Writer>>,
}
63
/// Storage for a single graph; every read and write goes through `writer`.
pub struct LinearStorage<W> {
    /// Handle used to append records, commit the head and hand out read-only readers.
    writer: W,
}
67
/// A segment record fetched from storage, together with the reader it was fetched with.
#[derive(Debug)]
pub struct LinearSegment<R> {
    /// The deserialized segment record.
    repr: SegmentRepr,
    /// Reader used to fetch related records (e.g. the segment's fact index).
    reader: R,
}
73
/// Serialized form of a segment.
#[derive(Debug, Serialize, Deserialize)]
struct SegmentRepr {
    /// Offset handed to the `append` closure when this record was written; it is the `segment`
    /// part of every [`Location`] inside this segment.
    offset: usize,
    /// Location(s) this segment continues from.
    prior: Prior<Location>,
    /// Address(es) of the parent(s) of the first command.
    parents: Prior<Address>,
    /// Policy shared by the commands of this segment.
    policy: PolicyId,
    /// Offset of the fact index record associated with this segment.
    facts: usize,
    /// The commands of the segment (never empty).
    commands: Vec1<CommandData>,
    /// Max cut of the first command; command `i` has max cut `max_cut + i`.
    max_cut: usize,
    /// `(location, max cut)` pairs of earlier locations that traversals may jump to.
    skip_list: Vec<(Location, usize)>,
}
87
/// Serialized form of a single command inside a segment.
#[derive(Debug, Serialize, Deserialize)]
struct CommandData {
    id: CommandId,
    priority: Priority,
    /// Optional policy bytes carried by the command.
    policy: Option<Bytes>,
    /// The command's serialized bytes.
    data: Bytes,
    /// Fact updates that were pending in the perspective when the command was added.
    updates: Vec<Update>,
}
96
/// Borrowed view of a command stored in a [`LinearSegment`].
pub struct LinearCommand<'a> {
    id: &'a CommandId,
    /// Address(es) of the command's parent(s).
    parent: Prior<Address>,
    priority: Priority,
    policy: Option<&'a [u8]>,
    data: &'a [u8],
    /// Max cut of this command (segment max cut plus the command's index).
    max_cut: usize,
}
105
/// Owned byte string.
type Bytes = Box<[u8]>;

/// A single fact change: `(fact name, keys, value)`; a `None` value records a deletion.
type Update = (String, Keys, Option<Bytes>);
/// Facts of one name, by key. A `None` value marks a deleted fact (tombstone).
type FactMap = BTreeMap<Keys, Option<Box<[u8]>>>;
/// Fact maps grouped by fact name.
type NamedFactMap = BTreeMap<String, FactMap>;
111
/// A fact index record fetched from storage, together with the reader used to follow its
/// `prior` chain.
#[derive(Debug)]
pub struct LinearFactIndex<R> {
    repr: FactIndexRepr,
    reader: R,
}
117
/// Serialized form of one layer of a fact index.
#[derive(Debug, Serialize, Deserialize)]
struct FactIndexRepr {
    /// Offset handed to the `append` closure when this record was written.
    offset: usize,
    /// Offset of the previous layer, if any. Lookups fall back to it when a key is absent here.
    prior: Option<usize>,
    /// Number of layers in the chain ending here: 1 without a prior, prior depth + 1 otherwise.
    depth: usize,
    /// Facts recorded in this layer; `None` values shadow entries of earlier layers.
    facts: NamedFactMap,
}
131
/// An in-memory set of commands and fact changes that can be written as a new segment.
#[derive(Debug)]
pub struct LinearPerspective<R> {
    /// Location(s) the new segment will continue from.
    prior: Prior<Location>,
    /// Address(es) of the parent(s) of the first command added to this perspective.
    parents: Prior<Address>,
    policy: PolicyId,
    /// Fact changes made in this perspective, layered over the prior facts.
    facts: LinearFactPerspective<R>,
    /// Commands added so far.
    commands: Vec<CommandData>,
    /// Fact updates made since the last command was added; `add_command` moves them into the
    /// new command.
    current_updates: Vec<Update>,
    /// Max cut of the first command of this perspective.
    max_cut: usize,
    /// Set for merge perspectives; `write` adds it to the segment's skip list.
    last_common_ancestor: Option<(Location, usize)>,
}
143
144impl<R> LinearPerspective<R> {
145 fn new(
146 prior: Prior<Location>,
147 parents: Prior<Address>,
148 policy: PolicyId,
149 prior_facts: FactPerspectivePrior<R>,
150 max_cut: usize,
151 last_common_ancestor: Option<(Location, usize)>,
152 ) -> Self {
153 Self {
154 prior,
155 parents,
156 policy,
157 facts: LinearFactPerspective::new(prior_facts),
158 commands: Vec::new(),
159 current_updates: Vec::new(),
160 max_cut,
161 last_common_ancestor,
162 }
163 }
164}
165
/// In-memory fact changes layered over an optional prior source of facts.
#[derive(Debug)]
pub struct LinearFactPerspective<R> {
    /// Changes made in this layer, by fact name and key. `None` values are tombstones; they are
    /// only stored when there is a prior to shadow.
    map: BTreeMap<String, BTreeMap<Keys, Option<Bytes>>>,
    /// Where lookups continue when a key is not present in `map`.
    prior: FactPerspectivePrior<R>,
}
171
172impl<R> LinearFactPerspective<R> {
173 fn new(prior: FactPerspectivePrior<R>) -> Self {
174 Self {
175 map: BTreeMap::new(),
176 prior,
177 }
178 }
179}
180
/// The source of facts underneath a [`LinearFactPerspective`].
#[derive(Debug)]
enum FactPerspectivePrior<R> {
    /// Nothing underneath; the perspective's own map is the complete set of facts.
    None,
    /// Another in-memory fact perspective.
    FactPerspective(Box<LinearFactPerspective<R>>),
    /// A stored fact index record at `offset`, fetched through `reader`.
    FactIndex { offset: usize, reader: R },
}
187
188impl<R> FactPerspectivePrior<R> {
189 fn is_none(&self) -> bool {
190 matches!(self, Self::None)
191 }
192}
193
194impl<FM: IoManager + Default> Default for LinearStorageProvider<FM> {
195 fn default() -> Self {
196 Self {
197 manager: FM::default(),
198 storage: BTreeMap::new(),
199 }
200 }
201}
202
203impl<FM: IoManager> LinearStorageProvider<FM> {
204 pub fn new(manager: FM) -> Self {
205 Self {
206 manager,
207 storage: BTreeMap::new(),
208 }
209 }
210}
211
212impl<FM: IoManager> StorageProvider for LinearStorageProvider<FM> {
213 type Perspective = LinearPerspective<<FM::Writer as Write>::ReadOnly>;
214 type Segment = LinearSegment<<FM::Writer as Write>::ReadOnly>;
215 type Storage = LinearStorage<FM::Writer>;
216
217 fn new_perspective(&mut self, policy_id: PolicyId) -> Self::Perspective {
218 LinearPerspective::new(
219 Prior::None,
220 Prior::None,
221 policy_id,
222 FactPerspectivePrior::None,
223 0,
224 None,
225 )
226 }
227
228 fn new_storage(
229 &mut self,
230 init: Self::Perspective,
231 ) -> Result<(GraphId, &mut Self::Storage), StorageError> {
232 use alloc::collections::btree_map::Entry;
233
234 if init.commands.is_empty() {
235 return Err(StorageError::EmptyPerspective);
236 }
237 let graph_id = GraphId::from(init.commands[0].id.into_id());
238 let Entry::Vacant(entry) = self.storage.entry(graph_id) else {
239 return Err(StorageError::StorageExists);
240 };
241
242 let file = self.manager.create(graph_id)?;
243 Ok((graph_id, entry.insert(LinearStorage::create(file, init)?)))
244 }
245
246 fn get_storage(&mut self, graph: GraphId) -> Result<&mut Self::Storage, StorageError> {
247 use alloc::collections::btree_map::Entry;
248
249 let entry = match self.storage.entry(graph) {
250 Entry::Vacant(v) => v,
251 Entry::Occupied(o) => return Ok(o.into_mut()),
252 };
253
254 let file = self
255 .manager
256 .open(graph)?
257 .ok_or(StorageError::NoSuchStorage)?;
258 Ok(entry.insert(LinearStorage::open(file)?))
259 }
260
261 fn list_graph_ids(
262 &mut self,
263 ) -> Result<impl Iterator<Item = Result<GraphId, StorageError>>, StorageError> {
264 self.manager.list()
265 }
266}
267
268impl<W: Write> LinearStorage<W> {
269 fn get_skip(
270 &self,
271 segment: <LinearStorage<W> as Storage>::Segment,
272 max_cut: usize,
273 ) -> Result<Option<(Location, usize)>, StorageError> {
274 let mut head = segment;
275 let mut current = None;
276 'outer: loop {
277 if max_cut > head.longest_max_cut()? {
278 return Ok(current);
279 }
280 current = Some((head.first_location(), head.shortest_max_cut()));
281 if max_cut >= head.shortest_max_cut() {
282 return Ok(current);
283 }
284 for (skip, skip_max_cut) in head.skip_list() {
287 if skip_max_cut <= &max_cut {
288 head = self.get_segment(*skip)?;
289 continue 'outer;
290 }
291 }
292 head = match head.prior() {
293 Prior::None | Prior::Merge(_, _) => {
294 return Ok(current);
295 }
296 Prior::Single(l) => self.get_segment(l)?,
297 }
298 }
299 }
300}
301
302impl<W: Write> LinearStorage<W> {
303 fn create(mut writer: W, init: LinearPerspective<W::ReadOnly>) -> Result<Self, StorageError> {
304 assert!(matches!(init.prior, Prior::None));
305 assert!(matches!(init.parents, Prior::None));
306 assert!(matches!(init.facts.prior, FactPerspectivePrior::None));
307
308 let mut map = init.facts.map;
309 map.retain(|_, kv| !kv.is_empty());
310
311 let facts = writer
312 .append(|offset| FactIndexRepr {
313 offset,
314 prior: None,
315 depth: 1,
316 facts: map,
317 })?
318 .offset;
319
320 let commands = init
321 .commands
322 .try_into()
323 .map_err(|_| StorageError::EmptyPerspective)?;
324 let segment = writer.append(|offset| SegmentRepr {
325 offset,
326 prior: Prior::None,
327 parents: Prior::None,
328 policy: init.policy,
329 facts,
330 commands,
331 max_cut: 0,
332 skip_list: vec![],
333 })?;
334
335 let head = Location::new(
336 segment.offset,
337 segment
338 .commands
339 .len()
340 .checked_sub(1)
341 .assume("vec1 length >= 1")?,
342 );
343
344 writer.commit(head)?;
345
346 let storage = Self { writer };
347
348 Ok(storage)
349 }
350
    /// Wraps an already existing writer; nothing is read or written here.
    fn open(writer: W) -> Result<Self, StorageError> {
        Ok(Self { writer })
    }
354
355 fn compact(&mut self, mut repr: FactIndexRepr) -> Result<FactIndexRepr, StorageError> {
356 let mut map = NamedFactMap::new();
357 let reader = self.writer.readonly();
358 loop {
359 for (name, kv) in repr.facts {
360 let sub = map.entry(name).or_default();
361 for (k, v) in kv {
362 sub.entry(k).or_insert(v);
363 }
364 }
365 let Some(offset) = repr.prior else { break };
366 repr = reader.fetch(offset)?;
367 }
368
369 map.retain(|_, kv| {
371 kv.retain(|_, v| v.is_some());
372 !kv.is_empty()
373 });
374
375 Ok(self
376 .write_facts(LinearFactPerspective {
377 map,
378 prior: FactPerspectivePrior::None,
379 })?
380 .repr)
381 }
382}
383
384impl<F: Write> Storage for LinearStorage<F> {
385 type Perspective = LinearPerspective<F::ReadOnly>;
386 type FactPerspective = LinearFactPerspective<F::ReadOnly>;
387 type Segment = LinearSegment<F::ReadOnly>;
388 type FactIndex = LinearFactIndex<F::ReadOnly>;
389
390 fn get_command_id(&self, location: Location) -> Result<CommandId, StorageError> {
391 let seg = self.get_segment(location)?;
392 let cmd = seg
393 .get_command(location)
394 .ok_or(StorageError::CommandOutOfBounds(location))?;
395 Ok(cmd.id())
396 }
397
    /// Creates a perspective whose single parent is the command at `parent`.
    ///
    /// The new perspective uses the parent segment's policy and a max cut one greater than the
    /// parent command's.
    ///
    /// # Errors
    ///
    /// [`StorageError::CommandOutOfBounds`] if there is no command at `parent`; otherwise
    /// errors from fetching records or from max-cut arithmetic.
    fn get_linear_perspective(
        &self,
        parent: Location,
    ) -> Result<Option<Self::Perspective>, StorageError> {
        let segment = self.get_segment(parent)?;
        let command = segment
            .get_command(parent)
            .ok_or(StorageError::CommandOutOfBounds(parent))?;
        let policy = segment.repr.policy;
        let prior_facts: FactPerspectivePrior<F::ReadOnly> = if parent == segment.head_location() {
            // At the segment head the segment's own fact index can be used directly.
            FactPerspectivePrior::FactIndex {
                offset: segment.repr.facts,
                reader: self.writer.readonly(),
            }
        } else {
            // Mid-segment: start from the layer below the segment's fact index and replay the
            // updates of the commands up to and including `parent`.
            let prior = match segment.facts()?.repr.prior {
                Some(offset) => FactPerspectivePrior::FactIndex {
                    offset,
                    reader: self.writer.readonly(),
                },
                None => FactPerspectivePrior::None,
            };
            let mut facts = LinearFactPerspective::new(prior);
            for data in &segment.repr.commands[..=parent.command] {
                facts.apply_updates(&data.updates);
            }
            // Without a prior, deletions remove entries and can leave empty per-name maps.
            if facts.prior.is_none() {
                facts.map.retain(|_, kv| !kv.is_empty());
            }
            // If replaying produced no changes, skip the extra in-memory layer.
            if facts.map.is_empty() {
                facts.prior
            } else {
                FactPerspectivePrior::FactPerspective(Box::new(facts))
            }
        };
        let prior = Prior::Single(parent);

        let perspective = LinearPerspective::new(
            prior,
            Prior::Single(command.address()?),
            policy,
            prior_facts,
            command
                .max_cut()?
                .checked_add(1)
                .assume("must not overflow")?,
            None,
        );

        Ok(Some(perspective))
    }
449
450 fn get_fact_perspective(
451 &self,
452 location: Location,
453 ) -> Result<Self::FactPerspective, StorageError> {
454 let segment = self.get_segment(location)?;
455
456 if location == segment.head_location()
459 || segment
460 .repr
461 .commands
462 .iter()
463 .all(|cmd| cmd.updates.is_empty())
464 {
465 return Ok(LinearFactPerspective::new(
466 FactPerspectivePrior::FactIndex {
467 offset: segment.repr.facts,
468 reader: self.writer.readonly(),
469 },
470 ));
471 }
472
473 let prior = match segment.facts()?.repr.prior {
474 Some(offset) => FactPerspectivePrior::FactIndex {
475 offset,
476 reader: self.writer.readonly(),
477 },
478 None => FactPerspectivePrior::None,
479 };
480 let mut facts = LinearFactPerspective::new(prior);
481 for data in &segment.repr.commands[..=location.command] {
482 facts.apply_updates(&data.updates);
483 }
484
485 Ok(facts)
486 }
487
488 fn new_merge_perspective(
489 &self,
490 left: Location,
491 right: Location,
492 last_common_ancestor: (Location, usize),
493 policy_id: PolicyId,
494 braid: Self::FactIndex,
495 ) -> Result<Option<Self::Perspective>, StorageError> {
496 let left_segment = self.get_segment(left)?;
499 let left_command = left_segment
500 .get_command(left)
501 .ok_or(StorageError::CommandOutOfBounds(left))?;
502 let right_segment = self.get_segment(right)?;
503 let right_command = right_segment
504 .get_command(right)
505 .ok_or(StorageError::CommandOutOfBounds(right))?;
506
507 let parent = Prior::Merge(left_command.address()?, right_command.address()?);
508
509 if policy_id != left_segment.policy() && policy_id != right_segment.policy() {
510 return Err(StorageError::PolicyMismatch);
511 }
512
513 let prior = Prior::Merge(left, right);
514
515 let perspective = LinearPerspective::new(
516 prior,
517 parent,
518 policy_id,
519 FactPerspectivePrior::FactIndex {
520 offset: braid.repr.offset,
521 reader: braid.reader,
522 },
523 left_command
524 .max_cut()?
525 .max(right_command.max_cut()?)
526 .checked_add(1)
527 .assume("must not overflow")?,
528 Some(last_common_ancestor),
529 );
530
531 Ok(Some(perspective))
532 }
533
534 fn get_segment(&self, location: Location) -> Result<Self::Segment, StorageError> {
535 let reader = self.writer.readonly();
536 let repr = reader.fetch(location.segment)?;
537 let seg = LinearSegment { repr, reader };
538
539 Ok(seg)
540 }
541
    /// Returns the head location reported by the writer.
    fn get_head(&self) -> Result<Location, StorageError> {
        self.writer.head()
    }
545
546 fn commit(&mut self, segment: Self::Segment) -> Result<(), StorageError> {
547 if !self.is_ancestor(self.get_head()?, &segment)? {
548 return Err(StorageError::HeadNotAncestor);
549 }
550
551 self.writer.commit(segment.head_location())
552 }
553
    /// Writes `perspective` as a new segment and returns it. The head is not changed.
    ///
    /// The perspective's facts are written first, then the segment record with a skip list of
    /// randomly chosen earlier locations.
    ///
    /// # Errors
    ///
    /// [`StorageError::EmptyPerspective`] if the perspective has no commands; otherwise errors
    /// from reading or writing records.
    fn write(&mut self, perspective: Self::Perspective) -> Result<Self::Segment, StorageError> {
        let facts = self.write_facts(perspective.facts)?.repr.offset;

        let commands: Vec1<CommandData> = perspective
            .commands
            .try_into()
            .map_err(|_| StorageError::EmptyPerspective)?;

        // Makes up to `count` attempts to pick a skip target below `l`: each attempt draws a
        // random max cut smaller than the max cut of `l` and asks `get_skip` for an entry.
        // Duplicates are dropped; the search stops early if `get_skip` finds nothing.
        let get_skips =
            |l: Location, count: usize| -> Result<Vec<(Location, usize)>, StorageError> {
                let mut rng = &mut Rng as &mut dyn Csprng;
                let mut skips = vec![];
                for _ in 0..count {
                    let segment = self.get_segment(l)?;
                    let l_max_cut = segment
                        .get_command(l)
                        .assume("location must exist")?
                        .max_cut;
                    // With a max cut of 0 there is nothing earlier to skip to.
                    if l_max_cut > 0 {
                        let max_cut = rng.gen_range(0..l_max_cut);
                        if let Some(skip) = self.get_skip(segment, max_cut)? {
                            if !skips.contains(&skip) {
                                skips.push(skip);
                            }
                        } else {
                            break;
                        }
                    }
                }
                Ok(skips)
            };

        let skip_list = match perspective.prior {
            Prior::None => vec![],
            Prior::Merge(_, _) => {
                // Merges always list the last common ancestor, plus random skips below it.
                let (lca, max_cut) = perspective.last_common_ancestor.assume("lca must exist")?;
                let mut skips = get_skips(lca, 2)?;
                if !skips.contains(&(lca, max_cut)) {
                    skips.push((lca, max_cut));
                }
                skips.sort();
                skips
            }
            Prior::Single(l) => {
                let mut skips = get_skips(l, 3)?;
                skips.sort();
                skips
            }
        };
        let repr = self.writer.append(|offset| SegmentRepr {
            offset,
            prior: perspective.prior,
            parents: perspective.parents,
            policy: perspective.policy,
            facts,
            commands,
            max_cut: perspective.max_cut,
            skip_list,
        })?;

        Ok(LinearSegment {
            repr,
            reader: self.writer.readonly(),
        })
    }
621
    /// Persists `facts` and returns the resulting fact index.
    ///
    /// An in-memory prior is written first (recursively). If `facts` has no changes of its
    /// own, the prior index is returned and nothing new is appended. Otherwise a new layer is
    /// appended on top of the prior, compacting the prior chain first when it has already
    /// reached [`MAX_FACT_INDEX_DEPTH`].
    fn write_facts(
        &mut self,
        facts: Self::FactPerspective,
    ) -> Result<Self::FactIndex, StorageError> {
        let mut prior = match facts.prior {
            FactPerspectivePrior::None => None,
            FactPerspectivePrior::FactPerspective(prior) => {
                let prior = self.write_facts(*prior)?;
                if facts.map.is_empty() {
                    return Ok(prior);
                }
                Some(prior.repr)
            }
            FactPerspectivePrior::FactIndex { offset, reader } => {
                let repr = reader.fetch(offset)?;
                if facts.map.is_empty() {
                    return Ok(LinearFactIndex { repr, reader });
                }
                Some(repr)
            }
        };

        // Depth of the prior chain (0 without a prior), compacted to a single layer if adding
        // one more layer would exceed the limit.
        let depth = if let Some(mut p) = prior.take() {
            if p.depth > MAX_FACT_INDEX_DEPTH - 1 {
                p = self.compact(p)?;
            }
            prior.insert(p).depth
        } else {
            0
        };

        let depth = depth.checked_add(1).assume("depth won't overflow")?;

        if depth > MAX_FACT_INDEX_DEPTH {
            bug!("fact index too deep");
        }

        let repr = self.writer.append(|offset| FactIndexRepr {
            offset,
            prior: prior.map(|p| p.offset),
            depth,
            facts: facts.map,
        })?;

        Ok(LinearFactIndex {
            repr,
            reader: self.writer.readonly(),
        })
    }
671}
672
673impl<R: Read> Segment for LinearSegment<R> {
674 type FactIndex = LinearFactIndex<R>;
675 type Command<'a>
676 = LinearCommand<'a>
677 where
678 R: 'a;
679
680 fn head(&self) -> Result<Self::Command<'_>, StorageError> {
681 let data = self.repr.commands.last();
682 let parent = if let Some(prev) = usize::checked_sub(self.repr.commands.len(), 2) {
683 Prior::Single(Address {
684 id: self.repr.commands[prev].id,
685 max_cut: self
686 .repr
687 .max_cut
688 .checked_add(prev)
689 .assume("must not overflow")?,
690 })
691 } else {
692 self.repr.parents
693 };
694 Ok(LinearCommand {
695 id: &data.id,
696 parent,
697 priority: data.priority.clone(),
698 policy: data.policy.as_deref(),
699 data: &data.data,
700 max_cut: self
701 .repr
702 .max_cut
703 .checked_add(self.repr.commands.len())
704 .assume("must not overflow")?
705 .checked_sub(1)
706 .assume("must not overflow")?,
707 })
708 }
709
710 fn first(&self) -> Self::Command<'_> {
711 let data = self.repr.commands.first();
712 LinearCommand {
713 id: &data.id,
714 parent: self.repr.parents,
715 priority: data.priority.clone(),
716 policy: data.policy.as_deref(),
717 data: &data.data,
718 max_cut: self.repr.max_cut,
719 }
720 }
721
    /// Returns the location of the last command in this segment.
    fn head_location(&self) -> Location {
        // `commands` is a `Vec1`, which holds at least one element, so `len() - 1` does not
        // underflow.
        #[allow(clippy::arithmetic_side_effects)]
        Location::new(self.repr.offset, self.repr.commands.len() - 1)
    }
727
    /// Returns the location of the first command in this segment.
    fn first_location(&self) -> Location {
        Location::new(self.repr.offset, 0)
    }
731
    /// Returns whether `location` refers to this segment and to an existing command index.
    fn contains(&self, location: Location) -> bool {
        location.segment == self.repr.offset && location.command < self.repr.commands.len()
    }
735
    /// Returns the policy ID stored in the segment.
    fn policy(&self) -> PolicyId {
        self.repr.policy
    }
739
    /// Returns the location(s) this segment continues from.
    fn prior(&self) -> Prior<Location> {
        self.repr.prior
    }
743
744 fn get_command(&self, location: Location) -> Option<Self::Command<'_>> {
745 if self.repr.offset != location.segment {
746 return None;
747 }
748 let data = self.repr.commands.get(location.command)?;
749 let parent = if let Some(prev) = usize::checked_sub(location.command, 1) {
750 if let Some(max_cut) = self.repr.max_cut.checked_add(prev) {
751 Prior::Single(Address {
752 id: self.repr.commands[prev].id,
753 max_cut,
754 })
755 } else {
756 return None;
757 }
758 } else {
759 self.repr.parents
760 };
761 self.repr
762 .max_cut
763 .checked_add(location.command)
764 .map(|max_cut| LinearCommand {
765 id: &data.id,
766 parent,
767 priority: data.priority.clone(),
768 policy: data.policy.as_deref(),
769 data: &data.data,
770 max_cut,
771 })
772 }
773
774 fn get_from(&self, location: Location) -> Vec<Self::Command<'_>> {
775 if self.repr.offset != location.segment {
776 return Vec::new();
778 }
779
780 (location.command..self.repr.commands.len())
782 .map(|c| Location::new(location.segment, c))
783 .map(|loc| {
784 self.get_command(loc)
785 .expect("constructed location is valid")
786 })
787 .collect()
788 }
789
790 fn get_from_max_cut(&self, max_cut: usize) -> Result<Option<Location>, StorageError> {
791 if max_cut >= self.repr.max_cut
792 && max_cut
793 <= self
794 .repr
795 .max_cut
796 .checked_add(self.repr.commands.len())
797 .assume("must not overflow")?
798 {
799 return Ok(Some(Location::new(
800 self.repr.offset,
801 max_cut
802 .checked_sub(self.repr.max_cut)
803 .assume("must not overflow")?,
804 )));
805 }
806 Ok(None)
807 }
808
809 fn facts(&self) -> Result<Self::FactIndex, StorageError> {
810 Ok(LinearFactIndex {
811 repr: self.reader.fetch(self.repr.facts)?,
812 reader: self.reader.clone(),
813 })
814 }
815
    /// Returns the segment's stored skip list of `(location, max cut)` pairs.
    fn skip_list(&self) -> &[(Location, usize)] {
        &self.repr.skip_list
    }
819
    /// Returns the max cut of the first command in the segment.
    fn shortest_max_cut(&self) -> usize {
        self.repr.max_cut
    }
823
824 fn longest_max_cut(&self) -> Result<usize, StorageError> {
825 Ok(self
826 .repr
827 .max_cut
828 .checked_add(self.repr.commands.len())
829 .assume("must not overflow")?
830 .checked_sub(1)
831 .assume("must not overflow")?)
832 }
833}
834
// Marker implementation: adds no methods of its own here.
impl<R: Read> FactIndex for LinearFactIndex<R> {}
836
/// Owning iterator over the entries of a [`FactMap`].
type MapIter = alloc::collections::btree_map::IntoIter<Keys, Option<Bytes>>;
/// Iterator over the results of a prefix query; entries without a value are skipped.
pub struct QueryIterator {
    it: MapIter,
}
841
impl QueryIterator {
    /// Wraps an owning map iterator.
    fn new(it: MapIter) -> Self {
        Self { it }
    }
}
847
848impl Iterator for QueryIterator {
849 type Item = Result<Fact, StorageError>;
850 fn next(&mut self) -> Option<Self::Item> {
851 loop {
852 if let (key, Some(value)) = self.it.next()? {
854 return Some(Ok(Fact { key, value }));
855 }
856 }
857 }
858}
859
860impl<R: Read> Query for LinearFactIndex<R> {
861 fn query(&self, name: &str, keys: &[Box<[u8]>]) -> Result<Option<Box<[u8]>>, StorageError> {
862 let mut prior = Some(&self.repr);
863 let mut slot; while let Some(facts) = prior {
865 if let Some(v) = facts.facts.get(name).and_then(|m| m.get(keys)) {
866 return Ok(v.as_ref().cloned());
867 }
868 slot = facts.prior.map(|p| self.reader.fetch(p)).transpose()?;
869 prior = slot.as_ref();
870 }
871 Ok(None)
872 }
873
874 type QueryIterator = QueryIterator;
875 fn query_prefix(
876 &self,
877 name: &str,
878 prefix: &[Box<[u8]>],
879 ) -> Result<QueryIterator, StorageError> {
880 Ok(QueryIterator::new(
881 self.query_prefix_inner(name, prefix)?.into_iter(),
882 ))
883 }
884}
885
886impl<R: Read> LinearFactIndex<R> {
887 fn query_prefix_inner(
888 &self,
889 name: &str,
890 prefix: &[Box<[u8]>],
891 ) -> Result<FactMap, StorageError> {
892 let mut matches = BTreeMap::new();
893 let mut prior = Some(&self.repr);
894 let mut slot; while let Some(facts) = prior {
896 if let Some(map) = facts.facts.get(name) {
897 for (k, v) in super::memory::find_prefixes(map, prefix) {
898 if !matches.contains_key(k) {
900 matches.insert(k.clone(), v.map(Into::into));
901 }
902 }
903 }
904 slot = facts.prior.map(|p| self.reader.fetch(p)).transpose()?;
905 prior = slot.as_ref();
906 }
907 Ok(matches)
908 }
909}
910
911impl<R> LinearFactPerspective<R> {
912 fn clear(&mut self) {
913 self.map.clear();
914 }
915
916 fn apply_updates(&mut self, updates: &[Update]) {
917 for (name, key, value) in updates {
918 if self.prior.is_none() {
919 if let Some(value) = value {
920 self.map
921 .entry(name.clone())
922 .or_default()
923 .insert(key.clone(), Some(value.clone()));
924 } else if let Some(e) = self.map.get_mut(name) {
925 e.remove(key);
926 }
927 } else {
928 self.map
929 .entry(name.clone())
930 .or_default()
931 .insert(key.clone(), value.clone());
932 }
933 }
934 }
935}
936
// Marker implementation: adds no methods of its own here.
impl<R: Read> FactPerspective for LinearFactPerspective<R> {}
938
939impl<R: Read> Query for LinearFactPerspective<R> {
940 fn query(&self, name: &str, keys: &[Box<[u8]>]) -> Result<Option<Box<[u8]>>, StorageError> {
941 if let Some(wrapped) = self.map.get(name).and_then(|m| m.get(keys)) {
942 return Ok(wrapped.as_deref().map(Box::from));
943 }
944 match &self.prior {
945 FactPerspectivePrior::None => Ok(None),
946 FactPerspectivePrior::FactPerspective(prior) => prior.query(name, keys),
947 FactPerspectivePrior::FactIndex { offset, reader } => {
948 let repr: FactIndexRepr = reader.fetch(*offset)?;
949 let prior = LinearFactIndex {
950 repr,
951 reader: reader.clone(),
952 };
953 prior.query(name, keys)
954 }
955 }
956 }
957
958 type QueryIterator = QueryIterator;
959 fn query_prefix(
960 &self,
961 name: &str,
962 prefix: &[Box<[u8]>],
963 ) -> Result<QueryIterator, StorageError> {
964 Ok(QueryIterator::new(
965 self.query_prefix_inner(name, prefix)?.into_iter(),
966 ))
967 }
968}
969
970impl<R: Read> LinearFactPerspective<R> {
971 fn query_prefix_inner(
972 &self,
973 name: &str,
974 prefix: &[Box<[u8]>],
975 ) -> Result<FactMap, StorageError> {
976 let mut matches = match &self.prior {
977 FactPerspectivePrior::None => BTreeMap::new(),
978 FactPerspectivePrior::FactPerspective(prior) => {
979 prior.query_prefix_inner(name, prefix)?
980 }
981 FactPerspectivePrior::FactIndex { offset, reader } => {
982 let repr: FactIndexRepr = reader.fetch(*offset)?;
983 let prior = LinearFactIndex {
984 repr,
985 reader: reader.clone(),
986 };
987 prior.query_prefix_inner(name, prefix)?
988 }
989 };
990 if let Some(map) = self.map.get(name) {
991 for (k, v) in super::memory::find_prefixes(map, prefix) {
992 matches.insert(k.clone(), v.map(Into::into));
994 }
995 }
996 Ok(matches)
997 }
998}
999
1000impl<R: Read> QueryMut for LinearFactPerspective<R> {
1001 fn insert(&mut self, name: String, keys: Keys, value: Bytes) {
1002 self.map.entry(name).or_default().insert(keys, Some(value));
1003 }
1004
1005 fn delete(&mut self, name: String, keys: Keys) {
1006 if self.prior.is_none() {
1007 if let Some(kv) = self.map.get_mut(&name) {
1009 kv.remove(&keys);
1010 }
1011 } else {
1012 self.map.entry(name).or_default().insert(keys, None);
1013 }
1014 }
1015}
1016
// Marker implementation: adds no methods of its own here.
impl<R: Read> FactPerspective for LinearPerspective<R> {}
1018
// Queries are answered by the perspective's fact layer.
impl<R: Read> Query for LinearPerspective<R> {
    /// Delegates to the fact perspective's `query`.
    fn query(&self, name: &str, keys: &[Box<[u8]>]) -> Result<Option<Box<[u8]>>, StorageError> {
        self.facts.query(name, keys)
    }

    type QueryIterator = QueryIterator;
    /// Delegates to the fact perspective's `query_prefix`.
    fn query_prefix(
        &self,
        name: &str,
        prefix: &[Box<[u8]>],
    ) -> Result<QueryIterator, StorageError> {
        self.facts.query_prefix(name, prefix)
    }
}
1033
1034impl<R: Read> QueryMut for LinearPerspective<R> {
1035 fn insert(&mut self, name: String, keys: Keys, value: Bytes) {
1036 self.facts.insert(name.clone(), keys.clone(), value.clone());
1037 self.current_updates.push((name, keys, Some(value)));
1038 }
1039
1040 fn delete(&mut self, name: String, keys: Keys) {
1041 self.facts.delete(name.clone(), keys.clone());
1042 self.current_updates.push((name, keys, None))
1043 }
1044}
1045
1046impl<R: Read> Revertable for LinearPerspective<R> {
1047 fn checkpoint(&self) -> Checkpoint {
1048 Checkpoint {
1049 index: self.commands.len(),
1050 }
1051 }
1052
1053 fn revert(&mut self, checkpoint: Checkpoint) -> Result<(), Bug> {
1054 if checkpoint.index == self.commands.len() {
1055 return Ok(());
1056 }
1057
1058 if checkpoint.index > self.commands.len() {
1059 bug!("A checkpoint's index should always be less than or equal to the length of a perspective's command history!");
1060 }
1061
1062 self.commands.truncate(checkpoint.index);
1063 self.facts.clear();
1064 self.current_updates.clear();
1065 for data in &self.commands {
1066 self.facts.apply_updates(&data.updates);
1067 }
1068
1069 Ok(())
1070 }
1071}
1072
1073impl<R: Read> Perspective for LinearPerspective<R> {
1074 fn policy(&self) -> PolicyId {
1075 self.policy
1076 }
1077
1078 fn add_command(&mut self, command: &impl Command) -> Result<usize, StorageError> {
1079 if command.parent() != self.head_address()? {
1080 return Err(StorageError::PerspectiveHeadMismatch);
1081 }
1082
1083 self.commands.push(CommandData {
1084 id: command.id(),
1085 priority: command.priority(),
1086 policy: command.policy().map(Box::from),
1087 data: command.bytes().into(),
1088 updates: core::mem::take(&mut self.current_updates),
1089 });
1090 Ok(self.commands.len())
1091 }
1092
1093 fn includes(&self, id: CommandId) -> bool {
1094 self.commands.iter().any(|cmd| cmd.id == id)
1095 }
1096
1097 fn head_address(&self) -> Result<Prior<Address>, Bug> {
1098 Ok(if let Some(last) = self.commands.last() {
1099 Prior::Single(Address {
1100 id: last.id,
1101 max_cut: self
1102 .max_cut
1103 .checked_add(self.commands.len())
1104 .assume("must not overflow")?
1105 .checked_sub(1)
1106 .assume("must not overflow")?,
1107 })
1108 } else {
1109 self.parents
1110 })
1111 }
1112}
1113
1114impl From<Prior<Address>> for Prior<CommandId> {
1115 fn from(p: Prior<Address>) -> Self {
1116 match p {
1117 Prior::None => Prior::None,
1118 Prior::Single(l) => Prior::Single(l.id),
1119 Prior::Merge(l, r) => Prior::Merge(l.id, r.id),
1120 }
1121 }
1122}
1123
// Accessors over the borrowed command data.
impl Command for LinearCommand<'_> {
    /// Returns a clone of the stored priority.
    fn priority(&self) -> Priority {
        self.priority.clone()
    }

    /// Returns a copy of the command ID.
    fn id(&self) -> CommandId {
        *self.id
    }

    /// Returns the address(es) of the command's parent(s).
    fn parent(&self) -> Prior<Address> {
        self.parent
    }

    /// Returns the command's policy bytes, if any.
    fn policy(&self) -> Option<&[u8]> {
        self.policy
    }

    /// Returns the command's serialized bytes.
    fn bytes(&self) -> &[u8] {
        self.data
    }

    /// Returns the stored max cut; this implementation never fails.
    fn max_cut(&self) -> Result<usize, Bug> {
        Ok(self.max_cut)
    }
}
1149
#[cfg(test)]
mod test {
    use testing::Manager;

    use super::*;
    use crate::testing::dsl::{test_suite, StorageBackend};

    /// Inserts a handful of facts into a fresh perspective and checks that every prefix query
    /// returns exactly the keys starting with that prefix, in sorted order, with the values
    /// that were inserted.
    #[test]
    fn test_query_prefix() {
        let mut provider = LinearStorageProvider::new(Manager::new());
        let mut perspective = provider.new_perspective(PolicyId::new(0));

        let fact_name = "x";

        let raw_keys: &[&[&str]] = &[
            &["aa", "xy", "123"],
            &["aa", "xz", "123"],
            &["bb", "ccc"],
            &["bc", ""],
        ];
        let keys: Vec<Keys> = raw_keys
            .iter()
            .map(|parts| parts.iter().map(|part| part.as_bytes()).collect())
            .collect();

        // Each fact's value is the debug rendering of its own key.
        for ks in &keys {
            perspective.insert(
                fact_name.into(),
                ks.clone(),
                format!("{ks:?}").into_bytes().into(),
            );
        }

        let raw_prefixes: &[&[&str]] = &[
            &["aa", "xy", "12"],
            &["aa", "xy"],
            &["aa", "xz"],
            &["aa", "x"],
            &["bb", ""],
            &["bb", "ccc"],
            &["bc", ""],
            &["bc", "", ""],
        ];

        for raw_prefix in raw_prefixes {
            let prefix: Keys = raw_prefix.iter().map(|part| part.as_bytes()).collect();
            let found: Vec<_> = perspective
                .query_prefix(fact_name, &prefix)
                .unwrap()
                .collect();
            let mut expected: Vec<_> = keys.iter().filter(|k| k.starts_with(&prefix)).collect();
            expected.sort();
            assert_eq!(found.len(), expected.len());
            for (actual, wanted) in found.into_iter().zip(expected) {
                let actual = actual.unwrap();
                assert_eq!(&actual.key, wanted);
                assert_eq!(actual.value.as_ref(), format!("{wanted:?}").as_bytes());
            }
        }
    }

    /// Backend that hands the shared storage test suite a fresh linear provider per client.
    struct LinearBackend;
    impl StorageBackend for LinearBackend {
        type StorageProvider = LinearStorageProvider<Manager>;

        fn provider(&mut self, _client_id: u64) -> Self::StorageProvider {
            LinearStorageProvider::new(Manager::new())
        }
    }
    test_suite!(|| LinearBackend);
}