1use anyhow::Result;
2use bytes::{Buf, BufMut, BytesMut};
3use serde::{Deserialize, Serialize};
4use spatio_types::config::SetOptions;
5use spatio_types::point::{Point3d, TemporalPoint};
6use spatio_types::stats::DbStats;
7use std::time::SystemTime;
8use tokio_util::codec::{Decoder, Encoder};
9
10pub const MAX_FRAME_SIZE: usize = 10 * 1024 * 1024; #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
13#[repr(u8)]
14pub enum CommandType {
15 Upsert = 0x01,
16 Get = 0x02,
17 QueryRadius = 0x03,
18 Knn = 0x04,
19 Stats = 0x05,
20 Close = 0x06,
21 Delete = 0x07,
22 QueryBbox = 0x08,
23 QueryCylinder = 0x09,
24 QueryTrajectory = 0x0A,
25 InsertTrajectory = 0x0B,
26 QueryBbox3d = 0x0C,
27 QueryNear = 0x0D,
28}
29
30#[derive(Debug, Serialize, Deserialize)]
31pub enum Command {
32 Upsert {
33 namespace: String,
34 id: String,
35 point: Point3d,
36 metadata: Vec<u8>,
37 opts: Option<SetOptions>,
38 },
39 Get {
40 namespace: String,
41 id: String,
42 },
43 QueryRadius {
44 namespace: String,
45 center: Point3d,
46 radius: f64,
47 limit: usize,
48 },
49 Knn {
50 namespace: String,
51 center: Point3d,
52 k: usize,
53 },
54 Stats,
55 Close,
56 Delete {
57 namespace: String,
58 id: String,
59 },
60 QueryBbox {
61 namespace: String,
62 min_x: f64,
63 min_y: f64,
64 max_x: f64,
65 max_y: f64,
66 limit: usize,
67 },
68 QueryCylinder {
69 namespace: String,
70 center_x: f64,
71 center_y: f64,
72 min_z: f64,
73 max_z: f64,
74 radius: f64,
75 limit: usize,
76 },
77 QueryTrajectory {
78 namespace: String,
79 id: String,
80 start_time: SystemTime,
81 end_time: SystemTime,
82 limit: usize,
83 },
84 InsertTrajectory {
85 namespace: String,
86 id: String,
87 trajectory: Vec<TemporalPoint>,
88 },
89 QueryBbox3d {
90 namespace: String,
91 min_x: f64,
92 min_y: f64,
93 min_z: f64,
94 max_x: f64,
95 max_y: f64,
96 max_z: f64,
97 limit: usize,
98 },
99 QueryNear {
100 namespace: String,
101 id: String,
102 radius: f64,
103 limit: usize,
104 },
105}
106
107#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
108#[repr(u8)]
109pub enum ResponseStatus {
110 Ok = 0x00,
111 Error = 0x01,
112}
113
114#[derive(Debug, Clone, Serialize, Deserialize)]
115pub struct LocationUpdate {
116 pub timestamp: SystemTime,
117 pub position: Point3d,
118 pub metadata: Vec<u8>,
119}
120
121#[derive(Debug, Clone, Serialize, Deserialize)]
122pub enum ResponsePayload {
123 Ok,
124 Stats(DbStats),
125 Object {
126 id: String,
127 point: Point3d,
128 metadata: Vec<u8>,
129 },
130 Objects(Vec<(String, Point3d, Vec<u8>, f64)>),
131 ObjectList(Vec<(String, Point3d, Vec<u8>)>),
132 Trajectory(Vec<LocationUpdate>),
133 Error(String),
134}
135
136pub struct RpcServerCodec;
137
138impl Decoder for RpcServerCodec {
139 type Item = Command;
140 type Error = anyhow::Error;
141
142 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
143 if src.len() < 5 {
144 return Ok(None);
145 }
146
147 let mut buf = std::io::Cursor::new(&src[..]);
148 let _tag = buf.get_u8();
149 let len = buf.get_u32() as usize;
150
151 if len > MAX_FRAME_SIZE {
152 return Err(anyhow::anyhow!(
153 "Frame size {} exceeds maximum {}",
154 len,
155 MAX_FRAME_SIZE
156 ));
157 }
158
159 if src.len() < 5 + len {
160 return Ok(None);
161 }
162
163 src.advance(5);
164 let payload = src.split_to(len);
165 let cmd: Command = bincode::deserialize(&payload)?;
166
167 Ok(Some(cmd))
168 }
169}
170
171impl Encoder<(ResponseStatus, ResponsePayload)> for RpcServerCodec {
172 type Error = anyhow::Error;
173
174 fn encode(
175 &mut self,
176 item: (ResponseStatus, ResponsePayload),
177 dst: &mut BytesMut,
178 ) -> Result<(), Self::Error> {
179 let (status, payload) = item;
180 let serialized_payload = bincode::serialize(&payload)?;
181 let len = serialized_payload.len() as u32;
182
183 dst.reserve(5 + serialized_payload.len());
184 dst.put_u8(status as u8);
185 dst.put_u32(len);
186 dst.put_slice(&serialized_payload);
187
188 Ok(())
189 }
190}
191
192pub struct RpcClientCodec;
193
194impl Decoder for RpcClientCodec {
195 type Item = (ResponseStatus, ResponsePayload);
196 type Error = anyhow::Error;
197
198 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
199 if src.len() < 5 {
200 return Ok(None);
201 }
202
203 let mut buf = std::io::Cursor::new(&src[..]);
204 let status_raw = buf.get_u8();
205 let status = if status_raw == 0 {
206 ResponseStatus::Ok
207 } else {
208 ResponseStatus::Error
209 };
210 let len = buf.get_u32() as usize;
211
212 if len > MAX_FRAME_SIZE {
213 return Err(anyhow::anyhow!(
214 "Frame size {} exceeds maximum {}",
215 len,
216 MAX_FRAME_SIZE
217 ));
218 }
219
220 if src.len() < 5 + len {
221 return Ok(None);
222 }
223
224 src.advance(5);
225 let payload = src.split_to(len);
226 let response_payload: ResponsePayload = bincode::deserialize(&payload)?;
227
228 Ok(Some((status, response_payload)))
229 }
230}
231
232impl Encoder<Command> for RpcClientCodec {
233 type Error = anyhow::Error;
234
235 fn encode(&mut self, item: Command, dst: &mut BytesMut) -> Result<(), Self::Error> {
236 let serialized_payload = bincode::serialize(&item)?;
237 let len = serialized_payload.len() as u32;
238
239 dst.reserve(5 + serialized_payload.len());
240 dst.put_u8(0x00); dst.put_u32(len);
242 dst.put_slice(&serialized_payload);
243
244 Ok(())
245 }
246}