absurder_sql/storage/
leader_election.rs

1//! Multi-Tab Leader Election Implementation
2//!
3//! Implements leader election using BroadcastChannel + deterministic leader selection.
4//! Only one tab/instance can be the leader at any time for a given database.
5//! Uses lowest instance ID wins approach to resolve race conditions.
6
7use crate::types::DatabaseError;
8use js_sys::Date;
9use std::cell::RefCell;
10use std::rc::Rc;
11use wasm_bindgen::JsCast;
12use wasm_bindgen::prelude::*;
13use web_sys::BroadcastChannel;
14
/// Leader election state for a database instance
///
/// This is this tab's local view of the election; it is shared between the
/// manager and its browser callbacks through `Rc<RefCell<...>>`.
#[derive(Debug, Clone)]
pub struct LeaderElectionState {
    /// Database name; used to build the BroadcastChannel name and the
    /// localStorage keys for this election.
    pub db_name: String,
    /// Identifier of this instance (creation timestamp plus a random suffix,
    /// both hex encoded). IDs are compared as strings; the lowest one wins.
    pub instance_id: String,
    /// Whether this instance currently believes it is the leader.
    pub is_leader: bool,
    /// ID of the instance this tab believes is the leader, if any.
    pub leader_id: Option<String>,
    /// Millisecond timestamp written during election rounds
    /// (a timestamp plus the manager's lease duration).
    pub lease_expiry: u64,
    /// Fallback value returned by `get_last_heartbeat` when localStorage holds
    /// no parsable leader record. Initialised to 0.
    pub last_heartbeat: u64,
}
25
/// Manager for multi-tab leader election
///
/// Owns the browser resources (BroadcastChannel, interval timer and the
/// closures registered with them) used to coordinate leadership.
pub struct LeaderElectionManager {
    /// Election state shared with the message listener and heartbeat closures.
    pub state: Rc<RefCell<LeaderElectionState>>,
    /// Channel used to announce leadership claims; `None` until
    /// `start_election` creates it and again after `stop_election` closes it.
    broadcast_channel: Option<BroadcastChannel>,
    /// Handle returned by `setInterval` for the heartbeat timer, if running.
    pub heartbeat_interval: Option<i32>,
    /// Closure invoked by the heartbeat timer; stored here so it stays alive
    /// for as long as the interval is registered.
    pub heartbeat_closure: Option<Closure<dyn FnMut()>>,
    /// Closure registered as the channel's `onmessage` handler.
    message_listener: Option<Closure<dyn FnMut(web_sys::MessageEvent)>>,
    /// Duration (ms) added to a timestamp to compute `lease_expiry`.
    /// NOTE(review): lease validity checks in this file use a literal 5000 ms
    /// rather than this value - confirm which is intended.
    lease_duration_ms: u64,
}
35
36impl LeaderElectionManager {
37    /// Create new leader election manager with deterministic instance ID
38    pub fn new(db_name: String) -> Self {
39        // Create deterministic instance ID: timestamp + random for uniqueness and ordering
40        let timestamp = Date::now() as u64;
41        let random_part = (js_sys::Math::random() * 1000.0) as u64;
42        let instance_id = format!("{:016x}_{:03x}", timestamp, random_part);
43
44        log::debug!("Created instance {} for {}", instance_id, db_name);
45
46        Self {
47            state: Rc::new(RefCell::new(LeaderElectionState {
48                db_name,
49                instance_id,
50                is_leader: false,
51                leader_id: None,
52                lease_expiry: 0,
53                last_heartbeat: 0,
54            })),
55            broadcast_channel: None,
56            heartbeat_interval: None,
57            heartbeat_closure: None,
58            message_listener: None,
59            lease_duration_ms: 1000, // 1 second - fast leader election
60        }
61    }
62
63    /// Start leader election process using localStorage coordination
64    pub async fn start_election(&mut self) -> Result<(), DatabaseError> {
65        log::debug!(
66            "LeaderElectionManager::start_election() - Starting for {}",
67            self.state.borrow().db_name
68        );
69
70        // Create BroadcastChannel for EVENT-BASED leader election
71        let channel_name = format!("datasync_leader_{}", self.state.borrow().db_name);
72        log::debug!(
73            "LeaderElectionManager::start_election() - Creating BroadcastChannel: {}",
74            channel_name
75        );
76        let broadcast_channel = BroadcastChannel::new(&channel_name).map_err(|_| {
77            DatabaseError::new("LEADER_ELECTION_ERROR", "Failed to create BroadcastChannel")
78        })?;
79
80        // Set up EVENT LISTENER for leadership change messages
81        let state_clone = self.state.clone();
82        let listener = Closure::wrap(Box::new(move |event: web_sys::MessageEvent| {
83            if let Ok(data) = event.data().dyn_into::<js_sys::JsString>() {
84                let message: String = data.into();
85
86                // Parse message: "LEADER_CLAIMED:instance_id:timestamp"
87                if let Some(parts) = message.strip_prefix("LEADER_CLAIMED:") {
88                    let parts: Vec<&str> = parts.split(':').collect();
89                    if parts.len() == 2 {
90                        let new_leader_id = parts[0];
91                        if let Ok(_timestamp) = parts[1].parse::<u64>() {
92                            let mut state = state_clone.borrow_mut();
93                            let my_instance_id = state.instance_id.clone();
94
95                            if new_leader_id == my_instance_id {
96                                // We're the leader!
97                                state.is_leader = true;
98                                state.leader_id = Some(new_leader_id.to_string());
99                                log::info!(
100                                    "EVENT: Became leader for {} via BroadcastChannel",
101                                    state.db_name
102                                );
103                            } else {
104                                // Someone else is leader
105                                state.is_leader = false;
106                                state.leader_id = Some(new_leader_id.to_string());
107                                log::debug!(
108                                    "EVENT: {} is now leader for {}",
109                                    new_leader_id,
110                                    state.db_name
111                                );
112                            }
113                        }
114                    }
115                }
116            }
117        }) as Box<dyn FnMut(web_sys::MessageEvent)>);
118
119        broadcast_channel.set_onmessage(Some(listener.as_ref().unchecked_ref()));
120        self.message_listener = Some(listener);
121        self.broadcast_channel = Some(broadcast_channel);
122
123        // Use localStorage for atomic coordination - no delays needed
124        log::debug!("LeaderElectionManager::start_election() - Calling try_become_leader()");
125        self.try_become_leader().await?;
126
127        // Start heartbeat if we're leader
128        let is_leader = self.state.borrow().is_leader;
129        web_sys::console::log_1(
130            &format!(
131                "LeaderElectionManager::start_election() - After try_become_leader, is_leader={}",
132                is_leader
133            )
134            .into(),
135        );
136        if is_leader {
137            web_sys::console::log_1(
138                &"LeaderElectionManager::start_election() - Calling start_heartbeat()".into(),
139            );
140            self.start_heartbeat()?;
141            web_sys::console::log_1(
142                &"LeaderElectionManager::start_election() - Heartbeat started".into(),
143            );
144        } else {
145            web_sys::console::log_1(
146                &"LeaderElectionManager::start_election() - Not leader, skipping heartbeat".into(),
147            );
148        }
149
150        Ok(())
151    }
152
153    /// Try to become leader using localStorage-based atomic coordination
154    ///
155    /// # Arguments
156    /// * `force` - If true, ignores existing leader's valid lease and forces takeover
157    pub async fn try_become_leader_internal(&mut self, force: bool) -> Result<(), DatabaseError> {
158        let state = self.state.borrow();
159        let my_instance_id = state.instance_id.clone();
160        let db_name = state.db_name.clone();
161        drop(state);
162
163        // Use localStorage for atomic coordination
164        let window = web_sys::window().ok_or_else(|| {
165            DatabaseError::new(
166                "STORAGE_ERROR",
167                "Window not available - not in browser context",
168            )
169        })?;
170        let storage = window
171            .local_storage()
172            .map_err(|_| {
173                DatabaseError::new(
174                    "STORAGE_ERROR",
175                    "localStorage access denied (check browser settings)",
176                )
177            })?
178            .ok_or_else(|| {
179                DatabaseError::new(
180                    "STORAGE_ERROR",
181                    "localStorage unavailable (private browsing mode?)",
182                )
183            })?;
184
185        let instances_key = format!("datasync_instances_{}", db_name);
186        let leader_key = format!("datasync_leader_{}", db_name);
187
188        // Step 1: Register our instance atomically
189        let current_time = Date::now() as u64;
190        let instance_data = format!("{}:{}", my_instance_id, current_time);
191
192        // Get existing instances
193        let existing_instances = storage
194            .get_item(&instances_key)
195            .map_err(|e| {
196                DatabaseError::new(
197                    "LEADER_ELECTION_ERROR",
198                    &format!("Failed to get instances: {:?}", e),
199                )
200            })?
201            .unwrap_or_default();
202        let mut all_instances: Vec<String> = if existing_instances.is_empty() {
203            Vec::new()
204        } else {
205            existing_instances
206                .split(',')
207                .map(|s| s.to_string())
208                .collect()
209        };
210
211        // Add ourselves if not already present
212        if !all_instances
213            .iter()
214            .any(|inst| inst.starts_with(&format!("{}:", my_instance_id)))
215        {
216            all_instances.push(instance_data);
217        }
218
219        // Clean up expired instances (older than 10 seconds)
220        let cutoff_time = current_time - 10000;
221        all_instances.retain(|inst| {
222            if let Some(colon_pos) = inst.rfind(':') {
223                if let Ok(timestamp) = inst[colon_pos + 1..].parse::<u64>() {
224                    timestamp > cutoff_time
225                } else {
226                    false
227                }
228            } else {
229                false
230            }
231        });
232
233        // Update instances list
234        let instances_str = all_instances.join(",");
235        storage
236            .set_item(&instances_key, &instances_str)
237            .map_err(|e| {
238                DatabaseError::new(
239                    "LEADER_ELECTION_ERROR",
240                    &format!("Failed to set instances: {:?}", e),
241                )
242            })?;
243
244        // Step 2: Determine leader based on lowest instance ID
245        let mut instance_ids: Vec<String> = all_instances
246            .iter()
247            .filter_map(|inst| inst.split(':').next().map(|s| s.to_string()))
248            .collect();
249        instance_ids.sort();
250
251        log::debug!("All instances for {}: {:?}", db_name, instance_ids);
252
253        if let Some(lowest_id) = instance_ids.first() {
254            if force || *lowest_id == my_instance_id {
255                // We should be the leader - attempt atomic claim (either we have lowest ID or we're forcing)
256                if force && *lowest_id != my_instance_id {
257                    log::debug!(
258                        "FORCING leadership takeover for {} (overriding lowest ID rule)",
259                        db_name
260                    );
261                } else {
262                    log::debug!(
263                        "I have the lowest ID - attempting atomic leadership claim for {}",
264                        db_name
265                    );
266                }
267
268                // Check if someone else already claimed leadership (atomic check-and-set)
269                if let Ok(Some(existing_data)) = storage.get_item(&leader_key) {
270                    if let Some(colon_pos) = existing_data.rfind(':') {
271                        let existing_leader_id = &existing_data[..colon_pos];
272                        if let Ok(existing_timestamp) =
273                            existing_data[colon_pos + 1..].parse::<u64>()
274                        {
275                            let existing_lease_expired = (current_time - existing_timestamp) > 5000;
276
277                            if !force
278                                && !existing_lease_expired
279                                && existing_leader_id != my_instance_id
280                            {
281                                // Someone else is already leader and lease is valid (and we're not forcing)
282                                log::debug!(
283                                    "{} already claimed leadership for {}",
284                                    existing_leader_id,
285                                    db_name
286                                );
287
288                                let mut state = self.state.borrow_mut();
289                                state.is_leader = false;
290                                state.leader_id = Some(existing_leader_id.to_string());
291                                state.lease_expiry = existing_timestamp + self.lease_duration_ms;
292                                return Ok(());
293                            }
294                        }
295                    }
296                }
297
298                // Atomically claim leadership (no valid existing leader)
299                let leader_data = format!("{}:{}", my_instance_id, current_time);
300                storage.set_item(&leader_key, &leader_data).map_err(|e| {
301                    DatabaseError::new(
302                        "LEADER_ELECTION_ERROR",
303                        &format!("Failed to set leader: {:?}", e),
304                    )
305                })?;
306
307                let mut state = self.state.borrow_mut();
308                state.is_leader = true;
309                state.leader_id = Some(my_instance_id.clone());
310                state.lease_expiry = current_time + self.lease_duration_ms;
311                drop(state);
312
313                log::info!("Became leader for {} with ID {}", db_name, my_instance_id);
314
315                // BROADCAST EVENT: Leadership claimed - NO POLLING NEEDED!
316                if let Some(ref channel) = self.broadcast_channel {
317                    let message = format!("LEADER_CLAIMED:{}:{}", my_instance_id, current_time);
318                    if let Err(e) = channel.post_message(&JsValue::from_str(&message)) {
319                        log::warn!("Failed to broadcast leadership claim: {:?}", e);
320                    } else {
321                        log::debug!("EVENT: Broadcasted leadership claim for {}", db_name);
322                    }
323                }
324
325                // Start heartbeat to maintain lease
326                if self.heartbeat_interval.is_none() {
327                    let _ = self.start_heartbeat();
328                }
329            } else {
330                // Someone else should be the leader
331                log::debug!(
332                    "Instance {} has lower ID - not claiming leadership for {}",
333                    lowest_id,
334                    db_name
335                );
336
337                let mut state = self.state.borrow_mut();
338                state.is_leader = false;
339                state.leader_id = Some(lowest_id.clone());
340                state.lease_expiry = current_time + self.lease_duration_ms;
341            }
342        }
343
344        Ok(())
345    }
346
347    /// Try to become leader (respects existing leader's lease)
348    pub async fn try_become_leader(&mut self) -> Result<(), DatabaseError> {
349        self.try_become_leader_internal(false).await
350    }
351
352    /// Force leadership takeover (ignores existing leader's lease)
353    pub async fn force_become_leader(&mut self) -> Result<(), DatabaseError> {
354        self.try_become_leader_internal(true).await
355    }
356
357    /// Start sending heartbeats as leader using localStorage
358    pub fn start_heartbeat(&mut self) -> Result<(), DatabaseError> {
359        web_sys::console::log_1(&"start_heartbeat() called".into());
360
361        // CRITICAL: Send initial heartbeat IMMEDIATELY
362        let state = self.state.borrow();
363        web_sys::console::log_1(
364            &format!(
365                "start_heartbeat: is_leader={}, db_name={}",
366                state.is_leader, state.db_name
367            )
368            .into(),
369        );
370
371        if state.is_leader {
372            let current_time = Date::now() as u64;
373
374            let window = web_sys::window()
375                .ok_or_else(|| DatabaseError::new("LEADER_ELECTION_ERROR", "Window unavailable"))?;
376            let storage = window
377                .local_storage()
378                .map_err(|_| {
379                    DatabaseError::new("LEADER_ELECTION_ERROR", "localStorage unavailable")
380                })?
381                .ok_or_else(|| {
382                    DatabaseError::new("LEADER_ELECTION_ERROR", "localStorage is None")
383                })?;
384
385            let leader_key = format!("datasync_leader_{}", state.db_name);
386            let leader_data = format!("{}:{}", state.instance_id, current_time);
387
388            web_sys::console::log_1(
389                &format!(
390                    "Writing heartbeat: key={}, data={}",
391                    leader_key, leader_data
392                )
393                .into(),
394            );
395
396            storage.set_item(&leader_key, &leader_data).map_err(|e| {
397                web_sys::console::error_1(
398                    &format!("Failed to write initial heartbeat: {:?}", e).into(),
399                );
400                DatabaseError::new("LEADER_ELECTION_ERROR", "Failed to write initial heartbeat")
401            })?;
402
403            web_sys::console::log_1(
404                &format!(
405                    "Sent initial heartbeat for {} from leader {}",
406                    state.db_name, state.instance_id
407                )
408                .into(),
409            );
410        } else {
411            web_sys::console::warn_1(&"start_heartbeat called but is_leader=false".into());
412        }
413        drop(state);
414
415        // Now set up interval for periodic updates
416        let state_clone = self.state.clone();
417
418        let closure = Closure::wrap(Box::new(move || {
419            let state = state_clone.borrow();
420            if state.is_leader {
421                let current_time = Date::now() as u64;
422
423                // Update leader heartbeat in localStorage
424                let window = match web_sys::window() {
425                    Some(w) => w,
426                    None => {
427                        log::error!("Window unavailable in heartbeat - stopping heartbeat");
428                        return;
429                    }
430                };
431                let storage = match window.local_storage() {
432                    Ok(Some(s)) => s,
433                    Ok(None) => {
434                        log::warn!("localStorage unavailable in heartbeat (private browsing?)");
435                        return;
436                    }
437                    Err(_) => {
438                        log::error!("localStorage access denied in heartbeat");
439                        return;
440                    }
441                };
442                let leader_key = format!("datasync_leader_{}", state.db_name);
443                let leader_data = format!("{}:{}", state.instance_id, current_time);
444
445                let _ = storage.set_item(&leader_key, &leader_data);
446
447                log::debug!(
448                    "Updated leader heartbeat for {} from leader {}",
449                    state.db_name,
450                    state.instance_id
451                );
452            }
453        }) as Box<dyn FnMut()>);
454
455        let interval_id = web_sys::window()
456            .unwrap()
457            .set_interval_with_callback_and_timeout_and_arguments_0(
458                closure.as_ref().unchecked_ref(),
459                1000, // Send heartbeat every 1 second
460            )
461            .map_err(|_| {
462                DatabaseError::new(
463                    "LEADER_ELECTION_ERROR",
464                    "Failed to start heartbeat interval",
465                )
466            })?;
467
468        // CRITICAL: Store closure instead of forgetting it, so it can be properly cleaned up
469        self.heartbeat_interval = Some(interval_id);
470        self.heartbeat_closure = Some(closure);
471
472        Ok(())
473    }
474
475    /// Stop leader election (e.g., when tab is closing)
476    pub async fn stop_election(&mut self) -> Result<(), DatabaseError> {
477        // CRITICAL: Check if already stopped (idempotent)
478        if self.heartbeat_interval.is_none() && self.heartbeat_closure.is_none() {
479            web_sys::console::log_1(&"[STOP] Already stopped - skipping".into());
480            return Ok(());
481        }
482
483        let state = self.state.borrow();
484        let db_name = state.db_name.clone();
485        let instance_id = state.instance_id.clone();
486        let was_leader = state.is_leader;
487        drop(state);
488
489        log::info!(
490            "[STOP] Stopping leader election for {} (was_leader: {})",
491            db_name,
492            was_leader
493        );
494
495        // CRITICAL: Clear interval and closure FIRST to release Rc references
496        if let Some(interval_id) = self.heartbeat_interval.take() {
497            web_sys::console::log_1(
498                &format!("[STOP] Clearing interval {} for {}", interval_id, db_name).into(),
499            );
500            if let Some(window) = web_sys::window() {
501                window.clear_interval_with_handle(interval_id);
502            }
503        }
504
505        // Drop the closure to release any Rc<RefCell<State>> references
506        if let Some(_closure) = self.heartbeat_closure.take() {
507            web_sys::console::log_1(
508                &format!("[STOP] Dropped heartbeat closure for {}", db_name).into(),
509            );
510        }
511
512        // CRITICAL: Close the BroadcastChannel to prevent test interference
513        if let Some(channel) = self.broadcast_channel.take() {
514            channel.close();
515            web_sys::console::log_1(
516                &format!("[STOP] Closed BroadcastChannel for {}", db_name).into(),
517            );
518        }
519
520        // Remove ourselves from localStorage instances list
521        let Some(window) = web_sys::window() else {
522            log::warn!("Window unavailable during cleanup");
523            return Ok(());
524        };
525        let storage = match window.local_storage() {
526            Ok(Some(s)) => s,
527            Ok(None) | Err(_) => {
528                log::warn!("localStorage unavailable during cleanup (private browsing?)");
529                return Ok(());
530            }
531        };
532        let instances_key = format!("datasync_instances_{}", db_name);
533
534        if let Ok(Some(existing_instances)) = storage.get_item(&instances_key) {
535            let all_instances: Vec<String> = existing_instances
536                .split(',')
537                .map(|s| s.to_string())
538                .collect();
539            let filtered_instances: Vec<String> = all_instances
540                .into_iter()
541                .filter(|inst| !inst.starts_with(&format!("{}:", instance_id)))
542                .collect();
543
544            if filtered_instances.is_empty() {
545                // Remove the key entirely if no instances left
546                let _ = storage.remove_item(&instances_key);
547            } else {
548                // Update with remaining instances
549                let instances_str = filtered_instances.join(",");
550                let _ = storage.set_item(&instances_key, &instances_str);
551            }
552        }
553
554        // Clear leader data if we were the leader
555        if was_leader {
556            let leader_key = format!("datasync_leader_{}", db_name);
557            let _ = storage.remove_item(&leader_key);
558
559            log::debug!(
560                "Cleared leader data for {} (was leader: {})",
561                db_name,
562                instance_id
563            );
564        }
565
566        // Reset state
567        let mut state = self.state.borrow_mut();
568        state.is_leader = false;
569        state.leader_id = None;
570
571        Ok(())
572    }
573
574    /// Check if this instance is the leader (with localStorage validation and re-election)
575    pub async fn is_leader(&self) -> bool {
576        let now = Date::now() as u64;
577        let state = self.state.borrow();
578        let db_name = state.db_name.clone();
579        let my_instance_id = state.instance_id.clone();
580
581        // If localStorage is unavailable, we can't coordinate - return false
582        let Some(window) = web_sys::window() else {
583            log::warn!("Window unavailable for leader check - assuming not leader");
584            return false;
585        };
586        let storage = match window.local_storage() {
587            Ok(Some(s)) => s,
588            Ok(None) => {
589                log::warn!(
590                    "localStorage unavailable for leader check (private browsing?) - assuming not leader"
591                );
592                return false;
593            }
594            Err(_) => {
595                log::error!("localStorage access denied for leader check - assuming not leader");
596                return false;
597            }
598        };
599        let leader_key = format!("datasync_leader_{}", db_name);
600
601        // Check current leader in localStorage
602        let current_leader_expired = if let Ok(Some(leader_data)) = storage.get_item(&leader_key) {
603            if let Some(colon_pos) = leader_data.rfind(':') {
604                let leader_id = &leader_data[..colon_pos];
605                if let Ok(timestamp) = leader_data[colon_pos + 1..].parse::<u64>() {
606                    let lease_expired = (now - timestamp) > 5000; // 5 second lease
607
608                    if leader_id == my_instance_id && !lease_expired {
609                        return true; // We're still the valid leader
610                    }
611
612                    lease_expired // Return whether the current leader's lease expired
613                } else {
614                    true // Invalid timestamp, consider expired
615                }
616            } else {
617                true // Invalid format, consider expired
618            }
619        } else {
620            true // No leader data, consider expired
621        };
622
623        drop(state);
624
625        // If current leader expired, we need re-election (but can't do it from immutable self)
626        if current_leader_expired {
627            log::debug!(
628                "Current leader lease expired for {} - re-election needed",
629                db_name
630            );
631
632            // Update our state to reflect no current leader
633            let mut state = self.state.borrow_mut();
634            state.is_leader = false;
635            state.leader_id = None;
636            false
637        } else {
638            // Update our state to reflect we're not leader
639            let mut state = self.state.borrow_mut();
640            state.is_leader = false;
641            false
642        }
643    }
644
645    /// Send a heartbeat (for testing)
646    pub async fn send_heartbeat(&self) -> Result<(), DatabaseError> {
647        if let Some(ref channel) = self.broadcast_channel {
648            let state = self.state.borrow();
649            let now = Date::now() as u64;
650
651            let message = js_sys::Object::new();
652            js_sys::Reflect::set(&message, &"type".into(), &"heartbeat".into()).unwrap();
653            js_sys::Reflect::set(
654                &message,
655                &"leader_id".into(),
656                &state.instance_id.clone().into(),
657            )
658            .unwrap();
659            js_sys::Reflect::set(&message, &"timestamp".into(), &(now as f64).into()).unwrap();
660
661            channel.post_message(&message).map_err(|_| {
662                DatabaseError::new("LEADER_ELECTION_ERROR", "Failed to send heartbeat")
663            })?;
664        }
665
666        Ok(())
667    }
668
669    /// Get timestamp of last received leader heartbeat from localStorage
670    pub async fn get_last_heartbeat(&self) -> u64 {
671        let state = self.state.borrow();
672        let Some(window) = web_sys::window() else {
673            log::warn!("Window unavailable for heartbeat check");
674            return 0;
675        };
676        let storage = match window.local_storage() {
677            Ok(Some(s)) => s,
678            Ok(None) | Err(_) => {
679                log::warn!("localStorage unavailable for heartbeat check (private browsing?)");
680                return 0;
681            }
682        };
683        let leader_key = format!("datasync_leader_{}", state.db_name);
684
685        if let Ok(Some(leader_data)) = storage.get_item(&leader_key) {
686            if let Some(colon_pos) = leader_data.rfind(':') {
687                if let Ok(timestamp) = leader_data[colon_pos + 1..].parse::<u64>() {
688                    return timestamp;
689                }
690            }
691        }
692
693        // Fallback to stored value
694        state.last_heartbeat
695    }
696}