tokio_stomp_2/
client.rs

1use std::net::ToSocketAddrs;
2
3use bytes::{Buf, BytesMut};
4use futures::prelude::*;
5use futures::sink::SinkExt;
6
7use tokio::net::TcpStream;
8use tokio_util::codec::{Decoder, Encoder, Framed};
9
/// Framed TCP connection to a STOMP server, as returned by `connect`.
///
/// It pairs a `TcpStream` with `ClientCodec`, so reading yields decoded
/// `Message<FromServer>` values and writing accepts `Message<ToServer>` values.
10pub type ClientTransport = Framed<TcpStream, ClientCodec>;
11
12use crate::frame;
13use crate::{FromServer, Message, Result, ToServer};
14use anyhow::{anyhow, bail};
15
16/// Connect to a STOMP server via TCP, including the connection handshake.
17/// If successful, returns a tuple of a message stream and a sender,
18/// which may be used to receive and send messages respectively.
19pub async fn connect(
20    address: impl Into<String>,
21    login: Option<String>,
22    passcode: Option<String>,
23) -> Result<ClientTransport> {
24    let address = address.into();
25    let addr = address.as_str().to_socket_addrs().unwrap().next().unwrap();
26    let tcp = TcpStream::connect(&addr).await?;
27    let mut transport = ClientCodec.framed(tcp);
28    client_handshake(&mut transport, address, login, passcode).await?;
29    Ok(transport)
30}
31
32async fn client_handshake(
33    transport: &mut ClientTransport,
34    host: String,
35    login: Option<String>,
36    passcode: Option<String>,
37) -> Result<()> {
38    let connect = Message {
39        content: ToServer::Connect {
40            accept_version: "1.2".into(),
41            host,
42            login,
43            passcode,
44            heartbeat: None,
45        },
46        extra_headers: vec![],
47    };
48    // Send the message
49    transport.send(connect).await?;
50    // Receive reply
51    let msg = transport.next().await.transpose()?;
52    if let Some(FromServer::Connected { .. }) = msg.as_ref().map(|m| &m.content) {
53        Ok(())
54    } else {
55        Err(anyhow!("unexpected reply: {:?}", msg))
56    }
57}
58
59/// Convenience function to build a Subscribe message
60pub fn subscribe(dest: impl Into<String>, id: impl Into<String>) -> Message<ToServer> {
61    ToServer::Subscribe {
62        destination: dest.into(),
63        id: id.into(),
64        ack: None,
65    }
66    .into()
67}
68
/// Stateless codec for the client side of a STOMP connection.
///
/// As a `Decoder` it turns bytes received from the server into
/// `Message<FromServer>` values; as an `Encoder` it serializes
/// `Message<ToServer>` values into the outgoing buffer.
#[derive(Debug, Clone, Default)]
pub struct ClientCodec;
70
71impl Decoder for ClientCodec {
72    type Item = Message<FromServer>;
73    type Error = anyhow::Error;
74
75    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>> {
76        let (item, offset) = match frame::parse_frame(src) {
77            Ok((remain, frame)) => (
78                Message::<FromServer>::from_frame(frame),
79                remain.as_ptr() as usize - src.as_ptr() as usize,
80            ),
81            Err(nom::Err::Incomplete(_)) => return Ok(None),
82            Err(e) => bail!("Parse failed: {:?}", e),
83        };
84        src.advance(offset);
85        item.map(Some)
86    }
87}
88
89impl Encoder<Message<ToServer>> for ClientCodec {
90    type Error = anyhow::Error;
91
92    fn encode(
93        &mut self,
94        item: Message<ToServer>,
95        dst: &mut BytesMut,
96    ) -> std::result::Result<(), Self::Error> {
97        item.to_frame().serialize(dst);
98        Ok(())
99    }
100}