aranya_runtime/storage/linear/
mod.rs

//! Persistent linear storage implementation.
//!
//! `LinearStorage` is a graph storage implementation backed by a file-like byte
//! storage interface. It is designed to be usable across many environments
//! with minimal assumptions about the underlying storage.
//!
//! # Layout
//!
//! Each `[x]` is page aligned.
//!
//! ```text
//! // Control section
//! [Base] [Root] [Root]
//! // Data section (grows by appending)
//! [Segment or FactIndex]
//! |
//! V
//! ```
//!
//! The `LinearStorage` is the only component that modifies the control section.
//! The data section is append-only but can be read concurrently. Data that is
//! written but never committed may be overwritten, and it becomes unreachable
//! through normal means.
24
25pub mod libc;
26
27#[cfg(feature = "testing")]
28pub mod testing;
29
30use alloc::{boxed::Box, collections::BTreeMap, string::String, vec, vec::Vec};
31
32use aranya_crypto::{dangerous::spideroak_crypto::csprng::rand::Rng as _, Csprng, Rng};
33use buggy::{bug, Bug, BugExt};
34use serde::{Deserialize, Serialize};
35use vec1::Vec1;
36
37use crate::{
38    Address, Checkpoint, Command, CommandId, Fact, FactIndex, FactPerspective, GraphId, Keys,
39    Location, Perspective, PolicyId, Prior, Priority, Query, QueryMut, Revertable, Segment,
40    Storage, StorageError, StorageProvider,
41};
42
43pub mod io;
44pub use io::*;
45
/// Maximum depth of fact indices before compaction.
///
/// A lower value will speed up search queries but require more compaction,
/// slowing down fact index creation and using more storage space.
///
/// In the future, this may be configurable at runtime or dynamic based on
/// heuristics such as fact density.
///
/// 16 is our initial guess for balance.
///
/// This must be at least 2: once a prior index has reached this depth it is
/// compacted into a single index of depth 1. The new index written on top of it
/// (depth 2) must still fit within the limit.
const MAX_FACT_INDEX_DEPTH: usize = 16;

