1use amq_protocol::{
2 frame::AMQPFrame,
3 tcp::TcpStream,
4 uri::AMQPUri,
5};
6use mio::{Evented, Poll, PollOpt, Ready, Token};
7use log::{debug, error, trace};
8
9use std::{
10 io,
11 thread::JoinHandle,
12};
13
14use crate::{
15 channel::{Channel, Reply},
16 channels::Channels,
17 confirmation::Confirmation,
18 configuration::Configuration,
19 connection_properties::ConnectionProperties,
20 connection_status::{ConnectionStatus, ConnectionState},
21 error::{Error, ErrorKind},
22 error_handler::ErrorHandler,
23 frames::{Frames, Priority, SendId},
24 io_loop::{IoLoop, IoLoopHandle},
25 registration::Registration,
26 tcp::AMQPUriTcpExt,
27 types::ShortUInt,
28 wait::Wait,
29};
30
31#[derive(Clone, Debug)]
32#[deprecated(note = "use lapin instead")]
33pub struct Connection {
34 configuration: Configuration,
35 status: ConnectionStatus,
36 channels: Channels,
37 registration: Registration,
38 frames: Frames,
39 io_loop: IoLoopHandle,
40 error_handler: ErrorHandler,
41}
42
43impl Default for Connection {
44 fn default() -> Self {
45 let connection = Self {
46 configuration: Configuration::default(),
47 status: ConnectionStatus::default(),
48 channels: Channels::default(),
49 registration: Registration::default(),
50 frames: Frames::default(),
51 io_loop: IoLoopHandle::default(),
52 error_handler: ErrorHandler::default(),
53 };
54
55 connection.channels.create_zero(connection.clone());
56 connection
57 }
58}
59
60impl Evented for Connection {
61 fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
62 self.registration.register(poll, token, interest, opts)
63 }
64
65 fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
66 self.registration.reregister(poll, token, interest, opts)
67 }
68
69 fn deregister(&self, poll: &Poll) -> io::Result<()> {
70 self.registration.deregister(poll)
71 }
72}
73
74impl Connection {
75 #[deprecated(note = "use lapin instead")]
77 pub fn connect(uri: &str, options: ConnectionProperties) -> Confirmation<Connection> {
78 Connect::connect(uri, options)
79 }
80
81 #[deprecated(note = "use lapin instead")]
83 pub fn connect_uri(uri: AMQPUri, options: ConnectionProperties) -> Confirmation<Connection> {
84 Connect::connect(uri, options)
85 }
86
87 #[deprecated(note = "use lapin instead")]
88 pub fn create_channel(&self) -> Confirmation<Channel> {
89 if !self.status.connected() {
90 return Confirmation::new_error(ErrorKind::InvalidConnectionState(self.status.state()).into());
91 }
92 match self.channels.create(self.clone()) {
93 Ok(channel) => channel.channel_open(),
94 Err(error) => Confirmation::new_error(error),
95 }
96 }
97
98 #[deprecated(note = "use lapin instead")]
102 pub fn run(&self) -> Result<(), Error> {
103 self.io_loop.wait()
104 }
105
106 #[deprecated(note = "use lapin instead")]
107 pub fn on_error<E: Fn() + Send + 'static>(&self, handler: Box<E>) {
108 self.error_handler.set_handler(handler);
109 }
110
111 #[deprecated(note = "use lapin instead")]
112 pub fn configuration(&self) -> &Configuration {
113 &self.configuration
114 }
115
116 #[deprecated(note = "use lapin instead")]
117 pub fn status(&self) -> &ConnectionStatus {
118 &self.status
119 }
120
121 pub(crate) fn flow(&self) -> bool {
122 self.channels.flow()
123 }
124
125 #[deprecated(note = "use lapin instead")]
126 pub fn close(&self, reply_code: ShortUInt, reply_text: &str) -> Confirmation<()> {
127 self.channels.get(0).expect("channel 0").connection_close(reply_code, reply_text, 0, 0)
128 }
129
130 pub(crate) fn set_io_loop(&self, io_loop: JoinHandle<Result<(), Error>>) {
131 self.io_loop.register(io_loop);
132 }
133
134 pub(crate) fn drop_pending_frames(&self) {
135 self.frames.drop_pending();
136 }
137
138 fn connector(options: ConnectionProperties) -> impl FnOnce(TcpStream, AMQPUri) -> Result<(Wait<Connection>, IoLoop<TcpStream>), Error> + 'static {
139 move |stream, uri| {
140 let conn = Connection::default();
141 conn.status.set_vhost(&uri.vhost);
142 if let Some(frame_max) = uri.query.frame_max {
143 conn.configuration.set_frame_max(frame_max);
144 }
145 if let Some(channel_max) = uri.query.channel_max {
146 conn.configuration.set_channel_max(channel_max);
147 }
148 if let Some(heartbeat) = uri.query.heartbeat {
149 conn.configuration.set_heartbeat(heartbeat);
150 }
151 conn.send_frame(0, Priority::CRITICAL, AMQPFrame::ProtocolHeader, None)?;
152 let (wait, wait_handle) = Wait::new();
153 conn.set_state(ConnectionState::SentProtocolHeader(wait_handle, uri.authority.userinfo.into(), options));
154 let io_loop = IoLoop::new(conn.clone(), stream)?;
155 Ok((wait, io_loop))
156 }
157 }
158
159 pub(crate) fn set_state(&self, state: ConnectionState) {
160 self.status.set_state(state);
161 }
162
163 pub(crate) fn block(&self) {
164 self.status.block();
165 }
166
167 pub(crate) fn unblock(&self) {
168 self.status.unblock();
169 }
170
171 fn set_readable(&self) -> Result<(), Error> {
172 trace!("connection set readable");
173 self.registration.set_readiness(Ready::readable()).map_err(ErrorKind::IOError)?;
174 Ok(())
175 }
176
177 pub(crate) fn send_frame(&self, channel_id: u16, priority: Priority, frame: AMQPFrame, expected_reply: Option<Reply>) -> Result<Wait<()>, Error> {
178 trace!("connection send_frame; channel_id={}", channel_id);
179 let wait = self.frames.push(channel_id, priority, frame, expected_reply);
180 self.set_readable()?;
181 Ok(wait)
182 }
183
184 pub(crate) fn next_expected_reply(&self, channel_id: u16) -> Option<Reply> {
185 self.frames.next_expected_reply(channel_id)
186 }
187
188 pub(crate) fn next_frame(&self) -> Option<(SendId, AMQPFrame)> {
192 self.frames.pop(self.flow())
193 }
194
195 pub(crate) fn handle_frame(&self, f: AMQPFrame) -> Result<(), Error> {
197 trace!("will handle frame: {:?}", f);
198 match f {
199 AMQPFrame::ProtocolHeader => {
200 error!("error: the client should not receive a protocol header");
201 self.set_error()?;
202 },
203 AMQPFrame::Method(channel_id, method) => {
204 self.channels.receive_method(channel_id, method)?;
205 },
206 AMQPFrame::Heartbeat(_) => {
207 debug!("received heartbeat from server");
208 },
209 AMQPFrame::Header(channel_id, _, header) => {
210 self.channels.handle_content_header_frame(channel_id, header.body_size, header.properties)?;
211 },
212 AMQPFrame::Body(channel_id, payload) => {
213 self.channels.handle_body_frame(channel_id, payload)?;
214 }
215 };
216 Ok(())
217 }
218
219 pub(crate) fn send_heartbeat(&self) -> Result<(), Error> {
220 self.set_readable()?;
221 self.send_frame(0, Priority::CRITICAL, AMQPFrame::Heartbeat(0), None)?;
222 Ok(())
223 }
224
225 pub(crate) fn requeue_frame(&self, send_id: SendId, frame: AMQPFrame) -> Result<(), Error> {
226 self.set_readable()?;
227 self.frames.retry(send_id, frame);
228 Ok(())
229 }
230
231 pub(crate) fn mark_sent(&self, send_id: SendId) {
232 self.frames.mark_sent(send_id);
233 }
234
235 pub(crate) fn remove_channel(&self, channel_id: u16) -> Result<(), Error> {
236 self.frames.clear_expected_replies(channel_id);
237 self.channels.remove(channel_id)
238 }
239
240 pub(crate) fn set_closing(&self) {
241 self.set_state(ConnectionState::Closing);
242 self.channels.set_closing();
243 }
244
245 pub(crate) fn set_closed(&self) -> Result<(), Error> {
246 self.set_state(ConnectionState::Closed);
247 self.channels.set_closed()
248 }
249
250 pub(crate) fn set_error(&self) -> Result<(), Error> {
251 error!("Connection error");
252 self.set_state(ConnectionState::Error);
253 self.channels.set_error()?;
254 self.error_handler.on_error();
255 Ok(())
256 }
257}
258
259#[deprecated(note = "use lapin instead")]
261pub trait Connect {
262 fn connect(self, options: ConnectionProperties) -> Confirmation<Connection> where Self: Sized {
264 self.connect_raw(options).into()
265 }
266
267 fn connect_raw(self, options: ConnectionProperties) -> Result<Wait<Connection>, Error>;
269}
270
271impl Connect for AMQPUri {
272 fn connect_raw(self, options: ConnectionProperties) -> Result<Wait<Connection>, Error> {
273 let (conn, io_loop) = AMQPUriTcpExt::connect(self, Connection::connector(options)).map_err(ErrorKind::IOError)??;
274 io_loop.run()?;
275 Ok(conn)
276 }
277}
278
279impl Connect for &str {
280 fn connect_raw(self, options: ConnectionProperties) -> Result<Wait<Connection>, Error> {
281 let (conn, io_loop) = AMQPUriTcpExt::connect(self, Connection::connector(options)).map_err(ErrorKind::IOError)??;
282 io_loop.run()?;
283 Ok(conn)
284 }
285}
286
#[cfg(test)]
mod tests {
    use env_logger;

