spatio_server/
handler.rs

1use crate::rpc::{Command, ResponsePayload, ResponseStatus};
2use spatio::Spatio;
3use std::sync::Arc;
4
5pub struct Handler {
6    db: Arc<Spatio>,
7}
8
9impl Handler {
10    pub fn new(db: Arc<Spatio>) -> Self {
11        Self { db }
12    }
13
14    pub async fn handle(&self, cmd: Command) -> (ResponseStatus, ResponsePayload) {
15        let db = self.db.clone();
16        let result = tokio::task::spawn_blocking(move || match cmd {
17            Command::Upsert {
18                namespace,
19                id,
20                point,
21                metadata,
22                opts,
23            } => {
24                let metadata_json =
25                    serde_json::from_slice(&metadata).unwrap_or(serde_json::Value::Null);
26                match db.upsert(&namespace, &id, point, metadata_json, opts) {
27                    Ok(_) => (ResponseStatus::Ok, ResponsePayload::Ok),
28                    Err(e) => (ResponseStatus::Error, ResponsePayload::Error(e.to_string())),
29                }
30            }
31            Command::Get { namespace, id } => match db.get(&namespace, &id) {
32                Ok(Some(loc)) => (
33                    ResponseStatus::Ok,
34                    ResponsePayload::Object {
35                        id: loc.object_id,
36                        point: loc.position,
37                        metadata: serde_json::to_vec(&loc.metadata).unwrap_or_default(),
38                    },
39                ),
40                Ok(None) => (
41                    ResponseStatus::Error,
42                    ResponsePayload::Error("Not found".into()),
43                ),
44                Err(e) => (ResponseStatus::Error, ResponsePayload::Error(e.to_string())),
45            },
46            Command::QueryRadius {
47                namespace,
48                center,
49                radius,
50                limit,
51            } => match db.query_radius(&namespace, &center, radius, limit) {
52                Ok(results) => {
53                    let formatted = results
54                        .into_iter()
55                        .map(|(loc, dist)| {
56                            (
57                                loc.object_id,
58                                loc.position,
59                                serde_json::to_vec(&loc.metadata).unwrap_or_default(),
60                                dist,
61                            )
62                        })
63                        .collect();
64                    (ResponseStatus::Ok, ResponsePayload::Objects(formatted))
65                }
66                Err(e) => (ResponseStatus::Error, ResponsePayload::Error(e.to_string())),
67            },
68            Command::Knn {
69                namespace,
70                center,
71                k,
72            } => match db.knn(&namespace, &center, k) {
73                Ok(results) => {
74                    let formatted = results
75                        .into_iter()
76                        .map(|(loc, dist)| {
77                            (
78                                loc.object_id,
79                                loc.position,
80                                serde_json::to_vec(&loc.metadata).unwrap_or_default(),
81                                dist,
82                            )
83                        })
84                        .collect();
85                    (ResponseStatus::Ok, ResponsePayload::Objects(formatted))
86                }
87                Err(e) => (ResponseStatus::Error, ResponsePayload::Error(e.to_string())),
88            },
89            Command::Stats => {
90                let stats = db.stats();
91                (ResponseStatus::Ok, ResponsePayload::Stats(stats))
92            }
93            Command::Close => match db.close() {
94                Ok(_) => (ResponseStatus::Ok, ResponsePayload::Ok),
95                Err(e) => (ResponseStatus::Error, ResponsePayload::Error(e.to_string())),
96            },
97            Command::Delete { namespace, id } => match db.delete(&namespace, &id) {
98                Ok(_) => (ResponseStatus::Ok, ResponsePayload::Ok),
99                Err(e) => (ResponseStatus::Error, ResponsePayload::Error(e.to_string())),
100            },
101            Command::QueryBbox {
102                namespace,
103                min_x,
104                min_y,
105                max_x,
106                max_y,
107                limit,
108            } => match db.query_bbox(&namespace, min_x, min_y, max_x, max_y, limit) {
109                Ok(results) => {
110                    let formatted = results
111                        .into_iter()
112                        .map(|loc| {
113                            (
114                                loc.object_id,
115                                loc.position,
116                                serde_json::to_vec(&loc.metadata).unwrap_or_default(),
117                            )
118                        })
119                        .collect();
120                    (ResponseStatus::Ok, ResponsePayload::ObjectList(formatted))
121                }
122                Err(e) => (ResponseStatus::Error, ResponsePayload::Error(e.to_string())),
123            },
124            Command::QueryCylinder {
125                namespace,
126                center_x,
127                center_y,
128                min_z,
129                max_z,
130                radius,
131                limit,
132            } => {
133                let center = spatio_types::geo::Point::new(center_x, center_y);
134                match db.query_within_cylinder(&namespace, center, min_z, max_z, radius, limit) {
135                    Ok(results) => {
136                        let formatted = results
137                            .into_iter()
138                            .map(|(loc, dist)| {
139                                (
140                                    loc.object_id,
141                                    loc.position,
142                                    serde_json::to_vec(&loc.metadata).unwrap_or_default(),
143                                    dist,
144                                )
145                            })
146                            .collect();
147                        (ResponseStatus::Ok, ResponsePayload::Objects(formatted))
148                    }
149                    Err(e) => (ResponseStatus::Error, ResponsePayload::Error(e.to_string())),
150                }
151            }
152            Command::QueryTrajectory {
153                namespace,
154                id,
155                start_time,
156                end_time,
157                limit,
158            } => match db.query_trajectory(&namespace, &id, start_time, end_time, limit) {
159                Ok(updates) => {
160                    let mut formatted = Vec::with_capacity(updates.len());
161                    let mut error = None;
162                    for upd in updates {
163                        match serde_json::to_vec(&upd.metadata) {
164                            Ok(metadata_bytes) => {
165                                formatted.push(crate::rpc::LocationUpdate {
166                                    timestamp: upd.timestamp,
167                                    position: upd.position,
168                                    metadata: metadata_bytes,
169                                });
170                            }
171                            Err(e) => {
172                                error = Some(e);
173                                break;
174                            }
175                        }
176                    }
177                    if let Some(e) = error {
178                        (
179                            ResponseStatus::Error,
180                            ResponsePayload::Error(format!(
181                                "Failed to serialize trajectory metadata: {}",
182                                e
183                            )),
184                        )
185                    } else {
186                        (ResponseStatus::Ok, ResponsePayload::Trajectory(formatted))
187                    }
188                }
189                Err(e) => (ResponseStatus::Error, ResponsePayload::Error(e.to_string())),
190            },
191            Command::InsertTrajectory {
192                namespace,
193                id,
194                trajectory,
195            } => match db.insert_trajectory(&namespace, &id, &trajectory) {
196                Ok(_) => (ResponseStatus::Ok, ResponsePayload::Ok),
197                Err(e) => (ResponseStatus::Error, ResponsePayload::Error(e.to_string())),
198            },
199            Command::QueryBbox3d {
200                namespace,
201                min_x,
202                min_y,
203                min_z,
204                max_x,
205                max_y,
206                max_z,
207                limit,
208            } => match db
209                .query_within_bbox_3d(&namespace, min_x, min_y, min_z, max_x, max_y, max_z, limit)
210            {
211                Ok(results) => {
212                    let formatted = results
213                        .into_iter()
214                        .map(|loc| {
215                            (
216                                loc.object_id,
217                                loc.position,
218                                serde_json::to_vec(&loc.metadata).unwrap_or_default(),
219                            )
220                        })
221                        .collect();
222                    (ResponseStatus::Ok, ResponsePayload::ObjectList(formatted))
223                }
224                Err(e) => (ResponseStatus::Error, ResponsePayload::Error(e.to_string())),
225            },
226            Command::QueryNear {
227                namespace,
228                id,
229                radius,
230                limit,
231            } => match db.query_near(&namespace, &id, radius, limit) {
232                Ok(results) => {
233                    let formatted = results
234                        .into_iter()
235                        .map(|(loc, dist)| {
236                            (
237                                loc.object_id,
238                                loc.position,
239                                serde_json::to_vec(&loc.metadata).unwrap_or_default(),
240                                dist,
241                            )
242                        })
243                        .collect();
244                    (ResponseStatus::Ok, ResponsePayload::Objects(formatted))
245                }
246                Err(e) => (ResponseStatus::Error, ResponsePayload::Error(e.to_string())),
247            },
248        })
249        .await;
250
251        match result {
252            Ok(response) => response,
253            Err(e) => (
254                ResponseStatus::Error,
255                ResponsePayload::Error(format!("Task failed: {}", e)),
256            ),
257        }
258    }
259}