Skip to main content

obeli_sk_concepts/
storage.rs

1use crate::ComponentId;
2use crate::ComponentRetryConfig;
3use crate::ComponentType;
4use crate::ExecutionFailureKind;
5use crate::ExecutionId;
6use crate::ExecutionMetadata;
7use crate::FinishedExecutionError;
8use crate::FunctionFqn;
9use crate::JoinSetId;
10use crate::Params;
11use crate::StrVariant;
12use crate::SupportedFunctionReturnValue;
13use crate::component_id::ComponentDigest;
14use crate::prefixed_ulid::DelayId;
15use crate::prefixed_ulid::DeploymentId;
16use crate::prefixed_ulid::ExecutionIdDerived;
17use crate::prefixed_ulid::ExecutorId;
18use crate::prefixed_ulid::RunId;
19use assert_matches::assert_matches;
20use async_trait::async_trait;
21use chrono::TimeDelta;
22use chrono::{DateTime, Utc};
23use http_client_trace::HttpClientTrace;
24use serde::Deserialize;
25use serde::Serialize;
26use std::fmt::Debug;
27use std::fmt::Display;
28use std::panic::Location;
29use std::pin::Pin;
30use std::sync::Arc;
31use std::time::Duration;
32use tracing::debug;
33use tracing::instrument;
34use tracing_error::SpanTrace;
35
// Shared between databases. TODO: Extract to db-common

/// DB tag of the "pending at" state (presumably `PendingState::PendingAt`).
pub const STATE_PENDING_AT: &str = "pending_at";
/// DB tag of the "blocked by join set" state (presumably `PendingState::BlockedByJoinSet`).
pub const STATE_BLOCKED_BY_JOIN_SET: &str = "blocked_by_join_set";
/// DB tag of the "locked" state (presumably `PendingState::Locked`).
pub const STATE_LOCKED: &str = "locked";
/// DB tag of the "finished" state.
pub const STATE_FINISHED: &str = "finished";
pub const HISTORY_EVENT_TYPE_JOIN_NEXT: &str = "join_next"; // Serialization tag of `HistoryEvent::JoinNext`
42
/// In-memory view of one execution: its event log, the responses delivered to its join sets,
/// and a snapshot of its current state.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct ExecutionLog {
    pub execution_id: ExecutionId,
    /// Logged events. The first one must be `ExecutionRequest::Created` (the accessor methods
    /// panic otherwise); the last one is `Finished` once the execution completed.
    pub events: Vec<ExecutionEvent>,
    /// Responses delivered to this execution's join sets, each with its cursor.
    pub responses: Vec<ResponseWithCursor>,
    pub next_version: Version, // Is not advanced once in Finished state
    pub pending_state: PendingState, // reflecting the current state
    pub component_digest: ComponentDigest, // reflecting the current state
    pub component_type: ComponentType,
    pub deployment_id: DeploymentId, // reflecting the current state
}
54
55impl ExecutionLog {
56    /// Return some duration after which the execution will be retried.
57    /// Return `None` if no more retries are allowed.
58    #[must_use]
59    pub fn can_be_retried_after(
60        temporary_event_count: u32,
61        max_retries: Option<u32>,
62        retry_exp_backoff: Duration,
63    ) -> Option<Duration> {
64        // If max_retries == None, wrapping is OK after this succeeds - we want to retry forever.
65        if temporary_event_count <= max_retries.unwrap_or(u32::MAX) {
66            // TODO: Add test for number of retries
67            let duration = retry_exp_backoff * 2_u32.saturating_pow(temporary_event_count - 1);
68            Some(duration)
69        } else {
70            None
71        }
72    }
73
74    #[must_use]
75    pub fn compute_retry_duration_when_retrying_forever(
76        temporary_event_count: u32,
77        retry_exp_backoff: Duration,
78    ) -> Duration {
79        Self::can_be_retried_after(temporary_event_count, None, retry_exp_backoff)
80            .expect("`max_retries` set to MAX must never return None")
81    }
82
83    #[must_use]
84    pub fn ffqn(&self) -> &FunctionFqn {
85        assert_matches!(self.events.first(), Some(ExecutionEvent {
86            event: ExecutionRequest::Created { ffqn, .. },
87            ..
88        }) => ffqn)
89    }
90
91    #[must_use]
92    pub fn params(&self) -> &Params {
93        assert_matches!(self.events.first(), Some(ExecutionEvent {
94            event: ExecutionRequest::Created { params, .. },
95            ..
96        }) => params)
97    }
98
99    #[must_use]
100    pub fn parent(&self) -> Option<(ExecutionId, JoinSetId)> {
101        assert_matches!(self.events.first(), Some(ExecutionEvent {
102            event: ExecutionRequest::Created { parent, .. },
103            ..
104        }) => parent.clone())
105    }
106
107    #[must_use]
108    pub fn last_event(&self) -> &ExecutionEvent {
109        self.events.last().expect("must contain at least one event")
110    }
111
112    #[must_use]
113    pub fn is_finished(&self) -> bool {
114        matches!(
115            self.events.last(),
116            Some(ExecutionEvent {
117                event: ExecutionRequest::Finished { .. },
118                ..
119            })
120        )
121    }
122
123    #[must_use]
124    pub fn as_finished_result(&self) -> Option<SupportedFunctionReturnValue> {
125        if let ExecutionEvent {
126            event: ExecutionRequest::Finished { retval: result, .. },
127            ..
128        } = self.events.last().expect("must contain at least one event")
129        {
130            Some(result.clone())
131        } else {
132            None
133        }
134    }
135
136    pub fn event_history(&self) -> impl Iterator<Item = (HistoryEvent, Version)> + '_ {
137        self.events.iter().filter_map(|event| {
138            if let ExecutionRequest::HistoryEvent { event: eh, .. } = &event.event {
139                Some((eh.clone(), event.version.clone()))
140            } else {
141                None
142            }
143        })
144    }
145
146    #[cfg(feature = "test")]
147    #[must_use]
148    pub fn find_join_set_request(&self, join_set_id: &JoinSetId) -> Option<&JoinSetRequest> {
149        self.events
150            .iter()
151            .find_map(move |event| match &event.event {
152                ExecutionRequest::HistoryEvent {
153                    event:
154                        HistoryEvent::JoinSetRequest {
155                            join_set_id: found,
156                            request,
157                        },
158                    ..
159                } if *join_set_id == *found => Some(request),
160                _ => None,
161            })
162    }
163}
164
/// Underlying integer type of [`Version`].
pub type VersionType = u32;
// Position of an event in an execution log. `serde(transparent)` makes it serialize as the
// bare integer; `Display` prints the number.
#[derive(
    Debug,
    Default,
    Clone,
    PartialEq,
    Eq,
    Hash,
    derive_more::Display,
    derive_more::Into,
    serde::Serialize,
    serde::Deserialize,
    schemars::JsonSchema,
)]
#[serde(transparent)]
#[schemars(transparent)]
pub struct Version(pub VersionType);
182impl Version {
183    #[must_use]
184    pub fn new(arg: VersionType) -> Version {
185        Version(arg)
186    }
187
188    #[must_use]
189    pub fn increment(&self) -> Version {
190        Version(self.0 + 1)
191    }
192}
193impl TryFrom<i64> for Version {
194    type Error = VersionParseError;
195    fn try_from(value: i64) -> Result<Self, Self::Error> {
196        VersionType::try_from(value)
197            .map(Version::new)
198            .map_err(|_| VersionParseError)
199    }
200}
201impl From<Version> for usize {
202    fn from(value: Version) -> Self {
203        usize::try_from(value.0).expect("16 bit systems are unsupported")
204    }
205}
206impl From<&Version> for usize {
207    fn from(value: &Version) -> Self {
208        usize::try_from(value.0).expect("16 bit systems are unsupported")
209    }
210}
211
/// Returned by `Version::try_from(i64)` when the value does not fit into `VersionType` (`u32`).
#[derive(Debug, thiserror::Error)]
#[error("version must be u32")]
pub struct VersionParseError;
215
// A single entry of an execution log. `Display` shows only the inner `event`.
#[derive(
    Clone,
    Debug,
    derive_more::Display,
    PartialEq,
    Eq,
    serde::Serialize,
    serde::Deserialize,
    schemars::JsonSchema,
)]
#[display("{event}")]
pub struct ExecutionEvent {
    pub created_at: DateTime<Utc>,
    pub event: ExecutionRequest,
    // Omitted from serialized output when `None`.
    // NOTE(review): the meaning of this id is not visible in this file - confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub backtrace_id: Option<Version>,
    // Version of this event within the log (`ExecutionLog::event_history` pairs history events
    // with it).
    pub version: Version,
}
234
// Position of a response among an execution's join-set responses. Serialize-only (web API).
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    derive_more::Display,
    derive_more::Into,
    Serialize, /* webapi */
    schemars::JsonSchema,
)]
pub struct ResponseCursor(pub u32);
247
// A join-set response together with its cursor.
#[derive(Debug, Clone, PartialEq, Eq, Serialize /* webapi */, schemars::JsonSchema)]
pub struct ResponseWithCursor {
    pub event: JoinSetResponseEventOuter,
    pub cursor: ResponseCursor,
}
253
/// Result of listing execution events.
#[derive(Debug)]
pub struct ListExecutionEventsResponse {
    pub events: Vec<ExecutionEvent>,
    /// NOTE(review): presumably the highest version of the whole execution log, not just of
    /// this page - confirm in the DB implementations.
    pub max_version: Version,
}
259
/// Result of listing join-set responses.
#[derive(Debug)]
pub struct ListResponsesResponse {
    pub responses: Vec<ResponseWithCursor>,
    /// NOTE(review): presumably the highest cursor of the execution, not just of this page -
    /// confirm in the DB implementations.
    pub max_cursor: ResponseCursor,
}
265
// A join-set response event with the time it was created.
#[derive(Debug, Clone, PartialEq, Eq, Serialize /* webapi */, schemars::JsonSchema)]
pub struct JoinSetResponseEventOuter {
    pub created_at: DateTime<Utc>,
    pub event: JoinSetResponseEvent,
}
271
// A response addressed to a specific join set.
#[derive(
    Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema,
)]
pub struct JoinSetResponseEvent {
    pub join_set_id: JoinSetId,
    pub event: JoinSetResponse,
}
279
// Payload of a response delivered to a join set; internally tagged with a snake_case `type`.
#[derive(
    Clone, Debug, PartialEq, Eq, Serialize, Deserialize, derive_more::Display, schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum JoinSetResponse {
    // `Ok(())` is displayed as "finished", `Err(())` as "cancelled".
    #[display("delay {}: {delay_id}", if result.is_ok() { "finished" } else { "cancelled"})]
    DelayFinished {
        delay_id: DelayId,
        result: Result<(), ()>,
    },
    #[display("{result}: {child_execution_id}")] // child execution finished with `result`
    ChildExecutionFinished {
        child_execution_id: ExecutionIdDerived,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Version(2)))]
        finished_version: Version,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
        result: SupportedFunctionReturnValue,
    },
}
300
/// Const placeholder `Created` request built only from empty/zero values.
pub const DUMMY_CREATED: ExecutionRequest = ExecutionRequest::Created {
    ffqn: FunctionFqn::new_static("", ""),
    params: Params::empty(),
    parent: None,
    scheduled_at: DateTime::from_timestamp_nanos(0),
    component_id: ComponentId::dummy_activity(),
    deployment_id: DeploymentId::from_parts(0, 0),
    metadata: ExecutionMetadata::empty(),
    scheduled_by: None,
};
/// Const placeholder history event: creation of a one-off join set with an empty name.
pub const DUMMY_HISTORY_EVENT: ExecutionRequest = ExecutionRequest::HistoryEvent {
    event: HistoryEvent::JoinSetCreate {
        join_set_id: JoinSetId {
            kind: crate::JoinSetKind::OneOff,
            name: StrVariant::empty(),
        },
    },
};
319
// Payload of one execution-log entry. Serialized externally tagged with snake_case variant
// names; `ExecutionRequest::variant` returns the same names for DB-side filtering.
#[derive(
    Clone,
    derive_more::Debug,
    derive_more::Display,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(rename_all = "snake_case")]
