Skip to main content

apiary_runtime/
heartbeat.rs

1//! Heartbeat writing, world view building, and node lifecycle.
2//!
3//! Each node writes a heartbeat file to object storage at a regular interval.
4//! The [`WorldViewBuilder`] polls the heartbeat prefix to build a
5//! [`WorldView`] — a snapshot of all known nodes and their status.
6
7use std::collections::HashMap;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::Arc;
10use std::time::Duration;
11
12use chrono::{DateTime, Utc};
13use serde::{Deserialize, Serialize};
14use tokio::sync::RwLock;
15use tracing::{debug, info, warn};
16
17use apiary_core::error::ApiaryError;
18use apiary_core::storage::StorageBackend;
19use apiary_core::types::NodeId;
20use apiary_core::Result;
21
22use crate::bee::BeePool;
23use crate::behavioral::ColonyThermometer;
24use crate::cache::CellCache;
25
26// ---------------------------------------------------------------------------
27// Heartbeat data structures
28// ---------------------------------------------------------------------------
29
30/// Capacity information for a node.
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct HeartbeatCapacity {
33    pub cores: usize,
34    pub memory_total_bytes: u64,
35    pub memory_per_bee: u64,
36    pub target_cell_size: u64,
37}
38
39/// Current load information for a node.
40#[derive(Debug, Clone, Serialize, Deserialize)]
41pub struct HeartbeatLoad {
42    pub bees_total: usize,
43    pub bees_busy: usize,
44    pub bees_idle: usize,
45    pub memory_pressure: f64,
46    pub queue_depth: usize,
47    pub colony_temperature: f64,
48}
49
50/// Cache summary for a node.
51#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct HeartbeatCache {
53    pub size_bytes: u64,
54    /// Map of storage keys to cell sizes used for both heartbeat reporting
55    /// and cache-aware query planning in distributed execution
56    pub cached_cells: HashMap<String, u64>,
57}
58
59/// A heartbeat written by a node to object storage.
60#[derive(Debug, Clone, Serialize, Deserialize)]
61pub struct Heartbeat {
62    pub node_id: String,
63    pub timestamp: DateTime<Utc>,
64    pub version: u64,
65    pub capacity: HeartbeatCapacity,
66    pub load: HeartbeatLoad,
67    pub cache: HeartbeatCache,
68}
69
70// ---------------------------------------------------------------------------
71// HeartbeatWriter
72// ---------------------------------------------------------------------------
73
74/// Writes the local node's heartbeat to storage at a regular interval.
75pub struct HeartbeatWriter {
76    storage: Arc<dyn StorageBackend>,
77    node_id: NodeId,
78    interval: Duration,
79    version: AtomicU64,
80    bee_pool: Arc<BeePool>,
81    cell_cache: Arc<CellCache>,
82    thermometer: ColonyThermometer,
83    cores: usize,
84    memory_total_bytes: u64,
85    memory_per_bee: u64,
86    target_cell_size: u64,
87}
88
89impl HeartbeatWriter {
90    /// Create a new heartbeat writer from a node config and bee pool.
91    pub fn new(
92        storage: Arc<dyn StorageBackend>,
93        config: &apiary_core::config::NodeConfig,
94        bee_pool: Arc<BeePool>,
95        cell_cache: Arc<CellCache>,
96    ) -> Self {
97        Self {
98            storage,
99            node_id: config.node_id.clone(),
100            interval: config.heartbeat_interval,
101            version: AtomicU64::new(0),
102            bee_pool,
103            cell_cache,
104            thermometer: ColonyThermometer::default(),
105            cores: config.cores,
106            memory_total_bytes: config.memory_bytes,
107            memory_per_bee: config.memory_per_bee,
108            target_cell_size: config.target_cell_size,
109        }
110    }
111
112    /// Collect a heartbeat snapshot from the current node state.
113    pub async fn collect_heartbeat(&self) -> Heartbeat {
114        let statuses = self.bee_pool.status().await;
115        let bees_total = statuses.len();
116        let bees_busy = statuses.iter().filter(|s| s.state != "idle").count();
117        let bees_idle = bees_total - bees_busy;
118
119        let total_memory_used: u64 = statuses.iter().map(|s| s.memory_used).sum();
120        let total_budget: u64 = statuses.iter().map(|s| s.memory_budget).sum();
121        let memory_pressure = if total_budget > 0 {
122            total_memory_used as f64 / total_budget as f64
123        } else {
124            0.0
125        };
126
127        // Use the colony thermometer to measure system health
128        let colony_temperature = self.thermometer.measure(&self.bee_pool).await;
129
130        // Get queue depth from bee pool
131        let queue_depth = self.bee_pool.queue_size().await;
132
133        let version = self.version.fetch_add(1, Ordering::Relaxed) + 1;
134
135        // Get cached cells from cell cache
136        let cached_cells = self.cell_cache.list_cached_cells().await;
137        let cache_size = self.cell_cache.size();
138
139        Heartbeat {
140            node_id: self.node_id.as_str().to_string(),
141            timestamp: Utc::now(),
142            version,
143            capacity: HeartbeatCapacity {
144                cores: self.cores,
145                memory_total_bytes: self.memory_total_bytes,
146                memory_per_bee: self.memory_per_bee,
147                target_cell_size: self.target_cell_size,
148            },
149            load: HeartbeatLoad {
150                bees_total,
151                bees_busy,
152                bees_idle,
153                memory_pressure,
154                queue_depth,
155                colony_temperature,
156            },
157            cache: HeartbeatCache {
158                size_bytes: cache_size,
159                cached_cells,
160            },
161        }
162    }
163
164    /// Write a single heartbeat to storage.
165    pub async fn write_once(&self) -> Result<()> {
166        let heartbeat = self.collect_heartbeat().await;
167        let key = format!("_heartbeats/node_{}.json", self.node_id);
168        let json = serde_json::to_vec_pretty(&heartbeat)
169            .map_err(|e| ApiaryError::Serialization(e.to_string()))?;
170        self.storage.put(&key, json.into()).await
171    }
172
173    /// Run the heartbeat writer loop until the cancellation token fires.
174    pub async fn run(&self, cancel: tokio::sync::watch::Receiver<bool>) {
175        // Write the first heartbeat immediately on start (join the swarm).
176        if let Err(e) = self.write_once().await {
177            warn!(error = %e, "Failed to write initial heartbeat");
178        } else {
179            info!(node_id = %self.node_id, "Heartbeat writer started");
180        }
181
182        loop {
183            tokio::select! {
184                _ = tokio::time::sleep(self.interval) => {
185                    if let Err(e) = self.write_once().await {
186                        warn!(error = %e, "Failed to write heartbeat");
187                    }
188                }
189                _ = wait_for_cancel(&cancel) => {
190                    debug!(node_id = %self.node_id, "Heartbeat writer stopping");
191                    break;
192                }
193            }
194        }
195    }
196
197    /// Delete this node's heartbeat file (graceful departure).
198    pub async fn delete_heartbeat(&self) -> Result<()> {
199        let key = format!("_heartbeats/node_{}.json", self.node_id);
200        self.storage.delete(&key).await
201    }
202}
203
204// ---------------------------------------------------------------------------
205// World View
206// ---------------------------------------------------------------------------
207
208/// The state of a node as observed via its heartbeat.
209#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
210pub enum NodeState {
211    /// Heartbeat is recent — node is healthy.
212    Alive,
213    /// Heartbeat is stale (> dead_threshold / 2) — may be in trouble.
214    Suspect,
215    /// Heartbeat is very stale (> dead_threshold) — considered dead.
216    Dead,
217}
218
219/// Status of a single node in the world view.
220#[derive(Debug, Clone)]
221pub struct NodeStatus {
222    pub node_id: NodeId,
223    pub heartbeat: Heartbeat,
224    pub state: NodeState,
225}
226
227/// A snapshot of all known nodes in the swarm.
228#[derive(Debug, Clone)]
229pub struct WorldView {
230    pub nodes: HashMap<NodeId, NodeStatus>,
231    pub updated_at: DateTime<Utc>,
232}
233
234impl WorldView {
235    /// Create an empty world view.
236    pub fn empty() -> Self {
237        Self {
238            nodes: HashMap::new(),
239            updated_at: Utc::now(),
240        }
241    }
242
243    /// Return only alive nodes.
244    pub fn alive_nodes(&self) -> Vec<&NodeStatus> {
245        self.nodes
246            .values()
247            .filter(|n| n.state == NodeState::Alive)
248            .collect()
249    }
250
251    /// Return the total number of bees across alive nodes.
252    pub fn total_bees(&self) -> usize {
253        self.alive_nodes()
254            .iter()
255            .map(|n| n.heartbeat.load.bees_total)
256            .sum()
257    }
258
259    /// Return the total number of idle bees across alive nodes.
260    pub fn total_idle_bees(&self) -> usize {
261        self.alive_nodes()
262            .iter()
263            .map(|n| n.heartbeat.load.bees_idle)
264            .sum()
265    }
266}
267
268// ---------------------------------------------------------------------------
269// WorldViewBuilder
270// ---------------------------------------------------------------------------
271
272/// Periodically polls the heartbeat prefix and builds the world view.
273pub struct WorldViewBuilder {
274    storage: Arc<dyn StorageBackend>,
275    poll_interval: Duration,
276    dead_threshold: Duration,
277    world_view: Arc<RwLock<WorldView>>,
278}
279
280impl WorldViewBuilder {
281    /// Create a new world view builder.
282    pub fn new(
283        storage: Arc<dyn StorageBackend>,
284        poll_interval: Duration,
285        dead_threshold: Duration,
286    ) -> Self {
287        Self {
288            storage,
289            poll_interval,
290            dead_threshold,
291            world_view: Arc::new(RwLock::new(WorldView::empty())),
292        }
293    }
294
295    /// Return a shared handle to the world view.
296    pub fn world_view(&self) -> Arc<RwLock<WorldView>> {
297        Arc::clone(&self.world_view)
298    }
299
300    /// Build the world view once by reading all heartbeat files.
301    pub async fn build_once(&self) -> Result<WorldView> {
302        let keys = self.storage.list("_heartbeats/").await?;
303        let now = Utc::now();
304        let mut nodes = HashMap::new();
305
306        for key in &keys {
307            // Only process JSON heartbeat files
308            if !key.ends_with(".json") {
309                continue;
310            }
311
312            match self.storage.get(key).await {
313                Ok(data) => match serde_json::from_slice::<Heartbeat>(&data) {
314                    Ok(hb) => {
315                        let age = now
316                            .signed_duration_since(hb.timestamp)
317                            .to_std()
318                            .unwrap_or(Duration::from_secs(86400 * 365));
319
320                        let state = if age > self.dead_threshold {
321                            NodeState::Dead
322                        } else if age > self.dead_threshold / 2 {
323                            NodeState::Suspect
324                        } else {
325                            NodeState::Alive
326                        };
327
328                        let node_id = NodeId::new(&hb.node_id);
329                        nodes.insert(
330                            node_id.clone(),
331                            NodeStatus {
332                                node_id,
333                                heartbeat: hb,
334                                state,
335                            },
336                        );
337                    }
338                    Err(e) => {
339                        warn!(key = %key, error = %e, "Failed to parse heartbeat");
340                    }
341                },
342                Err(e) => {
343                    warn!(key = %key, error = %e, "Failed to read heartbeat file");
344                }
345            }
346        }
347
348        Ok(WorldView {
349            nodes,
350            updated_at: now,
351        })
352    }
353
354    /// Poll once, update the shared world view, and return it.
355    pub async fn poll_once(&self) -> Result<WorldView> {
356        let view = self.build_once().await?;
357        {
358            let mut wv = self.world_view.write().await;
359            *wv = view.clone();
360        }
361        Ok(view)
362    }
363
364    /// Run the world view builder loop until cancellation.
365    pub async fn run(&self, cancel: tokio::sync::watch::Receiver<bool>) {
366        // Build immediately on start.
367        if let Err(e) = self.poll_once().await {
368            warn!(error = %e, "Failed to build initial world view");
369        } else {
370            info!("World view builder started");
371        }
372
373        loop {
374            tokio::select! {
375                _ = tokio::time::sleep(self.poll_interval) => {
376                    if let Err(e) = self.poll_once().await {
377                        warn!(error = %e, "Failed to poll world view");
378                    }
379                }
380                _ = wait_for_cancel(&cancel) => {
381                    debug!("World view builder stopping");
382                    break;
383                }
384            }
385        }
386    }
387
388    /// Clean up heartbeat files for nodes that have been dead longer than `cleanup_age`.
389    pub async fn cleanup_stale(&self, cleanup_age: Duration) -> Result<usize> {
390        let view = self.world_view.read().await;
391        let now = Utc::now();
392        let mut cleaned = 0;
393
394        for status in view.nodes.values() {
395            if status.state == NodeState::Dead {
396                let age = now
397                    .signed_duration_since(status.heartbeat.timestamp)
398                    .to_std()
399                    .unwrap_or(Duration::from_secs(0));
400                if age > cleanup_age {
401                    let key = format!("_heartbeats/node_{}.json", status.heartbeat.node_id);
402                    if let Err(e) = self.storage.delete(&key).await {
403                        warn!(key = %key, error = %e, "Failed to clean up stale heartbeat");
404                    } else {
405                        cleaned += 1;
406                        info!(node_id = %status.heartbeat.node_id, "Cleaned up stale heartbeat");
407                    }
408                }
409            }
410        }
411
412        Ok(cleaned)
413    }
414}
415
416// ---------------------------------------------------------------------------
417// Helpers
418// ---------------------------------------------------------------------------
419
420/// Wait until the watch channel signals `true` (cancellation).
421async fn wait_for_cancel(rx: &tokio::sync::watch::Receiver<bool>) {
422    let mut rx = rx.clone();
423    // Wait until the value is true
424    loop {
425        if *rx.borrow() {
426            return;
427        }
428        if rx.changed().await.is_err() {
429            // Sender dropped — treat as cancel
430            return;
431        }
432    }
433}
434
#[cfg(test)]
mod tests {
    use super::*;
    use apiary_core::config::NodeConfig;
    use apiary_storage::local::LocalBackend;

