mod builder;
mod context;
mod executor;
mod stream;
use std::future::Future;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use crossbeam_channel::{Receiver, Sender};
use executor::{Task, TaskId};
use futures::task::AtomicWaker;
use futures::{FutureExt as _, Stream, StreamExt as _};
use slab::Slab;
use stream::CommandStreamExt as _;
pub use builder::{NotificationBuilder, RequestBuilder, StreamBuilder};
pub use context::CommandContext;
pub use executor::AbortHandle;
pub use stream::CommandOutput;
use crate::Request;
use crate::capability::Operation;
/// A unit of work that produces `Effect`s and `Event`s from one or more tasks.
///
/// The struct keeps the receiving halves of the effect and event channels, the
/// `CommandContext` that is cloned into tasks, and the bookkeeping needed to
/// drive those tasks (ready queue, spawn queue and task storage).
#[must_use = "Unused commands never execute. Return the command from your app's update function or combine it with other commands with Command::and or Command::all"]
pub struct Command<Effect, Event> {
    /// Receiving half of the channel whose sender is stored in `context.effects`.
    effects: Receiver<Effect>,
    /// Receiving half of the channel whose sender is stored in `context.events`.
    events: Receiver<Event>,
    /// Context holding the sending halves; a clone is passed to the root task.
    context: CommandContext<Effect, Event>,

    /// Ids of tasks that have been marked ready; the root task's id is queued
    /// here on construction.
    ready_queue: Receiver<TaskId>,
    /// Tasks submitted through the sender stored in `context.tasks`.
    spawn_queue: Receiver<Task>,
    /// Storage for the tasks owned by this command, indexed by `TaskId`.
    tasks: Slab<Task>,
    /// Sending half of `ready_queue`.
    // NOTE(review): presumably handed to task wakers by the executor — confirm there.
    ready_sender: Sender<TaskId>,
    /// Shared waker slot.
    // NOTE(review): its users are outside this file section — confirm in the executor/stream code.
    waker: Arc<AtomicWaker>,

