obeli_sk_concepts/
storage.rs

1use crate::ComponentId;
2use crate::ComponentRetryConfig;
3use crate::ExecutionFailureKind;
4use crate::ExecutionId;
5use crate::ExecutionMetadata;
6use crate::FinishedExecutionError;
7use crate::FunctionFqn;
8use crate::JoinSetId;
9use crate::Params;
10use crate::StrVariant;
11use crate::SupportedFunctionReturnValue;
12use crate::component_id::InputContentDigest;
13use crate::prefixed_ulid::DelayId;
14use crate::prefixed_ulid::ExecutionIdDerived;
15use crate::prefixed_ulid::ExecutorId;
16use crate::prefixed_ulid::RunId;
17use assert_matches::assert_matches;
18use async_trait::async_trait;
19use chrono::TimeDelta;
20use chrono::{DateTime, Utc};
21use http_client_trace::HttpClientTrace;
22use serde::Deserialize;
23use serde::Serialize;
24use std::fmt::Debug;
25use std::fmt::Display;
26use std::pin::Pin;
27use std::sync::Arc;
28use std::time::Duration;
29use strum::IntoStaticStr;
30use tracing::debug;
31use tracing::error;
32
// Textual labels of execution states and history event types.
// NOTE(review): presumably used where states are stored or filtered as strings — confirm.

