1use crate::types::DatabaseError;
8use js_sys::Date;
9use std::cell::RefCell;
10use std::rc::Rc;
11use wasm_bindgen::prelude::*;
12use wasm_bindgen::JsCast;
13use web_sys::BroadcastChannel;
14
15#[derive(Debug, Clone)]
17pub struct LeaderElectionState {
18 pub db_name: String,
19 pub instance_id: String,
20 pub is_leader: bool,
21 pub leader_id: Option<String>,
22 pub lease_expiry: u64,
23 pub last_heartbeat: u64,
24}
25
26pub struct LeaderElectionManager {
28 pub state: Rc<RefCell<LeaderElectionState>>,
29 broadcast_channel: Option<BroadcastChannel>,
30 pub heartbeat_interval: Option<i32>,
31 lease_duration_ms: u64,
32}
33
34impl LeaderElectionManager {
35 pub fn new(db_name: String) -> Self {
37 let timestamp = Date::now() as u64;
39 let random_part = (js_sys::Math::random() * 1000.0) as u64;
40 let instance_id = format!("{:016x}_{:03x}", timestamp, random_part);
41
42 log::debug!("Created instance {} for {}", instance_id, db_name);
43
44 Self {
45 state: Rc::new(RefCell::new(LeaderElectionState {
46 db_name,
47 instance_id,
48 is_leader: false,
49 leader_id: None,
50 lease_expiry: 0,
51 last_heartbeat: 0,
52 })),
53 broadcast_channel: None,
54 heartbeat_interval: None,
55 lease_duration_ms: 5000, }
57 }
58
59 pub async fn start_election(&mut self) -> Result<(), DatabaseError> {
61 log::debug!("Starting localStorage-based leader election for {}", self.state.borrow().db_name);
62
63 let channel_name = format!("datasync_leader_{}", self.state.borrow().db_name);
65 let broadcast_channel = BroadcastChannel::new(&channel_name)
66 .map_err(|_| DatabaseError::new("LEADER_ELECTION_ERROR", "Failed to create BroadcastChannel"))?;
67
68 self.broadcast_channel = Some(broadcast_channel);
69
70 self.try_become_leader().await?;
72
73 if self.state.borrow().is_leader {
75 self.start_heartbeat()?;
76 }
77
78 Ok(())
79 }
80
81 pub async fn try_become_leader_internal(&mut self, force: bool) -> Result<(), DatabaseError> {
86 let state = self.state.borrow();
87 let my_instance_id = state.instance_id.clone();
88 let db_name = state.db_name.clone();
89 drop(state);
90
91 let window = web_sys::window()
93 .ok_or_else(|| DatabaseError::new(
94 "STORAGE_ERROR",
95 "Window not available - not in browser context"
96 ))?;
97 let storage = window
98 .local_storage()
99 .map_err(|_| DatabaseError::new(
100 "STORAGE_ERROR",
101 "localStorage access denied (check browser settings)"
102 ))?
103 .ok_or_else(|| DatabaseError::new(
104 "STORAGE_ERROR",
105 "localStorage unavailable (private browsing mode?)"
106 ))?;
107
108 let instances_key = format!("datasync_instances_{}", db_name);
109 let leader_key = format!("datasync_leader_{}", db_name);
110
111 let current_time = Date::now() as u64;
113 let instance_data = format!("{}:{}", my_instance_id, current_time);
114
115 let existing_instances = storage.get_item(&instances_key).unwrap().unwrap_or_default();
117 let mut all_instances: Vec<String> = if existing_instances.is_empty() {
118 Vec::new()
119 } else {
120 existing_instances.split(',').map(|s| s.to_string()).collect()
121 };
122
123 if !all_instances.iter().any(|inst| inst.starts_with(&format!("{}:", my_instance_id))) {
125 all_instances.push(instance_data);
126 }
127
128 let cutoff_time = current_time - 10000;
130 all_instances.retain(|inst| {
131 if let Some(colon_pos) = inst.rfind(':') {
132 if let Ok(timestamp) = inst[colon_pos + 1..].parse::<u64>() {
133 timestamp > cutoff_time
134 } else {
135 false
136 }
137 } else {
138 false
139 }
140 });
141
142 let instances_str = all_instances.join(",");
144 storage.set_item(&instances_key, &instances_str).unwrap();
145
146 let mut instance_ids: Vec<String> = all_instances.iter()
148 .filter_map(|inst| inst.split(':').next().map(|s| s.to_string()))
149 .collect();
150 instance_ids.sort();
151
152 log::debug!("All instances for {}: {:?}", db_name, instance_ids);
153
154 if let Some(lowest_id) = instance_ids.first() {
155 if force || *lowest_id == my_instance_id {
156 if force && *lowest_id != my_instance_id {
158 log::debug!("FORCING leadership takeover for {} (overriding lowest ID rule)", db_name);
159 } else {
160 log::debug!("I have the lowest ID - attempting atomic leadership claim for {}", db_name);
161 }
162
163 if let Ok(existing_leader) = storage.get_item(&leader_key) {
165 if let Some(existing_data) = existing_leader {
166 if let Some(colon_pos) = existing_data.rfind(':') {
167 let existing_leader_id = &existing_data[..colon_pos];
168 if let Ok(existing_timestamp) = existing_data[colon_pos + 1..].parse::<u64>() {
169 let existing_lease_expired = (current_time - existing_timestamp) > 5000;
170
171 if !force && !existing_lease_expired && existing_leader_id != my_instance_id {
172 log::debug!("{} already claimed leadership for {}",
174 existing_leader_id, db_name);
175
176 let mut state = self.state.borrow_mut();
177 state.is_leader = false;
178 state.leader_id = Some(existing_leader_id.to_string());
179 state.lease_expiry = existing_timestamp + self.lease_duration_ms;
180 return Ok(());
181 }
182 }
183 }
184 }
185 }
186
187 let leader_data = format!("{}:{}", my_instance_id, current_time);
189 storage.set_item(&leader_key, &leader_data).unwrap();
190
191 let mut state = self.state.borrow_mut();
192 state.is_leader = true;
193 state.leader_id = Some(my_instance_id.clone());
194 state.lease_expiry = current_time + self.lease_duration_ms;
195 drop(state);
196
197 log::info!("Became leader for {} with ID {}", db_name, my_instance_id);
198
199 if self.heartbeat_interval.is_none() {
201 let _ = self.start_heartbeat();
202 }
203 } else {
204 log::debug!("Instance {} has lower ID - not claiming leadership for {}",
206 lowest_id, db_name);
207
208 let mut state = self.state.borrow_mut();
209 state.is_leader = false;
210 state.leader_id = Some(lowest_id.clone());
211 state.lease_expiry = current_time + self.lease_duration_ms;
212 }
213 }
214
215 Ok(())
216 }
217
218 pub async fn try_become_leader(&mut self) -> Result<(), DatabaseError> {
220 self.try_become_leader_internal(false).await
221 }
222
223 pub async fn force_become_leader(&mut self) -> Result<(), DatabaseError> {
225 self.try_become_leader_internal(true).await
226 }
227
228 pub fn start_heartbeat(&mut self) -> Result<(), DatabaseError> {
230 let state_clone = self.state.clone();
231
232 let closure = Closure::wrap(Box::new(move || {
233 let state = state_clone.borrow();
234 if state.is_leader {
235 let current_time = Date::now() as u64;
236
237 let window = match web_sys::window() {
239 Some(w) => w,
240 None => {
241 log::error!("Window unavailable in heartbeat - stopping heartbeat");
242 return;
243 }
244 };
245 let storage = match window.local_storage() {
246 Ok(Some(s)) => s,
247 Ok(None) => {
248 log::warn!("localStorage unavailable in heartbeat (private browsing?)");
249 return;
250 },
251 Err(_) => {
252 log::error!("localStorage access denied in heartbeat");
253 return;
254 }
255 };
256 let leader_key = format!("datasync_leader_{}", state.db_name);
257 let leader_data = format!("{}:{}", state.instance_id, current_time);
258
259 let _ = storage.set_item(&leader_key, &leader_data);
260
261 log::debug!("Updated leader heartbeat for {} from leader {}",
262 state.db_name, state.instance_id);
263 }
264 }) as Box<dyn FnMut()>);
265
266 let interval_id = web_sys::window()
267 .unwrap()
268 .set_interval_with_callback_and_timeout_and_arguments_0(
269 closure.as_ref().unchecked_ref(),
270 1000, )
272 .map_err(|_| DatabaseError::new("LEADER_ELECTION_ERROR", "Failed to start heartbeat interval"))?;
273
274 closure.forget();
275 self.heartbeat_interval = Some(interval_id);
276
277 Ok(())
278 }
279
280 pub async fn stop_election(&mut self) -> Result<(), DatabaseError> {
282 let state = self.state.borrow();
283 let db_name = state.db_name.clone();
284 let instance_id = state.instance_id.clone();
285 let was_leader = state.is_leader;
286 drop(state);
287
288 log::debug!("Stopping leader election for {}", db_name);
289
290 if let Some(interval_id) = self.heartbeat_interval.take() {
292 if let Some(window) = web_sys::window() {
293 window.clear_interval_with_handle(interval_id);
294 }
295 }
296
297 let Some(window) = web_sys::window() else {
299 log::warn!("Window unavailable during cleanup");
300 return Ok(());
301 };
302 let storage = match window.local_storage() {
303 Ok(Some(s)) => s,
304 Ok(None) | Err(_) => {
305 log::warn!("localStorage unavailable during cleanup (private browsing?)");
306 return Ok(());
307 }
308 };
309 let instances_key = format!("datasync_instances_{}", db_name);
310
311 if let Ok(Some(existing_instances)) = storage.get_item(&instances_key) {
312 let all_instances: Vec<String> = existing_instances.split(',').map(|s| s.to_string()).collect();
313 let filtered_instances: Vec<String> = all_instances.into_iter()
314 .filter(|inst| !inst.starts_with(&format!("{}:", instance_id)))
315 .collect();
316
317 if filtered_instances.is_empty() {
318 let _ = storage.remove_item(&instances_key);
320 } else {
321 let instances_str = filtered_instances.join(",");
323 let _ = storage.set_item(&instances_key, &instances_str);
324 }
325 }
326
327 if was_leader {
329 let leader_key = format!("datasync_leader_{}", db_name);
330 let _ = storage.remove_item(&leader_key);
331
332 log::debug!("Cleared leader data for {} (was leader: {})", db_name, instance_id);
333 }
334
335 let mut state = self.state.borrow_mut();
337 state.is_leader = false;
338 state.leader_id = None;
339
340 Ok(())
341 }
342
343 pub async fn is_leader(&self) -> bool {
345 let now = Date::now() as u64;
346 let state = self.state.borrow();
347 let db_name = state.db_name.clone();
348 let my_instance_id = state.instance_id.clone();
349
350 let Some(window) = web_sys::window() else {
352 log::warn!("Window unavailable for leader check - assuming not leader");
353 return false;
354 };
355 let storage = match window.local_storage() {
356 Ok(Some(s)) => s,
357 Ok(None) => {
358 log::warn!("localStorage unavailable for leader check (private browsing?) - assuming not leader");
359 return false;
360 },
361 Err(_) => {
362 log::error!("localStorage access denied for leader check - assuming not leader");
363 return false;
364 }
365 };
366 let leader_key = format!("datasync_leader_{}", db_name);
367
368 let current_leader_expired = if let Ok(Some(leader_data)) = storage.get_item(&leader_key) {
370 if let Some(colon_pos) = leader_data.rfind(':') {
371 let leader_id = &leader_data[..colon_pos];
372 if let Ok(timestamp) = leader_data[colon_pos + 1..].parse::<u64>() {
373 let lease_expired = (now - timestamp) > 5000; if leader_id == my_instance_id && !lease_expired {
376 return true; }
378
379 lease_expired } else {
381 true }
383 } else {
384 true }
386 } else {
387 true };
389
390 drop(state);
391
392 if current_leader_expired {
394 log::debug!("Current leader lease expired for {} - re-election needed", db_name);
395
396 let mut state = self.state.borrow_mut();
398 state.is_leader = false;
399 state.leader_id = None;
400 false
401 } else {
402 let mut state = self.state.borrow_mut();
404 state.is_leader = false;
405 false
406 }
407 }
408
409 pub async fn send_heartbeat(&self) -> Result<(), DatabaseError> {
411 if let Some(ref channel) = self.broadcast_channel {
412 let state = self.state.borrow();
413 let now = Date::now() as u64;
414
415 let message = js_sys::Object::new();
416 js_sys::Reflect::set(&message, &"type".into(), &"heartbeat".into()).unwrap();
417 js_sys::Reflect::set(&message, &"leader_id".into(), &state.instance_id.clone().into()).unwrap();
418 js_sys::Reflect::set(&message, &"timestamp".into(), &(now as f64).into()).unwrap();
419
420 channel.post_message(&message)
421 .map_err(|_| DatabaseError::new("LEADER_ELECTION_ERROR", "Failed to send heartbeat"))?;
422 }
423
424 Ok(())
425 }
426
427 pub async fn get_last_heartbeat(&self) -> u64 {
429 let state = self.state.borrow();
430 let Some(window) = web_sys::window() else {
431 log::warn!("Window unavailable for heartbeat check");
432 return 0;
433 };
434 let storage = match window.local_storage() {
435 Ok(Some(s)) => s,
436 Ok(None) | Err(_) => {
437 log::warn!("localStorage unavailable for heartbeat check (private browsing?)");
438 return 0;
439 }
440 };
441 let leader_key = format!("datasync_leader_{}", state.db_name);
442
443 if let Ok(Some(leader_data)) = storage.get_item(&leader_key) {
444 if let Some(colon_pos) = leader_data.rfind(':') {
445 if let Ok(timestamp) = leader_data[colon_pos + 1..].parse::<u64>() {
446 return timestamp;
447 }
448 }
449 }
450
451 state.last_heartbeat
453 }
454}