use std::sync::{Arc, RwLock};
use std::time::{Instant, Duration};
use log::*;
use tokio::prelude::*;
use tokio::net::TcpListener;
use tokio::timer::Interval;
use crate::init::CONFIG;
use crate::init::SERVER;
use crate::init::USER;
use crate::session::reader::{Reader, ReadKiller};
use crate::session::stomp_session::StompSession;
use crate::workflow::destination::destination::Destination;
use crate::session::interruptible_interval::InterruptibleInterval;
use crate::downstream::downstream_bootstrap;
pub fn romp_bootstrap() -> BootstrapFuture {
let server_name = CONFIG.name();
info!("starting tokio for: {}", server_name);
BootstrapFuture {
}
}
pub struct BootstrapFuture {
}
impl Future for BootstrapFuture {
type Item = ();
type Error = ();
fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
USER.len();
debug!("starting destination server");
for (_name, destination) in SERVER.iter() {
tokio::spawn(destination_wrapper(destination.clone()));
let mut expiry;
{
expiry = destination.read().unwrap().expiry();
}
let dest_clone = destination.clone();
if expiry < 60000 {
expiry = 60000;
}
let task = Interval::new(Instant::now(), Duration::from_millis(expiry))
.for_each(move |_| {
dest_clone.write().unwrap().timeout_messages()
} )
.map_err(|_| {});
tokio::spawn(task);
}
let ticker = Interval::new(Instant::now(), Duration::from_millis(60000))
.for_each(|_| {
SERVER.tick()
})
.map_err(|_| {});
tokio::spawn(ticker);
downstream_bootstrap();
for socket_address in CONFIG.socket_addresses.iter() {
let address = socket_address.parse().unwrap();
let listener = TcpListener::bind(&address)
.expect("unable to bind TCP listener");
info!("listening on: {}", socket_address);
let server = listener.incoming()
.map_err(|e| warn!("accept failed = {:?}", e))
.for_each(|sock| {
if SERVER.is_shutdown() {
return Err(());
} else {
debug!("connection");
}
let session = Arc::new(RwLock::new(StompSession::new()));
let timeout_session = session.clone();
let task = InterruptibleInterval::new(session.clone())
.for_each(move |_| {
timeout_session.write().unwrap().timeout()
})
.map_err(|_| {});
tokio::spawn(task);
let mut s = session.write().unwrap();
let (reader, writer) = s.split(sock, session.clone());
let read_killer = Arc::new(RwLock::new(ReadKiller::new()));
let read_killer_cpy = read_killer.clone();
tokio::spawn(writer);
tokio::spawn(read_wrapper(read_killer, reader));
s.set_read_killer(read_killer_cpy);
Ok(())
});
tokio::spawn(server);
}
Err(())
}
}
pub(crate) struct ReadWrapper {
read_killer: Arc<RwLock<ReadKiller>>,
reader: Reader,
}
pub(crate) fn read_wrapper(read_killer: Arc<RwLock<ReadKiller>>, reader: Reader) -> ReadWrapper {
ReadWrapper {
read_killer,
reader,
}
}
impl Future for ReadWrapper {
type Item = ();
type Error = ();
fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
debug!("read wrapper polled");
{
match self.read_killer.write().unwrap().poll() {
Err(_) => {
match self.reader.poll() {
Err(_) => {},
_ => debug!("read killer unexpected return"),
}
debug!("read wrapper closed");
return Err(());
},
_ => {}
}
}
match self.reader.poll() {
Ok(Async::Ready(())) => Ok(Async::Ready(())),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(_) => {
debug!("read wrapper closed");
return Err(());
},
}
}
}
struct DestinationFuture {
destination: Arc<RwLock<Destination>>,
first_poll: bool,
}
fn destination_wrapper(destination: Arc<RwLock<Destination>>) -> DestinationFuture {
DestinationFuture {
destination,
first_poll: true,
}
}
impl Future for DestinationFuture {
type Item = ();
type Error = ();
fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
debug!("destination future wrapper polled");
if self.first_poll {
self.first_poll = false;
{
let mut d = self.destination.write().unwrap();
d.set_task(futures::task::current());
}
}
match self.destination.write().unwrap().poll() {
true => {
Ok(Async::NotReady)
},
false => {
debug!("destination future closed");
Err(())
},
}
}
}