selene_graph/committer_batch.rs
1//! Group-commit batch body for the single per-graph committer thread
2//! (v1.2 multi-writer, BRIEF 2).
3//!
4//! This module holds everything the committer's batched driver
5//! ([`crate::committer::run_committer`]) needs **per contiguous run** of
6//! commits, so `committer.rs` stays a thin driver under the 700-LOC file cap:
7//!
8//! - [`CommitBatching`] — the embedder-facing group-commit policy.
9//! - [`drain_contiguous_batch`] — Phase 1: form the contiguous-`seal_seq` run
10//! starting at `next_publish_seq` by appending (fsync deferred) each
11//! [`crate::write_txn::SealedCommit`], capped by count + aggregate bytes.
12//! - [`flush_and_publish_batch`] — Phase 2 (the R1 fsync-before-publish
13//! barrier: ONE [`crate::write_txn::flush_durables`] for the whole run) +
14//! Phase 3/4 (publish + ack each member in `seal_seq` order).
//! - [`ack_appended_with_error`] — fail every already-appended-but-unpublished
//!   member of a run with an explicit error, so no reply
//!   [`std::sync::mpsc::SyncSender`] is dropped silently (its waiter's `recv()`
//!   would then see only a bare `RecvError` disconnect instead of the cause).
18//!
19//! # Why a single flush per run (the R1 barrier)
20//!
21//! For every commit `C` in a run `B`: append (bytes written, fsync deferred) →
22//! ONE [`flush_durables`](crate::write_txn::flush_durables) fsyncs **all** of
23//! `B` → [`publish_appended`](crate::write_txn::publish_appended) makes `C`
24//! visible → ack delivers `durable_at`. The flush strictly precedes both store
25//! and ack, so neither the published snapshot nor `durable_at` is observable
26//! before fsync (durable-before-visible). An appended-but-unflushed commit is
27//! never published alone, so it stays invisible **and** unacked, and on a crash
28//! its torn tail is truncated on reopen — exactly what durability permits.
29//!
30//! # OFF == BRIEF 1
31//!
32//! With [`CommitBatching::Off`] the run length is capped at 1
33//! ([`BatchLimits::resolve`]), so a run is always one append + one flush + one
34//! publish + one ack — the same syscall sequence, in the same order, as BRIEF
35//! 1's `EveryN(1)` per-commit fsync. OFF is the degenerate `N=1` case of the
36//! *identical* batched code, never a second path.
37
38use std::collections::BTreeMap;
39use std::sync::Arc;
40use std::sync::atomic::Ordering;
41use std::sync::mpsc::{Receiver, TryRecvError};
42
43use crate::committer::{
44 CommitterHandles, Work, committer_dead, drain_buffer_with_error, run_protected,
45 unwrap_protected,
46};
47use crate::error::GraphResult;
48use crate::write_txn::{
49 AppendedCommit, CommitOutcome, SealedCommit, append_sealed, flush_durables, publish_appended,
50};
51
/// Group-commit batching policy for the single per-graph committer thread.
/// Default [`Off`](CommitBatching::Off).
///
/// Composes with [`crate::SharedGraphBuilder::with_wal`]: the committer-managed
/// WAL is always driven in [`SyncPolicy::OnFlushOnly`](crate::SyncPolicy::OnFlushOnly)
/// regardless of the [`WalConfig`](crate::WalConfig) passed (the committer owns
/// fsync), so the chosen `CommitBatching` policy — not the `WalConfig` —
/// determines how many commits coalesce into one fsync.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum CommitBatching {
    /// One append + one fsync per commit. Behaviorally identical to BRIEF 1
    /// (durable-before-visible, same crash semantics, one fsync per commit). The
    /// default.
    #[default]
    Off,
    /// Coalesce up to `max_commits` contiguous commits (capped by the aggregate
    /// *estimated* payload size `max_bytes`) into one group fsync. A single
    /// commit whose estimate exceeds `max_bytes` is still taken alone (the
    /// `>= 1` progress rule), so the byte cap only bounds *accumulation* and
    /// never rejects a commit.
    On {
        /// Maximum number of contiguous commits coalesced into one fsync.
        max_commits: std::num::NonZeroUsize,
        /// Aggregate (estimated) payload-byte cap for one batch — a DoS bound on
        /// how much un-fsynced work the committer accumulates before a barrier.
        max_bytes: u64,
    },
}

