obeli_sk_concepts/
storage.rs

1use crate::ClosingStrategy;
2use crate::ComponentId;
3use crate::ExecutionId;
4use crate::ExecutionMetadata;
5use crate::FunctionFqn;
6use crate::JoinSetId;
7use crate::Params;
8use crate::StrVariant;
9use crate::SupportedFunctionReturnValue;
10use crate::prefixed_ulid::DelayId;
11use crate::prefixed_ulid::ExecutionIdDerived;
12use crate::prefixed_ulid::ExecutorId;
13use crate::prefixed_ulid::RunId;
14use assert_matches::assert_matches;
15use async_trait::async_trait;
16use chrono::TimeDelta;
17use chrono::{DateTime, Utc};
18use http_client_trace::HttpClientTrace;
19use serde::Deserialize;
20use serde::Serialize;
21use std::fmt::Debug;
22use std::fmt::Display;
23use std::pin::Pin;
24use std::str::FromStr;
25use std::sync::Arc;
26use std::time::Duration;
27use strum::IntoStaticStr;
28use tracing::error;
29
/// Remote client representation of the execution journal.
///
/// Groups the events, the join set responses, the next version and the pending
/// state of a single execution.
#[derive(Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct ExecutionLog {
    /// Identifier of the execution this log belongs to.
    pub execution_id: ExecutionId,
    /// Events of the execution. The accessors of this type expect the list to be
    /// non-empty and its first element to be `ExecutionEventInner::Created`.
    pub events: Vec<ExecutionEvent>,
    /// Join set responses recorded for this execution.
    pub responses: Vec<JoinSetResponseEventOuter>,
    pub next_version: Version, // Is not advanced once in Finished state
    /// Pending state of the execution.
    pub pending_state: PendingState,
}
39
40impl ExecutionLog {
41    #[must_use]
42    pub fn can_be_retried_after(
43        temporary_event_count: u32,
44        max_retries: u32,
45        retry_exp_backoff: Duration,
46    ) -> Option<Duration> {
47        // If max_retries == u32::MAX, wrapping is OK after this succeeds - we want to retry forever.
48        if temporary_event_count <= max_retries {
49            // TODO: Add test for number of retries
50            let duration = retry_exp_backoff * 2_u32.saturating_pow(temporary_event_count - 1);
51            Some(duration)
52        } else {
53            None
54        }
55    }
56
57    #[must_use]
58    pub fn compute_retry_duration_when_retrying_forever(
59        temporary_event_count: u32,
60        retry_exp_backoff: Duration,
61    ) -> Duration {
62        Self::can_be_retried_after(temporary_event_count, u32::MAX, retry_exp_backoff)
63            .expect("`max_retries` set to MAX must never return None")
64    }
65
66    #[must_use]
67    pub fn retry_exp_backoff(&self) -> Duration {
68        assert_matches!(self.events.first(), Some(ExecutionEvent {
69            event: ExecutionEventInner::Created { retry_exp_backoff, .. },
70            ..
71        }) => *retry_exp_backoff)
72    }
73
74    #[must_use]
75    pub fn max_retries(&self) -> u32 {
76        assert_matches!(self.events.first(), Some(ExecutionEvent {
77            event: ExecutionEventInner::Created { max_retries, .. },
78            ..
79        }) => *max_retries)
80    }
81
82    #[must_use]
83    pub fn ffqn(&self) -> &FunctionFqn {
84        assert_matches!(self.events.first(), Some(ExecutionEvent {
85            event: ExecutionEventInner::Created { ffqn, .. },
86            ..
87        }) => ffqn)
88    }
89
90    #[must_use]
91    pub fn parent(&self) -> Option<(ExecutionId, JoinSetId)> {
92        assert_matches!(self.events.first(), Some(ExecutionEvent {
93            event: ExecutionEventInner::Created { parent, .. },
94            ..
95        }) => parent.clone())
96    }
97
98    #[must_use]
99    pub fn last_event(&self) -> &ExecutionEvent {
100        self.events.last().expect("must contain at least one event")
101    }
102
103    #[must_use]
104    pub fn into_finished_result(mut self) -> Option<SupportedFunctionReturnValue> {
105        if let ExecutionEvent {
106            event: ExecutionEventInner::Finished { result, .. },
107            ..
108        } = self.events.pop().expect("must contain at least one event")
109        {
110            Some(result)
111        } else {
112            None
113        }
114    }
115
116    pub fn event_history(&self) -> impl Iterator<Item = HistoryEvent> + '_ {
117        self.events.iter().filter_map(|event| {
118            if let ExecutionEventInner::HistoryEvent { event: eh, .. } = &event.event {
119                Some(eh.clone())
120            } else {
121                None
122            }
123        })
124    }
125
126    #[cfg(feature = "test")]
127    #[must_use]
128    pub fn find_join_set_request(&self, join_set_id: &JoinSetId) -> Option<&JoinSetRequest> {
129        self.events
130            .iter()
131            .find_map(move |event| match &event.event {
132                ExecutionEventInner::HistoryEvent {
133                    event:
134                        HistoryEvent::JoinSetRequest {
135                            join_set_id: found,
136                            request,
137                        },
138                    ..
139                } if *join_set_id == *found => Some(request),
140                _ => None,
141            })
142    }
143}
144
/// Primitive integer type backing [`Version`].
pub type VersionType = u32;
/// Version of an execution log, a newtype over [`VersionType`].
///
/// Serialized transparently, i.e. as the bare number. `get_create_request` reads the
/// `Created` event at version `0`.
#[derive(
    Debug,
    Default,
    Clone,
    PartialEq,
    Eq,
    Hash,
    derive_more::Display,
    derive_more::Into,
    serde::Serialize,
    serde::Deserialize,
)]
#[serde(transparent)]
pub struct Version(pub VersionType);
160impl Version {
161    #[must_use]
162    pub fn new(arg: VersionType) -> Self {
163        Self(arg)
164    }
165}
166
/// A single entry of an execution log. Its `Display` output is that of the inner event.
#[derive(
    Clone, Debug, derive_more::Display, PartialEq, Eq, serde::Serialize, serde::Deserialize,
)]
#[display("{event}")]
pub struct ExecutionEvent {
    /// Timestamp attached to the event.
    pub created_at: DateTime<Utc>,
    /// The event payload.
    pub event: ExecutionEventInner,
    /// Optional version referencing a backtrace. Not filled in by
    /// `DbConnection::get_execution_event`.
    pub backtrace_id: Option<Version>,
}
176
/// A join set response together with its timestamp.
///
/// Moves the execution to [`PendingState::PendingNow`] if it is currently blocked on `JoinNextBlocking`.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct JoinSetResponseEventOuter {
    /// Timestamp attached to the response.
    pub created_at: DateTime<Utc>,
    /// The response and the join set it belongs to.
    pub event: JoinSetResponseEvent,
}
183
/// A response addressed to a particular join set.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct JoinSetResponseEvent {
    /// Join set that receives the response.
    pub join_set_id: JoinSetId,
    /// The response itself.
    pub event: JoinSetResponse,
}
189
/// Kinds of responses a join set can receive. Serialized with an internal `type` tag.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type")]
pub enum JoinSetResponse {
    /// The delay identified by `delay_id` has finished.
    DelayFinished {
        delay_id: DelayId,
    },
    /// The child execution identified by `child_execution_id` has finished with `result`.
    ChildExecutionFinished {
        child_execution_id: ExecutionIdDerived,
        // NOTE(review): presumably the version of the child's `Finished` event - confirm.
        // `Arbitrary` always generates `Version(2)` here.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Version(2)))]
        finished_version: Version,
        // `Arbitrary` always generates the empty OK return value here.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
        result: SupportedFunctionReturnValue,
    },
}
205
/// Placeholder `Created` event built from empty / zero values.
// NOTE(review): presumably meant for places where only the variant matters - confirm with callers.
pub const DUMMY_CREATED: ExecutionEventInner = ExecutionEventInner::Created {
    ffqn: FunctionFqn::new_static("", ""),
    params: Params::empty(),
    parent: None,
    scheduled_at: DateTime::from_timestamp_nanos(0),
    retry_exp_backoff: Duration::ZERO,
    max_retries: 0,
    component_id: ComponentId::dummy_activity(),
    metadata: ExecutionMetadata::empty(),
    scheduled_by: None,
};
/// Placeholder `HistoryEvent` wrapping a `JoinSetCreate` of a one-off join set with an empty name.
pub const DUMMY_HISTORY_EVENT: ExecutionEventInner = ExecutionEventInner::HistoryEvent {
    event: HistoryEvent::JoinSetCreate {
        join_set_id: JoinSetId {
            kind: crate::JoinSetKind::OneOff,
            name: StrVariant::empty(),
        },
        closing_strategy: ClosingStrategy::Complete,
    },
};
/// Placeholder `TemporarilyTimedOut` event with a zero timestamp and no HTTP client traces.
pub const DUMMY_TEMPORARILY_TIMED_OUT: ExecutionEventInner =
    ExecutionEventInner::TemporarilyTimedOut {
        backoff_expires_at: DateTime::from_timestamp_nanos(0),
        http_client_traces: None,
    };
