obeli_sk_concepts/
storage.rs

1use crate::ComponentId;
2use crate::ComponentRetryConfig;
3use crate::ExecutionFailureKind;
4use crate::ExecutionId;
5use crate::ExecutionMetadata;
6use crate::FinishedExecutionError;
7use crate::FunctionFqn;
8use crate::JoinSetId;
9use crate::Params;
10use crate::StrVariant;
11use crate::SupportedFunctionReturnValue;
12use crate::component_id::InputContentDigest;
13use crate::prefixed_ulid::DelayId;
14use crate::prefixed_ulid::ExecutionIdDerived;
15use crate::prefixed_ulid::ExecutorId;
16use crate::prefixed_ulid::RunId;
17use assert_matches::assert_matches;
18use async_trait::async_trait;
19use chrono::TimeDelta;
20use chrono::{DateTime, Utc};
21use http_client_trace::HttpClientTrace;
22use serde::Deserialize;
23use serde::Serialize;
24use std::fmt::Debug;
25use std::fmt::Display;
26use std::panic::Location;
27use std::pin::Pin;
28use std::sync::Arc;
29use std::time::Duration;
30use tracing::debug;
31use tracing::instrument;
32use tracing_error::SpanTrace;
33
/// Name of the "pending at" pending state.
/// NOTE(review): presumably matches the persisted/serialized state name — confirm.
pub const STATE_PENDING_AT: &str = "pending_at";
/// Name of the "blocked by join set" pending state.
pub const STATE_BLOCKED_BY_JOIN_SET: &str = "blocked_by_join_set";
/// Name of the "locked" pending state.
pub const STATE_LOCKED: &str = "locked";
/// Name of the "finished" pending state.
pub const STATE_FINISHED: &str = "finished";
/// Name of the `JoinNext` history event type.
// A `const` (like its siblings above) rather than a `static`, so it can also be used as a
// `match` pattern.
pub const HISTORY_EVENT_TYPE_JOIN_NEXT: &str = "join_next";
39
/// Remote client representation of the execution journal.
///
/// The accessor methods on this type expect `events` to be non-empty and to start with an
/// [`ExecutionRequest::Created`] event; they panic otherwise.
#[derive(Debug, PartialEq, Eq)]
#[cfg_attr(feature = "test", derive(Serialize))]
pub struct ExecutionLog {
    pub execution_id: ExecutionId,
    /// Execution events; the first is expected to be `Created`, the last is the most recent.
    pub events: Vec<ExecutionEvent>,
    /// Join set responses (delay completions, finished children) recorded for this execution.
    pub responses: Vec<JoinSetResponseEventOuter>,
    pub next_version: Version, // Is not advanced once in Finished state
    pub pending_state: PendingState, // updated on every state change
    pub component_digest: InputContentDigest, // updated on every state change
}
51
52impl ExecutionLog {
53    #[must_use]
54    pub fn as_execution_with_state(&self) -> ExecutionWithState {
55        let (created_at, first_scheduled_at, ffqn) = assert_matches!(self.events.first(), Some(ExecutionEvent {
56            event: ExecutionRequest::Created { ffqn, scheduled_at: first_scheduled_at,.. },
57            created_at,
58            ..
59        }) => (created_at, first_scheduled_at, ffqn));
60
61        ExecutionWithState {
62            execution_id: self.execution_id.clone(),
63            ffqn: ffqn.clone(),
64            pending_state: self.pending_state.clone(),
65            created_at: *created_at,
66            first_scheduled_at: *first_scheduled_at,
67            component_digest: self.component_digest.clone(),
68        }
69    }
70
71    /// Return some duration after which the execution will be retried.
72    /// Return `None` if no more retries are allowed.
73    #[must_use]
74    pub fn can_be_retried_after(
75        temporary_event_count: u32,
76        max_retries: Option<u32>,
77        retry_exp_backoff: Duration,
78    ) -> Option<Duration> {
79        // If max_retries == None, wrapping is OK after this succeeds - we want to retry forever.
80        if temporary_event_count <= max_retries.unwrap_or(u32::MAX) {
81            // TODO: Add test for number of retries
82            let duration = retry_exp_backoff * 2_u32.saturating_pow(temporary_event_count - 1);
83            Some(duration)
84        } else {
85            None
86        }
87    }
88
89    #[must_use]
90    pub fn compute_retry_duration_when_retrying_forever(
91        temporary_event_count: u32,
92        retry_exp_backoff: Duration,
93    ) -> Duration {
94        Self::can_be_retried_after(temporary_event_count, None, retry_exp_backoff)
95            .expect("`max_retries` set to MAX must never return None")
96    }
97
98    #[must_use]
99    pub fn ffqn(&self) -> &FunctionFqn {
100        assert_matches!(self.events.first(), Some(ExecutionEvent {
101            event: ExecutionRequest::Created { ffqn, .. },
102            ..
103        }) => ffqn)
104    }
105
106    #[must_use]
107    pub fn parent(&self) -> Option<(ExecutionId, JoinSetId)> {
108        assert_matches!(self.events.first(), Some(ExecutionEvent {
109            event: ExecutionRequest::Created { parent, .. },
110            ..
111        }) => parent.clone())
112    }
113
114    #[must_use]
115    pub fn last_event(&self) -> &ExecutionEvent {
116        self.events.last().expect("must contain at least one event")
117    }
118
119    #[must_use]
120    pub fn into_finished_result(mut self) -> Option<SupportedFunctionReturnValue> {
121        if let ExecutionEvent {
122            event: ExecutionRequest::Finished { result, .. },
123            ..
124        } = self.events.pop().expect("must contain at least one event")
125        {
126            Some(result)
127        } else {
128            None
129        }
130    }
131
132    #[cfg(feature = "test")]
133    pub fn event_history(&self) -> impl Iterator<Item = (HistoryEvent, Version)> + '_ {
134        self.events.iter().filter_map(|event| {
135            if let ExecutionRequest::HistoryEvent { event: eh, .. } = &event.event {
136                Some((eh.clone(), event.version.clone()))
137            } else {
138                None
139            }
140        })
141    }
142
143    #[cfg(feature = "test")]
144    #[must_use]
145    pub fn find_join_set_request(&self, join_set_id: &JoinSetId) -> Option<&JoinSetRequest> {
146        self.events
147            .iter()
148            .find_map(move |event| match &event.event {
149                ExecutionRequest::HistoryEvent {
150                    event:
151                        HistoryEvent::JoinSetRequest {
152                            join_set_id: found,
153                            request,
154                        },
155                    ..
156                } if *join_set_id == *found => Some(request),
157                _ => None,
158            })
159    }
160}
161
162pub type VersionType = u32;
163#[derive(
164    Debug,
165    Default,
166    Clone,
167    PartialEq,
168    Eq,
169    Hash,
170    derive_more::Display,
171    derive_more::Into,
172    serde::Serialize,
173    serde::Deserialize,
174)]
175#[serde(transparent)]
176pub struct Version(pub VersionType);
177impl Version {
178    #[must_use]
179    pub fn new(arg: VersionType) -> Version {
180        Version(arg)
181    }
182
183    #[must_use]
184    pub fn increment(&self) -> Version {
185        Version(self.0 + 1)
186    }
187}
188impl TryFrom<i64> for Version {
189    type Error = VersionParseError;
190    fn try_from(value: i64) -> Result<Self, Self::Error> {
191        VersionType::try_from(value)
192            .map(Version::new)
193            .map_err(|_| VersionParseError)
194    }
195}
196impl From<Version> for usize {
197    fn from(value: Version) -> Self {
198        usize::try_from(value.0).expect("16 bit systems are unsupported")
199    }
200}
201impl From<&Version> for usize {
202    fn from(value: &Version) -> Self {
203        usize::try_from(value.0).expect("16 bit systems are unsupported")
204    }
205}
206
/// Returned by `Version::try_from(i64)` when the value does not fit into [`VersionType`].
#[derive(Debug, thiserror::Error)]
#[error("version must be u32")]
pub struct VersionParseError;
210
/// One stored entry of an execution log (see [`ExecutionLog::events`]).
///
/// `Display` renders only the wrapped `event`.
#[derive(
    Clone, Debug, derive_more::Display, PartialEq, Eq, serde::Serialize, serde::Deserialize,
)]
#[display("{event}")]
pub struct ExecutionEvent {
    /// Creation timestamp of this event.
    pub created_at: DateTime<Utc>,
    /// The event payload.
    pub event: ExecutionRequest,
    /// Omitted from serialized output when `None`; `get_last_execution_event` impls may leave
    /// it unset.
    /// NOTE(review): presumably the version of an associated backtrace — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub backtrace_id: Option<Version>,
    /// Version at which this event was stored.
    pub version: Version,
}
222
/// Moves the execution to [`PendingState::PendingNow`] if it is currently blocked on `JoinNextBlocking`.
///
/// Wraps a [`JoinSetResponseEvent`] with its creation timestamp.
/// NOTE(review): the `HistoryEvent::JoinNext` docs describe the resulting state as
/// `PendingAt(max(resp_time, lock_expires_at))`; reconcile the two descriptions.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct JoinSetResponseEventOuter {
    pub created_at: DateTime<Utc>,
    pub event: JoinSetResponseEvent,
}
229
/// A [`JoinSetResponse`] addressed to the join set `join_set_id`.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct JoinSetResponseEvent {
    pub join_set_id: JoinSetId,
    pub event: JoinSetResponse,
}
235
/// Response delivered to a join set.
///
/// Serialized as an internally tagged enum with a snake_case `type` field.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, derive_more::Display)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum JoinSetResponse {
    /// A delay completed: `result` is `Ok` when it finished and `Err` when it was cancelled.
    #[display("delay {}: {delay_id}", if result.is_ok() { "finished" } else { "cancelled"})]
    DelayFinished {
        delay_id: DelayId,
        result: Result<(), ()>,
    },
    /// A child execution finished with `result`.
    #[display("{result}: {child_execution_id}")] // execution completed..
    ChildExecutionFinished {
        child_execution_id: ExecutionIdDerived,
        /// NOTE(review): presumably the version of the child's `Finished` event — confirm.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Version(2)))]
        finished_version: Version,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
        result: SupportedFunctionReturnValue,
    },
}
254
/// Placeholder [`ExecutionRequest::Created`]: empty function name and params, no parent,
/// `scheduled_at` at the Unix epoch, dummy activity component and empty metadata.
pub const DUMMY_CREATED: ExecutionRequest = ExecutionRequest::Created {
    ffqn: FunctionFqn::new_static("", ""),
    params: Params::empty(),
    parent: None,
    scheduled_at: DateTime::from_timestamp_nanos(0),
    component_id: ComponentId::dummy_activity(),
    metadata: ExecutionMetadata::empty(),
    scheduled_by: None,
};
/// Placeholder [`ExecutionRequest::HistoryEvent`] creating a one-off join set with an empty name.
pub const DUMMY_HISTORY_EVENT: ExecutionRequest = ExecutionRequest::HistoryEvent {
    event: HistoryEvent::JoinSetCreate {
        join_set_id: JoinSetId {
            kind: crate::JoinSetKind::OneOff,
            name: StrVariant::empty(),
        },
    },
};
272
/// Payload of an [`ExecutionEvent`]: the kinds of events an execution log can contain.
///
/// Serialized as an externally tagged enum with snake_case variant names.
#[derive(
    Clone, derive_more::Debug, derive_more::Display, PartialEq, Eq, Serialize, Deserialize,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(rename_all = "snake_case")]
