1use futures::channel::mpsc as async_mpsc;
13use futures::channel::oneshot;
14use futures::prelude::*;
15
16use std::fmt;
17use std::ops::Deref;
18use std::pin::{pin, Pin};
19use std::sync::{Arc, Mutex, MutexGuard};
20use std::task::Poll;
21
22use gst::glib;
23use gst::glib::prelude::*;
24
25use super::{Context, JoinHandle, RUNTIME_CAT};
26
/// State of a [`Task`], as stored in its shared inner data.
///
/// The derived `Ord` / `PartialOrd` follow the declaration order of the variants.
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy)]
pub enum TaskState {
    /// Entered when the state machine handles `Trigger::Error`.
    Error,
    /// Entered from `Started` on `Trigger::FlushStart`.
    Flushing,
    /// Entered on `Trigger::Pause`, or on `Trigger::FlushStop` from `PausedFlushing`.
    Paused,
    /// Entered on `Trigger::Pause` from `Flushing` or `Trigger::FlushStart` from `Paused`.
    PausedFlushing,
    /// Entered once the `prepare` action succeeded.
    Prepared,
    /// Set by `Task::prepare` right before the state machine is spawned.
    Preparing,
    /// Entered once the `start` action succeeded.
    Started,
    /// Entered once the `stop` action succeeded.
    Stopped,
    /// Initial state, also entered after `Trigger::Unprepare` was handled.
    Unprepared,
}
39
/// Event pushed to the state machine to request a transition.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Trigger {
    /// Requests a switch to `TaskState::Error`.
    Error,
    /// Requests the `flush_start` action.
    FlushStart,
    /// Requests the `flush_stop` action.
    FlushStop,
    /// Requests the `pause` action.
    Pause,
    /// Requests the `prepare` action; must be the first trigger the state machine receives.
    Prepare,
    /// Requests the `start` action.
    Start,
    /// Requests the `stop` action.
    Stop,
    /// Requests the `unprepare` action, which terminates the state machine.
    Unprepare,
}
51
/// Successful outcome of a transition request.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TransitionOk {
    /// The transition was performed and the state switched from `origin` to `target`.
    Complete {
        origin: TaskState,
        target: TaskState,
    },
    /// The acknowledgement is awaited elsewhere (in a subtask), the caller is not waiting for it.
    NotWaiting { trigger: Trigger, origin: TaskState },
    /// The `trigger` was skipped because it doesn't apply in `state`.
    Skipped { trigger: Trigger, state: TaskState },
}
70
/// Failed outcome of a transition request.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TransitionError {
    /// Trigger which was being handled when the error occurred.
    pub trigger: Trigger,
    /// State associated with the failure.
    pub state: TaskState,
    /// Details of the failure.
    pub err_msg: gst::ErrorMessage,
}
78
79impl fmt::Display for TransitionError {
80 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
81 write!(
82 f,
83 "{:?} from state {:?}: {:?}",
84 self.trigger, self.state, self.err_msg
85 )
86 }
87}
88
// Marker impl: `TransitionError` relies on the default `std::error::Error` methods.
impl std::error::Error for TransitionError {}
90
91impl From<TransitionError> for gst::ErrorMessage {
92 fn from(err: TransitionError) -> Self {
93 err.err_msg
94 }
95}
96
/// Status of a transition request: either immediately available or pending an acknowledgement.
#[must_use = "This `TransitionStatus` may be `Pending`. In most cases it should be awaited. See `await_maybe_on_context`"]
pub enum TransitionStatus {
    /// The result is already known.
    Ready(Result<TransitionOk, TransitionError>),
    /// The transition was requested and its result will be available via `res_fut`.
    Pending {
        /// Trigger which was pushed.
        trigger: Trigger,
        /// State observed when the trigger was pushed.
        origin: TaskState,
        /// Future resolving to the transition result.
        res_fut: Pin<Box<dyn Future<Output = Result<TransitionOk, TransitionError>> + Send>>,
    },
}
113
114impl TransitionStatus {
115 pub fn is_ready(&self) -> bool {
116 matches!(self, TransitionStatus::Ready { .. })
117 }
118
119 pub fn is_pending(&self) -> bool {
120 matches!(self, TransitionStatus::Pending { .. })
121 }
122
123 pub fn check(self) -> Result<TransitionStatus, TransitionError> {
131 match self {
132 TransitionStatus::Ready(Err(err)) => Err(err),
133 other => Ok(other),
134 }
135 }
136
137 pub fn block_on_or_add_subtask<O>(self, obj: &O) -> Result<TransitionOk, TransitionError>
191 where
192 O: IsA<glib::Object> + Send,
193 {
194 use TransitionStatus::*;
195 match self {
196 Pending {
197 trigger,
198 origin,
199 res_fut,
200 } => {
201 if let Some((ctx, task_id)) = Context::current_task() {
202 gst::debug!(
203 RUNTIME_CAT,
204 obj = obj,
205 "Awaiting for {trigger:?} ack in a subtask on context {}",
206 ctx.name()
207 );
208
209 let obj = obj.clone();
210 let _ = ctx.add_sub_task(task_id, async move {
211 let res = res_fut.await;
212 match res {
213 Ok(status) => {
214 gst::log!(
215 RUNTIME_CAT,
216 obj = obj,
217 "Task {trigger:?} success: {status:?}",
218 );
219 }
220 Err(err) => {
221 gst::error!(
222 RUNTIME_CAT,
223 obj = obj,
224 "Task {trigger:?} failure: {err}"
225 );
226 }
227 }
228
229 Ok(())
230 });
231
232 Ok(TransitionOk::NotWaiting { trigger, origin })
233 } else {
234 gst::debug!(
235 RUNTIME_CAT,
236 obj = obj,
237 "Awaiting for {trigger:?} ack on current thread",
238 );
239 let res = futures::executor::block_on(res_fut);
240 match res {
241 Ok(ref status) => {
242 gst::log!(
243 RUNTIME_CAT,
244 obj = obj,
245 "Task {trigger:?} success: {status:?}",
246 );
247 }
248 Err(ref err) => {
249 gst::error!(RUNTIME_CAT, obj = obj, "Task {trigger:?} failure: {err}");
250 }
251 }
252
253 res
254 }
255 }
256 Ready(res) => {
257 match res {
258 Ok(ref status) => {
259 gst::log!(
260 RUNTIME_CAT,
261 obj = obj,
262 "Task transition immediate success: {status:?}",
263 );
264 }
265 Err(ref err) => {
266 gst::error!(
267 RUNTIME_CAT,
268 obj = obj,
269 "Task transition immediate failure: {err}",
270 );
271 }
272 }
273
274 res
275 }
276 }
277 }
278
279 pub fn block_on_or_add_subtask_then<T, F>(
317 self,
318 obj: glib::BorrowedObject<'_, T>,
319 func: F,
320 ) -> Result<(), gst::ErrorMessage>
321 where
322 T: IsA<glib::Object> + Send,
323 F: FnOnce(&T, &Result<TransitionOk, TransitionError>) + Send + 'static,
324 {
325 use TransitionStatus::*;
326 match self {
327 Pending {
328 trigger, res_fut, ..
329 } => {
330 if let Some((ctx, task_id)) = Context::current_task() {
331 gst::debug!(
332 RUNTIME_CAT,
333 obj = obj,
334 "Awaiting for {trigger:?} ack in a subtask on context {}",
335 ctx.name()
336 );
337 let obj = obj.clone();
338 let _ = ctx.add_sub_task(task_id, async move {
339 let res = res_fut.await;
340 match res {
341 Ok(ref status) => {
342 gst::log!(
343 RUNTIME_CAT,
344 obj = obj,
345 "Task {trigger:?} success: {status:?}",
346 );
347 func(&obj, &res);
348 Ok(())
349 }
350 Err(ref err) => {
351 gst::error!(
352 RUNTIME_CAT,
353 obj = obj,
354 "Task {trigger:?} failure: {err}",
355 );
356 func(&obj, &res);
357 Err(gst::FlowError::Error)
358 }
359 }
360 });
361
362 Ok(())
363 } else {
364 gst::debug!(
365 RUNTIME_CAT,
366 obj = obj,
367 "Awaiting for {trigger:?} ack on current thread",
368 );
369 let res = futures::executor::block_on(res_fut);
370 match res {
371 Ok(ref status) => {
372 gst::log!(
373 RUNTIME_CAT,
374 obj = obj,
375 "Task {trigger:?} success: {status:?}",
376 );
377 func(&obj, &res);
378 Ok(())
379 }
380 Err(ref err) => {
381 gst::error!(RUNTIME_CAT, obj = obj, "Task {trigger:?} failure: {err}",);
382 func(&obj, &res);
383 res.map(|_| ()).map_err(|err| err.into())
384 }
385 }
386 }
387 }
388 Ready(res) => match res {
389 Ok(ref status) => {
390 gst::log!(
391 RUNTIME_CAT,
392 obj = obj,
393 "Task transition immediate success: {status:?}",
394 );
395 func(&obj, &res);
396 Ok(())
397 }
398 Err(ref err) => {
399 gst::error!(
400 RUNTIME_CAT,
401 obj = obj,
402 "Task transition immediate failure: {err}",
403 );
404 func(&obj, &res);
405 res.map(|_| ()).map_err(|err| err.into())
406 }
407 },
408 }
409 }
410}
411
412impl Future for TransitionStatus {
413 type Output = Result<TransitionOk, TransitionError>;
414
415 fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
416 use TransitionStatus::*;
417
418 match &mut *self {
419 Ready(res) => Poll::Ready(res.clone()),
420 Pending { res_fut, .. } => match Pin::new(res_fut).poll(cx) {
421 Poll::Pending => Poll::Pending,
422 Poll::Ready(res) => {
423 *self = Ready(res.clone());
424
425 Poll::Ready(res)
426 }
427 },
428 }
429 }
430}
431
432impl From<TransitionOk> for TransitionStatus {
433 fn from(ok: TransitionOk) -> Self {
434 Self::Ready(Ok(ok))
435 }
436}
437
438impl From<TransitionError> for TransitionStatus {
439 fn from(err: TransitionError) -> Self {
440 Self::Ready(Err(err))
441 }
442}
443
444impl fmt::Debug for TransitionStatus {
446 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
447 use TransitionStatus::*;
448 match self {
449 Ready(res) => f.debug_tuple("Ready").field(res).finish(),
450 Pending {
451 trigger, origin, ..
452 } => f
453 .debug_struct("Pending")
454 .field("trigger", trigger)
455 .field("origin", origin)
456 .finish(),
457 }
458 }
459}
460
461pub trait TaskImpl: Send + 'static {
465 type Item: Send + 'static;
466
467 fn obj(&self) -> &impl IsA<glib::Object>;
468
469 fn prepare(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
470 future::ok(())
471 }
472
473 fn unprepare(&mut self) -> impl Future<Output = ()> + Send {
474 future::ready(())
475 }
476
477 fn start(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
478 future::ok(())
479 }
480
481 fn try_next(&mut self) -> impl Future<Output = Result<Self::Item, gst::FlowError>> + Send;
494
495 fn handle_item(
506 &mut self,
507 _item: Self::Item,
508 ) -> impl Future<Output = Result<(), gst::FlowError>> + Send;
509
510 fn pause(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
511 future::ok(())
512 }
513
514 fn flush_start(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
515 future::ready(Ok(()))
516 }
517
518 fn flush_stop(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
519 future::ready(Ok(()))
520 }
521
522 fn stop(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
523 future::ready(Ok(()))
524 }
525
526 fn handle_loop_error(&mut self, err: gst::FlowError) -> impl Future<Output = Trigger> + Send {
541 async move {
542 match err {
543 gst::FlowError::Flushing => {
544 gst::debug!(
545 RUNTIME_CAT,
546 obj = self.obj(),
547 "Task loop returned Flushing. Posting FlushStart"
548 );
549 Trigger::FlushStart
550 }
551 gst::FlowError::Eos => {
552 gst::debug!(
553 RUNTIME_CAT,
554 obj = self.obj(),
555 "Task loop returned Eos. Posting Stop"
556 );
557 Trigger::Stop
558 }
559 other => {
560 gst::error!(
561 RUNTIME_CAT,
562 obj = self.obj(),
563 "Task loop returned {other:?}. Posting Error",
564 );
565 Trigger::Error
566 }
567 }
568 }
569 }
570
571 fn handle_action_error(
582 &mut self,
583 trigger: Trigger,
584 state: TaskState,
585 err: gst::ErrorMessage,
586 ) -> impl Future<Output = Trigger> + Send {
587 async move {
588 gst::error!(
589 RUNTIME_CAT,
590 obj = self.obj(),
591 "TaskImpl transition action error during {trigger:?} from {state:?}: {err:?}. Posting Trigger::Error",
592 );
593
594 Trigger::Error
595 }
596 }
597}
598
// Sending end of the one-shot channel carrying the result of a transition request.
type AckSender = oneshot::Sender<Result<TransitionOk, TransitionError>>;
// Receiving end of the one-shot channel carrying the result of a transition request.
type AckReceiver = oneshot::Receiver<Result<TransitionOk, TransitionError>>;
601
/// A transition request along with the channel used to acknowledge it.
struct TriggeringEvent {
    /// Requested trigger.
    trigger: Trigger,
    /// Channel on which the transition result is sent back to the requester.
    ack_tx: AckSender,
}
606
607impl TriggeringEvent {
608 fn new(trigger: Trigger) -> (Self, AckReceiver) {
609 let (ack_tx, ack_rx) = oneshot::channel();
610 let req = TriggeringEvent { trigger, ack_tx };
611
612 (req, ack_rx)
613 }
614
615 fn send_ack(self, res: Result<TransitionOk, TransitionError>) {
616 let _ = self.ack_tx.send(res);
617 }
618
619 fn send_err_ack(self) {
620 let res = Err(TransitionError {
621 trigger: self.trigger,
622 state: TaskState::Error,
623 err_msg: gst::error_msg!(
624 gst::CoreError::StateChange,
625 [
626 "Triggering Event {:?} rejected due to a previous unrecoverable error",
627 self.trigger,
628 ]
629 ),
630 });
631
632 self.send_ack(res);
633 }
634}
635
636impl fmt::Debug for TriggeringEvent {
637 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
638 f.debug_struct("TriggeringEvent")
639 .field("trigger", &self.trigger)
640 .finish()
641 }
642}
643
/// Handle on a spawned state machine.
#[derive(Debug)]
struct StateMachineHandle {
    /// Handle used to await the termination of the state machine.
    join_handle: JoinHandle<()>,
    /// Channel used to push triggering events to the state machine.
    triggering_evt_tx: async_mpsc::Sender<TriggeringEvent>,
    /// `Context` on which the state machine was spawned.
    context: Context,
}
650
651impl StateMachineHandle {
652 fn trigger(&mut self, trigger: Trigger) -> AckReceiver {
653 let (triggering_evt, ack_rx) = TriggeringEvent::new(trigger);
654
655 gst::log!(RUNTIME_CAT, "Pushing {triggering_evt:?}");
656 self.triggering_evt_tx.try_send(triggering_evt).unwrap();
657
658 self.context.unpark();
659
660 ack_rx
661 }
662
663 async fn join(self) {
664 self.join_handle
665 .await
666 .expect("state machine shouldn't have been cancelled");
667 }
668}
669
/// Data shared between the `Task` handles and the state machine.
#[derive(Debug)]
struct TaskInner {
    /// Current state of the task.
    state: TaskState,
    /// Handle on the state machine; `None` when no state machine is attached.
    state_machine_handle: Option<StateMachineHandle>,
}
675
676impl Default for TaskInner {
677 fn default() -> Self {
678 TaskInner {
679 state: TaskState::Unprepared,
680 state_machine_handle: None,
681 }
682 }
683}
684
685impl TaskInner {
686 fn switch_to_state(&mut self, target_state: TaskState, triggering_evt: TriggeringEvent) {
687 let res = Ok(TransitionOk::Complete {
688 origin: self.state,
689 target: target_state,
690 });
691
692 self.state = target_state;
693 triggering_evt.send_ack(res);
694 }
695
696 fn switch_to_err(&mut self, triggering_evt: TriggeringEvent) {
697 let res = Err(TransitionError {
698 trigger: triggering_evt.trigger,
699 state: self.state,
700 err_msg: gst::error_msg!(
701 gst::CoreError::StateChange,
702 [
703 "Unrecoverable error for {triggering_evt:?} from state {:?}",
704 self.state,
705 ]
706 ),
707 });
708
709 self.state = TaskState::Error;
710 triggering_evt.send_ack(res);
711 }
712
713 fn skip_triggering_evt(&mut self, triggering_evt: TriggeringEvent) {
714 let res = Ok(TransitionOk::Skipped {
715 trigger: triggering_evt.trigger,
716 state: self.state,
717 });
718
719 triggering_evt.send_ack(res);
720 }
721
722 fn trigger(&mut self, trigger: Trigger) -> Result<AckReceiver, TransitionError> {
723 self.state_machine_handle
724 .as_mut()
725 .map(|state_machine| state_machine.trigger(trigger))
726 .ok_or_else(|| {
727 gst::warning!(RUNTIME_CAT, "Unable to send {trigger:?}: no state machine",);
728 TransitionError {
729 trigger,
730 state: TaskState::Unprepared,
731 err_msg: gst::error_msg!(
732 gst::ResourceError::NotFound,
733 ["Unable to send {trigger:?}: no state machine"]
734 ),
735 }
736 })
737 }
738}
739
740impl Drop for TaskInner {
741 fn drop(&mut self) {
742 if self.state != TaskState::Unprepared {
743 gst::fixme!(RUNTIME_CAT, "Missing call to `Task::unprepare`");
747 }
748 }
749}
750
/// Guard holding the lock on a `Task`'s inner data; dereferences to its `TaskState`.
pub struct TaskStateGuard<'guard>(MutexGuard<'guard, TaskInner>);
753
754impl Deref for TaskStateGuard<'_> {
755 type Target = TaskState;
756
757 fn deref(&self) -> &Self::Target {
758 &(self.0).state
759 }
760}
761
/// Cloneable handle on a task; all clones share the same inner state.
#[derive(Debug, Clone)]
pub struct Task(Arc<Mutex<TaskInner>>);
767
768impl Default for Task {
769 fn default() -> Self {
770 Task(Arc::new(Mutex::new(TaskInner::default())))
771 }
772}
773
774impl Task {
775 pub fn state(&self) -> TaskState {
776 self.0.lock().unwrap().state
777 }
778
779 pub fn lock_state(&self) -> TaskStateGuard<'_> {
780 TaskStateGuard(self.0.lock().unwrap())
781 }
782
783 pub fn prepare(&self, task_impl: impl TaskImpl, context: Context) -> TransitionStatus {
784 let mut inner = self.0.lock().unwrap();
785
786 let origin = inner.state;
787 match origin {
788 TaskState::Unprepared => (),
789 TaskState::Prepared | TaskState::Preparing => {
790 gst::debug!(
791 RUNTIME_CAT,
792 obj = task_impl.obj(),
793 "Task already {origin:?}",
794 );
795 return TransitionOk::Skipped {
796 trigger: Trigger::Prepare,
797 state: origin,
798 }
799 .into();
800 }
801 state => {
802 gst::warning!(
803 RUNTIME_CAT,
804 obj = task_impl.obj(),
805 "Attempt to prepare Task in state {state:?}"
806 );
807 return TransitionError {
808 trigger: Trigger::Prepare,
809 state: inner.state,
810 err_msg: gst::error_msg!(
811 gst::CoreError::StateChange,
812 ["Attempt to prepare Task in state {state:?}"]
813 ),
814 }
815 .into();
816 }
817 }
818
819 assert!(inner.state_machine_handle.is_none());
820
821 inner.state = TaskState::Preparing;
822
823 gst::log!(
824 RUNTIME_CAT,
825 obj = task_impl.obj(),
826 "Spawning task state machine"
827 );
828 inner.state_machine_handle = Some(StateMachine::spawn(self.0.clone(), task_impl, context));
829
830 let ack_rx = match inner.trigger(Trigger::Prepare) {
831 Ok(ack_rx) => ack_rx,
832 Err(err) => return err.into(),
833 };
834 drop(inner);
835
836 TransitionStatus::Pending {
837 trigger: Trigger::Prepare,
838 origin: TaskState::Unprepared,
839 res_fut: Box::pin(ack_rx.map(Result::unwrap)),
840 }
841 }
842
843 pub fn unprepare(&self) -> TransitionStatus {
844 let mut inner = self.0.lock().unwrap();
845
846 let origin = inner.state;
847 let mut state_machine_handle = match origin {
848 TaskState::Stopped
849 | TaskState::Error
850 | TaskState::Prepared
851 | TaskState::Preparing
852 | TaskState::Unprepared => match inner.state_machine_handle.take() {
853 Some(state_machine_handle) => {
854 gst::debug!(RUNTIME_CAT, "Unpreparing task");
855
856 state_machine_handle
857 }
858 None => {
859 gst::debug!(RUNTIME_CAT, "Task already unpreparing");
860 return TransitionOk::Skipped {
861 trigger: Trigger::Unprepare,
862 state: origin,
863 }
864 .into();
865 }
866 },
867 state => {
868 gst::warning!(RUNTIME_CAT, "Attempt to unprepare Task in state {state:?}");
869 return TransitionError {
870 trigger: Trigger::Unprepare,
871 state: inner.state,
872 err_msg: gst::error_msg!(
873 gst::CoreError::StateChange,
874 ["Attempt to unprepare Task in state {state:?}"]
875 ),
876 }
877 .into();
878 }
879 };
880
881 let ack_rx = state_machine_handle.trigger(Trigger::Unprepare);
882 drop(inner);
883
884 let state_machine_end_fut = async {
885 state_machine_handle.join().await;
886 ack_rx.await.unwrap()
887 };
888
889 TransitionStatus::Pending {
890 trigger: Trigger::Unprepare,
891 origin,
892 res_fut: Box::pin(state_machine_end_fut),
893 }
894 }
895
896 pub fn start(&self) -> TransitionStatus {
900 let mut inner = self.0.lock().unwrap();
901
902 if let TaskState::Started = inner.state {
903 return TransitionOk::Skipped {
904 trigger: Trigger::Start,
905 state: TaskState::Started,
906 }
907 .into();
908 }
909
910 let ack_rx = match inner.trigger(Trigger::Start) {
911 Ok(ack_rx) => ack_rx,
912 Err(err) => return err.into(),
913 };
914
915 let origin = inner.state;
916 drop(inner);
917
918 TransitionStatus::Pending {
919 trigger: Trigger::Start,
920 origin,
921 res_fut: Box::pin(ack_rx.map(Result::unwrap)),
922 }
923 }
924
    /// Requests `Trigger::Pause`.
    ///
    /// Returns an error if no state machine is attached, a `Pending` status otherwise.
    pub fn pause(&self) -> TransitionStatus {
        self.push_pending(Trigger::Pause)
    }
