Skip to main content

reddb_server/runtime/
lease_loop.rs

1//! Serverless writer-lease boot path (PLAN.md Phase 5 / W6).
2//!
3//! Boot-time entrypoint that opts the runtime into lease-fenced
4//! writes when:
5//!   * `RED_LEASE_REQUIRED=true` (or `1`) is set, and
6//!   * a remote backend is configured (S3, FS, HTTP, …).
7//!
8//! All transitions (acquire / refresh / lost / release) are
9//! delegated to `LeaseLifecycle`; this module only owns env-var
10//! parsing, lifecycle construction, and the refresh thread.
11//!
12//! The refresh thread uses `LeaseTimerWheel` instead of a fixed
13//! `thread::sleep(ttl_ms / 3)` polling loop. The wheel wakes the
14//! thread exactly when the next refresh is due and reschedules after
15//! each successful refresh, so idle leases cost zero CPU.
16//!
17//! Env knobs:
//!   * `RED_LEASE_REQUIRED` — `true` / `yes` (case-insensitive) or `1` to enable. Default off.
19//!   * `RED_LEASE_TTL_SECS` — lease TTL in seconds. Default 60.
20//!   * `RED_LEASE_HOLDER_ID` — explicit holder id. Default
21//!     `<hostname>-<pid>`.
22//!   * `RED_LEASE_PREFIX` — backend prefix for the lease object key.
23//!     Default `leases/`.
24
25use std::sync::Arc;
26use std::thread;
27use std::time::{Duration, Instant};
28
29use crate::api::{RedDBError, RedDBResult};
30use crate::replication::lease::LeaseStore;
31use crate::runtime::lease_lifecycle::{LeaseLifecycle, MarkDraining};
32use crate::runtime::lease_timer_wheel::LeaseTimerWheel;
33use crate::runtime::RedDBRuntime;
34
35/// Try to start the writer-lease lifecycle if the operator opted in.
36/// Returns `Ok(())` when:
37///   * `RED_LEASE_REQUIRED` is unset / false, or
38///   * the lease was acquired and the refresh thread is running.
39///
40/// Returns `Err` when the operator asked for a lease and we couldn't
41/// get one — the caller should refuse to serve in that case.
42pub fn start_lease_loop_if_required(runtime: &RedDBRuntime) -> RedDBResult<()> {
43    if !lease_required() {
44        return Ok(());
45    }
46
47    let backend = runtime
48        .db()
49        .options()
50        .remote_backend_atomic
51        .clone()
52        .ok_or_else(|| {
53            RedDBError::Internal(
54                "RED_LEASE_REQUIRED=true but the configured backend does not support atomic \
55                 CAS — use s3, fs, or http with RED_HTTP_CONDITIONAL_WRITES=true"
56                    .to_string(),
57            )
58        })?;
59
60    let database_key = runtime
61        .db()
62        .options()
63        .remote_key
64        .clone()
65        .unwrap_or_else(|| "main".to_string());
66    let ttl_ms = lease_ttl_secs() * 1000;
67    let holder_id = lease_holder_id();
68    let prefix = std::env::var("RED_LEASE_PREFIX")
69        .ok()
70        .filter(|v| !v.trim().is_empty())
71        .unwrap_or_else(|| "leases/".to_string());
72
73    let store = Arc::new(LeaseStore::new(backend).with_prefix(prefix));
74    let runtime_for_drain = runtime.clone();
75    let mark_draining: MarkDraining = Arc::new(move || {
76        runtime_for_drain.lifecycle().mark_draining();
77    });
78    let lifecycle = Arc::new(LeaseLifecycle::new(
79        store,
80        runtime.write_gate_arc(),
81        runtime.audit_log_arc(),
82        mark_draining,
83        holder_id,
84        database_key,
85        ttl_ms,
86    ));
87    lifecycle.try_acquire()?;
88
89    // Stash the lifecycle on the runtime so admin handlers and the
90    // refresh thread share one instance. The OnceLock guarantees
91    // idempotency — re-entering the boot path (tests, double-init)
92    // returns Err and we drop the duplicate.
93    let lifecycle_for_runtime = Arc::clone(&lifecycle);
94    let _ = runtime.set_lease_lifecycle(lifecycle_for_runtime);
95
96    spawn_refresh_thread(runtime.clone(), lifecycle, ttl_ms);
97    Ok(())
98}
99
100fn spawn_refresh_thread(runtime: RedDBRuntime, lifecycle: Arc<LeaseLifecycle>, ttl_ms: u64) {
101    let interval = Duration::from_millis(ttl_ms.saturating_div(3).max(1_000));
102    // Timer wheel replaces thread::sleep(interval) — wakes exactly when the
103    // next refresh is due. Zero CPU during the idle window.
104    let wheel = Arc::new(LeaseTimerWheel::new(100));
105    wheel.schedule("lease-refresh".to_string(), Instant::now() + interval);
106
107    let wheel_for_handler = Arc::clone(&wheel);
108    let _ = thread::Builder::new()
109        .name("reddb-lease-refresh".into())
110        .spawn(move || {
111            wheel.run_until_shutdown(move |_id| {
112                // Bail out cleanly on shutdown so the holder thread does not
113                // refresh past the runtime's lifetime and confuse the next
114                // writer's acquire attempt.
115                let phase = runtime.lifecycle().phase();
116                if matches!(
117                    phase,
118                    crate::runtime::lifecycle::Phase::Draining
119                        | crate::runtime::lifecycle::Phase::ShuttingDown
120                        | crate::runtime::lifecycle::Phase::Stopped
121                ) {
122                    let _ = lifecycle.release();
123                    return false;
124                }
125
126                if lifecycle.refresh().is_err() {
127                    return false;
128                }
129
130                // Reschedule the next refresh.
131                wheel_for_handler.schedule("lease-refresh".to_string(), Instant::now() + interval);
132                true
133            });
134        });
135}
136
/// Report whether the operator opted into lease-fenced writes.
///
/// Reads `RED_LEASE_REQUIRED`, trims surrounding whitespace, and treats
/// `true` / `yes` (ASCII case-insensitive) or the exact string `1` as
/// enabled. An unset or unreadable variable, or any other value, means
/// disabled.
fn lease_required() -> bool {
    match std::env::var("RED_LEASE_REQUIRED") {
        Ok(raw) => {
            let flag = raw.trim();
            flag == "1"
                || ["true", "yes"]
                    .iter()
                    .any(|word| flag.eq_ignore_ascii_case(word))
        }
        Err(_) => false,
    }
}
146
/// Resolve the lease TTL in seconds from `RED_LEASE_TTL_SECS`.
///
/// The value is trimmed and parsed as a `u64`. An unset variable, a
/// value that does not parse, or `0` all fall back to the default of
/// 60 seconds.
fn lease_ttl_secs() -> u64 {
    const DEFAULT_TTL_SECS: u64 = 60;

    let raw = match std::env::var("RED_LEASE_TTL_SECS") {
        Ok(raw) => raw,
        Err(_) => return DEFAULT_TTL_SECS,
    };
    match raw.trim().parse::<u64>() {
        Ok(secs) if secs > 0 => secs,
        _ => DEFAULT_TTL_SECS,
    }
}
154
155fn lease_holder_id() -> String {
156    if let Some(explicit) = crate::utils::env_with_file_fallback("RED_LEASE_HOLDER_ID") {
157        return explicit;
158    }
159    let host = std::env::var("HOSTNAME")
160        .or_else(|_| std::env::var("HOST"))
161        .unwrap_or_else(|_| "unknown-host".to_string());
162    format!("{}-{}", host, std::process::id())
163}
164
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard};

    /// Serializes the tests in this module. They all mutate
    /// process-global environment variables, and the test harness runs
    /// tests on parallel threads, so without this lock the two TTL tests
    /// (and the two holder-id tests) race on the same variable.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Take the env lock, recovering it if an earlier test panicked
    /// while holding it so one failure does not cascade into the rest.
    fn env_guard() -> MutexGuard<'static, ()> {
        ENV_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Set an environment variable; the caller must hold `ENV_LOCK`.
    fn set_env(key: &str, value: &str) {
        // SAFETY: every test in this module mutates the environment only
        // while holding `ENV_LOCK`, so these writes are not concurrent
        // with each other. Tests in other modules are not covered by
        // this lock.
        unsafe {
            std::env::set_var(key, value);
        }
    }

    /// Remove an environment variable; the caller must hold `ENV_LOCK`.
    fn unset_env(key: &str) {
        // SAFETY: see `set_env` — callers hold `ENV_LOCK`.
        unsafe {
            std::env::remove_var(key);
        }
    }

    #[test]
    fn lease_required_parses_truthy_values() {
        let _env = env_guard();
        set_env("RED_LEASE_REQUIRED", "true");
        assert!(lease_required());
        set_env("RED_LEASE_REQUIRED", "1");
        assert!(lease_required());
        set_env("RED_LEASE_REQUIRED", "yes");
        assert!(lease_required());
        set_env("RED_LEASE_REQUIRED", "false");
        assert!(!lease_required());
        unset_env("RED_LEASE_REQUIRED");
        assert!(!lease_required());
    }

    #[test]
    fn ttl_defaults_to_60_when_unset() {
        let _env = env_guard();
        unset_env("RED_LEASE_TTL_SECS");
        assert_eq!(lease_ttl_secs(), 60);
    }

    #[test]
    fn ttl_rejects_zero_and_negative() {
        let _env = env_guard();
        set_env("RED_LEASE_TTL_SECS", "0");
        assert_eq!(lease_ttl_secs(), 60);
        // A negative value does not parse as `u64`, so the default applies.
        set_env("RED_LEASE_TTL_SECS", "-5");
        assert_eq!(lease_ttl_secs(), 60);
        set_env("RED_LEASE_TTL_SECS", "abc");
        assert_eq!(lease_ttl_secs(), 60);
        set_env("RED_LEASE_TTL_SECS", "30");
        assert_eq!(lease_ttl_secs(), 30);
        unset_env("RED_LEASE_TTL_SECS");
    }

    #[test]
    fn holder_id_falls_back_when_no_env() {
        let _env = env_guard();
        unset_env("RED_LEASE_HOLDER_ID");
        let id = lease_holder_id();
        assert!(id.contains('-'));
        assert!(!id.is_empty());
    }

    #[test]
    fn holder_id_uses_explicit_when_set() {
        let _env = env_guard();
        set_env("RED_LEASE_HOLDER_ID", "explicit-writer-1");
        assert_eq!(lease_holder_id(), "explicit-writer-1");
        unset_env("RED_LEASE_HOLDER_ID");
    }
}
240}