romp 0.5.2

STOMP server and WebSockets platform
Documentation
use std::sync::{Arc, RwLock};

use futures::*;

use log::*;
use log::Level::Debug;

use tokio::io::WriteHalf;
use tokio::net::TcpStream;
use tokio::prelude::*;
use tokio::prelude::future::Future;

use crate::init::CONFIG;
use crate::session::stomp_session::{StompSession, FLAG_WEB_SOCKETS};
use crate::message::serializer::MessageSerializer;
use crate::web_socket::ws_response::ECG_FRAME;

/// Responsible for sending data to the TCP connection's `WriteHalf`.
///
/// A `Writer` is a `Future`: each poll flushes whatever is pending in `out`,
/// serializes the current message chunk by chunk into `out`, and then asks the
/// session for the next queued message.
///
/// Original note: `xtomp_send()` (presumably the equivalent routine in the
/// xtomp code base; confirm).
pub struct Writer {
    /// Shared session: source of queued messages, flags and heart-beat state,
    /// and the sink for write-status notifications.
    session: Arc<RwLock<StompSession>>,
    /// Session identifier; in this file it is only used in log output.
    session_id: usize,
    /// Shared write half of the TCP stream.
    write_half: Arc<RwLock<WriteHalf<TcpStream>>>,
    /// Index into `out` of the next byte to write to the network.
    pos: usize,
    /// Index one past the last pending byte in `out`; `pos == end` means nothing is pending.
    end: usize,
    /// Staging buffer, allocated with `CONFIG.response_client_buffer` bytes.
    out: Box<[u8]>,
    /// True until the first poll, when the current task is registered with the session.
    first_poll: bool,
    /// Serializes the message currently being sent into `out`, one chunk at a time.
    serializer: MessageSerializer,
    /// Set once a message is popped while the session has `FLAG_WEB_SOCKETS`;
    /// selects the heart-beat frame format.
    web_socket: bool,
}

impl Writer {
    /// Creates a writer for `session` that sends over `write_half`.
    ///
    /// The staging buffer is allocated once, with `CONFIG.response_client_buffer`
    /// bytes, and starts out empty (`pos == end == 0`).
    pub fn new(session: Arc<RwLock<StompSession>>, session_id: usize, write_half: Arc<RwLock<WriteHalf<TcpStream>>>) -> Writer {
        Writer {
            session,
            session_id,
            write_half,
            pos: 0,
            end: 0,
            out: vec![0u8; CONFIG.response_client_buffer].into_boxed_slice(),
            first_poll: true,
            serializer: MessageSerializer::new(),
            web_socket: false,
        }
    }

    /// Polls the message queue via the session until it has a message to write,
    /// or it has been drained or closed.
    ///
    /// The readiness state is passed through unchanged; any error is reduced to `()`.
    fn poll_mq(&self) -> Result<Async<()>, ()> {
        self.session.read().unwrap().poll_mq().map_err(|_| ())
    }

    /// Pops the next message off the session's queue and hands it to the serializer.
    ///
    /// If the session has `FLAG_WEB_SOCKETS` set, the serializer is switched to
    /// WebSocket mode first and `web_socket` is remembered for heart-beats.
    /// Must only be called after `poll_mq()` reported a message; otherwise a
    /// warning is logged and nothing else happens.
    fn pop_mq(&mut self) {
        let session = self.session.read().unwrap();
        if let Some(message) = session.pop() {
            if session.get_flag(FLAG_WEB_SOCKETS) {
                self.serializer.ws_upgrade();
                self.web_socket = true;
            }
            self.serializer.set_message(message);
        } else {
            warn!("unreachable, pop_mq() should be called after poll() returns");
        }
    }