pub enum ExecutionRequest {
    // First entry of every execution log (`ExecutionLog::ffqn`/`params`/`parent` assert this).
    // `params` is omitted from `Debug` output.
    #[display("Created({ffqn}, `{scheduled_at}`)")]
    Created {
        ffqn: FunctionFqn,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
        #[debug(skip)]
        params: Params,
        // Parent execution and the join set this child belongs to, if any.
        parent: Option<(ExecutionId, JoinSetId)>,
        scheduled_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
        component_id: ComponentId,
        deployment_id: DeploymentId,
        #[cfg_attr(any(test, feature = "test"), arbitrary(default))]
        metadata: ExecutionMetadata,
        scheduled_by: Option<ExecutionId>,
    },
    Locked(Locked),
    /// Returns execution to [`PendingState::PendingAt`] state at the specified time.
    /// This can happen when:
    /// - executor is running out of resources like [`WorkerError::LimitReached`]
    /// - workflow made progress but then its lock expired.
    /// - executor is being closed (shutdown or hot redeploy requested)
    #[display("Unlocked(`{backoff_expires_at}`)")]
    Unlocked {
        backoff_expires_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
        reason: StrVariant,
    },
    // Created by the executor holding the lock.
    // After expiry interpreted as pending.
    // Counted as a temporary event (`ExecutionRequest::is_temporary_event`).
    #[display("TemporarilyFailed(`{backoff_expires_at}`)")]
    TemporarilyFailed {
        backoff_expires_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
        reason: StrVariant,
        detail: Option<String>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },
    // Created by the executor holding the lock.
    // After expiry interpreted as pending.
    // Counted as a temporary event (`ExecutionRequest::is_temporary_event`).
    #[display("TemporarilyTimedOut(`{backoff_expires_at}`)")]
    TemporarilyTimedOut {
        backoff_expires_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },
    // Created by the executor holding the lock.
    #[display("Finished")]
    Finished {
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
        retval: SupportedFunctionReturnValue,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },

    #[display("HistoryEvent({event})")]
    HistoryEvent {
        event: HistoryEvent,
    },
    #[display("Paused")]
    Paused,
    #[display("Unpaused")]
    Unpaused,
}
397
398impl ExecutionRequest {
399    #[must_use]
400    pub fn is_temporary_event(&self) -> bool {
401        matches!(
402            self,
403            Self::TemporarilyFailed { .. } | Self::TemporarilyTimedOut { .. }
404        )
405    }
406
407    /// String representation of `ExecutionRequest`, used in execution log table to fetch events of certain type, e.g. `created` + `history_event`.
408    #[must_use]
409    pub const fn variant(&self) -> &'static str {
410        match self {
411            ExecutionRequest::Created { .. } => "created",
412            ExecutionRequest::Locked(_) => "locked",
413            ExecutionRequest::Unlocked { .. } => "unlocked",
414            ExecutionRequest::TemporarilyFailed { .. } => "temporarily_failed",
415            ExecutionRequest::TemporarilyTimedOut { .. } => "temporarily_timed_out",
416            ExecutionRequest::Finished { .. } => "finished",
417            ExecutionRequest::HistoryEvent { .. } => "history_event",
418            ExecutionRequest::Paused => "paused",
419            ExecutionRequest::Unpaused => "unpaused",
420        }
421    }
422
423    #[must_use]
424    pub fn join_set_id(&self) -> Option<&JoinSetId> {
425        match self {
426            Self::Created {
427                parent: Some((_parent_id, join_set_id)),
428                ..
429            } => Some(join_set_id),
430            Self::HistoryEvent {
431                event:
432                    HistoryEvent::JoinSetCreate { join_set_id, .. }
433                    | HistoryEvent::JoinSetRequest { join_set_id, .. }
434                    | HistoryEvent::JoinNext { join_set_id, .. },
435            } => Some(join_set_id),
436            _ => None,
437        }
438    }
439}
440
// Payload of `ExecutionRequest::Locked`: which executor/run holds the lock and until when.
#[derive(
    Clone,
    derive_more::Debug,
    derive_more::Display,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[display("Locked(`{lock_expires_at}`, {component_id})")]
pub struct Locked {
    #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
    pub component_id: ComponentId,
    pub executor_id: ExecutorId,
    pub deployment_id: DeploymentId,
    pub run_id: RunId,
    pub lock_expires_at: DateTime<Utc>,
    // Retry configuration captured together with the lock.
    #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentRetryConfig::ZERO))]
    pub retry_config: ComponentRetryConfig,
}
463
// Describes what kind of generated value a `HistoryEvent::Persist` recorded.
// Internally tagged with a snake_case `type` field.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    derive_more::Display,
    Serialize,
    Deserialize,
    schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum PersistKind {
    // Random integer in the inclusive range `[min, max_inclusive]`.
    #[display("RandomU64({min}, {max_inclusive})")]
    RandomU64 {
        min: u64,
        max_inclusive: u64,
    },
    // Random string whose length is in `[min_length, max_length_exclusive)`.
    #[display("RandomString({min_length}, {max_length_exclusive})")]
    RandomString {
        min_length: u64,
        max_length_exclusive: u64,
    },
    ExecutionId,
}
490
/// Encodes `value` as 8 big-endian bytes (most significant byte first).
#[must_use]
pub fn from_u64_to_bytes(value: u64) -> [u8; 8] {
    u64::to_be_bytes(value)
}
495
#[derive(
    derive_more::Debug,
    Clone,
    PartialEq,
    Eq,
    derive_more::Display,
    Serialize,
    Deserialize,
    schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type", rename_all = "snake_case")]
/// Must be created by the executor in [`PendingState::Locked`].
pub enum HistoryEvent {
    /// Persist a generated pseudorandom value.
    #[display("Persist")]
    Persist {
        #[debug(skip)]
        value: Vec<u8>, // Only stored for nondeterminism checks. TODO: Consider using a hashed value or just the intention.
        kind: PersistKind,
    },
    #[display("JoinSetCreate({join_set_id})")]
    JoinSetCreate { join_set_id: JoinSetId },
    #[display("JoinSetRequest({request})")]
    // join_set_id is part of ExecutionId or DelayId in the `request`
    JoinSetRequest {
        join_set_id: JoinSetId,
        request: JoinSetRequest,
    },
    /// Sets the pending state to [`PendingState::BlockedByJoinSet`].
    /// When the response arrives at `resp_time`:
    /// The execution is [`PendingState::PendingAt`]`(max(resp_time, lock_expires_at)`, so that the
    /// original executor can continue. After the expiry any executor can continue without
    /// marking the execution as timed out.
    #[display("JoinNext({join_set_id})")]
    JoinNext {
        join_set_id: JoinSetId,
        /// Set to a future time if the worker is keeping the execution invocation warm waiting for the result.
        /// The pending status will be kept in Locked state until `run_expires_at`.
        run_expires_at: DateTime<Utc>,
        /// Set to a specific function when calling `-await-next` extension function, used for
        /// determinism checks.
        requested_ffqn: Option<FunctionFqn>,
        /// Closing request must never set `requested_ffqn` and is ignored by determinism checks.
        closing: bool,
    },
    /// Attempt to process next response without changing the pending state.
    #[display("JoinNextTry({join_set_id}, {outcome})")]
    JoinNextTry {
        join_set_id: JoinSetId,
        outcome: JoinNextTryOutcome,
    },
    /// Records the fact that a join set was awaited more times than its submission count.
    #[display("JoinNextTooMany({join_set_id})")]
    JoinNextTooMany {
        join_set_id: JoinSetId,
        /// Set to a specific function when calling `-await-next` extension function, used for
        /// determinism checks.
        requested_ffqn: Option<FunctionFqn>,
    },
    // Records a scheduling request and its outcome (`ScheduleRequestError` is persisted for
    // determinism).
    #[display("Schedule({execution_id}, {schedule_at})")]
    Schedule {
        execution_id: ExecutionId,
        schedule_at: HistoryEventScheduleAt, // Stores intention to schedule an execution at a date/offset
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Ok(())))]
        result: Result<(), ScheduleRequestError>,
    },
    // Records a `-stub` call: the target, the `StubRetVal::hash` of the stubbed value, and the
    // outcome.
    #[display("Stub({target_execution_id})")]
    Stub {
        target_execution_id: ExecutionIdDerived,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StubRetVal::Typed(crate::SUPPORTED_RETURN_VALUE_OK_EMPTY).hash()))]
        retval_hash: StubRetValHash,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Ok(())))]
        result: Result<(), StubError>,
    },
}
572
/// Stub return value - only used during processing, not stored in history.
///
/// Only its [`StubRetVal::hash`] is persisted (as [`StubRetValHash`]).
#[derive(derive_more::Debug, Clone, PartialEq, Eq)]
#[cfg_attr(any(test, feature = "test"), derive(Serialize, Deserialize))]
#[cfg_attr(any(test, feature = "test"), serde(rename_all = "snake_case"))]
pub enum StubRetVal {
    /// A typed return value; hashed via its JSON serialization.
    Typed(SupportedFunctionReturnValue),
    /// An untyped textual value; hashed as its raw UTF-8 bytes.
    Untyped(String),
}
581
582impl StubRetVal {
583    /// Compute a stable hash of the return value for determinism checks.
584    #[must_use]
585    pub fn hash(&self) -> StubRetValHash {
586        use sha2::{Digest as _, Sha256};
587        const STUB_RETVAL_HASH_VERSION: u8 = 1;
588        let mut hasher = Sha256::default();
589
590        match self {
591            StubRetVal::Typed(val) => {
592                hasher.update(b"T|");
593                // Serialize to JSON for stable hashing
594                let json = serde_json::to_string(val)
595                    .expect("SupportedFunctionReturnValue is always serializable");
596                hasher.update(json.as_bytes());
597            }
598            StubRetVal::Untyped(s) => {
599                hasher.update(b"U|");
600                hasher.update(s.as_bytes());
601            }
602        }
603
604        let hash_bytes = hasher.finalize();
605        let mut result = [0u8; 33];
606        result[0] = STUB_RETVAL_HASH_VERSION;
607        result[1..].copy_from_slice(&hash_bytes);
608
609        StubRetValHash(result)
610    }
611}
612
/// Hash of a stub return value, stored in history for determinism checks.
/// Format: 1 byte version + 32 bytes SHA-256 hash.
// Serialized as its 66-character hex string (via `Display` / `FromStr`).
#[derive(
    Clone,
    PartialEq,
    Eq,
    serde_with::SerializeDisplay,
    serde_with::DeserializeFromStr,
    schemars::JsonSchema,
)]
#[schemars(with = "String")]
pub struct StubRetValHash([u8; 33]);
625
626impl Display for StubRetValHash {
627    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
628        for b in self.0 {
629            write!(f, "{b:02x}")?;
630        }
631        Ok(())
632    }
633}
634
635impl Debug for StubRetValHash {
636    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
637        Display::fmt(self, f)
638    }
639}
640
641impl std::str::FromStr for StubRetValHash {
642    type Err = StubRetValHashParseError;
643
644    fn from_str(s: &str) -> Result<Self, Self::Err> {
645        if s.len() != 66 {
646            // 33 bytes * 2 hex chars = 66
647            return Err(StubRetValHashParseError::InvalidLength(s.len()));
648        }
649        let mut bytes = [0u8; 33];
650        for i in 0..33 {
651            let chunk = &s[i * 2..i * 2 + 2];
652            bytes[i] =
653                u8::from_str_radix(chunk, 16).map_err(|_| StubRetValHashParseError::InvalidHex)?;
654        }
655        Ok(StubRetValHash(bytes))
656    }
657}
658
/// Error returned when parsing a [`StubRetValHash`] from its hex string form.
#[derive(Debug, thiserror::Error)]
pub enum StubRetValHashParseError {
    /// The input length (in bytes) was not 66.
    #[error("invalid length: expected 66 hex chars, got {0}")]
    InvalidLength(usize),
    /// The input contained a character that is not a hex digit.
    #[error("invalid hex character")]
    InvalidHex,
}
666
/// Error from the `-stub` extension function.
/// Mirrors `obelisk:types/execution.{stub-error}` from WIT.
// Persisted in `HistoryEvent::Stub::result`.
#[derive(
    Debug, Clone, thiserror::Error, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema,
)]
#[serde(rename_all = "snake_case")]
pub enum StubError {
    #[error("execution not found")]
    ExecutionNotFound,
    #[error("type check error: {0}")]
    TypeCheckError(String),
    #[error("conflict")]
    Conflict,
}
681
/// Error from the `schedule-json` function. Persisted in history for determinism.
// Stored in `HistoryEvent::Schedule::result`.
#[derive(
    Debug, Clone, thiserror::Error, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema,
)]
#[serde(rename_all = "snake_case")]
pub enum ScheduleRequestError {
    #[error("function not found")]
    FunctionNotFound,
    #[error("params parsing error: {0}")]
    TypeCheckError(String),
}
693
/// Error from the `submit-json` function. Persisted in history for determinism.
// Stored in `JoinSetRequest::ChildExecutionRequest::result`.
#[derive(
    Debug, Clone, thiserror::Error, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema,
)]
#[serde(rename_all = "snake_case")]
pub enum ChildExecutionRequestError {
    #[error("function not found")]
    FunctionNotFound,
    #[error("params parsing error: {0}")]
    TypeCheckError(String),
}
705
// Result of a `HistoryEvent::JoinNextTry` attempt; displayed and serialized in snake_case.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    derive_more::Display,
    Serialize,
    Deserialize,
    schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(rename_all = "snake_case")]
