obeli_sk_concepts/
storage.rs

1use crate::ComponentId;
2use crate::ComponentRetryConfig;
3use crate::ExecutionFailureKind;
4use crate::ExecutionId;
5use crate::ExecutionMetadata;
6use crate::FinishedExecutionError;
7use crate::FunctionFqn;
8use crate::JoinSetId;
9use crate::Params;
10use crate::StrVariant;
11use crate::SupportedFunctionReturnValue;
12use crate::prefixed_ulid::DelayId;
13use crate::prefixed_ulid::ExecutionIdDerived;
14use crate::prefixed_ulid::ExecutorId;
15use crate::prefixed_ulid::RunId;
16use assert_matches::assert_matches;
17use async_trait::async_trait;
18use chrono::TimeDelta;
19use chrono::{DateTime, Utc};
20use http_client_trace::HttpClientTrace;
21use serde::Deserialize;
22use serde::Serialize;
23use std::fmt::Debug;
24use std::fmt::Display;
25use std::pin::Pin;
26use std::sync::Arc;
27use std::time::Duration;
28use strum::IntoStaticStr;
29use tracing::debug;
30use tracing::error;
31
/// Remote client representation of the execution journal.
///
/// Groups the events of a single execution with the join set responses
/// recorded for it and its pending state.
#[derive(Debug, PartialEq, Eq)]
#[cfg_attr(feature = "test", derive(Serialize))]
pub struct ExecutionLog {
    /// Identifier of the execution this log belongs to.
    pub execution_id: ExecutionId,
    /// Events of the execution. The accessors of this type expect the list to be
    /// non-empty and to start with [`ExecutionEventInner::Created`].
    pub events: Vec<ExecutionEvent>,
    /// Join set responses recorded for this execution.
    pub responses: Vec<JoinSetResponseEventOuter>,
    /// Next version of the log. Is not advanced once in Finished state.
    pub next_version: Version,
    /// Pending state of the execution.
    pub pending_state: PendingState,
}
42
43impl ExecutionLog {
44    #[must_use]
45    pub fn can_be_retried_after(
46        temporary_event_count: u32,
47        max_retries: u32,
48        retry_exp_backoff: Duration,
49    ) -> Option<Duration> {
50        // If max_retries == u32::MAX, wrapping is OK after this succeeds - we want to retry forever.
51        if temporary_event_count <= max_retries {
52            // TODO: Add test for number of retries
53            let duration = retry_exp_backoff * 2_u32.saturating_pow(temporary_event_count - 1);
54            Some(duration)
55        } else {
56            None
57        }
58    }
59
60    #[must_use]
61    pub fn compute_retry_duration_when_retrying_forever(
62        temporary_event_count: u32,
63        retry_exp_backoff: Duration,
64    ) -> Duration {
65        Self::can_be_retried_after(temporary_event_count, u32::MAX, retry_exp_backoff)
66            .expect("`max_retries` set to MAX must never return None")
67    }
68
69    #[must_use]
70    pub fn ffqn(&self) -> &FunctionFqn {
71        assert_matches!(self.events.first(), Some(ExecutionEvent {
72            event: ExecutionEventInner::Created { ffqn, .. },
73            ..
74        }) => ffqn)
75    }
76
77    #[must_use]
78    pub fn parent(&self) -> Option<(ExecutionId, JoinSetId)> {
79        assert_matches!(self.events.first(), Some(ExecutionEvent {
80            event: ExecutionEventInner::Created { parent, .. },
81            ..
82        }) => parent.clone())
83    }
84
85    #[must_use]
86    pub fn last_event(&self) -> &ExecutionEvent {
87        self.events.last().expect("must contain at least one event")
88    }
89
90    #[must_use]
91    pub fn into_finished_result(mut self) -> Option<SupportedFunctionReturnValue> {
92        if let ExecutionEvent {
93            event: ExecutionEventInner::Finished { result, .. },
94            ..
95        } = self.events.pop().expect("must contain at least one event")
96        {
97            Some(result)
98        } else {
99            None
100        }
101    }
102
103    #[cfg(feature = "test")]
104    pub fn event_history(&self) -> impl Iterator<Item = (HistoryEvent, Version)> + '_ {
105        self.events.iter().filter_map(|event| {
106            if let ExecutionEventInner::HistoryEvent { event: eh, .. } = &event.event {
107                Some((eh.clone(), event.version.clone()))
108            } else {
109                None
110            }
111        })
112    }
113
114    #[cfg(feature = "test")]
115    #[must_use]
116    pub fn find_join_set_request(&self, join_set_id: &JoinSetId) -> Option<&JoinSetRequest> {
117        self.events
118            .iter()
119            .find_map(move |event| match &event.event {
120                ExecutionEventInner::HistoryEvent {
121                    event:
122                        HistoryEvent::JoinSetRequest {
123                            join_set_id: found,
124                            request,
125                        },
126                    ..
127                } if *join_set_id == *found => Some(request),
128                _ => None,
129            })
130    }
131}
132
/// Underlying integer type of [`Version`].
pub type VersionType = u32;
/// Version number used by the execution log, a newtype over [`VersionType`].
///
/// Serialized transparently, i.e. as the bare inner integer.
#[derive(
    Debug,
    Default,
    Clone,
    PartialEq,
    Eq,
    Hash,
    derive_more::Display,
    derive_more::Into,
    serde::Serialize,
    serde::Deserialize,
)]
#[serde(transparent)]
pub struct Version(pub VersionType);
148impl Version {
149    #[must_use]
150    pub fn new(arg: VersionType) -> Version {
151        Version(arg)
152    }
153
154    #[must_use]
155    pub fn increment(&self) -> Version {
156        Version(self.0 + 1)
157    }
158}
159
/// Single entry of an execution log.
///
/// Its `Display` output is that of the inner `event`.
#[derive(
    Clone, Debug, derive_more::Display, PartialEq, Eq, serde::Serialize, serde::Deserialize,
)]
#[display("{event}")]
pub struct ExecutionEvent {
    /// Creation timestamp of the event.
    pub created_at: DateTime<Utc>,
    /// Payload of the event.
    pub event: ExecutionEventInner,
    /// NOTE(review): presumably the version under which a backtrace related to this event
    /// is stored - confirm against the database implementations.
    pub backtrace_id: Option<Version>,
    /// Version of this event.
    pub version: Version,
}
170
/// Join set response together with its creation timestamp.
///
/// Moves the execution to [`PendingState::PendingNow`] if it is currently blocked on `JoinNextBlocking`.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct JoinSetResponseEventOuter {
    /// Creation timestamp of the response.
    pub created_at: DateTime<Utc>,
    /// The response and the join set it belongs to.
    pub event: JoinSetResponseEvent,
}
177
/// Response addressed to a particular join set.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct JoinSetResponseEvent {
    /// Join set the response belongs to.
    pub join_set_id: JoinSetId,
    /// Payload of the response.
    pub event: JoinSetResponse,
}
183
/// Payload of a join set response: either a finished delay or a finished child execution.
///
/// Serialized with an internal `type` tag.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type")]
pub enum JoinSetResponse {
    /// A delay has finished.
    DelayFinished {
        /// Identifier of the delay.
        delay_id: DelayId,
        /// Outcome of the delay; `cancel_delay` in this file writes `Err(())` to mark a
        /// delay as cancelled.
        result: Result<(), ()>,
    },
    /// A child execution has finished.
    ChildExecutionFinished {
        /// Identifier of the finished child execution.
        child_execution_id: ExecutionIdDerived,
        /// NOTE(review): presumably the version of the child's `Finished` event - confirm.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Version(2)))]
        finished_version: Version,
        /// Return value of the child execution.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
        result: SupportedFunctionReturnValue,
    },
}
200
/// Placeholder [`ExecutionEventInner::Created`] value: empty function name, empty params and
/// metadata, no parent or scheduler, a zero timestamp and the dummy activity component id.
// NOTE(review): presumably used where only the variant matters, not its fields - confirm at call sites.
pub const DUMMY_CREATED: ExecutionEventInner = ExecutionEventInner::Created {
    ffqn: FunctionFqn::new_static("", ""),
    params: Params::empty(),
    parent: None,
    scheduled_at: DateTime::from_timestamp_nanos(0),
    component_id: ComponentId::dummy_activity(),
    metadata: ExecutionMetadata::empty(),
    scheduled_by: None,
};
/// Placeholder [`ExecutionEventInner::HistoryEvent`] value wrapping a `JoinSetCreate` of a
/// one-off join set with an empty name.
// NOTE(review): presumably used where only the variant matters, not its fields - confirm at call sites.
pub const DUMMY_HISTORY_EVENT: ExecutionEventInner = ExecutionEventInner::HistoryEvent {
    event: HistoryEvent::JoinSetCreate {
        join_set_id: JoinSetId {
            kind: crate::JoinSetKind::OneOff,
            name: StrVariant::empty(),
        },
    },
};
218
/// Payload of an [`ExecutionEvent`].
///
/// The variant name is available as a static string through the derived `IntoStaticStr`.
#[derive(
    Clone,
    derive_more::Debug,
    derive_more::Display,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    IntoStaticStr,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[allow(clippy::large_enum_variant)]
