Skip to main content

aranya_runtime/client/
transaction.rs

1use alloc::collections::{BTreeMap, VecDeque};
2use core::{marker::PhantomData, mem};
3
4use buggy::{BugExt as _, bug};
5
6use super::braiding;
7use crate::{
8    Address, ClientError, CmdId, Command, GraphId, Location, MAX_COMMAND_LENGTH, MergeIds,
9    Perspective as _, Policy as _, PolicyError, PolicyId, PolicyStore, Prior, Revertable as _,
10    Segment as _, Sink, Storage, StorageError, StorageProvider, policy::CommandPlacement,
11};
12
/// Transaction used to receive many commands at once.
///
/// The transaction allows us to have many temporary heads at once, so we don't
/// need as many merges when adding commands. When the transaction is committed,
/// we will merge all temporary heads and the graph head, and then commit the
/// result as the new graph head.
pub struct Transaction<SP: StorageProvider, PS> {
    /// The ID of the associated graph
    graph_id: GraphId,
    /// Current working perspective.
    ///
    /// `Some` while commands are being appended in memory; it is written out to
    /// storage (and recorded in `heads`) whenever a command cannot be appended
    /// to it, and on commit.
    perspective: Option<SP::Perspective>,
    /// Head of the current perspective.
    ///
    /// The id of the command that the next appended command must name as its
    /// parent in order to reuse `perspective`.
    phead: Option<CmdId>,
    /// Written but not committed heads, keyed by the id of the head command
    /// and mapping to that command's location in storage.
    heads: BTreeMap<CmdId, Location>,
    /// Tag for associated policy store
    policy_store: PhantomData<PS>,
}
31
32impl<SP: StorageProvider, PS> Transaction<SP, PS> {
33    pub(super) const fn new(graph_id: GraphId) -> Self {
34        Self {
35            graph_id,
36            perspective: None,
37            phead: None,
38            heads: BTreeMap::new(),
39            policy_store: PhantomData,
40        }
41    }
42}
43
44impl<SP: StorageProvider, PS: PolicyStore> Transaction<SP, PS> {
    /// Returns the transaction's graph id.
    ///
    /// This is the id the transaction was created with; it is also the id used
    /// to look up storage when adding or committing commands.
    pub fn graph_id(&self) -> GraphId {
        self.graph_id
    }
49
50    /// Find a given id if reachable within this transaction.
51    ///
52    /// Does not search `self.perspective`, which should be written out beforehand.
53    fn locate(
54        &self,
55        storage: &mut SP::Storage,
56        address: Address,
57    ) -> Result<Option<Location>, ClientError> {
58        // Search from committed head.
59        if let Some(found) = storage.get_location(address)? {
60            return Ok(Some(found));
61        }
62        // Search from our temporary heads.
63        for &head in self.heads.values() {
64            if let Some(found) = storage.get_location_from(head, address)? {
65                return Ok(Some(found));
66            }
67        }
68        Ok(None)
69    }
70
    /// Write current perspective, merge transaction heads, and commit to graph.
    ///
    /// Steps:
    /// 1. If a working perspective exists it is written to storage and its
    ///    head is recorded as one of the transaction heads.
    /// 2. Heads are popped two at a time, merged with the policy chosen by
    ///    `choose_policy`, and the merge result is queued again.
    /// 3. When a single head remains it is committed. If storage reports
    ///    `HeadNotAncestor`, the current graph head is queued as well so that
    ///    one more merge round joins the two; a second `HeadNotAncestor` is
    ///    treated as a bug.
    ///
    /// `self.heads` is emptied up front with `mem::take`, so if this function
    /// returns an error the heads are not put back into the transaction.
    ///
    /// With no perspective and no heads the loop body never runs and the
    /// function returns `Ok(())` without touching the graph head.
    pub(super) fn commit(
        &mut self,
        provider: &mut SP,
        policy_store: &mut PS,
        sink: &mut impl Sink<PS::Effect>,
    ) -> Result<(), ClientError> {
        let storage = provider.get_storage(self.graph_id)?;

        // Write out current perspective.
        if let Some(p) = Option::take(&mut self.perspective) {
            self.phead = None;
            let segment = storage.write(p)?;
            self.heads
                .insert(segment.head_id(), segment.head_location()?);
        }

        // Merge heads pairwise until single head left, then commit.
        // TODO(#370): Merge deterministically
        let mut heads: VecDeque<_> = mem::take(&mut self.heads).into_iter().collect();
        // Set once the graph head has been queued for merging, to detect a
        // second `HeadNotAncestor` and avoid looping forever.
        let mut merging_head = false;
        while let Some((left_id, mut left_loc)) = heads.pop_front() {
            if let Some((right_id, mut right_loc)) = heads.pop_front() {
                // Two heads available: merge them.
                let (policy, policy_id) =
                    choose_policy(storage, policy_store, left_loc, right_loc)?;

                let mut buffer = [0u8; MAX_COMMAND_LENGTH];
                let merge_ids = MergeIds::new(
                    Address {
                        id: left_id,
                        max_cut: left_loc.max_cut,
                    },
                    Address {
                        id: right_id,
                        max_cut: right_loc.max_cut,
                    },
                )
                .assume("merging different ids")?;
                // NOTE(review): presumably `MergeIds` orders its two addresses
                // by id, and this swap keeps the locations in that same order
                // — confirm against `MergeIds::new`.
                if left_id > right_id {
                    mem::swap(&mut left_loc, &mut right_loc);
                }
                let command = policy.merge(&mut buffer, merge_ids)?;

                let (braid, last_common_ancestor) =
                    make_braid_segment::<_, PS>(storage, left_loc, right_loc, sink, policy)?;

                let mut perspective = storage.new_merge_perspective(
                    left_loc,
                    right_loc,
                    last_common_ancestor,
                    policy_id,
                    braid,
                )?;
                perspective.add_command(&command)?;

                // The merge result becomes a head that may need further merging.
                let segment = storage.write(perspective)?;
                heads.push_back((segment.head_id(), segment.head_location()?));
            } else {
                // Only one head left: it is the candidate for the new graph head.
                let segment = storage.get_segment(left_loc)?;
                // Try to commit. If it fails with `HeadNotAncestor`, we know we
                // need to merge with the graph head.
                match storage.commit(segment) {
                    Ok(()) => break,
                    Err(StorageError::HeadNotAncestor) => {
                        if merging_head {
                            bug!("merging with graph head again, would loop");
                        }

                        merging_head = true;

                        // Re-queue our head together with the graph head so the
                        // next iteration merges the two.
                        heads.push_back((left_id, left_loc));

                        let head_loc = storage.get_head()?;
                        let segment = storage.get_segment(head_loc)?;
                        heads.push_back((segment.head_id(), segment.head_location()?));
                    }
                    Err(e) => return Err(e.into()),
                }
            }
        }

        Ok(())
    }
154
155    /// Attempt to store the `command` in the graph with `graph_id`. Effects will be
156    /// emitted to the `sink`. This interface is used when syncing with another device
157    /// and integrating the new commands.
158    pub(super) fn add_commands(
159        &mut self,
160        commands: &[impl Command],
161        provider: &mut SP,
162        policy_store: &mut PS,
163        sink: &mut impl Sink<PS::Effect>,
164    ) -> Result<usize, ClientError> {
165        let mut commands = commands.iter();
166        let mut count: usize = 0;
167
168        // Get storage or try to initialize with first command.
169        let storage = match provider.get_storage(self.graph_id) {
170            Ok(s) => s,
171            Err(StorageError::NoSuchStorage) => {
172                let command = commands.next().ok_or(ClientError::InitError)?;
173                count = count.checked_add(1).assume("must not overflow")?;
174                self.init(command, policy_store, provider, sink)?
175            }
176            Err(e) => return Err(e.into()),
177        };
178
179        // Handle remaining commands.
180        for command in commands {
181            if self
182                .perspective
183                .as_ref()
184                .is_some_and(|p| p.includes(command.id()))
185            {
186                // Command in current perspective.
187                continue;
188            }
189
190            if self.locate(storage, command.address()?)?.is_some() {
191                // Command already added.
192                continue;
193            }
194            match command.parent() {
195                Prior::None => {
196                    if command.id().as_base() == self.graph_id.as_base() {
197                        // Graph already initialized, extra init just spurious
198                    } else {
199                        bug!("init command does not belong in graph");
200                    }
201                }
202                Prior::Single(parent) => {
203                    self.add_single(storage, policy_store, sink, command, parent)?;
204                    count = count.checked_add(1).assume("must not overflow")?;
205                }
206                Prior::Merge(left, right) => {
207                    self.add_merge(storage, policy_store, sink, command, left, right)?;
208                    count = count.checked_add(1).assume("must not overflow")?;
209                }
210            }
211        }
212
213        Ok(count)
214    }
215
216    fn add_single(
217        &mut self,
218        storage: &mut <SP as StorageProvider>::Storage,
219        policy_store: &mut PS,
220        sink: &mut impl Sink<PS::Effect>,
221        command: &impl Command,
222        parent: Address,
223    ) -> Result<(), ClientError> {
224        let perspective = self.get_perspective(parent, storage)?;
225
226        let policy_id = perspective.policy();
227        let policy = policy_store.get_policy(policy_id)?;
228
229        // Try to run command, or revert if failed.
230        sink.begin();
231        let checkpoint = perspective.checkpoint();
232        if let Err(e) = policy.call_rule(
233            command,
234            perspective,
235            sink,
236            CommandPlacement::OnGraphAtOrigin,
237        ) {
238            perspective.revert(checkpoint)?;
239            sink.rollback();
240            return Err(e.into());
241        }
242        perspective.add_command(command)?;
243        sink.commit();
244
245        self.phead = Some(command.id());
246
247        Ok(())
248    }
249
250    fn add_merge(
251        &mut self,
252        storage: &mut <SP as StorageProvider>::Storage,
253        policy_store: &mut PS,
254        sink: &mut impl Sink<PS::Effect>,
255        command: &impl Command,
256        left: Address,
257        right: Address,
258    ) -> Result<bool, ClientError> {
259        // Must always start a new perspective for merges.
260        if let Some(p) = Option::take(&mut self.perspective) {
261            let seg = storage.write(p)?;
262            self.heads.insert(seg.head_id(), seg.head_location()?);
263        }
264
265        let left_loc = self
266            .locate(storage, left)?
267            .ok_or(ClientError::NoSuchParent(left.id))?;
268        let right_loc = self
269            .locate(storage, right)?
270            .ok_or(ClientError::NoSuchParent(right.id))?;
271
272        let (policy, policy_id) = choose_policy(storage, policy_store, left_loc, right_loc)?;
273
274        // Braid commands from left and right into an ordered sequence.
275        let (braid, last_common_ancestor) =
276            make_braid_segment::<_, PS>(storage, left_loc, right_loc, sink, policy)?;
277
278        let mut perspective = storage.new_merge_perspective(
279            left_loc,
280            right_loc,
281            last_common_ancestor,
282            policy_id,
283            braid,
284        )?;
285        perspective.add_command(command)?;
286
287        // These are no longer heads of the transaction, since they are both covered by the merge
288        self.heads.remove(&left.id);
289        self.heads.remove(&right.id);
290
291        self.perspective = Some(perspective);
292        self.phead = Some(command.id());
293
294        Ok(true)
295    }
296
297    /// Get a perspective to which we can add a command with the given parant.
298    ///
299    /// If parent is the head of the current perspective, we can just use it.
300    /// Otherwise, we must write out the perspective and get a new one.
301    fn get_perspective(
302        &mut self,
303        parent: Address,
304        storage: &mut <SP as StorageProvider>::Storage,
305    ) -> Result<&mut <SP as StorageProvider>::Perspective, ClientError> {
306        if self.phead == Some(parent.id) {
307            // Command will append to current perspective.
308            return Ok(self
309                .perspective
310                .as_mut()
311                .assume("trx has perspective when has phead")?);
312        }
313
314        // Write out the current perspective.
315        if let Some(p) = Option::take(&mut self.perspective) {
316            self.phead = None;
317            let seg = storage.write(p)?;
318            self.heads.insert(seg.head_id(), seg.head_location()?);
319        }
320
321        let loc = self
322            .locate(storage, parent)?
323            .ok_or(ClientError::NoSuchParent(parent.id))?;
324
325        // Get a new perspective and store it in the transaction.
326        let p = self
327            .perspective
328            .insert(storage.get_linear_perspective(loc)?);
329
330        self.phead = Some(parent.id);
331        self.heads.remove(&parent.id);
332
333        Ok(p)
334    }
335
336    fn init<'sp>(
337        &mut self,
338        command: &impl Command,
339        policy_store: &mut PS,
340        provider: &'sp mut SP,
341        sink: &mut impl Sink<PS::Effect>,
342    ) -> Result<&'sp mut <SP as StorageProvider>::Storage, ClientError> {
343        // Graph ID is the id of the init command by definition.
344        if self.graph_id.as_base() != command.id().as_base() {
345            return Err(ClientError::InitError);
346        }
347
348        // The init command must not have a parent.
349        if !matches!(command.parent(), Prior::None) {
350            return Err(ClientError::InitError);
351        }
352
353        // The graph must have policy to start with.
354        let Some(policy_data) = command.policy() else {
355            return Err(ClientError::InitError);
356        };
357
358        let policy_id = policy_store.add_policy(policy_data)?;
359        let policy = policy_store.get_policy(policy_id)?;
360
361        // Get an empty perspective and run the init command.
362        let mut perspective = provider.new_perspective(policy_id);
363        sink.begin();
364        if let Err(e) = policy.call_rule(
365            command,
366            &mut perspective,
367            sink,
368            CommandPlacement::OnGraphAtOrigin,
369        ) {
370            sink.rollback();
371            // We don't need to revert perspective since we just drop it.
372            return Err(e.into());
373        }
374        perspective.add_command(command)?;
375
376        let (_, storage) = provider.new_storage(perspective)?;
377
378        // Wait to commit until we are absolutely sure we've initialized.
379        sink.commit();
380
381        Ok(storage)
382    }
383}
384
385/// Run the braid algorithm and evaluate the sequence to create a braided fact index.
386fn make_braid_segment<S: Storage, PS: PolicyStore>(
387    storage: &mut S,
388    left: Location,
389    right: Location,
390    sink: &mut impl Sink<PS::Effect>,
391    policy: &PS::Policy,
392) -> Result<(S::FactIndex, Location), ClientError> {
393    let order = braiding::braid(storage, left, right)?;
394    let last_common_ancestor = braiding::last_common_ancestor(storage, left, right)?;
395
396    let (&first, rest) = order.split_first().assume("braid is non-empty")?;
397
398    let mut braid_perspective = storage.get_fact_perspective(first)?;
399
400    sink.begin();
401
402    for &location in rest {
403        let segment = storage.get_segment(location)?;
404        let command = segment
405            .get_command(location)
406            .assume("braid only contains existing commands")?;
407
408        let result = policy.call_rule(
409            &command,
410            &mut braid_perspective,
411            sink,
412            CommandPlacement::OnGraphInBraid,
413        );
414
415        // If the command failed in an uncontrolled way, rollback
416        if let Err(e) = result
417            && e != PolicyError::Check
418        {
419            sink.rollback();
420            return Err(e.into());
421        }
422    }
423
424    let braid = storage.write_facts(braid_perspective)?;
425
426    sink.commit();
427
428    Ok((braid, last_common_ancestor))
429}
430
431/// Select the policy from two locations with the greatest serial value.
432fn choose_policy<'a, PS: PolicyStore>(
433    storage: &impl Storage,
434    policy_store: &'a PS,
435    left: Location,
436    right: Location,
437) -> Result<(&'a PS::Policy, PolicyId), ClientError> {
438    Ok(core::cmp::max_by_key(
439        get_policy(storage, policy_store, left)?,
440        get_policy(storage, policy_store, right)?,
441        |(p, _)| p.serial(),
442    ))
443}
444
445fn get_policy<'a, PS: PolicyStore>(
446    storage: &impl Storage,
447    policy_store: &'a PS,
448    location: Location,
449) -> Result<(&'a PS::Policy, PolicyId), ClientError> {
450    let segment = storage.get_segment(location)?;
451    let policy_id = segment.policy();
452    let policy = policy_store.get_policy(policy_id)?;
453    Ok((policy, policy_id))
454}
455
456#[cfg(test)]
457mod test {
458    use std::collections::HashMap;
459
460    use aranya_crypto::id::{Id, IdTag};
461    use buggy::Bug;
462    use test_log::test;
463
464    use super::*;
465    use crate::{
466        ClientState, Keys, MaxCut, MergeIds, Perspective, Policy, Priority, SegmentIndex,
467        memory::MemStorageProvider,
468        policy::{ActionPlacement, CommandPlacement},
469        testing::{hash_for_testing_only, short_b58},
470    };
471
    /// Policy store for tests that always hands out the single [`SeqPolicy`].
    struct SeqPolicyStore;
473
    /// [`SeqPolicy`] is a very simple policy which appends the bytes of each
    /// evaluated command (a short form of its id) to a fact named `"seq"`,
    /// separated by `:`. At each point in the graph, the value of this fact
    /// should be equal to the ids in braid order of all commands up to that
    /// point. Merge commands are never evaluated and so never appear in it.
    struct SeqPolicy;
479
    /// Test command used with [`SeqPolicy`].
    struct SeqCommand {
        /// Id of the command.
        id: CmdId,
        /// Parent(s) of the command.
        prior: Prior<Address>,
        /// Whether the command reports `Priority::Finalize`.
        finalize: bool,
        /// Payload returned by `bytes()`; a short base58 rendering of `id`.
        data: Box<str>,
        /// Max cut reported by `max_cut()`.
        max_cut: MaxCut,
    }
487
    impl PolicyStore for SeqPolicyStore {
        type Policy = SeqPolicy;
        type Effect = ();

        /// Ignores the policy bytes; every policy gets id 0.
        fn add_policy(&mut self, _policy: &[u8]) -> Result<PolicyId, PolicyError> {
            Ok(PolicyId::new(0))
        }

        /// Ignores the id; there is only one policy.
        fn get_policy(&self, _id: PolicyId) -> Result<&Self::Policy, PolicyError> {
            Ok(&SeqPolicy)
        }
    }
500
501    impl Policy for SeqPolicy {
502        type Action<'a> = &'a str;
503        type Effect = ();
504        type Command<'a> = SeqCommand;
505
506        fn serial(&self) -> u32 {
507            0
508        }
509
510        fn call_rule(
511            &self,
512            command: &impl Command,
513            facts: &mut impl crate::FactPerspective,
514            _sink: &mut impl Sink<Self::Effect>,
515            _placement: CommandPlacement,
516        ) -> Result<(), PolicyError> {
517            assert!(
518                !matches!(command.parent(), Prior::Merge { .. }),
519                "merges shouldn't be evaluated"
520            );
521
522            // For init and basic commands, append the id to the seq fact.
523            let data = command.bytes();
524            if let Some(seq) = facts
525                .query("seq", &Keys::default())
526                .assume("can query")?
527                .as_deref()
528            {
529                facts.insert(
530                    "seq".into(),
531                    Keys::default(),
532                    [seq, b":", data].concat().into(),
533                );
534            } else {
535                facts.insert("seq".into(), Keys::default(), data.into());
536            }
537            Ok(())
538        }
539
540        fn call_action(
541            &self,
542            _action: Self::Action<'_>,
543            _facts: &mut impl Perspective,
544            _sink: &mut impl Sink<Self::Effect>,
545            _placement: ActionPlacement,
546        ) -> Result<(), PolicyError> {
547            unimplemented!()
548        }
549
550        fn merge<'a>(
551            &self,
552            _target: &'a mut [u8],
553            ids: MergeIds,
554        ) -> Result<Self::Command<'a>, PolicyError> {
555            let (left, right): (Address, Address) = ids.into();
556            let parents = [*left.id.as_array(), *right.id.as_array()];
557            let id = hash_for_testing_only(parents.as_flattened());
558
559            Ok(SeqCommand::new(
560                id,
561                Prior::Merge(left, right),
562                left.max_cut
563                    .max(right.max_cut)
564                    .checked_add(1)
565                    .assume("must not overflow")?,
566            ))
567        }
568    }
569
570    impl SeqCommand {
571        fn new(id: CmdId, prior: Prior<Address>, max_cut: MaxCut) -> Self {
572            let data = short_b58(id).into_boxed_str();
573            Self {
574                id,
575                prior,
576                finalize: false,
577                data,
578                max_cut,
579            }
580        }
581
582        fn finalize(id: CmdId, prev: Address, max_cut: MaxCut) -> Self {
583            let data = short_b58(id).into_boxed_str();
584            Self {
585                id,
586                prior: Prior::Single(prev),
587                finalize: true,
588                data,
589                max_cut,
590            }
591        }
592    }
593
594    impl Command for SeqCommand {
595        fn priority(&self) -> Priority {
596            if self.finalize {
597                return Priority::Finalize;
598            }
599            match self.prior {
600                Prior::None => Priority::Init,
601                Prior::Single(_) => {
602                    // Use the last byte of the ID as priority, just so we can
603                    // properly see the effects of braiding
604                    let id = self.id.as_bytes();
605                    let priority = u32::from(*id.last().unwrap());
606                    Priority::Basic(priority)
607                }
608                Prior::Merge(_, _) => Priority::Merge,
609            }
610        }
611
612        fn id(&self) -> CmdId {
613            self.id
614        }
615
616        fn parent(&self) -> Prior<Address> {
617            self.prior
618        }
619
620        fn policy(&self) -> Option<&[u8]> {
621            // We don't actually need any policy bytes, but the
622            // transaction/storage requires it on init commands.
623            match self.prior {
624                Prior::None => Some(b""),
625                _ => None,
626            }
627        }
628
629        fn bytes(&self) -> &[u8] {
630            self.data.as_bytes()
631        }
632
633        fn max_cut(&self) -> Result<MaxCut, Bug> {
634            Ok(self.max_cut)
635        }
636    }
637
    /// Sink that discards every effect and ignores transaction boundaries.
    struct NullSink;
    impl<Eff> Sink<Eff> for NullSink {
        fn begin(&mut self) {}
        fn consume(&mut self, _: Eff) {}
        fn rollback(&mut self) {}
        fn commit(&mut self) {}
    }
