1use std::collections::HashMap;
2use std::fmt::Debug;
3use std::hash::Hash;
4use std::sync::{Arc, Mutex};
5use std::time::Duration;
6use async_named_locker::ObjectHolder;
7use bucky_raw_codec::{RawDecode, RawEncode, RawFixedBytes};
8use num::{FromPrimitive, ToPrimitive};
9use sfo_pool::{ClassifiedWorker, ClassifiedWorkerFactory, ClassifiedWorkerGuard, ClassifiedWorkerPool, ClassifiedWorkerPoolRef, PoolErrorCode, PoolResult, WorkerClassification};
10use sfo_split::Splittable;
11use crate::{into_pool_err, pool_err, ClassifiedCmdNode, CmdBody, CmdHandler, CmdHeader, CmdNode, CmdTunnelMeta, PeerId, TunnelId, TunnelIdGenerator};
12use crate::client::{gen_resp_id, ClassifiedCmdSend, ClassifiedCmdTunnelRead, ClassifiedCmdTunnelWrite, ClassifiedSendGuard, RespWaiter, RespWaiterRef};
13use crate::cmd::CmdHandlerMap;
14use crate::errors::{into_cmd_err, CmdErrorCode, CmdResult};
15use crate::node::create_recv_handle;
16use crate::server::CmdTunnelListener;
17
18#[derive(Debug, Clone, Eq, Hash)]
19pub struct CmdNodeTunnelClassification<C: WorkerClassification> {
20 pub peer_id: Option<PeerId>,
21 pub tunnel_id: Option<TunnelId>,
22 pub classification: Option<C>,
23}
24
25impl<C: WorkerClassification> PartialEq for CmdNodeTunnelClassification<C> {
26 fn eq(&self, other: &Self) -> bool {
27 self.peer_id == other.peer_id && self.tunnel_id == other.tunnel_id && self.classification == other.classification
28 }
29}
30
31impl<C, M, R, W, LEN, CMD> ClassifiedWorker<CmdNodeTunnelClassification<C>> for ClassifiedCmdSend<C, M, R, W, LEN, CMD>
32where C: WorkerClassification,
33 M: CmdTunnelMeta,
34 R: ClassifiedCmdTunnelRead<C, M>,
35 W: ClassifiedCmdTunnelWrite<C, M>,
36 LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
37 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes {
38 fn is_work(&self) -> bool {
39 self.is_work && !self.recv_handle.is_finished()
40 }
41
42 fn is_valid(&self, c: CmdNodeTunnelClassification<C>) -> bool {
43 if c.peer_id.is_some() && c.peer_id.as_ref().unwrap() != &self.remote_id {
44 return false;
45 }
46
47 if c.tunnel_id.is_some() {
48 self.tunnel_id == c.tunnel_id.unwrap()
49 } else {
50 if c.classification.is_some() {
51 self.classification == c.classification.unwrap()
52 } else {
53 true
54 }
55 }
56 }
57
58 fn classification(&self) -> CmdNodeTunnelClassification<C> {
59 CmdNodeTunnelClassification {
60 peer_id: Some(self.remote_id.clone()),
61 tunnel_id: Some(self.tunnel_id),
62 classification: Some(self.classification.clone()),
63 }
64 }
65}
66
67#[async_trait::async_trait]
68pub trait ClassifiedCmdNodeTunnelFactory<C: WorkerClassification, M: CmdTunnelMeta, R: ClassifiedCmdTunnelRead<C, M>, W: ClassifiedCmdTunnelWrite<C, M>>: Send + Sync + 'static {
69 async fn create_tunnel(&self, classification: Option<CmdNodeTunnelClassification<C>>) -> CmdResult<Splittable<R, W>>;
70}
71
72struct CmdWriteFactoryImpl<C: WorkerClassification,
73 M: CmdTunnelMeta,
74 R: ClassifiedCmdTunnelRead<C, M>,
75 W: ClassifiedCmdTunnelWrite<C, M>,
76 F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
77 LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
78 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
79 LISTENER: CmdTunnelListener<M, R, W>> {
80 tunnel_listener: LISTENER,
81 tunnel_factory: F,
82 cmd_handler: Arc<dyn CmdHandler<LEN, CMD>>,
83 tunnel_id_generator: TunnelIdGenerator,
84 resp_waiter: RespWaiterRef,
85 send_cache: Arc<Mutex<HashMap<PeerId, Vec<ClassifiedCmdSend<C, M, R, W, LEN, CMD>>>>>,
86}
87
88
89impl<C: WorkerClassification,
90 M: CmdTunnelMeta,
91 R: ClassifiedCmdTunnelRead<C, M>,
92 W: ClassifiedCmdTunnelWrite<C, M>,
93 F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
94 LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive + RawFixedBytes,
95 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
96 LISTENER: CmdTunnelListener<M, R, W>> CmdWriteFactoryImpl<C, M, R, W, F, LEN, CMD, LISTENER> {
97 pub fn new(tunnel_factory: F,
98 tunnel_listener: LISTENER,
99 cmd_handler: impl CmdHandler<LEN, CMD>,
100 resp_waiter: RespWaiterRef,) -> Self {
101 Self {
102 tunnel_listener,
103 tunnel_factory,
104 cmd_handler: Arc::new(cmd_handler),
105 tunnel_id_generator: TunnelIdGenerator::new(),
106 resp_waiter,
107 send_cache: Arc::new(Mutex::new(Default::default())),
108 }
109 }
110
111
112 pub fn start(self: &Arc<Self>) {
113 let this = self.clone();
114 tokio::spawn(async move {
115 if let Err(e) = this.run().await {
116 log::error!("cmd server error: {:?}", e);
117 }
118 });
119 }
120
121 async fn run(self: &Arc<Self>) -> CmdResult<()> {
122 loop {
123 let tunnel = self.tunnel_listener.accept().await?;
124 let peer_id = tunnel.get_remote_peer_id();
125 let classification = tunnel.get_classification();
126 let tunnel_id = self.tunnel_id_generator.generate();
127 let this = self.clone();
128 let resp_waiter = self.resp_waiter.clone();
129 tokio::spawn(async move {
130 let ret: CmdResult<()> = async move {
131 let this = this.clone();
132 let cmd_handler = this.cmd_handler.clone();
133 let (reader, writer) = tunnel.split();
134 let tunnel_meta = reader.get_tunnel_meta();
135 let remote_id = writer.get_remote_peer_id();
136 let writer = ObjectHolder::new(writer);
137 let recv_handle = create_recv_handle::<M, R, W, LEN, CMD>(reader, writer.clone(), tunnel_id, cmd_handler);
138 {
139 let mut send_cache = this.send_cache.lock().unwrap();
140 let send_list = send_cache.entry(peer_id).or_insert(Vec::new());
141 send_list.push(ClassifiedCmdSend::new(tunnel_id, classification, recv_handle, writer, resp_waiter, remote_id, tunnel_meta));
142 }
143 Ok(())
144 }.await;
145 if let Err(e) = ret {
146 log::error!("peer connection error: {:?}", e);
147 }
148 });
149 }
150 }
151}
152
153#[async_trait::async_trait]
154impl<C: WorkerClassification,
155 M: CmdTunnelMeta,
156 R: ClassifiedCmdTunnelRead<C, M>,
157 W: ClassifiedCmdTunnelWrite<C, M>,
158 F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
159 LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive + RawFixedBytes,
160 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + RawFixedBytes + Debug,
161 LISTENER: CmdTunnelListener<M, R, W>> ClassifiedWorkerFactory<CmdNodeTunnelClassification<C>, ClassifiedCmdSend<C, M, R, W, LEN, CMD>> for CmdWriteFactoryImpl<C, M, R, W, F, LEN, CMD, LISTENER> {
162 async fn create(&self, c: Option<CmdNodeTunnelClassification<C>>) -> PoolResult<ClassifiedCmdSend<C, M, R, W, LEN, CMD>> {
163 if c.is_some() {
164 let classification = c.unwrap();
165 if classification.peer_id.is_some() {
166 let peer_id = classification.peer_id.clone().unwrap();
167 let tunnel_id = classification.tunnel_id;
168 if tunnel_id.is_some() {
169 let mut send_cache = self.send_cache.lock().unwrap();
170 if let Some(send_list) = send_cache.get_mut(&peer_id) {
171 let mut send_index = None;
172 for (index, send) in send_list.iter().enumerate() {
173 if send.get_tunnel_id() == tunnel_id.unwrap() {
174 send_index = Some(index);
175 break;
176 }
177 }
178 if let Some(send_index) = send_index {
179 let send = send_list.remove(send_index);
180 Ok(send)
181 } else {
182 Err(pool_err!(PoolErrorCode::Failed, "tunnel {:?} not found", tunnel_id.unwrap()))
183 }
184 } else {
185 Err(pool_err!(PoolErrorCode::Failed, "tunnel {:?} not found", tunnel_id.unwrap()))
186 }
187 } else {
188 {
189 let mut send_cache = self.send_cache.lock().unwrap();
190 if let Some(send_list) = send_cache.get_mut(&peer_id) {
191 if !send_list.is_empty() {
192 let send = send_list.pop().unwrap();
193 if send_list.is_empty() {
194 send_cache.remove(&peer_id);
195 }
196 return Ok(send);
197 }
198 }
199 }
200 let tunnel = self.tunnel_factory.create_tunnel(Some(classification)).await.map_err(into_pool_err!(PoolErrorCode::Failed))?;
201 let classification = tunnel.get_classification();
202 let tunnel_id = self.tunnel_id_generator.generate();
203 let (recv, write) = tunnel.split();
204 let remote_id = write.get_remote_peer_id();
205 let tunnel_meta = recv.get_tunnel_meta();
206 let write = ObjectHolder::new(write);
207 let cmd_handler = self.cmd_handler.clone();
208 let handle = create_recv_handle::<M, R, W, LEN, CMD>(recv, write.clone(), tunnel_id, cmd_handler);
209 Ok(ClassifiedCmdSend::new(tunnel_id, classification, handle, write, self.resp_waiter.clone(), remote_id, tunnel_meta))
210 }
211 } else {
212 if classification.tunnel_id.is_some() {
213 Err(pool_err!(PoolErrorCode::Failed, "must set peer id when set tunnel id"))
214 } else {
215 let tunnel = self.tunnel_factory.create_tunnel(Some(classification)).await.map_err(into_pool_err!(PoolErrorCode::Failed))?;
216 let classification = tunnel.get_classification();
217 let tunnel_id = self.tunnel_id_generator.generate();
218 let (recv, write) = tunnel.split();
219 let remote_id = write.get_remote_peer_id();
220 let tunnel_meta = write.get_tunnel_meta();
221 let write = ObjectHolder::new(write);
222 let cmd_handler = self.cmd_handler.clone();
223 let handle = create_recv_handle::<M, R, W, LEN, CMD>(recv, write.clone(), tunnel_id, cmd_handler);
224 Ok(ClassifiedCmdSend::new(tunnel_id, classification, handle, write, self.resp_waiter.clone(), remote_id, tunnel_meta))
225 }
226 }
227
228 } else {
229 Err(pool_err!(PoolErrorCode::Failed, "peer id is none"))
230 }
231 }
232}
233
234pub struct ClassifiedCmdNodeWriteFactory<C: WorkerClassification,
235 M: CmdTunnelMeta,
236 R: ClassifiedCmdTunnelRead<C, M>,
237 W: ClassifiedCmdTunnelWrite<C, M>,
238 F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
239 LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
240 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
241 LISTENER: CmdTunnelListener<M, R, W>> {
242 inner: Arc<CmdWriteFactoryImpl<C, M, R, W, F, LEN, CMD, LISTENER>>
243}
244
245
246impl<C: WorkerClassification,
247 M: CmdTunnelMeta,
248 R: ClassifiedCmdTunnelRead<C, M>,
249 W: ClassifiedCmdTunnelWrite<C, M>,
250 F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
251 LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive + RawFixedBytes + RawFixedBytes,
252 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes + RawFixedBytes,
253 LISTENER: CmdTunnelListener<M, R, W>> ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER> {
254 pub(crate) fn new(tunnel_factory: F,
255 tunnel_listener: LISTENER,
256 cmd_handler: impl CmdHandler<LEN, CMD>,
257 resp_waiter: RespWaiterRef) -> Self {
258 Self {
259 inner: Arc::new(CmdWriteFactoryImpl::new(tunnel_factory, tunnel_listener, cmd_handler, resp_waiter)),
260 }
261 }
262
263 pub fn start(&self) {
264 self.inner.start();
265 }
266}
267
268#[async_trait::async_trait]
269impl<C: WorkerClassification,
270 M: CmdTunnelMeta,
271 R: ClassifiedCmdTunnelRead<C, M>,
272 W: ClassifiedCmdTunnelWrite<C, M>,
273 F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
274 LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive + RawFixedBytes,
275 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + RawFixedBytes + Debug,
276 LISTENER: CmdTunnelListener<M, R, W>> ClassifiedWorkerFactory<CmdNodeTunnelClassification<C>, ClassifiedCmdSend<C, M, R, W, LEN, CMD>> for ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER> {
277 async fn create(&self, c: Option<CmdNodeTunnelClassification<C>>) -> PoolResult<ClassifiedCmdSend<C, M, R, W, LEN, CMD>> {
278 self.inner.create(c).await
279 }
280}
281
282pub struct DefaultClassifiedCmdNode<C: WorkerClassification,
283 M: CmdTunnelMeta,
284 R: ClassifiedCmdTunnelRead<C, M>,
285 W: ClassifiedCmdTunnelWrite<C, M>,
286 F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
287 LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync +'static + FromPrimitive + ToPrimitive + RawFixedBytes,
288 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync +'static + RawFixedBytes + Eq + Hash + Debug,
289 LISTENER: CmdTunnelListener<M, R, W>> {
290 tunnel_pool: ClassifiedWorkerPoolRef<CmdNodeTunnelClassification<C>, ClassifiedCmdSend<C, M, R, W, LEN, CMD>, ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>>,
291 cmd_handler_map: Arc<CmdHandlerMap<LEN, CMD>>,
292}
293
294impl<C: WorkerClassification,
295 M: CmdTunnelMeta,
296 R: ClassifiedCmdTunnelRead<C, M>,
297 W: ClassifiedCmdTunnelWrite<C, M>,
298 F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
299 LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync +'static + FromPrimitive + ToPrimitive + RawFixedBytes,
300 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync +'static + RawFixedBytes + Eq + Hash + Debug,
301 LISTENER: CmdTunnelListener<M, R, W>> DefaultClassifiedCmdNode<C, M, R, W, F, LEN, CMD, LISTENER> {
302 pub fn new(listener: LISTENER, factory: F, tunnel_count: u16) -> Arc<Self> {
303 let cmd_handler_map = Arc::new(CmdHandlerMap::new());
304 let handler_map = cmd_handler_map.clone();
305 let resp_waiter = Arc::new(RespWaiter::new());
306 let waiter = resp_waiter.clone();
307 let write_factory = ClassifiedCmdNodeWriteFactory::<C, M, R, W, _, LEN, CMD, LISTENER>::new(factory, listener, move |peer_id: PeerId, tunnel_id: TunnelId, header: CmdHeader<LEN, CMD>, body_read: CmdBody| {
308 let handler_map = handler_map.clone();
309 let waiter = waiter.clone();
310 async move {
311 if header.is_resp() && header.seq().is_some() {
312 let resp_id = gen_resp_id(header.cmd_code(), header.seq().unwrap());
313 let _ = waiter.set_result(resp_id, body_read);
314 Ok(None)
315 } else {
316 if let Some(handler) = handler_map.get(header.cmd_code()) {
317 handler.handle(peer_id, tunnel_id, header, body_read).await
318 } else {
319 Ok(None)
320 }
321 }
322 }
323 }, resp_waiter.clone());
324 write_factory.start();
325 Arc::new(Self {
326 tunnel_pool: ClassifiedWorkerPool::new(tunnel_count, write_factory),
327 cmd_handler_map,
328 })
329 }
330
331
332 async fn get_send(&self, peer_id: PeerId) -> CmdResult<ClassifiedWorkerGuard<CmdNodeTunnelClassification<C>, ClassifiedCmdSend<C, M, R, W, LEN, CMD>, ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>>> {
333 self.tunnel_pool.get_classified_worker(CmdNodeTunnelClassification {
334 peer_id: Some(peer_id),
335 tunnel_id: None,
336 classification: None,
337 }).await.map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))
338 }
339
340 async fn get_send_of_tunnel_id(&self, peer_id: PeerId, tunnel_id: TunnelId) -> CmdResult<ClassifiedWorkerGuard<CmdNodeTunnelClassification<C>, ClassifiedCmdSend<C, M, R, W, LEN, CMD>, ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>>> {
341 self.tunnel_pool.get_classified_worker(CmdNodeTunnelClassification {
342 peer_id: Some(peer_id),
343 tunnel_id: Some(tunnel_id),
344 classification: None,
345 }).await.map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))
346 }
347
348 async fn get_classified_send(&self, classification: C) -> CmdResult<ClassifiedWorkerGuard<CmdNodeTunnelClassification<C>, ClassifiedCmdSend<C, M, R, W, LEN, CMD>, ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>>> {
349 self.tunnel_pool.get_classified_worker(CmdNodeTunnelClassification {
350 peer_id: None,
351 tunnel_id: None,
352 classification: Some(classification),
353 }).await.map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))
354 }
355
356 async fn get_peer_classified_send(&self, peer_id: PeerId, classification: C) -> CmdResult<ClassifiedWorkerGuard<CmdNodeTunnelClassification<C>, ClassifiedCmdSend<C, M, R, W, LEN, CMD>, ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>>> {
357 self.tunnel_pool.get_classified_worker(CmdNodeTunnelClassification {
358 peer_id: Some(peer_id),
359 tunnel_id: None,
360 classification: Some(classification),
361 }).await.map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))
362 }
363}
364
365pub type ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER> = ClassifiedSendGuard<CmdNodeTunnelClassification<C>, M, ClassifiedCmdSend<C, M, R, W, LEN, CMD>, ClassifiedCmdNodeWriteFactory<C, M, R, W, F, LEN, CMD, LISTENER>>;
366#[async_trait::async_trait]
367impl<C: WorkerClassification,
368 M: CmdTunnelMeta,
369 R: ClassifiedCmdTunnelRead<C, M>,
370 W: ClassifiedCmdTunnelWrite<C, M>,
371 F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
372 LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync +'static + FromPrimitive + ToPrimitive + RawFixedBytes,
373 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync +'static + RawFixedBytes + Eq + Hash + Debug,
374 LISTENER: CmdTunnelListener<M, R, W>> CmdNode<LEN, CMD, M, ClassifiedCmdSend<C, M, R, W, LEN, CMD>, ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER>> for DefaultClassifiedCmdNode<C, M, R, W, F, LEN, CMD, LISTENER> {
375 fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>) {
376 self.cmd_handler_map.insert(cmd, handler)
377 }
378
379 async fn send(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
380 let mut send = self.get_send(peer_id.clone()).await?;
381 send.send(cmd, version, body).await
382 }
383
384 async fn send_with_resp(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8], timeout: Duration) -> CmdResult<CmdBody> {
385 let mut send = self.get_send(peer_id.clone()).await?;
386 send.send_with_resp(cmd, version, body, timeout).await
387 }
388
389 async fn send2(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()> {
390 let mut send = self.get_send(peer_id.clone()).await?;
391 send.send2(cmd, version, body).await
392 }
393
394 async fn send2_with_resp(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[&[u8]], timeout: Duration) -> CmdResult<CmdBody> {
395 let mut send = self.get_send(peer_id.clone()).await?;
396 send.send2_with_resp(cmd, version, body, timeout).await
397 }
398
399 async fn send_cmd(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: CmdBody) -> CmdResult<()> {
400 let mut send = self.get_send(peer_id.clone()).await?;
401 send.send_cmd(cmd, version, body).await
402 }
403
404 async fn send_cmd_with_resp(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: CmdBody, timeout: Duration) -> CmdResult<CmdBody> {
405 let mut send = self.get_send(peer_id.clone()).await?;
406 send.send_cmd_with_resp(cmd, version, body, timeout).await
407 }
408
409 async fn send_by_specify_tunnel(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
410 let mut send = self.get_send_of_tunnel_id(peer_id.clone(), tunnel_id).await?;
411 send.send(cmd, version, body).await
412 }
413
414 async fn send_by_specify_tunnel_with_resp(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[u8], timeout: Duration) -> CmdResult<CmdBody> {
415 let mut send = self.get_send_of_tunnel_id(peer_id.clone(), tunnel_id).await?;
416 send.send_with_resp(cmd, version, body, timeout).await
417 }
418
419 async fn send2_by_specify_tunnel(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()> {
420 let mut send = self.get_send_of_tunnel_id(peer_id.clone(), tunnel_id).await?;
421 send.send2(cmd, version, body).await
422 }
423
424 async fn send2_by_specify_tunnel_with_resp(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[&[u8]], timeout: Duration) -> CmdResult<CmdBody> {
425 let mut send = self.get_send_of_tunnel_id(peer_id.clone(), tunnel_id).await?;
426 send.send2_with_resp(cmd, version, body, timeout).await
427 }
428
429 async fn send_cmd_by_specify_tunnel(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: CmdBody) -> CmdResult<()> {
430 let mut send = self.get_send_of_tunnel_id(peer_id.clone(), tunnel_id).await?;
431 send.send_cmd(cmd, version, body).await
432 }
433
434 async fn send_cmd_by_specify_tunnel_with_resp(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: CmdBody, timeout: Duration) -> CmdResult<CmdBody> {
435 let mut send = self.get_send_of_tunnel_id(peer_id.clone(), tunnel_id).await?;
436 send.send_cmd_with_resp(cmd, version, body, timeout).await
437 }
438
439 async fn clear_all_tunnel(&self) {
440 self.tunnel_pool.clear_all_worker().await;
441 }
442
443 async fn get_send(&self, peer_id: &PeerId, tunnel_id: TunnelId) -> CmdResult<ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER>> {
444 Ok(ClassifiedSendGuard {
445 worker_guard: self.get_send_of_tunnel_id(peer_id.clone(), tunnel_id).await?,
446 _p: Default::default(),
447 })
448 }
449}
450
451#[async_trait::async_trait]
452impl<C: WorkerClassification,
453 M: CmdTunnelMeta,
454 R: ClassifiedCmdTunnelRead<C, M>,
455 W: ClassifiedCmdTunnelWrite<C, M>,
456 F: ClassifiedCmdNodeTunnelFactory<C, M, R, W>,
457 LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync +'static + FromPrimitive + ToPrimitive + RawFixedBytes,
458 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync +'static + RawFixedBytes + Eq + Hash + Debug,
459 LISTENER: CmdTunnelListener<M, R, W>> ClassifiedCmdNode<LEN, CMD, C, M, ClassifiedCmdSend<C, M, R, W, LEN, CMD>, ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER>> for DefaultClassifiedCmdNode<C, M, R, W, F, LEN, CMD, LISTENER> {
460 async fn send_by_classified_tunnel(&self, classification: C, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
461 let mut send = self.get_classified_send(classification).await?;
462 send.send(cmd, version, body).await
463 }
464
465 async fn send_by_classified_tunnel_with_resp(&self, classification: C, cmd: CMD, version: u8, body: &[u8], timeout: Duration) -> CmdResult<CmdBody> {
466 let mut send = self.get_classified_send(classification).await?;
467 send.send_with_resp(cmd, version, body, timeout).await
468 }
469
470 async fn send2_by_classified_tunnel(&self, classification: C, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()> {
471 let mut send = self.get_classified_send(classification).await?;
472 send.send2(cmd, version, body).await
473 }
474
475 async fn send2_by_classified_tunnel_with_resp(&self, classification: C, cmd: CMD, version: u8, body: &[&[u8]], timeout: Duration) -> CmdResult<CmdBody> {
476 let mut send = self.get_classified_send(classification).await?;
477 send.send2_with_resp(cmd, version, body, timeout).await
478 }
479
480 async fn send_cmd_by_classified_tunnel(&self, classification: C, cmd: CMD, version: u8, body: CmdBody) -> CmdResult<()> {
481 let mut send = self.get_classified_send(classification).await?;
482 send.send_cmd(cmd, version, body).await
483 }
484
485 async fn send_cmd_by_classified_tunnel_with_resp(&self, classification: C, cmd: CMD, version: u8, body: CmdBody, timeout: Duration) -> CmdResult<CmdBody> {
486 let mut send = self.get_classified_send(classification).await?;
487 send.send_cmd_with_resp(cmd, version, body, timeout).await
488 }
489
490 async fn send_by_peer_classified_tunnel(&self, peer_id: &PeerId, classification: C, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
491 let mut send = self.get_peer_classified_send(peer_id.clone(), classification).await?;
492 send.send(cmd, version, body).await
493 }
494
495 async fn send_by_peer_classified_tunnel_with_resp(&self, peer_id: &PeerId, classification: C, cmd: CMD, version: u8, body: &[u8], timeout: Duration) -> CmdResult<CmdBody> {
496 let mut send = self.get_peer_classified_send(peer_id.clone(), classification).await?;
497 send.send_with_resp(cmd, version, body, timeout).await
498 }
499
500 async fn send2_by_peer_classified_tunnel(&self, peer_id: &PeerId, classification: C, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()> {
501 let mut send = self.get_peer_classified_send(peer_id.clone(), classification).await?;
502 send.send2(cmd, version, body).await
503 }
504
505 async fn send2_by_peer_classified_tunnel_with_resp(&self, peer_id: &PeerId, classification: C, cmd: CMD, version: u8, body: &[&[u8]], timeout: Duration) -> CmdResult<CmdBody> {
506 let mut send = self.get_peer_classified_send(peer_id.clone(), classification).await?;
507 send.send2_with_resp(cmd, version, body, timeout).await
508 }
509
510 async fn send_cmd_by_peer_classified_tunnel(&self, peer_id: &PeerId, classification: C, cmd: CMD, version: u8, body: CmdBody) -> CmdResult<()> {
511 let mut send = self.get_peer_classified_send(peer_id.clone(), classification).await?;
512 send.send_cmd(cmd, version, body).await
513 }
514
515 async fn send_cmd_by_peer_classified_tunnel_with_resp(&self, peer_id: &PeerId, classification: C, cmd: CMD, version: u8, body: CmdBody, timeout: Duration) -> CmdResult<CmdBody> {
516 let mut send = self.get_peer_classified_send(peer_id.clone(), classification).await?;
517 send.send_cmd_with_resp(cmd, version, body, timeout).await
518 }
519
520 async fn find_tunnel_id_by_classified(&self, classification: C) -> CmdResult<TunnelId> {
521 let send = self.get_classified_send(classification).await?;
522 Ok(send.get_tunnel_id())
523 }
524
525 async fn find_tunnel_id_by_peer_classified(&self, peer_id: &PeerId, classification: C) -> CmdResult<TunnelId> {
526 let send = self.get_peer_classified_send(peer_id.clone(), classification).await?;
527 Ok(send.get_tunnel_id())
528 }
529
530 async fn get_send_by_classified(&self, classification: C) -> CmdResult<ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER>> {
531 Ok(ClassifiedSendGuard {
532 worker_guard: self.get_classified_send(classification).await?,
533 _p: Default::default(),
534 })
535 }
536
537 async fn get_send_by_peer_classified(&self, peer_id: &PeerId, classification: C) -> CmdResult<ClassifiedCmdNodeSendGuard<C, M, R, W, F, LEN, CMD, LISTENER>> {
538 Ok(ClassifiedSendGuard {
539 worker_guard: self.get_peer_classified_send(peer_id.clone(), classification).await?,
540 _p: Default::default(),
541 })
542 }
543}