amiquip 0.4.2

Pure Rust RabbitMQ client
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
use crate::errors::*;
use crate::{Confirm, ConfirmPayload, Return};
use amq_protocol::frame::AMQPFrame;
use amq_protocol::protocol::basic::AMQPMethod as AmqpBasic;
use amq_protocol::protocol::basic::CancelOk;
use amq_protocol::protocol::channel::AMQPMethod as AmqpChannel;
use amq_protocol::protocol::channel::CloseOk as ChannelCloseOk;
use amq_protocol::protocol::confirm::AMQPMethod as AmqpConfirm;
use amq_protocol::protocol::connection::AMQPMethod as AmqpConnection;
use amq_protocol::protocol::connection::Close as ConnectionClose;
use amq_protocol::protocol::connection::CloseOk as ConnectionCloseOk;
use amq_protocol::protocol::exchange::AMQPMethod as AmqpExchange;
use amq_protocol::protocol::queue::AMQPMethod as AmqpQueue;
use amq_protocol::protocol::{AMQPClass, AMQPHardError};
use crossbeam_channel::{Sender, TrySendError};
use log::{debug, error, trace, warn};
use snafu::OptionExt;
use std::collections::hash_map::Entry;

use super::content_collector::CollectorResult;
use super::{
    Channel0Slot, ChannelMessage, ChannelSlot, ConnectionBlockedNotification, ConsumerMessage,
    Inner,
};

// Clippy warns about ConnectionState::Steady being much larger than the other variants, but we
// expect ConnectionState to be in the Steady case almost all the time.
#[allow(clippy::large_enum_variant)]
pub(super) enum ConnectionState {
    Steady(Channel0Slot),
    ServerClosing(ConnectionClose),
    ClientException,
    ClientClosed,
}

fn slot_remove(inner: &mut Inner, channel_id: u16) -> Result<ChannelSlot> {
    inner
        .chan_slots
        .remove(channel_id)
        .context(ReceivedFrameWithBogusChannelIdSnafu { channel_id })
}

fn slot_get(inner: &mut Inner, channel_id: u16) -> Result<&ChannelSlot> {
    inner
        .chan_slots
        .get(channel_id)
        .context(ReceivedFrameWithBogusChannelIdSnafu { channel_id })
}

fn slot_get_mut(inner: &mut Inner, channel_id: u16) -> Result<&mut ChannelSlot> {
    inner
        .chan_slots
        .get_mut(channel_id)
        .context(ReceivedFrameWithBogusChannelIdSnafu { channel_id })
}

fn send<T: Send + Sync + 'static>(tx: &Sender<T>, item: T) -> Result<()> {
    // See comment in ChannelSlot::new() about the bound size of the control
    // channel. If we're sending to a consumer channel, they are not bounded
    // and will not return Full.
    match tx.try_send(item) {
        Ok(()) => Ok(()),
        Err(TrySendError::Full(_)) => {
            error!("internal error - bounded channel is unexpectedly full");
            FrameUnexpectedSnafu.fail()
        }
        Err(TrySendError::Disconnected(_)) => {
            error!("internal error - channel client dropped without being disconnected");
            EventLoopClientDroppedSnafu.fail()
        }
    }
}

// When we set up a return listener, it's just a crossbeam channel. If it gets dropped,
// we don't want to error; just start discarding returned messages.
fn try_send_return(slot: &mut ChannelSlot, return_: Return) {
    let return_ = if let Some(tx) = &slot.return_handler {
        match tx.try_send(return_) {
            Ok(()) => return,
            Err(TrySendError::Full(return_)) | Err(TrySendError::Disconnected(return_)) => {
                slot.return_handler = None;
                return_
            }
        }
    } else {
        return_
    };
    warn!("discarding returned data {:?}", return_);
}

// When we set up a pub confirm listener, it's just a crossbeam channel. If it gets dropped,
// we don't want to error; just start discarding acks/nacks
fn try_send_confirm(slot: &mut ChannelSlot, confirm: Confirm) {
    let confirm = if let Some(tx) = &slot.pub_confirm_handler {
        match tx.try_send(confirm) {
            Ok(()) => return,
            Err(TrySendError::Full(confirm)) | Err(TrySendError::Disconnected(confirm)) => {
                slot.pub_confirm_handler = None;
                confirm
            }
        }
    } else {
        confirm
    };
    warn!("discarding returned data {:?}", confirm);
}

