1#![allow(unused_imports, dead_code)]
2
3#[macro_use]
4extern crate derive_more;
5#[macro_use]
6extern crate log;
7
8use std::future::Future;
9use std::pin::Pin;
10use std::task::{Context, Poll};
11use std::time::Duration;
12
13use actix_utils::oneshot;
14use amqp_codec::protocol::{Disposition, Handle, Milliseconds, Open};
15use bytes::Bytes;
16use bytestring::ByteString;
17use uuid::Uuid;
18
19mod cell;
20pub mod client;
21mod connection;
22mod errors;
23mod hb;
24mod rcvlink;
25pub mod sasl;
26pub mod server;
27mod service;
28mod session;
29mod sndlink;
30
31pub use self::connection::{Connection, ConnectionController};
32pub use self::errors::AmqpTransportError;
33pub use self::rcvlink::{ReceiverLink, ReceiverLinkBuilder};
34pub use self::session::Session;
35pub use self::sndlink::{SenderLink, SenderLinkBuilder};
36
37pub enum Delivery {
38 Resolved(Result<Disposition, AmqpTransportError>),
39 Pending(oneshot::Receiver<Result<Disposition, AmqpTransportError>>),
40 Gone,
41}
42
43type DeliveryPromise = oneshot::Sender<Result<Disposition, AmqpTransportError>>;
44
45impl Future for Delivery {
46 type Output = Result<Disposition, AmqpTransportError>;
47
48 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
49 if let Delivery::Pending(ref mut receiver) = *self {
50 return match Pin::new(receiver).poll(cx) {
51 Poll::Ready(Ok(r)) => Poll::Ready(r.map(|state| state)),
52 Poll::Pending => Poll::Pending,
53 Poll::Ready(Err(e)) => {
54 trace!("delivery oneshot is gone: {:?}", e);
55 Poll::Ready(Err(AmqpTransportError::Disconnected))
56 }
57 };
58 }
59
60 let old_v = ::std::mem::replace(&mut *self, Delivery::Gone);
61 if let Delivery::Resolved(r) = old_v {
62 return match r {
63 Ok(state) => Poll::Ready(Ok(state)),
64 Err(e) => Poll::Ready(Err(e)),
65 };
66 }
67 panic!("Polling Delivery after it was polled as ready is an error.");
68 }
69}
70
71#[derive(Debug, Clone)]
73pub struct Configuration {
74 pub max_frame_size: u32,
75 pub channel_max: usize,
76 pub idle_time_out: Option<Milliseconds>,
77 pub hostname: Option<ByteString>,
78}
79
80impl Default for Configuration {
81 fn default() -> Self {
82 Self::new()
83 }
84}
85
86impl Configuration {
87 pub fn new() -> Self {
89 Configuration {
90 max_frame_size: std::u16::MAX as u32,
91 channel_max: 1024,
92 idle_time_out: Some(120000),
93 hostname: None,
94 }
95 }
96
97 pub fn channel_max(&mut self, num: u16) -> &mut Self {
103 self.channel_max = num as usize;
104 self
105 }
106
107 pub fn max_frame_size(&mut self, size: u32) -> &mut Self {
111 self.max_frame_size = size;
112 self
113 }
114
115 pub fn get_max_frame_size(&self) -> usize {
117 self.max_frame_size as usize
118 }
119
120 pub fn idle_timeout(&mut self, timeout: u32) -> &mut Self {
124 self.idle_time_out = Some(timeout as Milliseconds);
125 self
126 }
127
128 pub fn hostname(&mut self, hostname: &str) -> &mut Self {
132 self.hostname = Some(ByteString::from(hostname));
133 self
134 }
135
136 pub fn to_open(&self) -> Open {
138 Open {
139 container_id: ByteString::from(Uuid::new_v4().to_simple().to_string()),
140 hostname: self.hostname.clone(),
141 max_frame_size: self.max_frame_size,
142 channel_max: self.channel_max as u16,
143 idle_time_out: self.idle_time_out,
144 outgoing_locales: None,
145 incoming_locales: None,
146 offered_capabilities: None,
147 desired_capabilities: None,
148 properties: None,
149 }
150 }
151
152 pub(crate) fn timeout(&self) -> Option<Duration> {
153 self.idle_time_out
154 .map(|v| Duration::from_millis(((v as f32) * 0.8) as u64))
155 }
156}
157
158impl<'a> From<&'a Open> for Configuration {
159 fn from(open: &'a Open) -> Self {
160 Configuration {
161 max_frame_size: open.max_frame_size,
162 channel_max: open.channel_max as usize,
163 idle_time_out: open.idle_time_out,
164 hostname: open.hostname.clone(),
165 }
166 }
167}