obeli_sk_concepts/
storage.rs

1use crate::ComponentId;
2use crate::ComponentRetryConfig;
3use crate::ExecutionFailureKind;
4use crate::ExecutionId;
5use crate::ExecutionMetadata;
6use crate::FinishedExecutionError;
7use crate::FunctionFqn;
8use crate::JoinSetId;
9use crate::Params;
10use crate::StrVariant;
11use crate::SupportedFunctionReturnValue;
12use crate::component_id::InputContentDigest;
13use crate::prefixed_ulid::DelayId;
14use crate::prefixed_ulid::ExecutionIdDerived;
15use crate::prefixed_ulid::ExecutorId;
16use crate::prefixed_ulid::RunId;
17use assert_matches::assert_matches;
18use async_trait::async_trait;
19use chrono::TimeDelta;
20use chrono::{DateTime, Utc};
21use http_client_trace::HttpClientTrace;
22use serde::Deserialize;
23use serde::Serialize;
24use std::fmt::Debug;
25use std::fmt::Display;
26use std::pin::Pin;
27use std::sync::Arc;
28use std::time::Duration;
29use strum::IntoStaticStr;
30use tracing::debug;
31use tracing::error;
32
// Plain-string names of pending states and of one history event type.
// NOTE(review): presumably used where states must be matched as text (e.g. persisted
// columns or filters) — confirm against the callers.