    use super::*;
    use crate::BasicProperties;
    use crate::channel_status::ChannelState;
    use crate::consumer::ConsumerSubscriber;
    use crate::message::Delivery;
    use crate::types::ShortString;
    use amq_protocol::protocol::{basic, AMQPClass};
    use amq_protocol::frame::AMQPContentHeader;

    /// Subscriber ignoring everything it is told about.
    #[derive(Clone,Debug,PartialEq)]
    struct DummySubscriber;

    impl ConsumerSubscriber for DummySubscriber {
        fn new_delivery(&self, _delivery: Delivery) {}
        fn drop_prefetched_messages(&self) {}
        fn cancel(&self) {}
    }

    /// Connected connection with one connected channel on which the queue
    /// `queue_name` is registered, `consumer_tag` being registered on that
    /// queue.
    struct ConsumeFixture {
        conn: Connection,
        channel: Channel,
        queue_name: ShortString,
        consumer_tag: ShortString,
    }

    impl ConsumeFixture {
        /// Builds the fixture without any network I/O: states are forced by
        /// hand.
        fn new() -> Self {
            use crate::consumer::Consumer;
            use crate::queue::{Queue, QueueState};

            let _ = env_logger::try_init();

            let conn = Connection::default();
            conn.set_state(ConnectionState::Connected);
            conn.configuration.set_channel_max(2047);
            let channel = conn.channels.create(conn.clone()).unwrap();
            channel.set_state(ChannelState::Connected);
            let queue_name = ShortString::from("consumed");
            let mut queue: QueueState = Queue::new(queue_name.clone(), 0, 0).into();
            let consumer_tag = ShortString::from("consumer-tag");
            let consumer = Consumer::new(consumer_tag.clone(), false, false, false, Box::new(DummySubscriber));
            queue.register_consumer(consumer_tag.clone(), consumer);
            // A missing channel means the setup itself is broken: fail right
            // here instead of silently skipping the queue registration.
            conn.channels
                .get(channel.id())
                .expect("the channel that was just created must be registered")
                .register_queue(queue);

            Self { conn, channel, queue_name, consumer_tag }
        }

