Skip to main content

obeli_sk_concepts/
storage.rs

1use crate::ComponentId;
2use crate::ComponentRetryConfig;
3use crate::ComponentType;
4use crate::ExecutionFailureKind;
5use crate::ExecutionId;
6use crate::ExecutionMetadata;
7use crate::FinishedExecutionError;
8use crate::FunctionFqn;
9use crate::JoinSetId;
10use crate::Params;
11use crate::StrVariant;
12use crate::SupportedFunctionReturnValue;
13use crate::component_id::InputContentDigest;
14use crate::prefixed_ulid::DelayId;
15use crate::prefixed_ulid::DeploymentId;
16use crate::prefixed_ulid::ExecutionIdDerived;
17use crate::prefixed_ulid::ExecutorId;
18use crate::prefixed_ulid::RunId;
19use assert_matches::assert_matches;
20use async_trait::async_trait;
21use chrono::TimeDelta;
22use chrono::{DateTime, Utc};
23use http_client_trace::HttpClientTrace;
24use serde::Deserialize;
25use serde::Serialize;
26use std::fmt::Debug;
27use std::fmt::Display;
28use std::panic::Location;
29use std::pin::Pin;
30use std::sync::Arc;
31use std::time::Duration;
32use tracing::debug;
33use tracing::instrument;
34use tracing_error::SpanTrace;
35
// Shared between databases. TODO: Extract to db-common

