peat_mesh/beacon/
storage.rs1use super::types::GeographicBeacon;
2use async_trait::async_trait;
3use std::error::Error as StdError;
4use std::fmt;
5
6#[derive(Debug)]
8pub enum StorageError {
9 SaveFailed(String),
11
12 QueryFailed(String),
14
15 SubscribeFailed(String),
17
18 Other(Box<dyn StdError + Send + Sync>),
20}
21
22impl fmt::Display for StorageError {
23 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
24 match self {
25 StorageError::SaveFailed(msg) => write!(f, "Save failed: {}", msg),
26 StorageError::QueryFailed(msg) => write!(f, "Query failed: {}", msg),
27 StorageError::SubscribeFailed(msg) => write!(f, "Subscribe failed: {}", msg),
28 StorageError::Other(err) => write!(f, "Storage error: {}", err),
29 }
30 }
31}
32
33impl StdError for StorageError {
34 fn source(&self) -> Option<&(dyn StdError + 'static)> {
35 match self {
36 StorageError::Other(err) => Some(err.as_ref()),
37 _ => None,
38 }
39 }
40}
41
42pub type Result<T> = std::result::Result<T, StorageError>;
43
44#[async_trait]
57pub trait BeaconStorage: Send + Sync {
58 async fn save_beacon(&self, beacon: &GeographicBeacon) -> Result<()>;
72
73 async fn query_by_geohash(&self, geohash_prefix: &str) -> Result<Vec<GeographicBeacon>>;
87
88 async fn query_all(&self) -> Result<Vec<GeographicBeacon>>;
98
99 async fn subscribe(&self) -> Result<BeaconChangeStream>;
110}
111
112pub type BeaconChangeStream = Box<dyn futures::Stream<Item = BeaconChangeEvent> + Send + Unpin>;
117
118#[derive(Debug, Clone)]
122pub enum BeaconChangeEvent {
123 Inserted(GeographicBeacon),
125
126 Updated(GeographicBeacon),
128
129 Removed { node_id: String },
131}
132
133#[cfg(test)]
134pub use tests::MockBeaconStorage;
135
136#[cfg(test)]
137mod tests {
138 use super::*;
139 use crate::beacon::types::{GeoPosition, HierarchyLevel};
140 use futures::stream;
141 use std::sync::Arc;
142 use tokio::sync::Mutex;
143
144 pub struct MockBeaconStorage {
146 beacons: Arc<Mutex<Vec<GeographicBeacon>>>,
147 }
148
149 impl Default for MockBeaconStorage {
150 fn default() -> Self {
151 Self::new()
152 }
153 }
154
155 impl MockBeaconStorage {
156 pub fn new() -> Self {
157 Self {
158 beacons: Arc::new(Mutex::new(Vec::new())),
159 }
160 }
161 }
162
163 #[async_trait]
164 impl BeaconStorage for MockBeaconStorage {
165 async fn save_beacon(&self, beacon: &GeographicBeacon) -> Result<()> {
166 let mut beacons = self.beacons.lock().await;
167
168 if let Some(existing) = beacons.iter_mut().find(|b| b.node_id == beacon.node_id) {
170 *existing = beacon.clone();
171 } else {
172 beacons.push(beacon.clone());
173 }
174
175 Ok(())
176 }
177
178 async fn query_by_geohash(&self, geohash_prefix: &str) -> Result<Vec<GeographicBeacon>> {
179 let beacons = self.beacons.lock().await;
180 Ok(beacons
181 .iter()
182 .filter(|b| b.geohash.starts_with(geohash_prefix))
183 .cloned()
184 .collect())
185 }
186
187 async fn query_all(&self) -> Result<Vec<GeographicBeacon>> {
188 let beacons = self.beacons.lock().await;
189 Ok(beacons.clone())
190 }
191
192 async fn subscribe(&self) -> Result<BeaconChangeStream> {
193 Ok(Box::new(stream::empty()))
195 }
196 }
197
198 #[tokio::test]
199 async fn test_mock_storage_save_and_query() {
200 let storage = MockBeaconStorage::new();
201
202 let beacon = GeographicBeacon::new(
203 "test-node".to_string(),
204 GeoPosition::new(37.7749, -122.4194),
205 HierarchyLevel::Platform,
206 );
207
208 storage.save_beacon(&beacon).await.unwrap();
210
211 let all_beacons = storage.query_all().await.unwrap();
213 assert_eq!(all_beacons.len(), 1);
214 assert_eq!(all_beacons[0].node_id, "test-node");
215
216 let nearby = storage.query_by_geohash("9q8").await.unwrap();
218 assert_eq!(nearby.len(), 1);
219 }
220
221 #[tokio::test]
222 async fn test_mock_storage_idempotent_save() {
223 let storage = MockBeaconStorage::new();
224
225 let beacon = GeographicBeacon::new(
226 "test-node".to_string(),
227 GeoPosition::new(37.7749, -122.4194),
228 HierarchyLevel::Platform,
229 );
230
231 storage.save_beacon(&beacon).await.unwrap();
233 storage.save_beacon(&beacon).await.unwrap();
234
235 let all_beacons = storage.query_all().await.unwrap();
237 assert_eq!(all_beacons.len(), 1);
238 }
239
240 #[test]
241 fn test_storage_error_display() {
242 let err = StorageError::SaveFailed("disk full".to_string());
243 assert_eq!(err.to_string(), "Save failed: disk full");
244
245 let err = StorageError::QueryFailed("timeout".to_string());
246 assert_eq!(err.to_string(), "Query failed: timeout");
247
248 let err = StorageError::SubscribeFailed("connection lost".to_string());
249 assert_eq!(err.to_string(), "Subscribe failed: connection lost");
250
251 let inner = std::io::Error::new(std::io::ErrorKind::Other, "io error");
252 let err = StorageError::Other(Box::new(inner));
253 assert!(err.to_string().contains("Storage error"));
254 }
255
256 #[test]
257 fn test_storage_error_source() {
258 let err = StorageError::SaveFailed("test".to_string());
260 assert!(err.source().is_none());
261
262 let err = StorageError::QueryFailed("test".to_string());
263 assert!(err.source().is_none());
264
265 let err = StorageError::SubscribeFailed("test".to_string());
266 assert!(err.source().is_none());
267
268 let inner = std::io::Error::new(std::io::ErrorKind::Other, "io error");
270 let err = StorageError::Other(Box::new(inner));
271 assert!(err.source().is_some());
272 }
273
274 #[test]
275 fn test_storage_error_debug() {
276 let err = StorageError::SaveFailed("test".to_string());
277 let debug_str = format!("{:?}", err);
278 assert!(debug_str.contains("SaveFailed"));
279 }
280
281 #[test]
282 fn test_beacon_change_event_variants() {
283 let beacon = GeographicBeacon::new(
284 "node-1".to_string(),
285 GeoPosition::new(37.7749, -122.4194),
286 HierarchyLevel::Squad,
287 );
288
289 let inserted = BeaconChangeEvent::Inserted(beacon.clone());
290 let updated = BeaconChangeEvent::Updated(beacon);
291 let removed = BeaconChangeEvent::Removed {
292 node_id: "node-1".to_string(),
293 };
294
295 let _ = format!("{:?}", inserted);
297 let _ = format!("{:?}", updated);
298 let _ = format!("{:?}", removed);
299 }
300
301 #[tokio::test]
302 async fn test_mock_storage_query_by_geohash_no_match() {
303 let storage = MockBeaconStorage::new();
304
305 let beacon = GeographicBeacon::new(
306 "test-node".to_string(),
307 GeoPosition::new(37.7749, -122.4194),
308 HierarchyLevel::Platform,
309 );
310 storage.save_beacon(&beacon).await.unwrap();
311
312 let result = storage.query_by_geohash("xyz").await.unwrap();
314 assert!(result.is_empty());
315 }
316
317 #[tokio::test]
318 async fn test_mock_storage_multiple_beacons() {
319 let storage = MockBeaconStorage::new();
320
321 let beacon1 = GeographicBeacon::new(
322 "node-1".to_string(),
323 GeoPosition::new(37.7749, -122.4194),
324 HierarchyLevel::Squad,
325 );
326 let beacon2 = GeographicBeacon::new(
327 "node-2".to_string(),
328 GeoPosition::new(37.7750, -122.4195),
329 HierarchyLevel::Squad,
330 );
331 let beacon3 = GeographicBeacon::new(
332 "node-3".to_string(),
333 GeoPosition::new(40.7128, -74.0060), HierarchyLevel::Platoon,
335 );
336
337 storage.save_beacon(&beacon1).await.unwrap();
338 storage.save_beacon(&beacon2).await.unwrap();
339 storage.save_beacon(&beacon3).await.unwrap();
340
341 let all = storage.query_all().await.unwrap();
342 assert_eq!(all.len(), 3);
343
344 let sf_beacons = storage.query_by_geohash("9q8").await.unwrap();
346 assert_eq!(sf_beacons.len(), 2);
347 }
348
349 #[tokio::test]
350 async fn test_mock_storage_subscribe_returns_empty() {
351 let storage = MockBeaconStorage::new();
352 let stream = storage.subscribe().await;
353 assert!(stream.is_ok());
354 }
355
356 #[test]
357 fn test_mock_beacon_storage_default() {
358 let storage = MockBeaconStorage::default();
359 let _ = format!("{:?}", "created storage");
361 assert!(storage.beacons.try_lock().unwrap().is_empty());
362 }
363}