#![allow(dead_code)]
use mio::Token;
use futures::prelude::*;
use futures::executor::{Notify, Spawn, spawn};
use futures::task::{self, Task};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use slab::Slab;
use std::collections::{HashSet, VecDeque};
use sozu_command::proxy::{ProxyRequest, ProxyResponse, ProxyResponseStatus};
use sozu_command::command::CommandResponse;
use super::FrontToken;
// Process-wide executor shared by every associated function of `Executor`.
// It is built on first access with an empty task runner, empty wake-up and
// message tables, and empty worker/client/state queues.
lazy_static! {
static ref EXECUTOR: Arc<Executor> = {
Arc::new(Executor {
to_notify: Mutex::new(HashMap::new()),
inner: Mutex::new(Runner::new()),
messages: Mutex::new(HashMap::new()),
worker_queue: Mutex::new(VecDeque::new()),
client_queue: Mutex::new(VecDeque::new()),
state_queue: Mutex::new(VecDeque::new()),
})
};
}
/// Shared state of the global executor: the task runner, the tables used to
/// route worker responses to waiting tasks, and three FIFO queues.
///
/// Every field sits behind its own `Mutex`.
pub struct Executor {
/// Spawned futures and the set of task ids that must be polled on the next run.
pub inner: Mutex<Runner>,
/// Tasks to wake up, keyed by (worker token, message id, message kind).
/// Filled by `Executor::register`, emptied by `Executor::handle_message`.
pub to_notify: Mutex<HashMap<(Token, String, MessageStatus), Task>>,
/// Responses received from workers and not yet consumed, with the same key.
/// Filled by `Executor::handle_message`, emptied by `Executor::get_message`.
pub messages: Mutex<HashMap<(Token, String, MessageStatus), ProxyResponse>>,
/// Requests queued by `Executor::send_worker`, popped by `Executor::get_worker_message`.
pub worker_queue: Mutex<VecDeque<(Token, ProxyRequest)>>,
/// Responses queued by `Executor::send_client`, popped by `Executor::get_client_message`.
pub client_queue: Mutex<VecDeque<(FrontToken, CommandResponse)>>,
/// State changes queued by `Executor::stop_worker` / `Executor::stop_master`,
/// popped by `Executor::get_state_change`.
pub state_queue: Mutex<VecDeque<StateChange>>,
}
/// Storage for spawned futures plus the bookkeeping of which ones to poll.
pub struct Runner {
/// Ids (slab keys) of the tasks to poll on the next call to `Runner::run`.
pub ready: HashSet<usize>,
/// Spawned futures, indexed by task id.
pub tasks: Slab<Spawn<Box<Future<Item = (), Error = ()> + Send>>>,
}
/// Coarse classification of a worker response, used as part of the key of the
/// executor's wake-up and message tables.
///
/// `Debug` is derived so that the enum (and the keys containing it) can be
/// printed in logs and assertion failures.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum MessageStatus {
    /// The response carried `ProxyResponseStatus::Processing`.
    Processing,
    /// The response carried any other status.
    Other,
}
/// State change request queued through `Executor::stop_worker` or
/// `Executor::stop_master` and retrieved with `Executor::get_state_change`.
///
/// `Debug` is derived so that queued requests can be printed in logs and
/// assertion failures.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum StateChange {
    /// Request concerning the worker identified by the token.
    StopWorker(Token),
    /// Request concerning the master process.
    StopMaster,
}
impl Runner {
    /// Creates a runner with no ready task and a task slab of capacity 10000.
    pub fn new() -> Runner {
        Runner {
            tasks: Slab::with_capacity(10000),
            ready: HashSet::new(),
        }
    }

