1use crate::console::Message;
2use bytes::Bytes;
3use futures_util::{SinkExt, StreamExt};
4use serde::Serialize;
5use tokio::net::{TcpStream, ToSocketAddrs};
6use tokio_util::codec::{BytesCodec, Framed};
7use tracing::debug;
8
9pub struct Client {
11 stream: Framed<TcpStream, BytesCodec>,
12}
13
14impl Client {
15 pub async fn new<A: ToSocketAddrs>(address: A) -> anyhow::Result<Self> {
16 let mut stream = Framed::new(TcpStream::connect(address).await?, BytesCodec::new());
18 debug!("Connected to server");
19
20 match stream.next().await {
22 Some(Ok(_bytes)) => Ok(Client { stream }),
23 Some(Err(e)) => Err(anyhow::Error::from(e)),
24 None => Err(anyhow::Error::msg("Connection closed unexpectedly")),
25 }
26 }
27
28 pub async fn send<S: Serialize, M: Serialize>(
30 &mut self,
31 service_id: S,
32 message: &M,
33 ) -> anyhow::Result<()> {
34 let console_message = Message::new(service_id, message)?;
35
36 let bytes: Bytes = bcs::to_bytes(&console_message)?.into();
38
39 self.stream.send(bytes).await?;
41
42 Ok(())
43 }
44
45 pub async fn weak_send(&mut self, message: &str) -> anyhow::Result<()> {
47 let bytes: Bytes = message.as_bytes().to_vec().into();
48 self.stream.send(bytes).await?;
49
50 Ok(())
51 }
52
53 pub async fn weak_read(&mut self) -> anyhow::Result<String> {
55 let bytes = self
56 .stream
57 .next()
58 .await
59 .ok_or(anyhow::anyhow!("Connection closed unexpectedly"))??
60 .freeze();
61
62 Ok(String::from_utf8_lossy(bytes.as_ref()).trim().to_string())
63 }
64}
65
#[cfg(test)]
mod tests {
    use crate::{Subscription, SubscriptionError};
    use async_trait::async_trait;
    use bytes::Bytes;
    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
    use std::time::Duration;
    use tokio::time;
    use tracing::debug;
    use tracing_subscriber::EnvFilter;

    /// Minimal subscription used by the tests: it only logs what it receives
    /// and never replies.
    struct Test;

    #[async_trait]
    impl Subscription for Test {
        /// Logs that a typed message arrived and produces no response.
        async fn handle(&self, _message: Bytes) -> Result<Option<Bytes>, SubscriptionError> {
            debug!("`Test` receives a strongly typed message");
            Ok(None)
        }

        /// Logs the received text and produces no response.
        async fn weak_handle(&self, message: &str) -> Result<Option<String>, SubscriptionError> {
            debug!("`Test` receives a text message: {message}");
            Ok(None)
        }
    }

    /// Starts a console on an IPv4 and then an IPv6 loopback address and checks
    /// that a client can connect and send a text message to each of them.
    #[tokio::test]
    async fn ipv4_vs_ipv6() -> anyhow::Result<()> {
        // Ignore the result: the global subscriber may already be installed.
        let _ = tracing_subscriber::fmt()
            .with_env_filter(EnvFilter::from_default_env())
            .with_target(true)
            .try_init();

        let addresses = [
            SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9090),
            SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 2020),
        ];
        let pause = Duration::from_millis(100);

        for address in addresses {
            let mut server = crate::Builder::new()
                .bind_address(address)
                .welcome("Welcome to TCP console!")
                .subscribe(1u8, Test)?
                .accept_only_localhost()
                .build()?;

            server.spawn().await?;

            let mut client = crate::Client::new(address)
                .await
                .expect("Failed to create client");

            let greeting = format!("Client connects to {address:?}");
            client
                .weak_send(&greeting)
                .await
                .expect("Failed to send unknown message");

            // Pause after sending, then again after stopping the console,
            // before moving on to the next address.
            time::sleep(pause).await;
            server.stop();
            time::sleep(pause).await;
        }

        Ok(())
    }
}