impl CommitBatching {
    /// Conventional ON policy: coalesce up to 64 commits / 8 MiB aggregate.
    pub const DEFAULT_ON: Self = Self::On {
        // `NonZeroUsize::new(64)` is always `Some` because 64 != 0, so the `None`
        // arm is dead. A `match` is used instead of `Option::unwrap`, which only
        // became usable in `const` contexts in Rust 1.83, so this const also
        // evaluates on older toolchains. Reaching `unreachable!()` would be a
        // const-evaluation error, never a runtime panic.
        max_commits: match std::num::NonZeroUsize::new(64) {
            Some(value) => value,
            None => unreachable!(),
        },
        max_bytes: 8 * 1024 * 1024,
    };
}
92
93/// Resolved per-run caps derived from a [`CommitBatching`] policy.
94///
95/// `Off` resolves to `(1, u64::MAX)` so the batched driver runs exactly one
96/// commit per run (OFF == BRIEF 1); `On { max_commits, max_bytes }` resolves to
97/// `(max_commits, max_bytes)`.
98#[derive(Clone, Copy, Debug)]
99pub(crate) struct BatchLimits {
100 /// Maximum commits in one run (clamped to 1 when `Off`).
101 pub(crate) max_commits: usize,
102 /// Aggregate estimated payload-byte cap for one run.
103 pub(crate) max_bytes: u64,
104}
105
106impl BatchLimits {
107 /// Resolve a [`CommitBatching`] policy into concrete per-run caps.
108 pub(crate) fn resolve(batching: CommitBatching) -> Self {
109 match batching {
110 CommitBatching::Off => Self {
111 max_commits: 1,
112 max_bytes: u64::MAX,
113 },
114 CommitBatching::On {
115 max_commits,
116 max_bytes,
117 } => Self {
118 max_commits: max_commits.get(),
119 max_bytes,
120 },
121 }
122 }
123}
124
125/// Documented structural estimate of a sealed commit's encoded WAL-payload size,
126/// used only for the F4 aggregate-byte cap.
127///
128/// The exact encoded length is computed by `encode_changes` deep inside
129/// `WalWriter::append` and is **not** threaded back out (that would require
130/// changing the [`DurableProvider`](crate::DurableProvider) trait signature for
131/// a non-load-bearing accounting number). Instead this approximates each
132/// [`Change`] at a flat [`PER_CHANGE_ESTIMATE_BYTES`] plus a small per-commit
133/// header allowance. The estimate is intentionally conservative-high so the
134/// `max_bytes` cap triggers a barrier *no later* than the real bytes would; the
135/// per-entry hard cap (`entry_header::ensure_payload_len`) still bounds any
136/// single commit's real size, so this estimate only governs how many commits
137/// accumulate before a flush — never correctness.
138pub(crate) fn encoded_estimate(sealed: &SealedCommit) -> u64 {
139 const PER_COMMIT_HEADER_BYTES: u64 = 64;
140 let changes = sealed.changes.len() as u64;
141 PER_COMMIT_HEADER_BYTES.saturating_add(changes.saturating_mul(PER_CHANGE_ESTIMATE_BYTES))
142}
143
144/// Flat per-[`Change`] byte allowance for [`encoded_estimate`]. Sized generously
145/// above the typical encoded change so the F4 cap errs toward flushing sooner.
146const PER_CHANGE_ESTIMATE_BYTES: u64 = 256;
147
148/// Outcome of [`drain_contiguous_batch`]: the appended (fsync-deferred) run plus
149/// whether the driver must stop after handling it.
150pub(crate) enum BatchDrain {
151 /// A (possibly empty) contiguous run of appended commits to flush + publish.
152 /// Empty only when the head at `next_publish_seq` was a gap (no work to do
153 /// this iteration) — the caller simply loops back to `recv()`.
154 Run {
155 /// The appended-but-unflushed run, in ascending `seal_seq` order.
156 batch: Vec<AppendedCommit>,
157 },
158 /// An [`append_sealed`] failed partway through the run. The committer is
159 /// already poisoned and every member listed here (including any successfully
160 /// appended earlier in this run) must be acked with `committer_dead`, the
161 /// reorder buffer drained, and the loop exited.
162 AppendFailed {
163 /// Members already appended before the failure — must be error-acked.
164 appended: Vec<AppendedCommit>,
165 },
166}
167
/// Phase 1 — form a contiguous-`seal_seq` run starting at `next_publish_seq` by
/// [`append_sealed`]ing each [`Work::Commit`] (fsync deferred).
///
/// The run is bounded by [`BatchLimits`] (a commit-count cap and an aggregate
/// [`encoded_estimate`] byte cap, both enforced) with a `>= 1` progress rule: a
/// single commit over `max_bytes` is still taken alone. The run ends at:
/// - a **gap** (the next `seal_seq` is absent after a non-blocking `try_recv`
///   refill) — the caller blocks on `recv()` for it;
/// - **snapshot-maintenance work** at head — a hard flush boundary (F2): the
///   pending commit run is flushed/published first, then the maintenance item
///   publishes solo;
/// - a **cap** — the run already holds `max_commits` members, or adding the head
///   commit's estimate would exceed `max_bytes`. That head commit is left in
///   `reorder` (not appended) and starts the next run;
/// - an **append failure** — reported as [`BatchDrain::AppendFailed`].
///
/// `next_publish_seq` is advanced past every commit appended here. On an
/// [`append_sealed`] error, the already-appended members are returned in
/// [`BatchDrain::AppendFailed`] for error-acking. The failed commit itself has
/// already been error-acked, and `next_publish_seq` has been advanced past it.
///
/// Parameters:
/// - `receiver`: the committer's work channel; polled here only through the
///   non-blocking refill (`try_recv`).
/// - `reorder`: work buffered by `seal_seq`; each commit is removed from it as
///   it is taken into the run.
/// - `next_publish_seq`: the publish cursor; incremented once per commit taken.
/// - `limits`: per-run caps (see [`BatchLimits::resolve`]).
/// - `handles`: supplies the `durable_providers` passed to [`append_sealed`].
/// - `poisoned`: the committer's poison flag, handed to `unwrap_protected` when
///   an append fails.
pub(crate) fn drain_contiguous_batch(
    receiver: &Receiver<Work>,
    reorder: &mut BTreeMap<u64, Work>,
    next_publish_seq: &mut u64,
    limits: BatchLimits,
    handles: &CommitterHandles,
    poisoned: &Arc<std::sync::atomic::AtomicBool>,
) -> BatchDrain {
    let mut batch: Vec<AppendedCommit> = Vec::new();
    // Running sum of `encoded_estimate` over the members already in `batch`.
    let mut batch_bytes: u64 = 0;
    loop {
        // Opportunistically refill the contiguous slot from the channel without
        // blocking, so a run can extend past what was already buffered. We only
        // drain until the next contiguous seq is present (or the channel is
        // momentarily empty); anything pulled is buffered by seal_seq, never
        // published out of order.
        if !reorder.contains_key(next_publish_seq) {
            refill_until_contiguous(receiver, reorder, *next_publish_seq);
        }
        match reorder.get(next_publish_seq) {
            // Gap: the next seal_seq is in flight on a session that already
            // dropped the write lock and WILL send it. End the run; the caller
            // blocks on recv() for it (no deadlock — see the module + committer
            // docs).
            None => return BatchDrain::Run { batch },
            // Snapshot maintenance at head is a hard flush boundary (F2):
            // never co-batched. Flush/publish the pending commit run first; the
            // caller re-iterates with the maintenance item now at head + an
            // empty batch.
            Some(Work::Compact { .. } | Work::VectorIndexRebuild { .. }) => {
                return BatchDrain::Run { batch };
            }
            Some(Work::Commit { sealed, .. }) => {
                // Computed while `sealed` is still borrowed from `reorder`; the
                // borrow ends here, before the `remove` below.
                let estimate = encoded_estimate(sealed);
                // Cap check with the >= 1 progress rule: only stop if the batch
                // already has a member (so a single over-cap commit is taken
                // alone, never rejected). Stopping here leaves the head commit
                // in `reorder` for the next run.
                if !batch.is_empty()
                    && (batch.len() >= limits.max_commits
                        || batch_bytes.saturating_add(estimate) > limits.max_bytes)
                {
                    return BatchDrain::Run { batch };
                }
                // Pop the head commit and advance the publish cursor.
                let Some(Work::Commit { sealed, reply }) = reorder.remove(next_publish_seq) else {
                    unreachable!("checked Work::Commit at next_publish_seq above");
                };
                // Advanced BEFORE the append, so on failure the cursor already
                // points past the failed (error-acked) commit.
                *next_publish_seq += 1;
                match run_protected(|| append_sealed(sealed, &handles.durable_providers)) {
                    Ok(Ok(mut appended)) => {
                        // Carry the waiter's reply sender with the appended
                        // commit; it is only answered after the group flush.
                        appended.reply = Some(reply);
                        batch_bytes = batch_bytes.saturating_add(estimate);
                        batch.push(appended);
                    }
                    failed => {
                        // Partial-batch append failure: poison, Err THIS waiter,
                        // and hand the already-appended members back for
                        // error-acking. None of the run is flushed or published.
                        // `unwrap_protected` poisons + yields the GraphError; the
                        // append produced no AppendedCommit, so re-wrap the error
                        // into this waiter's CommitOutcome reply channel.
                        let error = match unwrap_protected(failed, poisoned) {
                            Ok(_appended) => unreachable!("append-failure arm is never Ok"),
                            Err(error) => error,
                        };
                        // A send error only means the waiter is gone; ignored.
                        let _: Result<(), std::sync::mpsc::SendError<GraphResult<CommitOutcome>>> =
                            reply.send(Err(error));
                        return BatchDrain::AppendFailed { appended: batch };
                    }
                }
            }
        }
    }
}
258
259/// Non-blocking refill: pull ready items from the channel into the reorder
260/// buffer until either the contiguous `next` seq is present or the channel has
261/// no immediately-ready item. Disconnect is a no-op here (the outer driver's
262/// blocking `recv()` observes it next iteration).
263fn refill_until_contiguous(
264 receiver: &Receiver<Work>,
265 reorder: &mut BTreeMap<u64, Work>,
266 next: u64,
267) {
268 loop {
269 match receiver.try_recv() {
270 Ok(work) => {
271 let seq = work.seal_seq();
272 reorder.insert(seq, work);
273 if seq == next {
274 return;
275 }
276 }
277 Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => return,
278 }
279 }
280}
281
282/// Phase 2 (R1 barrier) + Phase 3/4 — flush the whole run once, then publish +
283/// ack each member in `seal_seq` order.
284///
285/// Returns `true` when the committer must **stop** (a flush error or a
286/// publish-tail panic poisoned the engine and the buffer was drained); the
287/// caller then returns from the loop. An empty `batch` is a no-op returning
288/// `false` (the run ended on a gap; nothing to flush).
289///
290/// Failure handling mirrors [`crate::write_txn`]'s contract:
291/// - **flush Err** ⇒ publish NOTHING, poison, error-ack every member, drain the
292/// buffer, stop.
293/// - **publish panic** ⇒ `catch_unwind` poisons; error-ack the remaining members
294/// (this one already had its reply consumed), drain the buffer, stop.
295pub(crate) fn flush_and_publish_batch(
296 batch: Vec<AppendedCommit>,
297 reorder: &mut BTreeMap<u64, Work>,
298 handles: &CommitterHandles,
299 poisoned: &Arc<std::sync::atomic::AtomicBool>,
300) -> bool {
301 if batch.is_empty() {
302 return false;
303 }
304
305 // Phase 2: the single group flush == the R1 fsync-before-publish barrier.
306 match run_protected(|| flush_durables(&handles.durable_providers)) {
307 Ok(Ok(())) => {}
308 failed => {
309 // Poison via unwrap_protected (its returned Err is the flush error,
310 // already surfaced to each member below via committer_dead); publish
311 // NOTHING from the run.
312 let _ = unwrap_protected(failed, poisoned);
313 ack_appended_with_error(batch);
314 drain_buffer_with_error(reorder);
315 return true;
316 }
317 }
318
319 // Phase 3+4: publish-all in seal_seq (== batch) order, ack-all. Durable now,
320 // so each publish makes its commit visible only after the barrier above.
321 let mut batch = batch.into_iter();
322 while let Some(mut appended) = batch.next() {
323 let reply = appended.reply.take();
324 // publish_appended is infallible (returns CommitOutcome) — the only way
325 // to fail is a panic (store / debug assert / fan-out boundary), which
326 // catch_unwind converts to a Durable error + poison.
327 let result = unwrap_protected(
328 run_protected(|| {
329 Ok(publish_appended(
330 appended,
331 &handles.snapshot,
332 &handles.schema_version,
333 &handles.providers,
334 ))
335 }),
336 poisoned,
337 );
338 if let Some(reply) = reply {
339 let _ = reply.send(result);
340 }
341 if poisoned.load(Ordering::Acquire) {
342 // A publish-tail panic poisoned us. Every remaining member is
343 // appended + flushed (durable) but unpublished; we cannot trust the
344 // diverged in-memory graph, so Err them and drain the buffer.
345 ack_appended_with_error(batch.collect());
346 drain_buffer_with_error(reorder);
347 return true;
348 }
349 }
350 false
351}
352
353/// Fail every member of a run that was appended but never published, so no reply
354/// [`SyncSender`] is dropped silently (which would hang its `recv()` with a
355/// `RecvError`). Used on a partial-append failure, a flush failure, and a
356/// publish-tail panic — every poison exit drains its in-flight run through here.
357pub(crate) fn ack_appended_with_error(batch: Vec<AppendedCommit>) {
358 for appended in batch {
359 if let Some(reply) = appended.reply {
360 let _: Result<(), std::sync::mpsc::SendError<GraphResult<CommitOutcome>>> =
361 reply.send(Err(committer_dead()));
362 }
363 }
364}
365
// Unit tests for this module live in the sibling file `committer_batch_tests.rs`
// (wired in via `#[path]`). They are compiled only under `cfg(test)`.
#[cfg(test)]
#[path = "committer_batch_tests.rs"]
mod tests;