Skip to main content

obeli_sk_concepts/
storage.rs

1use crate::ComponentId;
2use crate::ComponentRetryConfig;
3use crate::ComponentType;
4use crate::ExecutionFailureKind;
5use crate::ExecutionId;
6use crate::ExecutionMetadata;
7use crate::FinishedExecutionError;
8use crate::FunctionFqn;
9use crate::JoinSetId;
10use crate::Params;
11use crate::StrVariant;
12use crate::SupportedFunctionReturnValue;
13use crate::component_id::ComponentDigest;
14use crate::prefixed_ulid::DelayId;
15use crate::prefixed_ulid::DeploymentId;
16use crate::prefixed_ulid::ExecutionIdDerived;
17use crate::prefixed_ulid::ExecutorId;
18use crate::prefixed_ulid::RunId;
19use assert_matches::assert_matches;
20use async_trait::async_trait;
21use chrono::TimeDelta;
22use chrono::{DateTime, Utc};
23use http_client_trace::HttpClientTrace;
24use serde::Deserialize;
25use serde::Serialize;
26use std::fmt::Debug;
27use std::fmt::Display;
28use std::panic::Location;
29use std::pin::Pin;
30use std::sync::Arc;
31use std::time::Duration;
32use tracing::debug;
33use tracing::instrument;
34use tracing_error::SpanTrace;
35
// Shared between databases. TODO: Extract to db-common

/// Database-level name of the [`PendingState::PendingAt`] state.
pub const STATE_PENDING_AT: &str = "pending_at";
/// Database-level name of the [`PendingState::BlockedByJoinSet`] state.
pub const STATE_BLOCKED_BY_JOIN_SET: &str = "blocked_by_join_set";
/// Database-level name of the [`PendingState::Locked`] state.
pub const STATE_LOCKED: &str = "locked";
/// Database-level name of the finished state.
pub const STATE_FINISHED: &str = "finished";
/// Serialization tag of `HistoryEvent::JoinNext`; it must stay in sync with the
/// `rename_all = "snake_case"` tagging of [`HistoryEvent`].
pub const HISTORY_EVENT_TYPE_JOIN_NEXT: &str = "join_next";
42
/// In-memory view of a single execution: its event log, the join set responses delivered
/// to it, and state derived from them.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct ExecutionLog {
    pub execution_id: ExecutionId,
    /// Events in append order. `ffqn()`, `params()` and `parent()` assert that the first
    /// one is an [`ExecutionRequest::Created`].
    pub events: Vec<ExecutionEvent>,
    /// Join set responses, each paired with its [`ResponseCursor`].
    pub responses: Vec<ResponseWithCursor>,
    pub next_version: Version, // Is not advanced once in Finished state
    pub pending_state: PendingState, // reflecting the current state
    pub component_digest: ComponentDigest, // reflecting the current state
    pub component_type: ComponentType,
    pub deployment_id: DeploymentId, // reflecting the current state
}
54
55impl ExecutionLog {
56    /// Return some duration after which the execution will be retried.
57    /// Return `None` if no more retries are allowed.
58    #[must_use]
59    pub fn can_be_retried_after(
60        temporary_event_count: u32,
61        max_retries: Option<u32>,
62        retry_exp_backoff: Duration,
63    ) -> Option<Duration> {
64        // If max_retries == None, wrapping is OK after this succeeds - we want to retry forever.
65        if temporary_event_count <= max_retries.unwrap_or(u32::MAX) {
66            // TODO: Add test for number of retries
67            let duration = retry_exp_backoff * 2_u32.saturating_pow(temporary_event_count - 1);
68            Some(duration)
69        } else {
70            None
71        }
72    }
73
74    #[must_use]
75    pub fn compute_retry_duration_when_retrying_forever(
76        temporary_event_count: u32,
77        retry_exp_backoff: Duration,
78    ) -> Duration {
79        Self::can_be_retried_after(temporary_event_count, None, retry_exp_backoff)
80            .expect("`max_retries` set to MAX must never return None")
81    }
82
83    #[must_use]
84    pub fn ffqn(&self) -> &FunctionFqn {
85        assert_matches!(self.events.first(), Some(ExecutionEvent {
86            event: ExecutionRequest::Created { ffqn, .. },
87            ..
88        }) => ffqn)
89    }
90
91    #[must_use]
92    pub fn params(&self) -> &Params {
93        assert_matches!(self.events.first(), Some(ExecutionEvent {
94            event: ExecutionRequest::Created { params, .. },
95            ..
96        }) => params)
97    }
98
99    #[must_use]
100    pub fn parent(&self) -> Option<(ExecutionId, JoinSetId)> {
101        assert_matches!(self.events.first(), Some(ExecutionEvent {
102            event: ExecutionRequest::Created { parent, .. },
103            ..
104        }) => parent.clone())
105    }
106
107    #[must_use]
108    pub fn last_event(&self) -> &ExecutionEvent {
109        self.events.last().expect("must contain at least one event")
110    }
111
112    #[must_use]
113    pub fn is_finished(&self) -> bool {
114        matches!(
115            self.events.last(),
116            Some(ExecutionEvent {
117                event: ExecutionRequest::Finished { .. },
118                ..
119            })
120        )
121    }
122
123    #[must_use]
124    pub fn as_finished_result(&self) -> Option<SupportedFunctionReturnValue> {
125        if let ExecutionEvent {
126            event: ExecutionRequest::Finished { retval: result, .. },
127            ..
128        } = self.events.last().expect("must contain at least one event")
129        {
130            Some(result.clone())
131        } else {
132            None
133        }
134    }
135
136    pub fn event_history(&self) -> impl Iterator<Item = (HistoryEvent, Version)> + '_ {
137        self.events.iter().filter_map(|event| {
138            if let ExecutionRequest::HistoryEvent { event: eh, .. } = &event.event {
139                Some((eh.clone(), event.version.clone()))
140            } else {
141                None
142            }
143        })
144    }
145
146    #[cfg(feature = "test")]
147    #[must_use]
148    pub fn find_join_set_request(&self, join_set_id: &JoinSetId) -> Option<&JoinSetRequest> {
149        self.events
150            .iter()
151            .find_map(move |event| match &event.event {
152                ExecutionRequest::HistoryEvent {
153                    event:
154                        HistoryEvent::JoinSetRequest {
155                            join_set_id: found,
156                            request,
157                        },
158                    ..
159                } if *join_set_id == *found => Some(request),
160                _ => None,
161            })
162    }
163}
164
/// Raw integer type backing [`Version`].
pub type VersionType = u32;
// Version of an execution log: each `ExecutionEvent` carries one, and
// `ExecutionLog::next_version` holds the next one to be assigned.
// Serialized (serde and schemars) transparently as the bare integer.
#[derive(
    Debug,
    Default,
    Clone,
    PartialEq,
    Eq,
    Hash,
    derive_more::Display,
    derive_more::Into,
    serde::Serialize,
    serde::Deserialize,
    schemars::JsonSchema,
)]
#[serde(transparent)]
#[schemars(transparent)]
pub struct Version(pub VersionType);
182impl Version {
183    #[must_use]
184    pub fn new(arg: VersionType) -> Version {
185        Version(arg)
186    }
187
188    #[must_use]
189    pub fn increment(&self) -> Version {
190        Version(self.0 + 1)
191    }
192}
193impl TryFrom<i64> for Version {
194    type Error = VersionParseError;
195    fn try_from(value: i64) -> Result<Self, Self::Error> {
196        VersionType::try_from(value)
197            .map(Version::new)
198            .map_err(|_| VersionParseError)
199    }
200}
201impl From<Version> for usize {
202    fn from(value: Version) -> Self {
203        usize::try_from(value.0).expect("16 bit systems are unsupported")
204    }
205}
206impl From<&Version> for usize {
207    fn from(value: &Version) -> Self {
208        usize::try_from(value.0).expect("16 bit systems are unsupported")
209    }
210}
211
/// Returned by `Version::try_from(i64)` when the value does not fit into [`VersionType`].
#[derive(Debug, thiserror::Error)]
#[error("version must be u32")]
pub struct VersionParseError;
215
// One entry of an execution log: an `ExecutionRequest` together with its creation time and
// the `Version` it was recorded at. Displayed as just the inner request.
#[derive(
    Clone,
    Debug,
    derive_more::Display,
    PartialEq,
    Eq,
    serde::Serialize,
    serde::Deserialize,
    schemars::JsonSchema,
)]
#[display("{event}")]
pub struct ExecutionEvent {
    pub created_at: DateTime<Utc>,
    pub event: ExecutionRequest,
    // Omitted from the serialized form when `None`.
    // NOTE(review): typed as `Version`; presumably refers to a separately stored backtrace —
    // confirm against the storage layer.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub backtrace_id: Option<Version>,
    pub version: Version,
}
234
// Cursor identifying a response in an execution's response list (see `ResponseWithCursor`).
// Derives only `Serialize` (for the web API, per the inline marker), not `Deserialize`.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    derive_more::Display,
    derive_more::Into,
    Serialize, /* webapi */
    schemars::JsonSchema,
)]
pub struct ResponseCursor(pub u32);
247
// A join set response paired with its cursor. Serialize-only (web API).
#[derive(Debug, Clone, PartialEq, Eq, Serialize /* webapi */, schemars::JsonSchema)]
pub struct ResponseWithCursor {
    pub event: JoinSetResponseEventOuter,
    pub cursor: ResponseCursor,
}
253
/// A list of execution events returned by a listing query.
#[derive(Debug)]
pub struct ListExecutionEventsResponse {
    pub events: Vec<ExecutionEvent>,
    /// NOTE(review): presumably the highest version of the whole execution log (not just of
    /// the returned `events`) — confirm against the database implementation.
    pub max_version: Version,
}
259
/// A list of join set responses returned by a listing query.
#[derive(Debug)]
pub struct ListResponsesResponse {
    pub responses: Vec<ResponseWithCursor>,
    /// NOTE(review): presumably the highest cursor among all responses of the execution
    /// (not just the returned ones) — confirm against the database implementation.
    pub max_cursor: ResponseCursor,
}
265
// A join set response event together with the time it was created. Serialize-only (web API).
#[derive(Debug, Clone, PartialEq, Eq, Serialize /* webapi */, schemars::JsonSchema)]
pub struct JoinSetResponseEventOuter {
    pub created_at: DateTime<Utc>,
    pub event: JoinSetResponseEvent,
}
271
// A response addressed to a specific join set of the receiving execution.
#[derive(
    Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema,
)]
pub struct JoinSetResponseEvent {
    pub join_set_id: JoinSetId,
    pub event: JoinSetResponse,
}
279
// Outcome delivered to a join set. Serialized internally tagged with a snake_case `type` field.
#[derive(
    Clone, Debug, PartialEq, Eq, Serialize, Deserialize, derive_more::Display, schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum JoinSetResponse {
    // `result` is `Ok(())` when the delay finished and `Err(())` when it was cancelled
    // (as rendered by the display attribute).
    #[display("delay {}: {delay_id}", if result.is_ok() { "finished" } else { "cancelled"})]
    DelayFinished {
        delay_id: DelayId,
        result: Result<(), ()>,
    },
    // A child execution reached its final state; carries its return value and the version
    // at which it finished.
    #[display("{result}: {child_execution_id}")] // execution completed..
    ChildExecutionFinished {
        child_execution_id: ExecutionIdDerived,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Version(2)))]
        finished_version: Version,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
        result: SupportedFunctionReturnValue,
    },
}
300
/// Placeholder [`ExecutionRequest::Created`] built entirely from constant, empty values:
/// empty ffqn and params, no parent, epoch `scheduled_at`, dummy component id and a zero
/// deployment id.
// NOTE(review): presumably used where a `Created` value is required but its content is
// irrelevant — confirm against callers.
pub const DUMMY_CREATED: ExecutionRequest = ExecutionRequest::Created {
    ffqn: FunctionFqn::new_static("", ""),
    params: Params::empty(),
    parent: None,
    scheduled_at: DateTime::from_timestamp_nanos(0),
    component_id: ComponentId::dummy_activity(),
    deployment_id: DeploymentId::from_parts(0, 0),
    metadata: ExecutionMetadata::empty(),
    scheduled_by: None,
};
/// Placeholder history event: creation of a one-off join set with an empty name.
// NOTE(review): presumably used where a history event value is required but its content is
// irrelevant — confirm against callers.
pub const DUMMY_HISTORY_EVENT: ExecutionRequest = ExecutionRequest::HistoryEvent {
    event: HistoryEvent::JoinSetCreate {
        join_set_id: JoinSetId {
            kind: crate::JoinSetKind::OneOff,
            name: StrVariant::empty(),
        },
    },
};
319
// Payload of a single execution log entry.
// Serialized externally tagged with snake_case variant names; these names match the strings
// returned by `ExecutionRequest::variant`.
#[derive(
    Clone,
    derive_more::Debug,
    derive_more::Display,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(rename_all = "snake_case")]