pub enum ExecutionRequest {
    /// First event of an execution log: the function, its parameters and scheduling.
    #[display("Created({ffqn}, `{scheduled_at}`)")]
    Created {
        /// Function to execute.
        ffqn: FunctionFqn,
        /// Function parameters; omitted from the `Debug` output.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
        #[debug(skip)]
        params: Params,
        /// Parent execution and the parent's join set, when this is a child execution.
        parent: Option<(ExecutionId, JoinSetId)>,
        /// Time the execution is scheduled for.
        scheduled_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
        component_id: ComponentId,
        #[cfg_attr(any(test, feature = "test"), arbitrary(default))]
        metadata: ExecutionMetadata,
        /// Execution that scheduled this one, if any.
        scheduled_by: Option<ExecutionId>,
    },
    /// The execution was locked by an executor; see [`Locked`].
    Locked(Locked),
    /// Returns execution to [`PendingState::PendingNow`] state
    /// without timing out. This can happen when the executor is running
    /// out of resources like [`WorkerError::LimitReached`] or when
    /// the executor is shutting down.
    #[display("Unlocked(`{backoff_expires_at}`)")]
    Unlocked {
        backoff_expires_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
        reason: StrVariant,
    },
    /// Temporary failure (see [`ExecutionRequest::is_temporary_event`]).
    ///
    /// Created by the executor holding the lock.
    /// After expiry interpreted as pending.
    #[display("TemporarilyFailed(`{backoff_expires_at}`)")]
    TemporarilyFailed {
        backoff_expires_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
        reason: StrVariant,
        detail: Option<String>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },
    /// Temporary timeout (see [`ExecutionRequest::is_temporary_event`]).
    ///
    /// Created by the executor holding the lock.
    /// After expiry interpreted as pending.
    #[display("TemporarilyTimedOut(`{backoff_expires_at}`)")]
    TemporarilyTimedOut {
        backoff_expires_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },
    /// Final result of the execution.
    ///
    /// Created by the executor holding the lock.
    #[display("Finished")]
    Finished {
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
        result: SupportedFunctionReturnValue,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },

    /// Entry of the workflow event history; see [`HistoryEvent`].
    #[display("HistoryEvent({event})")]
    HistoryEvent {
        event: HistoryEvent,
    },
}
337
338impl ExecutionRequest {
339    #[must_use]
340    pub fn is_temporary_event(&self) -> bool {
341        matches!(
342            self,
343            Self::TemporarilyFailed { .. } | Self::TemporarilyTimedOut { .. }
344        )
345    }
346
347    #[must_use]
348    pub const fn variant(&self) -> &'static str {
349        match self {
350            ExecutionRequest::Created { .. } => "created",
351            ExecutionRequest::Locked(_) => "locked",
352            ExecutionRequest::Unlocked { .. } => "unlocked",
353            ExecutionRequest::TemporarilyFailed { .. } => "temporarily_failed",
354            ExecutionRequest::TemporarilyTimedOut { .. } => "temporarily_timed_out",
355            ExecutionRequest::Finished { .. } => "finished",
356            ExecutionRequest::HistoryEvent { .. } => "history_event",
357        }
358    }
359
360    #[must_use]
361    pub fn join_set_id(&self) -> Option<&JoinSetId> {
362        match self {
363            Self::Created {
364                parent: Some((_parent_id, join_set_id)),
365                ..
366            } => Some(join_set_id),
367            Self::HistoryEvent {
368                event:
369                    HistoryEvent::JoinSetCreate { join_set_id, .. }
370                    | HistoryEvent::JoinSetRequest { join_set_id, .. }
371                    | HistoryEvent::JoinNext { join_set_id, .. },
372            } => Some(join_set_id),
373            _ => None,
374        }
375    }
376}
377
/// Payload of [`ExecutionRequest::Locked`]: which executor run holds the lock and until when.
///
/// `Display` renders as ``Locked(`<lock_expires_at>`, <component_id>)``.
#[derive(
    Clone, derive_more::Debug, derive_more::Display, PartialEq, Eq, Serialize, Deserialize,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[display("Locked(`{lock_expires_at}`, {component_id})")]
pub struct Locked {
    /// Component the execution was locked for.
    #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
    pub component_id: ComponentId,
    /// Executor holding the lock.
    pub executor_id: ExecutorId,
    /// Run of the executor holding the lock.
    pub run_id: RunId,
    /// Expiry time of the lock.
    pub lock_expires_at: DateTime<Utc>,
    /// Retry configuration supplied when locking.
    #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentRetryConfig::ZERO))]
    pub retry_config: ComponentRetryConfig,
}
392
/// Describes how the value of a [`HistoryEvent::Persist`] was generated.
///
/// Serialized as an internally tagged enum with a snake_case `type` field.
#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum PersistKind {
    /// Random `u64` in `min..=max_inclusive`.
    #[display("RandomU64({min}, {max_inclusive})")]
    RandomU64 { min: u64, max_inclusive: u64 },
    /// Random string with a length in `min_length..max_length_exclusive`.
    #[display("RandomString({min_length}, {max_length_exclusive})")]
    RandomString {
        min_length: u64,
        max_length_exclusive: u64,
    },
}
405
/// Encodes `value` as 8 big-endian bytes (most significant byte first).
///
/// Inverse of `from_bytes_to_u64`.
#[must_use]
pub fn from_u64_to_bytes(value: u64) -> [u8; 8] {
    u64::to_be_bytes(value)
}
410
/// Decodes 8 big-endian bytes (`bytes[0]` is the most significant) into a `u64`.
///
/// Inverse of `from_u64_to_bytes`.
#[must_use]
pub fn from_bytes_to_u64(bytes: [u8; 8]) -> u64 {
    // Shift each byte in from the right, so the first byte ends up most significant.
    bytes
        .iter()
        .fold(0_u64, |acc, &byte| (acc << 8) | u64::from(byte))
}
415
#[derive(
    derive_more::Debug, Clone, PartialEq, Eq, derive_more::Display, Serialize, Deserialize,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type", rename_all = "snake_case")]