/// Placeholder `TemporarilyFailed` event with a zero timestamp, empty reasons and no details.
pub const DUMMY_TEMPORARILY_FAILED: ExecutionEventInner = ExecutionEventInner::TemporarilyFailed {
    backoff_expires_at: DateTime::from_timestamp_nanos(0),
    reason_full: StrVariant::empty(),
    reason_inner: StrVariant::empty(),
    detail: None,
    http_client_traces: None,
};
238
/// Payload of an [`ExecutionEvent`]: the different kinds of entries of an execution log.
///
/// `IntoStaticStr` provides the variant name, see `ExecutionEventInner::variant`.
#[derive(
    Clone,
    derive_more::Debug,
    derive_more::Display,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    IntoStaticStr,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[allow(clippy::large_enum_variant)]
pub enum ExecutionEventInner {
    /// Created by an external system or a scheduler when requesting a child execution or
    /// an executor when continuing as new (`FinishedExecutionError::ContinueAsNew`, `CancelledWithNew`).
    /// The execution is [`PendingState::PendingAt`]`(scheduled_at)`.
    #[display("Created({ffqn}, `{scheduled_at}`)")]
    Created {
        ffqn: FunctionFqn,
        // Skipped in the `Debug` output.
        #[cfg_attr(any(test, feature = "test"), arbitrary(default))]
        #[debug(skip)]
        params: Params,
        // Parent execution and the join set this execution belongs to, if any.
        parent: Option<(ExecutionId, JoinSetId)>,
        scheduled_at: DateTime<Utc>,
        // Base delay and retry limit, see `ExecutionLog::can_be_retried_after`.
        retry_exp_backoff: Duration,
        max_retries: u32,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
        component_id: ComponentId,
        #[cfg_attr(any(test, feature = "test"), arbitrary(default))]
        metadata: ExecutionMetadata,
        scheduled_by: Option<ExecutionId>,
    },
    // Created by an executor.
    // Either immediately followed by an execution request by an executor or
    // after expiry immediately followed by WaitingForExecutor by a scheduler.
    #[display("Locked(`{lock_expires_at}`, {component_id})")]
    Locked {
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
        component_id: ComponentId,
        executor_id: ExecutorId,
        run_id: RunId,
        lock_expires_at: DateTime<Utc>,
    },
    /// Returns execution to [`PendingState::PendingNow`] state
    /// without timing out. This can happen when the executor is running
    /// out of resources like [`WorkerError::LimitReached`] or when
    /// the executor is shutting down.
    #[display("Unlocked(`{backoff_expires_at}`)")]
    Unlocked {
        backoff_expires_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
        reason: StrVariant,
    },
    // Created by the executor holding the lock.
    // After expiry interpreted as pending.
    #[display("TemporarilyFailed(`{backoff_expires_at}`)")]
    TemporarilyFailed {
        backoff_expires_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
        reason_full: StrVariant,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason inner")))]
        reason_inner: StrVariant,
        detail: Option<String>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },
    // Created by the executor holding the last lock.
    // After expiry interpreted as pending.
    #[display("TemporarilyTimedOut(`{backoff_expires_at}`)")]
    TemporarilyTimedOut {
        backoff_expires_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },
    // Created by the executor holding the last lock.
    // Processed by a scheduler if a parent execution needs to be notified.
    // NOTE(review): the original comment continued with an unfinished "also when" - the
    // remaining condition is unknown and should be filled in by the author.
    #[display("Finished")]
    Finished {
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
        result: SupportedFunctionReturnValue,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },

    // Wraps a `HistoryEvent`.
    #[display("HistoryEvent({event})")]
    HistoryEvent { event: HistoryEvent },
}
327
328impl ExecutionEventInner {
329    #[must_use]
330    pub fn is_temporary_event(&self) -> bool {
331        matches!(
332            self,
333            Self::TemporarilyFailed { .. } | Self::TemporarilyTimedOut { .. }
334        )
335    }
336
337    #[must_use]
338    pub fn variant(&self) -> &'static str {
339        Into::<&'static str>::into(self)
340    }
341
342    #[must_use]
343    pub fn join_set_id(&self) -> Option<&JoinSetId> {
344        match self {
345            Self::Created {
346                parent: Some((_parent_id, join_set_id)),
347                ..
348            } => Some(join_set_id),
349            Self::HistoryEvent {
350                event:
351                    HistoryEvent::JoinSetCreate { join_set_id, .. }
352                    | HistoryEvent::JoinSetRequest { join_set_id, .. }
353                    | HistoryEvent::JoinNext { join_set_id, .. },
354            } => Some(join_set_id),
355            _ => None,
356        }
357    }
358}
359
/// Kind of the value stored by `HistoryEvent::Persist`. Serialized with an internal `type` tag.
#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type")]
pub enum PersistKind {
    /// A random `u64` requested with the bounds `min` and `max_inclusive`.
    #[display("RandomU64({min}, {max_inclusive})")]
    RandomU64 { min: u64, max_inclusive: u64 },
    /// A random string requested with the length bounds `min_length` and `max_length_exclusive`.
    #[display("RandomString({min_length}, {max_length_exclusive})")]
    RandomString {
        min_length: u64,
        max_length_exclusive: u64,
    },
}
372
/// Encodes `value` as eight bytes in big-endian order.
#[must_use]
pub fn from_u64_to_bytes(value: u64) -> [u8; 8] {
    u64::to_be_bytes(value)
}
377
/// Decodes eight big-endian bytes into a `u64`.
#[must_use]
pub fn from_bytes_to_u64(bytes: [u8; 8]) -> u64 {
    let decoded = u64::from_be_bytes(bytes);
    decoded
}
382
#[derive(
    derive_more::Debug, Clone, PartialEq, Eq, derive_more::Display, Serialize, Deserialize,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type")]
