stomp_agnostic/lib.rs
1//! # STOMP Agnostic
2//!
3//! `stomp-agnostic` - A transport agnostic library for handling of STOMP messages
4//!
5//! This library exposes STOMP functionality through the [StompHandle] type.
6//! The `StompHandle` needs an implementation of [Transport].
7//!
8//! `stomp-agnostic` is both transport agnostic, and async agnostic.
9//!
10//! # (Non-) Performance
11//! This crate does not have a specific focus on performance.
12//!
13//! # Transport agnostic
14//! Other STOMP libraries, like [async-stomp](https://github.com/snaggen/async-stomp),
15//! [wstomp](https://crates.io/crates/wstomp), etc. focus on one, or a few, specific transport
16//! methods such as TCP or WebSockets. This crate on the other hand, exposes a trait [Transport](transport::Transport)
17//! and the implementor is responsible for the transport. This makes this crate compatible with
18//! e.g. [tokio-tungstenite](https://crates.io/crates/tokio-tungstenite), but you have to implement
19//! the `Transport` trait yourself, there is nothing implemented for `tokio-tungstenite` out-of-the box.
20//!
21//! # Async agnostic
22//! This crate does not depend on a specific async stack. Bring your own.
23
24use bytes::{Bytes, BytesMut};
25use custom_debug_derive::Debug as CustomDebug;
26use frame::Frame;
27pub use handle::StompHandle;
28pub use transport::{ReadError, ReadResponse, Response, Transport, WriteError};
29
30mod frame;
31mod handle;
32mod transport;
33
34/// A representation of a STOMP frame
35///
36/// This struct holds the content of a STOMP message (which can be either
37/// a message sent to the server or received from the server) along with
38/// any extra headers that were present in the frame but not required by
39/// the specific message type.
40#[derive(Debug)]
41pub struct Message<T> {
42 /// The message content, which is either a ToServer or FromServer enum
43 pub content: T,
44 /// Headers present in the frame which were not required by the content type
45 /// Stored as raw bytes to avoid unnecessary conversions
46 pub extra_headers: Vec<(Vec<u8>, Vec<u8>)>,
47}
48
49/// Helper function for pretty-printing binary data in debug output
50///
51/// This function converts binary data (Option<Vec<u8>>) to a UTF-8 string
52/// for better readability in debug output.
53fn pretty_bytes(b: &Option<Vec<u8>>, f: &mut std::fmt::Formatter) -> std::fmt::Result {
54 if let Some(v) = b {
55 write!(f, "{}", String::from_utf8_lossy(v))
56 } else {
57 write!(f, "None")
58 }
59}
60
61/// A STOMP message sent from the server
62///
63/// This enum represents all possible message types that can be received from
64/// a STOMP server according to the STOMP 1.2 specification.
65///
66/// See the [STOMP 1.2 Specification](https://stomp.github.io/stomp-specification-1.2.html)
67/// for more detailed information about each message type.
68#[derive(CustomDebug, Clone)]
69pub enum FromServer {
70 /// Connection established acknowledgment
71 ///
72 /// Sent by the server in response to a successful CONNECT/STOMP frame.
73 #[doc(hidden)] // The user shouldn't need to know about this one
74 Connected {
75 /// Protocol version
76 version: String,
77 /// Optional session identifier
78 session: Option<String>,
79 /// Optional server identifier
80 server: Option<String>,
81 /// Optional heartbeat settings
82 heartbeat: Option<String>,
83 },
84
85 /// Message received from a subscription
86 ///
87 /// Conveys messages from subscriptions to the client. Contains the
88 /// message content and associated metadata.
89 Message {
90 /// Destination the message was sent to
91 destination: String,
92 /// Unique message identifier
93 message_id: String,
94 /// Subscription identifier this message relates to
95 subscription: String,
96 /// All headers included in the message
97 headers: Vec<(String, String)>,
98 /// Optional message body
99 #[debug(with = "pretty_bytes")]
100 body: Option<Vec<u8>>,
101 },
102
103 /// Receipt confirmation
104 ///
105 /// Sent from the server to the client once a server has successfully
106 /// processed a client frame that requested a receipt.
107 Receipt {
108 /// Receipt identifier matching the client's receipt request
109 receipt_id: String,
110 },
111
112 /// Error notification
113 ///
114 /// Sent when something goes wrong. After sending an Error,
115 /// the server will close the connection.
116 Error {
117 /// Optional error message
118 message: Option<String>,
119 /// Optional error body with additional details
120 #[debug(with = "pretty_bytes")]
121 body: Option<Vec<u8>>,
122 },
123}
124
125// TODO tidy this lot up with traits?
126impl Message<FromServer> {
127 // fn to_frame<'a>(&'a self) -> Frame<'a> {
128 // unimplemented!()
129 // }
130
131 /// Convert a Frame into a Message<FromServer>
132 ///
133 /// This internal method handles conversion from the low-level Frame
134 /// representation to the high-level Message representation.
135 fn from_frame(frame: Frame) -> anyhow::Result<Message<FromServer>> {
136 frame.to_server_msg()
137 }
138}
139
140/// A STOMP message sent by the client
141///
142/// This enum represents all possible message types that can be sent to
143/// a STOMP server according to the STOMP 1.2 specification.
144///
145/// See the [STOMP 1.2 Specification](https://stomp.github.io/stomp-specification-1.2.html)
146/// for more detailed information about each message type.
147#[derive(Debug, Clone)]
148pub enum ToServer {
149 /// Connection request message
150 ///
151 /// First frame sent to the server to establish a STOMP session.
152 Connect {
153 /// Protocol versions the client supports
154 accept_version: String,
155 /// Virtual host the client wants to connect to
156 host: String,
157 /// Optional authentication username
158 login: Option<String>,
159 /// Optional authentication password
160 passcode: Option<String>,
161 /// Optional heartbeat configuration (cx, cy)
162 heartbeat: Option<(u32, u32)>,
163 },
164
165 /// Send a message to a destination in the messaging system
166 ///
167 /// Used to send a message to a specific destination like a queue or topic.
168 Send {
169 /// Destination to send the message to
170 destination: String,
171 /// Optional transaction identifier
172 transaction: Option<String>,
173 /// Optional additional headers to include
174 headers: Option<Vec<(String, String)>>,
175 /// Optional message body
176 body: Option<Vec<u8>>,
177 },
178
179 /// Register to listen to a given destination
180 ///
181 /// Creates a subscription to receive messages from a specific destination.
182 Subscribe {
183 /// Destination to subscribe to
184 destination: String,
185 /// Client-generated subscription identifier
186 id: String,
187 /// Optional acknowledgment mode
188 ack: Option<AckMode>,
189 },
190
191 /// Remove an existing subscription
192 ///
193 /// Cancels a subscription so the client stops receiving messages from it.
194 Unsubscribe {
195 /// Subscription identifier to unsubscribe from
196 id: String,
197 },
198
199 /// Acknowledge consumption of a message from a subscription
200 ///
201 /// Used with 'client' or 'client-individual' acknowledgment modes to
202 /// confirm successful processing of a message.
203 Ack {
204 /// Message or subscription identifier to acknowledge
205 id: String,
206 /// Optional transaction identifier
207 transaction: Option<String>,
208 },
209
210 /// Notify the server that the client did not consume the message
211 ///
212 /// Used with 'client' or 'client-individual' acknowledgment modes to
213 /// indicate that a message could not be processed successfully.
214 Nack {
215 /// Message or subscription identifier to negative-acknowledge
216 id: String,
217 /// Optional transaction identifier
218 transaction: Option<String>,
219 },
220
221 /// Start a transaction
222 ///
223 /// Begins a new transaction that can group multiple STOMP operations.
224 Begin {
225 /// Client-generated transaction identifier
226 transaction: String,
227 },
228
229 /// Commit an in-progress transaction
230 ///
231 /// Completes a transaction and applies all its operations.
232 Commit {
233 /// Transaction identifier to commit
234 transaction: String,
235 },
236
237 /// Roll back an in-progress transaction
238 ///
239 /// Cancels a transaction and rolls back all its operations.
240 Abort {
241 /// Transaction identifier to abort
242 transaction: String,
243 },
244
245 /// Gracefully disconnect from the server
246 ///
247 /// Cleanly ends the STOMP session. Clients MUST NOT send any more
248 /// frames after the DISCONNECT frame is sent.
249 Disconnect {
250 /// Optional receipt request
251 receipt: Option<String>,
252 },
253}
254
255/// Acknowledgment modes for STOMP subscriptions
256///
257/// Controls how messages should be acknowledged when received through a subscription.
258#[derive(Debug, Clone, Copy)]
259pub enum AckMode {
260 /// Auto acknowledgment (the default if not specified)
261 ///
262 /// The client does not need to send ACK frames; the server will
263 /// assume the client received the message as soon as it is sent.
264 Auto,
265
266 /// Client acknowledgment
267 ///
268 /// The client must send an ACK frame for each message received.
269 /// An ACK acknowledges all messages received so far on the connection.
270 Client,
271
272 /// Client individual acknowledgment
273 ///
274 /// The client must send an ACK frame for each individual message.
275 /// Only the individual message referenced in the ACK is acknowledged.
276 ClientIndividual,
277}
278
279impl Message<ToServer> {
280 /// Convert this message to a low-level Frame
281 ///
282 /// This method converts the high-level Message to the low-level Frame
283 /// representation needed for serialization.
284 fn to_frame<'a>(&'a self) -> Frame<'a> {
285 // Create a frame from the message content
286 let mut frame = self.content.to_frame();
287 // Add any extra headers to the frame
288 frame.add_extra_headers(&self.extra_headers);
289 frame
290 }
291
292 /// Converts the message to a [Frame] and then serializes the frame as bytes. This is useful
293 /// for implementors that need to implement the [Transport](transport::Transport) trait.
294 pub fn into_bytes(self) -> Bytes {
295 let mut bytes_mut = BytesMut::new();
296 let frame = self.to_frame();
297 frame.serialize(&mut bytes_mut);
298 Bytes::from_owner(bytes_mut)
299 }
300
301 /// Convert a Frame into a Message<ToServer>
302 ///
303 /// This internal method handles conversion from the low-level Frame
304 /// representation to the high-level Message representation.
305 #[allow(dead_code)]
306 fn from_frame(frame: Frame) -> anyhow::Result<Message<ToServer>> {
307 frame.to_client_msg()
308 }
309}
310
311/// Implement From<ToServer> for Message<ToServer> to allow easy conversion
312///
313/// This allows ToServer enum variants to be easily converted to a Message
314/// with empty extra_headers, which is a common need when sending messages.
315impl From<ToServer> for Message<ToServer> {
316 /// Convert a ToServer enum into a Message<ToServer>
317 ///
318 /// This creates a Message with the given content and empty extra_headers.
319 fn from(content: ToServer) -> Message<ToServer> {
320 Message {
321 content,
322 extra_headers: vec![],
323 }
324 }
325}