pub enum ExecutionRequest {
    // First event of every execution log (`ExecutionLog::ffqn/params/parent` assert this).
    // `params` is omitted from the Debug output.
    #[display("Created({ffqn}, `{scheduled_at}`)")]
    Created {
        ffqn: FunctionFqn,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
        #[debug(skip)]
        params: Params,
        parent: Option<(ExecutionId, JoinSetId)>,
        scheduled_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
        component_id: ComponentId,
        deployment_id: DeploymentId,
        #[cfg_attr(any(test, feature = "test"), arbitrary(default))]
        metadata: ExecutionMetadata,
        scheduled_by: Option<ExecutionId>,
    },
    // The execution was locked; see `Locked` for the lock details.
    Locked(Locked),
    /// Returns execution to [`PendingState::PendingAt`] state at the specified time.
    /// This can happen when:
    /// - executor is running out of resources like [`WorkerError::LimitReached`]
    /// - workflow made progress but then its lock expired.
    /// - executor is being closed (shutdown or hot redeploy requested)
    #[display("Unlocked(`{backoff_expires_at}`)")]
    Unlocked {
        backoff_expires_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
        reason: StrVariant,
    },
    // Created by the executor holding the lock.
    // After expiry interpreted as pending.
    #[display("TemporarilyFailed(`{backoff_expires_at}`)")]
    TemporarilyFailed {
        backoff_expires_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
        reason: StrVariant,
        detail: Option<String>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },
    // Created by the executor holding the lock.
    // After expiry interpreted as pending.
    #[display("TemporarilyTimedOut(`{backoff_expires_at}`)")]
    TemporarilyTimedOut {
        backoff_expires_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },
    // Created by the executor holding the lock.
    #[display("Finished")]
    Finished {
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
        retval: SupportedFunctionReturnValue,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },

    // Wraps a `HistoryEvent` recorded by the executor.
    #[display("HistoryEvent({event})")]
    HistoryEvent {
        event: HistoryEvent,
    },
    // Unit variants carrying no data.
    #[display("Paused")]
    Paused,
    #[display("Unpaused")]
    Unpaused,
}
397
398impl ExecutionRequest {
399    #[must_use]
400    pub fn is_temporary_event(&self) -> bool {
401        matches!(
402            self,
403            Self::TemporarilyFailed { .. } | Self::TemporarilyTimedOut { .. }
404        )
405    }
406
407    /// String representation of `ExecutionRequest`, used in execution log table to fetch events of certain type, e.g. `created` + `history_event`.
408    #[must_use]
409    pub const fn variant(&self) -> &'static str {
410        match self {
411            ExecutionRequest::Created { .. } => "created",
412            ExecutionRequest::Locked(_) => "locked",
413            ExecutionRequest::Unlocked { .. } => "unlocked",
414            ExecutionRequest::TemporarilyFailed { .. } => "temporarily_failed",
415            ExecutionRequest::TemporarilyTimedOut { .. } => "temporarily_timed_out",
416            ExecutionRequest::Finished { .. } => "finished",
417            ExecutionRequest::HistoryEvent { .. } => "history_event",
418            ExecutionRequest::Paused => "paused",
419            ExecutionRequest::Unpaused => "unpaused",
420        }
421    }
422
423    #[must_use]
424    pub fn join_set_id(&self) -> Option<&JoinSetId> {
425        match self {
426            Self::Created {
427                parent: Some((_parent_id, join_set_id)),
428                ..
429            } => Some(join_set_id),
430            Self::HistoryEvent {
431                event:
432                    HistoryEvent::JoinSetCreate { join_set_id, .. }
433                    | HistoryEvent::JoinSetRequest { join_set_id, .. }
434                    | HistoryEvent::JoinNext { join_set_id, .. },
435            } => Some(join_set_id),
436            _ => None,
437        }
438    }
439}
440
// Payload of `ExecutionRequest::Locked`: which executor and run hold the lock, when the lock
// expires, and the component, deployment and retry configuration the execution was locked with.
#[derive(
    Clone,
    derive_more::Debug,
    derive_more::Display,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[display("Locked(`{lock_expires_at}`, {component_id})")]
pub struct Locked {
    #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
    pub component_id: ComponentId,
    pub executor_id: ExecutorId,
    pub deployment_id: DeploymentId,
    pub run_id: RunId,
    pub lock_expires_at: DateTime<Utc>,
    #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentRetryConfig::ZERO))]
    pub retry_config: ComponentRetryConfig,
}
463
// Describes what kind of value a `HistoryEvent::Persist` recorded.
// Serialized internally tagged with a snake_case `type` field.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    derive_more::Display,
    Serialize,
    Deserialize,
    schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum PersistKind {
    // A random `u64` requested from the inclusive range `min..=max_inclusive`.
    #[display("RandomU64({min}, {max_inclusive})")]
    RandomU64 {
        min: u64,
        max_inclusive: u64,
    },
    // A random string whose length was requested from `min_length..max_length_exclusive`.
    #[display("RandomString({min_length}, {max_length_exclusive})")]
    RandomString {
        min_length: u64,
        max_length_exclusive: u64,
    },
    // A generated `ExecutionId`.
    ExecutionId,
}
490
/// Encodes `value` as its 8-byte big-endian (most significant byte first) representation.
#[must_use]
pub fn from_u64_to_bytes(value: u64) -> [u8; 8] {
    u64::to_be_bytes(value)
}
495
// Events recorded in the execution's history; replayed via `ExecutionLog::event_history`.
// Serialized internally tagged with a snake_case `type` field (see
// `HISTORY_EVENT_TYPE_JOIN_NEXT`).
#[derive(
    derive_more::Debug,
    Clone,
    PartialEq,
    Eq,
    derive_more::Display,
    Serialize,
    Deserialize,
    schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type", rename_all = "snake_case")]
/// Must be created by the executor in [`PendingState::Locked`].
pub enum HistoryEvent {
    /// Persist a generated pseudorandom value.
    #[display("Persist")]
    Persist {
        #[debug(skip)]
        value: Vec<u8>, // Only stored for nondeterminism checks. TODO: Consider using a hashed value or just the intention.
        kind: PersistKind,
    },
    // Records the creation of a join set.
    #[display("JoinSetCreate({join_set_id})")]
    JoinSetCreate { join_set_id: JoinSetId },
    #[display("JoinSetRequest({request})")]
    // join_set_id is part of ExecutionId or DelayId in the `request`
    JoinSetRequest {
        join_set_id: JoinSetId,
        request: JoinSetRequest,
    },
    /// Sets the pending state to [`PendingState::BlockedByJoinSet`].
    /// When the response arrives at `resp_time`:
    /// The execution is [`PendingState::PendingAt`]`(max(resp_time, lock_expires_at)`, so that the
    /// original executor can continue. After the expiry any executor can continue without
    /// marking the execution as timed out.
    #[display("JoinNext({join_set_id})")]
    JoinNext {
        join_set_id: JoinSetId,
        /// Set to a future time if the worker is keeping the execution invocation warm waiting for the result.
        /// The pending status will be kept in Locked state until `run_expires_at`.
        run_expires_at: DateTime<Utc>,
        /// Set to a specific function when calling `-await-next` extension function, used for
        /// determinism checks.
        requested_ffqn: Option<FunctionFqn>,
        /// Closing request must never set `requested_ffqn` and is ignored by determinism checks.
        closing: bool,
    },
    /// Attempt to process next response without changing the pending state.
    #[display("JoinNextTry({join_set_id}, {outcome})")]
    JoinNextTry {
        join_set_id: JoinSetId,
        outcome: JoinNextTryOutcome,
    },
    /// Records the fact that a join set was awaited more times than its submission count.
    #[display("JoinNextTooMany({join_set_id})")]
    JoinNextTooMany {
        join_set_id: JoinSetId,
        /// Set to a specific function when calling `-await-next` extension function, used for
        /// determinism checks.
        requested_ffqn: Option<FunctionFqn>,
    },
    // Records scheduling of `execution_id`; `result` is persisted for determinism
    // (see `ScheduleRequestError`).
    #[display("Schedule({execution_id}, {schedule_at})")]
    Schedule {
        execution_id: ExecutionId,
        schedule_at: HistoryEventScheduleAt, // Stores intention to schedule an execution at a date/offset
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Ok(())))]
        result: Result<(), ScheduleRequestError>,
    },
    // Records a `-stub` call for `target_execution_id`. Only the hash of the stubbed return
    // value is stored (see `StubRetVal::hash`); `result` holds the `StubError`, if any.
    #[display("Stub({target_execution_id})")]
    Stub {
        target_execution_id: ExecutionIdDerived,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StubRetVal::Typed(crate::SUPPORTED_RETURN_VALUE_OK_EMPTY).hash()))]
        retval_hash: StubRetValHash,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Ok(())))]
        result: Result<(), StubError>,
    },
}
572
/// Stub return value - only used during processing, not stored in history.
///
/// Only its [`StubRetVal::hash`] is recorded (in `HistoryEvent::Stub`).
#[derive(derive_more::Debug, Clone, PartialEq, Eq)]
#[cfg_attr(any(test, feature = "test"), derive(Serialize, Deserialize))]
#[cfg_attr(any(test, feature = "test"), serde(rename_all = "snake_case"))]
pub enum StubRetVal {
    /// Return value already represented as a [`SupportedFunctionReturnValue`];
    /// hashed via its JSON serialization.
    Typed(SupportedFunctionReturnValue),
    /// Return value kept as a raw string; hashed via its UTF-8 bytes.
    Untyped(String),
}
581
582impl StubRetVal {
583    /// Compute a stable hash of the return value for determinism checks.
584    #[must_use]
585    pub fn hash(&self) -> StubRetValHash {
586        use sha2::{Digest as _, Sha256};
587        const STUB_RETVAL_HASH_VERSION: u8 = 1;
588        let mut hasher = Sha256::default();
589
590        match self {
591            StubRetVal::Typed(val) => {
592                hasher.update(b"T|");
593                // Serialize to JSON for stable hashing
594                let json = serde_json::to_string(val)
595                    .expect("SupportedFunctionReturnValue is always serializable");
596                hasher.update(json.as_bytes());
597            }
598            StubRetVal::Untyped(s) => {
599                hasher.update(b"U|");
600                hasher.update(s.as_bytes());
601            }
602        }
603
604        let hash_bytes = hasher.finalize();
605        let mut result = [0u8; 33];
606        result[0] = STUB_RETVAL_HASH_VERSION;
607        result[1..].copy_from_slice(&hash_bytes);
608
609        StubRetValHash(result)
610    }
611}
612
/// Hash of a stub return value, stored in history for determinism checks.
/// Format: 1 byte version + 32 bytes SHA-256 hash.
// Serialized through `Display`/`FromStr` as 66 hex characters (schema type: string).
#[derive(
    Clone,
    PartialEq,
    Eq,
    serde_with::SerializeDisplay,
    serde_with::DeserializeFromStr,
    schemars::JsonSchema,
)]
#[schemars(with = "String")]
pub struct StubRetValHash([u8; 33]);
625
626impl Display for StubRetValHash {
627    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
628        for b in self.0 {
629            write!(f, "{b:02x}")?;
630        }
631        Ok(())
632    }
633}
634
635impl Debug for StubRetValHash {
636    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
637        Display::fmt(self, f)
638    }
639}
640
641impl std::str::FromStr for StubRetValHash {
642    type Err = StubRetValHashParseError;
643
644    fn from_str(s: &str) -> Result<Self, Self::Err> {
645        if s.len() != 66 {
646            // 33 bytes * 2 hex chars = 66
647            return Err(StubRetValHashParseError::InvalidLength(s.len()));
648        }
649        let mut bytes = [0u8; 33];
650        for i in 0..33 {
651            let chunk = &s[i * 2..i * 2 + 2];
652            bytes[i] =
653                u8::from_str_radix(chunk, 16).map_err(|_| StubRetValHashParseError::InvalidHex)?;
654        }
655        Ok(StubRetValHash(bytes))
656    }
657}
658
/// Error returned when parsing a [`StubRetValHash`] from its hex string form.
#[derive(Debug, thiserror::Error)]
pub enum StubRetValHashParseError {
    /// The input's byte length (carried in the variant) was not 66.
    #[error("invalid length: expected 66 hex chars, got {0}")]
    InvalidLength(usize),
    /// The input contained a character that is not a hex digit.
    #[error("invalid hex character")]
    InvalidHex,
}
666
/// Error from the `-stub` extension function.
/// Mirrors `obelisk:types/execution.{stub-error}` from WIT.
// Recorded in `HistoryEvent::Stub::result`.
#[derive(
    Debug, Clone, thiserror::Error, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema,
)]
#[serde(rename_all = "snake_case")]
pub enum StubError {
    #[error("execution not found")]
    ExecutionNotFound,
    // Carries a human-readable description of the type mismatch.
    #[error("type check error: {0}")]
    TypeCheckError(String),
    #[error("conflict")]
    Conflict,
}
681
/// Error from the `schedule-json` function. Persisted in history for determinism.
// Recorded in `HistoryEvent::Schedule::result`.
#[derive(
    Debug, Clone, thiserror::Error, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema,
)]
#[serde(rename_all = "snake_case")]
pub enum ScheduleRequestError {
    #[error("function not found")]
    FunctionNotFound,
    // Displayed as a params parsing error; carries the parser's message.
    #[error("params parsing error: {0}")]
    TypeCheckError(String),
}
693
/// Error from the `submit-json` function. Persisted in history for determinism.
// Recorded in `JoinSetRequest::ChildExecutionRequest::result`.
#[derive(
    Debug, Clone, thiserror::Error, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema,
)]
#[serde(rename_all = "snake_case")]
pub enum ChildExecutionRequestError {
    #[error("function not found")]
    FunctionNotFound,
    // Displayed as a params parsing error; carries the parser's message.
    #[error("params parsing error: {0}")]
    TypeCheckError(String),
}
705
// Outcome recorded by `HistoryEvent::JoinNextTry`. Serialized and displayed as snake_case.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    derive_more::Display,
    Serialize,
    Deserialize,
    schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(rename_all = "snake_case")]
