1use std::fmt::Debug;
2use std::hash::Hash;
3use std::marker::PhantomData;
4use std::ops::{Deref, DerefMut};
5use std::sync::{Arc, Mutex};
6use std::time::Duration;
7use async_named_locker::ObjectHolder;
8use bucky_raw_codec::{RawConvertTo, RawDecode, RawEncode, RawFixedBytes, RawFrom};
9use num::{FromPrimitive, ToPrimitive};
10use sfo_pool::{into_pool_err, pool_err, ClassifiedWorker, ClassifiedWorkerFactory, ClassifiedWorkerGuard, ClassifiedWorkerPool, ClassifiedWorkerPoolRef, PoolErrorCode, PoolResult, WorkerClassification};
11use sfo_split::{Splittable, WHalf};
12use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
13use tokio::spawn;
14use tokio::task::JoinHandle;
15use crate::{CmdBody, CmdTunnelMeta, CmdTunnelRead, CmdTunnelWrite, TunnelId, TunnelIdGenerator};
16use crate::client::{gen_resp_id, gen_seq, CmdClient, CmdSend, RespWaiter, RespWaiterRef, SendGuard};
17use crate::cmd::{CmdBodyRead, CmdHandler, CmdHandlerMap, CmdHeader};
18use crate::errors::{cmd_err, into_cmd_err, CmdErrorCode, CmdResult};
19use crate::peer_id::PeerId;
20
21#[async_trait::async_trait]
22pub trait CmdTunnelFactory<M: CmdTunnelMeta, R: CmdTunnelRead<M>, W: CmdTunnelWrite<M>>: Send + Sync + 'static {
23 async fn create_tunnel(&self) -> CmdResult<Splittable<R, W>>;
24}
25
26pub struct CommonCmdSend<M: CmdTunnelMeta, R: CmdTunnelRead<M>, W: CmdTunnelWrite<M>, LEN, CMD>
27where LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
28 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes {
29 pub(crate) recv_handle: JoinHandle<CmdResult<()>>,
30 pub(crate) write: ObjectHolder<WHalf<R, W>>,
31 pub(crate) is_work: bool,
32 pub(crate) tunnel_id: TunnelId,
33 pub(crate) remote_id: PeerId,
34 pub(crate) resp_waiter: RespWaiterRef,
35 pub(crate) tunnel_meta: Option<Arc<M>>,
36 _p: std::marker::PhantomData<(LEN, CMD)>,
37
38}
39
40impl<M, R, W, LEN, CMD> CommonCmdSend<M, R, W, LEN, CMD>
53where M: CmdTunnelMeta,
54 R: CmdTunnelRead<M>,
55 W: CmdTunnelWrite<M>,
56 LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
57 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes {
58 pub fn new(tunnel_id: TunnelId,
59 recv_handle: JoinHandle<CmdResult<()>>,
60 write: ObjectHolder<WHalf<R, W>>,
61 resp_waiter: RespWaiterRef,
62 remote_id: PeerId,
63 tunnel_meta: Option<Arc<M>>) -> Self {
64 Self {
65 recv_handle,
66 write,
67 is_work: true,
68 tunnel_id,
69 remote_id,
70 resp_waiter,
71 tunnel_meta,
72 _p: Default::default(),
73 }
74 }
75
76 pub fn get_tunnel_id(&self) -> TunnelId {
77 self.tunnel_id
78 }
79
80 pub fn set_disable(&mut self) {
81 self.is_work = false;
82 self.recv_handle.abort();
83 }
84
85 pub async fn send(&mut self, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
86 log::trace!("client {:?} send cmd: {:?}, len: {} data:{}", self.tunnel_id, cmd, body.len(), hex::encode(body));
87 let header = CmdHeader::<LEN, CMD>::new(version, false, None, cmd, LEN::from_u64(body.len() as u64).unwrap());
88 let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
89 let ret = self.send_inner(buf.as_slice(), body).await;
90 if let Err(e) = ret {
91 self.set_disable();
92 return Err(e);
93 }
94 Ok(())
95 }
96
97 pub async fn send_with_resp(&mut self, cmd: CMD, version: u8, body: &[u8], timeout: Duration) -> CmdResult<CmdBody> {
98 if let Some(id) = tokio::task::try_id() {
99 if id == self.recv_handle.id() {
100 return Err(cmd_err!(CmdErrorCode::Failed, "can't send with resp in recv task"));
101 }
102 }
103 log::trace!("client {:?} send cmd: {:?}, len: {}, data: {}", self.tunnel_id, cmd, body.len(), hex::encode(body));
104 let seq = gen_seq();
105 let header = CmdHeader::<LEN, CMD>::new(version, false, Some(seq), cmd, LEN::from_u64(body.len() as u64).unwrap());
106 let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
107 let resp_id = gen_resp_id(cmd, seq);
108 let waiter = self.resp_waiter.clone();
109 let resp_waiter = waiter.create_timeout_result_future(resp_id, timeout)
110 .map_err(into_cmd_err!(CmdErrorCode::Failed, "create timeout result future error"))?;
111 let ret = self.send_inner(buf.as_slice(), body).await;
112 if let Err(e) = ret {
113 self.set_disable();
114 return Err(e);
115 }
116 let resp = resp_waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout, "recv resp error"))?;
117 Ok(resp)
118 }
119
120 pub async fn send2(&mut self, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()> {
121 let mut len = 0;
122 for b in body.iter() {
123 len += b.len();
124 log::trace!("client {:?} send2 cmd: {:?}, data {}", self.tunnel_id, cmd, hex::encode(b));
125 }
126 log::trace!("client {:?} send2 cmd: {:?}, len {}", self.tunnel_id, cmd, len);
127 let header = CmdHeader::<LEN, CMD>::new(version, false, None, cmd, LEN::from_u64(len as u64).unwrap());
128 let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
129 let ret = self.send_inner2(buf.as_slice(), body).await;
130 if let Err(e) = ret {
131 self.set_disable();
132 return Err(e);
133 }
134 Ok(())
135 }
136
137 pub async fn send2_with_resp(&mut self, cmd: CMD, version: u8, body: &[&[u8]], timeout: Duration) -> CmdResult<CmdBody> {
138 if let Some(id) = tokio::task::try_id() {
139 if id == self.recv_handle.id() {
140 return Err(cmd_err!(CmdErrorCode::Failed, "can't send with resp in recv task"));
141 }
142 }
143 let mut len = 0;
144 for b in body.iter() {
145 len += b.len();
146 log::trace!("client {:?} send2 cmd {:?} body: {}", self.tunnel_id, cmd, hex::encode(b));
147 }
148 log::trace!("client {:?} send2 cmd: {:?}, len {}", self.tunnel_id, cmd, len);
149 let seq = gen_seq();
150 let header = CmdHeader::<LEN, CMD>::new(version, false, Some(seq), cmd, LEN::from_u64(len as u64).unwrap());
151 let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
152 let resp_id = gen_resp_id(cmd, seq);
153 let waiter = self.resp_waiter.clone();
154 let resp_waiter = waiter.create_timeout_result_future(resp_id, timeout)
155 .map_err(into_cmd_err!(CmdErrorCode::Failed, "create timeout result future error"))?;
156 let ret = self.send_inner2(buf.as_slice(), body).await;
157 if let Err(e) = ret {
158 self.set_disable();
159 return Err(e);
160 }
161 let resp = resp_waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout, "recv resp error"))?;
162 Ok(resp)
163 }
164
165 pub async fn send_cmd(&mut self, cmd: CMD, version: u8, body: CmdBody) -> CmdResult<()> {
166 log::trace!("client {:?} send cmd: {:?}, len: {}", self.tunnel_id, cmd, body.len());
167 let header = CmdHeader::<LEN, CMD>::new(version, false, None, cmd, LEN::from_u64(body.len()).unwrap());
168 let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
169 let ret = self.send_inner_cmd(buf.as_slice(), body).await;
170 if let Err(e) = ret {
171 self.set_disable();
172 return Err(e);
173 }
174 Ok(())
175 }
176
177 pub async fn send_cmd_with_resp(&mut self, cmd: CMD, version: u8, body: CmdBody, timeout: Duration) -> CmdResult<CmdBody> {
178 if let Some(id) = tokio::task::try_id() {
179 if id == self.recv_handle.id() {
180 return Err(cmd_err!(CmdErrorCode::Failed, "can't send with resp in recv task"));
181 }
182 }
183 log::trace!("client {:?} send cmd: {:?}, len: {}", self.tunnel_id, cmd, body.len());
184 let seq = gen_seq();
185 let header = CmdHeader::<LEN, CMD>::new(version, false, Some(seq), cmd, LEN::from_u64(body.len()).unwrap());
186 let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
187 let resp_id = gen_resp_id(cmd, seq);
188 let waiter = self.resp_waiter.clone();
189 let resp_waiter = waiter.create_timeout_result_future(resp_id, timeout)
190 .map_err(into_cmd_err!(CmdErrorCode::Failed, "create timeout result future error"))?;
191 let ret = self.send_inner_cmd(buf.as_slice(), body).await;
192 if let Err(e) = ret {
193 self.set_disable();
194 return Err(e);
195 }
196 let resp = resp_waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout, "recv resp error"))?;
197 Ok(resp)
198 }
199
200 async fn send_inner(&mut self, header: &[u8], body: &[u8]) -> CmdResult<()> {
201 let mut write = self.write.get().await;
202 if header.len() > 255 {
203 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too long"));
204 }
205 write.write_u8(header.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
206 write.write_all(header).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
207 write.write_all(body).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
208 write.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
209 Ok(())
210 }
211
212 async fn send_inner2(&mut self, header: &[u8], body: &[&[u8]]) -> CmdResult<()> {
213 let mut write = self.write.get().await;
214 if header.len() > 255 {
215 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too long"));
216 }
217 write.write_u8(header.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
218 write.write_all(header).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
219 for b in body.iter() {
220 write.write_all(b).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
221 }
222 write.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
223 Ok(())
224 }
225
226 async fn send_inner_cmd(&mut self, header: &[u8], mut body: CmdBody) -> CmdResult<()> {
227 let mut write = self.write.get().await;
228 if header.len() > 255 {
229 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
230 }
231 write.write_u8(header.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
232 write.write_all(header).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
233 tokio::io::copy(&mut body, write.deref_mut().deref_mut()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
234 write.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
235 Ok(())
236 }
237}
238
239impl<M, R, W, LEN, CMD> Drop for CommonCmdSend<M, R, W, LEN, CMD>
240where M: CmdTunnelMeta,
241 R: CmdTunnelRead<M>,
242 W: CmdTunnelWrite<M>,
243 LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
244 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes {
245 fn drop(&mut self) {
246 self.set_disable();
247 }
248}
249
250impl<M, R, W, LEN, CMD> CmdSend<M> for CommonCmdSend<M, R, W, LEN, CMD>
251where M: CmdTunnelMeta,
252 R: CmdTunnelRead<M>,
253 W: CmdTunnelWrite<M>,
254 LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
255 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes {
256 fn get_tunnel_meta(&self) -> Option<Arc<M>> {
257 self.tunnel_meta.clone()
258 }
259
260 fn get_remote_peer_id(&self) -> PeerId {
261 self.remote_id.clone()
262 }
263}
264
265impl<M, R, W, LEN, CMD> ClassifiedWorker<TunnelId> for CommonCmdSend<M, R, W, LEN, CMD>
266where M: CmdTunnelMeta,
267 R: CmdTunnelRead<M>,
268 W: CmdTunnelWrite<M>,
269 LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
270 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes {
271 fn is_work(&self) -> bool {
272 self.is_work && !self.recv_handle.is_finished()
273 }
274
275 fn is_valid(&self, c: TunnelId) -> bool {
276 self.tunnel_id == c
277 }
278
279 fn classification(&self) -> TunnelId {
280 self.tunnel_id
281 }
282}
283
284pub struct ClassifiedSendGuard<
285 C: WorkerClassification,
286 M: CmdTunnelMeta,
287 CW: ClassifiedWorker<C> + CmdSend<M>,
288 F: ClassifiedWorkerFactory<C, CW>> {
289 pub(crate) worker_guard: ClassifiedWorkerGuard<C, CW, F>,
290 pub(crate) _p: PhantomData<M>,
291}
292
293impl<
294 C: WorkerClassification,
295 M: CmdTunnelMeta,
296 CW: ClassifiedWorker<C> + CmdSend<M>,
297 F: ClassifiedWorkerFactory<C, CW>> Deref for ClassifiedSendGuard<C, M, CW, F> {
298 type Target = CW;
299
300 fn deref(&self) -> &Self::Target {
301 &self.worker_guard.deref()
302 }
303}
304
305impl<
306 C: WorkerClassification,
307 M: CmdTunnelMeta,
308 CW: ClassifiedWorker<C> + CmdSend<M>,
309 F: ClassifiedWorkerFactory<C, CW>> SendGuard<M, CW> for ClassifiedSendGuard<C, M, CW, F> {
310}
311
312pub struct CmdWriteFactory<M: CmdTunnelMeta,
313 R: CmdTunnelRead<M>,
314 W: CmdTunnelWrite<M>,
315 F: CmdTunnelFactory<M, R, W>,
316 LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
317 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug> {
318 tunnel_factory: F,
319 cmd_handler: Arc<dyn CmdHandler<LEN, CMD>>,
320 resp_waiter: RespWaiterRef,
321 tunnel_id_generator: TunnelIdGenerator,
322 p: std::marker::PhantomData<Mutex<(R, W, M)>>,
323}
324
325impl<M: CmdTunnelMeta,
326 R: CmdTunnelRead<M>,
327 W: CmdTunnelWrite<M>,
328 F: CmdTunnelFactory<M, R, W>,
329 LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
330 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug> CmdWriteFactory<M, R, W, F, LEN, CMD> {
331 pub(crate) fn new(tunnel_factory: F,
332 cmd_handler: impl CmdHandler<LEN, CMD>,
333 resp_waiter: RespWaiterRef,) -> Self {
334 Self {
335 tunnel_factory,
336 cmd_handler: Arc::new(cmd_handler),
337 resp_waiter,
338 tunnel_id_generator: TunnelIdGenerator::new(),
339 p: Default::default(),
340 }
341 }
342}
343
344#[async_trait::async_trait]
345impl<M: CmdTunnelMeta,
346 R: CmdTunnelRead<M>,
347 W: CmdTunnelWrite<M>,
348 F: CmdTunnelFactory<M, R, W>,
349 LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive + RawFixedBytes,
350 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + RawFixedBytes + Debug> ClassifiedWorkerFactory<TunnelId, CommonCmdSend<M, R, W, LEN, CMD>> for CmdWriteFactory<M, R, W, F, LEN, CMD> {
351 async fn create(&self, c: Option<TunnelId>) -> PoolResult<CommonCmdSend<M, R, W, LEN, CMD>> {
352 if c.is_some() {
353 return Err(pool_err!(PoolErrorCode::Failed, "tunnel {:?} not found", c.unwrap()));
354 }
355 let tunnel = self.tunnel_factory.create_tunnel().await.map_err(into_pool_err!(PoolErrorCode::Failed))?;
356 let peer_id = tunnel.get_remote_peer_id();
357 let tunnel_id = self.tunnel_id_generator.generate();
358 let (mut recv, write) = tunnel.split();
359 let remote_id = write.get_remote_peer_id();
360 let meta = write.get_tunnel_meta();
361 let write = ObjectHolder::new(write);
362 let resp_write = write.clone();
363 let cmd_handler = self.cmd_handler.clone();
364 let handle = spawn(async move {
365 let ret: CmdResult<()> = async move {
366 loop {
367 let header_len = recv.read_u8().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
368 let mut header = vec![0u8; header_len as usize];
369 let n = recv.read_exact(header.as_mut()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
370 if n == 0 {
371 break;
372 }
373 let header = CmdHeader::<LEN, CMD>::clone_from_slice(header.as_slice()).map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
374 log::trace!("recv cmd {:?} from {} len {}", header.cmd_code(), peer_id.to_base58(), header.pkg_len().to_u64().unwrap());
375 let body_len = header.pkg_len().to_u64().unwrap();
376 let cmd_read = CmdBodyRead::new(recv, header.pkg_len().to_u64().unwrap() as usize);
377 let waiter = cmd_read.get_waiter();
378 let future = waiter.create_result_future().map_err(into_cmd_err!(CmdErrorCode::Failed))?;
379 let version = header.version();
380 let seq = header.seq();
381 let cmd_code = header.cmd_code();
382 match cmd_handler.handle(peer_id.clone(), tunnel_id, header, CmdBody::from_reader(BufReader::new(cmd_read), body_len)).await {
383 Ok(Some(mut body)) => {
384 let mut write = resp_write.get().await;
385 let header = CmdHeader::<LEN, CMD>::new(version, true, seq, cmd_code, LEN::from_u64(body.len()).unwrap());
386 let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
387 if buf.len() > 255 {
388 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too long"));
389 }
390 write.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
391 write.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
392 tokio::io::copy(&mut body, write.deref_mut().deref_mut()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
393 write.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
394 }
395 Ok(None) => {
396
397 }
398 Err(e) => {
399 log::error!("handle cmd error: {:?}", e);
400 }
401 }
402 recv = future.await.map_err(into_cmd_err!(CmdErrorCode::Failed))??;
403 }
404 Ok(())
405 }.await;
406 ret
407 });
408 Ok(CommonCmdSend::new(tunnel_id, handle, write, self.resp_waiter.clone(), remote_id, meta))
409 }
410}
411
412pub struct DefaultCmdClient<M: CmdTunnelMeta,
413 R: CmdTunnelRead<M>,
414 W: CmdTunnelWrite<M>,
415 F: CmdTunnelFactory<M, R, W>,
416 LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive + RawFixedBytes,
417 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + RawFixedBytes + Eq + Hash + Debug> {
418 tunnel_pool: ClassifiedWorkerPoolRef<TunnelId, CommonCmdSend<M, R, W, LEN, CMD>, CmdWriteFactory<M, R, W, F, LEN, CMD>>,
419 cmd_handler_map: Arc<CmdHandlerMap<LEN, CMD>>,
420}
421
422impl<M: CmdTunnelMeta,
423 R: CmdTunnelRead<M>,
424 W: CmdTunnelWrite<M>,
425 F: CmdTunnelFactory<M, R, W>,
426 LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive + RawFixedBytes,
427 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + RawFixedBytes + Eq + Hash + Debug> DefaultCmdClient<M, R, W, F, LEN, CMD> {
428 pub fn new(factory: F, tunnel_count: u16) -> Arc<Self> {
429 let cmd_handler_map = Arc::new(CmdHandlerMap::new());
430 let handler_map = cmd_handler_map.clone();
431 let resp_waiter = Arc::new(RespWaiter::new());
432 let waiter = resp_waiter.clone();
433 Arc::new(Self {
434 tunnel_pool: ClassifiedWorkerPool::new(tunnel_count, CmdWriteFactory::<M, R, W, _, LEN, CMD>::new(factory, move |peer_id: PeerId, tunnel_id: TunnelId, header: CmdHeader<LEN, CMD>, body_read: CmdBody| {
435 let handler_map = handler_map.clone();
436 let waiter = waiter.clone();
437 async move {
438 if header.is_resp() && header.seq().is_some() {
439 let resp_id = gen_resp_id(header.cmd_code(), header.seq().unwrap());
440 let _ = waiter.set_result(resp_id, body_read);
441 Ok(None)
442 } else {
443 if let Some(handler) = handler_map.get(header.cmd_code()) {
444 handler.handle(peer_id, tunnel_id, header, body_read).await
445 } else {
446 Ok(None)
447 }
448 }
449 }
450 }, resp_waiter.clone())),
451 cmd_handler_map,
452 })
453 }
454
455 async fn get_send(&self) -> CmdResult<ClassifiedWorkerGuard<TunnelId, CommonCmdSend<M, R, W, LEN, CMD>, CmdWriteFactory<M, R, W, F, LEN, CMD>>> {
456 self.tunnel_pool.get_worker().await.map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))
457 }
458
459 async fn get_send_of_tunnel_id(&self, tunnel_id: TunnelId) -> CmdResult<ClassifiedWorkerGuard<TunnelId, CommonCmdSend<M, R, W, LEN, CMD>, CmdWriteFactory<M, R, W, F, LEN, CMD>>> {
460 self.tunnel_pool.get_classified_worker(tunnel_id).await.map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))
461 }
462}
463
464pub type CmdClientSendGuard<M, R, W, F, LEN, CMD> = ClassifiedSendGuard<TunnelId, M, CommonCmdSend<M, R, W, LEN, CMD>, CmdWriteFactory<M, R, W, F, LEN, CMD>>;
465#[async_trait::async_trait]
466impl<M: CmdTunnelMeta,
467 R: CmdTunnelRead<M>,
468 W: CmdTunnelWrite<M>,
469 F: CmdTunnelFactory<M, R, W>,
470 LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive + RawFixedBytes,
471 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + RawFixedBytes + Eq + Hash + Debug,
472 > CmdClient<LEN, CMD, M, CommonCmdSend<M, R, W, LEN, CMD>, CmdClientSendGuard<M, R, W, F, LEN, CMD>, > for DefaultCmdClient<M, R, W, F, LEN, CMD> {
473 fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>) {
474 self.cmd_handler_map.insert(cmd, handler);
475 }
476
477 async fn send(&self, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
478 let mut send = self.get_send().await?;
479 send.send(cmd, version, body).await
480 }
481
482 async fn send_with_resp(&self, cmd: CMD, version: u8, body: &[u8], timeout: Duration) -> CmdResult<CmdBody> {
483 let mut send = self.get_send().await?;
484 send.send_with_resp(cmd, version, body, timeout).await
485 }
486
487 async fn send2(&self, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()> {
488 let mut send = self.get_send().await?;
489 send.send2(cmd, version, body).await
490 }
491
492 async fn send2_with_resp(&self, cmd: CMD, version: u8, body: &[&[u8]], timeout: Duration) -> CmdResult<CmdBody> {
493 let mut send = self.get_send().await?;
494 send.send2_with_resp(cmd, version, body, timeout).await
495 }
496
497 async fn send_cmd(&self, cmd: CMD, version: u8, body: CmdBody) -> CmdResult<()> {
498 let mut send = self.get_send().await?;
499 send.send_cmd(cmd, version, body).await
500 }
501
502 async fn send_cmd_with_resp(&self, cmd: CMD, version: u8, body: CmdBody, timeout: Duration) -> CmdResult<CmdBody> {
503 let mut send = self.get_send().await?;
504 send.send_cmd_with_resp(cmd, version, body, timeout).await
505 }
506
507 async fn send_by_specify_tunnel(&self, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
508 let mut send = self.get_send_of_tunnel_id(tunnel_id).await?;
509 send.send(cmd, version, body).await
510 }
511
512 async fn send_by_specify_tunnel_with_resp(&self, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[u8], timeout: Duration) -> CmdResult<CmdBody> {
513 let mut send = self.get_send_of_tunnel_id(tunnel_id).await?;
514 send.send_with_resp(cmd, version, body, timeout).await
515 }
516
517 async fn send2_by_specify_tunnel(&self, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()> {
518 let mut send = self.get_send_of_tunnel_id(tunnel_id).await?;
519 send.send2(cmd, version, body).await
520 }
521
522 async fn send2_by_specify_tunnel_with_resp(&self, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[&[u8]], timeout: Duration) -> CmdResult<CmdBody> {
523 let mut send = self.get_send_of_tunnel_id(tunnel_id).await?;
524 send.send2_with_resp(cmd, version, body, timeout).await
525 }
526
527 async fn send_cmd_by_specify_tunnel(&self, tunnel_id: TunnelId, cmd: CMD, version: u8, body: CmdBody) -> CmdResult<()> {
528 let mut send = self.get_send_of_tunnel_id(tunnel_id).await?;
529 send.send_cmd(cmd, version, body).await
530 }
531
532 async fn send_cmd_by_specify_tunnel_with_resp(&self, tunnel_id: TunnelId, cmd: CMD, version: u8, body: CmdBody, timeout: Duration) -> CmdResult<CmdBody> {
533 let mut send = self.get_send_of_tunnel_id(tunnel_id).await?;
534 send.send_cmd_with_resp(cmd, version, body, timeout).await
535 }
536
537 async fn clear_all_tunnel(&self) {
538 self.tunnel_pool.clear_all_worker().await;
539 }
540
541 async fn get_send(&self, tunnel_id: TunnelId) -> CmdResult<CmdClientSendGuard<M, R, W, F, LEN, CMD>> {
542 Ok(ClassifiedSendGuard {
543 worker_guard: self.get_send_of_tunnel_id(tunnel_id).await?,
544 _p: Default::default(),
545 })
546 }
547}