        /// `basic.deliver` addressed to the registered consumer.
        fn deliver_frame(&self) -> AMQPFrame {
            AMQPFrame::Method(
                self.channel.id(),
                AMQPClass::Basic(
                    basic::AMQPMethod::Deliver(
                        basic::Deliver {
                            consumer_tag: self.consumer_tag.clone(),
                            delivery_tag: 1,
                            redelivered: false,
                            exchange: "".into(),
                            routing_key: self.queue_name.clone(),
                        }
                    )
                )
            )
        }

        /// Feeds `frame` to the connection, as if it came from the server.
        fn receive(&self, frame: AMQPFrame) {
            self.conn.handle_frame(frame).unwrap();
        }

        /// Checks the current state of the channel.
        fn assert_channel_state(&self, expected_state: ChannelState) {
            assert_eq!(self.channel.status().state(), expected_state);
        }

        /// Delivers the message and checks that the channel now waits for
        /// its content header.
        fn deliver(&self) {
            self.receive(self.deliver_frame());
            self.assert_channel_state(ChannelState::WillReceiveContent(
                Some(self.queue_name.clone()),
                Some(self.consumer_tag.clone())
            ));
        }
    }

    #[test]
    fn basic_consume_small_payload() {
        let fixture = ConsumeFixture::new();

        fixture.deliver();

        // A header announcing a 2 bytes body makes the channel wait for them.
        let header_frame = AMQPFrame::Header(
            fixture.channel.id(),
            60,
            Box::new(AMQPContentHeader {
                class_id: 60,
                weight: 0,
                body_size: 2,
                properties: BasicProperties::default(),
            })
        );
        fixture.receive(header_frame);
        fixture.assert_channel_state(ChannelState::ReceivingContent(
            Some(fixture.queue_name.clone()),
            Some(fixture.consumer_tag.clone()),
            2
        ));

        // Once the whole body is there, the channel is back to connected.
        let body_frame = AMQPFrame::Body(fixture.channel.id(), b"{}".to_vec());
        fixture.receive(body_frame);
        fixture.assert_channel_state(ChannelState::Connected);
    }

    #[test]
    fn basic_consume_empty_payload() {
        let fixture = ConsumeFixture::new();

        fixture.deliver();

        // With an empty body there is nothing to wait for after the header.
        let header_frame = AMQPFrame::Header(
            fixture.channel.id(),
            60,
            Box::new(AMQPContentHeader {
                class_id: 60,
                weight: 0,
                body_size: 0,
                properties: BasicProperties::default(),
            })
        );
        fixture.receive(header_frame);
        fixture.assert_channel_state(ChannelState::Connected);
    }
}