1use super::storage::{BeaconChangeEvent, BeaconStorage};
2use super::types::GeographicBeacon;
3use futures::StreamExt;
4use std::collections::HashMap;
5use std::sync::Arc;
6use tokio::sync::RwLock;
7use tracing::{debug, error, info};
8
9pub struct BeaconObserver {
14 storage: Arc<dyn BeaconStorage>,
15 my_geohash: String,
16 nearby_beacons: Arc<RwLock<HashMap<String, GeographicBeacon>>>,
17 running: Arc<RwLock<bool>>,
18}
19
20impl BeaconObserver {
21 pub fn new(storage: Arc<dyn BeaconStorage>, my_geohash: String) -> Self {
28 Self {
29 storage,
30 my_geohash,
31 nearby_beacons: Arc::new(RwLock::new(HashMap::new())),
32 running: Arc::new(RwLock::new(false)),
33 }
34 }
35
36 pub async fn start(&self) {
41 let mut running = self.running.write().await;
42 if *running {
43 debug!("Beacon observer already running");
44 return;
45 }
46 *running = true;
47 drop(running);
48
49 info!("Starting beacon observer for geohash {}", self.my_geohash);
50
51 let mut stream = match self.storage.subscribe().await {
53 Ok(s) => s,
54 Err(e) => {
55 error!("Failed to subscribe to beacon changes: {}", e);
56 let mut running = self.running.write().await;
57 *running = false;
58 return;
59 }
60 };
61
62 let running_clone = self.running.clone();
63 let nearby_beacons_clone = self.nearby_beacons.clone();
64 let my_geohash = self.my_geohash.clone();
65
66 tokio::spawn(async move {
67 while *running_clone.read().await {
68 tokio::select! {
69 Some(event) = stream.next() => {
70 match event {
71 BeaconChangeEvent::Inserted(beacon) | BeaconChangeEvent::Updated(beacon) => {
72 if Self::is_nearby_geohash(&my_geohash, &beacon.geohash) {
74 debug!("Nearby beacon detected: {}", beacon.node_id);
75 let mut beacons = nearby_beacons_clone.write().await;
76 beacons.insert(beacon.node_id.clone(), beacon);
77 }
78 }
79 BeaconChangeEvent::Removed { node_id } => {
80 debug!("Beacon removed: {}", node_id);
81 let mut beacons = nearby_beacons_clone.write().await;
82 beacons.remove(&node_id);
83 }
84 }
85 }
86 _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {
87 }
89 }
90 }
91 debug!("Beacon observer event loop stopped");
92 });
93 }
94
95 pub async fn stop(&self) {
97 let mut running = self.running.write().await;
98 *running = false;
99 info!("Stopped beacon observer");
100 }
101
102 pub async fn get_nearby_beacons(&self) -> Vec<GeographicBeacon> {
104 self.nearby_beacons.read().await.values().cloned().collect()
105 }
106
107 fn is_nearby_geohash(my_geohash: &str, other_geohash: &str) -> bool {
109 use geohash::Direction;
110
111 if my_geohash == other_geohash {
112 return true;
113 }
114
115 let directions = [
117 Direction::N,
118 Direction::NE,
119 Direction::E,
120 Direction::SE,
121 Direction::S,
122 Direction::SW,
123 Direction::W,
124 Direction::NW,
125 ];
126
127 for dir in &directions {
128 if let Ok(neighbor) = geohash::neighbor(my_geohash, *dir) {
129 if neighbor == other_geohash {
130 return true;
131 }
132 }
133 }
134
135 false
136 }
137}
138
139#[cfg(test)]
140mod tests {
141 use super::*;
142 use crate::beacon::storage::{BeaconChangeStream, Result};
143 use async_trait::async_trait;
144 use futures::stream;
145 use std::sync::Arc;
146 use tokio::sync::Mutex;
147
148 struct MockBeaconStorage {
150 beacons: Arc<Mutex<Vec<GeographicBeacon>>>,
151 }
152
153 impl MockBeaconStorage {
154 fn new() -> Self {
155 Self {
156 beacons: Arc::new(Mutex::new(Vec::new())),
157 }
158 }
159 }
160
161 #[async_trait]
162 impl BeaconStorage for MockBeaconStorage {
163 async fn save_beacon(&self, beacon: &GeographicBeacon) -> Result<()> {
164 let mut beacons = self.beacons.lock().await;
165 if let Some(existing) = beacons.iter_mut().find(|b| b.node_id == beacon.node_id) {
166 *existing = beacon.clone();
167 } else {
168 beacons.push(beacon.clone());
169 }
170 Ok(())
171 }
172
173 async fn query_by_geohash(&self, geohash_prefix: &str) -> Result<Vec<GeographicBeacon>> {
174 let beacons = self.beacons.lock().await;
175 Ok(beacons
176 .iter()
177 .filter(|b| b.geohash.starts_with(geohash_prefix))
178 .cloned()
179 .collect())
180 }
181
182 async fn query_all(&self) -> Result<Vec<GeographicBeacon>> {
183 let beacons = self.beacons.lock().await;
184 Ok(beacons.clone())
185 }
186
187 async fn subscribe(&self) -> Result<BeaconChangeStream> {
188 Ok(Box::new(stream::empty()))
191 }
192 }
193
194 #[tokio::test]
195 async fn test_observer_lifecycle() {
196 let storage = MockBeaconStorage::new();
197 let observer = BeaconObserver::new(Arc::new(storage), "9q8yy9m".to_string());
198
199 observer.start().await;
200 assert!(*observer.running.read().await);
201
202 observer.stop().await;
203 assert!(!*observer.running.read().await);
204 }
205
206 #[test]
207 fn test_is_nearby_geohash() {
208 assert!(BeaconObserver::is_nearby_geohash("9q8yy9m", "9q8yy9m"));
210
211 let north = geohash::neighbor("9q8yy9m", geohash::Direction::N).unwrap();
213 assert!(BeaconObserver::is_nearby_geohash("9q8yy9m", &north));
214
215 assert!(!BeaconObserver::is_nearby_geohash("9q8yy9m", "u4pruyd")); }
218
219 #[tokio::test]
220 async fn test_observer_filters_nearby_beacons() {
221 let storage = MockBeaconStorage::new();
222 let my_geohash = "9q8yy9m"; let observer = BeaconObserver::new(Arc::new(storage), my_geohash.to_string());
225
226 let nearby = observer.get_nearby_beacons().await;
228 assert_eq!(nearby.len(), 0);
229 }
230
231 #[tokio::test]
232 async fn test_get_nearby_beacons() {
233 let storage = MockBeaconStorage::new();
234 let observer = BeaconObserver::new(Arc::new(storage), "9q8yy9m".to_string());
235
236 observer.start().await;
237
238 let nearby = observer.get_nearby_beacons().await;
240 assert_eq!(nearby.len(), 0);
241
242 observer.stop().await;
243 }
244
245 #[tokio::test]
246 async fn test_observer_start_twice() {
247 let storage = MockBeaconStorage::new();
248 let observer = BeaconObserver::new(Arc::new(storage), "9q8yy9m".to_string());
249
250 observer.start().await;
251 assert!(*observer.running.read().await);
252
253 observer.start().await;
255 assert!(*observer.running.read().await);
256
257 observer.stop().await;
258 }
259
260 #[test]
261 fn test_is_nearby_geohash_all_directions() {
262 let my = "9q8yy9m";
263
264 let directions = [
266 geohash::Direction::N,
267 geohash::Direction::NE,
268 geohash::Direction::E,
269 geohash::Direction::SE,
270 geohash::Direction::S,
271 geohash::Direction::SW,
272 geohash::Direction::W,
273 geohash::Direction::NW,
274 ];
275
276 for dir in &directions {
277 let neighbor = geohash::neighbor(my, *dir).unwrap();
278 assert!(
279 BeaconObserver::is_nearby_geohash(my, &neighbor),
280 "Neighbor in direction {:?} should be nearby",
281 dir
282 );
283 }
284 }
285
286 #[test]
287 fn test_is_nearby_geohash_same_hash() {
288 assert!(BeaconObserver::is_nearby_geohash("9q8yy9m", "9q8yy9m"));
289 }
290
291 #[test]
292 fn test_is_nearby_geohash_distant() {
293 assert!(!BeaconObserver::is_nearby_geohash("9q8yy9m", "u4pruyd"));
295 assert!(!BeaconObserver::is_nearby_geohash("9q8yy9m", "s00000"));
296 }
297
298 #[tokio::test]
299 async fn test_observer_with_failing_subscribe() {
300 use crate::beacon::storage::StorageError;
301
302 struct FailingStorage;
303
304 #[async_trait]
305 impl BeaconStorage for FailingStorage {
306 async fn save_beacon(
307 &self,
308 _beacon: &crate::beacon::types::GeographicBeacon,
309 ) -> Result<()> {
310 Ok(())
311 }
312
313 async fn query_by_geohash(
314 &self,
315 _geohash_prefix: &str,
316 ) -> Result<Vec<crate::beacon::types::GeographicBeacon>> {
317 Ok(vec![])
318 }
319
320 async fn query_all(&self) -> Result<Vec<crate::beacon::types::GeographicBeacon>> {
321 Ok(vec![])
322 }
323
324 async fn subscribe(&self) -> Result<BeaconChangeStream> {
325 Err(StorageError::SubscribeFailed("test failure".to_string()))
326 }
327 }
328
329 let observer = BeaconObserver::new(Arc::new(FailingStorage), "9q8yy9m".to_string());
330
331 observer.start().await;
333 assert!(!*observer.running.read().await);
335 }
336
337 #[tokio::test]
338 async fn test_observer_processes_events() {
339 use crate::beacon::types::{GeoPosition, HierarchyLevel};
340
341 struct EventStorage {
342 events: Vec<BeaconChangeEvent>,
343 }
344
345 #[async_trait]
346 impl BeaconStorage for EventStorage {
347 async fn save_beacon(
348 &self,
349 _beacon: &crate::beacon::types::GeographicBeacon,
350 ) -> Result<()> {
351 Ok(())
352 }
353
354 async fn query_by_geohash(
355 &self,
356 _geohash_prefix: &str,
357 ) -> Result<Vec<crate::beacon::types::GeographicBeacon>> {
358 Ok(vec![])
359 }
360
361 async fn query_all(&self) -> Result<Vec<crate::beacon::types::GeographicBeacon>> {
362 Ok(vec![])
363 }
364
365 async fn subscribe(&self) -> Result<BeaconChangeStream> {
366 let events = self.events.clone();
367 Ok(Box::new(stream::iter(events)))
368 }
369 }
370
371 let my_geohash = "9q8yy9m";
373 let beacon = crate::beacon::types::GeographicBeacon::new(
374 "nearby-node".to_string(),
375 GeoPosition::new(37.7749, -122.4194),
376 HierarchyLevel::Squad,
377 );
378
379 let mut nearby_beacon = beacon.clone();
382 nearby_beacon.geohash = my_geohash.to_string();
383
384 let storage = EventStorage {
385 events: vec![BeaconChangeEvent::Inserted(nearby_beacon.clone())],
386 };
387
388 let observer = BeaconObserver::new(Arc::new(storage), my_geohash.to_string());
389 observer.start().await;
390
391 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
393
394 let nearby = observer.get_nearby_beacons().await;
395 assert_eq!(nearby.len(), 1);
396 assert_eq!(nearby[0].node_id, "nearby-node");
397
398 observer.stop().await;
399 }
400
401 struct EventStorage {
403 events: Vec<BeaconChangeEvent>,
404 }
405
406 impl EventStorage {
407 fn new(events: Vec<BeaconChangeEvent>) -> Self {
408 Self { events }
409 }
410 }
411
412 #[async_trait]
413 impl BeaconStorage for EventStorage {
414 async fn save_beacon(
415 &self,
416 _beacon: &crate::beacon::types::GeographicBeacon,
417 ) -> Result<()> {
418 Ok(())
419 }
420 async fn query_by_geohash(
421 &self,
422 _geohash_prefix: &str,
423 ) -> Result<Vec<crate::beacon::types::GeographicBeacon>> {
424 Ok(vec![])
425 }
426 async fn query_all(&self) -> Result<Vec<crate::beacon::types::GeographicBeacon>> {
427 Ok(vec![])
428 }
429 async fn subscribe(&self) -> Result<BeaconChangeStream> {
430 let events = self.events.clone();
431 Ok(Box::new(stream::iter(events)))
432 }
433 }
434
435 fn make_nearby_beacon(node_id: &str, geohash: &str) -> crate::beacon::types::GeographicBeacon {
436 use crate::beacon::types::{GeoPosition, HierarchyLevel};
437 let mut beacon = crate::beacon::types::GeographicBeacon::new(
438 node_id.to_string(),
439 GeoPosition::new(37.7749, -122.4194),
440 HierarchyLevel::Squad,
441 );
442 beacon.geohash = geohash.to_string();
443 beacon
444 }
445
446 #[tokio::test]
447 async fn test_observer_processes_removed_event() {
448 let my_geohash = "9q8yy9m";
449 let beacon = make_nearby_beacon("remove-me", my_geohash);
450
451 let storage = EventStorage::new(vec![
452 BeaconChangeEvent::Inserted(beacon),
453 BeaconChangeEvent::Removed {
454 node_id: "remove-me".to_string(),
455 },
456 ]);
457
458 let observer = BeaconObserver::new(Arc::new(storage), my_geohash.to_string());
459 observer.start().await;
460 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
461
462 let nearby = observer.get_nearby_beacons().await;
463 assert_eq!(nearby.len(), 0);
464
465 observer.stop().await;
466 }
467
468 #[tokio::test]
469 async fn test_observer_processes_updated_event() {
470 let my_geohash = "9q8yy9m";
471 let beacon = make_nearby_beacon("update-me", my_geohash);
472 let updated = make_nearby_beacon("update-me", my_geohash);
473
474 let storage = EventStorage::new(vec![
475 BeaconChangeEvent::Inserted(beacon),
476 BeaconChangeEvent::Updated(updated),
477 ]);
478
479 let observer = BeaconObserver::new(Arc::new(storage), my_geohash.to_string());
480 observer.start().await;
481 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
482
483 let nearby = observer.get_nearby_beacons().await;
484 assert_eq!(nearby.len(), 1);
485 assert_eq!(nearby[0].node_id, "update-me");
486
487 observer.stop().await;
488 }
489
490 #[tokio::test]
491 async fn test_observer_ignores_distant_beacons() {
492 let my_geohash = "9q8yy9m";
493 let distant = make_nearby_beacon("far-away", "u4pruyd");
494
495 let storage = EventStorage::new(vec![BeaconChangeEvent::Inserted(distant)]);
496
497 let observer = BeaconObserver::new(Arc::new(storage), my_geohash.to_string());
498 observer.start().await;
499 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
500
501 let nearby = observer.get_nearby_beacons().await;
502 assert_eq!(nearby.len(), 0);
503
504 observer.stop().await;
505 }
506
507 #[tokio::test]
508 async fn test_event_storage_methods() {
509 use crate::beacon::types::{GeoPosition, HierarchyLevel};
511 let storage = EventStorage::new(vec![]);
512 let beacon = crate::beacon::types::GeographicBeacon::new(
513 "test".to_string(),
514 GeoPosition::new(37.7749, -122.4194),
515 HierarchyLevel::Squad,
516 );
517 storage.save_beacon(&beacon).await.unwrap();
518 let results = storage.query_by_geohash("9q8").await.unwrap();
519 assert!(results.is_empty());
520 let all = storage.query_all().await.unwrap();
521 assert!(all.is_empty());
522 }
523
524 #[tokio::test]
525 async fn test_mock_storage_methods() {
526 use crate::beacon::types::{GeoPosition, HierarchyLevel};
527 let storage = MockBeaconStorage::new();
528 let beacon = crate::beacon::types::GeographicBeacon::new(
529 "test".to_string(),
530 GeoPosition::new(37.7749, -122.4194),
531 HierarchyLevel::Squad,
532 );
533 storage.save_beacon(&beacon).await.unwrap();
534 let results = storage.query_by_geohash("9q8").await.unwrap();
535 assert_eq!(results.len(), 1);
536 let all = storage.query_all().await.unwrap();
537 assert_eq!(all.len(), 1);
538 storage.save_beacon(&beacon).await.unwrap();
540 let all = storage.query_all().await.unwrap();
541 assert_eq!(all.len(), 1);
542 }
543}