// When we set up a blocked connection listener, it's just a crossbeam channel. If it gets
// dropped, we don't want to error; just start discarding blocked notifications.
fn try_send_blocked(slot: &mut Channel0Slot, note: ConnectionBlockedNotification) {
    if let Some(tx) = &slot.blocked_tx {
        match tx.try_send(note) {
            Ok(()) => (),
            Err(_) => {
                slot.blocked_tx = None;
            }
        }
    }
}

impl ConnectionState {
    fn client_exception(
        &mut self,
        inner: &mut Inner,
        reply_code: AMQPHardError,
        reply_text: String,
    ) -> Result<()> {
        error!("{} - closing connection", reply_text);
        let close = ConnectionClose {
            reply_code: reply_code.get_id(),
            reply_text,
            class_id: 0,
            method_id: 0,
        };
        inner.push_method(0, AmqpConnection::Close(close));
        inner.seal_writes();
        *self = ConnectionState::ClientException;
        Ok(())
    }

    pub(super) fn process(&mut self, inner: &mut Inner, frame: AMQPFrame) -> Result<()> {
        // bail out if we shouldn't be getting frames
        let ch0_slot = match self {
            ConnectionState::Steady(ch0_slot) => ch0_slot,
            ConnectionState::ClientException => return Ok(()),
            ConnectionState::ServerClosing(_) | ConnectionState::ClientClosed => {
                return FrameUnexpectedSnafu.fail();
            }
        };

        match frame {
            // Server-sent heartbeat
            AMQPFrame::Heartbeat(0) => {
                // nothing to do here; IoLoop already updated heartbeat timer when it
                // received data on the socket
                debug!("received heartbeat");
            }
            // We never expect to see a protocl header (we send it to begin the connection)
            // or a heartbeat on a non-0 channel.
            AMQPFrame::ProtocolHeader | AMQPFrame::Heartbeat(_) => return FrameUnexpectedSnafu.fail(),
            // Server-initiated connection close.
            AMQPFrame::Method(0, AMQPClass::Connection(AmqpConnection::Close(close))) => {
                inner.push_method(0, AmqpConnection::CloseOk(ConnectionCloseOk {}));
                inner.seal_writes();
                let reply_code = close.reply_code;
                let message = close.reply_text.clone();
                let make_err = || Error::ServerClosedConnection {
                    code: reply_code,
                    message: message.clone(),
                };
                *self = ConnectionState::ServerClosing(close);

                for (_, mut slot) in inner.chan_slots.drain() {
                    send(&slot.tx, Err(make_err()))?;
                    for (_, tx) in slot.consumers.drain() {
                        send(&tx, ConsumerMessage::ServerClosedConnection(make_err()))?;
                    }
                }
            }
            // Server ack for client-initiated connection close.
            AMQPFrame::Method(0, AMQPClass::Connection(AmqpConnection::CloseOk(close_ok))) => {
                ch0_slot
                    .common
                    .tx
                    .send(Ok(ChannelMessage::Method(AMQPClass::Connection(
                        AmqpConnection::CloseOk(close_ok),
                    ))))
                    .map_err(|_| Error::EventLoopClientDropped)?;
                *self = ConnectionState::ClientClosed;

                for (_, mut slot) in inner.chan_slots.drain() {
                    send(&slot.tx, Err(Error::ClientClosedConnection))?;
                    for (_, tx) in slot.consumers.drain() {
                        send(&tx, ConsumerMessage::ClientClosedConnection)?;
                    }
                }
            }
            // Server is blocking publishes due to an alarm on its side (e.g., low mem)
            AMQPFrame::Method(0, AMQPClass::Connection(AmqpConnection::Blocked(blocked))) => {
                warn!("server has blocked connection; reason = {}", blocked.reason);
                let note = ConnectionBlockedNotification::Blocked(blocked.reason);
                try_send_blocked(ch0_slot, note);
            }
            // Server has unblocked publishes
            AMQPFrame::Method(0, AMQPClass::Connection(AmqpConnection::Unblocked(_))) => {
                warn!("server has unblocked connection");
                let note = ConnectionBlockedNotification::Unblocked;
                try_send_blocked(ch0_slot, note);
            }
            // Reject all other expected channel 0 methods
            AMQPFrame::Method(0, other) => {
                let text = format!("do not know how to handle channel 0 method {:?}", other);
                self.client_exception(inner, AMQPHardError::NOTIMPLEMENTED, text)?;
            }
            // Reject content frames on channel 0.
            AMQPFrame::Header(0, _, _) | AMQPFrame::Body(0, _) => {
                let text = format!("received illegal channel 0 frame {:?}", frame);
                self.client_exception(inner, AMQPHardError::NOTALLOWED, text)?;
            }
            // Server-initiated channel close.
            AMQPFrame::Method(n, AMQPClass::Channel(AmqpChannel::Close(close))) => {
                warn!("server closing channel {}: {:?}", n, close);
                let mut slot = slot_remove(inner, n)?;
                let make_err = || Error::ServerClosedChannel {
                    channel_id: n,
                    code: close.reply_code,
                    message: close.reply_text.clone(),
                };
                send(&slot.tx, Err(make_err()))?;
                for (_, tx) in slot.consumers.drain() {
                    send(&tx, ConsumerMessage::ServerClosedChannel(make_err()))?;
                }
                inner.push_method(n, AmqpChannel::CloseOk(ChannelCloseOk {}));
            }
            // Server ack for client-initiated channel close.
            AMQPFrame::Method(n, AMQPClass::Channel(AmqpChannel::CloseOk(close_ok))) => {
                // Closing is inherently racy; if we and the server both send a Close at
                // the same time, we might see the server Close and then get a CloseOk, but
                // we will have removed the slot when we got the close. It is therefore not
                // an error to get a CloseOk for a nonexistent slot, since the server is
                // confirming that a channel is gone (and we don't have it anymore anyway).
                if let Ok(mut slot) = slot_remove(inner, n) {
                    send(
                        &slot.tx,
                        Ok(ChannelMessage::Method(AMQPClass::Channel(
                            AmqpChannel::CloseOk(close_ok),
                        ))),
                    )?;
                    for (_, tx) in slot.consumers.drain() {
                        send(&tx, ConsumerMessage::ClientClosedChannel)?;
                    }
                }
            }
            // Server ack for consume request.
            AMQPFrame::Method(n, AMQPClass::Basic(AmqpBasic::ConsumeOk(consume_ok))) => {
                let consumer_tag = consume_ok.consumer_tag;
                let slot = slot_get_mut(inner, n)?;
                match slot.consumers.entry(consumer_tag.clone()) {
                    Entry::Occupied(_) => {
                        return DuplicateConsumerTagSnafu {
                            channel_id: n,
                            consumer_tag,
                        }
                        .fail();
                    }
                    Entry::Vacant(entry) => {
                        let (tx, rx) = crossbeam_channel::unbounded();
                        entry.insert(tx);
                        send(&slot.tx, Ok(ChannelMessage::ConsumeOk(consumer_tag, rx)))?;
                    }
                }
            }
            // Server-initiated consumer cancel.
            AMQPFrame::Method(n, AMQPClass::Basic(AmqpBasic::Cancel(cancel))) => {
                let consumer_tag = cancel.consumer_tag;
                let slot = slot_get_mut(inner, n)?;
                if let Some(tx) = slot.consumers.remove(&consumer_tag) {
                    send(&tx, ConsumerMessage::ServerCancelled)?;
                }
                if !cancel.nowait {
                    inner.push_method(n, AmqpBasic::CancelOk(CancelOk { consumer_tag }));
                }
            }
            // Server ack for client-initiated consumer cancel.
            AMQPFrame::Method(n, AMQPClass::Basic(AmqpBasic::CancelOk(cancel_ok))) => {
                let slot = slot_get_mut(inner, n)?;
                let consumer = slot.consumers.remove(&cancel_ok.consumer_tag);
                send(
                    &slot.tx,
                    Ok(ChannelMessage::Method(AMQPClass::Basic(
                        AmqpBasic::CancelOk(cancel_ok),
                    ))),
                )?;
                if let Some(tx) = consumer {
                    send(&tx, ConsumerMessage::ClientCancelled)?;
                }
            }
            // Server beginning delivery of content to a consumer.
            AMQPFrame::Method(n, AMQPClass::Basic(AmqpBasic::Deliver(deliver))) => {
                let slot = slot_get_mut(inner, n)?;
                slot.collector.collect_deliver(deliver)?;
            }
            // Server beginning return of undeliverable content.
            AMQPFrame::Method(n, AMQPClass::Basic(AmqpBasic::Return(return_))) => {
                let slot = slot_get_mut(inner, n)?;
                slot.collector.collect_return(return_)?;
            }
            // Server ack for get (message incoming).
            AMQPFrame::Method(n, AMQPClass::Basic(AmqpBasic::GetOk(get_ok))) => {
                let slot = slot_get_mut(inner, n)?;
                slot.collector.collect_get(get_ok)?;
            }
            // Server ack for get (no message).
            AMQPFrame::Method(n, AMQPClass::Basic(AmqpBasic::GetEmpty(_))) => {
                let slot = slot_get(inner, n)?;
                send(&slot.tx, Ok(ChannelMessage::GetOk(Box::new(None))))?;
            }
            // Server ack for publish (publisher confirmation)
            AMQPFrame::Method(n, AMQPClass::Basic(AmqpBasic::Ack(ack))) => {
                let slot = slot_get_mut(inner, n)?;
                let confirm = ConfirmPayload {
                    delivery_tag: ack.delivery_tag,
                    multiple: ack.multiple,
                };
                try_send_confirm(slot, Confirm::Ack(confirm));
            }
            // Server nack for publish (publisher confirmation)
            AMQPFrame::Method(n, AMQPClass::Basic(AmqpBasic::Nack(nack))) => {
                let slot = slot_get_mut(inner, n)?;
                let confirm = ConfirmPayload {
                    delivery_tag: nack.delivery_tag,
                    multiple: nack.multiple,
                };
                try_send_confirm(slot, Confirm::Nack(confirm));
            }
            // Generic ack messages we send back to the caller.
            AMQPFrame::Method(n, method @ AMQPClass::Basic(AmqpBasic::QosOk(_)))
            | AMQPFrame::Method(n, method @ AMQPClass::Basic(AmqpBasic::RecoverOk(_)))
            | AMQPFrame::Method(n, method @ AMQPClass::Channel(AmqpChannel::OpenOk(_)))
            | AMQPFrame::Method(n, method @ AMQPClass::Confirm(AmqpConfirm::SelectOk(_)))
            | AMQPFrame::Method(n, method @ AMQPClass::Exchange(AmqpExchange::DeclareOk(_)))
            | AMQPFrame::Method(n, method @ AMQPClass::Exchange(AmqpExchange::DeleteOk(_)))
            | AMQPFrame::Method(n, method @ AMQPClass::Exchange(AmqpExchange::BindOk(_)))
            | AMQPFrame::Method(n, method @ AMQPClass::Exchange(AmqpExchange::UnbindOk(_)))
            | AMQPFrame::Method(n, method @ AMQPClass::Queue(AmqpQueue::DeclareOk(_)))
            | AMQPFrame::Method(n, method @ AMQPClass::Queue(AmqpQueue::DeleteOk(_)))
            | AMQPFrame::Method(n, method @ AMQPClass::Queue(AmqpQueue::BindOk(_)))
            | AMQPFrame::Method(n, method @ AMQPClass::Queue(AmqpQueue::PurgeOk(_)))
            | AMQPFrame::Method(n, method @ AMQPClass::Queue(AmqpQueue::UnbindOk(_))) => {
                let slot = slot_get(inner, n)?;
                trace!(
                    "trying to send method to client for channel {}: {:?}",
                    n,
                    method
                );
                send(&slot.tx, Ok(ChannelMessage::Method(method)))?;
            }
            // Methods we do not handle
            AMQPFrame::Method(n, method @ AMQPClass::Access(_))
            | AMQPFrame::Method(n, method @ AMQPClass::Channel(AmqpChannel::Flow(_)))
            | AMQPFrame::Method(n, method @ AMQPClass::Channel(AmqpChannel::FlowOk(_)))
            | AMQPFrame::Method(n, method @ AMQPClass::Tx(_)) => {
                let text = format!(
                    "do not know how to handle channel {} method {:?}",
                    n, method
                );
                self.client_exception(inner, AMQPHardError::NOTIMPLEMENTED, text)?;
            }
            // Methods that are illegal coming from the server
            AMQPFrame::Method(n, method @ AMQPClass::Basic(AmqpBasic::Qos(_)))
            | AMQPFrame::Method(n, method @ AMQPClass::Basic(AmqpBasic::Consume(_)))
            | AMQPFrame::Method(n, method @ AMQPClass::Basic(AmqpBasic::Get(_)))
            | AMQPFrame::Method(n, method @ AMQPClass::Basic(AmqpBasic::Publish(_)))
            | AMQPFrame::Method(n, method @ AMQPClass::Basic(AmqpBasic::Recover(_)))
            | AMQPFrame::Method(n, method @ AMQPClass::Basic(AmqpBasic::RecoverAsync(_)))
            | AMQPFrame::Method(n, method @ AMQPClass::Basic(AmqpBasic::Reject(_)))
            | AMQPFrame::Method(n, method @ AMQPClass::Channel(AmqpChannel::Open(_)))
            | AMQPFrame::Method(n, method @ AMQPClass::Confirm(AmqpConfirm::Select(_)))
            | AMQPFrame::Method(n, method @ AMQPClass::Connection(_))
            | AMQPFrame::Method(n, method @ AMQPClass::Exchange(AmqpExchange::Declare(_)))
            | AMQPFrame::Method(n, method @ AMQPClass::Exchange(AmqpExchange::Delete(_)))
            | AMQPFrame::Method(n, method @ AMQPClass::Exchange(AmqpExchange::Bind(_)))
            | AMQPFrame::Method(n, method @ AMQPClass::Exchange(AmqpExchange::Unbind(_)))
            | AMQPFrame::Method(n, method @ AMQPClass::Queue(AmqpQueue::Declare(_)))
            | AMQPFrame::Method(n, method @ AMQPClass::Queue(AmqpQueue::Delete(_)))
            | AMQPFrame::Method(n, method @ AMQPClass::Queue(AmqpQueue::Bind(_)))
            | AMQPFrame::Method(n, method @ AMQPClass::Queue(AmqpQueue::Purge(_)))
            | AMQPFrame::Method(n, method @ AMQPClass::Queue(AmqpQueue::Unbind(_))) => {
                let text = format!("illegal channel {} method {:?}", n, method);
                self.client_exception(inner, AMQPHardError::NOTALLOWED, text)?;
            }
            // Server sending content header as part of a deliver.
            AMQPFrame::Header(n, _, header) => {
                let slot = slot_get_mut(inner, n)?;
                if let Some(collected) = slot.collector.collect_header(*header)? {
                    match collected {
                        CollectorResult::Delivery((consumer_tag, delivery)) => {
                            let tx =
                                slot.consumers
                                    .get(&consumer_tag)
                                    .context(UnknownConsumerTagSnafu {
                                        channel_id: n,
                                        consumer_tag,
                                    })?;
                            send(tx, ConsumerMessage::Delivery(delivery))?;
                        }
                        CollectorResult::Return(return_) => {
                            try_send_return(slot, return_);
                        }
                        CollectorResult::Get(get) => {
                            send(&slot.tx, Ok(ChannelMessage::GetOk(Box::new(Some(get)))))?;
                        }
                    }
                }
            }
            // Server sending content body as part of a deliver.
            AMQPFrame::Body(n, body) => {
                let slot = slot_get_mut(inner, n)?;
                if let Some(collected) = slot.collector.collect_body(body)? {
                    match collected {
                        CollectorResult::Delivery((consumer_tag, delivery)) => {
                            let tx =
                                slot.consumers
                                    .get(&consumer_tag)
                                    .context(UnknownConsumerTagSnafu {
                                        channel_id: n,
                                        consumer_tag,
                                    })?;
                            send(tx, ConsumerMessage::Delivery(delivery))?;
                        }
                        CollectorResult::Return(return_) => {
                            try_send_return(slot, return_);
                        }
                        CollectorResult::Get(get) => {
                            send(&slot.tx, Ok(ChannelMessage::GetOk(Box::new(Some(get)))))?;
                        }
                    }
                }
            }
        }
        Ok(())
    }
}