645
    /// [`GraphBuilder`] and the associated macro [`graph`] provide an easy way
    /// to create a graph with a specific structure.
    struct GraphBuilder<SP: StorageProvider> {
        /// Client owning the storage provider and policy store.
        client: ClientState<SeqPolicyStore, SP>,
        /// Transaction through which all commands are added.
        trx: Transaction<SP, SeqPolicyStore>,
        /// Max cut of every command added so far, used to rebuild addresses
        /// from bare ids.
        max_cuts: HashMap<CmdId, MaxCut>,
    }
653
654    impl<SP: StorageProvider> GraphBuilder<SP> {
655        pub fn init(
656            mut client: ClientState<SeqPolicyStore, SP>,
657            ids: &[CmdId],
658        ) -> Result<Self, ClientError> {
659            let mut trx = Transaction::new(GraphId::transmute(ids[0]));
660            let mut prior: Prior<Address> = Prior::None;
661            let mut max_cuts = HashMap::new();
662            for (max_cut, &id) in ids.iter().enumerate() {
663                let max_cut = MaxCut(max_cut);
664                let cmd = SeqCommand::new(id, prior, max_cut);
665                trx.add_commands(
666                    &[cmd],
667                    &mut client.provider,
668                    &mut client.policy_store,
669                    &mut NullSink,
670                )?;
671                max_cuts.insert(id, max_cut);
672                prior = Prior::Single(Address { id, max_cut });
673            }
674            Ok(Self {
675                client,
676                trx,
677                max_cuts,
678            })
679        }
680
681        fn get_addr(&self, id: CmdId) -> Address {
682            Address {
683                id,
684                max_cut: self.max_cuts[&id],
685            }
686        }
687
688        pub fn line(&mut self, prev: CmdId, ids: &[CmdId]) -> Result<(), ClientError> {
689            let mut prev = self.get_addr(prev);
690            for &id in ids {
691                let max_cut = prev.max_cut.checked_add(1).unwrap();
692                let cmd = SeqCommand::new(id, Prior::Single(prev), max_cut);
693                self.trx.add_commands(
694                    &[cmd],
695                    &mut self.client.provider,
696                    &mut self.client.policy_store,
697                    &mut NullSink,
698                )?;
699                self.max_cuts.insert(id, max_cut);
700                prev = Address { id, max_cut };
701            }
702            Ok(())
703        }
704
705        pub fn finalize(&mut self, prev: CmdId, id: CmdId) -> Result<(), ClientError> {
706            let prev = self.get_addr(prev);
707            let max_cut = prev.max_cut.checked_add(1).unwrap();
708            let cmd = SeqCommand::finalize(id, prev, max_cut);
709            self.trx.add_commands(
710                &[cmd],
711                &mut self.client.provider,
712                &mut self.client.policy_store,
713                &mut NullSink,
714            )?;
715            self.max_cuts.insert(id, max_cut);
716            Ok(())
717        }
718
719        pub fn merge(
720            &mut self,
721            (left, right): (CmdId, CmdId),
722            ids: &[CmdId],
723        ) -> Result<(), ClientError> {
724            let prior = Prior::Merge(self.get_addr(left), self.get_addr(right));
725            let mergecmd = SeqCommand::new(ids[0], prior, prior.next_max_cut().unwrap());
726            let mut prev = Address {
727                id: mergecmd.id,
728                max_cut: mergecmd.max_cut,
729            };
730            self.max_cuts.insert(mergecmd.id, mergecmd.max_cut);
731            self.trx.add_commands(
732                &[mergecmd],
733                &mut self.client.provider,
734                &mut self.client.policy_store,
735                &mut NullSink,
736            )?;
737            for &id in &ids[1..] {
738                let cmd = SeqCommand::new(
739                    id,
740                    Prior::Single(prev),
741                    prev.max_cut.checked_add(1).expect("must not overflow"),
742                );
743                prev = Address {
744                    id: cmd.id,
745                    max_cut: cmd.max_cut,
746                };
747                self.max_cuts.insert(cmd.id, cmd.max_cut);
748                self.trx.add_commands(
749                    &[cmd],
750                    &mut self.client.provider,
751                    &mut self.client.policy_store,
752                    &mut NullSink,
753                )?;
754            }
755            Ok(())
756        }
757
758        pub fn flush(&mut self) {
759            if let Some(p) = Option::take(&mut self.trx.perspective) {
760                self.trx.phead = None;
761                let seg = self
762                    .client
763                    .provider
764                    .get_storage(self.trx.graph_id)
765                    .unwrap()
766                    .write(p)
767                    .unwrap();
768                self.trx
769                    .heads
770                    .insert(seg.head_id(), seg.head_location().unwrap());
771            }
772        }
773
774        pub fn commit(&mut self) -> Result<(), ClientError> {
775            self.trx.commit(
776                &mut self.client.provider,
777                &mut self.client.policy_store,
778                &mut NullSink,
779            )
780        }
781    }
782
    /// Parses `x` into an id of the requested kind.
    ///
    /// Panics if `x` does not parse.
    fn mkid<Tag: IdTag>(x: &str) -> Id<Tag> {
        x.parse().unwrap()
    }
