aranya_runtime/client/
transaction.rs

1use alloc::collections::{BTreeMap, VecDeque};
2use core::{marker::PhantomData, mem};
3
4use buggy::{BugExt, bug};
5
6use super::braiding;
7use crate::{
8    Address, ClientError, CmdId, Command, CommandRecall, Engine, EngineError, GraphId, Location,
9    MAX_COMMAND_LENGTH, MergeIds, Perspective, Policy, PolicyId, Prior, Revertable, Segment, Sink,
10    Storage, StorageError, StorageProvider,
11};
12
13/// Transaction used to receive many commands at once.
14///
15/// The transaction allows us to have many temporary heads at once, so we don't
16/// need as many merges when adding commands. When the transaction is committed,
17/// we will merge all temporary heads and the storage head, and then commit the
18/// result as the new storage head.
19pub struct Transaction<SP: StorageProvider, E> {
20    /// The ID of the associated storage
21    storage_id: GraphId,
22    /// Current working perspective
23    perspective: Option<SP::Perspective>,
24    /// Head of the current perspective
25    phead: Option<CmdId>,
26    /// Written but not committed heads
27    heads: BTreeMap<Address, Location>,
28    /// Tag for associated engine
29    _engine: PhantomData<E>,
30}
31
32impl<SP: StorageProvider, E> Transaction<SP, E> {
33    pub(super) const fn new(storage_id: GraphId) -> Self {
34        Self {
35            storage_id,
36            perspective: None,
37            phead: None,
38            heads: BTreeMap::new(),
39            _engine: PhantomData,
40        }
41    }
42}
43
44impl<SP: StorageProvider, E: Engine> Transaction<SP, E> {
45    /// Returns the transaction's storage id.
46    pub fn storage_id(&self) -> GraphId {
47        self.storage_id
48    }
49
50    /// Find a given id if reachable within this transaction.
51    ///
52    /// Does not search `self.perspective`, which should be written out beforehand.
53    fn locate(
54        &self,
55        storage: &mut SP::Storage,
56        address: Address,
57    ) -> Result<Option<Location>, ClientError> {
58        // Search from committed head.
59        if let Some(found) = storage.get_location(address)? {
60            return Ok(Some(found));
61        }
62        // Search from our temporary heads.
63        for &head in self.heads.values() {
64            if let Some(found) = storage.get_location_from(head, address)? {
65                return Ok(Some(found));
66            }
67        }
68        Ok(None)
69    }
70
71    /// Write current perspective, merge transaction heads, and commit to graph.
72    pub(super) fn commit(
73        &mut self,
74        provider: &mut SP,
75        engine: &mut E,
76        sink: &mut impl Sink<E::Effect>,
77    ) -> Result<(), ClientError> {
78        let storage = provider.get_storage(self.storage_id)?;
79
80        // Write out current perspective.
81        if let Some(p) = Option::take(&mut self.perspective) {
82            self.phead = None;
83            let segment = storage.write(p)?;
84            let head = segment.head()?;
85            self.heads.insert(head.address()?, segment.head_location());
86        }
87
88        // Merge heads pairwise until single head left, then commit.
89        // TODO(#370): Merge deterministically
90        let mut heads: VecDeque<_> = mem::take(&mut self.heads).into_iter().collect();
91        let mut merging_head = false;
92        while let Some((left_id, mut left_loc)) = heads.pop_front() {
93            if let Some((right_id, mut right_loc)) = heads.pop_front() {
94                let (policy, policy_id) = choose_policy(storage, engine, left_loc, right_loc)?;
95
96                let mut buffer = [0u8; MAX_COMMAND_LENGTH];
97                let merge_ids = MergeIds::new(left_id, right_id).assume("merging different ids")?;
98                if left_id > right_id {
99                    mem::swap(&mut left_loc, &mut right_loc);
100                }
101                let command = policy.merge(&mut buffer, merge_ids)?;
102
103                let (braid, last_common_ancestor) =
104                    make_braid_segment::<_, E>(storage, left_loc, right_loc, sink, policy)?;
105
106                let mut perspective = storage.new_merge_perspective(
107                    left_loc,
108                    right_loc,
109                    last_common_ancestor,
110                    policy_id,
111                    braid,
112                )?;
113                perspective.add_command(&command)?;
114
115                let segment = storage.write(perspective)?;
116                let head = segment.head()?;
117
118                heads.push_back((head.address()?, segment.head_location()));
119            } else {
120                let segment = storage.get_segment(left_loc)?;
121                // Try to commit. If it fails with `HeadNotAncestor`, we know we
122                // need to merge with the graph head.
123                match storage.commit(segment) {
124                    Ok(()) => break,
125                    Err(StorageError::HeadNotAncestor) => {
126                        if merging_head {
127                            bug!("merging with graph head again, would loop");
128                        }
129
130                        merging_head = true;
131
132                        heads.push_back((left_id, left_loc));
133
134                        let head_loc = storage.get_head()?;
135                        let segment = storage.get_segment(head_loc)?;
136                        let head = segment.head()?;
137                        heads.push_back((head.address()?, segment.head_location()));
138                    }
139                    Err(e) => return Err(e.into()),
140                }
141            }
142        }
143
144        Ok(())
145    }
146
147    /// Attempt to store the `command` in the graph with `storage_id`. Effects will be
148    /// emitted to the `sink`. This interface is used when syncing with another device
149    /// and integrating the new commands.
150    pub(super) fn add_commands(
151        &mut self,
152        commands: &[impl Command],
153        provider: &mut SP,
154        engine: &mut E,
155        sink: &mut impl Sink<E::Effect>,
156    ) -> Result<usize, ClientError> {
157        let mut commands = commands.iter();
158        let mut count: usize = 0;
159
160        // Get storage or try to initialize with first command.
161        let storage = match provider.get_storage(self.storage_id) {
162            Ok(s) => s,
163            Err(StorageError::NoSuchStorage) => {
164                let command = commands.next().ok_or(ClientError::InitError)?;
165                count = count.checked_add(1).assume("must not overflow")?;
166                self.init(command, engine, provider, sink)?
167            }
168            Err(e) => return Err(e.into()),
169        };
170
171        // Handle remaining commands.
172        for command in commands {
173            if self
174                .perspective
175                .as_ref()
176                .is_some_and(|p| p.includes(command.id()))
177            {
178                // Command in current perspective.
179                continue;
180            }
181            if (self.locate(storage, command.address()?)?).is_some() {
182                // Command already added.
183                continue;
184            }
185            match command.parent() {
186                Prior::None => {
187                    if command.id().into_id() == self.storage_id.into_id() {
188                        // Graph already initialized, extra init just spurious
189                    } else {
190                        bug!("init command does not belong in graph");
191                    }
192                }
193                Prior::Single(parent) => {
194                    self.add_single(storage, engine, sink, command, parent)?;
195                    count = count.checked_add(1).assume("must not overflow")?;
196                }
197                Prior::Merge(left, right) => {
198                    self.add_merge(storage, engine, sink, command, left, right)?;
199                    count = count.checked_add(1).assume("must not overflow")?;
200                }
201            };
202        }
203
204        Ok(count)
205    }
206
207    fn add_single(
208        &mut self,
209        storage: &mut <SP as StorageProvider>::Storage,
210        engine: &mut E,
211        sink: &mut impl Sink<E::Effect>,
212        command: &impl Command,
213        parent: Address,
214    ) -> Result<(), ClientError> {
215        let perspective = self.get_perspective(parent, storage)?;
216
217        let policy_id = perspective.policy();
218        let policy = engine.get_policy(policy_id)?;
219
220        // Try to run command, or revert if failed.
221        sink.begin();
222        let checkpoint = perspective.checkpoint();
223        if let Err(e) = policy.call_rule(command, perspective, sink, CommandRecall::None) {
224            perspective.revert(checkpoint)?;
225            sink.rollback();
226            return Err(e.into());
227        }
228        perspective.add_command(command)?;
229        sink.commit();
230
231        self.phead = Some(command.id());
232
233        Ok(())
234    }
235
236    fn add_merge(
237        &mut self,
238        storage: &mut <SP as StorageProvider>::Storage,
239        engine: &mut E,
240        sink: &mut impl Sink<E::Effect>,
241        command: &impl Command,
242        left: Address,
243        right: Address,
244    ) -> Result<bool, ClientError> {
245        // Must always start a new perspective for merges.
246        if let Some(p) = Option::take(&mut self.perspective) {
247            let seg = storage.write(p)?;
248            let head = seg.head()?;
249            self.heads.insert(head.address()?, seg.head_location());
250        }
251
252        let left_loc = self
253            .locate(storage, left)?
254            .ok_or(ClientError::NoSuchParent(left.id))?;
255        let right_loc = self
256            .locate(storage, right)?
257            .ok_or(ClientError::NoSuchParent(right.id))?;
258
259        let (policy, policy_id) = choose_policy(storage, engine, left_loc, right_loc)?;
260
261        // Braid commands from left and right into an ordered sequence.
262        let (braid, last_common_ancestor) =
263            make_braid_segment::<_, E>(storage, left_loc, right_loc, sink, policy)?;
264
265        let mut perspective = storage.new_merge_perspective(
266            left_loc,
267            right_loc,
268            last_common_ancestor,
269            policy_id,
270            braid,
271        )?;
272        perspective.add_command(command)?;
273
274        // These are no longer heads of the transaction, since they are both covered by the merge
275        self.heads.remove(&left);
276        self.heads.remove(&right);
277
278        self.perspective = Some(perspective);
279        self.phead = Some(command.id());
280
281        Ok(true)
282    }
283
284    /// Get a perspective to which we can add a command with the given parant.
285    ///
286    /// If parent is the head of the current perspective, we can just use it.
287    /// Otherwise, we must write out the perspective and get a new one.
288    fn get_perspective(
289        &mut self,
290        parent: Address,
291        storage: &mut <SP as StorageProvider>::Storage,
292    ) -> Result<&mut <SP as StorageProvider>::Perspective, ClientError> {
293        if self.phead == Some(parent.id) {
294            // Command will append to current perspective.
295            return Ok(self
296                .perspective
297                .as_mut()
298                .assume("trx has perspective when has phead")?);
299        }
300
301        // Write out the current perspective.
302        if let Some(p) = Option::take(&mut self.perspective) {
303            self.phead = None;
304            let seg = storage.write(p)?;
305            let head = seg.head()?;
306            self.heads.insert(head.address()?, seg.head_location());
307        }
308
309        let loc = self
310            .locate(storage, parent)?
311            .ok_or(ClientError::NoSuchParent(parent.id))?;
312
313        // Get a new perspective and store it in the transaction.
314        let p = self
315            .perspective
316            .insert(storage.get_linear_perspective(loc)?);
317
318        self.phead = Some(parent.id);
319        self.heads.remove(&parent);
320
321        Ok(p)
322    }
323
324    fn init<'sp>(
325        &mut self,
326        command: &impl Command,
327        engine: &mut E,
328        provider: &'sp mut SP,
329        sink: &mut impl Sink<E::Effect>,
330    ) -> Result<&'sp mut <SP as StorageProvider>::Storage, ClientError> {
331        // Storage ID is the id of the init command by definition.
332        if self.storage_id.into_id() != command.id().into_id() {
333            return Err(ClientError::InitError);
334        }
335
336        // The init command must not have a parent.
337        if !matches!(command.parent(), Prior::None) {
338            return Err(ClientError::InitError);
339        }
340
341        // The graph must have policy to start with.
342        let Some(policy_data) = command.policy() else {
343            return Err(ClientError::InitError);
344        };
345
346        let policy_id = engine.add_policy(policy_data)?;
347        let policy = engine.get_policy(policy_id)?;
348
349        // Get an empty perspective and run the init command.
350        let mut perspective = provider.new_perspective(policy_id);
351        sink.begin();
352        if let Err(e) = policy.call_rule(command, &mut perspective, sink, CommandRecall::None) {
353            sink.rollback();
354            // We don't need to revert perspective since we just drop it.
355            return Err(e.into());
356        }
357        perspective.add_command(command)?;
358
359        let (_, storage) = provider.new_storage(perspective)?;
360
361        // Wait to commit until we are absolutely sure we've initialized.
362        sink.commit();
363
364        Ok(storage)
365    }
366}
367
368/// Run the braid algorithm and evaluate the sequence to create a braided fact index.
369fn make_braid_segment<S: Storage, E: Engine>(
370    storage: &mut S,
371    left: Location,
372    right: Location,
373    sink: &mut impl Sink<E::Effect>,
374    policy: &E::Policy,
375) -> Result<(S::FactIndex, (Location, usize)), ClientError> {
376    let order = braiding::braid(storage, left, right)?;
377    let last_common_ancestor = braiding::last_common_ancestor(storage, left, right)?;
378
379    let (&first, rest) = order.split_first().assume("braid is non-empty")?;
380
381    let mut braid_perspective = storage.get_fact_perspective(first)?;
382
383    sink.begin();
384
385    for &location in rest {
386        let segment = storage.get_segment(location)?;
387        let command = segment
388            .get_command(location)
389            .assume("braid only contains existing commands")?;
390
391        let result = policy.call_rule(
392            &command,
393            &mut braid_perspective,
394            sink,
395            CommandRecall::OnCheck,
396        );
397
398        // If the command failed in an uncontrolled way, rollback
399        if let Err(e) = result {
400            if e != EngineError::Check {
401                sink.rollback();
402                return Err(e.into());
403            }
404        }
405    }
406
407    let braid = storage.write_facts(braid_perspective)?;
408
409    sink.commit();
410
411    Ok((braid, last_common_ancestor))
412}
413
414/// Select the policy from two locations with the greatest serial value.
415fn choose_policy<'a, E: Engine>(
416    storage: &impl Storage,
417    engine: &'a E,
418    left: Location,
419    right: Location,
420) -> Result<(&'a E::Policy, PolicyId), ClientError> {
421    Ok(core::cmp::max_by_key(
422        get_policy(storage, engine, left)?,
423        get_policy(storage, engine, right)?,
424        |(p, _)| p.serial(),
425    ))
426}
427
428fn get_policy<'a, E: Engine>(
429    storage: &impl Storage,
430    engine: &'a E,
431    location: Location,
432) -> Result<(&'a E::Policy, PolicyId), ClientError> {
433    let segment = storage.get_segment(location)?;
434    let policy_id = segment.policy();
435    let policy = engine.get_policy(policy_id)?;
436    Ok((policy, policy_id))
437}
438
439#[cfg(test)]
440mod test {
441    use std::collections::HashMap;
442
443    use buggy::Bug;
444    use test_log::test;
445
446    use super::*;
447    use crate::{
448        ClientState, Keys, MergeIds, Priority,
449        memory::MemStorageProvider,
450        testing::{hash_for_testing_only, short_b58},
451    };
452
    /// Minimal test engine whose only policy is [`SeqPolicy`].
    struct SeqEngine;
454
    /// [`SeqPolicy`] is a very simple policy which appends the payload of each
    /// evaluated command (a short form of its id) to a fact named `b"seq"`,
    /// separated by `:`. At each point in the graph, the value of this fact
    /// should be equal to the ids, in braid order, of all commands up to that
    /// point. Merge commands are never evaluated and so do not appear.
    struct SeqPolicy;
460
    /// Test command used with [`SeqPolicy`].
    struct SeqCommand {
        /// Command id, returned by `Command::id`.
        id: CmdId,
        /// Parent(s) of the command, returned by `Command::parent`.
        prior: Prior<Address>,
        /// When set, the command reports `Priority::Finalize`.
        finalize: bool,
        /// Payload returned by `Command::bytes`; built from `short_b58(id)`.
        data: Box<str>,
        /// Value returned by `Command::max_cut`.
        max_cut: usize,
    }
468
    /// Engine with a single, fixed policy.
    impl Engine for SeqEngine {
        type Policy = SeqPolicy;
        type Effect = ();

        /// Ignores the policy bytes and always reports policy id 0.
        fn add_policy(&mut self, _policy: &[u8]) -> Result<PolicyId, EngineError> {
            Ok(PolicyId::new(0))
        }

        /// Returns [`SeqPolicy`] regardless of the requested id.
        fn get_policy(&self, _id: PolicyId) -> Result<&Self::Policy, EngineError> {
            Ok(&SeqPolicy)
        }
    }
481
482    impl Policy for SeqPolicy {
483        type Action<'a> = &'a str;
484        type Effect = ();
485        type Command<'a> = SeqCommand;
486
487        fn serial(&self) -> u32 {
488            0
489        }
490
491        fn call_rule(
492            &self,
493            command: &impl Command,
494            facts: &mut impl crate::FactPerspective,
495            _sink: &mut impl Sink<Self::Effect>,
496            _recall: CommandRecall,
497        ) -> Result<(), EngineError> {
498            assert!(
499                !matches!(command.parent(), Prior::Merge { .. }),
500                "merges shouldn't be evaluated"
501            );
502
503            // For init and basic commands, append the id to the seq fact.
504            let data = command.bytes();
505            if let Some(seq) = facts
506                .query("seq", &Keys::default())
507                .assume("can query")?
508                .as_deref()
509            {
510                facts.insert(
511                    "seq".into(),
512                    Keys::default(),
513                    [seq, b":", data].concat().into(),
514                );
515            } else {
516                facts.insert("seq".into(), Keys::default(), data.into());
517            };
518            Ok(())
519        }
520
521        fn call_action(
522            &self,
523            _action: Self::Action<'_>,
524            _facts: &mut impl Perspective,
525            _sink: &mut impl Sink<Self::Effect>,
526        ) -> Result<(), EngineError> {
527            unimplemented!()
528        }
529
530        fn merge<'a>(
531            &self,
532            _target: &'a mut [u8],
533            ids: MergeIds,
534        ) -> Result<Self::Command<'a>, EngineError> {
535            let (left, right): (Address, Address) = ids.into();
536            let parents = [*left.id.as_array(), *right.id.as_array()];
537            let id = hash_for_testing_only(parents.as_flattened());
538
539            Ok(SeqCommand::new(
540                id,
541                Prior::Merge(left, right),
542                left.max_cut
543                    .max(right.max_cut)
544                    .checked_add(1)
545                    .assume("must not overflow")?,
546            ))
547        }
548    }
549
550    impl SeqCommand {
551        fn new(id: CmdId, prior: Prior<Address>, max_cut: usize) -> Self {
552            let data = short_b58(id).into_boxed_str();
553            Self {
554                id,
555                prior,
556                finalize: false,
557                data,
558                max_cut,
559            }
560        }
561
562        fn finalize(id: CmdId, prev: Address, max_cut: usize) -> Self {
563            let data = short_b58(id).into_boxed_str();
564            Self {
565                id,
566                prior: Prior::Single(prev),
567                finalize: true,
568                data,
569                max_cut,
570            }
571        }
572    }
573
574    impl Command for SeqCommand {
575        fn priority(&self) -> Priority {
576            if self.finalize {
577                return Priority::Finalize;
578            }
579            match self.prior {
580                Prior::None => Priority::Init,
581                Prior::Single(_) => {
582                    // Use the last byte of the ID as priority, just so we can
583                    // properly see the effects of braiding
584                    let id = self.id.as_bytes();
585                    let priority = u32::from(*id.last().unwrap());
586                    Priority::Basic(priority)
587                }
588                Prior::Merge(_, _) => Priority::Merge,
589            }
590        }
591
592        fn id(&self) -> CmdId {
593            self.id
594        }
595
596        fn parent(&self) -> Prior<Address> {
597            self.prior
598        }
599
600        fn policy(&self) -> Option<&[u8]> {
601            // We don't actually need any policy bytes, but the
602            // transaction/storage requires it on init commands.
603            match self.prior {
604                Prior::None { .. } => Some(b""),
605                _ => None,
606            }
607        }
608
609        fn bytes(&self) -> &[u8] {
610            self.data.as_bytes()
611        }
612
613        fn max_cut(&self) -> Result<usize, Bug> {
614            Ok(self.max_cut)
615        }
616    }
617
    /// Sink that discards every effect; all of its methods are no-ops.
    struct NullSink;
    impl<E> Sink<E> for NullSink {
        fn begin(&mut self) {}
        fn consume(&mut self, _: E) {}
        fn rollback(&mut self) {}
        fn commit(&mut self) {}
    }
