use std::sync::Arc;
use log::*;
use tokio::prelude::Async;
use tokio::prelude::task::Task;
use crate::message::stomp_message::{StompMessage, Header};
pub struct SessionMessage {
pub message: Arc<StompMessage>,
pub headers: Vec<Header>
}
impl SessionMessage {
pub fn session_hdr_len(&self) -> usize {
let mut len: usize = 0;
for h in &self.headers {
len += h.len();
len += 1;
}
len
}
}
pub struct Mq {
q: Vec<SessionMessage>,
task: Option<Task>,
drain: bool,
close: bool,
}
impl Mq {
pub fn new() -> Mq {
Mq {
q: vec!(),
task: None,
drain: false,
close: false,
}
}
pub fn set_task(&mut self, task: Task) {
self.task = Some(task);
}
pub fn push(&mut self, message: Arc<StompMessage>, headers: Vec<Header>) -> usize {
if self.drain | self.close {
debug!("dropping message, session closing");
return self.q.len();
}
debug!("mq pushed {:?} len={}", message.command, self.q.len());
self.q.push(SessionMessage {message, headers});
match &self.task {
Some(task) => {
debug!("mq notified");
task.notify();
},
_ => {
debug!("no-one to notify");
}
}
self.q.len()
}
pub fn next(&mut self) -> Option<SessionMessage> {
if self.q.len() > 0 {
return Some(self.q.remove(0));
}
None
}
pub fn len(&self) -> usize {
self.q.len()
}
pub fn close(&mut self) {
self.q.clear();
self.close = true;
self.notify();
self.task = None;
}
pub fn is_closed(&self) -> bool {
self.close
}
pub fn drain(&mut self) -> Result<(), usize> {
self.drain = true;
self.notify();
match self.q.len() {
0 => {
self.close = true;
self.task = None;
Ok(())
},
len => Err(len)
}
}
pub fn notify(&self) {
match &self.task {
Some(task) => task.notify(),
_ => {}
}
}
pub fn poll(&self) -> Result<Async<()>, ()> {
debug!("mq polled with {} messages on the q", self.q.len());
if self.close {
if self.len() > 0 {
warn!("close with messages on the q");
}
debug!("mq closed");
return Err(());
}
match self.q.len() {
0 => {
if self.drain {
debug!("mq closed");
return Err(());
}
Ok(Async::NotReady)
},
_ => {
debug!("mq ready");
Ok(Async::Ready(()))
},
}
}
}