/// String tag for the `pending_at` state.
pub const STATE_PENDING_AT: &str = "pending_at";
/// String tag for the `blocked_by_join_set` state.
pub const STATE_BLOCKED_BY_JOIN_SET: &str = "blocked_by_join_set";
/// String tag for the `locked` state.
pub const STATE_LOCKED: &str = "locked";
/// String tag for the `finished` state.
pub const STATE_FINISHED: &str = "finished";
/// Serialization tag of `HistoryEvent::JoinNext`.
pub const HISTORY_EVENT_TYPE_JOIN_NEXT: &str = "join_next";
42
43#[derive(Debug, PartialEq, Eq, Clone)]
44pub struct ExecutionLog {
45    pub execution_id: ExecutionId,
46    pub events: Vec<ExecutionEvent>,
47    pub responses: Vec<ResponseWithCursor>,
48    pub next_version: Version, // Is not advanced once in Finished state
49    pub pending_state: PendingState, // reflecting the current state
50    pub component_digest: InputContentDigest, // reflecting the current state
51    pub component_type: ComponentType,
52    pub deployment_id: DeploymentId, // reflecting the current state
53}
54
55impl ExecutionLog {
56    /// Return some duration after which the execution will be retried.
57    /// Return `None` if no more retries are allowed.
58    #[must_use]
59    pub fn can_be_retried_after(
60        temporary_event_count: u32,
61        max_retries: Option<u32>,
62        retry_exp_backoff: Duration,
63    ) -> Option<Duration> {
64        // If max_retries == None, wrapping is OK after this succeeds - we want to retry forever.
65        if temporary_event_count <= max_retries.unwrap_or(u32::MAX) {
66            // TODO: Add test for number of retries
67            let duration = retry_exp_backoff * 2_u32.saturating_pow(temporary_event_count - 1);
68            Some(duration)
69        } else {
70            None
71        }
72    }
73
74    #[must_use]
75    pub fn compute_retry_duration_when_retrying_forever(
76        temporary_event_count: u32,
77        retry_exp_backoff: Duration,
78    ) -> Duration {
79        Self::can_be_retried_after(temporary_event_count, None, retry_exp_backoff)
80            .expect("`max_retries` set to MAX must never return None")
81    }
82
83    #[must_use]
84    pub fn ffqn(&self) -> &FunctionFqn {
85        assert_matches!(self.events.first(), Some(ExecutionEvent {
86            event: ExecutionRequest::Created { ffqn, .. },
87            ..
88        }) => ffqn)
89    }
90
91    #[must_use]
92    pub fn params(&self) -> &Params {
93        assert_matches!(self.events.first(), Some(ExecutionEvent {
94            event: ExecutionRequest::Created { params, .. },
95            ..
96        }) => params)
97    }
98
99    #[must_use]
100    pub fn parent(&self) -> Option<(ExecutionId, JoinSetId)> {
101        assert_matches!(self.events.first(), Some(ExecutionEvent {
102            event: ExecutionRequest::Created { parent, .. },
103            ..
104        }) => parent.clone())
105    }
106
107    #[must_use]
108    pub fn last_event(&self) -> &ExecutionEvent {
109        self.events.last().expect("must contain at least one event")
110    }
111
112    #[must_use]
113    pub fn is_finished(&self) -> bool {
114        matches!(
115            self.events.last(),
116            Some(ExecutionEvent {
117                event: ExecutionRequest::Finished { .. },
118                ..
119            })
120        )
121    }
122
123    #[must_use]
124    pub fn as_finished_result(&self) -> Option<SupportedFunctionReturnValue> {
125        if let ExecutionEvent {
126            event: ExecutionRequest::Finished { result, .. },
127            ..
128        } = self.events.last().expect("must contain at least one event")
129        {
130            Some(result.clone())
131        } else {
132            None
133        }
134    }
135
136    pub fn event_history(&self) -> impl Iterator<Item = (HistoryEvent, Version)> + '_ {
137        self.events.iter().filter_map(|event| {
138            if let ExecutionRequest::HistoryEvent { event: eh, .. } = &event.event {
139                Some((eh.clone(), event.version.clone()))
140            } else {
141                None
142            }
143        })
144    }
145
146    #[cfg(feature = "test")]
147    #[must_use]
148    pub fn find_join_set_request(&self, join_set_id: &JoinSetId) -> Option<&JoinSetRequest> {
149        self.events
150            .iter()
151            .find_map(move |event| match &event.event {
152                ExecutionRequest::HistoryEvent {
153                    event:
154                        HistoryEvent::JoinSetRequest {
155                            join_set_id: found,
156                            request,
157                        },
158                    ..
159                } if *join_set_id == *found => Some(request),
160                _ => None,
161            })
162    }
163}
164
165pub type VersionType = u32;
166#[derive(
167    Debug,
168    Default,
169    Clone,
170    PartialEq,
171    Eq,
172    Hash,
173    derive_more::Display,
174    derive_more::Into,
175    serde::Serialize,
176    serde::Deserialize,
177)]
178#[serde(transparent)]
179pub struct Version(pub VersionType);
180impl Version {
181    #[must_use]
182    pub fn new(arg: VersionType) -> Version {
183        Version(arg)
184    }
185
186    #[must_use]
187    pub fn increment(&self) -> Version {
188        Version(self.0 + 1)
189    }
190}
191impl TryFrom<i64> for Version {
192    type Error = VersionParseError;
193    fn try_from(value: i64) -> Result<Self, Self::Error> {
194        VersionType::try_from(value)
195            .map(Version::new)
196            .map_err(|_| VersionParseError)
197    }
198}
199impl From<Version> for usize {
200    fn from(value: Version) -> Self {
201        usize::try_from(value.0).expect("16 bit systems are unsupported")
202    }
203}
204impl From<&Version> for usize {
205    fn from(value: &Version) -> Self {
206        usize::try_from(value.0).expect("16 bit systems are unsupported")
207    }
208}
209
210#[derive(Debug, thiserror::Error)]
211#[error("version must be u32")]
212pub struct VersionParseError;
213
214#[derive(
215    Clone, Debug, derive_more::Display, PartialEq, Eq, serde::Serialize, serde::Deserialize,
216)]
217#[display("{event}")]
218pub struct ExecutionEvent {
219    pub created_at: DateTime<Utc>,
220    pub event: ExecutionRequest,
221    #[serde(skip_serializing_if = "Option::is_none")]
222    pub backtrace_id: Option<Version>,
223    pub version: Version,
224}
225
226#[derive(
227    Debug,
228    Clone,
229    Copy,
230    PartialEq,
231    Eq,
232    derive_more::Display,
233    derive_more::Into,
234    Serialize, /* webapi */
235)]
236pub struct ResponseCursor(pub u32);
237
238#[derive(Debug, Clone, PartialEq, Eq, Serialize /* webapi */)]
239pub struct ResponseWithCursor {
240    pub event: JoinSetResponseEventOuter,
241    pub cursor: ResponseCursor,
242}
243
244#[derive(Debug)]
245pub struct ListExecutionEventsResponse {
246    pub events: Vec<ExecutionEvent>,
247    pub max_version: Version,
248}
249
250#[derive(Debug)]
251pub struct ListResponsesResponse {
252    pub responses: Vec<ResponseWithCursor>,
253    pub max_cursor: ResponseCursor,
254}
255
256#[derive(Debug, Clone, PartialEq, Eq, Serialize /* webapi */)]
257pub struct JoinSetResponseEventOuter {
258    pub created_at: DateTime<Utc>,
259    pub event: JoinSetResponseEvent,
260}
261
262#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
263pub struct JoinSetResponseEvent {
264    pub join_set_id: JoinSetId,
265    pub event: JoinSetResponse,
266}
267
268#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, derive_more::Display)]
269#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
270#[serde(tag = "type", rename_all = "snake_case")]
271pub enum JoinSetResponse {
272    #[display("delay {}: {delay_id}", if result.is_ok() { "finished" } else { "cancelled"})]
273    DelayFinished {
274        delay_id: DelayId,
275        result: Result<(), ()>,
276    },
277    #[display("{result}: {child_execution_id}")] // execution completed..
278    ChildExecutionFinished {
279        child_execution_id: ExecutionIdDerived,
280        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Version(2)))]
281        finished_version: Version,
282        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
283        result: SupportedFunctionReturnValue,
284    },
285}
286
287pub const DUMMY_CREATED: ExecutionRequest = ExecutionRequest::Created {
288    ffqn: FunctionFqn::new_static("", ""),
289    params: Params::empty(),
290    parent: None,
291    scheduled_at: DateTime::from_timestamp_nanos(0),
292    component_id: ComponentId::dummy_activity(),
293    deployment_id: DeploymentId::from_parts(0, 0),
294    metadata: ExecutionMetadata::empty(),
295    scheduled_by: None,
296};
297pub const DUMMY_HISTORY_EVENT: ExecutionRequest = ExecutionRequest::HistoryEvent {
298    event: HistoryEvent::JoinSetCreate {
299        join_set_id: JoinSetId {
300            kind: crate::JoinSetKind::OneOff,
301            name: StrVariant::empty(),
302        },
303    },
304};
305
306#[derive(
307    Clone, derive_more::Debug, derive_more::Display, PartialEq, Eq, Serialize, Deserialize,
308)]
309#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
310#[serde(rename_all = "snake_case")]
311pub enum ExecutionRequest {
312    #[display("Created({ffqn}, `{scheduled_at}`)")]
313    Created {
314        ffqn: FunctionFqn,
315        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
316        #[debug(skip)]
317        params: Params,
318        parent: Option<(ExecutionId, JoinSetId)>,
319        scheduled_at: DateTime<Utc>,
320        #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
321        component_id: ComponentId,
322        deployment_id: DeploymentId,
323        #[cfg_attr(any(test, feature = "test"), arbitrary(default))]
324        metadata: ExecutionMetadata,
325        scheduled_by: Option<ExecutionId>,
326    },
327    Locked(Locked),
328    /// Returns execution to [`PendingState::PendingAt`] state at the specified time.
329    /// This can happen when:
330    /// - executor is running out of resources like [`WorkerError::LimitReached`]
331    /// - executor is shutting down
332    #[display("Unlocked(`{backoff_expires_at}`)")]
333    Unlocked {
334        backoff_expires_at: DateTime<Utc>,
335        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
336        reason: StrVariant,
337    },
338    // Created by the executor holding the lock.
339    // After expiry interpreted as pending.
340    #[display("TemporarilyFailed(`{backoff_expires_at}`)")]
341    TemporarilyFailed {
342        backoff_expires_at: DateTime<Utc>,
343        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
344        reason: StrVariant,
345        detail: Option<String>,
346        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
347        http_client_traces: Option<Vec<HttpClientTrace>>,
348    },
349    // Created by the executor holding the lock.
350    // After expiry interpreted as pending.
351    #[display("TemporarilyTimedOut(`{backoff_expires_at}`)")]
352    TemporarilyTimedOut {
353        backoff_expires_at: DateTime<Utc>,
354        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
355        http_client_traces: Option<Vec<HttpClientTrace>>,
356    },
357    // Created by the executor holding the lock.
358    #[display("Finished")]
359    Finished {
360        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
361        result: SupportedFunctionReturnValue,
362        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
363        http_client_traces: Option<Vec<HttpClientTrace>>,
364    },
365
366    #[display("HistoryEvent({event})")]
367    HistoryEvent {
368        event: HistoryEvent,
369    },
370    #[display("Paused")]
371    Paused,
372    #[display("Unpaused")]
373    Unpaused,
374}
375
376impl ExecutionRequest {
377    #[must_use]
378    pub fn is_temporary_event(&self) -> bool {
379        matches!(
380            self,
381            Self::TemporarilyFailed { .. } | Self::TemporarilyTimedOut { .. }
382        )
383    }
384
385    /// String representation of `ExecutionRequest`, used in execution log table to fetch events of certain type, e.g. `created` + `history_event`.
386    #[must_use]
387    pub const fn variant(&self) -> &'static str {
388        match self {
389            ExecutionRequest::Created { .. } => "created",
390            ExecutionRequest::Locked(_) => "locked",
391            ExecutionRequest::Unlocked { .. } => "unlocked",
392            ExecutionRequest::TemporarilyFailed { .. } => "temporarily_failed",
393            ExecutionRequest::TemporarilyTimedOut { .. } => "temporarily_timed_out",
394            ExecutionRequest::Finished { .. } => "finished",
395            ExecutionRequest::HistoryEvent { .. } => "history_event",
396            ExecutionRequest::Paused => "paused",
397            ExecutionRequest::Unpaused => "unpaused",
398        }
399    }
400
401    #[must_use]
402    pub fn join_set_id(&self) -> Option<&JoinSetId> {
403        match self {
404            Self::Created {
405                parent: Some((_parent_id, join_set_id)),
406                ..
407            } => Some(join_set_id),
408            Self::HistoryEvent {
409                event:
410                    HistoryEvent::JoinSetCreate { join_set_id, .. }
411                    | HistoryEvent::JoinSetRequest { join_set_id, .. }
412                    | HistoryEvent::JoinNext { join_set_id, .. },
413            } => Some(join_set_id),
414            _ => None,
415        }
416    }
417}
418
419#[derive(
420    Clone, derive_more::Debug, derive_more::Display, PartialEq, Eq, Serialize, Deserialize,
421)]
422#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
423#[display("Locked(`{lock_expires_at}`, {component_id})")]
424pub struct Locked {
425    #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
426    pub component_id: ComponentId,
427    pub executor_id: ExecutorId,
428    pub deployment_id: DeploymentId,
429    pub run_id: RunId,
430    pub lock_expires_at: DateTime<Utc>,
431    #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentRetryConfig::ZERO))]
432    pub retry_config: ComponentRetryConfig,
433}
434
435#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
436#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
437#[serde(tag = "type", rename_all = "snake_case")]
438pub enum PersistKind {
439    #[display("RandomU64({min}, {max_inclusive})")]
440    RandomU64 { min: u64, max_inclusive: u64 },
441    #[display("RandomString({min_length}, {max_length_exclusive})")]
442    RandomString {
443        min_length: u64,
444        max_length_exclusive: u64,
445    },
446}
447
/// Encode `value` as 8 bytes in big-endian order.
#[must_use]
pub fn from_u64_to_bytes(value: u64) -> [u8; 8] {
    u64::to_be_bytes(value)
}
452
/// Decode 8 big-endian bytes into a `u64`.
#[must_use]
pub fn from_bytes_to_u64(bytes: [u8; 8]) -> u64 {
    // Most significant byte first: shift the accumulator and append each byte.
    bytes
        .iter()
        .fold(0_u64, |acc, &byte| (acc << 8) | u64::from(byte))
}
457
458#[derive(
459    derive_more::Debug, Clone, PartialEq, Eq, derive_more::Display, Serialize, Deserialize,
460)]
461#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
462#[serde(tag = "type", rename_all = "snake_case")]
463/// Must be created by the executor in [`PendingState::Locked`].
464pub enum HistoryEvent {
465    /// Persist a generated pseudorandom value.
466    #[display("Persist")]
467    Persist {
468        #[debug(skip)]
469        value: Vec<u8>, // Only stored for nondeterminism checks. TODO: Consider using a hashed value or just the intention.
470        kind: PersistKind,
471    },
472    #[display("JoinSetCreate({join_set_id})")]
473    JoinSetCreate { join_set_id: JoinSetId },
474    #[display("JoinSetRequest({request})")]
475    // join_set_id is part of ExecutionId or DelayId in the `request`
476    JoinSetRequest {
477        join_set_id: JoinSetId,
478        request: JoinSetRequest,
479    },
480    /// Sets the pending state to [`PendingState::BlockedByJoinSet`].
481    /// When the response arrives at `resp_time`:
482    /// The execution is [`PendingState::PendingAt`]`(max(resp_time, lock_expires_at)`, so that the
483    /// original executor can continue. After the expiry any executor can continue without
484    /// marking the execution as timed out.
485    #[display("JoinNext({join_set_id})")]
486    JoinNext {
487        join_set_id: JoinSetId,
488        /// Set to a future time if the worker is keeping the execution invocation warm waiting for the result.
489        /// The pending status will be kept in Locked state until `run_expires_at`.
490        run_expires_at: DateTime<Utc>,
491        /// Set to a specific function when calling `-await-next` extension function, used for
492        /// determinism checks.
493        requested_ffqn: Option<FunctionFqn>,
494        /// Closing request must never set `requested_ffqn` and is ignored by determinism checks.
495        closing: bool,
496    },
497    /// Attempt to process next response without changing the pending state.
498    #[display("JoinNextTry({join_set_id})")]
499    JoinNextTry {
500        join_set_id: JoinSetId,
501        found_response: bool, // False means `JoinNextTryError::Pending` or `JoinNextTryError::AllProcessed` based on previous events.
502    },
503    /// Records the fact that a join set was awaited more times than its submission count.
504    #[display("JoinNextTooMany({join_set_id})")]
505    JoinNextTooMany {
506        join_set_id: JoinSetId,
507        /// Set to a specific function when calling `-await-next` extension function, used for
508        /// determinism checks.
509        requested_ffqn: Option<FunctionFqn>,
510    },
511    #[display("Schedule({execution_id}, {schedule_at})")]
512    Schedule {
513        execution_id: ExecutionId,
514        schedule_at: HistoryEventScheduleAt, // Stores intention to schedule an execution at a date/offset
515    },
516    #[display("Stub({target_execution_id})")]
517    Stub {
518        target_execution_id: ExecutionIdDerived,
519        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
520        result: SupportedFunctionReturnValue, // Only stored for nondeterminism checks. TODO: Consider using a hashed value.
521        persist_result: Result<(), ()>, // Does the row (target_execution_id,Version:1) match the proposed `result`?
522    },
523}
524
525#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
526#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
527#[serde(rename_all = "snake_case")]
528pub enum HistoryEventScheduleAt {
529    Now,
530    #[display("At(`{_0}`)")]
531    At(DateTime<Utc>),
532    #[display("In({_0:?})")]
533    In(Duration),
534}
535
536#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
537pub enum ScheduleAtConversionError {
538    #[error("source duration value is out of range")]
539    OutOfRangeError,
540}
541
542impl HistoryEventScheduleAt {
543    pub fn as_date_time(
544        &self,
545        now: DateTime<Utc>,
546    ) -> Result<DateTime<Utc>, ScheduleAtConversionError> {
547        match self {
548            Self::Now => Ok(now),
549            Self::At(date_time) => Ok(*date_time),
550            Self::In(duration) => {
551                let time_delta = TimeDelta::from_std(*duration)
552                    .map_err(|_| ScheduleAtConversionError::OutOfRangeError)?;
553                now.checked_add_signed(time_delta)
554                    .ok_or(ScheduleAtConversionError::OutOfRangeError)
555            }
556        }
557    }
558}
559
560#[derive(Clone, Debug, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
561#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
562#[serde(tag = "type", rename_all = "snake_case")]
563pub enum JoinSetRequest {
564    // Must be created by the executor in `PendingState::Locked`.
565    #[display("DelayRequest({delay_id}, expires_at: `{expires_at}`, schedule_at: `{schedule_at}`)")]
566    DelayRequest {
567        delay_id: DelayId,
568        expires_at: DateTime<Utc>,
569        schedule_at: HistoryEventScheduleAt,
570    },
571    // Must be created by the executor in `PendingState::Locked`.
572    #[display("ChildExecutionRequest({child_execution_id}, {target_ffqn}, params: {params})")]
573    ChildExecutionRequest {
574        child_execution_id: ExecutionIdDerived,
575        target_ffqn: FunctionFqn,
576        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
577        params: Params,
578    },
579}
580
581/// Error that is not specific to an execution.
582#[derive(Debug, Clone, thiserror::Error, derive_more::PartialEq, derive_more::Eq)]
583pub enum DbErrorGeneric {
584    #[error("database error: {reason}")]
585    Uncategorized {
586        reason: StrVariant,
587        #[eq(skip)]
588        #[partial_eq(skip)]
589        context: SpanTrace,
590        #[eq(skip)]
591        #[partial_eq(skip)]
592        #[source]
593        source: Option<Arc<dyn std::error::Error + Send + Sync>>,
594        loc: &'static Location<'static>,
595    },
596    #[error("database was closed")]
597    Close,
598}
599
600#[derive(thiserror::Error, Clone, Debug, derive_more::PartialEq, derive_more::Eq)]
601pub enum DbErrorWriteNonRetriable {
602    #[error("validation failed: {0}")]
603    ValidationFailed(StrVariant),
604    #[error("conflict")]
605    Conflict,
606    #[error("illegal state: {reason}")]
607    IllegalState {
608        reason: StrVariant,
609        #[eq(skip)]
610        #[partial_eq(skip)]
611        context: SpanTrace,
612        #[eq(skip)]
613        #[partial_eq(skip)]
614        #[source]
615        source: Option<Arc<dyn std::error::Error + Send + Sync>>,
616        loc: &'static Location<'static>,
617    },
618    #[error("version conflict: expected: {expected}, got: {requested}")]
619    VersionConflict {
620        expected: Version,
621        requested: Version,
622    },
623}
624
625/// Write error tied to an execution
626#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
627pub enum DbErrorWrite {
628    #[error("cannot write - row not found")]
629    NotFound,
630    #[error("non-retriable error: {0}")]
631    NonRetriable(#[from] DbErrorWriteNonRetriable),
632    #[error(transparent)]
633    Generic(#[from] DbErrorGeneric),
634}
635
636/// Read error tied to an execution
637#[derive(Debug, Clone, thiserror::Error, PartialEq)]
638pub enum DbErrorRead {
639    #[error("cannot read - row not found")]
640    NotFound,
641    #[error(transparent)]
642    Generic(#[from] DbErrorGeneric),
643}
644
645#[derive(Debug, thiserror::Error, PartialEq)]
646pub enum DbErrorReadWithTimeout {
647    #[error("timeout")]
648    Timeout(TimeoutOutcome),
649    #[error(transparent)]
650    DbErrorRead(#[from] DbErrorRead),
651}
652
653// Represents next version after successfuly appended to execution log.
654// TODO: Convert to struct with next_version
655pub type AppendResponse = Version;
656pub type PendingExecution = (ExecutionId, Version, Params, Option<DateTime<Utc>>);
657
658#[derive(Debug, Clone)]
659pub struct LockedExecution {
660    pub execution_id: ExecutionId,
661    pub next_version: Version,
662    pub metadata: ExecutionMetadata,
663    pub locked_event: Locked,
664    pub ffqn: FunctionFqn,
665    pub params: Params,
666    pub event_history: Vec<(HistoryEvent, Version)>,
667    pub responses: Vec<ResponseWithCursor>,
668    pub parent: Option<(ExecutionId, JoinSetId)>,
669    pub intermittent_event_count: u32,
670}
671
672pub type LockPendingResponse = Vec<LockedExecution>;
673pub type AppendBatchResponse = Version;
674
675#[derive(Debug, Clone, derive_more::Display, Serialize, Deserialize)]
676#[display("{event}")]
677pub struct AppendRequest {
678    pub created_at: DateTime<Utc>,
679    pub event: ExecutionRequest,
680}
681
682#[derive(Debug, Clone)]
683pub struct CreateRequest {
684    pub created_at: DateTime<Utc>,
685    pub execution_id: ExecutionId,
686    pub ffqn: FunctionFqn,
687    pub params: Params,
688    pub parent: Option<(ExecutionId, JoinSetId)>,
689    pub scheduled_at: DateTime<Utc>,
690    pub component_id: ComponentId,
691    pub deployment_id: DeploymentId,
692    pub metadata: ExecutionMetadata,
693    pub scheduled_by: Option<ExecutionId>,
694}
695
696impl From<CreateRequest> for ExecutionRequest {
697    fn from(value: CreateRequest) -> Self {
698        Self::Created {
699            ffqn: value.ffqn,
700            params: value.params,
701            parent: value.parent,
702            scheduled_at: value.scheduled_at,
703            component_id: value.component_id,
704            deployment_id: value.deployment_id,
705            metadata: value.metadata,
706            scheduled_by: value.scheduled_by,
707        }
708    }
709}
710
711#[async_trait]
712pub trait DbPool: Send + Sync {
713    async fn db_exec_conn(&self) -> Result<Box<dyn DbExecutor>, DbErrorGeneric>;
714
715    async fn connection(&self) -> Result<Box<dyn DbConnection>, DbErrorGeneric>;
716
717    async fn external_api_conn(&self) -> Result<Box<dyn DbExternalApi>, DbErrorGeneric>;
718
719    #[cfg(feature = "test")]
720    async fn connection_test(&self) -> Result<Box<dyn DbConnectionTest>, DbErrorGeneric>;
721}
722
723#[async_trait]
724pub trait DbPoolCloseable {
725    async fn close(&self);
726}
727
728#[derive(Clone, Debug)]
729pub struct AppendEventsToExecution {
730    pub execution_id: ExecutionId,
731    pub version: Version,
732    pub batch: Vec<AppendRequest>,
733}
734
735#[derive(Clone, Debug)]
736pub struct AppendResponseToExecution {
737    pub parent_execution_id: ExecutionId,
738    pub created_at: DateTime<Utc>,
739    pub join_set_id: JoinSetId,
740    pub child_execution_id: ExecutionIdDerived,
741    pub finished_version: Version,
742    pub result: SupportedFunctionReturnValue,
743}
744
745#[async_trait]
746pub trait DbExecutor: Send + Sync {
747    #[expect(clippy::too_many_arguments)]
748    async fn lock_pending_by_ffqns(
749        &self,
750        batch_size: u32,
751        pending_at_or_sooner: DateTime<Utc>,
752        ffqns: Arc<[FunctionFqn]>,
753        created_at: DateTime<Utc>,
754        component_id: ComponentId,
755        deployment_id: DeploymentId,
756        executor_id: ExecutorId,
757        lock_expires_at: DateTime<Utc>,
758        run_id: RunId,
759        retry_config: ComponentRetryConfig,
760    ) -> Result<LockPendingResponse, DbErrorWrite>;
761
762    #[expect(clippy::too_many_arguments)]
763    async fn lock_pending_by_component_digest(
764        &self,
765        batch_size: u32,
766        pending_at_or_sooner: DateTime<Utc>,
767        component_id: &ComponentId,
768        deployment_id: DeploymentId,
769        created_at: DateTime<Utc>,
770        executor_id: ExecutorId,
771        lock_expires_at: DateTime<Utc>,
772        run_id: RunId,
773        retry_config: ComponentRetryConfig,
774    ) -> Result<LockPendingResponse, DbErrorWrite>;
775
776    #[cfg(feature = "test")]
777    #[expect(clippy::too_many_arguments)]
778    async fn lock_one(
779        &self,
780        created_at: DateTime<Utc>,
781        component_id: ComponentId,
782        deployment_id: DeploymentId,
783        execution_id: &ExecutionId,
784        run_id: RunId,
785        version: Version,
786        executor_id: ExecutorId,
787        lock_expires_at: DateTime<Utc>,
788        retry_config: ComponentRetryConfig,
789    ) -> Result<LockedExecution, DbErrorWrite>;
790
791    /// Append a single event to an existing execution log.
792    /// The request cannot contain `ExecutionEventInner::Created`.
793    async fn append(
794        &self,
795        execution_id: ExecutionId,
796        version: Version,
797        req: AppendRequest,
798    ) -> Result<AppendResponse, DbErrorWrite>;
799
800    /// Append a batch of events to an existing execution log, and append a response to a parent execution.
801    /// The batch cannot contain `ExecutionEventInner::Created`.
802    async fn append_batch_respond_to_parent(
803        &self,
804        events: AppendEventsToExecution,
805        response: AppendResponseToExecution,
806        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
807    ) -> Result<AppendBatchResponse, DbErrorWrite>;
808
809    /// Notification mechainism with no strict guarantees for waiting while there are no pending executions.
810    /// Return immediately if there are pending notifications at `pending_at_or_sooner`.
811    /// Otherwise wait until `timeout_fut` resolves.
812    /// Delay requests that expire between `pending_at_or_sooner` and timeout can be disregarded.
813    async fn wait_for_pending_by_ffqn(
814        &self,
815        pending_at_or_sooner: DateTime<Utc>,
816        ffqns: Arc<[FunctionFqn]>,
817        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
818    );
819
820    /// Notification mechainism with no strict guarantees for waiting while there are no pending executions.
821    /// Return immediately if there are pending notifications at `pending_at_or_sooner`.
822    /// Otherwise wait until `timeout_fut` resolves.
823    /// Delay requests that expire between `pending_at_or_sooner` and timeout can be disregarded.
824    async fn wait_for_pending_by_component_digest(
825        &self,
826        pending_at_or_sooner: DateTime<Utc>,
827        component_digest: &InputContentDigest,
828        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
829    );
830
831    async fn cancel_activity_with_retries(
832        &self,
833        execution_id: &ExecutionId,
834        cancelled_at: DateTime<Utc>,
835    ) -> Result<CancelOutcome, DbErrorWrite> {
836        let mut retries = 5;
837        loop {
838            let res = self.cancel_activity(execution_id, cancelled_at).await;
839            if res.is_ok() || retries == 0 {
840                return res;
841            }
842            retries -= 1;
843        }
844    }
845
846    /// Get last event. Impls may set `ExecutionEvent::backtrace_id` to `None`.
847    async fn get_last_execution_event(
848        &self,
849        execution_id: &ExecutionId,
850    ) -> Result<ExecutionEvent, DbErrorRead>;
851
852    async fn cancel_activity(
853        &self,
854        execution_id: &ExecutionId,
855        cancelled_at: DateTime<Utc>,
856    ) -> Result<CancelOutcome, DbErrorWrite> {
857        debug!("Determining cancellation state of {execution_id}");
858
859        let last_event = self
860            .get_last_execution_event(execution_id)
861            .await
862            .map_err(DbErrorWrite::from)?;
863        if let ExecutionRequest::Finished {
864            result:
865                SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError {
866                    kind: ExecutionFailureKind::Cancelled,
867                    ..
868                }),
869            ..
870        } = last_event.event
871        {
872            return Ok(CancelOutcome::Cancelled);
873        } else if matches!(last_event.event, ExecutionRequest::Finished { .. }) {
874            debug!("Not cancelling, {execution_id} is already finished");
875            return Ok(CancelOutcome::AlreadyFinished);
876        }
877        let finished_version = last_event.version.increment();
878        let child_result = SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError {
879            reason: None,
880            kind: ExecutionFailureKind::Cancelled,
881            detail: None,
882        });
883        let cancel_request = AppendRequest {
884            created_at: cancelled_at,
885            event: ExecutionRequest::Finished {
886                result: child_result.clone(),
887                http_client_traces: None,
888            },
889        };
890        debug!("Cancelling activity {execution_id} at {finished_version}");
891        if let ExecutionId::Derived(execution_id) = execution_id {
892            let (parent_execution_id, join_set_id) = execution_id.split_to_parts();
893            let child_execution_id = ExecutionId::Derived(execution_id.clone());
894            self.append_batch_respond_to_parent(
895                AppendEventsToExecution {
896                    execution_id: child_execution_id,
897                    version: finished_version.clone(),
898                    batch: vec![cancel_request],
899                },
900                AppendResponseToExecution {
901                    parent_execution_id,
902                    created_at: cancelled_at,
903                    join_set_id: join_set_id.clone(),
904                    child_execution_id: execution_id.clone(),
905                    finished_version,
906                    result: child_result,
907                },
908                cancelled_at,
909            )
910            .await?;
911        } else {
912            self.append(execution_id.clone(), finished_version, cancel_request)
913                .await?;
914        }
915        debug!("Cancelled {execution_id}");
916        Ok(CancelOutcome::Cancelled)
917    }
918}
919
/// Outcome of `DbConnection::append_delay_response`.
///
/// Derives the same set of traits as [`CancelOutcome`] so the value can be logged,
/// copied and compared.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AppendDelayResponseOutcome {
    /// The delay response was appended.
    Success,
    /// Nothing was appended because the delay had already finished.
    AlreadyFinished,
    /// Nothing was appended because the delay had already been cancelled.
    AlreadyCancelled,
}
925
/// Filter criteria accepted by `DbExternalApi::list_executions`.
///
/// The derived `Default` leaves every `Option` as `None` and every flag as `false`.
#[derive(Debug, Clone, Default)]
pub struct ListExecutionsFilter {
    /// NOTE(review): presumably restricts results to functions whose FFQN starts with this prefix — confirm.
    pub ffqn_prefix: Option<String>,
    /// NOTE(review): presumably includes derived (child) executions when `true` — confirm.
    pub show_derived: bool,
    /// NOTE(review): presumably omits executions in the finished state when `true` — confirm.
    pub hide_finished: bool,
    /// NOTE(review): presumably restricts results to execution ids starting with this prefix — confirm.
    pub execution_id_prefix: Option<String>,
    /// Optional component digest to filter by.
    pub component_digest: Option<InputContentDigest>,
    /// Optional deployment id to filter by.
    pub deployment_id: Option<DeploymentId>,
}
935
/// Listing, inspection and administrative operations layered on top of [`DbConnection`].
#[async_trait]
pub trait DbExternalApi: DbConnection {
    /// Get a backtrace of the execution, selected by `filter`.
    ///
    /// NOTE(review): the previous wording was "Get the latest backtrace if version is not set",
    /// which does not map directly onto the `BacktraceFilter` variants — confirm the intended contract.
    async fn get_backtrace(
        &self,
        execution_id: &ExecutionId,
        filter: BacktraceFilter,
    ) -> Result<BacktraceInfo, DbErrorRead>;

    /// Returns executions sorted in descending order.
    async fn list_executions(
        &self,
        filter: ListExecutionsFilter,
        pagination: ExecutionListPagination,
    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric>;

    /// Returns execution events for the given execution.
    ///
    /// Results are always ordered from oldest to newest (ascending by version),
    /// regardless of pagination direction.
    async fn list_execution_events(
        &self,
        execution_id: &ExecutionId,
        pagination: Pagination<VersionType>,
        include_backtrace_id: bool,
    ) -> Result<ListExecutionEventsResponse, DbErrorRead>;

    /// Returns responses of an execution ordered as they arrived,
    /// enabling matching each `JoinNext` to its corresponding response.
    ///
    /// Results are always ordered from oldest to newest (ascending by cursor),
    /// regardless of pagination direction.
    ///
    /// As an optimization, the implementation can return an empty list of `responses`
    /// and `max_cursor` set to 0 if the execution is not found.
    async fn list_responses(
        &self,
        execution_id: &ExecutionId,
        pagination: Pagination<u32>,
    ) -> Result<ListResponsesResponse, DbErrorRead>;

    /// Returns the execution with its state together with a window of its events and responses.
    ///
    /// NOTE(review): presumably `req_since` / `req_max_length` / `req_include_backtrace_id` select
    /// the events and `resp_pagination` selects the responses — confirm against the implementations.
    async fn list_execution_events_responses(
        &self,
        execution_id: &ExecutionId,
        req_since: &Version,
        req_max_length: VersionType,
        req_include_backtrace_id: bool,
        resp_pagination: Pagination<VersionType>,
    ) -> Result<ExecutionWithStateRequestsResponses, DbErrorRead>;

    /// Switch the component of an execution from digest `old` to digest `new`.
    ///
    /// NOTE(review): presumably fails when the execution's current digest differs from `old` — confirm.
    async fn upgrade_execution_component(
        &self,
        execution_id: &ExecutionId,
        old: &InputContentDigest,
        new: &InputContentDigest,
    ) -> Result<(), DbErrorWrite>;

    /// Returns a page of log entries of the execution that match `filter`.
    async fn list_logs(
        &self,
        execution_id: &ExecutionId,
        filter: LogFilter,
        pagination: Pagination<u32>,
    ) -> Result<ListLogsResponse, DbErrorRead>;

    /// Returns a page of per-deployment state summaries.
    ///
    /// NOTE(review): `current_time` presumably separates `pending` from `scheduled`
    /// in `DeploymentState` — confirm.
    async fn list_deployment_states(
        &self,
        current_time: DateTime<Utc>,
        pagination: Pagination<Option<DeploymentId>>,
    ) -> Result<Vec<DeploymentState>, DbErrorRead>;

    /// Pause an execution. Only pending executions can be paused.
    async fn pause_execution(
        &self,
        execution_id: &ExecutionId,
        paused_at: DateTime<Utc>,
    ) -> Result<AppendResponse, DbErrorWrite>;

    /// Unpause an execution. Only paused executions can be unpaused.
    async fn unpause_execution(
        &self,
        execution_id: &ExecutionId,
        unpaused_at: DateTime<Utc>,
    ) -> Result<AppendResponse, DbErrorWrite>;
}
/// Page length used by [`LIST_DEPLOYMENT_STATES_DEFAULT_PAGINATION`].
pub const LIST_DEPLOYMENT_STATES_DEFAULT_LENGTH: u16 = 20;
/// Default pagination for listing deployment states: an `OlderThan` page of the default
/// length with no cursor set.
pub const LIST_DEPLOYMENT_STATES_DEFAULT_PAGINATION: Pagination<Option<DeploymentId>> =
    Pagination::OlderThan {
        length: LIST_DEPLOYMENT_STATES_DEFAULT_LENGTH,
        cursor: None,
        including_cursor: false,
    };
1027
/// Per-deployment summary returned by `DbExternalApi::list_deployment_states`.
///
/// NOTE(review): the `u32` fields are presumably counts of executions in the respective
/// state — confirm against the implementations.
pub struct DeploymentState {
    pub deployment_id: DeploymentId,
    pub locked: u32,
    // In `PendingAt` state, scheduled to present or past
    pub pending: u32,
    // In `PendingAt` state, scheduled into the future
    pub scheduled: u32,
    pub blocked: u32,
    pub finished: u32,
}
1038impl DeploymentState {
1039    #[must_use]
1040    pub fn new(deployment_id: DeploymentId) -> Self {
1041        DeploymentState {
1042            deployment_id,
1043            locked: 0,
1044            pending: 0,
1045            scheduled: 0,
1046            blocked: 0,
1047            finished: 0,
1048        }
1049    }
1050}
1051
/// One page of log entries returned by `DbExternalApi::list_logs`, together with
/// the pagination values for the neighbouring pages.
#[derive(Debug)]
pub struct ListLogsResponse {
    pub items: Vec<LogEntryRow>,
    pub next_page: Pagination<u32>, // Newer logs can always arrive e.g. via replay
    pub prev_page: Option<Pagination<u32>>, // None if we are already at the beginning
}
1058
/// Selects which log entries `DbExternalApi::list_logs` returns.
///
/// Fields are private; build a value with `show_logs`, `show_streams` or `show_combined`.
#[derive(Debug)]
pub struct LogFilter {
    show_logs: bool,
    show_streams: bool,
    levels: Vec<LogLevel>, // Only applied if `show_logs` = true, empty means return all levels.
    stream_types: Vec<LogStreamType>, // Only applied if `show_streams` = true, empty means return all stream types.
}
1066impl LogFilter {
1067    // Constructor for logs only
1068    #[must_use]
1069    pub fn show_logs(levels: Vec<LogLevel>) -> LogFilter {
1070        LogFilter {
1071            show_logs: true,
1072            show_streams: false,
1073            levels,
1074            stream_types: Vec::new(),
1075        }
1076    }
1077    // Constructor for streams only
1078    #[must_use]
1079    pub fn show_streams(stream_types: Vec<LogStreamType>) -> LogFilter {
1080        LogFilter {
1081            show_logs: false,
1082            show_streams: true,
1083            levels: Vec::new(),
1084            stream_types,
1085        }
1086    }
1087    // Constructor for both logs and streams
1088    #[must_use]
1089    pub fn show_combined(levels: Vec<LogLevel>, stream_types: Vec<LogStreamType>) -> LogFilter {
1090        LogFilter {
1091            show_logs: true,
1092            show_streams: true,
1093            levels,
1094            stream_types,
1095        }
1096    }
1097    // Getters
1098    #[must_use]
1099    pub fn should_show_logs(&self) -> bool {
1100        self.show_logs
1101    }
1102    #[must_use]
1103    pub fn should_show_streams(&self) -> bool {
1104        self.show_streams
1105    }
1106    #[must_use]
1107    pub fn levels(&self) -> &Vec<LogLevel> {
1108        &self.levels
1109    }
1110    #[must_use]
1111    pub fn stream_types(&self) -> &Vec<LogStreamType> {
1112        &self.stream_types
1113    }
1114}
1115
/// Result of `DbExternalApi::list_execution_events_responses`: the execution with its
/// state plus the returned events and responses.
///
/// NOTE(review): `max_version` / `max_cursor` presumably report the highest version and
/// response cursor currently stored for the execution — confirm against the implementations.
#[derive(Debug, Clone)]
pub struct ExecutionWithStateRequestsResponses {
    pub execution_with_state: ExecutionWithState,
    pub events: Vec<ExecutionEvent>,
    pub responses: Vec<ResponseWithCursor>,
    pub max_version: Version,
    pub max_cursor: ResponseCursor,
}
1124
/// Operations for reading and writing execution logs, responses, backtraces and log entries.
#[async_trait]
pub trait DbConnection: DbExecutor {
    /// Get execution log.
    async fn get(&self, execution_id: &ExecutionId) -> Result<ExecutionLog, DbErrorRead>;

    /// Append the response of the delay `delay_id` in `join_set_id` to the execution `execution_id`.
    /// `outcome` distinguishes a delay that finished from one that was cancelled.
    async fn append_delay_response(
        &self,
        created_at: DateTime<Utc>,
        execution_id: ExecutionId,
        join_set_id: JoinSetId,
        delay_id: DelayId,
        outcome: Result<(), ()>, // Successfully finished - `Ok(())` or cancelled - `Err(())`
    ) -> Result<AppendDelayResponseOutcome, DbErrorWrite>;

    /// Append a batch of events to an existing execution log.
    /// The batch cannot contain `ExecutionEventInner::Created`.
    ///
    /// NOTE(review): this doc previously also said "and append a response to a parent execution",
    /// but the signature carries no parent response — confirm and keep in sync.
    async fn append_batch(
        &self,
        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
        batch: Vec<AppendRequest>,
        execution_id: ExecutionId,
        version: Version,
    ) -> Result<AppendBatchResponse, DbErrorWrite>;

    /// Append one or more events to the parent execution log, and create zero or more child execution logs.
    /// The batch cannot contain `ExecutionEventInner::Created`.
    async fn append_batch_create_new_execution(
        &self,
        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
        batch: Vec<AppendRequest>,   // must not contain `ExecutionEventInner::Created` events
        execution_id: ExecutionId,
        version: Version,
        child_req: Vec<CreateRequest>,
        backtraces: Vec<BacktraceInfo>,
    ) -> Result<AppendBatchResponse, DbErrorWrite>;

    /// Get a single event specified by version. Impls may set `ExecutionEvent::backtrace_id` to `None`.
    async fn get_execution_event(
        &self,
        execution_id: &ExecutionId,
        version: &Version,
    ) -> Result<ExecutionEvent, DbErrorRead>;

    /// Rebuild the `CreateRequest` of an execution from its event at version 0.
    ///
    /// Returns an `Uncategorized` error if that event is not `ExecutionRequest::Created`.
    #[instrument(skip(self))]
    async fn get_create_request(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<CreateRequest, DbErrorRead> {
        let execution_event = self
            .get_execution_event(execution_id, &Version::new(0))
            .await?;
        if let ExecutionRequest::Created {
            ffqn,
            params,
            parent,
            scheduled_at,
            component_id,
            deployment_id,
            metadata,
            scheduled_by,
        } = execution_event.event
        {
            Ok(CreateRequest {
                // `created_at` and `execution_id` are not part of the `Created` payload.
                created_at: execution_event.created_at,
                execution_id: execution_id.clone(),
                ffqn,
                params,
                parent,
                scheduled_at,
                component_id,
                deployment_id,
                metadata,
                scheduled_by,
            })
        } else {
            Err(DbErrorRead::Generic(DbErrorGeneric::Uncategorized {
                reason: "execution log must start with creation".into(),
                context: SpanTrace::capture(),
                source: None,
                loc: Location::caller(),
            }))
        }
    }

    /// Get the execution together with its current pending state.
    async fn get_pending_state(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<ExecutionWithState, DbErrorRead>;

    /// Get currently expired locks and async timers (delay requests)
    async fn get_expired_timers(
        &self,
        at: DateTime<Utc>,
    ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric>;

    /// Create a new execution log
    async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite>;

    /// Notification mechanism with no strict guarantees for getting notified when a new response arrives.
    /// Parameter `start_idx` must be at most equal to the current size of responses in the execution log.
    /// If no response arrives immediately and `interrupt_after` resolves, `DbErrorReadWithTimeout::Timeout` is returned.
    /// Implementations with no pubsub support should use polling.
    /// Callers are expected to call this function in a loop with a reasonable timeout
    /// to support less stellar implementations.
    ///
    /// NOTE(review): `start_idx` and `interrupt_after` do not appear in the signature; they
    /// presumably refer to `last_response` and `timeout_fut` — confirm and rename in this doc.
    async fn subscribe_to_next_responses(
        &self,
        execution_id: &ExecutionId,
        last_response: ResponseCursor,
        timeout_fut: Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>,
    ) -> Result<Vec<ResponseWithCursor>, DbErrorReadWithTimeout>;

    /// First, attempt to fetch the finished value. If the execution is not finished yet, poll
    /// periodically or subscribe to db changes, racing with `timeout_fut`.
    /// Notification mechanism with no strict guarantees for getting the finished result.
    /// Implementations with no pubsub support should use polling.
    /// Callers are expected to call this function in a loop with a reasonable timeout
    /// to support less stellar implementations.
    async fn wait_for_finished_result(
        &self,
        execution_id: &ExecutionId,
        timeout_fut: Option<Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>>,
    ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout>;

    /// Store a single backtrace.
    async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite>;

    /// Store several backtraces.
    async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite>;

    /// Store a single log row.
    async fn append_log(&self, row: LogInfoAppendRow) -> Result<(), DbErrorWrite>;

    /// Store several log rows.
    async fn append_log_batch(&self, batch: &[LogInfoAppendRow]) -> Result<(), DbErrorWrite>;

    /// Returns `TimeoutOutcome::Timeout` if not in Finished state.
    #[cfg(feature = "test")]
    async fn get_finished_result(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout> {
        // The timeout future is already resolved, so no waiting is requested.
        self.wait_for_finished_result(
            execution_id,
            Some(Box::pin(std::future::ready(TimeoutOutcome::Timeout))),
        )
        .await
    }
}
1269
/// A log entry to be stored via `DbConnection::append_log` / `append_log_batch`,
/// tagged with the execution and run it belongs to.
#[derive(Clone, Debug)]
pub struct LogInfoAppendRow {
    pub execution_id: ExecutionId,
    pub run_id: RunId,
    pub log_entry: LogEntry,
}
1276
/// A stored log entry as listed in `ListLogsResponse::items`.
#[derive(Debug, Clone)]
pub struct LogEntryRow {
    /// NOTE(review): presumably the position of the row used for `Pagination<u32>` — confirm.
    pub cursor: u32,
    pub run_id: RunId,
    pub log_entry: LogEntry,
}
1283
/// A single log record: either a leveled text message or a chunk of stream output.
#[derive(Debug, Clone)]
pub enum LogEntry {
    /// A text message with a severity level.
    Log {
        created_at: DateTime<Utc>,
        level: LogLevel,
        message: String,
    },
    /// Raw bytes belonging to one of the `LogStreamType` streams.
    Stream {
        created_at: DateTime<Utc>,
        payload: Vec<u8>,
        stream_type: LogStreamType,
    },
}
1297impl LogEntry {
1298    #[must_use]
1299    pub fn created_at(&self) -> DateTime<Utc> {
1300        match self {
1301            LogEntry::Log { created_at, .. } | LogEntry::Stream { created_at, .. } => *created_at,
1302        }
1303    }
1304}
1305
/// Severity of a `LogEntry::Log` record.
///
/// Represented as `u8`; discriminants start at 1 (`Trace`) and increase in declaration order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::TryFrom, strum::EnumIter)]
#[try_from(repr)]
#[repr(u8)]
pub enum LogLevel {
    Trace = 1,
    Debug,
    Info,
    Warn,
    Error,
}
/// Stream a `LogEntry::Stream` payload belongs to.
///
/// Represented as `u8`; discriminants start at 1 (`StdOut`) and increase in declaration order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::TryFrom, strum::EnumIter)]
#[try_from(repr)]
#[repr(u8)]
pub enum LogStreamType {
    StdOut = 1,
    StdErr,
}
1323
/// Value produced by the `timeout_fut` futures passed to the waiting methods of
/// `DbConnection`; it is also carried by `DbErrorReadWithTimeout::Timeout`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimeoutOutcome {
    Timeout,
    Cancel,
}
1329
/// Test-only extension of [`DbConnection`], compiled with the `test` feature.
#[cfg(feature = "test")]
#[async_trait]
pub trait DbConnectionTest: DbConnection {
    /// Append `response_event` to the execution `execution_id`.
    async fn append_response(
        &self,
        created_at: DateTime<Utc>,
        execution_id: ExecutionId,
        response_event: JoinSetResponseEvent,
    ) -> Result<(), DbErrorWrite>;
}
1340
/// Result of a cancellation request, as returned by `cancel_activity` and `cancel_delay`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CancelOutcome {
    /// The target is cancelled, either by this request or by an earlier one.
    Cancelled,
    /// The target had already finished, so nothing was cancelled.
    AlreadyFinished,
}
1346
1347#[instrument(skip(db_connection))]
1348pub async fn stub_execution(
1349    db_connection: &dyn DbConnection,
1350    execution_id: ExecutionIdDerived,
1351    parent_execution_id: ExecutionId,
1352    join_set_id: JoinSetId,
1353    created_at: DateTime<Utc>,
1354    return_value: SupportedFunctionReturnValue,
1355) -> Result<(), DbErrorWrite> {
1356    let stub_finished_version = Version::new(1); // Stub activities have no execution log except Created event.
1357    // Attempt to write to `execution_id` and its parent, ignoring the possible conflict error on this tx
1358    let write_attempt = {
1359        let finished_req = AppendRequest {
1360            created_at,
1361            event: ExecutionRequest::Finished {
1362                result: return_value.clone(),
1363                http_client_traces: None,
1364            },
1365        };
1366        db_connection
1367            .append_batch_respond_to_parent(
1368                AppendEventsToExecution {
1369                    execution_id: ExecutionId::Derived(execution_id.clone()),
1370                    version: stub_finished_version.clone(),
1371                    batch: vec![finished_req],
1372                },
1373                AppendResponseToExecution {
1374                    parent_execution_id,
1375                    created_at,
1376                    join_set_id,
1377                    child_execution_id: execution_id.clone(),
1378                    finished_version: stub_finished_version.clone(),
1379                    result: return_value.clone(),
1380                },
1381                created_at,
1382            )
1383            .await
1384    };
1385    if let Err(write_attempt) = write_attempt {
1386        // Check that the expected value is in the database
1387        debug!("Stub write attempt failed - {write_attempt:?}");
1388
1389        let found = db_connection
1390            .get_execution_event(&ExecutionId::Derived(execution_id), &stub_finished_version)
1391            .await?; // Not found at this point should not happen, unless the previous write failed. Will be retried.
1392        match found.event {
1393            ExecutionRequest::Finished {
1394                result: found_result,
1395                ..
1396            } if return_value == found_result => {
1397                // Same value has already be written, RPC is successful.
1398                Ok(())
1399            }
1400            ExecutionRequest::Finished { .. } => Err(DbErrorWrite::NonRetriable(
1401                DbErrorWriteNonRetriable::Conflict,
1402            )),
1403            _other => Err(DbErrorWrite::NonRetriable(
1404                DbErrorWriteNonRetriable::IllegalState {
1405                    reason: "unexpected execution event at stubbed execution".into(),
1406                    context: SpanTrace::capture(),
1407                    source: None,
1408                    loc: Location::caller(),
1409                },
1410            )),
1411        }
1412    } else {
1413        Ok(())
1414    }
1415}
1416
1417pub async fn cancel_delay(
1418    db_connection: &dyn DbConnection,
1419    delay_id: DelayId,
1420    created_at: DateTime<Utc>,
1421) -> Result<CancelOutcome, DbErrorWrite> {
1422    let (parent_execution_id, join_set_id) = delay_id.split_to_parts();
1423    db_connection
1424        .append_delay_response(
1425            created_at,
1426            parent_execution_id,
1427            join_set_id,
1428            delay_id,
1429            Err(()), // Mark as cancelled.
1430        )
1431        .await
1432        .map(|ok| match ok {
1433            AppendDelayResponseOutcome::Success | AppendDelayResponseOutcome::AlreadyCancelled => {
1434                CancelOutcome::Cancelled
1435            }
1436            AppendDelayResponseOutcome::AlreadyFinished => CancelOutcome::AlreadyFinished,
1437        })
1438}
1439
/// Selects which backtrace `DbExternalApi::get_backtrace` returns.
#[derive(Clone, Debug)]
pub enum BacktraceFilter {
    First,
    Last,
    /// NOTE(review): presumably the backtrace whose version range covers this version — confirm.
    Specific(Version),
}
1446
/// A WASM backtrace attached to an execution.
///
/// NOTE(review): `version_min_including` / `version_max_excluding` presumably delimit the
/// half-open range of event versions the backtrace applies to — confirm.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BacktraceInfo {
    pub execution_id: ExecutionId,
    pub component_id: ComponentId,
    pub version_min_including: Version,
    pub version_max_excluding: Version,
    pub wasm_backtrace: WasmBacktrace,
}
1455
/// Serializable list of backtrace frames.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct WasmBacktrace {
    pub frames: Vec<FrameInfo>,
}
1460
/// One frame of a [`WasmBacktrace`]: module name, function name and the symbols of the frame.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct FrameInfo {
    pub module: String,
    pub func_name: String,
    pub symbols: Vec<FrameSymbol>,
}
1467
/// Symbol information of a frame; every field is optional.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct FrameSymbol {
    pub func_name: Option<String>,
    pub file: Option<String>,
    pub line: Option<u32>,
    pub col: Option<u32>,
}
1475
1476mod wasm_backtrace {
1477    use super::{FrameInfo, FrameSymbol, WasmBacktrace};
1478
1479    impl WasmBacktrace {
1480        pub fn maybe_from(backtrace: &wasmtime::WasmBacktrace) -> Option<Self> {
1481            if backtrace.frames().is_empty() {
1482                None
1483            } else {
1484                Some(Self {
1485                    frames: backtrace.frames().iter().map(FrameInfo::from).collect(),
1486                })
1487            }
1488        }
1489    }
1490
1491    impl From<&wasmtime::FrameInfo> for FrameInfo {
1492        fn from(frame: &wasmtime::FrameInfo) -> Self {
1493            let module_name = frame.module().name().unwrap_or("<unknown>").to_string();
1494            let mut func_name = String::new();
1495            wasmtime_environ::demangle_function_name_or_index(
1496                &mut func_name,
1497                frame.func_name(),
1498                frame.func_index() as usize,
1499            )
1500            .expect("writing to string must succeed");
1501            Self {
1502                module: module_name,
1503                func_name,
1504                symbols: frame
1505                    .symbols()
1506                    .iter()
1507                    .map(std::convert::Into::into)
1508                    .collect(),
1509            }
1510        }
1511    }
1512
1513    impl From<&wasmtime::FrameSymbol> for FrameSymbol {
1514        fn from(symbol: &wasmtime::FrameSymbol) -> Self {
1515            let func_name = symbol.name().map(|name| {
1516                let mut writer = String::new();
1517                wasmtime_environ::demangle_function_name(&mut writer, name)
1518                    .expect("writing to string must succeed");
1519                writer
1520            });
1521
1522            Self {
1523                func_name,
1524                file: symbol.file().map(ToString::to_string),
1525                line: symbol.line(),
1526                col: symbol.column(),
1527            }
1528        }
1529    }
1530}
/// An execution together with its pending state and component/deployment identification.
///
/// `Display` prints the execution id, the pending state and the component digest.
#[derive(Debug, Clone, derive_more::Display)]
#[display("{execution_id} {pending_state} {component_digest}")]
pub struct ExecutionWithState {
    pub execution_id: ExecutionId,
    pub ffqn: FunctionFqn,
    pub pending_state: PendingState,
    pub created_at: DateTime<Utc>,
    pub first_scheduled_at: DateTime<Utc>,
    pub component_digest: InputContentDigest,
    pub component_type: ComponentType,
    pub deployment_id: DeploymentId,
}
1543
/// Pagination accepted by `DbExternalApi::list_executions`, keyed either by a
/// timestamp cursor or by an execution id cursor.
#[derive(Debug, Clone)]
pub enum ExecutionListPagination {
    /// NOTE(review): despite the name, the cursor is presumably the creation time — confirm.
    CreatedBy(Pagination<Option<DateTime<Utc>>>),
    ExecutionId(Pagination<Option<ExecutionId>>),
}
1549impl Default for ExecutionListPagination {
1550    fn default() -> ExecutionListPagination {
1551        ExecutionListPagination::CreatedBy(Pagination::OlderThan {
1552            length: 20,
1553            cursor: None,
1554            including_cursor: false, // does not matter when `cursor` is not specified
1555        })
1556    }
1557}
1558impl ExecutionListPagination {
1559    #[must_use]
1560    pub fn length(&self) -> u16 {
1561        match self {
1562            ExecutionListPagination::CreatedBy(pagination) => pagination.length(),
1563            ExecutionListPagination::ExecutionId(pagination) => pagination.length(),
1564        }
1565    }
1566}
1567
/// Cursor-based pagination request.
///
/// `NewerThan` pages ascend from the cursor, `OlderThan` pages descend from it.
/// `including_cursor` decides whether the item at the cursor itself belongs to the page.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Pagination<T> {
    NewerThan {
        length: u16,
        cursor: T,
        including_cursor: bool,
    },
    OlderThan {
        length: u16,
        cursor: T,
        including_cursor: bool,
    },
}

