1pub mod client;
18mod client_sm;
19
20use env_logger::Builder;
21use frame::Channel;
22use ironmq_codec::frame;
23use log::{info, error};
24use std::collections::HashMap;
25use std::fmt;
26use std::io::Write;
27use std::time::Instant;
28use tokio::sync::{mpsc, oneshot};
29
/// Boxed, thread-safe (`Send + Sync`) error trait object; the error side of [`Result`].
pub type Error = Box<dyn std::error::Error + Send + Sync>;

/// Shorthand for a `std::result::Result` whose error type is [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
35
/// Error value carrying a numeric code and a descriptive message.
///
/// `Debug` is derived; it renders as `ClientError { code: .., message: ".." }`.
#[derive(Debug)]
pub struct ClientError {
    /// Numeric error code.
    pub code: u16,
    /// Text describing the failure.
    pub message: String,
}

impl fmt::Display for ClientError {
    /// Writes the same text as the `Debug` representation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Delegating keeps the two representations in lock-step without repeating the field list.
        fmt::Debug::fmt(self, f)
    }
}

impl std::error::Error for ClientError {}
61
/// Expands to `Result::Err` holding a boxed `$crate::ClientError`.
///
/// * `$code` - expression used for the `code` field.
/// * `$message` - expression converted with `String::from` for the `message` field.
///
/// An optional trailing comma after the message is accepted.
#[macro_export]
macro_rules! client_error {
    ($code:expr, $message:expr $(,)?) => {
        ::std::result::Result::Err(::std::boxed::Box::new($crate::ClientError {
            code: $code,
            message: ::std::string::String::from($message),
        }))
    };
}
71
/// Boxed, `Send + Sync` callback that takes a `String` and returns a `String`.
// NOTE(review): presumably the callback type carried by `client::Param::Consume` - confirm.
type ConsumeCallback = Box<dyn Fn(String) -> String + Send + Sync>;
73
74pub struct Connection {
77 server_channel: mpsc::Sender<client::Request>,
78}
79
80pub async fn connect(url: String) -> Result<Box<Connection>> {
91 let connection = client::create_connection(url).await?;
92
93 client::sync_call(&connection, frame::AMQPFrame::Header).await?;
94 client::sync_call(
95 &connection,
96 frame::connection_start_ok("guest", "guest", HashMap::new()),
97 )
98 .await?;
99 client::call(&connection, frame::connection_tune_ok(0)).await?;
100
101 Ok(connection)
102}
103
104pub async fn open(connection: &Connection, virtual_host: String) -> Result<()> {
105 client::sync_call(&connection, frame::connection_open(0, virtual_host)).await?;
106
107 Ok(())
108}
109
110pub async fn close(connection: &Connection) -> Result<()> {
111 client::sync_call(&connection, frame::connection_close(0)).await?;
112
113 Ok(())
114}
115
116pub async fn channel_open(connection: &Connection, channel: u16) -> Result<()> {
117 client::sync_call(&connection, frame::channel_open(channel)).await?;
118
119 Ok(())
120}
121
122pub async fn channel_close(connection: &Connection, channel: Channel) -> Result<()> {
123 let (cid, mid) = frame::split_class_method(frame::CHANNEL_CLOSE);
124
125 client::sync_call(
126 &connection,
127 frame::channel_close(channel, 200, "Normal close", cid, mid)
128 ).await?;
129
130 Ok(())
131}
132
133pub async fn exchange_declare(
134 connection: &Connection,
135 channel: u16,
136 exchange_name: &str,
137 exchange_type: &str,
138) -> Result<()> {
139 let frame = frame::exchange_declare(channel, exchange_name.into(), exchange_type.into());
140
141 client::sync_call(&connection, frame).await?;
142
143 Ok(())
144}
145
146pub async fn queue_bind(
147 connection: &Connection,
148 channel: u16,
149 queue_name: &str,
150 exchange_name: &str,
151 routing_key: &str,
152) -> Result<()> {
153 let frame = frame::queue_bind(channel, queue_name.into(), exchange_name.into(), routing_key.into());
154
155 client::sync_call(&connection, frame).await?;
156
157 Ok(())
158}
159
160pub async fn queue_declare(connection: &Connection, channel: u16, queue_name: &str) -> Result<()> {
161 let frame = frame::queue_declare(channel, queue_name.into());
162
163 client::sync_call(&connection, frame).await?;
164
165 Ok(())
166}
167
168pub async fn basic_consume<'a>(
169 connection: &Connection,
170 channel: u16,
171 queue_name: &'a str,
172 consumer_tag: &'a str,
173 cb: fn(String) -> String,
174) -> Result<()> {
175 let frame = frame::basic_consume(channel, queue_name.into(), consumer_tag.into());
176 let (tx, rx) = oneshot::channel();
177
178 connection.server_channel.send(client::Request {
179 param: client::Param::Consume(frame, Box::new(cb)),
180 response: Some(tx)
181 }).await?;
182
183 match rx.await {
184 Ok(()) => Ok(()),
185 Err(_) => client_error!(0, "Channel recv error"),
186 }
187}
188
189pub async fn basic_publish(
190 connection: &Connection,
191 channel: u16,
192 exchange_name: &str,
193 routing_key: &str,
194 payload: String,
195) -> Result<()> {
196 let frame = frame::basic_publish(channel, exchange_name.into(), routing_key.into());
197
198 connection.server_channel.send(client::Request {
199 param: client::Param::Publish(frame, payload.as_bytes().to_vec()),
200 response: None
201 }).await?;
202
203 Ok(())
204}
205
206#[allow(dead_code)]
207async fn publish_bench(connection: &Connection) -> Result<()> {
208 let now = Instant::now();
209 let mut total = 0u32;
210
211 for _ in 0..100_000u32 {
212 basic_publish(&connection, 1, "test".into(), "no-key".into(), "Hello, world".into()).await?;
213 total += 1;
214 }
215
216 println!("{}/100,000 publish takes {} us", total, now.elapsed().as_micros());
217
218 Ok(())
219}
220
221