pub enum JoinNextTryOutcome {
    /// A response was found and processed.
    #[display("found")]
    Found,
    /// No response available, but there are still pending requests.
    #[display("pending")]
    Pending,
    /// No response available, and all requests have been processed.
    #[display("all_processed")]
    AllProcessed,
}
730
731impl From<bool> for JoinNextTryOutcome {
732    /// Migration helper: converts old `found_response: bool` to the new enum.
733    /// `false` maps to `Pending` as a conservative default (the exact error
734    /// was not stored before).
735    fn from(found_response: bool) -> Self {
736        if found_response {
737            JoinNextTryOutcome::Found
738        } else {
739            JoinNextTryOutcome::Pending
740        }
741    }
742}
743
// Scheduling intention as recorded in history: immediately, at an absolute time, or after an
// offset. `Now` and `In` are resolved against a caller-supplied `now` by `as_date_time`.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    derive_more::Display,
    Serialize,
    Deserialize,
    schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(rename_all = "snake_case")]
pub enum HistoryEventScheduleAt {
    Now,
    #[display("At(`{_0}`)")]
    At(DateTime<Utc>),
    #[display("In({_0:?})")]
    In(Duration),
}
764
/// Error returned by `HistoryEventScheduleAt::as_date_time`.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum ScheduleAtConversionError {
    /// The `In` offset does not fit into a `TimeDelta`, or adding it to `now` overflows.
    #[error("source duration value is out of range")]
    OutOfRangeError,
}
770
771impl HistoryEventScheduleAt {
772    pub fn as_date_time(
773        &self,
774        now: DateTime<Utc>,
775    ) -> Result<DateTime<Utc>, ScheduleAtConversionError> {
776        match self {
777            Self::Now => Ok(now),
778            Self::At(date_time) => Ok(*date_time),
779            Self::In(duration) => {
780                let time_delta = TimeDelta::from_std(*duration)
781                    .map_err(|_| ScheduleAtConversionError::OutOfRangeError)?;
782                now.checked_add_signed(time_delta)
783                    .ok_or(ScheduleAtConversionError::OutOfRangeError)
784            }
785        }
786    }
787}
788
// Request submitted into a join set, recorded by `HistoryEvent::JoinSetRequest`.
// Serialized internally tagged with a snake_case `type` field.
#[derive(
    Clone, Debug, PartialEq, Eq, derive_more::Display, Serialize, Deserialize, schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum JoinSetRequest {
    // Must be created by the executor in `PendingState::Locked`.
    #[display("DelayRequest({delay_id}, expires_at: `{expires_at}`, schedule_at: `{schedule_at}`)")]
    DelayRequest {
        delay_id: DelayId,
        expires_at: DateTime<Utc>,
        schedule_at: HistoryEventScheduleAt,
    },
    // Must be created by the executor in `PendingState::Locked`.
    // `result` records whether the request was accepted; it is persisted for determinism
    // (see `ChildExecutionRequestError`).
    #[display("ChildExecutionRequest({child_execution_id}, {target_ffqn}, params: {params})")]
    ChildExecutionRequest {
        child_execution_id: ExecutionIdDerived,
        target_ffqn: FunctionFqn,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
        params: Params,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Ok(())))]
        result: Result<(), ChildExecutionRequestError>,
    },
}
813
/// Error that is not specific to an execution.
// Equality skips the captured `SpanTrace` and the `source` error (`#[eq(skip)]` /
// `#[partial_eq(skip)]`); `reason` and `loc` are still compared.
#[derive(Debug, Clone, thiserror::Error, derive_more::PartialEq, derive_more::Eq)]
pub enum DbErrorGeneric {
    /// A database failure without a dedicated variant.
    #[error("database error: {reason}")]
    Uncategorized {
        reason: StrVariant,
        #[eq(skip)]
        #[partial_eq(skip)]
        context: SpanTrace,
        #[eq(skip)]
        #[partial_eq(skip)]
        #[source]
        source: Option<Arc<dyn std::error::Error + Send + Sync>>,
        // NOTE(review): presumably the source location where the error was constructed —
        // confirm against the constructors.
        loc: &'static Location<'static>,
    },
    /// The database has been closed.
    #[error("database was closed")]
    Close,
}
832
/// Write error classified as non-retriable (as the name states).
// Equality of `IllegalState` skips `context` and `source`.
#[derive(thiserror::Error, Clone, Debug, derive_more::PartialEq, derive_more::Eq)]
pub enum DbErrorWriteNonRetriable {
    #[error("validation failed: {0}")]
    ValidationFailed(StrVariant),
    #[error("conflict")]
    Conflict,
    #[error("already finished")]
    AlreadyFinished,
    #[error("illegal state: {reason}")]
    IllegalState {
        reason: StrVariant,
        #[eq(skip)]
        #[partial_eq(skip)]
        context: SpanTrace,
        #[eq(skip)]
        #[partial_eq(skip)]
        #[source]
        source: Option<Arc<dyn std::error::Error + Send + Sync>>,
        loc: &'static Location<'static>,
    },
    /// The version supplied with the write (`requested`) differs from the `expected` one.
    #[error("version conflict: expected: {expected}, got: {requested}")]
    VersionConflict {
        expected: Version,
        requested: Version,
    },
}
859
/// Write error tied to an execution
#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
pub enum DbErrorWrite {
    /// The row to be written to does not exist.
    #[error("cannot write - row not found")]
    NotFound,
    #[error("non-retriable error: {0}")]
    NonRetriable(#[from] DbErrorWriteNonRetriable),
    /// Error not specific to the execution; displayed transparently.
    #[error(transparent)]
    Generic(#[from] DbErrorGeneric),
}
870
/// Read error tied to an execution
#[derive(Debug, Clone, thiserror::Error, PartialEq)]
pub enum DbErrorRead {
    /// The requested row does not exist.
    #[error("cannot read - row not found")]
    NotFound,
    /// Error not specific to the execution; displayed transparently.
    #[error(transparent)]
    Generic(#[from] DbErrorGeneric),
}
879
/// Read error for operations that can also time out.
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum DbErrorReadWithTimeout {
    /// The operation timed out.
    // NOTE(review): the meaning of the carried `TimeoutOutcome` is defined elsewhere — confirm.
    #[error("timeout")]
    Timeout(TimeoutOutcome),
    #[error(transparent)]
    DbErrorRead(#[from] DbErrorRead),
}
887
// Represents the next version after successfully appending to the execution log.
// TODO: Convert to struct with next_version
pub type AppendResponse = Version;
// NOTE(review): tuple of execution id, version, params and an optional timestamp; the meaning
// of the timestamp is not visible here — confirm against the producer.
pub type PendingExecution = (ExecutionId, Version, Params, Option<DateTime<Utc>>);
892
/// Snapshot of an execution returned by a lock operation (see [`LockPendingResponse`]):
/// the lock itself, the creation data, the history and the responses received so far.
#[derive(Debug, Clone)]
pub struct LockedExecution {
    pub execution_id: ExecutionId,
    pub next_version: Version,
    pub metadata: ExecutionMetadata,
    pub locked_event: Locked,
    pub ffqn: FunctionFqn,
    pub params: Params,
    /// History events paired with their versions, like `ExecutionLog::event_history`.
    pub event_history: Vec<(HistoryEvent, Version)>,
    pub responses: Vec<ResponseWithCursor>,
    pub parent: Option<(ExecutionId, JoinSetId)>,
    /// NOTE(review): presumably the number of temporary events
    /// (`ExecutionRequest::is_temporary_event`) recorded so far — confirm against the
    /// database implementation.
    pub intermittent_event_count: u32,
}
906
/// Executions locked by a single lock call.
pub type LockPendingResponse = Vec<LockedExecution>;
/// Version returned after appending a batch of events.
pub type AppendBatchResponse = Version;
909
/// An [`ExecutionRequest`] to be appended to an execution log, with its creation time.
/// Displayed as just the inner request.
#[derive(Debug, Clone, derive_more::Display, Serialize, Deserialize)]
#[display("{event}")]
pub struct AppendRequest {
    pub created_at: DateTime<Utc>,
    pub event: ExecutionRequest,
}
916
/// Request to create a new execution.
///
/// Apart from `created_at` and `execution_id`, the fields map one-to-one onto
/// [`ExecutionRequest::Created`] (see the `From` impl).
#[derive(Debug, Clone)]
pub struct CreateRequest {
    pub created_at: DateTime<Utc>,
    pub execution_id: ExecutionId,
    pub ffqn: FunctionFqn,
    pub params: Params,
    pub parent: Option<(ExecutionId, JoinSetId)>,
    pub scheduled_at: DateTime<Utc>,
    pub component_id: ComponentId,
    pub deployment_id: DeploymentId,
    pub metadata: ExecutionMetadata,
    pub scheduled_by: Option<ExecutionId>,
}
930
931impl From<CreateRequest> for ExecutionRequest {
932    fn from(value: CreateRequest) -> Self {
933        Self::Created {
934            ffqn: value.ffqn,
935            params: value.params,
936            parent: value.parent,
937            scheduled_at: value.scheduled_at,
938            component_id: value.component_id,
939            deployment_id: value.deployment_id,
940            metadata: value.metadata,
941            scheduled_by: value.scheduled_by,
942        }
943    }
944}
945
/// Pool handing out database connections, each typed to the slice of the storage API a caller needs.
#[async_trait]
pub trait DbPool: Send + Sync {
    /// Get a connection restricted to the [`DbExecutor`] operations.
    async fn db_exec_conn(&self) -> Result<Box<dyn DbExecutor>, DbErrorGeneric>;

    /// Get a general-purpose [`DbConnection`].
    async fn connection(&self) -> Result<Box<dyn DbConnection>, DbErrorGeneric>;

    /// Get a connection exposing the [`DbExternalApi`] operations.
    async fn external_api_conn(&self) -> Result<Box<dyn DbExternalApi>, DbErrorGeneric>;

    /// Get a connection exposing the `DbConnectionTest` helpers (only with the `test` feature).
    #[cfg(feature = "test")]
    async fn connection_test(&self) -> Result<Box<dyn DbConnectionTest>, DbErrorGeneric>;
}

/// A pool that can be shut down explicitly.
#[async_trait]
pub trait DbPoolCloseable {
    /// Close the pool.
    async fn close(&self);
}
962
963#[derive(Clone, Debug)]
964pub struct AppendEventsToExecution {
965    pub execution_id: ExecutionId,
966    pub version: Version,
967    pub batch: Vec<AppendRequest>,
968}
969
970#[derive(Clone, Debug)]
971pub struct AppendResponseToExecution {
972    pub parent_execution_id: ExecutionId,
973    pub created_at: DateTime<Utc>,
974    pub join_set_id: JoinSetId,
975    pub child_execution_id: ExecutionIdDerived,
976    pub finished_version: Version,
977    pub result: SupportedFunctionReturnValue,
978}
979
980#[async_trait]
981pub trait DbExecutor: Send + Sync {
982    #[expect(clippy::too_many_arguments)]
983    async fn lock_pending_by_ffqns(
984        &self,
985        batch_size: u32,
986        pending_at_or_sooner: DateTime<Utc>,
987        ffqns: Arc<[FunctionFqn]>,
988        created_at: DateTime<Utc>,
989        component_id: ComponentId,
990        deployment_id: DeploymentId,
991        executor_id: ExecutorId,
992        lock_expires_at: DateTime<Utc>,
993        run_id: RunId,
994        retry_config: ComponentRetryConfig,
995    ) -> Result<LockPendingResponse, DbErrorWrite>;
996
997    #[expect(clippy::too_many_arguments)]
998    async fn lock_pending_by_component_digest(
999        &self,
1000        batch_size: u32,
1001        pending_at_or_sooner: DateTime<Utc>,
1002        component_id: &ComponentId,
1003        deployment_id: DeploymentId,
1004        created_at: DateTime<Utc>,
1005        executor_id: ExecutorId,
1006        lock_expires_at: DateTime<Utc>,
1007        run_id: RunId,
1008        retry_config: ComponentRetryConfig,
1009    ) -> Result<LockPendingResponse, DbErrorWrite>;
1010
1011    #[cfg(feature = "test")]
1012    #[expect(clippy::too_many_arguments)]
1013    async fn lock_one(
1014        &self,
1015        created_at: DateTime<Utc>,
1016        component_id: ComponentId,
1017        deployment_id: DeploymentId,
1018        execution_id: &ExecutionId,
1019        run_id: RunId,
1020        version: Version,
1021        executor_id: ExecutorId,
1022        lock_expires_at: DateTime<Utc>,
1023        retry_config: ComponentRetryConfig,
1024    ) -> Result<LockedExecution, DbErrorWrite>;
1025
1026    /// Append a single event to an existing execution log.
1027    /// The request cannot contain [`ExecutionRequest::Created`].
1028    async fn append(
1029        &self,
1030        execution_id: ExecutionId,
1031        version: Version,
1032        req: AppendRequest,
1033    ) -> Result<AppendResponse, DbErrorWrite>;
1034
1035    /// Append a batch of events to an existing execution log, and append a response to a parent execution.
1036    /// The batch cannot contain [`ExecutionRequest::Created`].
1037    async fn append_batch_respond_to_parent(
1038        &self,
1039        events: AppendEventsToExecution,
1040        response: AppendResponseToExecution,
1041        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
1042    ) -> Result<AppendBatchResponse, DbErrorWrite>;
1043
1044    /// Notification mechainism with no strict guarantees for waiting while there are no pending executions.
1045    /// Return immediately if there are pending notifications at `pending_at_or_sooner`.
1046    /// Otherwise wait until `timeout_fut` resolves.
1047    /// Delay requests that expire between `pending_at_or_sooner` and timeout can be disregarded.
1048    async fn wait_for_pending_by_ffqn(
1049        &self,
1050        pending_at_or_sooner: DateTime<Utc>,
1051        ffqns: Arc<[FunctionFqn]>,
1052        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
1053    );
1054
1055    /// Notification mechainism with no strict guarantees for waiting while there are no pending executions.
1056    /// Return immediately if there are pending notifications at `pending_at_or_sooner`.
1057    /// Otherwise wait until `timeout_fut` resolves.
1058    /// Delay requests that expire between `pending_at_or_sooner` and timeout can be disregarded.
1059    async fn wait_for_pending_by_component_digest(
1060        &self,
1061        pending_at_or_sooner: DateTime<Utc>,
1062        component_digest: &ComponentDigest,
1063        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
1064    );
1065
1066    async fn cancel_activity_with_retries(
1067        &self,
1068        execution_id: &ExecutionId,
1069        cancelled_at: DateTime<Utc>,
1070    ) -> Result<CancelOutcome, DbErrorWrite> {
1071        let mut retries = 5;
1072        loop {
1073            let res = self.cancel_activity(execution_id, cancelled_at).await;
1074            if res.is_ok() || retries == 0 {
1075                return res;
1076            }
1077            retries -= 1;
1078        }
1079    }
1080
1081    /// Get last event. Impls may set `ExecutionEvent::backtrace_id` to `None`.
1082    async fn get_last_execution_event(
1083        &self,
1084        execution_id: &ExecutionId,
1085    ) -> Result<ExecutionEvent, DbErrorRead>;
1086
1087    async fn cancel_activity(
1088        &self,
1089        execution_id: &ExecutionId,
1090        cancelled_at: DateTime<Utc>,
1091    ) -> Result<CancelOutcome, DbErrorWrite> {
1092        debug!("Determining cancellation state of {execution_id}");
1093
1094        let last_event = self
1095            .get_last_execution_event(execution_id)
1096            .await
1097            .map_err(DbErrorWrite::from)?;
1098        if let ExecutionRequest::Finished {
1099            retval:
1100                SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError {
1101                    kind: ExecutionFailureKind::Cancelled,
1102                    ..
1103                }),
1104            ..
1105        } = last_event.event
1106        {
1107            return Ok(CancelOutcome::Cancelled);
1108        } else if matches!(last_event.event, ExecutionRequest::Finished { .. }) {
1109            debug!("Not cancelling, {execution_id} is already finished");
1110            return Ok(CancelOutcome::AlreadyFinished);
1111        }
1112        let finished_version = last_event.version.increment();
1113        let child_result = SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError {
1114            reason: None,
1115            kind: ExecutionFailureKind::Cancelled,
1116            detail: None,
1117        });
1118        let cancel_request = AppendRequest {
1119            created_at: cancelled_at,
1120            event: ExecutionRequest::Finished {
1121                retval: child_result.clone(),
1122                http_client_traces: None,
1123            },
1124        };
1125        debug!("Cancelling activity {execution_id} at {finished_version}");
1126        if let ExecutionId::Derived(execution_id) = execution_id {
1127            let (parent_execution_id, join_set_id) = execution_id.split_to_parts();
1128            let child_execution_id = ExecutionId::Derived(execution_id.clone());
1129            self.append_batch_respond_to_parent(
1130                AppendEventsToExecution {
1131                    execution_id: child_execution_id,
1132                    version: finished_version.clone(),
1133                    batch: vec![cancel_request],
1134                },
1135                AppendResponseToExecution {
1136                    parent_execution_id,
1137                    created_at: cancelled_at,
1138                    join_set_id: join_set_id.clone(),
1139                    child_execution_id: execution_id.clone(),
1140                    finished_version,
1141                    result: child_result,
1142                },
1143                cancelled_at,
1144            )
1145            .await?;
1146        } else {
1147            self.append(execution_id.clone(), finished_version, cancel_request)
1148                .await?;
1149        }
1150        debug!("Cancelled {execution_id}");
1151        Ok(CancelOutcome::Cancelled)
1152    }
1153}
1154
/// Outcome of `DbConnection::append_delay_response`.
// Derives match the sibling `CancelOutcome`: a public, field-less outcome enum should at
// least be `Debug` and comparable so callers can log and assert on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AppendDelayResponseOutcome {
    /// The delay response was appended.
    Success,
    /// The delay had already finished.
    AlreadyFinished,
    /// The delay had already been cancelled.
    AlreadyCancelled,
}
1160
1161#[derive(Debug, Clone, Default)]
1162pub struct ListExecutionsFilter {
1163    pub ffqn_prefix: Option<String>,
1164    pub show_derived: bool,
1165    pub hide_finished: bool,
1166    pub execution_id_prefix: Option<String>,
1167    pub component_digest: Option<ComponentDigest>,
1168    pub deployment_id: Option<DeploymentId>,
1169}
1170
/// Storage operations backing the external API, on top of [`DbConnection`].
#[async_trait]
pub trait DbExternalApi: DbConnection {
    /// Get a backtrace of the execution selected by `filter`: the first one, the last one,
    /// or the one for a specific version (presumably the backtrace whose
    /// `version_min_including..version_max_excluding` range contains it — confirm in impls).
    async fn get_backtrace(
        &self,
        execution_id: &ExecutionId,
        filter: BacktraceFilter,
    ) -> Result<BacktraceInfo, DbErrorRead>;

