Skip to main content

gstthreadshare/runtime/
task.rs

1// Copyright (C) 2019-2022 François Laignel <fengalin@free.fr>
2// Copyright (C) 2020 Sebastian Dröge <sebastian@centricular.com>
3//
4// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
5// If a copy of the MPL was not distributed with this file, You can obtain one at
6// <https://mozilla.org/MPL/2.0/>.
7//
8// SPDX-License-Identifier: MPL-2.0
9
10//! An execution loop to run asynchronous processing.
11
12use futures::channel::mpsc as async_mpsc;
13use futures::channel::oneshot;
14use futures::prelude::*;
15
16use std::fmt;
17use std::ops::Deref;
18use std::pin::{Pin, pin};
19use std::sync::{Arc, Mutex, MutexGuard};
20use std::task::Poll;
21
22use gst::glib;
23use gst::glib::prelude::*;
24
25use super::{Context, JoinHandle, RUNTIME_CAT};
26
/// State of a `Task`.
///
/// The derived ordering (`Ord` / `PartialOrd`) follows the declaration
/// order of the variants below.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum TaskState {
    Error,
    Flushing,
    Paused,
    PausedFlushing,
    Prepared,
    Preparing,
    Started,
    Stopped,
    Unprepared,
}
39
/// Triggering event requesting a state transition of a `Task`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Trigger {
    Error,
    FlushStart,
    FlushStop,
    Pause,
    Prepare,
    Start,
    Stop,
    Unprepare,
}
51
52/// Transition success details.
53#[derive(Clone, Copy, Debug, Eq, PartialEq)]
54pub enum TransitionOk {
55    /// Transition completed successfully.
56    Complete {
57        origin: TaskState,
58        target: TaskState,
59    },
60    /// Not waiting for transition result.
61    ///
62    /// This is to prevent:
63    /// - A deadlock when executing a transition action.
64    /// - A potential infinite wait when pausing a running loop
65    ///   which could be awaiting for an `nominal` to complete.
66    NotWaiting { trigger: Trigger, origin: TaskState },
67    /// Skipping triggering event due to current state.
68    Skipped { trigger: Trigger, state: TaskState },
69}
70
/// Details about a triggering event which resulted in an error.
///
/// Displays as `<trigger> from state <state>: <err_msg>` and converts into
/// its inner `gst::ErrorMessage`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TransitionError {
    /// The trigger which was being handled.
    pub trigger: Trigger,
    /// The `TaskState` reported along with the failure.
    pub state: TaskState,
    /// The underlying error message.
    pub err_msg: gst::ErrorMessage,
}
78
79impl fmt::Display for TransitionError {
80    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
81        write!(
82            f,
83            "{:?} from state {:?}: {:?}",
84            self.trigger, self.state, self.err_msg
85        )
86    }
87}
88
// Relies on the default `std::error::Error` methods: no `source` is exposed.
impl std::error::Error for TransitionError {}
90
91impl From<TransitionError> for gst::ErrorMessage {
92    fn from(err: TransitionError) -> Self {
93        err.err_msg
94    }
95}
96
97/// Transition status.
98///
99/// A state transition occurs as a result of a triggering event.
100/// The triggering event is asynchronously handled by a state machine
101/// running on a [`Context`].
102#[must_use = "This `TransitionStatus` may be `Pending`. In most cases it should be awaited. See `await_maybe_on_context`"]
103pub enum TransitionStatus {
104    /// Transition result is ready.
105    Ready(Result<TransitionOk, TransitionError>),
106    /// Transition is pending.
107    Pending {
108        trigger: Trigger,
109        origin: TaskState,
110        res_fut: Pin<Box<dyn Future<Output = Result<TransitionOk, TransitionError>> + Send>>,
111    },
112}
113
114impl TransitionStatus {
115    pub fn is_ready(&self) -> bool {
116        matches!(self, TransitionStatus::Ready { .. })
117    }
118
119    pub fn is_pending(&self) -> bool {
120        matches!(self, TransitionStatus::Pending { .. })
121    }
122
123    /// Converts the `TransitionStatus` into a `Result`.
124    ///
125    /// This function allows getting the `TransitionError` when
126    /// the transition result is ready without `await`ing nor blocking.
127    ///
128    /// See also [`Self::await_maybe_on_context`].
129    // FIXME once stabilized, this could use https://github.com/rust-lang/rust/issues/84277
130    pub fn check(self) -> Result<TransitionStatus, TransitionError> {
131        match self {
132            TransitionStatus::Ready(Err(err)) => Err(err),
133            other => Ok(other),
134        }
135    }
136
137    /// Blocks on this state transition to complete, or adds a subtask if running on a [`Context`].
138    ///
139    /// Notes:
140    ///
141    /// - If you need to execute code after the transition succeeds or fails,
142    ///   see [`block_on_or_add_subtask_then`].
143    /// - When running in an `async` block within a running transition or
144    ///   task iteration, don't await for the transition as it would deadlock.
145    ///   Use [`Self::check`] to make sure the state transition is valid.
146    /// - When running in an `async` block out of a running transition or
147    ///   task iteration, just `.await` normally. E.g.:
148    ///
149    /// ```
150    /// # use gstthreadshare::runtime::task::{Task, TransitionOk, TransitionError};
151    /// # async fn async_fn() -> Result<TransitionOk, TransitionError> {
152    /// # let task = Task::default();
153    ///   let flush_ok = task.flush_start().await?;
154    /// # Ok(flush_ok)
155    /// # }
156    /// ```
157    ///
158    /// This function makes sure the transition completes successfully or
159    /// produces an error. It must be used in situations where we don't know
160    /// whether we are running on a [`Context`] or not. This is the case for
161    /// functions in [`PadSrc`] and [`PadSink`] as well as the synchronous
162    /// functions transitively called from them.
163    ///
164    /// As an example, a `PadSrc::src_event` function which handles a
165    /// `FlushStart` could call:
166    ///
167    /// ```
168    /// # fn src_event() -> bool {
169    /// # let task = gstthreadshare::runtime::Task::default();
170    ///   return task
171    ///       .flush_start()
172    ///       .block_on_or_add_subtask()
173    ///       .is_ok();
174    /// # }
175    /// ```
176    ///
177    /// If the transition is already complete, the result is returned immediately.
178    ///
179    /// If we are NOT running on a [`Context`], the transition result is awaited
180    /// by blocking on current thread and the result is returned.
181    ///
182    /// If we are running on a [`Context`], the transition result is awaited
183    /// in a sub task for current [`Context`]'s Scheduler task. As a consequence,
184    /// the sub task will be awaited in usual [`Context::drain_sub_tasks`]
185    /// rendezvous, ensuring some kind of synchronization. To avoid deadlocks,
186    /// `Ok(TransitionOk::NotWaiting { .. })` is immediately returned.
187    ///
188    /// [`PadSrc`]: ../pad/struct.PadSrc.html
189    /// [`PadSink`]: ../pad/struct.PadSink.html
190    pub fn block_on_or_add_subtask<O>(self, obj: &O) -> Result<TransitionOk, TransitionError>
191    where
192        O: IsA<glib::Object> + Send,
193    {
194        use TransitionStatus::*;
195        match self {
196            Pending {
197                trigger,
198                origin,
199                res_fut,
200            } => match Context::current_task() {
201                Some((ctx, task_id)) => {
202                    gst::debug!(
203                        RUNTIME_CAT,
204                        obj = obj,
205                        "Awaiting for {trigger:?} ack in a subtask on context {}",
206                        ctx.name()
207                    );
208
209                    let obj = obj.clone();
210                    let _ = ctx.add_sub_task(task_id, async move {
211                        let res = res_fut.await;
212                        match res {
213                            Ok(status) => {
214                                gst::log!(
215                                    RUNTIME_CAT,
216                                    obj = obj,
217                                    "Task {trigger:?} success: {status:?}",
218                                );
219                            }
220                            Err(err) => {
221                                gst::error!(
222                                    RUNTIME_CAT,
223                                    obj = obj,
224                                    "Task {trigger:?} failure: {err}"
225                                );
226                            }
227                        }
228
229                        Ok(())
230                    });
231
232                    Ok(TransitionOk::NotWaiting { trigger, origin })
233                }
234                _ => {
235                    gst::debug!(
236                        RUNTIME_CAT,
237                        obj = obj,
238                        "Awaiting for {trigger:?} ack on current thread",
239                    );
240                    let res = futures::executor::block_on(res_fut);
241                    match res {
242                        Ok(ref status) => {
243                            gst::log!(
244                                RUNTIME_CAT,
245                                obj = obj,
246                                "Task {trigger:?} success: {status:?}",
247                            );
248                        }
249                        Err(ref err) => {
250                            gst::error!(RUNTIME_CAT, obj = obj, "Task {trigger:?} failure: {err}");
251                        }
252                    }
253
254                    res
255                }
256            },
257            Ready(res) => {
258                match res {
259                    Ok(ref status) => {
260                        gst::log!(
261                            RUNTIME_CAT,
262                            obj = obj,
263                            "Task transition immediate success: {status:?}",
264                        );
265                    }
266                    Err(ref err) => {
267                        gst::error!(
268                            RUNTIME_CAT,
269                            obj = obj,
270                            "Task transition immediate failure: {err}",
271                        );
272                    }
273                }
274
275                res
276            }
277        }
278    }
279
280    /// Blocks on this state transition to complete, or adds a subtask if running on a [`Context`]
281    /// executing the provided function after the transition succeeds or fails.
282    ///
283    /// Compared to [`block_on_or_addsubtask`], this function also executes the provieded
284    /// `func` after the transition succeeded or failed. Code following [`block_on_or_addsubtask`]
285    /// can actually be executed before the transition if a subtask was added and the returned
286    /// `Result` might not reflect the actual transition result.
287    ///
288    /// If the transition is already complete, `func` is executed immediately.
289    ///
290    /// If we are NOT running on a [`Context`], the transition result is awaited
291    /// by blocking on current thread and `func` is executed.
292    ///
293    /// If we are running on a [`Context`], the transition result is awaited
294    /// in a sub task for current [`Context`]'s Scheduler task and `func` is executed with
295    /// the transition result. In this case, `block_on_or_add_subtask_then` always
296    /// returns `Ok(())` since the actual processing is handled asynchronously.
297    ///
298    /// ## Example
299    ///
300    /// ```
301    /// # use gstthreadshare::runtime::task::{Task, TransitionOk, TransitionError};
302    /// # async fn async_fn() -> Result<TransitionOk, TransitionError> {
303    /// # let task = Task::default();
304    ///   task
305    ///       .stop()
306    ///       .block_on_or_add_subtask_then(self.obj(), |elem, res| {
307    ///           // Add specific stop code here,
308    ///           // it will be executed after the transition succeeds or fails
309    ///
310    ///           if res.is_ok() {
311    ///               gst::debug!(CAT, obj = elem, "Stopped");
312    ///           }
313    ///       })
314    /// # Ok(flush_ok)
315    /// # }
316    /// ```
317    pub fn block_on_or_add_subtask_then<T, F>(
318        self,
319        obj: glib::BorrowedObject<'_, T>,
320        func: F,
321    ) -> Result<(), gst::ErrorMessage>
322    where
323        T: IsA<glib::Object> + Send,
324        F: FnOnce(&T, &Result<TransitionOk, TransitionError>) + Send + 'static,
325    {
326        use TransitionStatus::*;
327        match self {
328            Pending {
329                trigger, res_fut, ..
330            } => match Context::current_task() {
331                Some((ctx, task_id)) => {
332                    gst::debug!(
333                        RUNTIME_CAT,
334                        obj = obj,
335                        "Awaiting for {trigger:?} ack in a subtask on context {}",
336                        ctx.name()
337                    );
338                    let obj = obj.clone();
339                    let _ = ctx.add_sub_task(task_id, async move {
340                        let res = res_fut.await;
341                        match res {
342                            Ok(ref status) => {
343                                gst::log!(
344                                    RUNTIME_CAT,
345                                    obj = obj,
346                                    "Task {trigger:?} success: {status:?}",
347                                );
348                                func(&obj, &res);
349                                Ok(())
350                            }
351                            Err(ref err) => {
352                                gst::error!(
353                                    RUNTIME_CAT,
354                                    obj = obj,
355                                    "Task {trigger:?} failure: {err}",
356                                );
357                                func(&obj, &res);
358                                Err(gst::FlowError::Error)
359                            }
360                        }
361                    });
362
363                    Ok(())
364                }
365                _ => {
366                    gst::debug!(
367                        RUNTIME_CAT,
368                        obj = obj,
369                        "Awaiting for {trigger:?} ack on current thread",
370                    );
371                    let res = futures::executor::block_on(res_fut);
372                    match res {
373                        Ok(ref status) => {
374                            gst::log!(
375                                RUNTIME_CAT,
376                                obj = obj,
377                                "Task {trigger:?} success: {status:?}",
378                            );
379                            func(&obj, &res);
380                            Ok(())
381                        }
382                        Err(ref err) => {
383                            gst::error!(RUNTIME_CAT, obj = obj, "Task {trigger:?} failure: {err}",);
384                            func(&obj, &res);
385                            res.map(|_| ()).map_err(|err| err.into())
386                        }
387                    }
388                }
389            },
390            Ready(res) => match res {
391                Ok(ref status) => {
392                    gst::log!(
393                        RUNTIME_CAT,
394                        obj = obj,
395                        "Task transition immediate success: {status:?}",
396                    );
397                    func(&obj, &res);
398                    Ok(())
399                }
400                Err(ref err) => {
401                    gst::error!(
402                        RUNTIME_CAT,
403                        obj = obj,
404                        "Task transition immediate failure: {err}",
405                    );
406                    func(&obj, &res);
407                    res.map(|_| ()).map_err(|err| err.into())
408                }
409            },
410        }
411    }
412}
413
414impl Future for TransitionStatus {
415    type Output = Result<TransitionOk, TransitionError>;
416
417    fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
418        use TransitionStatus::*;
419
420        match &mut *self {
421            Ready(res) => Poll::Ready(res.clone()),
422            Pending { res_fut, .. } => match Pin::new(res_fut).poll(cx) {
423                Poll::Pending => Poll::Pending,
424                Poll::Ready(res) => {
425                    *self = Ready(res.clone());
426
427                    Poll::Ready(res)
428                }
429            },
430        }
431    }
432}
433
434impl From<TransitionOk> for TransitionStatus {
435    fn from(ok: TransitionOk) -> Self {
436        Self::Ready(Ok(ok))
437    }
438}
439
440impl From<TransitionError> for TransitionStatus {
441    fn from(err: TransitionError) -> Self {
442        Self::Ready(Err(err))
443    }
444}
445
446// Explicit impl due to `res_fut` not implementing `Debug`.
447impl fmt::Debug for TransitionStatus {
448    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
449        use TransitionStatus::*;
450        match self {
451            Ready(res) => f.debug_tuple("Ready").field(res).finish(),
452            Pending {
453                trigger, origin, ..
454            } => f
455                .debug_struct("Pending")
456                .field("trigger", trigger)
457                .field("origin", origin)
458                .finish(),
459        }
460    }
461}
462
463/// Implementation trait for `Task`s.
464///
465/// Defines implementations for state transition actions and error handlers.
466pub trait TaskImpl: Send + 'static {
467    type Item: Send + 'static;
468
469    fn obj(&self) -> &impl IsA<glib::Object>;
470
471    fn prepare(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
472        future::ok(())
473    }
474
475    fn unprepare(&mut self) -> impl Future<Output = ()> + Send {
476        future::ready(())
477    }
478
479    fn start(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
480        future::ok(())
481    }
482
483    /// Tries to retrieve the next item to process.
484    ///
485    /// With [`Self::handle_item`], this is one of the two `Task` loop
486    /// functions. They are executed in a loop in the `Started` state.
487    ///
488    /// Function `try_next` is awaited at the beginning of each iteration,
489    /// and can be cancelled at `await` point if a state transition is requested.
490    ///
491    /// If `Ok(item)` is returned, the iteration calls [`Self::handle_item`]
492    /// with said `Item`.
493    ///
494    /// If `Err(..)` is returned, the iteration calls [`Self::handle_loop_error`].
495    fn try_next(&mut self) -> impl Future<Output = Result<Self::Item, gst::FlowError>> + Send;
496
497    /// Does whatever needs to be done with the `item`.
498    ///
499    /// With [`Self::try_next`], this is one of the two `Task` loop
500    /// functions. They are executed in a loop in the `Started` state.
501    ///
502    /// Function `handle_item` asynchronously processes an `item` previously
503    /// retrieved by [`Self::try_next`]. Processing is guaranteed to run
504    /// to completion even if a state transition is requested.
505    ///
506    /// If `Err(..)` is returned, the iteration calls [`Self::handle_loop_error`].
507    fn handle_item(
508        &mut self,
509        _item: Self::Item,
510    ) -> impl Future<Output = Result<(), gst::FlowError>> + Send;
511
512    fn pause(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
513        future::ok(())
514    }
515
516    fn flush_start(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
517        future::ready(Ok(()))
518    }
519
520    fn flush_stop(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
521        future::ready(Ok(()))
522    }
523
524    fn stop(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
525        future::ready(Ok(()))
526    }
527
528    /// Handles an error occurring during the execution of the `Task` loop.
529    ///
530    /// This include errors returned by [`Self::try_next`] & [`Self::handle_item`].
531    ///
532    /// If the error is unrecoverable, implementations might use
533    /// `gst::Element::post_error_message` and return `Trigger::Error`.
534    ///
535    /// Otherwise, handle the error and return the requested `Transition` to recover.
536    ///
537    /// Default behaviour depends on the `err`:
538    ///
539    /// - `FlowError::Flushing` -> `Trigger::FlushStart`.
540    /// - `FlowError::Eos` -> `Trigger::Stop`.
541    /// - Other `FlowError` -> `Trigger::Error`.
542    fn handle_loop_error(&mut self, err: gst::FlowError) -> impl Future<Output = Trigger> + Send {
543        async move {
544            match err {
545                gst::FlowError::Flushing => {
546                    gst::debug!(
547                        RUNTIME_CAT,
548                        obj = self.obj(),
549                        "Task loop returned Flushing. Posting FlushStart"
550                    );
551                    Trigger::FlushStart
552                }
553                gst::FlowError::Eos => {
554                    gst::debug!(
555                        RUNTIME_CAT,
556                        obj = self.obj(),
557                        "Task loop returned Eos. Posting Stop"
558                    );
559                    Trigger::Stop
560                }
561                other => {
562                    gst::error!(
563                        RUNTIME_CAT,
564                        obj = self.obj(),
565                        "Task loop returned {other:?}. Posting Error",
566                    );
567                    Trigger::Error
568                }
569            }
570        }
571    }
572
573    /// Handles an error occurring during the execution of a transition action.
574    ///
575    /// This handler also catches errors returned by subtasks spawned by the transition action.
576    ///
577    /// If the error is unrecoverable, implementations might use `gst::Element::post_error_message`
578    /// and return `Trigger::Error`.
579    ///
580    /// Otherwise, handle the error and return the recovering `Trigger`.
581    ///
582    /// Default is to `gst::error` log and return `Trigger::Error`.
583    fn handle_action_error(
584        &mut self,
585        trigger: Trigger,
586        state: TaskState,
587        err: gst::ErrorMessage,
588    ) -> impl Future<Output = Trigger> + Send {
589        async move {
590            gst::error!(
591                RUNTIME_CAT,
592                obj = self.obj(),
593                "TaskImpl transition action error during {trigger:?} from {state:?}: {err:?}. Posting Trigger::Error",
594            );
595
596            Trigger::Error
597        }
598    }
599}
600
// Both ends of the oneshot channel carrying the result of a triggering event.
type AckSender = oneshot::Sender<Result<TransitionOk, TransitionError>>;
type AckReceiver = oneshot::Receiver<Result<TransitionOk, TransitionError>>;
603
/// A `Trigger` along with the channel used to acknowledge its handling.
struct TriggeringEvent {
    /// The requested trigger.
    trigger: Trigger,
    /// Sending end on which the handling result is reported.
    ack_tx: AckSender,
}
608
609impl TriggeringEvent {
610    fn new(trigger: Trigger) -> (Self, AckReceiver) {
611        let (ack_tx, ack_rx) = oneshot::channel();
612        let req = TriggeringEvent { trigger, ack_tx };
613
614        (req, ack_rx)
615    }
616
617    fn send_ack(self, res: Result<TransitionOk, TransitionError>) {
618        let _ = self.ack_tx.send(res);
619    }
620
621    fn send_err_ack(self) {
622        let res = Err(TransitionError {
623            trigger: self.trigger,
624            state: TaskState::Error,
625            err_msg: gst::error_msg!(
626                gst::CoreError::StateChange,
627                [
628                    "Triggering Event {:?} rejected due to a previous unrecoverable error",
629                    self.trigger,
630                ]
631            ),
632        });
633
634        self.send_ack(res);
635    }
636}
637
638impl fmt::Debug for TriggeringEvent {
639    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
640        f.debug_struct("TriggeringEvent")
641            .field("trigger", &self.trigger)
642            .finish()
643    }
644}
645
/// Handle on a spawned state machine.
///
/// Allows pushing `TriggeringEvent`s to the state machine and
/// waiting for its termination.
#[derive(Debug)]
struct StateMachineHandle {
    /// Awaited by `join` to wait for the state machine to terminate.
    join_handle: JoinHandle<()>,
    /// Sending end of the channel on which `TriggeringEvent`s are pushed.
    triggering_evt_tx: async_mpsc::Sender<TriggeringEvent>,
    /// The `Context` which is unparked after a `TriggeringEvent` is pushed.
    context: Context,
}
652
653impl StateMachineHandle {
654    fn trigger(&mut self, trigger: Trigger) -> AckReceiver {
655        let (triggering_evt, ack_rx) = TriggeringEvent::new(trigger);
656
657        gst::log!(RUNTIME_CAT, "Pushing {triggering_evt:?}");
658        self.triggering_evt_tx.try_send(triggering_evt).unwrap();
659
660        self.context.unpark();
661
662        ack_rx
663    }
664
665    async fn join(self) {
666        self.join_handle
667            .await
668            .expect("state machine shouldn't have been cancelled");
669    }
670}
671
/// State shared between the clones of a `Task`.
#[derive(Debug)]
struct TaskInner {
    /// Current state of the task.
    state: TaskState,
    /// Handle on the state machine: set by `Task::prepare`,
    /// taken by `Task::unprepare`.
    state_machine_handle: Option<StateMachineHandle>,
}
677
678impl Default for TaskInner {
679    fn default() -> Self {
680        TaskInner {
681            state: TaskState::Unprepared,
682            state_machine_handle: None,
683        }
684    }
685}
686
687impl TaskInner {
688    fn switch_to_state(&mut self, target_state: TaskState, triggering_evt: TriggeringEvent) {
689        let res = Ok(TransitionOk::Complete {
690            origin: self.state,
691            target: target_state,
692        });
693
694        self.state = target_state;
695        triggering_evt.send_ack(res);
696    }
697
698    fn switch_to_err(&mut self, triggering_evt: TriggeringEvent) {
699        let res = Err(TransitionError {
700            trigger: triggering_evt.trigger,
701            state: self.state,
702            err_msg: gst::error_msg!(
703                gst::CoreError::StateChange,
704                [
705                    "Unrecoverable error for {triggering_evt:?} from state {:?}",
706                    self.state,
707                ]
708            ),
709        });
710
711        self.state = TaskState::Error;
712        triggering_evt.send_ack(res);
713    }
714
715    fn skip_triggering_evt(&mut self, triggering_evt: TriggeringEvent) {
716        let res = Ok(TransitionOk::Skipped {
717            trigger: triggering_evt.trigger,
718            state: self.state,
719        });
720
721        triggering_evt.send_ack(res);
722    }
723
724    fn trigger(&mut self, trigger: Trigger) -> Result<AckReceiver, TransitionError> {
725        self.state_machine_handle
726            .as_mut()
727            .map(|state_machine| state_machine.trigger(trigger))
728            .ok_or_else(|| {
729                gst::warning!(RUNTIME_CAT, "Unable to send {trigger:?}: no state machine",);
730                TransitionError {
731                    trigger,
732                    state: TaskState::Unprepared,
733                    err_msg: gst::error_msg!(
734                        gst::ResourceError::NotFound,
735                        ["Unable to send {trigger:?}: no state machine"]
736                    ),
737                }
738            })
739    }
740}
741
742impl Drop for TaskInner {
743    fn drop(&mut self) {
744        if self.state != TaskState::Unprepared {
745            // Don't panic here: in case another panic occurs, we would get
746            // "panicked while panicking" which would prevents developers
747            // from getting the initial panic message.
748            gst::fixme!(RUNTIME_CAT, "Missing call to `Task::unprepare`");
749        }
750    }
751}
752
/// An RAII implementation of scoped locked `TaskState`.
///
/// Wraps the `MutexGuard` on the task's inner state and dereferences
/// to the `TaskState`.
pub struct TaskStateGuard<'guard>(MutexGuard<'guard, TaskInner>);
755
756impl Deref for TaskStateGuard<'_> {
757    type Target = TaskState;
758
759    fn deref(&self) -> &Self::Target {
760        &(self.0).state
761    }
762}
763
/// A `Task` operating on a `threadshare` [`Context`].
///
/// The inner state is held in an `Arc<Mutex<..>>`: clones of a `Task`
/// refer to the same inner state.
///
/// [`Context`]: ../executor/struct.Context.html
#[derive(Debug, Clone)]
pub struct Task(Arc<Mutex<TaskInner>>);
769
770impl Default for Task {
771    fn default() -> Self {
772        Task(Arc::new(Mutex::new(TaskInner::default())))
773    }
774}
775
776impl Task {
777    pub fn state(&self) -> TaskState {
778        self.0.lock().unwrap().state
779    }
780
781    pub fn lock_state(&self) -> TaskStateGuard<'_> {
782        TaskStateGuard(self.0.lock().unwrap())
783    }
784
785    pub fn prepare(&self, task_impl: impl TaskImpl, context: Context) -> TransitionStatus {
786        let mut inner = self.0.lock().unwrap();
787
788        let origin = inner.state;
789        match origin {
790            TaskState::Unprepared => (),
791            TaskState::Prepared | TaskState::Preparing => {
792                gst::debug!(
793                    RUNTIME_CAT,
794                    obj = task_impl.obj(),
795                    "Task already {origin:?}",
796                );
797                return TransitionOk::Skipped {
798                    trigger: Trigger::Prepare,
799                    state: origin,
800                }
801                .into();
802            }
803            state => {
804                gst::warning!(
805                    RUNTIME_CAT,
806                    obj = task_impl.obj(),
807                    "Attempt to prepare Task in state {state:?}"
808                );
809                return TransitionError {
810                    trigger: Trigger::Prepare,
811                    state: inner.state,
812                    err_msg: gst::error_msg!(
813                        gst::CoreError::StateChange,
814                        ["Attempt to prepare Task in state {state:?}"]
815                    ),
816                }
817                .into();
818            }
819        }
820
821        assert!(inner.state_machine_handle.is_none());
822
823        inner.state = TaskState::Preparing;
824
825        gst::log!(
826            RUNTIME_CAT,
827            obj = task_impl.obj(),
828            "Spawning task state machine"
829        );
830        inner.state_machine_handle = Some(StateMachine::spawn(self.0.clone(), task_impl, context));
831
832        let ack_rx = match inner.trigger(Trigger::Prepare) {
833            Ok(ack_rx) => ack_rx,
834            Err(err) => return err.into(),
835        };
836        drop(inner);
837
838        TransitionStatus::Pending {
839            trigger: Trigger::Prepare,
840            origin: TaskState::Unprepared,
841            res_fut: Box::pin(ack_rx.map(Result::unwrap)),
842        }
843    }
844
845    pub fn unprepare(&self) -> TransitionStatus {
846        let mut inner = self.0.lock().unwrap();
847
848        let origin = inner.state;
849        let mut state_machine_handle = match origin {
850            TaskState::Stopped
851            | TaskState::Error
852            | TaskState::Prepared
853            | TaskState::Preparing
854            | TaskState::Unprepared => match inner.state_machine_handle.take() {
855                Some(state_machine_handle) => {
856                    gst::debug!(RUNTIME_CAT, "Unpreparing task");
857
858                    state_machine_handle
859                }
860                None => {
861                    gst::debug!(RUNTIME_CAT, "Task already unpreparing");
862                    return TransitionOk::Skipped {
863                        trigger: Trigger::Unprepare,
864                        state: origin,
865                    }
866                    .into();
867                }
868            },
869            state => {
870                gst::warning!(RUNTIME_CAT, "Attempt to unprepare Task in state {state:?}");
871                return TransitionError {
872                    trigger: Trigger::Unprepare,
873                    state: inner.state,
874                    err_msg: gst::error_msg!(
875                        gst::CoreError::StateChange,
876                        ["Attempt to unprepare Task in state {state:?}"]
877                    ),
878                }
879                .into();
880            }
881        };
882
883        let ack_rx = state_machine_handle.trigger(Trigger::Unprepare);
884        drop(inner);
885
886        let state_machine_end_fut = async {
887            state_machine_handle.join().await;
888            ack_rx.await.unwrap()
889        };
890
891        TransitionStatus::Pending {
892            trigger: Trigger::Unprepare,
893            origin,
894            res_fut: Box::pin(state_machine_end_fut),
895        }
896    }
897
898    /// Starts the `Task`.
899    ///
900    /// The execution occurs on the `Task` context.
901    pub fn start(&self) -> TransitionStatus {
902        let mut inner = self.0.lock().unwrap();
903
904        if let TaskState::Started = inner.state {
905            return TransitionOk::Skipped {
906                trigger: Trigger::Start,
907                state: TaskState::Started,
908            }
909            .into();
910        }
911
912        let ack_rx = match inner.trigger(Trigger::Start) {
913            Ok(ack_rx) => ack_rx,
914            Err(err) => return err.into(),
915        };
916
917        let origin = inner.state;
918        drop(inner);
919
920        TransitionStatus::Pending {
921            trigger: Trigger::Start,
922            origin,
923            res_fut: Box::pin(ack_rx.map(Result::unwrap)),
924        }
925    }
926
927    /// Requests the `Task` loop to pause.
928    ///
929    /// If an item handling is in progress, it will run to completion,
930    /// then no iterations will be executed before `start` is called again.
931    pub fn pause(&self) -> TransitionStatus {
932        self.push_pending(Trigger::Pause)
933    }
934
935    pub fn flush_start(&self) -> TransitionStatus {
936        self.push_pending(Trigger::FlushStart)
937    }
938
939    pub fn flush_stop(&self) -> TransitionStatus {
940        self.push_pending(Trigger::FlushStop)
941    }
942
943    /// Stops the `Started` `Task` and wait for it to finish.
944    pub fn stop(&self) -> TransitionStatus {
945        self.push_pending(Trigger::Stop)
946    }
947
948    /// Pushes a [`Trigger`] and returns TransitionStatus::Pending.
949    fn push_pending(&self, trigger: Trigger) -> TransitionStatus {
950        let mut inner = self.0.lock().unwrap();
951
952        let ack_rx = match inner.trigger(trigger) {
953            Ok(ack_rx) => ack_rx,
954            Err(err) => return err.into(),
955        };
956
957        let origin = inner.state;
958        drop(inner);
959
960        TransitionStatus::Pending {
961            trigger,
962            origin,
963            res_fut: Box::pin(ack_rx.map(Result::unwrap)),
964        }
965    }
966}
967
/// State machine executing the transition actions and the loop of a `TaskImpl`
/// in response to the `TriggeringEvent`s it receives.
struct StateMachine<Task: TaskImpl> {
    /// The `TaskImpl` whose actions and loop functions are executed.
    task_impl: Task,
    /// Receiving end of the channel on which `TriggeringEvent`s are pushed.
    triggering_evt_rx: async_mpsc::Receiver<TriggeringEvent>,
    /// A `TriggeringEvent` to be handled before awaiting a new one from
    /// `triggering_evt_rx`.
    pending_triggering_evt: Option<TriggeringEvent>,
}
973
/// Awaits the `TaskImpl` transition action `$action` and evaluates to a `Result`.
///
/// - On success, evaluates to `Ok($triggering_evt)` so that the caller can
///   complete the transition and acknowledge the event.
/// - On failure, the error is passed to `TaskImpl::handle_action_error` along
///   with the original trigger and `$origin`. The trigger of `$triggering_evt`
///   is replaced with the one returned by the handler and the event is stored
///   in `$self.pending_triggering_evt`. The macro then evaluates to `Err(())`.
///
/// `$task_inner` is accepted but is not used by the expansion.
macro_rules! exec_action {
    ($self:ident, $action:ident, $triggering_evt:expr, $origin:expr, $task_inner:expr) => {{
        if let Err(err) = $self.task_impl.$action().await {
            // FIXME problem is that we lose the origin trigger in the
            //       final TransitionStatus.

            let next_trigger = $self
                .task_impl
                .handle_action_error($triggering_evt.trigger, $origin, err)
                .await;

            // The error handler decides which trigger replaces the failed one.
            gst::trace!(
                RUNTIME_CAT,
                obj = $self.task_impl.obj(),
                "TaskImpl transition action error: converting {:?} to {next_trigger:?}",
                $triggering_evt.trigger,
            );

            $triggering_evt.trigger = next_trigger;
            $self.pending_triggering_evt = Some($triggering_evt);

            Err(())
        } else {
            Ok($triggering_evt)
        }
    }};
}
1003
1004impl<Task: TaskImpl> StateMachine<Task> {
1005    fn spawn(
1006        task_inner: Arc<Mutex<TaskInner>>,
1007        task_impl: Task,
1008        context: Context,
1009    ) -> StateMachineHandle {
1010        let (triggering_evt_tx, triggering_evt_rx) = async_mpsc::channel(4);
1011
1012        let state_machine = StateMachine {
1013            task_impl,
1014            triggering_evt_rx,
1015            pending_triggering_evt: None,
1016        };
1017
1018        StateMachineHandle {
1019            join_handle: context.spawn_and_unpark(state_machine.run(task_inner)),
1020            triggering_evt_tx,
1021            context,
1022        }
1023    }
1024
    /// Runs the state machine until a `Trigger::Unprepare` is handled.
    ///
    /// The first triggering event must be `Trigger::Prepare`, otherwise this
    /// function panics. Subsequent events are taken from
    /// `pending_triggering_evt` when set, or awaited on `triggering_evt_rx`.
    /// For each event, the current state held by `task_inner` decides whether
    /// the matching `TaskImpl` action is executed, the event is skipped or an
    /// error is acknowledged.
    async fn run(mut self, task_inner: Arc<Mutex<TaskInner>>) {
        let mut triggering_evt = self
            .triggering_evt_rx
            .next()
            .await
            .expect("triggering_evt_rx dropped");

        if let Trigger::Prepare = triggering_evt.trigger {
            gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Preparing task");

            // On failure, `exec_action!` stores the replacement event in
            // `pending_triggering_evt`: it is handled by the loop below.
            let res = exec_action!(
                self,
                prepare,
                triggering_evt,
                TaskState::Unprepared,
                &task_inner
            );
            if let Ok(triggering_evt) = res {
                let mut task_inner = task_inner.lock().unwrap();
                let res = Ok(TransitionOk::Complete {
                    origin: TaskState::Unprepared,
                    target: TaskState::Prepared,
                });

                task_inner.state = TaskState::Prepared;
                triggering_evt.send_ack(res);

                gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Task Prepared");
            }
        } else {
            panic!("Unexpected initial trigger {:?}", triggering_evt.trigger);
        }

        loop {
            // A pending event takes precedence over the channel.
            triggering_evt = match self.pending_triggering_evt.take() {
                Some(pending_triggering_evt) => pending_triggering_evt,
                None => self
                    .triggering_evt_rx
                    .next()
                    .await
                    .expect("triggering_evt_rx dropped"),
            };
            gst::trace!(
                RUNTIME_CAT,
                obj = self.task_impl.obj(),
                "State machine popped {triggering_evt:?}"
            );

            match triggering_evt.trigger {
                Trigger::Error => {
                    let mut task_inner = task_inner.lock().unwrap();
                    task_inner.switch_to_err(triggering_evt);
                    gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Switched to Error");
                }
                Trigger::Start => {
                    // The lock is only held while checking the origin state:
                    // it is released before the start action is awaited.
                    let origin = {
                        let mut task_inner = task_inner.lock().unwrap();
                        let origin = task_inner.state;
                        match origin {
                            TaskState::Stopped | TaskState::Paused | TaskState::Prepared => (),
                            TaskState::PausedFlushing => {
                                task_inner.switch_to_state(TaskState::Flushing, triggering_evt);
                                gst::trace!(
                                    RUNTIME_CAT,
                                    obj = self.task_impl.obj(),
                                    "Switched from PausedFlushing to Flushing"
                                );
                                continue;
                            }
                            TaskState::Error => {
                                triggering_evt.send_err_ack();
                                continue;
                            }
                            state => {
                                task_inner.skip_triggering_evt(triggering_evt);
                                gst::trace!(
                                    RUNTIME_CAT,
                                    obj = self.task_impl.obj(),
                                    "Skipped Start in state {state:?}"
                                );
                                continue;
                            }
                        }

                        origin
                    };

                    self.start(triggering_evt, origin, &task_inner).await;
                    // next/pending triggering event handled in next iteration
                }
                Trigger::Pause => {
                    // In the `Error` state, the action is still executed and
                    // the target state remains `Error`.
                    let (origin, target) = {
                        let mut task_inner = task_inner.lock().unwrap();
                        let origin = task_inner.state;
                        match origin {
                            TaskState::Started | TaskState::Stopped | TaskState::Prepared => {
                                (origin, TaskState::Paused)
                            }
                            TaskState::Flushing => (origin, TaskState::PausedFlushing),
                            TaskState::Error => (TaskState::Error, TaskState::Error),
                            state => {
                                task_inner.skip_triggering_evt(triggering_evt);
                                gst::trace!(
                                    RUNTIME_CAT,
                                    obj = self.task_impl.obj(),
                                    "Skipped Pause in state {state:?}"
                                );
                                continue;
                            }
                        }
                    };

                    let res = exec_action!(self, pause, triggering_evt, origin, &task_inner);
                    if let Ok(triggering_evt) = res {
                        task_inner
                            .lock()
                            .unwrap()
                            .switch_to_state(target, triggering_evt);
                        gst::trace!(
                            RUNTIME_CAT,
                            obj = self.task_impl.obj(),
                            "Task loop {target:?}"
                        );
                    }
                }
                Trigger::Stop => {
                    // In the `Error` state, the action is still executed and
                    // the target state remains `Error`.
                    let (origin, target) = {
                        let mut task_inner = task_inner.lock().unwrap();
                        let origin = task_inner.state;
                        match origin {
                            TaskState::Started
                            | TaskState::Paused
                            | TaskState::PausedFlushing
                            | TaskState::Flushing => (origin, TaskState::Stopped),
                            TaskState::Error => (TaskState::Error, TaskState::Error),
                            state => {
                                task_inner.skip_triggering_evt(triggering_evt);
                                gst::trace!(
                                    RUNTIME_CAT,
                                    obj = self.task_impl.obj(),
                                    "Skipped Stop in state {state:?}"
                                );
                                continue;
                            }
                        }
                    };

                    let res = exec_action!(self, stop, triggering_evt, origin, &task_inner);
                    if let Ok(triggering_evt) = res {
                        task_inner
                            .lock()
                            .unwrap()
                            .switch_to_state(target, triggering_evt);
                        gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Task {target:?}");
                    }
                }
                Trigger::FlushStart => {
                    // In the `Error` state, the action is still executed and
                    // the target state remains `Error`.
                    let (origin, target) = {
                        let mut task_inner = task_inner.lock().unwrap();
                        let origin = task_inner.state;
                        match origin {
                            TaskState::Started => (origin, TaskState::Flushing),
                            TaskState::Paused => (origin, TaskState::PausedFlushing),
                            TaskState::Error => (TaskState::Error, TaskState::Error),
                            state => {
                                task_inner.skip_triggering_evt(triggering_evt);
                                gst::trace!(
                                    RUNTIME_CAT,
                                    obj = self.task_impl.obj(),
                                    "Skipped FlushStart in state {state:?}"
                                );
                                continue;
                            }
                        }
                    };

                    let res = exec_action!(self, flush_start, triggering_evt, origin, &task_inner);
                    if let Ok(triggering_evt) = res {
                        task_inner
                            .lock()
                            .unwrap()
                            .switch_to_state(target, triggering_evt);
                        gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Task {target:?}");
                    }
                }
                Trigger::FlushStop => {
                    // Unlike the other arms, the lock is released right after
                    // reading the state and re-acquired when skipping.
                    let origin = task_inner.lock().unwrap().state;
                    let is_paused = match origin {
                        TaskState::Flushing => false,
                        TaskState::PausedFlushing => true,
                        TaskState::Error => {
                            triggering_evt.send_err_ack();
                            continue;
                        }
                        state => {
                            task_inner
                                .lock()
                                .unwrap()
                                .skip_triggering_evt(triggering_evt);
                            gst::trace!(
                                RUNTIME_CAT,
                                obj = self.task_impl.obj(),
                                "Skipped FlushStop in state {state:?}"
                            );
                            continue;
                        }
                    };

                    let res = exec_action!(self, flush_stop, triggering_evt, origin, &task_inner);
                    if let Ok(triggering_evt) = res {
                        if is_paused {
                            task_inner
                                .lock()
                                .unwrap()
                                .switch_to_state(TaskState::Paused, triggering_evt);
                            gst::trace!(
                                RUNTIME_CAT,
                                obj = self.task_impl.obj(),
                                "Switched from PausedFlushing to Paused"
                            );
                        } else {
                            // Leaving `Flushing`: the start action is
                            // executed and the loop resumes.
                            self.start(triggering_evt, origin, &task_inner).await;
                            // next/pending triggering event handled in next iteration
                        }
                    }
                }
                Trigger::Unprepare => {
                    // Unprepare is not joined by an ack_rx but by joining the state machine handle
                    self.task_impl.unprepare().await;

                    task_inner
                        .lock()
                        .unwrap()
                        .switch_to_state(TaskState::Unprepared, triggering_evt);

                    // Terminates the state machine.
                    break;
                }
                // Remaining trigger: `Prepare`, which is only expected as the
                // initial event handled above.
                _ => unreachable!("State machine handler {:?}", triggering_evt),
            }
        }

        gst::trace!(
            RUNTIME_CAT,
            obj = self.task_impl.obj(),
            "Task state machine terminated"
        );
    }
1272
1273    async fn start(
1274        &mut self,
1275        mut triggering_evt: TriggeringEvent,
1276        origin: TaskState,
1277        task_inner: &Arc<Mutex<TaskInner>>,
1278    ) {
1279        match exec_action!(self, start, triggering_evt, origin, &task_inner) {
1280            Ok(triggering_evt) => {
1281                let mut task_inner = task_inner.lock().unwrap();
1282                task_inner.switch_to_state(TaskState::Started, triggering_evt);
1283            }
1284            Err(_) => {
1285                // error handled by exec_action
1286                return;
1287            }
1288        }
1289
1290        match self.run_loop().await {
1291            Ok(()) => (),
1292            Err(err) => {
1293                let next_trigger = self.task_impl.handle_loop_error(err).await;
1294                let (triggering_evt, _) = TriggeringEvent::new(next_trigger);
1295                self.pending_triggering_evt = Some(triggering_evt);
1296            }
1297        }
1298    }
1299
1300    async fn run_loop(&mut self) -> Result<(), gst::FlowError> {
1301        gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Task loop started");
1302
1303        let mut try_next_res;
1304        loop {
1305            try_next_res = {
1306                // select_biased requires the selected futures to implement
1307                // `FusedFuture`. Because async trait functions are not stable,
1308                // we use `BoxFuture` for the `TaskImpl` function, including
1309                // `try_next`. Since we need to get a new `BoxFuture` at
1310                // each iteration, we can guarantee that the future is
1311                // always valid for use in `select_biased`.
1312                let mut try_next_fut = pin!(self.task_impl.try_next().fuse());
1313                futures::select_biased! {
1314                    triggering_evt = self.triggering_evt_rx.next() => {
1315                        let triggering_evt = triggering_evt.expect("broken state machine channel");
1316                        gst::trace!(
1317                            RUNTIME_CAT,
1318                            "Task loop handing {:?} to state machine",
1319                            triggering_evt,
1320                        );
1321                        self.pending_triggering_evt = Some(triggering_evt);
1322                        return Ok(());
1323                    }
1324                    try_next_res = try_next_fut => try_next_res,
1325                }
1326            };
1327
1328            let item = try_next_res.inspect_err(|err| {
1329                gst::debug!(
1330                    RUNTIME_CAT,
1331                    obj = self.task_impl.obj(),
1332                    "TaskImpl::try_next returned {err:?}"
1333                );
1334            })?;
1335
1336            self.task_impl.handle_item(item).await.inspect_err(|&err| {
1337                gst::debug!(
1338                    RUNTIME_CAT,
1339                    obj = self.task_impl.obj(),
1340                    "TaskImpl::handle_item returned {err:?}"
1341                );
1342            })?;
1343        }
1344    }
1345}
1346
1347#[cfg(test)]
1348mod tests {
1349    use futures::channel::{mpsc, oneshot};
1350    use futures::executor::block_on;
1351    use futures::prelude::*;
1352    use gst::glib;
1353    use gst::glib::prelude::*;
1354    use std::future::pending;
1355    use std::time::Duration;
1356
1357    use super::{
1358        Task, TaskImpl,
1359        TaskState::{self, *},
1360        TransitionError, TransitionOk,
1361        TransitionOk::*,
1362        TransitionStatus,
1363        TransitionStatus::*,
1364        Trigger::{self, *},
1365    };
1366    use crate::runtime::{Context, RUNTIME_CAT};
1367
1368    impl TransitionStatus {
1369        // Only useful for unit tests, use `block_on_or_add_sub_task_and_then`
1370        // or `block_on_or_add_sub_task` in user code.
1371        fn block_on(self) -> Result<TransitionOk, TransitionError> {
1372            assert!(!Context::is_context_thread());
1373            match self {
1374                Pending {
1375                    trigger, res_fut, ..
1376                } => {
1377                    gst::debug!(
1378                        RUNTIME_CAT,
1379                        "Awaiting for {:?} ack on current thread",
1380                        trigger,
1381                    );
1382                    futures::executor::block_on(res_fut)
1383                }
1384                Ready(res) => res,
1385            }
1386        }
1387    }
1388
1389    #[track_caller]
1390    fn stop_then_unprepare(task: Task) {
1391        task.stop().block_on().unwrap();
1392        task.unprepare().block_on().unwrap();
1393    }
1394
    /// Exercises the nominal `Task` lifecycle: prepare, start, pause while
    /// awaiting `try_next`, pause while handling an item, restart, stop and
    /// unprepare. The `TaskImpl` functions are synchronized with the test
    /// thread using channels.
    #[test]
    fn nominal() {
        gst::init().unwrap();

        // Each sender notifies the test thread that the matching `TaskImpl`
        // function was entered. `try_next_receiver` blocks `try_next` until
        // the test thread sends an item.
        struct TaskTest {
            obj: gst::Object,
            prepared_sender: mpsc::Sender<()>,
            started_sender: mpsc::Sender<()>,
            try_next_ready_sender: mpsc::Sender<()>,
            try_next_receiver: mpsc::Receiver<()>,
            handle_item_ready_sender: mpsc::Sender<()>,
            handle_item_sender: mpsc::Sender<()>,
            paused_sender: mpsc::Sender<()>,
            stopped_sender: mpsc::Sender<()>,
            unprepared_sender: mpsc::Sender<()>,
        }

        impl TaskImpl for TaskTest {
            type Item = ();

            fn obj(&self) -> &impl IsA<glib::Object> {
                &self.obj
            }

            async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> {
                gst::debug!(RUNTIME_CAT, "nominal: prepared");
                self.prepared_sender.send(()).await.unwrap();
                Ok(())
            }

            async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
                gst::debug!(RUNTIME_CAT, "nominal: started");
                self.started_sender.send(()).await.unwrap();
                Ok(())
            }

            async fn try_next(&mut self) -> Result<(), gst::FlowError> {
                gst::debug!(RUNTIME_CAT, "nominal: entering try_next");
                self.try_next_ready_sender.send(()).await.unwrap();
                gst::debug!(RUNTIME_CAT, "nominal: awaiting try_next");
                self.try_next_receiver.next().await.unwrap();
                Ok(())
            }

            async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
                gst::debug!(RUNTIME_CAT, "nominal: entering handle_item");
                self.handle_item_ready_sender.send(()).await.unwrap();

                gst::debug!(RUNTIME_CAT, "nominal: locked in handle_item");
                self.handle_item_sender.send(()).await.unwrap();
                gst::debug!(RUNTIME_CAT, "nominal: leaving handle_item");

                Ok(())
            }

            async fn pause(&mut self) -> Result<(), gst::ErrorMessage> {
                gst::debug!(RUNTIME_CAT, "nominal: paused");
                self.paused_sender.send(()).await.unwrap();
                Ok(())
            }

            async fn stop(&mut self) -> Result<(), gst::ErrorMessage> {
                gst::debug!(RUNTIME_CAT, "nominal: stopped");
                self.stopped_sender.send(()).await.unwrap();
                Ok(())
            }

            async fn unprepare(&mut self) {
                gst::debug!(RUNTIME_CAT, "nominal: unprepared");
                self.unprepared_sender.send(()).await.unwrap();
            }
        }

        let context = Context::acquire("nominal", Duration::from_millis(2)).unwrap();

        let task = Task::default();

        assert_eq!(task.state(), Unprepared);

        gst::debug!(RUNTIME_CAT, "nominal: preparing");

        let (prepared_sender, mut prepared_receiver) = mpsc::channel(1);
        let (started_sender, mut started_receiver) = mpsc::channel(1);
        let (try_next_ready_sender, mut try_next_ready_receiver) = mpsc::channel(1);
        let (mut try_next_sender, try_next_receiver) = mpsc::channel(1);
        let (handle_item_ready_sender, mut handle_item_ready_receiver) = mpsc::channel(1);
        let (handle_item_sender, mut handle_item_receiver) = mpsc::channel(0);
        let (paused_sender, mut paused_receiver) = mpsc::channel(1);
        let (stopped_sender, mut stopped_receiver) = mpsc::channel(1);
        let (unprepared_sender, mut unprepared_receiver) = mpsc::channel(1);
        let obj = gst::Pad::builder(gst::PadDirection::Unknown)
            .name("runtime::Task::nominal")
            .build();
        let prepare_status = task.prepare(
            TaskTest {
                obj: obj.clone().into(),
                prepared_sender,
                started_sender,
                try_next_ready_sender,
                try_next_receiver,
                handle_item_ready_sender,
                handle_item_sender,
                paused_sender,
                stopped_sender,
                unprepared_sender,
            },
            context,
        );

        assert!(prepare_status.is_pending());
        match prepare_status {
            Pending {
                trigger: Prepare,
                origin: Unprepared,
                ..
            } => (),
            other => panic!("{other:?}"),
        };

        // Start is requested before the prepare transition result is awaited.
        gst::debug!(RUNTIME_CAT, "nominal: starting (async prepare)");
        let start_status = task.start().check().unwrap();

        block_on(prepared_receiver.next()).unwrap();
        // also tests block_on_or_add_subtask
        assert_eq!(
            prepare_status.block_on_or_add_subtask(&obj).unwrap(),
            Complete {
                origin: Unprepared,
                target: Prepared,
            },
        );

        block_on(started_receiver.next()).unwrap();
        assert_eq!(
            start_status.block_on().unwrap(),
            Complete {
                origin: Prepared,
                target: Started,
            }
        );
        assert_eq!(task.state(), Started);

        // unlock task loop and keep looping
        block_on(try_next_ready_receiver.next()).unwrap();
        block_on(try_next_sender.send(())).unwrap();
        block_on(handle_item_ready_receiver.next()).unwrap();
        block_on(handle_item_receiver.next()).unwrap();

        gst::debug!(RUNTIME_CAT, "nominal: starting (redundant)");
        // already started
        assert_eq!(
            task.start().block_on().unwrap(),
            Skipped {
                trigger: Start,
                state: Started,
            },
        );
        assert_eq!(task.state(), Started);

        // Attempt to unprepare Task in state Started (also tests check)
        match task.unprepare().check().unwrap_err() {
            TransitionError {
                trigger: Unprepare,
                state: Started,
                ..
            } => (),
            other => panic!("{other:?}"),
        }

        gst::debug!(RUNTIME_CAT, "nominal: pause cancelling try_next");
        block_on(try_next_ready_receiver.next()).unwrap();

        let pause_status = task.pause().check().unwrap();
        gst::debug!(RUNTIME_CAT, "nominal: awaiting pause ack");
        block_on(paused_receiver.next()).unwrap();
        assert_eq!(
            pause_status.block_on().unwrap(),
            Complete {
                origin: Started,
                target: Paused,
            },
        );

        // handle_item not reached
        assert!(handle_item_ready_receiver.try_recv().is_err());
        // try_next not reached again
        assert!(try_next_ready_receiver.try_recv().is_err());

        gst::debug!(
            RUNTIME_CAT,
            "nominal: starting (after pause cancelling try_next)"
        );
        let start_receiver = task.start().check().unwrap();
        block_on(started_receiver.next()).unwrap();
        assert_eq!(
            start_receiver.block_on().unwrap(),
            Complete {
                origin: Paused,
                target: Started,
            },
        );
        assert_eq!(task.state(), Started);

        gst::debug!(RUNTIME_CAT, "nominal: pause // handle_item");
        block_on(try_next_ready_receiver.next()).unwrap();
        block_on(try_next_sender.send(())).unwrap();
        // Make sure item is picked
        block_on(handle_item_ready_receiver.next()).unwrap();

        gst::debug!(RUNTIME_CAT, "nominal: requesting to pause");
        let pause_status = task.pause().check().unwrap();

        gst::debug!(RUNTIME_CAT, "nominal: unlocking item handling");
        block_on(handle_item_receiver.next()).unwrap();

        gst::debug!(RUNTIME_CAT, "nominal: awaiting pause ack");
        block_on(paused_receiver.next()).unwrap();
        assert_eq!(
            pause_status.block_on().unwrap(),
            Complete {
                origin: Started,
                target: Paused,
            },
        );

        // try_next not reached again
        assert!(try_next_ready_receiver.try_recv().is_err());

        gst::debug!(
            RUNTIME_CAT,
            "nominal: starting (after pause // handle_item)"
        );
        let start_receiver = task.start().check().unwrap();
        block_on(started_receiver.next()).unwrap();
        assert_eq!(
            start_receiver.block_on().unwrap(),
            Complete {
                origin: Paused,
                target: Started,
            },
        );
        assert_eq!(task.state(), Started);

        gst::debug!(RUNTIME_CAT, "nominal: stopping");
        assert_eq!(
            task.stop().block_on().unwrap(),
            Complete {
                origin: Started,
                target: Stopped,
            },
        );

        assert_eq!(task.state(), Stopped);
        let _ = block_on(stopped_receiver.next());

        // purge remaining iteration received before stop if any
        let _ = try_next_ready_receiver.try_recv();

        gst::debug!(RUNTIME_CAT, "nominal: starting (after stop)");
        assert_eq!(
            task.start().block_on().unwrap(),
            Complete {
                origin: Stopped,
                target: Started,
            },
        );
        let _ = block_on(started_receiver.next());

        gst::debug!(RUNTIME_CAT, "nominal: stopping");
        assert_eq!(
            task.stop().block_on().unwrap(),
            Complete {
                origin: Started,
                target: Stopped,
            },
        );

        assert_eq!(
            task.unprepare().block_on().unwrap(),
            Complete {
                origin: Stopped,
                target: Unprepared,
            },
        );

        assert_eq!(task.state(), Unprepared);
        let _ = block_on(unprepared_receiver.next());
    }