/// Label of the "pending at" state.
pub const STATE_PENDING_AT: &'static str = "PendingAt";
/// Label of the "blocked by join set" state.
pub const STATE_BLOCKED_BY_JOIN_SET: &'static str = "BlockedByJoinSet";
/// Label of the "locked" state.
pub const STATE_LOCKED: &'static str = "Locked";
/// Label of the "finished" state.
pub const STATE_FINISHED: &'static str = "Finished";
/// Label of the `JoinNext` history event type.
pub const HISTORY_EVENT_TYPE_JOIN_NEXT: &'static str = "JoinNext";
38
39/// Remote client representation of the execution journal.
40#[derive(Debug, PartialEq, Eq)]
41#[cfg_attr(feature = "test", derive(Serialize))]
42pub struct ExecutionLog {
43    pub execution_id: ExecutionId,
44    pub events: Vec<ExecutionEvent>,
45    pub responses: Vec<JoinSetResponseEventOuter>,
46    pub next_version: Version, // Is not advanced once in Finished state
47    pub pending_state: PendingState,
48}
49
50impl ExecutionLog {
51    /// Return some duration after which the execution will be retried.
52    /// Return `None` if no more retries are allowed.
53    #[must_use]
54    pub fn can_be_retried_after(
55        temporary_event_count: u32,
56        max_retries: Option<u32>,
57        retry_exp_backoff: Duration,
58    ) -> Option<Duration> {
59        // If max_retries == None, wrapping is OK after this succeeds - we want to retry forever.
60        if temporary_event_count <= max_retries.unwrap_or(u32::MAX) {
61            // TODO: Add test for number of retries
62            let duration = retry_exp_backoff * 2_u32.saturating_pow(temporary_event_count - 1);
63            Some(duration)
64        } else {
65            None
66        }
67    }
68
69    #[must_use]
70    pub fn compute_retry_duration_when_retrying_forever(
71        temporary_event_count: u32,
72        retry_exp_backoff: Duration,
73    ) -> Duration {
74        Self::can_be_retried_after(temporary_event_count, None, retry_exp_backoff)
75            .expect("`max_retries` set to MAX must never return None")
76    }
77
78    #[must_use]
79    pub fn ffqn(&self) -> &FunctionFqn {
80        assert_matches!(self.events.first(), Some(ExecutionEvent {
81            event: ExecutionRequest::Created { ffqn, .. },
82            ..
83        }) => ffqn)
84    }
85
86    #[must_use]
87    pub fn parent(&self) -> Option<(ExecutionId, JoinSetId)> {
88        assert_matches!(self.events.first(), Some(ExecutionEvent {
89            event: ExecutionRequest::Created { parent, .. },
90            ..
91        }) => parent.clone())
92    }
93
94    #[must_use]
95    pub fn last_event(&self) -> &ExecutionEvent {
96        self.events.last().expect("must contain at least one event")
97    }
98
99    #[must_use]
100    pub fn into_finished_result(mut self) -> Option<SupportedFunctionReturnValue> {
101        if let ExecutionEvent {
102            event: ExecutionRequest::Finished { result, .. },
103            ..
104        } = self.events.pop().expect("must contain at least one event")
105        {
106            Some(result)
107        } else {
108            None
109        }
110    }
111
112    #[cfg(feature = "test")]
113    pub fn event_history(&self) -> impl Iterator<Item = (HistoryEvent, Version)> + '_ {
114        self.events.iter().filter_map(|event| {
115            if let ExecutionRequest::HistoryEvent { event: eh, .. } = &event.event {
116                Some((eh.clone(), event.version.clone()))
117            } else {
118                None
119            }
120        })
121    }
122
123    #[cfg(feature = "test")]
124    #[must_use]
125    pub fn find_join_set_request(&self, join_set_id: &JoinSetId) -> Option<&JoinSetRequest> {
126        self.events
127            .iter()
128            .find_map(move |event| match &event.event {
129                ExecutionRequest::HistoryEvent {
130                    event:
131                        HistoryEvent::JoinSetRequest {
132                            join_set_id: found,
133                            request,
134                        },
135                    ..
136                } if *join_set_id == *found => Some(request),
137                _ => None,
138            })
139    }
140}
141
142pub type VersionType = u32;
143#[derive(
144    Debug,
145    Default,
146    Clone,
147    PartialEq,
148    Eq,
149    Hash,
150    derive_more::Display,
151    derive_more::Into,
152    serde::Serialize,
153    serde::Deserialize,
154)]
155#[serde(transparent)]
156pub struct Version(pub VersionType);
157impl Version {
158    #[must_use]
159    pub fn new(arg: VersionType) -> Version {
160        Version(arg)
161    }
162
163    #[must_use]
164    pub fn increment(&self) -> Version {
165        Version(self.0 + 1)
166    }
167}
168impl TryFrom<i64> for Version {
169    type Error = VersionParseError;
170    fn try_from(value: i64) -> Result<Self, Self::Error> {
171        VersionType::try_from(value)
172            .map(Version::new)
173            .map_err(|_| VersionParseError)
174    }
175}
176impl From<Version> for usize {
177    fn from(value: Version) -> Self {
178        usize::try_from(value.0).expect("16 bit systems are unsupported")
179    }
180}
181impl From<&Version> for usize {
182    fn from(value: &Version) -> Self {
183        usize::try_from(value.0).expect("16 bit systems are unsupported")
184    }
185}
186
187#[derive(Debug, thiserror::Error)]
188#[error("version must be u32")]
189pub struct VersionParseError;
190
191#[derive(
192    Clone, Debug, derive_more::Display, PartialEq, Eq, serde::Serialize, serde::Deserialize,
193)]
194#[display("{event}")]
195pub struct ExecutionEvent {
196    pub created_at: DateTime<Utc>,
197    pub event: ExecutionRequest,
198    #[serde(skip_serializing_if = "Option::is_none")]
199    pub backtrace_id: Option<Version>,
200    pub version: Version,
201}
202
203/// Moves the execution to [`PendingState::PendingNow`] if it is currently blocked on `JoinNextBlocking`.
204#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
205pub struct JoinSetResponseEventOuter {
206    pub created_at: DateTime<Utc>,
207    pub event: JoinSetResponseEvent,
208}
209
210#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
211pub struct JoinSetResponseEvent {
212    pub join_set_id: JoinSetId,
213    pub event: JoinSetResponse,
214}
215
216#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, derive_more::Display)]
217#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
218#[serde(tag = "type")]
219pub enum JoinSetResponse {
220    #[display("delay {}: {delay_id}", if result.is_ok() { "finished" } else { "cancelled"})]
221    DelayFinished {
222        delay_id: DelayId,
223        result: Result<(), ()>,
224    },
225    #[display("{result}: {child_execution_id}")] // execution completed..
226    ChildExecutionFinished {
227        child_execution_id: ExecutionIdDerived,
228        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Version(2)))]
229        finished_version: Version,
230        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
231        result: SupportedFunctionReturnValue,
232    },
233}
234
235pub const DUMMY_CREATED: ExecutionRequest = ExecutionRequest::Created {
236    ffqn: FunctionFqn::new_static("", ""),
237    params: Params::empty(),
238    parent: None,
239    scheduled_at: DateTime::from_timestamp_nanos(0),
240    component_id: ComponentId::dummy_activity(),
241    metadata: ExecutionMetadata::empty(),
242    scheduled_by: None,
243};
244pub const DUMMY_HISTORY_EVENT: ExecutionRequest = ExecutionRequest::HistoryEvent {
245    event: HistoryEvent::JoinSetCreate {
246        join_set_id: JoinSetId {
247            kind: crate::JoinSetKind::OneOff,
248            name: StrVariant::empty(),
249        },
250    },
251};
252
253#[derive(
254    Clone,
255    derive_more::Debug,
256    derive_more::Display,
257    PartialEq,
258    Eq,
259    Serialize,
260    Deserialize,
261    IntoStaticStr,
262)]
263#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
264#[allow(clippy::large_enum_variant)]
265pub enum ExecutionRequest {
266    #[display("Created({ffqn}, `{scheduled_at}`)")]
267    Created {
268        ffqn: FunctionFqn,
269        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
270        #[debug(skip)]
271        params: Params,
272        parent: Option<(ExecutionId, JoinSetId)>,
273        scheduled_at: DateTime<Utc>,
274        #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
275        component_id: ComponentId,
276        #[cfg_attr(any(test, feature = "test"), arbitrary(default))]
277        metadata: ExecutionMetadata,
278        scheduled_by: Option<ExecutionId>,
279    },
280    Locked(Locked),
281    /// Returns execution to [`PendingState::PendingNow`] state
282    /// without timing out. This can happen when the executor is running
283    /// out of resources like [`WorkerError::LimitReached`] or when
284    /// the executor is shutting down.
285    #[display("Unlocked(`{backoff_expires_at}`)")]
286    Unlocked {
287        backoff_expires_at: DateTime<Utc>,
288        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
289        reason: StrVariant,
290    },
291    // Created by the executor holding the lock.
292    // After expiry interpreted as pending.
293    #[display("TemporarilyFailed(`{backoff_expires_at}`)")]
294    TemporarilyFailed {
295        backoff_expires_at: DateTime<Utc>,
296        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
297        reason: StrVariant,
298        detail: Option<String>,
299        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
300        http_client_traces: Option<Vec<HttpClientTrace>>,
301    },
302    // Created by the executor holding the lock.
303    // After expiry interpreted as pending.
304    #[display("TemporarilyTimedOut(`{backoff_expires_at}`)")]
305    TemporarilyTimedOut {
306        backoff_expires_at: DateTime<Utc>,
307        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
308        http_client_traces: Option<Vec<HttpClientTrace>>,
309    },
310    // Created by the executor holding the lock.
311    #[display("Finished")]
312    Finished {
313        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
314        result: SupportedFunctionReturnValue,
315        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
316        http_client_traces: Option<Vec<HttpClientTrace>>,
317    },
318
319    #[display("HistoryEvent({event})")]
320    HistoryEvent {
321        event: HistoryEvent,
322    },
323}
324
325impl ExecutionRequest {
326    #[must_use]
327    pub fn is_temporary_event(&self) -> bool {
328        matches!(
329            self,
330            Self::TemporarilyFailed { .. } | Self::TemporarilyTimedOut { .. }
331        )
332    }
333
334    #[must_use]
335    pub fn variant(&self) -> &'static str {
336        Into::<&'static str>::into(self)
337    }
338
339    #[must_use]
340    pub fn join_set_id(&self) -> Option<&JoinSetId> {
341        match self {
342            Self::Created {
343                parent: Some((_parent_id, join_set_id)),
344                ..
345            } => Some(join_set_id),
346            Self::HistoryEvent {
347                event:
348                    HistoryEvent::JoinSetCreate { join_set_id, .. }
349                    | HistoryEvent::JoinSetRequest { join_set_id, .. }
350                    | HistoryEvent::JoinNext { join_set_id, .. },
351            } => Some(join_set_id),
352            _ => None,
353        }
354    }
355}
356
357#[derive(
358    Clone, derive_more::Debug, derive_more::Display, PartialEq, Eq, Serialize, Deserialize,
359)]
360#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
361#[display("Locked(`{lock_expires_at}`, {component_id})")]
362pub struct Locked {
363    #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
364    pub component_id: ComponentId,
365    pub executor_id: ExecutorId,
366    pub run_id: RunId,
367    pub lock_expires_at: DateTime<Utc>,
368    #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentRetryConfig::ZERO))]
369    pub retry_config: ComponentRetryConfig,
370}
371
372#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
373#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
374#[serde(tag = "type")]
375pub enum PersistKind {
376    #[display("RandomU64({min}, {max_inclusive})")]
377    RandomU64 { min: u64, max_inclusive: u64 },
378    #[display("RandomString({min_length}, {max_length_exclusive})")]
379    RandomString {
380        min_length: u64,
381        max_length_exclusive: u64,
382    },
383}
384
/// Encodes `value` as 8 big-endian bytes; the inverse of `from_bytes_to_u64`.
#[must_use]
pub fn from_u64_to_bytes(value: u64) -> [u8; 8] {
    u64::to_be_bytes(value)
}
389
/// Decodes 8 big-endian bytes into a `u64`; the inverse of `from_u64_to_bytes`.
#[must_use]
pub fn from_bytes_to_u64(bytes: [u8; 8]) -> u64 {
    let decoded = u64::from_be_bytes(bytes);
    decoded
}
394
395#[derive(
396    derive_more::Debug, Clone, PartialEq, Eq, derive_more::Display, Serialize, Deserialize,
397)]
398#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
399#[serde(tag = "type")]
400/// Must be created by the executor in [`PendingState::Locked`].
401pub enum HistoryEvent {
402    /// Persist a generated pseudorandom value.
403    #[display("Persist")]
404    Persist {
405        #[debug(skip)]
406        value: Vec<u8>, // Only stored for nondeterminism checks. TODO: Consider using a hashed value or just the intention.
407        kind: PersistKind,
408    },
409    #[display("JoinSetCreate({join_set_id})")]
410    JoinSetCreate { join_set_id: JoinSetId },
411    #[display("JoinSetRequest({request})")]
412    // join_set_id is part of ExecutionId or DelayId in the `request`
413    JoinSetRequest {
414        join_set_id: JoinSetId,
415        request: JoinSetRequest,
416    },
417    /// Sets the pending state to [`PendingState::BlockedByJoinSet`].
418    /// When the response arrives at `resp_time`:
419    /// The execution is [`PendingState::PendingAt`]`(max(resp_time, lock_expires_at)`, so that the
420    /// original executor can continue. After the expiry any executor can continue without
421    /// marking the execution as timed out.
422    #[display("JoinNext({join_set_id})")]
423    JoinNext {
424        join_set_id: JoinSetId,
425        /// Set to a future time if the worker is keeping the execution invocation warm waiting for the result.
426        /// The pending status will be kept in Locked state until `run_expires_at`.
427        run_expires_at: DateTime<Utc>,
428        /// Set to a specific function when calling `-await-next` extension function, used for
429        /// determinism checks.
430        requested_ffqn: Option<FunctionFqn>,
431        /// Closing request must never set `requested_ffqn` and is ignored by determinism checks.
432        closing: bool,
433    },
434    /// Records the fact that a join set was awaited more times than its submission count.
435    #[display("JoinNextTooMany({join_set_id})")]
436    JoinNextTooMany {
437        join_set_id: JoinSetId,
438        /// Set to a specific function when calling `-await-next` extension function, used for
439        /// determinism checks.
440        requested_ffqn: Option<FunctionFqn>,
441    },
442    #[display("Schedule({execution_id}, {schedule_at})")]
443    Schedule {
444        execution_id: ExecutionId,
445        schedule_at: HistoryEventScheduleAt, // Stores intention to schedule an execution at a date/offset
446    },
447    #[display("Stub({target_execution_id})")]
448    Stub {
449        target_execution_id: ExecutionIdDerived,
450        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
451        result: SupportedFunctionReturnValue, // Only stored for nondeterminism checks. TODO: Consider using a hashed value.
452        persist_result: Result<(), ()>, // Does the row (target_execution_id,Version:1) match the proposed `result`?
453    },
454}
455
456#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
457#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
458pub enum HistoryEventScheduleAt {
459    Now,
460    #[display("At(`{_0}`)")]
461    At(DateTime<Utc>),
462    #[display("In({_0:?})")]
463    In(Duration),
464}
465
466#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
467pub enum ScheduleAtConversionError {
468    #[error("source duration value is out of range")]
469    OutOfRangeError,
470}
471
472impl HistoryEventScheduleAt {
473    pub fn as_date_time(
474        &self,
475        now: DateTime<Utc>,
476    ) -> Result<DateTime<Utc>, ScheduleAtConversionError> {
477        match self {
478            Self::Now => Ok(now),
479            Self::At(date_time) => Ok(*date_time),
480            Self::In(duration) => {
481                let time_delta = TimeDelta::from_std(*duration)
482                    .map_err(|_| ScheduleAtConversionError::OutOfRangeError)?;
483                now.checked_add_signed(time_delta)
484                    .ok_or(ScheduleAtConversionError::OutOfRangeError)
485            }
486        }
487    }
488}
489
490#[derive(Clone, Debug, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
491#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
492#[serde(tag = "type")]
493pub enum JoinSetRequest {
494    // Must be created by the executor in `PendingState::Locked`.
495    #[display("DelayRequest({delay_id}, expires_at: `{expires_at}`, schedule_at: `{schedule_at}`)")]
496    DelayRequest {
497        delay_id: DelayId,
498        expires_at: DateTime<Utc>,
499        schedule_at: HistoryEventScheduleAt,
500    },
501    // Must be created by the executor in `PendingState::Locked`.
502    #[display("ChildExecutionRequest({child_execution_id}, {target_ffqn}, params: {params})")]
503    ChildExecutionRequest {
504        child_execution_id: ExecutionIdDerived,
505        target_ffqn: FunctionFqn,
506        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
507        params: Params,
508    },
509}
510
511/// Error that is not specific to an execution.
512#[derive(Debug, Clone, thiserror::Error, PartialEq)]
513pub enum DbErrorGeneric {
514    #[error("database error: {0}")]
515    Uncategorized(StrVariant),
516    #[error("database was closed")]
517    Close,
518}
519
520#[derive(thiserror::Error, Clone, Debug, PartialEq, Eq)]
521pub enum DbErrorWriteNonRetriable {
522    #[error("validation failed: {0}")]
523    ValidationFailed(StrVariant),
524    #[error("conflict")]
525    Conflict,
526    #[error("illegal state: {0}")]
527    IllegalState(StrVariant),
528    #[error("version conflict: expected: {expected}, got: {requested}")]
529    VersionConflict {
530        expected: Version,
531        requested: Version,
532    },
533}
534
535/// Write error tied to an execution
536#[derive(Debug, Clone, thiserror::Error, PartialEq)]
537pub enum DbErrorWrite {
538    #[error("cannot write - row not found")]
539    NotFound,
540    #[error("non-retriable error: {0}")]
541    NonRetriable(#[from] DbErrorWriteNonRetriable),
542    #[error(transparent)]
543    Generic(#[from] DbErrorGeneric),
544}
545
546/// Read error tied to an execution
547#[derive(Debug, Clone, thiserror::Error, PartialEq)]
548pub enum DbErrorRead {
549    #[error("cannot read - row not found")]
550    NotFound,
551    #[error(transparent)]
552    Generic(#[from] DbErrorGeneric),
553}
554
555impl From<DbErrorRead> for DbErrorWrite {
556    fn from(value: DbErrorRead) -> DbErrorWrite {
557        match value {
558            DbErrorRead::NotFound => DbErrorWrite::NotFound,
559            DbErrorRead::Generic(err) => DbErrorWrite::Generic(err),
560        }
561    }
562}
563
564#[derive(Debug, thiserror::Error, PartialEq)]
565pub enum DbErrorReadWithTimeout {
566    #[error("timeout")]
567    Timeout(TimeoutOutcome),
568    #[error(transparent)]
569    DbErrorRead(#[from] DbErrorRead),
570}
571impl From<DbErrorGeneric> for DbErrorReadWithTimeout {
572    fn from(value: DbErrorGeneric) -> DbErrorReadWithTimeout {
573        Self::from(DbErrorRead::from(value))
574    }
575}
576
577// Represents next version after successfuly appended to execution log.
578// TODO: Convert to struct with next_version
579pub type AppendResponse = Version;
580pub type PendingExecution = (ExecutionId, Version, Params, Option<DateTime<Utc>>);
581
582#[derive(Debug, Clone)]
583pub struct LockedExecution {
584    pub execution_id: ExecutionId,
585    pub next_version: Version,
586    pub metadata: ExecutionMetadata,
587    pub locked_event: Locked,
588    pub ffqn: FunctionFqn,
589    pub params: Params,
590    pub event_history: Vec<(HistoryEvent, Version)>,
591    pub responses: Vec<JoinSetResponseEventOuter>,
592    pub parent: Option<(ExecutionId, JoinSetId)>,
593    pub intermittent_event_count: u32,
594}
595
596pub type LockPendingResponse = Vec<LockedExecution>;
597pub type AppendBatchResponse = Version;
598
599#[derive(Debug, Clone, derive_more::Display, Serialize, Deserialize)]
600#[display("{event}")]
601pub struct AppendRequest {
602    pub created_at: DateTime<Utc>,
603    pub event: ExecutionRequest,
604}
605
606#[derive(Debug, Clone)]
607pub struct CreateRequest {
608    pub created_at: DateTime<Utc>,
609    pub execution_id: ExecutionId,
610    pub ffqn: FunctionFqn,
611    pub params: Params,
612    pub parent: Option<(ExecutionId, JoinSetId)>,
613    pub scheduled_at: DateTime<Utc>,
614    pub component_id: ComponentId,
615    pub metadata: ExecutionMetadata,
616    pub scheduled_by: Option<ExecutionId>,
617}
618
619impl From<CreateRequest> for ExecutionRequest {
620    fn from(value: CreateRequest) -> Self {
621        Self::Created {
622            ffqn: value.ffqn,
623            params: value.params,
624            parent: value.parent,
625            scheduled_at: value.scheduled_at,
626            component_id: value.component_id,
627            metadata: value.metadata,
628            scheduled_by: value.scheduled_by,
629        }
630    }
631}
632
633#[async_trait]
634pub trait DbPool: Send + Sync {
635    async fn db_exec_conn(&self) -> Result<Box<dyn DbExecutor>, DbErrorGeneric>;
636
637    async fn connection(&self) -> Result<Box<dyn DbConnection>, DbErrorGeneric>;
638
639    async fn external_api_conn(&self) -> Result<Box<dyn DbExternalApi>, DbErrorGeneric>;
640
641    #[cfg(feature = "test")]
642    async fn connection_test(&self) -> Result<Box<dyn DbConnectionTest>, DbErrorGeneric>;
643}
644
645#[async_trait]
646pub trait DbPoolCloseable {
647    async fn close(&self);
648}
649
650#[derive(Clone, Debug)]
651pub struct AppendEventsToExecution {
652    pub execution_id: ExecutionId,
653    pub version: Version,
654    pub batch: Vec<AppendRequest>,
655}
656
657#[derive(Clone, Debug)]
658pub struct AppendResponseToExecution {
659    pub parent_execution_id: ExecutionId,
660    pub created_at: DateTime<Utc>,
661    pub join_set_id: JoinSetId,
662    pub child_execution_id: ExecutionIdDerived,
663    pub finished_version: Version,
664    pub result: SupportedFunctionReturnValue,
665}
666
667#[async_trait]
668pub trait DbExecutor: Send + Sync {
669    #[expect(clippy::too_many_arguments)]
670    async fn lock_pending_by_ffqns(
671        &self,
672        batch_size: u32,
673        pending_at_or_sooner: DateTime<Utc>,
674        ffqns: Arc<[FunctionFqn]>,
675        created_at: DateTime<Utc>,
676        component_id: ComponentId,
677        executor_id: ExecutorId,
678        lock_expires_at: DateTime<Utc>,
679        run_id: RunId,
680        retry_config: ComponentRetryConfig,
681    ) -> Result<LockPendingResponse, DbErrorGeneric>;
682
683    #[expect(clippy::too_many_arguments)]
684    async fn lock_pending_by_component_id(
685        &self,
686        batch_size: u32,
687        pending_at_or_sooner: DateTime<Utc>,
688        component_id: &ComponentId,
689        created_at: DateTime<Utc>,
690        executor_id: ExecutorId,
691        lock_expires_at: DateTime<Utc>,
692        run_id: RunId,
693        retry_config: ComponentRetryConfig,
694    ) -> Result<LockPendingResponse, DbErrorGeneric>;
695
696    /// Specialized locking for e.g. extending the lock by the original executor and run.
697    #[expect(clippy::too_many_arguments)]
698    async fn lock_one(
699        &self,
700        created_at: DateTime<Utc>,
701        component_id: ComponentId,
702        execution_id: &ExecutionId,
703        run_id: RunId,
704        version: Version,
705        executor_id: ExecutorId,
706        lock_expires_at: DateTime<Utc>,
707        retry_config: ComponentRetryConfig,
708    ) -> Result<LockedExecution, DbErrorWrite>;
709
710    /// Append a single event to an existing execution log.
711    /// The request cannot contain `ExecutionEventInner::Created`.
712    async fn append(
713        &self,
714        execution_id: ExecutionId,
715        version: Version,
716        req: AppendRequest,
717    ) -> Result<AppendResponse, DbErrorWrite>;
718
719    /// Append a batch of events to an existing execution log, and append a response to a parent execution.
720    /// The batch cannot contain `ExecutionEventInner::Created`.
721    async fn append_batch_respond_to_parent(
722        &self,
723        events: AppendEventsToExecution,
724        response: AppendResponseToExecution,
725        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
726    ) -> Result<AppendBatchResponse, DbErrorWrite>;
727
728    /// Notification mechainism with no strict guarantees for waiting while there are no pending executions.
729    /// Return immediately if there are pending notifications at `pending_at_or_sooner`.
730    /// Otherwise wait until `timeout_fut` resolves.
731    /// Delay requests that expire between `pending_at_or_sooner` and timeout can be disregarded.
732    async fn wait_for_pending_by_ffqn(
733        &self,
734        pending_at_or_sooner: DateTime<Utc>,
735        ffqns: Arc<[FunctionFqn]>,
736        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
737    );
738
739    /// Notification mechainism with no strict guarantees for waiting while there are no pending executions.
740    /// Return immediately if there are pending notifications at `pending_at_or_sooner`.
741    /// Otherwise wait until `timeout_fut` resolves.
742    /// Delay requests that expire between `pending_at_or_sooner` and timeout can be disregarded.
743    async fn wait_for_pending_by_component_id(
744        &self,
745        pending_at_or_sooner: DateTime<Utc>,
746        component_id: &ComponentId,
747        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
748    );
749
750    async fn cancel_activity_with_retries(
751        &self,
752        execution_id: &ExecutionId,
753        cancelled_at: DateTime<Utc>,
754    ) -> Result<CancelOutcome, DbErrorWrite> {
755        let mut retries = 5;
756        loop {
757            let res = self.cancel_activity(execution_id, cancelled_at).await;
758            if res.is_ok() || retries == 0 {
759                return res;
760            }
761            retries -= 1;
762        }
763    }
764
765    /// Get last event. Impls may set `ExecutionEvent::backtrace_id` to `None`.
766    async fn get_last_execution_event(
767        &self,
768        execution_id: &ExecutionId,
769    ) -> Result<ExecutionEvent, DbErrorRead>;
770
771    async fn cancel_activity(
772        &self,
773        execution_id: &ExecutionId,
774        cancelled_at: DateTime<Utc>,
775    ) -> Result<CancelOutcome, DbErrorWrite> {
776        debug!("Determining cancellation state of {execution_id}");
777
778        let last_event = self
779            .get_last_execution_event(execution_id)
780            .await
781            .map_err(DbErrorWrite::from)?;
782        if let ExecutionRequest::Finished {
783            result:
784                SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError {
785                    kind: ExecutionFailureKind::Cancelled,
786                    ..
787                }),
788            ..
789        } = last_event.event
790        {
791            return Ok(CancelOutcome::Cancelled);
792        } else if matches!(last_event.event, ExecutionRequest::Finished { .. }) {
793            debug!("Not cancelling, {execution_id} is already finished");
794            return Ok(CancelOutcome::AlreadyFinished);
795        }
796        let finished_version = last_event.version.increment();
797        let child_result = SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError {
798            reason: None,
799            kind: ExecutionFailureKind::Cancelled,
800            detail: None,
801        });
802        let cancel_request = AppendRequest {
803            created_at: cancelled_at,
804            event: ExecutionRequest::Finished {
805                result: child_result.clone(),
806                http_client_traces: None,
807            },
808        };
809        debug!("Cancelling activity {execution_id} at {finished_version}");
810        if let ExecutionId::Derived(execution_id) = execution_id {
811            let (parent_execution_id, join_set_id) = execution_id.split_to_parts();
812            let child_execution_id = ExecutionId::Derived(execution_id.clone());
813            self.append_batch_respond_to_parent(
814                AppendEventsToExecution {
815                    execution_id: child_execution_id,
816                    version: finished_version.clone(),
817                    batch: vec![cancel_request],
818                },
819                AppendResponseToExecution {
820                    parent_execution_id,
821                    created_at: cancelled_at,
822                    join_set_id: join_set_id.clone(),
823                    child_execution_id: execution_id.clone(),
824                    finished_version,
825                    result: child_result,
826                },
827                cancelled_at,
828            )
829            .await?;
830        } else {
831            self.append(execution_id.clone(), finished_version, cancel_request)
832                .await?;
833        }
834        debug!("Cancelled {execution_id}");
835        Ok(CancelOutcome::Cancelled)
836    }
837}
838
/// Outcome of `DbConnection::append_delay_response` when the database call itself succeeded.
///
/// Derives the standard value traits so callers can compare, copy and log outcomes; the
/// original public enum had none.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AppendDelayResponseOutcome {
    /// The delay response was appended.
    Success,
    /// NOTE(review): presumably the delay had already finished — confirm with the implementations.
    AlreadyFinished,
    /// NOTE(review): presumably the delay had already been cancelled — confirm with the implementations.
    AlreadyCancelled,
}
844
/// Read-mostly queries and maintenance operations exposed to external clients,
/// layered on top of [`DbConnection`].
#[async_trait]
pub trait DbExternalApi: DbConnection {
    /// Get the backtrace of `execution_id` selected by `filter`: the first one,
    /// the last one, or the one associated with a specific version.
    async fn get_backtrace(
        &self,
        execution_id: &ExecutionId,
        filter: BacktraceFilter,
    ) -> Result<BacktraceInfo, DbErrorRead>;

    /// Returns executions sorted in descending order.
    ///
    /// NOTE(review): presumably `ffqn` restricts the listing to one function and
    /// `top_level_only` excludes derived (child) executions — confirm in the impls.
    async fn list_executions(
        &self,
        ffqn: Option<FunctionFqn>,
        top_level_only: bool,
        pagination: ExecutionListPagination,
    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric>;

    /// List up to `max_length` events of `execution_id`, starting at version `since`.
    ///
    /// NOTE(review): whether `since` is inclusive, and whether `include_backtrace_id`
    /// controls population of `ExecutionEvent::backtrace_id`, is decided by the impls — confirm.
    async fn list_execution_events(
        &self,
        execution_id: &ExecutionId,
        since: &Version,
        max_length: VersionType,
        include_backtrace_id: bool,
    ) -> Result<Vec<ExecutionEvent>, DbErrorRead>;

    /// Returns responses of an execution ordered as they arrived,
    /// enabling matching each `JoinNext` to its corresponding response.
    async fn list_responses(
        &self,
        execution_id: &ExecutionId,
        pagination: Pagination<u32>,
    ) -> Result<Vec<ResponseWithCursor>, DbErrorRead>;

    /// Switch the component of `execution_id` from the `old` input digest to `new`.
    ///
    /// NOTE(review): presumably fails when the execution's current digest is not `old` — confirm.
    async fn upgrade_execution_component(
        &self,
        execution_id: &ExecutionId,
        old: &InputContentDigest,
        new: &InputContentDigest,
    ) -> Result<(), DbErrorWrite>;
}
885
886#[async_trait]
887pub trait DbConnection: DbExecutor {
888    async fn append_delay_response(
889        &self,
890        created_at: DateTime<Utc>,
891        execution_id: ExecutionId,
892        join_set_id: JoinSetId,
893        delay_id: DelayId,
894        outcome: Result<(), ()>, // Successfully finished - `Ok(())` or cancelled - `Err(())`
895    ) -> Result<AppendDelayResponseOutcome, DbErrorWrite>;
896
897    /// Append a batch of events to an existing execution log, and append a response to a parent execution.
898    /// The batch cannot contain `ExecutionEventInner::Created`.
899    async fn append_batch(
900        &self,
901        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
902        batch: Vec<AppendRequest>,
903        execution_id: ExecutionId,
904        version: Version,
905    ) -> Result<AppendBatchResponse, DbErrorWrite>;
906
907    /// Append one or more events to the parent execution log, and create zero or more child execution logs.
908    /// The batch cannot contain `ExecutionEventInner::Created`.
909    async fn append_batch_create_new_execution(
910        &self,
911        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
912        batch: Vec<AppendRequest>,   // must not contain `ExecutionEventInner::Created` events
913        execution_id: ExecutionId,
914        version: Version,
915        child_req: Vec<CreateRequest>,
916    ) -> Result<AppendBatchResponse, DbErrorWrite>;
917
918    /// Get a single event specified by version. Impls may set `ExecutionEvent::backtrace_id` to `None`.
919    async fn get_execution_event(
920        &self,
921        execution_id: &ExecutionId,
922        version: &Version,
923    ) -> Result<ExecutionEvent, DbErrorRead>;
924
925    async fn get_create_request(
926        &self,
927        execution_id: &ExecutionId,
928    ) -> Result<CreateRequest, DbErrorRead> {
929        let execution_event = self
930            .get_execution_event(execution_id, &Version::new(0))
931            .await?;
932        if let ExecutionRequest::Created {
933            ffqn,
934            params,
935            parent,
936            scheduled_at,
937            component_id,
938            metadata,
939            scheduled_by,
940        } = execution_event.event
941        {
942            Ok(CreateRequest {
943                created_at: execution_event.created_at,
944                execution_id: execution_id.clone(),
945                ffqn,
946                params,
947                parent,
948                scheduled_at,
949                component_id,
950                metadata,
951                scheduled_by,
952            })
953        } else {
954            error!(%execution_id, "Execution log must start with creation");
955            Err(DbErrorRead::Generic(DbErrorGeneric::Uncategorized(
956                "execution log must start with creation".into(),
957            )))
958        }
959    }
960
961    async fn get_pending_state(
962        &self,
963        execution_id: &ExecutionId,
964    ) -> Result<PendingState, DbErrorRead>;
965
966    /// Get currently expired locks and async timers (delay requests)
967    async fn get_expired_timers(
968        &self,
969        at: DateTime<Utc>,
970    ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric>;
971
972    /// Create a new execution log
973    async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite>;
974
975    /// Notification mechainism with no strict guarantees for getting notified when a new response arrives.
976    /// Parameter `start_idx` must be at most be equal to current size of responses in the execution log.
977    /// If no response arrives immediately and `interrupt_after` resolves, `DbErrorReadWithTimeout::Timeout` is returned.
978    /// Implementations with no pubsub support should use polling.
979    /// Callers are expected to call this function in a loop with a reasonable timeout
980    /// to support less stellar implementations.
981    async fn subscribe_to_next_responses(
982        &self,
983        execution_id: &ExecutionId,
984        start_idx: u32,
985        timeout_fut: Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>,
986    ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorReadWithTimeout>;
987
988    /// Notification mechainism with no strict guarantees for getting the finished result.
989    /// Implementations with no pubsub support should use polling.
990    /// Callers are expected to call this function in a loop with a reasonable timeout
991    /// to support less stellar implementations.
992    async fn wait_for_finished_result(
993        &self,
994        execution_id: &ExecutionId,
995        timeout_fut: Option<Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>>,
996        // TODO: camcel fut
997    ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout>;
998
999    async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite>;
1000
1001    async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite>;
1002}
1003
/// Output of the `timeout_fut` futures passed to the waiting methods of [`DbConnection`],
/// telling why a wait was interrupted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimeoutOutcome {
    /// The wait timed out.
    Timeout,
    /// The wait was cancelled — presumably by the caller; confirm at call sites.
    Cancel,
}
1009
/// Test-only extension of [`DbConnection`], available with the `test` feature.
#[cfg(feature = "test")]
#[async_trait]
pub trait DbConnectionTest: DbConnection {
    /// Append `response_event` to the responses of `execution_id`.
    async fn append_response(
        &self,
        created_at: DateTime<Utc>,
        execution_id: ExecutionId,
        response_event: JoinSetResponseEvent,
    ) -> Result<(), DbErrorWrite>;

    /// Get execution log: events, responses, next version and pending state.
    async fn get(&self, execution_id: &ExecutionId) -> Result<ExecutionLog, DbErrorRead>;
}
1023
/// Result of a cancellation request, e.g. from [`cancel_delay`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CancelOutcome {
    /// The target is cancelled, either by this request or by an earlier one.
    Cancelled,
    /// The target had already finished, so it could not be cancelled.
    AlreadyFinished,
}
1029
1030pub async fn stub_execution(
1031    db_connection: &dyn DbConnection,
1032    execution_id: ExecutionIdDerived,
1033    parent_execution_id: ExecutionId,
1034    join_set_id: JoinSetId,
1035    created_at: DateTime<Utc>,
1036    return_value: SupportedFunctionReturnValue,
1037) -> Result<(), DbErrorWrite> {
1038    let stub_finished_version = Version::new(1); // Stub activities have no execution log except Created event.
1039    // Attempt to write to `execution_id` and its parent, ignoring the possible conflict error on this tx
1040    let write_attempt = {
1041        let finished_req = AppendRequest {
1042            created_at,
1043            event: ExecutionRequest::Finished {
1044                result: return_value.clone(),
1045                http_client_traces: None,
1046            },
1047        };
1048        db_connection
1049            .append_batch_respond_to_parent(
1050                AppendEventsToExecution {
1051                    execution_id: ExecutionId::Derived(execution_id.clone()),
1052                    version: stub_finished_version.clone(),
1053                    batch: vec![finished_req],
1054                },
1055                AppendResponseToExecution {
1056                    parent_execution_id,
1057                    created_at,
1058                    join_set_id,
1059                    child_execution_id: execution_id.clone(),
1060                    finished_version: stub_finished_version.clone(),
1061                    result: return_value.clone(),
1062                },
1063                created_at,
1064            )
1065            .await
1066    };
1067    if let Err(write_attempt) = write_attempt {
1068        // Check that the expected value is in the database
1069        debug!("Stub write attempt failed - {write_attempt:?}");
1070
1071        let found = db_connection
1072            .get_execution_event(&ExecutionId::Derived(execution_id), &stub_finished_version)
1073            .await?; // Not found at this point should not happen, unless the previous write failed. Will be retried.
1074        match found.event {
1075            ExecutionRequest::Finished {
1076                result: found_result,
1077                ..
1078            } if return_value == found_result => {
1079                // Same value has already be written, RPC is successful.
1080                Ok(())
1081            }
1082            ExecutionRequest::Finished { .. } => Err(DbErrorWrite::NonRetriable(
1083                DbErrorWriteNonRetriable::Conflict,
1084            )),
1085            _other => Err(DbErrorWrite::NonRetriable(
1086                DbErrorWriteNonRetriable::IllegalState(
1087                    "unexpected execution event at stubbed execution".into(),
1088                ),
1089            )),
1090        }
1091    } else {
1092        Ok(())
1093    }
1094}
1095
1096pub async fn cancel_delay(
1097    db_connection: &dyn DbConnection,
1098    delay_id: DelayId,
1099    created_at: DateTime<Utc>,
1100) -> Result<CancelOutcome, DbErrorWrite> {
1101    let (parent_execution_id, join_set_id) = delay_id.split_to_parts();
1102    db_connection
1103        .append_delay_response(
1104            created_at,
1105            parent_execution_id,
1106            join_set_id,
1107            delay_id,
1108            Err(()), // Mark as cancelled.
1109        )
1110        .await
1111        .map(|ok| match ok {
1112            AppendDelayResponseOutcome::Success | AppendDelayResponseOutcome::AlreadyCancelled => {
1113                CancelOutcome::Cancelled
1114            }
1115            AppendDelayResponseOutcome::AlreadyFinished => CancelOutcome::AlreadyFinished,
1116        })
1117}
1118
1119#[derive(Clone)]
1120pub enum BacktraceFilter {
1121    First,
1122    Last,
1123    Specific(Version),
1124}
1125
/// A WASM backtrace recorded for an execution, as stored and retrieved by the DB layer.
pub struct BacktraceInfo {
    pub execution_id: ExecutionId,
    /// Component that produced the backtrace.
    pub component_id: ComponentId,
    /// Start (inclusive) of the version range this backtrace is associated with.
    pub version_min_including: Version,
    /// End (exclusive) of the version range this backtrace is associated with.
    pub version_max_excluding: Version,
    pub wasm_backtrace: WasmBacktrace,
}
1133
/// Serializable copy of a `wasmtime::WasmBacktrace` (see `WasmBacktrace::maybe_from`).
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct WasmBacktrace {
    /// Frames in the order reported by wasmtime; never empty when built via `maybe_from`.
    pub frames: Vec<FrameInfo>,
}

/// One frame of a [`WasmBacktrace`].
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct FrameInfo {
    /// Module name, or `<unknown>` when the module has none.
    pub module: String,
    /// Demangled function name, or a placeholder derived from the function index.
    pub func_name: String,
    /// Debug-info symbols attached to this frame.
    pub symbols: Vec<FrameSymbol>,
}

/// Debug-info symbol of a [`FrameInfo`]; every field is optional as debug info may be partial.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct FrameSymbol {
    /// Demangled function name.
    pub func_name: Option<String>,
    pub file: Option<String>,
    pub line: Option<u32>,
    pub col: Option<u32>,
}
1153
1154mod wasm_backtrace {
1155    use super::{FrameInfo, FrameSymbol, WasmBacktrace};
1156
1157    impl WasmBacktrace {
1158        pub fn maybe_from(backtrace: &wasmtime::WasmBacktrace) -> Option<Self> {
1159            if backtrace.frames().is_empty() {
1160                None
1161            } else {
1162                Some(Self {
1163                    frames: backtrace.frames().iter().map(FrameInfo::from).collect(),
1164                })
1165            }
1166        }
1167    }
1168
1169    impl From<&wasmtime::FrameInfo> for FrameInfo {
1170        fn from(frame: &wasmtime::FrameInfo) -> Self {
1171            let module_name = frame.module().name().unwrap_or("<unknown>").to_string();
1172            let mut func_name = String::new();
1173            wasmtime_environ::demangle_function_name_or_index(
1174                &mut func_name,
1175                frame.func_name(),
1176                frame.func_index() as usize,
1177            )
1178            .expect("writing to string must succeed");
1179            Self {
1180                module: module_name,
1181                func_name,
1182                symbols: frame
1183                    .symbols()
1184                    .iter()
1185                    .map(std::convert::Into::into)
1186                    .collect(),
1187            }
1188        }
1189    }
1190
1191    impl From<&wasmtime::FrameSymbol> for FrameSymbol {
1192        fn from(symbol: &wasmtime::FrameSymbol) -> Self {
1193            let func_name = symbol.name().map(|name| {
1194                let mut writer = String::new();
1195                wasmtime_environ::demangle_function_name(&mut writer, name)
1196                    .expect("writing to string must succeed");
1197                writer
1198            });
1199
1200            Self {
1201                func_name,
1202                file: symbol.file().map(ToString::to_string),
1203                line: symbol.line(),
1204                col: symbol.column(),
1205            }
1206        }
1207    }
1208}
1209
/// Position of a response within an execution's response list, usable as the
/// `Pagination<u32>` cursor of `DbExternalApi::list_responses`.
pub type ResponseCursorType = u32;

/// A join set response together with its cursor, as returned by `DbExternalApi::list_responses`.
#[derive(Debug, Clone, Serialize)]
pub struct ResponseWithCursor {
    pub event: JoinSetResponseEventOuter,
    pub cursor: ResponseCursorType,
}
1217
/// One execution as listed by `DbExternalApi::list_executions`, with its current state.
#[derive(Debug)]
pub struct ExecutionWithState {
    pub execution_id: ExecutionId,
    pub ffqn: FunctionFqn,
    pub pending_state: PendingState,
    pub created_at: DateTime<Utc>,
    /// NOTE(review): presumably the `scheduled_at` of the `Created` event, before any
    /// retries rescheduled it — confirm in the DB implementations.
    pub first_scheduled_at: DateTime<Utc>,
    pub component_digest: InputContentDigest,
}
1227
1228#[derive(Debug, Clone)]
1229pub enum ExecutionListPagination {
1230    CreatedBy(Pagination<Option<DateTime<Utc>>>),
1231    ExecutionId(Pagination<Option<ExecutionId>>),
1232}
1233impl Default for ExecutionListPagination {
1234    fn default() -> ExecutionListPagination {
1235        ExecutionListPagination::CreatedBy(Pagination::OlderThan {
1236            length: 20,
1237            cursor: None,
1238            including_cursor: false, // does not matter when `cursor` is not specified
1239        })
1240    }
1241}
1242impl ExecutionListPagination {
1243    #[must_use]
1244    pub fn length(&self) -> u16 {
1245        match self {
1246            ExecutionListPagination::CreatedBy(pagination) => pagination.length(),
1247            ExecutionListPagination::ExecutionId(pagination) => pagination.length(),
1248        }
1249    }
1250}
1251
/// Cursor-based pagination request.
///
/// `NewerThan` pages forward (ascending) from `cursor`; `OlderThan` pages backward
/// (descending). `including_cursor` decides whether the item at `cursor` itself is
/// part of the page.
#[derive(Debug, Clone, Copy)]
pub enum Pagination<T> {
    NewerThan {
        /// Maximum number of items to return.
        length: u16,
        cursor: T,
        including_cursor: bool,
    },
    OlderThan {
        /// Maximum number of items to return.
        length: u16,
        cursor: T,
        including_cursor: bool,
    },
}
impl<T> Pagination<T> {
    /// Maximum number of items requested.
    #[must_use]
    pub fn length(&self) -> u16 {
        match self {
            Pagination::NewerThan { length, .. } | Pagination::OlderThan { length, .. } => *length,
        }
    }

    /// Comparison operator selecting items relative to the cursor (presumably spliced
    /// into a query): `>`/`>=` for `NewerThan`, `<`/`<=` for `OlderThan`, with the `=`
    /// form when `including_cursor` is set.
    #[must_use]
    pub fn rel(&self) -> &'static str {
        match self {
            Pagination::NewerThan {
                including_cursor: false,
                ..
            } => ">",
            Pagination::NewerThan {
                including_cursor: true,
                ..
            } => ">=",
            Pagination::OlderThan {
                including_cursor: false,
                ..
            } => "<",
            Pagination::OlderThan {
                including_cursor: true,
                ..
            } => "<=",
        }
    }

    /// `true` when results are ordered descending, i.e. for `OlderThan`.
    #[must_use]
    pub fn is_desc(&self) -> bool {
        matches!(self, Pagination::OlderThan { .. })
    }

    /// The cursor the page is anchored at.
    #[must_use]
    pub fn cursor(&self) -> &T {
        match self {
            Pagination::NewerThan { cursor, .. } | Pagination::OlderThan { cursor, .. } => cursor,
        }
    }
}
1300
/// Poll the execution log of `execution_id` every 10 ms until `predicate` returns `Some`.
///
/// # Errors
/// Read errors are converted into `DbErrorReadWithTimeout`. When `timeout` is set and
/// elapses first, `DbErrorReadWithTimeout::Timeout(TimeoutOutcome::Timeout)` is returned.
#[cfg(feature = "test")]
pub async fn wait_for_pending_state_fn<T: Debug>(
    db_connection: &dyn DbConnectionTest,
    execution_id: &ExecutionId,
    predicate: impl Fn(ExecutionLog) -> Option<T> + Send,
    timeout: Option<Duration>,
) -> Result<T, DbErrorReadWithTimeout> {
    tracing::trace!(%execution_id, "Waiting for predicate");
    let poll = async move {
        loop {
            let execution_log = db_connection.get(execution_id).await?;
            match predicate(execution_log) {
                Some(found) => {
                    tracing::debug!(%execution_id, "Found: {found:?}");
                    return Ok(found);
                }
                None => tokio::time::sleep(Duration::from_millis(10)).await,
            }
        }
    };

    let Some(timeout) = timeout else {
        return poll.await;
    };
    tokio::select! { // future's liveness: Dropping the loser immediately.
        res = poll => res,
        () = tokio::time::sleep(timeout) => Err(DbErrorReadWithTimeout::Timeout(TimeoutOutcome::Timeout)),
    }
}
1329
/// A timer found expired by `DbConnection::get_expired_timers`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExpiredTimer {
    Lock(ExpiredLock),
    Delay(ExpiredDelay),
}

