aranya_runtime/client/
transaction.rs

1use alloc::collections::{BTreeMap, VecDeque};
2use core::{marker::PhantomData, mem};
3
4use buggy::{BugExt as _, bug};
5
6use super::braiding;
7use crate::{
8    Address, ClientError, CmdId, Command, Engine, EngineError, GraphId, Location,
9    MAX_COMMAND_LENGTH, MergeIds, Perspective as _, Policy as _, PolicyId, Prior, Revertable as _,
10    Segment as _, Sink, Storage, StorageError, StorageProvider, engine::CommandPlacement,
11};
12
13/// Transaction used to receive many commands at once.
14///
15/// The transaction allows us to have many temporary heads at once, so we don't
16/// need as many merges when adding commands. When the transaction is committed,
17/// we will merge all temporary heads and the storage head, and then commit the
18/// result as the new storage head.
19pub struct Transaction<SP: StorageProvider, E> {
20    /// The ID of the associated storage
21    storage_id: GraphId,
22    /// Current working perspective
23    perspective: Option<SP::Perspective>,
24    /// Head of the current perspective
25    phead: Option<CmdId>,
26    /// Written but not committed heads
27    heads: BTreeMap<Address, Location>,
28    /// Tag for associated engine
29    _engine: PhantomData<E>,
30}
31
32impl<SP: StorageProvider, E> Transaction<SP, E> {
33    pub(super) const fn new(storage_id: GraphId) -> Self {
34        Self {
35            storage_id,
36            perspective: None,
37            phead: None,
38            heads: BTreeMap::new(),
39            _engine: PhantomData,
40        }
41    }
42}
43
44impl<SP: StorageProvider, E: Engine> Transaction<SP, E> {
45    /// Returns the transaction's storage id.
46    pub fn storage_id(&self) -> GraphId {
47        self.storage_id
48    }
49
50    /// Find a given id if reachable within this transaction.
51    ///
52    /// Does not search `self.perspective`, which should be written out beforehand.
53    fn locate(
54        &self,
55        storage: &mut SP::Storage,
56        address: Address,
57    ) -> Result<Option<Location>, ClientError> {
58        // Search from committed head.
59        if let Some(found) = storage.get_location(address)? {
60            return Ok(Some(found));
61        }
62        // Search from our temporary heads.
63        for &head in self.heads.values() {
64            if let Some(found) = storage.get_location_from(head, address)? {
65                return Ok(Some(found));
66            }
67        }
68        Ok(None)
69    }
70
71    /// Write current perspective, merge transaction heads, and commit to graph.
72    pub(super) fn commit(
73        &mut self,
74        provider: &mut SP,
75        engine: &mut E,
76        sink: &mut impl Sink<E::Effect>,
77    ) -> Result<(), ClientError> {
78        let storage = provider.get_storage(self.storage_id)?;
79
80        // Write out current perspective.
81        if let Some(p) = Option::take(&mut self.perspective) {
82            self.phead = None;
83            let segment = storage.write(p)?;
84            let head = segment.head()?;
85            self.heads.insert(head.address()?, segment.head_location());
86        }
87
88        // Merge heads pairwise until single head left, then commit.
89        // TODO(#370): Merge deterministically
90        let mut heads: VecDeque<_> = mem::take(&mut self.heads).into_iter().collect();
91        let mut merging_head = false;
92        while let Some((left_id, mut left_loc)) = heads.pop_front() {
93            if let Some((right_id, mut right_loc)) = heads.pop_front() {
94                let (policy, policy_id) = choose_policy(storage, engine, left_loc, right_loc)?;
95
96                let mut buffer = [0u8; MAX_COMMAND_LENGTH];
97                let merge_ids = MergeIds::new(left_id, right_id).assume("merging different ids")?;
98                if left_id > right_id {
99                    mem::swap(&mut left_loc, &mut right_loc);
100                }
101                let command = policy.merge(&mut buffer, merge_ids)?;
102
103                let (braid, last_common_ancestor) =
104                    make_braid_segment::<_, E>(storage, left_loc, right_loc, sink, policy)?;
105
106                let mut perspective = storage.new_merge_perspective(
107                    left_loc,
108                    right_loc,
109                    last_common_ancestor,
110                    policy_id,
111                    braid,
112                )?;
113                perspective.add_command(&command)?;
114
115                let segment = storage.write(perspective)?;
116                let head = segment.head()?;
117
118                heads.push_back((head.address()?, segment.head_location()));
119            } else {
120                let segment = storage.get_segment(left_loc)?;
121                // Try to commit. If it fails with `HeadNotAncestor`, we know we
122                // need to merge with the graph head.
123                match storage.commit(segment) {
124                    Ok(()) => break,
125                    Err(StorageError::HeadNotAncestor) => {
126                        if merging_head {
127                            bug!("merging with graph head again, would loop");
128                        }
129
130                        merging_head = true;
131
132                        heads.push_back((left_id, left_loc));
133
134                        let head_loc = storage.get_head()?;
135                        let segment = storage.get_segment(head_loc)?;
136                        let head = segment.head()?;
137                        heads.push_back((head.address()?, segment.head_location()));
138                    }
139                    Err(e) => return Err(e.into()),
140                }
141            }
142        }
143
144        Ok(())
145    }
146
147    /// Attempt to store the `command` in the graph with `storage_id`. Effects will be
148    /// emitted to the `sink`. This interface is used when syncing with another device
149    /// and integrating the new commands.
150    pub(super) fn add_commands(
151        &mut self,
152        commands: &[impl Command],
153        provider: &mut SP,
154        engine: &mut E,
155        sink: &mut impl Sink<E::Effect>,
156    ) -> Result<usize, ClientError> {
157        let mut commands = commands.iter();
158        let mut count: usize = 0;
159
160        // Get storage or try to initialize with first command.
161        let storage = match provider.get_storage(self.storage_id) {
162            Ok(s) => s,
163            Err(StorageError::NoSuchStorage) => {
164                let command = commands.next().ok_or(ClientError::InitError)?;
165                count = count.checked_add(1).assume("must not overflow")?;
166                self.init(command, engine, provider, sink)?
167            }
168            Err(e) => return Err(e.into()),
169        };
170
171        // Handle remaining commands.
172        for command in commands {
173            if self
174                .perspective
175                .as_ref()
176                .is_some_and(|p| p.includes(command.id()))
177            {
178                // Command in current perspective.
179                continue;
180            }
181            if (self.locate(storage, command.address()?)?).is_some() {
182                // Command already added.
183                continue;
184            }
185            match command.parent() {
186                Prior::None => {
187                    if command.id().into_id() == self.storage_id.into_id() {
188                        // Graph already initialized, extra init just spurious
189                    } else {
190                        bug!("init command does not belong in graph");
191                    }
192                }
193                Prior::Single(parent) => {
194                    self.add_single(storage, engine, sink, command, parent)?;
195                    count = count.checked_add(1).assume("must not overflow")?;
196                }
197                Prior::Merge(left, right) => {
198                    self.add_merge(storage, engine, sink, command, left, right)?;
199                    count = count.checked_add(1).assume("must not overflow")?;
200                }
201            }
202        }
203
204        Ok(count)
205    }
206
207    fn add_single(
208        &mut self,
209        storage: &mut <SP as StorageProvider>::Storage,
210        engine: &mut E,
211        sink: &mut impl Sink<E::Effect>,
212        command: &impl Command,
213        parent: Address,
214    ) -> Result<(), ClientError> {
215        let perspective = self.get_perspective(parent, storage)?;
216
217        let policy_id = perspective.policy();
218        let policy = engine.get_policy(policy_id)?;
219
220        // Try to run command, or revert if failed.
221        sink.begin();
222        let checkpoint = perspective.checkpoint();
223        if let Err(e) = policy.call_rule(
224            command,
225            perspective,
226            sink,
227            CommandPlacement::OnGraphAtOrigin,
228        ) {
229            perspective.revert(checkpoint)?;
230            sink.rollback();
231            return Err(e.into());
232        }
233        perspective.add_command(command)?;
234        sink.commit();
235
236        self.phead = Some(command.id());
237
238        Ok(())
239    }
240
241    fn add_merge(
242        &mut self,
243        storage: &mut <SP as StorageProvider>::Storage,
244        engine: &mut E,
245        sink: &mut impl Sink<E::Effect>,
246        command: &impl Command,
247        left: Address,
248        right: Address,
249    ) -> Result<bool, ClientError> {
250        // Must always start a new perspective for merges.
251        if let Some(p) = Option::take(&mut self.perspective) {
252            let seg = storage.write(p)?;
253            let head = seg.head()?;
254            self.heads.insert(head.address()?, seg.head_location());
255        }
256
257        let left_loc = self
258            .locate(storage, left)?
259            .ok_or(ClientError::NoSuchParent(left.id))?;
260        let right_loc = self
261            .locate(storage, right)?
262            .ok_or(ClientError::NoSuchParent(right.id))?;
263
264        let (policy, policy_id) = choose_policy(storage, engine, left_loc, right_loc)?;
265
266        // Braid commands from left and right into an ordered sequence.
267        let (braid, last_common_ancestor) =
268            make_braid_segment::<_, E>(storage, left_loc, right_loc, sink, policy)?;
269
270        let mut perspective = storage.new_merge_perspective(
271            left_loc,
272            right_loc,
273            last_common_ancestor,
274            policy_id,
275            braid,
276        )?;
277        perspective.add_command(command)?;
278
279        // These are no longer heads of the transaction, since they are both covered by the merge
280        self.heads.remove(&left);
281        self.heads.remove(&right);
282
283        self.perspective = Some(perspective);
284        self.phead = Some(command.id());
285
286        Ok(true)
287    }
288
289    /// Get a perspective to which we can add a command with the given parant.
290    ///
291    /// If parent is the head of the current perspective, we can just use it.
292    /// Otherwise, we must write out the perspective and get a new one.
293    fn get_perspective(
294        &mut self,
295        parent: Address,
296        storage: &mut <SP as StorageProvider>::Storage,
297    ) -> Result<&mut <SP as StorageProvider>::Perspective, ClientError> {
298        if self.phead == Some(parent.id) {
299            // Command will append to current perspective.
300            return Ok(self
301                .perspective
302                .as_mut()
303                .assume("trx has perspective when has phead")?);
304        }
305
306        // Write out the current perspective.
307        if let Some(p) = Option::take(&mut self.perspective) {
308            self.phead = None;
309            let seg = storage.write(p)?;
310            let head = seg.head()?;
311            self.heads.insert(head.address()?, seg.head_location());
312        }
313
314        let loc = self
315            .locate(storage, parent)?
316            .ok_or(ClientError::NoSuchParent(parent.id))?;
317
318        // Get a new perspective and store it in the transaction.
319        let p = self
320            .perspective
321            .insert(storage.get_linear_perspective(loc)?);
322
323        self.phead = Some(parent.id);
324        self.heads.remove(&parent);
325
326        Ok(p)
327    }
328
329    fn init<'sp>(
330        &mut self,
331        command: &impl Command,
332        engine: &mut E,
333        provider: &'sp mut SP,
334        sink: &mut impl Sink<E::Effect>,
335    ) -> Result<&'sp mut <SP as StorageProvider>::Storage, ClientError> {
336        // Storage ID is the id of the init command by definition.
337        if self.storage_id.into_id() != command.id().into_id() {
338            return Err(ClientError::InitError);
339        }
340
341        // The init command must not have a parent.
342        if !matches!(command.parent(), Prior::None) {
343            return Err(ClientError::InitError);
344        }
345
346        // The graph must have policy to start with.
347        let Some(policy_data) = command.policy() else {
348            return Err(ClientError::InitError);
349        };
350
351        let policy_id = engine.add_policy(policy_data)?;
352        let policy = engine.get_policy(policy_id)?;
353
354        // Get an empty perspective and run the init command.
355        let mut perspective = provider.new_perspective(policy_id);
356        sink.begin();
357        if let Err(e) = policy.call_rule(
358            command,
359            &mut perspective,
360            sink,
361            CommandPlacement::OnGraphAtOrigin,
362        ) {
363            sink.rollback();
364            // We don't need to revert perspective since we just drop it.
365            return Err(e.into());
366        }
367        perspective.add_command(command)?;
368
369        let (_, storage) = provider.new_storage(perspective)?;
370
371        // Wait to commit until we are absolutely sure we've initialized.
372        sink.commit();
373
374        Ok(storage)
375    }
376}
377
378/// Run the braid algorithm and evaluate the sequence to create a braided fact index.
379fn make_braid_segment<S: Storage, E: Engine>(
380    storage: &mut S,
381    left: Location,
382    right: Location,
383    sink: &mut impl Sink<E::Effect>,
384    policy: &E::Policy,
385) -> Result<(S::FactIndex, (Location, usize)), ClientError> {
386    let order = braiding::braid(storage, left, right)?;
387    let last_common_ancestor = braiding::last_common_ancestor(storage, left, right)?;
388
389    let (&first, rest) = order.split_first().assume("braid is non-empty")?;
390
391    let mut braid_perspective = storage.get_fact_perspective(first)?;
392
393    sink.begin();
394
395    for &location in rest {
396        let segment = storage.get_segment(location)?;
397        let command = segment
398            .get_command(location)
399            .assume("braid only contains existing commands")?;
400
401        let result = policy.call_rule(
402            &command,
403            &mut braid_perspective,
404            sink,
405            CommandPlacement::OnGraphInBraid,
406        );
407
408        // If the command failed in an uncontrolled way, rollback
409        if let Err(e) = result {
410            if e != EngineError::Check {
411                sink.rollback();
412                return Err(e.into());
413            }
414        }
415    }
416
417    let braid = storage.write_facts(braid_perspective)?;
418
419    sink.commit();
420
421    Ok((braid, last_common_ancestor))
422}
423
424/// Select the policy from two locations with the greatest serial value.
425fn choose_policy<'a, E: Engine>(
426    storage: &impl Storage,
427    engine: &'a E,
428    left: Location,
429    right: Location,
430) -> Result<(&'a E::Policy, PolicyId), ClientError> {
431    Ok(core::cmp::max_by_key(
432        get_policy(storage, engine, left)?,
433        get_policy(storage, engine, right)?,
434        |(p, _)| p.serial(),
435    ))
436}
437
438fn get_policy<'a, E: Engine>(
439    storage: &impl Storage,
440    engine: &'a E,
441    location: Location,
442) -> Result<(&'a E::Policy, PolicyId), ClientError> {
443    let segment = storage.get_segment(location)?;
444    let policy_id = segment.policy();
445    let policy = engine.get_policy(policy_id)?;
446    Ok((policy, policy_id))
447}
448
449#[cfg(test)]
450mod test {
451    use std::collections::HashMap;
452
453    use buggy::Bug;
454    use test_log::test;
455
456    use super::*;
457    use crate::{
458        ClientState, Keys, MergeIds, Perspective, Policy, Priority,
459        engine::{ActionPlacement, CommandPlacement},
460        memory::MemStorageProvider,
461        testing::{hash_for_testing_only, short_b58},
462    };
463
464    struct SeqEngine;
465
466    /// [`SeqPolicy`] is a very simple policy which appends the id of each
467    /// command to a fact named `b"seq"`. At each point in the graph, the value
468    /// of this fact should be equal to the ids in braid order of all facts up
469    /// to that point.
470    struct SeqPolicy;
471
472    struct SeqCommand {
473        id: CmdId,
474        prior: Prior<Address>,
475        finalize: bool,
476        data: Box<str>,
477        max_cut: usize,
478    }
479
480    impl Engine for SeqEngine {
481        type Policy = SeqPolicy;
482        type Effect = ();
483
484        fn add_policy(&mut self, _policy: &[u8]) -> Result<PolicyId, EngineError> {
485            Ok(PolicyId::new(0))
486        }
487
488        fn get_policy(&self, _id: PolicyId) -> Result<&Self::Policy, EngineError> {
489            Ok(&SeqPolicy)
490        }
491    }
492
493    impl Policy for SeqPolicy {
494        type Action<'a> = &'a str;
495        type Effect = ();
496        type Command<'a> = SeqCommand;
497
498        fn serial(&self) -> u32 {
499            0
500        }
501
502        fn call_rule(
503            &self,
504            command: &impl Command,
505            facts: &mut impl crate::FactPerspective,
506            _sink: &mut impl Sink<Self::Effect>,
507            _placement: CommandPlacement,
508        ) -> Result<(), EngineError> {
509            assert!(
510                !matches!(command.parent(), Prior::Merge { .. }),
511                "merges shouldn't be evaluated"
512            );
513
514            // For init and basic commands, append the id to the seq fact.
515            let data = command.bytes();
516            if let Some(seq) = facts
517                .query("seq", &Keys::default())
518                .assume("can query")?
519                .as_deref()
520            {
521                facts.insert(
522                    "seq".into(),
523                    Keys::default(),
524                    [seq, b":", data].concat().into(),
525                );
526            } else {
527                facts.insert("seq".into(), Keys::default(), data.into());
528            }
529            Ok(())
530        }
531
532        fn call_action(
533            &self,
534            _action: Self::Action<'_>,
535            _facts: &mut impl Perspective,
536            _sink: &mut impl Sink<Self::Effect>,
537            _placement: ActionPlacement,
538        ) -> Result<(), EngineError> {
539            unimplemented!()
540        }
541
542        fn merge<'a>(
543            &self,
544            _target: &'a mut [u8],
545            ids: MergeIds,
546        ) -> Result<Self::Command<'a>, EngineError> {
547            let (left, right): (Address, Address) = ids.into();
548            let parents = [*left.id.as_array(), *right.id.as_array()];
549            let id = hash_for_testing_only(parents.as_flattened());
550
551            Ok(SeqCommand::new(
552                id,
553                Prior::Merge(left, right),
554                left.max_cut
555                    .max(right.max_cut)
556                    .checked_add(1)
557                    .assume("must not overflow")?,
558            ))
559        }
560    }
561
562    impl SeqCommand {
563        fn new(id: CmdId, prior: Prior<Address>, max_cut: usize) -> Self {
564            let data = short_b58(id).into_boxed_str();
565            Self {
566                id,
567                prior,
568                finalize: false,
569                data,
570                max_cut,
571            }
572        }
573
574        fn finalize(id: CmdId, prev: Address, max_cut: usize) -> Self {
575            let data = short_b58(id).into_boxed_str();
576            Self {
577                id,
578                prior: Prior::Single(prev),
579                finalize: true,
580                data,
581                max_cut,
582            }
583        }
584    }
585
586    impl Command for SeqCommand {
587        fn priority(&self) -> Priority {
588            if self.finalize {
589                return Priority::Finalize;
590            }
591            match self.prior {
592                Prior::None => Priority::Init,
593                Prior::Single(_) => {
594                    // Use the last byte of the ID as priority, just so we can
595                    // properly see the effects of braiding
596                    let id = self.id.as_bytes();
597                    let priority = u32::from(*id.last().unwrap());
598                    Priority::Basic(priority)
599                }
600                Prior::Merge(_, _) => Priority::Merge,
601            }
602        }
603
604        fn id(&self) -> CmdId {
605            self.id
606        }
607
608        fn parent(&self) -> Prior<Address> {
609            self.prior
610        }
611
612        fn policy(&self) -> Option<&[u8]> {
613            // We don't actually need any policy bytes, but the
614            // transaction/storage requires it on init commands.
615            match self.prior {
616                Prior::None => Some(b""),
617                _ => None,
618            }
619        }
620
621        fn bytes(&self) -> &[u8] {
622            self.data.as_bytes()
623        }
624
625        fn max_cut(&self) -> Result<usize, Bug> {
626            Ok(self.max_cut)
627        }
628    }
629
630    struct NullSink;
631    impl<E> Sink<E> for NullSink {
632        fn begin(&mut self) {}
633        fn consume(&mut self, _: E) {}
634        fn rollback(&mut self) {}
635        fn commit(&mut self) {}
636    }
637
638    /// [`GraphBuilder`] and the associated macro [`graph`] provide an easy way
639    /// to create a graph with a specific structure.
640    struct GraphBuilder<SP: StorageProvider> {
641        client: ClientState<SeqEngine, SP>,
642        trx: Transaction<SP, SeqEngine>,
643        max_cuts: HashMap<CmdId, usize>,
644    }
645
646    impl<SP: StorageProvider> GraphBuilder<SP> {
647        pub fn init(
648            mut client: ClientState<SeqEngine, SP>,
649            ids: &[CmdId],
650        ) -> Result<Self, ClientError> {
651            let mut trx = Transaction::new(GraphId::from(ids[0].into_id()));
652            let mut prior: Prior<Address> = Prior::None;
653            let mut max_cuts = HashMap::new();
654            for (max_cut, &id) in ids.iter().enumerate() {
655                let cmd = SeqCommand::new(id, prior, max_cut);
656                trx.add_commands(
657                    &[cmd],
658                    &mut client.provider,
659                    &mut client.engine,
660                    &mut NullSink,
661                )?;
662                max_cuts.insert(id, max_cut);
663                prior = Prior::Single(Address { id, max_cut });
664            }
665            Ok(Self {
666                client,
667                trx,
668                max_cuts,
669            })
670        }
671
672        fn get_addr(&self, id: CmdId) -> Address {
673            Address {
674                id,
675                max_cut: self.max_cuts[&id],
676            }
677        }
678
679        pub fn line(&mut self, prev: CmdId, ids: &[CmdId]) -> Result<(), ClientError> {
680            let mut prev = self.get_addr(prev);
681            for &id in ids {
682                let max_cut = prev.max_cut.checked_add(1).unwrap();
683                let cmd = SeqCommand::new(id, Prior::Single(prev), max_cut);
684                self.trx.add_commands(
685                    &[cmd],
686                    &mut self.client.provider,
687                    &mut self.client.engine,
688                    &mut NullSink,
689                )?;
690                self.max_cuts.insert(id, max_cut);
691                prev = Address { id, max_cut };
692            }
693            Ok(())
694        }
695
696        pub fn finalize(&mut self, prev: CmdId, id: CmdId) -> Result<(), ClientError> {
697            let prev = self.get_addr(prev);
698            let max_cut = prev.max_cut.checked_add(1).unwrap();
699            let cmd = SeqCommand::finalize(id, prev, max_cut);
700            self.trx.add_commands(
701                &[cmd],
702                &mut self.client.provider,
703                &mut self.client.engine,
704                &mut NullSink,
705            )?;
706            self.max_cuts.insert(id, max_cut);
707            Ok(())
708        }
709
710        pub fn merge(
711            &mut self,
712            (left, right): (CmdId, CmdId),
713            ids: &[CmdId],
714        ) -> Result<(), ClientError> {
715            let prior = Prior::Merge(self.get_addr(left), self.get_addr(right));
716            let mergecmd = SeqCommand::new(ids[0], prior, prior.next_max_cut().unwrap());
717            let mut prev = Address {
718                id: mergecmd.id,
719                max_cut: mergecmd.max_cut,
720            };
721            self.max_cuts.insert(mergecmd.id, mergecmd.max_cut);
722            self.trx.add_commands(
723                &[mergecmd],
724                &mut self.client.provider,
725                &mut self.client.engine,
726                &mut NullSink,
727            )?;
728            for &id in &ids[1..] {
729                let cmd = SeqCommand::new(
730                    id,
731                    Prior::Single(prev),
732                    prev.max_cut.checked_add(1).expect("must not overflow"),
733                );
734                prev = Address {
735                    id: cmd.id,
736                    max_cut: cmd.max_cut,
737                };
738                self.max_cuts.insert(cmd.id, cmd.max_cut);
739                self.trx.add_commands(
740                    &[cmd],
741                    &mut self.client.provider,
742                    &mut self.client.engine,
743                    &mut NullSink,
744                )?;
745            }
746            Ok(())
747        }
748
749        pub fn flush(&mut self) {
750            if let Some(p) = Option::take(&mut self.trx.perspective) {
751                self.trx.phead = None;
752                let seg = self
753                    .client
754                    .provider
755                    .get_storage(self.trx.storage_id)
756                    .unwrap()
757                    .write(p)
758                    .unwrap();
759                let head = seg.head().unwrap();
760                self.trx.heads.insert(
761                    head.address().expect("address must exist"),
762                    seg.head_location(),
763                );
764            }
765        }
766
767        pub fn commit(&mut self) -> Result<(), ClientError> {
768            self.trx.commit(
769                &mut self.client.provider,
770                &mut self.client.engine,
771                &mut NullSink,
772            )
773        }
774    }
775
776    fn mkid<T>(x: &str) -> T
777    where
778        aranya_crypto::BaseId: Into<T>,
779    {
780        x.parse::<aranya_crypto::BaseId>().unwrap().into()
781    }
782
783    /// See tests for usage.
784    macro_rules! graph {
785        ( $client:expr ; $init:literal $($inits:literal )* ; $($rest:tt)*) => {{
786            let mut gb = GraphBuilder::init($client, &[mkid($init), $(mkid($inits)),*]).unwrap();
787            graph!(@ gb, $($rest)*);
788            gb
789        }};
790        (@ $gb:ident, $prev:literal < $($id:literal)+; $($rest:tt)*) => {
791            $gb.line(mkid($prev), &[$(mkid($id)),+]).unwrap();
792            graph!(@ $gb, $($rest)*);
793        };
794        (@ $gb:ident, $l:literal $r:literal < $($id:literal)+; $($rest:tt)*) => {
795            $gb.merge((mkid($l), mkid($r)), &[$(mkid($id)),+]).unwrap();
796            graph!(@ $gb, $($rest)*);
797        };
798        (@ $gb:ident, $prev:literal < finalize $id:literal; $($rest:tt)*) => {
799            $gb.finalize(mkid($prev), mkid($id)).unwrap();
800            graph!(@ $gb, $($rest)*);
801        };
802        (@ $gb:ident, commit; $($rest:tt)*) => {
803            $gb.commit().unwrap();
804            graph!(@ $gb, $($rest)*);
805        };
806        (@ $gb:ident, ) => {
807            $gb.flush();
808        };
809    }
810
811    fn lookup(storage: &impl Storage, name: &str) -> Option<Box<[u8]>> {
812        use crate::Query as _;
813        let head = storage.get_head().unwrap();
814        let p = storage.get_fact_perspective(head).unwrap();
815        p.query(name, &Keys::default()).unwrap()
816    }
817
818    #[test]
819    fn test_simple() -> Result<(), StorageError> {
820        let mut gb = graph! {
821            ClientState::new(SeqEngine, MemStorageProvider::new());
822            "a";
823            "a" < "b";
824            "a" < "c";
825            "b" "c" < "ma";
826            "b" < "d";
827            "ma" "d" < "mb";
828            commit;
829        };
830        let g = gb.client.provider.get_storage(mkid("a")).unwrap();
831
832        #[cfg(feature = "graphviz")]
833        crate::storage::memory::graphviz::dot(g, "simple");
834
835        assert_eq!(g.get_head().unwrap(), Location::new(5, 0));
836
837        let seq = lookup(g, "seq").unwrap();
838        let seq = std::str::from_utf8(&seq).unwrap();
839        assert_eq!(seq, "a:b:d:c");
840
841        Ok(())
842    }
843
844    #[test]
845    fn test_complex() -> Result<(), StorageError> {
846        let mut gb = graph! {
847            ClientState::new(SeqEngine, MemStorageProvider::new());
848            "a";
849            "a" < "1" "2" "3";
850            "3" < "4" "6" "7";
851            "3" < "5" "8";
852            "6" "8" < "9" "aa"; commit;
853            "7" < "a1" "a2";
854            "aa" "a2" < "a3";
855            "a3" < "a6" "a4";
856            "a3" < "a7" "a5";
857            "a4" "a5" < "a8";
858            "9" < "42" "43";
859            "42" < "45" "46";
860            "45" < "47" "48";
861            commit;
862        };
863
864        let g = gb.client.provider.get_storage(mkid("a")).unwrap();
865
866        #[cfg(feature = "graphviz")]
867        crate::storage::memory::graphviz::dot(g, "complex");
868
869        assert_eq!(g.get_head().unwrap(), Location::new(15, 0));
870
871        let seq = lookup(g, "seq").unwrap();
872        let seq = std::str::from_utf8(&seq).unwrap();
873        assert_eq!(
874            seq,
875            "a:1:2:3:5:8:4:6:42:45:47:48:46:43:aa:7:a1:a2:a7:a6:a5:a4"
876        );
877
878        Ok(())
879    }
880
881    #[test]
882    fn test_duplicates() {
883        let mut gb = graph! {
884            ClientState::new(SeqEngine, MemStorageProvider::new());
885            "a";
886            "a" < "b" "c";
887            "a" < "b";
888            "b" < "c";
889            "c" < "d";
890            commit;
891            "a" < "b";
892            "b" < "c";
893            "d" < "e";
894            commit;
895        };
896
897        let g = gb.client.provider.get_storage(mkid("a")).unwrap();
898
899        #[cfg(feature = "graphviz")]
900        crate::storage::memory::graphviz::dot(g, "duplicates");
901
902        assert_eq!(g.get_head().unwrap(), Location::new(2, 0));
903
904        let seq = lookup(g, "seq").unwrap();
905        let seq = std::str::from_utf8(&seq).unwrap();
906        assert_eq!(seq, "a:b:c:d:e");
907    }
908
909    #[test]
910    fn test_mid_braid_1() {
911        let mut gb = graph! {
912            ClientState::new(SeqEngine, MemStorageProvider::new());
913            "a";
914            commit;
915            "a" < "b" "c" "d" "e" "f" "g";
916            "d" < "h" "i" "j";
917            commit;
918        };
919
920        let g = gb.client.provider.get_storage(mkid("a")).unwrap();
921
922        #[cfg(feature = "graphviz")]
923        crate::storage::memory::graphviz::dot(g, "mid_braid_1");
924
925        assert_eq!(g.get_head().unwrap(), Location::new(3, 0));
926
927        let seq = lookup(g, "seq").unwrap();
928        let seq = std::str::from_utf8(&seq).unwrap();
929        assert_eq!(seq, "a:b:c:d:h:i:j:e:f:g");
930    }
931
932    #[test]
933    fn test_mid_braid_2() {
934        let mut gb = graph! {
935            ClientState::new(SeqEngine, MemStorageProvider::new());
936            "a";
937            commit;
938            "a" < "b" "c" "d" "h" "i" "j";
939            "d" < "e" "f" "g";
940            commit;
941        };
942
943        let g = gb.client.provider.get_storage(mkid("a")).unwrap();
944
945        #[cfg(feature = "graphviz")]
946        crate::storage::memory::graphviz::dot(g, "mid_braid_2");
947
948        assert_eq!(g.get_head().unwrap(), Location::new(3, 0));
949
950        let seq = lookup(g, "seq").unwrap();
951        let seq = std::str::from_utf8(&seq).unwrap();
952        assert_eq!(seq, "a:b:c:d:h:i:j:e:f:g");
953    }
954
955    #[test]
956    fn test_sequential_finalize() {
957        let mut gb = graph! {
958            ClientState::new(SeqEngine, MemStorageProvider::new());
959            "a";
960            commit;
961            "a" < "b" "c" "d" "e" "f" "g";
962            "d" < "h" "i" "j";
963            "e" < finalize "fff1";
964            "fff1" < "x" "y";
965            "y" < finalize "fff2";
966            commit;
967        };
968
969        let g = gb.client.provider.get_storage(mkid("a")).unwrap();
970
971        #[cfg(feature = "graphviz")]
972        crate::storage::memory::graphviz::dot(g, "finalize_success");
973
974        assert_eq!(g.get_head().unwrap(), Location::new(5, 0));
975
976        let seq = lookup(g, "seq").unwrap();
977        let seq = std::str::from_utf8(&seq).unwrap();
978        assert_eq!(seq, "a:b:c:d:e:fff1:x:y:fff2:h:i:j:f:g");
979    }
980
981    #[test]
982    fn test_parallel_finalize() {
983        let mut gb = graph! {
984            ClientState::new(SeqEngine, MemStorageProvider::new());
985            "a";
986            commit;
987            "a" < "b" "c" "d" "e" "f" "g";
988            "d" < "h" "i" "j";
989            "e" < finalize "fff1";
990            "i" < finalize "fff2";
991        };
992        let err = gb.commit().expect_err("merge should fail");
993        assert!(matches!(err, ClientError::ParallelFinalize), "{err:?}");
994    }
995}