// Accessors never copy the cursor, so they are available for every `T`.
impl<T> Pagination<T> {
    /// Requested page length.
    pub fn length(&self) -> u16 {
        match self {
            Pagination::NewerThan { length, .. } | Pagination::OlderThan { length, .. } => *length,
        }
    }

    /// Comparison operator relating an item to the cursor: `>`, `>=`, `<` or `<=`.
    pub fn rel(&self) -> &'static str {
        match self {
            Pagination::NewerThan {
                including_cursor: false,
                ..
            } => ">",
            Pagination::NewerThan {
                including_cursor: true,
                ..
            } => ">=",
            Pagination::OlderThan {
                including_cursor: false,
                ..
            } => "<",
            Pagination::OlderThan {
                including_cursor: true,
                ..
            } => "<=",
        }
    }

    /// `true` for `OlderThan`.
    pub fn is_desc(&self) -> bool {
        matches!(self, Pagination::OlderThan { .. })
    }

    /// `"asc"` for `NewerThan`, `"desc"` for `OlderThan`.
    pub fn asc_or_desc(&self) -> &'static str {
        if self.is_asc() { "asc" } else { "desc" }
    }

    /// `true` for `NewerThan`.
    pub fn is_asc(&self) -> bool {
        !self.is_desc()
    }

    /// Borrow the cursor.
    pub fn cursor(&self) -> &T {
        match self {
            Pagination::NewerThan { cursor, .. } | Pagination::OlderThan { cursor, .. } => cursor,
        }
    }
}

