spatio_server/
handler.rs

1//! Handler implementation for Spatio RPC service
2
3use crate::protocol::{CurrentLocation, LocationUpdate, SpatioService, Stats};
4use crate::reader::Reader;
5use crate::writer::WriteOp;
6use spatio::Spatio;
7use spatio_types::geo::{DistanceMetric, Point, Polygon};
8use spatio_types::point::Point3d;
9use std::sync::Arc;
10use tarpc::context;
11use tokio::sync::mpsc;
12
13#[derive(Clone)]
14pub struct Handler {
15    write_tx: mpsc::Sender<WriteOp>,
16    reader: Reader,
17}
18
19impl Handler {
20    pub fn new(db: Arc<Spatio>, write_tx: mpsc::Sender<WriteOp>) -> Self {
21        let reader = Reader::new(db);
22        Self { write_tx, reader }
23    }
24}
25
26impl SpatioService for Handler {
27    async fn upsert(
28        self,
29        _: context::Context,
30        namespace: String,
31        id: String,
32        point: Point3d,
33        metadata: serde_json::Value,
34    ) -> Result<(), String> {
35        let op = WriteOp::Upsert {
36            namespace,
37            id,
38            point,
39            metadata,
40        };
41
42        self.write_tx
43            .send(op)
44            .await
45            .map_err(|_| "Server storage is overwhelmed or shutting down".to_string())
46    }
47
48    async fn get(
49        self,
50        _: context::Context,
51        namespace: String,
52        id: String,
53    ) -> Result<Option<CurrentLocation>, String> {
54        self.reader.get(&namespace, &id)
55    }
56
57    async fn delete(
58        self,
59        _: context::Context,
60        namespace: String,
61        id: String,
62    ) -> Result<(), String> {
63        let op = WriteOp::Delete { namespace, id };
64        self.write_tx
65            .send(op)
66            .await
67            .map_err(|_| "Server storage is overwhelmed or shutting down".to_string())
68    }
69
70    async fn query_radius(
71        self,
72        _: context::Context,
73        namespace: String,
74        center: Point3d,
75        radius: f64,
76        limit: usize,
77    ) -> Result<Vec<(CurrentLocation, f64)>, String> {
78        self.reader.query_radius(&namespace, &center, radius, limit)
79    }
80
81    async fn knn(
82        self,
83        _: context::Context,
84        namespace: String,
85        center: Point3d,
86        k: usize,
87    ) -> Result<Vec<(CurrentLocation, f64)>, String> {
88        self.reader.knn(&namespace, &center, k)
89    }
90
91    async fn query_bbox(
92        self,
93        _: context::Context,
94        namespace: String,
95        min_x: f64,
96        min_y: f64,
97        max_x: f64,
98        max_y: f64,
99        limit: usize,
100    ) -> Result<Vec<CurrentLocation>, String> {
101        self.reader
102            .query_bbox(&namespace, min_x, min_y, max_x, max_y, limit)
103    }
104
105    async fn query_cylinder(
106        self,
107        _: context::Context,
108        namespace: String,
109        center: Point,
110        min_z: f64,
111        max_z: f64,
112        radius: f64,
113        limit: usize,
114    ) -> Result<Vec<(CurrentLocation, f64)>, String> {
115        self.reader
116            .query_cylinder(&namespace, center, min_z, max_z, radius, limit)
117    }
118
119    async fn query_trajectory(
120        self,
121        _: context::Context,
122        namespace: String,
123        id: String,
124        start_time: Option<f64>,
125        end_time: Option<f64>,
126        limit: usize,
127    ) -> Result<Vec<LocationUpdate>, String> {
128        let reader = self.reader.clone();
129        tokio::task::spawn_blocking(move || {
130            reader.query_trajectory(&namespace, &id, start_time, end_time, limit)
131        })
132        .await
133        .map_err(|e| format!("Internal error: {}", e))?
134    }
135
136    async fn insert_trajectory(
137        self,
138        _: context::Context,
139        namespace: String,
140        id: String,
141        trajectory: Vec<(f64, Point3d, serde_json::Value)>,
142    ) -> Result<(), String> {
143        let op = WriteOp::InsertTrajectory {
144            namespace,
145            id,
146            trajectory,
147        };
148        self.write_tx
149            .send(op)
150            .await
151            .map_err(|_| "Server storage is overwhelmed or shutting down".to_string())
152    }
153
154    async fn query_bbox_3d(
155        self,
156        _: context::Context,
157        namespace: String,
158        min_x: f64,
159        min_y: f64,
160        min_z: f64,
161        max_x: f64,
162        max_y: f64,
163        max_z: f64,
164        limit: usize,
165    ) -> Result<Vec<CurrentLocation>, String> {
166        self.reader
167            .query_bbox_3d(&namespace, min_x, min_y, min_z, max_x, max_y, max_z, limit)
168    }
169
170    async fn query_near(
171        self,
172        _: context::Context,
173        namespace: String,
174        id: String,
175        radius: f64,
176        limit: usize,
177    ) -> Result<Vec<(CurrentLocation, f64)>, String> {
178        self.reader.query_near(&namespace, &id, radius, limit)
179    }
180
181    async fn contains(
182        self,
183        _: context::Context,
184        namespace: String,
185        polygon: Polygon,
186        limit: usize,
187    ) -> Result<Vec<CurrentLocation>, String> {
188        self.reader.contains(&namespace, &polygon, limit)
189    }
190
191    async fn distance(
192        self,
193        _: context::Context,
194        namespace: String,
195        id1: String,
196        id2: String,
197        metric: Option<DistanceMetric>,
198    ) -> Result<Option<f64>, String> {
199        self.reader.distance(&namespace, &id1, &id2, metric)
200    }
201
202    async fn distance_to(
203        self,
204        _: context::Context,
205        namespace: String,
206        id: String,
207        point: Point,
208        metric: Option<DistanceMetric>,
209    ) -> Result<Option<f64>, String> {
210        self.reader.distance_to(&namespace, &id, &point, metric)
211    }
212
213    async fn convex_hull(
214        self,
215        _: context::Context,
216        namespace: String,
217    ) -> Result<Option<Polygon>, String> {
218        self.reader.convex_hull(&namespace)
219    }
220
221    async fn bounding_box(
222        self,
223        _: context::Context,
224        namespace: String,
225    ) -> Result<Option<spatio_types::bbox::BoundingBox2D>, String> {
226        self.reader.bounding_box(&namespace)
227    }
228
229    async fn stats(self, _: context::Context) -> Stats {
230        self.reader.stats()
231    }
232}