spatio_server/
handler.rs

1use crate::protocol::{CurrentLocation, LocationUpdate, SpatioService, Stats, UpsertOptions};
2use spatio::Spatio;
3use spatio_types::geo::{DistanceMetric, Point, Polygon};
4use spatio_types::point::Point3d;
5use std::sync::Arc;
6use tarpc::context;
7
8#[derive(Clone)]
9pub struct Handler {
10    db: Arc<Spatio>,
11}
12
13impl Handler {
14    pub fn new(db: Arc<Spatio>) -> Self {
15        Self { db }
16    }
17}
18
19impl SpatioService for Handler {
20    async fn upsert(
21        self,
22        _: context::Context,
23        namespace: String,
24        id: String,
25        point: Point3d,
26        metadata: serde_json::Value,
27        opts: Option<UpsertOptions>,
28    ) -> Result<(), String> {
29        let db = self.db.clone();
30        tokio::task::spawn_blocking(move || {
31            let db_opts = opts.map(|o| spatio::config::SetOptions {
32                ttl: Some(o.ttl),
33                ..Default::default()
34            });
35
36            db.upsert(&namespace, &id, point, metadata, db_opts)
37                .map_err(|e| e.to_string())
38        })
39        .await
40        .map_err(|e| format!("Internal error: {}", e))?
41    }
42
43    async fn get(
44        self,
45        _: context::Context,
46        namespace: String,
47        id: String,
48    ) -> Result<Option<CurrentLocation>, String> {
49        let db = self.db.clone();
50        tokio::task::spawn_blocking(move || match db.get(&namespace, &id) {
51            Ok(Some(loc)) => Ok(Some(CurrentLocation {
52                object_id: loc.object_id,
53                position: loc.position,
54                metadata: serde_json::to_vec(&loc.metadata).unwrap_or_default(),
55            })),
56            Ok(None) => Ok(None),
57            Err(e) => Err(e.to_string()),
58        })
59        .await
60        .map_err(|e| format!("Internal error: {}", e))?
61    }
62
63    async fn delete(
64        self,
65        _: context::Context,
66        namespace: String,
67        id: String,
68    ) -> Result<(), String> {
69        let db = self.db.clone();
70        tokio::task::spawn_blocking(move || db.delete(&namespace, &id).map_err(|e| e.to_string()))
71            .await
72            .map_err(|e| format!("Internal error: {}", e))?
73    }
74
75    async fn query_radius(
76        self,
77        _: context::Context,
78        namespace: String,
79        center: Point3d,
80        radius: f64,
81        limit: usize,
82    ) -> Result<Vec<(CurrentLocation, f64)>, String> {
83        let db = self.db.clone();
84        tokio::task::spawn_blocking(move || {
85            db.query_radius(&namespace, &center, radius, limit)
86                .map(|results| {
87                    results
88                        .into_iter()
89                        .map(|(loc, dist)| {
90                            (
91                                CurrentLocation {
92                                    object_id: loc.object_id,
93                                    position: loc.position,
94                                    metadata: serde_json::to_vec(&loc.metadata).unwrap_or_default(),
95                                },
96                                dist,
97                            )
98                        })
99                        .collect()
100                })
101                .map_err(|e| e.to_string())
102        })
103        .await
104        .map_err(|e| format!("Internal error: {}", e))?
105    }
106
107    async fn knn(
108        self,
109        _: context::Context,
110        namespace: String,
111        center: Point3d,
112        k: usize,
113    ) -> Result<Vec<(CurrentLocation, f64)>, String> {
114        let db = self.db.clone();
115        tokio::task::spawn_blocking(move || {
116            db.knn(&namespace, &center, k)
117                .map(|results| {
118                    results
119                        .into_iter()
120                        .map(|(loc, dist)| {
121                            (
122                                CurrentLocation {
123                                    object_id: loc.object_id,
124                                    position: loc.position,
125                                    metadata: serde_json::to_vec(&loc.metadata).unwrap_or_default(),
126                                },
127                                dist,
128                            )
129                        })
130                        .collect()
131                })
132                .map_err(|e| e.to_string())
133        })
134        .await
135        .map_err(|e| format!("Internal error: {}", e))?
136    }
137
138    async fn stats(self, _: context::Context) -> Stats {
139        let db = self.db.clone();
140        tokio::task::spawn_blocking(move || {
141            let s = db.stats();
142            Stats {
143                object_count: s.hot_state_objects,
144                memory_usage_bytes: s.memory_usage_bytes,
145            }
146        })
147        .await
148        .unwrap()
149    }
150
151    async fn query_bbox(
152        self,
153        _: context::Context,
154        namespace: String,
155        min_x: f64,
156        min_y: f64,
157        max_x: f64,
158        max_y: f64,
159        limit: usize,
160    ) -> Result<Vec<CurrentLocation>, String> {
161        let db = self.db.clone();
162        tokio::task::spawn_blocking(move || {
163            db.query_bbox(&namespace, min_x, min_y, max_x, max_y, limit)
164                .map(|results| {
165                    results
166                        .into_iter()
167                        .map(|loc| CurrentLocation {
168                            object_id: loc.object_id,
169                            position: loc.position,
170                            metadata: serde_json::to_vec(&loc.metadata).unwrap_or_default(),
171                        })
172                        .collect()
173                })
174                .map_err(|e| e.to_string())
175        })
176        .await
177        .map_err(|e| format!("Internal error: {}", e))?
178    }
179
180    async fn query_cylinder(
181        self,
182        _: context::Context,
183        namespace: String,
184        center: Point,
185        min_z: f64,
186        max_z: f64,
187        radius: f64,
188        limit: usize,
189    ) -> Result<Vec<(CurrentLocation, f64)>, String> {
190        let db = self.db.clone();
191        tokio::task::spawn_blocking(move || {
192            db.query_within_cylinder(&namespace, center, min_z, max_z, radius, limit)
193                .map(|results| {
194                    results
195                        .into_iter()
196                        .map(|(loc, dist)| {
197                            (
198                                CurrentLocation {
199                                    object_id: loc.object_id,
200                                    position: loc.position,
201                                    metadata: serde_json::to_vec(&loc.metadata).unwrap_or_default(),
202                                },
203                                dist,
204                            )
205                        })
206                        .collect()
207                })
208                .map_err(|e| e.to_string())
209        })
210        .await
211        .map_err(|e| format!("Internal error: {}", e))?
212    }
213
214    async fn query_trajectory(
215        self,
216        _: context::Context,
217        namespace: String,
218        id: String,
219        start_time: Option<f64>,
220        end_time: Option<f64>,
221        limit: usize,
222    ) -> Result<Vec<LocationUpdate>, String> {
223        let db = self.db.clone();
224        tokio::task::spawn_blocking(move || {
225            let start = start_time
226                .map(|t| std::time::UNIX_EPOCH + std::time::Duration::from_secs_f64(t))
227                .unwrap_or(std::time::UNIX_EPOCH);
228            let end = end_time
229                .map(|t| std::time::UNIX_EPOCH + std::time::Duration::from_secs_f64(t))
230                .unwrap_or_else(std::time::SystemTime::now);
231
232            db.query_trajectory(&namespace, &id, start, end, limit)
233                .map(|results| {
234                    results
235                        .into_iter()
236                        .map(|upd| {
237                            let timestamp = upd
238                                .timestamp
239                                .duration_since(std::time::UNIX_EPOCH)
240                                .unwrap_or_default()
241                                .as_secs_f64();
242
243                            LocationUpdate {
244                                timestamp,
245                                position: upd.position,
246                                metadata: serde_json::to_vec(&upd.metadata).unwrap_or_default(),
247                            }
248                        })
249                        .collect()
250                })
251                .map_err(|e| e.to_string())
252        })
253        .await
254        .map_err(|e| format!("Internal error: {}", e))?
255    }
256
257    async fn insert_trajectory(
258        self,
259        _: context::Context,
260        namespace: String,
261        id: String,
262        trajectory: Vec<(f64, Point3d, serde_json::Value)>,
263    ) -> Result<(), String> {
264        let db = self.db.clone();
265        tokio::task::spawn_blocking(move || {
266            let updates: Vec<spatio::config::TemporalPoint> = trajectory
267                .into_iter()
268                .map(|(ts, p, _meta)| {
269                    // TODO: Current DB insert_trajectory uses TemporalPoint (2D) and drops Z/metadata
270                    // We need to update spatio-core to support 3D points and metadata in trajectory storage
271                    let timestamp = std::time::UNIX_EPOCH + std::time::Duration::from_secs_f64(ts);
272                    spatio::config::TemporalPoint::new(*p.point_2d(), timestamp)
273                })
274                .collect();
275
276            db.insert_trajectory(&namespace, &id, &updates)
277                .map_err(|e| e.to_string())
278        })
279        .await
280        .map_err(|e| format!("Internal error: {}", e))?
281    }
282
283    async fn query_bbox_3d(
284        self,
285        _: context::Context,
286        namespace: String,
287        min_x: f64,
288        min_y: f64,
289        min_z: f64,
290        max_x: f64,
291        max_y: f64,
292        max_z: f64,
293        limit: usize,
294    ) -> Result<Vec<CurrentLocation>, String> {
295        let db = self.db.clone();
296        tokio::task::spawn_blocking(move || {
297            db.query_within_bbox_3d(&namespace, min_x, min_y, min_z, max_x, max_y, max_z, limit)
298                .map(|results| {
299                    results
300                        .into_iter()
301                        .map(|loc| CurrentLocation {
302                            object_id: loc.object_id,
303                            position: loc.position,
304                            metadata: serde_json::to_vec(&loc.metadata).unwrap_or_default(),
305                        })
306                        .collect()
307                })
308                .map_err(|e| e.to_string())
309        })
310        .await
311        .map_err(|e| format!("Internal error: {}", e))?
312    }
313
314    async fn query_near(
315        self,
316        _: context::Context,
317        namespace: String,
318        id: String,
319        radius: f64,
320        limit: usize,
321    ) -> Result<Vec<(CurrentLocation, f64)>, String> {
322        let db = self.db.clone();
323        tokio::task::spawn_blocking(move || {
324            db.query_near(&namespace, &id, radius, limit)
325                .map(|results| {
326                    results
327                        .into_iter()
328                        .map(|(loc, dist)| {
329                            (
330                                CurrentLocation {
331                                    object_id: loc.object_id,
332                                    position: loc.position,
333                                    metadata: serde_json::to_vec(&loc.metadata).unwrap_or_default(),
334                                },
335                                dist,
336                            )
337                        })
338                        .collect()
339                })
340                .map_err(|e| e.to_string())
341        })
342        .await
343        .map_err(|e| format!("Internal error: {}", e))?
344    }
345
346    async fn contains(
347        self,
348        _: context::Context,
349        namespace: String,
350        polygon: Polygon,
351        limit: usize,
352    ) -> Result<Vec<CurrentLocation>, String> {
353        let db = self.db.clone();
354        tokio::task::spawn_blocking(move || {
355            db.query_polygon(&namespace, &polygon, limit)
356                .map(|results| {
357                    results
358                        .into_iter()
359                        .map(|loc| CurrentLocation {
360                            object_id: loc.object_id,
361                            position: loc.position,
362                            metadata: serde_json::to_vec(&loc.metadata).unwrap_or_default(),
363                        })
364                        .collect()
365                })
366                .map_err(|e| e.to_string())
367        })
368        .await
369        .map_err(|e| format!("Internal error: {}", e))?
370    }
371
372    async fn distance(
373        self,
374        _: context::Context,
375        namespace: String,
376        id1: String,
377        id2: String,
378        metric: Option<DistanceMetric>,
379    ) -> Result<Option<f64>, String> {
380        let db = self.db.clone();
381        tokio::task::spawn_blocking(move || {
382            db.distance_between(&namespace, &id1, &id2, metric.unwrap_or_default())
383                .map_err(|e| e.to_string())
384        })
385        .await
386        .map_err(|e| format!("Internal error: {}", e))?
387    }
388
389    async fn distance_to(
390        self,
391        _: context::Context,
392        namespace: String,
393        id: String,
394        point: Point,
395        metric: Option<DistanceMetric>,
396    ) -> Result<Option<f64>, String> {
397        let db = self.db.clone();
398        tokio::task::spawn_blocking(move || {
399            db.distance_to(&namespace, &id, &point, metric.unwrap_or_default())
400                .map_err(|e| e.to_string())
401        })
402        .await
403        .map_err(|e| format!("Internal error: {}", e))?
404    }
405
406    async fn convex_hull(
407        self,
408        _: context::Context,
409        namespace: String,
410    ) -> Result<Option<Polygon>, String> {
411        let db = self.db.clone();
412        tokio::task::spawn_blocking(move || db.convex_hull(&namespace).map_err(|e| e.to_string()))
413            .await
414            .map_err(|e| format!("Internal error: {}", e))?
415    }
416
417    async fn bounding_box(
418        self,
419        _: context::Context,
420        namespace: String,
421    ) -> Result<Option<spatio_types::bbox::BoundingBox2D>, String> {
422        let db = self.db.clone();
423        tokio::task::spawn_blocking(move || {
424            db.bounding_box(&namespace)
425                .map(|opt| opt.map(spatio_types::bbox::BoundingBox2D::from_rect))
426                .map_err(|e| e.to_string())
427        })
428        .await
429        .map_err(|e| format!("Internal error: {}", e))?
430    }
431}