Skip to main content

obeli_sk_concepts/
storage.rs

1use crate::ComponentId;
2use crate::ComponentRetryConfig;
3use crate::ComponentType;
4use crate::ExecutionFailureKind;
5use crate::ExecutionId;
6use crate::ExecutionMetadata;
7use crate::FinishedExecutionError;
8use crate::FunctionFqn;
9use crate::JoinSetId;
10use crate::Params;
11use crate::StrVariant;
12use crate::SupportedFunctionReturnValue;
13use crate::component_id::InputContentDigest;
14use crate::prefixed_ulid::DelayId;
15use crate::prefixed_ulid::DeploymentId;
16use crate::prefixed_ulid::ExecutionIdDerived;
17use crate::prefixed_ulid::ExecutorId;
18use crate::prefixed_ulid::RunId;
19use assert_matches::assert_matches;
20use async_trait::async_trait;
21use chrono::TimeDelta;
22use chrono::{DateTime, Utc};
23use http_client_trace::HttpClientTrace;
24use serde::Deserialize;
25use serde::Serialize;
26use std::fmt::Debug;
27use std::fmt::Display;
28use std::panic::Location;
29use std::pin::Pin;
30use std::sync::Arc;
31use std::time::Duration;
32use tracing::debug;
33use tracing::instrument;
34use tracing_error::SpanTrace;
35
// Shared between databases. TODO: Extract to db-common
/// String tag `"pending_at"`.
/// NOTE(review): presumably the stored name of `PendingState::PendingAt` - confirm.
pub const STATE_PENDING_AT: &str = "pending_at";
/// String tag `"blocked_by_join_set"`.
/// NOTE(review): presumably the stored name of `PendingState::BlockedByJoinSet` - confirm.
pub const STATE_BLOCKED_BY_JOIN_SET: &str = "blocked_by_join_set";
/// String tag `"locked"`.
/// NOTE(review): presumably the stored name of `PendingState::Locked` - confirm.
pub const STATE_LOCKED: &str = "locked";
/// String tag `"finished"`.
pub const STATE_FINISHED: &str = "finished";
/// Matches the snake_case serde tag that `HistoryEvent` produces for its `JoinNext` variant.
pub const HISTORY_EVENT_TYPE_JOIN_NEXT: &str = "join_next"; // Serialization tag of `HistoryEvent::JoinNext`
42
/// One execution together with its events, received responses and current-state fields.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct ExecutionLog {
    pub execution_id: ExecutionId,
    /// Events of this execution. The accessors `ffqn()`, `params()` and `parent()` expect
    /// the first element to be `ExecutionRequest::Created`.
    pub events: Vec<ExecutionEvent>,
    /// Join set responses, each paired with its cursor.
    pub responses: Vec<ResponseWithCursor>,
    pub next_version: Version, // Is not advanced once in Finished state
    pub pending_state: PendingState, // reflecting the current state
    pub component_digest: InputContentDigest, // reflecting the current state
    pub component_type: ComponentType,
    pub deployment_id: DeploymentId, // reflecting the current state
}
54
55impl ExecutionLog {
56    /// Return some duration after which the execution will be retried.
57    /// Return `None` if no more retries are allowed.
58    #[must_use]
59    pub fn can_be_retried_after(
60        temporary_event_count: u32,
61        max_retries: Option<u32>,
62        retry_exp_backoff: Duration,
63    ) -> Option<Duration> {
64        // If max_retries == None, wrapping is OK after this succeeds - we want to retry forever.
65        if temporary_event_count <= max_retries.unwrap_or(u32::MAX) {
66            // TODO: Add test for number of retries
67            let duration = retry_exp_backoff * 2_u32.saturating_pow(temporary_event_count - 1);
68            Some(duration)
69        } else {
70            None
71        }
72    }
73
74    #[must_use]
75    pub fn compute_retry_duration_when_retrying_forever(
76        temporary_event_count: u32,
77        retry_exp_backoff: Duration,
78    ) -> Duration {
79        Self::can_be_retried_after(temporary_event_count, None, retry_exp_backoff)
80            .expect("`max_retries` set to MAX must never return None")
81    }
82
83    #[must_use]
84    pub fn ffqn(&self) -> &FunctionFqn {
85        assert_matches!(self.events.first(), Some(ExecutionEvent {
86            event: ExecutionRequest::Created { ffqn, .. },
87            ..
88        }) => ffqn)
89    }
90
91    #[must_use]
92    pub fn params(&self) -> &Params {
93        assert_matches!(self.events.first(), Some(ExecutionEvent {
94            event: ExecutionRequest::Created { params, .. },
95            ..
96        }) => params)
97    }
98
99    #[must_use]
100    pub fn parent(&self) -> Option<(ExecutionId, JoinSetId)> {
101        assert_matches!(self.events.first(), Some(ExecutionEvent {
102            event: ExecutionRequest::Created { parent, .. },
103            ..
104        }) => parent.clone())
105    }
106
107    #[must_use]
108    pub fn last_event(&self) -> &ExecutionEvent {
109        self.events.last().expect("must contain at least one event")
110    }
111
112    #[must_use]
113    pub fn is_finished(&self) -> bool {
114        matches!(
115            self.events.last(),
116            Some(ExecutionEvent {
117                event: ExecutionRequest::Finished { .. },
118                ..
119            })
120        )
121    }
122
123    #[must_use]
124    pub fn as_finished_result(&self) -> Option<SupportedFunctionReturnValue> {
125        if let ExecutionEvent {
126            event: ExecutionRequest::Finished { result, .. },
127            ..
128        } = self.events.last().expect("must contain at least one event")
129        {
130            Some(result.clone())
131        } else {
132            None
133        }
134    }
135
136    pub fn event_history(&self) -> impl Iterator<Item = (HistoryEvent, Version)> + '_ {
137        self.events.iter().filter_map(|event| {
138            if let ExecutionRequest::HistoryEvent { event: eh, .. } = &event.event {
139                Some((eh.clone(), event.version.clone()))
140            } else {
141                None
142            }
143        })
144    }
145
146    #[cfg(feature = "test")]
147    #[must_use]
148    pub fn find_join_set_request(&self, join_set_id: &JoinSetId) -> Option<&JoinSetRequest> {
149        self.events
150            .iter()
151            .find_map(move |event| match &event.event {
152                ExecutionRequest::HistoryEvent {
153                    event:
154                        HistoryEvent::JoinSetRequest {
155                            join_set_id: found,
156                            request,
157                        },
158                    ..
159                } if *join_set_id == *found => Some(request),
160                _ => None,
161            })
162    }
163}
164
/// Integer type wrapped by [`Version`].
pub type VersionType = u32;
/// Version number newtype; serialized transparently as the inner integer.
#[derive(
    Debug,
    Default,
    Clone,
    PartialEq,
    Eq,
    Hash,
    derive_more::Display,
    derive_more::Into,
    serde::Serialize,
    serde::Deserialize,
)]
#[serde(transparent)]
pub struct Version(pub VersionType);
180impl Version {
181    #[must_use]
182    pub fn new(arg: VersionType) -> Version {
183        Version(arg)
184    }
185
186    #[must_use]
187    pub fn increment(&self) -> Version {
188        Version(self.0 + 1)
189    }
190}
191impl TryFrom<i64> for Version {
192    type Error = VersionParseError;
193    fn try_from(value: i64) -> Result<Self, Self::Error> {
194        VersionType::try_from(value)
195            .map(Version::new)
196            .map_err(|_| VersionParseError)
197    }
198}
199impl From<Version> for usize {
200    fn from(value: Version) -> Self {
201        usize::try_from(value.0).expect("16 bit systems are unsupported")
202    }
203}
204impl From<&Version> for usize {
205    fn from(value: &Version) -> Self {
206        usize::try_from(value.0).expect("16 bit systems are unsupported")
207    }
208}
209
/// Returned by `Version::try_from(i64)` when the value does not fit into `VersionType`.
#[derive(Debug, thiserror::Error)]
#[error("version must be u32")]
pub struct VersionParseError;
213
/// A single entry of an execution log. `Display` prints only the inner `event`.
#[derive(
    Clone, Debug, derive_more::Display, PartialEq, Eq, serde::Serialize, serde::Deserialize,
)]
#[display("{event}")]
pub struct ExecutionEvent {
    pub created_at: DateTime<Utc>,
    pub event: ExecutionRequest,
    /// Left out of the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub backtrace_id: Option<Version>,
    /// Version of this event.
    pub version: Version,
}
225
/// Cursor value attached to a response, see [`ResponseWithCursor`].
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    derive_more::Display,
    derive_more::Into,
    Serialize, /* webapi */
)]
pub struct ResponseCursor(pub u32);
237
/// A join set response event paired with its cursor.
#[derive(Debug, Clone, PartialEq, Eq, Serialize /* webapi */)]
pub struct ResponseWithCursor {
    pub event: JoinSetResponseEventOuter,
    pub cursor: ResponseCursor,
}
243
/// A list of execution events together with a maximum version.
#[derive(Debug)]
pub struct ListExecutionEventsResponse {
    pub events: Vec<ExecutionEvent>,
    // NOTE(review): presumably the highest version present in the log, not just in `events` - confirm.
    pub max_version: Version,
}
249
/// A list of responses together with a maximum cursor.
#[derive(Debug)]
pub struct ListResponsesResponse {
    pub responses: Vec<ResponseWithCursor>,
    // NOTE(review): presumably the highest cursor stored for the execution - confirm.
    pub max_cursor: ResponseCursor,
}
255
/// A [`JoinSetResponseEvent`] together with its creation timestamp.
#[derive(Debug, Clone, PartialEq, Eq, Serialize /* webapi */)]
pub struct JoinSetResponseEventOuter {
    pub created_at: DateTime<Utc>,
    pub event: JoinSetResponseEvent,
}
261
/// A response and the join set it belongs to.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct JoinSetResponseEvent {
    pub join_set_id: JoinSetId,
    pub event: JoinSetResponse,
}
267
/// Payload of a join set response. Serialized with an internal `type` tag in snake_case.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, derive_more::Display)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum JoinSetResponse {
    /// Response to a delay. `Display` prints "finished" for `Ok` and "cancelled" for `Err`.
    #[display("delay {}: {delay_id}", if result.is_ok() { "finished" } else { "cancelled"})]
    DelayFinished {
        delay_id: DelayId,
        result: Result<(), ()>,
    },
    /// Response carrying the result of a child execution.
    #[display("{result}: {child_execution_id}")] // execution completed..
    ChildExecutionFinished {
        child_execution_id: ExecutionIdDerived,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Version(2)))]
        finished_version: Version,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
        result: SupportedFunctionReturnValue,
    },
}
286
/// Placeholder `ExecutionRequest::Created` built from empty / zero / dummy values.
/// NOTE(review): presumably meant for callers that only need the variant, e.g. to call
/// `ExecutionRequest::variant` - confirm.
pub const DUMMY_CREATED: ExecutionRequest = ExecutionRequest::Created {
    ffqn: FunctionFqn::new_static("", ""),
    params: Params::empty(),
    parent: None,
    scheduled_at: DateTime::from_timestamp_nanos(0),
    component_id: ComponentId::dummy_activity(),
    deployment_id: DeploymentId::from_parts(0, 0),
    metadata: ExecutionMetadata::empty(),
    scheduled_by: None,
};
/// Placeholder `ExecutionRequest::HistoryEvent` wrapping a `JoinSetCreate` with an empty name.
/// NOTE(review): presumably meant for callers that only need the variant, e.g. to call
/// `ExecutionRequest::variant` - confirm.
pub const DUMMY_HISTORY_EVENT: ExecutionRequest = ExecutionRequest::HistoryEvent {
    event: HistoryEvent::JoinSetCreate {
        join_set_id: JoinSetId {
            kind: crate::JoinSetKind::OneOff,
            name: StrVariant::empty(),
        },
    },
};
305
/// Event payload stored in [`ExecutionEvent::event`] and carried by [`AppendRequest`].
/// Serialized with snake_case variant names; `ExecutionRequest::variant` returns the
/// matching strings.
#[derive(
    Clone, derive_more::Debug, derive_more::Display, PartialEq, Eq, Serialize, Deserialize,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(rename_all = "snake_case")]