    /// Poll interval handed to every builder in these tests.
    const POLL_INTERVAL: Duration = Duration::from_secs(5);
    /// Dead threshold handed to every builder (suspect after half of it).
    const DEAD_THRESHOLD: Duration = Duration::from_secs(30);

    // -- fixtures ----------------------------------------------------------

    /// Local storage backend rooted in `tmp`.
    async fn make_storage(tmp: &tempfile::TempDir) -> Arc<dyn StorageBackend> {
        let backend = LocalBackend::new(tmp.path()).await.unwrap();
        Arc::new(backend)
    }

    /// Node config with the given id, core count and memory; the per-bee
    /// memory, target cell size and cache dir are derived from the arguments.
    fn make_config(
        node_id: &str,
        cores: usize,
        memory: u64,
        pool_tmp: &tempfile::TempDir,
    ) -> NodeConfig {
        let per_bee = if cores > 0 {
            memory / cores as u64
        } else {
            memory
        };

        let mut config = NodeConfig::detect("local://test");
        config.node_id = NodeId::new(node_id);
        config.cores = cores;
        config.memory_bytes = memory;
        config.memory_per_bee = per_bee;
        config.target_cell_size = per_bee / 4;
        config.cache_dir = pool_tmp.path().to_path_buf();
        config
    }

