use std::{
fmt,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use amqp_serde::types::AmqpChannelId;
use tokio::sync::{mpsc, oneshot};
use super::callbacks::ChannelCallback;
use crate::{
api::{error::Error, Result},
connection::Connection,
frame::{CloseChannel, CloseChannelOk, Deliver, Flow, FlowOk, Frame, MethodHeader, Return},
net::{ConnManagementCommand, IncomingMessage, OutgoingMessage},
BasicProperties,
};
#[cfg(feature = "traces")]
use tracing::{error, info, trace};
/// A message handed to a consumer; this is the item type carried by
/// `RegisterContentConsumer::consumer_tx`.
///
/// NOTE(review): every part is optional, presumably because the message is
/// assembled incrementally from several frames — confirm against the dispatcher.
pub struct ConsumerMessage {
/// The `Deliver` method frame, when available.
pub deliver: Option<Deliver>,
/// The message's basic properties, when available.
pub basic_properties: Option<BasicProperties>,
/// The raw message body bytes, when available.
pub content: Option<Vec<u8>>,
// Private bookkeeping counter.
// NOTE(review): presumably the number of body bytes still expected — confirm.
remaining: usize,
}
/// Crate-internal counterpart of `ConsumerMessage` built around a `Return`
/// method frame instead of a `Deliver` frame.
pub(crate) struct ReturnMessage {
// The `Return` method frame, when available.
ret: Option<Return>,
// The message's basic properties, when available.
basic_properties: Option<BasicProperties>,
// The raw message body bytes, when available.
content: Option<Vec<u8>>,
// NOTE(review): presumably the number of body bytes still expected — confirm.
remaining: usize,
}
/// Crate-internal holder for a message body plus a private counter.
///
/// NOTE(review): the name suggests it tracks the content that follows a
/// `GetOk` reply — confirm against the dispatcher.
pub(crate) struct GetOkMessage {
// The raw message body bytes, when available.
content: Option<Vec<u8>>,
// NOTE(review): presumably the number of body bytes still expected — confirm.
remaining: usize,
}
/// Payload of `DispatcherManagementCommand::RegisterContentConsumer`.
pub(crate) struct RegisterContentConsumer {
// Tag identifying the consumer being registered.
consumer_tag: String,
// Unbounded sender whose items are `ConsumerMessage` values for this consumer.
consumer_tx: mpsc::UnboundedSender<ConsumerMessage>,
}
/// Payload of `DispatcherManagementCommand::DeregisterContentConsumer`.
pub(crate) struct DeregisterContentConsumer {
// Tag identifying the consumer to remove.
consumer_tag: String,
}
/// Payload of `DispatcherManagementCommand::RegisterGetContentResponder`.
pub(crate) struct RegisterGetContentResponder {
// Unbounded sender of `IncomingMessage` values.
// NOTE(review): presumably receives the frames answering a basic-get request — confirm.
tx: mpsc::UnboundedSender<IncomingMessage>,
}
/// Payload of `DispatcherManagementCommand::RegisterOneshotResponder`: a
/// single-use responder keyed by a method header.
pub(crate) struct RegisterOneshotResponder {
/// Header of the method this responder is registered for.
pub method_header: &'static MethodHeader,
/// One-shot sender through which a single `IncomingMessage` is passed back.
pub responder: oneshot::Sender<IncomingMessage>,
/// One-shot acknowledgement sender. The registering side awaits the paired
/// receiver before continuing.
/// NOTE(review): presumably fired by the dispatcher once the responder is installed — confirm.
pub acker: oneshot::Sender<()>,
}
/// Payload of `DispatcherManagementCommand::RegisterChannelCallback`.
pub(crate) struct RegisterChannelCallback {
/// The boxed, sendable `ChannelCallback` implementation to register.
pub callback: Box<dyn ChannelCallback + Send + 'static>,
}
/// Commands sent over `SharedChannelInner::dispatcher_mgmt_tx`; each variant
/// wraps the payload struct of the same name.
pub(crate) enum DispatcherManagementCommand {
// Register a consumer's message sender under its tag.
RegisterContentConsumer(RegisterContentConsumer),
// Remove the consumer with the given tag.
DeregisterContentConsumer(DeregisterContentConsumer),
// Register the sender described by `RegisterGetContentResponder`.
RegisterGetContentResponder(RegisterGetContentResponder),
// Register a single-use responder for one method header.
RegisterOneshotResponder(RegisterOneshotResponder),
// Register a user-supplied `ChannelCallback`.
RegisterChannelCallback(RegisterChannelCallback),
}
/// Handle to an AMQP channel.
///
/// Cloning yields another handle to the same channel: the shared state and
/// the drop guard are both reference counted.
#[derive(Clone)]
pub struct Channel {
// State shared by all handles of this channel (open flag, id, senders).
shared: Arc<SharedChannelInner>,
// Handle of the connection this channel belongs to.
connection: Connection,
// Reference-counted `DropGuard`; `None` for handles made by `clone_as_secondary`.
_guard: Option<Arc<DropGuard>>,
}
/// Guard owning a reference to the channel's shared state. Its `Drop` impl
/// flips the open flag to closed and, if the channel was still open, spawns a
/// task that performs the close handshake.
struct DropGuard(Arc<SharedChannelInner>);
/// State shared between every handle of one channel and its drop guard.
pub(crate) struct SharedChannelInner {
// Whether the channel is currently considered open.
is_open: AtomicBool,
// Identifier of this channel.
channel_id: AmqpChannelId,
// Bounded sender for outgoing messages (channel id plus frame, as used by `close_handshake`).
outgoing_tx: mpsc::Sender<OutgoingMessage>,
// Bounded sender for connection management commands.
conn_mgmt_tx: mpsc::Sender<ConnManagementCommand>,
// Unbounded sender for dispatcher management commands.
dispatcher_mgmt_tx: mpsc::UnboundedSender<DispatcherManagementCommand>,
}
impl SharedChannelInner {
    /// Registers a single-use responder for `method_header` with the dispatcher.
    ///
    /// The registration command is pushed onto the dispatcher management
    /// queue, then the acknowledgement receiver is awaited before the reply
    /// receiver is handed back to the caller.
    ///
    /// # Errors
    ///
    /// Fails if the command cannot be queued or the acknowledgement sender is
    /// dropped before signalling.
    async fn register_responder(
        &self,
        method_header: &'static MethodHeader,
    ) -> Result<oneshot::Receiver<IncomingMessage>> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let (ack_tx, ack_rx) = oneshot::channel();

        let registration =
            DispatcherManagementCommand::RegisterOneshotResponder(RegisterOneshotResponder {
                method_header,
                responder: reply_tx,
                acker: ack_tx,
            });
        self.dispatcher_mgmt_tx.send(registration)?;

        // Do not hand out the reply receiver until registration is acknowledged.
        ack_rx.await?;
        Ok(reply_rx)
    }

    /// Runs the channel close handshake and then asks the connection manager
    /// to deregister this channel's resource.
    ///
    /// A responder for `CloseChannelOk` is registered first; the
    /// `CloseChannel` frame and that responder are then handed to
    /// `synchronous_request!` (defined elsewhere), with
    /// `Error::ChannelCloseError` as the error constructor.
    async fn close_handshake(&self) -> Result<()> {
        let close_ok_rx = self.register_responder(CloseChannelOk::header()).await?;
        synchronous_request!(
            self.outgoing_tx,
            (self.channel_id, CloseChannel::default().into_frame()),
            close_ok_rx,
            Frame::CloseChannelOk,
            Error::ChannelCloseError
        )?;

        self.conn_mgmt_tx
            .send(ConnManagementCommand::DeregisterChannelResource(
                self.channel_id,
            ))
            .await?;
        Ok(())
    }
}
impl Channel {
pub(in crate::api) fn new(
is_open: AtomicBool,
connection: Connection,
channel_id: AmqpChannelId,
outgoing_tx: mpsc::Sender<OutgoingMessage>,
conn_mgmt_tx: mpsc::Sender<ConnManagementCommand>,
dispatcher_mgmt_tx: mpsc::UnboundedSender<DispatcherManagementCommand>,
) -> Self {
let shared = Arc::new(SharedChannelInner::new(
is_open,
channel_id,
outgoing_tx,
conn_mgmt_tx,
dispatcher_mgmt_tx,
));
let guard = Some(Arc::new(DropGuard(shared.clone())));
Self {
_guard: guard,
connection,
shared,
}
}
pub async fn register_callback<F>(&self, callback: F) -> Result<()>
where
F: ChannelCallback + Send + 'static,
{
let cmd = RegisterChannelCallback {
callback: Box::new(callback),
};
self.shared
.dispatcher_mgmt_tx
.send(DispatcherManagementCommand::RegisterChannelCallback(cmd))?;
Ok(())
}
async fn register_responder(
&self,
method_header: &'static MethodHeader,
) -> Result<oneshot::Receiver<IncomingMessage>> {
let (responder, responder_rx) = oneshot::channel();
let (acker, acker_rx) = oneshot::channel();
let cmd = RegisterOneshotResponder {
method_header,
responder,
acker,
};
self.shared
.dispatcher_mgmt_tx
.send(DispatcherManagementCommand::RegisterOneshotResponder(cmd))?;
acker_rx.await?;
Ok(responder_rx)
}
pub fn channel_id(&self) -> AmqpChannelId {
self.shared.channel_id
}
pub fn connection_name(&self) -> &str {
self.connection.connection_name()
}
pub fn is_connection_open(&self) -> bool {
self.connection.is_open()
}
pub fn is_open(&self) -> bool {
self.shared.is_open.load(Ordering::Relaxed)
}
pub(crate) fn set_is_open(&self, is_open: bool) {
self.shared.is_open.store(is_open, Ordering::Relaxed);
}
pub async fn flow(&self, active: bool) -> Result<bool> {
let responder_rx = self.register_responder(FlowOk::header()).await?;
let flow_ok = synchronous_request!(
self.shared.outgoing_tx,
(self.shared.channel_id, Flow::new(active).into_frame()),
responder_rx,
Frame::FlowOk,
Error::ChannelUseError
)?;
Ok(flow_ok.active)
}
pub async fn close(self) -> Result<()> {
if self.is_connection_open() {
if let Ok(true) = self.shared.is_open.compare_exchange(
true,
false,
Ordering::Acquire,
Ordering::Relaxed,
) {
#[cfg(feature = "traces")]
info!("close channel {}", self);
self.shared.close_handshake().await?;
}
}
Ok(())
}
pub(crate) fn clone_as_secondary(&self) -> Self {
Self {
shared: self.shared.clone(),
connection: self.connection.clone_no_drop_guard(),
_guard: None,
}
}
}
impl Drop for DropGuard {
    /// Closes the channel in the background when the last guarded handle is
    /// dropped while the channel is still flagged open.
    ///
    /// The close handshake is asynchronous, so it is spawned onto the current
    /// tokio runtime. If no runtime is available (for example the handle is
    /// dropped on a plain thread or after the runtime has shut down) the
    /// handshake is skipped rather than panicking inside `drop`.
    fn drop(&mut self) {
        // Win the open -> closed transition, otherwise someone else already
        // closed (or is closing) the channel.
        if self
            .0
            .is_open
            .compare_exchange(true, false, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        {
            return;
        }

        #[cfg(feature = "traces")]
        trace!("drop channel {}", self.0.channel_id);

        // `tokio::spawn` panics without an active runtime; a panic in `drop`
        // can abort the process, so probe for the runtime first.
        let runtime = match tokio::runtime::Handle::try_current() {
            Ok(handle) => handle,
            Err(_err) => {
                #[cfg(feature = "traces")]
                error!(
                    "failed to gracefully close channel {} at drop, cause: '{}'",
                    self.0.channel_id, _err,
                );
                return;
            }
        };

        let inner = self.0.clone();
        runtime.spawn(async move {
            #[cfg(feature = "traces")]
            info!("try to close channel {} at drop", inner.channel_id);

            // `_err` is only read when the `traces` feature is enabled.
            match inner.close_handshake().await {
                Ok(()) => {
                    #[cfg(feature = "traces")]
                    info!("channel {} is closed OK after drop", inner.channel_id);
                }
                Err(_err) => {
                    #[cfg(feature = "traces")]
                    error!(
                        "failed to gracefully close channel {} at drop, cause: '{}'",
                        inner.channel_id, _err,
                    );
                }
            }
        });
    }
}
impl fmt::Display for Channel {
    /// Renders the channel as `<id> [open|closed] of connection <connection>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let state = if self.is_open() { "open" } else { "closed" };
        f.write_fmt(format_args!(
            "{} [{}] of connection {}",
            self.channel_id(),
            state,
            self.connection,
        ))
    }
}
impl SharedChannelInner {
fn new(
is_open: AtomicBool,
channel_id: AmqpChannelId,
outgoing_tx: mpsc::Sender<OutgoingMessage>,
conn_mgmt_tx: mpsc::Sender<ConnManagementCommand>,
dispatcher_mgmt_tx: mpsc::UnboundedSender<DispatcherManagementCommand>,
) -> Self {
Self {
is_open,
channel_id,
outgoing_tx,
conn_mgmt_tx,
dispatcher_mgmt_tx,
}
}
}
// Tests for `Channel` cloning and drop behaviour.
#[cfg(test)]
mod tests {
use tokio::time;
use crate::{
channel::Channel,
connection::{Connection, OpenConnectionArguments},
test_utils::setup_logging,
};
use std::marker::PhantomData;
// Checks that `Channel` implements `Clone` without needing a live channel.
// Currently ignored; see the linked issue.
#[ignore = "https://github.com/gftea/amqprs/issues/69"]
#[tokio::test]
async fn test_channel_cloneable() {
// Fallback: the blanket impl gives every type `IS_CLONEABLE == false`.
trait NotCloneable {
const IS_CLONEABLE: bool = false;
}
impl<T> NotCloneable for T {}
struct Wrapper<T>(PhantomData<T>);
// Inherent constant, only present when `T: Clone`. Inherent associated
// items take precedence over trait items, so it shadows the fallback.
#[allow(dead_code)]
impl<T: Clone> Wrapper<T> {
const IS_CLONEABLE: bool = true;
}
// Resolves to the inherent `true` only if `Channel: Clone`.
assert_eq!(<Wrapper<Channel>>::IS_CLONEABLE.to_string(), "true");
}
// Integration test: opens a connection to localhost:5672 with the
// credentials below, so it needs a reachable broker.
#[tokio::test]
async fn test_channel_clone_and_drop() {
setup_logging();
let args = OpenConnectionArguments::new("localhost", 5672, "user", "bitnami");
let conn = Connection::open(&args).await.unwrap();
{
let ch1 = conn.open_channel(Some(1)).await.unwrap();
let ch2 = ch1.clone();
// `ch1` is moved into the task and dropped when the task finishes.
let h = tokio::spawn(async move {
time::sleep(time::Duration::from_millis(10)).await;
assert!(ch1.is_open());
});
h.await.unwrap();
time::sleep(time::Duration::from_millis(10)).await;
// The clone must still see the channel as open after `ch1` is gone.
assert!(ch2.is_open());
}
// `ch2` was dropped at the end of the scope above.
// NOTE(review): the pause presumably gives the close-on-drop task time to run — confirm.
time::sleep(time::Duration::from_millis(50)).await;
conn.close().await.unwrap();
}
}
// Dispatcher submodule; its items are re-exported for use inside the crate only.
mod dispatcher;
pub(crate) use dispatcher::*;
// Private submodules whose items are re-exported publicly below.
// NOTE(review): presumably each one holds the `Channel` methods of one AMQP class — confirm.
mod basic;
// NOTE(review): `confim` looks like a misspelling of `confirm`; renaming it
// would also require renaming the module's source file.
mod confim;
mod exchange;
mod queue;
mod tx;
pub use basic::*;
pub use confim::*;
pub use exchange::*;
pub use queue::*;
// The glob re-export of `tx` is allowed to be unused.
#[allow(unused_imports)] pub use tx::*;