pub enum ExecutionRequest {
    /// Creation of the execution. `ExecutionLog::ffqn`, `params` and `parent` expect this
    /// to be the first event of a log.
    #[display("Created({ffqn}, `{scheduled_at}`)")]
    Created {
        ffqn: FunctionFqn,
        // Excluded from the `Debug` output.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
        #[debug(skip)]
        params: Params,
        parent: Option<(ExecutionId, JoinSetId)>,
        scheduled_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
        component_id: ComponentId,
        deployment_id: DeploymentId,
        #[cfg_attr(any(test, feature = "test"), arbitrary(default))]
        metadata: ExecutionMetadata,
        scheduled_by: Option<ExecutionId>,
    },
    /// Lock details, see [`Locked`].
    Locked(Locked),
    /// Returns execution to [`PendingState::PendingAt`] state at the specified time.
    /// This can happen when:
    /// - executor is running out of resources like [`WorkerError::LimitReached`]
    /// - executor is shutting down
    #[display("Unlocked(`{backoff_expires_at}`)")]
    Unlocked {
        backoff_expires_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
        reason: StrVariant,
    },
    // Created by the executor holding the lock.
    // After expiry interpreted as pending.
    #[display("TemporarilyFailed(`{backoff_expires_at}`)")]
    TemporarilyFailed {
        backoff_expires_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
        reason: StrVariant,
        detail: Option<String>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },
    // Created by the executor holding the lock.
    // After expiry interpreted as pending.
    #[display("TemporarilyTimedOut(`{backoff_expires_at}`)")]
    TemporarilyTimedOut {
        backoff_expires_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },
    // Created by the executor holding the lock.
    #[display("Finished")]
    Finished {
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
        result: SupportedFunctionReturnValue,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },

    /// Wrapper around a [`HistoryEvent`].
    #[display("HistoryEvent({event})")]
    HistoryEvent {
        event: HistoryEvent,
    },
    #[display("Paused")]
    Paused,
    #[display("Unpaused")]
    Unpaused,
}
375
376impl ExecutionRequest {
377    #[must_use]
378    pub fn is_temporary_event(&self) -> bool {
379        matches!(
380            self,
381            Self::TemporarilyFailed { .. } | Self::TemporarilyTimedOut { .. }
382        )
383    }
384
385    /// String representation of `ExecutionRequest`, used in execution log table to fetch events of certain type, e.g. `created` + `history_event`.
386    #[must_use]
387    pub const fn variant(&self) -> &'static str {
388        match self {
389            ExecutionRequest::Created { .. } => "created",
390            ExecutionRequest::Locked(_) => "locked",
391            ExecutionRequest::Unlocked { .. } => "unlocked",
392            ExecutionRequest::TemporarilyFailed { .. } => "temporarily_failed",
393            ExecutionRequest::TemporarilyTimedOut { .. } => "temporarily_timed_out",
394            ExecutionRequest::Finished { .. } => "finished",
395            ExecutionRequest::HistoryEvent { .. } => "history_event",
396            ExecutionRequest::Paused => "paused",
397            ExecutionRequest::Unpaused => "unpaused",
398        }
399    }
400
401    #[must_use]
402    pub fn join_set_id(&self) -> Option<&JoinSetId> {
403        match self {
404            Self::Created {
405                parent: Some((_parent_id, join_set_id)),
406                ..
407            } => Some(join_set_id),
408            Self::HistoryEvent {
409                event:
410                    HistoryEvent::JoinSetCreate { join_set_id, .. }
411                    | HistoryEvent::JoinSetRequest { join_set_id, .. }
412                    | HistoryEvent::JoinNext { join_set_id, .. },
413            } => Some(join_set_id),
414            _ => None,
415        }
416    }
417}
418
/// Payload of `ExecutionRequest::Locked`. `Display` prints the lock expiry and the component id.
#[derive(
    Clone, derive_more::Debug, derive_more::Display, PartialEq, Eq, Serialize, Deserialize,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[display("Locked(`{lock_expires_at}`, {component_id})")]
pub struct Locked {
    #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
    pub component_id: ComponentId,
    pub executor_id: ExecutorId,
    pub deployment_id: DeploymentId,
    pub run_id: RunId,
    /// Time at which the lock expires.
    pub lock_expires_at: DateTime<Utc>,
    #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentRetryConfig::ZERO))]
    pub retry_config: ComponentRetryConfig,
}
434
/// Kind of value recorded by `HistoryEvent::Persist`, together with the requested bounds.
/// Serialized with an internal `type` tag in snake_case.
#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum PersistKind {
    /// Random `u64`; per the field name the upper bound is inclusive.
    #[display("RandomU64({min}, {max_inclusive})")]
    RandomU64 { min: u64, max_inclusive: u64 },
    /// Random string; per the field name the upper length bound is exclusive.
    #[display("RandomString({min_length}, {max_length_exclusive})")]
    RandomString {
        min_length: u64,
        max_length_exclusive: u64,
    },
}
447
/// Encode `value` as eight bytes, most significant byte first (big-endian).
#[must_use]
pub fn from_u64_to_bytes(value: u64) -> [u8; 8] {
    u64::to_be_bytes(value)
}
452
/// Decode eight big-endian bytes (most significant byte first) into a `u64`.
#[must_use]
pub fn from_bytes_to_u64(bytes: [u8; 8]) -> u64 {
    // Shift the accumulator one byte to the left and append the next byte.
    bytes
        .iter()
        .fold(0_u64, |acc, &byte| (acc << 8) | u64::from(byte))
}
457
#[derive(
    derive_more::Debug, Clone, PartialEq, Eq, derive_more::Display, Serialize, Deserialize,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type", rename_all = "snake_case")]