1683
1684    #[test]
1685    fn prepare_error() {
            // Purpose: make sure an error returned by `prepare` is handed to
            // `handle_action_error`, fails the pending prepare transition and
            // leaves the task in the `Error` state, from which it can be unprepared.
1686        gst::init().unwrap();
1687
            // Task implementation whose `prepare` always returns an error.
1688        struct TaskPrepareTest {
1689            obj: gst::Object,
                // Notifies the test thread that `handle_action_error` was reached.
1690            prepare_error_sender: mpsc::Sender<()>,
1691        }
1692
1693        impl TaskImpl for TaskPrepareTest {
1694            type Item = ();
1695
1696            fn obj(&self) -> &impl IsA<glib::Object> {
1697                &self.obj
1698            }
1699
1700            async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> {
1701                gst::debug!(RUNTIME_CAT, "prepare_error: prepare returning an error");
1702                Err(gst::error_msg!(
1703                    gst::ResourceError::Failed,
1704                    ["prepare_error: intentional error"]
1705                ))
1706            }
1707
1708            async fn handle_action_error(
1709                &mut self,
1710                trigger: Trigger,
1711                state: TaskState,
1712                err: gst::ErrorMessage,
1713            ) -> Trigger {
1714                gst::debug!(
1715                    RUNTIME_CAT,
1716                    "prepare_error: handling prepare error {:?}",
1717                    err
1718                );
                    // The only failure expected in this test is the `Prepare` action
                    // requested from `Unprepared`; anything else is a test bug.
1719                match (trigger, state) {
1720                    (Prepare, Unprepared) => {
1721                        self.prepare_error_sender.send(()).await.unwrap();
1722                    }
1723                    other => unreachable!("{:?}", other),
1724                }
1725                Trigger::Error
1726            }
1727
                // The processing loop is expected never to run in this test.
1728            async fn try_next(&mut self) -> Result<(), gst::FlowError> {
1729                unreachable!("prepare_error: try_next");
1730            }
1731
1732            async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
1733                unreachable!("prepare_error: handle_item");
1734            }
1735        }
1736
1737        let context = Context::acquire("prepare_error", Duration::from_millis(2)).unwrap();
1738
1739        let task = Task::default();
1740
1741        assert_eq!(task.state(), Unprepared);
1742
1743        let (prepare_error_sender, mut prepare_error_receiver) = mpsc::channel(1);
1744        let prepare_status = task.prepare(
1745            TaskPrepareTest {
1746                obj: gst::Pad::builder(gst::PadDirection::Unknown)
1747                    .name("runtime::Task::prepare_error")
1748                    .build()
1749                    .into(),
1750                prepare_error_sender,
1751            },
1752            context,
1753        );
1754
1755        gst::debug!(
1756            RUNTIME_CAT,
1757            "prepare_error: await action error notification"
1758        );
1759        block_on(prepare_error_receiver.next()).unwrap();
1760
            // The prepare transition must fail, reporting the `Error` trigger
            // with the state `Preparing`.
