aranya_runtime/storage/linear/
mod.rs

//! Persistent linear storage implementation.
2//!
3//! `LinearStorage` is a graph storage implementation backed by a file-like byte
4//! storage interface. This is designed to be usable across many environments
5//! with minimal assumptions on the underlying storage.
6//!
7//! # Layout
8//!
9//! `[x]` is page aligned.
10//!
11//! ```text
12//! // Control section
13//! [Base] [Root] [Root]
14//! // Data section
15//! [Segment or FactIndex]
16//! |
17//! V
18//! ```
19//!
20//! The `LinearStorage` will exclusively modify the control section. The data
21//! section is append-only but can be read concurrently. If written data is not
22//! committed, it may be overwritten and will become unreachable by intended
23//! means.
24
25pub mod libc;
26
27#[cfg(feature = "testing")]
28pub mod testing;
29
30use alloc::{boxed::Box, collections::BTreeMap, string::String, vec, vec::Vec};
31
32use aranya_buggy::{bug, Bug, BugExt};
33use aranya_crypto::{csprng::rand::Rng as _, Csprng, Rng};
34use serde::{Deserialize, Serialize};
35use vec1::Vec1;
36
37use crate::{
38    Address, Checkpoint, Command, CommandId, Fact, FactIndex, FactPerspective, GraphId, Keys,
39    Location, Perspective, PolicyId, Prior, Priority, Query, QueryMut, Revertable, Segment,
40    Storage, StorageError, StorageProvider,
41};
42
43pub mod io;
44pub use io::*;
45
/// Maximum depth of fact indices before compaction.
///
/// A lower value will speed up search queries but require more compaction,
/// slowing down fact index creation and using more storage space.
///
/// In the future, this may be configurable at runtime or dynamic based on
/// heuristics such as fact density.
///
/// 16 is our initial guess for balance.
///
/// This must be at least 2; the requirement is checked at compile time below.
const MAX_FACT_INDEX_DEPTH: usize = 16;

