1use std::collections::HashMap;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::Arc;
10use std::time::Duration;
11
12use chrono::{DateTime, Utc};
13use serde::{Deserialize, Serialize};
14use tokio::sync::RwLock;
15use tracing::{debug, info, warn};
16
17use apiary_core::error::ApiaryError;
18use apiary_core::storage::StorageBackend;
19use apiary_core::types::NodeId;
20use apiary_core::Result;
21
22use crate::bee::BeePool;
23use crate::behavioral::ColonyThermometer;
24use crate::cache::CellCache;
25
26#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct HeartbeatCapacity {
33 pub cores: usize,
34 pub memory_total_bytes: u64,
35 pub memory_per_bee: u64,
36 pub target_cell_size: u64,
37}
38
39#[derive(Debug, Clone, Serialize, Deserialize)]
41pub struct HeartbeatLoad {
42 pub bees_total: usize,
43 pub bees_busy: usize,
44 pub bees_idle: usize,
45 pub memory_pressure: f64,
46 pub queue_depth: usize,
47 pub colony_temperature: f64,
48}
49
50#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct HeartbeatCache {
53 pub size_bytes: u64,
54 pub cached_cells: HashMap<String, u64>,
57}
58
59#[derive(Debug, Clone, Serialize, Deserialize)]
61pub struct Heartbeat {
62 pub node_id: String,
63 pub timestamp: DateTime<Utc>,
64 pub version: u64,
65 pub capacity: HeartbeatCapacity,
66 pub load: HeartbeatLoad,
67 pub cache: HeartbeatCache,
68}
69
70pub struct HeartbeatWriter {
76 storage: Arc<dyn StorageBackend>,
77 node_id: NodeId,
78 interval: Duration,
79 version: AtomicU64,
80 bee_pool: Arc<BeePool>,
81 cell_cache: Arc<CellCache>,
82 thermometer: ColonyThermometer,
83 cores: usize,
84 memory_total_bytes: u64,
85 memory_per_bee: u64,
86 target_cell_size: u64,
87}
88
89impl HeartbeatWriter {
90 pub fn new(
92 storage: Arc<dyn StorageBackend>,
93 config: &apiary_core::config::NodeConfig,
94 bee_pool: Arc<BeePool>,
95 cell_cache: Arc<CellCache>,
96 ) -> Self {
97 Self {
98 storage,
99 node_id: config.node_id.clone(),
100 interval: config.heartbeat_interval,
101 version: AtomicU64::new(0),
102 bee_pool,
103 cell_cache,
104 thermometer: ColonyThermometer::default(),
105 cores: config.cores,
106 memory_total_bytes: config.memory_bytes,
107 memory_per_bee: config.memory_per_bee,
108 target_cell_size: config.target_cell_size,
109 }
110 }
111
112 pub async fn collect_heartbeat(&self) -> Heartbeat {
114 let statuses = self.bee_pool.status().await;
115 let bees_total = statuses.len();
116 let bees_busy = statuses.iter().filter(|s| s.state != "idle").count();
117 let bees_idle = bees_total - bees_busy;
118
119 let total_memory_used: u64 = statuses.iter().map(|s| s.memory_used).sum();
120 let total_budget: u64 = statuses.iter().map(|s| s.memory_budget).sum();
121 let memory_pressure = if total_budget > 0 {
122 total_memory_used as f64 / total_budget as f64
123 } else {
124 0.0
125 };
126
127 let colony_temperature = self.thermometer.measure(&self.bee_pool).await;
129
130 let queue_depth = self.bee_pool.queue_size().await;
132
133 let version = self.version.fetch_add(1, Ordering::Relaxed) + 1;
134
135 let cached_cells = self.cell_cache.list_cached_cells().await;
137 let cache_size = self.cell_cache.size();
138
139 Heartbeat {
140 node_id: self.node_id.as_str().to_string(),
141 timestamp: Utc::now(),
142 version,
143 capacity: HeartbeatCapacity {
144 cores: self.cores,
145 memory_total_bytes: self.memory_total_bytes,
146 memory_per_bee: self.memory_per_bee,
147 target_cell_size: self.target_cell_size,
148 },
149 load: HeartbeatLoad {
150 bees_total,
151 bees_busy,
152 bees_idle,
153 memory_pressure,
154 queue_depth,
155 colony_temperature,
156 },
157 cache: HeartbeatCache {
158 size_bytes: cache_size,
159 cached_cells,
160 },
161 }
162 }
163
164 pub async fn write_once(&self) -> Result<()> {
166 let heartbeat = self.collect_heartbeat().await;
167 let key = format!("_heartbeats/node_{}.json", self.node_id);
168 let json = serde_json::to_vec_pretty(&heartbeat)
169 .map_err(|e| ApiaryError::Serialization(e.to_string()))?;
170 self.storage.put(&key, json.into()).await
171 }
172
173 pub async fn run(&self, cancel: tokio::sync::watch::Receiver<bool>) {
175 if let Err(e) = self.write_once().await {
177 warn!(error = %e, "Failed to write initial heartbeat");
178 } else {
179 info!(node_id = %self.node_id, "Heartbeat writer started");
180 }
181
182 loop {
183 tokio::select! {
184 _ = tokio::time::sleep(self.interval) => {
185 if let Err(e) = self.write_once().await {
186 warn!(error = %e, "Failed to write heartbeat");
187 }
188 }
189 _ = wait_for_cancel(&cancel) => {
190 debug!(node_id = %self.node_id, "Heartbeat writer stopping");
191 break;
192 }
193 }
194 }
195 }
196
197 pub async fn delete_heartbeat(&self) -> Result<()> {
199 let key = format!("_heartbeats/node_{}.json", self.node_id);
200 self.storage.delete(&key).await
201 }
202}
203
204#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
210pub enum NodeState {
211 Alive,
213 Suspect,
215 Dead,
217}
218
219#[derive(Debug, Clone)]
221pub struct NodeStatus {
222 pub node_id: NodeId,
223 pub heartbeat: Heartbeat,
224 pub state: NodeState,
225}
226
227#[derive(Debug, Clone)]
229pub struct WorldView {
230 pub nodes: HashMap<NodeId, NodeStatus>,
231 pub updated_at: DateTime<Utc>,
232}
233
234impl WorldView {
235 pub fn empty() -> Self {
237 Self {
238 nodes: HashMap::new(),
239 updated_at: Utc::now(),
240 }
241 }
242
243 pub fn alive_nodes(&self) -> Vec<&NodeStatus> {
245 self.nodes
246 .values()
247 .filter(|n| n.state == NodeState::Alive)
248 .collect()
249 }
250
251 pub fn total_bees(&self) -> usize {
253 self.alive_nodes()
254 .iter()
255 .map(|n| n.heartbeat.load.bees_total)
256 .sum()
257 }
258
259 pub fn total_idle_bees(&self) -> usize {
261 self.alive_nodes()
262 .iter()
263 .map(|n| n.heartbeat.load.bees_idle)
264 .sum()
265 }
266}
267
268pub struct WorldViewBuilder {
274 storage: Arc<dyn StorageBackend>,
275 poll_interval: Duration,
276 dead_threshold: Duration,
277 world_view: Arc<RwLock<WorldView>>,
278}
279
280impl WorldViewBuilder {
281 pub fn new(
283 storage: Arc<dyn StorageBackend>,
284 poll_interval: Duration,
285 dead_threshold: Duration,
286 ) -> Self {
287 Self {
288 storage,
289 poll_interval,
290 dead_threshold,
291 world_view: Arc::new(RwLock::new(WorldView::empty())),
292 }
293 }
294
295 pub fn world_view(&self) -> Arc<RwLock<WorldView>> {
297 Arc::clone(&self.world_view)
298 }
299
300 pub async fn build_once(&self) -> Result<WorldView> {
302 let keys = self.storage.list("_heartbeats/").await?;
303 let now = Utc::now();
304 let mut nodes = HashMap::new();
305
306 for key in &keys {
307 if !key.ends_with(".json") {
309 continue;
310 }
311
312 match self.storage.get(key).await {
313 Ok(data) => match serde_json::from_slice::<Heartbeat>(&data) {
314 Ok(hb) => {
315 let age = now
316 .signed_duration_since(hb.timestamp)
317 .to_std()
318 .unwrap_or(Duration::from_secs(86400 * 365));
319
320 let state = if age > self.dead_threshold {
321 NodeState::Dead
322 } else if age > self.dead_threshold / 2 {
323 NodeState::Suspect
324 } else {
325 NodeState::Alive
326 };
327
328 let node_id = NodeId::new(&hb.node_id);
329 nodes.insert(
330 node_id.clone(),
331 NodeStatus {
332 node_id,
333 heartbeat: hb,
334 state,
335 },
336 );
337 }
338 Err(e) => {
339 warn!(key = %key, error = %e, "Failed to parse heartbeat");
340 }
341 },
342 Err(e) => {
343 warn!(key = %key, error = %e, "Failed to read heartbeat file");
344 }
345 }
346 }
347
348 Ok(WorldView {
349 nodes,
350 updated_at: now,
351 })
352 }
353
354 pub async fn poll_once(&self) -> Result<WorldView> {
356 let view = self.build_once().await?;
357 {
358 let mut wv = self.world_view.write().await;
359 *wv = view.clone();
360 }
361 Ok(view)
362 }
363
364 pub async fn run(&self, cancel: tokio::sync::watch::Receiver<bool>) {
366 if let Err(e) = self.poll_once().await {
368 warn!(error = %e, "Failed to build initial world view");
369 } else {
370 info!("World view builder started");
371 }
372
373 loop {
374 tokio::select! {
375 _ = tokio::time::sleep(self.poll_interval) => {
376 if let Err(e) = self.poll_once().await {
377 warn!(error = %e, "Failed to poll world view");
378 }
379 }
380 _ = wait_for_cancel(&cancel) => {
381 debug!("World view builder stopping");
382 break;
383 }
384 }
385 }
386 }
387
388 pub async fn cleanup_stale(&self, cleanup_age: Duration) -> Result<usize> {
390 let view = self.world_view.read().await;
391 let now = Utc::now();
392 let mut cleaned = 0;
393
394 for status in view.nodes.values() {
395 if status.state == NodeState::Dead {
396 let age = now
397 .signed_duration_since(status.heartbeat.timestamp)
398 .to_std()
399 .unwrap_or(Duration::from_secs(0));
400 if age > cleanup_age {
401 let key = format!("_heartbeats/node_{}.json", status.heartbeat.node_id);
402 if let Err(e) = self.storage.delete(&key).await {
403 warn!(key = %key, error = %e, "Failed to clean up stale heartbeat");
404 } else {
405 cleaned += 1;
406 info!(node_id = %status.heartbeat.node_id, "Cleaned up stale heartbeat");
407 }
408 }
409 }
410 }
411
412 Ok(cleaned)
413 }
414}
415
416async fn wait_for_cancel(rx: &tokio::sync::watch::Receiver<bool>) {
422 let mut rx = rx.clone();
423 loop {
425 if *rx.borrow() {
426 return;
427 }
428 if rx.changed().await.is_err() {
429 return;
431 }
432 }
433}
434
#[cfg(test)]
mod tests {
    use super::*;
    use apiary_core::config::NodeConfig;
    use apiary_storage::local::LocalBackend;

