use betex::{
config::JournalConfig,
disruptor::{Envelope, RingSlot},
engine::journaler::JournalHandler,
metrics::globals,
};
use crossbeam_utils::CachePadded;
use disrupt_rs::{
BusySpin, MissingFreeSlots, Producer, RingBufferFull, Sequence, build_single_producer,
};
use rand::{RngExt, SeedableRng, rngs::SmallRng};
use rkyv::bytecheck::CheckBytes;
use serde::{Deserialize, Serialize};
use std::{
path::{Path, PathBuf},
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
thread,
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
};
/// Directory holding the LMDB environment that backs the journal.
const WAL_PATH: &str = "target/sample.lmdb";
/// Database name opened inside the [`WAL_PATH`] environment.
const WAL_NAME: &str = "sample_walw";
/// JSON snapshot file of the projection, kept inside the [`WAL_PATH`] directory.
const PROJECTION_PATH: &str = "target/sample.lmdb/projection.json";

/// Type stored in each ring-buffer slot.
type Event = RingSlot<Payload>;
/// Envelope (sequence number, transaction metadata, optional response callback)
/// around a [`Payload`].
type EnvelopeEvent = Envelope<Payload>;
/// Borrowed rkyv-archived envelope, as handed to the callback when replaying the journal.
type ArchivedEvent<'a> = &'a <EnvelopeEvent as rkyv::Archive>::Archived;
/// Optionally requests core pinning for the processor configured by `builder`.
///
/// With `Some(id)` this calls `builder.pin_at_core(id)`; with `None` the
/// builder is returned untouched.
fn maybe_pin<B, E, W>(builder: B, core_id: Option<usize>) -> B
where
    B: disrupt_rs::ProcessorSettings<E, W>,
    W: disrupt_rs::wait_strategies::WaitStrategy,
{
    let Some(core) = core_id else {
        return builder;
    };
    builder.pin_at_core(core)
}
fn maybe_pin_current(core_id: Option<usize>) {
if let Some(id) = core_id {
let _ = core_affinity::set_for_current(core_affinity::CoreId { id });
}
}
/// Sample event body carried through the ring buffer and the journal.
///
/// The sample producer sets `value` equal to the event's sequence number, and
/// the projection debug-asserts that relationship when applying events.
#[derive(Debug, Default, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize, CheckBytes)]
struct Payload {
    /// Payload value; `TxProducer` always writes the event's `seq` here.
    value: u64,
}
#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct ProjectionState {
global_seq: u64,
value: u64,
}
impl ProjectionState {
pub(crate) fn apply(&mut self, e: &EnvelopeEvent) {
debug_assert_eq!(e.seq, e.payload.value);
self.global_seq = e.seq;
self.value = e.payload.value;
}
pub(crate) fn apply_archived(&mut self, e: ArchivedEvent<'_>) -> anyhow::Result<()> {
let value: u64 = e.payload.value.into();
let seq: u64 = e.seq.into();
debug_assert_eq!(seq, value);
self.global_seq = seq;
self.value = value;
Ok(())
}
pub(crate) fn load_json(path: &Path) -> anyhow::Result<Option<Self>> {
match std::fs::read_to_string(path) {
Ok(s) => Ok(Some(serde_json::from_str::<Self>(&s)?)),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
Err(e) => Err(e.into()),
}
}
}
/// Ring-buffer consumer that maintains a [`ProjectionState`] and periodically
/// snapshots it to a JSON file.
#[derive(Debug)]
struct ProjectionHandler {
    /// Current in-memory projection.
    projection: ProjectionState,
    /// Snapshot file: read on startup, rewritten every `snapshot_frequency` events.
    projection_path: PathBuf,
    /// `global_seq` of the snapshot loaded at startup, if any. Passed to
    /// `JournalHandler::recover_from` as the replay starting point
    /// (presumably exclusive — TODO confirm against the journal API).
    loaded_seq: Option<u64>,
    /// A snapshot is written whenever `global_seq` is a multiple of this value.
    snapshot_frequency: u64,
}
impl ProjectionHandler {
    /// Creates a handler seeded from the snapshot at `path`, or from the
    /// default (empty) state when no snapshot file exists.
    ///
    /// # Errors
    /// If the snapshot file exists but cannot be read or parsed.
    fn load(path: impl Into<PathBuf>) -> anyhow::Result<Self> {
        let projection_path = path.into();
        let loaded = ProjectionState::load_json(&projection_path)?;
        let (projection, loaded_seq) = match loaded {
            Some(p) => {
                println!(
                    "[projection] loaded: global_seq={} value={}",
                    p.global_seq, p.value
                );
                (p, Some(p.global_seq))
            }
            None => {
                println!("[projection] no on-disk projection; starting from empty state");
                (ProjectionState::default(), None)
            }
        };
        Ok(Self {
            projection,
            projection_path,
            loaded_seq,
            snapshot_frequency: 50_000,
        })
    }