/// Event recorded in the history of an execution. Serialized with an internal `type` tag.
///
/// Must be created by the executor in [`PendingState::Locked`].
pub enum HistoryEvent {
    /// Persist a generated pseudorandom value.
    #[display("Persist")]
    Persist {
        // Skipped in the `Debug` output.
        #[debug(skip)]
        value: Vec<u8>, // Only stored for nondeterminism checks. TODO: Consider using a hashed value or just the intention.
        kind: PersistKind,
    },
    /// Records the creation of a join set with the given closing strategy.
    #[display("JoinSetCreate({join_set_id})")]
    JoinSetCreate {
        join_set_id: JoinSetId,
        closing_strategy: ClosingStrategy,
    },
    /// Records a request (delay or child execution) submitted to a join set.
    #[display("JoinSetRequest({join_set_id}, {request})")]
    JoinSetRequest {
        join_set_id: JoinSetId,
        request: JoinSetRequest,
    },
    /// Sets the pending state to [`PendingState::BlockedByJoinSet`].
    /// When the response arrives at `resp_time`:
    /// The execution is [`PendingState::PendingAt`]`(max(resp_time, lock_expires_at))`, so that the
    /// original executor can continue. After the expiry any executor can continue without
    /// marking the execution as timed out.
    #[display("JoinNext({join_set_id})")]
    JoinNext {
        join_set_id: JoinSetId,
        /// Set to a future time if the worker is keeping the execution invocation warm waiting for the result.
        /// The pending status will be kept in Locked state until `run_expires_at`.
        run_expires_at: DateTime<Utc>,
        /// Set to a specific function when calling `-await-next` extension function, used for
        /// determinism checks.
        requested_ffqn: Option<FunctionFqn>,
        /// Closing request must never set `requested_ffqn` and is ignored by determinism checks.
        closing: bool,
    },
    /// Records the fact that a join set was awaited more times than its submission count.
    #[display("JoinNextTooMany({join_set_id})")]
    JoinNextTooMany {
        join_set_id: JoinSetId,
        /// Set to a specific function when calling `-await-next` extension function, used for
        /// determinism checks.
        requested_ffqn: Option<FunctionFqn>,
    },
    /// Records the intention to schedule the execution `execution_id`.
    #[display("Schedule({execution_id}, {schedule_at})")]
    Schedule {
        execution_id: ExecutionId,
        schedule_at: HistoryEventScheduleAt, // Stores intention to schedule an execution at a date/offset
    },
    /// Records a stubbed result proposed for `target_execution_id`.
    #[display("Stub({target_execution_id})")]
    Stub {
        target_execution_id: ExecutionIdDerived,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
        result: SupportedFunctionReturnValue, // Only stored for nondeterminism checks. TODO: Consider using a hashed value.
        persist_result: Result<(), ()>, // Does the row (target_execution_id,Version:1) match the proposed `result`?
    },
}
445
/// Point in time at which an execution should be scheduled; resolved by
/// `HistoryEventScheduleAt::as_date_time`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
pub enum HistoryEventScheduleAt {
    /// Resolves to the `now` value supplied by the caller.
    Now,
    /// An absolute date and time.
    At(DateTime<Utc>),
    /// An offset added to the `now` value supplied by the caller.
    #[display("In({_0:?})")]
    In(Duration),
}
454
/// Error returned by `HistoryEventScheduleAt::as_date_time`.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum ScheduleAtConversionError {
    /// The duration cannot be converted to a `TimeDelta` or adding it to `now` overflows.
    #[error("source duration value is out of range")]
    OutOfRangeError,
}
460
461impl HistoryEventScheduleAt {
462    pub fn as_date_time(
463        &self,
464        now: DateTime<Utc>,
465    ) -> Result<DateTime<Utc>, ScheduleAtConversionError> {
466        match self {
467            Self::Now => Ok(now),
468            Self::At(date_time) => Ok(*date_time),
469            Self::In(duration) => {
470                let time_delta = TimeDelta::from_std(*duration)
471                    .map_err(|_| ScheduleAtConversionError::OutOfRangeError)?;
472                now.checked_add_signed(time_delta)
473                    .ok_or(ScheduleAtConversionError::OutOfRangeError)
474            }
475        }
476    }
477}
478
/// Request submitted to a join set. Serialized with an internal `type` tag.
#[derive(Clone, Debug, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type")]
pub enum JoinSetRequest {
    // Must be created by the executor in `PendingState::Locked`.
    // Requests a delay; carries both the absolute `expires_at` and the original `schedule_at`.
    #[display("DelayRequest({delay_id}, expires_at: `{expires_at}`, schedule_at: `{schedule_at}`)")]
    DelayRequest {
        delay_id: DelayId,
        expires_at: DateTime<Utc>,
        schedule_at: HistoryEventScheduleAt,
    },
    // Must be created by the executor in `PendingState::Locked`.
    // Requests a child execution identified by `child_execution_id`.
    #[display("ChildExecutionRequest({child_execution_id})")]
    ChildExecutionRequest {
        child_execution_id: ExecutionIdDerived,
    },
}
496
/// Errors of the connection to the database, as opposed to [`SpecificError`].
// NOTE(review): the variant names suggest a channel-based transport - confirm with implementations.
#[derive(thiserror::Error, Clone, Debug, PartialEq, Eq)]
pub enum DbConnectionError {
    /// Sending failed.
    #[error("send error")]
    SendError,
    /// Receiving failed.
    #[error("receive error")]
    RecvError,
}
504
/// Errors describing why a particular database operation was rejected or failed.
#[derive(thiserror::Error, Clone, Debug, PartialEq, Eq)]
pub enum SpecificError {
    /// Validation failed; the payload carries the message.
    #[error("validation failed: {0}")]
    ValidationFailed(StrVariant),
    /// The two versions do not match. Note that the versions are not part of the
    /// `Display` message.
    #[error("version mismatch")]
    VersionMismatch {
        appending_version: Version,
        expected_version: Version,
    },
    /// A version is missing.
    #[error("version missing")]
    VersionMissing,
    /// The requested item was not found.
    #[error("not found")]
    NotFound,
    /// The stored data is inconsistent, e.g. `get_create_request` reports it when the
    /// log does not start with a `Created` event.
    #[error("consistency error: `{0}`")]
    ConsistencyError(StrVariant),
    /// Any other error; the payload carries the message.
    #[error("{0}")]
    GenericError(StrVariant),
}
523
/// Error type of the database API: either a connection error or an operation-specific one.
///
/// Both variants are transparent and convertible via `From`.
#[derive(thiserror::Error, Debug, PartialEq, Eq, Clone)]
pub enum DbError {
    /// Wraps a [`DbConnectionError`].
    #[error(transparent)]
    Connection(#[from] DbConnectionError),
    /// Wraps a [`SpecificError`].
    #[error(transparent)]
    Specific(#[from] SpecificError),
}
531
/// Error returned by `DbConnection::subscribe_to_next_responses`.
#[derive(thiserror::Error, Debug, PartialEq, Eq, Clone)]
pub enum SubscribeError {
    /// The subscription was interrupted before a response arrived.
    #[error("interrupted")]
    Interrupted,
    /// Wraps a [`DbError`]; there is no `From` conversion for this variant.
    #[error(transparent)]
    DbError(DbError),
}
539
/// Version returned by `DbConnection::append` and `DbConnection::create`.
pub type AppendResponse = Version;
// NOTE(review): the meaning of the optional timestamp is not visible here - confirm with users.
pub type PendingExecution = (ExecutionId, Version, Params, Option<DateTime<Utc>>);
/// Event history and version returned by `DbConnection::lock`.
pub type LockResponse = (Vec<HistoryEvent>, Version);
543
/// An execution returned by `DbConnection::lock_pending` (see [`LockPendingResponse`]).
///
/// Most fields mirror the fields of `ExecutionEventInner::Created` of the same name.
#[derive(Debug, Clone)]
pub struct LockedExecution {
    pub execution_id: ExecutionId,
    pub metadata: ExecutionMetadata,
    pub run_id: RunId,
    pub version: Version,
    pub ffqn: FunctionFqn,
    pub params: Params,
    /// History events of the execution.
    pub event_history: Vec<HistoryEvent>,
    /// Join set responses of the execution.
    pub responses: Vec<JoinSetResponseEventOuter>,
    pub scheduled_at: DateTime<Utc>,
    pub retry_exp_backoff: Duration,
    pub max_retries: u32,
    pub parent: Option<(ExecutionId, JoinSetId)>,
    /// Count of temporary events, usable as input of `ExecutionLog::can_be_retried_after`.
    pub temporary_event_count: u32,
}
560
/// Executions returned by `DbConnection::lock_pending`.
pub type LockPendingResponse = Vec<LockedExecution>;
/// Version returned by the `append_batch*` methods of `DbConnection`.
pub type AppendBatchResponse = Version;
563
/// An event to be appended to an execution log. Its `Display` output is that of the event.
#[derive(Debug, Clone, derive_more::Display, Serialize, Deserialize)]
#[display("{event}")]
pub struct AppendRequest {
    /// Timestamp to attach to the event.
    pub created_at: DateTime<Utc>,
    /// The event to append.
    pub event: ExecutionEventInner,
}
570
/// Request to create a new execution log, see `DbConnection::create`.
///
/// Converts into `ExecutionEventInner::Created`; `created_at` and `execution_id` are not
/// part of that event.
#[derive(Debug, Clone)]
pub struct CreateRequest {
    pub created_at: DateTime<Utc>,
    pub execution_id: ExecutionId,
    pub ffqn: FunctionFqn,
    pub params: Params,
    /// Parent execution and the join set the new execution belongs to, if any.
    pub parent: Option<(ExecutionId, JoinSetId)>,
    pub scheduled_at: DateTime<Utc>,
    pub retry_exp_backoff: Duration,
    pub max_retries: u32,
    pub component_id: ComponentId,
    pub metadata: ExecutionMetadata,
    pub scheduled_by: Option<ExecutionId>,
}
585
586impl From<CreateRequest> for ExecutionEventInner {
587    fn from(value: CreateRequest) -> Self {
588        Self::Created {
589            ffqn: value.ffqn,
590            params: value.params,
591            parent: value.parent,
592            scheduled_at: value.scheduled_at,
593            retry_exp_backoff: value.retry_exp_backoff,
594            max_retries: value.max_retries,
595            component_id: value.component_id,
596            metadata: value.metadata,
597            scheduled_by: value.scheduled_by,
598        }
599    }
600}
601
/// Source of [`DbConnection`]s that can be closed.
#[async_trait]
pub trait DbPool: Send + Sync {
    /// Returns a boxed connection.
    fn connection(&self) -> Box<dyn DbConnection>;
    /// Tells whether the pool is closing.
    // NOTE(review): presumably `true` once `close` has been requested - confirm with implementations.
    fn is_closing(&self) -> bool;
    /// Closes the pool.
    async fn close(&self) -> Result<(), DbError>;
}
608
/// Error returned by `DbConnection::wait_for_finished_result`.
#[derive(Debug, thiserror::Error)]
pub enum ClientError {
    /// The wait timed out.
    #[error("client timeout")]
    Timeout,
    /// Wraps a [`DbError`]; convertible via `From`.
    #[error(transparent)]
    DbError(#[from] DbError),
}
616
617#[async_trait]
618pub trait DbConnection: Send + Sync {
619    #[expect(clippy::too_many_arguments)]
620    async fn lock_pending(
621        &self,
622        batch_size: usize,
623        pending_at_or_sooner: DateTime<Utc>,
624        ffqns: Arc<[FunctionFqn]>,
625        created_at: DateTime<Utc>,
626        component_id: ComponentId,
627        executor_id: ExecutorId,
628        lock_expires_at: DateTime<Utc>,
629        run_id: RunId,
630    ) -> Result<LockPendingResponse, DbError>;
631
632    /// Specialized `append` which returns the event history.
633    #[expect(clippy::too_many_arguments)]
634    async fn lock(
635        &self,
636        created_at: DateTime<Utc>,
637        component_id: ComponentId,
638        execution_id: &ExecutionId,
639        run_id: RunId,
640        version: Version,
641        executor_id: ExecutorId,
642        lock_expires_at: DateTime<Utc>,
643    ) -> Result<LockResponse, DbError>;
644
645    /// Append a single event to an existing execution log
646    async fn append(
647        &self,
648        execution_id: ExecutionId,
649        version: Version,
650        req: AppendRequest,
651    ) -> Result<AppendResponse, DbError>;
652
653    async fn append_response(
654        &self,
655        created_at: DateTime<Utc>,
656        execution_id: ExecutionId,
657        response_event: JoinSetResponseEvent,
658    ) -> Result<(), DbError>;
659
660    /// Append one or more events to an existing execution log
661    async fn append_batch(
662        &self,
663        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
664        batch: Vec<AppendRequest>,
665        execution_id: ExecutionId,
666        version: Version,
667    ) -> Result<AppendBatchResponse, DbError>;
668
669    /// Append one or more events to the parent execution log, and create zero or more child execution logs.
670    async fn append_batch_create_new_execution(
671        &self,
672        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
673        batch: Vec<AppendRequest>,   // must not contain `ExecutionEventInner::Created` events
674        execution_id: ExecutionId,
675        version: Version,
676        child_req: Vec<CreateRequest>,
677    ) -> Result<AppendBatchResponse, DbError>;
678
679    async fn append_batch_respond_to_parent(
680        &self,
681        execution_id: ExecutionIdDerived,
682        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
683        batch: Vec<AppendRequest>,
684        version: Version,
685        parent_execution_id: ExecutionId,
686        parent_response_event: JoinSetResponseEventOuter,
687    ) -> Result<AppendBatchResponse, DbError>;
688
689    #[cfg(feature = "test")]
690    /// Get execution log.
691    async fn get(&self, execution_id: &ExecutionId) -> Result<ExecutionLog, DbError>;
692
693    async fn list_execution_events(
694        &self,
695        execution_id: &ExecutionId,
696        since: &Version,
697        max_length: VersionType,
698        include_backtrace_id: bool,
699    ) -> Result<Vec<ExecutionEvent>, DbError>;
700
701    /// Get a single event without `backtrace_id`
702    async fn get_execution_event(
703        &self,
704        execution_id: &ExecutionId,
705        version: &Version,
706    ) -> Result<ExecutionEvent, DbError>;
707
708    async fn get_create_request(
709        &self,
710        execution_id: &ExecutionId,
711    ) -> Result<CreateRequest, DbError> {
712        let execution_event = self
713            .get_execution_event(execution_id, &Version::new(0))
714            .await?;
715        if let ExecutionEventInner::Created {
716            ffqn,
717            params,
718            parent,
719            scheduled_at,
720            retry_exp_backoff,
721            max_retries,
722            component_id,
723            metadata,
724            scheduled_by,
725        } = execution_event.event
726        {
727            Ok(CreateRequest {
728                created_at: execution_event.created_at,
729                execution_id: execution_id.clone(),
730                ffqn,
731                params,
732                parent,
733                scheduled_at,
734                retry_exp_backoff,
735                max_retries,
736                component_id,
737                metadata,
738                scheduled_by,
739            })
740        } else {
741            error!(%execution_id, "Execution log must start with creation");
742            Err(DbError::Specific(SpecificError::ConsistencyError(
743                StrVariant::Static("execution log must start with creation"),
744            )))
745        }
746    }
747
748    async fn get_finished_result(
749        &self,
750        execution_id: &ExecutionId,
751        finished: PendingStateFinished,
752    ) -> Result<Option<SupportedFunctionReturnValue>, DbError> {
753        let last_event = self
754            .get_execution_event(execution_id, &Version::new(finished.version))
755            .await?;
756        if let ExecutionEventInner::Finished { result, .. } = last_event.event {
757            Ok(Some(result))
758        } else {
759            Ok(None)
760        }
761    }
762
763    async fn get_pending_state(&self, execution_id: &ExecutionId) -> Result<PendingState, DbError>;
764
765    /// Get currently expired locks and async timers (delay requests)
766    async fn get_expired_timers(&self, at: DateTime<Utc>) -> Result<Vec<ExpiredTimer>, DbError>;
767
768    /// Create a new execution log
769    async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbError>;
770
771    /// Get notified when a new response arrives.
772    /// Parameter `start_idx` must be at most be equal to current size of responses in the execution log.
773    /// If no response arrives immediately and `interrupt_after` resolves, return an interrupt.
774    async fn subscribe_to_next_responses(
775        &self,
776        execution_id: &ExecutionId,
777        start_idx: usize,
778        interrupt_after: Pin<Box<dyn Future<Output = ()> + Send>>,
779    ) -> Result<Vec<JoinSetResponseEventOuter>, SubscribeError>;
780
781    async fn wait_for_finished_result(
782        &self,
783        execution_id: &ExecutionId,
784        timeout: Option<Duration>,
785    ) -> Result<SupportedFunctionReturnValue, ClientError>;
786
787    /// Best effort for blocking while there are no pending executions.
788    /// Return immediately if there are pending notifications at `pending_at_or_sooner`.
789    /// Implementation must return not later than at expiry date, which is: `pending_at_or_sooner` + `max_wait`.
790    /// Timers that expire until the expiry date can be disregarded.
791    /// Databases that do not support subscriptions should wait for `max_wait`.
792    async fn wait_for_pending(
793        &self,
794        pending_at_or_sooner: DateTime<Utc>,
795        ffqns: Arc<[FunctionFqn]>,
796        max_wait: Duration,
797    );
798
799    async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbError>;
800
801    async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbError>;
802
803    /// Used by gRPC only.
804    /// Get the latest backtrace if version is not set.
805    async fn get_backtrace(
806        &self,
807        execution_id: &ExecutionId,
808        filter: BacktraceFilter,
809    ) -> Result<BacktraceInfo, DbError>;
810
811    /// Returns executions sorted in descending order.
812    /// Used by gRPC only.
813    async fn list_executions(
814        &self,
815        ffqn: Option<FunctionFqn>,
816        top_level_only: bool,
817        pagination: ExecutionListPagination,
818    ) -> Result<Vec<ExecutionWithState>, DbError>;
819
820    /// Returns responses of an execution ordered as they arrived,
821    /// enabling matching each `JoinNext` to its corresponding response.
822    /// Used by gRPC only.
823    async fn list_responses(
824        &self,
825        execution_id: &ExecutionId,
826        pagination: Pagination<u32>,
827    ) -> Result<Vec<ResponseWithCursor>, DbError>;
828}
829
/// Selects which backtrace `DbConnection::get_backtrace` should return.
#[derive(Clone)]
pub enum BacktraceFilter {
    /// The first backtrace.
    First,
    /// The last backtrace.
    Last,
    /// The backtrace belonging to the given version.
    Specific(Version),
}
836
/// A WASM backtrace together with the execution and the version range it belongs to.
pub struct BacktraceInfo {
    pub execution_id: ExecutionId,
    pub component_id: ComponentId,
    /// Lower bound of the version range, inclusive.
    pub version_min_including: Version,
    /// Upper bound of the version range, exclusive.
    pub version_max_excluding: Version,
    pub wasm_backtrace: WasmBacktrace,
}
844
/// Serializable backtrace; built from a `wasmtime::WasmBacktrace` by `WasmBacktrace::maybe_from`.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct WasmBacktrace {
    /// Frames of the backtrace.
    pub frames: Vec<FrameInfo>,
}
849
/// Serializable frame of a [`WasmBacktrace`], converted from a `wasmtime::FrameInfo`.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct FrameInfo {
    /// Module name, `"<unknown>"` when the module has no name.
    pub module: String,
    /// Demangled function name, or a name derived from the function index.
    pub func_name: String,
    /// Symbols attached to the frame.
    pub symbols: Vec<FrameSymbol>,
}
856
/// Serializable symbol of a frame, converted from a `wasmtime::FrameSymbol`.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct FrameSymbol {
    /// Demangled function name, if known.
    pub func_name: Option<String>,
    /// Source file, if known.
    pub file: Option<String>,
    /// Line number, if known.
    pub line: Option<u32>,
    /// Column number, if known.
    pub col: Option<u32>,
}
864
865mod wasm_backtrace {
866    use super::{FrameInfo, FrameSymbol, WasmBacktrace};
867
868    impl WasmBacktrace {
869        pub fn maybe_from(backtrace: &wasmtime::WasmBacktrace) -> Option<Self> {
870            if backtrace.frames().is_empty() {
871                None
872            } else {
873                Some(Self {
874                    frames: backtrace.frames().iter().map(FrameInfo::from).collect(),
875                })
876            }
877        }
878    }
879
880    impl From<&wasmtime::FrameInfo> for FrameInfo {
881        fn from(frame: &wasmtime::FrameInfo) -> Self {
882            let module_name = frame.module().name().unwrap_or("<unknown>").to_string();
883            let mut func_name = String::new();
884            wasmtime_environ::demangle_function_name_or_index(
885                &mut func_name,
886                frame.func_name(),
887                frame.func_index() as usize,
888            )
889            .expect("writing to string must succeed");
890            Self {
891                module: module_name,
892                func_name,
893                symbols: frame
894                    .symbols()
895                    .iter()
896                    .map(std::convert::Into::into)
897                    .collect(),
898            }
899        }
900    }
901
902    impl From<&wasmtime::FrameSymbol> for FrameSymbol {
903        fn from(symbol: &wasmtime::FrameSymbol) -> Self {
904            let func_name = symbol.name().map(|name| {
905                let mut writer = String::new();
906                wasmtime_environ::demangle_function_name(&mut writer, name)
907                    .expect("writing to string must succeed");
908                writer
909            });
910
911            Self {
912                func_name,
913                file: symbol.file().map(ToString::to_string),
914                line: symbol.line(),
915                col: symbol.column(),
916            }
917        }
918    }
919}
920
/// A join set response paired with a numeric cursor.
///
/// This is the item type returned by `list_responses`.
#[derive(Debug, Clone)]
pub struct ResponseWithCursor {
    /// The response event itself.
    pub event: JoinSetResponseEventOuter,
    /// Cursor value of this response.
    // NOTE(review): presumably usable as the `cursor` of `Pagination<u32>` for the next page — confirm.
    pub cursor: u32,
}
926
/// Summary of an execution together with its pending state.
///
/// This is the item type returned by `list_executions`.
pub struct ExecutionWithState {
    /// Identifier of the execution.
    pub execution_id: ExecutionId,
    /// Fully qualified name of the executed function.
    pub ffqn: FunctionFqn,
    /// Pending state of the execution.
    pub pending_state: PendingState,
    /// Creation timestamp.
    pub created_at: DateTime<Utc>,
    /// Scheduled timestamp.
    pub scheduled_at: DateTime<Utc>,
}
934
/// Pagination argument of `list_executions`, keyed either by a timestamp or by
/// an execution id.
// NOTE(review): a `None` cursor presumably means "no starting point" — confirm with the DB implementations.
pub enum ExecutionListPagination {
    /// Paginate using an optional timestamp cursor.
    CreatedBy(Pagination<Option<DateTime<Utc>>>),
    /// Paginate using an optional execution id cursor.
    ExecutionId(Pagination<Option<ExecutionId>>),
}
939
/// Cursor based pagination request in one of two directions.
///
/// Both variants carry the page `length`, the `cursor` and a flag telling
/// whether the cursor itself is part of the requested range.
#[derive(Debug, Clone, Copy)]
pub enum Pagination<T> {
    /// Items newer than the cursor.
    NewerThan {
        length: u32,
        cursor: T,
        including_cursor: bool,
    },
    /// Items older than the cursor.
    OlderThan {
        length: u32,
        cursor: T,
        including_cursor: bool,
    },
}
impl<T> Pagination<T> {
    /// Requested page length, regardless of the direction.
    pub fn length(&self) -> u32 {
        let (Pagination::NewerThan { length, .. } | Pagination::OlderThan { length, .. }) = self;
        *length
    }

