1use core::fmt::Debug;
2use std::{
3 error::Error,
4 fmt::Display,
5 future::Future,
6 marker::PhantomData,
7 net::{IpAddr, Ipv4Addr, SocketAddr},
8 path::{Path, PathBuf},
9 sync::Arc,
10};
11
12use directories::ProjectDirs;
13use postcard_rpc::{
14 host_client::{
15 HostClient, HostErr, MultiSubRxError, MultiSubscription, SchemaReport, TopicReport, WireRx,
16 WireSpawn, WireTx,
17 },
18 standard_icd::{PingEndpoint, WireError, ERROR_PATH},
19 Endpoint, Topic,
20};
21use poststation_api_icd::postsock::{
22 Anchor, DeviceData, Direction, GetDevicesEndpoint, GetLogsEndpoint, GetLogsRangeEndpoint,
23 GetSchemasEndpoint, GetTopicsEndpoint, Log, LogRangeRequest, LogRequest, ProxyEndpoint,
24 ProxyRequest, ProxyResponse, PublishEndpoint, PublishRequest, PublishResponse,
25 StartStreamEndpoint, SubscribeTopic, TopicMsg, TopicRequest, TopicStreamMsg,
26 TopicStreamRequest, TopicStreamResult, Uuidv7,
27};
28use rustls::{
29 pki_types::{pem::PemObject, CertificateDer, ServerName},
30 RootCertStore,
31};
32use serde::{de::DeserializeOwned, Serialize};
33use tokio::{
34 io::{split, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf},
35 net::TcpStream,
36};
37
38pub use postcard_schema as schema;
39pub use poststation_api_icd as icd;
40pub use postcard_dyn::Value;
41use tokio_rustls::TlsConnector;
42
43#[non_exhaustive]
46#[derive(Debug)]
47pub enum ClientError {
48 ConnectionClosed,
50 Protocol,
52 Encoding,
54 Server(String),
56 Remote(String),
58 Dynamic(String),
60}
61
62impl From<HostErr<WireError>> for ClientError {
63 fn from(value: HostErr<WireError>) -> Self {
64 match value {
65 HostErr::Wire(_e) => ClientError::Protocol,
66 HostErr::BadResponse => ClientError::Protocol,
67 HostErr::Postcard(_error) => ClientError::Encoding,
68 HostErr::Closed => ClientError::ConnectionClosed,
69 }
70 }
71}
72
73impl Display for ClientError {
74 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75 <Self as Debug>::fmt(self, f)
76 }
77}
78
79impl Error for ClientError {}
80
81#[derive(Clone)]
82pub struct PoststationClient {
83 client: HostClient<WireError>,
84}
85
86impl PoststationClient {
87 #[doc(hidden)]
91 pub fn raw_client(&self) -> &HostClient<WireError> {
92 &self.client
93 }
94
95 pub async fn get_devices(&self) -> Result<Vec<DeviceData>, ClientError> {
99 Ok(self.client.send_resp::<GetDevicesEndpoint>(&()).await?)
100 }
101
102 pub async fn get_device_schemas(
108 &self,
109 serial: u64,
110 ) -> Result<Option<SchemaReport>, ClientError> {
111 let res = self.client.send_resp::<GetSchemasEndpoint>(&serial).await?;
112 Ok(res)
113 }
114
115 pub async fn get_device_logs(
121 &self,
122 serial: u64,
123 count: u32,
124 ) -> Result<Option<Vec<Log>>, ClientError> {
125 Ok(self
126 .client
127 .send_resp::<GetLogsEndpoint>(&LogRequest { serial, count })
128 .await?)
129 }
130
131 pub async fn get_device_logs_range(
143 &self,
144 serial: u64,
145 count: u32,
146 dir: Direction,
147 anchor: Anchor,
148 ) -> Result<Option<Vec<Log>>, ClientError> {
149 Ok(self
150 .client
151 .send_resp::<GetLogsRangeEndpoint>(&LogRangeRequest {
152 serial,
153 count,
154 anchor,
155 direction: dir,
156 })
157 .await?)
158 }
159
160 pub async fn get_device_topics_out_by_path_raw(
169 &self,
170 serial: u64,
171 path: &str,
172 count: u32,
173 ) -> Result<Option<Vec<TopicMsg>>, ClientError> {
174 let schemas = self.get_device_schemas(serial).await?;
175 let Some(schemas) = schemas else {
176 return Ok(None);
177 };
178
179 let res = schemas
181 .topics_out
182 .iter()
183 .find(|t| t.path.as_str() == path)
184 .map(|t| t.key);
185 let Some(key) = res else { return Ok(None) };
186
187 Ok(self
188 .client
189 .send_resp::<GetTopicsEndpoint>(&TopicRequest {
190 serial,
191 count,
192 path: path.to_string(),
193 key,
194 })
195 .await?)
196 }
197
198 pub async fn get_device_topics_out_by_path_json(
206 &self,
207 serial: u64,
208 path: &str,
209 count: u32,
210 ) -> Result<Option<Vec<(Uuidv7, Value)>>, ClientError> {
211 let schemas = self.get_device_schemas(serial).await?;
212 let Some(schemas) = schemas else {
213 return Ok(None);
214 };
215
216 let res = schemas.topics_out.iter().find(|t| t.path.as_str() == path);
218 let Some(schema) = res else { return Ok(None) };
219
220 let raws = self
221 .client
222 .send_resp::<GetTopicsEndpoint>(&TopicRequest {
223 serial,
224 count,
225 path: path.to_string(),
226 key: schema.key,
227 })
228 .await?;
229 let Some(raws) = raws else {
230 return Ok(None);
231 };
232
233 let res = raws
234 .into_iter()
235 .map(|tm| {
236 let msg = postcard_dyn::from_slice_dyn(&schema.ty, &tm.msg)
237 .map_err(|_| ClientError::Encoding)?;
238 Result::<_, ClientError>::Ok((tm.uuidv7, msg))
239 })
240 .collect::<Result<Vec<_>, _>>()?;
241
242 Ok(Some(res))
243 }
244
245 pub async fn proxy_endpoint<E>(
256 &self,
257 serial: u64,
258 seq_no: u32,
259 body: &E::Request,
260 ) -> Result<E::Response, ClientError>
261 where
262 E: Endpoint,
263 E::Request: Serialize,
264 E::Response: DeserializeOwned,
265 {
266 let Some(schemas) = self.get_device_schemas(serial).await? else {
267 return Err(ClientError::Server("endpoint not found".into()));
268 };
269
270 let res = schemas.endpoints.iter().find(|e| {
274 e.path.as_str() == E::PATH && e.req_key == E::REQ_KEY && e.resp_key == E::RESP_KEY
275 });
276 let Some(schema) = res else {
277 return Err(ClientError::Server("endpoint not found".into()));
278 };
279 let Ok(body) = postcard::to_stdvec(body) else {
280 return Err(ClientError::Encoding);
281 };
282
283 let req = ProxyRequest {
284 serial,
285 path: schema.path.clone(),
286 req_key: schema.req_key,
287 resp_key: schema.resp_key,
288 seq_no,
289 req_body: body,
290 };
291
292 let resp = self.client.send_resp::<ProxyEndpoint>(&req).await;
293
294 let resp = resp?;
296
297 let resp = match resp {
299 ProxyResponse::Ok { body, .. } => body,
300 ProxyResponse::WireErr { body, .. } => {
301 return Err(ClientError::Remote(format!("WireErr: {body:?}")))
302 }
303 ProxyResponse::OtherErr(e) => {
304 return Err(ClientError::Remote(format!("Other Server Err: '{e}'")))
305 }
306 };
307
308 let resp = postcard::from_bytes::<E::Response>(&resp);
309
310 match resp {
311 Ok(v) => Ok(v),
312 Err(_e) => Err(ClientError::Encoding),
313 }
314 }
315
316 pub async fn proxy_endpoint_json(
329 &self,
330 serial: u64,
331 path: &str,
332 seq_no: u32,
333 body: Value,
334 ) -> Result<Value, ClientError> {
335 let Ok(Some(schemas)) = self.get_device_schemas(serial).await else {
336 return Err(ClientError::Server("endpoint not found".into()));
337 };
338
339 let res = schemas.endpoints.iter().find(|e| e.path.as_str() == path);
341 let Some(schema) = res else {
342 return Err(ClientError::Server("endpoint not found".into()));
343 };
344
345 let Ok(body) = postcard_dyn::to_stdvec_dyn(&schema.req_ty, &body) else {
346 return Err(ClientError::Dynamic(
347 "provided JSON does not match the expected schema for this endpoint".into(),
348 ));
349 };
350 let req = ProxyRequest {
351 serial,
352 path: schema.path.clone(),
353 req_key: schema.req_key,
354 resp_key: schema.resp_key,
355 seq_no,
356 req_body: body,
357 };
358
359 let resp = self.client.send_resp::<ProxyEndpoint>(&req).await;
360
361 let resp = resp?;
363
364 let resp = match resp {
366 ProxyResponse::Ok { body, .. } => body,
367 ProxyResponse::WireErr { body, .. } => {
368 return Err(ClientError::Remote(format!("WireErr: {body:?}")))
369 }
370 ProxyResponse::OtherErr(e) => {
371 return Err(ClientError::Remote(format!("Other Server Err: '{e}'")))
372 }
373 };
374
375 let resp = postcard_dyn::from_slice_dyn(&schema.resp_ty, &resp);
376
377 match resp {
378 Ok(v) => Ok(v),
379 Err(e) => Err(ClientError::Dynamic(format!("Decode error: '{e:?}'"))),
380 }
381 }
382
383 pub async fn publish_topic_json(
391 &self,
392 serial: u64,
393 path: &str,
394 seq_no: u32,
395 body: Value,
396 ) -> Result<(), ClientError> {
397 let Ok(Some(schemas)) = self.get_device_schemas(serial).await else {
398 return Err(ClientError::Server("topic not found".into()));
399 };
400
401 let res = schemas.topics_in.iter().find(|e| e.path.as_str() == path);
403 let Some(schema) = res else {
404 return Err(ClientError::Server("topic not found".into()));
405 };
406
407 let Ok(body) = postcard_dyn::to_stdvec_dyn(&schema.ty, &body) else {
408 return Err(ClientError::Dynamic(
409 "provided JSON does not match the schema for this topic".into(),
410 ));
411 };
412 let req = PublishRequest {
413 serial,
414 path: schema.path.clone(),
415 topic_key: schema.key,
416 seq_no,
417 topic_body: body,
418 };
419
420 let resp = self.client.send_resp::<PublishEndpoint>(&req).await;
421
422 let resp = resp?;
423
424 match resp {
425 PublishResponse::Sent => Ok(()),
426 PublishResponse::OtherErr(e) => Err(ClientError::Server(e)),
427 }
428 }
429
430 pub async fn publish_topic<T>(
435 &self,
436 serial: u64,
437 seq_no: u32,
438 body: &T::Message,
439 ) -> Result<(), ClientError>
440 where
441 T: Topic,
442 T::Message: Serialize,
443 {
444 let Ok(Some(schemas)) = self.get_device_schemas(serial).await else {
445 return Err(ClientError::Server("topic not found".into()));
446 };
447
448 let res = schemas
452 .topics_in
453 .iter()
454 .find(|t| t.path.as_str() == T::PATH && t.key == T::TOPIC_KEY);
455 let Some(schema) = res else {
456 return Err(ClientError::Server("topic not found".into()));
457 };
458
459 let Ok(body) = postcard::to_stdvec(body) else {
460 return Err(ClientError::Encoding);
461 };
462 let req = PublishRequest {
463 serial,
464 path: schema.path.clone(),
465 topic_key: schema.key,
466 seq_no,
467 topic_body: body,
468 };
469
470 let resp = self.client.send_resp::<PublishEndpoint>(&req).await;
471
472 let resp = resp?;
473
474 match resp {
475 PublishResponse::Sent => Ok(()),
476 PublishResponse::OtherErr(e) => Err(ClientError::Server(e)),
477 }
478 }
479
480 pub async fn stream_topic_json(
482 &self,
483 serial: u64,
484 path: &str,
485 ) -> Result<JsonStreamListener, ClientError> {
486 let Ok(Some(schemas)) = self.get_device_schemas(serial).await else {
487 return Err(ClientError::Server("topic not found".into()));
488 };
489
490 let res = schemas
492 .topics_out
493 .iter()
494 .find(|e| e.path.as_str() == path)
495 .cloned();
496 let Some(schema) = res else {
497 return Err(ClientError::Server("topic not found".into()));
498 };
499
500 let sub = self
501 .client
502 .subscribe_multi::<SubscribeTopic>(64)
503 .await
504 .map_err(|_| ClientError::ConnectionClosed)?;
505
506 let res = self
507 .client
508 .send_resp::<StartStreamEndpoint>(&TopicStreamRequest {
509 serial,
510 path: path.to_string(),
511 key: schema.key,
512 })
513 .await;
514
515 let stream_id = match res? {
516 TopicStreamResult::Started(id) => id,
517 TopicStreamResult::DeviceDisconnected => {
518 return Err(ClientError::Server("Device Disconnected".into()))
519 }
520 TopicStreamResult::NoDeviceKnown => {
521 return Err(ClientError::Server("No Device Known".into()))
522 }
523 TopicStreamResult::NoSuchTopic => {
524 return Err(ClientError::Server("No Such Topic".into()))
525 }
526 };
527
528 Ok(JsonStreamListener {
529 schema,
530 sub,
531 stream_id,
532 })
533 }
534
535 pub async fn stream_topic<T>(&self, serial: u64) -> Result<StreamListener<T>, ClientError>
537 where
538 T: Topic,
539 T::Message: DeserializeOwned,
540 {
541 let Ok(Some(schemas)) = self.get_device_schemas(serial).await else {
542 return Err(ClientError::Server("topic not found".into()));
543 };
544
545 let res = schemas
547 .topics_out
548 .iter()
549 .find(|e| e.path.as_str() == T::PATH && e.key == T::TOPIC_KEY)
550 .cloned();
551 let Some(schema) = res else {
552 return Err(ClientError::Server("topic not found".into()));
553 };
554
555 let sub = self
556 .client
557 .subscribe_multi::<SubscribeTopic>(64)
558 .await
559 .map_err(|_| ClientError::ConnectionClosed)?;
560
561 let res = self
562 .client
563 .send_resp::<StartStreamEndpoint>(&TopicStreamRequest {
564 serial,
565 path: T::PATH.to_string(),
566 key: schema.key,
567 })
568 .await;
569
570 let stream_id = match res? {
571 TopicStreamResult::Started(id) => id,
572 TopicStreamResult::DeviceDisconnected => {
573 return Err(ClientError::Server("Device Disconnected".into()))
574 }
575 TopicStreamResult::NoDeviceKnown => {
576 return Err(ClientError::Server("No Device Known".into()))
577 }
578 TopicStreamResult::NoSuchTopic => {
579 return Err(ClientError::Server("No Such Topic".into()))
580 }
581 };
582
583 Ok(StreamListener {
584 sub,
585 stream_id,
586 _pd: PhantomData,
587 })
588 }
589}
590
591pub struct JsonStreamListener {
592 stream_id: Uuidv7,
593 schema: TopicReport,
594 sub: MultiSubscription<TopicStreamMsg>,
595}
596
597impl JsonStreamListener {
598 pub async fn recv(&mut self) -> Option<Value> {
602 loop {
603 let msg = match self.sub.recv().await {
604 Ok(m) => m,
605 Err(MultiSubRxError::IoClosed) => return None,
606 Err(MultiSubRxError::Lagged(n)) => {
607 tracing::warn!(stream_id = ?self.stream_id, lags = n, "Stream lagged");
608 continue;
609 }
610 };
611
612 let TopicStreamMsg { stream_id, msg } = msg;
613 if stream_id != self.stream_id {
614 continue;
615 }
616
617 let Ok(msg) = postcard_dyn::from_slice_dyn(&self.schema.ty, &msg) else {
618 continue;
619 };
620 return Some(msg);
621 }
622 }
623}
624
625pub struct StreamListener<T>
626where
627 T: Topic,
628 T::Message: DeserializeOwned,
629{
630 stream_id: Uuidv7,
631 sub: MultiSubscription<TopicStreamMsg>,
632 _pd: PhantomData<fn() -> T>,
633}
634
635impl<T> StreamListener<T>
636where
637 T: Topic,
638 T::Message: DeserializeOwned,
639{
640 pub async fn recv(&mut self) -> Option<T::Message> {
644 loop {
645 let msg = match self.sub.recv().await {
646 Ok(m) => m,
647 Err(MultiSubRxError::IoClosed) => return None,
648 Err(MultiSubRxError::Lagged(n)) => {
649 tracing::warn!(stream_id = ?self.stream_id, lags = n, "Stream lagged");
650 continue;
651 }
652 };
653
654 let TopicStreamMsg { stream_id, msg } = msg;
655 if stream_id != self.stream_id {
656 continue;
657 }
658
659 let Ok(msg) = postcard::from_bytes(&msg) else {
660 continue;
661 };
662 return Some(msg);
663 }
664 }
665}
666
667pub async fn connect_insecure(port: u16) -> Result<PoststationClient, ConnectError> {
672 let socket = TcpStream::connect(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port))
674 .await
675 .map_err(|_| ConnectError::Connection)?;
676 let addr = socket.peer_addr().map_err(|_| ConnectError::Connection)?;
677 socket
678 .set_nodelay(true)
679 .map_err(|_| ConnectError::Connection)?;
680 let (rx, tx) = split(socket);
681
682 let client = HostClient::<WireError>::new_with_wire(
683 TcpCommsTx { tx },
684 TcpCommsRx {
685 rx,
686 addr,
687 buf: vec![],
688 },
689 TcpSpawn,
690 postcard_rpc::header::VarSeqKind::Seq4,
691 ERROR_PATH,
692 64,
693 );
694
695 let res = client
696 .send_resp::<PingEndpoint>(&42)
697 .await
698 .map_err(|_| ConnectError::Protocol)?;
699
700 if res != 42 {
701 return Err(ConnectError::Protocol);
702 }
703
704 Ok(PoststationClient { client })
705}
706
/// Errors that can occur while establishing a connection to the server.
#[non_exhaustive]
#[derive(Debug)]
pub enum ConnectError {
    /// The CA certificate could not be located, read, or accepted.
    CaCertificate,
    /// The network connection (TCP or TLS) could not be established.
    Connection,
    /// The server did not answer the initial ping as expected.
    Protocol,
}