    /// Poll interval handed to every builder under test.
    const POLL_INTERVAL: Duration = Duration::from_secs(5);
    /// Dead threshold handed to every builder under test (suspect above 15 s).
    const DEAD_THRESHOLD: Duration = Duration::from_secs(30);

    /// Opens a local storage backend rooted at `tmp`.
    async fn make_storage(tmp: &tempfile::TempDir) -> Arc<dyn StorageBackend> {
        let backend = LocalBackend::new(tmp.path()).await.unwrap();
        Arc::new(backend)
    }

    /// Builds a node config with the given identity and resources, caching
    /// under `pool_tmp`.
    fn make_config(
        node_id: &str,
        cores: usize,
        memory: u64,
        pool_tmp: &tempfile::TempDir,
    ) -> NodeConfig {
        let memory_per_bee = match cores {
            0 => memory,
            n => memory / n as u64,
        };

        let mut config = NodeConfig::detect("local://test");
        config.node_id = NodeId::new(node_id);
        config.cores = cores;
        config.memory_bytes = memory;
        config.memory_per_bee = memory_per_bee;
        config.target_cell_size = memory_per_bee / 4;
        config.cache_dir = pool_tmp.path().to_path_buf();
        config
    }

    fn make_pool(config: &NodeConfig) -> Arc<BeePool> {
        Arc::new(BeePool::new(config))
    }

