1use crate::types::DatabaseError;
8use js_sys::Date;
9use std::cell::RefCell;
10use std::rc::Rc;
11use wasm_bindgen::JsCast;
12use wasm_bindgen::prelude::*;
13use web_sys::BroadcastChannel;
14
15thread_local! {
18 static HEARTBEAT_RUNNING: RefCell<bool> = const { RefCell::new(false) };
19}
20
21#[derive(Debug, Clone)]
23pub struct LeaderElectionState {
24 pub db_name: String,
25 pub instance_id: String,
26 pub is_leader: bool,
27 pub leader_id: Option<String>,
28 pub lease_expiry: u64,
29 pub last_heartbeat: u64,
30}
31
32pub struct LeaderElectionManager {
34 pub state: Rc<RefCell<LeaderElectionState>>,
35 broadcast_channel: Option<BroadcastChannel>,
36 pub heartbeat_interval: Option<i32>,
37 message_listener: Option<Closure<dyn FnMut(web_sys::MessageEvent)>>,
41 lease_duration_ms: u64,
42 heartbeat_valid: Rc<RefCell<bool>>,
45}
46
47impl LeaderElectionManager {
48 pub fn new(db_name: String) -> Self {
50 let timestamp = Date::now() as u64;
52 let random_part = (js_sys::Math::random() * 1000.0) as u64;
53 let instance_id = format!("{:016x}_{:03x}", timestamp, random_part);
54
55 log::debug!("Created instance {} for {}", instance_id, db_name);
56
57 Self {
58 state: Rc::new(RefCell::new(LeaderElectionState {
59 db_name,
60 instance_id,
61 is_leader: false,
62 leader_id: None,
63 lease_expiry: 0,
64 last_heartbeat: 0,
65 })),
66 broadcast_channel: None,
67 heartbeat_interval: None,
68 message_listener: None,
69 lease_duration_ms: 1000, heartbeat_valid: Rc::new(RefCell::new(false)),
71 }
72 }
73
74 pub async fn start_election(&mut self) -> Result<(), DatabaseError> {
76 log::debug!(
77 "LeaderElectionManager::start_election() - Starting for {}",
78 self.state.borrow().db_name
79 );
80
81 let channel_name = format!("datasync_leader_{}", self.state.borrow().db_name);
83 log::debug!(
84 "LeaderElectionManager::start_election() - Creating BroadcastChannel: {}",
85 channel_name
86 );
87 let broadcast_channel = BroadcastChannel::new(&channel_name).map_err(|_| {
88 DatabaseError::new("LEADER_ELECTION_ERROR", "Failed to create BroadcastChannel")
89 })?;
90
91 let state_clone = self.state.clone();
93 let listener = Closure::wrap(Box::new(move |event: web_sys::MessageEvent| {
94 if let Ok(data) = event.data().dyn_into::<js_sys::JsString>() {
95 let message: String = data.into();
96
97 if let Some(parts) = message.strip_prefix("LEADER_CLAIMED:") {
99 let parts: Vec<&str> = parts.split(':').collect();
100 if parts.len() == 2 {
101 let new_leader_id = parts[0];
102 if let Ok(_timestamp) = parts[1].parse::<u64>() {
103 let mut state = state_clone.borrow_mut();
104 let my_instance_id = state.instance_id.clone();
105
106 if new_leader_id == my_instance_id {
107 state.is_leader = true;
109 state.leader_id = Some(new_leader_id.to_string());
110 log::info!(
111 "EVENT: Became leader for {} via BroadcastChannel",
112 state.db_name
113 );
114 } else {
115 state.is_leader = false;
117 state.leader_id = Some(new_leader_id.to_string());
118 log::debug!(
119 "EVENT: {} is now leader for {}",
120 new_leader_id,
121 state.db_name
122 );
123 }
124 }
125 }
126 }
127 }
128 }) as Box<dyn FnMut(web_sys::MessageEvent)>);
129
130 broadcast_channel.set_onmessage(Some(listener.as_ref().unchecked_ref()));
131 self.message_listener = Some(listener);
132 self.broadcast_channel = Some(broadcast_channel);
133
134 log::debug!("LeaderElectionManager::start_election() - Calling try_become_leader()");
136 self.try_become_leader().await?;
137
138 let is_leader = self.state.borrow().is_leader;
140 web_sys::console::log_1(
141 &format!(
142 "LeaderElectionManager::start_election() - After try_become_leader, is_leader={}",
143 is_leader
144 )
145 .into(),
146 );
147 if is_leader {
148 web_sys::console::log_1(
149 &"LeaderElectionManager::start_election() - Calling start_heartbeat()".into(),
150 );
151 self.start_heartbeat()?;
152 web_sys::console::log_1(
153 &"LeaderElectionManager::start_election() - Heartbeat started".into(),
154 );
155 } else {
156 web_sys::console::log_1(
157 &"LeaderElectionManager::start_election() - Not leader, skipping heartbeat".into(),
158 );
159 }
160
161 Ok(())
162 }
163
164 pub async fn try_become_leader_internal(&mut self, force: bool) -> Result<(), DatabaseError> {
169 let state = self.state.borrow();
170 let my_instance_id = state.instance_id.clone();
171 let db_name = state.db_name.clone();
172 drop(state);
173
174 let window = web_sys::window().ok_or_else(|| {
176 DatabaseError::new(
177 "STORAGE_ERROR",
178 "Window not available - not in browser context",
179 )
180 })?;
181 let storage = window
182 .local_storage()
183 .map_err(|_| {
184 DatabaseError::new(
185 "STORAGE_ERROR",
186 "localStorage access denied (check browser settings)",
187 )
188 })?
189 .ok_or_else(|| {
190 DatabaseError::new(
191 "STORAGE_ERROR",
192 "localStorage unavailable (private browsing mode?)",
193 )
194 })?;
195
196 let instances_key = format!("datasync_instances_{}", db_name);
197 let leader_key = format!("datasync_leader_{}", db_name);
198
199 let current_time = Date::now() as u64;
201 let instance_data = format!("{}:{}", my_instance_id, current_time);
202
203 let existing_instances = storage
205 .get_item(&instances_key)
206 .map_err(|e| {
207 DatabaseError::new(
208 "LEADER_ELECTION_ERROR",
209 &format!("Failed to get instances: {:?}", e),
210 )
211 })?
212 .unwrap_or_default();
213 let mut all_instances: Vec<String> = if existing_instances.is_empty() {
214 Vec::new()
215 } else {
216 existing_instances
217 .split(',')
218 .map(|s| s.to_string())
219 .collect()
220 };
221
222 if !all_instances
224 .iter()
225 .any(|inst| inst.starts_with(&format!("{}:", my_instance_id)))
226 {
227 all_instances.push(instance_data);
228 }
229
230 let cutoff_time = current_time - 10000;
232 all_instances.retain(|inst| {
233 if let Some(colon_pos) = inst.rfind(':') {
234 if let Ok(timestamp) = inst[colon_pos + 1..].parse::<u64>() {
235 timestamp > cutoff_time
236 } else {
237 false
238 }
239 } else {
240 false
241 }
242 });
243
244 let instances_str = all_instances.join(",");
246 storage
247 .set_item(&instances_key, &instances_str)
248 .map_err(|e| {
249 DatabaseError::new(
250 "LEADER_ELECTION_ERROR",
251 &format!("Failed to set instances: {:?}", e),
252 )
253 })?;
254
255 let mut instance_ids: Vec<String> = all_instances
257 .iter()
258 .filter_map(|inst| inst.split(':').next().map(|s| s.to_string()))
259 .collect();
260 instance_ids.sort();
261
262 log::debug!("All instances for {}: {:?}", db_name, instance_ids);
263
264 if let Some(lowest_id) = instance_ids.first() {
265 if force || *lowest_id == my_instance_id {
266 if force && *lowest_id != my_instance_id {
268 log::debug!(
269 "FORCING leadership takeover for {} (overriding lowest ID rule)",
270 db_name
271 );
272 } else {
273 log::debug!(
274 "I have the lowest ID - attempting atomic leadership claim for {}",
275 db_name
276 );
277 }
278
279 if let Ok(Some(existing_data)) = storage.get_item(&leader_key) {
281 if let Some(colon_pos) = existing_data.rfind(':') {
282 let existing_leader_id = &existing_data[..colon_pos];
283 if let Ok(existing_timestamp) =
284 existing_data[colon_pos + 1..].parse::<u64>()
285 {
286 let existing_lease_expired = (current_time - existing_timestamp) > 5000;
287
288 if !force
289 && !existing_lease_expired
290 && existing_leader_id != my_instance_id
291 {
292 log::debug!(
294 "{} already claimed leadership for {}",
295 existing_leader_id,
296 db_name
297 );
298
299 let mut state = self.state.borrow_mut();
300 state.is_leader = false;
301 state.leader_id = Some(existing_leader_id.to_string());
302 state.lease_expiry = existing_timestamp + self.lease_duration_ms;
303 return Ok(());
304 }
305 }
306 }
307 }
308
309 let leader_data = format!("{}:{}", my_instance_id, current_time);
311 storage.set_item(&leader_key, &leader_data).map_err(|e| {
312 DatabaseError::new(
313 "LEADER_ELECTION_ERROR",
314 &format!("Failed to set leader: {:?}", e),
315 )
316 })?;
317
318 let mut state = self.state.borrow_mut();
319 state.is_leader = true;
320 state.leader_id = Some(my_instance_id.clone());
321 state.lease_expiry = current_time + self.lease_duration_ms;
322 drop(state);
323
324 log::info!("Became leader for {} with ID {}", db_name, my_instance_id);
325
326 if let Some(ref channel) = self.broadcast_channel {
328 let message = format!("LEADER_CLAIMED:{}:{}", my_instance_id, current_time);
329 if let Err(e) = channel.post_message(&JsValue::from_str(&message)) {
330 log::warn!("Failed to broadcast leadership claim: {:?}", e);
331 } else {
332 log::debug!("EVENT: Broadcasted leadership claim for {}", db_name);
333 }
334 }
335
336 if self.heartbeat_interval.is_none() {
338 let _ = self.start_heartbeat();
339 }
340 } else {
341 log::debug!(
343 "Instance {} has lower ID - not claiming leadership for {}",
344 lowest_id,
345 db_name
346 );
347
348 let mut state = self.state.borrow_mut();
349 state.is_leader = false;
350 state.leader_id = Some(lowest_id.clone());
351 state.lease_expiry = current_time + self.lease_duration_ms;
352 }
353 }
354
355 Ok(())
356 }
357
358 pub async fn try_become_leader(&mut self) -> Result<(), DatabaseError> {
360 self.try_become_leader_internal(false).await
361 }
362
363 pub async fn force_become_leader(&mut self) -> Result<(), DatabaseError> {
365 self.try_become_leader_internal(true).await
366 }
367
368 pub fn start_heartbeat(&mut self) -> Result<(), DatabaseError> {
370 web_sys::console::log_1(&"start_heartbeat() called".into());
371
372 let state = self.state.borrow();
374 web_sys::console::log_1(
375 &format!(
376 "start_heartbeat: is_leader={}, db_name={}",
377 state.is_leader, state.db_name
378 )
379 .into(),
380 );
381
382 if state.is_leader {
383 let current_time = Date::now() as u64;
384
385 let window = web_sys::window()
386 .ok_or_else(|| DatabaseError::new("LEADER_ELECTION_ERROR", "Window unavailable"))?;
387 let storage = window
388 .local_storage()
389 .map_err(|_| {
390 DatabaseError::new("LEADER_ELECTION_ERROR", "localStorage unavailable")
391 })?
392 .ok_or_else(|| {
393 DatabaseError::new("LEADER_ELECTION_ERROR", "localStorage is None")
394 })?;
395
396 let leader_key = format!("datasync_leader_{}", state.db_name);
397 let leader_data = format!("{}:{}", state.instance_id, current_time);
398
399 web_sys::console::log_1(
400 &format!(
401 "Writing heartbeat: key={}, data={}",
402 leader_key, leader_data
403 )
404 .into(),
405 );
406
407 storage.set_item(&leader_key, &leader_data).map_err(|e| {
408 web_sys::console::error_1(
409 &format!("Failed to write initial heartbeat: {:?}", e).into(),
410 );
411 DatabaseError::new("LEADER_ELECTION_ERROR", "Failed to write initial heartbeat")
412 })?;
413
414 web_sys::console::log_1(
415 &format!(
416 "Sent initial heartbeat for {} from leader {}",
417 state.db_name, state.instance_id
418 )
419 .into(),
420 );
421 } else {
422 web_sys::console::warn_1(&"start_heartbeat called but is_leader=false".into());
423 }
424 drop(state);
425
426 let state_clone = self.state.clone();
428 let valid_clone = self.heartbeat_valid.clone();
429
430 *self.heartbeat_valid.borrow_mut() = true;
432
433 let closure = Closure::wrap(Box::new(move || {
434 if !*valid_clone.borrow() {
438 return;
440 }
441
442 let already_running = HEARTBEAT_RUNNING.with(|running| {
445 let was_running = *running.borrow();
446 if !was_running {
447 *running.borrow_mut() = true;
448 }
449 was_running
450 });
451
452 if already_running {
453 log::debug!("Heartbeat skipped - previous invocation still running");
454 return;
455 }
456
457 struct HeartbeatGuard;
459 impl Drop for HeartbeatGuard {
460 fn drop(&mut self) {
461 HEARTBEAT_RUNNING.with(|running| {
462 *running.borrow_mut() = false;
463 });
464 }
465 }
466 let _guard = HeartbeatGuard;
467
468 let state = state_clone.borrow();
469 if state.is_leader {
470 let current_time = Date::now() as u64;
471
472 let window = match web_sys::window() {
474 Some(w) => w,
475 None => {
476 log::error!("Window unavailable in heartbeat - stopping heartbeat");
477 return;
478 }
479 };
480 let storage = match window.local_storage() {
481 Ok(Some(s)) => s,
482 Ok(None) => {
483 log::warn!("localStorage unavailable in heartbeat (private browsing?)");
484 return;
485 }
486 Err(_) => {
487 log::error!("localStorage access denied in heartbeat");
488 return;
489 }
490 };
491 let leader_key = format!("datasync_leader_{}", state.db_name);
492 let leader_data = format!("{}:{}", state.instance_id, current_time);
493
494 let _ = storage.set_item(&leader_key, &leader_data);
495
496 log::debug!(
497 "Updated leader heartbeat for {} from leader {}",
498 state.db_name,
499 state.instance_id
500 );
501 }
502 }) as Box<dyn FnMut()>);
503
504 let interval_id = web_sys::window()
505 .unwrap()
506 .set_interval_with_callback_and_timeout_and_arguments_0(
507 closure.as_ref().unchecked_ref(),
508 1000, )
510 .map_err(|_| {
511 DatabaseError::new(
512 "LEADER_ELECTION_ERROR",
513 "Failed to start heartbeat interval",
514 )
515 })?;
516
517 self.heartbeat_interval = Some(interval_id);
518
519 closure.forget();
533
534 Ok(())
535 }
536
537 pub async fn stop_election(&mut self) -> Result<(), DatabaseError> {
539 if self.heartbeat_interval.is_none() && !*self.heartbeat_valid.borrow() {
541 web_sys::console::log_1(&"[STOP] Already stopped - skipping".into());
542 return Ok(());
543 }
544
545 let state = self.state.borrow();
546 let db_name = state.db_name.clone();
547 let instance_id = state.instance_id.clone();
548 let was_leader = state.is_leader;
549 drop(state);
550
551 log::info!(
552 "[STOP] Stopping leader election for {} (was_leader: {})",
553 db_name,
554 was_leader
555 );
556
557 *self.heartbeat_valid.borrow_mut() = false;
561 web_sys::console::log_1(&format!("[STOP] Invalidated heartbeat for {}", db_name).into());
562
563 if let Some(interval_id) = self.heartbeat_interval.take() {
565 web_sys::console::log_1(
566 &format!("[STOP] Clearing interval {} for {}", interval_id, db_name).into(),
567 );
568 if let Some(window) = web_sys::window() {
569 window.clear_interval_with_handle(interval_id);
570 }
571 }
572
573 if let Some(channel) = self.broadcast_channel.take() {
575 channel.close();
576 web_sys::console::log_1(
577 &format!("[STOP] Closed BroadcastChannel for {}", db_name).into(),
578 );
579 }
580
581 let Some(window) = web_sys::window() else {
583 log::warn!("Window unavailable during cleanup");
584 return Ok(());
585 };
586 let storage = match window.local_storage() {
587 Ok(Some(s)) => s,
588 Ok(None) | Err(_) => {
589 log::warn!("localStorage unavailable during cleanup (private browsing?)");
590 return Ok(());
591 }
592 };
593 let instances_key = format!("datasync_instances_{}", db_name);
594
595 if let Ok(Some(existing_instances)) = storage.get_item(&instances_key) {
596 let all_instances: Vec<String> = existing_instances
597 .split(',')
598 .map(|s| s.to_string())
599 .collect();
600 let filtered_instances: Vec<String> = all_instances
601 .into_iter()
602 .filter(|inst| !inst.starts_with(&format!("{}:", instance_id)))
603 .collect();
604
605 if filtered_instances.is_empty() {
606 let _ = storage.remove_item(&instances_key);
608 } else {
609 let instances_str = filtered_instances.join(",");
611 let _ = storage.set_item(&instances_key, &instances_str);
612 }
613 }
614
615 if was_leader {
617 let leader_key = format!("datasync_leader_{}", db_name);
618 let _ = storage.remove_item(&leader_key);
619
620 log::debug!(
621 "Cleared leader data for {} (was leader: {})",
622 db_name,
623 instance_id
624 );
625 }
626
627 let mut state = self.state.borrow_mut();
629 state.is_leader = false;
630 state.leader_id = None;
631
632 Ok(())
633 }
634
635 pub async fn is_leader(&self) -> bool {
637 let now = Date::now() as u64;
638 let state = self.state.borrow();
639 let db_name = state.db_name.clone();
640 let my_instance_id = state.instance_id.clone();
641
642 let Some(window) = web_sys::window() else {
644 log::warn!("Window unavailable for leader check - assuming not leader");
645 return false;
646 };
647 let storage = match window.local_storage() {
648 Ok(Some(s)) => s,
649 Ok(None) => {
650 log::warn!(
651 "localStorage unavailable for leader check (private browsing?) - assuming not leader"
652 );
653 return false;
654 }
655 Err(_) => {
656 log::error!("localStorage access denied for leader check - assuming not leader");
657 return false;
658 }
659 };
660 let leader_key = format!("datasync_leader_{}", db_name);
661
662 let current_leader_expired = if let Ok(Some(leader_data)) = storage.get_item(&leader_key) {
664 if let Some(colon_pos) = leader_data.rfind(':') {
665 let leader_id = &leader_data[..colon_pos];
666 if let Ok(timestamp) = leader_data[colon_pos + 1..].parse::<u64>() {
667 let lease_expired = (now - timestamp) > 5000; if leader_id == my_instance_id && !lease_expired {
670 return true; }
672
673 lease_expired } else {
675 true }
677 } else {
678 true }
680 } else {
681 true };
683
684 drop(state);
685
686 if current_leader_expired {
688 log::debug!(
689 "Current leader lease expired for {} - re-election needed",
690 db_name
691 );
692
693 let mut state = self.state.borrow_mut();
695 state.is_leader = false;
696 state.leader_id = None;
697 false
698 } else {
699 let mut state = self.state.borrow_mut();
701 state.is_leader = false;
702 false
703 }
704 }
705
706 pub async fn send_heartbeat(&self) -> Result<(), DatabaseError> {
708 if let Some(ref channel) = self.broadcast_channel {
709 let state = self.state.borrow();
710 let now = Date::now() as u64;
711
712 let message = js_sys::Object::new();
713 js_sys::Reflect::set(&message, &"type".into(), &"heartbeat".into()).unwrap();
714 js_sys::Reflect::set(
715 &message,
716 &"leader_id".into(),
717 &state.instance_id.clone().into(),
718 )
719 .unwrap();
720 js_sys::Reflect::set(&message, &"timestamp".into(), &(now as f64).into()).unwrap();
721
722 channel.post_message(&message).map_err(|_| {
723 DatabaseError::new("LEADER_ELECTION_ERROR", "Failed to send heartbeat")
724 })?;
725 }
726
727 Ok(())
728 }
729
730 pub async fn get_last_heartbeat(&self) -> u64 {
732 let state = self.state.borrow();
733 let Some(window) = web_sys::window() else {
734 log::warn!("Window unavailable for heartbeat check");
735 return 0;
736 };
737 let storage = match window.local_storage() {
738 Ok(Some(s)) => s,
739 Ok(None) | Err(_) => {
740 log::warn!("localStorage unavailable for heartbeat check (private browsing?)");
741 return 0;
742 }
743 };
744 let leader_key = format!("datasync_leader_{}", state.db_name);
745
746 if let Ok(Some(leader_data)) = storage.get_item(&leader_key) {
747 if let Some(colon_pos) = leader_data.rfind(':') {
748 if let Ok(timestamp) = leader_data[colon_pos + 1..].parse::<u64>() {
749 return timestamp;
750 }
751 }
752 }
753
754 state.last_heartbeat
756 }
757}
758
759impl Drop for LeaderElectionManager {
766 fn drop(&mut self) {
767 *self.heartbeat_valid.borrow_mut() = false;
769
770 if let Some(interval_id) = self.heartbeat_interval.take() {
772 if let Some(window) = web_sys::window() {
773 window.clear_interval_with_handle(interval_id);
774 log::debug!(
775 "LeaderElectionManager::drop() - Cleared heartbeat interval {}",
776 interval_id
777 );
778 }
779 }
780
781 if let Some(channel) = self.broadcast_channel.take() {
783 channel.close();
784 log::debug!("LeaderElectionManager::drop() - Closed BroadcastChannel");
785 }
786
787 log::debug!(
790 "LeaderElectionManager::drop() - Cleanup complete for {}",
791 self.state.borrow().db_name
792 );
793 }
794}