1761        match prepare_status.block_on().unwrap_err() {
1762            TransitionError {
1763                trigger: Trigger::Error,
1764                state: Preparing,
1765                ..
1766            } => (),
1767            other => panic!("{other:?}"),
1768        }
1769
1770        // Wait for state machine to reach Error
1771        while TaskState::Error != task.state() {
1772            std::thread::sleep(Duration::from_millis(2));
1773        }
1774
            // A start request must be rejected while in the `Error` state.
1775        match task.start().block_on().unwrap_err() {
1776            TransitionError {
1777                trigger: Start,
1778                state: TaskState::Error,
1779                ..
1780            } => (),
1781            other => panic!("{other:?}"),
1782        }
1783
            // Unpreparing from the `Error` state must succeed.
1784        block_on(task.unprepare()).unwrap();
1785    }
1786
1787    #[test]
1788    fn prepare_start_ok() {
1789        // Hold the preparation function so that it completes after the start request is engaged
1790
1791        gst::init().unwrap();
1792
1793        struct TaskPrepareTest {
1794            obj: gst::Object,
                // `prepare` awaits a unit value from this receiver before completing.
1795            prepare_receiver: mpsc::Receiver<()>,
1796        }
1797
1798        impl TaskImpl for TaskPrepareTest {
1799            type Item = ();
1800
1801            fn obj(&self) -> &impl IsA<glib::Object> {
1802                &self.obj
1803            }
1804
1805            async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> {
1806                gst::debug!(
1807                    RUNTIME_CAT,
1808                    "prepare_start_ok: preparation awaiting trigger"
1809                );
1810                self.prepare_receiver.next().await.unwrap();
1811                gst::debug!(RUNTIME_CAT, "prepare_start_ok: preparation complete Ok");
1812                Ok(())
1813            }
1814
                // No action is expected to fail in this test.
1815            async fn handle_action_error(
1816                &mut self,
1817                _trigger: Trigger,
1818                _state: TaskState,
1819                _err: gst::ErrorMessage,
1820            ) -> Trigger {
1821                unreachable!("prepare_start_ok: handle_prepare_error");
1822            }
1823
1824            async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
1825                gst::debug!(RUNTIME_CAT, "prepare_start_ok: started");
1826                Ok(())
1827            }
1828
                // No item is produced: `handle_item` is expected never to be called.
                // NOTE(review): assumes `pending()` is the never-resolving future
                // from `futures` -- its import is outside of this section.
1829            async fn try_next(&mut self) -> Result<(), gst::FlowError> {
1830                pending().await
1831            }
1832
1833            async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
1834                unreachable!("prepare_start_ok: handle_item");
1835            }
1836        }
1837
1838        let context = Context::acquire("prepare_start_ok", Duration::from_millis(2)).unwrap();
1839
1840        let task = Task::default();
1841
1842        let (mut prepare_sender, prepare_receiver) = mpsc::channel(1);
1843        let fut = task.prepare(
1844            TaskPrepareTest {
1845                obj: gst::Pad::builder(gst::PadDirection::Unknown)
1846                    .name("runtime::Task::prepare_start_ok")
1847                    .build()
1848                    .into(),
1849                prepare_receiver,
1850            },
1851            context,
1852        );
            // The prepare status is dropped without being awaited.