/// An execution whose lock has expired.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExpiredLock {
    pub execution_id: ExecutionId,
    /// Version of last `Locked` event, used to detect whether the execution made progress.
    pub locked_at_version: Version,
    pub next_version: Version,
    /// As the execution may still be running, this represents the number of intermittent failures + timeouts prior to this execution.
    pub intermittent_event_count: u32,
    /// Retry configuration; presumably fed into `ExecutionLog::can_be_retried_after` — confirm.
    pub max_retries: Option<u32>,
    pub retry_exp_backoff: Duration,
    /// Holder of the expired lock.
    pub locked_by: LockedBy,
}

/// A delay request of `execution_id` in `join_set_id` whose time has come.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExpiredDelay {
    pub execution_id: ExecutionId,
    pub join_set_id: JoinSetId,
    pub delay_id: DelayId,
}
1355
/// Current state of an execution (see `ExecutionLog::pending_state`).
///
/// Serialized with an internal `status` tag holding the variant name.
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize)]
#[serde(tag = "status")]
pub enum PendingState {
    /// Locked by an executor until `lock_expires_at`.
    Locked(PendingStateLocked),
    #[display("PendingAt(`{scheduled_at}`)")]
    /// Waiting to be locked once `scheduled_at` is reached,
    /// e.g. created with a schedule, temporary timeout/failure.
    PendingAt {
        scheduled_at: DateTime<Utc>,
        last_lock: Option<LockedBy>, // Needed for lock extension
        component_id_input_digest: InputContentDigest,
    },
    #[display("BlockedByJoinSet({join_set_id},`{lock_expires_at}`)")]
    /// Caused by [`HistoryEvent::JoinNext`]
    BlockedByJoinSet {
        join_set_id: JoinSetId,
        /// See [`HistoryEvent::JoinNext::lock_expires_at`].
        lock_expires_at: DateTime<Utc>,
        /// Blocked by closing of the join set
        closing: bool,
        component_id_input_digest: InputContentDigest,
    },
    #[display("Finished({finished})")]
    /// Terminal state; its fields are flattened into the serialized form.
    Finished {
        #[serde(flatten)]
        finished: PendingStateFinished,
        component_id_input_digest: InputContentDigest,
    },
}
1383impl PendingState {
1384    #[must_use]
1385    pub fn get_component_id_input_digest(&self) -> &InputContentDigest {
1386        match self {
1387            PendingState::Locked(pending_state_locked) => {
1388                &pending_state_locked.component_id_input_digest
1389            }
1390            PendingState::PendingAt {
1391                component_id_input_digest,
1392                ..
1393            }
1394            | PendingState::BlockedByJoinSet {
1395                component_id_input_digest,
1396                ..
1397            }
1398            | PendingState::Finished {
1399                component_id_input_digest,
1400                ..
1401            } => component_id_input_digest,
1402        }
1403    }
1404}
1405
/// Payload of [`PendingState::Locked`]: who holds the lock and until when.
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize)]
#[display("Locked(`{lock_expires_at}`, {}, {})", locked_by.executor_id, locked_by.run_id)]
pub struct PendingStateLocked {
    pub locked_by: LockedBy,
    pub lock_expires_at: DateTime<Utc>,
    pub component_id_input_digest: InputContentDigest,
}
1413
/// Identity of a lock holder. A lock is owned only when both the executor and the run match.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LockedBy {
    pub executor_id: ExecutorId,
    pub run_id: RunId,
}
1419
/// Terminal state of an execution: when it finished and what kind of result it produced.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct PendingStateFinished {
    /// NOTE(review): presumably the version of the `Finished` event — confirm.
    pub version: VersionType, // not Version since it must be Copy
    pub finished_at: DateTime<Utc>,
    pub result_kind: PendingStateFinishedResultKind,
}
1426impl Display for PendingStateFinished {
1427    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1428        match self.result_kind {
1429            PendingStateFinishedResultKind::Ok => write!(f, "ok"),
1430            PendingStateFinishedResultKind::Err(err) => write!(f, "{err}"),
1431        }
1432    }
1433}
1434
// This is not a Result so that it can be customized for serialization
/// Whether a finished execution succeeded, or how it failed.
///
/// Variant names serialize in `snake_case` (`ok`, `err`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PendingStateFinishedResultKind {
    Ok,
    Err(PendingStateFinishedError),
}
1442impl PendingStateFinishedResultKind {
1443    pub fn as_result(&self) -> Result<(), &PendingStateFinishedError> {
1444        match self {
1445            PendingStateFinishedResultKind::Ok => Ok(()),
1446            PendingStateFinishedResultKind::Err(err) => Err(err),
1447        }
1448    }
1449}
1450
1451impl From<&SupportedFunctionReturnValue> for PendingStateFinishedResultKind {
1452    fn from(result: &SupportedFunctionReturnValue) -> Self {
1453        result.as_pending_state_finished_result()
1454    }
1455}
1456
/// Reason a finished execution did not end with `Ok`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, derive_more::Display)]
pub enum PendingStateFinishedError {
    /// The execution was terminated, e.g. after timing out; see [`ExecutionFailureKind`].
    #[display("execution terminated: {_0}")]
    ExecutionFailure(ExecutionFailureKind),
    /// The execution completed, but with an error return value.
    #[display("execution completed with an error")]
    FallibleError,
}
1464
1465impl PendingState {
1466    pub fn can_append_lock(
1467        &self,
1468        created_at: DateTime<Utc>,
1469        executor_id: ExecutorId,
1470        run_id: RunId,
1471        lock_expires_at: DateTime<Utc>,
1472    ) -> Result<LockKind, DbErrorWriteNonRetriable> {
1473        if lock_expires_at <= created_at {
1474            return Err(DbErrorWriteNonRetriable::ValidationFailed(
1475                "invalid expiry date".into(),
1476            ));
1477        }
1478        match self {
1479            PendingState::PendingAt {
1480                scheduled_at,
1481                last_lock,
1482                component_id_input_digest: _,
1483            } => {
1484                if *scheduled_at <= created_at {
1485                    // pending now, ok to lock
1486                    Ok(LockKind::CreatingNewLock)
1487                } else if let Some(LockedBy {
1488                    executor_id: last_executor_id,
1489                    run_id: last_run_id,
1490                }) = last_lock
1491                    && executor_id == *last_executor_id
1492                    && run_id == *last_run_id
1493                {
1494                    // Original executor is extending the lock.
1495                    Ok(LockKind::Extending)
1496                } else {
1497                    Err(DbErrorWriteNonRetriable::ValidationFailed(
1498                        "cannot lock, not yet pending".into(),
1499                    ))
1500                }
1501            }
1502            PendingState::Locked(PendingStateLocked {
1503                locked_by:
1504                    LockedBy {
1505                        executor_id: current_pending_state_executor_id,
1506                        run_id: current_pending_state_run_id,
1507                    },
1508                lock_expires_at: _,
1509                component_id_input_digest: _,
1510            }) => {
1511                if executor_id == *current_pending_state_executor_id
1512                    && run_id == *current_pending_state_run_id
1513                {
1514                    // Original executor is extending the lock.
1515                    Ok(LockKind::Extending)
1516                } else {
1517                    Err(DbErrorWriteNonRetriable::IllegalState(
1518                        "cannot lock, already locked".into(),
1519                    ))
1520                }
1521            }
1522            PendingState::BlockedByJoinSet { .. } => Err(DbErrorWriteNonRetriable::IllegalState(
1523                "cannot append Locked event when in BlockedByJoinSet state".into(),
1524            )),
1525            PendingState::Finished { .. } => Err(DbErrorWriteNonRetriable::IllegalState(
1526                "already finished".into(),
1527            )),
1528        }
1529    }
1530
1531    #[must_use]
1532    pub fn is_finished(&self) -> bool {
1533        matches!(self, PendingState::Finished { .. })
1534    }
1535}
1536
/// Kind of lock acquisition allowed by [`PendingState::can_append_lock`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LockKind {
    /// The current lock holder (same executor and run) extends its lock.
    Extending,
    /// A pending execution is locked anew.
    CreatingNewLock,
}
1542
/// Serializable records of outgoing HTTP requests made during an execution.
pub mod http_client_trace {
    use chrono::{DateTime, Utc};
    use serde::{Deserialize, Serialize};