    /// Store a source file associated with a component digest.
    /// `frame_key` is either an exact frame symbol path or a suffix (with leading `/`)
    /// when `is_suffix` is true. Idempotent — repeated calls for the same key are ignored.
    async fn upsert_source_file(
        &self,
        component_digest: &ComponentDigest,
        frame_key: &str,
        is_suffix: bool,
        content: &str,
    ) -> Result<(), DbErrorWrite>;

    /// Look up a source file by component digest and a frame symbol path.
    /// Matches either exact keys or suffix keys (where the frame path ends with the stored key).
    /// Returns `None` if not found or if multiple suffix entries match (ambiguous).
    async fn get_source_file(
        &self,
        component_digest: &ComponentDigest,
        file: &str,
    ) -> Result<Option<String>, DbErrorRead>;

    /// Returns executions sorted in descending order.
    async fn list_executions(
        &self,
        filter: ListExecutionsFilter,
        pagination: ExecutionListPagination,
    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric>;

    /// Returns execution events for the given execution.
    ///
    /// Results are always ordered from oldest to newest (ascending by version),
    /// regardless of pagination direction.
    async fn list_execution_events(
        &self,
        execution_id: &ExecutionId,
        pagination: Pagination<VersionType>,
        include_backtrace_id: bool,
    ) -> Result<ListExecutionEventsResponse, DbErrorRead>;

    /// Returns responses of an execution ordered as they arrived,
    /// enabling matching each `JoinNext` to its corresponding response.
    ///
    /// Results are always ordered from oldest to newest (ascending by cursor),
    /// regardless of pagination direction.
    ///
    /// As an optimization, the implementation can return an empty list of `responses`
    /// and `max_cursor` set to 0 if the execution is not found.
    async fn list_responses(
        &self,
        execution_id: &ExecutionId,
        pagination: Pagination<u32>,
    ) -> Result<ListResponsesResponse, DbErrorRead>;

    /// Returns the execution's state together with a window of its events and responses.
    /// Events are selected by `req_since` / `req_max_length` (backtrace ids included when
    /// `req_include_backtrace_id` is set), responses by `resp_pagination`.
    async fn list_execution_events_responses(
        &self,
        execution_id: &ExecutionId,
        req_since: &Version,
        req_max_length: VersionType,
        req_include_backtrace_id: bool,
        resp_pagination: Pagination<VersionType>,
    ) -> Result<ExecutionWithStateRequestsResponses, DbErrorRead>;

    /// Switch the execution's component digest from `old` to `new`.
    /// NOTE(review): presumably fails when the current digest is not `old` — confirm in impls.
    async fn upgrade_execution_component(
        &self,
        execution_id: &ExecutionId,
        old: &ComponentDigest,
        new: &ComponentDigest,
    ) -> Result<(), DbErrorWrite>;

    /// Returns log entries of an execution that match `filter`, paginated by log cursor.
    async fn list_logs(
        &self,
        execution_id: &ExecutionId,
        filter: LogFilter,
        pagination: Pagination<u32>,
    ) -> Result<ListLogsResponse, DbErrorRead>;

    /// Returns per-deployment execution counts. Each entry's `config_json` is populated
    /// only when `include_config_json` is true.
    async fn list_deployment_states(
        &self,
        current_time: DateTime<Utc>,
        pagination: Pagination<Option<DeploymentId>>,
        include_config_json: bool,
    ) -> Result<Vec<DeploymentState>, DbErrorRead>;

    /// Insert a new deployment. The record must have `status == Inactive` and
    /// `last_active_at == None`; activation is a separate step via [`Self::activate_deployment`].
    async fn insert_deployment(&self, record: DeploymentRecord) -> Result<(), DbErrorWrite>;

    /// Make the deployment Active as of `now`.
    async fn activate_deployment(
        &self,
        deployment_id: DeploymentId,
        now: DateTime<Utc>,
    ) -> Result<(), DbErrorWrite>;

    /// Mark a deployment as Enqueued (pending next server restart).
    /// Returns `Err(DbErrorWriteNonRetriable::Conflict)` if the deployment is currently Active.
    /// Any previously Enqueued deployment is demoted to Inactive.
    async fn enqueue_deployment(&self, deployment_id: DeploymentId) -> Result<(), DbErrorWrite>;

    /// Returned [`DeploymentRecord`] must contain `config_json`.
    async fn get_deployment(
        &self,
        deployment_id: DeploymentId,
    ) -> Result<Option<DeploymentRecord>, DbErrorRead>;

    /// Return active deployment.
    /// Returned [`DeploymentRecord`] must contain `config_json`.
    #[cfg(feature = "test")]
    async fn get_active_deployment(&self) -> Result<Option<DeploymentRecord>, DbErrorRead>;

    /// Return the most relevant current deployment: Enqueued if present, otherwise Active.
    /// Returned [`DeploymentRecord`] must contain `config_json`.
    async fn get_current_deployment(&self) -> Result<Option<DeploymentRecord>, DbErrorRead>;

    /// List deployment records, paginated by deployment id.
    async fn list_deployments(
        &self,
        pagination: Pagination<Option<DeploymentId>>,
    ) -> Result<Vec<DeploymentRecord>, DbErrorRead>;

    /// Pause an execution. Only pending executions can be paused.
    async fn pause_execution(
        &self,
        execution_id: &ExecutionId,
        paused_at: DateTime<Utc>,
    ) -> Result<AppendResponse, DbErrorWrite>;

    /// Unpause an execution. Only paused executions can be unpaused.
    async fn unpause_execution(
        &self,
        execution_id: &ExecutionId,
        unpaused_at: DateTime<Utc>,
    ) -> Result<AppendResponse, DbErrorWrite>;
}
1311pub const LIST_DEPLOYMENT_STATES_DEFAULT_LENGTH: u16 = 20;
1312pub const LIST_DEPLOYMENT_STATES_DEFAULT_PAGINATION: Pagination<Option<DeploymentId>> =
1313    Pagination::OlderThan {
1314        length: LIST_DEPLOYMENT_STATES_DEFAULT_LENGTH,
1315        cursor: None,
1316        including_cursor: false,
1317    };
1318
1319pub struct DeploymentState {
1320    pub deployment_id: DeploymentId,
1321    pub locked: u32,
1322    // In `PendingAt` state, scheduled to present or past
1323    pub pending: u32,
1324    // In `PendingAt` state, scheduled into the future
1325    pub scheduled: u32,
1326    pub blocked: u32,
1327    pub finished: u32,
1328    /// None if not requested from db.
1329    pub config_json: Option<String>,
1330    pub created_at: DateTime<Utc>,
1331    /// Set when the deployment becomes Active; None if it has never been active.
1332    pub last_active_at: Option<DateTime<Utc>>,
1333    pub status: DeploymentStatus,
1334}
1335
/// Lifecycle status of a deployment.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DeploymentStatus {
    /// Neither active nor queued.
    Inactive,
    /// Queued to become Active on the next server restart.
    Enqueued,
    /// The currently active deployment.
    Active,
}

impl DeploymentStatus {
    /// Lowercase name of the status; this is the form accepted by its `FromStr` impl.
    #[must_use]
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Inactive => "inactive",
            Self::Enqueued => "enqueued",
            Self::Active => "active",
        }
    }
}
1354
1355impl std::str::FromStr for DeploymentStatus {
1356    type Err = StrVariant;
1357    fn from_str(s: &str) -> Result<Self, Self::Err> {
1358        match s {
1359            "inactive" => Ok(DeploymentStatus::Inactive),
1360            "enqueued" => Ok(DeploymentStatus::Enqueued),
1361            "active" => Ok(DeploymentStatus::Active),
1362            _ => Err(StrVariant::from(format!("unknown deployment status: {s}"))),
1363        }
1364    }
1365}
1366
1367#[derive(Debug, Clone)]
1368pub struct DeploymentRecord {
1369    pub deployment_id: DeploymentId,
1370    pub created_at: DateTime<Utc>,
1371    /// Set when the deployment becomes Active; None if it has never been active.
1372    pub last_active_at: Option<DateTime<Utc>>,
1373    pub status: DeploymentStatus,
1374    pub config_json: String,
1375    pub obelisk_version: String,
1376    pub created_by: Option<String>,
1377}
1378
1379#[derive(Debug)]
1380pub struct ListLogsResponse {
1381    pub items: Vec<LogEntryRow>,
1382    pub next_page: Pagination<u32>, // Newer logs can always arrive e.g. via replay
1383    pub prev_page: Option<Pagination<u32>>, // None if we are already at the beginning
1384}
1385
1386#[derive(Debug)]
1387pub struct LogFilter {
1388    show_logs: bool,
1389    show_streams: bool,
1390    levels: Vec<LogLevel>, // Only applied if `show_logs` = true, empty means return all levels.
1391    stream_types: Vec<LogStreamType>, // Only applied if `show_streams` = true, empty means return all stream types.
1392}
1393impl LogFilter {
1394    // Constructor for logs only
1395    #[must_use]
1396    pub fn show_logs(levels: Vec<LogLevel>) -> LogFilter {
1397        LogFilter {
1398            show_logs: true,
1399            show_streams: false,
1400            levels,
1401            stream_types: Vec::new(),
1402        }
1403    }
1404    // Constructor for streams only
1405    #[must_use]
1406    pub fn show_streams(stream_types: Vec<LogStreamType>) -> LogFilter {
1407        LogFilter {
1408            show_logs: false,
1409            show_streams: true,
1410            levels: Vec::new(),
1411            stream_types,
1412        }
1413    }
1414    // Constructor for both logs and streams
1415    #[must_use]
1416    pub fn show_combined(levels: Vec<LogLevel>, stream_types: Vec<LogStreamType>) -> LogFilter {
1417        LogFilter {
1418            show_logs: true,
1419            show_streams: true,
1420            levels,
1421            stream_types,
1422        }
1423    }
1424    // Getters
1425    #[must_use]
1426    pub fn should_show_logs(&self) -> bool {
1427        self.show_logs
1428    }
1429    #[must_use]
1430    pub fn should_show_streams(&self) -> bool {
1431        self.show_streams
1432    }
1433    #[must_use]
1434    pub fn levels(&self) -> &Vec<LogLevel> {
1435        &self.levels
1436    }
1437    #[must_use]
1438    pub fn stream_types(&self) -> &Vec<LogStreamType> {
1439        &self.stream_types
1440    }
1441}
1442
1443#[derive(Debug, Clone)]
1444pub struct ExecutionWithStateRequestsResponses {
1445    pub execution_with_state: ExecutionWithState,
1446    pub events: Vec<ExecutionEvent>,
1447    pub responses: Vec<ResponseWithCursor>,
1448    pub max_version: Version,
1449    pub max_cursor: ResponseCursor,
1450}
1451
1452#[async_trait]
1453pub trait DbConnection: DbExecutor {
1454    /// Get execution log.
1455    async fn get(&self, execution_id: &ExecutionId) -> Result<ExecutionLog, DbErrorRead>;
1456
1457    async fn append_delay_response(
1458        &self,
1459        created_at: DateTime<Utc>,
1460        execution_id: ExecutionId,
1461        join_set_id: JoinSetId,
1462        delay_id: DelayId,
1463        outcome: Result<(), ()>, // Successfully finished - `Ok(())` or cancelled - `Err(())`
1464    ) -> Result<AppendDelayResponseOutcome, DbErrorWrite>;
1465
1466    /// Append a batch of events to an existing execution log, and append a response to a parent execution.
1467    /// The batch cannot contain [`ExecutionRequest::Created`].
1468    async fn append_batch(
1469        &self,
1470        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
1471        batch: Vec<AppendRequest>,
1472        execution_id: ExecutionId,
1473        version: Version,
1474    ) -> Result<AppendBatchResponse, DbErrorWrite>;
1475
1476    /// Append one or more events to the parent execution log, and create zero or more child execution logs.
1477    /// The batch cannot contain [`ExecutionRequest::Created`].
1478    async fn append_batch_create_new_execution(
1479        &self,
1480        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
1481        batch: Vec<AppendRequest>,   // must not contain [`ExecutionRequest::Created`] events
1482        execution_id: ExecutionId,
1483        version: Version,
1484        child_req: Vec<CreateRequest>,
1485        backtraces: Vec<BacktraceInfo>,
1486    ) -> Result<AppendBatchResponse, DbErrorWrite>;
1487
1488    /// Get a single event specified by version. Impls may set `ExecutionEvent::backtrace_id` to `None`.
1489    async fn get_execution_event(
1490        &self,
1491        execution_id: &ExecutionId,
1492        version: &Version,
1493    ) -> Result<ExecutionEvent, DbErrorRead>;
1494
1495    #[instrument(skip(self))]
1496    async fn get_create_request(
1497        &self,
1498        execution_id: &ExecutionId,
1499    ) -> Result<CreateRequest, DbErrorRead> {
1500        let execution_event = self
1501            .get_execution_event(execution_id, &Version::new(0))
1502            .await?;
1503        if let ExecutionRequest::Created {
1504            ffqn,
1505            params,
1506            parent,
1507            scheduled_at,
1508            component_id,
1509            deployment_id,
1510            metadata,
1511            scheduled_by,
1512        } = execution_event.event
1513        {
1514            Ok(CreateRequest {
1515                created_at: execution_event.created_at,
1516                execution_id: execution_id.clone(),
1517                ffqn,
1518                params,
1519                parent,
1520                scheduled_at,
1521                component_id,
1522                deployment_id,
1523                metadata,
1524                scheduled_by,
1525            })
1526        } else {
1527            Err(DbErrorRead::Generic(DbErrorGeneric::Uncategorized {
1528                reason: "execution log must start with creation".into(),
1529                context: SpanTrace::capture(),
1530                source: None,
1531                loc: Location::caller(),
1532            }))
1533        }
1534    }
1535
1536    async fn get_pending_state(
1537        &self,
1538        execution_id: &ExecutionId,
1539    ) -> Result<ExecutionWithState, DbErrorRead>;
1540
1541    /// Get currently expired locks and async timers (delay requests)
1542    async fn get_expired_timers(
1543        &self,
1544        at: DateTime<Utc>,
1545    ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric>;
1546
1547    /// Create a new execution log
1548    async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite>;
1549
1550    /// Notification mechainism with no strict guarantees for getting notified when a new response arrives.
1551    /// Parameter `start_idx` must be at most be equal to current size of responses in the execution log.
1552    /// If no response arrives immediately and `interrupt_after` resolves, `DbErrorReadWithTimeout::Timeout` is returned.
1553    /// Implementations with no pubsub support should use polling.
1554    /// Callers are expected to call this function in a loop with a reasonable timeout
1555    /// to support less stellar implementations.
1556    async fn subscribe_to_next_responses(
1557        &self,
1558        execution_id: &ExecutionId,
1559        last_response: ResponseCursor,
1560        timeout_fut: Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>,
1561    ) -> Result<Vec<ResponseWithCursor>, DbErrorReadWithTimeout>;
1562
1563    /// First, attempt to fetch the finished value. If the execution is not finished yet, poll
1564    /// periodically or subscribe to db changes, racing with `timeout_fut`.
1565    /// Notification mechainism with no strict guarantees for getting the finished result.
1566    /// Implementations with no pubsub support should use polling.
1567    /// Callers are expected to call this function in a loop with a reasonable timeout
1568    /// to support less stellar implementations.
1569    async fn wait_for_finished_result(
1570        &self,
1571        execution_id: &ExecutionId,
1572        timeout_fut: Option<Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>>,
1573    ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout>;
1574
1575    async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite>;
1576
1577    async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite>;
1578
1579    async fn append_log(&self, row: LogInfoAppendRow) -> Result<(), DbErrorWrite>;
1580
1581    async fn append_log_batch(&self, batch: &[LogInfoAppendRow]) -> Result<(), DbErrorWrite>;
1582
1583    /// Returns `TimeoutOutcome::Timeout` if not in Finished state.
1584    #[cfg(feature = "test")]
1585    async fn get_finished_result(
1586        &self,
1587        execution_id: &ExecutionId,
1588    ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout> {
1589        self.wait_for_finished_result(
1590            execution_id,
1591            Some(Box::pin(std::future::ready(TimeoutOutcome::Timeout))),
1592        )
1593        .await
1594    }
1595}
1596
1597#[derive(Clone, Debug)]
1598pub struct LogInfoAppendRow {
1599    pub execution_id: ExecutionId,
1600    pub run_id: RunId,
1601    pub log_entry: LogEntry,
1602}
1603
1604#[derive(Debug, Clone)]
1605pub struct LogEntryRow {
1606    pub cursor: u32,
1607    pub run_id: RunId,
1608    pub log_entry: LogEntry,
1609}
1610
1611#[derive(Debug, Clone)]
1612pub enum LogEntry {
1613    Log {
1614        created_at: DateTime<Utc>,
1615        level: LogLevel,
1616        message: String,
1617    },
1618    Stream {
1619        created_at: DateTime<Utc>,
1620        payload: Vec<u8>,
1621        stream_type: LogStreamType,
1622    },
1623}
1624impl LogEntry {
1625    #[must_use]
1626    pub fn created_at(&self) -> DateTime<Utc> {
1627        match self {
1628            LogEntry::Log { created_at, .. } | LogEntry::Stream { created_at, .. } => *created_at,
1629        }
1630    }
1631}
1632
1633#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::TryFrom, strum::EnumIter)]
1634#[try_from(repr)]
1635#[repr(u8)]
1636pub enum LogLevel {
1637    Trace = 1,
1638    Debug,
1639    Info,
1640    Warn,
1641    Error,
1642}
1643#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::TryFrom, strum::EnumIter)]
1644#[try_from(repr)]
1645#[repr(u8)]
1646pub enum LogStreamType {
1647    StdOut = 1,
1648    StdErr,
1649}
1650
/// Value a timeout future resolves with: an elapsed timeout or a cancellation.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TimeoutOutcome {
    Timeout,
    Cancel,
}
1656
/// Test-only extension of [`DbConnection`], available with the `test` feature.
#[cfg(feature = "test")]
#[async_trait]
pub trait DbConnectionTest: DbConnection {
    /// Append `response_event` to the responses of `execution_id`, stamped with `created_at`.
    async fn append_response(
        &self,
        created_at: DateTime<Utc>,
        execution_id: ExecutionId,
        response_event: JoinSetResponseEvent,
    ) -> Result<(), DbErrorWrite>;
}
1667
/// Result of a cancellation request (see `DbExecutor::cancel_activity` and `cancel_delay`).
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum CancelOutcome {
    /// The target is cancelled, either by this request or by an earlier one.
    Cancelled,
    /// The target had already finished, so it was not cancelled.
    AlreadyFinished,
}
1673
1674#[instrument(skip(db_connection))]
1675pub async fn stub_execution(
1676    db_connection: &dyn DbConnection,
1677    execution_id: ExecutionIdDerived,
1678    parent_execution_id: ExecutionId,
1679    join_set_id: JoinSetId,
1680    created_at: DateTime<Utc>,
1681    return_value: SupportedFunctionReturnValue,
1682) -> Result<(), DbErrorWrite> {
1683    let stub_finished_version = Version::new(1); // Stub activities have no execution log except Created event.
1684    // Attempt to write to `execution_id` and its parent, ignoring the possible conflict error on this tx
1685    let write_attempt = {
1686        let finished_req = AppendRequest {
1687            created_at,
1688            event: ExecutionRequest::Finished {
1689                retval: return_value.clone(),
1690                http_client_traces: None,
1691            },
1692        };
1693        db_connection
1694            .append_batch_respond_to_parent(
1695                AppendEventsToExecution {
1696                    execution_id: ExecutionId::Derived(execution_id.clone()),
1697                    version: stub_finished_version.clone(),
1698                    batch: vec![finished_req],
1699                },
1700                AppendResponseToExecution {
1701                    parent_execution_id,
1702                    created_at,
1703                    join_set_id,
1704                    child_execution_id: execution_id.clone(),
1705                    finished_version: stub_finished_version.clone(),
1706                    result: return_value.clone(),
1707                },
1708                created_at,
1709            )
1710            .await
1711    };
1712    if let Err(write_attempt) = write_attempt {
1713        // Check that the expected value is in the database
1714        debug!("Stub write attempt failed - {write_attempt:?}");
1715
1716        let found = db_connection
1717            .get_execution_event(&ExecutionId::Derived(execution_id), &stub_finished_version)
1718            .await?; // Not found at this point should not happen, unless the previous write failed. Will be retried.
1719        match found.event {
1720            ExecutionRequest::Finished {
1721                retval: found_result,
1722                ..
1723            } if return_value == found_result => {
1724                // Same value has already be written, RPC is successful.
1725                Ok(())
1726            }
1727            ExecutionRequest::Finished { .. } => Err(DbErrorWrite::NonRetriable(
1728                DbErrorWriteNonRetriable::Conflict,
1729            )),
1730            _other => Err(DbErrorWrite::NonRetriable(
1731                DbErrorWriteNonRetriable::IllegalState {
1732                    reason: "unexpected execution event at stubbed execution".into(),
1733                    context: SpanTrace::capture(),
1734                    source: None,
1735                    loc: Location::caller(),
1736                },
1737            )),
1738        }
1739    } else {
1740        Ok(())
1741    }
1742}
1743
1744pub async fn cancel_delay(
1745    db_connection: &dyn DbConnection,
1746    delay_id: DelayId,
1747    created_at: DateTime<Utc>,
1748) -> Result<CancelOutcome, DbErrorWrite> {
1749    let (parent_execution_id, join_set_id) = delay_id.split_to_parts();
1750    db_connection
1751        .append_delay_response(
1752            created_at,
1753            parent_execution_id,
1754            join_set_id,
1755            delay_id,
1756            Err(()), // Mark as cancelled.
1757        )
1758        .await
1759        .map(|ok| match ok {
1760            AppendDelayResponseOutcome::Success | AppendDelayResponseOutcome::AlreadyCancelled => {
1761                CancelOutcome::Cancelled
1762            }
1763            AppendDelayResponseOutcome::AlreadyFinished => CancelOutcome::AlreadyFinished,
1764        })
1765}
1766
/// Selects which stored WASM backtrace of an execution to fetch.
///
/// NOTE(review): "first"/"last" presumably refer to ordering by execution-log
/// version — confirm against the database implementation.
#[derive(Clone, Debug)]
pub enum BacktraceFilter {
    /// The earliest backtrace.
    First,
    /// The latest backtrace.
    Last,
    /// The backtrace associated with the given version (presumably one whose
    /// `version_min_including..version_max_excluding` range contains it — TODO confirm).
    Specific(Version),
}
1773
/// A captured WASM backtrace with the execution, component and version range it belongs to.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BacktraceInfo {
    pub execution_id: ExecutionId,
    pub component_id: ComponentId,
    /// Inclusive lower bound of the version range this backtrace is associated with.
    pub version_min_including: Version,
    /// Exclusive upper bound of the version range this backtrace is associated with.
    pub version_max_excluding: Version,
    pub wasm_backtrace: WasmBacktrace,
}
1782
/// Serializable mirror of `wasmtime::WasmBacktrace`.
///
/// Built via `WasmBacktrace::maybe_from`, which returns `None` for an empty backtrace.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, schemars::JsonSchema)]
pub struct WasmBacktrace {
    /// Stack frames, in the order reported by `wasmtime`.
    pub frames: Vec<FrameInfo>,
}

/// Serializable mirror of a single `wasmtime::FrameInfo` stack frame.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, schemars::JsonSchema)]
pub struct FrameInfo {
    /// Module name, or `"<unknown>"` when the module has none.
    pub module: String,
    /// Demangled function name (or a name derived from the function index when unnamed).
    pub func_name: String,
    /// Symbol information attached to this frame; may be empty.
    pub symbols: Vec<FrameSymbol>,
}

/// Serializable mirror of `wasmtime::FrameSymbol`; every field is optional.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, schemars::JsonSchema)]
pub struct FrameSymbol {
    /// Demangled symbol name.
    pub func_name: Option<String>,
    /// Source file path.
    pub file: Option<String>,
    /// Source line number.
    pub line: Option<u32>,
    /// Source column number.
    pub col: Option<u32>,
}
1802
1803mod wasm_backtrace {
1804    use super::{FrameInfo, FrameSymbol, WasmBacktrace};
1805
1806    impl WasmBacktrace {
1807        pub fn maybe_from(backtrace: &wasmtime::WasmBacktrace) -> Option<Self> {
1808            if backtrace.frames().is_empty() {
1809                None
1810            } else {
1811                Some(Self {
1812                    frames: backtrace.frames().iter().map(FrameInfo::from).collect(),
1813                })
1814            }
1815        }
1816    }
1817
1818    impl From<&wasmtime::FrameInfo> for FrameInfo {
1819        fn from(frame: &wasmtime::FrameInfo) -> Self {
1820            let module_name = frame.module().name().unwrap_or("<unknown>").to_string();
1821            let mut func_name = String::new();
1822            wasmtime_environ::demangle_function_name_or_index(
1823                &mut func_name,
1824                frame.func_name(),
1825                frame.func_index() as usize,
1826            )
1827            .expect("writing to string must succeed");
1828            Self {
1829                module: module_name,
1830                func_name,
1831                symbols: frame
1832                    .symbols()
1833                    .iter()
1834                    .map(std::convert::Into::into)
1835                    .collect(),
1836            }
1837        }
1838    }
1839
1840    impl From<&wasmtime::FrameSymbol> for FrameSymbol {
1841        fn from(symbol: &wasmtime::FrameSymbol) -> Self {
1842            let func_name = symbol.name().map(|name| {
1843                let mut writer = String::new();
1844                wasmtime_environ::demangle_function_name(&mut writer, name)
1845                    .expect("writing to string must succeed");
1846                writer
1847            });
1848
1849            Self {
1850                func_name,
1851                file: symbol.file().map(ToString::to_string),
1852                line: symbol.line(),
1853                col: symbol.column(),
1854            }
1855        }
1856    }
1857}
/// Summary of one execution: its identity, target function, current pending state and
/// the component/deployment it is associated with.
///
/// `Display` renders `"{execution_id} {pending_state} {component_digest}"`.
#[derive(Debug, Clone, derive_more::Display)]
#[display("{execution_id} {pending_state} {component_digest}")]
pub struct ExecutionWithState {
    pub execution_id: ExecutionId,
    pub ffqn: FunctionFqn,
    pub pending_state: PendingState,
    pub created_at: DateTime<Utc>,
    // NOTE(review): presumably the originally requested schedule time, as opposed to
    // any later rescheduling — confirm.
    pub first_scheduled_at: DateTime<Utc>,
    pub component_digest: ComponentDigest,
    pub component_type: ComponentType,
    pub deployment_id: DeploymentId,
}
1870
1871#[derive(Debug, Clone)]
1872pub enum ExecutionListPagination {
1873    CreatedBy(Pagination<Option<DateTime<Utc>>>),
1874    ExecutionId(Pagination<Option<ExecutionId>>),
1875}
1876impl Default for ExecutionListPagination {
1877    fn default() -> ExecutionListPagination {
1878        ExecutionListPagination::CreatedBy(Pagination::OlderThan {
1879            length: 20,
1880            cursor: None,
1881            including_cursor: false, // does not matter when `cursor` is not specified
1882        })
1883    }
1884}
1885impl ExecutionListPagination {
1886    #[must_use]
1887    pub fn length(&self) -> u16 {
1888        match self {
1889            ExecutionListPagination::CreatedBy(pagination) => pagination.length(),
1890            ExecutionListPagination::ExecutionId(pagination) => pagination.length(),
1891        }
1892    }
1893}
1894
/// Cursor-based page request over items ordered by a key of type `T`.
///
/// `NewerThan` walks the key in ascending order, starting after (or at) `cursor`.
/// `OlderThan` walks it in descending order, starting before (or at) `cursor`.
/// The comparison operator and sort direction are exposed as string fragments via
/// [`Pagination::rel`] and [`Pagination::asc_or_desc`]. These are presumably spliced
/// into SQL queries by the database backends.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Pagination<T> {
    /// Items with a key greater than `cursor` (or equal, if `including_cursor`), ascending.
    NewerThan {
        /// Requested number of items.
        length: u16,
        /// Key to start from.
        cursor: T,
        /// Whether an item whose key equals `cursor` is included (`>=` instead of `>`).
        including_cursor: bool,
    },
    /// Items with a key less than `cursor` (or equal, if `including_cursor`), descending.
    OlderThan {
        /// Requested number of items.
        length: u16,
        /// Key to start from.
        cursor: T,
        /// Whether an item whose key equals `cursor` is included (`<=` instead of `<`).
        including_cursor: bool,
    },
}

