sfo_cmd_server/node/
classified_node.rs

1use std::collections::HashMap;
2use std::fmt::Debug;
3use std::hash::Hash;
4use std::sync::{Arc, Mutex};
5use std::time::Duration;
6use async_named_locker::ObjectHolder;
7use bucky_raw_codec::{RawDecode, RawEncode, RawFixedBytes};
8use num::{FromPrimitive, ToPrimitive};
9use sfo_pool::{ClassifiedWorker, ClassifiedWorkerFactory, ClassifiedWorkerGuard, ClassifiedWorkerPool, ClassifiedWorkerPoolRef, PoolErrorCode, PoolResult, WorkerClassification};
10use sfo_split::Splittable;
11use crate::{into_pool_err, pool_err, ClassifiedCmdNode, CmdBody, CmdHandler, CmdHeader, CmdNode, CmdTunnelMeta, PeerId, TunnelId, TunnelIdGenerator};
12use crate::client::{gen_resp_id, ClassifiedCmdSend, ClassifiedCmdTunnelRead, ClassifiedCmdTunnelWrite, ClassifiedSendGuard, RespWaiter, RespWaiterRef};
13use crate::cmd::CmdHandlerMap;
14use crate::errors::{into_cmd_err, CmdErrorCode, CmdResult};
15use crate::node::create_recv_handle;
16use crate::server::CmdTunnelListener;
17
18#[derive(Debug, Clone, Eq, Hash)]
19pub struct CmdNodeTunnelClassification<C: WorkerClassification> {
20    pub peer_id: Option<PeerId>,
21    pub tunnel_id: Option<TunnelId>,
22    pub classification: Option<C>,
23}
24
25impl<C: WorkerClassification> PartialEq for CmdNodeTunnelClassification<C> {
26    fn eq(&self, other: &Self) -> bool {
27        self.peer_id == other.peer_id && self.tunnel_id == other.tunnel_id && self.classification == other.classification
28    }
29}
30
31impl<C, M, R, W, LEN, CMD> ClassifiedWorker<CmdNodeTunnelClassification<C>> for ClassifiedCmdSend<C, M, R, W, LEN, CMD>
32where C: WorkerClassification,
33      M: CmdTunnelMeta,
34      R: ClassifiedCmdTunnelRead<C, M>,
35      W: ClassifiedCmdTunnelWrite<C, M>,
36      LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
37      CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes {
38    fn is_work(&self) -> bool {
39        self.is_work && !self.recv_handle.is_finished()
40    }
41
42    fn is_valid(&self, c: CmdNodeTunnelClassification<C>) -> bool {
43        if c.peer_id.is_some() && c.peer_id.as_ref().unwrap() != &self.remote_id {
44            return false;
45        }
46
47        if c.tunnel_id.is_some() {
48            self.tunnel_id == c.tunnel_id.unwrap()
49        } else {
50            if c.classification.is_some() {
51                self.classification == c.classification.unwrap()
52            } else {
53                true
54            }
55        }
56    }
57
58    fn classification(&self) -> CmdNodeTunnelClassification<C> {
59        CmdNodeTunnelClassification {
60            peer_id: Some(self.remote_id.clone()),
61            tunnel_id: Some(self.tunnel_id),
62            classification: Some(self.classification.clone()),
63        }
64    }
65}
66
67#[async_trait::async_trait]
68pub trait ClassifiedCmdNodeTunnelFactory<C: WorkerClassification, M: CmdTunnelMeta, R: ClassifiedCmdTunnelRead<C, M>, W: ClassifiedCmdTunnelWrite<C, M>>: Send + Sync + 'static {
69    async fn create_tunnel(&self, classification: Option<CmdNodeTunnelClassification<C>>) -> CmdResult<Splittable<R, W>>;
70}
71
72struct CmdWriteFactoryImpl<C: WorkerClassification,
73    M: CmdTunnelMeta,
74    R: ClassifiedCmdTunnelRead<C, M>,
75    W: ClassifiedCmdTunnelWrite<C, M>,
76    F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
77    LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
78    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
79    LISTENER: CmdTunnelListener<M, R, W>> {
80    tunnel_listener: LISTENER,
81    tunnel_factory: F,
82    cmd_handler: Arc<dyn CmdHandler<LEN, CMD>>,
83    tunnel_id_generator: TunnelIdGenerator,
84    resp_waiter: RespWaiterRef,
85    send_cache: Arc<Mutex<HashMap<PeerId, Vec<ClassifiedCmdSend<C, M, R, W, LEN, CMD>>>>>,
86}
87
88
89impl<C: WorkerClassification,
90    M: CmdTunnelMeta,
91    R: ClassifiedCmdTunnelRead<C, M>,
92    W: ClassifiedCmdTunnelWrite<C, M>,
93    F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
94    LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive + RawFixedBytes,
95    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
96    LISTENER: CmdTunnelListener<M, R, W>> CmdWriteFactoryImpl<C, M, R, W, F, LEN, CMD, LISTENER> {
97    pub fn new(tunnel_factory: F,
98               tunnel_listener: LISTENER,
99               cmd_handler: impl CmdHandler<LEN, CMD>,
100               resp_waiter: RespWaiterRef,) -> Self {
101        Self {
102            tunnel_listener,
103            tunnel_factory,
104            cmd_handler: Arc::new(cmd_handler),
105            tunnel_id_generator: TunnelIdGenerator::new(),
106            resp_waiter,
107            send_cache: Arc::new(Mutex::new(Default::default())),
108        }
109    }
110
111
112    pub fn start(self: &Arc<Self>) {
113        let this = self.clone();
114        tokio::spawn(async move {
115            if let Err(e) = this.run().await {
116                log::error!("cmd server error: {:?}", e);
117            }
118        });
119    }
120
121    async fn run(self: &Arc<Self>) -> CmdResult<()> {
122        loop {
123            let tunnel = self.tunnel_listener.accept().await?;
124            let peer_id = tunnel.get_remote_peer_id();
125            let classification = tunnel.get_classification();
126            let tunnel_id = self.tunnel_id_generator.generate();
127            let this = self.clone();
128            let resp_waiter = self.resp_waiter.clone();
129            tokio::spawn(async move {
130                let ret: CmdResult<()> = async move {
131                    let this = this.clone();
132                    let cmd_handler = this.cmd_handler.clone();
133                    let (reader, writer) = tunnel.split();
134                    let tunnel_meta = reader.get_tunnel_meta();
135                    let remote_id = writer.get_remote_peer_id();
136                    let writer = ObjectHolder::new(writer);
137                    let recv_handle = create_recv_handle::<M, R, W, LEN, CMD>(reader, writer.clone(), tunnel_id, cmd_handler);
138                    {
139                        let mut send_cache = this.send_cache.lock().unwrap();
140                        let send_list = send_cache.entry(peer_id).or_insert(Vec::new());
141                        send_list.push(ClassifiedCmdSend::new(tunnel_id, classification, recv_handle, writer, resp_waiter, remote_id, tunnel_meta));
142                    }
143                    Ok(())
144                }.await;
145                if let Err(e) = ret {
146                    log::error!("peer connection error: {:?}", e);
147                }
148            });
149        }
150    }
151}
152
153#[async_trait::async_trait]
154impl<C: WorkerClassification,
155    M: CmdTunnelMeta,
156    R: ClassifiedCmdTunnelRead<C, M>,
157    W: ClassifiedCmdTunnelWrite<C, M>,
158    F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
159    LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive + RawFixedBytes,
160    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + RawFixedBytes + Debug,
161    LISTENER: CmdTunnelListener<M, R, W>> ClassifiedWorkerFactory<CmdNodeTunnelClassification<C>, ClassifiedCmdSend<C, M, R, W, LEN, CMD>> for CmdWriteFactoryImpl<C, M, R, W, F, LEN, CMD, LISTENER> {
162    async fn create(&self, c: Option<CmdNodeTunnelClassification<C>>) -> PoolResult<ClassifiedCmdSend<C, M, R, W, LEN, CMD>> {
163        if c.is_some() {
164            let classification = c.unwrap();
165            if classification.peer_id.is_some() {
166                let peer_id = classification.peer_id.clone().unwrap();
167                let tunnel_id = classification.tunnel_id;
168                if tunnel_id.is_some() {
169                    let mut send_cache = self.send_cache.lock().unwrap();
170                    if let Some(send_list) = send_cache.get_mut(&peer_id) {
171                        let mut send_index = None;
172                        for (index, send) in send_list.iter().enumerate() {
173                            if send.get_tunnel_id() == tunnel_id.unwrap() {
174                                send_index = Some(index);
175                                break;
176                            }
177                        }
178                        if let Some(send_index) = send_index {
179                            let send = send_list.remove(send_index);
180                            Ok(send)
181                        } else {
182                            Err(pool_err!(PoolErrorCode::Failed, "tunnel {:?} not found", tunnel_id.unwrap()))
183                        }
184                    } else {
185                        Err(pool_err!(PoolErrorCode::Failed, "tunnel {:?} not found", tunnel_id.unwrap()))
186                    }
187                } else {
188                    {
189                        let mut send_cache = self.send_cache.lock().unwrap();
190                        if let Some(send_list) = send_cache.get_mut(&peer_id) {
191                            if !send_list.is_empty() {
192                                let send = send_list.pop().unwrap();
193                                if send_list.is_empty() {
194                                    send_cache.remove(&peer_id);
195                                }
196                                return Ok(send);
197                            }
198                        }
199                    }
200                    let tunnel = self.tunnel_factory.create_tunnel(Some(classification)).await.map_err(into_pool_err!(PoolErrorCode::Failed))?;
201                    let classification = tunnel.get_classification();
202                    let tunnel_id = self.tunnel_id_generator.generate();
203                    let (recv, write) = tunnel.split();
204                    let remote_id = write.get_remote_peer_id();
205                    let tunnel_meta = recv.get_tunnel_meta();
206                    let write = ObjectHolder::new(write);
207                    let cmd_handler = self.cmd_handler.clone();
208                    let handle = create_recv_handle::<M, R, W, LEN, CMD>(recv, write.clone(), tunnel_id, cmd_handler);
209                    Ok(ClassifiedCmdSend::new(tunnel_id, classification, handle, write, self.resp_waiter.clone(), remote_id, tunnel_meta))
210                }
211            } else {
212                if classification.tunnel_id.is_some() {
213                    Err(pool_err!(PoolErrorCode::Failed, "must set peer id when set tunnel id"))
214                } else {
215                    let tunnel = self.tunnel_factory.create_tunnel(Some(classification)).await.map_err(into_pool_err!(PoolErrorCode::Failed))?;
216                    let classification = tunnel.get_classification();
217                    let tunnel_id = self.tunnel_id_generator.generate();
218                    let (recv, write) = tunnel.split();
219                    let remote_id = write.get_remote_peer_id();
220                    let tunnel_meta = write.get_tunnel_meta();
221                    let write = ObjectHolder::new(write);
222                    let cmd_handler = self.cmd_handler.clone();
223                    let handle = create_recv_handle::<M, R, W, LEN, CMD>(recv, write.clone(), tunnel_id, cmd_handler);
224                    Ok(ClassifiedCmdSend::new(tunnel_id, classification, handle, write, self.resp_waiter.clone(), remote_id, tunnel_meta))
225                }
226            }
227
228        } else {
229            Err(pool_err!(PoolErrorCode::Failed, "peer id is none"))
230        }
231    }
232}
233
234pub struct ClassifiedCmdNodeWriteFactory<C: WorkerClassification,
235    M: CmdTunnelMeta,
236    R: ClassifiedCmdTunnelRead<C, M>,
237    W: ClassifiedCmdTunnelWrite<C, M>,
238    F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
239    LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
240    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
241    LISTENER: CmdTunnelListener<M, R, W>> {
242    inner: Arc<CmdWriteFactoryImpl<C, M, R, W, F, LEN, CMD, LISTENER>>
243}
244
245
246impl<C: WorkerClassification,
247    M: CmdTunnelMeta,
248    R: ClassifiedCmdTunnelRead<C, M>,
249    W: ClassifiedCmdTunnelWrite<C, M>,
250    F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
251    LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive + RawFixedBytes + RawFixedBytes,
252    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes + RawFixedBytes,
253    LISTENER: CmdTunnelListener<M, R, W>> ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER> {
254    pub(crate) fn new(tunnel_factory: F,
255               tunnel_listener: LISTENER,
256               cmd_handler: impl CmdHandler<LEN, CMD>,
257                      resp_waiter: RespWaiterRef) -> Self {
258        Self {
259            inner: Arc::new(CmdWriteFactoryImpl::new(tunnel_factory, tunnel_listener, cmd_handler, resp_waiter)),
260        }
261    }
262
263    pub fn start(&self) {
264        self.inner.start();
265    }
266}
267
268#[async_trait::async_trait]
269impl<C: WorkerClassification,
270    M: CmdTunnelMeta,
271    R: ClassifiedCmdTunnelRead<C, M>,
272    W: ClassifiedCmdTunnelWrite<C, M>,
273    F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
274    LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive + RawFixedBytes,
275    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + RawFixedBytes + Debug,
276    LISTENER: CmdTunnelListener<M, R, W>> ClassifiedWorkerFactory<CmdNodeTunnelClassification<C>, ClassifiedCmdSend<C, M, R, W, LEN, CMD>> for ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER> {
277    async fn create(&self, c: Option<CmdNodeTunnelClassification<C>>) -> PoolResult<ClassifiedCmdSend<C, M, R, W, LEN, CMD>> {
278        self.inner.create(c).await
279    }
280}
281
282pub struct DefaultClassifiedCmdNode<C: WorkerClassification,
283    M: CmdTunnelMeta,
284    R: ClassifiedCmdTunnelRead<C, M>,
285    W: ClassifiedCmdTunnelWrite<C, M>,
286    F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
287    LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync +'static + FromPrimitive + ToPrimitive + RawFixedBytes,
288    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync +'static + RawFixedBytes + Eq + Hash + Debug,
289    LISTENER: CmdTunnelListener<M, R, W>> {
290    tunnel_pool: ClassifiedWorkerPoolRef<CmdNodeTunnelClassification<C>, ClassifiedCmdSend<C, M, R, W, LEN, CMD>, ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>>,
291    cmd_handler_map: Arc<CmdHandlerMap<LEN, CMD>>,
292}
293
294impl<C: WorkerClassification,
295    M: CmdTunnelMeta,
296    R: ClassifiedCmdTunnelRead<C, M>,
297    W: ClassifiedCmdTunnelWrite<C, M>,
298    F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
299    LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync +'static + FromPrimitive + ToPrimitive + RawFixedBytes,
300    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync +'static + RawFixedBytes + Eq + Hash + Debug,
301    LISTENER: CmdTunnelListener<M, R, W>> DefaultClassifiedCmdNode<C, M, R, W, F, LEN, CMD, LISTENER> {
302    pub fn new(listener: LISTENER, factory: F, tunnel_count: u16) -> Arc<Self> {
303        let cmd_handler_map = Arc::new(CmdHandlerMap::new());
304        let handler_map = cmd_handler_map.clone();
305        let resp_waiter = Arc::new(RespWaiter::new());
306        let waiter = resp_waiter.clone();
307        let write_factory = ClassifiedCmdNodeWriteFactory::<C, M, R, W, _, LEN, CMD, LISTENER>::new(factory, listener, move |peer_id: PeerId, tunnel_id: TunnelId, header: CmdHeader<LEN, CMD>, body_read: CmdBody| {
308            let handler_map = handler_map.clone();
309            let waiter = waiter.clone();
310            async move {
311                if header.is_resp() && header.seq().is_some() {
312                    let resp_id = gen_resp_id(header.cmd_code(), header.seq().unwrap());
313                    let _ = waiter.set_result(resp_id, body_read);
314                    Ok(None)
315                } else {
316                    if let Some(handler) = handler_map.get(header.cmd_code()) {
317                        handler.handle(peer_id, tunnel_id, header, body_read).await
318                    } else {
319                        Ok(None)
320                    }
321                }
322            }
323        }, resp_waiter.clone());
324        write_factory.start();
325        Arc::new(Self {
326            tunnel_pool: ClassifiedWorkerPool::new(tunnel_count, write_factory),
327            cmd_handler_map,
328        })
329    }
330
331
332    async fn get_send(&self, peer_id: PeerId) -> CmdResult<ClassifiedWorkerGuard<CmdNodeTunnelClassification<C>, ClassifiedCmdSend<C, M, R, W, LEN, CMD>, ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>>> {
333        self.tunnel_pool.get_classified_worker(CmdNodeTunnelClassification {
334            peer_id: Some(peer_id),
335            tunnel_id: None,
336            classification: None,
337        }).await.map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))
338    }
339
340    async fn get_send_of_tunnel_id(&self, peer_id: PeerId, tunnel_id: TunnelId) -> CmdResult<ClassifiedWorkerGuard<CmdNodeTunnelClassification<C>, ClassifiedCmdSend<C, M, R, W, LEN, CMD>, ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>>> {
341        self.tunnel_pool.get_classified_worker(CmdNodeTunnelClassification {
342            peer_id: Some(peer_id),
343            tunnel_id: Some(tunnel_id),
344            classification: None,
345        }).await.map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))
346    }
347
348    async fn get_classified_send(&self, classification: C) -> CmdResult<ClassifiedWorkerGuard<CmdNodeTunnelClassification<C>, ClassifiedCmdSend<C, M, R, W, LEN, CMD>, ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>>> {
349        self.tunnel_pool.get_classified_worker(CmdNodeTunnelClassification {
350            peer_id: None,
351            tunnel_id: None,
352            classification: Some(classification),
353        }).await.map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))
354    }
355
356    async fn get_peer_classified_send(&self, peer_id: PeerId, classification: C) -> CmdResult<ClassifiedWorkerGuard<CmdNodeTunnelClassification<C>, ClassifiedCmdSend<C, M, R, W, LEN, CMD>, ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>>> {
357        self.tunnel_pool.get_classified_worker(CmdNodeTunnelClassification {
358            peer_id: Some(peer_id),
359            tunnel_id: None,
360            classification: Some(classification),
361        }).await.map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))
362    }
363}
364
365pub type ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER> = ClassifiedSendGuard<CmdNodeTunnelClassification<C>, M, ClassifiedCmdSend<C, M, R, W, LEN, CMD>, ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>>;
366#[async_trait::async_trait]
367impl<C: WorkerClassification,
368    M: CmdTunnelMeta,
369    R: ClassifiedCmdTunnelRead<C, M>,
370    W: ClassifiedCmdTunnelWrite<C, M>,
371    F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
372    LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync +'static + FromPrimitive + ToPrimitive + RawFixedBytes,
373    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync +'static + RawFixedBytes + Eq + Hash + Debug,
374    LISTENER: CmdTunnelListener<M, R, W>> CmdNode<LEN, CMD, M, ClassifiedCmdSend<C, M, R, W, LEN, CMD>, ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER>> for DefaultClassifiedCmdNode<C, M, R, W, F, LEN, CMD, LISTENER> {
375    fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>) {
376        self.cmd_handler_map.insert(cmd, handler)
377    }
378
379    async fn send(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
380        let mut send = self.get_send(peer_id.clone()).await?;
381        send.send(cmd, version, body).await
382    }
383
384    async fn send_with_resp(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8], timeout: Duration) -> CmdResult<CmdBody> {
385        let mut send = self.get_send(peer_id.clone()).await?;
386        send.send_with_resp(cmd, version, body, timeout).await
387    }
388
389    async fn send2(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()> {
390        let mut send = self.get_send(peer_id.clone()).await?;
391        send.send2(cmd, version, body).await
392    }
393
394    async fn send2_with_resp(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[&[u8]], timeout: Duration) -> CmdResult<CmdBody> {
395        let mut send = self.get_send(peer_id.clone()).await?;
396        send.send2_with_resp(cmd, version, body, timeout).await
397    }
398
399    async fn send_cmd(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: CmdBody) -> CmdResult<()> {
400        let mut send = self.get_send(peer_id.clone()).await?;
401        send.send_cmd(cmd, version, body).await
402    }
403
404    async fn send_cmd_with_resp(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: CmdBody, timeout: Duration) -> CmdResult<CmdBody> {
405        let mut send = self.get_send(peer_id.clone()).await?;
406        send.send_cmd_with_resp(cmd, version, body, timeout).await
407    }
408
409    async fn send_by_specify_tunnel(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
410        let mut send = self.get_send_of_tunnel_id(peer_id.clone(), tunnel_id).await?;
411        send.send(cmd, version, body).await
412    }
413
414    async fn send_by_specify_tunnel_with_resp(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[u8], timeout: Duration) -> CmdResult<CmdBody> {
415        let mut send = self.get_send_of_tunnel_id(peer_id.clone(), tunnel_id).await?;
416        send.send_with_resp(cmd, version, body, timeout).await
417    }
418
419    async fn send2_by_specify_tunnel(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()> {
420        let mut send = self.get_send_of_tunnel_id(peer_id.clone(), tunnel_id).await?;
421        send.send2(cmd, version, body).await
422    }
423
424    async fn send2_by_specify_tunnel_with_resp(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[&[u8]], timeout: Duration) -> CmdResult<CmdBody> {
425        let mut send = self.get_send_of_tunnel_id(peer_id.clone(), tunnel_id).await?;
426        send.send2_with_resp(cmd, version, body, timeout).await
427    }
428
429    async fn send_cmd_by_specify_tunnel(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: CmdBody) -> CmdResult<()> {
430        let mut send = self.get_send_of_tunnel_id(peer_id.clone(), tunnel_id).await?;
431        send.send_cmd(cmd, version, body).await
432    }
433
434    async fn send_cmd_by_specify_tunnel_with_resp(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: CmdBody, timeout: Duration) -> CmdResult<CmdBody> {
435        let mut send = self.get_send_of_tunnel_id(peer_id.clone(), tunnel_id).await?;
436        send.send_cmd_with_resp(cmd, version, body, timeout).await
437    }
438
439    async fn clear_all_tunnel(&self) {
440        self.tunnel_pool.clear_all_worker().await;
441    }
442
443    async fn get_send(&self, peer_id: &PeerId, tunnel_id: TunnelId) -> CmdResult<ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER>> {
444        Ok(ClassifiedSendGuard {
445            worker_guard: self.get_send_of_tunnel_id(peer_id.clone(), tunnel_id).await?,
446            _p: Default::default(),
447        })
448    }
449}
450
451#[async_trait::async_trait]
452impl<C: WorkerClassification,
453    M: CmdTunnelMeta,
454    R: ClassifiedCmdTunnelRead<C, M>,
455    W: ClassifiedCmdTunnelWrite<C, M>,
456    F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
457    LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync +'static + FromPrimitive + ToPrimitive + RawFixedBytes,
458    CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync +'static + RawFixedBytes + Eq + Hash + Debug,
459    LISTENER: CmdTunnelListener<M, R, W>> ClassifiedCmdNode<LEN, CMD, C, M, ClassifiedCmdSend<C, M, R, W, LEN, CMD>, ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER>> for DefaultClassifiedCmdNode<C, M, R, W, F, LEN, CMD, LISTENER> {
460    async fn send_by_classified_tunnel(&self, classification: C, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
461        let mut send = self.get_classified_send(classification).await?;
462        send.send(cmd, version, body).await
463    }
464
465    async fn send_by_classified_tunnel_with_resp(&self, classification: C, cmd: CMD, version: u8, body: &[u8], timeout: Duration) -> CmdResult<CmdBody> {
466        let mut send = self.get_classified_send(classification).await?;
467        send.send_with_resp(cmd, version, body, timeout).await
468    }
469
470    async fn send2_by_classified_tunnel(&self, classification: C, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()> {
471        let mut send = self.get_classified_send(classification).await?;
472        send.send2(cmd, version, body).await
473    }
474
475    async fn send2_by_classified_tunnel_with_resp(&self, classification: C, cmd: CMD, version: u8, body: &[&[u8]], timeout: Duration) -> CmdResult<CmdBody> {
476        let mut send = self.get_classified_send(classification).await?;
477        send.send2_with_resp(cmd, version, body, timeout).await
478    }
479
480    async fn send_cmd_by_classified_tunnel(&self, classification: C, cmd: CMD, version: u8, body: CmdBody) -> CmdResult<()> {
481        let mut send = self.get_classified_send(classification).await?;
482        send.send_cmd(cmd, version, body).await
483    }
484
485    async fn send_cmd_by_classified_tunnel_with_resp(&self, classification: C, cmd: CMD, version: u8, body: CmdBody, timeout: Duration) -> CmdResult<CmdBody> {
486        let mut send = self.get_classified_send(classification).await?;
487        send.send_cmd_with_resp(cmd, version, body, timeout).await
488    }
489
490    async fn send_by_peer_classified_tunnel(&self, peer_id: &PeerId, classification: C, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
491        let mut send = self.get_peer_classified_send(peer_id.clone(), classification).await?;
492        send.send(cmd, version, body).await
493    }
494
495    async fn send_by_peer_classified_tunnel_with_resp(&self, peer_id: &PeerId, classification: C, cmd: CMD, version: u8, body: &[u8], timeout: Duration) -> CmdResult<CmdBody> {
496        let mut send = self.get_peer_classified_send(peer_id.clone(), classification).await?;
497        send.send_with_resp(cmd, version, body, timeout).await
498    }
499
500    async fn send2_by_peer_classified_tunnel(&self, peer_id: &PeerId, classification: C, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()> {
501        let mut send = self.get_peer_classified_send(peer_id.clone(), classification).await?;
502        send.send2(cmd, version, body).await
503    }
504
505    async fn send2_by_peer_classified_tunnel_with_resp(&self, peer_id: &PeerId, classification: C, cmd: CMD, version: u8, body: &[&[u8]], timeout: Duration) -> CmdResult<CmdBody> {
506        let mut send = self.get_peer_classified_send(peer_id.clone(), classification).await?;
507        send.send2_with_resp(cmd, version, body, timeout).await
508    }
509
510    async fn send_cmd_by_peer_classified_tunnel(&self, peer_id: &PeerId, classification: C, cmd: CMD, version: u8, body: CmdBody) -> CmdResult<()> {
511        let mut send = self.get_peer_classified_send(peer_id.clone(), classification).await?;
512        send.send_cmd(cmd, version, body).await
513    }
514
515    async fn send_cmd_by_peer_classified_tunnel_with_resp(&self, peer_id: &PeerId, classification: C, cmd: CMD, version: u8, body: CmdBody, timeout: Duration) -> CmdResult<CmdBody> {
516        let mut send = self.get_peer_classified_send(peer_id.clone(), classification).await?;
517        send.send_cmd_with_resp(cmd, version, body, timeout).await
518    }
519
520    async fn find_tunnel_id_by_classified(&self, classification: C) -> CmdResult<TunnelId> {
521        let send = self.get_classified_send(classification).await?;
522        Ok(send.get_tunnel_id())
523    }
524
525    async fn find_tunnel_id_by_peer_classified(&self, peer_id: &PeerId, classification: C) -> CmdResult<TunnelId> {
526        let send = self.get_peer_classified_send(peer_id.clone(), classification).await?;
527        Ok(send.get_tunnel_id())
528    }
529
530    async fn get_send_by_classified(&self, classification: C) -> CmdResult<ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER>> {
531        Ok(ClassifiedSendGuard {
532            worker_guard: self.get_classified_send(classification).await?,
533            _p: Default::default(),
534        })
535    }
536
537    async fn get_send_by_peer_classified(&self, peer_id: &PeerId, classification: C) -> CmdResult<ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER>> {
538        Ok(ClassifiedSendGuard {
539            worker_guard: self.get_peer_classified_send(peer_id.clone(), classification).await?,
540            _p: Default::default(),
541        })
542    }
543}