// Source file: sfo_cmd_server/client/mod.rs

mod client;

use bucky_raw_codec::{RawDecode, RawEncode, RawFixedBytes};
use callback_result::CallbackWaiter;
pub use client::*;
use num::{FromPrimitive, ToPrimitive};
use sfo_pool::WorkerClassification;
use std::hash::Hash;
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;

mod classified_client;
pub use classified_client::*;

use crate::errors::CmdResult;
use crate::{CmdBody, CmdHandler, CmdTunnelMeta, PeerId, TunnelId};
18
19pub trait CmdSend<M: CmdTunnelMeta>: Send + 'static {
20    fn get_tunnel_meta(&self) -> Option<Arc<M>>;
21    fn get_remote_peer_id(&self) -> PeerId;
22}
23
24pub trait SendGuard<M: CmdTunnelMeta, S: CmdSend<M>>: Send + 'static + Deref<Target = S> {}
25
26#[async_trait::async_trait]
27pub trait CmdClient<
28    LEN: RawEncode
29        + for<'a> RawDecode<'a>
30        + Copy
31        + RawFixedBytes
32        + Sync
33        + Send
34        + 'static
35        + FromPrimitive
36        + ToPrimitive,
37    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static + Eq + Hash,
38    M: CmdTunnelMeta,
39    S: CmdSend<M>,
40    G: SendGuard<M, S>,
41>: Send + Sync + 'static
42{
43    fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>);
44    async fn send(&self, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()>;
45    async fn send_with_resp(
46        &self,
47        cmd: CMD,
48        version: u8,
49        body: &[u8],
50        timeout: Duration,
51    ) -> CmdResult<CmdBody>;
52    async fn send2(&self, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()>;
53    async fn send2_with_resp(
54        &self,
55        cmd: CMD,
56        version: u8,
57        body: &[&[u8]],
58        timeout: Duration,
59    ) -> CmdResult<CmdBody>;
60    async fn send_cmd(&self, cmd: CMD, version: u8, body: CmdBody) -> CmdResult<()>;
61    async fn send_cmd_with_resp(
62        &self,
63        cmd: CMD,
64        version: u8,
65        body: CmdBody,
66        timeout: Duration,
67    ) -> CmdResult<CmdBody>;
68    async fn send_by_specify_tunnel(
69        &self,
70        tunnel_id: TunnelId,
71        cmd: CMD,
72        version: u8,
73        body: &[u8],
74    ) -> CmdResult<()>;
75    async fn send_by_specify_tunnel_with_resp(
76        &self,
77        tunnel_id: TunnelId,
78        cmd: CMD,
79        version: u8,
80        body: &[u8],
81        timeout: Duration,
82    ) -> CmdResult<CmdBody>;
83    async fn send2_by_specify_tunnel(
84        &self,
85        tunnel_id: TunnelId,
86        cmd: CMD,
87        version: u8,
88        body: &[&[u8]],
89    ) -> CmdResult<()>;
90    async fn send2_by_specify_tunnel_with_resp(
91        &self,
92        tunnel_id: TunnelId,
93        cmd: CMD,
94        version: u8,
95        body: &[&[u8]],
96        timeout: Duration,
97    ) -> CmdResult<CmdBody>;
98    async fn send_cmd_by_specify_tunnel(
99        &self,
100        tunnel_id: TunnelId,
101        cmd: CMD,
102        version: u8,
103        body: CmdBody,
104    ) -> CmdResult<()>;
105    async fn send_cmd_by_specify_tunnel_with_resp(
106        &self,
107        tunnel_id: TunnelId,
108        cmd: CMD,
109        version: u8,
110        body: CmdBody,
111        timeout: Duration,
112    ) -> CmdResult<CmdBody>;
113    async fn clear_all_tunnel(&self);
114    async fn get_send(&self, tunnel_id: TunnelId) -> CmdResult<G>;
115}
116
117#[async_trait::async_trait]
118pub trait ClassifiedCmdClient<
119    LEN: RawEncode
120        + for<'a> RawDecode<'a>
121        + Copy
122        + RawFixedBytes
123        + Sync
124        + Send
125        + 'static
126        + FromPrimitive
127        + ToPrimitive,
128    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static + Eq + Hash,
129    C: WorkerClassification,
130    M: CmdTunnelMeta,
131    S: CmdSend<M>,
132    G: SendGuard<M, S>,
133>: CmdClient<LEN, CMD, M, S, G>
134{
135    async fn send_by_classified_tunnel(
136        &self,
137        classification: C,
138        cmd: CMD,
139        version: u8,
140        body: &[u8],
141    ) -> CmdResult<()>;
142    async fn send_by_classified_tunnel_with_resp(
143        &self,
144        classification: C,
145        cmd: CMD,
146        version: u8,
147        body: &[u8],
148        timeout: Duration,
149    ) -> CmdResult<CmdBody>;
150    async fn send2_by_classified_tunnel(
151        &self,
152        classification: C,
153        cmd: CMD,
154        version: u8,
155        body: &[&[u8]],
156    ) -> CmdResult<()>;
157    async fn send2_by_classified_tunnel_with_resp(
158        &self,
159        classification: C,
160        cmd: CMD,
161        version: u8,
162        body: &[&[u8]],
163        timeout: Duration,
164    ) -> CmdResult<CmdBody>;
165    async fn send_cmd_by_classified_tunnel(
166        &self,
167        classification: C,
168        cmd: CMD,
169        version: u8,
170        body: CmdBody,
171    ) -> CmdResult<()>;
172    async fn send_cmd_by_classified_tunnel_with_resp(
173        &self,
174        classification: C,
175        cmd: CMD,
176        version: u8,
177        body: CmdBody,
178        timeout: Duration,
179    ) -> CmdResult<CmdBody>;
180    async fn find_tunnel_id_by_classified(&self, classification: C) -> CmdResult<TunnelId>;
181    async fn get_send_by_classified(&self, classification: C) -> CmdResult<G>;
182}
183
184pub(crate) type RespWaiter = CallbackWaiter<u128, CmdBody>;
185pub(crate) type RespWaiterRef = Arc<RespWaiter>;
186
187pub(crate) fn gen_resp_id<
188    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static,
189>(
190    tunnel_id: TunnelId,
191    cmd: CMD,
192    seq: u32,
193) -> u128 {
194    let cmd_buf = cmd.raw_encode_to_buffer().unwrap();
195    let cmd = if cmd_buf.len() > 8 {
196        u64::from_be_bytes(cmd_buf[..8].try_into().unwrap())
197    } else {
198        let mut buf = [0u8; 8];
199        buf[..cmd_buf.len()].copy_from_slice(&cmd_buf);
200        u64::from_be_bytes(buf)
201    };
202    ((tunnel_id.value() as u128) << 96) | ((seq as u128) << 64) | (cmd as u128)
203}
204
205pub(crate) fn gen_seq() -> u32 {
206    rand::random::<u32>()
207}
208
#[cfg(test)]
mod tests {
    use super::gen_resp_id;
    use crate::TunnelId;

    /// Tunnel id used by every test unless the test varies it.
    const TUNNEL: u32 = 7;
    /// Command id used by every test unless the test varies it.
    const CMD: u8 = 0x11;
    /// Sequence number used by the tests that do not vary it.
    const SEQ: u32 = 5;

    /// Two ids that differ only in the sequence number must not collide.
    #[test]
    fn resp_id_changes_with_seq() {
        let first = gen_resp_id(TunnelId::from(TUNNEL), CMD, 1);
        let second = gen_resp_id(TunnelId::from(TUNNEL), CMD, 2);
        assert_ne!(first, second);
    }

    /// Two ids that differ only in the command must not collide.
    #[test]
    fn resp_id_changes_with_cmd() {
        let first = gen_resp_id(TunnelId::from(TUNNEL), CMD, SEQ);
        let second = gen_resp_id(TunnelId::from(TUNNEL), 0x12u8, SEQ);
        assert_ne!(first, second);
    }

    /// Two ids that differ only in the tunnel must not collide.
    #[test]
    fn resp_id_changes_with_tunnel() {
        let first = gen_resp_id(TunnelId::from(TUNNEL), CMD, SEQ);
        let second = gen_resp_id(TunnelId::from(8), CMD, SEQ);
        assert_ne!(first, second);
    }
}
234}