use std::collections::HashMap;
use tokio::sync::mpsc::{self, Sender};
use crate::{
channel::Channel,
command_channel::{COMMAND_CHANNEL_ID, CommandChannel},
model::packet::Packet,
rules::Rule,
tcp_channel::TcpChannel,
};
/// Routes incoming packets to per-channel receivers, keyed by channel id.
///
/// One sender is kept per open channel; `route` forwards each packet's
/// payload through the sender matching the packet's channel id and opens a
/// new channel when it sees an id it has not handled before.
pub struct ChannelHandler {
/// Sender half for every currently registered channel, keyed by channel id.
channel_senders: HashMap<i32, Sender<Vec<u8>>>,
/// Sender cloned into every `Channel` this handler creates.
/// NOTE(review): presumably the outbound path towards the server — confirm.
to_server_tx: Sender<Vec<u8>>,
/// Highest channel id for which `route` has opened a channel (starts at 0).
/// Packets for an unregistered id at or below this value are dropped.
last_channel: i32,
/// Rule passed by reference to `TcpChannel::spawn` for each new channel.
rule: Rule,
}
impl ChannelHandler {
    /// Builds a handler together with its command channel.
    ///
    /// A bounded queue (capacity 8) is created for the command channel. Its
    /// receiving end is wrapped in a `Channel` tagged with
    /// `COMMAND_CHANNEL_ID` and then in a `CommandChannel`, while its sending
    /// end is registered in the handler under the same id.
    ///
    /// Returns the handler and the `CommandChannel` as a pair.
    pub fn new(to_server_tx: Sender<Vec<u8>>, rule: Rule) -> (ChannelHandler, CommandChannel) {
        let (command_tx, command_rx) = mpsc::channel::<Vec<u8>>(8);
        let command_channel = CommandChannel::new(Channel::new(
            to_server_tx.clone(),
            command_rx,
            COMMAND_CHANNEL_ID,
        ));

        let handler = Self {
            // The command channel is the only one registered up front.
            channel_senders: HashMap::from([(COMMAND_CHANNEL_ID, command_tx)]),
            to_server_tx,
            last_channel: 0,
            rule,
        };

        (handler, command_channel)
    }

    /// Delivers `packet` to the channel identified by `packet.channel_id`.
    ///
    /// * Senders whose channel is already closed are purged first.
    /// * For a registered channel, an empty payload unregisters the sender
    ///   and nothing is forwarded; any other payload is sent to the channel,
    ///   waiting for queue capacity if necessary. A failed send is logged.
    /// * For an unregistered channel, ids at or below the highest id seen so
    ///   far are logged and dropped. A higher id is recorded as the newest
    ///   channel, gets a fresh bounded queue (capacity 8), is registered,
    ///   and is handed to a `TcpChannel` together with the packet.
    pub async fn route(&mut self, packet: Packet) {
        // Forget every channel whose receiving side has gone away.
        self.channel_senders
            .retain(|_, sender| !sender.is_closed());

        let channel_id = packet.channel_id;

        // Known channel: either unregister it or forward the payload.
        if let Some(sender) = self.channel_senders.get(&channel_id) {
            if packet.data.is_empty() {
                self.channel_senders.remove(&channel_id);
            } else if sender.send(packet.data).await.is_err() {
                log::warn!("failed to post data to receive channel");
            }
            return;
        }

        // Unknown id that is not newer than the latest one we opened.
        if channel_id <= self.last_channel {
            log::warn!(
                "dropping packet for already closed channel {:}",
                channel_id
            );
            return;
        }

        // Genuinely new channel: register it and start its TCP side.
        self.last_channel = channel_id;
        log::trace!("new connection for {:}", channel_id);

        let (inbound_tx, inbound_rx) = mpsc::channel::<Vec<u8>>(8);
        let new_channel = Channel::new(self.to_server_tx.clone(), inbound_rx, channel_id);
        self.channel_senders.insert(channel_id, inbound_tx);

        let tcp = TcpChannel::new(new_channel);
        tcp.spawn(packet, &self.rule);
    }
}