786
    /// See tests for usage.
    ///
    /// Statements understood by the macro, each terminated by `;`:
    /// - `<client expr>; "init" "more"...;` — must come first; builds the
    ///   initial linear chain with `GraphBuilder::init`.
    /// - `"prev" < "a" "b"...;` — linear chain below `prev`.
    /// - `"l" "r" < "m" "a"...;` — merge of `l` and `r` with id `m`, followed
    ///   by a linear chain below it.
    /// - `"prev" < finalize "id";` — finalize command below `prev`.
    /// - `commit;` — commit the transaction.
    ///
    /// At the end of the input the working perspective is flushed, and the
    /// macro evaluates to the `GraphBuilder`.
    macro_rules! graph {
        // Entry point: create the builder, then process the statements.
        ( $client:expr ; $init:literal $($inits:literal )* ; $($rest:tt)*) => {{
            let mut gb = GraphBuilder::init($client, &[mkid($init), $(mkid($inits)),*]).unwrap();
            graph!(@ gb, $($rest)*);
            gb
        }};
        // Linear chain.
        (@ $gb:ident, $prev:literal < $($id:literal)+; $($rest:tt)*) => {
            $gb.line(mkid($prev), &[$(mkid($id)),+]).unwrap();
            graph!(@ $gb, $($rest)*);
        };
        // Merge followed by an optional chain.
        (@ $gb:ident, $l:literal $r:literal < $($id:literal)+; $($rest:tt)*) => {
            $gb.merge((mkid($l), mkid($r)), &[$(mkid($id)),+]).unwrap();
            graph!(@ $gb, $($rest)*);
        };
        // Finalize command.
        (@ $gb:ident, $prev:literal < finalize $id:literal; $($rest:tt)*) => {
            $gb.finalize(mkid($prev), mkid($id)).unwrap();
            graph!(@ $gb, $($rest)*);
        };
        // Commit the transaction.
        (@ $gb:ident, commit; $($rest:tt)*) => {
            $gb.commit().unwrap();
            graph!(@ $gb, $($rest)*);
        };
        // End of input: flush the working perspective.
        (@ $gb:ident, ) => {
            $gb.flush();
        };
    }
