Skip to main content

stomp_agnostic/
lib.rs

1//! # STOMP Agnostic
2//!
3//! `stomp-agnostic` - A transport and async agnostic library for handling of STOMP messages
4//!
5//! This library exposes _client_ STOMP functionality through the [ClientStompHandle] type.
6//! The `ClientStompHandle` needs an implementation of [ClientTransport].
7//!
8//! This library also exposes _server_ STOMP functionality through the [ServerStompHandle] type.
9//! The `ServerStompHandle` needs an implementation of [ServerTransport].
10//!
11//! `stomp-agnostic` is both transport agnostic and async agnostic.
12//!
13//! ## (Non-) Performance
14//! This crate does not have a specific focus on performance.
15//!
16//! ## Transport agnostic
17//! Other STOMP libraries, like [async-stomp](https://github.com/snaggen/async-stomp),
18//! [wstomp](https://crates.io/crates/wstomp), etc. focus on one, or a few, specific transport
19//! methods such as TCP or WebSockets. This crate, on the other hand, exposes two traits,
20//! [ClientTransport] and [ServerTransport], and the implementor is responsible for the transport.
21//! This makes this crate compatible with e.g. [tokio-tungstenite](https://crates.io/crates/tokio-tungstenite),
22//! but you have to implement the [ClientTransport] (or [ServerTransport]) trait yourself; nothing is
23//! implemented for `tokio-tungstenite` out of the box.
24//!
25//! ## Async agnostic
26//! This crate does not depend on a specific async stack. Bring your own.
27//!
28//! ## High level STOMP interface and low level control of the transport
29//! `ClientStompHandle` and `ServerStompHandle` are the high-level APIs to use when sending and
30//! receiving STOMP messages, but `stomp-agnostic` also makes it easy to get a hold of the
31//! underlying transport implementation, both as an exclusive reference `&mut T` and by consuming the
32//! handle itself to get the original `T: ClientTransport` or `T: ServerTransport` back, to perform
33//! low-level cleanup at any time, usually at the end of a session. This is accomplished through
34//! [ClientStompHandle::into_transport], [ClientStompHandle::as_mut_transport],
35//! [ServerStompHandle::into_transport], and [ServerStompHandle::as_mut_transport].
36//!
37//! ## Examples
38//! There are two examples: one implementing a basic WebSocket STOMP client using
39//! `tokio-tungstenite`, and another implementing a basic WebSocket STOMP server using `axum`.
40
41use bytes::{Bytes, BytesMut};
42use custom_debug_derive::Debug as CustomDebug;
43use frame::Frame;
44pub use handle::client::ClientStompHandle;
45pub use handle::server::ServerStompHandle;
46pub use transport::client::{ClientTransport, ServerResponse};
47pub use transport::server::{ClientData, ServerTransport};
48pub use transport::{ReadData, ReadError, WriteError};
49
50mod frame;
51mod handle;
52mod transport;
53
/// A STOMP message together with any additional headers
///
/// `Message` pairs the typed message `content` with the headers that were
/// present in the frame but are not required by that specific message type.
/// Within this crate `T` is either [ToServer] (sent to the server) or
/// [FromServer] (received from the server).
///
/// `Clone`, `PartialEq` and `Eq` are derived, so each of them is available
/// whenever `T` itself implements it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Message<T> {
    /// The message content, which is either a [ToServer] or a [FromServer] value
    pub content: T,
    /// Headers present in the frame which were not required by the content type.
    /// Stored as pairs of raw bytes to avoid unnecessary conversions.
    pub extra_headers: Vec<(Vec<u8>, Vec<u8>)>,
}
68
/// Debug-formatting helper that renders an optional byte buffer as text
///
/// A `Some` buffer is written as a string using lossy UTF-8 decoding (invalid
/// sequences become U+FFFD), which is easier to read in debug output than a
/// list of byte values. A missing buffer is written as the literal `None`.
fn pretty_bytes(b: &Option<Vec<u8>>, f: &mut std::fmt::Formatter) -> std::fmt::Result {
    match b.as_deref() {
        Some(raw) => f.write_str(&String::from_utf8_lossy(raw)),
        None => f.write_str("None"),
    }
}
80
81/// A STOMP message sent from the server
82///
83/// This enum represents all possible message types that can be received from
84/// a STOMP server according to the STOMP 1.2 specification.
85///
86/// See the [STOMP 1.2 Specification](https://stomp.github.io/stomp-specification-1.2.html)
87/// for more detailed information about each message type.
88#[derive(CustomDebug, Clone)]
89pub enum FromServer {
90    /// Connection established acknowledgment
91    ///
92    /// Sent by the server in response to a successful CONNECT/STOMP frame.
93    #[doc(hidden)] // The user shouldn't need to know about this one
94    Connected {
95        /// Protocol version
96        version: String,
97        /// Optional session identifier
98        session: Option<String>,
99        /// Optional server identifier
100        server: Option<String>,
101        /// Optional heartbeat settings
102        heartbeat: Option<String>,
103    },
104
105    /// Message received from a subscription
106    ///
107    /// Conveys messages from subscriptions to the client. Contains the
108    /// message content and associated metadata.
109    Message {
110        /// Destination the message was sent to
111        destination: String,
112        /// Unique message identifier
113        message_id: String,
114        /// Subscription identifier this message relates to
115        subscription: String,
116        /// All headers included in the message
117        headers: Vec<(String, String)>,
118        /// Optional message body
119        #[debug(with = "pretty_bytes")]
120        body: Option<Vec<u8>>,
121    },
122
123    /// Receipt confirmation
124    ///
125    /// Sent from the server to the client once a server has successfully
126    /// processed a client frame that requested a receipt.
127    Receipt {
128        /// Receipt identifier matching the client's receipt request
129        receipt_id: String,
130    },
131
132    /// Error notification
133    ///
134    /// Sent when something goes wrong. After sending an Error,
135    /// the server will close the connection.
136    Error {
137        /// Optional error message
138        message: Option<String>,
139        /// Optional error body with additional details
140        #[debug(with = "pretty_bytes")]
141        body: Option<Vec<u8>>,
142    },
143}
144
145// TODO tidy this lot up with traits?
146impl Message<FromServer> {
147    fn to_frame<'a>(&'a self) -> Frame<'a> {
148        let mut frame = self.content.to_frame();
149        // Add any extra headers to the frame
150        frame.add_extra_headers(&self.extra_headers);
151        frame
152    }
153
154    pub fn into_bytes(self) -> Bytes {
155        let mut bytes_mut = BytesMut::new();
156        let frame = self.to_frame();
157        frame.serialize(&mut bytes_mut);
158        Bytes::from_owner(bytes_mut)
159    }
160
161    /// Convert a Frame into a Message<FromServer>
162    ///
163    /// This internal method handles conversion from the low-level Frame
164    /// representation to the high-level Message representation.
165    fn from_frame(frame: Frame) -> anyhow::Result<Message<FromServer>> {
166        frame.to_server_msg()
167    }
168}
169
/// A STOMP message sent by the client
///
/// This enum represents all possible message types that can be sent to
/// a STOMP server according to the STOMP 1.2 specification.
///
/// See the [STOMP 1.2 Specification](https://stomp.github.io/stomp-specification-1.2.html)
/// for more detailed information about each message type.
///
/// Values can be compared with `==`, which is convenient in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ToServer {
    /// Connection request message
    ///
    /// First frame sent to the server to establish a STOMP session.
    Connect {
        /// Protocol versions the client supports
        accept_version: String,
        /// Virtual host the client wants to connect to
        host: String,
        /// Optional authentication username
        login: Option<String>,
        /// Optional authentication password
        passcode: Option<String>,
        /// Optional heartbeat configuration (cx, cy)
        heartbeat: Option<(u32, u32)>,
    },

    /// Send a message to a destination in the messaging system
    ///
    /// Used to send a message to a specific destination like a queue or topic.
    Send {
        /// Destination to send the message to
        destination: String,
        /// Optional transaction identifier
        transaction: Option<String>,
        /// Optional additional headers to include
        headers: Option<Vec<(String, String)>>,
        /// Optional message body
        body: Option<Vec<u8>>,
    },

    /// Register to listen to a given destination
    ///
    /// Creates a subscription to receive messages from a specific destination.
    Subscribe {
        /// Destination to subscribe to
        destination: String,
        /// Client-generated subscription identifier
        id: String,
        /// Optional acknowledgment mode
        ack: Option<AckMode>,
    },

    /// Remove an existing subscription
    ///
    /// Cancels a subscription so the client stops receiving messages from it.
    Unsubscribe {
        /// Subscription identifier to unsubscribe from
        id: String,
    },

    /// Acknowledge consumption of a message from a subscription
    ///
    /// Used with 'client' or 'client-individual' acknowledgment modes to
    /// confirm successful processing of a message.
    Ack {
        /// Message or subscription identifier to acknowledge
        id: String,
        /// Optional transaction identifier
        transaction: Option<String>,
    },

    /// Notify the server that the client did not consume the message
    ///
    /// Used with 'client' or 'client-individual' acknowledgment modes to
    /// indicate that a message could not be processed successfully.
    Nack {
        /// Message or subscription identifier to negative-acknowledge
        id: String,
        /// Optional transaction identifier
        transaction: Option<String>,
    },

    /// Start a transaction
    ///
    /// Begins a new transaction that can group multiple STOMP operations.
    Begin {
        /// Client-generated transaction identifier
        transaction: String,
    },

    /// Commit an in-progress transaction
    ///
    /// Completes a transaction and applies all its operations.
    Commit {
        /// Transaction identifier to commit
        transaction: String,
    },

    /// Roll back an in-progress transaction
    ///
    /// Cancels a transaction and rolls back all its operations.
    Abort {
        /// Transaction identifier to abort
        transaction: String,
    },

    /// Gracefully disconnect from the server
    ///
    /// Cleanly ends the STOMP session. Clients MUST NOT send any more
    /// frames after the DISCONNECT frame is sent.
    Disconnect {
        /// Optional receipt request
        receipt: Option<String>,
    },
}

