Skip to main content

gstthreadshare/runtime/
task.rs

1// Copyright (C) 2019-2022 François Laignel <fengalin@free.fr>
2// Copyright (C) 2020 Sebastian Dröge <sebastian@centricular.com>
3//
4// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
5// If a copy of the MPL was not distributed with this file, You can obtain one at
6// <https://mozilla.org/MPL/2.0/>.
7//
8// SPDX-License-Identifier: MPL-2.0
9
10//! An execution loop to run asynchronous processing.
11
12use futures::channel::mpsc as async_mpsc;
13use futures::channel::oneshot;
14use futures::prelude::*;
15
16use std::fmt;
17use std::ops::Deref;
18use std::pin::{Pin, pin};
19use std::sync::{Arc, Mutex, MutexGuard};
20use std::task::Poll;
21
22use gst::glib;
23use gst::glib::prelude::*;
24
25use super::{Context, JoinHandle, RUNTIME_CAT};
26
/// State of a `Task`, as stored in its inner data and updated on transitions.
///
/// `Ord` / `PartialOrd` are derived: the ordering of the states follows the
/// declaration order of the variants below.
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy)]
pub enum TaskState {
    /// Set after an unrecoverable error was reported for a triggering event.
    Error,
    Flushing,
    Paused,
    PausedFlushing,
    Prepared,
    /// Set by `Task::prepare` right before the state machine is spawned.
    Preparing,
    Started,
    Stopped,
    /// Initial state of a `Task`.
    Unprepared,
}
39
/// Event which triggers a state transition of a `Task`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Trigger {
    /// Default `Trigger` returned by the `TaskImpl` error handlers.
    Error,
    /// Default `Trigger` returned by `TaskImpl::handle_loop_error` for `FlowError::Flushing`.
    FlushStart,
    FlushStop,
    Pause,
    /// Pushed by `Task::prepare`.
    Prepare,
    /// Pushed by `Task::start`.
    Start,
    /// Default `Trigger` returned by `TaskImpl::handle_loop_error` for `FlowError::Eos`.
    Stop,
    /// Pushed by `Task::unprepare`.
    Unprepare,
}
51
/// Transition success details.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TransitionOk {
    /// Transition completed successfully.
    Complete {
        /// State before the transition.
        origin: TaskState,
        /// State after the transition.
        target: TaskState,
    },
    /// Not waiting for transition result.
    ///
    /// This is to prevent:
    /// - A deadlock when executing a transition action.
    /// - A potential infinite wait when pausing a running loop
    ///   which could be awaiting for an `nominal` to complete.
    ///
    /// Carries the `trigger` which was requested and the `origin` state
    /// at the time of the request.
    NotWaiting { trigger: Trigger, origin: TaskState },
    /// Skipping triggering event due to current state.
    ///
    /// Carries the skipped `trigger` and the `state` which caused the skip.
    Skipped { trigger: Trigger, state: TaskState },
}
70
/// TriggeringEvent error details.
///
/// Can be converted into a `gst::ErrorMessage`, in which case only
/// `err_msg` is retained.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TransitionError {
    /// The `Trigger` for which the error is reported.
    pub trigger: Trigger,
    /// The `TaskState` reported along with the error.
    pub state: TaskState,
    /// The detailed error message.
    pub err_msg: gst::ErrorMessage,
}
78
79impl fmt::Display for TransitionError {
80    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
81        write!(
82            f,
83            "{:?} from state {:?}: {:?}",
84            self.trigger, self.state, self.err_msg
85        )
86    }
87}
88
// Marker impl: relies on the `Debug` & `Display` impls, no `source` is exposed.
impl std::error::Error for TransitionError {}
90
91impl From<TransitionError> for gst::ErrorMessage {
92    fn from(err: TransitionError) -> Self {
93        err.err_msg
94    }
95}
96
/// Transition status.
///
/// A state transition occurs as a result of a triggering event.
/// The triggering event is asynchronously handled by a state machine
/// running on a [`Context`].
///
/// `TransitionStatus` implements `Future`: awaiting it yields the
/// transition result.
#[must_use = "This `TransitionStatus` may be `Pending`. In most cases it should be awaited. See `block_on_or_add_subtask_then`"]
pub enum TransitionStatus {
    /// Transition result is ready.
    Ready(Result<TransitionOk, TransitionError>),
    /// Transition is pending.
    Pending {
        /// The `Trigger` which was requested.
        trigger: Trigger,
        /// The `TaskState` when the `Trigger` was requested.
        origin: TaskState,
        /// The `Future` which resolves to the transition result.
        res_fut: Pin<Box<dyn Future<Output = Result<TransitionOk, TransitionError>> + Send>>,
    },
}
113
114impl TransitionStatus {
115    pub fn is_ready(&self) -> bool {
116        matches!(self, TransitionStatus::Ready { .. })
117    }
118
119    pub fn is_pending(&self) -> bool {
120        matches!(self, TransitionStatus::Pending { .. })
121    }
122
123    /// Converts the `TransitionStatus` into a `Result`.
124    ///
125    /// This function allows getting the `TransitionError` when
126    /// the transition result is ready without `await`ing nor blocking.
127    ///
128    /// See also [`Self::block_on_or_add_subtask_then`].
129    // FIXME once stabilized, this could use https://github.com/rust-lang/rust/issues/84277
130    pub fn check(self) -> Result<TransitionStatus, TransitionError> {
131        match self {
132            TransitionStatus::Ready(Err(err)) => Err(err),
133            other => Ok(other),
134        }
135    }
136
137    /// Blocks on this state transition to complete, or adds a subtask if running on a [`Context`].
138    ///
139    /// Notes:
140    ///
141    /// - If you need to execute code after the transition succeeds or fails,
142    ///   see [`Self::block_on_or_add_subtask_then`].
143    /// - When running in an `async` block within a running transition or
144    ///   task iteration, don't await for the transition as it would deadlock.
145    ///   Use [`Self::check`] to make sure the state transition is valid.
146    /// - When running in an `async` block out of a running transition or
147    ///   task iteration, just `.await` normally. E.g.:
148    ///
149    /// ```
150    /// # use gstthreadshare::runtime::task::{Task, TransitionOk, TransitionError};
151    /// # async fn async_fn() -> Result<TransitionOk, TransitionError> {
152    /// # let task = Task::default();
153    ///   let flush_ok = task.flush_start().await?;
154    /// # Ok(flush_ok)
155    /// # }
156    /// ```
157    ///
158    /// This function makes sure the transition completes successfully or
159    /// produces an error. It must be used in situations where we don't know
160    /// whether we are running on a [`Context`] or not. This is the case for
161    /// functions in [`PadSrc`] and [`PadSink`] as well as the synchronous
162    /// functions transitively called from them.
163    ///
164    /// As an example, a `PadSrc::src_event` function which handles a
165    /// `FlushStart` could call:
166    ///
167    /// ```
168    /// # fn src_event(pad: &gst::Pad, event: gst::Event) -> bool {
169    /// # let task = gstthreadshare::runtime::Task::default();
170    ///   return task
171    ///       .flush_start()
172    ///       .block_on_or_add_subtask(pad)
173    ///       .is_ok();
174    /// # }
175    /// ```
176    ///
177    /// If the transition is already complete, the result is returned immediately.
178    ///
179    /// If we are NOT running on a [`Context`], the transition result is awaited
180    /// by blocking on current thread and the result is returned.
181    ///
182    /// If we are running on a [`Context`], the transition result is awaited
183    /// in a sub task for current [`Context`]'s Scheduler task. As a consequence,
184    /// the sub task will be awaited in usual [`Context::drain_sub_tasks`]
185    /// rendezvous, ensuring some kind of synchronization. To avoid deadlocks,
186    /// `Ok(TransitionOk::NotWaiting { .. })` is immediately returned.
187    ///
188    /// [`PadSrc`]: ../pad/struct.PadSrc.html
189    /// [`PadSink`]: ../pad/struct.PadSink.html
190    pub fn block_on_or_add_subtask<O>(self, obj: &O) -> Result<TransitionOk, TransitionError>
191    where
192        O: IsA<glib::Object> + Send,
193    {
194        use TransitionStatus::*;
195        match self {
196            Pending {
197                trigger,
198                origin,
199                res_fut,
200            } => match Context::current_task() {
201                Some((ctx, task_id)) => {
202                    gst::debug!(
203                        RUNTIME_CAT,
204                        obj = obj,
205                        "Awaiting for {trigger:?} ack in a subtask on context {}",
206                        ctx.name()
207                    );
208
209                    let obj = obj.clone();
210                    let _ = ctx.add_sub_task(task_id, async move {
211                        let res = res_fut.await;
212                        match res {
213                            Ok(status) => {
214                                gst::log!(
215                                    RUNTIME_CAT,
216                                    obj = obj,
217                                    "Task {trigger:?} success: {status:?}",
218                                );
219                            }
220                            Err(err) => {
221                                gst::error!(
222                                    RUNTIME_CAT,
223                                    obj = obj,
224                                    "Task {trigger:?} failure: {err}"
225                                );
226                            }
227                        }
228
229                        Ok(())
230                    });
231
232                    Ok(TransitionOk::NotWaiting { trigger, origin })
233                }
234                _ => {
235                    gst::debug!(
236                        RUNTIME_CAT,
237                        obj = obj,
238                        "Awaiting for {trigger:?} ack on current thread",
239                    );
240                    let res = futures::executor::block_on(res_fut);
241                    match res {
242                        Ok(ref status) => {
243                            gst::log!(
244                                RUNTIME_CAT,
245                                obj = obj,
246                                "Task {trigger:?} success: {status:?}",
247                            );
248                        }
249                        Err(ref err) => {
250                            gst::error!(RUNTIME_CAT, obj = obj, "Task {trigger:?} failure: {err}");
251                        }
252                    }
253
254                    res
255                }
256            },
257            Ready(res) => {
258                match res {
259                    Ok(ref status) => {
260                        gst::log!(
261                            RUNTIME_CAT,
262                            obj = obj,
263                            "Task transition immediate success: {status:?}",
264                        );
265                    }
266                    Err(ref err) => {
267                        gst::error!(
268                            RUNTIME_CAT,
269                            obj = obj,
270                            "Task transition immediate failure: {err}",
271                        );
272                    }
273                }
274
275                res
276            }
277        }
278    }
279
280    /// Blocks on this state transition to complete, or adds a subtask if running on a [`Context`]
281    /// executing the provided function after the transition succeeds or fails.
282    ///
283    /// This function executes the provided `func` after the transition succeeded or failed.
284    /// Code following `block_on_or_addsubtask_then` can actually be executed before
285    /// the transition if a subtask was added and the returned `Result` might not reflect
286    /// the actual transition result.
287    ///
288    /// If the transition is already complete, `func` is executed immediately.
289    ///
290    /// If we are NOT running on a [`Context`], the transition result is awaited
291    /// by blocking on current thread and `func` is executed.
292    ///
293    /// If we are running on a [`Context`], the transition result is awaited
294    /// in a sub task for current [`Context`]'s Scheduler task and `func` is executed with
295    /// the transition result. In this case, `block_on_or_add_subtask_then` always
296    /// returns `Ok(())` since the actual processing is handled asynchronously.
297    ///
298    /// ## Example
299    ///
300    /// ```
301    /// # use gstthreadshare::runtime::task::{Task, TaskState, TransitionOk, TransitionError};
302    /// # use gst::{glib, subclass::prelude::ObjectSubclassExt};
303    /// # glib::wrapper! { pub struct MinimalObject(ObjectSubclass<MinimalImp>); }
304    /// # #[derive(Default)]
305    /// # pub struct MinimalImp;
306    /// # #[glib::object_subclass]
307    /// # impl gst::subclass::prelude::ObjectSubclass for MinimalImp {
308    /// #     const NAME: &'static str = "MinimalObject";
309    /// #     type Type = MinimalObject;
310    /// #     type ParentType = glib::Object;
311    /// # }
312    /// # impl gst::subclass::prelude::ObjectImpl for MinimalImp {}
313    /// # impl MinimalImp {
314    /// fn stop(&self) -> Result<(), gst::ErrorMessage> {
315    /// # let task = Task::default();
316    ///   task
317    ///       .stop()
318    ///       .block_on_or_add_subtask_then(self.obj(), |elem, res| {
319    ///           // Add specific stop code here,
320    ///           // it will be executed after the transition succeeds or fails
321    ///
322    ///           if res.is_ok() {
323    /// #             let CAT = gst::DebugCategory::new("ts-test", gst::DebugColorFlags::empty(), Some("ts test"));
324    ///               gst::debug!(CAT, obj = elem, "Stopped");
325    ///           }
326    ///       })
327    /// }
328    /// # }
329    /// ```
330    pub fn block_on_or_add_subtask_then<T, F>(
331        self,
332        obj: glib::BorrowedObject<'_, T>,
333        func: F,
334    ) -> Result<(), gst::ErrorMessage>
335    where
336        T: IsA<glib::Object> + Send,
337        F: FnOnce(&T, &Result<TransitionOk, TransitionError>) + Send + 'static,
338    {
339        use TransitionStatus::*;
340        match self {
341            Pending {
342                trigger, res_fut, ..
343            } => match Context::current_task() {
344                Some((ctx, task_id)) => {
345                    gst::debug!(
346                        RUNTIME_CAT,
347                        obj = obj,
348                        "Awaiting for {trigger:?} ack in a subtask on context {}",
349                        ctx.name()
350                    );
351                    let obj = obj.clone();
352                    let _ = ctx.add_sub_task(task_id, async move {
353                        let res = res_fut.await;
354                        match res {
355                            Ok(ref status) => {
356                                gst::log!(
357                                    RUNTIME_CAT,
358                                    obj = obj,
359                                    "Task {trigger:?} success: {status:?}",
360                                );
361                                func(&obj, &res);
362                                Ok(())
363                            }
364                            Err(ref err) => {
365                                gst::error!(
366                                    RUNTIME_CAT,
367                                    obj = obj,
368                                    "Task {trigger:?} failure: {err}",
369                                );
370                                func(&obj, &res);
371                                Err(gst::FlowError::Error)
372                            }
373                        }
374                    });
375
376                    Ok(())
377                }
378                _ => {
379                    gst::debug!(
380                        RUNTIME_CAT,
381                        obj = obj,
382                        "Awaiting for {trigger:?} ack on current thread",
383                    );
384                    let res = futures::executor::block_on(res_fut);
385                    match res {
386                        Ok(ref status) => {
387                            gst::log!(
388                                RUNTIME_CAT,
389                                obj = obj,
390                                "Task {trigger:?} success: {status:?}",
391                            );
392                            func(&obj, &res);
393                            Ok(())
394                        }
395                        Err(ref err) => {
396                            gst::error!(RUNTIME_CAT, obj = obj, "Task {trigger:?} failure: {err}",);
397                            func(&obj, &res);
398                            res.map(|_| ()).map_err(|err| err.into())
399                        }
400                    }
401                }
402            },
403            Ready(res) => match res {
404                Ok(ref status) => {
405                    gst::log!(
406                        RUNTIME_CAT,
407                        obj = obj,
408                        "Task transition immediate success: {status:?}",
409                    );
410                    func(&obj, &res);
411                    Ok(())
412                }
413                Err(ref err) => {
414                    gst::error!(
415                        RUNTIME_CAT,
416                        obj = obj,
417                        "Task transition immediate failure: {err}",
418                    );
419                    func(&obj, &res);
420                    res.map(|_| ()).map_err(|err| err.into())
421                }
422            },
423        }
424    }
425}
426
427impl Future for TransitionStatus {
428    type Output = Result<TransitionOk, TransitionError>;
429
430    fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
431        use TransitionStatus::*;
432
433        match &mut *self {
434            Ready(res) => Poll::Ready(res.clone()),
435            Pending { res_fut, .. } => match Pin::new(res_fut).poll(cx) {
436                Poll::Pending => Poll::Pending,
437                Poll::Ready(res) => {
438                    *self = Ready(res.clone());
439
440                    Poll::Ready(res)
441                }
442            },
443        }
444    }
445}
446
447impl From<TransitionOk> for TransitionStatus {
448    fn from(ok: TransitionOk) -> Self {
449        Self::Ready(Ok(ok))
450    }
451}
452
453impl From<TransitionError> for TransitionStatus {
454    fn from(err: TransitionError) -> Self {
455        Self::Ready(Err(err))
456    }
457}
458
459// Explicit impl due to `res_fut` not implementing `Debug`.
460impl fmt::Debug for TransitionStatus {
461    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
462        use TransitionStatus::*;
463        match self {
464            Ready(res) => f.debug_tuple("Ready").field(res).finish(),
465            Pending {
466                trigger, origin, ..
467            } => f
468                .debug_struct("Pending")
469                .field("trigger", trigger)
470                .field("origin", origin)
471                .finish(),
472        }
473    }
474}
475
476/// Implementation trait for `Task`s.
477///
478/// Defines implementations for state transition actions and error handlers.
479pub trait TaskImpl: Send + 'static {
480    type Item: Send + 'static;
481
482    fn obj(&self) -> &impl IsA<glib::Object>;
483
484    fn prepare(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
485        future::ok(())
486    }
487
488    fn unprepare(&mut self) -> impl Future<Output = ()> + Send {
489        future::ready(())
490    }
491
492    fn start(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
493        future::ok(())
494    }
495
496    /// Tries to retrieve the next item to process.
497    ///
498    /// With [`Self::handle_item`], this is one of the two `Task` loop
499    /// functions. They are executed in a loop in the `Started` state.
500    ///
501    /// Function `try_next` is awaited at the beginning of each iteration,
502    /// and can be cancelled at `await` point if a state transition is requested.
503    ///
504    /// If `Ok(item)` is returned, the iteration calls [`Self::handle_item`]
505    /// with said `Item`.
506    ///
507    /// If `Err(..)` is returned, the iteration calls [`Self::handle_loop_error`].
508    fn try_next(&mut self) -> impl Future<Output = Result<Self::Item, gst::FlowError>> + Send;
509
510    /// Does whatever needs to be done with the `item`.
511    ///
512    /// With [`Self::try_next`], this is one of the two `Task` loop
513    /// functions. They are executed in a loop in the `Started` state.
514    ///
515    /// Function `handle_item` asynchronously processes an `item` previously
516    /// retrieved by [`Self::try_next`]. Processing is guaranteed to run
517    /// to completion even if a state transition is requested.
518    ///
519    /// If `Err(..)` is returned, the iteration calls [`Self::handle_loop_error`].
520    fn handle_item(
521        &mut self,
522        _item: Self::Item,
523    ) -> impl Future<Output = Result<(), gst::FlowError>> + Send;
524
525    fn pause(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
526        future::ok(())
527    }
528
529    fn flush_start(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
530        future::ready(Ok(()))
531    }
532
533    fn flush_stop(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
534        future::ready(Ok(()))
535    }
536
537    fn stop(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
538        future::ready(Ok(()))
539    }
540
541    /// Handles an error occurring during the execution of the `Task` loop.
542    ///
543    /// This include errors returned by [`Self::try_next`] & [`Self::handle_item`].
544    ///
545    /// If the error is unrecoverable, implementations might use
546    /// `gst::Element::post_error_message` and return `Trigger::Error`.
547    ///
548    /// Otherwise, handle the error and return the requested `Transition` to recover.
549    ///
550    /// Default behaviour depends on the `err`:
551    ///
552    /// - `FlowError::Flushing` -> `Trigger::FlushStart`.
553    /// - `FlowError::Eos` -> `Trigger::Stop`.
554    /// - Other `FlowError` -> `Trigger::Error`.
555    fn handle_loop_error(&mut self, err: gst::FlowError) -> impl Future<Output = Trigger> + Send {
556        async move {
557            match err {
558                gst::FlowError::Flushing => {
559                    gst::debug!(
560                        RUNTIME_CAT,
561                        obj = self.obj(),
562                        "Task loop returned Flushing. Posting FlushStart"
563                    );
564                    Trigger::FlushStart
565                }
566                gst::FlowError::Eos => {
567                    gst::debug!(
568                        RUNTIME_CAT,
569                        obj = self.obj(),
570                        "Task loop returned Eos. Posting Stop"
571                    );
572                    Trigger::Stop
573                }
574                other => {
575                    gst::error!(
576                        RUNTIME_CAT,
577                        obj = self.obj(),
578                        "Task loop returned {other:?}. Posting Error",
579                    );
580                    Trigger::Error
581                }
582            }
583        }
584    }
585
586    /// Handles an error occurring during the execution of a transition action.
587    ///
588    /// This handler also catches errors returned by subtasks spawned by the transition action.
589    ///
590    /// If the error is unrecoverable, implementations might use `gst::Element::post_error_message`
591    /// and return `Trigger::Error`.
592    ///
593    /// Otherwise, handle the error and return the recovering `Trigger`.
594    ///
595    /// Default is to `gst::error` log and return `Trigger::Error`.
596    fn handle_action_error(
597        &mut self,
598        trigger: Trigger,
599        state: TaskState,
600        err: gst::ErrorMessage,
601    ) -> impl Future<Output = Trigger> + Send {
602        async move {
603            gst::error!(
604                RUNTIME_CAT,
605                obj = self.obj(),
606                "TaskImpl transition action error during {trigger:?} from {state:?}: {err:?}. Posting Trigger::Error",
607            );
608
609            Trigger::Error
610        }
611    }
612}
613
// Sending half of the one-shot channel on which a transition result is acknowledged.
type AckSender = oneshot::Sender<Result<TransitionOk, TransitionError>>;
// Receiving half of the one-shot channel on which a transition result is acknowledged.
type AckReceiver = oneshot::Receiver<Result<TransitionOk, TransitionError>>;
616
/// A `Trigger` along with the channel on which its result must be acknowledged.
struct TriggeringEvent {
    /// The requested `Trigger`.
    trigger: Trigger,
    /// Sender used to acknowledge the transition result.
    ack_tx: AckSender,
}
621
622impl TriggeringEvent {
623    fn new(trigger: Trigger) -> (Self, AckReceiver) {
624        let (ack_tx, ack_rx) = oneshot::channel();
625        let req = TriggeringEvent { trigger, ack_tx };
626
627        (req, ack_rx)
628    }
629
630    fn send_ack(self, res: Result<TransitionOk, TransitionError>) {
631        let _ = self.ack_tx.send(res);
632    }
633
634    fn send_err_ack(self) {
635        let res = Err(TransitionError {
636            trigger: self.trigger,
637            state: TaskState::Error,
638            err_msg: gst::error_msg!(
639                gst::CoreError::StateChange,
640                [
641                    "Triggering Event {:?} rejected due to a previous unrecoverable error",
642                    self.trigger,
643                ]
644            ),
645        });
646
647        self.send_ack(res);
648    }
649}
650
651impl fmt::Debug for TriggeringEvent {
652    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
653        f.debug_struct("TriggeringEvent")
654            .field("trigger", &self.trigger)
655            .finish()
656    }
657}
658
/// Handle on a spawned state machine.
#[derive(Debug)]
struct StateMachineHandle {
    /// Handle awaited in `join` to wait for the state machine to complete.
    join_handle: JoinHandle<()>,
    /// Channel on which `TriggeringEvent`s are pushed to the state machine.
    triggering_evt_tx: async_mpsc::Sender<TriggeringEvent>,
    /// `Context` which is unparked after a `TriggeringEvent` is pushed.
    context: Context,
}
665
666impl StateMachineHandle {
667    fn trigger(&mut self, trigger: Trigger) -> AckReceiver {
668        let (triggering_evt, ack_rx) = TriggeringEvent::new(trigger);
669
670        gst::log!(RUNTIME_CAT, "Pushing {triggering_evt:?}");
671        self.triggering_evt_tx.try_send(triggering_evt).unwrap();
672
673        self.context.unpark();
674
675        ack_rx
676    }
677
678    async fn join(self) {
679        self.join_handle
680            .await
681            .expect("state machine shouldn't have been cancelled");
682    }
683}
684
/// Data shared by the clones of a `Task`, protected by the `Task`'s `Mutex`.
#[derive(Debug)]
struct TaskInner {
    /// Current state of the `Task`.
    state: TaskState,
    /// Handle on the state machine: set by `Task::prepare` and
    /// taken by `Task::unprepare`.
    state_machine_handle: Option<StateMachineHandle>,
}
690
691impl Default for TaskInner {
692    fn default() -> Self {
693        TaskInner {
694            state: TaskState::Unprepared,
695            state_machine_handle: None,
696        }
697    }
698}
699
700impl TaskInner {
701    fn switch_to_state(&mut self, target_state: TaskState, triggering_evt: TriggeringEvent) {
702        let res = Ok(TransitionOk::Complete {
703            origin: self.state,
704            target: target_state,
705        });
706
707        self.state = target_state;
708        triggering_evt.send_ack(res);
709    }
710
711    fn switch_to_err(&mut self, triggering_evt: TriggeringEvent) {
712        let res = Err(TransitionError {
713            trigger: triggering_evt.trigger,
714            state: self.state,
715            err_msg: gst::error_msg!(
716                gst::CoreError::StateChange,
717                [
718                    "Unrecoverable error for {triggering_evt:?} from state {:?}",
719                    self.state,
720                ]
721            ),
722        });
723
724        self.state = TaskState::Error;
725        triggering_evt.send_ack(res);
726    }
727
728    fn skip_triggering_evt(&mut self, triggering_evt: TriggeringEvent) {
729        let res = Ok(TransitionOk::Skipped {
730            trigger: triggering_evt.trigger,
731            state: self.state,
732        });
733
734        triggering_evt.send_ack(res);
735    }
736
737    fn trigger(&mut self, trigger: Trigger) -> Result<AckReceiver, TransitionError> {
738        self.state_machine_handle
739            .as_mut()
740            .map(|state_machine| state_machine.trigger(trigger))
741            .ok_or_else(|| {
742                gst::warning!(RUNTIME_CAT, "Unable to send {trigger:?}: no state machine",);
743                TransitionError {
744                    trigger,
745                    state: TaskState::Unprepared,
746                    err_msg: gst::error_msg!(
747                        gst::ResourceError::NotFound,
748                        ["Unable to send {trigger:?}: no state machine"]
749                    ),
750                }
751            })
752    }
753}
754
755impl Drop for TaskInner {
756    fn drop(&mut self) {
757        if self.state != TaskState::Unprepared {
758            // Don't panic here: in case another panic occurs, we would get
759            // "panicked while panicking" which would prevents developers
760            // from getting the initial panic message.
761            gst::fixme!(RUNTIME_CAT, "Missing call to `Task::unprepare`");
762        }
763    }
764}
765
/// An RAII implementation of scoped locked `TaskState`.
///
/// Wraps the `MutexGuard` on the `Task`'s inner data: the lock is held
/// for as long as this guard is alive. Dereferences to the `TaskState`.
pub struct TaskStateGuard<'guard>(MutexGuard<'guard, TaskInner>);
768
769impl Deref for TaskStateGuard<'_> {
770    type Target = TaskState;
771
772    fn deref(&self) -> &Self::Target {
773        &(self.0).state
774    }
775}
776
/// A `Task` operating on a `threadshare` [`Context`].
///
/// Cloning a `Task` is cheap: clones share the same inner data
/// (state & state machine handle) behind an `Arc<Mutex<..>>`.
///
/// [`Context`]: ../executor/struct.Context.html
#[derive(Debug, Clone)]
pub struct Task(Arc<Mutex<TaskInner>>);
782
783impl Default for Task {
784    fn default() -> Self {
785        Task(Arc::new(Mutex::new(TaskInner::default())))
786    }
787}
788
789impl Task {
790    pub fn state(&self) -> TaskState {
791        self.0.lock().unwrap().state
792    }
793
794    pub fn lock_state(&self) -> TaskStateGuard<'_> {
795        TaskStateGuard(self.0.lock().unwrap())
796    }
797
798    pub fn prepare(&self, task_impl: impl TaskImpl, context: Context) -> TransitionStatus {
799        let mut inner = self.0.lock().unwrap();
800
801        let origin = inner.state;
802        match origin {
803            TaskState::Unprepared => (),
804            TaskState::Prepared | TaskState::Preparing => {
805                gst::debug!(
806                    RUNTIME_CAT,
807                    obj = task_impl.obj(),
808                    "Task already {origin:?}",
809                );
810                return TransitionOk::Skipped {
811                    trigger: Trigger::Prepare,
812                    state: origin,
813                }
814                .into();
815            }
816            state => {
817                gst::warning!(
818                    RUNTIME_CAT,
819                    obj = task_impl.obj(),
820                    "Attempt to prepare Task in state {state:?}"
821                );
822                return TransitionError {
823                    trigger: Trigger::Prepare,
824                    state: inner.state,
825                    err_msg: gst::error_msg!(
826                        gst::CoreError::StateChange,
827                        ["Attempt to prepare Task in state {state:?}"]
828                    ),
829                }
830                .into();
831            }
832        }
833
834        assert!(inner.state_machine_handle.is_none());
835
836        inner.state = TaskState::Preparing;
837
838        gst::log!(
839            RUNTIME_CAT,
840            obj = task_impl.obj(),
841            "Spawning task state machine"
842        );
843        inner.state_machine_handle = Some(StateMachine::spawn(self.0.clone(), task_impl, context));
844
845        let ack_rx = match inner.trigger(Trigger::Prepare) {
846            Ok(ack_rx) => ack_rx,
847            Err(err) => return err.into(),
848        };
849        drop(inner);
850
851        TransitionStatus::Pending {
852            trigger: Trigger::Prepare,
853            origin: TaskState::Unprepared,
854            res_fut: Box::pin(ack_rx.map(Result::unwrap)),
855        }
856    }
857
858    pub fn unprepare(&self) -> TransitionStatus {
859        let mut inner = self.0.lock().unwrap();
860
861        let origin = inner.state;
862        let mut state_machine_handle = match origin {
863            TaskState::Stopped
864            | TaskState::Error
865            | TaskState::Prepared
866            | TaskState::Preparing
867            | TaskState::Unprepared => match inner.state_machine_handle.take() {
868                Some(state_machine_handle) => {
869                    gst::debug!(RUNTIME_CAT, "Unpreparing task");
870
871                    state_machine_handle
872                }
873                None => {
874                    gst::debug!(RUNTIME_CAT, "Task already unpreparing");
875                    return TransitionOk::Skipped {
876                        trigger: Trigger::Unprepare,
877                        state: origin,
878                    }
879                    .into();
880                }
881            },
882            state => {
883                gst::warning!(RUNTIME_CAT, "Attempt to unprepare Task in state {state:?}");
884                return TransitionError {
885                    trigger: Trigger::Unprepare,
886                    state: inner.state,
887                    err_msg: gst::error_msg!(
888                        gst::CoreError::StateChange,
889                        ["Attempt to unprepare Task in state {state:?}"]
890                    ),
891                }
892                .into();
893            }
894        };
895
896        let ack_rx = state_machine_handle.trigger(Trigger::Unprepare);
897        drop(inner);
898
899        let state_machine_end_fut = async {
900            state_machine_handle.join().await;
901            ack_rx.await.unwrap()
902        };
903
904        TransitionStatus::Pending {
905            trigger: Trigger::Unprepare,
906            origin,
907            res_fut: Box::pin(state_machine_end_fut),
908        }
909    }
910
911    /// Starts the `Task`.
912    ///
913    /// The execution occurs on the `Task` context.
914    pub fn start(&self) -> TransitionStatus {
915        let mut inner = self.0.lock().unwrap();
916
917        if let TaskState::Started = inner.state {
918            return TransitionOk::Skipped {
919                trigger: Trigger::Start,
920                state: TaskState::Started,
921            }
922            .into();
923        }
924
925        let ack_rx = match inner.trigger(Trigger::Start) {
926            Ok(ack_rx) => ack_rx,
927            Err(err) => return err.into(),
928        };
929
930        let origin = inner.state;
931        drop(inner);
932
933        TransitionStatus::Pending {
934            trigger: Trigger::Start,
935            origin,
936            res_fut: Box::pin(ack_rx.map(Result::unwrap)),
937        }
938    }
939
    /// Requests the `Task` loop to pause.
    ///
    /// If an item handling is in progress, it will run to completion,
    /// then no iterations will be executed before `start` is called again.
    ///
    /// This only pushes the `Pause` trigger: the returned status is `Pending`
    /// when the trigger was pushed, otherwise it is built from the push error.
    pub fn pause(&self) -> TransitionStatus {
        self.push_pending(Trigger::Pause)
    }
