uni_fork/maintenance.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Background fork-maintenance tasks: the TTL sweeper (Phase 4a) and the
5//! fork-local index builder (Phase 5a-impl Step 7).
6//!
7//! Both tasks own a generic *scheduling skeleton* here — an interval ticker
8//! with `MissedTickBehavior::Skip` plus a `ShutdownHandle::subscribe()`-driven
9//! `tokio::select!` loop — and delegate every per-tick action that touches
10//! `UniInner` to a [`ForkMaintenanceHost`]. uni-db implements the host over a
11//! `Weak<UniInner>`, so the tasks do not extend the database's lifetime.
12//!
13//! ## Why a tick-body trait rather than field accessors
14//!
15//! The original tasks walked `UniInner.fork_inners` (a
16//! `DashMap<ForkId, Weak<UniInner>>`) and upgraded each entry to read its
17//! storage/writer. Exposing those handles across the crate boundary would pull
18//! `UniInner` into uni-fork and create a dependency cycle. Instead the host
19//! exposes the two tick bodies (`sweep_expired_forks`,
20//! `build_fork_local_indexes`) as async methods; the reusable
21//! scheduling/shutdown machinery lives here.
22
23use std::sync::Arc;
24use std::time::Duration;
25
26use tokio::sync::broadcast;
27use tracing::debug;
28
29/// Host hook for the background fork-maintenance tasks.
30///
31/// Implemented by uni-db (over a `Weak<UniInner>` re-wrapped per tick). Each
32/// method is one full tick body; uni-fork owns only the interval/shutdown loop.
33#[async_trait::async_trait]
34pub trait ForkMaintenanceHost: Send + Sync + 'static {
35 /// One TTL-sweeper tick: drop every fork whose TTL has expired.
36 async fn sweep_expired_forks(&self);
37
38 /// One index-builder tick: build fork-local scalar indexes for any
39 /// (fork, label, column) whose fragment count crosses `threshold`.
40 async fn build_fork_local_indexes(&self, threshold: u64);
41}
42
43/// Spawn an interval-driven background task that runs `tick_fn` every
44/// `interval` and exits on the shutdown broadcast.
45///
46/// The ticker uses `MissedTickBehavior::Skip` so a long tick body doesn't
47/// trigger a thundering catch-up burst on the next tick. `task_label` names
48/// the task in shutdown-debug logs. Returns `None` (no task) when `disable`
49/// is set.
50fn spawn_ticker<F, Fut>(
51 interval: Duration,
52 disable: bool,
53 task_label: &'static str,
54 mut shutdown_rx: broadcast::Receiver<()>,
55 mut tick_fn: F,
56) -> Option<tokio::task::JoinHandle<()>>
57where
58 F: FnMut() -> Fut + Send + 'static,
59 Fut: std::future::Future<Output = ()> + Send,
60{
61 if disable {
62 debug!("{task_label} disabled by config");
63 return None;
64 }
65 let handle = tokio::spawn(async move {
66 let mut ticker = tokio::time::interval(interval);
67 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
68 loop {
69 tokio::select! {
70 _ = ticker.tick() => {
71 tick_fn().await;
72 }
73 _ = shutdown_rx.recv() => {
74 debug!("{task_label} received shutdown signal");
75 break;
76 }
77 }
78 }
79 });
80 Some(handle)
81}
82
83/// Spawn the TTL sweeper task. Returns `None` (no task) when `disable` is set.
84///
85/// Holds the host (typically backed by a `Weak<UniInner>`) so the sweeper does
86/// not extend the database's lifetime.
87pub fn spawn_sweeper<H: ForkMaintenanceHost>(
88 host: Arc<H>,
89 interval: Duration,
90 disable: bool,
91 shutdown_rx: broadcast::Receiver<()>,
92) -> Option<tokio::task::JoinHandle<()>> {
93 spawn_ticker(interval, disable, "fork sweeper", shutdown_rx, move || {
94 let host = Arc::clone(&host);
95 async move { host.sweep_expired_forks().await }
96 })
97}
98
99/// Spawn the fork-local index builder task. Returns `None` when `disable` is set.
100pub fn spawn_index_builder<H: ForkMaintenanceHost>(
101 host: Arc<H>,
102 interval: Duration,
103 threshold: u64,
104 disable: bool,
105 shutdown_rx: broadcast::Receiver<()>,
106) -> Option<tokio::task::JoinHandle<()>> {
107 spawn_ticker(
108 interval,
109 disable,
110 "fork index builder",
111 shutdown_rx,
112 move || {
113 let host = Arc::clone(&host);
114 async move { host.build_fork_local_indexes(threshold).await }
115 },
116 )
117}