Skip to main content

peat_mesh/beacon/
storage.rs

1use super::types::GeographicBeacon;
2use async_trait::async_trait;
3use std::error::Error as StdError;
4use std::fmt;
5
6/// Error type for beacon storage operations
7#[derive(Debug)]
8pub enum StorageError {
9    /// Failed to save beacon
10    SaveFailed(String),
11
12    /// Failed to query beacons
13    QueryFailed(String),
14
15    /// Failed to subscribe to beacon updates
16    SubscribeFailed(String),
17
18    /// Generic storage error
19    Other(Box<dyn StdError + Send + Sync>),
20}
21
22impl fmt::Display for StorageError {
23    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
24        match self {
25            StorageError::SaveFailed(msg) => write!(f, "Save failed: {}", msg),
26            StorageError::QueryFailed(msg) => write!(f, "Query failed: {}", msg),
27            StorageError::SubscribeFailed(msg) => write!(f, "Subscribe failed: {}", msg),
28            StorageError::Other(err) => write!(f, "Storage error: {}", err),
29        }
30    }
31}
32
33impl StdError for StorageError {
34    fn source(&self) -> Option<&(dyn StdError + 'static)> {
35        match self {
36            StorageError::Other(err) => Some(err.as_ref()),
37            _ => None,
38        }
39    }
40}
41
42pub type Result<T> = std::result::Result<T, StorageError>;
43
44/// Storage abstraction for geographic beacons
45///
46/// This trait defines the storage operations needed by the beacon system
47/// without coupling to any specific backend implementation. Implementations
48/// can use Ditto, in-memory storage, or any other CRDT-based system.
49///
50/// # Design Principles
51///
52/// - **Backend Agnostic**: No direct dependency on storage backends
53/// - **Testable**: Easy to mock for unit tests
54/// - **Async**: All operations are async for non-blocking I/O
55/// - **CRDT-Friendly**: Designed for eventual consistency
56#[async_trait]
57pub trait BeaconStorage: Send + Sync {
58    /// Save a beacon to storage
59    ///
60    /// This operation should be idempotent - saving the same beacon
61    /// multiple times should result in the same state.
62    ///
63    /// # Arguments
64    ///
65    /// * `beacon` - The geographic beacon to save
66    ///
67    /// # Returns
68    ///
69    /// * `Ok(())` - Beacon saved successfully
70    /// * `Err(StorageError)` - Save operation failed
71    async fn save_beacon(&self, beacon: &GeographicBeacon) -> Result<()>;
72
73    /// Query beacons by geohash prefix
74    ///
75    /// Retrieves all beacons matching the given geohash prefix.
76    /// This allows for proximity-based queries.
77    ///
78    /// # Arguments
79    ///
80    /// * `geohash_prefix` - Geohash prefix to match (e.g., "9q8" for San Francisco area)
81    ///
82    /// # Returns
83    ///
84    /// * `Ok(Vec<GeographicBeacon>)` - List of matching beacons
85    /// * `Err(StorageError)` - Query operation failed
86    async fn query_by_geohash(&self, geohash_prefix: &str) -> Result<Vec<GeographicBeacon>>;
87
88    /// Query all beacons
89    ///
90    /// Retrieves all beacons in the system. Use with caution in
91    /// large deployments.
92    ///
93    /// # Returns
94    ///
95    /// * `Ok(Vec<GeographicBeacon>)` - List of all beacons
96    /// * `Err(StorageError)` - Query operation failed
97    async fn query_all(&self) -> Result<Vec<GeographicBeacon>>;
98
99    /// Subscribe to beacon changes
100    ///
101    /// Returns a stream of beacon updates (inserts, updates, deletes).
102    /// Implementations should use CRDT change notifications to provide
103    /// real-time updates.
104    ///
105    /// # Returns
106    ///
107    /// * `Ok(BeaconChangeStream)` - Stream of beacon changes
108    /// * `Err(StorageError)` - Subscribe operation failed
109    async fn subscribe(&self) -> Result<BeaconChangeStream>;
110}
111
112/// Type alias for beacon change stream
113///
114/// This represents a stream of beacon change events that observers
115/// can consume to track beacon updates in real-time.
116pub type BeaconChangeStream = Box<dyn futures::Stream<Item = BeaconChangeEvent> + Send + Unpin>;
117
118/// Beacon change event
119///
120/// Represents a change to a beacon in the storage system.
121#[derive(Debug, Clone)]
122pub enum BeaconChangeEvent {
123    /// A new beacon was inserted
124    Inserted(GeographicBeacon),
125
126    /// An existing beacon was updated
127    Updated(GeographicBeacon),
128
129    /// A beacon was removed (expired or explicitly deleted)
130    Removed { node_id: String },
131}
132
133#[cfg(test)]
134pub use tests::MockBeaconStorage;
135
136#[cfg(test)]
137mod tests {
138    use super::*;
139    use crate::beacon::types::{GeoPosition, HierarchyLevel};
140    use futures::stream;
141    use std::sync::Arc;
142    use tokio::sync::Mutex;
143
144    /// Mock in-memory beacon storage for testing
145    pub struct MockBeaconStorage {
146        beacons: Arc<Mutex<Vec<GeographicBeacon>>>,
147    }
148
149    impl Default for MockBeaconStorage {
150        fn default() -> Self {
151            Self::new()
152        }
153    }
154
155    impl MockBeaconStorage {
156        pub fn new() -> Self {
157            Self {
158                beacons: Arc::new(Mutex::new(Vec::new())),
159            }
160        }
161    }
162
163    #[async_trait]
164    impl BeaconStorage for MockBeaconStorage {
165        async fn save_beacon(&self, beacon: &GeographicBeacon) -> Result<()> {
166            let mut beacons = self.beacons.lock().await;
167
168            // Update existing or insert new
169            if let Some(existing) = beacons.iter_mut().find(|b| b.node_id == beacon.node_id) {
170                *existing = beacon.clone();
171            } else {
172                beacons.push(beacon.clone());
173            }
174
175            Ok(())
176        }
177
178        async fn query_by_geohash(&self, geohash_prefix: &str) -> Result<Vec<GeographicBeacon>> {
179            let beacons = self.beacons.lock().await;
180            Ok(beacons
181                .iter()
182                .filter(|b| b.geohash.starts_with(geohash_prefix))
183                .cloned()
184                .collect())
185        }
186
187        async fn query_all(&self) -> Result<Vec<GeographicBeacon>> {
188            let beacons = self.beacons.lock().await;
189            Ok(beacons.clone())
190        }
191
192        async fn subscribe(&self) -> Result<BeaconChangeStream> {
193            // Return empty stream for mock
194            Ok(Box::new(stream::empty()))
195        }
196    }
197
198    #[tokio::test]
199    async fn test_mock_storage_save_and_query() {
200        let storage = MockBeaconStorage::new();
201
202        let beacon = GeographicBeacon::new(
203            "test-node".to_string(),
204            GeoPosition::new(37.7749, -122.4194),
205            HierarchyLevel::Platform,
206        );
207
208        // Save beacon
209        storage.save_beacon(&beacon).await.unwrap();
210
211        // Query all
212        let all_beacons = storage.query_all().await.unwrap();
213        assert_eq!(all_beacons.len(), 1);
214        assert_eq!(all_beacons[0].node_id, "test-node");
215
216        // Query by geohash prefix
217        let nearby = storage.query_by_geohash("9q8").await.unwrap();
218        assert_eq!(nearby.len(), 1);
219    }
220
221    #[tokio::test]
222    async fn test_mock_storage_idempotent_save() {
223        let storage = MockBeaconStorage::new();
224
225        let beacon = GeographicBeacon::new(
226            "test-node".to_string(),
227            GeoPosition::new(37.7749, -122.4194),
228            HierarchyLevel::Platform,
229        );
230
231        // Save same beacon twice
232        storage.save_beacon(&beacon).await.unwrap();
233        storage.save_beacon(&beacon).await.unwrap();
234
235        // Should only have one beacon
236        let all_beacons = storage.query_all().await.unwrap();
237        assert_eq!(all_beacons.len(), 1);
238    }
239
240    #[test]
241    fn test_storage_error_display() {
242        let err = StorageError::SaveFailed("disk full".to_string());
243        assert_eq!(err.to_string(), "Save failed: disk full");
244
245        let err = StorageError::QueryFailed("timeout".to_string());
246        assert_eq!(err.to_string(), "Query failed: timeout");
247
248        let err = StorageError::SubscribeFailed("connection lost".to_string());
249        assert_eq!(err.to_string(), "Subscribe failed: connection lost");
250
251        let inner = std::io::Error::new(std::io::ErrorKind::Other, "io error");
252        let err = StorageError::Other(Box::new(inner));
253        assert!(err.to_string().contains("Storage error"));
254    }
255
256    #[test]
257    fn test_storage_error_source() {
258        // SaveFailed, QueryFailed, SubscribeFailed have no source
259        let err = StorageError::SaveFailed("test".to_string());
260        assert!(err.source().is_none());
261
262        let err = StorageError::QueryFailed("test".to_string());
263        assert!(err.source().is_none());
264
265        let err = StorageError::SubscribeFailed("test".to_string());
266        assert!(err.source().is_none());
267
268        // Other has a source
269        let inner = std::io::Error::new(std::io::ErrorKind::Other, "io error");
270        let err = StorageError::Other(Box::new(inner));
271        assert!(err.source().is_some());
272    }
273
274    #[test]
275    fn test_storage_error_debug() {
276        let err = StorageError::SaveFailed("test".to_string());
277        let debug_str = format!("{:?}", err);
278        assert!(debug_str.contains("SaveFailed"));
279    }
280
281    #[test]
282    fn test_beacon_change_event_variants() {
283        let beacon = GeographicBeacon::new(
284            "node-1".to_string(),
285            GeoPosition::new(37.7749, -122.4194),
286            HierarchyLevel::Squad,
287        );
288
289        let inserted = BeaconChangeEvent::Inserted(beacon.clone());
290        let updated = BeaconChangeEvent::Updated(beacon);
291        let removed = BeaconChangeEvent::Removed {
292            node_id: "node-1".to_string(),
293        };
294
295        // Verify Debug works
296        let _ = format!("{:?}", inserted);
297        let _ = format!("{:?}", updated);
298        let _ = format!("{:?}", removed);
299    }
300
301    #[tokio::test]
302    async fn test_mock_storage_query_by_geohash_no_match() {
303        let storage = MockBeaconStorage::new();
304
305        let beacon = GeographicBeacon::new(
306            "test-node".to_string(),
307            GeoPosition::new(37.7749, -122.4194),
308            HierarchyLevel::Platform,
309        );
310        storage.save_beacon(&beacon).await.unwrap();
311
312        // Query with a non-matching prefix
313        let result = storage.query_by_geohash("xyz").await.unwrap();
314        assert!(result.is_empty());
315    }
316
317    #[tokio::test]
318    async fn test_mock_storage_multiple_beacons() {
319        let storage = MockBeaconStorage::new();
320
321        let beacon1 = GeographicBeacon::new(
322            "node-1".to_string(),
323            GeoPosition::new(37.7749, -122.4194),
324            HierarchyLevel::Squad,
325        );
326        let beacon2 = GeographicBeacon::new(
327            "node-2".to_string(),
328            GeoPosition::new(37.7750, -122.4195),
329            HierarchyLevel::Squad,
330        );
331        let beacon3 = GeographicBeacon::new(
332            "node-3".to_string(),
333            GeoPosition::new(40.7128, -74.0060), // New York
334            HierarchyLevel::Platoon,
335        );
336
337        storage.save_beacon(&beacon1).await.unwrap();
338        storage.save_beacon(&beacon2).await.unwrap();
339        storage.save_beacon(&beacon3).await.unwrap();
340
341        let all = storage.query_all().await.unwrap();
342        assert_eq!(all.len(), 3);
343
344        // Both SF beacons start with "9q8"
345        let sf_beacons = storage.query_by_geohash("9q8").await.unwrap();
346        assert_eq!(sf_beacons.len(), 2);
347    }
348
349    #[tokio::test]
350    async fn test_mock_storage_subscribe_returns_empty() {
351        let storage = MockBeaconStorage::new();
352        let stream = storage.subscribe().await;
353        assert!(stream.is_ok());
354    }
355
356    #[test]
357    fn test_mock_beacon_storage_default() {
358        let storage = MockBeaconStorage::default();
359        // Should work the same as new()
360        let _ = format!("{:?}", "created storage");
361        assert!(storage.beacons.try_lock().unwrap().is_empty());
362    }
363}