    /// One outgoing HTTP request and, if it was obtained, its response.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    pub struct HttpClientTrace {
        pub req: RequestTrace,
        /// `None` when no response was recorded.
        pub resp: Option<ResponseTrace>,
    }

    /// Request side of an [`HttpClientTrace`].
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    pub struct RequestTrace {
        pub sent_at: DateTime<Utc>,
        pub uri: String,
        pub method: String,
    }

    /// Response side of an [`HttpClientTrace`].
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    pub struct ResponseTrace {
        pub finished_at: DateTime<Utc>,
        /// NOTE(review): presumably the HTTP status code, or an error description when
        /// the request failed — confirm where traces are produced.
        pub status: Result<u16, String>,
    }
}
1566
#[cfg(test)]
mod tests {
    use super::HistoryEventScheduleAt;
    use super::PendingStateFinished;
    use super::PendingStateFinishedError;
    use super::PendingStateFinishedResultKind;
    use crate::ExecutionFailureKind;
    use crate::SupportedFunctionReturnValue;
    use chrono::DateTime;
    use chrono::Datelike;
    use chrono::Utc;
    use insta::assert_snapshot;
    use rstest::rstest;
    use std::time::Duration;
    use val_json::type_wrapper::TypeWrapper;
    use val_json::wast_val::WastVal;
    use val_json::wast_val::WastValWithType;

