1mod client;
2
3use bucky_raw_codec::{RawDecode, RawEncode, RawFixedBytes};
4use callback_result::CallbackWaiter;
5pub use client::*;
6use num::{FromPrimitive, ToPrimitive};
7use sfo_pool::WorkerClassification;
8use std::hash::Hash;
9use std::ops::Deref;
10use std::sync::Arc;
11use std::time::Duration;
12
13mod classified_client;
14pub use classified_client::*;
15
16use crate::errors::CmdResult;
17use crate::{CmdBody, CmdHandler, CmdTunnelMeta, PeerId, TunnelId};
18
/// A sender associated with tunnel metadata of type `M` and a remote peer.
///
/// Implementors must be `Send + 'static`.
pub trait CmdSend<M: CmdTunnelMeta>: Send + 'static {
    /// Returns the shared tunnel metadata, or `None` when there is none.
    fn get_tunnel_meta(&self) -> Option<Arc<M>>;
    /// Returns a `PeerId`; presumably the id of the peer on the other end — confirm
    /// against implementors.
    fn get_remote_peer_id(&self) -> PeerId;
}
23
/// Marker trait for a guard that dereferences to a [`CmdSend`] implementation `S`.
///
/// It declares no methods of its own; all access goes through `Deref<Target = S>`.
pub trait SendGuard<M: CmdTunnelMeta, S: CmdSend<M>>: Send + 'static + Deref<Target = S> {}
25
/// Client-side interface for registering command handlers and sending commands.
///
/// Type parameters:
/// - `LEN`: fixed-size, raw-encodable numeric type (`FromPrimitive + ToPrimitive`);
///   presumably the length field of the wire header — TODO confirm.
/// - `CMD`: fixed-size, raw-encodable command identifier (`Eq + Hash`).
/// - `M`: tunnel metadata type.
/// - `S`: sender type, see [`CmdSend`].
/// - `G`: guard type that dereferences to `S`, returned by [`CmdClient::get_send`].
///
/// Each send operation exists in three payload flavours: a single byte slice
/// (`send*`), a list of byte slices (`send_parts*`) and an owned [`CmdBody`]
/// (`send_cmd*`). The `*_with_resp` variants additionally take a `timeout` and
/// return a [`CmdBody`]. The deprecated `send2*` methods forward to the matching
/// `send_parts*` method.
#[async_trait::async_trait]
pub trait CmdClient<
    LEN: RawEncode
        + for<'a> RawDecode<'a>
        + Copy
        + RawFixedBytes
        + Sync
        + Send
        + 'static
        + FromPrimitive
        + ToPrimitive,
    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static + Eq + Hash,
    M: CmdTunnelMeta,
    S: CmdSend<M>,
    G: SendGuard<M, S>,
