poststation_sdk/
lib.rs

1use core::fmt::Debug;
2use std::{
3    error::Error,
4    fmt::Display,
5    future::Future,
6    marker::PhantomData,
7    net::{IpAddr, Ipv4Addr, SocketAddr},
8    path::{Path, PathBuf},
9    sync::Arc,
10};
11
12use directories::ProjectDirs;
13use postcard_rpc::{
14    host_client::{
15        HostClient, HostErr, MultiSubRxError, MultiSubscription, SchemaReport, TopicReport, WireRx,
16        WireSpawn, WireTx,
17    },
18    standard_icd::{PingEndpoint, WireError, ERROR_PATH},
19    Endpoint, Topic,
20};
21use poststation_api_icd::postsock::{
22    Anchor, DeviceData, Direction, GetDevicesEndpoint, GetLogsEndpoint, GetLogsRangeEndpoint,
23    GetSchemasEndpoint, GetTopicsEndpoint, Log, LogRangeRequest, LogRequest, ProxyEndpoint,
24    ProxyRequest, ProxyResponse, PublishEndpoint, PublishRequest, PublishResponse,
25    StartStreamEndpoint, SubscribeTopic, TopicMsg, TopicRequest, TopicStreamMsg,
26    TopicStreamRequest, TopicStreamResult, Uuidv7,
27};
28use rustls::{
29    pki_types::{pem::PemObject, CertificateDer, ServerName},
30    RootCertStore,
31};
32use serde::{de::DeserializeOwned, Serialize};
33use tokio::{
34    io::{split, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf},
35    net::TcpStream,
36};
37
38pub use postcard_schema as schema;
39pub use poststation_api_icd as icd;
40pub use postcard_dyn::Value;
41use tokio_rustls::TlsConnector;
42
43// ---
44
/// Errors returned by [`PoststationClient`] operations
#[non_exhaustive]
#[derive(Debug)]
pub enum ClientError {
    /// The connection to the poststation server is closed
    ConnectionClosed,
    /// The client/server exchange violated the expected protocol
    Protocol,
    /// A message could not be encoded or decoded
    Encoding,
    /// The poststation server reported a problem (message attached)
    Server(String),
    /// Something went wrong between the poststation server and the remote device
    Remote(String),
    /// Dynamic (schema-driven) serialization or deserialization failed
    Dynamic(String),
}
61
62impl From<HostErr<WireError>> for ClientError {
63    fn from(value: HostErr<WireError>) -> Self {
64        match value {
65            HostErr::Wire(_e) => ClientError::Protocol,
66            HostErr::BadResponse => ClientError::Protocol,
67            HostErr::Postcard(_error) => ClientError::Encoding,
68            HostErr::Closed => ClientError::ConnectionClosed,
69        }
70    }
71}
72
73impl Display for ClientError {
74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75        <Self as Debug>::fmt(self, f)
76    }
77}
78
79impl Error for ClientError {}
80
81#[derive(Clone)]
82pub struct PoststationClient {
83    client: HostClient<WireError>,
84}
85
86impl PoststationClient {
87    /// Obtain the raw postcard-rpc client used to talk to the poststation server
88    ///
89    /// Not recommended for direct usage.
90    #[doc(hidden)]
91    pub fn raw_client(&self) -> &HostClient<WireError> {
92        &self.client
93    }
94
95    /// Get devices known by the Poststation server
96    ///
97    /// Returns an Error if connection to the Poststation server has failed.
98    pub async fn get_devices(&self) -> Result<Vec<DeviceData>, ClientError> {
99        Ok(self.client.send_resp::<GetDevicesEndpoint>(&()).await?)
100    }
101
102    /// Get a schema report of the given device
103    ///
104    /// * Returns `Ok(Some(report))` if the device exists and the operation succeeded
105    /// * Returns `Ok(None)` if the operation succeeded, but the given device was not known to Poststation
106    /// * Returns an error if the connection with the server failed
107    pub async fn get_device_schemas(
108        &self,
109        serial: u64,
110    ) -> Result<Option<SchemaReport>, ClientError> {
111        let res = self.client.send_resp::<GetSchemasEndpoint>(&serial).await?;
112        Ok(res)
113    }
114
115    /// Obtain the most recent `count` logs from the given device
116    ///
117    /// * Returns `Ok(Some(logs))` if the device exists and the operation succeeded
118    /// * Returns `Ok(None)` if the operation succeeded, but the given device was not known to Poststation
119    /// * Returns an error if the connection with the server failed
120    pub async fn get_device_logs(
121        &self,
122        serial: u64,
123        count: u32,
124    ) -> Result<Option<Vec<Log>>, ClientError> {
125        Ok(self
126            .client
127            .send_resp::<GetLogsEndpoint>(&LogRequest { serial, count })
128            .await?)
129    }
130
131    /// Obtain the up to `count` logs from the given device
132    ///
133    /// While [`get_device_logs()`][Self::get_device_logs] gets the NEWEST logs, this method can be used
134    /// to obtain logs before or after a given "anchor" point. This is often handy for pagination of results,
135    /// allowing for getting N logs before or after a given point.
136    ///
137    /// [Anchor] can be either a unix millisecond timestamp, or the UUID of a specific log event.
138    ///
139    /// * Returns `Ok(Some(logs))` if the device exists and the operation succeeded
140    /// * Returns `Ok(None)` if the operation succeeded, but the given device was not known to Poststation
141    /// * Returns an error if the connection with the server failed
142    pub async fn get_device_logs_range(
143        &self,
144        serial: u64,
145        count: u32,
146        dir: Direction,
147        anchor: Anchor,
148    ) -> Result<Option<Vec<Log>>, ClientError> {
149        Ok(self
150            .client
151            .send_resp::<GetLogsRangeEndpoint>(&LogRangeRequest {
152                serial,
153                count,
154                anchor,
155                direction: dir,
156            })
157            .await?)
158    }
159
160    /// Get the last `count` topic-out messages for a given `path`
161    ///
162    /// This function returns the raw, serialized form of messages. You will need to deserialize
163    /// the messages using `postcard` and the known type of the message.
164    ///
165    /// * Returns `Ok(Some(msgs))` if the device exists and the operation succeeded
166    /// * Returns `Ok(None)` if the operation succeeded, but the given device was not known to Poststation
167    /// * Returns an error if the connection with the server failed
168    pub async fn get_device_topics_out_by_path_raw(
169        &self,
170        serial: u64,
171        path: &str,
172        count: u32,
173    ) -> Result<Option<Vec<TopicMsg>>, ClientError> {
174        let schemas = self.get_device_schemas(serial).await?;
175        let Some(schemas) = schemas else {
176            return Ok(None);
177        };
178
179        // find key
180        let res = schemas
181            .topics_out
182            .iter()
183            .find(|t| t.path.as_str() == path)
184            .map(|t| t.key);
185        let Some(key) = res else { return Ok(None) };
186
187        Ok(self
188            .client
189            .send_resp::<GetTopicsEndpoint>(&TopicRequest {
190                serial,
191                count,
192                path: path.to_string(),
193                key,
194            })
195            .await?)
196    }
197
198    /// Get the last `count` topic-out messages for a given `path`
199    ///
200    /// This function returns the messages in JSON form.
201    ///
202    /// * Returns `Ok(Some(msgs))` if the device exists and the operation succeeded
203    /// * Returns `Ok(None)` if the operation succeeded, but the given device was not known to Poststation
204    /// * Returns an error if the connection with the server failed
205    pub async fn get_device_topics_out_by_path_json(
206        &self,
207        serial: u64,
208        path: &str,
209        count: u32,
210    ) -> Result<Option<Vec<(Uuidv7, Value)>>, ClientError> {
211        let schemas = self.get_device_schemas(serial).await?;
212        let Some(schemas) = schemas else {
213            return Ok(None);
214        };
215
216        // find key
217        let res = schemas.topics_out.iter().find(|t| t.path.as_str() == path);
218        let Some(schema) = res else { return Ok(None) };
219
220        let raws = self
221            .client
222            .send_resp::<GetTopicsEndpoint>(&TopicRequest {
223                serial,
224                count,
225                path: path.to_string(),
226                key: schema.key,
227            })
228            .await?;
229        let Some(raws) = raws else {
230            return Ok(None);
231        };
232
233        let res = raws
234            .into_iter()
235            .map(|tm| {
236                let msg = postcard_dyn::from_slice_dyn(&schema.ty, &tm.msg)
237                    .map_err(|_| ClientError::Encoding)?;
238                Result::<_, ClientError>::Ok((tm.uuidv7, msg))
239            })
240            .collect::<Result<Vec<_>, _>>()?;
241
242        Ok(Some(res))
243    }
244
245    /// Proxy to an endpoint, using shared types
246    ///
247    /// This method should be used when the types used by the remote server are known.
248    ///
249    /// This method is the equivalent of `send_resp` in postcard-rpc. The message will be
250    /// sent to poststation, and proxied to the remote device. The response will then be
251    /// forwarded back from poststation to the client.
252    ///
253    /// Returns an error if the connection between the client and poststation failed, or
254    /// if the connection between poststation and the remote device failed.
255    pub async fn proxy_endpoint<E>(
256        &self,
257        serial: u64,
258        seq_no: u32,
259        body: &E::Request,
260    ) -> Result<E::Response, ClientError>
261    where
262        E: Endpoint,
263        E::Request: Serialize,
264        E::Response: DeserializeOwned,
265    {
266        let Some(schemas) = self.get_device_schemas(serial).await? else {
267            return Err(ClientError::Server("endpoint not found".into()));
268        };
269
270        // find key
271        // TODO: Don't compare the types because the names don't match even though we've
272        // type-punned
273        let res = schemas.endpoints.iter().find(|e| {
274            e.path.as_str() == E::PATH && e.req_key == E::REQ_KEY && e.resp_key == E::RESP_KEY
275        });
276        let Some(schema) = res else {
277            return Err(ClientError::Server("endpoint not found".into()));
278        };
279        let Ok(body) = postcard::to_stdvec(body) else {
280            return Err(ClientError::Encoding);
281        };
282
283        let req = ProxyRequest {
284            serial,
285            path: schema.path.clone(),
286            req_key: schema.req_key,
287            resp_key: schema.resp_key,
288            seq_no,
289            req_body: body,
290        };
291
292        let resp = self.client.send_resp::<ProxyEndpoint>(&req).await;
293
294        // client to poststation comms
295        let resp = resp?;
296
297        // poststation to remote comms
298        let resp = match resp {
299            ProxyResponse::Ok { body, .. } => body,
300            ProxyResponse::WireErr { body, .. } => {
301                return Err(ClientError::Remote(format!("WireErr: {body:?}")))
302            }
303            ProxyResponse::OtherErr(e) => {
304                return Err(ClientError::Remote(format!("Other Server Err: '{e}'")))
305            }
306        };
307
308        let resp = postcard::from_bytes::<E::Response>(&resp);
309
310        match resp {
311            Ok(v) => Ok(v),
312            Err(_e) => Err(ClientError::Encoding),
313        }
314    }
315
316    /// Proxy to an endpoint, WITHOUT using shared types
317    ///
318    /// This method should be used when the types used by the remote server are NOT known.
319    ///
320    /// This method is the equivalent of `send_resp` in postcard-rpc. The message will be
321    /// sent to poststation, and proxied to the remote device. The response will then be
322    /// forwarded back from poststation to the client.
323    ///
324    /// Returns an error if the connection between the client and poststation failed, or
325    /// if the connection between poststation and the remote device failed. This method
326    /// also returns an error if the provided `Value` does not match the schema reported
327    /// by the remote device.
328    pub async fn proxy_endpoint_json(
329        &self,
330        serial: u64,
331        path: &str,
332        seq_no: u32,
333        body: Value,
334    ) -> Result<Value, ClientError> {
335        let Ok(Some(schemas)) = self.get_device_schemas(serial).await else {
336            return Err(ClientError::Server("endpoint not found".into()));
337        };
338
339        // find key
340        let res = schemas.endpoints.iter().find(|e| e.path.as_str() == path);
341        let Some(schema) = res else {
342            return Err(ClientError::Server("endpoint not found".into()));
343        };
344
345        let Ok(body) = postcard_dyn::to_stdvec_dyn(&schema.req_ty, &body) else {
346            return Err(ClientError::Dynamic(
347                "provided JSON does not match the expected schema for this endpoint".into(),
348            ));
349        };
350        let req = ProxyRequest {
351            serial,
352            path: schema.path.clone(),
353            req_key: schema.req_key,
354            resp_key: schema.resp_key,
355            seq_no,
356            req_body: body,
357        };
358
359        let resp = self.client.send_resp::<ProxyEndpoint>(&req).await;
360
361        // client to poststation comms
362        let resp = resp?;
363
364        // poststation to remote comms
365        let resp = match resp {
366            ProxyResponse::Ok { body, .. } => body,
367            ProxyResponse::WireErr { body, .. } => {
368                return Err(ClientError::Remote(format!("WireErr: {body:?}")))
369            }
370            ProxyResponse::OtherErr(e) => {
371                return Err(ClientError::Remote(format!("Other Server Err: '{e}'")))
372            }
373        };
374
375        let resp = postcard_dyn::from_slice_dyn(&schema.resp_ty, &resp);
376
377        match resp {
378            Ok(v) => Ok(v),
379            Err(e) => Err(ClientError::Dynamic(format!("Decode error: '{e:?}'"))),
380        }
381    }
382
383    /// Publish a `topic-in` message to a given device, WITHOUT shared types
384    ///
385    /// This is the equivalent to `publish` in `postcard-rpc`.
386    ///
387    /// Returns an error if the connection between the client and poststation, or the connection
388    /// between poststation and the remote device has failed. Also returns an error if the provided
389    /// `Value` does not match the schema reported by the remote device.
390    pub async fn publish_topic_json(
391        &self,
392        serial: u64,
393        path: &str,
394        seq_no: u32,
395        body: Value,
396    ) -> Result<(), ClientError> {
397        let Ok(Some(schemas)) = self.get_device_schemas(serial).await else {
398            return Err(ClientError::Server("topic not found".into()));
399        };
400
401        // find key
402        let res = schemas.topics_in.iter().find(|e| e.path.as_str() == path);
403        let Some(schema) = res else {
404            return Err(ClientError::Server("topic not found".into()));
405        };
406
407        let Ok(body) = postcard_dyn::to_stdvec_dyn(&schema.ty, &body) else {
408            return Err(ClientError::Dynamic(
409                "provided JSON does not match the schema for this topic".into(),
410            ));
411        };
412        let req = PublishRequest {
413            serial,
414            path: schema.path.clone(),
415            topic_key: schema.key,
416            seq_no,
417            topic_body: body,
418        };
419
420        let resp = self.client.send_resp::<PublishEndpoint>(&req).await;
421
422        let resp = resp?;
423
424        match resp {
425            PublishResponse::Sent => Ok(()),
426            PublishResponse::OtherErr(e) => Err(ClientError::Server(e)),
427        }
428    }
429
430    /// Publish a `topic-in` message to a given device, WITH shared types
431    ///
432    /// Returns an error if the connection between the client and poststation, or the connection
433    /// between poststation and the remote device has failed.
434    pub async fn publish_topic<T>(
435        &self,
436        serial: u64,
437        seq_no: u32,
438        body: &T::Message,
439    ) -> Result<(), ClientError>
440    where
441        T: Topic,
442        T::Message: Serialize,
443    {
444        let Ok(Some(schemas)) = self.get_device_schemas(serial).await else {
445            return Err(ClientError::Server("topic not found".into()));
446        };
447
448        // find key
449        // TODO: Don't compare the types because the names don't match even though we've
450        // type-punned
451        let res = schemas
452            .topics_in
453            .iter()
454            .find(|t| t.path.as_str() == T::PATH && t.key == T::TOPIC_KEY);
455        let Some(schema) = res else {
456            return Err(ClientError::Server("topic not found".into()));
457        };
458
459        let Ok(body) = postcard::to_stdvec(body) else {
460            return Err(ClientError::Encoding);
461        };
462        let req = PublishRequest {
463            serial,
464            path: schema.path.clone(),
465            topic_key: schema.key,
466            seq_no,
467            topic_body: body,
468        };
469
470        let resp = self.client.send_resp::<PublishEndpoint>(&req).await;
471
472        let resp = resp?;
473
474        match resp {
475            PublishResponse::Sent => Ok(()),
476            PublishResponse::OtherErr(e) => Err(ClientError::Server(e)),
477        }
478    }
479
480    /// Listen to a given topic path, receiving a subscription that yields live messages
481    pub async fn stream_topic_json(
482        &self,
483        serial: u64,
484        path: &str,
485    ) -> Result<JsonStreamListener, ClientError> {
486        let Ok(Some(schemas)) = self.get_device_schemas(serial).await else {
487            return Err(ClientError::Server("topic not found".into()));
488        };
489
490        // find key
491        let res = schemas
492            .topics_out
493            .iter()
494            .find(|e| e.path.as_str() == path)
495            .cloned();
496        let Some(schema) = res else {
497            return Err(ClientError::Server("topic not found".into()));
498        };
499
500        let sub = self
501            .client
502            .subscribe_multi::<SubscribeTopic>(64)
503            .await
504            .map_err(|_| ClientError::ConnectionClosed)?;
505
506        let res = self
507            .client
508            .send_resp::<StartStreamEndpoint>(&TopicStreamRequest {
509                serial,
510                path: path.to_string(),
511                key: schema.key,
512            })
513            .await;
514
515        let stream_id = match res? {
516            TopicStreamResult::Started(id) => id,
517            TopicStreamResult::DeviceDisconnected => {
518                return Err(ClientError::Server("Device Disconnected".into()))
519            }
520            TopicStreamResult::NoDeviceKnown => {
521                return Err(ClientError::Server("No Device Known".into()))
522            }
523            TopicStreamResult::NoSuchTopic => {
524                return Err(ClientError::Server("No Such Topic".into()))
525            }
526        };
527
528        Ok(JsonStreamListener {
529            schema,
530            sub,
531            stream_id,
532        })
533    }
534
535    /// Listen to a given topic path, receiving a subscription that yields live messages
536    pub async fn stream_topic<T>(&self, serial: u64) -> Result<StreamListener<T>, ClientError>
537    where
538        T: Topic,
539        T::Message: DeserializeOwned,
540    {
541        let Ok(Some(schemas)) = self.get_device_schemas(serial).await else {
542            return Err(ClientError::Server("topic not found".into()));
543        };
544
545        // find key
546        let res = schemas
547            .topics_out
548            .iter()
549            .find(|e| e.path.as_str() == T::PATH && e.key == T::TOPIC_KEY)
550            .cloned();
551        let Some(schema) = res else {
552            return Err(ClientError::Server("topic not found".into()));
553        };
554
555        let sub = self
556            .client
557            .subscribe_multi::<SubscribeTopic>(64)
558            .await
559            .map_err(|_| ClientError::ConnectionClosed)?;
560
561        let res = self
562            .client
563            .send_resp::<StartStreamEndpoint>(&TopicStreamRequest {
564                serial,
565                path: T::PATH.to_string(),
566                key: schema.key,
567            })
568            .await;
569
570        let stream_id = match res? {
571            TopicStreamResult::Started(id) => id,
572            TopicStreamResult::DeviceDisconnected => {
573                return Err(ClientError::Server("Device Disconnected".into()))
574            }
575            TopicStreamResult::NoDeviceKnown => {
576                return Err(ClientError::Server("No Device Known".into()))
577            }
578            TopicStreamResult::NoSuchTopic => {
579                return Err(ClientError::Server("No Such Topic".into()))
580            }
581        };
582
583        Ok(StreamListener {
584            sub,
585            stream_id,
586            _pd: PhantomData,
587        })
588    }
589}
590
591pub struct JsonStreamListener {
592    stream_id: Uuidv7,
593    schema: TopicReport,
594    sub: MultiSubscription<TopicStreamMsg>,
595}
596
597impl JsonStreamListener {
598    /// Receive a single message from this subscription
599    ///
600    /// Returns None if the connection has been closed
601    pub async fn recv(&mut self) -> Option<Value> {
602        loop {
603            let msg = match self.sub.recv().await {
604                Ok(m) => m,
605                Err(MultiSubRxError::IoClosed) => return None,
606                Err(MultiSubRxError::Lagged(n)) => {
607                    tracing::warn!(stream_id = ?self.stream_id, lags = n, "Stream lagged");
608                    continue;
609                }
610            };
611
612            let TopicStreamMsg { stream_id, msg } = msg;
613            if stream_id != self.stream_id {
614                continue;
615            }
616
617            let Ok(msg) = postcard_dyn::from_slice_dyn(&self.schema.ty, &msg) else {
618                continue;
619            };
620            return Some(msg);
621        }
622    }
623}
624
625pub struct StreamListener<T>
626where
627    T: Topic,
628    T::Message: DeserializeOwned,
629{
630    stream_id: Uuidv7,
631    sub: MultiSubscription<TopicStreamMsg>,
632    _pd: PhantomData<fn() -> T>,
633}
634
635impl<T> StreamListener<T>
636where
637    T: Topic,
638    T::Message: DeserializeOwned,
639{
640    /// Receive a single message from this subscription
641    ///
642    /// Returns None if the connection has been closed
643    pub async fn recv(&mut self) -> Option<T::Message> {
644        loop {
645            let msg = match self.sub.recv().await {
646                Ok(m) => m,
647                Err(MultiSubRxError::IoClosed) => return None,
648                Err(MultiSubRxError::Lagged(n)) => {
649                    tracing::warn!(stream_id = ?self.stream_id, lags = n, "Stream lagged");
650                    continue;
651                }
652            };
653
654            let TopicStreamMsg { stream_id, msg } = msg;
655            if stream_id != self.stream_id {
656                continue;
657            }
658
659            let Ok(msg) = postcard::from_bytes(&msg) else {
660                continue;
661            };
662            return Some(msg);
663        }
664    }
665}
666
667/// Connect to a server configured in "insecure" mode
668///
669/// "Insecure" is not the default setting. Your poststation server must
670/// be configured to "insecure" to allow this.
671pub async fn connect_insecure(port: u16) -> Result<PoststationClient, ConnectError> {
672    // Insecure can only be located on localhost
673    let socket = TcpStream::connect(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port))
674        .await
675        .map_err(|_| ConnectError::Connection)?;
676    let addr = socket.peer_addr().map_err(|_| ConnectError::Connection)?;
677    socket
678        .set_nodelay(true)
679        .map_err(|_| ConnectError::Connection)?;
680    let (rx, tx) = split(socket);
681
682    let client = HostClient::<WireError>::new_with_wire(
683        TcpCommsTx { tx },
684        TcpCommsRx {
685            rx,
686            addr,
687            buf: vec![],
688        },
689        TcpSpawn,
690        postcard_rpc::header::VarSeqKind::Seq4,
691        ERROR_PATH,
692        64,
693    );
694
695    let res = client
696        .send_resp::<PingEndpoint>(&42)
697        .await
698        .map_err(|_| ConnectError::Protocol)?;
699
700    if res != 42 {
701        return Err(ConnectError::Protocol);
702    }
703
704    Ok(PoststationClient { client })
705}
706
/// Errors that can occur while establishing a connection to a poststation server
#[non_exhaustive]
#[derive(Debug)]
pub enum ConnectError {
    /// Failed to load CA Certificate for server
    CaCertificate,
    /// Failed to connect to poststation server
    Connection,
    /// Protocol check failed
    Protocol,
}

