Skip to main content

peat_mesh/beacon/
observer.rs

1use super::storage::{BeaconChangeEvent, BeaconStorage};
2use super::types::GeographicBeacon;
3use futures::StreamExt;
4use std::collections::HashMap;
5use std::sync::Arc;
6use tokio::sync::RwLock;
7use tracing::{debug, error, info};
8
9/// Observes and tracks nearby geographic beacons
10///
11/// BeaconObserver subscribes to beacon updates from the storage backend
12/// and maintains a cache of nearby beacons based on geohash proximity.
13pub struct BeaconObserver {
14    storage: Arc<dyn BeaconStorage>,
15    my_geohash: String,
16    nearby_beacons: Arc<RwLock<HashMap<String, GeographicBeacon>>>,
17    running: Arc<RwLock<bool>>,
18}
19
20impl BeaconObserver {
21    /// Create a new beacon observer
22    ///
23    /// # Arguments
24    ///
25    /// * `storage` - Storage backend for beacon queries and subscriptions
26    /// * `my_geohash` - This node's geohash for proximity filtering
27    pub fn new(storage: Arc<dyn BeaconStorage>, my_geohash: String) -> Self {
28        Self {
29            storage,
30            my_geohash,
31            nearby_beacons: Arc::new(RwLock::new(HashMap::new())),
32            running: Arc::new(RwLock::new(false)),
33        }
34    }
35
36    /// Start observing beacons
37    ///
38    /// Subscribes to beacon change events from storage and maintains
39    /// a cache of nearby beacons based on geohash proximity.
40    pub async fn start(&self) {
41        let mut running = self.running.write().await;
42        if *running {
43            debug!("Beacon observer already running");
44            return;
45        }
46        *running = true;
47        drop(running);
48
49        info!("Starting beacon observer for geohash {}", self.my_geohash);
50
51        // Subscribe to beacon changes
52        let mut stream = match self.storage.subscribe().await {
53            Ok(s) => s,
54            Err(e) => {
55                error!("Failed to subscribe to beacon changes: {}", e);
56                let mut running = self.running.write().await;
57                *running = false;
58                return;
59            }
60        };
61
62        let running_clone = self.running.clone();
63        let nearby_beacons_clone = self.nearby_beacons.clone();
64        let my_geohash = self.my_geohash.clone();
65
66        tokio::spawn(async move {
67            while *running_clone.read().await {
68                tokio::select! {
69                    Some(event) = stream.next() => {
70                        match event {
71                            BeaconChangeEvent::Inserted(beacon) | BeaconChangeEvent::Updated(beacon) => {
72                                // Check if beacon is nearby
73                                if Self::is_nearby_geohash(&my_geohash, &beacon.geohash) {
74                                    debug!("Nearby beacon detected: {}", beacon.node_id);
75                                    let mut beacons = nearby_beacons_clone.write().await;
76                                    beacons.insert(beacon.node_id.clone(), beacon);
77                                }
78                            }
79                            BeaconChangeEvent::Removed { node_id } => {
80                                debug!("Beacon removed: {}", node_id);
81                                let mut beacons = nearby_beacons_clone.write().await;
82                                beacons.remove(&node_id);
83                            }
84                        }
85                    }
86                    _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {
87                        // Periodic check to ensure loop continues
88                    }
89                }
90            }
91            debug!("Beacon observer event loop stopped");
92        });
93    }
94
95    /// Stop observing beacons
96    pub async fn stop(&self) {
97        let mut running = self.running.write().await;
98        *running = false;
99        info!("Stopped beacon observer");
100    }
101
102    /// Get all nearby beacons
103    pub async fn get_nearby_beacons(&self) -> Vec<GeographicBeacon> {
104        self.nearby_beacons.read().await.values().cloned().collect()
105    }
106
107    /// Check if a geohash is nearby (same or adjacent cell)
108    fn is_nearby_geohash(my_geohash: &str, other_geohash: &str) -> bool {
109        use geohash::Direction;
110
111        if my_geohash == other_geohash {
112            return true;
113        }
114
115        // Check all 8 adjacent cells
116        let directions = [
117            Direction::N,
118            Direction::NE,
119            Direction::E,
120            Direction::SE,
121            Direction::S,
122            Direction::SW,
123            Direction::W,
124            Direction::NW,
125        ];
126
127        for dir in &directions {
128            if let Ok(neighbor) = geohash::neighbor(my_geohash, *dir) {
129                if neighbor == other_geohash {
130                    return true;
131                }
132            }
133        }
134
135        false
136    }
137}
138
139#[cfg(test)]
140mod tests {
141    use super::*;
142    use crate::beacon::storage::{BeaconChangeStream, Result};
143    use async_trait::async_trait;
144    use futures::stream;
145    use std::sync::Arc;
146    use tokio::sync::Mutex;
147
148    /// Mock storage for testing
149    struct MockBeaconStorage {
150        beacons: Arc<Mutex<Vec<GeographicBeacon>>>,
151    }
152
153    impl MockBeaconStorage {
154        fn new() -> Self {
155            Self {
156                beacons: Arc::new(Mutex::new(Vec::new())),
157            }
158        }
159    }
160
161    #[async_trait]
162    impl BeaconStorage for MockBeaconStorage {
163        async fn save_beacon(&self, beacon: &GeographicBeacon) -> Result<()> {
164            let mut beacons = self.beacons.lock().await;
165            if let Some(existing) = beacons.iter_mut().find(|b| b.node_id == beacon.node_id) {
166                *existing = beacon.clone();
167            } else {
168                beacons.push(beacon.clone());
169            }
170            Ok(())
171        }
172
173        async fn query_by_geohash(&self, geohash_prefix: &str) -> Result<Vec<GeographicBeacon>> {
174            let beacons = self.beacons.lock().await;
175            Ok(beacons
176                .iter()
177                .filter(|b| b.geohash.starts_with(geohash_prefix))
178                .cloned()
179                .collect())
180        }
181
182        async fn query_all(&self) -> Result<Vec<GeographicBeacon>> {
183            let beacons = self.beacons.lock().await;
184            Ok(beacons.clone())
185        }
186
187        async fn subscribe(&self) -> Result<BeaconChangeStream> {
188            // For testing, return empty stream
189            // In real tests with events, we'd use the event_tx channel
190            Ok(Box::new(stream::empty()))
191        }
192    }
193
194    #[tokio::test]
195    async fn test_observer_lifecycle() {
196        let storage = MockBeaconStorage::new();
197        let observer = BeaconObserver::new(Arc::new(storage), "9q8yy9m".to_string());
198
199        observer.start().await;
200        assert!(*observer.running.read().await);
201
202        observer.stop().await;
203        assert!(!*observer.running.read().await);
204    }
205
206    #[test]
207    fn test_is_nearby_geohash() {
208        // Same geohash should be nearby
209        assert!(BeaconObserver::is_nearby_geohash("9q8yy9m", "9q8yy9m"));
210
211        // Adjacent geohashes should be nearby
212        let north = geohash::neighbor("9q8yy9m", geohash::Direction::N).unwrap();
213        assert!(BeaconObserver::is_nearby_geohash("9q8yy9m", &north));
214
215        // Distant geohash should not be nearby
216        assert!(!BeaconObserver::is_nearby_geohash("9q8yy9m", "u4pruyd")); // Sydney, Australia
217    }
218
219    #[tokio::test]
220    async fn test_observer_filters_nearby_beacons() {
221        let storage = MockBeaconStorage::new();
222        let my_geohash = "9q8yy9m"; // San Francisco area
223
224        let observer = BeaconObserver::new(Arc::new(storage), my_geohash.to_string());
225
226        // Verify empty initially
227        let nearby = observer.get_nearby_beacons().await;
228        assert_eq!(nearby.len(), 0);
229    }
230
231    #[tokio::test]
232    async fn test_get_nearby_beacons() {
233        let storage = MockBeaconStorage::new();
234        let observer = BeaconObserver::new(Arc::new(storage), "9q8yy9m".to_string());
235
236        observer.start().await;
237
238        // Initially empty
239        let nearby = observer.get_nearby_beacons().await;
240        assert_eq!(nearby.len(), 0);
241
242        observer.stop().await;
243    }
244
245    #[tokio::test]
246    async fn test_observer_start_twice() {
247        let storage = MockBeaconStorage::new();
248        let observer = BeaconObserver::new(Arc::new(storage), "9q8yy9m".to_string());
249
250        observer.start().await;
251        assert!(*observer.running.read().await);
252
253        // Starting again should be a no-op (already running)
254        observer.start().await;
255        assert!(*observer.running.read().await);
256
257        observer.stop().await;
258    }
259
260    #[test]
261    fn test_is_nearby_geohash_all_directions() {
262        let my = "9q8yy9m";
263
264        // Check all 8 neighbors
265        let directions = [
266            geohash::Direction::N,
267            geohash::Direction::NE,
268            geohash::Direction::E,
269            geohash::Direction::SE,
270            geohash::Direction::S,
271            geohash::Direction::SW,
272            geohash::Direction::W,
273            geohash::Direction::NW,
274        ];
275
276        for dir in &directions {
277            let neighbor = geohash::neighbor(my, *dir).unwrap();
278            assert!(
279                BeaconObserver::is_nearby_geohash(my, &neighbor),
280                "Neighbor in direction {:?} should be nearby",
281                dir
282            );
283        }
284    }
285
286    #[test]
287    fn test_is_nearby_geohash_same_hash() {
288        assert!(BeaconObserver::is_nearby_geohash("9q8yy9m", "9q8yy9m"));
289    }
290
291    #[test]
292    fn test_is_nearby_geohash_distant() {
293        // Two very different geohashes should not be nearby
294        assert!(!BeaconObserver::is_nearby_geohash("9q8yy9m", "u4pruyd"));
295        assert!(!BeaconObserver::is_nearby_geohash("9q8yy9m", "s00000"));
296    }
297
298    #[tokio::test]
299    async fn test_observer_with_failing_subscribe() {
300        use crate::beacon::storage::StorageError;
301
302        struct FailingStorage;
303
304        #[async_trait]
305        impl BeaconStorage for FailingStorage {
306            async fn save_beacon(
307                &self,
308                _beacon: &crate::beacon::types::GeographicBeacon,
309            ) -> Result<()> {
310                Ok(())
311            }
312
313            async fn query_by_geohash(
314                &self,
315                _geohash_prefix: &str,
316            ) -> Result<Vec<crate::beacon::types::GeographicBeacon>> {
317                Ok(vec![])
318            }
319
320            async fn query_all(&self) -> Result<Vec<crate::beacon::types::GeographicBeacon>> {
321                Ok(vec![])
322            }
323
324            async fn subscribe(&self) -> Result<BeaconChangeStream> {
325                Err(StorageError::SubscribeFailed("test failure".to_string()))
326            }
327        }
328
329        let observer = BeaconObserver::new(Arc::new(FailingStorage), "9q8yy9m".to_string());
330
331        // Start should handle the subscribe failure gracefully
332        observer.start().await;
333        // After failed subscribe, running should be set back to false
334        assert!(!*observer.running.read().await);
335    }
336
337    #[tokio::test]
338    async fn test_observer_processes_events() {
339        use crate::beacon::types::{GeoPosition, HierarchyLevel};
340
341        struct EventStorage {
342            events: Vec<BeaconChangeEvent>,
343        }
344
345        #[async_trait]
346        impl BeaconStorage for EventStorage {
347            async fn save_beacon(
348                &self,
349                _beacon: &crate::beacon::types::GeographicBeacon,
350            ) -> Result<()> {
351                Ok(())
352            }
353
354            async fn query_by_geohash(
355                &self,
356                _geohash_prefix: &str,
357            ) -> Result<Vec<crate::beacon::types::GeographicBeacon>> {
358                Ok(vec![])
359            }
360
361            async fn query_all(&self) -> Result<Vec<crate::beacon::types::GeographicBeacon>> {
362                Ok(vec![])
363            }
364
365            async fn subscribe(&self) -> Result<BeaconChangeStream> {
366                let events = self.events.clone();
367                Ok(Box::new(stream::iter(events)))
368            }
369        }
370
371        // Create a beacon that is nearby (same geohash)
372        let my_geohash = "9q8yy9m";
373        let beacon = crate::beacon::types::GeographicBeacon::new(
374            "nearby-node".to_string(),
375            GeoPosition::new(37.7749, -122.4194),
376            HierarchyLevel::Squad,
377        );
378
379        // The beacon's geohash needs to match or be adjacent to my_geohash
380        // We'll use a beacon with the same geohash
381        let mut nearby_beacon = beacon.clone();
382        nearby_beacon.geohash = my_geohash.to_string();
383
384        let storage = EventStorage {
385            events: vec![BeaconChangeEvent::Inserted(nearby_beacon.clone())],
386        };
387
388        let observer = BeaconObserver::new(Arc::new(storage), my_geohash.to_string());
389        observer.start().await;
390
391        // Give the spawned task time to process
392        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
393
394        let nearby = observer.get_nearby_beacons().await;
395        assert_eq!(nearby.len(), 1);
396        assert_eq!(nearby[0].node_id, "nearby-node");
397
398        observer.stop().await;
399    }
400
401    /// Reusable event-based storage mock for observer tests
402    struct EventStorage {
403        events: Vec<BeaconChangeEvent>,
404    }
405
406    impl EventStorage {
407        fn new(events: Vec<BeaconChangeEvent>) -> Self {
408            Self { events }
409        }
410    }
411
412    #[async_trait]
413    impl BeaconStorage for EventStorage {
414        async fn save_beacon(
415            &self,
416            _beacon: &crate::beacon::types::GeographicBeacon,
417        ) -> Result<()> {
418            Ok(())
419        }
420        async fn query_by_geohash(
421            &self,
422            _geohash_prefix: &str,
423        ) -> Result<Vec<crate::beacon::types::GeographicBeacon>> {
424            Ok(vec![])
425        }
426        async fn query_all(&self) -> Result<Vec<crate::beacon::types::GeographicBeacon>> {
427            Ok(vec![])
428        }
429        async fn subscribe(&self) -> Result<BeaconChangeStream> {
430            let events = self.events.clone();
431            Ok(Box::new(stream::iter(events)))
432        }
433    }
434
435    fn make_nearby_beacon(node_id: &str, geohash: &str) -> crate::beacon::types::GeographicBeacon {
436        use crate::beacon::types::{GeoPosition, HierarchyLevel};
437        let mut beacon = crate::beacon::types::GeographicBeacon::new(
438            node_id.to_string(),
439            GeoPosition::new(37.7749, -122.4194),
440            HierarchyLevel::Squad,
441        );
442        beacon.geohash = geohash.to_string();
443        beacon
444    }
445
446    #[tokio::test]
447    async fn test_observer_processes_removed_event() {
448        let my_geohash = "9q8yy9m";
449        let beacon = make_nearby_beacon("remove-me", my_geohash);
450
451        let storage = EventStorage::new(vec![
452            BeaconChangeEvent::Inserted(beacon),
453            BeaconChangeEvent::Removed {
454                node_id: "remove-me".to_string(),
455            },
456        ]);
457
458        let observer = BeaconObserver::new(Arc::new(storage), my_geohash.to_string());
459        observer.start().await;
460        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
461
462        let nearby = observer.get_nearby_beacons().await;
463        assert_eq!(nearby.len(), 0);
464
465        observer.stop().await;
466    }
467
468    #[tokio::test]
469    async fn test_observer_processes_updated_event() {
470        let my_geohash = "9q8yy9m";
471        let beacon = make_nearby_beacon("update-me", my_geohash);
472        let updated = make_nearby_beacon("update-me", my_geohash);
473
474        let storage = EventStorage::new(vec![
475            BeaconChangeEvent::Inserted(beacon),
476            BeaconChangeEvent::Updated(updated),
477        ]);
478
479        let observer = BeaconObserver::new(Arc::new(storage), my_geohash.to_string());
480        observer.start().await;
481        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
482
483        let nearby = observer.get_nearby_beacons().await;
484        assert_eq!(nearby.len(), 1);
485        assert_eq!(nearby[0].node_id, "update-me");
486
487        observer.stop().await;
488    }
489
490    #[tokio::test]
491    async fn test_observer_ignores_distant_beacons() {
492        let my_geohash = "9q8yy9m";
493        let distant = make_nearby_beacon("far-away", "u4pruyd");
494
495        let storage = EventStorage::new(vec![BeaconChangeEvent::Inserted(distant)]);
496
497        let observer = BeaconObserver::new(Arc::new(storage), my_geohash.to_string());
498        observer.start().await;
499        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
500
501        let nearby = observer.get_nearby_beacons().await;
502        assert_eq!(nearby.len(), 0);
503
504        observer.stop().await;
505    }
506
507    #[tokio::test]
508    async fn test_event_storage_methods() {
509        // Ensure mock storage methods are exercised for coverage
510        use crate::beacon::types::{GeoPosition, HierarchyLevel};
511        let storage = EventStorage::new(vec![]);
512        let beacon = crate::beacon::types::GeographicBeacon::new(
513            "test".to_string(),
514            GeoPosition::new(37.7749, -122.4194),
515            HierarchyLevel::Squad,
516        );
517        storage.save_beacon(&beacon).await.unwrap();
518        let results = storage.query_by_geohash("9q8").await.unwrap();
519        assert!(results.is_empty());
520        let all = storage.query_all().await.unwrap();
521        assert!(all.is_empty());
522    }
523
524    #[tokio::test]
525    async fn test_mock_storage_methods() {
526        use crate::beacon::types::{GeoPosition, HierarchyLevel};
527        let storage = MockBeaconStorage::new();
528        let beacon = crate::beacon::types::GeographicBeacon::new(
529            "test".to_string(),
530            GeoPosition::new(37.7749, -122.4194),
531            HierarchyLevel::Squad,
532        );
533        storage.save_beacon(&beacon).await.unwrap();
534        let results = storage.query_by_geohash("9q8").await.unwrap();
535        assert_eq!(results.len(), 1);
536        let all = storage.query_all().await.unwrap();
537        assert_eq!(all.len(), 1);
538        // Save again (update path)
539        storage.save_beacon(&beacon).await.unwrap();
540        let all = storage.query_all().await.unwrap();
541        assert_eq!(all.len(), 1);
542    }
543}