Skip to main content

aegis_geo/
engine.rs

1//! The geospatial engine: named collections of geo points, each backed by a
2//! grid index, with radius / bounding-box / nearest-k queries, metadata
3//! filtering, and snapshot persistence.
4
5use crate::grid::GridIndex;
6use crate::types::{valid_coord, GeoError, GeoFeature, GeoHit};
7use parking_lot::RwLock;
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10
/// Default grid cell size, in degrees, handed to every collection's grid by
/// [`GeoEngine::new`]. Half a degree of latitude is roughly 55 km.
const DEFAULT_CELL_DEG: f64 = 0.5;
13
/// Summary statistics for one collection, as returned by
/// `GeoEngine::collection_stats`.
#[derive(Debug, Clone, Serialize)]
pub struct CollectionStats {
    /// Name of the collection the statistics describe.
    pub name: String,
    /// Number of points reported by the collection's grid index (`grid.len()`).
    pub count: usize,
}
19
/// One named set of geo points: a grid index plus side tables that attach a
/// caller-supplied id and JSON metadata to each grid node.
struct Collection {
    /// Spatial index; hands out the `u32` node ids used to index the side tables.
    grid: GridIndex,
    /// Feature id -> grid node currently holding that feature.
    id_to_node: HashMap<String, u32>,
    /// Metadata per grid node, indexed by node id; reset to `Null` once the
    /// node is removed or replaced.
    node_meta: Vec<serde_json::Value>,
    /// Feature id per grid node, indexed by node id; `None` once the node is
    /// removed or replaced.
    node_id: Vec<Option<String>>,
}
26
27impl Collection {
28    fn new(cell_deg: f64) -> Self {
29        Self {
30            grid: GridIndex::new(cell_deg),
31            id_to_node: HashMap::new(),
32            node_meta: Vec::new(),
33            node_id: Vec::new(),
34        }
35    }
36
37    fn upsert(
38        &mut self,
39        id: String,
40        lat: f64,
41        lon: f64,
42        metadata: serde_json::Value,
43    ) -> Result<(), GeoError> {
44        if !valid_coord(lat, lon) {
45            return Err(GeoError::InvalidCoordinate);
46        }
47        if let Some(&old) = self.id_to_node.get(&id) {
48            self.grid.remove(old);
49            self.node_id[old as usize] = None;
50            self.node_meta[old as usize] = serde_json::Value::Null;
51        }
52        let node = self.grid.insert(lat, lon);
53        debug_assert_eq!(node as usize, self.node_meta.len());
54        self.node_meta.push(metadata);
55        self.node_id.push(Some(id.clone()));
56        self.id_to_node.insert(id, node);
57        Ok(())
58    }
59
60    fn feature(&self, node: u32) -> Option<GeoFeature> {
61        let (lat, lon) = self.grid.coord(node)?;
62        let id = self.node_id[node as usize].clone()?;
63        Some(GeoFeature {
64            id,
65            lat,
66            lon,
67            metadata: self.node_meta[node as usize].clone(),
68        })
69    }
70
71    fn get(&self, id: &str) -> Option<GeoFeature> {
72        let &node = self.id_to_node.get(id)?;
73        self.feature(node)
74    }
75
76    fn delete(&mut self, id: &str) -> bool {
77        match self.id_to_node.remove(id) {
78            Some(node) => {
79                self.grid.remove(node);
80                self.node_id[node as usize] = None;
81                self.node_meta[node as usize] = serde_json::Value::Null;
82                true
83            }
84            None => false,
85        }
86    }
87
88    fn hit(&self, node: u32, distance_m: f64) -> Option<GeoHit> {
89        let (lat, lon) = self.grid.coord(node)?;
90        let id = self.node_id[node as usize].clone()?;
91        Some(GeoHit {
92            id,
93            lat,
94            lon,
95            distance_m,
96            metadata: self.node_meta[node as usize].clone(),
97        })
98    }
99
100    fn passes(&self, node: u32, has_filter: bool, filter: &serde_json::Value) -> bool {
101        !has_filter || matches_filter(&self.node_meta[node as usize], filter)
102    }
103}
104
105fn matches_filter(meta: &serde_json::Value, filter: &serde_json::Value) -> bool {
106    match (meta.as_object(), filter.as_object()) {
107        (Some(m), Some(f)) => f.iter().all(|(k, v)| m.get(k) == Some(v)),
108        (_, Some(f)) => f.is_empty(),
109        _ => true,
110    }
111}
112
113fn has_filter(filter: &serde_json::Value) -> bool {
114    filter.as_object().map(|m| !m.is_empty()).unwrap_or(false)
115}
116
117// ============================================================================
118// Engine
119// ============================================================================
120
/// Multi-collection geospatial engine.
///
/// All methods take `&self`; the collection map sits behind a read/write lock.
pub struct GeoEngine {
    /// Collections keyed by name.
    collections: RwLock<HashMap<String, Collection>>,
    /// Grid cell size, in degrees, passed to every collection this engine creates.
    cell_deg: f64,
}
126
127impl Default for GeoEngine {
128    fn default() -> Self {
129        Self::new()
130    }
131}
132
133impl GeoEngine {
134    pub fn new() -> Self {
135        Self {
136            collections: RwLock::new(HashMap::new()),
137            cell_deg: DEFAULT_CELL_DEG,
138        }
139    }
140
141    pub fn create_collection(&self, name: impl Into<String>) -> Result<(), GeoError> {
142        let name = name.into();
143        let mut cols = self.collections.write();
144        if cols.contains_key(&name) {
145            return Err(GeoError::CollectionExists(name));
146        }
147        cols.insert(name, Collection::new(self.cell_deg));
148        Ok(())
149    }
150
151    pub fn drop_collection(&self, name: &str) -> Result<(), GeoError> {
152        self.collections
153            .write()
154            .remove(name)
155            .map(|_| ())
156            .ok_or_else(|| GeoError::CollectionNotFound(name.to_string()))
157    }
158
159    pub fn list_collections(&self) -> Vec<String> {
160        let mut v: Vec<String> = self.collections.read().keys().cloned().collect();
161        v.sort();
162        v
163    }
164
165    pub fn collection_exists(&self, name: &str) -> bool {
166        self.collections.read().contains_key(name)
167    }
168
169    pub fn collection_stats(&self, name: &str) -> Option<CollectionStats> {
170        let cols = self.collections.read();
171        let c = cols.get(name)?;
172        Some(CollectionStats {
173            name: name.to_string(),
174            count: c.grid.len(),
175        })
176    }
177
178    /// Upsert a feature. The collection is created on demand if it does not yet
179    /// exist (the coordinate is validated first, so a bad write never creates an
180    /// empty collection).
181    pub fn upsert(
182        &self,
183        collection: &str,
184        id: impl Into<String>,
185        lat: f64,
186        lon: f64,
187        metadata: serde_json::Value,
188    ) -> Result<(), GeoError> {
189        if !valid_coord(lat, lon) {
190            return Err(GeoError::InvalidCoordinate);
191        }
192        let mut cols = self.collections.write();
193        let c = cols
194            .entry(collection.to_string())
195            .or_insert_with(|| Collection::new(self.cell_deg));
196        c.upsert(id.into(), lat, lon, metadata)
197    }
198
199    pub fn get(&self, collection: &str, id: &str) -> Result<Option<GeoFeature>, GeoError> {
200        let cols = self.collections.read();
201        let c = cols
202            .get(collection)
203            .ok_or_else(|| GeoError::CollectionNotFound(collection.to_string()))?;
204        Ok(c.get(id))
205    }
206
207    pub fn delete(&self, collection: &str, id: &str) -> Result<bool, GeoError> {
208        let mut cols = self.collections.write();
209        let c = cols
210            .get_mut(collection)
211            .ok_or_else(|| GeoError::CollectionNotFound(collection.to_string()))?;
212        Ok(c.delete(id))
213    }
214
215    /// Features within `radius_m` of `(lat, lon)`, nearest first.
216    pub fn within_radius(
217        &self,
218        collection: &str,
219        lat: f64,
220        lon: f64,
221        radius_m: f64,
222        filter: &serde_json::Value,
223    ) -> Result<Vec<GeoHit>, GeoError> {
224        let cols = self.collections.read();
225        let c = cols
226            .get(collection)
227            .ok_or_else(|| GeoError::CollectionNotFound(collection.to_string()))?;
228        let hf = has_filter(filter);
229        let mut hits: Vec<GeoHit> = c
230            .grid
231            .within_radius(lat, lon, radius_m)
232            .into_iter()
233            .filter(|(node, _)| c.passes(*node, hf, filter))
234            .filter_map(|(node, d)| c.hit(node, d))
235            .collect();
236        hits.sort_by(|a, b| {
237            a.distance_m
238                .total_cmp(&b.distance_m)
239                .then_with(|| a.id.cmp(&b.id))
240        });
241        Ok(hits)
242    }
243
244    /// Features inside a bounding box.
245    pub fn within_bbox(
246        &self,
247        collection: &str,
248        min_lat: f64,
249        min_lon: f64,
250        max_lat: f64,
251        max_lon: f64,
252        filter: &serde_json::Value,
253    ) -> Result<Vec<GeoHit>, GeoError> {
254        let cols = self.collections.read();
255        let c = cols
256            .get(collection)
257            .ok_or_else(|| GeoError::CollectionNotFound(collection.to_string()))?;
258        let hf = has_filter(filter);
259        let hits = c
260            .grid
261            .within_bbox(min_lat, min_lon, max_lat, max_lon)
262            .into_iter()
263            .filter(|node| c.passes(*node, hf, filter))
264            .filter_map(|node| c.hit(node, 0.0))
265            .collect();
266        Ok(hits)
267    }
268
269    /// The `k` nearest features to `(lat, lon)`.
270    pub fn nearest(
271        &self,
272        collection: &str,
273        lat: f64,
274        lon: f64,
275        k: usize,
276        filter: &serde_json::Value,
277    ) -> Result<Vec<GeoHit>, GeoError> {
278        let cols = self.collections.read();
279        let c = cols
280            .get(collection)
281            .ok_or_else(|| GeoError::CollectionNotFound(collection.to_string()))?;
282        let hf = has_filter(filter);
283        // Without a filter, k nearest is exactly k. With a filter, matches may be
284        // sparse and far away, so expand the candidate set geometrically until we
285        // have k matches or the whole collection has been scanned — otherwise a
286        // fixed over-fetch can silently return fewer than k.
287        let live = c.grid.len();
288        let mut fetch = if hf { (k * 4).max(k) } else { k };
289        loop {
290            let hits: Vec<GeoHit> = c
291                .grid
292                .nearest(lat, lon, fetch)
293                .into_iter()
294                .filter(|(node, _)| c.passes(*node, hf, filter))
295                .filter_map(|(node, d)| c.hit(node, d))
296                .take(k)
297                .collect();
298            if !hf || hits.len() >= k || fetch >= live {
299                return Ok(hits);
300            }
301            fetch = (fetch * 4).min(live);
302        }
303    }
304
305    // ---- Persistence ----------------------------------------------------
306
307    pub fn snapshot(&self) -> EngineSnapshot {
308        let cols = self.collections.read();
309        EngineSnapshot {
310            collections: cols
311                .iter()
312                .map(|(name, c)| CollectionSnapshot {
313                    name: name.clone(),
314                    features: c
315                        .id_to_node
316                        .values()
317                        .filter_map(|&node| c.feature(node))
318                        .collect(),
319                })
320                .collect(),
321        }
322    }
323
324    pub fn load_snapshot(&self, snap: EngineSnapshot) {
325        let mut cols = self.collections.write();
326        cols.clear();
327        for cs in snap.collections {
328            let mut c = Collection::new(self.cell_deg);
329            for f in cs.features {
330                let _ = c.upsert(f.id, f.lat, f.lon, f.metadata);
331            }
332            cols.insert(cs.name, c);
333        }
334    }
335}
336
/// Serializable image of a whole engine, produced by `GeoEngine::snapshot` and
/// consumed by `GeoEngine::load_snapshot`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EngineSnapshot {
    /// One entry per collection.
    pub collections: Vec<CollectionSnapshot>,
}
341
/// Serializable image of a single collection inside an [`EngineSnapshot`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CollectionSnapshot {
    /// Collection name.
    pub name: String,
    /// The features captured from the collection.
    pub features: Vec<GeoFeature>,
}
346}