gstthreadshare/runtime/
task.rs

1// Copyright (C) 2019-2022 François Laignel <fengalin@free.fr>
2// Copyright (C) 2020 Sebastian Dröge <sebastian@centricular.com>
3//
4// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
5// If a copy of the MPL was not distributed with this file, You can obtain one at
6// <https://mozilla.org/MPL/2.0/>.
7//
8// SPDX-License-Identifier: MPL-2.0
9
10//! An execution loop to run asynchronous processing.
11
12use futures::channel::mpsc as async_mpsc;
13use futures::channel::oneshot;
14use futures::prelude::*;
15
16use std::fmt;
17use std::ops::Deref;
18use std::pin::{pin, Pin};
19use std::sync::{Arc, Mutex, MutexGuard};
20use std::task::Poll;
21
22use gst::glib;
23use gst::glib::prelude::*;
24
25use super::{Context, JoinHandle, RUNTIME_CAT};
26
/// The states a `Task` can be in.
///
/// The derived `Ord` / `PartialOrd` implementations follow the declaration
/// order of the variants: reordering them changes comparison results.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum TaskState {
    Error,
    Flushing,
    Paused,
    PausedFlushing,
    Prepared,
    Preparing,
    Started,
    Stopped,
    Unprepared,
}
39
/// The events which can be pushed to a `Task` to request a state transition.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Trigger {
    Error,
    FlushStart,
    FlushStop,
    Pause,
    Prepare,
    Start,
    Stop,
    Unprepare,
}
51
52/// Transition success details.
53#[derive(Clone, Copy, Debug, Eq, PartialEq)]
54pub enum TransitionOk {
55    /// Transition completed successfully.
56    Complete {
57        origin: TaskState,
58        target: TaskState,
59    },
60    /// Not waiting for transition result.
61    ///
62    /// This is to prevent:
63    /// - A deadlock when executing a transition action.
64    /// - A potential infinite wait when pausing a running loop
65    ///   which could be awaiting for an `nominal` to complete.
66    NotWaiting { trigger: Trigger, origin: TaskState },
67    /// Skipping triggering event due to current state.
68    Skipped { trigger: Trigger, state: TaskState },
69}
70
71/// TriggeringEvent error details.
72#[derive(Clone, Debug, Eq, PartialEq)]
73pub struct TransitionError {
74    pub trigger: Trigger,
75    pub state: TaskState,
76    pub err_msg: gst::ErrorMessage,
77}
78
79impl fmt::Display for TransitionError {
80    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
81        write!(
82            f,
83            "{:?} from state {:?}: {:?}",
84            self.trigger, self.state, self.err_msg
85        )
86    }
87}
88
89impl std::error::Error for TransitionError {}
90
91impl From<TransitionError> for gst::ErrorMessage {
92    fn from(err: TransitionError) -> Self {
93        err.err_msg
94    }
95}
96
97/// Transition status.
98///
99/// A state transition occurs as a result of a triggering event.
100/// The triggering event is asynchronously handled by a state machine
101/// running on a [`Context`].
102#[must_use = "This `TransitionStatus` may be `Pending`. In most cases it should be awaited. See `await_maybe_on_context`"]
103pub enum TransitionStatus {
104    /// Transition result is ready.
105    Ready(Result<TransitionOk, TransitionError>),
106    /// Transition is pending.
107    Pending {
108        trigger: Trigger,
109        origin: TaskState,
110        res_fut: Pin<Box<dyn Future<Output = Result<TransitionOk, TransitionError>> + Send>>,
111    },
112}
113
114impl TransitionStatus {
115    pub fn is_ready(&self) -> bool {
116        matches!(self, TransitionStatus::Ready { .. })
117    }
118
119    pub fn is_pending(&self) -> bool {
120        matches!(self, TransitionStatus::Pending { .. })
121    }
122
123    /// Converts the `TransitionStatus` into a `Result`.
124    ///
125    /// This function allows getting the `TransitionError` when
126    /// the transition result is ready without `await`ing nor blocking.
127    ///
128    /// See also [`Self::await_maybe_on_context`].
129    // FIXME once stabilized, this could use https://github.com/rust-lang/rust/issues/84277
130    pub fn check(self) -> Result<TransitionStatus, TransitionError> {
131        match self {
132            TransitionStatus::Ready(Err(err)) => Err(err),
133            other => Ok(other),
134        }
135    }
136
137    /// Blocks on this state transition to complete, or adds a subtask if running on a [`Context`].
138    ///
139    /// Notes:
140    ///
141    /// - If you need to execute code after the transition succeeds or fails,
142    ///   see [`block_on_or_add_subtask_then`].
143    /// - When running in an `async` block within a running transition or
144    ///   task iteration, don't await for the transition as it would deadlock.
145    ///   Use [`Self::check`] to make sure the state transition is valid.
146    /// - When running in an `async` block out of a running transition or
147    ///   task iteration, just `.await` normally. E.g.:
148    ///
149    /// ```
150    /// # use gstthreadshare::runtime::task::{Task, TransitionOk, TransitionError};
151    /// # async fn async_fn() -> Result<TransitionOk, TransitionError> {
152    /// # let task = Task::default();
153    ///   let flush_ok = task.flush_start().await?;
154    /// # Ok(flush_ok)
155    /// # }
156    /// ```
157    ///
158    /// This function makes sure the transition completes successfully or
159    /// produces an error. It must be used in situations where we don't know
160    /// whether we are running on a [`Context`] or not. This is the case for
161    /// functions in [`PadSrc`] and [`PadSink`] as well as the synchronous
162    /// functions transitively called from them.
163    ///
164    /// As an example, a `PadSrc::src_event` function which handles a
165    /// `FlushStart` could call:
166    ///
167    /// ```
168    /// # fn src_event() -> bool {
169    /// # let task = gstthreadshare::runtime::Task::default();
170    ///   return task
171    ///       .flush_start()
172    ///       .block_on_or_add_subtask()
173    ///       .is_ok();
174    /// # }
175    /// ```
176    ///
177    /// If the transition is already complete, the result is returned immediately.
178    ///
179    /// If we are NOT running on a [`Context`], the transition result is awaited
180    /// by blocking on current thread and the result is returned.
181    ///
182    /// If we are running on a [`Context`], the transition result is awaited
183    /// in a sub task for current [`Context`]'s Scheduler task. As a consequence,
184    /// the sub task will be awaited in usual [`Context::drain_sub_tasks`]
185    /// rendezvous, ensuring some kind of synchronization. To avoid deadlocks,
186    /// `Ok(TransitionOk::NotWaiting { .. })` is immediately returned.
187    ///
188    /// [`PadSrc`]: ../pad/struct.PadSrc.html
189    /// [`PadSink`]: ../pad/struct.PadSink.html
190    pub fn block_on_or_add_subtask<O>(self, obj: &O) -> Result<TransitionOk, TransitionError>
191    where
192        O: IsA<glib::Object> + Send,
193    {
194        use TransitionStatus::*;
195        match self {
196            Pending {
197                trigger,
198                origin,
199                res_fut,
200            } => {
201                if let Some((ctx, task_id)) = Context::current_task() {
202                    gst::debug!(
203                        RUNTIME_CAT,
204                        obj = obj,
205                        "Awaiting for {trigger:?} ack in a subtask on context {}",
206                        ctx.name()
207                    );
208
209                    let obj = obj.clone();
210                    let _ = ctx.add_sub_task(task_id, async move {
211                        let res = res_fut.await;
212                        match res {
213                            Ok(status) => {
214                                gst::log!(
215                                    RUNTIME_CAT,
216                                    obj = obj,
217                                    "Task {trigger:?} success: {status:?}",
218                                );
219                            }
220                            Err(err) => {
221                                gst::error!(
222                                    RUNTIME_CAT,
223                                    obj = obj,
224                                    "Task {trigger:?} failure: {err}"
225                                );
226                            }
227                        }
228
229                        Ok(())
230                    });
231
232                    Ok(TransitionOk::NotWaiting { trigger, origin })
233                } else {
234                    gst::debug!(
235                        RUNTIME_CAT,
236                        obj = obj,
237                        "Awaiting for {trigger:?} ack on current thread",
238                    );
239                    let res = futures::executor::block_on(res_fut);
240                    match res {
241                        Ok(ref status) => {
242                            gst::log!(
243                                RUNTIME_CAT,
244                                obj = obj,
245                                "Task {trigger:?} success: {status:?}",
246                            );
247                        }
248                        Err(ref err) => {
249                            gst::error!(RUNTIME_CAT, obj = obj, "Task {trigger:?} failure: {err}");
250                        }
251                    }
252
253                    res
254                }
255            }
256            Ready(res) => {
257                match res {
258                    Ok(ref status) => {
259                        gst::log!(
260                            RUNTIME_CAT,
261                            obj = obj,
262                            "Task transition immediate success: {status:?}",
263                        );
264                    }
265                    Err(ref err) => {
266                        gst::error!(
267                            RUNTIME_CAT,
268                            obj = obj,
269                            "Task transition immediate failure: {err}",
270                        );
271                    }
272                }
273
274                res
275            }
276        }
277    }
278
279    /// Blocks on this state transition to complete, or adds a subtask if running on a [`Context`]
280    /// executing the provided function after the transition succeeds or fails.
281    ///
282    /// Compared to [`block_on_or_addsubtask`], this function also executes the provieded
283    /// `func` after the transition succeeded or failed. Code following [`block_on_or_addsubtask`]
284    /// can actually be executed before the transition if a subtask was added and the returned
285    /// `Result` might not reflect the actual transition result.
286    ///
287    /// If the transition is already complete, `func` is executed immediately.
288    ///
289    /// If we are NOT running on a [`Context`], the transition result is awaited
290    /// by blocking on current thread and `func` is executed.
291    ///
292    /// If we are running on a [`Context`], the transition result is awaited
293    /// in a sub task for current [`Context`]'s Scheduler task and `func` is executed with
294    /// the transition result. In this case, `block_on_or_add_subtask_then` always
295    /// returns `Ok(())` since the actual processing is handled asynchronously.
296    ///
297    /// ## Example
298    ///
299    /// ```
300    /// # use gstthreadshare::runtime::task::{Task, TransitionOk, TransitionError};
301    /// # async fn async_fn() -> Result<TransitionOk, TransitionError> {
302    /// # let task = Task::default();
303    ///   task
304    ///       .stop()
305    ///       .block_on_or_add_subtask_then(self.obj(), |elem, res| {
306    ///           // Add specific stop code here,
307    ///           // it will be executed after the transition succeeds or fails
308    ///
309    ///           if res.is_ok() {
310    ///               gst::debug!(CAT, obj = elem, "Stopped");
311    ///           }
312    ///       })
313    /// # Ok(flush_ok)
314    /// # }
315    /// ```
316    pub fn block_on_or_add_subtask_then<T, F>(
317        self,
318        obj: glib::BorrowedObject<'_, T>,
319        func: F,
320    ) -> Result<(), gst::ErrorMessage>
321    where
322        T: IsA<glib::Object> + Send,
323        F: FnOnce(&T, &Result<TransitionOk, TransitionError>) + Send + 'static,
324    {
325        use TransitionStatus::*;
326        match self {
327            Pending {
328                trigger, res_fut, ..
329            } => {
330                if let Some((ctx, task_id)) = Context::current_task() {
331                    gst::debug!(
332                        RUNTIME_CAT,
333                        obj = obj,
334                        "Awaiting for {trigger:?} ack in a subtask on context {}",
335                        ctx.name()
336                    );
337                    let obj = obj.clone();
338                    let _ = ctx.add_sub_task(task_id, async move {
339                        let res = res_fut.await;
340                        match res {
341                            Ok(ref status) => {
342                                gst::log!(
343                                    RUNTIME_CAT,
344                                    obj = obj,
345                                    "Task {trigger:?} success: {status:?}",
346                                );
347                                func(&obj, &res);
348                                Ok(())
349                            }
350                            Err(ref err) => {
351                                gst::error!(
352                                    RUNTIME_CAT,
353                                    obj = obj,
354                                    "Task {trigger:?} failure: {err}",
355                                );
356                                func(&obj, &res);
357                                Err(gst::FlowError::Error)
358                            }
359                        }
360                    });
361
362                    Ok(())
363                } else {
364                    gst::debug!(
365                        RUNTIME_CAT,
366                        obj = obj,
367                        "Awaiting for {trigger:?} ack on current thread",
368                    );
369                    let res = futures::executor::block_on(res_fut);
370                    match res {
371                        Ok(ref status) => {
372                            gst::log!(
373                                RUNTIME_CAT,
374                                obj = obj,
375                                "Task {trigger:?} success: {status:?}",
376                            );
377                            func(&obj, &res);
378                            Ok(())
379                        }
380                        Err(ref err) => {
381                            gst::error!(RUNTIME_CAT, obj = obj, "Task {trigger:?} failure: {err}",);
382                            func(&obj, &res);
383                            res.map(|_| ()).map_err(|err| err.into())
384                        }
385                    }
386                }
387            }
388            Ready(res) => match res {
389                Ok(ref status) => {
390                    gst::log!(
391                        RUNTIME_CAT,
392                        obj = obj,
393                        "Task transition immediate success: {status:?}",
394                    );
395                    func(&obj, &res);
396                    Ok(())
397                }
398                Err(ref err) => {
399                    gst::error!(
400                        RUNTIME_CAT,
401                        obj = obj,
402                        "Task transition immediate failure: {err}",
403                    );
404                    func(&obj, &res);
405                    res.map(|_| ()).map_err(|err| err.into())
406                }
407            },
408        }
409    }
410}
411
412impl Future for TransitionStatus {
413    type Output = Result<TransitionOk, TransitionError>;
414
415    fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
416        use TransitionStatus::*;
417
418        match &mut *self {
419            Ready(res) => Poll::Ready(res.clone()),
420            Pending { res_fut, .. } => match Pin::new(res_fut).poll(cx) {
421                Poll::Pending => Poll::Pending,
422                Poll::Ready(res) => {
423                    *self = Ready(res.clone());
424
425                    Poll::Ready(res)
426                }
427            },
428        }
429    }
430}
431
432impl From<TransitionOk> for TransitionStatus {
433    fn from(ok: TransitionOk) -> Self {
434        Self::Ready(Ok(ok))
435    }
436}
437
438impl From<TransitionError> for TransitionStatus {
439    fn from(err: TransitionError) -> Self {
440        Self::Ready(Err(err))
441    }
442}
443
444// Explicit impl due to `res_fut` not implementing `Debug`.
445impl fmt::Debug for TransitionStatus {
446    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
447        use TransitionStatus::*;
448        match self {
449            Ready(res) => f.debug_tuple("Ready").field(res).finish(),
450            Pending {
451                trigger, origin, ..
452            } => f
453                .debug_struct("Pending")
454                .field("trigger", trigger)
455                .field("origin", origin)
456                .finish(),
457        }
458    }
459}
460
461/// Implementation trait for `Task`s.
462///
463/// Defines implementations for state transition actions and error handlers.
464pub trait TaskImpl: Send + 'static {
465    type Item: Send + 'static;
466
467    fn obj(&self) -> &impl IsA<glib::Object>;
468
469    fn prepare(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
470        future::ok(())
471    }
472
473    fn unprepare(&mut self) -> impl Future<Output = ()> + Send {
474        future::ready(())
475    }
476
477    fn start(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
478        future::ok(())
479    }
480
481    /// Tries to retrieve the next item to process.
482    ///
483    /// With [`Self::handle_item`], this is one of the two `Task` loop
484    /// functions. They are executed in a loop in the `Started` state.
485    ///
486    /// Function `try_next` is awaited at the beginning of each iteration,
487    /// and can be cancelled at `await` point if a state transition is requested.
488    ///
489    /// If `Ok(item)` is returned, the iteration calls [`Self::handle_item`]
490    /// with said `Item`.
491    ///
492    /// If `Err(..)` is returned, the iteration calls [`Self::handle_loop_error`].
493    fn try_next(&mut self) -> impl Future<Output = Result<Self::Item, gst::FlowError>> + Send;
494
495    /// Does whatever needs to be done with the `item`.
496    ///
497    /// With [`Self::try_next`], this is one of the two `Task` loop
498    /// functions. They are executed in a loop in the `Started` state.
499    ///
500    /// Function `handle_item` asynchronously processes an `item` previously
501    /// retrieved by [`Self::try_next`]. Processing is guaranteed to run
502    /// to completion even if a state transition is requested.
503    ///
504    /// If `Err(..)` is returned, the iteration calls [`Self::handle_loop_error`].
505    fn handle_item(
506        &mut self,
507        _item: Self::Item,
508    ) -> impl Future<Output = Result<(), gst::FlowError>> + Send;
509
510    fn pause(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
511        future::ok(())
512    }
513
514    fn flush_start(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
515        future::ready(Ok(()))
516    }
517
518    fn flush_stop(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
519        future::ready(Ok(()))
520    }
521
522    fn stop(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
523        future::ready(Ok(()))
524    }
525
526    /// Handles an error occurring during the execution of the `Task` loop.
527    ///
528    /// This include errors returned by [`Self::try_next`] & [`Self::handle_item`].
529    ///
530    /// If the error is unrecoverable, implementations might use
531    /// `gst::Element::post_error_message` and return `Trigger::Error`.
532    ///
533    /// Otherwise, handle the error and return the requested `Transition` to recover.
534    ///
535    /// Default behaviour depends on the `err`:
536    ///
537    /// - `FlowError::Flushing` -> `Trigger::FlushStart`.
538    /// - `FlowError::Eos` -> `Trigger::Stop`.
539    /// - Other `FlowError` -> `Trigger::Error`.
540    fn handle_loop_error(&mut self, err: gst::FlowError) -> impl Future<Output = Trigger> + Send {
541        async move {
542            match err {
543                gst::FlowError::Flushing => {
544                    gst::debug!(
545                        RUNTIME_CAT,
546                        obj = self.obj(),
547                        "Task loop returned Flushing. Posting FlushStart"
548                    );
549                    Trigger::FlushStart
550                }
551                gst::FlowError::Eos => {
552                    gst::debug!(
553                        RUNTIME_CAT,
554                        obj = self.obj(),
555                        "Task loop returned Eos. Posting Stop"
556                    );
557                    Trigger::Stop
558                }
559                other => {
560                    gst::error!(
561                        RUNTIME_CAT,
562                        obj = self.obj(),
563                        "Task loop returned {other:?}. Posting Error",
564                    );
565                    Trigger::Error
566                }
567            }
568        }
569    }
570
571    /// Handles an error occurring during the execution of a transition action.
572    ///
573    /// This handler also catches errors returned by subtasks spawned by the transition action.
574    ///
575    /// If the error is unrecoverable, implementations might use `gst::Element::post_error_message`
576    /// and return `Trigger::Error`.
577    ///
578    /// Otherwise, handle the error and return the recovering `Trigger`.
579    ///
580    /// Default is to `gst::error` log and return `Trigger::Error`.
581    fn handle_action_error(
582        &mut self,
583        trigger: Trigger,
584        state: TaskState,
585        err: gst::ErrorMessage,
586    ) -> impl Future<Output = Trigger> + Send {
587        async move {
588            gst::error!(
589                RUNTIME_CAT,
590                obj = self.obj(),
591                "TaskImpl transition action error during {trigger:?} from {state:?}: {err:?}. Posting Trigger::Error",
592            );
593
594            Trigger::Error
595        }
596    }
597}
598
599type AckSender = oneshot::Sender<Result<TransitionOk, TransitionError>>;
600type AckReceiver = oneshot::Receiver<Result<TransitionOk, TransitionError>>;
601
602struct TriggeringEvent {
603    trigger: Trigger,
604    ack_tx: AckSender,
605}
606
607impl TriggeringEvent {
608    fn new(trigger: Trigger) -> (Self, AckReceiver) {
609        let (ack_tx, ack_rx) = oneshot::channel();
610        let req = TriggeringEvent { trigger, ack_tx };
611
612        (req, ack_rx)
613    }
614
615    fn send_ack(self, res: Result<TransitionOk, TransitionError>) {
616        let _ = self.ack_tx.send(res);
617    }
618
619    fn send_err_ack(self) {
620        let res = Err(TransitionError {
621            trigger: self.trigger,
622            state: TaskState::Error,
623            err_msg: gst::error_msg!(
624                gst::CoreError::StateChange,
625                [
626                    "Triggering Event {:?} rejected due to a previous unrecoverable error",
627                    self.trigger,
628                ]
629            ),
630        });
631
632        self.send_ack(res);
633    }
634}
635
636impl fmt::Debug for TriggeringEvent {
637    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
638        f.debug_struct("TriggeringEvent")
639            .field("trigger", &self.trigger)
640            .finish()
641    }
642}
643
644#[derive(Debug)]
645struct StateMachineHandle {
646    join_handle: JoinHandle<()>,
647    triggering_evt_tx: async_mpsc::Sender<TriggeringEvent>,
648    context: Context,
649}
650
651impl StateMachineHandle {
652    fn trigger(&mut self, trigger: Trigger) -> AckReceiver {
653        let (triggering_evt, ack_rx) = TriggeringEvent::new(trigger);
654
655        gst::log!(RUNTIME_CAT, "Pushing {triggering_evt:?}");
656        self.triggering_evt_tx.try_send(triggering_evt).unwrap();
657
658        self.context.unpark();
659
660        ack_rx
661    }
662
663    async fn join(self) {
664        self.join_handle
665            .await
666            .expect("state machine shouldn't have been cancelled");
667    }
668}
669
670#[derive(Debug)]
671struct TaskInner {
672    state: TaskState,
673    state_machine_handle: Option<StateMachineHandle>,
674}
675
676impl Default for TaskInner {
677    fn default() -> Self {
678        TaskInner {
679            state: TaskState::Unprepared,
680            state_machine_handle: None,
681        }
682    }
683}
684
685impl TaskInner {
686    fn switch_to_state(&mut self, target_state: TaskState, triggering_evt: TriggeringEvent) {
687        let res = Ok(TransitionOk::Complete {
688            origin: self.state,
689            target: target_state,
690        });
691
692        self.state = target_state;
693        triggering_evt.send_ack(res);
694    }
695
696    fn switch_to_err(&mut self, triggering_evt: TriggeringEvent) {
697        let res = Err(TransitionError {
698            trigger: triggering_evt.trigger,
699            state: self.state,
700            err_msg: gst::error_msg!(
701                gst::CoreError::StateChange,
702                [
703                    "Unrecoverable error for {triggering_evt:?} from state {:?}",
704                    self.state,
705                ]
706            ),
707        });
708
709        self.state = TaskState::Error;
710        triggering_evt.send_ack(res);
711    }
712
713    fn skip_triggering_evt(&mut self, triggering_evt: TriggeringEvent) {
714        let res = Ok(TransitionOk::Skipped {
715            trigger: triggering_evt.trigger,
716            state: self.state,
717        });
718
719        triggering_evt.send_ack(res);
720    }
721
722    fn trigger(&mut self, trigger: Trigger) -> Result<AckReceiver, TransitionError> {
723        self.state_machine_handle
724            .as_mut()
725            .map(|state_machine| state_machine.trigger(trigger))
726            .ok_or_else(|| {
727                gst::warning!(RUNTIME_CAT, "Unable to send {trigger:?}: no state machine",);
728                TransitionError {
729                    trigger,
730                    state: TaskState::Unprepared,
731                    err_msg: gst::error_msg!(
732                        gst::ResourceError::NotFound,
733                        ["Unable to send {trigger:?}: no state machine"]
734                    ),
735                }
736            })
737    }
738}
739
740impl Drop for TaskInner {
741    fn drop(&mut self) {
742        if self.state != TaskState::Unprepared {
743            // Don't panic here: in case another panic occurs, we would get
744            // "panicked while panicking" which would prevents developers
745            // from getting the initial panic message.
746            gst::fixme!(RUNTIME_CAT, "Missing call to `Task::unprepare`");
747        }
748    }
749}
750
751/// An RAII implementation of scoped locked `TaskState`.
752pub struct TaskStateGuard<'guard>(MutexGuard<'guard, TaskInner>);
753
754impl Deref for TaskStateGuard<'_> {
755    type Target = TaskState;
756
757    fn deref(&self) -> &Self::Target {
758        &(self.0).state
759    }
760}
761
762/// A `Task` operating on a `threadshare` [`Context`].
763///
764/// [`Context`]: ../executor/struct.Context.html
765#[derive(Debug, Clone)]
766pub struct Task(Arc<Mutex<TaskInner>>);
767
768impl Default for Task {
769    fn default() -> Self {
770        Task(Arc::new(Mutex::new(TaskInner::default())))
771    }
772}
773
774impl Task {
775    pub fn state(&self) -> TaskState {
776        self.0.lock().unwrap().state
777    }
778
779    pub fn lock_state(&self) -> TaskStateGuard<'_> {
780        TaskStateGuard(self.0.lock().unwrap())
781    }
782
783    pub fn prepare(&self, task_impl: impl TaskImpl, context: Context) -> TransitionStatus {
784        let mut inner = self.0.lock().unwrap();
785
786        let origin = inner.state;
787        match origin {
788            TaskState::Unprepared => (),
789            TaskState::Prepared | TaskState::Preparing => {
790                gst::debug!(
791                    RUNTIME_CAT,
792                    obj = task_impl.obj(),
793                    "Task already {origin:?}",
794                );
795                return TransitionOk::Skipped {
796                    trigger: Trigger::Prepare,
797                    state: origin,
798                }
799                .into();
800            }
801            state => {
802                gst::warning!(
803                    RUNTIME_CAT,
804                    obj = task_impl.obj(),
805                    "Attempt to prepare Task in state {state:?}"
806                );
807                return TransitionError {
808                    trigger: Trigger::Prepare,
809                    state: inner.state,
810                    err_msg: gst::error_msg!(
811                        gst::CoreError::StateChange,
812                        ["Attempt to prepare Task in state {state:?}"]
813                    ),
814                }
815                .into();
816            }
817        }
818
819        assert!(inner.state_machine_handle.is_none());
820
821        inner.state = TaskState::Preparing;
822
823        gst::log!(
824            RUNTIME_CAT,
825            obj = task_impl.obj(),
826            "Spawning task state machine"
827        );
828        inner.state_machine_handle = Some(StateMachine::spawn(self.0.clone(), task_impl, context));
829
830        let ack_rx = match inner.trigger(Trigger::Prepare) {
831            Ok(ack_rx) => ack_rx,
832            Err(err) => return err.into(),
833        };
834        drop(inner);
835
836        TransitionStatus::Pending {
837            trigger: Trigger::Prepare,
838            origin: TaskState::Unprepared,
839            res_fut: Box::pin(ack_rx.map(Result::unwrap)),
840        }
841    }
842
843    pub fn unprepare(&self) -> TransitionStatus {
844        let mut inner = self.0.lock().unwrap();
845
846        let origin = inner.state;
847        let mut state_machine_handle = match origin {
848            TaskState::Stopped
849            | TaskState::Error
850            | TaskState::Prepared
851            | TaskState::Preparing
852            | TaskState::Unprepared => match inner.state_machine_handle.take() {
853                Some(state_machine_handle) => {
854                    gst::debug!(RUNTIME_CAT, "Unpreparing task");
855
856                    state_machine_handle
857                }
858                None => {
859                    gst::debug!(RUNTIME_CAT, "Task already unpreparing");
860                    return TransitionOk::Skipped {
861                        trigger: Trigger::Unprepare,
862                        state: origin,
863                    }
864                    .into();
865                }
866            },
867            state => {
868                gst::warning!(RUNTIME_CAT, "Attempt to unprepare Task in state {state:?}");
869                return TransitionError {
870                    trigger: Trigger::Unprepare,
871                    state: inner.state,
872                    err_msg: gst::error_msg!(
873                        gst::CoreError::StateChange,
874                        ["Attempt to unprepare Task in state {state:?}"]
875                    ),
876                }
877                .into();
878            }
879        };
880
881        let ack_rx = state_machine_handle.trigger(Trigger::Unprepare);
882        drop(inner);
883
884        let state_machine_end_fut = async {
885            state_machine_handle.join().await;
886            ack_rx.await.unwrap()
887        };
888
889        TransitionStatus::Pending {
890            trigger: Trigger::Unprepare,
891            origin,
892            res_fut: Box::pin(state_machine_end_fut),
893        }
894    }
895
896    /// Starts the `Task`.
897    ///
898    /// The execution occurs on the `Task` context.
899    pub fn start(&self) -> TransitionStatus {
900        let mut inner = self.0.lock().unwrap();
901
902        if let TaskState::Started = inner.state {
903            return TransitionOk::Skipped {
904                trigger: Trigger::Start,
905                state: TaskState::Started,
906            }
907            .into();
908        }
909
910        let ack_rx = match inner.trigger(Trigger::Start) {
911            Ok(ack_rx) => ack_rx,
912            Err(err) => return err.into(),
913        };
914
915        let origin = inner.state;
916        drop(inner);
917
918        TransitionStatus::Pending {
919            trigger: Trigger::Start,
920            origin,
921            res_fut: Box::pin(ack_rx.map(Result::unwrap)),
922        }
923    }
924
925    /// Requests the `Task` loop to pause.
926    ///
927    /// If an item handling is in progress, it will run to completion,
928    /// then no iterations will be executed before `start` is called again.
929    pub fn pause(&self) -> TransitionStatus {
930        self.push_pending(Trigger::Pause)
931    }
932
933    pub fn flush_start(&self) -> TransitionStatus {
934        self.push_pending(Trigger::FlushStart)
935    }
936
937    pub fn flush_stop(&self) -> TransitionStatus {
938        self.push_pending(Trigger::FlushStop)
939    }
940
941    /// Stops the `Started` `Task` and wait for it to finish.
942    pub fn stop(&self) -> TransitionStatus {
943        self.push_pending(Trigger::Stop)
944    }
945
946    /// Pushes a [`Trigger`] and returns TransitionStatus::Pending.
947    fn push_pending(&self, trigger: Trigger) -> TransitionStatus {
948        let mut inner = self.0.lock().unwrap();
949
950        let ack_rx = match inner.trigger(trigger) {
951            Ok(ack_rx) => ack_rx,
952            Err(err) => return err.into(),
953        };
954
955        let origin = inner.state;
956        drop(inner);
957
958        TransitionStatus::Pending {
959            trigger,
960            origin,
961            res_fut: Box::pin(ack_rx.map(Result::unwrap)),
962        }
963    }
964}
965
966struct StateMachine<Task: TaskImpl> {
967    task_impl: Task,
968    triggering_evt_rx: async_mpsc::Receiver<TriggeringEvent>,
969    pending_triggering_evt: Option<TriggeringEvent>,
970}
971
// Awaits the `$action` transition action of `$self.task_impl`.
//
// Evaluates to:
// - `Ok($triggering_evt)` if the action succeeded.
// - `Err(())` if the action failed. In this case, the error handler selected
//   the next trigger, which was stored in `$triggering_evt`, and the event
//   was moved to `$self.pending_triggering_evt`.
//
// `$triggering_evt` must be a mutable place.
// `$task_inner` is accepted but not used by the expansion.
macro_rules! exec_action {
    ($self:ident, $action:ident, $triggering_evt:expr, $origin:expr, $task_inner:expr) => {{
        let action_res = $self.task_impl.$action().await;
        if let Err(err) = action_res {
            // FIXME the problem is that we lose the origin trigger in the
            //       final TransitionStatus.

            let next_trigger = $self
                .task_impl
                .handle_action_error($triggering_evt.trigger, $origin, err)
                .await;

            // Convert the triggering event according to the error handler's decision
            gst::trace!(
                RUNTIME_CAT,
                obj = $self.task_impl.obj(),
                "TaskImpl transition action error: converting {:?} to {next_trigger:?}",
                $triggering_evt.trigger,
            );

            $triggering_evt.trigger = next_trigger;
            $self.pending_triggering_evt = Some($triggering_evt);

            Err(())
        } else {
            Ok($triggering_evt)
        }
    }};
}
1001
1002impl<Task: TaskImpl> StateMachine<Task> {
1003    fn spawn(
1004        task_inner: Arc<Mutex<TaskInner>>,
1005        task_impl: Task,
1006        context: Context,
1007    ) -> StateMachineHandle {
1008        let (triggering_evt_tx, triggering_evt_rx) = async_mpsc::channel(4);
1009
1010        let state_machine = StateMachine {
1011            task_impl,
1012            triggering_evt_rx,
1013            pending_triggering_evt: None,
1014        };
1015
1016        StateMachineHandle {
1017            join_handle: context.spawn_and_unpark(state_machine.run(task_inner)),
1018            triggering_evt_tx,
1019            context,
1020        }
1021    }
1022
1023    async fn run(mut self, task_inner: Arc<Mutex<TaskInner>>) {
1024        let mut triggering_evt = self
1025            .triggering_evt_rx
1026            .next()
1027            .await
1028            .expect("triggering_evt_rx dropped");
1029
1030        if let Trigger::Prepare = triggering_evt.trigger {
1031            gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Preparing task");
1032
1033            let res = exec_action!(
1034                self,
1035                prepare,
1036                triggering_evt,
1037                TaskState::Unprepared,
1038                &task_inner
1039            );
1040            if let Ok(triggering_evt) = res {
1041                let mut task_inner = task_inner.lock().unwrap();
1042                let res = Ok(TransitionOk::Complete {
1043                    origin: TaskState::Unprepared,
1044                    target: TaskState::Prepared,
1045                });
1046
1047                task_inner.state = TaskState::Prepared;
1048                triggering_evt.send_ack(res);
1049
1050                gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Task Prepared");
1051            }
1052        } else {
1053            panic!("Unexpected initial trigger {:?}", triggering_evt.trigger);
1054        }
1055
1056        loop {
1057            triggering_evt = match self.pending_triggering_evt.take() {
1058                Some(pending_triggering_evt) => pending_triggering_evt,
1059                None => self
1060                    .triggering_evt_rx
1061                    .next()
1062                    .await
1063                    .expect("triggering_evt_rx dropped"),
1064            };
1065            gst::trace!(
1066                RUNTIME_CAT,
1067                obj = self.task_impl.obj(),
1068                "State machine popped {triggering_evt:?}"
1069            );
1070
1071            match triggering_evt.trigger {
1072                Trigger::Error => {
1073                    let mut task_inner = task_inner.lock().unwrap();
1074                    task_inner.switch_to_err(triggering_evt);
1075                    gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Switched to Error");
1076                }
1077                Trigger::Start => {
1078                    let origin = {
1079                        let mut task_inner = task_inner.lock().unwrap();
1080                        let origin = task_inner.state;
1081                        match origin {
1082                            TaskState::Stopped | TaskState::Paused | TaskState::Prepared => (),
1083                            TaskState::PausedFlushing => {
1084                                task_inner.switch_to_state(TaskState::Flushing, triggering_evt);
1085                                gst::trace!(
1086                                    RUNTIME_CAT,
1087                                    obj = self.task_impl.obj(),
1088                                    "Switched from PausedFlushing to Flushing"
1089                                );
1090                                continue;
1091                            }
1092                            TaskState::Error => {
1093                                triggering_evt.send_err_ack();
1094                                continue;
1095                            }
1096                            state => {
1097                                task_inner.skip_triggering_evt(triggering_evt);
1098                                gst::trace!(
1099                                    RUNTIME_CAT,
1100                                    obj = self.task_impl.obj(),
1101                                    "Skipped Start in state {state:?}"
1102                                );
1103                                continue;
1104                            }
1105                        }
1106
1107                        origin
1108                    };
1109
1110                    self.start(triggering_evt, origin, &task_inner).await;
1111                    // next/pending triggering event handled in next iteration
1112                }
1113                Trigger::Pause => {
1114                    let (origin, target) = {
1115                        let mut task_inner = task_inner.lock().unwrap();
1116                        let origin = task_inner.state;
1117                        match origin {
1118                            TaskState::Started | TaskState::Stopped | TaskState::Prepared => {
1119                                (origin, TaskState::Paused)
1120                            }
1121                            TaskState::Flushing => (origin, TaskState::PausedFlushing),
1122                            TaskState::Error => (TaskState::Error, TaskState::Error),
1123                            state => {
1124                                task_inner.skip_triggering_evt(triggering_evt);
1125                                gst::trace!(
1126                                    RUNTIME_CAT,
1127                                    obj = self.task_impl.obj(),
1128                                    "Skipped Pause in state {state:?}"
1129                                );
1130                                continue;
1131                            }
1132                        }
1133                    };
1134
1135                    let res = exec_action!(self, pause, triggering_evt, origin, &task_inner);
1136                    if let Ok(triggering_evt) = res {
1137                        task_inner
1138                            .lock()
1139                            .unwrap()
1140                            .switch_to_state(target, triggering_evt);
1141                        gst::trace!(
1142                            RUNTIME_CAT,
1143                            obj = self.task_impl.obj(),
1144                            "Task loop {target:?}"
1145                        );
1146                    }
1147                }
1148                Trigger::Stop => {
1149                    let (origin, target) = {
1150                        let mut task_inner = task_inner.lock().unwrap();
1151                        let origin = task_inner.state;
1152                        match origin {
1153                            TaskState::Started
1154                            | TaskState::Paused
1155                            | TaskState::PausedFlushing
1156                            | TaskState::Flushing => (origin, TaskState::Stopped),
1157                            TaskState::Error => (TaskState::Error, TaskState::Error),
1158                            state => {
1159                                task_inner.skip_triggering_evt(triggering_evt);
1160                                gst::trace!(
1161                                    RUNTIME_CAT,
1162                                    obj = self.task_impl.obj(),
1163                                    "Skipped Stop in state {state:?}"
1164                                );
1165                                continue;
1166                            }
1167                        }
1168                    };
1169
1170                    let res = exec_action!(self, stop, triggering_evt, origin, &task_inner);
1171                    if let Ok(triggering_evt) = res {
1172                        task_inner
1173                            .lock()
1174                            .unwrap()
1175                            .switch_to_state(target, triggering_evt);
1176                        gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Task {target:?}");
1177                    }
1178                }
1179                Trigger::FlushStart => {
1180                    let (origin, target) = {
1181                        let mut task_inner = task_inner.lock().unwrap();
1182                        let origin = task_inner.state;
1183                        match origin {
1184                            TaskState::Started => (origin, TaskState::Flushing),
1185                            TaskState::Paused => (origin, TaskState::PausedFlushing),
1186                            TaskState::Error => (TaskState::Error, TaskState::Error),
1187                            state => {
1188                                task_inner.skip_triggering_evt(triggering_evt);
1189                                gst::trace!(
1190                                    RUNTIME_CAT,
1191                                    obj = self.task_impl.obj(),
1192                                    "Skipped FlushStart in state {state:?}"
1193                                );
1194                                continue;
1195                            }
1196                        }
1197                    };
1198
1199                    let res = exec_action!(self, flush_start, triggering_evt, origin, &task_inner);
1200                    if let Ok(triggering_evt) = res {
1201                        task_inner
1202                            .lock()
1203                            .unwrap()
1204                            .switch_to_state(target, triggering_evt);
1205                        gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Task {target:?}");
1206                    }
1207                }
1208                Trigger::FlushStop => {
1209                    let origin = task_inner.lock().unwrap().state;
1210                    let is_paused = match origin {
1211                        TaskState::Flushing => false,
1212                        TaskState::PausedFlushing => true,
1213                        TaskState::Error => {
1214                            triggering_evt.send_err_ack();
1215                            continue;
1216                        }
1217                        state => {
1218                            task_inner
1219                                .lock()
1220                                .unwrap()
1221                                .skip_triggering_evt(triggering_evt);
1222                            gst::trace!(
1223                                RUNTIME_CAT,
1224                                obj = self.task_impl.obj(),
1225                                "Skipped FlushStop in state {state:?}"
1226                            );
1227                            continue;
1228                        }
1229                    };
1230
1231                    let res = exec_action!(self, flush_stop, triggering_evt, origin, &task_inner);
1232                    if let Ok(triggering_evt) = res {
1233                        if is_paused {
1234                            task_inner
1235                                .lock()
1236                                .unwrap()
1237                                .switch_to_state(TaskState::Paused, triggering_evt);
1238                            gst::trace!(
1239                                RUNTIME_CAT,
1240                                obj = self.task_impl.obj(),
1241                                "Switched from PausedFlushing to Paused"
1242                            );
1243                        } else {
1244                            self.start(triggering_evt, origin, &task_inner).await;
1245                            // next/pending triggering event handled in next iteration
1246                        }
1247                    }
1248                }
1249                Trigger::Unprepare => {
1250                    // Unprepare is not joined by an ack_rx but by joining the state machine handle
1251                    self.task_impl.unprepare().await;
1252
1253                    task_inner
1254                        .lock()
1255                        .unwrap()
1256                        .switch_to_state(TaskState::Unprepared, triggering_evt);
1257
1258                    break;
1259                }
1260                _ => unreachable!("State machine handler {:?}", triggering_evt),
1261            }
1262        }
1263
1264        gst::trace!(
1265            RUNTIME_CAT,
1266            obj = self.task_impl.obj(),
1267            "Task state machine terminated"
1268        );
1269    }
1270
1271    async fn start(
1272        &mut self,
1273        mut triggering_evt: TriggeringEvent,
1274        origin: TaskState,
1275        task_inner: &Arc<Mutex<TaskInner>>,
1276    ) {
1277        match exec_action!(self, start, triggering_evt, origin, &task_inner) {
1278            Ok(triggering_evt) => {
1279                let mut task_inner = task_inner.lock().unwrap();
1280                task_inner.switch_to_state(TaskState::Started, triggering_evt);
1281            }
1282            Err(_) => {
1283                // error handled by exec_action
1284                return;
1285            }
1286        }
1287
1288        match self.run_loop().await {
1289            Ok(()) => (),
1290            Err(err) => {
1291                let next_trigger = self.task_impl.handle_loop_error(err).await;
1292                let (triggering_evt, _) = TriggeringEvent::new(next_trigger);
1293                self.pending_triggering_evt = Some(triggering_evt);
1294            }
1295        }
1296    }
1297
1298    async fn run_loop(&mut self) -> Result<(), gst::FlowError> {
1299        gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Task loop started");
1300
1301        let mut try_next_res;
1302        loop {
1303            try_next_res = {
1304                // select_biased requires the selected futures to implement
1305                // `FusedFuture`. Because async trait functions are not stable,
1306                // we use `BoxFuture` for the `TaskImpl` function, including
1307                // `try_next`. Since we need to get a new `BoxFuture` at
1308                // each iteration, we can guarantee that the future is
1309                // always valid for use in `select_biased`.
1310                let mut try_next_fut = pin!(self.task_impl.try_next().fuse());
1311                futures::select_biased! {
1312                    triggering_evt = self.triggering_evt_rx.next() => {
1313                        let triggering_evt = triggering_evt.expect("broken state machine channel");
1314                        gst::trace!(
1315                            RUNTIME_CAT,
1316                            "Task loop handing {:?} to state machine",
1317                            triggering_evt,
1318                        );
1319                        self.pending_triggering_evt = Some(triggering_evt);
1320                        return Ok(());
1321                    }
1322                    try_next_res = try_next_fut => try_next_res,
1323                }
1324            };
1325
1326            let item = try_next_res.inspect_err(|err| {
1327                gst::debug!(
1328                    RUNTIME_CAT,
1329                    obj = self.task_impl.obj(),
1330                    "TaskImpl::try_next returned {err:?}"
1331                );
1332            })?;
1333
1334            self.task_impl.handle_item(item).await.inspect_err(|&err| {
1335                gst::debug!(
1336                    RUNTIME_CAT,
1337                    obj = self.task_impl.obj(),
1338                    "TaskImpl::handle_item returned {err:?}"
1339                );
1340            })?;
1341        }
1342    }
1343}
1344
1345#[cfg(test)]
1346mod tests {
1347    use futures::channel::{mpsc, oneshot};
1348    use futures::executor::block_on;
1349    use futures::prelude::*;
1350    use gst::glib;
1351    use gst::glib::prelude::*;
1352    use std::future::pending;
1353    use std::time::Duration;
1354
1355    use super::{
1356        Task, TaskImpl,
1357        TaskState::{self, *},
1358        TransitionError, TransitionOk,
1359        TransitionOk::*,
1360        TransitionStatus,
1361        TransitionStatus::*,
1362        Trigger::{self, *},
1363    };
1364    use crate::runtime::{Context, RUNTIME_CAT};
1365
1366    impl TransitionStatus {
1367        // Only useful for unit tests, use `block_on_or_add_sub_task_and_then`
1368        // or `block_on_or_add_sub_task` in user code.
1369        fn block_on(self) -> Result<TransitionOk, TransitionError> {
1370            assert!(!Context::is_context_thread());
1371            match self {
1372                Pending {
1373                    trigger, res_fut, ..
1374                } => {
1375                    gst::debug!(
1376                        RUNTIME_CAT,
1377                        "Awaiting for {:?} ack on current thread",
1378                        trigger,
1379                    );
1380                    futures::executor::block_on(res_fut)
1381                }
1382                Ready(res) => res,
1383            }
1384        }
1385    }
1386
1387    #[track_caller]
1388    fn stop_then_unprepare(task: Task) {
1389        task.stop().block_on().unwrap();
1390        task.unprepare().block_on().unwrap();
1391    }
1392
1393    #[test]
1394    fn nominal() {
1395        gst::init().unwrap();
1396
1397        struct TaskTest {
1398            obj: gst::Object,
1399            prepared_sender: mpsc::Sender<()>,
1400            started_sender: mpsc::Sender<()>,
1401            try_next_ready_sender: mpsc::Sender<()>,
1402            try_next_receiver: mpsc::Receiver<()>,
1403            handle_item_ready_sender: mpsc::Sender<()>,
1404            handle_item_sender: mpsc::Sender<()>,
1405            paused_sender: mpsc::Sender<()>,
1406            stopped_sender: mpsc::Sender<()>,
1407            unprepared_sender: mpsc::Sender<()>,
1408        }
1409
1410        impl TaskImpl for TaskTest {
1411            type Item = ();
1412
1413            fn obj(&self) -> &impl IsA<glib::Object> {
1414                &self.obj
1415            }
1416
1417            async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> {
1418                gst::debug!(RUNTIME_CAT, "nominal: prepared");
1419                self.prepared_sender.send(()).await.unwrap();
1420                Ok(())
1421            }
1422
1423            async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
1424                gst::debug!(RUNTIME_CAT, "nominal: started");
1425                self.started_sender.send(()).await.unwrap();
1426                Ok(())
1427            }
1428
1429            async fn try_next(&mut self) -> Result<(), gst::FlowError> {
1430                gst::debug!(RUNTIME_CAT, "nominal: entering try_next");
1431                self.try_next_ready_sender.send(()).await.unwrap();
1432                gst::debug!(RUNTIME_CAT, "nominal: awaiting try_next");
1433                self.try_next_receiver.next().await.unwrap();
1434                Ok(())
1435            }
1436
1437            async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
1438                gst::debug!(RUNTIME_CAT, "nominal: entering handle_item");
1439                self.handle_item_ready_sender.send(()).await.unwrap();
1440
1441                gst::debug!(RUNTIME_CAT, "nominal: locked in handle_item");
1442                self.handle_item_sender.send(()).await.unwrap();
1443                gst::debug!(RUNTIME_CAT, "nominal: leaving handle_item");
1444
1445                Ok(())
1446            }
1447
1448            async fn pause(&mut self) -> Result<(), gst::ErrorMessage> {
1449                gst::debug!(RUNTIME_CAT, "nominal: paused");
1450                self.paused_sender.send(()).await.unwrap();
1451                Ok(())
1452            }
1453
1454            async fn stop(&mut self) -> Result<(), gst::ErrorMessage> {
1455                gst::debug!(RUNTIME_CAT, "nominal: stopped");
1456                self.stopped_sender.send(()).await.unwrap();
1457                Ok(())
1458            }
1459
1460            async fn unprepare(&mut self) {
1461                gst::debug!(RUNTIME_CAT, "nominal: unprepared");
1462                self.unprepared_sender.send(()).await.unwrap();
1463            }
1464        }
1465
1466        let context = Context::acquire("nominal", Duration::from_millis(2)).unwrap();
1467
1468        let task = Task::default();
1469
1470        assert_eq!(task.state(), Unprepared);
1471
1472        gst::debug!(RUNTIME_CAT, "nominal: preparing");
1473
1474        let (prepared_sender, mut prepared_receiver) = mpsc::channel(1);
1475        let (started_sender, mut started_receiver) = mpsc::channel(1);
1476        let (try_next_ready_sender, mut try_next_ready_receiver) = mpsc::channel(1);
1477        let (mut try_next_sender, try_next_receiver) = mpsc::channel(1);
1478        let (handle_item_ready_sender, mut handle_item_ready_receiver) = mpsc::channel(1);
1479        let (handle_item_sender, mut handle_item_receiver) = mpsc::channel(0);
1480        let (paused_sender, mut paused_receiver) = mpsc::channel(1);
1481        let (stopped_sender, mut stopped_receiver) = mpsc::channel(1);
1482        let (unprepared_sender, mut unprepared_receiver) = mpsc::channel(1);
1483        let obj = gst::Pad::builder(gst::PadDirection::Unknown)
1484            .name("runtime::Task::nominal")
1485            .build();
1486        let prepare_status = task.prepare(
1487            TaskTest {
1488                obj: obj.clone().into(),
1489                prepared_sender,
1490                started_sender,
1491                try_next_ready_sender,
1492                try_next_receiver,
1493                handle_item_ready_sender,
1494                handle_item_sender,
1495                paused_sender,
1496                stopped_sender,
1497                unprepared_sender,
1498            },
1499            context,
1500        );
1501
1502        assert!(prepare_status.is_pending());
1503        match prepare_status {
1504            Pending {
1505                trigger: Prepare,
1506                origin: Unprepared,
1507                ..
1508            } => (),
1509            other => panic!("{other:?}"),
1510        };
1511
1512        gst::debug!(RUNTIME_CAT, "nominal: starting (async prepare)");
1513        let start_status = task.start().check().unwrap();
1514
1515        block_on(prepared_receiver.next()).unwrap();
1516        // also tests await_maybe_on_context
1517        assert_eq!(
1518            prepare_status.block_on_or_add_subtask(&obj).unwrap(),
1519            Complete {
1520                origin: Unprepared,
1521                target: Prepared,
1522            },
1523        );
1524
1525        block_on(started_receiver.next()).unwrap();
1526        assert_eq!(
1527            start_status.block_on().unwrap(),
1528            Complete {
1529                origin: Prepared,
1530                target: Started,
1531            }
1532        );
1533        assert_eq!(task.state(), Started);
1534
1535        // unlock task loop and keep looping
1536        block_on(try_next_ready_receiver.next()).unwrap();
1537        block_on(try_next_sender.send(())).unwrap();
1538        block_on(handle_item_ready_receiver.next()).unwrap();
1539        block_on(handle_item_receiver.next()).unwrap();
1540
1541        gst::debug!(RUNTIME_CAT, "nominal: starting (redundant)");
1542        // already started
1543        assert_eq!(
1544            task.start().block_on().unwrap(),
1545            Skipped {
1546                trigger: Start,
1547                state: Started,
1548            },
1549        );
1550        assert_eq!(task.state(), Started);
1551
1552        // Attempt to prepare Task in state Started (also tests check)
1553        match task.unprepare().check().unwrap_err() {
1554            TransitionError {
1555                trigger: Unprepare,
1556                state: Started,
1557                ..
1558            } => (),
1559            other => panic!("{other:?}"),
1560        }
1561
1562        gst::debug!(RUNTIME_CAT, "nominal: pause cancelling try_next");
1563        block_on(try_next_ready_receiver.next()).unwrap();
1564
1565        let pause_status = task.pause().check().unwrap();
1566        gst::debug!(RUNTIME_CAT, "nominal: awaiting pause ack");
1567        block_on(paused_receiver.next()).unwrap();
1568        assert_eq!(
1569            pause_status.block_on().unwrap(),
1570            Complete {
1571                origin: Started,
1572                target: Paused,
1573            },
1574        );
1575
1576        // handle_item not reached
1577        assert!(handle_item_ready_receiver.try_next().is_err());
1578        // try_next not reached again
1579        assert!(try_next_ready_receiver.try_next().is_err());
1580
1581        gst::debug!(
1582            RUNTIME_CAT,
1583            "nominal: starting (after pause cancelling try_next)"
1584        );
1585        let start_receiver = task.start().check().unwrap();
1586        block_on(started_receiver.next()).unwrap();
1587        assert_eq!(
1588            start_receiver.block_on().unwrap(),
1589            Complete {
1590                origin: Paused,
1591                target: Started,
1592            },
1593        );
1594        assert_eq!(task.state(), Started);
1595
1596        gst::debug!(RUNTIME_CAT, "nominal: pause // handle_item");
1597        block_on(try_next_ready_receiver.next()).unwrap();
1598        block_on(try_next_sender.send(())).unwrap();
1599        // Make sure item is picked
1600        block_on(handle_item_ready_receiver.next()).unwrap();
1601
1602        gst::debug!(RUNTIME_CAT, "nominal: requesting to pause");
1603        let pause_status = task.pause().check().unwrap();
1604
1605        gst::debug!(RUNTIME_CAT, "nominal: unlocking item handling");
1606        block_on(handle_item_receiver.next()).unwrap();
1607
1608        gst::debug!(RUNTIME_CAT, "nominal: awaiting pause ack");
1609        block_on(paused_receiver.next()).unwrap();
1610        assert_eq!(
1611            pause_status.block_on().unwrap(),
1612            Complete {
1613                origin: Started,
1614                target: Paused,
1615            },
1616        );
1617
1618        // try_next not reached again
1619        assert!(try_next_ready_receiver.try_next().is_err());
1620
1621        gst::debug!(
1622            RUNTIME_CAT,
1623            "nominal: starting (after pause // handle_item)"
1624        );
1625        let start_receiver = task.start().check().unwrap();
1626        block_on(started_receiver.next()).unwrap();
1627        assert_eq!(
1628            start_receiver.block_on().unwrap(),
1629            Complete {
1630                origin: Paused,
1631                target: Started,
1632            },
1633        );
1634        assert_eq!(task.state(), Started);
1635
1636        gst::debug!(RUNTIME_CAT, "nominal: stopping");
1637        assert_eq!(
1638            task.stop().block_on().unwrap(),
1639            Complete {
1640                origin: Started,
1641                target: Stopped,
1642            },
1643        );
1644
1645        assert_eq!(task.state(), Stopped);
1646        let _ = block_on(stopped_receiver.next());
1647
1648        // purge remaining iteration received before stop if any
1649        let _ = try_next_ready_receiver.try_next();
1650
1651        gst::debug!(RUNTIME_CAT, "nominal: starting (after stop)");
1652        assert_eq!(
1653            task.start().block_on().unwrap(),
1654            Complete {
1655                origin: Stopped,
1656                target: Started,
1657            },
1658        );
1659        let _ = block_on(started_receiver.next());
1660
1661        gst::debug!(RUNTIME_CAT, "nominal: stopping");
1662        assert_eq!(
1663            task.stop().block_on().unwrap(),
1664            Complete {
1665                origin: Started,
1666                target: Stopped,
1667            },
1668        );
1669
1670        assert_eq!(
1671            task.unprepare().block_on().unwrap(),
1672            Complete {
1673                origin: Stopped,
1674                target: Unprepared,
1675            },
1676        );
1677
1678        assert_eq!(task.state(), Unprepared);
1679        let _ = block_on(unprepared_receiver.next());
1680    }
1681
/// A failing `prepare` action must be reported to `handle_action_error`, fail the
/// pending `Prepare` transition and bring the task to `TaskState::Error`, where
/// `start` is refused but `unprepare` still succeeds.
#[test]
fn prepare_error() {
    gst::init().unwrap();

    struct FailingPrepare {
        obj: gst::Object,
        prepare_error_sender: mpsc::Sender<()>,
    }

    impl TaskImpl for FailingPrepare {
        type Item = ();

        fn obj(&self) -> &impl IsA<glib::Object> {
            &self.obj
        }

        async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> {
            gst::debug!(RUNTIME_CAT, "prepare_error: prepare returning an error");
            Err(gst::error_msg!(
                gst::ResourceError::Failed,
                ["prepare_error: intentional error"]
            ))
        }

        async fn handle_action_error(
            &mut self,
            trigger: Trigger,
            state: TaskState,
            err: gst::ErrorMessage,
        ) -> Trigger {
            gst::debug!(
                RUNTIME_CAT,
                "prepare_error: handling prepare error {:?}",
                err
            );
            // The only failure expected here is `Prepare` reported with `Unprepared`.
            if let (Prepare, Unprepared) = (trigger, state) {
                // Let the test body know the error handler was reached.
                self.prepare_error_sender.send(()).await.unwrap();
            } else {
                unreachable!("{:?}", (trigger, state));
            }
            Trigger::Error
        }

        async fn try_next(&mut self) -> Result<(), gst::FlowError> {
            unreachable!("prepare_error: try_next");
        }

        async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
            unreachable!("prepare_error: handle_item");
        }
    }

    let ctx = Context::acquire("prepare_error", Duration::from_millis(2)).unwrap();

    let task = Task::default();
    assert_eq!(task.state(), Unprepared);

    let (prepare_error_sender, mut prepare_error_receiver) = mpsc::channel(1);
    let task_impl = FailingPrepare {
        obj: gst::Pad::builder(gst::PadDirection::Unknown)
            .name("runtime::Task::prepare_error")
            .build()
            .into(),
        prepare_error_sender,
    };
    let prepare_status = task.prepare(task_impl, ctx);

    gst::debug!(
        RUNTIME_CAT,
        "prepare_error: await action error notification"
    );
    block_on(prepare_error_receiver.next()).unwrap();

    // The transition must fail, carrying the `Error` trigger and the `Preparing` state.
    match prepare_status.block_on().unwrap_err() {
        TransitionError {
            trigger: Trigger::Error,
            state: Preparing,
            ..
        } => (),
        other => panic!("{other:?}"),
    }

    // Poll until the state machine has settled in `Error`.
    while task.state() != TaskState::Error {
        std::thread::sleep(Duration::from_millis(2));
    }

    // Starting from `Error` must be refused.
    match task.start().block_on().unwrap_err() {
        TransitionError {
            trigger: Start,
            state: TaskState::Error,
            ..
        } => (),
        other => panic!("{other:?}"),
    }

    block_on(task.unprepare()).unwrap();
}
1784
/// Requests `start` while `prepare` is still pending.
///
/// The preparation action is held so that it completes after the start request
/// is engaged. Once released, `start` must complete as `Prepared` -> `Started`;
/// the task is then stopped and unprepared from the requester `Context`.
#[test]
fn prepare_start_ok() {
    gst::init().unwrap();

    struct TaskPrepareTest {
        obj: gst::Object,
        prepare_receiver: mpsc::Receiver<()>,
    }

    impl TaskImpl for TaskPrepareTest {
        type Item = ();

        fn obj(&self) -> &impl IsA<glib::Object> {
            &self.obj
        }

        async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> {
            gst::debug!(
                RUNTIME_CAT,
                "prepare_start_ok: preparation awaiting trigger"
            );
            // Hold the action until the test body sends the trigger.
            self.prepare_receiver.next().await.unwrap();
            gst::debug!(RUNTIME_CAT, "prepare_start_ok: preparation complete Ok");
            Ok(())
        }

        async fn handle_action_error(
            &mut self,
            _trigger: Trigger,
            _state: TaskState,
            _err: gst::ErrorMessage,
        ) -> Trigger {
            // No action is expected to fail in this scenario. The message names
            // this very method so that a failure points at the right place.
            unreachable!("prepare_start_ok: handle_action_error");
        }

        async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
            gst::debug!(RUNTIME_CAT, "prepare_start_ok: started");
            Ok(())
        }

        async fn try_next(&mut self) -> Result<(), gst::FlowError> {
            // Never produce an item: `handle_item` must not be reached.
            pending().await
        }

        async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
            unreachable!("prepare_start_ok: handle_item");
        }
    }

    let context = Context::acquire("prepare_start_ok", Duration::from_millis(2)).unwrap();

    let task = Task::default();

    let (mut prepare_sender, prepare_receiver) = mpsc::channel(1);
    // The prepare status itself is not checked in this test.
    let fut = task.prepare(
        TaskPrepareTest {
            obj: gst::Pad::builder(gst::PadDirection::Unknown)
                .name("runtime::Task::prepare_start_ok")
                .build()
                .into(),
            prepare_receiver,
        },
        context,
    );
    drop(fut);

    // Issue the remaining requests from another `Context` so they can be awaited.
    let start_ctx = Context::acquire("prepare_start_ok_requester", Duration::ZERO).unwrap();
    let (ready_sender, ready_receiver) = oneshot::channel();
    let start_handle = start_ctx.spawn(async move {
        assert_eq!(task.state(), Preparing);
        gst::debug!(RUNTIME_CAT, "prepare_start_ok: starting");
        let start_status = task.start();
        match start_status {
            Pending {
                trigger: Start,
                origin: Preparing,
                ..
            } => (),
            other => panic!("{other:?}"),
        }
        // The start request is engaged: the preparation can now be released.
        ready_sender.send(()).unwrap();
        assert_eq!(
            start_status.await.unwrap(),
            Complete {
                origin: Prepared,
                target: Started,
            },
        );
        assert_eq!(task.state(), Started);

        let stop_status = task.stop();
        match stop_status {
            Pending {
                trigger: Stop,
                origin: Started,
                ..
            } => (),
            other => panic!("{other:?}"),
        }
        assert_eq!(
            stop_status.await.unwrap(),
            Complete {
                origin: Started,
                target: Stopped,
            },
        );
        assert_eq!(task.state(), Stopped);

        let unprepare_status = task.unprepare();
        match unprepare_status {
            Pending {
                trigger: Unprepare,
                origin: Stopped,
                ..
            } => (),
            other => panic!("{other:?}"),
        };
        assert_eq!(
            unprepare_status.await.unwrap(),
            Complete {
                origin: Stopped,
                target: Unprepared,
            },
        );
        assert_eq!(task.state(), Unprepared);
    });

    gst::debug!(RUNTIME_CAT, "prepare_start_ok: awaiting for start_ctx");
    block_on(ready_receiver).unwrap();

    gst::debug!(RUNTIME_CAT, "prepare_start_ok: triggering preparation");
    block_on(prepare_sender.send(())).unwrap();

    block_on(start_handle).unwrap();
}
1922
/// Requests `start` while a `prepare` that is going to fail is still pending.
///
/// The preparation action is held so that it completes after the start request
/// is engaged, then released so that it returns an error. The pending `Prepare`
/// status must resolve to an error and `unprepare` must succeed from
/// `TaskState::Error`.
#[test]
fn prepare_start_error() {
    gst::init().unwrap();

    struct HeldFailingPrepare {
        obj: gst::Object,
        prepare_receiver: mpsc::Receiver<()>,
        prepare_error_sender: mpsc::Sender<()>,
    }

    impl TaskImpl for HeldFailingPrepare {
        type Item = ();

        fn obj(&self) -> &impl IsA<glib::Object> {
            &self.obj
        }

        async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> {
            gst::debug!(
                RUNTIME_CAT,
                "prepare_start_error: preparation awaiting trigger"
            );
            // Hold the action until the test body sends the trigger.
            self.prepare_receiver.next().await.unwrap();
            gst::debug!(RUNTIME_CAT, "prepare_start_error: preparation complete Err");

            Err(gst::error_msg!(
                gst::ResourceError::Failed,
                ["prepare_start_error: intentional error"]
            ))
        }

        async fn handle_action_error(
            &mut self,
            trigger: Trigger,
            state: TaskState,
            err: gst::ErrorMessage,
        ) -> Trigger {
            gst::debug!(
                RUNTIME_CAT,
                "prepare_start_error: handling prepare error {:?}",
                err
            );
            match (trigger, state) {
                // Let the test body know the error handler was reached.
                (Prepare, Unprepared) => self.prepare_error_sender.send(()).await.unwrap(),
                other => panic!("action error for {other:?}"),
            }
            Trigger::Error
        }

        async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
            unreachable!("prepare_start_error: start");
        }

        async fn try_next(&mut self) -> Result<(), gst::FlowError> {
            unreachable!("prepare_start_error: try_next");
        }

        async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
            unreachable!("prepare_start_error: handle_item");
        }
    }

    let ctx = Context::acquire("prepare_start_error", Duration::from_millis(2)).unwrap();

    let task = Task::default();

    let (mut prepare_sender, prepare_receiver) = mpsc::channel(1);
    let (prepare_error_sender, mut prepare_error_receiver) = mpsc::channel(1);
    let task_impl = HeldFailingPrepare {
        obj: gst::Pad::builder(gst::PadDirection::Unknown)
            .name("runtime::Task::prepare_start_error")
            .build()
            .into(),
        prepare_receiver,
        prepare_error_sender,
    };
    let prepare_status = task.prepare(task_impl, ctx);
    match prepare_status {
        Pending {
            trigger: Prepare,
            origin: Unprepared,
            ..
        } => (),
        other => panic!("{other:?}"),
    };

    // Issue the remaining requests from another `Context` so they can be awaited.
    let requester_ctx =
        Context::acquire("prepare_start_error_requester", Duration::ZERO).unwrap();
    let (ready_sender, ready_receiver) = oneshot::channel();
    let requester_handle = requester_ctx.spawn(async move {
        gst::debug!(RUNTIME_CAT, "prepare_start_error: starting (Err)");
        // The start status itself is not observed, only the prepare one is.
        drop(task.start());
        ready_sender.send(()).unwrap();
        // FIXME we lose the origin Trigger (Start)
        // and only get the Trigger returned by handle_action_error
        // see also: comment in exec_action!
        match prepare_status.await {
            Err(TransitionError {
                trigger: Trigger::Error,
                state: Preparing,
                ..
            }) => (),
            other => panic!("{other:?}"),
        }

        let unprepare_status = task.unprepare();
        match unprepare_status {
            Pending {
                trigger: Unprepare,
                origin: TaskState::Error,
                ..
            } => (),
            other => panic!("{other:?}"),
        };
        let unprepared = unprepare_status.await.unwrap();
        assert_eq!(
            unprepared,
            Complete {
                origin: TaskState::Error,
                target: Unprepared,
            },
        );
    });

    gst::debug!(RUNTIME_CAT, "prepare_start_error: awaiting for start_ctx");
    block_on(ready_receiver).unwrap();

    gst::debug!(
        RUNTIME_CAT,
        "prepare_start_error: triggering preparation (failure)"
    );
    block_on(prepare_sender.send(())).unwrap();

    gst::debug!(
        RUNTIME_CAT,
        "prepare_start_error: await prepare error notification"
    );
    block_on(prepare_error_receiver.next()).unwrap();

    block_on(requester_handle).unwrap();
}
2069
/// Feeds the loop with `gst::FlowError` items which `handle_item` returns as
/// errors: after `Eos` the task is expected in `Stopped` and can be restarted,
/// after `Error` it is expected in `TaskState::Error`, where `start` is refused
/// and `unprepare` succeeds.
#[test]
fn item_error() {
    gst::init().unwrap();

    struct ErrorEcho {
        obj: gst::Object,
        try_next_receiver: mpsc::Receiver<gst::FlowError>,
    }

    impl TaskImpl for ErrorEcho {
        type Item = gst::FlowError;

        fn obj(&self) -> &impl IsA<glib::Object> {
            &self.obj
        }

        async fn try_next(&mut self) -> Result<gst::FlowError, gst::FlowError> {
            gst::debug!(RUNTIME_CAT, "item_error: awaiting try_next");
            // The error to report is provided by the test body as a regular item.
            let item = self.try_next_receiver.next().await.unwrap();
            Ok(item)
        }

        async fn handle_item(&mut self, item: gst::FlowError) -> Result<(), gst::FlowError> {
            gst::debug!(RUNTIME_CAT, "item_error: handle_item received {:?}", item);
            Err(item)
        }
    }

    let ctx = Context::acquire("item_error", Duration::from_millis(2)).unwrap();
    let task = Task::default();
    gst::debug!(RUNTIME_CAT, "item_error: prepare and start");
    let (mut try_next_sender, try_next_receiver) = mpsc::channel(1);
    let task_impl = ErrorEcho {
        obj: gst::Pad::builder(gst::PadDirection::Unknown)
            .name("runtime::Task::item_error")
            .build()
            .into(),
        try_next_receiver,
    };
    task.prepare(task_impl, ctx).block_on().unwrap();
    task.start().block_on().unwrap();

    gst::debug!(RUNTIME_CAT, "item_error: req. handle_item to return Eos");
    block_on(try_next_sender.send(gst::FlowError::Eos)).unwrap();
    // Poll until the state machine has settled in `Stopped`.
    while task.state() != Stopped {
        std::thread::sleep(Duration::from_millis(2));
    }

    gst::debug!(RUNTIME_CAT, "item_error: starting (after stop)");
    let restarted = task.start().block_on().unwrap();
    assert_eq!(
        restarted,
        Complete {
            origin: Stopped,
            target: Started,
        },
    );

    gst::debug!(RUNTIME_CAT, "item_error: req. handle_item to return Error");
    block_on(try_next_sender.send(gst::FlowError::Error)).unwrap();
    // Poll until the state machine has settled in `Error`.
    while task.state() != TaskState::Error {
        std::thread::sleep(Duration::from_millis(2));
    }

    gst::debug!(RUNTIME_CAT, "item_error: attempting to start (after Error)");
    match task.start().block_on().unwrap_err() {
        TransitionError {
            trigger: Start,
            state: TaskState::Error,
            ..
        } => (),
        other => panic!("{other:?}"),
    }

    let unprepared = task.unprepare().block_on().unwrap();
    assert_eq!(
        unprepared,
        Complete {
            origin: TaskState::Error,
            target: Unprepared,
        },
    );
}
2156
/// Runs a flush start / flush stop sequence with blocking calls from the test
/// thread and checks each transition result, the resulting state and that the
/// matching `TaskImpl` action was executed.
#[test]
fn flush_regular_sync() {
    gst::init().unwrap();

    struct FlushNotifier {
        obj: gst::Object,
        flush_start_sender: mpsc::Sender<()>,
        flush_stop_sender: mpsc::Sender<()>,
    }

    impl TaskImpl for FlushNotifier {
        type Item = ();

        fn obj(&self) -> &impl IsA<glib::Object> {
            &self.obj
        }

        async fn try_next(&mut self) -> Result<(), gst::FlowError> {
            // Never produce an item: `handle_item` must not be reached.
            pending().await
        }

        async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
            unreachable!("flush_regular_sync: handle_item");
        }

        async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
            gst::debug!(RUNTIME_CAT, "flush_regular_sync: started flushing");
            self.flush_start_sender.send(()).await.unwrap();
            Ok(())
        }

        async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
            gst::debug!(RUNTIME_CAT, "flush_regular_sync: stopped flushing");
            self.flush_stop_sender.send(()).await.unwrap();
            Ok(())
        }
    }

    let ctx = Context::acquire("flush_regular_sync", Duration::from_millis(2)).unwrap();

    let task = Task::default();

    let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
    let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
    let pad = gst::Pad::builder(gst::PadDirection::Unknown)
        .name("runtime::Task::flush_regular_sync")
        .build();
    let task_impl = FlushNotifier {
        obj: pad.clone().into(),
        flush_start_sender,
        flush_stop_sender,
    };
    // The prepare status itself is not checked in this test.
    drop(task.prepare(task_impl, ctx));

    gst::debug!(RUNTIME_CAT, "flush_regular_sync: start");
    block_on(task.start()).unwrap();

    gst::debug!(RUNTIME_CAT, "flush_regular_sync: starting flush");
    let flush_started = task.flush_start().block_on().unwrap();
    assert_eq!(
        flush_started,
        Complete {
            origin: Started,
            target: Flushing,
        },
    );
    assert_eq!(task.state(), Flushing);

    // The `flush_start` action must have run.
    block_on(flush_start_receiver.next()).unwrap();

    gst::debug!(RUNTIME_CAT, "flush_regular_sync: stopping flush");
    let flush_stopped = task.flush_stop().block_on_or_add_subtask(&pad).unwrap();
    assert_eq!(
        flush_stopped,
        Complete {
            origin: Flushing,
            target: Started,
        },
    );
    assert_eq!(task.state(), Started);

    // The `flush_stop` action must have run.
    block_on(flush_stop_receiver.next()).unwrap();

    // Request a pause without looking at its outcome, then tear down.
    drop(task.pause());
    stop_then_unprepare(task);
}
2245
/// Purpose: make sure a flush sequence triggered from a Context doesn't block.
///
/// The sequence is issued from a `Context` which is not the one the task was
/// prepared with.
#[test]
fn flush_regular_different_context() {
    gst::init().unwrap();

    struct FlushNotifier {
        obj: gst::Object,
        flush_start_sender: mpsc::Sender<()>,
        flush_stop_sender: mpsc::Sender<()>,
    }

    impl TaskImpl for FlushNotifier {
        type Item = ();

        fn obj(&self) -> &impl IsA<glib::Object> {
            &self.obj
        }

        async fn try_next(&mut self) -> Result<(), gst::FlowError> {
            // Never produce an item: `handle_item` must not be reached.
            pending().await
        }

        async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
            unreachable!("flush_regular_different_context: handle_item");
        }

        async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
            gst::debug!(
                RUNTIME_CAT,
                "flush_regular_different_context: started flushing"
            );
            self.flush_start_sender.send(()).await.unwrap();
            Ok(())
        }

        async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
            gst::debug!(
                RUNTIME_CAT,
                "flush_regular_different_context: stopped flushing"
            );
            self.flush_stop_sender.send(()).await.unwrap();
            Ok(())
        }
    }

    let task_ctx =
        Context::acquire("flush_regular_different_context", Duration::from_millis(2)).unwrap();

    let task = Task::default();

    let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
    let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
    let pad = gst::Pad::builder(gst::PadDirection::Unknown)
        .name("runtime::Task::flush_regular_different_context")
        .build();
    let task_impl = FlushNotifier {
        obj: pad.clone().into(),
        flush_start_sender,
        flush_stop_sender,
    };
    // The prepare status itself is not checked in this test.
    drop(task.prepare(task_impl, task_ctx));

    gst::debug!(RUNTIME_CAT, "flush_regular_different_context: start");
    task.start().block_on().unwrap();

    // Out-of-band `Context` from which the flush sequence is requested.
    let oob_ctx = Context::acquire(
        "flush_regular_different_context_oob",
        Duration::from_millis(2),
    )
    .unwrap();

    let flushed_task = task.clone();
    let flush_handle = oob_ctx.spawn(async move {
        let flush_start_status = flushed_task.flush_start();
        match flush_start_status {
            Pending {
                trigger: FlushStart,
                origin: Started,
                ..
            } => (),
            other => panic!("{other:?}"),
        };
        let flush_started = flush_start_status.await.unwrap();
        assert_eq!(
            flush_started,
            Complete {
                origin: Started,
                target: Flushing,
            },
        );
        assert_eq!(flushed_task.state(), Flushing);
        flush_start_receiver.next().await.unwrap();

        let flush_stop_status = flushed_task.flush_stop();
        match flush_stop_status {
            Pending {
                trigger: FlushStop,
                origin: Flushing,
                ..
            } => (),
            other => panic!("{other:?}"),
        };
        // From within this task the call is expected to report `NotWaiting`
        // instead of blocking in place.
        let flush_stop_res = flush_stop_status.block_on_or_add_subtask(&pad).unwrap();
        assert_eq!(
            flush_stop_res,
            NotWaiting {
                trigger: FlushStop,
                origin: Flushing,
            },
        );

        // Once the sub tasks are drained, the task must be back to `Started`.
        Context::drain_sub_tasks().await.unwrap();
        assert_eq!(flushed_task.state(), Started);
    });

    block_on(flush_handle).unwrap();
    block_on(flush_stop_receiver.next()).unwrap();

    stop_then_unprepare(task);
}
2367
/// Purpose: make sure a flush sequence triggered from the same Context doesn't block.
///
/// The sequence is issued from a future spawned on the `Context` the task was
/// prepared with, and both transitions are awaited there.
#[test]
fn flush_regular_same_context() {
    gst::init().unwrap();

    struct FlushNotifier {
        obj: gst::Object,
        flush_start_sender: mpsc::Sender<()>,
        flush_stop_sender: mpsc::Sender<()>,
    }

    impl TaskImpl for FlushNotifier {
        type Item = ();

        fn obj(&self) -> &impl IsA<glib::Object> {
            &self.obj
        }

        async fn try_next(&mut self) -> Result<(), gst::FlowError> {
            // Never produce an item: `handle_item` must not be reached.
            pending().await
        }

        async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
            unreachable!("flush_regular_same_context: handle_item");
        }

        async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
            gst::debug!(RUNTIME_CAT, "flush_regular_same_context: started flushing");
            self.flush_start_sender.send(()).await.unwrap();
            Ok(())
        }

        async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
            gst::debug!(RUNTIME_CAT, "flush_regular_same_context: stopped flushing");
            self.flush_stop_sender.send(()).await.unwrap();
            Ok(())
        }
    }

    let ctx = Context::acquire("flush_regular_same_context", Duration::from_millis(2)).unwrap();

    let task = Task::default();

    let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
    let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
    let task_impl = FlushNotifier {
        obj: gst::Pad::builder(gst::PadDirection::Unknown)
            .name("runtime::Task::flush_regular_same_context")
            .build()
            .into(),
        flush_start_sender,
        flush_stop_sender,
    };
    // The prepare status itself is not checked in this test.
    drop(task.prepare(task_impl, ctx.clone()));

    block_on(task.start()).unwrap();

    let flushed_task = task.clone();
    // Same `Context` as the one handed to `prepare`.
    let flush_handle = ctx.spawn(async move {
        let flush_start_status = flushed_task.flush_start();
        match flush_start_status {
            Pending {
                trigger: FlushStart,
                origin: Started,
                ..
            } => (),
            other => panic!("{other:?}"),
        };
        let flush_started = flush_start_status.await.unwrap();
        assert_eq!(
            flush_started,
            Complete {
                origin: Started,
                target: Flushing,
            },
        );
        assert_eq!(flushed_task.state(), Flushing);
        flush_start_receiver.next().await.unwrap();

        let flush_stop_status = flushed_task.flush_stop();
        match flush_stop_status {
            Pending {
                trigger: FlushStop,
                origin: Flushing,
                ..
            } => (),
            other => panic!("{other:?}"),
        };
        let flush_stopped = flush_stop_status.await.unwrap();
        assert_eq!(
            flush_stopped,
            Complete {
                origin: Flushing,
                target: Started,
            },
        );
        assert_eq!(flushed_task.state(), Started);
    });

    block_on(flush_handle).unwrap();
    block_on(flush_stop_receiver.next()).unwrap();

    stop_then_unprepare(task);
}
2474
/// Purpose: make sure a flush_start triggered from an iteration doesn't block.
///
/// `handle_item` requests the flush on a clone of the task it belongs to; the
/// test then expects the `flush_start` action to run and `stop` to complete from
/// `Flushing`.
#[test]
fn flush_from_loop() {
    gst::init().unwrap();

    struct SelfFlusher {
        obj: gst::Object,
        task: Task,
        flush_start_sender: mpsc::Sender<()>,
    }

    impl TaskImpl for SelfFlusher {
        type Item = ();

        fn obj(&self) -> &impl IsA<glib::Object> {
            &self.obj
        }

        async fn try_next(&mut self) -> Result<(), gst::FlowError> {
            // An item is always available.
            Ok(())
        }

        async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
            gst::debug!(RUNTIME_CAT, "flush_from_loop: flush_start from handle_item");
            // Only check that the request was accepted, don't await its result.
            match self.task.flush_start() {
                Pending {
                    trigger: FlushStart,
                    origin: Started,
                    ..
                } => (),
                other => panic!("{other:?}"),
            }
            Ok(())
        }

        async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
            gst::debug!(RUNTIME_CAT, "flush_from_loop: started flushing");
            self.flush_start_sender.send(()).await.unwrap();
            Ok(())
        }
    }

    let ctx = Context::acquire("flush_from_loop", Duration::from_millis(2)).unwrap();

    let task = Task::default();

    let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
    let task_impl = SelfFlusher {
        obj: gst::Pad::builder(gst::PadDirection::Unknown)
            .name("runtime::Task::flush_from_loop")
            .build()
            .into(),
        task: task.clone(),
        flush_start_sender,
    };
    // Neither the prepare nor the start status is checked in this test.
    drop(task.prepare(task_impl, ctx));
    drop(task.start());

    gst::debug!(
        RUNTIME_CAT,
        "flush_from_loop: awaiting flush_start notification"
    );
    block_on(flush_start_receiver.next()).unwrap();

    let stopped = task.stop().block_on().unwrap();
    assert_eq!(
        stopped,
        Complete {
            origin: Flushing,
            target: Stopped,
        },
    );
    task.unprepare().block_on().unwrap();
}
2553
2554    #[test]
2555    fn pause_from_loop() {
2556        // Purpose: make sure a start triggered from an iteration doesn't block.
2557        // E.g. an auto pause cancellation after a delay.
2558        gst::init().unwrap();
2559
2560        struct TaskStartTest {
2561            obj: gst::Object,
2562            task: Task,
2563            pause_sender: mpsc::Sender<()>,
2564        }
2565
2566        impl TaskImpl for TaskStartTest {
2567            type Item = ();
2568
2569            fn obj(&self) -> &impl IsA<glib::Object> {
2570                &self.obj
2571            }
2572
2573            async fn try_next(&mut self) -> Result<(), gst::FlowError> {
2574                Ok(())
2575            }
2576
2577            async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
2578                gst::debug!(RUNTIME_CAT, "pause_from_loop: entering handle_item");
2579
2580                crate::runtime::timer::delay_for(Duration::from_millis(50)).await;
2581
2582                gst::debug!(RUNTIME_CAT, "pause_from_loop: pause from handle_item");
2583                match self.task.pause() {
2584                    Pending {
2585                        trigger: Pause,
2586                        origin: Started,
2587                        ..
2588                    } => (),
2589                    other => panic!("{other:?}"),
2590                }
2591
2592                Ok(())
2593            }
2594
2595            async fn pause(&mut self) -> Result<(), gst::ErrorMessage> {
2596                gst::debug!(RUNTIME_CAT, "pause_from_loop: entering pause action");
2597                self.pause_sender.send(()).await.unwrap();
2598                Ok(())
2599            }
2600        }
2601
2602        let context = Context::acquire("pause_from_loop", Duration::from_millis(2)).unwrap();
2603
2604        let task = Task::default();
2605
2606        let (pause_sender, mut pause_receiver) = mpsc::channel(1);
2607        let fut = task.prepare(
2608            TaskStartTest {
2609                obj: gst::Pad::builder(gst::PadDirection::Unknown)
2610                    .name("runtime::Task::pause_from_loop")
2611                    .build()
2612                    .into(),
2613                task: task.clone(),
2614                pause_sender,
2615            },
2616            context,
2617        );
2618        drop(fut);
2619
2620        let fut = task.start();
2621        drop(fut);
2622
2623        gst::debug!(RUNTIME_CAT, "pause_from_loop: awaiting pause notification");
2624        block_on(pause_receiver.next()).unwrap();
2625
2626        stop_then_unprepare(task);
2627    }
2628
2629    #[test]
2630    fn trigger_from_action() {
2631        // Purpose: make sure an event triggered from a transition action doesn't block.
2632        gst::init().unwrap();
2633
2634        struct TaskFlushTest {
2635            obj: gst::Object,
2636            task: Task,
2637            flush_stop_sender: mpsc::Sender<()>,
2638        }
2639
2640        impl TaskImpl for TaskFlushTest {
2641            type Item = ();
2642
2643            fn obj(&self) -> &impl IsA<glib::Object> {
2644                &self.obj
2645            }
2646
2647            async fn try_next(&mut self) -> Result<(), gst::FlowError> {
2648                pending().await
2649            }
2650
2651            async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
2652                unreachable!("trigger_from_action: handle_item");
2653            }
2654
2655            async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
2656                gst::debug!(
2657                    RUNTIME_CAT,
2658                    "trigger_from_action: flush_start triggering flush_stop"
2659                );
2660                match self.task.flush_stop() {
2661                    Pending {
2662                        trigger: FlushStop,
2663                        origin: Started,
2664                        ..
2665                    } => (),
2666                    other => panic!("{other:?}"),
2667                }
2668
2669                Ok(())
2670            }
2671
2672            async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
2673                gst::debug!(RUNTIME_CAT, "trigger_from_action: stopped flushing");
2674                self.flush_stop_sender.send(()).await.unwrap();
2675                Ok(())
2676            }
2677        }
2678
2679        let context = Context::acquire("trigger_from_action", Duration::from_millis(2)).unwrap();
2680
2681        let task = Task::default();
2682
2683        let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
2684        let fut = task.prepare(
2685            TaskFlushTest {
2686                obj: gst::Pad::builder(gst::PadDirection::Unknown)
2687                    .name("runtime::Task::trigger_from_action")
2688                    .build()
2689                    .into(),
2690                task: task.clone(),
2691                flush_stop_sender,
2692            },
2693            context,
2694        );
2695        drop(fut);
2696
2697        task.start().block_on().unwrap();
2698        let fut = task.flush_start();
2699        drop(fut);
2700
2701        gst::debug!(
2702            RUNTIME_CAT,
2703            "trigger_from_action: awaiting flush_stop notification"
2704        );
2705        block_on(flush_stop_receiver.next()).unwrap();
2706
2707        stop_then_unprepare(task);
2708    }
2709
2710    #[test]
2711    fn pause_flush_start() {
2712        gst::init().unwrap();
2713
2714        struct TaskFlushTest {
2715            obj: gst::Object,
2716            started_sender: mpsc::Sender<()>,
2717            flush_start_sender: mpsc::Sender<()>,
2718            flush_stop_sender: mpsc::Sender<()>,
2719        }
2720
2721        impl TaskImpl for TaskFlushTest {
2722            type Item = ();
2723
2724            fn obj(&self) -> &impl IsA<glib::Object> {
2725                &self.obj
2726            }
2727
2728            async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
2729                gst::debug!(RUNTIME_CAT, "pause_flush_start: started");
2730                self.started_sender.send(()).await.unwrap();
2731                Ok(())
2732            }
2733
2734            async fn try_next(&mut self) -> Result<(), gst::FlowError> {
2735                pending().await
2736            }
2737
2738            async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
2739                unreachable!("pause_flush_start: handle_item");
2740            }
2741
2742            async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
2743                gst::debug!(RUNTIME_CAT, "pause_flush_start: started flushing");
2744                self.flush_start_sender.send(()).await.unwrap();
2745                Ok(())
2746            }
2747
2748            async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
2749                gst::debug!(RUNTIME_CAT, "pause_flush_start: stopped flushing");
2750                self.flush_stop_sender.send(()).await.unwrap();
2751                Ok(())
2752            }
2753        }
2754
2755        let context = Context::acquire("pause_flush_start", Duration::from_millis(2)).unwrap();
2756
2757        let task = Task::default();
2758
2759        let (started_sender, mut started_receiver) = mpsc::channel(1);
2760        let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
2761        let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
2762        let fut = task.prepare(
2763            TaskFlushTest {
2764                obj: gst::Pad::builder(gst::PadDirection::Unknown)
2765                    .name("runtime::Task::pause_flush_start")
2766                    .build()
2767                    .into(),
2768                started_sender,
2769                flush_start_sender,
2770                flush_stop_sender,
2771            },
2772            context,
2773        );
2774        drop(fut);
2775
2776        // Pause, FlushStart, FlushStop, Start
2777
2778        gst::debug!(RUNTIME_CAT, "pause_flush_start: pausing");
2779        assert_eq!(
2780            task.pause().block_on().unwrap(),
2781            Complete {
2782                origin: Prepared,
2783                target: Paused,
2784            },
2785        );
2786
2787        gst::debug!(RUNTIME_CAT, "pause_flush_start: starting flush");
2788        assert_eq!(
2789            task.flush_start().block_on().unwrap(),
2790            Complete {
2791                origin: Paused,
2792                target: PausedFlushing,
2793            },
2794        );
2795        assert_eq!(task.state(), PausedFlushing);
2796        block_on(flush_start_receiver.next());
2797
2798        gst::debug!(RUNTIME_CAT, "pause_flush_start: stopping flush");
2799        assert_eq!(
2800            task.flush_stop().block_on().unwrap(),
2801            Complete {
2802                origin: PausedFlushing,
2803                target: Paused,
2804            },
2805        );
2806        assert_eq!(task.state(), Paused);
2807        block_on(flush_stop_receiver.next());
2808
2809        // start action not executed
2810        started_receiver.try_next().unwrap_err();
2811
2812        gst::debug!(RUNTIME_CAT, "pause_flush_start: starting after flushing");
2813        assert_eq!(
2814            task.start().block_on().unwrap(),
2815            Complete {
2816                origin: Paused,
2817                target: Started,
2818            },
2819        );
2820        assert_eq!(task.state(), Started);
2821        block_on(started_receiver.next());
2822
2823        stop_then_unprepare(task);
2824    }
2825
2826    #[test]
2827    fn pause_flushing_start() {
2828        gst::init().unwrap();
2829
2830        struct TaskFlushTest {
2831            obj: gst::Object,
2832            started_sender: mpsc::Sender<()>,
2833            flush_start_sender: mpsc::Sender<()>,
2834            flush_stop_sender: mpsc::Sender<()>,
2835        }
2836
2837        impl TaskImpl for TaskFlushTest {
2838            type Item = ();
2839
2840            fn obj(&self) -> &impl IsA<glib::Object> {
2841                &self.obj
2842            }
2843
2844            async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
2845                gst::debug!(RUNTIME_CAT, "pause_flushing_start: started");
2846                self.started_sender.send(()).await.unwrap();
2847                Ok(())
2848            }
2849
2850            async fn try_next(&mut self) -> Result<(), gst::FlowError> {
2851                pending().await
2852            }
2853
2854            async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
2855                unreachable!("pause_flushing_start: handle_item");
2856            }
2857
2858            async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
2859                gst::debug!(RUNTIME_CAT, "pause_flushing_start: started flushing");
2860                self.flush_start_sender.send(()).await.unwrap();
2861                Ok(())
2862            }
2863
2864            async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
2865                gst::debug!(RUNTIME_CAT, "pause_flushing_start: stopped flushing");
2866                self.flush_stop_sender.send(()).await.unwrap();
2867                Ok(())
2868            }
2869        }
2870
2871        let context = Context::acquire("pause_flushing_start", Duration::from_millis(2)).unwrap();
2872
2873        let task = Task::default();
2874
2875        let (started_sender, mut started_receiver) = mpsc::channel(1);
2876        let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
2877        let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
2878        let fut = task.prepare(
2879            TaskFlushTest {
2880                obj: gst::Pad::builder(gst::PadDirection::Unknown)
2881                    .name("runtime::Task::pause_flushing_start")
2882                    .build()
2883                    .into(),
2884                started_sender,
2885                flush_start_sender,
2886                flush_stop_sender,
2887            },
2888            context,
2889        );
2890        drop(fut);
2891
2892        // Pause, FlushStart, Start, FlushStop
2893
2894        gst::debug!(RUNTIME_CAT, "pause_flushing_start: pausing");
2895        let fut = task.pause();
2896        drop(fut);
2897
2898        gst::debug!(RUNTIME_CAT, "pause_flushing_start: starting flush");
2899        block_on(task.flush_start()).unwrap();
2900        assert_eq!(task.state(), PausedFlushing);
2901        block_on(flush_start_receiver.next());
2902
2903        gst::debug!(RUNTIME_CAT, "pause_flushing_start: starting while flushing");
2904        assert_eq!(
2905            task.start().block_on().unwrap(),
2906            Complete {
2907                origin: PausedFlushing,
2908                target: Flushing,
2909            },
2910        );
2911        assert_eq!(task.state(), Flushing);
2912
2913        // start action not executed
2914        started_receiver.try_next().unwrap_err();
2915
2916        gst::debug!(RUNTIME_CAT, "pause_flushing_start: stopping flush");
2917        assert_eq!(
2918            task.flush_stop().block_on().unwrap(),
2919            Complete {
2920                origin: Flushing,
2921                target: Started,
2922            },
2923        );
2924        assert_eq!(task.state(), Started);
2925        block_on(flush_stop_receiver.next());
2926        block_on(started_receiver.next());
2927
2928        stop_then_unprepare(task);
2929    }
2930
2931    #[test]
2932    fn flush_concurrent_start() {
2933        // Purpose: check the racy case of start being triggered in // after flush_start
2934        // e.g.: a FlushStart event received on a Pad and the element starting after a Pause
2935        gst::init().unwrap();
2936
2937        struct TaskStartTest {
2938            obj: gst::Object,
2939            flush_start_sender: mpsc::Sender<()>,
2940            flush_stop_sender: mpsc::Sender<()>,
2941        }
2942
2943        impl TaskImpl for TaskStartTest {
2944            type Item = ();
2945
2946            fn obj(&self) -> &impl IsA<glib::Object> {
2947                &self.obj
2948            }
2949
2950            async fn try_next(&mut self) -> Result<(), gst::FlowError> {
2951                pending().await
2952            }
2953
2954            async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
2955                unreachable!("flush_concurrent_start: handle_item");
2956            }
2957
2958            async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
2959                gst::debug!(RUNTIME_CAT, "flush_concurrent_start: started flushing");
2960                self.flush_start_sender.send(()).await.unwrap();
2961                Ok(())
2962            }
2963
2964            async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
2965                gst::debug!(RUNTIME_CAT, "flush_concurrent_start: stopped flushing");
2966                self.flush_stop_sender.send(()).await.unwrap();
2967                Ok(())
2968            }
2969        }
2970
2971        let context = Context::acquire("flush_concurrent_start", Duration::from_millis(2)).unwrap();
2972
2973        let task = Task::default();
2974
2975        let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
2976        let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
2977        let fut = task.prepare(
2978            TaskStartTest {
2979                obj: gst::Pad::builder(gst::PadDirection::Unknown)
2980                    .name("runtime::Task::flush_concurrent_start")
2981                    .build()
2982                    .into(),
2983                flush_start_sender,
2984                flush_stop_sender,
2985            },
2986            context,
2987        );
2988        drop(fut);
2989
2990        let oob_context =
2991            Context::acquire("flush_concurrent_start_oob", Duration::from_millis(2)).unwrap();
2992        let task_clone = task.clone();
2993
2994        block_on(task.pause()).unwrap();
2995
2996        // Launch flush_start // start
2997        let (ready_sender, ready_receiver) = oneshot::channel();
2998        gst::debug!(RUNTIME_CAT, "flush_concurrent_start: spawning flush_start");
2999        let flush_start_handle = oob_context.spawn(async move {
3000            gst::debug!(RUNTIME_CAT, "flush_concurrent_start: // flush_start");
3001            ready_sender.send(()).unwrap();
3002            let status = task_clone.flush_start();
3003            match status {
3004                Pending {
3005                    trigger: FlushStart,
3006                    origin: Paused,
3007                    ..
3008                } => (),
3009                Pending {
3010                    trigger: FlushStart,
3011                    origin: Started,
3012                    ..
3013                } => (),
3014                other => panic!("{other:?}"),
3015            };
3016            status.await.unwrap();
3017            flush_start_receiver.next().await.unwrap();
3018        });
3019
3020        gst::debug!(
3021            RUNTIME_CAT,
3022            "flush_concurrent_start: awaiting for oob_context"
3023        );
3024        block_on(ready_receiver).unwrap();
3025
3026        gst::debug!(RUNTIME_CAT, "flush_concurrent_start: // start");
3027        match block_on(task.start()) {
3028            Ok(Complete {
3029                origin: Paused,
3030                target: Started,
3031            }) => (),
3032            Ok(Complete {
3033                origin: PausedFlushing,
3034                target: Flushing,
3035            }) => (),
3036            other => panic!("{other:?}"),
3037        }
3038
3039        block_on(flush_start_handle).unwrap();
3040
3041        gst::debug!(RUNTIME_CAT, "flush_concurrent_start: requesting flush_stop");
3042        assert_eq!(
3043            task.flush_stop().block_on().unwrap(),
3044            Complete {
3045                origin: Flushing,
3046                target: Started,
3047            },
3048        );
3049        assert_eq!(task.state(), Started);
3050        block_on(flush_stop_receiver.next());
3051
3052        stop_then_unprepare(task);
3053    }
3054
3055    #[test]
3056    fn start_timer() {
3057        use crate::runtime::timer;
3058
3059        // Purpose: make sure a Timer initialized in a transition is
3060        // available when iterating in the loop.
3061        gst::init().unwrap();
3062
3063        struct TaskTimerTest {
3064            obj: gst::Object,
3065            timer: Option<timer::Oneshot>,
3066            timer_elapsed_sender: Option<oneshot::Sender<()>>,
3067        }
3068
3069        impl TaskImpl for TaskTimerTest {
3070            type Item = ();
3071
3072            fn obj(&self) -> &impl IsA<glib::Object> {
3073                &self.obj
3074            }
3075
3076            async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
3077                self.timer = Some(crate::runtime::timer::delay_for(Duration::from_millis(50)));
3078                gst::debug!(RUNTIME_CAT, "start_timer: started");
3079                Ok(())
3080            }
3081
3082            async fn try_next(&mut self) -> Result<(), gst::FlowError> {
3083                gst::debug!(RUNTIME_CAT, "start_timer: awaiting timer");
3084                self.timer.take().unwrap().await;
3085                Ok(())
3086            }
3087
3088            async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
3089                gst::debug!(RUNTIME_CAT, "start_timer: timer elapsed");
3090                if let Some(timer_elapsed_sender) = self.timer_elapsed_sender.take() {
3091                    timer_elapsed_sender.send(()).unwrap();
3092                }
3093
3094                Err(gst::FlowError::Eos)
3095            }
3096        }
3097
3098        let context = Context::acquire("start_timer", Duration::from_millis(2)).unwrap();
3099
3100        let task = Task::default();
3101
3102        let (timer_elapsed_sender, timer_elapsed_receiver) = oneshot::channel();
3103        let fut = task.prepare(
3104            TaskTimerTest {
3105                obj: gst::Pad::builder(gst::PadDirection::Unknown)
3106                    .name("runtime::Task::start_timer")
3107                    .build()
3108                    .into(),
3109                timer: None,
3110                timer_elapsed_sender: Some(timer_elapsed_sender),
3111            },
3112            context,
3113        );
3114        drop(fut);
3115
3116        gst::debug!(RUNTIME_CAT, "start_timer: start");
3117        let fut = task.start();
3118        drop(fut);
3119
3120        block_on(timer_elapsed_receiver).unwrap();
3121        gst::debug!(RUNTIME_CAT, "start_timer: timer elapsed received");
3122
3123        stop_then_unprepare(task);
3124    }
3125}