use std::sync::Arc;
use std::time::Duration;
use sqlx::SqlitePool;
use tokio::sync::{Mutex, watch};
use tokio::task::JoinSet;
use crate::reconcilers;
use crate::tx_util::now_ms;
#[derive(Clone, Debug)]
pub struct SqliteScannerConfig {
pub budget_reset_interval: Duration,
pub worker_registry_ttl_interval: Duration,
}
pub struct SqliteScannerHandle {
shutdown_tx: watch::Sender<bool>,
join_set: Arc<Mutex<JoinSet<()>>>,
}
impl SqliteScannerHandle {
pub async fn shutdown(&self, grace: Duration) -> usize {
let _ = self.shutdown_tx.send(true);
let mut js = self.join_set.lock().await;
let deadline = tokio::time::Instant::now() + grace;
let mut timed_out = 0usize;
while !js.is_empty() {
let remaining = deadline
.checked_duration_since(tokio::time::Instant::now())
.unwrap_or(Duration::ZERO);
if remaining.is_zero() {
timed_out = js.len();
js.abort_all();
break;
}
match tokio::time::timeout(remaining, js.join_next()).await {
Ok(Some(_res)) => continue,
Ok(None) => break,
Err(_) => {
timed_out = js.len();
js.abort_all();
break;
}
}
}
timed_out
}
}
impl Drop for SqliteScannerHandle {
fn drop(&mut self) {
let _ = self.shutdown_tx.send(true);
}
}
pub fn spawn_scanners(pool: SqlitePool, cfg: SqliteScannerConfig) -> SqliteScannerHandle {
let (tx, rx) = watch::channel(false);
let mut js = JoinSet::new();
if !cfg.budget_reset_interval.is_zero() {
spawn_reconciler(
&mut js,
rx.clone(),
pool.clone(),
cfg.budget_reset_interval,
"sqlite.budget_reset",
|pool| {
Box::pin(async move {
reconcilers::budget_reset::scan_tick(&pool, now_ms())
.await
.map(|_| ())
})
},
);
}
#[cfg(feature = "core")]
if !cfg.worker_registry_ttl_interval.is_zero() {
spawn_reconciler(
&mut js,
rx.clone(),
pool.clone(),
cfg.worker_registry_ttl_interval,
"sqlite.worker_registry_ttl_sweep",
|pool| {
Box::pin(async move {
crate::worker_registry::ttl_sweep_tick(&pool)
.await
.map(|_| ())
})
},
);
}
let scanners = js.len();
tracing::info!(
scanners,
"sqlite scanner supervisor spawned (RFC-023 Phase 3.5 budget_reset + RFC-025 Phase 4 worker_registry_ttl_sweep)"
);
SqliteScannerHandle {
shutdown_tx: tx,
join_set: Arc::new(Mutex::new(js)),
}
}
type TickFut = std::pin::Pin<
Box<dyn std::future::Future<Output = Result<(), ff_core::engine_error::EngineError>> + Send>,
>;
fn spawn_reconciler<F>(
js: &mut JoinSet<()>,
mut shutdown: watch::Receiver<bool>,
pool: SqlitePool,
interval: Duration,
name: &'static str,
tick: F,
) where
F: Fn(SqlitePool) -> TickFut + Send + Sync + 'static,
{
js.spawn(async move {
let mut tk = tokio::time::interval(interval);
tk.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
tk.tick().await;
loop {
tokio::select! {
res = shutdown.changed() => {
if res.is_err() || *shutdown.borrow() {
return;
}
}
_ = tk.tick() => {
if *shutdown.borrow() {
return;
}
if let Err(e) = tick(pool.clone()).await {
tracing::warn!(
scanner = name,
error = %e,
"sqlite reconciler tick failed"
);
}
}
}
}
});
}