1use futures::channel::mpsc as async_mpsc;
13use futures::channel::oneshot;
14use futures::prelude::*;
15
16use std::fmt;
17use std::ops::Deref;
18use std::pin::{Pin, pin};
19use std::sync::{Arc, Mutex, MutexGuard};
20use std::task::Poll;
21
22use gst::glib;
23use gst::glib::prelude::*;
24
25use super::{Context, JoinHandle, RUNTIME_CAT};
26
/// Lifecycle state of a [`Task`].
///
/// The declaration order of the variants is significant: it drives the
/// derived `Ord` / `PartialOrd` implementations.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum TaskState {
    /// The state machine handled a `Trigger::Error`.
    Error,
    /// Flush started while the task was `Started`.
    Flushing,
    /// The task was paused (or left `PausedFlushing` on flush stop).
    Paused,
    /// The task is both paused and flushing.
    PausedFlushing,
    /// The prepare action completed successfully.
    Prepared,
    /// The state machine was spawned and the prepare action is in progress.
    Preparing,
    /// The start action completed successfully.
    Started,
    /// The stop action completed successfully.
    Stopped,
    /// Initial state, also reached after the unprepare action.
    Unprepared,
}
39
/// Request which can be pushed to the [`Task`] state machine.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Trigger {
    /// Switch the task to `TaskState::Error`.
    ///
    /// This is the trigger returned by the default implementations of
    /// `TaskImpl::handle_action_error` and, for unhandled flow errors,
    /// `TaskImpl::handle_loop_error`.
    Error,
    /// Start flushing.
    FlushStart,
    /// Stop flushing.
    FlushStop,
    /// Pause the task.
    Pause,
    /// Prepare the task. This is the first trigger the state machine expects.
    Prepare,
    /// Start (or resume) the task.
    Start,
    /// Stop the task.
    Stop,
    /// Unprepare the task and terminate its state machine.
    Unprepare,
}
51
52#[derive(Clone, Copy, Debug, Eq, PartialEq)]
54pub enum TransitionOk {
55 Complete {
57 origin: TaskState,
58 target: TaskState,
59 },
60 NotWaiting { trigger: Trigger, origin: TaskState },
67 Skipped { trigger: Trigger, state: TaskState },
69}
70
71#[derive(Clone, Debug, Eq, PartialEq)]
73pub struct TransitionError {
74 pub trigger: Trigger,
75 pub state: TaskState,
76 pub err_msg: gst::ErrorMessage,
77}
78
79impl fmt::Display for TransitionError {
80 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
81 write!(
82 f,
83 "{:?} from state {:?}: {:?}",
84 self.trigger, self.state, self.err_msg
85 )
86 }
87}
88
// Marker impl: relies on the `Debug` and `Display` impls of `TransitionError`
// and on the default methods of `std::error::Error`.
impl std::error::Error for TransitionError {}
90
91impl From<TransitionError> for gst::ErrorMessage {
92 fn from(err: TransitionError) -> Self {
93 err.err_msg
94 }
95}
96
/// Status of a transition requested on a `Task`.
///
/// The outcome is either immediately available (`Ready`) or must be obtained
/// by driving the inner future to completion (`Pending`).
#[must_use = "This `TransitionStatus` may be `Pending`. In most cases it should be awaited. See `await_maybe_on_context`"]
pub enum TransitionStatus {
    /// The outcome of the transition is already known.
    Ready(Result<TransitionOk, TransitionError>),
    /// The trigger was pushed, the outcome is not known yet.
    Pending {
        /// Trigger which was pushed.
        trigger: Trigger,
        /// State of the task observed when the trigger was pushed.
        origin: TaskState,
        /// Future resolving to the outcome of the transition.
        res_fut: Pin<Box<dyn Future<Output = Result<TransitionOk, TransitionError>> + Send>>,
    },
}
113
114impl TransitionStatus {
115 pub fn is_ready(&self) -> bool {
116 matches!(self, TransitionStatus::Ready { .. })
117 }
118
119 pub fn is_pending(&self) -> bool {
120 matches!(self, TransitionStatus::Pending { .. })
121 }
122
123 pub fn check(self) -> Result<TransitionStatus, TransitionError> {
131 match self {
132 TransitionStatus::Ready(Err(err)) => Err(err),
133 other => Ok(other),
134 }
135 }
136
137 pub fn block_on_or_add_subtask<O>(self, obj: &O) -> Result<TransitionOk, TransitionError>
191 where
192 O: IsA<glib::Object> + Send,
193 {
194 use TransitionStatus::*;
195 match self {
196 Pending {
197 trigger,
198 origin,
199 res_fut,
200 } => match Context::current_task() {
201 Some((ctx, task_id)) => {
202 gst::debug!(
203 RUNTIME_CAT,
204 obj = obj,
205 "Awaiting for {trigger:?} ack in a subtask on context {}",
206 ctx.name()
207 );
208
209 let obj = obj.clone();
210 let _ = ctx.add_sub_task(task_id, async move {
211 let res = res_fut.await;
212 match res {
213 Ok(status) => {
214 gst::log!(
215 RUNTIME_CAT,
216 obj = obj,
217 "Task {trigger:?} success: {status:?}",
218 );
219 }
220 Err(err) => {
221 gst::error!(
222 RUNTIME_CAT,
223 obj = obj,
224 "Task {trigger:?} failure: {err}"
225 );
226 }
227 }
228
229 Ok(())
230 });
231
232 Ok(TransitionOk::NotWaiting { trigger, origin })
233 }
234 _ => {
235 gst::debug!(
236 RUNTIME_CAT,
237 obj = obj,
238 "Awaiting for {trigger:?} ack on current thread",
239 );
240 let res = futures::executor::block_on(res_fut);
241 match res {
242 Ok(ref status) => {
243 gst::log!(
244 RUNTIME_CAT,
245 obj = obj,
246 "Task {trigger:?} success: {status:?}",
247 );
248 }
249 Err(ref err) => {
250 gst::error!(RUNTIME_CAT, obj = obj, "Task {trigger:?} failure: {err}");
251 }
252 }
253
254 res
255 }
256 },
257 Ready(res) => {
258 match res {
259 Ok(ref status) => {
260 gst::log!(
261 RUNTIME_CAT,
262 obj = obj,
263 "Task transition immediate success: {status:?}",
264 );
265 }
266 Err(ref err) => {
267 gst::error!(
268 RUNTIME_CAT,
269 obj = obj,
270 "Task transition immediate failure: {err}",
271 );
272 }
273 }
274
275 res
276 }
277 }
278 }
279
280 pub fn block_on_or_add_subtask_then<T, F>(
318 self,
319 obj: glib::BorrowedObject<'_, T>,
320 func: F,
321 ) -> Result<(), gst::ErrorMessage>
322 where
323 T: IsA<glib::Object> + Send,
324 F: FnOnce(&T, &Result<TransitionOk, TransitionError>) + Send + 'static,
325 {
326 use TransitionStatus::*;
327 match self {
328 Pending {
329 trigger, res_fut, ..
330 } => match Context::current_task() {
331 Some((ctx, task_id)) => {
332 gst::debug!(
333 RUNTIME_CAT,
334 obj = obj,
335 "Awaiting for {trigger:?} ack in a subtask on context {}",
336 ctx.name()
337 );
338 let obj = obj.clone();
339 let _ = ctx.add_sub_task(task_id, async move {
340 let res = res_fut.await;
341 match res {
342 Ok(ref status) => {
343 gst::log!(
344 RUNTIME_CAT,
345 obj = obj,
346 "Task {trigger:?} success: {status:?}",
347 );
348 func(&obj, &res);
349 Ok(())
350 }
351 Err(ref err) => {
352 gst::error!(
353 RUNTIME_CAT,
354 obj = obj,
355 "Task {trigger:?} failure: {err}",
356 );
357 func(&obj, &res);
358 Err(gst::FlowError::Error)
359 }
360 }
361 });
362
363 Ok(())
364 }
365 _ => {
366 gst::debug!(
367 RUNTIME_CAT,
368 obj = obj,
369 "Awaiting for {trigger:?} ack on current thread",
370 );
371 let res = futures::executor::block_on(res_fut);
372 match res {
373 Ok(ref status) => {
374 gst::log!(
375 RUNTIME_CAT,
376 obj = obj,
377 "Task {trigger:?} success: {status:?}",
378 );
379 func(&obj, &res);
380 Ok(())
381 }
382 Err(ref err) => {
383 gst::error!(RUNTIME_CAT, obj = obj, "Task {trigger:?} failure: {err}",);
384 func(&obj, &res);
385 res.map(|_| ()).map_err(|err| err.into())
386 }
387 }
388 }
389 },
390 Ready(res) => match res {
391 Ok(ref status) => {
392 gst::log!(
393 RUNTIME_CAT,
394 obj = obj,
395 "Task transition immediate success: {status:?}",
396 );
397 func(&obj, &res);
398 Ok(())
399 }
400 Err(ref err) => {
401 gst::error!(
402 RUNTIME_CAT,
403 obj = obj,
404 "Task transition immediate failure: {err}",
405 );
406 func(&obj, &res);
407 res.map(|_| ()).map_err(|err| err.into())
408 }
409 },
410 }
411 }
412}
413
414impl Future for TransitionStatus {
415 type Output = Result<TransitionOk, TransitionError>;
416
417 fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
418 use TransitionStatus::*;
419
420 match &mut *self {
421 Ready(res) => Poll::Ready(res.clone()),
422 Pending { res_fut, .. } => match Pin::new(res_fut).poll(cx) {
423 Poll::Pending => Poll::Pending,
424 Poll::Ready(res) => {
425 *self = Ready(res.clone());
426
427 Poll::Ready(res)
428 }
429 },
430 }
431 }
432}
433
434impl From<TransitionOk> for TransitionStatus {
435 fn from(ok: TransitionOk) -> Self {
436 Self::Ready(Ok(ok))
437 }
438}
439
440impl From<TransitionError> for TransitionStatus {
441 fn from(err: TransitionError) -> Self {
442 Self::Ready(Err(err))
443 }
444}
445
446impl fmt::Debug for TransitionStatus {
448 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
449 use TransitionStatus::*;
450 match self {
451 Ready(res) => f.debug_tuple("Ready").field(res).finish(),
452 Pending {
453 trigger, origin, ..
454 } => f
455 .debug_struct("Pending")
456 .field("trigger", trigger)
457 .field("origin", origin)
458 .finish(),
459 }
460 }
461}
462
463pub trait TaskImpl: Send + 'static {
467 type Item: Send + 'static;
468
469 fn obj(&self) -> &impl IsA<glib::Object>;
470
471 fn prepare(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
472 future::ok(())
473 }
474
475 fn unprepare(&mut self) -> impl Future<Output = ()> + Send {
476 future::ready(())
477 }
478
479 fn start(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
480 future::ok(())
481 }
482
483 fn try_next(&mut self) -> impl Future<Output = Result<Self::Item, gst::FlowError>> + Send;
496
497 fn handle_item(
508 &mut self,
509 _item: Self::Item,
510 ) -> impl Future<Output = Result<(), gst::FlowError>> + Send;
511
512 fn pause(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
513 future::ok(())
514 }
515
516 fn flush_start(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
517 future::ready(Ok(()))
518 }
519
520 fn flush_stop(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
521 future::ready(Ok(()))
522 }
523
524 fn stop(&mut self) -> impl Future<Output = Result<(), gst::ErrorMessage>> + Send {
525 future::ready(Ok(()))
526 }
527
528 fn handle_loop_error(&mut self, err: gst::FlowError) -> impl Future<Output = Trigger> + Send {
543 async move {
544 match err {
545 gst::FlowError::Flushing => {
546 gst::debug!(
547 RUNTIME_CAT,
548 obj = self.obj(),
549 "Task loop returned Flushing. Posting FlushStart"
550 );
551 Trigger::FlushStart
552 }
553 gst::FlowError::Eos => {
554 gst::debug!(
555 RUNTIME_CAT,
556 obj = self.obj(),
557 "Task loop returned Eos. Posting Stop"
558 );
559 Trigger::Stop
560 }
561 other => {
562 gst::error!(
563 RUNTIME_CAT,
564 obj = self.obj(),
565 "Task loop returned {other:?}. Posting Error",
566 );
567 Trigger::Error
568 }
569 }
570 }
571 }
572
573 fn handle_action_error(
584 &mut self,
585 trigger: Trigger,
586 state: TaskState,
587 err: gst::ErrorMessage,
588 ) -> impl Future<Output = Trigger> + Send {
589 async move {
590 gst::error!(
591 RUNTIME_CAT,
592 obj = self.obj(),
593 "TaskImpl transition action error during {trigger:?} from {state:?}: {err:?}. Posting Trigger::Error",
594 );
595
596 Trigger::Error
597 }
598 }
599}
600
/// Sending half of the oneshot channel used to acknowledge a triggering event.
type AckSender = oneshot::Sender<Result<TransitionOk, TransitionError>>;
/// Receiving half of the oneshot channel used to acknowledge a triggering event.
type AckReceiver = oneshot::Receiver<Result<TransitionOk, TransitionError>>;
603
/// A `Trigger` together with the channel used to report its outcome.
struct TriggeringEvent {
    /// The requested trigger.
    trigger: Trigger,
    /// Sender used to acknowledge the handling of the trigger.
    ack_tx: AckSender,
}
608
609impl TriggeringEvent {
610 fn new(trigger: Trigger) -> (Self, AckReceiver) {
611 let (ack_tx, ack_rx) = oneshot::channel();
612 let req = TriggeringEvent { trigger, ack_tx };
613
614 (req, ack_rx)
615 }
616
617 fn send_ack(self, res: Result<TransitionOk, TransitionError>) {
618 let _ = self.ack_tx.send(res);
619 }
620
621 fn send_err_ack(self) {
622 let res = Err(TransitionError {
623 trigger: self.trigger,
624 state: TaskState::Error,
625 err_msg: gst::error_msg!(
626 gst::CoreError::StateChange,
627 [
628 "Triggering Event {:?} rejected due to a previous unrecoverable error",
629 self.trigger,
630 ]
631 ),
632 });
633
634 self.send_ack(res);
635 }
636}
637
638impl fmt::Debug for TriggeringEvent {
639 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
640 f.debug_struct("TriggeringEvent")
641 .field("trigger", &self.trigger)
642 .finish()
643 }
644}
645
/// Handle on a spawned state machine.
#[derive(Debug)]
struct StateMachineHandle {
    /// Join handle of the future running the state machine.
    join_handle: JoinHandle<()>,
    /// Channel used to push triggering events to the state machine.
    triggering_evt_tx: async_mpsc::Sender<TriggeringEvent>,
    /// `Context` on which the state machine was spawned.
    context: Context,
}
652
653impl StateMachineHandle {
654 fn trigger(&mut self, trigger: Trigger) -> AckReceiver {
655 let (triggering_evt, ack_rx) = TriggeringEvent::new(trigger);
656
657 gst::log!(RUNTIME_CAT, "Pushing {triggering_evt:?}");
658 self.triggering_evt_tx.try_send(triggering_evt).unwrap();
659
660 self.context.unpark();
661
662 ack_rx
663 }
664
665 async fn join(self) {
666 self.join_handle
667 .await
668 .expect("state machine shouldn't have been cancelled");
669 }
670}
671
/// State shared between the `Task` handles and the state machine.
#[derive(Debug)]
struct TaskInner {
    /// Current state of the task.
    state: TaskState,
    /// Handle on the state machine. `None` when no state machine is running
    /// or once unprepare was requested.
    state_machine_handle: Option<StateMachineHandle>,
}
677
678impl Default for TaskInner {
679 fn default() -> Self {
680 TaskInner {
681 state: TaskState::Unprepared,
682 state_machine_handle: None,
683 }
684 }
685}
686
687impl TaskInner {
688 fn switch_to_state(&mut self, target_state: TaskState, triggering_evt: TriggeringEvent) {
689 let res = Ok(TransitionOk::Complete {
690 origin: self.state,
691 target: target_state,
692 });
693
694 self.state = target_state;
695 triggering_evt.send_ack(res);
696 }
697
698 fn switch_to_err(&mut self, triggering_evt: TriggeringEvent) {
699 let res = Err(TransitionError {
700 trigger: triggering_evt.trigger,
701 state: self.state,
702 err_msg: gst::error_msg!(
703 gst::CoreError::StateChange,
704 [
705 "Unrecoverable error for {triggering_evt:?} from state {:?}",
706 self.state,
707 ]
708 ),
709 });
710
711 self.state = TaskState::Error;
712 triggering_evt.send_ack(res);
713 }
714
715 fn skip_triggering_evt(&mut self, triggering_evt: TriggeringEvent) {
716 let res = Ok(TransitionOk::Skipped {
717 trigger: triggering_evt.trigger,
718 state: self.state,
719 });
720
721 triggering_evt.send_ack(res);
722 }
723
724 fn trigger(&mut self, trigger: Trigger) -> Result<AckReceiver, TransitionError> {
725 self.state_machine_handle
726 .as_mut()
727 .map(|state_machine| state_machine.trigger(trigger))
728 .ok_or_else(|| {
729 gst::warning!(RUNTIME_CAT, "Unable to send {trigger:?}: no state machine",);
730 TransitionError {
731 trigger,
732 state: TaskState::Unprepared,
733 err_msg: gst::error_msg!(
734 gst::ResourceError::NotFound,
735 ["Unable to send {trigger:?}: no state machine"]
736 ),
737 }
738 })
739 }
740}
741
742impl Drop for TaskInner {
743 fn drop(&mut self) {
744 if self.state != TaskState::Unprepared {
745 gst::fixme!(RUNTIME_CAT, "Missing call to `Task::unprepare`");
749 }
750 }
751}
752
/// Guard which dereferences to the `TaskState` and keeps the internal state
/// of the `Task` locked for as long as it is alive.
pub struct TaskStateGuard<'guard>(MutexGuard<'guard, TaskInner>);
755
756impl Deref for TaskStateGuard<'_> {
757 type Target = TaskState;
758
759 fn deref(&self) -> &Self::Target {
760 &(self.0).state
761 }
762}
763
/// Handle on a task driven by a state machine.
///
/// Cloning a `Task` yields another handle sharing the same internal state.
#[derive(Debug, Clone)]
pub struct Task(Arc<Mutex<TaskInner>>);
769
770impl Default for Task {
771 fn default() -> Self {
772 Task(Arc::new(Mutex::new(TaskInner::default())))
773 }
774}
775
776impl Task {
777 pub fn state(&self) -> TaskState {
778 self.0.lock().unwrap().state
779 }
780
781 pub fn lock_state(&self) -> TaskStateGuard<'_> {
782 TaskStateGuard(self.0.lock().unwrap())
783 }
784
785 pub fn prepare(&self, task_impl: impl TaskImpl, context: Context) -> TransitionStatus {
786 let mut inner = self.0.lock().unwrap();
787
788 let origin = inner.state;
789 match origin {
790 TaskState::Unprepared => (),
791 TaskState::Prepared | TaskState::Preparing => {
792 gst::debug!(
793 RUNTIME_CAT,
794 obj = task_impl.obj(),
795 "Task already {origin:?}",
796 );
797 return TransitionOk::Skipped {
798 trigger: Trigger::Prepare,
799 state: origin,
800 }
801 .into();
802 }
803 state => {
804 gst::warning!(
805 RUNTIME_CAT,
806 obj = task_impl.obj(),
807 "Attempt to prepare Task in state {state:?}"
808 );
809 return TransitionError {
810 trigger: Trigger::Prepare,
811 state: inner.state,
812 err_msg: gst::error_msg!(
813 gst::CoreError::StateChange,
814 ["Attempt to prepare Task in state {state:?}"]
815 ),
816 }
817 .into();
818 }
819 }
820
821 assert!(inner.state_machine_handle.is_none());
822
823 inner.state = TaskState::Preparing;
824
825 gst::log!(
826 RUNTIME_CAT,
827 obj = task_impl.obj(),
828 "Spawning task state machine"
829 );
830 inner.state_machine_handle = Some(StateMachine::spawn(self.0.clone(), task_impl, context));
831
832 let ack_rx = match inner.trigger(Trigger::Prepare) {
833 Ok(ack_rx) => ack_rx,
834 Err(err) => return err.into(),
835 };
836 drop(inner);
837
838 TransitionStatus::Pending {
839 trigger: Trigger::Prepare,
840 origin: TaskState::Unprepared,
841 res_fut: Box::pin(ack_rx.map(Result::unwrap)),
842 }
843 }
844
845 pub fn unprepare(&self) -> TransitionStatus {
846 let mut inner = self.0.lock().unwrap();
847
848 let origin = inner.state;
849 let mut state_machine_handle = match origin {
850 TaskState::Stopped
851 | TaskState::Error
852 | TaskState::Prepared
853 | TaskState::Preparing
854 | TaskState::Unprepared => match inner.state_machine_handle.take() {
855 Some(state_machine_handle) => {
856 gst::debug!(RUNTIME_CAT, "Unpreparing task");
857
858 state_machine_handle
859 }
860 None => {
861 gst::debug!(RUNTIME_CAT, "Task already unpreparing");
862 return TransitionOk::Skipped {
863 trigger: Trigger::Unprepare,
864 state: origin,
865 }
866 .into();
867 }
868 },
869 state => {
870 gst::warning!(RUNTIME_CAT, "Attempt to unprepare Task in state {state:?}");
871 return TransitionError {
872 trigger: Trigger::Unprepare,
873 state: inner.state,
874 err_msg: gst::error_msg!(
875 gst::CoreError::StateChange,
876 ["Attempt to unprepare Task in state {state:?}"]
877 ),
878 }
879 .into();
880 }
881 };
882
883 let ack_rx = state_machine_handle.trigger(Trigger::Unprepare);
884 drop(inner);
885
886 let state_machine_end_fut = async {
887 state_machine_handle.join().await;
888 ack_rx.await.unwrap()
889 };
890
891 TransitionStatus::Pending {
892 trigger: Trigger::Unprepare,
893 origin,
894 res_fut: Box::pin(state_machine_end_fut),
895 }
896 }
897
898 pub fn start(&self) -> TransitionStatus {
902 let mut inner = self.0.lock().unwrap();
903
904 if let TaskState::Started = inner.state {
905 return TransitionOk::Skipped {
906 trigger: Trigger::Start,
907 state: TaskState::Started,
908 }
909 .into();
910 }
911
912 let ack_rx = match inner.trigger(Trigger::Start) {
913 Ok(ack_rx) => ack_rx,
914 Err(err) => return err.into(),
915 };
916
917 let origin = inner.state;
918 drop(inner);
919
920 TransitionStatus::Pending {
921 trigger: Trigger::Start,
922 origin,
923 res_fut: Box::pin(ack_rx.map(Result::unwrap)),
924 }
925 }
926
    /// Requests the task to pause by pushing `Trigger::Pause` to the state
    /// machine.
    pub fn pause(&self) -> TransitionStatus {
        self.push_pending(Trigger::Pause)
    }

    /// Requests the task to start flushing by pushing `Trigger::FlushStart`
    /// to the state machine.
    pub fn flush_start(&self) -> TransitionStatus {
        self.push_pending(Trigger::FlushStart)
    }

    /// Requests the task to stop flushing by pushing `Trigger::FlushStop` to
    /// the state machine.
    pub fn flush_stop(&self) -> TransitionStatus {
        self.push_pending(Trigger::FlushStop)
    }

    /// Requests the task to stop by pushing `Trigger::Stop` to the state
    /// machine.
    pub fn stop(&self) -> TransitionStatus {
        self.push_pending(Trigger::Stop)
    }