    /// Comparison operator matching the direction and cursor inclusivity:
    /// `>`, `>=`, `<` or `<=`.
    pub fn rel(&self) -> &'static str {
        let (newer, inclusive) = match self {
            Pagination::NewerThan {
                including_cursor, ..
            } => (true, *including_cursor),
            Pagination::OlderThan {
                including_cursor, ..
            } => (false, *including_cursor),
        };
        match (newer, inclusive) {
            (true, false) => ">",
            (true, true) => ">=",
            (false, false) => "<",
            (false, true) => "<=",
        }
    }

    /// `true` only for the `OlderThan` direction.
    pub fn is_desc(&self) -> bool {
        match self {
            Pagination::OlderThan { .. } => true,
            Pagination::NewerThan { .. } => false,
        }
    }
}
983
/// Polls the execution log until `predicate` yields a value.
///
/// The log is fetched with `db_connection.get`, handed to `predicate`, and the
/// first `Some` value is returned. Between attempts the task sleeps for 10 ms.
/// Errors from `get` are propagated. When `timeout` is set and elapses first,
/// `ClientError::Timeout` is returned; without a timeout the polling continues
/// until the predicate matches or fetching fails.
#[cfg(feature = "test")]
pub async fn wait_for_pending_state_fn<T: Debug>(
    db_connection: &dyn DbConnection,
    execution_id: &ExecutionId,
    predicate: impl Fn(ExecutionLog) -> Option<T> + Send,
    timeout: Option<Duration>,
) -> Result<T, ClientError> {
    tracing::trace!(%execution_id, "Waiting for predicate");
    let poll_until_match = async move {
        loop {
            let fetched_log = db_connection.get(execution_id).await?;
            match predicate(fetched_log) {
                Some(found) => {
                    tracing::debug!(%execution_id, "Found: {found:?}");
                    break Ok(found);
                }
                None => tokio::time::sleep(Duration::from_millis(10)).await,
            }
        }
    };

    match timeout {
        Some(limit) => {
            // Whichever branch completes first wins; the other future is dropped right away.
            tokio::select! {
                outcome = poll_until_match => outcome,
                () = tokio::time::sleep(limit) => Err(ClientError::Timeout)
            }
        }
        None => poll_until_match.await,
    }
}
1012
/// A timer that has expired: either a lock or a delay.
// NOTE(review): presumably produced by the DB layer when scanning for expired
// locks and delay requests — confirm against the method returning this type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExpiredTimer {
    /// An expired lock of an execution, with the data describing its retry settings.
    Lock {
        execution_id: ExecutionId,
        /// Version at which the lock was taken, going by the field name.
        locked_at_version: Version,
        version: Version, // Next version
        /// As the execution may still be running, this represents the number of temporary failures + timeouts prior to this execution.
        temporary_event_count: u32,
        max_retries: u32,
        retry_exp_backoff: Duration,
        /// Parent execution and its join set, when there is one.
        parent: Option<(ExecutionId, JoinSetId)>,
    },
    /// An expired delay identified by its execution, join set and delay id.
    Delay {
        execution_id: ExecutionId,
        join_set_id: JoinSetId,
        delay_id: DelayId,
    },
}
1031
/// State of an execution, as exposed e.g. in `ExecutionLog::pending_state`.
///
/// Serialized by serde as an internally tagged enum using the `type` key.
/// The `Display` output of each variant is given by its `#[display]` attribute.
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum PendingState {
    /// The execution is locked by an executor run until `lock_expires_at`.
    #[display("Locked(`{lock_expires_at}`, {executor_id}, {run_id})")]
    Locked {
        executor_id: ExecutorId,
        run_id: RunId,
        lock_expires_at: DateTime<Utc>,
    },
    /// The execution is pending since / will be pending at `scheduled_at`.
    #[display("PendingAt(`{scheduled_at}`)")]
    PendingAt { scheduled_at: DateTime<Utc> }, // e.g. created with a schedule, temporary timeout/failure
    #[display("BlockedByJoinSet({join_set_id},`{lock_expires_at}`)")]
    /// Caused by [`HistoryEvent::JoinNext`]
    ///
    /// Note that the `Display` output does not include `closing`.
    BlockedByJoinSet {
        join_set_id: JoinSetId,
        /// See [`HistoryEvent::JoinNext::lock_expires_at`].
        lock_expires_at: DateTime<Utc>,
        /// Blocked by closing of the join set
        closing: bool,
    },
    /// The execution has finished; details are in [`PendingStateFinished`].
    #[display("Finished({finished})")]
    Finished { finished: PendingStateFinished },
}
1055
/// Payload of [`PendingState::Finished`].
///
/// `Display` prints only `result_kind`, as configured by the `#[display]` attribute.
#[derive(Debug, Clone, Copy, derive_more::Display, PartialEq, Eq, Serialize, Deserialize)]
#[display("{result_kind}")]
pub struct PendingStateFinished {
    pub version: VersionType, // not Version since it must be Copy
    /// Timestamp of finishing.
    pub finished_at: DateTime<Utc>,
    /// Whether the execution ended with `ok` or with one of the error kinds.
    pub result_kind: PendingStateFinishedResultKind,
}
1063
/// Newtype over `Result<(), PendingStateFinishedError>` describing how an execution finished.
///
/// Serde support is derived from the string form: serialization goes through
/// `Display`, deserialization through `FromStr`.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, serde_with::SerializeDisplay, serde_with::DeserializeFromStr,
)]
pub struct PendingStateFinishedResultKind(pub Result<(), PendingStateFinishedError>);
// String form of the `Ok(())` case, shared by the `FromStr` and `Display` impls.
const OK_VARIANT: &str = "ok";
1069impl FromStr for PendingStateFinishedResultKind {
1070    type Err = strum::ParseError;
1071
1072    fn from_str(s: &str) -> Result<Self, Self::Err> {
1073        if s == OK_VARIANT {
1074            Ok(PendingStateFinishedResultKind(Ok(())))
1075        } else {
1076            let err = PendingStateFinishedError::from_str(s)?;
1077            Ok(PendingStateFinishedResultKind(Err(err)))
1078        }
1079    }
1080}
1081
1082impl Display for PendingStateFinishedResultKind {
1083    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1084        match self.0 {
1085            Ok(()) => write!(f, "{OK_VARIANT}"),
1086            Err(err) => write!(f, "{err}"),
1087        }
1088    }
1089}
1090
1091impl From<&SupportedFunctionReturnValue> for PendingStateFinishedResultKind {
1092    fn from(result: &SupportedFunctionReturnValue) -> Self {
1093        result.as_pending_state_finished_result()
1094    }
1095}
1096
/// Error kinds of a finished execution.
///
/// The string form used by `Display` and `FromStr` is the snake_case variant
/// name, as configured via `#[strum(serialize_all = "snake_case")]`.
/// In test builds `strum::VariantArray` is derived as well so that tests can
/// iterate over all variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq, strum::EnumString, strum::Display)]
#[cfg_attr(test, derive(strum::VariantArray))]
#[strum(serialize_all = "snake_case")]
pub enum PendingStateFinishedError {
    Timeout,
    ExecutionFailure,
    FallibleError,
}
1105
1106impl PendingState {
1107    pub fn can_append_lock(
1108        &self,
1109        created_at: DateTime<Utc>,
1110        executor_id: ExecutorId,
1111        run_id: RunId,
1112        lock_expires_at: DateTime<Utc>,
1113    ) -> Result<LockKind, SpecificError> {
1114        if lock_expires_at <= created_at {
1115            return Err(SpecificError::ValidationFailed(StrVariant::Static(
1116                "invalid expiry date",
1117            )));
1118        }
1119        match self {
1120            PendingState::PendingAt { scheduled_at } => {
1121                if *scheduled_at <= created_at {
1122                    // pending now, ok to lock
1123                    Ok(LockKind::CreatingNewLock)
1124                } else {
1125                    Err(SpecificError::ValidationFailed(StrVariant::Static(
1126                        "cannot lock, not yet pending",
1127                    )))
1128                }
1129            }
1130            PendingState::Locked {
1131                executor_id: current_pending_state_executor_id,
1132                run_id: current_pending_state_run_id,
1133                ..
1134            } => {
1135                if executor_id == *current_pending_state_executor_id // if the executor_id is same, component_id must be same as well
1136                    && run_id == *current_pending_state_run_id
1137                {
1138                    // Original executor is extending the lock.
1139                    Ok(LockKind::Extending)
1140                } else {
1141                    Err(SpecificError::ValidationFailed(StrVariant::Static(
1142                        "cannot lock, already locked",
1143                    )))
1144                }
1145            }
1146            PendingState::BlockedByJoinSet { .. } => Err(SpecificError::ValidationFailed(
1147                StrVariant::Static("cannot append Locked event when in BlockedByJoinSet state"),
1148            )),
1149            PendingState::Finished { .. } => Err(SpecificError::ValidationFailed(
1150                StrVariant::Static("already finished"),
1151            )),
1152        }
1153    }
1154
1155    #[must_use]
1156    pub fn is_finished(&self) -> bool {
1157        matches!(self, PendingState::Finished { .. })
1158    }
1159}
1160
/// Outcome of a successful `PendingState::can_append_lock` check.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LockKind {
    /// The execution is already locked by the same executor and run; the lock is being extended.
    Extending,
    /// The execution is pending and a new lock is being created.
    CreatingNewLock,
}
1166
/// Serializable records describing an outgoing HTTP request and its optional response.
pub mod http_client_trace {
    use chrono::{DateTime, Utc};
    use serde::{Deserialize, Serialize};

