actix_amqp/
lib.rs

1#![allow(unused_imports, dead_code)]
2
3#[macro_use]
4extern crate derive_more;
5#[macro_use]
6extern crate log;
7
8use std::future::Future;
9use std::pin::Pin;
10use std::task::{Context, Poll};
11use std::time::Duration;
12
13use actix_utils::oneshot;
14use amqp_codec::protocol::{Disposition, Handle, Milliseconds, Open};
15use bytes::Bytes;
16use bytestring::ByteString;
17use uuid::Uuid;
18
19mod cell;
20pub mod client;
21mod connection;
22mod errors;
23mod hb;
24mod rcvlink;
25pub mod sasl;
26pub mod server;
27mod service;
28mod session;
29mod sndlink;
30
31pub use self::connection::{Connection, ConnectionController};
32pub use self::errors::AmqpTransportError;
33pub use self::rcvlink::{ReceiverLink, ReceiverLinkBuilder};
34pub use self::session::Session;
35pub use self::sndlink::{SenderLink, SenderLinkBuilder};
36
37pub enum Delivery {
38    Resolved(Result<Disposition, AmqpTransportError>),
39    Pending(oneshot::Receiver<Result<Disposition, AmqpTransportError>>),
40    Gone,
41}
42
43type DeliveryPromise = oneshot::Sender<Result<Disposition, AmqpTransportError>>;
44
45impl Future for Delivery {
46    type Output = Result<Disposition, AmqpTransportError>;
47
48    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
49        if let Delivery::Pending(ref mut receiver) = *self {
50            return match Pin::new(receiver).poll(cx) {
51                Poll::Ready(Ok(r)) => Poll::Ready(r.map(|state| state)),
52                Poll::Pending => Poll::Pending,
53                Poll::Ready(Err(e)) => {
54                    trace!("delivery oneshot is gone: {:?}", e);
55                    Poll::Ready(Err(AmqpTransportError::Disconnected))
56                }
57            };
58        }
59
60        let old_v = ::std::mem::replace(&mut *self, Delivery::Gone);
61        if let Delivery::Resolved(r) = old_v {
62            return match r {
63                Ok(state) => Poll::Ready(Ok(state)),
64                Err(e) => Poll::Ready(Err(e)),
65            };
66        }
67        panic!("Polling Delivery after it was polled as ready is an error.");
68    }
69}
70
71/// Amqp1 transport configuration.
72#[derive(Debug, Clone)]
73pub struct Configuration {
74    pub max_frame_size: u32,
75    pub channel_max: usize,
76    pub idle_time_out: Option<Milliseconds>,
77    pub hostname: Option<ByteString>,
78}
79
80impl Default for Configuration {
81    fn default() -> Self {
82        Self::new()
83    }
84}
85
86impl Configuration {
87    /// Create connection configuration.
88    pub fn new() -> Self {
89        Configuration {
90            max_frame_size: std::u16::MAX as u32,
91            channel_max: 1024,
92            idle_time_out: Some(120000),
93            hostname: None,
94        }
95    }
96
97    /// The channel-max value is the highest channel number that
98    /// may be used on the Connection. This value plus one is the maximum
99    /// number of Sessions that can be simultaneously active on the Connection.
100    ///
101    /// By default channel max value is set to 1024
102    pub fn channel_max(&mut self, num: u16) -> &mut Self {
103        self.channel_max = num as usize;
104        self
105    }
106
107    /// Set max frame size for the connection.
108    ///
109    /// By default max size is set to 64kb
110    pub fn max_frame_size(&mut self, size: u32) -> &mut Self {
111        self.max_frame_size = size;
112        self
113    }
114
115    /// Get max frame size for the connection.
116    pub fn get_max_frame_size(&self) -> usize {
117        self.max_frame_size as usize
118    }
119
120    /// Set idle time-out for the connection in milliseconds
121    ///
122    /// By default idle time-out is set to 120000 milliseconds
123    pub fn idle_timeout(&mut self, timeout: u32) -> &mut Self {
124        self.idle_time_out = Some(timeout as Milliseconds);
125        self
126    }
127
128    /// Set connection hostname
129    ///
130    /// Hostname is not set by default
131    pub fn hostname(&mut self, hostname: &str) -> &mut Self {
132        self.hostname = Some(ByteString::from(hostname));
133        self
134    }
135
136    /// Create `Open` performative for this configuration.
137    pub fn to_open(&self) -> Open {
138        Open {
139            container_id: ByteString::from(Uuid::new_v4().to_simple().to_string()),
140            hostname: self.hostname.clone(),
141            max_frame_size: self.max_frame_size,
142            channel_max: self.channel_max as u16,
143            idle_time_out: self.idle_time_out,
144            outgoing_locales: None,
145            incoming_locales: None,
146            offered_capabilities: None,
147            desired_capabilities: None,
148            properties: None,
149        }
150    }
151
152    pub(crate) fn timeout(&self) -> Option<Duration> {
153        self.idle_time_out
154            .map(|v| Duration::from_millis(((v as f32) * 0.8) as u64))
155    }
156}
157
158impl<'a> From<&'a Open> for Configuration {
159    fn from(open: &'a Open) -> Self {
160        Configuration {
161            max_frame_size: open.max_frame_size,
162            channel_max: open.channel_max as usize,
163            idle_time_out: open.idle_time_out,
164            hostname: open.hostname.clone(),
165        }
166    }
167}