/// Must be created by the executor in [`PendingState::Locked`].
///
/// Carried inside `ExecutionRequest::HistoryEvent`. Serialized with an internal `type` tag
/// in snake_case.
pub enum HistoryEvent {
    /// Persist a generated pseudorandom value.
    #[display("Persist")]
    Persist {
        // Excluded from the `Debug` output.
        #[debug(skip)]
        value: Vec<u8>, // Only stored for nondeterminism checks. TODO: Consider using a hashed value or just the intention.
        kind: PersistKind,
    },
    /// Records the creation of a join set.
    #[display("JoinSetCreate({join_set_id})")]
    JoinSetCreate { join_set_id: JoinSetId },
    /// Records a request submitted to a join set, see [`JoinSetRequest`].
    #[display("JoinSetRequest({request})")]
    // join_set_id is part of ExecutionId or DelayId in the `request`
    JoinSetRequest {
        join_set_id: JoinSetId,
        request: JoinSetRequest,
    },
    /// Sets the pending state to [`PendingState::BlockedByJoinSet`].
    /// When the response arrives at `resp_time`:
    /// The execution is [`PendingState::PendingAt`]`(max(resp_time, lock_expires_at))`, so that the
    /// original executor can continue. After the expiry any executor can continue without
    /// marking the execution as timed out.
    #[display("JoinNext({join_set_id})")]
    JoinNext {
        join_set_id: JoinSetId,
        /// Set to a future time if the worker is keeping the execution invocation warm waiting for the result.
        /// The pending status will be kept in Locked state until `run_expires_at`.
        run_expires_at: DateTime<Utc>,
        /// Set to a specific function when calling `-await-next` extension function, used for
        /// determinism checks.
        requested_ffqn: Option<FunctionFqn>,
        /// Closing request must never set `requested_ffqn` and is ignored by determinism checks.
        closing: bool,
    },
    /// Attempt to process next response without changing the pending state.
    #[display("JoinNextTry({join_set_id}, {outcome})")]
    JoinNextTry {
        join_set_id: JoinSetId,
        /// The outcome replaces the old `found_response: bool` field.
        /// Deserialization from old records uses [`JoinNextTryOutcome`]'s `Deserialize`
        /// impl which accepts both the new string values and legacy booleans.
        #[serde(alias = "found_response")]
        outcome: JoinNextTryOutcome,
    },
    /// Records the fact that a join set was awaited more times than its submission count.
    #[display("JoinNextTooMany({join_set_id})")]
    JoinNextTooMany {
        join_set_id: JoinSetId,
        /// Set to a specific function when calling `-await-next` extension function, used for
        /// determinism checks.
        requested_ffqn: Option<FunctionFqn>,
    },
    /// Records the intention to schedule another execution.
    #[display("Schedule({execution_id}, {schedule_at})")]
    Schedule {
        execution_id: ExecutionId,
        schedule_at: HistoryEventScheduleAt, // Stores intention to schedule an execution at a date/offset
    },
    /// Records a stub request carrying a proposed `result` for `target_execution_id`.
    #[display("Stub({target_execution_id})")]
    Stub {
        target_execution_id: ExecutionIdDerived,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
        result: SupportedFunctionReturnValue, // Only stored for nondeterminism checks. TODO: Consider using a hashed value.
        persist_result: Result<(), ()>, // Does the row (target_execution_id,Version:1) match the proposed `result`?
    },
}
528
/// Outcome recorded by `HistoryEvent::JoinNextTry`.
///
/// Serialized as a snake_case string. `Deserialize` is implemented by hand (not derived)
/// so that legacy boolean values are accepted as well.
#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(rename_all = "snake_case")]
pub enum JoinNextTryOutcome {
    /// A response was found and processed.
    #[display("found")]
    Found,
    /// No response available, but there are still pending requests.
    #[display("pending")]
    Pending,
    /// No response available, and all requests have been processed.
    #[display("all_processed")]
    AllProcessed,
}
543
544impl<'de> serde::Deserialize<'de> for JoinNextTryOutcome {
545    fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
546        use serde::Deserialize;
547        // Accept both the new string values and the legacy bool from `found_response`.
548        #[derive(Deserialize)]
549        #[serde(untagged)]
550        enum BoolOrString {
551            Bool(bool),
552            String(String),
553        }
554        match BoolOrString::deserialize(deserializer)? {
555            BoolOrString::Bool(b) => Ok(JoinNextTryOutcome::from(b)),
556            BoolOrString::String(s) => match s.as_str() {
557                "found" => Ok(JoinNextTryOutcome::Found),
558                "pending" => Ok(JoinNextTryOutcome::Pending),
559                "all_processed" => Ok(JoinNextTryOutcome::AllProcessed),
560                other => Err(serde::de::Error::unknown_variant(
561                    other,
562                    &["found", "pending", "all_processed"],
563                )),
564            },
565        }
566    }
567}
568
569impl From<bool> for JoinNextTryOutcome {
570    /// Migration helper: converts old `found_response: bool` to the new enum.
571    /// `false` maps to `Pending` as a conservative default (the exact error
572    /// was not stored before).
573    fn from(found_response: bool) -> Self {
574        if found_response {
575            JoinNextTryOutcome::Found
576        } else {
577            JoinNextTryOutcome::Pending
578        }
579    }
580}
581
/// When to schedule: immediately, at a fixed time, or after an offset.
/// `as_date_time` resolves a value against a supplied `now`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(rename_all = "snake_case")]
pub enum HistoryEventScheduleAt {
    /// Resolves to the supplied `now`.
    Now,
    /// Resolves to the contained timestamp.
    #[display("At(`{_0}`)")]
    At(DateTime<Utc>),
    /// Resolves to `now` plus the contained duration.
    #[display("In({_0:?})")]
    In(Duration),
}
592
/// Error returned by `HistoryEventScheduleAt::as_date_time`.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum ScheduleAtConversionError {
    /// The duration cannot be converted to a `TimeDelta`, or adding it to `now` fails.
    #[error("source duration value is out of range")]
    OutOfRangeError,
}
598
599impl HistoryEventScheduleAt {
600    pub fn as_date_time(
601        &self,
602        now: DateTime<Utc>,
603    ) -> Result<DateTime<Utc>, ScheduleAtConversionError> {
604        match self {
605            Self::Now => Ok(now),
606            Self::At(date_time) => Ok(*date_time),
607            Self::In(duration) => {
608                let time_delta = TimeDelta::from_std(*duration)
609                    .map_err(|_| ScheduleAtConversionError::OutOfRangeError)?;
610                now.checked_add_signed(time_delta)
611                    .ok_or(ScheduleAtConversionError::OutOfRangeError)
612            }
613        }
614    }
615}
616
/// Request recorded by `HistoryEvent::JoinSetRequest`: either a delay or a child execution.
/// Serialized with an internal `type` tag in snake_case.
#[derive(Clone, Debug, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum JoinSetRequest {
    // Must be created by the executor in `PendingState::Locked`.
    #[display("DelayRequest({delay_id}, expires_at: `{expires_at}`, schedule_at: `{schedule_at}`)")]
    DelayRequest {
        delay_id: DelayId,
        // NOTE(review): presumably `schedule_at` resolved to an absolute time - confirm.
        expires_at: DateTime<Utc>,
        schedule_at: HistoryEventScheduleAt,
    },
    // Must be created by the executor in `PendingState::Locked`.
    #[display("ChildExecutionRequest({child_execution_id}, {target_ffqn}, params: {params})")]
    ChildExecutionRequest {
        child_execution_id: ExecutionIdDerived,
        target_ffqn: FunctionFqn,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
        params: Params,
    },
}
637
/// Error that is not specific to an execution.
#[derive(Debug, Clone, thiserror::Error, derive_more::PartialEq, derive_more::Eq)]
pub enum DbErrorGeneric {
    /// Database error described only by `reason`.
    /// `context` and `source` are marked to be skipped by the derived equality.
    #[error("database error: {reason}")]
    Uncategorized {
        reason: StrVariant,
        #[eq(skip)]
        #[partial_eq(skip)]
        context: SpanTrace,
        #[eq(skip)]
        #[partial_eq(skip)]
        #[source]
        source: Option<Arc<dyn std::error::Error + Send + Sync>>,
        // NOTE(review): presumably the code location where the error was created - confirm.
        loc: &'static Location<'static>,
    },
    #[error("database was closed")]
    Close,
}
656
/// Write errors grouped under `DbErrorWrite::NonRetriable`.
#[derive(thiserror::Error, Clone, Debug, derive_more::PartialEq, derive_more::Eq)]
pub enum DbErrorWriteNonRetriable {
    #[error("validation failed: {0}")]
    ValidationFailed(StrVariant),
    #[error("conflict")]
    Conflict,
    /// `context` and `source` are marked to be skipped by the derived equality.
    #[error("illegal state: {reason}")]
    IllegalState {
        reason: StrVariant,
        #[eq(skip)]
        #[partial_eq(skip)]
        context: SpanTrace,
        #[eq(skip)]
        #[partial_eq(skip)]
        #[source]
        source: Option<Arc<dyn std::error::Error + Send + Sync>>,
        // NOTE(review): presumably the code location where the error was created - confirm.
        loc: &'static Location<'static>,
    },
    /// The requested version differs from the expected one.
    #[error("version conflict: expected: {expected}, got: {requested}")]
    VersionConflict {
        expected: Version,
        requested: Version,
    },
}
681
/// Write error tied to an execution
#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
pub enum DbErrorWrite {
    #[error("cannot write - row not found")]
    NotFound,
    /// Wraps [`DbErrorWriteNonRetriable`]; convertible with `?` thanks to `#[from]`.
    #[error("non-retriable error: {0}")]
    NonRetriable(#[from] DbErrorWriteNonRetriable),
    /// Wraps [`DbErrorGeneric`] and displays it unchanged.
    #[error(transparent)]
    Generic(#[from] DbErrorGeneric),
}
692
/// Read error tied to an execution
#[derive(Debug, Clone, thiserror::Error, PartialEq)]
pub enum DbErrorRead {
    #[error("cannot read - row not found")]
    NotFound,
    /// Wraps [`DbErrorGeneric`] and displays it unchanged.
    #[error(transparent)]
    Generic(#[from] DbErrorGeneric),
}
701
/// Read error extended with a timeout case.
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum DbErrorReadWithTimeout {
    /// Timeout carrying a `TimeoutOutcome`.
    #[error("timeout")]
    Timeout(TimeoutOutcome),
    /// Wraps [`DbErrorRead`] and displays it unchanged.
    #[error(transparent)]
    DbErrorRead(#[from] DbErrorRead),
}
709
// Represents the next version after events were successfully appended to the execution log.
// TODO: Convert to struct with next_version
pub type AppendResponse = Version;
// NOTE(review): the meaning of the trailing `Option<DateTime<Utc>>` is not visible here - confirm.
pub type PendingExecution = (ExecutionId, Version, Params, Option<DateTime<Utc>>);
714
/// Data returned for an execution that has been locked, see [`LockPendingResponse`].
#[derive(Debug, Clone)]
pub struct LockedExecution {
    pub execution_id: ExecutionId,
    pub next_version: Version,
    pub metadata: ExecutionMetadata,
    pub locked_event: Locked,
    pub ffqn: FunctionFqn,
    pub params: Params,
    /// History events paired with versions, the same shape `ExecutionLog::event_history` yields.
    pub event_history: Vec<(HistoryEvent, Version)>,
    pub responses: Vec<ResponseWithCursor>,
    pub parent: Option<(ExecutionId, JoinSetId)>,
    // NOTE(review): presumably the number of temporary failure/timeout events so far - confirm.
    pub intermittent_event_count: u32,
}
728
/// Return type of the `lock_pending_by_*` methods of `DbExecutor`.
pub type LockPendingResponse = Vec<LockedExecution>;
/// Return type of `DbExecutor::append_batch_respond_to_parent`.
pub type AppendBatchResponse = Version;
731
/// An event to be appended, with its creation timestamp. `Display` prints only `event`.
#[derive(Debug, Clone, derive_more::Display, Serialize, Deserialize)]
#[display("{event}")]
pub struct AppendRequest {
    pub created_at: DateTime<Utc>,
    pub event: ExecutionRequest,
}
738
/// Data for creating an execution.
///
/// Converts into `ExecutionRequest::Created`; `created_at` and `execution_id` are not part
/// of that variant and are dropped by the conversion.
#[derive(Debug, Clone)]
pub struct CreateRequest {
    pub created_at: DateTime<Utc>,
    pub execution_id: ExecutionId,
    pub ffqn: FunctionFqn,
    pub params: Params,
    pub parent: Option<(ExecutionId, JoinSetId)>,
    pub scheduled_at: DateTime<Utc>,
    pub component_id: ComponentId,
    pub deployment_id: DeploymentId,
    pub metadata: ExecutionMetadata,
    pub scheduled_by: Option<ExecutionId>,
}
752
753impl From<CreateRequest> for ExecutionRequest {
754    fn from(value: CreateRequest) -> Self {
755        Self::Created {
756            ffqn: value.ffqn,
757            params: value.params,
758            parent: value.parent,
759            scheduled_at: value.scheduled_at,
760            component_id: value.component_id,
761            deployment_id: value.deployment_id,
762            metadata: value.metadata,
763            scheduled_by: value.scheduled_by,
764        }
765    }
766}
767
/// Source of boxed database handles, one method per handle trait.
#[async_trait]
pub trait DbPool: Send + Sync {
    /// Obtain a handle implementing `DbExecutor`.
    async fn db_exec_conn(&self) -> Result<Box<dyn DbExecutor>, DbErrorGeneric>;

    /// Obtain a handle implementing `DbConnection`.
    async fn connection(&self) -> Result<Box<dyn DbConnection>, DbErrorGeneric>;

    /// Obtain a handle implementing `DbExternalApi`.
    async fn external_api_conn(&self) -> Result<Box<dyn DbExternalApi>, DbErrorGeneric>;

    /// Obtain a handle implementing `DbConnectionTest`; only with the `test` feature.
    #[cfg(feature = "test")]
    async fn connection_test(&self) -> Result<Box<dyn DbConnectionTest>, DbErrorGeneric>;
}
779
/// A pool that can be closed.
#[async_trait]
pub trait DbPoolCloseable {
    /// Close the pool. Takes `&self` and returns nothing.
    async fn close(&self);
}
784
/// Batch of events to append to one execution, passed to
/// `DbExecutor::append_batch_respond_to_parent`.
#[derive(Clone, Debug)]
pub struct AppendEventsToExecution {
    pub execution_id: ExecutionId,
    // NOTE(review): presumably the version expected for the first appended event - confirm.
    pub version: Version,
    pub batch: Vec<AppendRequest>,
}
791
/// Response to append to a parent execution, passed to
/// `DbExecutor::append_batch_respond_to_parent`.
/// The child-related fields match those of `JoinSetResponse::ChildExecutionFinished`.
#[derive(Clone, Debug)]
pub struct AppendResponseToExecution {
    pub parent_execution_id: ExecutionId,
    pub created_at: DateTime<Utc>,
    pub join_set_id: JoinSetId,
    pub child_execution_id: ExecutionIdDerived,
    pub finished_version: Version,
    pub result: SupportedFunctionReturnValue,
}
801
802#[async_trait]
803pub trait DbExecutor: Send + Sync {
804    #[expect(clippy::too_many_arguments)]
805    async fn lock_pending_by_ffqns(
806        &self,
807        batch_size: u32,
808        pending_at_or_sooner: DateTime<Utc>,
809        ffqns: Arc<[FunctionFqn]>,
810        created_at: DateTime<Utc>,
811        component_id: ComponentId,
812        deployment_id: DeploymentId,
813        executor_id: ExecutorId,
814        lock_expires_at: DateTime<Utc>,
815        run_id: RunId,
816        retry_config: ComponentRetryConfig,
817    ) -> Result<LockPendingResponse, DbErrorWrite>;
818
819    #[expect(clippy::too_many_arguments)]
820    async fn lock_pending_by_component_digest(
821        &self,
822        batch_size: u32,
823        pending_at_or_sooner: DateTime<Utc>,
824        component_id: &ComponentId,
825        deployment_id: DeploymentId,
826        created_at: DateTime<Utc>,
827        executor_id: ExecutorId,
828        lock_expires_at: DateTime<Utc>,
829        run_id: RunId,
830        retry_config: ComponentRetryConfig,
831    ) -> Result<LockPendingResponse, DbErrorWrite>;
832
833    #[cfg(feature = "test")]
834    #[expect(clippy::too_many_arguments)]
835    async fn lock_one(
836        &self,
837        created_at: DateTime<Utc>,
838        component_id: ComponentId,
839        deployment_id: DeploymentId,
840        execution_id: &ExecutionId,
841        run_id: RunId,
842        version: Version,
843        executor_id: ExecutorId,
844        lock_expires_at: DateTime<Utc>,
845        retry_config: ComponentRetryConfig,
846    ) -> Result<LockedExecution, DbErrorWrite>;
847
848    /// Append a single event to an existing execution log.
    /// The request cannot contain `ExecutionRequest::Created`.
850    async fn append(
851        &self,
852        execution_id: ExecutionId,
853        version: Version,
854        req: AppendRequest,
855    ) -> Result<AppendResponse, DbErrorWrite>;
856
857    /// Append a batch of events to an existing execution log, and append a response to a parent execution.
    /// The batch cannot contain `ExecutionRequest::Created`.
859    async fn append_batch_respond_to_parent(
860        &self,
861        events: AppendEventsToExecution,
862        response: AppendResponseToExecution,
863        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
864    ) -> Result<AppendBatchResponse, DbErrorWrite>;
865
866    /// Notification mechanism with no strict guarantees for waiting while there are no pending executions.
867    /// Return immediately if there are pending notifications at `pending_at_or_sooner`.
868    /// Otherwise wait until `timeout_fut` resolves.
869    /// Delay requests that expire between `pending_at_or_sooner` and timeout can be disregarded.
    ///
    /// The method returns nothing: callers only learn that it has returned, not why.
    /// NOTE(review): `ffqns` presumably lists the functions whose pending executions are
    /// of interest — confirm against the implementations.
870    async fn wait_for_pending_by_ffqn(
871        &self,
872        pending_at_or_sooner: DateTime<Utc>,
873        ffqns: Arc<[FunctionFqn]>,
874        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
875    );
876
877    /// Notification mechanism with no strict guarantees for waiting while there are no pending executions.
878    /// Return immediately if there are pending notifications at `pending_at_or_sooner`.
879    /// Otherwise wait until `timeout_fut` resolves.
880    /// Delay requests that expire between `pending_at_or_sooner` and timeout can be disregarded.
    ///
    /// The method returns nothing: callers only learn that it has returned, not why.
    /// NOTE(review): `component_digest` presumably selects the component whose pending
    /// executions are of interest — confirm against the implementations.
881    async fn wait_for_pending_by_component_digest(
882        &self,
883        pending_at_or_sooner: DateTime<Utc>,
884        component_digest: &InputContentDigest,
885        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
886    );
887
888    async fn cancel_activity_with_retries(
889        &self,
890        execution_id: &ExecutionId,
891        cancelled_at: DateTime<Utc>,
892    ) -> Result<CancelOutcome, DbErrorWrite> {
893        let mut retries = 5;
894        loop {
895            let res = self.cancel_activity(execution_id, cancelled_at).await;
896            if res.is_ok() || retries == 0 {
897                return res;
898            }
899            retries -= 1;
900        }
901    }
902
903    /// Get last event. Impls may set `ExecutionEvent::backtrace_id` to `None`.
    ///
    /// `cancel_activity` relies on this to decide whether the execution has already
    /// finished and to derive the version of the event it appends.
904    async fn get_last_execution_event(
905        &self,
906        execution_id: &ExecutionId,
907    ) -> Result<ExecutionEvent, DbErrorRead>;
908
909    async fn cancel_activity(
910        &self,
911        execution_id: &ExecutionId,
912        cancelled_at: DateTime<Utc>,
913    ) -> Result<CancelOutcome, DbErrorWrite> {
914        debug!("Determining cancellation state of {execution_id}");
915
916        let last_event = self
917            .get_last_execution_event(execution_id)
918            .await
919            .map_err(DbErrorWrite::from)?;
920        if let ExecutionRequest::Finished {
921            result:
922                SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError {
923                    kind: ExecutionFailureKind::Cancelled,
924                    ..
925                }),
926            ..
927        } = last_event.event
928        {
929            return Ok(CancelOutcome::Cancelled);
930        } else if matches!(last_event.event, ExecutionRequest::Finished { .. }) {
931            debug!("Not cancelling, {execution_id} is already finished");
932            return Ok(CancelOutcome::AlreadyFinished);
933        }
934        let finished_version = last_event.version.increment();
935        let child_result = SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError {
936            reason: None,
937            kind: ExecutionFailureKind::Cancelled,
938            detail: None,
939        });
940        let cancel_request = AppendRequest {
941            created_at: cancelled_at,
942            event: ExecutionRequest::Finished {
943                result: child_result.clone(),
944                http_client_traces: None,
945            },
946        };
947        debug!("Cancelling activity {execution_id} at {finished_version}");
948        if let ExecutionId::Derived(execution_id) = execution_id {
949            let (parent_execution_id, join_set_id) = execution_id.split_to_parts();
950            let child_execution_id = ExecutionId::Derived(execution_id.clone());
951            self.append_batch_respond_to_parent(
952                AppendEventsToExecution {
953                    execution_id: child_execution_id,
954                    version: finished_version.clone(),
955                    batch: vec![cancel_request],
956                },
957                AppendResponseToExecution {
958                    parent_execution_id,
959                    created_at: cancelled_at,
960                    join_set_id: join_set_id.clone(),
961                    child_execution_id: execution_id.clone(),
962                    finished_version,
963                    result: child_result,
964                },
965                cancelled_at,
966            )
967            .await?;
968        } else {
969            self.append(execution_id.clone(), finished_version, cancel_request)
970                .await?;
971        }
972        debug!("Cancelled {execution_id}");
973        Ok(CancelOutcome::Cancelled)
974    }
975}
976
/// Outcome reported by `DbConnection::append_delay_response`.
///
/// `cancel_delay` maps `Success` and `AlreadyCancelled` to `CancelOutcome::Cancelled`,
/// and `AlreadyFinished` to `CancelOutcome::AlreadyFinished`.
///
/// Derives the same common traits as `CancelOutcome` so the value can be logged,
/// compared and copied.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum AppendDelayResponseOutcome {
    /// The delay response was appended.
    Success,
    /// NOTE(review): presumably the delay had already finished successfully, so nothing
    /// was appended — confirm against the implementations.
    AlreadyFinished,
    /// NOTE(review): presumably the delay had already been cancelled, so nothing was
    /// appended — confirm against the implementations.
    AlreadyCancelled,
}
982
/// Filter accepted by `DbExternalApi::list_executions`.
///
/// The derived `Default` leaves every `Option` as `None` and every flag as `false`.
///
/// NOTE(review): the field notes below are inferred from the field names — confirm
/// against the implementations.
983#[derive(Debug, Clone, Default)]
984pub struct ListExecutionsFilter {
    /// Presumably restricts the listing to functions whose FFQN starts with this prefix.
985    pub ffqn_prefix: Option<String>,
    /// Presumably includes derived (child) executions in the listing when `true`.
986    pub show_derived: bool,
    /// Presumably omits finished executions when `true`.
987    pub hide_finished: bool,
    /// Presumably restricts the listing to execution ids starting with this prefix.
988    pub execution_id_prefix: Option<String>,
    /// Presumably restricts the listing to executions of this component digest.
989    pub component_digest: Option<InputContentDigest>,
    /// Presumably restricts the listing to executions of this deployment.
990    pub deployment_id: Option<DeploymentId>,
991}
992
/// Read and management operations layered on top of `DbConnection`: listings of
/// executions, events, responses, logs and deployment states, plus component upgrade
/// and pause / unpause.
993#[async_trait]
994pub trait DbExternalApi: DbConnection {
995    /// Get a backtrace of the execution, selected by `filter` (see `BacktraceFilter`).
996    async fn get_backtrace(
997        &self,
998        execution_id: &ExecutionId,
999        filter: BacktraceFilter,
1000    ) -> Result<BacktraceInfo, DbErrorRead>;
1001
1002    /// Returns executions sorted in descending order.
    ///
    /// `filter` narrows the listing, `pagination` selects the page.
1003    async fn list_executions(
1004        &self,
1005        filter: ListExecutionsFilter,
1006        pagination: ExecutionListPagination,
1007    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric>;
1008
1009    /// Returns execution events for the given execution.
1010    ///
1011    /// Results are always ordered from oldest to newest (ascending by version),
1012    /// regardless of pagination direction.
1013    async fn list_execution_events(
1014        &self,
1015        execution_id: &ExecutionId,
1016        pagination: Pagination<VersionType>,
1017        include_backtrace_id: bool,
1018    ) -> Result<ListExecutionEventsResponse, DbErrorRead>;
1019
1020    /// Returns responses of an execution ordered as they arrived,
1021    /// enabling matching each `JoinNext` to its corresponding response.
1022    ///
1023    /// Results are always ordered from oldest to newest (ascending by cursor),
1024    /// regardless of pagination direction.
1025    ///
1026    /// As an optimization, the implementation can return an empty list of `responses`
1027    /// and `max_cursor` set to 0 if the execution is not found.
1028    async fn list_responses(
1029        &self,
1030        execution_id: &ExecutionId,
1031        pagination: Pagination<u32>,
1032    ) -> Result<ListResponsesResponse, DbErrorRead>;
1033
    /// Combined query returning the state, events and responses of one execution as an
    /// `ExecutionWithStateRequestsResponses`.
    ///
    /// NOTE(review): judging by the parameter prefixes, `req_since`, `req_max_length` and
    /// `req_include_backtrace_id` presumably select the events while `resp_pagination`
    /// pages the responses — confirm against the implementations.
1034    async fn list_execution_events_responses(
1035        &self,
1036        execution_id: &ExecutionId,
1037        req_since: &Version,
1038        req_max_length: VersionType,
1039        req_include_backtrace_id: bool,
1040        resp_pagination: Pagination<VersionType>,
1041    ) -> Result<ExecutionWithStateRequestsResponses, DbErrorRead>;
1042
    /// NOTE(review): presumably switches the execution from the component digest `old`
    /// to `new`; whether a mismatch of `old` is rejected cannot be told from here —
    /// confirm against the implementations.
1043    async fn upgrade_execution_component(
1044        &self,
1045        execution_id: &ExecutionId,
1046        old: &InputContentDigest,
1047        new: &InputContentDigest,
1048    ) -> Result<(), DbErrorWrite>;
1049
    /// Returns one page of log entries of the execution, narrowed by `filter`.
1050    async fn list_logs(
1051        &self,
1052        execution_id: &ExecutionId,
1053        filter: LogFilter,
1054        pagination: Pagination<u32>,
1055    ) -> Result<ListLogsResponse, DbErrorRead>;
1056
    /// Returns one page of `DeploymentState` values.
    ///
    /// NOTE(review): `current_time` presumably separates the `pending` counter from the
    /// `scheduled` one (see the field comments on `DeploymentState`) — confirm.
1057    async fn list_deployment_states(
1058        &self,
1059        current_time: DateTime<Utc>,
1060        pagination: Pagination<Option<DeploymentId>>,
1061    ) -> Result<Vec<DeploymentState>, DbErrorRead>;
1062
1063    /// Pause an execution. Only pending executions can be paused.
1064    async fn pause_execution(
1065        &self,
1066        execution_id: &ExecutionId,
1067        paused_at: DateTime<Utc>,
1068    ) -> Result<AppendResponse, DbErrorWrite>;
1069
1070    /// Unpause an execution. Only paused executions can be unpaused.
1071    async fn unpause_execution(
1072        &self,
1073        execution_id: &ExecutionId,
1074        unpaused_at: DateTime<Utc>,
1075    ) -> Result<AppendResponse, DbErrorWrite>;
1076}
/// Default page length for `DbExternalApi::list_deployment_states`.
1077pub const LIST_DEPLOYMENT_STATES_DEFAULT_LENGTH: u16 = 20;
/// Default pagination for `DbExternalApi::list_deployment_states`: an `OlderThan` page of
/// `LIST_DEPLOYMENT_STATES_DEFAULT_LENGTH` items with no cursor set.
1078pub const LIST_DEPLOYMENT_STATES_DEFAULT_PAGINATION: Pagination<Option<DeploymentId>> =
1079    Pagination::OlderThan {
1080        length: LIST_DEPLOYMENT_STATES_DEFAULT_LENGTH,
1081        cursor: None,
1082        including_cursor: false,
1083    };
1084
1085pub struct DeploymentState {
1086    pub deployment_id: DeploymentId,
1087    pub locked: u32,
1088    // In `PendingAt` state, scheduled to present or past
1089    pub pending: u32,
1090    // In `PendingAt` state, scheduled into the future
1091    pub scheduled: u32,
1092    pub blocked: u32,
1093    pub finished: u32,
1094}
1095impl DeploymentState {
1096    #[must_use]
1097    pub fn new(deployment_id: DeploymentId) -> Self {
1098        DeploymentState {
1099            deployment_id,
1100            locked: 0,
1101            pending: 0,
1102            scheduled: 0,
1103            blocked: 0,
1104            finished: 0,
1105        }
1106    }
1107}
1108
/// One page of log entries, returned by `DbExternalApi::list_logs`, together with the
/// paginations leading to the neighbouring pages.
1109#[derive(Debug)]
1110pub struct ListLogsResponse {
    /// Entries of the current page.
1111    pub items: Vec<LogEntryRow>,
1112    pub next_page: Pagination<u32>, // Newer logs can always arrive e.g. via replay
1113    pub prev_page: Option<Pagination<u32>>, // None if we are already at the beginning
1114}
1115
1116#[derive(Debug)]
1117pub struct LogFilter {
1118    show_logs: bool,
1119    show_streams: bool,
1120    levels: Vec<LogLevel>, // Only applied if `show_logs` = true, empty means return all levels.
1121    stream_types: Vec<LogStreamType>, // Only applied if `show_streams` = true, empty means return all stream types.
1122}
1123impl LogFilter {
1124    // Constructor for logs only
1125    #[must_use]
1126    pub fn show_logs(levels: Vec<LogLevel>) -> LogFilter {
1127        LogFilter {
1128            show_logs: true,
1129            show_streams: false,
1130            levels,
1131            stream_types: Vec::new(),
1132        }
1133    }
1134    // Constructor for streams only
1135    #[must_use]
1136    pub fn show_streams(stream_types: Vec<LogStreamType>) -> LogFilter {
1137        LogFilter {
1138            show_logs: false,
1139            show_streams: true,
1140            levels: Vec::new(),
1141            stream_types,
1142        }
1143    }
1144    // Constructor for both logs and streams
1145    #[must_use]
1146    pub fn show_combined(levels: Vec<LogLevel>, stream_types: Vec<LogStreamType>) -> LogFilter {
1147        LogFilter {
1148            show_logs: true,
1149            show_streams: true,
1150            levels,
1151            stream_types,
1152        }
1153    }
1154    // Getters
1155    #[must_use]
1156    pub fn should_show_logs(&self) -> bool {
1157        self.show_logs
1158    }
1159    #[must_use]
1160    pub fn should_show_streams(&self) -> bool {
1161        self.show_streams
1162    }
1163    #[must_use]
1164    pub fn levels(&self) -> &Vec<LogLevel> {
1165        &self.levels
1166    }
1167    #[must_use]
1168    pub fn stream_types(&self) -> &Vec<LogStreamType> {
1169        &self.stream_types
1170    }
1171}
1172
/// Result of `DbExternalApi::list_execution_events_responses`: the execution with its
/// state plus a slice of its events and responses.
1173#[derive(Debug, Clone)]
1174pub struct ExecutionWithStateRequestsResponses {
1175    pub execution_with_state: ExecutionWithState,
1176    pub events: Vec<ExecutionEvent>,
1177    pub responses: Vec<ResponseWithCursor>,
    /// NOTE(review): presumably the highest event version present in the log,
    /// independent of the returned slice — confirm.
1178    pub max_version: Version,
    /// NOTE(review): presumably the highest response cursor present, independent of
    /// the returned slice — confirm.
1179    pub max_cursor: ResponseCursor,
1180}
1181
1182#[async_trait]
1183pub trait DbConnection: DbExecutor {
1184    /// Get execution log.
1185    async fn get(&self, execution_id: &ExecutionId) -> Result<ExecutionLog, DbErrorRead>;
1186
1187    async fn append_delay_response(
1188        &self,
1189        created_at: DateTime<Utc>,
1190        execution_id: ExecutionId,
1191        join_set_id: JoinSetId,
1192        delay_id: DelayId,
1193        outcome: Result<(), ()>, // Successfully finished - `Ok(())` or cancelled - `Err(())`
1194    ) -> Result<AppendDelayResponseOutcome, DbErrorWrite>;
1195
1196    /// Append a batch of events to an existing execution log, and append a response to a parent execution.
1197    /// The batch cannot contain `ExecutionEventInner::Created`.
1198    async fn append_batch(
1199        &self,
1200        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
1201        batch: Vec<AppendRequest>,
1202        execution_id: ExecutionId,
1203        version: Version,
1204    ) -> Result<AppendBatchResponse, DbErrorWrite>;
1205
1206    /// Append one or more events to the parent execution log, and create zero or more child execution logs.
1207    /// The batch cannot contain `ExecutionEventInner::Created`.
1208    async fn append_batch_create_new_execution(
1209        &self,
1210        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
1211        batch: Vec<AppendRequest>,   // must not contain `ExecutionEventInner::Created` events
1212        execution_id: ExecutionId,
1213        version: Version,
1214        child_req: Vec<CreateRequest>,
1215        backtraces: Vec<BacktraceInfo>,
1216    ) -> Result<AppendBatchResponse, DbErrorWrite>;
1217
1218    /// Get a single event specified by version. Impls may set `ExecutionEvent::backtrace_id` to `None`.
1219    async fn get_execution_event(
1220        &self,
1221        execution_id: &ExecutionId,
1222        version: &Version,
1223    ) -> Result<ExecutionEvent, DbErrorRead>;
1224
1225    #[instrument(skip(self))]
1226    async fn get_create_request(
1227        &self,
1228        execution_id: &ExecutionId,
1229    ) -> Result<CreateRequest, DbErrorRead> {
1230        let execution_event = self
1231            .get_execution_event(execution_id, &Version::new(0))
1232            .await?;
1233        if let ExecutionRequest::Created {
1234            ffqn,
1235            params,
1236            parent,
1237            scheduled_at,
1238            component_id,
1239            deployment_id,
1240            metadata,
1241            scheduled_by,
1242        } = execution_event.event
1243        {
1244            Ok(CreateRequest {
1245                created_at: execution_event.created_at,
1246                execution_id: execution_id.clone(),
1247                ffqn,
1248                params,
1249                parent,
1250                scheduled_at,
1251                component_id,
1252                deployment_id,
1253                metadata,
1254                scheduled_by,
1255            })
1256        } else {
1257            Err(DbErrorRead::Generic(DbErrorGeneric::Uncategorized {
1258                reason: "execution log must start with creation".into(),
1259                context: SpanTrace::capture(),
1260                source: None,
1261                loc: Location::caller(),
1262            }))
1263        }
1264    }
1265
1266    async fn get_pending_state(
1267        &self,
1268        execution_id: &ExecutionId,
1269    ) -> Result<ExecutionWithState, DbErrorRead>;
1270
1271    /// Get currently expired locks and async timers (delay requests)
1272    async fn get_expired_timers(
1273        &self,
1274        at: DateTime<Utc>,
1275    ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric>;
1276
1277    /// Create a new execution log
1278    async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite>;
1279
1280    /// Notification mechainism with no strict guarantees for getting notified when a new response arrives.
1281    /// Parameter `start_idx` must be at most be equal to current size of responses in the execution log.
1282    /// If no response arrives immediately and `interrupt_after` resolves, `DbErrorReadWithTimeout::Timeout` is returned.
1283    /// Implementations with no pubsub support should use polling.
1284    /// Callers are expected to call this function in a loop with a reasonable timeout
1285    /// to support less stellar implementations.
1286    async fn subscribe_to_next_responses(
1287        &self,
1288        execution_id: &ExecutionId,
1289        last_response: ResponseCursor,
1290        timeout_fut: Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>,
1291    ) -> Result<Vec<ResponseWithCursor>, DbErrorReadWithTimeout>;
1292
1293    /// First, attempt to fetch the finished value. If the execution is not finished yet, poll
1294    /// periodically or subscribe to db changes, racing with `timeout_fut`.
1295    /// Notification mechainism with no strict guarantees for getting the finished result.
1296    /// Implementations with no pubsub support should use polling.
1297    /// Callers are expected to call this function in a loop with a reasonable timeout
1298    /// to support less stellar implementations.
1299    async fn wait_for_finished_result(
1300        &self,
1301        execution_id: &ExecutionId,
1302        timeout_fut: Option<Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>>,
1303    ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout>;
1304
1305    async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite>;
1306
1307    async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite>;
1308
1309    async fn append_log(&self, row: LogInfoAppendRow) -> Result<(), DbErrorWrite>;
1310
1311    async fn append_log_batch(&self, batch: &[LogInfoAppendRow]) -> Result<(), DbErrorWrite>;
1312
1313    /// Returns `TimeoutOutcome::Timeout` if not in Finished state.
1314    #[cfg(feature = "test")]
1315    async fn get_finished_result(
1316        &self,
1317        execution_id: &ExecutionId,
1318    ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout> {
1319        self.wait_for_finished_result(
1320            execution_id,
1321            Some(Box::pin(std::future::ready(TimeoutOutcome::Timeout))),
1322        )
1323        .await
1324    }
1325}
1326
/// Input row of `DbConnection::append_log` and `DbConnection::append_log_batch`: a log
/// entry together with the execution and run it belongs to.
1327#[derive(Clone, Debug)]
1328pub struct LogInfoAppendRow {
1329    pub execution_id: ExecutionId,
1330    pub run_id: RunId,
1331    pub log_entry: LogEntry,
1332}
1333
/// Stored log entry as returned in `ListLogsResponse::items`.
1334#[derive(Debug, Clone)]
1335pub struct LogEntryRow {
    /// Position of the row; `list_logs` paginates over `Pagination<u32>` cursors.
1336    pub cursor: u32,
1337    pub run_id: RunId,
1338    pub log_entry: LogEntry,
1339}
1340
1341#[derive(Debug, Clone)]
1342pub enum LogEntry {
1343    Log {
1344        created_at: DateTime<Utc>,
1345        level: LogLevel,
1346        message: String,
1347    },
1348    Stream {
1349        created_at: DateTime<Utc>,
1350        payload: Vec<u8>,
1351        stream_type: LogStreamType,
1352    },
1353}
1354impl LogEntry {
1355    #[must_use]
1356    pub fn created_at(&self) -> DateTime<Utc> {
1357        match self {
1358            LogEntry::Log { created_at, .. } | LogEntry::Stream { created_at, .. } => *created_at,
1359        }
1360    }
1361}
1362
/// Severity of a `LogEntry::Log` message.
///
/// Stored as `u8`: the discriminants start at 1 (`Trace`) and increase by one per
/// variant, ending with `Error` = 5. `#[try_from(repr)]` asks `derive_more` for a
/// fallible conversion from the `u8` representation.
1363#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::TryFrom, strum::EnumIter)]
1364#[try_from(repr)]
1365#[repr(u8)]
1366pub enum LogLevel {
1367    Trace = 1,
1368    Debug,
1369    Info,
1370    Warn,
1371    Error,
1372}
/// Origin of a `LogEntry::Stream` payload.
///
/// Stored as `u8`: `StdOut` = 1, `StdErr` = 2. `#[try_from(repr)]` asks `derive_more`
/// for a fallible conversion from the `u8` representation.
1373#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::TryFrom, strum::EnumIter)]
1374#[try_from(repr)]
1375#[repr(u8)]
1376pub enum LogStreamType {
1377    StdOut = 1,
1378    StdErr,
1379}
1380
/// Value produced by the `timeout_fut` futures handed to
/// `DbConnection::subscribe_to_next_responses` and `DbConnection::wait_for_finished_result`;
/// it is carried inside `DbErrorReadWithTimeout::Timeout`.
1381#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1382pub enum TimeoutOutcome {
    /// The waiting period elapsed.
1383    Timeout,
    /// NOTE(review): presumably the wait was interrupted by a cancellation rather than
    /// by elapsed time — confirm against the callers.
1384    Cancel,
1385}
1386
/// Extension of `DbConnection` that is only compiled with the `test` feature.
1387#[cfg(feature = "test")]
1388#[async_trait]
1389pub trait DbConnectionTest: DbConnection {
    /// Appends `response_event` directly to the responses of `execution_id`.
    ///
    /// NOTE(review): presumably bypasses the regular paths that produce responses
    /// (child executions, delays) — confirm against the implementations.
1390    async fn append_response(
1391        &self,
1392        created_at: DateTime<Utc>,
1393        execution_id: ExecutionId,
1394        response_event: JoinSetResponseEvent,
1395    ) -> Result<(), DbErrorWrite>;
1396}
1397
/// Result of a cancellation request, returned by `cancel_activity` and `cancel_delay`.
1398#[derive(Clone, Copy, Debug, PartialEq, Eq)]
1399pub enum CancelOutcome {
    /// The target is cancelled now; this is also returned when it had been cancelled before.
1400    Cancelled,
    /// The target had already finished with a different result, nothing was changed.
1401    AlreadyFinished,
1402}
1403
1404#[instrument(skip(db_connection))]
1405pub async fn stub_execution(
1406    db_connection: &dyn DbConnection,
1407    execution_id: ExecutionIdDerived,
1408    parent_execution_id: ExecutionId,
1409    join_set_id: JoinSetId,
1410    created_at: DateTime<Utc>,
1411    return_value: SupportedFunctionReturnValue,
1412) -> Result<(), DbErrorWrite> {
1413    let stub_finished_version = Version::new(1); // Stub activities have no execution log except Created event.
1414    // Attempt to write to `execution_id` and its parent, ignoring the possible conflict error on this tx
1415    let write_attempt = {
1416        let finished_req = AppendRequest {
1417            created_at,
1418            event: ExecutionRequest::Finished {
1419                result: return_value.clone(),
1420                http_client_traces: None,
1421            },
1422        };
1423        db_connection
1424            .append_batch_respond_to_parent(
1425                AppendEventsToExecution {
1426                    execution_id: ExecutionId::Derived(execution_id.clone()),
1427                    version: stub_finished_version.clone(),
1428                    batch: vec![finished_req],
1429                },
1430                AppendResponseToExecution {
1431                    parent_execution_id,
1432                    created_at,
1433                    join_set_id,
1434                    child_execution_id: execution_id.clone(),
1435                    finished_version: stub_finished_version.clone(),
1436                    result: return_value.clone(),
1437                },
1438                created_at,
1439            )
1440            .await
1441    };
1442    if let Err(write_attempt) = write_attempt {
1443        // Check that the expected value is in the database
1444        debug!("Stub write attempt failed - {write_attempt:?}");
1445
1446        let found = db_connection
1447            .get_execution_event(&ExecutionId::Derived(execution_id), &stub_finished_version)
1448            .await?; // Not found at this point should not happen, unless the previous write failed. Will be retried.
1449        match found.event {
1450            ExecutionRequest::Finished {
1451                result: found_result,
1452                ..
1453            } if return_value == found_result => {
1454                // Same value has already be written, RPC is successful.
1455                Ok(())
1456            }
1457            ExecutionRequest::Finished { .. } => Err(DbErrorWrite::NonRetriable(
1458                DbErrorWriteNonRetriable::Conflict,
1459            )),
1460            _other => Err(DbErrorWrite::NonRetriable(
1461                DbErrorWriteNonRetriable::IllegalState {
1462                    reason: "unexpected execution event at stubbed execution".into(),
1463                    context: SpanTrace::capture(),
1464                    source: None,
1465                    loc: Location::caller(),
1466                },
1467            )),
1468        }
1469    } else {
1470        Ok(())
1471    }
1472}
1473
1474pub async fn cancel_delay(
1475    db_connection: &dyn DbConnection,
1476    delay_id: DelayId,
1477    created_at: DateTime<Utc>,
1478) -> Result<CancelOutcome, DbErrorWrite> {
1479    let (parent_execution_id, join_set_id) = delay_id.split_to_parts();
1480    db_connection
1481        .append_delay_response(
1482            created_at,
1483            parent_execution_id,
1484            join_set_id,
1485            delay_id,
1486            Err(()), // Mark as cancelled.
1487        )
1488        .await
1489        .map(|ok| match ok {
1490            AppendDelayResponseOutcome::Success | AppendDelayResponseOutcome::AlreadyCancelled => {
1491                CancelOutcome::Cancelled
1492            }
1493            AppendDelayResponseOutcome::AlreadyFinished => CancelOutcome::AlreadyFinished,
1494        })
1495}
1496
/// Selects which backtrace `DbExternalApi::get_backtrace` returns.
1497#[derive(Clone, Debug)]
1498pub enum BacktraceFilter {
    /// The first stored backtrace.
1499    First,
    /// The last stored backtrace.
1500    Last,
    /// The backtrace belonging to the given version.
    /// NOTE(review): presumably matched against the `version_min_including` ..
    /// `version_max_excluding` range of `BacktraceInfo` — confirm.
1501    Specific(Version),
1502}
1503
/// A WASM backtrace captured for an execution, valid for a range of event versions.
1504#[derive(Clone, Debug, PartialEq, Eq)]
1505pub struct BacktraceInfo {
1506    pub execution_id: ExecutionId,
1507    pub component_id: ComponentId,
    /// Lower bound of the version range, inclusive (per the field name).
1508    pub version_min_including: Version,
    /// Upper bound of the version range, exclusive (per the field name).
1509    pub version_max_excluding: Version,
1510    pub wasm_backtrace: WasmBacktrace,
1511}
1512
/// Serializable counterpart of `wasmtime::WasmBacktrace`; built by
/// `WasmBacktrace::maybe_from`, which never produces an empty `frames` list.
1513#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
1514pub struct WasmBacktrace {
    /// Frames in the order reported by `wasmtime::WasmBacktrace::frames`.
1515    pub frames: Vec<FrameInfo>,
1516}
1517
/// Serializable counterpart of `wasmtime::FrameInfo`.
1518#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
1519pub struct FrameInfo {
    /// Module name, or `<unknown>` when the module has no name.
1520    pub module: String,
    /// Demangled function name; derived from the function index when no name is available.
1521    pub func_name: String,
    /// Debug symbols attached to the frame.
1522    pub symbols: Vec<FrameSymbol>,
1523}
1524
/// Serializable counterpart of `wasmtime::FrameSymbol`; every piece of information is
/// optional because the source symbol may not provide it.
1525#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
1526pub struct FrameSymbol {
    /// Demangled name of the function the symbol belongs to.
1527    pub func_name: Option<String>,
    /// Source file of the symbol.
1528    pub file: Option<String>,
    /// Line within `file`.
1529    pub line: Option<u32>,
    /// Column within `line`.
1530    pub col: Option<u32>,
1531}
1532
1533mod wasm_backtrace {
1534    use super::{FrameInfo, FrameSymbol, WasmBacktrace};
1535
1536    impl WasmBacktrace {
1537        pub fn maybe_from(backtrace: &wasmtime::WasmBacktrace) -> Option<Self> {
1538            if backtrace.frames().is_empty() {
1539                None
1540            } else {
1541                Some(Self {
1542                    frames: backtrace.frames().iter().map(FrameInfo::from).collect(),
1543                })
1544            }
1545        }
1546    }
1547
1548    impl From<&wasmtime::FrameInfo> for FrameInfo {
1549        fn from(frame: &wasmtime::FrameInfo) -> Self {
1550            let module_name = frame.module().name().unwrap_or("<unknown>").to_string();
1551            let mut func_name = String::new();
1552            wasmtime_environ::demangle_function_name_or_index(
1553                &mut func_name,
1554                frame.func_name(),
1555                frame.func_index() as usize,
1556            )
1557            .expect("writing to string must succeed");
1558            Self {
1559                module: module_name,
1560                func_name,
1561                symbols: frame
1562                    .symbols()
1563                    .iter()
1564                    .map(std::convert::Into::into)
1565                    .collect(),
1566            }
1567        }
1568    }
1569
1570    impl From<&wasmtime::FrameSymbol> for FrameSymbol {
1571        fn from(symbol: &wasmtime::FrameSymbol) -> Self {
1572            let func_name = symbol.name().map(|name| {
1573                let mut writer = String::new();
1574                wasmtime_environ::demangle_function_name(&mut writer, name)
1575                    .expect("writing to string must succeed");
1576                writer
1577            });
1578
1579            Self {
1580                func_name,
1581                file: symbol.file().map(ToString::to_string),
1582                line: symbol.line(),
1583                col: symbol.column(),
1584            }
1585        }
1586    }
1587}
/// Summary of an execution together with its current pending state; returned by
/// `DbConnection::get_pending_state` and `DbExternalApi::list_executions`.
///
/// `Display` prints the execution id, the pending state and the component digest,
/// separated by spaces.
1588#[derive(Debug, Clone, derive_more::Display)]
1589#[display("{execution_id} {pending_state} {component_digest}")]
1590pub struct ExecutionWithState {
1591    pub execution_id: ExecutionId,
1592    pub ffqn: FunctionFqn,
1593    pub pending_state: PendingState,
1594    pub created_at: DateTime<Utc>,
1595    pub first_scheduled_at: DateTime<Utc>,
1596    pub component_digest: InputContentDigest,
1597    pub component_type: ComponentType,
1598    pub deployment_id: DeploymentId,
1599}
1600
1601#[derive(Debug, Clone)]
1602pub enum ExecutionListPagination {
1603    CreatedBy(Pagination<Option<DateTime<Utc>>>),
1604    ExecutionId(Pagination<Option<ExecutionId>>),
1605}
1606impl Default for ExecutionListPagination {
1607    fn default() -> ExecutionListPagination {
1608        ExecutionListPagination::CreatedBy(Pagination::OlderThan {
1609            length: 20,
1610            cursor: None,
1611            including_cursor: false, // does not matter when `cursor` is not specified
1612        })
1613    }
1614}
1615impl ExecutionListPagination {
1616    #[must_use]
1617    pub fn length(&self) -> u16 {
1618        match self {
1619            ExecutionListPagination::CreatedBy(pagination) => pagination.length(),
1620            ExecutionListPagination::ExecutionId(pagination) => pagination.length(),
1621        }
1622    }
1623}
1624
/// Cursor-based page request: `length` items that are newer or older than `cursor`,
/// optionally including the cursor itself.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Pagination<T> {
    /// Items after the cursor, i.e. ascending order.
    NewerThan {
        length: u16,
        cursor: T,
        including_cursor: bool,
    },
    /// Items before the cursor, i.e. descending order.
    OlderThan {
        length: u16,
        cursor: T,
        including_cursor: bool,
    },
}
impl<T: Clone> Pagination<T> {
    /// Requested number of items.
    pub fn length(&self) -> u16 {
        let (Self::NewerThan { length, .. } | Self::OlderThan { length, .. }) = self;
        *length
    }

    /// Comparison operator matching the direction and cursor inclusion:
    /// `>` / `>=` for `NewerThan`, `<` / `<=` for `OlderThan`.
    pub fn rel(&self) -> &'static str {
        let (Self::NewerThan {
            including_cursor, ..
        }
        | Self::OlderThan {
            including_cursor, ..
        }) = self;
        match (self.is_desc(), *including_cursor) {
            (false, false) => ">",
            (false, true) => ">=",
            (true, false) => "<",
            (true, true) => "<=",
        }
    }

    /// `true` for `OlderThan`.
    pub fn is_desc(&self) -> bool {
        match self {
            Self::OlderThan { .. } => true,
            Self::NewerThan { .. } => false,
        }
    }

    /// `"asc"` for `NewerThan`, `"desc"` for `OlderThan`.
    pub fn asc_or_desc(&self) -> &'static str {
        if self.is_desc() { "desc" } else { "asc" }
    }

    /// `true` for `NewerThan`.
    pub fn is_asc(&self) -> bool {
        !self.is_desc()
    }

    /// Borrowed cursor of the request.
    pub fn cursor(&self) -> &T {
        let (Self::NewerThan { cursor, .. } | Self::OlderThan { cursor, .. }) = self;
        cursor
    }

    /// Returns the request for the opposite side of the cursor: the direction is
    /// swapped and the cursor inclusion is negated, the length and cursor are kept.
    #[must_use]
    pub fn invert(&self) -> Self {
        let length = self.length();
        let cursor = self.cursor().clone();
        match self {
            Self::NewerThan {
                including_cursor, ..
            } => Self::OlderThan {
                length,
                cursor,
                including_cursor: !*including_cursor,
            },
            Self::OlderThan {
                including_cursor, ..
            } => Self::NewerThan {
                length,
                cursor,
                including_cursor: !*including_cursor,
            },
        }
    }
}
1708
/// Test helper: re-reads the execution log every 10 ms until `predicate` returns `Some`,
/// then yields that value.
///
/// With `timeout` set, the polling races against a sleep of that duration and
/// `DbErrorReadWithTimeout::Timeout(TimeoutOutcome::Timeout)` is returned when the sleep
/// wins. Without it the polling continues until the predicate matches or a read fails.
///
/// # Errors
/// A failed read of the execution log ends the polling and is returned.
#[cfg(feature = "test")]
pub async fn wait_for_pending_state_fn<T: Debug>(
    db_connection: &dyn DbConnectionTest,
    execution_id: &ExecutionId,
    predicate: impl Fn(ExecutionLog) -> Option<T> + Send,
    timeout: Option<Duration>,
) -> Result<T, DbErrorReadWithTimeout> {
    tracing::trace!(%execution_id, "Waiting for predicate");
    let poll_until_match = async move {
        loop {
            let current_log = db_connection.get(execution_id).await?;
            if let Some(matched) = predicate(current_log) {
                tracing::debug!(%execution_id, "Found: {matched:?}");
                return Ok(matched);
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    };

    match timeout {
        None => poll_until_match.await,
        Some(limit) => {
            tokio::select! { // future's liveness: Dropping the loser immediately.
                res = poll_until_match => res,
                () = tokio::time::sleep(limit) => Err(DbErrorReadWithTimeout::Timeout(TimeoutOutcome::Timeout))
            }
        }
    }
}
1737
/// A timer that has expired: either an execution lock or a delay.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExpiredTimer {
    /// An expired lock, described by [`ExpiredLock`].
    Lock(ExpiredLock),
    /// An expired delay, described by [`ExpiredDelay`].
    Delay(ExpiredDelay),
}
1743
/// Details of an execution whose lock has expired.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExpiredLock {
    /// The execution that was locked.
    pub execution_id: ExecutionId,
    /// Version of last `Locked` event, used to detect whether the execution made progress.
    pub locked_at_version: Version,
    /// Version expected for the next appended event.
    // NOTE(review): inferred from the field name and `ExecutionLog::next_version` — confirm.
    pub next_version: Version,
    /// As the execution may still be running, this represents the number of intermittent failures + timeouts prior to this execution.
    pub intermittent_event_count: u32,
    /// Retry limit; `None` presumably means unlimited retries — TODO confirm.
    pub max_retries: Option<u32>,
    /// Backoff duration used when scheduling a retry.
    // NOTE(review): the name suggests an exponential backoff base — confirm against the caller.
    pub retry_exp_backoff: Duration,
    /// Executor and run that held the expired lock.
    pub locked_by: LockedBy,
}
1756
/// Identifies a delay that has expired.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExpiredDelay {
    /// Execution the delay is associated with.
    pub execution_id: ExecutionId,
    /// Join set the delay is associated with.
    pub join_set_id: JoinSetId,
    /// Identifier of the delay itself.
    pub delay_id: DelayId,
}
1763
/// Current scheduling state of an execution.
///
/// Serialized as an internally tagged object: the variant name, in `snake_case`, is stored
/// under the `status` key.
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum PendingState {
    /// Caused by [`ExecutionRequest::Locked`].
    // No `#[display]` attribute here: the `Display` format of `PendingStateLocked` already
    // starts with `Locked(`.
    Locked(PendingStateLocked),

    /// Waiting to be locked; see [`PendingStatePendingAt`] for the scheduling details.
    // NOTE(review): `PendingStatePendingAt`'s own format already wraps `scheduled_at` in
    // backticks, so this renders nested backticks — confirm that is intended.
    #[display("PendingAt(`{_0}`)")]
    PendingAt(PendingStatePendingAt),

    /// Caused by [`HistoryEvent::JoinNext`]
    #[display("BlockedByJoinSet({_0})")]
    BlockedByJoinSet(PendingStateBlockedByJoinSet),

    /// The execution is paused; the payload holds the state it was in before pausing.
    #[display("Paused({_0})")]
    Paused(PendingStatePaused),

    /// The execution has finished; see [`PendingStateFinished`].
    #[display("Finished({_0})")]
    Finished(PendingStateFinished),
}
1783
1784pub enum PendingStateMergedPause {
1785    Locked {
1786        state: PendingStateLocked,
1787        paused: bool,
1788    },
1789    PendingAt {
1790        state: PendingStatePendingAt,
1791        paused: bool,
1792    },
1793    BlockedByJoinSet {
1794        state: PendingStateBlockedByJoinSet,
1795        paused: bool,
1796    },
1797    Finished(PendingStateFinished),
1798}
1799impl From<PendingState> for PendingStateMergedPause {
1800    fn from(state: PendingState) -> Self {
1801        match state {
1802            PendingState::Locked(s) => PendingStateMergedPause::Locked {
1803                state: s,
1804                paused: false,
1805            },
1806
1807            PendingState::PendingAt(s) => PendingStateMergedPause::PendingAt {
1808                state: s,
1809                paused: false,
1810            },
1811
1812            PendingState::BlockedByJoinSet(s) => PendingStateMergedPause::BlockedByJoinSet {
1813                state: s,
1814                paused: false,
1815            },
1816
1817            PendingState::Paused(paused) => match paused {
1818                PendingStatePaused::Locked(s) => PendingStateMergedPause::Locked {
1819                    state: s,
1820                    paused: true,
1821                },
1822                PendingStatePaused::PendingAt(s) => PendingStateMergedPause::PendingAt {
1823                    state: s,
1824                    paused: true,
1825                },
1826                PendingStatePaused::BlockedByJoinSet(s) => {
1827                    PendingStateMergedPause::BlockedByJoinSet {
1828                        state: s,
1829                        paused: true,
1830                    }
1831                }
1832            },
1833
1834            PendingState::Finished(s) => PendingStateMergedPause::Finished(s),
1835        }
1836    }
1837}
1838
/// Payload of [`PendingState::Locked`]: who holds the lock and until when.
///
/// Displayed as ``Locked(`<lock_expires_at>`, <executor_id>, <run_id>)``.
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize)]
#[display("Locked(`{lock_expires_at}`, {}, {})", locked_by.executor_id, locked_by.run_id)]
pub struct PendingStateLocked {
    /// Executor and run holding the lock.
    pub locked_by: LockedBy,
    /// Point in time at which the lock expires.
    pub lock_expires_at: DateTime<Utc>,
}
1845
/// Payload of [`PendingState::PendingAt`].
///
/// Displayed as `` `<scheduled_at>`, last_lock=<last_lock:?> ``.
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize)]
#[display("`{scheduled_at}`, last_lock={last_lock:?}")]
pub struct PendingStatePendingAt {
    /// Time from which the execution may be locked by any executor
    /// (see `PendingState::can_append_lock`).
    pub scheduled_at: DateTime<Utc>,
    /// `last_lock` is needed for lock extension.
    pub last_lock: Option<LockedBy>,
}
1853
/// Payload of [`PendingState::BlockedByJoinSet`].
///
/// Displayed as `` <join_set_id>, `<lock_expires_at>`, closing=<closing> ``.
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize)]
#[display("{join_set_id}, `{lock_expires_at}`, closing={closing}")]
pub struct PendingStateBlockedByJoinSet {
    /// Join set the execution is blocked on.
    pub join_set_id: JoinSetId,
    /// See [`HistoryEvent::JoinNext::lock_expires_at`].
    pub lock_expires_at: DateTime<Utc>,
    /// Blocked by closing of the join set
    pub closing: bool,
}
1863
/// State of execution before it was paused.
///
/// Mirrors the non-finished variants of [`PendingState`]; there is no `Finished` variant.
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize)]
pub enum PendingStatePaused {
    /// The execution was locked when it was paused.
    #[display("Locked({_0})")]
    Locked(PendingStateLocked),
    /// The execution was pending when it was paused.
    #[display("PendingAt({_0})")]
    PendingAt(PendingStatePendingAt),
    /// The execution was blocked by a join set when it was paused.
    #[display("BlockedByJoinSet({_0})")]
    BlockedByJoinSet(PendingStateBlockedByJoinSet),
}
1874
1875#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1876pub struct LockedBy {
1877    pub executor_id: ExecutorId,
1878    pub run_id: RunId,
1879}
1880impl From<&Locked> for LockedBy {
1881    fn from(value: &Locked) -> Self {
1882        LockedBy {
1883            executor_id: value.executor_id,
1884            run_id: value.run_id,
1885        }
1886    }
1887}
1888
1889#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
1890#[cfg_attr(any(test, feature = "test"), derive(Deserialize))]
1891pub struct PendingStateFinished {
1892    pub version: VersionType, // not Version since it must be Copy
1893    pub finished_at: DateTime<Utc>,
1894    pub result_kind: PendingStateFinishedResultKind,
1895}
1896impl Display for PendingStateFinished {
1897    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1898        match self.result_kind {
1899            PendingStateFinishedResultKind::Ok => write!(f, "ok"),
1900            PendingStateFinishedResultKind::Err(err) => write!(f, "{err}"),
1901        }
1902    }
1903}
1904
1905// This is not a Result so that it can be customized for serialization
1906#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
1907#[serde(rename_all = "snake_case")]
1908pub enum PendingStateFinishedResultKind {
1909    Ok,
1910    Err(PendingStateFinishedError),
1911}
1912impl PendingStateFinishedResultKind {
1913    pub fn as_result(&self) -> Result<(), &PendingStateFinishedError> {
1914        match self {
1915            PendingStateFinishedResultKind::Ok => Ok(()),
1916            PendingStateFinishedResultKind::Err(err) => Err(err),
1917        }
1918    }
1919}
1920
1921impl From<&SupportedFunctionReturnValue> for PendingStateFinishedResultKind {
1922    fn from(result: &SupportedFunctionReturnValue) -> Self {
1923        result.as_pending_state_finished_result()
1924    }
1925}
1926
/// Error side of [`PendingStateFinishedResultKind`].
///
/// Variant names are serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, derive_more::Display)]
#[serde(rename_all = "snake_case")]
pub enum PendingStateFinishedError {
    /// The execution was terminated; the payload gives the kind of failure.
    #[display("execution terminated: {_0}")]
    ExecutionFailure(ExecutionFailureKind),
    /// The execution ran to completion but its result was an error.
    // NOTE(review): presumably the function itself returned an error value — confirm.
    #[display("execution completed with an error")]
    Error,
}
1935
1936impl PendingState {
1937    #[instrument(skip(self))]
1938    pub fn can_append_lock(
1939        &self,
1940        created_at: DateTime<Utc>,
1941        executor_id: ExecutorId,
1942        run_id: RunId,
1943        lock_expires_at: DateTime<Utc>,
1944    ) -> Result<LockKind, DbErrorWriteNonRetriable> {
1945        if lock_expires_at <= created_at {
1946            return Err(DbErrorWriteNonRetriable::ValidationFailed(
1947                "invalid expiry date".into(),
1948            ));
1949        }
1950        match self {
1951            PendingState::PendingAt(PendingStatePendingAt {
1952                scheduled_at,
1953                last_lock,
1954            }) => {
1955                if *scheduled_at <= created_at {
1956                    // pending now, ok to lock
1957                    Ok(LockKind::CreatingNewLock)
1958                } else if let Some(LockedBy {
1959                    executor_id: last_executor_id,
1960                    run_id: last_run_id,
1961                }) = last_lock
1962                    && executor_id == *last_executor_id
1963                    && run_id == *last_run_id
1964                {
1965                    // Original executor is extending the lock.
1966                    Ok(LockKind::Extending)
1967                } else {
1968                    Err(DbErrorWriteNonRetriable::ValidationFailed(
1969                        "cannot lock, not yet pending".into(),
1970                    ))
1971                }
1972            }
1973            PendingState::Locked(PendingStateLocked {
1974                locked_by:
1975                    LockedBy {
1976                        executor_id: current_pending_state_executor_id,
1977                        run_id: current_pending_state_run_id,
1978                    },
1979                lock_expires_at: _,
1980            }) => {
1981                if executor_id == *current_pending_state_executor_id
1982                    && run_id == *current_pending_state_run_id
1983                {
1984                    // Original executor is extending the lock.
1985                    Ok(LockKind::Extending)
1986                } else {
1987                    Err(DbErrorWriteNonRetriable::IllegalState {
1988                        reason: "cannot lock, already locked".into(),
1989                        context: SpanTrace::capture(),
1990                        source: None,
1991                        loc: Location::caller(),
1992                    })
1993                }
1994            }
1995            PendingState::BlockedByJoinSet { .. } => Err(DbErrorWriteNonRetriable::IllegalState {
1996                reason: "cannot append Locked event when in BlockedByJoinSet state".into(),
1997                context: SpanTrace::capture(),
1998                source: None,
1999                loc: Location::caller(),
2000            }),
2001            PendingState::Finished { .. } => Err(DbErrorWriteNonRetriable::IllegalState {
2002                reason: "already finished".into(),
2003                context: SpanTrace::capture(),
2004                source: None,
2005                loc: Location::caller(),
2006            }),
2007            PendingState::Paused(..) => Err(DbErrorWriteNonRetriable::IllegalState {
2008                reason: "cannot lock, execution is paused".into(),
2009                context: SpanTrace::capture(),
2010                source: None,
2011                loc: Location::caller(),
2012            }),
2013        }
2014    }
2015
2016    #[must_use]
2017    pub fn is_finished(&self) -> bool {
2018        matches!(self, PendingState::Finished { .. })
2019    }
2020
2021    #[must_use]
2022    pub fn is_paused(&self) -> bool {
2023        matches!(self, PendingState::Paused(_))
2024    }
2025}
2026
/// Kind of lock that `PendingState::can_append_lock` permits.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LockKind {
    /// The executor and run that hold (or last held) the lock are extending it.
    Extending,
    /// A pending execution is being locked anew.
    CreatingNewLock,
}
2032
/// Serializable records of an HTTP client request and its optional response.
pub mod http_client_trace {
    use chrono::{DateTime, Utc};
    use serde::{Deserialize, Serialize};