// Enforce the documented lower bound at compile time instead of relying on the comment.
const _: () = assert!(
    MAX_FACT_INDEX_DEPTH >= 2,
    "MAX_FACT_INDEX_DEPTH must be at least 2"
);
58
59pub struct LinearStorageProvider<FM: IoManager> {
60    manager: FM,
61    storage: BTreeMap<GraphId, LinearStorage<FM::Writer>>,
62}
63
64pub struct LinearStorage<W> {
65    writer: W,
66}
67
68#[derive(Debug)]
69pub struct LinearSegment<R> {
70    repr: SegmentRepr,
71    reader: R,
72}
73
74#[derive(Debug, Serialize, Deserialize)]
75struct SegmentRepr {
76    /// Self offset in file.
77    offset: usize,
78    prior: Prior<Location>,
79    parents: Prior<Address>,
80    policy: PolicyId,
81    /// Offset in file to associated fact index.
82    facts: usize,
83    commands: Vec1<CommandData>,
84    max_cut: usize,
85    skip_list: Vec<(Location, usize)>,
86}
87
88#[derive(Debug, Serialize, Deserialize)]
89struct CommandData {
90    id: CommandId,
91    priority: Priority,
92    policy: Option<Bytes>,
93    data: Bytes,
94    updates: Vec<Update>,
95}
96
97pub struct LinearCommand<'a> {
98    id: &'a CommandId,
99    parent: Prior<Address>,
100    priority: Priority,
101    policy: Option<&'a [u8]>,
102    data: &'a [u8],
103    max_cut: usize,
104}
105
106type Bytes = Box<[u8]>;
107
108type Update = (String, Keys, Option<Bytes>);
109type FactMap = BTreeMap<Keys, Option<Box<[u8]>>>;
110type NamedFactMap = BTreeMap<String, FactMap>;
111
112#[derive(Debug)]
113pub struct LinearFactIndex<R> {
114    repr: FactIndexRepr,
115    reader: R,
116}
117
118#[derive(Debug, Serialize, Deserialize)]
119struct FactIndexRepr {
120    /// Self offset in file.
121    offset: usize,
122    /// Offset of prior fact index.
123    prior: Option<usize>,
124    /// Depth of this fact index.
125    ///
126    /// `prior.depth + 1`, or just `1` if no prior
127    depth: usize,
128    /// Facts in sorted order
129    facts: NamedFactMap,
130}
131
132#[derive(Debug)]
133pub struct LinearPerspective<R> {
134    prior: Prior<Location>,
135    parents: Prior<Address>,
136    policy: PolicyId,
137    facts: LinearFactPerspective<R>,
138    commands: Vec<CommandData>,
139    current_updates: Vec<Update>,
140    max_cut: usize,
141    last_common_ancestor: Option<(Location, usize)>,
142}
143
144impl<R> LinearPerspective<R> {
145    fn new(
146        prior: Prior<Location>,
147        parents: Prior<Address>,
148        policy: PolicyId,
149        prior_facts: FactPerspectivePrior<R>,
150        max_cut: usize,
151        last_common_ancestor: Option<(Location, usize)>,
152    ) -> Self {
153        Self {
154            prior,
155            parents,
156            policy,
157            facts: LinearFactPerspective::new(prior_facts),
158            commands: Vec::new(),
159            current_updates: Vec::new(),
160            max_cut,
161            last_common_ancestor,
162        }
163    }
164}
165
166#[derive(Debug)]
167pub struct LinearFactPerspective<R> {
168    map: BTreeMap<String, BTreeMap<Keys, Option<Bytes>>>,
169    prior: FactPerspectivePrior<R>,
170}
171
172impl<R> LinearFactPerspective<R> {
173    fn new(prior: FactPerspectivePrior<R>) -> Self {
174        Self {
175            map: BTreeMap::new(),
176            prior,
177        }
178    }
179}
180
181#[derive(Debug)]
182enum FactPerspectivePrior<R> {
183    None,
184    FactPerspective(Box<LinearFactPerspective<R>>),
185    FactIndex { offset: usize, reader: R },
186}
187
188impl<R> FactPerspectivePrior<R> {
189    fn is_none(&self) -> bool {
190        matches!(self, Self::None)
191    }
192}
193
194impl<FM: IoManager + Default> Default for LinearStorageProvider<FM> {
195    fn default() -> Self {
196        Self {
197            manager: FM::default(),
198            storage: BTreeMap::new(),
199        }
200    }
201}
202
203impl<FM: IoManager> LinearStorageProvider<FM> {
204    pub fn new(manager: FM) -> Self {
205        Self {
206            manager,
207            storage: BTreeMap::new(),
208        }
209    }
210}
211
212impl<FM: IoManager> StorageProvider for LinearStorageProvider<FM> {
213    type Perspective = LinearPerspective<<FM::Writer as Write>::ReadOnly>;
214    type Segment = LinearSegment<<FM::Writer as Write>::ReadOnly>;
215    type Storage = LinearStorage<FM::Writer>;
216
217    fn new_perspective(&mut self, policy_id: PolicyId) -> Self::Perspective {
218        LinearPerspective::new(
219            Prior::None,
220            Prior::None,
221            policy_id,
222            FactPerspectivePrior::None,
223            0,
224            None,
225        )
226    }
227
228    fn new_storage(
229        &mut self,
230        init: Self::Perspective,
231    ) -> Result<(GraphId, &mut Self::Storage), StorageError> {
232        use alloc::collections::btree_map::Entry;
233
234        if init.commands.is_empty() {
235            return Err(StorageError::EmptyPerspective);
236        }
237        let graph_id = GraphId::from(init.commands[0].id.into_id());
238        let Entry::Vacant(entry) = self.storage.entry(graph_id) else {
239            return Err(StorageError::StorageExists);
240        };
241
242        let file = self.manager.create(graph_id)?;
243        Ok((graph_id, entry.insert(LinearStorage::create(file, init)?)))
244    }
245
246    fn get_storage(&mut self, graph: GraphId) -> Result<&mut Self::Storage, StorageError> {
247        use alloc::collections::btree_map::Entry;
248
249        let entry = match self.storage.entry(graph) {
250            Entry::Vacant(v) => v,
251            Entry::Occupied(o) => return Ok(o.into_mut()),
252        };
253
254        let file = self
255            .manager
256            .open(graph)?
257            .ok_or(StorageError::NoSuchStorage)?;
258        Ok(entry.insert(LinearStorage::open(file)?))
259    }
260
261    fn remove_storage(&mut self, graph: GraphId) -> Result<(), StorageError> {
262        self.manager.remove(graph)?;
263
264        self.storage
265            .remove(&graph)
266            .ok_or(StorageError::NoSuchStorage)?;
267
268        Ok(())
269    }
270
271    fn list_graph_ids(
272        &mut self,
273    ) -> Result<impl Iterator<Item = Result<GraphId, StorageError>>, StorageError> {
274        self.manager.list()
275    }
276}
277
278impl<W: Write> LinearStorage<W> {
279    fn get_skip(
280        &self,
281        segment: <LinearStorage<W> as Storage>::Segment,
282        max_cut: usize,
283    ) -> Result<Option<(Location, usize)>, StorageError> {
284        let mut head = segment;
285        let mut current = None;
286        'outer: loop {
287            if max_cut > head.longest_max_cut()? {
288                return Ok(current);
289            }
290            current = Some((head.first_location(), head.shortest_max_cut()));
291            if max_cut >= head.shortest_max_cut() {
292                return Ok(current);
293            }
294            // Assumes skip list is sorted in ascending order.
295            // We always want to skip as close to the root as possible.
296            for (skip, skip_max_cut) in head.skip_list() {
297                if skip_max_cut <= &max_cut {
298                    head = self.get_segment(*skip)?;
299                    continue 'outer;
300                }
301            }
302            head = match head.prior() {
303                Prior::None | Prior::Merge(_, _) => {
304                    return Ok(current);
305                }
306                Prior::Single(l) => self.get_segment(l)?,
307            }
308        }
309    }
310}
311
312impl<W: Write> LinearStorage<W> {
313    fn create(mut writer: W, init: LinearPerspective<W::ReadOnly>) -> Result<Self, StorageError> {
314        assert!(matches!(init.prior, Prior::None));
315        assert!(matches!(init.parents, Prior::None));
316        assert!(matches!(init.facts.prior, FactPerspectivePrior::None));
317
318        let mut map = init.facts.map;
319        map.retain(|_, kv| !kv.is_empty());
320
321        let facts = writer
322            .append(|offset| FactIndexRepr {
323                offset,
324                prior: None,
325                depth: 1,
326                facts: map,
327            })?
328            .offset;
329
330        let commands = init
331            .commands
332            .try_into()
333            .map_err(|_| StorageError::EmptyPerspective)?;
334        let segment = writer.append(|offset| SegmentRepr {
335            offset,
336            prior: Prior::None,
337            parents: Prior::None,
338            policy: init.policy,
339            facts,
340            commands,
341            max_cut: 0,
342            skip_list: vec![],
343        })?;
344
345        let head = Location::new(
346            segment.offset,
347            segment
348                .commands
349                .len()
350                .checked_sub(1)
351                .assume("vec1 length >= 1")?,
352        );
353
354        writer.commit(head)?;
355
356        let storage = Self { writer };
357
358        Ok(storage)
359    }
360
361    fn open(writer: W) -> Result<Self, StorageError> {
362        Ok(Self { writer })
363    }
364
365    fn compact(&mut self, mut repr: FactIndexRepr) -> Result<FactIndexRepr, StorageError> {
366        let mut map = NamedFactMap::new();
367        let reader = self.writer.readonly();
368        loop {
369            for (name, kv) in repr.facts {
370                let sub = map.entry(name).or_default();
371                for (k, v) in kv {
372                    sub.entry(k).or_insert(v);
373                }
374            }
375            let Some(offset) = repr.prior else { break };
376            repr = reader.fetch(offset)?;
377        }
378
379        // Since there's no prior, we can remove tombstones
380        map.retain(|_, kv| {
381            kv.retain(|_, v| v.is_some());
382            !kv.is_empty()
383        });
384
385        Ok(self
386            .write_facts(LinearFactPerspective {
387                map,
388                prior: FactPerspectivePrior::None,
389            })?
390            .repr)
391    }
392}
393
394impl<F: Write> Storage for LinearStorage<F> {
395    type Perspective = LinearPerspective<F::ReadOnly>;
396    type FactPerspective = LinearFactPerspective<F::ReadOnly>;
397    type Segment = LinearSegment<F::ReadOnly>;
398    type FactIndex = LinearFactIndex<F::ReadOnly>;
399
400    fn get_command_id(&self, location: Location) -> Result<CommandId, StorageError> {
401        let seg = self.get_segment(location)?;
402        let cmd = seg
403            .get_command(location)
404            .ok_or(StorageError::CommandOutOfBounds(location))?;
405        Ok(cmd.id())
406    }
407
408    fn get_linear_perspective(
409        &self,
410        parent: Location,
411    ) -> Result<Option<Self::Perspective>, StorageError> {
412        let segment = self.get_segment(parent)?;
413        let command = segment
414            .get_command(parent)
415            .ok_or(StorageError::CommandOutOfBounds(parent))?;
416        let policy = segment.repr.policy;
417        let prior_facts: FactPerspectivePrior<F::ReadOnly> = if parent == segment.head_location() {
418            FactPerspectivePrior::FactIndex {
419                offset: segment.repr.facts,
420                reader: self.writer.readonly(),
421            }
422        } else {
423            let prior = match segment.facts()?.repr.prior {
424                Some(offset) => FactPerspectivePrior::FactIndex {
425                    offset,
426                    reader: self.writer.readonly(),
427                },
428                None => FactPerspectivePrior::None,
429            };
430            let mut facts = LinearFactPerspective::new(prior);
431            for data in &segment.repr.commands[..=parent.command] {
432                facts.apply_updates(&data.updates);
433            }
434            if facts.prior.is_none() {
435                facts.map.retain(|_, kv| !kv.is_empty());
436            }
437            if facts.map.is_empty() {
438                facts.prior
439            } else {
440                FactPerspectivePrior::FactPerspective(Box::new(facts))
441            }
442        };
443        let prior = Prior::Single(parent);
444
445        let perspective = LinearPerspective::new(
446            prior,
447            Prior::Single(command.address()?),
448            policy,
449            prior_facts,
450            command
451                .max_cut()?
452                .checked_add(1)
453                .assume("must not overflow")?,
454            None,
455        );
456
457        Ok(Some(perspective))
458    }
459
460    fn get_fact_perspective(
461        &self,
462        location: Location,
463    ) -> Result<Self::FactPerspective, StorageError> {
464        let segment = self.get_segment(location)?;
465
466        // If at head of segment, or no facts in segment,
467        // we don't need to apply updates.
468        if location == segment.head_location()
469            || segment
470                .repr
471                .commands
472                .iter()
473                .all(|cmd| cmd.updates.is_empty())
474        {
475            return Ok(LinearFactPerspective::new(
476                FactPerspectivePrior::FactIndex {
477                    offset: segment.repr.facts,
478                    reader: self.writer.readonly(),
479                },
480            ));
481        }
482
483        let prior = match segment.facts()?.repr.prior {
484            Some(offset) => FactPerspectivePrior::FactIndex {
485                offset,
486                reader: self.writer.readonly(),
487            },
488            None => FactPerspectivePrior::None,
489        };
490        let mut facts = LinearFactPerspective::new(prior);
491        for data in &segment.repr.commands[..=location.command] {
492            facts.apply_updates(&data.updates);
493        }
494
495        Ok(facts)
496    }
497
498    fn new_merge_perspective(
499        &self,
500        left: Location,
501        right: Location,
502        last_common_ancestor: (Location, usize),
503        policy_id: PolicyId,
504        braid: Self::FactIndex,
505    ) -> Result<Option<Self::Perspective>, StorageError> {
506        // TODO(jdygert): ensure braid belongs to this storage.
507        // TODO(jdygert): ensure braid ends at given command?
508        let left_segment = self.get_segment(left)?;
509        let left_command = left_segment
510            .get_command(left)
511            .ok_or(StorageError::CommandOutOfBounds(left))?;
512        let right_segment = self.get_segment(right)?;
513        let right_command = right_segment
514            .get_command(right)
515            .ok_or(StorageError::CommandOutOfBounds(right))?;
516
517        let parent = Prior::Merge(left_command.address()?, right_command.address()?);
518
519        if policy_id != left_segment.policy() && policy_id != right_segment.policy() {
520            return Err(StorageError::PolicyMismatch);
521        }
522
523        let prior = Prior::Merge(left, right);
524
525        let perspective = LinearPerspective::new(
526            prior,
527            parent,
528            policy_id,
529            FactPerspectivePrior::FactIndex {
530                offset: braid.repr.offset,
531                reader: braid.reader,
532            },
533            left_command
534                .max_cut()?
535                .max(right_command.max_cut()?)
536                .checked_add(1)
537                .assume("must not overflow")?,
538            Some(last_common_ancestor),
539        );
540
541        Ok(Some(perspective))
542    }
543
544    fn get_segment(&self, location: Location) -> Result<Self::Segment, StorageError> {
545        let reader = self.writer.readonly();
546        let repr = reader.fetch(location.segment)?;
547        let seg = LinearSegment { repr, reader };
548
549        Ok(seg)
550    }
551
552    fn get_head(&self) -> Result<Location, StorageError> {
553        self.writer.head()
554    }
555
556    fn commit(&mut self, segment: Self::Segment) -> Result<(), StorageError> {
557        if !self.is_ancestor(self.get_head()?, &segment)? {
558            return Err(StorageError::HeadNotAncestor);
559        }
560
561        self.writer.commit(segment.head_location())
562    }
563
564    fn write(&mut self, perspective: Self::Perspective) -> Result<Self::Segment, StorageError> {
565        // TODO(jdygert): Validate prior?
566
567        let facts = self.write_facts(perspective.facts)?.repr.offset;
568
569        let commands: Vec1<CommandData> = perspective
570            .commands
571            .try_into()
572            .map_err(|_| StorageError::EmptyPerspective)?;
573
574        let get_skips =
575            |l: Location, count: usize| -> Result<Vec<(Location, usize)>, StorageError> {
576                let mut rng = &mut Rng as &mut dyn Csprng;
577                let mut skips = vec![];
578                for _ in 0..count {
579                    let segment = self.get_segment(l)?;
580                    let l_max_cut = segment
581                        .get_command(l)
582                        .assume("location must exist")?
583                        .max_cut;
584                    if l_max_cut > 0 {
585                        let max_cut = rng.gen_range(0..l_max_cut);
586                        if let Some(skip) = self.get_skip(segment, max_cut)? {
587                            if !skips.contains(&skip) {
588                                skips.push(skip);
589                            }
590                        } else {
591                            break;
592                        }
593                    }
594                }
595                Ok(skips)
596            };
597
598        let skip_list = match perspective.prior {
599            Prior::None => vec![],
600            Prior::Merge(_, _) => {
601                let (lca, max_cut) = perspective.last_common_ancestor.assume("lca must exist")?;
602                let mut skips = get_skips(lca, 2)?;
603                if !skips.contains(&(lca, max_cut)) {
604                    skips.push((lca, max_cut));
605                }
606                skips.sort();
607                skips
608            }
609            Prior::Single(l) => {
610                let mut skips = get_skips(l, 3)?;
611                skips.sort();
612                skips
613            }
614        };
615        let repr = self.writer.append(|offset| SegmentRepr {
616            offset,
617            prior: perspective.prior,
618            parents: perspective.parents,
619            policy: perspective.policy,
620            facts,
621            commands,
622            max_cut: perspective.max_cut,
623            skip_list,
624        })?;
625
626        Ok(LinearSegment {
627            repr,
628            reader: self.writer.readonly(),
629        })
630    }
631
632    fn write_facts(
633        &mut self,
634        facts: Self::FactPerspective,
635    ) -> Result<Self::FactIndex, StorageError> {
636        let mut prior = match facts.prior {
637            FactPerspectivePrior::None => None,
638            FactPerspectivePrior::FactPerspective(prior) => {
639                let prior = self.write_facts(*prior)?;
640                if facts.map.is_empty() {
641                    return Ok(prior);
642                }
643                Some(prior.repr)
644            }
645            FactPerspectivePrior::FactIndex { offset, reader } => {
646                let repr = reader.fetch(offset)?;
647                if facts.map.is_empty() {
648                    return Ok(LinearFactIndex { repr, reader });
649                }
650                Some(repr)
651            }
652        };
653
654        let depth = if let Some(mut p) = prior.take() {
655            if p.depth > MAX_FACT_INDEX_DEPTH - 1 {
656                p = self.compact(p)?;
657            }
658            prior.insert(p).depth
659        } else {
660            0
661        };
662
663        let depth = depth.checked_add(1).assume("depth won't overflow")?;
664
665        if depth > MAX_FACT_INDEX_DEPTH {
666            bug!("fact index too deep");
667        }
668
669        let repr = self.writer.append(|offset| FactIndexRepr {
670            offset,
671            prior: prior.map(|p| p.offset),
672            depth,
673            facts: facts.map,
674        })?;
675
676        Ok(LinearFactIndex {
677            repr,
678            reader: self.writer.readonly(),
679        })
680    }
681}
682
683impl<R: Read> Segment for LinearSegment<R> {
684    type FactIndex = LinearFactIndex<R>;
685    type Command<'a>
686        = LinearCommand<'a>
687    where
688        R: 'a;
689
690    fn head(&self) -> Result<Self::Command<'_>, StorageError> {
691        let data = self.repr.commands.last();
692        let parent = if let Some(prev) = usize::checked_sub(self.repr.commands.len(), 2) {
693            Prior::Single(Address {
694                id: self.repr.commands[prev].id,
695                max_cut: self
696                    .repr
697                    .max_cut
698                    .checked_add(prev)
699                    .assume("must not overflow")?,
700            })
701        } else {
702            self.repr.parents
703        };
704        Ok(LinearCommand {
705            id: &data.id,
706            parent,
707            priority: data.priority.clone(),
708            policy: data.policy.as_deref(),
709            data: &data.data,
710            max_cut: self
711                .repr
712                .max_cut
713                .checked_add(self.repr.commands.len())
714                .assume("must not overflow")?
715                .checked_sub(1)
716                .assume("must not overflow")?,
717        })
718    }
719
720    fn first(&self) -> Self::Command<'_> {
721        let data = self.repr.commands.first();
722        LinearCommand {
723            id: &data.id,
724            parent: self.repr.parents,
725            priority: data.priority.clone(),
726            policy: data.policy.as_deref(),
727            data: &data.data,
728            max_cut: self.repr.max_cut,
729        }
730    }
731
732    fn head_location(&self) -> Location {
733        // vec1 length >= 1
734        #[allow(clippy::arithmetic_side_effects)]
735        Location::new(self.repr.offset, self.repr.commands.len() - 1)
736    }
737
738    fn first_location(&self) -> Location {
739        Location::new(self.repr.offset, 0)
740    }
741
742    fn contains(&self, location: Location) -> bool {
743        location.segment == self.repr.offset && location.command < self.repr.commands.len()
744    }
745
746    fn policy(&self) -> PolicyId {
747        self.repr.policy
748    }
749
750    fn prior(&self) -> Prior<Location> {
751        self.repr.prior
752    }
753
754    fn get_command(&self, location: Location) -> Option<Self::Command<'_>> {
755        if self.repr.offset != location.segment {
756            return None;
757        }
758        let data = self.repr.commands.get(location.command)?;
759        let parent = if let Some(prev) = usize::checked_sub(location.command, 1) {
760            if let Some(max_cut) = self.repr.max_cut.checked_add(prev) {
761                Prior::Single(Address {
762                    id: self.repr.commands[prev].id,
763                    max_cut,
764                })
765            } else {
766                return None;
767            }
768        } else {
769            self.repr.parents
770        };
771        self.repr
772            .max_cut
773            .checked_add(location.command)
774            .map(|max_cut| LinearCommand {
775                id: &data.id,
776                parent,
777                priority: data.priority.clone(),
778                policy: data.policy.as_deref(),
779                data: &data.data,
780                max_cut,
781            })
782    }
783
784    fn get_from(&self, location: Location) -> Vec<Self::Command<'_>> {
785        if self.repr.offset != location.segment {
786            // TODO(jdygert): Result?
787            return Vec::new();
788        }
789
790        // TODO(jdygert): Optimize?
791        (location.command..self.repr.commands.len())
792            .map(|c| Location::new(location.segment, c))
793            .map(|loc| {
794                self.get_command(loc)
795                    .expect("constructed location is valid")
796            })
797            .collect()
798    }
799
800    fn get_from_max_cut(&self, max_cut: usize) -> Result<Option<Location>, StorageError> {
801        if max_cut >= self.repr.max_cut
802            && max_cut
803                <= self
804                    .repr
805                    .max_cut
806                    .checked_add(self.repr.commands.len())
807                    .assume("must not overflow")?
808        {
809            return Ok(Some(Location::new(
810                self.repr.offset,
811                max_cut
812                    .checked_sub(self.repr.max_cut)
813                    .assume("must not overflow")?,
814            )));
815        }
816        Ok(None)
817    }
818
819    fn facts(&self) -> Result<Self::FactIndex, StorageError> {
820        Ok(LinearFactIndex {
821            repr: self.reader.fetch(self.repr.facts)?,
822            reader: self.reader.clone(),
823        })
824    }
825
826    fn skip_list(&self) -> &[(Location, usize)] {
827        &self.repr.skip_list
828    }
829
830    fn shortest_max_cut(&self) -> usize {
831        self.repr.max_cut
832    }
833
834    fn longest_max_cut(&self) -> Result<usize, StorageError> {
835        Ok(self
836            .repr
837            .max_cut
838            .checked_add(self.repr.commands.len())
839            .assume("must not overflow")?
840            .checked_sub(1)
841            .assume("must not overflow")?)
842    }
843}
844
845impl<R: Read> FactIndex for LinearFactIndex<R> {}
846
847type MapIter = alloc::collections::btree_map::IntoIter<Keys, Option<Bytes>>;
848pub struct QueryIterator {
849    it: MapIter,
850}
851
852impl QueryIterator {
853    fn new(it: MapIter) -> Self {
854        Self { it }
855    }
856}
857
858impl Iterator for QueryIterator {
859    type Item = Result<Fact, StorageError>;
860    fn next(&mut self) -> Option<Self::Item> {
861        loop {
862            // filter out tombstones
863            if let (key, Some(value)) = self.it.next()? {
864                return Some(Ok(Fact { key, value }));
865            }
866        }
867    }
868}
869
870impl<R: Read> Query for LinearFactIndex<R> {
871    fn query(&self, name: &str, keys: &[Box<[u8]>]) -> Result<Option<Box<[u8]>>, StorageError> {
872        let mut prior = Some(&self.repr);
873        let mut slot; // Need to store deserialized value.
874        while let Some(facts) = prior {
875            if let Some(v) = facts.facts.get(name).and_then(|m| m.get(keys)) {
876                return Ok(v.as_ref().cloned());
877            }
878            slot = facts.prior.map(|p| self.reader.fetch(p)).transpose()?;
879            prior = slot.as_ref();
880        }
881        Ok(None)
882    }
883
884    type QueryIterator = QueryIterator;
885    fn query_prefix(
886        &self,
887        name: &str,
888        prefix: &[Box<[u8]>],
889    ) -> Result<QueryIterator, StorageError> {
890        Ok(QueryIterator::new(
891            self.query_prefix_inner(name, prefix)?.into_iter(),
892        ))
893    }
894}
895
896impl<R: Read> LinearFactIndex<R> {
897    fn query_prefix_inner(
898        &self,
899        name: &str,
900        prefix: &[Box<[u8]>],
901    ) -> Result<FactMap, StorageError> {
902        let mut matches = BTreeMap::new();
903        let mut prior = Some(&self.repr);
904        let mut slot; // Need to store deserialized value.
905        while let Some(facts) = prior {
906            if let Some(map) = facts.facts.get(name) {
907                for (k, v) in super::memory::find_prefixes(map, prefix) {
908                    // don't override, if we've already found the fact (including deletions)
909                    if !matches.contains_key(k) {
910                        matches.insert(k.clone(), v.map(Into::into));
911                    }
912                }
913            }
914            slot = facts.prior.map(|p| self.reader.fetch(p)).transpose()?;
915            prior = slot.as_ref();
916        }
917        Ok(matches)
918    }
919}
920
921impl<R> LinearFactPerspective<R> {
922    fn clear(&mut self) {
923        self.map.clear();
924    }
925
926    fn apply_updates(&mut self, updates: &[Update]) {
927        for (name, key, value) in updates {
928            if self.prior.is_none() {
929                if let Some(value) = value {
930                    self.map
931                        .entry(name.clone())
932                        .or_default()
933                        .insert(key.clone(), Some(value.clone()));
934                } else if let Some(e) = self.map.get_mut(name) {
935                    e.remove(key);
936                }
937            } else {
938                self.map
939                    .entry(name.clone())
940                    .or_default()
941                    .insert(key.clone(), value.clone());
942            }
943        }
944    }
945}
946
947impl<R: Read> FactPerspective for LinearFactPerspective<R> {}
948
949impl<R: Read> Query for LinearFactPerspective<R> {
950    fn query(&self, name: &str, keys: &[Box<[u8]>]) -> Result<Option<Box<[u8]>>, StorageError> {
951        if let Some(wrapped) = self.map.get(name).and_then(|m| m.get(keys)) {
952            return Ok(wrapped.as_deref().map(Box::from));
953        }
954        match &self.prior {
955            FactPerspectivePrior::None => Ok(None),
956            FactPerspectivePrior::FactPerspective(prior) => prior.query(name, keys),
957            FactPerspectivePrior::FactIndex { offset, reader } => {
958                let repr: FactIndexRepr = reader.fetch(*offset)?;
959                let prior = LinearFactIndex {
960                    repr,
961                    reader: reader.clone(),
962                };
963                prior.query(name, keys)
964            }
965        }
966    }
967
968    type QueryIterator = QueryIterator;
969    fn query_prefix(
970        &self,
971        name: &str,
972        prefix: &[Box<[u8]>],
973    ) -> Result<QueryIterator, StorageError> {
974        Ok(QueryIterator::new(
975            self.query_prefix_inner(name, prefix)?.into_iter(),
976        ))
977    }
978}
979
980impl<R: Read> LinearFactPerspective<R> {
981    fn query_prefix_inner(
982        &self,
983        name: &str,
984        prefix: &[Box<[u8]>],
985    ) -> Result<FactMap, StorageError> {
986        let mut matches = match &self.prior {
987            FactPerspectivePrior::None => BTreeMap::new(),
988            FactPerspectivePrior::FactPerspective(prior) => {
989                prior.query_prefix_inner(name, prefix)?
990            }
991            FactPerspectivePrior::FactIndex { offset, reader } => {
992                let repr: FactIndexRepr = reader.fetch(*offset)?;
993                let prior = LinearFactIndex {
994                    repr,
995                    reader: reader.clone(),
996                };
997                prior.query_prefix_inner(name, prefix)?
998            }
999        };
1000        if let Some(map) = self.map.get(name) {
1001            for (k, v) in super::memory::find_prefixes(map, prefix) {
1002                // overwrite "earlier" facts
1003                matches.insert(k.clone(), v.map(Into::into));
1004            }
1005        }
1006        Ok(matches)
1007    }
1008}
1009
1010impl<R: Read> QueryMut for LinearFactPerspective<R> {
1011    fn insert(&mut self, name: String, keys: Keys, value: Bytes) {
1012        self.map.entry(name).or_default().insert(keys, Some(value));
1013    }
1014
1015    fn delete(&mut self, name: String, keys: Keys) {
1016        if self.prior.is_none() {
1017            // No need for tombstones with no prior.
1018            if let Some(kv) = self.map.get_mut(&name) {
1019                kv.remove(&keys);
1020            }
1021        } else {
1022            self.map.entry(name).or_default().insert(keys, None);
1023        }
1024    }
1025}
1026
/// Lets a `LinearPerspective` be used wherever a `FactPerspective` is expected.
///
/// NOTE(review): the impl body is empty, which presumably means the trait
/// requires nothing beyond its supertraits (`Query`/`QueryMut`, which this
/// file implements for this type). Confirm against the trait definition.
impl<R: Read> FactPerspective for LinearPerspective<R> {}
1028
1029impl<R: Read> Query for LinearPerspective<R> {
1030    fn query(&self, name: &str, keys: &[Box<[u8]>]) -> Result<Option<Box<[u8]>>, StorageError> {
1031        self.facts.query(name, keys)
1032    }
1033
1034    type QueryIterator = QueryIterator;
1035    fn query_prefix(
1036        &self,
1037        name: &str,
1038        prefix: &[Box<[u8]>],
1039    ) -> Result<QueryIterator, StorageError> {
1040        self.facts.query_prefix(name, prefix)
1041    }
1042}
1043
1044impl<R: Read> QueryMut for LinearPerspective<R> {
1045    fn insert(&mut self, name: String, keys: Keys, value: Bytes) {
1046        self.facts.insert(name.clone(), keys.clone(), value.clone());
1047        self.current_updates.push((name, keys, Some(value)));
1048    }
1049
1050    fn delete(&mut self, name: String, keys: Keys) {
1051        self.facts.delete(name.clone(), keys.clone());
1052        self.current_updates.push((name, keys, None))
1053    }
1054}
1055
1056impl<R: Read> Revertable for LinearPerspective<R> {
1057    fn checkpoint(&self) -> Checkpoint {
1058        Checkpoint {
1059            index: self.commands.len(),
1060        }
1061    }
1062
1063    fn revert(&mut self, checkpoint: Checkpoint) -> Result<(), Bug> {
1064        if checkpoint.index == self.commands.len() {
1065            return Ok(());
1066        }
1067
1068        if checkpoint.index > self.commands.len() {
1069            bug!("A checkpoint's index should always be less than or equal to the length of a perspective's command history!");
1070        }
1071
1072        self.commands.truncate(checkpoint.index);
1073        self.facts.clear();
1074        self.current_updates.clear();
1075        for data in &self.commands {
1076            self.facts.apply_updates(&data.updates);
1077        }
1078
1079        Ok(())
1080    }
1081}
1082
1083impl<R: Read> Perspective for LinearPerspective<R> {
1084    fn policy(&self) -> PolicyId {
1085        self.policy
1086    }
1087
1088    fn add_command(&mut self, command: &impl Command) -> Result<usize, StorageError> {
1089        if command.parent() != self.head_address()? {
1090            return Err(StorageError::PerspectiveHeadMismatch);
1091        }
1092
1093        self.commands.push(CommandData {
1094            id: command.id(),
1095            priority: command.priority(),
1096            policy: command.policy().map(Box::from),
1097            data: command.bytes().into(),
1098            updates: core::mem::take(&mut self.current_updates),
1099        });
1100        Ok(self.commands.len())
1101    }
1102
1103    fn includes(&self, id: CommandId) -> bool {
1104        self.commands.iter().any(|cmd| cmd.id == id)
1105    }
1106
1107    fn head_address(&self) -> Result<Prior<Address>, Bug> {
1108        Ok(if let Some(last) = self.commands.last() {
1109            Prior::Single(Address {
1110                id: last.id,
1111                max_cut: self
1112                    .max_cut
1113                    .checked_add(self.commands.len())
1114                    .assume("must not overflow")?
1115                    .checked_sub(1)
1116                    .assume("must not overflow")?,
1117            })
1118        } else {
1119            self.parents
1120        })
1121    }
1122}
1123
1124impl From<Prior<Address>> for Prior<CommandId> {
1125    fn from(p: Prior<Address>) -> Self {
1126        match p {
1127            Prior::None => Prior::None,
1128            Prior::Single(l) => Prior::Single(l.id),
1129            Prior::Merge(l, r) => Prior::Merge(l.id, r.id),
1130        }
1131    }
1132}
1133
1134impl Command for LinearCommand<'_> {
1135    fn priority(&self) -> Priority {
1136        self.priority.clone()
1137    }
1138
1139    fn id(&self) -> CommandId {
1140        *self.id
1141    }
1142
1143    fn parent(&self) -> Prior<Address> {
1144        self.parent
1145    }
1146
1147    fn policy(&self) -> Option<&[u8]> {
1148        self.policy
1149    }
1150
1151    fn bytes(&self) -> &[u8] {
1152        self.data
1153    }
1154
1155    fn max_cut(&self) -> Result<usize, Bug> {
1156        Ok(self.max_cut)
1157    }
1158}
1159
#[cfg(test)]
mod test {
    use testing::Manager;