1853        drop(fut);
1854
            // The remaining transitions are requested and awaited from a task
            // spawned on another Context.
1855        let start_ctx = Context::acquire("prepare_start_ok_requester", Duration::ZERO).unwrap();
1856        let (ready_sender, ready_receiver) = oneshot::channel();
1857        let start_handle = start_ctx.spawn(async move {
                // `prepare` is still being held at this point.
1858            assert_eq!(task.state(), Preparing);
1859            gst::debug!(RUNTIME_CAT, "prepare_start_ok: starting");
1860            let start_status = task.start();
1861            match start_status {
1862                Pending {
1863                    trigger: Start,
1864                    origin: Preparing,
1865                    ..
1866                } => (),
1867                other => panic!("{other:?}"),
1868            }
                // Tell the test thread that the start request was issued,
                // so that it can release `prepare`.
1869            ready_sender.send(()).unwrap();
1870            assert_eq!(
1871                start_status.await.unwrap(),
1872                Complete {
1873                    origin: Prepared,
1874                    target: Started,
1875                },
1876            );
1877            assert_eq!(task.state(), Started);
1878
1879            let stop_status = task.stop();
1880            match stop_status {
1881                Pending {
1882                    trigger: Stop,
1883                    origin: Started,
1884                    ..
1885                } => (),
1886                other => panic!("{other:?}"),
1887            }
1888            assert_eq!(
1889                stop_status.await.unwrap(),
1890                Complete {
1891                    origin: Started,
1892                    target: Stopped,
1893                },
1894            );
1895            assert_eq!(task.state(), Stopped);
1896
1897            let unprepare_status = task.unprepare();
1898            match unprepare_status {
1899                Pending {
1900                    trigger: Unprepare,
1901                    origin: Stopped,
1902                    ..
1903                } => (),
1904                other => panic!("{other:?}"),
1905            };
1906            assert_eq!(
1907                unprepare_status.await.unwrap(),
1908                Complete {
1909                    origin: Stopped,
1910                    target: Unprepared,
1911                },
1912            );
1913            assert_eq!(task.state(), Unprepared);
1914        });
1915
            // Only release `prepare` once the start request was issued.
