Skip to main content

allsource_core/infrastructure/query/
geospatial.rs

1//! Geospatial query support for AllSource events
2//!
3//! Enables spatial filtering of events based on coordinates stored in event
4//! metadata or payload. Supports:
5//!
6//! - **Bounding box queries**: Find events within a rectangular region
7//! - **Radius queries**: Find events within a distance of a point (haversine)
//! - **Geospatial indexing**: In-memory spatial index using fixed-size grid cells (O(1) lookup per cell)
9//!
10//! ## Coordinate extraction
11//!
//! Coordinates are extracted from an event's `metadata` first and, only if none
//! are found there, from its `payload`. Within each object the following keys
//! are tried in order; the first pair that parses as numbers within the valid
//! latitude/longitude ranges wins:
//! 1. `location.lat` / `location.lng`
//! 2. `location.latitude` / `location.longitude`
//! 3. `lat` / `lng`
//! 4. `latitude` / `longitude`
19//!
20//! ## Limitations
21//!
22//! - Grid-based spatial index (not R-tree) — adequate for moderate datasets
23//! - Haversine assumes spherical Earth (max ~0.3% error vs ellipsoidal)
24//! - Events without coordinates are silently excluded from spatial queries
25
26use dashmap::DashMap;
27use serde::{Deserialize, Serialize};
28use uuid::Uuid;
29
30use crate::domain::entities::Event;
31
/// Mean Earth radius in kilometres; the sphere radius used for haversine distances.
const EARTH_RADIUS_KM: f64 = 6_371.0;

