reddb_server/runtime/
lease_loop.rs1use std::sync::Arc;
26use std::thread;
27use std::time::{Duration, Instant};
28
29use crate::api::{RedDBError, RedDBResult};
30use crate::replication::lease::LeaseStore;
31use crate::runtime::lease_lifecycle::{LeaseLifecycle, MarkDraining};
32use crate::runtime::lease_timer_wheel::LeaseTimerWheel;
33use crate::runtime::RedDBRuntime;
34
35pub fn start_lease_loop_if_required(runtime: &RedDBRuntime) -> RedDBResult<()> {
43 if !lease_required() {
44 return Ok(());
45 }
46
47 let backend = runtime
48 .db()
49 .options()
50 .remote_backend_atomic
51 .clone()
52 .ok_or_else(|| {
53 RedDBError::Internal(
54 "RED_LEASE_REQUIRED=true but the configured backend does not support atomic \
55 CAS — use s3, fs, or http with RED_HTTP_CONDITIONAL_WRITES=true"
56 .to_string(),
57 )
58 })?;
59
60 let database_key = runtime
61 .db()
62 .options()
63 .remote_key
64 .clone()
65 .unwrap_or_else(|| "main".to_string());
66 let ttl_ms = lease_ttl_secs() * 1000;
67 let holder_id = lease_holder_id();
68 let prefix = std::env::var("RED_LEASE_PREFIX")
69 .ok()
70 .filter(|v| !v.trim().is_empty())
71 .unwrap_or_else(|| "leases/".to_string());
72
73 let store = Arc::new(LeaseStore::new(backend).with_prefix(prefix));
74 let runtime_for_drain = runtime.clone();
75 let mark_draining: MarkDraining = Arc::new(move || {
76 runtime_for_drain.lifecycle().mark_draining();
77 });
78 let lifecycle = Arc::new(LeaseLifecycle::new(
79 store,
80 runtime.write_gate_arc(),
81 runtime.audit_log_arc(),
82 mark_draining,
83 holder_id,
84 database_key,
85 ttl_ms,
86 ));
87 lifecycle.try_acquire()?;
88
89 let lifecycle_for_runtime = Arc::clone(&lifecycle);
94 let _ = runtime.set_lease_lifecycle(lifecycle_for_runtime);
95
96 spawn_refresh_thread(runtime.clone(), lifecycle, ttl_ms);
97 Ok(())
98}
99
100fn spawn_refresh_thread(runtime: RedDBRuntime, lifecycle: Arc<LeaseLifecycle>, ttl_ms: u64) {
101 let interval = Duration::from_millis(ttl_ms.saturating_div(3).max(1_000));
102 let wheel = Arc::new(LeaseTimerWheel::new(100));
105 wheel.schedule("lease-refresh".to_string(), Instant::now() + interval);
106
107 let wheel_for_handler = Arc::clone(&wheel);
108 let _ = thread::Builder::new()
109 .name("reddb-lease-refresh".into())
110 .spawn(move || {
111 wheel.run_until_shutdown(move |_id| {
112 let phase = runtime.lifecycle().phase();
116 if matches!(
117 phase,
118 crate::runtime::lifecycle::Phase::Draining
119 | crate::runtime::lifecycle::Phase::ShuttingDown
120 | crate::runtime::lifecycle::Phase::Stopped
121 ) {
122 let _ = lifecycle.release();
123 return false;
124 }
125
126 if lifecycle.refresh().is_err() {
127 return false;
128 }
129
130 wheel_for_handler.schedule("lease-refresh".to_string(), Instant::now() + interval);
132 true
133 });
134 });
135}
136
137fn lease_required() -> bool {
138 std::env::var("RED_LEASE_REQUIRED")
139 .ok()
140 .map(|v| {
141 let t = v.trim();
142 t.eq_ignore_ascii_case("true") || t == "1" || t.eq_ignore_ascii_case("yes")
143 })
144 .unwrap_or(false)
145}
146
147fn lease_ttl_secs() -> u64 {
148 std::env::var("RED_LEASE_TTL_SECS")
149 .ok()
150 .and_then(|s| s.trim().parse::<u64>().ok())
151 .filter(|v| *v > 0)
152 .unwrap_or(60)
153}
154
155fn lease_holder_id() -> String {
156 if let Some(explicit) = crate::utils::env_with_file_fallback("RED_LEASE_HOLDER_ID") {
157 return explicit;
158 }
159 let host = std::env::var("HOSTNAME")
160 .or_else(|_| std::env::var("HOST"))
161 .unwrap_or_else(|_| "unknown-host".to_string());
162 format!("{}-{}", host, std::process::id())
163}
164
165#[cfg(test)]
166mod tests {
167 use super::*;
168
169 #[test]
170 fn lease_required_parses_truthy_values() {
171 unsafe {
172 std::env::set_var("RED_LEASE_REQUIRED", "true");
173 }
174 assert!(lease_required());
175 unsafe {
176 std::env::set_var("RED_LEASE_REQUIRED", "1");
177 }
178 assert!(lease_required());
179 unsafe {
180 std::env::set_var("RED_LEASE_REQUIRED", "yes");
181 }
182 assert!(lease_required());
183 unsafe {
184 std::env::set_var("RED_LEASE_REQUIRED", "false");
185 }
186 assert!(!lease_required());
187 unsafe {
188 std::env::remove_var("RED_LEASE_REQUIRED");
189 }
190 assert!(!lease_required());
191 }
192
193 #[test]
194 fn ttl_defaults_to_60_when_unset() {
195 unsafe {
196 std::env::remove_var("RED_LEASE_TTL_SECS");
197 }
198 assert_eq!(lease_ttl_secs(), 60);
199 }
200
201 #[test]
202 fn ttl_rejects_zero_and_negative() {
203 unsafe {
204 std::env::set_var("RED_LEASE_TTL_SECS", "0");
205 }
206 assert_eq!(lease_ttl_secs(), 60);
207 unsafe {
208 std::env::set_var("RED_LEASE_TTL_SECS", "abc");
209 }
210 assert_eq!(lease_ttl_secs(), 60);
211 unsafe {
212 std::env::set_var("RED_LEASE_TTL_SECS", "30");
213 }
214 assert_eq!(lease_ttl_secs(), 30);
215 unsafe {
216 std::env::remove_var("RED_LEASE_TTL_SECS");
217 }
218 }
219
220 #[test]
221 fn holder_id_falls_back_when_no_env() {
222 unsafe {
223 std::env::remove_var("RED_LEASE_HOLDER_ID");
224 }
225 let id = lease_holder_id();
226 assert!(id.contains('-'));
227 assert!(!id.is_empty());
228 }
229
230 #[test]
231 fn holder_id_uses_explicit_when_set() {
232 unsafe {
233 std::env::set_var("RED_LEASE_HOLDER_ID", "explicit-writer-1");
234 }
235 assert_eq!(lease_holder_id(), "explicit-writer-1");
236 unsafe {
237 std::env::remove_var("RED_LEASE_HOLDER_ID");
238 }
239 }
240}