pub enum JoinNextTryOutcome {
    /// A response was found and processed.
    #[display("found")]
    Found,
    /// No response available, but there are still pending requests.
    #[display("pending")]
    Pending,
    /// No response available, and all requests have been processed.
    #[display("all_processed")]
    AllProcessed,
}
730
731impl From<bool> for JoinNextTryOutcome {
732    /// Migration helper: converts old `found_response: bool` to the new enum.
733    /// `false` maps to `Pending` as a conservative default (the exact error
734    /// was not stored before).
735    fn from(found_response: bool) -> Self {
736        if found_response {
737            JoinNextTryOutcome::Found
738        } else {
739            JoinNextTryOutcome::Pending
740        }
741    }
742}
743
// When a scheduled execution should run: immediately, at an absolute time, or after a delay
// relative to "now" (resolved by `HistoryEventScheduleAt::as_date_time`).
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    derive_more::Display,
    Serialize,
    Deserialize,
    schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(rename_all = "snake_case")]
pub enum HistoryEventScheduleAt {
    Now,
    #[display("At(`{_0}`)")]
    At(DateTime<Utc>),
    #[display("In({_0:?})")]
    In(Duration),
}
764
/// Error converting a [`HistoryEventScheduleAt`] into an absolute timestamp.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum ScheduleAtConversionError {
    /// The `In` duration does not fit into a `TimeDelta`, or adding it overflows the date.
    #[error("source duration value is out of range")]
    OutOfRangeError,
}
770
771impl HistoryEventScheduleAt {
772    pub fn as_date_time(
773        &self,
774        now: DateTime<Utc>,
775    ) -> Result<DateTime<Utc>, ScheduleAtConversionError> {
776        match self {
777            Self::Now => Ok(now),
778            Self::At(date_time) => Ok(*date_time),
779            Self::In(duration) => {
780                let time_delta = TimeDelta::from_std(*duration)
781                    .map_err(|_| ScheduleAtConversionError::OutOfRangeError)?;
782                now.checked_add_signed(time_delta)
783                    .ok_or(ScheduleAtConversionError::OutOfRangeError)
784            }
785        }
786    }
787}
788
// A request submitted into a join set; internally tagged with a snake_case `type` field.
#[derive(
    Clone, Debug, PartialEq, Eq, derive_more::Display, Serialize, Deserialize, schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum JoinSetRequest {
    // Must be created by the executor in `PendingState::Locked`.
    #[display("DelayRequest({delay_id}, expires_at: `{expires_at}`, schedule_at: `{schedule_at}`)")]
    DelayRequest {
        delay_id: DelayId,
        expires_at: DateTime<Utc>,
        schedule_at: HistoryEventScheduleAt,
    },
    // Must be created by the executor in `PendingState::Locked`.
    // `result` records whether the submission succeeded (persisted for determinism).
    #[display("ChildExecutionRequest({child_execution_id}, {target_ffqn}, params: {params})")]
    ChildExecutionRequest {
        child_execution_id: ExecutionIdDerived,
        target_ffqn: FunctionFqn,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
        params: Params,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Ok(())))]
        result: Result<(), ChildExecutionRequestError>,
    },
}
813
/// Error that is not specific to an execution.
#[derive(Debug, Clone, thiserror::Error, derive_more::PartialEq, derive_more::Eq)]
pub enum DbErrorGeneric {
    /// Any other database failure. `context` and `source` are ignored by equality.
    #[error("database error: {reason}")]
    Uncategorized {
        reason: StrVariant,
        /// Tracing span trace captured with the error.
        #[eq(skip)]
        #[partial_eq(skip)]
        context: SpanTrace,
        /// Optional underlying error, exposed via `Error::source`.
        #[eq(skip)]
        #[partial_eq(skip)]
        #[source]
        source: Option<Arc<dyn std::error::Error + Send + Sync>>,
        /// Source location; presumably where the error was constructed - confirm at call sites.
        loc: &'static Location<'static>,
    },
    #[error("database was closed")]
    Close,
}
832
/// Write errors that retrying the same write will not fix.
#[derive(thiserror::Error, Clone, Debug, derive_more::PartialEq, derive_more::Eq)]
pub enum DbErrorWriteNonRetriable {
    #[error("validation failed: {0}")]
    ValidationFailed(StrVariant),
    #[error("conflict")]
    Conflict,
    #[error("already finished")]
    AlreadyFinished,
    /// Execution is in a state that does not allow the write. `context` and `source` are
    /// ignored by equality.
    #[error("illegal state: {reason}")]
    IllegalState {
        reason: StrVariant,
        #[eq(skip)]
        #[partial_eq(skip)]
        context: SpanTrace,
        #[eq(skip)]
        #[partial_eq(skip)]
        #[source]
        source: Option<Arc<dyn std::error::Error + Send + Sync>>,
        loc: &'static Location<'static>,
    },
    /// The caller's version did not match the stored one.
    #[error("version conflict: expected: {expected}, got: {requested}")]
    VersionConflict {
        expected: Version,
        requested: Version,
    },
}
859
/// Write error tied to an execution
#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
pub enum DbErrorWrite {
    /// The target row (presumably the execution) does not exist.
    #[error("cannot write - row not found")]
    NotFound,
    #[error("non-retriable error: {0}")]
    NonRetriable(#[from] DbErrorWriteNonRetriable),
    #[error(transparent)]
    Generic(#[from] DbErrorGeneric),
}
870
/// Read error tied to an execution
#[derive(Debug, Clone, thiserror::Error, PartialEq)]
pub enum DbErrorRead {
    /// The requested row does not exist.
    #[error("cannot read - row not found")]
    NotFound,
    #[error(transparent)]
    Generic(#[from] DbErrorGeneric),
}
879
/// Read error for operations that wait with a timeout.
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum DbErrorReadWithTimeout {
    /// The wait ended without a result; `TimeoutOutcome` carries the details.
    #[error("timeout")]
    Timeout(TimeoutOutcome),
    #[error(transparent)]
    DbErrorRead(#[from] DbErrorRead),
}
887
/// Next version of the execution log after a successful append.
// TODO: Convert to struct with next_version
pub type AppendResponse = Version;
/// A pending execution: id, version, params, and an optional timestamp.
/// NOTE(review): the meaning of the trailing `Option<DateTime<Utc>>` is not visible here - confirm.
pub type PendingExecution = (ExecutionId, Version, Params, Option<DateTime<Utc>>);
892
/// Snapshot of an execution that was just locked, presumably handed to the executor that
/// holds the lock.
#[derive(Debug, Clone)]
pub struct LockedExecution {
    pub execution_id: ExecutionId,
    /// Version to use for the next append to this execution's log.
    pub next_version: Version,
    pub metadata: ExecutionMetadata,
    /// The `Locked` event recorded for this lock.
    pub locked_event: Locked,
    pub ffqn: FunctionFqn,
    pub params: Params,
    /// History events paired with their versions (same shape as `ExecutionLog::event_history`).
    pub event_history: Vec<(HistoryEvent, Version)>,
    pub responses: Vec<ResponseWithCursor>,
    pub parent: Option<(ExecutionId, JoinSetId)>,
    /// NOTE(review): presumably the number of temporary failures/timeouts so far, as consumed by
    /// `ExecutionLog::can_be_retried_after` - confirm.
    pub intermittent_event_count: u32,
}
906
/// Executions locked in one call (presumably `DbExecutor::lock_pending_by_ffqns`).
pub type LockPendingResponse = Vec<LockedExecution>;
/// Next version of the execution log after a batch append.
pub type AppendBatchResponse = Version;
909
/// An event to append to an execution log together with its creation time.
/// `Display` shows only the inner `event`.
#[derive(Debug, Clone, derive_more::Display, Serialize, Deserialize)]
#[display("{event}")]
pub struct AppendRequest {
    pub created_at: DateTime<Utc>,
    pub event: ExecutionRequest,
}
916
/// Request to create a new execution.
///
/// Converts into [`ExecutionRequest::Created`]; `created_at` and `execution_id` are not part
/// of that event and are dropped by the conversion.
#[derive(Debug, Clone)]
pub struct CreateRequest {
    pub created_at: DateTime<Utc>,
    pub execution_id: ExecutionId,
    pub ffqn: FunctionFqn,
    pub params: Params,
    /// Parent execution and its join set when this is a child execution.
    pub parent: Option<(ExecutionId, JoinSetId)>,
    pub scheduled_at: DateTime<Utc>,
    pub component_id: ComponentId,
    pub deployment_id: DeploymentId,
    pub metadata: ExecutionMetadata,
    pub scheduled_by: Option<ExecutionId>,
}
930
931impl From<CreateRequest> for ExecutionRequest {
932    fn from(value: CreateRequest) -> Self {
933        Self::Created {
934            ffqn: value.ffqn,
935            params: value.params,
936            parent: value.parent,
937            scheduled_at: value.scheduled_at,
938            component_id: value.component_id,
939            deployment_id: value.deployment_id,
940            metadata: value.metadata,
941            scheduled_by: value.scheduled_by,
942        }
943    }
944}
945
/// Source of database connections, each handed out as a different trait object.
#[async_trait]
pub trait DbPool: Send + Sync {
    /// Obtain a connection exposing only the [`DbExecutor`] operations.
    async fn db_exec_conn(&self) -> Result<Box<dyn DbExecutor>, DbErrorGeneric>;

    /// Obtain a connection exposing the [`DbConnection`] operations.
    async fn connection(&self) -> Result<Box<dyn DbConnection>, DbErrorGeneric>;

    /// Obtain a connection exposing the [`DbExternalApi`] operations.
    async fn external_api_conn(&self) -> Result<Box<dyn DbExternalApi>, DbErrorGeneric>;

    /// Obtain a connection exposing the test-only [`DbConnectionTest`] operations.
    #[cfg(feature = "test")]
    async fn connection_test(&self) -> Result<Box<dyn DbConnectionTest>, DbErrorGeneric>;
}
957
/// A pool that can be closed.
#[async_trait]
pub trait DbPoolCloseable {
    /// Close the pool. NOTE(review): semantics for outstanding connections are
    /// implementation-defined — not visible here.
    async fn close(&self);
}
962
/// A batch of events to append to one execution's log, used by
/// [`DbExecutor::append_batch_respond_to_parent`].
#[derive(Clone, Debug)]
pub struct AppendEventsToExecution {
    pub execution_id: ExecutionId,
    /// Version at which the batch is appended.
    pub version: Version,
    pub batch: Vec<AppendRequest>,
}
969
/// Result of a finished child execution, delivered to the parent's join set by
/// [`DbExecutor::append_batch_respond_to_parent`].
#[derive(Clone, Debug)]
pub struct AppendResponseToExecution {
    pub parent_execution_id: ExecutionId,
    pub created_at: DateTime<Utc>,
    pub join_set_id: JoinSetId,
    pub child_execution_id: ExecutionIdDerived,
    /// Version of the child's `Finished` event.
    pub finished_version: Version,
    pub result: SupportedFunctionReturnValue,
}
979
980#[async_trait]
981pub trait DbExecutor: Send + Sync {
982    #[expect(clippy::too_many_arguments)]
983    async fn lock_pending_by_ffqns(
984        &self,
985        batch_size: u32,
986        pending_at_or_sooner: DateTime<Utc>,
987        ffqns: Arc<[FunctionFqn]>,
988        created_at: DateTime<Utc>,
989        component_id: ComponentId,
990        deployment_id: DeploymentId,
991        executor_id: ExecutorId,
992        lock_expires_at: DateTime<Utc>,
993        run_id: RunId,
994        retry_config: ComponentRetryConfig,
995    ) -> Result<LockPendingResponse, DbErrorWrite>;
996
997    #[expect(clippy::too_many_arguments)]
998    async fn lock_pending_by_component_digest(
999        &self,
1000        batch_size: u32,
1001        pending_at_or_sooner: DateTime<Utc>,
1002        component_id: &ComponentId,
1003        deployment_id: DeploymentId,
1004        created_at: DateTime<Utc>,
1005        executor_id: ExecutorId,
1006        lock_expires_at: DateTime<Utc>,
1007        run_id: RunId,
1008        retry_config: ComponentRetryConfig,
1009    ) -> Result<LockPendingResponse, DbErrorWrite>;
1010
1011    #[cfg(feature = "test")]
1012    #[expect(clippy::too_many_arguments)]
1013    async fn lock_one(
1014        &self,
1015        created_at: DateTime<Utc>,
1016        component_id: ComponentId,
1017        deployment_id: DeploymentId,
1018        execution_id: &ExecutionId,
1019        run_id: RunId,
1020        version: Version,
1021        executor_id: ExecutorId,
1022        lock_expires_at: DateTime<Utc>,
1023        retry_config: ComponentRetryConfig,
1024    ) -> Result<LockedExecution, DbErrorWrite>;
1025
1026    /// Append a single event to an existing execution log.
1027    /// The request cannot contain [`ExecutionRequest::Created`].
1028    async fn append(
1029        &self,
1030        execution_id: ExecutionId,
1031        version: Version,
1032        req: AppendRequest,
1033    ) -> Result<AppendResponse, DbErrorWrite>;
1034
1035    /// Append a batch of events to an existing execution log, and append a response to a parent execution.
1036    /// The batch cannot contain [`ExecutionRequest::Created`].
1037    async fn append_batch_respond_to_parent(
1038        &self,
1039        events: AppendEventsToExecution,
1040        response: AppendResponseToExecution,
1041        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
1042    ) -> Result<AppendBatchResponse, DbErrorWrite>;
1043
1044    /// Notification mechainism with no strict guarantees for waiting while there are no pending executions.
1045    /// Return immediately if there are pending notifications at `pending_at_or_sooner`.
1046    /// Otherwise wait until `timeout_fut` resolves.
1047    /// Delay requests that expire between `pending_at_or_sooner` and timeout can be disregarded.
1048    async fn wait_for_pending_by_ffqn(
1049        &self,
1050        pending_at_or_sooner: DateTime<Utc>,
1051        ffqns: Arc<[FunctionFqn]>,
1052        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
1053    );
1054
1055    /// Notification mechainism with no strict guarantees for waiting while there are no pending executions.
1056    /// Return immediately if there are pending notifications at `pending_at_or_sooner`.
1057    /// Otherwise wait until `timeout_fut` resolves.
1058    /// Delay requests that expire between `pending_at_or_sooner` and timeout can be disregarded.
1059    async fn wait_for_pending_by_component_digest(
1060        &self,
1061        pending_at_or_sooner: DateTime<Utc>,
1062        component_digest: &ComponentDigest,
1063        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
1064    );
1065
1066    async fn cancel_activity_with_retries(
1067        &self,
1068        execution_id: &ExecutionId,
1069        cancelled_at: DateTime<Utc>,
1070    ) -> Result<CancelOutcome, DbErrorWrite> {
1071        let mut retries = 5;
1072        loop {
1073            let res = self.cancel_activity(execution_id, cancelled_at).await;
1074            if res.is_ok() || retries == 0 {
1075                return res;
1076            }
1077            retries -= 1;
1078        }
1079    }
1080
1081    /// Get last event. Impls may set `ExecutionEvent::backtrace_id` to `None`.
1082    async fn get_last_execution_event(
1083        &self,
1084        execution_id: &ExecutionId,
1085    ) -> Result<ExecutionEvent, DbErrorRead>;
1086
1087    async fn cancel_activity(
1088        &self,
1089        execution_id: &ExecutionId,
1090        cancelled_at: DateTime<Utc>,
1091    ) -> Result<CancelOutcome, DbErrorWrite> {
1092        debug!("Determining cancellation state of {execution_id}");
1093
1094        let last_event = self
1095            .get_last_execution_event(execution_id)
1096            .await
1097            .map_err(DbErrorWrite::from)?;
1098        if let ExecutionRequest::Finished {
1099            retval:
1100                SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError {
1101                    kind: ExecutionFailureKind::Cancelled,
1102                    ..
1103                }),
1104            ..
1105        } = last_event.event
1106        {
1107            return Ok(CancelOutcome::Cancelled);
1108        } else if matches!(last_event.event, ExecutionRequest::Finished { .. }) {
1109            debug!("Not cancelling, {execution_id} is already finished");
1110            return Ok(CancelOutcome::AlreadyFinished);
1111        }
1112        let finished_version = last_event.version.increment();
1113        let child_result = SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError {
1114            reason: None,
1115            kind: ExecutionFailureKind::Cancelled,
1116            detail: None,
1117        });
1118        let cancel_request = AppendRequest {
1119            created_at: cancelled_at,
1120            event: ExecutionRequest::Finished {
1121                retval: child_result.clone(),
1122                http_client_traces: None,
1123            },
1124        };
1125        debug!("Cancelling activity {execution_id} at {finished_version}");
1126        if let ExecutionId::Derived(execution_id) = execution_id {
1127            let (parent_execution_id, join_set_id) = execution_id.split_to_parts();
1128            let child_execution_id = ExecutionId::Derived(execution_id.clone());
1129            self.append_batch_respond_to_parent(
1130                AppendEventsToExecution {
1131                    execution_id: child_execution_id,
1132                    version: finished_version.clone(),
1133                    batch: vec![cancel_request],
1134                },
1135                AppendResponseToExecution {
1136                    parent_execution_id,
1137                    created_at: cancelled_at,
1138                    join_set_id: join_set_id.clone(),
1139                    child_execution_id: execution_id.clone(),
1140                    finished_version,
1141                    result: child_result,
1142                },
1143                cancelled_at,
1144            )
1145            .await?;
1146        } else {
1147            self.append(execution_id.clone(), finished_version, cancel_request)
1148                .await?;
1149        }
1150        debug!("Cancelled {execution_id}");
1151        Ok(CancelOutcome::Cancelled)
1152    }
1153}
1154
/// Outcome of [`DbConnection::append_delay_response`].
///
/// [`cancel_delay`] maps `Success` and `AlreadyCancelled` to `CancelOutcome::Cancelled`
/// and `AlreadyFinished` to `CancelOutcome::AlreadyFinished`.
// Previously had no derives at all; the other outcome enums (`CancelOutcome`,
// `TimeoutOutcome`) are `Debug + Clone + Copy + PartialEq + Eq`, and a public enum
// should at least be `Debug` so it can be logged and asserted on.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AppendDelayResponseOutcome {
    /// The delay response was appended.
    Success,
    /// The delay had already finished before this response.
    AlreadyFinished,
    /// The delay had already been cancelled before this response.
    AlreadyCancelled,
}
1160
/// Filter for [`DbExternalApi::list_executions`]. `Default` applies no filtering on
/// the `Option` fields and leaves both flags `false`.
#[derive(Debug, Clone, Default)]
pub struct ListExecutionsFilter {
    /// Only executions whose function name starts with this prefix (presumably).
    pub ffqn_prefix: Option<String>,
    /// Whether derived (child) executions are included.
    pub show_derived: bool,
    /// Whether finished executions are excluded.
    pub hide_finished: bool,
    /// Only executions whose id starts with this prefix (presumably).
    pub execution_id_prefix: Option<String>,
    pub component_digest: Option<ComponentDigest>,
    pub deployment_id: Option<DeploymentId>,
}
1170
/// Database operations used by external-facing APIs (per the trait name), on top of
/// everything in [`DbConnection`].
#[async_trait]
pub trait DbExternalApi: DbConnection {
    /// Get a backtrace of the execution selected by `filter`: the first, the last, or
    /// presumably the one whose version range covers the given version.
    async fn get_backtrace(
        &self,
        execution_id: &ExecutionId,
        filter: BacktraceFilter,
    ) -> Result<BacktraceInfo, DbErrorRead>;

