obeli_sk_concepts/
storage.rs

1use crate::ClosingStrategy;
2use crate::ComponentId;
3use crate::ComponentRetryConfig;
4use crate::ExecutionId;
5use crate::ExecutionMetadata;
6use crate::FunctionFqn;
7use crate::JoinSetId;
8use crate::Params;
9use crate::StrVariant;
10use crate::SupportedFunctionReturnValue;
11use crate::prefixed_ulid::DelayId;
12use crate::prefixed_ulid::ExecutionIdDerived;
13use crate::prefixed_ulid::ExecutorId;
14use crate::prefixed_ulid::RunId;
15use assert_matches::assert_matches;
16use async_trait::async_trait;
17use chrono::TimeDelta;
18use chrono::{DateTime, Utc};
19use http_client_trace::HttpClientTrace;
20use serde::Deserialize;
21use serde::Serialize;
22use std::fmt::Debug;
23use std::fmt::Display;
24use std::pin::Pin;
25use std::str::FromStr;
26use std::sync::Arc;
27use std::time::Duration;
28use strum::IntoStaticStr;
29use tracing::error;
30
31/// Remote client representation of the execution journal.
32#[derive(Debug, PartialEq, Eq)]
33#[cfg_attr(feature = "test", derive(Serialize))]
34pub struct ExecutionLog {
35    pub execution_id: ExecutionId,
36    pub events: Vec<ExecutionEvent>,
37    pub responses: Vec<JoinSetResponseEventOuter>,
38    pub next_version: Version, // Is not advanced once in Finished state
39    pub pending_state: PendingState,
40}
41
42impl ExecutionLog {
43    #[must_use]
44    pub fn can_be_retried_after(
45        temporary_event_count: u32,
46        max_retries: u32,
47        retry_exp_backoff: Duration,
48    ) -> Option<Duration> {
49        // If max_retries == u32::MAX, wrapping is OK after this succeeds - we want to retry forever.
50        if temporary_event_count <= max_retries {
51            // TODO: Add test for number of retries
52            let duration = retry_exp_backoff * 2_u32.saturating_pow(temporary_event_count - 1);
53            Some(duration)
54        } else {
55            None
56        }
57    }
58
59    #[must_use]
60    pub fn compute_retry_duration_when_retrying_forever(
61        temporary_event_count: u32,
62        retry_exp_backoff: Duration,
63    ) -> Duration {
64        Self::can_be_retried_after(temporary_event_count, u32::MAX, retry_exp_backoff)
65            .expect("`max_retries` set to MAX must never return None")
66    }
67
68    #[must_use]
69    pub fn ffqn(&self) -> &FunctionFqn {
70        assert_matches!(self.events.first(), Some(ExecutionEvent {
71            event: ExecutionEventInner::Created { ffqn, .. },
72            ..
73        }) => ffqn)
74    }
75
76    #[must_use]
77    pub fn parent(&self) -> Option<(ExecutionId, JoinSetId)> {
78        assert_matches!(self.events.first(), Some(ExecutionEvent {
79            event: ExecutionEventInner::Created { parent, .. },
80            ..
81        }) => parent.clone())
82    }
83
84    #[must_use]
85    pub fn last_event(&self) -> &ExecutionEvent {
86        self.events.last().expect("must contain at least one event")
87    }
88
89    #[must_use]
90    pub fn into_finished_result(mut self) -> Option<SupportedFunctionReturnValue> {
91        if let ExecutionEvent {
92            event: ExecutionEventInner::Finished { result, .. },
93            ..
94        } = self.events.pop().expect("must contain at least one event")
95        {
96            Some(result)
97        } else {
98            None
99        }
100    }
101
102    pub fn event_history(&self) -> impl Iterator<Item = HistoryEvent> + '_ {
103        self.events.iter().filter_map(|event| {
104            if let ExecutionEventInner::HistoryEvent { event: eh, .. } = &event.event {
105                Some(eh.clone())
106            } else {
107                None
108            }
109        })
110    }
111
112    #[cfg(feature = "test")]
113    #[must_use]
114    pub fn find_join_set_request(&self, join_set_id: &JoinSetId) -> Option<&JoinSetRequest> {
115        self.events
116            .iter()
117            .find_map(move |event| match &event.event {
118                ExecutionEventInner::HistoryEvent {
119                    event:
120                        HistoryEvent::JoinSetRequest {
121                            join_set_id: found,
122                            request,
123                        },
124                    ..
125                } if *join_set_id == *found => Some(request),
126                _ => None,
127            })
128    }
129}
130
131pub type VersionType = u32;
132#[derive(
133    Debug,
134    Default,
135    Clone,
136    PartialEq,
137    Eq,
138    Hash,
139    derive_more::Display,
140    derive_more::Into,
141    serde::Serialize,
142    serde::Deserialize,
143)]
144#[serde(transparent)]
145pub struct Version(pub VersionType);
146impl Version {
147    #[must_use]
148    pub fn new(arg: VersionType) -> Version {
149        Version(arg)
150    }
151
152    #[must_use]
153    pub fn increment(&self) -> Version {
154        Version(self.0 + 1)
155    }
156}
157
158#[derive(
159    Clone, Debug, derive_more::Display, PartialEq, Eq, serde::Serialize, serde::Deserialize,
160)]
161#[display("{event}")]
162pub struct ExecutionEvent {
163    pub created_at: DateTime<Utc>,
164    pub event: ExecutionEventInner,
165    pub backtrace_id: Option<Version>,
166}
167
168/// Moves the execution to [`PendingState::PendingNow`] if it is currently blocked on `JoinNextBlocking`.
169#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
170pub struct JoinSetResponseEventOuter {
171    pub created_at: DateTime<Utc>,
172    pub event: JoinSetResponseEvent,
173}
174
175#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
176pub struct JoinSetResponseEvent {
177    pub join_set_id: JoinSetId,
178    pub event: JoinSetResponse,
179}
180
181#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
182#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
183#[serde(tag = "type")]
184pub enum JoinSetResponse {
185    DelayFinished {
186        delay_id: DelayId,
187    },
188    ChildExecutionFinished {
189        child_execution_id: ExecutionIdDerived,
190        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Version(2)))]
191        finished_version: Version,
192        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
193        result: SupportedFunctionReturnValue,
194    },
195}
196
197pub const DUMMY_CREATED: ExecutionEventInner = ExecutionEventInner::Created {
198    ffqn: FunctionFqn::new_static("", ""),
199    params: Params::empty(),
200    parent: None,
201    scheduled_at: DateTime::from_timestamp_nanos(0),
202    component_id: ComponentId::dummy_activity(),
203    metadata: ExecutionMetadata::empty(),
204    scheduled_by: None,
205};
206pub const DUMMY_HISTORY_EVENT: ExecutionEventInner = ExecutionEventInner::HistoryEvent {
207    event: HistoryEvent::JoinSetCreate {
208        join_set_id: JoinSetId {
209            kind: crate::JoinSetKind::OneOff,
210            name: StrVariant::empty(),
211        },
212        closing_strategy: ClosingStrategy::Complete,
213    },
214};
215
216#[derive(
217    Clone,
218    derive_more::Debug,
219    derive_more::Display,
220    PartialEq,
221    Eq,
222    Serialize,
223    Deserialize,
224    IntoStaticStr,
225)]
226#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
227#[allow(clippy::large_enum_variant)]
228pub enum ExecutionEventInner {
229    #[display("Created({ffqn}, `{scheduled_at}`)")]
230    Created {
231        ffqn: FunctionFqn,
232        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
233        #[debug(skip)]
234        params: Params,
235        parent: Option<(ExecutionId, JoinSetId)>,
236        scheduled_at: DateTime<Utc>,
237        #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
238        component_id: ComponentId,
239        #[cfg_attr(any(test, feature = "test"), arbitrary(default))]
240        metadata: ExecutionMetadata,
241        scheduled_by: Option<ExecutionId>,
242    },
243    Locked(Locked),
244    /// Returns execution to [`PendingState::PendingNow`] state
245    /// without timing out. This can happen when the executor is running
246    /// out of resources like [`WorkerError::LimitReached`] or when
247    /// the executor is shutting down.
248    #[display("Unlocked(`{backoff_expires_at}`)")]
249    Unlocked {
250        backoff_expires_at: DateTime<Utc>,
251        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
252        reason: StrVariant,
253    },
254    // Created by the executor holding the lock.
255    // After expiry interpreted as pending.
256    #[display("TemporarilyFailed(`{backoff_expires_at}`)")]
257    TemporarilyFailed {
258        backoff_expires_at: DateTime<Utc>,
259        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
260        reason_full: StrVariant,
261        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason inner")))]
262        reason_inner: StrVariant,
263        detail: Option<String>,
264        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
265        http_client_traces: Option<Vec<HttpClientTrace>>,
266    },
267    // Created by the executor holding the lock.
268    // After expiry interpreted as pending.
269    #[display("TemporarilyTimedOut(`{backoff_expires_at}`)")]
270    TemporarilyTimedOut {
271        backoff_expires_at: DateTime<Utc>,
272        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
273        http_client_traces: Option<Vec<HttpClientTrace>>,
274    },
275    // Created by the executor holding the lock.
276    #[display("Finished")]
277    Finished {
278        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
279        result: SupportedFunctionReturnValue,
280        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
281        http_client_traces: Option<Vec<HttpClientTrace>>,
282    },
283
284    #[display("HistoryEvent({event})")]
285    HistoryEvent {
286        event: HistoryEvent,
287    },
288}
289
290impl ExecutionEventInner {
291    #[must_use]
292    pub fn is_temporary_event(&self) -> bool {
293        matches!(
294            self,
295            Self::TemporarilyFailed { .. } | Self::TemporarilyTimedOut { .. }
296        )
297    }
298
299    #[must_use]
300    pub fn variant(&self) -> &'static str {
301        Into::<&'static str>::into(self)
302    }
303
304    #[must_use]
305    pub fn join_set_id(&self) -> Option<&JoinSetId> {
306        match self {
307            Self::Created {
308                parent: Some((_parent_id, join_set_id)),
309                ..
310            } => Some(join_set_id),
311            Self::HistoryEvent {
312                event:
313                    HistoryEvent::JoinSetCreate { join_set_id, .. }
314                    | HistoryEvent::JoinSetRequest { join_set_id, .. }
315                    | HistoryEvent::JoinNext { join_set_id, .. },
316            } => Some(join_set_id),
317            _ => None,
318        }
319    }
320}
321
322#[derive(
323    Clone, derive_more::Debug, derive_more::Display, PartialEq, Eq, Serialize, Deserialize,
324)]
325#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
326#[display("Locked(`{lock_expires_at}`, {component_id})")]
327pub struct Locked {
328    #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
329    pub component_id: ComponentId,
330    pub executor_id: ExecutorId,
331    pub run_id: RunId,
332    pub lock_expires_at: DateTime<Utc>,
333    #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentRetryConfig::ZERO))]
334    pub retry_config: ComponentRetryConfig,
335}
336
337#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
338#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
339#[serde(tag = "type")]
340pub enum PersistKind {
341    #[display("RandomU64({min}, {max_inclusive})")]
342    RandomU64 { min: u64, max_inclusive: u64 },
343    #[display("RandomString({min_length}, {max_length_exclusive})")]
344    RandomString {
345        min_length: u64,
346        max_length_exclusive: u64,
347    },
348}
349
350#[must_use]
351pub fn from_u64_to_bytes(value: u64) -> [u8; 8] {
352    value.to_be_bytes()
353}
354
355#[must_use]
356pub fn from_bytes_to_u64(bytes: [u8; 8]) -> u64 {
357    u64::from_be_bytes(bytes)
358}
359
360#[derive(
361    derive_more::Debug, Clone, PartialEq, Eq, derive_more::Display, Serialize, Deserialize,
362)]
363#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
364#[serde(tag = "type")]
365/// Must be created by the executor in [`PendingState::Locked`].
366pub enum HistoryEvent {
367    /// Persist a generated pseudorandom value.
368    #[display("Persist")]
369    Persist {
370        #[debug(skip)]
371        value: Vec<u8>, // Only stored for nondeterminism checks. TODO: Consider using a hashed value or just the intention.
372        kind: PersistKind,
373    },
374    #[display("JoinSetCreate({join_set_id})")]
375    JoinSetCreate {
376        join_set_id: JoinSetId,
377        closing_strategy: ClosingStrategy,
378    },
379    #[display("JoinSetRequest({request})")]
380    // join_set_id is part of ExecutionId or DelayId in the `request`
381    JoinSetRequest {
382        join_set_id: JoinSetId,
383        request: JoinSetRequest,
384    },
385    /// Sets the pending state to [`PendingState::BlockedByJoinSet`].
386    /// When the response arrives at `resp_time`:
387    /// The execution is [`PendingState::PendingAt`]`(max(resp_time, lock_expires_at)`, so that the
388    /// original executor can continue. After the expiry any executor can continue without
389    /// marking the execution as timed out.
390    #[display("JoinNext({join_set_id})")]
391    JoinNext {
392        join_set_id: JoinSetId,
393        /// Set to a future time if the worker is keeping the execution invocation warm waiting for the result.
394        /// The pending status will be kept in Locked state until `run_expires_at`.
395        run_expires_at: DateTime<Utc>,
396        /// Set to a specific function when calling `-await-next` extension function, used for
397        /// determinism checks.
398        requested_ffqn: Option<FunctionFqn>,
399        /// Closing request must never set `requested_ffqn` and is ignored by determinism checks.
400        closing: bool,
401    },
402    /// Records the fact that a join set was awaited more times than its submission count.
403    #[display("JoinNextTooMany({join_set_id})")]
404    JoinNextTooMany {
405        join_set_id: JoinSetId,
406        /// Set to a specific function when calling `-await-next` extension function, used for
407        /// determinism checks.
408        requested_ffqn: Option<FunctionFqn>,
409    },
410    #[display("Schedule({execution_id}, {schedule_at})")]
411    Schedule {
412        execution_id: ExecutionId,
413        schedule_at: HistoryEventScheduleAt, // Stores intention to schedule an execution at a date/offset
414    },
415    #[display("Stub({target_execution_id})")]
416    Stub {
417        target_execution_id: ExecutionIdDerived,
418        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
419        result: SupportedFunctionReturnValue, // Only stored for nondeterminism checks. TODO: Consider using a hashed value.
420        persist_result: Result<(), ()>, // Does the row (target_execution_id,Version:1) match the proposed `result`?
421    },
422}
423
424#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
425#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
426pub enum HistoryEventScheduleAt {
427    Now,
428    #[display("At(`{_0}`)")]
429    At(DateTime<Utc>),
430    #[display("In({_0:?})")]
431    In(Duration),
432}
433
434#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
435pub enum ScheduleAtConversionError {
436    #[error("source duration value is out of range")]
437    OutOfRangeError,
438}
439
440impl HistoryEventScheduleAt {
441    pub fn as_date_time(
442        &self,
443        now: DateTime<Utc>,
444    ) -> Result<DateTime<Utc>, ScheduleAtConversionError> {
445        match self {
446            Self::Now => Ok(now),
447            Self::At(date_time) => Ok(*date_time),
448            Self::In(duration) => {
449                let time_delta = TimeDelta::from_std(*duration)
450                    .map_err(|_| ScheduleAtConversionError::OutOfRangeError)?;
451                now.checked_add_signed(time_delta)
452                    .ok_or(ScheduleAtConversionError::OutOfRangeError)
453            }
454        }
455    }
456}
457
458#[derive(Clone, Debug, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
459#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
460#[serde(tag = "type")]
461pub enum JoinSetRequest {
462    // Must be created by the executor in `PendingState::Locked`.
463    #[display("DelayRequest({delay_id}, expires_at: `{expires_at}`, schedule_at: `{schedule_at}`)")]
464    DelayRequest {
465        delay_id: DelayId,
466        expires_at: DateTime<Utc>,
467        schedule_at: HistoryEventScheduleAt,
468    },
469    // Must be created by the executor in `PendingState::Locked`.
470    #[display("ChildExecutionRequest({child_execution_id}, {target_ffqn}, params: {params})")]
471    ChildExecutionRequest {
472        child_execution_id: ExecutionIdDerived,
473        target_ffqn: FunctionFqn,
474        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
475        params: Params,
476    },
477}
478
479/// Error that is not specific to an execution.
480#[derive(Debug, Clone, thiserror::Error, PartialEq)]
481pub enum DbErrorGeneric {
482    #[error("database error: {0}")]
483    Uncategorized(StrVariant),
484    #[error("database was closed")]
485    Close,
486}
487
488#[derive(thiserror::Error, Clone, Debug, PartialEq, Eq)]
489pub enum DbErrorWriteNonRetriable {
490    #[error("validation failed: {0}")]
491    ValidationFailed(StrVariant),
492    #[error("illegal state: {0}")]
493    IllegalState(StrVariant),
494    #[error("version conflict: {0}")]
495    VersionConflict(Version),
496}
497
498/// Write error tied to an execution
499#[derive(Debug, Clone, thiserror::Error, PartialEq)]
500pub enum DbErrorWrite {
501    #[error("cannot write - row not found")]
502    NotFound,
503    #[error("non-retriable error: {0}")]
504    NonRetriable(#[from] DbErrorWriteNonRetriable),
505    #[error(transparent)]
506    Generic(#[from] DbErrorGeneric),
507}
508
509/// Read error tied to an execution
510#[derive(Debug, Clone, thiserror::Error, PartialEq)]
511pub enum DbErrorRead {
512    #[error("cannot read - row not found")]
513    NotFound,
514    #[error(transparent)]
515    Generic(#[from] DbErrorGeneric),
516}
517
518impl From<DbErrorRead> for DbErrorWrite {
519    fn from(value: DbErrorRead) -> DbErrorWrite {
520        match value {
521            DbErrorRead::NotFound => DbErrorWrite::NotFound,
522            DbErrorRead::Generic(err) => DbErrorWrite::Generic(err),
523        }
524    }
525}
526
527#[derive(Debug, thiserror::Error, PartialEq)]
528pub enum DbErrorReadWithTimeout {
529    #[error("timeout")]
530    Timeout,
531    #[error(transparent)]
532    DbErrorRead(#[from] DbErrorRead),
533}
534impl From<DbErrorGeneric> for DbErrorReadWithTimeout {
535    fn from(value: DbErrorGeneric) -> DbErrorReadWithTimeout {
536        Self::from(DbErrorRead::from(value))
537    }
538}
539
540// Represents next version after successfuly appended to execution log.
541// TODO: Convert to struct with next_version
542pub type AppendResponse = Version;
543pub type PendingExecution = (ExecutionId, Version, Params, Option<DateTime<Utc>>);
544
545#[derive(Debug, Clone)]
546pub struct LockedExecution {
547    pub execution_id: ExecutionId,
548    pub next_version: Version,
549    pub metadata: ExecutionMetadata,
550    pub locked_event: Locked,
551    pub ffqn: FunctionFqn,
552    pub params: Params,
553    pub event_history: Vec<HistoryEvent>, // FIXME: Add version
554    pub responses: Vec<JoinSetResponseEventOuter>,
555    pub parent: Option<(ExecutionId, JoinSetId)>,
556    pub intermittent_event_count: u32,
557}
558
559pub type LockPendingResponse = Vec<LockedExecution>;
560pub type AppendBatchResponse = Version;
561
562#[derive(Debug, Clone, derive_more::Display, Serialize, Deserialize)]
563#[display("{event}")]
564pub struct AppendRequest {
565    pub created_at: DateTime<Utc>,
566    pub event: ExecutionEventInner,
567}
568
569#[derive(Debug, Clone)]
570pub struct CreateRequest {
571    pub created_at: DateTime<Utc>,
572    pub execution_id: ExecutionId,
573    pub ffqn: FunctionFqn,
574    pub params: Params,
575    pub parent: Option<(ExecutionId, JoinSetId)>,
576    pub scheduled_at: DateTime<Utc>,
577    pub component_id: ComponentId,
578    pub metadata: ExecutionMetadata,
579    pub scheduled_by: Option<ExecutionId>,
580}
581
582impl From<CreateRequest> for ExecutionEventInner {
583    fn from(value: CreateRequest) -> Self {
584        Self::Created {
585            ffqn: value.ffqn,
586            params: value.params,
587            parent: value.parent,
588            scheduled_at: value.scheduled_at,
589            component_id: value.component_id,
590            metadata: value.metadata,
591            scheduled_by: value.scheduled_by,
592        }
593    }
594}
595
596#[async_trait]
597pub trait DbPool: Send + Sync {
598    fn connection(&self) -> Box<dyn DbConnection>;
599}
600
601#[async_trait]
602pub trait DbPoolCloseable {
603    async fn close(self);
604}
605
606#[derive(Clone, Debug)]
607pub struct AppendEventsToExecution {
608    pub execution_id: ExecutionId,
609    pub version: Version,
610    pub batch: Vec<AppendRequest>,
611}
612
613#[derive(Clone, Debug)]
614pub struct AppendResponseToExecution {
615    pub parent_execution_id: ExecutionId,
616    pub parent_response_event: JoinSetResponseEventOuter,
617}
618
619#[async_trait]
620pub trait DbExecutor: Send + Sync {
621    #[expect(clippy::too_many_arguments)]
622    async fn lock_pending(
623        &self,
624        batch_size: usize,
625        pending_at_or_sooner: DateTime<Utc>,
626        ffqns: Arc<[FunctionFqn]>,
627        created_at: DateTime<Utc>,
628        component_id: ComponentId,
629        executor_id: ExecutorId,
630        lock_expires_at: DateTime<Utc>,
631        run_id: RunId,
632        retry_config: ComponentRetryConfig,
633    ) -> Result<LockPendingResponse, DbErrorGeneric>;
634
635    /// Specialized locking for e.g. extending the lock by the original executor and run.
636    #[expect(clippy::too_many_arguments)]
637    async fn lock_one(
638        &self,
639        created_at: DateTime<Utc>,
640        component_id: ComponentId,
641        execution_id: &ExecutionId,
642        run_id: RunId,
643        version: Version,
644        executor_id: ExecutorId,
645        lock_expires_at: DateTime<Utc>,
646        retry_config: ComponentRetryConfig,
647    ) -> Result<LockedExecution, DbErrorWrite>;
648
649    /// Append a single event to an existing execution log.
650    /// The request cannot contain `ExecutionEventInner::Created`.
651    async fn append(
652        &self,
653        execution_id: ExecutionId,
654        version: Version,
655        req: AppendRequest,
656    ) -> Result<AppendResponse, DbErrorWrite>;
657
658    /// Append a batch of events to an existing execution log, and append a response to a parent execution.
659    /// The batch cannot contain `ExecutionEventInner::Created`.
660    async fn append_batch_respond_to_parent(
661        &self,
662        events: AppendEventsToExecution,
663        response: AppendResponseToExecution,
664        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
665    ) -> Result<AppendBatchResponse, DbErrorWrite>;
666
667    /// Notification mechainism with no strict guarantees for waiting while there are no pending executions.
668    /// Return immediately if there are pending notifications at `pending_at_or_sooner`.
669    /// Otherwise wait until `timeout_fut` resolves.
670    /// Timers that expire between `pending_at_or_sooner` and timeout can be disregarded.
671    async fn wait_for_pending(
672        &self,
673        pending_at_or_sooner: DateTime<Utc>,
674        ffqns: Arc<[FunctionFqn]>,
675        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
676    );
677}
678
679#[async_trait]
680pub trait DbConnection: DbExecutor {
681    async fn append_response(
682        &self,
683        created_at: DateTime<Utc>,
684        execution_id: ExecutionId,
685        response_event: JoinSetResponseEvent,
686    ) -> Result<(), DbErrorWrite>;
687
688    /// Append a batch of events to an existing execution log, and append a response to a parent execution.
689    /// The batch cannot contain `ExecutionEventInner::Created`.
690    async fn append_batch(
691        &self,
692        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
693        batch: Vec<AppendRequest>,
694        execution_id: ExecutionId,
695        version: Version,
696    ) -> Result<AppendBatchResponse, DbErrorWrite>;
697
698    /// Append one or more events to the parent execution log, and create zero or more child execution logs.
699    /// The batch cannot contain `ExecutionEventInner::Created`.
700    async fn append_batch_create_new_execution(
701        &self,
702        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
703        batch: Vec<AppendRequest>,   // must not contain `ExecutionEventInner::Created` events
704        execution_id: ExecutionId,
705        version: Version,
706        child_req: Vec<CreateRequest>,
707    ) -> Result<AppendBatchResponse, DbErrorWrite>;
708
709    #[cfg(feature = "test")]
710    /// Get execution log.
711    async fn get(&self, execution_id: &ExecutionId) -> Result<ExecutionLog, DbErrorRead>;
712
713    async fn list_execution_events(
714        &self,
715        execution_id: &ExecutionId,
716        since: &Version,
717        max_length: VersionType,
718        include_backtrace_id: bool,
719    ) -> Result<Vec<ExecutionEvent>, DbErrorRead>;
720
721    /// Get a single event without `backtrace_id`
722    async fn get_execution_event(
723        &self,
724        execution_id: &ExecutionId,
725        version: &Version,
726    ) -> Result<ExecutionEvent, DbErrorRead>;
727
728    async fn get_create_request(
729        &self,
730        execution_id: &ExecutionId,
731    ) -> Result<CreateRequest, DbErrorRead> {
732        let execution_event = self
733            .get_execution_event(execution_id, &Version::new(0))
734            .await?;
735        if let ExecutionEventInner::Created {
736            ffqn,
737            params,
738            parent,
739            scheduled_at,
740            component_id,
741            metadata,
742            scheduled_by,
743        } = execution_event.event
744        {
745            Ok(CreateRequest {
746                created_at: execution_event.created_at,
747                execution_id: execution_id.clone(),
748                ffqn,
749                params,
750                parent,
751                scheduled_at,
752                component_id,
753                metadata,
754                scheduled_by,
755            })
756        } else {
757            error!(%execution_id, "Execution log must start with creation");
758            Err(DbErrorRead::Generic(DbErrorGeneric::Uncategorized(
759                "execution log must start with creation".into(),
760            )))
761        }
762    }
763
764    async fn get_finished_result(
765        &self,
766        execution_id: &ExecutionId,
767        finished: PendingStateFinished,
768    ) -> Result<Option<SupportedFunctionReturnValue>, DbErrorRead> {
769        let last_event = self
770            .get_execution_event(execution_id, &Version::new(finished.version))
771            .await?;
772        if let ExecutionEventInner::Finished { result, .. } = last_event.event {
773            Ok(Some(result))
774        } else {
775            Ok(None)
776        }
777    }
778
779    async fn get_pending_state(
780        &self,
781        execution_id: &ExecutionId,
782    ) -> Result<PendingState, DbErrorRead>;
783
784    /// Get currently expired locks and async timers (delay requests)
785    async fn get_expired_timers(
786        &self,
787        at: DateTime<Utc>,
788    ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric>;
789
790    /// Create a new execution log
791    async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite>;
792
793    /// Notification mechainism with no strict guarantees for getting notified when a new response arrives.
794    /// Parameter `start_idx` must be at most be equal to current size of responses in the execution log.
795    /// If no response arrives immediately and `interrupt_after` resolves, `DbErrorReadWithTimeout::Timeout` is returned.
796    /// Implementations with no pubsub support should use polling.
797    async fn subscribe_to_next_responses(
798        &self,
799        execution_id: &ExecutionId,
800        start_idx: usize,
801        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
802    ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorReadWithTimeout>;
803
804    /// Notification mechainism with no strict guarantees for getting the finished result.
805    /// Implementations with no pubsub support should use polling.
806    async fn wait_for_finished_result(
807        &self,
808        execution_id: &ExecutionId,
809        timeout_fut: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
810    ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout>;
811
812    async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite>;
813
814    async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite>;
815
816    /// Used by gRPC only.
817    /// Get the latest backtrace if version is not set.
818    async fn get_backtrace(
819        &self,
820        execution_id: &ExecutionId,
821        filter: BacktraceFilter,
822    ) -> Result<BacktraceInfo, DbErrorRead>;
823
824    /// Returns executions sorted in descending order.
825    /// Used by gRPC only.
826    async fn list_executions(
827        &self,
828        ffqn: Option<FunctionFqn>,
829        top_level_only: bool,
830        pagination: ExecutionListPagination,
831    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric>;
832
833    /// Returns responses of an execution ordered as they arrived,
834    /// enabling matching each `JoinNext` to its corresponding response.
835    /// Used by gRPC only.
836    async fn list_responses(
837        &self,
838        execution_id: &ExecutionId,
839        pagination: Pagination<u32>,
840    ) -> Result<Vec<ResponseWithCursor>, DbErrorRead>;
841}
842
843#[derive(Clone)]
844pub enum BacktraceFilter {
845    First,
846    Last,
847    Specific(Version),
848}
849
850pub struct BacktraceInfo {
851    pub execution_id: ExecutionId,
852    pub component_id: ComponentId,
853    pub version_min_including: Version,
854    pub version_max_excluding: Version,
855    pub wasm_backtrace: WasmBacktrace,
856}
857
858#[derive(Serialize, Deserialize, Debug, Clone)]
859pub struct WasmBacktrace {
860    pub frames: Vec<FrameInfo>,
861}
862
863#[derive(Serialize, Deserialize, Debug, Clone)]
864pub struct FrameInfo {
865    pub module: String,
866    pub func_name: String,
867    pub symbols: Vec<FrameSymbol>,
868}
869
870#[derive(Serialize, Deserialize, Debug, Clone)]
871pub struct FrameSymbol {
872    pub func_name: Option<String>,
873    pub file: Option<String>,
874    pub line: Option<u32>,
875    pub col: Option<u32>,
876}
877
878mod wasm_backtrace {
879    use super::{FrameInfo, FrameSymbol, WasmBacktrace};
880
881    impl WasmBacktrace {
882        pub fn maybe_from(backtrace: &wasmtime::WasmBacktrace) -> Option<Self> {
883            if backtrace.frames().is_empty() {
884                None
885            } else {
886                Some(Self {
887                    frames: backtrace.frames().iter().map(FrameInfo::from).collect(),
888                })
889            }
890        }
891    }
892
893    impl From<&wasmtime::FrameInfo> for FrameInfo {
894        fn from(frame: &wasmtime::FrameInfo) -> Self {
895            let module_name = frame.module().name().unwrap_or("<unknown>").to_string();
896            let mut func_name = String::new();
897            wasmtime_environ::demangle_function_name_or_index(
898                &mut func_name,
899                frame.func_name(),
900                frame.func_index() as usize,
901            )
902            .expect("writing to string must succeed");
903            Self {
904                module: module_name,
905                func_name,
906                symbols: frame
907                    .symbols()
908                    .iter()
909                    .map(std::convert::Into::into)
910                    .collect(),
911            }
912        }
913    }
914
915    impl From<&wasmtime::FrameSymbol> for FrameSymbol {
916        fn from(symbol: &wasmtime::FrameSymbol) -> Self {
917            let func_name = symbol.name().map(|name| {
918                let mut writer = String::new();
919                wasmtime_environ::demangle_function_name(&mut writer, name)
920                    .expect("writing to string must succeed");
921                writer
922            });
923
924            Self {
925                func_name,
926                file: symbol.file().map(ToString::to_string),
927                line: symbol.line(),
928                col: symbol.column(),
929            }
930        }
931    }
932}
933
934pub type ResponseCursorType = u32; // FIXME: Switch to u64
935
936#[derive(Debug, Clone)]
937pub struct ResponseWithCursor {
938    pub event: JoinSetResponseEventOuter,
939    pub cursor: ResponseCursorType,
940}
941
942#[derive(Debug)]
943pub struct ExecutionWithState {
944    pub execution_id: ExecutionId,
945    pub ffqn: FunctionFqn,
946    pub pending_state: PendingState,
947    pub created_at: DateTime<Utc>,
948    pub scheduled_at: DateTime<Utc>,
949}
950
951pub enum ExecutionListPagination {
952    CreatedBy(Pagination<Option<DateTime<Utc>>>),
953    ExecutionId(Pagination<Option<ExecutionId>>),
954}
955
956#[derive(Debug, Clone, Copy)]
957pub enum Pagination<T> {
958    NewerThan {
959        length: ResponseCursorType,
960        cursor: T,
961        including_cursor: bool,
962    },
963    OlderThan {
964        length: ResponseCursorType,
965        cursor: T,
966        including_cursor: bool,
967    },
968}
969impl<T> Pagination<T> {
970    pub fn length(&self) -> ResponseCursorType {
971        match self {
972            Pagination::NewerThan { length, .. } | Pagination::OlderThan { length, .. } => *length,
973        }
974    }
975    pub fn rel(&self) -> &'static str {
976        match self {
977            Pagination::NewerThan {
978                including_cursor: false,
979                ..
980            } => ">",
981            Pagination::NewerThan {
982                including_cursor: true,
983                ..
984            } => ">=",
985            Pagination::OlderThan {
986                including_cursor: false,
987                ..
988            } => "<",
989            Pagination::OlderThan {
990                including_cursor: true,
991                ..
992            } => "<=",
993        }
994    }
995    pub fn is_desc(&self) -> bool {
996        matches!(self, Pagination::OlderThan { .. })
997    }
998}
999
1000#[cfg(feature = "test")]
1001pub async fn wait_for_pending_state_fn<T: Debug>(
1002    db_connection: &dyn DbConnection,
1003    execution_id: &ExecutionId,
1004    predicate: impl Fn(ExecutionLog) -> Option<T> + Send,
1005    timeout: Option<Duration>,
1006) -> Result<T, DbErrorReadWithTimeout> {
1007    tracing::trace!(%execution_id, "Waiting for predicate");
1008    let fut = async move {
1009        loop {
1010            let execution_log = db_connection.get(execution_id).await?;
1011            if let Some(t) = predicate(execution_log) {
1012                tracing::debug!(%execution_id, "Found: {t:?}");
1013                return Ok(t);
1014            }
1015            tokio::time::sleep(Duration::from_millis(10)).await;
1016        }
1017    };
1018
1019    if let Some(timeout) = timeout {
1020        tokio::select! { // future's liveness: Dropping the loser immediately.
1021            res = fut => res,
1022            () = tokio::time::sleep(timeout) => Err(DbErrorReadWithTimeout::Timeout)
1023        }
1024    } else {
1025        fut.await
1026    }
1027}
1028
1029#[derive(Debug, Clone, PartialEq, Eq)]
1030pub enum ExpiredTimer {
1031    Lock(ExpiredLock),
1032    Delay(ExpiredDelay),
1033}
1034
1035#[derive(Debug, Clone, PartialEq, Eq)]
1036pub struct ExpiredLock {
1037    pub execution_id: ExecutionId,
1038    // Version of last `Locked` event, used to detect whether the execution made progress.
1039    pub locked_at_version: Version,
1040    pub next_version: Version,
1041    /// As the execution may still be running, this represents the number of intermittent failures + timeouts prior to this execution.
1042    pub intermittent_event_count: u32,
1043    pub max_retries: u32,
1044    pub retry_exp_backoff: Duration,
1045    pub locked_by: LockedBy,
1046}
1047
1048#[derive(Debug, Clone, PartialEq, Eq)]
1049pub struct ExpiredDelay {
1050    pub execution_id: ExecutionId,
1051    pub join_set_id: JoinSetId,
1052    pub delay_id: DelayId,
1053}
1054
1055#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq)]
1056#[cfg_attr(feature = "test", derive(Serialize))]
1057#[cfg_attr(feature = "test", serde(tag = "type"))]
1058pub enum PendingState {
1059    Locked(PendingStateLocked),
1060    #[display("PendingAt(`{scheduled_at}`)")]
1061    PendingAt {
1062        scheduled_at: DateTime<Utc>,
1063        last_lock: Option<LockedBy>, // Needed for lock extension
1064    }, // e.g. created with a schedule, temporary timeout/failure
1065    #[display("BlockedByJoinSet({join_set_id},`{lock_expires_at}`)")]
1066    /// Caused by [`HistoryEvent::JoinNext`]
1067    BlockedByJoinSet {
1068        join_set_id: JoinSetId,
1069        /// See [`HistoryEvent::JoinNext::lock_expires_at`].
1070        lock_expires_at: DateTime<Utc>,
1071        /// Blocked by closing of the join set
1072        closing: bool,
1073    },
1074    #[display("Finished({finished})")]
1075    Finished {
1076        finished: PendingStateFinished,
1077    },
1078}
1079
1080#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq)]
1081#[cfg_attr(feature = "test", derive(Serialize))]
1082#[display("Locked(`{lock_expires_at}`, {}, {})", locked_by.executor_id, locked_by.run_id)]
1083pub struct PendingStateLocked {
1084    pub locked_by: LockedBy,
1085    pub lock_expires_at: DateTime<Utc>,
1086}
1087
1088#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1089pub struct LockedBy {
1090    pub executor_id: ExecutorId,
1091    pub run_id: RunId,
1092}
1093
1094#[derive(Debug, Clone, Copy, derive_more::Display, PartialEq, Eq, Serialize, Deserialize)]
1095#[display("{result_kind}")]
1096pub struct PendingStateFinished {
1097    pub version: VersionType, // not Version since it must be Copy
1098    pub finished_at: DateTime<Utc>,
1099    pub result_kind: PendingStateFinishedResultKind,
1100}
1101
1102#[derive(
1103    Debug, Clone, Copy, PartialEq, Eq, serde_with::SerializeDisplay, serde_with::DeserializeFromStr,
1104)]
1105pub struct PendingStateFinishedResultKind(pub Result<(), PendingStateFinishedError>);
1106const OK_VARIANT: &str = "ok";
1107impl FromStr for PendingStateFinishedResultKind {
1108    type Err = strum::ParseError;
1109
1110    fn from_str(s: &str) -> Result<Self, Self::Err> {
1111        if s == OK_VARIANT {
1112            Ok(PendingStateFinishedResultKind(Ok(())))
1113        } else {
1114            let err = PendingStateFinishedError::from_str(s)?;
1115            Ok(PendingStateFinishedResultKind(Err(err)))
1116        }
1117    }
1118}
1119
1120impl Display for PendingStateFinishedResultKind {
1121    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1122        match self.0 {
1123            Ok(()) => write!(f, "{OK_VARIANT}"),
1124            Err(err) => write!(f, "{err}"),
1125        }
1126    }
1127}
1128
1129impl From<&SupportedFunctionReturnValue> for PendingStateFinishedResultKind {
1130    fn from(result: &SupportedFunctionReturnValue) -> Self {
1131        result.as_pending_state_finished_result()
1132    }
1133}
1134
1135#[derive(Debug, Clone, Copy, PartialEq, Eq, strum::EnumString, strum::Display)]
1136#[cfg_attr(test, derive(strum::VariantArray))]
1137#[strum(serialize_all = "snake_case")]
1138pub enum PendingStateFinishedError {
1139    Timeout,
1140    ExecutionFailure,
1141    FallibleError,
1142}
1143
1144impl PendingState {
1145    pub fn can_append_lock(
1146        &self,
1147        created_at: DateTime<Utc>,
1148        executor_id: ExecutorId,
1149        run_id: RunId,
1150        lock_expires_at: DateTime<Utc>,
1151    ) -> Result<LockKind, DbErrorWriteNonRetriable> {
1152        if lock_expires_at <= created_at {
1153            return Err(DbErrorWriteNonRetriable::ValidationFailed(
1154                "invalid expiry date".into(),
1155            ));
1156        }
1157        match self {
1158            PendingState::PendingAt {
1159                scheduled_at,
1160                last_lock,
1161            } => {
1162                if *scheduled_at <= created_at {
1163                    // pending now, ok to lock
1164                    Ok(LockKind::CreatingNewLock)
1165                } else if let Some(LockedBy {
1166                    executor_id: last_executor_id,
1167                    run_id: last_run_id,
1168                }) = last_lock
1169                    && executor_id == *last_executor_id
1170                    && run_id == *last_run_id
1171                {
1172                    // Original executor is extending the lock.
1173                    Ok(LockKind::Extending)
1174                } else {
1175                    Err(DbErrorWriteNonRetriable::ValidationFailed(
1176                        "cannot lock, not yet pending".into(),
1177                    ))
1178                }
1179            }
1180            PendingState::Locked(PendingStateLocked {
1181                locked_by:
1182                    LockedBy {
1183                        executor_id: current_pending_state_executor_id,
1184                        run_id: current_pending_state_run_id,
1185                    },
1186                lock_expires_at: _,
1187            }) => {
1188                if executor_id == *current_pending_state_executor_id
1189                    && run_id == *current_pending_state_run_id
1190                {
1191                    // Original executor is extending the lock.
1192                    Ok(LockKind::Extending)
1193                } else {
1194                    Err(DbErrorWriteNonRetriable::IllegalState(
1195                        "cannot lock, already locked".into(),
1196                    ))
1197                }
1198            }
1199            PendingState::BlockedByJoinSet { .. } => Err(DbErrorWriteNonRetriable::IllegalState(
1200                "cannot append Locked event when in BlockedByJoinSet state".into(),
1201            )),
1202            PendingState::Finished { .. } => Err(DbErrorWriteNonRetriable::IllegalState(
1203                "already finished".into(),
1204            )),
1205        }
1206    }
1207
1208    #[must_use]
1209    pub fn is_finished(&self) -> bool {
1210        matches!(self, PendingState::Finished { .. })
1211    }
1212}
1213
1214#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1215pub enum LockKind {
1216    Extending,
1217    CreatingNewLock,
1218}
1219
1220pub mod http_client_trace {
1221    use chrono::{DateTime, Utc};
1222    use serde::{Deserialize, Serialize};
1223
1224    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1225    pub struct HttpClientTrace {
1226        pub req: RequestTrace,
1227        pub resp: Option<ResponseTrace>,
1228    }
1229
1230    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1231    pub struct RequestTrace {
1232        pub sent_at: DateTime<Utc>,
1233        pub uri: String,
1234        pub method: String,
1235    }
1236
1237    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1238    pub struct ResponseTrace {
1239        pub finished_at: DateTime<Utc>,
1240        pub status: Result<u16, String>,
1241    }
1242}
1243
1244#[cfg(test)]
1245mod tests {
1246    use super::HistoryEventScheduleAt;
1247    use super::PendingStateFinished;
1248    use super::PendingStateFinishedError;
1249    use super::PendingStateFinishedResultKind;
1250    use crate::SupportedFunctionReturnValue;
1251    use chrono::DateTime;
1252    use chrono::Datelike;
1253    use chrono::Utc;
1254    use insta::assert_snapshot;
1255    use rstest::rstest;
1256    use std::time::Duration;
1257    use strum::VariantArray as _;
1258    use val_json::type_wrapper::TypeWrapper;
1259    use val_json::wast_val::WastVal;
1260    use val_json::wast_val::WastValWithType;
1261
1262    #[rstest(expected => [
1263        PendingStateFinishedResultKind(Result::Ok(())),
1264        PendingStateFinishedResultKind(Result::Err(PendingStateFinishedError::Timeout)),
1265    ])]
1266    #[test]
1267    fn serde_pending_state_finished_result_kind_should_work(
1268        expected: PendingStateFinishedResultKind,
1269    ) {
1270        let ser = serde_json::to_string(&expected).unwrap();
1271        let actual: PendingStateFinishedResultKind = serde_json::from_str(&ser).unwrap();
1272        assert_eq!(expected, actual);
1273    }
1274
1275    #[rstest(result_kind => [
1276        PendingStateFinishedResultKind(Result::Ok(())),
1277        PendingStateFinishedResultKind(Result::Err(PendingStateFinishedError::Timeout)),
1278    ])]
1279    #[test]
1280    fn serde_pending_state_finished_should_work(result_kind: PendingStateFinishedResultKind) {
1281        let expected = PendingStateFinished {
1282            version: 0,
1283            finished_at: Utc::now(),
1284            result_kind,
1285        };
1286
1287        let ser = serde_json::to_string(&expected).unwrap();
1288        let actual: PendingStateFinished = serde_json::from_str(&ser).unwrap();
1289        assert_eq!(expected, actual);
1290    }
1291
1292    #[test]
1293    fn join_set_deser_with_result_ok_option_none_should_work() {
1294        let expected = SupportedFunctionReturnValue::Ok {
1295            ok: Some(WastValWithType {
1296                r#type: TypeWrapper::Result {
1297                    ok: Some(Box::new(TypeWrapper::Option(Box::new(TypeWrapper::String)))),
1298                    err: Some(Box::new(TypeWrapper::String)),
1299                },
1300                value: WastVal::Result(Ok(Some(Box::new(WastVal::Option(None))))),
1301            }),
1302        };
1303        let json = serde_json::to_string(&expected).unwrap();
1304        assert_snapshot!(json);
1305
1306        let actual: SupportedFunctionReturnValue = serde_json::from_str(&json).unwrap();
1307
1308        assert_eq!(expected, actual);
1309    }
1310
1311    #[test]
1312    fn verify_pending_state_finished_result_kind_serde() {
1313        let variants: Vec<_> = PendingStateFinishedError::VARIANTS
1314            .iter()
1315            .map(|var| PendingStateFinishedResultKind(Err(*var)))
1316            .chain(std::iter::once(PendingStateFinishedResultKind(Ok(()))))
1317            .collect();
1318        let ser = serde_json::to_string_pretty(&variants).unwrap();
1319        insta::assert_snapshot!(ser);
1320        let deser: Vec<PendingStateFinishedResultKind> = serde_json::from_str(&ser).unwrap();
1321        assert_eq!(variants, deser);
1322    }
1323
1324    #[test]
1325    fn as_date_time_should_work_with_duration_u32_max_secs() {
1326        let duration = Duration::from_secs(u64::from(u32::MAX));
1327        let schedule_at = HistoryEventScheduleAt::In(duration);
1328        let resolved = schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap();
1329        assert_eq!(2106, resolved.year());
1330    }
1331
1332    const MILLIS_PER_SEC: i64 = 1000;
1333    const TIMEDELTA_MAX_SECS: i64 = i64::MAX / MILLIS_PER_SEC;
1334
1335    #[test]
1336    fn as_date_time_should_fail_on_duration_secs_greater_than_i64_max() {
1337        // Fails on duration -> timedelta conversion, but a smaller duration can fail on datetime + timedelta
1338        let duration = Duration::from_secs(
1339            u64::try_from(TIMEDELTA_MAX_SECS).expect("positive number must not fail") + 1,
1340        );
1341        let schedule_at = HistoryEventScheduleAt::In(duration);
1342        schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap_err();
1343    }
1344}