stomp_rs/
client.rs

1mod handler;
2mod interceptor;
3mod internal;
4
5use crate::client::internal::InternalClient;
6use crate::protocol::frame::{Abort, Ack, Begin, Commit, Nack, Send, Subscribe};
7use crate::protocol::{Frame, ServerCommand};
8use std::error::Error;
9use std::fmt::{Display, Formatter};
10use std::sync::Arc;
11use tokio::sync::mpsc::Sender;
12use uuid::Uuid;
13
14type ReceiptId = String;
15
16pub struct Transaction {
17    transaction_id: String,
18    internal_client: Arc<InternalClient>,
19}
20
21impl Transaction {
22    fn new(transaction_id: String, internal_client: Arc<InternalClient>) -> Self {
23        Self {
24            transaction_id,
25            internal_client,
26        }
27    }
28
29    pub async fn send(&self, send: Send) -> Result<(), Box<dyn Error>> {
30        self.internal_client
31            .send(send.header("transaction", self.transaction_id.clone()))
32            .await
33    }
34
35    pub async fn ack(&self, ack: Ack) -> Result<(), Box<dyn Error>> {
36        self.internal_client
37            .ack(ack.transaction(self.transaction_id.clone()))
38            .await
39    }
40
41    pub async fn nack(&self, nack: Nack) -> Result<(), Box<dyn Error>> {
42        self.internal_client
43            .nack(nack.transaction(self.transaction_id.clone()))
44            .await
45    }
46
47    pub async fn commit(&self) -> Result<(), Box<dyn Error>> {
48        self.internal_client
49            .emit(
50                Commit::new(self.transaction_id.clone())
51                    .receipt(Uuid::new_v4().to_string())
52                    .into(),
53            )
54            .await
55    }
56
57    pub async fn abort(&self) -> Result<(), Box<dyn Error>> {
58        self.internal_client
59            .emit(
60                Abort::new(self.transaction_id.clone())
61                    .receipt(Uuid::new_v4().to_string())
62                    .into(),
63            )
64            .await
65    }
66}
67
68pub struct Client {
69    internal_client: Arc<InternalClient>,
70}
71
72pub struct ClientBuilder {
73    host: String,
74    heartbeat: Option<(u32, u32)>,
75}
76
77impl ClientBuilder {
78    pub fn new<A: Into<String>>(host: A) -> Self {
79        Self {
80            host: host.into(),
81            heartbeat: None,
82        }
83    }
84
85    pub fn heartbeat(mut self, client_interval: u32, server_interval: u32) -> Self {
86        self.heartbeat = Some((client_interval, server_interval));
87
88        self
89    }
90}
91
92#[derive(Debug)]
93pub enum ClientError {
94    ReceiptTimeout(String),
95    Nack(String),
96    ConnectionError(Option<Box<dyn Error>>),
97}
98
99impl Display for ClientError {
100    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
101        write!(f, "Client error")
102    }
103}
104
105impl Error for ClientError {}
106
107impl Client {
108    pub async fn connect(builder: ClientBuilder) -> Result<Self, Box<dyn Error>> {
109        let internal_client = InternalClient::connect(builder).await?;
110
111        Ok(Self {
112            internal_client: Arc::new(internal_client),
113        })
114    }
115
116    pub async fn subscribe(
117        &self,
118        subscribe: Subscribe,
119        sender: Sender<Frame<ServerCommand>>,
120    ) -> Result<(), Box<dyn Error>> {
121        self.internal_client.subscribe(subscribe, sender).await
122    }
123
124    pub async fn send(&self, send: Send) -> Result<(), Box<dyn Error>> {
125        self.internal_client.send(send).await
126    }
127
128    pub async fn ack(&self, ack: Ack) -> Result<(), Box<dyn Error>> {
129        self.internal_client.ack(ack).await
130    }
131
132    pub async fn nack(&self, nack: Nack) -> Result<(), Box<dyn Error>> {
133        self.internal_client.nack(nack).await
134    }
135
136    pub async fn begin(&self) -> Result<Transaction, Box<dyn Error>> {
137        let transaction_id = Uuid::new_v4();
138        let receipt_id = Uuid::new_v4();
139
140        self.internal_client
141            .emit(
142                Begin::new(transaction_id.to_string())
143                    .receipt(receipt_id.to_string())
144                    .into(),
145            )
146            .await?;
147
148        Ok(Transaction {
149            transaction_id: transaction_id.to_string(),
150            internal_client: Arc::clone(&self.internal_client),
151        })
152    }
153}
154
155impl Frame<ServerCommand> {
156    pub fn ack(&self) -> Option<Ack> {
157        if let ServerCommand::Message = self.command {
158            if let Some(ack) = self.headers.get("ack") {
159                return Some(Ack::new(ack));
160            }
161        }
162
163        None
164    }
165
166    pub fn nack(&self) -> Option<Nack> {
167        if let ServerCommand::Message = self.command {
168            if let Some(ack) = self.headers.get("ack") {
169                return Some(Nack::new(ack));
170            }
171        }
172
173        None
174    }
175}