// No `T: Clone` bound on the impl: only `invert` needs to clone the cursor, so the
// bound lives on that method alone and the read-only accessors work for any `T`.
impl<T> Pagination<T> {
    /// Requested number of items.
    #[must_use]
    pub fn length(&self) -> u16 {
        match self {
            Pagination::NewerThan { length, .. } | Pagination::OlderThan { length, .. } => *length,
        }
    }

    /// Comparison operator between an item's key and the cursor: `>`, `>=`, `<` or `<=`.
    #[must_use]
    pub fn rel(&self) -> &'static str {
        match self {
            Pagination::NewerThan {
                including_cursor: false,
                ..
            } => ">",
            Pagination::NewerThan {
                including_cursor: true,
                ..
            } => ">=",
            Pagination::OlderThan {
                including_cursor: false,
                ..
            } => "<",
            Pagination::OlderThan {
                including_cursor: true,
                ..
            } => "<=",
        }
    }

    /// `true` for `OlderThan`, which orders results in descending key order.
    #[must_use]
    pub fn is_desc(&self) -> bool {
        matches!(self, Pagination::OlderThan { .. })
    }

    /// Sort direction as `"asc"` or `"desc"`.
    #[must_use]
    pub fn asc_or_desc(&self) -> &'static str {
        if self.is_asc() { "asc" } else { "desc" }
    }

    /// `true` for `NewerThan`, which orders results in ascending key order.
    #[must_use]
    pub fn is_asc(&self) -> bool {
        !self.is_desc()
    }

    /// Borrows the cursor regardless of direction.
    #[must_use]
    pub fn cursor(&self) -> &T {
        match self {
            Pagination::NewerThan { cursor, .. } | Pagination::OlderThan { cursor, .. } => cursor,
        }
    }

    /// Returns the opposite direction with `including_cursor` flipped.
    ///
    /// The inverted request selects exactly the complementary side of the cursor.
    /// For example, `>` becomes `<=`. `length` and `cursor` are kept.
    #[must_use]
    pub fn invert(&self) -> Self
    where
        T: Clone,
    {
        match self {
            Pagination::NewerThan {
                length,
                cursor,
                including_cursor,
            } => Pagination::OlderThan {
                length: *length,
                cursor: cursor.clone(),
                including_cursor: !including_cursor,
            },
            Pagination::OlderThan {
                length,
                cursor,
                including_cursor,
            } => Pagination::NewerThan {
                length: *length,
                cursor: cursor.clone(),
                including_cursor: !including_cursor,
            },
        }
    }
}
1978
/// Test helper: polls the execution log of `execution_id` until `predicate` returns `Some`.
///
/// The log is re-read every 10 ms. With `timeout` set, the wait is abandoned once it
/// elapses. Without a timeout, polling continues indefinitely.
///
/// # Errors
/// Propagates read errors from `get`. Returns
/// `DbErrorReadWithTimeout::Timeout(TimeoutOutcome::Timeout)` when the timeout elapses first.
#[cfg(feature = "test")]
pub async fn wait_for_pending_state_fn<T: Debug>(
    db_connection: &dyn DbConnectionTest,
    execution_id: &ExecutionId,
    predicate: impl Fn(ExecutionLog) -> Option<T> + Send,
    timeout: Option<Duration>,
) -> Result<T, DbErrorReadWithTimeout> {
    const POLL_INTERVAL: Duration = Duration::from_millis(10);

    tracing::trace!(%execution_id, "Waiting for predicate");
    let poll_until_found = async move {
        loop {
            let log = db_connection.get(execution_id).await?;
            match predicate(log) {
                Some(found) => {
                    tracing::debug!(%execution_id, "Found: {found:?}");
                    return Ok(found);
                }
                None => tokio::time::sleep(POLL_INTERVAL).await,
            }
        }
    };

    match timeout {
        None => poll_until_found.await,
        Some(timeout) => tokio::select! { // future's liveness: Dropping the loser immediately.
            res = poll_until_found => res,
            () = tokio::time::sleep(timeout) => Err(DbErrorReadWithTimeout::Timeout(TimeoutOutcome::Timeout))
        },
    }
}
2007
/// A timer whose deadline has passed: either an execution lock or a delay.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExpiredTimer {
    /// An execution lock that expired.
    Lock(ExpiredLock),
    /// A delay that expired.
    Delay(ExpiredDelay),
}

