use futures::channel::mpsc as async_mpsc;
use futures::channel::oneshot;
use futures::future::{self, BoxFuture};
use futures::prelude::*;
use std::fmt;
use std::ops::Deref;
use std::pin::Pin;
use std::sync::{Arc, Mutex, MutexGuard};
use std::task::Poll;
use super::{Context, JoinHandle, RUNTIME_CAT};
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy)]
pub enum TaskState {
Error,
Flushing,
Paused,
PausedFlushing,
Prepared,
Preparing,
Started,
Stopped,
Unprepared,
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Trigger {
Error,
FlushStart,
FlushStop,
Pause,
Prepare,
Start,
Stop,
Unprepare,
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TransitionOk {
Complete {
origin: TaskState,
target: TaskState,
},
NotWaiting { trigger: Trigger, origin: TaskState },
Skipped { trigger: Trigger, state: TaskState },
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TransitionError {
pub trigger: Trigger,
pub state: TaskState,
pub err_msg: gst::ErrorMessage,
}
impl fmt::Display for TransitionError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"{:?} from state {:?}: {:?}",
self.trigger, self.state, self.err_msg
)
}
}
impl std::error::Error for TransitionError {}
impl From<TransitionError> for gst::ErrorMessage {
fn from(err: TransitionError) -> Self {
err.err_msg
}
}
#[must_use = "This `TransitionStatus` may be `Pending`. In most cases it should be awaited. See `await_maybe_on_context`"]
pub enum TransitionStatus {
Ready(Result<TransitionOk, TransitionError>),
Pending {
trigger: Trigger,
origin: TaskState,
res_fut: Pin<Box<dyn Future<Output = Result<TransitionOk, TransitionError>> + Send>>,
},
}
impl TransitionStatus {
pub fn is_ready(&self) -> bool {
matches!(self, TransitionStatus::Ready { .. })
}
pub fn is_pending(&self) -> bool {
matches!(self, TransitionStatus::Pending { .. })
}
pub fn check(self) -> Result<TransitionStatus, TransitionError> {
match self {
TransitionStatus::Ready(Err(err)) => Err(err),
other => Ok(other),
}
}
pub fn await_maybe_on_context(self) -> Result<TransitionOk, TransitionError> {
use TransitionStatus::*;
match self {
Pending {
trigger,
origin,
res_fut,
} => {
if let Some((ctx, task_id)) = Context::current_task() {
gst::debug!(
RUNTIME_CAT,
"Awaiting for {:?} ack in a subtask on context {}",
trigger,
ctx.name()
);
let _ = ctx.add_sub_task(task_id, async move {
let res = res_fut.await;
if res.is_ok() {
gst::log!(RUNTIME_CAT, "Received ack {:?} for {:?}", res, trigger);
} else {
gst::error!(RUNTIME_CAT, "Received ack {:?} for {:?}", res, trigger);
}
Ok(())
});
Ok(TransitionOk::NotWaiting { trigger, origin })
} else {
gst::debug!(
RUNTIME_CAT,
"Awaiting for {:?} ack on current thread",
trigger,
);
futures::executor::block_on(res_fut)
}
}
Ready(res) => res,
}
}
pub fn block_on(self) -> Result<TransitionOk, TransitionError> {
assert!(!Context::is_context_thread());
use TransitionStatus::*;
match self {
Pending {
trigger, res_fut, ..
} => {
gst::debug!(
RUNTIME_CAT,
"Awaiting for {:?} ack on current thread",
trigger,
);
futures::executor::block_on(res_fut)
}
Ready(res) => res,
}
}
}
impl Future for TransitionStatus {
type Output = Result<TransitionOk, TransitionError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
use TransitionStatus::*;
match &mut *self {
Ready(res) => Poll::Ready(res.clone()),
Pending { res_fut, .. } => match Pin::new(res_fut).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(res) => {
*self = Ready(res.clone());
Poll::Ready(res)
}
},
}
}
}
impl From<TransitionOk> for TransitionStatus {
fn from(ok: TransitionOk) -> Self {
Self::Ready(Ok(ok))
}
}
impl From<TransitionError> for TransitionStatus {
fn from(err: TransitionError) -> Self {
Self::Ready(Err(err))
}
}
impl fmt::Debug for TransitionStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
use TransitionStatus::*;
match self {
Ready(res) => f.debug_tuple("Ready").field(res).finish(),
Pending {
trigger, origin, ..
} => f
.debug_struct("Pending")
.field("trigger", trigger)
.field("origin", origin)
.finish(),
}
}
}
pub trait TaskImpl: Send + 'static {
type Item: Send + 'static;
fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
future::ok(()).boxed()
}
fn unprepare(&mut self) -> BoxFuture<'_, ()> {
future::ready(()).boxed()
}
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
future::ok(()).boxed()
}
fn try_next(&mut self) -> BoxFuture<'_, Result<Self::Item, gst::FlowError>>;
fn handle_item(&mut self, _item: Self::Item) -> BoxFuture<'_, Result<(), gst::FlowError>>;
fn pause(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
future::ok(()).boxed()
}
fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
future::ok(()).boxed()
}
fn flush_stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
future::ok(()).boxed()
}
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
future::ok(()).boxed()
}
fn handle_loop_error(&mut self, err: gst::FlowError) -> BoxFuture<'_, Trigger> {
async move {
match err {
gst::FlowError::Flushing => {
gst::debug!(
RUNTIME_CAT,
"Task loop returned Flushing. Posting FlushStart"
);
Trigger::FlushStart
}
gst::FlowError::Eos => {
gst::debug!(RUNTIME_CAT, "Task loop returned Eos. Posting Stop");
Trigger::Stop
}
other => {
gst::error!(RUNTIME_CAT, "Task loop returned {:?}. Posting Error", other);
Trigger::Error
}
}
}
.boxed()
}
fn handle_action_error(
&mut self,
trigger: Trigger,
state: TaskState,
err: gst::ErrorMessage,
) -> BoxFuture<'_, Trigger> {
async move {
gst::error!(
RUNTIME_CAT,
"TaskImpl transition action error during {:?} from {:?}: {:?}. Posting Trigger::Error",
trigger,
state,
err,
);
Trigger::Error
}
.boxed()
}
}
type AckSender = oneshot::Sender<Result<TransitionOk, TransitionError>>;
type AckReceiver = oneshot::Receiver<Result<TransitionOk, TransitionError>>;
struct TriggeringEvent {
trigger: Trigger,
ack_tx: AckSender,
}
impl TriggeringEvent {
fn new(trigger: Trigger) -> (Self, AckReceiver) {
let (ack_tx, ack_rx) = oneshot::channel();
let req = TriggeringEvent { trigger, ack_tx };
(req, ack_rx)
}
fn send_ack(self, res: Result<TransitionOk, TransitionError>) {
let _ = self.ack_tx.send(res);
}
fn send_err_ack(self) {
let res = Err(TransitionError {
trigger: self.trigger,
state: TaskState::Error,
err_msg: gst::error_msg!(
gst::CoreError::StateChange,
[
"Triggering Event {:?} rejected due to a previous unrecoverable error",
self.trigger,
]
),
});
self.send_ack(res);
}
}
impl fmt::Debug for TriggeringEvent {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("TriggeringEvent")
.field("trigger", &self.trigger)
.finish()
}
}
#[derive(Debug)]
struct StateMachineHandle {
join_handle: JoinHandle<()>,
triggering_evt_tx: async_mpsc::Sender<TriggeringEvent>,
context: Context,
}
impl StateMachineHandle {
fn trigger(&mut self, trigger: Trigger) -> AckReceiver {
let (triggering_evt, ack_rx) = TriggeringEvent::new(trigger);
gst::log!(RUNTIME_CAT, "Pushing {:?}", triggering_evt);
self.triggering_evt_tx.try_send(triggering_evt).unwrap();
self.context.unpark();
ack_rx
}
async fn join(self) {
self.join_handle
.await
.expect("state machine shouldn't have been cancelled");
}
}
#[derive(Debug)]
struct TaskInner {
state: TaskState,
state_machine_handle: Option<StateMachineHandle>,
}
impl Default for TaskInner {
fn default() -> Self {
TaskInner {
state: TaskState::Unprepared,
state_machine_handle: None,
}
}
}
impl TaskInner {
fn switch_to_state(&mut self, target_state: TaskState, triggering_evt: TriggeringEvent) {
let res = Ok(TransitionOk::Complete {
origin: self.state,
target: target_state,
});
self.state = target_state;
triggering_evt.send_ack(res);
}
fn switch_to_err(&mut self, triggering_evt: TriggeringEvent) {
let res = Err(TransitionError {
trigger: triggering_evt.trigger,
state: self.state,
err_msg: gst::error_msg!(
gst::CoreError::StateChange,
[
"Unrecoverable error for {:?} from state {:?}",
triggering_evt,
self.state,
]
),
});
self.state = TaskState::Error;
triggering_evt.send_ack(res);
}
fn skip_triggering_evt(&mut self, triggering_evt: TriggeringEvent) {
let res = Ok(TransitionOk::Skipped {
trigger: triggering_evt.trigger,
state: self.state,
});
triggering_evt.send_ack(res);
}
fn trigger(&mut self, trigger: Trigger) -> Result<AckReceiver, TransitionError> {
self.state_machine_handle
.as_mut()
.map(|state_machine| state_machine.trigger(trigger))
.ok_or_else(|| {
gst::warning!(
RUNTIME_CAT,
"Unable to send {:?}: no state machine",
trigger
);
TransitionError {
trigger,
state: TaskState::Unprepared,
err_msg: gst::error_msg!(
gst::ResourceError::NotFound,
["Unable to send {:?}: no state machine", trigger]
),
}
})
}
}
impl Drop for TaskInner {
fn drop(&mut self) {
if self.state != TaskState::Unprepared {
gst::fixme!(RUNTIME_CAT, "Missing call to `Task::unprepare`");
}
}
}
pub struct TaskStateGuard<'guard>(MutexGuard<'guard, TaskInner>);
impl Deref for TaskStateGuard<'_> {
type Target = TaskState;
fn deref(&self) -> &Self::Target {
&(self.0).state
}
}
#[derive(Debug, Clone)]
pub struct Task(Arc<Mutex<TaskInner>>);
impl Default for Task {
fn default() -> Self {
Task(Arc::new(Mutex::new(TaskInner::default())))
}
}
impl Task {
pub fn state(&self) -> TaskState {
self.0.lock().unwrap().state
}
pub fn lock_state(&self) -> TaskStateGuard<'_> {
TaskStateGuard(self.0.lock().unwrap())
}
pub fn prepare(&self, task_impl: impl TaskImpl, context: Context) -> TransitionStatus {
let mut inner = self.0.lock().unwrap();
let origin = inner.state;
match origin {
TaskState::Unprepared => (),
TaskState::Prepared | TaskState::Preparing => {
gst::debug!(RUNTIME_CAT, "Task already {:?}", origin);
return TransitionOk::Skipped {
trigger: Trigger::Prepare,
state: origin,
}
.into();
}
state => {
gst::warning!(RUNTIME_CAT, "Attempt to prepare Task in state {:?}", state);
return TransitionError {
trigger: Trigger::Prepare,
state: inner.state,
err_msg: gst::error_msg!(
gst::CoreError::StateChange,
["Attempt to prepare Task in state {:?}", state]
),
}
.into();
}
}
assert!(inner.state_machine_handle.is_none());
inner.state = TaskState::Preparing;
gst::log!(RUNTIME_CAT, "Spawning task state machine");
inner.state_machine_handle = Some(StateMachine::spawn(
self.0.clone(),
Box::new(task_impl),
context,
));
let ack_rx = match inner.trigger(Trigger::Prepare) {
Ok(ack_rx) => ack_rx,
Err(err) => return err.into(),
};
drop(inner);
TransitionStatus::Pending {
trigger: Trigger::Prepare,
origin: TaskState::Unprepared,
res_fut: Box::pin(ack_rx.map(Result::unwrap)),
}
}
pub fn unprepare(&self) -> TransitionStatus {
let mut inner = self.0.lock().unwrap();
let origin = inner.state;
let mut state_machine_handle = match origin {
TaskState::Stopped
| TaskState::Error
| TaskState::Prepared
| TaskState::Preparing
| TaskState::Unprepared => match inner.state_machine_handle.take() {
Some(state_machine_handle) => {
gst::debug!(RUNTIME_CAT, "Unpreparing task");
state_machine_handle
}
None => {
gst::debug!(RUNTIME_CAT, "Task already unpreparing");
return TransitionOk::Skipped {
trigger: Trigger::Unprepare,
state: origin,
}
.into();
}
},
state => {
gst::warning!(
RUNTIME_CAT,
"Attempt to unprepare Task in state {:?}",
state
);
return TransitionError {
trigger: Trigger::Unprepare,
state: inner.state,
err_msg: gst::error_msg!(
gst::CoreError::StateChange,
["Attempt to unprepare Task in state {:?}", state]
),
}
.into();
}
};
let ack_rx = state_machine_handle.trigger(Trigger::Unprepare);
drop(inner);
let state_machine_end_fut = async {
state_machine_handle.join().await;
ack_rx.await.unwrap()
};
TransitionStatus::Pending {
trigger: Trigger::Unprepare,
origin,
res_fut: Box::pin(state_machine_end_fut),
}
}
pub fn start(&self) -> TransitionStatus {
let mut inner = self.0.lock().unwrap();
if let TaskState::Started = inner.state {
return TransitionOk::Skipped {
trigger: Trigger::Start,
state: TaskState::Started,
}
.into();
}
let ack_rx = match inner.trigger(Trigger::Start) {
Ok(ack_rx) => ack_rx,
Err(err) => return err.into(),
};
let origin = inner.state;
drop(inner);
TransitionStatus::Pending {
trigger: Trigger::Start,
origin,
res_fut: Box::pin(ack_rx.map(Result::unwrap)),
}
}
pub fn pause(&self) -> TransitionStatus {
self.push_pending(Trigger::Pause)
}
pub fn flush_start(&self) -> TransitionStatus {
self.push_pending(Trigger::FlushStart)
}
pub fn flush_stop(&self) -> TransitionStatus {
self.push_pending(Trigger::FlushStop)
}
pub fn stop(&self) -> TransitionStatus {
self.push_pending(Trigger::Stop)
}
fn push_pending(&self, trigger: Trigger) -> TransitionStatus {
let mut inner = self.0.lock().unwrap();
let ack_rx = match inner.trigger(trigger) {
Ok(ack_rx) => ack_rx,
Err(err) => return err.into(),
};
let origin = inner.state;
drop(inner);
TransitionStatus::Pending {
trigger,
origin,
res_fut: Box::pin(ack_rx.map(Result::unwrap)),
}
}
}
struct StateMachine<Item: Send + 'static> {
task_impl: Box<dyn TaskImpl<Item = Item>>,
triggering_evt_rx: async_mpsc::Receiver<TriggeringEvent>,
pending_triggering_evt: Option<TriggeringEvent>,
}
macro_rules! exec_action {
($self:ident, $action:ident, $triggering_evt:expr, $origin:expr, $task_inner:expr) => {{
match $self.task_impl.$action().await {
Ok(()) => Ok($triggering_evt),
Err(err) => {
let next_trigger = $self
.task_impl
.handle_action_error($triggering_evt.trigger, $origin, err)
.await;
gst::trace!(
RUNTIME_CAT,
"TaskImpl transition action error: converting {:?} to {:?}",
$triggering_evt.trigger,
next_trigger,
);
$triggering_evt.trigger = next_trigger;
$self.pending_triggering_evt = Some($triggering_evt);
Err(())
}
}
}};
}
impl<Item: Send + 'static> StateMachine<Item> {
fn spawn(
task_inner: Arc<Mutex<TaskInner>>,
task_impl: Box<dyn TaskImpl<Item = Item>>,
context: Context,
) -> StateMachineHandle {
let (triggering_evt_tx, triggering_evt_rx) = async_mpsc::channel(4);
let state_machine = StateMachine {
task_impl,
triggering_evt_rx,
pending_triggering_evt: None,
};
StateMachineHandle {
join_handle: context.spawn_and_unpark(state_machine.run(task_inner)),
triggering_evt_tx,
context,
}
}
async fn run(mut self, task_inner: Arc<Mutex<TaskInner>>) {
let mut triggering_evt = self
.triggering_evt_rx
.next()
.await
.expect("triggering_evt_rx dropped");
if let Trigger::Prepare = triggering_evt.trigger {
gst::trace!(RUNTIME_CAT, "Preparing task");
let res = exec_action!(
self,
prepare,
triggering_evt,
TaskState::Unprepared,
&task_inner
);
if let Ok(triggering_evt) = res {
let mut task_inner = task_inner.lock().unwrap();
let res = Ok(TransitionOk::Complete {
origin: TaskState::Unprepared,
target: TaskState::Prepared,
});
task_inner.state = TaskState::Prepared;
triggering_evt.send_ack(res);
gst::trace!(RUNTIME_CAT, "Task Prepared");
}
} else {
panic!("Unexpected initial trigger {:?}", triggering_evt.trigger);
}
loop {
triggering_evt = match self.pending_triggering_evt.take() {
Some(pending_triggering_evt) => pending_triggering_evt,
None => self
.triggering_evt_rx
.next()
.await
.expect("triggering_evt_rx dropped"),
};
gst::trace!(RUNTIME_CAT, "State machine popped {:?}", triggering_evt);
match triggering_evt.trigger {
Trigger::Error => {
let mut task_inner = task_inner.lock().unwrap();
task_inner.switch_to_err(triggering_evt);
gst::trace!(RUNTIME_CAT, "Switched to Error");
}
Trigger::Start => {
let origin = {
let mut task_inner = task_inner.lock().unwrap();
let origin = task_inner.state;
match origin {
TaskState::Stopped | TaskState::Paused | TaskState::Prepared => (),
TaskState::PausedFlushing => {
task_inner.switch_to_state(TaskState::Flushing, triggering_evt);
gst::trace!(
RUNTIME_CAT,
"Switched from PausedFlushing to Flushing"
);
continue;
}
TaskState::Error => {
triggering_evt.send_err_ack();
continue;
}
state => {
task_inner.skip_triggering_evt(triggering_evt);
gst::trace!(RUNTIME_CAT, "Skipped Start in state {:?}", state);
continue;
}
}
origin
};
self.start(triggering_evt, origin, &task_inner).await;
}
Trigger::Pause => {
let (origin, target) = {
let mut task_inner = task_inner.lock().unwrap();
let origin = task_inner.state;
match origin {
TaskState::Started | TaskState::Stopped | TaskState::Prepared => {
(origin, TaskState::Paused)
}
TaskState::Flushing => (origin, TaskState::PausedFlushing),
TaskState::Error => {
triggering_evt.send_err_ack();
continue;
}
state => {
task_inner.skip_triggering_evt(triggering_evt);
gst::trace!(RUNTIME_CAT, "Skipped Pause in state {:?}", state);
continue;
}
}
};
let res = exec_action!(self, pause, triggering_evt, origin, &task_inner);
if let Ok(triggering_evt) = res {
task_inner
.lock()
.unwrap()
.switch_to_state(target, triggering_evt);
gst::trace!(RUNTIME_CAT, "Task loop {:?}", target);
}
}
Trigger::Stop => {
let origin = {
let mut task_inner = task_inner.lock().unwrap();
let origin = task_inner.state;
match origin {
TaskState::Started
| TaskState::Paused
| TaskState::PausedFlushing
| TaskState::Flushing => origin,
TaskState::Error => {
triggering_evt.send_err_ack();
continue;
}
state => {
task_inner.skip_triggering_evt(triggering_evt);
gst::trace!(RUNTIME_CAT, "Skipped Stop in state {:?}", state);
continue;
}
}
};
let res = exec_action!(self, stop, triggering_evt, origin, &task_inner);
if let Ok(triggering_evt) = res {
task_inner
.lock()
.unwrap()
.switch_to_state(TaskState::Stopped, triggering_evt);
gst::trace!(RUNTIME_CAT, "Task loop Stopped");
}
}
Trigger::FlushStart => {
let (origin, target) = {
let mut task_inner = task_inner.lock().unwrap();
let origin = task_inner.state;
match origin {
TaskState::Started => (origin, TaskState::Flushing),
TaskState::Paused => (origin, TaskState::PausedFlushing),
TaskState::Error => {
triggering_evt.send_err_ack();
continue;
}
state => {
task_inner.skip_triggering_evt(triggering_evt);
gst::trace!(RUNTIME_CAT, "Skipped FlushStart in state {:?}", state);
continue;
}
}
};
let res = exec_action!(self, flush_start, triggering_evt, origin, &task_inner);
if let Ok(triggering_evt) = res {
task_inner
.lock()
.unwrap()
.switch_to_state(target, triggering_evt);
gst::trace!(RUNTIME_CAT, "Task {:?}", target);
}
}
Trigger::FlushStop => {
let origin = task_inner.lock().unwrap().state;
let is_paused = match origin {
TaskState::Flushing => false,
TaskState::PausedFlushing => true,
TaskState::Error => {
triggering_evt.send_err_ack();
continue;
}
state => {
task_inner
.lock()
.unwrap()
.skip_triggering_evt(triggering_evt);
gst::trace!(RUNTIME_CAT, "Skipped FlushStop in state {:?}", state);
continue;
}
};
let res = exec_action!(self, flush_stop, triggering_evt, origin, &task_inner);
if let Ok(triggering_evt) = res {
if is_paused {
task_inner
.lock()
.unwrap()
.switch_to_state(TaskState::Paused, triggering_evt);
gst::trace!(RUNTIME_CAT, "Switched from PausedFlushing to Paused");
} else {
self.start(triggering_evt, origin, &task_inner).await;
}
}
}
Trigger::Unprepare => {
self.task_impl.unprepare().await;
task_inner
.lock()
.unwrap()
.switch_to_state(TaskState::Unprepared, triggering_evt);
break;
}
_ => unreachable!("State machine handler {:?}", triggering_evt),
}
}
gst::trace!(RUNTIME_CAT, "Task state machine terminated");
}
async fn start(
&mut self,
mut triggering_evt: TriggeringEvent,
origin: TaskState,
task_inner: &Arc<Mutex<TaskInner>>,
) {
match exec_action!(self, start, triggering_evt, origin, &task_inner) {
Ok(triggering_evt) => {
let mut task_inner = task_inner.lock().unwrap();
task_inner.switch_to_state(TaskState::Started, triggering_evt);
}
Err(_) => {
return;
}
}
match self.run_loop().await {
Ok(()) => (),
Err(err) => {
let next_trigger = self.task_impl.handle_loop_error(err).await;
let (triggering_evt, _) = TriggeringEvent::new(next_trigger);
self.pending_triggering_evt = Some(triggering_evt);
}
}
}
async fn run_loop(&mut self) -> Result<(), gst::FlowError> {
gst::trace!(RUNTIME_CAT, "Task loop started");
let mut item;
loop {
item = {
let mut try_next_fut = self.task_impl.try_next().fuse();
futures::select_biased! {
triggering_evt = self.triggering_evt_rx.next() => {
let triggering_evt = triggering_evt.expect("broken state machine channel");
gst::trace!(
RUNTIME_CAT,
"Task loop handing {:?} to state machine",
triggering_evt,
);
self.pending_triggering_evt = Some(triggering_evt);
return Ok(());
}
try_next_res = try_next_fut => try_next_res.map_err(|err| {
gst::debug!(RUNTIME_CAT, "TaskImpl::try_next returned {:?}", err);
err
})?,
}
};
self.task_impl.handle_item(item).await.map_err(|err| {
gst::debug!(RUNTIME_CAT, "TaskImpl::handle_item returned {:?}", err);
err
})?;
}
}
}
#[cfg(test)]
mod tests {
use futures::channel::{mpsc, oneshot};
use futures::executor::block_on;
use futures::future::BoxFuture;
use futures::prelude::*;
use std::time::Duration;
use super::{
Task, TaskImpl,
TaskState::{self, *},
TransitionError,
TransitionOk::*,
TransitionStatus::*,
Trigger::{self, *},
};
use crate::runtime::{Context, RUNTIME_CAT};
#[track_caller]
fn stop_then_unprepare(task: Task) {
task.stop().block_on().unwrap();
task.unprepare().block_on().unwrap();
}
#[test]
fn nominal() {
gst::init().unwrap();
struct TaskTest {
prepared_sender: mpsc::Sender<()>,
started_sender: mpsc::Sender<()>,
try_next_ready_sender: mpsc::Sender<()>,
try_next_receiver: mpsc::Receiver<()>,
handle_item_ready_sender: mpsc::Sender<()>,
handle_item_sender: mpsc::Sender<()>,
paused_sender: mpsc::Sender<()>,
stopped_sender: mpsc::Sender<()>,
unprepared_sender: mpsc::Sender<()>,
}
impl TaskImpl for TaskTest {
type Item = ();
fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "nominal: prepared");
self.prepared_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
}
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "nominal: started");
self.started_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
}
fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
gst::debug!(RUNTIME_CAT, "nominal: entering try_next");
self.try_next_ready_sender.send(()).await.unwrap();
gst::debug!(RUNTIME_CAT, "nominal: awaiting try_next");
self.try_next_receiver.next().await.unwrap();
Ok(())
}
.boxed()
}
fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
gst::debug!(RUNTIME_CAT, "nominal: entering handle_item");
self.handle_item_ready_sender.send(()).await.unwrap();
gst::debug!(RUNTIME_CAT, "nominal: locked in handle_item");
self.handle_item_sender.send(()).await.unwrap();
gst::debug!(RUNTIME_CAT, "nominal: leaving handle_item");
Ok(())
}
.boxed()
}
fn pause(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "nominal: paused");
self.paused_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
}
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "nominal: stopped");
self.stopped_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
}
fn unprepare(&mut self) -> BoxFuture<'_, ()> {
async move {
gst::debug!(RUNTIME_CAT, "nominal: unprepared");
self.unprepared_sender.send(()).await.unwrap();
}
.boxed()
}
}
let context = Context::acquire("nominal", Duration::from_millis(2)).unwrap();
let task = Task::default();
assert_eq!(task.state(), Unprepared);
gst::debug!(RUNTIME_CAT, "nominal: preparing");
let (prepared_sender, mut prepared_receiver) = mpsc::channel(1);
let (started_sender, mut started_receiver) = mpsc::channel(1);
let (try_next_ready_sender, mut try_next_ready_receiver) = mpsc::channel(1);
let (mut try_next_sender, try_next_receiver) = mpsc::channel(1);
let (handle_item_ready_sender, mut handle_item_ready_receiver) = mpsc::channel(1);
let (handle_item_sender, mut handle_item_receiver) = mpsc::channel(0);
let (paused_sender, mut paused_receiver) = mpsc::channel(1);
let (stopped_sender, mut stopped_receiver) = mpsc::channel(1);
let (unprepared_sender, mut unprepared_receiver) = mpsc::channel(1);
let prepare_status = task.prepare(
TaskTest {
prepared_sender,
started_sender,
try_next_ready_sender,
try_next_receiver,
handle_item_ready_sender,
handle_item_sender,
paused_sender,
stopped_sender,
unprepared_sender,
},
context,
);
assert!(prepare_status.is_pending());
match prepare_status {
Pending {
trigger: Prepare,
origin: Unprepared,
..
} => (),
other => panic!("{:?}", other),
};
gst::debug!(RUNTIME_CAT, "nominal: starting (async prepare)");
let start_status = task.start().check().unwrap();
block_on(prepared_receiver.next()).unwrap();
assert_eq!(
prepare_status.await_maybe_on_context().unwrap(),
Complete {
origin: Unprepared,
target: Prepared,
},
);
block_on(started_receiver.next()).unwrap();
assert_eq!(
start_status.block_on().unwrap(),
Complete {
origin: Prepared,
target: Started,
}
);
assert_eq!(task.state(), Started);
block_on(try_next_ready_receiver.next()).unwrap();
block_on(try_next_sender.send(())).unwrap();
block_on(handle_item_ready_receiver.next()).unwrap();
block_on(handle_item_receiver.next()).unwrap();
gst::debug!(RUNTIME_CAT, "nominal: starting (redundant)");
assert_eq!(
task.start().block_on().unwrap(),
Skipped {
trigger: Start,
state: Started,
},
);
assert_eq!(task.state(), Started);
match task.unprepare().check().unwrap_err() {
TransitionError {
trigger: Unprepare,
state: Started,
..
} => (),
other => panic!("{:?}", other),
}
gst::debug!(RUNTIME_CAT, "nominal: pause cancelling try_next");
block_on(try_next_ready_receiver.next()).unwrap();
let pause_status = task.pause().check().unwrap();
gst::debug!(RUNTIME_CAT, "nominal: awaiting pause ack");
block_on(paused_receiver.next()).unwrap();
assert_eq!(
pause_status.block_on().unwrap(),
Complete {
origin: Started,
target: Paused,
},
);
assert!(handle_item_ready_receiver.try_next().is_err());
assert!(try_next_ready_receiver.try_next().is_err());
gst::debug!(
RUNTIME_CAT,
"nominal: starting (after pause cancelling try_next)"
);
let start_receiver = task.start().check().unwrap();
block_on(started_receiver.next()).unwrap();
assert_eq!(
start_receiver.block_on().unwrap(),
Complete {
origin: Paused,
target: Started,
},
);
assert_eq!(task.state(), Started);
gst::debug!(RUNTIME_CAT, "nominal: pause // handle_item");
block_on(try_next_ready_receiver.next()).unwrap();
block_on(try_next_sender.send(())).unwrap();
block_on(handle_item_ready_receiver.next()).unwrap();
gst::debug!(RUNTIME_CAT, "nominal: requesting to pause");
let pause_status = task.pause().check().unwrap();
gst::debug!(RUNTIME_CAT, "nominal: unlocking item handling");
block_on(handle_item_receiver.next()).unwrap();
gst::debug!(RUNTIME_CAT, "nominal: awaiting pause ack");
block_on(paused_receiver.next()).unwrap();
assert_eq!(
pause_status.block_on().unwrap(),
Complete {
origin: Started,
target: Paused,
},
);
assert!(try_next_ready_receiver.try_next().is_err());
gst::debug!(
RUNTIME_CAT,
"nominal: starting (after pause // handle_item)"
);
let start_receiver = task.start().check().unwrap();
block_on(started_receiver.next()).unwrap();
assert_eq!(
start_receiver.block_on().unwrap(),
Complete {
origin: Paused,
target: Started,
},
);
assert_eq!(task.state(), Started);
gst::debug!(RUNTIME_CAT, "nominal: stopping");
assert_eq!(
task.stop().block_on().unwrap(),
Complete {
origin: Started,
target: Stopped,
},
);
assert_eq!(task.state(), Stopped);
let _ = block_on(stopped_receiver.next());
let _ = try_next_ready_receiver.try_next();
gst::debug!(RUNTIME_CAT, "nominal: starting (after stop)");
assert_eq!(
task.start().block_on().unwrap(),
Complete {
origin: Stopped,
target: Started,
},
);
let _ = block_on(started_receiver.next());
gst::debug!(RUNTIME_CAT, "nominal: stopping");
assert_eq!(
task.stop().block_on().unwrap(),
Complete {
origin: Started,
target: Stopped,
},
);
assert_eq!(
task.unprepare().block_on().unwrap(),
Complete {
origin: Stopped,
target: Unprepared,
},
);
assert_eq!(task.state(), Unprepared);
let _ = block_on(unprepared_receiver.next());
}
#[test]
fn prepare_error() {
gst::init().unwrap();
struct TaskPrepareTest {
prepare_error_sender: mpsc::Sender<()>,
}
impl TaskImpl for TaskPrepareTest {
type Item = ();
fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "prepare_error: prepare returning an error");
Err(gst::error_msg!(
gst::ResourceError::Failed,
["prepare_error: intentional error"]
))
}
.boxed()
}
fn handle_action_error(
&mut self,
trigger: Trigger,
state: TaskState,
err: gst::ErrorMessage,
) -> BoxFuture<'_, Trigger> {
async move {
gst::debug!(
RUNTIME_CAT,
"prepare_error: handling prepare error {:?}",
err
);
match (trigger, state) {
(Prepare, Unprepared) => {
self.prepare_error_sender.send(()).await.unwrap();
}
other => unreachable!("{:?}", other),
}
Trigger::Error
}
.boxed()
}
fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
unreachable!("prepare_error: try_next");
}
fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
unreachable!("prepare_error: handle_item");
}
}
let context = Context::acquire("prepare_error", Duration::from_millis(2)).unwrap();
let task = Task::default();
assert_eq!(task.state(), Unprepared);
let (prepare_error_sender, mut prepare_error_receiver) = mpsc::channel(1);
let prepare_status = task.prepare(
TaskPrepareTest {
prepare_error_sender,
},
context,
);
gst::debug!(
RUNTIME_CAT,
"prepare_error: await action error notification"
);
block_on(prepare_error_receiver.next()).unwrap();
match prepare_status.block_on().unwrap_err() {
TransitionError {
trigger: Trigger::Error,
state: Preparing,
..
} => (),
other => panic!("{:?}", other),
}
while TaskState::Error != task.state() {
std::thread::sleep(Duration::from_millis(2));
}
match task.start().block_on().unwrap_err() {
TransitionError {
trigger: Start,
state: TaskState::Error,
..
} => (),
other => panic!("{:?}", other),
}
block_on(task.unprepare()).unwrap();
}
#[test]
fn prepare_start_ok() {
gst::init().unwrap();
struct TaskPrepareTest {
prepare_receiver: mpsc::Receiver<()>,
}
impl TaskImpl for TaskPrepareTest {
type Item = ();
fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(
RUNTIME_CAT,
"prepare_start_ok: preparation awaiting trigger"
);
self.prepare_receiver.next().await.unwrap();
gst::debug!(RUNTIME_CAT, "prepare_start_ok: preparation complete Ok");
Ok(())
}
.boxed()
}
fn handle_action_error(
&mut self,
_trigger: Trigger,
_state: TaskState,
_err: gst::ErrorMessage,
) -> BoxFuture<'_, Trigger> {
unreachable!("prepare_start_ok: handle_prepare_error");
}
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "prepare_start_ok: started");
Ok(())
}
.boxed()
}
fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
future::pending::<Result<(), gst::FlowError>>().boxed()
}
fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
unreachable!("prepare_start_ok: handle_item");
}
}
let context = Context::acquire("prepare_start_ok", Duration::from_millis(2)).unwrap();
let task = Task::default();
let (mut prepare_sender, prepare_receiver) = mpsc::channel(1);
let _ = task.prepare(TaskPrepareTest { prepare_receiver }, context);
let start_ctx = Context::acquire("prepare_start_ok_requester", Duration::ZERO).unwrap();
let (ready_sender, ready_receiver) = oneshot::channel();
let start_handle = start_ctx.spawn(async move {
assert_eq!(task.state(), Preparing);
gst::debug!(RUNTIME_CAT, "prepare_start_ok: starting");
let start_status = task.start();
match start_status {
Pending {
trigger: Start,
origin: Preparing,
..
} => (),
other => panic!("{:?}", other),
}
ready_sender.send(()).unwrap();
assert_eq!(
start_status.await.unwrap(),
Complete {
origin: Prepared,
target: Started,
},
);
assert_eq!(task.state(), Started);
let stop_status = task.stop();
match stop_status {
Pending {
trigger: Stop,
origin: Started,
..
} => (),
other => panic!("{:?}", other),
}
assert_eq!(
stop_status.await.unwrap(),
Complete {
origin: Started,
target: Stopped,
},
);
assert_eq!(task.state(), Stopped);
let unprepare_status = task.unprepare();
match unprepare_status {
Pending {
trigger: Unprepare,
origin: Stopped,
..
} => (),
other => panic!("{:?}", other),
};
assert_eq!(
unprepare_status.await.unwrap(),
Complete {
origin: Stopped,
target: Unprepared,
},
);
assert_eq!(task.state(), Unprepared);
});
gst::debug!(RUNTIME_CAT, "prepare_start_ok: awaiting for start_ctx");
block_on(ready_receiver).unwrap();
gst::debug!(RUNTIME_CAT, "prepare_start_ok: triggering preparation");
block_on(prepare_sender.send(())).unwrap();
block_on(start_handle).unwrap();
}
#[test]
fn prepare_start_error() {
gst::init().unwrap();
struct TaskPrepareTest {
prepare_receiver: mpsc::Receiver<()>,
prepare_error_sender: mpsc::Sender<()>,
}
impl TaskImpl for TaskPrepareTest {
type Item = ();
fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(
RUNTIME_CAT,
"prepare_start_error: preparation awaiting trigger"
);
self.prepare_receiver.next().await.unwrap();
gst::debug!(RUNTIME_CAT, "prepare_start_error: preparation complete Err");
Err(gst::error_msg!(
gst::ResourceError::Failed,
["prepare_start_error: intentional error"]
))
}
.boxed()
}
fn handle_action_error(
&mut self,
trigger: Trigger,
state: TaskState,
err: gst::ErrorMessage,
) -> BoxFuture<'_, Trigger> {
async move {
gst::debug!(
RUNTIME_CAT,
"prepare_start_error: handling prepare error {:?}",
err
);
match (trigger, state) {
(Prepare, Unprepared) => {
self.prepare_error_sender.send(()).await.unwrap();
}
other => panic!("action error for {:?}", other),
}
Trigger::Error
}
.boxed()
}
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
unreachable!("prepare_start_error: start");
}
fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
unreachable!("prepare_start_error: try_next");
}
fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
unreachable!("prepare_start_error: handle_item");
}
}
let context = Context::acquire("prepare_start_error", Duration::from_millis(2)).unwrap();
let task = Task::default();
let (mut prepare_sender, prepare_receiver) = mpsc::channel(1);
let (prepare_error_sender, mut prepare_error_receiver) = mpsc::channel(1);
let prepare_status = task.prepare(
TaskPrepareTest {
prepare_receiver,
prepare_error_sender,
},
context,
);
match prepare_status {
Pending {
trigger: Prepare,
origin: Unprepared,
..
} => (),
other => panic!("{:?}", other),
};
let start_ctx = Context::acquire("prepare_start_error_requester", Duration::ZERO).unwrap();
let (ready_sender, ready_receiver) = oneshot::channel();
let start_handle = start_ctx.spawn(async move {
gst::debug!(RUNTIME_CAT, "prepare_start_error: starting (Err)");
let _ = task.start();
ready_sender.send(()).unwrap();
match prepare_status.await {
Err(TransitionError {
trigger: Trigger::Error,
state: Preparing,
..
}) => (),
other => panic!("{:?}", other),
}
let unprepare_status = task.unprepare();
match unprepare_status {
Pending {
trigger: Unprepare,
origin: TaskState::Error,
..
} => (),
other => panic!("{:?}", other),
};
assert_eq!(
unprepare_status.await.unwrap(),
Complete {
origin: TaskState::Error,
target: Unprepared,
},
);
});
gst::debug!(RUNTIME_CAT, "prepare_start_error: awaiting for start_ctx");
block_on(ready_receiver).unwrap();
gst::debug!(
RUNTIME_CAT,
"prepare_start_error: triggering preparation (failure)"
);
block_on(prepare_sender.send(())).unwrap();
gst::debug!(
RUNTIME_CAT,
"prepare_start_error: await prepare error notification"
);
block_on(prepare_error_receiver.next()).unwrap();
block_on(start_handle).unwrap();
}
#[test]
fn item_error() {
gst::init().unwrap();
struct TaskTest {
try_next_receiver: mpsc::Receiver<gst::FlowError>,
}
impl TaskImpl for TaskTest {
type Item = gst::FlowError;
fn try_next(&mut self) -> BoxFuture<'_, Result<gst::FlowError, gst::FlowError>> {
async move {
gst::debug!(RUNTIME_CAT, "item_error: awaiting try_next");
Ok(self.try_next_receiver.next().await.unwrap())
}
.boxed()
}
fn handle_item(
&mut self,
item: gst::FlowError,
) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
gst::debug!(RUNTIME_CAT, "item_error: handle_item received {:?}", item);
Err(item)
}
.boxed()
}
}
let context = Context::acquire("item_error", Duration::from_millis(2)).unwrap();
let task = Task::default();
gst::debug!(RUNTIME_CAT, "item_error: prepare and start");
let (mut try_next_sender, try_next_receiver) = mpsc::channel(1);
task.prepare(TaskTest { try_next_receiver }, context)
.block_on()
.unwrap();
task.start().block_on().unwrap();
gst::debug!(RUNTIME_CAT, "item_error: req. handle_item to return Eos");
block_on(try_next_sender.send(gst::FlowError::Eos)).unwrap();
while Stopped != task.state() {
std::thread::sleep(Duration::from_millis(2));
}
gst::debug!(RUNTIME_CAT, "item_error: starting (after stop)");
assert_eq!(
task.start().block_on().unwrap(),
Complete {
origin: Stopped,
target: Started,
},
);
gst::debug!(RUNTIME_CAT, "item_error: req. handle_item to return Error");
block_on(try_next_sender.send(gst::FlowError::Error)).unwrap();
while TaskState::Error != task.state() {
std::thread::sleep(Duration::from_millis(2));
}
gst::debug!(RUNTIME_CAT, "item_error: attempting to start (after Error)");
match task.start().block_on().unwrap_err() {
TransitionError {
trigger: Start,
state: TaskState::Error,
..
} => (),
other => panic!("{:?}", other),
}
assert_eq!(
task.unprepare().block_on().unwrap(),
Complete {
origin: TaskState::Error,
target: Unprepared,
},
);
}
#[test]
fn flush_regular_sync() {
gst::init().unwrap();
struct TaskFlushTest {
flush_start_sender: mpsc::Sender<()>,
flush_stop_sender: mpsc::Sender<()>,
}
impl TaskImpl for TaskFlushTest {
type Item = ();
fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
future::pending::<Result<(), gst::FlowError>>().boxed()
}
fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
unreachable!("flush_regular_sync: handle_item");
}
fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "flush_regular_sync: started flushing");
self.flush_start_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
}
fn flush_stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "flush_regular_sync: stopped flushing");
self.flush_stop_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
}
}
let context = Context::acquire("flush_regular_sync", Duration::from_millis(2)).unwrap();
let task = Task::default();
let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
let _ = task.prepare(
TaskFlushTest {
flush_start_sender,
flush_stop_sender,
},
context,
);
gst::debug!(RUNTIME_CAT, "flush_regular_sync: start");
block_on(task.start()).unwrap();
gst::debug!(RUNTIME_CAT, "flush_regular_sync: starting flush");
assert_eq!(
task.flush_start().block_on().unwrap(),
Complete {
origin: Started,
target: Flushing,
},
);
assert_eq!(task.state(), Flushing);
block_on(flush_start_receiver.next()).unwrap();
gst::debug!(RUNTIME_CAT, "flush_regular_sync: stopping flush");
assert_eq!(
task.flush_stop().await_maybe_on_context().unwrap(),
Complete {
origin: Flushing,
target: Started,
},
);
assert_eq!(task.state(), Started);
block_on(flush_stop_receiver.next()).unwrap();
let _ = task.pause();
stop_then_unprepare(task);
}
#[test]
fn flush_regular_different_context() {
gst::init().unwrap();
struct TaskFlushTest {
flush_start_sender: mpsc::Sender<()>,
flush_stop_sender: mpsc::Sender<()>,
}
impl TaskImpl for TaskFlushTest {
type Item = ();
fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
future::pending::<Result<(), gst::FlowError>>().boxed()
}
fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
unreachable!("flush_regular_different_context: handle_item");
}
fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(
RUNTIME_CAT,
"flush_regular_different_context: started flushing"
);
self.flush_start_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
}
fn flush_stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(
RUNTIME_CAT,
"flush_regular_different_context: stopped flushing"
);
self.flush_stop_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
}
}
let context =
Context::acquire("flush_regular_different_context", Duration::from_millis(2)).unwrap();
let task = Task::default();
let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
let _ = task.prepare(
TaskFlushTest {
flush_start_sender,
flush_stop_sender,
},
context,
);
gst::debug!(RUNTIME_CAT, "flush_regular_different_context: start");
task.start().block_on().unwrap();
let oob_context = Context::acquire(
"flush_regular_different_context_oob",
Duration::from_millis(2),
)
.unwrap();
let task_clone = task.clone();
let flush_res_fut = oob_context.spawn(async move {
let flush_start_status = task_clone.flush_start();
match flush_start_status {
Pending {
trigger: FlushStart,
origin: Started,
..
} => (),
other => panic!("{:?}", other),
};
assert_eq!(
flush_start_status.await.unwrap(),
Complete {
origin: Started,
target: Flushing,
},
);
assert_eq!(task_clone.state(), Flushing);
flush_start_receiver.next().await.unwrap();
let flush_stop_status = task_clone.flush_stop();
match flush_stop_status {
Pending {
trigger: FlushStop,
origin: Flushing,
..
} => (),
other => panic!("{:?}", other),
};
assert_eq!(
flush_stop_status.await_maybe_on_context().unwrap(),
NotWaiting {
trigger: FlushStop,
origin: Flushing,
},
);
Context::drain_sub_tasks().await.unwrap();
assert_eq!(task_clone.state(), Started);
});
block_on(flush_res_fut).unwrap();
block_on(flush_stop_receiver.next()).unwrap();
stop_then_unprepare(task);
}
#[test]
fn flush_regular_same_context() {
gst::init().unwrap();
struct TaskFlushTest {
flush_start_sender: mpsc::Sender<()>,
flush_stop_sender: mpsc::Sender<()>,
}
impl TaskImpl for TaskFlushTest {
type Item = ();
fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
future::pending::<Result<(), gst::FlowError>>().boxed()
}
fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
unreachable!("flush_regular_same_context: handle_item");
}
fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "flush_regular_same_context: started flushing");
self.flush_start_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
}
fn flush_stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "flush_regular_same_context: stopped flushing");
self.flush_stop_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
}
}
let context =
Context::acquire("flush_regular_same_context", Duration::from_millis(2)).unwrap();
let task = Task::default();
let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
let _ = task.prepare(
TaskFlushTest {
flush_start_sender,
flush_stop_sender,
},
context.clone(),
);
block_on(task.start()).unwrap();
let task_clone = task.clone();
let flush_handle = context.spawn(async move {
let flush_start_status = task_clone.flush_start();
match flush_start_status {
Pending {
trigger: FlushStart,
origin: Started,
..
} => (),
other => panic!("{:?}", other),
};
assert_eq!(
flush_start_status.await.unwrap(),
Complete {
origin: Started,
target: Flushing,
},
);
assert_eq!(task_clone.state(), Flushing);
flush_start_receiver.next().await.unwrap();
let flush_stop_status = task_clone.flush_stop();
match flush_stop_status {
Pending {
trigger: FlushStop,
origin: Flushing,
..
} => (),
other => panic!("{:?}", other),
};
assert_eq!(
flush_stop_status.await.unwrap(),
Complete {
origin: Flushing,
target: Started,
},
);
assert_eq!(task_clone.state(), Started);
});
block_on(flush_handle).unwrap();
block_on(flush_stop_receiver.next()).unwrap();
stop_then_unprepare(task);
}
#[test]
fn flush_from_loop() {
gst::init().unwrap();
struct TaskFlushTest {
task: Task,
flush_start_sender: mpsc::Sender<()>,
}
impl TaskImpl for TaskFlushTest {
type Item = ();
fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
future::ok(()).boxed()
}
fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
gst::debug!(RUNTIME_CAT, "flush_from_loop: flush_start from handle_item");
match self.task.flush_start() {
Pending {
trigger: FlushStart,
origin: Started,
..
} => (),
other => panic!("{:?}", other),
}
Ok(())
}
.boxed()
}
fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "flush_from_loop: started flushing");
self.flush_start_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
}
}
let context = Context::acquire("flush_from_loop", Duration::from_millis(2)).unwrap();
let task = Task::default();
let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
let _ = task.prepare(
TaskFlushTest {
task: task.clone(),
flush_start_sender,
},
context,
);
let _ = task.start();
gst::debug!(
RUNTIME_CAT,
"flush_from_loop: awaiting flush_start notification"
);
block_on(flush_start_receiver.next()).unwrap();
assert_eq!(
task.stop().block_on().unwrap(),
Complete {
origin: Flushing,
target: Stopped,
},
);
task.unprepare().block_on().unwrap();
}
#[test]
fn pause_from_loop() {
gst::init().unwrap();
struct TaskStartTest {
task: Task,
pause_sender: mpsc::Sender<()>,
}
impl TaskImpl for TaskStartTest {
type Item = ();
fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
future::ok(()).boxed()
}
fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
gst::debug!(RUNTIME_CAT, "pause_from_loop: entering handle_item");
crate::runtime::timer::delay_for(Duration::from_millis(50)).await;
gst::debug!(RUNTIME_CAT, "pause_from_loop: pause from handle_item");
match self.task.pause() {
Pending {
trigger: Pause,
origin: Started,
..
} => (),
other => panic!("{:?}", other),
}
Ok(())
}
.boxed()
}
fn pause(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "pause_from_loop: entering pause action");
self.pause_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
}
}
let context = Context::acquire("pause_from_loop", Duration::from_millis(2)).unwrap();
let task = Task::default();
let (pause_sender, mut pause_receiver) = mpsc::channel(1);
let _ = task.prepare(
TaskStartTest {
task: task.clone(),
pause_sender,
},
context,
);
let _ = task.start();
gst::debug!(RUNTIME_CAT, "pause_from_loop: awaiting pause notification");
block_on(pause_receiver.next()).unwrap();
stop_then_unprepare(task);
}
#[test]
fn trigger_from_action() {
gst::init().unwrap();
struct TaskFlushTest {
task: Task,
flush_stop_sender: mpsc::Sender<()>,
}
impl TaskImpl for TaskFlushTest {
type Item = ();
fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
future::pending::<Result<(), gst::FlowError>>().boxed()
}
fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
unreachable!("trigger_from_action: handle_item");
}
fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(
RUNTIME_CAT,
"trigger_from_action: flush_start triggering flush_stop"
);
match self.task.flush_stop() {
Pending {
trigger: FlushStop,
origin: Started,
..
} => (),
other => panic!("{:?}", other),
}
Ok(())
}
.boxed()
}
fn flush_stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "trigger_from_action: stopped flushing");
self.flush_stop_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
}
}
let context = Context::acquire("trigger_from_action", Duration::from_millis(2)).unwrap();
let task = Task::default();
let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
let _ = task.prepare(
TaskFlushTest {
task: task.clone(),
flush_stop_sender,
},
context,
);
task.start().block_on().unwrap();
let _ = task.flush_start();
gst::debug!(
RUNTIME_CAT,
"trigger_from_action: awaiting flush_stop notification"
);
block_on(flush_stop_receiver.next()).unwrap();
stop_then_unprepare(task);
}
#[test]
fn pause_flush_start() {
gst::init().unwrap();
struct TaskFlushTest {
started_sender: mpsc::Sender<()>,
flush_start_sender: mpsc::Sender<()>,
flush_stop_sender: mpsc::Sender<()>,
}
impl TaskImpl for TaskFlushTest {
type Item = ();
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "pause_flush_start: started");
self.started_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
}
fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
future::pending::<Result<(), gst::FlowError>>().boxed()
}
fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
unreachable!("pause_flush_start: handle_item");
}
fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "pause_flush_start: started flushing");
self.flush_start_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
}
fn flush_stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "pause_flush_start: stopped flushing");
self.flush_stop_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
}
}
let context = Context::acquire("pause_flush_start", Duration::from_millis(2)).unwrap();
let task = Task::default();
let (started_sender, mut started_receiver) = mpsc::channel(1);
let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
let _ = task.prepare(
TaskFlushTest {
started_sender,
flush_start_sender,
flush_stop_sender,
},
context,
);
gst::debug!(RUNTIME_CAT, "pause_flush_start: pausing");
assert_eq!(
task.pause().block_on().unwrap(),
Complete {
origin: Prepared,
target: Paused,
},
);
gst::debug!(RUNTIME_CAT, "pause_flush_start: starting flush");
assert_eq!(
task.flush_start().block_on().unwrap(),
Complete {
origin: Paused,
target: PausedFlushing,
},
);
assert_eq!(task.state(), PausedFlushing);
block_on(flush_start_receiver.next());
gst::debug!(RUNTIME_CAT, "pause_flush_start: stopping flush");
assert_eq!(
task.flush_stop().block_on().unwrap(),
Complete {
origin: PausedFlushing,
target: Paused,
},
);
assert_eq!(task.state(), Paused);
block_on(flush_stop_receiver.next());
started_receiver.try_next().unwrap_err();
gst::debug!(RUNTIME_CAT, "pause_flush_start: starting after flushing");
assert_eq!(
task.start().block_on().unwrap(),
Complete {
origin: Paused,
target: Started,
},
);
assert_eq!(task.state(), Started);
block_on(started_receiver.next());
stop_then_unprepare(task);
}
#[test]
fn pause_flushing_start() {
gst::init().unwrap();
struct TaskFlushTest {
started_sender: mpsc::Sender<()>,
flush_start_sender: mpsc::Sender<()>,
flush_stop_sender: mpsc::Sender<()>,
}
impl TaskImpl for TaskFlushTest {
type Item = ();
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "pause_flushing_start: started");
self.started_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
}
fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
future::pending::<Result<(), gst::FlowError>>().boxed()
}
fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
unreachable!("pause_flushing_start: handle_item");
}
fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "pause_flushing_start: started flushing");
self.flush_start_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
}
fn flush_stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "pause_flushing_start: stopped flushing");
self.flush_stop_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
}
}
let context = Context::acquire("pause_flushing_start", Duration::from_millis(2)).unwrap();
let task = Task::default();
let (started_sender, mut started_receiver) = mpsc::channel(1);
let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
let _ = task.prepare(
TaskFlushTest {
started_sender,
flush_start_sender,
flush_stop_sender,
},
context,
);
gst::debug!(RUNTIME_CAT, "pause_flushing_start: pausing");
let _ = task.pause();
gst::debug!(RUNTIME_CAT, "pause_flushing_start: starting flush");
block_on(task.flush_start()).unwrap();
assert_eq!(task.state(), PausedFlushing);
block_on(flush_start_receiver.next());
gst::debug!(RUNTIME_CAT, "pause_flushing_start: starting while flushing");
assert_eq!(
task.start().block_on().unwrap(),
Complete {
origin: PausedFlushing,
target: Flushing,
},
);
assert_eq!(task.state(), Flushing);
started_receiver.try_next().unwrap_err();
gst::debug!(RUNTIME_CAT, "pause_flushing_start: stopping flush");
assert_eq!(
task.flush_stop().block_on().unwrap(),
Complete {
origin: Flushing,
target: Started,
},
);
assert_eq!(task.state(), Started);
block_on(flush_stop_receiver.next());
block_on(started_receiver.next());
stop_then_unprepare(task);
}
#[test]
fn flush_concurrent_start() {
gst::init().unwrap();
struct TaskStartTest {
flush_start_sender: mpsc::Sender<()>,
flush_stop_sender: mpsc::Sender<()>,
}
impl TaskImpl for TaskStartTest {
type Item = ();
fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
future::pending::<Result<(), gst::FlowError>>().boxed()
}
fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
unreachable!("flush_concurrent_start: handle_item");
}
fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "flush_concurrent_start: started flushing");
self.flush_start_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
}
fn flush_stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
gst::debug!(RUNTIME_CAT, "flush_concurrent_start: stopped flushing");
self.flush_stop_sender.send(()).await.unwrap();
Ok(())
}
.boxed()
}
}
let context = Context::acquire("flush_concurrent_start", Duration::from_millis(2)).unwrap();
let task = Task::default();
let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
let _ = task.prepare(
TaskStartTest {
flush_start_sender,
flush_stop_sender,
},
context,
);
let oob_context =
Context::acquire("flush_concurrent_start_oob", Duration::from_millis(2)).unwrap();
let task_clone = task.clone();
block_on(task.pause()).unwrap();
let (ready_sender, ready_receiver) = oneshot::channel();
gst::debug!(RUNTIME_CAT, "flush_concurrent_start: spawning flush_start");
let flush_start_handle = oob_context.spawn(async move {
gst::debug!(RUNTIME_CAT, "flush_concurrent_start: // flush_start");
ready_sender.send(()).unwrap();
let status = task_clone.flush_start();
match status {
Pending {
trigger: FlushStart,
origin: Paused,
..
} => (),
Pending {
trigger: FlushStart,
origin: Started,
..
} => (),
other => panic!("{:?}", other),
};
status.await.unwrap();
flush_start_receiver.next().await.unwrap();
});
gst::debug!(
RUNTIME_CAT,
"flush_concurrent_start: awaiting for oob_context"
);
block_on(ready_receiver).unwrap();
gst::debug!(RUNTIME_CAT, "flush_concurrent_start: // start");
match block_on(task.start()) {
Ok(Complete {
origin: Paused,
target: Started,
}) => (),
Ok(Complete {
origin: PausedFlushing,
target: Flushing,
}) => (),
other => panic!("{:?}", other),
}
block_on(flush_start_handle).unwrap();
gst::debug!(RUNTIME_CAT, "flush_concurrent_start: requesting flush_stop");
assert_eq!(
task.flush_stop().block_on().unwrap(),
Complete {
origin: Flushing,
target: Started,
},
);
assert_eq!(task.state(), Started);
block_on(flush_stop_receiver.next());
stop_then_unprepare(task);
}
#[test]
fn start_timer() {
use crate::runtime::timer;
gst::init().unwrap();
struct TaskTimerTest {
timer: Option<timer::Oneshot>,
timer_elapsed_sender: Option<oneshot::Sender<()>>,
}
impl TaskImpl for TaskTimerTest {
type Item = ();
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
async move {
self.timer = Some(crate::runtime::timer::delay_for(Duration::from_millis(50)));
gst::debug!(RUNTIME_CAT, "start_timer: started");
Ok(())
}
.boxed()
}
fn try_next(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
gst::debug!(RUNTIME_CAT, "start_timer: awaiting timer");
self.timer.take().unwrap().await;
Ok(())
}
.boxed()
}
fn handle_item(&mut self, _item: ()) -> BoxFuture<'_, Result<(), gst::FlowError>> {
async move {
gst::debug!(RUNTIME_CAT, "start_timer: timer elapsed");
if let Some(timer_elapsed_sender) = self.timer_elapsed_sender.take() {
timer_elapsed_sender.send(()).unwrap();
}
Err(gst::FlowError::Eos)
}
.boxed()
}
}
let context = Context::acquire("start_timer", Duration::from_millis(2)).unwrap();
let task = Task::default();
let (timer_elapsed_sender, timer_elapsed_receiver) = oneshot::channel();
let _ = task.prepare(
TaskTimerTest {
timer: None,
timer_elapsed_sender: Some(timer_elapsed_sender),
},
context,
);
gst::debug!(RUNTIME_CAT, "start_timer: start");
let _ = task.start();
block_on(timer_elapsed_receiver).unwrap();
gst::debug!(RUNTIME_CAT, "start_timer: timer elapsed received");
stop_then_unprepare(task);
}
}