// A compacted fact index is written with depth 1, so the index written on top
// of it has depth 2. A smaller limit would make every such write exceed it.
const _: () = assert!(
    MAX_FACT_INDEX_DEPTH >= 2,
    "MAX_FACT_INDEX_DEPTH must be at least 2"
);
58
59pub struct LinearStorageProvider<FM: IoManager> {
60    manager: FM,
61    storage: BTreeMap<GraphId, LinearStorage<FM::Writer>>,
62}
63
/// Graph storage that performs all of its reads and writes through one writer.
pub struct LinearStorage<W> {
    /// Handle used to append data, commit the head, and obtain read-only readers.
    writer: W,
}
67
68#[derive(Debug)]
69pub struct LinearSegment<R> {
70    repr: SegmentRepr,
71    reader: R,
72}
73
74#[derive(Debug, Serialize, Deserialize)]
75struct SegmentRepr {
76    /// Self offset in file.
77    offset: usize,
78    prior: Prior<Location>,
79    parents: Prior<Address>,
80    policy: PolicyId,
81    /// Offset in file to associated fact index.
82    facts: usize,
83    commands: Vec1<CommandData>,
84    max_cut: usize,
85    skip_list: Vec<(Location, usize)>,
86}
87
88#[derive(Debug, Serialize, Deserialize)]
89struct CommandData {
90    id: CommandId,
91    priority: Priority,
92    policy: Option<Bytes>,
93    data: Bytes,
94    updates: Vec<Update>,
95}
96
97pub struct LinearCommand<'a> {
98    id: &'a CommandId,
99    parent: Prior<Address>,
100    priority: Priority,
101    policy: Option<&'a [u8]>,
102    data: &'a [u8],
103    max_cut: usize,
104}
105
106type Bytes = Box<[u8]>;
107
108type Update = (String, Keys, Option<Bytes>);
109type FactMap = BTreeMap<Keys, Option<Box<[u8]>>>;
110type NamedFactMap = BTreeMap<String, FactMap>;
111
112#[derive(Debug)]
113pub struct LinearFactIndex<R> {
114    repr: FactIndexRepr,
115    reader: R,
116}
117
118#[derive(Debug, Serialize, Deserialize)]
119struct FactIndexRepr {
120    /// Self offset in file.
121    offset: usize,
122    /// Offset of prior fact index.
123    prior: Option<usize>,
124    /// Depth of this fact index.
125    ///
126    /// `prior.depth + 1`, or just `1` if no prior
127    depth: usize,
128    /// Facts in sorted order
129    facts: NamedFactMap,
130}
131
132#[derive(Debug)]
133pub struct LinearPerspective<R> {
134    prior: Prior<Location>,
135    parents: Prior<Address>,
136    policy: PolicyId,
137    facts: LinearFactPerspective<R>,
138    commands: Vec<CommandData>,
139    current_updates: Vec<Update>,
140    max_cut: usize,
141    last_common_ancestor: Option<(Location, usize)>,
142}
143
144impl<R> LinearPerspective<R> {
145    fn new(
146        prior: Prior<Location>,
147        parents: Prior<Address>,
148        policy: PolicyId,
149        prior_facts: FactPerspectivePrior<R>,
150        max_cut: usize,
151        last_common_ancestor: Option<(Location, usize)>,
152    ) -> Self {
153        Self {
154            prior,
155            parents,
156            policy,
157            facts: LinearFactPerspective::new(prior_facts),
158            commands: Vec::new(),
159            current_updates: Vec::new(),
160            max_cut,
161            last_common_ancestor,
162        }
163    }
164}
165
166#[derive(Debug)]
167pub struct LinearFactPerspective<R> {
168    map: BTreeMap<String, BTreeMap<Keys, Option<Bytes>>>,
169    prior: FactPerspectivePrior<R>,
170}
171
172impl<R> LinearFactPerspective<R> {
173    fn new(prior: FactPerspectivePrior<R>) -> Self {
174        Self {
175            map: BTreeMap::new(),
176            prior,
177        }
178    }
179}
180
181#[derive(Debug)]
182enum FactPerspectivePrior<R> {
183    None,
184    FactPerspective(Box<LinearFactPerspective<R>>),
185    FactIndex { offset: usize, reader: R },
186}
187
188impl<R> FactPerspectivePrior<R> {
189    fn is_none(&self) -> bool {
190        matches!(self, Self::None)
191    }
192}
193
194impl<FM: IoManager + Default> Default for LinearStorageProvider<FM> {
195    fn default() -> Self {
196        Self {
197            manager: FM::default(),
198            storage: BTreeMap::new(),
199        }
200    }
201}
202
203impl<FM: IoManager> LinearStorageProvider<FM> {
204    pub fn new(manager: FM) -> Self {
205        Self {
206            manager,
207            storage: BTreeMap::new(),
208        }
209    }
210}
211
212impl<FM: IoManager> StorageProvider for LinearStorageProvider<FM> {
213    type Perspective = LinearPerspective<<FM::Writer as Write>::ReadOnly>;
214    type Segment = LinearSegment<<FM::Writer as Write>::ReadOnly>;
215    type Storage = LinearStorage<FM::Writer>;
216
217    fn new_perspective(&mut self, policy_id: PolicyId) -> Self::Perspective {
218        LinearPerspective::new(
219            Prior::None,
220            Prior::None,
221            policy_id,
222            FactPerspectivePrior::None,
223            0,
224            None,
225        )
226    }
227
228    fn new_storage(
229        &mut self,
230        init: Self::Perspective,
231    ) -> Result<(GraphId, &mut Self::Storage), StorageError> {
232        use alloc::collections::btree_map::Entry;
233
234        if init.commands.is_empty() {
235            return Err(StorageError::EmptyPerspective);
236        }
237        let graph_id = GraphId::from(init.commands[0].id.into_id());
238        let Entry::Vacant(entry) = self.storage.entry(graph_id) else {
239            return Err(StorageError::StorageExists);
240        };
241
242        let file = self.manager.create(graph_id)?;
243        Ok((graph_id, entry.insert(LinearStorage::create(file, init)?)))
244    }
245
246    fn get_storage(&mut self, graph: GraphId) -> Result<&mut Self::Storage, StorageError> {
247        use alloc::collections::btree_map::Entry;
248
249        let entry = match self.storage.entry(graph) {
250            Entry::Vacant(v) => v,
251            Entry::Occupied(o) => return Ok(o.into_mut()),
252        };
253
254        let file = self
255            .manager
256            .open(graph)?
257            .ok_or(StorageError::NoSuchStorage)?;
258        Ok(entry.insert(LinearStorage::open(file)?))
259    }
260}
261
262impl<W: Write> LinearStorage<W> {
263    fn get_skip(
264        &self,
265        segment: <LinearStorage<W> as Storage>::Segment,
266        max_cut: usize,
267    ) -> Result<Option<(Location, usize)>, StorageError> {
268        let mut head = segment;
269        let mut current = None;
270        'outer: loop {
271            if max_cut > head.longest_max_cut()? {
272                return Ok(current);
273            }
274            current = Some((head.first_location(), head.shortest_max_cut()));
275            if max_cut >= head.shortest_max_cut() {
276                return Ok(current);
277            }
278            // Assumes skip list is sorted in ascending order.
279            // We always want to skip as close to the root as possible.
280            for (skip, skip_max_cut) in head.skip_list() {
281                if skip_max_cut <= &max_cut {
282                    head = self.get_segment(*skip)?;
283                    continue 'outer;
284                }
285            }
286            head = match head.prior() {
287                Prior::None | Prior::Merge(_, _) => {
288                    return Ok(current);
289                }
290                Prior::Single(l) => self.get_segment(l)?,
291            }
292        }
293    }
294}
295
296impl<W: Write> LinearStorage<W> {
297    fn create(mut writer: W, init: LinearPerspective<W::ReadOnly>) -> Result<Self, StorageError> {
298        assert!(matches!(init.prior, Prior::None));
299        assert!(matches!(init.parents, Prior::None));
300        assert!(matches!(init.facts.prior, FactPerspectivePrior::None));
301
302        let mut map = init.facts.map;
303        map.retain(|_, kv| !kv.is_empty());
304
305        let facts = writer
306            .append(|offset| FactIndexRepr {
307                offset,
308                prior: None,
309                depth: 1,
310                facts: map,
311            })?
312            .offset;
313
314        let commands = init
315            .commands
316            .try_into()
317            .map_err(|_| StorageError::EmptyPerspective)?;
318        let segment = writer.append(|offset| SegmentRepr {
319            offset,
320            prior: Prior::None,
321            parents: Prior::None,
322            policy: init.policy,
323            facts,
324            commands,
325            max_cut: 0,
326            skip_list: vec![],
327        })?;
328
329        let head = Location::new(
330            segment.offset,
331            segment
332                .commands
333                .len()
334                .checked_sub(1)
335                .assume("vec1 length >= 1")?,
336        );
337
338        writer.commit(head)?;
339
340        let storage = Self { writer };
341
342        Ok(storage)
343    }
344
345    fn open(writer: W) -> Result<Self, StorageError> {
346        Ok(Self { writer })
347    }
348
349    fn compact(&mut self, mut repr: FactIndexRepr) -> Result<FactIndexRepr, StorageError> {
350        let mut map = NamedFactMap::new();
351        let reader = self.writer.readonly();
352        loop {
353            for (name, kv) in repr.facts {
354                let sub = map.entry(name).or_default();
355                for (k, v) in kv {
356                    sub.entry(k).or_insert(v);
357                }
358            }
359            let Some(offset) = repr.prior else { break };
360            repr = reader.fetch(offset)?;
361        }
362
363        // Since there's no prior, we can remove tombstones
364        map.retain(|_, kv| {
365            kv.retain(|_, v| v.is_some());
366            !kv.is_empty()
367        });
368
369        Ok(self
370            .write_facts(LinearFactPerspective {
371                map,
372                prior: FactPerspectivePrior::None,
373            })?
374            .repr)
375    }
376}
377
378impl<F: Write> Storage for LinearStorage<F> {
379    type Perspective = LinearPerspective<F::ReadOnly>;
380    type FactPerspective = LinearFactPerspective<F::ReadOnly>;
381    type Segment = LinearSegment<F::ReadOnly>;
382    type FactIndex = LinearFactIndex<F::ReadOnly>;
383
384    fn get_command_id(&self, location: Location) -> Result<CommandId, StorageError> {
385        let seg = self.get_segment(location)?;
386        let cmd = seg
387            .get_command(location)
388            .ok_or(StorageError::CommandOutOfBounds(location))?;
389        Ok(cmd.id())
390    }
391
392    fn get_linear_perspective(
393        &self,
394        parent: Location,
395    ) -> Result<Option<Self::Perspective>, StorageError> {
396        let segment = self.get_segment(parent)?;
397        let command = segment
398            .get_command(parent)
399            .ok_or(StorageError::CommandOutOfBounds(parent))?;
400        let policy = segment.repr.policy;
401        let prior_facts: FactPerspectivePrior<F::ReadOnly> = if parent == segment.head_location() {
402            FactPerspectivePrior::FactIndex {
403                offset: segment.repr.facts,
404                reader: self.writer.readonly(),
405            }
406        } else {
407            let prior = match segment.facts()?.repr.prior {
408                Some(offset) => FactPerspectivePrior::FactIndex {
409                    offset,
410                    reader: self.writer.readonly(),
411                },
412                None => FactPerspectivePrior::None,
413            };
414            let mut facts = LinearFactPerspective::new(prior);
415            for data in &segment.repr.commands[..=parent.command] {
416                facts.apply_updates(&data.updates);
417            }
418            if facts.prior.is_none() {
419                facts.map.retain(|_, kv| !kv.is_empty());
420            }
421            if facts.map.is_empty() {
422                facts.prior
423            } else {
424                FactPerspectivePrior::FactPerspective(Box::new(facts))
425            }
426        };
427        let prior = Prior::Single(parent);
428
429        let perspective = LinearPerspective::new(
430            prior,
431            Prior::Single(command.address()?),
432            policy,
433            prior_facts,
434            command
435                .max_cut()?
436                .checked_add(1)
437                .assume("must not overflow")?,
438            None,
439        );
440
441        Ok(Some(perspective))
442    }
443
444    fn get_fact_perspective(
445        &self,
446        location: Location,
447    ) -> Result<Self::FactPerspective, StorageError> {
448        let segment = self.get_segment(location)?;
449
450        // If at head of segment, or no facts in segment,
451        // we don't need to apply updates.
452        if location == segment.head_location()
453            || segment
454                .repr
455                .commands
456                .iter()
457                .all(|cmd| cmd.updates.is_empty())
458        {
459            return Ok(LinearFactPerspective::new(
460                FactPerspectivePrior::FactIndex {
461                    offset: segment.repr.facts,
462                    reader: self.writer.readonly(),
463                },
464            ));
465        }
466
467        let prior = match segment.facts()?.repr.prior {
468            Some(offset) => FactPerspectivePrior::FactIndex {
469                offset,
470                reader: self.writer.readonly(),
471            },
472            None => FactPerspectivePrior::None,
473        };
474        let mut facts = LinearFactPerspective::new(prior);
475        for data in &segment.repr.commands[..=location.command] {
476            facts.apply_updates(&data.updates);
477        }
478
479        Ok(facts)
480    }
481
482    fn new_merge_perspective(
483        &self,
484        left: Location,
485        right: Location,
486        last_common_ancestor: (Location, usize),
487        policy_id: PolicyId,
488        braid: Self::FactIndex,
489    ) -> Result<Option<Self::Perspective>, StorageError> {
490        // TODO(jdygert): ensure braid belongs to this storage.
491        // TODO(jdygert): ensure braid ends at given command?
492        let left_segment = self.get_segment(left)?;
493        let left_command = left_segment
494            .get_command(left)
495            .ok_or(StorageError::CommandOutOfBounds(left))?;
496        let right_segment = self.get_segment(right)?;
497        let right_command = right_segment
498            .get_command(right)
499            .ok_or(StorageError::CommandOutOfBounds(right))?;
500
501        let parent = Prior::Merge(left_command.address()?, right_command.address()?);
502
503        if policy_id != left_segment.policy() && policy_id != right_segment.policy() {
504            return Err(StorageError::PolicyMismatch);
505        }
506
507        let prior = Prior::Merge(left, right);
508
509        let perspective = LinearPerspective::new(
510            prior,
511            parent,
512            policy_id,
513            FactPerspectivePrior::FactIndex {
514                offset: braid.repr.offset,
515                reader: braid.reader,
516            },
517            left_command
518                .max_cut()?
519                .max(right_command.max_cut()?)
520                .checked_add(1)
521                .assume("must not overflow")?,
522            Some(last_common_ancestor),
523        );
524
525        Ok(Some(perspective))
526    }
527
528    fn get_segment(&self, location: Location) -> Result<Self::Segment, StorageError> {
529        let reader = self.writer.readonly();
530        let repr = reader.fetch(location.segment)?;
531        let seg = LinearSegment { repr, reader };
532
533        Ok(seg)
534    }
535
536    fn get_head(&self) -> Result<Location, StorageError> {
537        self.writer.head()
538    }
539
540    fn commit(&mut self, segment: Self::Segment) -> Result<(), StorageError> {
541        if !self.is_ancestor(self.get_head()?, &segment)? {
542            return Err(StorageError::HeadNotAncestor);
543        }
544
545        self.writer.commit(segment.head_location())
546    }
547
548    fn write(&mut self, perspective: Self::Perspective) -> Result<Self::Segment, StorageError> {
549        // TODO(jdygert): Validate prior?
550
551        let facts = self.write_facts(perspective.facts)?.repr.offset;
552
553        let commands: Vec1<CommandData> = perspective
554            .commands
555            .try_into()
556            .map_err(|_| StorageError::EmptyPerspective)?;
557
558        let get_skips =
559            |l: Location, count: usize| -> Result<Vec<(Location, usize)>, StorageError> {
560                let mut rng = &mut Rng as &mut dyn Csprng;
561                let mut skips = vec![];
562                for _ in 0..count {
563                    let segment = self.get_segment(l)?;
564                    let l_max_cut = segment
565                        .get_command(l)
566                        .assume("location must exist")?
567                        .max_cut;
568                    if l_max_cut > 0 {
569                        let max_cut = rng.gen_range(0..l_max_cut);
570                        if let Some(skip) = self.get_skip(segment, max_cut)? {
571                            if !skips.contains(&skip) {
572                                skips.push(skip);
573                            }
574                        } else {
575                            break;
576                        }
577                    }
578                }
579                Ok(skips)
580            };
581
582        let skip_list = match perspective.prior {
583            Prior::None => vec![],
584            Prior::Merge(_, _) => {
585                let (lca, max_cut) = perspective.last_common_ancestor.assume("lca must exist")?;
586                let mut skips = get_skips(lca, 2)?;
587                if !skips.contains(&(lca, max_cut)) {
588                    skips.push((lca, max_cut));
589                }
590                skips.sort();
591                skips
592            }
593            Prior::Single(l) => {
594                let mut skips = get_skips(l, 3)?;
595                skips.sort();
596                skips
597            }
598        };
599        let repr = self.writer.append(|offset| SegmentRepr {
600            offset,
601            prior: perspective.prior,
602            parents: perspective.parents,
603            policy: perspective.policy,
604            facts,
605            commands,
606            max_cut: perspective.max_cut,
607            skip_list,
608        })?;
609
610        Ok(LinearSegment {
611            repr,
612            reader: self.writer.readonly(),
613        })
614    }
615
616    fn write_facts(
617        &mut self,
618        facts: Self::FactPerspective,
619    ) -> Result<Self::FactIndex, StorageError> {
620        let mut prior = match facts.prior {
621            FactPerspectivePrior::None => None,
622            FactPerspectivePrior::FactPerspective(prior) => {
623                let prior = self.write_facts(*prior)?;
624                if facts.map.is_empty() {
625                    return Ok(prior);
626                }
627                Some(prior.repr)
628            }
629            FactPerspectivePrior::FactIndex { offset, reader } => {
630                let repr = reader.fetch(offset)?;
631                if facts.map.is_empty() {
632                    return Ok(LinearFactIndex { repr, reader });
633                }
634                Some(repr)
635            }
636        };
637
638        let depth = if let Some(mut p) = prior.take() {
639            if p.depth > MAX_FACT_INDEX_DEPTH - 1 {
640                p = self.compact(p)?;
641            }
642            prior.insert(p).depth
643        } else {
644            0
645        };
646
647        let depth = depth.checked_add(1).assume("depth won't overflow")?;
648
649        if depth > MAX_FACT_INDEX_DEPTH {
650            bug!("fact index too deep");
651        }
652
653        let repr = self.writer.append(|offset| FactIndexRepr {
654            offset,
655            prior: prior.map(|p| p.offset),
656            depth,
657            facts: facts.map,
658        })?;
659
660        Ok(LinearFactIndex {
661            repr,
662            reader: self.writer.readonly(),
663        })
664    }
665}
666
667impl<R: Read> Segment for LinearSegment<R> {
668    type FactIndex = LinearFactIndex<R>;
669    type Command<'a>
670        = LinearCommand<'a>
671    where
672        R: 'a;
673
674    fn head(&self) -> Result<Self::Command<'_>, StorageError> {
675        let data = self.repr.commands.last();
676        let parent = if let Some(prev) = usize::checked_sub(self.repr.commands.len(), 2) {
677            Prior::Single(Address {
678                id: self.repr.commands[prev].id,
679                max_cut: self
680                    .repr
681                    .max_cut
682                    .checked_add(prev)
683                    .assume("must not overflow")?,
684            })
685        } else {
686            self.repr.parents
687        };
688        Ok(LinearCommand {
689            id: &data.id,
690            parent,
691            priority: data.priority.clone(),
692            policy: data.policy.as_deref(),
693            data: &data.data,
694            max_cut: self
695                .repr
696                .max_cut
697                .checked_add(self.repr.commands.len())
698                .assume("must not overflow")?
699                .checked_sub(1)
700                .assume("must not overflow")?,
701        })
702    }
703
704    fn first(&self) -> Self::Command<'_> {
705        let data = self.repr.commands.first();
706        LinearCommand {
707            id: &data.id,
708            parent: self.repr.parents,
709            priority: data.priority.clone(),
710            policy: data.policy.as_deref(),
711            data: &data.data,
712            max_cut: self.repr.max_cut,
713        }
714    }
715
716    fn head_location(&self) -> Location {
717        // vec1 length >= 1
718        #[allow(clippy::arithmetic_side_effects)]
719        Location::new(self.repr.offset, self.repr.commands.len() - 1)
720    }
721
722    fn first_location(&self) -> Location {
723        Location::new(self.repr.offset, 0)
724    }
725
726    fn contains(&self, location: Location) -> bool {
727        location.segment == self.repr.offset && location.command < self.repr.commands.len()
728    }
729
730    fn policy(&self) -> PolicyId {
731        self.repr.policy
732    }
733
734    fn prior(&self) -> Prior<Location> {
735        self.repr.prior
736    }
737
738    fn get_command(&self, location: Location) -> Option<Self::Command<'_>> {
739        if self.repr.offset != location.segment {
740            return None;
741        }
742        let data = self.repr.commands.get(location.command)?;
743        let parent = if let Some(prev) = usize::checked_sub(location.command, 1) {
744            if let Some(max_cut) = self.repr.max_cut.checked_add(prev) {
745                Prior::Single(Address {
746                    id: self.repr.commands[prev].id,
747                    max_cut,
748                })
749            } else {
750                return None;
751            }
752        } else {
753            self.repr.parents
754        };
755        self.repr
756            .max_cut
757            .checked_add(location.command)
758            .map(|max_cut| LinearCommand {
759                id: &data.id,
760                parent,
761                priority: data.priority.clone(),
762                policy: data.policy.as_deref(),
763                data: &data.data,
764                max_cut,
765            })
766    }
767
768    fn get_from(&self, location: Location) -> Vec<Self::Command<'_>> {
769        if self.repr.offset != location.segment {
770            // TODO(jdygert): Result?
771            return Vec::new();
772        }
773
774        // TODO(jdygert): Optimize?
775        (location.command..self.repr.commands.len())
776            .map(|c| Location::new(location.segment, c))
777            .map(|loc| {
778                self.get_command(loc)
779                    .expect("constructed location is valid")
780            })
781            .collect()
782    }
783
784    fn get_from_max_cut(&self, max_cut: usize) -> Result<Option<Location>, StorageError> {
785        if max_cut >= self.repr.max_cut
786            && max_cut
787                <= self
788                    .repr
789                    .max_cut
790                    .checked_add(self.repr.commands.len())
791                    .assume("must not overflow")?
792        {
793            return Ok(Some(Location::new(
794                self.repr.offset,
795                max_cut
796                    .checked_sub(self.repr.max_cut)
797                    .assume("must not overflow")?,
798            )));
799        }
800        Ok(None)
801    }
802
803    fn facts(&self) -> Result<Self::FactIndex, StorageError> {
804        Ok(LinearFactIndex {
805            repr: self.reader.fetch(self.repr.facts)?,
806            reader: self.reader.clone(),
807        })
808    }
809
810    fn skip_list(&self) -> &[(Location, usize)] {
811        &self.repr.skip_list
812    }
813
814    fn shortest_max_cut(&self) -> usize {
815        self.repr.max_cut
816    }
817
818    fn longest_max_cut(&self) -> Result<usize, StorageError> {
819        Ok(self
820            .repr
821            .max_cut
822            .checked_add(self.repr.commands.len())
823            .assume("must not overflow")?
824            .checked_sub(1)
825            .assume("must not overflow")?)
826    }
827}
828
829impl<R: Read> FactIndex for LinearFactIndex<R> {}
830
831type MapIter = alloc::collections::btree_map::IntoIter<Keys, Option<Bytes>>;
832pub struct QueryIterator {
833    it: MapIter,
834}
835
836impl QueryIterator {
837    fn new(it: MapIter) -> Self {
838        Self { it }
839    }
840}
841
842impl Iterator for QueryIterator {
843    type Item = Result<Fact, StorageError>;
844    fn next(&mut self) -> Option<Self::Item> {
845        loop {
846            // filter out tombstones
847            if let (key, Some(value)) = self.it.next()? {
848                return Some(Ok(Fact { key, value }));
849            }
850        }
851    }
852}
853
854impl<R: Read> Query for LinearFactIndex<R> {
855    fn query(&self, name: &str, keys: &[Box<[u8]>]) -> Result<Option<Box<[u8]>>, StorageError> {
856        let mut prior = Some(&self.repr);
857        let mut slot; // Need to store deserialized value.
858        while let Some(facts) = prior {
859            if let Some(v) = facts.facts.get(name).and_then(|m| m.get(keys)) {
860                return Ok(v.as_ref().cloned());
861            }
862            slot = facts.prior.map(|p| self.reader.fetch(p)).transpose()?;
863            prior = slot.as_ref();
864        }
865        Ok(None)
866    }
867
868    type QueryIterator = QueryIterator;
869    fn query_prefix(
870        &self,
871        name: &str,
872        prefix: &[Box<[u8]>],
873    ) -> Result<QueryIterator, StorageError> {
874        Ok(QueryIterator::new(
875            self.query_prefix_inner(name, prefix)?.into_iter(),
876        ))
877    }
878}
879
880impl<R: Read> LinearFactIndex<R> {
881    fn query_prefix_inner(
882        &self,
883        name: &str,
884        prefix: &[Box<[u8]>],
885    ) -> Result<FactMap, StorageError> {
886        let mut matches = BTreeMap::new();
887        let mut prior = Some(&self.repr);
888        let mut slot; // Need to store deserialized value.
889        while let Some(facts) = prior {
890            if let Some(map) = facts.facts.get(name) {
891                for (k, v) in super::memory::find_prefixes(map, prefix) {
892                    // don't override, if we've already found the fact (including deletions)
893                    if !matches.contains_key(k) {
894                        matches.insert(k.clone(), v.map(Into::into));
895                    }
896                }
897            }
898            slot = facts.prior.map(|p| self.reader.fetch(p)).transpose()?;
899            prior = slot.as_ref();
900        }
901        Ok(matches)
902    }
903}
904
905impl<R> LinearFactPerspective<R> {
906    fn clear(&mut self) {
907        self.map.clear();
908    }
909
910    fn apply_updates(&mut self, updates: &[Update]) {
911        for (name, key, value) in updates {
912            if self.prior.is_none() {
913                if let Some(value) = value {
914                    self.map
915                        .entry(name.clone())
916                        .or_default()
917                        .insert(key.clone(), Some(value.clone()));
918                } else if let Some(e) = self.map.get_mut(name) {
919                    e.remove(key);
920                }
921            } else {
922                self.map
923                    .entry(name.clone())
924                    .or_default()
925                    .insert(key.clone(), value.clone());
926            }
927        }
928    }
929}
930
931impl<R: Read> FactPerspective for LinearFactPerspective<R> {}
932
933impl<R: Read> Query for LinearFactPerspective<R> {
934    fn query(&self, name: &str, keys: &[Box<[u8]>]) -> Result<Option<Box<[u8]>>, StorageError> {
935        if let Some(wrapped) = self.map.get(name).and_then(|m| m.get(keys)) {
936            return Ok(wrapped.as_deref().map(Box::from));
937        }
938        match &self.prior {
939            FactPerspectivePrior::None => Ok(None),
940            FactPerspectivePrior::FactPerspective(prior) => prior.query(name, keys),
941            FactPerspectivePrior::FactIndex { offset, reader } => {
942                let repr: FactIndexRepr = reader.fetch(*offset)?;
943                let prior = LinearFactIndex {
944                    repr,
945                    reader: reader.clone(),
946                };
947                prior.query(name, keys)
948            }
949        }
950    }
951
952    type QueryIterator = QueryIterator;
953    fn query_prefix(
954        &self,
955        name: &str,
956        prefix: &[Box<[u8]>],
957    ) -> Result<QueryIterator, StorageError> {
958        Ok(QueryIterator::new(
959            self.query_prefix_inner(name, prefix)?.into_iter(),
960        ))
961    }
962}
963
964impl<R: Read> LinearFactPerspective<R> {
965    fn query_prefix_inner(
966        &self,
967        name: &str,
968        prefix: &[Box<[u8]>],
969    ) -> Result<FactMap, StorageError> {
970        let mut matches = match &self.prior {
971            FactPerspectivePrior::None => BTreeMap::new(),
972            FactPerspectivePrior::FactPerspective(prior) => {
973                prior.query_prefix_inner(name, prefix)?
974            }
975            FactPerspectivePrior::FactIndex { offset, reader } => {
976                let repr: FactIndexRepr = reader.fetch(*offset)?;
977                let prior = LinearFactIndex {
978                    repr,
979                    reader: reader.clone(),
980                };
981                prior.query_prefix_inner(name, prefix)?
982            }
983        };
984        if let Some(map) = self.map.get(name) {
985            for (k, v) in super::memory::find_prefixes(map, prefix) {
986                // overwrite "earlier" facts
987                matches.insert(k.clone(), v.map(Into::into));
988            }
989        }
990        Ok(matches)
991    }
992}
993
994impl<R: Read> QueryMut for LinearFactPerspective<R> {
995    fn insert(&mut self, name: String, keys: Keys, value: Bytes) {
996        self.map.entry(name).or_default().insert(keys, Some(value));
997    }
998
999    fn delete(&mut self, name: String, keys: Keys) {
1000        if self.prior.is_none() {
1001            // No need for tombstones with no prior.
1002            if let Some(kv) = self.map.get_mut(&name) {
1003                kv.remove(&keys);
1004            }
1005        } else {
1006            self.map.entry(name).or_default().insert(keys, None);
1007        }
1008    }
1009}
1010
1011impl<R: Read> FactPerspective for LinearPerspective<R> {}
1012
1013impl<R: Read> Query for LinearPerspective<R> {
1014    fn query(&self, name: &str, keys: &[Box<[u8]>]) -> Result<Option<Box<[u8]>>, StorageError> {
1015        self.facts.query(name, keys)
1016    }
1017
1018    type QueryIterator = QueryIterator;
1019    fn query_prefix(
1020        &self,
1021        name: &str,
1022        prefix: &[Box<[u8]>],
1023    ) -> Result<QueryIterator, StorageError> {
1024        self.facts.query_prefix(name, prefix)
1025    }
1026}
1027
1028impl<R: Read> QueryMut for LinearPerspective<R> {
1029    fn insert(&mut self, name: String, keys: Keys, value: Bytes) {
1030        self.facts.insert(name.clone(), keys.clone(), value.clone());
1031        self.current_updates.push((name, keys, Some(value)));
1032    }
1033
1034    fn delete(&mut self, name: String, keys: Keys) {
1035        self.facts.delete(name.clone(), keys.clone());
1036        self.current_updates.push((name, keys, None))
1037    }
1038}
1039
1040impl<R: Read> Revertable for LinearPerspective<R> {
1041    fn checkpoint(&self) -> Checkpoint {
1042        Checkpoint {
1043            index: self.commands.len(),
1044        }
1045    }
1046
1047    fn revert(&mut self, checkpoint: Checkpoint) -> Result<(), Bug> {
1048        if checkpoint.index == self.commands.len() {
1049            return Ok(());
1050        }
1051
1052        if checkpoint.index > self.commands.len() {
1053            bug!("A checkpoint's index should always be less than or equal to the length of a perspective's command history!");
1054        }
1055
1056        self.commands.truncate(checkpoint.index);
1057        self.facts.clear();
1058        self.current_updates.clear();
1059        for data in &self.commands {
1060            self.facts.apply_updates(&data.updates);
1061        }
1062
1063        Ok(())
1064    }
1065}
1066
1067impl<R: Read> Perspective for LinearPerspective<R> {
1068    fn policy(&self) -> PolicyId {
1069        self.policy
1070    }
1071
1072    fn add_command(&mut self, command: &impl Command) -> Result<usize, StorageError> {
1073        if command.parent() != self.head_address()? {
1074            return Err(StorageError::PerspectiveHeadMismatch);
1075        }
1076
1077        self.commands.push(CommandData {
1078            id: command.id(),
1079            priority: command.priority(),
1080            policy: command.policy().map(Box::from),
1081            data: command.bytes().into(),
1082            updates: core::mem::take(&mut self.current_updates),
1083        });
1084        Ok(self.commands.len())
1085    }
1086
1087    fn includes(&self, id: CommandId) -> bool {
1088        self.commands.iter().any(|cmd| cmd.id == id)
1089    }
1090
1091    fn head_address(&self) -> Result<Prior<Address>, Bug> {
1092        Ok(if let Some(last) = self.commands.last() {
1093            Prior::Single(Address {
1094                id: last.id,
1095                max_cut: self
1096                    .max_cut
1097                    .checked_add(self.commands.len())
1098                    .assume("must not overflow")?
1099                    .checked_sub(1)
1100                    .assume("must not overflow")?,
1101            })
1102        } else {
1103            self.parents
1104        })
1105    }
1106}
1107
1108impl From<Prior<Address>> for Prior<CommandId> {
1109    fn from(p: Prior<Address>) -> Self {
1110        match p {
1111            Prior::None => Prior::None,
1112            Prior::Single(l) => Prior::Single(l.id),
1113            Prior::Merge(l, r) => Prior::Merge(l.id, r.id),
1114        }
1115    }
1116}
1117
1118impl Command for LinearCommand<'_> {
1119    fn priority(&self) -> Priority {
1120        self.priority.clone()
1121    }
1122
1123    fn id(&self) -> CommandId {
1124        *self.id
1125    }
1126
1127    fn parent(&self) -> Prior<Address> {
1128        self.parent
1129    }
1130
1131    fn policy(&self) -> Option<&[u8]> {
1132        self.policy
1133    }
1134
1135    fn bytes(&self) -> &[u8] {
1136        self.data
1137    }
1138
1139    fn max_cut(&self) -> Result<usize, Bug> {
1140        Ok(self.max_cut)
1141    }
1142}
1143
#[cfg(test)]
mod test {
    use testing::Manager;

