spatio_server/
handler.rs

1use crate::protocol::{CurrentLocation, LocationUpdate, SpatioService, Stats, UpsertOptions};
2use spatio::Spatio;
3use spatio_types::geo::{DistanceMetric, Point, Polygon};
4use spatio_types::point::Point3d;
5use std::sync::Arc;
6use tarpc::context;
7
8#[derive(Clone)]
9pub struct Handler {
10    db: Arc<Spatio>,
11}
12
13impl Handler {
14    pub fn new(db: Arc<Spatio>) -> Self {
15        Self { db }
16    }
17}
18
19// tarpc 0.34+ removed #[tarpc::server] macro - traits use async fn directly
20impl SpatioService for Handler {
21    async fn upsert(
22        self,
23        _: context::Context,
24        namespace: String,
25        id: String,
26        point: Point3d,
27        metadata: serde_json::Value,
28        opts: Option<UpsertOptions>,
29    ) -> Result<(), String> {
30        // Convert RPC options to internal options if necessary, or pass through
31        // Here we assume internal API options match or we ignore for now,
32        // but `upsert` in DB takes `Option<UpsertOptions>`?
33        // Let's check DB signature. The DB.upsert takes `Option<UpsertOptions>`.
34        // We need to make sure the types align or convert.
35        // `protocol` module UpsertOptions struct is defined there.
36        // `spatio-core` has its own UpsertOptions? Let's check.
37        // Assuming we need to convert or if they are compatible.
38        // For now, let's map the fields manually to be safe if types differ, or use serde.
39
40        let db_opts = opts.map(|o| spatio::config::SetOptions {
41            ttl: Some(o.ttl),
42            ..Default::default()
43        });
44
45        self.db
46            .upsert(&namespace, &id, point, metadata, db_opts)
47            .map_err(|e| e.to_string())
48    }
49
50    async fn get(
51        self,
52        _: context::Context,
53        namespace: String,
54        id: String,
55    ) -> Result<Option<CurrentLocation>, String> {
56        match self.db.get(&namespace, &id) {
57            Ok(Some(loc)) => Ok(Some(CurrentLocation {
58                object_id: loc.object_id,
59                position: loc.position,
60                metadata: serde_json::to_vec(&loc.metadata).unwrap_or_default(),
61            })),
62            Ok(None) => Ok(None),
63            Err(e) => Err(e.to_string()),
64        }
65    }
66
67    async fn delete(
68        self,
69        _: context::Context,
70        namespace: String,
71        id: String,
72    ) -> Result<(), String> {
73        self.db.delete(&namespace, &id).map_err(|e| e.to_string())
74    }
75
76    async fn query_radius(
77        self,
78        _: context::Context,
79        namespace: String,
80        center: Point3d,
81        radius: f64,
82        limit: usize,
83    ) -> Result<Vec<(CurrentLocation, f64)>, String> {
84        self.db
85            .query_radius(&namespace, &center, radius, limit)
86            .map(|results| {
87                results
88                    .into_iter()
89                    .map(|(loc, dist)| {
90                        (
91                            CurrentLocation {
92                                object_id: loc.object_id,
93                                position: loc.position,
94                                metadata: serde_json::to_vec(&loc.metadata).unwrap_or_default(),
95                            },
96                            dist,
97                        )
98                    })
99                    .collect()
100            })
101            .map_err(|e| e.to_string())
102    }
103
104    async fn knn(
105        self,
106        _: context::Context,
107        namespace: String,
108        center: Point3d,
109        k: usize,
110    ) -> Result<Vec<(CurrentLocation, f64)>, String> {
111        self.db
112            .knn(&namespace, &center, k)
113            .map(|results| {
114                results
115                    .into_iter()
116                    .map(|(loc, dist)| {
117                        (
118                            CurrentLocation {
119                                object_id: loc.object_id,
120                                position: loc.position,
121                                metadata: serde_json::to_vec(&loc.metadata).unwrap_or_default(),
122                            },
123                            dist,
124                        )
125                    })
126                    .collect()
127            })
128            .map_err(|e| e.to_string())
129    }
130
131    async fn stats(self, _: context::Context) -> Stats {
132        let s = self.db.stats();
133        Stats {
134            object_count: s.hot_state_objects,
135            memory_usage_bytes: s.memory_usage_bytes,
136        }
137    }
138
139    async fn query_bbox(
140        self,
141        _: context::Context,
142        namespace: String,
143        min_x: f64,
144        min_y: f64,
145        max_x: f64,
146        max_y: f64,
147        limit: usize,
148    ) -> Result<Vec<CurrentLocation>, String> {
149        self.db
150            .query_bbox(&namespace, min_x, min_y, max_x, max_y, limit)
151            .map(|results| {
152                results
153                    .into_iter()
154                    .map(|loc| CurrentLocation {
155                        object_id: loc.object_id,
156                        position: loc.position,
157                        metadata: serde_json::to_vec(&loc.metadata).unwrap_or_default(),
158                    })
159                    .collect()
160            })
161            .map_err(|e| e.to_string())
162    }
163
164    async fn query_cylinder(
165        self,
166        _: context::Context,
167        namespace: String,
168        center: Point,
169        min_z: f64,
170        max_z: f64,
171        radius: f64,
172        limit: usize,
173    ) -> Result<Vec<(CurrentLocation, f64)>, String> {
174        self.db
175            .query_within_cylinder(&namespace, center, min_z, max_z, radius, limit)
176            .map(|results| {
177                results
178                    .into_iter()
179                    .map(|(loc, dist)| {
180                        (
181                            CurrentLocation {
182                                object_id: loc.object_id,
183                                position: loc.position,
184                                metadata: serde_json::to_vec(&loc.metadata).unwrap_or_default(),
185                            },
186                            dist,
187                        )
188                    })
189                    .collect()
190            })
191            .map_err(|e| e.to_string())
192    }
193
194    async fn query_trajectory(
195        self,
196        _: context::Context,
197        namespace: String,
198        id: String,
199        start_time: Option<f64>,
200        end_time: Option<f64>,
201        limit: usize,
202    ) -> Result<Vec<LocationUpdate>, String> {
203        let start = start_time
204            .map(|t| std::time::UNIX_EPOCH + std::time::Duration::from_secs_f64(t))
205            .unwrap_or(std::time::UNIX_EPOCH);
206        let end = end_time
207            .map(|t| std::time::UNIX_EPOCH + std::time::Duration::from_secs_f64(t))
208            .unwrap_or_else(std::time::SystemTime::now);
209
210        self.db
211            .query_trajectory(&namespace, &id, start, end, limit)
212            .map(|results| {
213                results
214                    .into_iter()
215                    .map(|upd| {
216                        let timestamp = upd
217                            .timestamp
218                            .duration_since(std::time::UNIX_EPOCH)
219                            .unwrap_or_default()
220                            .as_secs_f64();
221
222                        LocationUpdate {
223                            timestamp,
224                            position: upd.position,
225                            metadata: serde_json::to_vec(&upd.metadata).unwrap_or_default(),
226                        }
227                    })
228                    .collect()
229            })
230            .map_err(|e| e.to_string())
231    }
232
233    async fn insert_trajectory(
234        self,
235        _: context::Context,
236        namespace: String,
237        id: String,
238        trajectory: Vec<(f64, Point3d, serde_json::Value)>,
239    ) -> Result<(), String> {
240        let updates: Vec<spatio::config::TemporalPoint> = trajectory
241            .into_iter()
242            .map(|(ts, p, _meta)| {
243                // Note: Current DB insert_trajectory uses TemporalPoint (2D) and drops Z/metadata
244                let timestamp = std::time::UNIX_EPOCH + std::time::Duration::from_secs_f64(ts);
245                spatio::config::TemporalPoint::new(*p.point_2d(), timestamp)
246            })
247            .collect();
248
249        self.db
250            .insert_trajectory(&namespace, &id, &updates)
251            .map_err(|e| e.to_string())
252    }
253
254    async fn query_bbox_3d(
255        self,
256        _: context::Context,
257        namespace: String,
258        min_x: f64,
259        min_y: f64,
260        min_z: f64,
261        max_x: f64,
262        max_y: f64,
263        max_z: f64,
264        limit: usize,
265    ) -> Result<Vec<CurrentLocation>, String> {
266        self.db
267            .query_within_bbox_3d(&namespace, min_x, min_y, min_z, max_x, max_y, max_z, limit)
268            .map(|results| {
269                results
270                    .into_iter()
271                    .map(|loc| CurrentLocation {
272                        object_id: loc.object_id,
273                        position: loc.position,
274                        metadata: serde_json::to_vec(&loc.metadata).unwrap_or_default(),
275                    })
276                    .collect()
277            })
278            .map_err(|e| e.to_string())
279    }
280
281    async fn query_near(
282        self,
283        _: context::Context,
284        namespace: String,
285        id: String,
286        radius: f64,
287        limit: usize,
288    ) -> Result<Vec<(CurrentLocation, f64)>, String> {
289        self.db
290            .query_near(&namespace, &id, radius, limit)
291            .map(|results| {
292                results
293                    .into_iter()
294                    .map(|(loc, dist)| {
295                        (
296                            CurrentLocation {
297                                object_id: loc.object_id,
298                                position: loc.position,
299                                metadata: serde_json::to_vec(&loc.metadata).unwrap_or_default(),
300                            },
301                            dist,
302                        )
303                    })
304                    .collect()
305            })
306            .map_err(|e| e.to_string())
307    }
308
309    async fn contains(
310        self,
311        _: context::Context,
312        namespace: String,
313        polygon: Polygon,
314        limit: usize,
315    ) -> Result<Vec<CurrentLocation>, String> {
316        self.db
317            .query_polygon(&namespace, &polygon, limit)
318            .map(|results| {
319                results
320                    .into_iter()
321                    .map(|loc| CurrentLocation {
322                        object_id: loc.object_id,
323                        position: loc.position,
324                        metadata: serde_json::to_vec(&loc.metadata).unwrap_or_default(),
325                    })
326                    .collect()
327            })
328            .map_err(|e| e.to_string())
329    }
330
331    async fn distance(
332        self,
333        _: context::Context,
334        namespace: String,
335        id1: String,
336        id2: String,
337        metric: Option<DistanceMetric>,
338    ) -> Result<Option<f64>, String> {
339        self.db
340            .distance_between(&namespace, &id1, &id2, metric.unwrap_or_default())
341            .map_err(|e| e.to_string())
342    }
343
344    async fn distance_to(
345        self,
346        _: context::Context,
347        namespace: String,
348        id: String,
349        point: Point,
350        metric: Option<DistanceMetric>,
351    ) -> Result<Option<f64>, String> {
352        self.db
353            .distance_to(&namespace, &id, &point, metric.unwrap_or_default())
354            .map_err(|e| e.to_string())
355    }
356
357    async fn convex_hull(
358        self,
359        _: context::Context,
360        namespace: String,
361    ) -> Result<Option<Polygon>, String> {
362        self.db.convex_hull(&namespace).map_err(|e| e.to_string())
363    }
364
365    async fn bounding_box(
366        self,
367        _: context::Context,
368        namespace: String,
369    ) -> Result<Option<spatio_types::bbox::BoundingBox2D>, String> {
370        self.db
371            .bounding_box(&namespace)
372            .map(|opt| opt.map(spatio_types::bbox::BoundingBox2D::from_rect))
373            .map_err(|e| e.to_string())
374    }
375}