    /// Writes the pending contents of `self.out` (`pos..end`) to the network.
    ///
    /// Returns:
    /// * `Ok(Async::Ready(()))` when nothing is pending or everything was written;
    ///   after a complete write `pos` and `end` are reset to 0.
    /// * `Ok(Async::NotReady)` when the socket cannot accept more data right now;
    ///   `pos` records how far we got so the next call resumes from there.
    /// * `Err(())` on an I/O error or a zero-length write. In both cases
    ///   `write_error()` has been called on the session before returning, which
    ///   is what callers of this method assume.
    fn write_buf(&mut self) -> Result<Async<()>, ()> {
        if self.pos == self.end {
            return Ok(Async::Ready(()));
        }
        // NOTE: the write-half lock is held while the session lock is taken below.
        let mut write_half = self.write_half.write().unwrap();

        while self.pos < self.end {
            let n = match write_half.poll_write(&self.out[self.pos..self.end]) {
                Ok(Async::Ready(0)) => {
                    debug!("write of zero len is error");
                    self.session.write().unwrap().write_error();
                    debug!("writer closed");
                    return Err(());
                }
                Ok(Async::Ready(n)) => n,
                Ok(Async::NotReady) => return Ok(Async::NotReady),
                Err(e) => {
                    // Report I/O failures to the session as well, not only zero-length writes.
                    debug!("write failed: {}", e);
                    self.session.write().unwrap().write_error();
                    debug!("writer closed");
                    return Err(());
                }
            };
            self.pos += n;

            // Only record activity when bytes actually went out.
            self.session.read().unwrap().wrote_something();
        }

        // wrote the whole buffer
        if self.pos > 0 {
            debug!("wrote whole out_buf last char '{}'", self.out[self.pos - 1]);
        }
        self.pos = 0;
        self.end = 0;
        Ok(Async::Ready(()))
    }

}

impl Future for Writer {
    type Item = ();
    type Error = ();

    /// handles writing from mq to self.out and from self.out to the network.
    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
        debug!("write polled");
        if self.first_poll {
            let session = self.session.write().unwrap();
            session.set_mq_task(futures::task::current());
            self.first_poll = false;
        }

        // if there is anything left in out, write it
        match self.write_buf() {
            Ok(Async::Ready(())) => {
                // able to write
            },
            Ok(Async::NotReady) => {
                return Ok(Async::NotReady);
            },
            Err(_) => {
                // write_error() was called
                return Err(());
            },
        }

        // if we are currently processing a message, write the next chunk
        while self.serializer.is_serializing() {
            match self.serializer.write_chunk(&mut self.out) {
                Ok(0) => {
                    warn!("unreachable? zero write_chunk()");
                },
                Ok(writ) => {
                    self.pos = 0;
                    self.end = writ;
                    debug!("prepared some");
                    match self.write_buf() {
                        Ok(Async::Ready(())) => {
                            // able to write
                        },
                        Ok(Async::NotReady) => {
                            return Ok(Async::NotReady);
                        },
                        Err(_) => {
                            // write_error() was called
                            return Err(());
                        }
                    }
                },
                Err(_) => {
                    warn!("discarding message");
                },
            }
        }
        if log_enabled!(Debug) {
            debug!("looking for more {}", self.session.read().unwrap().len());
        }
        match self.poll_mq() {
            Ok(Async::Ready(())) => {
                self.pop_mq();
                return self.poll();
            },
            Ok(Async::NotReady) => {
                debug!("no more messages");

                // heart-beat
                if self.session.read().unwrap().should_heart_beat() {
                    debug!("sending heart-beat");
                    if self.web_socket {
                        self.out[self.pos] = ECG_FRAME[0];
                        self.out[self.pos + 1] = ECG_FRAME[1];
                        self.out[self.pos + 2] = ECG_FRAME[2];
                        self.end += 3;
                    } else {
                        self.out[self.pos] = b'\n';
                        self.end += 1;
                    }

                    match self.write_buf() {
                        Ok(Async::Ready(())) => {
                            // able to write
                        },
                        Ok(Async::NotReady) => {
                            return Ok(Async::NotReady);
                        },
                        Err(_) => {
                            // write_error() was called
                            return Err(());
                        }
                    }
                }
                Ok(Async::NotReady)
            },
            Err(_) => {
                self.write_half.write().unwrap().shutdown().ok();
                debug!("writer closed id={}" , self.session_id);
                self.session.write().unwrap().write_terminated();
                return Err(());
            },
        }

    }
}