absurder_sql/storage/
leader_election.rs

1//! Multi-Tab Leader Election Implementation
2//!
3//! Implements leader election using BroadcastChannel + deterministic leader selection.
4//! Only one tab/instance can be the leader at any time for a given database.
5//! Uses lowest instance ID wins approach to resolve race conditions.
6
7use crate::types::DatabaseError;
8use js_sys::Date;
9use std::cell::RefCell;
10use std::rc::Rc;
11use wasm_bindgen::JsCast;
12use wasm_bindgen::prelude::*;
13use web_sys::BroadcastChannel;
14
// Thread-local reentrancy guard for the heartbeat closure.
//
// The closure created in `start_heartbeat` sets this flag to `true` on entry and a drop
// guard resets it to `false` on exit; an invocation that finds the flag already set returns
// without doing any work. The intent is to avoid wasm-bindgen's "closure invoked
// recursively" error.
//
// The flag is a single per-thread value, so it is shared by every manager's heartbeat
// closure on that thread, not kept per manager.
thread_local! {
    static HEARTBEAT_RUNNING: RefCell<bool> = const { RefCell::new(false) };
}
20
/// Leader election state for a database instance.
///
/// One value is shared (via `Rc<RefCell<_>>`) between the manager, its BroadcastChannel
/// listener and its heartbeat closure.
#[derive(Debug, Clone)]
pub struct LeaderElectionState {
    /// Database name; used to build the BroadcastChannel name and the localStorage keys.
    pub db_name: String,
    /// ID generated in `LeaderElectionManager::new` (hex timestamp + `_` + hex random
    /// suffix). IDs are compared as strings; the lowest registered ID is preferred as leader.
    pub instance_id: String,
    /// Local view of whether this instance currently holds leadership.
    pub is_leader: bool,
    /// Instance ID of the last leader this instance learned about, if any.
    pub leader_id: Option<String>,
    /// Timestamp (ms since epoch) written as "claim/heartbeat time + lease duration" during
    /// an election round. NOTE(review): nothing in this file reads it back.
    pub lease_expiry: u64,
    /// Initialised to 0 and returned by `get_last_heartbeat` only when no timestamp can be
    /// read from localStorage. NOTE(review): never updated within this file.
    pub last_heartbeat: u64,
}
31
/// Manager for multi-tab leader election.
///
/// Owns the browser resources (BroadcastChannel, interval timer and the closures handed
/// to JavaScript) that back one instance's participation in the election for a database.
pub struct LeaderElectionManager {
    /// Election state shared with the message listener and the heartbeat closure.
    pub state: Rc<RefCell<LeaderElectionState>>,
    /// Channel opened by `start_election` and closed by `stop_election`.
    broadcast_channel: Option<BroadcastChannel>,
    /// Handle returned by `setInterval` for the heartbeat timer, if one is running.
    pub heartbeat_interval: Option<i32>,
    /// Heartbeat callback; kept here (rather than forgotten) so it can be dropped on stop.
    pub heartbeat_closure: Option<Closure<dyn FnMut()>>,
    /// `onmessage` handler registered on `broadcast_channel`.
    message_listener: Option<Closure<dyn FnMut(web_sys::MessageEvent)>>,
    /// Added to a claim/heartbeat timestamp to compute `LeaderElectionState::lease_expiry`.
    lease_duration_ms: u64,
}
41
42impl LeaderElectionManager {
43    /// Create new leader election manager with deterministic instance ID
44    pub fn new(db_name: String) -> Self {
45        // Create deterministic instance ID: timestamp + random for uniqueness and ordering
46        let timestamp = Date::now() as u64;
47        let random_part = (js_sys::Math::random() * 1000.0) as u64;
48        let instance_id = format!("{:016x}_{:03x}", timestamp, random_part);
49
50        log::debug!("Created instance {} for {}", instance_id, db_name);
51
52        Self {
53            state: Rc::new(RefCell::new(LeaderElectionState {
54                db_name,
55                instance_id,
56                is_leader: false,
57                leader_id: None,
58                lease_expiry: 0,
59                last_heartbeat: 0,
60            })),
61            broadcast_channel: None,
62            heartbeat_interval: None,
63            heartbeat_closure: None,
64            message_listener: None,
65            lease_duration_ms: 1000, // 1 second - fast leader election
66        }
67    }
68
69    /// Start leader election process using localStorage coordination
70    pub async fn start_election(&mut self) -> Result<(), DatabaseError> {
71        log::debug!(
72            "LeaderElectionManager::start_election() - Starting for {}",
73            self.state.borrow().db_name
74        );
75
76        // Create BroadcastChannel for EVENT-BASED leader election
77        let channel_name = format!("datasync_leader_{}", self.state.borrow().db_name);
78        log::debug!(
79            "LeaderElectionManager::start_election() - Creating BroadcastChannel: {}",
80            channel_name
81        );
82        let broadcast_channel = BroadcastChannel::new(&channel_name).map_err(|_| {
83            DatabaseError::new("LEADER_ELECTION_ERROR", "Failed to create BroadcastChannel")
84        })?;
85
86        // Set up EVENT LISTENER for leadership change messages
87        let state_clone = self.state.clone();
88        let listener = Closure::wrap(Box::new(move |event: web_sys::MessageEvent| {
89            if let Ok(data) = event.data().dyn_into::<js_sys::JsString>() {
90                let message: String = data.into();
91
92                // Parse message: "LEADER_CLAIMED:instance_id:timestamp"
93                if let Some(parts) = message.strip_prefix("LEADER_CLAIMED:") {
94                    let parts: Vec<&str> = parts.split(':').collect();
95                    if parts.len() == 2 {
96                        let new_leader_id = parts[0];
97                        if let Ok(_timestamp) = parts[1].parse::<u64>() {
98                            let mut state = state_clone.borrow_mut();
99                            let my_instance_id = state.instance_id.clone();
100
101                            if new_leader_id == my_instance_id {
102                                // We're the leader!
103                                state.is_leader = true;
104                                state.leader_id = Some(new_leader_id.to_string());
105                                log::info!(
106                                    "EVENT: Became leader for {} via BroadcastChannel",
107                                    state.db_name
108                                );
109                            } else {
110                                // Someone else is leader
111                                state.is_leader = false;
112                                state.leader_id = Some(new_leader_id.to_string());
113                                log::debug!(
114                                    "EVENT: {} is now leader for {}",
115                                    new_leader_id,
116                                    state.db_name
117                                );
118                            }
119                        }
120                    }
121                }
122            }
123        }) as Box<dyn FnMut(web_sys::MessageEvent)>);
124
125        broadcast_channel.set_onmessage(Some(listener.as_ref().unchecked_ref()));
126        self.message_listener = Some(listener);
127        self.broadcast_channel = Some(broadcast_channel);
128
129        // Use localStorage for atomic coordination - no delays needed
130        log::debug!("LeaderElectionManager::start_election() - Calling try_become_leader()");
131        self.try_become_leader().await?;
132
133        // Start heartbeat if we're leader
134        let is_leader = self.state.borrow().is_leader;
135        web_sys::console::log_1(
136            &format!(
137                "LeaderElectionManager::start_election() - After try_become_leader, is_leader={}",
138                is_leader
139            )
140            .into(),
141        );
142        if is_leader {
143            web_sys::console::log_1(
144                &"LeaderElectionManager::start_election() - Calling start_heartbeat()".into(),
145            );
146            self.start_heartbeat()?;
147            web_sys::console::log_1(
148                &"LeaderElectionManager::start_election() - Heartbeat started".into(),
149            );
150        } else {
151            web_sys::console::log_1(
152                &"LeaderElectionManager::start_election() - Not leader, skipping heartbeat".into(),
153            );
154        }
155
156        Ok(())
157    }
158
159    /// Try to become leader using localStorage-based atomic coordination
160    ///
161    /// # Arguments
162    /// * `force` - If true, ignores existing leader's valid lease and forces takeover
163    pub async fn try_become_leader_internal(&mut self, force: bool) -> Result<(), DatabaseError> {
164        let state = self.state.borrow();
165        let my_instance_id = state.instance_id.clone();
166        let db_name = state.db_name.clone();
167        drop(state);
168
169        // Use localStorage for atomic coordination
170        let window = web_sys::window().ok_or_else(|| {
171            DatabaseError::new(
172                "STORAGE_ERROR",
173                "Window not available - not in browser context",
174            )
175        })?;
176        let storage = window
177            .local_storage()
178            .map_err(|_| {
179                DatabaseError::new(
180                    "STORAGE_ERROR",
181                    "localStorage access denied (check browser settings)",
182                )
183            })?
184            .ok_or_else(|| {
185                DatabaseError::new(
186                    "STORAGE_ERROR",
187                    "localStorage unavailable (private browsing mode?)",
188                )
189            })?;
190
191        let instances_key = format!("datasync_instances_{}", db_name);
192        let leader_key = format!("datasync_leader_{}", db_name);
193
194        // Step 1: Register our instance atomically
195        let current_time = Date::now() as u64;
196        let instance_data = format!("{}:{}", my_instance_id, current_time);
197
198        // Get existing instances
199        let existing_instances = storage
200            .get_item(&instances_key)
201            .map_err(|e| {
202                DatabaseError::new(
203                    "LEADER_ELECTION_ERROR",
204                    &format!("Failed to get instances: {:?}", e),
205                )
206            })?
207            .unwrap_or_default();
208        let mut all_instances: Vec<String> = if existing_instances.is_empty() {
209            Vec::new()
210        } else {
211            existing_instances
212                .split(',')
213                .map(|s| s.to_string())
214                .collect()
215        };
216
217        // Add ourselves if not already present
218        if !all_instances
219            .iter()
220            .any(|inst| inst.starts_with(&format!("{}:", my_instance_id)))
221        {
222            all_instances.push(instance_data);
223        }
224
225        // Clean up expired instances (older than 10 seconds)
226        let cutoff_time = current_time - 10000;
227        all_instances.retain(|inst| {
228            if let Some(colon_pos) = inst.rfind(':') {
229                if let Ok(timestamp) = inst[colon_pos + 1..].parse::<u64>() {
230                    timestamp > cutoff_time
231                } else {
232                    false
233                }
234            } else {
235                false
236            }
237        });
238
239        // Update instances list
240        let instances_str = all_instances.join(",");
241        storage
242            .set_item(&instances_key, &instances_str)
243            .map_err(|e| {
244                DatabaseError::new(
245                    "LEADER_ELECTION_ERROR",
246                    &format!("Failed to set instances: {:?}", e),
247                )
248            })?;
249
250        // Step 2: Determine leader based on lowest instance ID
251        let mut instance_ids: Vec<String> = all_instances
252            .iter()
253            .filter_map(|inst| inst.split(':').next().map(|s| s.to_string()))
254            .collect();
255        instance_ids.sort();
256
257        log::debug!("All instances for {}: {:?}", db_name, instance_ids);
258
259        if let Some(lowest_id) = instance_ids.first() {
260            if force || *lowest_id == my_instance_id {
261                // We should be the leader - attempt atomic claim (either we have lowest ID or we're forcing)
262                if force && *lowest_id != my_instance_id {
263                    log::debug!(
264                        "FORCING leadership takeover for {} (overriding lowest ID rule)",
265                        db_name
266                    );
267                } else {
268                    log::debug!(
269                        "I have the lowest ID - attempting atomic leadership claim for {}",
270                        db_name
271                    );
272                }
273
274                // Check if someone else already claimed leadership (atomic check-and-set)
275                if let Ok(Some(existing_data)) = storage.get_item(&leader_key) {
276                    if let Some(colon_pos) = existing_data.rfind(':') {
277                        let existing_leader_id = &existing_data[..colon_pos];
278                        if let Ok(existing_timestamp) =
279                            existing_data[colon_pos + 1..].parse::<u64>()
280                        {
281                            let existing_lease_expired = (current_time - existing_timestamp) > 5000;
282
283                            if !force
284                                && !existing_lease_expired
285                                && existing_leader_id != my_instance_id
286                            {
287                                // Someone else is already leader and lease is valid (and we're not forcing)
288                                log::debug!(
289                                    "{} already claimed leadership for {}",
290                                    existing_leader_id,
291                                    db_name
292                                );
293
294                                let mut state = self.state.borrow_mut();
295                                state.is_leader = false;
296                                state.leader_id = Some(existing_leader_id.to_string());
297                                state.lease_expiry = existing_timestamp + self.lease_duration_ms;
298                                return Ok(());
299                            }
300                        }
301                    }
302                }
303
304                // Atomically claim leadership (no valid existing leader)
305                let leader_data = format!("{}:{}", my_instance_id, current_time);
306                storage.set_item(&leader_key, &leader_data).map_err(|e| {
307                    DatabaseError::new(
308                        "LEADER_ELECTION_ERROR",
309                        &format!("Failed to set leader: {:?}", e),
310                    )
311                })?;
312
313                let mut state = self.state.borrow_mut();
314                state.is_leader = true;
315                state.leader_id = Some(my_instance_id.clone());
316                state.lease_expiry = current_time + self.lease_duration_ms;
317                drop(state);
318
319                log::info!("Became leader for {} with ID {}", db_name, my_instance_id);
320
321                // BROADCAST EVENT: Leadership claimed - NO POLLING NEEDED!
322                if let Some(ref channel) = self.broadcast_channel {
323                    let message = format!("LEADER_CLAIMED:{}:{}", my_instance_id, current_time);
324                    if let Err(e) = channel.post_message(&JsValue::from_str(&message)) {
325                        log::warn!("Failed to broadcast leadership claim: {:?}", e);
326                    } else {
327                        log::debug!("EVENT: Broadcasted leadership claim for {}", db_name);
328                    }
329                }
330
331                // Start heartbeat to maintain lease
332                if self.heartbeat_interval.is_none() {
333                    let _ = self.start_heartbeat();
334                }
335            } else {
336                // Someone else should be the leader
337                log::debug!(
338                    "Instance {} has lower ID - not claiming leadership for {}",
339                    lowest_id,
340                    db_name
341                );
342
343                let mut state = self.state.borrow_mut();
344                state.is_leader = false;
345                state.leader_id = Some(lowest_id.clone());
346                state.lease_expiry = current_time + self.lease_duration_ms;
347            }
348        }
349
350        Ok(())
351    }
352
    /// Try to become leader (respects existing leader's lease).
    ///
    /// Equivalent to `try_become_leader_internal(false)`: leadership is only claimed when
    /// this instance has the lowest registered ID and no other instance holds a valid lease.
    ///
    /// # Errors
    /// Propagates any error returned by `try_become_leader_internal`.
    pub async fn try_become_leader(&mut self) -> Result<(), DatabaseError> {
        self.try_become_leader_internal(false).await
    }
357
    /// Force leadership takeover (ignores existing leader's lease).
    ///
    /// Equivalent to `try_become_leader_internal(true)`: the lowest-ID rule and any valid
    /// lease held by another instance are both overridden.
    ///
    /// # Errors
    /// Propagates any error returned by `try_become_leader_internal`.
    pub async fn force_become_leader(&mut self) -> Result<(), DatabaseError> {
        self.try_become_leader_internal(true).await
    }
362
363    /// Start sending heartbeats as leader using localStorage
364    pub fn start_heartbeat(&mut self) -> Result<(), DatabaseError> {
365        web_sys::console::log_1(&"start_heartbeat() called".into());
366
367        // CRITICAL: Send initial heartbeat IMMEDIATELY
368        let state = self.state.borrow();
369        web_sys::console::log_1(
370            &format!(
371                "start_heartbeat: is_leader={}, db_name={}",
372                state.is_leader, state.db_name
373            )
374            .into(),
375        );
376
377        if state.is_leader {
378            let current_time = Date::now() as u64;
379
380            let window = web_sys::window()
381                .ok_or_else(|| DatabaseError::new("LEADER_ELECTION_ERROR", "Window unavailable"))?;
382            let storage = window
383                .local_storage()
384                .map_err(|_| {
385                    DatabaseError::new("LEADER_ELECTION_ERROR", "localStorage unavailable")
386                })?
387                .ok_or_else(|| {
388                    DatabaseError::new("LEADER_ELECTION_ERROR", "localStorage is None")
389                })?;
390
391            let leader_key = format!("datasync_leader_{}", state.db_name);
392            let leader_data = format!("{}:{}", state.instance_id, current_time);
393
394            web_sys::console::log_1(
395                &format!(
396                    "Writing heartbeat: key={}, data={}",
397                    leader_key, leader_data
398                )
399                .into(),
400            );
401
402            storage.set_item(&leader_key, &leader_data).map_err(|e| {
403                web_sys::console::error_1(
404                    &format!("Failed to write initial heartbeat: {:?}", e).into(),
405                );
406                DatabaseError::new("LEADER_ELECTION_ERROR", "Failed to write initial heartbeat")
407            })?;
408
409            web_sys::console::log_1(
410                &format!(
411                    "Sent initial heartbeat for {} from leader {}",
412                    state.db_name, state.instance_id
413                )
414                .into(),
415            );
416        } else {
417            web_sys::console::warn_1(&"start_heartbeat called but is_leader=false".into());
418        }
419        drop(state);
420
421        // Now set up interval for periodic updates
422        let state_clone = self.state.clone();
423
424        let closure = Closure::wrap(Box::new(move || {
425            // Reentrancy guard: skip if heartbeat is already running
426            // This prevents "closure invoked recursively" errors from wasm-bindgen
427            let already_running = HEARTBEAT_RUNNING.with(|running| {
428                let was_running = *running.borrow();
429                if !was_running {
430                    *running.borrow_mut() = true;
431                }
432                was_running
433            });
434
435            if already_running {
436                log::debug!("Heartbeat skipped - previous invocation still running");
437                return;
438            }
439
440            // Ensure we clear the flag when done (even on early return)
441            struct HeartbeatGuard;
442            impl Drop for HeartbeatGuard {
443                fn drop(&mut self) {
444                    HEARTBEAT_RUNNING.with(|running| {
445                        *running.borrow_mut() = false;
446                    });
447                }
448            }
449            let _guard = HeartbeatGuard;
450
451            let state = state_clone.borrow();
452            if state.is_leader {
453                let current_time = Date::now() as u64;
454
455                // Update leader heartbeat in localStorage
456                let window = match web_sys::window() {
457                    Some(w) => w,
458                    None => {
459                        log::error!("Window unavailable in heartbeat - stopping heartbeat");
460                        return;
461                    }
462                };
463                let storage = match window.local_storage() {
464                    Ok(Some(s)) => s,
465                    Ok(None) => {
466                        log::warn!("localStorage unavailable in heartbeat (private browsing?)");
467                        return;
468                    }
469                    Err(_) => {
470                        log::error!("localStorage access denied in heartbeat");
471                        return;
472                    }
473                };
474                let leader_key = format!("datasync_leader_{}", state.db_name);
475                let leader_data = format!("{}:{}", state.instance_id, current_time);
476
477                let _ = storage.set_item(&leader_key, &leader_data);
478
479                log::debug!(
480                    "Updated leader heartbeat for {} from leader {}",
481                    state.db_name,
482                    state.instance_id
483                );
484            }
485        }) as Box<dyn FnMut()>);
486
487        let interval_id = web_sys::window()
488            .unwrap()
489            .set_interval_with_callback_and_timeout_and_arguments_0(
490                closure.as_ref().unchecked_ref(),
491                1000, // Send heartbeat every 1 second
492            )
493            .map_err(|_| {
494                DatabaseError::new(
495                    "LEADER_ELECTION_ERROR",
496                    "Failed to start heartbeat interval",
497                )
498            })?;
499
500        // CRITICAL: Store closure instead of forgetting it, so it can be properly cleaned up
501        self.heartbeat_interval = Some(interval_id);
502        self.heartbeat_closure = Some(closure);
503
504        Ok(())
505    }
506
507    /// Stop leader election (e.g., when tab is closing)
508    pub async fn stop_election(&mut self) -> Result<(), DatabaseError> {
509        // CRITICAL: Check if already stopped (idempotent)
510        if self.heartbeat_interval.is_none() && self.heartbeat_closure.is_none() {
511            web_sys::console::log_1(&"[STOP] Already stopped - skipping".into());
512            return Ok(());
513        }
514
515        let state = self.state.borrow();
516        let db_name = state.db_name.clone();
517        let instance_id = state.instance_id.clone();
518        let was_leader = state.is_leader;
519        drop(state);
520
521        log::info!(
522            "[STOP] Stopping leader election for {} (was_leader: {})",
523            db_name,
524            was_leader
525        );
526
527        // CRITICAL: Clear interval and closure FIRST to release Rc references
528        if let Some(interval_id) = self.heartbeat_interval.take() {
529            web_sys::console::log_1(
530                &format!("[STOP] Clearing interval {} for {}", interval_id, db_name).into(),
531            );
532            if let Some(window) = web_sys::window() {
533                window.clear_interval_with_handle(interval_id);
534            }
535        }
536
537        // Drop the closure to release any Rc<RefCell<State>> references
538        if let Some(_closure) = self.heartbeat_closure.take() {
539            web_sys::console::log_1(
540                &format!("[STOP] Dropped heartbeat closure for {}", db_name).into(),
541            );
542        }
543
544        // CRITICAL: Close the BroadcastChannel to prevent test interference
545        if let Some(channel) = self.broadcast_channel.take() {
546            channel.close();
547            web_sys::console::log_1(
548                &format!("[STOP] Closed BroadcastChannel for {}", db_name).into(),
549            );
550        }
551
552        // Remove ourselves from localStorage instances list
553        let Some(window) = web_sys::window() else {
554            log::warn!("Window unavailable during cleanup");
555            return Ok(());
556        };
557        let storage = match window.local_storage() {
558            Ok(Some(s)) => s,
559            Ok(None) | Err(_) => {
560                log::warn!("localStorage unavailable during cleanup (private browsing?)");
561                return Ok(());
562            }
563        };
564        let instances_key = format!("datasync_instances_{}", db_name);
565
566        if let Ok(Some(existing_instances)) = storage.get_item(&instances_key) {
567            let all_instances: Vec<String> = existing_instances
568                .split(',')
569                .map(|s| s.to_string())
570                .collect();
571            let filtered_instances: Vec<String> = all_instances
572                .into_iter()
573                .filter(|inst| !inst.starts_with(&format!("{}:", instance_id)))
574                .collect();
575
576            if filtered_instances.is_empty() {
577                // Remove the key entirely if no instances left
578                let _ = storage.remove_item(&instances_key);
579            } else {
580                // Update with remaining instances
581                let instances_str = filtered_instances.join(",");
582                let _ = storage.set_item(&instances_key, &instances_str);
583            }
584        }
585
586        // Clear leader data if we were the leader
587        if was_leader {
588            let leader_key = format!("datasync_leader_{}", db_name);
589            let _ = storage.remove_item(&leader_key);
590
591            log::debug!(
592                "Cleared leader data for {} (was leader: {})",
593                db_name,
594                instance_id
595            );
596        }
597
598        // Reset state
599        let mut state = self.state.borrow_mut();
600        state.is_leader = false;
601        state.leader_id = None;
602
603        Ok(())
604    }
605
606    /// Check if this instance is the leader (with localStorage validation and re-election)
607    pub async fn is_leader(&self) -> bool {
608        let now = Date::now() as u64;
609        let state = self.state.borrow();
610        let db_name = state.db_name.clone();
611        let my_instance_id = state.instance_id.clone();
612
613        // If localStorage is unavailable, we can't coordinate - return false
614        let Some(window) = web_sys::window() else {
615            log::warn!("Window unavailable for leader check - assuming not leader");
616            return false;
617        };
618        let storage = match window.local_storage() {
619            Ok(Some(s)) => s,
620            Ok(None) => {
621                log::warn!(
622                    "localStorage unavailable for leader check (private browsing?) - assuming not leader"
623                );
624                return false;
625            }
626            Err(_) => {
627                log::error!("localStorage access denied for leader check - assuming not leader");
628                return false;
629            }
630        };
631        let leader_key = format!("datasync_leader_{}", db_name);
632
633        // Check current leader in localStorage
634        let current_leader_expired = if let Ok(Some(leader_data)) = storage.get_item(&leader_key) {
635            if let Some(colon_pos) = leader_data.rfind(':') {
636                let leader_id = &leader_data[..colon_pos];
637                if let Ok(timestamp) = leader_data[colon_pos + 1..].parse::<u64>() {
638                    let lease_expired = (now - timestamp) > 5000; // 5 second lease
639
640                    if leader_id == my_instance_id && !lease_expired {
641                        return true; // We're still the valid leader
642                    }
643
644                    lease_expired // Return whether the current leader's lease expired
645                } else {
646                    true // Invalid timestamp, consider expired
647                }
648            } else {
649                true // Invalid format, consider expired
650            }
651        } else {
652            true // No leader data, consider expired
653        };
654
655        drop(state);
656
657        // If current leader expired, we need re-election (but can't do it from immutable self)
658        if current_leader_expired {
659            log::debug!(
660                "Current leader lease expired for {} - re-election needed",
661                db_name
662            );
663
664            // Update our state to reflect no current leader
665            let mut state = self.state.borrow_mut();
666            state.is_leader = false;
667            state.leader_id = None;
668            false
669        } else {
670            // Update our state to reflect we're not leader
671            let mut state = self.state.borrow_mut();
672            state.is_leader = false;
673            false
674        }
675    }
676
677    /// Send a heartbeat (for testing)
678    pub async fn send_heartbeat(&self) -> Result<(), DatabaseError> {
679        if let Some(ref channel) = self.broadcast_channel {
680            let state = self.state.borrow();
681            let now = Date::now() as u64;
682
683            let message = js_sys::Object::new();
684            js_sys::Reflect::set(&message, &"type".into(), &"heartbeat".into()).unwrap();
685            js_sys::Reflect::set(
686                &message,
687                &"leader_id".into(),
688                &state.instance_id.clone().into(),
689            )
690            .unwrap();
691            js_sys::Reflect::set(&message, &"timestamp".into(), &(now as f64).into()).unwrap();
692
693            channel.post_message(&message).map_err(|_| {
694                DatabaseError::new("LEADER_ELECTION_ERROR", "Failed to send heartbeat")
695            })?;
696        }
697
698        Ok(())
699    }
700
701    /// Get timestamp of last received leader heartbeat from localStorage
702    pub async fn get_last_heartbeat(&self) -> u64 {
703        let state = self.state.borrow();
704        let Some(window) = web_sys::window() else {
705            log::warn!("Window unavailable for heartbeat check");
706            return 0;
707        };
708        let storage = match window.local_storage() {
709            Ok(Some(s)) => s,
710            Ok(None) | Err(_) => {
711                log::warn!("localStorage unavailable for heartbeat check (private browsing?)");
712                return 0;
713            }
714        };
715        let leader_key = format!("datasync_leader_{}", state.db_name);
716
717        if let Ok(Some(leader_data)) = storage.get_item(&leader_key) {
718            if let Some(colon_pos) = leader_data.rfind(':') {
719                if let Ok(timestamp) = leader_data[colon_pos + 1..].parse::<u64>() {
720                    return timestamp;
721                }
722            }
723        }
724
725        // Fallback to stored value
726        state.last_heartbeat
727    }
728}