    /// Writes the current projection to `projection_path` so that a reader
    /// sees either the previous snapshot or the new one, never a partial file.
    ///
    /// The JSON goes to a sibling `<path>.tmp` file, which is flushed to stable
    /// storage (`sync_all`) before being renamed over the target. Without that
    /// flush, a crash shortly after the rename can leave an empty or truncated
    /// snapshot, which `ProjectionState::load_json` would then reject at startup.
    /// The directory entry itself is not fsynced: losing the rename only means
    /// the older snapshot is loaded and more of the journal is replayed.
    ///
    /// # Errors
    /// Any I/O or serialization failure.
    fn store_snapshot_atomic(&self) -> anyhow::Result<()> {
        use std::io::Write as _;

        if let Some(parent) = self.projection_path.parent() {
            std::fs::create_dir_all(parent)?;
        }
        // Build the temp name on the OsString so non-UTF-8 paths survive intact
        // (`Path::display` is lossy).
        let mut tmp_name = self.projection_path.clone().into_os_string();
        tmp_name.push(".tmp");
        let tmp_path = PathBuf::from(tmp_name);

        let json = serde_json::to_vec(&self.projection)?;
        {
            let mut file = std::fs::File::create(&tmp_path)?;
            file.write_all(&json)?;
            file.sync_all()?;
        }
        if std::fs::rename(&tmp_path, &self.projection_path).is_err() {
            // Fallback for filesystems where rename refuses to replace an
            // existing file. Not atomic: the old snapshot is removed before the
            // new one is in place (recovery then falls back to journal replay).
            let _ = std::fs::remove_file(&self.projection_path);
            std::fs::rename(&tmp_path, &self.projection_path)?;
        }
        Ok(())
    }

