async_stomp/
lib.rs

1//! tokio-stomp - A library for asynchronous streaming of STOMP messages
2
3use custom_debug_derive::Debug as CustomDebug;
4use frame::Frame;
5
6pub mod client;
7mod frame;
8
9pub(crate) type Result<T> = std::result::Result<T, anyhow::Error>;
10
/// A representation of a STOMP frame: typed content plus any leftover headers.
///
/// `T` is the typed payload; within this file it is instantiated with
/// [`FromServer`] and [`ToServer`]. `Clone` is derived so that a message can be
/// duplicated whenever its content type is itself `Clone` (both payload enums are).
#[derive(Debug, Clone)]
pub struct Message<T> {
    /// The message content
    pub content: T,
    /// Headers present in the frame which were not required by the content,
    /// kept as pairs of raw bytes
    pub extra_headers: Vec<(Vec<u8>, Vec<u8>)>,
}
19
/// Debug helper that prints an optional byte buffer as text.
///
/// `Some(bytes)` is written as lossily-decoded UTF-8 (invalid sequences become
/// U+FFFD); `None` is written as the literal `None`.
///
/// The `&Option<Vec<u8>>` parameter mirrors the type of the `body` fields that
/// name this function in their `#[debug(with = "pretty_bytes")]` attribute.
fn pretty_bytes(b: &Option<Vec<u8>>, f: &mut std::fmt::Formatter) -> std::fmt::Result {
    match b {
        Some(bytes) => f.write_str(&String::from_utf8_lossy(bytes)),
        None => f.write_str("None"),
    }
}
27
28/// A STOMP message sent from the server
29/// See the [Spec](https://stomp.github.io/stomp-specification-1.2.html) for more information
30#[derive(CustomDebug, Clone)]
31pub enum FromServer {
32    #[doc(hidden)] // The user shouldn't need to know about this one
33    Connected {
34        version: String,
35        session: Option<String>,
36        server: Option<String>,
37        heartbeat: Option<String>,
38    },
39    /// Conveys messages from subscriptions to the client
40    Message {
41        destination: String,
42        message_id: String,
43        subscription: String,
44        headers: Vec<(String, String)>,
45        #[debug(with = "pretty_bytes")]
46        body: Option<Vec<u8>>,
47    },
48    /// Sent from the server to the client once a server has successfully
49    /// processed a client frame that requests a receipt
50    Receipt { receipt_id: String },
51    /// Something went wrong. After sending an Error, the server will close the connection
52    Error {
53        message: Option<String>,
54        #[debug(with = "pretty_bytes")]
55        body: Option<Vec<u8>>,
56    },
57}
58
59// TODO tidy this lot up with traits?
60impl Message<FromServer> {
61    // fn to_frame<'a>(&'a self) -> Frame<'a> {
62    //     unimplemented!()
63    // }
64
65    // TODO make this undead
66    fn from_frame(frame: Frame) -> Result<Message<FromServer>> {
67        frame.to_server_msg()
68    }
69}
70
71/// A STOMP message sent by the client.
72/// See the [Spec](https://stomp.github.io/stomp-specification-1.2.html) for more information
73#[derive(Debug, Clone)]
74pub enum ToServer {
75    #[doc(hidden)] // The user shouldn't need to know about this one
76    Connect {
77        accept_version: String,
78        host: String,
79        login: Option<String>,
80        passcode: Option<String>,
81        heartbeat: Option<(u32, u32)>,
82    },
83    /// Send a message to a destination in the messaging system
84    Send {
85        destination: String,
86        transaction: Option<String>,
87        headers: Option<Vec<(String, String)>>,
88        body: Option<Vec<u8>>,
89    },
90    /// Register to listen to a given destination
91    Subscribe {
92        destination: String,
93        id: String,
94        ack: Option<AckMode>,
95    },
96    /// Remove an existing subscription
97    Unsubscribe { id: String },
98    /// Acknowledge consumption of a message from a subscription using
99    /// 'client' or 'client-individual' acknowledgment.
100    Ack {
101        // TODO ack and nack should be automatic?
102        id: String,
103        transaction: Option<String>,
104    },
105    /// Notify the server that the client did not consume the message
106    Nack {
107        id: String,
108        transaction: Option<String>,
109    },
110    /// Start a transaction
111    Begin { transaction: String },
112    /// Commit an in-progress transaction
113    Commit { transaction: String },
114    /// Roll back an in-progress transaction
115    Abort { transaction: String },
116    /// Gracefully disconnect from the server
117    /// Clients MUST NOT send any more frames after the DISCONNECT frame is sent.
118    Disconnect { receipt: Option<String> },
119}
120
/// Acknowledgment mode a client can request when subscribing
/// (the `ack` field of [`ToServer::Subscribe`]).
///
/// The variant descriptions below paraphrase the STOMP 1.2 specification; how
/// each variant is written to the wire is decided outside this block.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AckMode {
    /// The spec's `auto` mode: the server assumes a message was received as
    /// soon as it has been sent, so the client sends no ACK frames.
    Auto,
    /// The spec's `client` mode: the client must ACK the messages it processes,
    /// and an ACK is cumulative.
    Client,
    /// The spec's `client-individual` mode: like `client`, except that ACK/NACK
    /// frames are not cumulative.
    ClientIndividual,
}
127
128impl Message<ToServer> {
129    fn to_frame(&self) -> Frame {
130        let mut frame = self.content.to_frame();
131        frame.add_extra_headers(&self.extra_headers);
132        frame
133    }
134    #[allow(dead_code)]
135    fn from_frame(frame: Frame) -> Result<Message<ToServer>> {
136        frame.to_client_msg()
137    }
138}
139
140impl From<ToServer> for Message<ToServer> {
141    fn from(content: ToServer) -> Message<ToServer> {
142        Message {
143            content,
144            extra_headers: vec![],
145        }
146    }
147}