932
    /// Requests `Trigger::FlushStart`.
    ///
    /// Returns an error if no state machine is attached, a `Pending` status otherwise.
    pub fn flush_start(&self) -> TransitionStatus {
        self.push_pending(Trigger::FlushStart)
    }
936
    /// Requests `Trigger::FlushStop`.
    ///
    /// Returns an error if no state machine is attached, a `Pending` status otherwise.
    pub fn flush_stop(&self) -> TransitionStatus {
        self.push_pending(Trigger::FlushStop)
    }
940
    /// Requests `Trigger::Stop`.
    ///
    /// Returns an error if no state machine is attached, a `Pending` status otherwise.
    pub fn stop(&self) -> TransitionStatus {
        self.push_pending(Trigger::Stop)
    }
945
946 fn push_pending(&self, trigger: Trigger) -> TransitionStatus {
948 let mut inner = self.0.lock().unwrap();
949
950 let ack_rx = match inner.trigger(trigger) {
951 Ok(ack_rx) => ack_rx,
952 Err(err) => return err.into(),
953 };
954
955 let origin = inner.state;
956 drop(inner);
957
958 TransitionStatus::Pending {
959 trigger,
960 origin,
961 res_fut: Box::pin(ack_rx.map(Result::unwrap)),
962 }
963 }
964}
965
/// State machine executing the `TaskImpl` actions in response to triggering events.
///
/// Note: the generic parameter is named `Task`, which shadows the `Task` struct
/// within this definition.
struct StateMachine<Task: TaskImpl> {
    /// User implementation of the actions.
    task_impl: Task,
    /// Channel on which the triggering events are received.
    triggering_evt_rx: async_mpsc::Receiver<TriggeringEvent>,
    /// Triggering event to be handled before reading the channel again.
    pending_triggering_evt: Option<TriggeringEvent>,
}
971
// Executes the `TaskImpl` action `$action` and awaits its completion.
//
// Evaluates to `Ok($triggering_evt)` if the action succeeded. Otherwise,
// `TaskImpl::handle_action_error` is awaited to select the next trigger, which
// replaces the trigger of `$triggering_evt`. The event is then stored as the
// pending triggering event so the state machine handles it next, and the macro
// evaluates to `Err(())`.
//
// `$triggering_evt` must be a mutable place since its trigger may be reassigned.
// `$task_inner` is accepted but not used by the expansion.
macro_rules! exec_action {
    ($self:ident, $action:ident, $triggering_evt:expr, $origin:expr, $task_inner:expr) => {{
        match $self.task_impl.$action().await {
            Ok(()) => Ok($triggering_evt),
            Err(err) => {
                // Let the implementation decide how to proceed.
                let next_trigger = $self
                    .task_impl
                    .handle_action_error($triggering_evt.trigger, $origin, err)
                    .await;

                gst::trace!(
                    RUNTIME_CAT,
                    obj = $self.task_impl.obj(),
                    "TaskImpl transition action error: converting {:?} to {next_trigger:?}",
                    $triggering_evt.trigger,
                );

                // Keep the same event (and thus the same ack channel) with the new trigger.
                $triggering_evt.trigger = next_trigger;
                $self.pending_triggering_evt = Some($triggering_evt);

                Err(())
            }
        }
    }};
}
1001
1002impl<Task: TaskImpl> StateMachine<Task> {
1003 fn spawn(
1004 task_inner: Arc<Mutex<TaskInner>>,
1005 task_impl: Task,
1006 context: Context,
1007 ) -> StateMachineHandle {
1008 let (triggering_evt_tx, triggering_evt_rx) = async_mpsc::channel(4);
1009
1010 let state_machine = StateMachine {
1011 task_impl,
1012 triggering_evt_rx,
1013 pending_triggering_evt: None,
1014 };
1015
1016 StateMachineHandle {
1017 join_handle: context.spawn_and_unpark(state_machine.run(task_inner)),
1018 triggering_evt_tx,
1019 context,
1020 }
1021 }
1022
    /// Runs the state machine until `Trigger::Unprepare` is handled.
    ///
    /// The first triggering event must be `Trigger::Prepare`. Subsequent events are
    /// taken from `pending_triggering_evt` when set, from the channel otherwise.
    /// Each event is acknowledged with the result of the transition, unless the
    /// associated action fails: in that case, the event is re-queued with the
    /// trigger selected by `TaskImpl::handle_action_error` (see `exec_action!`).
    ///
    /// # Panics
    ///
    /// Panics if the triggering events channel is closed, if the first trigger is
    /// not `Trigger::Prepare` or if `Trigger::Prepare` is received again later.
    async fn run(mut self, task_inner: Arc<Mutex<TaskInner>>) {
        let mut triggering_evt = self
            .triggering_evt_rx
            .next()
            .await
            .expect("triggering_evt_rx dropped");

        if let Trigger::Prepare = triggering_evt.trigger {
            gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Preparing task");

            let res = exec_action!(
                self,
                prepare,
                triggering_evt,
                TaskState::Unprepared,
                &task_inner
            );
            if let Ok(triggering_evt) = res {
                let mut task_inner = task_inner.lock().unwrap();
                // The origin is reported as `Unprepared` even though the stored
                // state was set to `Preparing` by `Task::prepare`.
                let res = Ok(TransitionOk::Complete {
                    origin: TaskState::Unprepared,
                    target: TaskState::Prepared,
                });

                task_inner.state = TaskState::Prepared;
                triggering_evt.send_ack(res);

                gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Task Prepared");
            }
        } else {
            panic!("Unexpected initial trigger {:?}", triggering_evt.trigger);
        }

        loop {
            // Events re-queued by an action error or by the task loop take precedence
            // over new events from the channel.
            triggering_evt = match self.pending_triggering_evt.take() {
                Some(pending_triggering_evt) => pending_triggering_evt,
                None => self
                    .triggering_evt_rx
                    .next()
                    .await
                    .expect("triggering_evt_rx dropped"),
            };
            gst::trace!(
                RUNTIME_CAT,
                obj = self.task_impl.obj(),
                "State machine popped {triggering_evt:?}"
            );

            match triggering_evt.trigger {
                Trigger::Error => {
                    let mut task_inner = task_inner.lock().unwrap();
                    task_inner.switch_to_err(triggering_evt);
                    gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Switched to Error");
                }
                Trigger::Start => {
                    // The lock is scoped so that it is released before awaiting `start`.
                    let origin = {
                        let mut task_inner = task_inner.lock().unwrap();
                        let origin = task_inner.state;
                        match origin {
                            TaskState::Stopped | TaskState::Paused | TaskState::Prepared => (),
                            TaskState::PausedFlushing => {
                                // Still flushing: no action is executed, only the state changes.
                                task_inner.switch_to_state(TaskState::Flushing, triggering_evt);
                                gst::trace!(
                                    RUNTIME_CAT,
                                    obj = self.task_impl.obj(),
                                    "Switched from PausedFlushing to Flushing"
                                );
                                continue;
                            }
                            TaskState::Error => {
                                triggering_evt.send_err_ack();
                                continue;
                            }
                            state => {
                                task_inner.skip_triggering_evt(triggering_evt);
                                gst::trace!(
                                    RUNTIME_CAT,
                                    obj = self.task_impl.obj(),
                                    "Skipped Start in state {state:?}"
                                );
                                continue;
                            }
                        }

                        origin
                    };

                    // Returns when the task loop ends, possibly with a pending event set.
                    self.start(triggering_evt, origin, &task_inner).await;
                }
                Trigger::Pause => {
                    let (origin, target) = {
                        let mut task_inner = task_inner.lock().unwrap();
                        let origin = task_inner.state;
                        match origin {
                            TaskState::Started | TaskState::Stopped | TaskState::Prepared => {
                                (origin, TaskState::Paused)
                            }
                            TaskState::Flushing => (origin, TaskState::PausedFlushing),
                            // The action is still executed, but the state remains `Error`.
                            TaskState::Error => (TaskState::Error, TaskState::Error),
                            state => {
                                task_inner.skip_triggering_evt(triggering_evt);
                                gst::trace!(
                                    RUNTIME_CAT,
                                    obj = self.task_impl.obj(),
                                    "Skipped Pause in state {state:?}"
                                );
                                continue;
                            }
                        }
                    };

                    let res = exec_action!(self, pause, triggering_evt, origin, &task_inner);
                    if let Ok(triggering_evt) = res {
                        task_inner
                            .lock()
                            .unwrap()
                            .switch_to_state(target, triggering_evt);
                        gst::trace!(
                            RUNTIME_CAT,
                            obj = self.task_impl.obj(),
                            "Task loop {target:?}"
                        );
                    }
                }
                Trigger::Stop => {
                    let (origin, target) = {
                        let mut task_inner = task_inner.lock().unwrap();
                        let origin = task_inner.state;
                        match origin {
                            TaskState::Started
                            | TaskState::Paused
                            | TaskState::PausedFlushing
                            | TaskState::Flushing => (origin, TaskState::Stopped),
                            // The action is still executed, but the state remains `Error`.
                            TaskState::Error => (TaskState::Error, TaskState::Error),
                            state => {
                                task_inner.skip_triggering_evt(triggering_evt);
                                gst::trace!(
                                    RUNTIME_CAT,
                                    obj = self.task_impl.obj(),
                                    "Skipped Stop in state {state:?}"
                                );
                                continue;
                            }
                        }
                    };

                    let res = exec_action!(self, stop, triggering_evt, origin, &task_inner);
                    if let Ok(triggering_evt) = res {
                        task_inner
                            .lock()
                            .unwrap()
                            .switch_to_state(target, triggering_evt);
                        gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Task {target:?}");
                    }
                }
                Trigger::FlushStart => {
                    let (origin, target) = {
                        let mut task_inner = task_inner.lock().unwrap();
                        let origin = task_inner.state;
                        match origin {
                            TaskState::Started => (origin, TaskState::Flushing),
                            TaskState::Paused => (origin, TaskState::PausedFlushing),
                            // The action is still executed, but the state remains `Error`.
                            TaskState::Error => (TaskState::Error, TaskState::Error),
                            state => {
                                task_inner.skip_triggering_evt(triggering_evt);
                                gst::trace!(
                                    RUNTIME_CAT,
                                    obj = self.task_impl.obj(),
                                    "Skipped FlushStart in state {state:?}"
                                );
                                continue;
                            }
                        }
                    };

                    let res = exec_action!(self, flush_start, triggering_evt, origin, &task_inner);
                    if let Ok(triggering_evt) = res {
                        task_inner
                            .lock()
                            .unwrap()
                            .switch_to_state(target, triggering_evt);
                        gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Task {target:?}");
                    }
                }
                Trigger::FlushStop => {
                    // The lock is only held for reading the state here.
                    let origin = task_inner.lock().unwrap().state;
                    let is_paused = match origin {
                        TaskState::Flushing => false,
                        TaskState::PausedFlushing => true,
                        TaskState::Error => {
                            triggering_evt.send_err_ack();
                            continue;
                        }
                        state => {
                            task_inner
                                .lock()
                                .unwrap()
                                .skip_triggering_evt(triggering_evt);
                            gst::trace!(
                                RUNTIME_CAT,
                                obj = self.task_impl.obj(),
                                "Skipped FlushStop in state {state:?}"
                            );
                            continue;
                        }
                    };

                    let res = exec_action!(self, flush_stop, triggering_evt, origin, &task_inner);
                    if let Ok(triggering_evt) = res {
                        if is_paused {
                            task_inner
                                .lock()
                                .unwrap()
                                .switch_to_state(TaskState::Paused, triggering_evt);
                            gst::trace!(
                                RUNTIME_CAT,
                                obj = self.task_impl.obj(),
                                "Switched from PausedFlushing to Paused"
                            );
                        } else {
                            // Was `Flushing`: run the start action and re-enter the task loop.
                            self.start(triggering_evt, origin, &task_inner).await;
                        }
                    }
                }
                Trigger::Unprepare => {
                    // Executed regardless of the current state.
                    self.task_impl.unprepare().await;

                    task_inner
                        .lock()
                        .unwrap()
                        .switch_to_state(TaskState::Unprepared, triggering_evt);

                    break;
                }
                // Only `Trigger::Prepare` remains: it is expected as the initial trigger only.
                _ => unreachable!("State machine handler {:?}", triggering_evt),
            }
        }

        gst::trace!(
            RUNTIME_CAT,
            obj = self.task_impl.obj(),
            "Task state machine terminated"
        );
    }
1270
1271 async fn start(
1272 &mut self,
1273 mut triggering_evt: TriggeringEvent,
1274 origin: TaskState,
1275 task_inner: &Arc<Mutex<TaskInner>>,
1276 ) {
1277 match exec_action!(self, start, triggering_evt, origin, &task_inner) {
1278 Ok(triggering_evt) => {
1279 let mut task_inner = task_inner.lock().unwrap();
1280 task_inner.switch_to_state(TaskState::Started, triggering_evt);
1281 }
1282 Err(_) => {
1283 return;
1285 }
1286 }
1287
1288 match self.run_loop().await {
1289 Ok(()) => (),
1290 Err(err) => {
1291 let next_trigger = self.task_impl.handle_loop_error(err).await;
1292 let (triggering_evt, _) = TriggeringEvent::new(next_trigger);
1293 self.pending_triggering_evt = Some(triggering_evt);
1294 }
1295 }
1296 }
1297
    /// Runs the task loop: repeatedly awaits `try_next` then `handle_item`.
    ///
    /// Returns `Ok(())` when a triggering event is received while waiting for the
    /// next item; the event is stored in `pending_triggering_evt` for the state
    /// machine to handle. Returns the error from `try_next` or `handle_item`
    /// if one of them fails.
    ///
    /// # Panics
    ///
    /// Panics if the triggering events channel is closed.
    async fn run_loop(&mut self) -> Result<(), gst::FlowError> {
        gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Task loop started");

        let mut try_next_res;
        loop {
            // The block limits the lifetime of `try_next_fut`, which borrows
            // `self.task_impl` mutably.
            try_next_res = {
                let mut try_next_fut = pin!(self.task_impl.try_next().fuse());
                // `select_biased!` polls the branches in declaration order: triggering
                // events take precedence over new items.
                futures::select_biased! {
                    triggering_evt = self.triggering_evt_rx.next() => {
                        let triggering_evt = triggering_evt.expect("broken state machine channel");
                        gst::trace!(
                            RUNTIME_CAT,
                            "Task loop handing {:?} to state machine",
                            triggering_evt,
                        );
                        // Returning here drops the in-flight `try_next` future.
                        self.pending_triggering_evt = Some(triggering_evt);
                        return Ok(());
                    }
                    try_next_res = try_next_fut => try_next_res,
                }
            };

            let item = try_next_res.inspect_err(|err| {
                gst::debug!(
                    RUNTIME_CAT,
                    obj = self.task_impl.obj(),
                    "TaskImpl::try_next returned {err:?}"
                );
            })?;

            // `handle_item` is awaited to completion: triggering events are only
            // checked again on the next iteration.
            self.task_impl.handle_item(item).await.inspect_err(|&err| {
                gst::debug!(
                    RUNTIME_CAT,
                    obj = self.task_impl.obj(),
                    "TaskImpl::handle_item returned {err:?}"
                );
            })?;
        }
    }
1343}
1344
1345#[cfg(test)]
1346mod tests {
1347 use futures::channel::{mpsc, oneshot};
1348 use futures::executor::block_on;
1349 use futures::prelude::*;
1350 use gst::glib;
1351 use gst::glib::prelude::*;
1352 use std::future::pending;
1353 use std::time::Duration;
1354
1355 use super::{
1356 Task, TaskImpl,
1357 TaskState::{self, *},
1358 TransitionError, TransitionOk,
1359 TransitionOk::*,
1360 TransitionStatus,
1361 TransitionStatus::*,
1362 Trigger::{self, *},
1363 };
1364 use crate::runtime::{Context, RUNTIME_CAT};
1365
1366 impl TransitionStatus {
1367 fn block_on(self) -> Result<TransitionOk, TransitionError> {
1370 assert!(!Context::is_context_thread());
1371 match self {
1372 Pending {
1373 trigger, res_fut, ..
1374 } => {
1375 gst::debug!(
1376 RUNTIME_CAT,
1377 "Awaiting for {:?} ack on current thread",
1378 trigger,
1379 );
1380 futures::executor::block_on(res_fut)
1381 }
1382 Ready(res) => res,
1383 }
1384 }
1385 }
1386
    /// Test helper: stops then unprepares `task`, blocking on each transition and
    /// panicking if either of them fails.
    #[track_caller]
    fn stop_then_unprepare(task: Task) {
        task.stop().block_on().unwrap();
        task.unprepare().block_on().unwrap();
    }
1392
1393 #[test]
1394 fn nominal() {
1395 gst::init().unwrap();
1396
1397 struct TaskTest {
1398 obj: gst::Object,
1399 prepared_sender: mpsc::Sender<()>,
1400 started_sender: mpsc::Sender<()>,
1401 try_next_ready_sender: mpsc::Sender<()>,
1402 try_next_receiver: mpsc::Receiver<()>,
1403 handle_item_ready_sender: mpsc::Sender<()>,
1404 handle_item_sender: mpsc::Sender<()>,
1405 paused_sender: mpsc::Sender<()>,
1406 stopped_sender: mpsc::Sender<()>,
1407 unprepared_sender: mpsc::Sender<()>,
1408 }
1409
1410 impl TaskImpl for TaskTest {
1411 type Item = ();
1412
1413 fn obj(&self) -> &impl IsA<glib::Object> {
1414 &self.obj
1415 }
1416
1417 async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> {
1418 gst::debug!(RUNTIME_CAT, "nominal: prepared");
1419 self.prepared_sender.send(()).await.unwrap();
1420 Ok(())
1421 }
1422
1423 async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
1424 gst::debug!(RUNTIME_CAT, "nominal: started");
1425 self.started_sender.send(()).await.unwrap();
1426 Ok(())
1427 }
1428
1429 async fn try_next(&mut self) -> Result<(), gst::FlowError> {
1430 gst::debug!(RUNTIME_CAT, "nominal: entering try_next");
1431 self.try_next_ready_sender.send(()).await.unwrap();
1432 gst::debug!(RUNTIME_CAT, "nominal: awaiting try_next");
1433 self.try_next_receiver.next().await.unwrap();
1434 Ok(())
1435 }
1436
1437 async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
1438 gst::debug!(RUNTIME_CAT, "nominal: entering handle_item");
1439 self.handle_item_ready_sender.send(()).await.unwrap();
1440
1441 gst::debug!(RUNTIME_CAT, "nominal: locked in handle_item");
1442 self.handle_item_sender.send(()).await.unwrap();
1443 gst::debug!(RUNTIME_CAT, "nominal: leaving handle_item");
1444
1445 Ok(())
1446 }
1447
1448 async fn pause(&mut self) -> Result<(), gst::ErrorMessage> {
1449 gst::debug!(RUNTIME_CAT, "nominal: paused");
1450 self.paused_sender.send(()).await.unwrap();
1451 Ok(())
1452 }
1453
1454 async fn stop(&mut self) -> Result<(), gst::ErrorMessage> {
1455 gst::debug!(RUNTIME_CAT, "nominal: stopped");
1456 self.stopped_sender.send(()).await.unwrap();
1457 Ok(())
1458 }
1459
1460 async fn unprepare(&mut self) {
1461 gst::debug!(RUNTIME_CAT, "nominal: unprepared");
1462 self.unprepared_sender.send(()).await.unwrap();
1463 }
1464 }
1465
1466 let context = Context::acquire("nominal", Duration::from_millis(2)).unwrap();
1467
1468 let task = Task::default();
1469
1470 assert_eq!(task.state(), Unprepared);
1471
1472 gst::debug!(RUNTIME_CAT, "nominal: preparing");
1473
1474 let (prepared_sender, mut prepared_receiver) = mpsc::channel(1);
1475 let (started_sender, mut started_receiver) = mpsc::channel(1);
1476 let (try_next_ready_sender, mut try_next_ready_receiver) = mpsc::channel(1);
1477 let (mut try_next_sender, try_next_receiver) = mpsc::channel(1);
1478 let (handle_item_ready_sender, mut handle_item_ready_receiver) = mpsc::channel(1);
1479 let (handle_item_sender, mut handle_item_receiver) = mpsc::channel(0);
1480 let (paused_sender, mut paused_receiver) = mpsc::channel(1);
1481 let (stopped_sender, mut stopped_receiver) = mpsc::channel(1);
1482 let (unprepared_sender, mut unprepared_receiver) = mpsc::channel(1);
1483 let obj = gst::Pad::builder(gst::PadDirection::Unknown)
1484 .name("runtime::Task::nominal")
1485 .build();
1486 let prepare_status = task.prepare(
1487 TaskTest {
1488 obj: obj.clone().into(),
1489 prepared_sender,
1490 started_sender,
1491 try_next_ready_sender,
1492 try_next_receiver,
1493 handle_item_ready_sender,
1494 handle_item_sender,
1495 paused_sender,
1496 stopped_sender,
1497 unprepared_sender,
1498 },
1499 context,
1500 );
1501
1502 assert!(prepare_status.is_pending());
1503 match prepare_status {
1504 Pending {
1505 trigger: Prepare,
1506 origin: Unprepared,
1507 ..
1508 } => (),
1509 other => panic!("{other:?}"),
1510 };
1511
1512 gst::debug!(RUNTIME_CAT, "nominal: starting (async prepare)");
1513 let start_status = task.start().check().unwrap();
1514
1515 block_on(prepared_receiver.next()).unwrap();
1516 assert_eq!(
1518 prepare_status.block_on_or_add_subtask(&obj).unwrap(),
1519 Complete {
1520 origin: Unprepared,
1521 target: Prepared,
1522 },
1523 );
1524
1525 block_on(started_receiver.next()).unwrap();
1526 assert_eq!(
1527 start_status.block_on().unwrap(),
1528 Complete {
1529 origin: Prepared,
1530 target: Started,
1531 }
1532 );
1533 assert_eq!(task.state(), Started);
1534
1535 block_on(try_next_ready_receiver.next()).unwrap();
1537 block_on(try_next_sender.send(())).unwrap();
1538 block_on(handle_item_ready_receiver.next()).unwrap();
1539 block_on(handle_item_receiver.next()).unwrap();
1540
1541 gst::debug!(RUNTIME_CAT, "nominal: starting (redundant)");
1542 assert_eq!(
1544 task.start().block_on().unwrap(),
1545 Skipped {
1546 trigger: Start,
1547 state: Started,
1548 },
1549 );
1550 assert_eq!(task.state(), Started);
1551
1552 match task.unprepare().check().unwrap_err() {
1554 TransitionError {
1555 trigger: Unprepare,
1556 state: Started,
1557 ..
1558 } => (),
1559 other => panic!("{other:?}"),
1560 }
1561
1562 gst::debug!(RUNTIME_CAT, "nominal: pause cancelling try_next");
1563 block_on(try_next_ready_receiver.next()).unwrap();
1564
1565 let pause_status = task.pause().check().unwrap();
1566 gst::debug!(RUNTIME_CAT, "nominal: awaiting pause ack");
1567 block_on(paused_receiver.next()).unwrap();
1568 assert_eq!(
1569 pause_status.block_on().unwrap(),
1570 Complete {
1571 origin: Started,
1572 target: Paused,
1573 },
1574 );
1575
1576 assert!(handle_item_ready_receiver.try_next().is_err());
1578 assert!(try_next_ready_receiver.try_next().is_err());
1580
1581 gst::debug!(
1582 RUNTIME_CAT,
1583 "nominal: starting (after pause cancelling try_next)"
1584 );
1585 let start_receiver = task.start().check().unwrap();
1586 block_on(started_receiver.next()).unwrap();
1587 assert_eq!(
1588 start_receiver.block_on().unwrap(),
1589 Complete {
1590 origin: Paused,
1591 target: Started,
1592 },
1593 );
1594 assert_eq!(task.state(), Started);
1595
1596 gst::debug!(RUNTIME_CAT, "nominal: pause // handle_item");
1597 block_on(try_next_ready_receiver.next()).unwrap();
1598 block_on(try_next_sender.send(())).unwrap();
1599 block_on(handle_item_ready_receiver.next()).unwrap();
1601
1602 gst::debug!(RUNTIME_CAT, "nominal: requesting to pause");
1603 let pause_status = task.pause().check().unwrap();
1604
1605 gst::debug!(RUNTIME_CAT, "nominal: unlocking item handling");
1606 block_on(handle_item_receiver.next()).unwrap();
1607
1608 gst::debug!(RUNTIME_CAT, "nominal: awaiting pause ack");
1609 block_on(paused_receiver.next()).unwrap();
1610 assert_eq!(
1611 pause_status.block_on().unwrap(),
1612 Complete {
1613 origin: Started,
1614 target: Paused,
1615 },
1616 );
1617
1618 assert!(try_next_ready_receiver.try_next().is_err());
1620
1621 gst::debug!(
1622 RUNTIME_CAT,
1623 "nominal: starting (after pause // handle_item)"
1624 );
1625 let start_receiver = task.start().check().unwrap();
1626 block_on(started_receiver.next()).unwrap();
1627 assert_eq!(
1628 start_receiver.block_on().unwrap(),
1629 Complete {
1630 origin: Paused,
1631 target: Started,
1632 },
1633 );
1634 assert_eq!(task.state(), Started);
1635
1636 gst::debug!(RUNTIME_CAT, "nominal: stopping");
1637 assert_eq!(
1638 task.stop().block_on().unwrap(),
1639 Complete {
1640 origin: Started,
1641 target: Stopped,
1642 },
1643 );
1644
1645 assert_eq!(task.state(), Stopped);
1646 let _ = block_on(stopped_receiver.next());
1647
1648 let _ = try_next_ready_receiver.try_next();
1650
1651 gst::debug!(RUNTIME_CAT, "nominal: starting (after stop)");
1652 assert_eq!(
1653 task.start().block_on().unwrap(),
1654 Complete {
1655 origin: Stopped,
1656 target: Started,
1657 },
1658 );
1659 let _ = block_on(started_receiver.next());
1660
1661 gst::debug!(RUNTIME_CAT, "nominal: stopping");
1662 assert_eq!(
1663 task.stop().block_on().unwrap(),
1664 Complete {
1665 origin: Started,
1666 target: Stopped,
1667 },
1668 );
1669
1670 assert_eq!(
1671 task.unprepare().block_on().unwrap(),
1672 Complete {
1673 origin: Stopped,
1674 target: Unprepared,
1675 },
1676 );
1677
1678 assert_eq!(task.state(), Unprepared);
1679 let _ = block_on(unprepared_receiver.next());
1680 }
1681
1682 #[test]
1683 fn prepare_error() {
1684 gst::init().unwrap();
1685
1686 struct TaskPrepareTest {
1687 obj: gst::Object,
1688 prepare_error_sender: mpsc::Sender<()>,
1689 }
1690
1691 impl TaskImpl for TaskPrepareTest {
1692 type Item = ();
1693
1694 fn obj(&self) -> &impl IsA<glib::Object> {
1695 &self.obj
1696 }
1697
1698 async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> {
1699 gst::debug!(RUNTIME_CAT, "prepare_error: prepare returning an error");
1700 Err(gst::error_msg!(
1701 gst::ResourceError::Failed,
1702 ["prepare_error: intentional error"]
1703 ))
1704 }
1705
1706 async fn handle_action_error(
1707 &mut self,
1708 trigger: Trigger,
1709 state: TaskState,
1710 err: gst::ErrorMessage,
1711 ) -> Trigger {
1712 gst::debug!(
1713 RUNTIME_CAT,
1714 "prepare_error: handling prepare error {:?}",
1715 err
1716 );
1717 match (trigger, state) {
1718 (Prepare, Unprepared) => {
1719 self.prepare_error_sender.send(()).await.unwrap();
1720 }
1721 other => unreachable!("{:?}", other),
1722 }
1723 Trigger::Error
1724 }
1725
1726 async fn try_next(&mut self) -> Result<(), gst::FlowError> {
1727 unreachable!("prepare_error: try_next");
1728 }
1729
1730 async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
1731 unreachable!("prepare_error: handle_item");
1732 }
1733 }
1734
1735 let context = Context::acquire("prepare_error", Duration::from_millis(2)).unwrap();
1736
1737 let task = Task::default();
1738
1739 assert_eq!(task.state(), Unprepared);
1740
1741 let (prepare_error_sender, mut prepare_error_receiver) = mpsc::channel(1);
1742 let prepare_status = task.prepare(
1743 TaskPrepareTest {
1744 obj: gst::Pad::builder(gst::PadDirection::Unknown)
1745 .name("runtime::Task::prepare_error")
1746 .build()
1747 .into(),
1748 prepare_error_sender,
1749 },
1750 context,
1751 );
1752
1753 gst::debug!(
1754 RUNTIME_CAT,
1755 "prepare_error: await action error notification"
1756 );
1757 block_on(prepare_error_receiver.next()).unwrap();
1758
1759 match prepare_status.block_on().unwrap_err() {
1760 TransitionError {
1761 trigger: Trigger::Error,
1762 state: Preparing,
1763 ..
1764 } => (),
1765 other => panic!("{other:?}"),
1766 }
1767
1768 while TaskState::Error != task.state() {
1770 std::thread::sleep(Duration::from_millis(2));
1771 }
1772
1773 match task.start().block_on().unwrap_err() {
1774 TransitionError {
1775 trigger: Start,
1776 state: TaskState::Error,
1777 ..
1778 } => (),
1779 other => panic!("{other:?}"),
1780 }
1781
1782 block_on(task.unprepare()).unwrap();
1783 }
1784
    /// Checks that `start` can be requested while the task is still `Preparing` and
    /// that it completes once the preparation succeeds.
    ///
    /// The transitions are requested from a future spawned on a second `Context`
    /// (`start_ctx`); the test thread only releases the preparation and waits for
    /// that future to finish.
    #[test]
    fn prepare_start_ok() {
        gst::init().unwrap();

        // `TaskImpl` whose `prepare` action blocks until the test sends a value.
        struct TaskPrepareTest {
            obj: gst::Object,
            prepare_receiver: mpsc::Receiver<()>,
        }

        impl TaskImpl for TaskPrepareTest {
            type Item = ();

            fn obj(&self) -> &impl IsA<glib::Object> {
                &self.obj
            }

            async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> {
                gst::debug!(
                    RUNTIME_CAT,
                    "prepare_start_ok: preparation awaiting trigger"
                );
                // Hold the task in preparation until the test releases it.
                self.prepare_receiver.next().await.unwrap();
                gst::debug!(RUNTIME_CAT, "prepare_start_ok: preparation complete Ok");
                Ok(())
            }

            async fn handle_action_error(
                &mut self,
                _trigger: Trigger,
                _state: TaskState,
                _err: gst::ErrorMessage,
            ) -> Trigger {
                // No action is expected to fail in this test.
                unreachable!("prepare_start_ok: handle_prepare_error");
            }

            async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
                gst::debug!(RUNTIME_CAT, "prepare_start_ok: started");
                Ok(())
            }

            async fn try_next(&mut self) -> Result<(), gst::FlowError> {
                // Awaits `pending()`; no item is produced, hence `handle_item` is `unreachable!`.
                pending().await
            }

            async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
                unreachable!("prepare_start_ok: handle_item");
            }
        }

        let context = Context::acquire("prepare_start_ok", Duration::from_millis(2)).unwrap();

        let task = Task::default();

        let (mut prepare_sender, prepare_receiver) = mpsc::channel(1);
        let fut = task.prepare(
            TaskPrepareTest {
                obj: gst::Pad::builder(gst::PadDirection::Unknown)
                    .name("runtime::Task::prepare_start_ok")
                    .build()
                    .into(),
                prepare_receiver,
            },
            context,
        );
        // The prepare status is deliberately dropped without being awaited.
        drop(fut);

        let start_ctx = Context::acquire("prepare_start_ok_requester", Duration::ZERO).unwrap();
        let (ready_sender, ready_receiver) = oneshot::channel();
        let start_handle = start_ctx.spawn(async move {
            // `prepare` is still waiting on `prepare_receiver` at this point.
            assert_eq!(task.state(), Preparing);
            gst::debug!(RUNTIME_CAT, "prepare_start_ok: starting");
            let start_status = task.start();
            match start_status {
                Pending {
                    trigger: Start,
                    origin: Preparing,
                    ..
                } => (),
                other => panic!("{other:?}"),
            }
            // Let the test thread know that `start` was requested so that it can
            // release the preparation.
            ready_sender.send(()).unwrap();
            // The `Start` request was made from `Preparing`, but the completed
            // transition is reported with `Prepared` as its origin.
            assert_eq!(
                start_status.await.unwrap(),
                Complete {
                    origin: Prepared,
                    target: Started,
                },
            );
            assert_eq!(task.state(), Started);

            let stop_status = task.stop();
            match stop_status {
                Pending {
                    trigger: Stop,
                    origin: Started,
                    ..
                } => (),
                other => panic!("{other:?}"),
            }
            assert_eq!(
                stop_status.await.unwrap(),
                Complete {
                    origin: Started,
                    target: Stopped,
                },
            );
            assert_eq!(task.state(), Stopped);

            let unprepare_status = task.unprepare();
            match unprepare_status {
                Pending {
                    trigger: Unprepare,
                    origin: Stopped,
                    ..
                } => (),
                other => panic!("{other:?}"),
            };
            assert_eq!(
                unprepare_status.await.unwrap(),
                Complete {
                    origin: Stopped,
                    target: Unprepared,
                },
            );
            assert_eq!(task.state(), Unprepared);
        });

        gst::debug!(RUNTIME_CAT, "prepare_start_ok: awaiting for start_ctx");
        block_on(ready_receiver).unwrap();

        gst::debug!(RUNTIME_CAT, "prepare_start_ok: triggering preparation");
        block_on(prepare_sender.send(())).unwrap();

        // Wait for the requester future so that all of its assertions are evaluated.
        block_on(start_handle).unwrap();
    }
