1use dashmap::DashMap;
27use serde::{Deserialize, Serialize};
28use uuid::Uuid;
29
30use crate::domain::entities::Event;
31
32const EARTH_RADIUS_KM: f64 = 6371.0;
34
35const GRID_CELL_SIZE: f64 = 0.1;
37
38#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
40pub struct Coordinate {
41 pub lat: f64,
42 pub lng: f64,
43}
44
45impl Coordinate {
46 pub fn new(lat: f64, lng: f64) -> Result<Self, String> {
47 if !(-90.0..=90.0).contains(&lat) {
48 return Err(format!("Latitude {lat} out of range [-90, 90]"));
49 }
50 if !(-180.0..=180.0).contains(&lng) {
51 return Err(format!("Longitude {lng} out of range [-180, 180]"));
52 }
53 Ok(Self { lat, lng })
54 }
55}
56
57#[derive(Debug, Clone, Copy, Deserialize)]
59pub struct BoundingBox {
60 pub min_lat: f64,
61 pub min_lng: f64,
62 pub max_lat: f64,
63 pub max_lng: f64,
64}
65
66impl BoundingBox {
67 pub fn contains(&self, coord: &Coordinate) -> bool {
68 coord.lat >= self.min_lat
69 && coord.lat <= self.max_lat
70 && coord.lng >= self.min_lng
71 && coord.lng <= self.max_lng
72 }
73}
74
75#[derive(Debug, Clone, Copy, Deserialize)]
77pub struct RadiusQuery {
78 pub center_lat: f64,
79 pub center_lng: f64,
80 pub radius_km: f64,
82}
83
84#[derive(Debug, Clone, Deserialize)]
86pub struct GeoQueryRequest {
87 pub bbox: Option<BoundingBox>,
89 pub radius: Option<RadiusQuery>,
91 pub event_type: Option<String>,
93 pub tenant_id: Option<String>,
95 pub limit: Option<usize>,
97 pub sort_by_distance: Option<bool>,
99}
100
101#[derive(Debug, Clone, Serialize)]
103pub struct GeoEventResult {
104 pub event: serde_json::Value,
105 pub coordinate: Coordinate,
106 pub distance_km: Option<f64>,
108}
109
110#[derive(Debug, Clone, Hash, Eq, PartialEq)]
112struct GridCell {
113 lat_cell: i32,
114 lng_cell: i32,
115}
116
117impl GridCell {
118 fn from_coordinate(coord: &Coordinate) -> Self {
119 Self {
120 lat_cell: (coord.lat / GRID_CELL_SIZE).floor() as i32,
121 lng_cell: (coord.lng / GRID_CELL_SIZE).floor() as i32,
122 }
123 }
124}
125
126pub struct GeoIndex {
128 grid: DashMap<(i32, i32), Vec<(Uuid, Coordinate)>>,
130 coordinates: DashMap<Uuid, Coordinate>,
132}
133
134impl Default for GeoIndex {
135 fn default() -> Self {
136 Self::new()
137 }
138}
139
140impl GeoIndex {
141 pub fn new() -> Self {
142 Self {
143 grid: DashMap::new(),
144 coordinates: DashMap::new(),
145 }
146 }
147
148 pub fn index_event(&self, event: &Event) {
150 if let Some(coord) = extract_coordinates(event) {
151 let cell = GridCell::from_coordinate(&coord);
152 self.grid
153 .entry((cell.lat_cell, cell.lng_cell))
154 .or_default()
155 .push((event.id, coord));
156 self.coordinates.insert(event.id, coord);
157 }
158 }
159
160 pub fn get_coordinate(&self, event_id: &Uuid) -> Option<Coordinate> {
162 self.coordinates.get(event_id).map(|c| *c)
163 }
164
165 pub fn query_bbox(&self, bbox: &BoundingBox) -> Vec<(Uuid, Coordinate)> {
167 let min_cell = GridCell::from_coordinate(&Coordinate {
168 lat: bbox.min_lat,
169 lng: bbox.min_lng,
170 });
171 let max_cell = GridCell::from_coordinate(&Coordinate {
172 lat: bbox.max_lat,
173 lng: bbox.max_lng,
174 });
175
176 let mut results = Vec::new();
177 for lat_cell in min_cell.lat_cell..=max_cell.lat_cell {
178 for lng_cell in min_cell.lng_cell..=max_cell.lng_cell {
179 if let Some(entries) = self.grid.get(&(lat_cell, lng_cell)) {
180 for (id, coord) in entries.iter() {
181 if bbox.contains(coord) {
182 results.push((*id, *coord));
183 }
184 }
185 }
186 }
187 }
188 results
189 }
190
191 pub fn query_radius(&self, query: &RadiusQuery) -> Vec<(Uuid, Coordinate, f64)> {
193 let lat_delta = query.radius_km / 111.0; let lng_delta = query.radius_km / (111.0 * query.center_lat.to_radians().cos().max(0.001));
196 let bbox = BoundingBox {
197 min_lat: query.center_lat - lat_delta,
198 max_lat: query.center_lat + lat_delta,
199 min_lng: query.center_lng - lng_delta,
200 max_lng: query.center_lng + lng_delta,
201 };
202
203 let candidates = self.query_bbox(&bbox);
204 let center = Coordinate {
205 lat: query.center_lat,
206 lng: query.center_lng,
207 };
208
209 candidates
210 .into_iter()
211 .filter_map(|(id, coord)| {
212 let dist = haversine_distance(¢er, &coord);
213 if dist <= query.radius_km {
214 Some((id, coord, dist))
215 } else {
216 None
217 }
218 })
219 .collect()
220 }
221
222 pub fn len(&self) -> usize {
224 self.coordinates.len()
225 }
226
227 pub fn is_empty(&self) -> bool {
228 self.coordinates.is_empty()
229 }
230
231 pub fn stats(&self) -> GeoIndexStats {
233 GeoIndexStats {
234 indexed_events: self.coordinates.len(),
235 grid_cells: self.grid.len(),
236 }
237 }
238}
239
240#[derive(Debug, Clone, Serialize)]
242pub struct GeoIndexStats {
243 pub indexed_events: usize,
244 pub grid_cells: usize,
245}
246
247pub fn haversine_distance(a: &Coordinate, b: &Coordinate) -> f64 {
249 let d_lat = (b.lat - a.lat).to_radians();
250 let d_lng = (b.lng - a.lng).to_radians();
251 let lat1 = a.lat.to_radians();
252 let lat2 = b.lat.to_radians();
253
254 let h = (d_lat / 2.0).sin().powi(2) + lat1.cos() * lat2.cos() * (d_lng / 2.0).sin().powi(2);
255 let c = 2.0 * h.sqrt().asin();
256
257 EARTH_RADIUS_KM * c
258}
259
260pub fn extract_coordinates(event: &Event) -> Option<Coordinate> {
262 if let Some(ref meta) = event.metadata
264 && let Some(coord) = try_extract_coord(meta)
265 {
266 return Some(coord);
267 }
268 try_extract_coord(&event.payload)
270}
271
272fn try_extract_coord(value: &serde_json::Value) -> Option<Coordinate> {
274 if let Some(loc) = value.get("location") {
276 if let (Some(lat), Some(lng)) = (
277 loc.get("lat").and_then(serde_json::Value::as_f64),
278 loc.get("lng").and_then(serde_json::Value::as_f64),
279 ) && let Ok(c) = Coordinate::new(lat, lng)
280 {
281 return Some(c);
282 }
283 if let (Some(lat), Some(lng)) = (
284 loc.get("latitude").and_then(serde_json::Value::as_f64),
285 loc.get("longitude").and_then(serde_json::Value::as_f64),
286 ) && let Ok(c) = Coordinate::new(lat, lng)
287 {
288 return Some(c);
289 }
290 }
291
292 if let (Some(lat), Some(lng)) = (
294 value.get("lat").and_then(serde_json::Value::as_f64),
295 value.get("lng").and_then(serde_json::Value::as_f64),
296 ) && let Ok(c) = Coordinate::new(lat, lng)
297 {
298 return Some(c);
299 }
300
301 if let (Some(lat), Some(lng)) = (
303 value.get("latitude").and_then(serde_json::Value::as_f64),
304 value.get("longitude").and_then(serde_json::Value::as_f64),
305 ) && let Ok(c) = Coordinate::new(lat, lng)
306 {
307 return Some(c);
308 }
309
310 None
311}
312
313pub fn execute_geo_query(
315 events: &[Event],
316 geo_index: &GeoIndex,
317 request: &GeoQueryRequest,
318) -> Vec<GeoEventResult> {
319 let candidates: Vec<(Uuid, Coordinate, Option<f64>)> = if let Some(ref radius) = request.radius
321 {
322 geo_index
323 .query_radius(radius)
324 .into_iter()
325 .map(|(id, coord, dist)| (id, coord, Some(dist)))
326 .collect()
327 } else if let Some(ref bbox) = request.bbox {
328 geo_index
329 .query_bbox(bbox)
330 .into_iter()
331 .map(|(id, coord)| (id, coord, None))
332 .collect()
333 } else {
334 geo_index
336 .coordinates
337 .iter()
338 .map(|entry| (*entry.key(), *entry.value(), None))
339 .collect()
340 };
341
342 let candidate_map: std::collections::HashMap<Uuid, (Coordinate, Option<f64>)> = candidates
344 .into_iter()
345 .map(|(id, coord, dist)| (id, (coord, dist)))
346 .collect();
347
348 let mut results: Vec<GeoEventResult> = events
350 .iter()
351 .filter_map(|event| {
352 let (coord, dist) = candidate_map.get(&event.id)?;
353
354 if let Some(ref et) = request.event_type
356 && event.event_type_str() != et
357 {
358 return None;
359 }
360
361 if let Some(ref tid) = request.tenant_id
363 && event.tenant_id_str() != tid
364 {
365 return None;
366 }
367
368 Some(GeoEventResult {
369 event: serde_json::json!({
370 "id": event.id.to_string(),
371 "event_type": event.event_type_str(),
372 "entity_id": event.entity_id_str(),
373 "tenant_id": event.tenant_id_str(),
374 "payload": event.payload,
375 "metadata": event.metadata,
376 "timestamp": event.timestamp.to_rfc3339(),
377 "version": event.version,
378 }),
379 coordinate: *coord,
380 distance_km: *dist,
381 })
382 })
383 .collect();
384
385 if request.sort_by_distance.unwrap_or(false) {
387 results.sort_by(|a, b| {
388 a.distance_km
389 .unwrap_or(f64::MAX)
390 .partial_cmp(&b.distance_km.unwrap_or(f64::MAX))
391 .unwrap_or(std::cmp::Ordering::Equal)
392 });
393 }
394
395 if let Some(limit) = request.limit {
397 results.truncate(limit);
398 }
399
400 results
401}
402
403#[cfg(test)]
404mod tests {
405 use super::*;
406
407 fn make_geo_event(entity: &str, lat: f64, lng: f64) -> Event {
408 Event::from_strings(
409 "location.update".to_string(),
410 entity.to_string(),
411 "default".to_string(),
412 serde_json::json!({"action": "checkin"}),
413 Some(serde_json::json!({"location": {"lat": lat, "lng": lng}})),
414 )
415 .unwrap()
416 }
417
418 #[test]
419 fn test_haversine_same_point() {
420 let a = Coordinate { lat: 0.0, lng: 0.0 };
421 assert!((haversine_distance(&a, &a) - 0.0).abs() < 0.001);
422 }
423
424 #[test]
425 fn test_haversine_known_distance() {
426 let nyc = Coordinate {
428 lat: 40.7128,
429 lng: -74.0060,
430 };
431 let london = Coordinate {
432 lat: 51.5074,
433 lng: -0.1278,
434 };
435 let dist = haversine_distance(&nyc, &london);
436 assert!((dist - 5570.0).abs() < 50.0, "NYC-London: {dist} km");
437 }
438
439 #[test]
440 fn test_coordinate_validation() {
441 assert!(Coordinate::new(45.0, 90.0).is_ok());
442 assert!(Coordinate::new(91.0, 0.0).is_err());
443 assert!(Coordinate::new(0.0, 181.0).is_err());
444 }
445
446 #[test]
447 fn test_bounding_box_contains() {
448 let bbox = BoundingBox {
449 min_lat: 40.0,
450 max_lat: 42.0,
451 min_lng: -75.0,
452 max_lng: -73.0,
453 };
454 assert!(bbox.contains(&Coordinate {
455 lat: 41.0,
456 lng: -74.0
457 }));
458 assert!(!bbox.contains(&Coordinate {
459 lat: 43.0,
460 lng: -74.0
461 }));
462 }
463
464 #[test]
465 fn test_extract_coordinates_from_metadata_location() {
466 let event = make_geo_event("e1", 40.7128, -74.0060);
467 let coord = extract_coordinates(&event).unwrap();
468 assert!((coord.lat - 40.7128).abs() < 0.001);
469 assert!((coord.lng - (-74.0060)).abs() < 0.001);
470 }
471
472 #[test]
473 fn test_extract_coordinates_from_payload_flat() {
474 let event = Event::from_strings(
475 "location.update".to_string(),
476 "e1".to_string(),
477 "default".to_string(),
478 serde_json::json!({"lat": 51.5074, "lng": -0.1278}),
479 None,
480 )
481 .unwrap();
482 let coord = extract_coordinates(&event).unwrap();
483 assert!((coord.lat - 51.5074).abs() < 0.001);
484 }
485
486 #[test]
487 fn test_no_coordinates() {
488 let event = Event::from_strings(
489 "user.created".to_string(),
490 "u1".to_string(),
491 "default".to_string(),
492 serde_json::json!({"name": "Alice"}),
493 None,
494 )
495 .unwrap();
496 assert!(extract_coordinates(&event).is_none());
497 }
498
499 #[test]
500 fn test_geo_index_and_bbox_query() {
501 let index = GeoIndex::new();
502 let events = vec![
503 make_geo_event("e1", 40.7128, -74.0060), make_geo_event("e2", 51.5074, -0.1278), make_geo_event("e3", 41.0, -73.5), ];
507 for e in &events {
508 index.index_event(e);
509 }
510 assert_eq!(index.len(), 3);
511
512 let bbox = BoundingBox {
513 min_lat: 40.0,
514 max_lat: 42.0,
515 min_lng: -75.0,
516 max_lng: -73.0,
517 };
518 let results = index.query_bbox(&bbox);
519 assert_eq!(results.len(), 2); }
521
522 #[test]
523 fn test_geo_index_radius_query() {
524 let index = GeoIndex::new();
525 let events = vec![
526 make_geo_event("e1", 40.7128, -74.0060), make_geo_event("e2", 51.5074, -0.1278), make_geo_event("e3", 40.75, -73.99), ];
530 for e in &events {
531 index.index_event(e);
532 }
533
534 let query = RadiusQuery {
535 center_lat: 40.7128,
536 center_lng: -74.0060,
537 radius_km: 10.0,
538 };
539 let results = index.query_radius(&query);
540 assert_eq!(results.len(), 2); for (_, _, dist) in &results {
542 assert!(*dist <= 10.0);
543 }
544 }
545
546 #[test]
547 fn test_execute_geo_query_with_type_filter() {
548 let index = GeoIndex::new();
549 let mut events = vec![make_geo_event("e1", 40.7128, -74.0060)];
550 let other = Event::from_strings(
551 "user.created".to_string(),
552 "e2".to_string(),
553 "default".to_string(),
554 serde_json::json!({"lat": 40.75, "lng": -73.99}),
555 None,
556 )
557 .unwrap();
558 events.push(other);
559 for e in &events {
560 index.index_event(e);
561 }
562
563 let req = GeoQueryRequest {
564 bbox: Some(BoundingBox {
565 min_lat: 40.0,
566 max_lat: 42.0,
567 min_lng: -75.0,
568 max_lng: -73.0,
569 }),
570 radius: None,
571 event_type: Some("location.update".to_string()),
572 tenant_id: None,
573 limit: None,
574 sort_by_distance: None,
575 };
576 let results = execute_geo_query(&events, &index, &req);
577 assert_eq!(results.len(), 1);
578 }
579
580 #[test]
581 fn test_geo_index_stats() {
582 let index = GeoIndex::new();
583 let events = vec![
584 make_geo_event("e1", 40.7128, -74.0060),
585 make_geo_event("e2", 51.5074, -0.1278),
586 ];
587 for e in &events {
588 index.index_event(e);
589 }
590 let stats = index.stats();
591 assert_eq!(stats.indexed_events, 2);
592 assert!(stats.grid_cells >= 1);
593 }
594}