/// Side length of one spatial-index grid cell, in degrees (about 11 km of latitude).
const GRID_CELL_SIZE: f64 = 0.1;
37
38/// A geographic coordinate
39#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
40pub struct Coordinate {
41    pub lat: f64,
42    pub lng: f64,
43}
44
45impl Coordinate {
46    pub fn new(lat: f64, lng: f64) -> Result<Self, String> {
47        if !(-90.0..=90.0).contains(&lat) {
48            return Err(format!("Latitude {lat} out of range [-90, 90]"));
49        }
50        if !(-180.0..=180.0).contains(&lng) {
51            return Err(format!("Longitude {lng} out of range [-180, 180]"));
52        }
53        Ok(Self { lat, lng })
54    }
55}
56
57/// Bounding box for rectangular queries
58#[derive(Debug, Clone, Copy, Deserialize)]
59pub struct BoundingBox {
60    pub min_lat: f64,
61    pub min_lng: f64,
62    pub max_lat: f64,
63    pub max_lng: f64,
64}
65
66impl BoundingBox {
67    pub fn contains(&self, coord: &Coordinate) -> bool {
68        coord.lat >= self.min_lat
69            && coord.lat <= self.max_lat
70            && coord.lng >= self.min_lng
71            && coord.lng <= self.max_lng
72    }
73}
74
75/// Radius query parameters
76#[derive(Debug, Clone, Copy, Deserialize)]
77pub struct RadiusQuery {
78    pub center_lat: f64,
79    pub center_lng: f64,
80    /// Radius in kilometers
81    pub radius_km: f64,
82}
83
84/// Geospatial query request
85#[derive(Debug, Clone, Deserialize)]
86pub struct GeoQueryRequest {
87    /// Bounding box filter (optional)
88    pub bbox: Option<BoundingBox>,
89    /// Radius filter (optional, applied after bbox if both present)
90    pub radius: Option<RadiusQuery>,
91    /// Additional event_type filter
92    pub event_type: Option<String>,
93    /// Additional tenant_id filter
94    pub tenant_id: Option<String>,
95    /// Maximum results
96    pub limit: Option<usize>,
97    /// Sort by distance from a point (only meaningful with radius query)
98    pub sort_by_distance: Option<bool>,
99}
100
101/// A geo-enriched event result
102#[derive(Debug, Clone, Serialize)]
103pub struct GeoEventResult {
104    pub event: serde_json::Value,
105    pub coordinate: Coordinate,
106    /// Distance from query center in km (only set for radius queries)
107    pub distance_km: Option<f64>,
108}
109
110/// Grid cell key for the spatial index
111#[derive(Debug, Clone, Hash, Eq, PartialEq)]
112struct GridCell {
113    lat_cell: i32,
114    lng_cell: i32,
115}
116
117impl GridCell {
118    fn from_coordinate(coord: &Coordinate) -> Self {
119        Self {
120            lat_cell: (coord.lat / GRID_CELL_SIZE).floor() as i32,
121            lng_cell: (coord.lng / GRID_CELL_SIZE).floor() as i32,
122        }
123    }
124}
125
126/// In-memory geospatial index using grid cells
127pub struct GeoIndex {
128    /// Grid cell → list of (event_id, coordinate) entries
129    grid: DashMap<(i32, i32), Vec<(Uuid, Coordinate)>>,
130    /// Event ID → coordinate for quick lookups
131    coordinates: DashMap<Uuid, Coordinate>,
132}
133
134impl Default for GeoIndex {
135    fn default() -> Self {
136        Self::new()
137    }
138}
139
140impl GeoIndex {
141    pub fn new() -> Self {
142        Self {
143            grid: DashMap::new(),
144            coordinates: DashMap::new(),
145        }
146    }
147
148    /// Index an event's coordinates (if extractable)
149    pub fn index_event(&self, event: &Event) {
150        if let Some(coord) = extract_coordinates(event) {
151            let cell = GridCell::from_coordinate(&coord);
152            self.grid
153                .entry((cell.lat_cell, cell.lng_cell))
154                .or_default()
155                .push((event.id, coord));
156            self.coordinates.insert(event.id, coord);
157        }
158    }
159
160    /// Get coordinate for an event
161    pub fn get_coordinate(&self, event_id: &Uuid) -> Option<Coordinate> {
162        self.coordinates.get(event_id).map(|c| *c)
163    }
164
165    /// Find all event IDs within a bounding box
166    pub fn query_bbox(&self, bbox: &BoundingBox) -> Vec<(Uuid, Coordinate)> {
167        let min_cell = GridCell::from_coordinate(&Coordinate {
168            lat: bbox.min_lat,
169            lng: bbox.min_lng,
170        });
171        let max_cell = GridCell::from_coordinate(&Coordinate {
172            lat: bbox.max_lat,
173            lng: bbox.max_lng,
174        });
175
176        let mut results = Vec::new();
177        for lat_cell in min_cell.lat_cell..=max_cell.lat_cell {
178            for lng_cell in min_cell.lng_cell..=max_cell.lng_cell {
179                if let Some(entries) = self.grid.get(&(lat_cell, lng_cell)) {
180                    for (id, coord) in entries.iter() {
181                        if bbox.contains(coord) {
182                            results.push((*id, *coord));
183                        }
184                    }
185                }
186            }
187        }
188        results
189    }
190
191    /// Find all event IDs within a radius
192    pub fn query_radius(&self, query: &RadiusQuery) -> Vec<(Uuid, Coordinate, f64)> {
193        // Compute bounding box for the radius to narrow grid cells
194        let lat_delta = query.radius_km / 111.0; // ~111 km per degree latitude
195        let lng_delta = query.radius_km / (111.0 * query.center_lat.to_radians().cos().max(0.001));
196        let bbox = BoundingBox {
197            min_lat: query.center_lat - lat_delta,
198            max_lat: query.center_lat + lat_delta,
199            min_lng: query.center_lng - lng_delta,
200            max_lng: query.center_lng + lng_delta,
201        };
202
203        let candidates = self.query_bbox(&bbox);
204        let center = Coordinate {
205            lat: query.center_lat,
206            lng: query.center_lng,
207        };
208
209        candidates
210            .into_iter()
211            .filter_map(|(id, coord)| {
212                let dist = haversine_distance(&center, &coord);
213                if dist <= query.radius_km {
214                    Some((id, coord, dist))
215                } else {
216                    None
217                }
218            })
219            .collect()
220    }
221
222    /// Number of indexed events
223    pub fn len(&self) -> usize {
224        self.coordinates.len()
225    }
226
227    pub fn is_empty(&self) -> bool {
228        self.coordinates.is_empty()
229    }
230
231    /// Statistics about the geo index
232    pub fn stats(&self) -> GeoIndexStats {
233        GeoIndexStats {
234            indexed_events: self.coordinates.len(),
235            grid_cells: self.grid.len(),
236        }
237    }
238}
239
240/// Geo index statistics
241#[derive(Debug, Clone, Serialize)]
242pub struct GeoIndexStats {
243    pub indexed_events: usize,
244    pub grid_cells: usize,
245}
246
247/// Haversine distance between two coordinates in kilometers
248pub fn haversine_distance(a: &Coordinate, b: &Coordinate) -> f64 {
249    let d_lat = (b.lat - a.lat).to_radians();
250    let d_lng = (b.lng - a.lng).to_radians();
251    let lat1 = a.lat.to_radians();
252    let lat2 = b.lat.to_radians();
253
254    let h = (d_lat / 2.0).sin().powi(2) + lat1.cos() * lat2.cos() * (d_lng / 2.0).sin().powi(2);
255    let c = 2.0 * h.sqrt().asin();
256
257    EARTH_RADIUS_KM * c
258}
259
260/// Extract coordinates from an event's metadata or payload
261pub fn extract_coordinates(event: &Event) -> Option<Coordinate> {
262    // Try metadata first
263    if let Some(ref meta) = event.metadata
264        && let Some(coord) = try_extract_coord(meta)
265    {
266        return Some(coord);
267    }
268    // Fall back to payload
269    try_extract_coord(&event.payload)
270}
271
272/// Try to extract coordinates from a JSON value
273fn try_extract_coord(value: &serde_json::Value) -> Option<Coordinate> {
274    // Try nested location object
275    if let Some(loc) = value.get("location") {
276        if let (Some(lat), Some(lng)) = (
277            loc.get("lat").and_then(serde_json::Value::as_f64),
278            loc.get("lng").and_then(serde_json::Value::as_f64),
279        ) && let Ok(c) = Coordinate::new(lat, lng)
280        {
281            return Some(c);
282        }
283        if let (Some(lat), Some(lng)) = (
284            loc.get("latitude").and_then(serde_json::Value::as_f64),
285            loc.get("longitude").and_then(serde_json::Value::as_f64),
286        ) && let Ok(c) = Coordinate::new(lat, lng)
287        {
288            return Some(c);
289        }
290    }
291
292    // Try flat lat/lng
293    if let (Some(lat), Some(lng)) = (
294        value.get("lat").and_then(serde_json::Value::as_f64),
295        value.get("lng").and_then(serde_json::Value::as_f64),
296    ) && let Ok(c) = Coordinate::new(lat, lng)
297    {
298        return Some(c);
299    }
300
301    // Try flat latitude/longitude
302    if let (Some(lat), Some(lng)) = (
303        value.get("latitude").and_then(serde_json::Value::as_f64),
304        value.get("longitude").and_then(serde_json::Value::as_f64),
305    ) && let Ok(c) = Coordinate::new(lat, lng)
306    {
307        return Some(c);
308    }
309
310    None
311}
312
313/// Execute a geospatial query against events using the geo index
314pub fn execute_geo_query(
315    events: &[Event],
316    geo_index: &GeoIndex,
317    request: &GeoQueryRequest,
318) -> Vec<GeoEventResult> {
319    // Determine candidate event IDs from spatial filters
320    let candidates: Vec<(Uuid, Coordinate, Option<f64>)> = if let Some(ref radius) = request.radius
321    {
322        geo_index
323            .query_radius(radius)
324            .into_iter()
325            .map(|(id, coord, dist)| (id, coord, Some(dist)))
326            .collect()
327    } else if let Some(ref bbox) = request.bbox {
328        geo_index
329            .query_bbox(bbox)
330            .into_iter()
331            .map(|(id, coord)| (id, coord, None))
332            .collect()
333    } else {
334        // No spatial filter — return all geo-indexed events
335        geo_index
336            .coordinates
337            .iter()
338            .map(|entry| (*entry.key(), *entry.value(), None))
339            .collect()
340    };
341
342    // Build a set of candidate IDs for fast lookup
343    let candidate_map: std::collections::HashMap<Uuid, (Coordinate, Option<f64>)> = candidates
344        .into_iter()
345        .map(|(id, coord, dist)| (id, (coord, dist)))
346        .collect();
347
348    // Filter events by candidates + additional filters
349    let mut results: Vec<GeoEventResult> = events
350        .iter()
351        .filter_map(|event| {
352            let (coord, dist) = candidate_map.get(&event.id)?;
353
354            // Apply event_type filter
355            if let Some(ref et) = request.event_type
356                && event.event_type_str() != et
357            {
358                return None;
359            }
360
361            // Apply tenant_id filter
362            if let Some(ref tid) = request.tenant_id
363                && event.tenant_id_str() != tid
364            {
365                return None;
366            }
367
368            Some(GeoEventResult {
369                event: serde_json::json!({
370                    "id": event.id.to_string(),
371                    "event_type": event.event_type_str(),
372                    "entity_id": event.entity_id_str(),
373                    "tenant_id": event.tenant_id_str(),
374                    "payload": event.payload,
375                    "metadata": event.metadata,
376                    "timestamp": event.timestamp.to_rfc3339(),
377                    "version": event.version,
378                }),
379                coordinate: *coord,
380                distance_km: *dist,
381            })
382        })
383        .collect();
384
385    // Sort by distance if requested
386    if request.sort_by_distance.unwrap_or(false) {
387        results.sort_by(|a, b| {
388            a.distance_km
389                .unwrap_or(f64::MAX)
390                .partial_cmp(&b.distance_km.unwrap_or(f64::MAX))
391                .unwrap_or(std::cmp::Ordering::Equal)
392        });
393    }
394
395    // Apply limit
396    if let Some(limit) = request.limit {
397        results.truncate(limit);
398    }
399
400    results
401}
402
#[cfg(test)]
mod tests {
    use super::*;

