1use bytes::{Buf, BytesMut};
2use futures::prelude::*;
3use futures::sink::SinkExt;
4
5use tokio::net::TcpStream;
6use tokio_util::codec::{Decoder, Encoder, Framed};
7
/// A [`TcpStream`] wrapped in a [`Framed`] adapter driven by [`ClientCodec`].
///
/// Through the codec's `Encoder`/`Decoder` impls this transport accepts
/// `Message<ToServer>` values and yields `Message<FromServer>` values.
pub type ClientTransport = Framed<TcpStream, ClientCodec>;
9
10use crate::frame;
11use crate::{FromServer, Message, Result, ToServer};
12use anyhow::{anyhow, bail};
13use tracing::debug;
14
15pub async fn connect(
19 server: impl tokio::net::ToSocketAddrs,
20 host: impl Into<String>,
21 login: Option<String>,
22 passcode: Option<String>,
23) -> Result<ClientTransport> {
24 let tcp = TcpStream::connect(server).await?;
25 let mut transport = ClientCodec.framed(tcp);
26 client_handshake(&mut transport, host.into(), login, passcode).await?;
27 Ok(transport)
28}
29
30async fn client_handshake(
31 transport: &mut ClientTransport,
32 host: String,
33 login: Option<String>,
34 passcode: Option<String>,
35) -> Result<()> {
36 let connect = Message {
37 content: ToServer::Connect {
38 accept_version: "1.2".into(),
39 host,
40 login,
41 passcode,
42 heartbeat: None,
43 },
44 extra_headers: vec![],
45 };
46 transport.send(connect).await?;
48 let msg = transport.next().await.transpose()?;
50 if let Some(FromServer::Connected { .. }) = msg.as_ref().map(|m| &m.content) {
51 Ok(())
52 } else {
53 Err(anyhow!("unexpected reply: {:?}", msg))
54 }
55}
56
57pub fn subscribe(dest: impl Into<String>, id: impl Into<String>) -> Message<ToServer> {
59 ToServer::Subscribe {
60 destination: dest.into(),
61 id: id.into(),
62 ack: None,
63 }
64 .into()
65}
66
/// Field-less codec for the client side of the connection.
///
/// Its `Decoder` impl produces `Message<FromServer>` items and its `Encoder`
/// impl accepts `Message<ToServer>` items.
pub struct ClientCodec;
68
69impl Decoder for ClientCodec {
70 type Item = Message<FromServer>;
71 type Error = anyhow::Error;
72
73 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>> {
74 debug!("Decoding frame, current buffer: {:?}", src);
75 let (item, offset) = match frame::parse_frame(&src) {
76 Ok((remain, frame)) => {
77 debug!("Parsed frame: {:?}", frame);
79
80 (
81 Message::<FromServer>::from_frame(frame),
82 remain.as_ptr() as usize - src.as_ptr() as usize,
83 )
84 },
85 Err(nom::Err::Incomplete(_)) => return Ok(None),
86 Err(e) => bail!("Parse failed: {:?}", e),
87 };
88 src.advance(offset);
89 item.map(|v| Some(v))
90 }
91}
92
93impl Encoder<Message<ToServer>> for ClientCodec {
94 type Error = anyhow::Error;
95
96 fn encode(
97 &mut self,
98 item: Message<ToServer>,
99 dst: &mut BytesMut,
100 ) -> std::result::Result<(), Self::Error> {
101 debug!("Encoding frame, current buffer: {:?}", dst);
102 item.to_frame().serialize(dst);
103 Ok(())
104 }
105}