    fn make_pool(config: &NodeConfig) -> Arc<BeePool> {
        Arc::new(BeePool::new(config))
    }

    async fn make_cache(config: &NodeConfig, storage: Arc<dyn StorageBackend>) -> Arc<CellCache> {
        let cells_dir = config.cache_dir.join("cells");
        let cache = CellCache::new(cells_dir, config.max_cache_size, storage)
            .await
            .unwrap();
        Arc::new(cache)
    }

    /// Writer with its own fresh pool and cache for `config`.
    async fn make_writer(
        storage: &Arc<dyn StorageBackend>,
        config: &NodeConfig,
    ) -> HeartbeatWriter {
        let pool = make_pool(config);
        let cache = make_cache(config, Arc::clone(storage)).await;
        HeartbeatWriter::new(Arc::clone(storage), config, pool, cache)
    }

    /// Two writers that share one bee pool built from `first`'s config, each
    /// with its own cache. Both write one heartbeat before being returned.
    async fn write_pair_sharing_pool(
        storage: &Arc<dyn StorageBackend>,
        first: &NodeConfig,
        second: &NodeConfig,
    ) -> (HeartbeatWriter, HeartbeatWriter) {
        let pool = make_pool(first);
        let first_cache = make_cache(first, Arc::clone(storage)).await;
        let first_writer = HeartbeatWriter::new(
            Arc::clone(storage),
            first,
            Arc::clone(&pool),
            Arc::clone(&first_cache),
        );

        let second_cache = make_cache(second, Arc::clone(storage)).await;
        let second_writer = HeartbeatWriter::new(Arc::clone(storage), second, pool, second_cache);

        first_writer.write_once().await.unwrap();
        second_writer.write_once().await.unwrap();
        (first_writer, second_writer)
    }