1916        gst::debug!(RUNTIME_CAT, "prepare_start_ok: awaiting for start_ctx");
1917        block_on(ready_receiver).unwrap();
1918
1919        gst::debug!(RUNTIME_CAT, "prepare_start_ok: triggering preparation");
1920        block_on(prepare_sender.send(())).unwrap();
1921
            // Wait for the spawned task to complete its checks.
1922        block_on(start_handle).unwrap();
1923    }
1924
1925    #[test]
1926    fn prepare_start_error() {
1927        // Hold the preparation function so that it completes after the start request is engaged
1928
1929        gst::init().unwrap();
1930
1931        struct TaskPrepareTest {
1932            obj: gst::Object,
                // `prepare` awaits a unit value from this receiver before failing.
1933            prepare_receiver: mpsc::Receiver<()>,
                // Notifies the test thread that `handle_action_error` was reached.
1934            prepare_error_sender: mpsc::Sender<()>,
1935        }
1936
1937        impl TaskImpl for TaskPrepareTest {
1938            type Item = ();
1939
1940            fn obj(&self) -> &impl IsA<glib::Object> {
1941                &self.obj
1942            }
1943
1944            async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> {
1945                gst::debug!(
1946                    RUNTIME_CAT,
1947                    "prepare_start_error: preparation awaiting trigger"
1948                );
1949                self.prepare_receiver.next().await.unwrap();
1950                gst::debug!(RUNTIME_CAT, "prepare_start_error: preparation complete Err");
1951
1952                Err(gst::error_msg!(
1953                    gst::ResourceError::Failed,
1954                    ["prepare_start_error: intentional error"]
1955                ))
1956            }
1957
1958            async fn handle_action_error(
1959                &mut self,
1960                trigger: Trigger,
1961                state: TaskState,
1962                err: gst::ErrorMessage,
1963            ) -> Trigger {
1964                gst::debug!(
1965                    RUNTIME_CAT,
1966                    "prepare_start_error: handling prepare error {:?}",
1967                    err
1968                );
                    // The only failure expected in this test is the `Prepare` action
                    // requested from `Unprepared`.
1969                match (trigger, state) {
1970                    (Prepare, Unprepared) => {
1971                        self.prepare_error_sender.send(()).await.unwrap();
1972                    }
1973                    other => panic!("action error for {other:?}"),
1974                }
1975                Trigger::Error
1976            }
1977
                // Since preparation fails, neither the start action nor the
                // processing loop are expected to run.
1978            async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
1979                unreachable!("prepare_start_error: start");
1980            }
1981
1982            async fn try_next(&mut self) -> Result<(), gst::FlowError> {
1983                unreachable!("prepare_start_error: try_next");
1984            }
1985
1986            async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
1987                unreachable!("prepare_start_error: handle_item");
1988            }
1989        }
1990
1991        let context = Context::acquire("prepare_start_error", Duration::from_millis(2)).unwrap();
1992
1993        let task = Task::default();
1994
1995        let (mut prepare_sender, prepare_receiver) = mpsc::channel(1);
1996        let (prepare_error_sender, mut prepare_error_receiver) = mpsc::channel(1);
1997        let prepare_status = task.prepare(
1998            TaskPrepareTest {
1999                obj: gst::Pad::builder(gst::PadDirection::Unknown)
2000                    .name("runtime::Task::prepare_start_error")
2001                    .build()
2002                    .into(),
2003                prepare_receiver,
2004                prepare_error_sender,
2005            },
2006            context,
2007        );
2008        match prepare_status {
2009            Pending {
2010                trigger: Prepare,
2011                origin: Unprepared,
2012                ..
2013            } => (),
2014            other => panic!("{other:?}"),
2015        };
2016
            // The start request and the result checks run in a task spawned on
            // another Context.