    /// Store a source file associated with a component digest.
    /// `frame_key` is either an exact frame symbol path or a suffix (with leading `/`)
    /// when `is_suffix` is true. Idempotent — repeated calls for the same key are ignored.
    async fn upsert_source_file(
        &self,
        component_digest: &ComponentDigest,
        frame_key: &str,
        is_suffix: bool,
        content: &str,
    ) -> Result<(), DbErrorWrite>;

    /// Look up a source file by component digest and a frame symbol path.
    /// Matches either exact keys or suffix keys (where the frame path ends with the stored key).
    /// Returns `None` if not found or if multiple suffix entries match (ambiguous).
    async fn get_source_file(
        &self,
        component_digest: &ComponentDigest,
        file: &str,
    ) -> Result<Option<String>, DbErrorRead>;

    /// Returns executions sorted in descending order.
    async fn list_executions(
        &self,
        filter: ListExecutionsFilter,
        pagination: ExecutionListPagination,
    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric>;

    /// Returns execution events for the given execution.
    ///
    /// Results are always ordered from oldest to newest (ascending by version),
    /// regardless of pagination direction.
    async fn list_execution_events(
        &self,
        execution_id: &ExecutionId,
        pagination: Pagination<VersionType>,
        include_backtrace_id: bool,
    ) -> Result<ListExecutionEventsResponse, DbErrorRead>;

    /// Returns responses of an execution ordered as they arrived,
    /// enabling matching each `JoinNext` to its corresponding response.
    ///
    /// Results are always ordered from oldest to newest (ascending by cursor),
    /// regardless of pagination direction.
    ///
    /// As an optimization, the implementation can return an empty list of `responses`
    /// and `max_cursor` set to 0 if the execution is not found.
    async fn list_responses(
        &self,
        execution_id: &ExecutionId,
        pagination: Pagination<u32>,
    ) -> Result<ListResponsesResponse, DbErrorRead>;

    /// Fetch the execution's state together with a slice of its events and a page of
    /// its responses in one call. Presumably the events start at `req_since` and are
    /// limited to `req_max_length`, while the responses are selected by `resp_pagination`.
    async fn list_execution_events_responses(
        &self,
        execution_id: &ExecutionId,
        req_since: &Version,
        req_max_length: VersionType,
        req_include_backtrace_id: bool,
        resp_pagination: Pagination<VersionType>,
    ) -> Result<ExecutionWithStateRequestsResponses, DbErrorRead>;

    /// Switch the execution from component digest `old` to `new`.
    /// NOTE(review): presumably fails when the current digest is not `old` — confirm.
    async fn upgrade_execution_component(
        &self,
        execution_id: &ExecutionId,
        old: &ComponentDigest,
        new: &ComponentDigest,
    ) -> Result<(), DbErrorWrite>;

    /// List log and stream entries of the execution, selected by `filter`.
    /// `show_derived` presumably also includes entries of derived (child) executions.
    async fn list_logs(
        &self,
        execution_id: &ExecutionId,
        show_derived: bool,
        filter: LogFilter,
        pagination: Pagination<DateTime<Utc>>,
    ) -> Result<ListLogsResponse, DbErrorRead>;

    /// List per-deployment execution counts. `config_json` of each [`DeploymentState`]
    /// is only populated when `include_config_json` is true.
    async fn list_deployment_states(
        &self,
        current_time: DateTime<Utc>,
        pagination: Pagination<Option<DeploymentId>>,
        include_config_json: bool,
    ) -> Result<Vec<DeploymentState>, DbErrorRead>;

    /// Insert a new deployment. The record must have `status == Inactive` and
    /// `last_active_at == None`; activation is a separate step via [`Self::activate_deployment`].
    async fn insert_deployment(&self, record: DeploymentRecord) -> Result<(), DbErrorWrite>;

    /// Make the deployment Active. `now` is presumably recorded as `last_active_at`.
    async fn activate_deployment(
        &self,
        deployment_id: DeploymentId,
        now: DateTime<Utc>,
    ) -> Result<(), DbErrorWrite>;

    /// Mark a deployment as Enqueued (pending next server restart).
    /// Returns `Err(DbErrorWriteNonRetriable::Conflict)` if the deployment is currently Active.
    /// Any previously Enqueued deployment is demoted to Inactive.
    async fn enqueue_deployment(&self, deployment_id: DeploymentId) -> Result<(), DbErrorWrite>;

    /// Returned [`DeploymentRecord`] must contain `config_json`.
    async fn get_deployment(
        &self,
        deployment_id: DeploymentId,
    ) -> Result<Option<DeploymentRecord>, DbErrorRead>;

    /// Return active deployment.
    /// Returned [`DeploymentRecord`] must contain `config_json`.
    #[cfg(feature = "test")]
    async fn get_active_deployment(&self) -> Result<Option<DeploymentRecord>, DbErrorRead>;

    /// Return the most relevant current deployment: Enqueued if present, otherwise Active.
    /// Returned [`DeploymentRecord`] must contain `config_json`.
    async fn get_current_deployment(&self) -> Result<Option<DeploymentRecord>, DbErrorRead>;

    /// Page through deployment records.
    async fn list_deployments(
        &self,
        pagination: Pagination<Option<DeploymentId>>,
    ) -> Result<Vec<DeploymentRecord>, DbErrorRead>;

    /// Pause an execution. Only pending executions can be paused.
    async fn pause_execution(
        &self,
        execution_id: &ExecutionId,
        paused_at: DateTime<Utc>,
    ) -> Result<AppendResponse, DbErrorWrite>;

    /// Unpause an execution. Only paused executions can be unpaused.
    async fn unpause_execution(
        &self,
        execution_id: &ExecutionId,
        unpaused_at: DateTime<Utc>,
    ) -> Result<AppendResponse, DbErrorWrite>;
}
/// Default page length for listing deployment states.
pub const LIST_DEPLOYMENT_STATES_DEFAULT_LENGTH: u16 = 20;
/// Default pagination for [`DbExternalApi::list_deployment_states`]: an `OlderThan`
/// page of [`LIST_DEPLOYMENT_STATES_DEFAULT_LENGTH`] items with no cursor
/// (presumably meaning "start from the newest").
pub const LIST_DEPLOYMENT_STATES_DEFAULT_PAGINATION: Pagination<Option<DeploymentId>> =
    Pagination::OlderThan {
        length: LIST_DEPLOYMENT_STATES_DEFAULT_LENGTH,
        cursor: None,
        including_cursor: false,
    };
1319
1320pub struct DeploymentState {
1321    pub deployment_id: DeploymentId,
1322    pub locked: u32,
1323    // In `PendingAt` state, scheduled to present or past
1324    pub pending: u32,
1325    // In `PendingAt` state, scheduled into the future
1326    pub scheduled: u32,
1327    pub blocked: u32,
1328    pub finished: u32,
1329    /// None if not requested from db.
1330    pub config_json: Option<String>,
1331    pub created_at: DateTime<Utc>,
1332    /// Set when the deployment becomes Active; None if it has never been active.
1333    pub last_active_at: Option<DateTime<Utc>>,
1334    pub status: DeploymentStatus,
1335}
1336
/// Lifecycle status of a deployment. Textual form via `as_str` / `FromStr`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeploymentStatus {
    /// Neither active nor queued.
    Inactive,
    /// Queued to become Active on the next server restart.
    Enqueued,
    /// The currently active deployment.
    Active,
}
1344
1345impl DeploymentStatus {
1346    #[must_use]
1347    pub fn as_str(&self) -> &'static str {
1348        match self {
1349            DeploymentStatus::Inactive => "inactive",
1350            DeploymentStatus::Enqueued => "enqueued",
1351            DeploymentStatus::Active => "active",
1352        }
1353    }
1354}
1355
1356impl std::str::FromStr for DeploymentStatus {
1357    type Err = StrVariant;
1358    fn from_str(s: &str) -> Result<Self, Self::Err> {
1359        match s {
1360            "inactive" => Ok(DeploymentStatus::Inactive),
1361            "enqueued" => Ok(DeploymentStatus::Enqueued),
1362            "active" => Ok(DeploymentStatus::Active),
1363            _ => Err(StrVariant::from(format!("unknown deployment status: {s}"))),
1364        }
1365    }
1366}
1367
/// A stored deployment, as inserted by [`DbExternalApi::insert_deployment`] and
/// returned by the deployment getters.
#[derive(Debug, Clone)]
pub struct DeploymentRecord {
    pub deployment_id: DeploymentId,
    pub created_at: DateTime<Utc>,
    /// Set when the deployment becomes Active; None if it has never been active.
    pub last_active_at: Option<DateTime<Utc>>,
    pub status: DeploymentStatus,
    pub config_json: String,
    // Presumably the Obelisk version that created the record.
    pub obelisk_version: String,
    pub created_by: Option<String>,
}
1379
/// A page of log entries returned by [`DbExternalApi::list_logs`].
#[derive(Debug)]
pub struct ListLogsResponse {
    pub items: Vec<LogEntryRow>,
    /// Always present: newer logs can always arrive, e.g. via replay.
    pub next_page: Pagination<DateTime<Utc>>, // Newer logs can always arrive e.g. via replay
    /// `None` if we are already at the beginning.
    pub prev_page: Option<Pagination<DateTime<Utc>>>, // None if we are already at the beginning
}
1386
1387#[derive(Debug)]
1388pub struct LogFilter {
1389    show_logs: bool,
1390    show_streams: bool,
1391    levels: Vec<LogLevel>, // Only applied if `show_logs` = true, empty means return all levels.
1392    stream_types: Vec<LogStreamType>, // Only applied if `show_streams` = true, empty means return all stream types.
1393}
1394impl LogFilter {
1395    // Constructor for logs only
1396    #[must_use]
1397    pub fn show_logs(levels: Vec<LogLevel>) -> LogFilter {
1398        LogFilter {
1399            show_logs: true,
1400            show_streams: false,
1401            levels,
1402            stream_types: Vec::new(),
1403        }
1404    }
1405    // Constructor for streams only
1406    #[must_use]
1407    pub fn show_streams(stream_types: Vec<LogStreamType>) -> LogFilter {
1408        LogFilter {
1409            show_logs: false,
1410            show_streams: true,
1411            levels: Vec::new(),
1412            stream_types,
1413        }
1414    }
1415    // Constructor for both logs and streams
1416    #[must_use]
1417    pub fn show_combined(levels: Vec<LogLevel>, stream_types: Vec<LogStreamType>) -> LogFilter {
1418        LogFilter {
1419            show_logs: true,
1420            show_streams: true,
1421            levels,
1422            stream_types,
1423        }
1424    }
1425    // Getters
1426    #[must_use]
1427    pub fn should_show_logs(&self) -> bool {
1428        self.show_logs
1429    }
1430    #[must_use]
1431    pub fn should_show_streams(&self) -> bool {
1432        self.show_streams
1433    }
1434    #[must_use]
1435    pub fn levels(&self) -> &Vec<LogLevel> {
1436        &self.levels
1437    }
1438    #[must_use]
1439    pub fn stream_types(&self) -> &Vec<LogStreamType> {
1440        &self.stream_types
1441    }
1442}
1443
/// Result of [`DbExternalApi::list_execution_events_responses`]: the execution's state
/// plus a slice of its events and responses.
#[derive(Debug, Clone)]
pub struct ExecutionWithStateRequestsResponses {
    pub execution_with_state: ExecutionWithState,
    pub events: Vec<ExecutionEvent>,
    pub responses: Vec<ResponseWithCursor>,
    /// Highest version of the execution log (presumably, independent of the returned slice).
    pub max_version: Version,
    /// Highest response cursor (presumably, independent of the returned page).
    pub max_cursor: ResponseCursor,
}
1452
1453#[async_trait]
1454pub trait DbConnection: DbExecutor {
1455    /// Get execution log.
1456    async fn get(&self, execution_id: &ExecutionId) -> Result<ExecutionLog, DbErrorRead>;
1457
1458    async fn append_delay_response(
1459        &self,
1460        created_at: DateTime<Utc>,
1461        execution_id: ExecutionId,
1462        join_set_id: JoinSetId,
1463        delay_id: DelayId,
1464        outcome: Result<(), ()>, // Successfully finished - `Ok(())` or cancelled - `Err(())`
1465    ) -> Result<AppendDelayResponseOutcome, DbErrorWrite>;
1466
1467    /// Append a batch of events to an existing execution log, and append a response to a parent execution.
1468    /// The batch cannot contain [`ExecutionRequest::Created`].
1469    async fn append_batch(
1470        &self,
1471        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
1472        batch: Vec<AppendRequest>,
1473        execution_id: ExecutionId,
1474        version: Version,
1475    ) -> Result<AppendBatchResponse, DbErrorWrite>;
1476
1477    /// Append one or more events to the parent execution log, and create zero or more child execution logs.
1478    /// The batch cannot contain [`ExecutionRequest::Created`].
1479    async fn append_batch_create_new_execution(
1480        &self,
1481        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
1482        batch: Vec<AppendRequest>,   // must not contain [`ExecutionRequest::Created`] events
1483        execution_id: ExecutionId,
1484        version: Version,
1485        child_req: Vec<CreateRequest>,
1486        backtraces: Vec<BacktraceInfo>,
1487    ) -> Result<AppendBatchResponse, DbErrorWrite>;
1488
1489    /// Get a single event specified by version. Impls may set `ExecutionEvent::backtrace_id` to `None`.
1490    async fn get_execution_event(
1491        &self,
1492        execution_id: &ExecutionId,
1493        version: &Version,
1494    ) -> Result<ExecutionEvent, DbErrorRead>;
1495
1496    #[instrument(skip(self))]
1497    async fn get_create_request(
1498        &self,
1499        execution_id: &ExecutionId,
1500    ) -> Result<CreateRequest, DbErrorRead> {
1501        let execution_event = self
1502            .get_execution_event(execution_id, &Version::new(0))
1503            .await?;
1504        if let ExecutionRequest::Created {
1505            ffqn,
1506            params,
1507            parent,
1508            scheduled_at,
1509            component_id,
1510            deployment_id,
1511            metadata,
1512            scheduled_by,
1513        } = execution_event.event
1514        {
1515            Ok(CreateRequest {
1516                created_at: execution_event.created_at,
1517                execution_id: execution_id.clone(),
1518                ffqn,
1519                params,
1520                parent,
1521                scheduled_at,
1522                component_id,
1523                deployment_id,
1524                metadata,
1525                scheduled_by,
1526            })
1527        } else {
1528            Err(DbErrorRead::Generic(DbErrorGeneric::Uncategorized {
1529                reason: "execution log must start with creation".into(),
1530                context: SpanTrace::capture(),
1531                source: None,
1532                loc: Location::caller(),
1533            }))
1534        }
1535    }
1536
1537    async fn get_pending_state(
1538        &self,
1539        execution_id: &ExecutionId,
1540    ) -> Result<ExecutionWithState, DbErrorRead>;
1541
1542    /// Get currently expired locks and async timers (delay requests)
1543    async fn get_expired_timers(
1544        &self,
1545        at: DateTime<Utc>,
1546    ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric>;
1547
1548    /// Create a new execution log
1549    async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite>;
1550
1551    /// Notification mechainism with no strict guarantees for getting notified when a new response arrives.
1552    /// Parameter `start_idx` must be at most be equal to current size of responses in the execution log.
1553    /// If no response arrives immediately and `interrupt_after` resolves, `DbErrorReadWithTimeout::Timeout` is returned.
1554    /// Implementations with no pubsub support should use polling.
1555    /// Callers are expected to call this function in a loop with a reasonable timeout
1556    /// to support less stellar implementations.
1557    async fn subscribe_to_next_responses(
1558        &self,
1559        execution_id: &ExecutionId,
1560        last_response: ResponseCursor,
1561        timeout_fut: Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>,
1562    ) -> Result<Vec<ResponseWithCursor>, DbErrorReadWithTimeout>;
1563
1564    /// First, attempt to fetch the finished value. If the execution is not finished yet, poll
1565    /// periodically or subscribe to db changes, racing with `timeout_fut`.
1566    /// Notification mechainism with no strict guarantees for getting the finished result.
1567    /// Implementations with no pubsub support should use polling.
1568    /// Callers are expected to call this function in a loop with a reasonable timeout
1569    /// to support less stellar implementations.
1570    async fn wait_for_finished_result(
1571        &self,
1572        execution_id: &ExecutionId,
1573        timeout_fut: Option<Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>>,
1574    ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout>;
1575
1576    async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite>;
1577
1578    async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite>;
1579
1580    async fn append_log(&self, row: LogInfoAppendRow) -> Result<(), DbErrorWrite>;
1581
1582    async fn append_log_batch(&self, batch: &[LogInfoAppendRow]) -> Result<(), DbErrorWrite>;
1583
1584    /// Returns `TimeoutOutcome::Timeout` if not in Finished state.
1585    #[cfg(feature = "test")]
1586    async fn get_finished_result(
1587        &self,
1588        execution_id: &ExecutionId,
1589    ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout> {
1590        self.wait_for_finished_result(
1591            execution_id,
1592            Some(Box::pin(std::future::ready(TimeoutOutcome::Timeout))),
1593        )
1594        .await
1595    }
1596}
1597
/// A log or stream entry to store via [`DbConnection::append_log`] /
/// [`DbConnection::append_log_batch`].
#[derive(Clone, Debug)]
pub struct LogInfoAppendRow {
    pub execution_id: ExecutionId,
    pub run_id: RunId,
    pub log_entry: LogEntry,
}
1604
/// A stored log or stream entry, as listed in [`ListLogsResponse::items`].
#[derive(Debug, Clone)]
pub struct LogEntryRow {
    /// Position of the row for `Pagination<DateTime<Utc>>`-based paging.
    pub cursor: DateTime<Utc>,
    pub run_id: RunId,
    pub log_entry: LogEntry,
    pub execution_id: ExecutionId,
}
1612
/// A single entry produced by an execution: a leveled log message or a chunk of
/// stdout/stderr output.
#[derive(Debug, Clone)]
pub enum LogEntry {
    /// A log message at a given level.
    Log {
        created_at: DateTime<Utc>,
        level: LogLevel,
        message: String,
    },
    /// Raw bytes written to an output stream.
    Stream {
        created_at: DateTime<Utc>,
        payload: Vec<u8>,
        stream_type: LogStreamType,
    },
}
1626impl LogEntry {
1627    #[must_use]
1628    pub fn created_at(&self) -> DateTime<Utc> {
1629        match self {
1630            LogEntry::Log { created_at, .. } | LogEntry::Stream { created_at, .. } => *created_at,
1631        }
1632    }
1633}
1634
/// Severity of a [`LogEntry::Log`] entry.
///
/// Discriminants are `1..=5` in declaration order (`Trace = 1` … `Error = 5`); the
/// `TryFrom` derive converts from that `u8` representation, so do not reorder variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::TryFrom, strum::EnumIter)]
#[try_from(repr)]
#[repr(u8)]
pub enum LogLevel {
    Trace = 1,
    Debug,
    Info,
    Warn,
    Error,
}
/// Output stream of a [`LogEntry::Stream`] entry.
///
/// `StdOut = 1`, `StdErr = 2`; `TryFrom<u8>` relies on these values, so do not reorder.
#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::TryFrom, strum::EnumIter)]
#[try_from(repr)]
#[repr(u8)]
pub enum LogStreamType {
    StdOut = 1,
    StdErr,
}
1652
/// Output of the timeout futures passed to waiting operations such as
/// [`DbConnection::wait_for_finished_result`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimeoutOutcome {
    /// The wait timed out.
    Timeout,
    /// The wait was cancelled (presumably by the caller).
    Cancel,
}
1658
/// Test-only extensions of [`DbConnection`].
#[cfg(feature = "test")]
#[async_trait]
pub trait DbConnectionTest: DbConnection {
    /// Append an arbitrary join set response event to `execution_id`.
    async fn append_response(
        &self,
        created_at: DateTime<Utc>,
        execution_id: ExecutionId,
        response_event: JoinSetResponseEvent,
    ) -> Result<(), DbErrorWrite>;
}
1669
/// Result of a cancellation request ([`DbExecutor::cancel_activity`], [`cancel_delay`]).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CancelOutcome {
    /// The target is cancelled, either by this request or by an earlier one.
    Cancelled,
    /// The target had already finished, so nothing was cancelled.
    AlreadyFinished,
}
1675
1676#[instrument(skip(db_connection))]
1677pub async fn stub_execution(
1678    db_connection: &dyn DbConnection,
1679    execution_id: ExecutionIdDerived,
1680    parent_execution_id: ExecutionId,
1681    join_set_id: JoinSetId,
1682    created_at: DateTime<Utc>,
1683    return_value: SupportedFunctionReturnValue,
1684) -> Result<(), DbErrorWrite> {
1685    let stub_finished_version = Version::new(1); // Stub activities have no execution log except Created event.
1686    // Attempt to write to `execution_id` and its parent, ignoring the possible conflict error on this tx
1687    let write_attempt = {
1688        let finished_req = AppendRequest {
1689            created_at,
1690            event: ExecutionRequest::Finished {
1691                retval: return_value.clone(),
1692                http_client_traces: None,
1693            },
1694        };
1695        db_connection
1696            .append_batch_respond_to_parent(
1697                AppendEventsToExecution {
1698                    execution_id: ExecutionId::Derived(execution_id.clone()),
1699                    version: stub_finished_version.clone(),
1700                    batch: vec![finished_req],
1701                },
1702                AppendResponseToExecution {
1703                    parent_execution_id,
1704                    created_at,
1705                    join_set_id,
1706                    child_execution_id: execution_id.clone(),
1707                    finished_version: stub_finished_version.clone(),
1708                    result: return_value.clone(),
1709                },
1710                created_at,
1711            )
1712            .await
1713    };
1714    if let Err(write_attempt) = write_attempt {
1715        // Check that the expected value is in the database
1716        debug!("Stub write attempt failed - {write_attempt:?}");
1717
1718        let found = db_connection
1719            .get_execution_event(&ExecutionId::Derived(execution_id), &stub_finished_version)
1720            .await?; // Not found at this point should not happen, unless the previous write failed. Will be retried.
1721        match found.event {
1722            ExecutionRequest::Finished {
1723                retval: found_result,
1724                ..
1725            } if return_value == found_result => {
1726                // Same value has already be written, RPC is successful.
1727                Ok(())
1728            }
1729            ExecutionRequest::Finished { .. } => Err(DbErrorWrite::NonRetriable(
1730                DbErrorWriteNonRetriable::Conflict,
1731            )),
1732            _other => Err(DbErrorWrite::NonRetriable(
1733                DbErrorWriteNonRetriable::IllegalState {
1734                    reason: "unexpected execution event at stubbed execution".into(),
1735                    context: SpanTrace::capture(),
1736                    source: None,
1737                    loc: Location::caller(),
1738                },
1739            )),
1740        }
1741    } else {
1742        Ok(())
1743    }
1744}
1745
1746pub async fn cancel_delay(
1747    db_connection: &dyn DbConnection,
1748    delay_id: DelayId,
1749    created_at: DateTime<Utc>,
1750) -> Result<CancelOutcome, DbErrorWrite> {
1751    let (parent_execution_id, join_set_id) = delay_id.split_to_parts();
1752    db_connection
1753        .append_delay_response(
1754            created_at,
1755            parent_execution_id,
1756            join_set_id,
1757            delay_id,
1758            Err(()), // Mark as cancelled.
1759        )
1760        .await
1761        .map(|ok| match ok {
1762            AppendDelayResponseOutcome::Success | AppendDelayResponseOutcome::AlreadyCancelled => {
1763                CancelOutcome::Cancelled
1764            }
1765            AppendDelayResponseOutcome::AlreadyFinished => CancelOutcome::AlreadyFinished,
1766        })
1767}
1768
/// Selects which stored backtrace to look up.
///
/// NOTE(review): presumably `First`/`Last` refer to the earliest/latest recorded
/// backtrace of an execution, and `Specific` to the one covering the given
/// version; confirm against the DB implementations.
#[derive(Clone, Debug)]
pub enum BacktraceFilter {
    First,
    Last,
    Specific(Version),
}
1775
/// A WASM backtrace captured for an execution, together with the component that
/// produced it and the half-open version range `[version_min_including,
/// version_max_excluding)` it applies to.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BacktraceInfo {
    pub execution_id: ExecutionId,
    pub component_id: ComponentId,
    pub version_min_including: Version,
    pub version_max_excluding: Version,
    pub wasm_backtrace: WasmBacktrace,
}
1784
// Serializable copy of a `wasmtime::WasmBacktrace`; built by
// `WasmBacktrace::maybe_from`, which never produces an empty `frames` list.
// (Plain `//` comments: `///` on `JsonSchema` types would alter the generated schema.)
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, schemars::JsonSchema)]
pub struct WasmBacktrace {
    pub frames: Vec<FrameInfo>,
}

