Skip to main content

obeli_sk_concepts/
storage.rs

1use crate::ComponentId;
2use crate::ComponentRetryConfig;
3use crate::ComponentType;
4use crate::ExecutionFailureKind;
5use crate::ExecutionId;
6use crate::ExecutionMetadata;
7use crate::FinishedExecutionError;
8use crate::FunctionFqn;
9use crate::JoinSetId;
10use crate::Params;
11use crate::StrVariant;
12use crate::SupportedFunctionReturnValue;
13use crate::component_id::ComponentDigest;
14use crate::prefixed_ulid::DelayId;
15use crate::prefixed_ulid::DeploymentId;
16use crate::prefixed_ulid::ExecutionIdDerived;
17use crate::prefixed_ulid::ExecutorId;
18use crate::prefixed_ulid::RunId;
19use assert_matches::assert_matches;
20use async_trait::async_trait;
21use chrono::TimeDelta;
22use chrono::{DateTime, Utc};
23use http_client_trace::HttpClientTrace;
24use serde::Deserialize;
25use serde::Serialize;
26use std::fmt::Debug;
27use std::fmt::Display;
28use std::panic::Location;
29use std::pin::Pin;
30use std::sync::Arc;
31use std::time::Duration;
32use tracing::debug;
33use tracing::instrument;
34use tracing_error::SpanTrace;
35
// Shared between databases. TODO: Extract to db-common
/// Name of the "pending at" pending state, shared between database implementations.
pub const STATE_PENDING_AT: &str = "pending_at";
/// Name of the "blocked by join set" pending state, shared between database implementations.
pub const STATE_BLOCKED_BY_JOIN_SET: &str = "blocked_by_join_set";
/// Name of the "locked" pending state, shared between database implementations.
pub const STATE_LOCKED: &str = "locked";
/// Name of the "finished" pending state, shared between database implementations.
pub const STATE_FINISHED: &str = "finished";
/// Must stay equal to the serde `type` tag that `rename_all = "snake_case"` produces for
/// `HistoryEvent::JoinNext`.
pub const HISTORY_EVENT_TYPE_JOIN_NEXT: &str = "join_next"; // Serialization tag of `HistoryEvent::JoinNext`
42
/// Full view of one execution: its appended events, the join set responses it received,
/// and the state derived from them.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct ExecutionLog {
    pub execution_id: ExecutionId,
    /// Appended events. The accessor methods assert that the first one is
    /// `ExecutionRequest::Created`; the last one is treated as the most recent event.
    pub events: Vec<ExecutionEvent>,
    /// Join set responses delivered to this execution, each paired with its cursor.
    pub responses: Vec<ResponseWithCursor>,
    pub next_version: Version, // Is not advanced once in Finished state
    pub pending_state: PendingState, // reflecting the current state
    pub component_digest: ComponentDigest, // reflecting the current state
    pub component_type: ComponentType,
    pub deployment_id: DeploymentId, // reflecting the current state
}
54
55impl ExecutionLog {
56    /// Return some duration after which the execution will be retried.
57    /// Return `None` if no more retries are allowed.
58    #[must_use]
59    pub fn can_be_retried_after(
60        temporary_event_count: u32,
61        max_retries: Option<u32>,
62        retry_exp_backoff: Duration,
63    ) -> Option<Duration> {
64        // If max_retries == None, wrapping is OK after this succeeds - we want to retry forever.
65        if temporary_event_count <= max_retries.unwrap_or(u32::MAX) {
66            // TODO: Add test for number of retries
67            let duration = retry_exp_backoff * 2_u32.saturating_pow(temporary_event_count - 1);
68            Some(duration)
69        } else {
70            None
71        }
72    }
73
74    #[must_use]
75    pub fn compute_retry_duration_when_retrying_forever(
76        temporary_event_count: u32,
77        retry_exp_backoff: Duration,
78    ) -> Duration {
79        Self::can_be_retried_after(temporary_event_count, None, retry_exp_backoff)
80            .expect("`max_retries` set to MAX must never return None")
81    }
82
83    #[must_use]
84    pub fn get_create_request(&self) -> CreateRequest {
85        assert_matches!(self.events.first().cloned(), Some(ExecutionEvent {
86            event:ExecutionRequest::Created{
87                ffqn,params,parent,scheduled_at,component_id,deployment_id,metadata,scheduled_by},
88                created_at, .. }) => CreateRequest { created_at, execution_id:
89                    self.execution_id.clone(), ffqn, params, parent, scheduled_at,
90                    component_id, deployment_id, metadata, scheduled_by })
91    }
92
93    #[must_use]
94    pub fn ffqn(&self) -> &FunctionFqn {
95        assert_matches!(self.events.first(), Some(ExecutionEvent {
96            event: ExecutionRequest::Created { ffqn, .. },
97            ..
98        }) => ffqn)
99    }
100
101    #[must_use]
102    pub fn params(&self) -> &Params {
103        assert_matches!(self.events.first(), Some(ExecutionEvent {
104            event: ExecutionRequest::Created { params, .. },
105            ..
106        }) => params)
107    }
108
109    #[must_use]
110    pub fn parent(&self) -> Option<(ExecutionId, JoinSetId)> {
111        assert_matches!(self.events.first(), Some(ExecutionEvent {
112            event: ExecutionRequest::Created { parent, .. },
113            ..
114        }) => parent.clone())
115    }
116
117    #[must_use]
118    pub fn last_event(&self) -> &ExecutionEvent {
119        self.events.last().expect("must contain at least one event")
120    }
121
122    #[must_use]
123    pub fn is_finished(&self) -> bool {
124        matches!(
125            self.events.last(),
126            Some(ExecutionEvent {
127                event: ExecutionRequest::Finished { .. },
128                ..
129            })
130        )
131    }
132
133    #[must_use]
134    pub fn as_finished_result(&self) -> Option<SupportedFunctionReturnValue> {
135        if let ExecutionEvent {
136            event: ExecutionRequest::Finished { retval: result, .. },
137            ..
138        } = self.events.last().expect("must contain at least one event")
139        {
140            Some(result.clone())
141        } else {
142            None
143        }
144    }
145
146    pub fn event_history(&self) -> impl Iterator<Item = (HistoryEvent, Version)> + '_ {
147        self.events.iter().filter_map(|event| {
148            if let ExecutionRequest::HistoryEvent { event: eh, .. } = &event.event {
149                Some((eh.clone(), event.version.clone()))
150            } else {
151                None
152            }
153        })
154    }
155
156    #[cfg(feature = "test")]
157    #[must_use]
158    pub fn find_join_set_request(&self, join_set_id: &JoinSetId) -> Option<&JoinSetRequest> {
159        self.events
160            .iter()
161            .find_map(move |event| match &event.event {
162                ExecutionRequest::HistoryEvent {
163                    event:
164                        HistoryEvent::JoinSetRequest {
165                            join_set_id: found,
166                            request,
167                        },
168                    ..
169                } if *join_set_id == *found => Some(request),
170                _ => None,
171            })
172    }
173}
174
/// Underlying integer type of [`Version`].
pub type VersionType = u32;
// Version number within an execution log: every `ExecutionEvent` carries one and
// `ExecutionLog::next_version` holds the next one. Serialized transparently as the bare integer.
#[derive(
    Debug,
    Default,
    Clone,
    PartialEq,
    Eq,
    Hash,
    derive_more::Display,
    derive_more::Into,
    serde::Serialize,
    serde::Deserialize,
    schemars::JsonSchema,
)]
#[serde(transparent)]
#[schemars(transparent)]
pub struct Version(pub VersionType);
192impl Version {
193    #[must_use]
194    pub fn new(arg: VersionType) -> Version {
195        Version(arg)
196    }
197
198    #[must_use]
199    pub fn increment(&self) -> Version {
200        Version(self.0 + 1)
201    }
202}
203impl TryFrom<i64> for Version {
204    type Error = VersionParseError;
205    fn try_from(value: i64) -> Result<Self, Self::Error> {
206        VersionType::try_from(value)
207            .map(Version::new)
208            .map_err(|_| VersionParseError)
209    }
210}
211impl From<Version> for usize {
212    fn from(value: Version) -> Self {
213        usize::try_from(value.0).expect("16 bit systems are unsupported")
214    }
215}
216impl From<&Version> for usize {
217    fn from(value: &Version) -> Self {
218        usize::try_from(value.0).expect("16 bit systems are unsupported")
219    }
220}
221
/// Returned by the `TryFrom<i64>` conversion of `Version` when the value does not fit
/// into [`VersionType`].
#[derive(Debug, thiserror::Error)]
#[error("version must be u32")]
pub struct VersionParseError;
225
// Single entry of an execution log: the appended `ExecutionRequest`, when it was created
// and its `Version`. `Display` shows only the inner event.
#[derive(
    Clone,
    Debug,
    derive_more::Display,
    PartialEq,
    Eq,
    serde::Serialize,
    serde::Deserialize,
    schemars::JsonSchema,
)]
#[display("{event}")]
pub struct ExecutionEvent {
    pub created_at: DateTime<Utc>,
    pub event: ExecutionRequest,
    // Omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub backtrace_id: Option<Version>,
    pub version: Version,
}
244
// Position of a join set response within an execution's responses (see `ResponseWithCursor`).
// Serialize-only: there is no `Deserialize` derive.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    derive_more::Display,
    derive_more::Into,
    Serialize, /* webapi */
    schemars::JsonSchema,
)]
pub struct ResponseCursor(pub u32);
257
// A join set response paired with its `ResponseCursor`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize /* webapi */, schemars::JsonSchema)]
pub struct ResponseWithCursor {
    pub event: JoinSetResponseEventOuter,
    pub cursor: ResponseCursor,
}
263
/// Result of listing the events of an execution.
#[derive(Debug)]
pub struct ListExecutionEventsResponse {
    pub events: Vec<ExecutionEvent>,
    /// NOTE(review): presumably the highest version of the whole log, not only of
    /// `events` — confirm against the database implementations.
    pub max_version: Version,
}
269
/// Result of listing the join set responses of an execution.
#[derive(Debug)]
pub struct ListResponsesResponse {
    pub responses: Vec<ResponseWithCursor>,
    /// NOTE(review): presumably the highest cursor of all responses, not only of
    /// `responses` — confirm against the database implementations.
    pub max_cursor: ResponseCursor,
}
275
// A `JoinSetResponseEvent` together with the time it was created.
#[derive(Debug, Clone, PartialEq, Eq, Serialize /* webapi */, schemars::JsonSchema)]
pub struct JoinSetResponseEventOuter {
    pub created_at: DateTime<Utc>,
    pub event: JoinSetResponseEvent,
}
281
// A `JoinSetResponse` addressed to a particular join set.
#[derive(
    Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema,
)]
pub struct JoinSetResponseEvent {
    pub join_set_id: JoinSetId,
    pub event: JoinSetResponse,
}
289
// Response delivered to a join set. Serialized internally tagged by `type` (snake_case).
#[derive(
    Clone, Debug, PartialEq, Eq, Serialize, Deserialize, derive_more::Display, schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum JoinSetResponse {
    // A delay either finished (`Ok`) or was cancelled (`Err`), as the `Display` text shows.
    #[display("delay {}: {delay_id}", if result.is_ok() { "finished" } else { "cancelled"})]
    DelayFinished {
        delay_id: DelayId,
        result: Result<(), ()>,
    },
    // A child execution produced its final result.
    #[display("{result}: {child_execution_id}")] // execution completed..
    ChildExecutionFinished {
        child_execution_id: ExecutionIdDerived,
        // NOTE(review): presumably the child's version at which it finished — confirm.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Version(2)))]
        finished_version: Version,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
        result: SupportedFunctionReturnValue,
    },
}
310
/// Placeholder `Created` request with empty/zeroed fields, scheduled at the Unix epoch.
/// NOTE(review): presumably used where a syntactically valid event is needed but its
/// contents are irrelevant — confirm usage.
pub const DUMMY_CREATED: ExecutionRequest = ExecutionRequest::Created {
    ffqn: FunctionFqn::new_static("", ""),
    params: Params::empty(),
    parent: None,
    scheduled_at: DateTime::from_timestamp_nanos(0),
    component_id: ComponentId::dummy_activity(),
    deployment_id: DeploymentId::from_parts(0, 0),
    metadata: ExecutionMetadata::empty(),
    scheduled_by: None,
};
/// Placeholder history event: `JoinSetCreate` of a one-off join set with an empty name.
pub const DUMMY_HISTORY_EVENT: ExecutionRequest = ExecutionRequest::HistoryEvent {
    event: HistoryEvent::JoinSetCreate {
        join_set_id: JoinSetId {
            kind: crate::JoinSetKind::OneOff,
            name: StrVariant::empty(),
        },
    },
};
329
// Event appended to an execution log. Variant names are serialized in snake_case, the same
// strings that `ExecutionRequest::variant` returns.
#[derive(
    Clone,
    derive_more::Debug,
    derive_more::Display,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(rename_all = "snake_case")]