/// `Display` intentionally mirrors the derived `Debug` output
impl Display for ConnectError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        Debug::fmt(self, f)
    }
}

impl Error for ConnectError {}
725
726/// Connect to a server configured with Self Signed TLS certificates (default)
727///
728/// This function can only be used on the same device as the poststation server.
729///
730/// If you are connecting to a remote device, you will need to obtain the CA cert
731/// from the poststation server, and use [`connect_with_ca_pem`] to connect
732pub async fn connect<T: tokio::net::ToSocketAddrs>(
733    addr: T,
734) -> Result<PoststationClient, ConnectError> {
735    // If we are on the same machine as the Poststation server, we can load the CA cert from the
736    // working folder of poststation
737    let Some(dirs) = ProjectDirs::from("com.onevariable", "onevariable", "poststation") else {
738        return Err(ConnectError::CaCertificate);
739    };
740    let data_dir = dirs.data_dir();
741    let mut pem_path = PathBuf::from(data_dir);
742    pem_path.push("ca-cert.pem");
743    connect_with_ca_pem(addr, &pem_path).await
744}
745
746/// Connect to a server with the given TLS CA certificate
747pub async fn connect_with_ca_pem<T: tokio::net::ToSocketAddrs>(
748    addr: T,
749    ca_path: &Path,
750) -> Result<PoststationClient, ConnectError> {
751    let mut root_cert_store = RootCertStore::empty();
752    root_cert_store
753        .add(CertificateDer::from_pem_file(ca_path).map_err(|_| ConnectError::CaCertificate)?)
754        .map_err(|_| ConnectError::CaCertificate)?;
755    let config = rustls::ClientConfig::builder()
756        .with_root_certificates(root_cert_store)
757        .with_no_client_auth();
758    let connector = TlsConnector::from(Arc::new(config));
759    let stream = TcpStream::connect(addr)
760        .await
761        .map_err(|_| ConnectError::Connection)?;
762    stream
763        .set_nodelay(false)
764        .map_err(|_| ConnectError::Connection)?;
765    let addr = stream.peer_addr().map_err(|_| ConnectError::Connection)?;
766    let stream = connector
767        .connect(ServerName::IpAddress(addr.ip().into()), stream)
768        .await
769        .map_err(|_| ConnectError::Connection)?;
770    let (rx, tx) = split(stream);
771
772    let client = HostClient::<WireError>::new_with_wire(
773        TcpCommsTx { tx },
774        TcpCommsRx {
775            rx,
776            addr,
777            buf: vec![],
778        },
779        TcpSpawn,
780        postcard_rpc::header::VarSeqKind::Seq4,
781        ERROR_PATH,
782        64,
783    );
784
785    let res = client
786        .send_resp::<PingEndpoint>(&42)
787        .await
788        .map_err(|_| ConnectError::Protocol)?;
789
790    if res != 42 {
791        return Err(ConnectError::Protocol);
792    }
793
794    Ok(PoststationClient { client })
795}
796
/// Errors produced by the receiving half of the server connection
#[derive(Debug)]
pub enum TcpCommsRxError {
    /// Too much data was buffered without a complete frame being found
    RxOverflow,
    /// Reading from the connection failed, or the connection was closed
    ConnError,
}

