stomp_agnostic/lib.rs
1//! # STOMP Agnostic
2//!
3//! `stomp-agnostic` - A transport and async agnostic library for handling of STOMP messages
4//!
5//! This library exposes _client_ STOMP functionality through the [ClientStompHandle] type.
6//! The `ClientStompHandle` needs an implementation of [ClientTransport].
7//!
8//! This library also exposes _server_ STOMP functionality through the [ServerStompHandle] type.
9//! The `ServerStompHandle` needs an implementation of [ServerTransport].
10//!
11//! `stomp-agnostic` is both transport agnostic and async agnostic.
12//!
13//! ## (Non-) Performance
14//! This crate does not have a specific focus on performance.
15//!
16//! ## Transport agnostic
17//! Other STOMP libraries, like [async-stomp](https://github.com/snaggen/async-stomp),
18//! [wstomp](https://crates.io/crates/wstomp), etc. focus on one, or a few, specific transport
19//! methods such as TCP or WebSockets. This crate on the other hand, exposes two traits,
20//! [ClientTransport] and [ServerTransport] and the implementor is responsible for the transport.
21//! This makes this crate compatible with e.g. [tokio-tungstenite](https://crates.io/crates/tokio-tungstenite),
//! but you have to implement the relevant transport trait yourself; there is nothing implemented
//! for `tokio-tungstenite` out of the box.
24//!
25//! ## Async agnostic
26//! This crate does not depend on a specific async stack. Bring your own.
27//!
28//! ## High level STOMP interface and low level control of the transport
29//! `ClientStompHandle` and `ServerStompHandle` are the high-level APIs to use when sending and
30//! receiving STOMP messages, but `stomp-agnostic` also makes it easy to get a hold of the
//! underlying transport implementation, both as an exclusive reference `&mut T` and consuming the
32//! handle itself to get the original `T: ClientTransport` or `T: ServerTransport` back, to perform
33//! low-level cleanup at any time, usually at the end of a session. This is accomplished through
34//! [ClientStompHandle::into_transport], [ClientStompHandle::as_mut_transport],
35//! [ServerStompHandle::into_transport], and [ServerStompHandle::as_mut_transport].
36//!
37//! ## Examples
38//! There are two examples: one implementing a basic WebSocket STOMP client using
39//! `tokio-tungstenite`, and another implementing a basic WebSocket STOMP server using `axum`.
40
41use bytes::{Bytes, BytesMut};
42use custom_debug_derive::Debug as CustomDebug;
43use frame::Frame;
44pub use handle::client::ClientStompHandle;
45pub use handle::server::ServerStompHandle;
46pub use transport::client::{ClientTransport, ServerResponse};
47pub use transport::server::{ClientData, ServerTransport};
48pub use transport::{ReadData, ReadError, WriteError};
49
50mod frame;
51mod handle;
52mod transport;
53
/// A STOMP frame expressed as typed content plus any leftover headers.
///
/// `T` is the typed payload: a message sent to the server or a message
/// received from the server. Headers that were present in the frame but are
/// not required by that specific message type are preserved separately in
/// `extra_headers`.
#[derive(Debug)]
pub struct Message<T> {
    /// The typed message content (a `ToServer` or `FromServer` value).
    pub content: T,
    /// Headers present in the frame which the content type did not require.
    ///
    /// Stored as pairs of raw bytes (presumably `(name, value)`) to avoid
    /// unnecessary conversions.
    pub extra_headers: Vec<(Vec<u8>, Vec<u8>)>,
}
68
/// Debug-formatting helper for optional binary payloads.
///
/// Used through `#[debug(with = "pretty_bytes")]` on body fields so that debug
/// output shows the payload as text instead of a list of byte values.
///
/// * `Some(bytes)` is written as lossily decoded UTF-8 (invalid sequences are
///   replaced with U+FFFD).
/// * `None` is written as the literal text `None`.
///
/// The `&Option<Vec<u8>>` parameter mirrors the type of the annotated field,
/// which is why it is not expressed as `Option<&[u8]>`.
fn pretty_bytes(b: &Option<Vec<u8>>, f: &mut std::fmt::Formatter) -> std::fmt::Result {
    match b {
        Some(payload) => f.write_str(&String::from_utf8_lossy(payload)),
        None => f.write_str("None"),
    }
}
80
81/// A STOMP message sent from the server
82///
83/// This enum represents all possible message types that can be received from
84/// a STOMP server according to the STOMP 1.2 specification.
85///
86/// See the [STOMP 1.2 Specification](https://stomp.github.io/stomp-specification-1.2.html)
87/// for more detailed information about each message type.
88#[derive(CustomDebug, Clone)]
89pub enum FromServer {
90 /// Connection established acknowledgment
91 ///
92 /// Sent by the server in response to a successful CONNECT/STOMP frame.
93 #[doc(hidden)] // The user shouldn't need to know about this one
94 Connected {
95 /// Protocol version
96 version: String,
97 /// Optional session identifier
98 session: Option<String>,
99 /// Optional server identifier
100 server: Option<String>,
101 /// Optional heartbeat settings
102 heartbeat: Option<String>,
103 },
104
105 /// Message received from a subscription
106 ///
107 /// Conveys messages from subscriptions to the client. Contains the
108 /// message content and associated metadata.
109 Message {
110 /// Destination the message was sent to
111 destination: String,
112 /// Unique message identifier
113 message_id: String,
114 /// Subscription identifier this message relates to
115 subscription: String,
116 /// All headers included in the message
117 headers: Vec<(String, String)>,
118 /// Optional message body
119 #[debug(with = "pretty_bytes")]
120 body: Option<Vec<u8>>,
121 },
122
123 /// Receipt confirmation
124 ///
125 /// Sent from the server to the client once a server has successfully
126 /// processed a client frame that requested a receipt.
127 Receipt {
128 /// Receipt identifier matching the client's receipt request
129 receipt_id: String,
130 },
131
132 /// Error notification
133 ///
134 /// Sent when something goes wrong. After sending an Error,
135 /// the server will close the connection.
136 Error {
137 /// Optional error message
138 message: Option<String>,
139 /// Optional error body with additional details
140 #[debug(with = "pretty_bytes")]
141 body: Option<Vec<u8>>,
142 },
143}
144
145// TODO tidy this lot up with traits?
146impl Message<FromServer> {
147 fn to_frame<'a>(&'a self) -> Frame<'a> {
148 let mut frame = self.content.to_frame();
149 // Add any extra headers to the frame
150 frame.add_extra_headers(&self.extra_headers);
151 frame
152 }
153
154 pub fn into_bytes(self) -> Bytes {
155 let mut bytes_mut = BytesMut::new();
156 let frame = self.to_frame();
157 frame.serialize(&mut bytes_mut);
158 Bytes::from_owner(bytes_mut)
159 }
160
161 /// Convert a Frame into a Message<FromServer>
162 ///
163 /// This internal method handles conversion from the low-level Frame
164 /// representation to the high-level Message representation.
165 fn from_frame(frame: Frame) -> anyhow::Result<Message<FromServer>> {
166 frame.to_server_msg()
167 }
168}
169
170/// A STOMP message sent by the client
171///
172/// This enum represents all possible message types that can be sent to
173/// a STOMP server according to the STOMP 1.2 specification.
174///
175/// See the [STOMP 1.2 Specification](https://stomp.github.io/stomp-specification-1.2.html)
176/// for more detailed information about each message type.
177#[derive(Debug, Clone)]
178pub enum ToServer {
179 /// Connection request message
180 ///
181 /// First frame sent to the server to establish a STOMP session.
182 Connect {
183 /// Protocol versions the client supports
184 accept_version: String,
185 /// Virtual host the client wants to connect to
186 host: String,
187 /// Optional authentication username
188 login: Option<String>,
189 /// Optional authentication password
190 passcode: Option<String>,
191 /// Optional heartbeat configuration (cx, cy)
192 heartbeat: Option<(u32, u32)>,
193 },
194
195 /// Send a message to a destination in the messaging system
196 ///
197 /// Used to send a message to a specific destination like a queue or topic.
198 Send {
199 /// Destination to send the message to
200 destination: String,
201 /// Optional transaction identifier
202 transaction: Option<String>,
203 /// Optional additional headers to include
204 headers: Option<Vec<(String, String)>>,
205 /// Optional message body
206 body: Option<Vec<u8>>,
207 },
208
209 /// Register to listen to a given destination
210 ///
211 /// Creates a subscription to receive messages from a specific destination.
212 Subscribe {
213 /// Destination to subscribe to
214 destination: String,
215 /// Client-generated subscription identifier
216 id: String,
217 /// Optional acknowledgment mode
218 ack: Option<AckMode>,
219 },
220
221 /// Remove an existing subscription
222 ///
223 /// Cancels a subscription so the client stops receiving messages from it.
224 Unsubscribe {
225 /// Subscription identifier to unsubscribe from
226 id: String,
227 },
228
229 /// Acknowledge consumption of a message from a subscription
230 ///
231 /// Used with 'client' or 'client-individual' acknowledgment modes to
232 /// confirm successful processing of a message.
233 Ack {
234 /// Message or subscription identifier to acknowledge
235 id: String,
236 /// Optional transaction identifier
237 transaction: Option<String>,
238 },
239
240 /// Notify the server that the client did not consume the message
241 ///
242 /// Used with 'client' or 'client-individual' acknowledgment modes to
243 /// indicate that a message could not be processed successfully.
244 Nack {
245 /// Message or subscription identifier to negative-acknowledge
246 id: String,
247 /// Optional transaction identifier
248 transaction: Option<String>,
249 },
250
251 /// Start a transaction
252 ///
253 /// Begins a new transaction that can group multiple STOMP operations.
254 Begin {
255 /// Client-generated transaction identifier
256 transaction: String,
257 },
258
259 /// Commit an in-progress transaction
260 ///
261 /// Completes a transaction and applies all its operations.
262 Commit {
263 /// Transaction identifier to commit
264 transaction: String,
265 },
266
267 /// Roll back an in-progress transaction
268 ///
269 /// Cancels a transaction and rolls back all its operations.
270 Abort {
271 /// Transaction identifier to abort
272 transaction: String,
273 },
274
275 /// Gracefully disconnect from the server
276 ///
277 /// Cleanly ends the STOMP session. Clients MUST NOT send any more
278 /// frames after the DISCONNECT frame is sent.
279 Disconnect {
280 /// Optional receipt request
281 receipt: Option<String>,
282 },
283}
284
/// Acknowledgment modes for STOMP subscriptions.
///
/// Selects how messages received through a subscription have to be
/// acknowledged by the client.
#[derive(Debug, Clone, Copy)]
pub enum AckMode {
    /// Automatic acknowledgment (the default if no mode is specified).
    ///
    /// The client does not need to send ACK frames; the server assumes the
    /// client received a message as soon as it has been sent.
    Auto,