/// Must be created by the executor in [`PendingState::Locked`].
///
/// Entry of the workflow event history, stored inside [`ExecutionRequest::HistoryEvent`].
/// Serialized as an internally tagged enum with a snake_case `type` field.
pub enum HistoryEvent {
    /// Persist a generated pseudorandom value.
    #[display("Persist")]
    Persist {
        #[debug(skip)]
        value: Vec<u8>, // Only stored for nondeterminism checks. TODO: Consider using a hashed value or just the intention.
        /// How `value` was generated.
        kind: PersistKind,
    },
    /// Creation of the join set `join_set_id`.
    #[display("JoinSetCreate({join_set_id})")]
    JoinSetCreate { join_set_id: JoinSetId },
    /// Submission of a delay or child execution request to `join_set_id`.
    #[display("JoinSetRequest({request})")]
    // join_set_id is part of ExecutionId or DelayId in the `request`
    JoinSetRequest {
        join_set_id: JoinSetId,
        request: JoinSetRequest,
    },
    /// Sets the pending state to [`PendingState::BlockedByJoinSet`].
    /// When the response arrives at `resp_time`:
    /// The execution is [`PendingState::PendingAt`]`(max(resp_time, lock_expires_at))`, so that the
    /// original executor can continue. After the expiry any executor can continue without
    /// marking the execution as timed out.
    #[display("JoinNext({join_set_id})")]
    JoinNext {
        join_set_id: JoinSetId,
        /// Set to a future time if the worker is keeping the execution invocation warm waiting for the result.
        /// The pending status will be kept in Locked state until `run_expires_at`.
        run_expires_at: DateTime<Utc>,
        /// Set to a specific function when calling `-await-next` extension function, used for
        /// determinism checks.
        requested_ffqn: Option<FunctionFqn>,
        /// Closing request must never set `requested_ffqn` and is ignored by determinism checks.
        closing: bool,
    },
    /// Records the fact that a join set was awaited more times than its submission count.
    #[display("JoinNextTooMany({join_set_id})")]
    JoinNextTooMany {
        join_set_id: JoinSetId,
        /// Set to a specific function when calling `-await-next` extension function, used for
        /// determinism checks.
        requested_ffqn: Option<FunctionFqn>,
    },
    /// Records the intention to schedule the execution `execution_id`.
    #[display("Schedule({execution_id}, {schedule_at})")]
    Schedule {
        execution_id: ExecutionId,
        schedule_at: HistoryEventScheduleAt, // Stores intention to schedule an execution at a date/offset
    },
    /// Records supplying `result` for the derived execution `target_execution_id`.
    /// NOTE(review): presumably "stubbing" a child's result — confirm terminology.
    #[display("Stub({target_execution_id})")]
    Stub {
        target_execution_id: ExecutionIdDerived,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
        result: SupportedFunctionReturnValue, // Only stored for nondeterminism checks. TODO: Consider using a hashed value.
        persist_result: Result<(), ()>, // Does the row (target_execution_id,Version:1) match the proposed `result`?
    },
}
476
/// Requested timing: immediately, at an absolute instant, or after an offset.
///
/// Resolved to an absolute time by `HistoryEventScheduleAt::as_date_time`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(rename_all = "snake_case")]
pub enum HistoryEventScheduleAt {
    /// Resolves to the `now` passed to `as_date_time`.
    Now,
    /// Absolute instant.
    #[display("At(`{_0}`)")]
    At(DateTime<Utc>),
    /// Offset added to the `now` passed to `as_date_time`.
    #[display("In({_0:?})")]
    In(Duration),
}
487
/// Error of `HistoryEventScheduleAt::as_date_time`.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum ScheduleAtConversionError {
    /// The `In` offset does not fit into a `TimeDelta`, or adding it to `now` overflows.
    #[error("source duration value is out of range")]
    OutOfRangeError,
}
493
494impl HistoryEventScheduleAt {
495    pub fn as_date_time(
496        &self,
497        now: DateTime<Utc>,
498    ) -> Result<DateTime<Utc>, ScheduleAtConversionError> {
499        match self {
500            Self::Now => Ok(now),
501            Self::At(date_time) => Ok(*date_time),
502            Self::In(duration) => {
503                let time_delta = TimeDelta::from_std(*duration)
504                    .map_err(|_| ScheduleAtConversionError::OutOfRangeError)?;
505                now.checked_add_signed(time_delta)
506                    .ok_or(ScheduleAtConversionError::OutOfRangeError)
507            }
508        }
509    }
510}
511
/// Request submitted to a join set, stored in [`HistoryEvent::JoinSetRequest`].
///
/// Serialized as an internally tagged enum with a snake_case `type` field.
#[derive(Clone, Debug, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum JoinSetRequest {
    /// Delay request. Its response is presumably [`JoinSetResponse::DelayFinished`] with the
    /// same `delay_id`.
    ///
    /// Must be created by the executor in `PendingState::Locked`.
    #[display("DelayRequest({delay_id}, expires_at: `{expires_at}`, schedule_at: `{schedule_at}`)")]
    DelayRequest {
        delay_id: DelayId,
        /// NOTE(review): presumably `schedule_at` resolved to an absolute time — confirm.
        expires_at: DateTime<Utc>,
        /// Timing as originally requested.
        schedule_at: HistoryEventScheduleAt,
    },
    /// Child execution request. Its response is presumably
    /// [`JoinSetResponse::ChildExecutionFinished`].
    ///
    /// Must be created by the executor in `PendingState::Locked`.
    #[display("ChildExecutionRequest({child_execution_id}, {target_ffqn}, params: {params})")]
    ChildExecutionRequest {
        child_execution_id: ExecutionIdDerived,
        target_ffqn: FunctionFqn,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
        params: Params,
    },
}
532
/// Error that is not specific to an execution.
///
/// Equality ignores the `context` and `source` fields of `Uncategorized`.
#[derive(Debug, Clone, thiserror::Error, derive_more::PartialEq, derive_more::Eq)]
pub enum DbErrorGeneric {
    /// Any database failure without a more specific variant.
    #[error("database error: {reason}")]
    Uncategorized {
        reason: StrVariant,
        /// Tracing span trace captured with the error; ignored by equality.
        #[eq(skip)]
        #[partial_eq(skip)]
        context: SpanTrace,
        /// Underlying error, exposed via `Error::source`; ignored by equality.
        #[eq(skip)]
        #[partial_eq(skip)]
        #[source]
        source: Option<Arc<dyn std::error::Error + Send + Sync>>,
        /// Source location where the error was created.
        loc: &'static Location<'static>,
    },
    /// The database was closed.
    #[error("database was closed")]
    Close,
}
551
/// Write errors classified as non-retriable.
///
/// Equality ignores the `context` and `source` fields of `IllegalState`.
#[derive(thiserror::Error, Clone, Debug, derive_more::PartialEq, derive_more::Eq)]
pub enum DbErrorWriteNonRetriable {
    /// The request failed validation; the payload is the reason.
    #[error("validation failed: {0}")]
    ValidationFailed(StrVariant),
    #[error("conflict")]
    Conflict,
    /// The write is not allowed in the execution's current state.
    #[error("illegal state: {reason}")]
    IllegalState {
        reason: StrVariant,
        #[eq(skip)]
        #[partial_eq(skip)]
        context: SpanTrace,
        #[eq(skip)]
        #[partial_eq(skip)]
        #[source]
        source: Option<Arc<dyn std::error::Error + Send + Sync>>,
        loc: &'static Location<'static>,
    },
    /// The version supplied with the write (`requested`) differs from the one expected.
    #[error("version conflict: expected: {expected}, got: {requested}")]
    VersionConflict {
        expected: Version,
        requested: Version,
    },
}
576
/// Write error tied to an execution
#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
pub enum DbErrorWrite {
    /// The execution row does not exist.
    #[error("cannot write - row not found")]
    NotFound,
    #[error("non-retriable error: {0}")]
    NonRetriable(#[from] DbErrorWriteNonRetriable),
    /// Error not specific to the execution; displayed transparently.
    #[error(transparent)]
    Generic(#[from] DbErrorGeneric),
}
587
/// Read error tied to an execution
#[derive(Debug, Clone, thiserror::Error, PartialEq)]
pub enum DbErrorRead {
    /// The execution row does not exist.
    #[error("cannot read - row not found")]
    NotFound,
    /// Error not specific to the execution; displayed transparently.
    #[error(transparent)]
    Generic(#[from] DbErrorGeneric),
}
596
/// Read error for reads that can also time out.
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum DbErrorReadWithTimeout {
    /// The read timed out.
    /// NOTE(review): `TimeoutOutcome` is defined elsewhere; its meaning is not visible here.
    #[error("timeout")]
    Timeout(TimeoutOutcome),
    #[error(transparent)]
    DbErrorRead(#[from] DbErrorRead),
}
604
/// Next version of the execution log after a successful append.
// TODO: Convert to struct with next_version
pub type AppendResponse = Version;
/// Pending execution tuple: id, version, params and an optional timestamp.
/// NOTE(review): the meaning of the `Version` and `Option<DateTime<Utc>>` elements is not
/// visible here — confirm against producers.
pub type PendingExecution = (ExecutionId, Version, Params, Option<DateTime<Utc>>);
609
/// An execution that was just locked, with its inputs and recorded history.
///
/// Returned by `DbExecutor::lock_one` and, in batches, by the `lock_pending_by_*` methods.
#[derive(Debug, Clone)]
pub struct LockedExecution {
    pub execution_id: ExecutionId,
    /// Next version of the execution log.
    pub next_version: Version,
    pub metadata: ExecutionMetadata,
    /// The lock that was acquired.
    pub locked_event: Locked,
    pub ffqn: FunctionFqn,
    pub params: Params,
    /// History events with the versions they were stored at.
    pub event_history: Vec<(HistoryEvent, Version)>,
    pub responses: Vec<JoinSetResponseEventOuter>,
    /// Parent execution and its join set, when this is a child execution.
    pub parent: Option<(ExecutionId, JoinSetId)>,
    /// NOTE(review): presumably the number of temporary events so far, as used by
    /// `ExecutionLog::can_be_retried_after` — confirm.
    pub intermittent_event_count: u32,
}
623
/// Executions locked by one `lock_pending_by_*` call.
pub type LockPendingResponse = Vec<LockedExecution>;
/// Result of `DbExecutor::append_batch_respond_to_parent`.
/// NOTE(review): presumably the next version, like [`AppendResponse`] — confirm.
pub type AppendBatchResponse = Version;
626
/// An event to append to an existing execution log.
///
/// The target version is passed separately (see `DbExecutor::append`). `Display` renders only
/// the wrapped `event`.
#[derive(Debug, Clone, derive_more::Display, Serialize, Deserialize)]
#[display("{event}")]
pub struct AppendRequest {
    pub created_at: DateTime<Utc>,
    pub event: ExecutionRequest,
}
633
/// Parameters for creating a new execution.
///
/// Converts into [`ExecutionRequest::Created`]. `created_at` and `execution_id` are not part
/// of that variant and are dropped by the conversion.
#[derive(Debug, Clone)]
pub struct CreateRequest {
    pub created_at: DateTime<Utc>,
    pub execution_id: ExecutionId,
    pub ffqn: FunctionFqn,
    pub params: Params,
    pub parent: Option<(ExecutionId, JoinSetId)>,
    pub scheduled_at: DateTime<Utc>,
    pub component_id: ComponentId,
    pub metadata: ExecutionMetadata,
    pub scheduled_by: Option<ExecutionId>,
}
646
647impl From<CreateRequest> for ExecutionRequest {
648    fn from(value: CreateRequest) -> Self {
649        Self::Created {
650            ffqn: value.ffqn,
651            params: value.params,
652            parent: value.parent,
653            scheduled_at: value.scheduled_at,
654            component_id: value.component_id,
655            metadata: value.metadata,
656            scheduled_by: value.scheduled_by,
657        }
658    }
659}
660
/// Source of database connections, one accessor per consumer-facing trait.
#[async_trait]
pub trait DbPool: Send + Sync {
    /// Connection implementing [`DbExecutor`].
    async fn db_exec_conn(&self) -> Result<Box<dyn DbExecutor>, DbErrorGeneric>;

    /// Connection implementing `DbConnection`.
    async fn connection(&self) -> Result<Box<dyn DbConnection>, DbErrorGeneric>;

    /// Connection implementing `DbExternalApi`.
    async fn external_api_conn(&self) -> Result<Box<dyn DbExternalApi>, DbErrorGeneric>;

    /// Connection implementing `DbConnectionTest`; only with the `test` feature.
    #[cfg(feature = "test")]
    async fn connection_test(&self) -> Result<Box<dyn DbConnectionTest>, DbErrorGeneric>;
}
672
/// A pool that can be closed.
#[async_trait]
pub trait DbPoolCloseable {
    /// Closes the pool.
    async fn close(&self);
}
677
/// Batch of events to append to `execution_id`.
///
/// `version` is the version to append at; `DbExecutor::cancel_activity` passes the last
/// event's version + 1.
#[derive(Clone, Debug)]
pub struct AppendEventsToExecution {
    pub execution_id: ExecutionId,
    pub version: Version,
    pub batch: Vec<AppendRequest>,
}
684
/// A child's result to deliver to `join_set_id` of `parent_execution_id`.
///
/// NOTE(review): presumably stored as [`JoinSetResponse::ChildExecutionFinished`], whose fields
/// it mirrors — confirm in the implementations.
#[derive(Clone, Debug)]
pub struct AppendResponseToExecution {
    pub parent_execution_id: ExecutionId,
    pub created_at: DateTime<Utc>,
    pub join_set_id: JoinSetId,
    pub child_execution_id: ExecutionIdDerived,
    pub finished_version: Version,
    pub result: SupportedFunctionReturnValue,
}
694
/// Database operations used by executors: locking, appending, waiting for pending work and
/// cancelling activities.
#[async_trait]
pub trait DbExecutor: Send + Sync {
    /// Locks pending executions of the functions in `ffqns`.
    ///
    /// NOTE(review): semantics inferred from parameter names — presumably up to `batch_size`
    /// executions pending at or before `pending_at_or_sooner`, locked until `lock_expires_at`.
    #[expect(clippy::too_many_arguments)]
    async fn lock_pending_by_ffqns(
        &self,
        batch_size: u32,
        pending_at_or_sooner: DateTime<Utc>,
        ffqns: Arc<[FunctionFqn]>,
        created_at: DateTime<Utc>,
        component_id: ComponentId,
        executor_id: ExecutorId,
        lock_expires_at: DateTime<Utc>,
        run_id: RunId,
        retry_config: ComponentRetryConfig,
    ) -> Result<LockPendingResponse, DbErrorGeneric>;

    /// Like `lock_pending_by_ffqns`, but selects executions by `component_id`.
    #[expect(clippy::too_many_arguments)]
    async fn lock_pending_by_component_id(
        &self,
        batch_size: u32,
        pending_at_or_sooner: DateTime<Utc>,
        component_id: &ComponentId,
        created_at: DateTime<Utc>,
        executor_id: ExecutorId,
        lock_expires_at: DateTime<Utc>,
        run_id: RunId,
        retry_config: ComponentRetryConfig,
    ) -> Result<LockPendingResponse, DbErrorGeneric>;

    /// Specialized locking for e.g. extending the lock by the original executor and run.
    /// Locks the single execution `execution_id` at `version`.
    #[expect(clippy::too_many_arguments)]
    async fn lock_one(
        &self,
        created_at: DateTime<Utc>,
        component_id: ComponentId,
        execution_id: &ExecutionId,
        run_id: RunId,
        version: Version,
        executor_id: ExecutorId,
        lock_expires_at: DateTime<Utc>,
        retry_config: ComponentRetryConfig,
    ) -> Result<LockedExecution, DbErrorWrite>;

    /// Append a single event to an existing execution log.
    /// The request cannot contain `ExecutionRequest::Created`.
    async fn append(
        &self,
        execution_id: ExecutionId,
        version: Version,
        req: AppendRequest,
    ) -> Result<AppendResponse, DbErrorWrite>;

    /// Append a batch of events to an existing execution log, and append a response to a parent execution.
    /// The batch cannot contain `ExecutionRequest::Created`.
    async fn append_batch_respond_to_parent(
        &self,
        events: AppendEventsToExecution,
        response: AppendResponseToExecution,
        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
    ) -> Result<AppendBatchResponse, DbErrorWrite>;

    /// Notification mechanism with no strict guarantees for waiting while there are no pending executions.
    /// Return immediately if there are pending notifications at `pending_at_or_sooner`.
    /// Otherwise wait until `timeout_fut` resolves.
    /// Delay requests that expire between `pending_at_or_sooner` and timeout can be disregarded.
    async fn wait_for_pending_by_ffqn(
        &self,
        pending_at_or_sooner: DateTime<Utc>,
        ffqns: Arc<[FunctionFqn]>,
        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
    );

    /// Notification mechanism with no strict guarantees for waiting while there are no pending executions.
    /// Return immediately if there are pending notifications at `pending_at_or_sooner`.
    /// Otherwise wait until `timeout_fut` resolves.
    /// Delay requests that expire between `pending_at_or_sooner` and timeout can be disregarded.
    async fn wait_for_pending_by_component_id(
        &self,
        pending_at_or_sooner: DateTime<Utc>,
        component_id: &ComponentId,
        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
    );

    /// Calls `cancel_activity` until it succeeds, making at most 6 attempts (the first call plus
    /// 5 retries) with no delay between them.
    ///
    /// Every error is retried, including `NotFound`; the last error is returned when all
    /// attempts fail.
    async fn cancel_activity_with_retries(
        &self,
        execution_id: &ExecutionId,
        cancelled_at: DateTime<Utc>,
    ) -> Result<CancelOutcome, DbErrorWrite> {
        let mut retries = 5;
        loop {
            let res = self.cancel_activity(execution_id, cancelled_at).await;
            if res.is_ok() || retries == 0 {
                return res;
            }
            retries -= 1;
        }
    }

    /// Get last event. Impls may set `ExecutionEvent::backtrace_id` to `None`.
    async fn get_last_execution_event(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<ExecutionEvent, DbErrorRead>;

    /// Finishes `execution_id` with a `Cancelled` execution error.
    ///
    /// Returns `CancelOutcome::Cancelled` if the execution is already finished as cancelled,
    /// and `CancelOutcome::AlreadyFinished` if it finished otherwise. Otherwise it appends a
    /// `Finished` event at the last event's version + 1. For a derived (child) execution, that
    /// append goes through `append_batch_respond_to_parent`, so the parent's join set also
    /// receives the result.
    /// NOTE(review): nothing here checks that the execution actually is an activity.
    async fn cancel_activity(
        &self,
        execution_id: &ExecutionId,
        cancelled_at: DateTime<Utc>,
    ) -> Result<CancelOutcome, DbErrorWrite> {
        debug!("Determining cancellation state of {execution_id}");

        let last_event = self
            .get_last_execution_event(execution_id)
            .await
            .map_err(DbErrorWrite::from)?;
        // Already cancelled: report success so repeated cancellation is idempotent.
        if let ExecutionRequest::Finished {
            result:
                SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError {
                    kind: ExecutionFailureKind::Cancelled,
                    ..
                }),
            ..
        } = last_event.event
        {
            return Ok(CancelOutcome::Cancelled);
        } else if matches!(last_event.event, ExecutionRequest::Finished { .. }) {
            debug!("Not cancelling, {execution_id} is already finished");
            return Ok(CancelOutcome::AlreadyFinished);
        }
        // The cancellation is appended right after the current last event.
        let finished_version = last_event.version.increment();
        let child_result = SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError {
            reason: None,
            kind: ExecutionFailureKind::Cancelled,
            detail: None,
        });
        let cancel_request = AppendRequest {
            created_at: cancelled_at,
            event: ExecutionRequest::Finished {
                result: child_result.clone(),
                http_client_traces: None,
            },
        };
        debug!("Cancelling activity {execution_id} at {finished_version}");
        // Derived executions carry their parent and join set in the id; notify the parent too.
        if let ExecutionId::Derived(execution_id) = execution_id {
            let (parent_execution_id, join_set_id) = execution_id.split_to_parts();
            let child_execution_id = ExecutionId::Derived(execution_id.clone());
            self.append_batch_respond_to_parent(
                AppendEventsToExecution {
                    execution_id: child_execution_id,
                    version: finished_version.clone(),
                    batch: vec![cancel_request],
                },
                AppendResponseToExecution {
                    parent_execution_id,
                    created_at: cancelled_at,
                    join_set_id: join_set_id.clone(),
                    child_execution_id: execution_id.clone(),
                    finished_version,
                    result: child_result,
                },
                cancelled_at,
            )
            .await?;
        } else {
            self.append(execution_id.clone(), finished_version, cancel_request)
                .await?;
        }
        debug!("Cancelled {execution_id}");
        Ok(CancelOutcome::Cancelled)
    }
}
866
/// Result of `DbConnection::append_delay_response`.
///
/// `cancel_delay` maps `Success` and `AlreadyCancelled` to `CancelOutcome::Cancelled`,
/// and `AlreadyFinished` to `CancelOutcome::AlreadyFinished`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AppendDelayResponseOutcome {
    /// The requested delay response was appended.
    Success,
    /// A finished (`Ok(())`) delay response presumably already exists.
    AlreadyFinished,
    /// A cancelled (`Err(())`) delay response presumably already exists.
    AlreadyCancelled,
}
872
/// Filter for `DbExternalApi::list_executions`. `Default` applies no filtering beyond
/// the default flag values (`false`).
#[derive(Debug, Clone, Default)]
pub struct ListExecutionsFilter {
    /// Only executions whose function FQN starts with this prefix (presumably matched as text).
    pub ffqn_prefix: Option<String>,
    /// Include derived (child) executions.
    pub show_derived: bool,
    /// Exclude executions in the finished state.
    pub hide_finished: bool,
    /// Only executions whose id starts with this prefix (presumably matched as text).
    pub execution_id_prefix: Option<String>,
}
880
/// Read-mostly API used by external clients (listing and inspecting executions).
#[async_trait]
pub trait DbExternalApi: DbConnection {
    /// Get a backtrace of `execution_id` selected by `filter`:
    /// `BacktraceFilter::First`, `BacktraceFilter::Last`, or `BacktraceFilter::Specific(version)`.
    async fn get_backtrace(
        &self,
        execution_id: &ExecutionId,
        filter: BacktraceFilter,
    ) -> Result<BacktraceInfo, DbErrorRead>;

    /// Returns executions sorted in descending order.
    async fn list_executions(
        &self,
        filter: ListExecutionsFilter,
        pagination: ExecutionListPagination,
    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric>;

    /// List up to `max_length` events of `execution_id` starting from version `since`
    /// (inclusivity of `since` is up to the impl — TODO confirm).
    /// When `include_backtrace_id` is false, impls presumably leave
    /// `ExecutionEvent::backtrace_id` as `None`.
    async fn list_execution_events(
        &self,
        execution_id: &ExecutionId,
        since: &Version,
        max_length: VersionType,
        include_backtrace_id: bool,
    ) -> Result<Vec<ExecutionEvent>, DbErrorRead>;

    /// Returns responses of an execution ordered as they arrived,
    /// enabling matching each `JoinNext` to its corresponding response.
    async fn list_responses(
        &self,
        execution_id: &ExecutionId,
        pagination: Pagination<u32>,
    ) -> Result<Vec<ResponseWithCursor>, DbErrorRead>;

    /// Combination of `list_execution_events` (`req_*` parameters) and `list_responses`
    /// (`resp_pagination`), together with the current state of the execution.
    async fn list_execution_events_responses(
        &self,
        execution_id: &ExecutionId,
        req_since: &Version,
        req_max_length: VersionType,
        req_include_backtrace_id: bool,
        resp_pagination: Pagination<u32>,
    ) -> Result<ExecutionWithStateRequestsResponses, DbErrorRead>;

    /// Switch the component digest of `execution_id` from `old` to `new`
    /// (presumably failing when the current digest is not `old` — confirm with impls).
    async fn upgrade_execution_component(
        &self,
        execution_id: &ExecutionId,
        old: &InputContentDigest,
        new: &InputContentDigest,
    ) -> Result<(), DbErrorWrite>;
}
929
/// Result of `DbExternalApi::list_execution_events_responses`: the execution's current
/// state, a page of its events, and a page of its join-set responses.
#[derive(Debug, Clone)]
pub struct ExecutionWithStateRequestsResponses {
    pub execution_with_state: ExecutionWithState,
    pub events: Vec<ExecutionEvent>,
    pub responses: Vec<ResponseWithCursor>,
}
936
/// Core storage connection used by executors and the runtime.
#[async_trait]
pub trait DbConnection: DbExecutor {
    /// Append the response of the delay `delay_id` to the join set `join_set_id` of `execution_id`.
    /// See `AppendDelayResponseOutcome` for the possible outcomes.
    async fn append_delay_response(
        &self,
        created_at: DateTime<Utc>,
        execution_id: ExecutionId,
        join_set_id: JoinSetId,
        delay_id: DelayId,
        outcome: Result<(), ()>, // Successfully finished - `Ok(())` or cancelled - `Err(())`
    ) -> Result<AppendDelayResponseOutcome, DbErrorWrite>;

    /// Append a batch of events to an existing execution log at `version`.
    /// The batch cannot contain `ExecutionRequest::Created`.
    async fn append_batch(
        &self,
        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
        batch: Vec<AppendRequest>,
        execution_id: ExecutionId,
        version: Version,
    ) -> Result<AppendBatchResponse, DbErrorWrite>;

    /// Append one or more events to the parent execution log, and create zero or more child execution logs.
    /// The batch cannot contain `ExecutionRequest::Created`.
    async fn append_batch_create_new_execution(
        &self,
        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
        batch: Vec<AppendRequest>,   // must not contain `ExecutionRequest::Created` events
        execution_id: ExecutionId,
        version: Version,
        child_req: Vec<CreateRequest>,
    ) -> Result<AppendBatchResponse, DbErrorWrite>;

    /// Get a single event specified by version. Impls may set `ExecutionEvent::backtrace_id` to `None`.
    async fn get_execution_event(
        &self,
        execution_id: &ExecutionId,
        version: &Version,
    ) -> Result<ExecutionEvent, DbErrorRead>;

    /// Rebuild the `CreateRequest` of `execution_id` from its first event (version 0).
    ///
    /// # Errors
    /// Propagates read errors; returns `DbErrorGeneric::Uncategorized` if the first event
    /// is not `ExecutionRequest::Created`.
    #[instrument(skip(self))]
    async fn get_create_request(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<CreateRequest, DbErrorRead> {
        let execution_event = self
            .get_execution_event(execution_id, &Version::new(0))
            .await?;
        if let ExecutionRequest::Created {
            ffqn,
            params,
            parent,
            scheduled_at,
            component_id,
            metadata,
            scheduled_by,
        } = execution_event.event
        {
            Ok(CreateRequest {
                created_at: execution_event.created_at,
                execution_id: execution_id.clone(),
                ffqn,
                params,
                parent,
                scheduled_at,
                component_id,
                metadata,
                scheduled_by,
            })
        } else {
            Err(DbErrorRead::Generic(DbErrorGeneric::Uncategorized {
                reason: "execution log must start with creation".into(),
                context: SpanTrace::capture(),
                source: None,
                loc: Location::caller(),
            }))
        }
    }

    /// Get the current state summary (`ExecutionWithState`) of `execution_id`.
    async fn get_pending_state(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<ExecutionWithState, DbErrorRead>;

    /// Get currently expired locks and async timers (delay requests)
    async fn get_expired_timers(
        &self,
        at: DateTime<Utc>,
    ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric>;

    /// Create a new execution log
    async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite>;

    /// Notification mechanism with no strict guarantees for getting notified when a new response arrives.
    /// Parameter `start_idx` must be at most equal to the current number of responses in the execution log.
    /// If no response arrives immediately and `timeout_fut` resolves, `DbErrorReadWithTimeout::Timeout` is returned.
    /// Implementations with no pubsub support should use polling.
    /// Callers are expected to call this function in a loop with a reasonable timeout
    /// to support less stellar implementations.
    async fn subscribe_to_next_responses(
        &self,
        execution_id: &ExecutionId,
        start_idx: u32,
        timeout_fut: Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>,
    ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorReadWithTimeout>;

    /// Notification mechanism with no strict guarantees for getting the finished result.
    /// Implementations with no pubsub support should use polling.
    /// Callers are expected to call this function in a loop with a reasonable timeout
    /// to support less stellar implementations.
    async fn wait_for_finished_result(
        &self,
        execution_id: &ExecutionId,
        timeout_fut: Option<Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>>,
        // TODO: cancel fut
    ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout>;

    /// Store a single backtrace.
    async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite>;

    /// Store multiple backtraces.
    async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite>;
}
1057
/// Output of the timeout futures passed to `DbConnection::subscribe_to_next_responses`
/// and `DbConnection::wait_for_finished_result`; carried by `DbErrorReadWithTimeout::Timeout`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimeoutOutcome {
    /// The wait ran out of time.
    Timeout,
    /// The wait was presumably interrupted by a cancellation rather than a deadline.
    Cancel,
}
1063
/// Test-only extension of [`DbConnection`], compiled with the `test` feature.
#[cfg(feature = "test")]
#[async_trait]
pub trait DbConnectionTest: DbConnection {
    /// Append `response_event` to the responses of `execution_id`.
    async fn append_response(
        &self,
        created_at: DateTime<Utc>,
        execution_id: ExecutionId,
        response_event: JoinSetResponseEvent,
    ) -> Result<(), DbErrorWrite>;

    /// Get execution log.
    async fn get(&self, execution_id: &ExecutionId) -> Result<ExecutionLog, DbErrorRead>;
}
1077
/// Result of a cancellation request (`cancel_activity`, `cancel_delay`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CancelOutcome {
    /// The target is cancelled, either by this request or by an earlier one.
    Cancelled,
    /// The target had already finished, so it was not cancelled.
    AlreadyFinished,
}
1083
1084#[instrument(skip(db_connection))]
1085pub async fn stub_execution(
1086    db_connection: &dyn DbConnection,
1087    execution_id: ExecutionIdDerived,
1088    parent_execution_id: ExecutionId,
1089    join_set_id: JoinSetId,
1090    created_at: DateTime<Utc>,
1091    return_value: SupportedFunctionReturnValue,
1092) -> Result<(), DbErrorWrite> {
1093    let stub_finished_version = Version::new(1); // Stub activities have no execution log except Created event.
1094    // Attempt to write to `execution_id` and its parent, ignoring the possible conflict error on this tx
1095    let write_attempt = {
1096        let finished_req = AppendRequest {
1097            created_at,
1098            event: ExecutionRequest::Finished {
1099                result: return_value.clone(),
1100                http_client_traces: None,
1101            },
1102        };
1103        db_connection
1104            .append_batch_respond_to_parent(
1105                AppendEventsToExecution {
1106                    execution_id: ExecutionId::Derived(execution_id.clone()),
1107                    version: stub_finished_version.clone(),
1108                    batch: vec![finished_req],
1109                },
1110                AppendResponseToExecution {
1111                    parent_execution_id,
1112                    created_at,
1113                    join_set_id,
1114                    child_execution_id: execution_id.clone(),
1115                    finished_version: stub_finished_version.clone(),
1116                    result: return_value.clone(),
1117                },
1118                created_at,
1119            )
1120            .await
1121    };
1122    if let Err(write_attempt) = write_attempt {
1123        // Check that the expected value is in the database
1124        debug!("Stub write attempt failed - {write_attempt:?}");
1125
1126        let found = db_connection
1127            .get_execution_event(&ExecutionId::Derived(execution_id), &stub_finished_version)
1128            .await?; // Not found at this point should not happen, unless the previous write failed. Will be retried.
1129        match found.event {
1130            ExecutionRequest::Finished {
1131                result: found_result,
1132                ..
1133            } if return_value == found_result => {
1134                // Same value has already be written, RPC is successful.
1135                Ok(())
1136            }
1137            ExecutionRequest::Finished { .. } => Err(DbErrorWrite::NonRetriable(
1138                DbErrorWriteNonRetriable::Conflict,
1139            )),
1140            _other => Err(DbErrorWrite::NonRetriable(
1141                DbErrorWriteNonRetriable::IllegalState {
1142                    reason: "unexpected execution event at stubbed execution".into(),
1143                    context: SpanTrace::capture(),
1144                    source: None,
1145                    loc: Location::caller(),
1146                },
1147            )),
1148        }
1149    } else {
1150        Ok(())
1151    }
1152}
1153
1154pub async fn cancel_delay(
1155    db_connection: &dyn DbConnection,
1156    delay_id: DelayId,
1157    created_at: DateTime<Utc>,
1158) -> Result<CancelOutcome, DbErrorWrite> {
1159    let (parent_execution_id, join_set_id) = delay_id.split_to_parts();
1160    db_connection
1161        .append_delay_response(
1162            created_at,
1163            parent_execution_id,
1164            join_set_id,
1165            delay_id,
1166            Err(()), // Mark as cancelled.
1167        )
1168        .await
1169        .map(|ok| match ok {
1170            AppendDelayResponseOutcome::Success | AppendDelayResponseOutcome::AlreadyCancelled => {
1171                CancelOutcome::Cancelled
1172            }
1173            AppendDelayResponseOutcome::AlreadyFinished => CancelOutcome::AlreadyFinished,
1174        })
1175}
1176
/// Selects which backtrace `DbExternalApi::get_backtrace` returns for an execution.
#[derive(Clone, Debug)]
pub enum BacktraceFilter {
    /// The first stored backtrace (ordering presumably by version — confirm with impls).
    First,
    /// The last stored backtrace.
    Last,
    /// The backtrace associated with the given version.
    Specific(Version),
}

/// A WASM backtrace captured for an execution, as stored via `DbConnection::append_backtrace`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BacktraceInfo {
    pub execution_id: ExecutionId,
    pub component_id: ComponentId,
    /// Lower bound (inclusive, per the field name) of the versions this backtrace is associated with.
    pub version_min_including: Version,
    /// Upper bound (exclusive, per the field name) of the versions this backtrace is associated with.
    pub version_max_excluding: Version,
    pub wasm_backtrace: WasmBacktrace,
}

/// Serializable copy of `wasmtime::WasmBacktrace` (see `WasmBacktrace::maybe_from`).
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct WasmBacktrace {
    pub frames: Vec<FrameInfo>,
}

/// Serializable copy of a single `wasmtime::FrameInfo`.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct FrameInfo {
    /// Module name, or `"<unknown>"` when the module has none.
    pub module: String,
    /// Demangled function name, or a name derived from the function index when unnamed.
    pub func_name: String,
    /// Debug-info symbols attached to the frame.
    pub symbols: Vec<FrameSymbol>,
}

/// Serializable copy of a `wasmtime::FrameSymbol`; every field is optional because the
/// underlying debug info may be incomplete.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct FrameSymbol {
    /// Demangled symbol name.
    pub func_name: Option<String>,
    pub file: Option<String>,
    pub line: Option<u32>,
    pub col: Option<u32>,
}
1212
1213mod wasm_backtrace {
1214    use super::{FrameInfo, FrameSymbol, WasmBacktrace};
1215
1216    impl WasmBacktrace {
1217        pub fn maybe_from(backtrace: &wasmtime::WasmBacktrace) -> Option<Self> {
1218            if backtrace.frames().is_empty() {
1219                None
1220            } else {
1221                Some(Self {
1222                    frames: backtrace.frames().iter().map(FrameInfo::from).collect(),
1223                })
1224            }
1225        }
1226    }
1227
1228    impl From<&wasmtime::FrameInfo> for FrameInfo {
1229        fn from(frame: &wasmtime::FrameInfo) -> Self {
1230            let module_name = frame.module().name().unwrap_or("<unknown>").to_string();
1231            let mut func_name = String::new();
1232            wasmtime_environ::demangle_function_name_or_index(
1233                &mut func_name,
1234                frame.func_name(),
1235                frame.func_index() as usize,
1236            )
1237            .expect("writing to string must succeed");
1238            Self {
1239                module: module_name,
1240                func_name,
1241                symbols: frame
1242                    .symbols()
1243                    .iter()
1244                    .map(std::convert::Into::into)
1245                    .collect(),
1246            }
1247        }
1248    }
1249
1250    impl From<&wasmtime::FrameSymbol> for FrameSymbol {
1251        fn from(symbol: &wasmtime::FrameSymbol) -> Self {
1252            let func_name = symbol.name().map(|name| {
1253                let mut writer = String::new();
1254                wasmtime_environ::demangle_function_name(&mut writer, name)
1255                    .expect("writing to string must succeed");
1256                writer
1257            });
1258
1259            Self {
1260                func_name,
1261                file: symbol.file().map(ToString::to_string),
1262                line: symbol.line(),
1263                col: symbol.column(),
1264            }
1265        }
1266    }
1267}
1268
/// Cursor type of responses returned by `DbExternalApi::list_responses`
/// (matches its `Pagination<u32>` parameter).
pub type ResponseCursorType = u32;

/// A join-set response together with its cursor. The cursor can be used to paginate
/// `DbExternalApi::list_responses`, presumably as the response's position in the execution.
#[derive(Debug, Clone, Serialize)]
pub struct ResponseWithCursor {
    pub event: JoinSetResponseEventOuter,
    pub cursor: ResponseCursorType,
}

/// Summary of an execution: identity, function, current pending state, timestamps and
/// the digest of its component. `Display` prints `{execution_id} {pending_state} {component_digest}`.
#[derive(Debug, Clone, Serialize, derive_more::Display)]
#[display("{execution_id} {pending_state} {component_digest}")]
pub struct ExecutionWithState {
    pub execution_id: ExecutionId,
    pub ffqn: FunctionFqn,
    pub pending_state: PendingState,
    pub created_at: DateTime<Utc>,
    pub first_scheduled_at: DateTime<Utc>,
    pub component_digest: InputContentDigest,
}
1287
1288#[derive(Debug, Clone)]
1289pub enum ExecutionListPagination {
1290    CreatedBy(Pagination<Option<DateTime<Utc>>>),
1291    ExecutionId(Pagination<Option<ExecutionId>>),
1292}
1293impl Default for ExecutionListPagination {
1294    fn default() -> ExecutionListPagination {
1295        ExecutionListPagination::CreatedBy(Pagination::OlderThan {
1296            length: 20,
1297            cursor: None,
1298            including_cursor: false, // does not matter when `cursor` is not specified
1299        })
1300    }
1301}
1302impl ExecutionListPagination {
1303    #[must_use]
1304    pub fn length(&self) -> u16 {
1305        match self {
1306            ExecutionListPagination::CreatedBy(pagination) => pagination.length(),
1307            ExecutionListPagination::ExecutionId(pagination) => pagination.length(),
1308        }
1309    }
1310}
1311
/// Cursor-based pagination over a cursor of type `T`.
///
/// `NewerThan` pages in ascending order and `OlderThan` in descending order
/// (see [`Pagination::is_desc`]). `including_cursor` controls whether an item equal to
/// `cursor` belongs to the page (see [`Pagination::rel`]).
#[derive(Debug, Clone, Copy)]
pub enum Pagination<T> {
    /// Items after `cursor` (or at it, when `including_cursor`).
    NewerThan {
        /// Page size.
        length: u16,
        cursor: T,
        including_cursor: bool,
    },
    /// Items before `cursor` (or at it, when `including_cursor`).
    OlderThan {
        /// Page size.
        length: u16,
        cursor: T,
        including_cursor: bool,
    },
}
impl<T> Pagination<T> {
    /// Page size, regardless of direction.
    #[must_use]
    pub fn length(&self) -> u16 {
        match self {
            Pagination::NewerThan { length, .. } | Pagination::OlderThan { length, .. } => *length,
        }
    }

    /// Comparison operator relating stored items to the cursor:
    /// `>` / `>=` for `NewerThan`, `<` / `<=` for `OlderThan`, the `=` variants when
    /// `including_cursor` is set (presumably spliced into SQL queries by storage impls).
    #[must_use]
    pub fn rel(&self) -> &'static str {
        match self {
            Pagination::NewerThan {
                including_cursor: false,
                ..
            } => ">",
            Pagination::NewerThan {
                including_cursor: true,
                ..
            } => ">=",
            Pagination::OlderThan {
                including_cursor: false,
                ..
            } => "<",
            Pagination::OlderThan {
                including_cursor: true,
                ..
            } => "<=",
        }
    }

    /// `true` for `OlderThan`, i.e. when results should be ordered descending.
    #[must_use]
    pub fn is_desc(&self) -> bool {
        matches!(self, Pagination::OlderThan { .. })
    }

    /// Borrow the cursor, regardless of direction.
    #[must_use]
    pub fn cursor(&self) -> &T {
        match self {
            Pagination::NewerThan { cursor, .. } | Pagination::OlderThan { cursor, .. } => cursor,
        }
    }
}
1360
/// Test helper: re-read the execution log of `execution_id` every 10 ms until `predicate`
/// returns `Some`, and return that value.
///
/// With `timeout` set, it gives up after that duration with
/// `DbErrorReadWithTimeout::Timeout(TimeoutOutcome::Timeout)`. Without it, it polls until the
/// predicate matches. Read errors of `DbConnectionTest::get` are returned immediately.
#[cfg(feature = "test")]
pub async fn wait_for_pending_state_fn<T: Debug>(
    db_connection: &dyn DbConnectionTest,
    execution_id: &ExecutionId,
    predicate: impl Fn(ExecutionLog) -> Option<T> + Send,
    timeout: Option<Duration>,
) -> Result<T, DbErrorReadWithTimeout> {
    tracing::trace!(%execution_id, "Waiting for predicate");
    let poll_until_matched = async move {
        loop {
            let execution_log = db_connection.get(execution_id).await?;
            match predicate(execution_log) {
                Some(t) => {
                    tracing::debug!(%execution_id, "Found: {t:?}");
                    return Ok(t);
                }
                None => tokio::time::sleep(Duration::from_millis(10)).await,
            }
        }
    };

    let Some(timeout) = timeout else {
        return poll_until_matched.await;
    };
    tokio::select! { // future's liveness: Dropping the loser immediately.
        res = poll_until_matched => res,
        () = tokio::time::sleep(timeout) => Err(DbErrorReadWithTimeout::Timeout(TimeoutOutcome::Timeout))
    }
}
1389
/// A timer reported by `DbConnection::get_expired_timers`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExpiredTimer {
    /// An execution lock whose expiry passed.
    Lock(ExpiredLock),
    /// A delay request (async timer) whose time passed.
    Delay(ExpiredDelay),
}

/// An expired execution lock together with the data needed to decide on a retry.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExpiredLock {
    pub execution_id: ExecutionId,
    /// Version of last `Locked` event, used to detect whether the execution made progress.
    pub locked_at_version: Version,
    /// Version at which the next event would be appended.
    pub next_version: Version,
    /// As the execution may still be running, this represents the number of intermittent failures + timeouts prior to this execution.
    pub intermittent_event_count: u32,
    /// Retry limit (semantics of `None` are up to the component retry config — TODO confirm).
    pub max_retries: Option<u32>,
    /// Base duration of the exponential retry backoff, per the field name.
    pub retry_exp_backoff: Duration,
    /// Holder of the expired lock.
    pub locked_by: LockedBy,
}