    use super::*;
    use crate::testing::dsl::{test_suite, StorageBackend};

    /// Inserts a handful of multi-part keys into a fresh perspective and
    /// checks that `query_prefix` returns, in sorted order, exactly the keys
    /// that start with each probe prefix.
    #[test]
    fn test_query_prefix() {
        let mut provider = LinearStorageProvider::new(Manager);
        let mut fp = provider.new_perspective(PolicyId::new(0));

        let name = "x";

        let raw_keys: &[&[&str]] = &[
            &["aa", "xy", "123"],
            &["aa", "xz", "123"],
            &["bb", "ccc"],
            &["bc", ""],
        ];
        let keys: Vec<Keys> = raw_keys
            .iter()
            .map(|parts| parts.iter().map(|part| part.as_bytes()).collect())
            .collect();

        // Each fact's value is the debug rendering of its own key.
        for ks in &keys {
            let value = format!("{ks:?}").into_bytes().into();
            fp.insert(name.into(), ks.clone(), value);
        }

        let probes: &[&[&str]] = &[
            &["aa", "xy", "12"],
            &["aa", "xy"],
            &["aa", "xz"],
            &["aa", "x"],
            &["bb", ""],
            &["bb", "ccc"],
            &["bc", ""],
            &["bc", "", ""],
        ];

        for probe in probes {
            let prefix: Keys = probe.iter().map(|part| part.as_bytes()).collect();

            let found: Vec<_> = fp.query_prefix(name, &prefix).unwrap().collect();

            let mut expected: Vec<_> = keys
                .iter()
                .filter(|candidate| candidate.starts_with(&prefix))
                .collect();
            expected.sort();

            assert_eq!(found.len(), expected.len());
            for (item, b) in found.into_iter().zip(expected) {
                let fact = item.unwrap();
                assert_eq!(&fact.key, b);
                assert_eq!(fact.value.as_ref(), format!("{b:?}").as_bytes());
            }
        }
    }

    /// Backend that runs the shared storage test suite against
    /// `LinearStorageProvider` using the testing `Manager`.
    struct LinearBackend;

    impl StorageBackend for LinearBackend {
        type StorageProvider = LinearStorageProvider<Manager>;

        fn provider(&mut self, _client_id: u64) -> Self::StorageProvider {
            LinearStorageProvider::new(Manager)
        }
    }

    test_suite!(|| LinearBackend);
}