stomp_agnostic/
lib.rs

1//! # STOMP Agnostic
2//!
3//! `stomp-agnostic` - A transport agnostic library for handling of STOMP messages
4//!
5//! This library exposes STOMP functionality through the [StompHandle](client::StompHandle) type.
6//! The `StompHandle` needs an implementation of [Transport](transport::Transport).
7//!
8//! `stomp-agnostic` is both transport agnostic, and async agnostic.
9//!
10//! # (Non-) Performance
11//! This crate does not have a specific focus on performance.
12//!
13//! # Transport agnostic
14//! Other STOMP libraries, like [async-stomp](https://github.com/snaggen/async-stomp),
15//! [wstomp](https://crates.io/crates/wstomp), etc. focus on one, or a few, specific transport
16//! methods such as TCP or WebSockets. This crate on the other hand, exposes a trait [Transport](transport::Transport)
17//! and the implementor is responsible for the transport. This makes this crate compatible with
18//! e.g. [tokio-tungstenite](https://crates.io/crates/tokio-tungstenite), but you have to implement
19//! the `Transport` trait yourself, there is nothing implemented for `tokio-tungstenite` out-of-the box.
20//!
21//! # Async agnostic
22//! This crate does not depend on a specific async stack. Bring your own.
23
24use bytes::{Bytes, BytesMut};
25use custom_debug_derive::Debug as CustomDebug;
26use frame::Frame;
27
28pub mod client;
29mod frame;
30pub mod transport;
31
32/// A representation of a STOMP frame
33///
34/// This struct holds the content of a STOMP message (which can be either
35/// a message sent to the server or received from the server) along with
36/// any extra headers that were present in the frame but not required by
37/// the specific message type.
38#[derive(Debug)]
39pub struct Message<T> {
40    /// The message content, which is either a ToServer or FromServer enum
41    pub content: T,
42    /// Headers present in the frame which were not required by the content type
43    /// Stored as raw bytes to avoid unnecessary conversions
44    pub extra_headers: Vec<(Vec<u8>, Vec<u8>)>,
45}
46
47/// Helper function for pretty-printing binary data in debug output
48///
49/// This function converts binary data (Option<Vec<u8>>) to a UTF-8 string
50/// for better readability in debug output.
51fn pretty_bytes(b: &Option<Vec<u8>>, f: &mut std::fmt::Formatter) -> std::fmt::Result {
52    if let Some(v) = b {
53        write!(f, "{}", String::from_utf8_lossy(v))
54    } else {
55        write!(f, "None")
56    }
57}
58
59/// A STOMP message sent from the server
60///
61/// This enum represents all possible message types that can be received from
62/// a STOMP server according to the STOMP 1.2 specification.
63///
64/// See the [STOMP 1.2 Specification](https://stomp.github.io/stomp-specification-1.2.html)
65/// for more detailed information about each message type.
66#[derive(CustomDebug, Clone)]
67pub enum FromServer {
68    /// Connection established acknowledgment
69    ///
70    /// Sent by the server in response to a successful CONNECT/STOMP frame.
71    #[doc(hidden)] // The user shouldn't need to know about this one
72    Connected {
73        /// Protocol version
74        version: String,
75        /// Optional session identifier
76        session: Option<String>,
77        /// Optional server identifier
78        server: Option<String>,
79        /// Optional heartbeat settings
80        heartbeat: Option<String>,
81    },
82
83    /// Message received from a subscription
84    ///
85    /// Conveys messages from subscriptions to the client. Contains the
86    /// message content and associated metadata.
87    Message {
88        /// Destination the message was sent to
89        destination: String,
90        /// Unique message identifier
91        message_id: String,
92        /// Subscription identifier this message relates to
93        subscription: String,
94        /// All headers included in the message
95        headers: Vec<(String, String)>,
96        /// Optional message body
97        #[debug(with = "pretty_bytes")]
98        body: Option<Vec<u8>>,
99    },
100
101    /// Receipt confirmation
102    ///
103    /// Sent from the server to the client once a server has successfully
104    /// processed a client frame that requested a receipt.
105    Receipt {
106        /// Receipt identifier matching the client's receipt request
107        receipt_id: String,
108    },
109
110    /// Error notification
111    ///
112    /// Sent when something goes wrong. After sending an Error,
113    /// the server will close the connection.
114    Error {
115        /// Optional error message
116        message: Option<String>,
117        /// Optional error body with additional details
118        #[debug(with = "pretty_bytes")]
119        body: Option<Vec<u8>>,
120    },
121}
122
123// TODO tidy this lot up with traits?
124impl Message<FromServer> {
125    // fn to_frame<'a>(&'a self) -> Frame<'a> {
126    //     unimplemented!()
127    // }
128
129    /// Convert a Frame into a Message<FromServer>
130    ///
131    /// This internal method handles conversion from the low-level Frame
132    /// representation to the high-level Message representation.
133    fn from_frame(frame: Frame) -> anyhow::Result<Message<FromServer>> {
134        frame.to_server_msg()
135    }
136}
137
138/// A STOMP message sent by the client
139///
140/// This enum represents all possible message types that can be sent to
141/// a STOMP server according to the STOMP 1.2 specification.
142///
143/// See the [STOMP 1.2 Specification](https://stomp.github.io/stomp-specification-1.2.html)
144/// for more detailed information about each message type.
145#[derive(Debug, Clone)]
146pub enum ToServer {
147    /// Connection request message
148    ///
149    /// First frame sent to the server to establish a STOMP session.
150    Connect {
151        /// Protocol versions the client supports
152        accept_version: String,
153        /// Virtual host the client wants to connect to
154        host: String,
155        /// Optional authentication username
156        login: Option<String>,
157        /// Optional authentication password
158        passcode: Option<String>,
159        /// Optional heartbeat configuration (cx, cy)
160        heartbeat: Option<(u32, u32)>,
161    },
162
163    /// Send a message to a destination in the messaging system
164    ///
165    /// Used to send a message to a specific destination like a queue or topic.
166    Send {
167        /// Destination to send the message to
168        destination: String,
169        /// Optional transaction identifier
170        transaction: Option<String>,
171        /// Optional additional headers to include
172        headers: Option<Vec<(String, String)>>,
173        /// Optional message body
174        body: Option<Vec<u8>>,
175    },
176
177    /// Register to listen to a given destination
178    ///
179    /// Creates a subscription to receive messages from a specific destination.
180    Subscribe {
181        /// Destination to subscribe to
182        destination: String,
183        /// Client-generated subscription identifier
184        id: String,
185        /// Optional acknowledgment mode
186        ack: Option<AckMode>,
187    },
188
189    /// Remove an existing subscription
190    ///
191    /// Cancels a subscription so the client stops receiving messages from it.
192    Unsubscribe {
193        /// Subscription identifier to unsubscribe from
194        id: String,
195    },
196
197    /// Acknowledge consumption of a message from a subscription
198    ///
199    /// Used with 'client' or 'client-individual' acknowledgment modes to
200    /// confirm successful processing of a message.
201    Ack {
202        /// Message or subscription identifier to acknowledge
203        id: String,
204        /// Optional transaction identifier
205        transaction: Option<String>,
206    },
207
208    /// Notify the server that the client did not consume the message
209    ///
210    /// Used with 'client' or 'client-individual' acknowledgment modes to
211    /// indicate that a message could not be processed successfully.
212    Nack {
213        /// Message or subscription identifier to negative-acknowledge
214        id: String,
215        /// Optional transaction identifier
216        transaction: Option<String>,
217    },
218
219    /// Start a transaction
220    ///
221    /// Begins a new transaction that can group multiple STOMP operations.
222    Begin {
223        /// Client-generated transaction identifier
224        transaction: String,
225    },
226
227    /// Commit an in-progress transaction
228    ///
229    /// Completes a transaction and applies all its operations.
230    Commit {
231        /// Transaction identifier to commit
232        transaction: String,
233    },
234
235    /// Roll back an in-progress transaction
236    ///
237    /// Cancels a transaction and rolls back all its operations.
238    Abort {
239        /// Transaction identifier to abort
240        transaction: String,
241    },
242
243    /// Gracefully disconnect from the server
244    ///
245    /// Cleanly ends the STOMP session. Clients MUST NOT send any more
246    /// frames after the DISCONNECT frame is sent.
247    Disconnect {
248        /// Optional receipt request
249        receipt: Option<String>,
250    },
251}
252
253/// Acknowledgment modes for STOMP subscriptions
254///
255/// Controls how messages should be acknowledged when received through a subscription.
256#[derive(Debug, Clone, Copy)]
257pub enum AckMode {
258    /// Auto acknowledgment (the default if not specified)
259    ///
260    /// The client does not need to send ACK frames; the server will
261    /// assume the client received the message as soon as it is sent.
262    Auto,
263
264    /// Client acknowledgment
265    ///
266    /// The client must send an ACK frame for each message received.
267    /// An ACK acknowledges all messages received so far on the connection.
268    Client,
269
270    /// Client individual acknowledgment
271    ///
272    /// The client must send an ACK frame for each individual message.
273    /// Only the individual message referenced in the ACK is acknowledged.
274    ClientIndividual,
275}
276
277impl Message<ToServer> {
278    /// Convert this message to a low-level Frame
279    ///
280    /// This method converts the high-level Message to the low-level Frame
281    /// representation needed for serialization.
282    fn to_frame<'a>(&'a self) -> Frame<'a> {
283        // Create a frame from the message content
284        let mut frame = self.content.to_frame();
285        // Add any extra headers to the frame
286        frame.add_extra_headers(&self.extra_headers);
287        frame
288    }
289
290    /// Converts the message to a [Frame] and then serializes the frame as bytes. This is useful
291    /// for implementors that need to implement the [Transport](transport::Transport) trait.
292    pub fn into_bytes(self) -> Bytes {
293        let mut bytes_mut = BytesMut::new();
294        let frame = self.to_frame();
295        frame.serialize(&mut bytes_mut);
296        Bytes::from_owner(bytes_mut)
297    }
298
299    /// Convert a Frame into a Message<ToServer>
300    ///
301    /// This internal method handles conversion from the low-level Frame
302    /// representation to the high-level Message representation.
303    #[allow(dead_code)]
304    fn from_frame(frame: Frame) -> anyhow::Result<Message<ToServer>> {
305        frame.to_client_msg()
306    }
307}
308
309/// Implement From<ToServer> for Message<ToServer> to allow easy conversion
310///
311/// This allows ToServer enum variants to be easily converted to a Message
312/// with empty extra_headers, which is a common need when sending messages.
313impl From<ToServer> for Message<ToServer> {
314    /// Convert a ToServer enum into a Message<ToServer>
315    ///
316    /// This creates a Message with the given content and empty extra_headers.
317    fn from(content: ToServer) -> Message<ToServer> {
318        Message {
319            content,
320            extra_headers: vec![],
321        }
322    }
323}