Skip to main content

aranya_runtime/client/
transaction.rs

1use alloc::collections::{BTreeMap, VecDeque};
2use core::{marker::PhantomData, mem};
3
4use buggy::{BugExt as _, bug};
5
6use super::braiding;
7use crate::{
8    Address, ClientError, CmdId, Command, GraphId, Location, MAX_COMMAND_LENGTH, MergeIds,
9    Perspective as _, Policy as _, PolicyError, PolicyId, PolicyStore, Prior, Revertable as _,
10    Segment as _, Sink, Storage, StorageError, StorageProvider, policy::CommandPlacement,
11};
12
13/// Transaction used to receive many commands at once.
14///
15/// The transaction allows us to have many temporary heads at once, so we don't
16/// need as many merges when adding commands. When the transaction is committed,
17/// we will merge all temporary heads and the graph head, and then commit the
18/// result as the new graph head.
19pub struct Transaction<SP: StorageProvider, PS> {
20    /// The ID of the associated graph
21    graph_id: GraphId,
22    /// Current working perspective
23    perspective: Option<SP::Perspective>,
24    /// Head of the current perspective
25    phead: Option<CmdId>,
26    /// Written but not committed heads
27    heads: BTreeMap<Address, Location>,
28    /// Tag for associated policy store
29    policy_store: PhantomData<PS>,
30}
31
32impl<SP: StorageProvider, PS> Transaction<SP, PS> {
33    pub(super) const fn new(graph_id: GraphId) -> Self {
34        Self {
35            graph_id,
36            perspective: None,
37            phead: None,
38            heads: BTreeMap::new(),
39            policy_store: PhantomData,
40        }
41    }
42}
43
44impl<SP: StorageProvider, PS: PolicyStore> Transaction<SP, PS> {
45    /// Returns the transaction's graph id.
46    pub fn graph_id(&self) -> GraphId {
47        self.graph_id
48    }
49
50    /// Find a given id if reachable within this transaction.
51    ///
52    /// Does not search `self.perspective`, which should be written out beforehand.
53    fn locate(
54        &self,
55        storage: &mut SP::Storage,
56        address: Address,
57    ) -> Result<Option<Location>, ClientError> {
58        // Search from committed head.
59        if let Some(found) = storage.get_location(address)? {
60            return Ok(Some(found));
61        }
62        // Search from our temporary heads.
63        for &head in self.heads.values() {
64            if let Some(found) = storage.get_location_from(head, address)? {
65                return Ok(Some(found));
66            }
67        }
68        Ok(None)
69    }
70
71    /// Write current perspective, merge transaction heads, and commit to graph.
72    pub(super) fn commit(
73        &mut self,
74        provider: &mut SP,
75        policy_store: &mut PS,
76        sink: &mut impl Sink<PS::Effect>,
77    ) -> Result<(), ClientError> {
78        let storage = provider.get_storage(self.graph_id)?;
79
80        // Write out current perspective.
81        if let Some(p) = Option::take(&mut self.perspective) {
82            self.phead = None;
83            let segment = storage.write(p)?;
84            let head = segment.head()?;
85            self.heads.insert(head.address()?, segment.head_location());
86        }
87
88        // Merge heads pairwise until single head left, then commit.
89        // TODO(#370): Merge deterministically
90        let mut heads: VecDeque<_> = mem::take(&mut self.heads).into_iter().collect();
91        let mut merging_head = false;
92        while let Some((left_id, mut left_loc)) = heads.pop_front() {
93            if let Some((right_id, mut right_loc)) = heads.pop_front() {
94                let (policy, policy_id) =
95                    choose_policy(storage, policy_store, left_loc, right_loc)?;
96
97                let mut buffer = [0u8; MAX_COMMAND_LENGTH];
98                let merge_ids = MergeIds::new(left_id, right_id).assume("merging different ids")?;
99                if left_id > right_id {
100                    mem::swap(&mut left_loc, &mut right_loc);
101                }
102                let command = policy.merge(&mut buffer, merge_ids)?;
103
104                let (braid, last_common_ancestor) =
105                    make_braid_segment::<_, PS>(storage, left_loc, right_loc, sink, policy)?;
106
107                let mut perspective = storage.new_merge_perspective(
108                    left_loc,
109                    right_loc,
110                    last_common_ancestor,
111                    policy_id,
112                    braid,
113                )?;
114                perspective.add_command(&command)?;
115
116                let segment = storage.write(perspective)?;
117                let head = segment.head()?;
118                heads.push_back((head.address()?, segment.head_location()));
119            } else {
120                let segment = storage.get_segment(left_loc)?;
121                // Try to commit. If it fails with `HeadNotAncestor`, we know we
122                // need to merge with the graph head.
123                match storage.commit(segment) {
124                    Ok(()) => break,
125                    Err(StorageError::HeadNotAncestor) => {
126                        if merging_head {
127                            bug!("merging with graph head again, would loop");
128                        }
129
130                        merging_head = true;
131
132                        heads.push_back((left_id, left_loc));
133
134                        let head_loc = storage.get_head()?;
135                        let segment = storage.get_segment(head_loc)?;
136                        let head = segment.head()?;
137                        heads.push_back((head.address()?, segment.head_location()));
138                    }
139                    Err(e) => return Err(e.into()),
140                }
141            }
142        }
143
144        Ok(())
145    }
146
147    /// Attempt to store the `command` in the graph with `graph_id`. Effects will be
148    /// emitted to the `sink`. This interface is used when syncing with another device
149    /// and integrating the new commands.
150    pub(super) fn add_commands(
151        &mut self,
152        commands: &[impl Command],
153        provider: &mut SP,
154        policy_store: &mut PS,
155        sink: &mut impl Sink<PS::Effect>,
156    ) -> Result<usize, ClientError> {
157        let mut commands = commands.iter();
158        let mut count: usize = 0;
159
160        // Get storage or try to initialize with first command.
161        let storage = match provider.get_storage(self.graph_id) {
162            Ok(s) => s,
163            Err(StorageError::NoSuchStorage) => {
164                let command = commands.next().ok_or(ClientError::InitError)?;
165                count = count.checked_add(1).assume("must not overflow")?;
166                self.init(command, policy_store, provider, sink)?
167            }
168            Err(e) => return Err(e.into()),
169        };
170
171        // Handle remaining commands.
172        for command in commands {
173            if self
174                .perspective
175                .as_ref()
176                .is_some_and(|p| p.includes(command.id()))
177            {
178                // Command in current perspective.
179                continue;
180            }
181
182            if self.locate(storage, command.address()?)?.is_some() {
183                // Command already added.
184                continue;
185            }
186            match command.parent() {
187                Prior::None => {
188                    if command.id().as_base() == self.graph_id.as_base() {
189                        // Graph already initialized, extra init just spurious
190                    } else {
191                        bug!("init command does not belong in graph");
192                    }
193                }
194                Prior::Single(parent) => {
195                    self.add_single(storage, policy_store, sink, command, parent)?;
196                    count = count.checked_add(1).assume("must not overflow")?;
197                }
198                Prior::Merge(left, right) => {
199                    self.add_merge(storage, policy_store, sink, command, left, right)?;
200                    count = count.checked_add(1).assume("must not overflow")?;
201                }
202            }
203        }
204
205        Ok(count)
206    }
207
208    fn add_single(
209        &mut self,
210        storage: &mut <SP as StorageProvider>::Storage,
211        policy_store: &mut PS,
212        sink: &mut impl Sink<PS::Effect>,
213        command: &impl Command,
214        parent: Address,
215    ) -> Result<(), ClientError> {
216        let perspective = self.get_perspective(parent, storage)?;
217
218        let policy_id = perspective.policy();
219        let policy = policy_store.get_policy(policy_id)?;
220
221        // Try to run command, or revert if failed.
222        sink.begin();
223        let checkpoint = perspective.checkpoint();
224        if let Err(e) = policy.call_rule(
225            command,
226            perspective,
227            sink,
228            CommandPlacement::OnGraphAtOrigin,
229        ) {
230            perspective.revert(checkpoint)?;
231            sink.rollback();
232            return Err(e.into());
233        }
234        perspective.add_command(command)?;
235        sink.commit();
236
237        self.phead = Some(command.id());
238
239        Ok(())
240    }
241
242    fn add_merge(
243        &mut self,
244        storage: &mut <SP as StorageProvider>::Storage,
245        policy_store: &mut PS,
246        sink: &mut impl Sink<PS::Effect>,
247        command: &impl Command,
248        left: Address,
249        right: Address,
250    ) -> Result<bool, ClientError> {
251        // Must always start a new perspective for merges.
252        if let Some(p) = Option::take(&mut self.perspective) {
253            let seg = storage.write(p)?;
254            let head = seg.head()?;
255            self.heads.insert(head.address()?, seg.head_location());
256        }
257
258        let left_loc = self
259            .locate(storage, left)?
260            .ok_or(ClientError::NoSuchParent(left.id))?;
261        let right_loc = self
262            .locate(storage, right)?
263            .ok_or(ClientError::NoSuchParent(right.id))?;
264
265        let (policy, policy_id) = choose_policy(storage, policy_store, left_loc, right_loc)?;
266
267        // Braid commands from left and right into an ordered sequence.
268        let (braid, last_common_ancestor) =
269            make_braid_segment::<_, PS>(storage, left_loc, right_loc, sink, policy)?;
270
271        let mut perspective = storage.new_merge_perspective(
272            left_loc,
273            right_loc,
274            last_common_ancestor,
275            policy_id,
276            braid,
277        )?;
278        perspective.add_command(command)?;
279
280        // These are no longer heads of the transaction, since they are both covered by the merge
281        self.heads.remove(&left);
282        self.heads.remove(&right);
283
284        self.perspective = Some(perspective);
285        self.phead = Some(command.id());
286
287        Ok(true)
288    }
289
290    /// Get a perspective to which we can add a command with the given parant.
291    ///
292    /// If parent is the head of the current perspective, we can just use it.
293    /// Otherwise, we must write out the perspective and get a new one.
294    fn get_perspective(
295        &mut self,
296        parent: Address,
297        storage: &mut <SP as StorageProvider>::Storage,
298    ) -> Result<&mut <SP as StorageProvider>::Perspective, ClientError> {
299        if self.phead == Some(parent.id) {
300            // Command will append to current perspective.
301            return Ok(self
302                .perspective
303                .as_mut()
304                .assume("trx has perspective when has phead")?);
305        }
306
307        // Write out the current perspective.
308        if let Some(p) = Option::take(&mut self.perspective) {
309            self.phead = None;
310            let seg = storage.write(p)?;
311            let head = seg.head()?;
312            self.heads.insert(head.address()?, seg.head_location());
313        }
314
315        let loc = self
316            .locate(storage, parent)?
317            .ok_or(ClientError::NoSuchParent(parent.id))?;
318
319        // Get a new perspective and store it in the transaction.
320        let p = self
321            .perspective
322            .insert(storage.get_linear_perspective(loc)?);
323
324        self.phead = Some(parent.id);
325        self.heads.remove(&parent);
326
327        Ok(p)
328    }
329
330    fn init<'sp>(
331        &mut self,
332        command: &impl Command,
333        policy_store: &mut PS,
334        provider: &'sp mut SP,
335        sink: &mut impl Sink<PS::Effect>,
336    ) -> Result<&'sp mut <SP as StorageProvider>::Storage, ClientError> {
337        // Graph ID is the id of the init command by definition.
338        if self.graph_id.as_base() != command.id().as_base() {
339            return Err(ClientError::InitError);
340        }
341
342        // The init command must not have a parent.
343        if !matches!(command.parent(), Prior::None) {
344            return Err(ClientError::InitError);
345        }
346
347        // The graph must have policy to start with.
348        let Some(policy_data) = command.policy() else {
349            return Err(ClientError::InitError);
350        };
351
352        let policy_id = policy_store.add_policy(policy_data)?;
353        let policy = policy_store.get_policy(policy_id)?;
354
355        // Get an empty perspective and run the init command.
356        let mut perspective = provider.new_perspective(policy_id);
357        sink.begin();
358        if let Err(e) = policy.call_rule(
359            command,
360            &mut perspective,
361            sink,
362            CommandPlacement::OnGraphAtOrigin,
363        ) {
364            sink.rollback();
365            // We don't need to revert perspective since we just drop it.
366            return Err(e.into());
367        }
368        perspective.add_command(command)?;
369
370        let (_, storage) = provider.new_storage(perspective)?;
371
372        // Wait to commit until we are absolutely sure we've initialized.
373        sink.commit();
374
375        Ok(storage)
376    }
377}
378
379/// Run the braid algorithm and evaluate the sequence to create a braided fact index.
380fn make_braid_segment<S: Storage, PS: PolicyStore>(
381    storage: &mut S,
382    left: Location,
383    right: Location,
384    sink: &mut impl Sink<PS::Effect>,
385    policy: &PS::Policy,
386) -> Result<(S::FactIndex, (Location, usize)), ClientError> {
387    let order = braiding::braid(storage, left, right)?;
388    let last_common_ancestor = braiding::last_common_ancestor(storage, left, right)?;
389
390    let (&first, rest) = order.split_first().assume("braid is non-empty")?;
391
392    let mut braid_perspective = storage.get_fact_perspective(first)?;
393
394    sink.begin();
395
396    for &location in rest {
397        let segment = storage.get_segment(location)?;
398        let command = segment
399            .get_command(location)
400            .assume("braid only contains existing commands")?;
401
402        let result = policy.call_rule(
403            &command,
404            &mut braid_perspective,
405            sink,
406            CommandPlacement::OnGraphInBraid,
407        );
408
409        // If the command failed in an uncontrolled way, rollback
410        if let Err(e) = result
411            && e != PolicyError::Check
412        {
413            sink.rollback();
414            return Err(e.into());
415        }
416    }
417
418    let braid = storage.write_facts(braid_perspective)?;
419
420    sink.commit();
421
422    Ok((braid, last_common_ancestor))
423}
424
425/// Select the policy from two locations with the greatest serial value.
426fn choose_policy<'a, PS: PolicyStore>(
427    storage: &impl Storage,
428    policy_store: &'a PS,
429    left: Location,
430    right: Location,
431) -> Result<(&'a PS::Policy, PolicyId), ClientError> {
432    Ok(core::cmp::max_by_key(
433        get_policy(storage, policy_store, left)?,
434        get_policy(storage, policy_store, right)?,
435        |(p, _)| p.serial(),
436    ))
437}
438
439fn get_policy<'a, PS: PolicyStore>(
440    storage: &impl Storage,
441    policy_store: &'a PS,
442    location: Location,
443) -> Result<(&'a PS::Policy, PolicyId), ClientError> {
444    let segment = storage.get_segment(location)?;
445    let policy_id = segment.policy();
446    let policy = policy_store.get_policy(policy_id)?;
447    Ok((policy, policy_id))
448}
449
450#[cfg(test)]
451mod test {
452    use std::collections::HashMap;
453
454    use aranya_crypto::id::{Id, IdTag};
455    use buggy::Bug;
456    use test_log::test;
457
458    use super::*;
459    use crate::{
460        ClientState, Keys, MergeIds, Perspective, Policy, Priority,
461        memory::MemStorageProvider,
462        policy::{ActionPlacement, CommandPlacement},
463        testing::{hash_for_testing_only, short_b58},
464    };
465
466    struct SeqPolicyStore;
467
468    /// [`SeqPolicy`] is a very simple policy which appends the id of each
469    /// command to a fact named `b"seq"`. At each point in the graph, the value
470    /// of this fact should be equal to the ids in braid order of all facts up
471    /// to that point.
472    struct SeqPolicy;
473
474    struct SeqCommand {
475        id: CmdId,
476        prior: Prior<Address>,
477        finalize: bool,
478        data: Box<str>,
479        max_cut: usize,
480    }
481
482    impl PolicyStore for SeqPolicyStore {
483        type Policy = SeqPolicy;
484        type Effect = ();
485
486        fn add_policy(&mut self, _policy: &[u8]) -> Result<PolicyId, PolicyError> {
487            Ok(PolicyId::new(0))
488        }
489
490        fn get_policy(&self, _id: PolicyId) -> Result<&Self::Policy, PolicyError> {
491            Ok(&SeqPolicy)
492        }
493    }
494
495    impl Policy for SeqPolicy {
496        type Action<'a> = &'a str;
497        type Effect = ();
498        type Command<'a> = SeqCommand;
499
500        fn serial(&self) -> u32 {
501            0
502        }
503
504        fn call_rule(
505            &self,
506            command: &impl Command,
507            facts: &mut impl crate::FactPerspective,
508            _sink: &mut impl Sink<Self::Effect>,
509            _placement: CommandPlacement,
510        ) -> Result<(), PolicyError> {
511            assert!(
512                !matches!(command.parent(), Prior::Merge { .. }),
513                "merges shouldn't be evaluated"
514            );
515
516            // For init and basic commands, append the id to the seq fact.
517            let data = command.bytes();
518            if let Some(seq) = facts
519                .query("seq", &Keys::default())
520                .assume("can query")?
521                .as_deref()
522            {
523                facts.insert(
524                    "seq".into(),
525                    Keys::default(),
526                    [seq, b":", data].concat().into(),
527                );
528            } else {
529                facts.insert("seq".into(), Keys::default(), data.into());
530            }
531            Ok(())
532        }
533
534        fn call_action(
535            &self,
536            _action: Self::Action<'_>,
537            _facts: &mut impl Perspective,
538            _sink: &mut impl Sink<Self::Effect>,
539            _placement: ActionPlacement,
540        ) -> Result<(), PolicyError> {
541            unimplemented!()
542        }
543
544        fn merge<'a>(
545            &self,
546            _target: &'a mut [u8],
547            ids: MergeIds,
548        ) -> Result<Self::Command<'a>, PolicyError> {
549            let (left, right): (Address, Address) = ids.into();
550            let parents = [*left.id.as_array(), *right.id.as_array()];
551            let id = hash_for_testing_only(parents.as_flattened());
552
553            Ok(SeqCommand::new(
554                id,
555                Prior::Merge(left, right),
556                left.max_cut
557                    .max(right.max_cut)
558                    .checked_add(1)
559                    .assume("must not overflow")?,
560            ))
561        }
562    }
563
564    impl SeqCommand {
565        fn new(id: CmdId, prior: Prior<Address>, max_cut: usize) -> Self {
566            let data = short_b58(id).into_boxed_str();
567            Self {
568                id,
569                prior,
570                finalize: false,
571                data,
572                max_cut,
573            }
574        }
575
576        fn finalize(id: CmdId, prev: Address, max_cut: usize) -> Self {
577            let data = short_b58(id).into_boxed_str();
578            Self {
579                id,
580                prior: Prior::Single(prev),
581                finalize: true,
582                data,
583                max_cut,
584            }
585        }
586    }
587
588    impl Command for SeqCommand {
589        fn priority(&self) -> Priority {
590            if self.finalize {
591                return Priority::Finalize;
592            }
593            match self.prior {
594                Prior::None => Priority::Init,
595                Prior::Single(_) => {
596                    // Use the last byte of the ID as priority, just so we can
597                    // properly see the effects of braiding
598                    let id = self.id.as_bytes();
599                    let priority = u32::from(*id.last().unwrap());
600                    Priority::Basic(priority)
601                }
602                Prior::Merge(_, _) => Priority::Merge,
603            }
604        }
605
606        fn id(&self) -> CmdId {
607            self.id
608        }
609
610        fn parent(&self) -> Prior<Address> {
611            self.prior
612        }
613
614        fn policy(&self) -> Option<&[u8]> {
615            // We don't actually need any policy bytes, but the
616            // transaction/storage requires it on init commands.
617            match self.prior {
618                Prior::None => Some(b""),
619                _ => None,
620            }
621        }
622
623        fn bytes(&self) -> &[u8] {
624            self.data.as_bytes()
625        }
626
627        fn max_cut(&self) -> Result<usize, Bug> {
628            Ok(self.max_cut)
629        }
630    }
631
632    struct NullSink;
633    impl<Eff> Sink<Eff> for NullSink {
634        fn begin(&mut self) {}
635        fn consume(&mut self, _: Eff) {}
636        fn rollback(&mut self) {}
637        fn commit(&mut self) {}
638    }
639
640    /// [`GraphBuilder`] and the associated macro [`graph`] provide an easy way
641    /// to create a graph with a specific structure.
642    struct GraphBuilder<SP: StorageProvider> {
643        client: ClientState<SeqPolicyStore, SP>,
644        trx: Transaction<SP, SeqPolicyStore>,
645        max_cuts: HashMap<CmdId, usize>,
646    }
647
648    impl<SP: StorageProvider> GraphBuilder<SP> {
649        pub fn init(
650            mut client: ClientState<SeqPolicyStore, SP>,
651            ids: &[CmdId],
652        ) -> Result<Self, ClientError> {
653            let mut trx = Transaction::new(GraphId::transmute(ids[0]));
654            let mut prior: Prior<Address> = Prior::None;
655            let mut max_cuts = HashMap::new();
656            for (max_cut, &id) in ids.iter().enumerate() {
657                let cmd = SeqCommand::new(id, prior, max_cut);
658                trx.add_commands(
659                    &[cmd],
660                    &mut client.provider,
661                    &mut client.policy_store,
662                    &mut NullSink,
663                )?;
664                max_cuts.insert(id, max_cut);
665                prior = Prior::Single(Address { id, max_cut });
666            }
667            Ok(Self {
668                client,
669                trx,
670                max_cuts,
671            })
672        }
673
674        fn get_addr(&self, id: CmdId) -> Address {
675            Address {
676                id,
677                max_cut: self.max_cuts[&id],
678            }
679        }
680
681        pub fn line(&mut self, prev: CmdId, ids: &[CmdId]) -> Result<(), ClientError> {
682            let mut prev = self.get_addr(prev);
683            for &id in ids {
684                let max_cut = prev.max_cut.checked_add(1).unwrap();
685                let cmd = SeqCommand::new(id, Prior::Single(prev), max_cut);
686                self.trx.add_commands(
687                    &[cmd],
688                    &mut self.client.provider,
689                    &mut self.client.policy_store,
690                    &mut NullSink,
691                )?;
692                self.max_cuts.insert(id, max_cut);
693                prev = Address { id, max_cut };
694            }
695            Ok(())
696        }
697
698        pub fn finalize(&mut self, prev: CmdId, id: CmdId) -> Result<(), ClientError> {
699            let prev = self.get_addr(prev);
700            let max_cut = prev.max_cut.checked_add(1).unwrap();
701            let cmd = SeqCommand::finalize(id, prev, max_cut);
702            self.trx.add_commands(
703                &[cmd],
704                &mut self.client.provider,
705                &mut self.client.policy_store,
706                &mut NullSink,
707            )?;
708            self.max_cuts.insert(id, max_cut);
709            Ok(())
710        }
711
712        pub fn merge(
713            &mut self,
714            (left, right): (CmdId, CmdId),
715            ids: &[CmdId],
716        ) -> Result<(), ClientError> {
717            let prior = Prior::Merge(self.get_addr(left), self.get_addr(right));
718            let mergecmd = SeqCommand::new(ids[0], prior, prior.next_max_cut().unwrap());
719            let mut prev = Address {
720                id: mergecmd.id,
721                max_cut: mergecmd.max_cut,
722            };
723            self.max_cuts.insert(mergecmd.id, mergecmd.max_cut);
724            self.trx.add_commands(
725                &[mergecmd],
726                &mut self.client.provider,
727                &mut self.client.policy_store,
728                &mut NullSink,
729            )?;
730            for &id in &ids[1..] {
731                let cmd = SeqCommand::new(
732                    id,
733                    Prior::Single(prev),
734                    prev.max_cut.checked_add(1).expect("must not overflow"),
735                );
736                prev = Address {
737                    id: cmd.id,
738                    max_cut: cmd.max_cut,
739                };
740                self.max_cuts.insert(cmd.id, cmd.max_cut);
741                self.trx.add_commands(
742                    &[cmd],
743                    &mut self.client.provider,
744                    &mut self.client.policy_store,
745                    &mut NullSink,
746                )?;
747            }
748            Ok(())
749        }
750
751        pub fn flush(&mut self) {
752            if let Some(p) = Option::take(&mut self.trx.perspective) {
753                self.trx.phead = None;
754                let seg = self
755                    .client
756                    .provider
757                    .get_storage(self.trx.graph_id)
758                    .unwrap()
759                    .write(p)
760                    .unwrap();
761                let head = seg.head().unwrap();
762                self.trx.heads.insert(
763                    head.address().expect("address must exist"),
764                    seg.head_location(),
765                );
766            }
767        }
768
769        pub fn commit(&mut self) -> Result<(), ClientError> {
770            self.trx.commit(
771                &mut self.client.provider,
772                &mut self.client.policy_store,
773                &mut NullSink,
774            )
775        }
776    }
777
778    fn mkid<Tag: IdTag>(x: &str) -> Id<Tag> {
779        x.parse().unwrap()
780    }
781
782    /// See tests for usage.
783    macro_rules! graph {
784        ( $client:expr ; $init:literal $($inits:literal )* ; $($rest:tt)*) => {{
785            let mut gb = GraphBuilder::init($client, &[mkid($init), $(mkid($inits)),*]).unwrap();
786            graph!(@ gb, $($rest)*);
787            gb
788        }};
789        (@ $gb:ident, $prev:literal < $($id:literal)+; $($rest:tt)*) => {
790            $gb.line(mkid($prev), &[$(mkid($id)),+]).unwrap();
791            graph!(@ $gb, $($rest)*);
792        };
793        (@ $gb:ident, $l:literal $r:literal < $($id:literal)+; $($rest:tt)*) => {
794            $gb.merge((mkid($l), mkid($r)), &[$(mkid($id)),+]).unwrap();
795            graph!(@ $gb, $($rest)*);
796        };
797        (@ $gb:ident, $prev:literal < finalize $id:literal; $($rest:tt)*) => {
798            $gb.finalize(mkid($prev), mkid($id)).unwrap();
799            graph!(@ $gb, $($rest)*);
800        };
801        (@ $gb:ident, commit; $($rest:tt)*) => {
802            $gb.commit().unwrap();
803            graph!(@ $gb, $($rest)*);
804        };
805        (@ $gb:ident, ) => {
806            $gb.flush();
807        };
808    }
809
810    fn lookup(storage: &impl Storage, name: &str) -> Option<Box<[u8]>> {
811        use crate::Query as _;
812        let head = storage.get_head().unwrap();
813        let p = storage.get_fact_perspective(head).unwrap();
814        p.query(name, &Keys::default()).unwrap()
815    }
816
817    #[test]
818    fn test_simple() -> Result<(), StorageError> {
819        let mut gb = graph! {
820            ClientState::new(SeqPolicyStore, MemStorageProvider::new());
821            "a";
822            "a" < "b";
823            "a" < "c";
824            "b" "c" < "ma";
825            "b" < "d";
826            "ma" "d" < "mb";
827            commit;
828        };
829        let g = gb.client.provider.get_storage(mkid("a")).unwrap();
830
831        #[cfg(feature = "graphviz")]
832        crate::storage::memory::graphviz::dot(g, "simple");
833
834        assert_eq!(g.get_head().unwrap(), Location::new(5, 0));
835
836        let seq = lookup(g, "seq").unwrap();
837        let seq = std::str::from_utf8(&seq).unwrap();
838        assert_eq!(seq, "a:b:d:c");
839
840        Ok(())
841    }
842
843    #[test]
844    fn test_complex() -> Result<(), StorageError> {
845        let mut gb = graph! {
846            ClientState::new(SeqPolicyStore, MemStorageProvider::new());
847            "a";
848            "a" < "1" "2" "3";
849            "3" < "4" "6" "7";
850            "3" < "5" "8";
851            "6" "8" < "9" "aa"; commit;
852            "7" < "a1" "a2";
853            "aa" "a2" < "a3";
854            "a3" < "a6" "a4";
855            "a3" < "a7" "a5";
856            "a4" "a5" < "a8";
857            "9" < "42" "43";
858            "42" < "45" "46";
859            "45" < "47" "48";
860            commit;
861        };
862
863        let g = gb.client.provider.get_storage(mkid("a")).unwrap();
864
865        #[cfg(feature = "graphviz")]
866        crate::storage::memory::graphviz::dot(g, "complex");
867
868        assert_eq!(g.get_head().unwrap(), Location::new(15, 0));
869
870        let seq = lookup(g, "seq").unwrap();
871        let seq = std::str::from_utf8(&seq).unwrap();
872        assert_eq!(
873            seq,
874            "a:1:2:3:5:8:4:6:42:45:47:48:46:43:aa:7:a1:a2:a7:a6:a5:a4"
875        );
876
877        Ok(())
878    }
879
880    #[test]
881    fn test_duplicates() {
882        let mut gb = graph! {
883            ClientState::new(SeqPolicyStore, MemStorageProvider::new());
884            "a";
885            "a" < "b" "c";
886            "a" < "b";
887            "b" < "c";
888            "c" < "d";
889            commit;
890            "a" < "b";
891            "b" < "c";
892            "d" < "e";
893            commit;
894        };
895
896        let g = gb.client.provider.get_storage(mkid("a")).unwrap();
897
898        #[cfg(feature = "graphviz")]
899        crate::storage::memory::graphviz::dot(g, "duplicates");
900
901        assert_eq!(g.get_head().unwrap(), Location::new(2, 0));
902
903        let seq = lookup(g, "seq").unwrap();
904        let seq = std::str::from_utf8(&seq).unwrap();
905        assert_eq!(seq, "a:b:c:d:e");
906    }
907
908    #[test]
909    fn test_mid_braid_1() {
910        let mut gb = graph! {
911            ClientState::new(SeqPolicyStore, MemStorageProvider::new());
912            "a";
913            commit;
914            "a" < "b" "c" "d" "e" "f" "g";
915            "d" < "h" "i" "j";
916            commit;
917        };
918
919        let g = gb.client.provider.get_storage(mkid("a")).unwrap();
920
921        #[cfg(feature = "graphviz")]
922        crate::storage::memory::graphviz::dot(g, "mid_braid_1");
923
924        assert_eq!(g.get_head().unwrap(), Location::new(3, 0));
925
926        let seq = lookup(g, "seq").unwrap();
927        let seq = std::str::from_utf8(&seq).unwrap();
928        assert_eq!(seq, "a:b:c:d:h:i:j:e:f:g");
929    }
930
931    #[test]
932    fn test_mid_braid_2() {
933        let mut gb = graph! {
934            ClientState::new(SeqPolicyStore, MemStorageProvider::new());
935            "a";
936            commit;
937            "a" < "b" "c" "d" "h" "i" "j";
938            "d" < "e" "f" "g";
939            commit;
940        };
941
942        let g = gb.client.provider.get_storage(mkid("a")).unwrap();
943
944        #[cfg(feature = "graphviz")]
945        crate::storage::memory::graphviz::dot(g, "mid_braid_2");
946
947        assert_eq!(g.get_head().unwrap(), Location::new(3, 0));
948
949        let seq = lookup(g, "seq").unwrap();
950        let seq = std::str::from_utf8(&seq).unwrap();
951        assert_eq!(seq, "a:b:c:d:h:i:j:e:f:g");
952    }
953
954    #[test]
955    fn test_sequential_finalize() {
956        let mut gb = graph! {
957            ClientState::new(SeqPolicyStore, MemStorageProvider::new());
958            "a";
959            commit;
960            "a" < "b" "c" "d" "e" "f" "g";
961            "d" < "h" "i" "j";
962            "e" < finalize "fff1";
963            "fff1" < "x" "y";
964            "y" < finalize "fff2";
965            commit;
966        };
967
968        let g = gb.client.provider.get_storage(mkid("a")).unwrap();
969
970        #[cfg(feature = "graphviz")]
971        crate::storage::memory::graphviz::dot(g, "finalize_success");
972
973        assert_eq!(g.get_head().unwrap(), Location::new(5, 0));
974
975        let seq = lookup(g, "seq").unwrap();
976        let seq = std::str::from_utf8(&seq).unwrap();
977        assert_eq!(seq, "a:b:c:d:e:fff1:x:y:fff2:h:i:j:f:g");
978    }
979
980    #[test]
981    fn test_parallel_finalize() {
982        let mut gb = graph! {
983            ClientState::new(SeqPolicyStore, MemStorageProvider::new());
984            "a";
985            commit;
986            "a" < "b" "c" "d" "e" "f" "g";
987            "d" < "h" "i" "j";
988            "e" < finalize "fff1";
989            "i" < finalize "fff2";
990        };
991        let err = gb.commit().expect_err("merge should fail");
992        assert!(matches!(err, ClientError::ParallelFinalize), "{err:?}");
993    }
994}