1use futures::channel::mpsc as async_mpsc;
13use futures::channel::oneshot;
14use futures::prelude::*;
15
16use std::fmt;
17use std::ops::Deref;
18use std::pin::{Pin, pin};
19use std::sync::{Arc, Mutex, MutexGuard};
20use std::task::Poll;
21
22use gst::glib;
23use gst::glib::prelude::*;
24
25use super::{Context, JoinHandle, RUNTIME_CAT};
26
27#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy)]
28pub enum TaskState {
29 Error,
30 Flushing,
31 Paused,
32 PausedFlushing,
33 Prepared,
34 Preparing,
35 Started,
36 Stopped,
37 Unprepared,
38}
39
40#[derive(Clone, Copy, Debug, Eq, PartialEq)]
41pub enum Trigger {
42 Error,
43 FlushStart,
44 FlushStop,
45 Pause,
46 Prepare,
47 Start,
48 Stop,
49 Unprepare,
50}
51
52#[derive(Clone, Copy, Debug, Eq, PartialEq)]
54pub enum TransitionOk {
55 Complete {
57 origin: TaskState,
58 target: TaskState,
59 },
60 NotWaiting { trigger: Trigger, origin: TaskState },
67 Skipped { trigger: Trigger, state: TaskState },
69}
70
71#[derive(Clone, Debug, Eq, PartialEq)]
73pub struct TransitionError {
74 pub trigger: Trigger,
75 pub state: TaskState,
76 pub err_msg: gst::ErrorMessage,
77}
78
79impl fmt::Display for TransitionError {
80 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
81 write!(
82 f,
83 "{:?} from state {:?}: {:?}",
84 self.trigger, self.state, self.err_msg
85 )
86 }
87}
88
89impl std::error::Error for TransitionError {}
90
91impl From<TransitionError> for gst::ErrorMessage {
92 fn from(err: TransitionError) -> Self {
93 err.err_msg
94 }
95}
96
97#[must_use = "This `TransitionStatus` may be `Pending`. In most cases it should be awaited. See `block_on_or_add_subtask_then`"]
103pub enum TransitionStatus {
104 Ready(Result<TransitionOk, TransitionError>),
106 Pending {
108 trigger: Trigger,
109 origin: TaskState,
110 res_fut: Pin<Box<dyn Future<Output = Result<TransitionOk, TransitionError>> + Send>>,
111 },
112}
113
114impl TransitionStatus {
115 pub fn is_ready(&self) -> bool {
116 matches!(self, TransitionStatus::Ready { .. })
117 }
118
119 pub fn is_pending(&self) -> bool {
120 matches!(self, TransitionStatus::Pending { .. })
121 }
122
123 pub fn check(self) -> Result<TransitionStatus, TransitionError> {
131 match self {
132 TransitionStatus::Ready(Err(err)) => Err(err),
133 other => Ok(other),
134 }
135 }
136
137 pub fn block_on_or_add_subtask<O>(self, obj: &O) -> Result<TransitionOk, TransitionError>
191 where
192 O: IsA<glib::Object> + Send,
193 {
194 use TransitionStatus::*;
195 match self {
196 Pending {
197 trigger,
198 origin,
199 res_fut,
200 } => match Context::current_task() {
201 Some((ctx, task_id)) => {
202 gst::debug!(
203 RUNTIME_CAT,
204 obj = obj,
205 "Awaiting for {trigger:?} ack in a subtask on context {}",
206 ctx.name()
207 );
208
209 let obj = obj.clone();
210 let _ = ctx.add_sub_task(task_id, async move {
211 let res = res_fut.await;
212 match res {
213 Ok(status) => {
214 gst::log!(
215 RUNTIME_CAT,
216 obj = obj,
217 "Task {trigger:?} success: {status:?}",
218 );
219 }
220 Err(err) => {
221 gst::error!(
222 RUNTIME_CAT,
223 obj = obj,
224 "Task {trigger:?} failure: {err}"
225 );
226 }
227 }
228
229 Ok(())
230 });
231
232 Ok(TransitionOk::NotWaiting { trigger, origin })
233 }
234 _ => {
235 gst::debug!(
236 RUNTIME_CAT,
237 obj = obj,
238 "Awaiting for {trigger:?} ack on current thread",
239 );
240 let res = futures::executor::block_on(res_fut);
241 match res {
242 Ok(ref status) => {
243 gst::log!(
244 RUNTIME_CAT,
245 obj = obj,
246 "Task {trigger:?} success: {status:?}",
247 );
248 }
249 Err(ref err) => {
250 gst::error!(RUNTIME_CAT, obj = obj, "Task {trigger:?} failure: {err}");
251 }
252 }
253
254 res
255 }
256 },
257 Ready(res) => {
258 match res {
259 Ok(ref status) => {
260 gst::log!(
261 RUNTIME_CAT,
262 obj = obj,
263 "Task transition immediate success: {status:?}",
264 );
265 }
266 Err(ref err) => {
267 gst::error!(
268 RUNTIME_CAT,
269 obj = obj,
270 "Task transition immediate failure: {err}",
271 );
272 }
273 }
274
275 res
276 }
277 }
278 }
279
280 pub fn block_on_or_add_subtask_then<T, F>(
331 self,
332 obj: glib::BorrowedObject<'_, T>,
333 func: F,
334 ) -> Result<(), gst::ErrorMessage>
335 where
336 T: IsA<glib::Object> + Send,
337 F: FnOnce(&T, &Result<TransitionOk, TransitionError>) + Send + 'static,
338 {
339 use TransitionStatus::*;
340 match self {
341 Pending {
342 trigger, res_fut, ..
343 } => match Context::current_task() {
344 Some((ctx, task_id)) => {
345 gst::debug!(
346 RUNTIME_CAT,
347 obj = obj,
348 "Awaiting for {trigger:?} ack in a subtask on context {}",
349 ctx.name()
350 );
351 let obj = obj.clone();
352 let _ = ctx.add_sub_task(task_id, async move {
353 let res = res_fut.await;
354 match res {
355 Ok(ref status) => {
356 gst::log!(
357 RUNTIME_CAT,
358 obj = obj,
359 "Task {trigger:?} success: {status:?}",
360 );
361 func(&obj, &res);
362 Ok(())
363 }
364 Err(ref err) => {
365 gst::error!(
366 RUNTIME_CAT,
367 obj = obj,
368 "Task {trigger:?} failure: {err}",
369 );
370 func(&obj, &res);
371 Err(gst::FlowError::Error)
372 }
373 }
374 });
375
376 Ok(())
377 }
378 _ => {
379 gst::debug!(
380 RUNTIME_CAT,
381 obj = obj,
382 "Awaiting for {trigger:?} ack on current thread",
383 );
384 let res = futures::executor::block_on(res_fut);
385 match res {
386 Ok(ref status) => {
387 gst::log!(
388 RUNTIME_CAT,
389 obj = obj,
390 "Task {trigger:?} success: {status:?}",
391 );
392 func(&obj, &res);
393 Ok(())
394 }
395 Err(ref err) => {
396 gst::error!(RUNTIME_CAT, obj = obj, "Task {trigger:?} failure: {err}",);
397 func(&obj, &res);
398 res.map(|_| ()).map_err(|err| err.into())
399 }
400 }
401 }
402 },
403 Ready(res) => match res {
404 Ok(ref status) => {
405 gst::log!(
406 RUNTIME_CAT,
407 obj = obj,
408 "Task transition immediate success: {status:?}",
409 );
410 func(&obj, &res);
411 Ok(())
412 }
413 Err(ref err) => {
414 gst::error!(
415 RUNTIME_CAT,
416 obj = obj,
417 "Task transition immediate failure: {err}",
418 );
419 func(&obj, &res);
420 res.map(|_| ()).map_err(|err| err.into())
421 }
422 },
423 }
424 }
425}
426
427impl Future for TransitionStatus {
428 type Output = Result<TransitionOk, TransitionError>;
429
430 fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
431 use TransitionStatus::*;
432
433 match &mut *self {
434 Ready(res) => Poll::Ready(res.clone()),
435 Pending { res_fut, .. } => match Pin::new(res_fut).poll(cx) {
436 Poll::Pending => Poll::Pending,
437 Poll::Ready(res) => {
438 *self = Ready(res.clone());
439
440 Poll::Ready(res)
441 }
442 },
443 }
444 }
445}
446
447impl From<TransitionOk> for TransitionStatus {
448 fn from(ok: TransitionOk) -> Self {
449 Self::Ready(Ok(ok))
450 }
451}
452
453impl From<TransitionError> for TransitionStatus {
454 fn from(err: TransitionError) -> Self {
455 Self::Ready(Err(err))
456 }
457}
458
459impl fmt::Debug for TransitionStatus {
461 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
462 use TransitionStatus::*;
463 match self {
464 Ready(res) => f.debug_tuple("Ready").field(res).finish(),
465 Pending {
466 trigger, origin, ..
467 } => f
468 .debug_struct("Pending")
469 .field("trigger", trigger)
470 .field("origin", origin)
471 .finish(),
472 }
473 }
474}
475
476pub trait TaskImpl: Send + 'static {
480 type Item: Send + 'static;
481
482 fn obj(&self) -> &impl IsA<glib::Object>;
483
484 fn prepare(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
485 future::ok(())
486 }
487
488 fn unprepare(&mut self) -> impl Future<Output = ()> + Send {
489 future::ready(())
490 }
491
492 fn start(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
493 future::ok(())
494 }
495
496 fn try_next(&mut self) -> impl Future<Output = Result<Self::Item, gst::FlowError>> + Send;
509
510 fn handle_item(
521 &mut self,
522 _item: Self::Item,
523 ) -> impl Future<Output = Result<(), gst::FlowError>> + Send;
524
525 fn pause(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
526 future::ok(())
527 }
528
529 fn flush_start(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
530 future::ready(Ok(()))
531 }
532
533 fn flush_stop(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
534 future::ready(Ok(()))
535 }
536
537 fn stop(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
538 future::ready(Ok(()))
539 }
540
541 fn handle_loop_error(&mut self, err: gst::FlowError) -> impl Future<Output = Trigger> + Send {
556 async move {
557 match err {
558 gst::FlowError::Flushing => {
559 gst::debug!(
560 RUNTIME_CAT,
561 obj = self.obj(),
562 "Task loop returned Flushing. Posting FlushStart"
563 );
564 Trigger::FlushStart
565 }
566 gst::FlowError::Eos => {
567 gst::debug!(
568 RUNTIME_CAT,
569 obj = self.obj(),
570 "Task loop returned Eos. Posting Stop"
571 );
572 Trigger::Stop
573 }
574 other => {
575 gst::error!(
576 RUNTIME_CAT,
577 obj = self.obj(),
578 "Task loop returned {other:?}. Posting Error",
579 );
580 Trigger::Error
581 }
582 }
583 }
584 }
585
586 fn handle_action_error(
597 &mut self,
598 trigger: Trigger,
599 state: TaskState,
600 err: gst::ErrorMessage,
601 ) -> impl Future<Output = Trigger> + Send {
602 async move {
603 gst::error!(
604 RUNTIME_CAT,
605 obj = self.obj(),
606 "TaskImpl transition action error during {trigger:?} from {state:?}: {err:?}. Posting Trigger::Error",
607 );
608
609 Trigger::Error
610 }
611 }
612}
613
614type AckSender = oneshot::Sender<Result<TransitionOk, TransitionError>>;
615type AckReceiver = oneshot::Receiver<Result<TransitionOk, TransitionError>>;
616
617struct TriggeringEvent {
618 trigger: Trigger,
619 ack_tx: AckSender,
620}
621
622impl TriggeringEvent {
623 fn new(trigger: Trigger) -> (Self, AckReceiver) {
624 let (ack_tx, ack_rx) = oneshot::channel();
625 let req = TriggeringEvent { trigger, ack_tx };
626
627 (req, ack_rx)
628 }
629
630 fn send_ack(self, res: Result<TransitionOk, TransitionError>) {
631 let _ = self.ack_tx.send(res);
632 }
633
634 fn send_err_ack(self) {
635 let res = Err(TransitionError {
636 trigger: self.trigger,
637 state: TaskState::Error,
638 err_msg: gst::error_msg!(
639 gst::CoreError::StateChange,
640 [
641 "Triggering Event {:?} rejected due to a previous unrecoverable error",
642 self.trigger,
643 ]
644 ),
645 });
646
647 self.send_ack(res);
648 }
649}
650
651impl fmt::Debug for TriggeringEvent {
652 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
653 f.debug_struct("TriggeringEvent")
654 .field("trigger", &self.trigger)
655 .finish()
656 }
657}
658
659#[derive(Debug)]
660struct StateMachineHandle {
661 join_handle: JoinHandle<()>,
662 triggering_evt_tx: async_mpsc::Sender<TriggeringEvent>,
663 context: Context,
664}
665
666impl StateMachineHandle {
667 fn trigger(&mut self, trigger: Trigger) -> AckReceiver {
668 let (triggering_evt, ack_rx) = TriggeringEvent::new(trigger);
669
670 gst::log!(RUNTIME_CAT, "Pushing {triggering_evt:?}");
671 self.triggering_evt_tx.try_send(triggering_evt).unwrap();
672
673 self.context.unpark();
674
675 ack_rx
676 }
677
678 async fn join(self) {
679 self.join_handle
680 .await
681 .expect("state machine shouldn't have been cancelled");
682 }
683}
684
685#[derive(Debug)]
686struct TaskInner {
687 state: TaskState,
688 state_machine_handle: Option<StateMachineHandle>,
689}
690
691impl Default for TaskInner {
692 fn default() -> Self {
693 TaskInner {
694 state: TaskState::Unprepared,
695 state_machine_handle: None,
696 }
697 }
698}
699
700impl TaskInner {
701 fn switch_to_state(&mut self, target_state: TaskState, triggering_evt: TriggeringEvent) {
702 let res = Ok(TransitionOk::Complete {
703 origin: self.state,
704 target: target_state,
705 });
706
707 self.state = target_state;
708 triggering_evt.send_ack(res);
709 }
710
711 fn switch_to_err(&mut self, triggering_evt: TriggeringEvent) {
712 let res = Err(TransitionError {
713 trigger: triggering_evt.trigger,
714 state: self.state,
715 err_msg: gst::error_msg!(
716 gst::CoreError::StateChange,
717 [
718 "Unrecoverable error for {triggering_evt:?} from state {:?}",
719 self.state,
720 ]
721 ),
722 });
723
724 self.state = TaskState::Error;
725 triggering_evt.send_ack(res);
726 }
727
728 fn skip_triggering_evt(&mut self, triggering_evt: TriggeringEvent) {
729 let res = Ok(TransitionOk::Skipped {
730 trigger: triggering_evt.trigger,
731 state: self.state,
732 });
733
734 triggering_evt.send_ack(res);
735 }
736
737 fn trigger(&mut self, trigger: Trigger) -> Result<AckReceiver, TransitionError> {
738 self.state_machine_handle
739 .as_mut()
740 .map(|state_machine| state_machine.trigger(trigger))
741 .ok_or_else(|| {
742 gst::warning!(RUNTIME_CAT, "Unable to send {trigger:?}: no state machine",);
743 TransitionError {
744 trigger,
745 state: TaskState::Unprepared,
746 err_msg: gst::error_msg!(
747 gst::ResourceError::NotFound,
748 ["Unable to send {trigger:?}: no state machine"]
749 ),
750 }
751 })
752 }
753}
754
755impl Drop for TaskInner {
756 fn drop(&mut self) {
757 if self.state != TaskState::Unprepared {
758 gst::fixme!(RUNTIME_CAT, "Missing call to `Task::unprepare`");
762 }
763 }
764}
765
766pub struct TaskStateGuard<'guard>(MutexGuard<'guard, TaskInner>);
768
769impl Deref for TaskStateGuard<'_> {
770 type Target = TaskState;
771
772 fn deref(&self) -> &Self::Target {
773 &(self.0).state
774 }
775}
776
777#[derive(Debug, Clone)]
781pub struct Task(Arc<Mutex<TaskInner>>);
782
783impl Default for Task {
784 fn default() -> Self {
785 Task(Arc::new(Mutex::new(TaskInner::default())))
786 }
787}
788
789impl Task {
790 pub fn state(&self) -> TaskState {
791 self.0.lock().unwrap().state
792 }
793
794 pub fn lock_state(&self) -> TaskStateGuard<'_> {
795 TaskStateGuard(self.0.lock().unwrap())
796 }
797
798 pub fn prepare(&self, task_impl: impl TaskImpl, context: Context) -> TransitionStatus {
799 let mut inner = self.0.lock().unwrap();
800
801 let origin = inner.state;
802 match origin {
803 TaskState::Unprepared => (),
804 TaskState::Prepared | TaskState::Preparing => {
805 gst::debug!(
806 RUNTIME_CAT,
807 obj = task_impl.obj(),
808 "Task already {origin:?}",
809 );
810 return TransitionOk::Skipped {
811 trigger: Trigger::Prepare,
812 state: origin,
813 }
814 .into();
815 }
816 state => {
817 gst::warning!(
818 RUNTIME_CAT,
819 obj = task_impl.obj(),
820 "Attempt to prepare Task in state {state:?}"
821 );
822 return TransitionError {
823 trigger: Trigger::Prepare,
824 state: inner.state,
825 err_msg: gst::error_msg!(
826 gst::CoreError::StateChange,
827 ["Attempt to prepare Task in state {state:?}"]
828 ),
829 }
830 .into();
831 }
832 }
833
834 assert!(inner.state_machine_handle.is_none());
835
836 inner.state = TaskState::Preparing;
837
838 gst::log!(
839 RUNTIME_CAT,
840 obj = task_impl.obj(),
841 "Spawning task state machine"
842 );
843 inner.state_machine_handle = Some(StateMachine::spawn(self.0.clone(), task_impl, context));
844
845 let ack_rx = match inner.trigger(Trigger::Prepare) {
846 Ok(ack_rx) => ack_rx,
847 Err(err) => return err.into(),
848 };
849 drop(inner);
850
851 TransitionStatus::Pending {
852 trigger: Trigger::Prepare,
853 origin: TaskState::Unprepared,
854 res_fut: Box::pin(ack_rx.map(Result::unwrap)),
855 }
856 }
857
858 pub fn unprepare(&self) -> TransitionStatus {
859 let mut inner = self.0.lock().unwrap();
860
861 let origin = inner.state;
862 let mut state_machine_handle = match origin {
863 TaskState::Stopped
864 | TaskState::Error
865 | TaskState::Prepared
866 | TaskState::Preparing
867 | TaskState::Unprepared => match inner.state_machine_handle.take() {
868 Some(state_machine_handle) => {
869 gst::debug!(RUNTIME_CAT, "Unpreparing task");
870
871 state_machine_handle
872 }
873 None => {
874 gst::debug!(RUNTIME_CAT, "Task already unpreparing");
875 return TransitionOk::Skipped {
876 trigger: Trigger::Unprepare,
877 state: origin,
878 }
879 .into();
880 }
881 },
882 state => {
883 gst::warning!(RUNTIME_CAT, "Attempt to unprepare Task in state {state:?}");
884 return TransitionError {
885 trigger: Trigger::Unprepare,
886 state: inner.state,
887 err_msg: gst::error_msg!(
888 gst::CoreError::StateChange,
889 ["Attempt to unprepare Task in state {state:?}"]
890 ),
891 }
892 .into();
893 }
894 };
895
896 let ack_rx = state_machine_handle.trigger(Trigger::Unprepare);
897 drop(inner);
898
899 let state_machine_end_fut = async {
900 state_machine_handle.join().await;
901 ack_rx.await.unwrap()
902 };
903
904 TransitionStatus::Pending {
905 trigger: Trigger::Unprepare,
906 origin,
907 res_fut: Box::pin(state_machine_end_fut),
908 }
909 }
910
911 pub fn start(&self) -> TransitionStatus {
915 let mut inner = self.0.lock().unwrap();
916
917 if let TaskState::Started = inner.state {
918 return TransitionOk::Skipped {
919 trigger: Trigger::Start,
920 state: TaskState::Started,
921 }
922 .into();
923 }
924
925 let ack_rx = match inner.trigger(Trigger::Start) {
926 Ok(ack_rx) => ack_rx,
927 Err(err) => return err.into(),
928 };
929
930 let origin = inner.state;
931 drop(inner);
932
933 TransitionStatus::Pending {
934 trigger: Trigger::Start,
935 origin,
936 res_fut: Box::pin(ack_rx.map(Result::unwrap)),
937 }
938 }
939
940 pub fn pause(&self) -> TransitionStatus {
945 self.push_pending(Trigger::Pause)
946 }
947
948 pub fn flush_start(&self) -> TransitionStatus {
949 self.push_pending(Trigger::FlushStart)
950 }
951
952 pub fn flush_stop(&self) -> TransitionStatus {
953 self.push_pending(Trigger::FlushStop)
954 }
955
956 pub fn stop(&self) -> TransitionStatus {
958 self.push_pending(Trigger::Stop)
959 }
960
961 fn push_pending(&self, trigger: Trigger) -> TransitionStatus {
963 let mut inner = self.0.lock().unwrap();
964
965 let ack_rx = match inner.trigger(trigger) {
966 Ok(ack_rx) => ack_rx,
967 Err(err) => return err.into(),
968 };
969
970 let origin = inner.state;
971 drop(inner);
972
973 TransitionStatus::Pending {
974 trigger,
975 origin,
976 res_fut: Box::pin(ack_rx.map(Result::unwrap)),
977 }
978 }
979}
980
981struct StateMachine<Task: TaskImpl> {
982 task_impl: Task,
983 triggering_evt_rx: async_mpsc::Receiver<TriggeringEvent>,
984 pending_triggering_evt: Option<TriggeringEvent>,
985}
986
987macro_rules! exec_action {
988 ($self:ident, $action:ident, $triggering_evt:expr, $origin:expr, $task_inner:expr) => {{
989 match $self.task_impl.$action().await {
990 Ok(()) => Ok($triggering_evt),
991 Err(err) => {
992 let next_trigger = $self
996 .task_impl
997 .handle_action_error($triggering_evt.trigger, $origin, err)
998 .await;
999
1000 gst::trace!(
1002 RUNTIME_CAT,
1003 obj = $self.task_impl.obj(),
1004 "TaskImpl transition action error: converting {:?} to {next_trigger:?}",
1005 $triggering_evt.trigger,
1006 );
1007
1008 $triggering_evt.trigger = next_trigger;
1009 $self.pending_triggering_evt = Some($triggering_evt);
1010
1011 Err(())
1012 }
1013 }
1014 }};
1015}
1016
1017impl<Task: TaskImpl> StateMachine<Task> {
1018 fn spawn(
1019 task_inner: Arc<Mutex<TaskInner>>,
1020 task_impl: Task,
1021 context: Context,
1022 ) -> StateMachineHandle {
1023 let (triggering_evt_tx, triggering_evt_rx) = async_mpsc::channel(4);
1024
1025 let state_machine = StateMachine {
1026 task_impl,
1027 triggering_evt_rx,
1028 pending_triggering_evt: None,
1029 };
1030
1031 StateMachineHandle {
1032 join_handle: context.spawn_and_unpark(state_machine.run(task_inner)),
1033 triggering_evt_tx,
1034 context,
1035 }
1036 }
1037
1038 async fn run(mut self, task_inner: Arc<Mutex<TaskInner>>) {
1039 let mut triggering_evt = self
1040 .triggering_evt_rx
1041 .next()
1042 .await
1043 .expect("triggering_evt_rx dropped");
1044
1045 if let Trigger::Prepare = triggering_evt.trigger {
1046 gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Preparing task");
1047
1048 let res = exec_action!(
1049 self,
1050 prepare,
1051 triggering_evt,
1052 TaskState::Unprepared,
1053 &task_inner
1054 );
1055 if let Ok(triggering_evt) = res {
1056 let mut task_inner = task_inner.lock().unwrap();
1057 let res = Ok(TransitionOk::Complete {
1058 origin: TaskState::Unprepared,
1059 target: TaskState::Prepared,
1060 });
1061
1062 task_inner.state = TaskState::Prepared;
1063 triggering_evt.send_ack(res);
1064
1065 gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Task Prepared");
1066 }
1067 } else {
1068 panic!("Unexpected initial trigger {:?}", triggering_evt.trigger);
1069 }
1070
1071 loop {
1072 triggering_evt = match self.pending_triggering_evt.take() {
1073 Some(pending_triggering_evt) => pending_triggering_evt,
1074 None => self
1075 .triggering_evt_rx
1076 .next()
1077 .await
1078 .expect("triggering_evt_rx dropped"),
1079 };
1080 gst::trace!(
1081 RUNTIME_CAT,
1082 obj = self.task_impl.obj(),
1083 "State machine popped {triggering_evt:?}"
1084 );
1085
1086 match triggering_evt.trigger {
1087 Trigger::Error => {
1088 let mut task_inner = task_inner.lock().unwrap();
1089 task_inner.switch_to_err(triggering_evt);
1090 gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Switched to Error");
1091 }
1092 Trigger::Start => {
1093 let origin = {
1094 let mut task_inner = task_inner.lock().unwrap();
1095 let origin = task_inner.state;
1096 match origin {
1097 TaskState::Stopped | TaskState::Paused | TaskState::Prepared => (),
1098 TaskState::PausedFlushing => {
1099 task_inner.switch_to_state(TaskState::Flushing, triggering_evt);
1100 gst::trace!(
1101 RUNTIME_CAT,
1102 obj = self.task_impl.obj(),
1103 "Switched from PausedFlushing to Flushing"
1104 );
1105 continue;
1106 }
1107 TaskState::Error => {
1108 triggering_evt.send_err_ack();
1109 continue;
1110 }
1111 state => {
1112 task_inner.skip_triggering_evt(triggering_evt);
1113 gst::trace!(
1114 RUNTIME_CAT,
1115 obj = self.task_impl.obj(),
1116 "Skipped Start in state {state:?}"
1117 );
1118 continue;
1119 }
1120 }
1121
1122 origin
1123 };
1124
1125 self.start(triggering_evt, origin, &task_inner).await;
1126 }
1128 Trigger::Pause => {
1129 let (origin, target) = {
1130 let mut task_inner = task_inner.lock().unwrap();
1131 let origin = task_inner.state;
1132 match origin {
1133 TaskState::Started | TaskState::Stopped | TaskState::Prepared => {
1134 (origin, TaskState::Paused)
1135 }
1136 TaskState::Flushing => (origin, TaskState::PausedFlushing),
1137 TaskState::Error => (TaskState::Error, TaskState::Error),
1138 state => {
1139 task_inner.skip_triggering_evt(triggering_evt);
1140 gst::trace!(
1141 RUNTIME_CAT,
1142 obj = self.task_impl.obj(),
1143 "Skipped Pause in state {state:?}"
1144 );
1145 continue;
1146 }
1147 }
1148 };
1149
1150 let res = exec_action!(self, pause, triggering_evt, origin, &task_inner);
1151 if let Ok(triggering_evt) = res {
1152 task_inner
1153 .lock()
1154 .unwrap()
1155 .switch_to_state(target, triggering_evt);
1156 gst::trace!(
1157 RUNTIME_CAT,
1158 obj = self.task_impl.obj(),
1159 "Task loop {target:?}"
1160 );
1161 }
1162 }
1163 Trigger::Stop => {
1164 let (origin, target) = {
1165 let mut task_inner = task_inner.lock().unwrap();
1166 let origin = task_inner.state;
1167 match origin {
1168 TaskState::Started
1169 | TaskState::Paused
1170 | TaskState::PausedFlushing
1171 | TaskState::Flushing => (origin, TaskState::Stopped),
1172 TaskState::Error => (TaskState::Error, TaskState::Error),
1173 state => {
1174 task_inner.skip_triggering_evt(triggering_evt);
1175 gst::trace!(
1176 RUNTIME_CAT,
1177 obj = self.task_impl.obj(),
1178 "Skipped Stop in state {state:?}"
1179 );
1180 continue;
1181 }
1182 }
1183 };
1184
1185 let res = exec_action!(self, stop, triggering_evt, origin, &task_inner);
1186 if let Ok(triggering_evt) = res {
1187 task_inner
1188 .lock()
1189 .unwrap()
1190 .switch_to_state(target, triggering_evt);
1191 gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Task {target:?}");
1192 }
1193 }
1194 Trigger::FlushStart => {
1195 let (origin, target) = {
1196 let mut task_inner = task_inner.lock().unwrap();
1197 let origin = task_inner.state;
1198 match origin {
1199 TaskState::Started => (origin, TaskState::Flushing),
1200 TaskState::Paused => (origin, TaskState::PausedFlushing),
1201 TaskState::Error => (TaskState::Error, TaskState::Error),
1202 state => {
1203 task_inner.skip_triggering_evt(triggering_evt);
1204 gst::trace!(
1205 RUNTIME_CAT,
1206 obj = self.task_impl.obj(),
1207 "Skipped FlushStart in state {state:?}"
1208 );
1209 continue;
1210 }
1211 }
1212 };
1213
1214 let res = exec_action!(self, flush_start, triggering_evt, origin, &task_inner);
1215 if let Ok(triggering_evt) = res {
1216 task_inner
1217 .lock()
1218 .unwrap()
1219 .switch_to_state(target, triggering_evt);
1220 gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Task {target:?}");
1221 }
1222 }
1223 Trigger::FlushStop => {
1224 let origin = task_inner.lock().unwrap().state;
1225 let is_paused = match origin {
1226 TaskState::Flushing => false,
1227 TaskState::PausedFlushing => true,
1228 TaskState::Error => {
1229 triggering_evt.send_err_ack();
1230 continue;
1231 }
1232 state => {
1233 task_inner
1234 .lock()
1235 .unwrap()
1236 .skip_triggering_evt(triggering_evt);
1237 gst::trace!(
1238 RUNTIME_CAT,
1239 obj = self.task_impl.obj(),
1240 "Skipped FlushStop in state {state:?}"
1241 );
1242 continue;
1243 }
1244 };
1245
1246 let res = exec_action!(self, flush_stop, triggering_evt, origin, &task_inner);
1247 if let Ok(triggering_evt) = res {
1248 if is_paused {
1249 task_inner
1250 .lock()
1251 .unwrap()
1252 .switch_to_state(TaskState::Paused, triggering_evt);
1253 gst::trace!(
1254 RUNTIME_CAT,
1255 obj = self.task_impl.obj(),
1256 "Switched from PausedFlushing to Paused"
1257 );
1258 } else {
1259 self.start(triggering_evt, origin, &task_inner).await;
1260 }
1262 }
1263 }
1264 Trigger::Unprepare => {
1265 self.task_impl.unprepare().await;
1267
1268 task_inner
1269 .lock()
1270 .unwrap()
1271 .switch_to_state(TaskState::Unprepared, triggering_evt);
1272
1273 break;
1274 }
1275 _ => unreachable!("State machine handler {:?}", triggering_evt),
1276 }
1277 }
1278
1279 gst::trace!(
1280 RUNTIME_CAT,
1281 obj = self.task_impl.obj(),
1282 "Task state machine terminated"
1283 );
1284 }
1285
1286 async fn start(
1287 &mut self,
1288 mut triggering_evt: TriggeringEvent,
1289 origin: TaskState,
1290 task_inner: &Arc<Mutex<TaskInner>>,
1291 ) {
1292 match exec_action!(self, start, triggering_evt, origin, &task_inner) {
1293 Ok(triggering_evt) => {
1294 let mut task_inner = task_inner.lock().unwrap();
1295 task_inner.switch_to_state(TaskState::Started, triggering_evt);
1296 }
1297 Err(_) => {
1298 return;
1300 }
1301 }
1302
1303 match self.run_loop().await {
1304 Ok(()) => (),
1305 Err(err) => {
1306 let next_trigger = self.task_impl.handle_loop_error(err).await;
1307 let (triggering_evt, _) = TriggeringEvent::new(next_trigger);
1308 self.pending_triggering_evt = Some(triggering_evt);
1309 }
1310 }
1311 }
1312
1313 async fn run_loop(&mut self) -> Result<(), gst::FlowError> {
1314 gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Task loop started");
1315
1316 let mut try_next_res;
1317 loop {
1318 try_next_res = {
1319 let mut try_next_fut = pin!(self.task_impl.try_next().fuse());
1326 futures::select_biased! {
1327 triggering_evt = self.triggering_evt_rx.next() => {
1328 let triggering_evt = triggering_evt.expect("broken state machine channel");
1329 gst::trace!(
1330 RUNTIME_CAT,
1331 "Task loop handing {:?} to state machine",
1332 triggering_evt,
1333 );
1334 self.pending_triggering_evt = Some(triggering_evt);
1335 return Ok(());
1336 }
1337 try_next_res = try_next_fut => try_next_res,
1338 }
1339 };
1340
1341 let item = try_next_res.inspect_err(|err| {
1342 gst::debug!(
1343 RUNTIME_CAT,
1344 obj = self.task_impl.obj(),
1345 "TaskImpl::try_next returned {err:?}"
1346 );
1347 })?;
1348
1349 self.task_impl.handle_item(item).await.inspect_err(|&err| {
1350 gst::debug!(
1351 RUNTIME_CAT,
1352 obj = self.task_impl.obj(),
1353 "TaskImpl::handle_item returned {err:?}"
1354 );
1355 })?;
1356 }
1357 }
1358}
1359
1360#[cfg(test)]
1361mod tests {
1362 use futures::channel::{mpsc, oneshot};
1363 use futures::executor::block_on;
1364 use futures::prelude::*;
1365 use gst::glib;
1366 use gst::glib::prelude::*;
1367 use std::future::pending;
1368 use std::time::Duration;
1369
1370 use super::{
1371 Task, TaskImpl,
1372 TaskState::{self, *},
1373 TransitionError, TransitionOk,
1374 TransitionOk::*,
1375 TransitionStatus,
1376 TransitionStatus::*,
1377 Trigger::{self, *},
1378 };
1379 use crate::runtime::{Context, RUNTIME_CAT};
1380
1381 impl TransitionStatus {
1382 fn block_on(self) -> Result<TransitionOk, TransitionError> {
1385 assert!(!Context::is_context_thread());
1386 match self {
1387 Pending {
1388 trigger, res_fut, ..
1389 } => {
1390 gst::debug!(
1391 RUNTIME_CAT,
1392 "Awaiting for {:?} ack on current thread",
1393 trigger,
1394 );
1395 futures::executor::block_on(res_fut)
1396 }
1397 Ready(res) => res,
1398 }
1399 }
1400 }
1401
1402 #[track_caller]
1403 fn stop_then_unprepare(task: Task) {
1404 task.stop().block_on().unwrap();
1405 task.unprepare().block_on().unwrap();
1406 }
1407
1408 #[test]
1409 fn nominal() {
1410 gst::init().unwrap();
1411
1412 struct TaskTest {
1413 obj: gst::Object,
1414 prepared_sender: mpsc::Sender<()>,
1415 started_sender: mpsc::Sender<()>,
1416 try_next_ready_sender: mpsc::Sender<()>,
1417 try_next_receiver: mpsc::Receiver<()>,
1418 handle_item_ready_sender: mpsc::Sender<()>,
1419 handle_item_sender: mpsc::Sender<()>,
1420 paused_sender: mpsc::Sender<()>,
1421 stopped_sender: mpsc::Sender<()>,
1422 unprepared_sender: mpsc::Sender<()>,
1423 }
1424
1425 impl TaskImpl for TaskTest {
1426 type Item = ();
1427
1428 fn obj(&self) -> &impl IsA<glib::Object> {
1429 &self.obj
1430 }
1431
1432 async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> {
1433 gst::debug!(RUNTIME_CAT, "nominal: prepared");
1434 self.prepared_sender.send(()).await.unwrap();
1435 Ok(())
1436 }
1437
1438 async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
1439 gst::debug!(RUNTIME_CAT, "nominal: started");
1440 self.started_sender.send(()).await.unwrap();
1441 Ok(())
1442 }
1443
1444 async fn try_next(&mut self) -> Result<(), gst::FlowError> {
1445 gst::debug!(RUNTIME_CAT, "nominal: entering try_next");
1446 self.try_next_ready_sender.send(()).await.unwrap();
1447 gst::debug!(RUNTIME_CAT, "nominal: awaiting try_next");
1448 self.try_next_receiver.next().await.unwrap();
1449 Ok(())
1450 }
1451
1452 async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
1453 gst::debug!(RUNTIME_CAT, "nominal: entering handle_item");
1454 self.handle_item_ready_sender.send(()).await.unwrap();
1455
1456 gst::debug!(RUNTIME_CAT, "nominal: locked in handle_item");
1457 self.handle_item_sender.send(()).await.unwrap();
1458 gst::debug!(RUNTIME_CAT, "nominal: leaving handle_item");
1459
1460 Ok(())
1461 }
1462
1463 async fn pause(&mut self) -> Result<(), gst::ErrorMessage> {
1464 gst::debug!(RUNTIME_CAT, "nominal: paused");
1465 self.paused_sender.send(()).await.unwrap();
1466 Ok(())
1467 }
1468
1469 async fn stop(&mut self) -> Result<(), gst::ErrorMessage> {
1470 gst::debug!(RUNTIME_CAT, "nominal: stopped");
1471 self.stopped_sender.send(()).await.unwrap();
1472 Ok(())
1473 }
1474
1475 async fn unprepare(&mut self) {
1476 gst::debug!(RUNTIME_CAT, "nominal: unprepared");
1477 self.unprepared_sender.send(()).await.unwrap();
1478 }
1479 }
1480
1481 let context = Context::acquire("nominal", Duration::from_millis(2)).unwrap();
1482
1483 let task = Task::default();
1484
1485 assert_eq!(task.state(), Unprepared);
1486
1487 gst::debug!(RUNTIME_CAT, "nominal: preparing");
1488
1489 let (prepared_sender, mut prepared_receiver) = mpsc::channel(1);
1490 let (started_sender, mut started_receiver) = mpsc::channel(1);
1491 let (try_next_ready_sender, mut try_next_ready_receiver) = mpsc::channel(1);
1492 let (mut try_next_sender, try_next_receiver) = mpsc::channel(1);
1493 let (handle_item_ready_sender, mut handle_item_ready_receiver) = mpsc::channel(1);
1494 let (handle_item_sender, mut handle_item_receiver) = mpsc::channel(0);
1495 let (paused_sender, mut paused_receiver) = mpsc::channel(1);
1496 let (stopped_sender, mut stopped_receiver) = mpsc::channel(1);
1497 let (unprepared_sender, mut unprepared_receiver) = mpsc::channel(1);
1498 let obj = gst::Pad::builder(gst::PadDirection::Unknown)
1499 .name("runtime::Task::nominal")
1500 .build();
1501 let prepare_status = task.prepare(
1502 TaskTest {
1503 obj: obj.clone().into(),
1504 prepared_sender,
1505 started_sender,
1506 try_next_ready_sender,
1507 try_next_receiver,
1508 handle_item_ready_sender,
1509 handle_item_sender,
1510 paused_sender,
1511 stopped_sender,
1512 unprepared_sender,
1513 },
1514 context,
1515 );
1516
1517 assert!(prepare_status.is_pending());
1518 match prepare_status {
1519 Pending {
1520 trigger: Prepare,
1521 origin: Unprepared,
1522 ..
1523 } => (),
1524 other => panic!("{other:?}"),
1525 };
1526
1527 gst::debug!(RUNTIME_CAT, "nominal: starting (async prepare)");
1528 let start_status = task.start().check().unwrap();
1529
1530 block_on(prepared_receiver.next()).unwrap();
1531 assert_eq!(
1532 prepare_status.block_on_or_add_subtask(&obj).unwrap(),
1533 Complete {
1534 origin: Unprepared,
1535 target: Prepared,
1536 },
1537 );
1538
1539 block_on(started_receiver.next()).unwrap();
1540 assert_eq!(
1541 start_status.block_on().unwrap(),
1542 Complete {
1543 origin: Prepared,
1544 target: Started,
1545 }
1546 );
1547 assert_eq!(task.state(), Started);
1548
1549 block_on(try_next_ready_receiver.next()).unwrap();
1551 block_on(try_next_sender.send(())).unwrap();
1552 block_on(handle_item_ready_receiver.next()).unwrap();
1553 block_on(handle_item_receiver.next()).unwrap();
1554
1555 gst::debug!(RUNTIME_CAT, "nominal: starting (redundant)");
1556 assert_eq!(
1558 task.start().block_on().unwrap(),
1559 Skipped {
1560 trigger: Start,
1561 state: Started,
1562 },
1563 );
1564 assert_eq!(task.state(), Started);
1565
1566 match task.unprepare().check().unwrap_err() {
1568 TransitionError {
1569 trigger: Unprepare,
1570 state: Started,
1571 ..
1572 } => (),
1573 other => panic!("{other:?}"),
1574 }
1575
1576 gst::debug!(RUNTIME_CAT, "nominal: pause cancelling try_next");
1577 block_on(try_next_ready_receiver.next()).unwrap();
1578
1579 let pause_status = task.pause().check().unwrap();
1580 gst::debug!(RUNTIME_CAT, "nominal: awaiting pause ack");
1581 block_on(paused_receiver.next()).unwrap();
1582 assert_eq!(
1583 pause_status.block_on().unwrap(),
1584 Complete {
1585 origin: Started,
1586 target: Paused,
1587 },
1588 );
1589
1590 assert!(handle_item_ready_receiver.try_recv().is_err());
1592 assert!(try_next_ready_receiver.try_recv().is_err());
1594
1595 gst::debug!(
1596 RUNTIME_CAT,
1597 "nominal: starting (after pause cancelling try_next)"
1598 );
1599 let start_receiver = task.start().check().unwrap();
1600 block_on(started_receiver.next()).unwrap();
1601 assert_eq!(
1602 start_receiver.block_on().unwrap(),
1603 Complete {
1604 origin: Paused,
1605 target: Started,
1606 },
1607 );
1608 assert_eq!(task.state(), Started);
1609
1610 gst::debug!(RUNTIME_CAT, "nominal: pause // handle_item");
1611 block_on(try_next_ready_receiver.next()).unwrap();
1612 block_on(try_next_sender.send(())).unwrap();
1613 block_on(handle_item_ready_receiver.next()).unwrap();
1615
1616 gst::debug!(RUNTIME_CAT, "nominal: requesting to pause");
1617 let pause_status = task.pause().check().unwrap();
1618
1619 gst::debug!(RUNTIME_CAT, "nominal: unlocking item handling");
1620 block_on(handle_item_receiver.next()).unwrap();
1621
1622 gst::debug!(RUNTIME_CAT, "nominal: awaiting pause ack");
1623 block_on(paused_receiver.next()).unwrap();
1624 assert_eq!(
1625 pause_status.block_on().unwrap(),
1626 Complete {
1627 origin: Started,
1628 target: Paused,
1629 },
1630 );
1631
1632 assert!(try_next_ready_receiver.try_recv().is_err());
1634
1635 gst::debug!(
1636 RUNTIME_CAT,
1637 "nominal: starting (after pause // handle_item)"
1638 );
1639 let start_receiver = task.start().check().unwrap();
1640 block_on(started_receiver.next()).unwrap();
1641 assert_eq!(
1642 start_receiver.block_on().unwrap(),
1643 Complete {
1644 origin: Paused,
1645 target: Started,
1646 },
1647 );
1648 assert_eq!(task.state(), Started);
1649
1650 gst::debug!(RUNTIME_CAT, "nominal: stopping");
1651 assert_eq!(
1652 task.stop().block_on().unwrap(),
1653 Complete {
1654 origin: Started,
1655 target: Stopped,
1656 },
1657 );
1658
1659 assert_eq!(task.state(), Stopped);
1660 let _ = block_on(stopped_receiver.next());
1661
1662 let _ = try_next_ready_receiver.try_recv();
1664
1665 gst::debug!(RUNTIME_CAT, "nominal: starting (after stop)");
1666 assert_eq!(
1667 task.start().block_on().unwrap(),
1668 Complete {
1669 origin: Stopped,
1670 target: Started,
1671 },
1672 );
1673 let _ = block_on(started_receiver.next());
1674
1675 gst::debug!(RUNTIME_CAT, "nominal: stopping");
1676 assert_eq!(
1677 task.stop().block_on().unwrap(),
1678 Complete {
1679 origin: Started,
1680 target: Stopped,
1681 },
1682 );
1683
1684 assert_eq!(
1685 task.unprepare().block_on().unwrap(),
1686 Complete {
1687 origin: Stopped,
1688 target: Unprepared,
1689 },
1690 );
1691
1692 assert_eq!(task.state(), Unprepared);
1693 let _ = block_on(unprepared_receiver.next());
1694 }
1695
1696 #[test]
1697 fn prepare_error() {
1698 gst::init().unwrap();
1699
1700 struct TaskPrepareTest {
1701 obj: gst::Object,
1702 prepare_error_sender: mpsc::Sender<()>,
1703 }
1704
1705 impl TaskImpl for TaskPrepareTest {
1706 type Item = ();
1707
1708 fn obj(&self) -> &impl IsA<glib::Object> {
1709 &self.obj
1710 }
1711
1712 async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> {
1713 gst::debug!(RUNTIME_CAT, "prepare_error: prepare returning an error");
1714 Err(gst::error_msg!(
1715 gst::ResourceError::Failed,
1716 ["prepare_error: intentional error"]
1717 ))
1718 }
1719
1720 async fn handle_action_error(
1721 &mut self,
1722 trigger: Trigger,
1723 state: TaskState,
1724 err: gst::ErrorMessage,
1725 ) -> Trigger {
1726 gst::debug!(
1727 RUNTIME_CAT,
1728 "prepare_error: handling prepare error {:?}",
1729 err
1730 );
1731 match (trigger, state) {
1732 (Prepare, Unprepared) => {
1733 self.prepare_error_sender.send(()).await.unwrap();
1734 }
1735 other => unreachable!("{:?}", other),
1736 }
1737 Trigger::Error
1738 }
1739
1740 async fn try_next(&mut self) -> Result<(), gst::FlowError> {
1741 unreachable!("prepare_error: try_next");
1742 }
1743
1744 async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
1745 unreachable!("prepare_error: handle_item");
1746 }
1747 }
1748
1749 let context = Context::acquire("prepare_error", Duration::from_millis(2)).unwrap();
1750
1751 let task = Task::default();
1752
1753 assert_eq!(task.state(), Unprepared);
1754
1755 let (prepare_error_sender, mut prepare_error_receiver) = mpsc::channel(1);
1756 let prepare_status = task.prepare(
1757 TaskPrepareTest {
1758 obj: gst::Pad::builder(gst::PadDirection::Unknown)
1759 .name("runtime::Task::prepare_error")
1760 .build()
1761 .into(),
1762 prepare_error_sender,
1763 },
1764 context,
1765 );
1766
1767 gst::debug!(
1768 RUNTIME_CAT,
1769 "prepare_error: await action error notification"
1770 );
1771 block_on(prepare_error_receiver.next()).unwrap();
1772
1773 match prepare_status.block_on().unwrap_err() {
1774 TransitionError {
1775 trigger: Trigger::Error,
1776 state: Preparing,
1777 ..
1778 } => (),
1779 other => panic!("{other:?}"),
1780 }
1781
1782 while TaskState::Error != task.state() {
1784 std::thread::sleep(Duration::from_millis(2));
1785 }
1786
1787 match task.start().block_on().unwrap_err() {
1788 TransitionError {
1789 trigger: Start,
1790 state: TaskState::Error,
1791 ..
1792 } => (),
1793 other => panic!("{other:?}"),
1794 }
1795
1796 block_on(task.unprepare()).unwrap();
1797 }
1798
1799 #[test]
1800 fn prepare_start_ok() {
1801 gst::init().unwrap();
1804
1805 struct TaskPrepareTest {
1806 obj: gst::Object,
1807 prepare_receiver: mpsc::Receiver<()>,
1808 }
1809
1810 impl TaskImpl for TaskPrepareTest {
1811 type Item = ();
1812
1813 fn obj(&self) -> &impl IsA<glib::Object> {
1814 &self.obj
1815 }
1816
1817 async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> {
1818 gst::debug!(
1819 RUNTIME_CAT,
1820 "prepare_start_ok: preparation awaiting trigger"
1821 );
1822 self.prepare_receiver.next().await.unwrap();
1823 gst::debug!(RUNTIME_CAT, "prepare_start_ok: preparation complete Ok");
1824 Ok(())
1825 }
1826
1827 async fn handle_action_error(
1828 &mut self,
1829 _trigger: Trigger,
1830 _state: TaskState,
1831 _err: gst::ErrorMessage,
1832 ) -> Trigger {
1833 unreachable!("prepare_start_ok: handle_prepare_error");
1834 }
1835
1836 async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
1837 gst::debug!(RUNTIME_CAT, "prepare_start_ok: started");
1838 Ok(())
1839 }
1840
1841 async fn try_next(&mut self) -> Result<(), gst::FlowError> {
1842 pending().await
1843 }
1844
1845 async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
1846 unreachable!("prepare_start_ok: handle_item");
1847 }
1848 }
1849
1850 let context = Context::acquire("prepare_start_ok", Duration::from_millis(2)).unwrap();
1851
1852 let task = Task::default();
1853
1854 let (mut prepare_sender, prepare_receiver) = mpsc::channel(1);
1855 let fut = task.prepare(
1856 TaskPrepareTest {
1857 obj: gst::Pad::builder(gst::PadDirection::Unknown)
1858 .name("runtime::Task::prepare_start_ok")
1859 .build()
1860 .into(),
1861 prepare_receiver,
1862 },
1863 context,
1864 );
1865 drop(fut);
1866
1867 let start_ctx = Context::acquire("prepare_start_ok_requester", Duration::ZERO).unwrap();
1868 let (ready_sender, ready_receiver) = oneshot::channel();
1869 let start_handle = start_ctx.spawn(async move {
1870 assert_eq!(task.state(), Preparing);
1871 gst::debug!(RUNTIME_CAT, "prepare_start_ok: starting");
1872 let start_status = task.start();
1873 match start_status {
1874 Pending {
1875 trigger: Start,
1876 origin: Preparing,
1877 ..
1878 } => (),
1879 other => panic!("{other:?}"),
1880 }
1881 ready_sender.send(()).unwrap();
1882 assert_eq!(
1883 start_status.await.unwrap(),
1884 Complete {
1885 origin: Prepared,
1886 target: Started,
1887 },
1888 );
1889 assert_eq!(task.state(), Started);
1890
1891 let stop_status = task.stop();
1892 match stop_status {
1893 Pending {
1894 trigger: Stop,
1895 origin: Started,
1896 ..
1897 } => (),
1898 other => panic!("{other:?}"),
1899 }
1900 assert_eq!(
1901 stop_status.await.unwrap(),
1902 Complete {
1903 origin: Started,
1904 target: Stopped,
1905 },
1906 );
1907 assert_eq!(task.state(), Stopped);
1908
1909 let unprepare_status = task.unprepare();
1910 match unprepare_status {
1911 Pending {
1912 trigger: Unprepare,
1913 origin: Stopped,
1914 ..
1915 } => (),
1916 other => panic!("{other:?}"),
1917 };
1918 assert_eq!(
1919 unprepare_status.await.unwrap(),
1920 Complete {
1921 origin: Stopped,
1922 target: Unprepared,
1923 },
1924 );
1925 assert_eq!(task.state(), Unprepared);
1926 });
1927
1928 gst::debug!(RUNTIME_CAT, "prepare_start_ok: awaiting for start_ctx");
1929 block_on(ready_receiver).unwrap();
1930
1931 gst::debug!(RUNTIME_CAT, "prepare_start_ok: triggering preparation");
1932 block_on(prepare_sender.send(())).unwrap();
1933
1934 block_on(start_handle).unwrap();
1935 }
1936
1937 #[test]
1938 fn prepare_start_error() {
1939 gst::init().unwrap();
1942
1943 struct TaskPrepareTest {
1944 obj: gst::Object,
1945 prepare_receiver: mpsc::Receiver<()>,
1946 prepare_error_sender: mpsc::Sender<()>,
1947 }
1948
1949 impl TaskImpl for TaskPrepareTest {
1950 type Item = ();
1951
1952 fn obj(&self) -> &impl IsA<glib::Object> {
1953 &self.obj
1954 }
1955
1956 async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> {
1957 gst::debug!(
1958 RUNTIME_CAT,
1959 "prepare_start_error: preparation awaiting trigger"
1960 );
1961 self.prepare_receiver.next().await.unwrap();
1962 gst::debug!(RUNTIME_CAT, "prepare_start_error: preparation complete Err");
1963
1964 Err(gst::error_msg!(
1965 gst::ResourceError::Failed,
1966 ["prepare_start_error: intentional error"]
1967 ))
1968 }
1969
1970 async fn handle_action_error(
1971 &mut self,
1972 trigger: Trigger,
1973 state: TaskState,
1974 err: gst::ErrorMessage,
1975 ) -> Trigger {
1976 gst::debug!(
1977 RUNTIME_CAT,
1978 "prepare_start_error: handling prepare error {:?}",
1979 err
1980 );
1981 match (trigger, state) {
1982 (Prepare, Unprepared) => {
1983 self.prepare_error_sender.send(()).await.unwrap();
1984 }
1985 other => panic!("action error for {other:?}"),
1986 }
1987 Trigger::Error
1988 }
1989
1990 async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
1991 unreachable!("prepare_start_error: start");
1992 }
1993
1994 async fn try_next(&mut self) -> Result<(), gst::FlowError> {
1995 unreachable!("prepare_start_error: try_next");
1996 }
1997
1998 async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
1999 unreachable!("prepare_start_error: handle_item");
2000 }
2001 }
2002
2003 let context = Context::acquire("prepare_start_error", Duration::from_millis(2)).unwrap();
2004
2005 let task = Task::default();
2006
2007 let (mut prepare_sender, prepare_receiver) = mpsc::channel(1);
2008 let (prepare_error_sender, mut prepare_error_receiver) = mpsc::channel(1);
2009 let prepare_status = task.prepare(
2010 TaskPrepareTest {
2011 obj: gst::Pad::builder(gst::PadDirection::Unknown)
2012 .name("runtime::Task::prepare_start_error")
2013 .build()
2014 .into(),
2015 prepare_receiver,
2016 prepare_error_sender,
2017 },
2018 context,
2019 );
2020 match prepare_status {
2021 Pending {
2022 trigger: Prepare,
2023 origin: Unprepared,
2024 ..
2025 } => (),
2026 other => panic!("{other:?}"),
2027 };
2028
2029 let start_ctx = Context::acquire("prepare_start_error_requester", Duration::ZERO).unwrap();
2030 let (ready_sender, ready_receiver) = oneshot::channel();
2031 let start_handle = start_ctx.spawn(async move {
2032 gst::debug!(RUNTIME_CAT, "prepare_start_error: starting (Err)");
2033 let fut = task.start();
2034 drop(fut);
2035 ready_sender.send(()).unwrap();
2036 match prepare_status.await {
2040 Err(TransitionError {
2041 trigger: Trigger::Error,
2042 state: Preparing,
2043 ..
2044 }) => (),
2045 other => panic!("{other:?}"),
2046 }
2047
2048 let unprepare_status = task.unprepare();
2049 match unprepare_status {
2050 Pending {
2051 trigger: Unprepare,
2052 origin: TaskState::Error,
2053 ..
2054 } => (),
2055 other => panic!("{other:?}"),
2056 };
2057 assert_eq!(
2058 unprepare_status.await.unwrap(),
2059 Complete {
2060 origin: TaskState::Error,
2061 target: Unprepared,
2062 },
2063 );
2064 });
2065
2066 gst::debug!(RUNTIME_CAT, "prepare_start_error: awaiting for start_ctx");
2067 block_on(ready_receiver).unwrap();
2068
2069 gst::debug!(
2070 RUNTIME_CAT,
2071 "prepare_start_error: triggering preparation (failure)"
2072 );
2073 block_on(prepare_sender.send(())).unwrap();
2074
2075 gst::debug!(
2076 RUNTIME_CAT,
2077 "prepare_start_error: await prepare error notification"
2078 );
2079 block_on(prepare_error_receiver.next()).unwrap();
2080
2081 block_on(start_handle).unwrap();
2082 }
2083
2084 #[test]
2085 fn item_error() {
2086 gst::init().unwrap();
2087
2088 struct TaskTest {
2089 obj: gst::Object,
2090 try_next_receiver: mpsc::Receiver<gst::FlowError>,
2091 }
2092
2093 impl TaskImpl for TaskTest {
2094 type Item = gst::FlowError;
2095
2096 fn obj(&self) -> &impl IsA<glib::Object> {
2097 &self.obj
2098 }
2099
2100 async fn try_next(&mut self) -> Result<gst::FlowError, gst::FlowError> {
2101 gst::debug!(RUNTIME_CAT, "item_error: awaiting try_next");
2102 Ok(self.try_next_receiver.next().await.unwrap())
2103 }
2104
2105 async fn handle_item(&mut self, item: gst::FlowError) -> Result<(), gst::FlowError> {
2106 gst::debug!(RUNTIME_CAT, "item_error: handle_item received {:?}", item);
2107 Err(item)
2108 }
2109 }
2110
2111 let context = Context::acquire("item_error", Duration::from_millis(2)).unwrap();
2112 let task = Task::default();
2113 gst::debug!(RUNTIME_CAT, "item_error: prepare and start");
2114 let (mut try_next_sender, try_next_receiver) = mpsc::channel(1);
2115 task.prepare(
2116 TaskTest {
2117 obj: gst::Pad::builder(gst::PadDirection::Unknown)
2118 .name("runtime::Task::item_error")
2119 .build()
2120 .into(),
2121 try_next_receiver,
2122 },
2123 context,
2124 )
2125 .block_on()
2126 .unwrap();
2127 task.start().block_on().unwrap();
2128
2129 gst::debug!(RUNTIME_CAT, "item_error: req. handle_item to return Eos");
2130 block_on(try_next_sender.send(gst::FlowError::Eos)).unwrap();
2131 while Stopped != task.state() {
2133 std::thread::sleep(Duration::from_millis(2));
2134 }
2135
2136 gst::debug!(RUNTIME_CAT, "item_error: starting (after stop)");
2137 assert_eq!(
2138 task.start().block_on().unwrap(),
2139 Complete {
2140 origin: Stopped,
2141 target: Started,
2142 },
2143 );
2144
2145 gst::debug!(RUNTIME_CAT, "item_error: req. handle_item to return Error");
2146 block_on(try_next_sender.send(gst::FlowError::Error)).unwrap();
2147 while TaskState::Error != task.state() {
2149 std::thread::sleep(Duration::from_millis(2));
2150 }
2151
2152 gst::debug!(RUNTIME_CAT, "item_error: attempting to start (after Error)");
2153 match task.start().block_on().unwrap_err() {
2154 TransitionError {
2155 trigger: Start,
2156 state: TaskState::Error,
2157 ..
2158 } => (),
2159 other => panic!("{other:?}"),
2160 }
2161
2162 assert_eq!(
2163 task.unprepare().block_on().unwrap(),
2164 Complete {
2165 origin: TaskState::Error,
2166 target: Unprepared,
2167 },
2168 );
2169 }
2170
2171 #[test]
2172 fn flush_regular_sync() {
2173 gst::init().unwrap();
2174
2175 struct TaskFlushTest {
2176 obj: gst::Object,
2177 flush_start_sender: mpsc::Sender<()>,
2178 flush_stop_sender: mpsc::Sender<()>,
2179 }
2180
2181 impl TaskImpl for TaskFlushTest {
2182 type Item = ();
2183
2184 fn obj(&self) -> &impl IsA<glib::Object> {
2185 &self.obj
2186 }
2187
2188 async fn try_next(&mut self) -> Result<(), gst::FlowError> {
2189 pending().await
2190 }
2191
2192 async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
2193 unreachable!("flush_regular_sync: handle_item");
2194 }
2195
2196 async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
2197 gst::debug!(RUNTIME_CAT, "flush_regular_sync: started flushing");
2198 self.flush_start_sender.send(()).await.unwrap();
2199 Ok(())
2200 }
2201
2202 async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
2203 gst::debug!(RUNTIME_CAT, "flush_regular_sync: stopped flushing");
2204 self.flush_stop_sender.send(()).await.unwrap();
2205 Ok(())
2206 }
2207 }
2208
2209 let context = Context::acquire("flush_regular_sync", Duration::from_millis(2)).unwrap();
2210
2211 let task = Task::default();
2212
2213 let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
2214 let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
2215 let obj = gst::Pad::builder(gst::PadDirection::Unknown)
2216 .name("runtime::Task::flush_regular_sync")
2217 .build();
2218 let fut = task.prepare(
2219 TaskFlushTest {
2220 obj: obj.clone().into(),
2221 flush_start_sender,
2222 flush_stop_sender,
2223 },
2224 context,
2225 );
2226 drop(fut);
2227
2228 gst::debug!(RUNTIME_CAT, "flush_regular_sync: start");
2229 block_on(task.start()).unwrap();
2230
2231 gst::debug!(RUNTIME_CAT, "flush_regular_sync: starting flush");
2232 assert_eq!(
2233 task.flush_start().block_on().unwrap(),
2234 Complete {
2235 origin: Started,
2236 target: Flushing,
2237 },
2238 );
2239 assert_eq!(task.state(), Flushing);
2240
2241 block_on(flush_start_receiver.next()).unwrap();
2242
2243 gst::debug!(RUNTIME_CAT, "flush_regular_sync: stopping flush");
2244 assert_eq!(
2245 task.flush_stop().block_on_or_add_subtask(&obj).unwrap(),
2246 Complete {
2247 origin: Flushing,
2248 target: Started,
2249 },
2250 );
2251 assert_eq!(task.state(), Started);
2252
2253 block_on(flush_stop_receiver.next()).unwrap();
2254
2255 let fut = task.pause();
2256 drop(fut);
2257 stop_then_unprepare(task);
2258 }
2259
2260 #[test]
2261 fn flush_regular_different_context() {
2262 gst::init().unwrap();
2264
2265 struct TaskFlushTest {
2266 obj: gst::Object,
2267 flush_start_sender: mpsc::Sender<()>,
2268 flush_stop_sender: mpsc::Sender<()>,
2269 }
2270
2271 impl TaskImpl for TaskFlushTest {
2272 type Item = ();
2273
2274 fn obj(&self) -> &impl IsA<glib::Object> {
2275 &self.obj
2276 }
2277
2278 async fn try_next(&mut self) -> Result<(), gst::FlowError> {
2279 pending().await
2280 }
2281
2282 async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
2283 unreachable!("flush_regular_different_context: handle_item");
2284 }
2285
2286 async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
2287 gst::debug!(
2288 RUNTIME_CAT,
2289 "flush_regular_different_context: started flushing"
2290 );
2291 self.flush_start_sender.send(()).await.unwrap();
2292 Ok(())
2293 }
2294
2295 async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
2296 gst::debug!(
2297 RUNTIME_CAT,
2298 "flush_regular_different_context: stopped flushing"
2299 );
2300 self.flush_stop_sender.send(()).await.unwrap();
2301 Ok(())
2302 }
2303 }
2304
2305 let context =
2306 Context::acquire("flush_regular_different_context", Duration::from_millis(2)).unwrap();
2307
2308 let task = Task::default();
2309
2310 let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
2311 let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
2312 let obj = gst::Pad::builder(gst::PadDirection::Unknown)
2313 .name("runtime::Task::flush_regular_different_context")
2314 .build();
2315 let fut = task.prepare(
2316 TaskFlushTest {
2317 obj: obj.clone().into(),
2318 flush_start_sender,
2319 flush_stop_sender,
2320 },
2321 context,
2322 );
2323 drop(fut);
2324
2325 gst::debug!(RUNTIME_CAT, "flush_regular_different_context: start");
2326 task.start().block_on().unwrap();
2327
2328 let oob_context = Context::acquire(
2329 "flush_regular_different_context_oob",
2330 Duration::from_millis(2),
2331 )
2332 .unwrap();
2333
2334 let task_clone = task.clone();
2335 let flush_res_fut = oob_context.spawn(async move {
2336 let flush_start_status = task_clone.flush_start();
2337 match flush_start_status {
2338 Pending {
2339 trigger: FlushStart,
2340 origin: Started,
2341 ..
2342 } => (),
2343 other => panic!("{other:?}"),
2344 };
2345 assert_eq!(
2346 flush_start_status.await.unwrap(),
2347 Complete {
2348 origin: Started,
2349 target: Flushing,
2350 },
2351 );
2352 assert_eq!(task_clone.state(), Flushing);
2353 flush_start_receiver.next().await.unwrap();
2354
2355 let flush_stop_status = task_clone.flush_stop();
2356 match flush_stop_status {
2357 Pending {
2358 trigger: FlushStop,
2359 origin: Flushing,
2360 ..
2361 } => (),
2362 other => panic!("{other:?}"),
2363 };
2364 assert_eq!(
2365 flush_stop_status.block_on_or_add_subtask(&obj).unwrap(),
2366 NotWaiting {
2367 trigger: FlushStop,
2368 origin: Flushing,
2369 },
2370 );
2371
2372 Context::drain_sub_tasks().await.unwrap();
2373 assert_eq!(task_clone.state(), Started);
2374 });
2375
2376 block_on(flush_res_fut).unwrap();
2377 block_on(flush_stop_receiver.next()).unwrap();
2378
2379 stop_then_unprepare(task);
2380 }
2381
2382 #[test]
2383 fn flush_regular_same_context() {
2384 gst::init().unwrap();
2386
2387 struct TaskFlushTest {
2388 obj: gst::Object,
2389 flush_start_sender: mpsc::Sender<()>,
2390 flush_stop_sender: mpsc::Sender<()>,
2391 }
2392
2393 impl TaskImpl for TaskFlushTest {
2394 type Item = ();
2395
2396 fn obj(&self) -> &impl IsA<glib::Object> {
2397 &self.obj
2398 }
2399
2400 async fn try_next(&mut self) -> Result<(), gst::FlowError> {
2401 pending().await
2402 }
2403
2404 async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
2405 unreachable!("flush_regular_same_context: handle_item");
2406 }
2407
2408 async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
2409 gst::debug!(RUNTIME_CAT, "flush_regular_same_context: started flushing");
2410 self.flush_start_sender.send(()).await.unwrap();
2411 Ok(())
2412 }
2413
2414 async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
2415 gst::debug!(RUNTIME_CAT, "flush_regular_same_context: stopped flushing");
2416 self.flush_stop_sender.send(()).await.unwrap();
2417 Ok(())
2418 }
2419 }
2420
2421 let context =
2422 Context::acquire("flush_regular_same_context", Duration::from_millis(2)).unwrap();
2423
2424 let task = Task::default();
2425
2426 let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
2427 let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
2428 let fut = task.prepare(
2429 TaskFlushTest {
2430 obj: gst::Pad::builder(gst::PadDirection::Unknown)
2431 .name("runtime::Task::flush_regular_same_context")
2432 .build()
2433 .into(),
2434 flush_start_sender,
2435 flush_stop_sender,
2436 },
2437 context.clone(),
2438 );
2439 drop(fut);
2440
2441 block_on(task.start()).unwrap();
2442
2443 let task_clone = task.clone();
2444 let flush_handle = context.spawn(async move {
2445 let flush_start_status = task_clone.flush_start();
2446 match flush_start_status {
2447 Pending {
2448 trigger: FlushStart,
2449 origin: Started,
2450 ..
2451 } => (),
2452 other => panic!("{other:?}"),
2453 };
2454 assert_eq!(
2455 flush_start_status.await.unwrap(),
2456 Complete {
2457 origin: Started,
2458 target: Flushing,
2459 },
2460 );
2461 assert_eq!(task_clone.state(), Flushing);
2462 flush_start_receiver.next().await.unwrap();
2463
2464 let flush_stop_status = task_clone.flush_stop();
2465 match flush_stop_status {
2466 Pending {
2467 trigger: FlushStop,
2468 origin: Flushing,
2469 ..
2470 } => (),
2471 other => panic!("{other:?}"),
2472 };
2473 assert_eq!(
2474 flush_stop_status.await.unwrap(),
2475 Complete {
2476 origin: Flushing,
2477 target: Started,
2478 },
2479 );
2480 assert_eq!(task_clone.state(), Started);
2481 });
2482
2483 block_on(flush_handle).unwrap();
2484 block_on(flush_stop_receiver.next()).unwrap();
2485
2486 stop_then_unprepare(task);
2487 }
2488
2489 #[test]
2490 fn flush_from_loop() {
2491 gst::init().unwrap();
2493
2494 struct TaskFlushTest {
2495 obj: gst::Object,
2496 task: Task,
2497 flush_start_sender: mpsc::Sender<()>,
2498 }
2499
2500 impl TaskImpl for TaskFlushTest {
2501 type Item = ();
2502
2503 fn obj(&self) -> &impl IsA<glib::Object> {
2504 &self.obj
2505 }
2506
2507 async fn try_next(&mut self) -> Result<(), gst::FlowError> {
2508 Ok(())
2509 }
2510
2511 async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
2512 gst::debug!(RUNTIME_CAT, "flush_from_loop: flush_start from handle_item");
2513 match self.task.flush_start() {
2514 Pending {
2515 trigger: FlushStart,
2516 origin: Started,
2517 ..
2518 } => (),
2519 other => panic!("{other:?}"),
2520 }
2521 Ok(())
2522 }
2523
2524 async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
2525 gst::debug!(RUNTIME_CAT, "flush_from_loop: started flushing");
2526 self.flush_start_sender.send(()).await.unwrap();
2527 Ok(())
2528 }
2529 }
2530
2531 let context = Context::acquire("flush_from_loop", Duration::from_millis(2)).unwrap();
2532
2533 let task = Task::default();
2534
2535 let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
2536 let fut = task.prepare(
2537 TaskFlushTest {
2538 obj: gst::Pad::builder(gst::PadDirection::Unknown)
2539 .name("runtime::Task::flush_from_loop")
2540 .build()
2541 .into(),
2542 task: task.clone(),
2543 flush_start_sender,
2544 },
2545 context,
2546 );
2547 drop(fut);
2548
2549 let fut = task.start();
2550 drop(fut);
2551
2552 gst::debug!(
2553 RUNTIME_CAT,
2554 "flush_from_loop: awaiting flush_start notification"
2555 );
2556 block_on(flush_start_receiver.next()).unwrap();
2557
2558 assert_eq!(
2559 task.stop().block_on().unwrap(),
2560 Complete {
2561 origin: Flushing,
2562 target: Stopped,
2563 },
2564 );
2565 task.unprepare().block_on().unwrap();
2566 }
2567
2568 #[test]
2569 fn pause_from_loop() {
2570 gst::init().unwrap();
2573
2574 struct TaskStartTest {
2575 obj: gst::Object,
2576 task: Task,
2577 pause_sender: mpsc::Sender<()>,
2578 }
2579
2580 impl TaskImpl for TaskStartTest {
2581 type Item = ();
2582
2583 fn obj(&self) -> &impl IsA<glib::Object> {
2584 &self.obj
2585 }
2586
2587 async fn try_next(&mut self) -> Result<(), gst::FlowError> {
2588 Ok(())
2589 }
2590
2591 async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
2592 gst::debug!(RUNTIME_CAT, "pause_from_loop: entering handle_item");
2593
2594 crate::runtime::timer::delay_for(Duration::from_millis(50)).await;
2595
2596 gst::debug!(RUNTIME_CAT, "pause_from_loop: pause from handle_item");
2597 match self.task.pause() {
2598 Pending {
2599 trigger: Pause,
2600 origin: Started,
2601 ..
2602 } => (),
2603 other => panic!("{other:?}"),
2604 }
2605
2606 Ok(())
2607 }
2608
2609 async fn pause(&mut self) -> Result<(), gst::ErrorMessage> {
2610 gst::debug!(RUNTIME_CAT, "pause_from_loop: entering pause action");
2611 self.pause_sender.send(()).await.unwrap();
2612 Ok(())
2613 }
2614 }
2615
2616 let context = Context::acquire("pause_from_loop", Duration::from_millis(2)).unwrap();
2617
2618 let task = Task::default();
2619
2620 let (pause_sender, mut pause_receiver) = mpsc::channel(1);
2621 let fut = task.prepare(
2622 TaskStartTest {
2623 obj: gst::Pad::builder(gst::PadDirection::Unknown)
2624 .name("runtime::Task::pause_from_loop")
2625 .build()
2626 .into(),
2627 task: task.clone(),
2628 pause_sender,
2629 },
2630 context,
2631 );
2632 drop(fut);
2633
2634 let fut = task.start();
2635 drop(fut);
2636
2637 gst::debug!(RUNTIME_CAT, "pause_from_loop: awaiting pause notification");
2638 block_on(pause_receiver.next()).unwrap();
2639
2640 stop_then_unprepare(task);
2641 }
2642
2643 #[test]
2644 fn trigger_from_action() {
2645 gst::init().unwrap();
2647
2648 struct TaskFlushTest {
2649 obj: gst::Object,
2650 task: Task,
2651 flush_stop_sender: mpsc::Sender<()>,
2652 }
2653
2654 impl TaskImpl for TaskFlushTest {
2655 type Item = ();
2656
2657 fn obj(&self) -> &impl IsA<glib::Object> {
2658 &self.obj
2659 }
2660
2661 async fn try_next(&mut self) -> Result<(), gst::FlowError> {
2662 pending().await
2663 }
2664
2665 async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
2666 unreachable!("trigger_from_action: handle_item");
2667 }
2668
2669 async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
2670 gst::debug!(
2671 RUNTIME_CAT,
2672 "trigger_from_action: flush_start triggering flush_stop"
2673 );
2674 match self.task.flush_stop() {
2675 Pending {
2676 trigger: FlushStop,
2677 origin: Started,
2678 ..
2679 } => (),
2680 other => panic!("{other:?}"),
2681 }
2682
2683 Ok(())
2684 }
2685
2686 async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
2687 gst::debug!(RUNTIME_CAT, "trigger_from_action: stopped flushing");
2688 self.flush_stop_sender.send(()).await.unwrap();
2689 Ok(())
2690 }
2691 }
2692
2693 let context = Context::acquire("trigger_from_action", Duration::from_millis(2)).unwrap();
2694
2695 let task = Task::default();
2696
2697 let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
2698 let fut = task.prepare(
2699 TaskFlushTest {
2700 obj: gst::Pad::builder(gst::PadDirection::Unknown)
2701 .name("runtime::Task::trigger_from_action")
2702 .build()
2703 .into(),
2704 task: task.clone(),
2705 flush_stop_sender,
2706 },
2707 context,
2708 );
2709 drop(fut);
2710
2711 task.start().block_on().unwrap();
2712 let fut = task.flush_start();
2713 drop(fut);
2714
2715 gst::debug!(
2716 RUNTIME_CAT,
2717 "trigger_from_action: awaiting flush_stop notification"
2718 );
2719 block_on(flush_stop_receiver.next()).unwrap();
2720
2721 stop_then_unprepare(task);
2722 }
2723
2724 #[test]
2725 fn pause_flush_start() {
2726 gst::init().unwrap();
2727
2728 struct TaskFlushTest {
2729 obj: gst::Object,
2730 started_sender: mpsc::Sender<()>,
2731 flush_start_sender: mpsc::Sender<()>,
2732 flush_stop_sender: mpsc::Sender<()>,
2733 }
2734
2735 impl TaskImpl for TaskFlushTest {
2736 type Item = ();
2737
2738 fn obj(&self) -> &impl IsA<glib::Object> {
2739 &self.obj
2740 }
2741
2742 async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
2743 gst::debug!(RUNTIME_CAT, "pause_flush_start: started");
2744 self.started_sender.send(()).await.unwrap();
2745 Ok(())
2746 }
2747
2748 async fn try_next(&mut self) -> Result<(), gst::FlowError> {
2749 pending().await
2750 }
2751
2752 async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
2753 unreachable!("pause_flush_start: handle_item");
2754 }
2755
2756 async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
2757 gst::debug!(RUNTIME_CAT, "pause_flush_start: started flushing");
2758 self.flush_start_sender.send(()).await.unwrap();
2759 Ok(())
2760 }
2761
2762 async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
2763 gst::debug!(RUNTIME_CAT, "pause_flush_start: stopped flushing");
2764 self.flush_stop_sender.send(()).await.unwrap();
2765 Ok(())
2766 }
2767 }
2768
2769 let context = Context::acquire("pause_flush_start", Duration::from_millis(2)).unwrap();
2770
2771 let task = Task::default();
2772
2773 let (started_sender, mut started_receiver) = mpsc::channel(1);
2774 let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
2775 let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
2776 let fut = task.prepare(
2777 TaskFlushTest {
2778 obj: gst::Pad::builder(gst::PadDirection::Unknown)
2779 .name("runtime::Task::pause_flush_start")
2780 .build()
2781 .into(),
2782 started_sender,
2783 flush_start_sender,
2784 flush_stop_sender,
2785 },
2786 context,
2787 );
2788 drop(fut);
2789
2790 gst::debug!(RUNTIME_CAT, "pause_flush_start: pausing");
2793 assert_eq!(
2794 task.pause().block_on().unwrap(),
2795 Complete {
2796 origin: Prepared,
2797 target: Paused,
2798 },
2799 );
2800
2801 gst::debug!(RUNTIME_CAT, "pause_flush_start: starting flush");
2802 assert_eq!(
2803 task.flush_start().block_on().unwrap(),
2804 Complete {
2805 origin: Paused,
2806 target: PausedFlushing,
2807 },
2808 );
2809 assert_eq!(task.state(), PausedFlushing);
2810 block_on(flush_start_receiver.next());
2811
2812 gst::debug!(RUNTIME_CAT, "pause_flush_start: stopping flush");
2813 assert_eq!(
2814 task.flush_stop().block_on().unwrap(),
2815 Complete {
2816 origin: PausedFlushing,
2817 target: Paused,
2818 },
2819 );
2820 assert_eq!(task.state(), Paused);
2821 block_on(flush_stop_receiver.next());
2822
2823 started_receiver.try_recv().unwrap_err();
2825
2826 gst::debug!(RUNTIME_CAT, "pause_flush_start: starting after flushing");
2827 assert_eq!(
2828 task.start().block_on().unwrap(),
2829 Complete {
2830 origin: Paused,
2831 target: Started,
2832 },
2833 );
2834 assert_eq!(task.state(), Started);
2835 block_on(started_receiver.next());
2836
2837 stop_then_unprepare(task);
2838 }
2839
2840 #[test]
2841 fn pause_flushing_start() {
2842 gst::init().unwrap();
2843
2844 struct TaskFlushTest {
2845 obj: gst::Object,
2846 started_sender: mpsc::Sender<()>,
2847 flush_start_sender: mpsc::Sender<()>,
2848 flush_stop_sender: mpsc::Sender<()>,
2849 }
2850
2851 impl TaskImpl for TaskFlushTest {
2852 type Item = ();
2853
2854 fn obj(&self) -> &impl IsA<glib::Object> {
2855 &self.obj
2856 }
2857
2858 async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
2859 gst::debug!(RUNTIME_CAT, "pause_flushing_start: started");
2860 self.started_sender.send(()).await.unwrap();
2861 Ok(())
2862 }
2863
2864 async fn try_next(&mut self) -> Result<(), gst::FlowError> {
2865 pending().await
2866 }
2867
2868 async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
2869 unreachable!("pause_flushing_start: handle_item");
2870 }
2871
2872 async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
2873 gst::debug!(RUNTIME_CAT, "pause_flushing_start: started flushing");
2874 self.flush_start_sender.send(()).await.unwrap();
2875 Ok(())
2876 }
2877
2878 async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
2879 gst::debug!(RUNTIME_CAT, "pause_flushing_start: stopped flushing");
2880 self.flush_stop_sender.send(()).await.unwrap();
2881 Ok(())
2882 }
2883 }
2884
2885 let context = Context::acquire("pause_flushing_start", Duration::from_millis(2)).unwrap();
2886
2887 let task = Task::default();
2888
2889 let (started_sender, mut started_receiver) = mpsc::channel(1);
2890 let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
2891 let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
2892 let fut = task.prepare(
2893 TaskFlushTest {
2894 obj: gst::Pad::builder(gst::PadDirection::Unknown)
2895 .name("runtime::Task::pause_flushing_start")
2896 .build()
2897 .into(),
2898 started_sender,
2899 flush_start_sender,
2900 flush_stop_sender,
2901 },
2902 context,
2903 );
2904 drop(fut);
2905
2906 gst::debug!(RUNTIME_CAT, "pause_flushing_start: pausing");
2909 let fut = task.pause();
2910 drop(fut);
2911
2912 gst::debug!(RUNTIME_CAT, "pause_flushing_start: starting flush");
2913 block_on(task.flush_start()).unwrap();
2914 assert_eq!(task.state(), PausedFlushing);
2915 block_on(flush_start_receiver.next());
2916
2917 gst::debug!(RUNTIME_CAT, "pause_flushing_start: starting while flushing");
2918 assert_eq!(
2919 task.start().block_on().unwrap(),
2920 Complete {
2921 origin: PausedFlushing,
2922 target: Flushing,
2923 },
2924 );
2925 assert_eq!(task.state(), Flushing);
2926
2927 started_receiver.try_recv().unwrap_err();
2929
2930 gst::debug!(RUNTIME_CAT, "pause_flushing_start: stopping flush");
2931 assert_eq!(
2932 task.flush_stop().block_on().unwrap(),
2933 Complete {
2934 origin: Flushing,
2935 target: Started,
2936 },
2937 );
2938 assert_eq!(task.state(), Started);
2939 block_on(flush_stop_receiver.next());
2940 block_on(started_receiver.next());
2941
2942 stop_then_unprepare(task);
2943 }
2944
2945 #[test]
2946 fn flush_concurrent_start() {
2947 gst::init().unwrap();
2950
2951 struct TaskStartTest {
2952 obj: gst::Object,
2953 flush_start_sender: mpsc::Sender<()>,
2954 flush_stop_sender: mpsc::Sender<()>,
2955 }
2956
2957 impl TaskImpl for TaskStartTest {
2958 type Item = ();
2959
2960 fn obj(&self) -> &impl IsA<glib::Object> {
2961 &self.obj
2962 }
2963
2964 async fn try_next(&mut self) -> Result<(), gst::FlowError> {
2965 pending().await
2966 }
2967
2968 async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
2969 unreachable!("flush_concurrent_start: handle_item");
2970 }
2971
2972 async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
2973 gst::debug!(RUNTIME_CAT, "flush_concurrent_start: started flushing");
2974 self.flush_start_sender.send(()).await.unwrap();
2975 Ok(())
2976 }
2977
2978 async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
2979 gst::debug!(RUNTIME_CAT, "flush_concurrent_start: stopped flushing");
2980 self.flush_stop_sender.send(()).await.unwrap();
2981 Ok(())
2982 }
2983 }
2984
2985 let context = Context::acquire("flush_concurrent_start", Duration::from_millis(2)).unwrap();
2986
2987 let task = Task::default();
2988
2989 let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
2990 let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
2991 let fut = task.prepare(
2992 TaskStartTest {
2993 obj: gst::Pad::builder(gst::PadDirection::Unknown)
2994 .name("runtime::Task::flush_concurrent_start")
2995 .build()
2996 .into(),
2997 flush_start_sender,
2998 flush_stop_sender,
2999 },
3000 context,
3001 );
3002 drop(fut);
3003
3004 let oob_context =
3005 Context::acquire("flush_concurrent_start_oob", Duration::from_millis(2)).unwrap();
3006 let task_clone = task.clone();
3007
3008 block_on(task.pause()).unwrap();
3009
3010 let (ready_sender, ready_receiver) = oneshot::channel();
3012 gst::debug!(RUNTIME_CAT, "flush_concurrent_start: spawning flush_start");
3013 let flush_start_handle = oob_context.spawn(async move {
3014 gst::debug!(RUNTIME_CAT, "flush_concurrent_start: // flush_start");
3015 ready_sender.send(()).unwrap();
3016 let status = task_clone.flush_start();
3017 match status {
3018 Pending {
3019 trigger: FlushStart,
3020 origin: Paused,
3021 ..
3022 } => (),
3023 Pending {
3024 trigger: FlushStart,
3025 origin: Started,
3026 ..
3027 } => (),
3028 other => panic!("{other:?}"),
3029 };
3030 status.await.unwrap();
3031 flush_start_receiver.next().await.unwrap();
3032 });
3033
3034 gst::debug!(
3035 RUNTIME_CAT,
3036 "flush_concurrent_start: awaiting for oob_context"
3037 );
3038 block_on(ready_receiver).unwrap();
3039
3040 gst::debug!(RUNTIME_CAT, "flush_concurrent_start: // start");
3041 match block_on(task.start()) {
3042 Ok(Complete {
3043 origin: Paused,
3044 target: Started,
3045 }) => (),
3046 Ok(Complete {
3047 origin: PausedFlushing,
3048 target: Flushing,
3049 }) => (),
3050 other => panic!("{other:?}"),
3051 }
3052
3053 block_on(flush_start_handle).unwrap();
3054
3055 gst::debug!(RUNTIME_CAT, "flush_concurrent_start: requesting flush_stop");
3056 assert_eq!(
3057 task.flush_stop().block_on().unwrap(),
3058 Complete {
3059 origin: Flushing,
3060 target: Started,
3061 },
3062 );
3063 assert_eq!(task.state(), Started);
3064 block_on(flush_stop_receiver.next());
3065
3066 stop_then_unprepare(task);
3067 }
3068
3069 #[test]
3070 fn start_timer() {
3071 use crate::runtime::timer;
3072
3073 gst::init().unwrap();
3076
3077 struct TaskTimerTest {
3078 obj: gst::Object,
3079 timer: Option<timer::Oneshot>,
3080 timer_elapsed_sender: Option<oneshot::Sender<()>>,
3081 }
3082
3083 impl TaskImpl for TaskTimerTest {
3084 type Item = ();
3085
3086 fn obj(&self) -> &impl IsA<glib::Object> {
3087 &self.obj
3088 }
3089
3090 async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
3091 self.timer = Some(crate::runtime::timer::delay_for(Duration::from_millis(50)));
3092 gst::debug!(RUNTIME_CAT, "start_timer: started");
3093 Ok(())
3094 }
3095
3096 async fn try_next(&mut self) -> Result<(), gst::FlowError> {
3097 gst::debug!(RUNTIME_CAT, "start_timer: awaiting timer");
3098 self.timer.take().unwrap().await;
3099 Ok(())
3100 }
3101
3102 async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
3103 gst::debug!(RUNTIME_CAT, "start_timer: timer elapsed");
3104 if let Some(timer_elapsed_sender) = self.timer_elapsed_sender.take() {
3105 timer_elapsed_sender.send(()).unwrap();
3106 }
3107
3108 Err(gst::FlowError::Eos)
3109 }
3110 }
3111
3112 let context = Context::acquire("start_timer", Duration::from_millis(2)).unwrap();
3113
3114 let task = Task::default();
3115
3116 let (timer_elapsed_sender, timer_elapsed_receiver) = oneshot::channel();
3117 let fut = task.prepare(
3118 TaskTimerTest {
3119 obj: gst::Pad::builder(gst::PadDirection::Unknown)
3120 .name("runtime::Task::start_timer")
3121 .build()
3122 .into(),
3123 timer: None,
3124 timer_elapsed_sender: Some(timer_elapsed_sender),
3125 },
3126 context,
3127 );
3128 drop(fut);
3129
3130 gst::debug!(RUNTIME_CAT, "start_timer: start");
3131 let fut = task.start();
3132 drop(fut);
3133
3134 block_on(timer_elapsed_receiver).unwrap();
3135 gst::debug!(RUNTIME_CAT, "start_timer: timer elapsed received");
3136
3137 stop_then_unprepare(task);
3138 }
3139}