// Only `invert` has to duplicate the cursor, hence the `Clone` bound lives here.
impl<T: Clone> Pagination<T> {
    /// Returns the pagination in the opposite direction with the same length and cursor and
    /// `including_cursor` flipped, so the inverted range is the complement of `self`'s range.
    #[must_use]
    pub fn invert(&self) -> Self {
        match self {
            Pagination::NewerThan {
                length,
                cursor,
                including_cursor,
            } => Pagination::OlderThan {
                length: *length,
                cursor: cursor.clone(),
                including_cursor: !*including_cursor,
            },
            Pagination::OlderThan {
                length,
                cursor,
                including_cursor,
            } => Pagination::NewerThan {
                length: *length,
                cursor: cursor.clone(),
                including_cursor: !*including_cursor,
            },
        }
    }
}
1651
/// Test helper: repeatedly load the execution log of `execution_id` until `predicate`
/// returns `Some`, sleeping 10 ms between attempts.
///
/// When `timeout` is set and elapses first, `DbErrorReadWithTimeout::Timeout` is returned.
/// A failing read ends the wait with that error.
#[cfg(feature = "test")]
pub async fn wait_for_pending_state_fn<T: Debug>(
    db_connection: &dyn DbConnectionTest,
    execution_id: &ExecutionId,
    predicate: impl Fn(ExecutionLog) -> Option<T> + Send,
    timeout: Option<Duration>,
) -> Result<T, DbErrorReadWithTimeout> {
    tracing::trace!(%execution_id, "Waiting for predicate");
    let poll_until_match = async move {
        loop {
            let current_log = db_connection.get(execution_id).await?;
            if let Some(t) = predicate(current_log) {
                tracing::debug!(%execution_id, "Found: {t:?}");
                return Ok(t);
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    };

    match timeout {
        // future's liveness: Dropping the loser immediately.
        Some(limit) => tokio::select! {
            res = poll_until_match => res,
            () = tokio::time::sleep(limit) => Err(DbErrorReadWithTimeout::Timeout(TimeoutOutcome::Timeout))
        },
        None => poll_until_match.await,
    }
}
1680
/// An expired item returned by `DbConnection::get_expired_timers`: either an expired lock
/// or an expired delay request.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExpiredTimer {
    Lock(ExpiredLock),
    Delay(ExpiredDelay),
}
1686
/// Details of an execution whose lock has expired.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExpiredLock {
    pub execution_id: ExecutionId,
    // Version of last `Locked` event, used to detect whether the execution made progress.
    pub locked_at_version: Version,
    pub next_version: Version,
    /// As the execution may still be running, this represents the number of intermittent failures + timeouts prior to this execution.
    pub intermittent_event_count: u32,
    /// NOTE(review): `None` presumably means the number of retries is not limited — confirm.
    pub max_retries: Option<u32>,
    pub retry_exp_backoff: Duration,
    pub locked_by: LockedBy,
}
1699
/// Identification of an expired delay request: the execution, its join set and the delay id.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExpiredDelay {
    pub execution_id: ExecutionId,
    pub join_set_id: JoinSetId,
    pub delay_id: DelayId,
}
1706
/// Current state of an execution.
///
/// Serialized with an internal `status` tag whose values are the snake_case variant names.
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum PendingState {
    /// Caused by [`ExecutionRequest::Locked`].
    Locked(PendingStateLocked),

    /// Pending, scheduled at the time carried by the inner value.
    #[display("PendingAt(`{_0}`)")]
    PendingAt(PendingStatePendingAt),

    /// Caused by [`HistoryEvent::JoinNext`]
    #[display("BlockedByJoinSet({_0})")]
    BlockedByJoinSet(PendingStateBlockedByJoinSet),

    /// Paused; the inner value carries the state that was paused.
    #[display("Paused({_0})")]
    Paused(PendingStatePaused),

    #[display("Finished({_0})")]
    Finished(PendingStateFinished),
}
1726
/// Flattened view of [`PendingState`]: instead of a separate `Paused` variant, each
/// non-finished variant carries a `paused` flag next to its state.
pub enum PendingStateMergedPause {
    Locked {
        state: PendingStateLocked,
        paused: bool,
    },
    PendingAt {
        state: PendingStatePendingAt,
        paused: bool,
    },
    BlockedByJoinSet {
        state: PendingStateBlockedByJoinSet,
        paused: bool,
    },
    /// There is no paused form of a finished execution.
    Finished(PendingStateFinished),
}
1742impl From<PendingState> for PendingStateMergedPause {
1743    fn from(state: PendingState) -> Self {
1744        match state {
1745            PendingState::Locked(s) => PendingStateMergedPause::Locked {
1746                state: s,
1747                paused: false,
1748            },
1749
1750            PendingState::PendingAt(s) => PendingStateMergedPause::PendingAt {
1751                state: s,
1752                paused: false,
1753            },
1754
1755            PendingState::BlockedByJoinSet(s) => PendingStateMergedPause::BlockedByJoinSet {
1756                state: s,
1757                paused: false,
1758            },
1759
1760            PendingState::Paused(paused) => match paused {
1761                PendingStatePaused::Locked(s) => PendingStateMergedPause::Locked {
1762                    state: s,
1763                    paused: true,
1764                },
1765                PendingStatePaused::PendingAt(s) => PendingStateMergedPause::PendingAt {
1766                    state: s,
1767                    paused: true,
1768                },
1769                PendingStatePaused::BlockedByJoinSet(s) => {
1770                    PendingStateMergedPause::BlockedByJoinSet {
1771                        state: s,
1772                        paused: true,
1773                    }
1774                }
1775            },
1776
1777            PendingState::Finished(s) => PendingStateMergedPause::Finished(s),
1778        }
1779    }
1780}
1781
/// Payload of the locked state: who holds the lock and until when.
///
/// Displayed as ``Locked(`<lock_expires_at>`, <executor_id>, <run_id>)``.
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize)]
#[display("Locked(`{lock_expires_at}`, {}, {})", locked_by.executor_id, locked_by.run_id)]
pub struct PendingStateLocked {
    /// Executor and run holding the lock.
    pub locked_by: LockedBy,
    /// Instant at which the lock expires.
    pub lock_expires_at: DateTime<Utc>,
}
1788
/// Payload of the pending state: when the execution becomes lockable.
///
/// Displayed as the backtick-quoted `scheduled_at` followed by the `Debug` form of
/// `last_lock`.
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize)]
#[display("`{scheduled_at}`, last_lock={last_lock:?}")]
pub struct PendingStatePendingAt {
    /// Instant from which a new lock may be created; `PendingState::can_append_lock`
    /// accepts a new lock once `scheduled_at <= created_at`.
    pub scheduled_at: DateTime<Utc>,
    /// `last_lock` is needed for lock extension.
    pub last_lock: Option<LockedBy>,
}
1796
/// Payload of the state in which the execution is blocked by a join set.
///
/// Displayed as ``<join_set_id>, `<lock_expires_at>`, closing=<closing>``.
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize)]
#[display("{join_set_id}, `{lock_expires_at}`, closing={closing}")]
pub struct PendingStateBlockedByJoinSet {
    /// Join set the execution is blocked by.
    pub join_set_id: JoinSetId,
    /// See [`HistoryEvent::JoinNext::lock_expires_at`].
    pub lock_expires_at: DateTime<Utc>,
    /// Blocked by closing of the join set
    pub closing: bool,
}
1806
/// State of execution before it was paused.
///
/// Mirrors the non-finished variants of [`PendingState`]; there is no `Finished`
/// variant here.
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize)]
pub enum PendingStatePaused {
    /// The execution was locked when it was paused.
    #[display("Locked({_0})")]
    Locked(PendingStateLocked),
    /// The execution was pending when it was paused.
    #[display("PendingAt({_0})")]
    PendingAt(PendingStatePendingAt),
    /// The execution was blocked by a join set when it was paused.
    #[display("BlockedByJoinSet({_0})")]
    BlockedByJoinSet(PendingStateBlockedByJoinSet),
}
1817
/// Identity of a lock holder: an executor together with one of its runs.
///
/// `PendingState::can_append_lock` treats a lock request as an extension only when
/// both fields match the current (or last) holder.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LockedBy {
    /// Executor holding the lock.
    pub executor_id: ExecutorId,
    /// Run holding the lock.
    pub run_id: RunId,
}
1823impl From<&Locked> for LockedBy {
1824    fn from(value: &Locked) -> Self {
1825        LockedBy {
1826            executor_id: value.executor_id,
1827            run_id: value.run_id,
1828        }
1829    }
1830}
1831
/// Payload of the finished state.
///
/// `Deserialize` is derived only for test builds or when the `test` feature is enabled.
/// Its `Display` implementation prints `ok` on success and the error's `Display`
/// text otherwise.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[cfg_attr(any(test, feature = "test"), derive(Deserialize))]
pub struct PendingStateFinished {
    pub version: VersionType, // not Version since it must be Copy
    /// Instant at which the execution finished.
    pub finished_at: DateTime<Utc>,
    /// Whether the execution finished successfully or with an error.
    pub result_kind: PendingStateFinishedResultKind,
}
1839impl Display for PendingStateFinished {
1840    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1841        match self.result_kind {
1842            PendingStateFinishedResultKind::Ok => write!(f, "ok"),
1843            PendingStateFinishedResultKind::Err(err) => write!(f, "{err}"),
1844        }
1845    }
1846}
1847
/// Outcome of a finished execution, without the returned value itself.
///
/// Serialized with `snake_case` variant names. Use
/// [`PendingStateFinishedResultKind::as_result`] to view it as a `Result`.
// This is not a Result so that it can be customized for serialization
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PendingStateFinishedResultKind {
    /// The execution finished successfully.
    Ok,
    /// The execution finished with the given kind of error.
    Err(PendingStateFinishedError),
}
1855impl PendingStateFinishedResultKind {
1856    pub fn as_result(&self) -> Result<(), &PendingStateFinishedError> {
1857        match self {
1858            PendingStateFinishedResultKind::Ok => Ok(()),
1859            PendingStateFinishedResultKind::Err(err) => Err(err),
1860        }
1861    }
1862}
1863
1864impl From<&SupportedFunctionReturnValue> for PendingStateFinishedResultKind {
1865    fn from(result: &SupportedFunctionReturnValue) -> Self {
1866        result.as_pending_state_finished_result()
1867    }
1868}
1869
/// Kind of error an execution finished with.
///
/// Serialized with `snake_case` variant names; the `Display` text of each variant is
/// given by its `#[display]` attribute.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, derive_more::Display)]
#[serde(rename_all = "snake_case")]
pub enum PendingStateFinishedError {
    /// The execution was terminated; the payload says why.
    #[display("execution terminated: {_0}")]
    ExecutionFailure(ExecutionFailureKind),
    /// The execution ran to completion but its result was an error.
    #[display("execution completed with an error")]
    Error,
}
1878
1879impl PendingState {
1880    #[instrument(skip(self))]
1881    pub fn can_append_lock(
1882        &self,
1883        created_at: DateTime<Utc>,
1884        executor_id: ExecutorId,
1885        run_id: RunId,
1886        lock_expires_at: DateTime<Utc>,
1887    ) -> Result<LockKind, DbErrorWriteNonRetriable> {
1888        if lock_expires_at <= created_at {
1889            return Err(DbErrorWriteNonRetriable::ValidationFailed(
1890                "invalid expiry date".into(),
1891            ));
1892        }
1893        match self {
1894            PendingState::PendingAt(PendingStatePendingAt {
1895                scheduled_at,
1896                last_lock,
1897            }) => {
1898                if *scheduled_at <= created_at {
1899                    // pending now, ok to lock
1900                    Ok(LockKind::CreatingNewLock)
1901                } else if let Some(LockedBy {
1902                    executor_id: last_executor_id,
1903                    run_id: last_run_id,
1904                }) = last_lock
1905                    && executor_id == *last_executor_id
1906                    && run_id == *last_run_id
1907                {
1908                    // Original executor is extending the lock.
1909                    Ok(LockKind::Extending)
1910                } else {
1911                    Err(DbErrorWriteNonRetriable::ValidationFailed(
1912                        "cannot lock, not yet pending".into(),
1913                    ))
1914                }
1915            }
1916            PendingState::Locked(PendingStateLocked {
1917                locked_by:
1918                    LockedBy {
1919                        executor_id: current_pending_state_executor_id,
1920                        run_id: current_pending_state_run_id,
1921                    },
1922                lock_expires_at: _,
1923            }) => {
1924                if executor_id == *current_pending_state_executor_id
1925                    && run_id == *current_pending_state_run_id
1926                {
1927                    // Original executor is extending the lock.
1928                    Ok(LockKind::Extending)
1929                } else {
1930                    Err(DbErrorWriteNonRetriable::IllegalState {
1931                        reason: "cannot lock, already locked".into(),
1932                        context: SpanTrace::capture(),
1933                        source: None,
1934                        loc: Location::caller(),
1935                    })
1936                }
1937            }
1938            PendingState::BlockedByJoinSet { .. } => Err(DbErrorWriteNonRetriable::IllegalState {
1939                reason: "cannot append Locked event when in BlockedByJoinSet state".into(),
1940                context: SpanTrace::capture(),
1941                source: None,
1942                loc: Location::caller(),
1943            }),
1944            PendingState::Finished { .. } => Err(DbErrorWriteNonRetriable::IllegalState {
1945                reason: "already finished".into(),
1946                context: SpanTrace::capture(),
1947                source: None,
1948                loc: Location::caller(),
1949            }),
1950            PendingState::Paused(..) => Err(DbErrorWriteNonRetriable::IllegalState {
1951                reason: "cannot lock, execution is paused".into(),
1952                context: SpanTrace::capture(),
1953                source: None,
1954                loc: Location::caller(),
1955            }),
1956        }
1957    }
1958
1959    #[must_use]
1960    pub fn is_finished(&self) -> bool {
1961        matches!(self, PendingState::Finished { .. })
1962    }
1963
1964    #[must_use]
1965    pub fn is_paused(&self) -> bool {
1966        matches!(self, PendingState::Paused(_))
1967    }
1968}
1969
/// Kind of lock that `PendingState::can_append_lock` allows to be appended.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LockKind {
    /// The requesting executor and run already hold (or last held) the lock and are
    /// extending it.
    Extending,
    /// The execution is pending and due, so a fresh lock is being created.
    CreatingNewLock,
}
1975
/// Serializable record types describing a single outgoing HTTP request and its
/// (optional) response.
pub mod http_client_trace {
    use chrono::{DateTime, Utc};
    use serde::{Deserialize, Serialize};

