#[cfg(test)]
mod tests;
use crate::client::{channel_error, ChannelError};
use crate::exchange::manager::{self as em, ExchangeManagerSink, UnbindQueueCommand};
use crate::message::{self, Message};
use crate::queue::Queue;
use crate::{logerr, Result};
use log::{error, info, trace};
use metalmq_codec::codec::Frame;
use metalmq_codec::frame;
use std::collections::{HashSet, VecDeque};
use std::sync::Arc;
use std::task::Poll;
use std::time::Instant;
use tokio::sync::{mpsc, oneshot};
pub type QueueCommandSink = mpsc::Sender<QueueCommand>;
#[derive(Clone, Debug)]
pub struct Tag {
pub consumer_tag: String,
pub delivery_tag: u64,
}
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum QueueCommand {
PublishMessage(Arc<Message>),
AckMessage {
consumer_tag: String,
delivery_tag: u64,
},
ExchangeBound {
exchange_name: String,
},
ExchangeUnbound {
exchange_name: String,
},
StartConsuming {
conn_id: String,
channel: u16,
consumer_tag: String,
no_ack: bool,
exclusive: bool,
sink: FrameSink,
result: oneshot::Sender<Result<()>>,
},
StartDelivering {
consumer_tag: String,
},
CancelConsuming {
consumer_tag: String,
result: oneshot::Sender<bool>,
},
Purge {
result: oneshot::Sender<Result<u32>>,
},
DeleteQueue {
channel: u16,
if_unused: bool,
if_empty: bool,
exchange_manager: ExchangeManagerSink,
result: oneshot::Sender<Result<u32>>,
},
MessageRejected,
Recover,
}
#[derive(Debug)]
pub enum SendResult {
MessageSent,
//QueueEmpty,
NoConsumer,
ConsumerInvalid(String, Arc<Message>),
}
pub type FrameSink = mpsc::Sender<Frame>;
/// Information about the queue instance
struct QueueState {
queue: Queue,
declaring_connection: String,
/// Messages represents the current message queue. It stores Arc references because we need to
/// keep multiple copies of the same message (send that out, waiting for the ack, resending if
/// there is no ack, etc).
messages: VecDeque<Arc<Message>>,
outbox: Outbox,
candidate_consumers: Vec<Consumer>,
consumers: Vec<Consumer>,
next_consumer: usize,
// TODO we need to store the routing key and headers and so on
bound_exchanges: HashSet<String>,
// TODO message metrics, current, current outgoing, etc...
}
// TODO consumer should have a counter of the in-flight messages.
// In that case we don't have the mpsc buffer full in the client
// because we know what is the size of the message buffer. On the
// other hand, we need to implement a receive message buffer in
// client, so we won't have problem when the client callback
// takes too much time to complete. Even though, right now we
// have only mpsc receiver-based client message processing, in
// case of a message buffer, we can make independent the receive
// of messages on client side, and to call the callback.
struct Consumer {
/// The channel the consumer uses
channel: u16,
/// Consumer tag, identifies the consumer
consumer_tag: String,
/// Consumer doesn't need ack, so we can delete sent-out messages promptly
no_ack: bool,
/// If consumer is exclusive consumer
exclusive: bool,
/// Consumer network socket abstraction
sink: FrameSink,
/// The next delivery tag it needs to send out
delivery_tag_counter: u64,
}
// Message delivery
// pick up a consumer - randomly
// pick up the next message from the queue
// send it via the channel and
// mark it as SentOut
// store the timestamp
// set delivery try = 1
// send multiple messages like 10 in a way - max out flight messages
// if a messages is acked, let us remove from the queue
// since we set up a time, if there is a timeout, we can redeliver the message
// (it would be good to choose a different consumer)
// set delivery try += 1
// if delivery try is greater than 5 before, we can drop the message
// (later we can send it to an alternative queue)
//
// Cancel consume
// All messages which are outflight needs to be redelivered to the
// remaining consumers.
pub async fn start(queue: Queue, declaring_connection: String, commands: &mut mpsc::Receiver<QueueCommand>) {
QueueState {
queue,
declaring_connection,
messages: VecDeque::new(),
outbox: Outbox {
outgoing_messages: vec![],
},
candidate_consumers: vec![],
consumers: vec![],
next_consumer: 0,
bound_exchanges: HashSet::new(),
}
.queue_loop(commands)
.await;
}
pub async fn purge(sink: &mpsc::Sender<QueueCommand>) -> Result<u32> {
let (tx, rx) = oneshot::channel();
sink.send(QueueCommand::Purge { result: tx }).await?;
rx.await?
}
impl QueueState {
pub async fn queue_loop(&mut self, commands: &mut mpsc::Receiver<QueueCommand>) {
// TODO we need to store the delivery tags by consumers
// Also we need to mark a message that it is sent, so we need to wait
// for the ack, until that we cannot send new messages out - or depending
// the consuming yes?
loop {
// TODO here we need to check if there are messages what we can send out,
// if yes but there is no consumer yet, we need to remember that we already
// tried and we don't go into an infinite loop. So in this case we can
// safely go to the other branch doing blocking wait.
// If we get a message we can clear this 'state flag'.
//
// FIXME When client sends basic consume, we reply with basic consume ok,
// but right before that we start to deliver messages. That is a race
// condition what we need to send with a coordination.
// Probably we need to have StartConsuming and ConsumerSubscribed messages
// and we need to wait the client state machine to send us an ok message
// back, that it sent out the consume-ok message.
if !self.messages.is_empty() && !self.consumers.is_empty() {
trace!("There are queued messages, sending out one...");
let message = self.messages.pop_front().unwrap();
logerr!(self.send_out_message(message).await);
match poll_command_chan(commands) {
Poll::Pending => (), // no commands, so we can keep sending out messages
Poll::Ready(Some(command)) => {
if let Ok(false) = self.handle_command(command).await {
break;
}
}
Poll::Ready(None) => {
// Command channel is closed, let us exit from the command queue loop.
break;
} // TODO break the loop and cleanup
}
} else {
match commands.recv().await {
Some(command) => {
if let Ok(false) = self.handle_command(command).await {
break;
}
}
None => {
break;
}
}
}
}
}
async fn handle_command(&mut self, command: QueueCommand) -> Result<bool> {
//let start = Instant::now();
match command {
QueueCommand::PublishMessage(message) => {
logerr!(self.send_out_message(message).await);
Ok(true)
}
QueueCommand::AckMessage {
consumer_tag,
delivery_tag,
} => {
self.outbox.on_ack_arrive(consumer_tag, delivery_tag);
Ok(true)
}
QueueCommand::ExchangeBound { exchange_name } => {
self.bound_exchanges.insert(exchange_name);
Ok(true)
}
QueueCommand::ExchangeUnbound { exchange_name } => {
self.bound_exchanges.remove(&exchange_name);
Ok(true)
}
QueueCommand::StartDelivering { consumer_tag } => {
if let Some(p) = self
.candidate_consumers
.iter()
.position(|c| c.consumer_tag.eq(&consumer_tag))
{
let consumer = self.candidate_consumers.remove(p);
self.consumers.push(consumer);
}
Ok(true)
}
QueueCommand::StartConsuming {
conn_id,
channel,
consumer_tag,
no_ack,
exclusive,
sink,
result,
} => {
info!(
"Queue {} is consumed by {} ctag is {}",
self.queue.name, conn_id, consumer_tag
);
if self.queue.exclusive && self.declaring_connection != conn_id {
logerr!(result.send(channel_error(
channel,
frame::BASIC_CONSUME,
ChannelError::ResourceLocked,
"Cannot consume exclusive queue"
)));
} else if exclusive && !self.consumers.is_empty() {
logerr!(result.send(channel_error(
channel,
frame::BASIC_CONSUME,
ChannelError::AccessRefused,
"Queue is already consumed, cannot consume exclusively"
)));
} else {
let consumer = Consumer {
channel,
consumer_tag,
no_ack,
exclusive,
sink,
delivery_tag_counter: 1u64,
};
self.candidate_consumers.push(consumer);
logerr!(result.send(Ok(())));
}
Ok(true)
}
QueueCommand::CancelConsuming { consumer_tag, result } => {
info!(
"Queue {} is stopped consuming by ctag {}",
self.queue.name, consumer_tag
);
self.consumers.retain(|c| !c.consumer_tag.eq(&consumer_tag));
self.next_consumer = 0;
if self.queue.auto_delete && self.consumers.is_empty() {
logerr!(result.send(false));
Ok(false)
} else {
logerr!(result.send(true));
Ok(true)
}
}
QueueCommand::Purge { result } => {
let message_count = self.messages.len();
self.messages.clear();
logerr!(result.send(Ok(message_count as u32)));
Ok(true)
}
QueueCommand::DeleteQueue {
channel,
if_unused,
if_empty,
exchange_manager,
result,
} => {
info!("Queue {} is about to be deleted", self.queue.name);
// If there are consumers or candidates or if there are exchanges bound to this
// queue, and we cannot delete if it is used, send back and error.
if if_unused {
if !self.consumers.is_empty() || !self.candidate_consumers.is_empty() {
logerr!(result.send(channel_error(
channel,
frame::QUEUE_DELETE,
ChannelError::PreconditionFailed,
"Queue is consumed"
)));
return Ok(true);
}
if !self.bound_exchanges.is_empty() {
logerr!(result.send(channel_error(
channel,
frame::QUEUE_DELETE,
ChannelError::PreconditionFailed,
"Exchanges are bound to this queue"
)));
return Ok(true);
}
}
if if_empty && !self.messages.is_empty() {
logerr!(result.send(channel_error(
channel,
frame::QUEUE_DELETE,
ChannelError::PreconditionFailed,
"Queue is not empty"
)));
return Ok(true);
}
// Notify all exchanges about the delete, so they can unbound themselves.
for exchange_name in &self.bound_exchanges {
let unbind_cmd = UnbindQueueCommand {
channel,
exchange_name: exchange_name.clone(),
queue_name: self.queue.name.clone(),
routing_key: "".to_owned(),
};
logerr!(em::unbind_queue(&exchange_manager, unbind_cmd).await);
}
// Cancel all consumers by sending a basic cancel.
for consumer in &self.consumers {
logerr!(
consumer
.sink
.send(Frame::Frame(frame::basic_cancel(
consumer.channel,
"Queue is deleted",
false
)))
.await
);
}
// Cancel all candidate consumers by sending a basic cancel.
for consumer in &self.candidate_consumers {
logerr!(
consumer
.sink
.send(Frame::Frame(frame::basic_cancel(
consumer.channel,
"Queue is deleted",
false
)))
.await
);
}
// Pass the number of messages in the queue to the caller.
logerr!(result.send(Ok(self.messages.len() as u32)));
// Quit the queue event loop.
Ok(false)
}
QueueCommand::MessageRejected => Ok(true),
QueueCommand::Recover => Ok(true),
}
}
async fn send_out_message(&mut self, message: Arc<Message>) -> Result<()> {
let res = match self.consumers.get(self.next_consumer) {
None => {
trace!("No consumers, pushing message back to the queue");
self.messages.push_back(message);
SendResult::NoConsumer
}
Some(consumer) => {
let tag = Tag {
consumer_tag: consumer.consumer_tag.clone(),
delivery_tag: consumer.delivery_tag_counter,
};
let res = message::send_message(consumer.channel, message.clone(), &tag, &consumer.sink).await;
match res {
Ok(()) => {
self.outbox.on_sent_out(OutgoingMessage {
message,
tag,
sent_at: Instant::now(),
});
if let Some(p) = self
.consumers
.iter()
.position(|c| c.consumer_tag.eq(&consumer.consumer_tag))
{
self.consumers[p].delivery_tag_counter += 1;
}
SendResult::MessageSent
}
Err(e) => {
error!("Consumer sink seems to be invalid {:?}", e);
SendResult::ConsumerInvalid(consumer.consumer_tag.clone(), message)
}
}
}
};
match res {
SendResult::ConsumerInvalid(ctag, msg) => {
self.messages.push_back(msg);
self.consumers.retain(|c| !c.consumer_tag.eq(&ctag));
self.next_consumer = 0;
}
SendResult::MessageSent => {
self.next_consumer = (self.next_consumer + 1) % self.consumers.len();
}
_ => (),
}
Ok(())
}
}
fn poll_command_chan(commands: &mut mpsc::Receiver<QueueCommand>) -> Poll<Option<QueueCommand>> {
use futures::task::noop_waker_ref;
use std::task::Context;
let mut cx = Context::from_waker(noop_waker_ref());
commands.poll_recv(&mut cx)
}
struct OutgoingMessage {
message: Arc<Message>,
tag: Tag,
sent_at: Instant,
}
struct Outbox {
outgoing_messages: Vec<OutgoingMessage>,
}
impl Outbox {
fn on_ack_arrive(&mut self, consumer_tag: String, delivery_tag: u64) {
self.outgoing_messages
.retain(|om| om.tag.delivery_tag != delivery_tag || om.tag.consumer_tag != consumer_tag);
}
fn on_sent_out(&mut self, outgoing_message: OutgoingMessage) {
self.outgoing_messages.push(outgoing_message);
}
}