// One stack frame: module name (or "<unknown>"), the demangled function name
// (or its index when unnamed), and any debug-info symbols wasmtime attached.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, schemars::JsonSchema)]
pub struct FrameInfo {
    pub module: String,
    pub func_name: String,
    pub symbols: Vec<FrameSymbol>,
}

// Optional source location of a frame; every field is `None` when wasmtime
// could not provide it.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, schemars::JsonSchema)]
pub struct FrameSymbol {
    pub func_name: Option<String>,
    pub file: Option<String>,
    pub line: Option<u32>,
    pub col: Option<u32>,
}
1804
1805mod wasm_backtrace {
1806    use super::{FrameInfo, FrameSymbol, WasmBacktrace};
1807
1808    impl WasmBacktrace {
1809        pub fn maybe_from(backtrace: &wasmtime::WasmBacktrace) -> Option<Self> {
1810            if backtrace.frames().is_empty() {
1811                None
1812            } else {
1813                Some(Self {
1814                    frames: backtrace.frames().iter().map(FrameInfo::from).collect(),
1815                })
1816            }
1817        }
1818    }
1819
1820    impl From<&wasmtime::FrameInfo> for FrameInfo {
1821        fn from(frame: &wasmtime::FrameInfo) -> Self {
1822            let module_name = frame.module().name().unwrap_or("<unknown>").to_string();
1823            let mut func_name = String::new();
1824            wasmtime_environ::demangle_function_name_or_index(
1825                &mut func_name,
1826                frame.func_name(),
1827                frame.func_index() as usize,
1828            )
1829            .expect("writing to string must succeed");
1830            Self {
1831                module: module_name,
1832                func_name,
1833                symbols: frame
1834                    .symbols()
1835                    .iter()
1836                    .map(std::convert::Into::into)
1837                    .collect(),
1838            }
1839        }
1840    }
1841
1842    impl From<&wasmtime::FrameSymbol> for FrameSymbol {
1843        fn from(symbol: &wasmtime::FrameSymbol) -> Self {
1844            let func_name = symbol.name().map(|name| {
1845                let mut writer = String::new();
1846                wasmtime_environ::demangle_function_name(&mut writer, name)
1847                    .expect("writing to string must succeed");
1848                writer
1849            });
1850
1851            Self {
1852                func_name,
1853                file: symbol.file().map(ToString::to_string),
1854                line: symbol.line(),
1855                col: symbol.column(),
1856            }
1857        }
1858    }
1859}
/// Summary row of one execution: its identity, target function, current pending
/// state, timestamps and the component/deployment it belongs to.
///
/// Displayed as `"{execution_id} {pending_state} {component_digest}"`.
#[derive(Debug, Clone, derive_more::Display)]
#[display("{execution_id} {pending_state} {component_digest}")]
pub struct ExecutionWithState {
    pub execution_id: ExecutionId,
    /// Fully qualified name of the function being executed.
    pub ffqn: FunctionFqn,
    pub pending_state: PendingState,
    pub created_at: DateTime<Utc>,
    // NOTE(review): presumably the originally requested schedule time, before retries/delays.
    pub first_scheduled_at: DateTime<Utc>,
    pub component_digest: ComponentDigest,
    pub component_type: ComponentType,
    pub deployment_id: DeploymentId,
}
1872
1873#[derive(Debug, Clone)]
1874pub enum ExecutionListPagination {
1875    CreatedBy(Pagination<Option<DateTime<Utc>>>),
1876    ExecutionId(Pagination<Option<ExecutionId>>),
1877}
1878impl Default for ExecutionListPagination {
1879    fn default() -> ExecutionListPagination {
1880        ExecutionListPagination::CreatedBy(Pagination::OlderThan {
1881            length: 20,
1882            cursor: None,
1883            including_cursor: false, // does not matter when `cursor` is not specified
1884        })
1885    }
1886}
1887impl ExecutionListPagination {
1888    #[must_use]
1889    pub fn length(&self) -> u16 {
1890        match self {
1891            ExecutionListPagination::CreatedBy(pagination) => pagination.length(),
1892            ExecutionListPagination::ExecutionId(pagination) => pagination.length(),
1893        }
1894    }
1895}
1896
/// Cursor-based page request over values of type `T`.
///
/// `NewerThan` walks in ascending order away from `cursor`, `OlderThan` in
/// descending order. `length` is the requested page size; `including_cursor`
/// decides whether a row equal to `cursor` itself belongs to the page.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Pagination<T> {
    NewerThan {
        length: u16,
        cursor: T,
        including_cursor: bool,
    },
    OlderThan {
        length: u16,
        cursor: T,
        including_cursor: bool,
    },
}

