romp 0.5.2

STOMP server and WebSockets platform
Documentation
//! Session specific message queue of StompMessages that are pending write.

use std::collections::VecDeque;
use std::sync::Arc;

use log::*;
use tokio::prelude::Async;
use tokio::prelude::task::Task;

use crate::message::stomp_message::{StompMessage, Header};

// `Mq` (defined further down) is the outgoing queue of messages the session is going to send.
// TODO bound the size of the mq so slow consumers can't hog memory

/// A single message and headers specific to this session, e.g. subscription it came from, or session id
pub struct SessionMessage {
    /// The message to send, held behind an `Arc`
    /// (presumably so one message can be queued for several sessions without copying; confirm with callers).
    pub message: Arc<StompMessage>,
    // TODO could use &'static str for name in these headers to avoid some unnecessary allocations
    /// Headers carried alongside `message` for this session only; they are stored here,
    /// not inside the `StompMessage`.
    pub headers: Vec<Header>
}
impl SessionMessage {
    /// Sums the lengths of the session specific headers.
    ///
    /// Each header contributes its own `len()` plus one extra
    /// (presumably the line terminator written after each header; confirm against the writer).
    /// Returns `0` when there are no session headers.
    pub fn session_hdr_len(&self) -> usize {
        self.headers
            .iter()
            .map(|hdr| hdr.len() + 1)
            .sum()
    }
}

pub struct Mq {
    // TODO nasty that all messages are behind Arc when some can be owned and others could be 'static strings
    q: Vec<SessionMessage>,
    task: Option<Task>,
    // close when q is empty
    drain: bool,
    // empty and closed
    close: bool,
}

impl Mq {

    pub fn new() -> Mq {
        Mq {
            q: vec!(),
            task: None,
            drain: false,
            close: false,
        }
    }

    pub fn set_task(&mut self, task: Task) {
        self.task = Some(task);
    }

    pub fn push(&mut self, message: Arc<StompMessage>, headers: Vec<Header>) -> usize {
        if self.drain | self.close {
            debug!("dropping message, session closing");
            return self.q.len();
        }

        debug!("mq pushed {:?} len={}", message.command, self.q.len());

        self.q.push(SessionMessage {message, headers});

        match &self.task {
            Some(task) => {
                debug!("mq notified");
                task.notify();
            },
            _ => {
                debug!("no-one to notify");
            }
        }

        self.q.len()
    }

    /// pops a message and transfers ownership
    pub fn next(&mut self) -> Option<SessionMessage> {
        if self.q.len() > 0 {
            return Some(self.q.remove(0));
        }
        None
    }

    pub fn len(&self) -> usize {
        self.q.len()
    }

    pub fn close(&mut self) {
        self.q.clear();
        self.close = true;
        self.notify();
        self.task = None;
    }

    pub fn is_closed(&self) -> bool {
        self.close
    }

    /// Tell the MQ to stop accepting new messages and carry on writing any existing ones.
    /// When there are none left shutdown
    pub fn drain(&mut self) -> Result<(), usize> {
        self.drain = true;
        self.notify();

        match self.q.len() {
            0 => {
                self.close = true;
                self.task = None;
                Ok(())
            },
            len => Err(len)
        }
    }

    pub fn notify(&self) {
        match &self.task {
            Some(task) => task.notify(),
            _ => {}
        }
    }

    // not a real future only called by Writer via session
    // N.B. does not require a write lock
    pub fn poll(&self) -> Result<Async<()>, ()> {
        debug!("mq polled with {} messages on the q", self.q.len());
        if self.close {
            if self.len() > 0 {
                warn!("close with messages on the q");
            }
            debug!("mq closed");
            return Err(());
        }
        match self.q.len() {
            0 => {
                if self.drain {
                    debug!("mq closed");
                    return Err(());
                }
                Ok(Async::NotReady)
            },
            _ => {
                debug!("mq ready");
                Ok(Async::Ready(()))
            },
        }
    }
}