Skip to main content

reddb_server/runtime/
lease_loop.rs

1//! Serverless writer-lease boot path (PLAN.md Phase 5 / W6).
2//!
3//! Boot-time entrypoint that opts the runtime into lease-fenced
4//! writes when:
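//!   * `RED_LEASE_REQUIRED` is set to a truthy value (`true`, `1`, or `yes`;
//!     surrounding whitespace and ASCII case are ignored), and
//!   * a remote backend with atomic compare-and-swap support is configured
//!     (S3, FS, or HTTP with `RED_HTTP_CONDITIONAL_WRITES=true`).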
7//!
8//! All transitions (acquire / refresh / lost / release) are
9//! delegated to `LeaseLifecycle`; this module only owns env-var
10//! parsing, lifecycle construction, and the refresh thread.
11//!
12//! The refresh thread uses `LeaseTimerWheel` instead of a fixed
13//! `thread::sleep(ttl_ms / 3)` polling loop. The wheel wakes the
14//! thread exactly when the next refresh is due and reschedules after
15//! each successful refresh, so idle leases cost zero CPU.
16//!
17//! Env knobs:
//!   * `RED_LEASE_REQUIRED` — `true` / `1` / `yes` (case-insensitive) to enable. Default off.
19//!   * `RED_LEASE_TTL_SECS` — lease TTL in seconds. Default 60.
20//!   * `RED_LEASE_HOLDER_ID` — explicit holder id. Default
21//!     `<hostname>-<pid>`.
22//!   * `RED_LEASE_PREFIX` — backend prefix for the lease object key.
23//!     Default `leases/`.
24
25use std::sync::Arc;
26use std::thread;
27use std::time::{Duration, Instant};
28
29use crate::api::{RedDBError, RedDBResult};
30use crate::replication::lease::LeaseStore;
31use crate::runtime::lease_lifecycle::{LeaseLifecycle, MarkDraining};
32use crate::runtime::lease_timer_wheel::LeaseTimerWheel;
33use crate::runtime::RedDBRuntime;
34
35/// Try to start the writer-lease lifecycle if the operator opted in.
36/// Returns `Ok(())` when:
37///   * `RED_LEASE_REQUIRED` is unset / false, or
38///   * the lease was acquired and the refresh thread is running.
39///
40/// Returns `Err` when the operator asked for a lease and we couldn't
41/// get one — the caller should refuse to serve in that case.
42pub fn start_lease_loop_if_required(runtime: &RedDBRuntime) -> RedDBResult<()> {
43    if !lease_required() {
44        return Ok(());
45    }
46
47    let backend = runtime
48        .db()
49        .options()
50        .remote_backend_atomic
51        .clone()
52        .ok_or_else(|| {
53            RedDBError::Internal(
54                "RED_LEASE_REQUIRED=true but the configured backend does not support atomic \
55                 CAS — use s3, fs, or http with RED_HTTP_CONDITIONAL_WRITES=true"
56                    .to_string(),
57            )
58        })?;
59
60    let database_key = runtime
61        .db()
62        .options()
63        .remote_key
64        .clone()
65        .unwrap_or_else(|| "main".to_string());
66    let ttl_ms = lease_ttl_secs() * 1000;
67    let holder_id = lease_holder_id();
68    let prefix = std::env::var("RED_LEASE_PREFIX")
69        .ok()
70        .filter(|v| !v.trim().is_empty())
71        .unwrap_or_else(|| "leases/".to_string());
72
73    let store = Arc::new(LeaseStore::new(backend).with_prefix(prefix));
74    let runtime_for_drain = runtime.clone();
75    let mark_draining: MarkDraining = Arc::new(move || {
76        runtime_for_drain.lifecycle().mark_draining();
77    });
78    let runtime_for_term = runtime.clone();
79    let current_term = Arc::new(move || runtime_for_term.current_replication_term());
80    let lifecycle = Arc::new(LeaseLifecycle::new(
81        store,
82        runtime.write_gate_arc(),
83        runtime.audit_log_arc(),
84        mark_draining,
85        current_term,
86        holder_id,
87        database_key,
88        ttl_ms,
89    ));
90    lifecycle.try_acquire()?;
91
92    // Stash the lifecycle on the runtime so admin handlers and the
93    // refresh thread share one instance. The OnceLock guarantees
94    // idempotency — re-entering the boot path (tests, double-init)
95    // returns Err and we drop the duplicate.
96    let lifecycle_for_runtime = Arc::clone(&lifecycle);
97    let _ = runtime.set_lease_lifecycle(lifecycle_for_runtime);
98
99    spawn_refresh_thread(runtime.clone(), lifecycle, ttl_ms);
100    Ok(())
101}
102
103fn spawn_refresh_thread(runtime: RedDBRuntime, lifecycle: Arc<LeaseLifecycle>, ttl_ms: u64) {
104    let interval = Duration::from_millis(ttl_ms.saturating_div(3).max(1_000));
105    // Timer wheel replaces thread::sleep(interval) — wakes exactly when the
106    // next refresh is due. Zero CPU during the idle window.
107    let wheel = Arc::new(LeaseTimerWheel::new(100));
108    wheel.schedule("lease-refresh".to_string(), Instant::now() + interval);
109
110    let wheel_for_handler = Arc::clone(&wheel);
111    let _ = thread::Builder::new()
112        .name("reddb-lease-refresh".into())
113        .spawn(move || {
114            wheel.run_until_shutdown(move |_id| {
115                // Bail out cleanly on shutdown so the holder thread does not
116                // refresh past the runtime's lifetime and confuse the next
117                // writer's acquire attempt.
118                let phase = runtime.lifecycle().phase();
119                if matches!(
120                    phase,
121                    crate::runtime::lifecycle::Phase::Draining
122                        | crate::runtime::lifecycle::Phase::ShuttingDown
123                        | crate::runtime::lifecycle::Phase::Stopped
124                ) {
125                    let _ = lifecycle.release();
126                    return false;
127                }
128
129                if lifecycle.refresh().is_err() {
130                    return false;
131                }
132
133                // Reschedule the next refresh.
134                wheel_for_handler.schedule("lease-refresh".to_string(), Instant::now() + interval);
135                true
136            });
137        });
138}
139
140fn lease_required() -> bool {
141    std::env::var("RED_LEASE_REQUIRED")
142        .ok()
143        .map(|v| {
144            let t = v.trim();
145            t.eq_ignore_ascii_case("true") || t == "1" || t.eq_ignore_ascii_case("yes")
146        })
147        .unwrap_or(false)
148}
149
150fn lease_ttl_secs() -> u64 {
151    std::env::var("RED_LEASE_TTL_SECS")
152        .ok()
153        .and_then(|s| s.trim().parse::<u64>().ok())
154        .filter(|v| *v > 0)
155        .unwrap_or(60)
156}
157
158fn lease_holder_id() -> String {
159    if let Some(explicit) = crate::utils::env_with_file_fallback("RED_LEASE_HOLDER_ID") {
160        return explicit;
161    }
162    let host = std::env::var("HOSTNAME")
163        .or_else(|_| std::env::var("HOST"))
164        .unwrap_or_else(|_| "unknown-host".to_string());
165    format!("{}-{}", host, std::process::id())
166}
167
168#[cfg(test)]
169mod tests {
170    use super::*;
171
172    #[test]
173    fn lease_required_parses_truthy_values() {
174        unsafe {
175            std::env::set_var("RED_LEASE_REQUIRED", "true");
176        }
177        assert!(lease_required());
178        unsafe {
179            std::env::set_var("RED_LEASE_REQUIRED", "1");
180        }
181        assert!(lease_required());
182        unsafe {
183            std::env::set_var("RED_LEASE_REQUIRED", "yes");
184        }
185        assert!(lease_required());
186        unsafe {
187            std::env::set_var("RED_LEASE_REQUIRED", "false");
188        }
189        assert!(!lease_required());
190        unsafe {
191            std::env::remove_var("RED_LEASE_REQUIRED");
192        }
193        assert!(!lease_required());
194    }
195
196    #[test]
197    fn ttl_defaults_to_60_when_unset() {
198        unsafe {
199            std::env::remove_var("RED_LEASE_TTL_SECS");
200        }
201        assert_eq!(lease_ttl_secs(), 60);
202    }
203
204    #[test]
205    fn ttl_rejects_zero_and_negative() {
206        unsafe {
207            std::env::set_var("RED_LEASE_TTL_SECS", "0");
208        }
209        assert_eq!(lease_ttl_secs(), 60);
210        unsafe {
211            std::env::set_var("RED_LEASE_TTL_SECS", "abc");
212        }
213        assert_eq!(lease_ttl_secs(), 60);
214        unsafe {
215            std::env::set_var("RED_LEASE_TTL_SECS", "30");
216        }
217        assert_eq!(lease_ttl_secs(), 30);
218        unsafe {
219            std::env::remove_var("RED_LEASE_TTL_SECS");
220        }
221    }
222
223    #[test]
224    fn holder_id_falls_back_when_no_env() {
225        unsafe {
226            std::env::remove_var("RED_LEASE_HOLDER_ID");
227        }
228        let id = lease_holder_id();
229        assert!(id.contains('-'));
230        assert!(!id.is_empty());
231    }
232
233    #[test]
234    fn holder_id_uses_explicit_when_set() {
235        unsafe {
236            std::env::set_var("RED_LEASE_HOLDER_ID", "explicit-writer-1");
237        }
238        assert_eq!(lease_holder_id(), "explicit-writer-1");
239        unsafe {
240            std::env::remove_var("RED_LEASE_HOLDER_ID");
241        }
242    }
243}