    fn make_builder(storage: &Arc<dyn StorageBackend>) -> WorldViewBuilder {
        WorldViewBuilder::new(Arc::clone(storage), POLL_INTERVAL, DEAD_THRESHOLD)
    }

    /// Hand-built heartbeat for a two-bee idle node, timestamped `age` ago.
    fn aged_heartbeat(node_id: &str, age: chrono::Duration) -> Heartbeat {
        Heartbeat {
            node_id: node_id.to_string(),
            timestamp: Utc::now() - age,
            version: 1,
            capacity: HeartbeatCapacity {
                cores: 2,
                memory_total_bytes: 1024,
                memory_per_bee: 512,
                target_cell_size: 128,
            },
            load: HeartbeatLoad {
                bees_total: 2,
                bees_busy: 0,
                bees_idle: 2,
                memory_pressure: 0.0,
                queue_depth: 0,
                colony_temperature: 0.0,
            },
            cache: HeartbeatCache {
                size_bytes: 0,
                cached_cells: HashMap::new(),
            },
        }
    }

    /// Store `hb` under the key a writer would use for that node.
    async fn store_heartbeat(storage: &Arc<dyn StorageBackend>, hb: &Heartbeat) {
        let key = format!("_heartbeats/node_{}.json", hb.node_id);
        let json = serde_json::to_vec(hb).unwrap();
        storage.put(&key, json.into()).await.unwrap();
    }