1922
    /// Checks the outcome of requesting `start` while a preparation that eventually
    /// fails is still in progress.
    ///
    /// The preparation is held until the test releases it, then returns an error.
    /// The pending prepare status must resolve to a `TransitionError` and the task
    /// must be unpreparable from `TaskState::Error`.
    #[test]
    fn prepare_start_error() {
        gst::init().unwrap();

        // `TaskImpl` whose `prepare` action waits for the test, then fails, and which
        // notifies the test when the resulting action error is handled.
        struct TaskPrepareTest {
            obj: gst::Object,
            prepare_receiver: mpsc::Receiver<()>,
            prepare_error_sender: mpsc::Sender<()>,
        }

        impl TaskImpl for TaskPrepareTest {
            type Item = ();

            fn obj(&self) -> &impl IsA<glib::Object> {
                &self.obj
            }

            async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> {
                gst::debug!(
                    RUNTIME_CAT,
                    "prepare_start_error: preparation awaiting trigger"
                );
                // Hold the task in preparation until the test releases it.
                self.prepare_receiver.next().await.unwrap();
                gst::debug!(RUNTIME_CAT, "prepare_start_error: preparation complete Err");

                Err(gst::error_msg!(
                    gst::ResourceError::Failed,
                    ["prepare_start_error: intentional error"]
                ))
            }

            async fn handle_action_error(
                &mut self,
                trigger: Trigger,
                state: TaskState,
                err: gst::ErrorMessage,
            ) -> Trigger {
                gst::debug!(
                    RUNTIME_CAT,
                    "prepare_start_error: handling prepare error {:?}",
                    err
                );
                // Only the failing `Prepare` requested from `Unprepared` is expected here.
                match (trigger, state) {
                    (Prepare, Unprepared) => {
                        self.prepare_error_sender.send(()).await.unwrap();
                    }
                    other => panic!("action error for {other:?}"),
                }
                Trigger::Error
            }

            async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
                // The preparation fails, so the start action must never run.
                unreachable!("prepare_start_error: start");
            }

            async fn try_next(&mut self) -> Result<(), gst::FlowError> {
                unreachable!("prepare_start_error: try_next");
            }

            async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
                unreachable!("prepare_start_error: handle_item");
            }
        }

        let context = Context::acquire("prepare_start_error", Duration::from_millis(2)).unwrap();

        let task = Task::default();

        let (mut prepare_sender, prepare_receiver) = mpsc::channel(1);
        let (prepare_error_sender, mut prepare_error_receiver) = mpsc::channel(1);
        let prepare_status = task.prepare(
            TaskPrepareTest {
                obj: gst::Pad::builder(gst::PadDirection::Unknown)
                    .name("runtime::Task::prepare_start_error")
                    .build()
                    .into(),
                prepare_receiver,
                prepare_error_sender,
            },
            context,
        );
        match prepare_status {
            Pending {
                trigger: Prepare,
                origin: Unprepared,
                ..
            } => (),
            other => panic!("{other:?}"),
        };

        let start_ctx = Context::acquire("prepare_start_error_requester", Duration::ZERO).unwrap();
        let (ready_sender, ready_receiver) = oneshot::channel();
        let start_handle = start_ctx.spawn(async move {
            gst::debug!(RUNTIME_CAT, "prepare_start_error: starting (Err)");
            // The start status is deliberately dropped without being awaited.
            let fut = task.start();
            drop(fut);
            // Let the test thread know that `start` was requested so that it can
            // release the (failing) preparation.
            ready_sender.send(()).unwrap();
            match prepare_status.await {
                Err(TransitionError {
                    trigger: Trigger::Error,
                    state: Preparing,
                    ..
                }) => (),
                other => panic!("{other:?}"),
            }

            let unprepare_status = task.unprepare();
            match unprepare_status {
                Pending {
                    trigger: Unprepare,
                    origin: TaskState::Error,
                    ..
                } => (),
                other => panic!("{other:?}"),
            };
            assert_eq!(
                unprepare_status.await.unwrap(),
                Complete {
                    origin: TaskState::Error,
                    target: Unprepared,
                },
            );
        });

        gst::debug!(RUNTIME_CAT, "prepare_start_error: awaiting for start_ctx");
        block_on(ready_receiver).unwrap();

        gst::debug!(
            RUNTIME_CAT,
            "prepare_start_error: triggering preparation (failure)"
        );
        block_on(prepare_sender.send(())).unwrap();

        gst::debug!(
            RUNTIME_CAT,
            "prepare_start_error: await prepare error notification"
        );
        block_on(prepare_error_receiver.next()).unwrap();

        // Wait for the requester future so that all of its assertions are evaluated.
        block_on(start_handle).unwrap();
    }
