1use crate::protocol::{CurrentLocation, LocationUpdate, SpatioService, Stats};
4use crate::reader::Reader;
5use crate::writer::WriteOp;
6use spatio::Spatio;
7use spatio_types::geo::{DistanceMetric, Point, Polygon};
8use spatio_types::point::Point3d;
9use std::sync::Arc;
10use tarpc::context;
11use tokio::sync::mpsc;
12
13#[derive(Clone)]
14pub struct Handler {
15 write_tx: mpsc::Sender<WriteOp>,
16 reader: Reader,
17}
18
19impl Handler {
20 pub fn new(db: Arc<Spatio>, write_tx: mpsc::Sender<WriteOp>) -> Self {
21 let reader = Reader::new(db);
22 Self { write_tx, reader }
23 }
24}
25
26impl SpatioService for Handler {
27 async fn upsert(
28 self,
29 _: context::Context,
30 namespace: String,
31 id: String,
32 point: Point3d,
33 metadata: serde_json::Value,
34 ) -> Result<(), String> {
35 let op = WriteOp::Upsert {
36 namespace,
37 id,
38 point,
39 metadata,
40 };
41
42 self.write_tx
43 .send(op)
44 .await
45 .map_err(|_| "Server storage is overwhelmed or shutting down".to_string())
46 }
47
48 async fn get(
49 self,
50 _: context::Context,
51 namespace: String,
52 id: String,
53 ) -> Result<Option<CurrentLocation>, String> {
54 self.reader.get(&namespace, &id)
55 }
56
57 async fn delete(
58 self,
59 _: context::Context,
60 namespace: String,
61 id: String,
62 ) -> Result<(), String> {
63 let op = WriteOp::Delete { namespace, id };
64 self.write_tx
65 .send(op)
66 .await
67 .map_err(|_| "Server storage is overwhelmed or shutting down".to_string())
68 }
69
70 async fn query_radius(
71 self,
72 _: context::Context,
73 namespace: String,
74 center: Point3d,
75 radius: f64,
76 limit: usize,
77 ) -> Result<Vec<(CurrentLocation, f64)>, String> {
78 self.reader.query_radius(&namespace, ¢er, radius, limit)
79 }
80
81 async fn knn(
82 self,
83 _: context::Context,
84 namespace: String,
85 center: Point3d,
86 k: usize,
87 ) -> Result<Vec<(CurrentLocation, f64)>, String> {
88 self.reader.knn(&namespace, ¢er, k)
89 }
90
91 async fn query_bbox(
92 self,
93 _: context::Context,
94 namespace: String,
95 min_x: f64,
96 min_y: f64,
97 max_x: f64,
98 max_y: f64,
99 limit: usize,
100 ) -> Result<Vec<CurrentLocation>, String> {
101 self.reader
102 .query_bbox(&namespace, min_x, min_y, max_x, max_y, limit)
103 }
104
105 async fn query_cylinder(
106 self,
107 _: context::Context,
108 namespace: String,
109 center: Point,
110 min_z: f64,
111 max_z: f64,
112 radius: f64,
113 limit: usize,
114 ) -> Result<Vec<(CurrentLocation, f64)>, String> {
115 self.reader
116 .query_cylinder(&namespace, center, min_z, max_z, radius, limit)
117 }
118
119 async fn query_trajectory(
120 self,
121 _: context::Context,
122 namespace: String,
123 id: String,
124 start_time: Option<f64>,
125 end_time: Option<f64>,
126 limit: usize,
127 ) -> Result<Vec<LocationUpdate>, String> {
128 let reader = self.reader.clone();
129 tokio::task::spawn_blocking(move || {
130 reader.query_trajectory(&namespace, &id, start_time, end_time, limit)
131 })
132 .await
133 .map_err(|e| format!("Internal error: {}", e))?
134 }
135
136 async fn insert_trajectory(
137 self,
138 _: context::Context,
139 namespace: String,
140 id: String,
141 trajectory: Vec<(f64, Point3d, serde_json::Value)>,
142 ) -> Result<(), String> {
143 let op = WriteOp::InsertTrajectory {
144 namespace,
145 id,
146 trajectory,
147 };
148 self.write_tx
149 .send(op)
150 .await
151 .map_err(|_| "Server storage is overwhelmed or shutting down".to_string())
152 }
153
154 async fn query_bbox_3d(
155 self,
156 _: context::Context,
157 namespace: String,
158 min_x: f64,
159 min_y: f64,
160 min_z: f64,
161 max_x: f64,
162 max_y: f64,
163 max_z: f64,
164 limit: usize,
165 ) -> Result<Vec<CurrentLocation>, String> {
166 self.reader
167 .query_bbox_3d(&namespace, min_x, min_y, min_z, max_x, max_y, max_z, limit)
168 }
169
170 async fn query_near(
171 self,
172 _: context::Context,
173 namespace: String,
174 id: String,
175 radius: f64,
176 limit: usize,
177 ) -> Result<Vec<(CurrentLocation, f64)>, String> {
178 self.reader.query_near(&namespace, &id, radius, limit)
179 }
180
181 async fn contains(
182 self,
183 _: context::Context,
184 namespace: String,
185 polygon: Polygon,
186 limit: usize,
187 ) -> Result<Vec<CurrentLocation>, String> {
188 self.reader.contains(&namespace, &polygon, limit)
189 }
190
191 async fn distance(
192 self,
193 _: context::Context,
194 namespace: String,
195 id1: String,
196 id2: String,
197 metric: Option<DistanceMetric>,
198 ) -> Result<Option<f64>, String> {
199 self.reader.distance(&namespace, &id1, &id2, metric)
200 }
201
202 async fn distance_to(
203 self,
204 _: context::Context,
205 namespace: String,
206 id: String,
207 point: Point,
208 metric: Option<DistanceMetric>,
209 ) -> Result<Option<f64>, String> {
210 self.reader.distance_to(&namespace, &id, &point, metric)
211 }
212
213 async fn convex_hull(
214 self,
215 _: context::Context,
216 namespace: String,
217 ) -> Result<Option<Polygon>, String> {
218 self.reader.convex_hull(&namespace)
219 }
220
221 async fn bounding_box(
222 self,
223 _: context::Context,
224 namespace: String,
225 ) -> Result<Option<spatio_types::bbox::BoundingBox2D>, String> {
226 self.reader.bounding_box(&namespace)
227 }
228
229 async fn stats(self, _: context::Context) -> Stats {
230 self.reader.stats()
231 }
232}