use std::panic::{AssertUnwindSafe, catch_unwind};
use std::path::Path;
use std::sync::Arc;
use std::thread::JoinHandle;
use std::time::Instant;
use tracing::{error, info, warn};
use crate::bridge::dispatch::{BridgeRequest, BridgeResponse};
use crate::bridge::envelope::{ErrorCode, Payload, Response, Status};
use crate::data::eventfd::{EventFd, EventFdNotifier};
use crate::data::executor::core_loop::CoreLoop;
use nodedb_bridge::buffer::{Consumer, Producer};
/// Max time (ms) the core blocks waiting on its eventfd before waking to run
/// periodic housekeeping (checkpointing, maintenance) even with no requests.
const IDLE_POLL_TIMEOUT_MS: i32 = 100;
/// Number of panics (counted within `PANIC_WINDOW_SECS`, reset by any
/// successful tick) that flips a core into degraded mode.
const MAX_CONSECUTIVE_PANICS: u32 = 3;
/// Width (seconds) of the panic-counting window; panics older than this
/// restart the count rather than accumulate.
const PANIC_WINDOW_SECS: u64 = 60;
/// How long (seconds) a degraded core keeps rejecting all requests before the
/// watchdog lets it resume normal processing.
const DEGRADED_COOLDOWN_SECS: u64 = 30;
/// Tracks panics caught in the core's tick loop and decides when the core
/// must stop serving requests (degraded mode) and when it may recover after
/// a cool-down. All timing is based on monotonic `Instant`s.
struct CoreHealthWatchdog {
    /// Panics observed since the last successful tick or window reset.
    consecutive_panics: u32,
    /// Start of the current panic-counting window; `None` until a panic occurs.
    window_start: Option<Instant>,
    /// When true, the core rejects all requests instead of processing them.
    degraded: bool,
    /// When degraded mode was entered; used to time the cool-down period.
    degraded_at: Option<Instant>,
}
impl CoreHealthWatchdog {
    /// Creates a watchdog in the healthy state with no panics recorded.
    fn new() -> Self {
        Self {
            consecutive_panics: 0,
            window_start: None,
            degraded: false,
            degraded_at: None,
        }
    }

    /// Records one caught panic and returns whether the core is now degraded.
    ///
    /// Panics are counted inside a window of `PANIC_WINDOW_SECS`; if the
    /// current window has expired, the count restarts at this panic. Reaching
    /// `MAX_CONSECUTIVE_PANICS` flips the watchdog into degraded mode.
    fn record_panic(&mut self) -> bool {
        let now = Instant::now();
        if let Some(start) = self.window_start {
            if now.duration_since(start).as_secs() > PANIC_WINDOW_SECS {
                // Window expired: start a fresh count anchored at this panic.
                self.consecutive_panics = 0;
                self.window_start = Some(now);
            }
        } else {
            self.window_start = Some(now);
        }
        self.consecutive_panics += 1;
        if self.consecutive_panics >= MAX_CONSECUTIVE_PANICS {
            self.degraded = true;
            // Reuse the timestamp captured above (previously a second
            // `Instant::now()` call) so `degraded_at` is consistent with the
            // window bookkeeping and we avoid a redundant clock read.
            self.degraded_at = Some(now);
        }
        self.degraded
    }

    /// Notes a successful tick: any in-progress panic streak is cleared.
    /// Does NOT clear degraded mode — that only ends via the cool-down in
    /// `is_degraded`.
    fn record_success(&mut self) {
        if self.consecutive_panics > 0 {
            self.consecutive_panics = 0;
            self.window_start = None;
        }
    }

