1use crate::types::DatabaseError;
8use js_sys::Date;
9use std::cell::RefCell;
10use std::rc::Rc;
11use wasm_bindgen::JsCast;
12use wasm_bindgen::prelude::*;
13use web_sys::BroadcastChannel;
14
/// One instance's view of the leader election for a single database.
///
/// The value is shared (behind `Rc<RefCell<..>>`) between the manager and the
/// browser callbacks it installs, so every field is plain owned data.
#[derive(Clone, Debug)]
pub struct LeaderElectionState {
    /// Name of the database this election is scoped to; used to derive the
    /// BroadcastChannel name and the localStorage keys.
    pub db_name: String,
    /// Identifier of this instance, generated once in `LeaderElectionManager::new`.
    pub instance_id: String,
    /// `true` while this instance considers itself the leader.
    pub is_leader: bool,
    /// Identifier of the instance currently believed to be leader, if known.
    pub leader_id: Option<String>,
    /// Millisecond timestamp at which the currently known lease runs out.
    pub lease_expiry: u64,
    /// Fallback value returned by `get_last_heartbeat` when no stored heartbeat
    /// can be read. NOTE(review): nothing in this file writes it after
    /// construction - confirm whether external code updates it.
    pub last_heartbeat: u64,
}
25
26pub struct LeaderElectionManager {
28 pub state: Rc<RefCell<LeaderElectionState>>,
29 broadcast_channel: Option<BroadcastChannel>,
30 pub heartbeat_interval: Option<i32>,
31 pub heartbeat_closure: Option<Closure<dyn FnMut()>>,
32 message_listener: Option<Closure<dyn FnMut(web_sys::MessageEvent)>>,
33 lease_duration_ms: u64,
34}
35
36impl LeaderElectionManager {
37 pub fn new(db_name: String) -> Self {
39 let timestamp = Date::now() as u64;
41 let random_part = (js_sys::Math::random() * 1000.0) as u64;
42 let instance_id = format!("{:016x}_{:03x}", timestamp, random_part);
43
44 log::debug!("Created instance {} for {}", instance_id, db_name);
45
46 Self {
47 state: Rc::new(RefCell::new(LeaderElectionState {
48 db_name,
49 instance_id,
50 is_leader: false,
51 leader_id: None,
52 lease_expiry: 0,
53 last_heartbeat: 0,
54 })),
55 broadcast_channel: None,
56 heartbeat_interval: None,
57 heartbeat_closure: None,
58 message_listener: None,
59 lease_duration_ms: 1000, }
61 }
62
63 pub async fn start_election(&mut self) -> Result<(), DatabaseError> {
65 log::debug!(
66 "LeaderElectionManager::start_election() - Starting for {}",
67 self.state.borrow().db_name
68 );
69
70 let channel_name = format!("datasync_leader_{}", self.state.borrow().db_name);
72 log::debug!(
73 "LeaderElectionManager::start_election() - Creating BroadcastChannel: {}",
74 channel_name
75 );
76 let broadcast_channel = BroadcastChannel::new(&channel_name).map_err(|_| {
77 DatabaseError::new("LEADER_ELECTION_ERROR", "Failed to create BroadcastChannel")
78 })?;
79
80 let state_clone = self.state.clone();
82 let listener = Closure::wrap(Box::new(move |event: web_sys::MessageEvent| {
83 if let Ok(data) = event.data().dyn_into::<js_sys::JsString>() {
84 let message: String = data.into();
85
86 if let Some(parts) = message.strip_prefix("LEADER_CLAIMED:") {
88 let parts: Vec<&str> = parts.split(':').collect();
89 if parts.len() == 2 {
90 let new_leader_id = parts[0];
91 if let Ok(_timestamp) = parts[1].parse::<u64>() {
92 let mut state = state_clone.borrow_mut();
93 let my_instance_id = state.instance_id.clone();
94
95 if new_leader_id == my_instance_id {
96 state.is_leader = true;
98 state.leader_id = Some(new_leader_id.to_string());
99 log::info!(
100 "EVENT: Became leader for {} via BroadcastChannel",
101 state.db_name
102 );
103 } else {
104 state.is_leader = false;
106 state.leader_id = Some(new_leader_id.to_string());
107 log::debug!(
108 "EVENT: {} is now leader for {}",
109 new_leader_id,
110 state.db_name
111 );
112 }
113 }
114 }
115 }
116 }
117 }) as Box<dyn FnMut(web_sys::MessageEvent)>);
118
119 broadcast_channel.set_onmessage(Some(listener.as_ref().unchecked_ref()));
120 self.message_listener = Some(listener);
121 self.broadcast_channel = Some(broadcast_channel);
122
123 log::debug!("LeaderElectionManager::start_election() - Calling try_become_leader()");
125 self.try_become_leader().await?;
126
127 let is_leader = self.state.borrow().is_leader;
129 web_sys::console::log_1(
130 &format!(
131 "LeaderElectionManager::start_election() - After try_become_leader, is_leader={}",
132 is_leader
133 )
134 .into(),
135 );
136 if is_leader {
137 web_sys::console::log_1(
138 &"LeaderElectionManager::start_election() - Calling start_heartbeat()".into(),
139 );
140 self.start_heartbeat()?;
141 web_sys::console::log_1(
142 &"LeaderElectionManager::start_election() - Heartbeat started".into(),
143 );
144 } else {
145 web_sys::console::log_1(
146 &"LeaderElectionManager::start_election() - Not leader, skipping heartbeat".into(),
147 );
148 }
149
150 Ok(())
151 }
152
153 pub async fn try_become_leader_internal(&mut self, force: bool) -> Result<(), DatabaseError> {
158 let state = self.state.borrow();
159 let my_instance_id = state.instance_id.clone();
160 let db_name = state.db_name.clone();
161 drop(state);
162
163 let window = web_sys::window().ok_or_else(|| {
165 DatabaseError::new(
166 "STORAGE_ERROR",
167 "Window not available - not in browser context",
168 )
169 })?;
170 let storage = window
171 .local_storage()
172 .map_err(|_| {
173 DatabaseError::new(
174 "STORAGE_ERROR",
175 "localStorage access denied (check browser settings)",
176 )
177 })?
178 .ok_or_else(|| {
179 DatabaseError::new(
180 "STORAGE_ERROR",
181 "localStorage unavailable (private browsing mode?)",
182 )
183 })?;
184
185 let instances_key = format!("datasync_instances_{}", db_name);
186 let leader_key = format!("datasync_leader_{}", db_name);
187
188 let current_time = Date::now() as u64;
190 let instance_data = format!("{}:{}", my_instance_id, current_time);
191
192 let existing_instances = storage
194 .get_item(&instances_key)
195 .map_err(|e| {
196 DatabaseError::new(
197 "LEADER_ELECTION_ERROR",
198 &format!("Failed to get instances: {:?}", e),
199 )
200 })?
201 .unwrap_or_default();
202 let mut all_instances: Vec<String> = if existing_instances.is_empty() {
203 Vec::new()
204 } else {
205 existing_instances
206 .split(',')
207 .map(|s| s.to_string())
208 .collect()
209 };
210
211 if !all_instances
213 .iter()
214 .any(|inst| inst.starts_with(&format!("{}:", my_instance_id)))
215 {
216 all_instances.push(instance_data);
217 }
218
219 let cutoff_time = current_time - 10000;
221 all_instances.retain(|inst| {
222 if let Some(colon_pos) = inst.rfind(':') {
223 if let Ok(timestamp) = inst[colon_pos + 1..].parse::<u64>() {
224 timestamp > cutoff_time
225 } else {
226 false
227 }
228 } else {
229 false
230 }
231 });
232
233 let instances_str = all_instances.join(",");
235 storage
236 .set_item(&instances_key, &instances_str)
237 .map_err(|e| {
238 DatabaseError::new(
239 "LEADER_ELECTION_ERROR",
240 &format!("Failed to set instances: {:?}", e),
241 )
242 })?;
243
244 let mut instance_ids: Vec<String> = all_instances
246 .iter()
247 .filter_map(|inst| inst.split(':').next().map(|s| s.to_string()))
248 .collect();
249 instance_ids.sort();
250
251 log::debug!("All instances for {}: {:?}", db_name, instance_ids);
252
253 if let Some(lowest_id) = instance_ids.first() {
254 if force || *lowest_id == my_instance_id {
255 if force && *lowest_id != my_instance_id {
257 log::debug!(
258 "FORCING leadership takeover for {} (overriding lowest ID rule)",
259 db_name
260 );
261 } else {
262 log::debug!(
263 "I have the lowest ID - attempting atomic leadership claim for {}",
264 db_name
265 );
266 }
267
268 if let Ok(Some(existing_data)) = storage.get_item(&leader_key) {
270 if let Some(colon_pos) = existing_data.rfind(':') {
271 let existing_leader_id = &existing_data[..colon_pos];
272 if let Ok(existing_timestamp) =
273 existing_data[colon_pos + 1..].parse::<u64>()
274 {
275 let existing_lease_expired = (current_time - existing_timestamp) > 5000;
276
277 if !force
278 && !existing_lease_expired
279 && existing_leader_id != my_instance_id
280 {
281 log::debug!(
283 "{} already claimed leadership for {}",
284 existing_leader_id,
285 db_name
286 );
287
288 let mut state = self.state.borrow_mut();
289 state.is_leader = false;
290 state.leader_id = Some(existing_leader_id.to_string());
291 state.lease_expiry = existing_timestamp + self.lease_duration_ms;
292 return Ok(());
293 }
294 }
295 }
296 }
297
298 let leader_data = format!("{}:{}", my_instance_id, current_time);
300 storage.set_item(&leader_key, &leader_data).map_err(|e| {
301 DatabaseError::new(
302 "LEADER_ELECTION_ERROR",
303 &format!("Failed to set leader: {:?}", e),
304 )
305 })?;
306
307 let mut state = self.state.borrow_mut();
308 state.is_leader = true;
309 state.leader_id = Some(my_instance_id.clone());
310 state.lease_expiry = current_time + self.lease_duration_ms;
311 drop(state);
312
313 log::info!("Became leader for {} with ID {}", db_name, my_instance_id);
314
315 if let Some(ref channel) = self.broadcast_channel {
317 let message = format!("LEADER_CLAIMED:{}:{}", my_instance_id, current_time);
318 if let Err(e) = channel.post_message(&JsValue::from_str(&message)) {
319 log::warn!("Failed to broadcast leadership claim: {:?}", e);
320 } else {
321 log::debug!("EVENT: Broadcasted leadership claim for {}", db_name);
322 }
323 }
324
325 if self.heartbeat_interval.is_none() {
327 let _ = self.start_heartbeat();
328 }
329 } else {
330 log::debug!(
332 "Instance {} has lower ID - not claiming leadership for {}",
333 lowest_id,
334 db_name
335 );
336
337 let mut state = self.state.borrow_mut();
338 state.is_leader = false;
339 state.leader_id = Some(lowest_id.clone());
340 state.lease_expiry = current_time + self.lease_duration_ms;
341 }
342 }
343
344 Ok(())
345 }
346
347 pub async fn try_become_leader(&mut self) -> Result<(), DatabaseError> {
349 self.try_become_leader_internal(false).await
350 }
351
352 pub async fn force_become_leader(&mut self) -> Result<(), DatabaseError> {
354 self.try_become_leader_internal(true).await
355 }
356
357 pub fn start_heartbeat(&mut self) -> Result<(), DatabaseError> {
359 web_sys::console::log_1(&"start_heartbeat() called".into());
360
361 let state = self.state.borrow();
363 web_sys::console::log_1(
364 &format!(
365 "start_heartbeat: is_leader={}, db_name={}",
366 state.is_leader, state.db_name
367 )
368 .into(),
369 );
370
371 if state.is_leader {
372 let current_time = Date::now() as u64;
373
374 let window = web_sys::window()
375 .ok_or_else(|| DatabaseError::new("LEADER_ELECTION_ERROR", "Window unavailable"))?;
376 let storage = window
377 .local_storage()
378 .map_err(|_| {
379 DatabaseError::new("LEADER_ELECTION_ERROR", "localStorage unavailable")
380 })?
381 .ok_or_else(|| {
382 DatabaseError::new("LEADER_ELECTION_ERROR", "localStorage is None")
383 })?;
384
385 let leader_key = format!("datasync_leader_{}", state.db_name);
386 let leader_data = format!("{}:{}", state.instance_id, current_time);
387
388 web_sys::console::log_1(
389 &format!(
390 "Writing heartbeat: key={}, data={}",
391 leader_key, leader_data
392 )
393 .into(),
394 );
395
396 storage.set_item(&leader_key, &leader_data).map_err(|e| {
397 web_sys::console::error_1(
398 &format!("Failed to write initial heartbeat: {:?}", e).into(),
399 );
400 DatabaseError::new("LEADER_ELECTION_ERROR", "Failed to write initial heartbeat")
401 })?;
402
403 web_sys::console::log_1(
404 &format!(
405 "Sent initial heartbeat for {} from leader {}",
406 state.db_name, state.instance_id
407 )
408 .into(),
409 );
410 } else {
411 web_sys::console::warn_1(&"start_heartbeat called but is_leader=false".into());
412 }
413 drop(state);
414
415 let state_clone = self.state.clone();
417
418 let closure = Closure::wrap(Box::new(move || {
419 let state = state_clone.borrow();
420 if state.is_leader {
421 let current_time = Date::now() as u64;
422
423 let window = match web_sys::window() {
425 Some(w) => w,
426 None => {
427 log::error!("Window unavailable in heartbeat - stopping heartbeat");
428 return;
429 }
430 };
431 let storage = match window.local_storage() {
432 Ok(Some(s)) => s,
433 Ok(None) => {
434 log::warn!("localStorage unavailable in heartbeat (private browsing?)");
435 return;
436 }
437 Err(_) => {
438 log::error!("localStorage access denied in heartbeat");
439 return;
440 }
441 };
442 let leader_key = format!("datasync_leader_{}", state.db_name);
443 let leader_data = format!("{}:{}", state.instance_id, current_time);
444
445 let _ = storage.set_item(&leader_key, &leader_data);
446
447 log::debug!(
448 "Updated leader heartbeat for {} from leader {}",
449 state.db_name,
450 state.instance_id
451 );
452 }
453 }) as Box<dyn FnMut()>);
454
455 let interval_id = web_sys::window()
456 .unwrap()
457 .set_interval_with_callback_and_timeout_and_arguments_0(
458 closure.as_ref().unchecked_ref(),
459 1000, )
461 .map_err(|_| {
462 DatabaseError::new(
463 "LEADER_ELECTION_ERROR",
464 "Failed to start heartbeat interval",
465 )
466 })?;
467
468 self.heartbeat_interval = Some(interval_id);
470 self.heartbeat_closure = Some(closure);
471
472 Ok(())
473 }
474
475 pub async fn stop_election(&mut self) -> Result<(), DatabaseError> {
477 if self.heartbeat_interval.is_none() && self.heartbeat_closure.is_none() {
479 web_sys::console::log_1(&"[STOP] Already stopped - skipping".into());
480 return Ok(());
481 }
482
483 let state = self.state.borrow();
484 let db_name = state.db_name.clone();
485 let instance_id = state.instance_id.clone();
486 let was_leader = state.is_leader;
487 drop(state);
488
489 log::info!(
490 "[STOP] Stopping leader election for {} (was_leader: {})",
491 db_name,
492 was_leader
493 );
494
495 if let Some(interval_id) = self.heartbeat_interval.take() {
497 web_sys::console::log_1(
498 &format!("[STOP] Clearing interval {} for {}", interval_id, db_name).into(),
499 );
500 if let Some(window) = web_sys::window() {
501 window.clear_interval_with_handle(interval_id);
502 }
503 }
504
505 if let Some(_closure) = self.heartbeat_closure.take() {
507 web_sys::console::log_1(
508 &format!("[STOP] Dropped heartbeat closure for {}", db_name).into(),
509 );
510 }
511
512 if let Some(channel) = self.broadcast_channel.take() {
514 channel.close();
515 web_sys::console::log_1(
516 &format!("[STOP] Closed BroadcastChannel for {}", db_name).into(),
517 );
518 }
519
520 let Some(window) = web_sys::window() else {
522 log::warn!("Window unavailable during cleanup");
523 return Ok(());
524 };
525 let storage = match window.local_storage() {
526 Ok(Some(s)) => s,
527 Ok(None) | Err(_) => {
528 log::warn!("localStorage unavailable during cleanup (private browsing?)");
529 return Ok(());
530 }
531 };
532 let instances_key = format!("datasync_instances_{}", db_name);
533
534 if let Ok(Some(existing_instances)) = storage.get_item(&instances_key) {
535 let all_instances: Vec<String> = existing_instances
536 .split(',')
537 .map(|s| s.to_string())
538 .collect();
539 let filtered_instances: Vec<String> = all_instances
540 .into_iter()
541 .filter(|inst| !inst.starts_with(&format!("{}:", instance_id)))
542 .collect();
543
544 if filtered_instances.is_empty() {
545 let _ = storage.remove_item(&instances_key);
547 } else {
548 let instances_str = filtered_instances.join(",");
550 let _ = storage.set_item(&instances_key, &instances_str);
551 }
552 }
553
554 if was_leader {
556 let leader_key = format!("datasync_leader_{}", db_name);
557 let _ = storage.remove_item(&leader_key);
558
559 log::debug!(
560 "Cleared leader data for {} (was leader: {})",
561 db_name,
562 instance_id
563 );
564 }
565
566 let mut state = self.state.borrow_mut();
568 state.is_leader = false;
569 state.leader_id = None;
570
571 Ok(())
572 }
573
574 pub async fn is_leader(&self) -> bool {
576 let now = Date::now() as u64;
577 let state = self.state.borrow();
578 let db_name = state.db_name.clone();
579 let my_instance_id = state.instance_id.clone();
580
581 let Some(window) = web_sys::window() else {
583 log::warn!("Window unavailable for leader check - assuming not leader");
584 return false;
585 };
586 let storage = match window.local_storage() {
587 Ok(Some(s)) => s,
588 Ok(None) => {
589 log::warn!(
590 "localStorage unavailable for leader check (private browsing?) - assuming not leader"
591 );
592 return false;
593 }
594 Err(_) => {
595 log::error!("localStorage access denied for leader check - assuming not leader");
596 return false;
597 }
598 };
599 let leader_key = format!("datasync_leader_{}", db_name);
600
601 let current_leader_expired = if let Ok(Some(leader_data)) = storage.get_item(&leader_key) {
603 if let Some(colon_pos) = leader_data.rfind(':') {
604 let leader_id = &leader_data[..colon_pos];
605 if let Ok(timestamp) = leader_data[colon_pos + 1..].parse::<u64>() {
606 let lease_expired = (now - timestamp) > 5000; if leader_id == my_instance_id && !lease_expired {
609 return true; }
611
612 lease_expired } else {
614 true }
616 } else {
617 true }
619 } else {
620 true };
622
623 drop(state);
624
625 if current_leader_expired {
627 log::debug!(
628 "Current leader lease expired for {} - re-election needed",
629 db_name
630 );
631
632 let mut state = self.state.borrow_mut();
634 state.is_leader = false;
635 state.leader_id = None;
636 false
637 } else {
638 let mut state = self.state.borrow_mut();
640 state.is_leader = false;
641 false
642 }
643 }
644
645 pub async fn send_heartbeat(&self) -> Result<(), DatabaseError> {
647 if let Some(ref channel) = self.broadcast_channel {
648 let state = self.state.borrow();
649 let now = Date::now() as u64;
650
651 let message = js_sys::Object::new();
652 js_sys::Reflect::set(&message, &"type".into(), &"heartbeat".into()).unwrap();
653 js_sys::Reflect::set(
654 &message,
655 &"leader_id".into(),
656 &state.instance_id.clone().into(),
657 )
658 .unwrap();
659 js_sys::Reflect::set(&message, &"timestamp".into(), &(now as f64).into()).unwrap();
660
661 channel.post_message(&message).map_err(|_| {
662 DatabaseError::new("LEADER_ELECTION_ERROR", "Failed to send heartbeat")
663 })?;
664 }
665
666 Ok(())
667 }
668
669 pub async fn get_last_heartbeat(&self) -> u64 {
671 let state = self.state.borrow();
672 let Some(window) = web_sys::window() else {
673 log::warn!("Window unavailable for heartbeat check");
674 return 0;
675 };
676 let storage = match window.local_storage() {
677 Ok(Some(s)) => s,
678 Ok(None) | Err(_) => {
679 log::warn!("localStorage unavailable for heartbeat check (private browsing?)");
680 return 0;
681 }
682 };
683 let leader_key = format!("datasync_leader_{}", state.db_name);
684
685 if let Ok(Some(leader_data)) = storage.get_item(&leader_key) {
686 if let Some(colon_pos) = leader_data.rfind(':') {
687 if let Ok(timestamp) = leader_data[colon_pos + 1..].parse::<u64>() {
688 return timestamp;
689 }
690 }
691 }
692
693 state.last_heartbeat
695 }
696}