async_stomp/lib.rs
//! async-stomp - A library for asynchronous streaming of STOMP messages
2//!
3//! This library provides an async Rust implementation of the STOMP (Simple/Streaming Text Oriented Messaging Protocol),
4//! built on the tokio stack. It allows for creating STOMP clients that can connect to message brokers,
5//! subscribe to destinations, send messages, and receive messages asynchronously.
6//!
7//! The primary types exposed by this library are:
8//! - `client::Connector` - For establishing connections to STOMP servers
9//! - `client::Subscriber` - For creating subscription messages
10//! - `Message<T>` - For representing STOMP protocol messages
11//! - `ToServer` - Enum of all message types that can be sent to a server
12//! - `FromServer` - Enum of all message types that can be received from a server
13
14use custom_debug_derive::Debug as CustomDebug;
15use frame::Frame;
16
17pub mod client;
18mod frame;
19
20/// Type alias for library results that use anyhow::Error
21pub(crate) type Result<T> = std::result::Result<T, anyhow::Error>;
22
/// A STOMP frame in typed form
///
/// Pairs the typed `content` of a frame (in this crate either a message
/// sent to the server or one received from it) with every header that was
/// present in the frame but is not part of that content type.
///
/// `Clone` is derived, so a message can be duplicated whenever its content
/// type `T` is itself `Clone`.
#[derive(Debug, Clone)]
pub struct Message<T> {
    /// The typed message content, e.g. a `ToServer` or `FromServer` value
    pub content: T,
    /// Headers found in the frame that the content type does not model,
    /// kept as raw `(name, value)` byte pairs to avoid unnecessary conversions
    pub extra_headers: Vec<(Vec<u8>, Vec<u8>)>,
}
37
/// Debug-formatting helper for optional binary bodies
///
/// Referenced from `#[debug(with = "pretty_bytes")]` field attributes, which
/// is why it takes the field by `&Option<Vec<u8>>` rather than a slice.
///
/// When bytes are present they are written as lossily decoded UTF-8 text
/// (invalid sequences become U+FFFD); when absent the literal text `None`
/// is written.
fn pretty_bytes(b: &Option<Vec<u8>>, f: &mut std::fmt::Formatter) -> std::fmt::Result {
    match b {
        Some(bytes) => f.write_str(&String::from_utf8_lossy(bytes)),
        None => f.write_str("None"),
    }
}
49
50/// A STOMP message sent from the server
51///
52/// This enum represents all possible message types that can be received from
53/// a STOMP server according to the STOMP 1.2 specification.
54///
55/// See the [STOMP 1.2 Specification](https://stomp.github.io/stomp-specification-1.2.html)
56/// for more detailed information about each message type.
57#[derive(CustomDebug, Clone)]
58pub enum FromServer {
59 /// Connection established acknowledgment
60 ///
61 /// Sent by the server in response to a successful CONNECT/STOMP frame.
62 #[doc(hidden)] // The user shouldn't need to know about this one
63 Connected {
64 /// Protocol version
65 version: String,
66 /// Optional session identifier
67 session: Option<String>,
68 /// Optional server identifier
69 server: Option<String>,
70 /// Optional heartbeat settings
71 heartbeat: Option<String>,
72 },
73
74 /// Message received from a subscription
75 ///
76 /// Conveys messages from subscriptions to the client. Contains the
77 /// message content and associated metadata.
78 Message {
79 /// Destination the message was sent to
80 destination: String,
81 /// Unique message identifier
82 message_id: String,
83 /// Subscription identifier this message relates to
84 subscription: String,
85 /// All headers included in the message
86 headers: Vec<(String, String)>,
87 /// Optional message body
88 #[debug(with = "pretty_bytes")]
89 body: Option<Vec<u8>>,
90 },
91
92 /// Receipt confirmation
93 ///
94 /// Sent from the server to the client once a server has successfully
95 /// processed a client frame that requested a receipt.
96 Receipt {
97 /// Receipt identifier matching the client's receipt request
98 receipt_id: String,
99 },
100
101 /// Error notification
102 ///
103 /// Sent when something goes wrong. After sending an Error,
104 /// the server will close the connection.
105 Error {
106 /// Optional error message
107 message: Option<String>,
108 /// Optional error body with additional details
109 #[debug(with = "pretty_bytes")]
110 body: Option<Vec<u8>>,
111 },
112}
113
114// TODO tidy this lot up with traits?
115impl Message<FromServer> {
116 // fn to_frame<'a>(&'a self) -> Frame<'a> {
117 // unimplemented!()
118 // }
119
120 /// Convert a Frame into a Message<FromServer>
121 ///
122 /// This internal method handles conversion from the low-level Frame
123 /// representation to the high-level Message representation.
124 fn from_frame(frame: Frame) -> Result<Message<FromServer>> {
125 frame.to_server_msg()
126 }
127}
128
129/// A STOMP message sent by the client
130///
131/// This enum represents all possible message types that can be sent to
132/// a STOMP server according to the STOMP 1.2 specification.
133///
134/// See the [STOMP 1.2 Specification](https://stomp.github.io/stomp-specification-1.2.html)
135/// for more detailed information about each message type.
136#[derive(Debug, Clone)]
137pub enum ToServer {
138 /// Connection request message
139 ///
140 /// First frame sent to the server to establish a STOMP session.
141 #[doc(hidden)] // The user shouldn't need to know about this one
142 Connect {
143 /// Protocol versions the client supports
144 accept_version: String,
145 /// Virtual host the client wants to connect to
146 host: String,
147 /// Optional authentication username
148 login: Option<String>,
149 /// Optional authentication password
150 passcode: Option<String>,
151 /// Optional heartbeat configuration (cx, cy)
152 heartbeat: Option<(u32, u32)>,
153 },
154
155 /// Send a message to a destination in the messaging system
156 ///
157 /// Used to send a message to a specific destination like a queue or topic.
158 Send {
159 /// Destination to send the message to
160 destination: String,
161 /// Optional transaction identifier
162 transaction: Option<String>,
163 /// Optional additional headers to include
164 headers: Option<Vec<(String, String)>>,
165 /// Optional message body
166 body: Option<Vec<u8>>,
167 },
168
169 /// Register to listen to a given destination
170 ///
171 /// Creates a subscription to receive messages from a specific destination.
172 Subscribe {
173 /// Destination to subscribe to
174 destination: String,
175 /// Client-generated subscription identifier
176 id: String,
177 /// Optional acknowledgment mode
178 ack: Option<AckMode>,
179 },
180
181 /// Remove an existing subscription
182 ///
183 /// Cancels a subscription so the client stops receiving messages from it.
184 Unsubscribe {
185 /// Subscription identifier to unsubscribe from
186 id: String,
187 },
188
189 /// Acknowledge consumption of a message from a subscription
190 ///
191 /// Used with 'client' or 'client-individual' acknowledgment modes to
192 /// confirm successful processing of a message.
193 Ack {
194 /// Message or subscription identifier to acknowledge
195 id: String,
196 /// Optional transaction identifier
197 transaction: Option<String>,
198 },
199
200 /// Notify the server that the client did not consume the message
201 ///
202 /// Used with 'client' or 'client-individual' acknowledgment modes to
203 /// indicate that a message could not be processed successfully.
204 Nack {
205 /// Message or subscription identifier to negative-acknowledge
206 id: String,
207 /// Optional transaction identifier
208 transaction: Option<String>,
209 },
210
211 /// Start a transaction
212 ///
213 /// Begins a new transaction that can group multiple STOMP operations.
214 Begin {
215 /// Client-generated transaction identifier
216 transaction: String,
217 },
218
219 /// Commit an in-progress transaction
220 ///
221 /// Completes a transaction and applies all its operations.
222 Commit {
223 /// Transaction identifier to commit
224 transaction: String,
225 },
226
227 /// Roll back an in-progress transaction
228 ///
229 /// Cancels a transaction and rolls back all its operations.
230 Abort {
231 /// Transaction identifier to abort
232 transaction: String,
233 },
234
235 /// Gracefully disconnect from the server
236 ///
237 /// Cleanly ends the STOMP session. Clients MUST NOT send any more
238 /// frames after the DISCONNECT frame is sent.
239 Disconnect {
240 /// Optional receipt request
241 receipt: Option<String>,
242 },
243}
244
/// Acknowledgment modes for STOMP subscriptions
///
/// Selects how messages delivered through a subscription have to be
/// acknowledged by the client. The type is a plain, fieldless enum and
/// therefore `Copy` and comparable with `==`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AckMode {
    /// Auto acknowledgment (the default if not specified)
    ///
    /// The client does not need to send ACK frames; the server will
    /// assume the client received the message as soon as it is sent.
    Auto,

    /// Client acknowledgment
    ///
    /// The client must send ACK frames for the messages it processes.
    /// Per STOMP 1.2 these ACKs are cumulative: one ACK covers the named
    /// message and all messages sent to the subscription before it.
    Client,

    /// Client individual acknowledgment
    ///
    /// The client must send an ACK frame for each individual message.
    /// Only the individual message referenced in the ACK is acknowledged.
    ClientIndividual,
}
268
269impl Message<ToServer> {
270 /// Convert this message to a low-level Frame
271 ///
272 /// This method converts the high-level Message to the low-level Frame
273 /// representation needed for serialization.
274 fn to_frame(&self) -> Frame {
275 // Create a frame from the message content
276 let mut frame = self.content.to_frame();
277 // Add any extra headers to the frame
278 frame.add_extra_headers(&self.extra_headers);
279 frame
280 }
281
282 /// Convert a Frame into a Message<ToServer>
283 ///
284 /// This internal method handles conversion from the low-level Frame
285 /// representation to the high-level Message representation.
286 #[allow(dead_code)]
287 fn from_frame(frame: Frame) -> Result<Message<ToServer>> {
288 frame.to_client_msg()
289 }
290}
291
292/// Implement From<ToServer> for Message<ToServer> to allow easy conversion
293///
294/// This allows ToServer enum variants to be easily converted to a Message
295/// with empty extra_headers, which is a common need when sending messages.
296impl From<ToServer> for Message<ToServer> {
297 /// Convert a ToServer enum into a Message<ToServer>
298 ///
299 /// This creates a Message with the given content and empty extra_headers.
300 fn from(content: ToServer) -> Message<ToServer> {
301 Message {
302 content,
303 extra_headers: vec![],
304 }
305 }
306}