2017        let start_ctx = Context::acquire("prepare_start_error_requester", Duration::ZERO).unwrap();
2018        let (ready_sender, ready_receiver) = oneshot::channel();
2019        let start_handle = start_ctx.spawn(async move {
2020            gst::debug!(RUNTIME_CAT, "prepare_start_error: starting (Err)");
                // The start status is dropped: the failure is checked on
                // `prepare_status` below.
2021            let fut = task.start();
2022            drop(fut);
                // Tell the test thread that the start request was issued,
                // so that it can release `prepare`.
2023            ready_sender.send(()).unwrap();
2024            // FIXME we lose the origin Trigger (Start)
2025            // and only get the Trigger returned by handle_action_error
2026            // see also: comment in exec_action!
2027            match prepare_status.await {
2028                Err(TransitionError {
2029                    trigger: Trigger::Error,
2030                    state: Preparing,
2031                    ..
2032                }) => (),
2033                other => panic!("{other:?}"),
2034            }
2035
                // Unpreparing from the `Error` state must succeed.
2036            let unprepare_status = task.unprepare();
2037            match unprepare_status {
2038                Pending {
2039                    trigger: Unprepare,
2040                    origin: TaskState::Error,
2041                    ..
2042                } => (),
2043                other => panic!("{other:?}"),
2044            };
2045            assert_eq!(
2046                unprepare_status.await.unwrap(),
2047                Complete {
2048                    origin: TaskState::Error,
2049                    target: Unprepared,
2050                },
2051            );
2052        });
2053
            // Only release `prepare` once the start request was issued.
2054        gst::debug!(RUNTIME_CAT, "prepare_start_error: awaiting for start_ctx");
2055        block_on(ready_receiver).unwrap();
2056
2057        gst::debug!(
2058            RUNTIME_CAT,
2059            "prepare_start_error: triggering preparation (failure)"
2060        );
2061        block_on(prepare_sender.send(())).unwrap();
2062
2063        gst::debug!(
2064            RUNTIME_CAT,
2065            "prepare_start_error: await prepare error notification"
2066        );
2067        block_on(prepare_error_receiver.next()).unwrap();
2068
            // Wait for the spawned task to complete its checks.
2069        block_on(start_handle).unwrap();
2070    }
2071
2072    #[test]
2073    fn item_error() {
            // Purpose: check the state reached when `handle_item` returns an error:
            // the test waits for `Stopped` after `FlowError::Eos` and for `Error`
            // after `FlowError::Error`.
2074        gst::init().unwrap();
2075
2076        struct TaskTest {
2077            obj: gst::Object,
                // Supplies the `FlowError` which `handle_item` will return.
2078            try_next_receiver: mpsc::Receiver<gst::FlowError>,
2079        }
2080
2081        impl TaskImpl for TaskTest {
2082            type Item = gst::FlowError;
2083
2084            fn obj(&self) -> &impl IsA<glib::Object> {
2085                &self.obj
2086            }
2087
                // Yields the next `FlowError` sent by the test thread as an item.
2088            async fn try_next(&mut self) -> Result<gst::FlowError, gst::FlowError> {
2089                gst::debug!(RUNTIME_CAT, "item_error: awaiting try_next");
2090                Ok(self.try_next_receiver.next().await.unwrap())
2091            }
2092
                // Returns the received item as the handling error.
2093            async fn handle_item(&mut self, item: gst::FlowError) -> Result<(), gst::FlowError> {
2094                gst::debug!(RUNTIME_CAT, "item_error: handle_item received {:?}", item);
2095                Err(item)
2096            }
2097        }
2098
2099        let context = Context::acquire("item_error", Duration::from_millis(2)).unwrap();
2100        let task = Task::default();
2101        gst::debug!(RUNTIME_CAT, "item_error: prepare and start");
2102        let (mut try_next_sender, try_next_receiver) = mpsc::channel(1);
2103        task.prepare(
2104            TaskTest {
2105                obj: gst::Pad::builder(gst::PadDirection::Unknown)
2106                    .name("runtime::Task::item_error")
2107                    .build()
2108                    .into(),
2109                try_next_receiver,
2110            },
2111            context,
2112        )
2113        .block_on()
2114        .unwrap();
2115        task.start().block_on().unwrap();
2116
2117        gst::debug!(RUNTIME_CAT, "item_error: req. handle_item to return Eos");
2118        block_on(try_next_sender.send(gst::FlowError::Eos)).unwrap();
2119        // Wait for state machine to reach Stopped
2120        while Stopped != task.state() {
2121            std::thread::sleep(Duration::from_millis(2));
2122        }
2123
            // The task can be started again from `Stopped`.
2124        gst::debug!(RUNTIME_CAT, "item_error: starting (after stop)");
2125        assert_eq!(
2126            task.start().block_on().unwrap(),
2127            Complete {
2128                origin: Stopped,
2129                target: Started,
2130            },
2131        );
2132
2133        gst::debug!(RUNTIME_CAT, "item_error: req. handle_item to return Error");
2134        block_on(try_next_sender.send(gst::FlowError::Error)).unwrap();
2135        // Wait for state machine to reach Error
2136        while TaskState::Error != task.state() {
2137            std::thread::sleep(Duration::from_millis(2));
2138        }
2139
            // A start request must be rejected while in the `Error` state.
2140        gst::debug!(RUNTIME_CAT, "item_error: attempting to start (after Error)");
2141        match task.start().block_on().unwrap_err() {
2142            TransitionError {
2143                trigger: Start,
2144                state: TaskState::Error,
2145                ..
2146            } => (),
2147            other => panic!("{other:?}"),
2148        }
2149
            // Unpreparing from the `Error` state must succeed.
2150        assert_eq!(
2151            task.unprepare().block_on().unwrap(),
2152            Complete {
2153                origin: TaskState::Error,
2154                target: Unprepared,
2155            },
2156        );
2157    }
2158
2159    #[test]
2160    fn flush_regular_sync() {
            // Purpose: run a flush start / flush stop sequence from the test thread,
            // waiting for each transition result with blocking calls.
2161        gst::init().unwrap();
2162
2163        struct TaskFlushTest {
2164            obj: gst::Object,
                // Notify the test thread that the matching flush action was executed.
2165            flush_start_sender: mpsc::Sender<()>,
2166            flush_stop_sender: mpsc::Sender<()>,
2167        }
2168
2169        impl TaskImpl for TaskFlushTest {
2170            type Item = ();
2171
2172            fn obj(&self) -> &impl IsA<glib::Object> {
2173                &self.obj
2174            }
2175
                // No item is produced: `handle_item` is expected never to be called.
                // NOTE(review): assumes `pending()` is the never-resolving future
                // from `futures` -- its import is outside of this section.
2176            async fn try_next(&mut self) -> Result<(), gst::FlowError> {
2177                pending().await
2178            }
2179
2180            async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
2181                unreachable!("flush_regular_sync: handle_item");
2182            }
2183
2184            async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
2185                gst::debug!(RUNTIME_CAT, "flush_regular_sync: started flushing");
2186                self.flush_start_sender.send(()).await.unwrap();
2187                Ok(())
2188            }
2189
2190            async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
2191                gst::debug!(RUNTIME_CAT, "flush_regular_sync: stopped flushing");
2192                self.flush_stop_sender.send(()).await.unwrap();
2193                Ok(())
2194            }
2195        }
2196
2197        let context = Context::acquire("flush_regular_sync", Duration::from_millis(2)).unwrap();
2198
2199        let task = Task::default();
2200
2201        let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
2202        let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
            // Kept outside of the task impl too: it is passed to
            // `block_on_or_add_subtask` below.
2203        let obj = gst::Pad::builder(gst::PadDirection::Unknown)
2204            .name("runtime::Task::flush_regular_sync")
2205            .build();
2206        let fut = task.prepare(
2207            TaskFlushTest {
2208                obj: obj.clone().into(),
2209                flush_start_sender,
2210                flush_stop_sender,
2211            },
2212            context,
2213        );
            // The prepare status is dropped without being awaited.
2214        drop(fut);
2215
2216        gst::debug!(RUNTIME_CAT, "flush_regular_sync: start");
2217        block_on(task.start()).unwrap();
2218
2219        gst::debug!(RUNTIME_CAT, "flush_regular_sync: starting flush");
2220        assert_eq!(
2221            task.flush_start().block_on().unwrap(),
2222            Complete {
2223                origin: Started,
2224                target: Flushing,
2225            },
2226        );
2227        assert_eq!(task.state(), Flushing);
2228
            // Consume the notification sent by the `flush_start` action.
2229        block_on(flush_start_receiver.next()).unwrap();
2230
2231        gst::debug!(RUNTIME_CAT, "flush_regular_sync: stopping flush");
            // Called directly from the test thread: the call is expected to
            // return the `Complete` transition result.
2232        assert_eq!(
2233            task.flush_stop().block_on_or_add_subtask(&obj).unwrap(),
2234            Complete {
2235                origin: Flushing,
2236                target: Started,
2237            },
2238        );
2239        assert_eq!(task.state(), Started);
2240
            // Consume the notification sent by the `flush_stop` action.
2241        block_on(flush_stop_receiver.next()).unwrap();
2242
            // Request a pause without waiting for its result, then tear down.
            // NOTE(review): `stop_then_unprepare` is defined outside of this section.
2243        let fut = task.pause();
2244        drop(fut);
2245        stop_then_unprepare(task);
2246    }
2247
2248    #[test]
2249    fn flush_regular_different_context() {
2250        // Purpose: make sure a flush sequence triggered from a Context doesn't block.
2251        gst::init().unwrap();
2252
2253        struct TaskFlushTest {
2254            obj: gst::Object,
                // Notify the test that the matching flush action was executed.
2255            flush_start_sender: mpsc::Sender<()>,
2256            flush_stop_sender: mpsc::Sender<()>,
2257        }
2258
2259        impl TaskImpl for TaskFlushTest {
2260            type Item = ();
2261
2262            fn obj(&self) -> &impl IsA<glib::Object> {
2263                &self.obj
2264            }
2265
                // No item is produced: `handle_item` is expected never to be called.
                // NOTE(review): assumes `pending()` is the never-resolving future
                // from `futures` -- its import is outside of this section.
2266            async fn try_next(&mut self) -> Result<(), gst::FlowError> {
2267                pending().await
2268            }
2269
2270            async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
2271                unreachable!("flush_regular_different_context: handle_item");
2272            }
2273
2274            async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
2275                gst::debug!(
2276                    RUNTIME_CAT,
2277                    "flush_regular_different_context: started flushing"
2278                );
2279                self.flush_start_sender.send(()).await.unwrap();
2280                Ok(())
2281            }
2282
2283            async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
2284                gst::debug!(
2285                    RUNTIME_CAT,
2286                    "flush_regular_different_context: stopped flushing"
2287                );
2288                self.flush_stop_sender.send(()).await.unwrap();
2289                Ok(())
2290            }
2291        }
2292
2293        let context =
2294            Context::acquire("flush_regular_different_context", Duration::from_millis(2)).unwrap();
2295
2296        let task = Task::default();
2297
2298        let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
2299        let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
            // Kept outside of the task impl too: it is passed to
            // `block_on_or_add_subtask` below.
2300        let obj = gst::Pad::builder(gst::PadDirection::Unknown)
2301            .name("runtime::Task::flush_regular_different_context")
2302            .build();
2303        let fut = task.prepare(
2304            TaskFlushTest {
2305                obj: obj.clone().into(),
2306                flush_start_sender,
2307                flush_stop_sender,
2308            },
2309            context,
2310        );
            // The prepare status is dropped without being awaited.
2311        drop(fut);
2312
2313        gst::debug!(RUNTIME_CAT, "flush_regular_different_context: start");
2314        task.start().block_on().unwrap();
2315
            // Separate Context on which the flush sequence is spawned.