2069
2070 #[test]
2071 fn item_error() {
2072 gst::init().unwrap();
2073
2074 struct TaskTest {
2075 obj: gst::Object,
2076 try_next_receiver: mpsc::Receiver<gst::FlowError>,
2077 }
2078
2079 impl TaskImpl for TaskTest {
2080 type Item = gst::FlowError;
2081
2082 fn obj(&self) -> &impl IsA<glib::Object> {
2083 &self.obj
2084 }
2085
2086 async fn try_next(&mut self) -> Result<gst::FlowError, gst::FlowError> {
2087 gst::debug!(RUNTIME_CAT, "item_error: awaiting try_next");
2088 Ok(self.try_next_receiver.next().await.unwrap())
2089 }
2090
2091 async fn handle_item(&mut self, item: gst::FlowError) -> Result<(), gst::FlowError> {
2092 gst::debug!(RUNTIME_CAT, "item_error: handle_item received {:?}", item);
2093 Err(item)
2094 }
2095 }
2096
2097 let context = Context::acquire("item_error", Duration::from_millis(2)).unwrap();
2098 let task = Task::default();
2099 gst::debug!(RUNTIME_CAT, "item_error: prepare and start");
2100 let (mut try_next_sender, try_next_receiver) = mpsc::channel(1);
2101 task.prepare(
2102 TaskTest {
2103 obj: gst::Pad::builder(gst::PadDirection::Unknown)
2104 .name("runtime::Task::item_error")
2105 .build()
2106 .into(),
2107 try_next_receiver,
2108 },
2109 context,
2110 )
2111 .block_on()
2112 .unwrap();
2113 task.start().block_on().unwrap();
2114
2115 gst::debug!(RUNTIME_CAT, "item_error: req. handle_item to return Eos");
2116 block_on(try_next_sender.send(gst::FlowError::Eos)).unwrap();
2117 while Stopped != task.state() {
2119 std::thread::sleep(Duration::from_millis(2));
2120 }
2121
2122 gst::debug!(RUNTIME_CAT, "item_error: starting (after stop)");
2123 assert_eq!(
2124 task.start().block_on().unwrap(),
2125 Complete {
2126 origin: Stopped,
2127 target: Started,
2128 },
2129 );
2130
2131 gst::debug!(RUNTIME_CAT, "item_error: req. handle_item to return Error");
2132 block_on(try_next_sender.send(gst::FlowError::Error)).unwrap();
2133 while TaskState::Error != task.state() {
2135 std::thread::sleep(Duration::from_millis(2));
2136 }
2137
2138 gst::debug!(RUNTIME_CAT, "item_error: attempting to start (after Error)");
2139 match task.start().block_on().unwrap_err() {
2140 TransitionError {
2141 trigger: Start,
2142 state: TaskState::Error,
2143 ..
2144 } => (),
2145 other => panic!("{other:?}"),
2146 }
2147
2148 assert_eq!(
2149 task.unprepare().block_on().unwrap(),
2150 Complete {
2151 origin: TaskState::Error,
2152 target: Unprepared,
2153 },
2154 );
2155 }
2156
2157 #[test]
2158 fn flush_regular_sync() {
2159 gst::init().unwrap();
2160
2161 struct TaskFlushTest {
2162 obj: gst::Object,
2163 flush_start_sender: mpsc::Sender<()>,
2164 flush_stop_sender: mpsc::Sender<()>,
2165 }
2166
2167 impl TaskImpl for TaskFlushTest {
2168 type Item = ();
2169
2170 fn obj(&self) -> &impl IsA<glib::Object> {
2171 &self.obj
2172 }
2173
2174 async fn try_next(&mut self) -> Result<(), gst::FlowError> {
2175 pending().await
2176 }
2177
2178 async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
2179 unreachable!("flush_regular_sync: handle_item");
2180 }
2181
2182 async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
2183 gst::debug!(RUNTIME_CAT, "flush_regular_sync: started flushing");
2184 self.flush_start_sender.send(()).await.unwrap();
2185 Ok(())
2186 }
2187
2188 async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
2189 gst::debug!(RUNTIME_CAT, "flush_regular_sync: stopped flushing");
2190 self.flush_stop_sender.send(()).await.unwrap();
2191 Ok(())
2192 }
2193 }
2194
2195 let context = Context::acquire("flush_regular_sync", Duration::from_millis(2)).unwrap();
2196
2197 let task = Task::default();
2198
2199 let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
2200 let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
2201 let obj = gst::Pad::builder(gst::PadDirection::Unknown)
2202 .name("runtime::Task::flush_regular_sync")
2203 .build();
2204 let fut = task.prepare(
2205 TaskFlushTest {
2206 obj: obj.clone().into(),
2207 flush_start_sender,
2208 flush_stop_sender,
2209 },
2210 context,
2211 );
2212 drop(fut);
2213
2214 gst::debug!(RUNTIME_CAT, "flush_regular_sync: start");
2215 block_on(task.start()).unwrap();
2216
2217 gst::debug!(RUNTIME_CAT, "flush_regular_sync: starting flush");
2218 assert_eq!(
2219 task.flush_start().block_on().unwrap(),
2220 Complete {
2221 origin: Started,
2222 target: Flushing,
2223 },
2224 );
2225 assert_eq!(task.state(), Flushing);
2226
2227 block_on(flush_start_receiver.next()).unwrap();
2228
2229 gst::debug!(RUNTIME_CAT, "flush_regular_sync: stopping flush");
2230 assert_eq!(
2231 task.flush_stop().block_on_or_add_subtask(&obj).unwrap(),
2232 Complete {
2233 origin: Flushing,
2234 target: Started,
2235 },
2236 );
2237 assert_eq!(task.state(), Started);
2238
2239 block_on(flush_stop_receiver.next()).unwrap();
2240
2241 let fut = task.pause();
2242 drop(fut);
2243 stop_then_unprepare(task);
2244 }
2245
    /// Runs a `FlushStart` / `FlushStop` round trip requested from a future spawned
    /// on a `Context` (`oob_context`) distinct from the one the task was prepared with.
    #[test]
    fn flush_regular_different_context() {
        gst::init().unwrap();

        // `TaskImpl` reporting each flush action over its own channel.
        struct TaskFlushTest {
            obj: gst::Object,
            flush_start_sender: mpsc::Sender<()>,
            flush_stop_sender: mpsc::Sender<()>,
        }

        impl TaskImpl for TaskFlushTest {
            type Item = ();

            fn obj(&self) -> &impl IsA<glib::Object> {
                &self.obj
            }

            async fn try_next(&mut self) -> Result<(), gst::FlowError> {
                // No item is produced: `handle_item` is `unreachable!`.
                pending().await
            }

            async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
                unreachable!("flush_regular_different_context: handle_item");
            }

            async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
                gst::debug!(
                    RUNTIME_CAT,
                    "flush_regular_different_context: started flushing"
                );
                self.flush_start_sender.send(()).await.unwrap();
                Ok(())
            }

            async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
                gst::debug!(
                    RUNTIME_CAT,
                    "flush_regular_different_context: stopped flushing"
                );
                self.flush_stop_sender.send(()).await.unwrap();
                Ok(())
            }
        }

        let context =
            Context::acquire("flush_regular_different_context", Duration::from_millis(2)).unwrap();

        let task = Task::default();

        let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
        let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
        let obj = gst::Pad::builder(gst::PadDirection::Unknown)
            .name("runtime::Task::flush_regular_different_context")
            .build();
        let fut = task.prepare(
            TaskFlushTest {
                obj: obj.clone().into(),
                flush_start_sender,
                flush_stop_sender,
            },
            context,
        );
        // The prepare status is deliberately dropped without being awaited.
        drop(fut);

        gst::debug!(RUNTIME_CAT, "flush_regular_different_context: start");
        task.start().block_on().unwrap();

        // Second `Context` from which the flush transitions are requested.
        let oob_context = Context::acquire(
            "flush_regular_different_context_oob",
            Duration::from_millis(2),
        )
        .unwrap();

        let task_clone = task.clone();
        let flush_res_fut = oob_context.spawn(async move {
            let flush_start_status = task_clone.flush_start();
            match flush_start_status {
                Pending {
                    trigger: FlushStart,
                    origin: Started,
                    ..
                } => (),
                other => panic!("{other:?}"),
            };
            assert_eq!(
                flush_start_status.await.unwrap(),
                Complete {
                    origin: Started,
                    target: Flushing,
                },
            );
            assert_eq!(task_clone.state(), Flushing);
            flush_start_receiver.next().await.unwrap();

            let flush_stop_status = task_clone.flush_stop();
            match flush_stop_status {
                Pending {
                    trigger: FlushStop,
                    origin: Flushing,
                    ..
                } => (),
                other => panic!("{other:?}"),
            };
            // Called from within a `Context` future, `block_on_or_add_subtask` is
            // expected to return `NotWaiting` rather than the completed transition.
            assert_eq!(
                flush_stop_status.block_on_or_add_subtask(&obj).unwrap(),
                NotWaiting {
                    trigger: FlushStop,
                    origin: Flushing,
                },
            );

            // The `Started` state is only asserted after the sub tasks are drained.
            Context::drain_sub_tasks().await.unwrap();
            assert_eq!(task_clone.state(), Started);
        });

        block_on(flush_res_fut).unwrap();
        // The flush stop action must have notified the test.
        block_on(flush_stop_receiver.next()).unwrap();

        stop_then_unprepare(task);
    }
