1use bytes::{Buf, BytesMut};
2use futures::prelude::*;
3use futures::sink::SinkExt;
4
5use tokio::net::TcpStream;
6use tokio_util::codec::{Decoder, Encoder, Framed};
7
8pub type ClientTransport = Framed<TcpStream, ClientCodec>;
9
10use crate::frame;
11use crate::{FromServer, Message, Result, ToServer};
12use anyhow::{anyhow, bail};
13
14pub async fn connect(
18 server: impl tokio::net::ToSocketAddrs,
19 host: impl Into<String>,
20 login: Option<String>,
21 passcode: Option<String>,
22) -> Result<ClientTransport> {
23 let tcp = TcpStream::connect(server).await?;
24 let mut transport = ClientCodec.framed(tcp);
25 client_handshake(&mut transport, host.into(), login, passcode).await?;
26 Ok(transport)
27}
28
29async fn client_handshake(
30 transport: &mut ClientTransport,
31 host: String,
32 login: Option<String>,
33 passcode: Option<String>,
34) -> Result<()> {
35 let connect = Message {
36 content: ToServer::Connect {
37 accept_version: "1.2".into(),
38 host,
39 login,
40 passcode,
41 heartbeat: None,
42 },
43 extra_headers: vec![],
44 };
45 transport.send(connect).await?;
47 let msg = transport.next().await.transpose()?;
49 if let Some(FromServer::Connected { .. }) = msg.as_ref().map(|m| &m.content) {
50 Ok(())
51 } else {
52 Err(anyhow!("unexpected reply: {:?}", msg))
53 }
54}
55
56pub fn subscribe(dest: impl Into<String>, id: impl Into<String>) -> Message<ToServer> {
58 ToServer::Subscribe {
59 destination: dest.into(),
60 id: id.into(),
61 ack: None,
62 }
63 .into()
64}
65
/// Stateless codec for the client side of the connection.
///
/// It carries no data, so it is free to copy and can be built with
/// `ClientCodec` or `ClientCodec::default()`. Deriving `Debug` also lets
/// wrappers that require a debuggable codec be printed.
#[derive(Debug, Clone, Copy, Default)]
pub struct ClientCodec;
67
68impl Decoder for ClientCodec {
69 type Item = Message<FromServer>;
70 type Error = anyhow::Error;
71
72 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>> {
73 let (item, offset) = match frame::parse_frame(&src) {
74 Ok((remain, frame)) => (
75 Message::<FromServer>::from_frame(frame),
76 remain.as_ptr() as usize - src.as_ptr() as usize,
77 ),
78 Err(nom::Err::Incomplete(_)) => return Ok(None),
79 Err(e) => bail!("Parse failed: {:?}", e),
80 };
81 src.advance(offset);
82 item.map(|v| Some(v))
83 }
84}
85
86impl Encoder<Message<ToServer>> for ClientCodec {
87 type Error = anyhow::Error;
88
89 fn encode(
90 &mut self,
91 item: Message<ToServer>,
92 dst: &mut BytesMut,
93 ) -> std::result::Result<(), Self::Error> {
94 item.to_frame().serialize(dst);
95 Ok(())
96 }
97}