ironmq_client/
lib.rs

1//! Client of ironmq.
2//!
3//! Usage
4//! ```no_run
5//! use ironmq_client::*;
6//!
7//! async fn client() -> Result<()> {
8//!     let conn = connect("127.0.0.1:5672".to_string()).await?;
9//!     open(&conn, "/".into()).await?;
10//!     channel_open(&conn, 1).await?;
11//!     basic_publish(&conn, 1, "exchange", "routing", "Hello".into()).await?;
12//!     close(&conn).await?;
13//!
14//!     Ok(())
15//! }
16//! ```
17pub mod client;
18mod client_sm;
19
20use env_logger::Builder;
21use frame::Channel;
22use ironmq_codec::frame;
23use log::{info, error};
24use std::collections::HashMap;
25use std::fmt;
26use std::io::Write;
27use std::time::Instant;
28use tokio::sync::{mpsc, oneshot};
29
30// TODO expose Channel type and put it here!
31
/// Error type used across the client: any boxed error that is `Send + Sync`.
pub type Error = Box<dyn std::error::Error + Send + Sync>;

/// Result alias whose error variant is fixed to the crate-wide [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
35
/// Error value produced by the client, made of a numeric code and a message.
///
/// `Debug` is derived; it renders as `ClientError { code: .., message: ".." }`, exactly
/// like the previous hand-written implementation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClientError {
    /// Numeric error code.
    pub code: u16,
    /// Text describing the error.
    pub message: String,
}

impl fmt::Display for ClientError {
    /// Writes the same text as the `Debug` representation.
    ///
    /// The output is deliberately kept identical to `Debug` so that existing log lines
    /// and error strings do not change.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Debug::fmt(self, f)
    }
}

