1use std::net::ToSocketAddrs;
2
3use bytes::{Buf, BytesMut};
4use futures::prelude::*;
5use futures::sink::SinkExt;
6
7use tokio::net::TcpStream;
8use tokio_util::codec::{Decoder, Encoder, Framed};
9
10pub type ClientTransport = Framed<TcpStream, ClientCodec>;
11
12use crate::frame;
13use crate::{FromServer, Message, Result, ToServer};
14use anyhow::{anyhow, bail};
15
16pub async fn connect(
20 address: impl Into<String>,
21 login: Option<String>,
22 passcode: Option<String>,
23) -> Result<ClientTransport> {
24 let address = address.into();
25 let addr = address.as_str().to_socket_addrs().unwrap().next().unwrap();
26 let tcp = TcpStream::connect(&addr).await?;
27 let mut transport = ClientCodec.framed(tcp);
28 client_handshake(&mut transport, address, login, passcode).await?;
29 Ok(transport)
30}
31
32async fn client_handshake(
33 transport: &mut ClientTransport,
34 host: String,
35 login: Option<String>,
36 passcode: Option<String>,
37) -> Result<()> {
38 let connect = Message {
39 content: ToServer::Connect {
40 accept_version: "1.2".into(),
41 host,
42 login,
43 passcode,
44 heartbeat: None,
45 },
46 extra_headers: vec![],
47 };
48 transport.send(connect).await?;
50 let msg = transport.next().await.transpose()?;
52 if let Some(FromServer::Connected { .. }) = msg.as_ref().map(|m| &m.content) {
53 Ok(())
54 } else {
55 Err(anyhow!("unexpected reply: {:?}", msg))
56 }
57}
58
59pub fn subscribe(dest: impl Into<String>, id: impl Into<String>) -> Message<ToServer> {
61 ToServer::Subscribe {
62 destination: dest.into(),
63 id: id.into(),
64 ack: None,
65 }
66 .into()
67}
68
69pub struct ClientCodec;
70
71impl Decoder for ClientCodec {
72 type Item = Message<FromServer>;
73 type Error = anyhow::Error;
74
75 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>> {
76 let (item, offset) = match frame::parse_frame(src) {
77 Ok((remain, frame)) => (
78 Message::<FromServer>::from_frame(frame),
79 remain.as_ptr() as usize - src.as_ptr() as usize,
80 ),
81 Err(nom::Err::Incomplete(_)) => return Ok(None),
82 Err(e) => bail!("Parse failed: {:?}", e),
83 };
84 src.advance(offset);
85 item.map(Some)
86 }
87}
88
89impl Encoder<Message<ToServer>> for ClientCodec {
90 type Error = anyhow::Error;
91
92 fn encode(
93 &mut self,
94 item: Message<ToServer>,
95 dst: &mut BytesMut,
96 ) -> std::result::Result<(), Self::Error> {
97 item.to_frame().serialize(dst);
98 Ok(())
99 }
100}