use std::collections::{BTreeMap, HashMap};
use amqp_serde::types::{AmqpChannelId, ShortUint};
use tokio::sync::{mpsc::UnboundedSender, oneshot};
use crate::frame::MethodHeader;
use super::{channel_id_repo::ChannelIdRepository, IncomingMessage};
pub(crate) struct ChannelResource {
pub responders: HashMap<&'static MethodHeader, oneshot::Sender<IncomingMessage>>,
pub dispatcher: Option<UnboundedSender<IncomingMessage>>,
}
impl ChannelResource {
pub(crate) fn new(dispatcher: Option<UnboundedSender<IncomingMessage>>) -> Self {
Self {
responders: HashMap::new(),
dispatcher,
}
}
}
pub(super) struct ChannelManager {
channel_id_repo: ChannelIdRepository,
resource: BTreeMap<AmqpChannelId, ChannelResource>,
}
impl ChannelManager {
pub fn new(channel_max: ShortUint) -> Self {
Self {
channel_id_repo: ChannelIdRepository::new(channel_max),
resource: BTreeMap::new(),
}
}
pub fn insert_resource(
&mut self,
channel_id: Option<AmqpChannelId>,
resource: ChannelResource,
) -> Option<AmqpChannelId> {
let id = match channel_id {
Some(id) => {
if id == 0 || self.channel_id_repo.reserve(id) {
match self.resource.insert(id, resource) {
Some(_old) => unreachable!("implementation error"),
None => id,
}
} else {
return None;
}
}
None => {
let id = self.channel_id_repo.allocate();
match self.resource.insert(id, resource) {
Some(_old) => unreachable!("implementation error"),
None => id,
}
}
};
Some(id)
}
pub fn remove_resource(&mut self, channel_id: &AmqpChannelId) -> Option<ChannelResource> {
assert!(
self.channel_id_repo.release(*channel_id),
"release a free id, implementation error"
);
self.resource.remove(channel_id)
}
pub fn get_dispatcher(
&self,
channel_id: &AmqpChannelId,
) -> Option<&UnboundedSender<IncomingMessage>> {
self.resource.get(channel_id)?.dispatcher.as_ref()
}
pub fn insert_responder(
&mut self,
channel_id: &AmqpChannelId,
method_header: &'static MethodHeader,
responder: oneshot::Sender<IncomingMessage>,
) -> Option<oneshot::Sender<IncomingMessage>> {
self.resource
.get_mut(channel_id)?
.responders
.insert(method_header, responder)
}
pub fn remove_responder(
&mut self,
channel_id: &AmqpChannelId,
method_header: &'static MethodHeader,
) -> Option<oneshot::Sender<IncomingMessage>> {
self.resource
.get_mut(channel_id)?
.responders
.remove(method_header)
}
}