625
    /// [`GraphBuilder`] and the associated macro [`graph`] provide an easy way
    /// to create a graph with a specific structure.
    struct GraphBuilder<SP: StorageProvider> {
        /// Client whose provider and engine receive the commands.
        client: ClientState<SeqEngine, SP>,
        /// Transaction that every generated command is added to.
        trx: Transaction<SP, SeqEngine>,
        /// Max cut recorded for each command id added so far.
        max_cuts: HashMap<CmdId, usize>,
    }
633
634    impl<SP: StorageProvider> GraphBuilder<SP> {
635        pub fn init(
636            mut client: ClientState<SeqEngine, SP>,
637            ids: &[CmdId],
638        ) -> Result<Self, ClientError> {
639            let mut trx = Transaction::new(GraphId::from(ids[0].into_id()));
640            let mut prior: Prior<Address> = Prior::None;
641            let mut max_cuts = HashMap::new();
642            for (max_cut, &id) in ids.iter().enumerate() {
643                let cmd = SeqCommand::new(id, prior, max_cut);
644                trx.add_commands(
645                    &[cmd],
646                    &mut client.provider,
647                    &mut client.engine,
648                    &mut NullSink,
649                )?;
650                max_cuts.insert(id, max_cut);
651                prior = Prior::Single(Address { id, max_cut });
652            }
653            Ok(Self {
654                client,
655                trx,
656                max_cuts,
657            })
658        }
659
660        fn get_addr(&self, id: CmdId) -> Address {
661            Address {
662                id,
663                max_cut: self.max_cuts[&id],
664            }
665        }
666
667        pub fn line(&mut self, prev: CmdId, ids: &[CmdId]) -> Result<(), ClientError> {
668            let mut prev = self.get_addr(prev);
669            for &id in ids {
670                let max_cut = prev.max_cut.checked_add(1).unwrap();
671                let cmd = SeqCommand::new(id, Prior::Single(prev), max_cut);
672                self.trx.add_commands(
673                    &[cmd],
674                    &mut self.client.provider,
675                    &mut self.client.engine,
676                    &mut NullSink,
677                )?;
678                self.max_cuts.insert(id, max_cut);
679                prev = Address { id, max_cut };
680            }
681            Ok(())
682        }
683
684        pub fn finalize(&mut self, prev: CmdId, id: CmdId) -> Result<(), ClientError> {
685            let prev = self.get_addr(prev);
686            let max_cut = prev.max_cut.checked_add(1).unwrap();
687            let cmd = SeqCommand::finalize(id, prev, max_cut);
688            self.trx.add_commands(
689                &[cmd],
690                &mut self.client.provider,
691                &mut self.client.engine,
692                &mut NullSink,
693            )?;
694            self.max_cuts.insert(id, max_cut);
695            Ok(())
696        }
697
698        pub fn merge(
699            &mut self,
700            (left, right): (CmdId, CmdId),
701            ids: &[CmdId],
702        ) -> Result<(), ClientError> {
703            let prior = Prior::Merge(self.get_addr(left), self.get_addr(right));
704            let mergecmd = SeqCommand::new(ids[0], prior, prior.next_max_cut().unwrap());
705            let mut prev = Address {
706                id: mergecmd.id,
707                max_cut: mergecmd.max_cut,
708            };
709            self.max_cuts.insert(mergecmd.id, mergecmd.max_cut);
710            self.trx.add_commands(
711                &[mergecmd],
712                &mut self.client.provider,
713                &mut self.client.engine,
714                &mut NullSink,
715            )?;
716            for &id in &ids[1..] {
717                let cmd = SeqCommand::new(
718                    id,
719                    Prior::Single(prev),
720                    prev.max_cut.checked_add(1).expect("must not overflow"),
721                );
722                prev = Address {
723                    id: cmd.id,
724                    max_cut: cmd.max_cut,
725                };
726                self.max_cuts.insert(cmd.id, cmd.max_cut);
727                self.trx.add_commands(
728                    &[cmd],
729                    &mut self.client.provider,
730                    &mut self.client.engine,
731                    &mut NullSink,
732                )?;
733            }
734            Ok(())
735        }
736
737        pub fn flush(&mut self) {
738            if let Some(p) = Option::take(&mut self.trx.perspective) {
739                self.trx.phead = None;
740                let seg = self
741                    .client
742                    .provider
743                    .get_storage(self.trx.storage_id)
744                    .unwrap()
745                    .write(p)
746                    .unwrap();
747                let head = seg.head().unwrap();
748                self.trx.heads.insert(
749                    head.address().expect("address must exist"),
750                    seg.head_location(),
751                );
752            }
753        }
754
755        pub fn commit(&mut self) -> Result<(), ClientError> {
756            self.trx.commit(
757                &mut self.client.provider,
758                &mut self.client.engine,
759                &mut NullSink,
760            )
761        }
762    }
763
764    fn mkid<T>(x: &str) -> T
765    where
766        aranya_crypto::Id: Into<T>,
767    {
768        x.parse::<aranya_crypto::Id>().unwrap().into()
769    }
770
    /// Builds a [`GraphBuilder`] from a compact description of a graph.
    ///
    /// The input is `client_expr; "init" "more"...;` followed by any number of
    /// statements, each ending in `;`:
    ///
    /// - `"prev" < "a" "b" ...;` adds a linear chain after `prev`.
    /// - `"l" "r" < "m" "a" ...;` adds `m` as a merge of `l` and `r`, then a
    ///   linear chain after it.
    /// - `"prev" < finalize "id";` adds a finalize command after `prev`.
    /// - `commit;` commits the transaction.
    ///
    /// Every step is unwrapped, and the working perspective is flushed once
    /// all statements have been processed. See tests for usage.
    macro_rules! graph {
        // Entry point: create the builder from the init chain, then process
        // the remaining statements.
        ( $client:expr ; $init:literal $($inits:literal )* ; $($rest:tt)*) => {{
            let mut gb = GraphBuilder::init($client, &[mkid($init), $(mkid($inits)),*]).unwrap();
            graph!(@ gb, $($rest)*);
            gb
        }};
        // Linear chain after a single parent.
        (@ $gb:ident, $prev:literal < $($id:literal)+; $($rest:tt)*) => {
            $gb.line(mkid($prev), &[$(mkid($id)),+]).unwrap();
            graph!(@ $gb, $($rest)*);
        };
        // Merge of two parents, optionally followed by a chain.
        (@ $gb:ident, $l:literal $r:literal < $($id:literal)+; $($rest:tt)*) => {
            $gb.merge((mkid($l), mkid($r)), &[$(mkid($id)),+]).unwrap();
            graph!(@ $gb, $($rest)*);
        };
        // Finalize command after a single parent.
        (@ $gb:ident, $prev:literal < finalize $id:literal; $($rest:tt)*) => {
            $gb.finalize(mkid($prev), mkid($id)).unwrap();
            graph!(@ $gb, $($rest)*);
        };
        // Commit the transaction.
        (@ $gb:ident, commit; $($rest:tt)*) => {
            $gb.commit().unwrap();
            graph!(@ $gb, $($rest)*);
        };
        // No statements left: flush the working perspective.
        (@ $gb:ident, ) => {
            $gb.flush();
        };
    }