947
    /// Requests the `Task` to start flushing.
    ///
    /// The state machine handles this trigger from `Started` (target `Flushing`)
    /// and from `Paused` (target `PausedFlushing`), after awaiting the
    /// `TaskImpl::flush_start` action.
    ///
    /// This only pushes the `FlushStart` trigger: the returned status is `Pending`
    /// when the trigger was pushed, otherwise it is built from the push error.
    pub fn flush_start(&self) -> TransitionStatus {
        self.push_pending(Trigger::FlushStart)
    }
951
    /// Requests the `Task` to stop flushing.
    ///
    /// The state machine handles this trigger from `Flushing`, in which case the
    /// task is started again, and from `PausedFlushing`, which leads to `Paused`.
    /// The `TaskImpl::flush_stop` action is awaited first in both cases.
    ///
    /// This only pushes the `FlushStop` trigger: the returned status is `Pending`
    /// when the trigger was pushed, otherwise it is built from the push error.
    pub fn flush_stop(&self) -> TransitionStatus {
        self.push_pending(Trigger::FlushStop)
    }
955
    /// Requests the `Task` to stop.
    ///
    /// This only pushes the `Stop` trigger and does not wait by itself: the
    /// returned status is `Pending` when the trigger was pushed, otherwise it is
    /// built from the push error. Await or block on the returned status to wait
    /// for the transition to complete.
    pub fn stop(&self) -> TransitionStatus {
        self.push_pending(Trigger::Stop)
    }
