1use crate::rpc::{Command, ResponsePayload, ResponseStatus};
2use spatio::Spatio;
3use std::sync::Arc;
4
5pub struct Handler {
6 db: Arc<Spatio>,
7}
8
9impl Handler {
10 pub fn new(db: Arc<Spatio>) -> Self {
11 Self { db }
12 }
13
14 pub async fn handle(&self, cmd: Command) -> (ResponseStatus, ResponsePayload) {
15 let db = self.db.clone();
16 let result = tokio::task::spawn_blocking(move || match cmd {
17 Command::Upsert {
18 namespace,
19 id,
20 point,
21 metadata,
22 opts,
23 } => {
24 let metadata_json =
25 serde_json::from_slice(&metadata).unwrap_or(serde_json::Value::Null);
26 match db.upsert(&namespace, &id, point, metadata_json, opts) {
27 Ok(_) => (ResponseStatus::Ok, ResponsePayload::Ok),
28 Err(e) => (ResponseStatus::Error, ResponsePayload::Error(e.to_string())),
29 }
30 }
31 Command::Get { namespace, id } => match db.get(&namespace, &id) {
32 Ok(Some(loc)) => (
33 ResponseStatus::Ok,
34 ResponsePayload::Object {
35 id: loc.object_id,
36 point: loc.position,
37 metadata: serde_json::to_vec(&loc.metadata).unwrap_or_default(),
38 },
39 ),
40 Ok(None) => (
41 ResponseStatus::Error,
42 ResponsePayload::Error("Not found".into()),
43 ),
44 Err(e) => (ResponseStatus::Error, ResponsePayload::Error(e.to_string())),
45 },
46 Command::QueryRadius {
47 namespace,
48 center,
49 radius,
50 limit,
51 } => match db.query_radius(&namespace, ¢er, radius, limit) {
52 Ok(results) => {
53 let formatted = results
54 .into_iter()
55 .map(|(loc, dist)| {
56 (
57 loc.object_id,
58 loc.position,
59 serde_json::to_vec(&loc.metadata).unwrap_or_default(),
60 dist,
61 )
62 })
63 .collect();
64 (ResponseStatus::Ok, ResponsePayload::Objects(formatted))
65 }
66 Err(e) => (ResponseStatus::Error, ResponsePayload::Error(e.to_string())),
67 },
68 Command::Knn {
69 namespace,
70 center,
71 k,
72 } => match db.knn(&namespace, ¢er, k) {
73 Ok(results) => {
74 let formatted = results
75 .into_iter()
76 .map(|(loc, dist)| {
77 (
78 loc.object_id,
79 loc.position,
80 serde_json::to_vec(&loc.metadata).unwrap_or_default(),
81 dist,
82 )
83 })
84 .collect();
85 (ResponseStatus::Ok, ResponsePayload::Objects(formatted))
86 }
87 Err(e) => (ResponseStatus::Error, ResponsePayload::Error(e.to_string())),
88 },
89 Command::Stats => {
90 let stats = db.stats();
91 (ResponseStatus::Ok, ResponsePayload::Stats(stats))
92 }
93 Command::Close => match db.close() {
94 Ok(_) => (ResponseStatus::Ok, ResponsePayload::Ok),
95 Err(e) => (ResponseStatus::Error, ResponsePayload::Error(e.to_string())),
96 },
97 Command::Delete { namespace, id } => match db.delete(&namespace, &id) {
98 Ok(_) => (ResponseStatus::Ok, ResponsePayload::Ok),
99 Err(e) => (ResponseStatus::Error, ResponsePayload::Error(e.to_string())),
100 },
101 Command::QueryBbox {
102 namespace,
103 min_x,
104 min_y,
105 max_x,
106 max_y,
107 limit,
108 } => match db.query_bbox(&namespace, min_x, min_y, max_x, max_y, limit) {
109 Ok(results) => {
110 let formatted = results
111 .into_iter()
112 .map(|loc| {
113 (
114 loc.object_id,
115 loc.position,
116 serde_json::to_vec(&loc.metadata).unwrap_or_default(),
117 )
118 })
119 .collect();
120 (ResponseStatus::Ok, ResponsePayload::ObjectList(formatted))
121 }
122 Err(e) => (ResponseStatus::Error, ResponsePayload::Error(e.to_string())),
123 },
124 Command::QueryCylinder {
125 namespace,
126 center_x,
127 center_y,
128 min_z,
129 max_z,
130 radius,
131 limit,
132 } => {
133 let center = spatio_types::geo::Point::new(center_x, center_y);
134 match db.query_within_cylinder(&namespace, center, min_z, max_z, radius, limit) {
135 Ok(results) => {
136 let formatted = results
137 .into_iter()
138 .map(|(loc, dist)| {
139 (
140 loc.object_id,
141 loc.position,
142 serde_json::to_vec(&loc.metadata).unwrap_or_default(),
143 dist,
144 )
145 })
146 .collect();
147 (ResponseStatus::Ok, ResponsePayload::Objects(formatted))
148 }
149 Err(e) => (ResponseStatus::Error, ResponsePayload::Error(e.to_string())),
150 }
151 }
152 Command::QueryTrajectory {
153 namespace,
154 id,
155 start_time,
156 end_time,
157 limit,
158 } => match db.query_trajectory(&namespace, &id, start_time, end_time, limit) {
159 Ok(updates) => {
160 let mut formatted = Vec::with_capacity(updates.len());
161 let mut error = None;
162 for upd in updates {
163 match serde_json::to_vec(&upd.metadata) {
164 Ok(metadata_bytes) => {
165 formatted.push(crate::rpc::LocationUpdate {
166 timestamp: upd.timestamp,
167 position: upd.position,
168 metadata: metadata_bytes,
169 });
170 }
171 Err(e) => {
172 error = Some(e);
173 break;
174 }
175 }
176 }
177 if let Some(e) = error {
178 (
179 ResponseStatus::Error,
180 ResponsePayload::Error(format!(
181 "Failed to serialize trajectory metadata: {}",
182 e
183 )),
184 )
185 } else {
186 (ResponseStatus::Ok, ResponsePayload::Trajectory(formatted))
187 }
188 }
189 Err(e) => (ResponseStatus::Error, ResponsePayload::Error(e.to_string())),
190 },
191 Command::InsertTrajectory {
192 namespace,
193 id,
194 trajectory,
195 } => match db.insert_trajectory(&namespace, &id, &trajectory) {
196 Ok(_) => (ResponseStatus::Ok, ResponsePayload::Ok),
197 Err(e) => (ResponseStatus::Error, ResponsePayload::Error(e.to_string())),
198 },
199 Command::QueryBbox3d {
200 namespace,
201 min_x,
202 min_y,
203 min_z,
204 max_x,
205 max_y,
206 max_z,
207 limit,
208 } => match db
209 .query_within_bbox_3d(&namespace, min_x, min_y, min_z, max_x, max_y, max_z, limit)
210 {
211 Ok(results) => {
212 let formatted = results
213 .into_iter()
214 .map(|loc| {
215 (
216 loc.object_id,
217 loc.position,
218 serde_json::to_vec(&loc.metadata).unwrap_or_default(),
219 )
220 })
221 .collect();
222 (ResponseStatus::Ok, ResponsePayload::ObjectList(formatted))
223 }
224 Err(e) => (ResponseStatus::Error, ResponsePayload::Error(e.to_string())),
225 },
226 Command::QueryNear {
227 namespace,
228 id,
229 radius,
230 limit,
231 } => match db.query_near(&namespace, &id, radius, limit) {
232 Ok(results) => {
233 let formatted = results
234 .into_iter()
235 .map(|(loc, dist)| {
236 (
237 loc.object_id,
238 loc.position,
239 serde_json::to_vec(&loc.metadata).unwrap_or_default(),
240 dist,
241 )
242 })
243 .collect();
244 (ResponseStatus::Ok, ResponsePayload::Objects(formatted))
245 }
246 Err(e) => (ResponseStatus::Error, ResponsePayload::Error(e.to_string())),
247 },
248 })
249 .await;
250
251 match result {
252 Ok(response) => response,
253 Err(e) => (
254 ResponseStatus::Error,
255 ResponsePayload::Error(format!("Task failed: {}", e)),
256 ),
257 }
258 }
259}