reddb_server/runtime/
lease_loop.rs1use std::sync::Arc;
26use std::thread;
27use std::time::{Duration, Instant};
28
29use crate::api::{RedDBError, RedDBResult};
30use crate::replication::lease::LeaseStore;
31use crate::runtime::lease_lifecycle::{LeaseLifecycle, MarkDraining};
32use crate::runtime::lease_timer_wheel::LeaseTimerWheel;
33use crate::runtime::RedDBRuntime;
34
35pub fn start_lease_loop_if_required(runtime: &RedDBRuntime) -> RedDBResult<()> {
43 if !lease_required() {
44 return Ok(());
45 }
46
47 let backend = runtime
48 .db()
49 .options()
50 .remote_backend_atomic
51 .clone()
52 .ok_or_else(|| {
53 RedDBError::Internal(
54 "RED_LEASE_REQUIRED=true but the configured backend does not support atomic \
55 CAS — use s3, fs, or http with RED_HTTP_CONDITIONAL_WRITES=true"
56 .to_string(),
57 )
58 })?;
59
60 let database_key = runtime
61 .db()
62 .options()
63 .remote_key
64 .clone()
65 .unwrap_or_else(|| "main".to_string());
66 let ttl_ms = lease_ttl_secs() * 1000;
67 let holder_id = lease_holder_id();
68 let prefix = std::env::var("RED_LEASE_PREFIX")
69 .ok()
70 .filter(|v| !v.trim().is_empty())
71 .unwrap_or_else(|| "leases/".to_string());
72
73 let store = Arc::new(LeaseStore::new(backend).with_prefix(prefix));
74 let runtime_for_drain = runtime.clone();
75 let mark_draining: MarkDraining = Arc::new(move || {
76 runtime_for_drain.lifecycle().mark_draining();
77 });
78 let runtime_for_term = runtime.clone();
79 let current_term = Arc::new(move || runtime_for_term.current_replication_term());
80 let lifecycle = Arc::new(LeaseLifecycle::new(
81 store,
82 runtime.write_gate_arc(),
83 runtime.audit_log_arc(),
84 mark_draining,
85 current_term,
86 holder_id,
87 database_key,
88 ttl_ms,
89 ));
90 lifecycle.try_acquire()?;
91
92 let lifecycle_for_runtime = Arc::clone(&lifecycle);
97 let _ = runtime.set_lease_lifecycle(lifecycle_for_runtime);
98
99 spawn_refresh_thread(runtime.clone(), lifecycle, ttl_ms);
100 Ok(())
101}
102
103fn spawn_refresh_thread(runtime: RedDBRuntime, lifecycle: Arc<LeaseLifecycle>, ttl_ms: u64) {
104 let interval = Duration::from_millis(ttl_ms.saturating_div(3).max(1_000));
105 let wheel = Arc::new(LeaseTimerWheel::new(100));
108 wheel.schedule("lease-refresh".to_string(), Instant::now() + interval);
109
110 let wheel_for_handler = Arc::clone(&wheel);
111 let _ = thread::Builder::new()
112 .name("reddb-lease-refresh".into())
113 .spawn(move || {
114 wheel.run_until_shutdown(move |_id| {
115 let phase = runtime.lifecycle().phase();
119 if matches!(
120 phase,
121 crate::runtime::lifecycle::Phase::Draining
122 | crate::runtime::lifecycle::Phase::ShuttingDown
123 | crate::runtime::lifecycle::Phase::Stopped
124 ) {
125 let _ = lifecycle.release();
126 return false;
127 }
128
129 if lifecycle.refresh().is_err() {
130 return false;
131 }
132
133 wheel_for_handler.schedule("lease-refresh".to_string(), Instant::now() + interval);
135 true
136 });
137 });
138}
139
/// Reports whether the `RED_LEASE_REQUIRED` environment variable opts in to
/// the writer lease.
///
/// After trimming surrounding whitespace, the value counts as truthy when it
/// is `1`, or `true` / `yes` in any ASCII letter case. An unset or unreadable
/// variable, or any other value, yields `false`.
fn lease_required() -> bool {
    match std::env::var("RED_LEASE_REQUIRED") {
        Ok(raw) => {
            let flag = raw.trim();
            flag == "1"
                || ["true", "yes"]
                    .iter()
                    .any(|word| flag.eq_ignore_ascii_case(word))
        }
        Err(_) => false,
    }
}
149
/// Reads the lease time-to-live, in seconds, from `RED_LEASE_TTL_SECS`.
///
/// The value is trimmed and parsed as a `u64`. The default of 60 seconds is
/// returned when the variable is unset or unreadable, does not parse, or is
/// zero.
fn lease_ttl_secs() -> u64 {
    const DEFAULT_TTL_SECS: u64 = 60;

    let Ok(raw) = std::env::var("RED_LEASE_TTL_SECS") else {
        return DEFAULT_TTL_SECS;
    };
    match raw.trim().parse::<u64>() {
        Ok(secs) if secs > 0 => secs,
        _ => DEFAULT_TTL_SECS,
    }
}
157
158fn lease_holder_id() -> String {
159 if let Some(explicit) = crate::utils::env_with_file_fallback("RED_LEASE_HOLDER_ID") {
160 return explicit;
161 }
162 let host = std::env::var("HOSTNAME")
163 .or_else(|_| std::env::var("HOST"))
164 .unwrap_or_else(|_| "unknown-host".to_string());
165 format!("{}-{}", host, std::process::id())
166}
167
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard};

    /// Serializes every test in this module that touches process environment
    /// variables.
    ///
    /// The test harness runs tests on parallel threads while the environment
    /// is process-wide, so without this lock one test can observe a value that
    /// another test set (e.g. the two `RED_LEASE_TTL_SECS` tests).
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Takes [`ENV_LOCK`], recovering from poisoning so that one failed test
    /// does not cascade into failures of every other env-dependent test.
    fn env_guard() -> MutexGuard<'static, ()> {
        ENV_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Sets `key` to `value`. Callers must hold the guard from [`env_guard`].
    fn set_env(key: &str, value: &str) {
        // SAFETY: every caller holds `ENV_LOCK`, so no other test in this
        // module reads or writes the environment concurrently.
        unsafe {
            std::env::set_var(key, value);
        }
    }

    /// Removes `key`. Callers must hold the guard from [`env_guard`].
    fn clear_env(key: &str) {
        // SAFETY: every caller holds `ENV_LOCK`, so no other test in this
        // module reads or writes the environment concurrently.
        unsafe {
            std::env::remove_var(key);
        }
    }

    #[test]
    fn lease_required_parses_truthy_values() {
        let _env = env_guard();

        for truthy in ["true", "1", "yes"] {
            set_env("RED_LEASE_REQUIRED", truthy);
            assert!(lease_required());
        }

        set_env("RED_LEASE_REQUIRED", "false");
        assert!(!lease_required());

        clear_env("RED_LEASE_REQUIRED");
        assert!(!lease_required());
    }

    #[test]
    fn ttl_defaults_to_60_when_unset() {
        let _env = env_guard();

        clear_env("RED_LEASE_TTL_SECS");
        assert_eq!(lease_ttl_secs(), 60);
    }

    #[test]
    fn ttl_rejects_zero_and_negative() {
        let _env = env_guard();

        set_env("RED_LEASE_TTL_SECS", "0");
        assert_eq!(lease_ttl_secs(), 60);

        // A negative number does not parse as `u64`, so it falls back too.
        set_env("RED_LEASE_TTL_SECS", "-5");
        assert_eq!(lease_ttl_secs(), 60);

        set_env("RED_LEASE_TTL_SECS", "abc");
        assert_eq!(lease_ttl_secs(), 60);

        set_env("RED_LEASE_TTL_SECS", "30");
        assert_eq!(lease_ttl_secs(), 30);

        clear_env("RED_LEASE_TTL_SECS");
    }

    #[test]
    fn holder_id_falls_back_when_no_env() {
        let _env = env_guard();

        clear_env("RED_LEASE_HOLDER_ID");
        let id = lease_holder_id();
        // The fallback has the shape `<host>-<pid>`.
        assert!(id.contains('-'));
        assert!(!id.is_empty());
    }

    #[test]
    fn holder_id_uses_explicit_when_set() {
        let _env = env_guard();

        set_env("RED_LEASE_HOLDER_ID", "explicit-writer-1");
        assert_eq!(lease_holder_id(), "explicit-writer-1");

        clear_env("RED_LEASE_HOLDER_ID");
    }
}