    /// Trace of one HTTP client call: the request plus the response, if any was recorded.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    pub struct HttpClientTrace {
        /// The request that was sent.
        pub req: RequestTrace,
        /// The response; `None` presumably means the call has not finished (or the
        /// response was not captured) — TODO confirm against the recorder.
        pub resp: Option<ResponseTrace>,
    }

    /// Request part of an [`HttpClientTrace`].
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    pub struct RequestTrace {
        /// Instant at which the request was sent.
        pub sent_at: DateTime<Utc>,
        /// Request URI as a string.
        pub uri: String,
        /// HTTP method as a string.
        pub method: String,
    }

    /// Response part of an [`HttpClientTrace`].
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    pub struct ResponseTrace {
        /// Instant at which the call finished.
        pub finished_at: DateTime<Utc>,
        /// Presumably the HTTP status code on success, or an error description when
        /// the call failed — TODO confirm against the recorder.
        pub status: Result<u16, String>,
    }
}
1999
// Unit tests: serde round-trips of the finished-state types, a snapshot of a
// serialized return value, and boundary checks of `HistoryEventScheduleAt::as_date_time`.
#[cfg(test)]
mod tests {
    use super::HistoryEventScheduleAt;
    use super::PendingStateFinished;
    use super::PendingStateFinishedError;
    use super::PendingStateFinishedResultKind;
    use crate::ExecutionFailureKind;
    use crate::SupportedFunctionReturnValue;
    use chrono::DateTime;
    use chrono::Datelike;
    use chrono::Utc;
    use insta::assert_snapshot;
    use rstest::rstest;
    use std::time::Duration;
    use val_json::type_wrapper::TypeWrapper;
    use val_json::wast_val::WastVal;
    use val_json::wast_val::WastValWithType;

