use std::sync::Arc;
use std::time::Duration;
use tracing::info;
use crate::ServerConfig;
use crate::control::state::SharedState;
use crate::event::bus::EventConsumerRx;
use crate::event::trigger::TriggerDlq;
use crate::event::watermark::WatermarkStore;
use crate::wal::WalManager;
/// Inputs that `spawn_background_loops` destructures and forwards to
/// `crate::event::EventPlane::spawn`.
pub struct EventPlaneComponents {
/// WAL manager handle; `spawn_background_loops` passes a clone of this `Arc`
/// to the event plane.
pub wal: Arc<WalManager>,
/// Event-bus receivers handed to the event plane as its consumers.
pub event_consumers: Vec<EventConsumerRx>,
/// Watermark store moved into the event plane.
pub watermark_store: Arc<WatermarkStore>,
/// Trigger DLQ (presumably a dead-letter queue for failed trigger work —
/// confirm), shared behind a `std::sync::Mutex` and moved into the event plane.
pub trigger_dlq: Arc<std::sync::Mutex<TriggerDlq>>,
}
/// Logs the restart decision for every resumable mirror in the catalog.
///
/// For each entry returned by `crate::control::mirror::enumerate_resumable_mirrors`
/// an info event records the database name, the LSN the observer link will
/// resume from, and whether a bootstrap is required.
///
/// Returns immediately when the credential store exposes no catalog. If
/// enumeration fails, a warning is logged and nothing else happens.
pub fn log_mirror_restart_decisions(shared: &Arc<crate::control::state::SharedState>) {
    let Some(catalog) = shared.credentials.catalog() else {
        return;
    };

    let decisions = match crate::control::mirror::enumerate_resumable_mirrors(catalog) {
        Ok(found) => found,
        Err(e) => {
            tracing::warn!(error = %e, "mirror restart: failed to enumerate mirrors; skipping");
            return;
        }
    };

    for decision in &decisions {
        tracing::info!(
            database = %decision.database_name,
            resume_lsn = decision.resume_from_lsn,
            needs_bootstrap = decision.needs_bootstrap,
            "mirror restart: observer link will resume"
        );
    }
}
#[must_use = "EventPlane must be held for the server's lifetime; dropping it stops all event consumers"]
pub fn spawn_background_loops(
shared: &Arc<SharedState>,
components: EventPlaneComponents,
config: &ServerConfig,
num_cores: usize,
shutdown_rx: tokio::sync::watch::Receiver<bool>,
) -> crate::event::EventPlane {
let EventPlaneComponents {
wal,
event_consumers,
watermark_store,
trigger_dlq,
} = components;
log_mirror_restart_decisions(shared);
crate::control::event_trigger::spawn_event_trigger_processor(Arc::clone(shared));
{
let shared_mirror = Arc::clone(shared);
crate::control::shutdown::spawn_loop(
&shared.loop_registry,
&shared.shutdown,
"mirror_lag_monitor",
move |mut shutdown| async move {
let mut tick = tokio::time::interval(Duration::from_secs(5));
loop {
tokio::select! {
_ = shutdown.wait_cancelled() => break,
_ = tick.tick() => {}
}
if shutdown.is_cancelled() {
break;
}
let catalog = match shared_mirror.credentials.catalog() {
Some(c) => c,
None => continue,
};
let databases = match catalog.list_databases() {
Ok(d) => d,
Err(e) => {
tracing::warn!(error = %e, "mirror_lag_monitor: catalog list error");
continue;
}
};
for db in databases {
let origin = match db.mirror_origin.as_ref() {
Some(o) => o,
None => continue,
};
if matches!(origin.status, nodedb_types::MirrorStatus::Promoted) {
continue;
}
let last_received =
shared_mirror.mirror_link_registry.last_received_ms(db.id);
crate::control::mirror::update_lag_status(
catalog,
db.id,
&db.name,
&origin.status,
last_received,
false,
&shared_mirror.database_metrics,
);
}
}
},
);
info!("mirror lag monitor running");
}
shared.webhook_manager.set_state(Arc::clone(shared));
let event_plane = crate::event::EventPlane::spawn(
event_consumers,
Arc::clone(&wal),
watermark_store,
Arc::clone(shared),
trigger_dlq,
Arc::clone(&shared.cdc_router),
Arc::clone(&shared.shutdown),
);
info!(num_cores, "event plane running");
if let Ok(mut w) = shared.retention_settings.write() {
*w = config.retention.clone();
}
let _collection_gc = crate::event::collection_gc::spawn_collection_gc(Arc::clone(shared));
info!(
retention_days = config.retention.deactivated_collection_retention_days,
sweep_interval_secs = config.retention.gc_sweep_interval_secs,
"collection-gc sweeper running"
);
let _l2_cleanup = crate::event::collection_gc::spawn_l2_cleanup(Arc::clone(shared));
let shared_rate = Arc::clone(shared);
crate::control::shutdown::spawn_loop(
&shared.loop_registry,
&shared.shutdown,
"tenant_rate_reset",
move |mut shutdown| async move {
let mut tick = tokio::time::interval(Duration::from_secs(1));
loop {
tokio::select! {
_ = shutdown.wait_cancelled() => break,
_ = tick.tick() => shared_rate.reset_tenant_rate_counters(),
}
}
},
);
crate::control::security::sessions::spawn_idle_sweep_loop(shared);
info!("idle session sweep loop running");
let shared_audit = Arc::clone(shared);
crate::control::shutdown::spawn_loop(
&shared.loop_registry,
&shared.shutdown,
"audit_log_flush",
move |mut shutdown| async move {
let mut tick = tokio::time::interval(Duration::from_secs(10));
loop {
tokio::select! {
_ = shutdown.wait_cancelled() => break,
_ = tick.tick() => shared_audit.flush_audit_log(),
}
}
},
);
let shared_mem = Arc::clone(shared);
crate::control::shutdown::spawn_loop(
&shared.loop_registry,
&shared.shutdown,
"tenant_memory_estimate",
move |mut shutdown| async move {
let mut tick = tokio::time::interval(Duration::from_secs(30));
loop {
tokio::select! {
_ = shutdown.wait_cancelled() => break,
_ = tick.tick() => shared_mem.update_tenant_memory_estimates(),
}
}
},
);
let shared_ckpt = Arc::clone(shared);
let shutdown_rx_ckpt = shutdown_rx.clone();
crate::control::checkpoint_manager::spawn_checkpoint_task(
shared_ckpt,
num_cores,
config.checkpoint.to_manager_config(),
shutdown_rx_ckpt,
);
let _metering_flush = crate::control::security::metering::counter::spawn_flush_task(
Arc::clone(&shared.usage_counter),
Arc::clone(&shared.usage_store),
60,
);
{
let shared_sweep = Arc::clone(shared);
let sweep_ms = std::env::var("NODEDB_CLONE_SWEEP_INTERVAL_MS")
.ok()
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(30_000);
let sweep_interval = Duration::from_millis(sweep_ms);
crate::control::shutdown::spawn_loop(
&shared.loop_registry,
&shared.shutdown,
"clone_materializer_sweep",
move |mut shutdown| async move {
let mut tick = tokio::time::interval(sweep_interval);
loop {
tokio::select! {
_ = shutdown.wait_cancelled() => break,
_ = tick.tick() => {}
}
if shutdown.is_cancelled() {
break;
}
let state_for_sweep = Arc::clone(&shared_sweep);
let result = tokio::task::spawn_blocking(move || {
let Some(catalog) = state_for_sweep.credentials.catalog() else {
return;
};
let cancel = std::sync::atomic::AtomicBool::new(false);
if let Err(e) =
crate::control::maintenance::clone_materializer::run_scheduled_sweep(
&state_for_sweep,
catalog,
&cancel,
)
{
tracing::warn!(error = %e, "clone materializer sweep error");
}
})
.await;
if let Err(e) = result {
tracing::warn!(error = %e, "clone materializer sweep task panicked");
}
}
},
);
info!(
interval_ms = sweep_ms,
"clone materializer background sweep running"
);
}
if let Some(ref cold_settings) = config.cold_storage {
let shared_cold = Arc::clone(shared);
let cold_settings_clone = cold_settings.clone();
let data_dir_clone = config.server.data_dir.clone();
let shutdown_rx_cold = shutdown_rx.clone();
crate::control::cold_tier::spawn_cold_tier_task(
shared_cold,
cold_settings_clone,
data_dir_clone,
shutdown_rx_cold,
);
info!("cold tier task spawned");
}
event_plane
}
pub fn spawn_response_poller(shared: &Arc<SharedState>) {
let shared_poller = Arc::clone(shared);
crate::control::shutdown::spawn_loop(
&shared.loop_registry,
&shared.shutdown,
"response_poller",
move |shutdown| async move {
let mut idle_iters: u32 = 0;
loop {
if shutdown.is_cancelled() {
break;
}
let routed = shared_poller.poll_and_route_responses();
if routed > 0 {
idle_iters = 0;
tokio::task::yield_now().await;
continue;
}
idle_iters = idle_iters.saturating_add(1);
if idle_iters <= 256 {
tokio::task::yield_now().await;
} else if idle_iters <= 1024 {
tokio::time::sleep(std::time::Duration::from_millis(1)).await;
} else {
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
}
},
);
}