/// Details of an expired execution lock, including the retry configuration needed
/// to decide what happens next.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExpiredLock {
    pub execution_id: ExecutionId,
    /// Version of last `Locked` event, used to detect whether the execution made progress.
    pub locked_at_version: Version,
    pub next_version: Version,
    /// As the execution may still be running, this represents the number of intermittent failures + timeouts prior to this execution.
    pub intermittent_event_count: u32,
    // NOTE(review): `None` presumably means "no retry limit" — confirm.
    pub max_retries: Option<u32>,
    // NOTE(review): presumably the base delay of an exponential backoff — confirm.
    pub retry_exp_backoff: Duration,
    /// The executor run that held the expired lock.
    pub locked_by: LockedBy,
}

/// An expired delay, identified by its `DelayId` and the join set it was created in.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExpiredDelay {
    // NOTE(review): presumably the parent execution that owns `join_set_id` — confirm.
    pub execution_id: ExecutionId,
    pub join_set_id: JoinSetId,
    pub delay_id: DelayId,
}
2033
2034#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, schemars::JsonSchema)]
2035#[serde(tag = "status", rename_all = "snake_case")]
2036pub enum PendingState {
2037    /// Caused by [`ExecutionRequest::Locked`].
2038    Locked(PendingStateLocked),
2039
2040    #[display("PendingAt(`{_0}`)")]
2041    PendingAt(PendingStatePendingAt),
2042
2043    /// Caused by [`HistoryEvent::JoinNext`]
2044    #[display("BlockedByJoinSet({_0})")]
2045    BlockedByJoinSet(PendingStateBlockedByJoinSet),
2046
2047    #[display("Paused({_0})")]
2048    Paused(PendingStatePaused),
2049
2050    #[display("Finished({_0})")]
2051    Finished(PendingStateFinished),
2052}
2053
/// View of [`PendingState`] in which `Paused` is folded into a `paused` flag on the
/// underlying (pre-pause) state. Built via `From<PendingState>`.
pub enum PendingStateMergedPause {
    /// Locked; `paused` is `true` if it was paused while locked.
    Locked {
        state: PendingStateLocked,
        paused: bool,
    },
    /// Pending at a scheduled time; `paused` is `true` if it was paused in this state.
    PendingAt {
        state: PendingStatePendingAt,
        paused: bool,
    },
    /// Blocked by a join set; `paused` is `true` if it was paused in this state.
    BlockedByJoinSet {
        state: PendingStateBlockedByJoinSet,
        paused: bool,
    },
    /// Finished executions cannot be paused, so no flag is carried.
    Finished(PendingStateFinished),
}
2069impl From<PendingState> for PendingStateMergedPause {
2070    fn from(state: PendingState) -> Self {
2071        match state {
2072            PendingState::Locked(s) => PendingStateMergedPause::Locked {
2073                state: s,
2074                paused: false,
2075            },
2076
2077            PendingState::PendingAt(s) => PendingStateMergedPause::PendingAt {
2078                state: s,
2079                paused: false,
2080            },
2081
2082            PendingState::BlockedByJoinSet(s) => PendingStateMergedPause::BlockedByJoinSet {
2083                state: s,
2084                paused: false,
2085            },
2086
2087            PendingState::Paused(paused) => match paused {
2088                PendingStatePaused::Locked(s) => PendingStateMergedPause::Locked {
2089                    state: s,
2090                    paused: true,
2091                },
2092                PendingStatePaused::PendingAt(s) => PendingStateMergedPause::PendingAt {
2093                    state: s,
2094                    paused: true,
2095                },
2096                PendingStatePaused::BlockedByJoinSet(s) => {
2097                    PendingStateMergedPause::BlockedByJoinSet {
2098                        state: s,
2099                        paused: true,
2100                    }
2101                }
2102            },
2103
2104            PendingState::Finished(s) => PendingStateMergedPause::Finished(s),
2105        }
2106    }
2107}
2108
/// The execution is locked by an executor run until `lock_expires_at`.
///
/// `Display` renders `Locked(`{lock_expires_at}`, {executor_id}, {run_id})`.
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, schemars::JsonSchema)]
#[display("Locked(`{lock_expires_at}`, {}, {})", locked_by.executor_id, locked_by.run_id)]
pub struct PendingStateLocked {
    pub locked_by: LockedBy,
    pub lock_expires_at: DateTime<Utc>,
}