/// String name of the `PendingAt` state.
pub const STATE_PENDING_AT: &str = "PendingAt";
/// String name of the `BlockedByJoinSet` state.
pub const STATE_BLOCKED_BY_JOIN_SET: &str = "BlockedByJoinSet";
/// String name of the `Locked` state.
pub const STATE_LOCKED: &str = "Locked";
/// String name of the `Finished` state.
pub const STATE_FINISHED: &str = "Finished";
/// String name of the `JoinNext` history event type.
pub const HISTORY_EVENT_TYPE_JOIN_NEXT: &str = "JoinNext";
38
39/// Remote client representation of the execution journal.
40#[derive(Debug, PartialEq, Eq)]
41#[cfg_attr(feature = "test", derive(Serialize))]
42pub struct ExecutionLog {
43    pub execution_id: ExecutionId,
44    pub events: Vec<ExecutionEvent>,
45    pub responses: Vec<JoinSetResponseEventOuter>,
46    pub next_version: Version, // Is not advanced once in Finished state
47    pub pending_state: PendingState,
48}
49
50impl ExecutionLog {
51    /// Return some duration after which the execution will be retried.
52    /// Return `None` if no more retries are allowed.
53    #[must_use]
54    pub fn can_be_retried_after(
55        temporary_event_count: u32,
56        max_retries: Option<u32>,
57        retry_exp_backoff: Duration,
58    ) -> Option<Duration> {
59        // If max_retries == None, wrapping is OK after this succeeds - we want to retry forever.
60        if temporary_event_count <= max_retries.unwrap_or(u32::MAX) {
61            // TODO: Add test for number of retries
62            let duration = retry_exp_backoff * 2_u32.saturating_pow(temporary_event_count - 1);
63            Some(duration)
64        } else {
65            None
66        }
67    }
68
69    #[must_use]
70    pub fn compute_retry_duration_when_retrying_forever(
71        temporary_event_count: u32,
72        retry_exp_backoff: Duration,
73    ) -> Duration {
74        Self::can_be_retried_after(temporary_event_count, None, retry_exp_backoff)
75            .expect("`max_retries` set to MAX must never return None")
76    }
77
78    #[must_use]
79    pub fn ffqn(&self) -> &FunctionFqn {
80        assert_matches!(self.events.first(), Some(ExecutionEvent {
81            event: ExecutionRequest::Created { ffqn, .. },
82            ..
83        }) => ffqn)
84    }
85
86    #[must_use]
87    pub fn parent(&self) -> Option<(ExecutionId, JoinSetId)> {
88        assert_matches!(self.events.first(), Some(ExecutionEvent {
89            event: ExecutionRequest::Created { parent, .. },
90            ..
91        }) => parent.clone())
92    }
93
94    #[must_use]
95    pub fn last_event(&self) -> &ExecutionEvent {
96        self.events.last().expect("must contain at least one event")
97    }
98
99    #[must_use]
100    pub fn into_finished_result(mut self) -> Option<SupportedFunctionReturnValue> {
101        if let ExecutionEvent {
102            event: ExecutionRequest::Finished { result, .. },
103            ..
104        } = self.events.pop().expect("must contain at least one event")
105        {
106            Some(result)
107        } else {
108            None
109        }
110    }
111
112    #[cfg(feature = "test")]
113    pub fn event_history(&self) -> impl Iterator<Item = (HistoryEvent, Version)> + '_ {
114        self.events.iter().filter_map(|event| {
115            if let ExecutionRequest::HistoryEvent { event: eh, .. } = &event.event {
116                Some((eh.clone(), event.version.clone()))
117            } else {
118                None
119            }
120        })
121    }
122
123    #[cfg(feature = "test")]
124    #[must_use]
125    pub fn find_join_set_request(&self, join_set_id: &JoinSetId) -> Option<&JoinSetRequest> {
126        self.events
127            .iter()
128            .find_map(move |event| match &event.event {
129                ExecutionRequest::HistoryEvent {
130                    event:
131                        HistoryEvent::JoinSetRequest {
132                            join_set_id: found,
133                            request,
134                        },
135                    ..
136                } if *join_set_id == *found => Some(request),
137                _ => None,
138            })
139    }
140}
141
142pub type VersionType = u32;
143#[derive(
144    Debug,
145    Default,
146    Clone,
147    PartialEq,
148    Eq,
149    Hash,
150    derive_more::Display,
151    derive_more::Into,
152    serde::Serialize,
153    serde::Deserialize,
154)]
155#[serde(transparent)]
156pub struct Version(pub VersionType);
157impl Version {
158    #[must_use]
159    pub fn new(arg: VersionType) -> Version {
160        Version(arg)
161    }
162
163    #[must_use]
164    pub fn increment(&self) -> Version {
165        Version(self.0 + 1)
166    }
167}
168impl TryFrom<i64> for Version {
169    type Error = VersionParseError;
170    fn try_from(value: i64) -> Result<Self, Self::Error> {
171        VersionType::try_from(value)
172            .map(Version::new)
173            .map_err(|_| VersionParseError)
174    }
175}
176impl From<Version> for usize {
177    fn from(value: Version) -> Self {
178        usize::try_from(value.0).expect("16 bit systems are unsupported")
179    }
180}
181impl From<&Version> for usize {
182    fn from(value: &Version) -> Self {
183        usize::try_from(value.0).expect("16 bit systems are unsupported")
184    }
185}
186
187#[derive(Debug, thiserror::Error)]
188#[error("version must be u32")]
189pub struct VersionParseError;
190
191#[derive(
192    Clone, Debug, derive_more::Display, PartialEq, Eq, serde::Serialize, serde::Deserialize,
193)]
194#[display("{event}")]
195pub struct ExecutionEvent {
196    pub created_at: DateTime<Utc>,
197    pub event: ExecutionRequest,
198    #[serde(skip_serializing_if = "Option::is_none")]
199    pub backtrace_id: Option<Version>,
200    pub version: Version,
201}
202
203/// Moves the execution to [`PendingState::PendingNow`] if it is currently blocked on `JoinNextBlocking`.
204#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
205pub struct JoinSetResponseEventOuter {
206    pub created_at: DateTime<Utc>,
207    pub event: JoinSetResponseEvent,
208}
209
210#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
211pub struct JoinSetResponseEvent {
212    pub join_set_id: JoinSetId,
213    pub event: JoinSetResponse,
214}
215
216#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, derive_more::Display)]
217#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
218#[serde(tag = "type")]
219pub enum JoinSetResponse {
220    #[display("delay {}: {delay_id}", if result.is_ok() { "finished" } else { "cancelled"})]
221    DelayFinished {
222        delay_id: DelayId,
223        result: Result<(), ()>,
224    },
225    #[display("{result}: {child_execution_id}")] // execution completed..
226    ChildExecutionFinished {
227        child_execution_id: ExecutionIdDerived,
228        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Version(2)))]
229        finished_version: Version,
230        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
231        result: SupportedFunctionReturnValue,
232    },
233}
234
235pub const DUMMY_CREATED: ExecutionRequest = ExecutionRequest::Created {
236    ffqn: FunctionFqn::new_static("", ""),
237    params: Params::empty(),
238    parent: None,
239    scheduled_at: DateTime::from_timestamp_nanos(0),
240    component_id: ComponentId::dummy_activity(),
241    metadata: ExecutionMetadata::empty(),
242    scheduled_by: None,
243};
244pub const DUMMY_HISTORY_EVENT: ExecutionRequest = ExecutionRequest::HistoryEvent {
245    event: HistoryEvent::JoinSetCreate {
246        join_set_id: JoinSetId {
247            kind: crate::JoinSetKind::OneOff,
248            name: StrVariant::empty(),
249        },
250    },
251};
252
253#[derive(
254    Clone,
255    derive_more::Debug,
256    derive_more::Display,
257    PartialEq,
258    Eq,
259    Serialize,
260    Deserialize,
261    IntoStaticStr,
262)]
263#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
264#[allow(clippy::large_enum_variant)]
265pub enum ExecutionRequest {
266    #[display("Created({ffqn}, `{scheduled_at}`)")]
267    Created {
268        ffqn: FunctionFqn,
269        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
270        #[debug(skip)]
271        params: Params,
272        parent: Option<(ExecutionId, JoinSetId)>,
273        scheduled_at: DateTime<Utc>,
274        #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
275        component_id: ComponentId,
276        #[cfg_attr(any(test, feature = "test"), arbitrary(default))]
277        metadata: ExecutionMetadata,
278        scheduled_by: Option<ExecutionId>,
279    },
280    Locked(Locked),
281    /// Returns execution to [`PendingState::PendingNow`] state
282    /// without timing out. This can happen when the executor is running
283    /// out of resources like [`WorkerError::LimitReached`] or when
284    /// the executor is shutting down.
285    #[display("Unlocked(`{backoff_expires_at}`)")]
286    Unlocked {
287        backoff_expires_at: DateTime<Utc>,
288        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
289        reason: StrVariant,
290    },
291    // Created by the executor holding the lock.
292    // After expiry interpreted as pending.
293    #[display("TemporarilyFailed(`{backoff_expires_at}`)")]
294    TemporarilyFailed {
295        backoff_expires_at: DateTime<Utc>,
296        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
297        reason: StrVariant,
298        detail: Option<String>,
299        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
300        http_client_traces: Option<Vec<HttpClientTrace>>,
301    },
302    // Created by the executor holding the lock.
303    // After expiry interpreted as pending.
304    #[display("TemporarilyTimedOut(`{backoff_expires_at}`)")]
305    TemporarilyTimedOut {
306        backoff_expires_at: DateTime<Utc>,
307        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
308        http_client_traces: Option<Vec<HttpClientTrace>>,
309    },
310    // Created by the executor holding the lock.
311    #[display("Finished")]
312    Finished {
313        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
314        result: SupportedFunctionReturnValue,
315        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
316        http_client_traces: Option<Vec<HttpClientTrace>>,
317    },
318
319    #[display("HistoryEvent({event})")]
320    HistoryEvent {
321        event: HistoryEvent,
322    },
323}
324
325impl ExecutionRequest {
326    #[must_use]
327    pub fn is_temporary_event(&self) -> bool {
328        matches!(
329            self,
330            Self::TemporarilyFailed { .. } | Self::TemporarilyTimedOut { .. }
331        )
332    }
333
334    #[must_use]
335    pub fn variant(&self) -> &'static str {
336        Into::<&'static str>::into(self)
337    }
338
339    #[must_use]
340    pub fn join_set_id(&self) -> Option<&JoinSetId> {
341        match self {
342            Self::Created {
343                parent: Some((_parent_id, join_set_id)),
344                ..
345            } => Some(join_set_id),
346            Self::HistoryEvent {
347                event:
348                    HistoryEvent::JoinSetCreate { join_set_id, .. }
349                    | HistoryEvent::JoinSetRequest { join_set_id, .. }
350                    | HistoryEvent::JoinNext { join_set_id, .. },
351            } => Some(join_set_id),
352            _ => None,
353        }
354    }
355}
356
357#[derive(
358    Clone, derive_more::Debug, derive_more::Display, PartialEq, Eq, Serialize, Deserialize,
359)]
360#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
361#[display("Locked(`{lock_expires_at}`, {component_id})")]
362pub struct Locked {
363    #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
364    pub component_id: ComponentId,
365    pub executor_id: ExecutorId,
366    pub run_id: RunId,
367    pub lock_expires_at: DateTime<Utc>,
368    #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentRetryConfig::ZERO))]
369    pub retry_config: ComponentRetryConfig,
370}
371
372#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
373#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
374#[serde(tag = "type")]
375pub enum PersistKind {
376    #[display("RandomU64({min}, {max_inclusive})")]
377    RandomU64 { min: u64, max_inclusive: u64 },
378    #[display("RandomString({min_length}, {max_length_exclusive})")]
379    RandomString {
380        min_length: u64,
381        max_length_exclusive: u64,
382    },
383}
384
/// Encode `value` as 8 big-endian bytes (the inverse of `from_bytes_to_u64`).
#[must_use]
pub fn from_u64_to_bytes(value: u64) -> [u8; 8] {
    u64::to_be_bytes(value)
}
389
/// Decode 8 big-endian bytes into a `u64` (the inverse of `from_u64_to_bytes`).
#[must_use]
pub fn from_bytes_to_u64(bytes: [u8; 8]) -> u64 {
    <u64>::from_be_bytes(bytes)
}
394
395#[derive(
396    derive_more::Debug, Clone, PartialEq, Eq, derive_more::Display, Serialize, Deserialize,
397)]
398#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
399#[serde(tag = "type")]
400/// Must be created by the executor in [`PendingState::Locked`].
401pub enum HistoryEvent {
402    /// Persist a generated pseudorandom value.
403    #[display("Persist")]
404    Persist {
405        #[debug(skip)]
406        value: Vec<u8>, // Only stored for nondeterminism checks. TODO: Consider using a hashed value or just the intention.
407        kind: PersistKind,
408    },
409    #[display("JoinSetCreate({join_set_id})")]
410    JoinSetCreate { join_set_id: JoinSetId },
411    #[display("JoinSetRequest({request})")]
412    // join_set_id is part of ExecutionId or DelayId in the `request`
413    JoinSetRequest {
414        join_set_id: JoinSetId,
415        request: JoinSetRequest,
416    },
417    /// Sets the pending state to [`PendingState::BlockedByJoinSet`].
418    /// When the response arrives at `resp_time`:
419    /// The execution is [`PendingState::PendingAt`]`(max(resp_time, lock_expires_at)`, so that the
420    /// original executor can continue. After the expiry any executor can continue without
421    /// marking the execution as timed out.
422    #[display("JoinNext({join_set_id})")]
423    JoinNext {
424        join_set_id: JoinSetId,
425        /// Set to a future time if the worker is keeping the execution invocation warm waiting for the result.
426        /// The pending status will be kept in Locked state until `run_expires_at`.
427        run_expires_at: DateTime<Utc>,
428        /// Set to a specific function when calling `-await-next` extension function, used for
429        /// determinism checks.
430        requested_ffqn: Option<FunctionFqn>,
431        /// Closing request must never set `requested_ffqn` and is ignored by determinism checks.
432        closing: bool,
433    },
434    /// Records the fact that a join set was awaited more times than its submission count.
435    #[display("JoinNextTooMany({join_set_id})")]
436    JoinNextTooMany {
437        join_set_id: JoinSetId,
438        /// Set to a specific function when calling `-await-next` extension function, used for
439        /// determinism checks.
440        requested_ffqn: Option<FunctionFqn>,
441    },
442    #[display("Schedule({execution_id}, {schedule_at})")]
443    Schedule {
444        execution_id: ExecutionId,
445        schedule_at: HistoryEventScheduleAt, // Stores intention to schedule an execution at a date/offset
446    },
447    #[display("Stub({target_execution_id})")]
448    Stub {
449        target_execution_id: ExecutionIdDerived,
450        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
451        result: SupportedFunctionReturnValue, // Only stored for nondeterminism checks. TODO: Consider using a hashed value.
452        persist_result: Result<(), ()>, // Does the row (target_execution_id,Version:1) match the proposed `result`?
453    },
454}
455
456#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
457#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
458pub enum HistoryEventScheduleAt {
459    Now,
460    #[display("At(`{_0}`)")]
461    At(DateTime<Utc>),
462    #[display("In({_0:?})")]
463    In(Duration),
464}
465
466#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
467pub enum ScheduleAtConversionError {
468    #[error("source duration value is out of range")]
469    OutOfRangeError,
470}
471
472impl HistoryEventScheduleAt {
473    pub fn as_date_time(
474        &self,
475        now: DateTime<Utc>,
476    ) -> Result<DateTime<Utc>, ScheduleAtConversionError> {
477        match self {
478            Self::Now => Ok(now),
479            Self::At(date_time) => Ok(*date_time),
480            Self::In(duration) => {
481                let time_delta = TimeDelta::from_std(*duration)
482                    .map_err(|_| ScheduleAtConversionError::OutOfRangeError)?;
483                now.checked_add_signed(time_delta)
484                    .ok_or(ScheduleAtConversionError::OutOfRangeError)
485            }
486        }
487    }
488}
489
490#[derive(Clone, Debug, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
491#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
492#[serde(tag = "type")]
493pub enum JoinSetRequest {
494    // Must be created by the executor in `PendingState::Locked`.
495    #[display("DelayRequest({delay_id}, expires_at: `{expires_at}`, schedule_at: `{schedule_at}`)")]
496    DelayRequest {
497        delay_id: DelayId,
498        expires_at: DateTime<Utc>,
499        schedule_at: HistoryEventScheduleAt,
500    },
501    // Must be created by the executor in `PendingState::Locked`.
502    #[display("ChildExecutionRequest({child_execution_id}, {target_ffqn}, params: {params})")]
503    ChildExecutionRequest {
504        child_execution_id: ExecutionIdDerived,
505        target_ffqn: FunctionFqn,
506        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
507        params: Params,
508    },
509}
510
511/// Error that is not specific to an execution.
512#[derive(Debug, Clone, thiserror::Error, PartialEq)]
513pub enum DbErrorGeneric {
514    #[error("database error: {0}")]
515    Uncategorized(StrVariant),
516    #[error("database was closed")]
517    Close,
518}
519
520#[derive(thiserror::Error, Clone, Debug, PartialEq, Eq)]
521pub enum DbErrorWriteNonRetriable {
522    #[error("validation failed: {0}")]
523    ValidationFailed(StrVariant),
524    #[error("conflict")]
525    Conflict,
526    #[error("illegal state: {0}")]
527    IllegalState(StrVariant),
528    #[error("version conflict: expected: {expected}, got: {requested}")]
529    VersionConflict {
530        expected: Version,
531        requested: Version,
532    },
533}
534
535/// Write error tied to an execution
536#[derive(Debug, Clone, thiserror::Error, PartialEq)]
537pub enum DbErrorWrite {
538    #[error("cannot write - row not found")]
539    NotFound,
540    #[error("non-retriable error: {0}")]
541    NonRetriable(#[from] DbErrorWriteNonRetriable),
542    #[error(transparent)]
543    Generic(#[from] DbErrorGeneric),
544}
545
546/// Read error tied to an execution
547#[derive(Debug, Clone, thiserror::Error, PartialEq)]
548pub enum DbErrorRead {
549    #[error("cannot read - row not found")]
550    NotFound,
551    #[error(transparent)]
552    Generic(#[from] DbErrorGeneric),
553}
554
555impl From<DbErrorRead> for DbErrorWrite {
556    fn from(value: DbErrorRead) -> DbErrorWrite {
557        match value {
558            DbErrorRead::NotFound => DbErrorWrite::NotFound,
559            DbErrorRead::Generic(err) => DbErrorWrite::Generic(err),
560        }
561    }
562}
563
564#[derive(Debug, thiserror::Error, PartialEq)]
565pub enum DbErrorReadWithTimeout {
566    #[error("timeout")]
567    Timeout(TimeoutOutcome),
568    #[error(transparent)]
569    DbErrorRead(#[from] DbErrorRead),
570}
571impl From<DbErrorGeneric> for DbErrorReadWithTimeout {
572    fn from(value: DbErrorGeneric) -> DbErrorReadWithTimeout {
573        Self::from(DbErrorRead::from(value))
574    }
575}
576
577// Represents next version after successfuly appended to execution log.
578// TODO: Convert to struct with next_version
579pub type AppendResponse = Version;
580pub type PendingExecution = (ExecutionId, Version, Params, Option<DateTime<Utc>>);
581
582#[derive(Debug, Clone)]
583pub struct LockedExecution {
584    pub execution_id: ExecutionId,
585    pub next_version: Version,
586    pub metadata: ExecutionMetadata,
587    pub locked_event: Locked,
588    pub ffqn: FunctionFqn,
589    pub params: Params,
590    pub event_history: Vec<(HistoryEvent, Version)>,
591    pub responses: Vec<JoinSetResponseEventOuter>,
592    pub parent: Option<(ExecutionId, JoinSetId)>,
593    pub intermittent_event_count: u32,
594}
595
596pub type LockPendingResponse = Vec<LockedExecution>;
597pub type AppendBatchResponse = Version;
598
599#[derive(Debug, Clone, derive_more::Display, Serialize, Deserialize)]
600#[display("{event}")]
601pub struct AppendRequest {
602    pub created_at: DateTime<Utc>,
603    pub event: ExecutionRequest,
604}
605
606#[derive(Debug, Clone)]
607pub struct CreateRequest {
608    pub created_at: DateTime<Utc>,
609    pub execution_id: ExecutionId,
610    pub ffqn: FunctionFqn,
611    pub params: Params,
612    pub parent: Option<(ExecutionId, JoinSetId)>,
613    pub scheduled_at: DateTime<Utc>,
614    pub component_id: ComponentId,
615    pub metadata: ExecutionMetadata,
616    pub scheduled_by: Option<ExecutionId>,
617}
618
619impl From<CreateRequest> for ExecutionRequest {
620    fn from(value: CreateRequest) -> Self {
621        Self::Created {
622            ffqn: value.ffqn,
623            params: value.params,
624            parent: value.parent,
625            scheduled_at: value.scheduled_at,
626            component_id: value.component_id,
627            metadata: value.metadata,
628            scheduled_by: value.scheduled_by,
629        }
630    }
631}
632
633#[async_trait]
634pub trait DbPool: Send + Sync {
635    async fn db_exec_conn(&self) -> Result<Box<dyn DbExecutor>, DbErrorGeneric>;
636
637    async fn connection(&self) -> Result<Box<dyn DbConnection>, DbErrorGeneric>;
638
639    async fn external_api_conn(&self) -> Result<Box<dyn DbExternalApi>, DbErrorGeneric>;
640
641    #[cfg(feature = "test")]
642    async fn connection_test(&self) -> Result<Box<dyn DbConnectionTest>, DbErrorGeneric>;
643}
644
645#[async_trait]
646pub trait DbPoolCloseable {
647    async fn close(&self);
648}
649
650#[derive(Clone, Debug)]
651pub struct AppendEventsToExecution {
652    pub execution_id: ExecutionId,
653    pub version: Version,
654    pub batch: Vec<AppendRequest>,
655}
656
657#[derive(Clone, Debug)]
658pub struct AppendResponseToExecution {
659    pub parent_execution_id: ExecutionId,
660    pub created_at: DateTime<Utc>,
661    pub join_set_id: JoinSetId,
662    pub child_execution_id: ExecutionIdDerived,
663    pub finished_version: Version,
664    pub result: SupportedFunctionReturnValue,
665}
666
667#[async_trait]
668pub trait DbExecutor: Send + Sync {
669    #[expect(clippy::too_many_arguments)]
670    async fn lock_pending_by_ffqns(
671        &self,
672        batch_size: u32,
673        pending_at_or_sooner: DateTime<Utc>,
674        ffqns: Arc<[FunctionFqn]>,
675        created_at: DateTime<Utc>,
676        component_id: ComponentId,
677        executor_id: ExecutorId,
678        lock_expires_at: DateTime<Utc>,
679        run_id: RunId,
680        retry_config: ComponentRetryConfig,
681    ) -> Result<LockPendingResponse, DbErrorGeneric>;
682
683    #[expect(clippy::too_many_arguments)]
684    async fn lock_pending_by_component_id(
685        &self,
686        batch_size: u32,
687        pending_at_or_sooner: DateTime<Utc>,
688        component_id: &ComponentId,
689        created_at: DateTime<Utc>,
690        executor_id: ExecutorId,
691        lock_expires_at: DateTime<Utc>,
692        run_id: RunId,
693        retry_config: ComponentRetryConfig,
694    ) -> Result<LockPendingResponse, DbErrorGeneric>;
695
696    /// Specialized locking for e.g. extending the lock by the original executor and run.
697    #[expect(clippy::too_many_arguments)]
698    async fn lock_one(
699        &self,
700        created_at: DateTime<Utc>,
701        component_id: ComponentId,
702        execution_id: &ExecutionId,
703        run_id: RunId,
704        version: Version,
705        executor_id: ExecutorId,
706        lock_expires_at: DateTime<Utc>,
707        retry_config: ComponentRetryConfig,
708    ) -> Result<LockedExecution, DbErrorWrite>;
709
710    /// Append a single event to an existing execution log.
711    /// The request cannot contain `ExecutionEventInner::Created`.
712    async fn append(
713        &self,
714        execution_id: ExecutionId,
715        version: Version,
716        req: AppendRequest,
717    ) -> Result<AppendResponse, DbErrorWrite>;
718
719    /// Append a batch of events to an existing execution log, and append a response to a parent execution.
720    /// The batch cannot contain `ExecutionEventInner::Created`.
721    async fn append_batch_respond_to_parent(
722        &self,
723        events: AppendEventsToExecution,
724        response: AppendResponseToExecution,
725        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
726    ) -> Result<AppendBatchResponse, DbErrorWrite>;
727
728    /// Notification mechainism with no strict guarantees for waiting while there are no pending executions.
729    /// Return immediately if there are pending notifications at `pending_at_or_sooner`.
730    /// Otherwise wait until `timeout_fut` resolves.
731    /// Delay requests that expire between `pending_at_or_sooner` and timeout can be disregarded.
732    async fn wait_for_pending_by_ffqn(
733        &self,
734        pending_at_or_sooner: DateTime<Utc>,
735        ffqns: Arc<[FunctionFqn]>,
736        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
737    );
738
739    /// Notification mechainism with no strict guarantees for waiting while there are no pending executions.
740    /// Return immediately if there are pending notifications at `pending_at_or_sooner`.
741    /// Otherwise wait until `timeout_fut` resolves.
742    /// Delay requests that expire between `pending_at_or_sooner` and timeout can be disregarded.
743    async fn wait_for_pending_by_component_id(
744        &self,
745        pending_at_or_sooner: DateTime<Utc>,
746        component_id: &ComponentId,
747        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
748    );
749
750    async fn cancel_activity_with_retries(
751        &self,
752        execution_id: &ExecutionId,
753        cancelled_at: DateTime<Utc>,
754    ) -> Result<CancelOutcome, DbErrorWrite> {
755        let mut retries = 5;
756        loop {
757            let res = self.cancel_activity(execution_id, cancelled_at).await;
758            if res.is_ok() || retries == 0 {
759                return res;
760            }
761            retries -= 1;
762        }
763    }
764
765    /// Get last event. Impls may set `ExecutionEvent::backtrace_id` to `None`.
766    async fn get_last_execution_event(
767        &self,
768        execution_id: &ExecutionId,
769    ) -> Result<ExecutionEvent, DbErrorRead>;
770
771    async fn cancel_activity(
772        &self,
773        execution_id: &ExecutionId,
774        cancelled_at: DateTime<Utc>,
775    ) -> Result<CancelOutcome, DbErrorWrite> {
776        debug!("Determining cancellation state of {execution_id}");
777
778        let last_event = self
779            .get_last_execution_event(execution_id)
780            .await
781            .map_err(DbErrorWrite::from)?;
782        if let ExecutionRequest::Finished {
783            result:
784                SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError {
785                    kind: ExecutionFailureKind::Cancelled,
786                    ..
787                }),
788            ..
789        } = last_event.event
790        {
791            return Ok(CancelOutcome::Cancelled);
792        } else if matches!(last_event.event, ExecutionRequest::Finished { .. }) {
793            debug!("Not cancelling, {execution_id} is already finished");
794            return Ok(CancelOutcome::AlreadyFinished);
795        }
796        let finished_version = last_event.version.increment();
797        let child_result = SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError {
798            reason: None,
799            kind: ExecutionFailureKind::Cancelled,
800            detail: None,
801        });
802        let cancel_request = AppendRequest {
803            created_at: cancelled_at,
804            event: ExecutionRequest::Finished {
805                result: child_result.clone(),
806                http_client_traces: None,
807            },
808        };
809        debug!("Cancelling activity {execution_id} at {finished_version}");
810        if let ExecutionId::Derived(execution_id) = execution_id {
811            let (parent_execution_id, join_set_id) = execution_id.split_to_parts();
812            let child_execution_id = ExecutionId::Derived(execution_id.clone());
813            self.append_batch_respond_to_parent(
814                AppendEventsToExecution {
815                    execution_id: child_execution_id,
816                    version: finished_version.clone(),
817                    batch: vec![cancel_request],
818                },
819                AppendResponseToExecution {
820                    parent_execution_id,
821                    created_at: cancelled_at,
822                    join_set_id: join_set_id.clone(),
823                    child_execution_id: execution_id.clone(),
824                    finished_version,
825                    result: child_result,
826                },
827                cancelled_at,
828            )
829            .await?;
830        } else {
831            self.append(execution_id.clone(), finished_version, cancel_request)
832                .await?;
833        }
834        debug!("Cancelled {execution_id}");
835        Ok(CancelOutcome::Cancelled)
836    }
837}
838
/// Outcome of appending a delay response (see `DbConnection::append_delay_response`).
///
/// Derives `Debug` so the outcome can be logged, and `PartialEq`/`Eq` so callers can compare it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AppendDelayResponseOutcome {
    /// The delay response was appended.
    Success,
    // NOTE(review): presumably the delay had already finished, so nothing new was appended — confirm.
    AlreadyFinished,
    // NOTE(review): presumably the delay had already been cancelled, so nothing new was appended — confirm.
    AlreadyCancelled,
}
844
/// Additional query and upgrade operations on executions, on top of [`DbConnection`].
#[async_trait]
pub trait DbExternalApi: DbConnection {
    /// Get a backtrace of the execution selected by `filter`: the first one, the last one,
    /// or the one for a specific version.
    /// NOTE(review): for `BacktraceFilter::Specific`, presumably the backtrace whose
    /// `version_min_including..version_max_excluding` range contains the version — confirm.
    async fn get_backtrace(
        &self,
        execution_id: &ExecutionId,
        filter: BacktraceFilter,
    ) -> Result<BacktraceInfo, DbErrorRead>;

    /// Returns executions sorted in descending order.
    ///
    /// `ffqn` presumably restricts the listing to a single function when set, and
    /// `top_level_only` presumably excludes derived (child) executions — TODO confirm.
    async fn list_executions(
        &self,
        ffqn: Option<FunctionFqn>,
        top_level_only: bool,
        pagination: ExecutionListPagination,
    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric>;

    /// List events of the execution log starting at version `since`, returning at most
    /// `max_length` events (presumably — confirm whether `since` is inclusive).
    /// `include_backtrace_id` controls whether `ExecutionEvent::backtrace_id` is populated.
    async fn list_execution_events(
        &self,
        execution_id: &ExecutionId,
        since: &Version,
        max_length: VersionType,
        include_backtrace_id: bool,
    ) -> Result<Vec<ExecutionEvent>, DbErrorRead>;

    /// Returns responses of an execution ordered as they arrived,
    /// enabling matching each `JoinNext` to its corresponding response.
    /// Each response carries a cursor usable with `pagination`.
    async fn list_responses(
        &self,
        execution_id: &ExecutionId,
        pagination: Pagination<u32>,
    ) -> Result<Vec<ResponseWithCursor>, DbErrorRead>;

    /// Switch the execution's component input digest from `old` to `new`.
    /// NOTE(review): presumably fails when the current digest is not `old` — verify in impls.
    async fn upgrade_execution_component(
        &self,
        execution_id: &ExecutionId,
        old: &InputContentDigest,
        new: &InputContentDigest,
    ) -> Result<(), DbErrorWrite>;
}
885
886#[async_trait]
887pub trait DbConnection: DbExecutor {
888    async fn append_delay_response(
889        &self,
890        created_at: DateTime<Utc>,
891        execution_id: ExecutionId,
892        join_set_id: JoinSetId,
893        delay_id: DelayId,
894        outcome: Result<(), ()>, // Successfully finished - `Ok(())` or cancelled - `Err(())`
895    ) -> Result<AppendDelayResponseOutcome, DbErrorWrite>;
896
897    /// Append a batch of events to an existing execution log, and append a response to a parent execution.
898    /// The batch cannot contain `ExecutionEventInner::Created`.
899    async fn append_batch(
900        &self,
901        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
902        batch: Vec<AppendRequest>,
903        execution_id: ExecutionId,
904        version: Version,
905    ) -> Result<AppendBatchResponse, DbErrorWrite>;
906
907    /// Append one or more events to the parent execution log, and create zero or more child execution logs.
908    /// The batch cannot contain `ExecutionEventInner::Created`.
909    async fn append_batch_create_new_execution(
910        &self,
911        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
912        batch: Vec<AppendRequest>,   // must not contain `ExecutionEventInner::Created` events
913        execution_id: ExecutionId,
914        version: Version,
915        child_req: Vec<CreateRequest>,
916    ) -> Result<AppendBatchResponse, DbErrorWrite>;
917
918    /// Get a single event specified by version. Impls may set `ExecutionEvent::backtrace_id` to `None`.
919    async fn get_execution_event(
920        &self,
921        execution_id: &ExecutionId,
922        version: &Version,
923    ) -> Result<ExecutionEvent, DbErrorRead>;
924
925    async fn get_create_request(
926        &self,
927        execution_id: &ExecutionId,
928    ) -> Result<CreateRequest, DbErrorRead> {
929        let execution_event = self
930            .get_execution_event(execution_id, &Version::new(0))
931            .await?;
932        if let ExecutionRequest::Created {
933            ffqn,
934            params,
935            parent,
936            scheduled_at,
937            component_id,
938            metadata,
939            scheduled_by,
940        } = execution_event.event
941        {
942            Ok(CreateRequest {
943                created_at: execution_event.created_at,
944                execution_id: execution_id.clone(),
945                ffqn,
946                params,
947                parent,
948                scheduled_at,
949                component_id,
950                metadata,
951                scheduled_by,
952            })
953        } else {
954            error!(%execution_id, "Execution log must start with creation");
955            Err(DbErrorRead::Generic(DbErrorGeneric::Uncategorized(
956                "execution log must start with creation".into(),
957            )))
958        }
959    }
960
961    async fn get_pending_state(
962        &self,
963        execution_id: &ExecutionId,
964    ) -> Result<PendingState, DbErrorRead>;
965
966    /// Get currently expired locks and async timers (delay requests)
967    async fn get_expired_timers(
968        &self,
969        at: DateTime<Utc>,
970    ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric>;
971
972    /// Create a new execution log
973    async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite>;
974
975    /// Notification mechainism with no strict guarantees for getting notified when a new response arrives.
976    /// Parameter `start_idx` must be at most be equal to current size of responses in the execution log.
977    /// If no response arrives immediately and `interrupt_after` resolves, `DbErrorReadWithTimeout::Timeout` is returned.
978    /// Implementations with no pubsub support should use polling.
979    /// Callers are expected to call this function in a loop with a reasonable timeout
980    /// to support less stellar implementations.
981    async fn subscribe_to_next_responses(
982        &self,
983        execution_id: &ExecutionId,
984        start_idx: u32,
985        timeout_fut: Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>,
986    ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorReadWithTimeout>;
987
988    /// Notification mechainism with no strict guarantees for getting the finished result.
989    /// Implementations with no pubsub support should use polling.
990    /// Callers are expected to call this function in a loop with a reasonable timeout
991    /// to support less stellar implementations.
992    async fn wait_for_finished_result(
993        &self,
994        execution_id: &ExecutionId,
995        timeout_fut: Option<Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>>,
996        // TODO: camcel fut
997    ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout>;
998
999    async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite>;
1000
1001    async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite>;
1002}
1003
/// Value produced by a timeout future handed to the waiting/subscription methods,
/// telling why the wait should stop.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TimeoutOutcome {
    /// The allotted waiting time elapsed.
    Timeout,
    /// The wait was cancelled.
    Cancel,
}
1009
/// Test-only extensions of [`DbConnection`], compiled with the `test` feature.
#[cfg(feature = "test")]
#[async_trait]
pub trait DbConnectionTest: DbConnection {
    /// Append `response_event` to the responses of the execution `execution_id`.
    async fn append_response(
        &self,
        created_at: DateTime<Utc>,
        execution_id: ExecutionId,
        response_event: JoinSetResponseEvent,
    ) -> Result<(), DbErrorWrite>;

    /// Get execution log.
    async fn get(&self, execution_id: &ExecutionId) -> Result<ExecutionLog, DbErrorRead>;
}
1023
/// Result of a cancellation request.
///
/// `Cancelled` is also reported when the target had already been cancelled earlier
/// (see `cancel_delay`).
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum CancelOutcome {
    /// The target is now cancelled.
    Cancelled,
    /// The target finished before it could be cancelled.
    AlreadyFinished,
}
1029
1030pub async fn stub_execution(
1031    db_connection: &dyn DbConnection,
1032    execution_id: ExecutionIdDerived,
1033    parent_execution_id: ExecutionId,
1034    join_set_id: JoinSetId,
1035    created_at: DateTime<Utc>,
1036    return_value: SupportedFunctionReturnValue,
1037) -> Result<(), DbErrorWrite> {
1038    let stub_finished_version = Version::new(1); // Stub activities have no execution log except Created event.
1039    // Attempt to write to `execution_id` and its parent, ignoring the possible conflict error on this tx
1040    let write_attempt = {
1041        let finished_req = AppendRequest {
1042            created_at,
1043            event: ExecutionRequest::Finished {
1044                result: return_value.clone(),
1045                http_client_traces: None,
1046            },
1047        };
1048        db_connection
1049            .append_batch_respond_to_parent(
1050                AppendEventsToExecution {
1051                    execution_id: ExecutionId::Derived(execution_id.clone()),
1052                    version: stub_finished_version.clone(),
1053                    batch: vec![finished_req],
1054                },
1055                AppendResponseToExecution {
1056                    parent_execution_id,
1057                    created_at,
1058                    join_set_id,
1059                    child_execution_id: execution_id.clone(),
1060                    finished_version: stub_finished_version.clone(),
1061                    result: return_value.clone(),
1062                },
1063                created_at,
1064            )
1065            .await
1066    };
1067    if let Err(write_attempt) = write_attempt {
1068        // Check that the expected value is in the database
1069        debug!("Stub write attempt failed - {write_attempt:?}");
1070
1071        let found = db_connection
1072            .get_execution_event(&ExecutionId::Derived(execution_id), &stub_finished_version)
1073            .await?; // Not found at this point should not happen, unless the previous write failed. Will be retried.
1074        match found.event {
1075            ExecutionRequest::Finished {
1076                result: found_result,
1077                ..
1078            } if return_value == found_result => {
1079                // Same value has already be written, RPC is successful.
1080                Ok(())
1081            }
1082            ExecutionRequest::Finished { .. } => Err(DbErrorWrite::NonRetriable(
1083                DbErrorWriteNonRetriable::Conflict,
1084            )),
1085            _other => Err(DbErrorWrite::NonRetriable(
1086                DbErrorWriteNonRetriable::IllegalState(
1087                    "unexpected execution event at stubbed execution".into(),
1088                ),
1089            )),
1090        }
1091    } else {
1092        Ok(())
1093    }
1094}
1095
1096pub async fn cancel_delay(
1097    db_connection: &dyn DbConnection,
1098    delay_id: DelayId,
1099    created_at: DateTime<Utc>,
1100) -> Result<CancelOutcome, DbErrorWrite> {
1101    let (parent_execution_id, join_set_id) = delay_id.split_to_parts();
1102    db_connection
1103        .append_delay_response(
1104            created_at,
1105            parent_execution_id,
1106            join_set_id,
1107            delay_id,
1108            Err(()), // Mark as cancelled.
1109        )
1110        .await
1111        .map(|ok| match ok {
1112            AppendDelayResponseOutcome::Success | AppendDelayResponseOutcome::AlreadyCancelled => {
1113                CancelOutcome::Cancelled
1114            }
1115            AppendDelayResponseOutcome::AlreadyFinished => CancelOutcome::AlreadyFinished,
1116        })
1117}
1118
1119#[derive(Clone)]
1120pub enum BacktraceFilter {
1121    First,
1122    Last,
1123    Specific(Version),
1124}
1125
1126#[derive(Clone, Debug, PartialEq, Eq)]
1127pub struct BacktraceInfo {
1128    pub execution_id: ExecutionId,
1129    pub component_id: ComponentId,
1130    pub version_min_including: Version,
1131    pub version_max_excluding: Version,
1132    pub wasm_backtrace: WasmBacktrace,
1133}
1134
1135#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
1136pub struct WasmBacktrace {
1137    pub frames: Vec<FrameInfo>,
1138}
1139
1140#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
1141pub struct FrameInfo {
1142    pub module: String,
1143    pub func_name: String,
1144    pub symbols: Vec<FrameSymbol>,
1145}
1146
1147#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
1148pub struct FrameSymbol {
1149    pub func_name: Option<String>,
1150    pub file: Option<String>,
1151    pub line: Option<u32>,
1152    pub col: Option<u32>,
1153}
1154
1155mod wasm_backtrace {
1156    use super::{FrameInfo, FrameSymbol, WasmBacktrace};
1157
1158    impl WasmBacktrace {
1159        pub fn maybe_from(backtrace: &wasmtime::WasmBacktrace) -> Option<Self> {
1160            if backtrace.frames().is_empty() {
1161                None
1162            } else {
1163                Some(Self {
1164                    frames: backtrace.frames().iter().map(FrameInfo::from).collect(),
1165                })
1166            }
1167        }
1168    }
1169
1170    impl From<&wasmtime::FrameInfo> for FrameInfo {
1171        fn from(frame: &wasmtime::FrameInfo) -> Self {
1172            let module_name = frame.module().name().unwrap_or("<unknown>").to_string();
1173            let mut func_name = String::new();
1174            wasmtime_environ::demangle_function_name_or_index(
1175                &mut func_name,
1176                frame.func_name(),
1177                frame.func_index() as usize,
1178            )
1179            .expect("writing to string must succeed");
1180            Self {
1181                module: module_name,
1182                func_name,
1183                symbols: frame
1184                    .symbols()
1185                    .iter()
1186                    .map(std::convert::Into::into)
1187                    .collect(),
1188            }
1189        }
1190    }
1191
1192    impl From<&wasmtime::FrameSymbol> for FrameSymbol {
1193        fn from(symbol: &wasmtime::FrameSymbol) -> Self {
1194            let func_name = symbol.name().map(|name| {
1195                let mut writer = String::new();
1196                wasmtime_environ::demangle_function_name(&mut writer, name)
1197                    .expect("writing to string must succeed");
1198                writer
1199            });
1200
1201            Self {
1202                func_name,
1203                file: symbol.file().map(ToString::to_string),
1204                line: symbol.line(),
1205                col: symbol.column(),
1206            }
1207        }
1208    }
1209}
1210
1211pub type ResponseCursorType = u32;
1212
1213#[derive(Debug, Clone, Serialize)]
1214pub struct ResponseWithCursor {
1215    pub event: JoinSetResponseEventOuter,
1216    pub cursor: ResponseCursorType,
1217}
1218
1219#[derive(Debug)]
1220pub struct ExecutionWithState {
1221    pub execution_id: ExecutionId,
1222    pub ffqn: FunctionFqn,
1223    pub pending_state: PendingState,
1224    pub created_at: DateTime<Utc>,
1225    pub first_scheduled_at: DateTime<Utc>,
1226    pub component_digest: InputContentDigest,
1227}
1228
1229#[derive(Debug, Clone)]
1230pub enum ExecutionListPagination {
1231    CreatedBy(Pagination<Option<DateTime<Utc>>>),
1232    ExecutionId(Pagination<Option<ExecutionId>>),
1233}
1234impl Default for ExecutionListPagination {
1235    fn default() -> ExecutionListPagination {
1236        ExecutionListPagination::CreatedBy(Pagination::OlderThan {
1237            length: 20,
1238            cursor: None,
1239            including_cursor: false, // does not matter when `cursor` is not specified
1240        })
1241    }
1242}
1243impl ExecutionListPagination {
1244    #[must_use]
1245    pub fn length(&self) -> u16 {
1246        match self {
1247            ExecutionListPagination::CreatedBy(pagination) => pagination.length(),
1248            ExecutionListPagination::ExecutionId(pagination) => pagination.length(),
1249        }
1250    }
1251}
1252
/// Cursor-based pagination request.
///
/// `OlderThan` requests descending order and `NewerThan` does not; see
/// [`Pagination::is_desc`] and [`Pagination::rel`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Pagination<T> {
    /// Items whose cursor is greater than `cursor` (or equal to it when `including_cursor`).
    NewerThan {
        /// Page length.
        length: u16,
        /// Reference point of the page.
        cursor: T,
        /// Whether the item at `cursor` itself is included.
        including_cursor: bool,
    },
    /// Items whose cursor is less than `cursor` (or equal to it when `including_cursor`).
    OlderThan {
        /// Page length.
        length: u16,
        /// Reference point of the page.
        cursor: T,
        /// Whether the item at `cursor` itself is included.
        including_cursor: bool,
    },
}
impl<T> Pagination<T> {
    /// Requested page length.
    #[must_use]
    pub fn length(&self) -> u16 {
        match self {
            Pagination::NewerThan { length, .. } | Pagination::OlderThan { length, .. } => *length,
        }
    }

    /// Comparison operator relating an item's cursor to [`Pagination::cursor`]:
    /// `>`/`>=` for `NewerThan`, `<`/`<=` for `OlderThan`, inclusive when `including_cursor`.
    #[must_use]
    pub fn rel(&self) -> &'static str {
        match self {
            Pagination::NewerThan {
                including_cursor: false,
                ..
            } => ">",
            Pagination::NewerThan {
                including_cursor: true,
                ..
            } => ">=",
            Pagination::OlderThan {
                including_cursor: false,
                ..
            } => "<",
            Pagination::OlderThan {
                including_cursor: true,
                ..
            } => "<=",
        }
    }

    /// `true` for `OlderThan`, i.e. when results are requested in descending order.
    #[must_use]
    pub fn is_desc(&self) -> bool {
        matches!(self, Pagination::OlderThan { .. })
    }

    /// Reference point of the page.
    #[must_use]
    pub fn cursor(&self) -> &T {
        match self {
            Pagination::NewerThan { cursor, .. } | Pagination::OlderThan { cursor, .. } => cursor,
        }
    }
}
1301
/// Poll the execution log every 10 ms until `predicate` returns `Some`, and return that value.
///
/// With `timeout` set, gives up with `DbErrorReadWithTimeout::Timeout(TimeoutOutcome::Timeout)`
/// once it elapses; without it, polls indefinitely.
///
/// # Errors
/// Propagates read errors from fetching the execution log.
#[cfg(feature = "test")]
pub async fn wait_for_pending_state_fn<T: Debug>(
    db_connection: &dyn DbConnectionTest,
    execution_id: &ExecutionId,
    predicate: impl Fn(ExecutionLog) -> Option<T> + Send,
    timeout: Option<Duration>,
) -> Result<T, DbErrorReadWithTimeout> {
    tracing::trace!(%execution_id, "Waiting for predicate");
    let poll_until_match = async move {
        loop {
            let log = db_connection.get(execution_id).await?;
            if let Some(t) = predicate(log) {
                tracing::debug!(%execution_id, "Found: {t:?}");
                return Ok(t);
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    };

    match timeout {
        None => poll_until_match.await,
        Some(timeout) => tokio::select! { // future's liveness: Dropping the loser immediately.
            res = poll_until_match => res,
            () = tokio::time::sleep(timeout) => Err(DbErrorReadWithTimeout::Timeout(TimeoutOutcome::Timeout))
        },
    }
}
1330
1331#[derive(Debug, Clone, PartialEq, Eq)]
1332pub enum ExpiredTimer {
1333    Lock(ExpiredLock),
1334    Delay(ExpiredDelay),
1335}
1336
1337#[derive(Debug, Clone, PartialEq, Eq)]
1338pub struct ExpiredLock {
1339    pub execution_id: ExecutionId,
1340    // Version of last `Locked` event, used to detect whether the execution made progress.
1341    pub locked_at_version: Version,
1342    pub next_version: Version,
1343    /// As the execution may still be running, this represents the number of intermittent failures + timeouts prior to this execution.
1344    pub intermittent_event_count: u32,
1345    pub max_retries: Option<u32>,
1346    pub retry_exp_backoff: Duration,
1347    pub locked_by: LockedBy,
1348}
1349
1350#[derive(Debug, Clone, PartialEq, Eq)]
1351pub struct ExpiredDelay {
1352    pub execution_id: ExecutionId,
1353    pub join_set_id: JoinSetId,
1354    pub delay_id: DelayId,
1355}
1356
1357#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize)]
1358#[serde(tag = "status")]
1359pub enum PendingState {
1360    Locked(PendingStateLocked),
1361    #[display("PendingAt(`{scheduled_at}`)")]
1362    PendingAt {
1363        scheduled_at: DateTime<Utc>,
1364        last_lock: Option<LockedBy>, // Needed for lock extension
1365        component_id_input_digest: InputContentDigest,
1366    }, // e.g. created with a schedule, temporary timeout/failure
1367    #[display("BlockedByJoinSet({join_set_id},`{lock_expires_at}`)")]
1368    /// Caused by [`HistoryEvent::JoinNext`]
1369    BlockedByJoinSet {
1370        join_set_id: JoinSetId,
1371        /// See [`HistoryEvent::JoinNext::lock_expires_at`].
1372        lock_expires_at: DateTime<Utc>,
1373        /// Blocked by closing of the join set
1374        closing: bool,
1375        component_id_input_digest: InputContentDigest,
1376    },
1377    #[display("Finished({finished})")]
1378    Finished {
1379        #[serde(flatten)]
1380        finished: PendingStateFinished,
1381        component_id_input_digest: InputContentDigest,
1382    },
1383}
1384impl PendingState {
1385    #[must_use]
1386    pub fn get_component_id_input_digest(&self) -> &InputContentDigest {
1387        match self {
1388            PendingState::Locked(pending_state_locked) => {
1389                &pending_state_locked.component_id_input_digest
1390            }
1391            PendingState::PendingAt {
1392                component_id_input_digest,
1393                ..
1394            }
1395            | PendingState::BlockedByJoinSet {
1396                component_id_input_digest,
1397                ..
1398            }
1399            | PendingState::Finished {
1400                component_id_input_digest,
1401                ..
1402            } => component_id_input_digest,
1403        }
1404    }
1405}
1406
1407#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize)]
1408#[display("Locked(`{lock_expires_at}`, {}, {})", locked_by.executor_id, locked_by.run_id)]
1409pub struct PendingStateLocked {
1410    pub locked_by: LockedBy,
1411    pub lock_expires_at: DateTime<Utc>,
1412    pub component_id_input_digest: InputContentDigest,
1413}
1414
1415#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1416pub struct LockedBy {
1417    pub executor_id: ExecutorId,
1418    pub run_id: RunId,
1419}
1420
1421#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
1422pub struct PendingStateFinished {
1423    pub version: VersionType, // not Version since it must be Copy
1424    pub finished_at: DateTime<Utc>,
1425    pub result_kind: PendingStateFinishedResultKind,
1426}
1427impl Display for PendingStateFinished {
1428    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1429        match self.result_kind {
1430            PendingStateFinishedResultKind::Ok => write!(f, "ok"),
1431            PendingStateFinishedResultKind::Err(err) => write!(f, "{err}"),
1432        }
1433    }
1434}
1435
1436// This is not a Result so that it can be customized for serialization
1437#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
1438#[serde(rename_all = "snake_case")]
1439pub enum PendingStateFinishedResultKind {
1440    Ok,
1441    Err(PendingStateFinishedError),
1442}
1443impl PendingStateFinishedResultKind {
1444    pub fn as_result(&self) -> Result<(), &PendingStateFinishedError> {
1445        match self {
1446            PendingStateFinishedResultKind::Ok => Ok(()),
1447            PendingStateFinishedResultKind::Err(err) => Err(err),
1448        }
1449    }
1450}
1451
/// Classifies a return value as success or error; delegates to
/// `SupportedFunctionReturnValue::as_pending_state_finished_result`.
impl From<&SupportedFunctionReturnValue> for PendingStateFinishedResultKind {
    fn from(result: &SupportedFunctionReturnValue) -> Self {
        result.as_pending_state_finished_result()
    }
}
1457
1458#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, derive_more::Display)]
1459pub enum PendingStateFinishedError {
1460    #[display("execution terminated: {_0}")]
1461    ExecutionFailure(ExecutionFailureKind),
1462    #[display("execution completed with an error")]
1463    FallibleError,
1464}
1465
1466impl PendingState {
1467    pub fn can_append_lock(
1468        &self,
1469        created_at: DateTime<Utc>,
1470        executor_id: ExecutorId,
1471        run_id: RunId,
1472        lock_expires_at: DateTime<Utc>,
1473    ) -> Result<LockKind, DbErrorWriteNonRetriable> {
1474        if lock_expires_at <= created_at {
1475            return Err(DbErrorWriteNonRetriable::ValidationFailed(
1476                "invalid expiry date".into(),
1477            ));
1478        }
1479        match self {
1480            PendingState::PendingAt {
1481                scheduled_at,
1482                last_lock,
1483                component_id_input_digest: _,
1484            } => {
1485                if *scheduled_at <= created_at {
1486                    // pending now, ok to lock
1487                    Ok(LockKind::CreatingNewLock)
1488                } else if let Some(LockedBy {
1489                    executor_id: last_executor_id,
1490                    run_id: last_run_id,
1491                }) = last_lock
1492                    && executor_id == *last_executor_id
1493                    && run_id == *last_run_id
1494                {
1495                    // Original executor is extending the lock.
1496                    Ok(LockKind::Extending)
1497                } else {
1498                    Err(DbErrorWriteNonRetriable::ValidationFailed(
1499                        "cannot lock, not yet pending".into(),
1500                    ))
1501                }
1502            }
1503            PendingState::Locked(PendingStateLocked {
1504                locked_by:
1505                    LockedBy {
1506                        executor_id: current_pending_state_executor_id,
1507                        run_id: current_pending_state_run_id,
1508                    },
1509                lock_expires_at: _,
1510                component_id_input_digest: _,
1511            }) => {
1512                if executor_id == *current_pending_state_executor_id
1513                    && run_id == *current_pending_state_run_id
1514                {
1515                    // Original executor is extending the lock.
1516                    Ok(LockKind::Extending)
1517                } else {
1518                    Err(DbErrorWriteNonRetriable::IllegalState(
1519                        "cannot lock, already locked".into(),
1520                    ))
1521                }
1522            }
1523            PendingState::BlockedByJoinSet { .. } => Err(DbErrorWriteNonRetriable::IllegalState(
1524                "cannot append Locked event when in BlockedByJoinSet state".into(),
1525            )),
1526            PendingState::Finished { .. } => Err(DbErrorWriteNonRetriable::IllegalState(
1527                "already finished".into(),
1528            )),
1529        }
1530    }
1531
1532    #[must_use]
1533    pub fn is_finished(&self) -> bool {
1534        matches!(self, PendingState::Finished { .. })
1535    }
1536}
1537
/// Kind of lock granted by `PendingState::can_append_lock`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum LockKind {
    /// The current (or last) holder renews its lock.
    Extending,
    /// A fresh lock is taken on a pending execution.
    CreatingNewLock,
}
1543
1544pub mod http_client_trace {
1545    use chrono::{DateTime, Utc};
1546    use serde::{Deserialize, Serialize};
1547
1548    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1549    pub struct HttpClientTrace {
1550        pub req: RequestTrace,
1551        pub resp: Option<ResponseTrace>,
1552    }
1553
1554    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1555    pub struct RequestTrace {
1556        pub sent_at: DateTime<Utc>,
1557        pub uri: String,
1558        pub method: String,
1559    }
1560
1561    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1562    pub struct ResponseTrace {
1563        pub finished_at: DateTime<Utc>,
1564        pub status: Result<u16, String>,
1565    }
1566}
1567
// Unit tests for serde round-trips and schedule-time conversion.
#[cfg(test)]
mod tests {
    use super::HistoryEventScheduleAt;
    use super::PendingStateFinished;
    use super::PendingStateFinishedError;
    use super::PendingStateFinishedResultKind;
    use crate::ExecutionFailureKind;
    use crate::SupportedFunctionReturnValue;
    use chrono::DateTime;
    use chrono::Datelike;
    use chrono::Utc;
    use insta::assert_snapshot;
    use rstest::rstest;
    use std::time::Duration;
    use val_json::type_wrapper::TypeWrapper;
    use val_json::wast_val::WastVal;
    use val_json::wast_val::WastValWithType;

    /// JSON serialize/deserialize round-trip of each result kind must be lossless.
    #[rstest(expected => [
        PendingStateFinishedResultKind::Ok,
        PendingStateFinishedResultKind::Err(PendingStateFinishedError::ExecutionFailure(ExecutionFailureKind::TimedOut)),
    ])]
    #[test]
    fn serde_pending_state_finished_result_kind_should_work(
        expected: PendingStateFinishedResultKind,
    ) {
        let ser = serde_json::to_string(&expected).unwrap();
        let actual: PendingStateFinishedResultKind = serde_json::from_str(&ser).unwrap();
        assert_eq!(expected, actual);
    }

    /// JSON round-trip of the whole `PendingStateFinished` struct, for each result kind.
    #[rstest(result_kind => [
        PendingStateFinishedResultKind::Ok,
        PendingStateFinishedResultKind::Err(PendingStateFinishedError::ExecutionFailure(ExecutionFailureKind::TimedOut)),
    ])]
    #[test]
    fn serde_pending_state_finished_should_work(result_kind: PendingStateFinishedResultKind) {
        let expected = PendingStateFinished {
            version: 0,
            finished_at: Utc::now(),
            result_kind,
        };

        let ser = serde_json::to_string(&expected).unwrap();
        let actual: PendingStateFinished = serde_json::from_str(&ser).unwrap();
        assert_eq!(expected, actual);
    }

    /// `Ok(Some(None))`-shaped value (`result<option<string>, string>` returning `ok(none)`)
    /// must survive a JSON round-trip; the JSON form is pinned by an insta snapshot
    /// (the snapshot is keyed by this function's name).
    #[test]
    fn join_set_deser_with_result_ok_option_none_should_work() {
        let expected = SupportedFunctionReturnValue::Ok {
            ok: Some(WastValWithType {
                r#type: TypeWrapper::Result {
                    ok: Some(Box::new(TypeWrapper::Option(Box::new(TypeWrapper::String)))),
                    err: Some(Box::new(TypeWrapper::String)),
                },
                value: WastVal::Result(Ok(Some(Box::new(WastVal::Option(None))))),
            }),
        };
        let json = serde_json::to_string(&expected).unwrap();
        assert_snapshot!(json);

        let actual: SupportedFunctionReturnValue = serde_json::from_str(&json).unwrap();

        assert_eq!(expected, actual);
    }

    /// `u32::MAX` seconds after the Unix epoch resolves without error and lands in year 2106.
    #[test]
    fn as_date_time_should_work_with_duration_u32_max_secs() {
        let duration = Duration::from_secs(u64::from(u32::MAX));
        let schedule_at = HistoryEventScheduleAt::In(duration);
        let resolved = schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap();
        assert_eq!(2106, resolved.year());
    }

    const MILLIS_PER_SEC: i64 = 1000;
    /// Largest whole number of seconds representable as `i64::MAX` milliseconds;
    /// presumably mirrors chrono's `TimeDelta` upper bound — TODO confirm against chrono docs.
    const TIMEDELTA_MAX_SECS: i64 = i64::MAX / MILLIS_PER_SEC;

    /// A duration one second beyond `TIMEDELTA_MAX_SECS` must make `as_date_time` fail.
    /// NOTE(review): despite the name, the bound exceeded is `TIMEDELTA_MAX_SECS`
    /// (`i64::MAX / 1000`), not `i64::MAX` itself.
    #[test]
    fn as_date_time_should_fail_on_duration_secs_greater_than_i64_max() {
        // Fails on duration -> timedelta conversion, but a smaller duration can fail on datetime + timedelta
        let duration = Duration::from_secs(
            u64::try_from(TIMEDELTA_MAX_SECS).expect("positive number must not fail") + 1,
        );
        let schedule_at = HistoryEventScheduleAt::In(duration);
        schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap_err();
    }
}