obeli_sk_concepts/
storage.rs

1use crate::ComponentId;
2use crate::ComponentRetryConfig;
3use crate::ExecutionFailureKind;
4use crate::ExecutionId;
5use crate::ExecutionMetadata;
6use crate::FinishedExecutionError;
7use crate::FunctionFqn;
8use crate::JoinSetId;
9use crate::Params;
10use crate::StrVariant;
11use crate::SupportedFunctionReturnValue;
12use crate::component_id::InputContentDigest;
13use crate::prefixed_ulid::DelayId;
14use crate::prefixed_ulid::ExecutionIdDerived;
15use crate::prefixed_ulid::ExecutorId;
16use crate::prefixed_ulid::RunId;
17use assert_matches::assert_matches;
18use async_trait::async_trait;
19use chrono::TimeDelta;
20use chrono::{DateTime, Utc};
21use http_client_trace::HttpClientTrace;
22use serde::Deserialize;
23use serde::Serialize;
24use std::fmt::Debug;
25use std::fmt::Display;
26use std::pin::Pin;
27use std::sync::Arc;
28use std::time::Duration;
29use strum::IntoStaticStr;
30use tracing::debug;
31use tracing::error;
32
/// Remote client representation of the execution journal.
#[derive(Debug, PartialEq, Eq)]
#[cfg_attr(feature = "test", derive(Serialize))]
pub struct ExecutionLog {
    /// Identifier of the execution this log belongs to.
    pub execution_id: ExecutionId,
    /// Events of the execution. The accessors of this type expect the list to be
    /// non-empty and to start with [`ExecutionRequest::Created`].
    pub events: Vec<ExecutionEvent>,
    /// Join set responses stored alongside the events.
    pub responses: Vec<JoinSetResponseEventOuter>,
    pub next_version: Version, // Is not advanced once in Finished state
    /// Pending state of the execution.
    pub pending_state: PendingState,
}
43
44impl ExecutionLog {
45    #[must_use]
46    pub fn can_be_retried_after(
47        temporary_event_count: u32,
48        max_retries: u32,
49        retry_exp_backoff: Duration,
50    ) -> Option<Duration> {
51        // If max_retries == u32::MAX, wrapping is OK after this succeeds - we want to retry forever.
52        if temporary_event_count <= max_retries {
53            // TODO: Add test for number of retries
54            let duration = retry_exp_backoff * 2_u32.saturating_pow(temporary_event_count - 1);
55            Some(duration)
56        } else {
57            None
58        }
59    }
60
61    #[must_use]
62    pub fn compute_retry_duration_when_retrying_forever(
63        temporary_event_count: u32,
64        retry_exp_backoff: Duration,
65    ) -> Duration {
66        Self::can_be_retried_after(temporary_event_count, u32::MAX, retry_exp_backoff)
67            .expect("`max_retries` set to MAX must never return None")
68    }
69
    /// Returns the function name stored in the first event of the log.
    ///
    /// # Panics
    /// Panics when the log is empty or its first event is not
    /// [`ExecutionRequest::Created`].
    #[must_use]
    pub fn ffqn(&self) -> &FunctionFqn {
        assert_matches!(self.events.first(), Some(ExecutionEvent {
            event: ExecutionRequest::Created { ffqn, .. },
            ..
        }) => ffqn)
    }
77
    /// Returns a clone of the `parent` field of the first (`Created`) event:
    /// the parent execution id and the join set id, or `None` when no parent is recorded.
    ///
    /// # Panics
    /// Panics when the log is empty or its first event is not
    /// [`ExecutionRequest::Created`].
    #[must_use]
    pub fn parent(&self) -> Option<(ExecutionId, JoinSetId)> {
        assert_matches!(self.events.first(), Some(ExecutionEvent {
            event: ExecutionRequest::Created { parent, .. },
            ..
        }) => parent.clone())
    }
85
86    #[must_use]
87    pub fn last_event(&self) -> &ExecutionEvent {
88        self.events.last().expect("must contain at least one event")
89    }
90
91    #[must_use]
92    pub fn into_finished_result(mut self) -> Option<SupportedFunctionReturnValue> {
93        if let ExecutionEvent {
94            event: ExecutionRequest::Finished { result, .. },
95            ..
96        } = self.events.pop().expect("must contain at least one event")
97        {
98            Some(result)
99        } else {
100            None
101        }
102    }
103
104    #[cfg(feature = "test")]
105    pub fn event_history(&self) -> impl Iterator<Item = (HistoryEvent, Version)> + '_ {
106        self.events.iter().filter_map(|event| {
107            if let ExecutionRequest::HistoryEvent { event: eh, .. } = &event.event {
108                Some((eh.clone(), event.version.clone()))
109            } else {
110                None
111            }
112        })
113    }
114
115    #[cfg(feature = "test")]
116    #[must_use]
117    pub fn find_join_set_request(&self, join_set_id: &JoinSetId) -> Option<&JoinSetRequest> {
118        self.events
119            .iter()
120            .find_map(move |event| match &event.event {
121                ExecutionRequest::HistoryEvent {
122                    event:
123                        HistoryEvent::JoinSetRequest {
124                            join_set_id: found,
125                            request,
126                        },
127                    ..
128                } if *join_set_id == *found => Some(request),
129                _ => None,
130            })
131    }
132}
133
/// Integer type backing [`Version`].
pub type VersionType = u32;
/// Version number of an event in an execution log.
///
/// Serialized transparently as the wrapped integer.
#[derive(
    Debug,
    Default,
    Clone,
    PartialEq,
    Eq,
    Hash,
    derive_more::Display,
    derive_more::Into,
    serde::Serialize,
    serde::Deserialize,
)]
#[serde(transparent)]
pub struct Version(pub VersionType);
149impl Version {
150    #[must_use]
151    pub fn new(arg: VersionType) -> Version {
152        Version(arg)
153    }
154
155    #[must_use]
156    pub fn increment(&self) -> Version {
157        Version(self.0 + 1)
158    }
159}
160
/// A single entry of an execution log: an [`ExecutionRequest`] together with its
/// creation time and version. `Display` forwards to the wrapped `event`.
#[derive(
    Clone, Debug, derive_more::Display, PartialEq, Eq, serde::Serialize, serde::Deserialize,
)]
#[display("{event}")]
pub struct ExecutionEvent {
    /// Timestamp stored with the event.
    pub created_at: DateTime<Utc>,
    /// Payload of this log entry.
    pub event: ExecutionRequest,
    /// Optional backtrace reference; left out of the serialized form when `None`.
    /// NOTE(review): presumably the version a backtrace was captured for - confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub backtrace_id: Option<Version>,
    /// Version of this event.
    pub version: Version,
}
172
/// A join set response together with the time it was created.
///
/// Moves the execution to [`PendingState::PendingNow`] if it is currently blocked on `JoinNextBlocking`.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct JoinSetResponseEventOuter {
    /// Timestamp stored with the response.
    pub created_at: DateTime<Utc>,
    /// The response and the join set it belongs to.
    pub event: JoinSetResponseEvent,
}
179
/// A [`JoinSetResponse`] tagged with the join set it belongs to.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct JoinSetResponseEvent {
    /// Join set that receives the response.
    pub join_set_id: JoinSetId,
    /// The response itself.
    pub event: JoinSetResponse,
}
185
/// Response delivered to a join set: either a delay outcome or the result of a
/// child execution. Serialized with an internal `type` tag.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, derive_more::Display)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type")]
pub enum JoinSetResponse {
    /// Outcome of a delay. `Ok(())` is displayed as "finished", `Err(())` as "cancelled".
    #[display("delay {}: {delay_id}", if result.is_ok() { "finished" } else { "cancelled"})]
    DelayFinished {
        delay_id: DelayId,
        result: Result<(), ()>,
    },
    /// A child execution finished with `result`.
    #[display("{result}: {child_execution_id}")] // execution completed..
    ChildExecutionFinished {
        child_execution_id: ExecutionIdDerived,
        /// NOTE(review): presumably the version of the child's `Finished` event - confirm.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Version(2)))]
        finished_version: Version,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
        result: SupportedFunctionReturnValue,
    },
}
204
/// Placeholder [`ExecutionRequest::Created`] value: empty function name, empty
/// params and metadata, no parent, scheduled at timestamp zero, dummy activity component.
/// NOTE(review): presumably used where only the variant matters - confirm with callers.
pub const DUMMY_CREATED: ExecutionRequest = ExecutionRequest::Created {
    ffqn: FunctionFqn::new_static("", ""),
    params: Params::empty(),
    parent: None,
    scheduled_at: DateTime::from_timestamp_nanos(0),
    component_id: ComponentId::dummy_activity(),
    metadata: ExecutionMetadata::empty(),
    scheduled_by: None,
};
/// Placeholder [`ExecutionRequest::HistoryEvent`] value wrapping a `JoinSetCreate`
/// for a one-off join set with an empty name.
pub const DUMMY_HISTORY_EVENT: ExecutionRequest = ExecutionRequest::HistoryEvent {
    event: HistoryEvent::JoinSetCreate {
        join_set_id: JoinSetId {
            kind: crate::JoinSetKind::OneOff,
            name: StrVariant::empty(),
        },
    },
};
222
/// Payload of an [`ExecutionEvent`]: every kind of entry that can be stored in
/// an execution log.
#[derive(
    Clone,
    derive_more::Debug,
    derive_more::Display,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    IntoStaticStr,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[allow(clippy::large_enum_variant)]
