use std::cell::RefCell;
use std::{fmt, thread};
use std::sync::Arc;
use parking_lot::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};
use super::{Value, Error, Version, Id, SyncOutput, SyncResponse, Output, Success, Failure, MethodResult};
/// Outcome of a method call: `Ok` carries a `Value`, `Err` carries an `Error`.
pub type Res = Result<Value, Error>;
/// Consumer-side handle to a method result that may be supplied later.
///
/// The state lives in an `AsyncResultHandler` behind an `Arc<Mutex<..>>`; the `Ready`
/// returned together with this handle by `AsyncResult::new` points at the same state,
/// and so does every clone of this handle.
#[derive(Debug, Clone)]
pub struct AsyncResult {
/// Shared slot holding the stored result and/or the registered listener.
result: Arc<Mutex<AsyncResultHandler>>,
}
impl AsyncResult {
    /// Creates a connected pair: the `AsyncResult` used to consume the value and the
    /// `Ready` used to supply it. Both point at the same freshly created handler.
    pub fn new() -> (AsyncResult, Ready) {
        let shared = Arc::new(Mutex::new(AsyncResultHandler::default()));
        let producer = Ready { result: shared.clone() };
        (AsyncResult { result: shared }, producer)
    }

    /// Registers `f` to receive the result.
    ///
    /// Returns `true` when a result was already stored, in which case `f` has been
    /// called before this method returns; otherwise `f` is kept as the listener and
    /// `false` is returned. The handler lock is held for the duration of this call,
    /// including the synchronous invocation of `f`.
    pub fn on_result<F>(self, f: F) -> bool where F: FnOnce(Res) + Send + 'static {
        let mut handler = self.result.lock();
        handler.on_result(f)
    }

    /// Blocks the calling thread until a result is available and returns it.
    pub fn await(self) -> Res {
        {
            // Check and register under a single lock acquisition so a result set in
            // between cannot be missed.
            let mut handler = self.result.lock();
            match handler.try_result() {
                Some(res) => return res,
                None => {
                    let waiter = thread::current();
                    handler.notify(move || waiter.unpark());
                }
            }
        }
        loop {
            // The guard is a temporary of this statement, so the lock is released
            // before the thread parks.
            let polled = self.result.lock().try_result();
            if let Some(res) = polled {
                return res;
            }
            // `park` may return without a matching `unpark`; the loop re-checks.
            thread::park();
        }
    }
}
/// Converts the handle into a `MethodResult`, collapsing to the synchronous variant
/// when a result is already stored.
impl Into<MethodResult> for AsyncResult {
    fn into(self) -> MethodResult {
        // Taking the result out first releases the lock before `self` is moved.
        let ready = self.result.lock().try_result();
        if let Some(value) = ready {
            MethodResult::Sync(value)
        } else {
            MethodResult::Async(self)
        }
    }
}
/// Producer-side handle used to supply the result of an `AsyncResult`.
///
/// Created together with its `AsyncResult` by `AsyncResult::new`; both refer to the
/// same `AsyncResultHandler`.
#[derive(Debug, Clone)]
pub struct Ready {
/// Shared slot that `ready` writes the result into.
result: Arc<Mutex<AsyncResultHandler>>,
}
impl Ready {
    /// Supplies the result to the shared handler, which either passes it to the
    /// registered listener or stores it. The handler lock is held while that happens.
    pub fn ready(self, result: Result<Value, Error>) {
        let mut handler = self.result.lock();
        handler.set_result(result);
    }
}
/// Listener kept by `AsyncResultHandler` until a result is set.
enum ListenerType {
/// Receives the result itself; `set_result` hands the value over without storing it.
Result(Box<FnMut(Res) + Send>),
/// Takes no arguments; `set_result` stores the result first and then calls it.
Notify(Box<FnMut() + Send>),
}
/// State shared between an `AsyncResult` and its `Ready`.
#[derive(Default)]
struct AsyncResultHandler {
/// Result that has been set but not yet taken out; `None` when nothing is stored.
result: Option<Res>,
/// Listener to run from `set_result`. Only one is kept: registering another replaces it.
listener: Option<ListenerType>,
}
/// Debug output shows only the stored result; the listener is a boxed closure and
/// is left out.
impl fmt::Debug for AsyncResultHandler {
    fn fmt(&self, formatter: &mut fmt::Formatter) -> Result<(), fmt::Error> {
        formatter.write_fmt(format_args!("AsyncResult: {:?}", self.result))
    }
}
impl AsyncResultHandler {
    /// Delivers `res`, consuming whatever listener is registered.
    ///
    /// A `Result` listener receives the value directly and nothing is stored. In every
    /// other case the value is stored, and a `Notify` listener is then called.
    pub fn set_result(&mut self, res: Res) {
        match self.listener.take() {
            Some(ListenerType::Result(mut callback)) => callback(res),
            other => {
                // Store before waking so the woken party finds the value.
                self.result = Some(res);
                if let Some(ListenerType::Notify(mut wake)) = other {
                    wake();
                }
            }
        }
    }