impl Display for ConnectError {
    /// Writes a human-readable description; use `{:?}` for the variant name.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            ConnectError::CaCertificate => "failed to locate or load the CA certificate",
            ConnectError::Connection => "failed to establish a connection to the server",
            ConnectError::Protocol => "server did not complete the initial ping exchange",
        };
        f.write_str(text)
    }
}

impl Error for ConnectError {}
725
726pub async fn connect<T: tokio::net::ToSocketAddrs>(
733 addr: T,
734) -> Result<PoststationClient, ConnectError> {
735 let Some(dirs) = ProjectDirs::from("com.onevariable", "onevariable", "poststation") else {
738 return Err(ConnectError::CaCertificate);
739 };
740 let data_dir = dirs.data_dir();
741 let mut pem_path = PathBuf::from(data_dir);
742 pem_path.push("ca-cert.pem");
743 connect_with_ca_pem(addr, &pem_path).await
744}
745
746pub async fn connect_with_ca_pem<T: tokio::net::ToSocketAddrs>(
748 addr: T,
749 ca_path: &Path,
750) -> Result<PoststationClient, ConnectError> {
751 let mut root_cert_store = RootCertStore::empty();
752 root_cert_store
753 .add(CertificateDer::from_pem_file(ca_path).map_err(|_| ConnectError::CaCertificate)?)
754 .map_err(|_| ConnectError::CaCertificate)?;
755 let config = rustls::ClientConfig::builder()
756 .with_root_certificates(root_cert_store)
757 .with_no_client_auth();
758 let connector = TlsConnector::from(Arc::new(config));
759 let stream = TcpStream::connect(addr)
760 .await
761 .map_err(|_| ConnectError::Connection)?;
762 stream
763 .set_nodelay(false)
764 .map_err(|_| ConnectError::Connection)?;
765 let addr = stream.peer_addr().map_err(|_| ConnectError::Connection)?;
766 let stream = connector
767 .connect(ServerName::IpAddress(addr.ip().into()), stream)
768 .await
769 .map_err(|_| ConnectError::Connection)?;
770 let (rx, tx) = split(stream);
771
772 let client = HostClient::<WireError>::new_with_wire(
773 TcpCommsTx { tx },
774 TcpCommsRx {
775 rx,
776 addr,
777 buf: vec![],
778 },
779 TcpSpawn,
780 postcard_rpc::header::VarSeqKind::Seq4,
781 ERROR_PATH,
782 64,
783 );
784
785 let res = client
786 .send_resp::<PingEndpoint>(&42)
787 .await
788 .map_err(|_| ConnectError::Protocol)?;
789
790 if res != 42 {
791 return Err(ConnectError::Protocol);
792 }
793
794 Ok(PoststationClient { client })
795}
796
/// Errors produced by the receiving half of the framed TCP transport.
#[derive(Debug)]
pub enum TcpCommsRxError {
    /// More than 1 MiB was buffered without yielding a frame.
    RxOverflow,
    /// Reading from the connection failed or the peer closed it.
    ConnError,
}

