use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use disk::fs::FileSystem;
use disk::{IDiskMessage, ODiskMessage};
use disk::tasks;
use disk::tasks::context::DiskManagerContext;
use disk::builder::DiskManagerBuilder;
use crossbeam::sync::MsQueue;
use futures::task::{self, Task};
use futures::sync::mpsc::{self, Receiver};
use futures::{StartSend, Poll, Stream, Sink, AsyncSink, Async};
use futures_cpupool::{CpuPool};
/// Combined handle over a `DiskManagerSink` and a `DiskManagerStream`.
///
/// It forwards `Sink` calls to the sink half and `Stream` polls to the stream
/// half; `into_parts` hands the two halves out separately.
pub struct DiskManager<F> {
    /// Half that accepts `IDiskMessage`s.
    sink: DiskManagerSink<F>,
    /// Half that yields `ODiskMessage`s.
    stream: DiskManagerStream,
}
impl<F> DiskManager<F> {
    /// Builds a `DiskManager` from the settings in `builder`, handing `fs` to the
    /// task context.
    ///
    /// The sink and stream halves are wired to the same in-flight counter (starting
    /// at zero) and the same queue of parked tasks, so the stream can release
    /// capacity and wake senders that the sink turned away.
    pub fn from_builder(mut builder: DiskManagerBuilder, fs: F) -> DiskManager<F> {
        // Plain values are read before `worker_config()` is called on the builder.
        let max_in_flight = builder.sink_buffer_capacity();
        let out_buffer_len = builder.stream_buffer_capacity();
        let worker_config = builder.worker_config();

        // State shared between the two halves.
        let in_flight = Arc::new(AtomicUsize::new(0));
        let parked_tasks = Arc::new(MsQueue::new());

        // Results travel from the task context to the stream over this channel.
        let (out_send, out_recv) = mpsc::channel(out_buffer_len);
        let context = DiskManagerContext::new(out_send, fs);

        let stream = DiskManagerStream::new(
            out_recv,
            Arc::clone(&in_flight),
            Arc::clone(&parked_tasks),
        );
        let sink = DiskManagerSink::new(
            worker_config.create(),
            context,
            max_in_flight,
            in_flight,
            parked_tasks,
        );

        DiskManager { sink: sink, stream: stream }
    }

    /// Consumes the manager and returns its `(sink, stream)` halves.
    pub fn into_parts(self) -> (DiskManagerSink<F>, DiskManagerStream) {
        let DiskManager { sink, stream } = self;

        (sink, stream)
    }
}
/// `Sink` implementation that delegates every call to the inner `DiskManagerSink`.
impl<F> Sink for DiskManager<F>
where
    F: FileSystem + Send + Sync + 'static,
{
    type SinkItem = IDiskMessage;
    type SinkError = ();

    /// Forwards `item` to the sink half and returns its result unchanged.
    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
        self.sink.start_send(item)
    }

    /// Forwards to the sink half's `poll_complete`.
    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
        self.sink.poll_complete()
    }
}
impl<F> Stream for DiskManager<F> {
type Item = ODiskMessage;
type Error = ();
fn poll(&mut self) -> Poll<Option<ODiskMessage>, ()> {
self.stream.poll()
}
}
/// Sending half of the disk manager: accepts `IDiskMessage`s and runs them on a
/// `CpuPool`, refusing new work once `max_capacity` submissions are outstanding.
pub struct DiskManagerSink<F> {
    /// Pool that accepted messages are executed on.
    pool: CpuPool,
    /// Context cloned and handed to every executed message.
    context: DiskManagerContext<F>,
    /// Upper bound on `cur_capacity` for a submission to be accepted.
    max_capacity: usize,
    /// Count of accepted submissions; incremented here, decremented by the stream half.
    cur_capacity: Arc<AtomicUsize>,
    /// Tasks parked after a refused submission; the stream half pops and notifies them.
    task_queue: Arc<MsQueue<Task>>,
}
impl<F> Clone for DiskManagerSink<F> {
fn clone(&self) -> DiskManagerSink<F> {
DiskManagerSink{ pool: self.pool.clone(), context: self.context.clone(), max_capacity: self.max_capacity,
cur_capacity: self.cur_capacity.clone(), task_queue: self.task_queue.clone() }
}
}
impl<F> DiskManagerSink<F> {
    /// Assembles a sink from its parts; no other work is performed.
    fn new(
        pool: CpuPool,
        context: DiskManagerContext<F>,
        max_capacity: usize,
        cur_capacity: Arc<AtomicUsize>,
        task_queue: Arc<MsQueue<Task>>,
    ) -> DiskManagerSink<F> {
        DiskManagerSink {
            pool: pool,
            context: context,
            max_capacity: max_capacity,
            cur_capacity: cur_capacity,
            task_queue: task_queue,
        }
    }

