absurder_sql/storage/
leader_election.rs

1//! Multi-Tab Leader Election Implementation
2//!
3//! Implements leader election using BroadcastChannel + deterministic leader selection.
4//! Only one tab/instance can be the leader at any time for a given database.
5//! Uses lowest instance ID wins approach to resolve race conditions.
6
7use crate::types::DatabaseError;
8use js_sys::Date;
9use std::cell::RefCell;
10use std::rc::Rc;
11use wasm_bindgen::JsCast;
12use wasm_bindgen::prelude::*;
13use web_sys::BroadcastChannel;
14
15// Thread-local reentrancy guard for the heartbeat closure.
16// Prevents "closure invoked recursively" errors from wasm-bindgen.
//
// The heartbeat tick sets the flag to `true` when it starts and a drop guard
// inside the tick resets it to `false` when the tick finishes; a tick that
// observes `true` on entry is skipped. The flag is one value per thread, so it
// is shared by every heartbeat closure created on that thread.
17thread_local! {
18    static HEARTBEAT_RUNNING: RefCell<bool> = const { RefCell::new(false) };
19}
20
21/// Leader election state for a database instance.
///
/// The manager keeps this behind `Rc<RefCell<_>>` and hands clones of that
/// handle to the BroadcastChannel message listener and the heartbeat closure,
/// so all three read and update the same values.
22#[derive(Debug, Clone)]
23pub struct LeaderElectionState {
    /// Database name; used to build the BroadcastChannel name and the
    /// localStorage keys (`datasync_instances_<db>`, `datasync_leader_<db>`).
24    pub db_name: String,
    /// Identifier of this instance, formatted as
    /// `<timestamp, 16 hex digits>_<random value, 3 hex digits>`.
25    pub instance_id: String,
    /// Whether this instance currently believes it is the leader.
26    pub is_leader: bool,
    /// Instance ID of the last known leader, if any.
27    pub leader_id: Option<String>,
    /// Millisecond timestamp computed as a claim timestamp plus the manager's
    /// lease duration. Nothing in this file reads it back.
28    pub lease_expiry: u64,
    /// Heartbeat timestamp fallback returned by `get_last_heartbeat` when the
    /// leader record cannot be read; nothing in this file writes it after
    /// construction (initial value `0`).
29    pub last_heartbeat: u64,
30}
31
32/// Manager for multi-tab leader election.
///
/// Owns the browser resources used by the election (BroadcastChannel, message
/// listener closure, heartbeat interval handle) together with the shared
/// election state.
33pub struct LeaderElectionManager {
    /// Election state shared with the message listener and heartbeat closure.
34    pub state: Rc<RefCell<LeaderElectionState>>,
    /// Channel used to announce leadership claims; `None` until
    /// `start_election` creates it and again after it has been closed.
35    broadcast_channel: Option<BroadcastChannel>,
    /// Handle returned by `setInterval` for the heartbeat, if one is running.
36    pub heartbeat_interval: Option<i32>,
37    // NOTE: heartbeat_closure is intentionally leaked via Closure::forget()
38    // to prevent "closure invoked after being dropped" errors from pending ticks.
39    // The heartbeat_valid flag makes the leaked closure a no-op after stop.
    /// Closure installed as the channel's `onmessage` handler; stored here so
    /// it stays alive while the handler is registered.
40    message_listener: Option<Closure<dyn FnMut(web_sys::MessageEvent)>>,
    /// Milliseconds added to a claim timestamp to compute
    /// `LeaderElectionState::lease_expiry`.
41    lease_duration_ms: u64,
42    /// Validity flag - set to false before clearing interval to prevent
43    /// leaked closure from doing any work after stop_election is called
44    heartbeat_valid: Rc<RefCell<bool>>,
45}
46
47impl LeaderElectionManager {
48    /// Create new leader election manager with deterministic instance ID
49    pub fn new(db_name: String) -> Self {
50        // Create deterministic instance ID: timestamp + random for uniqueness and ordering
51        let timestamp = Date::now() as u64;
52        let random_part = (js_sys::Math::random() * 1000.0) as u64;
53        let instance_id = format!("{:016x}_{:03x}", timestamp, random_part);
54
55        log::debug!("Created instance {} for {}", instance_id, db_name);
56
57        Self {
58            state: Rc::new(RefCell::new(LeaderElectionState {
59                db_name,
60                instance_id,
61                is_leader: false,
62                leader_id: None,
63                lease_expiry: 0,
64                last_heartbeat: 0,
65            })),
66            broadcast_channel: None,
67            heartbeat_interval: None,
68            message_listener: None,
69            lease_duration_ms: 1000, // 1 second - fast leader election
70            heartbeat_valid: Rc::new(RefCell::new(false)),
71        }
72    }
73
74    /// Start leader election process using localStorage coordination
75    pub async fn start_election(&mut self) -> Result<(), DatabaseError> {
76        log::debug!(
77            "LeaderElectionManager::start_election() - Starting for {}",
78            self.state.borrow().db_name
79        );
80
81        // Create BroadcastChannel for EVENT-BASED leader election
82        let channel_name = format!("datasync_leader_{}", self.state.borrow().db_name);
83        log::debug!(
84            "LeaderElectionManager::start_election() - Creating BroadcastChannel: {}",
85            channel_name
86        );
87        let broadcast_channel = BroadcastChannel::new(&channel_name).map_err(|_| {
88            DatabaseError::new("LEADER_ELECTION_ERROR", "Failed to create BroadcastChannel")
89        })?;
90
91        // Set up EVENT LISTENER for leadership change messages
92        let state_clone = self.state.clone();
93        let listener = Closure::wrap(Box::new(move |event: web_sys::MessageEvent| {
94            if let Ok(data) = event.data().dyn_into::<js_sys::JsString>() {
95                let message: String = data.into();
96
97                // Parse message: "LEADER_CLAIMED:instance_id:timestamp"
98                if let Some(parts) = message.strip_prefix("LEADER_CLAIMED:") {
99                    let parts: Vec<&str> = parts.split(':').collect();
100                    if parts.len() == 2 {
101                        let new_leader_id = parts[0];
102                        if let Ok(_timestamp) = parts[1].parse::<u64>() {
103                            let mut state = state_clone.borrow_mut();
104                            let my_instance_id = state.instance_id.clone();
105
106                            if new_leader_id == my_instance_id {
107                                // We're the leader!
108                                state.is_leader = true;
109                                state.leader_id = Some(new_leader_id.to_string());
110                                log::info!(
111                                    "EVENT: Became leader for {} via BroadcastChannel",
112                                    state.db_name
113                                );
114                            } else {
115                                // Someone else is leader
116                                state.is_leader = false;
117                                state.leader_id = Some(new_leader_id.to_string());
118                                log::debug!(
119                                    "EVENT: {} is now leader for {}",
120                                    new_leader_id,
121                                    state.db_name
122                                );
123                            }
124                        }
125                    }
126                }
127            }
128        }) as Box<dyn FnMut(web_sys::MessageEvent)>);
129
130        broadcast_channel.set_onmessage(Some(listener.as_ref().unchecked_ref()));
131        self.message_listener = Some(listener);
132        self.broadcast_channel = Some(broadcast_channel);
133
134        // Use localStorage for atomic coordination - no delays needed
135        log::debug!("LeaderElectionManager::start_election() - Calling try_become_leader()");
136        self.try_become_leader().await?;
137
138        // Start heartbeat if we're leader
139        let is_leader = self.state.borrow().is_leader;
140        web_sys::console::log_1(
141            &format!(
142                "LeaderElectionManager::start_election() - After try_become_leader, is_leader={}",
143                is_leader
144            )
145            .into(),
146        );
147        if is_leader {
148            web_sys::console::log_1(
149                &"LeaderElectionManager::start_election() - Calling start_heartbeat()".into(),
150            );
151            self.start_heartbeat()?;
152            web_sys::console::log_1(
153                &"LeaderElectionManager::start_election() - Heartbeat started".into(),
154            );
155        } else {
156            web_sys::console::log_1(
157                &"LeaderElectionManager::start_election() - Not leader, skipping heartbeat".into(),
158            );
159        }
160
161        Ok(())
162    }
163
164    /// Try to become leader using localStorage-based atomic coordination
165    ///
166    /// # Arguments
167    /// * `force` - If true, ignores existing leader's valid lease and forces takeover
168    pub async fn try_become_leader_internal(&mut self, force: bool) -> Result<(), DatabaseError> {
169        let state = self.state.borrow();
170        let my_instance_id = state.instance_id.clone();
171        let db_name = state.db_name.clone();
172        drop(state);
173
174        // Use localStorage for atomic coordination
175        let window = web_sys::window().ok_or_else(|| {
176            DatabaseError::new(
177                "STORAGE_ERROR",
178                "Window not available - not in browser context",
179            )
180        })?;
181        let storage = window
182            .local_storage()
183            .map_err(|_| {
184                DatabaseError::new(
185                    "STORAGE_ERROR",
186                    "localStorage access denied (check browser settings)",
187                )
188            })?
189            .ok_or_else(|| {
190                DatabaseError::new(
191                    "STORAGE_ERROR",
192                    "localStorage unavailable (private browsing mode?)",
193                )
194            })?;
195
196        let instances_key = format!("datasync_instances_{}", db_name);
197        let leader_key = format!("datasync_leader_{}", db_name);
198
199        // Step 1: Register our instance atomically
200        let current_time = Date::now() as u64;
201        let instance_data = format!("{}:{}", my_instance_id, current_time);
202
203        // Get existing instances
204        let existing_instances = storage
205            .get_item(&instances_key)
206            .map_err(|e| {
207                DatabaseError::new(
208                    "LEADER_ELECTION_ERROR",
209                    &format!("Failed to get instances: {:?}", e),
210                )
211            })?
212            .unwrap_or_default();
213        let mut all_instances: Vec<String> = if existing_instances.is_empty() {
214            Vec::new()
215        } else {
216            existing_instances
217                .split(',')
218                .map(|s| s.to_string())
219                .collect()
220        };
221
222        // Add ourselves if not already present
223        if !all_instances
224            .iter()
225            .any(|inst| inst.starts_with(&format!("{}:", my_instance_id)))
226        {
227            all_instances.push(instance_data);
228        }
229
230        // Clean up expired instances (older than 10 seconds)
231        let cutoff_time = current_time - 10000;
232        all_instances.retain(|inst| {
233            if let Some(colon_pos) = inst.rfind(':') {
234                if let Ok(timestamp) = inst[colon_pos + 1..].parse::<u64>() {
235                    timestamp > cutoff_time
236                } else {
237                    false
238                }
239            } else {
240                false
241            }
242        });
243
244        // Update instances list
245        let instances_str = all_instances.join(",");
246        storage
247            .set_item(&instances_key, &instances_str)
248            .map_err(|e| {
249                DatabaseError::new(
250                    "LEADER_ELECTION_ERROR",
251                    &format!("Failed to set instances: {:?}", e),
252                )
253            })?;
254
255        // Step 2: Determine leader based on lowest instance ID
256        let mut instance_ids: Vec<String> = all_instances
257            .iter()
258            .filter_map(|inst| inst.split(':').next().map(|s| s.to_string()))
259            .collect();
260        instance_ids.sort();
261
262        log::debug!("All instances for {}: {:?}", db_name, instance_ids);
263
264        if let Some(lowest_id) = instance_ids.first() {
265            if force || *lowest_id == my_instance_id {
266                // We should be the leader - attempt atomic claim (either we have lowest ID or we're forcing)
267                if force && *lowest_id != my_instance_id {
268                    log::debug!(
269                        "FORCING leadership takeover for {} (overriding lowest ID rule)",
270                        db_name
271                    );
272                } else {
273                    log::debug!(
274                        "I have the lowest ID - attempting atomic leadership claim for {}",
275                        db_name
276                    );
277                }
278
279                // Check if someone else already claimed leadership (atomic check-and-set)
280                if let Ok(Some(existing_data)) = storage.get_item(&leader_key) {
281                    if let Some(colon_pos) = existing_data.rfind(':') {
282                        let existing_leader_id = &existing_data[..colon_pos];
283                        if let Ok(existing_timestamp) =
284                            existing_data[colon_pos + 1..].parse::<u64>()
285                        {
286                            let existing_lease_expired = (current_time - existing_timestamp) > 5000;
287
288                            if !force
289                                && !existing_lease_expired
290                                && existing_leader_id != my_instance_id
291                            {
292                                // Someone else is already leader and lease is valid (and we're not forcing)
293                                log::debug!(
294                                    "{} already claimed leadership for {}",
295                                    existing_leader_id,
296                                    db_name
297                                );
298
299                                let mut state = self.state.borrow_mut();
300                                state.is_leader = false;
301                                state.leader_id = Some(existing_leader_id.to_string());
302                                state.lease_expiry = existing_timestamp + self.lease_duration_ms;
303                                return Ok(());
304                            }
305                        }
306                    }
307                }
308
309                // Atomically claim leadership (no valid existing leader)
310                let leader_data = format!("{}:{}", my_instance_id, current_time);
311                storage.set_item(&leader_key, &leader_data).map_err(|e| {
312                    DatabaseError::new(
313                        "LEADER_ELECTION_ERROR",
314                        &format!("Failed to set leader: {:?}", e),
315                    )
316                })?;
317
318                let mut state = self.state.borrow_mut();
319                state.is_leader = true;
320                state.leader_id = Some(my_instance_id.clone());
321                state.lease_expiry = current_time + self.lease_duration_ms;
322                drop(state);
323
324                log::info!("Became leader for {} with ID {}", db_name, my_instance_id);
325
326                // BROADCAST EVENT: Leadership claimed - NO POLLING NEEDED!
327                if let Some(ref channel) = self.broadcast_channel {
328                    let message = format!("LEADER_CLAIMED:{}:{}", my_instance_id, current_time);
329                    if let Err(e) = channel.post_message(&JsValue::from_str(&message)) {
330                        log::warn!("Failed to broadcast leadership claim: {:?}", e);
331                    } else {
332                        log::debug!("EVENT: Broadcasted leadership claim for {}", db_name);
333                    }
334                }
335
336                // Start heartbeat to maintain lease
337                if self.heartbeat_interval.is_none() {
338                    let _ = self.start_heartbeat();
339                }
340            } else {
341                // Someone else should be the leader
342                log::debug!(
343                    "Instance {} has lower ID - not claiming leadership for {}",
344                    lowest_id,
345                    db_name
346                );
347
348                let mut state = self.state.borrow_mut();
349                state.is_leader = false;
350                state.leader_id = Some(lowest_id.clone());
351                state.lease_expiry = current_time + self.lease_duration_ms;
352            }
353        }
354
355        Ok(())
356    }
357
358    /// Try to become leader (respects existing leader's lease).
    ///
    /// Delegates to `try_become_leader_internal` with `force = false`.
    ///
    /// # Errors
    /// Returns whatever error `try_become_leader_internal` returns.
359    pub async fn try_become_leader(&mut self) -> Result<(), DatabaseError> {
360        self.try_become_leader_internal(false).await
361    }
362
363    /// Force leadership takeover (ignores existing leader's lease).
    ///
    /// Delegates to `try_become_leader_internal` with `force = true`, which
    /// also bypasses the lowest-instance-ID rule.
    ///
    /// # Errors
    /// Returns whatever error `try_become_leader_internal` returns.
364    pub async fn force_become_leader(&mut self) -> Result<(), DatabaseError> {
365        self.try_become_leader_internal(true).await
366    }
367
368    /// Start sending heartbeats as leader using localStorage
369    pub fn start_heartbeat(&mut self) -> Result<(), DatabaseError> {
370        web_sys::console::log_1(&"start_heartbeat() called".into());
371
372        // CRITICAL: Send initial heartbeat IMMEDIATELY
373        let state = self.state.borrow();
374        web_sys::console::log_1(
375            &format!(
376                "start_heartbeat: is_leader={}, db_name={}",
377                state.is_leader, state.db_name
378            )
379            .into(),
380        );
381
382        if state.is_leader {
383            let current_time = Date::now() as u64;
384
385            let window = web_sys::window()
386                .ok_or_else(|| DatabaseError::new("LEADER_ELECTION_ERROR", "Window unavailable"))?;
387            let storage = window
388                .local_storage()
389                .map_err(|_| {
390                    DatabaseError::new("LEADER_ELECTION_ERROR", "localStorage unavailable")
391                })?
392                .ok_or_else(|| {
393                    DatabaseError::new("LEADER_ELECTION_ERROR", "localStorage is None")
394                })?;
395
396            let leader_key = format!("datasync_leader_{}", state.db_name);
397            let leader_data = format!("{}:{}", state.instance_id, current_time);
398
399            web_sys::console::log_1(
400                &format!(
401                    "Writing heartbeat: key={}, data={}",
402                    leader_key, leader_data
403                )
404                .into(),
405            );
406
407            storage.set_item(&leader_key, &leader_data).map_err(|e| {
408                web_sys::console::error_1(
409                    &format!("Failed to write initial heartbeat: {:?}", e).into(),
410                );
411                DatabaseError::new("LEADER_ELECTION_ERROR", "Failed to write initial heartbeat")
412            })?;
413
414            web_sys::console::log_1(
415                &format!(
416                    "Sent initial heartbeat for {} from leader {}",
417                    state.db_name, state.instance_id
418                )
419                .into(),
420            );
421        } else {
422            web_sys::console::warn_1(&"start_heartbeat called but is_leader=false".into());
423        }
424        drop(state);
425
426        // Now set up interval for periodic updates
427        let state_clone = self.state.clone();
428        let valid_clone = self.heartbeat_valid.clone();
429
430        // Mark heartbeat as valid before starting
431        *self.heartbeat_valid.borrow_mut() = true;
432
433        let closure = Closure::wrap(Box::new(move || {
434            // CRITICAL: Check validity FIRST before any other operations
435            // This prevents "closure invoked after being dropped" errors
436            // when a pending setInterval tick fires after stop_election invalidates
437            if !*valid_clone.borrow() {
438                // Heartbeat has been invalidated - don't execute
439                return;
440            }
441
442            // Reentrancy guard: skip if heartbeat is already running
443            // This prevents "closure invoked recursively" errors from wasm-bindgen
444            let already_running = HEARTBEAT_RUNNING.with(|running| {
445                let was_running = *running.borrow();
446                if !was_running {
447                    *running.borrow_mut() = true;
448                }
449                was_running
450            });
451
452            if already_running {
453                log::debug!("Heartbeat skipped - previous invocation still running");
454                return;
455            }
456
457            // Ensure we clear the flag when done (even on early return)
458            struct HeartbeatGuard;
459            impl Drop for HeartbeatGuard {
460                fn drop(&mut self) {
461                    HEARTBEAT_RUNNING.with(|running| {
462                        *running.borrow_mut() = false;
463                    });
464                }
465            }
466            let _guard = HeartbeatGuard;
467
468            let state = state_clone.borrow();
469            if state.is_leader {
470                let current_time = Date::now() as u64;
471
472                // Update leader heartbeat in localStorage
473                let window = match web_sys::window() {
474                    Some(w) => w,
475                    None => {
476                        log::error!("Window unavailable in heartbeat - stopping heartbeat");
477                        return;
478                    }
479                };
480                let storage = match window.local_storage() {
481                    Ok(Some(s)) => s,
482                    Ok(None) => {
483                        log::warn!("localStorage unavailable in heartbeat (private browsing?)");
484                        return;
485                    }
486                    Err(_) => {
487                        log::error!("localStorage access denied in heartbeat");
488                        return;
489                    }
490                };
491                let leader_key = format!("datasync_leader_{}", state.db_name);
492                let leader_data = format!("{}:{}", state.instance_id, current_time);
493
494                let _ = storage.set_item(&leader_key, &leader_data);
495
496                log::debug!(
497                    "Updated leader heartbeat for {} from leader {}",
498                    state.db_name,
499                    state.instance_id
500                );
501            }
502        }) as Box<dyn FnMut()>);
503
504        let interval_id = web_sys::window()
505            .unwrap()
506            .set_interval_with_callback_and_timeout_and_arguments_0(
507                closure.as_ref().unchecked_ref(),
508                1000, // Send heartbeat every 1 second
509            )
510            .map_err(|_| {
511                DatabaseError::new(
512                    "LEADER_ELECTION_ERROR",
513                    "Failed to start heartbeat interval",
514                )
515            })?;
516
517        self.heartbeat_interval = Some(interval_id);
518
519        // CRITICAL: Intentionally leak the closure via forget() to prevent
520        // "closure invoked after being dropped" errors.
521        //
522        // When the manager is dropped:
523        // 1. heartbeat_valid is set to false (closure becomes no-op)
524        // 2. clearInterval is called (stops future scheduling)
525        // 3. But pending callbacks in JS event queue may still fire
526        // 4. Since the closure is leaked (never dropped), those callbacks
527        //    will safely invoke the Rust closure which immediately returns
528        //    due to the validity check.
529        //
530        // Trade-off: Small memory leak (~100 bytes per database) but prevents
531        // runtime errors. Databases are typically long-lived so this is acceptable.
532        closure.forget();
533
534        Ok(())
535    }
536
537    /// Stop leader election (e.g., when tab is closing)
538    pub async fn stop_election(&mut self) -> Result<(), DatabaseError> {
539        // CRITICAL: Check if already stopped (idempotent)
540        if self.heartbeat_interval.is_none() && !*self.heartbeat_valid.borrow() {
541            web_sys::console::log_1(&"[STOP] Already stopped - skipping".into());
542            return Ok(());
543        }
544
545        let state = self.state.borrow();
546        let db_name = state.db_name.clone();
547        let instance_id = state.instance_id.clone();
548        let was_leader = state.is_leader;
549        drop(state);
550
551        log::info!(
552            "[STOP] Stopping leader election for {} (was_leader: {})",
553            db_name,
554            was_leader
555        );
556
557        // CRITICAL: Invalidate heartbeat FIRST, before clearing interval
558        // This ensures any pending setInterval tick will bail out immediately
559        // and won't try to execute with freed resources
560        *self.heartbeat_valid.borrow_mut() = false;
561        web_sys::console::log_1(&format!("[STOP] Invalidated heartbeat for {}", db_name).into());
562
563        // Now clear interval (closure is intentionally leaked, no need to drop)
564        if let Some(interval_id) = self.heartbeat_interval.take() {
565            web_sys::console::log_1(
566                &format!("[STOP] Clearing interval {} for {}", interval_id, db_name).into(),
567            );
568            if let Some(window) = web_sys::window() {
569                window.clear_interval_with_handle(interval_id);
570            }
571        }
572
573        // CRITICAL: Close the BroadcastChannel to prevent test interference
574        if let Some(channel) = self.broadcast_channel.take() {
575            channel.close();
576            web_sys::console::log_1(
577                &format!("[STOP] Closed BroadcastChannel for {}", db_name).into(),
578            );
579        }
580
581        // Remove ourselves from localStorage instances list
582        let Some(window) = web_sys::window() else {
583            log::warn!("Window unavailable during cleanup");
584            return Ok(());
585        };
586        let storage = match window.local_storage() {
587            Ok(Some(s)) => s,
588            Ok(None) | Err(_) => {
589                log::warn!("localStorage unavailable during cleanup (private browsing?)");
590                return Ok(());
591            }
592        };
593        let instances_key = format!("datasync_instances_{}", db_name);
594
595        if let Ok(Some(existing_instances)) = storage.get_item(&instances_key) {
596            let all_instances: Vec<String> = existing_instances
597                .split(',')
598                .map(|s| s.to_string())
599                .collect();
600            let filtered_instances: Vec<String> = all_instances
601                .into_iter()
602                .filter(|inst| !inst.starts_with(&format!("{}:", instance_id)))
603                .collect();
604
605            if filtered_instances.is_empty() {
606                // Remove the key entirely if no instances left
607                let _ = storage.remove_item(&instances_key);
608            } else {
609                // Update with remaining instances
610                let instances_str = filtered_instances.join(",");
611                let _ = storage.set_item(&instances_key, &instances_str);
612            }
613        }
614
615        // Clear leader data if we were the leader
616        if was_leader {
617            let leader_key = format!("datasync_leader_{}", db_name);
618            let _ = storage.remove_item(&leader_key);
619
620            log::debug!(
621                "Cleared leader data for {} (was leader: {})",
622                db_name,
623                instance_id
624            );
625        }
626
627        // Reset state
628        let mut state = self.state.borrow_mut();
629        state.is_leader = false;
630        state.leader_id = None;
631
632        Ok(())
633    }
634
635    /// Check if this instance is the leader (with localStorage validation and re-election)
636    pub async fn is_leader(&self) -> bool {
637        let now = Date::now() as u64;
638        let state = self.state.borrow();
639        let db_name = state.db_name.clone();
640        let my_instance_id = state.instance_id.clone();
641
642        // If localStorage is unavailable, we can't coordinate - return false
643        let Some(window) = web_sys::window() else {
644            log::warn!("Window unavailable for leader check - assuming not leader");
645            return false;
646        };
647        let storage = match window.local_storage() {
648            Ok(Some(s)) => s,
649            Ok(None) => {
650                log::warn!(
651                    "localStorage unavailable for leader check (private browsing?) - assuming not leader"
652                );
653                return false;
654            }
655            Err(_) => {
656                log::error!("localStorage access denied for leader check - assuming not leader");
657                return false;
658            }
659        };
660        let leader_key = format!("datasync_leader_{}", db_name);
661
662        // Check current leader in localStorage
663        let current_leader_expired = if let Ok(Some(leader_data)) = storage.get_item(&leader_key) {
664            if let Some(colon_pos) = leader_data.rfind(':') {
665                let leader_id = &leader_data[..colon_pos];
666                if let Ok(timestamp) = leader_data[colon_pos + 1..].parse::<u64>() {
667                    let lease_expired = (now - timestamp) > 5000; // 5 second lease
668
669                    if leader_id == my_instance_id && !lease_expired {
670                        return true; // We're still the valid leader
671                    }
672
673                    lease_expired // Return whether the current leader's lease expired
674                } else {
675                    true // Invalid timestamp, consider expired
676                }
677            } else {
678                true // Invalid format, consider expired
679            }
680        } else {
681            true // No leader data, consider expired
682        };
683
684        drop(state);
685
686        // If current leader expired, we need re-election (but can't do it from immutable self)
687        if current_leader_expired {
688            log::debug!(
689                "Current leader lease expired for {} - re-election needed",
690                db_name
691            );
692
693            // Update our state to reflect no current leader
694            let mut state = self.state.borrow_mut();
695            state.is_leader = false;
696            state.leader_id = None;
697            false
698        } else {
699            // Update our state to reflect we're not leader
700            let mut state = self.state.borrow_mut();
701            state.is_leader = false;
702            false
703        }
704    }
705
706    /// Send a heartbeat (for testing)
707    pub async fn send_heartbeat(&self) -> Result<(), DatabaseError> {
708        if let Some(ref channel) = self.broadcast_channel {
709            let state = self.state.borrow();
710            let now = Date::now() as u64;
711
712            let message = js_sys::Object::new();
713            js_sys::Reflect::set(&message, &"type".into(), &"heartbeat".into()).unwrap();
714            js_sys::Reflect::set(
715                &message,
716                &"leader_id".into(),
717                &state.instance_id.clone().into(),
718            )
719            .unwrap();
720            js_sys::Reflect::set(&message, &"timestamp".into(), &(now as f64).into()).unwrap();
721
722            channel.post_message(&message).map_err(|_| {
723                DatabaseError::new("LEADER_ELECTION_ERROR", "Failed to send heartbeat")
724            })?;
725        }
726
727        Ok(())
728    }
729
730    /// Get timestamp of last received leader heartbeat from localStorage
731    pub async fn get_last_heartbeat(&self) -> u64 {
732        let state = self.state.borrow();
733        let Some(window) = web_sys::window() else {
734            log::warn!("Window unavailable for heartbeat check");
735            return 0;
736        };
737        let storage = match window.local_storage() {
738            Ok(Some(s)) => s,
739            Ok(None) | Err(_) => {
740                log::warn!("localStorage unavailable for heartbeat check (private browsing?)");
741                return 0;
742            }
743        };
744        let leader_key = format!("datasync_leader_{}", state.db_name);
745
746        if let Ok(Some(leader_data)) = storage.get_item(&leader_key) {
747            if let Some(colon_pos) = leader_data.rfind(':') {
748                if let Ok(timestamp) = leader_data[colon_pos + 1..].parse::<u64>() {
749                    return timestamp;
750                }
751            }
752        }
753
754        // Fallback to stored value
755        state.last_heartbeat
756    }
757}
758
759/// Drop implementation to prevent "closure invoked after being dropped" errors
760///
761/// CRITICAL: Invalidate heartbeat_valid BEFORE clearing interval.
762/// The heartbeat closure is intentionally leaked via forget() so it's never dropped.
763/// Any pending setInterval ticks will safely invoke the leaked closure which
764/// immediately returns due to the validity check.
765impl Drop for LeaderElectionManager {
766    fn drop(&mut self) {
767        // CRITICAL: Invalidate heartbeat FIRST - leaked closure will become no-op
768        *self.heartbeat_valid.borrow_mut() = false;
769
770        // Clear heartbeat interval (stops future scheduling)
771        if let Some(interval_id) = self.heartbeat_interval.take() {
772            if let Some(window) = web_sys::window() {
773                window.clear_interval_with_handle(interval_id);
774                log::debug!(
775                    "LeaderElectionManager::drop() - Cleared heartbeat interval {}",
776                    interval_id
777                );
778            }
779        }
780
781        // Close broadcast channel
782        if let Some(channel) = self.broadcast_channel.take() {
783            channel.close();
784            log::debug!("LeaderElectionManager::drop() - Closed BroadcastChannel");
785        }
786
787        // Note: heartbeat closure is intentionally leaked (never dropped)
788        // message_listener will be dropped here
789        log::debug!(
790            "LeaderElectionManager::drop() - Cleanup complete for {}",
791            self.state.borrow().db_name
792        );
793    }
794}