947
948 fn push_pending(&self, trigger: Trigger) -> TransitionStatus {
950 let mut inner = self.0.lock().unwrap();
951
952 let ack_rx = match inner.trigger(trigger) {
953 Ok(ack_rx) => ack_rx,
954 Err(err) => return err.into(),
955 };
956
957 let origin = inner.state;
958 drop(inner);
959
960 TransitionStatus::Pending {
961 trigger,
962 origin,
963 res_fut: Box::pin(ack_rx.map(Result::unwrap)),
964 }
965 }
966}
967
/// State machine driving a `TaskImpl` according to the received triggers.
struct StateMachine<Task: TaskImpl> {
    /// Implementation on which the transition actions and the loop functions
    /// are called.
    task_impl: Task,
    /// Channel on which the triggering events are received.
    triggering_evt_rx: async_mpsc::Receiver<TriggeringEvent>,
    /// Triggering event to handle before popping a new one from the channel.
    pending_triggering_evt: Option<TriggeringEvent>,
}
973
/// Awaits the `$action` method of `$self.task_impl`.
///
/// Evaluates to `Ok($triggering_evt)` if the action succeeded.
///
/// If the action failed, `TaskImpl::handle_action_error` is called with the
/// trigger, `$origin` and the error. The trigger it returns replaces the one
/// of `$triggering_evt`, which is then stored in
/// `$self.pending_triggering_evt` so that the state machine handles it next.
/// The macro evaluates to `Err(())` in that case.
///
/// `$triggering_evt` must be a mutable binding. `$task_inner` is accepted
/// but not used by the expansion.
macro_rules! exec_action {
    ($self:ident, $action:ident, $triggering_evt:expr, $origin:expr, $task_inner:expr) => {{
        match $self.task_impl.$action().await {
            Ok(()) => Ok($triggering_evt),
            Err(err) => {
                // Let the implementation decide what to do next.
                let next_trigger = $self
                    .task_impl
                    .handle_action_error($triggering_evt.trigger, $origin, err)
                    .await;

                gst::trace!(
                    RUNTIME_CAT,
                    obj = $self.task_impl.obj(),
                    "TaskImpl transition action error: converting {:?} to {next_trigger:?}",
                    $triggering_evt.trigger,
                );

                // Reuse the event so that the original ack channel reports
                // the outcome of the replacement trigger.
                $triggering_evt.trigger = next_trigger;
                $self.pending_triggering_evt = Some($triggering_evt);

                Err(())
            }
        }
    }};
}
1003
1004impl<Task: TaskImpl> StateMachine<Task> {
1005 fn spawn(
1006 task_inner: Arc<Mutex<TaskInner>>,
1007 task_impl: Task,
1008 context: Context,
1009 ) -> StateMachineHandle {
1010 let (triggering_evt_tx, triggering_evt_rx) = async_mpsc::channel(4);
1011
1012 let state_machine = StateMachine {
1013 task_impl,
1014 triggering_evt_rx,
1015 pending_triggering_evt: None,
1016 };
1017
1018 StateMachineHandle {
1019 join_handle: context.spawn_and_unpark(state_machine.run(task_inner)),
1020 triggering_evt_tx,
1021 context,
1022 }
1023 }
1024
    /// Runs the state machine until `Trigger::Unprepare` is handled.
    ///
    /// The first triggering event must be `Trigger::Prepare`. Triggering
    /// events are then handled one at a time, `pending_triggering_evt` taking
    /// precedence over the channel.
    ///
    /// # Panics
    ///
    /// Panics if the first trigger is not `Trigger::Prepare`, if the
    /// triggering event channel is closed, if the `task_inner` mutex is
    /// poisoned or if `Trigger::Prepare` is received again.
    async fn run(mut self, task_inner: Arc<Mutex<TaskInner>>) {
        let mut triggering_evt = self
            .triggering_evt_rx
            .next()
            .await
            .expect("triggering_evt_rx dropped");

        if let Trigger::Prepare = triggering_evt.trigger {
            gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Preparing task");

            let res = exec_action!(
                self,
                prepare,
                triggering_evt,
                TaskState::Unprepared,
                &task_inner
            );
            // On failure, the replacement trigger is handled by the loop below.
            if let Ok(triggering_evt) = res {
                let mut task_inner = task_inner.lock().unwrap();
                let res = Ok(TransitionOk::Complete {
                    origin: TaskState::Unprepared,
                    target: TaskState::Prepared,
                });

                task_inner.state = TaskState::Prepared;
                triggering_evt.send_ack(res);

                gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Task Prepared");
            }
        } else {
            panic!("Unexpected initial trigger {:?}", triggering_evt.trigger);
        }

        loop {
            // A pending event (replacement trigger after an error, or event
            // intercepted by the task loop) takes precedence over the channel.
            triggering_evt = match self.pending_triggering_evt.take() {
                Some(pending_triggering_evt) => pending_triggering_evt,
                None => self
                    .triggering_evt_rx
                    .next()
                    .await
                    .expect("triggering_evt_rx dropped"),
            };
            gst::trace!(
                RUNTIME_CAT,
                obj = self.task_impl.obj(),
                "State machine popped {triggering_evt:?}"
            );

            match triggering_evt.trigger {
                Trigger::Error => {
                    let mut task_inner = task_inner.lock().unwrap();
                    task_inner.switch_to_err(triggering_evt);
                    gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Switched to Error");
                }
                Trigger::Start => {
                    // The lock is scoped so that it is released before awaiting.
                    let origin = {
                        let mut task_inner = task_inner.lock().unwrap();
                        let origin = task_inner.state;
                        match origin {
                            TaskState::Stopped | TaskState::Paused | TaskState::Prepared => (),
                            TaskState::PausedFlushing => {
                                // Still flushing: only leave the paused state.
                                task_inner.switch_to_state(TaskState::Flushing, triggering_evt);
                                gst::trace!(
                                    RUNTIME_CAT,
                                    obj = self.task_impl.obj(),
                                    "Switched from PausedFlushing to Flushing"
                                );
                                continue;
                            }
                            TaskState::Error => {
                                triggering_evt.send_err_ack();
                                continue;
                            }
                            state => {
                                task_inner.skip_triggering_evt(triggering_evt);
                                gst::trace!(
                                    RUNTIME_CAT,
                                    obj = self.task_impl.obj(),
                                    "Skipped Start in state {state:?}"
                                );
                                continue;
                            }
                        }

                        origin
                    };

                    // Runs the start action, then the task loop until a
                    // trigger is received or the loop fails.
                    self.start(triggering_evt, origin, &task_inner).await;
                }
                Trigger::Pause => {
                    let (origin, target) = {
                        let mut task_inner = task_inner.lock().unwrap();
                        let origin = task_inner.state;
                        match origin {
                            TaskState::Started | TaskState::Stopped | TaskState::Prepared => {
                                (origin, TaskState::Paused)
                            }
                            TaskState::Flushing => (origin, TaskState::PausedFlushing),
                            // The action is still executed, but the task
                            // remains in the Error state.
                            TaskState::Error => (TaskState::Error, TaskState::Error),
                            state => {
                                task_inner.skip_triggering_evt(triggering_evt);
                                gst::trace!(
                                    RUNTIME_CAT,
                                    obj = self.task_impl.obj(),
                                    "Skipped Pause in state {state:?}"
                                );
                                continue;
                            }
                        }
                    };

                    let res = exec_action!(self, pause, triggering_evt, origin, &task_inner);
                    if let Ok(triggering_evt) = res {
                        task_inner
                            .lock()
                            .unwrap()
                            .switch_to_state(target, triggering_evt);
                        gst::trace!(
                            RUNTIME_CAT,
                            obj = self.task_impl.obj(),
                            "Task loop {target:?}"
                        );
                    }
                }
                Trigger::Stop => {
                    let (origin, target) = {
                        let mut task_inner = task_inner.lock().unwrap();
                        let origin = task_inner.state;
                        match origin {
                            TaskState::Started
                            | TaskState::Paused
                            | TaskState::PausedFlushing
                            | TaskState::Flushing => (origin, TaskState::Stopped),
                            // The action is still executed, but the task
                            // remains in the Error state.
                            TaskState::Error => (TaskState::Error, TaskState::Error),
                            state => {
                                task_inner.skip_triggering_evt(triggering_evt);
                                gst::trace!(
                                    RUNTIME_CAT,
                                    obj = self.task_impl.obj(),
                                    "Skipped Stop in state {state:?}"
                                );
                                continue;
                            }
                        }
                    };

                    let res = exec_action!(self, stop, triggering_evt, origin, &task_inner);
                    if let Ok(triggering_evt) = res {
                        task_inner
                            .lock()
                            .unwrap()
                            .switch_to_state(target, triggering_evt);
                        gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Task {target:?}");
                    }
                }
                Trigger::FlushStart => {
                    let (origin, target) = {
                        let mut task_inner = task_inner.lock().unwrap();
                        let origin = task_inner.state;
                        match origin {
                            TaskState::Started => (origin, TaskState::Flushing),
                            TaskState::Paused => (origin, TaskState::PausedFlushing),
                            // The action is still executed, but the task
                            // remains in the Error state.
                            TaskState::Error => (TaskState::Error, TaskState::Error),
                            state => {
                                task_inner.skip_triggering_evt(triggering_evt);
                                gst::trace!(
                                    RUNTIME_CAT,
                                    obj = self.task_impl.obj(),
                                    "Skipped FlushStart in state {state:?}"
                                );
                                continue;
                            }
                        }
                    };

                    let res = exec_action!(self, flush_start, triggering_evt, origin, &task_inner);
                    if let Ok(triggering_evt) = res {
                        task_inner
                            .lock()
                            .unwrap()
                            .switch_to_state(target, triggering_evt);
                        gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Task {target:?}");
                    }
                }
                Trigger::FlushStop => {
                    // The lock is only held while reading the state here.
                    let origin = task_inner.lock().unwrap().state;
                    let is_paused = match origin {
                        TaskState::Flushing => false,
                        TaskState::PausedFlushing => true,
                        TaskState::Error => {
                            triggering_evt.send_err_ack();
                            continue;
                        }
                        state => {
                            task_inner
                                .lock()
                                .unwrap()
                                .skip_triggering_evt(triggering_evt);
                            gst::trace!(
                                RUNTIME_CAT,
                                obj = self.task_impl.obj(),
                                "Skipped FlushStop in state {state:?}"
                            );
                            continue;
                        }
                    };

                    let res = exec_action!(self, flush_stop, triggering_evt, origin, &task_inner);
                    if let Ok(triggering_evt) = res {
                        if is_paused {
                            task_inner
                                .lock()
                                .unwrap()
                                .switch_to_state(TaskState::Paused, triggering_evt);
                            gst::trace!(
                                RUNTIME_CAT,
                                obj = self.task_impl.obj(),
                                "Switched from PausedFlushing to Paused"
                            );
                        } else {
                            // The task was started before flushing: restart it.
                            self.start(triggering_evt, origin, &task_inner).await;
                        }
                    }
                }
                Trigger::Unprepare => {
                    // Unprepare is executed whatever the current state.
                    self.task_impl.unprepare().await;

                    task_inner
                        .lock()
                        .unwrap()
                        .switch_to_state(TaskState::Unprepared, triggering_evt);

                    break;
                }
                // Only `Trigger::Prepare` remains: it is handled before the loop.
                _ => unreachable!("State machine handler {:?}", triggering_evt),
            }
        }

        gst::trace!(
            RUNTIME_CAT,
            obj = self.task_impl.obj(),
            "Task state machine terminated"
        );
    }
1272
1273 async fn start(
1274 &mut self,
1275 mut triggering_evt: TriggeringEvent,
1276 origin: TaskState,
1277 task_inner: &Arc<Mutex<TaskInner>>,
1278 ) {
1279 match exec_action!(self, start, triggering_evt, origin, &task_inner) {
1280 Ok(triggering_evt) => {
1281 let mut task_inner = task_inner.lock().unwrap();
1282 task_inner.switch_to_state(TaskState::Started, triggering_evt);
1283 }
1284 Err(_) => {
1285 return;
1287 }
1288 }
1289
1290 match self.run_loop().await {
1291 Ok(()) => (),
1292 Err(err) => {
1293 let next_trigger = self.task_impl.handle_loop_error(err).await;
1294 let (triggering_evt, _) = TriggeringEvent::new(next_trigger);
1295 self.pending_triggering_evt = Some(triggering_evt);
1296 }
1297 }
1298 }
1299
    /// Iterates on `TaskImpl::try_next` then `TaskImpl::handle_item`.
    ///
    /// Returns `Ok(())` when a triggering event is received, in which case
    /// the event is stored in `pending_triggering_evt`. Returns the error if
    /// either `try_next` or `handle_item` fails.
    ///
    /// # Panics
    ///
    /// Panics if the triggering event channel is closed.
    async fn run_loop(&mut self) -> Result<(), gst::FlowError> {
        gst::trace!(RUNTIME_CAT, obj = self.task_impl.obj(), "Task loop started");

        let mut try_next_res;
        loop {
            try_next_res = {
                let mut try_next_fut = pin!(self.task_impl.try_next().fuse());
                // `select_biased!` polls the branches in order: incoming
                // triggers take priority over `try_next`.
                futures::select_biased! {
                    triggering_evt = self.triggering_evt_rx.next() => {
                        let triggering_evt = triggering_evt.expect("broken state machine channel");
                        gst::trace!(
                            RUNTIME_CAT,
                            "Task loop handing {:?} to state machine",
                            triggering_evt,
                        );
                        // `try_next_fut` is dropped when returning here.
                        self.pending_triggering_evt = Some(triggering_evt);
                        return Ok(());
                    }
                    try_next_res = try_next_fut => try_next_res,
                }
            };

            let item = try_next_res.inspect_err(|err| {
                gst::debug!(
                    RUNTIME_CAT,
                    obj = self.task_impl.obj(),
                    "TaskImpl::try_next returned {err:?}"
                );
            })?;

            // `handle_item` is not raced against the triggers: it runs to
            // completion before the channel is checked again.
            self.task_impl.handle_item(item).await.inspect_err(|&err| {
                gst::debug!(
                    RUNTIME_CAT,
                    obj = self.task_impl.obj(),
                    "TaskImpl::handle_item returned {err:?}"
                );
            })?;
        }
    }
1345}
1346
1347#[cfg(test)]
1348mod tests {
1349 use futures::channel::{mpsc, oneshot};
1350 use futures::executor::block_on;
1351 use futures::prelude::*;
1352 use gst::glib;
1353 use gst::glib::prelude::*;
1354 use std::future::pending;
1355 use std::time::Duration;
1356
1357 use super::{
1358 Task, TaskImpl,
1359 TaskState::{self, *},
1360 TransitionError, TransitionOk,
1361 TransitionOk::*,
1362 TransitionStatus,
1363 TransitionStatus::*,
1364 Trigger::{self, *},
1365 };
1366 use crate::runtime::{Context, RUNTIME_CAT};
1367
1368 impl TransitionStatus {
1369 fn block_on(self) -> Result<TransitionOk, TransitionError> {
1372 assert!(!Context::is_context_thread());
1373 match self {
1374 Pending {
1375 trigger, res_fut, ..
1376 } => {
1377 gst::debug!(
1378 RUNTIME_CAT,
1379 "Awaiting for {:?} ack on current thread",
1380 trigger,
1381 );
1382 futures::executor::block_on(res_fut)
1383 }
1384 Ready(res) => res,
1385 }
1386 }
1387 }
1388
1389 #[track_caller]
1390 fn stop_then_unprepare(task: Task) {
1391 task.stop().block_on().unwrap();
1392 task.unprepare().block_on().unwrap();
1393 }
1394
1395 #[test]
1396 fn nominal() {
1397 gst::init().unwrap();
1398
1399 struct TaskTest {
1400 obj: gst::Object,
1401 prepared_sender: mpsc::Sender<()>,
1402 started_sender: mpsc::Sender<()>,
1403 try_next_ready_sender: mpsc::Sender<()>,
1404 try_next_receiver: mpsc::Receiver<()>,
1405 handle_item_ready_sender: mpsc::Sender<()>,
1406 handle_item_sender: mpsc::Sender<()>,
1407 paused_sender: mpsc::Sender<()>,
1408 stopped_sender: mpsc::Sender<()>,
1409 unprepared_sender: mpsc::Sender<()>,
1410 }
1411
1412 impl TaskImpl for TaskTest {
1413 type Item = ();
1414
1415 fn obj(&self) -> &impl IsA<glib::Object> {
1416 &self.obj
1417 }
1418
1419 async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> {
1420 gst::debug!(RUNTIME_CAT, "nominal: prepared");
1421 self.prepared_sender.send(()).await.unwrap();
1422 Ok(())
1423 }
1424
1425 async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
1426 gst::debug!(RUNTIME_CAT, "nominal: started");
1427 self.started_sender.send(()).await.unwrap();
1428 Ok(())
1429 }
1430
1431 async fn try_next(&mut self) -> Result<(), gst::FlowError> {
1432 gst::debug!(RUNTIME_CAT, "nominal: entering try_next");
1433 self.try_next_ready_sender.send(()).await.unwrap();
1434 gst::debug!(RUNTIME_CAT, "nominal: awaiting try_next");
1435 self.try_next_receiver.next().await.unwrap();
1436 Ok(())
1437 }
1438
1439 async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
1440 gst::debug!(RUNTIME_CAT, "nominal: entering handle_item");
1441 self.handle_item_ready_sender.send(()).await.unwrap();
1442
1443 gst::debug!(RUNTIME_CAT, "nominal: locked in handle_item");
1444 self.handle_item_sender.send(()).await.unwrap();
1445 gst::debug!(RUNTIME_CAT, "nominal: leaving handle_item");
1446
1447 Ok(())
1448 }
1449
1450 async fn pause(&mut self) -> Result<(), gst::ErrorMessage> {
1451 gst::debug!(RUNTIME_CAT, "nominal: paused");
1452 self.paused_sender.send(()).await.unwrap();
1453 Ok(())
1454 }
1455
1456 async fn stop(&mut self) -> Result<(), gst::ErrorMessage> {
1457 gst::debug!(RUNTIME_CAT, "nominal: stopped");
1458 self.stopped_sender.send(()).await.unwrap();
1459 Ok(())
1460 }
1461
1462 async fn unprepare(&mut self) {
1463 gst::debug!(RUNTIME_CAT, "nominal: unprepared");
1464 self.unprepared_sender.send(()).await.unwrap();
1465 }
1466 }
1467
1468 let context = Context::acquire("nominal", Duration::from_millis(2)).unwrap();
1469
1470 let task = Task::default();
1471
1472 assert_eq!(task.state(), Unprepared);
1473
1474 gst::debug!(RUNTIME_CAT, "nominal: preparing");
1475
1476 let (prepared_sender, mut prepared_receiver) = mpsc::channel(1);
1477 let (started_sender, mut started_receiver) = mpsc::channel(1);
1478 let (try_next_ready_sender, mut try_next_ready_receiver) = mpsc::channel(1);
1479 let (mut try_next_sender, try_next_receiver) = mpsc::channel(1);
1480 let (handle_item_ready_sender, mut handle_item_ready_receiver) = mpsc::channel(1);
1481 let (handle_item_sender, mut handle_item_receiver) = mpsc::channel(0);
1482 let (paused_sender, mut paused_receiver) = mpsc::channel(1);
1483 let (stopped_sender, mut stopped_receiver) = mpsc::channel(1);
1484 let (unprepared_sender, mut unprepared_receiver) = mpsc::channel(1);
1485 let obj = gst::Pad::builder(gst::PadDirection::Unknown)
1486 .name("runtime::Task::nominal")
1487 .build();
1488 let prepare_status = task.prepare(
1489 TaskTest {
1490 obj: obj.clone().into(),
1491 prepared_sender,
1492 started_sender,
1493 try_next_ready_sender,
1494 try_next_receiver,
1495 handle_item_ready_sender,
1496 handle_item_sender,
1497 paused_sender,
1498 stopped_sender,
1499 unprepared_sender,
1500 },
1501 context,
1502 );
1503
1504 assert!(prepare_status.is_pending());
1505 match prepare_status {
1506 Pending {
1507 trigger: Prepare,
1508 origin: Unprepared,
1509 ..
1510 } => (),
1511 other => panic!("{other:?}"),
1512 };
1513
1514 gst::debug!(RUNTIME_CAT, "nominal: starting (async prepare)");
1515 let start_status = task.start().check().unwrap();
1516
1517 block_on(prepared_receiver.next()).unwrap();
1518 assert_eq!(
1520 prepare_status.block_on_or_add_subtask(&obj).unwrap(),
1521 Complete {
1522 origin: Unprepared,
1523 target: Prepared,
1524 },
1525 );
1526
1527 block_on(started_receiver.next()).unwrap();
1528 assert_eq!(
1529 start_status.block_on().unwrap(),
1530 Complete {
1531 origin: Prepared,
1532 target: Started,
1533 }
1534 );
1535 assert_eq!(task.state(), Started);
1536
1537 block_on(try_next_ready_receiver.next()).unwrap();
1539 block_on(try_next_sender.send(())).unwrap();
1540 block_on(handle_item_ready_receiver.next()).unwrap();
1541 block_on(handle_item_receiver.next()).unwrap();
1542
1543 gst::debug!(RUNTIME_CAT, "nominal: starting (redundant)");
1544 assert_eq!(
1546 task.start().block_on().unwrap(),
1547 Skipped {
1548 trigger: Start,
1549 state: Started,
1550 },
1551 );
1552 assert_eq!(task.state(), Started);
1553
1554 match task.unprepare().check().unwrap_err() {
1556 TransitionError {
1557 trigger: Unprepare,
1558 state: Started,
1559 ..
1560 } => (),
1561 other => panic!("{other:?}"),
1562 }
1563
1564 gst::debug!(RUNTIME_CAT, "nominal: pause cancelling try_next");
1565 block_on(try_next_ready_receiver.next()).unwrap();
1566
1567 let pause_status = task.pause().check().unwrap();
1568 gst::debug!(RUNTIME_CAT, "nominal: awaiting pause ack");
1569 block_on(paused_receiver.next()).unwrap();
1570 assert_eq!(
1571 pause_status.block_on().unwrap(),
1572 Complete {
1573 origin: Started,
1574 target: Paused,
1575 },
1576 );
1577
1578 assert!(handle_item_ready_receiver.try_recv().is_err());
1580 assert!(try_next_ready_receiver.try_recv().is_err());
1582
1583 gst::debug!(
1584 RUNTIME_CAT,
1585 "nominal: starting (after pause cancelling try_next)"
1586 );
1587 let start_receiver = task.start().check().unwrap();
1588 block_on(started_receiver.next()).unwrap();
1589 assert_eq!(
1590 start_receiver.block_on().unwrap(),
1591 Complete {
1592 origin: Paused,
1593 target: Started,
1594 },
1595 );
1596 assert_eq!(task.state(), Started);
1597
1598 gst::debug!(RUNTIME_CAT, "nominal: pause // handle_item");
1599 block_on(try_next_ready_receiver.next()).unwrap();
1600 block_on(try_next_sender.send(())).unwrap();
1601 block_on(handle_item_ready_receiver.next()).unwrap();
1603
1604 gst::debug!(RUNTIME_CAT, "nominal: requesting to pause");
1605 let pause_status = task.pause().check().unwrap();
1606
1607 gst::debug!(RUNTIME_CAT, "nominal: unlocking item handling");
1608 block_on(handle_item_receiver.next()).unwrap();
1609
1610 gst::debug!(RUNTIME_CAT, "nominal: awaiting pause ack");
1611 block_on(paused_receiver.next()).unwrap();
1612 assert_eq!(
1613 pause_status.block_on().unwrap(),
1614 Complete {
1615 origin: Started,
1616 target: Paused,
1617 },
1618 );
1619
1620 assert!(try_next_ready_receiver.try_recv().is_err());
1622
1623 gst::debug!(
1624 RUNTIME_CAT,
1625 "nominal: starting (after pause // handle_item)"
1626 );
1627 let start_receiver = task.start().check().unwrap();
1628 block_on(started_receiver.next()).unwrap();
1629 assert_eq!(
1630 start_receiver.block_on().unwrap(),
1631 Complete {
1632 origin: Paused,
1633 target: Started,
1634 },
1635 );
1636 assert_eq!(task.state(), Started);
1637
1638 gst::debug!(RUNTIME_CAT, "nominal: stopping");
1639 assert_eq!(
1640 task.stop().block_on().unwrap(),
1641 Complete {
1642 origin: Started,
1643 target: Stopped,
1644 },
1645 );
1646
1647 assert_eq!(task.state(), Stopped);
1648 let _ = block_on(stopped_receiver.next());
1649
1650 let _ = try_next_ready_receiver.try_recv();
1652
1653 gst::debug!(RUNTIME_CAT, "nominal: starting (after stop)");
1654 assert_eq!(
1655 task.start().block_on().unwrap(),
1656 Complete {
1657 origin: Stopped,
1658 target: Started,
1659 },
1660 );
1661 let _ = block_on(started_receiver.next());
1662
1663 gst::debug!(RUNTIME_CAT, "nominal: stopping");
1664 assert_eq!(
1665 task.stop().block_on().unwrap(),
1666 Complete {
1667 origin: Started,
1668 target: Stopped,
1669 },
1670 );
1671
1672 assert_eq!(
1673 task.unprepare().block_on().unwrap(),
1674 Complete {
1675 origin: Stopped,
1676 target: Unprepared,
1677 },
1678 );
1679
1680 assert_eq!(task.state(), Unprepared);
1681 let _ = block_on(unprepared_receiver.next());
1682 }
1683
#[test]
fn prepare_error() {
    gst::init().unwrap();

    /// Task implementation whose `prepare` action always fails and which
    /// signals the resulting `handle_action_error` call over a channel.
    struct TaskPrepareTest {
        obj: gst::Object,
        prepare_error_sender: mpsc::Sender<()>,
    }

    impl TaskImpl for TaskPrepareTest {
        type Item = ();

        fn obj(&self) -> &impl IsA<glib::Object> {
            &self.obj
        }

        // This test treats any call into the iteration loop as a failure.
        async fn try_next(&mut self) -> Result<(), gst::FlowError> {
            unreachable!("prepare_error: try_next");
        }

        async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
            unreachable!("prepare_error: handle_item");
        }

        async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> {
            gst::debug!(RUNTIME_CAT, "prepare_error: prepare returning an error");
            let failure = gst::error_msg!(
                gst::ResourceError::Failed,
                ["prepare_error: intentional error"]
            );
            Err(failure)
        }

        async fn handle_action_error(
            &mut self,
            trigger: Trigger,
            state: TaskState,
            err: gst::ErrorMessage,
        ) -> Trigger {
            gst::debug!(RUNTIME_CAT, "prepare_error: handling prepare error {:?}", err);
            // Only a failing `Prepare` issued from `Unprepared` is expected here.
            match (trigger, state) {
                (Prepare, Unprepared) => self.prepare_error_sender.send(()).await.unwrap(),
                other => unreachable!("{:?}", other),
            }
            Trigger::Error
        }
    }

    let context = Context::acquire("prepare_error", Duration::from_millis(2)).unwrap();
    let task = Task::default();
    assert_eq!(task.state(), Unprepared);

    let pad: gst::Object = gst::Pad::builder(gst::PadDirection::Unknown)
        .name("runtime::Task::prepare_error")
        .build()
        .into();
    let (error_tx, mut error_rx) = mpsc::channel(1);
    let prepare_status = task.prepare(
        TaskPrepareTest {
            obj: pad,
            prepare_error_sender: error_tx,
        },
        context,
    );

    gst::debug!(RUNTIME_CAT, "prepare_error: await action error notification");
    block_on(error_rx.next()).unwrap();

    // The prepare transition must be reported as failed, attributed to the
    // `Error` trigger returned by `handle_action_error`.
    let prepare_err = prepare_status.block_on().unwrap_err();
    match prepare_err {
        TransitionError {
            trigger: Trigger::Error,
            state: Preparing,
            ..
        } => (),
        other => panic!("{other:?}"),
    }

    // Poll until the state machine has settled in the `Error` state.
    loop {
        if task.state() == TaskState::Error {
            break;
        }
        std::thread::sleep(Duration::from_millis(2));
    }

    // Starting from the `Error` state must be refused.
    let start_err = task.start().block_on().unwrap_err();
    match start_err {
        TransitionError {
            trigger: Start,
            state: TaskState::Error,
            ..
        } => (),
        other => panic!("{other:?}"),
    }

    block_on(task.unprepare()).unwrap();
}
1786
#[test]
fn prepare_start_ok() {
    gst::init().unwrap();

    /// Task implementation whose `prepare` action waits for a message on
    /// `prepare_receiver` before completing successfully.
    struct TaskPrepareTest {
        obj: gst::Object,
        prepare_receiver: mpsc::Receiver<()>,
    }

    impl TaskImpl for TaskPrepareTest {
        type Item = ();

        fn obj(&self) -> &impl IsA<glib::Object> {
            &self.obj
        }

        async fn try_next(&mut self) -> Result<(), gst::FlowError> {
            // Never produces an item.
            pending().await
        }

        async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
            unreachable!("prepare_start_ok: handle_item");
        }

        async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> {
            gst::debug!(RUNTIME_CAT, "prepare_start_ok: preparation awaiting trigger");
            self.prepare_receiver.next().await.unwrap();
            gst::debug!(RUNTIME_CAT, "prepare_start_ok: preparation complete Ok");
            Ok(())
        }

        async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
            gst::debug!(RUNTIME_CAT, "prepare_start_ok: started");
            Ok(())
        }

        async fn handle_action_error(
            &mut self,
            _trigger: Trigger,
            _state: TaskState,
            _err: gst::ErrorMessage,
        ) -> Trigger {
            unreachable!("prepare_start_ok: handle_prepare_error");
        }
    }

    let context = Context::acquire("prepare_start_ok", Duration::from_millis(2)).unwrap();
    let task = Task::default();

    let pad: gst::Object = gst::Pad::builder(gst::PadDirection::Unknown)
        .name("runtime::Task::prepare_start_ok")
        .build()
        .into();
    let (mut prepare_tx, prepare_rx) = mpsc::channel(1);
    // The prepare status is discarded: the transition is observed through
    // the subsequent start request instead.
    drop(task.prepare(
        TaskPrepareTest {
            obj: pad,
            prepare_receiver: prepare_rx,
        },
        context,
    ));

    // Drive the remaining transitions from another context so that each
    // returned status can be awaited.
    let requester_ctx = Context::acquire("prepare_start_ok_requester", Duration::ZERO).unwrap();
    let (ready_tx, ready_rx) = oneshot::channel();
    let requester_handle = requester_ctx.spawn(async move {
        assert_eq!(task.state(), Preparing);

        gst::debug!(RUNTIME_CAT, "prepare_start_ok: starting");
        let start_status = task.start();
        match start_status {
            Pending {
                trigger: Start,
                origin: Preparing,
                ..
            } => (),
            other => panic!("{other:?}"),
        }
        // Let the test thread know that the start request has been issued.
        ready_tx.send(()).unwrap();

        let prepared_to_started = Complete {
            origin: Prepared,
            target: Started,
        };
        assert_eq!(start_status.await.unwrap(), prepared_to_started);
        assert_eq!(task.state(), Started);

        let stop_status = task.stop();
        match stop_status {
            Pending {
                trigger: Stop,
                origin: Started,
                ..
            } => (),
            other => panic!("{other:?}"),
        }
        let started_to_stopped = Complete {
            origin: Started,
            target: Stopped,
        };
        assert_eq!(stop_status.await.unwrap(), started_to_stopped);
        assert_eq!(task.state(), Stopped);

        let unprepare_status = task.unprepare();
        match unprepare_status {
            Pending {
                trigger: Unprepare,
                origin: Stopped,
                ..
            } => (),
            other => panic!("{other:?}"),
        };
        let stopped_to_unprepared = Complete {
            origin: Stopped,
            target: Unprepared,
        };
        assert_eq!(unprepare_status.await.unwrap(), stopped_to_unprepared);
        assert_eq!(task.state(), Unprepared);
    });

    gst::debug!(RUNTIME_CAT, "prepare_start_ok: awaiting for start_ctx");
    block_on(ready_rx).unwrap();

    // Release the `prepare` action only once the start request is in flight.
    gst::debug!(RUNTIME_CAT, "prepare_start_ok: triggering preparation");
    block_on(prepare_tx.send(())).unwrap();

    block_on(requester_handle).unwrap();
}
1924
#[test]
fn prepare_start_error() {
    gst::init().unwrap();

    /// Task implementation whose `prepare` action waits for a message on
    /// `prepare_receiver` and then fails; the resulting
    /// `handle_action_error` call is signalled on `prepare_error_sender`.
    struct TaskPrepareTest {
        obj: gst::Object,
        prepare_receiver: mpsc::Receiver<()>,
        prepare_error_sender: mpsc::Sender<()>,
    }

    impl TaskImpl for TaskPrepareTest {
        type Item = ();

        fn obj(&self) -> &impl IsA<glib::Object> {
            &self.obj
        }

        // None of the following three hooks is expected to run in this test.
        async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
            unreachable!("prepare_start_error: start");
        }

        async fn try_next(&mut self) -> Result<(), gst::FlowError> {
            unreachable!("prepare_start_error: try_next");
        }

        async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
            unreachable!("prepare_start_error: handle_item");
        }

        async fn prepare(&mut self) -> Result<(), gst::ErrorMessage> {
            gst::debug!(RUNTIME_CAT, "prepare_start_error: preparation awaiting trigger");
            self.prepare_receiver.next().await.unwrap();
            gst::debug!(RUNTIME_CAT, "prepare_start_error: preparation complete Err");

            let failure = gst::error_msg!(
                gst::ResourceError::Failed,
                ["prepare_start_error: intentional error"]
            );
            Err(failure)
        }

        async fn handle_action_error(
            &mut self,
            trigger: Trigger,
            state: TaskState,
            err: gst::ErrorMessage,
        ) -> Trigger {
            gst::debug!(RUNTIME_CAT, "prepare_start_error: handling prepare error {:?}", err);
            // Only a failing `Prepare` issued from `Unprepared` is expected here.
            match (trigger, state) {
                (Prepare, Unprepared) => self.prepare_error_sender.send(()).await.unwrap(),
                other => panic!("action error for {other:?}"),
            }
            Trigger::Error
        }
    }

    let context = Context::acquire("prepare_start_error", Duration::from_millis(2)).unwrap();
    let task = Task::default();

    let pad: gst::Object = gst::Pad::builder(gst::PadDirection::Unknown)
        .name("runtime::Task::prepare_start_error")
        .build()
        .into();
    let (mut prepare_tx, prepare_rx) = mpsc::channel(1);
    let (error_tx, mut error_rx) = mpsc::channel(1);
    let prepare_status = task.prepare(
        TaskPrepareTest {
            obj: pad,
            prepare_receiver: prepare_rx,
            prepare_error_sender: error_tx,
        },
        context,
    );
    match prepare_status {
        Pending {
            trigger: Prepare,
            origin: Unprepared,
            ..
        } => (),
        other => panic!("{other:?}"),
    };

    // Issue the start request and await the prepare outcome from another
    // context.
    let requester_ctx =
        Context::acquire("prepare_start_error_requester", Duration::ZERO).unwrap();
    let (ready_tx, ready_rx) = oneshot::channel();
    let requester_handle = requester_ctx.spawn(async move {
        gst::debug!(RUNTIME_CAT, "prepare_start_error: starting (Err)");
        // The start status itself is not inspected.
        drop(task.start());
        ready_tx.send(()).unwrap();

        let prepare_res = prepare_status.await;
        match prepare_res {
            Err(TransitionError {
                trigger: Trigger::Error,
                state: Preparing,
                ..
            }) => (),
            other => panic!("{other:?}"),
        }

        let unprepare_status = task.unprepare();
        match unprepare_status {
            Pending {
                trigger: Unprepare,
                origin: TaskState::Error,
                ..
            } => (),
            other => panic!("{other:?}"),
        };
        let error_to_unprepared = Complete {
            origin: TaskState::Error,
            target: Unprepared,
        };
        assert_eq!(unprepare_status.await.unwrap(), error_to_unprepared);
    });

    gst::debug!(RUNTIME_CAT, "prepare_start_error: awaiting for start_ctx");
    block_on(ready_rx).unwrap();

    // Release the `prepare` action, which then fails.
    gst::debug!(
        RUNTIME_CAT,
        "prepare_start_error: triggering preparation (failure)"
    );
    block_on(prepare_tx.send(())).unwrap();

    gst::debug!(
        RUNTIME_CAT,
        "prepare_start_error: await prepare error notification"
    );
    block_on(error_rx.next()).unwrap();

    block_on(requester_handle).unwrap();
}
2071
#[test]
fn item_error() {
    gst::init().unwrap();

    /// Task implementation which forwards each `FlowError` received on
    /// `try_next_receiver` as the error returned by `handle_item`.
    struct TaskTest {
        obj: gst::Object,
        try_next_receiver: mpsc::Receiver<gst::FlowError>,
    }

    impl TaskImpl for TaskTest {
        type Item = gst::FlowError;

        fn obj(&self) -> &impl IsA<glib::Object> {
            &self.obj
        }

        async fn try_next(&mut self) -> Result<gst::FlowError, gst::FlowError> {
            gst::debug!(RUNTIME_CAT, "item_error: awaiting try_next");
            let requested_error = self.try_next_receiver.next().await.unwrap();
            Ok(requested_error)
        }

        async fn handle_item(&mut self, item: gst::FlowError) -> Result<(), gst::FlowError> {
            gst::debug!(RUNTIME_CAT, "item_error: handle_item received {:?}", item);
            Err(item)
        }
    }

    let context = Context::acquire("item_error", Duration::from_millis(2)).unwrap();
    let task = Task::default();

    gst::debug!(RUNTIME_CAT, "item_error: prepare and start");
    let pad: gst::Object = gst::Pad::builder(gst::PadDirection::Unknown)
        .name("runtime::Task::item_error")
        .build()
        .into();
    let (mut flow_error_tx, flow_error_rx) = mpsc::channel(1);
    let task_impl = TaskTest {
        obj: pad,
        try_next_receiver: flow_error_rx,
    };
    task.prepare(task_impl, context).block_on().unwrap();
    task.start().block_on().unwrap();

    gst::debug!(RUNTIME_CAT, "item_error: req. handle_item to return Eos");
    block_on(flow_error_tx.send(gst::FlowError::Eos)).unwrap();
    // Poll until the task reports `Stopped`.
    loop {
        if task.state() == Stopped {
            break;
        }
        std::thread::sleep(Duration::from_millis(2));
    }

    gst::debug!(RUNTIME_CAT, "item_error: starting (after stop)");
    let stopped_to_started = Complete {
        origin: Stopped,
        target: Started,
    };
    assert_eq!(task.start().block_on().unwrap(), stopped_to_started);

    gst::debug!(RUNTIME_CAT, "item_error: req. handle_item to return Error");
    block_on(flow_error_tx.send(gst::FlowError::Error)).unwrap();
    // Poll until the task reports the `Error` state.
    loop {
        if task.state() == TaskState::Error {
            break;
        }
        std::thread::sleep(Duration::from_millis(2));
    }

    gst::debug!(RUNTIME_CAT, "item_error: attempting to start (after Error)");
    let start_err = task.start().block_on().unwrap_err();
    match start_err {
        TransitionError {
            trigger: Start,
            state: TaskState::Error,
            ..
        } => (),
        other => panic!("{other:?}"),
    }

    let error_to_unprepared = Complete {
        origin: TaskState::Error,
        target: Unprepared,
    };
    assert_eq!(task.unprepare().block_on().unwrap(), error_to_unprepared);
}
2158
#[test]
fn flush_regular_sync() {
    gst::init().unwrap();

    /// Task implementation which signals its `flush_start` and `flush_stop`
    /// actions over dedicated channels.
    struct TaskFlushTest {
        obj: gst::Object,
        flush_start_sender: mpsc::Sender<()>,
        flush_stop_sender: mpsc::Sender<()>,
    }

    impl TaskImpl for TaskFlushTest {
        type Item = ();

        fn obj(&self) -> &impl IsA<glib::Object> {
            &self.obj
        }

        async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
            gst::debug!(RUNTIME_CAT, "flush_regular_sync: started flushing");
            self.flush_start_sender.send(()).await.unwrap();
            Ok(())
        }

        async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
            gst::debug!(RUNTIME_CAT, "flush_regular_sync: stopped flushing");
            self.flush_stop_sender.send(()).await.unwrap();
            Ok(())
        }

        async fn try_next(&mut self) -> Result<(), gst::FlowError> {
            // Never produces an item.
            pending().await
        }

        async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
            unreachable!("flush_regular_sync: handle_item");
        }
    }

    let context = Context::acquire("flush_regular_sync", Duration::from_millis(2)).unwrap();
    let task = Task::default();

    let (flush_start_tx, mut flush_start_rx) = mpsc::channel(1);
    let (flush_stop_tx, mut flush_stop_rx) = mpsc::channel(1);
    // The pad is kept around: it is passed to `block_on_or_add_subtask` below.
    let obj = gst::Pad::builder(gst::PadDirection::Unknown)
        .name("runtime::Task::flush_regular_sync")
        .build();
    let task_impl = TaskFlushTest {
        obj: obj.clone().into(),
        flush_start_sender: flush_start_tx,
        flush_stop_sender: flush_stop_tx,
    };
    drop(task.prepare(task_impl, context));

    gst::debug!(RUNTIME_CAT, "flush_regular_sync: start");
    block_on(task.start()).unwrap();

    gst::debug!(RUNTIME_CAT, "flush_regular_sync: starting flush");
    let started_to_flushing = Complete {
        origin: Started,
        target: Flushing,
    };
    assert_eq!(task.flush_start().block_on().unwrap(), started_to_flushing);
    assert_eq!(task.state(), Flushing);

    block_on(flush_start_rx.next()).unwrap();

    gst::debug!(RUNTIME_CAT, "flush_regular_sync: stopping flush");
    let flushing_to_started = Complete {
        origin: Flushing,
        target: Started,
    };
    assert_eq!(
        task.flush_stop().block_on_or_add_subtask(&obj).unwrap(),
        flushing_to_started,
    );
    assert_eq!(task.state(), Started);

    block_on(flush_stop_rx.next()).unwrap();

    // The pause request is issued without waiting for its outcome.
    drop(task.pause());
    stop_then_unprepare(task);
}
2247
#[test]
fn flush_regular_different_context() {
    gst::init().unwrap();

    /// Task implementation which signals its `flush_start` and `flush_stop`
    /// actions over dedicated channels.
    struct TaskFlushTest {
        obj: gst::Object,
        flush_start_sender: mpsc::Sender<()>,
        flush_stop_sender: mpsc::Sender<()>,
    }

    impl TaskImpl for TaskFlushTest {
        type Item = ();

        fn obj(&self) -> &impl IsA<glib::Object> {
            &self.obj
        }

        async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
            gst::debug!(
                RUNTIME_CAT,
                "flush_regular_different_context: started flushing"
            );
            self.flush_start_sender.send(()).await.unwrap();
            Ok(())
        }

        async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
            gst::debug!(
                RUNTIME_CAT,
                "flush_regular_different_context: stopped flushing"
            );
            self.flush_stop_sender.send(()).await.unwrap();
            Ok(())
        }

        async fn try_next(&mut self) -> Result<(), gst::FlowError> {
            // Never produces an item.
            pending().await
        }

        async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
            unreachable!("flush_regular_different_context: handle_item");
        }
    }

    let context =
        Context::acquire("flush_regular_different_context", Duration::from_millis(2)).unwrap();
    let task = Task::default();

    let (flush_start_tx, mut flush_start_rx) = mpsc::channel(1);
    let (flush_stop_tx, mut flush_stop_rx) = mpsc::channel(1);
    // The pad is kept around: it is passed to `block_on_or_add_subtask` below.
    let obj = gst::Pad::builder(gst::PadDirection::Unknown)
        .name("runtime::Task::flush_regular_different_context")
        .build();
    let task_impl = TaskFlushTest {
        obj: obj.clone().into(),
        flush_start_sender: flush_start_tx,
        flush_stop_sender: flush_stop_tx,
    };
    drop(task.prepare(task_impl, context));

    gst::debug!(RUNTIME_CAT, "flush_regular_different_context: start");
    task.start().block_on().unwrap();

    // The flush requests are issued from a context other than the task's.
    let oob_context = Context::acquire(
        "flush_regular_different_context_oob",
        Duration::from_millis(2),
    )
    .unwrap();

    let flushing_task = task.clone();
    let flush_handle = oob_context.spawn(async move {
        let flush_start_status = flushing_task.flush_start();
        match flush_start_status {
            Pending {
                trigger: FlushStart,
                origin: Started,
                ..
            } => (),
            other => panic!("{other:?}"),
        };
        let started_to_flushing = Complete {
            origin: Started,
            target: Flushing,
        };
        assert_eq!(flush_start_status.await.unwrap(), started_to_flushing);
        assert_eq!(flushing_task.state(), Flushing);
        flush_start_rx.next().await.unwrap();

        let flush_stop_status = flushing_task.flush_stop();
        match flush_stop_status {
            Pending {
                trigger: FlushStop,
                origin: Flushing,
                ..
            } => (),
            other => panic!("{other:?}"),
        };
        // Here the call is expected to report `NotWaiting` rather than a
        // completed transition.
        let not_waiting = NotWaiting {
            trigger: FlushStop,
            origin: Flushing,
        };
        assert_eq!(
            flush_stop_status.block_on_or_add_subtask(&obj).unwrap(),
            not_waiting,
        );

        // The `Started` state is only checked once sub tasks are drained.
        Context::drain_sub_tasks().await.unwrap();
        assert_eq!(flushing_task.state(), Started);
    });

    block_on(flush_handle).unwrap();
    block_on(flush_stop_rx.next()).unwrap();

    stop_then_unprepare(task);
}
2369
#[test]
fn flush_regular_same_context() {
    gst::init().unwrap();

    /// Task implementation which signals its `flush_start` and `flush_stop`
    /// actions over dedicated channels.
    struct TaskFlushTest {
        obj: gst::Object,
        flush_start_sender: mpsc::Sender<()>,
        flush_stop_sender: mpsc::Sender<()>,
    }

    impl TaskImpl for TaskFlushTest {
        type Item = ();

        fn obj(&self) -> &impl IsA<glib::Object> {
            &self.obj
        }

        async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
            gst::debug!(RUNTIME_CAT, "flush_regular_same_context: started flushing");
            self.flush_start_sender.send(()).await.unwrap();
            Ok(())
        }

        async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
            gst::debug!(RUNTIME_CAT, "flush_regular_same_context: stopped flushing");
            self.flush_stop_sender.send(()).await.unwrap();
            Ok(())
        }

        async fn try_next(&mut self) -> Result<(), gst::FlowError> {
            // Never produces an item.
            pending().await
        }

        async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
            unreachable!("flush_regular_same_context: handle_item");
        }
    }

    let context =
        Context::acquire("flush_regular_same_context", Duration::from_millis(2)).unwrap();
    let task = Task::default();

    let pad: gst::Object = gst::Pad::builder(gst::PadDirection::Unknown)
        .name("runtime::Task::flush_regular_same_context")
        .build()
        .into();
    let (flush_start_tx, mut flush_start_rx) = mpsc::channel(1);
    let (flush_stop_tx, mut flush_stop_rx) = mpsc::channel(1);
    let task_impl = TaskFlushTest {
        obj: pad,
        flush_start_sender: flush_start_tx,
        flush_stop_sender: flush_stop_tx,
    };
    // A clone of the context is handed to the task: the original is used
    // below to spawn the flush requests on that same context.
    drop(task.prepare(task_impl, context.clone()));

    block_on(task.start()).unwrap();

    let flushing_task = task.clone();
    let flush_handle = context.spawn(async move {
        let flush_start_status = flushing_task.flush_start();
        match flush_start_status {
            Pending {
                trigger: FlushStart,
                origin: Started,
                ..
            } => (),
            other => panic!("{other:?}"),
        };
        let started_to_flushing = Complete {
            origin: Started,
            target: Flushing,
        };
        assert_eq!(flush_start_status.await.unwrap(), started_to_flushing);
        assert_eq!(flushing_task.state(), Flushing);
        flush_start_rx.next().await.unwrap();

        let flush_stop_status = flushing_task.flush_stop();
        match flush_stop_status {
            Pending {
                trigger: FlushStop,
                origin: Flushing,
                ..
            } => (),
            other => panic!("{other:?}"),
        };
        let flushing_to_started = Complete {
            origin: Flushing,
            target: Started,
        };
        assert_eq!(flush_stop_status.await.unwrap(), flushing_to_started);
        assert_eq!(flushing_task.state(), Started);
    });

    block_on(flush_handle).unwrap();
    block_on(flush_stop_rx.next()).unwrap();

    stop_then_unprepare(task);
}
2476
#[test]
fn flush_from_loop() {
    gst::init().unwrap();

    /// Task implementation which requests `flush_start` on its own task from
    /// `handle_item` and signals the `flush_start` action over a channel.
    struct TaskFlushTest {
        obj: gst::Object,
        task: Task,
        flush_start_sender: mpsc::Sender<()>,
    }

    impl TaskImpl for TaskFlushTest {
        type Item = ();

        fn obj(&self) -> &impl IsA<glib::Object> {
            &self.obj
        }

        async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
            gst::debug!(RUNTIME_CAT, "flush_from_loop: started flushing");
            self.flush_start_sender.send(()).await.unwrap();
            Ok(())
        }

        async fn try_next(&mut self) -> Result<(), gst::FlowError> {
            // An item is always immediately available.
            Ok(())
        }

        async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
            gst::debug!(RUNTIME_CAT, "flush_from_loop: flush_start from handle_item");
            // The request must be reported as pending; it is not awaited here.
            match self.task.flush_start() {
                Pending {
                    trigger: FlushStart,
                    origin: Started,
                    ..
                } => Ok(()),
                other => panic!("{other:?}"),
            }
        }
    }

    let context = Context::acquire("flush_from_loop", Duration::from_millis(2)).unwrap();
    let task = Task::default();

    let pad: gst::Object = gst::Pad::builder(gst::PadDirection::Unknown)
        .name("runtime::Task::flush_from_loop")
        .build()
        .into();
    let (flush_start_tx, mut flush_start_rx) = mpsc::channel(1);
    let task_impl = TaskFlushTest {
        obj: pad,
        task: task.clone(),
        flush_start_sender: flush_start_tx,
    };
    // Neither the prepare nor the start status is inspected.
    drop(task.prepare(task_impl, context));
    drop(task.start());

    gst::debug!(
        RUNTIME_CAT,
        "flush_from_loop: awaiting flush_start notification"
    );
    block_on(flush_start_rx.next()).unwrap();

    let flushing_to_stopped = Complete {
        origin: Flushing,
        target: Stopped,
    };
    assert_eq!(task.stop().block_on().unwrap(), flushing_to_stopped);
    task.unprepare().block_on().unwrap();
}
2555
#[test]
fn pause_from_loop() {
    gst::init().unwrap();

    /// Task implementation which requests `pause` on its own task from
    /// `handle_item` and signals the `pause` action over a channel.
    struct TaskStartTest {
        obj: gst::Object,
        task: Task,
        pause_sender: mpsc::Sender<()>,
    }

    impl TaskImpl for TaskStartTest {
        type Item = ();

        fn obj(&self) -> &impl IsA<glib::Object> {
            &self.obj
        }

        async fn pause(&mut self) -> Result<(), gst::ErrorMessage> {
            gst::debug!(RUNTIME_CAT, "pause_from_loop: entering pause action");
            self.pause_sender.send(()).await.unwrap();
            Ok(())
        }

        async fn try_next(&mut self) -> Result<(), gst::FlowError> {
            // An item is always immediately available.
            Ok(())
        }

        async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
            gst::debug!(RUNTIME_CAT, "pause_from_loop: entering handle_item");

            let delay = Duration::from_millis(50);
            crate::runtime::timer::delay_for(delay).await;

            gst::debug!(RUNTIME_CAT, "pause_from_loop: pause from handle_item");
            // The request must be reported as pending; it is not awaited here.
            match self.task.pause() {
                Pending {
                    trigger: Pause,
                    origin: Started,
                    ..
                } => Ok(()),
                other => panic!("{other:?}"),
            }
        }
    }

    let context = Context::acquire("pause_from_loop", Duration::from_millis(2)).unwrap();
    let task = Task::default();

    let pad: gst::Object = gst::Pad::builder(gst::PadDirection::Unknown)
        .name("runtime::Task::pause_from_loop")
        .build()
        .into();
    let (pause_tx, mut pause_rx) = mpsc::channel(1);
    let task_impl = TaskStartTest {
        obj: pad,
        task: task.clone(),
        pause_sender: pause_tx,
    };
    // Neither the prepare nor the start status is inspected.
    drop(task.prepare(task_impl, context));
    drop(task.start());

    gst::debug!(RUNTIME_CAT, "pause_from_loop: awaiting pause notification");
    block_on(pause_rx.next()).unwrap();

    stop_then_unprepare(task);
}
2630
#[test]
fn trigger_from_action() {
    gst::init().unwrap();

    /// Task implementation whose `flush_start` action requests `flush_stop`
    /// on its own task; the `flush_stop` action is signalled over a channel.
    struct TaskFlushTest {
        obj: gst::Object,
        task: Task,
        flush_stop_sender: mpsc::Sender<()>,
    }

    impl TaskImpl for TaskFlushTest {
        type Item = ();

        fn obj(&self) -> &impl IsA<glib::Object> {
            &self.obj
        }

        async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
            gst::debug!(
                RUNTIME_CAT,
                "trigger_from_action: flush_start triggering flush_stop"
            );
            // The request must be reported as pending, with `Started` as its
            // origin; it is not awaited here.
            match self.task.flush_stop() {
                Pending {
                    trigger: FlushStop,
                    origin: Started,
                    ..
                } => Ok(()),
                other => panic!("{other:?}"),
            }
        }

        async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
            gst::debug!(RUNTIME_CAT, "trigger_from_action: stopped flushing");
            self.flush_stop_sender.send(()).await.unwrap();
            Ok(())
        }

        async fn try_next(&mut self) -> Result<(), gst::FlowError> {
            // Never produces an item.
            pending().await
        }

        async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
            unreachable!("trigger_from_action: handle_item");
        }
    }

    let context = Context::acquire("trigger_from_action", Duration::from_millis(2)).unwrap();
    let task = Task::default();

    let pad: gst::Object = gst::Pad::builder(gst::PadDirection::Unknown)
        .name("runtime::Task::trigger_from_action")
        .build()
        .into();
    let (flush_stop_tx, mut flush_stop_rx) = mpsc::channel(1);
    let task_impl = TaskFlushTest {
        obj: pad,
        task: task.clone(),
        flush_stop_sender: flush_stop_tx,
    };
    drop(task.prepare(task_impl, context));

    task.start().block_on().unwrap();
    // The flush_start status is not inspected: completion is observed via
    // the flush_stop notification below.
    drop(task.flush_start());

    gst::debug!(
        RUNTIME_CAT,
        "trigger_from_action: awaiting flush_stop notification"
    );
    block_on(flush_stop_rx.next()).unwrap();

    stop_then_unprepare(task);
}
2711
/// Checks a flush start / flush stop cycle performed while the task is paused,
/// followed by a start.
///
/// Sequence exercised:
/// 1. `pause` from `Prepared` leads to `Paused`.
/// 2. `flush_start` from `Paused` leads to `PausedFlushing` and runs the
///    `flush_start` hook.
/// 3. `flush_stop` from `PausedFlushing` leads back to `Paused` and runs the
///    `flush_stop` hook; the `start` hook must not have run at that point.
/// 4. `start` from `Paused` leads to `Started` and runs the `start` hook.
#[test]
fn pause_flush_start() {
    gst::init().unwrap();

    /// Task implementation which reports each hook invocation on a dedicated channel.
    struct TaskFlushTest {
        obj: gst::Object,
        started_sender: mpsc::Sender<()>,
        flush_start_sender: mpsc::Sender<()>,
        flush_stop_sender: mpsc::Sender<()>,
    }

    impl TaskImpl for TaskFlushTest {
        type Item = ();

        fn obj(&self) -> &impl IsA<glib::Object> {
            &self.obj
        }

        async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
            gst::debug!(RUNTIME_CAT, "pause_flush_start: started");
            self.started_sender.send(()).await.unwrap();
            Ok(())
        }

        // Never yields an item, so `handle_item` is not expected to run.
        async fn try_next(&mut self) -> Result<(), gst::FlowError> {
            pending().await
        }

        async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
            unreachable!("pause_flush_start: handle_item");
        }

        async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
            gst::debug!(RUNTIME_CAT, "pause_flush_start: started flushing");
            self.flush_start_sender.send(()).await.unwrap();
            Ok(())
        }

        async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
            gst::debug!(RUNTIME_CAT, "pause_flush_start: stopped flushing");
            self.flush_stop_sender.send(()).await.unwrap();
            Ok(())
        }
    }

    let context = Context::acquire("pause_flush_start", Duration::from_millis(2)).unwrap();

    let task = Task::default();

    let (started_sender, mut started_receiver) = mpsc::channel(1);
    let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
    let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
    // The status returned by `prepare` is intentionally dropped without being
    // awaited; the `pause` assertion below checks that `Prepared` is the origin.
    let fut = task.prepare(
        TaskFlushTest {
            obj: gst::Pad::builder(gst::PadDirection::Unknown)
                .name("runtime::Task::pause_flush_start")
                .build()
                .into(),
            started_sender,
            flush_start_sender,
            flush_stop_sender,
        },
        context,
    );
    drop(fut);

    gst::debug!(RUNTIME_CAT, "pause_flush_start: pausing");
    assert_eq!(
        task.pause().block_on().unwrap(),
        Complete {
            origin: Prepared,
            target: Paused,
        },
    );

    gst::debug!(RUNTIME_CAT, "pause_flush_start: starting flush");
    assert_eq!(
        task.flush_start().block_on().unwrap(),
        Complete {
            origin: Paused,
            target: PausedFlushing,
        },
    );
    assert_eq!(task.state(), PausedFlushing);
    // `None` would mean the channel was closed rather than the hook having
    // notified: fail loudly instead of silently carrying on.
    block_on(flush_start_receiver.next()).unwrap();

    gst::debug!(RUNTIME_CAT, "pause_flush_start: stopping flush");
    assert_eq!(
        task.flush_stop().block_on().unwrap(),
        Complete {
            origin: PausedFlushing,
            target: Paused,
        },
    );
    assert_eq!(task.state(), Paused);
    block_on(flush_stop_receiver.next()).unwrap();

    // The `start` hook must not have notified so far.
    started_receiver.try_recv().unwrap_err();

    gst::debug!(RUNTIME_CAT, "pause_flush_start: starting after flushing");
    assert_eq!(
        task.start().block_on().unwrap(),
        Complete {
            origin: Paused,
            target: Started,
        },
    );
    assert_eq!(task.state(), Started);
    block_on(started_receiver.next()).unwrap();

    stop_then_unprepare(task);
}
2827
/// Checks starting a task while it is paused and flushing.
///
/// Sequence exercised:
/// 1. `pause` then `flush_start` bring the task to `PausedFlushing` and run
///    the `flush_start` hook.
/// 2. `start` from `PausedFlushing` leads to `Flushing`; the `start` hook must
///    not have run at that point.
/// 3. `flush_stop` from `Flushing` leads to `Started`; both the `flush_stop`
///    and the `start` hooks are then expected to notify.
#[test]
fn pause_flushing_start() {
    gst::init().unwrap();

    /// Task implementation which reports each hook invocation on a dedicated channel.
    struct TaskFlushTest {
        obj: gst::Object,
        started_sender: mpsc::Sender<()>,
        flush_start_sender: mpsc::Sender<()>,
        flush_stop_sender: mpsc::Sender<()>,
    }

    impl TaskImpl for TaskFlushTest {
        type Item = ();

        fn obj(&self) -> &impl IsA<glib::Object> {
            &self.obj
        }

        async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
            gst::debug!(RUNTIME_CAT, "pause_flushing_start: started");
            self.started_sender.send(()).await.unwrap();
            Ok(())
        }

        // Never yields an item, so `handle_item` is not expected to run.
        async fn try_next(&mut self) -> Result<(), gst::FlowError> {
            pending().await
        }

        async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
            unreachable!("pause_flushing_start: handle_item");
        }

        async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
            gst::debug!(RUNTIME_CAT, "pause_flushing_start: started flushing");
            self.flush_start_sender.send(()).await.unwrap();
            Ok(())
        }

        async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
            gst::debug!(RUNTIME_CAT, "pause_flushing_start: stopped flushing");
            self.flush_stop_sender.send(()).await.unwrap();
            Ok(())
        }
    }

    let context = Context::acquire("pause_flushing_start", Duration::from_millis(2)).unwrap();

    let task = Task::default();

    let (started_sender, mut started_receiver) = mpsc::channel(1);
    let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
    let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
    // The status returned by `prepare` is intentionally dropped without being awaited.
    let fut = task.prepare(
        TaskFlushTest {
            obj: gst::Pad::builder(gst::PadDirection::Unknown)
                .name("runtime::Task::pause_flushing_start")
                .build()
                .into(),
            started_sender,
            flush_start_sender,
            flush_stop_sender,
        },
        context,
    );
    drop(fut);

    gst::debug!(RUNTIME_CAT, "pause_flushing_start: pausing");
    // Same for `pause`: only the resulting state is checked, after `flush_start`.
    let fut = task.pause();
    drop(fut);

    gst::debug!(RUNTIME_CAT, "pause_flushing_start: starting flush");
    block_on(task.flush_start()).unwrap();
    assert_eq!(task.state(), PausedFlushing);
    // `None` would mean the channel was closed rather than the hook having
    // notified: fail loudly instead of silently carrying on.
    block_on(flush_start_receiver.next()).unwrap();

    gst::debug!(RUNTIME_CAT, "pause_flushing_start: starting while flushing");
    assert_eq!(
        task.start().block_on().unwrap(),
        Complete {
            origin: PausedFlushing,
            target: Flushing,
        },
    );
    assert_eq!(task.state(), Flushing);

    // The `start` hook must not have notified while still flushing.
    started_receiver.try_recv().unwrap_err();

    gst::debug!(RUNTIME_CAT, "pause_flushing_start: stopping flush");
    assert_eq!(
        task.flush_stop().block_on().unwrap(),
        Complete {
            origin: Flushing,
            target: Started,
        },
    );
    assert_eq!(task.state(), Started);
    block_on(flush_stop_receiver.next()).unwrap();
    block_on(started_receiver.next()).unwrap();

    stop_then_unprepare(task);
}
2932
/// Races a `flush_start` triggered from a second context against a `start`
/// triggered from the test thread.
///
/// Both orderings are accepted:
/// - `flush_start` may observe `Paused` or `Started` as its origin,
/// - `start` may complete as `Paused` -> `Started` or as
///   `PausedFlushing` -> `Flushing`.
///
/// Whatever the ordering, the final `flush_stop` must complete as
/// `Flushing` -> `Started` and run the `flush_stop` hook.
#[test]
fn flush_concurrent_start() {
    gst::init().unwrap();

    /// Task implementation which reports the flush hooks on dedicated channels.
    struct TaskStartTest {
        obj: gst::Object,
        flush_start_sender: mpsc::Sender<()>,
        flush_stop_sender: mpsc::Sender<()>,
    }

    impl TaskImpl for TaskStartTest {
        type Item = ();

        fn obj(&self) -> &impl IsA<glib::Object> {
            &self.obj
        }

        // Never yields an item, so `handle_item` is not expected to run.
        async fn try_next(&mut self) -> Result<(), gst::FlowError> {
            pending().await
        }

        async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
            unreachable!("flush_concurrent_start: handle_item");
        }

        async fn flush_start(&mut self) -> Result<(), gst::ErrorMessage> {
            gst::debug!(RUNTIME_CAT, "flush_concurrent_start: started flushing");
            self.flush_start_sender.send(()).await.unwrap();
            Ok(())
        }

        async fn flush_stop(&mut self) -> Result<(), gst::ErrorMessage> {
            gst::debug!(RUNTIME_CAT, "flush_concurrent_start: stopped flushing");
            self.flush_stop_sender.send(()).await.unwrap();
            Ok(())
        }
    }

    let context = Context::acquire("flush_concurrent_start", Duration::from_millis(2)).unwrap();

    let task = Task::default();

    let (flush_start_sender, mut flush_start_receiver) = mpsc::channel(1);
    let (flush_stop_sender, mut flush_stop_receiver) = mpsc::channel(1);
    // The status returned by `prepare` is intentionally dropped without being awaited.
    let fut = task.prepare(
        TaskStartTest {
            obj: gst::Pad::builder(gst::PadDirection::Unknown)
                .name("runtime::Task::flush_concurrent_start")
                .build()
                .into(),
            flush_start_sender,
            flush_stop_sender,
        },
        context,
    );
    drop(fut);

    // Second context from which `flush_start` is triggered.
    let oob_context =
        Context::acquire("flush_concurrent_start_oob", Duration::from_millis(2)).unwrap();
    let task_clone = task.clone();

    block_on(task.pause()).unwrap();

    // Lets the test thread know the spawned future is running before `start`
    // is triggered.
    let (ready_sender, ready_receiver) = oneshot::channel();
    gst::debug!(RUNTIME_CAT, "flush_concurrent_start: spawning flush_start");
    let flush_start_handle = oob_context.spawn(async move {
        gst::debug!(RUNTIME_CAT, "flush_concurrent_start: // flush_start");
        ready_sender.send(()).unwrap();
        let status = task_clone.flush_start();
        match status {
            // Depending on which trigger wins the race, the task is either
            // still paused or already started.
            Pending {
                trigger: FlushStart,
                origin: Paused | Started,
                ..
            } => (),
            other => panic!("{other:?}"),
        };
        status.await.unwrap();
        flush_start_receiver.next().await.unwrap();
    });

    gst::debug!(
        RUNTIME_CAT,
        "flush_concurrent_start: awaiting for oob_context"
    );
    block_on(ready_receiver).unwrap();

    gst::debug!(RUNTIME_CAT, "flush_concurrent_start: // start");
    match block_on(task.start()) {
        // `start` was handled before `flush_start`.
        Ok(Complete {
            origin: Paused,
            target: Started,
        }) => (),
        // `flush_start` was handled before `start`.
        Ok(Complete {
            origin: PausedFlushing,
            target: Flushing,
        }) => (),
        other => panic!("{other:?}"),
    }

    block_on(flush_start_handle).unwrap();

    gst::debug!(RUNTIME_CAT, "flush_concurrent_start: requesting flush_stop");
    assert_eq!(
        task.flush_stop().block_on().unwrap(),
        Complete {
            origin: Flushing,
            target: Started,
        },
    );
    assert_eq!(task.state(), Started);
    // `None` would mean the channel was closed rather than the hook having
    // notified: fail loudly, as done for `flush_start` in the spawned future.
    block_on(flush_stop_receiver.next()).unwrap();

    stop_then_unprepare(task);
}
3056
/// Checks that a timer created in the `start` hook can be awaited from
/// `try_next`, and that `handle_item` runs once the timer has elapsed.
#[test]
fn start_timer() {
    use crate::runtime::timer;

    gst::init().unwrap();

    /// Task implementation driven by a single one-shot timer.
    struct TimerTask {
        obj: gst::Object,
        /// Armed in `start`, consumed by `try_next`.
        delay: Option<timer::Oneshot>,
        /// Used once by `handle_item` to notify the test thread.
        elapsed_tx: Option<oneshot::Sender<()>>,
    }

    impl TaskImpl for TimerTask {
        type Item = ();

        fn obj(&self) -> &impl IsA<glib::Object> {
            &self.obj
        }

        async fn start(&mut self) -> Result<(), gst::ErrorMessage> {
            self.delay = Some(timer::delay_for(Duration::from_millis(50)));
            gst::debug!(RUNTIME_CAT, "start_timer: started");
            Ok(())
        }

        async fn try_next(&mut self) -> Result<(), gst::FlowError> {
            gst::debug!(RUNTIME_CAT, "start_timer: awaiting timer");
            let delay = self.delay.take().unwrap();
            delay.await;
            Ok(())
        }

        async fn handle_item(&mut self, _item: ()) -> Result<(), gst::FlowError> {
            gst::debug!(RUNTIME_CAT, "start_timer: timer elapsed");
            if let Some(tx) = self.elapsed_tx.take() {
                tx.send(()).unwrap();
            }

            // The timer was consumed by `try_next`: report end of stream.
            Err(gst::FlowError::Eos)
        }
    }

    let context = Context::acquire("start_timer", Duration::from_millis(2)).unwrap();

    let task = Task::default();

    let (elapsed_tx, elapsed_rx) = oneshot::channel();
    let obj: gst::Object = gst::Pad::builder(gst::PadDirection::Unknown)
        .name("runtime::Task::start_timer")
        .build()
        .into();
    // The transition status is dropped right away, without being awaited.
    drop(task.prepare(
        TimerTask {
            obj,
            delay: None,
            elapsed_tx: Some(elapsed_tx),
        },
        context,
    ));

    gst::debug!(RUNTIME_CAT, "start_timer: start");
    // Not awaited either: the test synchronizes on the notification below.
    drop(task.start());

    block_on(elapsed_rx).unwrap();
    gst::debug!(RUNTIME_CAT, "start_timer: timer elapsed received");

    stop_then_unprepare(task);
}
3127}