stomp_agnostic/
lib.rs

1//! # STOMP Agnostic
2//!
3//! `stomp-agnostic` - A transport agnostic library for handling of STOMP messages
4//!
5//! This library exposes STOMP functionality through the [StompHandle] type.
6//! The `StompHandle` needs an implementation of [Transport].
7//!
8//! `stomp-agnostic` is both transport agnostic, and async agnostic.
9//!
10//! # (Non-) Performance
11//! This crate does not have a specific focus on performance.
12//!
13//! # Transport agnostic
14//! Other STOMP libraries, like [async-stomp](https://github.com/snaggen/async-stomp),
15//! [wstomp](https://crates.io/crates/wstomp), etc. focus on one, or a few, specific transport
16//! methods such as TCP or WebSockets. This crate on the other hand, exposes a trait [Transport](transport::Transport)
17//! and the implementor is responsible for the transport. This makes this crate compatible with
18//! e.g. [tokio-tungstenite](https://crates.io/crates/tokio-tungstenite), but you have to implement
19//! the `Transport` trait yourself, there is nothing implemented for `tokio-tungstenite` out-of-the box.
20//!
21//! # Async agnostic
22//! This crate does not depend on a specific async stack. Bring your own.
23
24use bytes::{Bytes, BytesMut};
25use custom_debug_derive::Debug as CustomDebug;
26use frame::Frame;
27pub use handle::StompHandle;
28pub use transport::{ReadError, ReadResponse, Response, Transport, WriteError};
29
30mod frame;
31mod handle;
32mod transport;
33
34/// A representation of a STOMP frame
35///
36/// This struct holds the content of a STOMP message (which can be either
37/// a message sent to the server or received from the server) along with
38/// any extra headers that were present in the frame but not required by
39/// the specific message type.
40#[derive(Debug)]
41pub struct Message<T> {
42    /// The message content, which is either a ToServer or FromServer enum
43    pub content: T,
44    /// Headers present in the frame which were not required by the content type
45    /// Stored as raw bytes to avoid unnecessary conversions
46    pub extra_headers: Vec<(Vec<u8>, Vec<u8>)>,
47}
48
49/// Helper function for pretty-printing binary data in debug output
50///
51/// This function converts binary data (Option<Vec<u8>>) to a UTF-8 string
52/// for better readability in debug output.
53fn pretty_bytes(b: &Option<Vec<u8>>, f: &mut std::fmt::Formatter) -> std::fmt::Result {
54    if let Some(v) = b {
55        write!(f, "{}", String::from_utf8_lossy(v))
56    } else {
57        write!(f, "None")
58    }
59}
60
61/// A STOMP message sent from the server
62///
63/// This enum represents all possible message types that can be received from
64/// a STOMP server according to the STOMP 1.2 specification.
65///
66/// See the [STOMP 1.2 Specification](https://stomp.github.io/stomp-specification-1.2.html)
67/// for more detailed information about each message type.
68#[derive(CustomDebug, Clone)]
69pub enum FromServer {
70    /// Connection established acknowledgment
71    ///
72    /// Sent by the server in response to a successful CONNECT/STOMP frame.
73    #[doc(hidden)] // The user shouldn't need to know about this one
74    Connected {
75        /// Protocol version
76        version: String,
77        /// Optional session identifier
78        session: Option<String>,
79        /// Optional server identifier
80        server: Option<String>,
81        /// Optional heartbeat settings
82        heartbeat: Option<String>,
83    },
84
85    /// Message received from a subscription
86    ///
87    /// Conveys messages from subscriptions to the client. Contains the
88    /// message content and associated metadata.
89    Message {
90        /// Destination the message was sent to
91        destination: String,
92        /// Unique message identifier
93        message_id: String,
94        /// Subscription identifier this message relates to
95        subscription: String,
96        /// All headers included in the message
97        headers: Vec<(String, String)>,
98        /// Optional message body
99        #[debug(with = "pretty_bytes")]
100        body: Option<Vec<u8>>,
101    },
102
103    /// Receipt confirmation
104    ///
105    /// Sent from the server to the client once a server has successfully
106    /// processed a client frame that requested a receipt.
107    Receipt {
108        /// Receipt identifier matching the client's receipt request
109        receipt_id: String,
110    },
111
112    /// Error notification
113    ///
114    /// Sent when something goes wrong. After sending an Error,
115    /// the server will close the connection.
116    Error {
117        /// Optional error message
118        message: Option<String>,
119        /// Optional error body with additional details
120        #[debug(with = "pretty_bytes")]
121        body: Option<Vec<u8>>,
122    },
123}
124
125// TODO tidy this lot up with traits?
126impl Message<FromServer> {
127    // fn to_frame<'a>(&'a self) -> Frame<'a> {
128    //     unimplemented!()
129    // }
130
131    /// Convert a Frame into a Message<FromServer>
132    ///
133    /// This internal method handles conversion from the low-level Frame
134    /// representation to the high-level Message representation.
135    fn from_frame(frame: Frame) -> anyhow::Result<Message<FromServer>> {
136        frame.to_server_msg()
137    }
138}
139
140/// A STOMP message sent by the client
141///
142/// This enum represents all possible message types that can be sent to
143/// a STOMP server according to the STOMP 1.2 specification.
144///
145/// See the [STOMP 1.2 Specification](https://stomp.github.io/stomp-specification-1.2.html)
146/// for more detailed information about each message type.
147#[derive(Debug, Clone)]
148pub enum ToServer {
149    /// Connection request message
150    ///
151    /// First frame sent to the server to establish a STOMP session.
152    Connect {
153        /// Protocol versions the client supports
154        accept_version: String,
155        /// Virtual host the client wants to connect to
156        host: String,
157        /// Optional authentication username
158        login: Option<String>,
159        /// Optional authentication password
160        passcode: Option<String>,
161        /// Optional heartbeat configuration (cx, cy)
162        heartbeat: Option<(u32, u32)>,
163    },
164
165    /// Send a message to a destination in the messaging system
166    ///
167    /// Used to send a message to a specific destination like a queue or topic.
168    Send {
169        /// Destination to send the message to
170        destination: String,
171        /// Optional transaction identifier
172        transaction: Option<String>,
173        /// Optional additional headers to include
174        headers: Option<Vec<(String, String)>>,
175        /// Optional message body
176        body: Option<Vec<u8>>,
177    },
178
179    /// Register to listen to a given destination
180    ///
181    /// Creates a subscription to receive messages from a specific destination.
182    Subscribe {
183        /// Destination to subscribe to
184        destination: String,
185        /// Client-generated subscription identifier
186        id: String,
187        /// Optional acknowledgment mode
188        ack: Option<AckMode>,
189    },
190
191    /// Remove an existing subscription
192    ///
193    /// Cancels a subscription so the client stops receiving messages from it.
194    Unsubscribe {
195        /// Subscription identifier to unsubscribe from
196        id: String,
197    },
198
199    /// Acknowledge consumption of a message from a subscription
200    ///
201    /// Used with 'client' or 'client-individual' acknowledgment modes to
202    /// confirm successful processing of a message.
203    Ack {
204        /// Message or subscription identifier to acknowledge
205        id: String,
206        /// Optional transaction identifier
207        transaction: Option<String>,
208    },
209
210    /// Notify the server that the client did not consume the message
211    ///
212    /// Used with 'client' or 'client-individual' acknowledgment modes to
213    /// indicate that a message could not be processed successfully.
214    Nack {
215        /// Message or subscription identifier to negative-acknowledge
216        id: String,
217        /// Optional transaction identifier
218        transaction: Option<String>,
219    },
220
221    /// Start a transaction
222    ///
223    /// Begins a new transaction that can group multiple STOMP operations.
224    Begin {
225        /// Client-generated transaction identifier
226        transaction: String,
227    },
228
229    /// Commit an in-progress transaction
230    ///
231    /// Completes a transaction and applies all its operations.
232    Commit {
233        /// Transaction identifier to commit
234        transaction: String,
235    },
236
237    /// Roll back an in-progress transaction
238    ///
239    /// Cancels a transaction and rolls back all its operations.
240    Abort {
241        /// Transaction identifier to abort
242        transaction: String,
243    },
244
245    /// Gracefully disconnect from the server
246    ///
247    /// Cleanly ends the STOMP session. Clients MUST NOT send any more
248    /// frames after the DISCONNECT frame is sent.
249    Disconnect {
250        /// Optional receipt request
251        receipt: Option<String>,
252    },
253}
254
255/// Acknowledgment modes for STOMP subscriptions
256///
257/// Controls how messages should be acknowledged when received through a subscription.
258#[derive(Debug, Clone, Copy)]
259pub enum AckMode {
260    /// Auto acknowledgment (the default if not specified)
261    ///
262    /// The client does not need to send ACK frames; the server will
263    /// assume the client received the message as soon as it is sent.
264    Auto,
265
266    /// Client acknowledgment
267    ///
268    /// The client must send an ACK frame for each message received.
269    /// An ACK acknowledges all messages received so far on the connection.
270    Client,
271
272    /// Client individual acknowledgment
273    ///
274    /// The client must send an ACK frame for each individual message.
275    /// Only the individual message referenced in the ACK is acknowledged.
276    ClientIndividual,
277}
278
279impl Message<ToServer> {
280    /// Convert this message to a low-level Frame
281    ///
282    /// This method converts the high-level Message to the low-level Frame
283    /// representation needed for serialization.
284    fn to_frame<'a>(&'a self) -> Frame<'a> {
285        // Create a frame from the message content
286        let mut frame = self.content.to_frame();
287        // Add any extra headers to the frame
288        frame.add_extra_headers(&self.extra_headers);
289        frame
290    }
291
292    /// Converts the message to a [Frame] and then serializes the frame as bytes. This is useful
293    /// for implementors that need to implement the [Transport](transport::Transport) trait.
294    pub fn into_bytes(self) -> Bytes {
295        let mut bytes_mut = BytesMut::new();
296        let frame = self.to_frame();
297        frame.serialize(&mut bytes_mut);
298        Bytes::from_owner(bytes_mut)
299    }
300
301    /// Convert a Frame into a Message<ToServer>
302    ///
303    /// This internal method handles conversion from the low-level Frame
304    /// representation to the high-level Message representation.
305    #[allow(dead_code)]
306    fn from_frame(frame: Frame) -> anyhow::Result<Message<ToServer>> {
307        frame.to_client_msg()
308    }
309}
310
311/// Implement From<ToServer> for Message<ToServer> to allow easy conversion
312///
313/// This allows ToServer enum variants to be easily converted to a Message
314/// with empty extra_headers, which is a common need when sending messages.
315impl From<ToServer> for Message<ToServer> {
316    /// Convert a ToServer enum into a Message<ToServer>
317    ///
318    /// This creates a Message with the given content and empty extra_headers.
319    fn from(content: ToServer) -> Message<ToServer> {
320        Message {
321            content,
322            extra_headers: vec![],
323        }
324    }
325}