// Build-time guard: this module intentionally ships only the `Once` and
// `Bounded` restart policies (see `RestartPolicy` below). Enabling an
// `exponential-backoff` feature is a spec violation, so fail the build
// loudly instead of letting the feature silently do nothing.
#[allow(unexpected_cfgs)]
#[cfg(feature = "exponential-backoff")]
compile_error!(
    "Red flag: only Once and Bounded restart policies. \
Exponential backoff belongs in the product's supervisor, not here. \
See: SPEC.md ## RED FLAGS."
);
use crate::coordinate::{Coordinate, DagPosition};
use crate::event::{Event, EventKind, HashChain};
use crate::store::index::{DiskPos, IndexEntry, StoreIndex};
use crate::store::segment::{self, Active, FramePayload, Segment};
use crate::store::{AppendReceipt, StoreConfig, StoreError};
use flume::{Receiver, Sender, TrySendError};
use parking_lot::Mutex;
use std::panic::AssertUnwindSafe;
use std::sync::Arc;
use std::time::Instant;
use tracing::{debug, info, trace};
/// Commands processed serially by the dedicated writer thread.
pub(crate) enum WriterCommand {
    /// Append one event for `entity`/`scope` and reply with its receipt.
    Append {
        entity: Arc<str>,
        scope: Arc<str>,
        // Boxed so this (largest) variant stays small in the channel slots.
        event: Box<Event<Vec<u8>>>,
        kind: EventKind,
        guards: AppendGuards,
        respond: Sender<Result<AppendReceipt, StoreError>>,
    },
    /// Force a durability sync of the active segment.
    Sync {
        respond: Sender<Result<(), StoreError>>,
    },
    /// Drain queued commands (bounded), sync, acknowledge, and stop.
    Shutdown {
        respond: Sender<Result<(), StoreError>>,
    },
    /// Test-only: makes the writer panic to exercise `RestartPolicy`.
    #[doc(hidden)]
    PanicForTest {
        respond: Sender<Result<(), StoreError>>,
    },
}
/// Owning handle to the background writer: the command channel plus the
/// shared subscriber list.
pub(crate) struct WriterHandle {
    pub tx: Sender<WriterCommand>,
    pub subscribers: Arc<SubscriberList>,
    // Keeps the JoinHandle alive; it is never joined in this file, so the
    // thread is effectively detached when the handle drops.
    _thread: Option<std::thread::JoinHandle<()>>,
}
/// Fan-out list of live notification senders, mutex-guarded so the writer
/// thread (`broadcast`) and `subscribe` callers can touch it concurrently.
pub(crate) struct SubscriberList {
    senders: Mutex<Vec<Sender<Notification>>>,
}
/// Snapshot of a committed append, broadcast to subscribers after the event
/// has been written and indexed.
#[derive(Clone, Debug)]
pub struct Notification {
    pub event_id: u128,
    pub correlation_id: u128,
    pub causation_id: Option<u128>,
    /// Entity/scope coordinate of the appended event.
    pub coord: Coordinate,
    pub kind: EventKind,
    /// Store-wide (global) sequence number assigned to the event.
    pub sequence: u64,
}
/// How the writer supervisor reacts to a panic in the writer loop.
/// Deliberately minimal — exponential backoff is explicitly out of scope
/// (see the `compile_error!` guard at the top of this file).
#[derive(Clone, Debug, Default)]
#[non_exhaustive]
pub enum RestartPolicy {
    /// Restart at most once for the lifetime of the writer thread.
    #[default]
    Once,
    /// Allow up to `max_restarts` restarts within each `within_ms` window;
    /// the counter resets when a window elapses without exhausting it.
    Bounded {
        max_restarts: u32,
        within_ms: u64,
    },
}
impl SubscriberList {
    /// Creates an empty subscriber list.
    pub(crate) fn new() -> Self {
        Self {
            senders: Mutex::new(Vec::new()),
        }
    }

    /// Registers a new subscriber backed by a bounded channel of `capacity`
    /// slots and returns the receiving half.
    pub(crate) fn subscribe(&self, capacity: usize) -> Receiver<Notification> {
        let (sender, receiver) = flume::bounded(capacity);
        self.senders.lock().push(sender);
        receiver
    }

