1use crate::protocol::{CurrentLocation, LocationUpdate, SpatioService, Stats, UpsertOptions};
2use spatio::Spatio;
3use spatio_types::geo::{DistanceMetric, Point, Polygon};
4use spatio_types::point::Point3d;
5use std::sync::Arc;
6use tarpc::context;
7
/// RPC handler that implements `SpatioService` on top of a shared `Spatio` database.
///
/// The handler is `Clone`; clones share the same database through the `Arc`.
#[derive(Clone)]
pub struct Handler {
    /// Shared handle to the database that every request operates on.
    db: Arc<Spatio>,
}
12
13impl Handler {
14 pub fn new(db: Arc<Spatio>) -> Self {
15 Self { db }
16 }
17}
18
19impl SpatioService for Handler {
20 async fn upsert(
21 self,
22 _: context::Context,
23 namespace: String,
24 id: String,
25 point: Point3d,
26 metadata: serde_json::Value,
27 opts: Option<UpsertOptions>,
28 ) -> Result<(), String> {
29 let db = self.db.clone();
30 tokio::task::spawn_blocking(move || {
31 let db_opts = opts.map(|o| spatio::config::SetOptions {
32 ttl: Some(o.ttl),
33 ..Default::default()
34 });
35
36 db.upsert(&namespace, &id, point, metadata, db_opts)
37 .map_err(|e| e.to_string())
38 })
39 .await
40 .map_err(|e| format!("Internal error: {}", e))?
41 }
42
43 async fn get(
44 self,
45 _: context::Context,
46 namespace: String,
47 id: String,
48 ) -> Result<Option<CurrentLocation>, String> {
49 let db = self.db.clone();
50 tokio::task::spawn_blocking(move || match db.get(&namespace, &id) {
51 Ok(Some(loc)) => Ok(Some(CurrentLocation {
52 object_id: loc.object_id,
53 position: loc.position,
54 metadata: serde_json::to_vec(&loc.metadata).unwrap_or_default(),
55 })),
56 Ok(None) => Ok(None),
57 Err(e) => Err(e.to_string()),
58 })
59 .await
60 .map_err(|e| format!("Internal error: {}", e))?
61 }
62
63 async fn delete(
64 self,
65 _: context::Context,
66 namespace: String,
67 id: String,
68 ) -> Result<(), String> {
69 let db = self.db.clone();
70 tokio::task::spawn_blocking(move || db.delete(&namespace, &id).map_err(|e| e.to_string()))
71 .await
72 .map_err(|e| format!("Internal error: {}", e))?
73 }
74
75 async fn query_radius(
76 self,
77 _: context::Context,
78 namespace: String,
79 center: Point3d,
80 radius: f64,
81 limit: usize,
82 ) -> Result<Vec<(CurrentLocation, f64)>, String> {
83 let db = self.db.clone();
84 tokio::task::spawn_blocking(move || {
85 db.query_radius(&namespace, ¢er, radius, limit)
86 .map(|results| {
87 results
88 .into_iter()
89 .map(|(loc, dist)| {
90 (
91 CurrentLocation {
92 object_id: loc.object_id,
93 position: loc.position,
94 metadata: serde_json::to_vec(&loc.metadata).unwrap_or_default(),
95 },
96 dist,
97 )
98 })
99 .collect()
100 })
101 .map_err(|e| e.to_string())
102 })
103 .await
104 .map_err(|e| format!("Internal error: {}", e))?
105 }
106
107 async fn knn(
108 self,
109 _: context::Context,
110 namespace: String,
111 center: Point3d,
112 k: usize,
113 ) -> Result<Vec<(CurrentLocation, f64)>, String> {
114 let db = self.db.clone();
115 tokio::task::spawn_blocking(move || {
116 db.knn(&namespace, ¢er, k)
117 .map(|results| {
118 results
119 .into_iter()
120 .map(|(loc, dist)| {
121 (
122 CurrentLocation {
123 object_id: loc.object_id,
124 position: loc.position,
125 metadata: serde_json::to_vec(&loc.metadata).unwrap_or_default(),
126 },
127 dist,
128 )
129 })
130 .collect()
131 })
132 .map_err(|e| e.to_string())
133 })
134 .await
135 .map_err(|e| format!("Internal error: {}", e))?
136 }
137
138 async fn stats(self, _: context::Context) -> Stats {
139 let db = self.db.clone();
140 tokio::task::spawn_blocking(move || {
141 let s = db.stats();
142 Stats {
143 object_count: s.hot_state_objects,
144 memory_usage_bytes: s.memory_usage_bytes,
145 }
146 })
147 .await
148 .unwrap()
149 }
150
151 async fn query_bbox(
152 self,
153 _: context::Context,
154 namespace: String,
155 min_x: f64,
156 min_y: f64,
157 max_x: f64,
158 max_y: f64,
159 limit: usize,
160 ) -> Result<Vec<CurrentLocation>, String> {
161 let db = self.db.clone();
162 tokio::task::spawn_blocking(move || {
163 db.query_bbox(&namespace, min_x, min_y, max_x, max_y, limit)
164 .map(|results| {
165 results
166 .into_iter()
167 .map(|loc| CurrentLocation {
168 object_id: loc.object_id,
169 position: loc.position,
170 metadata: serde_json::to_vec(&loc.metadata).unwrap_or_default(),
171 })
172 .collect()
173 })
174 .map_err(|e| e.to_string())
175 })
176 .await
177 .map_err(|e| format!("Internal error: {}", e))?
178 }
179
180 async fn query_cylinder(
181 self,
182 _: context::Context,
183 namespace: String,
184 center: Point,
185 min_z: f64,
186 max_z: f64,
187 radius: f64,
188 limit: usize,
189 ) -> Result<Vec<(CurrentLocation, f64)>, String> {
190 let db = self.db.clone();
191 tokio::task::spawn_blocking(move || {
192 db.query_within_cylinder(&namespace, center, min_z, max_z, radius, limit)
193 .map(|results| {
194 results
195 .into_iter()
196 .map(|(loc, dist)| {
197 (
198 CurrentLocation {
199 object_id: loc.object_id,
200 position: loc.position,
201 metadata: serde_json::to_vec(&loc.metadata).unwrap_or_default(),
202 },
203 dist,
204 )
205 })
206 .collect()
207 })
208 .map_err(|e| e.to_string())
209 })
210 .await
211 .map_err(|e| format!("Internal error: {}", e))?
212 }
213
214 async fn query_trajectory(
215 self,
216 _: context::Context,
217 namespace: String,
218 id: String,
219 start_time: Option<f64>,
220 end_time: Option<f64>,
221 limit: usize,
222 ) -> Result<Vec<LocationUpdate>, String> {
223 let db = self.db.clone();
224 tokio::task::spawn_blocking(move || {
225 let start = start_time
226 .map(|t| std::time::UNIX_EPOCH + std::time::Duration::from_secs_f64(t))
227 .unwrap_or(std::time::UNIX_EPOCH);
228 let end = end_time
229 .map(|t| std::time::UNIX_EPOCH + std::time::Duration::from_secs_f64(t))
230 .unwrap_or_else(std::time::SystemTime::now);
231
232 db.query_trajectory(&namespace, &id, start, end, limit)
233 .map(|results| {
234 results
235 .into_iter()
236 .map(|upd| {
237 let timestamp = upd
238 .timestamp
239 .duration_since(std::time::UNIX_EPOCH)
240 .unwrap_or_default()
241 .as_secs_f64();
242
243 LocationUpdate {
244 timestamp,
245 position: upd.position,
246 metadata: serde_json::to_vec(&upd.metadata).unwrap_or_default(),
247 }
248 })
249 .collect()
250 })
251 .map_err(|e| e.to_string())
252 })
253 .await
254 .map_err(|e| format!("Internal error: {}", e))?
255 }
256
257 async fn insert_trajectory(
258 self,
259 _: context::Context,
260 namespace: String,
261 id: String,
262 trajectory: Vec<(f64, Point3d, serde_json::Value)>,
263 ) -> Result<(), String> {
264 let db = self.db.clone();
265 tokio::task::spawn_blocking(move || {
266 let updates: Vec<spatio::config::TemporalPoint> = trajectory
267 .into_iter()
268 .map(|(ts, p, _meta)| {
269 let timestamp = std::time::UNIX_EPOCH + std::time::Duration::from_secs_f64(ts);
272 spatio::config::TemporalPoint::new(*p.point_2d(), timestamp)
273 })
274 .collect();
275
276 db.insert_trajectory(&namespace, &id, &updates)
277 .map_err(|e| e.to_string())
278 })
279 .await
280 .map_err(|e| format!("Internal error: {}", e))?
281 }
282
283 async fn query_bbox_3d(
284 self,
285 _: context::Context,
286 namespace: String,
287 min_x: f64,
288 min_y: f64,
289 min_z: f64,
290 max_x: f64,
291 max_y: f64,
292 max_z: f64,
293 limit: usize,
294 ) -> Result<Vec<CurrentLocation>, String> {
295 let db = self.db.clone();
296 tokio::task::spawn_blocking(move || {
297 db.query_within_bbox_3d(&namespace, min_x, min_y, min_z, max_x, max_y, max_z, limit)
298 .map(|results| {
299 results
300 .into_iter()
301 .map(|loc| CurrentLocation {
302 object_id: loc.object_id,
303 position: loc.position,
304 metadata: serde_json::to_vec(&loc.metadata).unwrap_or_default(),
305 })
306 .collect()
307 })
308 .map_err(|e| e.to_string())
309 })
310 .await
311 .map_err(|e| format!("Internal error: {}", e))?
312 }
313
314 async fn query_near(
315 self,
316 _: context::Context,
317 namespace: String,
318 id: String,
319 radius: f64,
320 limit: usize,
321 ) -> Result<Vec<(CurrentLocation, f64)>, String> {
322 let db = self.db.clone();
323 tokio::task::spawn_blocking(move || {
324 db.query_near(&namespace, &id, radius, limit)
325 .map(|results| {
326 results
327 .into_iter()
328 .map(|(loc, dist)| {
329 (
330 CurrentLocation {
331 object_id: loc.object_id,
332 position: loc.position,
333 metadata: serde_json::to_vec(&loc.metadata).unwrap_or_default(),
334 },
335 dist,
336 )
337 })
338 .collect()
339 })
340 .map_err(|e| e.to_string())
341 })
342 .await
343 .map_err(|e| format!("Internal error: {}", e))?
344 }
345
346 async fn contains(
347 self,
348 _: context::Context,
349 namespace: String,
350 polygon: Polygon,
351 limit: usize,
352 ) -> Result<Vec<CurrentLocation>, String> {
353 let db = self.db.clone();
354 tokio::task::spawn_blocking(move || {
355 db.query_polygon(&namespace, &polygon, limit)
356 .map(|results| {
357 results
358 .into_iter()
359 .map(|loc| CurrentLocation {
360 object_id: loc.object_id,
361 position: loc.position,
362 metadata: serde_json::to_vec(&loc.metadata).unwrap_or_default(),
363 })
364 .collect()
365 })
366 .map_err(|e| e.to_string())
367 })
368 .await
369 .map_err(|e| format!("Internal error: {}", e))?
370 }
371
372 async fn distance(
373 self,
374 _: context::Context,
375 namespace: String,
376 id1: String,
377 id2: String,
378 metric: Option<DistanceMetric>,
379 ) -> Result<Option<f64>, String> {
380 let db = self.db.clone();
381 tokio::task::spawn_blocking(move || {
382 db.distance_between(&namespace, &id1, &id2, metric.unwrap_or_default())
383 .map_err(|e| e.to_string())
384 })
385 .await
386 .map_err(|e| format!("Internal error: {}", e))?
387 }
388
389 async fn distance_to(
390 self,
391 _: context::Context,
392 namespace: String,
393 id: String,
394 point: Point,
395 metric: Option<DistanceMetric>,
396 ) -> Result<Option<f64>, String> {
397 let db = self.db.clone();
398 tokio::task::spawn_blocking(move || {
399 db.distance_to(&namespace, &id, &point, metric.unwrap_or_default())
400 .map_err(|e| e.to_string())
401 })
402 .await
403 .map_err(|e| format!("Internal error: {}", e))?
404 }
405
406 async fn convex_hull(
407 self,
408 _: context::Context,
409 namespace: String,
410 ) -> Result<Option<Polygon>, String> {
411 let db = self.db.clone();
412 tokio::task::spawn_blocking(move || db.convex_hull(&namespace).map_err(|e| e.to_string()))
413 .await
414 .map_err(|e| format!("Internal error: {}", e))?
415 }
416
417 async fn bounding_box(
418 self,
419 _: context::Context,
420 namespace: String,
421 ) -> Result<Option<spatio_types::bbox::BoundingBox2D>, String> {
422 let db = self.db.clone();
423 tokio::task::spawn_blocking(move || {
424 db.bounding_box(&namespace)
425 .map(|opt| opt.map(spatio_types::bbox::BoundingBox2D::from_rect))
426 .map_err(|e| e.to_string())
427 })
428 .await
429 .map_err(|e| format!("Internal error: {}", e))?
430 }
431}