    /// Flag shared with the root task and with every `AbortHandle` created by
    /// `abort_handle`.
    aborted: Arc<AtomicBool>,
}
impl<Effect, Event> Command<Effect, Event>
where
Effect: Send + 'static,
Event: Send + 'static,
{
pub fn new<F, Fut>(create_task: F) -> Self
where
F: FnOnce(CommandContext<Effect, Event>) -> Fut,
Fut: Future<Output = ()> + Send + 'static,
{
let (effect_sender, effect_receiver) = crossbeam_channel::unbounded();
let (event_sender, event_receiver) = crossbeam_channel::unbounded();
let (ready_sender, ready_receiver) = crossbeam_channel::unbounded();
let (spawn_sender, spawn_receiver) = crossbeam_channel::unbounded();
let (_, waker_receiver) = crossbeam_channel::unbounded();
let context = context::CommandContext {
effects: effect_sender,
events: event_sender,
tasks: spawn_sender,
rc: Arc::default(),
};
let aborted: Arc<AtomicBool> = Arc::default();
let task = Task {
finished: Arc::default(),
aborted: aborted.clone(),
future: create_task(context.clone()).boxed(),
join_handle_wakers: waker_receiver,
};
let mut tasks = Slab::with_capacity(1);
let task_id = TaskId(tasks.insert(task));
ready_sender
.send(task_id)
.expect("Could not make task ready, ready channel disconnected");
Command {
effects: effect_receiver,
events: event_receiver,
context,
ready_queue: ready_receiver,
spawn_queue: spawn_receiver,
ready_sender,
tasks,
waker: Arc::default(),
aborted,
}
}
pub fn done() -> Self {
Command::new(|_ctx| futures::future::ready(()))
}
pub fn from<Ef, Ev>(subcmd: Command<Ef, Ev>) -> Self
where
Ef: Send + 'static + Into<Effect> + Unpin,
Ev: Send + 'static + Into<Event> + Unpin,
Effect: Unpin,
Event: Unpin,
{
subcmd.map_effect(Into::into).map_event(Into::into)
}
pub fn into<Ef, Ev>(self) -> Command<Ef, Ev>
where
Ef: Send + 'static + Unpin,
Ev: Send + 'static + Unpin,
Effect: Unpin + Into<Ef>,
Event: Unpin + Into<Ev>,
{
self.map_effect(Into::into).map_event(Into::into)
}
pub fn event(event: Event) -> Self {
Command::new(|ctx| futures::future::lazy(move |_| ctx.send_event(event)))
}
pub fn notify_shell<Op>(
operation: Op,
) -> builder::NotificationBuilder<Effect, Event, impl Future<Output = ()>>
where
Op: Operation,
Effect: From<Request<Op>>,
{
builder::NotificationBuilder::new(|ctx| {
futures::future::lazy(move |_| ctx.notify_shell(operation))
})
}
pub fn request_from_shell<Op>(
operation: Op,
) -> builder::RequestBuilder<Effect, Event, impl Future<Output = Op::Output>>
where
Op: Operation,
Effect: From<Request<Op>>,
{
builder::RequestBuilder::new(|ctx| ctx.request_from_shell(operation))
}
pub fn stream_from_shell<Op>(
operation: Op,
) -> builder::StreamBuilder<Effect, Event, impl Stream<Item = Op::Output>>
where
Op: Operation,
Effect: From<Request<Op>>,
{
builder::StreamBuilder::new(|ctx| ctx.stream_from_shell(operation))
}
pub fn is_done(&mut self) -> bool {
self.run_until_settled();
let all_context_dropped = Arc::strong_count(&self.context.rc) == 1;
all_context_dropped
&& self.effects.is_empty()
&& self.events.is_empty()
&& self.tasks.is_empty()
}
pub fn effects(&mut self) -> impl Iterator<Item = Effect> + '_ {
self.run_until_settled();
self.effects.try_iter()
}
pub fn events(&mut self) -> impl Iterator<Item = Event> + '_ {
self.run_until_settled();
self.events.try_iter()
}
pub fn into_future(self, ctx: CommandContext<Effect, Event>) -> impl Future<Output = ()>
where
Effect: Unpin + Send + 'static,
Event: Unpin + Send + 'static,
{
self.host(ctx.effects, ctx.events).map(|_| ())
}
pub fn then(self, other: Self) -> Self
where
Effect: Unpin,
Event: Unpin,
{
Command::new(|ctx| {
self.into_future(ctx.clone())
.then(|()| other.into_future(ctx))
})
}
pub fn and(mut self, other: Self) -> Self
where
Effect: Unpin,
Event: Unpin,
{
self.spawn(|ctx| other.into_future(ctx));
self
}
pub fn all<I>(commands: I) -> Self
where
I: IntoIterator<Item = Self>,
Effect: Unpin,
Event: Unpin,
{
let mut command = Command::done();
for c in commands {
command.spawn(|ctx| c.into_future(ctx));
}
command
}
pub fn map_effect<F, NewEffect>(self, map: F) -> Command<NewEffect, Event>
where
F: Fn(Effect) -> NewEffect + Send + Sync + 'static,
NewEffect: Send + Unpin + 'static,
Effect: Unpin,
Event: Unpin,
{
Command::new(move |ctx| {
self.map(move |output| match output {
CommandOutput::Effect(effect) => CommandOutput::Effect(map(effect)),
CommandOutput::Event(event) => CommandOutput::Event(event),
})
.host(ctx.effects, ctx.events)
.map(|_| ())
})
}
pub fn map_event<F, NewEvent>(self, map: F) -> Command<Effect, NewEvent>
where
F: Fn(Event) -> NewEvent + Send + Sync + 'static,
NewEvent: Send + Unpin + 'static,
Effect: Unpin,
Event: Unpin,
{
Command::new(move |ctx| {
self.map(move |output| match output {
CommandOutput::Effect(effect) => CommandOutput::Effect(effect),
CommandOutput::Event(event) => CommandOutput::Event(map(event)),
})
.host(ctx.effects, ctx.events)
.map(|_| ())
})
}
pub fn spawn<F, Fut>(&mut self, create_task: F)
where
F: FnOnce(CommandContext<Effect, Event>) -> Fut,
Fut: Future<Output = ()> + Send + 'static,
{
self.context.spawn(create_task);
}
#[must_use]
pub fn abort_handle(&self) -> AbortHandle {
AbortHandle {
aborted: self.aborted.clone(),
}
}
}
impl<Effect, Event> FromIterator<Command<Effect, Event>> for Command<Effect, Event>
where
Effect: Send + Unpin + 'static,
Event: Send + Unpin + 'static,
{
fn from_iter<I: IntoIterator<Item = Command<Effect, Event>>>(iter: I) -> Self {
Command::all(iter)
}
}
#[cfg(test)]
mod tests;