    /// Tries to reserve one slot of capacity.
    ///
    /// Returns `true` when the counter was below `max_capacity` before the
    /// increment, in which case the slot stays reserved. Otherwise the increment
    /// is rolled back and `false` is returned.
    fn try_submit_work(&self) -> bool {
        // Optimistically take a slot; `fetch_add` yields the value before the increment.
        let previous = self.cur_capacity.fetch_add(1, Ordering::SeqCst);
        let reserved = previous < self.max_capacity;

        if !reserved {
            // Over the limit: give the slot back.
            self.cur_capacity.fetch_sub(1, Ordering::SeqCst);
        }

        reserved
    }
}
/// `Sink` implementation that applies back-pressure through the shared capacity counter.
impl<F> Sink for DiskManagerSink<F>
where
    F: FileSystem + Send + Sync + 'static,
{
    type SinkItem = IDiskMessage;
    type SinkError = ();

    /// Submits `item` to the pool if a capacity slot can be reserved.
    ///
    /// Returns `AsyncSink::Ready` once the item has been handed to the pool, or
    /// `AsyncSink::NotReady(item)` when both reservation attempts fail. In the
    /// latter case the current task has been pushed onto the task queue so the
    /// stream half can notify it later.
    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
        info!("Starting Send For DiskManagerSink With IDiskMessage");

        if self.try_submit_work() {
            info!("DiskManagerSink Submitted Work On First Attempt");
        } else {
            info!("DiskManagerSink Failed To Submit Work On First Attempt, Adding Task To Queue");
            // Park the task BEFORE retrying: if the stream half released a slot
            // between the failed attempt and this push, the retry below picks it up
            // instead of waiting for a notification that may already have happened.
            self.task_queue.push(task::current());

            if !self.try_submit_work() {
                return Ok(AsyncSink::NotReady(item));
            }
            info!("DiskManagerSink Submitted Work On Second Attempt");
        }

        // A slot is reserved at this point; hand the message to the pool.
        tasks::execute_on_pool(item, &self.pool, self.context.clone());

        Ok(AsyncSink::Ready)
    }

    /// Always reports completion; this sink keeps no internal buffer to flush.
    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
        Ok(Async::Ready(()))
    }
}
/// Receiving half of the disk manager: yields `ODiskMessage`s and releases sink
/// capacity as results arrive.
pub struct DiskManagerStream {
    /// Channel end that `ODiskMessage`s are read from.
    recv: Receiver<ODiskMessage>,
    /// Counter shared with the sink half; decremented by `complete_work`.
    cur_capacity: Arc<AtomicUsize>,
    /// Parked tasks shared with the sink half; drained and notified in `poll`.
    task_queue: Arc<MsQueue<Task>>,
}
impl DiskManagerStream {
    /// Assembles a stream from its parts; no other work is performed.
    fn new(
        recv: Receiver<ODiskMessage>,
        cur_capacity: Arc<AtomicUsize>,
        task_queue: Arc<MsQueue<Task>>,
    ) -> DiskManagerStream {
        DiskManagerStream {
            recv: recv,
            cur_capacity: cur_capacity,
            task_queue: task_queue,
        }
    }

    /// Releases one slot of capacity by decrementing the shared counter.
    fn complete_work(&self) {
        self.cur_capacity.fetch_sub(1, Ordering::SeqCst);
    }
}
impl Stream for DiskManagerStream {
type Item = ODiskMessage;
type Error = ();
fn poll(&mut self) -> Poll<Option<ODiskMessage>, ()> {
info!("Polling DiskManagerStream For ODiskMessage");
match self.recv.poll() {
res @ Ok(Async::Ready(Some(ODiskMessage::TorrentAdded(_)))) |
res @ Ok(Async::Ready(Some(ODiskMessage::TorrentRemoved(_)))) |
res @ Ok(Async::Ready(Some(ODiskMessage::TorrentSynced(_)))) |
res @ Ok(Async::Ready(Some(ODiskMessage::BlockLoaded(_)))) |
res @ Ok(Async::Ready(Some(ODiskMessage::BlockProcessed(_)))) => {
self.complete_work();
info!("Notifying DiskManager That We Can Submit More Work");
loop {
match self.task_queue.try_pop() {
Some(task) => task.notify(),
None => { break; }
}
}
res
},
other => other
}
}
}