aranya_runtime/storage/linear/
mod.rs

//! Persistent linear storage implementation.
2//!
3//! `LinearStorage` is a graph storage implementation backed by a file-like byte
4//! storage interface. This is designed to be usable across many environments
5//! with minimal assumptions on the underlying storage.
6//!
7//! # Layout
8//!
9//! `[x]` is page aligned.
10//!
11//! ```text
12//! // Control section
13//! [Base] [Root] [Root]
14//! // Data section
15//! [Segment or FactIndex]
16//! |
17//! V
18//! ```
19//!
20//! The `LinearStorage` will exclusively modify the control section. The data
21//! section is append-only but can be read concurrently. If written data is not
22//! committed, it may be overwritten and will become unreachable by intended
23//! means.
24
25pub mod libc;
26
27#[cfg(feature = "testing")]
28pub mod testing;
29
30use alloc::{boxed::Box, collections::BTreeMap, string::String, vec, vec::Vec};
31
32use aranya_crypto::{csprng::rand::Rng as _, Csprng, Rng};
33use buggy::{bug, Bug, BugExt};
34use serde::{Deserialize, Serialize};
35use vec1::Vec1;
36
37use crate::{
38    Address, Checkpoint, Command, CommandId, Fact, FactIndex, FactPerspective, GraphId, Keys,
39    Location, Perspective, PolicyId, Prior, Priority, Query, QueryMut, Revertable, Segment,
40    Storage, StorageError, StorageProvider,
41};
42
43pub mod io;
44pub use io::*;
45
/// Maximum depth of fact indices before compaction.
///
/// A lower value will speed up search queries but require more compaction,
/// slowing down fact index creation and using more storage space.
///
/// In the future, this may be configurable at runtime or dynamic based on
/// heuristics such as fact density.
///
/// 16 is our initial guess for balance.
///
/// This must be at least 2: a freshly compacted index has depth 1, and the
/// index written on top of it has depth 2.
const MAX_FACT_INDEX_DEPTH: usize = 16;

