1use crate::types::DatabaseError;
8use js_sys::Date;
9use std::cell::RefCell;
10use std::rc::Rc;
11use wasm_bindgen::JsCast;
12use wasm_bindgen::prelude::*;
13use web_sys::BroadcastChannel;
14
thread_local! {
    // Flag that is `true` while a heartbeat timer callback is executing on this
    // thread. The callback built in `start_heartbeat` sets it on entry, clears it
    // on exit via a drop guard, and skips the tick if it is already set.
    // Being a thread-local static, it is shared by every manager on the thread.
    static HEARTBEAT_RUNNING: RefCell<bool> = const { RefCell::new(false) };
}
20
/// Mutable election state shared between a `LeaderElectionManager` and the
/// JavaScript callbacks it registers.
#[derive(Debug, Clone)]
pub struct LeaderElectionState {
    /// Database name; used to derive the localStorage keys
    /// (`datasync_leader_{db_name}`, `datasync_instances_{db_name}`) and the
    /// BroadcastChannel name.
    pub db_name: String,
    /// Identifier of this instance, generated in `LeaderElectionManager::new`
    /// from the creation timestamp and a random suffix (both hex-encoded).
    pub instance_id: String,
    /// Whether this instance currently considers itself the leader.
    pub is_leader: bool,
    /// Instance ID of the last known leader, if any.
    pub leader_id: Option<String>,
    /// Timestamp (a `Date::now()` value) computed as a claim timestamp plus the
    /// manager's `lease_duration_ms`.
    pub lease_expiry: u64,
    /// Initialised to 0; in the visible code it is only read, as the fallback
    /// return value of `get_last_heartbeat`.
    pub last_heartbeat: u64,
}
31
/// Coordinates leader election between instances that share a database name,
/// using localStorage records plus a BroadcastChannel for claim notifications.
pub struct LeaderElectionManager {
    /// Election state; the `Rc` is cloned into the message listener and the
    /// heartbeat callback so they can read and update it.
    pub state: Rc<RefCell<LeaderElectionState>>,
    /// Channel created by `start_election` and closed by `stop_election`.
    broadcast_channel: Option<BroadcastChannel>,
    /// Handle returned by `setInterval` for the heartbeat timer, if one was started.
    pub heartbeat_interval: Option<i32>,
    /// Callback registered with the heartbeat timer; stored here so it stays
    /// alive while the timer can still invoke it.
    pub heartbeat_closure: Option<Closure<dyn FnMut()>>,
    /// Callback installed as the channel's `onmessage` handler; stored for the
    /// same keep-alive reason.
    message_listener: Option<Closure<dyn FnMut(web_sys::MessageEvent)>>,
    /// Milliseconds added to a claim timestamp to compute `lease_expiry`.
    /// NOTE(review): the expiry checks in `try_become_leader_internal` and
    /// `is_leader` use a separate hard-coded 5000 ms threshold — confirm intended.
    lease_duration_ms: u64,
}
41
42impl LeaderElectionManager {
43 pub fn new(db_name: String) -> Self {
45 let timestamp = Date::now() as u64;
47 let random_part = (js_sys::Math::random() * 1000.0) as u64;
48 let instance_id = format!("{:016x}_{:03x}", timestamp, random_part);
49
50 log::debug!("Created instance {} for {}", instance_id, db_name);
51
52 Self {
53 state: Rc::new(RefCell::new(LeaderElectionState {
54 db_name,
55 instance_id,
56 is_leader: false,
57 leader_id: None,
58 lease_expiry: 0,
59 last_heartbeat: 0,
60 })),
61 broadcast_channel: None,
62 heartbeat_interval: None,
63 heartbeat_closure: None,
64 message_listener: None,
65 lease_duration_ms: 1000, }
67 }
68
69 pub async fn start_election(&mut self) -> Result<(), DatabaseError> {
71 log::debug!(
72 "LeaderElectionManager::start_election() - Starting for {}",
73 self.state.borrow().db_name
74 );
75
76 let channel_name = format!("datasync_leader_{}", self.state.borrow().db_name);
78 log::debug!(
79 "LeaderElectionManager::start_election() - Creating BroadcastChannel: {}",
80 channel_name
81 );
82 let broadcast_channel = BroadcastChannel::new(&channel_name).map_err(|_| {
83 DatabaseError::new("LEADER_ELECTION_ERROR", "Failed to create BroadcastChannel")
84 })?;
85
86 let state_clone = self.state.clone();
88 let listener = Closure::wrap(Box::new(move |event: web_sys::MessageEvent| {
89 if let Ok(data) = event.data().dyn_into::<js_sys::JsString>() {
90 let message: String = data.into();
91
92 if let Some(parts) = message.strip_prefix("LEADER_CLAIMED:") {
94 let parts: Vec<&str> = parts.split(':').collect();
95 if parts.len() == 2 {
96 let new_leader_id = parts[0];
97 if let Ok(_timestamp) = parts[1].parse::<u64>() {
98 let mut state = state_clone.borrow_mut();
99 let my_instance_id = state.instance_id.clone();
100
101 if new_leader_id == my_instance_id {
102 state.is_leader = true;
104 state.leader_id = Some(new_leader_id.to_string());
105 log::info!(
106 "EVENT: Became leader for {} via BroadcastChannel",
107 state.db_name
108 );
109 } else {
110 state.is_leader = false;
112 state.leader_id = Some(new_leader_id.to_string());
113 log::debug!(
114 "EVENT: {} is now leader for {}",
115 new_leader_id,
116 state.db_name
117 );
118 }
119 }
120 }
121 }
122 }
123 }) as Box<dyn FnMut(web_sys::MessageEvent)>);
124
125 broadcast_channel.set_onmessage(Some(listener.as_ref().unchecked_ref()));
126 self.message_listener = Some(listener);
127 self.broadcast_channel = Some(broadcast_channel);
128
129 log::debug!("LeaderElectionManager::start_election() - Calling try_become_leader()");
131 self.try_become_leader().await?;
132
133 let is_leader = self.state.borrow().is_leader;
135 web_sys::console::log_1(
136 &format!(
137 "LeaderElectionManager::start_election() - After try_become_leader, is_leader={}",
138 is_leader
139 )
140 .into(),
141 );
142 if is_leader {
143 web_sys::console::log_1(
144 &"LeaderElectionManager::start_election() - Calling start_heartbeat()".into(),
145 );
146 self.start_heartbeat()?;
147 web_sys::console::log_1(
148 &"LeaderElectionManager::start_election() - Heartbeat started".into(),
149 );
150 } else {
151 web_sys::console::log_1(
152 &"LeaderElectionManager::start_election() - Not leader, skipping heartbeat".into(),
153 );
154 }
155
156 Ok(())
157 }
158
159 pub async fn try_become_leader_internal(&mut self, force: bool) -> Result<(), DatabaseError> {
164 let state = self.state.borrow();
165 let my_instance_id = state.instance_id.clone();
166 let db_name = state.db_name.clone();
167 drop(state);
168
169 let window = web_sys::window().ok_or_else(|| {
171 DatabaseError::new(
172 "STORAGE_ERROR",
173 "Window not available - not in browser context",
174 )
175 })?;
176 let storage = window
177 .local_storage()
178 .map_err(|_| {
179 DatabaseError::new(
180 "STORAGE_ERROR",
181 "localStorage access denied (check browser settings)",
182 )
183 })?
184 .ok_or_else(|| {
185 DatabaseError::new(
186 "STORAGE_ERROR",
187 "localStorage unavailable (private browsing mode?)",
188 )
189 })?;
190
191 let instances_key = format!("datasync_instances_{}", db_name);
192 let leader_key = format!("datasync_leader_{}", db_name);
193
194 let current_time = Date::now() as u64;
196 let instance_data = format!("{}:{}", my_instance_id, current_time);
197
198 let existing_instances = storage
200 .get_item(&instances_key)
201 .map_err(|e| {
202 DatabaseError::new(
203 "LEADER_ELECTION_ERROR",
204 &format!("Failed to get instances: {:?}", e),
205 )
206 })?
207 .unwrap_or_default();
208 let mut all_instances: Vec<String> = if existing_instances.is_empty() {
209 Vec::new()
210 } else {
211 existing_instances
212 .split(',')
213 .map(|s| s.to_string())
214 .collect()
215 };
216
217 if !all_instances
219 .iter()
220 .any(|inst| inst.starts_with(&format!("{}:", my_instance_id)))
221 {
222 all_instances.push(instance_data);
223 }
224
225 let cutoff_time = current_time - 10000;
227 all_instances.retain(|inst| {
228 if let Some(colon_pos) = inst.rfind(':') {
229 if let Ok(timestamp) = inst[colon_pos + 1..].parse::<u64>() {
230 timestamp > cutoff_time
231 } else {
232 false
233 }
234 } else {
235 false
236 }
237 });
238
239 let instances_str = all_instances.join(",");
241 storage
242 .set_item(&instances_key, &instances_str)
243 .map_err(|e| {
244 DatabaseError::new(
245 "LEADER_ELECTION_ERROR",
246 &format!("Failed to set instances: {:?}", e),
247 )
248 })?;
249
250 let mut instance_ids: Vec<String> = all_instances
252 .iter()
253 .filter_map(|inst| inst.split(':').next().map(|s| s.to_string()))
254 .collect();
255 instance_ids.sort();
256
257 log::debug!("All instances for {}: {:?}", db_name, instance_ids);
258
259 if let Some(lowest_id) = instance_ids.first() {
260 if force || *lowest_id == my_instance_id {
261 if force && *lowest_id != my_instance_id {
263 log::debug!(
264 "FORCING leadership takeover for {} (overriding lowest ID rule)",
265 db_name
266 );
267 } else {
268 log::debug!(
269 "I have the lowest ID - attempting atomic leadership claim for {}",
270 db_name
271 );
272 }
273
274 if let Ok(Some(existing_data)) = storage.get_item(&leader_key) {
276 if let Some(colon_pos) = existing_data.rfind(':') {
277 let existing_leader_id = &existing_data[..colon_pos];
278 if let Ok(existing_timestamp) =
279 existing_data[colon_pos + 1..].parse::<u64>()
280 {
281 let existing_lease_expired = (current_time - existing_timestamp) > 5000;
282
283 if !force
284 && !existing_lease_expired
285 && existing_leader_id != my_instance_id
286 {
287 log::debug!(
289 "{} already claimed leadership for {}",
290 existing_leader_id,
291 db_name
292 );
293
294 let mut state = self.state.borrow_mut();
295 state.is_leader = false;
296 state.leader_id = Some(existing_leader_id.to_string());
297 state.lease_expiry = existing_timestamp + self.lease_duration_ms;
298 return Ok(());
299 }
300 }
301 }
302 }
303
304 let leader_data = format!("{}:{}", my_instance_id, current_time);
306 storage.set_item(&leader_key, &leader_data).map_err(|e| {
307 DatabaseError::new(
308 "LEADER_ELECTION_ERROR",
309 &format!("Failed to set leader: {:?}", e),
310 )
311 })?;
312
313 let mut state = self.state.borrow_mut();
314 state.is_leader = true;
315 state.leader_id = Some(my_instance_id.clone());
316 state.lease_expiry = current_time + self.lease_duration_ms;
317 drop(state);
318
319 log::info!("Became leader for {} with ID {}", db_name, my_instance_id);
320
321 if let Some(ref channel) = self.broadcast_channel {
323 let message = format!("LEADER_CLAIMED:{}:{}", my_instance_id, current_time);
324 if let Err(e) = channel.post_message(&JsValue::from_str(&message)) {
325 log::warn!("Failed to broadcast leadership claim: {:?}", e);
326 } else {
327 log::debug!("EVENT: Broadcasted leadership claim for {}", db_name);
328 }
329 }
330
331 if self.heartbeat_interval.is_none() {
333 let _ = self.start_heartbeat();
334 }
335 } else {
336 log::debug!(
338 "Instance {} has lower ID - not claiming leadership for {}",
339 lowest_id,
340 db_name
341 );
342
343 let mut state = self.state.borrow_mut();
344 state.is_leader = false;
345 state.leader_id = Some(lowest_id.clone());
346 state.lease_expiry = current_time + self.lease_duration_ms;
347 }
348 }
349
350 Ok(())
351 }
352
    /// Attempts a regular (non-forced) leadership claim.
    ///
    /// Delegates to `try_become_leader_internal(false)`; errors from that call
    /// are returned unchanged.
    pub async fn try_become_leader(&mut self) -> Result<(), DatabaseError> {
        self.try_become_leader_internal(false).await
    }
357
    /// Attempts a forced leadership claim.
    ///
    /// Delegates to `try_become_leader_internal(true)`, which skips the
    /// lowest-ID rule and the check for another instance's unexpired lease.
    /// Errors from that call are returned unchanged.
    pub async fn force_become_leader(&mut self) -> Result<(), DatabaseError> {
        self.try_become_leader_internal(true).await
    }
362
363 pub fn start_heartbeat(&mut self) -> Result<(), DatabaseError> {
365 web_sys::console::log_1(&"start_heartbeat() called".into());
366
367 let state = self.state.borrow();
369 web_sys::console::log_1(
370 &format!(
371 "start_heartbeat: is_leader={}, db_name={}",
372 state.is_leader, state.db_name
373 )
374 .into(),
375 );
376
377 if state.is_leader {
378 let current_time = Date::now() as u64;
379
380 let window = web_sys::window()
381 .ok_or_else(|| DatabaseError::new("LEADER_ELECTION_ERROR", "Window unavailable"))?;
382 let storage = window
383 .local_storage()
384 .map_err(|_| {
385 DatabaseError::new("LEADER_ELECTION_ERROR", "localStorage unavailable")
386 })?
387 .ok_or_else(|| {
388 DatabaseError::new("LEADER_ELECTION_ERROR", "localStorage is None")
389 })?;
390
391 let leader_key = format!("datasync_leader_{}", state.db_name);
392 let leader_data = format!("{}:{}", state.instance_id, current_time);
393
394 web_sys::console::log_1(
395 &format!(
396 "Writing heartbeat: key={}, data={}",
397 leader_key, leader_data
398 )
399 .into(),
400 );
401
402 storage.set_item(&leader_key, &leader_data).map_err(|e| {
403 web_sys::console::error_1(
404 &format!("Failed to write initial heartbeat: {:?}", e).into(),
405 );
406 DatabaseError::new("LEADER_ELECTION_ERROR", "Failed to write initial heartbeat")
407 })?;
408
409 web_sys::console::log_1(
410 &format!(
411 "Sent initial heartbeat for {} from leader {}",
412 state.db_name, state.instance_id
413 )
414 .into(),
415 );
416 } else {
417 web_sys::console::warn_1(&"start_heartbeat called but is_leader=false".into());
418 }
419 drop(state);
420
421 let state_clone = self.state.clone();
423
424 let closure = Closure::wrap(Box::new(move || {
425 let already_running = HEARTBEAT_RUNNING.with(|running| {
428 let was_running = *running.borrow();
429 if !was_running {
430 *running.borrow_mut() = true;
431 }
432 was_running
433 });
434
435 if already_running {
436 log::debug!("Heartbeat skipped - previous invocation still running");
437 return;
438 }
439
440 struct HeartbeatGuard;
442 impl Drop for HeartbeatGuard {
443 fn drop(&mut self) {
444 HEARTBEAT_RUNNING.with(|running| {
445 *running.borrow_mut() = false;
446 });
447 }
448 }
449 let _guard = HeartbeatGuard;
450
451 let state = state_clone.borrow();
452 if state.is_leader {
453 let current_time = Date::now() as u64;
454
455 let window = match web_sys::window() {
457 Some(w) => w,
458 None => {
459 log::error!("Window unavailable in heartbeat - stopping heartbeat");
460 return;
461 }
462 };
463 let storage = match window.local_storage() {
464 Ok(Some(s)) => s,
465 Ok(None) => {
466 log::warn!("localStorage unavailable in heartbeat (private browsing?)");
467 return;
468 }
469 Err(_) => {
470 log::error!("localStorage access denied in heartbeat");
471 return;
472 }
473 };
474 let leader_key = format!("datasync_leader_{}", state.db_name);
475 let leader_data = format!("{}:{}", state.instance_id, current_time);
476
477 let _ = storage.set_item(&leader_key, &leader_data);
478
479 log::debug!(
480 "Updated leader heartbeat for {} from leader {}",
481 state.db_name,
482 state.instance_id
483 );
484 }
485 }) as Box<dyn FnMut()>);
486
487 let interval_id = web_sys::window()
488 .unwrap()
489 .set_interval_with_callback_and_timeout_and_arguments_0(
490 closure.as_ref().unchecked_ref(),
491 1000, )
493 .map_err(|_| {
494 DatabaseError::new(
495 "LEADER_ELECTION_ERROR",
496 "Failed to start heartbeat interval",
497 )
498 })?;
499
500 self.heartbeat_interval = Some(interval_id);
502 self.heartbeat_closure = Some(closure);
503
504 Ok(())
505 }
506
507 pub async fn stop_election(&mut self) -> Result<(), DatabaseError> {
509 if self.heartbeat_interval.is_none() && self.heartbeat_closure.is_none() {
511 web_sys::console::log_1(&"[STOP] Already stopped - skipping".into());
512 return Ok(());
513 }
514
515 let state = self.state.borrow();
516 let db_name = state.db_name.clone();
517 let instance_id = state.instance_id.clone();
518 let was_leader = state.is_leader;
519 drop(state);
520
521 log::info!(
522 "[STOP] Stopping leader election for {} (was_leader: {})",
523 db_name,
524 was_leader
525 );
526
527 if let Some(interval_id) = self.heartbeat_interval.take() {
529 web_sys::console::log_1(
530 &format!("[STOP] Clearing interval {} for {}", interval_id, db_name).into(),
531 );
532 if let Some(window) = web_sys::window() {
533 window.clear_interval_with_handle(interval_id);
534 }
535 }
536
537 if let Some(_closure) = self.heartbeat_closure.take() {
539 web_sys::console::log_1(
540 &format!("[STOP] Dropped heartbeat closure for {}", db_name).into(),
541 );
542 }
543
544 if let Some(channel) = self.broadcast_channel.take() {
546 channel.close();
547 web_sys::console::log_1(
548 &format!("[STOP] Closed BroadcastChannel for {}", db_name).into(),
549 );
550 }
551
552 let Some(window) = web_sys::window() else {
554 log::warn!("Window unavailable during cleanup");
555 return Ok(());
556 };
557 let storage = match window.local_storage() {
558 Ok(Some(s)) => s,
559 Ok(None) | Err(_) => {
560 log::warn!("localStorage unavailable during cleanup (private browsing?)");
561 return Ok(());
562 }
563 };
564 let instances_key = format!("datasync_instances_{}", db_name);
565
566 if let Ok(Some(existing_instances)) = storage.get_item(&instances_key) {
567 let all_instances: Vec<String> = existing_instances
568 .split(',')
569 .map(|s| s.to_string())
570 .collect();
571 let filtered_instances: Vec<String> = all_instances
572 .into_iter()
573 .filter(|inst| !inst.starts_with(&format!("{}:", instance_id)))
574 .collect();
575
576 if filtered_instances.is_empty() {
577 let _ = storage.remove_item(&instances_key);
579 } else {
580 let instances_str = filtered_instances.join(",");
582 let _ = storage.set_item(&instances_key, &instances_str);
583 }
584 }
585
586 if was_leader {
588 let leader_key = format!("datasync_leader_{}", db_name);
589 let _ = storage.remove_item(&leader_key);
590
591 log::debug!(
592 "Cleared leader data for {} (was leader: {})",
593 db_name,
594 instance_id
595 );
596 }
597
598 let mut state = self.state.borrow_mut();
600 state.is_leader = false;
601 state.leader_id = None;
602
603 Ok(())
604 }
605
606 pub async fn is_leader(&self) -> bool {
608 let now = Date::now() as u64;
609 let state = self.state.borrow();
610 let db_name = state.db_name.clone();
611 let my_instance_id = state.instance_id.clone();
612
613 let Some(window) = web_sys::window() else {
615 log::warn!("Window unavailable for leader check - assuming not leader");
616 return false;
617 };
618 let storage = match window.local_storage() {
619 Ok(Some(s)) => s,
620 Ok(None) => {
621 log::warn!(
622 "localStorage unavailable for leader check (private browsing?) - assuming not leader"
623 );
624 return false;
625 }
626 Err(_) => {
627 log::error!("localStorage access denied for leader check - assuming not leader");
628 return false;
629 }
630 };
631 let leader_key = format!("datasync_leader_{}", db_name);
632
633 let current_leader_expired = if let Ok(Some(leader_data)) = storage.get_item(&leader_key) {
635 if let Some(colon_pos) = leader_data.rfind(':') {
636 let leader_id = &leader_data[..colon_pos];
637 if let Ok(timestamp) = leader_data[colon_pos + 1..].parse::<u64>() {
638 let lease_expired = (now - timestamp) > 5000; if leader_id == my_instance_id && !lease_expired {
641 return true; }
643
644 lease_expired } else {
646 true }
648 } else {
649 true }
651 } else {
652 true };
654
655 drop(state);
656
657 if current_leader_expired {
659 log::debug!(
660 "Current leader lease expired for {} - re-election needed",
661 db_name
662 );
663
664 let mut state = self.state.borrow_mut();
666 state.is_leader = false;
667 state.leader_id = None;
668 false
669 } else {
670 let mut state = self.state.borrow_mut();
672 state.is_leader = false;
673 false
674 }
675 }
676
677 pub async fn send_heartbeat(&self) -> Result<(), DatabaseError> {
679 if let Some(ref channel) = self.broadcast_channel {
680 let state = self.state.borrow();
681 let now = Date::now() as u64;
682
683 let message = js_sys::Object::new();
684 js_sys::Reflect::set(&message, &"type".into(), &"heartbeat".into()).unwrap();
685 js_sys::Reflect::set(
686 &message,
687 &"leader_id".into(),
688 &state.instance_id.clone().into(),
689 )
690 .unwrap();
691 js_sys::Reflect::set(&message, &"timestamp".into(), &(now as f64).into()).unwrap();
692
693 channel.post_message(&message).map_err(|_| {
694 DatabaseError::new("LEADER_ELECTION_ERROR", "Failed to send heartbeat")
695 })?;
696 }
697
698 Ok(())
699 }
700
701 pub async fn get_last_heartbeat(&self) -> u64 {
703 let state = self.state.borrow();
704 let Some(window) = web_sys::window() else {
705 log::warn!("Window unavailable for heartbeat check");
706 return 0;
707 };
708 let storage = match window.local_storage() {
709 Ok(Some(s)) => s,
710 Ok(None) | Err(_) => {
711 log::warn!("localStorage unavailable for heartbeat check (private browsing?)");
712 return 0;
713 }
714 };
715 let leader_key = format!("datasync_leader_{}", state.db_name);
716
717 if let Ok(Some(leader_data)) = storage.get_item(&leader_key) {
718 if let Some(colon_pos) = leader_data.rfind(':') {
719 if let Ok(timestamp) = leader_data[colon_pos + 1..].parse::<u64>() {
720 return timestamp;
721 }
722 }
723 }
724
725 state.last_heartbeat
727 }
728}