aranya_runtime/client/
transaction.rs

1use alloc::collections::{BTreeMap, VecDeque};
2use core::{marker::PhantomData, mem};
3
4use buggy::{bug, BugExt};
5
6use super::braiding;
7use crate::{
8    Address, ClientError, Command, CommandId, CommandRecall, Engine, EngineError, GraphId,
9    Location, MergeIds, Perspective, Policy, PolicyId, Prior, Revertable, Segment, Sink, Storage,
10    StorageError, StorageProvider, MAX_COMMAND_LENGTH,
11};
12
13/// Transaction used to receive many commands at once.
14///
15/// The transaction allows us to have many temporary heads at once, so we don't
16/// need as many merges when adding commands. When the transaction is committed,
17/// we will merge all temporary heads and the storage head, and then commit the
18/// result as the new storage head.
19pub struct Transaction<SP: StorageProvider, E> {
20    /// The ID of the associated storage
21    storage_id: GraphId,
22    /// Current working perspective
23    perspective: Option<SP::Perspective>,
24    /// Head of the current perspective
25    phead: Option<CommandId>,
26    /// Written but not committed heads
27    heads: BTreeMap<Address, Location>,
28    /// Tag for associated engine
29    _engine: PhantomData<E>,
30}
31
32impl<SP: StorageProvider, E> Transaction<SP, E> {
33    pub(super) const fn new(storage_id: GraphId) -> Self {
34        Self {
35            storage_id,
36            perspective: None,
37            phead: None,
38            heads: BTreeMap::new(),
39            _engine: PhantomData,
40        }
41    }
42}
43
44impl<SP: StorageProvider, E: Engine> Transaction<SP, E> {
45    /// Returns the transaction's storage id.
46    pub fn storage_id(&self) -> GraphId {
47        self.storage_id
48    }
49
50    /// Find a given id if reachable within this transaction.
51    ///
52    /// Does not search `self.perspective`, which should be written out beforehand.
53    fn locate(
54        &self,
55        storage: &mut SP::Storage,
56        address: Address,
57    ) -> Result<Option<Location>, ClientError> {
58        // Search from committed head.
59        if let Some(found) = storage.get_location(address)? {
60            return Ok(Some(found));
61        }
62        // Search from our temporary heads.
63        for &head in self.heads.values() {
64            if let Some(found) = storage.get_location_from(head, address)? {
65                return Ok(Some(found));
66            }
67        }
68        Ok(None)
69    }
70
71    /// Write current perspective, merge transaction heads, and commit to graph.
72    pub(super) fn commit(
73        &mut self,
74        provider: &mut SP,
75        engine: &mut E,
76        sink: &mut impl Sink<E::Effect>,
77    ) -> Result<(), ClientError> {
78        let storage = provider.get_storage(self.storage_id)?;
79
80        // Write out current perspective.
81        if let Some(p) = Option::take(&mut self.perspective) {
82            self.phead = None;
83            let segment = storage.write(p)?;
84            let head = segment.head()?;
85            self.heads.insert(head.address()?, segment.head_location());
86        }
87
88        // Merge heads pairwise until single head left, then commit.
89        // TODO(#370): Merge deterministically
90        let mut heads: VecDeque<_> = mem::take(&mut self.heads).into_iter().collect();
91        let mut merging_head = false;
92        while let Some((left_id, mut left_loc)) = heads.pop_front() {
93            if let Some((right_id, mut right_loc)) = heads.pop_front() {
94                let (policy, policy_id) = choose_policy(storage, engine, left_loc, right_loc)?;
95
96                let mut buffer = [0u8; MAX_COMMAND_LENGTH];
97                let merge_ids = MergeIds::new(left_id, right_id).assume("merging different ids")?;
98                if left_id > right_id {
99                    mem::swap(&mut left_loc, &mut right_loc);
100                }
101                let command = policy.merge(&mut buffer, merge_ids)?;
102
103                let (braid, last_common_ancestor) =
104                    make_braid_segment::<_, E>(storage, left_loc, right_loc, sink, policy)?;
105
106                let mut perspective = storage
107                    .new_merge_perspective(
108                        left_loc,
109                        right_loc,
110                        last_common_ancestor,
111                        policy_id,
112                        braid,
113                    )?
114                    .assume("trx heads should exist in storage")?;
115                perspective.add_command(&command)?;
116
117                let segment = storage.write(perspective)?;
118                let head = segment.head()?;
119
120                heads.push_back((head.address()?, segment.head_location()));
121            } else {
122                let segment = storage.get_segment(left_loc)?;
123                // Try to commit. If it fails with `HeadNotAncestor`, we know we
124                // need to merge with the graph head.
125                match storage.commit(segment) {
126                    Ok(()) => break,
127                    Err(StorageError::HeadNotAncestor) => {
128                        if merging_head {
129                            bug!("merging with graph head again, would loop");
130                        }
131
132                        merging_head = true;
133
134                        heads.push_back((left_id, left_loc));
135
136                        let head_loc = storage.get_head()?;
137                        let segment = storage.get_segment(head_loc)?;
138                        let head = segment.head()?;
139                        heads.push_back((head.address()?, segment.head_location()));
140                    }
141                    Err(e) => return Err(e.into()),
142                }
143            }
144        }
145
146        Ok(())
147    }
148
149    /// Attempt to store the `command` in the graph with `storage_id`. Effects will be
150    /// emitted to the `sink`. This interface is used when syncing with another device
151    /// and integrating the new commands.
152    pub(super) fn add_commands(
153        &mut self,
154        commands: &[impl Command],
155        provider: &mut SP,
156        engine: &mut E,
157        sink: &mut impl Sink<E::Effect>,
158    ) -> Result<usize, ClientError> {
159        let mut commands = commands.iter();
160        let mut count: usize = 0;
161
162        // Get storage or try to initialize with first command.
163        let storage = match provider.get_storage(self.storage_id) {
164            Ok(s) => s,
165            Err(StorageError::NoSuchStorage) => {
166                let command = commands.next().ok_or(ClientError::InitError)?;
167                count = count.checked_add(1).assume("must not overflow")?;
168                self.init(command, engine, provider, sink)?
169            }
170            Err(e) => return Err(e.into()),
171        };
172
173        // Handle remaining commands.
174        for command in commands {
175            if self
176                .perspective
177                .as_ref()
178                .is_some_and(|p| p.includes(command.id()))
179            {
180                // Command in current perspective.
181                continue;
182            }
183            if (self.locate(storage, command.address()?)?).is_some() {
184                // Command already added.
185                continue;
186            }
187            match command.parent() {
188                Prior::None => {
189                    if command.id().into_id() == self.storage_id.into_id() {
190                        // Graph already initialized, extra init just spurious
191                    } else {
192                        bug!("init command does not belong in graph");
193                    }
194                }
195                Prior::Single(parent) => {
196                    self.add_single(storage, engine, sink, command, parent)?;
197                    count = count.checked_add(1).assume("must not overflow")?;
198                }
199                Prior::Merge(left, right) => {
200                    self.add_merge(storage, engine, sink, command, left, right)?;
201                    count = count.checked_add(1).assume("must not overflow")?;
202                }
203            };
204        }
205
206        Ok(count)
207    }
208
209    fn add_single(
210        &mut self,
211        storage: &mut <SP as StorageProvider>::Storage,
212        engine: &mut E,
213        sink: &mut impl Sink<E::Effect>,
214        command: &impl Command,
215        parent: Address,
216    ) -> Result<(), ClientError> {
217        let perspective = self.get_perspective(parent, storage)?;
218
219        let policy_id = perspective.policy();
220        let policy = engine.get_policy(policy_id)?;
221
222        // Try to run command, or revert if failed.
223        sink.begin();
224        let checkpoint = perspective.checkpoint();
225        if let Err(e) = policy.call_rule(command, perspective, sink, CommandRecall::None) {
226            perspective.revert(checkpoint)?;
227            sink.rollback();
228            return Err(e.into());
229        }
230        perspective.add_command(command)?;
231        sink.commit();
232
233        self.phead = Some(command.id());
234
235        Ok(())
236    }
237
238    fn add_merge(
239        &mut self,
240        storage: &mut <SP as StorageProvider>::Storage,
241        engine: &mut E,
242        sink: &mut impl Sink<E::Effect>,
243        command: &impl Command,
244        left: Address,
245        right: Address,
246    ) -> Result<bool, ClientError> {
247        // Must always start a new perspective for merges.
248        if let Some(p) = Option::take(&mut self.perspective) {
249            let seg = storage.write(p)?;
250            let head = seg.head()?;
251            self.heads.insert(head.address()?, seg.head_location());
252        }
253
254        let left_loc = self
255            .locate(storage, left)?
256            .ok_or(ClientError::NoSuchParent(left.id))?;
257        let right_loc = self
258            .locate(storage, right)?
259            .ok_or(ClientError::NoSuchParent(right.id))?;
260
261        let (policy, policy_id) = choose_policy(storage, engine, left_loc, right_loc)?;
262
263        // Braid commands from left and right into an ordered sequence.
264        let (braid, last_common_ancestor) =
265            make_braid_segment::<_, E>(storage, left_loc, right_loc, sink, policy)?;
266
267        let mut perspective = storage
268            .new_merge_perspective(left_loc, right_loc, last_common_ancestor, policy_id, braid)?
269            .assume(
270                "we already found left and right locations above and we only call this with merge command",
271            )?;
272        perspective.add_command(command)?;
273
274        // These are no longer heads of the transaction, since they are both covered by the merge
275        self.heads.remove(&left);
276        self.heads.remove(&right);
277
278        self.perspective = Some(perspective);
279        self.phead = Some(command.id());
280
281        Ok(true)
282    }
283
284    /// Get a perspective to which we can add a command with the given parant.
285    ///
286    /// If parent is the head of the current perspective, we can just use it.
287    /// Otherwise, we must write out the perspective and get a new one.
288    fn get_perspective(
289        &mut self,
290        parent: Address,
291        storage: &mut <SP as StorageProvider>::Storage,
292    ) -> Result<&mut <SP as StorageProvider>::Perspective, ClientError> {
293        if self.phead == Some(parent.id) {
294            // Command will append to current perspective.
295            return Ok(self
296                .perspective
297                .as_mut()
298                .assume("trx has perspective when has phead")?);
299        }
300
301        // Write out the current perspective.
302        if let Some(p) = Option::take(&mut self.perspective) {
303            self.phead = None;
304            let seg = storage.write(p)?;
305            let head = seg.head()?;
306            self.heads.insert(head.address()?, seg.head_location());
307        }
308
309        let loc = self
310            .locate(storage, parent)?
311            .ok_or(ClientError::NoSuchParent(parent.id))?;
312
313        // Get a new perspective and store it in the transaction.
314        let p = self.perspective.insert(
315            storage
316                .get_linear_perspective(loc)?
317                .assume("location should already be in storage")?,
318        );
319
320        self.phead = Some(parent.id);
321        self.heads.remove(&parent);
322
323        Ok(p)
324    }
325
326    fn init<'sp>(
327        &mut self,
328        command: &impl Command,
329        engine: &mut E,
330        provider: &'sp mut SP,
331        sink: &mut impl Sink<E::Effect>,
332    ) -> Result<&'sp mut <SP as StorageProvider>::Storage, ClientError> {
333        // Storage ID is the id of the init command by definition.
334        if self.storage_id.into_id() != command.id().into_id() {
335            return Err(ClientError::InitError);
336        }
337
338        // The init command must not have a parent.
339        if !matches!(command.parent(), Prior::None) {
340            return Err(ClientError::InitError);
341        }
342
343        // The graph must have policy to start with.
344        let Some(policy_data) = command.policy() else {
345            return Err(ClientError::InitError);
346        };
347
348        let policy_id = engine.add_policy(policy_data)?;
349        let policy = engine.get_policy(policy_id)?;
350
351        // Get an empty perspective and run the init command.
352        let mut perspective = provider.new_perspective(policy_id);
353        sink.begin();
354        if let Err(e) = policy.call_rule(command, &mut perspective, sink, CommandRecall::None) {
355            sink.rollback();
356            // We don't need to revert perspective since we just drop it.
357            return Err(e.into());
358        }
359        perspective.add_command(command)?;
360
361        let (_, storage) = provider.new_storage(perspective)?;
362
363        // Wait to commit until we are absolutely sure we've initialized.
364        sink.commit();
365
366        Ok(storage)
367    }
368}
369
370/// Run the braid algorithm and evaluate the sequence to create a braided fact index.
371fn make_braid_segment<S: Storage, E: Engine>(
372    storage: &mut S,
373    left: Location,
374    right: Location,
375    sink: &mut impl Sink<E::Effect>,
376    policy: &E::Policy,
377) -> Result<(S::FactIndex, (Location, usize)), ClientError> {
378    let order = braiding::braid(storage, left, right)?;
379    let last_common_ancestor = braiding::last_common_ancestor(storage, left, right)?;
380
381    let (&first, rest) = order.split_first().assume("braid is non-empty")?;
382
383    let mut braid_perspective = storage.get_fact_perspective(first)?;
384
385    sink.begin();
386
387    for &location in rest {
388        let segment = storage.get_segment(location)?;
389        let command = segment
390            .get_command(location)
391            .assume("braid only contains existing commands")?;
392
393        let result = policy.call_rule(
394            &command,
395            &mut braid_perspective,
396            sink,
397            CommandRecall::OnCheck,
398        );
399
400        // If the command failed in an uncontrolled way, rollback
401        if let Err(e) = result {
402            if e != EngineError::Check {
403                sink.rollback();
404                return Err(e.into());
405            }
406        }
407    }
408
409    let braid = storage.write_facts(braid_perspective)?;
410
411    sink.commit();
412
413    Ok((braid, last_common_ancestor))
414}
415
416/// Select the policy from two locations with the greatest serial value.
417fn choose_policy<'a, E: Engine>(
418    storage: &impl Storage,
419    engine: &'a E,
420    left: Location,
421    right: Location,
422) -> Result<(&'a E::Policy, PolicyId), ClientError> {
423    Ok(core::cmp::max_by_key(
424        get_policy(storage, engine, left)?,
425        get_policy(storage, engine, right)?,
426        |(p, _)| p.serial(),
427    ))
428}
429
430fn get_policy<'a, E: Engine>(
431    storage: &impl Storage,
432    engine: &'a E,
433    location: Location,
434) -> Result<(&'a E::Policy, PolicyId), ClientError> {
435    let segment = storage.get_segment(location)?;
436    let policy_id = segment.policy();
437    let policy = engine.get_policy(policy_id)?;
438    Ok((policy, policy_id))
439}
440
441#[cfg(test)]
442mod test {
443    use std::collections::HashMap;
444
445    use buggy::Bug;
446    use test_log::test;
447
448    use super::*;
449    use crate::{memory::MemStorageProvider, ClientState, Keys, MergeIds, Priority};
450
451    struct SeqEngine;
452
453    /// [`SeqPolicy`] is a very simple policy which appends the id of each
454    /// command to a fact named `b"seq"`. At each point in the graph, the value
455    /// of this fact should be equal to the ids in braid order of all facts up
456    /// to that point.
457    struct SeqPolicy;
458
459    struct SeqCommand {
460        id: CommandId,
461        prior: Prior<Address>,
462        finalize: bool,
463        data: Box<str>,
464        max_cut: usize,
465    }
466
467    impl Engine for SeqEngine {
468        type Policy = SeqPolicy;
469        type Effect = ();
470
471        fn add_policy(&mut self, _policy: &[u8]) -> Result<PolicyId, EngineError> {
472            Ok(PolicyId::new(0))
473        }
474
475        fn get_policy(&self, _id: PolicyId) -> Result<&Self::Policy, EngineError> {
476            Ok(&SeqPolicy)
477        }
478    }
479
480    impl Policy for SeqPolicy {
481        type Action<'a> = &'a str;
482        type Effect = ();
483        type Command<'a> = SeqCommand;
484
485        fn serial(&self) -> u32 {
486            0
487        }
488
489        fn call_rule(
490            &self,
491            command: &impl Command,
492            facts: &mut impl crate::FactPerspective,
493            _sink: &mut impl Sink<Self::Effect>,
494            _recall: CommandRecall,
495        ) -> Result<(), EngineError> {
496            assert!(
497                !matches!(command.parent(), Prior::Merge { .. }),
498                "merges shouldn't be evaluated"
499            );
500
501            // For init and basic commands, append the id to the seq fact.
502            let data = command.bytes();
503            if let Some(seq) = facts
504                .query("seq", &Keys::default())
505                .assume("can query")?
506                .as_deref()
507            {
508                facts.insert(
509                    "seq".into(),
510                    Keys::default(),
511                    [seq, b":", data].concat().into(),
512                );
513            } else {
514                facts.insert("seq".into(), Keys::default(), data.into());
515            };
516            Ok(())
517        }
518
519        fn call_action(
520            &self,
521            _action: Self::Action<'_>,
522            _facts: &mut impl Perspective,
523            _sink: &mut impl Sink<Self::Effect>,
524        ) -> Result<(), EngineError> {
525            unimplemented!()
526        }
527
528        fn merge<'a>(
529            &self,
530            _target: &'a mut [u8],
531            ids: MergeIds,
532        ) -> Result<Self::Command<'a>, EngineError> {
533            let (left, right): (Address, Address) = ids.into();
534            let parents = [*left.id.as_array(), *right.id.as_array()];
535            let id = CommandId::hash_for_testing_only(parents.as_flattened());
536
537            Ok(SeqCommand::new(
538                id,
539                Prior::Merge(left, right),
540                left.max_cut
541                    .max(right.max_cut)
542                    .checked_add(1)
543                    .assume("must not overflow")?,
544            ))
545        }
546    }
547
548    impl SeqCommand {
549        fn new(id: CommandId, prior: Prior<Address>, max_cut: usize) -> Self {
550            let data = id.short_b58().into_boxed_str();
551            Self {
552                id,
553                prior,
554                finalize: false,
555                data,
556                max_cut,
557            }
558        }
559
560        fn finalize(id: CommandId, prev: Address, max_cut: usize) -> Self {
561            let data = id.short_b58().into_boxed_str();
562            Self {
563                id,
564                prior: Prior::Single(prev),
565                finalize: true,
566                data,
567                max_cut,
568            }
569        }
570    }
571
572    impl Command for SeqCommand {
573        fn priority(&self) -> Priority {
574            if self.finalize {
575                return Priority::Finalize;
576            }
577            match self.prior {
578                Prior::None => Priority::Init,
579                Prior::Single(_) => {
580                    // Use the last byte of the ID as priority, just so we can
581                    // properly see the effects of braiding
582                    let id = self.id.as_bytes();
583                    let priority = u32::from(*id.last().unwrap());
584                    Priority::Basic(priority)
585                }
586                Prior::Merge(_, _) => Priority::Merge,
587            }
588        }
589
590        fn id(&self) -> CommandId {
591            self.id
592        }
593
594        fn parent(&self) -> Prior<Address> {
595            self.prior
596        }
597
598        fn policy(&self) -> Option<&[u8]> {
599            // We don't actually need any policy bytes, but the
600            // transaction/storage requires it on init commands.
601            match self.prior {
602                Prior::None { .. } => Some(b""),
603                _ => None,
604            }
605        }
606
607        fn bytes(&self) -> &[u8] {
608            self.data.as_bytes()
609        }
610
611        fn max_cut(&self) -> Result<usize, Bug> {
612            Ok(self.max_cut)
613        }
614    }
615
616    struct NullSink;
617    impl<E> Sink<E> for NullSink {
618        fn begin(&mut self) {}
619        fn consume(&mut self, _: E) {}
620        fn rollback(&mut self) {}
621        fn commit(&mut self) {}
622    }
623
624    /// [`GraphBuilder`] and the associated macro [`graph`] provide an easy way
625    /// to create a graph with a specific structure.
626    struct GraphBuilder<SP: StorageProvider> {
627        client: ClientState<SeqEngine, SP>,
628        trx: Transaction<SP, SeqEngine>,
629        max_cuts: HashMap<CommandId, usize>,
630    }
631
632    impl<SP: StorageProvider> GraphBuilder<SP> {
633        pub fn init(
634            mut client: ClientState<SeqEngine, SP>,
635            ids: &[CommandId],
636        ) -> Result<Self, ClientError> {
637            let mut trx = Transaction::new(GraphId::from(ids[0].into_id()));
638            let mut prior: Prior<Address> = Prior::None;
639            let mut max_cuts = HashMap::new();
640            for (max_cut, &id) in ids.iter().enumerate() {
641                let cmd = SeqCommand::new(id, prior, max_cut);
642                trx.add_commands(
643                    &[cmd],
644                    &mut client.provider,
645                    &mut client.engine,
646                    &mut NullSink,
647                )?;
648                max_cuts.insert(id, max_cut);
649                prior = Prior::Single(Address { id, max_cut });
650            }
651            Ok(Self {
652                client,
653                trx,
654                max_cuts,
655            })
656        }
657
658        fn get_addr(&self, id: CommandId) -> Address {
659            Address {
660                id,
661                max_cut: self.max_cuts[&id],
662            }
663        }
664
665        pub fn line(&mut self, prev: CommandId, ids: &[CommandId]) -> Result<(), ClientError> {
666            let mut prev = self.get_addr(prev);
667            for &id in ids {
668                let max_cut = prev.max_cut.checked_add(1).unwrap();
669                let cmd = SeqCommand::new(id, Prior::Single(prev), max_cut);
670                self.trx.add_commands(
671                    &[cmd],
672                    &mut self.client.provider,
673                    &mut self.client.engine,
674                    &mut NullSink,
675                )?;
676                self.max_cuts.insert(id, max_cut);
677                prev = Address { id, max_cut };
678            }
679            Ok(())
680        }
681
682        pub fn finalize(&mut self, prev: CommandId, id: CommandId) -> Result<(), ClientError> {
683            let prev = self.get_addr(prev);
684            let max_cut = prev.max_cut.checked_add(1).unwrap();
685            let cmd = SeqCommand::finalize(id, prev, max_cut);
686            self.trx.add_commands(
687                &[cmd],
688                &mut self.client.provider,
689                &mut self.client.engine,
690                &mut NullSink,
691            )?;
692            self.max_cuts.insert(id, max_cut);
693            Ok(())
694        }
695
696        pub fn merge(
697            &mut self,
698            (left, right): (CommandId, CommandId),
699            ids: &[CommandId],
700        ) -> Result<(), ClientError> {
701            let prior = Prior::Merge(self.get_addr(left), self.get_addr(right));
702            let mergecmd = SeqCommand::new(ids[0], prior, prior.next_max_cut().unwrap());
703            let mut prev = Address {
704                id: mergecmd.id,
705                max_cut: mergecmd.max_cut,
706            };
707            self.max_cuts.insert(mergecmd.id, mergecmd.max_cut);
708            self.trx.add_commands(
709                &[mergecmd],
710                &mut self.client.provider,
711                &mut self.client.engine,
712                &mut NullSink,
713            )?;
714            for &id in &ids[1..] {
715                let cmd = SeqCommand::new(
716                    id,
717                    Prior::Single(prev),
718                    prev.max_cut.checked_add(1).expect("must not overflow"),
719                );
720                prev = Address {
721                    id: cmd.id,
722                    max_cut: cmd.max_cut,
723                };
724                self.max_cuts.insert(cmd.id, cmd.max_cut);
725                self.trx.add_commands(
726                    &[cmd],
727                    &mut self.client.provider,
728                    &mut self.client.engine,
729                    &mut NullSink,
730                )?;
731            }
732            Ok(())
733        }
734
735        pub fn flush(&mut self) {
736            if let Some(p) = Option::take(&mut self.trx.perspective) {
737                self.trx.phead = None;
738                let seg = self
739                    .client
740                    .provider
741                    .get_storage(self.trx.storage_id)
742                    .unwrap()
743                    .write(p)
744                    .unwrap();
745                let head = seg.head().unwrap();
746                self.trx.heads.insert(
747                    head.address().expect("address must exist"),
748                    seg.head_location(),
749                );
750            }
751        }
752
753        pub fn commit(&mut self) -> Result<(), ClientError> {
754            self.trx.commit(
755                &mut self.client.provider,
756                &mut self.client.engine,
757                &mut NullSink,
758            )
759        }
760    }
761
762    fn mkid<T>(x: &str) -> T
763    where
764        aranya_crypto::Id: Into<T>,
765    {
766        x.parse::<aranya_crypto::Id>().unwrap().into()
767    }
768
769    /// See tests for usage.
770    macro_rules! graph {
771        ( $client:expr ; $init:literal $($inits:literal )* ; $($rest:tt)*) => {{
772            let mut gb = GraphBuilder::init($client, &[mkid($init), $(mkid($inits)),*]).unwrap();
773            graph!(@ gb, $($rest)*);
774            gb
775        }};
776        (@ $gb:ident, $prev:literal < $($id:literal)+; $($rest:tt)*) => {
777            $gb.line(mkid($prev), &[$(mkid($id)),+]).unwrap();
778            graph!(@ $gb, $($rest)*);
779        };
780        (@ $gb:ident, $l:literal $r:literal < $($id:literal)+; $($rest:tt)*) => {
781            $gb.merge((mkid($l), mkid($r)), &[$(mkid($id)),+]).unwrap();
782            graph!(@ $gb, $($rest)*);
783        };
784        (@ $gb:ident, $prev:literal < finalize $id:literal; $($rest:tt)*) => {
785            $gb.finalize(mkid($prev), mkid($id)).unwrap();
786            graph!(@ $gb, $($rest)*);
787        };
788        (@ $gb:ident, commit; $($rest:tt)*) => {
789            $gb.commit().unwrap();
790            graph!(@ $gb, $($rest)*);
791        };
792        (@ $gb:ident, ) => {
793            $gb.flush();
794        };
795    }
796
797    fn lookup(storage: &impl Storage, name: &str) -> Option<Box<[u8]>> {
798        use crate::Query;
799        let head = storage.get_head().unwrap();
800        let p = storage.get_fact_perspective(head).unwrap();
801        p.query(name, &Keys::default()).unwrap()
802    }
803
804    #[test]
805    fn test_simple() -> Result<(), StorageError> {
806        let mut gb = graph! {
807            ClientState::new(SeqEngine, MemStorageProvider::new());
808            "a";
809            "a" < "b";
810            "a" < "c";
811            "b" "c" < "ma";
812            "b" < "d";
813            "ma" "d" < "mb";
814            commit;
815        };
816        let g = gb.client.provider.get_storage(mkid("a")).unwrap();
817
818        #[cfg(feature = "graphviz")]
819        crate::storage::memory::graphviz::dot(g, "simple");
820
821        assert_eq!(g.get_head().unwrap(), Location::new(5, 0));
822
823        let seq = lookup(g, "seq").unwrap();
824        let seq = std::str::from_utf8(&seq).unwrap();
825        assert_eq!(seq, "a:b:d:c");
826
827        Ok(())
828    }
829
830    #[test]
831    fn test_complex() -> Result<(), StorageError> {
832        let mut gb = graph! {
833            ClientState::new(SeqEngine, MemStorageProvider::new());
834            "a";
835            "a" < "1" "2" "3";
836            "3" < "4" "6" "7";
837            "3" < "5" "8";
838            "6" "8" < "9" "aa"; commit;
839            "7" < "a1" "a2";
840            "aa" "a2" < "a3";
841            "a3" < "a6" "a4";
842            "a3" < "a7" "a5";
843            "a4" "a5" < "a8";
844            "9" < "42" "43";
845            "42" < "45" "46";
846            "45" < "47" "48";
847            commit;
848        };
849
850        let g = gb.client.provider.get_storage(mkid("a")).unwrap();
851
852        #[cfg(feature = "graphviz")]
853        crate::storage::memory::graphviz::dot(g, "complex");
854
855        assert_eq!(g.get_head().unwrap(), Location::new(15, 0));
856
857        let seq = lookup(g, "seq").unwrap();
858        let seq = std::str::from_utf8(&seq).unwrap();
859        assert_eq!(
860            seq,
861            "a:1:2:3:5:8:4:6:42:45:47:48:46:43:aa:7:a1:a2:a7:a6:a5:a4"
862        );
863
864        Ok(())
865    }
866
867    #[test]
868    fn test_duplicates() {
869        let mut gb = graph! {
870            ClientState::new(SeqEngine, MemStorageProvider::new());
871            "a";
872            "a" < "b" "c";
873            "a" < "b";
874            "b" < "c";
875            "c" < "d";
876            commit;
877            "a" < "b";
878            "b" < "c";
879            "d" < "e";
880            commit;
881        };
882
883        let g = gb.client.provider.get_storage(mkid("a")).unwrap();
884
885        #[cfg(feature = "graphviz")]
886        crate::storage::memory::graphviz::dot(g, "duplicates");
887
888        assert_eq!(g.get_head().unwrap(), Location::new(2, 0));
889
890        let seq = lookup(g, "seq").unwrap();
891        let seq = std::str::from_utf8(&seq).unwrap();
892        assert_eq!(seq, "a:b:c:d:e");
893    }
894
895    #[test]
896    fn test_mid_braid_1() {
897        let mut gb = graph! {
898            ClientState::new(SeqEngine, MemStorageProvider::new());
899            "a";
900            commit;
901            "a" < "b" "c" "d" "e" "f" "g";
902            "d" < "h" "i" "j";
903            commit;
904        };
905
906        let g = gb.client.provider.get_storage(mkid("a")).unwrap();
907
908        #[cfg(feature = "graphviz")]
909        crate::storage::memory::graphviz::dot(g, "mid_braid_1");
910
911        assert_eq!(g.get_head().unwrap(), Location::new(3, 0));
912
913        let seq = lookup(g, "seq").unwrap();
914        let seq = std::str::from_utf8(&seq).unwrap();
915        assert_eq!(seq, "a:b:c:d:h:i:j:e:f:g");
916    }
917
918    #[test]
919    fn test_mid_braid_2() {
920        let mut gb = graph! {
921            ClientState::new(SeqEngine, MemStorageProvider::new());
922            "a";
923            commit;
924            "a" < "b" "c" "d" "h" "i" "j";
925            "d" < "e" "f" "g";
926            commit;
927        };
928
929        let g = gb.client.provider.get_storage(mkid("a")).unwrap();
930
931        #[cfg(feature = "graphviz")]
932        crate::storage::memory::graphviz::dot(g, "mid_braid_2");
933
934        assert_eq!(g.get_head().unwrap(), Location::new(3, 0));
935
936        let seq = lookup(g, "seq").unwrap();
937        let seq = std::str::from_utf8(&seq).unwrap();
938        assert_eq!(seq, "a:b:c:d:h:i:j:e:f:g");
939    }
940
941    #[test]
942    fn test_sequential_finalize() {
943        let mut gb = graph! {
944            ClientState::new(SeqEngine, MemStorageProvider::new());
945            "a";
946            commit;
947            "a" < "b" "c" "d" "e" "f" "g";
948            "d" < "h" "i" "j";
949            "e" < finalize "fff1";
950            "fff1" < "x" "y";
951            "y" < finalize "fff2";
952            commit;
953        };
954
955        let g = gb.client.provider.get_storage(mkid("a")).unwrap();
956
957        #[cfg(feature = "graphviz")]
958        crate::storage::memory::graphviz::dot(g, "finalize_success");
959
960        assert_eq!(g.get_head().unwrap(), Location::new(5, 0));
961
962        let seq = lookup(g, "seq").unwrap();
963        let seq = std::str::from_utf8(&seq).unwrap();
964        assert_eq!(seq, "a:b:c:d:e:fff1:x:y:fff2:h:i:j:f:g");
965    }
966
967    #[test]
968    fn test_parallel_finalize() {
969        let mut gb = graph! {
970            ClientState::new(SeqEngine, MemStorageProvider::new());
971            "a";
972            commit;
973            "a" < "b" "c" "d" "e" "f" "g";
974            "d" < "h" "i" "j";
975            "e" < finalize "fff1";
976            "i" < finalize "fff2";
977        };
978        let err = gb.commit().expect_err("merge should fail");
979        assert!(matches!(err, ClientError::ParallelFinalize), "{err:?}")
980    }
981}