1mod handler;
2mod interceptor;
3mod internal;
4
5use crate::client::internal::InternalClient;
6use crate::protocol::frame::{Abort, Ack, Begin, Commit, Nack, Send, Subscribe};
7use crate::protocol::{Frame, ServerCommand};
8use std::error::Error;
9use std::fmt::{Display, Formatter};
10use std::sync::Arc;
11use tokio::sync::mpsc::Sender;
12use uuid::Uuid;
13
14type ReceiptId = String;
15
16pub struct Transaction {
17 transaction_id: String,
18 internal_client: Arc<InternalClient>,
19}
20
21impl Transaction {
22 fn new(transaction_id: String, internal_client: Arc<InternalClient>) -> Self {
23 Self {
24 transaction_id,
25 internal_client,
26 }
27 }
28
29 pub async fn send(&self, send: Send) -> Result<(), Box<dyn Error>> {
30 self.internal_client
31 .send(send.header("transaction", self.transaction_id.clone()))
32 .await
33 }
34
35 pub async fn ack(&self, ack: Ack) -> Result<(), Box<dyn Error>> {
36 self.internal_client
37 .ack(ack.transaction(self.transaction_id.clone()))
38 .await
39 }
40
41 pub async fn nack(&self, nack: Nack) -> Result<(), Box<dyn Error>> {
42 self.internal_client
43 .nack(nack.transaction(self.transaction_id.clone()))
44 .await
45 }
46
47 pub async fn commit(&self) -> Result<(), Box<dyn Error>> {
48 self.internal_client
49 .emit(
50 Commit::new(self.transaction_id.clone())
51 .receipt(Uuid::new_v4().to_string())
52 .into(),
53 )
54 .await
55 }
56
57 pub async fn abort(&self) -> Result<(), Box<dyn Error>> {
58 self.internal_client
59 .emit(
60 Abort::new(self.transaction_id.clone())
61 .receipt(Uuid::new_v4().to_string())
62 .into(),
63 )
64 .await
65 }
66}
67
68pub struct Client {
69 internal_client: Arc<InternalClient>,
70}
71
72pub struct ClientBuilder {
73 host: String,
74 heartbeat: Option<(u32, u32)>,
75}
76
77impl ClientBuilder {
78 pub fn new<A: Into<String>>(host: A) -> Self {
79 Self {
80 host: host.into(),
81 heartbeat: None,
82 }
83 }
84
85 pub fn heartbeat(mut self, client_interval: u32, server_interval: u32) -> Self {
86 self.heartbeat = Some((client_interval, server_interval));
87
88 self
89 }
90}
91
92#[derive(Debug)]
93pub enum ClientError {
94 ReceiptTimeout(String),
95 Nack(String),
96 ConnectionError(Option<Box<dyn Error>>),
97}
98
99impl Display for ClientError {
100 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
101 write!(f, "Client error")
102 }
103}
104
105impl Error for ClientError {}
106
107impl Client {
108 pub async fn connect(builder: ClientBuilder) -> Result<Self, Box<dyn Error>> {
109 let internal_client = InternalClient::connect(builder).await?;
110
111 Ok(Self {
112 internal_client: Arc::new(internal_client),
113 })
114 }
115
116 pub async fn subscribe(
117 &self,
118 subscribe: Subscribe,
119 sender: Sender<Frame<ServerCommand>>,
120 ) -> Result<(), Box<dyn Error>> {
121 self.internal_client.subscribe(subscribe, sender).await
122 }
123
124 pub async fn send(&self, send: Send) -> Result<(), Box<dyn Error>> {
125 self.internal_client.send(send).await
126 }
127
128 pub async fn ack(&self, ack: Ack) -> Result<(), Box<dyn Error>> {
129 self.internal_client.ack(ack).await
130 }
131
132 pub async fn nack(&self, nack: Nack) -> Result<(), Box<dyn Error>> {
133 self.internal_client.nack(nack).await
134 }
135
136 pub async fn begin(&self) -> Result<Transaction, Box<dyn Error>> {
137 let transaction_id = Uuid::new_v4();
138 let receipt_id = Uuid::new_v4();
139
140 self.internal_client
141 .emit(
142 Begin::new(transaction_id.to_string())
143 .receipt(receipt_id.to_string())
144 .into(),
145 )
146 .await?;
147
148 Ok(Transaction {
149 transaction_id: transaction_id.to_string(),
150 internal_client: Arc::clone(&self.internal_client),
151 })
152 }
153}
154
155impl Frame<ServerCommand> {
156 pub fn ack(&self) -> Option<Ack> {
157 if let ServerCommand::Message = self.command {
158 if let Some(ack) = self.headers.get("ack") {
159 return Some(Ack::new(ack));
160 }
161 }
162
163 None
164 }
165
166 pub fn nack(&self) -> Option<Nack> {
167 if let ServerCommand::Message = self.command {
168 if let Some(ack) = self.headers.get("ack") {
169 return Some(Nack::new(ack));
170 }
171 }
172
173 None
174 }
175}