Skip to main content

link_cli/transactions/
types.rs

1//! Value types and serialization helpers for the transactions layer.
2
3use std::path::PathBuf;
4
5use anyhow::{anyhow, bail, Result};
6
7use crate::link::Link;
8
/// Identifies which write operation a [`Transition`] recorded.
///
/// Every variant has a one-byte numeric code, spelled out here as an
/// explicit discriminant and exposed through [`TransitionKind::as_u8`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TransitionKind {
    /// Encoded as `0`.
    Create = 0,
    /// Encoded as `1`.
    Update = 1,
    /// Encoded as `2`.
    Delete = 2,
}

impl TransitionKind {
    /// Returns the numeric code of this kind (`0`, `1` or `2`).
    pub(crate) fn as_u8(self) -> u8 {
        // The discriminants are declared explicitly, so the cast is exact.
        self as u8
    }

    /// Maps a numeric code back to its kind; any other code yields `None`.
    pub(crate) fn from_u8(value: u8) -> Option<Self> {
        [Self::Create, Self::Update, Self::Delete]
            .iter()
            .copied()
            .find(|kind| kind.as_u8() == value)
    }
}
35
/// Selects how `commit` treats data-store side-effects.
///
/// In the Rust port both modes execute synchronously on the calling
/// thread, for predictability. The two variants are kept for parity with
/// the C# implementation and to leave room for future expansion.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CommitMode {
    /// Flushes data-store side-effects before `commit` returns. This is
    /// the default mode.
    #[default]
    Sync,
    /// Durably persists the transitions, then applies the data-store
    /// side-effects on a background-friendly path. For in-process inner
    /// stores the side-effects have commonly been applied already.
    Async,
}
51
52/// Retention policy for the transitions log.
53#[derive(Debug, Clone, PartialEq, Eq, Default)]
54pub enum LogRetentionPolicy {
55    /// Keep every transition forever (default).
56    #[default]
57    Infinite,
58    /// Drop the oldest applied transitions once the live log exceeds
59    /// `max_transitions`. Never drops un-applied transitions (R7).
60    Sized { max_transitions: u64 },
61    /// Archive the oldest `chunk_size` applied transitions to a
62    /// rolling file in `archive_directory` once the live log reaches
63    /// `chunk_size`.
64    Chunked {
65        chunk_size: u64,
66        archive_directory: PathBuf,
67    },
68}
69
70impl LogRetentionPolicy {
71    /// Parses a CLI spec: `infinite`, `sized:<n>`, `chunked:<n>:<dir>`.
72    pub fn parse(spec: &str) -> Result<Self> {
73        let trimmed = spec.trim();
74        if trimmed.is_empty() || trimmed.eq_ignore_ascii_case("infinite") {
75            return Ok(Self::Infinite);
76        }
77
78        let lowered = trimmed.to_ascii_lowercase();
79        if lowered.starts_with("sized:") {
80            let rest = &trimmed["sized:".len()..];
81            let max: u64 = rest
82                .parse()
83                .map_err(|_| anyhow!("invalid sized retention spec '{spec}'"))?;
84            return Ok(Self::Sized {
85                max_transitions: max,
86            });
87        }
88        if lowered.starts_with("chunked:") {
89            let rest = &trimmed["chunked:".len()..];
90            let (size_text, dir) = rest
91                .split_once(':')
92                .ok_or_else(|| anyhow!("invalid chunked retention spec '{spec}'"))?;
93            let chunk_size: u64 = size_text
94                .parse()
95                .map_err(|_| anyhow!("invalid chunked size in '{spec}'"))?;
96            if chunk_size == 0 {
97                bail!("invalid chunked size in '{spec}'");
98            }
99            if dir.is_empty() {
100                bail!("invalid chunked retention spec '{spec}'");
101            }
102            return Ok(Self::Chunked {
103                chunk_size,
104                archive_directory: PathBuf::from(dir),
105            });
106        }
107        bail!("unknown retention spec '{spec}'");
108    }
109}
110
111/// A single doublet link state captured by a transition (mirror of the
112/// C# `Platform.Data.Doublets.Link<uint>`).
113#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Hash)]
114pub struct DoubletLink {
115    pub index: u32,
116    pub source: u32,
117    pub target: u32,
118}
119
120impl DoubletLink {
121    pub const fn new(index: u32, source: u32, target: u32) -> Self {
122        Self {
123            index,
124            source,
125            target,
126        }
127    }
128
129    pub const fn empty() -> Self {
130        Self::new(0, 0, 0)
131    }
132
133    pub fn from_link(link: &Link) -> Self {
134        Self::new(link.index, link.source, link.target)
135    }
136}
137
138/// Reversible write captured by the transactions layer. Holds both
139/// `before` and `after` link states so the operation can be undone or
140/// replayed.
141#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
142pub struct Transition {
143    pub transaction_id: u128,
144    pub sequence: i64,
145    pub timestamp_ms: i64,
146    pub kind: TransitionKind,
147    pub before: DoubletLink,
148    pub after: DoubletLink,
149}
150
151impl Transition {
152    pub(crate) const SCHEMA_VERSION: &'static str = "v1";
153
154    /// Encodes the transition as a single line stored as the *name*
155    /// of one link in the log doublets store.
156    pub fn serialize(&self) -> String {
157        format!(
158            "{schema}|{tx:032x}|{seq}|{ms}|{kind}|{bi},{bs},{bt}|{ai},{as_},{at}",
159            schema = Self::SCHEMA_VERSION,
160            tx = self.transaction_id,
161            seq = self.sequence,
162            ms = self.timestamp_ms,
163            kind = self.kind.as_u8(),
164            bi = self.before.index,
165            bs = self.before.source,
166            bt = self.before.target,
167            ai = self.after.index,
168            as_ = self.after.source,
169            at = self.after.target,
170        )
171    }
172
173    /// Parses a serialized transition.
174    pub fn try_parse(text: &str) -> Option<Self> {
175        if text.is_empty() {
176            return None;
177        }
178        let parts: Vec<&str> = text.split('|').collect();
179        if parts.len() < 7 {
180            return None;
181        }
182        if parts[0] != Self::SCHEMA_VERSION {
183            return None;
184        }
185        let tx = u128::from_str_radix(parts[1], 16).ok()?;
186        let seq: i64 = parts[2].parse().ok()?;
187        let ms: i64 = parts[3].parse().ok()?;
188        let kind_value: u8 = parts[4].parse().ok()?;
189        let kind = TransitionKind::from_u8(kind_value)?;
190        let before = parse_doublet(parts[5])?;
191        let after = parse_doublet(parts[6])?;
192        Some(Self {
193            transaction_id: tx,
194            sequence: seq,
195            timestamp_ms: ms,
196            kind,
197            before,
198            after,
199        })
200    }
201}
202
203fn parse_doublet(text: &str) -> Option<DoubletLink> {
204    let parts: Vec<&str> = text.split(',').collect();
205    if parts.len() != 3 {
206        return None;
207    }
208    Some(DoubletLink {
209        index: parts[0].parse().ok()?,
210        source: parts[1].parse().ok()?,
211        target: parts[2].parse().ok()?,
212    })
213}
214
// Sidecar-store name prefixes used by the recovery protocol. Every prefix
// sits in the `__transactions:` namespace and ends with `:`; what callers
// append after a prefix is not visible in this file.

/// Name prefix for commit markers.
pub(crate) const COMMIT_MARKER_PREFIX: &str = "__transactions:commit:";

/// Name prefix for rollback markers.
pub(crate) const ROLLBACK_MARKER_PREFIX: &str = "__transactions:rollback:";

/// Name prefix for applied markers.
pub(crate) const APPLIED_MARKER_PREFIX: &str = "__transactions:applied:";

/// Name prefix for stored transitions.
pub(crate) const TRANSITION_NAME_PREFIX: &str = "__transactions:transition:";
219pub(crate) const TRANSITION_NAME_PREFIX: &str = "__transactions:transition:";