stomp_agnostic/lib.rs
1//! # STOMP Agnostic
2//!
3//! `stomp-agnostic` - A transport agnostic library for handling of STOMP messages
4//!
5//! This library exposes STOMP functionality through the [StompHandle](client::StompHandle) type.
6//! The `StompHandle` needs an implementation of [Transport](transport::Transport).
7//!
//! `stomp-agnostic` is both transport agnostic and async agnostic.
9//!
10//! # (Non-) Performance
11//! This crate does not have a specific focus on performance.
12//!
13//! # Transport agnostic
14//! Other STOMP libraries, like [async-stomp](https://github.com/snaggen/async-stomp),
15//! [wstomp](https://crates.io/crates/wstomp), etc. focus on one, or a few, specific transport
16//! methods such as TCP or WebSockets. This crate on the other hand, exposes a trait [Transport](transport::Transport)
17//! and the implementor is responsible for the transport. This makes this crate compatible with
18//! e.g. [tokio-tungstenite](https://crates.io/crates/tokio-tungstenite), but you have to implement
//! the `Transport` trait yourself; nothing is implemented for `tokio-tungstenite` out of the box.
20//!
21//! # Async agnostic
22//! This crate does not depend on a specific async stack. Bring your own.
23
24use bytes::{Bytes, BytesMut};
25use custom_debug_derive::Debug as CustomDebug;
26use frame::Frame;
27
28pub mod client;
29mod frame;
30pub mod transport;
31
/// A STOMP message together with any headers not modelled by its content type.
///
/// This struct holds the content of a STOMP message (which can be either
/// a message sent to the server or received from the server) along with
/// any extra headers that were present in the frame but not required by
/// the specific message type.
///
/// `Message<T>` is `Clone` whenever `T` is, so a message can be kept around
/// (for example to send it again) even though `Message::into_bytes` consumes it.
#[derive(Debug, Clone)]
pub struct Message<T> {
    /// The message content, which is either a `ToServer` or `FromServer` enum
    pub content: T,
    /// Headers present in the frame which were not required by the content type.
    ///
    /// Stored as raw bytes to avoid unnecessary conversions.
    pub extra_headers: Vec<(Vec<u8>, Vec<u8>)>,
}
46
/// Debug-formatting helper for optional binary payloads.
///
/// Writes the payload as lossily decoded UTF-8 text when one is present
/// (invalid sequences become U+FFFD replacement characters), and the literal
/// text `None` otherwise.
///
/// The `&Option<Vec<u8>>` parameter mirrors the type of the fields this
/// helper is attached to via `#[debug(with = "pretty_bytes")]`.
fn pretty_bytes(b: &Option<Vec<u8>>, f: &mut std::fmt::Formatter) -> std::fmt::Result {
    match b {
        Some(payload) => f.write_str(&String::from_utf8_lossy(payload)),
        None => f.write_str("None"),
    }
}
58
59/// A STOMP message sent from the server
60///
61/// This enum represents all possible message types that can be received from
62/// a STOMP server according to the STOMP 1.2 specification.
63///
64/// See the [STOMP 1.2 Specification](https://stomp.github.io/stomp-specification-1.2.html)
65/// for more detailed information about each message type.
66#[derive(CustomDebug, Clone)]
67pub enum FromServer {
68 /// Connection established acknowledgment
69 ///
70 /// Sent by the server in response to a successful CONNECT/STOMP frame.
71 #[doc(hidden)] // The user shouldn't need to know about this one
72 Connected {
73 /// Protocol version
74 version: String,
75 /// Optional session identifier
76 session: Option<String>,
77 /// Optional server identifier
78 server: Option<String>,
79 /// Optional heartbeat settings
80 heartbeat: Option<String>,
81 },
82
83 /// Message received from a subscription
84 ///
85 /// Conveys messages from subscriptions to the client. Contains the
86 /// message content and associated metadata.
87 Message {
88 /// Destination the message was sent to
89 destination: String,
90 /// Unique message identifier
91 message_id: String,
92 /// Subscription identifier this message relates to
93 subscription: String,
94 /// All headers included in the message
95 headers: Vec<(String, String)>,
96 /// Optional message body
97 #[debug(with = "pretty_bytes")]
98 body: Option<Vec<u8>>,
99 },
100
101 /// Receipt confirmation
102 ///
103 /// Sent from the server to the client once a server has successfully
104 /// processed a client frame that requested a receipt.
105 Receipt {
106 /// Receipt identifier matching the client's receipt request
107 receipt_id: String,
108 },
109
110 /// Error notification
111 ///
112 /// Sent when something goes wrong. After sending an Error,
113 /// the server will close the connection.
114 Error {
115 /// Optional error message
116 message: Option<String>,
117 /// Optional error body with additional details
118 #[debug(with = "pretty_bytes")]
119 body: Option<Vec<u8>>,
120 },
121}
122
123// TODO tidy this lot up with traits?
124impl Message<FromServer> {
125 // fn to_frame<'a>(&'a self) -> Frame<'a> {
126 // unimplemented!()
127 // }
128
129 /// Convert a Frame into a Message<FromServer>
130 ///
131 /// This internal method handles conversion from the low-level Frame
132 /// representation to the high-level Message representation.
133 fn from_frame(frame: Frame) -> anyhow::Result<Message<FromServer>> {
134 frame.to_server_msg()
135 }
136}
137
138/// A STOMP message sent by the client
139///
140/// This enum represents all possible message types that can be sent to
141/// a STOMP server according to the STOMP 1.2 specification.
142///
143/// See the [STOMP 1.2 Specification](https://stomp.github.io/stomp-specification-1.2.html)
144/// for more detailed information about each message type.
145#[derive(Debug, Clone)]
146pub enum ToServer {
147 /// Connection request message
148 ///
149 /// First frame sent to the server to establish a STOMP session.
150 Connect {
151 /// Protocol versions the client supports
152 accept_version: String,
153 /// Virtual host the client wants to connect to
154 host: String,
155 /// Optional authentication username
156 login: Option<String>,
157 /// Optional authentication password
158 passcode: Option<String>,
159 /// Optional heartbeat configuration (cx, cy)
160 heartbeat: Option<(u32, u32)>,
161 },
162
163 /// Send a message to a destination in the messaging system
164 ///
165 /// Used to send a message to a specific destination like a queue or topic.
166 Send {
167 /// Destination to send the message to
168 destination: String,
169 /// Optional transaction identifier
170 transaction: Option<String>,
171 /// Optional additional headers to include
172 headers: Option<Vec<(String, String)>>,
173 /// Optional message body
174 body: Option<Vec<u8>>,
175 },
176
177 /// Register to listen to a given destination
178 ///
179 /// Creates a subscription to receive messages from a specific destination.
180 Subscribe {
181 /// Destination to subscribe to
182 destination: String,
183 /// Client-generated subscription identifier
184 id: String,
185 /// Optional acknowledgment mode
186 ack: Option<AckMode>,
187 },
188
189 /// Remove an existing subscription
190 ///
191 /// Cancels a subscription so the client stops receiving messages from it.
192 Unsubscribe {
193 /// Subscription identifier to unsubscribe from
194 id: String,
195 },
196
197 /// Acknowledge consumption of a message from a subscription
198 ///
199 /// Used with 'client' or 'client-individual' acknowledgment modes to
200 /// confirm successful processing of a message.
201 Ack {
202 /// Message or subscription identifier to acknowledge
203 id: String,
204 /// Optional transaction identifier
205 transaction: Option<String>,
206 },
207
208 /// Notify the server that the client did not consume the message
209 ///
210 /// Used with 'client' or 'client-individual' acknowledgment modes to
211 /// indicate that a message could not be processed successfully.
212 Nack {
213 /// Message or subscription identifier to negative-acknowledge
214 id: String,
215 /// Optional transaction identifier
216 transaction: Option<String>,
217 },
218
219 /// Start a transaction
220 ///
221 /// Begins a new transaction that can group multiple STOMP operations.
222 Begin {
223 /// Client-generated transaction identifier
224 transaction: String,
225 },
226
227 /// Commit an in-progress transaction
228 ///
229 /// Completes a transaction and applies all its operations.
230 Commit {
231 /// Transaction identifier to commit
232 transaction: String,
233 },
234
235 /// Roll back an in-progress transaction
236 ///
237 /// Cancels a transaction and rolls back all its operations.
238 Abort {
239 /// Transaction identifier to abort
240 transaction: String,
241 },
242
243 /// Gracefully disconnect from the server
244 ///
245 /// Cleanly ends the STOMP session. Clients MUST NOT send any more
246 /// frames after the DISCONNECT frame is sent.
247 Disconnect {
248 /// Optional receipt request
249 receipt: Option<String>,
250 },
251}
252
/// Acknowledgment modes for STOMP subscriptions
///
/// Controls how messages should be acknowledged when received through a subscription.
///
/// The type is a plain value: it is `Copy`, can be compared with `==`, and
/// can be used as a key in hashed collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AckMode {
    /// Auto acknowledgment (the default if not specified)
    ///
    /// The client does not need to send ACK frames; the server will
    /// assume the client received the message as soon as it is sent.
    Auto,

    /// Client acknowledgment
    ///
    /// The client must send ACK frames for the messages it processes.
    /// ACK frames are cumulative in this mode: acknowledging a message also
    /// acknowledges the messages delivered before it (see the STOMP 1.2
    /// specification for the exact scope).
    Client,

    /// Client individual acknowledgment
    ///
    /// The client must send an ACK frame for each individual message.
    /// Only the individual message referenced in the ACK is acknowledged.
    ClientIndividual,
}
276
277impl Message<ToServer> {
278 /// Convert this message to a low-level Frame
279 ///
280 /// This method converts the high-level Message to the low-level Frame
281 /// representation needed for serialization.
282 fn to_frame<'a>(&'a self) -> Frame<'a> {
283 // Create a frame from the message content
284 let mut frame = self.content.to_frame();
285 // Add any extra headers to the frame
286 frame.add_extra_headers(&self.extra_headers);
287 frame
288 }
289
290 /// Converts the message to a [Frame] and then serializes the frame as bytes. This is useful
291 /// for implementors that need to implement the [Transport](transport::Transport) trait.
292 pub fn into_bytes(self) -> Bytes {
293 let mut bytes_mut = BytesMut::new();
294 let frame = self.to_frame();
295 frame.serialize(&mut bytes_mut);
296 Bytes::from_owner(bytes_mut)
297 }
298
299 /// Convert a Frame into a Message<ToServer>
300 ///
301 /// This internal method handles conversion from the low-level Frame
302 /// representation to the high-level Message representation.
303 #[allow(dead_code)]
304 fn from_frame(frame: Frame) -> anyhow::Result<Message<ToServer>> {
305 frame.to_client_msg()
306 }
307}
308
309/// Implement From<ToServer> for Message<ToServer> to allow easy conversion
310///
311/// This allows ToServer enum variants to be easily converted to a Message
312/// with empty extra_headers, which is a common need when sending messages.
313impl From<ToServer> for Message<ToServer> {
314 /// Convert a ToServer enum into a Message<ToServer>
315 ///
316 /// This creates a Message with the given content and empty extra_headers.
317 fn from(content: ToServer) -> Message<ToServer> {
318 Message {
319 content,
320 extra_headers: vec![],
321 }
322 }
323}