    /// Read back and parse the stored heartbeat of `node_id`.
    async fn load_heartbeat(storage: &Arc<dyn StorageBackend>, node_id: &str) -> Heartbeat {
        let key = format!("_heartbeats/node_{}.json", node_id);
        let data = storage.get(&key).await.unwrap();
        serde_json::from_slice(&data).unwrap()
    }

    // -- heartbeat writer --------------------------------------------------

    #[tokio::test]
    async fn test_heartbeat_write_and_read() {
        let tmp = tempfile::TempDir::new().unwrap();
        let storage = make_storage(&tmp).await;
        let pool_tmp = tempfile::TempDir::new().unwrap();
        let config = make_config("test-node", 4, 4 * 1024 * 1024 * 1024, &pool_tmp);
        let writer = make_writer(&storage, &config).await;

        writer.write_once().await.unwrap();

        let hb = load_heartbeat(&storage, "test-node").await;
        assert_eq!(hb.node_id, "test-node");
        assert_eq!(hb.version, 1);
        assert_eq!(hb.capacity.cores, 4);
        assert_eq!(hb.load.bees_total, 4); // from pool with 4 cores
        assert_eq!(hb.load.bees_idle, 4);
        assert_eq!(hb.load.bees_busy, 0);
    }

    #[tokio::test]
    async fn test_heartbeat_version_increments() {
        let tmp = tempfile::TempDir::new().unwrap();
        let storage = make_storage(&tmp).await;
        let pool_tmp = tempfile::TempDir::new().unwrap();
        let config = make_config("inc-node", 2, 1024, &pool_tmp);
        let writer = make_writer(&storage, &config).await;

        for _ in 0..3 {
            writer.write_once().await.unwrap();
        }

        let hb = load_heartbeat(&storage, "inc-node").await;
        assert_eq!(hb.version, 3);
    }