/// Acknowledgment modes for STOMP subscriptions
///
/// Controls how messages should be acknowledged when received through a subscription.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AckMode {
    /// Auto acknowledgment (the default if not specified)
    ///
    /// The client does not need to send ACK frames; the server will
    /// assume the client received the message as soon as it is sent.
    Auto,

    /// Client acknowledgment
    ///
    /// The client must send ACK frames for the messages it processes.
    /// Acknowledgment is cumulative: an ACK covers the referenced message and
    /// all messages sent to the subscription before it.
    Client,

    /// Client individual acknowledgment
    ///
    /// The client must send an ACK frame for each individual message.
    /// Only the individual message referenced in the ACK is acknowledged.
    ClientIndividual,
}
308
309impl Message<ToServer> {
310    /// Convert this message to a low-level Frame
311    ///
312    /// This method converts the high-level [Message] to the low-level Frame
313    /// representation needed for serialization.
314    fn to_frame<'a>(&'a self) -> Frame<'a> {
315        // Create a frame from the message content
316        let mut frame = self.content.to_frame();
317        // Add any extra headers to the frame
318        frame.add_extra_headers(&self.extra_headers);
319        frame
320    }
321
322    /// Converts the message to a Frame and then serializes the frame as bytes. This is useful
323    /// for implementors that need to implement the [ClientTransport] trait.
324    pub fn into_bytes(self) -> Bytes {
325        let mut bytes_mut = BytesMut::new();
326        let frame = self.to_frame();
327        frame.serialize(&mut bytes_mut);
328        Bytes::from_owner(bytes_mut)
329    }
330
331    /// Convert a Frame into a Message<ToServer>
332    ///
333    /// This internal method handles conversion from the low-level Frame
334    /// representation to the high-level Message representation.
335    #[allow(dead_code)]
336    fn from_frame(frame: Frame) -> anyhow::Result<Message<ToServer>> {
337        frame.to_client_msg()
338    }
339}
340
341/// Implement `From<ToServer>` for `Message<ToServer>` to allow easy conversion
342///
343/// This allows [ToServer] enum variants to be easily converted to a [Message]
344/// with empty extra_headers, which is a common need when sending messages.
345impl From<ToServer> for Message<ToServer> {
346    /// Convert a [ToServer] enum into a `Message<ToServer>`
347    ///
348    /// This creates a [Message] with the given content and empty extra_headers.
349    fn from(content: ToServer) -> Message<ToServer> {
350        Message {
351            content,
352            extra_headers: vec![],
353        }
354    }
355}