1use std::fmt::Debug;
2use std::hash::Hash;
3use std::ops::DerefMut;
4use std::sync::{Arc, Mutex};
5use std::time::Duration;
6use async_named_locker::{NamedStateHolder, ObjectHolder};
7use bucky_raw_codec::{RawConvertTo, RawDecode, RawEncode, RawFixedBytes, RawFrom};
8use num::{FromPrimitive, ToPrimitive};
9use sfo_split::Splittable;
10use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
11use crate::cmd::{CmdBodyRead, CmdHandler, CmdHandlerMap, CmdHeader};
12use crate::{CmdBody, CmdTunnelMeta, CmdTunnelRead, CmdTunnelWrite, TunnelId};
13use crate::client::{gen_resp_id, gen_seq, RespWaiter, RespWaiterRef};
14use crate::errors::{cmd_err, into_cmd_err, CmdErrorCode, CmdResult};
15use crate::peer_connection::PeerConnection;
16use crate::peer_id::PeerId;
17use crate::server::CmdServer;
18use super::peer_manager::{PeerManager, PeerManagerRef};
19
20#[async_trait::async_trait]
21pub trait CmdTunnelListener<M: CmdTunnelMeta, R: CmdTunnelRead<M>, W: CmdTunnelWrite<M>>: Send + Sync + 'static {
22 async fn accept(&self) -> CmdResult<Splittable<R, W>>;
23}
24
25#[async_trait::async_trait]
26pub trait CmdServerEventListener: Send + Sync + 'static {
27 async fn on_peer_connected(&self, peer_id: &PeerId) -> CmdResult<()>;
28 async fn on_peer_disconnected(&self, peer_id: &PeerId) -> CmdResult<()>;
29}
30
31#[derive(Clone)]
32struct CmdServerEventListenerEmit {
33 listeners: Arc<Mutex<Vec<Arc<dyn CmdServerEventListener>>>>,
34}
35
36impl CmdServerEventListenerEmit {
37 pub fn new() -> Self {
38 Self {
39 listeners: Arc::new(Mutex::new(Vec::new())),
40 }
41 }
42
43 pub fn attach_event_listener(&self, event_listener: Arc<dyn CmdServerEventListener>) {
44 self.listeners.lock().unwrap().push(event_listener);
45 }
46}
47
48#[async_trait::async_trait]
49impl CmdServerEventListener for CmdServerEventListenerEmit {
50 async fn on_peer_connected(&self, peer_id: &PeerId) -> CmdResult<()> {
51 let listeners = {
52 self.listeners.lock().unwrap().clone()
53 };
54 for listener in listeners.iter() {
55 if let Err(e) = listener.on_peer_connected(peer_id).await {
56 log::error!("on_peer_connected error: {:?}", e);
57 }
58 }
59 Ok(())
60 }
61
62 async fn on_peer_disconnected(&self, peer_id: &PeerId) -> CmdResult<()> {
63 let listeners = {
64 self.listeners.lock().unwrap().clone()
65 };
66 for listener in listeners.iter() {
67 if let Err(e) = listener.on_peer_disconnected(peer_id).await {
68 log::error!("on_peer_disconnected error: {:?}", e);
69 }
70 }
71 Ok(())
72 }
73}
74
75pub struct DefaultCmdServer<M: CmdTunnelMeta, R: CmdTunnelRead<M>, W: CmdTunnelWrite<M>, LEN, CMD, LISTENER> {
76 tunnel_listener: LISTENER,
77 cmd_handler_map: Arc<CmdHandlerMap<LEN, CMD>>,
78 peer_manager: PeerManagerRef<M, R, W>,
79 event_emit: CmdServerEventListenerEmit,
80 resp_waiter: RespWaiterRef,
81 state_holder: Arc<NamedStateHolder<tokio::task::Id>>,
82 _l: Mutex<std::marker::PhantomData<(R, W, LEN, CMD)>>,
83}
84
85impl<M: CmdTunnelMeta,
86 R: CmdTunnelRead<M>,
87 W: CmdTunnelWrite<M>,
88 LEN: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static + FromPrimitive + ToPrimitive,
89 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static + Eq + Hash + Debug,
90 LISTENER: CmdTunnelListener<M, R, W>> DefaultCmdServer<M, R, W, LEN, CMD, LISTENER> {
91 pub fn new(tunnel_listener: LISTENER) -> Arc<Self> {
92 let event_emit = CmdServerEventListenerEmit::new();
93 Arc::new(Self {
94 tunnel_listener,
95 cmd_handler_map: Arc::new(CmdHandlerMap::new()),
96 peer_manager: PeerManager::new(Arc::new(event_emit.clone())),
97 event_emit,
98 resp_waiter: Arc::new(RespWaiter::new()),
99 state_holder: NamedStateHolder::new(),
100 _l: Default::default(),
101 })
102 }
103
104 pub fn attach_event_listener(&self, event_listener: Arc<dyn CmdServerEventListener>) {
105 self.event_emit.attach_event_listener(event_listener);
106 }
107
108 pub async fn get_peer_tunnels(&self, peer_id: &PeerId) -> Vec<Arc<PeerConnection<R, W>>> {
109 let connections = self.peer_manager.find_connections(peer_id);
110 connections
111 }
112
113 pub fn start(self: &Arc<Self>) {
114 let this = self.clone();
115 tokio::spawn(async move {
116 if let Err(e) = this.run().await {
117 log::error!("cmd server error: {:?}", e);
118 }
119 });
120 }
121
122 async fn run(self: &Arc<Self>) -> CmdResult<()> {
123 loop {
124 let tunnel = self.tunnel_listener.accept().await?;
125 let peer_id = tunnel.get_remote_peer_id();
126 let tunnel_id = self.peer_manager.generate_conn_id();
127 let this = self.clone();
128 let resp_waiter = self.resp_waiter.clone();
129 let state_holder = self.state_holder.clone();
130 tokio::spawn(async move {
131 let remote_id = peer_id.clone();
132 let ret: CmdResult<()> = async move {
133 let this = this.clone();
134 let cmd_handler_map = this.cmd_handler_map.clone();
135 let (mut reader, writer) = tunnel.split();
136 let writer = ObjectHolder::new(writer);
137 let resp_write = writer.clone();
138 let resp_waiter = resp_waiter.clone();
139 let state_holder = state_holder.clone();
140 let recv_handle = tokio::spawn(async move {
141 let ret: CmdResult<()> = async move {
142 loop {
143 let header_len = reader.read_u8().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
144 let mut header = vec![0u8; header_len as usize];
145 let n = reader.read_exact(&mut header).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
146 if n == 0 {
147 break;
148 }
149 let header = CmdHeader::<LEN, CMD>::clone_from_slice(header.as_slice()).map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
150 sfo_log::debug!("recv cmd {:?}", header.cmd_code());
151 let body_len = header.pkg_len().to_u64().unwrap();
152 let cmd_read = CmdBodyRead::new(reader, header.pkg_len().to_u64().unwrap() as usize);
153 let waiter = cmd_read.get_waiter();
154 let future = waiter.create_result_future().map_err(into_cmd_err!(CmdErrorCode::Failed))?;
155 {
156 let body_read = cmd_read;
157 let body = CmdBody::from_reader(BufReader::new(body_read), body_len);
158 if header.is_resp() && header.seq().is_some() {
159 let resp_id = gen_resp_id(header.cmd_code(), header.seq().unwrap());
160 let _ = resp_waiter.set_result(resp_id, body);
161 } else {
162 if let Some(handler) = cmd_handler_map.get(header.cmd_code()) {
163 let version = header.version();
164 let seq = header.seq();
165 let cmd_code = header.cmd_code();
166 match {
167 let _handle_state = state_holder.new_state(tokio::task::id());
168 handler.handle(remote_id.clone(), tunnel_id, header, body).await
169 } {
170 Ok(Some(mut body)) => {
171 let mut write = resp_write.get().await;
172 let header = CmdHeader::<LEN, CMD>::new(version, true, seq, cmd_code, LEN::from_u64(body.len()).unwrap());
173 let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
174 if buf.len() > 255 {
175 return Err(cmd_err!(CmdErrorCode::RawCodecError, "header len too large"));
176 }
177 write.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
178 write.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
179 tokio::io::copy(&mut body, write.deref_mut().deref_mut()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
180 write.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
181 }
182 Err(e) => {
183 log::error!("handle cmd error: {:?}", e);
184 }
185 _ => {}
186 }
187 }
188 }
189 };
190 reader = future.await.map_err(into_cmd_err!(CmdErrorCode::Failed))??;
191 }
193 Ok(())
194 }.await;
195 if ret.is_err() {
196 log::error!("recv cmd error: {:?}", ret.as_ref().err().unwrap());
197 }
198 ret
199 });
200
201 let peer_conn = PeerConnection {
202 conn_id: tunnel_id,
203 peer_id: peer_id.clone(),
204 send: writer,
205 handle: Some(recv_handle),
206 };
207 this.peer_manager.add_peer_connection(peer_conn).await;
208 Ok(())
209 }.await;
210 if let Err(e) = ret {
211 log::error!("peer connection error: {:?}", e);
212 }
213 });
214 }
215 }
216}
217
218#[async_trait::async_trait]
219impl<M: CmdTunnelMeta,
220 R: CmdTunnelRead<M>,
221 W: CmdTunnelWrite<M>,
222 LEN: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static + FromPrimitive + ToPrimitive,
223 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static + Eq + Hash + Debug,
224 LISTENER: CmdTunnelListener<M, R, W>> CmdServer<LEN, CMD> for DefaultCmdServer<M, R, W, LEN, CMD, LISTENER> {
225 fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>) {
226 self.cmd_handler_map.insert(cmd, handler);
227 }
228
229 async fn send(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
230 let connections = self.peer_manager.find_connections(peer_id);
231 for conn in connections {
232 let ret: CmdResult<()> = async move {
233 log::debug!("send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {} data: {}", peer_id, conn.conn_id, cmd, body.len(), hex::encode(body));
234 let header = CmdHeader::<LEN, CMD>::new(version, false, None, cmd, LEN::from_u64(body.len() as u64).unwrap());
235 let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
236 let mut send = conn.send.get().await;
237 if buf.len() > 255 {
238 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
239 }
240 send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
241 send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
242 send.write_all(body).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
243 send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
244 Ok(())
245 }.await;
246 if ret.is_ok() {
247 break;
248 }
249 }
250 Ok(())
251 }
252
253 async fn send_with_resp(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8], timeout: Duration) -> CmdResult<CmdBody> {
254 let connections = self.peer_manager.find_connections(peer_id);
255 for conn in connections {
256 if let Some(id) = tokio::task::try_id() {
257 if self.state_holder.has_state(id) {
258 continue;
259 }
260 }
261 let ret: CmdResult<CmdBody> = async move {
262 log::debug!("send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {} data: {}", peer_id, conn.conn_id, cmd, body.len(), hex::encode(body));
263 let seq = gen_seq();
264 let header = CmdHeader::<LEN, CMD>::new(version, false, Some(seq), cmd, LEN::from_u64(body.len() as u64).unwrap());
265 let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
266 let resp_id = gen_resp_id(cmd, seq);
267 let waiter = self.resp_waiter.create_timeout_result_future(resp_id, timeout).map_err(into_cmd_err!(CmdErrorCode::Failed))?;
268 {
269 let mut send = conn.send.get().await;
270 if buf.len() > 255 {
271 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
272 }
273 send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
274 send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
275 send.write_all(body).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
276 send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
277 }
278 let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout, "cmd {:?}", cmd))?;
279 Ok(body )
280 }.await;
281 if ret.is_ok() {
282 return ret;
283 } else {
284 sfo_log::error!("send err {:?}", ret.unwrap_err());
285 }
286 }
287 Err(cmd_err!(CmdErrorCode::Failed, "send to peer_id: {}", peer_id))
288 }
289
290 async fn send2(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()> {
291 let connections = self.peer_manager.find_connections(peer_id);
292 for conn in connections {
293 let ret: CmdResult<()> = async move {
294 let mut len = 0;
295 for b in body.iter() {
296 len += b.len();
297 log::debug!("send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}", peer_id, conn.conn_id, cmd, hex::encode(b));
298 }
299 log::debug!("send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}", peer_id, conn.conn_id, cmd, len);
300 let header = CmdHeader::<LEN, CMD>::new(version, false, None, cmd, LEN::from_u64(len as u64).unwrap());
301 let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
302 let mut send = conn.send.get().await;
303 if buf.len() > 255 {
304 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
305 }
306 send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
307 send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
308 for b in body.iter() {
309 send.write_all(b).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
310 }
311 send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
312 Ok(())
313 }.await;
314 if ret.is_ok() {
315 break;
316 }
317 }
318 Ok(())
319 }
320
321 async fn send2_with_resp(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[&[u8]], timeout: Duration) -> CmdResult<CmdBody> {
322 let connections = self.peer_manager.find_connections(peer_id);
323 for conn in connections {
324 if let Some(id) = tokio::task::try_id() {
325 if self.state_holder.has_state(id) {
326 continue;
327 }
328 }
329 let ret: CmdResult<CmdBody> = async move {
330 let mut len = 0;
331 for b in body.iter() {
332 len += b.len();
333 log::debug!("send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}", peer_id, conn.conn_id, cmd, hex::encode(b));
334 }
335 log::debug!("send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}", peer_id, conn.conn_id, cmd, len);
336 let seq = gen_seq();
337 let header = CmdHeader::<LEN, CMD>::new(version, false, Some(seq), cmd, LEN::from_u64(len as u64).unwrap());
338 let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
339 let resp_id = gen_resp_id(cmd, seq);
340 let waiter = self.resp_waiter.create_timeout_result_future(resp_id, timeout).map_err(into_cmd_err!(CmdErrorCode::Failed))?;
341 {
342 let mut send = conn.send.get().await;
343 if buf.len() > 255 {
344 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
345 }
346 send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
347 send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
348 for b in body.iter() {
349 send.write_all(b).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
350 }
351 send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
352 }
353 let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
354 Ok(body)
355 }.await;
356 if ret.is_ok() {
357 return ret;
358 }
359 }
360 Err(cmd_err!(CmdErrorCode::Failed, "send to peer_id: {}", peer_id))
361 }
362
363 async fn send_cmd(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: CmdBody) -> CmdResult<()> {
364 let body_data = body.into_bytes().await?;
365 let body = body_data.as_slice();
366 let connections = self.peer_manager.find_connections(peer_id);
367 for conn in connections {
368 let ret: CmdResult<()> = async move {
369 log::debug!("send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {} data: {}", peer_id, conn.conn_id, cmd, body.len(), hex::encode(body));
370 let header = CmdHeader::<LEN, CMD>::new(version, false, None, cmd, LEN::from_u64(body.len() as u64).unwrap());
371 let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
372 let mut send = conn.send.get().await;
373 if buf.len() > 255 {
374 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
375 }
376 send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
377 send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
378 send.write_all(body).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
379 send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
380 Ok(())
381 }.await;
382 if ret.is_ok() {
383 break;
384 }
385 }
386 Ok(())
387 }
388
389 async fn send_cmd_with_resp(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: CmdBody, timeout: Duration) -> CmdResult<CmdBody> {
390 let connections = self.peer_manager.find_connections(peer_id);
391 let body_data = body.into_bytes().await?;
392 let data_ref = body_data.as_slice();
393 for conn in connections {
394 if let Some(id) = tokio::task::try_id() {
395 if self.state_holder.has_state(id) {
396 continue;
397 }
398 }
399 let ret: CmdResult<CmdBody> = async move {
400 log::debug!("send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {}", peer_id, conn.conn_id, cmd, data_ref.len());
401 let seq = gen_seq();
402 let header = CmdHeader::<LEN, CMD>::new(version, false, Some(seq), cmd, LEN::from_u64(data_ref.len() as u64).unwrap());
403 let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
404 let resp_id = gen_resp_id(cmd, seq);
405 let waiter = self.resp_waiter.create_timeout_result_future(resp_id, timeout).map_err(into_cmd_err!(CmdErrorCode::Failed))?;
406 {
407 let mut send = conn.send.get().await;
408 if buf.len() > 255 {
409 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
410 }
411 send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
412 send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
413 send.write_all(data_ref).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
414 send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
415 }
416 let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
417 Ok(body )
418 }.await;
419 if ret.is_ok() {
420 return ret;
421 }
422 }
423 Err(cmd_err!(CmdErrorCode::Failed, "send to peer_id: {}", peer_id))
424 }
425
426 async fn send_by_specify_tunnel(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
427 let conn = self.peer_manager.find_connection(tunnel_id);
428 if conn.is_none() {
429 return Err(cmd_err!(CmdErrorCode::PeerConnectionNotFound, "tunnel_id: {:?}", tunnel_id));
430 }
431 let conn = conn.unwrap();
432 assert_eq!(tunnel_id, conn.conn_id);
433 log::trace!("send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {} data: {}", peer_id, conn.conn_id, cmd, body.len(), hex::encode(body));
434 let header = CmdHeader::<LEN, CMD>::new(version, false, None, cmd, LEN::from_u64(body.len() as u64).unwrap());
435 let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
436 let mut send = conn.send.get().await;
437 if buf.len() > 255 {
438 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
439 }
440 send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
441 send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
442 send.write_all(body).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
443 send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
444 Ok(())
445 }
446
447 async fn send_by_specify_tunnel_with_resp(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[u8], timeout: Duration) -> CmdResult<CmdBody> {
448 let conn = self.peer_manager.find_connection(tunnel_id);
449 if conn.is_none() {
450 return Err(cmd_err!(CmdErrorCode::PeerConnectionNotFound, "tunnel_id: {:?}", tunnel_id));
451 }
452 let conn = conn.unwrap();
453 if let Some(id) = tokio::task::try_id() {
454 if self.state_holder.has_state(id) {
455 return Err(cmd_err!(CmdErrorCode::Failed, "can't send msg with resp in tunnel {:?} msg handle", conn.conn_id));
456 }
457 }
458 assert_eq!(tunnel_id, conn.conn_id);
459 log::trace!("send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {} data: {}", peer_id, conn.conn_id, cmd, body.len(), hex::encode(body));
460 let seq = gen_seq();
461 let header = CmdHeader::<LEN, CMD>::new(version, false, Some(seq), cmd, LEN::from_u64(body.len() as u64).unwrap());
462 let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
463 let resp_id = gen_resp_id(cmd, seq);
464 let waiter = self.resp_waiter.create_timeout_result_future(resp_id, timeout).map_err(into_cmd_err!(CmdErrorCode::Failed))?;
465 {
466 let mut send = conn.send.get().await;
467 if buf.len() > 255 {
468 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
469 }
470 send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
471 send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
472 send.write_all(body).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
473 send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
474 }
475 let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
476 Ok(body)
477 }
478
479 async fn send2_by_specify_tunnel(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()> {
480 let conn = self.peer_manager.find_connection(tunnel_id);
481 if conn.is_none() {
482 return Err(cmd_err!(CmdErrorCode::PeerConnectionNotFound, "tunnel_id: {:?}", tunnel_id));
483 }
484 let conn = conn.unwrap();
485 assert_eq!(tunnel_id, conn.conn_id);
486 let mut len = 0;
487 for b in body.iter() {
488 len += b.len();
489 log::debug!("send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}", peer_id, conn.conn_id, cmd, hex::encode(b));
490 }
491 log::debug!("send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}", peer_id, conn.conn_id, cmd, len);
492 let header = CmdHeader::<LEN, CMD>::new(version, false, None, cmd, LEN::from_u64(len as u64).unwrap());
493 let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
494 if buf.len() > 255 {
495 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
496 }
497 let mut send = conn.send.get().await;
498 send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
499 send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
500 for b in body.iter() {
501 send.write_all(b).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
502 }
503 send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
504 Ok(())
505 }
506
507 async fn send2_by_specify_tunnel_with_resp(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[&[u8]], timeout: Duration) -> CmdResult<CmdBody> {
508 let conn = self.peer_manager.find_connection(tunnel_id);
509 if conn.is_none() {
510 return Err(cmd_err!(CmdErrorCode::PeerConnectionNotFound, "tunnel_id: {:?}", tunnel_id));
511 }
512 let conn = conn.unwrap();
513 if let Some(id) = tokio::task::try_id() {
514 if self.state_holder.has_state(id) {
515 return Err(cmd_err!(CmdErrorCode::Failed, "can't send msg with resp in tunnel {:?} msg handle", conn.conn_id));
516 }
517 }
518 assert_eq!(tunnel_id, conn.conn_id);
519 let mut len = 0;
520 for b in body.iter() {
521 len += b.len();
522 log::debug!("send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}", peer_id, conn.conn_id, cmd, hex::encode(b));
523 }
524 log::debug!("send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}", peer_id, conn.conn_id, cmd, len);
525 let seq = gen_seq();
526 let header = CmdHeader::<LEN, CMD>::new(version, false, Some(seq), cmd, LEN::from_u64(len as u64).unwrap());
527 let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
528 let resp_id = gen_resp_id(cmd, seq);
529 let waiter = self.resp_waiter.create_timeout_result_future(resp_id, timeout).map_err(into_cmd_err!(CmdErrorCode::Failed))?;
530 if buf.len() > 255 {
531 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
532 }
533 {
534 let mut send = conn.send.get().await;
535 send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
536 send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
537 for b in body.iter() {
538 send.write_all(b).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
539 }
540 send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
541 }
542 let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
543 Ok(body)
544 }
545
546 async fn send_cmd_by_specify_tunnel(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, mut body: CmdBody) -> CmdResult<()> {
547 let conn = self.peer_manager.find_connection(tunnel_id);
548 if conn.is_none() {
549 return Err(cmd_err!(CmdErrorCode::PeerConnectionNotFound, "tunnel_id: {:?}", tunnel_id));
550 }
551 let conn = conn.unwrap();
552 assert_eq!(tunnel_id, conn.conn_id);
553 log::debug!("send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {}", peer_id, conn.conn_id, cmd, body.len());
554 let header = CmdHeader::<LEN, CMD>::new(version, false, None, cmd, LEN::from_u64(body.len()).unwrap());
555 let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
556 let mut send = conn.send.get().await;
557 if buf.len() > 255 {
558 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
559 }
560 send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
561 send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
562 tokio::io::copy(&mut body, send.deref_mut().deref_mut()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
563 send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
564 Ok(())
565 }
566
567 async fn send_cmd_by_specify_tunnel_with_resp(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, mut body: CmdBody, timeout: Duration) -> CmdResult<CmdBody> {
568 let conn = self.peer_manager.find_connection(tunnel_id);
569 if conn.is_none() {
570 return Err(cmd_err!(CmdErrorCode::PeerConnectionNotFound, "tunnel_id: {:?}", tunnel_id));
571 }
572 let conn = conn.unwrap();
573 if let Some(id) = tokio::task::try_id() {
574 if self.state_holder.has_state(id) {
575 return Err(cmd_err!(CmdErrorCode::Failed, "can't send msg with resp in tunnel {:?} msg handle", conn.conn_id));
576 }
577 }
578 assert_eq!(tunnel_id, conn.conn_id);
579 log::debug!("send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {}", peer_id, conn.conn_id, cmd, body.len());
580 let seq = gen_seq();
581 let header = CmdHeader::<LEN, CMD>::new(version, false, Some(seq), cmd, LEN::from_u64(body.len()).unwrap());
582 let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
583 let resp_id = gen_resp_id(cmd, seq);
584 let waiter = self.resp_waiter.create_timeout_result_future(resp_id, timeout).map_err(into_cmd_err!(CmdErrorCode::Failed))?;
585 {
586 let mut send = conn.send.get().await;
587 if buf.len() > 255 {
588 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
589 }
590 send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
591 send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
592 tokio::io::copy(&mut body, send.deref_mut().deref_mut()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
593 send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
594 }
595 let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
596 Ok(body)
597 }
598
599 async fn send_by_all_tunnels(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
600 let connections = self.peer_manager.find_connections(peer_id);
601 for conn in connections {
602 let _ret: CmdResult<()> = async move {
603 let header = CmdHeader::<LEN, CMD>::new(version, false, None, cmd, LEN::from_u64(body.len() as u64).unwrap());
604 let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
605 let mut send = conn.send.get().await;
606 send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
607 send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
608 send.write_all(body).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
609 send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
610 Ok(())
611 }.await;
612 }
613 Ok(())
614 }
615
616 async fn send2_by_all_tunnels(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()> {
617 let connections = self.peer_manager.find_connections(peer_id);
618 let mut len = 0;
619 for b in body.iter() {
620 len += b.len();
621 }
622 for conn in connections {
623 let _ret: CmdResult<()> = async move {
624 let header = CmdHeader::<LEN, CMD>::new(version, false, None, cmd, LEN::from_u64(len as u64).unwrap());
625 let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
626 if buf.len() > 255 {
627 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
628 }
629 let mut send = conn.send.get().await;
630 send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
631 send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
632 for b in body.iter() {
633 send.write_all(b).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
634 }
635 send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
636 Ok(())
637 }.await;
638 }
639 Ok(())
640 }
641}