#[cfg(not(madsim))]
use std::{
sync::{
Arc,
atomic::{AtomicU64, Ordering},
mpsc::{Receiver, RecvTimeoutError, SyncSender},
},
time::Instant,
};
use nautilus_core::UnixNanos;
#[cfg(not(madsim))]
use nautilus_core::time::AtomicTime;
use crate::{
backend::AppendEntry, entry::EventStoreEntry, hash::compute_entry_hash, writer::EntryDraft,
};
#[cfg(not(madsim))]
use crate::{
backend::EventStore,
error::EventStoreError,
snapshot::SnapshotAnchor,
writer::{
WriterConfig,
halt::{HaltCallback, HaltReason},
},
};
#[cfg(not(madsim))]
pub(super) enum WriterMessage {
Entry {
draft: EntryDraft,
ts_publish: UnixNanos,
},
Close {
run_ended: EntryDraft,
ack: SyncSender<Result<u64, EventStoreError>>,
},
RecordSnapshotAnchor {
blob_ref: String,
content_hash: String,
ack: SyncSender<Result<SnapshotAnchor, EventStoreError>>,
},
}
#[cfg(not(madsim))]
#[allow(clippy::needless_pass_by_value)] pub(super) fn run(
mut backend: Box<dyn EventStore + Send>,
rx: Receiver<WriterMessage>,
config: WriterConfig,
halt: HaltCallback,
high_watermark: Arc<AtomicU64>,
clock: &'static AtomicTime,
) {
let mut next_seq = match backend.high_watermark() {
Ok(hwm) => hwm + 1,
Err(e) => {
halt(HaltReason::from_backend_error(&e));
return;
}
};
let mut batch: Vec<AppendEntry> = Vec::with_capacity(config.max_batch_entries);
let mut batch_deadline: Option<Instant> = None;
loop {
let recv_result = match batch_deadline {
None => rx.recv().map_err(|_| RecvTimeoutError::Disconnected),
Some(deadline) => {
let now = Instant::now();
let remaining = deadline.saturating_duration_since(now);
rx.recv_timeout(remaining)
}
};
match recv_result {
Ok(WriterMessage::Entry { draft, ts_publish }) => {
let append = build_append_entry(draft, ts_publish, next_seq);
next_seq += 1;
batch.push(append);
if batch_deadline.is_none() {
batch_deadline = Some(Instant::now() + config.max_batch_latency);
}
if batch.len() >= config.max_batch_entries
&& !flush(backend.as_mut(), &mut batch, &halt, high_watermark.as_ref())
{
return;
}
if batch.is_empty() {
batch_deadline = None;
}
}
Ok(WriterMessage::Close { run_ended, ack }) => {
drain_pending(&rx, &mut batch, &mut next_seq);
let now = clock.get_time_ns();
let run_ended_append = build_append_entry(run_ended, now, next_seq);
batch.push(run_ended_append);
let flush_ok = flush(backend.as_mut(), &mut batch, &halt, high_watermark.as_ref());
if !flush_ok {
let _ = ack.send(Err(EventStoreError::Backend(
"writer fail-stopped before seal".to_string(),
)));
return;
}
let seal_result = backend.seal(crate::manifest::RunStatus::Ended);
let final_result = match seal_result {
Ok(()) => Ok(high_watermark.load(Ordering::Acquire)),
Err(e) => {
halt(HaltReason::from_backend_error(&e));
Err(e)
}
};
let _ = ack.send(final_result);
return;
}
Ok(WriterMessage::RecordSnapshotAnchor {
blob_ref,
content_hash,
ack,
}) => {
if !record_snapshot_anchor(
backend.as_mut(),
&mut batch,
&halt,
high_watermark.as_ref(),
blob_ref,
content_hash,
&ack,
) {
return;
}
batch_deadline = None;
}
Err(RecvTimeoutError::Timeout) => {
if !batch.is_empty()
&& !flush(backend.as_mut(), &mut batch, &halt, high_watermark.as_ref())
{
return;
}
batch_deadline = None;
}
Err(RecvTimeoutError::Disconnected) => {
if !batch.is_empty() {
let _ = flush(backend.as_mut(), &mut batch, &halt, high_watermark.as_ref());
}
return;
}
}
}
}
#[cfg(not(madsim))]
fn record_snapshot_anchor(
backend: &mut dyn EventStore,
batch: &mut Vec<AppendEntry>,
halt: &HaltCallback,
high_watermark: &AtomicU64,
blob_ref: String,
content_hash: String,
ack: &SyncSender<Result<SnapshotAnchor, EventStoreError>>,
) -> bool {
if !batch.is_empty() && !flush(backend, batch, halt, high_watermark) {
let _ = ack.send(Err(EventStoreError::Backend(
"writer fail-stopped before snapshot anchor".to_string(),
)));
return false;
}
let hwm = high_watermark.load(Ordering::Acquire);
let anchor = SnapshotAnchor::new(hwm, blob_ref, content_hash);
let result = match backend.record_snapshot_anchor(anchor.clone()) {
Ok(()) => Ok(anchor),
Err(e) => {
halt(HaltReason::from_backend_error(&e));
Err(e)
}
};
let keep_running = result.is_ok();
let _ = ack.send(result);
keep_running
}
#[cfg(not(madsim))]
fn drain_pending(rx: &Receiver<WriterMessage>, batch: &mut Vec<AppendEntry>, next_seq: &mut u64) {
while let Ok(msg) = rx.try_recv() {
match msg {
WriterMessage::Entry { draft, ts_publish } => {
let append = build_append_entry(draft, ts_publish, *next_seq);
*next_seq += 1;
batch.push(append);
}
WriterMessage::Close { ack, .. } => {
let _ = ack.send(Err(EventStoreError::Backend(
"writer is already closing".to_string(),
)));
}
WriterMessage::RecordSnapshotAnchor { ack, .. } => {
let _ = ack.send(Err(EventStoreError::Backend(
"writer is closing before snapshot anchor".to_string(),
)));
}
}
}
}
#[cfg(not(madsim))]
fn flush(
backend: &mut dyn EventStore,
batch: &mut Vec<AppendEntry>,
halt: &HaltCallback,
high_watermark: &AtomicU64,
) -> bool {
if batch.is_empty() {
return true;
}
match backend.append_batch(batch) {
Ok(new_hwm) => {
high_watermark.store(new_hwm, Ordering::Release);
batch.clear();
true
}
Err(e) => {
halt(HaltReason::from_backend_error(&e));
batch.clear();
false
}
}
}
pub(super) fn build_append_entry(
draft: EntryDraft,
ts_publish: UnixNanos,
seq: u64,
) -> AppendEntry {
let EntryDraft {
headers,
topic,
payload_type,
payload,
ts_init,
index_keys,
} = draft;
let entry_hash = compute_entry_hash(
seq,
ts_init,
ts_publish,
topic.as_ref(),
payload_type.as_str(),
&payload,
&headers,
);
let entry = EventStoreEntry::new(
entry_hash,
seq,
headers,
topic,
payload_type,
payload,
ts_init,
ts_publish,
);
AppendEntry::new(entry, index_keys)
}