use std::sync::Arc;
use std::sync::atomic::Ordering;
use tracing::{debug, info};
use crate::bridge::envelope::PhysicalPlan;
use crate::bridge::physical_plan::TimeseriesOp;
use crate::control::state::SharedState;
use crate::types::{TenantId, TraceId, VShardId};
use nodedb_types::Lsn;
/// Record limit passed to `wal.replay_from_limit` for each catch-up cycle,
/// i.e. the size of one replay page.
const PAGE_SIZE: usize = 512;
/// Starts the background WAL catch-up task.
///
/// Seeds `shared.wal_catchup_lsn` with `initial_lsn` and then spawns a detached
/// Tokio task that repeatedly sleeps and runs one catch-up cycle. The sleep
/// before the first cycle is 500 ms; afterwards it is chosen from the outcome
/// of the previous cycle:
///
/// | outcome         | next delay |
/// |-----------------|------------|
/// | `HasMore`       | 100 ms     |
/// | `Dispatched`    | 250 ms     |
/// | `Backpressured` | 200 ms     |
/// | `Idle`          | 2000 ms    |
///
/// The task exits as soon as `shutdown.changed()` resolves. The received value
/// is not inspected, so any change notification (or the sender being dropped)
/// stops the task.
///
/// # Panics
///
/// `tokio::spawn` panics when called outside a Tokio runtime.
pub fn spawn_wal_catchup_task(
    shared: Arc<SharedState>,
    initial_lsn: Lsn,
    mut shutdown: tokio::sync::watch::Receiver<bool>,
) {
    // Publish the starting watermark before the task exists so the very first
    // cycle already reads it.
    let start = initial_lsn.as_u64();
    shared.wal_catchup_lsn.store(start, Ordering::Release);

    tokio::spawn(async move {
        // Delay before the next cycle, in milliseconds.
        let mut delay_ms = 500_u64;

        loop {
            let pause = std::time::Duration::from_millis(delay_ms);

            tokio::select! {
                _ = tokio::time::sleep(pause) => {
                    delay_ms = match run_catchup_cycle(&shared).await {
                        CatchupResult::HasMore => 100,
                        CatchupResult::Dispatched => 250,
                        CatchupResult::Backpressured => 200,
                        CatchupResult::Idle => 2000,
                    };
                }
                _ = shutdown.changed() => {
                    info!("WAL catch-up task shutting down");
                    break;
                }
            }
        }
    });
}
/// Outcome of a single catch-up cycle; the spawning loop uses it to pick the
/// delay before the next cycle.
enum CatchupResult {
/// Replay reported further records beyond the page that was just processed.
HasMore,
/// At least one record was dispatched and replay reported no further records.
Dispatched,
/// The cycle was skipped because `max_spsc_utilization()` exceeded 50.
Backpressured,
/// Nothing was dispatched and no further page is pending; also returned when
/// replay fails or yields an empty page.
Idle,
}
async fn run_catchup_cycle(shared: &SharedState) -> CatchupResult {
if shared.max_spsc_utilization() > 50 {
return CatchupResult::Backpressured;
}
let catchup_lsn = shared.wal_catchup_lsn.load(Ordering::Acquire);
let (records, has_more) = match shared
.wal
.replay_from_limit(Lsn::new(catchup_lsn + 1), PAGE_SIZE)
{
Ok(r) => r,
Err(e) => {
debug!(error = %e, lsn = catchup_lsn, "WAL catch-up replay failed");
return CatchupResult::Idle;
}
};
if records.is_empty() {
return CatchupResult::Idle;
}
let mut dispatched = 0usize;
let mut max_lsn = catchup_lsn;
for record in &records {
let record_type = nodedb_wal::record::RecordType::from_raw(record.logical_record_type());
if record_type != Some(nodedb_wal::record::RecordType::TimeseriesBatch) {
max_lsn = max_lsn.max(record.header.lsn);
continue;
}
let Ok((collection, payload)): Result<(String, Vec<u8>), _> =
zerompk::from_msgpack(&record.payload)
else {
max_lsn = max_lsn.max(record.header.lsn);
continue;
};
let tenant_id = TenantId::new(record.header.tenant_id);
let vshard_id = VShardId::new(record.header.vshard_id);
let plan = PhysicalPlan::Timeseries(TimeseriesOp::Ingest {
collection,
payload,
format: "ilp".to_string(),
wal_lsn: Some(record.header.lsn),
surrogates: Vec::new(),
});
match crate::control::server::dispatch_utils::dispatch_to_data_plane(
shared,
tenant_id,
vshard_id,
plan,
TraceId::ZERO,
)
.await
{
Ok(_) => {
dispatched += 1;
max_lsn = max_lsn.max(record.header.lsn);
}
Err(e) => {
debug!(error = %e, "WAL catch-up dispatch failed, will retry");
break;
}
}
}
if max_lsn > catchup_lsn {
shared.wal_catchup_lsn.fetch_max(max_lsn, Ordering::Release);
}
if dispatched > 0 {
info!(dispatched, max_lsn, "WAL catch-up cycle completed");
}
if has_more {
CatchupResult::HasMore
} else if dispatched > 0 {
CatchupResult::Dispatched
} else {
CatchupResult::Idle
}
}