>: Send + Sync + 'static
{
    /// Registers `handler` for the command `cmd`.
    fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>);
    /// Sends `cmd` with `version` and a single-slice `body`.
    async fn send(&self, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()>;
    /// Like `send`, but returns a response body.
    ///
    /// NOTE(review): `timeout` presumably bounds the wait for the response — confirm
    /// against implementors.
    async fn send_with_resp(
        &self,
        cmd: CMD,
        version: u8,
        body: &[u8],
        timeout: Duration,
    ) -> CmdResult<CmdBody>;
    /// Sends `cmd` with `version` and a body given as a list of byte slices.
    async fn send_parts(&self, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()>;
    /// Like `send_parts`, but returns a response body.
    async fn send_parts_with_resp(
        &self,
        cmd: CMD,
        version: u8,
        body: &[&[u8]],
        timeout: Duration,
    ) -> CmdResult<CmdBody>;
    /// Deprecated alias: forwards unchanged to `send_parts`.
    #[deprecated(note = "use send_parts instead")]
    async fn send2(&self, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()> {
        self.send_parts(cmd, version, body).await
    }
    /// Deprecated alias: forwards unchanged to `send_parts_with_resp`.
    #[deprecated(note = "use send_parts_with_resp instead")]
    async fn send2_with_resp(
        &self,
        cmd: CMD,
        version: u8,
        body: &[&[u8]],
        timeout: Duration,
    ) -> CmdResult<CmdBody> {
        self.send_parts_with_resp(cmd, version, body, timeout).await
    }
    /// Sends `cmd` with `version` and an owned `CmdBody`.
    async fn send_cmd(&self, cmd: CMD, version: u8, body: CmdBody) -> CmdResult<()>;
    /// Like `send_cmd`, but returns a response body.
    async fn send_cmd_with_resp(
        &self,
        cmd: CMD,
        version: u8,
        body: CmdBody,
        timeout: Duration,
    ) -> CmdResult<CmdBody>;
    /// Variant of `send` that additionally takes the `tunnel_id` to use.
    async fn send_by_specify_tunnel(
        &self,
        tunnel_id: TunnelId,
        cmd: CMD,
        version: u8,
        body: &[u8],
    ) -> CmdResult<()>;
    /// Variant of `send_with_resp` that additionally takes the `tunnel_id` to use.
    async fn send_by_specify_tunnel_with_resp(
        &self,
        tunnel_id: TunnelId,
        cmd: CMD,
        version: u8,
        body: &[u8],
        timeout: Duration,
    ) -> CmdResult<CmdBody>;
    /// Variant of `send_parts` that additionally takes the `tunnel_id` to use.
    async fn send_parts_by_specify_tunnel(
        &self,
        tunnel_id: TunnelId,
        cmd: CMD,
        version: u8,
        body: &[&[u8]],
    ) -> CmdResult<()>;
    /// Variant of `send_parts_with_resp` that additionally takes the `tunnel_id` to use.
    async fn send_parts_by_specify_tunnel_with_resp(
        &self,
        tunnel_id: TunnelId,
        cmd: CMD,
        version: u8,
        body: &[&[u8]],
        timeout: Duration,
    ) -> CmdResult<CmdBody>;
    /// Deprecated alias: forwards unchanged to `send_parts_by_specify_tunnel`.
    #[deprecated(note = "use send_parts_by_specify_tunnel instead")]
    async fn send2_by_specify_tunnel(
        &self,
        tunnel_id: TunnelId,
        cmd: CMD,
        version: u8,
        body: &[&[u8]],
    ) -> CmdResult<()> {
        self.send_parts_by_specify_tunnel(tunnel_id, cmd, version, body)
            .await
    }
    /// Deprecated alias: forwards unchanged to `send_parts_by_specify_tunnel_with_resp`.
    #[deprecated(note = "use send_parts_by_specify_tunnel_with_resp instead")]
    async fn send2_by_specify_tunnel_with_resp(
        &self,
        tunnel_id: TunnelId,
        cmd: CMD,
        version: u8,
        body: &[&[u8]],
        timeout: Duration,
    ) -> CmdResult<CmdBody> {
        self.send_parts_by_specify_tunnel_with_resp(tunnel_id, cmd, version, body, timeout)
            .await
    }
    /// Variant of `send_cmd` that additionally takes the `tunnel_id` to use.
    async fn send_cmd_by_specify_tunnel(
        &self,
        tunnel_id: TunnelId,
        cmd: CMD,
        version: u8,
        body: CmdBody,
    ) -> CmdResult<()>;
    /// Variant of `send_cmd_with_resp` that additionally takes the `tunnel_id` to use.
    async fn send_cmd_by_specify_tunnel_with_resp(
        &self,
        tunnel_id: TunnelId,
        cmd: CMD,
        version: u8,
        body: CmdBody,
        timeout: Duration,
    ) -> CmdResult<CmdBody>;
    /// NOTE(review): presumably drops every tunnel held by this client — confirm
    /// against implementors; the behaviour is not visible in this file.
    async fn clear_all_tunnel(&self);
    /// Returns the send guard `G` for `tunnel_id`, or an error.
    async fn get_send(&self, tunnel_id: TunnelId) -> CmdResult<G>;
}
153
/// Extension of [`CmdClient`] whose operations take a classification `C`
/// (a `WorkerClassification`) instead of an explicit tunnel id.
///
/// The method set mirrors the `*_by_specify_tunnel*` family of [`CmdClient`], with
/// `classification: C` in place of `tunnel_id`. The deprecated
/// `send2_by_classified_tunnel*` methods forward to the matching
/// `send_parts_by_classified_tunnel*` method.
#[async_trait::async_trait]
pub trait ClassifiedCmdClient<
    LEN: RawEncode
        + for<'a> RawDecode<'a>
        + Copy
        + RawFixedBytes
        + Sync
        + Send
        + 'static
        + FromPrimitive
        + ToPrimitive,
    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static + Eq + Hash,
    C: WorkerClassification,
    M: CmdTunnelMeta,
    S: CmdSend<M>,
    G: SendGuard<M, S>,
>: CmdClient<LEN, CMD, M, S, G>
{
    /// Sends `cmd` with `version` and a single-slice `body` for the given `classification`.
    async fn send_by_classified_tunnel(
        &self,
        classification: C,
        cmd: CMD,
        version: u8,
        body: &[u8],
    ) -> CmdResult<()>;
    /// Like `send_by_classified_tunnel`, but returns a response body.
    ///
    /// NOTE(review): `timeout` presumably bounds the wait for the response — confirm
    /// against implementors.
    async fn send_by_classified_tunnel_with_resp(
        &self,
        classification: C,
        cmd: CMD,
        version: u8,
        body: &[u8],
        timeout: Duration,
    ) -> CmdResult<CmdBody>;
    /// Sends `cmd` with a body given as a list of byte slices for the given `classification`.
    async fn send_parts_by_classified_tunnel(
        &self,
        classification: C,
        cmd: CMD,
        version: u8,
        body: &[&[u8]],
    ) -> CmdResult<()>;
    /// Like `send_parts_by_classified_tunnel`, but returns a response body.
    async fn send_parts_by_classified_tunnel_with_resp(
        &self,
        classification: C,
        cmd: CMD,
        version: u8,
        body: &[&[u8]],
        timeout: Duration,
    ) -> CmdResult<CmdBody>;
    /// Deprecated alias: forwards unchanged to `send_parts_by_classified_tunnel`.
    #[deprecated(note = "use send_parts_by_classified_tunnel instead")]
    async fn send2_by_classified_tunnel(
        &self,
        classification: C,
        cmd: CMD,
        version: u8,
        body: &[&[u8]],
    ) -> CmdResult<()> {
        self.send_parts_by_classified_tunnel(classification, cmd, version, body)
            .await
    }
    /// Deprecated alias: forwards unchanged to `send_parts_by_classified_tunnel_with_resp`.
    #[deprecated(note = "use send_parts_by_classified_tunnel_with_resp instead")]
    async fn send2_by_classified_tunnel_with_resp(
        &self,
        classification: C,
        cmd: CMD,
        version: u8,
        body: &[&[u8]],
        timeout: Duration,
    ) -> CmdResult<CmdBody> {
        self.send_parts_by_classified_tunnel_with_resp(classification, cmd, version, body, timeout)
            .await
    }
    /// Sends `cmd` with an owned `CmdBody` for the given `classification`.
    async fn send_cmd_by_classified_tunnel(
        &self,
        classification: C,
        cmd: CMD,
        version: u8,
        body: CmdBody,
    ) -> CmdResult<()>;
    /// Like `send_cmd_by_classified_tunnel`, but returns a response body.
    async fn send_cmd_by_classified_tunnel_with_resp(
        &self,
        classification: C,
        cmd: CMD,
        version: u8,
        body: CmdBody,
        timeout: Duration,
    ) -> CmdResult<CmdBody>;
    /// Returns a `TunnelId` for `classification`, or an error.
    async fn find_tunnel_id_by_classified(&self, classification: C) -> CmdResult<TunnelId>;
    /// Returns the send guard `G` for `classification`, or an error.
    async fn get_send_by_classified(&self, classification: C) -> CmdResult<G>;
}
243
/// Callback waiter keyed by a `u128` and yielding a `CmdBody`.
///
/// NOTE(review): the key type matches the return type of `gen_resp_id`; presumably
/// keys are produced by it — confirm against callers.
pub(crate) type RespWaiter = CallbackWaiter<u128, CmdBody>;
/// Shared, reference-counted handle to a [`RespWaiter`].
pub(crate) type RespWaiterRef = Arc<RespWaiter>;
246
247pub(crate) fn gen_resp_id<
248 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static,
249>(
250 tunnel_id: TunnelId,
251 cmd: CMD,
252 seq: u32,
253) -> u128 {
254 let cmd_buf = cmd.raw_encode_to_buffer().unwrap();
255 let mut cmd = cmd_buf.len() as u64;
256 for chunk in cmd_buf.chunks(8) {
257 let mut buf = [0u8; 8];
258 buf[..chunk.len()].copy_from_slice(chunk);
259 cmd = cmd.rotate_left(13) ^ u64::from_be_bytes(buf);
260 }
261 ((tunnel_id.value() as u128) << 96) | ((seq as u128) << 64) | (cmd as u128)
262}
263
264pub(crate) fn gen_seq() -> u32 {
265 rand::random::<u32>()
266}
267
#[cfg(test)]
mod tests {
    use super::gen_resp_id;
    use crate::TunnelId;

    /// Two ids that differ only in `seq` must not collide.
    #[test]
    fn resp_id_changes_with_seq() {
        assert_ne!(
            gen_resp_id(TunnelId::from(7), 0x11u8, 1),
            gen_resp_id(TunnelId::from(7), 0x11u8, 2)
        );
    }

    /// Two ids that differ only in `cmd` must not collide.
    #[test]
    fn resp_id_changes_with_cmd() {
        assert_ne!(
            gen_resp_id(TunnelId::from(7), 0x11u8, 5),
            gen_resp_id(TunnelId::from(7), 0x12u8, 5)
        );
    }

    /// Two ids that differ only in the tunnel must not collide.
    #[test]
    fn resp_id_changes_with_tunnel() {
        assert_ne!(
            gen_resp_id(TunnelId::from(7), 0x11u8, 5),
            gen_resp_id(TunnelId::from(8), 0x11u8, 5)
        );
    }

    /// Commands that share their upper 64 bits and differ only in the lower 64 bits
    /// must still produce different ids.
    #[test]
    fn resp_id_changes_with_long_cmd_suffix() {
        let shared_prefix = 0x1122_3344_5566_7788u128 << 64;
        assert_ne!(
            gen_resp_id(TunnelId::from(7), shared_prefix | 1, 5),
            gen_resp_id(TunnelId::from(7), shared_prefix | 2, 5)
        );
    }
}