romp 0.5.2

STOMP server and WebSockets platform
Documentation
//! Bootstrapping, i.e. initializing, a `romp` server.
//!
use std::sync::{Arc, RwLock};
use std::time::{Instant, Duration};

use log::*;

use tokio::prelude::*;
use tokio::net::TcpListener;
use tokio::timer::Interval;


use crate::init::CONFIG;
use crate::init::SERVER;
use crate::init::USER;
use crate::session::reader::{Reader, ReadKiller};
use crate::session::stomp_session::StompSession;
use crate::workflow::destination::destination::Destination;
use crate::session::interruptible_interval::InterruptibleInterval;
use crate::downstream::downstream_bootstrap;

/// Initialises the `romp` server, this method should be called once
/// bespoke filters have been registered with the routers.
pub fn romp_bootstrap() -> BootstrapFuture {

    // referencing the CONFIG here ensures it lazy loads first
    let server_name = CONFIG.name();
    info!("starting tokio for: {}", server_name);

    BootstrapFuture {
    }
}

pub struct BootstrapFuture {

}

impl Future for BootstrapFuture {
    type Item = ();
    type Error = ();
    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {

        // referencing the struct causes it to init
        USER.len();


        debug!("starting destination server");
        for (_name, destination) in SERVER.iter() {
            tokio::spawn(destination_wrapper(destination.clone()));

            let mut expiry;
            {
                expiry = destination.read().unwrap().expiry();
            }
            let dest_clone = destination.clone();

            // short expiry threads take up lots of CPU
            if expiry < 60000 {
                expiry = 60000;
            }

            // TODO interruptible
            let task = Interval::new(Instant::now(), Duration::from_millis(expiry))
                // dont understand this "move" by default the unused arg is borrowed and this affects closure lifetime checking somehow
                .for_each(move |_| {
                    dest_clone.write().unwrap().timeout_messages()
                } )
                .map_err(|_| {});

            tokio::spawn(task);

        }

        // clock tick every 60 seconds
        let ticker = Interval::new(Instant::now(), Duration::from_millis(60000))
            .for_each(|_| {
                SERVER.tick()
            })
            .map_err(|_| {});

        tokio::spawn(ticker);

        // bootstrap outgoing TCP connections
        downstream_bootstrap();

        for socket_address in CONFIG.socket_addresses.iter() {
            // Bind the server's sockets.
            let address = socket_address.parse().unwrap();
            let listener = TcpListener::bind(&address)
                .expect("unable to bind TCP listener");

            info!("listening on: {}", socket_address);

            // Pull out a stream of sockets for incoming connections
            // TODO the type of server (ForEach<...>) is stupidly complicated, so much so that we can not pass it around
            // https://tokio.rs/docs/going-deeper/returning/ does not help
            let server = listener.incoming()
                .map_err(|e| warn!("accept failed = {:?}", e))
                .for_each(|sock| {
                    if SERVER.is_shutdown() {
                        return Err(());
                    } else {
                        debug!("connection");
                    }

                    let session = Arc::new(RwLock::new(StompSession::new()));

                    // Session timeouts
                    let timeout_session = session.clone();
                    let task = InterruptibleInterval::new(session.clone())
                        .for_each(move |_| {
                            timeout_session.write().unwrap().timeout()
                        })
                        .map_err(|_| {});
                    tokio::spawn(task);

                    let mut s = session.write().unwrap();

                    // Spawn the future as a concurrent task.
                    let (reader, writer) = s.split(sock, session.clone());

                    //let mq = Arc::new(RwLock::new(Mq::new()));
                    //let mq_cpy = mq.clone();
                    let read_killer = Arc::new(RwLock::new(ReadKiller::new()));
                    let read_killer_cpy = read_killer.clone();

                    // TODO combine these to one spawn task so we guarantee that all read write and mq ops happen on the same thread
                    tokio::spawn(writer);
                    tokio::spawn(read_wrapper(read_killer, reader));
                    s.set_read_killer(read_killer_cpy);

                    Ok(())
                });

            tokio::spawn(server);
        }
        Err(())
    }
}


/// wraps a reader and a way to stop the reader
pub(crate) struct ReadWrapper {
    read_killer: Arc<RwLock<ReadKiller>>,
    reader: Reader,
}
pub(crate) fn read_wrapper(read_killer: Arc<RwLock<ReadKiller>>, reader: Reader) -> ReadWrapper {
    ReadWrapper {
        read_killer,
        reader,
    }
}
impl Future for ReadWrapper {
    type Item = ();
    type Error = ();
    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
        debug!("read wrapper polled");
        {
            match self.read_killer.write().unwrap().poll() {
                Err(_) => {
                    match self.reader.poll() {
                        Err(_) => {},
                        _ => debug!("read killer unexpected return"),
                    }
                    debug!("read wrapper closed");
                    return Err(());
                },
                _ => {}
            }
        }

        match self.reader.poll() {
            Ok(Async::Ready(())) => Ok(Async::Ready(())),
            Ok(Async::NotReady) => Ok(Async::NotReady),
            Err(_) => {
                debug!("read wrapper closed");
                return Err(());
            },
        }
    }
}

struct DestinationFuture {
    destination: Arc<RwLock<Destination>>,
    first_poll: bool,
}
fn destination_wrapper(destination: Arc<RwLock<Destination>>) -> DestinationFuture {
    DestinationFuture {
        destination,
        first_poll: true,
    }
}
impl Future for DestinationFuture {
    type Item = ();
    type Error = ();

    // If this does not return NotReady it never polls again??
    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
        debug!("destination future wrapper polled");
        if self.first_poll {
            self.first_poll = false;
            {
                let mut d = self.destination.write().unwrap();
                d.set_task(futures::task::current());
            }
        }

        match self.destination.write().unwrap().poll() {
            true => {
                Ok(Async::NotReady)
            },
            false => {
                debug!("destination future closed");
                Err(())
            },
        }
    }
}