pub enum ExecutionRequest {
    // Creation of the execution; the `ExecutionLog` accessors expect it as the first event.
    #[display("Created({ffqn}, `{scheduled_at}`)")]
    Created {
        ffqn: FunctionFqn,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
        // Omitted from `Debug` output.
        #[debug(skip)]
        params: Params,
        // Parent execution and the join set this child belongs to, if any.
        parent: Option<(ExecutionId, JoinSetId)>,
        scheduled_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
        component_id: ComponentId,
        deployment_id: DeploymentId,
        #[cfg_attr(any(test, feature = "test"), arbitrary(default))]
        metadata: ExecutionMetadata,
        scheduled_by: Option<ExecutionId>,
    },
    // Execution was locked; see `Locked` for the lock details.
    Locked(Locked),
    /// Returns execution to [`PendingState::PendingAt`] state at the specified time.
    /// This can happen when:
    /// - executor is running out of resources like [`WorkerError::LimitReached`]
    /// - workflow made progress but then its lock expired.
    /// - executor is being closed (shutdown or hot redeploy requested)
    // NOTE(review): `WorkerError` is not in the visible `use` list of this module; confirm the
    // intra-doc link above resolves.
    #[display("Unlocked(`{backoff_expires_at}`)")]
    Unlocked {
        backoff_expires_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
        reason: StrVariant,
    },
    // Created by the executor holding the lock.
    // After expiry interpreted as pending.
    #[display("TemporarilyFailed(`{backoff_expires_at}`)")]
    TemporarilyFailed {
        backoff_expires_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
        reason: StrVariant,
        detail: Option<String>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },
    // Created by the executor holding the lock.
    // After expiry interpreted as pending.
    #[display("TemporarilyTimedOut(`{backoff_expires_at}`)")]
    TemporarilyTimedOut {
        backoff_expires_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },
    // Created by the executor holding the lock.
    // Carries the final return value; `ExecutionLog::is_finished` checks for this variant.
    #[display("Finished")]
    Finished {
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
        retval: SupportedFunctionReturnValue,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },

    // Wraps an entry of the execution's event history.
    #[display("HistoryEvent({event})")]
    HistoryEvent {
        event: HistoryEvent,
    },
    #[display("Paused")]
    Paused,
    #[display("Unpaused")]
    Unpaused,
}
407
408impl ExecutionRequest {
409    #[must_use]
410    pub fn is_temporary_event(&self) -> bool {
411        matches!(
412            self,
413            Self::TemporarilyFailed { .. } | Self::TemporarilyTimedOut { .. }
414        )
415    }
416
417    /// String representation of `ExecutionRequest`, used in execution log table to fetch events of certain type, e.g. `created` + `history_event`.
418    #[must_use]
419    pub const fn variant(&self) -> &'static str {
420        match self {
421            ExecutionRequest::Created { .. } => "created",
422            ExecutionRequest::Locked(_) => "locked",
423            ExecutionRequest::Unlocked { .. } => "unlocked",
424            ExecutionRequest::TemporarilyFailed { .. } => "temporarily_failed",
425            ExecutionRequest::TemporarilyTimedOut { .. } => "temporarily_timed_out",
426            ExecutionRequest::Finished { .. } => "finished",
427            ExecutionRequest::HistoryEvent { .. } => "history_event",
428            ExecutionRequest::Paused => "paused",
429            ExecutionRequest::Unpaused => "unpaused",
430        }
431    }
432
433    #[must_use]
434    pub fn join_set_id(&self) -> Option<&JoinSetId> {
435        match self {
436            Self::Created {
437                parent: Some((_parent_id, join_set_id)),
438                ..
439            } => Some(join_set_id),
440            Self::HistoryEvent {
441                event:
442                    HistoryEvent::JoinSetCreate { join_set_id, .. }
443                    | HistoryEvent::JoinSetRequest { join_set_id, .. }
444                    | HistoryEvent::JoinNext { join_set_id, .. },
445            } => Some(join_set_id),
446            _ => None,
447        }
448    }
449}
450
// Payload of `ExecutionRequest::Locked`: the component and deployment, the executor and run
// holding the lock, when the lock expires, and the retry configuration.
#[derive(
    Clone,
    derive_more::Debug,
    derive_more::Display,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[display("Locked(`{lock_expires_at}`, {component_id})")]
pub struct Locked {
    #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
    pub component_id: ComponentId,
    pub executor_id: ExecutorId,
    pub deployment_id: DeploymentId,
    pub run_id: RunId,
    pub lock_expires_at: DateTime<Utc>,
    #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentRetryConfig::ZERO))]
    pub retry_config: ComponentRetryConfig,
}
473
// Kind of value recorded by `HistoryEvent::Persist`.
// Serialized internally tagged by `type` (snake_case).
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    derive_more::Display,
    Serialize,
    Deserialize,
    schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum PersistKind {
    // Random integer in the inclusive range `min..=max_inclusive`.
    #[display("RandomU64({min}, {max_inclusive})")]
    RandomU64 {
        min: u64,
        max_inclusive: u64,
    },
    // Random string with a length in `min_length..max_length_exclusive`.
    #[display("RandomString({min_length}, {max_length_exclusive})")]
    RandomString {
        min_length: u64,
        max_length_exclusive: u64,
    },
    ExecutionId,
}
500
/// Encodes `value` as 8 bytes in big-endian order (most significant byte first).
#[must_use]
pub fn from_u64_to_bytes(value: u64) -> [u8; 8] {
    u64::to_be_bytes(value)
}
505
// Serialized internally tagged by `type` (snake_case); the `join_next` tag is also exposed as
// `HISTORY_EVENT_TYPE_JOIN_NEXT`.
#[derive(
    derive_more::Debug,
    Clone,
    PartialEq,
    Eq,
    derive_more::Display,
    Serialize,
    Deserialize,
    schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type", rename_all = "snake_case")]
/// Must be created by the executor in [`PendingState::Locked`].
pub enum HistoryEvent {
    /// Persist a generated pseudorandom value.
    #[display("Persist")]
    Persist {
        // Omitted from `Debug` output.
        #[debug(skip)]
        value: Vec<u8>, // Only stored for nondeterminism checks. TODO: Consider using a hashed value or just the intention.
        kind: PersistKind,
    },
    #[display("JoinSetCreate({join_set_id})")]
    JoinSetCreate { join_set_id: JoinSetId },
    #[display("JoinSetRequest({request})")]
    // join_set_id is part of ExecutionId or DelayId in the `request`
    JoinSetRequest {
        join_set_id: JoinSetId,
        request: JoinSetRequest,
    },
    /// Sets the pending state to [`PendingState::BlockedByJoinSet`].
    /// When the response arrives at `resp_time`:
    /// The execution is [`PendingState::PendingAt`]`(max(resp_time, lock_expires_at)`, so that the
    /// original executor can continue. After the expiry any executor can continue without
    /// marking the execution as timed out.
    #[display("JoinNext({join_set_id})")]
    JoinNext {
        join_set_id: JoinSetId,
        /// Set to a future time if the worker is keeping the execution invocation warm waiting for the result.
        /// The pending status will be kept in Locked state until `run_expires_at`.
        run_expires_at: DateTime<Utc>,
        /// Set to a specific function when calling `-await-next` extension function, used for
        /// determinism checks.
        requested_ffqn: Option<FunctionFqn>,
        /// Closing request must never set `requested_ffqn` and is ignored by determinism checks.
        closing: bool,
    },
    /// Attempt to process next response without changing the pending state.
    #[display("JoinNextTry({join_set_id}, {outcome})")]
    JoinNextTry {
        join_set_id: JoinSetId,
        outcome: JoinNextTryOutcome,
    },
    /// Records the fact that a join set was awaited more times than its submission count.
    #[display("JoinNextTooMany({join_set_id})")]
    JoinNextTooMany {
        join_set_id: JoinSetId,
        /// Set to a specific function when calling `-await-next` extension function, used for
        /// determinism checks.
        requested_ffqn: Option<FunctionFqn>,
    },
    // `result` records the outcome of the scheduling attempt (see `ScheduleRequestError`).
    #[display("Schedule({execution_id}, {schedule_at})")]
    Schedule {
        execution_id: ExecutionId,
        schedule_at: HistoryEventScheduleAt, // Stores intention to schedule an execution at a date/offset
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Ok(())))]
        result: Result<(), ScheduleRequestError>,
    },
    // Stores only a hash of the stub return value (see `StubRetValHash`) plus the outcome
    // (see `StubError`).
    #[display("Stub({target_execution_id})")]
    Stub {
        target_execution_id: ExecutionIdDerived,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StubRetVal::Typed(crate::SUPPORTED_RETURN_VALUE_OK_EMPTY).hash()))]
        retval_hash: StubRetValHash,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Ok(())))]
        result: Result<(), StubError>,
    },
}
582
/// Stub return value - only used during processing, not stored in history.
#[derive(derive_more::Debug, Clone, PartialEq, Eq)]
#[cfg_attr(any(test, feature = "test"), derive(Serialize, Deserialize))]
#[cfg_attr(any(test, feature = "test"), serde(rename_all = "snake_case"))]
pub enum StubRetVal {
    /// Typed return value; [`StubRetVal::hash`] hashes its JSON serialization.
    Typed(SupportedFunctionReturnValue),
    /// Raw string return value; [`StubRetVal::hash`] hashes its bytes as-is.
    Untyped(String),
}
591
592impl StubRetVal {
593    /// Compute a stable hash of the return value for determinism checks.
594    #[must_use]
595    pub fn hash(&self) -> StubRetValHash {
596        use sha2::{Digest as _, Sha256};
597        const STUB_RETVAL_HASH_VERSION: u8 = 1;
598        let mut hasher = Sha256::default();
599
600        match self {
601            StubRetVal::Typed(val) => {
602                hasher.update(b"T|");
603                // Serialize to JSON for stable hashing
604                let json = serde_json::to_string(val)
605                    .expect("SupportedFunctionReturnValue is always serializable");
606                hasher.update(json.as_bytes());
607            }
608            StubRetVal::Untyped(s) => {
609                hasher.update(b"U|");
610                hasher.update(s.as_bytes());
611            }
612        }
613
614        let hash_bytes = hasher.finalize();
615        let mut result = [0u8; 33];
616        result[0] = STUB_RETVAL_HASH_VERSION;
617        result[1..].copy_from_slice(&hash_bytes);
618
619        StubRetValHash(result)
620    }
621}
622
/// Hash of a stub return value, stored in history for determinism checks.
/// Format: 1 byte version + 32 bytes SHA-256 hash.
// Serialized as its hex string through the `Display` / `FromStr` impls
// (`serde_with::SerializeDisplay` / `DeserializeFromStr`); the JSON schema declares a `String`.
#[derive(
    Clone,
    PartialEq,
    Eq,
    serde_with::SerializeDisplay,
    serde_with::DeserializeFromStr,
    schemars::JsonSchema,
)]
#[schemars(with = "String")]
pub struct StubRetValHash([u8; 33]);
635
636impl Display for StubRetValHash {
637    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
638        for b in self.0 {
639            write!(f, "{b:02x}")?;
640        }
641        Ok(())
642    }
643}
644
645impl Debug for StubRetValHash {
646    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
647        Display::fmt(self, f)
648    }
649}
650
651impl std::str::FromStr for StubRetValHash {
652    type Err = StubRetValHashParseError;
653
654    fn from_str(s: &str) -> Result<Self, Self::Err> {
655        if s.len() != 66 {
656            // 33 bytes * 2 hex chars = 66
657            return Err(StubRetValHashParseError::InvalidLength(s.len()));
658        }
659        let mut bytes = [0u8; 33];
660        for i in 0..33 {
661            let chunk = &s[i * 2..i * 2 + 2];
662            bytes[i] =
663                u8::from_str_radix(chunk, 16).map_err(|_| StubRetValHashParseError::InvalidHex)?;
664        }
665        Ok(StubRetValHash(bytes))
666    }
667}
668
/// Error returned when parsing a `StubRetValHash` from its hex string form.
#[derive(Debug, thiserror::Error)]
pub enum StubRetValHashParseError {
    /// Input length (in bytes) was not 66.
    #[error("invalid length: expected 66 hex chars, got {0}")]
    InvalidLength(usize),
    /// Input contained something other than a hex digit.
    #[error("invalid hex character")]
    InvalidHex,
}
676
/// Error from the `-stub` extension function.
/// Mirrors `obelisk:types/execution.{stub-error}` from WIT.
// Variant names are serialized in snake_case; recorded in `HistoryEvent::Stub::result`.
#[derive(
    Debug, Clone, thiserror::Error, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema,
)]
#[serde(rename_all = "snake_case")]
pub enum StubError {
    #[error("execution not found")]
    ExecutionNotFound,
    #[error("type check error: {0}")]
    TypeCheckError(String),
    #[error("conflict")]
    Conflict,
}
691
/// Error from the `schedule-json` function. Persisted in history for determinism.
// Recorded in `HistoryEvent::Schedule::result`; variant names are serialized in snake_case.
#[derive(
    Debug, Clone, thiserror::Error, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema,
)]
#[serde(rename_all = "snake_case")]
pub enum ScheduleRequestError {
    #[error("function not found")]
    FunctionNotFound,
    #[error("params parsing error: {0}")]
    TypeCheckError(String),
}
703
/// Error from the `submit-json` function. Persisted in history for determinism.
// Recorded in `JoinSetRequest::ChildExecutionRequest::result`; variant names are serialized in
// snake_case.
#[derive(
    Debug, Clone, thiserror::Error, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema,
)]
#[serde(rename_all = "snake_case")]
pub enum ChildExecutionRequestError {
    #[error("function not found")]
    FunctionNotFound,
    #[error("params parsing error: {0}")]
    TypeCheckError(String),
}
715
// Outcome recorded by `HistoryEvent::JoinNextTry`. `Display` renders the same snake_case
// names that serde uses.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    derive_more::Display,
    Serialize,
    Deserialize,
    schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(rename_all = "snake_case")]