    // Round-trips each result kind through JSON and expects an identical value back.
    #[rstest(expected => [
        PendingStateFinishedResultKind::Ok,
        PendingStateFinishedResultKind::Err(PendingStateFinishedError::ExecutionFailure(ExecutionFailureKind::TimedOut)),
    ])]
    #[test]
    fn serde_pending_state_finished_result_kind_should_work(
        expected: PendingStateFinishedResultKind,
    ) {
        let ser = serde_json::to_string(&expected).unwrap();
        let actual: PendingStateFinishedResultKind = serde_json::from_str(&ser).unwrap();
        assert_eq!(expected, actual);
    }

    // Same JSON round-trip, with the result kind embedded in a full `PendingStateFinished`.
    #[rstest(result_kind => [
        PendingStateFinishedResultKind::Ok,
        PendingStateFinishedResultKind::Err(PendingStateFinishedError::ExecutionFailure(ExecutionFailureKind::TimedOut)),
    ])]
    #[test]
    fn serde_pending_state_finished_should_work(result_kind: PendingStateFinishedResultKind) {
        let expected = PendingStateFinished {
            version: 0,
            finished_at: Utc::now(),
            result_kind,
        };

        let ser = serde_json::to_string(&expected).unwrap();
        let actual: PendingStateFinished = serde_json::from_str(&ser).unwrap();
        assert_eq!(expected, actual);
    }

    // Serializes a `result<option<string>, string>` value holding `ok(none)`, pins the JSON
    // with an insta snapshot, and checks that it deserializes back unchanged.
    // The snapshot file is keyed by this function's name: renaming it orphans the snapshot.
    #[test]
    fn join_set_deser_with_result_ok_option_none_should_work() {
        let expected = SupportedFunctionReturnValue::Ok {
            ok: Some(WastValWithType {
                r#type: TypeWrapper::Result {
                    ok: Some(Box::new(TypeWrapper::Option(Box::new(TypeWrapper::String)))),
                    err: Some(Box::new(TypeWrapper::String)),
                },
                value: WastVal::Result(Ok(Some(Box::new(WastVal::Option(None))))),
            }),
        };
        let json = serde_json::to_string(&expected).unwrap();
        assert_snapshot!(json);

        let actual: SupportedFunctionReturnValue = serde_json::from_str(&json).unwrap();

        assert_eq!(expected, actual);
    }

    // `u32::MAX` seconds (~136 years) after the Unix epoch lands in the year 2106.
    #[test]
    fn as_date_time_should_work_with_duration_u32_max_secs() {
        let duration = Duration::from_secs(u64::from(u32::MAX));
        let schedule_at = HistoryEventScheduleAt::In(duration);
        let resolved = schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap();
        assert_eq!(2106, resolved.year());
    }

    // `TimeDelta` is modelled here as bounded by `i64::MAX` milliseconds, so this is the
    // largest whole number of seconds it can represent.
    const MILLIS_PER_SEC: i64 = 1000;
    const TIMEDELTA_MAX_SECS: i64 = i64::MAX / MILLIS_PER_SEC;

    // Despite the name, the bound exceeded here is `TIMEDELTA_MAX_SECS`, not `i64::MAX`.
    #[test]
    fn as_date_time_should_fail_on_duration_secs_greater_than_i64_max() {
        // Fails on duration -> timedelta conversion, but a smaller duration can fail on datetime + timedelta
        let duration = Duration::from_secs(
            u64::try_from(TIMEDELTA_MAX_SECS).expect("positive number must not fail") + 1,
        );
        let schedule_at = HistoryEventScheduleAt::In(duration);
        schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap_err();
    }
}