impl std::error::Error for ClientError {}
61
/// Expands to `Result::Err` wrapping a boxed `ClientError`.
///
/// The first argument becomes the `code` field; the second is converted with
/// `String::from` and becomes the `message` field. A trailing comma after the message
/// is accepted.
#[macro_export]
macro_rules! client_error {
    ($code:expr, $message:expr $(,)?) => {
        ::std::result::Result::Err(::std::boxed::Box::new($crate::ClientError {
            code: $code,
            message: ::std::string::String::from($message),
        }))
    };
}
71
/// Boxed callback that takes a `String` and returns a `String`; it must be `Send + Sync`.
///
/// NOTE(review): presumably the handler type for consumed messages. It is not referenced
/// in the visible part of this file, so confirm whether it is still needed.
type ConsumeCallback = Box<dyn Fn(String) -> String + Send + Sync>;
73
74/// Represents a connection to AMQP server. It is not a trait since async functions in a trait
75/// are not yet supported.
76pub struct Connection {
77    server_channel: mpsc::Sender<client::Request>,
78}
79
80/// Connect to an AMQP server.
81///
82/// This is async code and wait for the Connection.Tune-Ok message.
83///
84/// ```no_run
85/// async fn connect() -> ironmq_client::Result<()> {
86///     let conn = ironmq_client::connect("127.0.0.1:5672".to_string()).await?;
87///     Ok(())
88/// }
89/// ```
90pub async fn connect(url: String) -> Result<Box<Connection>> {
91    let connection = client::create_connection(url).await?;
92
93    client::sync_call(&connection, frame::AMQPFrame::Header).await?;
94    client::sync_call(
95        &connection,
96        frame::connection_start_ok("guest", "guest", HashMap::new()),
97    )
98    .await?;
99    client::call(&connection, frame::connection_tune_ok(0)).await?;
100
101    Ok(connection)
102}
103
104pub async fn open(connection: &Connection, virtual_host: String) -> Result<()> {
105    client::sync_call(&connection, frame::connection_open(0, virtual_host)).await?;
106
107    Ok(())
108}
109
110pub async fn close(connection: &Connection) -> Result<()> {
111    client::sync_call(&connection, frame::connection_close(0)).await?;
112
113    Ok(())
114}
115
116pub async fn channel_open(connection: &Connection, channel: u16) -> Result<()> {
117    client::sync_call(&connection, frame::channel_open(channel)).await?;
118
119    Ok(())
120}
121
122pub async fn channel_close(connection: &Connection, channel: Channel) -> Result<()> {
123    let (cid, mid) = frame::split_class_method(frame::CHANNEL_CLOSE);
124
125    client::sync_call(
126        &connection,
127        frame::channel_close(channel, 200, "Normal close", cid, mid)
128    ).await?;
129
130    Ok(())
131}
132
133pub async fn exchange_declare(
134    connection: &Connection,
135    channel: u16,
136    exchange_name: &str,
137    exchange_type: &str,
138) -> Result<()> {
139    let frame = frame::exchange_declare(channel, exchange_name.into(), exchange_type.into());
140
141    client::sync_call(&connection, frame).await?;
142
143    Ok(())
144}
145
146pub async fn queue_bind(
147    connection: &Connection,
148    channel: u16,
149    queue_name: &str,
150    exchange_name: &str,
151    routing_key: &str,
152) -> Result<()> {
153    let frame = frame::queue_bind(channel, queue_name.into(), exchange_name.into(), routing_key.into());
154
155    client::sync_call(&connection, frame).await?;
156
157    Ok(())
158}
159
160pub async fn queue_declare(connection: &Connection, channel: u16, queue_name: &str) -> Result<()> {
161    let frame = frame::queue_declare(channel, queue_name.into());
162
163    client::sync_call(&connection, frame).await?;
164
165    Ok(())
166}
167
168pub async fn basic_consume<'a>(
169    connection: &Connection,
170    channel: u16,
171    queue_name: &'a str,
172    consumer_tag: &'a str,
173    cb: fn(String) -> String,
174) -> Result<()> {
175    let frame = frame::basic_consume(channel, queue_name.into(), consumer_tag.into());
176    let (tx, rx) = oneshot::channel();
177
178    connection.server_channel.send(client::Request {
179        param: client::Param::Consume(frame, Box::new(cb)),
180        response: Some(tx)
181    }).await?;
182
183    match rx.await {
184        Ok(()) => Ok(()),
185        Err(_) => client_error!(0, "Channel recv error"),
186    }
187}
188
189pub async fn basic_publish(
190    connection: &Connection,
191    channel: u16,
192    exchange_name: &str,
193    routing_key: &str,
194    payload: String,
195) -> Result<()> {
196    let frame = frame::basic_publish(channel, exchange_name.into(), routing_key.into());
197
198    connection.server_channel.send(client::Request {
199        param: client::Param::Publish(frame, payload.as_bytes().to_vec()),
200        response: None
201    }).await?;
202
203    Ok(())
204}
205
206#[allow(dead_code)]
207async fn publish_bench(connection: &Connection) -> Result<()> {
208    let now = Instant::now();
209    let mut total = 0u32;
210
211    for _ in 0..100_000u32 {
212        basic_publish(&connection, 1, "test".into(), "no-key".into(), "Hello, world".into()).await?;
213        total += 1;
214    }
215
216    println!("{}/100,000 publish takes {} us", total, now.elapsed().as_micros());
217
218    Ok(())
219}
220
221//fn consumer_handler(s: String) -> String {
222//    info!("Handling content {}", s);
223//    "".into()
224//}
225//
226//#[tokio::main]
227//pub async fn main() -> Result<()> {
228//    let mut builder = Builder::from_default_env();
229//
230//    builder
231//        .format_timestamp_millis()
232//        .format(|buf, record| {
233//            writeln!(buf, "{} - [{}] {}:{} {}", buf.timestamp_millis(), record.level(),
234//                record.file().unwrap_or_default(), record.line().unwrap_or_default(), record.args())
235//        })
236//        .init();
237//
238//    let exchange = "test";
239//    let queue = "queue-test";
240//    let consumer_tag = "ctag1";
241//
242//    match connect("127.0.0.1:5672".into()).await {
243//        Ok(connection) => {
244//            info!("Connection is opened");
245//            open(&connection, "/".into()).await?;
246//            channel_open(&connection, 1).await?;
247//
248//            exchange_declare(&connection, 1, exchange, "fanout").await?;
249//            queue_declare(&connection, 1, queue).await?;
250//            queue_bind(&connection, 1, queue, exchange, "").await?;
251//
252//            basic_publish(&connection, 1, exchange, "no-key", "Hey man".into()).await?;
253//
254//            basic_consume(&connection, 1, queue, consumer_tag, consumer_handler).await?;
255//
256//            let (_tx, rx) = tokio::sync::oneshot::channel::<()>();
257//            if let Err(e) = rx.await {
258//                error!("Error {}", e)
259//            }
260//
261//            close(&connection).await?
262//        },
263//        Err(e) =>
264//            error!("Error {}", e)
265//    }
266//
267//    Ok(())
268//}