    /// A `location.update` event whose metadata holds `{"location": {"lat", "lng"}}`.
    fn make_geo_event(entity: &str, lat: f64, lng: f64) -> Event {
        let metadata = serde_json::json!({ "location": { "lat": lat, "lng": lng } });
        Event::from_strings(
            "location.update".to_string(),
            entity.to_string(),
            "default".to_string(),
            serde_json::json!({ "action": "checkin" }),
            Some(metadata),
        )
        .unwrap()
    }

    /// An event with the given type, entity and payload, and no metadata.
    fn make_payload_event(event_type: &str, entity: &str, payload: serde_json::Value) -> Event {
        Event::from_strings(
            event_type.to_string(),
            entity.to_string(),
            "default".to_string(),
            payload,
            None,
        )
        .unwrap()
    }

    /// Builds a fresh index containing every event in `events`.
    fn index_all(events: &[Event]) -> GeoIndex {
        let index = GeoIndex::new();
        events.iter().for_each(|event| index.index_event(event));
        index
    }

    fn point(lat: f64, lng: f64) -> Coordinate {
        Coordinate { lat, lng }
    }

    /// Rectangle around the New York City area shared by several tests.
    fn nyc_box() -> BoundingBox {
        BoundingBox {
            min_lat: 40.0,
            max_lat: 42.0,
            min_lng: -75.0,
            max_lng: -73.0,
        }
    }

