aranya_runtime/client/
transaction.rs

1use alloc::collections::{BTreeMap, VecDeque};
2use core::{marker::PhantomData, mem};
3
4use buggy::{BugExt as _, bug};
5
6use super::braiding;
7use crate::{
8    Address, ClientError, CmdId, Command, Engine, EngineError, GraphId, Location,
9    MAX_COMMAND_LENGTH, MergeIds, Perspective as _, Policy as _, PolicyId, Prior, Revertable as _,
10    Segment as _, Sink, Storage, StorageError, StorageProvider, engine::CommandPlacement,
11};
12
13/// Transaction used to receive many commands at once.
14///
15/// The transaction allows us to have many temporary heads at once, so we don't
16/// need as many merges when adding commands. When the transaction is committed,
17/// we will merge all temporary heads and the storage head, and then commit the
18/// result as the new storage head.
19pub struct Transaction<SP: StorageProvider, E> {
20    /// The ID of the associated storage
21    storage_id: GraphId,
22    /// Current working perspective
23    perspective: Option<SP::Perspective>,
24    /// Head of the current perspective
25    phead: Option<CmdId>,
26    /// Written but not committed heads
27    heads: BTreeMap<Address, Location>,
28    /// Tag for associated engine
29    _engine: PhantomData<E>,
30}
31
32impl<SP: StorageProvider, E> Transaction<SP, E> {
33    pub(super) const fn new(storage_id: GraphId) -> Self {
34        Self {
35            storage_id,
36            perspective: None,
37            phead: None,
38            heads: BTreeMap::new(),
39            _engine: PhantomData,
40        }
41    }
42}
43
44impl<SP: StorageProvider, E: Engine> Transaction<SP, E> {
45    /// Returns the transaction's storage id.
46    pub fn storage_id(&self) -> GraphId {
47        self.storage_id
48    }
49
50    /// Find a given id if reachable within this transaction.
51    ///
52    /// Does not search `self.perspective`, which should be written out beforehand.
53    fn locate(
54        &self,
55        storage: &mut SP::Storage,
56        address: Address,
57    ) -> Result<Option<Location>, ClientError> {
58        // Search from committed head.
59        if let Some(found) = storage.get_location(address)? {
60            return Ok(Some(found));
61        }
62        // Search from our temporary heads.
63        for &head in self.heads.values() {
64            if let Some(found) = storage.get_location_from(head, address)? {
65                return Ok(Some(found));
66            }
67        }
68        Ok(None)
69    }
70
71    /// Write current perspective, merge transaction heads, and commit to graph.
72    pub(super) fn commit(
73        &mut self,
74        provider: &mut SP,
75        engine: &mut E,
76        sink: &mut impl Sink<E::Effect>,
77    ) -> Result<(), ClientError> {
78        let storage = provider.get_storage(self.storage_id)?;
79
80        // Write out current perspective.
81        if let Some(p) = Option::take(&mut self.perspective) {
82            self.phead = None;
83            let segment = storage.write(p)?;
84            let head = segment.head()?;
85            self.heads.insert(head.address()?, segment.head_location());
86        }
87
88        // Merge heads pairwise until single head left, then commit.
89        // TODO(#370): Merge deterministically
90        let mut heads: VecDeque<_> = mem::take(&mut self.heads).into_iter().collect();
91        let mut merging_head = false;
92        while let Some((left_id, mut left_loc)) = heads.pop_front() {
93            if let Some((right_id, mut right_loc)) = heads.pop_front() {
94                let (policy, policy_id) = choose_policy(storage, engine, left_loc, right_loc)?;
95
96                let mut buffer = [0u8; MAX_COMMAND_LENGTH];
97                let merge_ids = MergeIds::new(left_id, right_id).assume("merging different ids")?;
98                if left_id > right_id {
99                    mem::swap(&mut left_loc, &mut right_loc);
100                }
101                let command = policy.merge(&mut buffer, merge_ids)?;
102
103                let (braid, last_common_ancestor) =
104                    make_braid_segment::<_, E>(storage, left_loc, right_loc, sink, policy)?;
105
106                let mut perspective = storage.new_merge_perspective(
107                    left_loc,
108                    right_loc,
109                    last_common_ancestor,
110                    policy_id,
111                    braid,
112                )?;
113                perspective.add_command(&command)?;
114
115                let segment = storage.write(perspective)?;
116                let head = segment.head()?;
117
118                heads.push_back((head.address()?, segment.head_location()));
119            } else {
120                let segment = storage.get_segment(left_loc)?;
121                // Try to commit. If it fails with `HeadNotAncestor`, we know we
122                // need to merge with the graph head.
123                match storage.commit(segment) {
124                    Ok(()) => break,
125                    Err(StorageError::HeadNotAncestor) => {
126                        if merging_head {
127                            bug!("merging with graph head again, would loop");
128                        }
129
130                        merging_head = true;
131
132                        heads.push_back((left_id, left_loc));
133
134                        let head_loc = storage.get_head()?;
135                        let segment = storage.get_segment(head_loc)?;
136                        let head = segment.head()?;
137                        heads.push_back((head.address()?, segment.head_location()));
138                    }
139                    Err(e) => return Err(e.into()),
140                }
141            }
142        }
143
144        Ok(())
145    }
146
147    /// Attempt to store the `command` in the graph with `storage_id`. Effects will be
148    /// emitted to the `sink`. This interface is used when syncing with another device
149    /// and integrating the new commands.
150    pub(super) fn add_commands(
151        &mut self,
152        commands: &[impl Command],
153        provider: &mut SP,
154        engine: &mut E,
155        sink: &mut impl Sink<E::Effect>,
156    ) -> Result<usize, ClientError> {
157        let mut commands = commands.iter();
158        let mut count: usize = 0;
159
160        // Get storage or try to initialize with first command.
161        let storage = match provider.get_storage(self.storage_id) {
162            Ok(s) => s,
163            Err(StorageError::NoSuchStorage) => {
164                let command = commands.next().ok_or(ClientError::InitError)?;
165                count = count.checked_add(1).assume("must not overflow")?;
166                self.init(command, engine, provider, sink)?
167            }
168            Err(e) => return Err(e.into()),
169        };
170
171        // Handle remaining commands.
172        for command in commands {
173            if self
174                .perspective
175                .as_ref()
176                .is_some_and(|p| p.includes(command.id()))
177            {
178                // Command in current perspective.
179                continue;
180            }
181            if (self.locate(storage, command.address()?)?).is_some() {
182                // Command already added.
183                continue;
184            }
185            match command.parent() {
186                Prior::None => {
187                    if command.id().as_base() == self.storage_id.as_base() {
188                        // Graph already initialized, extra init just spurious
189                    } else {
190                        bug!("init command does not belong in graph");
191                    }
192                }
193                Prior::Single(parent) => {
194                    self.add_single(storage, engine, sink, command, parent)?;
195                    count = count.checked_add(1).assume("must not overflow")?;
196                }
197                Prior::Merge(left, right) => {
198                    self.add_merge(storage, engine, sink, command, left, right)?;
199                    count = count.checked_add(1).assume("must not overflow")?;
200                }
201            }
202        }
203
204        Ok(count)
205    }
206
207    fn add_single(
208        &mut self,
209        storage: &mut <SP as StorageProvider>::Storage,
210        engine: &mut E,
211        sink: &mut impl Sink<E::Effect>,
212        command: &impl Command,
213        parent: Address,
214    ) -> Result<(), ClientError> {
215        let perspective = self.get_perspective(parent, storage)?;
216
217        let policy_id = perspective.policy();
218        let policy = engine.get_policy(policy_id)?;
219
220        // Try to run command, or revert if failed.
221        sink.begin();
222        let checkpoint = perspective.checkpoint();
223        if let Err(e) = policy.call_rule(
224            command,
225            perspective,
226            sink,
227            CommandPlacement::OnGraphAtOrigin,
228        ) {
229            perspective.revert(checkpoint)?;
230            sink.rollback();
231            return Err(e.into());
232        }
233        perspective.add_command(command)?;
234        sink.commit();
235
236        self.phead = Some(command.id());
237
238        Ok(())
239    }
240
241    fn add_merge(
242        &mut self,
243        storage: &mut <SP as StorageProvider>::Storage,
244        engine: &mut E,
245        sink: &mut impl Sink<E::Effect>,
246        command: &impl Command,
247        left: Address,
248        right: Address,
249    ) -> Result<bool, ClientError> {
250        // Must always start a new perspective for merges.
251        if let Some(p) = Option::take(&mut self.perspective) {
252            let seg = storage.write(p)?;
253            let head = seg.head()?;
254            self.heads.insert(head.address()?, seg.head_location());
255        }
256
257        let left_loc = self
258            .locate(storage, left)?
259            .ok_or(ClientError::NoSuchParent(left.id))?;
260        let right_loc = self
261            .locate(storage, right)?
262            .ok_or(ClientError::NoSuchParent(right.id))?;
263
264        let (policy, policy_id) = choose_policy(storage, engine, left_loc, right_loc)?;
265
266        // Braid commands from left and right into an ordered sequence.
267        let (braid, last_common_ancestor) =
268            make_braid_segment::<_, E>(storage, left_loc, right_loc, sink, policy)?;
269
270        let mut perspective = storage.new_merge_perspective(
271            left_loc,
272            right_loc,
273            last_common_ancestor,
274            policy_id,
275            braid,
276        )?;
277        perspective.add_command(command)?;
278
279        // These are no longer heads of the transaction, since they are both covered by the merge
280        self.heads.remove(&left);
281        self.heads.remove(&right);
282
283        self.perspective = Some(perspective);
284        self.phead = Some(command.id());
285
286        Ok(true)
287    }
288
289    /// Get a perspective to which we can add a command with the given parant.
290    ///
291    /// If parent is the head of the current perspective, we can just use it.
292    /// Otherwise, we must write out the perspective and get a new one.
293    fn get_perspective(
294        &mut self,
295        parent: Address,
296        storage: &mut <SP as StorageProvider>::Storage,
297    ) -> Result<&mut <SP as StorageProvider>::Perspective, ClientError> {
298        if self.phead == Some(parent.id) {
299            // Command will append to current perspective.
300            return Ok(self
301                .perspective
302                .as_mut()
303                .assume("trx has perspective when has phead")?);
304        }
305
306        // Write out the current perspective.
307        if let Some(p) = Option::take(&mut self.perspective) {
308            self.phead = None;
309            let seg = storage.write(p)?;
310            let head = seg.head()?;
311            self.heads.insert(head.address()?, seg.head_location());
312        }
313
314        let loc = self
315            .locate(storage, parent)?
316            .ok_or(ClientError::NoSuchParent(parent.id))?;
317
318        // Get a new perspective and store it in the transaction.
319        let p = self
320            .perspective
321            .insert(storage.get_linear_perspective(loc)?);
322
323        self.phead = Some(parent.id);
324        self.heads.remove(&parent);
325
326        Ok(p)
327    }
328
329    fn init<'sp>(
330        &mut self,
331        command: &impl Command,
332        engine: &mut E,
333        provider: &'sp mut SP,
334        sink: &mut impl Sink<E::Effect>,
335    ) -> Result<&'sp mut <SP as StorageProvider>::Storage, ClientError> {
336        // Storage ID is the id of the init command by definition.
337        if self.storage_id.as_base() != command.id().as_base() {
338            return Err(ClientError::InitError);
339        }
340
341        // The init command must not have a parent.
342        if !matches!(command.parent(), Prior::None) {
343            return Err(ClientError::InitError);
344        }
345
346        // The graph must have policy to start with.
347        let Some(policy_data) = command.policy() else {
348            return Err(ClientError::InitError);
349        };
350
351        let policy_id = engine.add_policy(policy_data)?;
352        let policy = engine.get_policy(policy_id)?;
353
354        // Get an empty perspective and run the init command.
355        let mut perspective = provider.new_perspective(policy_id);
356        sink.begin();
357        if let Err(e) = policy.call_rule(
358            command,
359            &mut perspective,
360            sink,
361            CommandPlacement::OnGraphAtOrigin,
362        ) {
363            sink.rollback();
364            // We don't need to revert perspective since we just drop it.
365            return Err(e.into());
366        }
367        perspective.add_command(command)?;
368
369        let (_, storage) = provider.new_storage(perspective)?;
370
371        // Wait to commit until we are absolutely sure we've initialized.
372        sink.commit();
373
374        Ok(storage)
375    }
376}
377
378/// Run the braid algorithm and evaluate the sequence to create a braided fact index.
379fn make_braid_segment<S: Storage, E: Engine>(
380    storage: &mut S,
381    left: Location,
382    right: Location,
383    sink: &mut impl Sink<E::Effect>,
384    policy: &E::Policy,
385) -> Result<(S::FactIndex, (Location, usize)), ClientError> {
386    let order = braiding::braid(storage, left, right)?;
387    let last_common_ancestor = braiding::last_common_ancestor(storage, left, right)?;
388
389    let (&first, rest) = order.split_first().assume("braid is non-empty")?;
390
391    let mut braid_perspective = storage.get_fact_perspective(first)?;
392
393    sink.begin();
394
395    for &location in rest {
396        let segment = storage.get_segment(location)?;
397        let command = segment
398            .get_command(location)
399            .assume("braid only contains existing commands")?;
400
401        let result = policy.call_rule(
402            &command,
403            &mut braid_perspective,
404            sink,
405            CommandPlacement::OnGraphInBraid,
406        );
407
408        // If the command failed in an uncontrolled way, rollback
409        if let Err(e) = result {
410            if e != EngineError::Check {
411                sink.rollback();
412                return Err(e.into());
413            }
414        }
415    }
416
417    let braid = storage.write_facts(braid_perspective)?;
418
419    sink.commit();
420
421    Ok((braid, last_common_ancestor))
422}
423
424/// Select the policy from two locations with the greatest serial value.
425fn choose_policy<'a, E: Engine>(
426    storage: &impl Storage,
427    engine: &'a E,
428    left: Location,
429    right: Location,
430) -> Result<(&'a E::Policy, PolicyId), ClientError> {
431    Ok(core::cmp::max_by_key(
432        get_policy(storage, engine, left)?,
433        get_policy(storage, engine, right)?,
434        |(p, _)| p.serial(),
435    ))
436}
437
438fn get_policy<'a, E: Engine>(
439    storage: &impl Storage,
440    engine: &'a E,
441    location: Location,
442) -> Result<(&'a E::Policy, PolicyId), ClientError> {
443    let segment = storage.get_segment(location)?;
444    let policy_id = segment.policy();
445    let policy = engine.get_policy(policy_id)?;
446    Ok((policy, policy_id))
447}
448
449#[cfg(test)]
450mod test {
451    use std::collections::HashMap;
452
453    use aranya_crypto::id::{Id, IdTag};
454    use buggy::Bug;
455    use test_log::test;
456
457    use super::*;
458    use crate::{
459        ClientState, Keys, MergeIds, Perspective, Policy, Priority,
460        engine::{ActionPlacement, CommandPlacement},
461        memory::MemStorageProvider,
462        testing::{hash_for_testing_only, short_b58},
463    };
464
    /// Test engine whose only policy is [`SeqPolicy`].
    struct SeqEngine;
466
    /// [`SeqPolicy`] is a very simple policy which appends the bytes of each
    /// evaluated command to a fact named `"seq"`, separated by `:`. The test
    /// commands use a short form of their id as bytes, so at each point in
    /// the graph this fact should list, in braid order, the ids of the
    /// commands evaluated up to that point.
    struct SeqPolicy;
472
    /// Command type used by the tests in this module.
    struct SeqCommand {
        /// ID of the command.
        id: CmdId,
        /// Parent(s) of the command; `Prior::None` for an init command.
        prior: Prior<Address>,
        /// When set, the command reports `Priority::Finalize`.
        finalize: bool,
        /// Payload returned by `bytes()`; built from the id via `short_b58`.
        data: Box<str>,
        /// Value returned by `max_cut()`.
        max_cut: usize,
    }
480
481    impl Engine for SeqEngine {
482        type Policy = SeqPolicy;
483        type Effect = ();
484
485        fn add_policy(&mut self, _policy: &[u8]) -> Result<PolicyId, EngineError> {
486            Ok(PolicyId::new(0))
487        }
488
489        fn get_policy(&self, _id: PolicyId) -> Result<&Self::Policy, EngineError> {
490            Ok(&SeqPolicy)
491        }
492    }
493
494    impl Policy for SeqPolicy {
495        type Action<'a> = &'a str;
496        type Effect = ();
497        type Command<'a> = SeqCommand;
498
499        fn serial(&self) -> u32 {
500            0
501        }
502
503        fn call_rule(
504            &self,
505            command: &impl Command,
506            facts: &mut impl crate::FactPerspective,
507            _sink: &mut impl Sink<Self::Effect>,
508            _placement: CommandPlacement,
509        ) -> Result<(), EngineError> {
510            assert!(
511                !matches!(command.parent(), Prior::Merge { .. }),
512                "merges shouldn't be evaluated"
513            );
514
515            // For init and basic commands, append the id to the seq fact.
516            let data = command.bytes();
517            if let Some(seq) = facts
518                .query("seq", &Keys::default())
519                .assume("can query")?
520                .as_deref()
521            {
522                facts.insert(
523                    "seq".into(),
524                    Keys::default(),
525                    [seq, b":", data].concat().into(),
526                );
527            } else {
528                facts.insert("seq".into(), Keys::default(), data.into());
529            }
530            Ok(())
531        }
532
533        fn call_action(
534            &self,
535            _action: Self::Action<'_>,
536            _facts: &mut impl Perspective,
537            _sink: &mut impl Sink<Self::Effect>,
538            _placement: ActionPlacement,
539        ) -> Result<(), EngineError> {
540            unimplemented!()
541        }
542
543        fn merge<'a>(
544            &self,
545            _target: &'a mut [u8],
546            ids: MergeIds,
547        ) -> Result<Self::Command<'a>, EngineError> {
548            let (left, right): (Address, Address) = ids.into();
549            let parents = [*left.id.as_array(), *right.id.as_array()];
550            let id = hash_for_testing_only(parents.as_flattened());
551
552            Ok(SeqCommand::new(
553                id,
554                Prior::Merge(left, right),
555                left.max_cut
556                    .max(right.max_cut)
557                    .checked_add(1)
558                    .assume("must not overflow")?,
559            ))
560        }
561    }
562
563    impl SeqCommand {
564        fn new(id: CmdId, prior: Prior<Address>, max_cut: usize) -> Self {
565            let data = short_b58(id).into_boxed_str();
566            Self {
567                id,
568                prior,
569                finalize: false,
570                data,
571                max_cut,
572            }
573        }
574
575        fn finalize(id: CmdId, prev: Address, max_cut: usize) -> Self {
576            let data = short_b58(id).into_boxed_str();
577            Self {
578                id,
579                prior: Prior::Single(prev),
580                finalize: true,
581                data,
582                max_cut,
583            }
584        }
585    }
586
587    impl Command for SeqCommand {
588        fn priority(&self) -> Priority {
589            if self.finalize {
590                return Priority::Finalize;
591            }
592            match self.prior {
593                Prior::None => Priority::Init,
594                Prior::Single(_) => {
595                    // Use the last byte of the ID as priority, just so we can
596                    // properly see the effects of braiding
597                    let id = self.id.as_bytes();
598                    let priority = u32::from(*id.last().unwrap());
599                    Priority::Basic(priority)
600                }
601                Prior::Merge(_, _) => Priority::Merge,
602            }
603        }
604
605        fn id(&self) -> CmdId {
606            self.id
607        }
608
609        fn parent(&self) -> Prior<Address> {
610            self.prior
611        }
612
613        fn policy(&self) -> Option<&[u8]> {
614            // We don't actually need any policy bytes, but the
615            // transaction/storage requires it on init commands.
616            match self.prior {
617                Prior::None => Some(b""),
618                _ => None,
619            }
620        }
621
622        fn bytes(&self) -> &[u8] {
623            self.data.as_bytes()
624        }
625
626        fn max_cut(&self) -> Result<usize, Bug> {
627            Ok(self.max_cut)
628        }
629    }
630
631    struct NullSink;
632    impl<E> Sink<E> for NullSink {
633        fn begin(&mut self) {}
634        fn consume(&mut self, _: E) {}
635        fn rollback(&mut self) {}
636        fn commit(&mut self) {}
637    }
638
639    /// [`GraphBuilder`] and the associated macro [`graph`] provide an easy way
640    /// to create a graph with a specific structure.
641    struct GraphBuilder<SP: StorageProvider> {
642        client: ClientState<SeqEngine, SP>,
643        trx: Transaction<SP, SeqEngine>,
644        max_cuts: HashMap<CmdId, usize>,
645    }
646
647    impl<SP: StorageProvider> GraphBuilder<SP> {
648        pub fn init(
649            mut client: ClientState<SeqEngine, SP>,
650            ids: &[CmdId],
651        ) -> Result<Self, ClientError> {
652            let mut trx = Transaction::new(GraphId::transmute(ids[0]));
653            let mut prior: Prior<Address> = Prior::None;
654            let mut max_cuts = HashMap::new();
655            for (max_cut, &id) in ids.iter().enumerate() {
656                let cmd = SeqCommand::new(id, prior, max_cut);
657                trx.add_commands(
658                    &[cmd],
659                    &mut client.provider,
660                    &mut client.engine,
661                    &mut NullSink,
662                )?;
663                max_cuts.insert(id, max_cut);
664                prior = Prior::Single(Address { id, max_cut });
665            }
666            Ok(Self {
667                client,
668                trx,
669                max_cuts,
670            })
671        }
672
673        fn get_addr(&self, id: CmdId) -> Address {
674            Address {
675                id,
676                max_cut: self.max_cuts[&id],
677            }
678        }
679
680        pub fn line(&mut self, prev: CmdId, ids: &[CmdId]) -> Result<(), ClientError> {
681            let mut prev = self.get_addr(prev);
682            for &id in ids {
683                let max_cut = prev.max_cut.checked_add(1).unwrap();
684                let cmd = SeqCommand::new(id, Prior::Single(prev), max_cut);
685                self.trx.add_commands(
686                    &[cmd],
687                    &mut self.client.provider,
688                    &mut self.client.engine,
689                    &mut NullSink,
690                )?;
691                self.max_cuts.insert(id, max_cut);
692                prev = Address { id, max_cut };
693            }
694            Ok(())
695        }
696
697        pub fn finalize(&mut self, prev: CmdId, id: CmdId) -> Result<(), ClientError> {
698            let prev = self.get_addr(prev);
699            let max_cut = prev.max_cut.checked_add(1).unwrap();
700            let cmd = SeqCommand::finalize(id, prev, max_cut);
701            self.trx.add_commands(
702                &[cmd],
703                &mut self.client.provider,
704                &mut self.client.engine,
705                &mut NullSink,
706            )?;
707            self.max_cuts.insert(id, max_cut);
708            Ok(())
709        }
710
711        pub fn merge(
712            &mut self,
713            (left, right): (CmdId, CmdId),
714            ids: &[CmdId],
715        ) -> Result<(), ClientError> {
716            let prior = Prior::Merge(self.get_addr(left), self.get_addr(right));
717            let mergecmd = SeqCommand::new(ids[0], prior, prior.next_max_cut().unwrap());
718            let mut prev = Address {
719                id: mergecmd.id,
720                max_cut: mergecmd.max_cut,
721            };
722            self.max_cuts.insert(mergecmd.id, mergecmd.max_cut);
723            self.trx.add_commands(
724                &[mergecmd],
725                &mut self.client.provider,
726                &mut self.client.engine,
727                &mut NullSink,
728            )?;
729            for &id in &ids[1..] {
730                let cmd = SeqCommand::new(
731                    id,
732                    Prior::Single(prev),
733                    prev.max_cut.checked_add(1).expect("must not overflow"),
734                );
735                prev = Address {
736                    id: cmd.id,
737                    max_cut: cmd.max_cut,
738                };
739                self.max_cuts.insert(cmd.id, cmd.max_cut);
740                self.trx.add_commands(
741                    &[cmd],
742                    &mut self.client.provider,
743                    &mut self.client.engine,
744                    &mut NullSink,
745                )?;
746            }
747            Ok(())
748        }
749
750        pub fn flush(&mut self) {
751            if let Some(p) = Option::take(&mut self.trx.perspective) {
752                self.trx.phead = None;
753                let seg = self
754                    .client
755                    .provider
756                    .get_storage(self.trx.storage_id)
757                    .unwrap()
758                    .write(p)
759                    .unwrap();
760                let head = seg.head().unwrap();
761                self.trx.heads.insert(
762                    head.address().expect("address must exist"),
763                    seg.head_location(),
764                );
765            }
766        }
767
768        pub fn commit(&mut self) -> Result<(), ClientError> {
769            self.trx.commit(
770                &mut self.client.provider,
771                &mut self.client.engine,
772                &mut NullSink,
773            )
774        }
775    }
776
777    fn mkid<Tag: IdTag>(x: &str) -> Id<Tag> {
778        x.parse().unwrap()
779    }
780
781    /// See tests for usage.
782    macro_rules! graph {
783        ( $client:expr ; $init:literal $($inits:literal )* ; $($rest:tt)*) => {{
784            let mut gb = GraphBuilder::init($client, &[mkid($init), $(mkid($inits)),*]).unwrap();
785            graph!(@ gb, $($rest)*);
786            gb
787        }};
788        (@ $gb:ident, $prev:literal < $($id:literal)+; $($rest:tt)*) => {
789            $gb.line(mkid($prev), &[$(mkid($id)),+]).unwrap();
790            graph!(@ $gb, $($rest)*);
791        };
792        (@ $gb:ident, $l:literal $r:literal < $($id:literal)+; $($rest:tt)*) => {
793            $gb.merge((mkid($l), mkid($r)), &[$(mkid($id)),+]).unwrap();
794            graph!(@ $gb, $($rest)*);
795        };
796        (@ $gb:ident, $prev:literal < finalize $id:literal; $($rest:tt)*) => {
797            $gb.finalize(mkid($prev), mkid($id)).unwrap();
798            graph!(@ $gb, $($rest)*);
799        };
800        (@ $gb:ident, commit; $($rest:tt)*) => {
801            $gb.commit().unwrap();
802            graph!(@ $gb, $($rest)*);
803        };
804        (@ $gb:ident, ) => {
805            $gb.flush();
806        };
807    }
808
809    fn lookup(storage: &impl Storage, name: &str) -> Option<Box<[u8]>> {
810        use crate::Query as _;
811        let head = storage.get_head().unwrap();
812        let p = storage.get_fact_perspective(head).unwrap();
813        p.query(name, &Keys::default()).unwrap()
814    }
815
816    #[test]
817    fn test_simple() -> Result<(), StorageError> {
818        let mut gb = graph! {
819            ClientState::new(SeqEngine, MemStorageProvider::new());
820            "a";
821            "a" < "b";
822            "a" < "c";
823            "b" "c" < "ma";
824            "b" < "d";
825            "ma" "d" < "mb";
826            commit;
827        };
828        let g = gb.client.provider.get_storage(mkid("a")).unwrap();
829
830        #[cfg(feature = "graphviz")]
831        crate::storage::memory::graphviz::dot(g, "simple");
832
833        assert_eq!(g.get_head().unwrap(), Location::new(5, 0));
834
835        let seq = lookup(g, "seq").unwrap();
836        let seq = std::str::from_utf8(&seq).unwrap();
837        assert_eq!(seq, "a:b:d:c");
838
839        Ok(())
840    }
841
842    #[test]
843    fn test_complex() -> Result<(), StorageError> {
844        let mut gb = graph! {
845            ClientState::new(SeqEngine, MemStorageProvider::new());
846            "a";
847            "a" < "1" "2" "3";
848            "3" < "4" "6" "7";
849            "3" < "5" "8";
850            "6" "8" < "9" "aa"; commit;
851            "7" < "a1" "a2";
852            "aa" "a2" < "a3";
853            "a3" < "a6" "a4";
854            "a3" < "a7" "a5";
855            "a4" "a5" < "a8";
856            "9" < "42" "43";
857            "42" < "45" "46";
858            "45" < "47" "48";
859            commit;
860        };
861
862        let g = gb.client.provider.get_storage(mkid("a")).unwrap();
863
864        #[cfg(feature = "graphviz")]
865        crate::storage::memory::graphviz::dot(g, "complex");
866
867        assert_eq!(g.get_head().unwrap(), Location::new(15, 0));
868
869        let seq = lookup(g, "seq").unwrap();
870        let seq = std::str::from_utf8(&seq).unwrap();
871        assert_eq!(
872            seq,
873            "a:1:2:3:5:8:4:6:42:45:47:48:46:43:aa:7:a1:a2:a7:a6:a5:a4"
874        );
875
876        Ok(())
877    }
878
879    #[test]
880    fn test_duplicates() {
881        let mut gb = graph! {
882            ClientState::new(SeqEngine, MemStorageProvider::new());
883            "a";
884            "a" < "b" "c";
885            "a" < "b";
886            "b" < "c";
887            "c" < "d";
888            commit;
889            "a" < "b";
890            "b" < "c";
891            "d" < "e";
892            commit;
893        };
894
895        let g = gb.client.provider.get_storage(mkid("a")).unwrap();
896
897        #[cfg(feature = "graphviz")]
898        crate::storage::memory::graphviz::dot(g, "duplicates");
899
900        assert_eq!(g.get_head().unwrap(), Location::new(2, 0));
901
902        let seq = lookup(g, "seq").unwrap();
903        let seq = std::str::from_utf8(&seq).unwrap();
904        assert_eq!(seq, "a:b:c:d:e");
905    }
906
907    #[test]
908    fn test_mid_braid_1() {
909        let mut gb = graph! {
910            ClientState::new(SeqEngine, MemStorageProvider::new());
911            "a";
912            commit;
913            "a" < "b" "c" "d" "e" "f" "g";
914            "d" < "h" "i" "j";
915            commit;
916        };
917
918        let g = gb.client.provider.get_storage(mkid("a")).unwrap();
919
920        #[cfg(feature = "graphviz")]
921        crate::storage::memory::graphviz::dot(g, "mid_braid_1");
922
923        assert_eq!(g.get_head().unwrap(), Location::new(3, 0));
924
925        let seq = lookup(g, "seq").unwrap();
926        let seq = std::str::from_utf8(&seq).unwrap();
927        assert_eq!(seq, "a:b:c:d:h:i:j:e:f:g");
928    }
929
930    #[test]
931    fn test_mid_braid_2() {
932        let mut gb = graph! {
933            ClientState::new(SeqEngine, MemStorageProvider::new());
934            "a";
935            commit;
936            "a" < "b" "c" "d" "h" "i" "j";
937            "d" < "e" "f" "g";
938            commit;
939        };
940
941        let g = gb.client.provider.get_storage(mkid("a")).unwrap();
942
943        #[cfg(feature = "graphviz")]
944        crate::storage::memory::graphviz::dot(g, "mid_braid_2");
945
946        assert_eq!(g.get_head().unwrap(), Location::new(3, 0));
947
948        let seq = lookup(g, "seq").unwrap();
949        let seq = std::str::from_utf8(&seq).unwrap();
950        assert_eq!(seq, "a:b:c:d:h:i:j:e:f:g");
951    }
952
953    #[test]
954    fn test_sequential_finalize() {
955        let mut gb = graph! {
956            ClientState::new(SeqEngine, MemStorageProvider::new());
957            "a";
958            commit;
959            "a" < "b" "c" "d" "e" "f" "g";
960            "d" < "h" "i" "j";
961            "e" < finalize "fff1";
962            "fff1" < "x" "y";
963            "y" < finalize "fff2";
964            commit;
965        };
966
967        let g = gb.client.provider.get_storage(mkid("a")).unwrap();
968
969        #[cfg(feature = "graphviz")]
970        crate::storage::memory::graphviz::dot(g, "finalize_success");
971
972        assert_eq!(g.get_head().unwrap(), Location::new(5, 0));
973
974        let seq = lookup(g, "seq").unwrap();
975        let seq = std::str::from_utf8(&seq).unwrap();
976        assert_eq!(seq, "a:b:c:d:e:fff1:x:y:fff2:h:i:j:f:g");
977    }
978
979    #[test]
980    fn test_parallel_finalize() {
981        let mut gb = graph! {
982            ClientState::new(SeqEngine, MemStorageProvider::new());
983            "a";
984            commit;
985            "a" < "b" "c" "d" "e" "f" "g";
986            "d" < "h" "i" "j";
987            "e" < finalize "fff1";
988            "i" < finalize "fff2";
989        };
990        let err = gb.commit().expect_err("merge should fail");
991        assert!(matches!(err, ClientError::ParallelFinalize), "{err:?}");
992    }
993}