    async fn make_cache(config: &NodeConfig, storage: Arc<dyn StorageBackend>) -> Arc<CellCache> {
        let cells_dir = config.cache_dir.join("cells");
        let cache = CellCache::new(cells_dir, config.max_cache_size, storage)
            .await
            .unwrap();
        Arc::new(cache)
    }

    /// Wires up a writer with its own pool and cache for `config`.
    async fn make_writer(storage: &Arc<dyn StorageBackend>, config: &NodeConfig) -> HeartbeatWriter {
        let pool = make_pool(config);
        let cache = make_cache(config, Arc::clone(storage)).await;
        HeartbeatWriter::new(Arc::clone(storage), config, pool, cache)
    }

    /// Builder using the standard test intervals.
    fn make_builder(storage: &Arc<dyn StorageBackend>) -> WorldViewBuilder {
        WorldViewBuilder::new(Arc::clone(storage), POLL_INTERVAL, DEAD_THRESHOLD)
    }

    /// Hand-made heartbeat of an idle two-bee node, stamped `age` in the past.
    fn make_heartbeat(node_id: &str, age: chrono::Duration) -> Heartbeat {
        Heartbeat {
            node_id: node_id.to_string(),
            timestamp: Utc::now() - age,
            version: 1,
            capacity: HeartbeatCapacity {
                cores: 2,
                memory_total_bytes: 1024,
                memory_per_bee: 512,
                target_cell_size: 128,
            },
            load: HeartbeatLoad {
                bees_total: 2,
                bees_busy: 0,
                bees_idle: 2,
                memory_pressure: 0.0,
                queue_depth: 0,
                colony_temperature: 0.0,
            },
            cache: HeartbeatCache {
                size_bytes: 0,
                cached_cells: HashMap::new(),
            },
        }
    }

