1#[cfg(feature = "schemars08")]
12use crate::spec::JsonSchemaEngineSpec;
13use crate::{
14 errors::ConvertGenericError,
15 spec::{EngineSpec, GenericSpec},
16};
17use derive_where::derive_where;
18use newtype_uuid::{TypedUuid, TypedUuidKind, TypedUuidTag};
19use serde::{Deserialize, Deserializer, Serialize, de::IgnoredAny};
20use std::{borrow::Cow, fmt, time::Duration};
21
22pub enum ExecutionUuidKind {}
24
25impl TypedUuidKind for ExecutionUuidKind {
26 #[inline]
27 fn tag() -> TypedUuidTag {
28 const TAG: TypedUuidTag = TypedUuidTag::new("execution_id");
29 TAG
30 }
31
32 #[inline]
33 fn alias() -> Option<&'static str> {
34 Some("ExecutionUuid")
37 }
38}
39
40#[cfg(feature = "schemars08")]
41impl schemars::JsonSchema for ExecutionUuidKind {
42 fn schema_name() -> String {
43 "ExecutionUuidKind".to_owned()
44 }
45
46 fn json_schema(
47 _: &mut schemars::r#gen::SchemaGenerator,
48 ) -> schemars::schema::Schema {
49 use crate::schema::{EVENTS_MODULE, rust_type_for_events};
50
51 schemars::schema::SchemaObject {
57 instance_type: Some(schemars::schema::InstanceType::String.into()),
58 format: Some("uuid".to_owned()),
59 extensions: [(
60 "x-rust-type".to_owned(),
61 rust_type_for_events(&format!(
62 "{EVENTS_MODULE}::ExecutionUuidKind"
63 )),
64 )]
65 .into_iter()
66 .collect(),
67 ..Default::default()
68 }
69 .into()
70 }
71}
72
73pub type ExecutionUuid = TypedUuid<ExecutionUuidKind>;
78
79#[derive_where(Clone, Debug, PartialEq, Eq)]
80pub enum Event<S: EngineSpec> {
81 Step(StepEvent<S>),
82 Progress(ProgressEvent<S>),
83}
84
85impl<S: EngineSpec> Event<S> {
86 pub fn from_generic(
91 value: Event<GenericSpec>,
92 ) -> Result<Self, ConvertGenericError> {
93 let ret = match value {
94 Event::Step(event) => Event::Step(StepEvent::from_generic(event)?),
95 Event::Progress(event) => {
96 Event::Progress(ProgressEvent::from_generic(event)?)
97 }
98 };
99 Ok(ret)
100 }
101
102 pub fn into_generic(self) -> Event<GenericSpec> {
113 match self {
114 Event::Step(event) => Event::Step(event.into_generic()),
115 Event::Progress(event) => Event::Progress(event.into_generic()),
116 }
117 }
118}
119
120#[derive(Deserialize, Serialize)]
121#[derive_where(Clone, Debug, Eq, PartialEq)]
122#[serde(bound = "", rename_all = "snake_case")]
123pub struct StepEvent<S: EngineSpec> {
124 pub spec: String,
140
141 pub execution_id: ExecutionUuid,
143
144 pub event_index: usize,
146
147 pub total_elapsed: Duration,
149
150 #[serde(rename = "data")]
152 pub kind: StepEventKind<S>,
153}
154
155#[cfg(feature = "schemars08")]
156impl<S: JsonSchemaEngineSpec> schemars::JsonSchema for StepEvent<S> {
157 fn schema_name() -> String {
158 format!("StepEventFor{}", S::schema_name())
159 }
160
161 fn json_schema(
162 generator: &mut schemars::r#gen::SchemaGenerator,
163 ) -> schemars::schema::Schema {
164 use crate::schema::with_description;
165 use schemars::schema::{ObjectValidation, SchemaObject};
166
167 let mut obj = ObjectValidation::default();
168 obj.properties.insert(
169 "spec".to_owned(),
170 with_description(
171 generator.subschema_for::<String>(),
172 "The specification that this event belongs to.",
173 ),
174 );
175 obj.properties.insert(
176 "execution_id".to_owned(),
177 with_description(
178 generator.subschema_for::<ExecutionUuid>(),
179 "The execution ID.",
180 ),
181 );
182 obj.properties.insert(
183 "event_index".to_owned(),
184 with_description(
185 generator.subschema_for::<usize>(),
186 "A monotonically increasing index for this \
187 `StepEvent`.",
188 ),
189 );
190 obj.properties.insert(
191 "total_elapsed".to_owned(),
192 with_description(
193 generator.subschema_for::<Duration>(),
194 "Total time elapsed since the start of execution.",
195 ),
196 );
197 obj.properties.insert(
198 "data".to_owned(),
199 with_description(
200 generator.subschema_for::<StepEventKind<S>>(),
201 "The kind of event this is.",
202 ),
203 );
204 obj.required =
205 ["spec", "execution_id", "event_index", "total_elapsed", "data"]
206 .into_iter()
207 .map(String::from)
208 .collect();
209
210 let mut extensions = serde_json::Map::new();
211 if let Some(info) = S::rust_type_info() {
212 extensions.insert(
213 "x-rust-type".to_owned(),
214 crate::schema::rust_type_for_generic(&info, "StepEvent"),
215 );
216 }
217
218 SchemaObject {
219 instance_type: Some(schemars::schema::InstanceType::Object.into()),
220 object: Some(Box::new(obj)),
221 extensions: extensions.into_iter().collect(),
222 ..Default::default()
223 }
224 .into()
225 }
226}
227
228impl<S: EngineSpec> StepEvent<S> {
229 pub fn progress_event(&self) -> Option<ProgressEvent<S>> {
235 match &self.kind {
236 StepEventKind::ExecutionStarted { first_step, .. } => {
237 Some(ProgressEvent {
238 spec: self.spec.clone(),
239 execution_id: self.execution_id,
240 total_elapsed: self.total_elapsed,
241 kind: ProgressEventKind::WaitingForProgress {
242 step: first_step.clone(),
243 attempt: 1,
244 step_elapsed: Duration::ZERO,
245 attempt_elapsed: Duration::ZERO,
246 },
247 })
248 }
249 StepEventKind::ProgressReset {
250 step,
251 attempt,
252 step_elapsed,
253 attempt_elapsed,
254 ..
255 } => Some(ProgressEvent {
256 spec: self.spec.clone(),
257 execution_id: self.execution_id,
258 total_elapsed: self.total_elapsed,
259 kind: ProgressEventKind::WaitingForProgress {
260 step: step.clone(),
261 attempt: *attempt,
262 step_elapsed: *step_elapsed,
263 attempt_elapsed: *attempt_elapsed,
264 },
265 }),
266 StepEventKind::AttemptRetry {
267 step,
268 next_attempt,
269 step_elapsed,
270 ..
271 } => Some(ProgressEvent {
272 spec: self.spec.clone(),
273 execution_id: self.execution_id,
274 total_elapsed: self.total_elapsed,
275 kind: ProgressEventKind::WaitingForProgress {
276 step: step.clone(),
277 attempt: *next_attempt,
278 step_elapsed: *step_elapsed,
279 attempt_elapsed: Duration::ZERO,
281 },
282 }),
283 StepEventKind::StepCompleted { next_step, .. } => {
284 Some(ProgressEvent {
285 spec: self.spec.clone(),
286 execution_id: self.execution_id,
287 total_elapsed: self.total_elapsed,
288 kind: ProgressEventKind::WaitingForProgress {
289 step: next_step.clone(),
290 attempt: 1,
291 step_elapsed: Duration::ZERO,
293 attempt_elapsed: Duration::ZERO,
294 },
295 })
296 }
297 StepEventKind::Nested {
298 step,
299 attempt,
300 step_elapsed,
301 attempt_elapsed,
302 event,
303 ..
304 } => event.progress_event().map(|progress_event| ProgressEvent {
305 spec: self.spec.clone(),
306 execution_id: self.execution_id,
307 total_elapsed: self.total_elapsed,
308 kind: ProgressEventKind::Nested {
309 step: step.clone(),
310 attempt: *attempt,
311 event: Box::new(progress_event),
312 step_elapsed: *step_elapsed,
313 attempt_elapsed: *attempt_elapsed,
314 },
315 }),
316 StepEventKind::NoStepsDefined
317 | StepEventKind::ExecutionCompleted { .. }
318 | StepEventKind::ExecutionFailed { .. }
319 | StepEventKind::ExecutionAborted { .. }
320 | StepEventKind::Unknown => None,
321 }
322 }
323
324 pub fn leaf_execution_id(&self) -> ExecutionUuid {
327 match &self.kind {
328 StepEventKind::Nested { event, .. } => event.leaf_execution_id(),
329 _ => self.execution_id,
330 }
331 }
332
333 pub fn leaf_event_index(&self) -> usize {
336 match &self.kind {
337 StepEventKind::Nested { event, .. } => event.leaf_event_index(),
338 _ => self.event_index,
339 }
340 }
341
342 pub fn from_generic(
347 value: StepEvent<GenericSpec>,
348 ) -> Result<Self, ConvertGenericError> {
349 Ok(StepEvent {
350 spec: value.spec,
351 execution_id: value.execution_id,
352 event_index: value.event_index,
353 total_elapsed: value.total_elapsed,
354 kind: StepEventKind::from_generic(value.kind)
355 .map_err(|error| error.parent("kind"))?,
356 })
357 }
358
359 pub fn into_generic(self) -> StepEvent<GenericSpec> {
370 StepEvent {
371 spec: self.spec,
372 execution_id: self.execution_id,
373 event_index: self.event_index,
374 total_elapsed: self.total_elapsed,
375 kind: self.kind.into_generic(),
376 }
377 }
378}
379
380#[derive(Deserialize, Serialize)]
381#[cfg_attr(feature = "schemars08", derive(schemars::JsonSchema))]
382#[derive_where(Clone, Debug, Eq, PartialEq)]
383#[serde(bound = "", rename_all = "snake_case", tag = "kind")]
384#[cfg_attr(
385 feature = "schemars08",
386 schemars(
387 rename = "StepEventKindFor{S}",
388 bound = "S: JsonSchemaEngineSpec",
389 )
390)]
391pub enum StepEventKind<S: EngineSpec> {
392 NoStepsDefined,
397
398 ExecutionStarted {
403 steps: Vec<StepInfo<S>>,
405
406 components: Vec<StepComponentSummary<S>>,
408
409 first_step: StepInfoWithMetadata<S>,
411 },
412
413 ProgressReset {
416 step: StepInfoWithMetadata<S>,
418
419 attempt: usize,
421
422 metadata: S::ProgressMetadata,
424
425 step_elapsed: Duration,
428
429 attempt_elapsed: Duration,
431
432 message: Cow<'static, str>,
434 },
435
436 AttemptRetry {
438 step: StepInfoWithMetadata<S>,
440
441 next_attempt: usize,
443
444 step_elapsed: Duration,
447
448 attempt_elapsed: Duration,
450
451 message: Cow<'static, str>,
453 },
454
455 StepCompleted {
457 step: StepInfoWithMetadata<S>,
459
460 attempt: usize,
462
463 outcome: StepOutcome<S>,
465
466 next_step: StepInfoWithMetadata<S>,
468
469 step_elapsed: Duration,
472
473 attempt_elapsed: Duration,
475 },
476
477 ExecutionCompleted {
482 last_step: StepInfoWithMetadata<S>,
484
485 last_attempt: usize,
487
488 last_outcome: StepOutcome<S>,
490
491 step_elapsed: Duration,
494
495 attempt_elapsed: Duration,
497 },
498
499 ExecutionFailed {
504 failed_step: StepInfoWithMetadata<S>,
506
507 total_attempts: usize,
509
510 step_elapsed: Duration,
513
514 attempt_elapsed: Duration,
516
517 message: String,
519
520 causes: Vec<String>,
522 },
523
524 ExecutionAborted {
529 aborted_step: StepInfoWithMetadata<S>,
532
533 attempt: usize,
535
536 step_elapsed: Duration,
539
540 attempt_elapsed: Duration,
542
543 message: String,
545 },
546
547 Nested {
549 step: StepInfoWithMetadata<S>,
551
552 attempt: usize,
554
555 event: Box<StepEvent<GenericSpec>>,
557
558 step_elapsed: Duration,
561
562 attempt_elapsed: Duration,
564 },
565
566 #[serde(other, deserialize_with = "deserialize_ignore_any")]
568 Unknown,
569}
570
571fn deserialize_ignore_any<'de, D: Deserializer<'de>, T: Default>(
572 deserializer: D,
573) -> Result<T, D::Error> {
574 IgnoredAny::deserialize(deserializer).map(|_| T::default())
575}
576
577impl<S: EngineSpec> StepEventKind<S> {
578 pub fn is_terminal(&self) -> StepEventIsTerminal {
585 match self {
586 StepEventKind::NoStepsDefined
587 | StepEventKind::ExecutionCompleted { .. } => {
588 StepEventIsTerminal::Terminal { success: true }
589 }
590 StepEventKind::ExecutionFailed { .. }
591 | StepEventKind::ExecutionAborted { .. } => {
592 StepEventIsTerminal::Terminal { success: false }
593 }
594 StepEventKind::ExecutionStarted { .. }
595 | StepEventKind::ProgressReset { .. }
596 | StepEventKind::AttemptRetry { .. }
597 | StepEventKind::StepCompleted { .. }
598 | StepEventKind::Nested { .. }
599 | StepEventKind::Unknown => StepEventIsTerminal::NonTerminal,
600 }
601 }
602
603 pub fn priority(&self) -> StepEventPriority {
607 match self {
608 StepEventKind::NoStepsDefined
609 | StepEventKind::ExecutionStarted { .. }
610 | StepEventKind::StepCompleted { .. }
611 | StepEventKind::ExecutionCompleted { .. }
612 | StepEventKind::ExecutionFailed { .. }
613 | StepEventKind::ExecutionAborted { .. } => StepEventPriority::High,
614 StepEventKind::ProgressReset { .. }
615 | StepEventKind::AttemptRetry { .. }
616 | StepEventKind::Unknown => StepEventPriority::Low,
617 StepEventKind::Nested { event, .. } => event.kind.priority(),
618 }
619 }
620
621 pub fn from_generic(
626 value: StepEventKind<GenericSpec>,
627 ) -> Result<Self, ConvertGenericError> {
628 let ret = match value {
629 StepEventKind::NoStepsDefined => StepEventKind::NoStepsDefined,
630 StepEventKind::ExecutionStarted {
631 steps,
632 components,
633 first_step,
634 } => StepEventKind::ExecutionStarted {
635 steps: steps
636 .into_iter()
637 .enumerate()
638 .map(|(index, step)| {
639 StepInfo::from_generic(step)
640 .map_err(|error| error.parent_array("steps", index))
641 })
642 .collect::<Result<Vec<_>, _>>()?,
643 components: components
644 .into_iter()
645 .enumerate()
646 .map(|(index, component)| {
647 StepComponentSummary::from_generic(component).map_err(
648 |error| error.parent_array("components", index),
649 )
650 })
651 .collect::<Result<Vec<_>, _>>()?,
652 first_step: StepInfoWithMetadata::from_generic(first_step)
653 .map_err(|error| error.parent("first_step"))?,
654 },
655 StepEventKind::ProgressReset {
656 step,
657 attempt,
658 metadata,
659 step_elapsed,
660 attempt_elapsed,
661 message,
662 } => StepEventKind::ProgressReset {
663 step: StepInfoWithMetadata::from_generic(step)
664 .map_err(|error| error.parent("step"))?,
665 attempt,
666 metadata: serde_json::from_value(metadata).map_err(
667 |error| ConvertGenericError::new("metadata", error),
668 )?,
669 step_elapsed,
670 attempt_elapsed,
671 message,
672 },
673 StepEventKind::AttemptRetry {
674 step,
675 next_attempt,
676 step_elapsed,
677 attempt_elapsed,
678 message,
679 } => StepEventKind::AttemptRetry {
680 step: StepInfoWithMetadata::from_generic(step)
681 .map_err(|error| error.parent("step"))?,
682 next_attempt,
683 step_elapsed,
684 attempt_elapsed,
685 message,
686 },
687 StepEventKind::StepCompleted {
688 step,
689 attempt,
690 outcome,
691 next_step,
692 step_elapsed,
693 attempt_elapsed,
694 } => StepEventKind::StepCompleted {
695 step: StepInfoWithMetadata::from_generic(step)
696 .map_err(|error| error.parent("step"))?,
697 attempt,
698 outcome: StepOutcome::from_generic(outcome)
699 .map_err(|error| error.parent("outcome"))?,
700 next_step: StepInfoWithMetadata::from_generic(next_step)
701 .map_err(|error| error.parent("next_step"))?,
702 step_elapsed,
703 attempt_elapsed,
704 },
705 StepEventKind::ExecutionCompleted {
706 last_step,
707 last_attempt,
708 last_outcome,
709 step_elapsed,
710 attempt_elapsed,
711 } => StepEventKind::ExecutionCompleted {
712 last_step: StepInfoWithMetadata::from_generic(last_step)
713 .map_err(|error| error.parent("last_step"))?,
714 last_attempt,
715 last_outcome: StepOutcome::from_generic(last_outcome)
716 .map_err(|error| error.parent("last_outcome"))?,
717 step_elapsed,
718 attempt_elapsed,
719 },
720 StepEventKind::ExecutionFailed {
721 failed_step,
722 total_attempts,
723 step_elapsed,
724 attempt_elapsed,
725 message,
726 causes,
727 } => StepEventKind::ExecutionFailed {
728 failed_step: StepInfoWithMetadata::from_generic(failed_step)
729 .map_err(|error| error.parent("failed_step"))?,
730 total_attempts,
731 step_elapsed,
732 attempt_elapsed,
733 message,
734 causes,
735 },
736 StepEventKind::ExecutionAborted {
737 aborted_step,
738 attempt,
739 step_elapsed,
740 attempt_elapsed,
741 message,
742 } => StepEventKind::ExecutionAborted {
743 aborted_step: StepInfoWithMetadata::from_generic(aborted_step)
744 .map_err(|error| error.parent("aborted_step"))?,
745 attempt,
746 step_elapsed,
747 attempt_elapsed,
748 message,
749 },
750 StepEventKind::Nested {
751 step,
752 attempt,
753 event,
754 step_elapsed,
755 attempt_elapsed,
756 } => StepEventKind::Nested {
757 step: StepInfoWithMetadata::from_generic(step)
758 .map_err(|error| error.parent("step"))?,
759 attempt,
760 event,
761 step_elapsed,
762 attempt_elapsed,
763 },
764 StepEventKind::Unknown => StepEventKind::Unknown,
765 };
766 Ok(ret)
767 }
768
769 pub fn into_generic(self) -> StepEventKind<GenericSpec> {
780 match self {
781 StepEventKind::NoStepsDefined => StepEventKind::NoStepsDefined,
782 StepEventKind::ExecutionStarted {
783 steps,
784 components,
785 first_step,
786 } => StepEventKind::ExecutionStarted {
787 steps: steps
788 .into_iter()
789 .map(|step| step.into_generic())
790 .collect(),
791 components: components
792 .into_iter()
793 .map(|component| component.into_generic())
794 .collect(),
795 first_step: first_step.into_generic(),
796 },
797 StepEventKind::ProgressReset {
798 step,
799 attempt,
800 metadata,
801 step_elapsed,
802 attempt_elapsed,
803 message,
804 } => StepEventKind::ProgressReset {
805 step: step.into_generic(),
806 attempt,
807 metadata: serde_json::to_value(metadata)
808 .unwrap_or(serde_json::Value::Null),
809 step_elapsed,
810 attempt_elapsed,
811 message,
812 },
813 StepEventKind::AttemptRetry {
814 step,
815 next_attempt,
816 step_elapsed,
817 attempt_elapsed,
818 message,
819 } => StepEventKind::AttemptRetry {
820 step: step.into_generic(),
821 next_attempt,
822 step_elapsed,
823 attempt_elapsed,
824 message,
825 },
826 StepEventKind::StepCompleted {
827 step,
828 attempt,
829 outcome,
830 next_step,
831 step_elapsed,
832 attempt_elapsed,
833 } => StepEventKind::StepCompleted {
834 step: step.into_generic(),
835 attempt,
836 outcome: outcome.into_generic(),
837 next_step: next_step.into_generic(),
838 step_elapsed,
839 attempt_elapsed,
840 },
841 StepEventKind::ExecutionCompleted {
842 last_step,
843 last_attempt,
844 last_outcome,
845 step_elapsed,
846 attempt_elapsed,
847 } => StepEventKind::ExecutionCompleted {
848 last_step: last_step.into_generic(),
849 last_attempt,
850 last_outcome: last_outcome.into_generic(),
851 step_elapsed,
852 attempt_elapsed,
853 },
854 StepEventKind::ExecutionFailed {
855 failed_step,
856 total_attempts,
857 step_elapsed,
858 attempt_elapsed,
859 message,
860 causes,
861 } => StepEventKind::ExecutionFailed {
862 failed_step: failed_step.into_generic(),
863 total_attempts,
864 step_elapsed,
865 attempt_elapsed,
866 message,
867 causes,
868 },
869 StepEventKind::ExecutionAborted {
870 aborted_step,
871 attempt,
872 step_elapsed,
873 attempt_elapsed,
874 message,
875 } => StepEventKind::ExecutionAborted {
876 aborted_step: aborted_step.into_generic(),
877 attempt,
878 step_elapsed,
879 attempt_elapsed,
880 message,
881 },
882 StepEventKind::Nested {
883 step,
884 attempt,
885 event,
886 step_elapsed,
887 attempt_elapsed,
888 } => StepEventKind::Nested {
889 step: step.into_generic(),
890 attempt,
891 event,
892 step_elapsed,
893 attempt_elapsed,
894 },
895 StepEventKind::Unknown => StepEventKind::Unknown,
896 }
897 }
898
899 pub fn step_outcome(&self) -> Option<&StepOutcome<S>> {
903 match self {
904 StepEventKind::StepCompleted { outcome, .. }
905 | StepEventKind::ExecutionCompleted {
906 last_outcome: outcome, ..
907 } => Some(outcome),
908 StepEventKind::NoStepsDefined
909 | StepEventKind::ExecutionStarted { .. }
910 | StepEventKind::ProgressReset { .. }
911 | StepEventKind::AttemptRetry { .. }
912 | StepEventKind::ExecutionFailed { .. }
913 | StepEventKind::ExecutionAborted { .. }
914 | StepEventKind::Nested { .. }
915 | StepEventKind::Unknown => None,
916 }
917 }
918}
919
920#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
927pub enum StepEventIsTerminal {
928 NonTerminal,
930
931 Terminal {
933 success: bool,
935 },
936}
937
938#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord)]
951pub enum StepEventPriority {
952 Low,
956
957 High,
961}
962
963#[derive(Deserialize, Serialize)]
964#[cfg_attr(feature = "schemars08", derive(schemars::JsonSchema))]
965#[derive_where(Clone, Debug, Eq, PartialEq)]
966#[serde(bound = "", rename_all = "snake_case", tag = "kind")]
967#[cfg_attr(
968 feature = "schemars08",
969 schemars(rename = "StepOutcomeFor{S}", bound = "S: JsonSchemaEngineSpec",)
970)]
971pub enum StepOutcome<S: EngineSpec> {
972 Success {
974 message: Option<Cow<'static, str>>,
976
977 metadata: Option<S::CompletionMetadata>,
979 },
980
981 Warning {
983 message: Cow<'static, str>,
985
986 metadata: Option<S::CompletionMetadata>,
988 },
989
990 Skipped {
992 message: Cow<'static, str>,
994
995 metadata: Option<S::SkippedMetadata>,
997 },
998}
999
1000impl<S: EngineSpec> StepOutcome<S> {
1001 pub fn from_generic(
1006 value: StepOutcome<GenericSpec>,
1007 ) -> Result<Self, ConvertGenericError> {
1008 let ret = match value {
1009 StepOutcome::Success { message, metadata } => {
1010 StepOutcome::Success {
1011 message,
1012 metadata: metadata
1013 .map(|metadata| {
1014 serde_json::from_value(metadata).map_err(|error| {
1015 ConvertGenericError::new("metadata", error)
1016 })
1017 })
1018 .transpose()?,
1019 }
1020 }
1021 StepOutcome::Warning { message, metadata } => {
1022 StepOutcome::Warning {
1023 message,
1024 metadata: metadata
1025 .map(|metadata| {
1026 serde_json::from_value(metadata).map_err(|error| {
1027 ConvertGenericError::new("metadata", error)
1028 })
1029 })
1030 .transpose()?,
1031 }
1032 }
1033 StepOutcome::Skipped { message, metadata } => {
1034 StepOutcome::Skipped {
1035 message,
1036 metadata: metadata
1037 .map(|metadata| {
1038 serde_json::from_value(metadata).map_err(|error| {
1039 ConvertGenericError::new("metadata", error)
1040 })
1041 })
1042 .transpose()?,
1043 }
1044 }
1045 };
1046 Ok(ret)
1047 }
1048
1049 pub fn completion_metadata(&self) -> Option<&S::CompletionMetadata> {
1054 match self {
1055 StepOutcome::Success { metadata, .. }
1056 | StepOutcome::Warning { metadata, .. } => metadata.as_ref(),
1057 StepOutcome::Skipped { .. } => None,
1058 }
1059 }
1060
1061 pub fn into_generic(self) -> StepOutcome<GenericSpec> {
1072 match self {
1073 StepOutcome::Success { message, metadata } => {
1074 StepOutcome::Success {
1075 message,
1076 metadata: metadata.map(|metadata| {
1077 serde_json::to_value(metadata)
1078 .unwrap_or(serde_json::Value::Null)
1079 }),
1080 }
1081 }
1082 StepOutcome::Warning { metadata, message } => {
1083 StepOutcome::Warning {
1084 message,
1085 metadata: metadata.map(|metadata| {
1086 serde_json::to_value(metadata)
1087 .unwrap_or(serde_json::Value::Null)
1088 }),
1089 }
1090 }
1091 StepOutcome::Skipped { metadata, message } => {
1092 StepOutcome::Skipped {
1093 message,
1094 metadata: metadata.map(|metadata| {
1095 serde_json::to_value(metadata)
1096 .unwrap_or(serde_json::Value::Null)
1097 }),
1098 }
1099 }
1100 }
1101 }
1102
1103 pub fn is_success_or_warning(&self) -> bool {
1106 match self {
1107 Self::Success { .. } | Self::Warning { .. } => true,
1108 Self::Skipped { .. } => false,
1109 }
1110 }
1111
1112 pub fn is_skipped(&self) -> bool {
1114 match self {
1115 Self::Skipped { .. } => true,
1116 Self::Success { .. } | Self::Warning { .. } => false,
1117 }
1118 }
1119
1120 pub fn message(&self) -> Option<&Cow<'static, str>> {
1122 match self {
1123 StepOutcome::Success { message, .. } => message.as_ref(),
1124 StepOutcome::Warning { message, .. }
1125 | StepOutcome::Skipped { message, .. } => Some(message),
1126 }
1127 }
1128}
1129
1130#[derive(Deserialize, Serialize)]
1131#[derive_where(Clone, Debug, Eq, PartialEq)]
1132#[serde(bound = "", rename_all = "snake_case")]
1133pub struct ProgressEvent<S: EngineSpec> {
1134 pub spec: String,
1142
1143 pub execution_id: ExecutionUuid,
1145
1146 pub total_elapsed: Duration,
1148
1149 #[serde(rename = "data")]
1151 pub kind: ProgressEventKind<S>,
1152}
1153
1154#[cfg(feature = "schemars08")]
1155impl<S: JsonSchemaEngineSpec> schemars::JsonSchema for ProgressEvent<S> {
1156 fn schema_name() -> String {
1157 format!("ProgressEventFor{}", S::schema_name())
1158 }
1159
1160 fn json_schema(
1161 generator: &mut schemars::r#gen::SchemaGenerator,
1162 ) -> schemars::schema::Schema {
1163 use crate::schema::with_description;
1164 use schemars::schema::{ObjectValidation, SchemaObject};
1165
1166 let mut obj = ObjectValidation::default();
1167 obj.properties.insert(
1168 "spec".to_owned(),
1169 with_description(
1170 generator.subschema_for::<String>(),
1171 "The specification that this event belongs to.",
1172 ),
1173 );
1174 obj.properties.insert(
1175 "execution_id".to_owned(),
1176 with_description(
1177 generator.subschema_for::<ExecutionUuid>(),
1178 "The execution ID.",
1179 ),
1180 );
1181 obj.properties.insert(
1182 "total_elapsed".to_owned(),
1183 with_description(
1184 generator.subschema_for::<Duration>(),
1185 "Total time elapsed since the start of execution.",
1186 ),
1187 );
1188 obj.properties.insert(
1189 "data".to_owned(),
1190 with_description(
1191 generator.subschema_for::<ProgressEventKind<S>>(),
1192 "The kind of event this is.",
1193 ),
1194 );
1195 obj.required = ["spec", "execution_id", "total_elapsed", "data"]
1196 .into_iter()
1197 .map(String::from)
1198 .collect();
1199
1200 let mut extensions = serde_json::Map::new();
1201 if let Some(info) = S::rust_type_info() {
1202 extensions.insert(
1203 "x-rust-type".to_owned(),
1204 crate::schema::rust_type_for_generic(&info, "ProgressEvent"),
1205 );
1206 }
1207
1208 SchemaObject {
1209 instance_type: Some(schemars::schema::InstanceType::Object.into()),
1210 object: Some(Box::new(obj)),
1211 extensions: extensions.into_iter().collect(),
1212 ..Default::default()
1213 }
1214 .into()
1215 }
1216}
1217
1218impl<S: EngineSpec> ProgressEvent<S> {
1219 pub fn from_generic(
1224 value: ProgressEvent<GenericSpec>,
1225 ) -> Result<Self, ConvertGenericError> {
1226 Ok(Self {
1227 spec: value.spec,
1228 execution_id: value.execution_id,
1229 total_elapsed: value.total_elapsed,
1230 kind: ProgressEventKind::from_generic(value.kind)
1231 .map_err(|error| error.parent("kind"))?,
1232 })
1233 }
1234
1235 pub fn into_generic(self) -> ProgressEvent<GenericSpec> {
1246 ProgressEvent {
1247 spec: self.spec,
1248 execution_id: self.execution_id,
1249 total_elapsed: self.total_elapsed,
1250 kind: self.kind.into_generic(),
1251 }
1252 }
1253}
1254
1255#[derive(Deserialize, Serialize)]
1256#[cfg_attr(feature = "schemars08", derive(schemars::JsonSchema))]
1257#[derive_where(Clone, Debug, Eq, PartialEq)]
1258#[serde(bound = "", rename_all = "snake_case", tag = "kind")]
1259#[cfg_attr(
1260 feature = "schemars08",
1261 schemars(
1262 rename = "ProgressEventKindFor{S}",
1263 bound = "S: JsonSchemaEngineSpec",
1264 )
1265)]
1266pub enum ProgressEventKind<S: EngineSpec> {
1267 WaitingForProgress {
1272 step: StepInfoWithMetadata<S>,
1274
1275 attempt: usize,
1277
1278 step_elapsed: Duration,
1281
1282 attempt_elapsed: Duration,
1284 },
1285
1286 Progress {
1287 step: StepInfoWithMetadata<S>,
1289
1290 attempt: usize,
1292
1293 metadata: S::ProgressMetadata,
1295
1296 progress: Option<ProgressCounter>,
1298
1299 step_elapsed: Duration,
1302
1303 attempt_elapsed: Duration,
1305 },
1306
1307 Nested {
1308 step: StepInfoWithMetadata<S>,
1310
1311 attempt: usize,
1313
1314 event: Box<ProgressEvent<GenericSpec>>,
1316
1317 step_elapsed: Duration,
1320
1321 attempt_elapsed: Duration,
1323 },
1324
1325 #[serde(other, deserialize_with = "deserialize_ignore_any")]
1327 Unknown,
1328}
1329
1330impl<S: EngineSpec> ProgressEventKind<S> {
1331 pub fn progress_counter(&self) -> Option<&ProgressCounter> {
1333 match self {
1334 ProgressEventKind::Progress { progress, .. } => progress.as_ref(),
1335 ProgressEventKind::Nested { event, .. } => {
1336 event.kind.progress_counter()
1337 }
1338 ProgressEventKind::WaitingForProgress { .. }
1339 | ProgressEventKind::Unknown => None,
1340 }
1341 }
1342
1343 pub fn leaf_attempt(&self) -> Option<usize> {
1348 match self {
1349 ProgressEventKind::WaitingForProgress { attempt, .. }
1350 | ProgressEventKind::Progress { attempt, .. } => Some(*attempt),
1351 ProgressEventKind::Nested { event, .. } => {
1352 event.kind.leaf_attempt()
1353 }
1354 ProgressEventKind::Unknown => None,
1355 }
1356 }
1357
1358 pub fn leaf_step_elapsed(&self) -> Option<Duration> {
1363 match self {
1364 ProgressEventKind::WaitingForProgress { step_elapsed, .. }
1365 | ProgressEventKind::Progress { step_elapsed, .. } => {
1366 Some(*step_elapsed)
1367 }
1368 ProgressEventKind::Nested { event, .. } => {
1369 event.kind.leaf_step_elapsed()
1370 }
1371 ProgressEventKind::Unknown => None,
1372 }
1373 }
1374
1375 pub fn leaf_attempt_elapsed(&self) -> Option<Duration> {
1380 match self {
1381 ProgressEventKind::WaitingForProgress {
1382 attempt_elapsed, ..
1383 }
1384 | ProgressEventKind::Progress { attempt_elapsed, .. } => {
1385 Some(*attempt_elapsed)
1386 }
1387 ProgressEventKind::Nested { event, .. } => {
1388 event.kind.leaf_attempt_elapsed()
1389 }
1390 ProgressEventKind::Unknown => None,
1391 }
1392 }
1393
1394 pub fn from_generic(
1399 value: ProgressEventKind<GenericSpec>,
1400 ) -> Result<Self, ConvertGenericError> {
1401 let ret = match value {
1402 ProgressEventKind::WaitingForProgress {
1403 step,
1404 attempt,
1405 step_elapsed,
1406 attempt_elapsed,
1407 } => ProgressEventKind::WaitingForProgress {
1408 step: StepInfoWithMetadata::from_generic(step)
1409 .map_err(|error| error.parent("step"))?,
1410 attempt,
1411 step_elapsed,
1412 attempt_elapsed,
1413 },
1414 ProgressEventKind::Progress {
1415 step,
1416 attempt,
1417 metadata,
1418 progress,
1419 step_elapsed,
1420 attempt_elapsed,
1421 } => ProgressEventKind::Progress {
1422 step: StepInfoWithMetadata::from_generic(step)
1423 .map_err(|error| error.parent("step"))?,
1424 attempt,
1425 metadata: serde_json::from_value(metadata).map_err(
1426 |error| ConvertGenericError::new("metadata", error),
1427 )?,
1428 progress,
1429 step_elapsed,
1430 attempt_elapsed,
1431 },
1432 ProgressEventKind::Nested {
1433 step,
1434 attempt,
1435 event,
1436 step_elapsed,
1437 attempt_elapsed,
1438 } => ProgressEventKind::Nested {
1439 step: StepInfoWithMetadata::from_generic(step)
1440 .map_err(|error| error.parent("step"))?,
1441 attempt,
1442 event,
1443 step_elapsed,
1444 attempt_elapsed,
1445 },
1446 ProgressEventKind::Unknown => ProgressEventKind::Unknown,
1447 };
1448 Ok(ret)
1449 }
1450
1451 pub fn into_generic(self) -> ProgressEventKind<GenericSpec> {
1462 match self {
1463 ProgressEventKind::WaitingForProgress {
1464 step,
1465 attempt,
1466 step_elapsed,
1467 attempt_elapsed,
1468 } => ProgressEventKind::WaitingForProgress {
1469 step: step.into_generic(),
1470 attempt,
1471 step_elapsed,
1472 attempt_elapsed,
1473 },
1474 ProgressEventKind::Progress {
1475 step,
1476 attempt,
1477 metadata,
1478 progress,
1479 step_elapsed,
1480 attempt_elapsed,
1481 } => ProgressEventKind::Progress {
1482 step: step.into_generic(),
1483 attempt,
1484 metadata: serde_json::to_value(metadata)
1485 .unwrap_or(serde_json::Value::Null),
1486 progress,
1487 step_elapsed,
1488 attempt_elapsed,
1489 },
1490 ProgressEventKind::Nested {
1491 step,
1492 attempt,
1493 event,
1494 step_elapsed,
1495 attempt_elapsed,
1496 } => ProgressEventKind::Nested {
1497 step: step.into_generic(),
1498 attempt,
1499 event,
1500 step_elapsed,
1501 attempt_elapsed,
1502 },
1503 ProgressEventKind::Unknown => ProgressEventKind::Unknown,
1504 }
1505 }
1506}
1507
1508#[derive(Deserialize, Serialize)]
1510#[cfg_attr(feature = "schemars08", derive(schemars::JsonSchema))]
1511#[derive_where(Clone, Debug, Eq, PartialEq)]
1512#[serde(bound = "", rename_all = "snake_case")]
1513#[cfg_attr(
1514 feature = "schemars08",
1515 schemars(rename = "StepInfoFor{S}", bound = "S: JsonSchemaEngineSpec",)
1516)]
1517pub struct StepInfo<S: EngineSpec> {
1518 pub id: S::StepId,
1520
1521 pub component: S::Component,
1523
1524 pub description: Cow<'static, str>,
1526
1527 pub index: usize,
1529
1530 pub component_index: usize,
1532
1533 pub total_component_steps: usize,
1535}
1536
1537impl<S: EngineSpec> StepInfo<S> {
1538 pub fn is_last_step_in_component(&self) -> bool {
1540 self.component_index + 1 == self.total_component_steps
1541 }
1542
1543 pub fn from_generic(
1548 value: StepInfo<GenericSpec>,
1549 ) -> Result<Self, ConvertGenericError> {
1550 Ok(Self {
1551 id: serde_json::from_value(value.id)
1552 .map_err(|error| ConvertGenericError::new("id", error))?,
1553 component: serde_json::from_value(value.component).map_err(
1554 |error| ConvertGenericError::new("component", error),
1555 )?,
1556 description: value.description,
1557 index: value.index,
1558 component_index: value.component_index,
1559 total_component_steps: value.total_component_steps,
1560 })
1561 }
1562
1563 pub fn into_generic(self) -> StepInfo<GenericSpec> {
1574 StepInfo {
1575 id: serde_json::to_value(self.id)
1576 .unwrap_or(serde_json::Value::Null),
1577 component: serde_json::to_value(self.component)
1578 .unwrap_or(serde_json::Value::Null),
1579 description: self.description,
1580 index: self.index,
1581 component_index: self.component_index,
1582 total_component_steps: self.total_component_steps,
1583 }
1584 }
1585}
1586
1587#[derive(Deserialize, Serialize)]
1588#[cfg_attr(feature = "schemars08", derive(schemars::JsonSchema))]
1589#[derive_where(Clone, Debug, Eq, PartialEq)]
1590#[serde(bound = "", rename_all = "snake_case")]
1591#[cfg_attr(
1592 feature = "schemars08",
1593 schemars(
1594 rename = "StepComponentSummaryFor{S}",
1595 bound = "S: JsonSchemaEngineSpec",
1596 )
1597)]
1598pub struct StepComponentSummary<S: EngineSpec> {
1599 pub component: S::Component,
1601
1602 pub total_component_steps: usize,
1604}
1605
1606impl<S: EngineSpec> StepComponentSummary<S> {
1607 pub fn from_generic(
1612 value: StepComponentSummary<GenericSpec>,
1613 ) -> Result<Self, ConvertGenericError> {
1614 Ok(Self {
1615 component: serde_json::from_value(value.component).map_err(
1616 |error| ConvertGenericError::new("component", error),
1617 )?,
1618 total_component_steps: value.total_component_steps,
1619 })
1620 }
1621
1622 pub fn into_generic(self) -> StepComponentSummary<GenericSpec> {
1633 StepComponentSummary {
1634 component: serde_json::to_value(self.component)
1635 .unwrap_or(serde_json::Value::Null),
1636 total_component_steps: self.total_component_steps,
1637 }
1638 }
1639}
1640
1641#[derive(Deserialize, Serialize)]
1643#[cfg_attr(feature = "schemars08", derive(schemars::JsonSchema))]
1644#[derive_where(Clone, Debug, Eq, PartialEq)]
1645#[serde(bound = "", rename_all = "snake_case")]
1646#[cfg_attr(
1647 feature = "schemars08",
1648 schemars(
1649 rename = "StepInfoWithMetadataFor{S}",
1650 bound = "S: JsonSchemaEngineSpec",
1651 )
1652)]
1653pub struct StepInfoWithMetadata<S: EngineSpec> {
1654 pub info: StepInfo<S>,
1656
1657 pub metadata: Option<S::StepMetadata>,
1659}
1660
1661impl<S: EngineSpec> StepInfoWithMetadata<S> {
1662 pub fn from_generic(
1667 value: StepInfoWithMetadata<GenericSpec>,
1668 ) -> Result<Self, ConvertGenericError> {
1669 Ok(Self {
1670 info: StepInfo::from_generic(value.info)
1671 .map_err(|error| error.parent("info"))?,
1672 metadata: value
1673 .metadata
1674 .map(|metadata| {
1675 serde_json::from_value(metadata).map_err(|error| {
1676 ConvertGenericError::new("metadata", error)
1677 })
1678 })
1679 .transpose()?,
1680 })
1681 }
1682
1683 pub fn into_generic(self) -> StepInfoWithMetadata<GenericSpec> {
1694 StepInfoWithMetadata {
1695 info: self.info.into_generic(),
1696 metadata: self.metadata.map(|metadata| {
1697 serde_json::to_value(metadata)
1698 .unwrap_or(serde_json::Value::Null)
1699 }),
1700 }
1701 }
1702}
1703
1704#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
1711#[serde(rename_all = "snake_case")]
1712pub struct ProgressCounter {
1713 pub current: u64,
1715
1716 pub total: Option<u64>,
1718
1719 pub units: ProgressUnits,
1721}
1722
1723#[cfg(feature = "schemars08")]
1724impl schemars::JsonSchema for ProgressCounter {
1725 fn schema_name() -> String {
1726 "ProgressCounter".to_owned()
1727 }
1728
1729 fn json_schema(
1730 generator: &mut schemars::r#gen::SchemaGenerator,
1731 ) -> schemars::schema::Schema {
1732 use crate::schema::{
1733 EVENTS_MODULE, rust_type_for_events, with_description,
1734 };
1735 use schemars::schema::{ObjectValidation, SchemaObject};
1736
1737 let mut obj = ObjectValidation::default();
1738 obj.properties.insert(
1739 "current".to_owned(),
1740 with_description(
1741 generator.subschema_for::<u64>(),
1742 "The current progress.",
1743 ),
1744 );
1745 obj.properties.insert(
1746 "total".to_owned(),
1747 with_description(
1748 generator.subschema_for::<Option<u64>>(),
1749 "The total progress.",
1750 ),
1751 );
1752 obj.properties.insert(
1753 "units".to_owned(),
1754 with_description(
1755 generator.subschema_for::<ProgressUnits>(),
1756 "Progress units.",
1757 ),
1758 );
1759 obj.required =
1760 ["current", "units"].into_iter().map(String::from).collect();
1761
1762 SchemaObject {
1763 metadata: Some(Box::new(schemars::schema::Metadata {
1764 description: Some("Current progress.".to_owned()),
1765 ..Default::default()
1766 })),
1767 instance_type: Some(schemars::schema::InstanceType::Object.into()),
1768 object: Some(Box::new(obj)),
1769 extensions: [(
1770 "x-rust-type".to_owned(),
1771 rust_type_for_events(&format!(
1772 "{EVENTS_MODULE}::ProgressCounter"
1773 )),
1774 )]
1775 .into_iter()
1776 .collect(),
1777 ..Default::default()
1778 }
1779 .into()
1780 }
1781}
1782
1783impl ProgressCounter {
1784 #[inline]
1786 pub fn new(
1787 current: u64,
1788 total: u64,
1789 units: impl Into<ProgressUnits>,
1790 ) -> Self {
1791 Self { current, total: Some(total), units: units.into() }
1792 }
1793
1794 #[inline]
1796 pub fn current(current: u64, units: impl Into<ProgressUnits>) -> Self {
1797 Self { current, total: None, units: units.into() }
1798 }
1799}
1800
1801#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
1802#[serde(transparent)]
1803pub struct ProgressUnits(pub Cow<'static, str>);
1804
1805#[cfg(feature = "schemars08")]
1806impl schemars::JsonSchema for ProgressUnits {
1807 fn schema_name() -> String {
1808 "ProgressUnits".to_owned()
1809 }
1810
1811 fn json_schema(
1812 generator: &mut schemars::r#gen::SchemaGenerator,
1813 ) -> schemars::schema::Schema {
1814 use crate::schema::{EVENTS_MODULE, rust_type_for_events};
1815
1816 let mut schema = match generator.subschema_for::<String>() {
1819 schemars::schema::Schema::Object(obj) => obj,
1820 other => {
1824 debug_assert!(
1825 false,
1826 "expected String to produce an Object \
1827 schema, got: {other:?}",
1828 );
1829 return other;
1830 }
1831 };
1832 schema.extensions.insert(
1833 "x-rust-type".to_owned(),
1834 rust_type_for_events(&format!("{EVENTS_MODULE}::ProgressUnits")),
1835 );
1836 schema.into()
1837 }
1838}
1839
1840impl ProgressUnits {
1841 pub fn new(s: impl Into<Cow<'static, str>>) -> Self {
1843 Self(s.into())
1844 }
1845
1846 pub const fn new_const(s: &'static str) -> Self {
1848 Self(Cow::Borrowed(s))
1849 }
1850
1851 pub const BYTES: Self = Self::new_const("bytes");
1855}
1856
1857impl AsRef<str> for ProgressUnits {
1858 fn as_ref(&self) -> &str {
1859 self.0.as_ref()
1860 }
1861}
1862
1863impl fmt::Display for ProgressUnits {
1864 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1865 f.write_str(self.as_ref())
1866 }
1867}
1868
1869impl<T> From<T> for ProgressUnits
1870where
1871 T: Into<Cow<'static, str>>,
1872{
1873 fn from(value: T) -> Self {
1874 Self(value.into())
1875 }
1876}
1877
1878#[derive_where(Clone, Debug, Eq, PartialEq)]
1879pub enum StepProgress<S: EngineSpec> {
1880 Progress {
1882 progress: Option<ProgressCounter>,
1884
1885 metadata: S::ProgressMetadata,
1887 },
1888
1889 Reset {
1895 metadata: S::ProgressMetadata,
1897
1898 message: Cow<'static, str>,
1900 },
1901
1902 Retry {
1904 message: Cow<'static, str>,
1906 },
1907}
1908
1909impl<S: EngineSpec> StepProgress<S> {
1910 pub fn with_current_and_total(
1912 current: u64,
1913 total: u64,
1914 units: impl Into<ProgressUnits>,
1915 metadata: S::ProgressMetadata,
1916 ) -> Self {
1917 Self::Progress {
1918 progress: Some(ProgressCounter {
1919 current,
1920 total: Some(total),
1921 units: units.into(),
1922 }),
1923 metadata,
1924 }
1925 }
1926
1927 pub fn with_current(
1929 current: u64,
1930 units: impl Into<ProgressUnits>,
1931 metadata: S::ProgressMetadata,
1932 ) -> Self {
1933 Self::Progress {
1934 progress: Some(ProgressCounter {
1935 current,
1936 total: None,
1937 units: units.into(),
1938 }),
1939 metadata,
1940 }
1941 }
1942
1943 pub fn progress(metadata: S::ProgressMetadata) -> Self {
1945 Self::Progress { progress: None, metadata }
1946 }
1947
1948 pub fn reset(
1950 metadata: S::ProgressMetadata,
1951 message: impl Into<Cow<'static, str>>,
1952 ) -> Self {
1953 Self::Reset { metadata, message: message.into() }
1954 }
1955
1956 pub fn retry(message: impl Into<Cow<'static, str>>) -> Self {
1958 Self::Retry { message: message.into() }
1959 }
1960}
1961
1962#[derive_where(Clone, Debug, Default, Eq, PartialEq)]
1967#[derive(Deserialize, Serialize)]
1968#[serde(bound = "", rename_all = "snake_case")]
1969pub struct EventReport<S: EngineSpec> {
1970 pub step_events: Vec<StepEvent<S>>,
1974
1975 pub progress_events: Vec<ProgressEvent<S>>,
1981
1982 pub root_execution_id: Option<ExecutionUuid>,
1988
1989 pub last_seen: Option<usize>,
1993}
1994
1995#[cfg(feature = "schemars08")]
1996impl<S: JsonSchemaEngineSpec> schemars::JsonSchema for EventReport<S> {
1997 fn schema_name() -> String {
1998 format!("EventReportFor{}", S::schema_name())
1999 }
2000
2001 fn json_schema(
2002 generator: &mut schemars::r#gen::SchemaGenerator,
2003 ) -> schemars::schema::Schema {
2004 use crate::schema::with_description;
2005 use schemars::schema::{ObjectValidation, SchemaObject};
2006
2007 let mut obj = ObjectValidation::default();
2008 obj.properties.insert(
2009 "step_events".to_owned(),
2010 with_description(
2011 generator.subschema_for::<Vec<StepEvent<S>>>(),
2012 "A list of step events.",
2013 ),
2014 );
2015 obj.properties.insert(
2016 "progress_events".to_owned(),
2017 with_description(
2018 generator.subschema_for::<Vec<ProgressEvent<S>>>(),
2019 "A list of progress events, or whether we're \
2020 currently waiting for a progress event.",
2021 ),
2022 );
2023 obj.properties.insert(
2024 "root_execution_id".to_owned(),
2025 with_description(
2026 generator.subschema_for::<Option<ExecutionUuid>>(),
2027 "The root execution ID for this report.",
2028 ),
2029 );
2030 obj.properties.insert(
2031 "last_seen".to_owned(),
2032 with_description(
2033 generator.subschema_for::<Option<usize>>(),
2034 "The last event seen.",
2035 ),
2036 );
2037 obj.required = ["step_events", "progress_events"]
2038 .into_iter()
2039 .map(String::from)
2040 .collect();
2041
2042 let mut extensions = serde_json::Map::new();
2043 if let Some(info) = S::rust_type_info() {
2044 extensions.insert(
2045 "x-rust-type".to_owned(),
2046 crate::schema::rust_type_for_generic(&info, "EventReport"),
2047 );
2048 }
2049
2050 SchemaObject {
2051 metadata: Some(Box::new(schemars::schema::Metadata {
2052 description: Some(
2053 "An oxide-update-engine event report.\
2054 \n\nRemote reports can be passed into a \
2055 `StepContext`, in which case they show up as \
2056 nested events."
2057 .to_owned(),
2058 ),
2059 ..Default::default()
2060 })),
2061 instance_type: Some(schemars::schema::InstanceType::Object.into()),
2062 object: Some(Box::new(obj)),
2063 extensions: extensions.into_iter().collect(),
2064 ..Default::default()
2065 }
2066 .into()
2067 }
2068}
2069
2070impl<S: EngineSpec> EventReport<S> {
2071 pub fn from_generic(
2076 value: EventReport<GenericSpec>,
2077 ) -> Result<Self, ConvertGenericError> {
2078 Ok(Self {
2079 step_events: value
2080 .step_events
2081 .into_iter()
2082 .enumerate()
2083 .map(|(index, event)| {
2084 StepEvent::from_generic(event).map_err(|error| {
2085 error.parent_array("step_events", index)
2086 })
2087 })
2088 .collect::<Result<Vec<_>, _>>()?,
2089 progress_events: value
2090 .progress_events
2091 .into_iter()
2092 .enumerate()
2093 .map(|(index, event)| {
2094 ProgressEvent::from_generic(event).map_err(|error| {
2095 error.parent_array("progress_events", index)
2096 })
2097 })
2098 .collect::<Result<Vec<_>, _>>()?,
2099 root_execution_id: value.root_execution_id,
2100 last_seen: value.last_seen,
2101 })
2102 }
2103
2104 pub fn into_generic(self) -> EventReport<GenericSpec> {
2115 EventReport {
2116 step_events: self
2117 .step_events
2118 .into_iter()
2119 .map(|event| event.into_generic())
2120 .collect(),
2121 progress_events: self
2122 .progress_events
2123 .into_iter()
2124 .map(|event| event.into_generic())
2125 .collect(),
2126 root_execution_id: self.root_execution_id,
2127 last_seen: self.last_seen,
2128 }
2129 }
2130}
2131
2132#[cfg(test)]
2133mod tests {
2134 use super::*;
2135 use crate::spec::EngineSpec;
2136
2137 enum TestSpec {}
2138
2139 impl EngineSpec for TestSpec {
2140 fn spec_name() -> String {
2141 "TestSpec".to_owned()
2142 }
2143
2144 type Component = String;
2145 type StepId = usize;
2146 type StepMetadata = serde_json::Value;
2147 type ProgressMetadata = serde_json::Value;
2148 type CompletionMetadata = serde_json::Value;
2149 type SkippedMetadata = serde_json::Value;
2150 type Error = anyhow::Error;
2151 }
2152
2153 fn test_execution_id() -> ExecutionUuid {
2154 "2cc08a14-5e96-4917-bc70-e98293a3b703".parse().expect("parsed UUID")
2155 }
2156
2157 #[test]
2158 fn step_event_parse_unknown() {
2159 let execution_id = test_execution_id();
2160 let tests = [
2161 (
2162 r#"
2163 {
2164 "spec": "TestSpec",
2165 "execution_id": "2cc08a14-5e96-4917-bc70-e98293a3b703",
2166 "event_index": 0,
2167 "total_elapsed": {
2168 "secs": 0,
2169 "nanos": 0
2170 },
2171 "data": {
2172 "kind": "unknown_variant",
2173 "last_step": {
2174 "info": {
2175 "id": 0,
2176 "component": "foo",
2177 "description": "Description",
2178 "index": 0,
2179 "component_index": 0,
2180 "total_component_steps": 1
2181 },
2182 "metadata": null
2183 },
2184 "last_attempt": 1,
2185 "last_outcome": {
2186 "kind": "success",
2187 "metadata": null
2188 },
2189 "step_elapsed": {
2190 "secs": 0,
2191 "nanos": 0
2192 },
2193 "attempt_elapsed": {
2194 "secs": 0,
2195 "nanos": 0
2196 }
2197 }
2198 }
2199 "#,
2200 StepEvent {
2201 spec: TestSpec::spec_name(),
2202 execution_id,
2203 event_index: 0,
2204 total_elapsed: Duration::ZERO,
2205 kind: StepEventKind::Unknown,
2206 },
2207 ),
2208 (
2209 r#"
2210 {
2211 "spec": "TestSpec",
2212 "execution_id": "2cc08a14-5e96-4917-bc70-e98293a3b703",
2213 "event_index": 1,
2214 "total_elapsed": {
2215 "secs": 0,
2216 "nanos": 0
2217 },
2218 "data": {
2219 "kind": "execution_completed",
2220 "last_step": {
2221 "info": {
2222 "id": 0,
2223 "component": "foo",
2224 "description": "Description",
2225 "index": 0,
2226 "component_index": 0,
2227 "total_component_steps": 1
2228 },
2229 "metadata": null
2230 },
2231 "last_attempt": 1,
2232 "last_outcome": {
2233 "kind": "success",
2234 "message": null,
2235 "metadata": null
2236 },
2237 "step_elapsed": {
2238 "secs": 0,
2239 "nanos": 0
2240 },
2241 "attempt_elapsed": {
2242 "secs": 0,
2243 "nanos": 0
2244 },
2245 "unknown_field": 123
2246 }
2247 }
2248 "#,
2249 StepEvent::<TestSpec> {
2250 spec: TestSpec::spec_name(),
2251 execution_id,
2252 event_index: 1,
2253 total_elapsed: Duration::ZERO,
2254 kind: StepEventKind::ExecutionCompleted {
2255 last_step: StepInfoWithMetadata {
2256 info: StepInfo {
2257 id: 0,
2258 component: "foo".to_owned(),
2259 description: "Description".into(),
2260 index: 0,
2261 component_index: 0,
2262 total_component_steps: 1,
2263 },
2264 metadata: None,
2265 },
2266 last_attempt: 1,
2267 last_outcome: StepOutcome::Success {
2268 message: None,
2269 metadata: None,
2270 },
2271 step_elapsed: Duration::ZERO,
2272 attempt_elapsed: Duration::ZERO,
2273 },
2274 },
2275 ),
2276 ];
2277
2278 for (index, (input, expected)) in tests.into_iter().enumerate() {
2279 let actual: StepEvent<TestSpec> = serde_json::from_str(input)
2280 .unwrap_or_else(|error| {
2281 panic!("index {index}: unknown variant deserialized correctly: {error}")
2282 });
2283 assert_eq!(expected, actual, "input matches actual output");
2284 }
2285 }
2286
2287 #[test]
2288 fn progress_event_parse_unknown() {
2289 let execution_id = test_execution_id();
2290
2291 let tests = [
2292 (
2293 r#"
2294 {
2295 "spec": "TestSpec",
2296 "execution_id": "2cc08a14-5e96-4917-bc70-e98293a3b703",
2297 "total_elapsed": {
2298 "secs": 0,
2299 "nanos": 0
2300 },
2301 "data": {
2302 "kind": "unknown_variant",
2303 "step": {
2304 "info": {
2305 "id": 0,
2306 "component": "foo",
2307 "description": "Description",
2308 "index": 0,
2309 "component_index": 0,
2310 "total_component_steps": 1
2311 },
2312 "metadata": null
2313 },
2314 "attempt": 1,
2315 "metadata": null,
2316 "progress": {
2317 "current": 123,
2318 "total": null
2319 },
2320 "step_elapsed": {
2321 "secs": 0,
2322 "nanos": 0
2323 },
2324 "attempt_elapsed": {
2325 "secs": 0,
2326 "nanos": 0
2327 }
2328 }
2329 }
2330 "#,
2331 ProgressEvent {
2332 spec: TestSpec::spec_name(),
2333 execution_id,
2334 total_elapsed: Duration::ZERO,
2335 kind: ProgressEventKind::Unknown,
2336 },
2337 ),
2338 (
2339 r#"
2340 {
2341 "spec": "TestSpec",
2342 "execution_id": "2cc08a14-5e96-4917-bc70-e98293a3b703",
2343 "total_elapsed": {
2344 "secs": 0,
2345 "nanos": 0
2346 },
2347 "data": {
2348 "kind": "progress",
2349 "step": {
2350 "info": {
2351 "id": 0,
2352 "component": "foo",
2353 "description": "Description",
2354 "index": 0,
2355 "component_index": 0,
2356 "total_component_steps": 1
2357 },
2358 "metadata": null
2359 },
2360 "attempt": 1,
2361 "metadata": null,
2362 "progress": {
2363 "current": 123,
2364 "total": null,
2365 "units": "bytes"
2366 },
2367 "step_elapsed": {
2368 "secs": 0,
2369 "nanos": 0
2370 },
2371 "attempt_elapsed": {
2372 "secs": 0,
2373 "nanos": 0
2374 },
2375 "unknown_field": 123
2376 }
2377 }
2378 "#,
2379 ProgressEvent::<TestSpec> {
2380 spec: TestSpec::spec_name(),
2381 execution_id,
2382 total_elapsed: Duration::ZERO,
2383 kind: ProgressEventKind::Progress {
2384 step: StepInfoWithMetadata {
2385 info: StepInfo {
2386 id: 0,
2387 component: "foo".to_owned(),
2388 description: "Description".into(),
2389 index: 0,
2390 component_index: 0,
2391 total_component_steps: 1,
2392 },
2393 metadata: None,
2394 },
2395 attempt: 1,
2396 metadata: serde_json::Value::Null,
2397 progress: Some(ProgressCounter::current(
2398 123,
2399 ProgressUnits::BYTES,
2400 )),
2401 step_elapsed: Duration::ZERO,
2402 attempt_elapsed: Duration::ZERO,
2403 },
2404 },
2405 ),
2406 ];
2407
2408 for (index, (input, expected)) in tests.into_iter().enumerate() {
2409 let actual: ProgressEvent<TestSpec> = serde_json::from_str(input)
2410 .unwrap_or_else(|error| {
2411 panic!("index {index}: unknown variant deserialized correctly: {error}")
2412 });
2413 assert_eq!(expected, actual, "input matches actual output");
2414 }
2415 }
2416
2417 #[cfg(feature = "schemars08")]
2420 mod schema_tests {
2421 use super::*;
2422 use crate::schema::RustTypeInfo;
2423 use schemars::{JsonSchema, r#gen::SchemaGenerator, schema::Schema};
2424
2425 fn get_rust_type_ext(schema: &Schema) -> Option<&serde_json::Value> {
2428 match schema {
2429 Schema::Object(obj) => obj.extensions.get("x-rust-type"),
2430 Schema::Bool(_) => None,
2431 }
2432 }
2433
2434 fn assert_rust_type_ext(
2437 x_rust_type: &serde_json::Value,
2438 expected_crate: &str,
2439 expected_version: &str,
2440 expected_path: &str,
2441 ) {
2442 assert_eq!(
2443 x_rust_type.get("crate").and_then(|v| v.as_str()),
2444 Some(expected_crate),
2445 "x-rust-type crate"
2446 );
2447 assert_eq!(
2448 x_rust_type.get("version").and_then(|v| v.as_str()),
2449 Some(expected_version),
2450 "x-rust-type version"
2451 );
2452 assert_eq!(
2453 x_rust_type.get("path").and_then(|v| v.as_str()),
2454 Some(expected_path),
2455 "x-rust-type path"
2456 );
2457 }
2458
2459 #[test]
2462 fn execution_uuid_kind_rust_type() {
2463 let mut generator = SchemaGenerator::default();
2464 let schema = ExecutionUuidKind::json_schema(&mut generator);
2465 let xrt = get_rust_type_ext(&schema).expect("x-rust-type present");
2466 assert_rust_type_ext(
2467 xrt,
2468 "oxide-update-engine-types",
2469 env!("CARGO_PKG_VERSION"),
2470 "oxide_update_engine_types::events\
2471 ::ExecutionUuidKind",
2472 );
2473 }
2474
2475 #[test]
2476 fn execution_uuid_lifted_rust_type() {
2477 let mut generator = SchemaGenerator::default();
2481 let schema = ExecutionUuid::json_schema(&mut generator);
2482 let xrt = get_rust_type_ext(&schema)
2483 .expect("x-rust-type present on ExecutionUuid");
2484 assert_rust_type_ext(
2485 xrt,
2486 "oxide-update-engine-types",
2487 env!("CARGO_PKG_VERSION"),
2488 "oxide_update_engine_types::events\
2489 ::ExecutionUuid",
2490 );
2491 }
2492
2493 #[test]
2496 fn event_report_generic_spec_schema() {
2497 let schema = schemars::schema_for!(EventReport<GenericSpec>);
2498 let json = serde_json::to_string_pretty(&schema)
2499 .expect("serialized schema");
2500 expectorate::assert_contents(
2501 "tests/output/event_report_generic_spec_schema.json",
2502 &json,
2503 );
2504 }
2505
2506 impl schemars::JsonSchema for super::TestSpec {
2510 fn schema_name() -> String {
2511 "TestSpec".to_owned()
2512 }
2513
2514 fn json_schema(_: &mut SchemaGenerator) -> Schema {
2515 Schema::Bool(true)
2516 }
2517 }
2518
2519 #[test]
2520 fn step_event_no_rust_type_without_info() {
2521 let mut generator = SchemaGenerator::default();
2522 let schema =
2523 StepEvent::<super::TestSpec>::json_schema(&mut generator);
2524 assert!(
2525 get_rust_type_ext(&schema).is_none(),
2526 "no x-rust-type when spec returns None"
2527 );
2528 }
2529
2530 #[test]
2531 fn progress_event_no_rust_type_without_info() {
2532 let mut generator = SchemaGenerator::default();
2533 let schema =
2534 ProgressEvent::<super::TestSpec>::json_schema(&mut generator);
2535 assert!(
2536 get_rust_type_ext(&schema).is_none(),
2537 "no x-rust-type when spec returns None"
2538 );
2539 }
2540
2541 #[test]
2542 fn event_report_no_rust_type_without_info() {
2543 let mut generator = SchemaGenerator::default();
2544 let schema =
2545 EventReport::<super::TestSpec>::json_schema(&mut generator);
2546 assert!(
2547 get_rust_type_ext(&schema).is_none(),
2548 "no x-rust-type when spec returns None"
2549 );
2550 }
2551
2552 enum TestSpecWithInfo {}
2559
2560 impl crate::spec::EngineSpec for TestSpecWithInfo {
2561 fn spec_name() -> String {
2562 "TestSpecWithInfo".to_owned()
2563 }
2564
2565 type Component = serde_json::Value;
2566 type StepId = serde_json::Value;
2567 type StepMetadata = serde_json::Value;
2568 type ProgressMetadata = serde_json::Value;
2569 type CompletionMetadata = serde_json::Value;
2570 type SkippedMetadata = serde_json::Value;
2571 type Error = anyhow::Error;
2572
2573 fn rust_type_info() -> Option<RustTypeInfo> {
2574 Some(RustTypeInfo {
2575 crate_name: "my-external-crate",
2576 version: "1.2.3",
2577 path: "my_external_crate::MySpec",
2578 })
2579 }
2580 }
2581
2582 impl JsonSchema for TestSpecWithInfo {
2583 fn schema_name() -> String {
2584 "TestSpecWithInfo".to_owned()
2585 }
2586
2587 fn json_schema(_: &mut SchemaGenerator) -> Schema {
2588 Schema::Bool(true)
2589 }
2590 }
2591
2592 fn assert_external_spec_rust_type(
2596 x_rust_type: &serde_json::Value,
2597 expected_outer_path: &str,
2598 expected_param_crate: &str,
2599 expected_param_version: &str,
2600 expected_param_path: &str,
2601 ) {
2602 assert_rust_type_ext(
2604 x_rust_type,
2605 "oxide-update-engine-types",
2606 env!("CARGO_PKG_VERSION"),
2607 expected_outer_path,
2608 );
2609
2610 let params = x_rust_type
2612 .get("parameters")
2613 .and_then(|v| v.as_array())
2614 .expect("parameters array present");
2615 assert_eq!(params.len(), 1, "exactly one parameter");
2616 let param_xrt = params[0]
2617 .get("x-rust-type")
2618 .expect("parameter x-rust-type present");
2619 assert_rust_type_ext(
2620 param_xrt,
2621 expected_param_crate,
2622 expected_param_version,
2623 expected_param_path,
2624 );
2625 }
2626
2627 #[test]
2628 fn step_event_external_spec_rust_type() {
2629 let mut generator = SchemaGenerator::default();
2630 let schema =
2631 StepEvent::<TestSpecWithInfo>::json_schema(&mut generator);
2632 let xrt = get_rust_type_ext(&schema).expect("x-rust-type present");
2633 assert_external_spec_rust_type(
2634 xrt,
2635 "oxide_update_engine_types::events::StepEvent",
2636 "my-external-crate",
2637 "1.2.3",
2638 "my_external_crate::MySpec",
2639 );
2640 }
2641
2642 #[test]
2643 fn progress_event_external_spec_rust_type() {
2644 let mut generator = SchemaGenerator::default();
2645 let schema =
2646 ProgressEvent::<TestSpecWithInfo>::json_schema(&mut generator);
2647 let xrt = get_rust_type_ext(&schema).expect("x-rust-type present");
2648 assert_external_spec_rust_type(
2649 xrt,
2650 "oxide_update_engine_types::events\
2651 ::ProgressEvent",
2652 "my-external-crate",
2653 "1.2.3",
2654 "my_external_crate::MySpec",
2655 );
2656 }
2657
2658 #[test]
2659 fn event_report_external_spec_rust_type() {
2660 let mut generator = SchemaGenerator::default();
2661 let schema =
2662 EventReport::<TestSpecWithInfo>::json_schema(&mut generator);
2663 let xrt = get_rust_type_ext(&schema).expect("x-rust-type present");
2664 assert_external_spec_rust_type(
2665 xrt,
2666 "oxide_update_engine_types::events\
2667 ::EventReport",
2668 "my-external-crate",
2669 "1.2.3",
2670 "my_external_crate::MySpec",
2671 );
2672 }
2673 }
2674}