    /// A request trace with an optional response trace.
    // NOTE(review): `resp` is presumably `None` while the response has not arrived yet — confirm with the producer.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    pub struct HttpClientTrace {
        pub req: RequestTrace,
        pub resp: Option<ResponseTrace>,
    }

    /// What was requested and when it was sent.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    pub struct RequestTrace {
        pub sent_at: DateTime<Utc>,
        pub uri: String,
        pub method: String,
    }

    /// When the request finished and with which outcome.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    pub struct ResponseTrace {
        pub finished_at: DateTime<Utc>,
        // NOTE(review): presumably `Ok` carries the HTTP status code and `Err` an error description — confirm.
        pub status: Result<u16, String>,
    }
}
1190
// Unit tests covering serde round trips of the finished-state types and the
// boundaries of `HistoryEventScheduleAt::as_date_time`.
#[cfg(test)]
mod tests {
    use super::HistoryEventScheduleAt;
    use super::PendingStateFinished;
    use super::PendingStateFinishedError;
    use super::PendingStateFinishedResultKind;
    use crate::SupportedFunctionReturnValue;
    use chrono::DateTime;
    use chrono::Datelike;
    use chrono::Utc;
    use insta::assert_snapshot;
    use rstest::rstest;
    use std::time::Duration;
    use strum::VariantArray as _;
    use val_json::type_wrapper::TypeWrapper;
    use val_json::wast_val::WastVal;
    use val_json::wast_val::WastValWithType;