// Accessors never copy the cursor, so they are available for any `T`
// (previously all of them required `T: Clone`).
impl<T> Pagination<T> {
    /// Requested page size.
    #[must_use]
    pub fn length(&self) -> u16 {
        match self {
            Pagination::NewerThan { length, .. } | Pagination::OlderThan { length, .. } => *length,
        }
    }

    /// Comparison operator (`>`, `>=`, `<` or `<=`) that selects rows relative to
    /// the cursor. NOTE(review): presumably spliced into SQL `WHERE` clauses by the
    /// storage backends.
    #[must_use]
    pub fn rel(&self) -> &'static str {
        match self {
            Pagination::NewerThan {
                including_cursor: false,
                ..
            } => ">",
            Pagination::NewerThan {
                including_cursor: true,
                ..
            } => ">=",
            Pagination::OlderThan {
                including_cursor: false,
                ..
            } => "<",
            Pagination::OlderThan {
                including_cursor: true,
                ..
            } => "<=",
        }
    }

    /// `true` for `OlderThan`, which walks in descending order.
    #[must_use]
    pub fn is_desc(&self) -> bool {
        matches!(self, Pagination::OlderThan { .. })
    }

    /// `"asc"` for `NewerThan`, `"desc"` for `OlderThan`.
    #[must_use]
    pub fn asc_or_desc(&self) -> &'static str {
        if self.is_asc() { "asc" } else { "desc" }
    }

    /// `true` for `NewerThan`, which walks in ascending order.
    #[must_use]
    pub fn is_asc(&self) -> bool {
        !self.is_desc()
    }

    /// Borrows the cursor.
    #[must_use]
    pub fn cursor(&self) -> &T {
        match self {
            Pagination::NewerThan { cursor, .. } | Pagination::OlderThan { cursor, .. } => cursor,
        }
    }
}

