1use crate::client::{ClassifiedSendGuard, CommonCmdSend, RespWaiter, RespWaiterRef, gen_resp_id};
2use crate::cmd::CmdHandlerMap;
3use crate::errors::{CmdErrorCode, CmdResult, into_cmd_err};
4use crate::node::create_recv_handle;
5use crate::server::CmdTunnelListener;
6use crate::{
7 CmdBody, CmdHandler, CmdHeader, CmdNode, CmdTunnelMeta, CmdTunnelRead, CmdTunnelWrite, PeerId,
8 TunnelId, TunnelIdGenerator, into_pool_err, pool_err,
9};
10use async_named_locker::ObjectHolder;
11use bucky_raw_codec::{RawDecode, RawEncode, RawFixedBytes};
12use num::{FromPrimitive, ToPrimitive};
13use sfo_pool::{
14 ClassifiedWorker, ClassifiedWorkerFactory, ClassifiedWorkerGuard, ClassifiedWorkerPool,
15 ClassifiedWorkerPoolRef, PoolErrorCode, PoolResult,
16};
17use sfo_split::Splittable;
18use std::collections::HashMap;
19use std::fmt::Debug;
20use std::hash::Hash;
21use std::sync::{Arc, Mutex};
22use std::time::Duration;
23
24#[async_trait::async_trait]
25pub trait CmdNodeTunnelFactory<M: CmdTunnelMeta, R: CmdTunnelRead<M>, W: CmdTunnelWrite<M>>:
26 Send + Sync + 'static
27{
28 async fn create_tunnel(&self, remote_id: &PeerId) -> CmdResult<Splittable<R, W>>;
29}
30
31impl<M, R, W, LEN, CMD> ClassifiedWorker<(PeerId, Option<TunnelId>)>
32 for CommonCmdSend<M, R, W, LEN, CMD>
33where
34 M: CmdTunnelMeta,
35 R: CmdTunnelRead<M>,
36 W: CmdTunnelWrite<M>,
37 LEN: RawEncode
38 + for<'a> RawDecode<'a>
39 + Copy
40 + Send
41 + Sync
42 + 'static
43 + FromPrimitive
44 + ToPrimitive,
45 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
46{
47 fn is_work(&self) -> bool {
48 self.is_work && !self.recv_handle.is_finished()
49 }
50
51 fn is_valid(&self, c: (PeerId, Option<TunnelId>)) -> bool {
52 let (peer_id, tunnel_id) = c;
53 if tunnel_id.is_some() {
54 self.tunnel_id == tunnel_id.unwrap() && peer_id == self.remote_id
55 } else {
56 peer_id == self.remote_id
57 }
58 }
59
60 fn classification(&self) -> (PeerId, Option<TunnelId>) {
61 (self.remote_id.clone(), Some(self.tunnel_id))
62 }
63}
64
65struct CmdWriteFactoryImpl<
66 M: CmdTunnelMeta,
67 R: CmdTunnelRead<M>,
68 W: CmdTunnelWrite<M>,
69 F: CmdNodeTunnelFactory<M, R, W>,
70 LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
71 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
72 LISTENER: CmdTunnelListener<M, R, W>,
73> {
74 tunnel_listener: LISTENER,
75 tunnel_factory: F,
76 cmd_handler: Arc<dyn CmdHandler<LEN, CMD>>,
77 tunnel_id_generator: TunnelIdGenerator,
78 resp_waiter: RespWaiterRef,
79 send_cache: Arc<Mutex<HashMap<PeerId, Vec<CommonCmdSend<M, R, W, LEN, CMD>>>>>,
80}
81
82impl<
83 M: CmdTunnelMeta,
84 R: CmdTunnelRead<M>,
85 W: CmdTunnelWrite<M>,
86 F: CmdNodeTunnelFactory<M, R, W>,
87 LEN: RawEncode
88 + for<'a> RawDecode<'a>
89 + Copy
90 + Send
91 + Sync
92 + 'static
93 + FromPrimitive
94 + ToPrimitive
95 + RawFixedBytes,
96 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
97 LISTENER: CmdTunnelListener<M, R, W>,
98> CmdWriteFactoryImpl<M, R, W, F, LEN, CMD, LISTENER>
99{
100 pub fn new(
101 tunnel_factory: F,
102 tunnel_listener: LISTENER,
103 cmd_handler: impl CmdHandler<LEN, CMD>,
104 resp_waiter: RespWaiterRef,
105 ) -> Self {
106 Self {
107 tunnel_listener,
108 tunnel_factory,
109 cmd_handler: Arc::new(cmd_handler),
110 tunnel_id_generator: TunnelIdGenerator::new(),
111 resp_waiter,
112 send_cache: Arc::new(Mutex::new(Default::default())),
113 }
114 }
115
116 pub fn start(self: &Arc<Self>) {
117 let this = self.clone();
118 tokio::spawn(async move {
119 if let Err(e) = this.run().await {
120 log::error!("cmd server error: {:?}", e);
121 }
122 });
123 }
124
125 async fn run(self: &Arc<Self>) -> CmdResult<()> {
126 loop {
127 let tunnel = self.tunnel_listener.accept().await?;
128 let peer_id = tunnel.get_remote_peer_id();
129 let tunnel_id = self.tunnel_id_generator.generate();
130 let resp_waiter = self.resp_waiter.clone();
131 let this = self.clone();
132 tokio::spawn(async move {
133 let ret: CmdResult<()> = async move {
134 let this = this.clone();
135 let cmd_handler = this.cmd_handler.clone();
136 let (reader, writer) = tunnel.split();
137 let remote_id = reader.get_remote_peer_id();
138 let tunnel_meta = reader.get_tunnel_meta();
139 let writer = ObjectHolder::new(writer);
140 let recv_handle = create_recv_handle::<M, R, W, LEN, CMD>(
141 reader,
142 writer.clone(),
143 tunnel_id,
144 cmd_handler,
145 );
146 {
147 let mut send_cache = this.send_cache.lock().unwrap();
148 let send_list = send_cache.entry(peer_id).or_insert(Vec::new());
149 send_list.push(CommonCmdSend::new(
150 tunnel_id,
151 recv_handle,
152 writer,
153 resp_waiter,
154 remote_id,
155 tunnel_meta,
156 ));
157 }
158 Ok(())
159 }
160 .await;
161 if let Err(e) = ret {
162 log::error!("peer connection error: {:?}", e);
163 }
164 });
165 }
166 }
167}
168
169#[async_trait::async_trait]
170impl<
171 M: CmdTunnelMeta,
172 R: CmdTunnelRead<M>,
173 W: CmdTunnelWrite<M>,
174 F: CmdNodeTunnelFactory<M, R, W>,
175 LEN: RawEncode
176 + for<'a> RawDecode<'a>
177 + Copy
178 + Send
179 + Sync
180 + 'static
181 + FromPrimitive
182 + ToPrimitive
183 + RawFixedBytes,
184 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + RawFixedBytes + Debug,
185 LISTENER: CmdTunnelListener<M, R, W>,
186> ClassifiedWorkerFactory<(PeerId, Option<TunnelId>), CommonCmdSend<M, R, W, LEN, CMD>>
187 for CmdWriteFactoryImpl<M, R, W, F, LEN, CMD, LISTENER>
188{
189 async fn create(
190 &self,
191 c: Option<(PeerId, Option<TunnelId>)>,
192 ) -> PoolResult<CommonCmdSend<M, R, W, LEN, CMD>> {
193 if c.is_some() {
194 let (peer_id, tunnel_id) = c.unwrap();
195 if tunnel_id.is_some() {
196 let mut send_cache = self.send_cache.lock().unwrap();
197 if let Some(send_list) = send_cache.get_mut(&peer_id) {
198 let mut send_index = None;
199 for (index, send) in send_list.iter().enumerate() {
200 if send.get_tunnel_id() == tunnel_id.unwrap() {
201 send_index = Some(index);
202 break;
203 }
204 }
205 if let Some(send_index) = send_index {
206 let send = send_list.remove(send_index);
207 Ok(send)
208 } else {
209 Err(pool_err!(
210 PoolErrorCode::Failed,
211 "tunnel {:?} not found",
212 tunnel_id.unwrap()
213 ))
214 }
215 } else {
216 Err(pool_err!(
217 PoolErrorCode::Failed,
218 "tunnel {:?} not found",
219 tunnel_id.unwrap()
220 ))
221 }
222 } else {
223 {
224 let mut send_cache = self.send_cache.lock().unwrap();
225 if let Some(send_list) = send_cache.get_mut(&peer_id) {
226 if !send_list.is_empty() {
227 let send = send_list.pop().unwrap();
228 if send_list.is_empty() {
229 send_cache.remove(&peer_id);
230 }
231 return Ok(send);
232 }
233 }
234 }
235 let tunnel = self
236 .tunnel_factory
237 .create_tunnel(&peer_id)
238 .await
239 .map_err(into_pool_err!(PoolErrorCode::Failed))?;
240 let tunnel_id = self.tunnel_id_generator.generate();
241 let (recv, write) = tunnel.split();
242 let remote_id = recv.get_remote_peer_id();
243 let tunnel_meta = recv.get_tunnel_meta();
244 let write = ObjectHolder::new(write);
245 let cmd_handler = self.cmd_handler.clone();
246 let handle = create_recv_handle::<M, R, W, LEN, CMD>(
247 recv,
248 write.clone(),
249 tunnel_id,
250 cmd_handler,
251 );
252 Ok(CommonCmdSend::new(
253 tunnel_id,
254 handle,
255 write,
256 self.resp_waiter.clone(),
257 remote_id,
258 tunnel_meta,
259 ))
260 }
261 } else {
262 Err(pool_err!(PoolErrorCode::Failed, "peer id is none"))
263 }
264 }
265}
266
267pub struct CmdNodeWriteFactory<
268 M: CmdTunnelMeta,
269 R: CmdTunnelRead<M>,
270 W: CmdTunnelWrite<M>,
271 F: CmdNodeTunnelFactory<M, R, W>,
272 LEN: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + FromPrimitive + ToPrimitive,
273 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
274 LISTENER: CmdTunnelListener<M, R, W>,
275> {
276 inner: Arc<CmdWriteFactoryImpl<M, R, W, F, LEN, CMD, LISTENER>>,
277}
278
279impl<
280 M: CmdTunnelMeta,
281 R: CmdTunnelRead<M>,
282 W: CmdTunnelWrite<M>,
283 F: CmdNodeTunnelFactory<M, R, W>,
284 LEN: RawEncode
285 + for<'a> RawDecode<'a>
286 + Copy
287 + Send
288 + Sync
289 + 'static
290 + FromPrimitive
291 + ToPrimitive
292 + RawFixedBytes,
293 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + Debug + RawFixedBytes,
294 LISTENER: CmdTunnelListener<M, R, W>,
295> CmdNodeWriteFactory<M, R, W, F, LEN, CMD, LISTENER>
296{
297 pub(crate) fn new(
298 tunnel_factory: F,
299 tunnel_listener: LISTENER,
300 cmd_handler: impl CmdHandler<LEN, CMD>,
301 resp_waiter: RespWaiterRef,
302 ) -> Self {
303 Self {
304 inner: Arc::new(CmdWriteFactoryImpl::new(
305 tunnel_factory,
306 tunnel_listener,
307 cmd_handler,
308 resp_waiter,
309 )),
310 }
311 }
312
313 pub fn start(&self) {
314 self.inner.start();
315 }
316}
317
318#[async_trait::async_trait]
319impl<
320 M: CmdTunnelMeta,
321 R: CmdTunnelRead<M>,
322 W: CmdTunnelWrite<M>,
323 F: CmdNodeTunnelFactory<M, R, W>,
324 LEN: RawEncode
325 + for<'a> RawDecode<'a>
326 + Copy
327 + Send
328 + Sync
329 + 'static
330 + FromPrimitive
331 + ToPrimitive
332 + RawFixedBytes,
333 CMD: RawEncode + for<'a> RawDecode<'a> + Copy + Send + Sync + 'static + RawFixedBytes + Debug,
334 LISTENER: CmdTunnelListener<M, R, W>,
335> ClassifiedWorkerFactory<(PeerId, Option<TunnelId>), CommonCmdSend<M, R, W, LEN, CMD>>
336 for CmdNodeWriteFactory<M, R, W, F, LEN, CMD, LISTENER>
337{
338 async fn create(
339 &self,
340 c: Option<(PeerId, Option<TunnelId>)>,
341 ) -> PoolResult<CommonCmdSend<M, R, W, LEN, CMD>> {
342 self.inner.create(c).await
343 }
344}
345pub struct DefaultCmdNode<
346 M: CmdTunnelMeta,
347 R: CmdTunnelRead<M>,
348 W: CmdTunnelWrite<M>,
349 F: CmdNodeTunnelFactory<M, R, W>,
350 LEN: RawEncode
351 + for<'a> RawDecode<'a>
352 + Copy
353 + Send
354 + Sync
355 + 'static
356 + FromPrimitive
357 + ToPrimitive
358 + RawFixedBytes,
359 CMD: RawEncode
360 + for<'a> RawDecode<'a>
361 + Copy
362 + Send
363 + Sync
364 + 'static
365 + RawFixedBytes
366 + Eq
367 + Hash
368 + Debug,
369 LISTENER: CmdTunnelListener<M, R, W>,
370> {
371 tunnel_pool: ClassifiedWorkerPoolRef<
372 (PeerId, Option<TunnelId>),
373 CommonCmdSend<M, R, W, LEN, CMD>,
374 CmdNodeWriteFactory<M, R, W, F, LEN, CMD, LISTENER>,
375 >,
376 cmd_handler_map: Arc<CmdHandlerMap<LEN, CMD>>,
377}
378
379impl<
380 M: CmdTunnelMeta,
381 R: CmdTunnelRead<M>,
382 W: CmdTunnelWrite<M>,
383 F: CmdNodeTunnelFactory<M, R, W>,
384 LEN: RawEncode
385 + for<'a> RawDecode<'a>
386 + Copy
387 + Send
388 + Sync
389 + 'static
390 + FromPrimitive
391 + ToPrimitive
392 + RawFixedBytes,
393 CMD: RawEncode
394 + for<'a> RawDecode<'a>
395 + Copy
396 + Send
397 + Sync
398 + 'static
399 + RawFixedBytes
400 + Eq
401 + Hash
402 + Debug,
403 LISTENER: CmdTunnelListener<M, R, W>,
404> DefaultCmdNode<M, R, W, F, LEN, CMD, LISTENER>
405{
406 pub fn new(listener: LISTENER, factory: F, tunnel_count: u16) -> Arc<Self> {
407 let cmd_handler_map = Arc::new(CmdHandlerMap::new());
408 let handler_map = cmd_handler_map.clone();
409 let resp_waiter = Arc::new(RespWaiter::new());
410 let waiter = resp_waiter.clone();
411 let write_factory = CmdNodeWriteFactory::<M, R, W, _, LEN, CMD, LISTENER>::new(
412 factory,
413 listener,
414 move |local_id: PeerId,
415 peer_id: PeerId,
416 tunnel_id: TunnelId,
417 header: CmdHeader<LEN, CMD>,
418 body_read: CmdBody| {
419 let handler_map = handler_map.clone();
420 let waiter = waiter.clone();
421 async move {
422 if header.is_resp() && header.seq().is_some() {
423 let resp_id =
424 gen_resp_id(tunnel_id, header.cmd_code(), header.seq().unwrap());
425 let _ = waiter.set_result(resp_id, body_read);
426 Ok(None)
427 } else {
428 if let Some(handler) = handler_map.get(header.cmd_code()) {
429 handler
430 .handle(local_id, peer_id, tunnel_id, header, body_read)
431 .await
432 } else {
433 Ok(None)
434 }
435 }
436 }
437 },
438 resp_waiter.clone(),
439 );
440 write_factory.start();
441 Arc::new(Self {
442 tunnel_pool: ClassifiedWorkerPool::new(tunnel_count, write_factory),
443 cmd_handler_map,
444 })
445 }
446
447 async fn get_send(
448 &self,
449 peer_id: PeerId,
450 ) -> CmdResult<
451 ClassifiedWorkerGuard<
452 (PeerId, Option<TunnelId>),
453 CommonCmdSend<M, R, W, LEN, CMD>,
454 CmdNodeWriteFactory<M, R, W, F, LEN, CMD, LISTENER>,
455 >,
456 > {
457 self.tunnel_pool
458 .get_classified_worker((peer_id, None))
459 .await
460 .map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))
461 }
462
463 async fn get_send_of_tunnel_id(
464 &self,
465 peer_id: PeerId,
466 tunnel_id: TunnelId,
467 ) -> CmdResult<
468 ClassifiedWorkerGuard<
469 (PeerId, Option<TunnelId>),
470 CommonCmdSend<M, R, W, LEN, CMD>,
471 CmdNodeWriteFactory<M, R, W, F, LEN, CMD, LISTENER>,
472 >,
473 > {
474 self.tunnel_pool
475 .get_classified_worker((peer_id, Some(tunnel_id)))
476 .await
477 .map_err(into_cmd_err!(CmdErrorCode::Failed, "get worker failed"))
478 }
479}
480
481pub type CmdNodeSendGuard<M, R, W, F, LEN, CMD, LISTENER> = ClassifiedSendGuard<
482 (PeerId, Option<TunnelId>),
483 M,
484 CommonCmdSend<M, R, W, LEN, CMD>,
485 CmdNodeWriteFactory<M, R, W, F, LEN, CMD, LISTENER>,
486>;
487#[async_trait::async_trait]
488impl<
489 M: CmdTunnelMeta,
490 R: CmdTunnelRead<M>,
491 W: CmdTunnelWrite<M>,
492 F: CmdNodeTunnelFactory<M, R, W>,
493 LEN: RawEncode
494 + for<'a> RawDecode<'a>
495 + Copy
496 + RawFixedBytes
497 + Sync
498 + Send
499 + 'static
500 + FromPrimitive
501 + ToPrimitive,
502 CMD: RawEncode
503 + for<'a> RawDecode<'a>
504 + Copy
505 + RawFixedBytes
506 + Sync
507 + Send
508 + 'static
509 + Eq
510 + Hash
511 + Debug,
512 LISTENER: CmdTunnelListener<M, R, W>,
513>
514 CmdNode<
515 LEN,
516 CMD,
517 M,
518 CommonCmdSend<M, R, W, LEN, CMD>,
519 CmdNodeSendGuard<M, R, W, F, LEN, CMD, LISTENER>,
520 > for DefaultCmdNode<M, R, W, F, LEN, CMD, LISTENER>
521{
522 fn register_cmd_handler(&self, cmd: CMD, handler: impl CmdHandler<LEN, CMD>) {
523 self.cmd_handler_map.insert(cmd, handler);
524 }
525
526 async fn send(&self, peer_id: &PeerId, cmd: CMD, version: u8, body: &[u8]) -> CmdResult<()> {
527 let mut send = self.get_send(peer_id.clone()).await?;
528 send.send(cmd, version, body).await
529 }
530
531 async fn send_with_resp(
532 &self,
533 peer_id: &PeerId,
534 cmd: CMD,
535 version: u8,
536 body: &[u8],
537 timeout: Duration,
538 ) -> CmdResult<CmdBody> {
539 let mut send = self.get_send(peer_id.clone()).await?;
540 send.send_with_resp(cmd, version, body, timeout).await
541 }
542
543 async fn send2(
544 &self,
545 peer_id: &PeerId,
546 cmd: CMD,
547 version: u8,
548 body: &[&[u8]],
549 ) -> CmdResult<()> {
550 let mut send = self.get_send(peer_id.clone()).await?;
551 send.send2(cmd, version, body).await
552 }
553
554 async fn send2_with_resp(
555 &self,
556 peer_id: &PeerId,
557 cmd: CMD,
558 version: u8,
559 body: &[&[u8]],
560 timeout: Duration,
561 ) -> CmdResult<CmdBody> {
562 let mut send = self.get_send(peer_id.clone()).await?;
563 send.send2_with_resp(cmd, version, body, timeout).await
564 }
565
566 async fn send_cmd(
567 &self,
568 peer_id: &PeerId,
569 cmd: CMD,
570 version: u8,
571 body: CmdBody,
572 ) -> CmdResult<()> {
573 let mut send = self.get_send(peer_id.clone()).await?;
574 send.send_cmd(cmd, version, body).await
575 }
576
577 async fn send_cmd_with_resp(
578 &self,
579 peer_id: &PeerId,
580 cmd: CMD,
581 version: u8,
582 body: CmdBody,
583 timeout: Duration,
584 ) -> CmdResult<CmdBody> {
585 let mut send = self.get_send(peer_id.clone()).await?;
586 send.send_cmd_with_resp(cmd, version, body, timeout).await
587 }
588
589 async fn send_by_specify_tunnel(
590 &self,
591 peer_id: &PeerId,
592 tunnel_id: TunnelId,
593 cmd: CMD,
594 version: u8,
595 body: &[u8],
596 ) -> CmdResult<()> {
597 let mut send = self
598 .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
599 .await?;
600 send.send(cmd, version, body).await
601 }
602
603 async fn send_by_specify_tunnel_with_resp(
604 &self,
605 peer_id: &PeerId,
606 tunnel_id: TunnelId,
607 cmd: CMD,
608 version: u8,
609 body: &[u8],
610 timeout: Duration,
611 ) -> CmdResult<CmdBody> {
612 let mut send = self
613 .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
614 .await?;
615 send.send_with_resp(cmd, version, body, timeout).await
616 }
617
618 async fn send2_by_specify_tunnel(
619 &self,
620 peer_id: &PeerId,
621 tunnel_id: TunnelId,
622 cmd: CMD,
623 version: u8,
624 body: &[&[u8]],
625 ) -> CmdResult<()> {
626 let mut send = self
627 .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
628 .await?;
629 send.send2(cmd, version, body).await
630 }
631
632 async fn send2_by_specify_tunnel_with_resp(
633 &self,
634 peer_id: &PeerId,
635 tunnel_id: TunnelId,
636 cmd: CMD,
637 version: u8,
638 body: &[&[u8]],
639 timeout: Duration,
640 ) -> CmdResult<CmdBody> {
641 let mut send = self
642 .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
643 .await?;
644 send.send2_with_resp(cmd, version, body, timeout).await
645 }
646
647 async fn send_cmd_by_specify_tunnel(
648 &self,
649 peer_id: &PeerId,
650 tunnel_id: TunnelId,
651 cmd: CMD,
652 version: u8,
653 body: CmdBody,
654 ) -> CmdResult<()> {
655 let mut send = self
656 .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
657 .await?;
658 send.send_cmd(cmd, version, body).await
659 }
660
661 async fn send_cmd_by_specify_tunnel_with_resp(
662 &self,
663 peer_id: &PeerId,
664 tunnel_id: TunnelId,
665 cmd: CMD,
666 version: u8,
667 body: CmdBody,
668 timeout: Duration,
669 ) -> CmdResult<CmdBody> {
670 let mut send = self
671 .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
672 .await?;
673 send.send_cmd_with_resp(cmd, version, body, timeout).await
674 }
675
676 async fn clear_all_tunnel(&self) {
677 self.tunnel_pool.clear_all_worker().await
678 }
679
680 async fn get_send(
681 &self,
682 peer_id: &PeerId,
683 tunnel_id: TunnelId,
684 ) -> CmdResult<CmdNodeSendGuard<M, R, W, F, LEN, CMD, LISTENER>> {
685 Ok(ClassifiedSendGuard {
686 worker_guard: self
687 .get_send_of_tunnel_id(peer_id.clone(), tunnel_id)
688 .await?,
689 _p: Default::default(),
690 })
691 }
692}