impl Display for TcpCommsRxError {
    /// Writes a human-readable description of the receive failure.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            TcpCommsRxError::RxOverflow => "receive buffer exceeded 1 MiB without a complete frame",
            TcpCommsRxError::ConnError => "connection closed or read failed",
        };
        f.write_str(text)
    }
}

impl Error for TcpCommsRxError {}
815
816struct TcpCommsRx<T: AsyncRead + Send + 'static> {
817 addr: SocketAddr,
818 buf: Vec<u8>,
819 rx: ReadHalf<T>,
820}
821
822impl<T: AsyncRead + Send + 'static> TcpCommsRx<T> {
823 async fn receive_inner(&mut self) -> Result<Vec<u8>, TcpCommsRxError> {
824 let mut rx_buf = [0u8; 1024];
825 'frame: loop {
826 if self.buf.len() > (1024 * 1024) {
827 tracing::warn!(?self.addr, "Refusing to collect >1MiB, terminating");
828 self.buf.clear();
829 return Err(TcpCommsRxError::RxOverflow);
830 }
831
832 if let Some(pos) = self.buf.iter().position(|b| *b == 0) {
834 let mut split = self.buf.split_off(pos + 1);
836 core::mem::swap(&mut self.buf, &mut split);
837
838 let res = cobs::decode_vec(&split);
840 let Ok(msg) = res else {
841 tracing::warn!(?self.addr, discarded = split.len(), "Discarding bad message (cobs)");
842 continue 'frame;
843 };
844
845 return Ok(msg);
846 }
847
848 let Ok(used) = self.rx.read(&mut rx_buf).await else {
850 tracing::warn!(?self.addr, "Closing");
851 return Err(TcpCommsRxError::ConnError);
852 };
853 if used == 0 {
854 tracing::warn!(?self.addr, "Closing");
855 return Err(TcpCommsRxError::ConnError);
856 }
857 self.buf.extend_from_slice(&rx_buf[..used]);
858 }
859 }
860}
861
862impl<T: AsyncRead + Send + 'static> WireRx for TcpCommsRx<T> {
863 type Error = TcpCommsRxError;
864
865 fn receive(&mut self) -> impl Future<Output = Result<Vec<u8>, Self::Error>> + Send {
866 self.receive_inner()
867 }
868}
869
/// Errors produced by the sending half of the framed TCP transport.
#[derive(Debug)]
pub enum TcpCommsTxError {
    /// Writing the frame to the connection failed.
    CommsError,
}