/// An expired delay request: the execution and join set awaiting `delay_id`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExpiredDelay {
    pub execution_id: ExecutionId,
    pub join_set_id: JoinSetId,
    pub delay_id: DelayId,
}
1415
/// Current state of an execution, derived from its log. Serialized with a `status` tag
/// in snake_case (e.g. `"pending_at"`).
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum PendingState {
    /// Held by an executor until `lock_expires_at`.
    Locked(PendingStateLocked),
    /// Can be locked from `scheduled_at` on, e.g. when created with a schedule, or after a
    /// temporary timeout/failure.
    #[display("PendingAt(`{scheduled_at}`)")]
    PendingAt {
        scheduled_at: DateTime<Utc>,
        last_lock: Option<LockedBy>, // Needed for lock extension
    },
    #[display("BlockedByJoinSet({join_set_id},`{lock_expires_at}`)")]
    /// Caused by [`HistoryEvent::JoinNext`]
    BlockedByJoinSet {
        join_set_id: JoinSetId,
        /// See [`HistoryEvent::JoinNext::lock_expires_at`].
        lock_expires_at: DateTime<Utc>,
        /// Blocked by closing of the join set
        closing: bool,
    },
    /// Terminal state; the payload is flattened into the serialized object.
    #[display("Finished({finished})")]
    Finished {
        #[serde(flatten)]
        finished: PendingStateFinished,
    },
}
1440
/// Payload of `PendingState::Locked`: who holds the lock and until when.
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize)]
#[display("Locked(`{lock_expires_at}`, {}, {})", locked_by.executor_id, locked_by.run_id)]
pub struct PendingStateLocked {
    pub locked_by: LockedBy,
    pub lock_expires_at: DateTime<Utc>,
}