    #[tokio::test]
    async fn test_heartbeat_delete() {
        let tmp = tempfile::TempDir::new().unwrap();
        let storage = make_storage(&tmp).await;
        let pool_tmp = tempfile::TempDir::new().unwrap();
        let config = make_config("del-node", 2, 1024, &pool_tmp);
        let writer = make_writer(&storage, &config).await;
        let key = "_heartbeats/node_del-node.json";

        writer.write_once().await.unwrap();
        assert!(storage.exists(key).await.unwrap());

        writer.delete_heartbeat().await.unwrap();
        assert!(!storage.exists(key).await.unwrap());
    }

    #[tokio::test]
    async fn test_heartbeat_writer_with_cancel() {
        let tmp = tempfile::TempDir::new().unwrap();
        let storage = make_storage(&tmp).await;
        let pool_tmp = tempfile::TempDir::new().unwrap();
        let mut config = make_config("cancel-node", 2, 1024, &pool_tmp);
        config.heartbeat_interval = Duration::from_millis(50);
        let writer = Arc::new(make_writer(&storage, &config).await);

        let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
        let running = Arc::clone(&writer);
        let handle = tokio::spawn(async move {
            running.run(cancel_rx).await;
        });

        // Let it run for a bit, then cancel and wait for the loop to exit.
        tokio::time::sleep(Duration::from_millis(200)).await;
        cancel_tx.send(true).unwrap();
        handle.await.unwrap();

        // Should have written at least the initial heartbeat.
        let hb = load_heartbeat(&storage, "cancel-node").await;
        assert!(hb.version >= 1);
    }

    // -- world view --------------------------------------------------------

    #[tokio::test]
    async fn test_world_view_discovers_nodes() {
        let tmp = tempfile::TempDir::new().unwrap();
        let storage = make_storage(&tmp).await;
        let pool_tmp = tempfile::TempDir::new().unwrap();
        let config_a = make_config("node-a", 2, 1024, &pool_tmp);
        let config_b = make_config("node-b", 4, 2048, &pool_tmp);
        let _writers = write_pair_sharing_pool(&storage, &config_a, &config_b).await;

        let view = make_builder(&storage).build_once().await.unwrap();

        assert_eq!(view.nodes.len(), 2);
        for id in ["node-a", "node-b"] {
            let node_id = NodeId::new(id);
            assert!(view.nodes.contains_key(&node_id));
            // Freshly written heartbeats must be classified as alive.
            assert_eq!(view.nodes[&node_id].state, NodeState::Alive);
        }
    }

    #[tokio::test]
    async fn test_world_view_stale_heartbeat_becomes_dead() {
        let tmp = tempfile::TempDir::new().unwrap();
        let storage = make_storage(&tmp).await;

        // 60s old against a 30s dead threshold.
        let stale = aged_heartbeat("stale-node", chrono::Duration::seconds(60));
        store_heartbeat(&storage, &stale).await;

        let view = make_builder(&storage).build_once().await.unwrap();

        assert_eq!(view.nodes.len(), 1);
        assert_eq!(
            view.nodes[&NodeId::new("stale-node")].state,
            NodeState::Dead
        );
    }

    #[tokio::test]
    async fn test_world_view_suspect_state() {
        let tmp = tempfile::TempDir::new().unwrap();
        let storage = make_storage(&tmp).await;

        // 20s old: past half the threshold (15s) but short of the threshold (30s).
        let suspect = aged_heartbeat("suspect-node", chrono::Duration::seconds(20));
        store_heartbeat(&storage, &suspect).await;

        let view = make_builder(&storage).build_once().await.unwrap();

        assert_eq!(
            view.nodes[&NodeId::new("suspect-node")].state,
            NodeState::Suspect
        );
    }