impl Display for TcpCommsTxError {
    /// Writes a human-readable description of the send failure.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            TcpCommsTxError::CommsError => f.write_str("failed to write to the connection"),
        }
    }
}

impl Error for TcpCommsTxError {}
889
890struct TcpCommsTx<T: AsyncWrite + Send + 'static> {
891 tx: WriteHalf<T>,
892}
893
894impl<T: AsyncWrite + Send + 'static> TcpCommsTx<T> {
895 async fn send_inner(&mut self, data: Vec<u8>) -> Result<(), TcpCommsTxError> {
896 let mut data = cobs::encode_vec(&data);
897 data.push(0);
898 self.tx
899 .write_all(&data)
900 .await
901 .map_err(|_| TcpCommsTxError::CommsError)
902 }
903}
904
905impl<T: AsyncWrite + Send + 'static> WireTx for TcpCommsTx<T> {
906 type Error = TcpCommsTxError;
907
908 fn send(&mut self, data: Vec<u8>) -> impl Future<Output = Result<(), Self::Error>> + Send {
909 self.send_inner(data)
910 }
911}
912
913struct TcpSpawn;
916
917impl WireSpawn for TcpSpawn {
918 fn spawn(&mut self, fut: impl Future<Output = ()> + Send + 'static) {
919 tokio::spawn(fut);
920 }
921}