    /// Stores `hb` under `key`, bypassing any writer.
    async fn put_heartbeat(storage: &Arc<dyn StorageBackend>, key: &str, hb: &Heartbeat) {
        let json = serde_json::to_vec(hb).unwrap();
        storage.put(key, json.into()).await.unwrap();
    }

    /// Reads and parses the heartbeat stored under `key`.
    async fn read_heartbeat(storage: &Arc<dyn StorageBackend>, key: &str) -> Heartbeat {
        let data = storage.get(key).await.unwrap();
        serde_json::from_slice(&data).unwrap()
    }

    #[tokio::test]
    async fn test_heartbeat_write_and_read() {
        let tmp = tempfile::TempDir::new().unwrap();
        let storage = make_storage(&tmp).await;
        let pool_tmp = tempfile::TempDir::new().unwrap();
        let config = make_config("test-node", 4, 4 * 1024 * 1024 * 1024, &pool_tmp);
        let writer = make_writer(&storage, &config).await;

        writer.write_once().await.unwrap();

        let hb = read_heartbeat(&storage, "_heartbeats/node_test-node.json").await;
        assert_eq!(hb.node_id, "test-node");
        assert_eq!(hb.version, 1);
        assert_eq!(hb.capacity.cores, 4);
        assert_eq!(hb.load.bees_total, 4);
        assert_eq!(hb.load.bees_idle, 4);
        assert_eq!(hb.load.bees_busy, 0);
    }

    #[tokio::test]
    async fn test_heartbeat_version_increments() {
        let tmp = tempfile::TempDir::new().unwrap();
        let storage = make_storage(&tmp).await;
        let pool_tmp = tempfile::TempDir::new().unwrap();
        let config = make_config("inc-node", 2, 1024, &pool_tmp);
        let writer = make_writer(&storage, &config).await;

        for _ in 0..3 {
            writer.write_once().await.unwrap();
        }

        let hb = read_heartbeat(&storage, "_heartbeats/node_inc-node.json").await;
        assert_eq!(hb.version, 3);
    }