    /// One traced HTTP exchange: the request plus the response, if any.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    pub struct HttpClientTrace {
        /// The request that was sent.
        pub req: RequestTrace,
        /// The response; `None` presumably means no response was recorded — TODO confirm.
        pub resp: Option<ResponseTrace>,
    }

    /// Request part of an [`HttpClientTrace`].
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    pub struct RequestTrace {
        /// Time at which the request was sent.
        pub sent_at: DateTime<Utc>,
        /// Request URI, stored as a plain string.
        pub uri: String,
        /// Request method, stored as a plain string.
        pub method: String,
    }

    /// Response part of an [`HttpClientTrace`].
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    pub struct ResponseTrace {
        /// Time at which the response finished.
        pub finished_at: DateTime<Utc>,
        /// Outcome of the request.
        // NOTE(review): presumably `Ok` holds the HTTP status code and `Err` an error
        // description — confirm against the code that populates it.
        pub status: Result<u16, String>,
    }
}
2056
#[cfg(test)]
mod tests {
    use super::HistoryEvent;
    use super::HistoryEventScheduleAt;
    use super::JoinNextTryOutcome;
    use super::PendingStateFinished;
    use super::PendingStateFinishedError;
    use super::PendingStateFinishedResultKind;
    use crate::ExecutionFailureKind;
    use crate::JoinSetId;
    use crate::SupportedFunctionReturnValue;
    use chrono::DateTime;
    use chrono::Datelike;
    use chrono::Utc;
    use insta::assert_snapshot;
    use rstest::rstest;
    use std::time::Duration;
    use val_json::type_wrapper::TypeWrapper;
    use val_json::wast_val::WastVal;
    use val_json::wast_val::WastValWithType;