814
815    fn lookup(storage: &impl Storage, name: &str) -> Option<Box<[u8]>> {
816        use crate::Query as _;
817        let head = storage.get_head().unwrap();
818        let p = storage.get_fact_perspective(head).unwrap();
819        p.query(name, &Keys::default()).unwrap()
820    }
821
    /// Two branches below `a` (`b` and `c`) are merged by `ma`, a further
    /// branch `d` below `b` is merged with `ma` by `mb`, and the result is
    /// committed. Checks the resulting head location and the braided order
    /// recorded in the `seq` fact.
    #[test]
    fn test_simple() -> Result<(), StorageError> {
        let mut gb = graph! {
            ClientState::new(SeqPolicyStore, MemStorageProvider::new());
            "a";
            "a" < "b";
            "a" < "c";
            "b" "c" < "ma";
            "b" < "d";
            "ma" "d" < "mb";
            commit;
        };
        let g = gb.client.provider.get_storage(mkid("a")).unwrap();

        #[cfg(feature = "graphviz")]
        crate::storage::memory::graphviz::dot(g, "simple");

        assert_eq!(
            g.get_head().unwrap(),
            Location::new(SegmentIndex(5), MaxCut(3))
        );

        // Merge commands are not evaluated, so only a, b, c, d show up.
        let seq = lookup(g, "seq").unwrap();
        let seq = std::str::from_utf8(&seq).unwrap();
        assert_eq!(seq, "a:b:d:c");

        Ok(())
    }