    // Both an `Ok` and an `Err` result kind must survive a JSON round-trip unchanged.
    #[rstest(expected => [
        PendingStateFinishedResultKind::Ok,
        PendingStateFinishedResultKind::Err(PendingStateFinishedError::ExecutionFailure(ExecutionFailureKind::TimedOut)),
    ])]
    #[test]
    fn serde_pending_state_finished_result_kind_should_work(
        expected: PendingStateFinishedResultKind,
    ) {
        let ser = serde_json::to_string(&expected).unwrap();
        let actual: PendingStateFinishedResultKind = serde_json::from_str(&ser).unwrap();
        assert_eq!(expected, actual);
    }

    // Same round-trip check for the whole `PendingStateFinished` struct
    // (`Deserialize` is available here because this is a test build).
    #[rstest(result_kind => [
        PendingStateFinishedResultKind::Ok,
        PendingStateFinishedResultKind::Err(PendingStateFinishedError::ExecutionFailure(ExecutionFailureKind::TimedOut)),
    ])]
    #[test]
    fn serde_pending_state_finished_should_work(result_kind: PendingStateFinishedResultKind) {
        let expected = PendingStateFinished {
            version: 0,
            finished_at: Utc::now(),
            result_kind,
        };

        let ser = serde_json::to_string(&expected).unwrap();
        let actual: PendingStateFinished = serde_json::from_str(&ser).unwrap();
        assert_eq!(expected, actual);
    }

    // A `result<option<string>, string>` value holding `Ok(None)` must serialize to
    // the recorded snapshot and deserialize back to an equal value.
    #[test]
    fn join_set_deser_with_result_ok_option_none_should_work() {
        let expected = SupportedFunctionReturnValue::Ok {
            ok: Some(WastValWithType {
                r#type: TypeWrapper::Result {
                    ok: Some(Box::new(TypeWrapper::Option(Box::new(TypeWrapper::String)))),
                    err: Some(Box::new(TypeWrapper::String)),
                },
                value: WastVal::Result(Ok(Some(Box::new(WastVal::Option(None))))),
            }),
        };
        let json = serde_json::to_string(&expected).unwrap();
        assert_snapshot!(json);

        let actual: SupportedFunctionReturnValue = serde_json::from_str(&json).unwrap();

        assert_eq!(expected, actual);
    }

    // `u32::MAX` seconds after the Unix epoch must still resolve, landing in 2106.
    #[test]
    fn as_date_time_should_work_with_duration_u32_max_secs() {
        let duration = Duration::from_secs(u64::from(u32::MAX));
        let schedule_at = HistoryEventScheduleAt::In(duration);
        let resolved = schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap();
        assert_eq!(2106, resolved.year());
    }

    // Largest whole-second count that fits into `i64::MAX` milliseconds — presumably
    // mirroring the upper bound of `chrono::TimeDelta`; confirm against chrono.
    const MILLIS_PER_SEC: i64 = 1000;
    const TIMEDELTA_MAX_SECS: i64 = i64::MAX / MILLIS_PER_SEC;

    // One second past that bound must make `as_date_time` return an error.
    #[test]
    fn as_date_time_should_fail_on_duration_secs_greater_than_i64_max() {
        // Fails on duration -> timedelta conversion, but a smaller duration can fail on datetime + timedelta
        let duration = Duration::from_secs(
            u64::try_from(TIMEDELTA_MAX_SECS).expect("positive number must not fail") + 1,
        );
        let schedule_at = HistoryEventScheduleAt::In(duration);
        schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap_err();
    }
}