Skip to main content

link_cli/transactions/
mod.rs

1//! Optional transactions layer for the Rust link-cli.
2//!
3//! Mirrors the C# `TransactionsDecorator` in
4//! `csharp/Foundation.Data.Doublets.Cli.Library/TransactionsDecorator.cs`.
5//!
6//! The decorator wraps a [`NamedTypesDecorator`] and records every
7//! `create` / `update` / `delete` as a reversible [`Transition`] in a
8//! sidecar doublets log store. Supports explicit transactions, sync
9//! commits, three retention policies, and crash recovery (R1-R7, R10).
10//!
11//! Optional — when not opted in, the bare [`NamedTypesDecorator`]
12//! behaves identically (R8, R9, R17).
13
14mod types;
15
16use std::collections::HashSet;
17use std::path::{Path, PathBuf};
18use std::sync::atomic::{AtomicU64, Ordering};
19use std::time::{SystemTime, UNIX_EPOCH};
20
21use anyhow::{anyhow, bail, Context, Result};
22
23use crate::link::Link;
24use crate::named_types::{NamedTypes, NamedTypesDecorator};
25
26pub use types::{CommitMode, DoubletLink, LogRetentionPolicy, Transition, TransitionKind};
27use types::{
28    APPLIED_MARKER_PREFIX, COMMIT_MARKER_PREFIX, ROLLBACK_MARKER_PREFIX, TRANSITION_NAME_PREFIX,
29};
30
31/// Pending state of a transaction (used by the explicit transaction
32/// handle and by per-write auto-transactions).
33struct PendingTransaction {
34    id: u128,
35    transitions: Vec<Transition>,
36    auto_commit: bool,
37    started_ms: i64,
38}
39
40/// Snapshot of an open transaction (returned by [`TransactionsDecorator::begin_transaction`]).
41#[derive(Debug, Clone)]
42pub struct TransactionHandle {
43    pub id: u128,
44    pub started_ms: i64,
45}
46
47/// The transactions decorator wraps a [`NamedTypesDecorator`] and
48/// records every write as a reversible [`Transition`] in `log_store`.
49pub struct TransactionsDecorator {
50    inner: NamedTypesDecorator,
51    log_store: NamedTypesDecorator,
52    log: Vec<Transition>,
53    committed: HashSet<u128>,
54    rolled_back: HashSet<u128>,
55    applied: HashSet<i64>,
56    current: Option<PendingTransaction>,
57    sequence_counter: i64,
58    applied_sequence: i64,
59    retention_policy: LogRetentionPolicy,
60    commit_mode: CommitMode,
61    replaying: bool,
62    trace: bool,
63}
64
65impl TransactionsDecorator {
66    /// Creates a new transactions decorator wrapping `inner`, using
67    /// `log_store` as the sidecar log store.
68    pub fn new(
69        inner: NamedTypesDecorator,
70        log_store: NamedTypesDecorator,
71        retention_policy: LogRetentionPolicy,
72        commit_mode: CommitMode,
73        trace: bool,
74    ) -> Result<Self> {
75        let mut decorator = Self {
76            inner,
77            log_store,
78            log: Vec::new(),
79            committed: HashSet::new(),
80            rolled_back: HashSet::new(),
81            applied: HashSet::new(),
82            current: None,
83            sequence_counter: 0,
84            applied_sequence: 0,
85            retention_policy,
86            commit_mode,
87            replaying: false,
88            trace,
89        };
90        decorator.recover()?;
91        Ok(decorator)
92    }
93
94    /// Conventional sidecar filename for the transitions log.
95    pub fn make_transitions_database_filename<P: AsRef<Path>>(database_filename: P) -> PathBuf {
96        let path = database_filename.as_ref();
97        let stem = path
98            .file_stem()
99            .and_then(|s| s.to_str())
100            .unwrap_or_default();
101        let name = format!("{stem}.transitions.links");
102        match path.parent() {
103            Some(parent) if !parent.as_os_str().is_empty() => parent.join(name),
104            _ => PathBuf::from(name),
105        }
106    }
107
108    pub fn retention_policy(&self) -> &LogRetentionPolicy {
109        &self.retention_policy
110    }
111
112    pub fn set_retention_policy(&mut self, policy: LogRetentionPolicy) {
113        self.retention_policy = policy;
114    }
115
116    pub fn commit_mode(&self) -> CommitMode {
117        self.commit_mode
118    }
119
120    pub fn set_commit_mode(&mut self, mode: CommitMode) {
121        self.commit_mode = mode;
122    }
123
124    pub fn applied_sequence(&self) -> i64 {
125        self.applied_sequence
126    }
127
128    pub fn last_logged_sequence(&self) -> i64 {
129        self.sequence_counter
130    }
131
132    /// Returns a snapshot of the transitions log in sequence order.
133    pub fn log(&self) -> Vec<Transition> {
134        self.log.clone()
135    }
136
137    pub fn inner(&self) -> &NamedTypesDecorator {
138        &self.inner
139    }
140
141    pub fn inner_mut(&mut self) -> &mut NamedTypesDecorator {
142        &mut self.inner
143    }
144
145    pub fn log_store(&self) -> &NamedTypesDecorator {
146        &self.log_store
147    }
148
149    pub fn log_store_mut(&mut self) -> &mut NamedTypesDecorator {
150        &mut self.log_store
151    }
152
153    pub fn into_inner(self) -> (NamedTypesDecorator, NamedTypesDecorator) {
154        (self.inner, self.log_store)
155    }
156
157    pub fn save(&self) -> Result<()> {
158        self.inner.save()?;
159        self.log_store.save()?;
160        Ok(())
161    }
162
163    // ----- Write API ------------------------------------------------------
164
165    pub fn create(&mut self, source: u32, target: u32) -> Result<u32> {
166        if self.replaying {
167            return Ok(self.inner.create(source, target));
168        }
169        let owns = self.ensure_open_transaction();
170        let id = self.inner.create(source, target);
171        let after = self
172            .inner
173            .get(id)
174            .map(DoubletLink::from_link)
175            .unwrap_or_else(|| DoubletLink::new(id, source, target));
176        self.record_transition(TransitionKind::Create, DoubletLink::empty(), after)?;
177        if owns {
178            self.commit_current()?;
179        }
180        Ok(id)
181    }
182
183    pub fn update(&mut self, id: u32, source: u32, target: u32) -> Result<Link> {
184        if self.replaying {
185            return self.inner.update(id, source, target);
186        }
187        let before = self
188            .inner
189            .get(id)
190            .map(DoubletLink::from_link)
191            .unwrap_or_else(|| DoubletLink::new(id, 0, 0));
192        let owns = self.ensure_open_transaction();
193        let prev = match self.inner.update(id, source, target) {
194            Ok(prev) => prev,
195            Err(err) => {
196                if owns {
197                    self.current = None;
198                }
199                return Err(err);
200            }
201        };
202        let after = self
203            .inner
204            .get(id)
205            .map(DoubletLink::from_link)
206            .unwrap_or_else(|| DoubletLink::new(id, source, target));
207        self.record_transition(TransitionKind::Update, before, after)?;
208        if owns {
209            self.commit_current()?;
210        }
211        Ok(prev)
212    }
213
214    pub fn delete(&mut self, id: u32) -> Result<Link> {
215        if self.replaying {
216            return self.inner.delete(id);
217        }
218        let before = self
219            .inner
220            .get(id)
221            .map(DoubletLink::from_link)
222            .unwrap_or_else(|| DoubletLink::new(id, 0, 0));
223        let owns = self.ensure_open_transaction();
224        let deleted = match self.inner.delete(id) {
225            Ok(d) => d,
226            Err(err) => {
227                if owns {
228                    self.current = None;
229                }
230                return Err(err);
231            }
232        };
233        self.record_transition(TransitionKind::Delete, before, DoubletLink::empty())?;
234        if owns {
235            self.commit_current()?;
236        }
237        Ok(deleted)
238    }
239
240    /// Composite create-and-update used by callers that want a link
241    /// initialised with source/target in a single pair of transitions
242    /// (matches the C# `CreateAndUpdate` extension semantics, which
243    /// always emits a Create followed by an Update transition).
244    pub fn create_and_update(&mut self, source: u32, target: u32) -> Result<u32> {
245        let owns = self.ensure_open_transaction();
246        let id = self.create(0, 0)?;
247        self.update(id, source, target)?;
248        if owns {
249            self.commit_current()?;
250        }
251        Ok(id)
252    }
253
254    pub fn exists(&self, id: u32) -> bool {
255        self.inner.exists(id)
256    }
257
258    pub fn get(&self, id: u32) -> Option<&Link> {
259        self.inner.get(id)
260    }
261
262    pub fn all(&self) -> Vec<&Link> {
263        self.inner.all()
264    }
265
266    pub fn query(
267        &self,
268        index: Option<u32>,
269        source: Option<u32>,
270        target: Option<u32>,
271    ) -> Vec<&Link> {
272        self.inner.query(index, source, target)
273    }
274
275    pub fn search(&self, source: u32, target: u32) -> Option<u32> {
276        self.inner.search(source, target)
277    }
278
279    pub fn get_or_create(&mut self, source: u32, target: u32) -> Result<u32> {
280        if let Some(existing) = self.inner.search(source, target) {
281            return Ok(existing);
282        }
283        self.create(source, target)
284    }
285
286    pub fn ensure_created(&mut self, id: u32) -> u32 {
287        // ensure_created is used by recovery/replay only and is not
288        // itself a logical write; bypass transition recording.
289        self.inner.ensure_created(id)
290    }
291
292    fn ensure_open_transaction(&mut self) -> bool {
293        if self.current.is_none() {
294            self.current = Some(PendingTransaction {
295                id: new_transaction_id(),
296                transitions: Vec::new(),
297                auto_commit: true,
298                started_ms: now_unix_ms(),
299            });
300            true
301        } else {
302            false
303        }
304    }
305
306    fn record_transition(
307        &mut self,
308        kind: TransitionKind,
309        before: DoubletLink,
310        after: DoubletLink,
311    ) -> Result<()> {
312        self.sequence_counter += 1;
313        let sequence = self.sequence_counter;
314        let timestamp_ms = now_unix_ms();
315        let transaction_id = self.current.as_ref().map(|tx| tx.id).ok_or_else(|| {
316            anyhow!("internal: missing open transaction while recording transition")
317        })?;
318        let transition = Transition {
319            transaction_id,
320            sequence,
321            timestamp_ms,
322            kind,
323            before,
324            after,
325        };
326        if let Some(current) = self.current.as_mut() {
327            current.transitions.push(transition);
328        }
329        self.log.push(transition);
330        self.write_transition_to_log(&transition)?;
331        if self.trace {
332            eprintln!(
333                "[Transactions] Recorded {:?} seq={} tx={:032x}: ({},{},{}) -> ({},{},{}).",
334                kind,
335                sequence,
336                transaction_id,
337                before.index,
338                before.source,
339                before.target,
340                after.index,
341                after.source,
342                after.target,
343            );
344        }
345        Ok(())
346    }
347
348    fn write_transition_to_log(&mut self, transition: &Transition) -> Result<()> {
349        // Always allocate a fresh link so each transition has its own
350        // log entry (mirrors C# `CreateAndUpdate(Null, Null)`).
351        let link = self.log_store.create(0, 0);
352        let name = format!("{TRANSITION_NAME_PREFIX}{}", transition.serialize());
353        self.log_store.set_name(link, &name)?;
354        Ok(())
355    }
356
357    fn write_marker(&mut self, name: &str) -> Result<()> {
358        // Always allocate a fresh link so markers do not overwrite one
359        // another (mirrors C# `CreateAndUpdate(Null, Null)`).
360        let link = self.log_store.create(0, 0);
361        self.log_store.set_name(link, name)?;
362        Ok(())
363    }
364
365    // ----- Transaction handle --------------------------------------------
366
367    pub fn begin_transaction(&mut self) -> Result<TransactionHandle> {
368        if self.current.is_some() {
369            bail!("Nested transactions are not supported.");
370        }
371        let id = new_transaction_id();
372        let started_ms = now_unix_ms();
373        self.current = Some(PendingTransaction {
374            id,
375            transitions: Vec::new(),
376            auto_commit: false,
377            started_ms,
378        });
379        Ok(TransactionHandle { id, started_ms })
380    }
381
382    pub fn commit(&mut self) -> Result<()> {
383        if self.current.is_none() {
384            return Ok(());
385        }
386        self.commit_current()
387    }
388
389    fn commit_current(&mut self) -> Result<()> {
390        let pending = match self.current.take() {
391            Some(p) => p,
392            None => return Ok(()),
393        };
394        self.committed.insert(pending.id);
395        self.write_marker(&format!("{COMMIT_MARKER_PREFIX}{:032x}", pending.id))?;
396        if self.trace {
397            eprintln!(
398                "[Transactions] Committed tx {:032x} (mode={:?}, transitions={}).",
399                pending.id,
400                self.commit_mode,
401                pending.transitions.len()
402            );
403        }
404        for transition in &pending.transitions {
405            self.mark_applied(transition)?;
406        }
407        let _ = pending.auto_commit;
408        let _ = pending.started_ms;
409        self.enforce_retention()?;
410        Ok(())
411    }
412
413    pub fn rollback(&mut self) -> Result<()> {
414        let pending = match self.current.take() {
415            Some(p) => p,
416            None => return Ok(()),
417        };
418        self.rolled_back.insert(pending.id);
419        self.replaying = true;
420        for transition in pending.transitions.iter().rev() {
421            self.try_revert_transition(transition);
422        }
423        self.replaying = false;
424        self.write_marker(&format!("{ROLLBACK_MARKER_PREFIX}{:032x}", pending.id))?;
425        if self.trace {
426            eprintln!(
427                "[Transactions] Rolled back tx {:032x} ({} transitions).",
428                pending.id,
429                pending.transitions.len(),
430            );
431        }
432        self.enforce_retention()?;
433        Ok(())
434    }
435
436    /// Public helper for higher-level decorators (e.g. version control)
437    /// — applies a single transition without writing a new log entry.
438    pub fn apply_transition(&mut self, transition: &Transition) {
439        self.replaying = true;
440        self.try_apply_transition(transition, false);
441        self.replaying = false;
442    }
443
444    /// Public helper for higher-level decorators (e.g. version control)
445    /// — reverts a single transition without writing a new log entry.
446    pub fn revert_transition(&mut self, transition: &Transition) {
447        self.replaying = true;
448        self.try_revert_transition(transition);
449        self.replaying = false;
450    }
451
452    fn try_apply_transition(&mut self, transition: &Transition, record_applied: bool) {
453        let result: Result<()> = match transition.kind {
454            TransitionKind::Create => {
455                if transition.after.index != 0 && !self.inner.exists(transition.after.index) {
456                    self.inner.ensure_created(transition.after.index);
457                    self.inner
458                        .update(
459                            transition.after.index,
460                            transition.after.source,
461                            transition.after.target,
462                        )
463                        .map(|_| ())
464                } else {
465                    Ok(())
466                }
467            }
468            TransitionKind::Update => {
469                if transition.after.index != 0 && self.inner.exists(transition.after.index) {
470                    self.inner
471                        .update(
472                            transition.after.index,
473                            transition.after.source,
474                            transition.after.target,
475                        )
476                        .map(|_| ())
477                } else {
478                    Ok(())
479                }
480            }
481            TransitionKind::Delete => {
482                if transition.before.index != 0 && self.inner.exists(transition.before.index) {
483                    self.inner.delete(transition.before.index).map(|_| ())
484                } else {
485                    Ok(())
486                }
487            }
488        };
489        if let Err(e) = result {
490            if self.trace {
491                eprintln!(
492                    "[Transactions] Failed to apply transition seq={}: {e}",
493                    transition.sequence
494                );
495            }
496        }
497        if record_applied {
498            let _ = self.mark_applied(transition);
499        }
500    }
501
502    fn try_revert_transition(&mut self, transition: &Transition) {
503        let result = match transition.kind {
504            TransitionKind::Create => {
505                if transition.after.index != 0 && self.inner.exists(transition.after.index) {
506                    self.inner.delete(transition.after.index).map(|_| ())
507                } else {
508                    Ok(())
509                }
510            }
511            TransitionKind::Update => {
512                if transition.before.index != 0 && self.inner.exists(transition.before.index) {
513                    self.inner
514                        .update(
515                            transition.before.index,
516                            transition.before.source,
517                            transition.before.target,
518                        )
519                        .map(|_| ())
520                } else {
521                    Ok(())
522                }
523            }
524            TransitionKind::Delete => {
525                if transition.before.index != 0 && !self.inner.exists(transition.before.index) {
526                    self.inner.ensure_created(transition.before.index);
527                    self.inner
528                        .update(
529                            transition.before.index,
530                            transition.before.source,
531                            transition.before.target,
532                        )
533                        .map(|_| ())
534                } else {
535                    Ok(())
536                }
537            }
538        };
539        if let Err(e) = result {
540            if self.trace {
541                eprintln!(
542                    "[Transactions] Failed to revert transition seq={}: {e}",
543                    transition.sequence
544                );
545            }
546        }
547    }
548
549    fn mark_applied(&mut self, transition: &Transition) -> Result<()> {
550        if self.applied.insert(transition.sequence) {
551            self.write_marker(&format!("{APPLIED_MARKER_PREFIX}{}", transition.sequence))?;
552            if transition.sequence > self.applied_sequence {
553                self.applied_sequence = transition.sequence;
554            }
555        }
556        Ok(())
557    }
558
559    // ----- Recovery -------------------------------------------------------
560
561    /// Rebuilds the in-memory log and marker tables from the sidecar
562    /// log store and re-applies committed-but-unapplied side-effects.
563    pub fn recover(&mut self) -> Result<()> {
564        self.log.clear();
565        self.committed.clear();
566        self.rolled_back.clear();
567        self.applied.clear();
568        self.sequence_counter = 0;
569        self.applied_sequence = 0;
570
571        // Read every named link from the log store.
572        let all_links: Vec<Link> = self.log_store.all().into_iter().copied().collect();
573        for link in &all_links {
574            let name = match self.log_store.get_name(link.index)? {
575                Some(value) => value,
576                None => continue,
577            };
578            if let Some(payload) = name.strip_prefix(TRANSITION_NAME_PREFIX) {
579                if let Some(transition) = Transition::try_parse(payload) {
580                    insert_ordered(&mut self.log, transition);
581                    if transition.sequence > self.sequence_counter {
582                        self.sequence_counter = transition.sequence;
583                    }
584                }
585            } else if let Some(rest) = name.strip_prefix(COMMIT_MARKER_PREFIX) {
586                if let Ok(tx_id) = u128::from_str_radix(rest, 16) {
587                    self.committed.insert(tx_id);
588                }
589            } else if let Some(rest) = name.strip_prefix(ROLLBACK_MARKER_PREFIX) {
590                if let Ok(tx_id) = u128::from_str_radix(rest, 16) {
591                    self.rolled_back.insert(tx_id);
592                }
593            } else if let Some(rest) = name.strip_prefix(APPLIED_MARKER_PREFIX) {
594                if let Ok(seq) = rest.parse::<i64>() {
595                    self.applied.insert(seq);
596                    if seq > self.applied_sequence {
597                        self.applied_sequence = seq;
598                    }
599                }
600            }
601        }
602
603        // Re-apply committed-but-not-applied transitions (crash mid-async).
604        let log_snapshot: Vec<Transition> = self.log.clone();
605        self.replaying = true;
606        for transition in &log_snapshot {
607            if !self.committed.contains(&transition.transaction_id) {
608                continue;
609            }
610            if self.applied.contains(&transition.sequence) {
611                continue;
612            }
613            self.try_apply_transition(transition, true);
614        }
615        // Auto-rollback transitions written but never committed and never rolled back (R10).
616        let mut pending_tx_ids: Vec<u128> = Vec::new();
617        for transition in log_snapshot.iter().rev() {
618            if self.committed.contains(&transition.transaction_id) {
619                continue;
620            }
621            if self.rolled_back.contains(&transition.transaction_id) {
622                continue;
623            }
624            self.try_revert_transition(transition);
625            if !pending_tx_ids.contains(&transition.transaction_id) {
626                pending_tx_ids.push(transition.transaction_id);
627            }
628        }
629        self.replaying = false;
630        for tx_id in pending_tx_ids {
631            self.rolled_back.insert(tx_id);
632            self.write_marker(&format!("{ROLLBACK_MARKER_PREFIX}{tx_id:032x}"))?;
633        }
634        Ok(())
635    }
636
637    fn enforce_retention(&mut self) -> Result<()> {
638        match self.retention_policy.clone() {
639            LogRetentionPolicy::Infinite => Ok(()),
640            LogRetentionPolicy::Sized { max_transitions } => self.enforce_sized(max_transitions),
641            LogRetentionPolicy::Chunked {
642                chunk_size,
643                archive_directory,
644            } => self.enforce_chunked(chunk_size, &archive_directory),
645        }
646    }
647
648    fn enforce_sized(&mut self, max_transitions: u64) -> Result<()> {
649        if max_transitions == 0 {
650            return Ok(());
651        }
652        while self.log.len() as u64 > max_transitions {
653            let head = self.log[0];
654            if !self.applied.contains(&head.sequence) {
655                self.replaying = true;
656                self.try_apply_transition(&head, true);
657                self.replaying = false;
658                if !self.applied.contains(&head.sequence) {
659                    break; // R7: never drop an un-applied transition.
660                }
661            }
662            self.log.remove(0);
663            if self.trace {
664                eprintln!(
665                    "[Transactions] Dropped applied transition seq={} per sized retention.",
666                    head.sequence
667                );
668            }
669        }
670        Ok(())
671    }
672
673    fn enforce_chunked(&mut self, chunk_size: u64, archive_directory: &Path) -> Result<()> {
674        if chunk_size == 0 {
675            return Ok(());
676        }
677        if (self.log.len() as u64) < chunk_size {
678            return Ok(());
679        }
680        let chunk: Vec<Transition> = self.log.iter().take(chunk_size as usize).copied().collect();
681        for transition in &chunk {
682            if !self.applied.contains(&transition.sequence) {
683                self.replaying = true;
684                self.try_apply_transition(transition, true);
685                self.replaying = false;
686                if !self.applied.contains(&transition.sequence) {
687                    return Ok(()); // never drop un-applied
688                }
689            }
690        }
691        std::fs::create_dir_all(archive_directory).with_context(|| {
692            format!(
693                "failed to create archive dir {}",
694                archive_directory.display()
695            )
696        })?;
697        let timestamp = now_unix_ms();
698        let file_name = format!(
699            "transitions-chunk-{timestamp}-{:032x}.log",
700            new_transaction_id()
701        );
702        let path = archive_directory.join(file_name);
703        use std::io::Write;
704        let mut file = std::fs::File::create(&path)
705            .with_context(|| format!("failed to create archive file {}", path.display()))?;
706        for transition in &chunk {
707            writeln!(file, "{}", transition.serialize())?;
708        }
709        file.flush()?;
710        if self.trace {
711            eprintln!(
712                "[Transactions] Archived {} transitions to {}.",
713                chunk.len(),
714                path.display()
715            );
716        }
717        self.log.drain(0..chunk.len());
718        Ok(())
719    }
720}
721
722// ----- Helpers ----------------------------------------------------------
723
724fn insert_ordered(list: &mut Vec<Transition>, transition: Transition) {
725    let mut lo = 0usize;
726    let mut hi = list.len();
727    while lo < hi {
728        let mid = (lo + hi) / 2;
729        if list[mid].sequence < transition.sequence {
730            lo = mid + 1;
731        } else {
732            hi = mid;
733        }
734    }
735    list.insert(lo, transition);
736}
737
738static TX_COUNTER: AtomicU64 = AtomicU64::new(0);
739
740fn new_transaction_id() -> u128 {
741    // Combine a per-process counter with the current timestamp to
742    // approximate a Guid without pulling in the `uuid` crate.
743    let count = TX_COUNTER.fetch_add(1, Ordering::Relaxed) as u128;
744    let now = now_unix_ms() as u128;
745    (now << 64) | count
746}
747
748fn now_unix_ms() -> i64 {
749    SystemTime::now()
750        .duration_since(UNIX_EPOCH)
751        .map(|d| d.as_millis() as i64)
752        .unwrap_or(0)
753}
754
755#[cfg(test)]
756mod tests {
757    use super::*;
758
759    #[test]
760    fn retention_policy_parses_specs() {
761        assert!(matches!(
762            LogRetentionPolicy::parse("infinite").unwrap(),
763            LogRetentionPolicy::Infinite
764        ));
765        assert!(matches!(
766            LogRetentionPolicy::parse("sized:1000").unwrap(),
767            LogRetentionPolicy::Sized {
768                max_transitions: 1000
769            }
770        ));
771        match LogRetentionPolicy::parse("chunked:500:/tmp/x").unwrap() {
772            LogRetentionPolicy::Chunked {
773                chunk_size,
774                archive_directory,
775            } => {
776                assert_eq!(chunk_size, 500);
777                assert_eq!(archive_directory, PathBuf::from("/tmp/x"));
778            }
779            _ => panic!("expected Chunked"),
780        }
781        assert!(LogRetentionPolicy::parse("garbage").is_err());
782    }
783
784    #[test]
785    fn transition_round_trips_through_serialize() {
786        let t = Transition {
787            transaction_id: 0xabcdef1234567890u128,
788            sequence: 42,
789            timestamp_ms: 1234567890,
790            kind: TransitionKind::Update,
791            before: DoubletLink::new(1, 2, 3),
792            after: DoubletLink::new(1, 4, 5),
793        };
794        let parsed = Transition::try_parse(&t.serialize()).unwrap();
795        assert_eq!(t, parsed);
796    }
797}