Skip to main content

sfo_cmd_server/client/
mod.rs

1mod client;
2
3use bucky_raw_codec::{RawDecode, RawEncode, RawFixedBytes};
4use callback_result::CallbackWaiter;
5pub use client::*;
6use num::{FromPrimitive, ToPrimitive};
7use sfo_pool::WorkerClassification;
8use std::hash::Hash;
9use std::ops::Deref;
10use std::sync::Arc;
11use std::time::Duration;
12
13mod classified_client;
14pub use classified_client::*;
15
16use crate::errors::CmdResult;
17use crate::{CmdBody, CmdHandler, CmdTunnelMeta, PeerId, TunnelId};
18
19pub trait CmdSend<M: CmdTunnelMeta>: Send + 'static {
20    fn get_tunnel_meta(&self) -> Option<Arc<M>>;
21    fn get_remote_peer_id(&self) -> PeerId;
22}
23
24pub trait SendGuard<M: CmdTunnelMeta, S: CmdSend<M>>: Send + 'static + Deref<Target = S> {}
25
26#[async_trait::async_trait]
27pub trait CmdClient<
28    LEN: RawEncode
29        + for<'a> RawDecode<'a>
30        + Copy
31        + RawFixedBytes
32        + Sync
33        + Send
34        + 'static
35        + FromPrimitive
36        + ToPrimitive,
37    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static + Eq + Hash,
38    M: CmdTunnelMeta,
39    S: CmdSend<M>,
40    G: SendGuard<M, S>,
41>: Send + Sync + 'static
42{
43    fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>);
44    async fn send(&self, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()>;
45    async fn send_with_resp(
46        &self,
47        cmd: CMD,
48        version: u8,
49        body: &[u8],
50        timeout: Duration,
51    ) -> CmdResult<CmdBody>;
52    async fn send_parts(&self, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()>;
53    async fn send_parts_with_resp(
54        &self,
55        cmd: CMD,
56        version: u8,
57        body: &[&[u8]],
58        timeout: Duration,
59    ) -> CmdResult<CmdBody>;
60    #[deprecated(note = "use send_parts instead")]
61    async fn send2(&self, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()> {
62        self.send_parts(cmd, version, body).await
63    }
64    #[deprecated(note = "use send_parts_with_resp instead")]
65    async fn send2_with_resp(
66        &self,
67        cmd: CMD,
68        version: u8,
69        body: &[&[u8]],
70        timeout: Duration,
71    ) -> CmdResult<CmdBody> {
72        self.send_parts_with_resp(cmd, version, body, timeout).await
73    }
74    async fn send_cmd(&self, cmd: CMD, version: u8, body: CmdBody) -> CmdResult<()>;
75    async fn send_cmd_with_resp(
76        &self,
77        cmd: CMD,
78        version: u8,
79        body: CmdBody,
80        timeout: Duration,
81    ) -> CmdResult<CmdBody>;
82    async fn send_by_specify_tunnel(
83        &self,
84        tunnel_id: TunnelId,
85        cmd: CMD,
86        version: u8,
87        body: &[u8],
88    ) -> CmdResult<()>;
89    async fn send_by_specify_tunnel_with_resp(
90        &self,
91        tunnel_id: TunnelId,
92        cmd: CMD,
93        version: u8,
94        body: &[u8],
95        timeout: Duration,
96    ) -> CmdResult<CmdBody>;
97    async fn send_parts_by_specify_tunnel(
98        &self,
99        tunnel_id: TunnelId,
100        cmd: CMD,
101        version: u8,
102        body: &[&[u8]],
103    ) -> CmdResult<()>;
104    async fn send_parts_by_specify_tunnel_with_resp(
105        &self,
106        tunnel_id: TunnelId,
107        cmd: CMD,
108        version: u8,
109        body: &[&[u8]],
110        timeout: Duration,
111    ) -> CmdResult<CmdBody>;
112    #[deprecated(note = "use send_parts_by_specify_tunnel instead")]
113    async fn send2_by_specify_tunnel(
114        &self,
115        tunnel_id: TunnelId,
116        cmd: CMD,
117        version: u8,
118        body: &[&[u8]],
119    ) -> CmdResult<()> {
120        self.send_parts_by_specify_tunnel(tunnel_id, cmd, version, body)
121            .await
122    }
123    #[deprecated(note = "use send_parts_by_specify_tunnel_with_resp instead")]
124    async fn send2_by_specify_tunnel_with_resp(
125        &self,
126        tunnel_id: TunnelId,
127        cmd: CMD,
128        version: u8,
129        body: &[&[u8]],
130        timeout: Duration,
131    ) -> CmdResult<CmdBody> {
132        self.send_parts_by_specify_tunnel_with_resp(tunnel_id, cmd, version, body, timeout)
133            .await
134    }
135    async fn send_cmd_by_specify_tunnel(
136        &self,
137        tunnel_id: TunnelId,
138        cmd: CMD,
139        version: u8,
140        body: CmdBody,
141    ) -> CmdResult<()>;
142    async fn send_cmd_by_specify_tunnel_with_resp(
143        &self,
144        tunnel_id: TunnelId,
145        cmd: CMD,
146        version: u8,
147        body: CmdBody,
148        timeout: Duration,
149    ) -> CmdResult<CmdBody>;
150    async fn clear_all_tunnel(&self);
151    async fn get_send(&self, tunnel_id: TunnelId) -> CmdResult<G>;
152}
153
154#[async_trait::async_trait]
155pub trait ClassifiedCmdClient<
156    LEN: RawEncode
157        + for<'a> RawDecode<'a>
158        + Copy
159        + RawFixedBytes
160        + Sync
161        + Send
162        + 'static
163        + FromPrimitive
164        + ToPrimitive,
165    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static + Eq + Hash,
166    C: WorkerClassification,
167    M: CmdTunnelMeta,
168    S: CmdSend<M>,
169    G: SendGuard<M, S>,
170>: CmdClient<LEN, CMD, M, S, G>
171{
172    async fn send_by_classified_tunnel(
173        &self,
174        classification: C,
175        cmd: CMD,
176        version: u8,
177        body: &[u8],
178    ) -> CmdResult<()>;
179    async fn send_by_classified_tunnel_with_resp(
180        &self,
181        classification: C,
182        cmd: CMD,
183        version: u8,
184        body: &[u8],
185        timeout: Duration,
186    ) -> CmdResult<CmdBody>;
187    async fn send_parts_by_classified_tunnel(
188        &self,
189        classification: C,
190        cmd: CMD,
191        version: u8,
192        body: &[&[u8]],
193    ) -> CmdResult<()>;
194    async fn send_parts_by_classified_tunnel_with_resp(
195        &self,
196        classification: C,
197        cmd: CMD,
198        version: u8,
199        body: &[&[u8]],
200        timeout: Duration,
201    ) -> CmdResult<CmdBody>;
202    #[deprecated(note = "use send_parts_by_classified_tunnel instead")]
203    async fn send2_by_classified_tunnel(
204        &self,
205        classification: C,
206        cmd: CMD,
207        version: u8,
208        body: &[&[u8]],
209    ) -> CmdResult<()> {
210        self.send_parts_by_classified_tunnel(classification, cmd, version, body)
211            .await
212    }
213    #[deprecated(note = "use send_parts_by_classified_tunnel_with_resp instead")]
214    async fn send2_by_classified_tunnel_with_resp(
215        &self,
216        classification: C,
217        cmd: CMD,
218        version: u8,
219        body: &[&[u8]],
220        timeout: Duration,
221    ) -> CmdResult<CmdBody> {
222        self.send_parts_by_classified_tunnel_with_resp(classification, cmd, version, body, timeout)
223            .await
224    }
225    async fn send_cmd_by_classified_tunnel(
226        &self,
227        classification: C,
228        cmd: CMD,
229        version: u8,
230        body: CmdBody,
231    ) -> CmdResult<()>;
232    async fn send_cmd_by_classified_tunnel_with_resp(
233        &self,
234        classification: C,
235        cmd: CMD,
236        version: u8,
237        body: CmdBody,
238        timeout: Duration,
239    ) -> CmdResult<CmdBody>;
240    async fn find_tunnel_id_by_classified(&self, classification: C) -> CmdResult<TunnelId>;
241    async fn get_send_by_classified(&self, classification: C) -> CmdResult<G>;
242}
243
244pub(crate) type RespWaiter = CallbackWaiter<u128, CmdBody>;
245pub(crate) type RespWaiterRef = Arc<RespWaiter>;
246
247pub(crate) fn gen_resp_id<
248    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static,
249>(
250    tunnel_id: TunnelId,
251    cmd: CMD,
252    seq: u32,
253) -> u128 {
254    let cmd_buf = cmd.raw_encode_to_buffer().unwrap();
255    let mut cmd = cmd_buf.len() as u64;
256    for chunk in cmd_buf.chunks(8) {
257        let mut buf = [0u8; 8];
258        buf[..chunk.len()].copy_from_slice(chunk);
259        cmd = cmd.rotate_left(13) ^ u64::from_be_bytes(buf);
260    }
261    ((tunnel_id.value() as u128) << 96) | ((seq as u128) << 64) | (cmd as u128)
262}
263
264pub(crate) fn gen_seq() -> u32 {
265    rand::random::<u32>()
266}
267
#[cfg(test)]
mod tests {
    use super::gen_resp_id;
    use crate::TunnelId;

    /// Same tunnel and command, different sequence numbers: ids must differ.
    #[test]
    fn resp_id_changes_with_seq() {
        let id_for = |seq| gen_resp_id(TunnelId::from(7), 0x11u8, seq);
        assert_ne!(id_for(1), id_for(2));
    }

    /// Same tunnel and sequence number, different command codes: ids must differ.
    #[test]
    fn resp_id_changes_with_cmd() {
        let id_for = |cmd: u8| gen_resp_id(TunnelId::from(7), cmd, 5);
        assert_ne!(id_for(0x11), id_for(0x12));
    }

    /// Same command and sequence number, different tunnels: ids must differ.
    #[test]
    fn resp_id_changes_with_tunnel() {
        let on_tunnel_seven = gen_resp_id(TunnelId::from(7), 0x11u8, 5);
        let on_tunnel_eight = gen_resp_id(TunnelId::from(8), 0x11u8, 5);
        assert_ne!(on_tunnel_seven, on_tunnel_eight);
    }

    /// Two 128-bit commands that differ only in their lowest byte must still
    /// produce different ids.
    #[test]
    fn resp_id_changes_with_long_cmd_suffix() {
        let suffix_one = 0x1122_3344_5566_7788_0000_0000_0000_0001u128;
        let suffix_two = 0x1122_3344_5566_7788_0000_0000_0000_0002u128;
        assert_ne!(
            gen_resp_id(TunnelId::from(7), suffix_one, 5),
            gen_resp_id(TunnelId::from(7), suffix_two, 5),
        );
    }
}