// Enforce the documented lower bound at compile time rather than relying on
// the runtime "fact index too deep" check.
const _: () = assert!(
    MAX_FACT_INDEX_DEPTH >= 2,
    "MAX_FACT_INDEX_DEPTH must be at least 2"
);
58
/// [`StorageProvider`] that keeps one [`LinearStorage`] per graph, creating
/// and opening the backing writers through an [`IoManager`].
pub struct LinearStorageProvider<FM: IoManager> {
    /// Creates, opens and lists the per-graph writers.
    manager: FM,
    /// Storages created or opened so far, keyed by graph ID.
    storage: BTreeMap<GraphId, LinearStorage<FM::Writer>>,
}
63
/// Graph storage that reads and writes everything through a single writer `W`.
pub struct LinearStorage<W> {
    /// Appends data, commits the head, and hands out read-only handles.
    writer: W,
}
67
/// A segment loaded from linear storage.
#[derive(Debug)]
pub struct LinearSegment<R> {
    /// Deserialized segment contents.
    repr: SegmentRepr,
    /// Reader used to fetch related data (such as the fact index) on demand.
    reader: R,
}
73
/// Serialized form of a segment as stored in the data section.
#[derive(Debug, Serialize, Deserialize)]
struct SegmentRepr {
    /// Self offset in file.
    offset: usize,
    /// Location(s) this segment follows: none, a single command, or the two
    /// sides of a merge.
    prior: Prior<Location>,
    /// Address(es) of the parent(s) of the first command in this segment.
    parents: Prior<Address>,
    /// Policy ID recorded for the segment.
    policy: PolicyId,
    /// Offset in file to associated fact index.
    facts: usize,
    /// Commands in this segment; never empty.
    commands: Vec1<CommandData>,
    /// Max cut of the first command; the command at index `i` has max cut
    /// `max_cut + i`.
    max_cut: usize,
    /// `(location, max cut)` pairs pointing at earlier points in the graph.
    /// Sorted before being written.
    skip_list: Vec<(Location, usize)>,
}
87
/// Serialized form of a single command within a segment.
#[derive(Debug, Serialize, Deserialize)]
struct CommandData {
    /// ID of the command.
    id: CommandId,
    /// Priority of the command.
    priority: Priority,
    /// Optional policy bytes attached to the command.
    policy: Option<Bytes>,
    /// Raw command bytes.
    data: Bytes,
    /// Fact updates made by this command as `(fact name, keys, value)`; a
    /// `None` value records a deletion.
    updates: Vec<Update>,
}
96
/// Borrowed view of one command stored in a [`LinearSegment`].
pub struct LinearCommand<'a> {
    /// ID of the command.
    id: &'a CommandId,
    /// Address of the previous command in the segment, or the segment's
    /// parents when this is the segment's first command.
    parent: Prior<Address>,
    /// Priority of the command.
    priority: Priority,
    /// Optional policy bytes attached to the command.
    policy: Option<&'a [u8]>,
    /// Raw command bytes.
    data: &'a [u8],
    /// Max cut of this command (segment max cut plus the command's index).
    max_cut: usize,
}
105
106type Bytes = Box<[u8]>;
107
108type Update = (String, Keys, Option<Bytes>);
109type FactMap = BTreeMap<Keys, Option<Box<[u8]>>>;
110type NamedFactMap = BTreeMap<String, FactMap>;
111
/// A fact index loaded from linear storage.
#[derive(Debug)]
pub struct LinearFactIndex<R> {
    /// Deserialized contents of this index.
    repr: FactIndexRepr,
    /// Reader used to fetch prior fact indices while querying.
    reader: R,
}
117
/// Serialized form of one layer of the fact index chain.
///
/// Each layer records only its own changes and points at the layer it was
/// written on top of via `prior`.
#[derive(Debug, Serialize, Deserialize)]
struct FactIndexRepr {
    /// Self offset in file.
    offset: usize,
    /// Offset of prior fact index.
    prior: Option<usize>,
    /// Depth of this fact index.
    ///
    /// `prior.depth + 1`, or just `1` if no prior
    depth: usize,
    /// Facts in sorted order. A `None` value is a tombstone that hides the
    /// fact in prior indices.
    facts: NamedFactMap,
}
131
/// In-memory perspective that is later written out as a new segment.
#[derive(Debug)]
pub struct LinearPerspective<R> {
    /// Location(s) the new segment will follow.
    prior: Prior<Location>,
    /// Address(es) of the parent command(s) of the first command.
    parents: Prior<Address>,
    /// Policy ID the segment will be written with.
    policy: PolicyId,
    /// Fact changes made in this perspective, layered over the prior facts.
    facts: LinearFactPerspective<R>,
    /// Commands collected so far.
    commands: Vec<CommandData>,
    /// NOTE(review): presumably the updates of the command currently being
    /// added; the code that fills this is outside this part of the file.
    current_updates: Vec<Update>,
    /// Max cut stored for the segment, i.e. the max cut of its first command.
    max_cut: usize,
    /// Last common ancestor and its max cut; required when `prior` is a merge,
    /// where it is added to the segment's skip list.
    last_common_ancestor: Option<(Location, usize)>,
}
143
144impl<R> LinearPerspective<R> {
145    fn new(
146        prior: Prior<Location>,
147        parents: Prior<Address>,
148        policy: PolicyId,
149        prior_facts: FactPerspectivePrior<R>,
150        max_cut: usize,
151        last_common_ancestor: Option<(Location, usize)>,
152    ) -> Self {
153        Self {
154            prior,
155            parents,
156            policy,
157            facts: LinearFactPerspective::new(prior_facts),
158            commands: Vec::new(),
159            current_updates: Vec::new(),
160            max_cut,
161            last_common_ancestor,
162        }
163    }
164}
165
/// A set of uncommitted fact changes layered over some prior fact state.
#[derive(Debug)]
pub struct LinearFactPerspective<R> {
    /// Changes made in this perspective, by fact name and then keys. A `None`
    /// value is a tombstone hiding the fact in `prior`.
    map: BTreeMap<String, BTreeMap<Keys, Option<Bytes>>>,
    /// The fact state these changes are applied on top of.
    prior: FactPerspectivePrior<R>,
}
171
172impl<R> LinearFactPerspective<R> {
173    fn new(prior: FactPerspectivePrior<R>) -> Self {
174        Self {
175            map: BTreeMap::new(),
176            prior,
177        }
178    }
179}
180
/// What a [`LinearFactPerspective`] is layered on top of.
#[derive(Debug)]
enum FactPerspectivePrior<R> {
    /// Nothing: the perspective holds the complete fact state.
    None,
    /// Another in-memory fact perspective.
    FactPerspective(Box<LinearFactPerspective<R>>),
    /// A stored fact index at `offset`, fetched through `reader` when needed.
    FactIndex { offset: usize, reader: R },
}
187
188impl<R> FactPerspectivePrior<R> {
189    fn is_none(&self) -> bool {
190        matches!(self, Self::None)
191    }
192}
193
194impl<FM: IoManager + Default> Default for LinearStorageProvider<FM> {
195    fn default() -> Self {
196        Self {
197            manager: FM::default(),
198            storage: BTreeMap::new(),
199        }
200    }
201}
202
203impl<FM: IoManager> LinearStorageProvider<FM> {
204    pub fn new(manager: FM) -> Self {
205        Self {
206            manager,
207            storage: BTreeMap::new(),
208        }
209    }
210}
211
212impl<FM: IoManager> StorageProvider for LinearStorageProvider<FM> {
213    type Perspective = LinearPerspective<<FM::Writer as Write>::ReadOnly>;
214    type Segment = LinearSegment<<FM::Writer as Write>::ReadOnly>;
215    type Storage = LinearStorage<FM::Writer>;
216
217    fn new_perspective(&mut self, policy_id: PolicyId) -> Self::Perspective {
218        LinearPerspective::new(
219            Prior::None,
220            Prior::None,
221            policy_id,
222            FactPerspectivePrior::None,
223            0,
224            None,
225        )
226    }
227
228    fn new_storage(
229        &mut self,
230        init: Self::Perspective,
231    ) -> Result<(GraphId, &mut Self::Storage), StorageError> {
232        use alloc::collections::btree_map::Entry;
233
234        if init.commands.is_empty() {
235            return Err(StorageError::EmptyPerspective);
236        }
237        let graph_id = GraphId::from(init.commands[0].id.into_id());
238        let Entry::Vacant(entry) = self.storage.entry(graph_id) else {
239            return Err(StorageError::StorageExists);
240        };
241
242        let file = self.manager.create(graph_id)?;
243        Ok((graph_id, entry.insert(LinearStorage::create(file, init)?)))
244    }
245
246    fn get_storage(&mut self, graph: GraphId) -> Result<&mut Self::Storage, StorageError> {
247        use alloc::collections::btree_map::Entry;
248
249        let entry = match self.storage.entry(graph) {
250            Entry::Vacant(v) => v,
251            Entry::Occupied(o) => return Ok(o.into_mut()),
252        };
253
254        let file = self
255            .manager
256            .open(graph)?
257            .ok_or(StorageError::NoSuchStorage)?;
258        Ok(entry.insert(LinearStorage::open(file)?))
259    }
260
261    fn list_graph_ids(
262        &mut self,
263    ) -> Result<impl Iterator<Item = Result<GraphId, StorageError>>, StorageError> {
264        self.manager.list()
265    }
266}
267
268impl<W: Write> LinearStorage<W> {
269    fn get_skip(
270        &self,
271        segment: <LinearStorage<W> as Storage>::Segment,
272        max_cut: usize,
273    ) -> Result<Option<(Location, usize)>, StorageError> {
274        let mut head = segment;
275        let mut current = None;
276        'outer: loop {
277            if max_cut > head.longest_max_cut()? {
278                return Ok(current);
279            }
280            current = Some((head.first_location(), head.shortest_max_cut()));
281            if max_cut >= head.shortest_max_cut() {
282                return Ok(current);
283            }
284            // Assumes skip list is sorted in ascending order.
285            // We always want to skip as close to the root as possible.
286            for (skip, skip_max_cut) in head.skip_list() {
287                if skip_max_cut <= &max_cut {
288                    head = self.get_segment(*skip)?;
289                    continue 'outer;
290                }
291            }
292            head = match head.prior() {
293                Prior::None | Prior::Merge(_, _) => {
294                    return Ok(current);
295                }
296                Prior::Single(l) => self.get_segment(l)?,
297            }
298        }
299    }
300}
301
302impl<W: Write> LinearStorage<W> {
303    fn create(mut writer: W, init: LinearPerspective<W::ReadOnly>) -> Result<Self, StorageError> {
304        assert!(matches!(init.prior, Prior::None));
305        assert!(matches!(init.parents, Prior::None));
306        assert!(matches!(init.facts.prior, FactPerspectivePrior::None));
307
308        let mut map = init.facts.map;
309        map.retain(|_, kv| !kv.is_empty());
310
311        let facts = writer
312            .append(|offset| FactIndexRepr {
313                offset,
314                prior: None,
315                depth: 1,
316                facts: map,
317            })?
318            .offset;
319
320        let commands = init
321            .commands
322            .try_into()
323            .map_err(|_| StorageError::EmptyPerspective)?;
324        let segment = writer.append(|offset| SegmentRepr {
325            offset,
326            prior: Prior::None,
327            parents: Prior::None,
328            policy: init.policy,
329            facts,
330            commands,
331            max_cut: 0,
332            skip_list: vec![],
333        })?;
334
335        let head = Location::new(
336            segment.offset,
337            segment
338                .commands
339                .len()
340                .checked_sub(1)
341                .assume("vec1 length >= 1")?,
342        );
343
344        writer.commit(head)?;
345
346        let storage = Self { writer };
347
348        Ok(storage)
349    }
350
351    fn open(writer: W) -> Result<Self, StorageError> {
352        Ok(Self { writer })
353    }
354
355    fn compact(&mut self, mut repr: FactIndexRepr) -> Result<FactIndexRepr, StorageError> {
356        let mut map = NamedFactMap::new();
357        let reader = self.writer.readonly();
358        loop {
359            for (name, kv) in repr.facts {
360                let sub = map.entry(name).or_default();
361                for (k, v) in kv {
362                    sub.entry(k).or_insert(v);
363                }
364            }
365            let Some(offset) = repr.prior else { break };
366            repr = reader.fetch(offset)?;
367        }
368
369        // Since there's no prior, we can remove tombstones
370        map.retain(|_, kv| {
371            kv.retain(|_, v| v.is_some());
372            !kv.is_empty()
373        });
374
375        Ok(self
376            .write_facts(LinearFactPerspective {
377                map,
378                prior: FactPerspectivePrior::None,
379            })?
380            .repr)
381    }
382}
383
384impl<F: Write> Storage for LinearStorage<F> {
385    type Perspective = LinearPerspective<F::ReadOnly>;
386    type FactPerspective = LinearFactPerspective<F::ReadOnly>;
387    type Segment = LinearSegment<F::ReadOnly>;
388    type FactIndex = LinearFactIndex<F::ReadOnly>;
389
390    fn get_command_id(&self, location: Location) -> Result<CommandId, StorageError> {
391        let seg = self.get_segment(location)?;
392        let cmd = seg
393            .get_command(location)
394            .ok_or(StorageError::CommandOutOfBounds(location))?;
395        Ok(cmd.id())
396    }
397
398    fn get_linear_perspective(
399        &self,
400        parent: Location,
401    ) -> Result<Option<Self::Perspective>, StorageError> {
402        let segment = self.get_segment(parent)?;
403        let command = segment
404            .get_command(parent)
405            .ok_or(StorageError::CommandOutOfBounds(parent))?;
406        let policy = segment.repr.policy;
407        let prior_facts: FactPerspectivePrior<F::ReadOnly> = if parent == segment.head_location() {
408            FactPerspectivePrior::FactIndex {
409                offset: segment.repr.facts,
410                reader: self.writer.readonly(),
411            }
412        } else {
413            let prior = match segment.facts()?.repr.prior {
414                Some(offset) => FactPerspectivePrior::FactIndex {
415                    offset,
416                    reader: self.writer.readonly(),
417                },
418                None => FactPerspectivePrior::None,
419            };
420            let mut facts = LinearFactPerspective::new(prior);
421            for data in &segment.repr.commands[..=parent.command] {
422                facts.apply_updates(&data.updates);
423            }
424            if facts.prior.is_none() {
425                facts.map.retain(|_, kv| !kv.is_empty());
426            }
427            if facts.map.is_empty() {
428                facts.prior
429            } else {
430                FactPerspectivePrior::FactPerspective(Box::new(facts))
431            }
432        };
433        let prior = Prior::Single(parent);
434
435        let perspective = LinearPerspective::new(
436            prior,
437            Prior::Single(command.address()?),
438            policy,
439            prior_facts,
440            command
441                .max_cut()?
442                .checked_add(1)
443                .assume("must not overflow")?,
444            None,
445        );
446
447        Ok(Some(perspective))
448    }
449
450    fn get_fact_perspective(
451        &self,
452        location: Location,
453    ) -> Result<Self::FactPerspective, StorageError> {
454        let segment = self.get_segment(location)?;
455
456        // If at head of segment, or no facts in segment,
457        // we don't need to apply updates.
458        if location == segment.head_location()
459            || segment
460                .repr
461                .commands
462                .iter()
463                .all(|cmd| cmd.updates.is_empty())
464        {
465            return Ok(LinearFactPerspective::new(
466                FactPerspectivePrior::FactIndex {
467                    offset: segment.repr.facts,
468                    reader: self.writer.readonly(),
469                },
470            ));
471        }
472
473        let prior = match segment.facts()?.repr.prior {
474            Some(offset) => FactPerspectivePrior::FactIndex {
475                offset,
476                reader: self.writer.readonly(),
477            },
478            None => FactPerspectivePrior::None,
479        };
480        let mut facts = LinearFactPerspective::new(prior);
481        for data in &segment.repr.commands[..=location.command] {
482            facts.apply_updates(&data.updates);
483        }
484
485        Ok(facts)
486    }
487
488    fn new_merge_perspective(
489        &self,
490        left: Location,
491        right: Location,
492        last_common_ancestor: (Location, usize),
493        policy_id: PolicyId,
494        braid: Self::FactIndex,
495    ) -> Result<Option<Self::Perspective>, StorageError> {
496        // TODO(jdygert): ensure braid belongs to this storage.
497        // TODO(jdygert): ensure braid ends at given command?
498        let left_segment = self.get_segment(left)?;
499        let left_command = left_segment
500            .get_command(left)
501            .ok_or(StorageError::CommandOutOfBounds(left))?;
502        let right_segment = self.get_segment(right)?;
503        let right_command = right_segment
504            .get_command(right)
505            .ok_or(StorageError::CommandOutOfBounds(right))?;
506
507        let parent = Prior::Merge(left_command.address()?, right_command.address()?);
508
509        if policy_id != left_segment.policy() && policy_id != right_segment.policy() {
510            return Err(StorageError::PolicyMismatch);
511        }
512
513        let prior = Prior::Merge(left, right);
514
515        let perspective = LinearPerspective::new(
516            prior,
517            parent,
518            policy_id,
519            FactPerspectivePrior::FactIndex {
520                offset: braid.repr.offset,
521                reader: braid.reader,
522            },
523            left_command
524                .max_cut()?
525                .max(right_command.max_cut()?)
526                .checked_add(1)
527                .assume("must not overflow")?,
528            Some(last_common_ancestor),
529        );
530
531        Ok(Some(perspective))
532    }
533
534    fn get_segment(&self, location: Location) -> Result<Self::Segment, StorageError> {
535        let reader = self.writer.readonly();
536        let repr = reader.fetch(location.segment)?;
537        let seg = LinearSegment { repr, reader };
538
539        Ok(seg)
540    }
541
542    fn get_head(&self) -> Result<Location, StorageError> {
543        self.writer.head()
544    }
545
546    fn commit(&mut self, segment: Self::Segment) -> Result<(), StorageError> {
547        if !self.is_ancestor(self.get_head()?, &segment)? {
548            return Err(StorageError::HeadNotAncestor);
549        }
550
551        self.writer.commit(segment.head_location())
552    }
553
554    fn write(&mut self, perspective: Self::Perspective) -> Result<Self::Segment, StorageError> {
555        // TODO(jdygert): Validate prior?
556
557        let facts = self.write_facts(perspective.facts)?.repr.offset;
558
559        let commands: Vec1<CommandData> = perspective
560            .commands
561            .try_into()
562            .map_err(|_| StorageError::EmptyPerspective)?;
563
564        let get_skips =
565            |l: Location, count: usize| -> Result<Vec<(Location, usize)>, StorageError> {
566                let mut rng = &mut Rng as &mut dyn Csprng;
567                let mut skips = vec![];
568                for _ in 0..count {
569                    let segment = self.get_segment(l)?;
570                    let l_max_cut = segment
571                        .get_command(l)
572                        .assume("location must exist")?
573                        .max_cut;
574                    if l_max_cut > 0 {
575                        let max_cut = rng.gen_range(0..l_max_cut);
576                        if let Some(skip) = self.get_skip(segment, max_cut)? {
577                            if !skips.contains(&skip) {
578                                skips.push(skip);
579                            }
580                        } else {
581                            break;
582                        }
583                    }
584                }
585                Ok(skips)
586            };
587
588        let skip_list = match perspective.prior {
589            Prior::None => vec![],
590            Prior::Merge(_, _) => {
591                let (lca, max_cut) = perspective.last_common_ancestor.assume("lca must exist")?;
592                let mut skips = get_skips(lca, 2)?;
593                if !skips.contains(&(lca, max_cut)) {
594                    skips.push((lca, max_cut));
595                }
596                skips.sort();
597                skips
598            }
599            Prior::Single(l) => {
600                let mut skips = get_skips(l, 3)?;
601                skips.sort();
602                skips
603            }
604        };
605        let repr = self.writer.append(|offset| SegmentRepr {
606            offset,
607            prior: perspective.prior,
608            parents: perspective.parents,
609            policy: perspective.policy,
610            facts,
611            commands,
612            max_cut: perspective.max_cut,
613            skip_list,
614        })?;
615
616        Ok(LinearSegment {
617            repr,
618            reader: self.writer.readonly(),
619        })
620    }
621
622    fn write_facts(
623        &mut self,
624        facts: Self::FactPerspective,
625    ) -> Result<Self::FactIndex, StorageError> {
626        let mut prior = match facts.prior {
627            FactPerspectivePrior::None => None,
628            FactPerspectivePrior::FactPerspective(prior) => {
629                let prior = self.write_facts(*prior)?;
630                if facts.map.is_empty() {
631                    return Ok(prior);
632                }
633                Some(prior.repr)
634            }
635            FactPerspectivePrior::FactIndex { offset, reader } => {
636                let repr = reader.fetch(offset)?;
637                if facts.map.is_empty() {
638                    return Ok(LinearFactIndex { repr, reader });
639                }
640                Some(repr)
641            }
642        };
643
644        let depth = if let Some(mut p) = prior.take() {
645            if p.depth > MAX_FACT_INDEX_DEPTH - 1 {
646                p = self.compact(p)?;
647            }
648            prior.insert(p).depth
649        } else {
650            0
651        };
652
653        let depth = depth.checked_add(1).assume("depth won't overflow")?;
654
655        if depth > MAX_FACT_INDEX_DEPTH {
656            bug!("fact index too deep");
657        }
658
659        let repr = self.writer.append(|offset| FactIndexRepr {
660            offset,
661            prior: prior.map(|p| p.offset),
662            depth,
663            facts: facts.map,
664        })?;
665
666        Ok(LinearFactIndex {
667            repr,
668            reader: self.writer.readonly(),
669        })
670    }
671}
672
673impl<R: Read> Segment for LinearSegment<R> {
674    type FactIndex = LinearFactIndex<R>;
675    type Command<'a>
676        = LinearCommand<'a>
677    where
678        R: 'a;
679
680    fn head(&self) -> Result<Self::Command<'_>, StorageError> {
681        let data = self.repr.commands.last();
682        let parent = if let Some(prev) = usize::checked_sub(self.repr.commands.len(), 2) {
683            Prior::Single(Address {
684                id: self.repr.commands[prev].id,
685                max_cut: self
686                    .repr
687                    .max_cut
688                    .checked_add(prev)
689                    .assume("must not overflow")?,
690            })
691        } else {
692            self.repr.parents
693        };
694        Ok(LinearCommand {
695            id: &data.id,
696            parent,
697            priority: data.priority.clone(),
698            policy: data.policy.as_deref(),
699            data: &data.data,
700            max_cut: self
701                .repr
702                .max_cut
703                .checked_add(self.repr.commands.len())
704                .assume("must not overflow")?
705                .checked_sub(1)
706                .assume("must not overflow")?,
707        })
708    }
709
710    fn first(&self) -> Self::Command<'_> {
711        let data = self.repr.commands.first();
712        LinearCommand {
713            id: &data.id,
714            parent: self.repr.parents,
715            priority: data.priority.clone(),
716            policy: data.policy.as_deref(),
717            data: &data.data,
718            max_cut: self.repr.max_cut,
719        }
720    }
721
722    fn head_location(&self) -> Location {
723        // vec1 length >= 1
724        #[allow(clippy::arithmetic_side_effects)]
725        Location::new(self.repr.offset, self.repr.commands.len() - 1)
726    }
727
728    fn first_location(&self) -> Location {
729        Location::new(self.repr.offset, 0)
730    }
731
732    fn contains(&self, location: Location) -> bool {
733        location.segment == self.repr.offset && location.command < self.repr.commands.len()
734    }
735
736    fn policy(&self) -> PolicyId {
737        self.repr.policy
738    }
739
740    fn prior(&self) -> Prior<Location> {
741        self.repr.prior
742    }
743
744    fn get_command(&self, location: Location) -> Option<Self::Command<'_>> {
745        if self.repr.offset != location.segment {
746            return None;
747        }
748        let data = self.repr.commands.get(location.command)?;
749        let parent = if let Some(prev) = usize::checked_sub(location.command, 1) {
750            if let Some(max_cut) = self.repr.max_cut.checked_add(prev) {
751                Prior::Single(Address {
752                    id: self.repr.commands[prev].id,
753                    max_cut,
754                })
755            } else {
756                return None;
757            }
758        } else {
759            self.repr.parents
760        };
761        self.repr
762            .max_cut
763            .checked_add(location.command)
764            .map(|max_cut| LinearCommand {
765                id: &data.id,
766                parent,
767                priority: data.priority.clone(),
768                policy: data.policy.as_deref(),
769                data: &data.data,
770                max_cut,
771            })
772    }
773
774    fn get_from(&self, location: Location) -> Vec<Self::Command<'_>> {
775        if self.repr.offset != location.segment {
776            // TODO(jdygert): Result?
777            return Vec::new();
778        }
779
780        // TODO(jdygert): Optimize?
781        (location.command..self.repr.commands.len())
782            .map(|c| Location::new(location.segment, c))
783            .map(|loc| {
784                self.get_command(loc)
785                    .expect("constructed location is valid")
786            })
787            .collect()
788    }
789
790    fn get_from_max_cut(&self, max_cut: usize) -> Result<Option<Location>, StorageError> {
791        if max_cut >= self.repr.max_cut
792            && max_cut
793                <= self
794                    .repr
795                    .max_cut
796                    .checked_add(self.repr.commands.len())
797                    .assume("must not overflow")?
798        {
799            return Ok(Some(Location::new(
800                self.repr.offset,
801                max_cut
802                    .checked_sub(self.repr.max_cut)
803                    .assume("must not overflow")?,
804            )));
805        }
806        Ok(None)
807    }
808
809    fn facts(&self) -> Result<Self::FactIndex, StorageError> {
810        Ok(LinearFactIndex {
811            repr: self.reader.fetch(self.repr.facts)?,
812            reader: self.reader.clone(),
813        })
814    }
815
816    fn skip_list(&self) -> &[(Location, usize)] {
817        &self.repr.skip_list
818    }
819
820    fn shortest_max_cut(&self) -> usize {
821        self.repr.max_cut
822    }
823
824    fn longest_max_cut(&self) -> Result<usize, StorageError> {
825        Ok(self
826            .repr
827            .max_cut
828            .checked_add(self.repr.commands.len())
829            .assume("must not overflow")?
830            .checked_sub(1)
831            .assume("must not overflow")?)
832    }
833}
834
// No items are provided here; lookups on a fact index go through its `Query`
// implementation.
impl<R: Read> FactIndex for LinearFactIndex<R> {}
836
837type MapIter = alloc::collections::btree_map::IntoIter<Keys, Option<Bytes>>;
838pub struct QueryIterator {
839    it: MapIter,
840}
841
842impl QueryIterator {
843    fn new(it: MapIter) -> Self {
844        Self { it }
845    }
846}
847
848impl Iterator for QueryIterator {
849    type Item = Result<Fact, StorageError>;
850    fn next(&mut self) -> Option<Self::Item> {
851        loop {
852            // filter out tombstones
853            if let (key, Some(value)) = self.it.next()? {
854                return Some(Ok(Fact { key, value }));
855            }
856        }
857    }
858}
859
860impl<R: Read> Query for LinearFactIndex<R> {
861    fn query(&self, name: &str, keys: &[Box<[u8]>]) -> Result<Option<Box<[u8]>>, StorageError> {
862        let mut prior = Some(&self.repr);
863        let mut slot; // Need to store deserialized value.
864        while let Some(facts) = prior {
865            if let Some(v) = facts.facts.get(name).and_then(|m| m.get(keys)) {
866                return Ok(v.as_ref().cloned());
867            }
868            slot = facts.prior.map(|p| self.reader.fetch(p)).transpose()?;
869            prior = slot.as_ref();
870        }
871        Ok(None)
872    }
873
874    type QueryIterator = QueryIterator;
875    fn query_prefix(
876        &self,
877        name: &str,
878        prefix: &[Box<[u8]>],
879    ) -> Result<QueryIterator, StorageError> {
880        Ok(QueryIterator::new(
881            self.query_prefix_inner(name, prefix)?.into_iter(),
882        ))
883    }
884}
885
886impl<R: Read> LinearFactIndex<R> {
887    fn query_prefix_inner(
888        &self,
889        name: &str,
890        prefix: &[Box<[u8]>],
891    ) -> Result<FactMap, StorageError> {
892        let mut matches = BTreeMap::new();
893        let mut prior = Some(&self.repr);
894        let mut slot; // Need to store deserialized value.
895        while let Some(facts) = prior {
896            if let Some(map) = facts.facts.get(name) {
897                for (k, v) in super::memory::find_prefixes(map, prefix) {
898                    // don't override, if we've already found the fact (including deletions)
899                    if !matches.contains_key(k) {
900                        matches.insert(k.clone(), v.map(Into::into));
901                    }
902                }
903            }
904            slot = facts.prior.map(|p| self.reader.fetch(p)).transpose()?;
905            prior = slot.as_ref();
906        }
907        Ok(matches)
908    }
909}
910
911impl<R> LinearFactPerspective<R> {
912    fn clear(&mut self) {
913        self.map.clear();
914    }
915
916    fn apply_updates(&mut self, updates: &[Update]) {
917        for (name, key, value) in updates {
918            if self.prior.is_none() {
919                if let Some(value) = value {
920                    self.map
921                        .entry(name.clone())
922                        .or_default()
923                        .insert(key.clone(), Some(value.clone()));
924                } else if let Some(e) = self.map.get_mut(name) {
925                    e.remove(key);
926                }
927            } else {
928                self.map
929                    .entry(name.clone())
930                    .or_default()
931                    .insert(key.clone(), value.clone());
932            }
933        }
934    }
935}
936
// No items are provided here; reads and writes go through the `Query` and
// `QueryMut` implementations.
impl<R: Read> FactPerspective for LinearFactPerspective<R> {}
938
939impl<R: Read> Query for LinearFactPerspective<R> {
940    fn query(&self, name: &str, keys: &[Box<[u8]>]) -> Result<Option<Box<[u8]>>, StorageError> {
941        if let Some(wrapped) = self.map.get(name).and_then(|m| m.get(keys)) {
942            return Ok(wrapped.as_deref().map(Box::from));
943        }
944        match &self.prior {
945            FactPerspectivePrior::None => Ok(None),
946            FactPerspectivePrior::FactPerspective(prior) => prior.query(name, keys),
947            FactPerspectivePrior::FactIndex { offset, reader } => {
948                let repr: FactIndexRepr = reader.fetch(*offset)?;
949                let prior = LinearFactIndex {
950                    repr,
951                    reader: reader.clone(),
952                };
953                prior.query(name, keys)
954            }
955        }
956    }
957
958    type QueryIterator = QueryIterator;
959    fn query_prefix(
960        &self,
961        name: &str,
962        prefix: &[Box<[u8]>],
963    ) -> Result<QueryIterator, StorageError> {
964        Ok(QueryIterator::new(
965            self.query_prefix_inner(name, prefix)?.into_iter(),
966        ))
967    }
968}
969
970impl<R: Read> LinearFactPerspective<R> {
971    fn query_prefix_inner(
972        &self,
973        name: &str,
974        prefix: &[Box<[u8]>],
975    ) -> Result<FactMap, StorageError> {
976        let mut matches = match &self.prior {
977            FactPerspectivePrior::None => BTreeMap::new(),
978            FactPerspectivePrior::FactPerspective(prior) => {
979                prior.query_prefix_inner(name, prefix)?
980            }
981            FactPerspectivePrior::FactIndex { offset, reader } => {
982                let repr: FactIndexRepr = reader.fetch(*offset)?;
983                let prior = LinearFactIndex {
984                    repr,
985                    reader: reader.clone(),
986                };
987                prior.query_prefix_inner(name, prefix)?
988            }
989        };
990        if let Some(map) = self.map.get(name) {
991            for (k, v) in super::memory::find_prefixes(map, prefix) {
992                // overwrite "earlier" facts
993                matches.insert(k.clone(), v.map(Into::into));
994            }
995        }
996        Ok(matches)
997    }
998}
999
1000impl<R: Read> QueryMut for LinearFactPerspective<R> {
1001    fn insert(&mut self, name: String, keys: Keys, value: Bytes) {
1002        self.map.entry(name).or_default().insert(keys, Some(value));
1003    }
1004
1005    fn delete(&mut self, name: String, keys: Keys) {
1006        if self.prior.is_none() {
1007            // No need for tombstones with no prior.
1008            if let Some(kv) = self.map.get_mut(&name) {
1009                kv.remove(&keys);
1010            }
1011        } else {
1012            self.map.entry(name).or_default().insert(keys, None);
1013        }
1014    }
1015}
1016
1017impl<R: Read> FactPerspective for LinearPerspective<R> {}
1018
1019impl<R: Read> Query for LinearPerspective<R> {
1020    fn query(&self, name: &str, keys: &[Box<[u8]>]) -> Result<Option<Box<[u8]>>, StorageError> {
1021        self.facts.query(name, keys)
1022    }
1023
1024    type QueryIterator = QueryIterator;
1025    fn query_prefix(
1026        &self,
1027        name: &str,
1028        prefix: &[Box<[u8]>],
1029    ) -> Result<QueryIterator, StorageError> {
1030        self.facts.query_prefix(name, prefix)
1031    }
1032}
1033
1034impl<R: Read> QueryMut for LinearPerspective<R> {
1035    fn insert(&mut self, name: String, keys: Keys, value: Bytes) {
1036        self.facts.insert(name.clone(), keys.clone(), value.clone());
1037        self.current_updates.push((name, keys, Some(value)));
1038    }
1039
1040    fn delete(&mut self, name: String, keys: Keys) {
1041        self.facts.delete(name.clone(), keys.clone());
1042        self.current_updates.push((name, keys, None))
1043    }
1044}
1045
1046impl<R: Read> Revertable for LinearPerspective<R> {
1047    fn checkpoint(&self) -> Checkpoint {
1048        Checkpoint {
1049            index: self.commands.len(),
1050        }
1051    }
1052
1053    fn revert(&mut self, checkpoint: Checkpoint) -> Result<(), Bug> {
1054        if checkpoint.index == self.commands.len() {
1055            return Ok(());
1056        }
1057
1058        if checkpoint.index > self.commands.len() {
1059            bug!("A checkpoint's index should always be less than or equal to the length of a perspective's command history!");
1060        }
1061
1062        self.commands.truncate(checkpoint.index);
1063        self.facts.clear();
1064        self.current_updates.clear();
1065        for data in &self.commands {
1066            self.facts.apply_updates(&data.updates);
1067        }
1068
1069        Ok(())
1070    }
1071}
1072
1073impl<R: Read> Perspective for LinearPerspective<R> {
1074    fn policy(&self) -> PolicyId {
1075        self.policy
1076    }
1077
1078    fn add_command(&mut self, command: &impl Command) -> Result<usize, StorageError> {
1079        if command.parent() != self.head_address()? {
1080            return Err(StorageError::PerspectiveHeadMismatch);
1081        }
1082
1083        self.commands.push(CommandData {
1084            id: command.id(),
1085            priority: command.priority(),
1086            policy: command.policy().map(Box::from),
1087            data: command.bytes().into(),
1088            updates: core::mem::take(&mut self.current_updates),
1089        });
1090        Ok(self.commands.len())
1091    }
1092
1093    fn includes(&self, id: CommandId) -> bool {
1094        self.commands.iter().any(|cmd| cmd.id == id)
1095    }
1096
1097    fn head_address(&self) -> Result<Prior<Address>, Bug> {
1098        Ok(if let Some(last) = self.commands.last() {
1099            Prior::Single(Address {
1100                id: last.id,
1101                max_cut: self
1102                    .max_cut
1103                    .checked_add(self.commands.len())
1104                    .assume("must not overflow")?
1105                    .checked_sub(1)
1106                    .assume("must not overflow")?,
1107            })
1108        } else {
1109            self.parents
1110        })
1111    }
1112}
1113
1114impl From<Prior<Address>> for Prior<CommandId> {
1115    fn from(p: Prior<Address>) -> Self {
1116        match p {
1117            Prior::None => Prior::None,
1118            Prior::Single(l) => Prior::Single(l.id),
1119            Prior::Merge(l, r) => Prior::Merge(l.id, r.id),
1120        }
1121    }
1122}
1123
1124impl Command for LinearCommand<'_> {
1125    fn priority(&self) -> Priority {
1126        self.priority.clone()
1127    }
1128
1129    fn id(&self) -> CommandId {
1130        *self.id
1131    }
1132
1133    fn parent(&self) -> Prior<Address> {
1134        self.parent
1135    }
1136
1137    fn policy(&self) -> Option<&[u8]> {
1138        self.policy
1139    }
1140
1141    fn bytes(&self) -> &[u8] {
1142        self.data
1143    }
1144
1145    fn max_cut(&self) -> Result<usize, Bug> {
1146        Ok(self.max_cut)
1147    }
1148}
1149
#[cfg(test)]
mod test {
    use testing::Manager;