pub enum ExecutionEventInner {
    /// Creation of the execution. Accessors of [`ExecutionLog`] expect this to be the first
    /// event of every log.
    #[display("Created({ffqn}, `{scheduled_at}`)")]
    Created {
        /// Function to be executed.
        ffqn: FunctionFqn,
        /// Parameters of the function; skipped in the `Debug` output.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
        #[debug(skip)]
        params: Params,
        /// Parent execution and the join set within it, if this is a child execution.
        parent: Option<(ExecutionId, JoinSetId)>,
        scheduled_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
        component_id: ComponentId,
        #[cfg_attr(any(test, feature = "test"), arbitrary(default))]
        metadata: ExecutionMetadata,
        scheduled_by: Option<ExecutionId>,
    },
    /// The execution was locked, see [`Locked`] for details.
    Locked(Locked),
    /// Returns execution to [`PendingState::PendingNow`] state
    /// without timing out. This can happen when the executor is running
    /// out of resources like [`WorkerError::LimitReached`] or when
    /// the executor is shutting down.
    #[display("Unlocked(`{backoff_expires_at}`)")]
    Unlocked {
        backoff_expires_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
        reason: StrVariant,
    },
    // Created by the executor holding the lock.
    // After expiry interpreted as pending.
    #[display("TemporarilyFailed(`{backoff_expires_at}`)")]
    TemporarilyFailed {
        backoff_expires_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
        reason: StrVariant,
        detail: Option<String>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },
    // Created by the executor holding the lock.
    // After expiry interpreted as pending.
    #[display("TemporarilyTimedOut(`{backoff_expires_at}`)")]
    TemporarilyTimedOut {
        backoff_expires_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },
    // Created by the executor holding the lock.
    #[display("Finished")]
    Finished {
        /// Return value of the execution.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
        result: SupportedFunctionReturnValue,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },

    /// Wrapper around a [`HistoryEvent`].
    #[display("HistoryEvent({event})")]
    HistoryEvent {
        event: HistoryEvent,
    },
}
290
291impl ExecutionEventInner {
292    #[must_use]
293    pub fn is_temporary_event(&self) -> bool {
294        matches!(
295            self,
296            Self::TemporarilyFailed { .. } | Self::TemporarilyTimedOut { .. }
297        )
298    }
299
300    #[must_use]
301    pub fn variant(&self) -> &'static str {
302        Into::<&'static str>::into(self)
303    }
304
305    #[must_use]
306    pub fn join_set_id(&self) -> Option<&JoinSetId> {
307        match self {
308            Self::Created {
309                parent: Some((_parent_id, join_set_id)),
310                ..
311            } => Some(join_set_id),
312            Self::HistoryEvent {
313                event:
314                    HistoryEvent::JoinSetCreate { join_set_id, .. }
315                    | HistoryEvent::JoinSetRequest { join_set_id, .. }
316                    | HistoryEvent::JoinNext { join_set_id, .. },
317            } => Some(join_set_id),
318            _ => None,
319        }
320    }
321}
322
/// Payload of [`ExecutionEventInner::Locked`]: who holds the lock and until when.
///
/// Displayed as ``Locked(`<lock_expires_at>`, <component_id>)``.
#[derive(
    Clone, derive_more::Debug, derive_more::Display, PartialEq, Eq, Serialize, Deserialize,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[display("Locked(`{lock_expires_at}`, {component_id})")]
pub struct Locked {
    #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
    pub component_id: ComponentId,
    /// Executor holding the lock.
    pub executor_id: ExecutorId,
    pub run_id: RunId,
    /// Time at which the lock expires.
    pub lock_expires_at: DateTime<Utc>,
    #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentRetryConfig::ZERO))]
    pub retry_config: ComponentRetryConfig,
}
337
/// Kind of the value stored by [`HistoryEvent::Persist`], together with the requested bounds.
///
/// Serialized with an internal `type` tag.
#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type")]
pub enum PersistKind {
    /// Random `u64`; note that the upper bound is inclusive.
    #[display("RandomU64({min}, {max_inclusive})")]
    RandomU64 { min: u64, max_inclusive: u64 },
    /// Random string; note that the upper length bound is exclusive.
    #[display("RandomString({min_length}, {max_length_exclusive})")]
    RandomString {
        min_length: u64,
        max_length_exclusive: u64,
    },
}
350
/// Encodes `value` as eight bytes in big-endian order.
///
/// Inverse of [`from_bytes_to_u64`].
#[must_use]
pub fn from_u64_to_bytes(value: u64) -> [u8; 8] {
    u64::to_be_bytes(value)
}
355
/// Decodes eight big-endian bytes into a `u64`.
///
/// Inverse of [`from_u64_to_bytes`].
#[must_use]
pub fn from_bytes_to_u64(bytes: [u8; 8]) -> u64 {
    let big_endian = bytes;
    <u64>::from_be_bytes(big_endian)
}
360
#[derive(
    derive_more::Debug, Clone, PartialEq, Eq, derive_more::Display, Serialize, Deserialize,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type")]