    /// Passes the result to `f`.
    ///
    /// If a result is stored it is taken out, `f` is called with it immediately and
    /// `true` is returned. Otherwise `f` is installed as the `Result` listener,
    /// replacing any previous listener, and `false` is returned.
    pub fn on_result<F>(&mut self, f: F) -> bool where F: FnOnce(Res) + Send + 'static {
        match self.result.take() {
            Some(ready) => {
                f(ready);
                true
            }
            None => {
                // The listener is stored as a boxed `FnMut`, so the `FnOnce` is kept in
                // a cell and moved out on the first call; a second call would panic on
                // `unwrap`.
                let slot = RefCell::new(Some(f));
                let adapter = move |res: Res| {
                    let callback = slot.borrow_mut().take().unwrap();
                    callback(res);
                };
                self.listener = Some(ListenerType::Result(Box::new(adapter)));
                false
            }
        }
    }

    /// Installs `f` as the `Notify` listener, replacing any previous listener.
    ///
    /// # Panics
    ///
    /// Panics if a result is already stored.
    fn notify<F>(&mut self, f: F) where F: FnOnce() + Send + 'static {
        assert!(self.result.is_none());
        // Same `FnOnce`-in-a-cell adapter as in `on_result`.
        let slot = RefCell::new(Some(f));
        let adapter = move || {
            let wake = slot.borrow_mut().take().unwrap();
            wake();
        };
        self.listener = Some(ListenerType::Notify(Box::new(adapter)));
    }

    /// Takes the stored result out, leaving `None` behind.
    fn try_result(&mut self) -> Option<Res> {
        let taken = self.result.take();
        taken
    }
}
/// A pending result together with the `id` and `jsonrpc` values that are combined
/// with it (via `SyncOutput::from`) once the result is available.
#[derive(Debug)]
pub struct AsyncOutput {
/// Handle to the pending result.
result: AsyncResult,
/// Id passed on to the resulting `SyncOutput`.
id: Id,
/// Version passed on to the resulting `SyncOutput`.
jsonrpc: Version,
}
impl AsyncOutput {
    /// Registers `f` to receive the `SyncOutput` built from the eventual result.
    ///
    /// Returns whatever the underlying `AsyncResult::on_result` returns: `true` when
    /// the result was already available and `f` has been called, `false` otherwise.
    pub fn on_result<F>(self, f: F) -> bool where F: FnOnce(SyncOutput) + Send + 'static {
        let AsyncOutput { result, id, jsonrpc } = self;
        result.on_result(move |res| f(SyncOutput::from(res, id, jsonrpc)))
    }

    /// Blocks until the result is available and returns it as a `SyncOutput`.
    pub fn await(self) -> SyncOutput {
        let AsyncOutput { result, id, jsonrpc } = self;
        let resolved = result.await();
        SyncOutput::from(resolved, id, jsonrpc)
    }

    /// Bundles a pending result with the id and version of its request.
    pub fn from(result: AsyncResult, id: Id, jsonrpc: Version) -> Self {
        AsyncOutput { result: result, id: id, jsonrpc: jsonrpc }
    }
}
/// A response whose outputs may each be already available (`Output::Sync`) or still
/// pending (`Output::Async`).
#[derive(Debug)]
pub enum Response {
/// Response consisting of exactly one output.
Single(Output),
/// Response consisting of a list of outputs.
Batch(Vec<Output>),
}
impl Response {
pub fn await(self) -> SyncResponse {
match self {
Response::Single(Output::Sync(output)) => SyncResponse::Single(output),
Response::Single(Output::Async(output)) => SyncResponse::Single(output.await()),
Response::Batch(outputs) => {
let mut res = Vec::new();
for output in outputs.into_iter() {
match output {
Output::Sync(output) => res.push(output),
Output::Async(output) => res.push(output.await()),
};
}
SyncResponse::Batch(res)
},
}
}
pub fn on_result<F>(self, f: F) -> bool where F: FnOnce(SyncResponse) + Send + 'static {
match self {
Response::Single(Output::Sync(output)) => {
f(SyncResponse::Single(output));
true
},
Response::Single(Output::Async(output)) => {
output.on_result(move |res| f(SyncResponse::Single(res)))
},
Response::Batch(outputs) => {
let mut async = true;
let mut count = 0;
for output in &outputs {
if let Output::Async(_) = *output {
async = true;
count += 1;
}
}
let callback = Arc::new(Mutex::new(Some(f)));
let responses = Arc::new(Mutex::new(Some(Vec::new())));
let count = Arc::new(AtomicUsize::new(count));
for output in outputs {
match output {
Output::Async(output) => {
let count = count.clone();
let callback = callback.clone();
let responses = responses.clone();
output.on_result(move |res| {
{
let mut response = responses.lock();
response.as_mut().expect("Callback called only once.").push(res);
}
let result = count.fetch_sub(1, Ordering::Relaxed);
if result == 1 {
let callback = callback.lock().take().expect("Callback called only once.");
let responses = responses.lock().take().expect("Callback called only once.");
callback(SyncResponse::Batch(responses));
}
});
},
Output::Sync(output) => {
let mut res = responses.lock();
res.as_mut().expect("Callback called only once").push(output);
},
}
}
if !async {
let responses = responses.lock().take().expect("Callback called only once.");
let callback = callback.lock().take().expect("Callback called only once.");
callback(SyncResponse::Batch(responses));
true
} else {
count.load(Ordering::Relaxed) == 0
}
}
}
}
}
/// Wraps a `Failure` as a single, already available response.
impl From<Failure> for Response {
    fn from(res: Failure) -> Self {
        let output = SyncOutput::Failure(res);
        Response::Single(Output::Sync(output))
    }
}
/// Wraps a `Success` as a single, already available response.
impl From<Success> for Response {
    fn from(res: Success) -> Self {
        let output = SyncOutput::Success(res);
        Response::Single(Output::Sync(output))
    }
}
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::thread;
    use super::super::*;

    /// Successful output with a null id and version 2, as expected by the tests below.
    fn success(result: Value) -> SyncOutput {
        SyncOutput::Success(Success {
            result: result,
            id: Id::Null,
            jsonrpc: Version::V2,
        })
    }

    /// The batch callback must not fire until both pending outputs are supplied.
    #[test]
    fn should_wait_for_all_results_in_batch() {
        let (first, first_ready) = AsyncResult::new();
        let (second, second_ready) = AsyncResult::new();
        let batch = Response::Batch(vec![
            Output::Async(AsyncOutput::from(first.clone(), Id::Null, Version::V2)),
            Output::Async(AsyncOutput::from(second.clone(), Id::Null, Version::V2)),
        ]);

        let received = Arc::new(Mutex::new(None));
        let sink = received.clone();
        let completed = batch.on_result(move |response| {
            *sink.lock().unwrap() = Some(response);
        });

        assert!(!completed);
        assert!(received.lock().unwrap().is_none());

        first_ready.ready(Ok(Value::U64(1)));
        assert!(received.lock().unwrap().is_none());

        second_ready.ready(Ok(Value::U64(2)));
        assert!(received.lock().unwrap().is_some());
        assert_eq!(received.lock().unwrap().as_ref().unwrap(), &SyncResponse::Batch(vec![
            success(Value::U64(1)),
            success(Value::U64(2)),
        ]));
    }

    /// A result supplied before registration is passed to the callback immediately.
    #[test]
    fn should_call_on_result_if_available() {
        let (pending, producer) = AsyncResult::new();
        let output = AsyncOutput::from(pending.clone(), Id::Null, Version::V2);
        producer.ready(Ok(Value::String("hello".into())));

        let called = Arc::new(AtomicBool::new(false));
        let flag = called.clone();
        let completed = output.on_result(move |_| { flag.store(true, Ordering::Relaxed) });

        assert!(completed);
        assert!(called.load(Ordering::Relaxed));
    }

    /// `await` returns the value supplied from another thread.
    #[test]
    fn should_wait_for_output() {
        let (pending, producer) = AsyncResult::new();
        let output = AsyncOutput::from(pending.clone(), Id::Null, Version::V2);

        thread::spawn(move || {
            producer.ready(Ok(Value::String("hello".into())));
        });

        assert_eq!(output.await(), success(Value::String("hello".into())));
    }

    /// `await` returns a value that was supplied before the call.
    #[test]
    fn should_return_output_if_available() {
        let (pending, producer) = AsyncResult::new();
        let output = AsyncOutput::from(pending.clone(), Id::Null, Version::V2);
        producer.ready(Ok(Value::String("hello".into())));

        assert_eq!(output.await(), success(Value::String("hello".into())));
    }
}