impl Display for TcpCommsRxError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // One distinct message per variant, so logs show what actually went wrong
        f.write_str(match self {
            Self::RxOverflow => "receive buffer overflowed before a complete frame arrived",
            Self::ConnError => "connection failed or was closed while receiving",
        })
    }
}

impl Error for TcpCommsRxError {}
815
816struct TcpCommsRx<T: AsyncRead + Send + 'static> {
817    addr: SocketAddr,
818    buf: Vec<u8>,
819    rx: ReadHalf<T>,
820}
821
822impl<T: AsyncRead + Send + 'static> TcpCommsRx<T> {
823    async fn receive_inner(&mut self) -> Result<Vec<u8>, TcpCommsRxError> {
824        let mut rx_buf = [0u8; 1024];
825        'frame: loop {
826            if self.buf.len() > (1024 * 1024) {
827                tracing::warn!(?self.addr, "Refusing to collect >1MiB, terminating");
828                self.buf.clear();
829                return Err(TcpCommsRxError::RxOverflow);
830            }
831
832            // Do we have a message already?
833            if let Some(pos) = self.buf.iter().position(|b| *b == 0) {
834                // we found the end of a message, attempt to decode it
835                let mut split = self.buf.split_off(pos + 1);
836                core::mem::swap(&mut self.buf, &mut split);
837
838                // Can we decode the cobs?
839                let res = cobs::decode_vec(&split);
840                let Ok(msg) = res else {
841                    tracing::warn!(?self.addr, discarded = split.len(), "Discarding bad message (cobs)");
842                    continue 'frame;
843                };
844
845                return Ok(msg);
846            }
847
848            // No message yet, let's try and receive some data
849            let Ok(used) = self.rx.read(&mut rx_buf).await else {
850                tracing::warn!(?self.addr, "Closing");
851                return Err(TcpCommsRxError::ConnError);
852            };
853            if used == 0 {
854                tracing::warn!(?self.addr, "Closing");
855                return Err(TcpCommsRxError::ConnError);
856            }
857            self.buf.extend_from_slice(&rx_buf[..used]);
858        }
859    }
860}
861
862impl<T: AsyncRead + Send + 'static> WireRx for TcpCommsRx<T> {
863    type Error = TcpCommsRxError;
864
865    fn receive(&mut self) -> impl Future<Output = Result<Vec<u8>, Self::Error>> + Send {
866        self.receive_inner()
867    }
868}
869
870// ---
871
/// Errors produced by the sending half of the server connection
#[derive(Debug)]
pub enum TcpCommsTxError {
    /// Writing to the connection failed
    CommsError,
}

impl Display for TcpCommsTxError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            Self::CommsError => "failed to write to the connection",
        })
    }
}

impl Error for TcpCommsTxError {}
889
890struct TcpCommsTx<T: AsyncWrite + Send + 'static> {
891    tx: WriteHalf<T>,
892}
893
894impl<T: AsyncWrite + Send + 'static> TcpCommsTx<T> {
895    async fn send_inner(&mut self, data: Vec<u8>) -> Result<(), TcpCommsTxError> {
896        let mut data = cobs::encode_vec(&data);
897        data.push(0);
898        self.tx
899            .write_all(&data)
900            .await
901            .map_err(|_| TcpCommsTxError::CommsError)
902    }
903}
904
905impl<T: AsyncWrite + Send + 'static> WireTx for TcpCommsTx<T> {
906    type Error = TcpCommsTxError;
907
908    fn send(&mut self, data: Vec<u8>) -> impl Future<Output = Result<(), Self::Error>> + Send {
909        self.send_inner(data)
910    }
911}
912
913// ---
914
915struct TcpSpawn;
916
917impl WireSpawn for TcpSpawn {
918    fn spawn(&mut self, fut: impl Future<Output = ()> + Send + 'static) {
919        tokio::spawn(fut);
920    }
921}