//! romp 0.5.2
//!
//! STOMP server and WebSockets platform.
//!
//! # Documentation
//!
//! Upstream/downstream features, i.e. handling MESSAGE requests for a downstream "edge" server such as xtomp.
//! Upstream STOMP servers should CONNECT to a downstream, SUBSCRIBE to the `memtop/up` destination,
//! and handle incoming SEND requests, returning MESSAGE responses to `memtop`.
//! Upstreams should perform some bespoke business logic based on the messages.


use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, RwLock};

use log::*;
use tokio::prelude::*;
use tokio::net::TcpStream;

use crate::prelude::*;
use crate::init::CONFIG;
use crate::config::config::Downstream;
use crate::session::interruptible_interval::InterruptibleInterval;
use crate::session::stomp_session::{StompSession, FLAG_DOWNSTREAM};
use crate::session::reader::ReadKiller;
use crate::bootstrap::read_wrapper;
use std::time::Duration;
use crate::message::request::{get_request_subscribe, get_request_connect};

/// Bootstraps outbound connections to every downstream server declared in `CONFIG`.
///
/// Installs the [`ConnectOrchestratorFilter`] (only when at least one downstream is
/// configured), then creates a [`DownstreamConnector`] per entry and starts its
/// initial connection attempt.
///
/// # Panics
/// Panics if any configured downstream address does not parse as a `SocketAddr` —
/// a misconfigured downstream is treated as a fatal deployment error.
pub fn downstream_bootstrap() {

    let downstreams: &HashMap<String, Downstream> = &CONFIG.downstreams;

    // only install the orchestration filter when there is work for it to do
    if !downstreams.is_empty() {
        romp_stomp_filter(Box::new(ConnectOrchestratorFilter {}));
    }

    for (name, downstream) in downstreams {

        // validate the address up front so connect() can safely unwrap its parse
        if downstream.address.parse::<SocketAddr>().is_err() {
            panic!("unable to parse downstream address: {} {}", name, downstream.address)
        }

        let dsl = DownstreamConnector {
            downstream: downstream.copy()
        };
        let dsl_lock = Arc::new(RwLock::new(dsl));
        // TODO copy to track connection status
        //LISTENERS.push(dsl_lock.clone());

        // connector needs a second, Send-safe handle to itself for reconnects
        dsl_lock.read().unwrap().connect(dsl_lock.clone());
    }

}

/// Handles starting and restarting downstream connections.
pub struct DownstreamConnector {
    pub downstream: Downstream,
}

impl DownstreamConnector {

    // NOTE(review): two handles to self are required — `&self` for the call itself and
    // the Arc<RwLock<..>> so the connector can be stored inside the (Send-safe) session
    // and invoked again from another thread when the session shuts down. The object is
    // immutable after creation, so the lock exists only to satisfy Send/Sync bounds on
    // the session plumbing, not for real concurrency control.
    /// Session-shutdown callback: immediately attempts to reconnect the downstream.
    /// `_id` and `_flags` are part of the shutdown-callback signature but unused here.
    pub(crate) fn session_shutdown(&self, myself: Arc<RwLock<DownstreamConnector>>, _id: usize, _flags: u64) {
        self.connect(myself);
    }