    #[tokio::test]
    async fn test_heartbeat_delete() {
        let key = "_heartbeats/node_del-node.json";
        let tmp = tempfile::TempDir::new().unwrap();
        let storage = make_storage(&tmp).await;
        let pool_tmp = tempfile::TempDir::new().unwrap();
        let config = make_config("del-node", 2, 1024, &pool_tmp);
        let writer = make_writer(&storage, &config).await;

        writer.write_once().await.unwrap();
        assert!(storage.exists(key).await.unwrap());

        writer.delete_heartbeat().await.unwrap();
        assert!(!storage.exists(key).await.unwrap());
    }

    #[tokio::test]
    async fn test_world_view_discovers_nodes() {
        let tmp = tempfile::TempDir::new().unwrap();
        let storage = make_storage(&tmp).await;
        let pool_tmp = tempfile::TempDir::new().unwrap();

        // Both writers deliberately share the pool built from node-a's config.
        let config_a = make_config("node-a", 2, 1024, &pool_tmp);
        let shared_pool = make_pool(&config_a);
        let cache_a = make_cache(&config_a, Arc::clone(&storage)).await;
        let writer_a = HeartbeatWriter::new(
            Arc::clone(&storage),
            &config_a,
            Arc::clone(&shared_pool),
            cache_a,
        );

        let config_b = make_config("node-b", 4, 2048, &pool_tmp);
        let cache_b = make_cache(&config_b, Arc::clone(&storage)).await;
        let writer_b = HeartbeatWriter::new(Arc::clone(&storage), &config_b, shared_pool, cache_b);

        writer_a.write_once().await.unwrap();
        writer_b.write_once().await.unwrap();

        let view = make_builder(&storage).build_once().await.unwrap();

        assert_eq!(view.nodes.len(), 2);
        for name in ["node-a", "node-b"] {
            assert!(view.nodes.contains_key(&NodeId::new(name)));
        }
        for name in ["node-a", "node-b"] {
            assert_eq!(view.nodes[&NodeId::new(name)].state, NodeState::Alive);
        }
    }

    #[tokio::test]
    async fn test_world_view_stale_heartbeat_becomes_dead() {
        let tmp = tempfile::TempDir::new().unwrap();
        let storage = make_storage(&tmp).await;

        // 60 s old against a 30 s dead threshold.
        let stale_hb = make_heartbeat("stale-node", chrono::Duration::seconds(60));
        put_heartbeat(&storage, "_heartbeats/node_stale-node.json", &stale_hb).await;

        let view = make_builder(&storage).build_once().await.unwrap();

        assert_eq!(view.nodes.len(), 1);
        assert_eq!(
            view.nodes[&NodeId::new("stale-node")].state,
            NodeState::Dead
        );
    }

    #[tokio::test]
    async fn test_world_view_suspect_state() {
        let tmp = tempfile::TempDir::new().unwrap();
        let storage = make_storage(&tmp).await;

        // 20 s old: past half of the 30 s threshold, but not past the threshold.
        let suspect_hb = make_heartbeat("suspect-node", chrono::Duration::seconds(20));
        put_heartbeat(&storage, "_heartbeats/node_suspect-node.json", &suspect_hb).await;

        let view = make_builder(&storage).build_once().await.unwrap();

        assert_eq!(
            view.nodes[&NodeId::new("suspect-node")].state,
            NodeState::Suspect
        );
    }

    #[tokio::test]
    async fn test_world_view_poll_updates_shared_state() {
        let tmp = tempfile::TempDir::new().unwrap();
        let storage = make_storage(&tmp).await;
        let pool_tmp = tempfile::TempDir::new().unwrap();
        let config = make_config("shared-node", 2, 1024, &pool_tmp);
        let writer = make_writer(&storage, &config).await;
        writer.write_once().await.unwrap();

        let builder = make_builder(&storage);
        let shared = builder.world_view();

        // Nothing is published until the first poll.
        assert_eq!(shared.read().await.nodes.len(), 0);

        builder.poll_once().await.unwrap();

        let published = builder.world_view();
        let wv = published.read().await;
        assert_eq!(wv.nodes.len(), 1);
        assert!(wv.nodes.contains_key(&NodeId::new("shared-node")));
    }