pub enum ExecutionRequest {
    /// First event of a log; `params` are skipped in the `Debug` output.
    #[display("Created({ffqn}, `{scheduled_at}`)")]
    Created {
        ffqn: FunctionFqn,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
        #[debug(skip)]
        params: Params,
        /// Parent execution and the join set within it, if any.
        parent: Option<(ExecutionId, JoinSetId)>,
        scheduled_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
        component_id: ComponentId,
        #[cfg_attr(any(test, feature = "test"), arbitrary(default))]
        metadata: ExecutionMetadata,
        scheduled_by: Option<ExecutionId>,
    },
    /// The execution was locked; see [`Locked`] for the lock details.
    Locked(Locked),
    /// Returns execution to [`PendingState::PendingNow`] state
    /// without timing out. This can happen when the executor is running
    /// out of resources like [`WorkerError::LimitReached`] or when
    /// the executor is shutting down.
    #[display("Unlocked(`{backoff_expires_at}`)")]
    Unlocked {
        backoff_expires_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
        reason: StrVariant,
    },
    // Created by the executor holding the lock.
    // After expiry interpreted as pending.
    #[display("TemporarilyFailed(`{backoff_expires_at}`)")]
    TemporarilyFailed {
        backoff_expires_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
        reason: StrVariant,
        detail: Option<String>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },
    // Created by the executor holding the lock.
    // After expiry interpreted as pending.
    #[display("TemporarilyTimedOut(`{backoff_expires_at}`)")]
    TemporarilyTimedOut {
        backoff_expires_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },
    // Created by the executor holding the lock.
    #[display("Finished")]
    Finished {
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
        result: SupportedFunctionReturnValue,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },

    /// Wraps a [`HistoryEvent`].
    #[display("HistoryEvent({event})")]
    HistoryEvent {
        event: HistoryEvent,
    },
}
294
295impl ExecutionRequest {
296    #[must_use]
297    pub fn is_temporary_event(&self) -> bool {
298        matches!(
299            self,
300            Self::TemporarilyFailed { .. } | Self::TemporarilyTimedOut { .. }
301        )
302    }
303
304    #[must_use]
305    pub fn variant(&self) -> &'static str {
306        Into::<&'static str>::into(self)
307    }
308
309    #[must_use]
310    pub fn join_set_id(&self) -> Option<&JoinSetId> {
311        match self {
312            Self::Created {
313                parent: Some((_parent_id, join_set_id)),
314                ..
315            } => Some(join_set_id),
316            Self::HistoryEvent {
317                event:
318                    HistoryEvent::JoinSetCreate { join_set_id, .. }
319                    | HistoryEvent::JoinSetRequest { join_set_id, .. }
320                    | HistoryEvent::JoinNext { join_set_id, .. },
321            } => Some(join_set_id),
322            _ => None,
323        }
324    }
325}
326
/// Details of a lock placed on an execution; payload of
/// [`ExecutionRequest::Locked`]. Displayed with its expiry and component id.
#[derive(
    Clone, derive_more::Debug, derive_more::Display, PartialEq, Eq, Serialize, Deserialize,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[display("Locked(`{lock_expires_at}`, {component_id})")]
pub struct Locked {
    #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
    pub component_id: ComponentId,
    /// Executor holding the lock.
    pub executor_id: ExecutorId,
    pub run_id: RunId,
    /// Time at which the lock expires.
    pub lock_expires_at: DateTime<Utc>,
    #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentRetryConfig::ZERO))]
    pub retry_config: ComponentRetryConfig,
}
341
/// Kind of value recorded by [`HistoryEvent::Persist`], including the requested bounds.
/// Serialized with an internal `type` tag.
#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type")]
pub enum PersistKind {
    /// A random `u64`; the upper bound is inclusive, as the field name says.
    #[display("RandomU64({min}, {max_inclusive})")]
    RandomU64 { min: u64, max_inclusive: u64 },
    /// A random string; the maximum length is exclusive, as the field name says.
    #[display("RandomString({min_length}, {max_length_exclusive})")]
    RandomString {
        min_length: u64,
        max_length_exclusive: u64,
    },
}
354
/// Encodes `value` as eight bytes in big-endian order.
#[must_use]
pub fn from_u64_to_bytes(value: u64) -> [u8; 8] {
    // Little-endian bytes reversed are exactly the big-endian representation.
    let mut encoded = value.to_le_bytes();
    encoded.reverse();
    encoded
}
359
/// Decodes eight big-endian bytes into a `u64`.
#[must_use]
pub fn from_bytes_to_u64(bytes: [u8; 8]) -> u64 {
    // Most significant byte comes first: shift the accumulator and append each byte.
    bytes
        .iter()
        .fold(0_u64, |acc, byte| (acc << 8) | u64::from(*byte))
}
364
#[derive(
    derive_more::Debug, Clone, PartialEq, Eq, derive_more::Display, Serialize, Deserialize,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type")]
