Skip to main content

aranya_runtime/storage/linear/
mod.rs

1//! Persistant linear storage implemenatation.
2//!
3//! `LinearStorage` is a graph storage implementation backed by a file-like byte
4//! storage interface. This is designed to be usable across many environments
5//! with minimal assumptions on the underlying storage.
6//!
7//! # Layout
8//!
9//! `[x]` is page aligned.
10//!
11//! ```text
12//! // Control section
13//! [Base] [Root] [Root]
14//! // Data section
15//! [Segment or FactIndex]
16//! |
17//! V
18//! ```
19//!
20//! The `LinearStorage` will exclusively modify the control section. The data
21//! section is append-only but can be read concurrently. If written data is not
22//! committed, it may be overwritten and will become unreachable by intended
23//! means.
24
25pub mod libc;
26
27#[cfg(feature = "testing")]
28pub mod testing;
29
30use alloc::{boxed::Box, collections::BTreeMap, string::String, vec, vec::Vec};
31
32use aranya_crypto::{Rng, dangerous::spideroak_crypto::csprng::rand::Rng as _};
33use buggy::{Bug, BugExt as _, bug};
34use serde::{Deserialize, Serialize};
35use vec1::Vec1;
36
37use crate::{
38    Address, Checkpoint, CmdId, Command, Fact, FactIndex, FactPerspective, GraphId, Keys, Location,
39    MaxCut, Perspective, PolicyId, Prior, Priority, Query, QueryMut, Revertable, Segment,
40    SegmentIndex, Storage, StorageError, StorageProvider,
41};
42
43pub mod io;
44pub use io::*;
45
46/// Maximum depth of fact indices before compaction.
47///
48/// A lower value will speed up search queries but require more compaction,
49/// slowing down fact index creation and using more storage space.
50///
51/// In the future, this may be configurable at runtime or dynamic based on
52/// heuristics such as fact density.
53///
54/// 16 is our initial guess for balance.
55///
56/// This must be at least 2.
57const MAX_FACT_INDEX_DEPTH: usize = 16;
58
59pub struct LinearStorageProvider<FM: IoManager> {
60    manager: FM,
61    storage: BTreeMap<GraphId, LinearStorage<FM::Writer>>,
62}
63
64pub struct LinearStorage<W> {
65    writer: W,
66}
67
68#[derive(Debug)]
69pub struct LinearSegment<R> {
70    repr: SegmentRepr,
71    reader: R,
72}
73
74#[derive(Debug, Serialize, Deserialize)]
75struct SegmentRepr {
76    /// Self offset in file.
77    offset: SegmentIndex,
78    prior: Prior<Location>,
79    parents: Prior<Address>,
80    policy: PolicyId,
81    /// Offset in file to associated fact index.
82    facts: usize,
83    commands: Vec1<CommandData>,
84    max_cut: MaxCut,
85    skip_list: Vec<Location>,
86}
87
88#[derive(Debug, Serialize, Deserialize)]
89struct CommandData {
90    id: CmdId,
91    priority: Priority,
92    policy: Option<Bytes>,
93    data: Bytes,
94    updates: Vec<Update>,
95}
96
97pub struct LinearCommand<'a> {
98    id: &'a CmdId,
99    parent: Prior<Address>,
100    priority: Priority,
101    policy: Option<&'a [u8]>,
102    data: &'a [u8],
103    max_cut: MaxCut,
104}
105
106type Bytes = Box<[u8]>;
107
108type Update = (String, Keys, Option<Bytes>);
109type FactMap = BTreeMap<Keys, Option<Box<[u8]>>>;
110type NamedFactMap = BTreeMap<String, FactMap>;
111
112#[derive(Debug)]
113pub struct LinearFactIndex<R> {
114    repr: FactIndexRepr,
115    reader: R,
116}
117
118#[derive(Debug, Serialize, Deserialize)]
119struct FactIndexRepr {
120    /// Self offset in file.
121    offset: usize,
122    /// Offset of prior fact index.
123    prior: Option<usize>,
124    /// Depth of this fact index.
125    ///
126    /// `prior.depth + 1`, or just `1` if no prior
127    depth: usize,
128    /// Facts in sorted order
129    facts: NamedFactMap,
130}
131
132#[derive(Debug)]
133pub struct LinearPerspective<R> {
134    prior: Prior<Location>,
135    parents: Prior<Address>,
136    policy: PolicyId,
137    facts: LinearFactPerspective<R>,
138    commands: Vec<CommandData>,
139    current_updates: Vec<Update>,
140    max_cut: MaxCut,
141    last_common_ancestor: Option<Location>,
142}
143
144impl<R> LinearPerspective<R> {
145    fn new(
146        prior: Prior<Location>,
147        parents: Prior<Address>,
148        policy: PolicyId,
149        prior_facts: FactPerspectivePrior<R>,
150        max_cut: MaxCut,
151        last_common_ancestor: Option<Location>,
152    ) -> Self {
153        Self {
154            prior,
155            parents,
156            policy,
157            facts: LinearFactPerspective::new(prior_facts),
158            commands: Vec::new(),
159            current_updates: Vec::new(),
160            max_cut,
161            last_common_ancestor,
162        }
163    }
164}
165
166#[derive(Debug)]
167pub struct LinearFactPerspective<R> {
168    map: BTreeMap<String, BTreeMap<Keys, Option<Bytes>>>,
169    prior: FactPerspectivePrior<R>,
170}
171
172impl<R> LinearFactPerspective<R> {
173    fn new(prior: FactPerspectivePrior<R>) -> Self {
174        Self {
175            map: BTreeMap::new(),
176            prior,
177        }
178    }
179}
180
181#[derive(Debug)]
182enum FactPerspectivePrior<R> {
183    None,
184    FactPerspective(Box<LinearFactPerspective<R>>),
185    FactIndex { offset: usize, reader: R },
186}
187
188impl<R> FactPerspectivePrior<R> {
189    fn is_none(&self) -> bool {
190        matches!(self, Self::None)
191    }
192}
193
194impl<FM: IoManager + Default> Default for LinearStorageProvider<FM> {
195    fn default() -> Self {
196        Self {
197            manager: FM::default(),
198            storage: BTreeMap::new(),
199        }
200    }
201}
202
203impl<FM: IoManager> LinearStorageProvider<FM> {
204    pub fn new(manager: FM) -> Self {
205        Self {
206            manager,
207            storage: BTreeMap::new(),
208        }
209    }
210}
211
212impl<FM: IoManager> StorageProvider for LinearStorageProvider<FM> {
213    type Perspective = LinearPerspective<<FM::Writer as Write>::ReadOnly>;
214    type Segment = LinearSegment<<FM::Writer as Write>::ReadOnly>;
215    type Storage = LinearStorage<FM::Writer>;
216
217    fn new_perspective(&mut self, policy_id: PolicyId) -> Self::Perspective {
218        LinearPerspective::new(
219            Prior::None,
220            Prior::None,
221            policy_id,
222            FactPerspectivePrior::None,
223            MaxCut(0),
224            None,
225        )
226    }
227
228    fn new_storage(
229        &mut self,
230        init: Self::Perspective,
231    ) -> Result<(GraphId, &mut Self::Storage), StorageError> {
232        use alloc::collections::btree_map::Entry;
233
234        if init.commands.is_empty() {
235            return Err(StorageError::EmptyPerspective);
236        }
237        let graph_id = GraphId::transmute(init.commands[0].id);
238        let Entry::Vacant(entry) = self.storage.entry(graph_id) else {
239            return Err(StorageError::StorageExists);
240        };
241
242        let file = self.manager.create(graph_id)?;
243        Ok((graph_id, entry.insert(LinearStorage::create(file, init)?)))
244    }
245
246    fn get_storage(&mut self, graph: GraphId) -> Result<&mut Self::Storage, StorageError> {
247        use alloc::collections::btree_map::Entry;
248
249        let entry = match self.storage.entry(graph) {
250            Entry::Vacant(v) => v,
251            Entry::Occupied(o) => return Ok(o.into_mut()),
252        };
253
254        let file = self
255            .manager
256            .open(graph)?
257            .ok_or(StorageError::NoSuchStorage)?;
258        Ok(entry.insert(LinearStorage::open(file)?))
259    }
260
261    fn remove_storage(&mut self, graph: GraphId) -> Result<(), StorageError> {
262        self.manager.remove(graph)?;
263
264        self.storage
265            .remove(&graph)
266            .ok_or(StorageError::NoSuchStorage)?;
267
268        Ok(())
269    }
270
271    fn list_graph_ids(
272        &mut self,
273    ) -> Result<impl Iterator<Item = Result<GraphId, StorageError>>, StorageError> {
274        self.manager.list()
275    }
276}
277
278impl<W: Write> LinearStorage<W> {
279    fn get_skip(
280        &self,
281        segment: <Self as Storage>::Segment,
282        max_cut: MaxCut,
283    ) -> Result<Option<Location>, StorageError> {
284        let mut head = segment;
285        let mut current = None;
286        'outer: loop {
287            if max_cut > head.longest_max_cut()? {
288                return Ok(current);
289            }
290            current = Some(head.first_location());
291            if max_cut >= head.shortest_max_cut() {
292                return Ok(current);
293            }
294            // Assumes skip list is sorted in ascending order.
295            // We always want to skip as close to the root as possible.
296            for skip in head.skip_list() {
297                if skip.max_cut <= max_cut {
298                    head = self.get_segment(*skip)?;
299                    continue 'outer;
300                }
301            }
302            head = match head.prior() {
303                Prior::None | Prior::Merge(_, _) => {
304                    return Ok(current);
305                }
306                Prior::Single(l) => self.get_segment(l)?,
307            }
308        }
309    }
310}
311
312impl<W: Write> LinearStorage<W> {
313    fn create(mut writer: W, init: LinearPerspective<W::ReadOnly>) -> Result<Self, StorageError> {
314        assert!(matches!(init.prior, Prior::None));
315        assert!(matches!(init.parents, Prior::None));
316        assert!(matches!(init.facts.prior, FactPerspectivePrior::None));
317
318        let mut map = init.facts.map;
319        map.retain(|_, kv| !kv.is_empty());
320
321        let facts = writer
322            .append(|offset| FactIndexRepr {
323                offset,
324                prior: None,
325                depth: 1,
326                facts: map,
327            })?
328            .offset;
329
330        let commands = init
331            .commands
332            .try_into()
333            .map_err(|_| StorageError::EmptyPerspective)?;
334        let segment = writer.append(|offset| SegmentRepr {
335            offset: SegmentIndex(offset),
336            prior: Prior::None,
337            parents: Prior::None,
338            policy: init.policy,
339            facts,
340            commands,
341            max_cut: MaxCut(0),
342            skip_list: vec![],
343        })?;
344
345        let head = Location::new(
346            segment.offset,
347            segment
348                .max_cut
349                .checked_add(
350                    segment
351                        .commands
352                        .len()
353                        .checked_sub(1)
354                        .assume("vec1 length >= 1")?,
355                )
356                .assume("valid max cut")?,
357        );
358
359        writer.commit(head)?;
360
361        let storage = Self { writer };
362
363        Ok(storage)
364    }
365
366    fn open(writer: W) -> Result<Self, StorageError> {
367        Ok(Self { writer })
368    }
369
370    fn compact(&mut self, mut repr: FactIndexRepr) -> Result<FactIndexRepr, StorageError> {
371        let mut map = NamedFactMap::new();
372        let reader = self.writer.readonly();
373        loop {
374            for (name, kv) in repr.facts {
375                let sub = map.entry(name).or_default();
376                for (k, v) in kv {
377                    sub.entry(k).or_insert(v);
378                }
379            }
380            let Some(offset) = repr.prior else { break };
381            repr = reader.fetch(offset)?;
382        }
383
384        // Since there's no prior, we can remove tombstones
385        map.retain(|_, kv| {
386            kv.retain(|_, v| v.is_some());
387            !kv.is_empty()
388        });
389
390        Ok(self
391            .write_facts(LinearFactPerspective {
392                map,
393                prior: FactPerspectivePrior::None,
394            })?
395            .repr)
396    }
397}
398
399impl<F: Write> Storage for LinearStorage<F> {
400    type Perspective = LinearPerspective<F::ReadOnly>;
401    type FactPerspective = LinearFactPerspective<F::ReadOnly>;
402    type Segment = LinearSegment<F::ReadOnly>;
403    type FactIndex = LinearFactIndex<F::ReadOnly>;
404
405    fn get_linear_perspective(&self, parent: Location) -> Result<Self::Perspective, StorageError> {
406        let segment = self.get_segment(parent)?;
407        let command = segment
408            .get_command(parent)
409            .ok_or(StorageError::CommandOutOfBounds(parent))?;
410        let policy = segment.repr.policy;
411        let prior_facts: FactPerspectivePrior<F::ReadOnly> = if parent == segment.head_location()? {
412            FactPerspectivePrior::FactIndex {
413                offset: segment.repr.facts,
414                reader: self.writer.readonly(),
415            }
416        } else {
417            let prior = match segment.facts()?.repr.prior {
418                Some(offset) => FactPerspectivePrior::FactIndex {
419                    offset,
420                    reader: self.writer.readonly(),
421                },
422                None => FactPerspectivePrior::None,
423            };
424            let mut facts = LinearFactPerspective::new(prior);
425            for data in &segment.repr.commands[..=segment.repr.cmd_index(parent.max_cut)?] {
426                facts.apply_updates(&data.updates);
427            }
428            if facts.prior.is_none() {
429                facts.map.retain(|_, kv| !kv.is_empty());
430            }
431            if facts.map.is_empty() {
432                facts.prior
433            } else {
434                FactPerspectivePrior::FactPerspective(Box::new(facts))
435            }
436        };
437        let prior = Prior::Single(parent);
438
439        let perspective = LinearPerspective::new(
440            prior,
441            Prior::Single(command.address()?),
442            policy,
443            prior_facts,
444            command
445                .max_cut()?
446                .checked_add(1)
447                .assume("must not overflow")?,
448            None,
449        );
450
451        Ok(perspective)
452    }
453
454    fn get_fact_perspective(
455        &self,
456        location: Location,
457    ) -> Result<Self::FactPerspective, StorageError> {
458        let segment = self.get_segment(location)?;
459
460        // If at head of segment, or no facts in segment,
461        // we don't need to apply updates.
462        if location == segment.head_location()?
463            || segment
464                .repr
465                .commands
466                .iter()
467                .all(|cmd| cmd.updates.is_empty())
468        {
469            return Ok(LinearFactPerspective::new(
470                FactPerspectivePrior::FactIndex {
471                    offset: segment.repr.facts,
472                    reader: self.writer.readonly(),
473                },
474            ));
475        }
476
477        let prior = match segment.facts()?.repr.prior {
478            Some(offset) => FactPerspectivePrior::FactIndex {
479                offset,
480                reader: self.writer.readonly(),
481            },
482            None => FactPerspectivePrior::None,
483        };
484        let mut facts = LinearFactPerspective::new(prior);
485        for data in &segment.repr.commands[..=segment.repr.cmd_index(location.max_cut)?] {
486            facts.apply_updates(&data.updates);
487        }
488
489        Ok(facts)
490    }
491
492    fn new_merge_perspective(
493        &self,
494        left: Location,
495        right: Location,
496        last_common_ancestor: Location,
497        policy_id: PolicyId,
498        braid: Self::FactIndex,
499    ) -> Result<Self::Perspective, StorageError> {
500        // TODO(jdygert): ensure braid belongs to this storage.
501        // TODO(jdygert): ensure braid ends at given command?
502        let left_segment = self.get_segment(left)?;
503        let left_command = left_segment
504            .get_command(left)
505            .ok_or(StorageError::CommandOutOfBounds(left))?;
506        let right_segment = self.get_segment(right)?;
507        let right_command = right_segment
508            .get_command(right)
509            .ok_or(StorageError::CommandOutOfBounds(right))?;
510
511        let parent = Prior::Merge(left_command.address()?, right_command.address()?);
512
513        if policy_id != left_segment.policy() && policy_id != right_segment.policy() {
514            return Err(StorageError::PolicyMismatch);
515        }
516
517        let prior = Prior::Merge(left, right);
518
519        let perspective = LinearPerspective::new(
520            prior,
521            parent,
522            policy_id,
523            FactPerspectivePrior::FactIndex {
524                offset: braid.repr.offset,
525                reader: braid.reader,
526            },
527            left_command
528                .max_cut()?
529                .max(right_command.max_cut()?)
530                .checked_add(1)
531                .assume("must not overflow")?,
532            Some(last_common_ancestor),
533        );
534
535        Ok(perspective)
536    }
537
538    fn get_segment(&self, location: Location) -> Result<Self::Segment, StorageError> {
539        let reader = self.writer.readonly();
540        let repr = reader.fetch(location.segment.0)?;
541        let seg = LinearSegment { repr, reader };
542
543        Ok(seg)
544    }
545
546    fn get_head(&self) -> Result<Location, StorageError> {
547        self.writer.head()
548    }
549
550    fn commit(&mut self, segment: Self::Segment) -> Result<(), StorageError> {
551        if !self.is_ancestor(self.get_head()?, &segment)? {
552            return Err(StorageError::HeadNotAncestor);
553        }
554
555        self.writer.commit(segment.head_location()?)
556    }
557
558    fn write(&mut self, perspective: Self::Perspective) -> Result<Self::Segment, StorageError> {
559        // TODO(jdygert): Validate prior?
560
561        let facts = self.write_facts(perspective.facts)?.repr.offset;
562
563        let commands: Vec1<CommandData> = perspective
564            .commands
565            .try_into()
566            .map_err(|_| StorageError::EmptyPerspective)?;
567
568        let get_skips = |l: Location, count: usize| -> Result<Vec<Location>, StorageError> {
569            let mut rng = Rng;
570            let mut skips = vec![];
571            for _ in 0..count {
572                let segment = self.get_segment(l)?;
573                if l.max_cut > MaxCut(0) {
574                    let max_cut = MaxCut(rng.gen_range(0..l.max_cut.0));
575                    if let Some(skip) = self.get_skip(segment, max_cut)? {
576                        if !skips.contains(&skip) {
577                            skips.push(skip);
578                        }
579                    } else {
580                        break;
581                    }
582                }
583            }
584            Ok(skips)
585        };
586
587        let skip_list = match perspective.prior {
588            Prior::None => vec![],
589            Prior::Merge(_, _) => {
590                let lca = perspective.last_common_ancestor.assume("lca must exist")?;
591                let mut skips = get_skips(lca, 2)?;
592                if !skips.contains(&lca) {
593                    skips.push(lca);
594                }
595                skips.sort();
596                skips
597            }
598            Prior::Single(l) => {
599                let mut skips = get_skips(l, 3)?;
600                skips.sort();
601                skips
602            }
603        };
604        let repr = self.writer.append(|offset| SegmentRepr {
605            offset: SegmentIndex(offset),
606            prior: perspective.prior,
607            parents: perspective.parents,
608            policy: perspective.policy,
609            facts,
610            commands,
611            max_cut: perspective.max_cut,
612            skip_list,
613        })?;
614
615        Ok(LinearSegment {
616            repr,
617            reader: self.writer.readonly(),
618        })
619    }
620
621    fn write_facts(
622        &mut self,
623        facts: Self::FactPerspective,
624    ) -> Result<Self::FactIndex, StorageError> {
625        let mut prior = match facts.prior {
626            FactPerspectivePrior::None => None,
627            FactPerspectivePrior::FactPerspective(prior) => {
628                let prior = self.write_facts(*prior)?;
629                if facts.map.is_empty() {
630                    return Ok(prior);
631                }
632                Some(prior.repr)
633            }
634            FactPerspectivePrior::FactIndex { offset, reader } => {
635                let repr = reader.fetch(offset)?;
636                if facts.map.is_empty() {
637                    return Ok(LinearFactIndex { repr, reader });
638                }
639                Some(repr)
640            }
641        };
642
643        let depth = if let Some(mut p) = prior.take() {
644            if p.depth > MAX_FACT_INDEX_DEPTH - 1 {
645                p = self.compact(p)?;
646            }
647            prior.insert(p).depth
648        } else {
649            0
650        };
651
652        let depth = depth.checked_add(1).assume("depth won't overflow")?;
653
654        if depth > MAX_FACT_INDEX_DEPTH {
655            bug!("fact index too deep");
656        }
657
658        let repr = self.writer.append(|offset| FactIndexRepr {
659            offset,
660            prior: prior.map(|p| p.offset),
661            depth,
662            facts: facts.map,
663        })?;
664
665        Ok(LinearFactIndex {
666            repr,
667            reader: self.writer.readonly(),
668        })
669    }
670}
671
672impl<R: Read> Segment for LinearSegment<R> {
673    type FactIndex = LinearFactIndex<R>;
674    type Command<'a>
675        = LinearCommand<'a>
676    where
677        R: 'a;
678
679    fn index(&self) -> SegmentIndex {
680        self.repr.offset
681    }
682
683    fn head_id(&self) -> CmdId {
684        self.repr.commands.last().id
685    }
686
687    fn first_location(&self) -> Location {
688        Location::new(self.repr.offset, self.repr.max_cut)
689    }
690
691    fn policy(&self) -> PolicyId {
692        self.repr.policy
693    }
694
695    fn prior(&self) -> Prior<Location> {
696        self.repr.prior
697    }
698
699    fn get_command(&self, location: Location) -> Option<Self::Command<'_>> {
700        if self.repr.offset != location.segment {
701            return None;
702        }
703        let cmd_idx = self.repr.cmd_index(location.max_cut).ok()?;
704        let data = self.repr.commands.get(cmd_idx)?;
705        let parent = if let Some(prev) = usize::checked_sub(cmd_idx, 1) {
706            if let Some(max_cut) = self.repr.max_cut.checked_add(prev) {
707                Prior::Single(Address {
708                    id: self.repr.commands[prev].id,
709                    max_cut,
710                })
711            } else {
712                return None;
713            }
714        } else {
715            self.repr.parents
716        };
717        Some(LinearCommand {
718            id: &data.id,
719            parent,
720            priority: data.priority.clone(),
721            policy: data.policy.as_deref(),
722            data: &data.data,
723            max_cut: location.max_cut,
724        })
725    }
726
727    fn facts(&self) -> Result<Self::FactIndex, StorageError> {
728        Ok(LinearFactIndex {
729            repr: self.reader.fetch(self.repr.facts)?,
730            reader: self.reader.clone(),
731        })
732    }
733
734    fn skip_list(&self) -> &[Location] {
735        &self.repr.skip_list
736    }
737
738    fn shortest_max_cut(&self) -> MaxCut {
739        self.repr.max_cut
740    }
741
742    fn longest_max_cut(&self) -> Result<MaxCut, StorageError> {
743        Ok(self
744            .repr
745            .max_cut
746            .checked_add(
747                self.repr
748                    .commands
749                    .len()
750                    .checked_sub(1)
751                    .assume("must not overflow")?,
752            )
753            .assume("must not overflow")?)
754    }
755}
756
757impl SegmentRepr {
758    fn cmd_index(&self, max_cut: MaxCut) -> Result<usize, StorageError> {
759        max_cut
760            .distance_from(self.max_cut)
761            .ok_or(StorageError::CommandOutOfBounds(Location::new(
762                self.offset,
763                max_cut,
764            )))
765    }
766}
767
768impl<R: Read> FactIndex for LinearFactIndex<R> {}
769
770type MapIter = alloc::collections::btree_map::IntoIter<Keys, Option<Bytes>>;
771pub struct QueryIterator {
772    it: MapIter,
773}
774
775impl QueryIterator {
776    fn new(it: MapIter) -> Self {
777        Self { it }
778    }
779}
780
781impl Iterator for QueryIterator {
782    type Item = Result<Fact, StorageError>;
783    fn next(&mut self) -> Option<Self::Item> {
784        loop {
785            // filter out tombstones
786            if let (key, Some(value)) = self.it.next()? {
787                return Some(Ok(Fact { key, value }));
788            }
789        }
790    }
791}
792
793impl<R: Read> Query for LinearFactIndex<R> {
794    fn query(&self, name: &str, keys: &[Box<[u8]>]) -> Result<Option<Box<[u8]>>, StorageError> {
795        let mut prior = Some(&self.repr);
796        let mut slot; // Need to store deserialized value.
797        while let Some(facts) = prior {
798            if let Some(v) = facts.facts.get(name).and_then(|m| m.get(keys)) {
799                return Ok(v.clone());
800            }
801            slot = facts.prior.map(|p| self.reader.fetch(p)).transpose()?;
802            prior = slot.as_ref();
803        }
804        Ok(None)
805    }
806
807    type QueryIterator = QueryIterator;
808    fn query_prefix(
809        &self,
810        name: &str,
811        prefix: &[Box<[u8]>],
812    ) -> Result<QueryIterator, StorageError> {
813        Ok(QueryIterator::new(
814            self.query_prefix_inner(name, prefix)?.into_iter(),
815        ))
816    }
817}
818
819impl<R: Read> LinearFactIndex<R> {
820    fn query_prefix_inner(
821        &self,
822        name: &str,
823        prefix: &[Box<[u8]>],
824    ) -> Result<FactMap, StorageError> {
825        let mut matches = BTreeMap::new();
826        let mut prior = Some(&self.repr);
827        let mut slot; // Need to store deserialized value.
828        while let Some(facts) = prior {
829            if let Some(map) = facts.facts.get(name) {
830                for (k, v) in super::memory::find_prefixes(map, prefix) {
831                    // don't override, if we've already found the fact (including deletions)
832                    if !matches.contains_key(k) {
833                        matches.insert(k.clone(), v.map(Into::into));
834                    }
835                }
836            }
837            slot = facts.prior.map(|p| self.reader.fetch(p)).transpose()?;
838            prior = slot.as_ref();
839        }
840        Ok(matches)
841    }
842}
843
844impl<R> LinearFactPerspective<R> {
845    fn clear(&mut self) {
846        self.map.clear();
847    }
848
849    fn apply_updates(&mut self, updates: &[Update]) {
850        for (name, key, value) in updates {
851            if self.prior.is_none() {
852                if let Some(value) = value {
853                    self.map
854                        .entry(name.clone())
855                        .or_default()
856                        .insert(key.clone(), Some(value.clone()));
857                } else if let Some(e) = self.map.get_mut(name) {
858                    e.remove(key);
859                }
860            } else {
861                self.map
862                    .entry(name.clone())
863                    .or_default()
864                    .insert(key.clone(), value.clone());
865            }
866        }
867    }
868}
869
870impl<R: Read> FactPerspective for LinearFactPerspective<R> {}
871
872impl<R: Read> Query for LinearFactPerspective<R> {
873    fn query(&self, name: &str, keys: &[Box<[u8]>]) -> Result<Option<Box<[u8]>>, StorageError> {
874        if let Some(wrapped) = self.map.get(name).and_then(|m| m.get(keys)) {
875            return Ok(wrapped.as_deref().map(Box::from));
876        }
877        match &self.prior {
878            FactPerspectivePrior::None => Ok(None),
879            FactPerspectivePrior::FactPerspective(prior) => prior.query(name, keys),
880            FactPerspectivePrior::FactIndex { offset, reader } => {
881                let repr: FactIndexRepr = reader.fetch(*offset)?;
882                let prior = LinearFactIndex {
883                    repr,
884                    reader: reader.clone(),
885                };
886                prior.query(name, keys)
887            }
888        }
889    }
890
891    type QueryIterator = QueryIterator;
892    fn query_prefix(
893        &self,
894        name: &str,
895        prefix: &[Box<[u8]>],
896    ) -> Result<QueryIterator, StorageError> {
897        Ok(QueryIterator::new(
898            self.query_prefix_inner(name, prefix)?.into_iter(),
899        ))
900    }
901}
902
903impl<R: Read> LinearFactPerspective<R> {
904    fn query_prefix_inner(
905        &self,
906        name: &str,
907        prefix: &[Box<[u8]>],
908    ) -> Result<FactMap, StorageError> {
909        let mut matches = match &self.prior {
910            FactPerspectivePrior::None => BTreeMap::new(),
911            FactPerspectivePrior::FactPerspective(prior) => {
912                prior.query_prefix_inner(name, prefix)?
913            }
914            FactPerspectivePrior::FactIndex { offset, reader } => {
915                let repr: FactIndexRepr = reader.fetch(*offset)?;
916                let prior = LinearFactIndex {
917                    repr,
918                    reader: reader.clone(),
919                };
920                prior.query_prefix_inner(name, prefix)?
921            }
922        };
923        if let Some(map) = self.map.get(name) {
924            for (k, v) in super::memory::find_prefixes(map, prefix) {
925                // overwrite "earlier" facts
926                matches.insert(k.clone(), v.map(Into::into));
927            }
928        }
929        Ok(matches)
930    }
931}
932
933impl<R: Read> QueryMut for LinearFactPerspective<R> {
934    fn insert(&mut self, name: String, keys: Keys, value: Bytes) {
935        self.map.entry(name).or_default().insert(keys, Some(value));
936    }
937
938    fn delete(&mut self, name: String, keys: Keys) {
939        if self.prior.is_none() {
940            // No need for tombstones with no prior.
941            if let Some(kv) = self.map.get_mut(&name) {
942                kv.remove(&keys);
943            }
944        } else {
945            self.map.entry(name).or_default().insert(keys, None);
946        }
947    }
948}
949
950impl<R: Read> FactPerspective for LinearPerspective<R> {}
951
952impl<R: Read> Query for LinearPerspective<R> {
953    fn query(&self, name: &str, keys: &[Box<[u8]>]) -> Result<Option<Box<[u8]>>, StorageError> {
954        self.facts.query(name, keys)
955    }
956
957    type QueryIterator = QueryIterator;
958    fn query_prefix(
959        &self,
960        name: &str,
961        prefix: &[Box<[u8]>],
962    ) -> Result<QueryIterator, StorageError> {
963        self.facts.query_prefix(name, prefix)
964    }
965}
966
967impl<R: Read> QueryMut for LinearPerspective<R> {
968    fn insert(&mut self, name: String, keys: Keys, value: Bytes) {
969        self.facts.insert(name.clone(), keys.clone(), value.clone());
970        self.current_updates.push((name, keys, Some(value)));
971    }
972
973    fn delete(&mut self, name: String, keys: Keys) {
974        self.facts.delete(name.clone(), keys.clone());
975        self.current_updates.push((name, keys, None));
976    }
977}
978
979impl<R: Read> Revertable for LinearPerspective<R> {
980    fn checkpoint(&self) -> Checkpoint {
981        Checkpoint {
982            index: self.commands.len(),
983        }
984    }
985
986    fn revert(&mut self, checkpoint: Checkpoint) -> Result<(), Bug> {
987        if checkpoint.index == self.commands.len() {
988            return Ok(());
989        }
990
991        if checkpoint.index > self.commands.len() {
992            bug!(
993                "A checkpoint's index should always be less than or equal to the length of a perspective's command history!"
994            );
995        }
996
997        self.commands.truncate(checkpoint.index);
998        self.facts.clear();
999        self.current_updates.clear();
1000        for data in &self.commands {
1001            self.facts.apply_updates(&data.updates);
1002        }
1003
1004        Ok(())
1005    }
1006}
1007
1008impl<R: Read> Perspective for LinearPerspective<R> {
1009    fn policy(&self) -> PolicyId {
1010        self.policy
1011    }
1012
1013    fn add_command(&mut self, command: &impl Command) -> Result<usize, StorageError> {
1014        if command.parent() != self.head_address()? {
1015            return Err(StorageError::PerspectiveHeadMismatch);
1016        }
1017
1018        self.commands.push(CommandData {
1019            id: command.id(),
1020            priority: command.priority(),
1021            policy: command.policy().map(Box::from),
1022            data: command.bytes().into(),
1023            updates: core::mem::take(&mut self.current_updates),
1024        });
1025        Ok(self.commands.len())
1026    }
1027
1028    fn includes(&self, id: CmdId) -> bool {
1029        self.commands.iter().any(|cmd| cmd.id == id)
1030    }
1031
1032    fn head_address(&self) -> Result<Prior<Address>, Bug> {
1033        Ok(if let Some(last) = self.commands.last() {
1034            Prior::Single(Address {
1035                id: last.id,
1036                max_cut: self
1037                    .max_cut
1038                    .checked_add(
1039                        self.commands
1040                            .len()
1041                            .checked_sub(1)
1042                            .assume("must not overflow")?,
1043                    )
1044                    .assume("must not overflow")?,
1045            })
1046        } else {
1047            self.parents
1048        })
1049    }
1050}
1051
1052impl From<Prior<Address>> for Prior<CmdId> {
1053    fn from(p: Prior<Address>) -> Self {
1054        match p {
1055            Prior::None => Self::None,
1056            Prior::Single(l) => Self::Single(l.id),
1057            Prior::Merge(l, r) => Self::Merge(l.id, r.id),
1058        }
1059    }
1060}
1061
1062impl Command for LinearCommand<'_> {
1063    fn priority(&self) -> Priority {
1064        self.priority.clone()
1065    }
1066
1067    fn id(&self) -> CmdId {
1068        *self.id
1069    }
1070
1071    fn parent(&self) -> Prior<Address> {
1072        self.parent
1073    }
1074
1075    fn policy(&self) -> Option<&[u8]> {
1076        self.policy
1077    }
1078
1079    fn bytes(&self) -> &[u8] {
1080        self.data
1081    }
1082
1083    fn max_cut(&self) -> Result<MaxCut, Bug> {
1084        Ok(self.max_cut)
1085    }
1086}
1087
1088#[cfg(test)]
1089mod test {
1090    use testing::Manager;
1091
1092    use super::*;
1093    use crate::testing::dsl::{StorageBackend, test_suite};
1094
1095    #[test]
1096    fn test_query_prefix() {
1097        let mut provider = LinearStorageProvider::new(Manager::new());
1098        let mut fp = provider.new_perspective(PolicyId::new(0));
1099
1100        let name = "x";
1101
1102        let keys: &[&[&str]] = &[
1103            &["aa", "xy", "123"],
1104            &["aa", "xz", "123"],
1105            &["bb", "ccc"],
1106            &["bc", ""],
1107        ];
1108        let keys: Vec<Keys> = keys
1109            .iter()
1110            .map(|ks| ks.iter().map(|k| k.as_bytes()).collect())
1111            .collect();
1112
1113        for ks in &keys {
1114            fp.insert(
1115                name.into(),
1116                ks.clone(),
1117                format!("{ks:?}").into_bytes().into(),
1118            );
1119        }
1120
1121        let prefixes: &[&[&str]] = &[
1122            &["aa", "xy", "12"],
1123            &["aa", "xy"],
1124            &["aa", "xz"],
1125            &["aa", "x"],
1126            &["bb", ""],
1127            &["bb", "ccc"],
1128            &["bc", ""],
1129            &["bc", "", ""],
1130        ];
1131
1132        for prefix in prefixes {
1133            let prefix: Keys = prefix.iter().map(|k| k.as_bytes()).collect();
1134            let found: Vec<_> = fp.query_prefix(name, &prefix).unwrap().collect();
1135            let mut expected: Vec<_> = keys.iter().filter(|k| k.starts_with(&prefix)).collect();
1136            expected.sort();
1137            assert_eq!(found.len(), expected.len());
1138            for (a, b) in std::iter::zip(found, expected) {
1139                let a = a.unwrap();
1140                assert_eq!(&a.key, b);
1141                assert_eq!(a.value.as_ref(), format!("{b:?}").as_bytes());
1142            }
1143        }
1144    }
1145
1146    struct LinearBackend;
1147    impl StorageBackend for LinearBackend {
1148        type StorageProvider = LinearStorageProvider<Manager>;
1149
1150        fn provider(&mut self, _client_id: u64) -> Self::StorageProvider {
1151            LinearStorageProvider::new(Manager::new())
1152        }
1153    }
1154    test_suite!(|| LinearBackend);
1155}