/// Event recorded in the history of an execution, wrapped by
/// [`ExecutionEventInner::HistoryEvent`]. Serialized with an internal `type` tag.
///
/// Must be created by the executor in [`PendingState::Locked`].
pub enum HistoryEvent {
    /// Persist a generated pseudorandom value.
    #[display("Persist")]
    Persist {
        /// Raw bytes of the value; skipped in the `Debug` output.
        #[debug(skip)]
        value: Vec<u8>, // Only stored for nondeterminism checks. TODO: Consider using a hashed value or just the intention.
        /// Kind of the persisted value.
        kind: PersistKind,
    },
    /// Creation of a join set.
    #[display("JoinSetCreate({join_set_id})")]
    JoinSetCreate { join_set_id: JoinSetId },
    /// Submission of a delay or child execution request to a join set.
    #[display("JoinSetRequest({request})")]
    // join_set_id is part of ExecutionId or DelayId in the `request`
    JoinSetRequest {
        join_set_id: JoinSetId,
        request: JoinSetRequest,
    },
    /// Sets the pending state to [`PendingState::BlockedByJoinSet`].
    /// When the response arrives at `resp_time`:
    /// The execution is [`PendingState::PendingAt`]`(max(resp_time, lock_expires_at)`, so that the
    /// original executor can continue. After the expiry any executor can continue without
    /// marking the execution as timed out.
    #[display("JoinNext({join_set_id})")]
    JoinNext {
        join_set_id: JoinSetId,
        /// Set to a future time if the worker is keeping the execution invocation warm waiting for the result.
        /// The pending status will be kept in Locked state until `run_expires_at`.
        run_expires_at: DateTime<Utc>,
        /// Set to a specific function when calling `-await-next` extension function, used for
        /// determinism checks.
        requested_ffqn: Option<FunctionFqn>,
        /// Closing request must never set `requested_ffqn` and is ignored by determinism checks.
        closing: bool,
    },
    /// Records the fact that a join set was awaited more times than its submission count.
    #[display("JoinNextTooMany({join_set_id})")]
    JoinNextTooMany {
        join_set_id: JoinSetId,
        /// Set to a specific function when calling `-await-next` extension function, used for
        /// determinism checks.
        requested_ffqn: Option<FunctionFqn>,
    },
    /// Intention to schedule another execution.
    #[display("Schedule({execution_id}, {schedule_at})")]
    Schedule {
        /// Identifier of the execution to be scheduled.
        execution_id: ExecutionId,
        schedule_at: HistoryEventScheduleAt, // Stores intention to schedule an execution at a date/offset
    },
    /// Stubbing of the result of the target execution.
    #[display("Stub({target_execution_id})")]
    Stub {
        target_execution_id: ExecutionIdDerived,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
        result: SupportedFunctionReturnValue, // Only stored for nondeterminism checks. TODO: Consider using a hashed value.
        persist_result: Result<(), ()>, // Does the row (target_execution_id,Version:1) match the proposed `result`?
    },
}
421
/// Intention of when an execution should be scheduled: immediately, at a given date, or
/// after a given offset.
///
/// Use [`HistoryEventScheduleAt::as_date_time`] to resolve it against a concrete `now`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
pub enum HistoryEventScheduleAt {
    /// Resolves to the `now` passed to `as_date_time`.
    Now,
    /// Resolves to the contained date.
    #[display("At(`{_0}`)")]
    At(DateTime<Utc>),
    /// Resolves to `now` plus the contained duration.
    #[display("In({_0:?})")]
    In(Duration),
}
431
/// Error returned by [`HistoryEventScheduleAt::as_date_time`].
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum ScheduleAtConversionError {
    /// The duration could not be converted to a `TimeDelta`, or adding it to `now`
    /// did not yield a date.
    #[error("source duration value is out of range")]
    OutOfRangeError,
}
437
438impl HistoryEventScheduleAt {
439    pub fn as_date_time(
440        &self,
441        now: DateTime<Utc>,
442    ) -> Result<DateTime<Utc>, ScheduleAtConversionError> {
443        match self {
444            Self::Now => Ok(now),
445            Self::At(date_time) => Ok(*date_time),
446            Self::In(duration) => {
447                let time_delta = TimeDelta::from_std(*duration)
448                    .map_err(|_| ScheduleAtConversionError::OutOfRangeError)?;
449                now.checked_add_signed(time_delta)
450                    .ok_or(ScheduleAtConversionError::OutOfRangeError)
451            }
452        }
453    }
454}
455
/// Request submitted to a join set: either a delay or a child execution.
///
/// Serialized with an internal `type` tag.
#[derive(Clone, Debug, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type")]
pub enum JoinSetRequest {
    // Must be created by the executor in `PendingState::Locked`.
    #[display("DelayRequest({delay_id}, expires_at: `{expires_at}`, schedule_at: `{schedule_at}`)")]
    DelayRequest {
        /// Identifier of the delay.
        delay_id: DelayId,
        /// Time at which the delay expires.
        expires_at: DateTime<Utc>,
        /// The schedule as originally requested.
        schedule_at: HistoryEventScheduleAt,
    },
    // Must be created by the executor in `PendingState::Locked`.
    #[display("ChildExecutionRequest({child_execution_id}, {target_ffqn}, params: {params})")]
    ChildExecutionRequest {
        /// Identifier of the child execution.
        child_execution_id: ExecutionIdDerived,
        /// Function the child execution runs.
        target_ffqn: FunctionFqn,
        /// Parameters passed to the child execution.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
        params: Params,
    },
}
476
/// Error that is not specific to an execution.
#[derive(Debug, Clone, thiserror::Error, PartialEq)]
pub enum DbErrorGeneric {
    /// Any database error without a more specific category; carries its description.
    #[error("database error: {0}")]
    Uncategorized(StrVariant),
    /// The database was closed.
    #[error("database was closed")]
    Close,
}
485
/// Write errors classified as non-retriable, wrapped by [`DbErrorWrite::NonRetriable`].
#[derive(thiserror::Error, Clone, Debug, PartialEq, Eq)]
pub enum DbErrorWriteNonRetriable {
    /// Validation of the write failed; carries the reason.
    #[error("validation failed: {0}")]
    ValidationFailed(StrVariant),
    /// The write is not legal in the current state; carries the reason.
    /// `cancel_delay` in this file maps this variant to `CancelOutcome::AlreadyFinished`.
    #[error("illegal state: {0}")]
    IllegalState(StrVariant),
    /// The requested version does not match the expected one.
    #[error("version conflict: expected: {expected}, got: {requested}")]
    VersionConflict {
        expected: Version,
        requested: Version,
    },
}
498
/// Write error tied to an execution
///
/// Convertible from [`DbErrorWriteNonRetriable`] and [`DbErrorGeneric`] via `#[from]`.
#[derive(Debug, Clone, thiserror::Error, PartialEq)]
pub enum DbErrorWrite {
    /// The row to be written to was not found.
    #[error("cannot write - row not found")]
    NotFound,
    /// A non-retriable error, see [`DbErrorWriteNonRetriable`].
    #[error("non-retriable error: {0}")]
    NonRetriable(#[from] DbErrorWriteNonRetriable),
    /// Error not specific to an execution; displayed transparently.
    #[error(transparent)]
    Generic(#[from] DbErrorGeneric),
}
509
/// Read error tied to an execution
///
/// Convertible from [`DbErrorGeneric`] via `#[from]`.
#[derive(Debug, Clone, thiserror::Error, PartialEq)]
pub enum DbErrorRead {
    /// The requested row was not found.
    #[error("cannot read - row not found")]
    NotFound,
    /// Error not specific to an execution; displayed transparently.
    #[error(transparent)]
    Generic(#[from] DbErrorGeneric),
}
518
519impl From<DbErrorRead> for DbErrorWrite {
520    fn from(value: DbErrorRead) -> DbErrorWrite {
521        match value {
522            DbErrorRead::NotFound => DbErrorWrite::NotFound,
523            DbErrorRead::Generic(err) => DbErrorWrite::Generic(err),
524        }
525    }
526}
527
/// Error of read operations that can also be interrupted by a timeout.
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum DbErrorReadWithTimeout {
    /// The operation timed out.
    #[error("timeout")]
    Timeout,
    /// The underlying read failed; displayed transparently.
    #[error(transparent)]
    DbErrorRead(#[from] DbErrorRead),
}
535impl From<DbErrorGeneric> for DbErrorReadWithTimeout {
536    fn from(value: DbErrorGeneric) -> DbErrorReadWithTimeout {
537        Self::from(DbErrorRead::from(value))
538    }
539}
540
// Represents the next version after a successful append to the execution log.
// TODO: Convert to struct with next_version
pub type AppendResponse = Version;
// NOTE(review): tuple of execution id, version, params and an optional timestamp; the
// meaning of the timestamp is not visible in this file - confirm at the usage sites.
pub type PendingExecution = (ExecutionId, Version, Params, Option<DateTime<Utc>>);
545
/// Execution returned by the locking operations of [`DbExecutor`], bundled with the data
/// stored in its log.
#[derive(Debug, Clone)]
pub struct LockedExecution {
    /// Identifier of the locked execution.
    pub execution_id: ExecutionId,
    /// Next version of the execution log.
    pub next_version: Version,
    pub metadata: ExecutionMetadata,
    /// The `Locked` event describing the lock.
    pub locked_event: Locked,
    /// Function to be executed.
    pub ffqn: FunctionFqn,
    /// Parameters of the function.
    pub params: Params,
    /// History events of the execution, each with its version.
    pub event_history: Vec<(HistoryEvent, Version)>,
    /// Join set responses recorded for the execution.
    pub responses: Vec<JoinSetResponseEventOuter>,
    /// Parent execution and join set, if this is a child execution.
    pub parent: Option<(ExecutionId, JoinSetId)>,
    /// NOTE(review): presumably the number of temporary failure / timeout events seen so
    /// far, i.e. the input of `ExecutionLog::can_be_retried_after` - confirm.
    pub intermittent_event_count: u32,
}
559
/// Executions locked by `DbExecutor::lock_pending`.
pub type LockPendingResponse = Vec<LockedExecution>;
/// Version returned by the batch append operations.
pub type AppendBatchResponse = Version;
562
/// Event to be appended to an execution log, together with its creation timestamp.
///
/// Its `Display` output is that of the inner `event`.
#[derive(Debug, Clone, derive_more::Display, Serialize, Deserialize)]
#[display("{event}")]
pub struct AppendRequest {
    /// Creation timestamp of the event.
    pub created_at: DateTime<Utc>,
    /// Event to append.
    pub event: ExecutionEventInner,
}
569
/// Request to create a new execution log.
///
/// Converts into [`ExecutionEventInner::Created`]; `created_at` and `execution_id` are not
/// part of that event payload.
#[derive(Debug, Clone)]
pub struct CreateRequest {
    /// Creation timestamp of the request.
    pub created_at: DateTime<Utc>,
    /// Identifier of the execution to create.
    pub execution_id: ExecutionId,
    /// Function to be executed.
    pub ffqn: FunctionFqn,
    /// Parameters of the function.
    pub params: Params,
    /// Parent execution and join set, if this is a child execution.
    pub parent: Option<(ExecutionId, JoinSetId)>,
    pub scheduled_at: DateTime<Utc>,
    pub component_id: ComponentId,
    pub metadata: ExecutionMetadata,
    pub scheduled_by: Option<ExecutionId>,
}
582
583impl From<CreateRequest> for ExecutionEventInner {
584    fn from(value: CreateRequest) -> Self {
585        Self::Created {
586            ffqn: value.ffqn,
587            params: value.params,
588            parent: value.parent,
589            scheduled_at: value.scheduled_at,
590            component_id: value.component_id,
591            metadata: value.metadata,
592            scheduled_by: value.scheduled_by,
593        }
594    }
595}
596
/// Source of database connections.
#[async_trait]
pub trait DbPool: Send + Sync {
    /// Returns a boxed [`DbConnection`].
    fn connection(&self) -> Box<dyn DbConnection>;
}
601
/// Pool that can be closed.
#[async_trait]
pub trait DbPoolCloseable {
    /// Closes the pool, consuming it.
    async fn close(self);
}
606
/// Batch of events to be appended to a single execution log; input of
/// `DbExecutor::append_batch_respond_to_parent`.
#[derive(Clone, Debug)]
pub struct AppendEventsToExecution {
    /// Execution whose log is appended to.
    pub execution_id: ExecutionId,
    /// NOTE(review): presumably the version expected for the first appended event - confirm.
    pub version: Version,
    /// Events to append.
    pub batch: Vec<AppendRequest>,
}
613
/// Response about a finished child execution to be appended to its parent; input of
/// `DbExecutor::append_batch_respond_to_parent`.
#[derive(Clone, Debug)]
pub struct AppendResponseToExecution {
    /// Parent execution receiving the response.
    pub parent_execution_id: ExecutionId,
    /// Creation timestamp of the response.
    pub created_at: DateTime<Utc>,
    /// Join set of the parent the response belongs to.
    pub join_set_id: JoinSetId,
    /// Child execution that has finished.
    pub child_execution_id: ExecutionIdDerived,
    /// NOTE(review): presumably the version of the child's `Finished` event - confirm.
    pub finished_version: Version,
    /// Return value of the child execution.
    pub result: SupportedFunctionReturnValue,
}
623
/// Database operations for locking executions, appending to their logs and waiting
/// for pending executions.
#[async_trait]
pub trait DbExecutor: Send + Sync {
    /// Locks pending executions and returns them.
    ///
    /// NOTE(review): presumably locks at most `batch_size` executions of the given `ffqns`
    /// that are pending at or before `pending_at_or_sooner` - confirm against the
    /// implementations.
    #[expect(clippy::too_many_arguments)]
    async fn lock_pending(
        &self,
        batch_size: usize,
        pending_at_or_sooner: DateTime<Utc>,
        ffqns: Arc<[FunctionFqn]>,
        created_at: DateTime<Utc>,
        component_id: ComponentId,
        executor_id: ExecutorId,
        lock_expires_at: DateTime<Utc>,
        run_id: RunId,
        retry_config: ComponentRetryConfig,
    ) -> Result<LockPendingResponse, DbErrorGeneric>;

    /// Specialized locking for e.g. extending the lock by the original executor and run.
    #[expect(clippy::too_many_arguments)]
    async fn lock_one(
        &self,
        created_at: DateTime<Utc>,
        component_id: ComponentId,
        execution_id: &ExecutionId,
        run_id: RunId,
        version: Version,
        executor_id: ExecutorId,
        lock_expires_at: DateTime<Utc>,
        retry_config: ComponentRetryConfig,
    ) -> Result<LockedExecution, DbErrorWrite>;

    /// Append a single event to an existing execution log.
    /// The request cannot contain `ExecutionEventInner::Created`.
    async fn append(
        &self,
        execution_id: ExecutionId,
        version: Version,
        req: AppendRequest,
    ) -> Result<AppendResponse, DbErrorWrite>;

    /// Append a batch of events to an existing execution log, and append a response to a parent execution.
    /// The batch cannot contain `ExecutionEventInner::Created`.
    async fn append_batch_respond_to_parent(
        &self,
        events: AppendEventsToExecution,
        response: AppendResponseToExecution,
        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
    ) -> Result<AppendBatchResponse, DbErrorWrite>;

    /// Notification mechanism with no strict guarantees for waiting while there are no pending executions.
    /// Return immediately if there are pending notifications at `pending_at_or_sooner`.
    /// Otherwise wait until `timeout_fut` resolves.
    /// Timers that expire between `pending_at_or_sooner` and timeout can be disregarded.
    async fn wait_for_pending(
        &self,
        pending_at_or_sooner: DateTime<Utc>,
        ffqns: Arc<[FunctionFqn]>,
        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
    );
}
683
/// Database connection: everything from [`DbExecutor`] plus creating executions,
/// reading their logs and state, appending responses, and handling backtraces.
#[async_trait]
pub trait DbConnection: DbExecutor {
    /// Append a join set response to the given execution.
    #[cfg(feature = "test")]
    async fn append_response(
        &self,
        created_at: DateTime<Utc>,
        execution_id: ExecutionId,
        response_event: JoinSetResponseEvent,
    ) -> Result<(), DbErrorWrite>;

    /// Append the outcome of a delay to the join set of the given execution.
    async fn append_delay_response(
        &self,
        created_at: DateTime<Utc>,
        execution_id: ExecutionId,
        join_set_id: JoinSetId,
        delay_id: DelayId,
        outcome: Result<(), ()>, // Successfully awaited by the watcher - `Ok(())` or cancelled - `Err(())`
    ) -> Result<(), DbErrorWrite>;

    /// Append a batch of events to an existing execution log.
    /// The batch cannot contain `ExecutionEventInner::Created`.
    async fn append_batch(
        &self,
        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
        batch: Vec<AppendRequest>,
        execution_id: ExecutionId,
        version: Version,
    ) -> Result<AppendBatchResponse, DbErrorWrite>;

    /// Append one or more events to the parent execution log, and create zero or more child execution logs.
    /// The batch cannot contain `ExecutionEventInner::Created`.
    async fn append_batch_create_new_execution(
        &self,
        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
        batch: Vec<AppendRequest>,   // must not contain `ExecutionEventInner::Created` events
        execution_id: ExecutionId,
        version: Version,
        child_req: Vec<CreateRequest>,
    ) -> Result<AppendBatchResponse, DbErrorWrite>;

    #[cfg(feature = "test")]
    /// Get execution log.
    async fn get(&self, execution_id: &ExecutionId) -> Result<ExecutionLog, DbErrorRead>;

    /// List events of an execution.
    ///
    /// NOTE(review): presumably returns at most `max_length` events starting at version
    /// `since`, with `backtrace_id` filled in only when `include_backtrace_id` is set -
    /// confirm against the implementations.
    async fn list_execution_events(
        &self,
        execution_id: &ExecutionId,
        since: &Version,
        max_length: VersionType,
        include_backtrace_id: bool,
    ) -> Result<Vec<ExecutionEvent>, DbErrorRead>;

    /// Get a single event specified by version. Impls may set `ExecutionEvent::backtrace_id` to `None`.
    async fn get_execution_event(
        &self,
        execution_id: &ExecutionId,
        version: &Version,
    ) -> Result<ExecutionEvent, DbErrorRead>;

    /// Get last event. Impls may set `ExecutionEvent::backtrace_id` to `None`.
    async fn get_last_execution_event(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<ExecutionEvent, DbErrorRead>;

    /// Reconstruct the [`CreateRequest`] of an execution from its event at version 0.
    ///
    /// Default implementation based on [`DbConnection::get_execution_event`]. Fails with
    /// [`DbErrorGeneric::Uncategorized`] when that event is not
    /// [`ExecutionEventInner::Created`].
    async fn get_create_request(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<CreateRequest, DbErrorRead> {
        let execution_event = self
            .get_execution_event(execution_id, &Version::new(0))
            .await?;
        if let ExecutionEventInner::Created {
            ffqn,
            params,
            parent,
            scheduled_at,
            component_id,
            metadata,
            scheduled_by,
        } = execution_event.event
        {
            Ok(CreateRequest {
                created_at: execution_event.created_at,
                execution_id: execution_id.clone(),
                ffqn,
                params,
                parent,
                scheduled_at,
                component_id,
                metadata,
                scheduled_by,
            })
        } else {
            error!(%execution_id, "Execution log must start with creation");
            Err(DbErrorRead::Generic(DbErrorGeneric::Uncategorized(
                "execution log must start with creation".into(),
            )))
        }
    }

    /// Get the result stored in the event at `finished.version`.
    ///
    /// Default implementation based on [`DbConnection::get_execution_event`]. Returns
    /// `Ok(None)` when that event is not [`ExecutionEventInner::Finished`].
    async fn get_finished_result(
        &self,
        execution_id: &ExecutionId,
        finished: PendingStateFinished,
    ) -> Result<Option<SupportedFunctionReturnValue>, DbErrorRead> {
        let last_event = self
            .get_execution_event(execution_id, &Version::new(finished.version))
            .await?;
        if let ExecutionEventInner::Finished { result, .. } = last_event.event {
            Ok(Some(result))
        } else {
            Ok(None)
        }
    }

    /// Get the pending state of an execution.
    async fn get_pending_state(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<PendingState, DbErrorRead>;

    /// Get currently expired locks and async timers (delay requests)
    async fn get_expired_timers(
        &self,
        at: DateTime<Utc>,
    ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric>;

    /// Create a new execution log
    async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite>;

    /// Notification mechanism with no strict guarantees for getting notified when a new response arrives.
    /// Parameter `start_idx` must be at most equal to the current size of responses in the execution log.
    /// If no response arrives immediately and `timeout_fut` resolves, `DbErrorReadWithTimeout::Timeout` is returned.
    /// Implementations with no pubsub support should use polling.
    async fn subscribe_to_next_responses(
        &self,
        execution_id: &ExecutionId,
        start_idx: usize,
        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
    ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorReadWithTimeout>;

    /// Notification mechanism with no strict guarantees for getting the finished result.
    /// Implementations with no pubsub support should use polling.
    async fn wait_for_finished_result(
        &self,
        execution_id: &ExecutionId,
        timeout_fut: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
    ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout>;

    /// Store a single backtrace.
    async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite>;

    /// Store a batch of backtraces.
    async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite>;

    /// Used by gRPC only.
    /// Get the latest backtrace if version is not set.
    async fn get_backtrace(
        &self,
        execution_id: &ExecutionId,
        filter: BacktraceFilter,
    ) -> Result<BacktraceInfo, DbErrorRead>;

    /// Returns executions sorted in descending order.
    /// Used by gRPC only.
    async fn list_executions(
        &self,
        ffqn: Option<FunctionFqn>,
        top_level_only: bool,
        pagination: ExecutionListPagination,
    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric>;

    /// Returns responses of an execution ordered as they arrived,
    /// enabling matching each `JoinNext` to its corresponding response.
    /// Used by gRPC only.
    async fn list_responses(
        &self,
        execution_id: &ExecutionId,
        pagination: Pagination<u32>,
    ) -> Result<Vec<ResponseWithCursor>, DbErrorRead>;
}
863
864/// It is the responsibiilty of the caller to check that the execution belongs to an activity!
865pub async fn cancel_activity_with_retries(
866    db_connection: &dyn DbConnection,
867    child_execution_id_derived: &ExecutionIdDerived,
868    created_at: DateTime<Utc>,
869    mut retries: u8,
870) -> Result<CancelOutcome, DbErrorWrite> {
871    loop {
872        let res = cancel_activity(db_connection, child_execution_id_derived, created_at).await;
873        if res.is_ok() || retries == 0 {
874            return res;
875        }
876        retries -= 1;
877    }
878}
879
/// Result of a cancellation attempt made by `cancel_activity` or `cancel_delay`.
880#[derive(Clone, Copy, Debug, PartialEq, Eq)]
881pub enum CancelOutcome {
    /// The cancellation was written to the database.
882    Success,
    /// Nothing was cancelled: `cancel_activity` found a `Finished` last event, or
    /// `cancel_delay` received an `IllegalState` write error.
883    AlreadyFinished,
884}
885
886pub async fn cancel_delay(
887    db_connection: &dyn DbConnection,
888    delay_id: DelayId,
889    created_at: DateTime<Utc>,
890) -> Result<CancelOutcome, DbErrorWrite> {
891    let (parent_execution_id, join_set_id) = delay_id.split_to_parts();
892    let res = db_connection
893        .append_delay_response(
894            created_at,
895            parent_execution_id,
896            join_set_id,
897            delay_id,
898            Err(()), // Mark as cancelled.
899        )
900        .await;
901    match res {
902        Ok(()) => Ok(CancelOutcome::Success),
903        Err(DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::IllegalState(_))) => {
904            Ok(CancelOutcome::AlreadyFinished)
905        }
906        Err(err) => Err(err),
907    }
908}
909
910pub async fn cancel_activity(
911    db_connection: &dyn DbConnection,
912    child_execution_id_derived: &ExecutionIdDerived,
913    created_at: DateTime<Utc>,
914) -> Result<CancelOutcome, DbErrorWrite> {
915    debug!("Determining cancellation state of {child_execution_id_derived}");
916    let (parent_execution_id, join_set_id) = child_execution_id_derived.split_to_parts();
917    let child_execution_id = ExecutionId::Derived(child_execution_id_derived.clone());
918    let last_event = db_connection
919        .get_last_execution_event(&child_execution_id)
920        .await
921        .map_err(DbErrorWrite::from)?;
922    if matches!(last_event.event, ExecutionEventInner::Finished { .. }) {
923        debug!("Not cancelling, {child_execution_id_derived} is already finished");
924        Ok(CancelOutcome::AlreadyFinished)
925    } else {
926        let finished_version = last_event.version.increment();
927        debug!("Cancelling activity {child_execution_id_derived} at {finished_version}");
928        let child_result = SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError {
929            reason: None,
930            kind: ExecutionFailureKind::Cancelled,
931            detail: None,
932        });
933        db_connection
934            .append_batch_respond_to_parent(
935                AppendEventsToExecution {
936                    execution_id: child_execution_id,
937                    version: finished_version.clone(),
938                    batch: vec![AppendRequest {
939                        created_at,
940                        event: ExecutionEventInner::Finished {
941                            result: child_result.clone(),
942                            http_client_traces: None,
943                        },
944                    }],
945                },
946                AppendResponseToExecution {
947                    parent_execution_id,
948                    created_at,
949                    join_set_id: join_set_id.clone(),
950                    child_execution_id: child_execution_id_derived.clone(),
951                    finished_version,
952                    result: child_result,
953                },
954                created_at,
955            )
956            .await?;
957        debug!("Cancelled {child_execution_id_derived}");
958        Ok(CancelOutcome::Success)
959    }
960}
961
/// Selects which backtrace `get_backtrace` should return.
/// NOTE(review): presumably `First`/`Last` pick the oldest/newest stored backtrace and
/// `Specific` the one covering the given version — confirm against implementations.
962#[derive(Clone)]
963pub enum BacktraceFilter {
964    First,
965    Last,
966    Specific(Version),
967}
968
/// Backtrace record accepted by `append_backtrace` / `append_backtrace_batch` and returned
/// by `get_backtrace`.
969pub struct BacktraceInfo {
970    pub execution_id: ExecutionId,
971    pub component_id: ComponentId,
    // NOTE(review): the two field names suggest a half-open version range
    // `[version_min_including, version_max_excluding)` — confirm.
972    pub version_min_including: Version,
973    pub version_max_excluding: Version,
974    pub wasm_backtrace: WasmBacktrace,
975}
976
/// Serializable copy of a `wasmtime::WasmBacktrace`.
/// Built by `WasmBacktrace::maybe_from`, which yields `None` for a backtrace without frames.
977#[derive(Serialize, Deserialize, Debug, Clone)]
978pub struct WasmBacktrace {
    /// One entry per frame of the source backtrace, in the order wasmtime reported them.
979    pub frames: Vec<FrameInfo>,
980}
981
/// Serializable copy of a single `wasmtime::FrameInfo`.
982#[derive(Serialize, Deserialize, Debug, Clone)]
983pub struct FrameInfo {
    /// Name of the module the frame belongs to, or `<unknown>` when the module has no name.
984    pub module: String,
    /// Demangled function name; derived from the function index when no name is available.
985    pub func_name: String,
    /// Symbol entries wasmtime attached to this frame.
986    pub symbols: Vec<FrameSymbol>,
987}
988
/// Serializable copy of a `wasmtime::FrameSymbol`; every field is optional because the
/// source symbol may not provide it.
989#[derive(Serialize, Deserialize, Debug, Clone)]
990pub struct FrameSymbol {
    /// Demangled symbol name.
991    pub func_name: Option<String>,
    /// Source file reported by the symbol.
992    pub file: Option<String>,
    /// Source line reported by the symbol.
993    pub line: Option<u32>,
    /// Source column reported by the symbol.
994    pub col: Option<u32>,
995}
996
997mod wasm_backtrace {
998    use super::{FrameInfo, FrameSymbol, WasmBacktrace};
999
1000    impl WasmBacktrace {
1001        pub fn maybe_from(backtrace: &wasmtime::WasmBacktrace) -> Option<Self> {
1002            if backtrace.frames().is_empty() {
1003                None
1004            } else {
1005                Some(Self {
1006                    frames: backtrace.frames().iter().map(FrameInfo::from).collect(),
1007                })
1008            }
1009        }
1010    }
1011
1012    impl From<&wasmtime::FrameInfo> for FrameInfo {
1013        fn from(frame: &wasmtime::FrameInfo) -> Self {
1014            let module_name = frame.module().name().unwrap_or("<unknown>").to_string();
1015            let mut func_name = String::new();
1016            wasmtime_environ::demangle_function_name_or_index(
1017                &mut func_name,
1018                frame.func_name(),
1019                frame.func_index() as usize,
1020            )
1021            .expect("writing to string must succeed");
1022            Self {
1023                module: module_name,
1024                func_name,
1025                symbols: frame
1026                    .symbols()
1027                    .iter()
1028                    .map(std::convert::Into::into)
1029                    .collect(),
1030            }
1031        }
1032    }
1033
1034    impl From<&wasmtime::FrameSymbol> for FrameSymbol {
1035        fn from(symbol: &wasmtime::FrameSymbol) -> Self {
1036            let func_name = symbol.name().map(|name| {
1037                let mut writer = String::new();
1038                wasmtime_environ::demangle_function_name(&mut writer, name)
1039                    .expect("writing to string must succeed");
1040                writer
1041            });
1042
1043            Self {
1044                func_name,
1045                file: symbol.file().map(ToString::to_string),
1046                line: symbol.line(),
1047                col: symbol.column(),
1048            }
1049        }
1050    }
1051}
1052
/// Integer type used for response cursors (`ResponseWithCursor::cursor`) and for page
/// lengths (`Pagination::length`).
1053pub type ResponseCursorType = u32; // FIXME: Switch to u64
1054
/// A join set response paired with its cursor; the item type returned by `list_responses`.
1055#[derive(Debug, Clone)]
1056pub struct ResponseWithCursor {
1057    pub event: JoinSetResponseEventOuter,
    // NOTE(review): presumably the value to feed back into `Pagination<u32>` to continue
    // listing from this response — confirm.
1058    pub cursor: ResponseCursorType,
1059}
1060
/// Summary of one execution; the item type returned by `list_executions`.
1061#[derive(Debug)]
1062pub struct ExecutionWithState {
1063    pub execution_id: ExecutionId,
1064    pub ffqn: FunctionFqn,
1065    pub pending_state: PendingState,
1066    pub created_at: DateTime<Utc>,
1067    pub scheduled_at: DateTime<Utc>,
1068}
1069
/// Pagination key accepted by `list_executions`: either a timestamp cursor or an
/// execution id cursor. Both cursors are optional.
/// NOTE(review): presumably a `None` cursor starts from the first/last item — confirm.
1070pub enum ExecutionListPagination {
    // NOTE(review): named `CreatedBy` although the cursor is a timestamp; presumably this
    // pages by creation time — confirm.
1071    CreatedBy(Pagination<Option<DateTime<Utc>>>),
1072    ExecutionId(Pagination<Option<ExecutionId>>),
1073}
1074
/// Cursor-based page request, generic over the cursor type `T`.
///
/// The variant selects the direction relative to `cursor`; `including_cursor` selects
/// whether the item at the cursor itself is part of the page (`>=` / `<=` instead of
/// `>` / `<`, see `Pagination::rel`).
1075#[derive(Debug, Clone, Copy)]
1076pub enum Pagination<T> {
    /// Items after the cursor (`>` or `>=`).
1077    NewerThan {
        /// Requested page length.
1078        length: ResponseCursorType,
1079        cursor: T,
1080        including_cursor: bool,
1081    },
    /// Items before the cursor (`<` or `<=`); reported as descending by `Pagination::is_desc`.
1082    OlderThan {
        /// Requested page length.
1083        length: ResponseCursorType,
1084        cursor: T,
1085        including_cursor: bool,
1086    },
1087}
1088impl<T> Pagination<T> {
1089    pub fn length(&self) -> ResponseCursorType {
1090        match self {
1091            Pagination::NewerThan { length, .. } | Pagination::OlderThan { length, .. } => *length,
1092        }
1093    }
1094    pub fn rel(&self) -> &'static str {
1095        match self {
1096            Pagination::NewerThan {
1097                including_cursor: false,
1098                ..
1099            } => ">",
1100            Pagination::NewerThan {
1101                including_cursor: true,
1102                ..
1103            } => ">=",
1104            Pagination::OlderThan {
1105                including_cursor: false,
1106                ..
1107            } => "<",
1108            Pagination::OlderThan {
1109                including_cursor: true,
1110                ..
1111            } => "<=",
1112        }
1113    }
1114    pub fn is_desc(&self) -> bool {
1115        matches!(self, Pagination::OlderThan { .. })
1116    }
1117}
1118
/// Test helper: polls the execution log until `predicate` returns `Some`.
///
/// The log is fetched with `db_connection.get`, handed to `predicate`, and fetched again
/// after a 10 ms sleep while the predicate returns `None`. With `timeout == None` the
/// polling continues until a match or a read error; otherwise
/// `DbErrorReadWithTimeout::Timeout` is returned once `timeout` elapses first.
///
/// # Errors
/// A failed read of the execution log ends the polling and is returned.
#[cfg(feature = "test")]
pub async fn wait_for_pending_state_fn<T: Debug>(
    db_connection: &dyn DbConnection,
    execution_id: &ExecutionId,
    predicate: impl Fn(ExecutionLog) -> Option<T> + Send,
    timeout: Option<Duration>,
) -> Result<T, DbErrorReadWithTimeout> {
    tracing::trace!(%execution_id, "Waiting for predicate");
    let poll_until_matched = async move {
        loop {
            let snapshot = db_connection.get(execution_id).await?;
            match predicate(snapshot) {
                Some(t) => {
                    tracing::debug!(%execution_id, "Found: {t:?}");
                    return Ok(t);
                }
                None => tokio::time::sleep(Duration::from_millis(10)).await,
            }
        }
    };

    // Without a timeout there is nothing to race against.
    let Some(timeout) = timeout else {
        return poll_until_matched.await;
    };
    tokio::select! { // future's liveness: Dropping the loser immediately.
        outcome = poll_until_matched => outcome,
        () = tokio::time::sleep(timeout) => Err(DbErrorReadWithTimeout::Timeout)
    }
}
1147
/// An expired timer: either an expired execution lock or an expired delay.
1148#[derive(Debug, Clone, PartialEq, Eq)]
1149pub enum ExpiredTimer {
1150    Lock(ExpiredLock),
1151    Delay(ExpiredDelay),
1152}
1153
/// Payload of `ExpiredTimer::Lock`: the locked execution together with its retry settings.
1154#[derive(Debug, Clone, PartialEq, Eq)]
1155pub struct ExpiredLock {
1156    pub execution_id: ExecutionId,
1157    // Version of last `Locked` event, used to detect whether the execution made progress.
1158    pub locked_at_version: Version,
1159    pub next_version: Version,
1160    /// As the execution may still be running, this represents the number of intermittent failures + timeouts prior to this execution.
1161    pub intermittent_event_count: u32,
    // NOTE(review): `max_retries` and `retry_exp_backoff` presumably feed the retry/backoff
    // computation of `ExecutionLog` — confirm.
1162    pub max_retries: u32,
1163    pub retry_exp_backoff: Duration,
    /// Executor and run holding the lock.
1164    pub locked_by: LockedBy,
1165}
1166
/// Payload of `ExpiredTimer::Delay`: identifies the delay, its join set and an execution.
/// NOTE(review): `execution_id` is presumably the execution that owns the join set — confirm.
1167#[derive(Debug, Clone, PartialEq, Eq)]
1168pub struct ExpiredDelay {
1169    pub execution_id: ExecutionId,
1170    pub join_set_id: JoinSetId,
1171    pub delay_id: DelayId,
1172}
1173
/// Scheduling state of an execution.
///
/// `PendingState::can_append_lock` decides, based on this state, whether a lock may be
/// created or extended. With the `test` feature the enum also derives `Serialize`,
/// tagged by a `type` field.
1174#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq)]
1175#[cfg_attr(feature = "test", derive(Serialize))]
1176#[cfg_attr(feature = "test", serde(tag = "type"))]
1177pub enum PendingState {
    /// Locked by an executor run; only the same executor and run may extend the lock.
1178    Locked(PendingStateLocked),
    /// Lockable by any executor once `scheduled_at` has been reached; before that only
    /// the executor and run recorded in `last_lock` may extend their lock.
1179    #[display("PendingAt(`{scheduled_at}`)")]
1180    PendingAt {
1181        scheduled_at: DateTime<Utc>,
1182        last_lock: Option<LockedBy>, // Needed for lock extension
1183    }, // e.g. created with a schedule, temporary timeout/failure
1184    #[display("BlockedByJoinSet({join_set_id},`{lock_expires_at}`)")]
1185    /// Caused by [`HistoryEvent::JoinNext`]
1186    BlockedByJoinSet {
1187        join_set_id: JoinSetId,
1188        /// See [`HistoryEvent::JoinNext::lock_expires_at`].
1189        lock_expires_at: DateTime<Utc>,
1190        /// Blocked by closing of the join set
1191        closing: bool,
1192    },
    /// Terminal state; no further lock can be appended.
1193    #[display("Finished({finished})")]
1194    Finished {
1195        finished: PendingStateFinished,
1196    },
1197}
1198
/// Payload of `PendingState::Locked`: who holds the lock and when it expires.
/// Displayed as ``Locked(`<lock_expires_at>`, <executor_id>, <run_id>)``.
1199#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq)]
1200#[cfg_attr(feature = "test", derive(Serialize))]
1201#[display("Locked(`{lock_expires_at}`, {}, {})", locked_by.executor_id, locked_by.run_id)]
1202pub struct PendingStateLocked {
1203    pub locked_by: LockedBy,
1204    pub lock_expires_at: DateTime<Utc>,
1205}
1206
/// Identifies the holder of a lock by executor and run.
/// `PendingState::can_append_lock` requires both ids to match before a lock is extended.
1207#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1208pub struct LockedBy {
1209    pub executor_id: ExecutorId,
1210    pub run_id: RunId,
1211}
1212
/// Payload of `PendingState::Finished`.
/// Its `Display` output is `ok` for a successful result, otherwise the error's text.
1213#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
1214pub struct PendingStateFinished {
1215    pub version: VersionType, // not Version since it must be Copy
1216    pub finished_at: DateTime<Utc>,
1217    pub result_kind: PendingStateFinishedResultKind,
1218}
1219impl Display for PendingStateFinished {
1220    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1221        match self.result_kind {
1222            PendingStateFinishedResultKind::Ok => write!(f, "ok"),
1223            PendingStateFinishedResultKind::Err(err) => write!(f, "{err}"),
1224        }
1225    }
1226}
1227impl Display for SupportedFunctionReturnValue {
1228    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1229        match self.as_pending_state_finished_result() {
1230            PendingStateFinishedResultKind::Ok => write!(f, "ok"),
1231            PendingStateFinishedResultKind::Err(err) => write!(f, "{err}"),
1232        }
1233    }
1234}
1235
/// Outcome category of a finished execution: success or a [`PendingStateFinishedError`].
/// Serialized with snake_case variant names; `as_result` gives a `Result` view of it.
1236// This is not a Result so that it can be customized for serialization
1237#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
1238#[serde(rename_all = "snake_case")]
1239pub enum PendingStateFinishedResultKind {
1240    Ok,
1241    Err(PendingStateFinishedError),
1242}
1243impl PendingStateFinishedResultKind {
1244    pub fn as_result(&self) -> Result<(), &PendingStateFinishedError> {
1245        match self {
1246            PendingStateFinishedResultKind::Ok => Ok(()),
1247            PendingStateFinishedResultKind::Err(err) => Err(err),
1248        }
1249    }
1250}
1251
1252impl From<&SupportedFunctionReturnValue> for PendingStateFinishedResultKind {
1253    fn from(result: &SupportedFunctionReturnValue) -> Self {
1254        result.as_pending_state_finished_result()
1255    }
1256}
1257
/// Error side of [`PendingStateFinishedResultKind`].
1258#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, derive_more::Display)]
1259pub enum PendingStateFinishedError {
    /// Displayed as `execution failed: <kind>`.
1260    #[display("execution failed: {_0}")]
1261    ExecutionFailure(ExecutionFailureKind),
    /// Displayed as `returned error variant`.
    // NOTE(review): presumably the function itself returned its `err` variant — confirm.
1262    #[display("returned error variant")]
1263    FallibleError,
1264}
1265
1266impl PendingState {
1267    pub fn can_append_lock(
1268        &self,
1269        created_at: DateTime<Utc>,
1270        executor_id: ExecutorId,
1271        run_id: RunId,
1272        lock_expires_at: DateTime<Utc>,
1273    ) -> Result<LockKind, DbErrorWriteNonRetriable> {
1274        if lock_expires_at <= created_at {
1275            return Err(DbErrorWriteNonRetriable::ValidationFailed(
1276                "invalid expiry date".into(),
1277            ));
1278        }
1279        match self {
1280            PendingState::PendingAt {
1281                scheduled_at,
1282                last_lock,
1283            } => {
1284                if *scheduled_at <= created_at {
1285                    // pending now, ok to lock
1286                    Ok(LockKind::CreatingNewLock)
1287                } else if let Some(LockedBy {
1288                    executor_id: last_executor_id,
1289                    run_id: last_run_id,
1290                }) = last_lock
1291                    && executor_id == *last_executor_id
1292                    && run_id == *last_run_id
1293                {
1294                    // Original executor is extending the lock.
1295                    Ok(LockKind::Extending)
1296                } else {
1297                    Err(DbErrorWriteNonRetriable::ValidationFailed(
1298                        "cannot lock, not yet pending".into(),
1299                    ))
1300                }
1301            }
1302            PendingState::Locked(PendingStateLocked {
1303                locked_by:
1304                    LockedBy {
1305                        executor_id: current_pending_state_executor_id,
1306                        run_id: current_pending_state_run_id,
1307                    },
1308                lock_expires_at: _,
1309            }) => {
1310                if executor_id == *current_pending_state_executor_id
1311                    && run_id == *current_pending_state_run_id
1312                {
1313                    // Original executor is extending the lock.
1314                    Ok(LockKind::Extending)
1315                } else {
1316                    Err(DbErrorWriteNonRetriable::IllegalState(
1317                        "cannot lock, already locked".into(),
1318                    ))
1319                }
1320            }
1321            PendingState::BlockedByJoinSet { .. } => Err(DbErrorWriteNonRetriable::IllegalState(
1322                "cannot append Locked event when in BlockedByJoinSet state".into(),
1323            )),
1324            PendingState::Finished { .. } => Err(DbErrorWriteNonRetriable::IllegalState(
1325                "already finished".into(),
1326            )),
1327        }
1328    }
1329
1330    #[must_use]
1331    pub fn is_finished(&self) -> bool {
1332        matches!(self, PendingState::Finished { .. })
1333    }
1334}
1335
/// Successful outcome of `PendingState::can_append_lock`.
1336#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1337pub enum LockKind {
    /// The requester matches the recorded lock holder (same executor id and run id).
1338    Extending,
    /// The execution was pending (`scheduled_at <= created_at`), so a fresh lock is created.
1339    CreatingNewLock,
1340}
1341
/// Serializable record types describing a traced HTTP client request and its response.
1342pub mod http_client_trace {
1343    use chrono::{DateTime, Utc};
1344    use serde::{Deserialize, Serialize};
1345
    /// One traced request together with its response, if any was recorded.
1346    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1347    pub struct HttpClientTrace {
1348        pub req: RequestTrace,
        // NOTE(review): presumably `None` while/if no response was received — confirm.
1349        pub resp: Option<ResponseTrace>,
1350    }
1351
    /// Request part of a trace: send time, URI and method.
1352    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1353    pub struct RequestTrace {
1354        pub sent_at: DateTime<Utc>,
1355        pub uri: String,
1356        pub method: String,
1357    }
1358
    /// Response part of a trace: finish time and status.
1359    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1360    pub struct ResponseTrace {
1361        pub finished_at: DateTime<Utc>,
        // NOTE(review): presumably `Ok` holds the HTTP status code and `Err` an error
        // description — confirm.
1362        pub status: Result<u16, String>,
1363    }
1364}
1365
// Unit tests: serde round trips of the finished-state types and the range limits of
// `HistoryEventScheduleAt::as_date_time`.
1366#[cfg(test)]
1367mod tests {
1368    use super::HistoryEventScheduleAt;
1369    use super::PendingStateFinished;
1370    use super::PendingStateFinishedError;
1371    use super::PendingStateFinishedResultKind;
1372    use crate::ExecutionFailureKind;
1373    use crate::SupportedFunctionReturnValue;
1374    use chrono::DateTime;
1375    use chrono::Datelike;
1376    use chrono::Utc;
1377    use insta::assert_snapshot;
1378    use rstest::rstest;
1379    use std::time::Duration;
1380    use val_json::type_wrapper::TypeWrapper;
1381    use val_json::wast_val::WastVal;
1382    use val_json::wast_val::WastValWithType;
1383
    // JSON round trip of `PendingStateFinishedResultKind`, once per listed case.
1384    #[rstest(expected => [
1385        PendingStateFinishedResultKind::Ok,
1386        PendingStateFinishedResultKind::Err(PendingStateFinishedError::ExecutionFailure(ExecutionFailureKind::TimedOut)),
1387    ])]
1388    #[test]
1389    fn serde_pending_state_finished_result_kind_should_work(
1390        expected: PendingStateFinishedResultKind,
1391    ) {
1392        let ser = serde_json::to_string(&expected).unwrap();
1393        let actual: PendingStateFinishedResultKind = serde_json::from_str(&ser).unwrap();
1394        assert_eq!(expected, actual);
1395    }
1396
    // JSON round trip of the whole `PendingStateFinished` struct for both result kinds.
1397    #[rstest(result_kind => [
1398        PendingStateFinishedResultKind::Ok,
1399        PendingStateFinishedResultKind::Err(PendingStateFinishedError::ExecutionFailure(ExecutionFailureKind::TimedOut)),
1400    ])]
1401    #[test]
1402    fn serde_pending_state_finished_should_work(result_kind: PendingStateFinishedResultKind) {
1403        let expected = PendingStateFinished {
1404            version: 0,
1405            finished_at: Utc::now(),
1406            result_kind,
1407        };
1408
1409        let ser = serde_json::to_string(&expected).unwrap();
1410        let actual: PendingStateFinished = serde_json::from_str(&ser).unwrap();
1411        assert_eq!(expected, actual);
1412    }
1413
    // A `result<option<string>, string>` holding `ok(none)` must survive a JSON round trip;
    // the serialized form is additionally pinned by a snapshot.