/// Must be created by the executor in [`PendingState::Locked`].
pub enum HistoryEvent {
    /// Persist a generated pseudorandom value.
    #[display("Persist")]
    Persist {
        #[debug(skip)]
        value: Vec<u8>, // Only stored for nondeterminism checks. TODO: Consider using a hashed value or just the intention.
        kind: PersistKind,
    },
    /// Records the creation of a join set.
    #[display("JoinSetCreate({join_set_id})")]
    JoinSetCreate { join_set_id: JoinSetId },
    /// Records a request (delay or child execution) submitted to a join set.
    #[display("JoinSetRequest({request})")]
    // join_set_id is part of ExecutionId or DelayId in the `request`
    JoinSetRequest {
        join_set_id: JoinSetId,
        request: JoinSetRequest,
    },
    /// Sets the pending state to [`PendingState::BlockedByJoinSet`].
    /// When the response arrives at `resp_time`:
    /// The execution is [`PendingState::PendingAt`]`(max(resp_time, lock_expires_at))`, so that the
    /// original executor can continue. After the expiry any executor can continue without
    /// marking the execution as timed out.
    #[display("JoinNext({join_set_id})")]
    JoinNext {
        join_set_id: JoinSetId,
        /// Set to a future time if the worker is keeping the execution invocation warm waiting for the result.
        /// The pending status will be kept in Locked state until `run_expires_at`.
        run_expires_at: DateTime<Utc>,
        /// Set to a specific function when calling `-await-next` extension function, used for
        /// determinism checks.
        requested_ffqn: Option<FunctionFqn>,
        /// Closing request must never set `requested_ffqn` and is ignored by determinism checks.
        closing: bool,
    },
    /// Records the fact that a join set was awaited more times than its submission count.
    #[display("JoinNextTooMany({join_set_id})")]
    JoinNextTooMany {
        join_set_id: JoinSetId,
        /// Set to a specific function when calling `-await-next` extension function, used for
        /// determinism checks.
        requested_ffqn: Option<FunctionFqn>,
    },
    /// Records the intention to schedule another execution.
    #[display("Schedule({execution_id}, {schedule_at})")]
    Schedule {
        execution_id: ExecutionId,
        schedule_at: HistoryEventScheduleAt, // Stores intention to schedule an execution at a date/offset
    },
    /// Records a stubbed result proposed for `target_execution_id`.
    #[display("Stub({target_execution_id})")]
    Stub {
        target_execution_id: ExecutionIdDerived,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
        result: SupportedFunctionReturnValue, // Only stored for nondeterminism checks. TODO: Consider using a hashed value.
        persist_result: Result<(), ()>, // Does the row (target_execution_id,Version:1) match the proposed `result`?
    },
}
425
/// When a scheduled execution should start: immediately, at an absolute time,
/// or after an offset. Resolved to a timestamp by `as_date_time`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
pub enum HistoryEventScheduleAt {
    /// Resolves to the `now` passed to `as_date_time`.
    Now,
    /// An absolute point in time.
    #[display("At(`{_0}`)")]
    At(DateTime<Utc>),
    /// An offset added to the `now` passed to `as_date_time`.
    #[display("In({_0:?})")]
    In(Duration),
}
435
/// Error returned by [`HistoryEventScheduleAt::as_date_time`].
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum ScheduleAtConversionError {
    /// The duration cannot be converted to a `TimeDelta`, or adding it to `now`
    /// leaves the representable date range.
    #[error("source duration value is out of range")]
    OutOfRangeError,
}
441
442impl HistoryEventScheduleAt {
443    pub fn as_date_time(
444        &self,
445        now: DateTime<Utc>,
446    ) -> Result<DateTime<Utc>, ScheduleAtConversionError> {
447        match self {
448            Self::Now => Ok(now),
449            Self::At(date_time) => Ok(*date_time),
450            Self::In(duration) => {
451                let time_delta = TimeDelta::from_std(*duration)
452                    .map_err(|_| ScheduleAtConversionError::OutOfRangeError)?;
453                now.checked_add_signed(time_delta)
454                    .ok_or(ScheduleAtConversionError::OutOfRangeError)
455            }
456        }
457    }
458}
459
/// Request submitted to a join set: a delay or a child execution.
/// Serialized with an internal `type` tag.
#[derive(Clone, Debug, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type")]
pub enum JoinSetRequest {
    // Must be created by the executor in `PendingState::Locked`.
    #[display("DelayRequest({delay_id}, expires_at: `{expires_at}`, schedule_at: `{schedule_at}`)")]
    DelayRequest {
        delay_id: DelayId,
        /// Absolute expiry time of the delay.
        expires_at: DateTime<Utc>,
        /// The schedule as originally requested.
        schedule_at: HistoryEventScheduleAt,
    },
    // Must be created by the executor in `PendingState::Locked`.
    #[display("ChildExecutionRequest({child_execution_id}, {target_ffqn}, params: {params})")]
    ChildExecutionRequest {
        child_execution_id: ExecutionIdDerived,
        /// Function the child execution should run.
        target_ffqn: FunctionFqn,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
        params: Params,
    },
}
480
/// Error that is not specific to an execution.
#[derive(Debug, Clone, thiserror::Error, PartialEq)]
pub enum DbErrorGeneric {
    /// Any database error without a dedicated variant; carries a description.
    #[error("database error: {0}")]
    Uncategorized(StrVariant),
    /// The database was closed.
    #[error("database was closed")]
    Close,
}
489
/// Write errors classified as non-retriable; wrapped by
/// [`DbErrorWrite::NonRetriable`].
#[derive(thiserror::Error, Clone, Debug, PartialEq, Eq)]
pub enum DbErrorWriteNonRetriable {
    #[error("validation failed: {0}")]
    ValidationFailed(StrVariant),
    #[error("conflict")]
    Conflict,
    #[error("illegal state: {0}")]
    IllegalState(StrVariant),
    /// The version supplied with the write (`requested`) differs from the one
    /// the database expected (`expected`).
    #[error("version conflict: expected: {expected}, got: {requested}")]
    VersionConflict {
        expected: Version,
        requested: Version,
    },
}
504
/// Write error tied to an execution
#[derive(Debug, Clone, thiserror::Error, PartialEq)]
pub enum DbErrorWrite {
    /// The row to write to does not exist.
    #[error("cannot write - row not found")]
    NotFound,
    /// See [`DbErrorWriteNonRetriable`].
    #[error("non-retriable error: {0}")]
    NonRetriable(#[from] DbErrorWriteNonRetriable),
    /// Error not specific to an execution; displayed transparently.
    #[error(transparent)]
    Generic(#[from] DbErrorGeneric),
}
515
/// Read error tied to an execution
#[derive(Debug, Clone, thiserror::Error, PartialEq)]
pub enum DbErrorRead {
    /// The requested row does not exist.
    #[error("cannot read - row not found")]
    NotFound,
    /// Error not specific to an execution; displayed transparently.
    #[error(transparent)]
    Generic(#[from] DbErrorGeneric),
}
524
525impl From<DbErrorRead> for DbErrorWrite {
526    fn from(value: DbErrorRead) -> DbErrorWrite {
527        match value {
528            DbErrorRead::NotFound => DbErrorWrite::NotFound,
529            DbErrorRead::Generic(err) => DbErrorWrite::Generic(err),
530        }
531    }
532}
533
/// Read error extended with a timeout case.
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum DbErrorReadWithTimeout {
    /// The operation timed out.
    #[error("timeout")]
    Timeout,
    /// The underlying read error; displayed transparently.
    #[error(transparent)]
    DbErrorRead(#[from] DbErrorRead),
}
541impl From<DbErrorGeneric> for DbErrorReadWithTimeout {
542    fn from(value: DbErrorGeneric) -> DbErrorReadWithTimeout {
543        Self::from(DbErrorRead::from(value))
544    }
545}
546
// Represents the next version after successfully appending to the execution log.
// TODO: Convert to struct with next_version
pub type AppendResponse = Version;
// NOTE(review): the tuple fields are unnamed; the trailing `Option<DateTime<Utc>>`
// is presumably a scheduling or expiry time - confirm against the users of this alias.
pub type PendingExecution = (ExecutionId, Version, Params, Option<DateTime<Utc>>);
551
/// An execution returned by the locking methods of [`DbExecutor`], together
/// with the data stored for it.
#[derive(Debug, Clone)]
pub struct LockedExecution {
    pub execution_id: ExecutionId,
    /// NOTE(review): presumably the version to use for the next append - confirm.
    pub next_version: Version,
    pub metadata: ExecutionMetadata,
    /// Details of the lock.
    pub locked_event: Locked,
    pub ffqn: FunctionFqn,
    pub params: Params,
    /// History events paired with their versions.
    pub event_history: Vec<(HistoryEvent, Version)>,
    /// Join set responses stored for the execution.
    pub responses: Vec<JoinSetResponseEventOuter>,
    /// Parent execution and join set, if any.
    pub parent: Option<(ExecutionId, JoinSetId)>,
    /// NOTE(review): looks like the count of temporary failure/timeout events
    /// (see `ExecutionRequest::is_temporary_event`) - confirm.
    pub intermittent_event_count: u32,
}
565
/// Executions returned by the `lock_pending_*` methods of [`DbExecutor`].
pub type LockPendingResponse = Vec<LockedExecution>;
/// Version returned by the batch append methods.
pub type AppendBatchResponse = Version;
568
/// An event to be appended to an execution log, with its creation time.
/// `Display` forwards to the wrapped `event`.
#[derive(Debug, Clone, derive_more::Display, Serialize, Deserialize)]
#[display("{event}")]
pub struct AppendRequest {
    /// Timestamp to store with the event.
    pub created_at: DateTime<Utc>,
    /// The event to append.
    pub event: ExecutionRequest,
}
575
/// Request to create a new execution log.
///
/// Converts into [`ExecutionRequest::Created`]; `created_at` and `execution_id`
/// are not part of that event.
#[derive(Debug, Clone)]
pub struct CreateRequest {
    pub created_at: DateTime<Utc>,
    pub execution_id: ExecutionId,
    pub ffqn: FunctionFqn,
    pub params: Params,
    /// Parent execution and join set, if any.
    pub parent: Option<(ExecutionId, JoinSetId)>,
    pub scheduled_at: DateTime<Utc>,
    pub component_id: ComponentId,
    pub metadata: ExecutionMetadata,
    pub scheduled_by: Option<ExecutionId>,
}
588
589impl From<CreateRequest> for ExecutionRequest {
590    fn from(value: CreateRequest) -> Self {
591        Self::Created {
592            ffqn: value.ffqn,
593            params: value.params,
594            parent: value.parent,
595            scheduled_at: value.scheduled_at,
596            component_id: value.component_id,
597            metadata: value.metadata,
598            scheduled_by: value.scheduled_by,
599        }
600    }
601}
602
/// Source of database connections.
#[async_trait]
pub trait DbPool: Send + Sync {
    /// Returns a boxed [`DbConnection`].
    fn connection(&self) -> Box<dyn DbConnection>;
}
607
/// A pool that can be closed.
#[async_trait]
pub trait DbPoolCloseable {
    /// Closes the pool, consuming it.
    async fn close(self);
}
612
/// Batch of events to append to one execution; the first argument of
/// [`DbExecutor::append_batch_respond_to_parent`].
#[derive(Clone, Debug)]
pub struct AppendEventsToExecution {
    /// Execution whose log receives the batch.
    pub execution_id: ExecutionId,
    /// Version supplied with the batch.
    pub version: Version,
    /// Events to append.
    pub batch: Vec<AppendRequest>,
}
619
/// Response about a finished child execution to append to its parent; the second
/// argument of [`DbExecutor::append_batch_respond_to_parent`].
#[derive(Clone, Debug)]
pub struct AppendResponseToExecution {
    /// Parent execution that receives the response.
    pub parent_execution_id: ExecutionId,
    pub created_at: DateTime<Utc>,
    /// Join set of the parent that the child belongs to.
    pub join_set_id: JoinSetId,
    pub child_execution_id: ExecutionIdDerived,
    /// Version of the child's finishing event.
    pub finished_version: Version,
    /// Result of the child execution.
    pub result: SupportedFunctionReturnValue,
}
629
630#[async_trait]
631pub trait DbExecutor: Send + Sync {
    /// Lock pending executions selected by function name (`ffqns`).
    ///
    /// `component_id`, `executor_id`, `run_id`, `lock_expires_at` and `retry_config`
    /// have the same names as the fields of [`Locked`].
    /// NOTE(review): presumably locks at most `batch_size` executions that are pending
    /// at `pending_at_or_sooner` or earlier - confirm against the implementations.
    #[expect(clippy::too_many_arguments)]
    async fn lock_pending_by_ffqns(
        &self,
        batch_size: usize,
        pending_at_or_sooner: DateTime<Utc>,
        ffqns: Arc<[FunctionFqn]>,
        created_at: DateTime<Utc>,
        component_id: ComponentId,
        executor_id: ExecutorId,
        lock_expires_at: DateTime<Utc>,
        run_id: RunId,
        retry_config: ComponentRetryConfig,
    ) -> Result<LockPendingResponse, DbErrorGeneric>;
645
    /// Lock pending executions selected by `component_id`.
    ///
    /// `component_id`, `executor_id`, `run_id`, `lock_expires_at` and `retry_config`
    /// have the same names as the fields of [`Locked`].
    /// NOTE(review): presumably locks at most `batch_size` executions that are pending
    /// at `pending_at_or_sooner` or earlier - confirm against the implementations.
    #[expect(clippy::too_many_arguments)]
    async fn lock_pending_by_component_id(
        &self,
        batch_size: usize,
        pending_at_or_sooner: DateTime<Utc>,
        component_id: &ComponentId,
        created_at: DateTime<Utc>,
        executor_id: ExecutorId,
        lock_expires_at: DateTime<Utc>,
        run_id: RunId,
        retry_config: ComponentRetryConfig,
    ) -> Result<LockPendingResponse, DbErrorGeneric>;
658
    /// Specialized locking for e.g. extending the lock by the original executor and run.
    ///
    /// Locks the single execution `execution_id`; `version` is the version supplied
    /// with the write.
    #[expect(clippy::too_many_arguments)]
    async fn lock_one(
        &self,
        created_at: DateTime<Utc>,
        component_id: ComponentId,
        execution_id: &ExecutionId,
        run_id: RunId,
        version: Version,
        executor_id: ExecutorId,
        lock_expires_at: DateTime<Utc>,
        retry_config: ComponentRetryConfig,
    ) -> Result<LockedExecution, DbErrorWrite>;
672
    /// Append a single event to an existing execution log.
    /// The request cannot contain [`ExecutionRequest::Created`].
    ///
    /// On success returns an [`AppendResponse`], i.e. a [`Version`].
    async fn append(
        &self,
        execution_id: ExecutionId,
        version: Version,
        req: AppendRequest,
    ) -> Result<AppendResponse, DbErrorWrite>;
681
    /// Append a batch of events to an existing execution log, and append a response to a parent execution.
    /// The batch cannot contain [`ExecutionRequest::Created`].
    async fn append_batch_respond_to_parent(
        &self,
        events: AppendEventsToExecution,
        response: AppendResponseToExecution,
        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
    ) -> Result<AppendBatchResponse, DbErrorWrite>;
690
    /// Notification mechanism with no strict guarantees for waiting while there are no pending executions.
    /// Return immediately if there are pending notifications at `pending_at_or_sooner`.
    /// Otherwise wait until `timeout_fut` resolves.
    /// Timers that expire between `pending_at_or_sooner` and timeout can be disregarded.
    async fn wait_for_pending_by_ffqn(
        &self,
        pending_at_or_sooner: DateTime<Utc>,
        ffqns: Arc<[FunctionFqn]>,
        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
    );
701
    /// Counterpart of `wait_for_pending_by_ffqn` that takes a `component_id`
    /// instead of a list of function names.
    /// NOTE(review): presumably the same waiting contract applies - confirm.
    async fn wait_for_pending_by_component_id(
        &self,
        pending_at_or_sooner: DateTime<Utc>,
        component_id: &ComponentId,
        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
    );
708
709    async fn cancel_activity_with_retries(
710        &self,
711        execution_id: &ExecutionId,
712        cancelled_at: DateTime<Utc>,
713    ) -> Result<CancelOutcome, DbErrorWrite> {
714        let mut retries = 5;
715        loop {
716            let res = self.cancel_activity(execution_id, cancelled_at).await;
717            if res.is_ok() || retries == 0 {
718                return res;
719            }
720            retries -= 1;
721        }
722    }
723
    /// Get last event. Impls may set `ExecutionEvent::backtrace_id` to `None`.
    ///
    /// The default `cancel_activity` uses it to read the current state and version
    /// of the execution.
    async fn get_last_execution_event(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<ExecutionEvent, DbErrorRead>;
729
730    async fn cancel_activity(
731        &self,
732        execution_id: &ExecutionId,
733        cancelled_at: DateTime<Utc>,
734    ) -> Result<CancelOutcome, DbErrorWrite> {
735        debug!("Determining cancellation state of {execution_id}");
736
737        let last_event = self
738            .get_last_execution_event(execution_id)
739            .await
740            .map_err(DbErrorWrite::from)?;
741        if let ExecutionRequest::Finished {
742            result:
743                SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError {
744                    kind: ExecutionFailureKind::Cancelled,
745                    ..
746                }),
747            ..
748        } = last_event.event
749        {
750            return Ok(CancelOutcome::Cancelled);
751        } else if matches!(last_event.event, ExecutionRequest::Finished { .. }) {
752            debug!("Not cancelling, {execution_id} is already finished");
753            return Ok(CancelOutcome::AlreadyFinished);
754        }
755        let finished_version = last_event.version.increment();
756        let child_result = SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError {
757            reason: None,
758            kind: ExecutionFailureKind::Cancelled,
759            detail: None,
760        });
761        let cancel_request = AppendRequest {
762            created_at: cancelled_at,
763            event: ExecutionRequest::Finished {
764                result: child_result.clone(),
765                http_client_traces: None,
766            },
767        };
768        debug!("Cancelling activity {execution_id} at {finished_version}");
769        if let ExecutionId::Derived(execution_id) = execution_id {
770            let (parent_execution_id, join_set_id) = execution_id.split_to_parts();
771            let child_execution_id = ExecutionId::Derived(execution_id.clone());
772            self.append_batch_respond_to_parent(
773                AppendEventsToExecution {
774                    execution_id: child_execution_id,
775                    version: finished_version.clone(),
776                    batch: vec![cancel_request],
777                },
778                AppendResponseToExecution {
779                    parent_execution_id,
780                    created_at: cancelled_at,
781                    join_set_id: join_set_id.clone(),
782                    child_execution_id: execution_id.clone(),
783                    finished_version,
784                    result: child_result,
785                },
786                cancelled_at,
787            )
788            .await?;
789        } else {
790            self.append(execution_id.clone(), finished_version, cancel_request)
791                .await?;
792        }
793        debug!("Cancelled {execution_id}");
794        Ok(CancelOutcome::Cancelled)
795    }
796}
797
/// Outcome reported by `DbConnection::append_delay_response`.
///
/// The common derives make the outcome printable and comparable, so a `Result`
/// carrying it can be unwrapped, logged and asserted on.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AppendDelayResponseOutcome {
    /// The delay response was appended.
    Success,
    /// NOTE(review): presumably the delay had already finished - confirm with the implementations.
    AlreadyFinished,
    /// NOTE(review): presumably the delay had already been cancelled - confirm with the implementations.
    AlreadyCancelled,
}
803
804#[async_trait]
805pub trait DbConnection: DbExecutor {
    /// Append a join set response to the execution `execution_id`.
    /// Only available with the `test` feature.
    #[cfg(feature = "test")]
    async fn append_response(
        &self,
        created_at: DateTime<Utc>,
        execution_id: ExecutionId,
        response_event: JoinSetResponseEvent,
    ) -> Result<(), DbErrorWrite>;
813
    /// Append the outcome of the delay `delay_id` in `join_set_id` to the execution
    /// `execution_id`. The returned [`AppendDelayResponseOutcome`] tells how the
    /// append ended.
    async fn append_delay_response(
        &self,
        created_at: DateTime<Utc>,
        execution_id: ExecutionId,
        join_set_id: JoinSetId,
        delay_id: DelayId,
        outcome: Result<(), ()>, // Successfully finished - `Ok(())` or cancelled - `Err(())`
    ) -> Result<AppendDelayResponseOutcome, DbErrorWrite>;
822
    /// Append a batch of events to an existing execution log.
    /// The batch cannot contain [`ExecutionRequest::Created`].
    async fn append_batch(
        &self,
        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
        batch: Vec<AppendRequest>,
        execution_id: ExecutionId,
        version: Version,
    ) -> Result<AppendBatchResponse, DbErrorWrite>;
832
    /// Append one or more events to the parent execution log, and create zero or more child execution logs.
    /// The batch cannot contain [`ExecutionRequest::Created`].
    async fn append_batch_create_new_execution(
        &self,
        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
        batch: Vec<AppendRequest>,   // must not contain `ExecutionRequest::Created` events
        execution_id: ExecutionId,
        version: Version,
        child_req: Vec<CreateRequest>,
    ) -> Result<AppendBatchResponse, DbErrorWrite>;
843
    #[cfg(feature = "test")]
    /// Get execution log. Only available with the `test` feature.
    async fn get(&self, execution_id: &ExecutionId) -> Result<ExecutionLog, DbErrorRead>;
847
    /// List events of the execution `execution_id`.
    /// NOTE(review): presumably returns at most `max_length` events starting at version
    /// `since`, and fills `ExecutionEvent::backtrace_id` only when `include_backtrace_id`
    /// is set - confirm against the implementations.
    async fn list_execution_events(
        &self,
        execution_id: &ExecutionId,
        since: &Version,
        max_length: VersionType,
        include_backtrace_id: bool,
    ) -> Result<Vec<ExecutionEvent>, DbErrorRead>;
855
    /// Get a single event specified by version. Impls may set `ExecutionEvent::backtrace_id` to `None`.
    ///
    /// The default `get_create_request` calls it with version `0`.
    async fn get_execution_event(
        &self,
        execution_id: &ExecutionId,
        version: &Version,
    ) -> Result<ExecutionEvent, DbErrorRead>;
862
863    async fn get_create_request(
864        &self,
865        execution_id: &ExecutionId,
866    ) -> Result<CreateRequest, DbErrorRead> {
867        let execution_event = self
868            .get_execution_event(execution_id, &Version::new(0))
869            .await?;
870        if let ExecutionRequest::Created {
871            ffqn,
872            params,
873            parent,
874            scheduled_at,
875            component_id,
876            metadata,
877            scheduled_by,
878        } = execution_event.event
879        {
880            Ok(CreateRequest {
881                created_at: execution_event.created_at,
882                execution_id: execution_id.clone(),
883                ffqn,
884                params,
885                parent,
886                scheduled_at,
887                component_id,
888                metadata,
889                scheduled_by,
890            })
891        } else {
892            error!(%execution_id, "Execution log must start with creation");
893            Err(DbErrorRead::Generic(DbErrorGeneric::Uncategorized(
894                "execution log must start with creation".into(),
895            )))
896        }
897    }
898
    /// Get the [`PendingState`] of the execution identified by `execution_id`.
    ///
    /// Returns a [`DbErrorRead`] when the read fails.
    async fn get_pending_state(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<PendingState, DbErrorRead>;
903
    /// Get currently expired locks and async timers (delay requests)
    ///
    /// * `at` - NOTE(review): presumably the instant against which expiry is evaluated;
    ///   whether the comparison is inclusive is not visible here - confirm.
    ///
    /// Each returned [`ExpiredTimer`] is either an expired lock or an expired delay.
    async fn get_expired_timers(
        &self,
        at: DateTime<Utc>,
    ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric>;
909
    /// Create a new execution log
    ///
    /// * `req` - the creation request describing the new execution.
    ///
    /// Returns an [`AppendResponse`] on success and a [`DbErrorWrite`] otherwise.
    async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite>;
912
    /// Notification mechanism with no strict guarantees for getting notified when a new response arrives.
    /// Parameter `start_idx` must be at most equal to the current size of responses in the execution log.
    /// If no response arrives immediately and `timeout_fut` resolves, `DbErrorReadWithTimeout::Timeout` is returned.
    /// Implementations with no pubsub support should use polling.
    ///
    /// * `execution_id` - the execution whose responses are awaited.
    /// * `start_idx` - NOTE(review): presumably the index of the first response the caller
    ///   has not seen yet - confirm.
    /// * `timeout_fut` - future whose completion ends the wait.
    async fn subscribe_to_next_responses(
        &self,
        execution_id: &ExecutionId,
        start_idx: usize,
        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
    ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorReadWithTimeout>;
923
    /// Notification mechanism with no strict guarantees for getting the finished result.
    /// Implementations with no pubsub support should use polling.
    ///
    /// * `execution_id` - the execution whose result is awaited.
    /// * `timeout_fut` - optional future bounding the wait.
    ///   NOTE(review): presumably `None` waits without a time limit and a resolved future
    ///   yields `DbErrorReadWithTimeout::Timeout` - confirm against the implementations.
    async fn wait_for_finished_result(
        &self,
        execution_id: &ExecutionId,
        timeout_fut: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
    ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout>;
931
    /// Store a single [`BacktraceInfo`]; returns a [`DbErrorWrite`] when the write fails.
    async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite>;
933
    /// Store several [`BacktraceInfo`] entries in one call; returns a [`DbErrorWrite`] when the write fails.
    async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite>;
935
    /// Used by gRPC only.
    /// Get the latest backtrace if version is not set.
    ///
    /// * `execution_id` - the execution whose backtrace is requested.
    /// * `filter` - selects the backtrace: [`BacktraceFilter::First`],
    ///   [`BacktraceFilter::Last`] or [`BacktraceFilter::Specific`] with a version.
    ///
    /// NOTE(review): the "if version is not set" wording appears to predate the
    /// [`BacktraceFilter`] parameter; presumably it corresponds to `BacktraceFilter::Last` - confirm.
    async fn get_backtrace(
        &self,
        execution_id: &ExecutionId,
        filter: BacktraceFilter,
    ) -> Result<BacktraceInfo, DbErrorRead>;
943
    /// Returns executions sorted in descending order.
    /// Used by gRPC only.
    ///
    /// * `ffqn` - NOTE(review): presumably restricts the listing to executions of that
    ///   function when `Some` - confirm.
    /// * `top_level_only` - NOTE(review): presumably excludes child (derived) executions
    ///   when `true` - confirm.
    /// * `pagination` - cursor, direction and page length of the listing.
    async fn list_executions(
        &self,
        ffqn: Option<FunctionFqn>,
        top_level_only: bool,
        pagination: ExecutionListPagination,
    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric>;
952
    /// Returns responses of an execution ordered as they arrived,
    /// enabling matching each `JoinNext` to its corresponding response.
    /// Used by gRPC only.
    ///
    /// * `execution_id` - the execution whose responses are listed.
    /// * `pagination` - page selection using a `u32` cursor.
    ///
    /// Each returned item pairs the response with its cursor value.
    async fn list_responses(
        &self,
        execution_id: &ExecutionId,
        pagination: Pagination<u32>,
    ) -> Result<Vec<ResponseWithCursor>, DbErrorRead>;
961
    /// Switch the component content digest associated with an execution from `old` to `new`.
    ///
    /// NOTE(review): presumably the write is rejected when the execution is not currently
    /// associated with `old` - the signature alone does not show this; confirm against the
    /// implementations.
    async fn upgrade_execution_component(
        &self,
        execution_id: &ExecutionId,
        old: &InputContentDigest,
        new: &InputContentDigest,
    ) -> Result<(), DbErrorWrite>;
968}
969
/// Outcome of a cancellation request, as returned by `cancel_delay`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CancelOutcome {
    /// The target is cancelled: either by this request or by an earlier one.
    Cancelled,
    /// The target had already finished, so nothing was cancelled.
    AlreadyFinished,
}
975
976pub async fn stub_execution(
977    db_connection: &dyn DbConnection,
978    execution_id: ExecutionIdDerived,
979    parent_execution_id: ExecutionId,
980    join_set_id: JoinSetId,
981    created_at: DateTime<Utc>,
982    return_value: SupportedFunctionReturnValue,
983) -> Result<(), DbErrorWrite> {
984    let stub_finished_version = Version::new(1); // Stub activities have no execution log except Created event.
985    // Attempt to write to `execution_id` and its parent, ignoring the possible conflict error on this tx
986    let write_attempt = {
987        let finished_req = AppendRequest {
988            created_at,
989            event: ExecutionRequest::Finished {
990                result: return_value.clone(),
991                http_client_traces: None,
992            },
993        };
994        db_connection
995            .append_batch_respond_to_parent(
996                AppendEventsToExecution {
997                    execution_id: ExecutionId::Derived(execution_id.clone()),
998                    version: stub_finished_version.clone(),
999                    batch: vec![finished_req],
1000                },
1001                AppendResponseToExecution {
1002                    parent_execution_id,
1003                    created_at,
1004                    join_set_id,
1005                    child_execution_id: execution_id.clone(),
1006                    finished_version: stub_finished_version.clone(),
1007                    result: return_value.clone(),
1008                },
1009                created_at,
1010            )
1011            .await
1012    };
1013    if let Err(write_attempt) = write_attempt {
1014        // Check that the expected value is in the database
1015        debug!("Stub write attempt failed - {write_attempt:?}");
1016
1017        let found = db_connection
1018            .get_execution_event(&ExecutionId::Derived(execution_id), &stub_finished_version)
1019            .await?; // Not found at this point should not happen, unless the previous write failed. Will be retried.
1020        match found.event {
1021            ExecutionRequest::Finished {
1022                result: found_result,
1023                ..
1024            } if return_value == found_result => {
1025                // Same value has already be written, RPC is successful.
1026                Ok(())
1027            }
1028            ExecutionRequest::Finished { .. } => Err(DbErrorWrite::NonRetriable(
1029                DbErrorWriteNonRetriable::Conflict,
1030            )),
1031            _other => Err(DbErrorWrite::NonRetriable(
1032                DbErrorWriteNonRetriable::IllegalState(
1033                    "unexpected execution event at stubbed execution".into(),
1034                ),
1035            )),
1036        }
1037    } else {
1038        Ok(())
1039    }
1040}
1041
1042pub async fn cancel_delay(
1043    db_connection: &dyn DbConnection,
1044    delay_id: DelayId,
1045    created_at: DateTime<Utc>,
1046) -> Result<CancelOutcome, DbErrorWrite> {
1047    let (parent_execution_id, join_set_id) = delay_id.split_to_parts();
1048    db_connection
1049        .append_delay_response(
1050            created_at,
1051            parent_execution_id,
1052            join_set_id,
1053            delay_id,
1054            Err(()), // Mark as cancelled.
1055        )
1056        .await
1057        .map(|ok| match ok {
1058            AppendDelayResponseOutcome::Success | AppendDelayResponseOutcome::AlreadyCancelled => {
1059                CancelOutcome::Cancelled
1060            }
1061            AppendDelayResponseOutcome::AlreadyFinished => CancelOutcome::AlreadyFinished,
1062        })
1063}
1064
/// Selects which backtrace of an execution is requested.
#[derive(Clone)]
pub enum BacktraceFilter {
    /// The first backtrace. NOTE(review): presumably the one with the lowest version - confirm.
    First,
    /// The last backtrace. NOTE(review): presumably the one with the highest version - confirm.
    Last,
    /// The backtrace belonging to the given version.
    Specific(Version),
}
1071
/// A captured WASM backtrace together with the execution and component it belongs to.
pub struct BacktraceInfo {
    /// Execution the backtrace was captured for.
    pub execution_id: ExecutionId,
    /// Component associated with the backtrace.
    pub component_id: ComponentId,
    /// NOTE(review): going by the field names, the backtrace presumably covers the
    /// half-open version range `[version_min_including, version_max_excluding)` - confirm.
    pub version_min_including: Version,
    /// Exclusive upper bound of the covered versions (see the note above).
    pub version_max_excluding: Version,
    /// The captured frames.
    pub wasm_backtrace: WasmBacktrace,
}
1079
/// Serializable copy of a WASM backtrace: an ordered list of frames.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct WasmBacktrace {
    /// Frames in the order they were reported by the source backtrace.
    pub frames: Vec<FrameInfo>,
}
1084
/// One frame of a [`WasmBacktrace`].
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct FrameInfo {
    /// Name of the module the frame belongs to.
    pub module: String,
    /// Function name of the frame.
    pub func_name: String,
    /// Symbol information attached to the frame; may be empty.
    pub symbols: Vec<FrameSymbol>,
}
1091
/// Source-level symbol information of a [`FrameInfo`]; every part is optional.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct FrameSymbol {
    /// Function name, if known.
    pub func_name: Option<String>,
    /// Source file, if known.
    pub file: Option<String>,
    /// Line number, if known.
    pub line: Option<u32>,
    /// Column number, if known.
    pub col: Option<u32>,
}
1099
1100mod wasm_backtrace {
1101    use super::{FrameInfo, FrameSymbol, WasmBacktrace};
1102
1103    impl WasmBacktrace {
1104        pub fn maybe_from(backtrace: &wasmtime::WasmBacktrace) -> Option<Self> {
1105            if backtrace.frames().is_empty() {
1106                None
1107            } else {
1108                Some(Self {
1109                    frames: backtrace.frames().iter().map(FrameInfo::from).collect(),
1110                })
1111            }
1112        }
1113    }
1114
1115    impl From<&wasmtime::FrameInfo> for FrameInfo {
1116        fn from(frame: &wasmtime::FrameInfo) -> Self {
1117            let module_name = frame.module().name().unwrap_or("<unknown>").to_string();
1118            let mut func_name = String::new();
1119            wasmtime_environ::demangle_function_name_or_index(
1120                &mut func_name,
1121                frame.func_name(),
1122                frame.func_index() as usize,
1123            )
1124            .expect("writing to string must succeed");
1125            Self {
1126                module: module_name,
1127                func_name,
1128                symbols: frame
1129                    .symbols()
1130                    .iter()
1131                    .map(std::convert::Into::into)
1132                    .collect(),
1133            }
1134        }
1135    }
1136
1137    impl From<&wasmtime::FrameSymbol> for FrameSymbol {
1138        fn from(symbol: &wasmtime::FrameSymbol) -> Self {
1139            let func_name = symbol.name().map(|name| {
1140                let mut writer = String::new();
1141                wasmtime_environ::demangle_function_name(&mut writer, name)
1142                    .expect("writing to string must succeed");
1143                writer
1144            });
1145
1146            Self {
1147                func_name,
1148                file: symbol.file().map(ToString::to_string),
1149                line: symbol.line(),
1150                col: symbol.column(),
1151            }
1152        }
1153    }
1154}
1155
/// Integer type of the cursor attached to listed responses (see `ResponseWithCursor`).
pub type ResponseCursorType = u32; // FIXME: Switch to u64
1157
/// A join set response paired with the cursor value identifying it in a paginated listing.
#[derive(Debug, Clone, Serialize)]
pub struct ResponseWithCursor {
    /// The response event.
    pub event: JoinSetResponseEventOuter,
    /// Cursor of this response.
    pub cursor: ResponseCursorType,
}
1163
/// Summary of one execution together with its pending state.
#[derive(Debug)]
pub struct ExecutionWithState {
    /// Identifier of the execution.
    pub execution_id: ExecutionId,
    /// Function the execution runs.
    pub ffqn: FunctionFqn,
    /// Pending state of the execution.
    pub pending_state: PendingState,
    /// Creation timestamp of the execution.
    pub created_at: DateTime<Utc>,
    /// NOTE(review): presumably the `scheduled_at` value given at creation - confirm.
    pub first_scheduled_at: DateTime<Utc>,
    /// Content digest of the component associated with the execution.
    pub component_digest: InputContentDigest,
}
1173
1174#[derive(Debug, Clone)]
1175pub enum ExecutionListPagination {
1176    CreatedBy(Pagination<Option<DateTime<Utc>>>),
1177    ExecutionId(Pagination<Option<ExecutionId>>),
1178}
1179impl Default for ExecutionListPagination {
1180    fn default() -> ExecutionListPagination {
1181        ExecutionListPagination::CreatedBy(Pagination::OlderThan {
1182            length: 20,
1183            cursor: None,
1184            including_cursor: false, // does not matter when `cursor` is not specified
1185        })
1186    }
1187}
1188impl ExecutionListPagination {
1189    #[must_use]
1190    pub fn length(&self) -> u8 {
1191        match self {
1192            ExecutionListPagination::CreatedBy(pagination) => pagination.length(),
1193            ExecutionListPagination::ExecutionId(pagination) => pagination.length(),
1194        }
1195    }
1196}
1197
/// Cursor-based page request over values of type `T`.
///
/// Both variants carry the page `length`, the `cursor` and whether the cursor itself
/// is part of the page (`including_cursor`).
#[derive(Debug, Clone, Copy)]
pub enum Pagination<T> {
    /// Items after the cursor; maps to `>` / `>=` in [`Pagination::rel`].
    NewerThan {
        length: u8,
        cursor: T,
        including_cursor: bool,
    },
    /// Items before the cursor; maps to `<` / `<=` in [`Pagination::rel`].
    OlderThan {
        length: u8,
        cursor: T,
        including_cursor: bool,
    },
}
impl<T> Pagination<T> {
    /// Requested page length.
    pub fn length(&self) -> u8 {
        let (Self::NewerThan { length, .. } | Self::OlderThan { length, .. }) = self;
        *length
    }

    /// Comparison operator relating an item to the cursor:
    /// `>` or `>=` for `NewerThan`, `<` or `<=` for `OlderThan`; the `=` is present
    /// exactly when `including_cursor` is set.
    pub fn rel(&self) -> &'static str {
        let inclusive = match self {
            Self::NewerThan {
                including_cursor, ..
            }
            | Self::OlderThan {
                including_cursor, ..
            } => *including_cursor,
        };
        match (self.is_desc(), inclusive) {
            (false, false) => ">",
            (false, true) => ">=",
            (true, false) => "<",
            (true, true) => "<=",
        }
    }

    /// `true` for `OlderThan`, `false` for `NewerThan`.
    pub fn is_desc(&self) -> bool {
        match self {
            Self::OlderThan { .. } => true,
            Self::NewerThan { .. } => false,
        }
    }
}
1241
/// Test helper: poll the execution log until `predicate` accepts it.
///
/// The log of `execution_id` is fetched with `DbConnection::get` and handed to `predicate`;
/// the first `Some` value is returned. Between attempts the task sleeps for 10 ms.
///
/// # Errors
/// Read errors end the polling and are returned. When `timeout` is `Some` and elapses
/// first, `DbErrorReadWithTimeout::Timeout` is returned; with `None` polling has no limit.
#[cfg(feature = "test")]
pub async fn wait_for_pending_state_fn<T: Debug>(
    db_connection: &dyn DbConnection,
    execution_id: &ExecutionId,
    predicate: impl Fn(ExecutionLog) -> Option<T> + Send,
    timeout: Option<Duration>,
) -> Result<T, DbErrorReadWithTimeout> {
    tracing::trace!(%execution_id, "Waiting for predicate");
    let poll_interval = Duration::from_millis(10);
    let poll_until_accepted = async move {
        loop {
            let snapshot = db_connection.get(execution_id).await?;
            if let Some(t) = predicate(snapshot) {
                tracing::debug!(%execution_id, "Found: {t:?}");
                return Ok(t);
            }
            tokio::time::sleep(poll_interval).await;
        }
    };

    match timeout {
        None => poll_until_accepted.await,
        Some(limit) => {
            // Whichever branch loses the race is dropped immediately.
            tokio::select! {
                res = poll_until_accepted => res,
                () = tokio::time::sleep(limit) => Err(DbErrorReadWithTimeout::Timeout)
            }
        }
    }
}
1270
/// An expired timer: either an expired execution lock or an expired delay.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExpiredTimer {
    /// An execution lock that expired.
    Lock(ExpiredLock),
    /// A delay request that expired.
    Delay(ExpiredDelay),
}
1276
/// Details of an expired execution lock (payload of `ExpiredTimer::Lock`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExpiredLock {
    /// Execution whose lock expired.
    pub execution_id: ExecutionId,
    // Version of last `Locked` event, used to detect whether the execution made progress.
    pub locked_at_version: Version,
    /// NOTE(review): presumably the version the next appended event would get - confirm.
    pub next_version: Version,
    /// As the execution may still be running, this represents the number of intermittent failures + timeouts prior to this execution.
    pub intermittent_event_count: u32,
    /// Retry limit configured for the execution.
    pub max_retries: u32,
    /// Base duration of the exponential retry backoff.
    pub retry_exp_backoff: Duration,
    /// Executor and run that held the lock.
    pub locked_by: LockedBy,
}
1289
/// Details of an expired delay request (payload of `ExpiredTimer::Delay`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExpiredDelay {
    /// Execution that owns the delay.
    pub execution_id: ExecutionId,
    /// Join set the delay belongs to.
    pub join_set_id: JoinSetId,
    /// Identifier of the delay.
    pub delay_id: DelayId,
}
1296
/// State of an execution with respect to scheduling and locking.
///
/// Serialized with the variant name in a `status` tag. Every variant carries the
/// content digest of the associated component (`component_id_input_digest`).
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize)]
#[serde(tag = "status")]
pub enum PendingState {
    /// Locked by an executor run; see [`PendingStateLocked`].
    Locked(PendingStateLocked),
    /// Pending at `scheduled_at`; `can_append_lock` permits creating a new lock once
    /// that time has been reached.
    #[display("PendingAt(`{scheduled_at}`)")]
    PendingAt {
        scheduled_at: DateTime<Utc>,
        last_lock: Option<LockedBy>, // Needed for lock extension
        component_id_input_digest: InputContentDigest,
    }, // e.g. created with a schedule, temporary timeout/failure
    #[display("BlockedByJoinSet({join_set_id},`{lock_expires_at}`)")]
    /// Caused by [`HistoryEvent::JoinNext`]
    BlockedByJoinSet {
        join_set_id: JoinSetId,
        /// See [`HistoryEvent::JoinNext::lock_expires_at`].
        lock_expires_at: DateTime<Utc>,
        /// Blocked by closing of the join set
        closing: bool,
        component_id_input_digest: InputContentDigest,
    },
    /// The execution has finished; `finished` is flattened into the serialized form.
    #[display("Finished({finished})")]
    Finished {
        #[serde(flatten)]
        finished: PendingStateFinished,
        component_id_input_digest: InputContentDigest,
    },
}
1324impl PendingState {
1325    #[must_use]
1326    pub fn get_component_id_input_digest(&self) -> &InputContentDigest {
1327        match self {
1328            PendingState::Locked(pending_state_locked) => {
1329                &pending_state_locked.component_id_input_digest
1330            }
1331            PendingState::PendingAt {
1332                component_id_input_digest,
1333                ..
1334            }
1335            | PendingState::BlockedByJoinSet {
1336                component_id_input_digest,
1337                ..
1338            }
1339            | PendingState::Finished {
1340                component_id_input_digest,
1341                ..
1342            } => component_id_input_digest,
1343        }
1344    }
1345}
1346
/// Payload of `PendingState::Locked`.
///
/// Displayed as `Locked(`<lock_expires_at>`, <executor_id>, <run_id>)`.
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize)]
#[display("Locked(`{lock_expires_at}`, {}, {})", locked_by.executor_id, locked_by.run_id)]
pub struct PendingStateLocked {
    /// Executor and run holding the lock.
    pub locked_by: LockedBy,
    /// Instant at which the lock expires.
    pub lock_expires_at: DateTime<Utc>,
    /// Content digest of the associated component.
    pub component_id_input_digest: InputContentDigest,
}
1354
/// Identifies the holder of a lock by executor and run.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LockedBy {
    /// Executor holding the lock.
    pub executor_id: ExecutorId,
    /// Run of that executor.
    pub run_id: RunId,
}
1360
/// Payload of `PendingState::Finished`; kept `Copy`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct PendingStateFinished {
    /// NOTE(review): presumably the version of the `Finished` event - confirm.
    pub version: VersionType, // not Version since it must be Copy
    /// Instant at which the execution finished.
    pub finished_at: DateTime<Utc>,
    /// Whether the execution finished successfully, and the error category if not.
    pub result_kind: PendingStateFinishedResultKind,
}
1367impl Display for PendingStateFinished {
1368    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1369        match self.result_kind {
1370            PendingStateFinishedResultKind::Ok => write!(f, "ok"),
1371            PendingStateFinishedResultKind::Err(err) => write!(f, "{err}"),
1372        }
1373    }
1374}
1375
// This is not a Result so that it can be customized for serialization
/// Success or failure category of a finished execution.
///
/// Variant names are serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PendingStateFinishedResultKind {
    /// The execution finished successfully.
    Ok,
    /// The execution finished with the given error category.
    Err(PendingStateFinishedError),
}
1383impl PendingStateFinishedResultKind {
1384    pub fn as_result(&self) -> Result<(), &PendingStateFinishedError> {
1385        match self {
1386            PendingStateFinishedResultKind::Ok => Ok(()),
1387            PendingStateFinishedResultKind::Err(err) => Err(err),
1388        }
1389    }
1390}
1391
impl From<&SupportedFunctionReturnValue> for PendingStateFinishedResultKind {
    /// Delegates to `SupportedFunctionReturnValue::as_pending_state_finished_result`.
    fn from(result: &SupportedFunctionReturnValue) -> Self {
        result.as_pending_state_finished_result()
    }
}
1397
/// Error category of an execution that did not finish successfully.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, derive_more::Display)]
pub enum PendingStateFinishedError {
    /// The execution was terminated; the payload names the kind of failure.
    #[display("execution terminated: {_0}")]
    ExecutionFailure(ExecutionFailureKind),
    /// The execution ran to completion but produced an error.
    /// NOTE(review): presumably the `Err` value of a fallible function - confirm.
    #[display("execution completed with an error")]
    FallibleError,
}
1405
1406impl PendingState {
1407    pub fn can_append_lock(
1408        &self,
1409        created_at: DateTime<Utc>,
1410        executor_id: ExecutorId,
1411        run_id: RunId,
1412        lock_expires_at: DateTime<Utc>,
1413    ) -> Result<LockKind, DbErrorWriteNonRetriable> {
1414        if lock_expires_at <= created_at {
1415            return Err(DbErrorWriteNonRetriable::ValidationFailed(
1416                "invalid expiry date".into(),
1417            ));
1418        }
1419        match self {
1420            PendingState::PendingAt {
1421                scheduled_at,
1422                last_lock,
1423                component_id_input_digest: _,
1424            } => {
1425                if *scheduled_at <= created_at {
1426                    // pending now, ok to lock
1427                    Ok(LockKind::CreatingNewLock)
1428                } else if let Some(LockedBy {
1429                    executor_id: last_executor_id,
1430                    run_id: last_run_id,
1431                }) = last_lock
1432                    && executor_id == *last_executor_id
1433                    && run_id == *last_run_id
1434                {
1435                    // Original executor is extending the lock.
1436                    Ok(LockKind::Extending)
1437                } else {
1438                    Err(DbErrorWriteNonRetriable::ValidationFailed(
1439                        "cannot lock, not yet pending".into(),
1440                    ))
1441                }
1442            }
1443            PendingState::Locked(PendingStateLocked {
1444                locked_by:
1445                    LockedBy {
1446                        executor_id: current_pending_state_executor_id,
1447                        run_id: current_pending_state_run_id,
1448                    },
1449                lock_expires_at: _,
1450                component_id_input_digest: _,
1451            }) => {
1452                if executor_id == *current_pending_state_executor_id
1453                    && run_id == *current_pending_state_run_id
1454                {
1455                    // Original executor is extending the lock.
1456                    Ok(LockKind::Extending)
1457                } else {
1458                    Err(DbErrorWriteNonRetriable::IllegalState(
1459                        "cannot lock, already locked".into(),
1460                    ))
1461                }
1462            }
1463            PendingState::BlockedByJoinSet { .. } => Err(DbErrorWriteNonRetriable::IllegalState(
1464                "cannot append Locked event when in BlockedByJoinSet state".into(),
1465            )),
1466            PendingState::Finished { .. } => Err(DbErrorWriteNonRetriable::IllegalState(
1467                "already finished".into(),
1468            )),
1469        }
1470    }
1471
1472    #[must_use]
1473    pub fn is_finished(&self) -> bool {
1474        matches!(self, PendingState::Finished { .. })
1475    }
1476}
1477
/// Kind of lock operation permitted by `PendingState::can_append_lock`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LockKind {
    /// The requester already holds (or last held) the lock and is extending it.
    Extending,
    /// The execution is pending and a new lock is being created.
    CreatingNewLock,
}
1483
/// Serializable records of outgoing HTTP requests and their responses.
pub mod http_client_trace {
    use chrono::{DateTime, Utc};
    use serde::{Deserialize, Serialize};

