// Build-time guard: enabling the `exponential-backoff` feature fails compilation.
// Only the `Once` and `Bounded` variants of `RestartPolicy` are supported in this
// module; per the message below, backoff belongs in the embedding product's supervisor.
#[cfg(feature = "exponential-backoff")]
compile_error!(
"Red flag: only Once and Bounded restart policies. \
Exponential backoff belongs in the product's supervisor, not here. \
See 08_CIRCUITS.md."
);
pub use super::fanout::Notification;
use super::fanout::{ReactorSubscriberList, SubscriberList};
use super::staging::{StagedCommitMeta, StagedCommitTiming, StagedCommittedEvent};
use crate::coordinate::{Coordinate, DagPosition};
use crate::event::{Event, EventHeader, EventKind, HashChain};
use crate::store::append::BatchAppendItem;
use crate::store::config::ValidatedStoreConfig;
use crate::store::index::{DiskPos, StoreIndex};
use crate::store::segment::sidx::kind_to_raw;
use crate::store::segment::{self, Active, FramePayloadRef, Segment};
#[cfg(test)]
use crate::store::SystemClock;
use crate::store::{AppendReceipt, StoreConfig, StoreError};
use flume::{Receiver, Sender};
use std::sync::Arc;
mod append;
mod batch;
#[cfg(all(test, feature = "dangerous-test-hooks", feature = "payload-encryption"))]
mod batch_fence_crash_tests;
#[cfg(feature = "payload-encryption")]
mod encrypt;
mod fence_runtime;
mod publish;
mod runtime;
mod watermark;
pub(crate) use self::append::AppendGuards;
use self::fence_runtime::{CommandResult, DeferredReply, FenceLedger};
pub(super) use self::runtime::checked_next_clock;
pub(crate) use self::runtime::find_latest_segment_id;
#[cfg(feature = "dangerous-test-hooks")]
use self::runtime::DriveStep;
use self::runtime::{writer_thread_main, writer_thread_name, WriterRuntime};
pub(crate) use self::watermark::{WatermarkAdvanceHandle, WatermarkState};
/// Discards the outcome of sending a reply back to a command's requester.
///
/// A failed send only means the requester already dropped its receiving end, so
/// there is nobody left to inform; the undelivered reply is simply released.
pub(super) fn ignore_closed_response_channel<T>(result: Result<(), flume::SendError<T>>) {
    if let Err(undelivered) = result {
        // Best-effort by design: the payload carried inside the error is dropped here.
        drop(undelivered);
    }
}
pub(crate) enum WriterCommand {
BeginVisibilityFence {
token: u64,
respond: Sender<Result<(), StoreError>>,
},
Append {
coord: Coordinate,
event: Box<Event<Vec<u8>>>, kind: EventKind,
guards: AppendGuards,
respond: Sender<Result<AppendReceipt, StoreError>>,
},
FenceAppend {
token: u64,
coord: Coordinate,
event: Box<Event<Vec<u8>>>,
kind: EventKind,
guards: AppendGuards,
respond: Sender<Result<AppendReceipt, StoreError>>,
},
AppendBatch {
items: Vec<BatchAppendItem>,
respond: Sender<Result<Vec<AppendReceipt>, StoreError>>,
},
FenceAppendBatch {
token: u64,
items: Vec<BatchAppendItem>,
respond: Sender<Result<Vec<AppendReceipt>, StoreError>>,
},
CommitVisibilityFence {
token: u64,
respond: Sender<Result<(), StoreError>>,
},
CancelVisibilityFence {
token: u64,
respond: Sender<Result<(), StoreError>>,
},
Sync {
respond: Sender<Result<(), StoreError>>,
},
Shutdown {
respond: Sender<Result<(), StoreError>>,
},
#[cfg(feature = "dangerous-test-hooks")]
#[doc(hidden)]
PanicForTest {
respond: Sender<Result<(), StoreError>>,
},
}
/// Caller-side handle to the writer. It holds the command sender, the shared
/// subscriber lists, the watermark handle, and the drive that executes commands.
///
/// NOTE: fields drop in declaration order, so `tx` is released before `drive`.
pub(crate) struct WriterHandle {
/// Sending half of the bounded `WriterCommand` channel.
pub tx: Sender<WriterCommand>,
/// Shared subscriber list; the same `Arc` is also handed to the writer side.
pub subscribers: Arc<SubscriberList>,
/// Shared reactor subscriber list; the same `Arc` is also handed to the writer side.
pub reactor_subscribers: Arc<ReactorSubscriberList>,
/// Watermark handle; marked crashed when the writer is detected as dead.
watermark_handle: WatermarkAdvanceHandle,
/// A dedicated writer thread or, with `dangerous-test-hooks`, a cooperative pump.
drive: WriterDrive,
}
/// Writer state for the cooperative (caller-driven) drive. It lives behind the
/// mutex inside `CooperativePump`.
#[cfg(feature = "dangerous-test-hooks")]
struct CoopState {
/// The writer core that executes commands.
core: WriterCore,
/// Passed by `&mut` to `drive_command` on every pump. It starts at 0.
events_since_sync: u32,
}
/// Cloneable handle that runs queued writer commands on the calling thread.
///
/// Clones share one `CoopState` (through `Arc<Mutex<_>>`) and the same command receiver.
#[cfg(feature = "dangerous-test-hooks")]
#[derive(Clone)]
pub(crate) struct CooperativePump {
/// Shared writer state. Locking it serialises concurrent pumps.
state: std::sync::Arc<std::sync::Mutex<CoopState>>,
/// Receiving half of the writer command channel.
rx: Receiver<WriterCommand>,
/// Validated configuration, forwarded to `drive_command`.
validated_cfg: Arc<ValidatedStoreConfig>,
/// Store configuration, forwarded to `drive_command`.
config: Arc<StoreConfig>,
}
#[cfg(feature = "dangerous-test-hooks")]
impl CooperativePump {
    /// Executes, on the calling thread, every command currently queued on the channel.
    ///
    /// It stops when the queue is empty or when a command tells the writer to exit.
    /// A poisoned state mutex is recovered and used as-is rather than propagated.
    pub(crate) fn pump(&self) {
        let mut locked = match self.state.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        let state = &mut *locked;
        loop {
            let Ok(cmd) = self.rx.try_recv() else {
                break;
            };
            let step = state.core.drive_command(
                &self.rx,
                &self.validated_cfg,
                &self.config,
                &mut state.events_since_sync,
                cmd,
            );
            if matches!(step, DriveStep::Exit) {
                break;
            }
        }
    }
}
/// How writer commands get executed.
pub(crate) enum WriterDrive {
/// A dedicated thread runs the writer loop. `thread` is `None` once joined or
/// for handles built without a thread.
Threaded {
thread: Option<Box<dyn crate::store::platform::spawn::JobHandle>>,
},
/// Test-only: commands run on whichever thread calls `pump`.
#[cfg(feature = "dangerous-test-hooks")]
Cooperative { pump: CooperativePump },
}
impl WriterDrive {
fn pump(&self) {
match self {
WriterDrive::Threaded { .. } => {}
#[cfg(feature = "dangerous-test-hooks")]
WriterDrive::Cooperative { pump } => pump.pump(),
}
}
}
/// Restart policy for the writer. The `PanicForTest` command exists to exercise it.
///
/// Only these two shapes are supported: a build-time guard rejects the
/// `exponential-backoff` feature. The policy is not interpreted in this file.
/// NOTE(review): confirm where it is enforced.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum RestartPolicy {
    /// The default policy.
    /// NOTE(review): presumably allows a single restart; confirm against the supervisor.
    #[default]
    Once,
    /// Bounded restarts. The fields presumably allow up to `max_restarts` restarts
    /// inside a `within_ms` millisecond window.
    /// NOTE(review): window semantics are inferred from the field names; confirm.
    Bounded {
        max_restarts: u32,
        within_ms: u64,
    },
}
impl WriterHandle {
pub(crate) fn spawn(
config: &Arc<StoreConfig>,
runtime: &Arc<ValidatedStoreConfig>,
index: &Arc<StoreIndex>,
subscribers: &Arc<SubscriberList>,
reactor_subscribers: &Arc<ReactorSubscriberList>,
reader: &Arc<crate::store::segment::scan::Reader>,
) -> Result<Self, StoreError> {
config
.fs()
.create_dir_all(&config.data_dir)
.map_err(StoreError::Io)?;
let initial_segment_id = find_latest_segment_id(&config.data_dir)?.unwrap_or(0) + 1;
let initial_segment = Segment::<Active>::create_with_created_ns_on(
&config.data_dir,
initial_segment_id,
runtime.now_wall_ns(),
config.fs(),
)?;
let (tx, rx) = flume::bounded::<WriterCommand>(config.writer.channel_capacity);
let subs = Arc::clone(subscribers);
let reactor_subs = Arc::clone(reactor_subscribers);
let watermark_handle = WatermarkState::handle(runtime.clock_arc());
let cfg = Arc::clone(config);
let validated = Arc::clone(runtime);
let idx = Arc::clone(index);
let rdr = Arc::clone(reader);
let watermark_for_thread = watermark_handle.clone();
let thread = config
.spawner()
.spawn(
writer_thread_name(&config.data_dir),
config.writer.stack_size,
Box::new(move || {
writer_thread_main(
&WriterRuntime {
rx: &rx,
config: cfg,
validated_cfg: validated,
index: idx,
subscribers: subs,
reactor_subscribers: reactor_subs,
reader: rdr,
watermark_handle: watermark_for_thread,
},
initial_segment,
initial_segment_id,
);
}),
)
.map_err(StoreError::from)?;
Ok(Self {
tx,
subscribers: Arc::clone(subscribers),
reactor_subscribers: Arc::clone(reactor_subscribers),
watermark_handle,
drive: WriterDrive::Threaded {
thread: Some(thread),
},
})
}
#[cfg(feature = "dangerous-test-hooks")]
pub(crate) fn cooperative(
config: &Arc<StoreConfig>,
runtime: &Arc<ValidatedStoreConfig>,
index: &Arc<StoreIndex>,
subscribers: &Arc<SubscriberList>,
reactor_subscribers: &Arc<ReactorSubscriberList>,
reader: &Arc<crate::store::segment::scan::Reader>,
) -> Result<Self, StoreError> {
config
.fs()
.create_dir_all(&config.data_dir)
.map_err(StoreError::Io)?;
let initial_segment_id = find_latest_segment_id(&config.data_dir)?.unwrap_or(0) + 1;
let initial_segment = Segment::<Active>::create_with_created_ns_on(
&config.data_dir,
initial_segment_id,
runtime.now_wall_ns(),
config.fs(),
)?;
let (tx, rx) = flume::bounded::<WriterCommand>(config.writer.channel_capacity);
let watermark_handle = WatermarkState::handle(runtime.clock_arc());
let core = WriterCore {
index: Arc::clone(index),
active_segment: initial_segment,
segment_id: initial_segment_id,
config: Arc::clone(config),
runtime: Arc::clone(runtime),
subscribers: Arc::clone(subscribers),
reactor_subscribers: Arc::clone(reactor_subscribers),
reader: Arc::clone(reader),
watermark_handle: watermark_handle.clone(),
sidx_collector: crate::store::segment::sidx::SidxEntryCollector::new(),
fence_ledger: None,
};
let state = std::sync::Arc::new(std::sync::Mutex::new(CoopState {
core,
events_since_sync: 0,
}));
Ok(Self {
tx,
subscribers: Arc::clone(subscribers),
reactor_subscribers: Arc::clone(reactor_subscribers),
watermark_handle,
drive: WriterDrive::Cooperative {
pump: CooperativePump {
state,
rx,
validated_cfg: Arc::clone(runtime),
config: Arc::clone(config),
},
},
})
}
#[cfg(test)]
pub(crate) fn from_parts_for_test(
tx: Sender<WriterCommand>,
subscribers: Arc<SubscriberList>,
) -> Self {
Self {
tx,
subscribers,
reactor_subscribers: Arc::new(ReactorSubscriberList::new()),
watermark_handle: WatermarkState::handle(Arc::new(SystemClock::new())),
drive: WriterDrive::Threaded { thread: None },
}
}
pub(crate) fn watermark_handle(&self) -> WatermarkAdvanceHandle {
self.watermark_handle.clone()
}
pub(crate) fn fail_if_exited(&self) -> Result<(), StoreError> {
match &self.drive {
WriterDrive::Threaded { thread } => {
if thread.as_ref().is_some_and(|thread| thread.is_finished()) {
self.watermark_handle.mark_writer_crashed();
return Err(StoreError::WriterCrashed);
}
Ok(())
}
#[cfg(feature = "dangerous-test-hooks")]
WriterDrive::Cooperative { .. } => Ok(()),
}
}
pub(crate) fn join(&mut self) -> Result<(), StoreError> {
match &mut self.drive {
WriterDrive::Threaded { thread } => {
if let Some(thread) = thread.take() {
thread.join().map_err(|_| {
self.watermark_handle.mark_writer_crashed();
StoreError::WriterCrashed
})?;
}
Ok(())
}
#[cfg(feature = "dangerous-test-hooks")]
WriterDrive::Cooperative { pump } => {
pump.pump();
Ok(())
}
}
}
pub(crate) fn pump(&self) {
self.drive.pump();
}
#[cfg(feature = "dangerous-test-hooks")]
pub(crate) fn cooperative_pump(&self) -> Option<CooperativePump> {
match &self.drive {
WriterDrive::Threaded { .. } => None,
WriterDrive::Cooperative { pump } => Some(pump.clone()),
}
}
#[cfg(feature = "dangerous-test-hooks")]
pub(crate) fn close_channel_and_join(&mut self) {
let (dead_tx, _dead_rx) = flume::bounded(0);
let live_tx = std::mem::replace(&mut self.tx, dead_tx);
drop(live_tx);
match &mut self.drive {
WriterDrive::Threaded { thread } => {
if let Some(thread) = thread.take() {
let _join_result = thread.join();
}
}
WriterDrive::Cooperative { pump } => pump.pump(),
}
}
}
/// State owned by whatever executes writer commands: the dedicated thread or the
/// cooperative pump.
struct WriterCore {
/// Shared store index.
index: Arc<StoreIndex>,
/// Segment currently being appended to; replaced by `maybe_rotate_segment`.
active_segment: Segment<Active>,
/// Id of `active_segment`; incremented on each rotation.
segment_id: u64,
/// Store configuration (data dir, segment size limit, sync mode, ...).
config: Arc<StoreConfig>,
/// Validated configuration; used here for `now_wall_ns()`.
runtime: Arc<ValidatedStoreConfig>,
/// Shared subscriber list.
subscribers: Arc<SubscriberList>,
/// Shared reactor subscriber list.
reactor_subscribers: Arc<ReactorSubscriberList>,
/// Reader that is told about the new active segment after each rotation.
reader: Arc<crate::store::segment::scan::Reader>,
/// Watermark handle shared with `WriterHandle`.
watermark_handle: WatermarkAdvanceHandle,
/// Entries for the active segment's SIDX footer; reset on rotation.
sidx_collector: crate::store::segment::sidx::SidxEntryCollector,
/// Visibility-fence bookkeeping; starts as `None`.
/// NOTE(review): presumably `Some` while a fence is open; confirm in `fence_runtime`.
fence_ledger: Option<FenceLedger>,
}
// Unit tests for the entity clock helper, watermark bookkeeping, join panic
// surfacing, and condvar wakeups.
#[cfg(test)]
mod tests {
use super::watermark::elapsed_ms_since;
use super::{
checked_next_clock, ReactorSubscriberList, SubscriberList, WatermarkState, WriterCommand,
WriterDrive, WriterHandle,
};
use crate::store::stats::HlcPoint;
use crate::store::{StoreError, SystemClock};
use std::sync::mpsc;
use std::sync::Arc;
use std::time::{Duration, Instant};
// None -> 0, Some(n) -> n + 1, and Some(u32::MAX) must yield EntityClockOverflow
// naming the entity.
#[test]
fn checked_next_clock_advances_and_overflow_fails_closed() {
assert_eq!(
checked_next_clock(None, "entity:test").expect("genesis clock"),
0
);
assert_eq!(
checked_next_clock(Some(7), "entity:test").expect("increment clock"),
8
);
let err = checked_next_clock(Some(u32::MAX), "entity:overflow")
.expect_err("entity clock overflow must fail closed");
assert!(matches!(
err,
StoreError::EntityClockOverflow { ref entity } if entity == "entity:overflow"
));
}
// Re-accepting an already-durable point must leave the pending-write age cleared.
#[test]
fn duplicate_accepted_advance_does_not_restart_pending_write_age() {
let point = HlcPoint {
wall_ms: 10,
global_sequence: 1,
};
let mut state = WatermarkState::default();
state.advance_accepted_on_lane(0, point);
state.advance_durable(point);
assert_eq!(
state.snapshot().oldest_pending_write_age_ms,
None,
"PROPERTY: durability to accepted clears pending write age"
);
state.advance_accepted_on_lane(0, point);
assert_eq!(
state.snapshot().oldest_pending_write_age_ms,
None,
"PROPERTY: duplicate accepted advance must not reopen pending write age"
);
}
// 2.5 ms of nanoseconds floors to 2 ms; a "now" earlier than "since" saturates to 0.
#[test]
fn pending_write_age_reports_elapsed_milliseconds_not_nanoseconds_or_products() {
assert_eq!(
elapsed_ms_since(3_500_000, 1_000_000),
2,
"PROPERTY: frontier pending-write age is floor(elapsed_ns / 1_000_000)"
);
assert_eq!(
elapsed_ms_since(1_000_000, 3_500_000),
0,
"PROPERTY: backwards monotonic samples saturate to zero"
);
}
// A panicking writer thread must surface as WriterCrashed from `join` and must
// poison durable-frontier waiters on the same watermark handle.
#[test]
fn writer_handle_join_surfaces_thread_panic_and_poisons_watermarks() {
let (tx, _rx) = flume::bounded::<WriterCommand>(1);
let watermark_handle = WatermarkState::handle(Arc::new(SystemClock::new()));
let thread = crate::store::platform::spawn::Spawn::spawn(
&crate::store::platform::spawn::ThreadSpawn,
"writer-join-panic-proof".to_owned(),
None,
Box::new(|| {
// `black_box` hides the `None` from the optimizer, so the panic is a real runtime panic.
std::hint::black_box(Option::<()>::None)
.expect("intentional writer join panic proof");
}),
)
.expect("spawn panic proof thread");
let mut handle = WriterHandle {
tx,
subscribers: Arc::new(SubscriberList::new()),
reactor_subscribers: Arc::new(ReactorSubscriberList::new()),
watermark_handle: watermark_handle.clone(),
drive: WriterDrive::Threaded {
thread: Some(thread),
},
};
let err = handle
.join()
.expect_err("PROPERTY: writer thread panic must surface through join");
assert!(matches!(err, StoreError::WriterCrashed));
let poisoned =
watermark_handle.wait_for_durable(HlcPoint::ORIGIN, Duration::from_millis(1));
assert!(
matches!(poisoned, Err(StoreError::WriterCrashed)),
"PROPERTY: join panic must poison frontier waiters"
);
}
// A waiter blocked for up to 2 s must be woken by `dangerous_notify_all`.
#[test]
fn dangerous_notify_all_wakes_condvar_waiters() {
let handle = WatermarkState::handle(std::sync::Arc::new(crate::store::SystemClock::new()));
let waiter_handle = handle.clone();
let (ready_tx, ready_rx) = mpsc::channel();
let (done_tx, done_rx) = mpsc::channel();
let waiter = std::thread::Builder::new()
.name("watermark-dangerous-notify-proof".to_string())
.spawn(move || {
ready_tx.send(()).expect("signal waiter readiness");
let timed_out =
waiter_handle.dangerous_wait_for_notification(Duration::from_secs(2));
done_tx.send(timed_out).expect("signal waiter outcome");
})
.expect("spawn condvar waiter");
ready_rx
.recv_timeout(Duration::from_secs(1))
.expect("waiter reached condvar wait setup");
// The ready signal is sent before the waiter actually blocks, so re-notify every
// 10 ms until it reports back. Past the 1 s deadline (or on disconnect) the
// outcome is treated as a timeout.
let deadline = Instant::now() + Duration::from_secs(1);
let timed_out = loop {
handle.dangerous_notify_all();
match done_rx.recv_timeout(Duration::from_millis(10)) {
Ok(timed_out) => break timed_out,
Err(mpsc::RecvTimeoutError::Timeout) if Instant::now() < deadline => {}
Err(mpsc::RecvTimeoutError::Timeout | mpsc::RecvTimeoutError::Disconnected) => {
break true;
}
}
};
waiter.join().expect("condvar waiter joins");
assert!(
!timed_out,
"PROPERTY: dangerous_notify_all must wake frontier waiters before their timeout"
);
}
}
/// Stage of the writer loop in which a command is executed.
///
/// `WriterCore::execute_command` uses it to decide whether a reply is sent at once
/// or deferred behind a sync.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WriterLoopPhase {
    /// Regular command processing.
    Main,
    /// Entered after an `Append` in `Main`. Sync-dependent replies are deferred here.
    GroupCommitDrain,
    /// Entered when `Shutdown` arrives in `Main`. Remaining commands are answered directly.
    ShutdownDrain,
}
// Command dispatch and segment rotation for the writer's owned state.
impl WriterCore {
/// Executes one `WriterCommand` in the given loop `phase`. The returned
/// `CommandResult` tells the loop what to do next.
///
/// Commands that can be answered at once are replied to here. Sync-dependent
/// replies are handed back as a `DeferredReply` via `with_sync`; see each arm for
/// which commands and phases that covers.
///
/// The count passed to `CommandResult::immediate` is 1 for `Append` and `AppendBatch`
/// (whether or not they succeeded) and for fenced appends that succeeded, and 0
/// otherwise. NOTE(review): presumably this feeds the events-since-sync counter;
/// confirm in `fence_runtime`.
fn execute_command(&mut self, phase: WriterLoopPhase, cmd: WriterCommand) -> CommandResult {
match cmd {
// Answered at once outside a group-commit drain. During a drain the reply waits
// for the sync and `break_after_reply` is requested.
WriterCommand::BeginVisibilityFence { token, respond } => match phase {
WriterLoopPhase::Main | WriterLoopPhase::ShutdownDrain => {
ignore_closed_response_channel(
respond.send(self.begin_visibility_fence(token)),
);
CommandResult::immediate(0)
}
WriterLoopPhase::GroupCommitDrain => CommandResult::immediate(0)
.with_sync(DeferredReply::BeginVisibilityFence { token, respond })
.break_after_reply(),
},
// The append outcome (Ok or Err) is replied to immediately. In `Main` it also
// starts a group-commit drain.
WriterCommand::Append {
coord,
event,
kind,
guards,
respond,
} => {
let result = self.handle_append(&coord, *event, kind, &guards, None);
ignore_closed_response_channel(respond.send(result));
let base = CommandResult::immediate(1);
if matches!(phase, WriterLoopPhase::Main) {
base.enter_group_commit_drain()
} else {
base
}
}
// Only errors are replied to here. On success the handler, which got a clone of
// `respond`, presumably replies itself.
WriterCommand::FenceAppend {
token,
coord,
event,
kind,
guards,
respond,
} => {
if let Err(error) = self.handle_fence_append_command(
token,
&coord,
*event,
kind,
&guards,
respond.clone(),
) {
ignore_closed_response_channel(respond.send(Err(error)));
CommandResult::immediate(0)
} else {
CommandResult::immediate(1)
}
}
// Replied to immediately whatever the outcome. Unlike `Append`, this never changes phase.
WriterCommand::AppendBatch { items, respond } => {
let result = self.handle_append_batch(items, None);
ignore_closed_response_channel(respond.send(result));
CommandResult::immediate(1)
}
// Same reply split as `FenceAppend`: only errors are answered here.
WriterCommand::FenceAppendBatch {
token,
items,
respond,
} => {
if let Err(error) =
self.handle_fence_append_batch_command(token, items, respond.clone())
{
ignore_closed_response_channel(respond.send(Err(error)));
CommandResult::immediate(0)
} else {
CommandResult::immediate(1)
}
}
// Deferred behind a sync, except during shutdown drain, where the fence is
// committed and answered directly.
WriterCommand::CommitVisibilityFence { token, respond } => match phase {
WriterLoopPhase::Main | WriterLoopPhase::GroupCommitDrain => {
CommandResult::immediate(0)
.with_sync(DeferredReply::CommitVisibilityFence { token, respond })
.break_after_reply_if(matches!(phase, WriterLoopPhase::GroupCommitDrain))
}
WriterLoopPhase::ShutdownDrain => {
ignore_closed_response_channel(
respond.send(self.commit_visibility_fence(token)),
);
CommandResult::immediate(0)
}
},
// Always answered immediately. During a group-commit drain it also requests
// `break_after_reply`.
WriterCommand::CancelVisibilityFence { token, respond } => {
ignore_closed_response_channel(respond.send(self.cancel_visibility_fence(token)));
let base = CommandResult::immediate(0);
if matches!(phase, WriterLoopPhase::GroupCommitDrain) {
base.break_after_reply()
} else {
base
}
}
// Deferred behind the sync. During shutdown drain the active segment is
// synced and answered directly.
WriterCommand::Sync { respond } => match phase {
WriterLoopPhase::Main | WriterLoopPhase::GroupCommitDrain => {
CommandResult::immediate(0)
.with_sync(DeferredReply::Sync { respond })
.break_after_reply_if(matches!(phase, WriterLoopPhase::GroupCommitDrain))
}
WriterLoopPhase::ShutdownDrain => {
ignore_closed_response_channel(respond.send(self.sync_active_segment()));
CommandResult::immediate(0)
}
},
// Main: hand `respond` to the shutdown drain. Group-commit drain: defer the reply
// behind the sync and exit the writer. Shutdown drain: a repeated request just
// gets `Ok(())`.
WriterCommand::Shutdown { respond } => match phase {
WriterLoopPhase::Main => CommandResult::immediate(0).enter_shutdown_drain(respond),
WriterLoopPhase::GroupCommitDrain => CommandResult::immediate(0)
.with_sync(DeferredReply::Shutdown { respond })
.break_after_reply()
.exit_writer(),
WriterLoopPhase::ShutdownDrain => {
ignore_closed_response_channel(respond.send(Ok(())));
CommandResult::immediate(0)
}
},
// Test hook: in `Main` it replies Ok and then unwinds the writer. `resume_unwind`
// does not invoke the panic hook. In the drain phases it only replies.
#[cfg(feature = "dangerous-test-hooks")]
WriterCommand::PanicForTest { respond } => match phase {
WriterLoopPhase::Main => {
ignore_closed_response_channel(respond.send(Ok(())));
std::panic::resume_unwind(Box::new(
"PanicForTest: intentional writer panic for restart_policy testing",
));
}
WriterLoopPhase::GroupCommitDrain | WriterLoopPhase::ShutdownDrain => {
ignore_closed_response_channel(respond.send(Ok(())));
CommandResult::immediate(0).break_after_reply()
}
},
}
}
/// Rotates to a new active segment once the current one reports `needs_rotation`
/// for `config.segment_max_bytes`.
///
/// Returns `Ok(false)` with no side effects when no rotation is needed, and
/// `Ok(true)` after a rotation. The steps are:
/// 1. Create segment `segment_id + 1`.
/// 2. Sync the current segment.
/// 3. Best-effort write and flush of its SIDX footer (failures are only logged).
/// 4. Reset the SIDX collector and swap in the new segment.
/// 5. Seal the old segment, bump `segment_id`, and point the reader at it.
///
/// With `dangerous-test-hooks`, fault-injection points fire before creation and
/// after the swap.
///
/// # Errors
/// Fails if segment creation or the pre-rotation sync fails, or an injected fault
/// fires. NOTE(review): if `create_with_created_ns_on` creates the file eagerly,
/// a failed sync leaves that new segment file on disk; confirm.
fn maybe_rotate_segment(&mut self) -> Result<bool, StoreError> {
if !self
.active_segment
.needs_rotation(self.config.segment_max_bytes)
{
return Ok(false);
}
#[cfg(feature = "dangerous-test-hooks")]
let old_segment = self.segment_id;
#[cfg(feature = "dangerous-test-hooks")]
let new_segment = self.segment_id + 1;
#[cfg(feature = "dangerous-test-hooks")]
crate::store::fault::maybe_inject(
crate::store::fault::InjectionPoint::SegmentRotationCreate {
old_segment,
new_segment,
},
&self.config.fault_injector,
)?;
// The next segment is created before the current one is synced.
let new_active = Segment::<Active>::create_with_created_ns_on(
&self.config.data_dir,
self.segment_id + 1,
self.runtime.now_wall_ns(),
self.config.fs(),
)?;
self.sync_active_segment()?;
// The SIDX footer is best-effort. Failures are logged and never abort the rotation.
if let Err(e) = self.active_segment.write_sidx_footer(&self.sidx_collector) {
tracing::warn!("SIDX footer write failed (non-fatal): {e}");
} else if let Err(e) = self.active_segment.sync_with_mode(&self.config.sync.mode) {
tracing::warn!("SIDX footer durability flush failed (non-fatal): {e}");
}
self.sidx_collector = crate::store::segment::sidx::SidxEntryCollector::new();
let old = std::mem::replace(&mut self.active_segment, new_active);
// NOTE(review): the value returned by `seal()` is bound and discarded. If it is
// a `Result`, sealing errors are ignored here; confirm.
let _sealed = old.seal();
self.segment_id += 1;
self.reader.set_active_segment(self.segment_id);
#[cfg(feature = "dangerous-test-hooks")]
crate::store::fault::maybe_inject(
crate::store::fault::InjectionPoint::SegmentRotation {
old_segment,
new_segment,
},
&self.config.fault_injector,
)?;
Ok(true)
}
}
// Additional mutation-kill tests, kept in the sibling file `writer_mutation_tests.rs`.
#[cfg(test)]
#[path = "writer_mutation_tests.rs"]
mod mutation_kill_tests;