    /// Cumulative client acknowledgment.
    ///
    /// The client must send ACK frames for the messages it processes. Per the
    /// STOMP 1.2 specification an ACK in this mode is cumulative: it covers
    /// the referenced message and all messages sent to the subscription
    /// before it.
    Client,

    /// Individual client acknowledgment.
    ///
    /// The client must send an ACK frame for each individual message; only
    /// the message referenced in the ACK is acknowledged.
    ClientIndividual,
}
308
309impl Message<ToServer> {
310 /// Convert this message to a low-level Frame
311 ///
312 /// This method converts the high-level [Message] to the low-level Frame
313 /// representation needed for serialization.
314 fn to_frame<'a>(&'a self) -> Frame<'a> {
315 // Create a frame from the message content
316 let mut frame = self.content.to_frame();
317 // Add any extra headers to the frame
318 frame.add_extra_headers(&self.extra_headers);
319 frame
320 }
321
322 /// Converts the message to a Frame and then serializes the frame as bytes. This is useful
323 /// for implementors that need to implement the [ClientTransport] trait.
324 pub fn into_bytes(self) -> Bytes {
325 let mut bytes_mut = BytesMut::new();
326 let frame = self.to_frame();
327 frame.serialize(&mut bytes_mut);
328 Bytes::from_owner(bytes_mut)
329 }
330
331 /// Convert a Frame into a Message<ToServer>
332 ///
333 /// This internal method handles conversion from the low-level Frame
334 /// representation to the high-level Message representation.
335 #[allow(dead_code)]
336 fn from_frame(frame: Frame) -> anyhow::Result<Message<ToServer>> {
337 frame.to_client_msg()
338 }
339}
340
341/// Implement `From<ToServer>` for `Message<ToServer>` to allow easy conversion
342///
343/// This allows [ToServer] enum variants to be easily converted to a [Message]
344/// with empty extra_headers, which is a common need when sending messages.
345impl From<ToServer> for Message<ToServer> {
346 /// Convert a [ToServer] enum into a `Message<ToServer>`
347 ///
348 /// This creates a [Message] with the given content and empty extra_headers.
349 fn from(content: ToServer) -> Message<ToServer> {
350 Message {
351 content,
352 extra_headers: vec![],
353 }
354 }
355}