960
961    /// Pushes a [`Trigger`] and returns TransitionStatus::Pending.
962    fn push_pending(&self, trigger: Trigger) -> TransitionStatus {
963        let mut inner = self.0.lock().unwrap();
964
965        let ack_rx = match inner.trigger(trigger) {
966            Ok(ack_rx) => ack_rx,
967            Err(err) => return err.into(),
968        };
969
970        let origin = inner.state;
971        drop(inner);
972
973        TransitionStatus::Pending {
974            trigger,
975            origin,
976            res_fut: Box::pin(ack_rx.map(Result::unwrap)),
977        }
978    }
979}
980
/// Drives a `TaskImpl` according to the `TriggeringEvent`s it receives.
struct StateMachine<Task: TaskImpl> {
    /// Implementation whose transition actions and loop functions are awaited.
    task_impl: Task,
    /// Receiving end of the channel on which triggering events are pushed.
    triggering_evt_rx: async_mpsc::Receiver<TriggeringEvent>,
    /// Event to handle before reading `triggering_evt_rx` again.
    ///
    /// Set when an action or loop error is converted to a new trigger, and
    /// when the task loop is interrupted by an incoming triggering event.
    pending_triggering_evt: Option<TriggeringEvent>,
}
986
/// Awaits the transition action `$action` of `$self.task_impl`.
///
/// Evaluates to `Ok($triggering_evt)` when the action succeeds.
///
/// When the action fails, `handle_action_error` is awaited with the original
/// trigger, `$origin` and the error. The trigger it returns replaces
/// `$triggering_evt.trigger`, the event is stored in
/// `$self.pending_triggering_evt` and the macro evaluates to `Err(())`.
///
/// `$task_inner` is accepted but not used by the expansion.
macro_rules! exec_action {
    ($self:ident, $action:ident, $triggering_evt:expr, $origin:expr, $task_inner:expr) => {{
        match $self.task_impl.$action().await {
            Ok(()) => Ok($triggering_evt),
            Err(err) => {
                // FIXME problem is that we lose the origin trigger in the
                //       final TransitionStatus.

                let next_trigger = $self
                    .task_impl
                    .handle_action_error($triggering_evt.trigger, $origin, err)
                    .await;

                // Convert triggering event according to the error handler's decision
                gst::trace!(
                    RUNTIME_CAT,
                    obj = $self.task_impl.obj(),
                    "TaskImpl transition action error: converting {:?} to {next_trigger:?}",
                    $triggering_evt.trigger,
                );

                // The converted event keeps the original event's acknowledgement
                // and is handled by the state machine on its next iteration.
                $triggering_evt.trigger = next_trigger;
                $self.pending_triggering_evt = Some($triggering_evt);

                Err(())
            }
        }
    }};
}
1016
1017impl<Task: TaskImpl> StateMachine<Task> {
1018    fn spawn(
1019        task_inner: Arc<Mutex<TaskInner>>,
1020        task_impl: Task,
1021        context: Context,
1022    ) -> StateMachineHandle {
1023        let (triggering_evt_tx, triggering_evt_rx) = async_mpsc::channel(4);
1024
1025        let state_machine = StateMachine {
1026            task_impl,
1027            triggering_evt_rx,
1028            pending_triggering_evt: None,
1029        };
1030
1031        StateMachineHandle {
1032            join_handle: context.spawn_and_unpark(state_machine.run(task_inner)),
1033            triggering_evt_tx,
1034            context,
1035        }
1036    }
1037
    /// Runs the state machine until `Trigger::Unprepare` is handled.
    ///
    /// The first event received must be `Trigger::Prepare`. Afterwards, each
    /// iteration handles the pending triggering event if there is one, or the
    /// next event received on the channel, and applies the matching transition
    /// to the state held by `task_inner`.
    ///
    /// # Panics
    ///
    /// Panics if the first trigger is not `Prepare`, or if the triggering
    /// event channel yields `None` while an event is awaited.
    async fn run(mut self, task_inner: Arc<Mutex<TaskInner>>) {
        let mut triggering_evt = self
            .triggering_evt_rx
            .next()
            .await
            .expect("triggering_evt_rx dropped");

        if let Trigger::Prepare = triggering_evt.trigger {
            gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Preparing task");

            let res = exec_action!(
                self,
                prepare,
                triggering_evt,
                TaskState::Unprepared,
                &task_inner
            );
            // On error, `exec_action!` stored the converted event as pending:
            // it is handled by the first iteration of the loop below.
            if let Ok(triggering_evt) = res {
                let mut task_inner = task_inner.lock().unwrap();
                let res = Ok(TransitionOk::Complete {
                    origin: TaskState::Unprepared,
                    target: TaskState::Prepared,
                });

                task_inner.state = TaskState::Prepared;
                triggering_evt.send_ack(res);

                gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Task Prepared");
            }
        } else {
            panic!("Unexpected initial trigger {:?}", triggering_evt.trigger);
        }

        loop {
            // A pending event (error conversion or event which interrupted the
            // task loop) takes precedence over the channel.
            triggering_evt = match self.pending_triggering_evt.take() {
                Some(pending_triggering_evt) => pending_triggering_evt,
                None => self
                    .triggering_evt_rx
                    .next()
                    .await
                    .expect("triggering_evt_rx dropped"),
            };
            gst::trace!(
                RUNTIME_CAT,
                obj = self.task_impl.obj(),
                "State machine popped {triggering_evt:?}"
            );

            match triggering_evt.trigger {
                Trigger::Error => {
                    let mut task_inner = task_inner.lock().unwrap();
                    task_inner.switch_to_err(triggering_evt);
                    gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Switched to Error");
                }
                Trigger::Start => {
                    // The lock is only held while checking the state: it is
                    // released before the start action is awaited.
                    let origin = {
                        let mut task_inner = task_inner.lock().unwrap();
                        let origin = task_inner.state;
                        match origin {
                            TaskState::Stopped | TaskState::Paused | TaskState::Prepared => (),
                            TaskState::PausedFlushing => {
                                // Still flushing: no start action, only the
                                // state changes.
                                task_inner.switch_to_state(TaskState::Flushing, triggering_evt);
                                gst::trace!(
                                    RUNTIME_CAT,
                                    obj = self.task_impl.obj(),
                                    "Switched from PausedFlushing to Flushing"
                                );
                                continue;
                            }
                            TaskState::Error => {
                                triggering_evt.send_err_ack();
                                continue;
                            }
                            state => {
                                task_inner.skip_triggering_evt(triggering_evt);
                                gst::trace!(
                                    RUNTIME_CAT,
                                    obj = self.task_impl.obj(),
                                    "Skipped Start in state {state:?}"
                                );
                                continue;
                            }
                        }

                        origin
                    };

                    self.start(triggering_evt, origin, &task_inner).await;
                    // next/pending triggering event handled in next iteration
                }
                Trigger::Pause => {
                    let (origin, target) = {
                        let mut task_inner = task_inner.lock().unwrap();
                        let origin = task_inner.state;
                        match origin {
                            TaskState::Started | TaskState::Stopped | TaskState::Prepared => {
                                (origin, TaskState::Paused)
                            }
                            TaskState::Flushing => (origin, TaskState::PausedFlushing),
                            // In `Error`, the pause action is still executed
                            // and the state remains `Error`.
                            TaskState::Error => (TaskState::Error, TaskState::Error),
                            state => {
                                task_inner.skip_triggering_evt(triggering_evt);
                                gst::trace!(
                                    RUNTIME_CAT,
                                    obj = self.task_impl.obj(),
                                    "Skipped Pause in state {state:?}"
                                );
                                continue;
                            }
                        }
                    };

                    let res = exec_action!(self, pause, triggering_evt, origin, &task_inner);
                    if let Ok(triggering_evt) = res {
                        task_inner
                            .lock()
                            .unwrap()
                            .switch_to_state(target, triggering_evt);
                        gst::trace!(
                            RUNTIME_CAT,
                            obj = self.task_impl.obj(),
                            "Task loop {target:?}"
                        );
                    }
                }
                Trigger::Stop => {
                    let (origin, target) = {
                        let mut task_inner = task_inner.lock().unwrap();
                        let origin = task_inner.state;
                        match origin {
                            TaskState::Started
                            | TaskState::Paused
                            | TaskState::PausedFlushing
                            | TaskState::Flushing => (origin, TaskState::Stopped),
                            // In `Error`, the stop action is still executed
                            // and the state remains `Error`.
                            TaskState::Error => (TaskState::Error, TaskState::Error),
                            state => {
                                task_inner.skip_triggering_evt(triggering_evt);
                                gst::trace!(
                                    RUNTIME_CAT,
                                    obj = self.task_impl.obj(),
                                    "Skipped Stop in state {state:?}"
                                );
                                continue;
                            }
                        }
                    };

                    let res = exec_action!(self, stop, triggering_evt, origin, &task_inner);
                    if let Ok(triggering_evt) = res {
                        task_inner
                            .lock()
                            .unwrap()
                            .switch_to_state(target, triggering_evt);
                        gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Task {target:?}");
                    }
                }
                Trigger::FlushStart => {
                    let (origin, target) = {
                        let mut task_inner = task_inner.lock().unwrap();
                        let origin = task_inner.state;
                        match origin {
                            TaskState::Started => (origin, TaskState::Flushing),
                            TaskState::Paused => (origin, TaskState::PausedFlushing),
                            // In `Error`, the flush_start action is still
                            // executed and the state remains `Error`.
                            TaskState::Error => (TaskState::Error, TaskState::Error),
                            state => {
                                task_inner.skip_triggering_evt(triggering_evt);
                                gst::trace!(
                                    RUNTIME_CAT,
                                    obj = self.task_impl.obj(),
                                    "Skipped FlushStart in state {state:?}"
                                );
                                continue;
                            }
                        }
                    };

                    let res = exec_action!(self, flush_start, triggering_evt, origin, &task_inner);
                    if let Ok(triggering_evt) = res {
                        task_inner
                            .lock()
                            .unwrap()
                            .switch_to_state(target, triggering_evt);
                        gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Task {target:?}");
                    }
                }
                Trigger::FlushStop => {
                    let origin = task_inner.lock().unwrap().state;
                    // `is_paused` selects between returning to `Paused` and
                    // restarting the task once the flush_stop action is done.
                    let is_paused = match origin {
                        TaskState::Flushing => false,
                        TaskState::PausedFlushing => true,
                        TaskState::Error => {
                            triggering_evt.send_err_ack();
                            continue;
                        }
                        state => {
                            task_inner
                                .lock()
                                .unwrap()
                                .skip_triggering_evt(triggering_evt);
                            gst::trace!(
                                RUNTIME_CAT,
                                obj = self.task_impl.obj(),
                                "Skipped FlushStop in state {state:?}"
                            );
                            continue;
                        }
                    };

                    let res = exec_action!(self, flush_stop, triggering_evt, origin, &task_inner);
                    if let Ok(triggering_evt) = res {
                        if is_paused {
                            task_inner
                                .lock()
                                .unwrap()
                                .switch_to_state(TaskState::Paused, triggering_evt);
                            gst::trace!(
                                RUNTIME_CAT,
                                obj = self.task_impl.obj(),
                                "Switched from PausedFlushing to Paused"
                            );
                        } else {
                            self.start(triggering_evt, origin, &task_inner).await;
                            // next/pending triggering event handled in next iteration
                        }
                    }
                }
                Trigger::Unprepare => {
                    // `Task::unprepare` joins the state machine handle before
                    // awaiting the ack sent by `switch_to_state` below.
                    self.task_impl.unprepare().await;

                    task_inner
                        .lock()
                        .unwrap()
                        .switch_to_state(TaskState::Unprepared, triggering_evt);

                    break;
                }
                // Only `Trigger::Prepare` is left: it is handled before the loop.
                _ => unreachable!("State machine handler {:?}", triggering_evt),
            }
        }

        gst::trace!(
            RUNTIME_CAT,
            obj = self.task_impl.obj(),
            "Task state machine terminated"
        );
    }
1285
1286    async fn start(
1287        &mut self,
1288        mut triggering_evt: TriggeringEvent,
1289        origin: TaskState,
1290        task_inner: &Arc<Mutex<TaskInner>>,
1291    ) {
1292        match exec_action!(self, start, triggering_evt, origin, &task_inner) {
1293            Ok(triggering_evt) => {
1294                let mut task_inner = task_inner.lock().unwrap();
1295                task_inner.switch_to_state(TaskState::Started, triggering_evt);
1296            }
1297            Err(_) => {
1298                // error handled by exec_action
1299                return;
1300            }
1301        }
1302
1303        match self.run_loop().await {
1304            Ok(()) => (),
1305            Err(err) => {
1306                let next_trigger = self.task_impl.handle_loop_error(err).await;
1307                let (triggering_evt, _) = TriggeringEvent::new(next_trigger);
1308                self.pending_triggering_evt = Some(triggering_evt);
1309            }
1310        }
1311    }
1312
    /// Runs the task loop: awaits `try_next` then `handle_item`, repeatedly.
    ///
    /// Returns `Ok(())` as soon as a triggering event is received while
    /// awaiting `try_next`; the event is stored in `pending_triggering_evt`.
    /// Returns the `gst::FlowError` reported by `try_next` or `handle_item`
    /// otherwise.
    ///
    /// # Panics
    ///
    /// Panics if the triggering event channel yields `None`.
    async fn run_loop(&mut self) -> Result<(), gst::FlowError> {
        gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Task loop started");

        let mut try_next_res;
        loop {
            try_next_res = {
                // select_biased requires the selected futures to implement
                // `FusedFuture`, hence the `fuse()`. A new `try_next` future
                // is created and pinned at each iteration, so a future which
                // already completed is never polled again.
                // NOTE(review): this comment used to mention `BoxFuture`, but
                // the future is not boxed here; confirm against `TaskImpl`.
                let mut try_next_fut = pin!(self.task_impl.try_next().fuse());
                futures::select_biased! {
                    // Listed first so that triggering events take precedence
                    // over `try_next`.
                    triggering_evt = self.triggering_evt_rx.next() => {
                        let triggering_evt = triggering_evt.expect("broken state machine channel");
                        gst::trace!(
                            RUNTIME_CAT,
                            "Task loop handing {:?} to state machine",
                            triggering_evt,
                        );
                        self.pending_triggering_evt = Some(triggering_evt);
                        return Ok(());
                    }
                    try_next_res = try_next_fut => try_next_res,
                }
            };

            let item = try_next_res.inspect_err(|err| {
                gst::debug!(
                    RUNTIME_CAT,
                    obj = self.task_impl.obj(),
                    "TaskImpl::try_next returned {err:?}"
                );
            })?;

            // Not selected against the channel: an item being handled runs to
            // completion before any triggering event is considered.
            self.task_impl.handle_item(item).await.inspect_err(|&err| {
                gst::debug!(
                    RUNTIME_CAT,
                    obj = self.task_impl.obj(),
                    "TaskImpl::handle_item returned {err:?}"
                );
            })?;
        }
    }
1358}
1359
1360#[cfg(test)]
1361mod tests {
1362    use futures::channel::{mpsc, oneshot};
1363    use futures::executor::block_on;
1364    use futures::prelude::*;
1365    use gst::glib;
1366    use gst::glib::prelude::*;
1367    use std::future::pending;
1368    use std::time::Duration;
1369
1370    use super::{
1371        Task, TaskImpl,
1372        TaskState::{self, *},
1373        TransitionError, TransitionOk,
1374        TransitionOk::*,
1375        TransitionStatus,
1376        TransitionStatus::*,
1377        Trigger::{self, *},
1378    };
1379    use crate::runtime::{Context, RUNTIME_CAT};
1380
1381    impl TransitionStatus {
1382        // Only useful for unit tests, use `block_on_or_add_sub_task_and_then`
1383        // or `block_on_or_add_sub_task` in user code.
1384        fn block_on(self) -> Result<TransitionOk, TransitionError> {
1385            assert!(!Context::is_context_thread());
1386            match self {
1387                Pending {
1388                    trigger, res_fut, ..
1389                } => {
1390                    gst::debug!(
1391                        RUNTIME_CAT,
1392                        "Awaiting for {:?} ack on current thread",
1393                        trigger,
1394                    );
1395                    futures::executor::block_on(res_fut)
1396                }
1397                Ready(res) => res,
1398            }
1399        }
1400    }
1401
    /// Stops then unprepares `task`, blocking on each transition in turn.
    ///
    /// Panics if either transition returns an error; `#[track_caller]` is
    /// meant to report the failure at the call site.
    #[track_caller]
    fn stop_then_unprepare(task: Task) {
        task.stop().block_on().unwrap();
        task.unprepare().block_on().unwrap();
    }
1407
    /// Walks a `Task` through its nominal life cycle and checks each transition:
    /// prepare, start, pause while `try_next` is pending, restart, pause while
    /// an item is being handled, restart, stop, restart, stop and unprepare.
    #[test]
    fn nominal() {
        gst::init().unwrap();

        // Each `TaskImpl` function reports on a dedicated channel so that the
        // test can synchronize with the task.
        struct TaskTest {
            obj: gst::Object,
            prepared_sender: mpsc::Sender<()>,
            started_sender: mpsc::Sender<()>,
            try_next_ready_sender: mpsc::Sender<()>,
            try_next_receiver: mpsc::Receiver<()>,
            handle_item_ready_sender: mpsc::Sender<()>,
            handle_item_sender: mpsc::Sender<()>,
            paused_sender: mpsc::Sender<()>,
            stopped_sender: mpsc::Sender<()>,
            unprepared_sender: mpsc::Sender<()>,
        }

        impl TaskImpl for TaskTest {
            type Item = ();

            fn obj(&self) -> &impl IsA<glib::Object> {
                &self.obj
            }

            async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> {
                gst::debug!(RUNTIME_CAT, "nominal: prepared");
                self.prepared_sender.send(()).await.unwrap();
                Ok(())
            }

            async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
                gst::debug!(RUNTIME_CAT, "nominal: started");
                self.started_sender.send(()).await.unwrap();
                Ok(())
            }

            // Signals that `try_next` was entered, then waits for the test to
            // provide an item.
            async fn try_next(&mut self) -> Result<(), gst::FlowError> {
                gst::debug!(RUNTIME_CAT, "nominal: entering try_next");
                self.try_next_ready_sender.send(()).await.unwrap();
                gst::debug!(RUNTIME_CAT, "nominal: awaiting try_next");
                self.try_next_receiver.next().await.unwrap();
                Ok(())
            }

            // Signals that `handle_item` was entered, then sends on
            // `handle_item_sender` before leaving.
            async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
                gst::debug!(RUNTIME_CAT, "nominal: entering handle_item");
                self.handle_item_ready_sender.send(()).await.unwrap();

                gst::debug!(RUNTIME_CAT, "nominal: locked in handle_item");
                self.handle_item_sender.send(()).await.unwrap();
                gst::debug!(RUNTIME_CAT, "nominal: leaving handle_item");

                Ok(())
            }

            async fn pause(&mut self) -> Result<(), gst::ErrorMessage> {
                gst::debug!(RUNTIME_CAT, "nominal: paused");
                self.paused_sender.send(()).await.unwrap();
                Ok(())
            }

            async fn stop(&mut self) -> Result<(), gst::ErrorMessage> {
                gst::debug!(RUNTIME_CAT, "nominal: stopped");
                self.stopped_sender.send(()).await.unwrap();
                Ok(())
            }

            async fn unprepare(&mut self) {
                gst::debug!(RUNTIME_CAT, "nominal: unprepared");
                self.unprepared_sender.send(()).await.unwrap();
            }
        }

        let context = Context::acquire("nominal", Duration::from_millis(2)).unwrap();

        let task = Task::default();

        assert_eq!(task.state(), Unprepared);

        gst::debug!(RUNTIME_CAT, "nominal: preparing");

        let (prepared_sender, mut prepared_receiver) = mpsc::channel(1);
        let (started_sender, mut started_receiver) = mpsc::channel(1);
        let (try_next_ready_sender, mut try_next_ready_receiver) = mpsc::channel(1);
        let (mut try_next_sender, try_next_receiver) = mpsc::channel(1);
        let (handle_item_ready_sender, mut handle_item_ready_receiver) = mpsc::channel(1);
        // Zero-sized buffer, unlike the other channels.
        // NOTE(review): presumably so that `handle_item` stays blocked on its
        // send until the test receives the message -- confirm.
        let (handle_item_sender, mut handle_item_receiver) = mpsc::channel(0);
        let (paused_sender, mut paused_receiver) = mpsc::channel(1);
        let (stopped_sender, mut stopped_receiver) = mpsc::channel(1);
        let (unprepared_sender, mut unprepared_receiver) = mpsc::channel(1);
        let obj = gst::Pad::builder(gst::PadDirection::Unknown)
            .name("runtime::Task::nominal")
            .build();
        let prepare_status = task.prepare(
            TaskTest {
                obj: obj.clone().into(),
                prepared_sender,
                started_sender,
                try_next_ready_sender,
                try_next_receiver,
                handle_item_ready_sender,
                handle_item_sender,
                paused_sender,
                stopped_sender,
                unprepared_sender,
            },
            context,
        );

        assert!(prepare_status.is_pending());
        match prepare_status {
            Pending {
                trigger: Prepare,
                origin: Unprepared,
                ..
            } => (),
            other => panic!("{other:?}"),
        };

        // Start is requested before the Prepare transition is awaited.
        gst::debug!(RUNTIME_CAT, "nominal: starting (async prepare)");
        let start_status = task.start().check().unwrap();

        block_on(prepared_receiver.next()).unwrap();
        assert_eq!(
            prepare_status.block_on_or_add_subtask(&obj).unwrap(),
            Complete {
                origin: Unprepared,
                target: Prepared,
            },
        );

        block_on(started_receiver.next()).unwrap();
        assert_eq!(
            start_status.block_on().unwrap(),
            Complete {
                origin: Prepared,
                target: Started,
            }
        );
        assert_eq!(task.state(), Started);

        // unlock task loop and keep looping
        block_on(try_next_ready_receiver.next()).unwrap();
        block_on(try_next_sender.send(())).unwrap();
        block_on(handle_item_ready_receiver.next()).unwrap();
        block_on(handle_item_receiver.next()).unwrap();

        gst::debug!(RUNTIME_CAT, "nominal: starting (redundant)");
        // already started
        assert_eq!(
            task.start().block_on().unwrap(),
            Skipped {
                trigger: Start,
                state: Started,
            },
        );
        assert_eq!(task.state(), Started);

        // Attempt to unprepare Task in state Started (also tests check)
        match task.unprepare().check().unwrap_err() {
            TransitionError {
                trigger: Unprepare,
                state: Started,
                ..
            } => (),
            other => panic!("{other:?}"),
        }

        // Pause requested while the task loop is awaiting `try_next`.
        gst::debug!(RUNTIME_CAT, "nominal: pause cancelling try_next");
        block_on(try_next_ready_receiver.next()).unwrap();

        let pause_status = task.pause().check().unwrap();
        gst::debug!(RUNTIME_CAT, "nominal: awaiting pause ack");
        block_on(paused_receiver.next()).unwrap();
        assert_eq!(
            pause_status.block_on().unwrap(),
            Complete {
                origin: Started,
                target: Paused,
            },
        );

        // handle_item not reached
        assert!(handle_item_ready_receiver.try_recv().is_err());
        // try_next not reached again
        assert!(try_next_ready_receiver.try_recv().is_err());

        gst::debug!(
            RUNTIME_CAT,
            "nominal: starting (after pause cancelling try_next)"
        );
        let start_receiver = task.start().check().unwrap();
        block_on(started_receiver.next()).unwrap();
        assert_eq!(
            start_receiver.block_on().unwrap(),
            Complete {
                origin: Paused,
                target: Started,
            },
        );
        assert_eq!(task.state(), Started);

        // Pause requested while an item is being handled.
        gst::debug!(RUNTIME_CAT, "nominal: pause // handle_item");
        block_on(try_next_ready_receiver.next()).unwrap();
        block_on(try_next_sender.send(())).unwrap();
        // Make sure item is picked
        block_on(handle_item_ready_receiver.next()).unwrap();

        gst::debug!(RUNTIME_CAT, "nominal: requesting to pause");
        let pause_status = task.pause().check().unwrap();

        gst::debug!(RUNTIME_CAT, "nominal: unlocking item handling");
        block_on(handle_item_receiver.next()).unwrap();

        gst::debug!(RUNTIME_CAT, "nominal: awaiting pause ack");
        block_on(paused_receiver.next()).unwrap();
        assert_eq!(
            pause_status.block_on().unwrap(),
            Complete {
                origin: Started,
                target: Paused,
            },
        );

        // try_next not reached again
        assert!(try_next_ready_receiver.try_recv().is_err());

        gst::debug!(
            RUNTIME_CAT,
            "nominal: starting (after pause // handle_item)"
        );
        let start_receiver = task.start().check().unwrap();
        block_on(started_receiver.next()).unwrap();
        assert_eq!(
            start_receiver.block_on().unwrap(),
            Complete {
                origin: Paused,
                target: Started,
            },
        );
        assert_eq!(task.state(), Started);

        gst::debug!(RUNTIME_CAT, "nominal: stopping");
        assert_eq!(
            task.stop().block_on().unwrap(),
            Complete {
                origin: Started,
                target: Stopped,
            },
        );

        assert_eq!(task.state(), Stopped);
        let _ = block_on(stopped_receiver.next());

        // purge remaining iteration received before stop if any
        let _ = try_next_ready_receiver.try_recv();

        gst::debug!(RUNTIME_CAT, "nominal: starting (after stop)");
        assert_eq!(
            task.start().block_on().unwrap(),
            Complete {
                origin: Stopped,
                target: Started,
            },
        );
        let _ = block_on(started_receiver.next());

        gst::debug!(RUNTIME_CAT, "nominal: stopping");
        assert_eq!(
            task.stop().block_on().unwrap(),
            Complete {
                origin: Started,
                target: Stopped,
            },
        );

        assert_eq!(
            task.unprepare().block_on().unwrap(),
            Complete {
                origin: Stopped,
                target: Unprepared,
            },
        );

        assert_eq!(task.state(), Unprepared);
        let _ = block_on(unprepared_receiver.next());
    }