    /// Replays journal events newer than the loaded snapshot into the projection.
    ///
    /// Returns `(next_seq, next_tx_id)` for the producer; both equal
    /// `global_seq + 1` and the result is always `Some`. On a fresh start (no
    /// snapshot, empty journal) this yields `(1, 1)`, so sequence 0 is never
    /// published. Reusing the sequence as the tx id is safe because
    /// `TxProducer` never advances `tx_id` faster than `seq`, so no earlier tx
    /// id can be greater than `global_seq`.
    ///
    /// # Errors
    /// Propagates journal read errors and errors from applying archived events.
    fn catch_up_from_journal(
        &mut self,
        journal: &JournalHandler,
    ) -> anyhow::Result<Option<(u64, u64)>> {
        let after_seq = self.loaded_seq;
        let mut applied: u64 = 0;
        journal.recover_from::<Payload, _>(after_seq, |e, _eob| {
            applied += 1;
            self.projection.apply_archived(e)
        })?;
        if applied != 0 {
            println!(
                "[projection] catch-up: applied {} event(s) from journal",
                applied
            );
        }
        let next_seq = self.projection.global_seq.saturating_add(1);
        Ok(Some((next_seq, next_seq)))
    }
}
impl disrupt_rs::EventHandler<Event> for ProjectionHandler {
    /// Folds `e` into the projection and writes a snapshot whenever the new
    /// `global_seq` is a multiple of `snapshot_frequency`.
    ///
    /// Snapshot failures are logged rather than propagated. The journal still
    /// holds the events, and the next multiple tries again.
    fn on_event(&mut self, e: &Event, _seq: Sequence, _end_of_batch: bool) {
        self.projection.apply(e);

        let current = self.projection.global_seq;
        if !current.is_multiple_of(self.snapshot_frequency) {
            return;
        }

        if let Err(err) = self.store_snapshot_atomic() {
            println!(
                "[projection] snapshot error at global_seq={}: {err}",
                current
            );
        }
    }
}
/// Reason `TxProducer::try_publish_random` could not publish.
#[derive(Debug)]
enum PublishError {
    /// A single-event publish found the ring buffer full.
    RingBufferFull(RingBufferFull),
    /// A batch publish could not claim enough free slots.
    MissingFreeSlots(MissingFreeSlots),
}
impl std::fmt::Display for PublishError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            PublishError::RingBufferFull(e) => write!(f, "{e}"),
            PublishError::MissingFreeSlots(e) => write!(f, "{e}"),
        }
    }
}
// Lets the error flow through `?` into `anyhow::Error` and other `dyn Error` consumers.
impl std::error::Error for PublishError {}
impl From<RingBufferFull> for PublishError {
    fn from(e: RingBufferFull) -> Self {
        Self::RingBufferFull(e)
    }
}
impl From<MissingFreeSlots> for PublishError {
    fn from(e: MissingFreeSlots) -> Self {
        Self::MissingFreeSlots(e)
    }
}
/// Wraps a disruptor [`Producer`] and stamps every published slot with a
/// monotonically increasing sequence number plus transaction metadata.
struct TxProducer<P> {
    /// Underlying disruptor producer.
    inner: P,
    /// Sequence number for the next event (also written to `payload.value`).
    next_seq: u64,
    /// Transaction id for the next publish; one id per single event or per batch.
    next_tx_id: u64,
    /// Drives the single-vs-batch choice in `try_publish_random`.
    rng: SmallRng,
}
impl<P> TxProducer<P>
where
    P: Producer<Event>,
{
    /// Creates a producer that starts numbering at `next_seq` / `next_tx_id`,
    /// with an RNG deterministically seeded from `seed`.
    fn new(inner: P, seed: u64, next_seq: u64, next_tx_id: u64) -> Self {
        Self {
            inner,
            next_seq,
            next_tx_id,
            rng: SmallRng::seed_from_u64(seed),
        }
    }

    /// Publishes one event as a single-event transaction (`tx_len = 1`, `tx_ix = 0`).
    ///
    /// On success, advances both counters by one and returns the sequence
    /// reported by the inner producer. On error the counters are untouched.
    fn try_publish(&mut self) -> Result<Sequence, RingBufferFull> {
        let seq = self.next_seq;
        let tx_id = self.next_tx_id;
        let published = self.inner.try_publish(move |e| {
            e.seq = seq;
            e.payload.value = seq;
            e.tx_id = tx_id;
            e.tx_len = 1;
            e.tx_ix = 0;
        })?;
        self.next_seq += 1;
        self.next_tx_id += 1;
        Ok(published)
    }

    /// Publishes `n` consecutive events as one transaction: shared `tx_id`,
    /// `tx_len = n`, `tx_ix` running `0..n`.
    ///
    /// `n == 0` is a no-op that returns `Sequence::default()`. On success,
    /// `next_seq` advances by `n` and `next_tx_id` by one. On error the
    /// counters are untouched.
    ///
    /// # Panics
    /// If `n > u16::MAX`. `tx_len` and `tx_ix` are `u16`, so such a batch cannot
    /// be described consistently.
    fn try_batch_publish(&mut self, n: usize) -> Result<Sequence, MissingFreeSlots> {
        if n == 0 {
            return Ok(Sequence::default());
        }
        let Ok(tx_len) = u16::try_from(n) else {
            panic!("batch of {n} events exceeds the u16 transaction-length limit");
        };
        let start_seq = self.next_seq;
        let tx_id = self.next_tx_id;
        let published = self.inner.try_batch_publish(n, move |iter| {
            for (ix, e) in iter.enumerate() {
                // `ix < n <= u16::MAX`, so neither cast below truncates.
                let seq = start_seq + ix as u64;
                e.seq = seq;
                e.payload.value = seq;
                e.tx_id = tx_id;
                e.tx_len = tx_len;
                e.tx_ix = ix as u16;
            }
        })?;
        self.next_seq += n as u64;
        self.next_tx_id += 1;
        Ok(published)
    }

    /// Publishes either a single event (80%) or a batch of 2..=8 events (20%).
    ///
    /// Returns the number of events published.
    fn try_publish_random(&mut self) -> Result<usize, PublishError> {
        let p: u8 = self.rng.random_range(0..100);
        if p < 80 {
            self.try_publish()
                .map(|_| 1)
                .map_err(PublishError::RingBufferFull)
        } else {
            let batch_n = self.rng.random_range(2..9);
            self.try_batch_publish(batch_n)
                .map(|_| batch_n)
                .map_err(PublishError::MissingFreeSlots)
        }
    }
}
/// Wrapper around [`ProjectionHandler`] that records betex metrics for each
/// handled event: an input count, the duration of the inner handler and an
/// output count.
struct InstrumentedProjectionHandler {
    /// The handler doing the actual projection work.
    inner: ProjectionHandler,
}
impl disrupt_rs::EventHandler<Event> for InstrumentedProjectionHandler {
    fn on_event(&mut self, e: &Event, seq: Sequence, end_of_batch: bool) {
        // Presumably selects the stage the following `metric!` calls are
        // attributed to. "projection" is one of the stage names passed to
        // `btx_metrics_init!` in `main`.
        betex::metrics_stage!("projection");
        betex::metric!({ in_total: 1 });
        let t0 = Instant::now();
        self.inner.on_event(e, seq, end_of_batch);
        // The duration covers only the inner handler, including any snapshot
        // write it performs.
        betex::metric!({
            duration_ns: t0.elapsed().as_nanos() as u64,
            out_total: 1
        });
    }
}
fn main() -> anyhow::Result<()> {
let shutdown = Arc::new(AtomicBool::new(false));
let core_ids = core_affinity::get_core_ids().unwrap_or_default();
if core_ids.len() < 3 {
anyhow::bail!(
"need at least 3 cores for the pinned sample (have {}); \
set pinning to None or run on a bigger machine",
core_ids.len()
);
}
let producer_core = Some(core_ids[0].id);
let wal_poller_core = Some(core_ids[1].id);
let projection_core = Some(core_ids[2].id);
#[cfg(unix)]
{
use signal_hook::consts::{SIGINT, SIGTERM};
use signal_hook::flag;
flag::register(SIGTERM, Arc::clone(&shutdown)).expect("register SIGTERM");
flag::register(SIGINT, Arc::clone(&shutdown)).expect("register SIGINT");
}
let (start_seq, start_tx_id, projection_handler, journal) = {
println!("[recovery] opening lmdb at {WAL_PATH} db={WAL_NAME}");
let journal = JournalHandler::open(WAL_PATH, WAL_NAME, JournalConfig::default())?;
let mut handler = ProjectionHandler::load(PROJECTION_PATH)?;
println!("[recovery] projection state {handler:?}");
let start = handler.catch_up_from_journal(&journal)?;
let (start_seq, start_tx_id) = start.unwrap_or((0, 0));
(start_seq, start_tx_id, handler, journal)
};
betex::btx_metrics_init!("publisher", "wal_poller", "projection");
let stage_metrics = globals::ids();
let projection_handler = InstrumentedProjectionHandler {
inner: projection_handler,
};
let wait_strategy = BusySpin;
let (wal_poller, builder) = build_single_producer(
8192 * 4,
|| {
CachePadded::new(Envelope {
seq: 0,
payload: Payload { value: 0 },
response_cb: None,
tx_id: 0,
tx_len: 1,
tx_ix: 0,
})
},
wait_strategy,
)
.event_poller();
let (builder, gates) = builder.new_gates::<1>();
let wal_gate = gates[0].clone();
let producer = maybe_pin(builder.and_then_with_dependents(&gates), projection_core)
.handle_events_with(projection_handler)
.build();
let shutdown_pub = Arc::clone(&shutdown);
let publisher_thread = thread::spawn(move || {
maybe_pin_current(producer_core);
betex::metrics_stage!("publisher");
let seed = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos() as u64;
let mut producer = TxProducer::new(producer, seed, start_seq, start_tx_id);
let period = Duration::from_micros(1);
let mut next_tick = Instant::now() + period;
while !shutdown_pub.load(Ordering::Relaxed) {
betex::metric!({ inflight: 1, in_total: 1 });
let t0 = Instant::now();
let published_count = match producer.try_publish_random() {
Ok(n) => n,
Err(e) => {
betex::metric!({
backpressure_total: 1,
err_total: 1,
inflight: 0,
duration_ns: t0.elapsed().as_nanos() as u64
});
println!("publisher blocked, err: {e}");
thread::yield_now();
continue;
}
};
betex::metric!({
inflight: 0,
duration_ns: t0.elapsed().as_nanos() as u64,
out_total: published_count as u64
});
for _ in 0..published_count {
next_tick += period;
}
let now = Instant::now();
if next_tick > now {
thread::sleep(next_tick - now);
} else {
next_tick = now;
}
}
println!("terminate signal received");
});
let shutdown_metrics = Arc::clone(&shutdown);
let metrics_reporter = {
let metrics = globals::metrics();
thread::spawn(move || {
let mut prev = metrics.snapshot();
while !shutdown_metrics.load(Ordering::Relaxed) {
thread::sleep(Duration::from_secs(1));
let now = metrics.snapshot();
println!("[metrics] 1s delta:");
for (stage_ix, stage_name) in now.schema.stages.iter().enumerate() {
let in_ix = stage_metrics.in_total.0 as usize;
let out_ix = stage_metrics.out_total.0 as usize;
let err_ix = stage_metrics.err_total.0 as usize;
let dp_in = now.counter_values[stage_ix][in_ix]
.saturating_sub(prev.counter_values[stage_ix][in_ix]);
let dp_out = now.counter_values[stage_ix][out_ix]
.saturating_sub(prev.counter_values[stage_ix][out_ix]);
let dp_err = now.counter_values[stage_ix][err_ix]
.saturating_sub(prev.counter_values[stage_ix][err_ix]);
if dp_in != 0 || dp_out != 0 || dp_err != 0 {
println!(" stage={stage_name} in={dp_in}/s out={dp_out}/s err={dp_err}/s");
}
}
prev = now;
}
})
};
let wal_thread = journal
.into_poller(wal_poller, wait_strategy, wal_gate, Arc::clone(&shutdown))
.poll_pinned(wal_poller_core);
publisher_thread
.join()
.expect("publisher thread should not panic");
wal_thread
.join()
.map_err(|_| anyhow::anyhow!("wal thread panicked"))??;
let _ = metrics_reporter.join();
Ok(())
}