    /// Polls once every task currently marked as ready, emptying the ready set.
    ///
    /// A task that completes or fails is removed from the slab; a task that
    /// is not ready stays stored until it is notified again. Ids that no
    /// longer correspond to a stored task are ignored.
    ///
    /// NOTE(review): `Executor::run` calls this while holding the `inner`
    /// lock, and `Notify::notify` takes that same lock, so a future that
    /// notifies its own task from inside `poll` would presumably block
    /// forever. To be confirmed.
    pub fn run(&mut self) {
        for id in self.ready.drain() {
            if !self.tasks.contains(id) {
                continue;
            }

            let finished = match self.tasks[id].poll_future_notify(&(EXECUTOR.clone()), id) {
                Ok(Async::NotReady) => false,
                Ok(Async::Ready(())) => {
                    trace!("finished executing future[{}]", id);
                    true
                },
                Err(e) => {
                    error!("error executing future[{}]: {:?}", id, e);
                    true
                },
            };

            if finished {
                self.tasks.remove(id);
            }
        }
    }
}
impl Executor {
pub fn register(worker: Token, message_id: &str, status: MessageStatus, task: Task) {
let mut to_notify = EXECUTOR.to_notify.lock().unwrap();
to_notify.insert((worker, message_id.to_string(), status), task);
}
pub fn send_worker(worker: Token, message: ProxyRequest) {
let mut queue = EXECUTOR.worker_queue.lock().unwrap();
queue.push_back((worker, message));
}
pub fn get_worker_message() -> Option<(Token, ProxyRequest)> {
let mut queue = EXECUTOR.worker_queue.lock().unwrap();
queue.pop_front()
}
pub fn send_client(client: FrontToken, message: CommandResponse) {
let mut queue = EXECUTOR.client_queue.lock().unwrap();
queue.push_back((client, message));
}
pub fn get_client_message() -> Option<(FrontToken, CommandResponse)> {
let mut queue = EXECUTOR.client_queue.lock().unwrap();
queue.pop_front()
}
pub fn stop_worker(worker: Token) {
let mut queue = EXECUTOR.state_queue.lock().unwrap();
queue.push_back(StateChange::StopWorker(worker));
}
pub fn stop_master() {
let mut queue = EXECUTOR.state_queue.lock().unwrap();
queue.push_back(StateChange::StopMaster);
}
pub fn get_state_change() -> Option<StateChange> {
let mut queue = EXECUTOR.state_queue.lock().unwrap();
queue.pop_front()
}
pub fn handle_message(worker: Token, message: ProxyResponse) {
trace!("executor handle_message({:?}, {}, {:?})", worker, message.id, message);
let status = match message.status {
ProxyResponseStatus::Processing => MessageStatus::Processing,
_ => MessageStatus::Other
};
let task = {
let mut to_notify = EXECUTOR.to_notify.lock().unwrap();
match to_notify.remove(&(worker, message.id.to_string(), status)) {
None => {
trace!("no task waiting for {}", message.id);
None
},
Some(task) => {
Some(task.clone())
}
}
};
{
let mut messages = EXECUTOR.messages.lock().unwrap();
trace!("inserting message({:?}, {}", worker, message.id);
messages.insert((worker, message.id.to_string(), status), message);
}
if let Some(t) = task {
t.notify();
}
}
pub fn execute(s: impl Future<Item = (), Error = ()> + Send + 'static) {
let mut inner = EXECUTOR.inner.lock().unwrap();
if let Ok(id) = inner.tasks.insert(spawn(Box::new(s))) {
inner.ready.insert(id);
}
}
pub fn run() {
let mut inner = EXECUTOR.inner.lock().unwrap();
inner.run();
}
pub fn get_message(worker: Token, message_id: &str, status: MessageStatus) -> Option<ProxyResponse> {
{
let mut messages = EXECUTOR.messages.lock().unwrap();
messages.remove(&(worker, message_id.to_string(), status))
}
}
pub fn peek_message(worker: Token, message_id: &str, status: MessageStatus) -> bool {
{
let messages = EXECUTOR.messages.lock().unwrap();
messages.contains_key(&(worker, message_id.to_string(), status))
}
}
}
impl Notify for Executor {
    /// Marks the task `id` as ready, so that the next run of the executor polls it.
    fn notify(&self, id: usize) {
        self.inner.lock().unwrap().ready.insert(id);
    }
}
/// Future resolving to the non-`Processing` response of `worker_id` to the
/// request identified by `message_id`.
pub struct FutureAnswer {
// worker the response is expected from
worker_id: Token,
// id of the request, which the response carries as well
message_id: String,
}
impl Future for FutureAnswer {
    type Item = ProxyResponse;
    type Error = String;

    /// Consumes the stored `MessageStatus::Other` response for this request.
    ///
    /// Resolves to the response when its status is `Ok`, fails with the
    /// error text when it is `Error`, and panics on any other status. When
    /// no response is stored yet, the current task is registered for wake-up
    /// and `NotReady` is returned.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let answer = Executor::get_message(self.worker_id, &self.message_id, MessageStatus::Other);

        let message = match answer {
            Some(message) => message,
            None => {
                Executor::register(self.worker_id, &self.message_id, MessageStatus::Other, task::current());
                return Ok(Async::NotReady);
            }
        };

