tcp_console/
client.rs

1use crate::console::Message;
2use bytes::Bytes;
3use futures_util::{SinkExt, StreamExt};
4use serde::Serialize;
5use tokio::net::{TcpStream, ToSocketAddrs};
6use tokio_util::codec::{BytesCodec, Framed};
7use tracing::debug;
8
9/// Client for [Console].
10pub struct Client {
11    stream: Framed<TcpStream, BytesCodec>,
12}
13
14impl Client {
15    pub async fn new<A: ToSocketAddrs>(address: A) -> anyhow::Result<Self> {
16        // Connect to the TCP console server.
17        let mut stream = Framed::new(TcpStream::connect(address).await?, BytesCodec::new());
18        debug!("Connected to server");
19
20        // Receive the welcome message.
21        match stream.next().await {
22            Some(Ok(_bytes)) => Ok(Client { stream }),
23            Some(Err(e)) => Err(anyhow::Error::from(e)),
24            None => Err(anyhow::Error::msg("Connection closed unexpectedly")),
25        }
26    }
27
28    /// Sends a message to [Console] with any serializable payload.
29    pub async fn send<S: Serialize, M: Serialize>(
30        &mut self,
31        service_id: S,
32        message: &M,
33    ) -> anyhow::Result<()> {
34        let console_message = Message::new(service_id, message)?;
35
36        // Create bytes to send.
37        let bytes: Bytes = bcs::to_bytes(&console_message)?.into();
38
39        // Send bytes.
40        self.stream.send(bytes).await?;
41
42        Ok(())
43    }
44
45    /// Sends a message to [Console] with any text.
46    pub async fn weak_send(&mut self, message: &str) -> anyhow::Result<()> {
47        let bytes: Bytes = message.as_bytes().to_vec().into();
48        self.stream.send(bytes).await?;
49
50        Ok(())
51    }
52
53    /// Receives a text message from [Console].
54    pub async fn weak_read(&mut self) -> anyhow::Result<String> {
55        let bytes = self
56            .stream
57            .next()
58            .await
59            .ok_or(anyhow::anyhow!("Connection closed unexpectedly"))??
60            .freeze();
61
62        Ok(String::from_utf8_lossy(bytes.as_ref()).trim().to_string())
63    }
64}
65
#[cfg(test)]
mod tests {
    use crate::{Subscription, SubscriptionError};
    use async_trait::async_trait;
    use bytes::Bytes;
    use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
    use std::time::Duration;
    use tokio::time;
    use tracing::debug;
    use tracing_subscriber::EnvFilter;

    /// Subscription used by the test: it logs what it receives and never
    /// produces a reply.
    struct Test;

    #[async_trait]
    impl Subscription for Test {
        async fn handle(&self, _message: Bytes) -> Result<Option<Bytes>, SubscriptionError> {
            debug!("`Test` receives a strongly typed message");
            Ok(None)
        }

        async fn weak_handle(&self, message: &str) -> Result<Option<String>, SubscriptionError> {
            debug!("`Test` receives a text message: {message}");
            Ok(None)
        }
    }

    /// Installs a `tracing` subscriber for the test run.
    ///
    /// The result of `try_init` is ignored, so a failed installation does not
    /// abort the test.
    fn init_tracing() {
        let _ = tracing_subscriber::fmt()
            .with_env_filter(EnvFilter::from_default_env()) // Filter level comes from RUST_LOG
            .with_target(true) // Show the log target
            .try_init();
    }

    /// Starts a console on the IPv4 and then the IPv6 loopback address and
    /// checks that a client can connect and send a text message to each.
    #[tokio::test]
    async fn ipv4_vs_ipv6() -> anyhow::Result<()> {
        init_tracing();

        // Pause used both before and after stopping the console.
        let pause = Duration::from_millis(100);

        let endpoints = [
            SocketAddr::from((Ipv4Addr::LOCALHOST, 9090)),
            SocketAddr::from((Ipv6Addr::LOCALHOST, 2020)),
        ];

        for address in endpoints {
            let mut console = crate::Builder::new()
                .bind_address(address)
                .welcome("Welcome to TCP console!")
                .subscribe(1u8, Test)?
                .accept_only_localhost()
                .build()?;

            console.spawn().await?;

            let mut client = crate::Client::new(address)
                .await
                .expect("Failed to create client");

            let greeting = format!("Client connects to {address:?}");
            client
                .weak_send(&greeting)
                .await
                .expect("Failed to send unknown message");

            time::sleep(pause).await;
            console.stop();
            time::sleep(pause).await;
        }

        Ok(())
    }
}