850
    /// Builds a larger graph with several branches and explicit merges, with a
    /// commit in the middle and one at the end, leaving more than one
    /// transaction head for the final commit to merge. Checks the resulting
    /// head location and the braided order recorded in the `seq` fact.
    #[test]
    fn test_complex() -> Result<(), StorageError> {
        let mut gb = graph! {
            ClientState::new(SeqPolicyStore, MemStorageProvider::new());
            "a";
            "a" < "1" "2" "3";
            "3" < "4" "6" "7";
            "3" < "5" "8";
            "6" "8" < "9" "aa"; commit;
            "7" < "a1" "a2";
            "aa" "a2" < "a3";
            "a3" < "a6" "a4";
            "a3" < "a7" "a5";
            "a4" "a5" < "a8";
            "9" < "42" "43";
            "42" < "45" "46";
            "45" < "47" "48";
            commit;
        };

        let g = gb.client.provider.get_storage(mkid("a")).unwrap();

        #[cfg(feature = "graphviz")]
        crate::storage::memory::graphviz::dot(g, "complex");

        assert_eq!(
            g.get_head().unwrap(),
            Location::new(SegmentIndex(15), MaxCut(15))
        );

        // Merge commands ("9", "a3", "a8") are not evaluated and are absent.
        let seq = lookup(g, "seq").unwrap();
        let seq = std::str::from_utf8(&seq).unwrap();
        assert_eq!(
            seq,
            "a:1:2:3:5:8:4:6:42:45:47:48:46:43:aa:7:a1:a2:a7:a6:a5:a4"
        );

        Ok(())
    }
