aranya_runtime/storage/linear/
mod.rs

1//! Persistant linear storage implemenatation.
2//!
3//! `LinearStorage` is a graph storage implementation backed by a file-like byte
4//! storage interface. This is designed to be usable across many environments
5//! with minimal assumptions on the underlying storage.
6//!
7//! # Layout
8//!
9//! `[x]` is page aligned.
10//!
11//! ```text
12//! // Control section
13//! [Base] [Root] [Root]
14//! // Data section
15//! [Segment or FactIndex]
16//! |
17//! V
18//! ```
19//!
20//! The `LinearStorage` will exclusively modify the control section. The data
21//! section is append-only but can be read concurrently. If written data is not
22//! committed, it may be overwritten and will become unreachable by intended
23//! means.
24
25pub mod libc;
26
27#[cfg(feature = "testing")]
28pub mod testing;
29
30use alloc::{boxed::Box, collections::BTreeMap, string::String, vec, vec::Vec};
31
32use aranya_crypto::{Csprng, Rng, dangerous::spideroak_crypto::csprng::rand::Rng as _};
33use buggy::{Bug, BugExt as _, bug};
34use serde::{Deserialize, Serialize};
35use vec1::Vec1;
36
37use crate::{
38    Address, Checkpoint, CmdId, Command, Fact, FactIndex, FactPerspective, GraphId, Keys, Location,
39    Perspective, PolicyId, Prior, Priority, Query, QueryMut, Revertable, Segment, Storage,
40    StorageError, StorageProvider,
41};
42
43pub mod io;
44pub use io::*;
45
46/// Maximum depth of fact indices before compaction.
47///
48/// A lower value will speed up search queries but require more compaction,
49/// slowing down fact index creation and using more storage space.
50///
51/// In the future, this may be configurable at runtime or dynamic based on
52/// heuristics such as fact density.
53///
54/// 16 is our initial guess for balance.
55///
56/// This must be at least 2.
57const MAX_FACT_INDEX_DEPTH: usize = 16;
58
59pub struct LinearStorageProvider<FM: IoManager> {
60    manager: FM,
61    storage: BTreeMap<GraphId, LinearStorage<FM::Writer>>,
62}
63
64pub struct LinearStorage<W> {
65    writer: W,
66}
67
68#[derive(Debug)]
69pub struct LinearSegment<R> {
70    repr: SegmentRepr,
71    reader: R,
72}
73
74#[derive(Debug, Serialize, Deserialize)]
75struct SegmentRepr {
76    /// Self offset in file.
77    offset: usize,
78    prior: Prior<Location>,
79    parents: Prior<Address>,
80    policy: PolicyId,
81    /// Offset in file to associated fact index.
82    facts: usize,
83    commands: Vec1<CommandData>,
84    max_cut: usize,
85    skip_list: Vec<(Location, usize)>,
86}
87
88#[derive(Debug, Serialize, Deserialize)]
89struct CommandData {
90    id: CmdId,
91    priority: Priority,
92    policy: Option<Bytes>,
93    data: Bytes,
94    updates: Vec<Update>,
95}
96
97pub struct LinearCommand<'a> {
98    id: &'a CmdId,
99    parent: Prior<Address>,
100    priority: Priority,
101    policy: Option<&'a [u8]>,
102    data: &'a [u8],
103    max_cut: usize,
104}
105
106type Bytes = Box<[u8]>;
107
108type Update = (String, Keys, Option<Bytes>);
109type FactMap = BTreeMap<Keys, Option<Box<[u8]>>>;
110type NamedFactMap = BTreeMap<String, FactMap>;
111
112#[derive(Debug)]
113pub struct LinearFactIndex<R> {
114    repr: FactIndexRepr,
115    reader: R,
116}
117
118#[derive(Debug, Serialize, Deserialize)]
119struct FactIndexRepr {
120    /// Self offset in file.
121    offset: usize,
122    /// Offset of prior fact index.
123    prior: Option<usize>,
124    /// Depth of this fact index.
125    ///
126    /// `prior.depth + 1`, or just `1` if no prior
127    depth: usize,
128    /// Facts in sorted order
129    facts: NamedFactMap,
130}
131
132#[derive(Debug)]
133pub struct LinearPerspective<R> {
134    prior: Prior<Location>,
135    parents: Prior<Address>,
136    policy: PolicyId,
137    facts: LinearFactPerspective<R>,
138    commands: Vec<CommandData>,
139    current_updates: Vec<Update>,
140    max_cut: usize,
141    last_common_ancestor: Option<(Location, usize)>,
142}
143
144impl<R> LinearPerspective<R> {
145    fn new(
146        prior: Prior<Location>,
147        parents: Prior<Address>,
148        policy: PolicyId,
149        prior_facts: FactPerspectivePrior<R>,
150        max_cut: usize,
151        last_common_ancestor: Option<(Location, usize)>,
152    ) -> Self {
153        Self {
154            prior,
155            parents,
156            policy,
157            facts: LinearFactPerspective::new(prior_facts),
158            commands: Vec::new(),
159            current_updates: Vec::new(),
160            max_cut,
161            last_common_ancestor,
162        }
163    }
164}
165
166#[derive(Debug)]
167pub struct LinearFactPerspective<R> {
168    map: BTreeMap<String, BTreeMap<Keys, Option<Bytes>>>,
169    prior: FactPerspectivePrior<R>,
170}
171
172impl<R> LinearFactPerspective<R> {
173    fn new(prior: FactPerspectivePrior<R>) -> Self {
174        Self {
175            map: BTreeMap::new(),
176            prior,
177        }
178    }
179}
180
181#[derive(Debug)]
182enum FactPerspectivePrior<R> {
183    None,
184    FactPerspective(Box<LinearFactPerspective<R>>),
185    FactIndex { offset: usize, reader: R },
186}
187
188impl<R> FactPerspectivePrior<R> {
189    fn is_none(&self) -> bool {
190        matches!(self, Self::None)
191    }
192}
193
194impl<FM: IoManager + Default> Default for LinearStorageProvider<FM> {
195    fn default() -> Self {
196        Self {
197            manager: FM::default(),
198            storage: BTreeMap::new(),
199        }
200    }
201}
202
203impl<FM: IoManager> LinearStorageProvider<FM> {
204    pub fn new(manager: FM) -> Self {
205        Self {
206            manager,
207            storage: BTreeMap::new(),
208        }
209    }
210}
211
212impl<FM: IoManager> StorageProvider for LinearStorageProvider<FM> {
213    type Perspective = LinearPerspective<<FM::Writer as Write>::ReadOnly>;
214    type Segment = LinearSegment<<FM::Writer as Write>::ReadOnly>;
215    type Storage = LinearStorage<FM::Writer>;
216
217    fn new_perspective(&mut self, policy_id: PolicyId) -> Self::Perspective {
218        LinearPerspective::new(
219            Prior::None,
220            Prior::None,
221            policy_id,
222            FactPerspectivePrior::None,
223            0,
224            None,
225        )
226    }
227
228    fn new_storage(
229        &mut self,
230        init: Self::Perspective,
231    ) -> Result<(GraphId, &mut Self::Storage), StorageError> {
232        use alloc::collections::btree_map::Entry;
233
234        if init.commands.is_empty() {
235            return Err(StorageError::EmptyPerspective);
236        }
237        let graph_id = GraphId::transmute(init.commands[0].id);
238        let Entry::Vacant(entry) = self.storage.entry(graph_id) else {
239            return Err(StorageError::StorageExists);
240        };
241
242        let file = self.manager.create(graph_id)?;
243        Ok((graph_id, entry.insert(LinearStorage::create(file, init)?)))
244    }
245
246    fn get_storage(&mut self, graph: GraphId) -> Result<&mut Self::Storage, StorageError> {
247        use alloc::collections::btree_map::Entry;
248
249        let entry = match self.storage.entry(graph) {
250            Entry::Vacant(v) => v,
251            Entry::Occupied(o) => return Ok(o.into_mut()),
252        };
253
254        let file = self
255            .manager
256            .open(graph)?
257            .ok_or(StorageError::NoSuchStorage)?;
258        Ok(entry.insert(LinearStorage::open(file)?))
259    }
260
261    fn remove_storage(&mut self, graph: GraphId) -> Result<(), StorageError> {
262        self.manager.remove(graph)?;
263
264        self.storage
265            .remove(&graph)
266            .ok_or(StorageError::NoSuchStorage)?;
267
268        Ok(())
269    }
270
271    fn list_graph_ids(
272        &mut self,
273    ) -> Result<impl Iterator<Item = Result<GraphId, StorageError>>, StorageError> {
274        self.manager.list()
275    }
276}
277
278impl<W: Write> LinearStorage<W> {
279    fn get_skip(
280        &self,
281        segment: <Self as Storage>::Segment,
282        max_cut: usize,
283    ) -> Result<Option<(Location, usize)>, StorageError> {
284        let mut head = segment;
285        let mut current = None;
286        'outer: loop {
287            if max_cut > head.longest_max_cut()? {
288                return Ok(current);
289            }
290            current = Some((head.first_location(), head.shortest_max_cut()));
291            if max_cut >= head.shortest_max_cut() {
292                return Ok(current);
293            }
294            // Assumes skip list is sorted in ascending order.
295            // We always want to skip as close to the root as possible.
296            for (skip, skip_max_cut) in head.skip_list() {
297                if skip_max_cut <= &max_cut {
298                    head = self.get_segment(*skip)?;
299                    continue 'outer;
300                }
301            }
302            head = match head.prior() {
303                Prior::None | Prior::Merge(_, _) => {
304                    return Ok(current);
305                }
306                Prior::Single(l) => self.get_segment(l)?,
307            }
308        }
309    }
310}
311
312impl<W: Write> LinearStorage<W> {
313    fn create(mut writer: W, init: LinearPerspective<W::ReadOnly>) -> Result<Self, StorageError> {
314        assert!(matches!(init.prior, Prior::None));
315        assert!(matches!(init.parents, Prior::None));
316        assert!(matches!(init.facts.prior, FactPerspectivePrior::None));
317
318        let mut map = init.facts.map;
319        map.retain(|_, kv| !kv.is_empty());
320
321        let facts = writer
322            .append(|offset| FactIndexRepr {
323                offset,
324                prior: None,
325                depth: 1,
326                facts: map,
327            })?
328            .offset;
329
330        let commands = init
331            .commands
332            .try_into()
333            .map_err(|_| StorageError::EmptyPerspective)?;
334        let segment = writer.append(|offset| SegmentRepr {
335            offset,
336            prior: Prior::None,
337            parents: Prior::None,
338            policy: init.policy,
339            facts,
340            commands,
341            max_cut: 0,
342            skip_list: vec![],
343        })?;
344
345        let head = Location::new(
346            segment.offset,
347            segment
348                .commands
349                .len()
350                .checked_sub(1)
351                .assume("vec1 length >= 1")?,
352        );
353
354        writer.commit(head)?;
355
356        let storage = Self { writer };
357
358        Ok(storage)
359    }
360
361    fn open(writer: W) -> Result<Self, StorageError> {
362        Ok(Self { writer })
363    }
364
365    fn compact(&mut self, mut repr: FactIndexRepr) -> Result<FactIndexRepr, StorageError> {
366        let mut map = NamedFactMap::new();
367        let reader = self.writer.readonly();
368        loop {
369            for (name, kv) in repr.facts {
370                let sub = map.entry(name).or_default();
371                for (k, v) in kv {
372                    sub.entry(k).or_insert(v);
373                }
374            }
375            let Some(offset) = repr.prior else { break };
376            repr = reader.fetch(offset)?;
377        }
378
379        // Since there's no prior, we can remove tombstones
380        map.retain(|_, kv| {
381            kv.retain(|_, v| v.is_some());
382            !kv.is_empty()
383        });
384
385        Ok(self
386            .write_facts(LinearFactPerspective {
387                map,
388                prior: FactPerspectivePrior::None,
389            })?
390            .repr)
391    }
392}
393
394impl<F: Write> Storage for LinearStorage<F> {
395    type Perspective = LinearPerspective<F::ReadOnly>;
396    type FactPerspective = LinearFactPerspective<F::ReadOnly>;
397    type Segment = LinearSegment<F::ReadOnly>;
398    type FactIndex = LinearFactIndex<F::ReadOnly>;
399
400    fn get_linear_perspective(&self, parent: Location) -> Result<Self::Perspective, StorageError> {
401        let segment = self.get_segment(parent)?;
402        let command = segment
403            .get_command(parent)
404            .ok_or(StorageError::CommandOutOfBounds(parent))?;
405        let policy = segment.repr.policy;
406        let prior_facts: FactPerspectivePrior<F::ReadOnly> = if parent == segment.head_location() {
407            FactPerspectivePrior::FactIndex {
408                offset: segment.repr.facts,
409                reader: self.writer.readonly(),
410            }
411        } else {
412            let prior = match segment.facts()?.repr.prior {
413                Some(offset) => FactPerspectivePrior::FactIndex {
414                    offset,
415                    reader: self.writer.readonly(),
416                },
417                None => FactPerspectivePrior::None,
418            };
419            let mut facts = LinearFactPerspective::new(prior);
420            for data in &segment.repr.commands[..=parent.command] {
421                facts.apply_updates(&data.updates);
422            }
423            if facts.prior.is_none() {
424                facts.map.retain(|_, kv| !kv.is_empty());
425            }
426            if facts.map.is_empty() {
427                facts.prior
428            } else {
429                FactPerspectivePrior::FactPerspective(Box::new(facts))
430            }
431        };
432        let prior = Prior::Single(parent);
433
434        let perspective = LinearPerspective::new(
435            prior,
436            Prior::Single(command.address()?),
437            policy,
438            prior_facts,
439            command
440                .max_cut()?
441                .checked_add(1)
442                .assume("must not overflow")?,
443            None,
444        );
445
446        Ok(perspective)
447    }
448
449    fn get_fact_perspective(
450        &self,
451        location: Location,
452    ) -> Result<Self::FactPerspective, StorageError> {
453        let segment = self.get_segment(location)?;
454
455        // If at head of segment, or no facts in segment,
456        // we don't need to apply updates.
457        if location == segment.head_location()
458            || segment
459                .repr
460                .commands
461                .iter()
462                .all(|cmd| cmd.updates.is_empty())
463        {
464            return Ok(LinearFactPerspective::new(
465                FactPerspectivePrior::FactIndex {
466                    offset: segment.repr.facts,
467                    reader: self.writer.readonly(),
468                },
469            ));
470        }
471
472        let prior = match segment.facts()?.repr.prior {
473            Some(offset) => FactPerspectivePrior::FactIndex {
474                offset,
475                reader: self.writer.readonly(),
476            },
477            None => FactPerspectivePrior::None,
478        };
479        let mut facts = LinearFactPerspective::new(prior);
480        for data in &segment.repr.commands[..=location.command] {
481            facts.apply_updates(&data.updates);
482        }
483
484        Ok(facts)
485    }
486
487    fn new_merge_perspective(
488        &self,
489        left: Location,
490        right: Location,
491        last_common_ancestor: (Location, usize),
492        policy_id: PolicyId,
493        braid: Self::FactIndex,
494    ) -> Result<Self::Perspective, StorageError> {
495        // TODO(jdygert): ensure braid belongs to this storage.
496        // TODO(jdygert): ensure braid ends at given command?
497        let left_segment = self.get_segment(left)?;
498        let left_command = left_segment
499            .get_command(left)
500            .ok_or(StorageError::CommandOutOfBounds(left))?;
501        let right_segment = self.get_segment(right)?;
502        let right_command = right_segment
503            .get_command(right)
504            .ok_or(StorageError::CommandOutOfBounds(right))?;
505
506        let parent = Prior::Merge(left_command.address()?, right_command.address()?);
507
508        if policy_id != left_segment.policy() && policy_id != right_segment.policy() {
509            return Err(StorageError::PolicyMismatch);
510        }
511
512        let prior = Prior::Merge(left, right);
513
514        let perspective = LinearPerspective::new(
515            prior,
516            parent,
517            policy_id,
518            FactPerspectivePrior::FactIndex {
519                offset: braid.repr.offset,
520                reader: braid.reader,
521            },
522            left_command
523                .max_cut()?
524                .max(right_command.max_cut()?)
525                .checked_add(1)
526                .assume("must not overflow")?,
527            Some(last_common_ancestor),
528        );
529
530        Ok(perspective)
531    }
532
533    fn get_segment(&self, location: Location) -> Result<Self::Segment, StorageError> {
534        let reader = self.writer.readonly();
535        let repr = reader.fetch(location.segment)?;
536        let seg = LinearSegment { repr, reader };
537
538        Ok(seg)
539    }
540
541    fn get_head(&self) -> Result<Location, StorageError> {
542        self.writer.head()
543    }
544
545    fn commit(&mut self, segment: Self::Segment) -> Result<(), StorageError> {
546        if !self.is_ancestor(self.get_head()?, &segment)? {
547            return Err(StorageError::HeadNotAncestor);
548        }
549
550        self.writer.commit(segment.head_location())
551    }
552
553    fn write(&mut self, perspective: Self::Perspective) -> Result<Self::Segment, StorageError> {
554        // TODO(jdygert): Validate prior?
555
556        let facts = self.write_facts(perspective.facts)?.repr.offset;
557
558        let commands: Vec1<CommandData> = perspective
559            .commands
560            .try_into()
561            .map_err(|_| StorageError::EmptyPerspective)?;
562
563        let get_skips =
564            |l: Location, count: usize| -> Result<Vec<(Location, usize)>, StorageError> {
565                let mut rng = &mut Rng as &mut dyn Csprng;
566                let mut skips = vec![];
567                for _ in 0..count {
568                    let segment = self.get_segment(l)?;
569                    let l_max_cut = segment
570                        .get_command(l)
571                        .assume("location must exist")?
572                        .max_cut;
573                    if l_max_cut > 0 {
574                        let max_cut = rng.gen_range(0..l_max_cut);
575                        if let Some(skip) = self.get_skip(segment, max_cut)? {
576                            if !skips.contains(&skip) {
577                                skips.push(skip);
578                            }
579                        } else {
580                            break;
581                        }
582                    }
583                }
584                Ok(skips)
585            };
586
587        let skip_list = match perspective.prior {
588            Prior::None => vec![],
589            Prior::Merge(_, _) => {
590                let (lca, max_cut) = perspective.last_common_ancestor.assume("lca must exist")?;
591                let mut skips = get_skips(lca, 2)?;
592                if !skips.contains(&(lca, max_cut)) {
593                    skips.push((lca, max_cut));
594                }
595                skips.sort();
596                skips
597            }
598            Prior::Single(l) => {
599                let mut skips = get_skips(l, 3)?;
600                skips.sort();
601                skips
602            }
603        };
604        let repr = self.writer.append(|offset| SegmentRepr {
605            offset,
606            prior: perspective.prior,
607            parents: perspective.parents,
608            policy: perspective.policy,
609            facts,
610            commands,
611            max_cut: perspective.max_cut,
612            skip_list,
613        })?;
614
615        Ok(LinearSegment {
616            repr,
617            reader: self.writer.readonly(),
618        })
619    }
620
621    fn write_facts(
622        &mut self,
623        facts: Self::FactPerspective,
624    ) -> Result<Self::FactIndex, StorageError> {
625        let mut prior = match facts.prior {
626            FactPerspectivePrior::None => None,
627            FactPerspectivePrior::FactPerspective(prior) => {
628                let prior = self.write_facts(*prior)?;
629                if facts.map.is_empty() {
630                    return Ok(prior);
631                }
632                Some(prior.repr)
633            }
634            FactPerspectivePrior::FactIndex { offset, reader } => {
635                let repr = reader.fetch(offset)?;
636                if facts.map.is_empty() {
637                    return Ok(LinearFactIndex { repr, reader });
638                }
639                Some(repr)
640            }
641        };
642
643        let depth = if let Some(mut p) = prior.take() {
644            if p.depth > MAX_FACT_INDEX_DEPTH - 1 {
645                p = self.compact(p)?;
646            }
647            prior.insert(p).depth
648        } else {
649            0
650        };
651
652        let depth = depth.checked_add(1).assume("depth won't overflow")?;
653
654        if depth > MAX_FACT_INDEX_DEPTH {
655            bug!("fact index too deep");
656        }
657
658        let repr = self.writer.append(|offset| FactIndexRepr {
659            offset,
660            prior: prior.map(|p| p.offset),
661            depth,
662            facts: facts.map,
663        })?;
664
665        Ok(LinearFactIndex {
666            repr,
667            reader: self.writer.readonly(),
668        })
669    }
670}
671
672impl<R: Read> Segment for LinearSegment<R> {
673    type FactIndex = LinearFactIndex<R>;
674    type Command<'a>
675        = LinearCommand<'a>
676    where
677        R: 'a;
678
679    fn head(&self) -> Result<Self::Command<'_>, StorageError> {
680        let data = self.repr.commands.last();
681        let parent = if let Some(prev) = usize::checked_sub(self.repr.commands.len(), 2) {
682            Prior::Single(Address {
683                id: self.repr.commands[prev].id,
684                max_cut: self
685                    .repr
686                    .max_cut
687                    .checked_add(prev)
688                    .assume("must not overflow")?,
689            })
690        } else {
691            self.repr.parents
692        };
693        Ok(LinearCommand {
694            id: &data.id,
695            parent,
696            priority: data.priority.clone(),
697            policy: data.policy.as_deref(),
698            data: &data.data,
699            max_cut: self
700                .repr
701                .max_cut
702                .checked_add(self.repr.commands.len())
703                .assume("must not overflow")?
704                .checked_sub(1)
705                .assume("must not overflow")?,
706        })
707    }
708
709    fn first(&self) -> Self::Command<'_> {
710        let data = self.repr.commands.first();
711        LinearCommand {
712            id: &data.id,
713            parent: self.repr.parents,
714            priority: data.priority.clone(),
715            policy: data.policy.as_deref(),
716            data: &data.data,
717            max_cut: self.repr.max_cut,
718        }
719    }
720
721    fn head_location(&self) -> Location {
722        // vec1 length >= 1
723        #[allow(clippy::arithmetic_side_effects)]
724        Location::new(self.repr.offset, self.repr.commands.len() - 1)
725    }
726
727    fn first_location(&self) -> Location {
728        Location::new(self.repr.offset, 0)
729    }
730
731    fn contains(&self, location: Location) -> bool {
732        location.segment == self.repr.offset && location.command < self.repr.commands.len()
733    }
734
735    fn policy(&self) -> PolicyId {
736        self.repr.policy
737    }
738
739    fn prior(&self) -> Prior<Location> {
740        self.repr.prior
741    }
742
743    fn get_command(&self, location: Location) -> Option<Self::Command<'_>> {
744        if self.repr.offset != location.segment {
745            return None;
746        }
747        let data = self.repr.commands.get(location.command)?;
748        let parent = if let Some(prev) = usize::checked_sub(location.command, 1) {
749            if let Some(max_cut) = self.repr.max_cut.checked_add(prev) {
750                Prior::Single(Address {
751                    id: self.repr.commands[prev].id,
752                    max_cut,
753                })
754            } else {
755                return None;
756            }
757        } else {
758            self.repr.parents
759        };
760        self.repr
761            .max_cut
762            .checked_add(location.command)
763            .map(|max_cut| LinearCommand {
764                id: &data.id,
765                parent,
766                priority: data.priority.clone(),
767                policy: data.policy.as_deref(),
768                data: &data.data,
769                max_cut,
770            })
771    }
772
773    fn get_from(&self, location: Location) -> Vec<Self::Command<'_>> {
774        if self.repr.offset != location.segment {
775            // TODO(jdygert): Result?
776            return Vec::new();
777        }
778
779        // TODO(jdygert): Optimize?
780        (location.command..self.repr.commands.len())
781            .map(|c| Location::new(location.segment, c))
782            .map(|loc| {
783                self.get_command(loc)
784                    .expect("constructed location is valid")
785            })
786            .collect()
787    }
788
789    fn get_by_address(&self, address: Address) -> Option<Location> {
790        if address.max_cut < self.repr.max_cut {
791            return None;
792        }
793        let idx = address.max_cut.checked_sub(self.repr.max_cut)?;
794        let cmd = self.repr.commands.get(idx)?;
795        if cmd.id != address.id {
796            return None;
797        }
798        Some(Location::new(self.repr.offset, idx))
799    }
800
801    fn facts(&self) -> Result<Self::FactIndex, StorageError> {
802        Ok(LinearFactIndex {
803            repr: self.reader.fetch(self.repr.facts)?,
804            reader: self.reader.clone(),
805        })
806    }
807
808    fn skip_list(&self) -> &[(Location, usize)] {
809        &self.repr.skip_list
810    }
811
812    fn shortest_max_cut(&self) -> usize {
813        self.repr.max_cut
814    }
815
816    fn longest_max_cut(&self) -> Result<usize, StorageError> {
817        Ok(self
818            .repr
819            .max_cut
820            .checked_add(self.repr.commands.len())
821            .assume("must not overflow")?
822            .checked_sub(1)
823            .assume("must not overflow")?)
824    }
825}
826
827impl<R: Read> FactIndex for LinearFactIndex<R> {}
828
829type MapIter = alloc::collections::btree_map::IntoIter<Keys, Option<Bytes>>;
830pub struct QueryIterator {
831    it: MapIter,
832}
833
834impl QueryIterator {
835    fn new(it: MapIter) -> Self {
836        Self { it }
837    }
838}
839
840impl Iterator for QueryIterator {
841    type Item = Result<Fact, StorageError>;
842    fn next(&mut self) -> Option<Self::Item> {
843        loop {
844            // filter out tombstones
845            if let (key, Some(value)) = self.it.next()? {
846                return Some(Ok(Fact { key, value }));
847            }
848        }
849    }
850}
851
852impl<R: Read> Query for LinearFactIndex<R> {
853    fn query(&self, name: &str, keys: &[Box<[u8]>]) -> Result<Option<Box<[u8]>>, StorageError> {
854        let mut prior = Some(&self.repr);
855        let mut slot; // Need to store deserialized value.
856        while let Some(facts) = prior {
857            if let Some(v) = facts.facts.get(name).and_then(|m| m.get(keys)) {
858                return Ok(v.clone());
859            }
860            slot = facts.prior.map(|p| self.reader.fetch(p)).transpose()?;
861            prior = slot.as_ref();
862        }
863        Ok(None)
864    }
865
866    type QueryIterator = QueryIterator;
867    fn query_prefix(
868        &self,
869        name: &str,
870        prefix: &[Box<[u8]>],
871    ) -> Result<QueryIterator, StorageError> {
872        Ok(QueryIterator::new(
873            self.query_prefix_inner(name, prefix)?.into_iter(),
874        ))
875    }
876}
877
878impl<R: Read> LinearFactIndex<R> {
879    fn query_prefix_inner(
880        &self,
881        name: &str,
882        prefix: &[Box<[u8]>],
883    ) -> Result<FactMap, StorageError> {
884        let mut matches = BTreeMap::new();
885        let mut prior = Some(&self.repr);
886        let mut slot; // Need to store deserialized value.
887        while let Some(facts) = prior {
888            if let Some(map) = facts.facts.get(name) {
889                for (k, v) in super::memory::find_prefixes(map, prefix) {
890                    // don't override, if we've already found the fact (including deletions)
891                    if !matches.contains_key(k) {
892                        matches.insert(k.clone(), v.map(Into::into));
893                    }
894                }
895            }
896            slot = facts.prior.map(|p| self.reader.fetch(p)).transpose()?;
897            prior = slot.as_ref();
898        }
899        Ok(matches)
900    }
901}
902
903impl<R> LinearFactPerspective<R> {
904    fn clear(&mut self) {
905        self.map.clear();
906    }
907
908    fn apply_updates(&mut self, updates: &[Update]) {
909        for (name, key, value) in updates {
910            if self.prior.is_none() {
911                if let Some(value) = value {
912                    self.map
913                        .entry(name.clone())
914                        .or_default()
915                        .insert(key.clone(), Some(value.clone()));
916                } else if let Some(e) = self.map.get_mut(name) {
917                    e.remove(key);
918                }
919            } else {
920                self.map
921                    .entry(name.clone())
922                    .or_default()
923                    .insert(key.clone(), value.clone());
924            }
925        }
926    }
927}
928
929impl<R: Read> FactPerspective for LinearFactPerspective<R> {}
930
931impl<R: Read> Query for LinearFactPerspective<R> {
932    fn query(&self, name: &str, keys: &[Box<[u8]>]) -> Result<Option<Box<[u8]>>, StorageError> {
933        if let Some(wrapped) = self.map.get(name).and_then(|m| m.get(keys)) {
934            return Ok(wrapped.as_deref().map(Box::from));
935        }
936        match &self.prior {
937            FactPerspectivePrior::None => Ok(None),
938            FactPerspectivePrior::FactPerspective(prior) => prior.query(name, keys),
939            FactPerspectivePrior::FactIndex { offset, reader } => {
940                let repr: FactIndexRepr = reader.fetch(*offset)?;
941                let prior = LinearFactIndex {
942                    repr,
943                    reader: reader.clone(),
944                };
945                prior.query(name, keys)
946            }
947        }
948    }
949
950    type QueryIterator = QueryIterator;
951    fn query_prefix(
952        &self,
953        name: &str,
954        prefix: &[Box<[u8]>],
955    ) -> Result<QueryIterator, StorageError> {
956        Ok(QueryIterator::new(
957            self.query_prefix_inner(name, prefix)?.into_iter(),
958        ))
959    }
960}
961
962impl<R: Read> LinearFactPerspective<R> {
963    fn query_prefix_inner(
964        &self,
965        name: &str,
966        prefix: &[Box<[u8]>],
967    ) -> Result<FactMap, StorageError> {
968        let mut matches = match &self.prior {
969            FactPerspectivePrior::None => BTreeMap::new(),
970            FactPerspectivePrior::FactPerspective(prior) => {
971                prior.query_prefix_inner(name, prefix)?
972            }
973            FactPerspectivePrior::FactIndex { offset, reader } => {
974                let repr: FactIndexRepr = reader.fetch(*offset)?;
975                let prior = LinearFactIndex {
976                    repr,
977                    reader: reader.clone(),
978                };
979                prior.query_prefix_inner(name, prefix)?
980            }
981        };
982        if let Some(map) = self.map.get(name) {
983            for (k, v) in super::memory::find_prefixes(map, prefix) {
984                // overwrite "earlier" facts
985                matches.insert(k.clone(), v.map(Into::into));
986            }
987        }
988        Ok(matches)
989    }
990}
991
992impl<R: Read> QueryMut for LinearFactPerspective<R> {
993    fn insert(&mut self, name: String, keys: Keys, value: Bytes) {
994        self.map.entry(name).or_default().insert(keys, Some(value));
995    }
996
997    fn delete(&mut self, name: String, keys: Keys) {
998        if self.prior.is_none() {
999            // No need for tombstones with no prior.
1000            if let Some(kv) = self.map.get_mut(&name) {
1001                kv.remove(&keys);
1002            }
1003        } else {
1004            self.map.entry(name).or_default().insert(keys, None);
1005        }
1006    }
1007}
1008
1009impl<R: Read> FactPerspective for LinearPerspective<R> {}
1010
1011impl<R: Read> Query for LinearPerspective<R> {
1012    fn query(&self, name: &str, keys: &[Box<[u8]>]) -> Result<Option<Box<[u8]>>, StorageError> {
1013        self.facts.query(name, keys)
1014    }
1015
1016    type QueryIterator = QueryIterator;
1017    fn query_prefix(
1018        &self,
1019        name: &str,
1020        prefix: &[Box<[u8]>],
1021    ) -> Result<QueryIterator, StorageError> {
1022        self.facts.query_prefix(name, prefix)
1023    }
1024}
1025
1026impl<R: Read> QueryMut for LinearPerspective<R> {
1027    fn insert(&mut self, name: String, keys: Keys, value: Bytes) {
1028        self.facts.insert(name.clone(), keys.clone(), value.clone());
1029        self.current_updates.push((name, keys, Some(value)));
1030    }
1031
1032    fn delete(&mut self, name: String, keys: Keys) {
1033        self.facts.delete(name.clone(), keys.clone());
1034        self.current_updates.push((name, keys, None));
1035    }
1036}
1037
1038impl<R: Read> Revertable for LinearPerspective<R> {
1039    fn checkpoint(&self) -> Checkpoint {
1040        Checkpoint {
1041            index: self.commands.len(),
1042        }
1043    }
1044
1045    fn revert(&mut self, checkpoint: Checkpoint) -> Result<(), Bug> {
1046        if checkpoint.index == self.commands.len() {
1047            return Ok(());
1048        }
1049
1050        if checkpoint.index > self.commands.len() {
1051            bug!(
1052                "A checkpoint's index should always be less than or equal to the length of a perspective's command history!"
1053            );
1054        }
1055
1056        self.commands.truncate(checkpoint.index);
1057        self.facts.clear();
1058        self.current_updates.clear();
1059        for data in &self.commands {
1060            self.facts.apply_updates(&data.updates);
1061        }
1062
1063        Ok(())
1064    }
1065}
1066
1067impl<R: Read> Perspective for LinearPerspective<R> {
1068    fn policy(&self) -> PolicyId {
1069        self.policy
1070    }
1071
1072    fn add_command(&mut self, command: &impl Command) -> Result<usize, StorageError> {
1073        if command.parent() != self.head_address()? {
1074            return Err(StorageError::PerspectiveHeadMismatch);
1075        }
1076
1077        self.commands.push(CommandData {
1078            id: command.id(),
1079            priority: command.priority(),
1080            policy: command.policy().map(Box::from),
1081            data: command.bytes().into(),
1082            updates: core::mem::take(&mut self.current_updates),
1083        });
1084        Ok(self.commands.len())
1085    }
1086
1087    fn includes(&self, id: CmdId) -> bool {
1088        self.commands.iter().any(|cmd| cmd.id == id)
1089    }
1090
1091    fn head_address(&self) -> Result<Prior<Address>, Bug> {
1092        Ok(if let Some(last) = self.commands.last() {
1093            Prior::Single(Address {
1094                id: last.id,
1095                max_cut: self
1096                    .max_cut
1097                    .checked_add(self.commands.len())
1098                    .assume("must not overflow")?
1099                    .checked_sub(1)
1100                    .assume("must not overflow")?,
1101            })
1102        } else {
1103            self.parents
1104        })
1105    }
1106}
1107
1108impl From<Prior<Address>> for Prior<CmdId> {
1109    fn from(p: Prior<Address>) -> Self {
1110        match p {
1111            Prior::None => Self::None,
1112            Prior::Single(l) => Self::Single(l.id),
1113            Prior::Merge(l, r) => Self::Merge(l.id, r.id),
1114        }
1115    }
1116}
1117
1118impl Command for LinearCommand<'_> {
1119    fn priority(&self) -> Priority {
1120        self.priority.clone()
1121    }
1122
1123    fn id(&self) -> CmdId {
1124        *self.id
1125    }
1126
1127    fn parent(&self) -> Prior<Address> {
1128        self.parent
1129    }
1130
1131    fn policy(&self) -> Option<&[u8]> {
1132        self.policy
1133    }
1134
1135    fn bytes(&self) -> &[u8] {
1136        self.data
1137    }
1138
1139    fn max_cut(&self) -> Result<usize, Bug> {
1140        Ok(self.max_cut)
1141    }
1142}
1143
1144#[cfg(test)]
1145mod test {
1146    use testing::Manager;
1147
1148    use super::*;
1149    use crate::testing::dsl::{StorageBackend, test_suite};
1150
1151    #[test]
1152    fn test_query_prefix() {
1153        let mut provider = LinearStorageProvider::new(Manager::new());
1154        let mut fp = provider.new_perspective(PolicyId::new(0));
1155
1156        let name = "x";
1157
1158        let keys: &[&[&str]] = &[
1159            &["aa", "xy", "123"],
1160            &["aa", "xz", "123"],
1161            &["bb", "ccc"],
1162            &["bc", ""],
1163        ];
1164        let keys: Vec<Keys> = keys
1165            .iter()
1166            .map(|ks| ks.iter().map(|k| k.as_bytes()).collect())
1167            .collect();
1168
1169        for ks in &keys {
1170            fp.insert(
1171                name.into(),
1172                ks.clone(),
1173                format!("{ks:?}").into_bytes().into(),
1174            );
1175        }
1176
1177        let prefixes: &[&[&str]] = &[
1178            &["aa", "xy", "12"],
1179            &["aa", "xy"],
1180            &["aa", "xz"],
1181            &["aa", "x"],
1182            &["bb", ""],
1183            &["bb", "ccc"],
1184            &["bc", ""],
1185            &["bc", "", ""],
1186        ];
1187
1188        for prefix in prefixes {
1189            let prefix: Keys = prefix.iter().map(|k| k.as_bytes()).collect();
1190            let found: Vec<_> = fp.query_prefix(name, &prefix).unwrap().collect();
1191            let mut expected: Vec<_> = keys.iter().filter(|k| k.starts_with(&prefix)).collect();
1192            expected.sort();
1193            assert_eq!(found.len(), expected.len());
1194            for (a, b) in std::iter::zip(found, expected) {
1195                let a = a.unwrap();
1196                assert_eq!(&a.key, b);
1197                assert_eq!(a.value.as_ref(), format!("{b:?}").as_bytes());
1198            }
1199        }
1200    }
1201
1202    struct LinearBackend;
1203    impl StorageBackend for LinearBackend {
1204        type StorageProvider = LinearStorageProvider<Manager>;
1205
1206        fn provider(&mut self, _client_id: u64) -> Self::StorageProvider {
1207            LinearStorageProvider::new(Manager::new())
1208        }
1209    }
1210    test_suite!(|| LinearBackend);
1211}