/// The execution may be locked once `scheduled_at` is not later than the lock time.
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, schemars::JsonSchema)]
#[display("`{scheduled_at}`, last_lock={last_lock:?}")]
pub struct PendingStatePendingAt {
    pub scheduled_at: DateTime<Utc>,
    /// `last_lock` is needed for lock extension.
    pub last_lock: Option<LockedBy>,
}

/// The execution is waiting on a join set.
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, schemars::JsonSchema)]
#[display("{join_set_id}, `{lock_expires_at}`, closing={closing}")]
pub struct PendingStateBlockedByJoinSet {
    pub join_set_id: JoinSetId,
    /// See [`HistoryEvent::JoinNext::lock_expires_at`].
    pub lock_expires_at: DateTime<Utc>,
    /// Blocked by closing of the join set
    pub closing: bool,
}
2133
2134/// State of execution before it was paused.
2135#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, schemars::JsonSchema)]
2136pub enum PendingStatePaused {
2137    #[display("Locked({_0})")]
2138    Locked(PendingStateLocked),
2139    #[display("PendingAt({_0})")]
2140    PendingAt(PendingStatePendingAt),
2141    #[display("BlockedByJoinSet({_0})")]
2142    BlockedByJoinSet(PendingStateBlockedByJoinSet),
2143}
2144
/// Identifies the executor and run that hold, or last held, a lock.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
pub struct LockedBy {
    pub executor_id: ExecutorId,
    pub run_id: RunId,
}
2150impl From<&Locked> for LockedBy {
2151    fn from(value: &Locked) -> Self {
2152        LockedBy {
2153            executor_id: value.executor_id,
2154            run_id: value.run_id,
2155        }
2156    }
2157}
2158
/// Summary of a finished execution.
///
/// `Deserialize` is derived only for tests (`test` cfg or the `test` feature).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, schemars::JsonSchema)]
#[cfg_attr(any(test, feature = "test"), derive(Deserialize))]
pub struct PendingStateFinished {
    // NOTE(review): presumably the version of the `Finished` event — confirm.
    pub version: VersionType, // not Version since it must be Copy
    pub finished_at: DateTime<Utc>,
    pub result_kind: PendingStateFinishedResultKind,
}
2166impl Display for PendingStateFinished {
2167    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2168        match self.result_kind {
2169            PendingStateFinishedResultKind::Ok => write!(f, "ok"),
2170            PendingStateFinishedResultKind::Err(err) => write!(f, "{err}"),
2171        }
2172    }
2173}
2174
// This is not a Result so that it can be customized for serialization
/// Outcome of a finished execution; use `as_result` for a `Result` view.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
#[serde(rename_all = "snake_case")]
pub enum PendingStateFinishedResultKind {
    /// The execution finished successfully.
    Ok,
    /// The execution finished with the given error kind.
    Err(PendingStateFinishedError),
}
2182impl PendingStateFinishedResultKind {
2183    pub fn as_result(&self) -> Result<(), &PendingStateFinishedError> {
2184        match self {
2185            PendingStateFinishedResultKind::Ok => Ok(()),
2186            PendingStateFinishedResultKind::Err(err) => Err(err),
2187        }
2188    }
2189}
2190
2191impl From<&SupportedFunctionReturnValue> for PendingStateFinishedResultKind {
2192    fn from(result: &SupportedFunctionReturnValue) -> Self {
2193        result.as_pending_state_finished_result()
2194    }
2195}
2196
/// Why a finished execution did not succeed.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    derive_more::Display,
    schemars::JsonSchema,
)]
#[serde(rename_all = "snake_case")]
pub enum PendingStateFinishedError {
    /// The execution was terminated; the payload says how.
    #[display("execution terminated: {_0}")]
    ExecutionFailure(ExecutionFailureKind),
    /// The execution completed normally but returned an error value.
    #[display("execution completed with an error")]
    Error,
}
2215
impl PendingState {
    /// Checks whether a `Locked` event may be appended in the current state.
    ///
    /// The call is recorded on a `tracing` span with every argument except `self`.
    ///
    /// # Returns
    /// - [`LockKind::CreatingNewLock`] when the execution is `PendingAt` and its
    ///   `scheduled_at` is not after `created_at`.
    /// - [`LockKind::Extending`] when the same `executor_id` + `run_id` either currently
    ///   holds the lock (`Locked`), or is the `last_lock` of a `PendingAt` state that is
    ///   not yet due.
    ///
    /// # Errors
    /// - [`DbErrorWriteNonRetriable::ValidationFailed`] if `lock_expires_at <= created_at`,
    ///   or if the execution is `PendingAt` in the future and the caller is not its
    ///   last lock holder.
    /// - [`DbErrorWriteNonRetriable::IllegalState`] if the execution is locked by a
    ///   different executor/run, blocked by a join set, finished, or paused.
    #[instrument(skip(self))]
    pub fn can_append_lock(
        &self,
        created_at: DateTime<Utc>,
        executor_id: ExecutorId,
        run_id: RunId,
        lock_expires_at: DateTime<Utc>,
    ) -> Result<LockKind, DbErrorWriteNonRetriable> {
        // A lock must expire strictly after it is taken.
        if lock_expires_at <= created_at {
            return Err(DbErrorWriteNonRetriable::ValidationFailed(
                "invalid expiry date".into(),
            ));
        }
        match self {
            PendingState::PendingAt(PendingStatePendingAt {
                scheduled_at,
                last_lock,
            }) => {
                // Checked first: once due, the lock is treated as new even if `last_lock`
                // matches the caller.
                if *scheduled_at <= created_at {
                    // pending now, ok to lock
                    Ok(LockKind::CreatingNewLock)
                } else if let Some(LockedBy {
                    executor_id: last_executor_id,
                    run_id: last_run_id,
                }) = last_lock
                    && executor_id == *last_executor_id
                    && run_id == *last_run_id
                {
                    // Original executor is extending the lock.
                    Ok(LockKind::Extending)
                } else {
                    Err(DbErrorWriteNonRetriable::ValidationFailed(
                        "cannot lock, not yet pending".into(),
                    ))
                }
            }
            PendingState::Locked(PendingStateLocked {
                locked_by:
                    LockedBy {
                        executor_id: current_pending_state_executor_id,
                        run_id: current_pending_state_run_id,
                    },
                lock_expires_at: _,
            }) => {
                // Only the exact executor + run holding the lock may re-lock; the
                // current expiry is not consulted here.
                if executor_id == *current_pending_state_executor_id
                    && run_id == *current_pending_state_run_id
                {
                    // Original executor is extending the lock.
                    Ok(LockKind::Extending)
                } else {
                    Err(DbErrorWriteNonRetriable::IllegalState {
                        reason: "cannot lock, already locked".into(),
                        context: SpanTrace::capture(),
                        source: None,
                        loc: Location::caller(),
                    })
                }
            }
            PendingState::BlockedByJoinSet { .. } => Err(DbErrorWriteNonRetriable::IllegalState {
                reason: "cannot append Locked event when in BlockedByJoinSet state".into(),
                context: SpanTrace::capture(),
                source: None,
                loc: Location::caller(),
            }),
            PendingState::Finished { .. } => Err(DbErrorWriteNonRetriable::IllegalState {
                reason: "already finished".into(),
                context: SpanTrace::capture(),
                source: None,
                loc: Location::caller(),
            }),
            PendingState::Paused(..) => Err(DbErrorWriteNonRetriable::IllegalState {
                reason: "cannot lock, execution is paused".into(),
                context: SpanTrace::capture(),
                source: None,
                loc: Location::caller(),
            }),
        }
    }