/// Identifies a lock holder. `PendingState::can_append_lock` allows extending a lock only
/// when both the executor and the run match.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LockedBy {
    pub executor_id: ExecutorId,
    pub run_id: RunId,
}

/// Payload of `PendingState::Finished`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct PendingStateFinished {
    /// Version of the finishing event (presumably — confirm with storage impls).
    pub version: VersionType, // not Version since it must be Copy
    pub finished_at: DateTime<Utc>,
    pub result_kind: PendingStateFinishedResultKind,
}
1460impl Display for PendingStateFinished {
1461    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1462        match self.result_kind {
1463            PendingStateFinishedResultKind::Ok => write!(f, "ok"),
1464            PendingStateFinishedResultKind::Err(err) => write!(f, "{err}"),
1465        }
1466    }
1467}
1468
1469// This is not a Result so that it can be customized for serialization
1470#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
1471#[serde(rename_all = "snake_case")]
1472pub enum PendingStateFinishedResultKind {
1473    Ok,
1474    Err(PendingStateFinishedError),
1475}
1476impl PendingStateFinishedResultKind {
1477    pub fn as_result(&self) -> Result<(), &PendingStateFinishedError> {
1478        match self {
1479            PendingStateFinishedResultKind::Ok => Ok(()),
1480            PendingStateFinishedResultKind::Err(err) => Err(err),
1481        }
1482    }
1483}
1484
1485impl From<&SupportedFunctionReturnValue> for PendingStateFinishedResultKind {
1486    fn from(result: &SupportedFunctionReturnValue) -> Self {
1487        result.as_pending_state_finished_result()
1488    }
1489}
1490
/// Error category of a finished execution, carried by `PendingStateFinishedResultKind::Err`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, derive_more::Display)]
#[serde(rename_all = "snake_case")]
pub enum PendingStateFinishedError {
    /// The execution was terminated with the given failure kind (e.g. `TimedOut`, `Cancelled`).
    #[display("execution terminated: {_0}")]
    ExecutionFailure(ExecutionFailureKind),
    /// The execution completed, but its return value is an error.
    #[display("execution completed with an error")]
    Error,
}
1499
1500impl PendingState {
1501    #[instrument(skip(self))]
1502    pub fn can_append_lock(
1503        &self,
1504        created_at: DateTime<Utc>,
1505        executor_id: ExecutorId,
1506        run_id: RunId,
1507        lock_expires_at: DateTime<Utc>,
1508    ) -> Result<LockKind, DbErrorWriteNonRetriable> {
1509        if lock_expires_at <= created_at {
1510            return Err(DbErrorWriteNonRetriable::ValidationFailed(
1511                "invalid expiry date".into(),
1512            ));
1513        }
1514        match self {
1515            PendingState::PendingAt {
1516                scheduled_at,
1517                last_lock,
1518            } => {
1519                if *scheduled_at <= created_at {
1520                    // pending now, ok to lock
1521                    Ok(LockKind::CreatingNewLock)
1522                } else if let Some(LockedBy {
1523                    executor_id: last_executor_id,
1524                    run_id: last_run_id,
1525                }) = last_lock
1526                    && executor_id == *last_executor_id
1527                    && run_id == *last_run_id
1528                {
1529                    // Original executor is extending the lock.
1530                    Ok(LockKind::Extending)
1531                } else {
1532                    Err(DbErrorWriteNonRetriable::ValidationFailed(
1533                        "cannot lock, not yet pending".into(),
1534                    ))
1535                }
1536            }
1537            PendingState::Locked(PendingStateLocked {
1538                locked_by:
1539                    LockedBy {
1540                        executor_id: current_pending_state_executor_id,
1541                        run_id: current_pending_state_run_id,
1542                    },
1543                lock_expires_at: _,
1544            }) => {
1545                if executor_id == *current_pending_state_executor_id
1546                    && run_id == *current_pending_state_run_id
1547                {
1548                    // Original executor is extending the lock.
1549                    Ok(LockKind::Extending)
1550                } else {
1551                    Err(DbErrorWriteNonRetriable::IllegalState {
1552                        reason: "cannot lock, already locked".into(),
1553                        context: SpanTrace::capture(),
1554                        source: None,
1555                        loc: Location::caller(),
1556                    })
1557                }
1558            }
1559            PendingState::BlockedByJoinSet { .. } => Err(DbErrorWriteNonRetriable::IllegalState {
1560                reason: "cannot append Locked event when in BlockedByJoinSet state".into(),
1561                context: SpanTrace::capture(),
1562                source: None,
1563                loc: Location::caller(),
1564            }),
1565            PendingState::Finished { .. } => Err(DbErrorWriteNonRetriable::IllegalState {
1566                reason: "already finished".into(),
1567                context: SpanTrace::capture(),
1568                source: None,
1569                loc: Location::caller(),
1570            }),
1571        }
1572    }
1573
1574    #[must_use]
1575    pub fn is_finished(&self) -> bool {
1576        matches!(self, PendingState::Finished { .. })
1577    }
1578}
1579
/// Kind of lock acquisition allowed by `PendingState::can_append_lock`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LockKind {
    /// The requester (same executor and run) already holds, or last held, the lock.
    Extending,
    /// The execution is pending now and a fresh lock is being taken.
    CreatingNewLock,
}
1585
/// Serializable traces of outgoing HTTP requests, attached to `ExecutionRequest::Finished`
/// via `http_client_traces`.
pub mod http_client_trace {
    use chrono::{DateTime, Utc};
    use serde::{Deserialize, Serialize};