798
799    fn lookup(storage: &impl Storage, name: &str) -> Option<Box<[u8]>> {
800        use crate::Query;
801        let head = storage.get_head().unwrap();
802        let p = storage.get_fact_perspective(head).unwrap();
803        p.query(name, &Keys::default()).unwrap()
804    }
805
806    #[test]
807    fn test_simple() -> Result<(), StorageError> {
808        let mut gb = graph! {
809            ClientState::new(SeqEngine, MemStorageProvider::new());
810            "a";
811            "a" < "b";
812            "a" < "c";
813            "b" "c" < "ma";
814            "b" < "d";
815            "ma" "d" < "mb";
816            commit;
817        };
818        let g = gb.client.provider.get_storage(mkid("a")).unwrap();
819
820        #[cfg(feature = "graphviz")]
821        crate::storage::memory::graphviz::dot(g, "simple");
822
823        assert_eq!(g.get_head().unwrap(), Location::new(5, 0));
824
825        let seq = lookup(g, "seq").unwrap();
826        let seq = std::str::from_utf8(&seq).unwrap();
827        assert_eq!(seq, "a:b:d:c");
828
829        Ok(())
830    }
831
832    #[test]
833    fn test_complex() -> Result<(), StorageError> {
834        let mut gb = graph! {
835            ClientState::new(SeqEngine, MemStorageProvider::new());
836            "a";
837            "a" < "1" "2" "3";
838            "3" < "4" "6" "7";
839            "3" < "5" "8";
840            "6" "8" < "9" "aa"; commit;
841            "7" < "a1" "a2";
842            "aa" "a2" < "a3";
843            "a3" < "a6" "a4";
844            "a3" < "a7" "a5";
845            "a4" "a5" < "a8";
846            "9" < "42" "43";
847            "42" < "45" "46";
848            "45" < "47" "48";
849            commit;
850        };
851
852        let g = gb.client.provider.get_storage(mkid("a")).unwrap();
853
854        #[cfg(feature = "graphviz")]
855        crate::storage::memory::graphviz::dot(g, "complex");
856
857        assert_eq!(g.get_head().unwrap(), Location::new(15, 0));
858
859        let seq = lookup(g, "seq").unwrap();
860        let seq = std::str::from_utf8(&seq).unwrap();
861        assert_eq!(
862            seq,
863            "a:1:2:3:5:8:4:6:42:45:47:48:46:43:aa:7:a1:a2:a7:a6:a5:a4"
864        );
865
866        Ok(())
867    }
868
869    #[test]
870    fn test_duplicates() {
871        let mut gb = graph! {
872            ClientState::new(SeqEngine, MemStorageProvider::new());
873            "a";
874            "a" < "b" "c";
875            "a" < "b";
876            "b" < "c";
877            "c" < "d";
878            commit;
879            "a" < "b";
880            "b" < "c";
881            "d" < "e";
882            commit;
883        };
884
885        let g = gb.client.provider.get_storage(mkid("a")).unwrap();
886
887        #[cfg(feature = "graphviz")]
888        crate::storage::memory::graphviz::dot(g, "duplicates");
889
890        assert_eq!(g.get_head().unwrap(), Location::new(2, 0));
891
892        let seq = lookup(g, "seq").unwrap();
893        let seq = std::str::from_utf8(&seq).unwrap();
894        assert_eq!(seq, "a:b:c:d:e");
895    }
896
897    #[test]
898    fn test_mid_braid_1() {
899        let mut gb = graph! {
900            ClientState::new(SeqEngine, MemStorageProvider::new());
901            "a";
902            commit;
903            "a" < "b" "c" "d" "e" "f" "g";
904            "d" < "h" "i" "j";
905            commit;
906        };
907
908        let g = gb.client.provider.get_storage(mkid("a")).unwrap();
909
910        #[cfg(feature = "graphviz")]
911        crate::storage::memory::graphviz::dot(g, "mid_braid_1");
912
913        assert_eq!(g.get_head().unwrap(), Location::new(3, 0));
914
915        let seq = lookup(g, "seq").unwrap();
916        let seq = std::str::from_utf8(&seq).unwrap();
917        assert_eq!(seq, "a:b:c:d:h:i:j:e:f:g");
918    }
919
920    #[test]
921    fn test_mid_braid_2() {
922        let mut gb = graph! {
923            ClientState::new(SeqEngine, MemStorageProvider::new());
924            "a";
925            commit;
926            "a" < "b" "c" "d" "h" "i" "j";
927            "d" < "e" "f" "g";
928            commit;
929        };
930
931        let g = gb.client.provider.get_storage(mkid("a")).unwrap();
932
933        #[cfg(feature = "graphviz")]
934        crate::storage::memory::graphviz::dot(g, "mid_braid_2");
935
936        assert_eq!(g.get_head().unwrap(), Location::new(3, 0));
937
938        let seq = lookup(g, "seq").unwrap();
939        let seq = std::str::from_utf8(&seq).unwrap();
940        assert_eq!(seq, "a:b:c:d:h:i:j:e:f:g");
941    }
942
943    #[test]
944    fn test_sequential_finalize() {
945        let mut gb = graph! {
946            ClientState::new(SeqEngine, MemStorageProvider::new());
947            "a";
948            commit;
949            "a" < "b" "c" "d" "e" "f" "g";
950            "d" < "h" "i" "j";
951            "e" < finalize "fff1";
952            "fff1" < "x" "y";
953            "y" < finalize "fff2";
954            commit;
955        };
956
957        let g = gb.client.provider.get_storage(mkid("a")).unwrap();
958
959        #[cfg(feature = "graphviz")]
960        crate::storage::memory::graphviz::dot(g, "finalize_success");
961
962        assert_eq!(g.get_head().unwrap(), Location::new(5, 0));
963
964        let seq = lookup(g, "seq").unwrap();
965        let seq = std::str::from_utf8(&seq).unwrap();
966        assert_eq!(seq, "a:b:c:d:e:fff1:x:y:fff2:h:i:j:f:g");
967    }
968
969    #[test]
970    fn test_parallel_finalize() {
971        let mut gb = graph! {
972            ClientState::new(SeqEngine, MemStorageProvider::new());
973            "a";
974            commit;
975            "a" < "b" "c" "d" "e" "f" "g";
976            "d" < "h" "i" "j";
977            "e" < finalize "fff1";
978            "i" < finalize "fff2";
979        };
980        let err = gb.commit().expect_err("merge should fail");
981        assert!(matches!(err, ClientError::ParallelFinalize), "{err:?}")
982    }
983}