flowlog_runtime/txn.rs
1//! Transaction state shared by every incremental driver.
2//!
3//! Both the binary-mode REPL (`flowlog-compiler`) and the library-mode
4//! engine (`flowlog-build`, incremental codegen) use the same
5//! epoch-broadcast protocol: a driver writes a [`TxnState`] into
6//! `Arc<RwLock<_>>`, workers rendezvous on a [`std::sync::Barrier`] to
7//! read the snapshot, apply its `pending` ops, then rendezvous again to
8//! publish outputs. The only thing that differs between modes is who
9//! plays the driver — stdin for the binary, the host thread for the
10//! library.
11
12use std::path::PathBuf;
13
/// Signed multiplicity carried by a tuple update.
///
/// A value of `+1` inserts one copy of a tuple and `-1` retracts one;
/// larger magnitudes scale the count in ring-valued semirings.
pub type Diff = i32;
17
18/// A single tuple-level update queued inside a transaction.
19#[derive(Clone, Debug, PartialEq, Eq)]
20pub enum TxnOp {
21 /// Apply `diff` copies of `tuple` (serialized form) to `rel`.
22 Put {
23 rel: String,
24 tuple: String,
25 diff: Diff,
26 },
27 /// Apply `diff` copies of every row in `path` to `rel`.
28 File {
29 rel: String,
30 path: PathBuf,
31 diff: Diff,
32 },
33}
34
/// Instruction attached to a published [`TxnState`], telling workers how
/// to react when they observe it.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum TxnAction {
    /// Nothing to do (idle / cleared). This is the default.
    #[default]
    None,
    /// Execute `pending`, then advance/flush once.
    Commit,
    /// Shut down all workers.
    Quit,
}
46
47/// Shared transaction snapshot. The driver mutates this behind an
48/// `Arc<RwLock<_>>`; workers clone the inner value each epoch.
49#[derive(Clone, Debug, Default)]
50pub struct TxnState {
51 /// Broadcast indicator: incremented on each publish so workers can
52 /// detect "new txn".
53 pub epoch: u32,
54 /// Broadcast indicator: what the workers should do for this epoch.
55 pub action: TxnAction,
56 /// Updates queued for the next commit.
57 pub pending: Vec<TxnOp>,
58}
59
60impl TxnState {
61 /// Clear the pending queue — used by drivers when starting or
62 /// aborting a transaction.
63 pub fn clear_pending(&mut self) {
64 self.pending.clear();
65 }
66
67 /// Append one op to the pending queue.
68 pub fn enqueue(&mut self, op: TxnOp) {
69 self.pending.push(op);
70 }
71
72 /// Snapshot the current state as a Commit broadcast at `next_epoch`.
73 /// Clones `pending` so the driver can keep its queue for rollback.
74 pub fn as_commit_snapshot(&self, next_epoch: u32) -> TxnState {
75 TxnState {
76 epoch: next_epoch,
77 action: TxnAction::Commit,
78 pending: self.pending.clone(),
79 }
80 }
81
82 /// Freestanding Quit snapshot — no carried pending ops.
83 pub fn as_quit_snapshot(next_epoch: u32) -> TxnState {
84 TxnState {
85 epoch: next_epoch,
86 action: TxnAction::Quit,
87 pending: Vec::new(),
88 }
89 }
90}