    /// One outgoing HTTP request and, if available, its response.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    pub struct HttpClientTrace {
        pub req: RequestTrace,
        /// Presumably `None` when no response was recorded (e.g. still in flight) — confirm.
        pub resp: Option<ResponseTrace>,
    }

    /// The request side of a trace.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    pub struct RequestTrace {
        pub sent_at: DateTime<Utc>,
        pub uri: String,
        pub method: String,
    }

    /// The response side of a trace.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    pub struct ResponseTrace {
        pub finished_at: DateTime<Utc>,
        /// Presumably the HTTP status code, or an error description when no status was received.
        pub status: Result<u16, String>,
    }
}
1609
#[cfg(test)]
mod tests {
    use super::HistoryEventScheduleAt;
    use super::PendingStateFinished;
    use super::PendingStateFinishedError;
    use super::PendingStateFinishedResultKind;
    use crate::ExecutionFailureKind;
    use crate::SupportedFunctionReturnValue;
    use chrono::DateTime;
    use chrono::Datelike;
    use chrono::Utc;
    use insta::assert_snapshot;
    use rstest::rstest;
    use std::time::Duration;
    use val_json::type_wrapper::TypeWrapper;
    use val_json::wast_val::WastVal;
    use val_json::wast_val::WastValWithType;