    #[tokio::test]
    async fn test_world_view_graceful_departure() {
        let tmp = tempfile::TempDir::new().unwrap();
        let storage = make_storage(&tmp).await;
        let pool_tmp = tempfile::TempDir::new().unwrap();
        let config = make_config("departing-node", 2, 1024, &pool_tmp);
        let writer = make_writer(&storage, &config).await;
        let builder = make_builder(&storage);

        writer.write_once().await.unwrap();
        let before = builder.build_once().await.unwrap();
        assert_eq!(before.nodes.len(), 1);

        writer.delete_heartbeat().await.unwrap();
        let after = builder.build_once().await.unwrap();
        assert_eq!(after.nodes.len(), 0);
    }

    #[tokio::test]
    async fn test_world_view_solo_mode() {
        let tmp = tempfile::TempDir::new().unwrap();
        let storage = make_storage(&tmp).await;
        let pool_tmp = tempfile::TempDir::new().unwrap();
        let config = make_config("solo-node", 2, 1024, &pool_tmp);
        let writer = make_writer(&storage, &config).await;
        writer.write_once().await.unwrap();

        let view = make_builder(&storage).build_once().await.unwrap();

        assert_eq!(view.nodes.len(), 1);
        let alive = view.alive_nodes();
        assert_eq!(alive.len(), 1);
        assert_eq!(alive[0].node_id, NodeId::new("solo-node"));
    }

    #[tokio::test]
    async fn test_world_view_totals() {
        let tmp = tempfile::TempDir::new().unwrap();
        let storage = make_storage(&tmp).await;
        let pool_tmp = tempfile::TempDir::new().unwrap();

        // Both writers report the same two-bee pool, hence 2 + 2 below.
        let config_n1 = make_config("n1", 2, 1024, &pool_tmp);
        let shared_pool = make_pool(&config_n1);
        let cache_n1 = make_cache(&config_n1, Arc::clone(&storage)).await;
        let writer_n1 = HeartbeatWriter::new(
            Arc::clone(&storage),
            &config_n1,
            Arc::clone(&shared_pool),
            cache_n1,
        );

        let config_n2 = make_config("n2", 4, 2048, &pool_tmp);
        let cache_n2 = make_cache(&config_n2, Arc::clone(&storage)).await;
        let writer_n2 =
            HeartbeatWriter::new(Arc::clone(&storage), &config_n2, shared_pool, cache_n2);

        writer_n1.write_once().await.unwrap();
        writer_n2.write_once().await.unwrap();

        let view = make_builder(&storage).build_once().await.unwrap();

        assert_eq!(view.total_bees(), 4);
        assert_eq!(view.total_idle_bees(), 4);
    }

    #[tokio::test]
    async fn test_cleanup_stale_heartbeats() {
        let key = "_heartbeats/node_ancient-node.json";
        let tmp = tempfile::TempDir::new().unwrap();
        let storage = make_storage(&tmp).await;

        let old_hb = make_heartbeat("ancient-node", chrono::Duration::hours(2));
        put_heartbeat(&storage, key, &old_hb).await;

        let builder = make_builder(&storage);

        // cleanup_stale works off the shared view, so it has to be populated first.
        builder.poll_once().await.unwrap();

        let cleaned = builder
            .cleanup_stale(Duration::from_secs(3600))
            .await
            .unwrap();
        assert_eq!(cleaned, 1);

        assert!(!storage.exists(key).await.unwrap());
    }

    #[tokio::test]
    async fn test_heartbeat_writer_with_cancel() {
        let tmp = tempfile::TempDir::new().unwrap();
        let storage = make_storage(&tmp).await;
        let pool_tmp = tempfile::TempDir::new().unwrap();
        let mut config = make_config("cancel-node", 2, 1024, &pool_tmp);
        config.heartbeat_interval = Duration::from_millis(50);

        let writer = Arc::new(make_writer(&storage, &config).await);
        let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);

        let background = Arc::clone(&writer);
        let handle = tokio::spawn(async move {
            background.run(cancel_rx).await;
        });

        // Give the loop time for the initial write plus a few interval ticks.
        tokio::time::sleep(Duration::from_millis(200)).await;

        cancel_tx.send(true).unwrap();
        handle.await.unwrap();

        let hb = read_heartbeat(&storage, "_heartbeats/node_cancel-node.json").await;
        assert!(hb.version >= 1);
    }
}