absurder_sql/storage/
leader_election.rs

//! Multi-Tab Leader Election Implementation
//!
//! Implements leader election using `localStorage` coordination with deterministic
//! leader selection; a `BroadcastChannel` is opened for optional heartbeat messages.
//! Only one tab/instance is intended to be the leader at any time for a given database.
//! Uses a "lowest instance ID wins" approach to resolve race conditions.
6
7use crate::types::DatabaseError;
8use js_sys::Date;
9use std::cell::RefCell;
10use std::rc::Rc;
11use wasm_bindgen::prelude::*;
12use wasm_bindgen::JsCast;
13use web_sys::BroadcastChannel;
14
/// Snapshot of one instance's view of the leader election for a database.
///
/// All timestamps are milliseconds as returned by `Date::now()`.
#[derive(Clone, Debug)]
pub struct LeaderElectionState {
    /// Database name; used to derive the localStorage keys and channel name.
    pub db_name: String,
    /// Identifier of this instance; IDs are compared as strings and the lowest wins.
    pub instance_id: String,
    /// Whether this instance currently believes it is the leader.
    pub is_leader: bool,
    /// Instance ID of the last known leader, if any.
    pub leader_id: Option<String>,
    /// Timestamp (ms) at which the last observed lease is expected to expire.
    pub lease_expiry: u64,
    /// Timestamp (ms) of the last recorded heartbeat; starts at 0.
    pub last_heartbeat: u64,
}
25
26/// Manager for multi-tab leader election
27pub struct LeaderElectionManager {
28    pub state: Rc<RefCell<LeaderElectionState>>,
29    broadcast_channel: Option<BroadcastChannel>,
30    pub heartbeat_interval: Option<i32>,
31    lease_duration_ms: u64,
32}
33
34impl LeaderElectionManager {
35    /// Create new leader election manager with deterministic instance ID
36    pub fn new(db_name: String) -> Self {
37        // Create deterministic instance ID: timestamp + random for uniqueness and ordering
38        let timestamp = Date::now() as u64;
39        let random_part = (js_sys::Math::random() * 1000.0) as u64;
40        let instance_id = format!("{:016x}_{:03x}", timestamp, random_part);
41        
42        log::debug!("Created instance {} for {}", instance_id, db_name);
43        
44        Self {
45            state: Rc::new(RefCell::new(LeaderElectionState {
46                db_name,
47                instance_id,
48                is_leader: false,
49                leader_id: None,
50                lease_expiry: 0,
51                last_heartbeat: 0,
52            })),
53            broadcast_channel: None,
54            heartbeat_interval: None,
55            lease_duration_ms: 5000, // 5 seconds
56        }
57    }
58    
59    /// Start leader election process using localStorage coordination
60    pub async fn start_election(&mut self) -> Result<(), DatabaseError> {
61        log::debug!("Starting localStorage-based leader election for {}", self.state.borrow().db_name);
62        
63        // Create BroadcastChannel for heartbeats (optional, for monitoring)
64        let channel_name = format!("datasync_leader_{}", self.state.borrow().db_name);
65        let broadcast_channel = BroadcastChannel::new(&channel_name)
66            .map_err(|_| DatabaseError::new("LEADER_ELECTION_ERROR", "Failed to create BroadcastChannel"))?;
67        
68        self.broadcast_channel = Some(broadcast_channel);
69        
70        // Use localStorage for atomic coordination - no delays needed
71        self.try_become_leader().await?;
72        
73        // Start heartbeat if we're leader
74        if self.state.borrow().is_leader {
75            self.start_heartbeat()?;
76        }
77        
78        Ok(())
79    }
80    
81    /// Try to become leader using localStorage-based atomic coordination
82    /// 
83    /// # Arguments
84    /// * `force` - If true, ignores existing leader's valid lease and forces takeover
85    pub async fn try_become_leader_internal(&mut self, force: bool) -> Result<(), DatabaseError> {
86        let state = self.state.borrow();
87        let my_instance_id = state.instance_id.clone();
88        let db_name = state.db_name.clone();
89        drop(state);
90        
91        // Use localStorage for atomic coordination
92        let window = web_sys::window()
93            .ok_or_else(|| DatabaseError::new(
94                "STORAGE_ERROR",
95                "Window not available - not in browser context"
96            ))?;
97        let storage = window
98            .local_storage()
99            .map_err(|_| DatabaseError::new(
100                "STORAGE_ERROR",
101                "localStorage access denied (check browser settings)"
102            ))?
103            .ok_or_else(|| DatabaseError::new(
104                "STORAGE_ERROR",
105                "localStorage unavailable (private browsing mode?)"
106            ))?;
107        
108        let instances_key = format!("datasync_instances_{}", db_name);
109        let leader_key = format!("datasync_leader_{}", db_name);
110        
111        // Step 1: Register our instance atomically
112        let current_time = Date::now() as u64;
113        let instance_data = format!("{}:{}", my_instance_id, current_time);
114        
115        // Get existing instances
116        let existing_instances = storage.get_item(&instances_key).unwrap().unwrap_or_default();
117        let mut all_instances: Vec<String> = if existing_instances.is_empty() {
118            Vec::new()
119        } else {
120            existing_instances.split(',').map(|s| s.to_string()).collect()
121        };
122        
123        // Add ourselves if not already present
124        if !all_instances.iter().any(|inst| inst.starts_with(&format!("{}:", my_instance_id))) {
125            all_instances.push(instance_data);
126        }
127        
128        // Clean up expired instances (older than 10 seconds)
129        let cutoff_time = current_time - 10000;
130        all_instances.retain(|inst| {
131            if let Some(colon_pos) = inst.rfind(':') {
132                if let Ok(timestamp) = inst[colon_pos + 1..].parse::<u64>() {
133                    timestamp > cutoff_time
134                } else {
135                    false
136                }
137            } else {
138                false
139            }
140        });
141        
142        // Update instances list
143        let instances_str = all_instances.join(",");
144        storage.set_item(&instances_key, &instances_str).unwrap();
145        
146        // Step 2: Determine leader based on lowest instance ID
147        let mut instance_ids: Vec<String> = all_instances.iter()
148            .filter_map(|inst| inst.split(':').next().map(|s| s.to_string()))
149            .collect();
150        instance_ids.sort();
151        
152        log::debug!("All instances for {}: {:?}", db_name, instance_ids);
153        
154        if let Some(lowest_id) = instance_ids.first() {
155            if force || *lowest_id == my_instance_id {
156                // We should be the leader - attempt atomic claim (either we have lowest ID or we're forcing)
157                if force && *lowest_id != my_instance_id {
158                    log::debug!("FORCING leadership takeover for {} (overriding lowest ID rule)", db_name);
159                } else {
160                    log::debug!("I have the lowest ID - attempting atomic leadership claim for {}", db_name);
161                }
162                
163                // Check if someone else already claimed leadership (atomic check-and-set)
164                if let Ok(existing_leader) = storage.get_item(&leader_key) {
165                    if let Some(existing_data) = existing_leader {
166                        if let Some(colon_pos) = existing_data.rfind(':') {
167                            let existing_leader_id = &existing_data[..colon_pos];
168                            if let Ok(existing_timestamp) = existing_data[colon_pos + 1..].parse::<u64>() {
169                                let existing_lease_expired = (current_time - existing_timestamp) > 5000;
170                                
171                                if !force && !existing_lease_expired && existing_leader_id != my_instance_id {
172                                    // Someone else is already leader and lease is valid (and we're not forcing)
173                                    log::debug!("{} already claimed leadership for {}", 
174                                        existing_leader_id, db_name);
175                                    
176                                    let mut state = self.state.borrow_mut();
177                                    state.is_leader = false;
178                                    state.leader_id = Some(existing_leader_id.to_string());
179                                    state.lease_expiry = existing_timestamp + self.lease_duration_ms;
180                                    return Ok(());
181                                }
182                            }
183                        }
184                    }
185                }
186                
187                // Atomically claim leadership (no valid existing leader)
188                let leader_data = format!("{}:{}", my_instance_id, current_time);
189                storage.set_item(&leader_key, &leader_data).unwrap();
190                
191                let mut state = self.state.borrow_mut();
192                state.is_leader = true;
193                state.leader_id = Some(my_instance_id.clone());
194                state.lease_expiry = current_time + self.lease_duration_ms;
195                drop(state);
196                
197                log::info!("Became leader for {} with ID {}", db_name, my_instance_id);
198                
199                // Start heartbeat to maintain lease
200                if self.heartbeat_interval.is_none() {
201                    let _ = self.start_heartbeat();
202                }
203            } else {
204                // Someone else should be the leader
205                log::debug!("Instance {} has lower ID - not claiming leadership for {}", 
206                    lowest_id, db_name);
207                
208                let mut state = self.state.borrow_mut();
209                state.is_leader = false;
210                state.leader_id = Some(lowest_id.clone());
211                state.lease_expiry = current_time + self.lease_duration_ms;
212            }
213        }
214        
215        Ok(())
216    }
217    
218    /// Try to become leader (respects existing leader's lease)
219    pub async fn try_become_leader(&mut self) -> Result<(), DatabaseError> {
220        self.try_become_leader_internal(false).await
221    }
222    
223    /// Force leadership takeover (ignores existing leader's lease)
224    pub async fn force_become_leader(&mut self) -> Result<(), DatabaseError> {
225        self.try_become_leader_internal(true).await
226    }
227    
228    /// Start sending heartbeats as leader using localStorage
229    pub fn start_heartbeat(&mut self) -> Result<(), DatabaseError> {
230        let state_clone = self.state.clone();
231        
232        let closure = Closure::wrap(Box::new(move || {
233            let state = state_clone.borrow();
234            if state.is_leader {
235                let current_time = Date::now() as u64;
236                
237                // Update leader heartbeat in localStorage
238                let window = match web_sys::window() {
239                    Some(w) => w,
240                    None => {
241                        log::error!("Window unavailable in heartbeat - stopping heartbeat");
242                        return;
243                    }
244                };
245                let storage = match window.local_storage() {
246                    Ok(Some(s)) => s,
247                    Ok(None) => {
248                        log::warn!("localStorage unavailable in heartbeat (private browsing?)");
249                        return;
250                    },
251                    Err(_) => {
252                        log::error!("localStorage access denied in heartbeat");
253                        return;
254                    }
255                };
256                let leader_key = format!("datasync_leader_{}", state.db_name);
257                let leader_data = format!("{}:{}", state.instance_id, current_time);
258                
259                let _ = storage.set_item(&leader_key, &leader_data);
260                
261                log::debug!("Updated leader heartbeat for {} from leader {}", 
262                    state.db_name, state.instance_id);
263            }
264        }) as Box<dyn FnMut()>);
265        
266        let interval_id = web_sys::window()
267            .unwrap()
268            .set_interval_with_callback_and_timeout_and_arguments_0(
269                closure.as_ref().unchecked_ref(),
270                1000, // Send heartbeat every 1 second
271            )
272            .map_err(|_| DatabaseError::new("LEADER_ELECTION_ERROR", "Failed to start heartbeat interval"))?;
273        
274        closure.forget();
275        self.heartbeat_interval = Some(interval_id);
276        
277        Ok(())
278    }
279    
280    /// Stop leader election (e.g., when tab is closing)
281    pub async fn stop_election(&mut self) -> Result<(), DatabaseError> {
282        let state = self.state.borrow();
283        let db_name = state.db_name.clone();
284        let instance_id = state.instance_id.clone();
285        let was_leader = state.is_leader;
286        drop(state);
287        
288        log::debug!("Stopping leader election for {}", db_name);
289        
290        // Clear heartbeat interval
291        if let Some(interval_id) = self.heartbeat_interval.take() {
292            if let Some(window) = web_sys::window() {
293                window.clear_interval_with_handle(interval_id);
294            }
295        }
296        
297        // Remove ourselves from localStorage instances list
298        let Some(window) = web_sys::window() else {
299            log::warn!("Window unavailable during cleanup");
300            return Ok(());
301        };
302        let storage = match window.local_storage() {
303            Ok(Some(s)) => s,
304            Ok(None) | Err(_) => {
305                log::warn!("localStorage unavailable during cleanup (private browsing?)");
306                return Ok(());
307            }
308        };
309        let instances_key = format!("datasync_instances_{}", db_name);
310        
311        if let Ok(Some(existing_instances)) = storage.get_item(&instances_key) {
312            let all_instances: Vec<String> = existing_instances.split(',').map(|s| s.to_string()).collect();
313            let filtered_instances: Vec<String> = all_instances.into_iter()
314                .filter(|inst| !inst.starts_with(&format!("{}:", instance_id)))
315                .collect();
316            
317            if filtered_instances.is_empty() {
318                // Remove the key entirely if no instances left
319                let _ = storage.remove_item(&instances_key);
320            } else {
321                // Update with remaining instances
322                let instances_str = filtered_instances.join(",");
323                let _ = storage.set_item(&instances_key, &instances_str);
324            }
325        }
326        
327        // Clear leader data if we were the leader
328        if was_leader {
329            let leader_key = format!("datasync_leader_{}", db_name);
330            let _ = storage.remove_item(&leader_key);
331            
332            log::debug!("Cleared leader data for {} (was leader: {})", db_name, instance_id);
333        }
334        
335        // Reset state
336        let mut state = self.state.borrow_mut();
337        state.is_leader = false;
338        state.leader_id = None;
339        
340        Ok(())
341    }
342    
343    /// Check if this instance is the leader (with localStorage validation and re-election)
344    pub async fn is_leader(&self) -> bool {
345        let now = Date::now() as u64;
346        let state = self.state.borrow();
347        let db_name = state.db_name.clone();
348        let my_instance_id = state.instance_id.clone();
349        
350        // If localStorage is unavailable, we can't coordinate - return false
351        let Some(window) = web_sys::window() else {
352            log::warn!("Window unavailable for leader check - assuming not leader");
353            return false;
354        };
355        let storage = match window.local_storage() {
356            Ok(Some(s)) => s,
357            Ok(None) => {
358                log::warn!("localStorage unavailable for leader check (private browsing?) - assuming not leader");
359                return false;
360            },
361            Err(_) => {
362                log::error!("localStorage access denied for leader check - assuming not leader");
363                return false;
364            }
365        };
366        let leader_key = format!("datasync_leader_{}", db_name);
367        
368        // Check current leader in localStorage
369        let current_leader_expired = if let Ok(Some(leader_data)) = storage.get_item(&leader_key) {
370            if let Some(colon_pos) = leader_data.rfind(':') {
371                let leader_id = &leader_data[..colon_pos];
372                if let Ok(timestamp) = leader_data[colon_pos + 1..].parse::<u64>() {
373                    let lease_expired = (now - timestamp) > 5000; // 5 second lease
374                    
375                    if leader_id == my_instance_id && !lease_expired {
376                        return true; // We're still the valid leader
377                    }
378                    
379                    lease_expired // Return whether the current leader's lease expired
380                } else {
381                    true // Invalid timestamp, consider expired
382                }
383            } else {
384                true // Invalid format, consider expired
385            }
386        } else {
387            true // No leader data, consider expired
388        };
389        
390        drop(state);
391        
392        // If current leader expired, we need re-election (but can't do it from immutable self)
393        if current_leader_expired {
394            log::debug!("Current leader lease expired for {} - re-election needed", db_name);
395            
396            // Update our state to reflect no current leader
397            let mut state = self.state.borrow_mut();
398            state.is_leader = false;
399            state.leader_id = None;
400            false
401        } else {
402            // Update our state to reflect we're not leader
403            let mut state = self.state.borrow_mut();
404            state.is_leader = false;
405            false
406        }
407    }
408    
409    /// Send a heartbeat (for testing)
410    pub async fn send_heartbeat(&self) -> Result<(), DatabaseError> {
411        if let Some(ref channel) = self.broadcast_channel {
412            let state = self.state.borrow();
413            let now = Date::now() as u64;
414            
415            let message = js_sys::Object::new();
416            js_sys::Reflect::set(&message, &"type".into(), &"heartbeat".into()).unwrap();
417            js_sys::Reflect::set(&message, &"leader_id".into(), &state.instance_id.clone().into()).unwrap();
418            js_sys::Reflect::set(&message, &"timestamp".into(), &(now as f64).into()).unwrap();
419            
420            channel.post_message(&message)
421                .map_err(|_| DatabaseError::new("LEADER_ELECTION_ERROR", "Failed to send heartbeat"))?;
422        }
423        
424        Ok(())
425    }
426    
427    /// Get timestamp of last received leader heartbeat from localStorage
428    pub async fn get_last_heartbeat(&self) -> u64 {
429        let state = self.state.borrow();
430        let Some(window) = web_sys::window() else {
431            log::warn!("Window unavailable for heartbeat check");
432            return 0;
433        };
434        let storage = match window.local_storage() {
435            Ok(Some(s)) => s,
436            Ok(None) | Err(_) => {
437                log::warn!("localStorage unavailable for heartbeat check (private browsing?)");
438                return 0;
439            }
440        };
441        let leader_key = format!("datasync_leader_{}", state.db_name);
442        
443        if let Ok(Some(leader_data)) = storage.get_item(&leader_key) {
444            if let Some(colon_pos) = leader_data.rfind(':') {
445                if let Ok(timestamp) = leader_data[colon_pos + 1..].parse::<u64>() {
446                    return timestamp;
447                }
448            }
449        }
450        
451        // Fallback to stored value
452        state.last_heartbeat
453    }
454}