pub enum JoinNextTryOutcome {
    /// A response was found and processed.
    #[display("found")]
    Found,
    /// No response available, but there are still pending requests.
    #[display("pending")]
    Pending,
    /// No response available, and all requests have been processed.
    #[display("all_processed")]
    AllProcessed,
}
740
741impl From<bool> for JoinNextTryOutcome {
742    /// Migration helper: converts old `found_response: bool` to the new enum.
743    /// `false` maps to `Pending` as a conservative default (the exact error
744    /// was not stored before).
745    fn from(found_response: bool) -> Self {
746        if found_response {
747            JoinNextTryOutcome::Found
748        } else {
749            JoinNextTryOutcome::Pending
750        }
751    }
752}
753
// When something should be scheduled: immediately, at an absolute time, or after a relative
// delay. `HistoryEventScheduleAt::as_date_time` resolves it to an absolute time.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    derive_more::Display,
    Serialize,
    Deserialize,
    schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(rename_all = "snake_case")]
pub enum HistoryEventScheduleAt {
    Now,
    #[display("At(`{_0}`)")]
    At(DateTime<Utc>),
    // Offset relative to the `now` passed to `as_date_time`.
    #[display("In({_0:?})")]
    In(Duration),
}
774
/// Returned by `HistoryEventScheduleAt::as_date_time` when a relative offset cannot be
/// converted or added to the reference time.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum ScheduleAtConversionError {
    #[error("source duration value is out of range")]
    OutOfRangeError,
}
780
781impl HistoryEventScheduleAt {
782    pub fn as_date_time(
783        &self,
784        now: DateTime<Utc>,
785    ) -> Result<DateTime<Utc>, ScheduleAtConversionError> {
786        match self {
787            Self::Now => Ok(now),
788            Self::At(date_time) => Ok(*date_time),
789            Self::In(duration) => {
790                let time_delta = TimeDelta::from_std(*duration)
791                    .map_err(|_| ScheduleAtConversionError::OutOfRangeError)?;
792                now.checked_add_signed(time_delta)
793                    .ok_or(ScheduleAtConversionError::OutOfRangeError)
794            }
795        }
796    }
797}
798
// Request submitted into a join set (carried by `HistoryEvent::JoinSetRequest`).
// Serialized internally tagged by `type` (snake_case).
#[derive(
    Clone, Debug, PartialEq, Eq, derive_more::Display, Serialize, Deserialize, schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum JoinSetRequest {
    // Must be created by the executor in `PendingState::Locked`.
    #[display("DelayRequest({delay_id}, expires_at: `{expires_at}`, schedule_at: `{schedule_at}`)")]
    DelayRequest {
        delay_id: DelayId,
        expires_at: DateTime<Utc>,
        schedule_at: HistoryEventScheduleAt,
    },
    // Must be created by the executor in `PendingState::Locked`.
    // `result` records whether the submission succeeded (see `ChildExecutionRequestError`).
    #[display("ChildExecutionRequest({child_execution_id}, {target_ffqn}, params: {params})")]
    ChildExecutionRequest {
        child_execution_id: ExecutionIdDerived,
        target_ffqn: FunctionFqn,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
        params: Params,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Ok(())))]
        result: Result<(), ChildExecutionRequestError>,
    },
}
823
/// Error that is not specific to an execution.
#[derive(Debug, Clone, thiserror::Error, derive_more::PartialEq, derive_more::Eq)]
pub enum DbErrorGeneric {
    /// Any database failure without a more specific variant.
    #[error("database error: {reason}")]
    Uncategorized {
        reason: StrVariant,
        /// Tracing span context; ignored by equality.
        #[eq(skip)]
        #[partial_eq(skip)]
        context: SpanTrace,
        /// Underlying cause, exposed as the error `source`; ignored by equality.
        #[eq(skip)]
        #[partial_eq(skip)]
        #[source]
        source: Option<Arc<dyn std::error::Error + Send + Sync>>,
        /// NOTE(review): presumably the source location where the error was constructed.
        loc: &'static Location<'static>,
    },
    /// The database was closed.
    #[error("database was closed")]
    Close,
}
842
/// Write error that should not be retried.
#[derive(thiserror::Error, Clone, Debug, derive_more::PartialEq, derive_more::Eq)]
pub enum DbErrorWriteNonRetriable {
    #[error("validation failed: {0}")]
    ValidationFailed(StrVariant),
    #[error("conflict")]
    Conflict,
    /// The execution has already finished.
    #[error("already finished")]
    AlreadyFinished,
    /// The requested write is not allowed in the current state.
    #[error("illegal state: {reason}")]
    IllegalState {
        reason: StrVariant,
        /// Tracing span context; ignored by equality.
        #[eq(skip)]
        #[partial_eq(skip)]
        context: SpanTrace,
        /// Underlying cause, exposed as the error `source`; ignored by equality.
        #[eq(skip)]
        #[partial_eq(skip)]
        #[source]
        source: Option<Arc<dyn std::error::Error + Send + Sync>>,
        /// NOTE(review): presumably the source location where the error was constructed.
        loc: &'static Location<'static>,
    },
    /// The write targeted a different version than the one the database expected.
    #[error("version conflict: expected: {expected}, got: {requested}")]
    VersionConflict {
        expected: Version,
        requested: Version,
    },
}
869
/// Write error tied to an execution
#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
pub enum DbErrorWrite {
    /// The row to be written was not found.
    #[error("cannot write - row not found")]
    NotFound,
    #[error("non-retriable error: {0}")]
    NonRetriable(#[from] DbErrorWriteNonRetriable),
    /// Error not specific to the execution; displayed transparently.
    #[error(transparent)]
    Generic(#[from] DbErrorGeneric),
}
880
/// Read error tied to an execution
#[derive(Debug, Clone, thiserror::Error, PartialEq)]
pub enum DbErrorRead {
    /// The requested row was not found.
    #[error("cannot read - row not found")]
    NotFound,
    /// Error not specific to the execution; displayed transparently.
    #[error(transparent)]
    Generic(#[from] DbErrorGeneric),
}
889
/// Read error of an operation that can also time out.
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum DbErrorReadWithTimeout {
    /// The operation timed out; the payload describes the timeout outcome.
    #[error("timeout")]
    Timeout(TimeoutOutcome),
    #[error(transparent)]
    DbErrorRead(#[from] DbErrorRead),
}
897
/// Next version of the execution log after a successful append.
// TODO: Convert to struct with next_version
pub type AppendResponse = Version;
/// Tuple of execution ID, version, parameters and an optional timestamp.
/// NOTE(review): the meaning of the timestamp is not visible here — confirm.
pub type PendingExecution = (ExecutionId, Version, Params, Option<DateTime<Utc>>);
902
/// An execution that was just locked, together with the data loaded for running it
/// (returned in bulk as `LockPendingResponse`).
#[derive(Debug, Clone)]
pub struct LockedExecution {
    pub execution_id: ExecutionId,
    pub next_version: Version,
    pub metadata: ExecutionMetadata,
    /// The `Locked` event describing this lock.
    pub locked_event: Locked,
    pub ffqn: FunctionFqn,
    pub params: Params,
    /// History events paired with their versions.
    pub event_history: Vec<(HistoryEvent, Version)>,
    pub responses: Vec<ResponseWithCursor>,
    /// Parent execution and join set, if this is a child execution.
    pub parent: Option<(ExecutionId, JoinSetId)>,
    /// NOTE(review): presumably the number of temporary events so far, as consumed by the
    /// retry backoff computation — confirm.
    pub intermittent_event_count: u32,
}
916
917pub type LockPendingResponse = Vec<LockedExecution>;
918pub type AppendBatchResponse = Version;
919
920#[derive(Debug, Clone, derive_more::Display, Serialize, Deserialize)]
921#[display("{event}")]
922pub struct AppendRequest {
923    pub created_at: DateTime<Utc>,
924    pub event: ExecutionRequest,
925}
926
927#[derive(Debug, Clone)]
928pub struct CreateRequest {
929    pub created_at: DateTime<Utc>,
930    pub execution_id: ExecutionId,
931    pub ffqn: FunctionFqn,
932    pub params: Params,
933    pub parent: Option<(ExecutionId, JoinSetId)>,
934    pub scheduled_at: DateTime<Utc>,
935    pub component_id: ComponentId,
936    pub deployment_id: DeploymentId,
937    pub metadata: ExecutionMetadata,
938    pub scheduled_by: Option<ExecutionId>,
939}
940
941impl From<CreateRequest> for ExecutionRequest {
942    fn from(value: CreateRequest) -> Self {
943        Self::Created {
944            ffqn: value.ffqn,
945            params: value.params,
946            parent: value.parent,
947            scheduled_at: value.scheduled_at,
948            component_id: value.component_id,
949            deployment_id: value.deployment_id,
950            metadata: value.metadata,
951            scheduled_by: value.scheduled_by,
952        }
953    }
954}
955
956#[async_trait]
957pub trait DbPool: Send + Sync {
958    async fn db_exec_conn(&self) -> Result<Box<dyn DbExecutor>, DbErrorGeneric>;
959
960    async fn connection(&self) -> Result<Box<dyn DbConnection>, DbErrorGeneric>;
961
962    async fn external_api_conn(&self) -> Result<Box<dyn DbExternalApi>, DbErrorGeneric>;
963
964    #[cfg(feature = "test")]
965    async fn connection_test(&self) -> Result<Box<dyn DbConnectionTest>, DbErrorGeneric>;
966}
967
968#[async_trait]
969pub trait DbPoolCloseable {
970    async fn close(&self);
971}
972
973#[derive(Clone, Debug)]
974pub struct AppendEventsToExecution {
975    pub execution_id: ExecutionId,
976    pub version: Version,
977    pub batch: Vec<AppendRequest>,
978}
979
980#[derive(Clone, Debug)]
981pub struct AppendResponseToExecution {
982    pub parent_execution_id: ExecutionId,
983    pub created_at: DateTime<Utc>,
984    pub join_set_id: JoinSetId,
985    pub child_execution_id: ExecutionIdDerived,
986    pub finished_version: Version,
987    pub result: SupportedFunctionReturnValue,
988}
989
990#[async_trait]
991pub trait DbExecutor: Send + Sync {
992    #[expect(clippy::too_many_arguments)]
993    async fn lock_pending_by_ffqns(
994        &self,
995        batch_size: u32,
996        pending_at_or_sooner: DateTime<Utc>,
997        ffqns: Arc<[FunctionFqn]>,
998        created_at: DateTime<Utc>,
999        component_id: ComponentId,
1000        deployment_id: DeploymentId,
1001        executor_id: ExecutorId,
1002        lock_expires_at: DateTime<Utc>,
1003        run_id: RunId,
1004        retry_config: ComponentRetryConfig,
1005    ) -> Result<LockPendingResponse, DbErrorWrite>;
1006
1007    #[expect(clippy::too_many_arguments)]
1008    async fn lock_pending_by_component_digest(
1009        &self,
1010        batch_size: u32,
1011        pending_at_or_sooner: DateTime<Utc>,
1012        component_id: &ComponentId,
1013        deployment_id: DeploymentId,
1014        created_at: DateTime<Utc>,
1015        executor_id: ExecutorId,
1016        lock_expires_at: DateTime<Utc>,
1017        run_id: RunId,
1018        retry_config: ComponentRetryConfig,
1019    ) -> Result<LockPendingResponse, DbErrorWrite>;
1020
1021    #[cfg(feature = "test")]
1022    #[expect(clippy::too_many_arguments)]
1023    async fn lock_one(
1024        &self,
1025        created_at: DateTime<Utc>,
1026        component_id: ComponentId,
1027        deployment_id: DeploymentId,
1028        execution_id: &ExecutionId,
1029        run_id: RunId,
1030        version: Version,
1031        executor_id: ExecutorId,
1032        lock_expires_at: DateTime<Utc>,
1033        retry_config: ComponentRetryConfig,
1034    ) -> Result<LockedExecution, DbErrorWrite>;
1035
1036    /// Append a single event to an existing execution log.
1037    /// The request cannot contain [`ExecutionRequest::Created`].
1038    async fn append(
1039        &self,
1040        execution_id: ExecutionId,
1041        version: Version,
1042        req: AppendRequest,
1043    ) -> Result<AppendResponse, DbErrorWrite>;
1044
1045    /// Append a batch of events to an existing execution log, and append a response to a parent execution.
1046    /// The batch cannot contain [`ExecutionRequest::Created`].
1047    async fn append_batch_respond_to_parent(
1048        &self,
1049        events: AppendEventsToExecution,
1050        response: AppendResponseToExecution,
1051        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
1052    ) -> Result<AppendBatchResponse, DbErrorWrite>;
1053
1054    /// Notification mechainism with no strict guarantees for waiting while there are no pending executions.
1055    /// Return immediately if there are pending notifications at `pending_at_or_sooner`.
1056    /// Otherwise wait until `timeout_fut` resolves.
1057    /// Delay requests that expire between `pending_at_or_sooner` and timeout can be disregarded.
1058    async fn wait_for_pending_by_ffqn(
1059        &self,
1060        pending_at_or_sooner: DateTime<Utc>,
1061        ffqns: Arc<[FunctionFqn]>,
1062        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
1063    );
1064
1065    /// Notification mechainism with no strict guarantees for waiting while there are no pending executions.
1066    /// Return immediately if there are pending notifications at `pending_at_or_sooner`.
1067    /// Otherwise wait until `timeout_fut` resolves.
1068    /// Delay requests that expire between `pending_at_or_sooner` and timeout can be disregarded.
1069    async fn wait_for_pending_by_component_digest(
1070        &self,
1071        pending_at_or_sooner: DateTime<Utc>,
1072        component_digest: &ComponentDigest,
1073        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
1074    );
1075
1076    async fn cancel_activity_with_retries(
1077        &self,
1078        execution_id: &ExecutionId,
1079        cancelled_at: DateTime<Utc>,
1080    ) -> Result<CancelOutcome, DbErrorWrite> {
1081        let mut retries = 5;
1082        loop {
1083            let res = self.cancel_activity(execution_id, cancelled_at).await;
1084            if res.is_ok() || retries == 0 {
1085                return res;
1086            }
1087            retries -= 1;
1088        }
1089    }
1090
1091    /// Get last event. Impls may set `ExecutionEvent::backtrace_id` to `None`.
1092    async fn get_last_execution_event(
1093        &self,
1094        execution_id: &ExecutionId,
1095    ) -> Result<ExecutionEvent, DbErrorRead>;
1096
1097    async fn cancel_activity(
1098        &self,
1099        execution_id: &ExecutionId,
1100        cancelled_at: DateTime<Utc>,
1101    ) -> Result<CancelOutcome, DbErrorWrite> {
1102        debug!("Determining cancellation state of {execution_id}");
1103
1104        let last_event = self
1105            .get_last_execution_event(execution_id)
1106            .await
1107            .map_err(DbErrorWrite::from)?;
1108        if let ExecutionRequest::Finished {
1109            retval:
1110                SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError {
1111                    kind: ExecutionFailureKind::Cancelled,
1112                    ..
1113                }),
1114            ..
1115        } = last_event.event
1116        {
1117            return Ok(CancelOutcome::Cancelled);
1118        } else if matches!(last_event.event, ExecutionRequest::Finished { .. }) {
1119            debug!("Not cancelling, {execution_id} is already finished");
1120            return Ok(CancelOutcome::AlreadyFinished);
1121        }
1122        let finished_version = last_event.version.increment();
1123        let child_result = SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError {
1124            reason: None,
1125            kind: ExecutionFailureKind::Cancelled,
1126            detail: None,
1127        });
1128        let cancel_request = AppendRequest {
1129            created_at: cancelled_at,
1130            event: ExecutionRequest::Finished {
1131                retval: child_result.clone(),
1132                http_client_traces: None,
1133            },
1134        };
1135        debug!("Cancelling activity {execution_id} at {finished_version}");
1136        if let ExecutionId::Derived(execution_id) = execution_id {
1137            let (parent_execution_id, join_set_id) = execution_id.split_to_parts();
1138            let child_execution_id = ExecutionId::Derived(execution_id.clone());
1139            self.append_batch_respond_to_parent(
1140                AppendEventsToExecution {
1141                    execution_id: child_execution_id,
1142                    version: finished_version.clone(),
1143                    batch: vec![cancel_request],
1144                },
1145                AppendResponseToExecution {
1146                    parent_execution_id,
1147                    created_at: cancelled_at,
1148                    join_set_id: join_set_id.clone(),
1149                    child_execution_id: execution_id.clone(),
1150                    finished_version,
1151                    result: child_result,
1152                },
1153                cancelled_at,
1154            )
1155            .await?;
1156        } else {
1157            self.append(execution_id.clone(), finished_version, cancel_request)
1158                .await?;
1159        }
1160        debug!("Cancelled {execution_id}");
1161        Ok(CancelOutcome::Cancelled)
1162    }
1163}
1164
/// Outcome of [`DbConnection::append_delay_response`].
// Derives match `CancelOutcome` so the outcome can be compared, copied and logged.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum AppendDelayResponseOutcome {
    /// The delay response was appended.
    Success,
    /// NOTE(review): presumably the delay had already been responded to as finished.
    AlreadyFinished,
    /// NOTE(review): presumably the delay had already been responded to as cancelled.
    AlreadyCancelled,
}
1170
1171#[derive(Debug, Clone, Default)]
1172pub struct ListExecutionsFilter {
1173    pub ffqn_prefix: Option<String>,
1174    pub show_derived: bool,
1175    pub hide_finished: bool,
1176    pub execution_id_prefix: Option<String>,
1177    pub component_digest: Option<ComponentDigest>,
1178    pub deployment_id: Option<DeploymentId>,
1179}
1180
1181#[async_trait]
1182pub trait DbExternalApi: DbConnection {
1183    /// Get the latest backtrace if version is not set.
1184    async fn get_backtrace(
1185        &self,
1186        execution_id: &ExecutionId,
1187        filter: BacktraceFilter,
1188    ) -> Result<BacktraceInfo, DbErrorRead>;
1189
1190    /// Store a source file associated with a component digest.
1191    /// `frame_key` is either an exact frame symbol path or a suffix (with leading `/`)
1192    /// when `is_suffix` is true. Idempotent — repeated calls for the same key are ignored.
1193    async fn upsert_source_file(
1194        &self,
1195        component_digest: &ComponentDigest,
1196        frame_key: &str,
1197        is_suffix: bool,
1198        content: &str,
1199    ) -> Result<(), DbErrorWrite>;
1200
1201    /// Look up a source file by component digest and a frame symbol path.
1202    /// Matches either exact keys or suffix keys (where the frame path ends with the stored key).
1203    /// Returns `None` if not found or if multiple suffix entries match (ambiguous).
1204    async fn get_source_file(
1205        &self,
1206        component_digest: &ComponentDigest,
1207        file: &str,
1208    ) -> Result<Option<String>, DbErrorRead>;
1209
1210    /// Returns executions sorted in descending order.
1211    async fn list_executions(
1212        &self,
1213        filter: ListExecutionsFilter,
1214        pagination: ExecutionListPagination,
1215    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric>;
1216
1217    /// Returns execution events for the given execution.
1218    ///
1219    /// Results are always ordered from oldest to newest (ascending by version),
1220    /// regardless of pagination direction.
1221    async fn list_execution_events(
1222        &self,
1223        execution_id: &ExecutionId,
1224        pagination: Pagination<VersionType>,
1225        include_backtrace_id: bool,
1226    ) -> Result<ListExecutionEventsResponse, DbErrorRead>;
1227
1228    /// Returns responses of an execution ordered as they arrived,
1229    /// enabling matching each `JoinNext` to its corresponding response.
1230    ///
1231    /// Results are always ordered from oldest to newest (ascending by cursor),
1232    /// regardless of pagination direction.
1233    ///
1234    /// As an optimization, the implementation can return an empty list of `responses`
1235    /// and `max_cursor` set to 0 if the execution is not found.
1236    async fn list_responses(
1237        &self,
1238        execution_id: &ExecutionId,
1239        pagination: Pagination<u32>,
1240    ) -> Result<ListResponsesResponse, DbErrorRead>;
1241
1242    async fn list_execution_events_responses(
1243        &self,
1244        execution_id: &ExecutionId,
1245        req_since: &Version,
1246        req_max_length: VersionType,
1247        req_include_backtrace_id: bool,
1248        resp_pagination: Pagination<VersionType>,
1249    ) -> Result<ExecutionWithStateRequestsResponses, DbErrorRead>;
1250
1251    async fn upgrade_execution_component(
1252        &self,
1253        execution_id: &ExecutionId,
1254        old: &ComponentDigest,
1255        new: &ComponentDigest,
1256    ) -> Result<(), DbErrorWrite>;
1257
1258    async fn list_logs(
1259        &self,
1260        execution_id: &ExecutionId,
1261        show_derived: bool,
1262        filter: LogFilter,
1263        pagination: Pagination<DateTime<Utc>>,
1264    ) -> Result<ListLogsResponse, DbErrorRead>;
1265
1266    async fn list_deployment_states(
1267        &self,
1268        current_time: DateTime<Utc>,
1269        pagination: Pagination<Option<DeploymentId>>,
1270        include_config_json: bool,
1271    ) -> Result<Vec<DeploymentState>, DbErrorRead>;
1272
1273    /// Insert a new deployment. The record must have `status == Inactive` and
1274    /// `last_active_at == None`; activation is a separate step via [`Self::activate_deployment`].
1275    async fn insert_deployment(&self, record: DeploymentRecord) -> Result<(), DbErrorWrite>;
1276
1277    async fn activate_deployment(
1278        &self,
1279        deployment_id: DeploymentId,
1280        now: DateTime<Utc>,
1281    ) -> Result<(), DbErrorWrite>;
1282
1283    /// Mark a deployment as Enqueued (pending next server restart).
1284    /// Returns `Err(DbErrorWriteNonRetriable::Conflict)` if the deployment is currently Active.
1285    /// Any previously Enqueued deployment is demoted to Inactive.
1286    async fn enqueue_deployment(&self, deployment_id: DeploymentId) -> Result<(), DbErrorWrite>;
1287
1288    /// Returned [`DeploymentRecord`] must contain `config_json`.
1289    async fn get_deployment(
1290        &self,
1291        deployment_id: DeploymentId,
1292    ) -> Result<Option<DeploymentRecord>, DbErrorRead>;
1293
1294    /// Return active deployment.
1295    /// Returned [`DeploymentRecord`] must contain `config_json`.
1296    #[cfg(feature = "test")]
1297    async fn get_active_deployment(&self) -> Result<Option<DeploymentRecord>, DbErrorRead>;
1298
1299    /// Return the most relevant current deployment: Enqueued if present, otherwise Active.
1300    /// Returned [`DeploymentRecord`] must contain `config_json`.
1301    async fn get_current_deployment(&self) -> Result<Option<DeploymentRecord>, DbErrorRead>;
1302
1303    async fn list_deployments(
1304        &self,
1305        pagination: Pagination<Option<DeploymentId>>,
1306    ) -> Result<Vec<DeploymentRecord>, DbErrorRead>;
1307
1308    /// Pause an execution. Only pending executions can be paused.
1309    async fn pause_execution(
1310        &self,
1311        execution_id: &ExecutionId,
1312        paused_at: DateTime<Utc>,
1313    ) -> Result<AppendResponse, DbErrorWrite>;
1314
1315    /// Unpause an execution. Only paused executions can be unpaused.
1316    async fn unpause_execution(
1317        &self,
1318        execution_id: &ExecutionId,
1319        unpaused_at: DateTime<Utc>,
1320    ) -> Result<AppendResponse, DbErrorWrite>;
1321}
1322pub const LIST_DEPLOYMENT_STATES_DEFAULT_LENGTH: u16 = 20;
1323pub const LIST_DEPLOYMENT_STATES_DEFAULT_PAGINATION: Pagination<Option<DeploymentId>> =
1324    Pagination::OlderThan {
1325        length: LIST_DEPLOYMENT_STATES_DEFAULT_LENGTH,
1326        cursor: None,
1327        including_cursor: false,
1328    };
1329
1330pub struct DeploymentState {
1331    pub deployment_id: DeploymentId,
1332    pub locked: u32,
1333    // In `PendingAt` state, scheduled to present or past
1334    pub pending: u32,
1335    // In `PendingAt` state, scheduled into the future
1336    pub scheduled: u32,
1337    pub blocked: u32,
1338    pub finished: u32,
1339    /// None if not requested from db.
1340    pub config_json: Option<String>,
1341    pub created_at: DateTime<Utc>,
1342    /// Set when the deployment becomes Active; None if it has never been active.
1343    pub last_active_at: Option<DateTime<Utc>>,
1344    pub status: DeploymentStatus,
1345}
1346
/// Lifecycle status of a deployment.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DeploymentStatus {
    /// Neither active nor queued.
    Inactive,
    /// Queued to become Active on the next server restart.
    Enqueued,
    /// The active deployment.
    Active,
}

impl DeploymentStatus {
    /// Lower-case textual form of the status, accepted back by its `FromStr` impl.
    #[must_use]
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Active => "active",
            Self::Enqueued => "enqueued",
            Self::Inactive => "inactive",
        }
    }
}
1365
1366impl std::str::FromStr for DeploymentStatus {
1367    type Err = StrVariant;
1368    fn from_str(s: &str) -> Result<Self, Self::Err> {
1369        match s {
1370            "inactive" => Ok(DeploymentStatus::Inactive),
1371            "enqueued" => Ok(DeploymentStatus::Enqueued),
1372            "active" => Ok(DeploymentStatus::Active),
1373            _ => Err(StrVariant::from(format!("unknown deployment status: {s}"))),
1374        }
1375    }
1376}
1377
1378#[derive(Debug, Clone)]
1379pub struct DeploymentRecord {
1380    pub deployment_id: DeploymentId,
1381    pub created_at: DateTime<Utc>,
1382    /// Set when the deployment becomes Active; None if it has never been active.
1383    pub last_active_at: Option<DateTime<Utc>>,
1384    pub status: DeploymentStatus,
1385    pub config_json: String,
1386    pub obelisk_version: String,
1387    pub created_by: Option<String>,
1388}
1389
1390#[derive(Debug)]
1391pub struct ListLogsResponse {
1392    pub items: Vec<LogEntryRow>,
1393    pub next_page: Pagination<DateTime<Utc>>, // Newer logs can always arrive e.g. via replay
1394    pub prev_page: Option<Pagination<DateTime<Utc>>>, // None if we are already at the beginning
1395}
1396
1397#[derive(Debug)]
1398pub struct LogFilter {
1399    show_logs: bool,
1400    show_streams: bool,
1401    levels: Vec<LogLevel>, // Only applied if `show_logs` = true, empty means return all levels.
1402    stream_types: Vec<LogStreamType>, // Only applied if `show_streams` = true, empty means return all stream types.
1403}
1404impl LogFilter {
1405    // Constructor for logs only
1406    #[must_use]
1407    pub fn show_logs(levels: Vec<LogLevel>) -> LogFilter {
1408        LogFilter {
1409            show_logs: true,
1410            show_streams: false,
1411            levels,
1412            stream_types: Vec::new(),
1413        }
1414    }
1415    // Constructor for streams only
1416    #[must_use]
1417    pub fn show_streams(stream_types: Vec<LogStreamType>) -> LogFilter {
1418        LogFilter {
1419            show_logs: false,
1420            show_streams: true,
1421            levels: Vec::new(),
1422            stream_types,
1423        }
1424    }
1425    // Constructor for both logs and streams
1426    #[must_use]
1427    pub fn show_combined(levels: Vec<LogLevel>, stream_types: Vec<LogStreamType>) -> LogFilter {
1428        LogFilter {
1429            show_logs: true,
1430            show_streams: true,
1431            levels,
1432            stream_types,
1433        }
1434    }
1435    // Getters
1436    #[must_use]
1437    pub fn should_show_logs(&self) -> bool {
1438        self.show_logs
1439    }
1440    #[must_use]
1441    pub fn should_show_streams(&self) -> bool {
1442        self.show_streams
1443    }
1444    #[must_use]
1445    pub fn levels(&self) -> &Vec<LogLevel> {
1446        &self.levels
1447    }
1448    #[must_use]
1449    pub fn stream_types(&self) -> &Vec<LogStreamType> {
1450        &self.stream_types
1451    }
1452}
1453
1454#[derive(Debug, Clone)]
1455pub struct ExecutionWithStateRequestsResponses {
1456    pub execution_with_state: ExecutionWithState,
1457    pub events: Vec<ExecutionEvent>,
1458    pub responses: Vec<ResponseWithCursor>,
1459    pub max_version: Version,
1460    pub max_cursor: ResponseCursor,
1461}
1462
1463#[async_trait]
1464pub trait DbConnection: DbExecutor {
1465    /// Get execution log.
1466    async fn get(&self, execution_id: &ExecutionId) -> Result<ExecutionLog, DbErrorRead>;
1467
1468    async fn append_delay_response(
1469        &self,
1470        created_at: DateTime<Utc>,
1471        execution_id: ExecutionId,
1472        join_set_id: JoinSetId,
1473        delay_id: DelayId,
1474        outcome: Result<(), ()>, // Successfully finished - `Ok(())` or cancelled - `Err(())`
1475    ) -> Result<AppendDelayResponseOutcome, DbErrorWrite>;
1476
1477    /// Append a batch of events to an existing execution log.
1478    /// The batch must not contain [`ExecutionRequest::Created`].
1479    async fn append_batch(
1480        &self,
1481        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
1482        batch: Vec<AppendRequest>,
1483        execution_id: ExecutionId,
1484        version: Version,
1485    ) -> Result<AppendBatchResponse, DbErrorWrite>;
1486
1487    /// Append one or more events to the parent execution log, and create zero or more child execution logs.
1488    /// The batch must not contain [`ExecutionRequest::Created`].
1489    async fn append_batch_create_new_execution(
1490        &self,
1491        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
1492        batch: Vec<AppendRequest>,   // must not contain [`ExecutionRequest::Created`] events
1493        execution_id: ExecutionId,
1494        version: Version,
1495        child_req: Vec<CreateRequest>,
1496        backtraces: Vec<BacktraceInfo>,
1497    ) -> Result<AppendBatchResponse, DbErrorWrite>;
1498
1499    /// Get a single event specified by version. Impls may set `ExecutionEvent::backtrace_id` to `None`.
1500    async fn get_execution_event(
1501        &self,
1502        execution_id: &ExecutionId,
1503        version: &Version,
1504    ) -> Result<ExecutionEvent, DbErrorRead>;
1505
1506    #[instrument(skip(self))]
1507    async fn get_create_request(
1508        &self,
1509        execution_id: &ExecutionId,
1510    ) -> Result<CreateRequest, DbErrorRead> {
1511        let execution_event = self
1512            .get_execution_event(execution_id, &Version::new(0))
1513            .await?;
1514        if let ExecutionRequest::Created {
1515            ffqn,
1516            params,
1517            parent,
1518            scheduled_at,
1519            component_id,
1520            deployment_id,
1521            metadata,
1522            scheduled_by,
1523        } = execution_event.event
1524        {
1525            Ok(CreateRequest {
1526                created_at: execution_event.created_at,
1527                execution_id: execution_id.clone(),
1528                ffqn,
1529                params,
1530                parent,
1531                scheduled_at,
1532                component_id,
1533                deployment_id,
1534                metadata,
1535                scheduled_by,
1536            })
1537        } else {
1538            Err(DbErrorRead::Generic(DbErrorGeneric::Uncategorized {
1539                reason: "execution log must start with creation".into(),
1540                context: SpanTrace::capture(),
1541                source: None,
1542                loc: Location::caller(),
1543            }))
1544        }
1545    }
1546
1547    async fn get_pending_state(
1548        &self,
1549        execution_id: &ExecutionId,
1550    ) -> Result<ExecutionWithState, DbErrorRead>;
1551
1552    /// Get currently expired locks and async timers (delay requests)
1553    async fn get_expired_timers(
1554        &self,
1555        at: DateTime<Utc>,
1556    ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric>;
1557
1558    /// Create a new execution log
1559    async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite>;
1560
1561    /// Notification mechainism with no strict guarantees for getting notified when a new response arrives.
1562    /// Parameter `start_idx` must be at most be equal to current size of responses in the execution log.
1563    /// If no response arrives immediately and `interrupt_after` resolves, `DbErrorReadWithTimeout::Timeout` is returned.
1564    /// Implementations with no pubsub support should use polling.
1565    /// Callers are expected to call this function in a loop with a reasonable timeout
1566    /// to support less stellar implementations.
1567    async fn subscribe_to_next_responses(
1568        &self,
1569        execution_id: &ExecutionId,
1570        last_response: ResponseCursor,
1571        timeout_fut: Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>,
1572    ) -> Result<Vec<ResponseWithCursor>, DbErrorReadWithTimeout>;
1573
1574    /// First, attempt to fetch the finished value. If the execution is not finished yet, poll
1575    /// periodically or subscribe to db changes, racing with `timeout_fut`.
1576    /// Notification mechainism with no strict guarantees for getting the finished result.
1577    /// Implementations with no pubsub support should use polling.
1578    /// Callers are expected to call this function in a loop with a reasonable timeout
1579    /// to support less stellar implementations.
1580    async fn wait_for_finished_result(
1581        &self,
1582        execution_id: &ExecutionId,
1583        timeout_fut: Option<Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>>,
1584    ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout>;
1585
1586    async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite>;
1587
1588    async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite>;
1589
1590    async fn append_log(&self, row: LogInfoAppendRow) -> Result<(), DbErrorWrite>;
1591
1592    async fn append_log_batch(&self, batch: &[LogInfoAppendRow]) -> Result<(), DbErrorWrite>;
1593
1594    /// Returns `TimeoutOutcome::Timeout` if not in Finished state.
1595    #[cfg(feature = "test")]
1596    async fn get_finished_result(
1597        &self,
1598        execution_id: &ExecutionId,
1599    ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout> {
1600        self.wait_for_finished_result(
1601            execution_id,
1602            Some(Box::pin(std::future::ready(TimeoutOutcome::Timeout))),
1603        )
1604        .await
1605    }
1606}
1607
1608#[derive(Clone, Debug)]
1609pub struct LogInfoAppendRow {
1610    pub execution_id: ExecutionId,
1611    pub run_id: RunId,
1612    pub log_entry: LogEntry,
1613}
1614
1615#[derive(Debug, Clone)]
1616pub struct LogEntryRow {
1617    pub cursor: DateTime<Utc>,
1618    pub run_id: RunId,
1619    pub log_entry: LogEntry,
1620    pub execution_id: ExecutionId,
1621}
1622
1623#[derive(Debug, Clone)]
1624pub enum LogEntry {
1625    Log {
1626        created_at: DateTime<Utc>,
1627        level: LogLevel,
1628        message: String,
1629    },
1630    Stream {
1631        created_at: DateTime<Utc>,
1632        payload: Vec<u8>,
1633        stream_type: LogStreamType,
1634    },
1635}
1636impl LogEntry {
1637    #[must_use]
1638    pub fn created_at(&self) -> DateTime<Utc> {
1639        match self {
1640            LogEntry::Log { created_at, .. } | LogEntry::Stream { created_at, .. } => *created_at,
1641        }
1642    }
1643}
1644
1645#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::TryFrom, strum::EnumIter)]
1646#[try_from(repr)]
1647#[repr(u8)]
1648pub enum LogLevel {
1649    Trace = 1,
1650    Debug,
1651    Info,
1652    Warn,
1653    Error,
1654}
1655#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::TryFrom, strum::EnumIter)]
1656#[try_from(repr)]
1657#[repr(u8)]
1658pub enum LogStreamType {
1659    StdOut = 1,
1660    StdErr,
1661}
1662
/// Value a `timeout_fut` resolves with; also carried by `DbErrorReadWithTimeout::Timeout`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TimeoutOutcome {
    /// The wait timed out.
    Timeout,
    /// The wait was cancelled.
    Cancel,
}
1668
/// Test-only extension of [`DbConnection`]; available with the `test` feature.
#[cfg(feature = "test")]
#[async_trait]
pub trait DbConnectionTest: DbConnection {
    /// Append `response_event` to execution `execution_id`
    /// (presumably to its list of join set responses).
    async fn append_response(
        &self,
        created_at: DateTime<Utc>,
        execution_id: ExecutionId,
        response_event: JoinSetResponseEvent,
    ) -> Result<(), DbErrorWrite>;
}
1679
/// Result of a cancellation request (see `DbExecutor::cancel_activity` and `cancel_delay`).
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum CancelOutcome {
    /// The target is (now or already) cancelled.
    Cancelled,
    /// The target had already finished, so it was not cancelled.
    AlreadyFinished,
}
1685
1686#[instrument(skip(db_connection))]
1687pub async fn stub_execution(
1688    db_connection: &dyn DbConnection,
1689    execution_id: ExecutionIdDerived,
1690    parent_execution_id: ExecutionId,
1691    join_set_id: JoinSetId,
1692    created_at: DateTime<Utc>,
1693    return_value: SupportedFunctionReturnValue,
1694) -> Result<(), DbErrorWrite> {
1695    let stub_finished_version = Version::new(1); // Stub activities have no execution log except Created event.
1696    // Attempt to write to `execution_id` and its parent, ignoring the possible conflict error on this tx
1697    let write_attempt = {
1698        let finished_req = AppendRequest {
1699            created_at,
1700            event: ExecutionRequest::Finished {
1701                retval: return_value.clone(),
1702                http_client_traces: None,
1703            },
1704        };
1705        db_connection
1706            .append_batch_respond_to_parent(
1707                AppendEventsToExecution {
1708                    execution_id: ExecutionId::Derived(execution_id.clone()),
1709                    version: stub_finished_version.clone(),
1710                    batch: vec![finished_req],
1711                },
1712                AppendResponseToExecution {
1713                    parent_execution_id,
1714                    created_at,
1715                    join_set_id,
1716                    child_execution_id: execution_id.clone(),
1717                    finished_version: stub_finished_version.clone(),
1718                    result: return_value.clone(),
1719                },
1720                created_at,
1721            )
1722            .await
1723    };
1724    if let Err(write_attempt) = write_attempt {
1725        // Check that the expected value is in the database
1726        debug!("Stub write attempt failed - {write_attempt:?}");
1727
1728        let found = db_connection
1729            .get_execution_event(&ExecutionId::Derived(execution_id), &stub_finished_version)
1730            .await?; // Not found at this point should not happen, unless the previous write failed. Will be retried.
1731        match found.event {
1732            ExecutionRequest::Finished {
1733                retval: found_result,
1734                ..
1735            } if return_value == found_result => {
1736                // Same value has already be written, RPC is successful.
1737                Ok(())
1738            }
1739            ExecutionRequest::Finished { .. } => Err(DbErrorWrite::NonRetriable(
1740                DbErrorWriteNonRetriable::Conflict,
1741            )),
1742            _other => Err(DbErrorWrite::NonRetriable(
1743                DbErrorWriteNonRetriable::IllegalState {
1744                    reason: "unexpected execution event at stubbed execution".into(),
1745                    context: SpanTrace::capture(),
1746                    source: None,
1747                    loc: Location::caller(),
1748                },
1749            )),
1750        }
1751    } else {
1752        Ok(())
1753    }
1754}
1755
1756pub async fn cancel_delay(
1757    db_connection: &dyn DbConnection,
1758    delay_id: DelayId,
1759    cancelled_at: DateTime<Utc>,
1760) -> Result<CancelOutcome, DbErrorWrite> {
1761    let (parent_execution_id, join_set_id) = delay_id.split_to_parts();
1762    db_connection
1763        .append_delay_response(
1764            cancelled_at,
1765            parent_execution_id,
1766            join_set_id,
1767            delay_id,
1768            Err(()), // Mark as cancelled.
1769        )
1770        .await
1771        .map(|ok| match ok {
1772            AppendDelayResponseOutcome::Success | AppendDelayResponseOutcome::AlreadyCancelled => {
1773                CancelOutcome::Cancelled
1774            }
1775            AppendDelayResponseOutcome::AlreadyFinished => CancelOutcome::AlreadyFinished,
1776        })
1777}
1778
1779#[derive(Clone, Debug)]
1780pub enum BacktraceFilter {
1781    First,
1782    Last,
1783    Specific(Version),
1784}
1785
1786#[derive(Clone, Debug, PartialEq, Eq)]
1787pub struct BacktraceInfo {
1788    pub execution_id: ExecutionId,
1789    pub component_id: ComponentId,
1790    pub version_min_including: Version,
1791    pub version_max_excluding: Version,
1792    pub wasm_backtrace: WasmBacktrace,
1793}
1794
1795#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, schemars::JsonSchema)]
1796pub struct WasmBacktrace {
1797    pub frames: Vec<FrameInfo>,
1798}
1799
1800#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, schemars::JsonSchema)]
1801pub struct FrameInfo {
1802    pub module: String,
1803    pub func_name: String,
1804    pub symbols: Vec<FrameSymbol>,
1805}
1806
1807#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, schemars::JsonSchema)]
1808pub struct FrameSymbol {
1809    pub func_name: Option<String>,
1810    pub file: Option<String>,
1811    pub line: Option<u32>,
1812    pub col: Option<u32>,
1813}
1814
1815mod wasm_backtrace {
1816    use super::{FrameInfo, FrameSymbol, WasmBacktrace};
1817
1818    impl WasmBacktrace {
1819        pub fn maybe_from(backtrace: &wasmtime::WasmBacktrace) -> Option<Self> {
1820            if backtrace.frames().is_empty() {
1821                None
1822            } else {
1823                Some(Self {
1824                    frames: backtrace.frames().iter().map(FrameInfo::from).collect(),
1825                })
1826            }
1827        }
1828    }
1829
1830    impl From<&wasmtime::FrameInfo> for FrameInfo {
1831        fn from(frame: &wasmtime::FrameInfo) -> Self {
1832            let module_name = frame.module().name().unwrap_or("<unknown>").to_string();
1833            let mut func_name = String::new();
1834            wasmtime_environ::demangle_function_name_or_index(
1835                &mut func_name,
1836                frame.func_name(),
1837                frame.func_index() as usize,
1838            )
1839            .expect("writing to string must succeed");
1840            Self {
1841                module: module_name,
1842                func_name,
1843                symbols: frame
1844                    .symbols()
1845                    .iter()
1846                    .map(std::convert::Into::into)
1847                    .collect(),
1848            }
1849        }
1850    }
1851
1852    impl From<&wasmtime::FrameSymbol> for FrameSymbol {
1853        fn from(symbol: &wasmtime::FrameSymbol) -> Self {
1854            let func_name = symbol.name().map(|name| {
1855                let mut writer = String::new();
1856                wasmtime_environ::demangle_function_name(&mut writer, name)
1857                    .expect("writing to string must succeed");
1858                writer
1859            });
1860
1861            Self {
1862                func_name,
1863                file: symbol.file().map(ToString::to_string),
1864                line: symbol.line(),
1865                col: symbol.column(),
1866            }
1867        }
1868    }
1869}
1870#[derive(Debug, Clone, derive_more::Display)]
1871#[display("{execution_id} {pending_state} {component_digest}")]
1872pub struct ExecutionWithState {
1873    pub execution_id: ExecutionId,
1874    pub ffqn: FunctionFqn,
1875    pub pending_state: PendingState,
1876    pub created_at: DateTime<Utc>,
1877    pub first_scheduled_at: DateTime<Utc>,
1878    pub component_digest: ComponentDigest,
1879    pub component_type: ComponentType,
1880    pub deployment_id: DeploymentId,
1881}
1882
1883#[derive(Debug, Clone)]
1884pub enum ExecutionListPagination {
1885    CreatedBy(Pagination<Option<DateTime<Utc>>>),
1886    ExecutionId(Pagination<Option<ExecutionId>>),
1887}
1888impl Default for ExecutionListPagination {
1889    fn default() -> ExecutionListPagination {
1890        ExecutionListPagination::CreatedBy(Pagination::OlderThan {
1891            length: 20,
1892            cursor: None,
1893            including_cursor: false, // does not matter when `cursor` is not specified
1894        })
1895    }
1896}
1897impl ExecutionListPagination {
1898    #[must_use]
1899    pub fn length(&self) -> u16 {
1900        match self {
1901            ExecutionListPagination::CreatedBy(pagination) => pagination.length(),
1902            ExecutionListPagination::ExecutionId(pagination) => pagination.length(),
1903        }
1904    }
1905}
1906
1907#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1908pub enum Pagination<T> {
1909    NewerThan {
1910        length: u16,
1911        cursor: T,
1912        including_cursor: bool,
1913    },
1914    OlderThan {
1915        length: u16,
1916        cursor: T,
1917        including_cursor: bool,
1918    },
1919}
1920impl<T: Clone> Pagination<T> {
1921    pub fn length(&self) -> u16 {
1922        match self {
1923            Pagination::NewerThan { length, .. } | Pagination::OlderThan { length, .. } => *length,
1924        }
1925    }
1926
1927    pub fn rel(&self) -> &'static str {
1928        match self {
1929            Pagination::NewerThan {
1930                including_cursor: false,
1931                ..
1932            } => ">",
1933            Pagination::NewerThan {
1934                including_cursor: true,
1935                ..
1936            } => ">=",
1937            Pagination::OlderThan {
1938                including_cursor: false,
1939                ..
1940            } => "<",
1941            Pagination::OlderThan {
1942                including_cursor: true,
1943                ..
1944            } => "<=",
1945        }
1946    }
1947
1948    pub fn is_desc(&self) -> bool {
1949        matches!(self, Pagination::OlderThan { .. })
1950    }
1951
1952    pub fn asc_or_desc(&self) -> &'static str {
1953        if self.is_asc() { "asc" } else { "desc" }
1954    }
1955
1956    pub fn is_asc(&self) -> bool {
1957        !self.is_desc()
1958    }
1959
1960    pub fn cursor(&self) -> &T {
1961        match self {
1962            Pagination::NewerThan { cursor, .. } | Pagination::OlderThan { cursor, .. } => cursor,
1963        }
1964    }
1965
1966    #[must_use]
1967    pub fn invert(&self) -> Self {
1968        match self {
1969            Pagination::NewerThan {
1970                length,
1971                cursor,
1972                including_cursor,
1973            } => Pagination::OlderThan {
1974                length: *length,
1975                cursor: cursor.clone(),
1976                including_cursor: !including_cursor,
1977            },
1978            Pagination::OlderThan {
1979                length,
1980                cursor,
1981                including_cursor,
1982            } => Pagination::NewerThan {
1983                length: *length,
1984                cursor: cursor.clone(),
1985                including_cursor: !including_cursor,
1986            },
1987        }
1988    }
1989}
1990
1991#[cfg(feature = "test")]
1992pub async fn wait_for_pending_state_fn<T: Debug>(
1993    db_connection: &dyn DbConnectionTest,
1994    execution_id: &ExecutionId,
1995    predicate: impl Fn(ExecutionLog) -> Option<T> + Send,
1996    timeout: Option<Duration>,
1997) -> Result<T, DbErrorReadWithTimeout> {
1998    tracing::trace!(%execution_id, "Waiting for predicate");
1999    let fut = async move {
2000        loop {
2001            let execution_log = db_connection.get(execution_id).await?;
2002            if let Some(t) = predicate(execution_log) {
2003                tracing::debug!(%execution_id, "Found: {t:?}");
2004                return Ok(t);
2005            }
2006            tokio::time::sleep(Duration::from_millis(10)).await;
2007        }
2008    };
2009
2010    if let Some(timeout) = timeout {
2011        tokio::select! { // future's liveness: Dropping the loser immediately.
2012            res = fut => res,
2013            () = tokio::time::sleep(timeout) => Err(DbErrorReadWithTimeout::Timeout(TimeoutOutcome::Timeout))
2014        }
2015    } else {
2016        fut.await
2017    }
2018}
2019
2020#[derive(Debug, Clone, PartialEq, Eq)]
2021pub enum ExpiredTimer {
2022    Lock(ExpiredLock),
2023    Delay(ExpiredDelay),
2024}
2025
2026#[derive(Debug, Clone, PartialEq, Eq)]
2027pub struct ExpiredLock {
2028    pub execution_id: ExecutionId,
2029    // Version of last `Locked` event, used to detect whether the execution made progress.
2030    pub locked_at_version: Version,
2031    pub next_version: Version,
2032    /// As the execution may still be running, this represents the number of intermittent failures + timeouts prior to this execution.
2033    pub intermittent_event_count: u32,
2034    pub max_retries: Option<u32>,
2035    pub retry_exp_backoff: Duration,
2036    pub locked_by: LockedBy,
2037}
2038
2039#[derive(Debug, Clone, PartialEq, Eq)]
2040pub struct ExpiredDelay {
2041    pub execution_id: ExecutionId,
2042    pub join_set_id: JoinSetId,
2043    pub delay_id: DelayId,
2044}
2045
2046#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, schemars::JsonSchema)]
2047#[serde(tag = "status", rename_all = "snake_case")]
2048pub enum PendingState {
2049    /// Caused by [`ExecutionRequest::Locked`].
2050    Locked(PendingStateLocked),
2051
2052    #[display("PendingAt(`{_0}`)")]
2053    PendingAt(PendingStatePendingAt),
2054
2055    /// Caused by [`HistoryEvent::JoinNext`]
2056    #[display("BlockedByJoinSet({_0})")]
2057    BlockedByJoinSet(PendingStateBlockedByJoinSet),
2058
2059    #[display("Paused({_0})")]
2060    Paused(PendingStatePaused),
2061
2062    #[display("Finished({_0})")]
2063    Finished(PendingStateFinished),
2064}
2065
2066pub enum PendingStateMergedPause {
2067    Locked {
2068        state: PendingStateLocked,
2069        paused: bool,
2070    },
2071    PendingAt {
2072        state: PendingStatePendingAt,
2073        paused: bool,
2074    },
2075    BlockedByJoinSet {
2076        state: PendingStateBlockedByJoinSet,
2077        paused: bool,
2078    },
2079    Finished(PendingStateFinished),
2080}
2081impl From<PendingState> for PendingStateMergedPause {
2082    fn from(state: PendingState) -> Self {
2083        match state {
2084            PendingState::Locked(s) => PendingStateMergedPause::Locked {
2085                state: s,
2086                paused: false,
2087            },
2088
2089            PendingState::PendingAt(s) => PendingStateMergedPause::PendingAt {
2090                state: s,
2091                paused: false,
2092            },
2093
2094            PendingState::BlockedByJoinSet(s) => PendingStateMergedPause::BlockedByJoinSet {
2095                state: s,
2096                paused: false,
2097            },
2098
2099            PendingState::Paused(paused) => match paused {
2100                PendingStatePaused::Locked(s) => PendingStateMergedPause::Locked {
2101                    state: s,
2102                    paused: true,
2103                },
2104                PendingStatePaused::PendingAt(s) => PendingStateMergedPause::PendingAt {
2105                    state: s,
2106                    paused: true,
2107                },
2108                PendingStatePaused::BlockedByJoinSet(s) => {
2109                    PendingStateMergedPause::BlockedByJoinSet {
2110                        state: s,
2111                        paused: true,
2112                    }
2113                }
2114            },
2115
2116            PendingState::Finished(s) => PendingStateMergedPause::Finished(s),
2117        }
2118    }
2119}
2120
2121#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, schemars::JsonSchema)]
2122#[display("Locked(`{lock_expires_at}`, {}, {})", locked_by.executor_id, locked_by.run_id)]
2123pub struct PendingStateLocked {
2124    pub locked_by: LockedBy,
2125    pub lock_expires_at: DateTime<Utc>,
2126}
2127
2128#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, schemars::JsonSchema)]
2129#[display("`{scheduled_at}`, last_lock={last_lock:?}")]
2130pub struct PendingStatePendingAt {
2131    pub scheduled_at: DateTime<Utc>,
2132    /// `last_lock` is needed for lock extension.
2133    pub last_lock: Option<LockedBy>,
2134}
2135
2136#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, schemars::JsonSchema)]
2137#[display("{join_set_id}, `{lock_expires_at}`, closing={closing}")]
2138pub struct PendingStateBlockedByJoinSet {
2139    pub join_set_id: JoinSetId,
2140    /// See [`HistoryEvent::JoinNext::lock_expires_at`].
2141    pub lock_expires_at: DateTime<Utc>,
2142    /// Blocked by closing of the join set
2143    pub closing: bool,
2144}
2145
2146/// State of execution before it was paused.
2147#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, schemars::JsonSchema)]
2148pub enum PendingStatePaused {
2149    #[display("Locked({_0})")]
2150    Locked(PendingStateLocked),
2151    #[display("PendingAt({_0})")]
2152    PendingAt(PendingStatePendingAt),
2153    #[display("BlockedByJoinSet({_0})")]
2154    BlockedByJoinSet(PendingStateBlockedByJoinSet),
2155}
2156
2157#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
2158pub struct LockedBy {
2159    pub executor_id: ExecutorId,
2160    pub run_id: RunId,
2161}
2162impl From<&Locked> for LockedBy {
2163    fn from(value: &Locked) -> Self {
2164        LockedBy {
2165            executor_id: value.executor_id,
2166            run_id: value.run_id,
2167        }
2168    }
2169}
2170
2171#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, schemars::JsonSchema)]
2172#[cfg_attr(any(test, feature = "test"), derive(Deserialize))]
2173pub struct PendingStateFinished {
2174    pub version: VersionType, // not Version since it must be Copy
2175    pub finished_at: DateTime<Utc>,
2176    pub result_kind: PendingStateFinishedResultKind,
2177}
2178impl Display for PendingStateFinished {
2179    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2180        match self.result_kind {
2181            PendingStateFinishedResultKind::Ok => write!(f, "ok"),
2182            PendingStateFinishedResultKind::Err(err) => write!(f, "{err}"),
2183        }
2184    }
2185}
2186
2187// This is not a Result so that it can be customized for serialization
2188#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
2189#[serde(rename_all = "snake_case")]
2190pub enum PendingStateFinishedResultKind {
2191    Ok,
2192    Err(PendingStateFinishedError),
2193}
2194impl PendingStateFinishedResultKind {
2195    pub fn as_result(&self) -> Result<(), &PendingStateFinishedError> {
2196        match self {
2197            PendingStateFinishedResultKind::Ok => Ok(()),
2198            PendingStateFinishedResultKind::Err(err) => Err(err),
2199        }
2200    }
2201}
2202
2203impl From<&SupportedFunctionReturnValue> for PendingStateFinishedResultKind {
2204    fn from(result: &SupportedFunctionReturnValue) -> Self {
2205        result.as_pending_state_finished_result()
2206    }
2207}
2208
2209#[derive(
2210    Debug,
2211    Clone,
2212    Copy,
2213    PartialEq,
2214    Eq,
2215    Serialize,
2216    Deserialize,
2217    derive_more::Display,
2218    schemars::JsonSchema,
2219)]
2220#[serde(rename_all = "snake_case")]
2221pub enum PendingStateFinishedError {
2222    #[display("{_0}")]
2223    ExecutionFailure(ExecutionFailureKind),
2224    #[display("completed with an error")]
2225    Error,
2226}
2227
2228impl PendingState {
2229    #[instrument(skip(self))]
2230    pub fn can_append_lock(
2231        &self,
2232        created_at: DateTime<Utc>,
2233        executor_id: ExecutorId,
2234        run_id: RunId,
2235        lock_expires_at: DateTime<Utc>,
2236    ) -> Result<LockKind, DbErrorWriteNonRetriable> {
2237        if lock_expires_at <= created_at {
2238            return Err(DbErrorWriteNonRetriable::ValidationFailed(
2239                "invalid expiry date".into(),
2240            ));
2241        }
2242        match self {
2243            PendingState::PendingAt(PendingStatePendingAt {
2244                scheduled_at,
2245                last_lock,
2246            }) => {
2247                if *scheduled_at <= created_at {
2248                    // pending now, ok to lock
2249                    Ok(LockKind::CreatingNewLock)
2250                } else if let Some(LockedBy {
2251                    executor_id: last_executor_id,
2252                    run_id: last_run_id,
2253                }) = last_lock
2254                    && executor_id == *last_executor_id
2255                    && run_id == *last_run_id
2256                {
2257                    // Original executor is extending the lock.
2258                    Ok(LockKind::Extending)
2259                } else {
2260                    Err(DbErrorWriteNonRetriable::ValidationFailed(
2261                        "cannot lock, not yet pending".into(),
2262                    ))
2263                }
2264            }
2265            PendingState::Locked(PendingStateLocked {
2266                locked_by:
2267                    LockedBy {
2268                        executor_id: current_pending_state_executor_id,
2269                        run_id: current_pending_state_run_id,
2270                    },
2271                lock_expires_at: _,
2272            }) => {
2273                if executor_id == *current_pending_state_executor_id
2274                    && run_id == *current_pending_state_run_id
2275                {
2276                    // Original executor is extending the lock.
2277                    Ok(LockKind::Extending)
2278                } else {
2279                    Err(DbErrorWriteNonRetriable::IllegalState {
2280                        reason: "cannot lock, already locked".into(),
2281                        context: SpanTrace::capture(),
2282                        source: None,
2283                        loc: Location::caller(),
2284                    })
2285                }
2286            }
2287            PendingState::BlockedByJoinSet { .. } => Err(DbErrorWriteNonRetriable::IllegalState {
2288                reason: "cannot append Locked event when in BlockedByJoinSet state".into(),
2289                context: SpanTrace::capture(),
2290                source: None,
2291                loc: Location::caller(),
2292            }),
2293            PendingState::Finished { .. } => Err(DbErrorWriteNonRetriable::IllegalState {
2294                reason: "already finished".into(),
2295                context: SpanTrace::capture(),
2296                source: None,
2297                loc: Location::caller(),
2298            }),
2299            PendingState::Paused(..) => Err(DbErrorWriteNonRetriable::IllegalState {
2300                reason: "cannot lock, execution is paused".into(),
2301                context: SpanTrace::capture(),
2302                source: None,
2303                loc: Location::caller(),
2304            }),
2305        }
2306    }
2307
2308    #[must_use]
2309    pub fn is_finished(&self) -> bool {
2310        matches!(self, PendingState::Finished { .. })
2311    }
2312
2313    #[must_use]
2314    pub fn is_paused(&self) -> bool {
2315        matches!(self, PendingState::Paused(_))
2316    }
2317}
2318
2319#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2320pub enum LockKind {
2321    Extending,
2322    CreatingNewLock,
2323}
2324
2325pub mod http_client_trace {
2326    use chrono::{DateTime, Utc};
2327    use serde::{Deserialize, Serialize};
2328
2329    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
2330    pub struct HttpClientTrace {
2331        pub req: RequestTrace,
2332        pub resp: Option<ResponseTrace>,
2333    }
2334
2335    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
2336    pub struct RequestTrace {
2337        pub sent_at: DateTime<Utc>,
2338        pub uri: String,
2339        pub method: String,
2340    }
2341
2342    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
2343    pub struct ResponseTrace {
2344        pub finished_at: DateTime<Utc>,
2345        pub status: Result<u16, String>,
2346    }
2347}
2348
2349/// Root type for DB schema generation - contains all serialized DB types
2350#[derive(schemars::JsonSchema)]
2351pub struct DbStorageSchema {
2352    pub execution_event: ExecutionEvent,
2353    pub pending_state: PendingState,
2354    pub join_set_response: JoinSetResponse,
2355    pub wasm_backtrace: WasmBacktrace,
2356}
2357
2358#[cfg(test)]
2359mod tests {
2360    use super::HistoryEvent;
2361    use super::HistoryEventScheduleAt;
2362    use super::JoinNextTryOutcome;
2363    use super::PendingStateFinished;
2364    use super::PendingStateFinishedError;
2365    use super::PendingStateFinishedResultKind;
2366    use crate::ExecutionFailureKind;
2367    use crate::JoinSetId;
2368    use crate::SupportedFunctionReturnValue;
2369    use chrono::DateTime;
2370    use chrono::Datelike;
2371    use chrono::Utc;
2372    use insta::assert_snapshot;
2373    use rstest::rstest;
2374    use std::time::Duration;
2375    use val_json::type_wrapper::TypeWrapper;
2376    use val_json::wast_val::WastVal;
2377    use val_json::wast_val::WastValWithType;
2378
2379    #[rstest(expected => [
2380        PendingStateFinishedResultKind::Ok,
2381        PendingStateFinishedResultKind::Err(PendingStateFinishedError::ExecutionFailure(ExecutionFailureKind::TimedOut)),
2382    ])]
2383    #[test]
2384    fn serde_pending_state_finished_result_kind_should_work(
2385        expected: PendingStateFinishedResultKind,
2386    ) {
2387        let ser = serde_json::to_string(&expected).unwrap();
2388        let actual: PendingStateFinishedResultKind = serde_json::from_str(&ser).unwrap();
2389        assert_eq!(expected, actual);
2390    }
2391
2392    #[rstest(result_kind => [
2393        PendingStateFinishedResultKind::Ok,
2394        PendingStateFinishedResultKind::Err(PendingStateFinishedError::ExecutionFailure(ExecutionFailureKind::TimedOut)),
2395    ])]
2396    #[test]
2397    fn serde_pending_state_finished_should_work(result_kind: PendingStateFinishedResultKind) {
2398        let expected = PendingStateFinished {
2399            version: 0,
2400            finished_at: Utc::now(),
2401            result_kind,
2402        };
2403
2404        let ser = serde_json::to_string(&expected).unwrap();
2405        let actual: PendingStateFinished = serde_json::from_str(&ser).unwrap();
2406        assert_eq!(expected, actual);
2407    }
2408
2409    #[test]
2410    fn join_set_deser_with_result_ok_option_none_should_work() {
2411        let expected = SupportedFunctionReturnValue::Ok(Some(WastValWithType {
2412            r#type: TypeWrapper::Result {
2413                ok: Some(Box::new(TypeWrapper::Option(Box::new(TypeWrapper::String)))),
2414                err: Some(Box::new(TypeWrapper::String)),
2415            },
2416            value: WastVal::Result(Ok(Some(Box::new(WastVal::Option(None))))),
2417        }));
2418        let json = serde_json::to_string(&expected).unwrap();
2419        assert_snapshot!(json);
2420
2421        let actual: SupportedFunctionReturnValue = serde_json::from_str(&json).unwrap();
2422
2423        assert_eq!(expected, actual);
2424    }
2425
2426    #[test]
2427    fn as_date_time_should_work_with_duration_u32_max_secs() {
2428        let duration = Duration::from_secs(u64::from(u32::MAX));
2429        let schedule_at = HistoryEventScheduleAt::In(duration);
2430        let resolved = schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap();
2431        assert_eq!(2106, resolved.year());
2432    }
2433
2434    const MILLIS_PER_SEC: i64 = 1000;
2435    const TIMEDELTA_MAX_SECS: i64 = i64::MAX / MILLIS_PER_SEC;
2436
2437    #[test]
2438    fn as_date_time_should_fail_on_duration_secs_greater_than_i64_max() {
2439        // Fails on duration -> timedelta conversion, but a smaller duration can fail on datetime + timedelta
2440        let duration = Duration::from_secs(
2441            u64::try_from(TIMEDELTA_MAX_SECS).expect("positive number must not fail") + 1,
2442        );
2443        let schedule_at = HistoryEventScheduleAt::In(duration);
2444        schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap_err();
2445    }
2446
2447    #[test]
2448    fn join_next_try_outcome_new_format() {
2449        let json = r#"{"type":"join_next_try","join_set_id":"n:test","outcome":"found"}"#;
2450        let event: HistoryEvent = serde_json::from_str(json).unwrap();
2451        assert_eq!(
2452            event,
2453            HistoryEvent::JoinNextTry {
2454                join_set_id: JoinSetId::new(
2455                    crate::JoinSetKind::Named,
2456                    crate::StrVariant::Static("test")
2457                )
2458                .unwrap(),
2459                outcome: JoinNextTryOutcome::Found,
2460            }
2461        );
2462
2463        let json = r#"{"type":"join_next_try","join_set_id":"n:test","outcome":"all_processed"}"#;
2464        let event: HistoryEvent = serde_json::from_str(json).unwrap();
2465        assert_eq!(
2466            event,
2467            HistoryEvent::JoinNextTry {
2468                join_set_id: JoinSetId::new(
2469                    crate::JoinSetKind::Named,
2470                    crate::StrVariant::Static("test")
2471                )
2472                .unwrap(),
2473                outcome: JoinNextTryOutcome::AllProcessed,
2474            }
2475        );
2476    }
2477
2478    #[test]
2479    fn join_next_try_outcome_serializes_new_format() {
2480        let event = HistoryEvent::JoinNextTry {
2481            join_set_id: JoinSetId::new(
2482                crate::JoinSetKind::Named,
2483                crate::StrVariant::Static("test"),
2484            )
2485            .unwrap(),
2486            outcome: JoinNextTryOutcome::AllProcessed,
2487        };
2488        let json = serde_json::to_string(&event).unwrap();
2489        assert!(
2490            json.contains(r#""outcome":"all_processed""#),
2491            "expected outcome field, got: {json}"
2492        );
2493        assert!(
2494            !json.contains("found_response"),
2495            "should not contain old field, got: {json}"
2496        );
2497    }
2498
2499    mod stub_retval_hash {
2500        use super::super::{StubRetVal, StubRetValHash};
2501        use crate::SupportedFunctionReturnValue;
2502        use val_json::type_wrapper::TypeWrapper;
2503        use val_json::wast_val::{WastVal, WastValWithType};
2504
2505        #[test]
2506        fn typed_variant_hash_is_stable() {
2507            let retval =
2508                StubRetVal::Typed(SupportedFunctionReturnValue::Ok(Some(WastValWithType {
2509                    r#type: TypeWrapper::String,
2510                    value: WastVal::String("hello".into()),
2511                })));
2512            let hash = retval.hash();
2513            // Hash should start with version byte 0x01
2514            assert_eq!(hash.to_string().chars().take(2).collect::<String>(), "01");
2515            // Hash should be 66 hex characters (33 bytes * 2)
2516            assert_eq!(hash.to_string().len(), 66);
2517        }
2518
2519        #[test]
2520        fn untyped_variant_hash_is_stable() {
2521            let retval = StubRetVal::Untyped(r#"{"ok": "hello"}"#.to_string());
2522            let hash = retval.hash();
2523            // Hash should start with version byte 0x01
2524            assert_eq!(hash.to_string().chars().take(2).collect::<String>(), "01");
2525            // Hash should be 66 hex characters (33 bytes * 2)
2526            assert_eq!(hash.to_string().len(), 66);
2527        }
2528
2529        #[test]
2530        fn different_values_produce_different_hashes() {
2531            let typed1 = StubRetVal::Typed(SupportedFunctionReturnValue::Ok(None));
2532            let typed2 = StubRetVal::Typed(SupportedFunctionReturnValue::Err(None));
2533            let untyped1 = StubRetVal::Untyped("value1".to_string());
2534            let untyped2 = StubRetVal::Untyped("value2".to_string());
2535
2536            let hashes: Vec<_> = [typed1, typed2, untyped1, untyped2]
2537                .into_iter()
2538                .map(|r| r.hash().to_string())
2539                .collect();
2540
2541            // All hashes should be unique
2542            for (i, h1) in hashes.iter().enumerate() {
2543                for h2 in hashes.iter().skip(i + 1) {
2544                    assert_ne!(h1, h2, "hashes should be different");
2545                }
2546            }
2547        }
2548
2549        #[test]
2550        fn same_values_produce_same_hashes() {
2551            let retval1 = StubRetVal::Typed(SupportedFunctionReturnValue::Ok(None));
2552            let retval2 = StubRetVal::Typed(SupportedFunctionReturnValue::Ok(None));
2553            assert_eq!(retval1.hash(), retval2.hash());
2554
2555            let untyped1 = StubRetVal::Untyped("test".to_string());
2556            let untyped2 = StubRetVal::Untyped("test".to_string());
2557            assert_eq!(untyped1.hash(), untyped2.hash());
2558        }
2559
2560        #[test]
2561        fn hash_serialization_roundtrip() {
2562            let retval = StubRetVal::Typed(SupportedFunctionReturnValue::Ok(None));
2563            let hash = retval.hash();
2564
2565            let serialized = serde_json::to_string(&hash).unwrap();
2566            let deserialized: StubRetValHash = serde_json::from_str(&serialized).unwrap();
2567
2568            assert_eq!(hash, deserialized);
2569        }
2570
2571        #[test]
2572        fn hash_display_and_fromstr_roundtrip() {
2573            let retval = StubRetVal::Untyped("test value".to_string());
2574            let hash = retval.hash();
2575
2576            let display = hash.to_string();
2577            let parsed: StubRetValHash = display.parse().unwrap();
2578
2579            assert_eq!(hash, parsed);
2580        }
2581
2582        #[test]
2583        fn typed_and_untyped_with_same_content_produce_different_hashes() {
2584            // Even if the JSON content is the same, Typed vs Untyped should hash differently
2585            let typed = StubRetVal::Typed(SupportedFunctionReturnValue::Ok(None));
2586            let json_of_typed =
2587                serde_json::to_string(&SupportedFunctionReturnValue::Ok(None)).unwrap();
2588            let untyped = StubRetVal::Untyped(json_of_typed);
2589
2590            assert_ne!(typed.hash(), untyped.hash());
2591        }
2592    }
2593}