1695
1696    #[test]
1697    fn prepare_error() {
1698        gst::init().unwrap();
1699
1700        struct TaskPrepareTest {
1701            obj: gst::Object,
1702            prepare_error_sender: mpsc::Sender<()>,
1703        }
1704
1705        impl TaskImpl for TaskPrepareTest {
1706            type Item = ();
1707
1708            fn obj(&self) -> &impl IsA<glib::Object> {
1709                &self.obj
1710            }
1711
1712            async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> {
1713                gst::debug!(RUNTIME_CAT, "prepare_error: prepare returning an error");
1714                Err(gst::error_msg!(
1715                    gst::ResourceError::Failed,
1716                    ["prepare_error: intentional error"]
1717                ))
1718            }
1719
1720            async fn handle_action_error(
1721                &mut self,
1722                trigger: Trigger,
1723                state: TaskState,
1724                err: gst::ErrorMessage,
1725            ) -> Trigger {
1726                gst::debug!(
1727                    RUNTIME_CAT,
1728                    "prepare_error: handling prepare error {:?}",
1729                    err
1730                );
1731                match (trigger, state) {
1732                    (Prepare, Unprepared) => {
1733                        self.prepare_error_sender.send(()).await.unwrap();
1734                    }
1735                    other => unreachable!("{:?}", other),
1736                }
1737                Trigger::Error
1738            }
1739
1740            async fn try_next(&mut self) -> Result<(), gst::FlowError> {
1741                unreachable!("prepare_error: try_next");
1742            }
1743
1744            async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
1745                unreachable!("prepare_error: handle_item");
1746            }
1747        }
1748
1749        let context = Context::acquire("prepare_error", Duration::from_millis(2)).unwrap();
1750
1751        let task = Task::default();
1752
1753        assert_eq!(task.state(), Unprepared);
1754
1755        let (prepare_error_sender, mut prepare_error_receiver) = mpsc::channel(1);
1756        let prepare_status = task.prepare(
1757            TaskPrepareTest {
1758                obj: gst::Pad::builder(gst::PadDirection::Unknown)
1759                    .name("runtime::Task::prepare_error")
1760                    .build()
1761                    .into(),
1762                prepare_error_sender,
1763            },
1764            context,
1765        );
1766
1767        gst::debug!(
1768            RUNTIME_CAT,
1769            "prepare_error: await action error notification"
1770        );
1771        block_on(prepare_error_receiver.next()).unwrap();
1772
1773        match prepare_status.block_on().unwrap_err() {
1774            TransitionError {
1775                trigger: Trigger::Error,
1776                state: Preparing,
1777                ..
1778            } => (),
1779            other => panic!("{other:?}"),
1780        }
1781
1782        // Wait for state machine to reach Error
1783        while TaskState::Error != task.state() {
1784            std::thread::sleep(Duration::from_millis(2));
1785        }
1786
1787        match task.start().block_on().unwrap_err() {
1788            TransitionError {
1789                trigger: Start,
1790                state: TaskState::Error,
1791                ..
1792            } => (),
1793            other => panic!("{other:?}"),
1794        }
1795
1796        block_on(task.unprepare()).unwrap();
1797    }
1798
1799    #[test]
1800    fn prepare_start_ok() {
1801        // Hold the preparation function so that it completes after the start request is engaged
1802
1803        gst::init().unwrap();
1804
1805        struct TaskPrepareTest {
1806            obj: gst::Object,
1807            prepare_receiver: mpsc::Receiver<()>,
1808        }
1809
1810        impl TaskImpl for TaskPrepareTest {
1811            type Item = ();
1812
1813            fn obj(&self) -> &impl IsA<glib::Object> {
1814                &self.obj
1815            }
1816
1817            async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> {
1818                gst::debug!(
1819                    RUNTIME_CAT,
1820                    "prepare_start_ok: preparation awaiting trigger"
1821                );
1822                self.prepare_receiver.next().await.unwrap();
1823                gst::debug!(RUNTIME_CAT, "prepare_start_ok: preparation complete Ok");
1824                Ok(())
1825            }
1826
1827            async fn handle_action_error(
1828                &mut self,
1829                _trigger: Trigger,
1830                _state: TaskState,
1831                _err: gst::ErrorMessage,
1832            ) -> Trigger {
1833                unreachable!("prepare_start_ok: handle_prepare_error");
1834            }
1835
1836            async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
1837                gst::debug!(RUNTIME_CAT, "prepare_start_ok: started");
1838                Ok(())
1839            }
1840
1841            async fn try_next(&mut self) -> Result<(), gst::FlowError> {
1842                pending().await
1843            }
1844
1845            async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
1846                unreachable!("prepare_start_ok: handle_item");
1847            }
1848        }
1849
1850        let context = Context::acquire("prepare_start_ok", Duration::from_millis(2)).unwrap();
1851
1852        let task = Task::default();
1853
1854        let (mut prepare_sender, prepare_receiver) = mpsc::channel(1);
1855        let fut = task.prepare(
1856            TaskPrepareTest {
1857                obj: gst::Pad::builder(gst::PadDirection::Unknown)
1858                    .name("runtime::Task::prepare_start_ok")
1859                    .build()
1860                    .into(),
1861                prepare_receiver,
1862            },
1863            context,
1864        );
1865        drop(fut);
1866
1867        let start_ctx = Context::acquire("prepare_start_ok_requester", Duration::ZERO).unwrap();
1868        let (ready_sender, ready_receiver) = oneshot::channel();
1869        let start_handle = start_ctx.spawn(async move {
1870            assert_eq!(task.state(), Preparing);
1871            gst::debug!(RUNTIME_CAT, "prepare_start_ok: starting");
1872            let start_status = task.start();
1873            match start_status {
1874                Pending {
1875                    trigger: Start,
1876                    origin: Preparing,
1877                    ..
1878                } => (),
1879                other => panic!("{other:?}"),
1880            }
1881            ready_sender.send(()).unwrap();
1882            assert_eq!(
1883                start_status.await.unwrap(),
1884                Complete {
1885                    origin: Prepared,
1886                    target: Started,
1887                },
1888            );
1889            assert_eq!(task.state(), Started);
1890
1891            let stop_status = task.stop();
1892            match stop_status {
1893                Pending {
1894                    trigger: Stop,
1895                    origin: Started,
1896                    ..
1897                } => (),
1898                other => panic!("{other:?}"),
1899            }
1900            assert_eq!(
1901                stop_status.await.unwrap(),
1902                Complete {
1903                    origin: Started,
1904                    target: Stopped,
1905                },
1906            );
1907            assert_eq!(task.state(), Stopped);
1908
1909            let unprepare_status = task.unprepare();
1910            match unprepare_status {
1911                Pending {
1912                    trigger: Unprepare,
1913                    origin: Stopped,
1914                    ..
1915                } => (),
1916                other => panic!("{other:?}"),
1917            };
1918            assert_eq!(
1919                unprepare_status.await.unwrap(),
1920                Complete {
1921                    origin: Stopped,
1922                    target: Unprepared,
1923                },
1924            );
1925            assert_eq!(task.state(), Unprepared);
1926        });
1927
1928        gst::debug!(RUNTIME_CAT, "prepare_start_ok: awaiting for start_ctx");
1929        block_on(ready_receiver).unwrap();
1930
1931        gst::debug!(RUNTIME_CAT, "prepare_start_ok: triggering preparation");
1932        block_on(prepare_sender.send(())).unwrap();
1933
1934        block_on(start_handle).unwrap();
1935    }
1936
1937    #[test]
1938    fn prepare_start_error() {
1939        // Hold the preparation function so that it completes after the start request is engaged
1940
1941        gst::init().unwrap();
1942
1943        struct TaskPrepareTest {
1944            obj: gst::Object,
1945            prepare_receiver: mpsc::Receiver<()>,
1946            prepare_error_sender: mpsc::Sender<()>,
1947        }
1948
1949        impl TaskImpl for TaskPrepareTest {
1950            type Item = ();
1951
1952            fn obj(&self) -> &impl IsA<glib::Object> {
1953                &self.obj
1954            }
1955
1956            async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> {
1957                gst::debug!(
1958                    RUNTIME_CAT,
1959                    "prepare_start_error: preparation awaiting trigger"
1960                );
1961                self.prepare_receiver.next().await.unwrap();
1962                gst::debug!(RUNTIME_CAT, "prepare_start_error: preparation complete Err");
1963
1964                Err(gst::error_msg!(
1965                    gst::ResourceError::Failed,
1966                    ["prepare_start_error: intentional error"]
1967                ))
1968            }
1969
1970            async fn handle_action_error(
1971                &mut self,
1972                trigger: Trigger,
1973                state: TaskState,
1974                err: gst::ErrorMessage,
1975            ) -> Trigger {
1976                gst::debug!(
1977                    RUNTIME_CAT,
1978                    "prepare_start_error: handling prepare error {:?}",
1979                    err
1980                );
1981                match (trigger, state) {
1982                    (Prepare, Unprepared) => {
1983                        self.prepare_error_sender.send(()).await.unwrap();
1984                    }
1985                    other => panic!("action error for {other:?}"),
1986                }
1987                Trigger::Error
1988            }
1989
1990            async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
1991                unreachable!("prepare_start_error: start");
1992            }
1993
1994            async fn try_next(&mut self) -> Result<(), gst::FlowError> {
1995                unreachable!("prepare_start_error: try_next");
1996            }
1997
1998            async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
1999                unreachable!("prepare_start_error: handle_item");
2000            }
2001        }
2002
2003        let context = Context::acquire("prepare_start_error", Duration::from_millis(2)).unwrap();
2004
2005        let task = Task::default();
2006
2007        let (mut prepare_sender, prepare_receiver) = mpsc::channel(1);
2008        let (prepare_error_sender, mut prepare_error_receiver) = mpsc::channel(1);
2009        let prepare_status = task.prepare(
2010            TaskPrepareTest {
2011                obj: gst::Pad::builder(gst::PadDirection::Unknown)
2012                    .name("runtime::Task::prepare_start_error")
2013                    .build()
2014                    .into(),
2015                prepare_receiver,
2016                prepare_error_sender,
2017            },
2018            context,
2019        );
2020        match prepare_status {
2021            Pending {
2022                trigger: Prepare,
2023                origin: Unprepared,
2024                ..
2025            } => (),
2026            other => panic!("{other:?}"),
2027        };
2028
2029        let start_ctx = Context::acquire("prepare_start_error_requester", Duration::ZERO).unwrap();
2030        let (ready_sender, ready_receiver) = oneshot::channel();
2031        let start_handle = start_ctx.spawn(async move {
2032            gst::debug!(RUNTIME_CAT, "prepare_start_error: starting (Err)");
2033            let fut = task.start();
2034            drop(fut);
2035            ready_sender.send(()).unwrap();
2036            // FIXME we loose the origin Trigger (Start)
2037            // and only get the Trigger returned by handle_action_error
2038            // see also: comment in exec_action!
2039            match prepare_status.await {
2040                Err(TransitionError {
2041                    trigger: Trigger::Error,
2042                    state: Preparing,
2043                    ..
2044                }) => (),
2045                other => panic!("{other:?}"),
2046            }
2047
2048            let unprepare_status = task.unprepare();
2049            match unprepare_status {
2050                Pending {
2051                    trigger: Unprepare,
2052                    origin: TaskState::Error,
2053                    ..
2054                } => (),
2055                other => panic!("{other:?}"),
2056            };
2057            assert_eq!(
2058                unprepare_status.await.unwrap(),
2059                Complete {
2060                    origin: TaskState::Error,
2061                    target: Unprepared,
2062                },
2063            );
2064        });
2065
2066        gst::debug!(RUNTIME_CAT, "prepare_start_error: awaiting for start_ctx");
2067        block_on(ready_receiver).unwrap();
2068
2069        gst::debug!(
2070            RUNTIME_CAT,
2071            "prepare_start_error: triggering preparation (failure)"
2072        );
2073        block_on(prepare_sender.send(())).unwrap();
2074
2075        gst::debug!(
2076            RUNTIME_CAT,
2077            "prepare_start_error: await prepare error notification"
2078        );
2079        block_on(prepare_error_receiver.next()).unwrap();
2080
2081        block_on(start_handle).unwrap();
2082    }
2083
2084    #[test]
2085    fn item_error() {
2086        gst::init().unwrap();
2087
2088        struct TaskTest {
2089            obj: gst::Object,
2090            try_next_receiver: mpsc::Receiver<gst::FlowError>,
2091        }
2092
2093        impl TaskImpl for TaskTest {
2094            type Item = gst::FlowError;
2095
2096            fn obj(&self) -> &impl IsA<glib::Object> {
2097                &self.obj
2098            }
2099
2100            async fn try_next(&mut self) -> Result<gst::FlowError, gst::FlowError> {
2101                gst::debug!(RUNTIME_CAT, "item_error: awaiting try_next");
2102                Ok(self.try_next_receiver.next().await.unwrap())
2103            }
2104
2105            async fn handle_item(&mut self, item: gst::FlowError) -> Result<(), gst::FlowError> {
2106                gst::debug!(RUNTIME_CAT, "item_error: handle_item received {:?}", item);
2107                Err(item)
2108            }
2109        }
2110
2111        let context = Context::acquire("item_error", Duration::from_millis(2)).unwrap();
2112        let task = Task::default();
2113        gst::debug!(RUNTIME_CAT, "item_error: prepare and start");
2114        let (mut try_next_sender, try_next_receiver) = mpsc::channel(1);
2115        task.prepare(
2116            TaskTest {
2117                obj: gst::Pad::builder(gst::PadDirection::Unknown)
2118                    .name("runtime::Task::item_error")
2119                    .build()
2120                    .into(),
2121                try_next_receiver,
2122            },
2123            context,
2124        )
2125        .block_on()
2126        .unwrap();
2127        task.start().block_on().unwrap();
2128
2129        gst::debug!(RUNTIME_CAT, "item_error: req. handle_item to return Eos");
2130        block_on(try_next_sender.send(gst::FlowError::Eos)).unwrap();
2131        // Wait for state machine to reach Stopped
2132        while Stopped != task.state() {
2133            std::thread::sleep(Duration::from_millis(2));
2134        }
2135
2136        gst::debug!(RUNTIME_CAT, "item_error: starting (after stop)");
2137        assert_eq!(
2138            task.start().block_on().unwrap(),
2139            Complete {
2140                origin: Stopped,
2141                target: Started,
2142            },
2143        );
2144
2145        gst::debug!(RUNTIME_CAT, "item_error: req. handle_item to return Error");
2146        block_on(try_next_sender.send(gst::FlowError::Error)).unwrap();
2147        // Wait for state machine to reach Error
2148        while TaskState::Error != task.state() {
2149            std::thread::sleep(Duration::from_millis(2));
2150        }
2151
2152        gst::debug!(RUNTIME_CAT, "item_error: attempting to start (after Error)");
2153        match task.start().block_on().unwrap_err() {
2154            TransitionError {
2155                trigger: Start,
2156                state: TaskState::Error,
2157                ..
2158            } => (),
2159            other => panic!("{other:?}"),
2160        }
2161
2162        assert_eq!(
2163            task.unprepare().block_on().unwrap(),
2164            Complete {
2165                origin: TaskState::Error,
2166                target: Unprepared,
2167            },
2168        );
2169    }
2170
2171    #[test]
2172    fn flush_regular_sync() {
2173        gst::init().unwrap();
2174
2175        struct TaskFlushTest {
2176            obj: gst::Object,
2177            flush_start_sender: mpsc::Sender<()>,
2178            flush_stop_sender: mpsc::Sender<()>,
2179        }
2180
2181        impl TaskImpl for TaskFlushTest {
2182            type Item = ();
2183
2184            fn obj(&self) -> &impl IsA<glib::Object> {
2185                &self.obj
2186            }
2187
2188            async fn try_next(&mut self) -> Result<(), gst::FlowError> {
2189                pending().await
2190            }
2191
2192            async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
2193                unreachable!("flush_regular_sync: handle_item");
2194            }
2195
2196            async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
2197                gst::debug!(RUNTIME_CAT, "flush_regular_sync: started flushing");
2198                self.flush_start_sender.send(()).await.unwrap();
2199                Ok(())
2200            }
2201
2202            async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
2203                gst::debug!(RUNTIME_CAT, "flush_regular_sync: stopped flushing");
2204                self.flush_stop_sender.send(()).await.unwrap();
2205                Ok(())
2206            }
2207        }
2208
2209        let context = Context::acquire("flush_regular_sync", Duration::from_millis(2)).unwrap();
2210
2211        let task = Task::default();
2212
2213        let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
2214        let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
2215        let obj = gst::Pad::builder(gst::PadDirection::Unknown)
2216            .name("runtime::Task::flush_regular_sync")
2217            .build();
2218        let fut = task.prepare(
2219            TaskFlushTest {
2220                obj: obj.clone().into(),
2221                flush_start_sender,
2222                flush_stop_sender,
2223            },
2224            context,
2225        );
2226        drop(fut);
2227
2228        gst::debug!(RUNTIME_CAT, "flush_regular_sync: start");
2229        block_on(task.start()).unwrap();
2230
2231        gst::debug!(RUNTIME_CAT, "flush_regular_sync: starting flush");
2232        assert_eq!(
2233            task.flush_start().block_on().unwrap(),
2234            Complete {
2235                origin: Started,
2236                target: Flushing,
2237            },
2238        );
2239        assert_eq!(task.state(), Flushing);
2240
2241        block_on(flush_start_receiver.next()).unwrap();
2242
2243        gst::debug!(RUNTIME_CAT, "flush_regular_sync: stopping flush");
2244        assert_eq!(
2245            task.flush_stop().block_on_or_add_subtask(&obj).unwrap(),
2246            Complete {
2247                origin: Flushing,
2248                target: Started,
2249            },
2250        );
2251        assert_eq!(task.state(), Started);
2252
2253        block_on(flush_stop_receiver.next()).unwrap();
2254
2255        let fut = task.pause();
2256        drop(fut);
2257        stop_then_unprepare(task);
2258    }
2259
2260    #[test]
2261    fn flush_regular_different_context() {
2262        // Purpose: make sure a flush sequence triggered from a Context doesn't block.
2263        gst::init().unwrap();
2264
2265        struct TaskFlushTest {
2266            obj: gst::Object,
2267            flush_start_sender: mpsc::Sender<()>,
2268            flush_stop_sender: mpsc::Sender<()>,
2269        }
2270
2271        impl TaskImpl for TaskFlushTest {
2272            type Item = ();
2273
2274            fn obj(&self) -> &impl IsA<glib::Object> {
2275                &self.obj
2276            }
2277
2278            async fn try_next(&mut self) -> Result<(), gst::FlowError> {
2279                pending().await
2280            }
2281
2282            async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
2283                unreachable!("flush_regular_different_context: handle_item");
2284            }
2285
2286            async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
2287                gst::debug!(
2288                    RUNTIME_CAT,
2289                    "flush_regular_different_context: started flushing"
2290                );
2291                self.flush_start_sender.send(()).await.unwrap();
2292                Ok(())
2293            }
2294
2295            async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
2296                gst::debug!(
2297                    RUNTIME_CAT,
2298                    "flush_regular_different_context: stopped flushing"
2299                );
2300                self.flush_stop_sender.send(()).await.unwrap();
2301                Ok(())
2302            }
2303        }
2304
2305        let context =
2306            Context::acquire("flush_regular_different_context", Duration::from_millis(2)).unwrap();
2307
2308        let task = Task::default();
2309
2310        let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
2311        let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
2312        let obj = gst::Pad::builder(gst::PadDirection::Unknown)
2313            .name("runtime::Task::flush_regular_different_context")
2314            .build();
2315        let fut = task.prepare(
2316            TaskFlushTest {
2317                obj: obj.clone().into(),
2318                flush_start_sender,
2319                flush_stop_sender,
2320            },
2321            context,
2322        );
2323        drop(fut);
2324
2325        gst::debug!(RUNTIME_CAT, "flush_regular_different_context: start");
2326        task.start().block_on().unwrap();
2327
2328        let oob_context = Context::acquire(
2329            "flush_regular_different_context_oob",
2330            Duration::from_millis(2),
2331        )
2332        .unwrap();
2333
2334        let task_clone = task.clone();
2335        let flush_res_fut = oob_context.spawn(async move {
2336            let flush_start_status = task_clone.flush_start();
2337            match flush_start_status {
2338                Pending {
2339                    trigger: FlushStart,
2340                    origin: Started,
2341                    ..
2342                } => (),
2343                other => panic!("{other:?}"),
2344            };
2345            assert_eq!(
2346                flush_start_status.await.unwrap(),
2347                Complete {
2348                    origin: Started,
2349                    target: Flushing,
2350                },
2351            );
2352            assert_eq!(task_clone.state(), Flushing);
2353            flush_start_receiver.next().await.unwrap();
2354
2355            let flush_stop_status = task_clone.flush_stop();
2356            match flush_stop_status {
2357                Pending {
2358                    trigger: FlushStop,
2359                    origin: Flushing,
2360                    ..
2361                } => (),
2362                other => panic!("{other:?}"),
2363            };
2364            assert_eq!(
2365                flush_stop_status.block_on_or_add_subtask(&obj).unwrap(),
2366                NotWaiting {
2367                    trigger: FlushStop,
2368                    origin: Flushing,
2369                },
2370            );
2371
2372            Context::drain_sub_tasks().await.unwrap();
2373            assert_eq!(task_clone.state(), Started);
2374        });
2375
2376        block_on(flush_res_fut).unwrap();
2377        block_on(flush_stop_receiver.next()).unwrap();
2378
2379        stop_then_unprepare(task);
2380    }
2381
2382    #[test]
2383    fn flush_regular_same_context() {
2384        // Purpose: make sure a flush sequence triggered from the same Context doesn't block.
2385        gst::init().unwrap();
2386
2387        struct TaskFlushTest {
2388            obj: gst::Object,
2389            flush_start_sender: mpsc::Sender<()>,
2390            flush_stop_sender: mpsc::Sender<()>,
2391        }
2392
2393        impl TaskImpl for TaskFlushTest {
2394            type Item = ();
2395
2396            fn obj(&self) -> &impl IsA<glib::Object> {
2397                &self.obj
2398            }
2399
2400            async fn try_next(&mut self) -> Result<(), gst::FlowError> {
2401                pending().await
2402            }
2403
2404            async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
2405                unreachable!("flush_regular_same_context: handle_item");
2406            }
2407
2408            async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
2409                gst::debug!(RUNTIME_CAT, "flush_regular_same_context: started flushing");
2410                self.flush_start_sender.send(()).await.unwrap();
2411                Ok(())
2412            }
2413
2414            async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
2415                gst::debug!(RUNTIME_CAT, "flush_regular_same_context: stopped flushing");
2416                self.flush_stop_sender.send(()).await.unwrap();
2417                Ok(())
2418            }
2419        }
2420
2421        let context =
2422            Context::acquire("flush_regular_same_context", Duration::from_millis(2)).unwrap();
2423
2424        let task = Task::default();
2425
2426        let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
2427        let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
2428        let fut = task.prepare(
2429            TaskFlushTest {
2430                obj: gst::Pad::builder(gst::PadDirection::Unknown)
2431                    .name("runtime::Task::flush_regular_same_context")
2432                    .build()
2433                    .into(),
2434                flush_start_sender,
2435                flush_stop_sender,
2436            },
2437            context.clone(),
2438        );
2439        drop(fut);
2440
2441        block_on(task.start()).unwrap();
2442
2443        let task_clone = task.clone();
2444        let flush_handle = context.spawn(async move {
2445            let flush_start_status = task_clone.flush_start();
2446            match flush_start_status {
2447                Pending {
2448                    trigger: FlushStart,
2449                    origin: Started,
2450                    ..
2451                } => (),
2452                other => panic!("{other:?}"),
2453            };
2454            assert_eq!(
2455                flush_start_status.await.unwrap(),
2456                Complete {
2457                    origin: Started,
2458                    target: Flushing,
2459                },
2460            );
2461            assert_eq!(task_clone.state(), Flushing);
2462            flush_start_receiver.next().await.unwrap();
2463
2464            let flush_stop_status = task_clone.flush_stop();
2465            match flush_stop_status {
2466                Pending {
2467                    trigger: FlushStop,
2468                    origin: Flushing,
2469                    ..
2470                } => (),
2471                other => panic!("{other:?}"),
2472            };
2473            assert_eq!(
2474                flush_stop_status.await.unwrap(),
2475                Complete {
2476                    origin: Flushing,
2477                    target: Started,
2478                },
2479            );
2480            assert_eq!(task_clone.state(), Started);
2481        });
2482
2483        block_on(flush_handle).unwrap();
2484        block_on(flush_stop_receiver.next()).unwrap();
2485
2486        stop_then_unprepare(task);
2487    }
2488
2489    #[test]
2490    fn flush_from_loop() {
2491        // Purpose: make sure a flush_start triggered from an iteration doesn't block.
2492        gst::init().unwrap();
2493
2494        struct TaskFlushTest {
2495            obj: gst::Object,
2496            task: Task,
2497            flush_start_sender: mpsc::Sender<()>,
2498        }
2499
2500        impl TaskImpl for TaskFlushTest {
2501            type Item = ();
2502
2503            fn obj(&self) -> &impl IsA<glib::Object> {
2504                &self.obj
2505            }
2506
2507            async fn try_next(&mut self) -> Result<(), gst::FlowError> {
2508                Ok(())
2509            }
2510
2511            async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
2512                gst::debug!(RUNTIME_CAT, "flush_from_loop: flush_start from handle_item");
2513                match self.task.flush_start() {
2514                    Pending {
2515                        trigger: FlushStart,
2516                        origin: Started,
2517                        ..
2518                    } => (),
2519                    other => panic!("{other:?}"),
2520                }
2521                Ok(())
2522            }
2523
2524            async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
2525                gst::debug!(RUNTIME_CAT, "flush_from_loop: started flushing");
2526                self.flush_start_sender.send(()).await.unwrap();
2527                Ok(())
2528            }
2529        }
2530
2531        let context = Context::acquire("flush_from_loop", Duration::from_millis(2)).unwrap();
2532
2533        let task = Task::default();
2534
2535        let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
2536        let fut = task.prepare(
2537            TaskFlushTest {
2538                obj: gst::Pad::builder(gst::PadDirection::Unknown)
2539                    .name("runtime::Task::flush_from_loop")
2540                    .build()
2541                    .into(),
2542                task: task.clone(),
2543                flush_start_sender,
2544            },
2545            context,
2546        );
2547        drop(fut);
2548
2549        let fut = task.start();
2550        drop(fut);
2551
2552        gst::debug!(
2553            RUNTIME_CAT,
2554            "flush_from_loop: awaiting flush_start notification"
2555        );
2556        block_on(flush_start_receiver.next()).unwrap();
2557
2558        assert_eq!(
2559            task.stop().block_on().unwrap(),
2560            Complete {
2561                origin: Flushing,
2562                target: Stopped,
2563            },
2564        );
2565        task.unprepare().block_on().unwrap();
2566    }
2567
2568    #[test]
2569    fn pause_from_loop() {
2570        // Purpose: make sure a start triggered from an iteration doesn't block.
2571        // E.g. an auto pause cancellation after a delay.
2572        gst::init().unwrap();
2573
2574        struct TaskStartTest {
2575            obj: gst::Object,
2576            task: Task,
2577            pause_sender: mpsc::Sender<()>,
2578        }
2579
2580        impl TaskImpl for TaskStartTest {
2581            type Item = ();
2582
2583            fn obj(&self) -> &impl IsA<glib::Object> {
2584                &self.obj
2585            }
2586
2587            async fn try_next(&mut self) -> Result<(), gst::FlowError> {
2588                Ok(())
2589            }
2590
2591            async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
2592                gst::debug!(RUNTIME_CAT, "pause_from_loop: entering handle_item");
2593
2594                crate::runtime::timer::delay_for(Duration::from_millis(50)).await;
2595
2596                gst::debug!(RUNTIME_CAT, "pause_from_loop: pause from handle_item");
2597                match self.task.pause() {
2598                    Pending {
2599                        trigger: Pause,
2600                        origin: Started,
2601                        ..
2602                    } => (),
2603                    other => panic!("{other:?}"),
2604                }
2605
2606                Ok(())
2607            }
2608
2609            async fn pause(&mut self) -> Result<(), gst::ErrorMessage> {
2610                gst::debug!(RUNTIME_CAT, "pause_from_loop: entering pause action");
2611                self.pause_sender.send(()).await.unwrap();
2612                Ok(())
2613            }
2614        }
2615
2616        let context = Context::acquire("pause_from_loop", Duration::from_millis(2)).unwrap();
2617
2618        let task = Task::default();
2619
2620        let (pause_sender, mut pause_receiver) = mpsc::channel(1);
2621        let fut = task.prepare(
2622            TaskStartTest {
2623                obj: gst::Pad::builder(gst::PadDirection::Unknown)
2624                    .name("runtime::Task::pause_from_loop")
2625                    .build()
2626                    .into(),
2627                task: task.clone(),
2628                pause_sender,
2629            },
2630            context,
2631        );
2632        drop(fut);
2633
2634        let fut = task.start();
2635        drop(fut);
2636
2637        gst::debug!(RUNTIME_CAT, "pause_from_loop: awaiting pause notification");
2638        block_on(pause_receiver.next()).unwrap();
2639
2640        stop_then_unprepare(task);
2641    }
2642
2643    #[test]
2644    fn trigger_from_action() {
2645        // Purpose: make sure an event triggered from a transition action doesn't block.
2646        gst::init().unwrap();
2647
2648        struct TaskFlushTest {
2649            obj: gst::Object,
2650            task: Task,
2651            flush_stop_sender: mpsc::Sender<()>,
2652        }
2653
2654        impl TaskImpl for TaskFlushTest {
2655            type Item = ();
2656
2657            fn obj(&self) -> &impl IsA<glib::Object> {
2658                &self.obj
2659            }
2660
2661            async fn try_next(&mut self) -> Result<(), gst::FlowError> {
2662                pending().await
2663            }
2664
2665            async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
2666                unreachable!("trigger_from_action: handle_item");
2667            }
2668
2669            async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
2670                gst::debug!(
2671                    RUNTIME_CAT,
2672                    "trigger_from_action: flush_start triggering flush_stop"
2673                );
2674                match self.task.flush_stop() {
2675                    Pending {
2676                        trigger: FlushStop,
2677                        origin: Started,
2678                        ..
2679                    } => (),
2680                    other => panic!("{other:?}"),
2681                }
2682
2683                Ok(())
2684            }
2685
2686            async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
2687                gst::debug!(RUNTIME_CAT, "trigger_from_action: stopped flushing");
2688                self.flush_stop_sender.send(()).await.unwrap();
2689                Ok(())
2690            }
2691        }
2692
2693        let context = Context::acquire("trigger_from_action", Duration::from_millis(2)).unwrap();
2694
2695        let task = Task::default();
2696
2697        let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
2698        let fut = task.prepare(
2699            TaskFlushTest {
2700                obj: gst::Pad::builder(gst::PadDirection::Unknown)
2701                    .name("runtime::Task::trigger_from_action")
2702                    .build()
2703                    .into(),
2704                task: task.clone(),
2705                flush_stop_sender,
2706            },
2707            context,
2708        );
2709        drop(fut);
2710
2711        task.start().block_on().unwrap();
2712        let fut = task.flush_start();
2713        drop(fut);
2714
2715        gst::debug!(
2716            RUNTIME_CAT,
2717            "trigger_from_action: awaiting flush_stop notification"
2718        );
2719        block_on(flush_stop_receiver.next()).unwrap();
2720
2721        stop_then_unprepare(task);
2722    }
2723
2724    #[test]
2725    fn pause_flush_start() {
2726        gst::init().unwrap();
2727
2728        struct TaskFlushTest {
2729            obj: gst::Object,
2730            started_sender: mpsc::Sender<()>,
2731            flush_start_sender: mpsc::Sender<()>,
2732            flush_stop_sender: mpsc::Sender<()>,
2733        }
2734
2735        impl TaskImpl for TaskFlushTest {
2736            type Item = ();
2737
2738            fn obj(&self) -> &impl IsA<glib::Object> {
2739                &self.obj
2740            }
2741
2742            async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
2743                gst::debug!(RUNTIME_CAT, "pause_flush_start: started");
2744                self.started_sender.send(()).await.unwrap();
2745                Ok(())
2746            }
2747
2748            async fn try_next(&mut self) -> Result<(), gst::FlowError> {
2749                pending().await
2750            }
2751
2752            async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
2753                unreachable!("pause_flush_start: handle_item");
2754            }
2755
2756            async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
2757                gst::debug!(RUNTIME_CAT, "pause_flush_start: started flushing");
2758                self.flush_start_sender.send(()).await.unwrap();
2759                Ok(())
2760            }
2761
2762            async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
2763                gst::debug!(RUNTIME_CAT, "pause_flush_start: stopped flushing");
2764                self.flush_stop_sender.send(()).await.unwrap();
2765                Ok(())
2766            }
2767        }
2768
2769        let context = Context::acquire("pause_flush_start", Duration::from_millis(2)).unwrap();
2770
2771        let task = Task::default();
2772
2773        let (started_sender, mut started_receiver) = mpsc::channel(1);
2774        let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
2775        let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
2776        let fut = task.prepare(
2777            TaskFlushTest {
2778                obj: gst::Pad::builder(gst::PadDirection::Unknown)
2779                    .name("runtime::Task::pause_flush_start")
2780                    .build()
2781                    .into(),
2782                started_sender,
2783                flush_start_sender,
2784                flush_stop_sender,
2785            },
2786            context,
2787        );
2788        drop(fut);
2789
2790        // Pause, FlushStart, FlushStop, Start
2791
2792        gst::debug!(RUNTIME_CAT, "pause_flush_start: pausing");
2793        assert_eq!(
2794            task.pause().block_on().unwrap(),
2795            Complete {
2796                origin: Prepared,
2797                target: Paused,
2798            },
2799        );
2800
2801        gst::debug!(RUNTIME_CAT, "pause_flush_start: starting flush");
2802        assert_eq!(
2803            task.flush_start().block_on().unwrap(),
2804            Complete {
2805                origin: Paused,
2806                target: PausedFlushing,
2807            },
2808        );
2809        assert_eq!(task.state(), PausedFlushing);
2810        block_on(flush_start_receiver.next());
2811
2812        gst::debug!(RUNTIME_CAT, "pause_flush_start: stopping flush");
2813        assert_eq!(
2814            task.flush_stop().block_on().unwrap(),
2815            Complete {
2816                origin: PausedFlushing,
2817                target: Paused,
2818            },
2819        );
2820        assert_eq!(task.state(), Paused);
2821        block_on(flush_stop_receiver.next());
2822
2823        // start action not executed
2824        started_receiver.try_recv().unwrap_err();
2825
2826        gst::debug!(RUNTIME_CAT, "pause_flush_start: starting after flushing");
2827        assert_eq!(
2828            task.start().block_on().unwrap(),
2829            Complete {
2830                origin: Paused,
2831                target: Started,
2832            },
2833        );
2834        assert_eq!(task.state(), Started);
2835        block_on(started_receiver.next());
2836
2837        stop_then_unprepare(task);
2838    }
2839
2840    #[test]
2841    fn pause_flushing_start() {
2842        gst::init().unwrap();
2843
2844        struct TaskFlushTest {
2845            obj: gst::Object,
2846            started_sender: mpsc::Sender<()>,
2847            flush_start_sender: mpsc::Sender<()>,
2848            flush_stop_sender: mpsc::Sender<()>,
2849        }
2850
2851        impl TaskImpl for TaskFlushTest {
2852            type Item = ();
2853
2854            fn obj(&self) -> &impl IsA<glib::Object> {
2855                &self.obj
2856            }
2857
2858            async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
2859                gst::debug!(RUNTIME_CAT, "pause_flushing_start: started");
2860                self.started_sender.send(()).await.unwrap();
2861                Ok(())
2862            }
2863
2864            async fn try_next(&mut self) -> Result<(), gst::FlowError> {
2865                pending().await
2866            }
2867
2868            async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
2869                unreachable!("pause_flushing_start: handle_item");
2870            }
2871
2872            async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
2873                gst::debug!(RUNTIME_CAT, "pause_flushing_start: started flushing");
2874                self.flush_start_sender.send(()).await.unwrap();
2875                Ok(())
2876            }
2877
2878            async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
2879                gst::debug!(RUNTIME_CAT, "pause_flushing_start: stopped flushing");
2880                self.flush_stop_sender.send(()).await.unwrap();
2881                Ok(())
2882            }
2883        }
2884
2885        let context = Context::acquire("pause_flushing_start", Duration::from_millis(2)).unwrap();
2886
2887        let task = Task::default();
2888
2889        let (started_sender, mut started_receiver) = mpsc::channel(1);
2890        let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
2891        let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
2892        let fut = task.prepare(
2893            TaskFlushTest {
2894                obj: gst::Pad::builder(gst::PadDirection::Unknown)
2895                    .name("runtime::Task::pause_flushing_start")
2896                    .build()
2897                    .into(),
2898                started_sender,
2899                flush_start_sender,
2900                flush_stop_sender,
2901            },
2902            context,
2903        );
2904        drop(fut);
2905
2906        // Pause, FlushStart, Start, FlushStop
2907
2908        gst::debug!(RUNTIME_CAT, "pause_flushing_start: pausing");
2909        let fut = task.pause();
2910        drop(fut);
2911
2912        gst::debug!(RUNTIME_CAT, "pause_flushing_start: starting flush");
2913        block_on(task.flush_start()).unwrap();
2914        assert_eq!(task.state(), PausedFlushing);
2915        block_on(flush_start_receiver.next());
2916
2917        gst::debug!(RUNTIME_CAT, "pause_flushing_start: starting while flushing");
2918        assert_eq!(
2919            task.start().block_on().unwrap(),
2920            Complete {
2921                origin: PausedFlushing,
2922                target: Flushing,
2923            },
2924        );
2925        assert_eq!(task.state(), Flushing);
2926
2927        // start action not executed
2928        started_receiver.try_recv().unwrap_err();
2929
2930        gst::debug!(RUNTIME_CAT, "pause_flushing_start: stopping flush");
2931        assert_eq!(
2932            task.flush_stop().block_on().unwrap(),
2933            Complete {
2934                origin: Flushing,
2935                target: Started,
2936            },
2937        );
2938        assert_eq!(task.state(), Started);
2939        block_on(flush_stop_receiver.next());
2940        block_on(started_receiver.next());
2941
2942        stop_then_unprepare(task);
2943    }
2944
2945    #[test]
2946    fn flush_concurrent_start() {
2947        // Purpose: check the racy case of start being triggered in // after flush_start
2948        // e.g.: a FlushStart event received on a Pad and the element starting after a Pause
2949        gst::init().unwrap();
2950
2951        struct TaskStartTest {
2952            obj: gst::Object,
2953            flush_start_sender: mpsc::Sender<()>,
2954            flush_stop_sender: mpsc::Sender<()>,
2955        }
2956
2957        impl TaskImpl for TaskStartTest {
2958            type Item = ();
2959
2960            fn obj(&self) -> &impl IsA<glib::Object> {
2961                &self.obj
2962            }
2963
2964            async fn try_next(&mut self) -> Result<(), gst::FlowError> {
2965                pending().await
2966            }
2967
2968            async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
2969                unreachable!("flush_concurrent_start: handle_item");
2970            }
2971
2972            async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
2973                gst::debug!(RUNTIME_CAT, "flush_concurrent_start: started flushing");
2974                self.flush_start_sender.send(()).await.unwrap();
2975                Ok(())
2976            }
2977
2978            async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
2979                gst::debug!(RUNTIME_CAT, "flush_concurrent_start: stopped flushing");
2980                self.flush_stop_sender.send(()).await.unwrap();
2981                Ok(())
2982            }
2983        }
2984
2985        let context = Context::acquire("flush_concurrent_start", Duration::from_millis(2)).unwrap();
2986
2987        let task = Task::default();
2988
2989        let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
2990        let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
2991        let fut = task.prepare(
2992            TaskStartTest {
2993                obj: gst::Pad::builder(gst::PadDirection::Unknown)
2994                    .name("runtime::Task::flush_concurrent_start")
2995                    .build()
2996                    .into(),
2997                flush_start_sender,
2998                flush_stop_sender,
2999            },
3000            context,
3001        );
3002        drop(fut);
3003
3004        let oob_context =
3005            Context::acquire("flush_concurrent_start_oob", Duration::from_millis(2)).unwrap();
3006        let task_clone = task.clone();
3007
3008        block_on(task.pause()).unwrap();
3009
3010        // Launch flush_start // start
3011        let (ready_sender, ready_receiver) = oneshot::channel();
3012        gst::debug!(RUNTIME_CAT, "flush_concurrent_start: spawning flush_start");
3013        let flush_start_handle = oob_context.spawn(async move {
3014            gst::debug!(RUNTIME_CAT, "flush_concurrent_start: // flush_start");
3015            ready_sender.send(()).unwrap();
3016            let status = task_clone.flush_start();
3017            match status {
3018                Pending {
3019                    trigger: FlushStart,
3020                    origin: Paused,
3021                    ..
3022                } => (),
3023                Pending {
3024                    trigger: FlushStart,
3025                    origin: Started,
3026                    ..
3027                } => (),
3028                other => panic!("{other:?}"),
3029            };
3030            status.await.unwrap();
3031            flush_start_receiver.next().await.unwrap();
3032        });
3033
3034        gst::debug!(
3035            RUNTIME_CAT,
3036            "flush_concurrent_start: awaiting for oob_context"
3037        );
3038        block_on(ready_receiver).unwrap();
3039
3040        gst::debug!(RUNTIME_CAT, "flush_concurrent_start: // start");
3041        match block_on(task.start()) {
3042            Ok(Complete {
3043                origin: Paused,
3044                target: Started,
3045            }) => (),
3046            Ok(Complete {
3047                origin: PausedFlushing,
3048                target: Flushing,
3049            }) => (),
3050            other => panic!("{other:?}"),
3051        }
3052
3053        block_on(flush_start_handle).unwrap();
3054
3055        gst::debug!(RUNTIME_CAT, "flush_concurrent_start: requesting flush_stop");
3056        assert_eq!(
3057            task.flush_stop().block_on().unwrap(),
3058            Complete {
3059                origin: Flushing,
3060                target: Started,
3061            },
3062        );
3063        assert_eq!(task.state(), Started);
3064        block_on(flush_stop_receiver.next());
3065
3066        stop_then_unprepare(task);
3067    }
3068
3069    #[test]
3070    fn start_timer() {
3071        use crate::runtime::timer;
3072
3073        // Purpose: make sure a Timer initialized in a transition is
3074        // available when iterating in the loop.
3075        gst::init().unwrap();
3076
3077        struct TaskTimerTest {
3078            obj: gst::Object,
3079            timer: Option<timer::Oneshot>,
3080            timer_elapsed_sender: Option<oneshot::Sender<()>>,
3081        }
3082
3083        impl TaskImpl for TaskTimerTest {
3084            type Item = ();
3085
3086            fn obj(&self) -> &impl IsA<glib::Object> {
3087                &self.obj
3088            }
3089
3090            async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
3091                self.timer = Some(crate::runtime::timer::delay_for(Duration::from_millis(50)));
3092                gst::debug!(RUNTIME_CAT, "start_timer: started");
3093                Ok(())
3094            }
3095
3096            async fn try_next(&mut self) -> Result<(), gst::FlowError> {
3097                gst::debug!(RUNTIME_CAT, "start_timer: awaiting timer");
3098                self.timer.take().unwrap().await;
3099                Ok(())
3100            }
3101
3102            async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
3103                gst::debug!(RUNTIME_CAT, "start_timer: timer elapsed");
3104                if let Some(timer_elapsed_sender) = self.timer_elapsed_sender.take() {
3105                    timer_elapsed_sender.send(()).unwrap();
3106                }
3107
3108                Err(gst::FlowError::Eos)
3109            }
3110        }
3111
3112        let context = Context::acquire("start_timer", Duration::from_millis(2)).unwrap();
3113
3114        let task = Task::default();
3115
3116        let (timer_elapsed_sender, timer_elapsed_receiver) = oneshot::channel();
3117        let fut = task.prepare(
3118            TaskTimerTest {
3119                obj: gst::Pad::builder(gst::PadDirection::Unknown)
3120                    .name("runtime::Task::start_timer")
3121                    .build()
3122                    .into(),
3123                timer: None,
3124                timer_elapsed_sender: Some(timer_elapsed_sender),
3125            },
3126            context,
3127        );
3128        drop(fut);
3129
3130        gst::debug!(RUNTIME_CAT, "start_timer: start");
3131        let fut = task.start();
3132        drop(fut);
3133
3134        block_on(timer_elapsed_receiver).unwrap();
3135        gst::debug!(RUNTIME_CAT, "start_timer: timer elapsed received");
3136
3137        stop_then_unprepare(task);
3138    }
3139}