// spatio_rpc/lib.rs

1use anyhow::Result;
2use bytes::{Buf, BufMut, BytesMut};
3use serde::{Deserialize, Serialize};
4use spatio_types::config::SetOptions;
5use spatio_types::point::{Point3d, TemporalPoint};
6use spatio_types::stats::DbStats;
7use std::time::SystemTime;
8use tokio_util::codec::{Decoder, Encoder};
9
/// Largest payload length, in bytes, that the decoders in this file accept (10 MiB).
///
/// A frame header announcing a longer payload makes `decode` return an error
/// instead of buffering the data.
pub const MAX_FRAME_SIZE: usize = 10 * 1024 * 1024;
11
12#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
13#[repr(u8)]
14pub enum CommandType {
15    Upsert = 0x01,
16    Get = 0x02,
17    QueryRadius = 0x03,
18    Knn = 0x04,
19    Stats = 0x05,
20    Close = 0x06,
21    Delete = 0x07,
22    QueryBbox = 0x08,
23    QueryCylinder = 0x09,
24    QueryTrajectory = 0x0A,
25    InsertTrajectory = 0x0B,
26    QueryBbox3d = 0x0C,
27    QueryNear = 0x0D,
28}
29
30#[derive(Debug, Serialize, Deserialize)]
31pub enum Command {
32    Upsert {
33        namespace: String,
34        id: String,
35        point: Point3d,
36        metadata: Vec<u8>,
37        opts: Option<SetOptions>,
38    },
39    Get {
40        namespace: String,
41        id: String,
42    },
43    QueryRadius {
44        namespace: String,
45        center: Point3d,
46        radius: f64,
47        limit: usize,
48    },
49    Knn {
50        namespace: String,
51        center: Point3d,
52        k: usize,
53    },
54    Stats,
55    Close,
56    Delete {
57        namespace: String,
58        id: String,
59    },
60    QueryBbox {
61        namespace: String,
62        min_x: f64,
63        min_y: f64,
64        max_x: f64,
65        max_y: f64,
66        limit: usize,
67    },
68    QueryCylinder {
69        namespace: String,
70        center_x: f64,
71        center_y: f64,
72        min_z: f64,
73        max_z: f64,
74        radius: f64,
75        limit: usize,
76    },
77    QueryTrajectory {
78        namespace: String,
79        id: String,
80        start_time: SystemTime,
81        end_time: SystemTime,
82        limit: usize,
83    },
84    InsertTrajectory {
85        namespace: String,
86        id: String,
87        trajectory: Vec<TemporalPoint>,
88    },
89    QueryBbox3d {
90        namespace: String,
91        min_x: f64,
92        min_y: f64,
93        min_z: f64,
94        max_x: f64,
95        max_y: f64,
96        max_z: f64,
97        limit: usize,
98    },
99    QueryNear {
100        namespace: String,
101        id: String,
102        radius: f64,
103        limit: usize,
104    },
105}
106
107#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
108#[repr(u8)]
109pub enum ResponseStatus {
110    Ok = 0x00,
111    Error = 0x01,
112}
113
114#[derive(Debug, Clone, Serialize, Deserialize)]
115pub struct LocationUpdate {
116    pub timestamp: SystemTime,
117    pub position: Point3d,
118    pub metadata: Vec<u8>,
119}
120
121#[derive(Debug, Clone, Serialize, Deserialize)]
122pub enum ResponsePayload {
123    Ok,
124    Stats(DbStats),
125    Object {
126        id: String,
127        point: Point3d,
128        metadata: Vec<u8>,
129    },
130    Objects(Vec<(String, Point3d, Vec<u8>, f64)>),
131    ObjectList(Vec<(String, Point3d, Vec<u8>)>),
132    Trajectory(Vec<LocationUpdate>),
133    Error(String),
134}
135
/// Server-side frame codec: decodes [`Command`] frames and encodes
/// `(ResponseStatus, ResponsePayload)` frames.
///
/// The codec is stateless, so it is cheap to copy and can be default-constructed.
#[derive(Debug, Clone, Copy, Default)]
pub struct RpcServerCodec;
137
138impl Decoder for RpcServerCodec {
139    type Item = Command;
140    type Error = anyhow::Error;
141
142    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
143        if src.len() < 5 {
144            return Ok(None);
145        }
146
147        let mut buf = std::io::Cursor::new(&src[..]);
148        let _tag = buf.get_u8();
149        let len = buf.get_u32() as usize;
150
151        if len > MAX_FRAME_SIZE {
152            return Err(anyhow::anyhow!(
153                "Frame size {} exceeds maximum {}",
154                len,
155                MAX_FRAME_SIZE
156            ));
157        }
158
159        if src.len() < 5 + len {
160            return Ok(None);
161        }
162
163        src.advance(5);
164        let payload = src.split_to(len);
165        let cmd: Command = bincode::deserialize(&payload)?;
166
167        Ok(Some(cmd))
168    }
169}
170
171impl Encoder<(ResponseStatus, ResponsePayload)> for RpcServerCodec {
172    type Error = anyhow::Error;
173
174    fn encode(
175        &mut self,
176        item: (ResponseStatus, ResponsePayload),
177        dst: &mut BytesMut,
178    ) -> Result<(), Self::Error> {
179        let (status, payload) = item;
180        let serialized_payload = bincode::serialize(&payload)?;
181        let len = serialized_payload.len() as u32;
182
183        dst.reserve(5 + serialized_payload.len());
184        dst.put_u8(status as u8);
185        dst.put_u32(len);
186        dst.put_slice(&serialized_payload);
187
188        Ok(())
189    }
190}
191
/// Client-side frame codec: encodes [`Command`] frames and decodes
/// `(ResponseStatus, ResponsePayload)` frames.
///
/// The codec is stateless, so it is cheap to copy and can be default-constructed.
#[derive(Debug, Clone, Copy, Default)]
pub struct RpcClientCodec;
193
194impl Decoder for RpcClientCodec {
195    type Item = (ResponseStatus, ResponsePayload);
196    type Error = anyhow::Error;
197
198    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
199        if src.len() < 5 {
200            return Ok(None);
201        }
202
203        let mut buf = std::io::Cursor::new(&src[..]);
204        let status_raw = buf.get_u8();
205        let status = if status_raw == 0 {
206            ResponseStatus::Ok
207        } else {
208            ResponseStatus::Error
209        };
210        let len = buf.get_u32() as usize;
211
212        if len > MAX_FRAME_SIZE {
213            return Err(anyhow::anyhow!(
214                "Frame size {} exceeds maximum {}",
215                len,
216                MAX_FRAME_SIZE
217            ));
218        }
219
220        if src.len() < 5 + len {
221            return Ok(None);
222        }
223
224        src.advance(5);
225        let payload = src.split_to(len);
226        let response_payload: ResponsePayload = bincode::deserialize(&payload)?;
227
228        Ok(Some((status, response_payload)))
229    }
230}
231
232impl Encoder<Command> for RpcClientCodec {
233    type Error = anyhow::Error;
234
235    fn encode(&mut self, item: Command, dst: &mut BytesMut) -> Result<(), Self::Error> {
236        let serialized_payload = bincode::serialize(&item)?;
237        let len = serialized_payload.len() as u32;
238
239        dst.reserve(5 + serialized_payload.len());
240        dst.put_u8(0x00); // Tag for Command
241        dst.put_u32(len);
242        dst.put_slice(&serialized_payload);
243
244        Ok(())
245    }
246}