// flowlog_runtime/txn.rs

//! Transaction state shared by every incremental driver.
//!
//! Both the binary-mode REPL (`flowlog-compiler`) and the library-mode
//! engine (`flowlog-build`, incremental codegen) use the same
//! epoch-broadcast protocol: a driver writes a [`TxnState`] into an
//! `Arc<RwLock<_>>`, workers rendezvous on a [`std::sync::Barrier`] to
//! read the snapshot, apply its `pending` ops, then rendezvous again to
//! publish outputs. The only thing that differs between modes is who
//! plays the driver — stdin for the binary, the host thread for the
//! library.

use std::path::PathBuf;

/// Signed update multiplicity applied to a tuple.
///
/// `+1` inserts, `-1` retracts; larger magnitudes scale the count in
/// ring-valued semirings.
pub type Diff = i32;

18/// A single tuple-level update queued inside a transaction.
19#[derive(Clone, Debug, PartialEq, Eq)]
20pub enum TxnOp {
21    /// Apply `diff` copies of `tuple` (serialized form) to `rel`.
22    Put {
23        rel: String,
24        tuple: String,
25        diff: Diff,
26    },
27    /// Apply `diff` copies of every row in `path` to `rel`.
28    File {
29        rel: String,
30        path: PathBuf,
31        diff: Diff,
32    },
33}
34
/// What workers should do when they observe a new published [`TxnState`].
///
/// The derived [`Default`] is [`TxnAction::None`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum TxnAction {
    /// No action (idle / cleared).
    #[default]
    None,
    /// Execute `pending`, then advance/flush once.
    Commit,
    /// Quit all workers.
    Quit,
}

47/// Shared transaction snapshot. The driver mutates this behind an
48/// `Arc<RwLock<_>>`; workers clone the inner value each epoch.
49#[derive(Clone, Debug, Default)]
50pub struct TxnState {
51    /// Broadcast indicator: incremented on each publish so workers can
52    /// detect "new txn".
53    pub epoch: u32,
54    /// Broadcast indicator: what the workers should do for this epoch.
55    pub action: TxnAction,
56    /// Updates queued for the next commit.
57    pub pending: Vec<TxnOp>,
58}
59
60impl TxnState {
61    /// Clear the pending queue — used by drivers when starting or
62    /// aborting a transaction.
63    pub fn clear_pending(&mut self) {
64        self.pending.clear();
65    }
66
67    /// Append one op to the pending queue.
68    pub fn enqueue(&mut self, op: TxnOp) {
69        self.pending.push(op);
70    }
71
72    /// Snapshot the current state as a Commit broadcast at `next_epoch`.
73    /// Clones `pending` so the driver can keep its queue for rollback.
74    pub fn as_commit_snapshot(&self, next_epoch: u32) -> TxnState {
75        TxnState {
76            epoch: next_epoch,
77            action: TxnAction::Commit,
78            pending: self.pending.clone(),
79        }
80    }
81
82    /// Freestanding Quit snapshot — no carried pending ops.
83    pub fn as_quit_snapshot(next_epoch: u32) -> TxnState {
84        TxnState {
85            epoch: next_epoch,
86            action: TxnAction::Quit,
87            pending: Vec::new(),
88        }
89    }
90}