2367
    /// Runs a `FlushStart` / `FlushStop` round trip requested from a future spawned
    /// on the same `Context` as the one the task was prepared with.
    #[test]
    fn flush_regular_same_context() {
        gst::init().unwrap();

        // `TaskImpl` reporting each flush action over its own channel.
        struct TaskFlushTest {
            obj: gst::Object,
            flush_start_sender: mpsc::Sender<()>,
            flush_stop_sender: mpsc::Sender<()>,
        }

        impl TaskImpl for TaskFlushTest {
            type Item = ();

            fn obj(&self) -> &impl IsA<glib::Object> {
                &self.obj
            }

            async fn try_next(&mut self) -> Result<(), gst::FlowError> {
                // No item is produced: `handle_item` is `unreachable!`.
                pending().await
            }

            async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
                unreachable!("flush_regular_same_context: handle_item");
            }

            async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
                gst::debug!(RUNTIME_CAT, "flush_regular_same_context: started flushing");
                self.flush_start_sender.send(()).await.unwrap();
                Ok(())
            }

            async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
                gst::debug!(RUNTIME_CAT, "flush_regular_same_context: stopped flushing");
                self.flush_stop_sender.send(()).await.unwrap();
                Ok(())
            }
        }

        let context =
            Context::acquire("flush_regular_same_context", Duration::from_millis(2)).unwrap();

        let task = Task::default();

        let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
        let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
        let fut = task.prepare(
            TaskFlushTest {
                obj: gst::Pad::builder(gst::PadDirection::Unknown)
                    .name("runtime::Task::flush_regular_same_context")
                    .build()
                    .into(),
                flush_start_sender,
                flush_stop_sender,
            },
            // A clone is handed over: `context` is reused below to spawn the requester.
            context.clone(),
        );
        // The prepare status is deliberately dropped without being awaited.
        drop(fut);

        block_on(task.start()).unwrap();

        let task_clone = task.clone();
        // The flush transitions are requested from the `Context` given to `prepare`.
        let flush_handle = context.spawn(async move {
            let flush_start_status = task_clone.flush_start();
            match flush_start_status {
                Pending {
                    trigger: FlushStart,
                    origin: Started,
                    ..
                } => (),
                other => panic!("{other:?}"),
            };
            assert_eq!(
                flush_start_status.await.unwrap(),
                Complete {
                    origin: Started,
                    target: Flushing,
                },
            );
            assert_eq!(task_clone.state(), Flushing);
            flush_start_receiver.next().await.unwrap();

            let flush_stop_status = task_clone.flush_stop();
            match flush_stop_status {
                Pending {
                    trigger: FlushStop,
                    origin: Flushing,
                    ..
                } => (),
                other => panic!("{other:?}"),
            };
            // Here the status is awaited and expected to resolve to the completed
            // transition.
            assert_eq!(
                flush_stop_status.await.unwrap(),
                Complete {
                    origin: Flushing,
                    target: Started,
                },
            );
            assert_eq!(task_clone.state(), Started);
        });

        block_on(flush_handle).unwrap();
        // The flush stop action must have notified the test.
        block_on(flush_stop_receiver.next()).unwrap();

        stop_then_unprepare(task);
    }
