Skip to main content

stomp_agnostic/handle/
client.rs

1use crate::transport::client::{BufferedTransport, ClientTransport, ServerResponse};
2use crate::{FromServer, Message, ReadError, ToServer, WriteError};
3use anyhow::anyhow;
4use std::fmt::Debug;
5
6/// A handle that reads and writes STOMP messages given an implementation of [ClientTransport].
7pub struct ClientStompHandle<T>
8where
9    T: ClientTransport,
10    T::ProtocolSideChannel: Debug,
11{
12    transport: BufferedTransport<T>,
13}
14
15impl<T> ClientStompHandle<T>
16where
17    T: ClientTransport,
18    T::ProtocolSideChannel: Debug,
19{
20    /// Creates a new [ClientStompHandle] for your code to interface with.
21    /// Requires an implementation of [ClientTransport].
22    pub async fn connect(
23        transport: T,
24        virtualhost: String,
25        login: Option<String>,
26        passcode: Option<String>,
27        headers: Vec<(String, String)>,
28    ) -> anyhow::Result<ClientStompHandle<T>> {
29        let transport = client_handshake(
30            BufferedTransport::new(transport),
31            virtualhost.clone(),
32            login,
33            passcode,
34            headers,
35        )
36        .await?;
37
38        Ok(ClientStompHandle { transport })
39    }
40
41    /// Send a STOMP message through the underlying transport
42    pub async fn send_message(&mut self, message: Message<ToServer>) -> Result<(), WriteError> {
43        self.transport.send(message).await
44    }
45
46    /// Read a STOMP message from the underlying transport
47    pub async fn read_response(
48        &mut self,
49    ) -> Result<ServerResponse<T::ProtocolSideChannel>, ReadError> {
50        self.transport.next().await
51    }
52
53    /// Consume the [ClientStompHandle] to get the original [ClientTransport] back.
54    pub fn into_transport(self) -> T {
55        self.transport.into_transport()
56    }
57
58    /// Get a mutable reference to the transport, to be able to handle e.g. WebSocket Ping/Pong
59    pub fn as_mut_transport(&mut self) -> &mut T {
60        self.transport.as_mut_inner()
61    }
62}
63
64/// Performs the STOMP protocol handshake with the server
65///
66/// This function sends a CONNECT frame to the server and waits for
67/// a CONNECTED response. If the server responds with anything else,
68/// the handshake is considered failed.
69async fn client_handshake<T>(
70    mut transport: BufferedTransport<T>,
71    virtualhost: String,
72    login: Option<String>,
73    passcode: Option<String>,
74    headers: Vec<(String, String)>,
75) -> anyhow::Result<BufferedTransport<T>>
76where
77    T: ClientTransport,
78    T::ProtocolSideChannel: Debug,
79{
80    // Convert custom headers to the binary format expected by the protocol
81    let extra_headers = headers
82        .iter()
83        .map(|(k, v)| (k.as_bytes().to_vec(), v.as_bytes().to_vec()))
84        .collect();
85
86    // Create the CONNECT message
87    let connect = Message {
88        content: ToServer::Connect {
89            accept_version: "1.2".into(),
90            host: virtualhost,
91            login,
92            passcode,
93            heartbeat: None,
94        },
95        extra_headers,
96    };
97
98    // Send the message to the server
99    transport.send(connect).await?;
100
101    // Receive and process the server's reply
102    let response = transport.next().await?;
103
104    match response {
105        ServerResponse::Message(msg) => {
106            // Check if the reply is a CONNECTED frame
107            if let FromServer::Connected { .. } = msg.content {
108                Ok(transport)
109            } else {
110                Err(anyhow!("Unexpected response: {msg:?}"))
111            }
112        }
113        ServerResponse::Custom(custom) => Err(anyhow!("Unexpected response: {custom:?}")),
114    }
115}