tokio_stomp_rs/
client.rs

1use bytes::{Buf, BytesMut};
2use futures::prelude::*;
3use futures::sink::SinkExt;
4
5use tokio::net::TcpStream;
6use tokio_util::codec::{Decoder, Encoder, Framed};
7
8pub type ClientTransport = Framed<TcpStream, ClientCodec>;
9
10use crate::frame;
11use crate::{FromServer, Message, Result, ToServer};
12use anyhow::{anyhow, bail};
13use tracing::debug;
14
15/// Connect to a STOMP server via TCP, including the connection handshake.
16/// If successful, returns a tuple of a message stream and a sender,
17/// which may be used to receive and send messages respectively.
18pub async fn connect(
19    server: impl tokio::net::ToSocketAddrs,
20    host: impl Into<String>,
21    login: Option<String>,
22    passcode: Option<String>,
23) -> Result<ClientTransport> {
24    let tcp = TcpStream::connect(server).await?;
25    let mut transport = ClientCodec.framed(tcp);
26    client_handshake(&mut transport, host.into(), login, passcode).await?;
27    Ok(transport)
28}
29
30async fn client_handshake(
31    transport: &mut ClientTransport,
32    host: String,
33    login: Option<String>,
34    passcode: Option<String>,
35) -> Result<()> {
36    let connect = Message {
37        content: ToServer::Connect {
38            accept_version: "1.2".into(),
39            host,
40            login,
41            passcode,
42            heartbeat: None,
43        },
44        extra_headers: vec![],
45    };
46    // Send the message
47    transport.send(connect).await?;
48    // Receive reply
49    let msg = transport.next().await.transpose()?;
50    if let Some(FromServer::Connected { .. }) = msg.as_ref().map(|m| &m.content) {
51        Ok(())
52    } else {
53        Err(anyhow!("unexpected reply: {:?}", msg))
54    }
55}
56
57/// Convenience function to build a Subscribe message
58pub fn subscribe(dest: impl Into<String>, id: impl Into<String>) -> Message<ToServer> {
59    ToServer::Subscribe {
60        destination: dest.into(),
61        id: id.into(),
62        ack: None,
63    }
64    .into()
65}
66
/// Stateless codec for the client side of a STOMP connection.
///
/// It decodes incoming bytes into `Message<FromServer>` values and encodes
/// outgoing `Message<ToServer>` values into bytes.
///
/// The common traits are derived so that types wrapping the codec (such as
/// the framed transport) can themselves be debug-printed.
#[derive(Debug, Clone, Copy, Default)]
pub struct ClientCodec;
68
69impl Decoder for ClientCodec {
70    type Item = Message<FromServer>;
71    type Error = anyhow::Error;
72
73    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>> {
74        debug!("Decoding frame, current buffer: {:?}", src);
75        let (item, offset) = match frame::parse_frame(&src) {
76            Ok((remain, frame)) => {
77                // Debug statement after parsing
78                debug!("Parsed frame: {:?}", frame);
79
80                (
81                    Message::<FromServer>::from_frame(frame),
82                    remain.as_ptr() as usize - src.as_ptr() as usize,
83                )
84            },
85            Err(nom::Err::Incomplete(_)) => return Ok(None),
86            Err(e) => bail!("Parse failed: {:?}", e),
87        };
88        src.advance(offset);
89        item.map(|v| Some(v))
90    }
91}
92
93impl Encoder<Message<ToServer>> for ClientCodec {
94    type Error = anyhow::Error;
95
96    fn encode(
97        &mut self,
98        item: Message<ToServer>,
99        dst: &mut BytesMut,
100    ) -> std::result::Result<(), Self::Error> {
101        debug!("Encoding frame, current buffer: {:?}", dst);
102        item.to_frame().serialize(dst);
103        Ok(())
104    }
105}