    /// Delivers `notif` to every live subscriber. A subscriber whose channel
    /// is full keeps its slot but misses this notification (lossy by design);
    /// a disconnected subscriber is pruned from the list.
    pub(crate) fn broadcast(&self, notif: &Notification) {
        self.senders.lock().retain(|sender| {
            !matches!(
                sender.try_send(notif.clone()),
                Err(TrySendError::Disconnected(_))
            )
        });
    }
}
impl WriterHandle {
    /// Ensures the data directory exists, opens a fresh active segment one
    /// past the newest on disk, and spawns the named writer thread. Returns
    /// the handle used to send it commands.
    ///
    /// # Errors
    /// Returns `StoreError` if the directory, segment, or thread cannot be
    /// created.
    pub(crate) fn spawn(
        config: &Arc<StoreConfig>,
        index: &Arc<StoreIndex>,
        subscribers: &Arc<SubscriberList>,
    ) -> Result<Self, StoreError> {
        std::fs::create_dir_all(&config.data_dir).map_err(StoreError::Io)?;

        // Always start on a brand-new segment: latest-on-disk + 1 (1 when
        // the directory holds no segments yet).
        let segment_id = find_latest_segment_id(&config.data_dir).unwrap_or(0) + 1;
        let segment = Segment::<Active>::create(&config.data_dir, segment_id)?;

        let (tx, rx) = flume::bounded::<WriterCommand>(config.writer_channel_capacity);

        // FNV-1a over the data-dir path yields a stable per-store thread name.
        let dir_hash = config
            .data_dir
            .to_string_lossy()
            .bytes()
            .fold(0xcbf29ce484222325u64, |acc, byte| {
                (acc ^ u64::from(byte)).wrapping_mul(0x100000001b3)
            });
        let mut builder =
            std::thread::Builder::new().name(format!("batpak-writer-{dir_hash:08x}"));
        if let Some(stack_size) = config.writer_stack_size {
            builder = builder.stack_size(stack_size);
        }

        let cfg = Arc::clone(config);
        let idx = Arc::clone(index);
        let subs = Arc::clone(subscribers);
        let thread = builder
            .spawn(move || {
                writer_thread_main(&rx, &cfg, &idx, &subs, segment, segment_id);
            })
            .map_err(StoreError::Io)?;

        Ok(Self {
            tx,
            subscribers: Arc::clone(subscribers),
            _thread: Some(thread),
        })
    }
}
/// Mutable state threaded through `writer_loop`/`handle_append`: borrows of
/// the store-wide pieces plus the writer-owned active segment and its id.
struct WriterState<'a> {
    index: &'a StoreIndex,
    active_segment: &'a mut Segment<Active>,
    // Bumped on rotation so index entries record the correct segment.
    segment_id: &'a mut u64,
    config: &'a StoreConfig,
    subscribers: &'a SubscriberList,
}
/// Supervisor wrapper around `writer_loop`: catches panics and restarts the
/// loop according to `config.restart_policy`, opening a fresh segment per
/// restart. Returns when the loop exits cleanly, the restart budget is
/// exhausted, or a restart cannot open a new segment.
fn writer_thread_main(
    rx: &Receiver<WriterCommand>,
    config: &StoreConfig,
    index: &StoreIndex,
    subscribers: &SubscriberList,
    initial_segment: Segment<Active>,
    initial_segment_id: u64,
) {
    let mut segment = initial_segment;
    let mut seg_id = initial_segment_id;
    // Restart accounting: `restarts` counts panics consumed from the budget;
    // `window_start` anchors the sliding window used by `Bounded`.
    let mut restarts: u32 = 0;
    let mut window_start = Instant::now();
    loop {
        // AssertUnwindSafe: any state the panic may have left inconsistent is
        // discarded — a brand-new segment is created before the next attempt.
        let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
            writer_loop(rx, config, index, subscribers, segment, seg_id);
        }));
        match result {
            // Clean exit (Shutdown handled or channel closed): stop the thread.
            Ok(()) => return,
            Err(panic_info) => {
                // Best-effort extraction of a printable panic message.
                let panic_msg = if let Some(s) = panic_info.downcast_ref::<&str>() {
                    (*s).to_string()
                } else if let Some(s) = panic_info.downcast_ref::<String>() {
                    s.clone()
                } else {
                    "unknown panic".to_string()
                };
                let budget_ok = match &config.restart_policy {
                    // `Once`: a single restart for the thread's lifetime.
                    RestartPolicy::Once => {
                        if restarts >= 1 {
                            false
                        } else {
                            restarts += 1;
                            true
                        }
                    }
                    // `Bounded`: up to `max_restarts` per `within_ms` window.
                    RestartPolicy::Bounded {
                        max_restarts,
                        within_ms,
                    } => {
                        if window_start.elapsed() > std::time::Duration::from_millis(*within_ms) {
                            // Window elapsed without exhausting the budget: reset.
                            restarts = 0;
                            window_start = Instant::now();
                        }
                        if restarts >= *max_restarts {
                            false
                        } else {
                            restarts += 1;
                            true
                        }
                    }
                };
                if !budget_ok {
                    tracing::error!(
                        "writer restart budget exhausted — thread exiting. \
Last panic: {panic_msg}. Policy: {:?}",
                        config.restart_policy
                    );
                    return;
                }
                tracing::warn!(
                    "writer panic — restarting ({restarts}/{max}). Panic: {panic_msg}",
                    max = match &config.restart_policy {
                        RestartPolicy::Once => 1,
                        RestartPolicy::Bounded { max_restarts, .. } => *max_restarts,
                    }
                );
                // Never reuse the possibly-corrupt segment: pick an id past the
                // newest on disk (falling back to the last known id) and open a
                // fresh one.
                seg_id = find_latest_segment_id(&config.data_dir).unwrap_or(seg_id) + 1;
                segment = match Segment::<Active>::create(&config.data_dir, seg_id) {
                    Ok(s) => s,
                    Err(e) => {
                        tracing::error!(
                            "writer restart failed — cannot create segment: {e}. Thread exiting."
                        );
                        return;
                    }
                };
            }
        }
    }
}
/// Core command loop: applies commands serially until the channel closes or
/// a `Shutdown` arrives. The active segment is synced after every
/// `config.sync_every_n_events` appends and on `Sync`/`Shutdown`.
fn writer_loop(
    rx: &Receiver<WriterCommand>,
    config: &StoreConfig,
    index: &StoreIndex,
    subscribers: &SubscriberList,
    mut active_segment: Segment<Active>,
    mut segment_id: u64,
) {
    // Appends since the last sync; drives the periodic durability flush.
    let mut events_since_sync: u32 = 0;
    let mut state = WriterState {
        index,
        active_segment: &mut active_segment,
        segment_id: &mut segment_id,
        config,
        subscribers,
    };
    // Blocks waiting for commands; the iterator ends (clean exit) once every
    // sender has been dropped.
    for cmd in rx.iter() {
        match cmd {
            WriterCommand::Append {
                entity,
                scope,
                event,
                kind,
                guards,
                respond,
            } => {
                let result = state.handle_append(&entity, &scope, *event, kind, &guards);
                let _ = respond.send(result);
                // Counted even when the append failed: the periodic sync is a
                // durability heartbeat, not a success counter.
                events_since_sync += 1;
                if events_since_sync >= config.sync_every_n_events {
                    if let Err(e) = state.active_segment.sync_with_mode(&config.sync_mode) {
                        tracing::error!("periodic sync failed: {e}");
                    }
                    events_since_sync = 0;
                }
            }
            WriterCommand::Sync { respond } => {
                let result = state.active_segment.sync_with_mode(&config.sync_mode);
                let _ = respond.send(result);
                events_since_sync = 0;
            }
            WriterCommand::Shutdown { respond } => {
                // Drain commands already queued, bounded by
                // `shutdown_drain_limit` *appends*; other command kinds are
                // answered but do not count against the limit.
                let mut drained = 0;
                while drained < config.shutdown_drain_limit {
                    match rx.try_recv() {
                        Ok(WriterCommand::Append {
                            entity,
                            scope,
                            event,
                            kind,
                            guards,
                            respond: r,
                        }) => {
                            let result =
                                state.handle_append(&entity, &scope, *event, kind, &guards);
                            let _ = r.send(result);
                            drained += 1;
                        }
                        Ok(WriterCommand::Shutdown { respond: r }) => {
                            let _ = r.send(Ok(()));
                        }
                        Ok(WriterCommand::Sync { respond: r }) => {
                            let _ = r.send(state.active_segment.sync_with_mode(&config.sync_mode));
                        }
                        // During drain a test panic is acknowledged without
                        // panicking so shutdown can complete.
                        Ok(WriterCommand::PanicForTest { respond: r }) => {
                            let _ = r.send(Ok(()));
                        }
                        // Queue empty (or disconnected): stop draining.
                        Err(_) => break,
                    }
                }
                // Final durability flush before acknowledging shutdown.
                if let Err(e) = state.active_segment.sync_with_mode(&config.sync_mode) {
                    tracing::error!("shutdown sync failed: {e}");
                }
                let _ = respond.send(Ok(()));
                return;
            }
            #[allow(clippy::panic)]
            WriterCommand::PanicForTest { respond } => {
                // Reply first so the test isn't left waiting, then panic to
                // exercise the restart supervisor in `writer_thread_main`.
                let _ = respond.send(Ok(()));
                panic!("PanicForTest: intentional writer panic for restart_policy testing");
            }
        }
    }
}
/// Caller-supplied invariants and metadata for a single append.
pub(crate) struct AppendGuards {
    pub correlation_id: u128,
    pub causation_id: Option<u128>,
    /// Optimistic concurrency: the append fails with `SequenceMismatch`
    /// unless the entity's latest clock equals this value.
    pub expected_sequence: Option<u32>,
    /// If an event with this id is already indexed, its original receipt is
    /// returned instead of appending a duplicate.
    pub idempotency_key: Option<u128>,
}
impl WriterState<'_> {
    /// Appends one event for `entity`/`scope`: validates guards, links the
    /// hash chain, writes the frame (rotating the segment when full), indexes
    /// it, and broadcasts a notification to subscribers.
    ///
    /// Holds the per-entity lock for the whole operation so the sequence
    /// check, hash chaining, and clock assignment are atomic per entity.
    ///
    /// # Errors
    /// Returns `StoreError` for an invalid coordinate, a sequence mismatch,
    /// or any encoding/IO failure. The coordinate is validated before any
    /// bytes reach disk; the previous version validated it (twice) only
    /// after the frame was written and indexed, so a coordinate error could
    /// be reported for an append that had in fact been committed.
    fn handle_append(
        &mut self,
        entity: &Arc<str>,
        scope: &Arc<str>,
        mut event: Event<Vec<u8>>,
        kind: EventKind,
        guards: &AppendGuards,
    ) -> Result<AppendReceipt, StoreError> {
        let correlation_id = guards.correlation_id;
        let causation_id = guards.causation_id;

        // Serialize appends per entity: the lock lives in a shared map so all
        // appends for the same entity contend on the same mutex.
        let lock = self
            .index
            .entity_locks
            .entry(Arc::clone(entity))
            .or_insert_with(|| Arc::new(parking_lot::Mutex::new(())))
            .clone();
        let _entity_guard = lock.lock();
        trace!(entity = %entity, "entity lock acquired");

        // Validate the coordinate up front, before any state is mutated.
        let coord =
            Coordinate::new(entity.as_ref(), scope.as_ref()).map_err(StoreError::Coordinate)?;

        // Single snapshot of the entity's latest entry (clock, chain head,
        // wall-clock ms). The entity lock keeps it stable for this append;
        // the previous version performed three separate lookups.
        let latest = self
            .index
            .get_latest(entity)
            .map(|e| (e.clock, e.hash_chain.event_hash, e.wall_ms));

        // Optimistic-concurrency guard.
        if let Some(expected) = guards.expected_sequence {
            let actual = latest.map(|(clock, _, _)| clock).unwrap_or(0);
            if actual != expected {
                return Err(StoreError::SequenceMismatch {
                    entity: entity.to_string(),
                    expected,
                    actual,
                });
            }
        }

        // Idempotency: a known event id short-circuits with the old receipt.
        if let Some(key) = guards.idempotency_key {
            if let Some(entry) = self.index.get_by_id(key) {
                return Ok(AppendReceipt {
                    event_id: entry.event_id,
                    sequence: entry.global_sequence,
                    disk_pos: entry.disk_pos,
                });
            }
        }

        // First event for an entity gets clock 0 and a zeroed prev_hash.
        let (prev_hash, clock) = match latest {
            Some((prev_clock, hash, _)) => (hash, prev_clock + 1),
            None => ([0u8; 32], 0),
        };

        // Monotonic wall clock: never step backwards relative to the entity's
        // last recorded millisecond timestamp.
        #[allow(clippy::cast_sign_loss)]
        let raw_ms = (event.header.timestamp_us / 1000) as u64;
        let last_ms = latest.map(|(_, _, ms)| ms).unwrap_or(0);
        let now_ms = raw_ms.max(last_ms);

        event.header.position = DagPosition::child_at(clock, now_ms, 0);
        event.header.event_kind = kind;
        event.header.correlation_id = correlation_id;
        event.header.causation_id = causation_id;

        // Hash-chain linkage (real content hash only with the blake3 feature).
        #[cfg(feature = "blake3")]
        let event_hash = crate::event::hash::compute_hash(&event.payload);
        #[cfg(not(feature = "blake3"))]
        let event_hash = [0u8; 32];
        event.hash_chain = Some(HashChain {
            prev_hash,
            event_hash,
        });
        event.header.content_hash = event_hash;

        let frame_payload = FramePayload {
            event: event.clone(),
            entity: entity.to_string(),
            scope: scope.to_string(),
        };
        let frame = segment::frame_encode(&frame_payload)?;

        // Rotate before the write when the active segment is full: sync,
        // swap in a fresh segment, seal the old one.
        if self
            .active_segment
            .needs_rotation(self.config.segment_max_bytes)
        {
            self.active_segment.sync_with_mode(&self.config.sync_mode)?;
            let old = std::mem::replace(
                self.active_segment,
                Segment::<Active>::create(&self.config.data_dir, *self.segment_id + 1)?,
            );
            let _sealed = old.seal();
            *self.segment_id += 1;
            info!(segment_id = *self.segment_id, "segment rotated");
        }

        let offset = self.active_segment.write_frame(&frame)?;
        trace!(offset = offset, len = frame.len(), "frame written");

        let global_seq = self.index.global_sequence();
        let disk_pos = DiskPos {
            segment_id: *self.segment_id,
            offset,
            #[allow(clippy::cast_possible_truncation)]
            length: frame.len() as u32,
        };
        self.index.insert(IndexEntry {
            event_id: event.header.event_id,
            correlation_id,
            causation_id,
            coord: coord.clone(),
            kind,
            wall_ms: now_ms,
            clock,
            hash_chain: event.hash_chain.clone().unwrap_or_default(),
            disk_pos: disk_pos.clone(),
            global_sequence: global_seq,
        });
        debug!(event_id = %event.header.event_id, clock = clock, "append committed");

        self.subscribers.broadcast(&Notification {
            event_id: event.header.event_id,
            correlation_id,
            causation_id,
            coord,
            kind,
            sequence: global_seq,
        });

        Ok(AppendReceipt {
            event_id: event.header.event_id,
            sequence: global_seq,
            disk_pos,
        })
    }
}
/// Scans `dir` for segment files named `<id>.fbat` and returns the highest
/// id, or `None` when the directory is unreadable or contains no parseable
/// segment files.
fn find_latest_segment_id(dir: &std::path::Path) -> Option<u64> {
    std::fs::read_dir(dir)
        .ok()?
        .filter_map(|entry| {
            let name = entry.ok()?.file_name();
            let name = name.to_str()?;
            // `strip_suffix` removes exactly one ".fbat". The previous
            // `trim_end_matches(".fbat")` stripped *repeated* suffixes, so a
            // stray "7.fbat.fbat" was mis-parsed as segment 7 instead of
            // being ignored.
            name.strip_suffix(".fbat")?.parse::<u64>().ok()
        })
        .max()
}