spatio_server/
handler.rs

1use crate::protocol::{CurrentLocation, LocationUpdate, SpatioService, Stats, UpsertOptions};
2use spatio::Spatio;
3use spatio_types::geo::{DistanceMetric, Point, Polygon};
4use spatio_types::point::Point3d;
5use std::sync::Arc;
6use tarpc::context;
7
8#[derive(Clone)]
9pub struct Handler {
10    db: Arc<Spatio>,
11}
12
13impl Handler {
14    pub fn new(db: Arc<Spatio>) -> Self {
15        Self { db }
16    }
17}
18
19// tarpc 0.34+ removed #[tarpc::server] macro - traits use async fn directly
20impl SpatioService for Handler {
21    async fn upsert(
22        self,
23        _: context::Context,
24        namespace: String,
25        id: String,
26        point: Point3d,
27        metadata: serde_json::Value,
28        opts: Option<UpsertOptions>,
29    ) -> Result<(), String> {
30        // Convert RPC options to internal options
31        // DB.upsert takes Option<UpsertOptions> which needs to be mapped to internal config type
32
33        let db_opts = opts.map(|o| spatio::config::SetOptions {
34            ttl: Some(o.ttl),
35            ..Default::default()
36        });
37
38        self.db
39            .upsert(&namespace, &id, point, metadata, db_opts)
40            .map_err(|e| e.to_string())
41    }
42
43    async fn get(
44        self,
45        _: context::Context,
46        namespace: String,
47        id: String,
48    ) -> Result<Option<CurrentLocation>, String> {
49        match self.db.get(&namespace, &id) {
50            Ok(Some(loc)) => Ok(Some(CurrentLocation {
51                object_id: loc.object_id,
52                position: loc.position,
53                metadata: serde_json::to_vec(&loc.metadata).unwrap_or_default(),
54            })),
55            Ok(None) => Ok(None),
56            Err(e) => Err(e.to_string()),
57        }
58    }
59
60    async fn delete(
61        self,
62        _: context::Context,
63        namespace: String,
64        id: String,
65    ) -> Result<(), String> {
66        self.db.delete(&namespace, &id).map_err(|e| e.to_string())
67    }
68
69    async fn query_radius(
70        self,
71        _: context::Context,
72        namespace: String,
73        center: Point3d,
74        radius: f64,
75        limit: usize,
76    ) -> Result<Vec<(CurrentLocation, f64)>, String> {
77        self.db
78            .query_radius(&namespace, &center, radius, limit)
79            .map(|results| {
80                results
81                    .into_iter()
82                    .map(|(loc, dist)| {
83                        (
84                            CurrentLocation {
85                                object_id: loc.object_id,
86                                position: loc.position,
87                                metadata: serde_json::to_vec(&loc.metadata).unwrap_or_default(),
88                            },
89                            dist,
90                        )
91                    })
92                    .collect()
93            })
94            .map_err(|e| e.to_string())
95    }
96
97    async fn knn(
98        self,
99        _: context::Context,
100        namespace: String,
101        center: Point3d,
102        k: usize,
103    ) -> Result<Vec<(CurrentLocation, f64)>, String> {
104        self.db
105            .knn(&namespace, &center, k)
106            .map(|results| {
107                results
108                    .into_iter()
109                    .map(|(loc, dist)| {
110                        (
111                            CurrentLocation {
112                                object_id: loc.object_id,
113                                position: loc.position,
114                                metadata: serde_json::to_vec(&loc.metadata).unwrap_or_default(),
115                            },
116                            dist,
117                        )
118                    })
119                    .collect()
120            })
121            .map_err(|e| e.to_string())
122    }
123
124    async fn stats(self, _: context::Context) -> Stats {
125        let s = self.db.stats();
126        Stats {
127            object_count: s.hot_state_objects,
128            memory_usage_bytes: s.memory_usage_bytes,
129        }
130    }
131
132    async fn query_bbox(
133        self,
134        _: context::Context,
135        namespace: String,
136        min_x: f64,
137        min_y: f64,
138        max_x: f64,
139        max_y: f64,
140        limit: usize,
141    ) -> Result<Vec<CurrentLocation>, String> {
142        self.db
143            .query_bbox(&namespace, min_x, min_y, max_x, max_y, limit)
144            .map(|results| {
145                results
146                    .into_iter()
147                    .map(|loc| CurrentLocation {
148                        object_id: loc.object_id,
149                        position: loc.position,
150                        metadata: serde_json::to_vec(&loc.metadata).unwrap_or_default(),
151                    })
152                    .collect()
153            })
154            .map_err(|e| e.to_string())
155    }
156
157    async fn query_cylinder(
158        self,
159        _: context::Context,
160        namespace: String,
161        center: Point,
162        min_z: f64,
163        max_z: f64,
164        radius: f64,
165        limit: usize,
166    ) -> Result<Vec<(CurrentLocation, f64)>, String> {
167        self.db
168            .query_within_cylinder(&namespace, center, min_z, max_z, radius, limit)
169            .map(|results| {
170                results
171                    .into_iter()
172                    .map(|(loc, dist)| {
173                        (
174                            CurrentLocation {
175                                object_id: loc.object_id,
176                                position: loc.position,
177                                metadata: serde_json::to_vec(&loc.metadata).unwrap_or_default(),
178                            },
179                            dist,
180                        )
181                    })
182                    .collect()
183            })
184            .map_err(|e| e.to_string())
185    }
186
187    async fn query_trajectory(
188        self,
189        _: context::Context,
190        namespace: String,
191        id: String,
192        start_time: Option<f64>,
193        end_time: Option<f64>,
194        limit: usize,
195    ) -> Result<Vec<LocationUpdate>, String> {
196        let start = start_time
197            .map(|t| std::time::UNIX_EPOCH + std::time::Duration::from_secs_f64(t))
198            .unwrap_or(std::time::UNIX_EPOCH);
199        let end = end_time
200            .map(|t| std::time::UNIX_EPOCH + std::time::Duration::from_secs_f64(t))
201            .unwrap_or_else(std::time::SystemTime::now);
202
203        self.db
204            .query_trajectory(&namespace, &id, start, end, limit)
205            .map(|results| {
206                results
207                    .into_iter()
208                    .map(|upd| {
209                        let timestamp = upd
210                            .timestamp
211                            .duration_since(std::time::UNIX_EPOCH)
212                            .unwrap_or_default()
213                            .as_secs_f64();
214
215                        LocationUpdate {
216                            timestamp,
217                            position: upd.position,
218                            metadata: serde_json::to_vec(&upd.metadata).unwrap_or_default(),
219                        }
220                    })
221                    .collect()
222            })
223            .map_err(|e| e.to_string())
224    }
225
226    async fn insert_trajectory(
227        self,
228        _: context::Context,
229        namespace: String,
230        id: String,
231        trajectory: Vec<(f64, Point3d, serde_json::Value)>,
232    ) -> Result<(), String> {
233        let updates: Vec<spatio::config::TemporalPoint> = trajectory
234            .into_iter()
235            .map(|(ts, p, _meta)| {
236                // TODO: Current DB insert_trajectory uses TemporalPoint (2D) and drops Z/metadata
237                // We need to update spatio-core to support 3D points and metadata in trajectory storage
238                let timestamp = std::time::UNIX_EPOCH + std::time::Duration::from_secs_f64(ts);
239                spatio::config::TemporalPoint::new(*p.point_2d(), timestamp)
240            })
241            .collect();
242
243        self.db
244            .insert_trajectory(&namespace, &id, &updates)
245            .map_err(|e| e.to_string())
246    }
247
248    async fn query_bbox_3d(
249        self,
250        _: context::Context,
251        namespace: String,
252        min_x: f64,
253        min_y: f64,
254        min_z: f64,
255        max_x: f64,
256        max_y: f64,
257        max_z: f64,
258        limit: usize,
259    ) -> Result<Vec<CurrentLocation>, String> {
260        self.db
261            .query_within_bbox_3d(&namespace, min_x, min_y, min_z, max_x, max_y, max_z, limit)
262            .map(|results| {
263                results
264                    .into_iter()
265                    .map(|loc| CurrentLocation {
266                        object_id: loc.object_id,
267                        position: loc.position,
268                        metadata: serde_json::to_vec(&loc.metadata).unwrap_or_default(),
269                    })
270                    .collect()
271            })
272            .map_err(|e| e.to_string())
273    }
274
275    async fn query_near(
276        self,
277        _: context::Context,
278        namespace: String,
279        id: String,
280        radius: f64,
281        limit: usize,
282    ) -> Result<Vec<(CurrentLocation, f64)>, String> {
283        self.db
284            .query_near(&namespace, &id, radius, limit)
285            .map(|results| {
286                results
287                    .into_iter()
288                    .map(|(loc, dist)| {
289                        (
290                            CurrentLocation {
291                                object_id: loc.object_id,
292                                position: loc.position,
293                                metadata: serde_json::to_vec(&loc.metadata).unwrap_or_default(),
294                            },
295                            dist,
296                        )
297                    })
298                    .collect()
299            })
300            .map_err(|e| e.to_string())
301    }
302
303    async fn contains(
304        self,
305        _: context::Context,
306        namespace: String,
307        polygon: Polygon,
308        limit: usize,
309    ) -> Result<Vec<CurrentLocation>, String> {
310        self.db
311            .query_polygon(&namespace, &polygon, limit)
312            .map(|results| {
313                results
314                    .into_iter()
315                    .map(|loc| CurrentLocation {
316                        object_id: loc.object_id,
317                        position: loc.position,
318                        metadata: serde_json::to_vec(&loc.metadata).unwrap_or_default(),
319                    })
320                    .collect()
321            })
322            .map_err(|e| e.to_string())
323    }
324
325    async fn distance(
326        self,
327        _: context::Context,
328        namespace: String,
329        id1: String,
330        id2: String,
331        metric: Option<DistanceMetric>,
332    ) -> Result<Option<f64>, String> {
333        self.db
334            .distance_between(&namespace, &id1, &id2, metric.unwrap_or_default())
335            .map_err(|e| e.to_string())
336    }
337
338    async fn distance_to(
339        self,
340        _: context::Context,
341        namespace: String,
342        id: String,
343        point: Point,
344        metric: Option<DistanceMetric>,
345    ) -> Result<Option<f64>, String> {
346        self.db
347            .distance_to(&namespace, &id, &point, metric.unwrap_or_default())
348            .map_err(|e| e.to_string())
349    }
350
351    async fn convex_hull(
352        self,
353        _: context::Context,
354        namespace: String,
355    ) -> Result<Option<Polygon>, String> {
356        self.db.convex_hull(&namespace).map_err(|e| e.to_string())
357    }
358
359    async fn bounding_box(
360        self,
361        _: context::Context,
362        namespace: String,
363    ) -> Result<Option<spatio_types::bbox::BoundingBox2D>, String> {
364        self.db
365            .bounding_box(&namespace)
366            .map(|opt| opt.map(spatio_types::bbox::BoundingBox2D::from_rect))
367            .map_err(|e| e.to_string())
368    }
369}