2316        let oob_context = Context::acquire(
2317            "flush_regular_different_context_oob",
2318            Duration::from_millis(2),
2319        )
2320        .unwrap();
2321
2322        let task_clone = task.clone();
2323        let flush_res_fut = oob_context.spawn(async move {
2324            let flush_start_status = task_clone.flush_start();
2325            match flush_start_status {
2326                Pending {
2327                    trigger: FlushStart,
2328                    origin: Started,
2329                    ..
2330                } => (),
2331                other => panic!("{other:?}"),
2332            };
2333            assert_eq!(
2334                flush_start_status.await.unwrap(),
2335                Complete {
2336                    origin: Started,
2337                    target: Flushing,
2338                },
2339            );
2340            assert_eq!(task_clone.state(), Flushing);
                // Consume the notification sent by the `flush_start` action.
2341            flush_start_receiver.next().await.unwrap();
2342
2343            let flush_stop_status = task_clone.flush_stop();
2344            match flush_stop_status {
2345                Pending {
2346                    trigger: FlushStop,
2347                    origin: Flushing,
2348                    ..
2349                } => (),
2350                other => panic!("{other:?}"),
2351            };
                // From within this spawned task, the call is expected to return
                // `NotWaiting` instead of the transition result.
2352            assert_eq!(
2353                flush_stop_status.block_on_or_add_subtask(&obj).unwrap(),
2354                NotWaiting {
2355                    trigger: FlushStop,
2356                    origin: Flushing,
2357                },
2358            );
2359
                // After draining the sub tasks, the task must be back to `Started`.
2360            Context::drain_sub_tasks().await.unwrap();
2361            assert_eq!(task_clone.state(), Started);
2362        });
2363
            // Wait for the spawned flush sequence, then consume the notification
            // sent by the `flush_stop` action.
2364        block_on(flush_res_fut).unwrap();
2365        block_on(flush_stop_receiver.next()).unwrap();
2366
            // NOTE(review): `stop_then_unprepare` is defined outside of this section.
2367        stop_then_unprepare(task);
2368    }
2369
2370    #[test]
2371    fn flush_regular_same_context() {
2372        // Purpose: make sure a flush sequence triggered from the same Context doesn't block.
2373        gst::init().unwrap();
2374
2375        struct TaskFlushTest {
2376            obj: gst::Object,
                // Notify the test that the matching flush action was executed.
2377            flush_start_sender: mpsc::Sender<()>,
2378            flush_stop_sender: mpsc::Sender<()>,
2379        }
2380
2381        impl TaskImpl for TaskFlushTest {
2382            type Item = ();
2383
2384            fn obj(&self) -> &impl IsA<glib::Object> {
2385                &self.obj
2386            }
2387
                // No item is produced: `handle_item` is expected never to be called.
                // NOTE(review): assumes `pending()` is the never-resolving future
                // from `futures` -- its import is outside of this section.
2388            async fn try_next(&mut self) -> Result<(), gst::FlowError> {
2389                pending().await
2390            }
2391
2392            async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
2393                unreachable!("flush_regular_same_context: handle_item");
2394            }
2395
2396            async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
2397                gst::debug!(RUNTIME_CAT, "flush_regular_same_context: started flushing");
2398                self.flush_start_sender.send(()).await.unwrap();
2399                Ok(())
2400            }
2401
2402            async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
2403                gst::debug!(RUNTIME_CAT, "flush_regular_same_context: stopped flushing");
2404                self.flush_stop_sender.send(()).await.unwrap();
2405                Ok(())
2406            }
2407        }
2408
2409        let context =
2410            Context::acquire("flush_regular_same_context", Duration::from_millis(2)).unwrap();
2411
2412        let task = Task::default();
2413
2414        let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
2415        let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
2416        let fut = task.prepare(
2417            TaskFlushTest {
2418                obj: gst::Pad::builder(gst::PadDirection::Unknown)
2419                    .name("runtime::Task::flush_regular_same_context")
2420                    .build()
2421                    .into(),
2422                flush_start_sender,
2423                flush_stop_sender,
2424            },
                // A clone is passed: `context` is used again below to spawn
                // the flush sequence.
2425            context.clone(),
2426        );
            // The prepare status is dropped without being awaited.
2427        drop(fut);
2428
2429        block_on(task.start()).unwrap();
2430
            // The flush sequence is spawned on the Context the task was prepared with.
2431        let task_clone = task.clone();
2432        let flush_handle = context.spawn(async move {
2433            let flush_start_status = task_clone.flush_start();
2434            match flush_start_status {
2435                Pending {
2436                    trigger: FlushStart,
2437                    origin: Started,
2438                    ..
2439                } => (),
2440                other => panic!("{other:?}"),
2441            };
2442            assert_eq!(
2443                flush_start_status.await.unwrap(),
2444                Complete {
2445                    origin: Started,
2446                    target: Flushing,
2447                },
2448            );
2449            assert_eq!(task_clone.state(), Flushing);
                // Consume the notification sent by the `flush_start` action.
2450            flush_start_receiver.next().await.unwrap();
2451
2452            let flush_stop_status = task_clone.flush_stop();
2453            match flush_stop_status {
2454                Pending {
2455                    trigger: FlushStop,
2456                    origin: Flushing,
2457                    ..
2458                } => (),
2459                other => panic!("{other:?}"),
2460            };
                // Here the status is awaited and must resolve to the complete
                // transition result.
2461            assert_eq!(
2462                flush_stop_status.await.unwrap(),
2463                Complete {
2464                    origin: Flushing,
2465                    target: Started,
2466                },
2467            );
2468            assert_eq!(task_clone.state(), Started);
2469        });
2470
            // Wait for the spawned flush sequence, then consume the notification
            // sent by the `flush_stop` action.
2471        block_on(flush_handle).unwrap();
2472        block_on(flush_stop_receiver.next()).unwrap();
2473
            // NOTE(review): `stop_then_unprepare` is defined outside of this section.
2474        stop_then_unprepare(task);
2475    }
2476
2477    #[test]
2478    fn flush_from_loop() {
2479        // Purpose: make sure a flush_start triggered from an iteration doesn't block.
2480        gst::init().unwrap();
2481
2482        struct TaskFlushTest {
2483            obj: gst::Object,
                // Handle on the task itself, used to request `flush_start`
                // from within `handle_item`.
2484            task: Task,
                // Notifies the test thread that the `flush_start` action was executed.
2485            flush_start_sender: mpsc::Sender<()>,
2486        }
2487
2488        impl TaskImpl for TaskFlushTest {
2489            type Item = ();
2490
2491            fn obj(&self) -> &impl IsA<glib::Object> {
2492                &self.obj
2493            }
2494
                // Yields an item immediately so that `handle_item` gets called.
2495            async fn try_next(&mut self) -> Result<(), gst::FlowError> {
2496                Ok(())
2497            }
2498
2499            async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
2500                gst::debug!(RUNTIME_CAT, "flush_from_loop: flush_start from handle_item");
                    // The returned status is only inspected, it is not awaited.
2501                match self.task.flush_start() {
2502                    Pending {
2503                        trigger: FlushStart,
2504                        origin: Started,
2505                        ..
2506                    } => (),
2507                    other => panic!("{other:?}"),
2508                }
2509                Ok(())
2510            }
2511
2512            async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
2513                gst::debug!(RUNTIME_CAT, "flush_from_loop: started flushing");
2514                self.flush_start_sender.send(()).await.unwrap();
2515                Ok(())
2516            }
2517        }
2518
2519        let context = Context::acquire("flush_from_loop", Duration::from_millis(2)).unwrap();
2520
2521        let task = Task::default();
2522
2523        let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
2524        let fut = task.prepare(
2525            TaskFlushTest {
2526                obj: gst::Pad::builder(gst::PadDirection::Unknown)
2527                    .name("runtime::Task::flush_from_loop")
2528                    .build()
2529                    .into(),
2530                task: task.clone(),
2531                flush_start_sender,
2532            },
2533            context,
2534        );
            // The prepare and start statuses are dropped without being awaited:
            // the test synchronizes on the `flush_start` notification instead.
2535        drop(fut);
2536
2537        let fut = task.start();
2538        drop(fut);
2539
2540        gst::debug!(
2541            RUNTIME_CAT,
2542            "flush_from_loop: awaiting flush_start notification"
2543        );
2544        block_on(flush_start_receiver.next()).unwrap();
2545
            // The stop transition is expected to originate from `Flushing`.
