use pool::{self, Lifecycle, Pool, MAX_FUTURES};
use task::Task;
use std::sync::atomic::Ordering::{AcqRel, Acquire};
use std::sync::Arc;
use futures::{future, Future};
use tokio_executor::{self, SpawnError};
/// Handle used to submit futures to a thread pool.
///
/// The handle owns nothing but a reference-counted pointer to the pool, so
/// every `Sender` built from the same `Arc<Pool>` targets the same pool.
#[derive(Debug)]
pub struct Sender {
/// Shared pool that receives the spawned futures.
pub(crate) pool: Arc<Pool>,
}
impl Sender {
pub fn spawn<F>(&self, future: F) -> Result<(), SpawnError>
where
F: Future<Item = (), Error = ()> + Send + 'static,
{
let mut s = self;
tokio_executor::Executor::spawn(&mut s, Box::new(future))
}
fn prepare_for_spawn(&self) -> Result<(), SpawnError> {
let mut state: pool::State = self.pool.state.load(Acquire).into();
loop {
let mut next = state;
if next.num_futures() == MAX_FUTURES {
return Err(SpawnError::at_capacity());
}
if next.lifecycle() == Lifecycle::ShutdownNow {
return Err(SpawnError::shutdown());
}
next.inc_num_futures();
let actual = self
.pool
.state
.compare_and_swap(state.into(), next.into(), AcqRel)
.into();
if actual == state {
trace!("execute; count={:?}", next.num_futures());
break;
}
state = actual;
}
Ok(())
}
}
/// Owned-handle executor implementation.
///
/// Both methods forward to the `Executor` implementation for `&Sender`, which
/// holds the actual logic.
impl tokio_executor::Executor for Sender {
    /// Reports whether a spawn would currently be accepted, as determined by
    /// the `&Sender` implementation.
    fn status(&self) -> Result<(), tokio_executor::SpawnError> {
        <&Sender as tokio_executor::Executor>::status(&self)
    }

    /// Spawns an already boxed future through the `&Sender` implementation.
    fn spawn(
        &mut self,
        future: Box<dyn Future<Item = (), Error = ()> + Send>,
    ) -> Result<(), SpawnError> {
        // Only shared access is needed, so downgrade the exclusive borrow.
        let mut shared: &Sender = self;
        <&Sender as tokio_executor::Executor>::spawn(&mut shared, future)
    }
}
/// Shared-reference executor implementation; this is where spawning happens.
impl<'a> tokio_executor::Executor for &'a Sender {
    /// Inspects a single snapshot of the pool state without modifying it.
    ///
    /// # Errors
    ///
    /// * `SpawnError::at_capacity()` when the snapshot tracks `MAX_FUTURES`
    ///   futures.
    /// * `SpawnError::shutdown()` when the snapshot's lifecycle is
    ///   `Lifecycle::ShutdownNow`.
    fn status(&self) -> Result<(), tokio_executor::SpawnError> {
        let snapshot: pool::State = self.pool.state.load(Acquire).into();

        if snapshot.num_futures() == MAX_FUTURES {
            Err(SpawnError::at_capacity())
        } else if snapshot.lifecycle() == Lifecycle::ShutdownNow {
            Err(SpawnError::shutdown())
        } else {
            Ok(())
        }
    }

    /// Reserves a slot in the pool state, wraps the future in a `Task`, and
    /// submits it to the pool as an external submission.
    ///
    /// # Errors
    ///
    /// Propagates the error from `prepare_for_spawn`; in that case no task is
    /// created or submitted.
    fn spawn(
        &mut self,
        future: Box<dyn Future<Item = (), Error = ()> + Send>,
    ) -> Result<(), SpawnError> {
        self.prepare_for_spawn()?;

        let pool = &self.pool;
        pool.submit_external(Arc::new(Task::new(future)), pool);

        Ok(())
    }
}
impl<T> tokio_executor::TypedExecutor<T> for Sender
where
T: Future<Item = (), Error = ()> + Send + 'static,
{
fn status(&self) -> Result<(), tokio_executor::SpawnError> {
tokio_executor::Executor::status(self)
}
fn spawn(&mut self, future: T) -> Result<(), SpawnError> {
tokio_executor::Executor::spawn(self, Box::new(future))
}
}
/// `futures` executor implementation for `Sender`.
impl<T> future::Executor<T> for Sender
where
    T: Future<Item = (), Error = ()> + Send + 'static,
{
    /// Checks the executor status, then spawns `future`.
    ///
    /// # Errors
    ///
    /// When the status check fails, the future is handed back inside an
    /// `ExecuteError` whose kind is `NoCapacity` for an at-capacity error and
    /// `Shutdown` for any other error.
    ///
    /// The result of the spawn itself is intentionally discarded: once the
    /// status check has passed, this method reports `Ok(())` regardless of
    /// the spawn outcome.
    fn execute(&self, future: T) -> Result<(), future::ExecuteError<T>> {
        match <Sender as tokio_executor::Executor>::status(self) {
            Ok(()) => {
                let _ = self.spawn(future);
                Ok(())
            }
            Err(err) => {
                let kind = if err.is_at_capacity() {
                    future::ExecuteErrorKind::NoCapacity
                } else {
                    future::ExecuteErrorKind::Shutdown
                };
                Err(future::ExecuteError::new(kind, future))
            }
        }
    }
}
impl Clone for Sender {
#[inline]
fn clone(&self) -> Sender {
let pool = self.pool.clone();
Sender { pool }
}
}