obeli_sk_concepts/
storage.rs

1use crate::ClosingStrategy;
2use crate::ComponentId;
3use crate::ExecutionId;
4use crate::ExecutionMetadata;
5use crate::FunctionFqn;
6use crate::JoinSetId;
7use crate::Params;
8use crate::StrVariant;
9use crate::SupportedFunctionReturnValue;
10use crate::prefixed_ulid::DelayId;
11use crate::prefixed_ulid::ExecutionIdDerived;
12use crate::prefixed_ulid::ExecutorId;
13use crate::prefixed_ulid::RunId;
14use assert_matches::assert_matches;
15use async_trait::async_trait;
16use chrono::TimeDelta;
17use chrono::{DateTime, Utc};
18use http_client_trace::HttpClientTrace;
19use serde::Deserialize;
20use serde::Serialize;
21use std::fmt::Debug;
22use std::fmt::Display;
23use std::pin::Pin;
24use std::str::FromStr;
25use std::sync::Arc;
26use std::time::Duration;
27use strum::IntoStaticStr;
28use tracing::error;
29
30/// Remote client representation of the execution journal.
31#[derive(Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
32pub struct ExecutionLog {
33    pub execution_id: ExecutionId,
34    pub events: Vec<ExecutionEvent>,
35    pub responses: Vec<JoinSetResponseEventOuter>,
36    pub next_version: Version, // Is not advanced once in Finished state
37    pub pending_state: PendingState,
38}
39
40impl ExecutionLog {
41    #[must_use]
42    pub fn can_be_retried_after(
43        temporary_event_count: u32,
44        max_retries: u32,
45        retry_exp_backoff: Duration,
46    ) -> Option<Duration> {
47        // If max_retries == u32::MAX, wrapping is OK after this succeeds - we want to retry forever.
48        if temporary_event_count <= max_retries {
49            // TODO: Add test for number of retries
50            let duration = retry_exp_backoff * 2_u32.saturating_pow(temporary_event_count - 1);
51            Some(duration)
52        } else {
53            None
54        }
55    }
56
57    #[must_use]
58    pub fn compute_retry_duration_when_retrying_forever(
59        temporary_event_count: u32,
60        retry_exp_backoff: Duration,
61    ) -> Duration {
62        Self::can_be_retried_after(temporary_event_count, u32::MAX, retry_exp_backoff)
63            .expect("`max_retries` set to MAX must never return None")
64    }
65
66    #[must_use]
67    pub fn retry_exp_backoff(&self) -> Duration {
68        assert_matches!(self.events.first(), Some(ExecutionEvent {
69            event: ExecutionEventInner::Created { retry_exp_backoff, .. },
70            ..
71        }) => *retry_exp_backoff)
72    }
73
74    #[must_use]
75    pub fn max_retries(&self) -> u32 {
76        assert_matches!(self.events.first(), Some(ExecutionEvent {
77            event: ExecutionEventInner::Created { max_retries, .. },
78            ..
79        }) => *max_retries)
80    }
81
82    #[must_use]
83    pub fn ffqn(&self) -> &FunctionFqn {
84        assert_matches!(self.events.first(), Some(ExecutionEvent {
85            event: ExecutionEventInner::Created { ffqn, .. },
86            ..
87        }) => ffqn)
88    }
89
90    #[must_use]
91    pub fn parent(&self) -> Option<(ExecutionId, JoinSetId)> {
92        assert_matches!(self.events.first(), Some(ExecutionEvent {
93            event: ExecutionEventInner::Created { parent, .. },
94            ..
95        }) => parent.clone())
96    }
97
98    #[must_use]
99    pub fn last_event(&self) -> &ExecutionEvent {
100        self.events.last().expect("must contain at least one event")
101    }
102
103    #[must_use]
104    pub fn into_finished_result(mut self) -> Option<SupportedFunctionReturnValue> {
105        if let ExecutionEvent {
106            event: ExecutionEventInner::Finished { result, .. },
107            ..
108        } = self.events.pop().expect("must contain at least one event")
109        {
110            Some(result)
111        } else {
112            None
113        }
114    }
115
116    pub fn event_history(&self) -> impl Iterator<Item = HistoryEvent> + '_ {
117        self.events.iter().filter_map(|event| {
118            if let ExecutionEventInner::HistoryEvent { event: eh, .. } = &event.event {
119                Some(eh.clone())
120            } else {
121                None
122            }
123        })
124    }
125
126    #[cfg(feature = "test")]
127    #[must_use]
128    pub fn find_join_set_request(&self, join_set_id: &JoinSetId) -> Option<&JoinSetRequest> {
129        self.events
130            .iter()
131            .find_map(move |event| match &event.event {
132                ExecutionEventInner::HistoryEvent {
133                    event:
134                        HistoryEvent::JoinSetRequest {
135                            join_set_id: found,
136                            request,
137                        },
138                    ..
139                } if *join_set_id == *found => Some(request),
140                _ => None,
141            })
142    }
143}
144
145pub type VersionType = u32;
146#[derive(
147    Debug,
148    Default,
149    Clone,
150    PartialEq,
151    Eq,
152    Hash,
153    derive_more::Display,
154    derive_more::Into,
155    serde::Serialize,
156    serde::Deserialize,
157)]
158#[serde(transparent)]
159pub struct Version(pub VersionType);
160impl Version {
161    #[must_use]
162    pub fn new(arg: VersionType) -> Version {
163        Version(arg)
164    }
165
166    #[must_use]
167    pub fn increment(&self) -> Version {
168        Version(self.0 + 1)
169    }
170}
171
172#[derive(
173    Clone, Debug, derive_more::Display, PartialEq, Eq, serde::Serialize, serde::Deserialize,
174)]
175#[display("{event}")]
176pub struct ExecutionEvent {
177    pub created_at: DateTime<Utc>,
178    pub event: ExecutionEventInner,
179    pub backtrace_id: Option<Version>,
180}
181
182/// Moves the execution to [`PendingState::PendingNow`] if it is currently blocked on `JoinNextBlocking`.
183#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
184pub struct JoinSetResponseEventOuter {
185    pub created_at: DateTime<Utc>,
186    pub event: JoinSetResponseEvent,
187}
188
189#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
190pub struct JoinSetResponseEvent {
191    pub join_set_id: JoinSetId,
192    pub event: JoinSetResponse,
193}
194
195#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
196#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
197#[serde(tag = "type")]
198pub enum JoinSetResponse {
199    DelayFinished {
200        delay_id: DelayId,
201    },
202    ChildExecutionFinished {
203        child_execution_id: ExecutionIdDerived,
204        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Version(2)))]
205        finished_version: Version,
206        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
207        result: SupportedFunctionReturnValue,
208    },
209}
210
211pub const DUMMY_CREATED: ExecutionEventInner = ExecutionEventInner::Created {
212    ffqn: FunctionFqn::new_static("", ""),
213    params: Params::empty(),
214    parent: None,
215    scheduled_at: DateTime::from_timestamp_nanos(0),
216    retry_exp_backoff: Duration::ZERO,
217    max_retries: 0,
218    component_id: ComponentId::dummy_activity(),
219    metadata: ExecutionMetadata::empty(),
220    scheduled_by: None,
221};
222pub const DUMMY_HISTORY_EVENT: ExecutionEventInner = ExecutionEventInner::HistoryEvent {
223    event: HistoryEvent::JoinSetCreate {
224        join_set_id: JoinSetId {
225            kind: crate::JoinSetKind::OneOff,
226            name: StrVariant::empty(),
227        },
228        closing_strategy: ClosingStrategy::Complete,
229    },
230};
231
232#[derive(
233    Clone,
234    derive_more::Debug,
235    derive_more::Display,
236    PartialEq,
237    Eq,
238    Serialize,
239    Deserialize,
240    IntoStaticStr,
241)]
242#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
243#[allow(clippy::large_enum_variant)]
244pub enum ExecutionEventInner {
245    /// Created by an external system or a scheduler when requesting a child execution or
246    /// an executor when continuing as new `FinishedExecutionError`::`ContinueAsNew`,`CancelledWithNew` .
247    /// The execution is [`PendingState::PendingAt`]`(scheduled_at)`.
248    #[display("Created({ffqn}, `{scheduled_at}`)")]
249    Created {
250        ffqn: FunctionFqn,
251        #[cfg_attr(any(test, feature = "test"), arbitrary(default))]
252        #[debug(skip)]
253        params: Params,
254        parent: Option<(ExecutionId, JoinSetId)>,
255        scheduled_at: DateTime<Utc>,
256        retry_exp_backoff: Duration,
257        max_retries: u32,
258        #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
259        component_id: ComponentId,
260        #[cfg_attr(any(test, feature = "test"), arbitrary(default))]
261        metadata: ExecutionMetadata,
262        scheduled_by: Option<ExecutionId>,
263    },
264    // Created by an executor.
265    // Either immediately followed by an execution request by an executor or
266    // after expiry immediately followed by WaitingForExecutor by a scheduler.
267    #[display("Locked(`{lock_expires_at}`, {component_id})")]
268    Locked {
269        #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
270        component_id: ComponentId,
271        executor_id: ExecutorId,
272        run_id: RunId,
273        lock_expires_at: DateTime<Utc>,
274    },
275    /// Returns execution to [`PendingState::PendingNow`] state
276    /// without timing out. This can happen when the executor is running
277    /// out of resources like [`WorkerError::LimitReached`] or when
278    /// the executor is shutting down.
279    #[display("Unlocked(`{backoff_expires_at}`)")]
280    Unlocked {
281        backoff_expires_at: DateTime<Utc>,
282        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
283        reason: StrVariant,
284    },
285    // Created by the executor holding the lock.
286    // After expiry interpreted as pending.
287    #[display("TemporarilyFailed(`{backoff_expires_at}`)")]
288    TemporarilyFailed {
289        backoff_expires_at: DateTime<Utc>,
290        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
291        reason_full: StrVariant,
292        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason inner")))]
293        reason_inner: StrVariant,
294        detail: Option<String>,
295        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
296        http_client_traces: Option<Vec<HttpClientTrace>>,
297    },
298    // Created by the executor holding last lock.
299    // After expiry interpreted as pending.
300    #[display("TemporarilyTimedOut(`{backoff_expires_at}`)")]
301    TemporarilyTimedOut {
302        backoff_expires_at: DateTime<Utc>,
303        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
304        http_client_traces: Option<Vec<HttpClientTrace>>,
305    },
306    // Created by the executor holding last lock.
307    // Processed by a scheduler if a parent execution needs to be notified,
308    // also when
309    #[display("Finished")]
310    Finished {
311        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
312        result: SupportedFunctionReturnValue,
313        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
314        http_client_traces: Option<Vec<HttpClientTrace>>,
315    },
316
317    #[display("HistoryEvent({event})")]
318    HistoryEvent { event: HistoryEvent },
319}
320
321impl ExecutionEventInner {
322    #[must_use]
323    pub fn is_temporary_event(&self) -> bool {
324        matches!(
325            self,
326            Self::TemporarilyFailed { .. } | Self::TemporarilyTimedOut { .. }
327        )
328    }
329
330    #[must_use]
331    pub fn variant(&self) -> &'static str {
332        Into::<&'static str>::into(self)
333    }
334
335    #[must_use]
336    pub fn join_set_id(&self) -> Option<&JoinSetId> {
337        match self {
338            Self::Created {
339                parent: Some((_parent_id, join_set_id)),
340                ..
341            } => Some(join_set_id),
342            Self::HistoryEvent {
343                event:
344                    HistoryEvent::JoinSetCreate { join_set_id, .. }
345                    | HistoryEvent::JoinSetRequest { join_set_id, .. }
346                    | HistoryEvent::JoinNext { join_set_id, .. },
347            } => Some(join_set_id),
348            _ => None,
349        }
350    }
351}
352
353#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
354#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
355#[serde(tag = "type")]
356pub enum PersistKind {
357    #[display("RandomU64({min}, {max_inclusive})")]
358    RandomU64 { min: u64, max_inclusive: u64 },
359    #[display("RandomString({min_length}, {max_length_exclusive})")]
360    RandomString {
361        min_length: u64,
362        max_length_exclusive: u64,
363    },
364}
365
366#[must_use]
367pub fn from_u64_to_bytes(value: u64) -> [u8; 8] {
368    value.to_be_bytes()
369}
370
371#[must_use]
372pub fn from_bytes_to_u64(bytes: [u8; 8]) -> u64 {
373    u64::from_be_bytes(bytes)
374}
375
376#[derive(
377    derive_more::Debug, Clone, PartialEq, Eq, derive_more::Display, Serialize, Deserialize,
378)]
379#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
380#[serde(tag = "type")]
381/// Must be created by the executor in [`PendingState::Locked`].
382pub enum HistoryEvent {
383    /// Persist a generated pseudorandom value.
384    #[display("Persist")]
385    Persist {
386        #[debug(skip)]
387        value: Vec<u8>, // Only stored for nondeterminism checks. TODO: Consider using a hashed value or just the intention.
388        kind: PersistKind,
389    },
390    #[display("JoinSetCreate({join_set_id})")]
391    JoinSetCreate {
392        join_set_id: JoinSetId,
393        closing_strategy: ClosingStrategy,
394    },
395    #[display("JoinSetRequest({join_set_id}, {request})")]
396    JoinSetRequest {
397        join_set_id: JoinSetId,
398        request: JoinSetRequest,
399    },
400    /// Sets the pending state to [`PendingState::BlockedByJoinSet`].
401    /// When the response arrives at `resp_time`:
402    /// The execution is [`PendingState::PendingAt`]`(max(resp_time, lock_expires_at)`, so that the
403    /// original executor can continue. After the expiry any executor can continue without
404    /// marking the execution as timed out.
405    #[display("JoinNext({join_set_id})")]
406    JoinNext {
407        join_set_id: JoinSetId,
408        /// Set to a future time if the worker is keeping the execution invocation warm waiting for the result.
409        /// The pending status will be kept in Locked state until `run_expires_at`.
410        run_expires_at: DateTime<Utc>,
411        /// Set to a specific function when calling `-await-next` extension function, used for
412        /// determinism checks.
413        requested_ffqn: Option<FunctionFqn>,
414        /// Closing request must never set `requested_ffqn` and is ignored by determinism checks.
415        closing: bool,
416    },
417    /// Records the fact that a join set was awaited more times than its submission count.
418    #[display("JoinNextTooMany({join_set_id})")]
419    JoinNextTooMany {
420        join_set_id: JoinSetId,
421        /// Set to a specific function when calling `-await-next` extension function, used for
422        /// determinism checks.
423        requested_ffqn: Option<FunctionFqn>,
424    },
425    #[display("Schedule({execution_id}, {schedule_at})")]
426    Schedule {
427        execution_id: ExecutionId,
428        schedule_at: HistoryEventScheduleAt, // Stores intention to schedule an execution at a date/offset
429    },
430    #[display("Stub({target_execution_id})")]
431    Stub {
432        target_execution_id: ExecutionIdDerived,
433        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
434        result: SupportedFunctionReturnValue, // Only stored for nondeterminism checks. TODO: Consider using a hashed value.
435        persist_result: Result<(), ()>, // Does the row (target_execution_id,Version:1) match the proposed `result`?
436    },
437}
438
439#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
440#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
441pub enum HistoryEventScheduleAt {
442    Now,
443    At(DateTime<Utc>),
444    #[display("In({_0:?})")]
445    In(Duration),
446}
447
448#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
449pub enum ScheduleAtConversionError {
450    #[error("source duration value is out of range")]
451    OutOfRangeError,
452}
453
454impl HistoryEventScheduleAt {
455    pub fn as_date_time(
456        &self,
457        now: DateTime<Utc>,
458    ) -> Result<DateTime<Utc>, ScheduleAtConversionError> {
459        match self {
460            Self::Now => Ok(now),
461            Self::At(date_time) => Ok(*date_time),
462            Self::In(duration) => {
463                let time_delta = TimeDelta::from_std(*duration)
464                    .map_err(|_| ScheduleAtConversionError::OutOfRangeError)?;
465                now.checked_add_signed(time_delta)
466                    .ok_or(ScheduleAtConversionError::OutOfRangeError)
467            }
468        }
469    }
470}
471
472#[derive(Clone, Debug, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
473#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
474#[serde(tag = "type")]
475pub enum JoinSetRequest {
476    // Must be created by the executor in `PendingState::Locked`.
477    #[display("DelayRequest({delay_id}, expires_at: `{expires_at}`, schedule_at: `{schedule_at}`)")]
478    DelayRequest {
479        delay_id: DelayId,
480        expires_at: DateTime<Utc>,
481        schedule_at: HistoryEventScheduleAt,
482    },
483    // Must be created by the executor in `PendingState::Locked`.
484    #[display("ChildExecutionRequest({child_execution_id})")]
485    ChildExecutionRequest {
486        child_execution_id: ExecutionIdDerived,
487    },
488}
489
490/// Error that is not specific to an execution.
491#[derive(Debug, Clone, thiserror::Error, PartialEq)]
492pub enum DbErrorGeneric {
493    #[error("database error: {0}")]
494    Uncategorized(StrVariant), // Previously GenericError
495    #[error("database was closed")]
496    Close,
497}
498
499#[derive(thiserror::Error, Clone, Debug, PartialEq, Eq)]
500pub enum DbErrorWritePermanent {
501    /// Parameter error.
502    #[error("validation failed: {0}")]
503    ValidationFailed(StrVariant),
504    #[error("cannot write: {reason}")]
505    CannotWrite {
506        reason: StrVariant,
507        expected_version: Option<Version>,
508    },
509}
510
511/// Write error tied to an execution
512#[derive(Debug, Clone, thiserror::Error, PartialEq)]
513pub enum DbErrorWrite {
514    #[error("cannot write - row not found")]
515    NotFound,
516    /// Retrying will not resolve the error, the execution is in an incorrect state.
517    #[error("permanent error: {0}")]
518    Permanent(#[from] DbErrorWritePermanent),
519    #[error(transparent)]
520    DbErrorGeneric(#[from] DbErrorGeneric),
521}
522
523/// Read error tied to an execution
524#[derive(Debug, Clone, thiserror::Error, PartialEq)]
525pub enum DbErrorRead {
526    #[error("cannot read - row not found")]
527    NotFound,
528    #[error(transparent)]
529    DbErrorGeneric(#[from] DbErrorGeneric),
530}
531
532impl From<DbErrorRead> for DbErrorWrite {
533    fn from(value: DbErrorRead) -> DbErrorWrite {
534        match value {
535            DbErrorRead::NotFound => DbErrorWrite::NotFound,
536            DbErrorRead::DbErrorGeneric(err) => DbErrorWrite::DbErrorGeneric(err),
537        }
538    }
539}
540
541#[derive(Debug, thiserror::Error, PartialEq)]
542pub enum DbErrorReadWithTimeout {
543    #[error("timeout")]
544    Timeout,
545    #[error(transparent)]
546    DbErrorRead(#[from] DbErrorRead),
547}
548impl From<DbErrorGeneric> for DbErrorReadWithTimeout {
549    fn from(value: DbErrorGeneric) -> DbErrorReadWithTimeout {
550        Self::from(DbErrorRead::from(value))
551    }
552}
553
554// Represents next version after successfuly appended to execution log.
555// TODO: Convert to struct with next_version
556pub type AppendResponse = Version;
557pub type PendingExecution = (ExecutionId, Version, Params, Option<DateTime<Utc>>);
558
559#[derive(Debug, Clone)]
560pub struct LockedExecution {
561    pub execution_id: ExecutionId,
562    pub next_version: Version,
563    pub metadata: ExecutionMetadata,
564    pub run_id: RunId,
565    pub ffqn: FunctionFqn,
566    pub params: Params,
567    pub event_history: Vec<HistoryEvent>,
568    pub responses: Vec<JoinSetResponseEventOuter>,
569    pub retry_exp_backoff: Duration,
570    pub max_retries: u32,
571    pub parent: Option<(ExecutionId, JoinSetId)>,
572    pub intermittent_event_count: u32,
573}
574
575pub type LockPendingResponse = Vec<LockedExecution>;
576pub type AppendBatchResponse = Version;
577
578#[derive(Debug, Clone, derive_more::Display, Serialize, Deserialize)]
579#[display("{event}")]
580pub struct AppendRequest {
581    pub created_at: DateTime<Utc>,
582    pub event: ExecutionEventInner,
583}
584
585#[derive(Debug, Clone)]
586pub struct CreateRequest {
587    pub created_at: DateTime<Utc>,
588    pub execution_id: ExecutionId,
589    pub ffqn: FunctionFqn,
590    pub params: Params,
591    pub parent: Option<(ExecutionId, JoinSetId)>,
592    pub scheduled_at: DateTime<Utc>,
593    pub retry_exp_backoff: Duration,
594    pub max_retries: u32,
595    pub component_id: ComponentId,
596    pub metadata: ExecutionMetadata,
597    pub scheduled_by: Option<ExecutionId>,
598}
599
600impl From<CreateRequest> for ExecutionEventInner {
601    fn from(value: CreateRequest) -> Self {
602        Self::Created {
603            ffqn: value.ffqn,
604            params: value.params,
605            parent: value.parent,
606            scheduled_at: value.scheduled_at,
607            retry_exp_backoff: value.retry_exp_backoff,
608            max_retries: value.max_retries,
609            component_id: value.component_id,
610            metadata: value.metadata,
611            scheduled_by: value.scheduled_by,
612        }
613    }
614}
615
616#[async_trait]
617pub trait DbPool: Send + Sync {
618    fn connection(&self) -> Box<dyn DbConnection>;
619}
620
621#[async_trait]
622pub trait DbPoolCloseable {
623    async fn close(self);
624}
625
626#[async_trait]
627pub trait DbExecutor: Send + Sync {
628    #[expect(clippy::too_many_arguments)]
629    async fn lock_pending(
630        &self,
631        batch_size: usize,
632        pending_at_or_sooner: DateTime<Utc>,
633        ffqns: Arc<[FunctionFqn]>,
634        created_at: DateTime<Utc>,
635        component_id: ComponentId,
636        executor_id: ExecutorId,
637        lock_expires_at: DateTime<Utc>,
638        run_id: RunId,
639    ) -> Result<LockPendingResponse, DbErrorGeneric>;
640
641    /// Specialized locking for e.g. extending the lock by the original executor and run.
642    #[expect(clippy::too_many_arguments)]
643    async fn lock_one(
644        &self,
645        created_at: DateTime<Utc>,
646        component_id: ComponentId,
647        execution_id: &ExecutionId,
648        run_id: RunId,
649        version: Version,
650        executor_id: ExecutorId,
651        lock_expires_at: DateTime<Utc>,
652    ) -> Result<LockedExecution, DbErrorWrite>;
653
654    /// Append a single event to an existing execution log.
655    /// The request cannot contain `ExecutionEventInner::Created`.
656    async fn append(
657        &self,
658        execution_id: ExecutionId,
659        version: Version,
660        req: AppendRequest,
661    ) -> Result<AppendResponse, DbErrorWrite>;
662
663    /// Append a batch of events to an existing execution log, and append a response to a parent execution.
664    /// The batch cannot contain `ExecutionEventInner::Created`.
665    async fn append_batch_respond_to_parent(
666        &self,
667        execution_id: ExecutionIdDerived,
668        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
669        batch: Vec<AppendRequest>,
670        version: Version,
671        parent_execution_id: ExecutionId,
672        parent_response_event: JoinSetResponseEventOuter,
673    ) -> Result<AppendBatchResponse, DbErrorWrite>;
674
675    /// Notification mechainism with no strict guarantees for waiting while there are no pending executions.
676    /// Return immediately if there are pending notifications at `pending_at_or_sooner`.
677    /// Otherwise wait until `timeout_fut` resolves.
678    /// Timers that expire between `pending_at_or_sooner` and timeout can be disregarded.
679    async fn wait_for_pending(
680        &self,
681        pending_at_or_sooner: DateTime<Utc>,
682        ffqns: Arc<[FunctionFqn]>,
683        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
684    );
685}
686
687#[async_trait]
688pub trait DbConnection: DbExecutor {
689    async fn append_response(
690        &self,
691        created_at: DateTime<Utc>,
692        execution_id: ExecutionId,
693        response_event: JoinSetResponseEvent,
694    ) -> Result<(), DbErrorWrite>;
695
696    /// Append a batch of events to an existing execution log, and append a response to a parent execution.
697    /// The batch cannot contain `ExecutionEventInner::Created`.
698    async fn append_batch(
699        &self,
700        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
701        batch: Vec<AppendRequest>,
702        execution_id: ExecutionId,
703        version: Version,
704    ) -> Result<AppendBatchResponse, DbErrorWrite>;
705
706    /// Append one or more events to the parent execution log, and create zero or more child execution logs.
707    /// The batch cannot contain `ExecutionEventInner::Created`.
708    async fn append_batch_create_new_execution(
709        &self,
710        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
711        batch: Vec<AppendRequest>,   // must not contain `ExecutionEventInner::Created` events
712        execution_id: ExecutionId,
713        version: Version,
714        child_req: Vec<CreateRequest>,
715    ) -> Result<AppendBatchResponse, DbErrorWrite>;
716
717    #[cfg(feature = "test")]
718    /// Get execution log.
719    async fn get(&self, execution_id: &ExecutionId) -> Result<ExecutionLog, DbErrorRead>;
720
721    async fn list_execution_events(
722        &self,
723        execution_id: &ExecutionId,
724        since: &Version,
725        max_length: VersionType,
726        include_backtrace_id: bool,
727    ) -> Result<Vec<ExecutionEvent>, DbErrorRead>;
728
729    /// Get a single event without `backtrace_id`
730    async fn get_execution_event(
731        &self,
732        execution_id: &ExecutionId,
733        version: &Version,
734    ) -> Result<ExecutionEvent, DbErrorRead>;
735
736    async fn get_create_request(
737        &self,
738        execution_id: &ExecutionId,
739    ) -> Result<CreateRequest, DbErrorRead> {
740        let execution_event = self
741            .get_execution_event(execution_id, &Version::new(0))
742            .await?;
743        if let ExecutionEventInner::Created {
744            ffqn,
745            params,
746            parent,
747            scheduled_at,
748            retry_exp_backoff,
749            max_retries,
750            component_id,
751            metadata,
752            scheduled_by,
753        } = execution_event.event
754        {
755            Ok(CreateRequest {
756                created_at: execution_event.created_at,
757                execution_id: execution_id.clone(),
758                ffqn,
759                params,
760                parent,
761                scheduled_at,
762                retry_exp_backoff,
763                max_retries,
764                component_id,
765                metadata,
766                scheduled_by,
767            })
768        } else {
769            error!(%execution_id, "Execution log must start with creation");
770            Err(DbErrorRead::DbErrorGeneric(DbErrorGeneric::Uncategorized(
771                "execution log must start with creation".into(),
772            )))
773        }
774    }
775
776    async fn get_finished_result(
777        &self,
778        execution_id: &ExecutionId,
779        finished: PendingStateFinished,
780    ) -> Result<Option<SupportedFunctionReturnValue>, DbErrorRead> {
781        let last_event = self
782            .get_execution_event(execution_id, &Version::new(finished.version))
783            .await?;
784        if let ExecutionEventInner::Finished { result, .. } = last_event.event {
785            Ok(Some(result))
786        } else {
787            Ok(None)
788        }
789    }
790
791    async fn get_pending_state(
792        &self,
793        execution_id: &ExecutionId,
794    ) -> Result<PendingState, DbErrorRead>;
795
796    /// Get currently expired locks and async timers (delay requests)
797    async fn get_expired_timers(
798        &self,
799        at: DateTime<Utc>,
800    ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric>;
801
802    /// Create a new execution log
803    async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite>;
804
805    /// Notification mechainism with no strict guarantees for getting notified when a new response arrives.
806    /// Parameter `start_idx` must be at most be equal to current size of responses in the execution log.
807    /// If no response arrives immediately and `interrupt_after` resolves, `DbErrorReadWithTimeout::Timeout` is returned.
808    /// Implementations with no pubsub support should use polling.
809    async fn subscribe_to_next_responses(
810        &self,
811        execution_id: &ExecutionId,
812        start_idx: usize,
813        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
814    ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorReadWithTimeout>;
815
816    /// Notification mechainism with no strict guarantees for getting the finished result.
817    /// Implementations with no pubsub support should use polling.
818    async fn wait_for_finished_result(
819        &self,
820        execution_id: &ExecutionId,
821        timeout_fut: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
822    ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout>;
823
824    async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite>;
825
826    async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite>;
827
828    /// Used by gRPC only.
829    /// Get the latest backtrace if version is not set.
830    async fn get_backtrace(
831        &self,
832        execution_id: &ExecutionId,
833        filter: BacktraceFilter,
834    ) -> Result<BacktraceInfo, DbErrorRead>;
835
836    /// Returns executions sorted in descending order.
837    /// Used by gRPC only.
838    async fn list_executions(
839        &self,
840        ffqn: Option<FunctionFqn>,
841        top_level_only: bool,
842        pagination: ExecutionListPagination,
843    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric>;
844
845    /// Returns responses of an execution ordered as they arrived,
846    /// enabling matching each `JoinNext` to its corresponding response.
847    /// Used by gRPC only.
848    async fn list_responses(
849        &self,
850        execution_id: &ExecutionId,
851        pagination: Pagination<u32>,
852    ) -> Result<Vec<ResponseWithCursor>, DbErrorRead>;
853}
854
855#[derive(Clone)]
856pub enum BacktraceFilter {
857    First,
858    Last,
859    Specific(Version),
860}
861
862pub struct BacktraceInfo {
863    pub execution_id: ExecutionId,
864    pub component_id: ComponentId,
865    pub version_min_including: Version,
866    pub version_max_excluding: Version,
867    pub wasm_backtrace: WasmBacktrace,
868}
869
870#[derive(Serialize, Deserialize, Debug, Clone)]
871pub struct WasmBacktrace {
872    pub frames: Vec<FrameInfo>,
873}
874
875#[derive(Serialize, Deserialize, Debug, Clone)]
876pub struct FrameInfo {
877    pub module: String,
878    pub func_name: String,
879    pub symbols: Vec<FrameSymbol>,
880}
881
882#[derive(Serialize, Deserialize, Debug, Clone)]
883pub struct FrameSymbol {
884    pub func_name: Option<String>,
885    pub file: Option<String>,
886    pub line: Option<u32>,
887    pub col: Option<u32>,
888}
889
890mod wasm_backtrace {
891    use super::{FrameInfo, FrameSymbol, WasmBacktrace};
892
893    impl WasmBacktrace {
894        pub fn maybe_from(backtrace: &wasmtime::WasmBacktrace) -> Option<Self> {
895            if backtrace.frames().is_empty() {
896                None
897            } else {
898                Some(Self {
899                    frames: backtrace.frames().iter().map(FrameInfo::from).collect(),
900                })
901            }
902        }
903    }
904
905    impl From<&wasmtime::FrameInfo> for FrameInfo {
906        fn from(frame: &wasmtime::FrameInfo) -> Self {
907            let module_name = frame.module().name().unwrap_or("<unknown>").to_string();
908            let mut func_name = String::new();
909            wasmtime_environ::demangle_function_name_or_index(
910                &mut func_name,
911                frame.func_name(),
912                frame.func_index() as usize,
913            )
914            .expect("writing to string must succeed");
915            Self {
916                module: module_name,
917                func_name,
918                symbols: frame
919                    .symbols()
920                    .iter()
921                    .map(std::convert::Into::into)
922                    .collect(),
923            }
924        }
925    }
926
927    impl From<&wasmtime::FrameSymbol> for FrameSymbol {
928        fn from(symbol: &wasmtime::FrameSymbol) -> Self {
929            let func_name = symbol.name().map(|name| {
930                let mut writer = String::new();
931                wasmtime_environ::demangle_function_name(&mut writer, name)
932                    .expect("writing to string must succeed");
933                writer
934            });
935
936            Self {
937                func_name,
938                file: symbol.file().map(ToString::to_string),
939                line: symbol.line(),
940                col: symbol.column(),
941            }
942        }
943    }
944}
945
946pub type ResponseCursorType = u32; // FIXME: Switch to u64
947
948#[derive(Debug, Clone)]
949pub struct ResponseWithCursor {
950    pub event: JoinSetResponseEventOuter,
951    pub cursor: ResponseCursorType,
952}
953
954#[derive(Debug)]
955pub struct ExecutionWithState {
956    pub execution_id: ExecutionId,
957    pub ffqn: FunctionFqn,
958    pub pending_state: PendingState,
959    pub created_at: DateTime<Utc>,
960    pub scheduled_at: DateTime<Utc>,
961}
962
963pub enum ExecutionListPagination {
964    CreatedBy(Pagination<Option<DateTime<Utc>>>),
965    ExecutionId(Pagination<Option<ExecutionId>>),
966}
967
968#[derive(Debug, Clone, Copy)]
969pub enum Pagination<T> {
970    NewerThan {
971        length: ResponseCursorType,
972        cursor: T,
973        including_cursor: bool,
974    },
975    OlderThan {
976        length: ResponseCursorType,
977        cursor: T,
978        including_cursor: bool,
979    },
980}
981impl<T> Pagination<T> {
982    pub fn length(&self) -> ResponseCursorType {
983        match self {
984            Pagination::NewerThan { length, .. } | Pagination::OlderThan { length, .. } => *length,
985        }
986    }
987    pub fn rel(&self) -> &'static str {
988        match self {
989            Pagination::NewerThan {
990                including_cursor: false,
991                ..
992            } => ">",
993            Pagination::NewerThan {
994                including_cursor: true,
995                ..
996            } => ">=",
997            Pagination::OlderThan {
998                including_cursor: false,
999                ..
1000            } => "<",
1001            Pagination::OlderThan {
1002                including_cursor: true,
1003                ..
1004            } => "<=",
1005        }
1006    }
1007    pub fn is_desc(&self) -> bool {
1008        matches!(self, Pagination::OlderThan { .. })
1009    }
1010}
1011
1012#[cfg(feature = "test")]
1013pub async fn wait_for_pending_state_fn<T: Debug>(
1014    db_connection: &dyn DbConnection,
1015    execution_id: &ExecutionId,
1016    predicate: impl Fn(ExecutionLog) -> Option<T> + Send,
1017    timeout: Option<Duration>,
1018) -> Result<T, DbErrorReadWithTimeout> {
1019    tracing::trace!(%execution_id, "Waiting for predicate");
1020    let fut = async move {
1021        loop {
1022            let execution_log = db_connection.get(execution_id).await?;
1023            if let Some(t) = predicate(execution_log) {
1024                tracing::debug!(%execution_id, "Found: {t:?}");
1025                return Ok(t);
1026            }
1027            tokio::time::sleep(Duration::from_millis(10)).await;
1028        }
1029    };
1030
1031    if let Some(timeout) = timeout {
1032        tokio::select! { // future's liveness: Dropping the loser immediately.
1033            res = fut => res,
1034            () = tokio::time::sleep(timeout) => Err(DbErrorReadWithTimeout::Timeout)
1035        }
1036    } else {
1037        fut.await
1038    }
1039}
1040
1041#[derive(Debug, Clone, PartialEq, Eq)]
1042pub enum ExpiredTimer {
1043    Lock(ExpiredLock),
1044    Delay(ExpiredDelay),
1045}
1046
1047#[derive(Debug, Clone, PartialEq, Eq)]
1048pub struct ExpiredLock {
1049    pub execution_id: ExecutionId,
1050    // Version of last `Locked` event, used to detect whether the execution made progress.
1051    pub locked_at_version: Version,
1052    pub next_version: Version,
1053    /// As the execution may still be running, this represents the number of intermittent failures + timeouts prior to this execution.
1054    pub intermittent_event_count: u32,
1055    pub max_retries: u32,
1056    pub retry_exp_backoff: Duration,
1057    pub parent: Option<(ExecutionId, JoinSetId)>, // TODO: Remove
1058}
1059
1060#[derive(Debug, Clone, PartialEq, Eq)]
1061pub struct ExpiredDelay {
1062    pub execution_id: ExecutionId,
1063    pub join_set_id: JoinSetId,
1064    pub delay_id: DelayId,
1065}
1066
1067#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, Deserialize)]
1068#[serde(tag = "type")]
1069pub enum PendingState {
1070    #[display("Locked(`{lock_expires_at}`, {executor_id}, {run_id})")]
1071    Locked {
1072        executor_id: ExecutorId,
1073        run_id: RunId,
1074        lock_expires_at: DateTime<Utc>,
1075    },
1076    #[display("PendingAt(`{scheduled_at}`)")]
1077    PendingAt { scheduled_at: DateTime<Utc> }, // e.g. created with a schedule, temporary timeout/failure
1078    #[display("BlockedByJoinSet({join_set_id},`{lock_expires_at}`)")]
1079    /// Caused by [`HistoryEvent::JoinNext`]
1080    BlockedByJoinSet {
1081        join_set_id: JoinSetId,
1082        /// See [`HistoryEvent::JoinNext::lock_expires_at`].
1083        lock_expires_at: DateTime<Utc>,
1084        /// Blocked by closing of the join set
1085        closing: bool,
1086    },
1087    #[display("Finished({finished})")]
1088    Finished { finished: PendingStateFinished },
1089}
1090
1091#[derive(Debug, Clone, Copy, derive_more::Display, PartialEq, Eq, Serialize, Deserialize)]
1092#[display("{result_kind}")]
1093pub struct PendingStateFinished {
1094    pub version: VersionType, // not Version since it must be Copy
1095    pub finished_at: DateTime<Utc>,
1096    pub result_kind: PendingStateFinishedResultKind,
1097}
1098
1099#[derive(
1100    Debug, Clone, Copy, PartialEq, Eq, serde_with::SerializeDisplay, serde_with::DeserializeFromStr,
1101)]
1102pub struct PendingStateFinishedResultKind(pub Result<(), PendingStateFinishedError>);
1103const OK_VARIANT: &str = "ok";
1104impl FromStr for PendingStateFinishedResultKind {
1105    type Err = strum::ParseError;
1106
1107    fn from_str(s: &str) -> Result<Self, Self::Err> {
1108        if s == OK_VARIANT {
1109            Ok(PendingStateFinishedResultKind(Ok(())))
1110        } else {
1111            let err = PendingStateFinishedError::from_str(s)?;
1112            Ok(PendingStateFinishedResultKind(Err(err)))
1113        }
1114    }
1115}
1116
1117impl Display for PendingStateFinishedResultKind {
1118    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1119        match self.0 {
1120            Ok(()) => write!(f, "{OK_VARIANT}"),
1121            Err(err) => write!(f, "{err}"),
1122        }
1123    }
1124}
1125
1126impl From<&SupportedFunctionReturnValue> for PendingStateFinishedResultKind {
1127    fn from(result: &SupportedFunctionReturnValue) -> Self {
1128        result.as_pending_state_finished_result()
1129    }
1130}
1131
1132#[derive(Debug, Clone, Copy, PartialEq, Eq, strum::EnumString, strum::Display)]
1133#[cfg_attr(test, derive(strum::VariantArray))]
1134#[strum(serialize_all = "snake_case")]
1135pub enum PendingStateFinishedError {
1136    Timeout,
1137    ExecutionFailure,
1138    FallibleError,
1139}
1140
1141impl PendingState {
1142    pub fn can_append_lock(
1143        &self,
1144        created_at: DateTime<Utc>,
1145        executor_id: ExecutorId,
1146        run_id: RunId,
1147        lock_expires_at: DateTime<Utc>,
1148    ) -> Result<LockKind, DbErrorWritePermanent> {
1149        if lock_expires_at <= created_at {
1150            return Err(DbErrorWritePermanent::ValidationFailed(
1151                "invalid expiry date".into(),
1152            ));
1153        }
1154        match self {
1155            PendingState::PendingAt { scheduled_at } => {
1156                if *scheduled_at <= created_at {
1157                    // pending now, ok to lock
1158                    Ok(LockKind::CreatingNewLock)
1159                } else {
1160                    Err(DbErrorWritePermanent::ValidationFailed(
1161                        "cannot lock, not yet pending".into(),
1162                    ))
1163                }
1164            }
1165            PendingState::Locked {
1166                executor_id: current_pending_state_executor_id,
1167                run_id: current_pending_state_run_id,
1168                ..
1169            } => {
1170                if executor_id == *current_pending_state_executor_id
1171                    && run_id == *current_pending_state_run_id
1172                {
1173                    // Original executor is extending the lock.
1174                    Ok(LockKind::Extending)
1175                } else {
1176                    Err(DbErrorWritePermanent::CannotWrite {
1177                        reason: "cannot lock, already locked".into(),
1178                        expected_version: None,
1179                    })
1180                }
1181            }
1182            PendingState::BlockedByJoinSet { .. } => Err(DbErrorWritePermanent::CannotWrite {
1183                reason: "cannot append Locked event when in BlockedByJoinSet state".into(),
1184                expected_version: None,
1185            }),
1186            PendingState::Finished { .. } => Err(DbErrorWritePermanent::CannotWrite {
1187                reason: "already finished".into(),
1188                expected_version: None,
1189            }),
1190        }
1191    }
1192
1193    #[must_use]
1194    pub fn is_finished(&self) -> bool {
1195        matches!(self, PendingState::Finished { .. })
1196    }
1197}
1198
1199#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1200pub enum LockKind {
1201    Extending,
1202    CreatingNewLock,
1203}
1204
1205pub mod http_client_trace {
1206    use chrono::{DateTime, Utc};
1207    use serde::{Deserialize, Serialize};
1208
1209    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1210    pub struct HttpClientTrace {
1211        pub req: RequestTrace,
1212        pub resp: Option<ResponseTrace>,
1213    }
1214
1215    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1216    pub struct RequestTrace {
1217        pub sent_at: DateTime<Utc>,
1218        pub uri: String,
1219        pub method: String,
1220    }
1221
1222    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1223    pub struct ResponseTrace {
1224        pub finished_at: DateTime<Utc>,
1225        pub status: Result<u16, String>,
1226    }
1227}
1228
1229#[cfg(test)]
1230mod tests {
1231    use super::HistoryEventScheduleAt;
1232    use super::PendingStateFinished;
1233    use super::PendingStateFinishedError;
1234    use super::PendingStateFinishedResultKind;
1235    use crate::SupportedFunctionReturnValue;
1236    use chrono::DateTime;
1237    use chrono::Datelike;
1238    use chrono::Utc;
1239    use insta::assert_snapshot;
1240    use rstest::rstest;
1241    use std::time::Duration;
1242    use strum::VariantArray as _;
1243    use val_json::type_wrapper::TypeWrapper;
1244    use val_json::wast_val::WastVal;
1245    use val_json::wast_val::WastValWithType;
1246
1247    #[rstest(expected => [
1248        PendingStateFinishedResultKind(Result::Ok(())),
1249        PendingStateFinishedResultKind(Result::Err(PendingStateFinishedError::Timeout)),
1250    ])]
1251    #[test]
1252    fn serde_pending_state_finished_result_kind_should_work(
1253        expected: PendingStateFinishedResultKind,
1254    ) {
1255        let ser = serde_json::to_string(&expected).unwrap();
1256        let actual: PendingStateFinishedResultKind = serde_json::from_str(&ser).unwrap();
1257        assert_eq!(expected, actual);
1258    }
1259
1260    #[rstest(result_kind => [
1261        PendingStateFinishedResultKind(Result::Ok(())),
1262        PendingStateFinishedResultKind(Result::Err(PendingStateFinishedError::Timeout)),
1263    ])]
1264    #[test]
1265    fn serde_pending_state_finished_should_work(result_kind: PendingStateFinishedResultKind) {
1266        let expected = PendingStateFinished {
1267            version: 0,
1268            finished_at: Utc::now(),
1269            result_kind,
1270        };
1271
1272        let ser = serde_json::to_string(&expected).unwrap();
1273        let actual: PendingStateFinished = serde_json::from_str(&ser).unwrap();
1274        assert_eq!(expected, actual);
1275    }
1276
1277    #[test]
1278    fn join_set_deser_with_result_ok_option_none_should_work() {
1279        let expected = SupportedFunctionReturnValue::Ok {
1280            ok: Some(WastValWithType {
1281                r#type: TypeWrapper::Result {
1282                    ok: Some(Box::new(TypeWrapper::Option(Box::new(TypeWrapper::String)))),
1283                    err: Some(Box::new(TypeWrapper::String)),
1284                },
1285                value: WastVal::Result(Ok(Some(Box::new(WastVal::Option(None))))),
1286            }),
1287        };
1288        let json = serde_json::to_string(&expected).unwrap();
1289        assert_snapshot!(json);
1290
1291        let actual: SupportedFunctionReturnValue = serde_json::from_str(&json).unwrap();
1292
1293        assert_eq!(expected, actual);
1294    }
1295
1296    #[test]
1297    fn verify_pending_state_finished_result_kind_serde() {
1298        let variants: Vec<_> = PendingStateFinishedError::VARIANTS
1299            .iter()
1300            .map(|var| PendingStateFinishedResultKind(Err(*var)))
1301            .chain(std::iter::once(PendingStateFinishedResultKind(Ok(()))))
1302            .collect();
1303        let ser = serde_json::to_string_pretty(&variants).unwrap();
1304        insta::assert_snapshot!(ser);
1305        let deser: Vec<PendingStateFinishedResultKind> = serde_json::from_str(&ser).unwrap();
1306        assert_eq!(variants, deser);
1307    }
1308
1309    #[test]
1310    fn as_date_time_should_work_with_duration_u32_max_secs() {
1311        let duration = Duration::from_secs(u64::from(u32::MAX));
1312        let schedule_at = HistoryEventScheduleAt::In(duration);
1313        let resolved = schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap();
1314        assert_eq!(2106, resolved.year());
1315    }
1316
1317    const MILLIS_PER_SEC: i64 = 1000;
1318    const TIMEDELTA_MAX_SECS: i64 = i64::MAX / MILLIS_PER_SEC;
1319
1320    #[test]
1321    fn as_date_time_should_fail_on_duration_secs_greater_than_i64_max() {
1322        // Fails on duration -> timedelta conversion, but a smaller duration can fail on datetime + timedelta
1323        let duration = Duration::from_secs(
1324            u64::try_from(TIMEDELTA_MAX_SECS).expect("positive number must not fail") + 1,
1325        );
1326        let schedule_at = HistoryEventScheduleAt::In(duration);
1327        schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap_err();
1328    }
1329}