use crate::net::ConnectionState;
use futures::sync;
use spaniel::frames::Frame;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::net::SocketAddr;
/// Holds frames that are waiting to be delivered, grouped by destination.
///
/// Every destination address owns its own [`VecDeque`] of [`Frame`]s. The
/// map is private; all access goes through the methods on `QueueManager`.
pub struct QueueManager {
    /// Pending frames, keyed by the socket address they are destined for.
    inner: HashMap<SocketAddr, VecDeque<Frame>>,
}
impl QueueManager {
pub fn new() -> Self {
QueueManager {
inner: HashMap::new(),
}
}
pub fn enqueue_frame(&mut self, frame: Frame, dst: SocketAddr) {
self.inner
.entry(dst)
.or_insert(VecDeque::new())
.push_back(frame);
}
pub fn pop_frame(&mut self, dst: &SocketAddr) -> Option<Frame> {
let res = self.inner.get_mut(dst).and_then(|q| q.pop_back());
if self.inner.contains_key(dst) && res.is_none() {
self.inner.remove(dst);
}
res
}
pub fn try_drain(
&mut self,
dst: SocketAddr,
tx: &mut sync::mpsc::UnboundedSender<Frame>,
) -> Option<ConnectionState> {
while let Some(frame) = self.pop_frame(&dst) {
if let Err(err) = tx.unbounded_send(frame) {
let next = Some(ConnectionState::Closed);
let frame = err.into_inner();
self.enqueue_frame(frame, dst);
return next;
}
}
None
}
pub fn exists(&self, dst: &SocketAddr) -> bool {
self.inner.contains_key(dst)
}
pub fn has_frame(&self, dst: &SocketAddr) -> bool {
self.inner.get(dst).map_or(false, |q| !q.is_empty())
}
}