    /// One traced HTTP exchange: the request and, when available, its response.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    pub struct HttpClientTrace {
        /// The request that was sent.
        pub req: RequestTrace,
        /// The response, if one was recorded.
        pub resp: Option<ResponseTrace>,
    }

    /// Request part of an [`HttpClientTrace`].
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    pub struct RequestTrace {
        /// Instant at which the request was sent.
        pub sent_at: DateTime<Utc>,
        /// Request URI.
        pub uri: String,
        /// Request method.
        pub method: String,
    }

    /// Response part of an [`HttpClientTrace`].
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    pub struct ResponseTrace {
        /// Instant at which the exchange finished.
        pub finished_at: DateTime<Utc>,
        /// NOTE(review): presumably the HTTP status code on success and an error
        /// description otherwise - confirm against the producer of this value.
        pub status: Result<u16, String>,
    }
}
1507
// Unit tests for serialization round-trips and `HistoryEventScheduleAt::as_date_time`.
#[cfg(test)]
mod tests {
    use super::HistoryEventScheduleAt;
    use super::PendingStateFinished;
    use super::PendingStateFinishedError;
    use super::PendingStateFinishedResultKind;
    use crate::ExecutionFailureKind;
    use crate::SupportedFunctionReturnValue;
    use chrono::DateTime;
    use chrono::Datelike;
    use chrono::Utc;
    use insta::assert_snapshot;
    use rstest::rstest;
    use std::time::Duration;
    use val_json::type_wrapper::TypeWrapper;
    use val_json::wast_val::WastVal;
    use val_json::wast_val::WastValWithType;

