tokio_stomp/
client.rs

1use bytes::{Buf, BytesMut};
2use futures::prelude::*;
3use futures::sink::SinkExt;
4
5use tokio::net::TcpStream;
6use tokio_util::codec::{Decoder, Encoder, Framed};
7
8pub type ClientTransport = Framed<TcpStream, ClientCodec>;
9
10use crate::frame;
11use crate::{FromServer, Message, Result, ToServer};
12use anyhow::{anyhow, bail};
13
14/// Connect to a STOMP server via TCP, including the connection handshake.
15/// If successful, returns a tuple of a message stream and a sender,
16/// which may be used to receive and send messages respectively.
17pub async fn connect(
18    server: impl tokio::net::ToSocketAddrs,
19    host: impl Into<String>,
20    login: Option<String>,
21    passcode: Option<String>,
22) -> Result<ClientTransport> {
23    let tcp = TcpStream::connect(server).await?;
24    let mut transport = ClientCodec.framed(tcp);
25    client_handshake(&mut transport, host.into(), login, passcode).await?;
26    Ok(transport)
27}
28
29async fn client_handshake(
30    transport: &mut ClientTransport,
31    host: String,
32    login: Option<String>,
33    passcode: Option<String>,
34) -> Result<()> {
35    let connect = Message {
36        content: ToServer::Connect {
37            accept_version: "1.2".into(),
38            host,
39            login,
40            passcode,
41            heartbeat: None,
42        },
43        extra_headers: vec![],
44    };
45    // Send the message
46    transport.send(connect).await?;
47    // Receive reply
48    let msg = transport.next().await.transpose()?;
49    if let Some(FromServer::Connected { .. }) = msg.as_ref().map(|m| &m.content) {
50        Ok(())
51    } else {
52        Err(anyhow!("unexpected reply: {:?}", msg))
53    }
54}
55
56/// Convenience function to build a Subscribe message
57pub fn subscribe(dest: impl Into<String>, id: impl Into<String>) -> Message<ToServer> {
58    ToServer::Subscribe {
59        destination: dest.into(),
60        id: id.into(),
61        ack: None,
62    }
63    .into()
64}
65
66pub struct ClientCodec;
67
68impl Decoder for ClientCodec {
69    type Item = Message<FromServer>;
70    type Error = anyhow::Error;
71
72    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>> {
73        let (item, offset) = match frame::parse_frame(&src) {
74            Ok((remain, frame)) => (
75                Message::<FromServer>::from_frame(frame),
76                remain.as_ptr() as usize - src.as_ptr() as usize,
77            ),
78            Err(nom::Err::Incomplete(_)) => return Ok(None),
79            Err(e) => bail!("Parse failed: {:?}", e),
80        };
81        src.advance(offset);
82        item.map(|v| Some(v))
83    }
84}
85
86impl Encoder<Message<ToServer>> for ClientCodec {
87    type Error = anyhow::Error;
88
89    fn encode(
90        &mut self,
91        item: Message<ToServer>,
92        dst: &mut BytesMut,
93    ) -> std::result::Result<(), Self::Error> {
94        item.to_frame().serialize(dst);
95        Ok(())
96    }
97}