impl<T: Clone> Pagination<T> {
    /// Returns the page request walking in the opposite direction from the same cursor.
    ///
    /// `including_cursor` is flipped so that the original and inverted requests
    /// split the rows around the cursor without overlap, e.g. `> c` becomes `<= c`.
    #[must_use]
    pub fn invert(&self) -> Self {
        match self {
            Pagination::NewerThan {
                length,
                cursor,
                including_cursor,
            } => Pagination::OlderThan {
                length: *length,
                cursor: cursor.clone(),
                including_cursor: !including_cursor,
            },
            Pagination::OlderThan {
                length,
                cursor,
                including_cursor,
            } => Pagination::NewerThan {
                length: *length,
                cursor: cursor.clone(),
                including_cursor: !including_cursor,
            },
        }
    }
}
1980
/// Polls the execution log every 10 ms until `predicate` returns `Some`, and
/// returns that value.
///
/// With `timeout == None` it waits indefinitely.
///
/// # Errors
/// Returns read errors from `DbConnectionTest::get`, or
/// `DbErrorReadWithTimeout::Timeout(TimeoutOutcome::Timeout)` when `timeout`
/// elapses before the predicate matches.
#[cfg(feature = "test")]
pub async fn wait_for_pending_state_fn<T: Debug>(
    db_connection: &dyn DbConnectionTest,
    execution_id: &ExecutionId,
    predicate: impl Fn(ExecutionLog) -> Option<T> + Send,
    timeout: Option<Duration>,
) -> Result<T, DbErrorReadWithTimeout> {
    const POLL_INTERVAL: Duration = Duration::from_millis(10);
    tracing::trace!(%execution_id, "Waiting for predicate");
    let poll_until_match = async move {
        loop {
            let log = db_connection.get(execution_id).await?;
            match predicate(log) {
                Some(t) => {
                    tracing::debug!(%execution_id, "Found: {t:?}");
                    return Ok(t);
                }
                None => tokio::time::sleep(POLL_INTERVAL).await,
            }
        }
    };

    match timeout {
        None => poll_until_match.await,
        Some(limit) => tokio::select! { // future's liveness: Dropping the loser immediately.
            res = poll_until_match => res,
            () = tokio::time::sleep(limit) => Err(DbErrorReadWithTimeout::Timeout(TimeoutOutcome::Timeout))
        },
    }
}
2009
/// A timer whose deadline has passed and that needs handling.
/// NOTE(review): presumably produced by the DB's expired-timers query.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExpiredTimer {
    /// A lock that was not released or extended before it expired.
    Lock(ExpiredLock),
    /// A delay (timer request) that has fired.
    Delay(ExpiredDelay),
}

/// Details of an expired lock, including the retry parameters needed to decide
/// what happens to the execution next.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExpiredLock {
    pub execution_id: ExecutionId,
    /// Version of last `Locked` event, used to detect whether the execution made progress.
    pub locked_at_version: Version,
    // NOTE(review): presumably the execution log's next version at query time.
    pub next_version: Version,
    /// As the execution may still be running, this represents the number of intermittent failures + timeouts prior to this execution.
    pub intermittent_event_count: u32,
    // NOTE(review): retry limit; what `None` means (no retries vs unlimited) is defined elsewhere — confirm.
    pub max_retries: Option<u32>,
    // Base duration for exponential retry backoff (per the field name).
    pub retry_exp_backoff: Duration,
    pub locked_by: LockedBy,
}

/// Details of an expired delay.
/// NOTE(review): `execution_id`/`join_set_id` presumably equal the parts returned
/// by `delay_id.split_to_parts()` (see `cancel_delay`) — confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExpiredDelay {
    pub execution_id: ExecutionId,
    pub join_set_id: JoinSetId,
    pub delay_id: DelayId,
}
2035
2036#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, schemars::JsonSchema)]
2037#[serde(tag = "status", rename_all = "snake_case")]
2038pub enum PendingState {
2039    /// Caused by [`ExecutionRequest::Locked`].
2040    Locked(PendingStateLocked),
2041
2042    #[display("PendingAt(`{_0}`)")]
2043    PendingAt(PendingStatePendingAt),
2044
2045    /// Caused by [`HistoryEvent::JoinNext`]
2046    #[display("BlockedByJoinSet({_0})")]
2047    BlockedByJoinSet(PendingStateBlockedByJoinSet),
2048
2049    #[display("Paused({_0})")]
2050    Paused(PendingStatePaused),
2051
2052    #[display("Finished({_0})")]
2053    Finished(PendingStateFinished),
2054}
2055
2056pub enum PendingStateMergedPause {
2057    Locked {
2058        state: PendingStateLocked,
2059        paused: bool,
2060    },
2061    PendingAt {
2062        state: PendingStatePendingAt,
2063        paused: bool,
2064    },
2065    BlockedByJoinSet {
2066        state: PendingStateBlockedByJoinSet,
2067        paused: bool,
2068    },
2069    Finished(PendingStateFinished),
2070}
2071impl From<PendingState> for PendingStateMergedPause {
2072    fn from(state: PendingState) -> Self {
2073        match state {
2074            PendingState::Locked(s) => PendingStateMergedPause::Locked {
2075                state: s,
2076                paused: false,
2077            },
2078
2079            PendingState::PendingAt(s) => PendingStateMergedPause::PendingAt {
2080                state: s,
2081                paused: false,
2082            },
2083
2084            PendingState::BlockedByJoinSet(s) => PendingStateMergedPause::BlockedByJoinSet {
2085                state: s,
2086                paused: false,
2087            },
2088
2089            PendingState::Paused(paused) => match paused {
2090                PendingStatePaused::Locked(s) => PendingStateMergedPause::Locked {
2091                    state: s,
2092                    paused: true,
2093                },
2094                PendingStatePaused::PendingAt(s) => PendingStateMergedPause::PendingAt {
2095                    state: s,
2096                    paused: true,
2097                },
2098                PendingStatePaused::BlockedByJoinSet(s) => {
2099                    PendingStateMergedPause::BlockedByJoinSet {
2100                        state: s,
2101                        paused: true,
2102                    }
2103                }
2104            },
2105
2106            PendingState::Finished(s) => PendingStateMergedPause::Finished(s),
2107        }
2108    }
2109}
2110
// Execution is held by `locked_by` until `lock_expires_at`; only the same
// executor + run may extend it (see `PendingState::can_append_lock`).
// (Plain `//` comments: `///` on `JsonSchema` types would alter the generated schema.)
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, schemars::JsonSchema)]
#[display("Locked(`{lock_expires_at}`, {}, {})", locked_by.executor_id, locked_by.run_id)]
pub struct PendingStateLocked {
    pub locked_by: LockedBy,
    pub lock_expires_at: DateTime<Utc>,
}

// Execution may be locked once `scheduled_at` is reached.
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, schemars::JsonSchema)]
#[display("`{scheduled_at}`, last_lock={last_lock:?}")]
pub struct PendingStatePendingAt {
    pub scheduled_at: DateTime<Utc>,
    /// `last_lock` is needed for lock extension.
    pub last_lock: Option<LockedBy>,
}

// Execution is waiting on `join_set_id`; no lock may be appended in this state.
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, schemars::JsonSchema)]
#[display("{join_set_id}, `{lock_expires_at}`, closing={closing}")]
pub struct PendingStateBlockedByJoinSet {
    pub join_set_id: JoinSetId,
    /// See [`HistoryEvent::JoinNext::lock_expires_at`].
    pub lock_expires_at: DateTime<Utc>,
    /// Blocked by closing of the join set
    pub closing: bool,
}

/// State of execution before it was paused.
// `Finished` is absent: a finished execution cannot be paused.
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, schemars::JsonSchema)]
pub enum PendingStatePaused {
    #[display("Locked({_0})")]
    Locked(PendingStateLocked),
    #[display("PendingAt({_0})")]
    PendingAt(PendingStatePendingAt),
    #[display("BlockedByJoinSet({_0})")]
    BlockedByJoinSet(PendingStateBlockedByJoinSet),
}
2146
2147#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
2148pub struct LockedBy {
2149    pub executor_id: ExecutorId,
2150    pub run_id: RunId,
2151}
2152impl From<&Locked> for LockedBy {
2153    fn from(value: &Locked) -> Self {
2154        LockedBy {
2155            executor_id: value.executor_id,
2156            run_id: value.run_id,
2157        }
2158    }
2159}
2160
2161#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, schemars::JsonSchema)]
2162#[cfg_attr(any(test, feature = "test"), derive(Deserialize))]
2163pub struct PendingStateFinished {
2164    pub version: VersionType, // not Version since it must be Copy
2165    pub finished_at: DateTime<Utc>,
2166    pub result_kind: PendingStateFinishedResultKind,
2167}
2168impl Display for PendingStateFinished {
2169    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2170        match self.result_kind {
2171            PendingStateFinishedResultKind::Ok => write!(f, "ok"),
2172            PendingStateFinishedResultKind::Err(err) => write!(f, "{err}"),
2173        }
2174    }
2175}
2176
2177// This is not a Result so that it can be customized for serialization
2178#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
2179#[serde(rename_all = "snake_case")]
2180pub enum PendingStateFinishedResultKind {
2181    Ok,
2182    Err(PendingStateFinishedError),
2183}
2184impl PendingStateFinishedResultKind {
2185    pub fn as_result(&self) -> Result<(), &PendingStateFinishedError> {
2186        match self {
2187            PendingStateFinishedResultKind::Ok => Ok(()),
2188            PendingStateFinishedResultKind::Err(err) => Err(err),
2189        }
2190    }
2191}
2192
2193impl From<&SupportedFunctionReturnValue> for PendingStateFinishedResultKind {
2194    fn from(result: &SupportedFunctionReturnValue) -> Self {
2195        result.as_pending_state_finished_result()
2196    }
2197}
2198
// Reason a finished execution did not succeed; carried by
// `PendingStateFinishedResultKind::Err`.
// (Plain `//` comments: `///` on `JsonSchema` types would alter the generated schema.)
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    derive_more::Display,
    schemars::JsonSchema,
)]
#[serde(rename_all = "snake_case")]
pub enum PendingStateFinishedError {
    // The execution failed (e.g. `ExecutionFailureKind::TimedOut`); displays the kind.
    #[display("{_0}")]
    ExecutionFailure(ExecutionFailureKind),
    // NOTE(review): presumably the function completed and returned its own error value.
    #[display("completed with an error")]
    Error,
}
2217
2218impl PendingState {
2219    #[instrument(skip(self))]
2220    pub fn can_append_lock(
2221        &self,
2222        created_at: DateTime<Utc>,
2223        executor_id: ExecutorId,
2224        run_id: RunId,
2225        lock_expires_at: DateTime<Utc>,
2226    ) -> Result<LockKind, DbErrorWriteNonRetriable> {
2227        if lock_expires_at <= created_at {
2228            return Err(DbErrorWriteNonRetriable::ValidationFailed(
2229                "invalid expiry date".into(),
2230            ));
2231        }
2232        match self {
2233            PendingState::PendingAt(PendingStatePendingAt {
2234                scheduled_at,
2235                last_lock,
2236            }) => {
2237                if *scheduled_at <= created_at {
2238                    // pending now, ok to lock
2239                    Ok(LockKind::CreatingNewLock)
2240                } else if let Some(LockedBy {
2241                    executor_id: last_executor_id,
2242                    run_id: last_run_id,
2243                }) = last_lock
2244                    && executor_id == *last_executor_id
2245                    && run_id == *last_run_id
2246                {
2247                    // Original executor is extending the lock.
2248                    Ok(LockKind::Extending)
2249                } else {
2250                    Err(DbErrorWriteNonRetriable::ValidationFailed(
2251                        "cannot lock, not yet pending".into(),
2252                    ))
2253                }
2254            }
2255            PendingState::Locked(PendingStateLocked {
2256                locked_by:
2257                    LockedBy {
2258                        executor_id: current_pending_state_executor_id,
2259                        run_id: current_pending_state_run_id,
2260                    },
2261                lock_expires_at: _,
2262            }) => {
2263                if executor_id == *current_pending_state_executor_id
2264                    && run_id == *current_pending_state_run_id
2265                {
2266                    // Original executor is extending the lock.
2267                    Ok(LockKind::Extending)
2268                } else {
2269                    Err(DbErrorWriteNonRetriable::IllegalState {
2270                        reason: "cannot lock, already locked".into(),
2271                        context: SpanTrace::capture(),
2272                        source: None,
2273                        loc: Location::caller(),
2274                    })
2275                }
2276            }
2277            PendingState::BlockedByJoinSet { .. } => Err(DbErrorWriteNonRetriable::IllegalState {
2278                reason: "cannot append Locked event when in BlockedByJoinSet state".into(),
2279                context: SpanTrace::capture(),
2280                source: None,
2281                loc: Location::caller(),
2282            }),
2283            PendingState::Finished { .. } => Err(DbErrorWriteNonRetriable::IllegalState {
2284                reason: "already finished".into(),
2285                context: SpanTrace::capture(),
2286                source: None,
2287                loc: Location::caller(),
2288            }),
2289            PendingState::Paused(..) => Err(DbErrorWriteNonRetriable::IllegalState {
2290                reason: "cannot lock, execution is paused".into(),
2291                context: SpanTrace::capture(),
2292                source: None,
2293                loc: Location::caller(),
2294            }),
2295        }
2296    }
2297
2298    #[must_use]
2299    pub fn is_finished(&self) -> bool {
2300        matches!(self, PendingState::Finished { .. })
2301    }
2302
2303    #[must_use]
2304    pub fn is_paused(&self) -> bool {
2305        matches!(self, PendingState::Paused(_))
2306    }
2307}
2308
/// Successful result of [`PendingState::can_append_lock`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LockKind {
    /// The current (or last) lock holder is prolonging its own lock.
    Extending,
    /// The execution is pending and a new lock is being taken.
    CreatingNewLock,
}
2314
/// Trace records of outgoing HTTP client requests and their responses.
pub mod http_client_trace {
    use chrono::{DateTime, Utc};
    use serde::{Deserialize, Serialize};