    // JSON round-trip of both `PendingStateFinishedResultKind` variants.
    #[rstest(expected => [
        PendingStateFinishedResultKind::Ok,
        PendingStateFinishedResultKind::Err(PendingStateFinishedError::ExecutionFailure(ExecutionFailureKind::TimedOut)),
    ])]
    #[test]
    fn serde_pending_state_finished_result_kind_should_work(
        expected: PendingStateFinishedResultKind,
    ) {
        let ser = serde_json::to_string(&expected).unwrap();
        let actual: PendingStateFinishedResultKind = serde_json::from_str(&ser).unwrap();
        assert_eq!(expected, actual);
    }

    // JSON round-trip of `PendingStateFinished` for both result kinds.
    #[rstest(result_kind => [
        PendingStateFinishedResultKind::Ok,
        PendingStateFinishedResultKind::Err(PendingStateFinishedError::ExecutionFailure(ExecutionFailureKind::TimedOut)),
    ])]
    #[test]
    fn serde_pending_state_finished_should_work(result_kind: PendingStateFinishedResultKind) {
        let expected = PendingStateFinished {
            version: 0,
            finished_at: Utc::now(),
            result_kind,
        };

        let ser = serde_json::to_string(&expected).unwrap();
        let actual: PendingStateFinished = serde_json::from_str(&ser).unwrap();
        assert_eq!(expected, actual);
    }

    // `result<option<string>, string>` holding `ok(none)` must survive a JSON round-trip;
    // the serialized form is pinned by an insta snapshot named after this test.
    #[test]
    fn join_set_deser_with_result_ok_option_none_should_work() {
        let expected = SupportedFunctionReturnValue::Ok {
            ok: Some(WastValWithType {
                r#type: TypeWrapper::Result {
                    ok: Some(Box::new(TypeWrapper::Option(Box::new(TypeWrapper::String)))),
                    err: Some(Box::new(TypeWrapper::String)),
                },
                value: WastVal::Result(Ok(Some(Box::new(WastVal::Option(None))))),
            }),
        };
        let json = serde_json::to_string(&expected).unwrap();
        assert_snapshot!(json);

        let actual: SupportedFunctionReturnValue = serde_json::from_str(&json).unwrap();

        assert_eq!(expected, actual);
    }

    // u32::MAX seconds after the Unix epoch lands in year 2106.
    #[test]
    fn as_date_time_should_work_with_duration_u32_max_secs() {
        let duration = Duration::from_secs(u64::from(u32::MAX));
        let schedule_at = HistoryEventScheduleAt::In(duration);
        let resolved = schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap();
        assert_eq!(2106, resolved.year());
    }

    // chrono's `TimeDelta` holds at most `i64::MAX` milliseconds, i.e. this many whole seconds.
    const MILLIS_PER_SEC: i64 = 1000;
    const TIMEDELTA_MAX_SECS: i64 = i64::MAX / MILLIS_PER_SEC;

    // Despite the name, the boundary exercised is `TIMEDELTA_MAX_SECS + 1` seconds.
    #[test]
    fn as_date_time_should_fail_on_duration_secs_greater_than_i64_max() {
        // Fails on duration -> timedelta conversion, but a smaller duration can fail on datetime + timedelta
        let duration = Duration::from_secs(
            u64::try_from(TIMEDELTA_MAX_SECS).expect("positive number must not fail") + 1,
        );
        let schedule_at = HistoryEventScheduleAt::In(duration);
        schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap_err();
    }
}