    // JSON round trip of the result kind, for one success and one error value.
    #[rstest(expected => [
        PendingStateFinishedResultKind(Result::Ok(())),
        PendingStateFinishedResultKind(Result::Err(PendingStateFinishedError::Timeout)),
    ])]
    #[test]
    fn serde_pending_state_finished_result_kind_should_work(
        expected: PendingStateFinishedResultKind,
    ) {
        let ser = serde_json::to_string(&expected).unwrap();
        let actual: PendingStateFinishedResultKind = serde_json::from_str(&ser).unwrap();
        assert_eq!(expected, actual);
    }

    // JSON round trip of the whole `PendingStateFinished` struct.
    #[rstest(result_kind => [
        PendingStateFinishedResultKind(Result::Ok(())),
        PendingStateFinishedResultKind(Result::Err(PendingStateFinishedError::Timeout)),
    ])]
    #[test]
    fn serde_pending_state_finished_should_work(result_kind: PendingStateFinishedResultKind) {
        let expected = PendingStateFinished {
            version: 0,
            finished_at: Utc::now(),
            result_kind,
        };

        let ser = serde_json::to_string(&expected).unwrap();
        let actual: PendingStateFinished = serde_json::from_str(&ser).unwrap();
        assert_eq!(expected, actual);
    }

    // A `result<option<string>, string>` holding `Ok(None)` must survive a JSON
    // round trip; the serialized form is additionally pinned by a snapshot.
    #[test]
    fn join_set_deser_with_result_ok_option_none_should_work() {
        let expected = SupportedFunctionReturnValue::Ok {
            ok: Some(WastValWithType {
                r#type: TypeWrapper::Result {
                    ok: Some(Box::new(TypeWrapper::Option(Box::new(TypeWrapper::String)))),
                    err: Some(Box::new(TypeWrapper::String)),
                },
                value: WastVal::Result(Ok(Some(Box::new(WastVal::Option(None))))),
            }),
        };
        let json = serde_json::to_string(&expected).unwrap();
        assert_snapshot!(json);

        let actual: SupportedFunctionReturnValue = serde_json::from_str(&json).unwrap();

        assert_eq!(expected, actual);
    }

    // Pins the serialized names of every error variant plus `ok` with a snapshot
    // and checks that the list deserializes back to the same values.
    #[test]
    fn verify_pending_state_finished_result_kind_serde() {
        let variants: Vec<_> = PendingStateFinishedError::VARIANTS
            .iter()
            .map(|var| PendingStateFinishedResultKind(Err(*var)))
            .chain(std::iter::once(PendingStateFinishedResultKind(Ok(()))))
            .collect();
        let ser = serde_json::to_string_pretty(&variants).unwrap();
        insta::assert_snapshot!(ser);
        let deser: Vec<PendingStateFinishedResultKind> = serde_json::from_str(&ser).unwrap();
        assert_eq!(variants, deser);
    }

    // `u32::MAX` seconds after the Unix epoch must resolve, landing in the year 2106.
    #[test]
    fn as_date_time_should_work_with_duration_u32_max_secs() {
        let duration = Duration::from_secs(u64::from(u32::MAX));
        let schedule_at = HistoryEventScheduleAt::In(duration);
        let resolved = schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap();
        assert_eq!(2106, resolved.year());
    }

    // NOTE(review): presumably mirrors chrono's `TimeDelta` upper bound of
    // `i64::MAX` milliseconds expressed in whole seconds — confirm against chrono.
    const MILLIS_PER_SEC: i64 = 1000;
    const TIMEDELTA_MAX_SECS: i64 = i64::MAX / MILLIS_PER_SEC;

    // One second past `TIMEDELTA_MAX_SECS` must be rejected with an error.
    #[test]
    fn as_date_time_should_fail_on_duration_secs_greater_than_i64_max() {
        // Fails on duration -> timedelta conversion, but a smaller duration can fail on datetime + timedelta
        let duration = Duration::from_secs(
            u64::try_from(TIMEDELTA_MAX_SECS).expect("positive number must not fail") + 1,
        );
        let schedule_at = HistoryEventScheduleAt::In(duration);
        schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap_err();
    }
}