    use super::*;
    use crate::testing::dsl::{test_suite, StorageBackend};

    /// Inserts a handful of multi-part keys into a fresh perspective and
    /// checks that `query_prefix` returns, for each probe prefix, exactly
    /// the inserted keys that start with it, in sorted order and with the
    /// values that were stored.
    #[test]
    fn test_query_prefix() {
        let name = "x";

        let mut provider = LinearStorageProvider::new(Manager::new());
        let mut fp = provider.new_perspective(PolicyId::new(0));

        let raw_keys: &[&[&str]] = &[
            &["aa", "xy", "123"],
            &["aa", "xz", "123"],
            &["bb", "ccc"],
            &["bc", ""],
        ];
        let keys: Vec<Keys> = raw_keys
            .iter()
            .map(|parts| parts.iter().map(|part| part.as_bytes()).collect())
            .collect();

        // Every fact's value is the debug rendering of its own key, so the
        // expected value can be recomputed from the key alone.
        for ks in &keys {
            fp.insert(
                name.into(),
                ks.clone(),
                format!("{ks:?}").into_bytes().into(),
            );
        }

        let probes: &[&[&str]] = &[
            &["aa", "xy", "12"],
            &["aa", "xy"],
            &["aa", "xz"],
            &["aa", "x"],
            &["bb", ""],
            &["bb", "ccc"],
            &["bc", ""],
            &["bc", "", ""],
        ];

        for probe in probes {
            let prefix: Keys = probe.iter().map(|part| part.as_bytes()).collect();

            // Reference answer: inserted keys whose leading elements equal
            // the prefix elements.
            let mut expected: Vec<_> = keys.iter().filter(|k| k.starts_with(&prefix)).collect();
            expected.sort();

            let found: Vec<_> = fp.query_prefix(name, &prefix).unwrap().collect();
            assert_eq!(found.len(), expected.len());

            for (a, b) in found.into_iter().zip(expected) {
                let a = a.unwrap();
                assert_eq!(&a.key, b);
                assert_eq!(a.value.as_ref(), format!("{b:?}").as_bytes());
            }
        }
    }

    /// Backend handed to the shared storage test suite; every client gets a
    /// provider over a fresh `Manager`.
    struct LinearBackend;

    impl StorageBackend for LinearBackend {
        type StorageProvider = LinearStorageProvider<Manager>;

        fn provider(&mut self, _client_id: u64) -> Self::StorageProvider {
            LinearStorageProvider::new(Manager::new())
        }
    }

    test_suite!(|| LinearBackend);
}