890
    /// Re-adds commands that are already known — both before and after a
    /// commit — and checks that each command appears only once in the `seq`
    /// fact and that the graph stays a single line.
    #[test]
    fn test_duplicates() {
        let mut gb = graph! {
            ClientState::new(SeqPolicyStore, MemStorageProvider::new());
            "a";
            "a" < "b" "c";
            "a" < "b";
            "b" < "c";
            "c" < "d";
            commit;
            "a" < "b";
            "b" < "c";
            "d" < "e";
            commit;
        };

        let g = gb.client.provider.get_storage(mkid("a")).unwrap();

        #[cfg(feature = "graphviz")]
        crate::storage::memory::graphviz::dot(g, "duplicates");

        assert_eq!(
            g.get_head().unwrap(),
            Location::new(SegmentIndex(2), MaxCut(4))
        );

        let seq = lookup(g, "seq").unwrap();
        let seq = std::str::from_utf8(&seq).unwrap();
        assert_eq!(seq, "a:b:c:d:e");
    }
921
    /// A second branch (`h i j`) forks from the middle (`d`) of an existing
    /// line `b..g`; the final commit has to merge the two heads. Checks the
    /// resulting head location and braided order.
    #[test]
    fn test_mid_braid_1() {
        let mut gb = graph! {
            ClientState::new(SeqPolicyStore, MemStorageProvider::new());
            "a";
            commit;
            "a" < "b" "c" "d" "e" "f" "g";
            "d" < "h" "i" "j";
            commit;
        };

        let g = gb.client.provider.get_storage(mkid("a")).unwrap();

        #[cfg(feature = "graphviz")]
        crate::storage::memory::graphviz::dot(g, "mid_braid_1");

        assert_eq!(
            g.get_head().unwrap(),
            Location::new(SegmentIndex(3), MaxCut(7))
        );

        let seq = lookup(g, "seq").unwrap();
        let seq = std::str::from_utf8(&seq).unwrap();
        assert_eq!(seq, "a:b:c:d:h:i:j:e:f:g");
    }