2474
2475 #[test]
2476 fn flush_from_loop() {
2477 gst::init().unwrap();
2479
2480 struct TaskFlushTest {
2481 obj: gst::Object,
2482 task: Task,
2483 flush_start_sender: mpsc::Sender<()>,
2484 }
2485
2486 impl TaskImpl for TaskFlushTest {
2487 type Item = ();
2488
2489 fn obj(&self) -> &impl IsA<glib::Object> {
2490 &self.obj
2491 }
2492
2493 async fn try_next(&mut self) -> Result<(), gst::FlowError> {
2494 Ok(())
2495 }
2496
2497 async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
2498 gst::debug!(RUNTIME_CAT, "flush_from_loop: flush_start from handle_item");
2499 match self.task.flush_start() {
2500 Pending {
2501 trigger: FlushStart,
2502 origin: Started,
2503 ..
2504 } => (),
2505 other => panic!("{other:?}"),
2506 }
2507 Ok(())
2508 }
2509
2510 async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
2511 gst::debug!(RUNTIME_CAT, "flush_from_loop: started flushing");
2512 self.flush_start_sender.send(()).await.unwrap();
2513 Ok(())
2514 }
2515 }
2516
2517 let context = Context::acquire("flush_from_loop", Duration::from_millis(2)).unwrap();
2518
2519 let task = Task::default();
2520
2521 let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
2522 let fut = task.prepare(
2523 TaskFlushTest {
2524 obj: gst::Pad::builder(gst::PadDirection::Unknown)
2525 .name("runtime::Task::flush_from_loop")
2526 .build()
2527 .into(),
2528 task: task.clone(),
2529 flush_start_sender,
2530 },
2531 context,
2532 );
2533 drop(fut);
2534
2535 let fut = task.start();
2536 drop(fut);
2537
2538 gst::debug!(
2539 RUNTIME_CAT,
2540 "flush_from_loop: awaiting flush_start notification"
2541 );
2542 block_on(flush_start_receiver.next()).unwrap();
2543
2544 assert_eq!(
2545 task.stop().block_on().unwrap(),
2546 Complete {
2547 origin: Flushing,
2548 target: Stopped,
2549 },
2550 );
2551 task.unprepare().block_on().unwrap();
2552 }
2553
2554 #[test]
2555 fn pause_from_loop() {
2556 gst::init().unwrap();
2559
2560 struct TaskStartTest {
2561 obj: gst::Object,
2562 task: Task,
2563 pause_sender: mpsc::Sender<()>,
2564 }
2565
2566 impl TaskImpl for TaskStartTest {
2567 type Item = ();
2568
2569 fn obj(&self) -> &impl IsA<glib::Object> {
2570 &self.obj
2571 }
2572
2573 async fn try_next(&mut self) -> Result<(), gst::FlowError> {
2574 Ok(())
2575 }
2576
2577 async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
2578 gst::debug!(RUNTIME_CAT, "pause_from_loop: entering handle_item");
2579
2580 crate::runtime::timer::delay_for(Duration::from_millis(50)).await;
2581
2582 gst::debug!(RUNTIME_CAT, "pause_from_loop: pause from handle_item");
2583 match self.task.pause() {
2584 Pending {
2585 trigger: Pause,
2586 origin: Started,
2587 ..
2588 } => (),
2589 other => panic!("{other:?}"),
2590 }
2591
2592 Ok(())
2593 }
2594
2595 async fn pause(&mut self) -> Result<(), gst::ErrorMessage> {
2596 gst::debug!(RUNTIME_CAT, "pause_from_loop: entering pause action");
2597 self.pause_sender.send(()).await.unwrap();
2598 Ok(())
2599 }
2600 }
2601
2602 let context = Context::acquire("pause_from_loop", Duration::from_millis(2)).unwrap();
2603
2604 let task = Task::default();
2605
2606 let (pause_sender, mut pause_receiver) = mpsc::channel(1);
2607 let fut = task.prepare(
2608 TaskStartTest {
2609 obj: gst::Pad::builder(gst::PadDirection::Unknown)
2610 .name("runtime::Task::pause_from_loop")
2611 .build()
2612 .into(),
2613 task: task.clone(),
2614 pause_sender,
2615 },
2616 context,
2617 );
2618 drop(fut);
2619
2620 let fut = task.start();
2621 drop(fut);
2622
2623 gst::debug!(RUNTIME_CAT, "pause_from_loop: awaiting pause notification");
2624 block_on(pause_receiver.next()).unwrap();
2625
2626 stop_then_unprepare(task);
2627 }
2628
2629 #[test]
2630 fn trigger_from_action() {
2631 gst::init().unwrap();
2633
2634 struct TaskFlushTest {
2635 obj: gst::Object,
2636 task: Task,
2637 flush_stop_sender: mpsc::Sender<()>,
2638 }
2639
2640 impl TaskImpl for TaskFlushTest {
2641 type Item = ();
2642
2643 fn obj(&self) -> &impl IsA<glib::Object> {
2644 &self.obj
2645 }
2646
2647 async fn try_next(&mut self) -> Result<(), gst::FlowError> {
2648 pending().await
2649 }
2650
2651 async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
2652 unreachable!("trigger_from_action: handle_item");
2653 }
2654
2655 async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
2656 gst::debug!(
2657 RUNTIME_CAT,
2658 "trigger_from_action: flush_start triggering flush_stop"
2659 );
2660 match self.task.flush_stop() {
2661 Pending {
2662 trigger: FlushStop,
2663 origin: Started,
2664 ..
2665 } => (),
2666 other => panic!("{other:?}"),
2667 }
2668
2669 Ok(())
2670 }
2671
2672 async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
2673 gst::debug!(RUNTIME_CAT, "trigger_from_action: stopped flushing");
2674 self.flush_stop_sender.send(()).await.unwrap();
2675 Ok(())
2676 }
2677 }
2678
2679 let context = Context::acquire("trigger_from_action", Duration::from_millis(2)).unwrap();
2680
2681 let task = Task::default();
2682
2683 let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
2684 let fut = task.prepare(
2685 TaskFlushTest {
2686 obj: gst::Pad::builder(gst::PadDirection::Unknown)
2687 .name("runtime::Task::trigger_from_action")
2688 .build()
2689 .into(),
2690 task: task.clone(),
2691 flush_stop_sender,
2692 },
2693 context,
2694 );
2695 drop(fut);
2696
2697 task.start().block_on().unwrap();
2698 let fut = task.flush_start();
2699 drop(fut);
2700
2701 gst::debug!(
2702 RUNTIME_CAT,
2703 "trigger_from_action: awaiting flush_stop notification"
2704 );
2705 block_on(flush_stop_receiver.next()).unwrap();
2706
2707 stop_then_unprepare(task);
2708 }
2709
/// Exercises a flush cycle on a paused task, followed by a start.
///
/// Expected transitions: `Prepared` → `Paused` → `PausedFlushing` → `Paused` → `Started`.
/// The `start` hook must only run for the final transition, and each hook must
/// notify the test body through its dedicated channel.
#[test]
fn pause_flush_start() {
    gst::init().unwrap();

    /// Task implementation reporting each hook invocation on its own channel.
    struct TaskFlushTest {
        obj: gst::Object,
        started_sender: mpsc::Sender<()>,
        flush_start_sender: mpsc::Sender<()>,
        flush_stop_sender: mpsc::Sender<()>,
    }

    impl TaskImpl for TaskFlushTest {
        type Item = ();

        fn obj(&self) -> &impl IsA<glib::Object> {
            &self.obj
        }

        async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
            gst::debug!(RUNTIME_CAT, "pause_flush_start: started");
            self.started_sender.send(()).await.unwrap();
            Ok(())
        }

        // Awaits `pending()`; presumably never yields an item.
        async fn try_next(&mut self) -> Result<(), gst::FlowError> {
            pending().await
        }

        async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
            unreachable!("pause_flush_start: handle_item");
        }

        async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
            gst::debug!(RUNTIME_CAT, "pause_flush_start: started flushing");
            self.flush_start_sender.send(()).await.unwrap();
            Ok(())
        }

        async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
            gst::debug!(RUNTIME_CAT, "pause_flush_start: stopped flushing");
            self.flush_stop_sender.send(()).await.unwrap();
            Ok(())
        }
    }

    let context = Context::acquire("pause_flush_start", Duration::from_millis(2)).unwrap();

    let task = Task::default();

    let (started_sender, mut started_receiver) = mpsc::channel(1);
    let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
    let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
    // The status returned by `prepare` is deliberately dropped without being awaited.
    let fut = task.prepare(
        TaskFlushTest {
            obj: gst::Pad::builder(gst::PadDirection::Unknown)
                .name("runtime::Task::pause_flush_start")
                .build()
                .into(),
            started_sender,
            flush_start_sender,
            flush_stop_sender,
        },
        context,
    );
    drop(fut);

    gst::debug!(RUNTIME_CAT, "pause_flush_start: pausing");
    assert_eq!(
        task.pause().block_on().unwrap(),
        Complete {
            origin: Prepared,
            target: Paused,
        },
    );

    gst::debug!(RUNTIME_CAT, "pause_flush_start: starting flush");
    assert_eq!(
        task.flush_start().block_on().unwrap(),
        Complete {
            origin: Paused,
            target: PausedFlushing,
        },
    );
    assert_eq!(task.state(), PausedFlushing);
    // Unwrap so that a closed channel (hook never ran) fails the test
    // instead of being silently accepted.
    block_on(flush_start_receiver.next()).unwrap();

    gst::debug!(RUNTIME_CAT, "pause_flush_start: stopping flush");
    assert_eq!(
        task.flush_stop().block_on().unwrap(),
        Complete {
            origin: PausedFlushing,
            target: Paused,
        },
    );
    assert_eq!(task.state(), Paused);
    block_on(flush_stop_receiver.next()).unwrap();

    // `try_next` errs while the channel is open but empty:
    // the `start` hook must not have run yet.
    started_receiver.try_next().unwrap_err();

    gst::debug!(RUNTIME_CAT, "pause_flush_start: starting after flushing");
    assert_eq!(
        task.start().block_on().unwrap(),
        Complete {
            origin: Paused,
            target: Started,
        },
    );
    assert_eq!(task.state(), Started);
    block_on(started_receiver.next()).unwrap();

    stop_then_unprepare(task);
}
2825
/// Exercises starting a task which is flushing while paused.
///
/// A pause is requested from `Prepared`, then the task is expected to go through
/// `PausedFlushing` → `Flushing` → `Started`. The `start` hook must only run once
/// the flush has been stopped, and each hook must notify the test body through its
/// dedicated channel.
#[test]
fn pause_flushing_start() {
    gst::init().unwrap();

    /// Task implementation reporting each hook invocation on its own channel.
    struct TaskFlushTest {
        obj: gst::Object,
        started_sender: mpsc::Sender<()>,
        flush_start_sender: mpsc::Sender<()>,
        flush_stop_sender: mpsc::Sender<()>,
    }

    impl TaskImpl for TaskFlushTest {
        type Item = ();

        fn obj(&self) -> &impl IsA<glib::Object> {
            &self.obj
        }

        async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
            gst::debug!(RUNTIME_CAT, "pause_flushing_start: started");
            self.started_sender.send(()).await.unwrap();
            Ok(())
        }

        // Awaits `pending()`; presumably never yields an item.
        async fn try_next(&mut self) -> Result<(), gst::FlowError> {
            pending().await
        }

        async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
            unreachable!("pause_flushing_start: handle_item");
        }

        async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
            gst::debug!(RUNTIME_CAT, "pause_flushing_start: started flushing");
            self.flush_start_sender.send(()).await.unwrap();
            Ok(())
        }

        async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
            gst::debug!(RUNTIME_CAT, "pause_flushing_start: stopped flushing");
            self.flush_stop_sender.send(()).await.unwrap();
            Ok(())
        }
    }

    let context = Context::acquire("pause_flushing_start", Duration::from_millis(2)).unwrap();

    let task = Task::default();

    let (started_sender, mut started_receiver) = mpsc::channel(1);
    let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
    let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
    // The status returned by `prepare` is deliberately dropped without being awaited.
    let fut = task.prepare(
        TaskFlushTest {
            obj: gst::Pad::builder(gst::PadDirection::Unknown)
                .name("runtime::Task::pause_flushing_start")
                .build()
                .into(),
            started_sender,
            flush_start_sender,
            flush_stop_sender,
        },
        context,
    );
    drop(fut);

    gst::debug!(RUNTIME_CAT, "pause_flushing_start: pausing");
    // The pause request is not awaited either; the following `flush_start` is.
    let fut = task.pause();
    drop(fut);

    gst::debug!(RUNTIME_CAT, "pause_flushing_start: starting flush");
    block_on(task.flush_start()).unwrap();
    assert_eq!(task.state(), PausedFlushing);
    // Unwrap so that a closed channel (hook never ran) fails the test
    // instead of being silently accepted.
    block_on(flush_start_receiver.next()).unwrap();

    gst::debug!(RUNTIME_CAT, "pause_flushing_start: starting while flushing");
    assert_eq!(
        task.start().block_on().unwrap(),
        Complete {
            origin: PausedFlushing,
            target: Flushing,
        },
    );
    assert_eq!(task.state(), Flushing);

    // `try_next` errs while the channel is open but empty:
    // the `start` hook must not have run while flushing.
    started_receiver.try_next().unwrap_err();

    gst::debug!(RUNTIME_CAT, "pause_flushing_start: stopping flush");
    assert_eq!(
        task.flush_stop().block_on().unwrap(),
        Complete {
            origin: Flushing,
            target: Started,
        },
    );
    assert_eq!(task.state(), Started);
    block_on(flush_stop_receiver.next()).unwrap();
    block_on(started_receiver.next()).unwrap();

    stop_then_unprepare(task);
}
2930
/// Races a `flush_start` requested from a second context against a `start` requested
/// from the test body, starting from a paused task.
///
/// Either ordering is accepted for the two concurrent requests. Whichever is handled
/// first, the subsequent `flush_stop` must complete from `Flushing` to `Started`.
#[test]
fn flush_concurrent_start() {
    gst::init().unwrap();

    /// Task implementation reporting the flush hooks on dedicated channels.
    struct TaskStartTest {
        obj: gst::Object,
        flush_start_sender: mpsc::Sender<()>,
        flush_stop_sender: mpsc::Sender<()>,
    }

    impl TaskImpl for TaskStartTest {
        type Item = ();

        fn obj(&self) -> &impl IsA<glib::Object> {
            &self.obj
        }

        // Awaits `pending()`; presumably never yields an item.
        async fn try_next(&mut self) -> Result<(), gst::FlowError> {
            pending().await
        }

        async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
            unreachable!("flush_concurrent_start: handle_item");
        }

        async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
            gst::debug!(RUNTIME_CAT, "flush_concurrent_start: started flushing");
            self.flush_start_sender.send(()).await.unwrap();
            Ok(())
        }

        async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
            gst::debug!(RUNTIME_CAT, "flush_concurrent_start: stopped flushing");
            self.flush_stop_sender.send(()).await.unwrap();
            Ok(())
        }
    }

    let context = Context::acquire("flush_concurrent_start", Duration::from_millis(2)).unwrap();

    let task = Task::default();

    let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
    let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
    // The status returned by `prepare` is deliberately dropped without being awaited.
    let fut = task.prepare(
        TaskStartTest {
            obj: gst::Pad::builder(gst::PadDirection::Unknown)
                .name("runtime::Task::flush_concurrent_start")
                .build()
                .into(),
            flush_start_sender,
            flush_stop_sender,
        },
        context,
    );
    drop(fut);

    // Second context from which the concurrent `flush_start` is requested.
    let oob_context =
        Context::acquire("flush_concurrent_start_oob", Duration::from_millis(2)).unwrap();
    let task_clone = task.clone();

    block_on(task.pause()).unwrap();

    // Signals that the spawned future is running, right before it requests `flush_start`.
    let (ready_sender, ready_receiver) = oneshot::channel();
    gst::debug!(RUNTIME_CAT, "flush_concurrent_start: spawning flush_start");
    let flush_start_handle = oob_context.spawn(async move {
        gst::debug!(RUNTIME_CAT, "flush_concurrent_start: // flush_start");
        ready_sender.send(()).unwrap();
        let status = task_clone.flush_start();
        match status {
            // Depending on which request wins the race, the origin is either state.
            Pending {
                trigger: FlushStart,
                origin: Paused | Started,
                ..
            } => (),
            other => panic!("{other:?}"),
        };
        status.await.unwrap();
        flush_start_receiver.next().await.unwrap();
    });

    gst::debug!(
        RUNTIME_CAT,
        "flush_concurrent_start: awaiting for oob_context"
    );
    block_on(ready_receiver).unwrap();

    gst::debug!(RUNTIME_CAT, "flush_concurrent_start: // start");
    match block_on(task.start()) {
        // `start` handled before the concurrent `flush_start`.
        Ok(Complete {
            origin: Paused,
            target: Started,
        }) => (),
        // `start` handled after the concurrent `flush_start`.
        Ok(Complete {
            origin: PausedFlushing,
            target: Flushing,
        }) => (),
        other => panic!("{other:?}"),
    }

    block_on(flush_start_handle).unwrap();

    gst::debug!(RUNTIME_CAT, "flush_concurrent_start: requesting flush_stop");
    assert_eq!(
        task.flush_stop().block_on().unwrap(),
        Complete {
            origin: Flushing,
            target: Started,
        },
    );
    assert_eq!(task.state(), Started);
    // Unwrap so that a closed channel (hook never ran) fails the test
    // instead of being silently accepted.
    block_on(flush_stop_receiver.next()).unwrap();

    stop_then_unprepare(task);
}
3054
/// Verifies that a timer armed by the `start` hook elapses and that its expiry is
/// reported to the test body by `handle_item`.
#[test]
fn start_timer() {
    use crate::runtime::timer;

    gst::init().unwrap();

    /// Task implementation awaiting a one-shot timer as its single item.
    struct TaskTimerTest {
        obj: gst::Object,
        timer: Option<timer::Oneshot>,
        timer_elapsed_sender: Option<oneshot::Sender<()>>,
    }

    impl TaskImpl for TaskTimerTest {
        type Item = ();

        fn obj(&self) -> &impl IsA<glib::Object> {
            &self.obj
        }

        // Arms the 50 ms timer which `try_next` awaits.
        async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
            let delay = timer::delay_for(Duration::from_millis(50));
            self.timer = Some(delay);
            gst::debug!(RUNTIME_CAT, "start_timer: started");
            Ok(())
        }

        // Consumes the timer armed by `start`; panics if none is armed.
        async fn try_next(&mut self) -> Result<(), gst::FlowError> {
            gst::debug!(RUNTIME_CAT, "start_timer: awaiting timer");
            let armed = self.timer.take().unwrap();
            armed.await;
            Ok(())
        }

        // Notifies the test body at most once, then reports end of stream.
        async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
            gst::debug!(RUNTIME_CAT, "start_timer: timer elapsed");
            if let Some(notifier) = self.timer_elapsed_sender.take() {
                notifier.send(()).unwrap();
            }

            Err(gst::FlowError::Eos)
        }
    }

    let context = Context::acquire("start_timer", Duration::from_millis(2)).unwrap();
    let task = Task::default();
    let (elapsed_tx, elapsed_rx) = oneshot::channel();

    let obj: gst::Object = gst::Pad::builder(gst::PadDirection::Unknown)
        .name("runtime::Task::start_timer")
        .build()
        .into();
    // The status returned by `prepare` is dropped without being awaited.
    drop(task.prepare(
        TaskTimerTest {
            obj,
            timer: None,
            timer_elapsed_sender: Some(elapsed_tx),
        },
        context,
    ));

    gst::debug!(RUNTIME_CAT, "start_timer: start");
    // Same for `start`: the test synchronizes on the timer notification instead.
    drop(task.start());

    block_on(elapsed_rx).unwrap();
    gst::debug!(RUNTIME_CAT, "start_timer: timer elapsed received");

    stop_then_unprepare(task);
}
3125}