    /// Spawns an async task that connects to the configured downstream address,
    /// bootstraps a `StompSession` over the socket (keepalive, buffer sizing,
    /// timeout interval, reader/writer tasks) and sends the initial CONNECT frame.
    ///
    /// `myself` is stored on the session so the session can call
    /// `session_shutdown` to trigger a reconnect when it dies.
    ///
    /// # Panics
    /// Panics if `downstream.address` does not parse as a `SocketAddr`;
    /// `downstream_bootstrap` validates the address before constructing this type.
    pub fn connect(&self, myself: Arc<RwLock<DownstreamConnector>>) {

        let addr = self.downstream.address.parse::<SocketAddr>().unwrap();
        info!("downstream connecting... {}", addr);

        // build the CONNECT frame up front; it is moved into the closure below
        let m = get_request_connect(&self.downstream);

        let stream = TcpStream::connect(&addr).map(move|sock| {
            info!("downstream connected {}", addr);
            // best-effort socket tuning: failures are ignored deliberately (.ok())
            sock.set_keepalive(Some(Duration::from_secs(30))).ok();
            // expect responses to be the size of requests for downstream
            sock.set_recv_buffer_size(CONFIG.request_client_buffer).ok();
            sock.set_send_buffer_size(CONFIG.request_client_buffer).ok();
            let mut session = StompSession::new();
            session.set_flag(FLAG_DOWNSTREAM);
            // give the session a way back into this connector for reconnects
            session.downstream_connector = Some(myself.clone());

            let session = Arc::new(RwLock::new(session));

            // Session timeouts: periodically drive session.timeout() until interrupted
            let timeout_session = session.clone();
            let task = InterruptibleInterval::new(session.clone())
                .for_each(move |_| {
                    timeout_session.write().unwrap().timeout()
                })
                .map_err(|_| {});
            tokio::spawn(task);

            let mut s = session.write().unwrap();

            // Split the socket into reader/writer futures and spawn each as a task.
            let (reader, writer) = s.split(sock, session.clone());

            let read_killer = Arc::new(RwLock::new(ReadKiller::new()));
            let read_killer_cpy = read_killer.clone();

            // TODO combine these to one spawn task so we guarantee that all read write and mq ops happen on the same thread
            tokio::spawn(writer);
            tokio::spawn(read_wrapper(read_killer, reader));
            s.set_read_killer(read_killer_cpy);
            // queue the CONNECT frame; the writer task will deliver it
            s.send_message(m);

            // cast the unused error type so the closure yields Result<(), ()>
            Ok(()) as Result<(), ()>
        }).map_err(|e| {
            error!("downstream error {}", e);
        }).flatten(); // futures 0.1: map() yields Future<Item = Result<(), ()>>; flatten()
                      // lifts the inner Result (via IntoFuture) so the chain becomes
                      // Future<Item = (), Error = ()>, the shape tokio::spawn requires

        tokio::spawn(stream);

    }

}

// Needs to CONNECT, then on success SUBSCRIBE, and report errors asynchronously.
// The receipt mechanism is used to track which downstream a response refers to.
//
// receipt header can be: op|downstream_name, e.g. memtop-d|127.0.0.1:61613|memtop-d/up
/// Stateless message filter that orchestrates the downstream handshake
/// (see `do_filter` in the `MessageFilter` impl).
#[derive(Debug)]
pub struct ConnectOrchestratorFilter {

}

impl ConnectOrchestratorFilter {
    pub fn new()-> ConnectOrchestratorFilter {
        ConnectOrchestratorFilter {
        }
    }
}

impl MessageFilter for ConnectOrchestratorFilter {

    fn init(&mut self) {
    }



    fn do_filter(&self, context: &mut Context, message: &mut StompMessage) -> Result<bool, FilterError> {
        if message.command  == StompCommand::Connected && context.is_downstream() {
            info!("downstream CONNECTED");
            let downstream: &Downstream = CONFIG.downstreams.get(&context.downstream()).unwrap();
            if let Ok(session) = context.session.read() {
                for remote_destination in &downstream.destinations {
                    debug!("subscribing to {}", remote_destination);
                    session.send_message(get_request_subscribe(remote_destination, downstream));
                }
                return Ok(HANDLED);
            }

        }
        else if message.command  == StompCommand::Receipt && context.is_downstream() {
            if let Some(receipt) =  message.get_header("receipt") {
                if receipt.starts_with("sub|") {
                    let receipt_fields: Vec<&str> = receipt.split("|").collect();
                    info!("downstream SUBSCRIBED {}", receipt_fields[2]);
                }
            }
        }
        else if message.command  == StompCommand::Error && context.is_downstream() {
            if let Some(error_message) =  message.get_header("message") {
                error!("downstream error for {}, {}", context.downstream(), error_message);
            } else {
                error!("downstream error for {}", context.downstream());
            }
            return Ok(HANDLED);
        }

        Ok(CONTINUE)
    }

}