2546        assert_eq!(
2547            task.stop().block_on().unwrap(),
2548            Complete {
2549                origin: Flushing,
2550                target: Stopped,
2551            },
2552        );
2553        task.unprepare().block_on().unwrap();
2554    }
2555
2556    #[test]
2557    fn pause_from_loop() {
2558        // Purpose: make sure a start triggered from an iteration doesn't block.
2559        // E.g. an auto pause cancellation after a delay.
2560        gst::init().unwrap();
2561
2562        struct TaskStartTest {
2563            obj: gst::Object,
2564            task: Task,
2565            pause_sender: mpsc::Sender<()>,
2566        }
2567
2568        impl TaskImpl for TaskStartTest {
2569            type Item = ();
2570
2571            fn obj(&self) -> &impl IsA<glib::Object> {
2572                &self.obj
2573            }
2574
2575            async fn try_next(&mut self) -> Result<(), gst::FlowError> {
2576                Ok(())
2577            }
2578
2579            async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
2580                gst::debug!(RUNTIME_CAT, "pause_from_loop: entering handle_item");
2581
2582                crate::runtime::timer::delay_for(Duration::from_millis(50)).await;
2583
2584                gst::debug!(RUNTIME_CAT, "pause_from_loop: pause from handle_item");
2585                match self.task.pause() {
2586                    Pending {
2587                        trigger: Pause,
2588                        origin: Started,
2589                        ..
2590                    } => (),
2591                    other => panic!("{other:?}"),
2592                }
2593
2594                Ok(())
2595            }
2596
2597            async fn pause(&mut self) -> Result<(), gst::ErrorMessage> {
2598                gst::debug!(RUNTIME_CAT, "pause_from_loop: entering pause action");
2599                self.pause_sender.send(()).await.unwrap();
2600                Ok(())
2601            }
2602        }
2603
2604        let context = Context::acquire("pause_from_loop", Duration::from_millis(2)).unwrap();
2605
2606        let task = Task::default();
2607
2608        let (pause_sender, mut pause_receiver) = mpsc::channel(1);
2609        let fut = task.prepare(
2610            TaskStartTest {
2611                obj: gst::Pad::builder(gst::PadDirection::Unknown)
2612                    .name("runtime::Task::pause_from_loop")
2613                    .build()
2614                    .into(),
2615                task: task.clone(),
2616                pause_sender,
2617            },
2618            context,
2619        );
2620        drop(fut);
2621
2622        let fut = task.start();
2623        drop(fut);
2624
2625        gst::debug!(RUNTIME_CAT, "pause_from_loop: awaiting pause notification");
2626        block_on(pause_receiver.next()).unwrap();
2627
2628        stop_then_unprepare(task);
2629    }
2630
2631    #[test]
2632    fn trigger_from_action() {
2633        // Purpose: make sure an event triggered from a transition action doesn't block.
2634        gst::init().unwrap();
2635
2636        struct TaskFlushTest {
2637            obj: gst::Object,
2638            task: Task,
2639            flush_stop_sender: mpsc::Sender<()>,
2640        }
2641
2642        impl TaskImpl for TaskFlushTest {
2643            type Item = ();
2644
2645            fn obj(&self) -> &impl IsA<glib::Object> {
2646                &self.obj
2647            }
2648
2649            async fn try_next(&mut self) -> Result<(), gst::FlowError> {
2650                pending().await
2651            }
2652
2653            async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
2654                unreachable!("trigger_from_action: handle_item");
2655            }
2656
2657            async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
2658                gst::debug!(
2659                    RUNTIME_CAT,
2660                    "trigger_from_action: flush_start triggering flush_stop"
2661                );
2662                match self.task.flush_stop() {
2663                    Pending {
2664                        trigger: FlushStop,
2665                        origin: Started,
2666                        ..
2667                    } => (),
2668                    other => panic!("{other:?}"),
2669                }
2670
2671                Ok(())
2672            }
2673
2674            async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
2675                gst::debug!(RUNTIME_CAT, "trigger_from_action: stopped flushing");
2676                self.flush_stop_sender.send(()).await.unwrap();
2677                Ok(())
2678            }
2679        }
2680
2681        let context = Context::acquire("trigger_from_action", Duration::from_millis(2)).unwrap();
2682
2683        let task = Task::default();
2684
2685        let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
2686        let fut = task.prepare(
2687            TaskFlushTest {
2688                obj: gst::Pad::builder(gst::PadDirection::Unknown)
2689                    .name("runtime::Task::trigger_from_action")
2690                    .build()
2691                    .into(),
2692                task: task.clone(),
2693                flush_stop_sender,
2694            },
2695            context,
2696        );
2697        drop(fut);
2698
2699        task.start().block_on().unwrap();
2700        let fut = task.flush_start();
2701        drop(fut);
2702
2703        gst::debug!(
2704            RUNTIME_CAT,
2705            "trigger_from_action: awaiting flush_stop notification"
2706        );
2707        block_on(flush_stop_receiver.next()).unwrap();
2708
2709        stop_then_unprepare(task);
2710    }
2711
2712    #[test]
2713    fn pause_flush_start() {
2714        gst::init().unwrap();
2715
2716        struct TaskFlushTest {
2717            obj: gst::Object,
2718            started_sender: mpsc::Sender<()>,
2719            flush_start_sender: mpsc::Sender<()>,
2720            flush_stop_sender: mpsc::Sender<()>,
2721        }
2722
2723        impl TaskImpl for TaskFlushTest {
2724            type Item = ();
2725
2726            fn obj(&self) -> &impl IsA<glib::Object> {
2727                &self.obj
2728            }
2729
2730            async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
2731                gst::debug!(RUNTIME_CAT, "pause_flush_start: started");
2732                self.started_sender.send(()).await.unwrap();
2733                Ok(())
2734            }
2735
2736            async fn try_next(&mut self) -> Result<(), gst::FlowError> {
2737                pending().await
2738            }
2739
2740            async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
2741                unreachable!("pause_flush_start: handle_item");
2742            }
2743
2744            async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
2745                gst::debug!(RUNTIME_CAT, "pause_flush_start: started flushing");
2746                self.flush_start_sender.send(()).await.unwrap();
2747                Ok(())
2748            }
2749
2750            async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
2751                gst::debug!(RUNTIME_CAT, "pause_flush_start: stopped flushing");
2752                self.flush_stop_sender.send(()).await.unwrap();
2753                Ok(())
2754            }
2755        }
2756
2757        let context = Context::acquire("pause_flush_start", Duration::from_millis(2)).unwrap();
2758
2759        let task = Task::default();
2760
2761        let (started_sender, mut started_receiver) = mpsc::channel(1);
2762        let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
2763        let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
2764        let fut = task.prepare(
2765            TaskFlushTest {
2766                obj: gst::Pad::builder(gst::PadDirection::Unknown)
2767                    .name("runtime::Task::pause_flush_start")
2768                    .build()
2769                    .into(),
2770                started_sender,
2771                flush_start_sender,
2772                flush_stop_sender,
2773            },
2774            context,
2775        );
2776        drop(fut);
2777
2778        // Pause, FlushStart, FlushStop, Start
2779
2780        gst::debug!(RUNTIME_CAT, "pause_flush_start: pausing");
2781        assert_eq!(
2782            task.pause().block_on().unwrap(),
2783            Complete {
2784                origin: Prepared,
2785                target: Paused,
2786            },
2787        );
2788
2789        gst::debug!(RUNTIME_CAT, "pause_flush_start: starting flush");
2790        assert_eq!(
2791            task.flush_start().block_on().unwrap(),
2792            Complete {
2793                origin: Paused,
2794                target: PausedFlushing,
2795            },
2796        );
2797        assert_eq!(task.state(), PausedFlushing);
2798        block_on(flush_start_receiver.next());
2799
2800        gst::debug!(RUNTIME_CAT, "pause_flush_start: stopping flush");
2801        assert_eq!(
2802            task.flush_stop().block_on().unwrap(),
2803            Complete {
2804                origin: PausedFlushing,
2805                target: Paused,
2806            },
2807        );
2808        assert_eq!(task.state(), Paused);
2809        block_on(flush_stop_receiver.next());
2810
2811        // start action not executed
2812        started_receiver.try_recv().unwrap_err();
2813
2814        gst::debug!(RUNTIME_CAT, "pause_flush_start: starting after flushing");
2815        assert_eq!(
2816            task.start().block_on().unwrap(),
2817            Complete {
2818                origin: Paused,
2819                target: Started,
2820            },
2821        );
2822        assert_eq!(task.state(), Started);
2823        block_on(started_receiver.next());
2824
2825        stop_then_unprepare(task);
2826    }
2827
2828    #[test]
2829    fn pause_flushing_start() {
2830        gst::init().unwrap();
2831
2832        struct TaskFlushTest {
2833            obj: gst::Object,
2834            started_sender: mpsc::Sender<()>,
2835            flush_start_sender: mpsc::Sender<()>,
2836            flush_stop_sender: mpsc::Sender<()>,
2837        }
2838
2839        impl TaskImpl for TaskFlushTest {
2840            type Item = ();
2841
2842            fn obj(&self) -> &impl IsA<glib::Object> {
2843                &self.obj
2844            }
2845
2846            async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
2847                gst::debug!(RUNTIME_CAT, "pause_flushing_start: started");
2848                self.started_sender.send(()).await.unwrap();
2849                Ok(())
2850            }
2851
2852            async fn try_next(&mut self) -> Result<(), gst::FlowError> {
2853                pending().await
2854            }
2855
2856            async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
2857                unreachable!("pause_flushing_start: handle_item");
2858            }
2859
2860            async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
2861                gst::debug!(RUNTIME_CAT, "pause_flushing_start: started flushing");
2862                self.flush_start_sender.send(()).await.unwrap();
2863                Ok(())
2864            }
2865
2866            async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
2867                gst::debug!(RUNTIME_CAT, "pause_flushing_start: stopped flushing");
2868                self.flush_stop_sender.send(()).await.unwrap();
2869                Ok(())
2870            }
2871        }
2872
2873        let context = Context::acquire("pause_flushing_start", Duration::from_millis(2)).unwrap();
2874
2875        let task = Task::default();
2876
2877        let (started_sender, mut started_receiver) = mpsc::channel(1);
2878        let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
2879        let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
2880        let fut = task.prepare(
2881            TaskFlushTest {
2882                obj: gst::Pad::builder(gst::PadDirection::Unknown)
2883                    .name("runtime::Task::pause_flushing_start")
2884                    .build()
2885                    .into(),
2886                started_sender,
2887                flush_start_sender,
2888                flush_stop_sender,
2889            },
2890            context,
2891        );
2892        drop(fut);
2893
2894        // Pause, FlushStart, Start, FlushStop
2895
2896        gst::debug!(RUNTIME_CAT, "pause_flushing_start: pausing");
2897        let fut = task.pause();
2898        drop(fut);
2899
2900        gst::debug!(RUNTIME_CAT, "pause_flushing_start: starting flush");
2901        block_on(task.flush_start()).unwrap();
2902        assert_eq!(task.state(), PausedFlushing);
2903        block_on(flush_start_receiver.next());
2904
2905        gst::debug!(RUNTIME_CAT, "pause_flushing_start: starting while flushing");
2906        assert_eq!(
2907            task.start().block_on().unwrap(),
2908            Complete {
2909                origin: PausedFlushing,
2910                target: Flushing,
2911            },
2912        );
2913        assert_eq!(task.state(), Flushing);
2914
2915        // start action not executed
2916        started_receiver.try_recv().unwrap_err();
2917
2918        gst::debug!(RUNTIME_CAT, "pause_flushing_start: stopping flush");
2919        assert_eq!(
2920            task.flush_stop().block_on().unwrap(),
2921            Complete {
2922                origin: Flushing,
2923                target: Started,
2924            },
2925        );
2926        assert_eq!(task.state(), Started);
2927        block_on(flush_stop_receiver.next());
2928        block_on(started_receiver.next());
2929
2930        stop_then_unprepare(task);
2931    }
2932
2933    #[test]
2934    fn flush_concurrent_start() {
2935        // Purpose: check the racy case of start being triggered in // after flush_start
2936        // e.g.: a FlushStart event received on a Pad and the element starting after a Pause
2937        gst::init().unwrap();
2938
2939        struct TaskStartTest {
2940            obj: gst::Object,
2941            flush_start_sender: mpsc::Sender<()>,
2942            flush_stop_sender: mpsc::Sender<()>,
2943        }
2944
2945        impl TaskImpl for TaskStartTest {
2946            type Item = ();
2947
2948            fn obj(&self) -> &impl IsA<glib::Object> {
2949                &self.obj
2950            }
2951
2952            async fn try_next(&mut self) -> Result<(), gst::FlowError> {
2953                pending().await
2954            }
2955
2956            async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
2957                unreachable!("flush_concurrent_start: handle_item");
2958            }
2959
2960            async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
2961                gst::debug!(RUNTIME_CAT, "flush_concurrent_start: started flushing");
2962                self.flush_start_sender.send(()).await.unwrap();
2963                Ok(())
2964            }
2965
2966            async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
2967                gst::debug!(RUNTIME_CAT, "flush_concurrent_start: stopped flushing");
2968                self.flush_stop_sender.send(()).await.unwrap();
2969                Ok(())
2970            }
2971        }
2972
2973        let context = Context::acquire("flush_concurrent_start", Duration::from_millis(2)).unwrap();
2974
2975        let task = Task::default();
2976
2977        let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
2978        let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
2979        let fut = task.prepare(
2980            TaskStartTest {
2981                obj: gst::Pad::builder(gst::PadDirection::Unknown)
2982                    .name("runtime::Task::flush_concurrent_start")
2983                    .build()
2984                    .into(),
2985                flush_start_sender,
2986                flush_stop_sender,
2987            },
2988            context,
2989        );
2990        drop(fut);
2991
2992        let oob_context =
2993            Context::acquire("flush_concurrent_start_oob", Duration::from_millis(2)).unwrap();
2994        let task_clone = task.clone();
2995
2996        block_on(task.pause()).unwrap();
2997
2998        // Launch flush_start // start
2999        let (ready_sender, ready_receiver) = oneshot::channel();
3000        gst::debug!(RUNTIME_CAT, "flush_concurrent_start: spawning flush_start");
3001        let flush_start_handle = oob_context.spawn(async move {
3002            gst::debug!(RUNTIME_CAT, "flush_concurrent_start: // flush_start");
3003            ready_sender.send(()).unwrap();
3004            let status = task_clone.flush_start();
3005            match status {
3006                Pending {
3007                    trigger: FlushStart,
3008                    origin: Paused,
3009                    ..
3010                } => (),
3011                Pending {
3012                    trigger: FlushStart,
3013                    origin: Started,
3014                    ..
3015                } => (),
3016                other => panic!("{other:?}"),
3017            };
3018            status.await.unwrap();
3019            flush_start_receiver.next().await.unwrap();
3020        });
3021
3022        gst::debug!(
3023            RUNTIME_CAT,
3024            "flush_concurrent_start: awaiting for oob_context"
3025        );
3026        block_on(ready_receiver).unwrap();
3027
3028        gst::debug!(RUNTIME_CAT, "flush_concurrent_start: // start");
3029        match block_on(task.start()) {
3030            Ok(Complete {
3031                origin: Paused,
3032                target: Started,
3033            }) => (),
3034            Ok(Complete {
3035                origin: PausedFlushing,
3036                target: Flushing,
3037            }) => (),
3038            other => panic!("{other:?}"),
3039        }
3040
3041        block_on(flush_start_handle).unwrap();
3042
3043        gst::debug!(RUNTIME_CAT, "flush_concurrent_start: requesting flush_stop");
3044        assert_eq!(
3045            task.flush_stop().block_on().unwrap(),
3046            Complete {
3047                origin: Flushing,
3048                target: Started,
3049            },
3050        );
3051        assert_eq!(task.state(), Started);
3052        block_on(flush_stop_receiver.next());
3053
3054        stop_then_unprepare(task);
3055    }
3056
3057    #[test]
3058    fn start_timer() {
3059        use crate::runtime::timer;
3060
3061        // Purpose: make sure a Timer initialized in a transition is
3062        // available when iterating in the loop.
3063        gst::init().unwrap();
3064
3065        struct TaskTimerTest {
3066            obj: gst::Object,
3067            timer: Option<timer::Oneshot>,
3068            timer_elapsed_sender: Option<oneshot::Sender<()>>,
3069        }
3070
3071        impl TaskImpl for TaskTimerTest {
3072            type Item = ();
3073
3074            fn obj(&self) -> &impl IsA<glib::Object> {
3075                &self.obj
3076            }
3077
3078            async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
3079                self.timer = Some(crate::runtime::timer::delay_for(Duration::from_millis(50)));
3080                gst::debug!(RUNTIME_CAT, "start_timer: started");
3081                Ok(())
3082            }
3083
3084            async fn try_next(&mut self) -> Result<(), gst::FlowError> {
3085                gst::debug!(RUNTIME_CAT, "start_timer: awaiting timer");
3086                self.timer.take().unwrap().await;
3087                Ok(())
3088            }
3089
3090            async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
3091                gst::debug!(RUNTIME_CAT, "start_timer: timer elapsed");
3092                if let Some(timer_elapsed_sender) = self.timer_elapsed_sender.take() {
3093                    timer_elapsed_sender.send(()).unwrap();
3094                }
3095
3096                Err(gst::FlowError::Eos)
3097            }
3098        }
3099
3100        let context = Context::acquire("start_timer", Duration::from_millis(2)).unwrap();
3101
3102        let task = Task::default();
3103
3104        let (timer_elapsed_sender, timer_elapsed_receiver) = oneshot::channel();
3105        let fut = task.prepare(
3106            TaskTimerTest {
3107                obj: gst::Pad::builder(gst::PadDirection::Unknown)
3108                    .name("runtime::Task::start_timer")
3109                    .build()
3110                    .into(),
3111                timer: None,
3112                timer_elapsed_sender: Some(timer_elapsed_sender),
3113            },
3114            context,
3115        );
3116        drop(fut);
3117
3118        gst::debug!(RUNTIME_CAT, "start_timer: start");
3119        let fut = task.start();
3120        drop(fut);
3121
3122        block_on(timer_elapsed_receiver).unwrap();
3123        gst::debug!(RUNTIME_CAT, "start_timer: timer elapsed received");
3124
3125        stop_then_unprepare(task);
3126    }
3127}