1414    #[test]
1415    fn join_set_deser_with_result_ok_option_none_should_work() {
1416        let expected = SupportedFunctionReturnValue::Ok {
1417            ok: Some(WastValWithType {
1418                r#type: TypeWrapper::Result {
1419                    ok: Some(Box::new(TypeWrapper::Option(Box::new(TypeWrapper::String)))),
1420                    err: Some(Box::new(TypeWrapper::String)),
1421                },
1422                value: WastVal::Result(Ok(Some(Box::new(WastVal::Option(None))))),
1423            }),
1424        };
1425        let json = serde_json::to_string(&expected).unwrap();
1426        assert_snapshot!(json);
1427
1428        let actual: SupportedFunctionReturnValue = serde_json::from_str(&json).unwrap();
1429
1430        assert_eq!(expected, actual);
1431    }
1432
    // `u32::MAX` seconds after the Unix epoch resolves successfully and lands in year 2106.
1433    #[test]
1434    fn as_date_time_should_work_with_duration_u32_max_secs() {
1435        let duration = Duration::from_secs(u64::from(u32::MAX));
1436        let schedule_at = HistoryEventScheduleAt::In(duration);
1437        let resolved = schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap();
1438        assert_eq!(2106, resolved.year());
1439    }
1440
    // `TIMEDELTA_MAX_SECS` is `i64::MAX` milliseconds expressed in whole seconds.
1441    const MILLIS_PER_SEC: i64 = 1000;
1442    const TIMEDELTA_MAX_SECS: i64 = i64::MAX / MILLIS_PER_SEC;
1443
    // Note: despite the test name, the duration used is `TIMEDELTA_MAX_SECS + 1` seconds,
    // i.e. one second past `i64::MAX` milliseconds, not `i64::MAX` seconds.
1444    #[test]
1445    fn as_date_time_should_fail_on_duration_secs_greater_than_i64_max() {
1446        // Fails on duration -> timedelta conversion, but a smaller duration can fail on datetime + timedelta
1447        let duration = Duration::from_secs(
1448            u64::try_from(TIMEDELTA_MAX_SECS).expect("positive number must not fail") + 1,
1449        );
1450        let schedule_at = HistoryEventScheduleAt::In(duration);
1451        schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap_err();
1452    }
1453}