extern crate futures_promises;
extern crate tokio;
use futures::sync::mpsc::Sender;
use futures::{future::Future, sink::Sink, stream::Stream, sync::mpsc::channel};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use futures_promises::promises::{Promise, PromiseHandle};
pub struct IoWriter<SendType> {
tx: futures::sync::mpsc::Sender<SendType>,
}
impl<SendType> Clone for IoWriter<SendType> {
fn clone(&self) -> Self {
IoWriter {
tx: self.tx.clone(),
}
}
}
impl<SendType> IoWriter<SendType>
where
SendType: std::marker::Send + 'static,
{
pub fn new<SinkType>(sink: SinkType) -> Self
where
SinkType: Sink<SinkItem = SendType> + Send + 'static,
{
let (tx, sink_rx) = channel::<<SinkType as Sink>::SinkItem>(10);
let sink_task = sink_rx.forward(sink.sink_map_err(|_| ())).map(|_| ());
tokio::spawn(sink_task);
IoWriter { tx }
}
pub fn write(&mut self, frame: SendType) -> PromiseHandle<()> {
let promise = Promise::new();
let handle = promise.get_handle();
tokio::spawn(self.tx.clone().send(frame).then(move |result| {
match result {
Ok(_) => promise.resolve(()),
Err(e) => {
promise.reject(format!("{}", e));
}
};
Ok::<(), ()>(())
}));
handle
}
}
pub trait Filter<SendType, ReceiveType>:
FnMut(ReceiveType, &mut IoWriter<SendType>) -> Option<ReceiveType> + std::marker::Send + 'static
{
}
impl<T, SendType, ReceiveType> Filter<SendType, ReceiveType> for T where
T: FnMut(ReceiveType, &mut IoWriter<SendType>) -> Option<ReceiveType>
+ std::marker::Send
+ 'static
{
}
pub trait ErrorHandler<ErrorType>: FnMut(ErrorType) + std::marker::Send + 'static {}
impl<T, ErrorType> ErrorHandler<ErrorType> for T where
T: FnMut(ErrorType) + std::marker::Send + 'static
{
}
pub struct IoManagerBuilder<SinkType, StreamType, F, EH>
where
SinkType: Sink,
StreamType: Stream,
F: FnMut(
<StreamType as Stream>::Item,
&mut IoWriter<<SinkType as Sink>::SinkItem>,
) -> Option<<StreamType as Stream>::Item>
+ std::marker::Send
+ 'static,
EH: FnMut(<StreamType as Stream>::Error) + std::marker::Send + 'static,
{
sink: SinkType,
stream: StreamType,
filter: Option<F>,
error_handler: Option<EH>,
}
impl<SinkType, StreamType, F, EH> IoManagerBuilder<SinkType, StreamType, F, EH>
where
SinkType: Sink + Send + 'static,
StreamType: Stream + Send + 'static,
<StreamType as Stream>::Item: Send + Clone + 'static,
<StreamType as Stream>::Error: Send,
<SinkType as Sink>::SinkItem: Send + 'static,
F: Filter<<SinkType as Sink>::SinkItem, <StreamType as Stream>::Item>,
EH: ErrorHandler<<StreamType as Stream>::Error>,
{
pub fn new(sink: SinkType, stream: StreamType) -> Self {
Self {
sink,
stream,
filter: None,
error_handler: None,
}
}
pub fn with_filter(mut self, filter: F) -> Self {
self.filter = Some(filter);
self
}
pub fn with_error_handler(mut self, handler: EH) -> Self {
self.error_handler = Some(handler);
self
}
pub fn build(self) -> IoManager<SinkType::SinkItem, StreamType::Item> {
IoManager::<SinkType::SinkItem, StreamType::Item>::constructor(
self.sink,
self.stream,
self.filter,
self.error_handler,
)
}
}
pub struct IoManager<SendType, ReceiveType> {
tx: futures::sync::mpsc::Sender<SendType>,
subscribers: Arc<Mutex<HashMap<u32, futures::sync::mpsc::Sender<ReceiveType>>>>,
next_handle: Mutex<u32>,
}
impl<SendType, ReceiveType> IoManager<SendType, ReceiveType> {
fn constructor<SinkType, StreamType, F, EH>(
sink: SinkType,
stream: StreamType,
mut filter: Option<F>,
error_handler: Option<EH>,
) -> IoManager<SinkType::SinkItem, StreamType::Item>
where
SinkType: Sink + Send + 'static,
StreamType: Stream + Send + 'static,
<StreamType as Stream>::Item: Send + Clone + 'static,
<StreamType as Stream>::Error: Send,
<SinkType as Sink>::SinkItem: Send + 'static,
F: Filter<SinkType::SinkItem, StreamType::Item>,
EH: ErrorHandler<StreamType::Error>,
{
let (sink_tx, sink_rx) = channel::<<SinkType as Sink>::SinkItem>(10);
let sink_task = sink_rx.forward(sink.sink_map_err(|_| ())).map(|_| ());
tokio::spawn(sink_task);
let mut filter_writer = IoWriter {
tx: sink_tx.clone(),
};
let subscribers = Arc::new(Mutex::new(HashMap::<
u32,
futures::sync::mpsc::Sender<<StreamType as Stream>::Item>,
>::new()));
let stream_subscribers_reference = subscribers.clone();
let stream_task = stream
.for_each(move |frame| {
let frame = match &mut filter {
None => Some(frame),
Some(function) => function(frame, &mut filter_writer),
};
match frame {
Some(frame) => {
for (_handle, tx) in stream_subscribers_reference.lock().unwrap().iter_mut()
{
match tx.start_send(frame.clone()) {
Ok(_) => {}
Err(error) => {
eprintln!("Stream Subscriber Error: {}", error);
}
};
}
}
None => {}
}
Ok(())
})
.map_err(|e| match error_handler {
None => (),
Some(mut handler) => handler(e),
});
tokio::spawn(stream_task);
IoManager {
tx: sink_tx,
subscribers,
next_handle: Mutex::new(0),
}
}
pub fn subscribe_mpsc_sender(
&self,
subscriber: futures::sync::mpsc::Sender<ReceiveType>,
) -> u32 {
let mut map = self.subscribers.lock().unwrap();
let mut handle_guard = self.next_handle.lock().unwrap();
let handle = handle_guard.clone();
*handle_guard += 1;
map.insert(handle.clone(), subscriber);
handle
}
pub fn on_receive<F>(&self, callback: F) -> u32
where
F: FnMut(ReceiveType) -> Result<(), ()> + std::marker::Send + 'static,
ReceiveType: std::marker::Send + 'static,
{
let (tx, rx) = channel::<ReceiveType>(10);
let on_frame = rx.for_each(callback).map(|_| ());
tokio::spawn(on_frame);
self.subscribe_mpsc_sender(tx)
}
pub fn extract_callback(&self, key: &u32) -> Option<Sender<ReceiveType>> {
let mut map = self.subscribers.lock().unwrap();
map.remove(key)
}
pub fn get_writer(&self) -> IoWriter<SendType> {
IoWriter {
tx: self.tx.clone(),
}
}
}
pub mod silly_aliases {
pub type DoWhenever<T, U> = crate::IoManager<T, U>;
pub type PushWhenever<T> = crate::IoWriter<T>;
}