    #[tokio::test]
    async fn test_world_view_poll_updates_shared_state() {
        let tmp = tempfile::TempDir::new().unwrap();
        let storage = make_storage(&tmp).await;
        let pool_tmp = tempfile::TempDir::new().unwrap();
        let config = make_config("shared-node", 2, 1024, &pool_tmp);
        let writer = make_writer(&storage, &config).await;
        writer.write_once().await.unwrap();

        let builder = make_builder(&storage);

        // Before polling, the shared world view is empty.
        {
            let shared = builder.world_view();
            let wv = shared.read().await;
            assert_eq!(wv.nodes.len(), 0);
        }

        // After polling, the shared world view contains the node.
        builder.poll_once().await.unwrap();
        {
            let shared = builder.world_view();
            let wv = shared.read().await;
            assert_eq!(wv.nodes.len(), 1);
            assert!(wv.nodes.contains_key(&NodeId::new("shared-node")));
        }
    }

    #[tokio::test]
    async fn test_world_view_graceful_departure() {
        let tmp = tempfile::TempDir::new().unwrap();
        let storage = make_storage(&tmp).await;
        let pool_tmp = tempfile::TempDir::new().unwrap();
        let config = make_config("departing-node", 2, 1024, &pool_tmp);
        let writer = make_writer(&storage, &config).await;
        writer.write_once().await.unwrap();

        let builder = make_builder(&storage);
        let before = builder.build_once().await.unwrap();
        assert_eq!(before.nodes.len(), 1);

        // Graceful departure: the node deletes its own heartbeat.
        writer.delete_heartbeat().await.unwrap();

        let after = builder.build_once().await.unwrap();
        assert_eq!(after.nodes.len(), 0);
    }

    #[tokio::test]
    async fn test_world_view_solo_mode() {
        let tmp = tempfile::TempDir::new().unwrap();
        let storage = make_storage(&tmp).await;
        let pool_tmp = tempfile::TempDir::new().unwrap();
        let config = make_config("solo-node", 2, 1024, &pool_tmp);
        let writer = make_writer(&storage, &config).await;
        writer.write_once().await.unwrap();

        let view = make_builder(&storage).build_once().await.unwrap();

        // Solo mode: exactly one node, and it is alive.
        let alive = view.alive_nodes();
        assert_eq!(view.nodes.len(), 1);
        assert_eq!(alive.len(), 1);
        assert_eq!(alive[0].node_id, NodeId::new("solo-node"));
    }

    #[tokio::test]
    async fn test_world_view_totals() {
        let tmp = tempfile::TempDir::new().unwrap();
        let storage = make_storage(&tmp).await;
        let pool_tmp = tempfile::TempDir::new().unwrap();
        let config_n1 = make_config("n1", 2, 1024, &pool_tmp);
        let config_n2 = make_config("n2", 4, 2048, &pool_tmp);
        let _writers = write_pair_sharing_pool(&storage, &config_n1, &config_n2).await;

        let view = make_builder(&storage).build_once().await.unwrap();

        // Both writers share the same BeePool with 2 bees, so totals reflect that.
        assert_eq!(view.total_bees(), 4); // 2 + 2
        assert_eq!(view.total_idle_bees(), 4); // all idle
    }

    #[tokio::test]
    async fn test_cleanup_stale_heartbeats() {
        let tmp = tempfile::TempDir::new().unwrap();
        let storage = make_storage(&tmp).await;

        // A heartbeat that has been dead for well over an hour.
        let ancient = aged_heartbeat("ancient-node", chrono::Duration::hours(2));
        store_heartbeat(&storage, &ancient).await;

        let builder = make_builder(&storage);

        // The cleanup works from the shared view, so populate it first.
        builder.poll_once().await.unwrap();

        // Clean up heartbeats dead for more than one hour.
        let cleaned = builder
            .cleanup_stale(Duration::from_secs(3600))
            .await
            .unwrap();
        assert_eq!(cleaned, 1);

        // The file should be gone.
        assert!(!storage
            .exists("_heartbeats/node_ancient-node.json")
            .await
            .unwrap());
    }
}