    use super::*;
    use crate::testing::dsl::{test_suite, StorageBackend};

    /// Builds a `Keys` value from string segments.
    fn to_keys(parts: &[&str]) -> Keys {
        parts.iter().map(|part| part.as_bytes()).collect()
    }

    #[test]
    fn test_query_prefix() {
        let mut provider = LinearStorageProvider::new(Manager::new());
        let mut fp = provider.new_perspective(PolicyId::new(0));
        let name = "x";

        let stored: &[&[&str]] = &[
            &["aa", "xy", "123"],
            &["aa", "xz", "123"],
            &["bb", "ccc"],
            &["bc", ""],
        ];
        let keys: Vec<Keys> = stored.iter().map(|parts| to_keys(parts)).collect();

        // Each fact's value is the debug rendering of its key, so results can
        // be checked without a separate lookup table.
        keys.iter().for_each(|ks| {
            fp.insert(
                name.into(),
                ks.clone(),
                format!("{ks:?}").into_bytes().into(),
            )
        });

        let prefixes: &[&[&str]] = &[
            &["aa", "xy", "12"],
            &["aa", "xy"],
            &["aa", "xz"],
            &["aa", "x"],
            &["bb", ""],
            &["bb", "ccc"],
            &["bc", ""],
            &["bc", "", ""],
        ];

        for prefix in prefixes.iter().map(|parts| to_keys(parts)) {
            let found: Vec<_> = fp.query_prefix(name, &prefix).unwrap().collect();
            let mut expected: Vec<&Keys> =
                keys.iter().filter(|k| k.starts_with(&prefix)).collect();
            expected.sort();
            assert_eq!(found.len(), expected.len());
            for (fact, want) in found.into_iter().zip(expected) {
                let fact = fact.unwrap();
                assert_eq!(&fact.key, want);
                assert_eq!(fact.value.as_ref(), format!("{want:?}").as_bytes());
            }
        }
    }

    struct LinearBackend;
    impl StorageBackend for LinearBackend {
        type StorageProvider = LinearStorageProvider<Manager>;

        fn provider(&mut self, _client_id: u64) -> Self::StorageProvider {
            LinearStorageProvider::new(Manager::new())
        }
    }
    test_suite!(|| LinearBackend);
}