use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, RwLock};
use log::*;
use tokio::prelude::*;
use tokio::net::TcpStream;
use crate::prelude::*;
use crate::init::CONFIG;
use crate::config::config::Downstream;
use crate::session::interruptible_interval::InterruptibleInterval;
use crate::session::stomp_session::{StompSession, FLAG_DOWNSTREAM};
use crate::session::reader::ReadKiller;
use crate::bootstrap::read_wrapper;
use std::time::Duration;
use crate::message::request::{get_request_subscribe, get_request_connect};
pub fn downstream_bootstrap() {
let downstreams: &HashMap<String, Downstream> = &CONFIG.downstreams;
if downstreams.len() > 0 {
romp_stomp_filter(Box::new(ConnectOrchestratorFilter {}));
}
for (k, downstream) in downstreams {
if let Ok(_) = downstream.address.parse::<SocketAddr>() {
let dsl = DownstreamConnector {
downstream: downstream.copy()
};
let dsl_lock = Arc::new(RwLock::new(dsl));
dsl_lock.read().unwrap().connect(dsl_lock.clone());
} else {
panic!("unable to parse downstream address: {} {}", k, downstream.address)
}
}
}
pub struct DownstreamConnector {
pub downstream: Downstream,
}
impl DownstreamConnector {
pub(crate) fn session_shutdown(&self, myself: Arc<RwLock<DownstreamConnector>>, _id: usize, _flags: u64) {
self.connect(myself);
}
pub fn connect(&self, myself: Arc<RwLock<DownstreamConnector>>) {
let addr = self.downstream.address.parse::<SocketAddr>().unwrap();
info!("downstream connecting... {}", addr);
let m = get_request_connect(&self.downstream);
let stream = TcpStream::connect(&addr).map(move|sock| {
info!("downstream connected {}", addr);
sock.set_keepalive(Some(Duration::from_secs(30))).ok();
sock.set_recv_buffer_size(CONFIG.request_client_buffer).ok();
sock.set_send_buffer_size(CONFIG.request_client_buffer).ok();
let mut session = StompSession::new();
session.set_flag(FLAG_DOWNSTREAM);
session.downstream_connector = Some(myself.clone());
let session = Arc::new(RwLock::new(session));
let timeout_session = session.clone();
let task = InterruptibleInterval::new(session.clone())
.for_each(move |_| {
timeout_session.write().unwrap().timeout()
})
.map_err(|_| {});
tokio::spawn(task);
let mut s = session.write().unwrap();
let (reader, writer) = s.split(sock, session.clone());
let read_killer = Arc::new(RwLock::new(ReadKiller::new()));
let read_killer_cpy = read_killer.clone();
tokio::spawn(writer);
tokio::spawn(read_wrapper(read_killer, reader));
s.set_read_killer(read_killer_cpy);
s.send_message(m);
Ok(()) as Result<(), ()>
}).map_err(|e| {
error!("downstream error {}", e);
}).flatten();
tokio::spawn(stream);
}
}
#[derive(Debug)]
pub struct ConnectOrchestratorFilter {
}
impl ConnectOrchestratorFilter {
pub fn new()-> ConnectOrchestratorFilter {
ConnectOrchestratorFilter {
}
}
}
impl MessageFilter for ConnectOrchestratorFilter {
fn init(&mut self) {
}
fn do_filter(&self, context: &mut Context, message: &mut StompMessage) -> Result<bool, FilterError> {
if message.command == StompCommand::Connected && context.is_downstream() {
info!("downstream CONNECTED");
let downstream: &Downstream = CONFIG.downstreams.get(&context.downstream()).unwrap();
if let Ok(session) = context.session.read() {
for remote_destination in &downstream.destinations {
debug!("subscribing to {}", remote_destination);
session.send_message(get_request_subscribe(remote_destination, downstream));
}
return Ok(HANDLED);
}
}
else if message.command == StompCommand::Receipt && context.is_downstream() {
if let Some(receipt) = message.get_header("receipt") {
if receipt.starts_with("sub|") {
let receipt_fields: Vec<&str> = receipt.split("|").collect();
info!("downstream SUBSCRIBED {}", receipt_fields[2]);
}
}
}
else if message.command == StompCommand::Error && context.is_downstream() {
if let Some(error_message) = message.get_header("message") {
error!("downstream error for {}, {}", context.downstream(), error_message);
} else {
error!("downstream error for {}", context.downstream());
}
return Ok(HANDLED);
}
Ok(CONTINUE)
}
}