    // JSON round-trip of the result kind, for the `Ok` case and one `Err` case.
    #[rstest(expected => [
        PendingStateFinishedResultKind::Ok,
        PendingStateFinishedResultKind::Err(PendingStateFinishedError::ExecutionFailure(ExecutionFailureKind::TimedOut)),
    ])]
    #[test]
    fn serde_pending_state_finished_result_kind_should_work(
        expected: PendingStateFinishedResultKind,
    ) {
        let ser = serde_json::to_string(&expected).unwrap();
        let actual: PendingStateFinishedResultKind = serde_json::from_str(&ser).unwrap();
        assert_eq!(expected, actual);
    }

    // JSON round-trip of the whole `PendingStateFinished` struct for both result kinds.
    #[rstest(result_kind => [
        PendingStateFinishedResultKind::Ok,
        PendingStateFinishedResultKind::Err(PendingStateFinishedError::ExecutionFailure(ExecutionFailureKind::TimedOut)),
    ])]
    #[test]
    fn serde_pending_state_finished_should_work(result_kind: PendingStateFinishedResultKind) {
        let expected = PendingStateFinished {
            version: 0,
            finished_at: Utc::now(),
            result_kind,
        };

        let ser = serde_json::to_string(&expected).unwrap();
        let actual: PendingStateFinished = serde_json::from_str(&ser).unwrap();
        assert_eq!(expected, actual);
    }

    // A `result<option<string>, string>` value holding `Ok(None)` must survive
    // serialization; the JSON form is additionally pinned by a snapshot.
    #[test]
    fn join_set_deser_with_result_ok_option_none_should_work() {
        let expected = SupportedFunctionReturnValue::Ok {
            ok: Some(WastValWithType {
                r#type: TypeWrapper::Result {
                    ok: Some(Box::new(TypeWrapper::Option(Box::new(TypeWrapper::String)))),
                    err: Some(Box::new(TypeWrapper::String)),
                },
                value: WastVal::Result(Ok(Some(Box::new(WastVal::Option(None))))),
            }),
        };
        let json = serde_json::to_string(&expected).unwrap();
        assert_snapshot!(json);

        let actual: SupportedFunctionReturnValue = serde_json::from_str(&json).unwrap();

        assert_eq!(expected, actual);
    }

    // `u32::MAX` seconds after the Unix epoch resolves to a date in the year 2106.
    #[test]
    fn as_date_time_should_work_with_duration_u32_max_secs() {
        let duration = Duration::from_secs(u64::from(u32::MAX));
        let schedule_at = HistoryEventScheduleAt::In(duration);
        let resolved = schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap();
        assert_eq!(2106, resolved.year());
    }

    // Largest whole number of seconds used as the time delta limit in the test below.
    const MILLIS_PER_SEC: i64 = 1000;
    const TIMEDELTA_MAX_SECS: i64 = i64::MAX / MILLIS_PER_SEC;

    // One second above that limit must make `as_date_time` return an error.
    #[test]
    fn as_date_time_should_fail_on_duration_secs_greater_than_i64_max() {
        // Fails on duration -> timedelta conversion, but a smaller duration can fail on datetime + timedelta
        let duration = Duration::from_secs(
            u64::try_from(TIMEDELTA_MAX_SECS).expect("positive number must not fail") + 1,
        );
        let schedule_at = HistoryEventScheduleAt::In(duration);
        schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap_err();
    }
}