    /// Returns whether the core is currently degraded, transparently exiting
    /// degraded mode (and resetting all counters) once the cool-down of
    /// `DEGRADED_COOLDOWN_SECS` has elapsed.
    fn is_degraded(&mut self) -> bool {
        if self.degraded
            && let Some(degraded_at) = self.degraded_at
            && degraded_at.elapsed().as_secs() >= DEGRADED_COOLDOWN_SECS
        {
            info!(
                cooldown_secs = DEGRADED_COOLDOWN_SECS,
                "core recovered from degraded mode after cool-down"
            );
            self.degraded = false;
            self.degraded_at = None;
            self.consecutive_panics = 0;
            self.window_start = None;
            return false;
        }
        self.degraded
    }
}
/// Per-core tuning handed to each data-plane core at spawn time and forwarded
/// to the `CoreLoop` via `set_compaction_config` / `set_query_tuning`.
#[derive(Debug, Clone)]
pub struct CoreCompactionConfig {
    /// Interval between compaction runs (passed to `set_compaction_config`).
    pub interval: std::time::Duration,
    /// Tombstone ratio that triggers compaction — presumably the fraction of
    /// dead entries; confirm exact semantics against `CoreLoop`.
    pub tombstone_threshold: f64,
    /// Query-path tuning forwarded to the core via `set_query_tuning`.
    pub query: nodedb_types::config::tuning::QueryTuning,
}
impl Default for CoreCompactionConfig {
fn default() -> Self {
Self {
interval: std::time::Duration::from_secs(600),
tombstone_threshold: 0.2,
query: nodedb_types::config::tuning::QueryTuning::default(),
}
}
}
#[allow(clippy::too_many_arguments)]
pub fn spawn_core(
core_id: usize,
request_rx: Consumer<BridgeRequest>,
response_tx: Producer<BridgeResponse>,
data_dir: &Path,
wal_records: Arc<[nodedb_wal::WalRecord]>,
num_cores: usize,
compaction_config: CoreCompactionConfig,
system_metrics: Option<Arc<crate::control::metrics::SystemMetrics>>,
) -> std::io::Result<(JoinHandle<()>, EventFdNotifier)> {
let data_dir = data_dir.to_path_buf();
let efd = EventFd::new().map_err(std::io::Error::other)?;
let notifier = efd.notifier();
let handle = std::thread::Builder::new()
.name(format!("data-core-{core_id}"))
.spawn(move || {
match nodedb_mem::arena::pin_thread_arena(core_id as u32) {
Ok(arena) => info!(core_id, arena, "pinned to jemalloc arena"),
Err(e) => warn!(core_id, error = %e, "failed to pin jemalloc arena, continuing with default"),
}
let mut core = CoreLoop::open(core_id, request_rx, response_tx, &data_dir)
.expect("failed to open CoreLoop engines");
if let Some(m) = system_metrics {
core.set_metrics(m);
}
core.set_compaction_config(
compaction_config.interval,
compaction_config.tombstone_threshold,
);
core.set_query_tuning(compaction_config.query);
core.load_vector_checkpoints();
if !wal_records.is_empty() {
core.replay_vector_wal(&wal_records, num_cores);
core.replay_kv_wal(&wal_records, num_cores);
}
info!(core_id, "data plane core started (eventfd-driven)");
let mut watchdog = CoreHealthWatchdog::new();
let mut last_checkpoint = Instant::now();
const CHECKPOINT_INTERVAL: std::time::Duration = std::time::Duration::from_secs(300);
const MAX_TASKS_PER_ITERATION: usize = 256;
loop {
efd.poll_wait(IDLE_POLL_TIMEOUT_MS);
while efd.drain() > 0 {}
if watchdog.is_degraded() {
drain_and_reject(&mut core, core_id);
continue;
}
let mut tasks_processed = 0usize;
loop {
let result =
catch_unwind(AssertUnwindSafe(|| core.tick()));
match result {
Ok(0) => break, Ok(_) => {
watchdog.record_success();
tasks_processed += 1;
if tasks_processed >= MAX_TASKS_PER_ITERATION {
break; }
}
Err(panic_payload) => {
let msg = panic_message(&panic_payload);
error!(
core_id,
panic_count = watchdog.consecutive_panics + 1,
message = %msg,
"data plane core caught panic during tick"
);
let is_degraded = watchdog.record_panic();
if is_degraded {
error!(
core_id,
threshold = MAX_CONSECUTIVE_PANICS,
window_secs = PANIC_WINDOW_SECS,
"core entered DEGRADED mode — rejecting all requests"
);
drain_and_reject(&mut core, core_id);
}
break; }
}
}
if last_checkpoint.elapsed() >= CHECKPOINT_INTERVAL {
core.checkpoint_vector_indexes();
last_checkpoint = Instant::now();
}
core.maybe_run_maintenance();
}
})?;
Ok((handle, notifier))
}
/// Rejects every request queued on a degraded core with an `Internal` error
/// response, so callers get an explicit failure instead of a timeout.
fn drain_and_reject(core: &mut CoreLoop, core_id: usize) {
    // Pull anything still sitting in the shared ring into the local task
    // queue so that every pending request receives a rejection.
    core.drain_requests();
    while let Some(task) = core.task_queue.pop_front() {
        let reject = Response {
            request_id: task.request_id(),
            status: Status::Error,
            attempt: 1,
            partial: false,
            payload: Payload::empty(),
            watermark_lsn: core.watermark,
            error_code: Some(ErrorCode::Internal {
                detail: format!("core-{core_id} is degraded after repeated panics"),
            }),
        };
        let pushed = core.response_tx.try_push(BridgeResponse { inner: reject });
        if let Err(e) = pushed {
            // Best effort: if the response ring is full we only log.
            warn!(core_id, error = %e, "failed to send degraded-rejection response");
        }
    }
}
/// Extracts a human-readable message from a caught panic payload.
///
/// Panic payloads are almost always `&str` (from `panic!("literal")`) or
/// `String` (from `panic!("{..}", ..)`); anything else yields a placeholder.
///
/// Takes `&(dyn Any + Send)` rather than `&Box<dyn Any + Send>`
/// (clippy::borrowed_box): callers holding the `Box` from `catch_unwind`
/// still work unchanged via deref coercion.
fn panic_message(payload: &(dyn std::any::Any + Send)) -> String {
    if let Some(s) = payload.downcast_ref::<&str>() {
        (*s).to_string()
    } else if let Some(s) = payload.downcast_ref::<String>() {
        s.clone()
    } else {
        "non-string panic payload".to_string()
    }
}