    #[test]
    fn test_haversine_same_point() {
        let origin = point(0.0, 0.0);
        assert!(haversine_distance(&origin, &origin).abs() < 0.001);
    }

    #[test]
    fn test_haversine_known_distance() {
        // NYC to London is roughly 5570 km.
        let dist = haversine_distance(&point(40.7128, -74.0060), &point(51.5074, -0.1278));
        assert!((dist - 5570.0).abs() < 50.0, "NYC-London: {dist} km");
    }

    #[test]
    fn test_coordinate_validation() {
        assert!(Coordinate::new(45.0, 90.0).is_ok());
        assert!(Coordinate::new(91.0, 0.0).is_err());
        assert!(Coordinate::new(0.0, 181.0).is_err());
    }

    #[test]
    fn test_bounding_box_contains() {
        let bbox = nyc_box();
        assert!(bbox.contains(&point(41.0, -74.0)));
        assert!(!bbox.contains(&point(43.0, -74.0)));
    }

    #[test]
    fn test_extract_coordinates_from_metadata_location() {
        let coord = extract_coordinates(&make_geo_event("e1", 40.7128, -74.0060)).unwrap();
        assert!((coord.lat - 40.7128).abs() < 0.001);
        assert!((coord.lng - (-74.0060)).abs() < 0.001);
    }