947
    /// Same graph shape as `test_mid_braid_1`, but with the two branches below
    /// `d` added in the opposite order. The expected braided order is the same
    /// as in that test.
    #[test]
    fn test_mid_braid_2() {
        let mut gb = graph! {
            ClientState::new(SeqPolicyStore, MemStorageProvider::new());
            "a";
            commit;
            "a" < "b" "c" "d" "h" "i" "j";
            "d" < "e" "f" "g";
            commit;
        };

        let g = gb.client.provider.get_storage(mkid("a")).unwrap();

        #[cfg(feature = "graphviz")]
        crate::storage::memory::graphviz::dot(g, "mid_braid_2");

        assert_eq!(
            g.get_head().unwrap(),
            Location::new(SegmentIndex(3), MaxCut(7))
        );

        let seq = lookup(g, "seq").unwrap();
        let seq = std::str::from_utf8(&seq).unwrap();
        assert_eq!(seq, "a:b:c:d:h:i:j:e:f:g");
    }
973
    /// Two finalize commands where the second (`fff2`) descends from the first
    /// (`fff1`). The commit is expected to succeed, and in the expected order
    /// the chain through both finalize commands comes before the commands of
    /// the other branches.
    #[test]
    fn test_sequential_finalize() {
        let mut gb = graph! {
            ClientState::new(SeqPolicyStore, MemStorageProvider::new());
            "a";
            commit;
            "a" < "b" "c" "d" "e" "f" "g";
            "d" < "h" "i" "j";
            "e" < finalize "fff1";
            "fff1" < "x" "y";
            "y" < finalize "fff2";
            commit;
        };

        let g = gb.client.provider.get_storage(mkid("a")).unwrap();

        #[cfg(feature = "graphviz")]
        crate::storage::memory::graphviz::dot(g, "finalize_success");

        assert_eq!(
            g.get_head().unwrap(),
            Location::new(SegmentIndex(5), MaxCut(9))
        );

        let seq = lookup(g, "seq").unwrap();
        let seq = std::str::from_utf8(&seq).unwrap();
        assert_eq!(seq, "a:b:c:d:e:fff1:x:y:fff2:h:i:j:f:g");
    }
1002
    /// Two finalize commands on different branches (`fff1` below `e`, `fff2`
    /// below `i`). Merging them during commit is expected to fail with
    /// `ClientError::ParallelFinalize`.
    #[test]
    fn test_parallel_finalize() {
        let mut gb = graph! {
            ClientState::new(SeqPolicyStore, MemStorageProvider::new());
            "a";
            commit;
            "a" < "b" "c" "d" "e" "f" "g";
            "d" < "h" "i" "j";
            "e" < finalize "fff1";
            "i" < finalize "fff2";
        };
        // No `commit;` above: commit explicitly so the error can be inspected.
        let err = gb.commit().expect_err("merge should fail");
        assert!(matches!(err, ClientError::ParallelFinalize), "{err:?}");
    }
1017}