        match message.status {
            ProxyResponseStatus::Ok => Ok(Async::Ready(message)),
            ProxyResponseStatus::Error(s) => Err(s),
            _ => panic!(),
        }
    }
}
/// Stream of the `Processing` responses of `worker_id` to the request
/// identified by `message_id`.
pub struct FutureProcessing {
// worker the responses are expected from
worker_id: Token,
// id of the request, which the responses carry as well
message_id: String,
}
impl Stream for FutureProcessing {
    type Item = ProxyResponse;
    type Error = ();

    /// Yields the stored `Processing` response for this request, and ends
    /// once the final (`MessageStatus::Other`) response is stored.
    ///
    /// A pending `Processing` response is delivered before the presence of
    /// the final response is checked. Otherwise a progress message received
    /// just before the final answer would never be yielded and would stay
    /// in the executor's message table.
    ///
    /// The final response is only peeked at, never consumed here.
    ///
    /// Panics if a response stored under the `Processing` key carries
    /// another status.
    ///
    /// NOTE(review): end of stream relies on the final response still being
    /// stored when this is polled; if another consumer takes it first, this
    /// stream presumably never ends. To be confirmed against callers.
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        let pending = Executor::get_message(self.worker_id, &self.message_id, MessageStatus::Processing);
        if let Some(message) = pending {
            return match message.status {
                ProxyResponseStatus::Processing => Ok(Async::Ready(Some(message))),
                _ => panic!(),
            };
        }

        if Executor::peek_message(self.worker_id, &self.message_id, MessageStatus::Other) {
            return Ok(Async::Ready(None));
        }

        // nothing stored yet: ask to be woken up by the next progress
        // message as well as by the final answer
        Executor::register(self.worker_id, &self.message_id, MessageStatus::Other, task::current());
        Executor::register(self.worker_id, &self.message_id, MessageStatus::Processing, task::current());
        Ok(Async::NotReady)
    }
}
/// Queues `message` for `worker_id` and returns both a stream of the
/// `Processing` responses to it and a future of its final response.
pub fn send_processing(worker_id: Token, message: ProxyRequest) -> (FutureProcessing, FutureAnswer) {
    let id = message.id.to_string();
    Executor::send_worker(worker_id, message);

    let processing = FutureProcessing {
        worker_id,
        message_id: id.clone(),
    };
    let answer = FutureAnswer {
        worker_id,
        message_id: id,
    };

    (processing, answer)
}
/// Queues `message` for `worker_id` and returns a future of its final response.
pub fn send(worker_id: Token, message: ProxyRequest) -> FutureAnswer {
    let id = message.id.to_string();
    Executor::send_worker(worker_id, message);

    FutureAnswer {
        worker_id,
        message_id: id,
    }
}
#[cfg(test)]
mod tests {
use mio::Token;
use super::*;
use futures::executor::spawn;
use futures::task;
use futures::future::{lazy, result};
use sozu_command::proxy::{ProxyRequestData,ProxyResponseStatus};
use sozu_command::command::CommandStatus;
// Drives a request/response round trip through the global executor:
// a task sends a request, receives one Processing response and then the
// final Ok response, and queues a client response as a result.
#[test]
fn executor() {
// the request is only sent when the task is first polled
Executor::execute(lazy(||{
let (processing, msg_future) = send_processing(Token(0), ProxyRequest { id: "test".to_string(), order: ProxyRequestData::Status });
processing.for_each(|msg| {
println!("TEST: got processing message: {:?}", msg);
Ok(())
}).join(msg_future.map(|msg| {
println!("TEST: future got msg: {:?}", msg);
Executor::send_client(FrontToken(1), CommandResponse::new(
"test".to_string(),
CommandStatus::Ok,
"ok".to_string(),
None
));
}).map_err(|e| {
println!("TEST: got error: {:?}", e);
})
).map(|_| ())
}));
// first poll: no response is stored yet, the task registers for wake-up
Executor::run();
// intermediate response from the worker, which wakes the task up
Executor::handle_message(Token(0), ProxyResponse{
id: "test".to_string(),
status: ProxyResponseStatus::Processing,
data: None
});
Executor::run();
// final response from the worker
Executor::handle_message(Token(0), ProxyResponse{
id: "test".to_string(),
status: ProxyResponseStatus::Ok,
data: None
});
Executor::run();
// the final response must have produced the client response
assert_eq!(
Executor::get_client_message(),
Some((FrontToken(1),
CommandResponse::new(
"test".to_string(),
CommandStatus::Ok,
"ok".to_string(),
None
)
))
);
}
}