1use std::fmt::Debug;
2use std::hash::Hash;
3use std::ops::DerefMut;
4use std::sync::{Arc, Mutex};
5use std::time::Duration;
6use async_named_locker::{NamedStateHolder, ObjectHolder};
7use bucky_raw_codec::{RawConvertTo, RawDecode, RawEncode, RawFixedBytes, RawFrom};
8use num::{FromPrimitive, ToPrimitive};
9use sfo_split::Splittable;
10use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
11use crate::cmd::{CmdBodyRead, CmdHandler, CmdHandlerMap, CmdHeader};
12use crate::{CmdBody, CmdTunnelMeta, CmdTunnelRead, CmdTunnelWrite, TunnelId};
13use crate::client::{gen_resp_id, gen_seq, RespWaiter, RespWaiterRef};
14use crate::errors::{cmd_err, into_cmd_err, CmdErrorCode, CmdResult};
15use crate::peer_connection::PeerConnection;
16use crate::peer_id::PeerId;
17use crate::server::CmdServer;
18use super::peer_manager::{PeerManager, PeerManagerRef};
19
20#[async_trait::async_trait]
21pub trait CmdTunnelListener<M: CmdTunnelMeta, R: CmdTunnelRead<M>, W: CmdTunnelWrite<M>>: Send + Sync + 'static {
22 async fn accept(&self) -> CmdResult<Splittable<R, W>>;
23}
24
25#[async_trait::async_trait]
26pub trait CmdServerEventListener: Send + Sync + 'static {
27 async fn on_peer_connected(&self, peer_id: &PeerId) -> CmdResult<()>;
28 async fn on_peer_disconnected(&self, peer_id: &PeerId) -> CmdResult<()>;
29}
30
31#[derive(Clone)]
32struct CmdServerEventListenerEmit {
33 listeners: Arc<Mutex<Vec<Arc<dyn CmdServerEventListener>>>>,
34}
35
36impl CmdServerEventListenerEmit {
37 pub fn new() -> Self {
38 Self {
39 listeners: Arc::new(Mutex::new(Vec::new())),
40 }
41 }
42
43 pub fn attach_event_listener(&self, event_listener: Arc<dyn CmdServerEventListener>) {
44 self.listeners.lock().unwrap().push(event_listener);
45 }
46}
47
48#[async_trait::async_trait]
49impl CmdServerEventListener for CmdServerEventListenerEmit {
50 async fn on_peer_connected(&self, peer_id: &PeerId) -> CmdResult<()> {
51 let listeners = {
52 self.listeners.lock().unwrap().clone()
53 };
54 for listener in listeners.iter() {
55 if let Err(e) = listener.on_peer_connected(peer_id).await {
56 log::error!("on_peer_connected error: {:?}", e);
57 }
58 }
59 Ok(())
60 }
61
62 async fn on_peer_disconnected(&self, peer_id: &PeerId) -> CmdResult<()> {
63 let listeners = {
64 self.listeners.lock().unwrap().clone()
65 };
66 for listener in listeners.iter() {
67 if let Err(e) = listener.on_peer_disconnected(peer_id).await {
68 log::error!("on_peer_disconnected error: {:?}", e);
69 }
70 }
71 Ok(())
72 }
73}
74
75pub struct DefaultCmdServer<M: CmdTunnelMeta, R: CmdTunnelRead<M>, W: CmdTunnelWrite<M>, LEN, CMD, LISTENER> {
76 tunnel_listener: LISTENER,
77 cmd_handler_map: Arc<CmdHandlerMap<LEN, CMD>>,
78 peer_manager: PeerManagerRef<M, R, W>,
79 event_emit: CmdServerEventListenerEmit,
80 resp_waiter: RespWaiterRef,
81 state_holder: Arc<NamedStateHolder<tokio::task::Id>>,
82 _l: Mutex<std::marker::PhantomData<(R, W, LEN, CMD)>>,
83}
84
85impl<M: CmdTunnelMeta,
86 R: CmdTunnelRead<M>,
87 W: CmdTunnelWrite<M>,
88 LEN: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static + FromPrimitive + ToPrimitive,
89 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static + Eq + Hash + Debug,
90 LISTENER: CmdTunnelListener<M, R, W>> DefaultCmdServer<M, R, W, LEN, CMD, LISTENER> {
91 pub fn new(tunnel_listener: LISTENER) -> Arc<Self> {
92 let event_emit = CmdServerEventListenerEmit::new();
93 Arc::new(Self {
94 tunnel_listener,
95 cmd_handler_map: Arc::new(CmdHandlerMap::new()),
96 peer_manager: PeerManager::new(Arc::new(event_emit.clone())),
97 event_emit,
98 resp_waiter: Arc::new(RespWaiter::new()),
99 state_holder: NamedStateHolder::new(),
100 _l: Default::default(),
101 })
102 }
103
104 pub fn attach_event_listener(&self, event_listener: Arc<dyn CmdServerEventListener>) {
105 self.event_emit.attach_event_listener(event_listener);
106 }
107
108 pub async fn get_peer_tunnels(&self, peer_id: &PeerId) -> Vec<Arc<PeerConnection<R, W>>> {
109 let connections = self.peer_manager.find_connections(peer_id);
110 connections
111 }
112
113 pub fn start(self: &Arc<Self>) {
114 let this = self.clone();
115 tokio::spawn(async move {
116 if let Err(e) = this.run().await {
117 log::error!("cmd server error: {:?}", e);
118 }
119 });
120 }
121
122 async fn run(self: &Arc<Self>) -> CmdResult<()> {
123 loop {
124 let tunnel = self.tunnel_listener.accept().await?;
125 let peer_id = tunnel.get_remote_peer_id();
126 let tunnel_id = self.peer_manager.generate_conn_id();
127 let this = self.clone();
128 let resp_waiter = self.resp_waiter.clone();
129 let state_holder = self.state_holder.clone();
130 tokio::spawn(async move {
131 let remote_id = peer_id.clone();
132 let ret: CmdResult<()> = async move {
133 let this = this.clone();
134 let cmd_handler_map = this.cmd_handler_map.clone();
135 let (mut reader, writer) = tunnel.split();
136 let writer = ObjectHolder::new(writer);
137 let resp_write = writer.clone();
138 let resp_waiter = resp_waiter.clone();
139 let state_holder = state_holder.clone();
140 let recv_handle = tokio::spawn(async move {
141 let ret: CmdResult<()> = async move {
142 loop {
143 let header_len = reader.read_u8().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
144 let mut header = vec![0u8; header_len as usize];
145 let n = reader.read_exact(&mut header).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
146 if n == 0 {
147 break;
148 }
149 let header = CmdHeader::<LEN, CMD>::clone_from_slice(header.as_slice()).map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
150 println!("recv cmd {:?}", header.cmd_code());
151 let body_len = header.pkg_len().to_u64().unwrap();
152 let cmd_read = CmdBodyRead::new(reader, header.pkg_len().to_u64().unwrap() as usize);
153 let waiter = cmd_read.get_waiter();
154 let future = waiter.create_result_future().map_err(into_cmd_err!(CmdErrorCode::Failed))?;
155 {
156 let body_read = cmd_read;
157 let body = CmdBody::from_reader(BufReader::new(body_read), body_len);
158 if header.is_resp() && header.seq().is_some() {
159 let resp_id = gen_resp_id(header.cmd_code(), header.seq().unwrap());
160 let _ = resp_waiter.set_result(resp_id, body);
161 } else {
162 if let Some(handler) = cmd_handler_map.get(header.cmd_code()) {
163 let version = header.version();
164 let seq = header.seq();
165 let cmd_code = header.cmd_code();
166 match {
167 let _handle_state = state_holder.new_state(tokio::task::id());
168 handler.handle(remote_id.clone(), tunnel_id, header, body).await
169 } {
170 Ok(Some(mut body)) => {
171 let mut write = resp_write.get().await;
172 let header = CmdHeader::<LEN, CMD>::new(version, true, seq, cmd_code, LEN::from_u64(body.len()).unwrap());
173 let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
174 if buf.len() > 255 {
175 return Err(cmd_err!(CmdErrorCode::RawCodecError, "header len too large"));
176 }
177 write.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
178 write.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
179 tokio::io::copy(&mut body, write.deref_mut().deref_mut()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
180 write.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
181 }
182 Err(e) => {
183 log::error!("handle cmd error: {:?}", e);
184 }
185 _ => {}
186 }
187 }
188 }
189 };
190 reader = future.await.map_err(into_cmd_err!(CmdErrorCode::Failed))??;
191 }
193 Ok(())
194 }.await;
195 ret
196 });
197
198 let peer_conn = PeerConnection {
199 conn_id: tunnel_id,
200 peer_id: peer_id.clone(),
201 send: writer,
202 handle: Some(recv_handle),
203 };
204 this.peer_manager.add_peer_connection(peer_conn).await;
205 Ok(())
206 }.await;
207 if let Err(e) = ret {
208 log::error!("peer connection error: {:?}", e);
209 }
210 });
211 }
212 }
213}
214
215#[async_trait::async_trait]
216impl<M: CmdTunnelMeta,
217 R: CmdTunnelRead<M>,
218 W: CmdTunnelWrite<M>,
219 LEN: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static + FromPrimitive + ToPrimitive,
220 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + RawFixedBytes + Sync + Send + 'static + Eq + Hash + Debug,
221 LISTENER: CmdTunnelListener<M, R, W>> CmdServer<LEN, CMD> for DefaultCmdServer<M, R, W, LEN, CMD, LISTENER> {
222 fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>) {
223 self.cmd_handler_map.insert(cmd, handler);
224 }
225
226 async fn send(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
227 let connections = self.peer_manager.find_connections(peer_id);
228 for conn in connections {
229 let ret: CmdResult<()> = async move {
230 log::trace!("send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {} data: {}", peer_id, conn.conn_id, cmd, body.len(), hex::encode(body));
231 let header = CmdHeader::<LEN, CMD>::new(version, false, None, cmd, LEN::from_u64(body.len() as u64).unwrap());
232 let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
233 let mut send = conn.send.get().await;
234 if buf.len() > 255 {
235 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
236 }
237 send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
238 send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
239 send.write_all(body).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
240 send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
241 Ok(())
242 }.await;
243 if ret.is_ok() {
244 break;
245 }
246 }
247 Ok(())
248 }
249
250 async fn send_with_resp(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8], timeout: Duration) -> CmdResult<CmdBody> {
251 let connections = self.peer_manager.find_connections(peer_id);
252 for conn in connections {
253 if let Some(id) = tokio::task::try_id() {
254 if self.state_holder.has_state(id) {
255 continue;
256 }
257 }
258 let ret: CmdResult<CmdBody> = async move {
259 log::trace!("send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {} data: {}", peer_id, conn.conn_id, cmd, body.len(), hex::encode(body));
260 let seq = gen_seq();
261 let header = CmdHeader::<LEN, CMD>::new(version, false, Some(seq), cmd, LEN::from_u64(body.len() as u64).unwrap());
262 let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
263 let resp_id = gen_resp_id(cmd, seq);
264 let waiter = self.resp_waiter.create_timeout_result_future(resp_id, timeout).map_err(into_cmd_err!(CmdErrorCode::Failed))?;
265 {
266 let mut send = conn.send.get().await;
267 if buf.len() > 255 {
268 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
269 }
270 send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
271 send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
272 send.write_all(body).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
273 send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
274 }
275 let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout, "cmd {:?}", cmd))?;
276 Ok(body )
277 }.await;
278 if ret.is_ok() {
279 return ret;
280 } else {
281 println!("send err {:?}", ret.unwrap_err());
282 }
283 }
284 Err(cmd_err!(CmdErrorCode::Failed, "send to peer_id: {}", peer_id))
285 }
286
287 async fn send2(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()> {
288 let connections = self.peer_manager.find_connections(peer_id);
289 for conn in connections {
290 let ret: CmdResult<()> = async move {
291 let mut len = 0;
292 for b in body.iter() {
293 len += b.len();
294 log::trace!("send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}", peer_id, conn.conn_id, cmd, hex::encode(b));
295 }
296 log::trace!("send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}", peer_id, conn.conn_id, cmd, len);
297 let header = CmdHeader::<LEN, CMD>::new(version, false, None, cmd, LEN::from_u64(len as u64).unwrap());
298 let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
299 let mut send = conn.send.get().await;
300 if buf.len() > 255 {
301 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
302 }
303 send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
304 send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
305 for b in body.iter() {
306 send.write_all(b).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
307 }
308 send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
309 Ok(())
310 }.await;
311 if ret.is_ok() {
312 break;
313 }
314 }
315 Ok(())
316 }
317
318 async fn send2_with_resp(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[&[u8]], timeout: Duration) -> CmdResult<CmdBody> {
319 let connections = self.peer_manager.find_connections(peer_id);
320 for conn in connections {
321 if let Some(id) = tokio::task::try_id() {
322 if self.state_holder.has_state(id) {
323 continue;
324 }
325 }
326 let ret: CmdResult<CmdBody> = async move {
327 let mut len = 0;
328 for b in body.iter() {
329 len += b.len();
330 log::trace!("send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}", peer_id, conn.conn_id, cmd, hex::encode(b));
331 }
332 log::trace!("send2 peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}", peer_id, conn.conn_id, cmd, len);
333 let seq = gen_seq();
334 let header = CmdHeader::<LEN, CMD>::new(version, false, Some(seq), cmd, LEN::from_u64(len as u64).unwrap());
335 let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
336 let resp_id = gen_resp_id(cmd, seq);
337 let waiter = self.resp_waiter.create_timeout_result_future(resp_id, timeout).map_err(into_cmd_err!(CmdErrorCode::Failed))?;
338 {
339 let mut send = conn.send.get().await;
340 if buf.len() > 255 {
341 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
342 }
343 send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
344 send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
345 for b in body.iter() {
346 send.write_all(b).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
347 }
348 send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
349 }
350 let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
351 Ok(body)
352 }.await;
353 if ret.is_ok() {
354 return ret;
355 }
356 }
357 Err(cmd_err!(CmdErrorCode::Failed, "send to peer_id: {}", peer_id))
358 }
359
360 async fn send_cmd(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: CmdBody) -> CmdResult<()> {
361 let body_data = body.into_bytes().await?;
362 let body = body_data.as_slice();
363 let connections = self.peer_manager.find_connections(peer_id);
364 for conn in connections {
365 let ret: CmdResult<()> = async move {
366 log::trace!("send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {} data: {}", peer_id, conn.conn_id, cmd, body.len(), hex::encode(body));
367 let header = CmdHeader::<LEN, CMD>::new(version, false, None, cmd, LEN::from_u64(body.len() as u64).unwrap());
368 let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
369 let mut send = conn.send.get().await;
370 if buf.len() > 255 {
371 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
372 }
373 send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
374 send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
375 send.write_all(body).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
376 send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
377 Ok(())
378 }.await;
379 if ret.is_ok() {
380 break;
381 }
382 }
383 Ok(())
384 }
385
386 async fn send_cmd_with_resp(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: CmdBody, timeout: Duration) -> CmdResult<CmdBody> {
387 let connections = self.peer_manager.find_connections(peer_id);
388 let body_data = body.into_bytes().await?;
389 let data_ref = body_data.as_slice();
390 for conn in connections {
391 if let Some(id) = tokio::task::try_id() {
392 if self.state_holder.has_state(id) {
393 continue;
394 }
395 }
396 let ret: CmdResult<CmdBody> = async move {
397 log::trace!("send peer_id: {}, tunnel_id {:?}, cmd: {:?}, len: {}", peer_id, conn.conn_id, cmd, data_ref.len());
398 let seq = gen_seq();
399 let header = CmdHeader::<LEN, CMD>::new(version, false, Some(seq), cmd, LEN::from_u64(data_ref.len() as u64).unwrap());
400 let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
401 let resp_id = gen_resp_id(cmd, seq);
402 let waiter = self.resp_waiter.create_timeout_result_future(resp_id, timeout).map_err(into_cmd_err!(CmdErrorCode::Failed))?;
403 {
404 let mut send = conn.send.get().await;
405 if buf.len() > 255 {
406 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
407 }
408 send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
409 send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
410 send.write_all(data_ref).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
411 send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
412 }
413 let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
414 Ok(body )
415 }.await;
416 if ret.is_ok() {
417 return ret;
418 }
419 }
420 Err(cmd_err!(CmdErrorCode::Failed, "send to peer_id: {}", peer_id))
421 }
422
423 async fn send_by_specify_tunnel(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
424 let conn = self.peer_manager.find_connection(tunnel_id);
425 if conn.is_none() {
426 return Err(cmd_err!(CmdErrorCode::PeerConnectionNotFound, "tunnel_id: {:?}", tunnel_id));
427 }
428 let conn = conn.unwrap();
429 assert_eq!(tunnel_id, conn.conn_id);
430 log::trace!("send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {} data: {}", peer_id, conn.conn_id, cmd, body.len(), hex::encode(body));
431 let header = CmdHeader::<LEN, CMD>::new(version, false, None, cmd, LEN::from_u64(body.len() as u64).unwrap());
432 let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
433 let mut send = conn.send.get().await;
434 if buf.len() > 255 {
435 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
436 }
437 send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
438 send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
439 send.write_all(body).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
440 send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
441 Ok(())
442 }
443
444 async fn send_by_specify_tunnel_with_resp(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[u8], timeout: Duration) -> CmdResult<CmdBody> {
445 let conn = self.peer_manager.find_connection(tunnel_id);
446 if conn.is_none() {
447 return Err(cmd_err!(CmdErrorCode::PeerConnectionNotFound, "tunnel_id: {:?}", tunnel_id));
448 }
449 let conn = conn.unwrap();
450 if let Some(id) = tokio::task::try_id() {
451 if self.state_holder.has_state(id) {
452 return Err(cmd_err!(CmdErrorCode::Failed, "can't send msg with resp in tunnel {:?} msg handle", conn.conn_id));
453 }
454 }
455 assert_eq!(tunnel_id, conn.conn_id);
456 log::trace!("send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {} data: {}", peer_id, conn.conn_id, cmd, body.len(), hex::encode(body));
457 let seq = gen_seq();
458 let header = CmdHeader::<LEN, CMD>::new(version, false, Some(seq), cmd, LEN::from_u64(body.len() as u64).unwrap());
459 let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
460 let resp_id = gen_resp_id(cmd, seq);
461 let waiter = self.resp_waiter.create_timeout_result_future(resp_id, timeout).map_err(into_cmd_err!(CmdErrorCode::Failed))?;
462 {
463 let mut send = conn.send.get().await;
464 if buf.len() > 255 {
465 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
466 }
467 send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
468 send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
469 send.write_all(body).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
470 send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
471 }
472 let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
473 Ok(body)
474 }
475
476 async fn send2_by_specify_tunnel(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()> {
477 let conn = self.peer_manager.find_connection(tunnel_id);
478 if conn.is_none() {
479 return Err(cmd_err!(CmdErrorCode::PeerConnectionNotFound, "tunnel_id: {:?}", tunnel_id));
480 }
481 let conn = conn.unwrap();
482 assert_eq!(tunnel_id, conn.conn_id);
483 let mut len = 0;
484 for b in body.iter() {
485 len += b.len();
486 log::trace!("send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}", peer_id, conn.conn_id, cmd, hex::encode(b));
487 }
488 log::trace!("send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}", peer_id, conn.conn_id, cmd, len);
489 let header = CmdHeader::<LEN, CMD>::new(version, false, None, cmd, LEN::from_u64(len as u64).unwrap());
490 let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
491 if buf.len() > 255 {
492 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
493 }
494 let mut send = conn.send.get().await;
495 send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
496 send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
497 for b in body.iter() {
498 send.write_all(b).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
499 }
500 send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
501 Ok(())
502 }
503
504 async fn send2_by_specify_tunnel_with_resp(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, body: &[&[u8]], timeout: Duration) -> CmdResult<CmdBody> {
505 let conn = self.peer_manager.find_connection(tunnel_id);
506 if conn.is_none() {
507 return Err(cmd_err!(CmdErrorCode::PeerConnectionNotFound, "tunnel_id: {:?}", tunnel_id));
508 }
509 let conn = conn.unwrap();
510 if let Some(id) = tokio::task::try_id() {
511 if self.state_holder.has_state(id) {
512 return Err(cmd_err!(CmdErrorCode::Failed, "can't send msg with resp in tunnel {:?} msg handle", conn.conn_id));
513 }
514 }
515 assert_eq!(tunnel_id, conn.conn_id);
516 let mut len = 0;
517 for b in body.iter() {
518 len += b.len();
519 log::trace!("send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} body: {}", peer_id, conn.conn_id, cmd, hex::encode(b));
520 }
521 log::trace!("send2_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?} len: {}", peer_id, conn.conn_id, cmd, len);
522 let seq = gen_seq();
523 let header = CmdHeader::<LEN, CMD>::new(version, false, Some(seq), cmd, LEN::from_u64(len as u64).unwrap());
524 let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
525 let resp_id = gen_resp_id(cmd, seq);
526 let waiter = self.resp_waiter.create_timeout_result_future(resp_id, timeout).map_err(into_cmd_err!(CmdErrorCode::Failed))?;
527 if buf.len() > 255 {
528 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
529 }
530 {
531 let mut send = conn.send.get().await;
532 send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
533 send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
534 for b in body.iter() {
535 send.write_all(b).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
536 }
537 send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
538 }
539 let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
540 Ok(body)
541 }
542
543 async fn send_cmd_by_specify_tunnel(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, mut body: CmdBody) -> CmdResult<()> {
544 let conn = self.peer_manager.find_connection(tunnel_id);
545 if conn.is_none() {
546 return Err(cmd_err!(CmdErrorCode::PeerConnectionNotFound, "tunnel_id: {:?}", tunnel_id));
547 }
548 let conn = conn.unwrap();
549 assert_eq!(tunnel_id, conn.conn_id);
550 log::trace!("send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {}", peer_id, conn.conn_id, cmd, body.len());
551 let header = CmdHeader::<LEN, CMD>::new(version, false, None, cmd, LEN::from_u64(body.len()).unwrap());
552 let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
553 let mut send = conn.send.get().await;
554 if buf.len() > 255 {
555 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
556 }
557 send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
558 send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
559 tokio::io::copy(&mut body, send.deref_mut().deref_mut()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
560 send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
561 Ok(())
562 }
563
564 async fn send_cmd_by_specify_tunnel_with_resp(&self, peer_id: &PeerId, tunnel_id: TunnelId, cmd: CMD, version: u8, mut body: CmdBody, timeout: Duration) -> CmdResult<CmdBody> {
565 let conn = self.peer_manager.find_connection(tunnel_id);
566 if conn.is_none() {
567 return Err(cmd_err!(CmdErrorCode::PeerConnectionNotFound, "tunnel_id: {:?}", tunnel_id));
568 }
569 let conn = conn.unwrap();
570 if let Some(id) = tokio::task::try_id() {
571 if self.state_holder.has_state(id) {
572 return Err(cmd_err!(CmdErrorCode::Failed, "can't send msg with resp in tunnel {:?} msg handle", conn.conn_id));
573 }
574 }
575 assert_eq!(tunnel_id, conn.conn_id);
576 log::trace!("send_by_specify_tunnel peer_id: {}, tunnel_id: {:?}, cmd: {:?}, len: {}", peer_id, conn.conn_id, cmd, body.len());
577 let seq = gen_seq();
578 let header = CmdHeader::<LEN, CMD>::new(version, false, Some(seq), cmd, LEN::from_u64(body.len()).unwrap());
579 let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
580 let resp_id = gen_resp_id(cmd, seq);
581 let waiter = self.resp_waiter.create_timeout_result_future(resp_id, timeout).map_err(into_cmd_err!(CmdErrorCode::Failed))?;
582 {
583 let mut send = conn.send.get().await;
584 if buf.len() > 255 {
585 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
586 }
587 send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
588 send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
589 tokio::io::copy(&mut body, send.deref_mut().deref_mut()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
590 send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
591 }
592 let body = waiter.await.map_err(into_cmd_err!(CmdErrorCode::Timeout))?;
593 Ok(body)
594 }
595
596 async fn send_by_all_tunnels(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
597 let connections = self.peer_manager.find_connections(peer_id);
598 for conn in connections {
599 let _ret: CmdResult<()> = async move {
600 let header = CmdHeader::<LEN, CMD>::new(version, false, None, cmd, LEN::from_u64(body.len() as u64).unwrap());
601 let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
602 let mut send = conn.send.get().await;
603 send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
604 send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
605 send.write_all(body).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
606 send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
607 Ok(())
608 }.await;
609 }
610 Ok(())
611 }
612
613 async fn send2_by_all_tunnels(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[&[u8]]) -> CmdResult<()> {
614 let connections = self.peer_manager.find_connections(peer_id);
615 let mut len = 0;
616 for b in body.iter() {
617 len += b.len();
618 }
619 for conn in connections {
620 let _ret: CmdResult<()> = async move {
621 let header = CmdHeader::<LEN, CMD>::new(version, false, None, cmd, LEN::from_u64(len as u64).unwrap());
622 let buf = header.to_vec().map_err(into_cmd_err!(CmdErrorCode::RawCodecError))?;
623 if buf.len() > 255 {
624 return Err(cmd_err!(CmdErrorCode::InvalidParam, "header len too large"));
625 }
626 let mut send = conn.send.get().await;
627 send.write_u8(buf.len() as u8).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
628 send.write_all(buf.as_slice()).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
629 for b in body.iter() {
630 send.write_all(b).await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
631 }
632 send.flush().await.map_err(into_cmd_err!(CmdErrorCode::IoError))?;
633 Ok(())
634 }.await;
635 }
636 Ok(())
637 }
638}