    /// Builds the named join set id (`n:test` in JSON) shared by the `JoinNextTry` tests.
    fn named_test_join_set() -> JoinSetId {
        JoinSetId::new(
            crate::JoinSetKind::Named,
            crate::StrVariant::Static("test"),
        )
        .unwrap()
    }

    /// Builds a `JoinNextTry` event on the shared test join set with the given outcome.
    fn join_next_try(outcome: JoinNextTryOutcome) -> HistoryEvent {
        HistoryEvent::JoinNextTry {
            join_set_id: named_test_join_set(),
            outcome,
        }
    }

    /// Asserts that `json` deserializes into a `JoinNextTry` event with `outcome`.
    #[track_caller]
    fn assert_parses_as(json: &str, outcome: JoinNextTryOutcome) {
        let event: HistoryEvent = serde_json::from_str(json).unwrap();
        assert_eq!(event, join_next_try(outcome));
    }

    #[rstest(expected => [
        PendingStateFinishedResultKind::Ok,
        PendingStateFinishedResultKind::Err(PendingStateFinishedError::ExecutionFailure(ExecutionFailureKind::TimedOut)),
    ])]
    #[test]
    fn serde_pending_state_finished_result_kind_should_work(
        expected: PendingStateFinishedResultKind,
    ) {
        let json = serde_json::to_string(&expected).unwrap();
        let round_tripped: PendingStateFinishedResultKind = serde_json::from_str(&json).unwrap();
        assert_eq!(expected, round_tripped);
    }

    #[rstest(result_kind => [
        PendingStateFinishedResultKind::Ok,
        PendingStateFinishedResultKind::Err(PendingStateFinishedError::ExecutionFailure(ExecutionFailureKind::TimedOut)),
    ])]
    #[test]
    fn serde_pending_state_finished_should_work(result_kind: PendingStateFinishedResultKind) {
        let expected = PendingStateFinished {
            version: 0,
            finished_at: Utc::now(),
            result_kind,
        };

        let json = serde_json::to_string(&expected).unwrap();
        let round_tripped: PendingStateFinished = serde_json::from_str(&json).unwrap();
        assert_eq!(expected, round_tripped);
    }

    #[test]
    fn join_set_deser_with_result_ok_option_none_should_work() {
        let result_type = TypeWrapper::Result {
            ok: Some(Box::new(TypeWrapper::Option(Box::new(TypeWrapper::String)))),
            err: Some(Box::new(TypeWrapper::String)),
        };
        let result_value = WastVal::Result(Ok(Some(Box::new(WastVal::Option(None)))));
        let expected = SupportedFunctionReturnValue::Ok {
            ok: Some(WastValWithType {
                r#type: result_type,
                value: result_value,
            }),
        };
        let json = serde_json::to_string(&expected).unwrap();
        assert_snapshot!(json);

        let actual: SupportedFunctionReturnValue = serde_json::from_str(&json).unwrap();

        assert_eq!(expected, actual);
    }

    #[test]
    fn as_date_time_should_work_with_duration_u32_max_secs() {
        let schedule_at = HistoryEventScheduleAt::In(Duration::from_secs(u64::from(u32::MAX)));
        let resolved = schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap();
        assert_eq!(2106, resolved.year());
    }

    const MILLIS_PER_SEC: i64 = 1000;
    const TIMEDELTA_MAX_SECS: i64 = i64::MAX / MILLIS_PER_SEC;

    #[test]
    fn as_date_time_should_fail_on_duration_secs_greater_than_i64_max() {
        // Fails on duration -> timedelta conversion, but a smaller duration can fail on datetime + timedelta
        let too_many_secs =
            u64::try_from(TIMEDELTA_MAX_SECS).expect("positive number must not fail") + 1;
        let schedule_at = HistoryEventScheduleAt::In(Duration::from_secs(too_many_secs));
        schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap_err();
    }

    #[test]
    fn join_next_try_outcome_new_format() {
        assert_parses_as(
            r#"{"type":"join_next_try","join_set_id":"n:test","outcome":"found"}"#,
            JoinNextTryOutcome::Found,
        );
        assert_parses_as(
            r#"{"type":"join_next_try","join_set_id":"n:test","outcome":"all_processed"}"#,
            JoinNextTryOutcome::AllProcessed,
        );
    }

    #[test]
    fn join_next_try_outcome_old_format_compat() {
        // Old format: `found_response: true` -> Found
        assert_parses_as(
            r#"{"type":"join_next_try","join_set_id":"n:test","found_response":true}"#,
            JoinNextTryOutcome::Found,
        );
        // Old format: `found_response: false` -> Pending (conservative default)
        assert_parses_as(
            r#"{"type":"join_next_try","join_set_id":"n:test","found_response":false}"#,
            JoinNextTryOutcome::Pending,
        );
    }

    #[test]
    fn join_next_try_outcome_serializes_new_format() {
        let event = join_next_try(JoinNextTryOutcome::AllProcessed);
        let json = serde_json::to_string(&event).unwrap();
        assert!(
            json.contains(r#""outcome":"all_processed""#),
            "expected outcome field, got: {json}"
        );
        assert!(
            !json.contains("found_response"),
            "should not contain old field, got: {json}"
        );
    }
}