    #[test]
    fn test_extract_coordinates_from_payload_flat() {
        let event = make_payload_event(
            "location.update",
            "e1",
            serde_json::json!({ "lat": 51.5074, "lng": -0.1278 }),
        );
        let coord = extract_coordinates(&event).unwrap();
        assert!((coord.lat - 51.5074).abs() < 0.001);
    }

    #[test]
    fn test_no_coordinates() {
        let event = make_payload_event("user.created", "u1", serde_json::json!({ "name": "Alice" }));
        assert!(extract_coordinates(&event).is_none());
    }

    #[test]
    fn test_geo_index_and_bbox_query() {
        let index = index_all(&[
            make_geo_event("e1", 40.7128, -74.0060), // NYC
            make_geo_event("e2", 51.5074, -0.1278),  // London
            make_geo_event("e3", 41.0, -73.5),       // Near NYC
        ]);
        assert_eq!(index.len(), 3);

        // NYC and "near NYC" fall inside the box; London does not.
        assert_eq!(index.query_bbox(&nyc_box()).len(), 2);
    }

    #[test]
    fn test_geo_index_radius_query() {
        let index = index_all(&[
            make_geo_event("e1", 40.7128, -74.0060), // NYC
            make_geo_event("e2", 51.5074, -0.1278),  // London
            make_geo_event("e3", 40.75, -73.99),     // Near NYC (~5km)
        ]);

        let query = RadiusQuery {
            center_lat: 40.7128,
            center_lng: -74.0060,
            radius_km: 10.0,
        };
        let results = index.query_radius(&query);
        assert_eq!(results.len(), 2); // NYC and Near NYC
        assert!(results.iter().all(|&(_, _, dist)| dist <= 10.0));
    }

    #[test]
    fn test_execute_geo_query_with_type_filter() {
        let events = [
            make_geo_event("e1", 40.7128, -74.0060),
            make_payload_event(
                "user.created",
                "e2",
                serde_json::json!({ "lat": 40.75, "lng": -73.99 }),
            ),
        ];
        let index = index_all(&events);

        let req = GeoQueryRequest {
            bbox: Some(nyc_box()),
            radius: None,
            event_type: Some("location.update".to_string()),
            tenant_id: None,
            limit: None,
            sort_by_distance: None,
        };

        // Both events are inside the box, but only one has the requested type.
        assert_eq!(execute_geo_query(&events, &index, &req).len(), 1);
    }

    #[test]
    fn test_geo_index_stats() {
        let index = index_all(&[
            make_geo_event("e1", 40.7128, -74.0060),
            make_geo_event("e2", 51.5074, -0.1278),
        ]);
        let stats = index.stats();
        assert_eq!(stats.indexed_events, 2);
        assert!(stats.grid_cells >= 1);
    }
}