obeli_sk_concepts/
storage.rs

1use crate::ClosingStrategy;
2use crate::ComponentId;
3use crate::ExecutionId;
4use crate::ExecutionMetadata;
5use crate::FunctionFqn;
6use crate::JoinSetId;
7use crate::Params;
8use crate::StrVariant;
9use crate::SupportedFunctionReturnValue;
10use crate::prefixed_ulid::DelayId;
11use crate::prefixed_ulid::ExecutionIdDerived;
12use crate::prefixed_ulid::ExecutorId;
13use crate::prefixed_ulid::RunId;
14use assert_matches::assert_matches;
15use async_trait::async_trait;
16use chrono::TimeDelta;
17use chrono::{DateTime, Utc};
18use http_client_trace::HttpClientTrace;
19use serde::Deserialize;
20use serde::Serialize;
21use std::fmt::Debug;
22use std::fmt::Display;
23use std::pin::Pin;
24use std::str::FromStr;
25use std::sync::Arc;
26use std::time::Duration;
27use strum::IntoStaticStr;
28use tracing::error;
29
30/// Remote client representation of the execution journal.
31#[derive(Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
32pub struct ExecutionLog {
33    pub execution_id: ExecutionId,
34    pub events: Vec<ExecutionEvent>,
35    pub responses: Vec<JoinSetResponseEventOuter>,
36    pub next_version: Version, // Is not advanced once in Finished state
37    pub pending_state: PendingState,
38}
39
40impl ExecutionLog {
41    #[must_use]
42    pub fn can_be_retried_after(
43        temporary_event_count: u32,
44        max_retries: u32,
45        retry_exp_backoff: Duration,
46    ) -> Option<Duration> {
47        // If max_retries == u32::MAX, wrapping is OK after this succeeds - we want to retry forever.
48        if temporary_event_count <= max_retries {
49            // TODO: Add test for number of retries
50            let duration = retry_exp_backoff * 2_u32.saturating_pow(temporary_event_count - 1);
51            Some(duration)
52        } else {
53            None
54        }
55    }
56
57    #[must_use]
58    pub fn compute_retry_duration_when_retrying_forever(
59        temporary_event_count: u32,
60        retry_exp_backoff: Duration,
61    ) -> Duration {
62        Self::can_be_retried_after(temporary_event_count, u32::MAX, retry_exp_backoff)
63            .expect("`max_retries` set to MAX must never return None")
64    }
65
66    #[must_use]
67    pub fn retry_exp_backoff(&self) -> Duration {
68        assert_matches!(self.events.first(), Some(ExecutionEvent {
69            event: ExecutionEventInner::Created { retry_exp_backoff, .. },
70            ..
71        }) => *retry_exp_backoff)
72    }
73
74    #[must_use]
75    pub fn max_retries(&self) -> u32 {
76        assert_matches!(self.events.first(), Some(ExecutionEvent {
77            event: ExecutionEventInner::Created { max_retries, .. },
78            ..
79        }) => *max_retries)
80    }
81
82    #[must_use]
83    pub fn ffqn(&self) -> &FunctionFqn {
84        assert_matches!(self.events.first(), Some(ExecutionEvent {
85            event: ExecutionEventInner::Created { ffqn, .. },
86            ..
87        }) => ffqn)
88    }
89
90    #[must_use]
91    pub fn parent(&self) -> Option<(ExecutionId, JoinSetId)> {
92        assert_matches!(self.events.first(), Some(ExecutionEvent {
93            event: ExecutionEventInner::Created { parent, .. },
94            ..
95        }) => parent.clone())
96    }
97
98    #[must_use]
99    pub fn last_event(&self) -> &ExecutionEvent {
100        self.events.last().expect("must contain at least one event")
101    }
102
103    #[must_use]
104    pub fn into_finished_result(mut self) -> Option<SupportedFunctionReturnValue> {
105        if let ExecutionEvent {
106            event: ExecutionEventInner::Finished { result, .. },
107            ..
108        } = self.events.pop().expect("must contain at least one event")
109        {
110            Some(result)
111        } else {
112            None
113        }
114    }
115
116    pub fn event_history(&self) -> impl Iterator<Item = HistoryEvent> + '_ {
117        self.events.iter().filter_map(|event| {
118            if let ExecutionEventInner::HistoryEvent { event: eh, .. } = &event.event {
119                Some(eh.clone())
120            } else {
121                None
122            }
123        })
124    }
125
126    #[cfg(feature = "test")]
127    #[must_use]
128    pub fn find_join_set_request(&self, join_set_id: &JoinSetId) -> Option<&JoinSetRequest> {
129        self.events
130            .iter()
131            .find_map(move |event| match &event.event {
132                ExecutionEventInner::HistoryEvent {
133                    event:
134                        HistoryEvent::JoinSetRequest {
135                            join_set_id: found,
136                            request,
137                        },
138                    ..
139                } if *join_set_id == *found => Some(request),
140                _ => None,
141            })
142    }
143}
144
145pub type VersionType = u32;
146#[derive(
147    Debug,
148    Default,
149    Clone,
150    PartialEq,
151    Eq,
152    Hash,
153    derive_more::Display,
154    derive_more::Into,
155    serde::Serialize,
156    serde::Deserialize,
157)]
158#[serde(transparent)]
159pub struct Version(pub VersionType);
160impl Version {
161    #[must_use]
162    pub fn new(arg: VersionType) -> Version {
163        Version(arg)
164    }
165
166    #[must_use]
167    pub fn increment(&self) -> Version {
168        Version(self.0 + 1)
169    }
170}
171
172#[derive(
173    Clone, Debug, derive_more::Display, PartialEq, Eq, serde::Serialize, serde::Deserialize,
174)]
175#[display("{event}")]
176pub struct ExecutionEvent {
177    pub created_at: DateTime<Utc>,
178    pub event: ExecutionEventInner,
179    pub backtrace_id: Option<Version>,
180}
181
182/// Moves the execution to [`PendingState::PendingNow`] if it is currently blocked on `JoinNextBlocking`.
183#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
184pub struct JoinSetResponseEventOuter {
185    pub created_at: DateTime<Utc>,
186    pub event: JoinSetResponseEvent,
187}
188
189#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
190pub struct JoinSetResponseEvent {
191    pub join_set_id: JoinSetId,
192    pub event: JoinSetResponse,
193}
194
195#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
196#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
197#[serde(tag = "type")]
198pub enum JoinSetResponse {
199    DelayFinished {
200        delay_id: DelayId,
201    },
202    ChildExecutionFinished {
203        child_execution_id: ExecutionIdDerived,
204        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Version(2)))]
205        finished_version: Version,
206        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
207        result: SupportedFunctionReturnValue,
208    },
209}
210
211pub const DUMMY_CREATED: ExecutionEventInner = ExecutionEventInner::Created {
212    ffqn: FunctionFqn::new_static("", ""),
213    params: Params::empty(),
214    parent: None,
215    scheduled_at: DateTime::from_timestamp_nanos(0),
216    retry_exp_backoff: Duration::ZERO,
217    max_retries: 0,
218    component_id: ComponentId::dummy_activity(),
219    metadata: ExecutionMetadata::empty(),
220    scheduled_by: None,
221};
222pub const DUMMY_HISTORY_EVENT: ExecutionEventInner = ExecutionEventInner::HistoryEvent {
223    event: HistoryEvent::JoinSetCreate {
224        join_set_id: JoinSetId {
225            kind: crate::JoinSetKind::OneOff,
226            name: StrVariant::empty(),
227        },
228        closing_strategy: ClosingStrategy::Complete,
229    },
230};
231
232#[derive(
233    Clone,
234    derive_more::Debug,
235    derive_more::Display,
236    PartialEq,
237    Eq,
238    Serialize,
239    Deserialize,
240    IntoStaticStr,
241)]
242#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
243#[allow(clippy::large_enum_variant)]
244pub enum ExecutionEventInner {
245    /// Created by an external system or a scheduler when requesting a child execution or
246    /// an executor when continuing as new `FinishedExecutionError`::`ContinueAsNew`,`CancelledWithNew` .
247    /// The execution is [`PendingState::PendingAt`]`(scheduled_at)`.
248    #[display("Created({ffqn}, `{scheduled_at}`)")]
249    Created {
250        ffqn: FunctionFqn,
251        #[cfg_attr(any(test, feature = "test"), arbitrary(default))]
252        #[debug(skip)]
253        params: Params,
254        parent: Option<(ExecutionId, JoinSetId)>,
255        scheduled_at: DateTime<Utc>,
256        retry_exp_backoff: Duration,
257        max_retries: u32,
258        #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
259        component_id: ComponentId,
260        #[cfg_attr(any(test, feature = "test"), arbitrary(default))]
261        metadata: ExecutionMetadata,
262        scheduled_by: Option<ExecutionId>,
263    },
264    // Created by an executor.
265    // Either immediately followed by an execution request by an executor or
266    // after expiry immediately followed by WaitingForExecutor by a scheduler.
267    #[display("Locked(`{lock_expires_at}`, {component_id})")]
268    Locked {
269        #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
270        component_id: ComponentId,
271        executor_id: ExecutorId,
272        run_id: RunId,
273        lock_expires_at: DateTime<Utc>,
274    },
275    /// Returns execution to [`PendingState::PendingNow`] state
276    /// without timing out. This can happen when the executor is running
277    /// out of resources like [`WorkerError::LimitReached`] or when
278    /// the executor is shutting down.
279    #[display("Unlocked(`{backoff_expires_at}`)")]
280    Unlocked {
281        backoff_expires_at: DateTime<Utc>,
282        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
283        reason: StrVariant,
284    },
285    // Created by the executor holding the lock.
286    // After expiry interpreted as pending.
287    #[display("TemporarilyFailed(`{backoff_expires_at}`)")]
288    TemporarilyFailed {
289        backoff_expires_at: DateTime<Utc>,
290        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
291        reason_full: StrVariant,
292        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason inner")))]
293        reason_inner: StrVariant,
294        detail: Option<String>,
295        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
296        http_client_traces: Option<Vec<HttpClientTrace>>,
297    },
298    // Created by the executor holding last lock.
299    // After expiry interpreted as pending.
300    #[display("TemporarilyTimedOut(`{backoff_expires_at}`)")]
301    TemporarilyTimedOut {
302        backoff_expires_at: DateTime<Utc>,
303        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
304        http_client_traces: Option<Vec<HttpClientTrace>>,
305    },
306    // Created by the executor holding last lock.
307    // Processed by a scheduler if a parent execution needs to be notified,
308    // also when
309    #[display("Finished")]
310    Finished {
311        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
312        result: SupportedFunctionReturnValue,
313        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
314        http_client_traces: Option<Vec<HttpClientTrace>>,
315    },
316
317    #[display("HistoryEvent({event})")]
318    HistoryEvent { event: HistoryEvent },
319}
320
321impl ExecutionEventInner {
322    #[must_use]
323    pub fn is_temporary_event(&self) -> bool {
324        matches!(
325            self,
326            Self::TemporarilyFailed { .. } | Self::TemporarilyTimedOut { .. }
327        )
328    }
329
330    #[must_use]
331    pub fn variant(&self) -> &'static str {
332        Into::<&'static str>::into(self)
333    }
334
335    #[must_use]
336    pub fn join_set_id(&self) -> Option<&JoinSetId> {
337        match self {
338            Self::Created {
339                parent: Some((_parent_id, join_set_id)),
340                ..
341            } => Some(join_set_id),
342            Self::HistoryEvent {
343                event:
344                    HistoryEvent::JoinSetCreate { join_set_id, .. }
345                    | HistoryEvent::JoinSetRequest { join_set_id, .. }
346                    | HistoryEvent::JoinNext { join_set_id, .. },
347            } => Some(join_set_id),
348            _ => None,
349        }
350    }
351}
352
353#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
354#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
355#[serde(tag = "type")]
356pub enum PersistKind {
357    #[display("RandomU64({min}, {max_inclusive})")]
358    RandomU64 { min: u64, max_inclusive: u64 },
359    #[display("RandomString({min_length}, {max_length_exclusive})")]
360    RandomString {
361        min_length: u64,
362        max_length_exclusive: u64,
363    },
364}
365
366#[must_use]
367pub fn from_u64_to_bytes(value: u64) -> [u8; 8] {
368    value.to_be_bytes()
369}
370
371#[must_use]
372pub fn from_bytes_to_u64(bytes: [u8; 8]) -> u64 {
373    u64::from_be_bytes(bytes)
374}
375
376#[derive(
377    derive_more::Debug, Clone, PartialEq, Eq, derive_more::Display, Serialize, Deserialize,
378)]
379#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
380#[serde(tag = "type")]
381/// Must be created by the executor in [`PendingState::Locked`].
382pub enum HistoryEvent {
383    /// Persist a generated pseudorandom value.
384    #[display("Persist")]
385    Persist {
386        #[debug(skip)]
387        value: Vec<u8>, // Only stored for nondeterminism checks. TODO: Consider using a hashed value or just the intention.
388        kind: PersistKind,
389    },
390    #[display("JoinSetCreate({join_set_id})")]
391    JoinSetCreate {
392        join_set_id: JoinSetId,
393        closing_strategy: ClosingStrategy,
394    },
395    #[display("JoinSetRequest({join_set_id}, {request})")]
396    JoinSetRequest {
397        join_set_id: JoinSetId,
398        request: JoinSetRequest,
399    },
400    /// Sets the pending state to [`PendingState::BlockedByJoinSet`].
401    /// When the response arrives at `resp_time`:
402    /// The execution is [`PendingState::PendingAt`]`(max(resp_time, lock_expires_at)`, so that the
403    /// original executor can continue. After the expiry any executor can continue without
404    /// marking the execution as timed out.
405    #[display("JoinNext({join_set_id})")]
406    JoinNext {
407        join_set_id: JoinSetId,
408        /// Set to a future time if the worker is keeping the execution invocation warm waiting for the result.
409        /// The pending status will be kept in Locked state until `run_expires_at`.
410        run_expires_at: DateTime<Utc>,
411        /// Set to a specific function when calling `-await-next` extension function, used for
412        /// determinism checks.
413        requested_ffqn: Option<FunctionFqn>,
414        /// Closing request must never set `requested_ffqn` and is ignored by determinism checks.
415        closing: bool,
416    },
417    /// Records the fact that a join set was awaited more times than its submission count.
418    #[display("JoinNextTooMany({join_set_id})")]
419    JoinNextTooMany {
420        join_set_id: JoinSetId,
421        /// Set to a specific function when calling `-await-next` extension function, used for
422        /// determinism checks.
423        requested_ffqn: Option<FunctionFqn>,
424    },
425    #[display("Schedule({execution_id}, {schedule_at})")]
426    Schedule {
427        execution_id: ExecutionId,
428        schedule_at: HistoryEventScheduleAt, // Stores intention to schedule an execution at a date/offset
429    },
430    #[display("Stub({target_execution_id})")]
431    Stub {
432        target_execution_id: ExecutionIdDerived,
433        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
434        result: SupportedFunctionReturnValue, // Only stored for nondeterminism checks. TODO: Consider using a hashed value.
435        persist_result: Result<(), ()>, // Does the row (target_execution_id,Version:1) match the proposed `result`?
436    },
437}
438
439#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
440#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
441pub enum HistoryEventScheduleAt {
442    Now,
443    At(DateTime<Utc>),
444    #[display("In({_0:?})")]
445    In(Duration),
446}
447
448#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
449pub enum ScheduleAtConversionError {
450    #[error("source duration value is out of range")]
451    OutOfRangeError,
452}
453
454impl HistoryEventScheduleAt {
455    pub fn as_date_time(
456        &self,
457        now: DateTime<Utc>,
458    ) -> Result<DateTime<Utc>, ScheduleAtConversionError> {
459        match self {
460            Self::Now => Ok(now),
461            Self::At(date_time) => Ok(*date_time),
462            Self::In(duration) => {
463                let time_delta = TimeDelta::from_std(*duration)
464                    .map_err(|_| ScheduleAtConversionError::OutOfRangeError)?;
465                now.checked_add_signed(time_delta)
466                    .ok_or(ScheduleAtConversionError::OutOfRangeError)
467            }
468        }
469    }
470}
471
472#[derive(Clone, Debug, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
473#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
474#[serde(tag = "type")]
475pub enum JoinSetRequest {
476    // Must be created by the executor in `PendingState::Locked`.
477    #[display("DelayRequest({delay_id}, expires_at: `{expires_at}`, schedule_at: `{schedule_at}`)")]
478    DelayRequest {
479        delay_id: DelayId,
480        expires_at: DateTime<Utc>,
481        schedule_at: HistoryEventScheduleAt,
482    },
483    // Must be created by the executor in `PendingState::Locked`.
484    #[display("ChildExecutionRequest({child_execution_id})")]
485    ChildExecutionRequest {
486        child_execution_id: ExecutionIdDerived,
487    },
488}
489
490/// Error that is not specific to an execution.
491#[derive(Debug, Clone, thiserror::Error, PartialEq)]
492pub enum DbErrorGeneric {
493    #[error("database error: {0}")]
494    Uncategorized(StrVariant), // Previously GenericError
495    #[error("database was closed")]
496    Close,
497}
498
499#[derive(thiserror::Error, Clone, Debug, PartialEq, Eq)]
500pub enum DbErrorWritePermanent {
501    /// Parameter error.
502    #[error("validation failed: {0}")]
503    ValidationFailed(StrVariant),
504    #[error("cannot write: {reason}")]
505    CannotWrite {
506        reason: StrVariant,
507        expected_version: Option<Version>,
508    },
509}
510
511/// Write error tied to an execution
512#[derive(Debug, Clone, thiserror::Error, PartialEq)]
513pub enum DbErrorWrite {
514    #[error("cannot write - row not found")]
515    NotFound,
516    /// Retrying will not resolve the error, the execution is in an incorrect state.
517    #[error("permanent error: {0}")]
518    Permanent(#[from] DbErrorWritePermanent),
519    #[error(transparent)]
520    DbErrorGeneric(#[from] DbErrorGeneric),
521}
522
523/// Read error tied to an execution
524#[derive(Debug, Clone, thiserror::Error, PartialEq)]
525pub enum DbErrorRead {
526    #[error("cannot read - row not found")]
527    NotFound,
528    #[error(transparent)]
529    DbErrorGeneric(#[from] DbErrorGeneric),
530}
531
532impl From<DbErrorRead> for DbErrorWrite {
533    fn from(value: DbErrorRead) -> DbErrorWrite {
534        match value {
535            DbErrorRead::NotFound => DbErrorWrite::NotFound,
536            DbErrorRead::DbErrorGeneric(err) => DbErrorWrite::DbErrorGeneric(err),
537        }
538    }
539}
540
541#[derive(Debug, thiserror::Error, PartialEq)]
542pub enum DbErrorReadWithTimeout {
543    #[error("timeout")]
544    Timeout,
545    #[error(transparent)]
546    DbErrorRead(#[from] DbErrorRead),
547}
548impl From<DbErrorGeneric> for DbErrorReadWithTimeout {
549    fn from(value: DbErrorGeneric) -> DbErrorReadWithTimeout {
550        Self::from(DbErrorRead::from(value))
551    }
552}
553
554// Represents next version after successfuly appended to execution log.
555// TODO: Convert to struct with next_version
556pub type AppendResponse = Version;
557pub type PendingExecution = (ExecutionId, Version, Params, Option<DateTime<Utc>>);
558
559#[derive(Debug, Clone)]
560pub struct LockedExecution {
561    pub execution_id: ExecutionId,
562    pub next_version: Version,
563    pub metadata: ExecutionMetadata,
564    pub run_id: RunId,
565    pub ffqn: FunctionFqn,
566    pub params: Params,
567    pub event_history: Vec<HistoryEvent>,
568    pub responses: Vec<JoinSetResponseEventOuter>,
569    pub retry_exp_backoff: Duration,
570    pub max_retries: u32,
571    pub parent: Option<(ExecutionId, JoinSetId)>,
572    pub intermittent_event_count: u32,
573}
574
575pub type LockPendingResponse = Vec<LockedExecution>;
576pub type AppendBatchResponse = Version;
577
578#[derive(Debug, Clone, derive_more::Display, Serialize, Deserialize)]
579#[display("{event}")]
580pub struct AppendRequest {
581    pub created_at: DateTime<Utc>,
582    pub event: ExecutionEventInner,
583}
584
585#[derive(Debug, Clone)]
586pub struct CreateRequest {
587    pub created_at: DateTime<Utc>,
588    pub execution_id: ExecutionId,
589    pub ffqn: FunctionFqn,
590    pub params: Params,
591    pub parent: Option<(ExecutionId, JoinSetId)>,
592    pub scheduled_at: DateTime<Utc>,
593    pub retry_exp_backoff: Duration,
594    pub max_retries: u32,
595    pub component_id: ComponentId,
596    pub metadata: ExecutionMetadata,
597    pub scheduled_by: Option<ExecutionId>,
598}
599
600impl From<CreateRequest> for ExecutionEventInner {
601    fn from(value: CreateRequest) -> Self {
602        Self::Created {
603            ffqn: value.ffqn,
604            params: value.params,
605            parent: value.parent,
606            scheduled_at: value.scheduled_at,
607            retry_exp_backoff: value.retry_exp_backoff,
608            max_retries: value.max_retries,
609            component_id: value.component_id,
610            metadata: value.metadata,
611            scheduled_by: value.scheduled_by,
612        }
613    }
614}
615
616#[async_trait]
617pub trait DbPool: Send + Sync {
618    fn connection(&self) -> Box<dyn DbConnection>;
619}
620
621#[async_trait]
622pub trait DbPoolCloseable {
623    async fn close(self);
624}
625
626#[derive(Clone, Debug)]
627pub struct AppendEventsToExecution {
628    pub execution_id: ExecutionId,
629    pub version: Version,
630    pub batch: Vec<AppendRequest>,
631}
632
633#[derive(Clone, Debug)]
634pub struct AppendResponseToExecution {
635    pub parent_execution_id: ExecutionId,
636    pub parent_response_event: JoinSetResponseEventOuter,
637}
638
639#[async_trait]
640pub trait DbExecutor: Send + Sync {
641    #[expect(clippy::too_many_arguments)]
642    async fn lock_pending(
643        &self,
644        batch_size: usize,
645        pending_at_or_sooner: DateTime<Utc>,
646        ffqns: Arc<[FunctionFqn]>,
647        created_at: DateTime<Utc>,
648        component_id: ComponentId,
649        executor_id: ExecutorId,
650        lock_expires_at: DateTime<Utc>,
651        run_id: RunId,
652    ) -> Result<LockPendingResponse, DbErrorGeneric>;
653
654    /// Specialized locking for e.g. extending the lock by the original executor and run.
655    #[expect(clippy::too_many_arguments)]
656    async fn lock_one(
657        &self,
658        created_at: DateTime<Utc>,
659        component_id: ComponentId,
660        execution_id: &ExecutionId,
661        run_id: RunId,
662        version: Version,
663        executor_id: ExecutorId,
664        lock_expires_at: DateTime<Utc>,
665    ) -> Result<LockedExecution, DbErrorWrite>;
666
667    /// Append a single event to an existing execution log.
668    /// The request cannot contain `ExecutionEventInner::Created`.
669    async fn append(
670        &self,
671        execution_id: ExecutionId,
672        version: Version,
673        req: AppendRequest,
674    ) -> Result<AppendResponse, DbErrorWrite>;
675
676    /// Append a batch of events to an existing execution log, and append a response to a parent execution.
677    /// The batch cannot contain `ExecutionEventInner::Created`.
678    async fn append_batch_respond_to_parent(
679        &self,
680        events: AppendEventsToExecution,
681        response: AppendResponseToExecution,
682        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
683    ) -> Result<AppendBatchResponse, DbErrorWrite>;
684
685    /// Notification mechainism with no strict guarantees for waiting while there are no pending executions.
686    /// Return immediately if there are pending notifications at `pending_at_or_sooner`.
687    /// Otherwise wait until `timeout_fut` resolves.
688    /// Timers that expire between `pending_at_or_sooner` and timeout can be disregarded.
689    async fn wait_for_pending(
690        &self,
691        pending_at_or_sooner: DateTime<Utc>,
692        ffqns: Arc<[FunctionFqn]>,
693        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
694    );
695}
696
697#[async_trait]
698pub trait DbConnection: DbExecutor {
699    async fn append_response(
700        &self,
701        created_at: DateTime<Utc>,
702        execution_id: ExecutionId,
703        response_event: JoinSetResponseEvent,
704    ) -> Result<(), DbErrorWrite>;
705
706    /// Append a batch of events to an existing execution log, and append a response to a parent execution.
707    /// The batch cannot contain `ExecutionEventInner::Created`.
708    async fn append_batch(
709        &self,
710        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
711        batch: Vec<AppendRequest>,
712        execution_id: ExecutionId,
713        version: Version,
714    ) -> Result<AppendBatchResponse, DbErrorWrite>;
715
716    /// Append one or more events to the parent execution log, and create zero or more child execution logs.
717    /// The batch cannot contain `ExecutionEventInner::Created`.
718    async fn append_batch_create_new_execution(
719        &self,
720        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
721        batch: Vec<AppendRequest>,   // must not contain `ExecutionEventInner::Created` events
722        execution_id: ExecutionId,
723        version: Version,
724        child_req: Vec<CreateRequest>,
725    ) -> Result<AppendBatchResponse, DbErrorWrite>;
726
727    #[cfg(feature = "test")]
728    /// Get execution log.
729    async fn get(&self, execution_id: &ExecutionId) -> Result<ExecutionLog, DbErrorRead>;
730
731    async fn list_execution_events(
732        &self,
733        execution_id: &ExecutionId,
734        since: &Version,
735        max_length: VersionType,
736        include_backtrace_id: bool,
737    ) -> Result<Vec<ExecutionEvent>, DbErrorRead>;
738
739    /// Get a single event without `backtrace_id`
740    async fn get_execution_event(
741        &self,
742        execution_id: &ExecutionId,
743        version: &Version,
744    ) -> Result<ExecutionEvent, DbErrorRead>;
745
746    async fn get_create_request(
747        &self,
748        execution_id: &ExecutionId,
749    ) -> Result<CreateRequest, DbErrorRead> {
750        let execution_event = self
751            .get_execution_event(execution_id, &Version::new(0))
752            .await?;
753        if let ExecutionEventInner::Created {
754            ffqn,
755            params,
756            parent,
757            scheduled_at,
758            retry_exp_backoff,
759            max_retries,
760            component_id,
761            metadata,
762            scheduled_by,
763        } = execution_event.event
764        {
765            Ok(CreateRequest {
766                created_at: execution_event.created_at,
767                execution_id: execution_id.clone(),
768                ffqn,
769                params,
770                parent,
771                scheduled_at,
772                retry_exp_backoff,
773                max_retries,
774                component_id,
775                metadata,
776                scheduled_by,
777            })
778        } else {
779            error!(%execution_id, "Execution log must start with creation");
780            Err(DbErrorRead::DbErrorGeneric(DbErrorGeneric::Uncategorized(
781                "execution log must start with creation".into(),
782            )))
783        }
784    }
785
786    async fn get_finished_result(
787        &self,
788        execution_id: &ExecutionId,
789        finished: PendingStateFinished,
790    ) -> Result<Option<SupportedFunctionReturnValue>, DbErrorRead> {
791        let last_event = self
792            .get_execution_event(execution_id, &Version::new(finished.version))
793            .await?;
794        if let ExecutionEventInner::Finished { result, .. } = last_event.event {
795            Ok(Some(result))
796        } else {
797            Ok(None)
798        }
799    }
800
801    async fn get_pending_state(
802        &self,
803        execution_id: &ExecutionId,
804    ) -> Result<PendingState, DbErrorRead>;
805
806    /// Get currently expired locks and async timers (delay requests)
807    async fn get_expired_timers(
808        &self,
809        at: DateTime<Utc>,
810    ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric>;
811
812    /// Create a new execution log
813    async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite>;
814
815    /// Notification mechainism with no strict guarantees for getting notified when a new response arrives.
816    /// Parameter `start_idx` must be at most be equal to current size of responses in the execution log.
817    /// If no response arrives immediately and `interrupt_after` resolves, `DbErrorReadWithTimeout::Timeout` is returned.
818    /// Implementations with no pubsub support should use polling.
819    async fn subscribe_to_next_responses(
820        &self,
821        execution_id: &ExecutionId,
822        start_idx: usize,
823        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
824    ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorReadWithTimeout>;
825
826    /// Notification mechainism with no strict guarantees for getting the finished result.
827    /// Implementations with no pubsub support should use polling.
828    async fn wait_for_finished_result(
829        &self,
830        execution_id: &ExecutionId,
831        timeout_fut: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
832    ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout>;
833
834    async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite>;
835
836    async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite>;
837
838    /// Used by gRPC only.
839    /// Get the latest backtrace if version is not set.
840    async fn get_backtrace(
841        &self,
842        execution_id: &ExecutionId,
843        filter: BacktraceFilter,
844    ) -> Result<BacktraceInfo, DbErrorRead>;
845
846    /// Returns executions sorted in descending order.
847    /// Used by gRPC only.
848    async fn list_executions(
849        &self,
850        ffqn: Option<FunctionFqn>,
851        top_level_only: bool,
852        pagination: ExecutionListPagination,
853    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric>;
854
855    /// Returns responses of an execution ordered as they arrived,
856    /// enabling matching each `JoinNext` to its corresponding response.
857    /// Used by gRPC only.
858    async fn list_responses(
859        &self,
860        execution_id: &ExecutionId,
861        pagination: Pagination<u32>,
862    ) -> Result<Vec<ResponseWithCursor>, DbErrorRead>;
863}
864
865#[derive(Clone)]
866pub enum BacktraceFilter {
867    First,
868    Last,
869    Specific(Version),
870}
871
872pub struct BacktraceInfo {
873    pub execution_id: ExecutionId,
874    pub component_id: ComponentId,
875    pub version_min_including: Version,
876    pub version_max_excluding: Version,
877    pub wasm_backtrace: WasmBacktrace,
878}
879
880#[derive(Serialize, Deserialize, Debug, Clone)]
881pub struct WasmBacktrace {
882    pub frames: Vec<FrameInfo>,
883}
884
885#[derive(Serialize, Deserialize, Debug, Clone)]
886pub struct FrameInfo {
887    pub module: String,
888    pub func_name: String,
889    pub symbols: Vec<FrameSymbol>,
890}
891
892#[derive(Serialize, Deserialize, Debug, Clone)]
893pub struct FrameSymbol {
894    pub func_name: Option<String>,
895    pub file: Option<String>,
896    pub line: Option<u32>,
897    pub col: Option<u32>,
898}
899
900mod wasm_backtrace {
901    use super::{FrameInfo, FrameSymbol, WasmBacktrace};
902
903    impl WasmBacktrace {
904        pub fn maybe_from(backtrace: &wasmtime::WasmBacktrace) -> Option<Self> {
905            if backtrace.frames().is_empty() {
906                None
907            } else {
908                Some(Self {
909                    frames: backtrace.frames().iter().map(FrameInfo::from).collect(),
910                })
911            }
912        }
913    }
914
915    impl From<&wasmtime::FrameInfo> for FrameInfo {
916        fn from(frame: &wasmtime::FrameInfo) -> Self {
917            let module_name = frame.module().name().unwrap_or("<unknown>").to_string();
918            let mut func_name = String::new();
919            wasmtime_environ::demangle_function_name_or_index(
920                &mut func_name,
921                frame.func_name(),
922                frame.func_index() as usize,
923            )
924            .expect("writing to string must succeed");
925            Self {
926                module: module_name,
927                func_name,
928                symbols: frame
929                    .symbols()
930                    .iter()
931                    .map(std::convert::Into::into)
932                    .collect(),
933            }
934        }
935    }
936
937    impl From<&wasmtime::FrameSymbol> for FrameSymbol {
938        fn from(symbol: &wasmtime::FrameSymbol) -> Self {
939            let func_name = symbol.name().map(|name| {
940                let mut writer = String::new();
941                wasmtime_environ::demangle_function_name(&mut writer, name)
942                    .expect("writing to string must succeed");
943                writer
944            });
945
946            Self {
947                func_name,
948                file: symbol.file().map(ToString::to_string),
949                line: symbol.line(),
950                col: symbol.column(),
951            }
952        }
953    }
954}
955
956pub type ResponseCursorType = u32; // FIXME: Switch to u64
957
958#[derive(Debug, Clone)]
959pub struct ResponseWithCursor {
960    pub event: JoinSetResponseEventOuter,
961    pub cursor: ResponseCursorType,
962}
963
964#[derive(Debug)]
965pub struct ExecutionWithState {
966    pub execution_id: ExecutionId,
967    pub ffqn: FunctionFqn,
968    pub pending_state: PendingState,
969    pub created_at: DateTime<Utc>,
970    pub scheduled_at: DateTime<Utc>,
971}
972
973pub enum ExecutionListPagination {
974    CreatedBy(Pagination<Option<DateTime<Utc>>>),
975    ExecutionId(Pagination<Option<ExecutionId>>),
976}
977
978#[derive(Debug, Clone, Copy)]
979pub enum Pagination<T> {
980    NewerThan {
981        length: ResponseCursorType,
982        cursor: T,
983        including_cursor: bool,
984    },
985    OlderThan {
986        length: ResponseCursorType,
987        cursor: T,
988        including_cursor: bool,
989    },
990}
991impl<T> Pagination<T> {
992    pub fn length(&self) -> ResponseCursorType {
993        match self {
994            Pagination::NewerThan { length, .. } | Pagination::OlderThan { length, .. } => *length,
995        }
996    }
997    pub fn rel(&self) -> &'static str {
998        match self {
999            Pagination::NewerThan {
1000                including_cursor: false,
1001                ..
1002            } => ">",
1003            Pagination::NewerThan {
1004                including_cursor: true,
1005                ..
1006            } => ">=",
1007            Pagination::OlderThan {
1008                including_cursor: false,
1009                ..
1010            } => "<",
1011            Pagination::OlderThan {
1012                including_cursor: true,
1013                ..
1014            } => "<=",
1015        }
1016    }
1017    pub fn is_desc(&self) -> bool {
1018        matches!(self, Pagination::OlderThan { .. })
1019    }
1020}
1021
1022#[cfg(feature = "test")]
1023pub async fn wait_for_pending_state_fn<T: Debug>(
1024    db_connection: &dyn DbConnection,
1025    execution_id: &ExecutionId,
1026    predicate: impl Fn(ExecutionLog) -> Option<T> + Send,
1027    timeout: Option<Duration>,
1028) -> Result<T, DbErrorReadWithTimeout> {
1029    tracing::trace!(%execution_id, "Waiting for predicate");
1030    let fut = async move {
1031        loop {
1032            let execution_log = db_connection.get(execution_id).await?;
1033            if let Some(t) = predicate(execution_log) {
1034                tracing::debug!(%execution_id, "Found: {t:?}");
1035                return Ok(t);
1036            }
1037            tokio::time::sleep(Duration::from_millis(10)).await;
1038        }
1039    };
1040
1041    if let Some(timeout) = timeout {
1042        tokio::select! { // future's liveness: Dropping the loser immediately.
1043            res = fut => res,
1044            () = tokio::time::sleep(timeout) => Err(DbErrorReadWithTimeout::Timeout)
1045        }
1046    } else {
1047        fut.await
1048    }
1049}
1050
1051#[derive(Debug, Clone, PartialEq, Eq)]
1052pub enum ExpiredTimer {
1053    Lock(ExpiredLock),
1054    Delay(ExpiredDelay),
1055}
1056
1057#[derive(Debug, Clone, PartialEq, Eq)]
1058pub struct ExpiredLock {
1059    pub execution_id: ExecutionId,
1060    // Version of last `Locked` event, used to detect whether the execution made progress.
1061    pub locked_at_version: Version,
1062    pub next_version: Version,
1063    /// As the execution may still be running, this represents the number of intermittent failures + timeouts prior to this execution.
1064    pub intermittent_event_count: u32,
1065    pub max_retries: u32,
1066    pub retry_exp_backoff: Duration,
1067    pub parent: Option<(ExecutionId, JoinSetId)>, // TODO: Remove
1068}
1069
1070#[derive(Debug, Clone, PartialEq, Eq)]
1071pub struct ExpiredDelay {
1072    pub execution_id: ExecutionId,
1073    pub join_set_id: JoinSetId,
1074    pub delay_id: DelayId,
1075}
1076
1077#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, Deserialize)]
1078#[serde(tag = "type")]
1079pub enum PendingState {
1080    #[display("Locked(`{lock_expires_at}`, {executor_id}, {run_id})")]
1081    Locked {
1082        executor_id: ExecutorId,
1083        run_id: RunId,
1084        lock_expires_at: DateTime<Utc>,
1085    },
1086    #[display("PendingAt(`{scheduled_at}`)")]
1087    PendingAt { scheduled_at: DateTime<Utc> }, // e.g. created with a schedule, temporary timeout/failure
1088    #[display("BlockedByJoinSet({join_set_id},`{lock_expires_at}`)")]
1089    /// Caused by [`HistoryEvent::JoinNext`]
1090    BlockedByJoinSet {
1091        join_set_id: JoinSetId,
1092        /// See [`HistoryEvent::JoinNext::lock_expires_at`].
1093        lock_expires_at: DateTime<Utc>,
1094        /// Blocked by closing of the join set
1095        closing: bool,
1096    },
1097    #[display("Finished({finished})")]
1098    Finished { finished: PendingStateFinished },
1099}
1100
1101#[derive(Debug, Clone, Copy, derive_more::Display, PartialEq, Eq, Serialize, Deserialize)]
1102#[display("{result_kind}")]
1103pub struct PendingStateFinished {
1104    pub version: VersionType, // not Version since it must be Copy
1105    pub finished_at: DateTime<Utc>,
1106    pub result_kind: PendingStateFinishedResultKind,
1107}
1108
1109#[derive(
1110    Debug, Clone, Copy, PartialEq, Eq, serde_with::SerializeDisplay, serde_with::DeserializeFromStr,
1111)]
1112pub struct PendingStateFinishedResultKind(pub Result<(), PendingStateFinishedError>);
1113const OK_VARIANT: &str = "ok";
1114impl FromStr for PendingStateFinishedResultKind {
1115    type Err = strum::ParseError;
1116
1117    fn from_str(s: &str) -> Result<Self, Self::Err> {
1118        if s == OK_VARIANT {
1119            Ok(PendingStateFinishedResultKind(Ok(())))
1120        } else {
1121            let err = PendingStateFinishedError::from_str(s)?;
1122            Ok(PendingStateFinishedResultKind(Err(err)))
1123        }
1124    }
1125}
1126
1127impl Display for PendingStateFinishedResultKind {
1128    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1129        match self.0 {
1130            Ok(()) => write!(f, "{OK_VARIANT}"),
1131            Err(err) => write!(f, "{err}"),
1132        }
1133    }
1134}
1135
1136impl From<&SupportedFunctionReturnValue> for PendingStateFinishedResultKind {
1137    fn from(result: &SupportedFunctionReturnValue) -> Self {
1138        result.as_pending_state_finished_result()
1139    }
1140}
1141
1142#[derive(Debug, Clone, Copy, PartialEq, Eq, strum::EnumString, strum::Display)]
1143#[cfg_attr(test, derive(strum::VariantArray))]
1144#[strum(serialize_all = "snake_case")]
1145pub enum PendingStateFinishedError {
1146    Timeout,
1147    ExecutionFailure,
1148    FallibleError,
1149}
1150
1151impl PendingState {
1152    pub fn can_append_lock(
1153        &self,
1154        created_at: DateTime<Utc>,
1155        executor_id: ExecutorId,
1156        run_id: RunId,
1157        lock_expires_at: DateTime<Utc>,
1158    ) -> Result<LockKind, DbErrorWritePermanent> {
1159        if lock_expires_at <= created_at {
1160            return Err(DbErrorWritePermanent::ValidationFailed(
1161                "invalid expiry date".into(),
1162            ));
1163        }
1164        match self {
1165            PendingState::PendingAt { scheduled_at } => {
1166                if *scheduled_at <= created_at {
1167                    // pending now, ok to lock
1168                    Ok(LockKind::CreatingNewLock)
1169                } else {
1170                    Err(DbErrorWritePermanent::ValidationFailed(
1171                        "cannot lock, not yet pending".into(),
1172                    ))
1173                }
1174            }
1175            PendingState::Locked {
1176                executor_id: current_pending_state_executor_id,
1177                run_id: current_pending_state_run_id,
1178                ..
1179            } => {
1180                if executor_id == *current_pending_state_executor_id
1181                    && run_id == *current_pending_state_run_id
1182                {
1183                    // Original executor is extending the lock.
1184                    Ok(LockKind::Extending)
1185                } else {
1186                    Err(DbErrorWritePermanent::CannotWrite {
1187                        reason: "cannot lock, already locked".into(),
1188                        expected_version: None,
1189                    })
1190                }
1191            }
1192            PendingState::BlockedByJoinSet { .. } => Err(DbErrorWritePermanent::CannotWrite {
1193                reason: "cannot append Locked event when in BlockedByJoinSet state".into(),
1194                expected_version: None,
1195            }),
1196            PendingState::Finished { .. } => Err(DbErrorWritePermanent::CannotWrite {
1197                reason: "already finished".into(),
1198                expected_version: None,
1199            }),
1200        }
1201    }
1202
1203    #[must_use]
1204    pub fn is_finished(&self) -> bool {
1205        matches!(self, PendingState::Finished { .. })
1206    }
1207}
1208
1209#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1210pub enum LockKind {
1211    Extending,
1212    CreatingNewLock,
1213}
1214
1215pub mod http_client_trace {
1216    use chrono::{DateTime, Utc};
1217    use serde::{Deserialize, Serialize};
1218
1219    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1220    pub struct HttpClientTrace {
1221        pub req: RequestTrace,
1222        pub resp: Option<ResponseTrace>,
1223    }
1224
1225    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1226    pub struct RequestTrace {
1227        pub sent_at: DateTime<Utc>,
1228        pub uri: String,
1229        pub method: String,
1230    }
1231
1232    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1233    pub struct ResponseTrace {
1234        pub finished_at: DateTime<Utc>,
1235        pub status: Result<u16, String>,
1236    }
1237}
1238
1239#[cfg(test)]
1240mod tests {
1241    use super::HistoryEventScheduleAt;
1242    use super::PendingStateFinished;
1243    use super::PendingStateFinishedError;
1244    use super::PendingStateFinishedResultKind;
1245    use crate::SupportedFunctionReturnValue;
1246    use chrono::DateTime;
1247    use chrono::Datelike;
1248    use chrono::Utc;
1249    use insta::assert_snapshot;
1250    use rstest::rstest;
1251    use std::time::Duration;
1252    use strum::VariantArray as _;
1253    use val_json::type_wrapper::TypeWrapper;
1254    use val_json::wast_val::WastVal;
1255    use val_json::wast_val::WastValWithType;
1256
1257    #[rstest(expected => [
1258        PendingStateFinishedResultKind(Result::Ok(())),
1259        PendingStateFinishedResultKind(Result::Err(PendingStateFinishedError::Timeout)),
1260    ])]
1261    #[test]
1262    fn serde_pending_state_finished_result_kind_should_work(
1263        expected: PendingStateFinishedResultKind,
1264    ) {
1265        let ser = serde_json::to_string(&expected).unwrap();
1266        let actual: PendingStateFinishedResultKind = serde_json::from_str(&ser).unwrap();
1267        assert_eq!(expected, actual);
1268    }
1269
1270    #[rstest(result_kind => [
1271        PendingStateFinishedResultKind(Result::Ok(())),
1272        PendingStateFinishedResultKind(Result::Err(PendingStateFinishedError::Timeout)),
1273    ])]
1274    #[test]
1275    fn serde_pending_state_finished_should_work(result_kind: PendingStateFinishedResultKind) {
1276        let expected = PendingStateFinished {
1277            version: 0,
1278            finished_at: Utc::now(),
1279            result_kind,
1280        };
1281
1282        let ser = serde_json::to_string(&expected).unwrap();
1283        let actual: PendingStateFinished = serde_json::from_str(&ser).unwrap();
1284        assert_eq!(expected, actual);
1285    }
1286
1287    #[test]
1288    fn join_set_deser_with_result_ok_option_none_should_work() {
1289        let expected = SupportedFunctionReturnValue::Ok {
1290            ok: Some(WastValWithType {
1291                r#type: TypeWrapper::Result {
1292                    ok: Some(Box::new(TypeWrapper::Option(Box::new(TypeWrapper::String)))),
1293                    err: Some(Box::new(TypeWrapper::String)),
1294                },
1295                value: WastVal::Result(Ok(Some(Box::new(WastVal::Option(None))))),
1296            }),
1297        };
1298        let json = serde_json::to_string(&expected).unwrap();
1299        assert_snapshot!(json);
1300
1301        let actual: SupportedFunctionReturnValue = serde_json::from_str(&json).unwrap();
1302
1303        assert_eq!(expected, actual);
1304    }
1305
1306    #[test]
1307    fn verify_pending_state_finished_result_kind_serde() {
1308        let variants: Vec<_> = PendingStateFinishedError::VARIANTS
1309            .iter()
1310            .map(|var| PendingStateFinishedResultKind(Err(*var)))
1311            .chain(std::iter::once(PendingStateFinishedResultKind(Ok(()))))
1312            .collect();
1313        let ser = serde_json::to_string_pretty(&variants).unwrap();
1314        insta::assert_snapshot!(ser);
1315        let deser: Vec<PendingStateFinishedResultKind> = serde_json::from_str(&ser).unwrap();
1316        assert_eq!(variants, deser);
1317    }
1318
1319    #[test]
1320    fn as_date_time_should_work_with_duration_u32_max_secs() {
1321        let duration = Duration::from_secs(u64::from(u32::MAX));
1322        let schedule_at = HistoryEventScheduleAt::In(duration);
1323        let resolved = schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap();
1324        assert_eq!(2106, resolved.year());
1325    }
1326
1327    const MILLIS_PER_SEC: i64 = 1000;
1328    const TIMEDELTA_MAX_SECS: i64 = i64::MAX / MILLIS_PER_SEC;
1329
1330    #[test]
1331    fn as_date_time_should_fail_on_duration_secs_greater_than_i64_max() {
1332        // Fails on duration -> timedelta conversion, but a smaller duration can fail on datetime + timedelta
1333        let duration = Duration::from_secs(
1334            u64::try_from(TIMEDELTA_MAX_SECS).expect("positive number must not fail") + 1,
1335        );
1336        let schedule_at = HistoryEventScheduleAt::In(duration);
1337        schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap_err();
1338    }
1339}