spatio_server/
handler.rs

1use crate::rpc::{Command, ResponsePayload, ResponseStatus};
2use spatio::Spatio;
3use std::sync::Arc;
4
5pub struct Handler {
6    db: Arc<Spatio>,
7}
8
9impl Handler {
10    pub fn new(db: Arc<Spatio>) -> Self {
11        Self { db }
12    }
13
14    pub async fn handle(&self, cmd: Command) -> (ResponseStatus, ResponsePayload) {
15        match cmd {
16            Command::Upsert {
17                namespace,
18                id,
19                point,
20                metadata,
21                opts,
22            } => {
23                let metadata_json =
24                    serde_json::from_slice(&metadata).unwrap_or(serde_json::Value::Null);
25                match self.db.upsert(&namespace, &id, point, metadata_json, opts) {
26                    Ok(_) => (ResponseStatus::Ok, ResponsePayload::Ok),
27                    Err(e) => (ResponseStatus::Error, ResponsePayload::Error(e.to_string())),
28                }
29            }
30            Command::Get { namespace, id } => match self.db.get(&namespace, &id) {
31                Ok(Some(loc)) => (
32                    ResponseStatus::Ok,
33                    ResponsePayload::Object {
34                        id: loc.object_id,
35                        point: loc.position,
36                        metadata: serde_json::to_vec(&loc.metadata).unwrap_or_default(),
37                    },
38                ),
39                Ok(None) => (
40                    ResponseStatus::Error,
41                    ResponsePayload::Error("Not found".into()),
42                ),
43                Err(e) => (ResponseStatus::Error, ResponsePayload::Error(e.to_string())),
44            },
45            Command::QueryRadius {
46                namespace,
47                center,
48                radius,
49                limit,
50            } => match self.db.query_radius(&namespace, &center, radius, limit) {
51                Ok(results) => {
52                    let formatted = results
53                        .into_iter()
54                        .map(|(loc, dist)| {
55                            (
56                                loc.object_id,
57                                loc.position,
58                                serde_json::to_vec(&loc.metadata).unwrap_or_default(),
59                                dist,
60                            )
61                        })
62                        .collect();
63                    (ResponseStatus::Ok, ResponsePayload::Objects(formatted))
64                }
65                Err(e) => (ResponseStatus::Error, ResponsePayload::Error(e.to_string())),
66            },
67            Command::Knn {
68                namespace,
69                center,
70                k,
71            } => match self.db.knn(&namespace, &center, k) {
72                Ok(results) => {
73                    let formatted = results
74                        .into_iter()
75                        .map(|(loc, dist)| {
76                            (
77                                loc.object_id,
78                                loc.position,
79                                serde_json::to_vec(&loc.metadata).unwrap_or_default(),
80                                dist,
81                            )
82                        })
83                        .collect();
84                    (ResponseStatus::Ok, ResponsePayload::Objects(formatted))
85                }
86                Err(e) => (ResponseStatus::Error, ResponsePayload::Error(e.to_string())),
87            },
88            Command::Stats => {
89                let stats = self.db.stats();
90                (ResponseStatus::Ok, ResponsePayload::Stats(stats))
91            }
92            Command::Close => match self.db.close() {
93                Ok(_) => (ResponseStatus::Ok, ResponsePayload::Ok),
94                Err(e) => (ResponseStatus::Error, ResponsePayload::Error(e.to_string())),
95            },
96            Command::Delete { namespace, id } => match self.db.delete(&namespace, &id) {
97                Ok(_) => (ResponseStatus::Ok, ResponsePayload::Ok),
98                Err(e) => (ResponseStatus::Error, ResponsePayload::Error(e.to_string())),
99            },
100            Command::QueryBbox {
101                namespace,
102                min_x,
103                min_y,
104                max_x,
105                max_y,
106                limit,
107            } => match self
108                .db
109                .query_bbox(&namespace, min_x, min_y, max_x, max_y, limit)
110            {
111                Ok(results) => {
112                    let formatted = results
113                        .into_iter()
114                        .map(|loc| {
115                            (
116                                loc.object_id,
117                                loc.position,
118                                serde_json::to_vec(&loc.metadata).unwrap_or_default(),
119                            )
120                        })
121                        .collect();
122                    (ResponseStatus::Ok, ResponsePayload::ObjectList(formatted))
123                }
124                Err(e) => (ResponseStatus::Error, ResponsePayload::Error(e.to_string())),
125            },
126            Command::QueryCylinder {
127                namespace,
128                center_x,
129                center_y,
130                min_z,
131                max_z,
132                radius,
133                limit,
134            } => {
135                let center = spatio_types::geo::Point::new(center_x, center_y);
136                match self
137                    .db
138                    .query_within_cylinder(&namespace, center, min_z, max_z, radius, limit)
139                {
140                    Ok(results) => {
141                        let formatted = results
142                            .into_iter()
143                            .map(|(loc, dist)| {
144                                (
145                                    loc.object_id,
146                                    loc.position,
147                                    serde_json::to_vec(&loc.metadata).unwrap_or_default(),
148                                    dist,
149                                )
150                            })
151                            .collect();
152                        (ResponseStatus::Ok, ResponsePayload::Objects(formatted))
153                    }
154                    Err(e) => (ResponseStatus::Error, ResponsePayload::Error(e.to_string())),
155                }
156            }
157            Command::QueryTrajectory {
158                namespace,
159                id,
160                start_time,
161                end_time,
162                limit,
163            } => match self
164                .db
165                .query_trajectory(&namespace, &id, start_time, end_time, limit)
166            {
167                Ok(updates) => {
168                    let mut formatted = Vec::with_capacity(updates.len());
169                    for upd in updates {
170                        match serde_json::to_vec(&upd.metadata) {
171                            Ok(metadata_bytes) => {
172                                formatted.push(crate::rpc::LocationUpdate {
173                                    timestamp: upd.timestamp,
174                                    position: upd.position,
175                                    metadata: metadata_bytes,
176                                });
177                            }
178                            Err(e) => {
179                                return (
180                                    ResponseStatus::Error,
181                                    ResponsePayload::Error(format!(
182                                        "Failed to serialize trajectory metadata: {}",
183                                        e
184                                    )),
185                                );
186                            }
187                        }
188                    }
189                    (ResponseStatus::Ok, ResponsePayload::Trajectory(formatted))
190                }
191                Err(e) => (ResponseStatus::Error, ResponsePayload::Error(e.to_string())),
192            },
193            Command::InsertTrajectory {
194                namespace,
195                id,
196                trajectory,
197            } => match self.db.insert_trajectory(&namespace, &id, &trajectory) {
198                Ok(_) => (ResponseStatus::Ok, ResponsePayload::Ok),
199                Err(e) => (ResponseStatus::Error, ResponsePayload::Error(e.to_string())),
200            },
201            Command::QueryBbox3d {
202                namespace,
203                min_x,
204                min_y,
205                min_z,
206                max_x,
207                max_y,
208                max_z,
209                limit,
210            } => match self
211                .db
212                .query_within_bbox_3d(&namespace, min_x, min_y, min_z, max_x, max_y, max_z, limit)
213            {
214                Ok(results) => {
215                    let formatted = results
216                        .into_iter()
217                        .map(|loc| {
218                            (
219                                loc.object_id,
220                                loc.position,
221                                serde_json::to_vec(&loc.metadata).unwrap_or_default(),
222                            )
223                        })
224                        .collect();
225                    (ResponseStatus::Ok, ResponsePayload::ObjectList(formatted))
226                }
227                Err(e) => (ResponseStatus::Error, ResponsePayload::Error(e.to_string())),
228            },
229            Command::QueryNear {
230                namespace,
231                id,
232                radius,
233                limit,
234            } => match self.db.query_near(&namespace, &id, radius, limit) {
235                Ok(results) => {
236                    let formatted = results
237                        .into_iter()
238                        .map(|(loc, dist)| {
239                            (
240                                loc.object_id,
241                                loc.position,
242                                serde_json::to_vec(&loc.metadata).unwrap_or_default(),
243                                dist,
244                            )
245                        })
246                        .collect();
247                    (ResponseStatus::Ok, ResponsePayload::Objects(formatted))
248                }
249                Err(e) => (ResponseStatus::Error, ResponsePayload::Error(e.to_string())),
250            },
251        }
252    }
253}