1use crate::ComponentId;
2use crate::ComponentRetryConfig;
3use crate::ComponentType;
4use crate::ExecutionFailureKind;
5use crate::ExecutionId;
6use crate::ExecutionMetadata;
7use crate::FinishedExecutionError;
8use crate::FunctionFqn;
9use crate::JoinSetId;
10use crate::Params;
11use crate::StrVariant;
12use crate::SupportedFunctionReturnValue;
13use crate::component_id::InputContentDigest;
14use crate::prefixed_ulid::DelayId;
15use crate::prefixed_ulid::DeploymentId;
16use crate::prefixed_ulid::ExecutionIdDerived;
17use crate::prefixed_ulid::ExecutorId;
18use crate::prefixed_ulid::RunId;
19use assert_matches::assert_matches;
20use async_trait::async_trait;
21use chrono::TimeDelta;
22use chrono::{DateTime, Utc};
23use http_client_trace::HttpClientTrace;
24use serde::Deserialize;
25use serde::Serialize;
26use std::fmt::Debug;
27use std::fmt::Display;
28use std::panic::Location;
29use std::pin::Pin;
30use std::sync::Arc;
31use std::time::Duration;
32use tracing::debug;
33use tracing::instrument;
34use tracing_error::SpanTrace;
35
36pub const STATE_PENDING_AT: &str = "pending_at";
38pub const STATE_BLOCKED_BY_JOIN_SET: &str = "blocked_by_join_set";
39pub const STATE_LOCKED: &str = "locked";
40pub const STATE_FINISHED: &str = "finished";
41pub const HISTORY_EVENT_TYPE_JOIN_NEXT: &str = "join_next"; #[derive(Debug, PartialEq, Eq, Clone)]
44pub struct ExecutionLog {
45 pub execution_id: ExecutionId,
46 pub events: Vec<ExecutionEvent>,
47 pub responses: Vec<ResponseWithCursor>,
48 pub next_version: Version, pub pending_state: PendingState, pub component_digest: InputContentDigest, pub component_type: ComponentType,
52 pub deployment_id: DeploymentId, }
54
55impl ExecutionLog {
56 #[must_use]
59 pub fn can_be_retried_after(
60 temporary_event_count: u32,
61 max_retries: Option<u32>,
62 retry_exp_backoff: Duration,
63 ) -> Option<Duration> {
64 if temporary_event_count <= max_retries.unwrap_or(u32::MAX) {
66 let duration = retry_exp_backoff * 2_u32.saturating_pow(temporary_event_count - 1);
68 Some(duration)
69 } else {
70 None
71 }
72 }
73
74 #[must_use]
75 pub fn compute_retry_duration_when_retrying_forever(
76 temporary_event_count: u32,
77 retry_exp_backoff: Duration,
78 ) -> Duration {
79 Self::can_be_retried_after(temporary_event_count, None, retry_exp_backoff)
80 .expect("`max_retries` set to MAX must never return None")
81 }
82
83 #[must_use]
84 pub fn ffqn(&self) -> &FunctionFqn {
85 assert_matches!(self.events.first(), Some(ExecutionEvent {
86 event: ExecutionRequest::Created { ffqn, .. },
87 ..
88 }) => ffqn)
89 }
90
91 #[must_use]
92 pub fn params(&self) -> &Params {
93 assert_matches!(self.events.first(), Some(ExecutionEvent {
94 event: ExecutionRequest::Created { params, .. },
95 ..
96 }) => params)
97 }
98
99 #[must_use]
100 pub fn parent(&self) -> Option<(ExecutionId, JoinSetId)> {
101 assert_matches!(self.events.first(), Some(ExecutionEvent {
102 event: ExecutionRequest::Created { parent, .. },
103 ..
104 }) => parent.clone())
105 }
106
107 #[must_use]
108 pub fn last_event(&self) -> &ExecutionEvent {
109 self.events.last().expect("must contain at least one event")
110 }
111
112 #[must_use]
113 pub fn is_finished(&self) -> bool {
114 matches!(
115 self.events.last(),
116 Some(ExecutionEvent {
117 event: ExecutionRequest::Finished { .. },
118 ..
119 })
120 )
121 }
122
123 #[must_use]
124 pub fn as_finished_result(&self) -> Option<SupportedFunctionReturnValue> {
125 if let ExecutionEvent {
126 event: ExecutionRequest::Finished { result, .. },
127 ..
128 } = self.events.last().expect("must contain at least one event")
129 {
130 Some(result.clone())
131 } else {
132 None
133 }
134 }
135
136 pub fn event_history(&self) -> impl Iterator<Item = (HistoryEvent, Version)> + '_ {
137 self.events.iter().filter_map(|event| {
138 if let ExecutionRequest::HistoryEvent { event: eh, .. } = &event.event {
139 Some((eh.clone(), event.version.clone()))
140 } else {
141 None
142 }
143 })
144 }
145
146 #[cfg(feature = "test")]
147 #[must_use]
148 pub fn find_join_set_request(&self, join_set_id: &JoinSetId) -> Option<&JoinSetRequest> {
149 self.events
150 .iter()
151 .find_map(move |event| match &event.event {
152 ExecutionRequest::HistoryEvent {
153 event:
154 HistoryEvent::JoinSetRequest {
155 join_set_id: found,
156 request,
157 },
158 ..
159 } if *join_set_id == *found => Some(request),
160 _ => None,
161 })
162 }
163}
164
165pub type VersionType = u32;
166#[derive(
167 Debug,
168 Default,
169 Clone,
170 PartialEq,
171 Eq,
172 Hash,
173 derive_more::Display,
174 derive_more::Into,
175 serde::Serialize,
176 serde::Deserialize,
177)]
178#[serde(transparent)]
179pub struct Version(pub VersionType);
180impl Version {
181 #[must_use]
182 pub fn new(arg: VersionType) -> Version {
183 Version(arg)
184 }
185
186 #[must_use]
187 pub fn increment(&self) -> Version {
188 Version(self.0 + 1)
189 }
190}
191impl TryFrom<i64> for Version {
192 type Error = VersionParseError;
193 fn try_from(value: i64) -> Result<Self, Self::Error> {
194 VersionType::try_from(value)
195 .map(Version::new)
196 .map_err(|_| VersionParseError)
197 }
198}
199impl From<Version> for usize {
200 fn from(value: Version) -> Self {
201 usize::try_from(value.0).expect("16 bit systems are unsupported")
202 }
203}
204impl From<&Version> for usize {
205 fn from(value: &Version) -> Self {
206 usize::try_from(value.0).expect("16 bit systems are unsupported")
207 }
208}
209
210#[derive(Debug, thiserror::Error)]
211#[error("version must be u32")]
212pub struct VersionParseError;
213
214#[derive(
215 Clone, Debug, derive_more::Display, PartialEq, Eq, serde::Serialize, serde::Deserialize,
216)]
217#[display("{event}")]
218pub struct ExecutionEvent {
219 pub created_at: DateTime<Utc>,
220 pub event: ExecutionRequest,
221 #[serde(skip_serializing_if = "Option::is_none")]
222 pub backtrace_id: Option<Version>,
223 pub version: Version,
224}
225
226#[derive(
227 Debug,
228 Clone,
229 Copy,
230 PartialEq,
231 Eq,
232 derive_more::Display,
233 derive_more::Into,
234 Serialize, )]
236pub struct ResponseCursor(pub u32);
237
238#[derive(Debug, Clone, PartialEq, Eq, Serialize )]
239pub struct ResponseWithCursor {
240 pub event: JoinSetResponseEventOuter,
241 pub cursor: ResponseCursor,
242}
243
244#[derive(Debug)]
245pub struct ListExecutionEventsResponse {
246 pub events: Vec<ExecutionEvent>,
247 pub max_version: Version,
248}
249
250#[derive(Debug)]
251pub struct ListResponsesResponse {
252 pub responses: Vec<ResponseWithCursor>,
253 pub max_cursor: ResponseCursor,
254}
255
256#[derive(Debug, Clone, PartialEq, Eq, Serialize )]
257pub struct JoinSetResponseEventOuter {
258 pub created_at: DateTime<Utc>,
259 pub event: JoinSetResponseEvent,
260}
261
262#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
263pub struct JoinSetResponseEvent {
264 pub join_set_id: JoinSetId,
265 pub event: JoinSetResponse,
266}
267
268#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, derive_more::Display)]
269#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
270#[serde(tag = "type", rename_all = "snake_case")]
271pub enum JoinSetResponse {
272 #[display("delay {}: {delay_id}", if result.is_ok() { "finished" } else { "cancelled"})]
273 DelayFinished {
274 delay_id: DelayId,
275 result: Result<(), ()>,
276 },
277 #[display("{result}: {child_execution_id}")] ChildExecutionFinished {
279 child_execution_id: ExecutionIdDerived,
280 #[cfg_attr(any(test, feature = "test"), arbitrary(value = Version(2)))]
281 finished_version: Version,
282 #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
283 result: SupportedFunctionReturnValue,
284 },
285}
286
287pub const DUMMY_CREATED: ExecutionRequest = ExecutionRequest::Created {
288 ffqn: FunctionFqn::new_static("", ""),
289 params: Params::empty(),
290 parent: None,
291 scheduled_at: DateTime::from_timestamp_nanos(0),
292 component_id: ComponentId::dummy_activity(),
293 deployment_id: DeploymentId::from_parts(0, 0),
294 metadata: ExecutionMetadata::empty(),
295 scheduled_by: None,
296};
297pub const DUMMY_HISTORY_EVENT: ExecutionRequest = ExecutionRequest::HistoryEvent {
298 event: HistoryEvent::JoinSetCreate {
299 join_set_id: JoinSetId {
300 kind: crate::JoinSetKind::OneOff,
301 name: StrVariant::empty(),
302 },
303 },
304};
305
306#[derive(
307 Clone, derive_more::Debug, derive_more::Display, PartialEq, Eq, Serialize, Deserialize,
308)]
309#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
310#[serde(rename_all = "snake_case")]
311pub enum ExecutionRequest {
312 #[display("Created({ffqn}, `{scheduled_at}`)")]
313 Created {
314 ffqn: FunctionFqn,
315 #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
316 #[debug(skip)]
317 params: Params,
318 parent: Option<(ExecutionId, JoinSetId)>,
319 scheduled_at: DateTime<Utc>,
320 #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
321 component_id: ComponentId,
322 deployment_id: DeploymentId,
323 #[cfg_attr(any(test, feature = "test"), arbitrary(default))]
324 metadata: ExecutionMetadata,
325 scheduled_by: Option<ExecutionId>,
326 },
327 Locked(Locked),
328 #[display("Unlocked(`{backoff_expires_at}`)")]
333 Unlocked {
334 backoff_expires_at: DateTime<Utc>,
335 #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
336 reason: StrVariant,
337 },
338 #[display("TemporarilyFailed(`{backoff_expires_at}`)")]
341 TemporarilyFailed {
342 backoff_expires_at: DateTime<Utc>,
343 #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
344 reason: StrVariant,
345 detail: Option<String>,
346 #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
347 http_client_traces: Option<Vec<HttpClientTrace>>,
348 },
349 #[display("TemporarilyTimedOut(`{backoff_expires_at}`)")]
352 TemporarilyTimedOut {
353 backoff_expires_at: DateTime<Utc>,
354 #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
355 http_client_traces: Option<Vec<HttpClientTrace>>,
356 },
357 #[display("Finished")]
359 Finished {
360 #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
361 result: SupportedFunctionReturnValue,
362 #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
363 http_client_traces: Option<Vec<HttpClientTrace>>,
364 },
365
366 #[display("HistoryEvent({event})")]
367 HistoryEvent {
368 event: HistoryEvent,
369 },
370 #[display("Paused")]
371 Paused,
372 #[display("Unpaused")]
373 Unpaused,
374}
375
376impl ExecutionRequest {
377 #[must_use]
378 pub fn is_temporary_event(&self) -> bool {
379 matches!(
380 self,
381 Self::TemporarilyFailed { .. } | Self::TemporarilyTimedOut { .. }
382 )
383 }
384
385 #[must_use]
387 pub const fn variant(&self) -> &'static str {
388 match self {
389 ExecutionRequest::Created { .. } => "created",
390 ExecutionRequest::Locked(_) => "locked",
391 ExecutionRequest::Unlocked { .. } => "unlocked",
392 ExecutionRequest::TemporarilyFailed { .. } => "temporarily_failed",
393 ExecutionRequest::TemporarilyTimedOut { .. } => "temporarily_timed_out",
394 ExecutionRequest::Finished { .. } => "finished",
395 ExecutionRequest::HistoryEvent { .. } => "history_event",
396 ExecutionRequest::Paused => "paused",
397 ExecutionRequest::Unpaused => "unpaused",
398 }
399 }
400
401 #[must_use]
402 pub fn join_set_id(&self) -> Option<&JoinSetId> {
403 match self {
404 Self::Created {
405 parent: Some((_parent_id, join_set_id)),
406 ..
407 } => Some(join_set_id),
408 Self::HistoryEvent {
409 event:
410 HistoryEvent::JoinSetCreate { join_set_id, .. }
411 | HistoryEvent::JoinSetRequest { join_set_id, .. }
412 | HistoryEvent::JoinNext { join_set_id, .. },
413 } => Some(join_set_id),
414 _ => None,
415 }
416 }
417}
418
419#[derive(
420 Clone, derive_more::Debug, derive_more::Display, PartialEq, Eq, Serialize, Deserialize,
421)]
422#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
423#[display("Locked(`{lock_expires_at}`, {component_id})")]
424pub struct Locked {
425 #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
426 pub component_id: ComponentId,
427 pub executor_id: ExecutorId,
428 pub deployment_id: DeploymentId,
429 pub run_id: RunId,
430 pub lock_expires_at: DateTime<Utc>,
431 #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentRetryConfig::ZERO))]
432 pub retry_config: ComponentRetryConfig,
433}
434
435#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
436#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
437#[serde(tag = "type", rename_all = "snake_case")]
438pub enum PersistKind {
439 #[display("RandomU64({min}, {max_inclusive})")]
440 RandomU64 { min: u64, max_inclusive: u64 },
441 #[display("RandomString({min_length}, {max_length_exclusive})")]
442 RandomString {
443 min_length: u64,
444 max_length_exclusive: u64,
445 },
446}
447
448#[must_use]
449pub fn from_u64_to_bytes(value: u64) -> [u8; 8] {
450 value.to_be_bytes()
451}
452
453#[must_use]
454pub fn from_bytes_to_u64(bytes: [u8; 8]) -> u64 {
455 u64::from_be_bytes(bytes)
456}
457
458#[derive(
459 derive_more::Debug, Clone, PartialEq, Eq, derive_more::Display, Serialize, Deserialize,
460)]
461#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
462#[serde(tag = "type", rename_all = "snake_case")]
463pub enum HistoryEvent {
465 #[display("Persist")]
467 Persist {
468 #[debug(skip)]
469 value: Vec<u8>, kind: PersistKind,
471 },
472 #[display("JoinSetCreate({join_set_id})")]
473 JoinSetCreate { join_set_id: JoinSetId },
474 #[display("JoinSetRequest({request})")]
475 JoinSetRequest {
477 join_set_id: JoinSetId,
478 request: JoinSetRequest,
479 },
480 #[display("JoinNext({join_set_id})")]
486 JoinNext {
487 join_set_id: JoinSetId,
488 run_expires_at: DateTime<Utc>,
491 requested_ffqn: Option<FunctionFqn>,
494 closing: bool,
496 },
497 #[display("JoinNextTry({join_set_id}, {outcome})")]
499 JoinNextTry {
500 join_set_id: JoinSetId,
501 #[serde(alias = "found_response")]
505 outcome: JoinNextTryOutcome,
506 },
507 #[display("JoinNextTooMany({join_set_id})")]
509 JoinNextTooMany {
510 join_set_id: JoinSetId,
511 requested_ffqn: Option<FunctionFqn>,
514 },
515 #[display("Schedule({execution_id}, {schedule_at})")]
516 Schedule {
517 execution_id: ExecutionId,
518 schedule_at: HistoryEventScheduleAt, },
520 #[display("Stub({target_execution_id})")]
521 Stub {
522 target_execution_id: ExecutionIdDerived,
523 #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
524 result: SupportedFunctionReturnValue, persist_result: Result<(), ()>, },
527}
528
529#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize)]
530#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
531#[serde(rename_all = "snake_case")]
532pub enum JoinNextTryOutcome {
533 #[display("found")]
535 Found,
536 #[display("pending")]
538 Pending,
539 #[display("all_processed")]
541 AllProcessed,
542}
543
544impl<'de> serde::Deserialize<'de> for JoinNextTryOutcome {
545 fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
546 use serde::Deserialize;
547 #[derive(Deserialize)]
549 #[serde(untagged)]
550 enum BoolOrString {
551 Bool(bool),
552 String(String),
553 }
554 match BoolOrString::deserialize(deserializer)? {
555 BoolOrString::Bool(b) => Ok(JoinNextTryOutcome::from(b)),
556 BoolOrString::String(s) => match s.as_str() {
557 "found" => Ok(JoinNextTryOutcome::Found),
558 "pending" => Ok(JoinNextTryOutcome::Pending),
559 "all_processed" => Ok(JoinNextTryOutcome::AllProcessed),
560 other => Err(serde::de::Error::unknown_variant(
561 other,
562 &["found", "pending", "all_processed"],
563 )),
564 },
565 }
566 }
567}
568
569impl From<bool> for JoinNextTryOutcome {
570 fn from(found_response: bool) -> Self {
574 if found_response {
575 JoinNextTryOutcome::Found
576 } else {
577 JoinNextTryOutcome::Pending
578 }
579 }
580}
581
582#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
583#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
584#[serde(rename_all = "snake_case")]
585pub enum HistoryEventScheduleAt {
586 Now,
587 #[display("At(`{_0}`)")]
588 At(DateTime<Utc>),
589 #[display("In({_0:?})")]
590 In(Duration),
591}
592
593#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
594pub enum ScheduleAtConversionError {
595 #[error("source duration value is out of range")]
596 OutOfRangeError,
597}
598
599impl HistoryEventScheduleAt {
600 pub fn as_date_time(
601 &self,
602 now: DateTime<Utc>,
603 ) -> Result<DateTime<Utc>, ScheduleAtConversionError> {
604 match self {
605 Self::Now => Ok(now),
606 Self::At(date_time) => Ok(*date_time),
607 Self::In(duration) => {
608 let time_delta = TimeDelta::from_std(*duration)
609 .map_err(|_| ScheduleAtConversionError::OutOfRangeError)?;
610 now.checked_add_signed(time_delta)
611 .ok_or(ScheduleAtConversionError::OutOfRangeError)
612 }
613 }
614 }
615}
616
617#[derive(Clone, Debug, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
618#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
619#[serde(tag = "type", rename_all = "snake_case")]
620pub enum JoinSetRequest {
621 #[display("DelayRequest({delay_id}, expires_at: `{expires_at}`, schedule_at: `{schedule_at}`)")]
623 DelayRequest {
624 delay_id: DelayId,
625 expires_at: DateTime<Utc>,
626 schedule_at: HistoryEventScheduleAt,
627 },
628 #[display("ChildExecutionRequest({child_execution_id}, {target_ffqn}, params: {params})")]
630 ChildExecutionRequest {
631 child_execution_id: ExecutionIdDerived,
632 target_ffqn: FunctionFqn,
633 #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
634 params: Params,
635 },
636}
637
638#[derive(Debug, Clone, thiserror::Error, derive_more::PartialEq, derive_more::Eq)]
640pub enum DbErrorGeneric {
641 #[error("database error: {reason}")]
642 Uncategorized {
643 reason: StrVariant,
644 #[eq(skip)]
645 #[partial_eq(skip)]
646 context: SpanTrace,
647 #[eq(skip)]
648 #[partial_eq(skip)]
649 #[source]
650 source: Option<Arc<dyn std::error::Error + Send + Sync>>,
651 loc: &'static Location<'static>,
652 },
653 #[error("database was closed")]
654 Close,
655}
656
657#[derive(thiserror::Error, Clone, Debug, derive_more::PartialEq, derive_more::Eq)]
658pub enum DbErrorWriteNonRetriable {
659 #[error("validation failed: {0}")]
660 ValidationFailed(StrVariant),
661 #[error("conflict")]
662 Conflict,
663 #[error("illegal state: {reason}")]
664 IllegalState {
665 reason: StrVariant,
666 #[eq(skip)]
667 #[partial_eq(skip)]
668 context: SpanTrace,
669 #[eq(skip)]
670 #[partial_eq(skip)]
671 #[source]
672 source: Option<Arc<dyn std::error::Error + Send + Sync>>,
673 loc: &'static Location<'static>,
674 },
675 #[error("version conflict: expected: {expected}, got: {requested}")]
676 VersionConflict {
677 expected: Version,
678 requested: Version,
679 },
680}
681
682#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
684pub enum DbErrorWrite {
685 #[error("cannot write - row not found")]
686 NotFound,
687 #[error("non-retriable error: {0}")]
688 NonRetriable(#[from] DbErrorWriteNonRetriable),
689 #[error(transparent)]
690 Generic(#[from] DbErrorGeneric),
691}
692
693#[derive(Debug, Clone, thiserror::Error, PartialEq)]
695pub enum DbErrorRead {
696 #[error("cannot read - row not found")]
697 NotFound,
698 #[error(transparent)]
699 Generic(#[from] DbErrorGeneric),
700}
701
702#[derive(Debug, thiserror::Error, PartialEq)]
703pub enum DbErrorReadWithTimeout {
704 #[error("timeout")]
705 Timeout(TimeoutOutcome),
706 #[error(transparent)]
707 DbErrorRead(#[from] DbErrorRead),
708}
709
710pub type AppendResponse = Version;
713pub type PendingExecution = (ExecutionId, Version, Params, Option<DateTime<Utc>>);
714
715#[derive(Debug, Clone)]
716pub struct LockedExecution {
717 pub execution_id: ExecutionId,
718 pub next_version: Version,
719 pub metadata: ExecutionMetadata,
720 pub locked_event: Locked,
721 pub ffqn: FunctionFqn,
722 pub params: Params,
723 pub event_history: Vec<(HistoryEvent, Version)>,
724 pub responses: Vec<ResponseWithCursor>,
725 pub parent: Option<(ExecutionId, JoinSetId)>,
726 pub intermittent_event_count: u32,
727}
728
729pub type LockPendingResponse = Vec<LockedExecution>;
730pub type AppendBatchResponse = Version;
731
732#[derive(Debug, Clone, derive_more::Display, Serialize, Deserialize)]
733#[display("{event}")]
734pub struct AppendRequest {
735 pub created_at: DateTime<Utc>,
736 pub event: ExecutionRequest,
737}
738
739#[derive(Debug, Clone)]
740pub struct CreateRequest {
741 pub created_at: DateTime<Utc>,
742 pub execution_id: ExecutionId,
743 pub ffqn: FunctionFqn,
744 pub params: Params,
745 pub parent: Option<(ExecutionId, JoinSetId)>,
746 pub scheduled_at: DateTime<Utc>,
747 pub component_id: ComponentId,
748 pub deployment_id: DeploymentId,
749 pub metadata: ExecutionMetadata,
750 pub scheduled_by: Option<ExecutionId>,
751}
752
753impl From<CreateRequest> for ExecutionRequest {
754 fn from(value: CreateRequest) -> Self {
755 Self::Created {
756 ffqn: value.ffqn,
757 params: value.params,
758 parent: value.parent,
759 scheduled_at: value.scheduled_at,
760 component_id: value.component_id,
761 deployment_id: value.deployment_id,
762 metadata: value.metadata,
763 scheduled_by: value.scheduled_by,
764 }
765 }
766}
767
768#[async_trait]
769pub trait DbPool: Send + Sync {
770 async fn db_exec_conn(&self) -> Result<Box<dyn DbExecutor>, DbErrorGeneric>;
771
772 async fn connection(&self) -> Result<Box<dyn DbConnection>, DbErrorGeneric>;
773
774 async fn external_api_conn(&self) -> Result<Box<dyn DbExternalApi>, DbErrorGeneric>;
775
776 #[cfg(feature = "test")]
777 async fn connection_test(&self) -> Result<Box<dyn DbConnectionTest>, DbErrorGeneric>;
778}
779
780#[async_trait]
781pub trait DbPoolCloseable {
782 async fn close(&self);
783}
784
785#[derive(Clone, Debug)]
786pub struct AppendEventsToExecution {
787 pub execution_id: ExecutionId,
788 pub version: Version,
789 pub batch: Vec<AppendRequest>,
790}
791
792#[derive(Clone, Debug)]
793pub struct AppendResponseToExecution {
794 pub parent_execution_id: ExecutionId,
795 pub created_at: DateTime<Utc>,
796 pub join_set_id: JoinSetId,
797 pub child_execution_id: ExecutionIdDerived,
798 pub finished_version: Version,
799 pub result: SupportedFunctionReturnValue,
800}
801
802#[async_trait]
803pub trait DbExecutor: Send + Sync {
804 #[expect(clippy::too_many_arguments)]
805 async fn lock_pending_by_ffqns(
806 &self,
807 batch_size: u32,
808 pending_at_or_sooner: DateTime<Utc>,
809 ffqns: Arc<[FunctionFqn]>,
810 created_at: DateTime<Utc>,
811 component_id: ComponentId,
812 deployment_id: DeploymentId,
813 executor_id: ExecutorId,
814 lock_expires_at: DateTime<Utc>,
815 run_id: RunId,
816 retry_config: ComponentRetryConfig,
817 ) -> Result<LockPendingResponse, DbErrorWrite>;
818
819 #[expect(clippy::too_many_arguments)]
820 async fn lock_pending_by_component_digest(
821 &self,
822 batch_size: u32,
823 pending_at_or_sooner: DateTime<Utc>,
824 component_id: &ComponentId,
825 deployment_id: DeploymentId,
826 created_at: DateTime<Utc>,
827 executor_id: ExecutorId,
828 lock_expires_at: DateTime<Utc>,
829 run_id: RunId,
830 retry_config: ComponentRetryConfig,
831 ) -> Result<LockPendingResponse, DbErrorWrite>;
832
833 #[cfg(feature = "test")]
834 #[expect(clippy::too_many_arguments)]
835 async fn lock_one(
836 &self,
837 created_at: DateTime<Utc>,
838 component_id: ComponentId,
839 deployment_id: DeploymentId,
840 execution_id: &ExecutionId,
841 run_id: RunId,
842 version: Version,
843 executor_id: ExecutorId,
844 lock_expires_at: DateTime<Utc>,
845 retry_config: ComponentRetryConfig,
846 ) -> Result<LockedExecution, DbErrorWrite>;
847
848 async fn append(
851 &self,
852 execution_id: ExecutionId,
853 version: Version,
854 req: AppendRequest,
855 ) -> Result<AppendResponse, DbErrorWrite>;
856
857 async fn append_batch_respond_to_parent(
860 &self,
861 events: AppendEventsToExecution,
862 response: AppendResponseToExecution,
863 current_time: DateTime<Utc>, ) -> Result<AppendBatchResponse, DbErrorWrite>;
865
866 async fn wait_for_pending_by_ffqn(
871 &self,
872 pending_at_or_sooner: DateTime<Utc>,
873 ffqns: Arc<[FunctionFqn]>,
874 timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
875 );
876
877 async fn wait_for_pending_by_component_digest(
882 &self,
883 pending_at_or_sooner: DateTime<Utc>,
884 component_digest: &InputContentDigest,
885 timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
886 );
887
888 async fn cancel_activity_with_retries(
889 &self,
890 execution_id: &ExecutionId,
891 cancelled_at: DateTime<Utc>,
892 ) -> Result<CancelOutcome, DbErrorWrite> {
893 let mut retries = 5;
894 loop {
895 let res = self.cancel_activity(execution_id, cancelled_at).await;
896 if res.is_ok() || retries == 0 {
897 return res;
898 }
899 retries -= 1;
900 }
901 }
902
903 async fn get_last_execution_event(
905 &self,
906 execution_id: &ExecutionId,
907 ) -> Result<ExecutionEvent, DbErrorRead>;
908
909 async fn cancel_activity(
910 &self,
911 execution_id: &ExecutionId,
912 cancelled_at: DateTime<Utc>,
913 ) -> Result<CancelOutcome, DbErrorWrite> {
914 debug!("Determining cancellation state of {execution_id}");
915
916 let last_event = self
917 .get_last_execution_event(execution_id)
918 .await
919 .map_err(DbErrorWrite::from)?;
920 if let ExecutionRequest::Finished {
921 result:
922 SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError {
923 kind: ExecutionFailureKind::Cancelled,
924 ..
925 }),
926 ..
927 } = last_event.event
928 {
929 return Ok(CancelOutcome::Cancelled);
930 } else if matches!(last_event.event, ExecutionRequest::Finished { .. }) {
931 debug!("Not cancelling, {execution_id} is already finished");
932 return Ok(CancelOutcome::AlreadyFinished);
933 }
934 let finished_version = last_event.version.increment();
935 let child_result = SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError {
936 reason: None,
937 kind: ExecutionFailureKind::Cancelled,
938 detail: None,
939 });
940 let cancel_request = AppendRequest {
941 created_at: cancelled_at,
942 event: ExecutionRequest::Finished {
943 result: child_result.clone(),
944 http_client_traces: None,
945 },
946 };
947 debug!("Cancelling activity {execution_id} at {finished_version}");
948 if let ExecutionId::Derived(execution_id) = execution_id {
949 let (parent_execution_id, join_set_id) = execution_id.split_to_parts();
950 let child_execution_id = ExecutionId::Derived(execution_id.clone());
951 self.append_batch_respond_to_parent(
952 AppendEventsToExecution {
953 execution_id: child_execution_id,
954 version: finished_version.clone(),
955 batch: vec![cancel_request],
956 },
957 AppendResponseToExecution {
958 parent_execution_id,
959 created_at: cancelled_at,
960 join_set_id: join_set_id.clone(),
961 child_execution_id: execution_id.clone(),
962 finished_version,
963 result: child_result,
964 },
965 cancelled_at,
966 )
967 .await?;
968 } else {
969 self.append(execution_id.clone(), finished_version, cancel_request)
970 .await?;
971 }
972 debug!("Cancelled {execution_id}");
973 Ok(CancelOutcome::Cancelled)
974 }
975}
976
977pub enum AppendDelayResponseOutcome {
978 Success,
979 AlreadyFinished,
980 AlreadyCancelled,
981}
982
983#[derive(Debug, Clone, Default)]
984pub struct ListExecutionsFilter {
985 pub ffqn_prefix: Option<String>,
986 pub show_derived: bool,
987 pub hide_finished: bool,
988 pub execution_id_prefix: Option<String>,
989 pub component_digest: Option<InputContentDigest>,
990 pub deployment_id: Option<DeploymentId>,
991}
992
993#[async_trait]
994pub trait DbExternalApi: DbConnection {
995 async fn get_backtrace(
997 &self,
998 execution_id: &ExecutionId,
999 filter: BacktraceFilter,
1000 ) -> Result<BacktraceInfo, DbErrorRead>;
1001
1002 async fn list_executions(
1004 &self,
1005 filter: ListExecutionsFilter,
1006 pagination: ExecutionListPagination,
1007 ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric>;
1008
1009 async fn list_execution_events(
1014 &self,
1015 execution_id: &ExecutionId,
1016 pagination: Pagination<VersionType>,
1017 include_backtrace_id: bool,
1018 ) -> Result<ListExecutionEventsResponse, DbErrorRead>;
1019
1020 async fn list_responses(
1029 &self,
1030 execution_id: &ExecutionId,
1031 pagination: Pagination<u32>,
1032 ) -> Result<ListResponsesResponse, DbErrorRead>;
1033
1034 async fn list_execution_events_responses(
1035 &self,
1036 execution_id: &ExecutionId,
1037 req_since: &Version,
1038 req_max_length: VersionType,
1039 req_include_backtrace_id: bool,
1040 resp_pagination: Pagination<VersionType>,
1041 ) -> Result<ExecutionWithStateRequestsResponses, DbErrorRead>;
1042
1043 async fn upgrade_execution_component(
1044 &self,
1045 execution_id: &ExecutionId,
1046 old: &InputContentDigest,
1047 new: &InputContentDigest,
1048 ) -> Result<(), DbErrorWrite>;
1049
1050 async fn list_logs(
1051 &self,
1052 execution_id: &ExecutionId,
1053 filter: LogFilter,
1054 pagination: Pagination<u32>,
1055 ) -> Result<ListLogsResponse, DbErrorRead>;
1056
1057 async fn list_deployment_states(
1058 &self,
1059 current_time: DateTime<Utc>,
1060 pagination: Pagination<Option<DeploymentId>>,
1061 ) -> Result<Vec<DeploymentState>, DbErrorRead>;
1062
1063 async fn pause_execution(
1065 &self,
1066 execution_id: &ExecutionId,
1067 paused_at: DateTime<Utc>,
1068 ) -> Result<AppendResponse, DbErrorWrite>;
1069
1070 async fn unpause_execution(
1072 &self,
1073 execution_id: &ExecutionId,
1074 unpaused_at: DateTime<Utc>,
1075 ) -> Result<AppendResponse, DbErrorWrite>;
1076}
1077pub const LIST_DEPLOYMENT_STATES_DEFAULT_LENGTH: u16 = 20;
1078pub const LIST_DEPLOYMENT_STATES_DEFAULT_PAGINATION: Pagination<Option<DeploymentId>> =
1079 Pagination::OlderThan {
1080 length: LIST_DEPLOYMENT_STATES_DEFAULT_LENGTH,
1081 cursor: None,
1082 including_cursor: false,
1083 };
1084
1085pub struct DeploymentState {
1086 pub deployment_id: DeploymentId,
1087 pub locked: u32,
1088 pub pending: u32,
1090 pub scheduled: u32,
1092 pub blocked: u32,
1093 pub finished: u32,
1094}
1095impl DeploymentState {
1096 #[must_use]
1097 pub fn new(deployment_id: DeploymentId) -> Self {
1098 DeploymentState {
1099 deployment_id,
1100 locked: 0,
1101 pending: 0,
1102 scheduled: 0,
1103 blocked: 0,
1104 finished: 0,
1105 }
1106 }
1107}
1108
1109#[derive(Debug)]
1110pub struct ListLogsResponse {
1111 pub items: Vec<LogEntryRow>,
1112 pub next_page: Pagination<u32>, pub prev_page: Option<Pagination<u32>>, }
1115
1116#[derive(Debug)]
1117pub struct LogFilter {
1118 show_logs: bool,
1119 show_streams: bool,
1120 levels: Vec<LogLevel>, stream_types: Vec<LogStreamType>, }
1123impl LogFilter {
1124 #[must_use]
1126 pub fn show_logs(levels: Vec<LogLevel>) -> LogFilter {
1127 LogFilter {
1128 show_logs: true,
1129 show_streams: false,
1130 levels,
1131 stream_types: Vec::new(),
1132 }
1133 }
1134 #[must_use]
1136 pub fn show_streams(stream_types: Vec<LogStreamType>) -> LogFilter {
1137 LogFilter {
1138 show_logs: false,
1139 show_streams: true,
1140 levels: Vec::new(),
1141 stream_types,
1142 }
1143 }
1144 #[must_use]
1146 pub fn show_combined(levels: Vec<LogLevel>, stream_types: Vec<LogStreamType>) -> LogFilter {
1147 LogFilter {
1148 show_logs: true,
1149 show_streams: true,
1150 levels,
1151 stream_types,
1152 }
1153 }
1154 #[must_use]
1156 pub fn should_show_logs(&self) -> bool {
1157 self.show_logs
1158 }
1159 #[must_use]
1160 pub fn should_show_streams(&self) -> bool {
1161 self.show_streams
1162 }
1163 #[must_use]
1164 pub fn levels(&self) -> &Vec<LogLevel> {
1165 &self.levels
1166 }
1167 #[must_use]
1168 pub fn stream_types(&self) -> &Vec<LogStreamType> {
1169 &self.stream_types
1170 }
1171}
1172
1173#[derive(Debug, Clone)]
1174pub struct ExecutionWithStateRequestsResponses {
1175 pub execution_with_state: ExecutionWithState,
1176 pub events: Vec<ExecutionEvent>,
1177 pub responses: Vec<ResponseWithCursor>,
1178 pub max_version: Version,
1179 pub max_cursor: ResponseCursor,
1180}
1181
1182#[async_trait]
1183pub trait DbConnection: DbExecutor {
1184 async fn get(&self, execution_id: &ExecutionId) -> Result<ExecutionLog, DbErrorRead>;
1186
1187 async fn append_delay_response(
1188 &self,
1189 created_at: DateTime<Utc>,
1190 execution_id: ExecutionId,
1191 join_set_id: JoinSetId,
1192 delay_id: DelayId,
1193 outcome: Result<(), ()>, ) -> Result<AppendDelayResponseOutcome, DbErrorWrite>;
1195
1196 async fn append_batch(
1199 &self,
1200 current_time: DateTime<Utc>, batch: Vec<AppendRequest>,
1202 execution_id: ExecutionId,
1203 version: Version,
1204 ) -> Result<AppendBatchResponse, DbErrorWrite>;
1205
1206 async fn append_batch_create_new_execution(
1209 &self,
1210 current_time: DateTime<Utc>, batch: Vec<AppendRequest>, execution_id: ExecutionId,
1213 version: Version,
1214 child_req: Vec<CreateRequest>,
1215 backtraces: Vec<BacktraceInfo>,
1216 ) -> Result<AppendBatchResponse, DbErrorWrite>;
1217
1218 async fn get_execution_event(
1220 &self,
1221 execution_id: &ExecutionId,
1222 version: &Version,
1223 ) -> Result<ExecutionEvent, DbErrorRead>;
1224
1225 #[instrument(skip(self))]
1226 async fn get_create_request(
1227 &self,
1228 execution_id: &ExecutionId,
1229 ) -> Result<CreateRequest, DbErrorRead> {
1230 let execution_event = self
1231 .get_execution_event(execution_id, &Version::new(0))
1232 .await?;
1233 if let ExecutionRequest::Created {
1234 ffqn,
1235 params,
1236 parent,
1237 scheduled_at,
1238 component_id,
1239 deployment_id,
1240 metadata,
1241 scheduled_by,
1242 } = execution_event.event
1243 {
1244 Ok(CreateRequest {
1245 created_at: execution_event.created_at,
1246 execution_id: execution_id.clone(),
1247 ffqn,
1248 params,
1249 parent,
1250 scheduled_at,
1251 component_id,
1252 deployment_id,
1253 metadata,
1254 scheduled_by,
1255 })
1256 } else {
1257 Err(DbErrorRead::Generic(DbErrorGeneric::Uncategorized {
1258 reason: "execution log must start with creation".into(),
1259 context: SpanTrace::capture(),
1260 source: None,
1261 loc: Location::caller(),
1262 }))
1263 }
1264 }
1265
1266 async fn get_pending_state(
1267 &self,
1268 execution_id: &ExecutionId,
1269 ) -> Result<ExecutionWithState, DbErrorRead>;
1270
1271 async fn get_expired_timers(
1273 &self,
1274 at: DateTime<Utc>,
1275 ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric>;
1276
1277 async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite>;
1279
1280 async fn subscribe_to_next_responses(
1287 &self,
1288 execution_id: &ExecutionId,
1289 last_response: ResponseCursor,
1290 timeout_fut: Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>,
1291 ) -> Result<Vec<ResponseWithCursor>, DbErrorReadWithTimeout>;
1292
1293 async fn wait_for_finished_result(
1300 &self,
1301 execution_id: &ExecutionId,
1302 timeout_fut: Option<Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>>,
1303 ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout>;
1304
1305 async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite>;
1306
1307 async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite>;
1308
1309 async fn append_log(&self, row: LogInfoAppendRow) -> Result<(), DbErrorWrite>;
1310
1311 async fn append_log_batch(&self, batch: &[LogInfoAppendRow]) -> Result<(), DbErrorWrite>;
1312
1313 #[cfg(feature = "test")]
1315 async fn get_finished_result(
1316 &self,
1317 execution_id: &ExecutionId,
1318 ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout> {
1319 self.wait_for_finished_result(
1320 execution_id,
1321 Some(Box::pin(std::future::ready(TimeoutOutcome::Timeout))),
1322 )
1323 .await
1324 }
1325}
1326
1327#[derive(Clone, Debug)]
1328pub struct LogInfoAppendRow {
1329 pub execution_id: ExecutionId,
1330 pub run_id: RunId,
1331 pub log_entry: LogEntry,
1332}
1333
1334#[derive(Debug, Clone)]
1335pub struct LogEntryRow {
1336 pub cursor: u32,
1337 pub run_id: RunId,
1338 pub log_entry: LogEntry,
1339}
1340
1341#[derive(Debug, Clone)]
1342pub enum LogEntry {
1343 Log {
1344 created_at: DateTime<Utc>,
1345 level: LogLevel,
1346 message: String,
1347 },
1348 Stream {
1349 created_at: DateTime<Utc>,
1350 payload: Vec<u8>,
1351 stream_type: LogStreamType,
1352 },
1353}
1354impl LogEntry {
1355 #[must_use]
1356 pub fn created_at(&self) -> DateTime<Utc> {
1357 match self {
1358 LogEntry::Log { created_at, .. } | LogEntry::Stream { created_at, .. } => *created_at,
1359 }
1360 }
1361}
1362
1363#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::TryFrom, strum::EnumIter)]
1364#[try_from(repr)]
1365#[repr(u8)]
1366pub enum LogLevel {
1367 Trace = 1,
1368 Debug,
1369 Info,
1370 Warn,
1371 Error,
1372}
1373#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::TryFrom, strum::EnumIter)]
1374#[try_from(repr)]
1375#[repr(u8)]
1376pub enum LogStreamType {
1377 StdOut = 1,
1378 StdErr,
1379}
1380
1381#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1382pub enum TimeoutOutcome {
1383 Timeout,
1384 Cancel,
1385}
1386
1387#[cfg(feature = "test")]
1388#[async_trait]
1389pub trait DbConnectionTest: DbConnection {
1390 async fn append_response(
1391 &self,
1392 created_at: DateTime<Utc>,
1393 execution_id: ExecutionId,
1394 response_event: JoinSetResponseEvent,
1395 ) -> Result<(), DbErrorWrite>;
1396}
1397
1398#[derive(Clone, Copy, Debug, PartialEq, Eq)]
1399pub enum CancelOutcome {
1400 Cancelled,
1401 AlreadyFinished,
1402}
1403
1404#[instrument(skip(db_connection))]
1405pub async fn stub_execution(
1406 db_connection: &dyn DbConnection,
1407 execution_id: ExecutionIdDerived,
1408 parent_execution_id: ExecutionId,
1409 join_set_id: JoinSetId,
1410 created_at: DateTime<Utc>,
1411 return_value: SupportedFunctionReturnValue,
1412) -> Result<(), DbErrorWrite> {
1413 let stub_finished_version = Version::new(1); let write_attempt = {
1416 let finished_req = AppendRequest {
1417 created_at,
1418 event: ExecutionRequest::Finished {
1419 result: return_value.clone(),
1420 http_client_traces: None,
1421 },
1422 };
1423 db_connection
1424 .append_batch_respond_to_parent(
1425 AppendEventsToExecution {
1426 execution_id: ExecutionId::Derived(execution_id.clone()),
1427 version: stub_finished_version.clone(),
1428 batch: vec![finished_req],
1429 },
1430 AppendResponseToExecution {
1431 parent_execution_id,
1432 created_at,
1433 join_set_id,
1434 child_execution_id: execution_id.clone(),
1435 finished_version: stub_finished_version.clone(),
1436 result: return_value.clone(),
1437 },
1438 created_at,
1439 )
1440 .await
1441 };
1442 if let Err(write_attempt) = write_attempt {
1443 debug!("Stub write attempt failed - {write_attempt:?}");
1445
1446 let found = db_connection
1447 .get_execution_event(&ExecutionId::Derived(execution_id), &stub_finished_version)
1448 .await?; match found.event {
1450 ExecutionRequest::Finished {
1451 result: found_result,
1452 ..
1453 } if return_value == found_result => {
1454 Ok(())
1456 }
1457 ExecutionRequest::Finished { .. } => Err(DbErrorWrite::NonRetriable(
1458 DbErrorWriteNonRetriable::Conflict,
1459 )),
1460 _other => Err(DbErrorWrite::NonRetriable(
1461 DbErrorWriteNonRetriable::IllegalState {
1462 reason: "unexpected execution event at stubbed execution".into(),
1463 context: SpanTrace::capture(),
1464 source: None,
1465 loc: Location::caller(),
1466 },
1467 )),
1468 }
1469 } else {
1470 Ok(())
1471 }
1472}
1473
1474pub async fn cancel_delay(
1475 db_connection: &dyn DbConnection,
1476 delay_id: DelayId,
1477 created_at: DateTime<Utc>,
1478) -> Result<CancelOutcome, DbErrorWrite> {
1479 let (parent_execution_id, join_set_id) = delay_id.split_to_parts();
1480 db_connection
1481 .append_delay_response(
1482 created_at,
1483 parent_execution_id,
1484 join_set_id,
1485 delay_id,
1486 Err(()), )
1488 .await
1489 .map(|ok| match ok {
1490 AppendDelayResponseOutcome::Success | AppendDelayResponseOutcome::AlreadyCancelled => {
1491 CancelOutcome::Cancelled
1492 }
1493 AppendDelayResponseOutcome::AlreadyFinished => CancelOutcome::AlreadyFinished,
1494 })
1495}
1496
1497#[derive(Clone, Debug)]
1498pub enum BacktraceFilter {
1499 First,
1500 Last,
1501 Specific(Version),
1502}
1503
1504#[derive(Clone, Debug, PartialEq, Eq)]
1505pub struct BacktraceInfo {
1506 pub execution_id: ExecutionId,
1507 pub component_id: ComponentId,
1508 pub version_min_including: Version,
1509 pub version_max_excluding: Version,
1510 pub wasm_backtrace: WasmBacktrace,
1511}
1512
1513#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
1514pub struct WasmBacktrace {
1515 pub frames: Vec<FrameInfo>,
1516}
1517
1518#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
1519pub struct FrameInfo {
1520 pub module: String,
1521 pub func_name: String,
1522 pub symbols: Vec<FrameSymbol>,
1523}
1524
1525#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
1526pub struct FrameSymbol {
1527 pub func_name: Option<String>,
1528 pub file: Option<String>,
1529 pub line: Option<u32>,
1530 pub col: Option<u32>,
1531}
1532
1533mod wasm_backtrace {
1534 use super::{FrameInfo, FrameSymbol, WasmBacktrace};
1535
1536 impl WasmBacktrace {
1537 pub fn maybe_from(backtrace: &wasmtime::WasmBacktrace) -> Option<Self> {
1538 if backtrace.frames().is_empty() {
1539 None
1540 } else {
1541 Some(Self {
1542 frames: backtrace.frames().iter().map(FrameInfo::from).collect(),
1543 })
1544 }
1545 }
1546 }
1547
1548 impl From<&wasmtime::FrameInfo> for FrameInfo {
1549 fn from(frame: &wasmtime::FrameInfo) -> Self {
1550 let module_name = frame.module().name().unwrap_or("<unknown>").to_string();
1551 let mut func_name = String::new();
1552 wasmtime_environ::demangle_function_name_or_index(
1553 &mut func_name,
1554 frame.func_name(),
1555 frame.func_index() as usize,
1556 )
1557 .expect("writing to string must succeed");
1558 Self {
1559 module: module_name,
1560 func_name,
1561 symbols: frame
1562 .symbols()
1563 .iter()
1564 .map(std::convert::Into::into)
1565 .collect(),
1566 }
1567 }
1568 }
1569
1570 impl From<&wasmtime::FrameSymbol> for FrameSymbol {
1571 fn from(symbol: &wasmtime::FrameSymbol) -> Self {
1572 let func_name = symbol.name().map(|name| {
1573 let mut writer = String::new();
1574 wasmtime_environ::demangle_function_name(&mut writer, name)
1575 .expect("writing to string must succeed");
1576 writer
1577 });
1578
1579 Self {
1580 func_name,
1581 file: symbol.file().map(ToString::to_string),
1582 line: symbol.line(),
1583 col: symbol.column(),
1584 }
1585 }
1586 }
1587}
1588#[derive(Debug, Clone, derive_more::Display)]
1589#[display("{execution_id} {pending_state} {component_digest}")]
1590pub struct ExecutionWithState {
1591 pub execution_id: ExecutionId,
1592 pub ffqn: FunctionFqn,
1593 pub pending_state: PendingState,
1594 pub created_at: DateTime<Utc>,
1595 pub first_scheduled_at: DateTime<Utc>,
1596 pub component_digest: InputContentDigest,
1597 pub component_type: ComponentType,
1598 pub deployment_id: DeploymentId,
1599}
1600
1601#[derive(Debug, Clone)]
1602pub enum ExecutionListPagination {
1603 CreatedBy(Pagination<Option<DateTime<Utc>>>),
1604 ExecutionId(Pagination<Option<ExecutionId>>),
1605}
1606impl Default for ExecutionListPagination {
1607 fn default() -> ExecutionListPagination {
1608 ExecutionListPagination::CreatedBy(Pagination::OlderThan {
1609 length: 20,
1610 cursor: None,
1611 including_cursor: false, })
1613 }
1614}
1615impl ExecutionListPagination {
1616 #[must_use]
1617 pub fn length(&self) -> u16 {
1618 match self {
1619 ExecutionListPagination::CreatedBy(pagination) => pagination.length(),
1620 ExecutionListPagination::ExecutionId(pagination) => pagination.length(),
1621 }
1622 }
1623}
1624
1625#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1626pub enum Pagination<T> {
1627 NewerThan {
1628 length: u16,
1629 cursor: T,
1630 including_cursor: bool,
1631 },
1632 OlderThan {
1633 length: u16,
1634 cursor: T,
1635 including_cursor: bool,
1636 },
1637}
1638impl<T: Clone> Pagination<T> {
1639 pub fn length(&self) -> u16 {
1640 match self {
1641 Pagination::NewerThan { length, .. } | Pagination::OlderThan { length, .. } => *length,
1642 }
1643 }
1644
1645 pub fn rel(&self) -> &'static str {
1646 match self {
1647 Pagination::NewerThan {
1648 including_cursor: false,
1649 ..
1650 } => ">",
1651 Pagination::NewerThan {
1652 including_cursor: true,
1653 ..
1654 } => ">=",
1655 Pagination::OlderThan {
1656 including_cursor: false,
1657 ..
1658 } => "<",
1659 Pagination::OlderThan {
1660 including_cursor: true,
1661 ..
1662 } => "<=",
1663 }
1664 }
1665
1666 pub fn is_desc(&self) -> bool {
1667 matches!(self, Pagination::OlderThan { .. })
1668 }
1669
1670 pub fn asc_or_desc(&self) -> &'static str {
1671 if self.is_asc() { "asc" } else { "desc" }
1672 }
1673
1674 pub fn is_asc(&self) -> bool {
1675 !self.is_desc()
1676 }
1677
1678 pub fn cursor(&self) -> &T {
1679 match self {
1680 Pagination::NewerThan { cursor, .. } | Pagination::OlderThan { cursor, .. } => cursor,
1681 }
1682 }
1683
1684 #[must_use]
1685 pub fn invert(&self) -> Self {
1686 match self {
1687 Pagination::NewerThan {
1688 length,
1689 cursor,
1690 including_cursor,
1691 } => Pagination::OlderThan {
1692 length: *length,
1693 cursor: cursor.clone(),
1694 including_cursor: !including_cursor,
1695 },
1696 Pagination::OlderThan {
1697 length,
1698 cursor,
1699 including_cursor,
1700 } => Pagination::NewerThan {
1701 length: *length,
1702 cursor: cursor.clone(),
1703 including_cursor: !including_cursor,
1704 },
1705 }
1706 }
1707}
1708
1709#[cfg(feature = "test")]
1710pub async fn wait_for_pending_state_fn<T: Debug>(
1711 db_connection: &dyn DbConnectionTest,
1712 execution_id: &ExecutionId,
1713 predicate: impl Fn(ExecutionLog) -> Option<T> + Send,
1714 timeout: Option<Duration>,
1715) -> Result<T, DbErrorReadWithTimeout> {
1716 tracing::trace!(%execution_id, "Waiting for predicate");
1717 let fut = async move {
1718 loop {
1719 let execution_log = db_connection.get(execution_id).await?;
1720 if let Some(t) = predicate(execution_log) {
1721 tracing::debug!(%execution_id, "Found: {t:?}");
1722 return Ok(t);
1723 }
1724 tokio::time::sleep(Duration::from_millis(10)).await;
1725 }
1726 };
1727
1728 if let Some(timeout) = timeout {
1729 tokio::select! { res = fut => res,
1731 () = tokio::time::sleep(timeout) => Err(DbErrorReadWithTimeout::Timeout(TimeoutOutcome::Timeout))
1732 }
1733 } else {
1734 fut.await
1735 }
1736}
1737
1738#[derive(Debug, Clone, PartialEq, Eq)]
1739pub enum ExpiredTimer {
1740 Lock(ExpiredLock),
1741 Delay(ExpiredDelay),
1742}
1743
1744#[derive(Debug, Clone, PartialEq, Eq)]
1745pub struct ExpiredLock {
1746 pub execution_id: ExecutionId,
1747 pub locked_at_version: Version,
1749 pub next_version: Version,
1750 pub intermittent_event_count: u32,
1752 pub max_retries: Option<u32>,
1753 pub retry_exp_backoff: Duration,
1754 pub locked_by: LockedBy,
1755}
1756
1757#[derive(Debug, Clone, PartialEq, Eq)]
1758pub struct ExpiredDelay {
1759 pub execution_id: ExecutionId,
1760 pub join_set_id: JoinSetId,
1761 pub delay_id: DelayId,
1762}
1763
1764#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize)]
1765#[serde(tag = "status", rename_all = "snake_case")]
1766pub enum PendingState {
1767 Locked(PendingStateLocked),
1769
1770 #[display("PendingAt(`{_0}`)")]
1771 PendingAt(PendingStatePendingAt),
1772
1773 #[display("BlockedByJoinSet({_0})")]
1775 BlockedByJoinSet(PendingStateBlockedByJoinSet),
1776
1777 #[display("Paused({_0})")]
1778 Paused(PendingStatePaused),
1779
1780 #[display("Finished({_0})")]
1781 Finished(PendingStateFinished),
1782}
1783
1784pub enum PendingStateMergedPause {
1785 Locked {
1786 state: PendingStateLocked,
1787 paused: bool,
1788 },
1789 PendingAt {
1790 state: PendingStatePendingAt,
1791 paused: bool,
1792 },
1793 BlockedByJoinSet {
1794 state: PendingStateBlockedByJoinSet,
1795 paused: bool,
1796 },
1797 Finished(PendingStateFinished),
1798}
1799impl From<PendingState> for PendingStateMergedPause {
1800 fn from(state: PendingState) -> Self {
1801 match state {
1802 PendingState::Locked(s) => PendingStateMergedPause::Locked {
1803 state: s,
1804 paused: false,
1805 },
1806
1807 PendingState::PendingAt(s) => PendingStateMergedPause::PendingAt {
1808 state: s,
1809 paused: false,
1810 },
1811
1812 PendingState::BlockedByJoinSet(s) => PendingStateMergedPause::BlockedByJoinSet {
1813 state: s,
1814 paused: false,
1815 },
1816
1817 PendingState::Paused(paused) => match paused {
1818 PendingStatePaused::Locked(s) => PendingStateMergedPause::Locked {
1819 state: s,
1820 paused: true,
1821 },
1822 PendingStatePaused::PendingAt(s) => PendingStateMergedPause::PendingAt {
1823 state: s,
1824 paused: true,
1825 },
1826 PendingStatePaused::BlockedByJoinSet(s) => {
1827 PendingStateMergedPause::BlockedByJoinSet {
1828 state: s,
1829 paused: true,
1830 }
1831 }
1832 },
1833
1834 PendingState::Finished(s) => PendingStateMergedPause::Finished(s),
1835 }
1836 }
1837}
1838
1839#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize)]
1840#[display("Locked(`{lock_expires_at}`, {}, {})", locked_by.executor_id, locked_by.run_id)]
1841pub struct PendingStateLocked {
1842 pub locked_by: LockedBy,
1843 pub lock_expires_at: DateTime<Utc>,
1844}
1845
1846#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize)]
1847#[display("`{scheduled_at}`, last_lock={last_lock:?}")]
1848pub struct PendingStatePendingAt {
1849 pub scheduled_at: DateTime<Utc>,
1850 pub last_lock: Option<LockedBy>,
1852}
1853
1854#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize)]
1855#[display("{join_set_id}, `{lock_expires_at}`, closing={closing}")]
1856pub struct PendingStateBlockedByJoinSet {
1857 pub join_set_id: JoinSetId,
1858 pub lock_expires_at: DateTime<Utc>,
1860 pub closing: bool,
1862}
1863
1864#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize)]
1866pub enum PendingStatePaused {
1867 #[display("Locked({_0})")]
1868 Locked(PendingStateLocked),
1869 #[display("PendingAt({_0})")]
1870 PendingAt(PendingStatePendingAt),
1871 #[display("BlockedByJoinSet({_0})")]
1872 BlockedByJoinSet(PendingStateBlockedByJoinSet),
1873}
1874
1875#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1876pub struct LockedBy {
1877 pub executor_id: ExecutorId,
1878 pub run_id: RunId,
1879}
1880impl From<&Locked> for LockedBy {
1881 fn from(value: &Locked) -> Self {
1882 LockedBy {
1883 executor_id: value.executor_id,
1884 run_id: value.run_id,
1885 }
1886 }
1887}
1888
1889#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
1890#[cfg_attr(any(test, feature = "test"), derive(Deserialize))]
1891pub struct PendingStateFinished {
1892 pub version: VersionType, pub finished_at: DateTime<Utc>,
1894 pub result_kind: PendingStateFinishedResultKind,
1895}
1896impl Display for PendingStateFinished {
1897 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1898 match self.result_kind {
1899 PendingStateFinishedResultKind::Ok => write!(f, "ok"),
1900 PendingStateFinishedResultKind::Err(err) => write!(f, "{err}"),
1901 }
1902 }
1903}
1904
1905#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
1907#[serde(rename_all = "snake_case")]
1908pub enum PendingStateFinishedResultKind {
1909 Ok,
1910 Err(PendingStateFinishedError),
1911}
1912impl PendingStateFinishedResultKind {
1913 pub fn as_result(&self) -> Result<(), &PendingStateFinishedError> {
1914 match self {
1915 PendingStateFinishedResultKind::Ok => Ok(()),
1916 PendingStateFinishedResultKind::Err(err) => Err(err),
1917 }
1918 }
1919}
1920
1921impl From<&SupportedFunctionReturnValue> for PendingStateFinishedResultKind {
1922 fn from(result: &SupportedFunctionReturnValue) -> Self {
1923 result.as_pending_state_finished_result()
1924 }
1925}
1926
1927#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, derive_more::Display)]
1928#[serde(rename_all = "snake_case")]
1929pub enum PendingStateFinishedError {
1930 #[display("execution terminated: {_0}")]
1931 ExecutionFailure(ExecutionFailureKind),
1932 #[display("execution completed with an error")]
1933 Error,
1934}
1935
1936impl PendingState {
1937 #[instrument(skip(self))]
1938 pub fn can_append_lock(
1939 &self,
1940 created_at: DateTime<Utc>,
1941 executor_id: ExecutorId,
1942 run_id: RunId,
1943 lock_expires_at: DateTime<Utc>,
1944 ) -> Result<LockKind, DbErrorWriteNonRetriable> {
1945 if lock_expires_at <= created_at {
1946 return Err(DbErrorWriteNonRetriable::ValidationFailed(
1947 "invalid expiry date".into(),
1948 ));
1949 }
1950 match self {
1951 PendingState::PendingAt(PendingStatePendingAt {
1952 scheduled_at,
1953 last_lock,
1954 }) => {
1955 if *scheduled_at <= created_at {
1956 Ok(LockKind::CreatingNewLock)
1958 } else if let Some(LockedBy {
1959 executor_id: last_executor_id,
1960 run_id: last_run_id,
1961 }) = last_lock
1962 && executor_id == *last_executor_id
1963 && run_id == *last_run_id
1964 {
1965 Ok(LockKind::Extending)
1967 } else {
1968 Err(DbErrorWriteNonRetriable::ValidationFailed(
1969 "cannot lock, not yet pending".into(),
1970 ))
1971 }
1972 }
1973 PendingState::Locked(PendingStateLocked {
1974 locked_by:
1975 LockedBy {
1976 executor_id: current_pending_state_executor_id,
1977 run_id: current_pending_state_run_id,
1978 },
1979 lock_expires_at: _,
1980 }) => {
1981 if executor_id == *current_pending_state_executor_id
1982 && run_id == *current_pending_state_run_id
1983 {
1984 Ok(LockKind::Extending)
1986 } else {
1987 Err(DbErrorWriteNonRetriable::IllegalState {
1988 reason: "cannot lock, already locked".into(),
1989 context: SpanTrace::capture(),
1990 source: None,
1991 loc: Location::caller(),
1992 })
1993 }
1994 }
1995 PendingState::BlockedByJoinSet { .. } => Err(DbErrorWriteNonRetriable::IllegalState {
1996 reason: "cannot append Locked event when in BlockedByJoinSet state".into(),
1997 context: SpanTrace::capture(),
1998 source: None,
1999 loc: Location::caller(),
2000 }),
2001 PendingState::Finished { .. } => Err(DbErrorWriteNonRetriable::IllegalState {
2002 reason: "already finished".into(),
2003 context: SpanTrace::capture(),
2004 source: None,
2005 loc: Location::caller(),
2006 }),
2007 PendingState::Paused(..) => Err(DbErrorWriteNonRetriable::IllegalState {
2008 reason: "cannot lock, execution is paused".into(),
2009 context: SpanTrace::capture(),
2010 source: None,
2011 loc: Location::caller(),
2012 }),
2013 }
2014 }
2015
2016 #[must_use]
2017 pub fn is_finished(&self) -> bool {
2018 matches!(self, PendingState::Finished { .. })
2019 }
2020
2021 #[must_use]
2022 pub fn is_paused(&self) -> bool {
2023 matches!(self, PendingState::Paused(_))
2024 }
2025}
2026
2027#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2028pub enum LockKind {
2029 Extending,
2030 CreatingNewLock,
2031}
2032
2033pub mod http_client_trace {
2034 use chrono::{DateTime, Utc};
2035 use serde::{Deserialize, Serialize};
2036
2037 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
2038 pub struct HttpClientTrace {
2039 pub req: RequestTrace,
2040 pub resp: Option<ResponseTrace>,
2041 }
2042
2043 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
2044 pub struct RequestTrace {
2045 pub sent_at: DateTime<Utc>,
2046 pub uri: String,
2047 pub method: String,
2048 }
2049
2050 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
2051 pub struct ResponseTrace {
2052 pub finished_at: DateTime<Utc>,
2053 pub status: Result<u16, String>,
2054 }
2055}
2056
2057#[cfg(test)]
2058mod tests {
2059 use super::HistoryEvent;
2060 use super::HistoryEventScheduleAt;
2061 use super::JoinNextTryOutcome;
2062 use super::PendingStateFinished;
2063 use super::PendingStateFinishedError;
2064 use super::PendingStateFinishedResultKind;
2065 use crate::ExecutionFailureKind;
2066 use crate::JoinSetId;
2067 use crate::SupportedFunctionReturnValue;
2068 use chrono::DateTime;
2069 use chrono::Datelike;
2070 use chrono::Utc;
2071 use insta::assert_snapshot;
2072 use rstest::rstest;
2073 use std::time::Duration;
2074 use val_json::type_wrapper::TypeWrapper;
2075 use val_json::wast_val::WastVal;
2076 use val_json::wast_val::WastValWithType;
2077
2078 #[rstest(expected => [
2079 PendingStateFinishedResultKind::Ok,
2080 PendingStateFinishedResultKind::Err(PendingStateFinishedError::ExecutionFailure(ExecutionFailureKind::TimedOut)),
2081 ])]
2082 #[test]
2083 fn serde_pending_state_finished_result_kind_should_work(
2084 expected: PendingStateFinishedResultKind,
2085 ) {
2086 let ser = serde_json::to_string(&expected).unwrap();
2087 let actual: PendingStateFinishedResultKind = serde_json::from_str(&ser).unwrap();
2088 assert_eq!(expected, actual);
2089 }
2090
2091 #[rstest(result_kind => [
2092 PendingStateFinishedResultKind::Ok,
2093 PendingStateFinishedResultKind::Err(PendingStateFinishedError::ExecutionFailure(ExecutionFailureKind::TimedOut)),
2094 ])]
2095 #[test]
2096 fn serde_pending_state_finished_should_work(result_kind: PendingStateFinishedResultKind) {
2097 let expected = PendingStateFinished {
2098 version: 0,
2099 finished_at: Utc::now(),
2100 result_kind,
2101 };
2102
2103 let ser = serde_json::to_string(&expected).unwrap();
2104 let actual: PendingStateFinished = serde_json::from_str(&ser).unwrap();
2105 assert_eq!(expected, actual);
2106 }
2107
2108 #[test]
2109 fn join_set_deser_with_result_ok_option_none_should_work() {
2110 let expected = SupportedFunctionReturnValue::Ok {
2111 ok: Some(WastValWithType {
2112 r#type: TypeWrapper::Result {
2113 ok: Some(Box::new(TypeWrapper::Option(Box::new(TypeWrapper::String)))),
2114 err: Some(Box::new(TypeWrapper::String)),
2115 },
2116 value: WastVal::Result(Ok(Some(Box::new(WastVal::Option(None))))),
2117 }),
2118 };
2119 let json = serde_json::to_string(&expected).unwrap();
2120 assert_snapshot!(json);
2121
2122 let actual: SupportedFunctionReturnValue = serde_json::from_str(&json).unwrap();
2123
2124 assert_eq!(expected, actual);
2125 }
2126
2127 #[test]
2128 fn as_date_time_should_work_with_duration_u32_max_secs() {
2129 let duration = Duration::from_secs(u64::from(u32::MAX));
2130 let schedule_at = HistoryEventScheduleAt::In(duration);
2131 let resolved = schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap();
2132 assert_eq!(2106, resolved.year());
2133 }
2134
2135 const MILLIS_PER_SEC: i64 = 1000;
2136 const TIMEDELTA_MAX_SECS: i64 = i64::MAX / MILLIS_PER_SEC;
2137
2138 #[test]
2139 fn as_date_time_should_fail_on_duration_secs_greater_than_i64_max() {
2140 let duration = Duration::from_secs(
2142 u64::try_from(TIMEDELTA_MAX_SECS).expect("positive number must not fail") + 1,
2143 );
2144 let schedule_at = HistoryEventScheduleAt::In(duration);
2145 schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap_err();
2146 }
2147
2148 #[test]
2149 fn join_next_try_outcome_new_format() {
2150 let json = r#"{"type":"join_next_try","join_set_id":"n:test","outcome":"found"}"#;
2151 let event: HistoryEvent = serde_json::from_str(json).unwrap();
2152 assert_eq!(
2153 event,
2154 HistoryEvent::JoinNextTry {
2155 join_set_id: JoinSetId::new(
2156 crate::JoinSetKind::Named,
2157 crate::StrVariant::Static("test")
2158 )
2159 .unwrap(),
2160 outcome: JoinNextTryOutcome::Found,
2161 }
2162 );
2163
2164 let json = r#"{"type":"join_next_try","join_set_id":"n:test","outcome":"all_processed"}"#;
2165 let event: HistoryEvent = serde_json::from_str(json).unwrap();
2166 assert_eq!(
2167 event,
2168 HistoryEvent::JoinNextTry {
2169 join_set_id: JoinSetId::new(
2170 crate::JoinSetKind::Named,
2171 crate::StrVariant::Static("test")
2172 )
2173 .unwrap(),
2174 outcome: JoinNextTryOutcome::AllProcessed,
2175 }
2176 );
2177 }
2178
2179 #[test]
2180 fn join_next_try_outcome_old_format_compat() {
2181 let json = r#"{"type":"join_next_try","join_set_id":"n:test","found_response":true}"#;
2183 let event: HistoryEvent = serde_json::from_str(json).unwrap();
2184 assert_eq!(
2185 event,
2186 HistoryEvent::JoinNextTry {
2187 join_set_id: JoinSetId::new(
2188 crate::JoinSetKind::Named,
2189 crate::StrVariant::Static("test")
2190 )
2191 .unwrap(),
2192 outcome: JoinNextTryOutcome::Found,
2193 }
2194 );
2195
2196 let json = r#"{"type":"join_next_try","join_set_id":"n:test","found_response":false}"#;
2198 let event: HistoryEvent = serde_json::from_str(json).unwrap();
2199 assert_eq!(
2200 event,
2201 HistoryEvent::JoinNextTry {
2202 join_set_id: JoinSetId::new(
2203 crate::JoinSetKind::Named,
2204 crate::StrVariant::Static("test")
2205 )
2206 .unwrap(),
2207 outcome: JoinNextTryOutcome::Pending,
2208 }
2209 );
2210 }
2211
2212 #[test]
2213 fn join_next_try_outcome_serializes_new_format() {
2214 let event = HistoryEvent::JoinNextTry {
2215 join_set_id: JoinSetId::new(
2216 crate::JoinSetKind::Named,
2217 crate::StrVariant::Static("test"),
2218 )
2219 .unwrap(),
2220 outcome: JoinNextTryOutcome::AllProcessed,
2221 };
2222 let json = serde_json::to_string(&event).unwrap();
2223 assert!(
2224 json.contains(r#""outcome":"all_processed""#),
2225 "expected outcome field, got: {json}"
2226 );
2227 assert!(
2228 !json.contains("found_response"),
2229 "should not contain old field, got: {json}"
2230 );
2231 }
2232}