    /// `true` if the execution is in the `Finished` state.
    #[must_use]
    pub fn is_finished(&self) -> bool {
        matches!(self, PendingState::Finished { .. })
    }

    /// `true` if the execution is in the `Paused` state (regardless of the pre-pause state).
    #[must_use]
    pub fn is_paused(&self) -> bool {
        matches!(self, PendingState::Paused(_))
    }
}
2306
/// Kind of lock acquisition accepted by `PendingState::can_append_lock`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LockKind {
    /// The same executor + run is extending a lock it holds (or last held).
    Extending,
    /// A fresh lock on an execution that is due.
    CreatingNewLock,
}
2312
/// Serializable traces of outgoing HTTP client requests.
pub mod http_client_trace {
    use chrono::{DateTime, Utc};
    use serde::{Deserialize, Serialize};

    /// One HTTP request and, if one was received, its response.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
    pub struct HttpClientTrace {
        pub req: RequestTrace,
        /// `None` when no response was recorded.
        pub resp: Option<ResponseTrace>,
    }

    /// The request side of an HTTP trace.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
    pub struct RequestTrace {
        pub sent_at: DateTime<Utc>,
        pub uri: String,
        pub method: String,
    }

    /// The response side of an HTTP trace.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
    pub struct ResponseTrace {
        pub finished_at: DateTime<Utc>,
        // NOTE(review): presumably `Ok` holds the HTTP status code and `Err` an error
        // description when the request failed — confirm at the recording site.
        pub status: Result<u16, String>,
    }
}
2336
/// Root type for DB schema generation - contains all serialized DB types
///
/// Only `schemars::JsonSchema` is derived; each field makes its type reachable from
/// this single schema root.
#[derive(schemars::JsonSchema)]
pub struct DbStorageSchema {
    pub execution_event: ExecutionEvent,
    pub pending_state: PendingState,
    pub join_set_response: JoinSetResponse,
    pub wasm_backtrace: WasmBacktrace,
}
2345
#[cfg(test)]
mod tests {
    // Serialization round-trips, schedule-time conversions and stub return-value hashing.
    use super::HistoryEvent;
    use super::HistoryEventScheduleAt;
    use super::JoinNextTryOutcome;
    use super::PendingStateFinished;
    use super::PendingStateFinishedError;
    use super::PendingStateFinishedResultKind;
    use crate::ExecutionFailureKind;
    use crate::JoinSetId;
    use crate::SupportedFunctionReturnValue;
    use chrono::DateTime;
    use chrono::Datelike;
    use chrono::Utc;
    use insta::assert_snapshot;
    use rstest::rstest;
    use std::time::Duration;
    use val_json::type_wrapper::TypeWrapper;
    use val_json::wast_val::WastVal;
    use val_json::wast_val::WastValWithType;

    // JSON round-trip of both result kinds.
    #[rstest(expected => [
        PendingStateFinishedResultKind::Ok,
        PendingStateFinishedResultKind::Err(PendingStateFinishedError::ExecutionFailure(ExecutionFailureKind::TimedOut)),
    ])]
    #[test]
    fn serde_pending_state_finished_result_kind_should_work(
        expected: PendingStateFinishedResultKind,
    ) {
        let ser = serde_json::to_string(&expected).unwrap();
        let actual: PendingStateFinishedResultKind = serde_json::from_str(&ser).unwrap();
        assert_eq!(expected, actual);
    }

    // JSON round-trip of `PendingStateFinished` (its `Deserialize` exists only under test).
    #[rstest(result_kind => [
        PendingStateFinishedResultKind::Ok,
        PendingStateFinishedResultKind::Err(PendingStateFinishedError::ExecutionFailure(ExecutionFailureKind::TimedOut)),
    ])]
    #[test]
    fn serde_pending_state_finished_should_work(result_kind: PendingStateFinishedResultKind) {
        let expected = PendingStateFinished {
            version: 0,
            finished_at: Utc::now(),
            result_kind,
        };

        let ser = serde_json::to_string(&expected).unwrap();
        let actual: PendingStateFinished = serde_json::from_str(&ser).unwrap();
        assert_eq!(expected, actual);
    }

    // `result<option<string>, string>` returning `ok(none)` must survive a JSON round-trip;
    // the serialized form is pinned by an insta snapshot named after this test.
    #[test]
    fn join_set_deser_with_result_ok_option_none_should_work() {
        let expected = SupportedFunctionReturnValue::Ok(Some(WastValWithType {
            r#type: TypeWrapper::Result {
                ok: Some(Box::new(TypeWrapper::Option(Box::new(TypeWrapper::String)))),
                err: Some(Box::new(TypeWrapper::String)),
            },
            value: WastVal::Result(Ok(Some(Box::new(WastVal::Option(None))))),
        }));
        let json = serde_json::to_string(&expected).unwrap();
        assert_snapshot!(json);

        let actual: SupportedFunctionReturnValue = serde_json::from_str(&json).unwrap();

        assert_eq!(expected, actual);
    }

    // u32::MAX seconds after the Unix epoch lands in 2106.
    #[test]
    fn as_date_time_should_work_with_duration_u32_max_secs() {
        let duration = Duration::from_secs(u64::from(u32::MAX));
        let schedule_at = HistoryEventScheduleAt::In(duration);
        let resolved = schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap();
        assert_eq!(2106, resolved.year());
    }

    // chrono's `TimeDelta` is bounded by i64::MAX milliseconds.
    const MILLIS_PER_SEC: i64 = 1000;
    const TIMEDELTA_MAX_SECS: i64 = i64::MAX / MILLIS_PER_SEC;

    #[test]
    fn as_date_time_should_fail_on_duration_secs_greater_than_i64_max() {
        // Fails on duration -> timedelta conversion, but a smaller duration can fail on datetime + timedelta
        let duration = Duration::from_secs(
            u64::try_from(TIMEDELTA_MAX_SECS).expect("positive number must not fail") + 1,
        );
        let schedule_at = HistoryEventScheduleAt::In(duration);
        schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap_err();
    }

    // Deserializing the `outcome` field of `join_next_try`.
    #[test]
    fn join_next_try_outcome_new_format() {
        let json = r#"{"type":"join_next_try","join_set_id":"n:test","outcome":"found"}"#;
        let event: HistoryEvent = serde_json::from_str(json).unwrap();
        assert_eq!(
            event,
            HistoryEvent::JoinNextTry {
                join_set_id: JoinSetId::new(
                    crate::JoinSetKind::Named,
                    crate::StrVariant::Static("test")
                )
                .unwrap(),
                outcome: JoinNextTryOutcome::Found,
            }
        );

        let json = r#"{"type":"join_next_try","join_set_id":"n:test","outcome":"all_processed"}"#;
        let event: HistoryEvent = serde_json::from_str(json).unwrap();
        assert_eq!(
            event,
            HistoryEvent::JoinNextTry {
                join_set_id: JoinSetId::new(
                    crate::JoinSetKind::Named,
                    crate::StrVariant::Static("test")
                )
                .unwrap(),
                outcome: JoinNextTryOutcome::AllProcessed,
            }
        );
    }

    // Serialization emits `outcome`, never the legacy `found_response` field.
    #[test]
    fn join_next_try_outcome_serializes_new_format() {
        let event = HistoryEvent::JoinNextTry {
            join_set_id: JoinSetId::new(
                crate::JoinSetKind::Named,
                crate::StrVariant::Static("test"),
            )
            .unwrap(),
            outcome: JoinNextTryOutcome::AllProcessed,
        };
        let json = serde_json::to_string(&event).unwrap();
        assert!(
            json.contains(r#""outcome":"all_processed""#),
            "expected outcome field, got: {json}"
        );
        assert!(
            !json.contains("found_response"),
            "should not contain old field, got: {json}"
        );
    }

    // Properties of `StubRetVal::hash`: version prefix, length, uniqueness, determinism
    // and string/serde round-trips of `StubRetValHash`.
    mod stub_retval_hash {
        use super::super::{StubRetVal, StubRetValHash};
        use crate::SupportedFunctionReturnValue;
        use val_json::type_wrapper::TypeWrapper;
        use val_json::wast_val::{WastVal, WastValWithType};

        #[test]
        fn typed_variant_hash_is_stable() {
            let retval =
                StubRetVal::Typed(SupportedFunctionReturnValue::Ok(Some(WastValWithType {
                    r#type: TypeWrapper::String,
                    value: WastVal::String("hello".into()),
                })));
            let hash = retval.hash();
            // Hash should start with version byte 0x01
            assert_eq!(hash.to_string().chars().take(2).collect::<String>(), "01");
            // Hash should be 66 hex characters (33 bytes * 2)
            assert_eq!(hash.to_string().len(), 66);
        }

        #[test]
        fn untyped_variant_hash_is_stable() {
            let retval = StubRetVal::Untyped(r#"{"ok": "hello"}"#.to_string());
            let hash = retval.hash();
            // Hash should start with version byte 0x01
            assert_eq!(hash.to_string().chars().take(2).collect::<String>(), "01");
            // Hash should be 66 hex characters (33 bytes * 2)
            assert_eq!(hash.to_string().len(), 66);
        }

        #[test]
        fn different_values_produce_different_hashes() {
            let typed1 = StubRetVal::Typed(SupportedFunctionReturnValue::Ok(None));
            let typed2 = StubRetVal::Typed(SupportedFunctionReturnValue::Err(None));
            let untyped1 = StubRetVal::Untyped("value1".to_string());
            let untyped2 = StubRetVal::Untyped("value2".to_string());

            let hashes: Vec<_> = [typed1, typed2, untyped1, untyped2]
                .into_iter()
                .map(|r| r.hash().to_string())
                .collect();

            // All hashes should be unique
            for (i, h1) in hashes.iter().enumerate() {
                for h2 in hashes.iter().skip(i + 1) {
                    assert_ne!(h1, h2, "hashes should be different");
                }
            }
        }

        #[test]
        fn same_values_produce_same_hashes() {
            let retval1 = StubRetVal::Typed(SupportedFunctionReturnValue::Ok(None));
            let retval2 = StubRetVal::Typed(SupportedFunctionReturnValue::Ok(None));
            assert_eq!(retval1.hash(), retval2.hash());

            let untyped1 = StubRetVal::Untyped("test".to_string());
            let untyped2 = StubRetVal::Untyped("test".to_string());
            assert_eq!(untyped1.hash(), untyped2.hash());
        }

        #[test]
        fn hash_serialization_roundtrip() {
            let retval = StubRetVal::Typed(SupportedFunctionReturnValue::Ok(None));
            let hash = retval.hash();

            let serialized = serde_json::to_string(&hash).unwrap();
            let deserialized: StubRetValHash = serde_json::from_str(&serialized).unwrap();

            assert_eq!(hash, deserialized);
        }

        #[test]
        fn hash_display_and_fromstr_roundtrip() {
            let retval = StubRetVal::Untyped("test value".to_string());
            let hash = retval.hash();

            let display = hash.to_string();
            let parsed: StubRetValHash = display.parse().unwrap();

            assert_eq!(hash, parsed);
        }

        #[test]
        fn typed_and_untyped_with_same_content_produce_different_hashes() {
            // Even if the JSON content is the same, Typed vs Untyped should hash differently
            let typed = StubRetVal::Typed(SupportedFunctionReturnValue::Ok(None));
            let json_of_typed =
                serde_json::to_string(&SupportedFunctionReturnValue::Ok(None)).unwrap();
            let untyped = StubRetVal::Untyped(json_of_typed);

            assert_ne!(typed.hash(), untyped.hash());
        }
    }
}