    // One request; `resp` is `None` when no response was recorded.
    // (Plain `//` comments: `///` on `JsonSchema` types would alter the generated schema.)
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
    pub struct HttpClientTrace {
        pub req: RequestTrace,
        pub resp: Option<ResponseTrace>,
    }

    // When and where the request was sent.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
    pub struct RequestTrace {
        pub sent_at: DateTime<Utc>,
        pub uri: String,
        pub method: String,
    }

    // NOTE(review): `status` is presumably `Ok(http_status_code)` or `Err(error message)`
    // when the request failed without a status — confirm with the producer.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
    pub struct ResponseTrace {
        pub finished_at: DateTime<Utc>,
        pub status: Result<u16, String>,
    }
}
2338
/// Root type for DB schema generation - contains all serialized DB types
// Each field pulls one persisted type (and everything it references) into the
// generated schema.
#[derive(schemars::JsonSchema)]
pub struct DbStorageSchema {
    pub execution_event: ExecutionEvent,
    pub pending_state: PendingState,
    pub join_set_response: JoinSetResponse,
    pub wasm_backtrace: WasmBacktrace,
}
2347
// Unit tests for serialization round-trips, schedule-time conversion,
// `JoinNextTry` wire format and stub return-value hashing.
#[cfg(test)]
mod tests {
    use super::HistoryEvent;
    use super::HistoryEventScheduleAt;
    use super::JoinNextTryOutcome;
    use super::PendingStateFinished;
    use super::PendingStateFinishedError;
    use super::PendingStateFinishedResultKind;
    use crate::ExecutionFailureKind;
    use crate::JoinSetId;
    use crate::SupportedFunctionReturnValue;
    use chrono::DateTime;
    use chrono::Datelike;
    use chrono::Utc;
    use insta::assert_snapshot;
    use rstest::rstest;
    use std::time::Duration;
    use val_json::type_wrapper::TypeWrapper;
    use val_json::wast_val::WastVal;
    use val_json::wast_val::WastValWithType;

    // JSON round-trip of each result-kind variant.
    #[rstest(expected => [
        PendingStateFinishedResultKind::Ok,
        PendingStateFinishedResultKind::Err(PendingStateFinishedError::ExecutionFailure(ExecutionFailureKind::TimedOut)),
    ])]
    #[test]
    fn serde_pending_state_finished_result_kind_should_work(
        expected: PendingStateFinishedResultKind,
    ) {
        let ser = serde_json::to_string(&expected).unwrap();
        let actual: PendingStateFinishedResultKind = serde_json::from_str(&ser).unwrap();
        assert_eq!(expected, actual);
    }

    // JSON round-trip of the whole finished state (Deserialize is test-only).
    #[rstest(result_kind => [
        PendingStateFinishedResultKind::Ok,
        PendingStateFinishedResultKind::Err(PendingStateFinishedError::ExecutionFailure(ExecutionFailureKind::TimedOut)),
    ])]
    #[test]
    fn serde_pending_state_finished_should_work(result_kind: PendingStateFinishedResultKind) {
        let expected = PendingStateFinished {
            version: 0,
            finished_at: Utc::now(),
            result_kind,
        };

        let ser = serde_json::to_string(&expected).unwrap();
        let actual: PendingStateFinished = serde_json::from_str(&ser).unwrap();
        assert_eq!(expected, actual);
    }

    // `result<option<string>, string>` holding `ok(none)` must survive a round-trip;
    // the serialized form is pinned by an insta snapshot named after this test.
    #[test]
    fn join_set_deser_with_result_ok_option_none_should_work() {
        let expected = SupportedFunctionReturnValue::Ok(Some(WastValWithType {
            r#type: TypeWrapper::Result {
                ok: Some(Box::new(TypeWrapper::Option(Box::new(TypeWrapper::String)))),
                err: Some(Box::new(TypeWrapper::String)),
            },
            value: WastVal::Result(Ok(Some(Box::new(WastVal::Option(None))))),
        }));
        let json = serde_json::to_string(&expected).unwrap();
        assert_snapshot!(json);

        let actual: SupportedFunctionReturnValue = serde_json::from_str(&json).unwrap();

        assert_eq!(expected, actual);
    }

    // u32::MAX seconds after the Unix epoch lands in 2106.
    #[test]
    fn as_date_time_should_work_with_duration_u32_max_secs() {
        let duration = Duration::from_secs(u64::from(u32::MAX));
        let schedule_at = HistoryEventScheduleAt::In(duration);
        let resolved = schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap();
        assert_eq!(2106, resolved.year());
    }

    // Largest whole number of seconds representable as milliseconds in an i64,
    // mirroring `TimeDelta`'s millisecond-based limit.
    const MILLIS_PER_SEC: i64 = 1000;
    const TIMEDELTA_MAX_SECS: i64 = i64::MAX / MILLIS_PER_SEC;

    // NOTE: despite the name, the threshold exercised is `TIMEDELTA_MAX_SECS + 1`
    // (i64::MAX / 1000 + 1 seconds), not i64::MAX seconds.
    #[test]
    fn as_date_time_should_fail_on_duration_secs_greater_than_i64_max() {
        // Fails on duration -> timedelta conversion, but a smaller duration can fail on datetime + timedelta
        let duration = Duration::from_secs(
            u64::try_from(TIMEDELTA_MAX_SECS).expect("positive number must not fail") + 1,
        );
        let schedule_at = HistoryEventScheduleAt::In(duration);
        schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap_err();
    }

    // Deserializes the `outcome` field format for both outcome variants.
    #[test]
    fn join_next_try_outcome_new_format() {
        let json = r#"{"type":"join_next_try","join_set_id":"n:test","outcome":"found"}"#;
        let event: HistoryEvent = serde_json::from_str(json).unwrap();
        assert_eq!(
            event,
            HistoryEvent::JoinNextTry {
                join_set_id: JoinSetId::new(
                    crate::JoinSetKind::Named,
                    crate::StrVariant::Static("test")
                )
                .unwrap(),
                outcome: JoinNextTryOutcome::Found,
            }
        );

        let json = r#"{"type":"join_next_try","join_set_id":"n:test","outcome":"all_processed"}"#;
        let event: HistoryEvent = serde_json::from_str(json).unwrap();
        assert_eq!(
            event,
            HistoryEvent::JoinNextTry {
                join_set_id: JoinSetId::new(
                    crate::JoinSetKind::Named,
                    crate::StrVariant::Static("test")
                )
                .unwrap(),
                outcome: JoinNextTryOutcome::AllProcessed,
            }
        );
    }

    // Serialization emits `outcome` and never the legacy `found_response` field.
    #[test]
    fn join_next_try_outcome_serializes_new_format() {
        let event = HistoryEvent::JoinNextTry {
            join_set_id: JoinSetId::new(
                crate::JoinSetKind::Named,
                crate::StrVariant::Static("test"),
            )
            .unwrap(),
            outcome: JoinNextTryOutcome::AllProcessed,
        };
        let json = serde_json::to_string(&event).unwrap();
        assert!(
            json.contains(r#""outcome":"all_processed""#),
            "expected outcome field, got: {json}"
        );
        assert!(
            !json.contains("found_response"),
            "should not contain old field, got: {json}"
        );
    }

    // Properties of `StubRetVal::hash`: version prefix, length, determinism,
    // distinctness and (de)serialization round-trips.
    mod stub_retval_hash {
        use super::super::{StubRetVal, StubRetValHash};
        use crate::SupportedFunctionReturnValue;
        use val_json::type_wrapper::TypeWrapper;
        use val_json::wast_val::{WastVal, WastValWithType};

        // Checks format only (prefix + length); no fixed digest is pinned here.
        #[test]
        fn typed_variant_hash_is_stable() {
            let retval =
                StubRetVal::Typed(SupportedFunctionReturnValue::Ok(Some(WastValWithType {
                    r#type: TypeWrapper::String,
                    value: WastVal::String("hello".into()),
                })));
            let hash = retval.hash();
            // Hash should start with version byte 0x01
            assert_eq!(hash.to_string().chars().take(2).collect::<String>(), "01");
            // Hash should be 66 hex characters (33 bytes * 2)
            assert_eq!(hash.to_string().len(), 66);
        }

        // Same format checks for the untyped (raw JSON string) variant.
        #[test]
        fn untyped_variant_hash_is_stable() {
            let retval = StubRetVal::Untyped(r#"{"ok": "hello"}"#.to_string());
            let hash = retval.hash();
            // Hash should start with version byte 0x01
            assert_eq!(hash.to_string().chars().take(2).collect::<String>(), "01");
            // Hash should be 66 hex characters (33 bytes * 2)
            assert_eq!(hash.to_string().len(), 66);
        }

        #[test]
        fn different_values_produce_different_hashes() {
            let typed1 = StubRetVal::Typed(SupportedFunctionReturnValue::Ok(None));
            let typed2 = StubRetVal::Typed(SupportedFunctionReturnValue::Err(None));
            let untyped1 = StubRetVal::Untyped("value1".to_string());
            let untyped2 = StubRetVal::Untyped("value2".to_string());

            let hashes: Vec<_> = [typed1, typed2, untyped1, untyped2]
                .into_iter()
                .map(|r| r.hash().to_string())
                .collect();

            // All hashes should be unique
            for (i, h1) in hashes.iter().enumerate() {
                for h2 in hashes.iter().skip(i + 1) {
                    assert_ne!(h1, h2, "hashes should be different");
                }
            }
        }

        #[test]
        fn same_values_produce_same_hashes() {
            let retval1 = StubRetVal::Typed(SupportedFunctionReturnValue::Ok(None));
            let retval2 = StubRetVal::Typed(SupportedFunctionReturnValue::Ok(None));
            assert_eq!(retval1.hash(), retval2.hash());

            let untyped1 = StubRetVal::Untyped("test".to_string());
            let untyped2 = StubRetVal::Untyped("test".to_string());
            assert_eq!(untyped1.hash(), untyped2.hash());
        }

        #[test]
        fn hash_serialization_roundtrip() {
            let retval = StubRetVal::Typed(SupportedFunctionReturnValue::Ok(None));
            let hash = retval.hash();

            let serialized = serde_json::to_string(&hash).unwrap();
            let deserialized: StubRetValHash = serde_json::from_str(&serialized).unwrap();

            assert_eq!(hash, deserialized);
        }

        #[test]
        fn hash_display_and_fromstr_roundtrip() {
            let retval = StubRetVal::Untyped("test value".to_string());
            let hash = retval.hash();

            let display = hash.to_string();
            let parsed: StubRetValHash = display.parse().unwrap();

            assert_eq!(hash, parsed);
        }

        #[test]
        fn typed_and_untyped_with_same_content_produce_different_hashes() {
            // Even if the JSON content is the same, Typed vs Untyped should hash differently
            let typed = StubRetVal::Typed(SupportedFunctionReturnValue::Ok(None));
            let json_of_typed =
                serde_json::to_string(&SupportedFunctionReturnValue::Ok(None)).unwrap();
            let untyped = StubRetVal::Untyped(json_of_typed);

            assert_ne!(typed.hash(), untyped.hash());
        }
    }
}