Skip to main content

obeli_sk_concepts/
storage.rs

1use crate::ComponentId;
2use crate::ComponentRetryConfig;
3use crate::ComponentType;
4use crate::ExecutionFailureKind;
5use crate::ExecutionId;
6use crate::ExecutionMetadata;
7use crate::FinishedExecutionFailure;
8use crate::FunctionExtension;
9use crate::FunctionFqn;
10use crate::FunctionMetadata;
11use crate::JoinSetId;
12use crate::Params;
13use crate::StrVariant;
14use crate::SupportedFunctionReturnValue;
15use crate::component_id::ComponentDigest;
16use crate::prefixed_ulid::DelayId;
17use crate::prefixed_ulid::DeploymentId;
18use crate::prefixed_ulid::ExecutionIdDerived;
19use crate::prefixed_ulid::ExecutorId;
20use crate::prefixed_ulid::RunId;
21use assert_matches::assert_matches;
22use async_trait::async_trait;
23use chrono::TimeDelta;
24use chrono::{DateTime, Utc};
25use http_client_trace::HttpClientTrace;
26use serde::Deserialize;
27use serde::Serialize;
28use std::fmt::Debug;
29use std::fmt::Display;
30use std::panic::Location;
31use std::pin::Pin;
32use std::sync::Arc;
33use std::time::Duration;
34use tracing::debug;
35use tracing::instrument;
36use tracing_error::SpanTrace;
37
// Shared between databases. TODO: Extract to db-common
// NOTE(review): the `STATE_*` strings presumably name the persisted pending states
// (cf. the `PendingState` variants referenced in this file) — confirm against the
// database implementations before changing any value.
pub const STATE_PENDING_AT: &str = "pending_at";
pub const STATE_BLOCKED_BY_JOIN_SET: &str = "blocked_by_join_set";
pub const STATE_LOCKED: &str = "locked";
pub const STATE_FINISHED: &str = "finished";
// Must match what serde emits for `HistoryEvent::JoinNext`: that enum is tagged with
// `rename_all = "snake_case"`, so the variant name becomes "join_next".
pub const HISTORY_EVENT_TYPE_JOIN_NEXT: &str = "join_next"; // Serialization tag of `HistoryEvent::JoinNext`
44
/// An execution together with its event log, the responses it received and its current
/// derived state.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct ExecutionLog {
    pub execution_id: ExecutionId,
    /// Events of this execution. The accessors `ffqn`, `params`, `parent` and
    /// `get_create_request` assert that the first one is [`ExecutionRequest::Created`].
    pub events: Vec<ExecutionEvent>,
    /// Join set responses, each paired with its cursor.
    pub responses: Vec<ResponseWithCursor>,
    pub next_version: Version, // Is not advanced once in Finished state
    pub pending_state: PendingState, // reflecting the current state
    pub component_digest: ComponentDigest, // reflecting the current state
    pub component_type: ComponentType,
    pub deployment_id: DeploymentId, // reflecting the current state
}
56
57impl ExecutionLog {
58    /// Return some duration after which the execution will be retried.
59    /// Return `None` if no more retries are allowed.
60    #[must_use]
61    pub fn can_be_retried_after(
62        temporary_event_count: u32,
63        max_retries: Option<u32>,
64        retry_exp_backoff: Duration,
65    ) -> Option<Duration> {
66        // If max_retries == None, wrapping is OK after this succeeds - we want to retry forever.
67        if temporary_event_count <= max_retries.unwrap_or(u32::MAX) {
68            // TODO: Add test for number of retries
69            let duration = retry_exp_backoff * 2_u32.saturating_pow(temporary_event_count - 1);
70            Some(duration)
71        } else {
72            None
73        }
74    }
75
76    #[must_use]
77    pub fn compute_retry_duration_when_retrying_forever(
78        temporary_event_count: u32,
79        retry_exp_backoff: Duration,
80    ) -> Duration {
81        Self::can_be_retried_after(temporary_event_count, None, retry_exp_backoff)
82            .expect("`max_retries` set to MAX must never return None")
83    }
84
85    #[must_use]
86    pub fn get_create_request(&self) -> CreateRequest {
87        assert_matches!(self.events.first().cloned(), Some(ExecutionEvent {
88            event:ExecutionRequest::Created{
89                ffqn,params,parent,scheduled_at,component_id,deployment_id,metadata,scheduled_by},
90                created_at, .. }) => CreateRequest { created_at, execution_id:
91                    self.execution_id.clone(), ffqn, params, parent, scheduled_at,
92                    component_id, deployment_id, metadata, scheduled_by, paused: false })
93    }
94
95    #[must_use]
96    pub fn ffqn(&self) -> &FunctionFqn {
97        assert_matches!(self.events.first(), Some(ExecutionEvent {
98            event: ExecutionRequest::Created { ffqn, .. },
99            ..
100        }) => ffqn)
101    }
102
103    #[must_use]
104    pub fn params(&self) -> &Params {
105        assert_matches!(self.events.first(), Some(ExecutionEvent {
106            event: ExecutionRequest::Created { params, .. },
107            ..
108        }) => params)
109    }
110
111    #[must_use]
112    pub fn parent(&self) -> Option<(ExecutionId, JoinSetId)> {
113        assert_matches!(self.events.first(), Some(ExecutionEvent {
114            event: ExecutionRequest::Created { parent, .. },
115            ..
116        }) => parent.clone())
117    }
118
119    #[must_use]
120    pub fn last_event(&self) -> &ExecutionEvent {
121        self.events.last().expect("must contain at least one event")
122    }
123
124    #[must_use]
125    pub fn is_finished(&self) -> bool {
126        matches!(
127            self.events.last(),
128            Some(ExecutionEvent {
129                event: ExecutionRequest::Finished { .. },
130                ..
131            })
132        )
133    }
134
135    #[must_use]
136    pub fn as_finished_result(&self) -> Option<SupportedFunctionReturnValue> {
137        if let ExecutionEvent {
138            event: ExecutionRequest::Finished { retval: result, .. },
139            ..
140        } = self.events.last().expect("must contain at least one event")
141        {
142            Some(result.clone())
143        } else {
144            None
145        }
146    }
147
148    pub fn event_history(&self) -> impl Iterator<Item = (HistoryEvent, Version)> + '_ {
149        self.events.iter().filter_map(|event| {
150            if let ExecutionRequest::HistoryEvent { event: eh, .. } = &event.event {
151                Some((eh.clone(), event.version.clone()))
152            } else {
153                None
154            }
155        })
156    }
157
158    #[cfg(feature = "test")]
159    #[must_use]
160    pub fn find_join_set_request(&self, join_set_id: &JoinSetId) -> Option<&JoinSetRequest> {
161        self.events
162            .iter()
163            .find_map(move |event| match &event.event {
164                ExecutionRequest::HistoryEvent {
165                    event:
166                        HistoryEvent::JoinSetRequest {
167                            join_set_id: found,
168                            request,
169                        },
170                    ..
171                } if *join_set_id == *found => Some(request),
172                _ => None,
173            })
174    }
175}
176
// Integer type backing `Version`.
pub type VersionType = u32;
// Version number attached to events of an execution log (see `ExecutionEvent::version`).
// `transparent` makes it (de)serialize, and appear in the JSON schema, as the bare integer.
// Plain `//` comments are used here because schemars copies doc comments into the schema.
#[derive(
    Debug,
    Default,
    Clone,
    PartialEq,
    PartialOrd,
    Ord,
    Eq,
    Hash,
    derive_more::Display,
    derive_more::Into,
    serde::Serialize,
    serde::Deserialize,
    schemars::JsonSchema,
)]
#[serde(transparent)]
#[schemars(transparent)]
pub struct Version(pub VersionType);
196impl Version {
197    #[must_use]
198    pub fn new(arg: VersionType) -> Version {
199        Version(arg)
200    }
201
202    #[must_use]
203    pub fn increment(&self) -> Version {
204        Version(self.0 + 1)
205    }
206}
207impl TryFrom<i64> for Version {
208    type Error = VersionParseError;
209    fn try_from(value: i64) -> Result<Self, Self::Error> {
210        VersionType::try_from(value)
211            .map(Version::new)
212            .map_err(|_| VersionParseError)
213    }
214}
215impl From<Version> for usize {
216    fn from(value: Version) -> Self {
217        usize::try_from(value.0).expect("16 bit systems are unsupported")
218    }
219}
220impl From<&Version> for usize {
221    fn from(value: &Version) -> Self {
222        usize::try_from(value.0).expect("16 bit systems are unsupported")
223    }
224}
225
/// Returned by `Version::try_from(i64)` when the value does not fit into [`VersionType`].
#[derive(Debug, thiserror::Error)]
#[error("version must be u32")]
pub struct VersionParseError;
229
// One entry of an execution log: the request itself plus its creation time and version.
// `Display` prints only the wrapped `event`.
#[derive(
    Clone,
    Debug,
    derive_more::Display,
    PartialEq,
    Eq,
    serde::Serialize,
    serde::Deserialize,
    schemars::JsonSchema,
)]
#[display("{event}")]
pub struct ExecutionEvent {
    pub created_at: DateTime<Utc>,
    pub event: ExecutionRequest,
    // Left out of the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub backtrace_id: Option<Version>,
    pub version: Version,
}
248
// Cursor attached to a join set response (see `ResponseWithCursor`): a `u32` newtype.
// Serialize-only: there is no `Deserialize` derive.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    derive_more::Display,
    derive_more::Into,
    Serialize, /* webapi */
    schemars::JsonSchema,
)]
pub struct ResponseCursor(pub u32);
261
// A join set response paired with its cursor. Serialize-only.
#[derive(Debug, Clone, PartialEq, Eq, Serialize /* webapi */, schemars::JsonSchema)]
pub struct ResponseWithCursor {
    pub event: JoinSetResponseEventOuter,
    pub cursor: ResponseCursor,
}
267
/// A list of execution events together with a `max_version`.
// NOTE(review): `max_version` is presumably the highest version that exists for the
// execution, independent of how many events are returned — confirm with the producer.
#[derive(Debug)]
pub struct ListExecutionEventsResponse {
    pub events: Vec<ExecutionEvent>,
    pub max_version: Version,
}
273
/// A list of join set responses together with a `max_cursor`.
// NOTE(review): `max_cursor` is presumably the highest cursor that exists for the
// execution, independent of how many responses are returned — confirm with the producer.
#[derive(Debug)]
pub struct ListResponsesResponse {
    pub responses: Vec<ResponseWithCursor>,
    pub max_cursor: ResponseCursor,
}
279
// A join set response event together with its creation time. Serialize-only.
#[derive(Debug, Clone, PartialEq, Eq, Serialize /* webapi */, schemars::JsonSchema)]
pub struct JoinSetResponseEventOuter {
    pub created_at: DateTime<Utc>,
    pub event: JoinSetResponseEvent,
}
285
// A response together with the join set it belongs to.
#[derive(
    Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema,
)]
pub struct JoinSetResponseEvent {
    pub join_set_id: JoinSetId,
    pub event: JoinSetResponse,
}
293
// Response delivered to a join set. Serialized internally tagged: the variant name goes
// into a `type` field in snake_case (`delay_finished`, `child_execution_finished`).
#[derive(
    Clone, Debug, PartialEq, Eq, Serialize, Deserialize, derive_more::Display, schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum JoinSetResponse {
    // Displayed as "delay finished: <id>" when `result` is `Ok`, "delay cancelled: <id>" otherwise.
    #[display("delay {}: {delay_id}", if result.is_ok() { "finished" } else { "cancelled"})]
    DelayFinished {
        delay_id: DelayId,
        result: Result<(), ()>,
    },
    #[display("{result}: {child_execution_id}")] // execution completed..
    ChildExecutionFinished {
        child_execution_id: ExecutionIdDerived,
        // The `arbitrary` overrides below pin fixed values in generated test data.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Version(2)))]
        finished_version: Version,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
        result: SupportedFunctionReturnValue,
    },
}
314
/// Placeholder [`ExecutionRequest::Created`] built from empty/zero values: empty function
/// name, empty params, timestamp 0, no parent and no scheduler.
// NOTE(review): presumably only needed where a `Created` value is required but its content
// is irrelevant (e.g. to obtain `ExecutionRequest::variant`) — confirm at the call sites.
pub const DUMMY_CREATED: ExecutionRequest = ExecutionRequest::Created {
    ffqn: FunctionFqn::new_static("", ""),
    params: Params::empty(),
    parent: None,
    scheduled_at: DateTime::from_timestamp_nanos(0),
    component_id: ComponentId::dummy_activity(),
    deployment_id: DeploymentId::from_parts(0, 0),
    metadata: ExecutionMetadata::empty(),
    scheduled_by: None,
};
/// Placeholder [`ExecutionRequest::HistoryEvent`] wrapping a `JoinSetCreate` for a one-off
/// join set with an empty name.
// NOTE(review): presumably only needed where a `HistoryEvent` value is required but its
// content is irrelevant — confirm at the call sites.
pub const DUMMY_HISTORY_EVENT: ExecutionRequest = ExecutionRequest::HistoryEvent {
    event: HistoryEvent::JoinSetCreate {
        join_set_id: JoinSetId {
            kind: crate::JoinSetKind::OneOff,
            name: StrVariant::empty(),
        },
    },
};
333
// Payload of an `ExecutionEvent`. Variant names are serialized in snake_case, the same
// strings that `ExecutionRequest::variant` returns.
// New notes in this enum are plain `//` comments because schemars copies doc comments into
// the generated JSON schema.
#[derive(
    Clone,
    derive_more::Debug,
    derive_more::Display,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(rename_all = "snake_case")]
pub enum ExecutionRequest {
    // First event of every log; `ExecutionLog::ffqn`, `params`, `parent` and
    // `get_create_request` read their data from it.
    #[display("Created({ffqn}, `{scheduled_at}`)")]
    Created {
        ffqn: FunctionFqn,
        // `params` is left out of the `Debug` output.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
        #[debug(skip)]
        params: Params,
        parent: Option<(ExecutionId, JoinSetId)>,
        scheduled_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
        component_id: ComponentId,
        deployment_id: DeploymentId,
        #[cfg_attr(any(test, feature = "test"), arbitrary(default))]
        metadata: ExecutionMetadata,
        scheduled_by: Option<ExecutionId>,
    },
    Locked(Locked),
    /// Returns execution to [`PendingState::PendingAt`] state at the specified time.
    /// This can happen when:
    /// - executor is running out of resources like [`WorkerError::LimitReached`]
    /// - workflow made progress but then its lock expired.
    /// - executor is being closed (shutdown or hot redeploy requested)
    /// - activity is paused
    #[display("Unlocked(`{backoff_expires_at}`)")]
    Unlocked {
        backoff_expires_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
        reason: StrVariant,
    },
    // Created by the executor holding the lock.
    // After expiry interpreted as pending.
    // Counted as a temporary event by `is_temporary_event`.
    #[display("TemporarilyFailed(`{backoff_expires_at}`)")]
    TemporarilyFailed {
        backoff_expires_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
        reason: StrVariant,
        detail: Option<String>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },
    // Created by the executor holding the lock.
    // After expiry interpreted as pending.
    // Counted as a temporary event by `is_temporary_event`.
    #[display("TemporarilyTimedOut(`{backoff_expires_at}`)")]
    TemporarilyTimedOut {
        backoff_expires_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },
    // Created by the executor holding the lock.
    #[display("Finished: {retval}")]
    Finished {
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
        retval: SupportedFunctionReturnValue,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },

    // Wraps a `HistoryEvent`; these are the events yielded by `ExecutionLog::event_history`.
    #[display("HistoryEvent({event})")]
    HistoryEvent {
        event: HistoryEvent,
    },
    #[display("Paused")]
    Paused,
    #[display("Unpaused")]
    Unpaused,
}
412
413impl ExecutionRequest {
414    #[must_use]
415    pub fn is_temporary_event(&self) -> bool {
416        matches!(
417            self,
418            Self::TemporarilyFailed { .. } | Self::TemporarilyTimedOut { .. }
419        )
420    }
421
422    /// String representation of `ExecutionRequest`, used in execution log table to fetch events of certain type, e.g. `created` + `history_event`.
423    #[must_use]
424    pub const fn variant(&self) -> &'static str {
425        match self {
426            ExecutionRequest::Created { .. } => "created",
427            ExecutionRequest::Locked(_) => "locked",
428            ExecutionRequest::Unlocked { .. } => "unlocked",
429            ExecutionRequest::TemporarilyFailed { .. } => "temporarily_failed",
430            ExecutionRequest::TemporarilyTimedOut { .. } => "temporarily_timed_out",
431            ExecutionRequest::Finished { .. } => "finished",
432            ExecutionRequest::HistoryEvent { .. } => "history_event",
433            ExecutionRequest::Paused => "paused",
434            ExecutionRequest::Unpaused => "unpaused",
435        }
436    }
437
438    #[must_use]
439    pub fn join_set_id(&self) -> Option<&JoinSetId> {
440        match self {
441            Self::Created {
442                parent: Some((_parent_id, join_set_id)),
443                ..
444            } => Some(join_set_id),
445            Self::HistoryEvent {
446                event:
447                    HistoryEvent::JoinSetCreate { join_set_id, .. }
448                    | HistoryEvent::JoinSetRequest { join_set_id, .. }
449                    | HistoryEvent::JoinNext { join_set_id, .. },
450            } => Some(join_set_id),
451            _ => None,
452        }
453    }
454}
455
// Payload of `ExecutionRequest::Locked`: which component, executor, deployment and run hold
// the lock, when the lock expires and the retry configuration.
// `Display` prints only `lock_expires_at` and `component_id`.
#[derive(
    Clone,
    derive_more::Debug,
    derive_more::Display,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[display("Locked(`{lock_expires_at}`, {component_id})")]
pub struct Locked {
    #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
    pub component_id: ComponentId,
    pub executor_id: ExecutorId,
    pub deployment_id: DeploymentId,
    pub run_id: RunId,
    pub lock_expires_at: DateTime<Utc>,
    #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentRetryConfig::ZERO))]
    pub retry_config: ComponentRetryConfig,
}
478
// Kind of value recorded by `HistoryEvent::Persist`. Serialized internally tagged: the
// variant name goes into a `type` field in snake_case.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    derive_more::Display,
    Serialize,
    Deserialize,
    schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum PersistKind {
    // Field names spell out the bounds: `max_inclusive` is part of the range.
    #[display("RandomU64({min}, {max_inclusive})")]
    RandomU64 {
        min: u64,
        max_inclusive: u64,
    },
    // Field names spell out the bounds: `max_length_exclusive` is not part of the range.
    #[display("RandomString({min_length}, {max_length_exclusive})")]
    RandomString {
        min_length: u64,
        max_length_exclusive: u64,
    },
    ExecutionId,
}
505
/// Encode `value` as 8 bytes in big-endian order (most significant byte first).
#[must_use]
pub fn from_u64_to_bytes(value: u64) -> [u8; 8] {
    u64::to_be_bytes(value)
}
510
// Event carried by `ExecutionRequest::HistoryEvent`. Serialized internally tagged: the
// variant name goes into a `type` field in snake_case; `HISTORY_EVENT_TYPE_JOIN_NEXT` holds
// the resulting tag of `JoinNext`.
// New notes in this enum are plain `//` comments because schemars copies doc comments into
// the generated JSON schema.
#[derive(
    derive_more::Debug,
    Clone,
    PartialEq,
    Eq,
    derive_more::Display,
    Serialize,
    Deserialize,
    schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type", rename_all = "snake_case")]
/// Must be created by the executor in [`PendingState::Locked`].
pub enum HistoryEvent {
    /// Persist a generated pseudorandom value.
    #[display("Persist")]
    Persist {
        // `value` is left out of the `Debug` output.
        #[debug(skip)]
        value: Vec<u8>, // Only stored for nondeterminism checks. TODO: Consider using a hashed value or just the intention.
        kind: PersistKind,
    },
    #[display("JoinSetCreate({join_set_id})")]
    JoinSetCreate { join_set_id: JoinSetId },
    #[display("JoinSetRequest({request})")]
    // join_set_id is part of ExecutionId or DelayId in the `request`
    JoinSetRequest {
        join_set_id: JoinSetId,
        request: JoinSetRequest,
    },
    /// Sets the pending state to [`PendingState::BlockedByJoinSet`].
    /// When the response arrives at `resp_time`:
    /// The execution is [`PendingState::PendingAt`]`(max(resp_time, lock_expires_at)`, so that the
    /// original executor can continue. After the expiry any executor can continue without
    /// marking the execution as timed out.
    #[display("JoinNext({join_set_id})")]
    JoinNext {
        join_set_id: JoinSetId,
        /// Set to a future time if the worker is keeping the execution invocation warm waiting for the result.
        /// The pending status will be kept in Locked state until `run_expires_at`.
        run_expires_at: DateTime<Utc>,
        /// Set to a specific function when calling `-await-next` extension function, used for
        /// determinism checks.
        requested_ffqn: Option<FunctionFqn>,
        /// Closing request must never set `requested_ffqn` and is ignored by determinism checks.
        closing: bool,
    },
    /// Attempt to process next response without changing the pending state.
    #[display("JoinNextTry({join_set_id}, {outcome})")]
    JoinNextTry {
        join_set_id: JoinSetId,
        outcome: JoinNextTryOutcome,
    },
    /// Records the fact that a join set was awaited more times than its submission count.
    #[display("JoinNextTooMany({join_set_id})")]
    JoinNextTooMany {
        join_set_id: JoinSetId,
        /// Set to a specific function when calling `-await-next` extension function, used for
        /// determinism checks.
        requested_ffqn: Option<FunctionFqn>,
    },
    // `result` records the outcome of the scheduling request (see `ScheduleRequestError`).
    #[display("Schedule({execution_id}, {schedule_at})")]
    Schedule {
        execution_id: ExecutionId,
        schedule_at: HistoryEventScheduleAt, // Stores intention to schedule an execution at a date/offset
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Ok(())))]
        result: Result<(), ScheduleRequestError>,
    },
    // Only the hash of the stubbed return value is stored (see `StubRetVal::hash`).
    #[display("Stub({target_execution_id})")]
    Stub {
        target_execution_id: ExecutionIdDerived,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StubRetVal::Typed(crate::SUPPORTED_RETURN_VALUE_OK_EMPTY).hash()))]
        retval_hash: StubRetValHash,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Ok(())))]
        result: Result<(), StubError>,
    },
}
587
/// Stub return value - only used during processing, not stored in history.
///
/// History stores the [`StubRetValHash`] produced by [`StubRetVal::hash`] instead.
/// `Serialize`/`Deserialize` are derived only for tests.
#[derive(derive_more::Debug, Clone, PartialEq, Eq)]
#[cfg_attr(any(test, feature = "test"), derive(Serialize, Deserialize))]
#[cfg_attr(any(test, feature = "test"), serde(rename_all = "snake_case"))]
pub enum StubRetVal {
    /// A typed return value; hashed through its JSON serialization.
    Typed(SupportedFunctionReturnValue),
    /// A plain string; its bytes are hashed as they are.
    Untyped(String),
}
596
597impl StubRetVal {
598    /// Compute a stable hash of the return value for determinism checks.
599    #[must_use]
600    pub fn hash(&self) -> StubRetValHash {
601        use sha2::{Digest as _, Sha256};
602        const STUB_RETVAL_HASH_VERSION: u8 = 1;
603        let mut hasher = Sha256::default();
604
605        match self {
606            StubRetVal::Typed(val) => {
607                hasher.update(b"T|");
608                // Serialize to JSON for stable hashing
609                let json = serde_json::to_string(val)
610                    .expect("SupportedFunctionReturnValue is always serializable");
611                hasher.update(json.as_bytes());
612            }
613            StubRetVal::Untyped(s) => {
614                hasher.update(b"U|");
615                hasher.update(s.as_bytes());
616            }
617        }
618
619        let hash_bytes = hasher.finalize();
620        let mut result = [0u8; 33];
621        result[0] = STUB_RETVAL_HASH_VERSION;
622        result[1..].copy_from_slice(&hash_bytes);
623
624        StubRetValHash(result)
625    }
626}
627
/// Hash of a stub return value, stored in history for determinism checks.
/// Format: 1 byte version + 32 bytes SHA-256 hash.
// (De)serialized through its `Display` / `FromStr` impls, i.e. as a 66 character hex string;
// the JSON schema is declared as `String` accordingly.
#[derive(
    Clone,
    PartialEq,
    Eq,
    serde_with::SerializeDisplay,
    serde_with::DeserializeFromStr,
    schemars::JsonSchema,
)]
#[schemars(with = "String")]
pub struct StubRetValHash([u8; 33]);
640
641impl Display for StubRetValHash {
642    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
643        for b in self.0 {
644            write!(f, "{b:02x}")?;
645        }
646        Ok(())
647    }
648}
649
650impl Debug for StubRetValHash {
651    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
652        Display::fmt(self, f)
653    }
654}
655
656impl std::str::FromStr for StubRetValHash {
657    type Err = StubRetValHashParseError;
658
659    fn from_str(s: &str) -> Result<Self, Self::Err> {
660        if s.len() != 66 {
661            // 33 bytes * 2 hex chars = 66
662            return Err(StubRetValHashParseError::InvalidLength(s.len()));
663        }
664        let mut bytes = [0u8; 33];
665        for i in 0..33 {
666            let chunk = &s[i * 2..i * 2 + 2];
667            bytes[i] =
668                u8::from_str_radix(chunk, 16).map_err(|_| StubRetValHashParseError::InvalidHex)?;
669        }
670        Ok(StubRetValHash(bytes))
671    }
672}
673
/// Error returned when parsing a [`StubRetValHash`] from its hex string form.
#[derive(Debug, thiserror::Error)]
pub enum StubRetValHashParseError {
    /// The input is not 66 bytes long; carries the actual length.
    #[error("invalid length: expected 66 hex chars, got {0}")]
    InvalidLength(usize),
    /// Part of the input could not be parsed as hexadecimal.
    #[error("invalid hex character")]
    InvalidHex,
}
681
/// Error from the `-stub` extension function.
/// Mirrors `obelisk:types/execution.{stub-error}` from WIT.
// Variant names are serialized in snake_case. Stored in history as the `Err` side of
// `HistoryEvent::Stub::result`.
#[derive(
    Debug, Clone, thiserror::Error, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema,
)]
#[serde(rename_all = "snake_case")]
pub enum StubError {
    #[error("execution not found")]
    ExecutionNotFound,
    // Carries the description of the type check failure.
    #[error("type check error: {0}")]
    TypeCheckError(String),
    #[error("conflict")]
    Conflict,
}
696
/// Error from the `schedule-json` function. Persisted in history for determinism.
// Variant names are serialized in snake_case. Stored as the `Err` side of
// `HistoryEvent::Schedule::result`.
#[derive(
    Debug, Clone, thiserror::Error, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema,
)]
#[serde(rename_all = "snake_case")]
pub enum ScheduleRequestError {
    #[error("function not found")]
    FunctionNotFound,
    // Note that the message says "params parsing error" although the variant is called
    // `TypeCheckError`.
    #[error("params parsing error: {0}")]
    TypeCheckError(String),
}
708
/// Error from the `submit-json` function. Persisted in history for determinism.
// Variant names are serialized in snake_case. Stored as the `Err` side of
// `JoinSetRequest::ChildExecutionRequest::result`.
#[derive(
    Debug, Clone, thiserror::Error, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema,
)]
#[serde(rename_all = "snake_case")]
pub enum ChildExecutionRequestError {
    #[error("function not found")]
    FunctionNotFound,
    // Note that the message says "params parsing error" although the variant is called
    // `TypeCheckError`.
    #[error("params parsing error: {0}")]
    TypeCheckError(String),
}
720
// Outcome stored in `HistoryEvent::JoinNextTry`. The serde names (snake_case) and the
// `Display` strings are the same: `found`, `pending`, `all_processed`.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    derive_more::Display,
    Serialize,
    Deserialize,
    schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(rename_all = "snake_case")]
pub enum JoinNextTryOutcome {
    /// A response was found and processed.
    #[display("found")]
    Found,
    /// No response available, but there are still pending requests.
    #[display("pending")]
    Pending,
    /// No response available, and all requests have been processed.
    #[display("all_processed")]
    AllProcessed,
}
745
746impl From<bool> for JoinNextTryOutcome {
747    /// Migration helper: converts old `found_response: bool` to the new enum.
748    /// `false` maps to `Pending` as a conservative default (the exact error
749    /// was not stored before).
750    fn from(found_response: bool) -> Self {
751        if found_response {
752            JoinNextTryOutcome::Found
753        } else {
754            JoinNextTryOutcome::Pending
755        }
756    }
757}
758
// When an execution should be scheduled: immediately, at an absolute time, or after an
// offset. `as_date_time` resolves all three against a caller supplied `now`.
// Variant names are serialized in snake_case.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    derive_more::Display,
    Serialize,
    Deserialize,
    schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(rename_all = "snake_case")]
pub enum HistoryEventScheduleAt {
    Now,
    #[display("At(`{_0}`)")]
    At(DateTime<Utc>),
    // Displayed using the `Debug` form of the duration.
    #[display("In({_0:?})")]
    In(Duration),
}
779
/// Error of [`HistoryEventScheduleAt::as_date_time`].
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum ScheduleAtConversionError {
    /// The duration cannot be converted to a `TimeDelta`, or adding it to `now` overflows.
    #[error("source duration value is out of range")]
    OutOfRangeError,
}
785
786impl HistoryEventScheduleAt {
787    pub fn as_date_time(
788        &self,
789        now: DateTime<Utc>,
790    ) -> Result<DateTime<Utc>, ScheduleAtConversionError> {
791        match self {
792            Self::Now => Ok(now),
793            Self::At(date_time) => Ok(*date_time),
794            Self::In(duration) => {
795                let time_delta = TimeDelta::from_std(*duration)
796                    .map_err(|_| ScheduleAtConversionError::OutOfRangeError)?;
797                now.checked_add_signed(time_delta)
798                    .ok_or(ScheduleAtConversionError::OutOfRangeError)
799            }
800        }
801    }
802}
803
// Request submitted to a join set, stored in `HistoryEvent::JoinSetRequest`.
// Serialized internally tagged: the variant name goes into a `type` field in snake_case.
#[derive(
    Clone, Debug, PartialEq, Eq, derive_more::Display, Serialize, Deserialize, schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum JoinSetRequest {
    // Must be created by the executor in `PendingState::Locked`.
    #[display("DelayRequest({delay_id}, expires_at: `{expires_at}`, schedule_at: `{schedule_at}`)")]
    DelayRequest {
        delay_id: DelayId,
        expires_at: DateTime<Utc>,
        schedule_at: HistoryEventScheduleAt,
        // Falls back to `false` when the field is missing from the serialized data.
        #[serde(default)]
        paused: bool,
    },
    // Must be created by the executor in `PendingState::Locked`.
    #[display("ChildExecutionRequest({child_execution_id}, {target_ffqn}, params: {params})")]
    ChildExecutionRequest {
        child_execution_id: ExecutionIdDerived,
        target_ffqn: FunctionFqn,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
        params: Params,
        // Outcome of the submission itself (see `ChildExecutionRequestError`).
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Ok(())))]
        result: Result<(), ChildExecutionRequestError>,
    },
}
830
/// Error that is not specific to an execution.
///
/// `context` and `source` of `Uncategorized` are marked as skipped for the derived
/// `PartialEq`/`Eq`.
#[derive(Debug, Clone, thiserror::Error, derive_more::PartialEq, derive_more::Eq)]
pub enum DbErrorGeneric {
    #[error("database error: {reason}")]
    Uncategorized {
        /// Human readable description; the only field shown in the error message.
        reason: StrVariant,
        /// Span trace attached to the error.
        #[eq(skip)]
        #[partial_eq(skip)]
        context: SpanTrace,
        /// Underlying error, if any; reported as the error source.
        #[eq(skip)]
        #[partial_eq(skip)]
        #[source]
        source: Option<Arc<dyn std::error::Error + Send + Sync>>,
        /// Source code location attached to the error.
        loc: &'static Location<'static>,
    },
    #[error("database was closed")]
    Close,
}
849
/// Write errors wrapped by [`DbErrorWrite::NonRetriable`].
///
/// `context` and `source` of `IllegalState` are marked as skipped for the derived
/// `PartialEq`/`Eq`.
#[derive(thiserror::Error, Clone, Debug, derive_more::PartialEq, derive_more::Eq)]
pub enum DbErrorWriteNonRetriable {
    #[error("validation failed: {0}")]
    ValidationFailed(StrVariant),
    #[error("conflict")]
    Conflict,
    #[error("already finished")]
    AlreadyFinished,
    #[error("illegal state: {reason}")]
    IllegalState {
        /// Human readable description; the only field shown in the error message.
        reason: StrVariant,
        /// Span trace attached to the error.
        #[eq(skip)]
        #[partial_eq(skip)]
        context: SpanTrace,
        /// Underlying error, if any; reported as the error source.
        #[eq(skip)]
        #[partial_eq(skip)]
        #[source]
        source: Option<Arc<dyn std::error::Error + Send + Sync>>,
        /// Source code location attached to the error.
        loc: &'static Location<'static>,
    },
    /// The version supplied with the write differs from the expected one.
    #[error("version conflict: expected: {expected}, got: {requested}")]
    VersionConflict {
        expected: Version,
        requested: Version,
    },
}
876
/// Write error tied to an execution
///
/// Both wrapped error types convert into this one via `From`, so `?` can be used on them.
#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
pub enum DbErrorWrite {
    #[error("cannot write - row not found")]
    NotFound,
    #[error("non-retriable error: {0}")]
    NonRetriable(#[from] DbErrorWriteNonRetriable),
    /// Displayed exactly like the wrapped [`DbErrorGeneric`].
    #[error(transparent)]
    Generic(#[from] DbErrorGeneric),
}
887
/// Error from idempotent stub response write.
#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
pub enum DbErrorStubResponse {
    #[error("stub conflict: already finished with a different value")]
    StubConflict,
    /// Any other write error; displayed exactly like the wrapped [`DbErrorWrite`].
    #[error(transparent)]
    Write(#[from] DbErrorWrite),
}
896
/// Read error tied to an execution
#[derive(Debug, Clone, thiserror::Error, PartialEq)]
pub enum DbErrorRead {
    #[error("cannot read - row not found")]
    NotFound,
    /// Displayed exactly like the wrapped [`DbErrorGeneric`].
    #[error(transparent)]
    Generic(#[from] DbErrorGeneric),
}
905
/// Read error for operations that can also time out.
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum DbErrorReadWithTimeout {
    /// The operation timed out; the message is just "timeout", the wrapped
    /// `TimeoutOutcome` is not part of it.
    #[error("timeout")]
    Timeout(TimeoutOutcome),
    /// Displayed exactly like the wrapped [`DbErrorRead`].
    #[error(transparent)]
    DbErrorRead(#[from] DbErrorRead),
}
913
// Represents the next version after an event was successfully appended to the execution log.
// TODO: Convert to struct with next_version
pub type AppendResponse = Version;
// NOTE(review): tuple of execution id, version, params and an optional timestamp; the
// meaning of the timestamp is not visible here — confirm with the producer of this tuple.
pub type PendingExecution = (ExecutionId, Version, Params, Option<DateTime<Utc>>);
918
919#[derive(Debug, Clone)]
920pub struct LockedExecution {
921    pub execution_id: ExecutionId,
922    pub next_version: Version,
923    pub metadata: ExecutionMetadata,
924    pub locked_event: Locked,
925    pub ffqn: FunctionFqn,
926    pub params: Params,
927    pub event_history: Vec<(HistoryEvent, Version)>,
928    pub responses: Vec<ResponseWithCursor>,
929    pub parent: Option<(ExecutionId, JoinSetId)>,
930    pub intermittent_event_count: u32,
931}
932
933pub type LockPendingResponse = Vec<LockedExecution>;
934pub type AppendBatchResponse = Version;
935
936#[derive(Debug, Clone, PartialEq, derive_more::Display, Serialize, Deserialize)]
937#[display("{event}")]
938pub struct AppendRequest {
939    pub created_at: DateTime<Utc>,
940    pub event: ExecutionRequest,
941}
942
943#[derive(Debug, Clone, PartialEq)]
944#[cfg_attr(feature = "test", derive(Serialize))]
945pub struct CreateRequest {
946    pub created_at: DateTime<Utc>,
947    pub execution_id: ExecutionId,
948    pub ffqn: FunctionFqn,
949    pub params: Params,
950    pub parent: Option<(ExecutionId, JoinSetId)>,
951    pub scheduled_at: DateTime<Utc>,
952    pub component_id: ComponentId,
953    pub deployment_id: DeploymentId,
954    pub metadata: ExecutionMetadata,
955    pub scheduled_by: Option<ExecutionId>,
956    pub paused: bool,
957}
958
959impl From<CreateRequest> for ExecutionRequest {
960    fn from(value: CreateRequest) -> Self {
961        Self::Created {
962            ffqn: value.ffqn,
963            params: value.params,
964            parent: value.parent,
965            scheduled_at: value.scheduled_at,
966            component_id: value.component_id,
967            deployment_id: value.deployment_id,
968            metadata: value.metadata,
969            scheduled_by: value.scheduled_by,
970        }
971    }
972}
973
974#[async_trait]
975pub trait DbPool: Send + Sync {
976    async fn db_exec_conn(&self) -> Result<Box<dyn DbExecutor>, DbErrorGeneric>;
977
978    async fn connection(&self) -> Result<Box<dyn DbConnection>, DbErrorGeneric>;
979
980    async fn external_api_conn(&self) -> Result<Box<dyn DbExternalApi>, DbErrorGeneric>;
981
982    #[cfg(feature = "test")]
983    async fn connection_test(&self) -> Result<Box<dyn DbConnectionTest>, DbErrorGeneric>;
984}
985
986#[async_trait]
987pub trait DbPoolCloseable {
988    async fn close(&self);
989}
990
991#[derive(Clone, Debug, PartialEq)]
992#[cfg_attr(feature = "test", derive(Serialize))]
993pub struct AppendEventsToExecution {
994    pub execution_id: ExecutionId,
995    pub version: Version,
996    pub batch: Vec<AppendRequest>,
997}
998
999#[derive(Clone, Debug, PartialEq)]
1000#[cfg_attr(feature = "test", derive(Serialize))]
1001pub struct AppendResponseToExecution {
1002    pub parent_execution_id: ExecutionId,
1003    pub created_at: DateTime<Utc>,
1004    pub join_set_id: JoinSetId,
1005    pub child_execution_id: ExecutionIdDerived,
1006    pub finished_version: Version,
1007    pub result: SupportedFunctionReturnValue,
1008}
1009
1010/// A captured database write operation with all arguments needed to replay it
1011/// against the real database.
1012/// Dates carry meaning only on a fresh replay, ignoring user's input when persisting.
1013#[derive(Debug, Clone, PartialEq)]
1014#[cfg_attr(feature = "test", derive(Serialize))]
1015pub enum CapturedDbWrite {
1016    Append {
1017        execution_id: ExecutionId,
1018        version: Version,
1019        req: AppendRequest,
1020        backtraces: Vec<BacktraceInfo>,
1021    },
1022    AppendBatch {
1023        current_time: DateTime<Utc>,
1024        batch: Vec<AppendRequest>,
1025        execution_id: ExecutionId,
1026        version: Version,
1027        backtraces: Vec<BacktraceInfo>,
1028    },
1029    AppendBatchCreateNewExecution {
1030        current_time: DateTime<Utc>,
1031        batch: Vec<AppendRequest>,
1032        execution_id: ExecutionId,
1033        version: Version,
1034        child_req: Vec<CreateRequest>,
1035        backtraces: Vec<BacktraceInfo>,
1036    },
1037    AppendStubResponse {
1038        events: AppendEventsToExecution,
1039        response: AppendResponseToExecution,
1040        current_time: DateTime<Utc>,
1041        backtraces: Vec<BacktraceInfo>,
1042    },
1043    AppendFinished {
1044        // TODO: Extract struct AppendFinished
1045        execution_id: ExecutionId,
1046        version: Version,
1047        current_time: DateTime<Utc>,
1048        retval: SupportedFunctionReturnValue,
1049        parent: Option<(ExecutionId, JoinSetId)>,
1050    },
1051}
1052
1053#[async_trait]
1054pub trait DbExecutor: Send + Sync {
1055    #[expect(clippy::too_many_arguments)]
1056    async fn lock_pending_by_ffqns(
1057        &self,
1058        batch_size: u32,
1059        pending_at_or_sooner: DateTime<Utc>,
1060        ffqns: Arc<[FunctionFqn]>,
1061        created_at: DateTime<Utc>,
1062        component_id: ComponentId,
1063        deployment_id: DeploymentId,
1064        executor_id: ExecutorId,
1065        lock_expires_at: DateTime<Utc>,
1066        run_id: RunId,
1067        retry_config: ComponentRetryConfig,
1068    ) -> Result<LockPendingResponse, DbErrorWrite>;
1069
1070    #[expect(clippy::too_many_arguments)]
1071    async fn lock_pending_by_component_digest(
1072        &self,
1073        batch_size: u32,
1074        pending_at_or_sooner: DateTime<Utc>,
1075        component_id: &ComponentId,
1076        deployment_id: DeploymentId,
1077        created_at: DateTime<Utc>,
1078        executor_id: ExecutorId,
1079        lock_expires_at: DateTime<Utc>,
1080        run_id: RunId,
1081        retry_config: ComponentRetryConfig,
1082    ) -> Result<LockPendingResponse, DbErrorWrite>;
1083
1084    #[cfg(feature = "test")]
1085    #[expect(clippy::too_many_arguments)]
1086    async fn lock_one(
1087        &self,
1088        created_at: DateTime<Utc>,
1089        component_id: ComponentId,
1090        deployment_id: DeploymentId,
1091        execution_id: &ExecutionId,
1092        run_id: RunId,
1093        version: Version,
1094        executor_id: ExecutorId,
1095        lock_expires_at: DateTime<Utc>,
1096        retry_config: ComponentRetryConfig,
1097    ) -> Result<LockedExecution, DbErrorWrite>;
1098
1099    /// Append a single event to an existing execution log.
1100    /// The request cannot contain [`ExecutionRequest::Created`].
1101    async fn append(
1102        &self,
1103        execution_id: ExecutionId,
1104        version: Version,
1105        req: AppendRequest,
1106    ) -> Result<AppendResponse, DbErrorWrite>;
1107
1108    /// Append a batch of events to an existing execution log, and append a response to a parent execution.
1109    /// The batch cannot contain [`ExecutionRequest::Created`].
1110    async fn append_batch_respond_to_parent(
1111        &self,
1112        events: AppendEventsToExecution,
1113        response: AppendResponseToExecution,
1114        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
1115    ) -> Result<AppendBatchResponse, DbErrorWrite>;
1116
1117    /// Notification mechainism with no strict guarantees for waiting while there are no pending executions.
1118    /// Return immediately if there are pending notifications at `pending_at_or_sooner`.
1119    /// Otherwise wait until `timeout_fut` resolves.
1120    /// Delay requests that expire between `pending_at_or_sooner` and timeout can be disregarded.
1121    async fn wait_for_pending_by_ffqn(
1122        &self,
1123        pending_at_or_sooner: DateTime<Utc>,
1124        ffqns: Arc<[FunctionFqn]>,
1125        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
1126    );
1127
1128    /// Notification mechainism with no strict guarantees for waiting while there are no pending executions.
1129    /// Return immediately if there are pending notifications at `pending_at_or_sooner`.
1130    /// Otherwise wait until `timeout_fut` resolves.
1131    /// Delay requests that expire between `pending_at_or_sooner` and timeout can be disregarded.
1132    async fn wait_for_pending_by_component_digest(
1133        &self,
1134        pending_at_or_sooner: DateTime<Utc>,
1135        component_digest: &ComponentDigest,
1136        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
1137    );
1138
1139    async fn cancel_activity_with_retries(
1140        &self,
1141        execution_id: &ExecutionId,
1142        cancelled_at: DateTime<Utc>,
1143    ) -> Result<CancelOutcome, DbErrorWrite> {
1144        let mut retries = 5;
1145        loop {
1146            let res = self.cancel_activity(execution_id, cancelled_at).await;
1147            if res.is_ok() || retries == 0 {
1148                return res;
1149            }
1150            retries -= 1;
1151        }
1152    }
1153
1154    /// Get last event. Impls may set `ExecutionEvent::backtrace_id` to `None`.
1155    async fn get_last_execution_event(
1156        &self,
1157        execution_id: &ExecutionId,
1158    ) -> Result<ExecutionEvent, DbErrorRead>;
1159
1160    async fn cancel_activity(
1161        &self,
1162        execution_id: &ExecutionId,
1163        cancelled_at: DateTime<Utc>,
1164    ) -> Result<CancelOutcome, DbErrorWrite> {
1165        debug!("Determining cancellation state of {execution_id}");
1166
1167        let last_event = self
1168            .get_last_execution_event(execution_id)
1169            .await
1170            .map_err(DbErrorWrite::from)?;
1171        if let ExecutionRequest::Finished {
1172            retval:
1173                SupportedFunctionReturnValue::ExecutionFailure(FinishedExecutionFailure {
1174                    kind: ExecutionFailureKind::Cancelled,
1175                    ..
1176                }),
1177            ..
1178        } = last_event.event
1179        {
1180            return Ok(CancelOutcome::Cancelled);
1181        } else if matches!(last_event.event, ExecutionRequest::Finished { .. }) {
1182            debug!("Not cancelling, {execution_id} is already finished");
1183            return Ok(CancelOutcome::AlreadyFinished);
1184        }
1185        let finished_version = last_event.version.increment();
1186        let child_result =
1187            SupportedFunctionReturnValue::ExecutionFailure(FinishedExecutionFailure {
1188                reason: None,
1189                kind: ExecutionFailureKind::Cancelled,
1190                detail: None,
1191            });
1192        let cancel_request = AppendRequest {
1193            created_at: cancelled_at,
1194            event: ExecutionRequest::Finished {
1195                retval: child_result.clone(),
1196                http_client_traces: None,
1197            },
1198        };
1199        debug!("Cancelling activity {execution_id} at {finished_version}");
1200        if let ExecutionId::Derived(execution_id) = execution_id {
1201            let (parent_execution_id, join_set_id) = execution_id.split_to_parts();
1202            let child_execution_id = ExecutionId::Derived(execution_id.clone());
1203            self.append_batch_respond_to_parent(
1204                AppendEventsToExecution {
1205                    execution_id: child_execution_id,
1206                    version: finished_version.clone(),
1207                    batch: vec![cancel_request],
1208                },
1209                AppendResponseToExecution {
1210                    parent_execution_id,
1211                    created_at: cancelled_at,
1212                    join_set_id: join_set_id.clone(),
1213                    child_execution_id: execution_id.clone(),
1214                    finished_version,
1215                    result: child_result,
1216                },
1217                cancelled_at,
1218            )
1219            .await?;
1220        } else {
1221            self.append(execution_id.clone(), finished_version, cancel_request)
1222                .await?;
1223        }
1224        debug!("Cancelled {execution_id}");
1225        Ok(CancelOutcome::Cancelled)
1226    }
1227}
1228
/// Successful outcomes of [`DbConnection::append_delay_response`].
///
/// Derives the same traits as the other fieldless public enums in this module
/// (e.g. [`DeploymentStatus`]), so values can be logged, compared and copied.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AppendDelayResponseOutcome {
    /// The delay response was appended.
    Success,
    // NOTE(review): the two variants below presumably mean that nothing was appended
    // because the delay had already finished / had already been cancelled — confirm
    // against the implementations.
    AlreadyFinished,
    AlreadyCancelled,
}
1234
1235#[derive(Debug, Clone, Default)]
1236pub struct ListExecutionsFilter {
1237    pub function_name_filter: Option<FunctionNameFilter>,
1238    pub show_derived: bool,
1239    pub hide_finished: bool,
1240    pub execution_id_prefix: Option<String>,
1241    pub component_digest: Option<ComponentDigest>,
1242    pub deployment_id: Option<DeploymentId>,
1243}
1244
/// Filter on a function name, given as a package, interface or function name.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FunctionNameFilter {
    /// Package name, optionally carrying a version after the last `@`.
    PackageName(String),
    InterfaceName(String),
    FunctionName(String),
}

impl FunctionNameFilter {
    /// Render the filter as a pattern that uses `%` as the wildcard.
    ///
    /// * Function and interface names become a plain prefix pattern: `{name}%`.
    /// * A package name of the form `{pkg}@{version}`, with a non-empty version and a `:`
    ///   somewhere in `{pkg}`, becomes `{pkg}/%@{version}.%`. The split happens at the
    ///   last `@`.
    /// * Any other package name becomes the prefix pattern `{name}%`.
    ///
    /// Wildcard characters already present in the input are copied verbatim.
    #[must_use]
    pub fn like_pattern(&self) -> String {
        match self {
            Self::PackageName(package_name) => package_name
                .rsplit_once('@')
                .filter(|(pkg_fqn_without_version, version)| {
                    !version.is_empty() && pkg_fqn_without_version.contains(':')
                })
                .map_or_else(
                    || format!("{package_name}%"),
                    |(pkg_fqn_without_version, version)| {
                        format!("{pkg_fqn_without_version}/%@{version}.%")
                    },
                ),
            Self::FunctionName(function_name) | Self::InterfaceName(function_name) => {
                format!("{function_name}%")
            }
        }
    }
}
1272
1273#[async_trait]
1274pub trait DbExternalApi: DbConnection {
1275    /// Get the latest backtrace if version is not set.
1276    async fn get_backtrace(
1277        &self,
1278        execution_id: &ExecutionId,
1279        filter: BacktraceFilter,
1280    ) -> Result<BacktraceInfo, DbErrorRead>;
1281
1282    /// Store a source file associated with a component digest.
1283    /// `frame_key` is either an exact frame symbol path or a suffix (with leading `/`)
1284    /// when `is_suffix` is true. Idempotent — repeated calls for the same key are ignored.
1285    async fn upsert_source_file(
1286        &self,
1287        component_digest: &ComponentDigest,
1288        frame_key: &str,
1289        is_suffix: bool,
1290        content: &str,
1291    ) -> Result<(), DbErrorWrite>;
1292
1293    /// Look up a source file by component digest and a frame symbol path.
1294    /// Matches either exact keys or suffix keys (where the frame path ends with the stored key).
1295    /// Returns `None` if not found or if multiple suffix entries match (ambiguous).
1296    async fn get_source_file(
1297        &self,
1298        component_digest: &ComponentDigest,
1299        file: &str,
1300    ) -> Result<Option<String>, DbErrorRead>;
1301
1302    /// Insert or reuse normalized component metadata rows.
1303    async fn upsert_component_metadata(
1304        &self,
1305        records: Vec<ComponentMetadataRecord>,
1306    ) -> Result<(), DbErrorWrite>;
1307
1308    /// Insert deployment-local component bindings for a deployment.
1309    async fn insert_deployment_components(
1310        &self,
1311        deployment_id: DeploymentId,
1312        records: Vec<DeploymentComponentRecord>,
1313    ) -> Result<(), DbErrorWrite>;
1314
1315    /// List all components visible in a deployment, including persisted imports, exports and WIT.
1316    async fn list_deployment_components(
1317        &self,
1318        deployment_id: DeploymentId,
1319    ) -> Result<Vec<DeploymentComponentDetail>, DbErrorRead>;
1320
1321    /// Get the WIT for a component digest scoped to a deployment.
1322    async fn get_deployment_component_wit(
1323        &self,
1324        deployment_id: DeploymentId,
1325        component_digest: &ComponentDigest,
1326    ) -> Result<Option<String>, DbErrorRead>;
1327
1328    /// Returns executions sorted in descending order.
1329    async fn list_executions(
1330        &self,
1331        filter: ListExecutionsFilter,
1332        pagination: ExecutionListPagination,
1333    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric>;
1334
1335    /// Returns execution events for the given execution.
1336    ///
1337    /// Results are always ordered from oldest to newest (ascending by version),
1338    /// regardless of pagination direction.
1339    async fn list_execution_events(
1340        &self,
1341        execution_id: &ExecutionId,
1342        pagination: Pagination<VersionType>,
1343        include_backtrace_id: bool,
1344    ) -> Result<ListExecutionEventsResponse, DbErrorRead>;
1345
1346    /// Returns responses of an execution ordered as they arrived,
1347    /// enabling matching each `JoinNext` to its corresponding response.
1348    ///
1349    /// Results are always ordered from oldest to newest (ascending by cursor),
1350    /// regardless of pagination direction.
1351    ///
1352    /// As an optimization, the implementation can return an empty list of `responses`
1353    /// and `max_cursor` set to 0 if the execution is not found.
1354    async fn list_responses(
1355        &self,
1356        execution_id: &ExecutionId,
1357        pagination: Pagination<u32>,
1358    ) -> Result<ListResponsesResponse, DbErrorRead>;
1359
1360    async fn list_execution_events_responses(
1361        &self,
1362        execution_id: &ExecutionId,
1363        req_since: &Version,
1364        req_max_length: VersionType,
1365        req_include_backtrace_id: bool,
1366        resp_pagination: Pagination<VersionType>,
1367    ) -> Result<ExecutionWithStateRequestsResponses, DbErrorRead>;
1368
1369    async fn upgrade_execution_component(
1370        &self,
1371        execution_id: &ExecutionId,
1372        old: &ComponentDigest,
1373        new: &ComponentDigest,
1374    ) -> Result<(), DbErrorWrite>;
1375
1376    async fn list_logs(
1377        &self,
1378        execution_id: &ExecutionId,
1379        show_derived: bool,
1380        filter: LogFilter,
1381        pagination: Pagination<DateTime<Utc>>,
1382    ) -> Result<ListLogsResponse, DbErrorRead>;
1383
1384    async fn list_deployment_states(
1385        &self,
1386        current_time: DateTime<Utc>,
1387        pagination: Pagination<Option<DeploymentId>>,
1388        include_config_json: bool,
1389    ) -> Result<Vec<DeploymentState>, DbErrorRead>;
1390
1391    /// Insert a new deployment. The record must have `status == Inactive` and
1392    /// `last_active_at == None`; activation is a separate step via [`Self::activate_deployment`].
1393    async fn insert_deployment(&self, record: DeploymentRecord) -> Result<(), DbErrorWrite>;
1394
1395    async fn activate_deployment(
1396        &self,
1397        deployment_id: DeploymentId,
1398        now: DateTime<Utc>,
1399    ) -> Result<(), DbErrorWrite>;
1400
1401    /// Mark a deployment as Enqueued (pending next server restart).
1402    /// Returns `Err(DbErrorWriteNonRetriable::Conflict)` if the deployment is currently Active.
1403    /// Any previously Enqueued deployment is demoted to Inactive.
1404    async fn enqueue_deployment(&self, deployment_id: DeploymentId) -> Result<(), DbErrorWrite>;
1405
1406    /// Returned [`DeploymentRecord`] must contain `config_json`.
1407    async fn get_deployment(
1408        &self,
1409        deployment_id: DeploymentId,
1410    ) -> Result<Option<DeploymentRecord>, DbErrorRead>;
1411
1412    /// Return active deployment.
1413    /// Returned [`DeploymentRecord`] must contain `config_json`.
1414    #[cfg(feature = "test")]
1415    async fn get_active_deployment(&self) -> Result<Option<DeploymentRecord>, DbErrorRead>;
1416
1417    /// Return the most relevant current deployment: Enqueued if present, otherwise Active.
1418    /// Returned [`DeploymentRecord`] must contain `config_json`.
1419    async fn get_current_deployment(&self) -> Result<Option<DeploymentRecord>, DbErrorRead>;
1420
1421    async fn list_deployments(
1422        &self,
1423        pagination: Pagination<Option<DeploymentId>>,
1424    ) -> Result<Vec<DeploymentRecord>, DbErrorRead>;
1425
1426    /// Pause an execution. Only pending executions can be paused.
1427    /// If the execution is currently locked, implementations must release the lock first
1428    /// and then record the paused state.
1429    async fn pause_execution(
1430        &self,
1431        execution_id: &ExecutionId,
1432        paused_at: DateTime<Utc>,
1433    ) -> Result<AppendResponse, DbErrorWrite>;
1434
1435    /// Unpause an execution. Only paused executions can be unpaused.
1436    async fn unpause_execution(
1437        &self,
1438        execution_id: &ExecutionId,
1439        unpaused_at: DateTime<Utc>,
1440    ) -> Result<AppendResponse, DbErrorWrite>;
1441
1442    /// Pause a delay, preventing it from being picked up by the expired timers watcher.
1443    /// No-op if the delay is already paused.
1444    /// Returns `NotFound` if the delay does not exist (already processed or cancelled).
1445    async fn pause_delay(&self, delay_id: &DelayId) -> Result<(), DbErrorWrite>;
1446
1447    /// Unpause a previously paused delay.
1448    /// No-op if the delay is already unpaused.
1449    /// Returns `NotFound` if the delay does not exist (already processed or cancelled).
1450    async fn unpause_delay(&self, delay_id: &DelayId) -> Result<(), DbErrorWrite>;
1451}
1452pub const LIST_DEPLOYMENT_STATES_DEFAULT_LENGTH: u16 = 20;
1453pub const LIST_DEPLOYMENT_STATES_DEFAULT_PAGINATION: Pagination<Option<DeploymentId>> =
1454    Pagination::OlderThan {
1455        length: LIST_DEPLOYMENT_STATES_DEFAULT_LENGTH,
1456        cursor: None,
1457        including_cursor: false,
1458    };
1459
1460pub struct DeploymentState {
1461    pub deployment_id: DeploymentId,
1462    pub locked: u32,
1463    // In `PendingAt` state, scheduled to present or past
1464    pub pending: u32,
1465    // In `PendingAt` state, scheduled into the future
1466    pub scheduled: u32,
1467    pub blocked: u32,
1468    pub finished: u32,
1469    /// None if not requested from db.
1470    pub config_json: Option<String>,
1471    pub created_at: DateTime<Utc>,
1472    /// Set when the deployment becomes Active; None if it has never been active.
1473    pub last_active_at: Option<DateTime<Utc>>,
1474    pub status: DeploymentStatus,
1475}
1476
/// Status of a deployment.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeploymentStatus {
    Inactive,
    /// Queued to become Active on the next server restart.
    Enqueued,
    Active,
}

impl DeploymentStatus {
    /// Lowercase textual form of the status: `"inactive"`, `"enqueued"` or `"active"`.
    #[must_use]
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Active => "active",
            Self::Enqueued => "enqueued",
            Self::Inactive => "inactive",
        }
    }
}
1495
1496impl std::str::FromStr for DeploymentStatus {
1497    type Err = StrVariant;
1498    fn from_str(s: &str) -> Result<Self, Self::Err> {
1499        match s {
1500            "inactive" => Ok(DeploymentStatus::Inactive),
1501            "enqueued" => Ok(DeploymentStatus::Enqueued),
1502            "active" => Ok(DeploymentStatus::Active),
1503            _ => Err(StrVariant::from(format!("unknown deployment status: {s}"))),
1504        }
1505    }
1506}
1507
1508#[derive(Debug, Clone)]
1509pub struct DeploymentRecord {
1510    pub deployment_id: DeploymentId,
1511    pub created_at: DateTime<Utc>,
1512    /// Set when the deployment becomes Active; None if it has never been active.
1513    pub last_active_at: Option<DateTime<Utc>>,
1514    pub status: DeploymentStatus,
1515    pub config_json: String, // Serialized `DeploymentCanonical`
1516    pub obelisk_version: String,
1517    pub created_by: Option<String>,
1518}
1519
1520#[derive(Debug, Clone)]
1521pub struct ComponentMetadataRecord {
1522    pub component_digest: ComponentDigest,
1523    pub imports: Vec<PersistedFunctionMetadata>,
1524    pub exports: Vec<PersistedFunctionMetadata>,
1525    pub wit: String,
1526    pub wit_origin: String,
1527}
1528
1529/// Relation between a deployment and its components
1530#[derive(Debug, Clone)]
1531pub struct DeploymentComponentRecord {
1532    pub deployment_id: DeploymentId,
1533    pub component_name: StrVariant,
1534    pub component_digest: ComponentDigest,
1535    pub component_type: ComponentType,
1536}
1537
1538#[derive(Debug, Clone)]
1539pub struct DeploymentComponentDetail {
1540    pub component_id: ComponentId,
1541    pub imports: Vec<PersistedFunctionMetadata>,
1542    pub exports: Vec<PersistedFunctionMetadata>,
1543    pub wit: String,
1544}
1545
1546#[derive(
1547    Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, schemars::JsonSchema,
1548)]
1549pub struct PersistedFunctionMetadata {
1550    pub ffqn: FunctionFqn,
1551    pub parameter_types: Vec<PersistedParameterType>,
1552    pub return_type: String,
1553    pub extension: Option<FunctionExtension>,
1554    pub submittable: bool,
1555}
1556
1557#[derive(
1558    Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, schemars::JsonSchema,
1559)]
1560pub struct PersistedParameterType {
1561    pub name: String,
1562    pub wit_type: String,
1563}
1564
1565impl From<FunctionMetadata> for PersistedFunctionMetadata {
1566    fn from(value: FunctionMetadata) -> Self {
1567        PersistedFunctionMetadata {
1568            ffqn: value.ffqn,
1569            parameter_types: value
1570                .parameter_types
1571                .0
1572                .into_iter()
1573                .map(|param| PersistedParameterType {
1574                    name: param.name.to_string(),
1575                    wit_type: param.wit_type.to_string(),
1576                })
1577                .collect(),
1578            return_type: value.return_type.wit_type().to_string(),
1579            extension: value.extension,
1580            submittable: value.submittable,
1581        }
1582    }
1583}
1584
1585#[derive(Debug)]
1586pub struct ListLogsResponse {
1587    pub items: Vec<LogEntryRow>,
1588    pub next_page: Pagination<DateTime<Utc>>, // Newer logs can always arrive e.g. via replay
1589    pub prev_page: Option<Pagination<DateTime<Utc>>>, // None if we are already at the beginning
1590}
1591
1592#[derive(Debug)]
1593pub struct LogFilter {
1594    show_logs: bool,
1595    show_streams: bool,
1596    levels: Vec<LogLevel>, // Only applied if `show_logs` = true, empty means return all levels.
1597    stream_types: Vec<LogStreamType>, // Only applied if `show_streams` = true, empty means return all stream types.
1598}
1599impl LogFilter {
1600    // Constructor for logs only
1601    #[must_use]
1602    pub fn show_logs(levels: Vec<LogLevel>) -> LogFilter {
1603        LogFilter {
1604            show_logs: true,
1605            show_streams: false,
1606            levels,
1607            stream_types: Vec::new(),
1608        }
1609    }
1610    // Constructor for streams only
1611    #[must_use]
1612    pub fn show_streams(stream_types: Vec<LogStreamType>) -> LogFilter {
1613        LogFilter {
1614            show_logs: false,
1615            show_streams: true,
1616            levels: Vec::new(),
1617            stream_types,
1618        }
1619    }
1620    // Constructor for both logs and streams
1621    #[must_use]
1622    pub fn show_combined(levels: Vec<LogLevel>, stream_types: Vec<LogStreamType>) -> LogFilter {
1623        LogFilter {
1624            show_logs: true,
1625            show_streams: true,
1626            levels,
1627            stream_types,
1628        }
1629    }
1630    // Getters
1631    #[must_use]
1632    pub fn should_show_logs(&self) -> bool {
1633        self.show_logs
1634    }
1635    #[must_use]
1636    pub fn should_show_streams(&self) -> bool {
1637        self.show_streams
1638    }
1639    #[must_use]
1640    pub fn levels(&self) -> &Vec<LogLevel> {
1641        &self.levels
1642    }
1643    #[must_use]
1644    pub fn stream_types(&self) -> &Vec<LogStreamType> {
1645        &self.stream_types
1646    }
1647}
1648
1649#[derive(Debug, Clone)]
1650pub struct ExecutionWithStateRequestsResponses {
1651    pub execution_with_state: ExecutionWithState,
1652    pub events: Vec<ExecutionEvent>,
1653    pub responses: Vec<ResponseWithCursor>,
1654    pub max_version: Version,
1655    pub max_cursor: ResponseCursor,
1656}
1657
1658#[async_trait]
1659pub trait DbConnection: DbExecutor {
1660    /// Get execution log.
1661    async fn get(&self, execution_id: &ExecutionId) -> Result<ExecutionLog, DbErrorRead>;
1662
1663    async fn append_delay_response(
1664        &self,
1665        created_at: DateTime<Utc>,
1666        execution_id: ExecutionId,
1667        join_set_id: JoinSetId,
1668        delay_id: DelayId,
1669        outcome: Result<(), ()>, // Successfully finished - `Ok(())` or cancelled - `Err(())`
1670    ) -> Result<AppendDelayResponseOutcome, DbErrorWrite>;
1671
1672    /// Append a batch of events to an existing execution log.
1673    /// The batch must not contain [`ExecutionRequest::Created`].
1674    async fn append_batch(
1675        &self,
1676        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
1677        batch: Vec<AppendRequest>,
1678        execution_id: ExecutionId,
1679        version: Version,
1680    ) -> Result<AppendBatchResponse, DbErrorWrite>;
1681
1682    /// Append one or more events to the parent execution log, and create zero or more child execution logs.
1683    /// The batch must not contain [`ExecutionRequest::Created`].
1684    async fn append_batch_create_new_execution(
1685        &self,
1686        current_time: DateTime<Utc>, // not persisted, can be used for unblocking `subscribe_to_pending`
1687        batch: Vec<AppendRequest>,   // must not contain [`ExecutionRequest::Created`] events
1688        execution_id: ExecutionId,
1689        version: Version,
1690        child_req: Vec<CreateRequest>,
1691        backtraces: Vec<BacktraceInfo>,
1692    ) -> Result<AppendBatchResponse, DbErrorWrite>;
1693
1694    /// Get a single event specified by version. Impls may set `ExecutionEvent::backtrace_id` to `None`.
1695    async fn get_execution_event(
1696        &self,
1697        execution_id: &ExecutionId,
1698        version: &Version,
1699    ) -> Result<ExecutionEvent, DbErrorRead>;
1700
1701    /// Idempotent stub response write. Appends a Finished event to the child execution
1702    /// and a response to the parent. If the child is already finished with the same retval,
1703    /// succeeds silently. If finished with a different retval, returns [`DbErrorStubResponse::StubConflict`].
1704    async fn upsert_stub_response(
1705        &self,
1706        execution_id: ExecutionIdDerived,
1707        version: Version,
1708        req: AppendRequest,
1709        response: AppendResponseToExecution,
1710        current_time: DateTime<Utc>,
1711    ) -> Result<(), DbErrorStubResponse>;
1712
1713    #[instrument(skip(self))]
1714    async fn get_create_request(
1715        &self,
1716        execution_id: &ExecutionId,
1717    ) -> Result<CreateRequest, DbErrorRead> {
1718        let execution_event = self
1719            .get_execution_event(execution_id, &Version::new(0))
1720            .await?;
1721        if let ExecutionRequest::Created {
1722            ffqn,
1723            params,
1724            parent,
1725            scheduled_at,
1726            component_id,
1727            deployment_id,
1728            metadata,
1729            scheduled_by,
1730        } = execution_event.event
1731        {
1732            Ok(CreateRequest {
1733                created_at: execution_event.created_at,
1734                execution_id: execution_id.clone(),
1735                ffqn,
1736                params,
1737                parent,
1738                scheduled_at,
1739                component_id,
1740                deployment_id,
1741                metadata,
1742                scheduled_by,
1743                paused: false,
1744            })
1745        } else {
1746            Err(DbErrorRead::Generic(DbErrorGeneric::Uncategorized {
1747                reason: "execution log must start with creation".into(),
1748                context: SpanTrace::capture(),
1749                source: None,
1750                loc: Location::caller(),
1751            }))
1752        }
1753    }
1754
    /// Fetch the [`ExecutionWithState`] summary of one execution, which carries its
    /// current `PendingState` alongside identifying metadata.
    ///
    /// # Errors
    /// Returns [`DbErrorRead`] when the state cannot be read.
    async fn get_pending_state(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<ExecutionWithState, DbErrorRead>;
1759
    /// Get currently expired locks and async timers (delay requests).
    ///
    /// `at` is the instant against which expiry is evaluated
    /// (NOTE(review): whether the comparison is inclusive is implementation-defined - confirm).
    /// Each entry is either an `ExpiredTimer::Lock` or an `ExpiredTimer::Delay`.
    async fn get_expired_timers(
        &self,
        at: DateTime<Utc>,
    ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric>;
1765
    /// Create a new execution log from `req`.
    ///
    /// # Errors
    /// Returns [`DbErrorWrite`] when the log cannot be created.
    async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite>;
1768
    /// Notification mechanism with no strict guarantees for getting notified when a new response arrives.
    ///
    /// `last_response` is presumably the cursor of the last response the caller has already seen.
    /// NOTE(review): this doc used to describe a `start_idx` parameter that "must be at most
    /// equal to the current size of responses in the execution log"; confirm how that
    /// constraint maps onto `last_response`.
    /// If no response arrives immediately and `timeout_fut` resolves, `DbErrorReadWithTimeout::Timeout` is returned.
    /// Implementations with no pubsub support should use polling.
    /// Callers are expected to call this function in a loop with a reasonable timeout
    /// to support less stellar implementations.
    async fn subscribe_to_next_responses(
        &self,
        execution_id: &ExecutionId,
        last_response: ResponseCursor,
        timeout_fut: Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>,
    ) -> Result<Vec<ResponseWithCursor>, DbErrorReadWithTimeout>;
1781
    /// First, attempt to fetch the finished value. If the execution is not finished yet, poll
    /// periodically or subscribe to db changes, racing with `timeout_fut`.
    /// Notification mechanism with no strict guarantees for getting the finished result.
    /// Implementations with no pubsub support should use polling.
    /// Callers are expected to call this function in a loop with a reasonable timeout
    /// to support less stellar implementations.
    ///
    /// `timeout_fut` is optional; NOTE(review): `None` presumably means waiting without a
    /// deadline - confirm against implementations.
    async fn wait_for_finished_result(
        &self,
        execution_id: &ExecutionId,
        timeout_fut: Option<Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>>,
    ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout>;
1793
    /// Persist a single [`BacktraceInfo`] record.
    async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite>;
1795
    /// Persist several [`BacktraceInfo`] records in one call.
    /// NOTE(review): whether the batch is written atomically is implementation-defined - confirm.
    async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite>;
1797
    /// Persist a single log row (a [`LogEntry`] tagged with its execution and run).
    async fn append_log(&self, row: LogInfoAppendRow) -> Result<(), DbErrorWrite>;
1799
    /// Persist several log rows in one call. Unlike `append_backtrace_batch`, the batch is borrowed.
    /// NOTE(review): whether the batch is written atomically is implementation-defined - confirm.
    async fn append_log_batch(&self, batch: &[LogInfoAppendRow]) -> Result<(), DbErrorWrite>;
1801
1802    /// Returns `TimeoutOutcome::Timeout` if not in Finished state.
1803    #[cfg(feature = "test")]
1804    async fn get_finished_result(
1805        &self,
1806        execution_id: &ExecutionId,
1807    ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout> {
1808        self.wait_for_finished_result(
1809            execution_id,
1810            Some(Box::pin(std::future::ready(TimeoutOutcome::Timeout))),
1811        )
1812        .await
1813    }
1814}
1815
/// Input row for `append_log` / `append_log_batch`: one log entry plus the execution and
/// run it belongs to.
#[derive(Clone, Debug)]
pub struct LogInfoAppendRow {
    /// Execution that produced the entry.
    pub execution_id: ExecutionId,
    /// Run that produced the entry.
    pub run_id: RunId,
    /// The log line or stream chunk itself.
    pub log_entry: LogEntry,
}
1822
/// A stored log entry together with its cursor.
#[derive(Debug, Clone)]
pub struct LogEntryRow {
    /// Timestamp-typed cursor of this row (presumably used for paginating log reads - confirm).
    pub cursor: DateTime<Utc>,
    /// Run that produced the entry.
    pub run_id: RunId,
    /// The log line or stream chunk itself.
    pub log_entry: LogEntry,
    /// Execution that produced the entry.
    pub execution_id: ExecutionId,
}
1830
1831#[derive(Debug, Clone)]
1832pub enum LogEntry {
1833    Log {
1834        created_at: DateTime<Utc>,
1835        level: LogLevel,
1836        message: String,
1837    },
1838    Stream {
1839        created_at: DateTime<Utc>,
1840        payload: Vec<u8>,
1841        stream_type: LogStreamType,
1842    },
1843}
1844impl LogEntry {
1845    #[must_use]
1846    pub fn created_at(&self) -> DateTime<Utc> {
1847        match self {
1848            LogEntry::Log { created_at, .. } | LogEntry::Stream { created_at, .. } => *created_at,
1849        }
1850    }
1851}
1852
/// Severity of a [`LogEntry::Log`] message.
///
/// Stored as `u8` with discriminants starting at 1 (`Trace = 1` .. `Error = 5`);
/// `#[try_from(repr)]` derives the fallible conversion back from that integer.
/// The derived ordering follows declaration order, so `Trace` is the lowest level.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, derive_more::TryFrom, strum::EnumIter,
)]
#[try_from(repr)]
#[repr(u8)]
pub enum LogLevel {
    Trace = 1,
    Debug,
    Info,
    Warn,
    Error,
}
/// Which standard stream a [`LogEntry::Stream`] payload was written to.
///
/// Stored as `u8` with discriminants starting at 1 (`StdOut = 1`, `StdErr = 2`);
/// `#[try_from(repr)]` derives the fallible conversion back from that integer.
#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::TryFrom, strum::EnumIter)]
#[try_from(repr)]
#[repr(u8)]
pub enum LogStreamType {
    StdOut = 1,
    StdErr,
}
1872
/// Output of the timeout futures passed to the waiting/subscription methods; it is also
/// the payload of `DbErrorReadWithTimeout::Timeout`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimeoutOutcome {
    /// The wait ended because the deadline elapsed.
    Timeout,
    /// The wait was interrupted (presumably by a caller-side cancellation - confirm).
    Cancel,
}
1878
/// Extension of [`DbConnection`] that is only compiled with the `test` feature.
#[cfg(feature = "test")]
#[async_trait]
pub trait DbConnectionTest: DbConnection {
    /// Append `response_event` to the execution identified by `execution_id`, stamped with
    /// `created_at`.
    async fn append_response(
        &self,
        created_at: DateTime<Utc>,
        execution_id: ExecutionId,
        response_event: JoinSetResponseEvent,
    ) -> Result<(), DbErrorWrite>;
}
1889
/// Result of a cancellation request such as [`cancel_delay`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CancelOutcome {
    /// The target is cancelled. `cancel_delay` also reports this when it was already cancelled.
    Cancelled,
    /// The target had already finished, so nothing was cancelled.
    AlreadyFinished,
}
1895
1896#[instrument(skip(db_connection))]
1897pub async fn stub_execution(
1898    db_connection: &dyn DbConnection,
1899    execution_id: ExecutionIdDerived,
1900    parent_execution_id: ExecutionId,
1901    join_set_id: JoinSetId,
1902    created_at: DateTime<Utc>,
1903    return_value: SupportedFunctionReturnValue,
1904) -> Result<(), DbErrorWrite> {
1905    let stub_finished_version = Version::new(1); // Stub activities have no execution log except Created event.
1906    let finished_req = AppendRequest {
1907        created_at,
1908        event: ExecutionRequest::Finished {
1909            retval: return_value.clone(),
1910            http_client_traces: None,
1911        },
1912    };
1913    db_connection
1914        .upsert_stub_response(
1915            execution_id.clone(),
1916            stub_finished_version.clone(),
1917            finished_req,
1918            AppendResponseToExecution {
1919                parent_execution_id,
1920                created_at,
1921                join_set_id,
1922                child_execution_id: execution_id,
1923                finished_version: stub_finished_version,
1924                result: return_value,
1925            },
1926            created_at,
1927        )
1928        .await
1929        .map_err(|err| match err {
1930            DbErrorStubResponse::StubConflict => {
1931                DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::Conflict)
1932            }
1933            DbErrorStubResponse::Write(db_err) => db_err,
1934        })
1935}
1936
1937pub async fn cancel_delay(
1938    db_connection: &dyn DbConnection,
1939    delay_id: DelayId,
1940    cancelled_at: DateTime<Utc>,
1941) -> Result<CancelOutcome, DbErrorWrite> {
1942    let (parent_execution_id, join_set_id) = delay_id.split_to_parts();
1943    db_connection
1944        .append_delay_response(
1945            cancelled_at,
1946            parent_execution_id,
1947            join_set_id,
1948            delay_id,
1949            Err(()), // Mark as cancelled.
1950        )
1951        .await
1952        .map(|ok| match ok {
1953            AppendDelayResponseOutcome::Success | AppendDelayResponseOutcome::AlreadyCancelled => {
1954                CancelOutcome::Cancelled
1955            }
1956            AppendDelayResponseOutcome::AlreadyFinished => CancelOutcome::AlreadyFinished,
1957        })
1958}
1959
/// Selects which stored backtrace of an execution to look up.
#[derive(Clone, Debug)]
pub enum BacktraceFilter {
    /// The first stored backtrace.
    First,
    /// The last stored backtrace.
    Last,
    /// The backtrace associated with the given version (presumably the one whose
    /// version range contains it - confirm against implementations).
    Specific(Version),
}
1966
/// A WASM backtrace attached to a half-open range of versions of one execution.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "test", derive(Serialize))]
pub struct BacktraceInfo {
    pub execution_id: ExecutionId,
    pub component_id: ComponentId,
    /// First version covered by the backtrace (inclusive).
    pub version_min_including: Version,
    /// End of the covered range (exclusive).
    pub version_max_excluding: Version,
    pub wasm_backtrace: WasmBacktrace,
}
1976
// Serializable backtrace: the list of captured frames. `maybe_from` never builds it
// with an empty `frames` list.
// Plain `//` comments on purpose: `///` docs would be emitted into the derived JSON schema.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, schemars::JsonSchema)]
pub struct WasmBacktrace {
    pub frames: Vec<FrameInfo>,
}
1981
// One backtrace frame.
// Plain `//` comments on purpose: `///` docs would be emitted into the derived JSON schema.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, schemars::JsonSchema)]
pub struct FrameInfo {
    // Module name; the wasmtime conversion stores "<unknown>" when the module has no name.
    pub module: String,
    // Function name as produced by the demangling helper in the wasmtime conversion.
    pub func_name: String,
    // Symbol records attached to this frame (may be empty).
    pub symbols: Vec<FrameSymbol>,
}
1988
// Source-level information for a frame; every part is optional.
// Plain `//` comments on purpose: `///` docs would be emitted into the derived JSON schema.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, schemars::JsonSchema)]
pub struct FrameSymbol {
    // Demangled function name, when the symbol has a name.
    pub func_name: Option<String>,
    // Source file, when known.
    pub file: Option<String>,
    // Line in `file`, when known.
    pub line: Option<u32>,
    // Column in `file`, when known.
    pub col: Option<u32>,
}
1996
1997mod wasm_backtrace {
1998    use super::{FrameInfo, FrameSymbol, WasmBacktrace};
1999
2000    impl WasmBacktrace {
2001        pub fn maybe_from(backtrace: &wasmtime::WasmBacktrace) -> Option<Self> {
2002            if backtrace.frames().is_empty() {
2003                None
2004            } else {
2005                Some(Self {
2006                    frames: backtrace.frames().iter().map(FrameInfo::from).collect(),
2007                })
2008            }
2009        }
2010    }
2011
2012    impl From<&wasmtime::FrameInfo> for FrameInfo {
2013        fn from(frame: &wasmtime::FrameInfo) -> Self {
2014            let module_name = frame.module().name().unwrap_or("<unknown>").to_string();
2015            let mut func_name = String::new();
2016            wasmtime_environ::demangle_function_name_or_index(
2017                &mut func_name,
2018                frame.func_name(),
2019                frame.func_index() as usize,
2020            )
2021            .expect("writing to string must succeed");
2022            Self {
2023                module: module_name,
2024                func_name,
2025                symbols: frame
2026                    .symbols()
2027                    .iter()
2028                    .map(std::convert::Into::into)
2029                    .collect(),
2030            }
2031        }
2032    }
2033
2034    impl From<&wasmtime::FrameSymbol> for FrameSymbol {
2035        fn from(symbol: &wasmtime::FrameSymbol) -> Self {
2036            let func_name = symbol.name().map(|name| {
2037                let mut writer = String::new();
2038                wasmtime_environ::demangle_function_name(&mut writer, name)
2039                    .expect("writing to string must succeed");
2040                writer
2041            });
2042
2043            Self {
2044                func_name,
2045                file: symbol.file().map(ToString::to_string),
2046                line: symbol.line(),
2047                col: symbol.column(),
2048            }
2049        }
2050    }
2051}
/// Summary of one execution together with its current [`PendingState`].
///
/// `Display` prints the execution id, the pending state and the component digest.
#[derive(Debug, Clone, derive_more::Display)]
#[display("{execution_id} {pending_state} {component_digest}")]
pub struct ExecutionWithState {
    pub execution_id: ExecutionId,
    /// Function this execution runs.
    pub ffqn: FunctionFqn,
    pub pending_state: PendingState,
    pub created_at: DateTime<Utc>,
    /// NOTE(review): presumably the `scheduled_at` of the creation request - confirm.
    pub first_scheduled_at: DateTime<Utc>,
    pub component_digest: ComponentDigest,
    pub component_type: ComponentType,
    pub deployment_id: DeploymentId,
}
2064
2065#[derive(Debug, Clone)]
2066pub enum ExecutionListPagination {
2067    CreatedBy(Pagination<Option<DateTime<Utc>>>),
2068    ExecutionId(Pagination<Option<ExecutionId>>),
2069}
2070impl Default for ExecutionListPagination {
2071    fn default() -> ExecutionListPagination {
2072        ExecutionListPagination::CreatedBy(Pagination::OlderThan {
2073            length: 20,
2074            cursor: None,
2075            including_cursor: false, // does not matter when `cursor` is not specified
2076        })
2077    }
2078}
2079impl ExecutionListPagination {
2080    #[must_use]
2081    pub fn length(&self) -> u16 {
2082        match self {
2083            ExecutionListPagination::CreatedBy(pagination) => pagination.length(),
2084            ExecutionListPagination::ExecutionId(pagination) => pagination.length(),
2085        }
2086    }
2087}
2088
/// Cursor-based page request: which side of `cursor` to read, how many items to return,
/// and whether the item at the cursor itself belongs to the page.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Pagination<T> {
    /// Items on the "newer" side of the cursor; read in ascending order.
    NewerThan {
        length: u16,
        cursor: T,
        including_cursor: bool,
    },
    /// Items on the "older" side of the cursor; read in descending order.
    OlderThan {
        length: u16,
        cursor: T,
        including_cursor: bool,
    },
}

// Read-only accessors never copy the cursor, so they are available for every `T`.
impl<T> Pagination<T> {
    /// Requested number of items.
    pub fn length(&self) -> u16 {
        match self {
            Pagination::NewerThan { length, .. } | Pagination::OlderThan { length, .. } => *length,
        }
    }

    /// Comparison operator relating an item to the cursor: `>`, `>=`, `<` or `<=`.
    pub fn rel(&self) -> &'static str {
        match self {
            Pagination::NewerThan {
                including_cursor: false,
                ..
            } => ">",
            Pagination::NewerThan {
                including_cursor: true,
                ..
            } => ">=",
            Pagination::OlderThan {
                including_cursor: false,
                ..
            } => "<",
            Pagination::OlderThan {
                including_cursor: true,
                ..
            } => "<=",
        }
    }

    /// `true` for `OlderThan`, which is read in descending order.
    pub fn is_desc(&self) -> bool {
        matches!(self, Pagination::OlderThan { .. })
    }

    /// Sort direction keyword: `"asc"` or `"desc"`.
    pub fn asc_or_desc(&self) -> &'static str {
        if self.is_asc() { "asc" } else { "desc" }
    }

    /// `true` for `NewerThan`, which is read in ascending order.
    pub fn is_asc(&self) -> bool {
        !self.is_desc()
    }

    /// Borrow the cursor value.
    pub fn cursor(&self) -> &T {
        match self {
            Pagination::NewerThan { cursor, .. } | Pagination::OlderThan { cursor, .. } => cursor,
        }
    }
}

// Only `invert` has to duplicate the cursor, so only it requires `T: Clone`.
impl<T: Clone> Pagination<T> {
    /// Complementary request around the same cursor: the direction is flipped and
    /// `including_cursor` is negated, so the cursor item belongs to exactly one of the
    /// two requests. `length` is kept.
    #[must_use]
    pub fn invert(&self) -> Self {
        match self {
            Pagination::NewerThan {
                length,
                cursor,
                including_cursor,
            } => Pagination::OlderThan {
                length: *length,
                cursor: cursor.clone(),
                including_cursor: !including_cursor,
            },
            Pagination::OlderThan {
                length,
                cursor,
                including_cursor,
            } => Pagination::NewerThan {
                length: *length,
                cursor: cursor.clone(),
                including_cursor: !including_cursor,
            },
        }
    }
}
2172
/// Test helper: poll the execution log until `predicate` returns `Some`.
///
/// The log is fetched with `db_connection.get` and handed to `predicate`; between
/// attempts the task sleeps for 10 ms. With `timeout` set, polling races against a
/// sleep of that duration and a `DbErrorReadWithTimeout::Timeout(TimeoutOutcome::Timeout)`
/// is returned when the sleep wins. Without it, polling continues until the predicate
/// matches or reading the log fails.
#[cfg(feature = "test")]
pub async fn wait_for_pending_state_fn<T: Debug>(
    db_connection: &dyn DbConnectionTest,
    execution_id: &ExecutionId,
    predicate: impl Fn(ExecutionLog) -> Option<T> + Send,
    timeout: Option<Duration>,
) -> Result<T, DbErrorReadWithTimeout> {
    tracing::trace!(%execution_id, "Waiting for predicate");
    let poll_until_match = async move {
        loop {
            let execution_log = db_connection.get(execution_id).await?;
            match predicate(execution_log) {
                Some(found) => {
                    tracing::debug!(%execution_id, "Found: {found:?}");
                    return Ok(found);
                }
                None => tokio::time::sleep(Duration::from_millis(10)).await,
            }
        }
    };

    let Some(timeout) = timeout else {
        // No deadline requested.
        return poll_until_match.await;
    };
    tokio::select! { // future's liveness: Dropping the loser immediately.
        res = poll_until_match => res,
        () = tokio::time::sleep(timeout) => Err(DbErrorReadWithTimeout::Timeout(TimeoutOutcome::Timeout))
    }
}
2201
/// One item returned by `get_expired_timers`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExpiredTimer {
    /// An execution lock that has expired.
    Lock(ExpiredLock),
    /// An async timer (delay request) that has expired.
    Delay(ExpiredDelay),
}
2207
/// Details of an expired execution lock.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExpiredLock {
    pub execution_id: ExecutionId,
    // Version of last `Locked` event, used to detect whether the execution made progress.
    pub locked_at_version: Version,
    /// Version the next appended event would get.
    pub next_version: Version,
    /// As the execution may still be running, this represents the number of intermittent failures + timeouts prior to this execution.
    pub intermittent_event_count: u32,
    /// Retry limit (NOTE(review): `None` presumably means no limit - confirm).
    pub max_retries: Option<u32>,
    /// Base duration of the exponential retry backoff.
    pub retry_exp_backoff: Duration,
    /// Executor and run that held the lock.
    pub locked_by: LockedBy,
}
2220
/// Details of an expired async timer (delay request).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExpiredDelay {
    /// Execution that owns the delay.
    pub execution_id: ExecutionId,
    /// Join set the delay belongs to.
    pub join_set_id: JoinSetId,
    pub delay_id: DelayId,
}
2227
// Current scheduling state of an execution.
// Serialized with an internal `status` tag and snake_case variant names (see the serde
// attribute); `Display` formats follow the per-variant `#[display]` attributes.
// New notes are plain `//` comments on purpose: `///` docs would be emitted into the
// derived JSON schema.
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, schemars::JsonSchema)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum PendingState {
    /// Caused by [`ExecutionRequest::Locked`].
    Locked(PendingStateLocked),

    // Waiting to be picked up at the contained `scheduled_at`.
    #[display("PendingAt(`{_0}`)")]
    PendingAt(PendingStatePendingAt),

    /// Caused by [`HistoryEvent::JoinNext`]
    #[display("BlockedByJoinSet({_0})")]
    BlockedByJoinSet(PendingStateBlockedByJoinSet),

    // Paused; the payload remembers the state the execution was in before pausing.
    #[display("Paused({_0})")]
    Paused(PendingStatePaused),

    // Terminal state.
    #[display("Finished: {_0}")]
    Finished(PendingStateFinished),
}
2247
2248pub enum PendingStateMergedPause {
2249    Locked {
2250        state: PendingStateLocked,
2251        paused: bool,
2252    },
2253    PendingAt {
2254        state: PendingStatePendingAt,
2255        paused: bool,
2256    },
2257    BlockedByJoinSet {
2258        state: PendingStateBlockedByJoinSet,
2259        paused: bool,
2260    },
2261    Finished(PendingStateFinished),
2262}
2263impl From<PendingState> for PendingStateMergedPause {
2264    fn from(state: PendingState) -> Self {
2265        match state {
2266            PendingState::Locked(s) => PendingStateMergedPause::Locked {
2267                state: s,
2268                paused: false,
2269            },
2270
2271            PendingState::PendingAt(s) => PendingStateMergedPause::PendingAt {
2272                state: s,
2273                paused: false,
2274            },
2275
2276            PendingState::BlockedByJoinSet(s) => PendingStateMergedPause::BlockedByJoinSet {
2277                state: s,
2278                paused: false,
2279            },
2280
2281            PendingState::Paused(paused) => match paused {
2282                PendingStatePaused::PendingAt(s) => PendingStateMergedPause::PendingAt {
2283                    state: s,
2284                    paused: true,
2285                },
2286                PendingStatePaused::BlockedByJoinSet(s) => {
2287                    PendingStateMergedPause::BlockedByJoinSet {
2288                        state: s,
2289                        paused: true,
2290                    }
2291                }
2292            },
2293
2294            PendingState::Finished(s) => PendingStateMergedPause::Finished(s),
2295        }
2296    }
2297}
2298
// Payload of `PendingState::Locked`: who holds the lock and when it expires.
// `Display` prints the expiry followed by the executor id and run id.
// Plain `//` comments on purpose: `///` docs would be emitted into the derived JSON schema.
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, schemars::JsonSchema)]
#[display("Locked(`{lock_expires_at}`, {}, {})", locked_by.executor_id, locked_by.run_id)]
pub struct PendingStateLocked {
    pub locked_by: LockedBy,
    pub lock_expires_at: DateTime<Utc>,
}
2305
// Payload of `PendingState::PendingAt`.
// `can_append_lock` allows a new lock once `scheduled_at` is not after the lock's
// `created_at`; before that only the `last_lock` holder may extend its lock.
// Plain `//` comments on purpose: `///` docs would be emitted into the derived JSON schema.
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, schemars::JsonSchema)]
#[display("`{scheduled_at}`, last_lock={last_lock:?}")]
pub struct PendingStatePendingAt {
    pub scheduled_at: DateTime<Utc>,
    /// `last_lock` is needed for lock extension.
    pub last_lock: Option<LockedBy>,
}
2313
// Payload of `PendingState::BlockedByJoinSet`: the join set the execution waits on.
// Plain `//` comments on purpose: `///` docs would be emitted into the derived JSON schema.
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, schemars::JsonSchema)]
#[display("{join_set_id}, `{lock_expires_at}`, closing={closing}")]
pub struct PendingStateBlockedByJoinSet {
    pub join_set_id: JoinSetId,
    /// See [`HistoryEvent::JoinNext::lock_expires_at`].
    pub lock_expires_at: DateTime<Utc>,
    /// Blocked by closing of the join set
    pub closing: bool,
}
2323
/// State of execution before it was paused.
///
/// Pausing a locked execution must first append `Unlocked` and only then `Paused`.
// There is no `Locked` variant here, which matches the rule above.
// (Added as a plain `//` comment so the derived JSON schema stays unchanged.)
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, schemars::JsonSchema)]
pub enum PendingStatePaused {
    #[display("PendingAt({_0})")]
    PendingAt(PendingStatePendingAt),
    #[display("BlockedByJoinSet({_0})")]
    BlockedByJoinSet(PendingStateBlockedByJoinSet),
}
2334
2335#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
2336pub struct LockedBy {
2337    pub executor_id: ExecutorId,
2338    pub run_id: RunId,
2339}
2340impl From<&Locked> for LockedBy {
2341    fn from(value: &Locked) -> Self {
2342        LockedBy {
2343            executor_id: value.executor_id,
2344            run_id: value.run_id,
2345        }
2346    }
2347}
2348
2349#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, schemars::JsonSchema)]
2350#[cfg_attr(any(test, feature = "test"), derive(Deserialize))]
2351pub struct PendingStateFinished {
2352    pub version: VersionType, // not Version since it must be Copy
2353    pub finished_at: DateTime<Utc>,
2354    pub result_kind: PendingStateFinishedResultKind,
2355}
2356impl Display for PendingStateFinished {
2357    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2358        match self.result_kind {
2359            PendingStateFinishedResultKind::Ok => write!(f, "OK"),
2360            PendingStateFinishedResultKind::Err(err) => write!(f, "{err}"),
2361        }
2362    }
2363}
2364
2365// This is not a Result so that it can be customized for serialization
2366#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
2367#[serde(rename_all = "snake_case")]
2368pub enum PendingStateFinishedResultKind {
2369    Ok,
2370    Err(PendingStateFinishedError),
2371}
2372impl PendingStateFinishedResultKind {
2373    pub fn as_result(&self) -> Result<(), &PendingStateFinishedError> {
2374        match self {
2375            PendingStateFinishedResultKind::Ok => Ok(()),
2376            PendingStateFinishedResultKind::Err(err) => Err(err),
2377        }
2378    }
2379}
2380
// Classify a return value as a finished-state result kind; this only delegates to
// `SupportedFunctionReturnValue::as_pending_state_finished_result`.
impl From<&SupportedFunctionReturnValue> for PendingStateFinishedResultKind {
    fn from(result: &SupportedFunctionReturnValue) -> Self {
        result.as_pending_state_finished_result()
    }
}
2386
// Why a finished execution is not `Ok`.
// Serialized in snake_case; `Display` texts come from the `#[display]` attributes.
// Plain `//` comments on purpose: `///` docs would be emitted into the derived JSON schema.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    derive_more::Display,
    schemars::JsonSchema,
)]
#[serde(rename_all = "snake_case")]
pub enum PendingStateFinishedError {
    // A failure described by `ExecutionFailureKind` (e.g. `TimedOut`, as used in the tests).
    #[display("Execution failure ({_0})")]
    ExecutionFailure(ExecutionFailureKind),
    // NOTE(review): presumably an error value returned by the function itself - confirm.
    #[display("Error")]
    Error,
}
2405
2406impl PendingState {
2407    #[instrument(skip(self))]
2408    pub fn can_append_lock(
2409        &self,
2410        created_at: DateTime<Utc>,
2411        executor_id: ExecutorId,
2412        run_id: RunId,
2413        lock_expires_at: DateTime<Utc>,
2414    ) -> Result<LockKind, DbErrorWriteNonRetriable> {
2415        if lock_expires_at <= created_at {
2416            return Err(DbErrorWriteNonRetriable::ValidationFailed(
2417                "invalid expiry date".into(),
2418            ));
2419        }
2420        match self {
2421            PendingState::PendingAt(PendingStatePendingAt {
2422                scheduled_at,
2423                last_lock,
2424            }) => {
2425                if *scheduled_at <= created_at {
2426                    // pending now, ok to lock
2427                    Ok(LockKind::CreatingNewLock)
2428                } else if let Some(LockedBy {
2429                    executor_id: last_executor_id,
2430                    run_id: last_run_id,
2431                }) = last_lock
2432                    && executor_id == *last_executor_id
2433                    && run_id == *last_run_id
2434                {
2435                    // Original executor is extending the lock.
2436                    Ok(LockKind::Extending)
2437                } else {
2438                    Err(DbErrorWriteNonRetriable::ValidationFailed(
2439                        "cannot lock, not yet pending".into(),
2440                    ))
2441                }
2442            }
2443            PendingState::Locked(PendingStateLocked {
2444                locked_by:
2445                    LockedBy {
2446                        executor_id: current_pending_state_executor_id,
2447                        run_id: current_pending_state_run_id,
2448                    },
2449                lock_expires_at: _,
2450            }) => {
2451                if executor_id == *current_pending_state_executor_id
2452                    && run_id == *current_pending_state_run_id
2453                {
2454                    // Original executor is extending the lock.
2455                    Ok(LockKind::Extending)
2456                } else {
2457                    Err(DbErrorWriteNonRetriable::IllegalState {
2458                        reason: "cannot lock, already locked".into(),
2459                        context: SpanTrace::capture(),
2460                        source: None,
2461                        loc: Location::caller(),
2462                    })
2463                }
2464            }
2465            PendingState::BlockedByJoinSet { .. } => Err(DbErrorWriteNonRetriable::IllegalState {
2466                reason: "cannot append Locked event when in BlockedByJoinSet state".into(),
2467                context: SpanTrace::capture(),
2468                source: None,
2469                loc: Location::caller(),
2470            }),
2471            PendingState::Finished { .. } => Err(DbErrorWriteNonRetriable::IllegalState {
2472                reason: "already finished".into(),
2473                context: SpanTrace::capture(),
2474                source: None,
2475                loc: Location::caller(),
2476            }),
2477            PendingState::Paused(..) => Err(DbErrorWriteNonRetriable::IllegalState {
2478                reason: "cannot lock, execution is paused".into(),
2479                context: SpanTrace::capture(),
2480                source: None,
2481                loc: Location::caller(),
2482            }),
2483        }
2484    }
2485
2486    #[must_use]
2487    pub fn is_finished(&self) -> bool {
2488        matches!(self, PendingState::Finished { .. })
2489    }
2490
2491    #[must_use]
2492    pub fn is_paused(&self) -> bool {
2493        matches!(self, PendingState::Paused(_))
2494    }
2495}
2496
/// Successful outcome of `PendingState::can_append_lock`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LockKind {
    /// The requester already owns the lock (current or last) and is extending it.
    Extending,
    /// The execution is pending and a fresh lock is being taken.
    CreatingNewLock,
}
2502
// Serializable record of outgoing HTTP requests (and their responses, when present).
// Plain `//` comments on purpose: `///` docs would be emitted into the derived JSON schemas.
pub mod http_client_trace {
    use chrono::{DateTime, Utc};
    use serde::{Deserialize, Serialize};

    // One request with its optional response.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
    pub struct HttpClientTrace {
        pub req: RequestTrace,
        // `None` when no response was recorded.
        pub resp: Option<ResponseTrace>,
    }

    // What was sent, and when.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
    pub struct RequestTrace {
        pub sent_at: DateTime<Utc>,
        pub uri: String,
        pub method: String,
    }

    // How the request ended, and when.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
    pub struct ResponseTrace {
        pub finished_at: DateTime<Utc>,
        // NOTE(review): presumably `Ok` holds the HTTP status code and `Err` an error message - confirm.
        pub status: Result<u16, String>,
    }
}
2526
/// Root type for DB schema generation - contains all serialized DB types
// Only `schemars::JsonSchema` is derived, so this type exists to be described, not stored.
// (Added as a plain `//` comment so the generated schema stays unchanged.)
#[derive(schemars::JsonSchema)]
pub struct DbStorageSchema {
    pub execution_event: ExecutionEvent,
    pub pending_state: PendingState,
    pub join_set_response: JoinSetResponse,
    pub wasm_backtrace: WasmBacktrace,
    pub persisted_function_metadata: PersistedFunctionMetadata,
}
2536
2537#[cfg(test)]
2538mod tests {
2539    use super::HistoryEvent;
2540    use super::HistoryEventScheduleAt;
2541    use super::JoinNextTryOutcome;
2542    use super::PendingStateFinished;
2543    use super::PendingStateFinishedError;
2544    use super::PendingStateFinishedResultKind;
2545    use crate::ExecutionFailureKind;
2546    use crate::JoinSetId;
2547    use crate::SupportedFunctionReturnValue;
2548    use chrono::DateTime;
2549    use chrono::Datelike;
2550    use chrono::Utc;
2551    use insta::assert_snapshot;
2552    use rstest::rstest;
2553    use std::time::Duration;
2554    use val_json::type_wrapper::TypeWrapper;
2555    use val_json::wast_val::WastVal;
2556    use val_json::wast_val::WastValWithType;
2557
    // JSON round trip of `PendingStateFinishedResultKind`, run once per listed case
    // (the `Ok` variant and an `Err` wrapping an execution failure).
    #[rstest(expected => [
        PendingStateFinishedResultKind::Ok,
        PendingStateFinishedResultKind::Err(PendingStateFinishedError::ExecutionFailure(ExecutionFailureKind::TimedOut)),
    ])]
    #[test]
    fn serde_pending_state_finished_result_kind_should_work(
        expected: PendingStateFinishedResultKind,
    ) {
        let ser = serde_json::to_string(&expected).unwrap();
        let actual: PendingStateFinishedResultKind = serde_json::from_str(&ser).unwrap();
        assert_eq!(expected, actual);
    }
2570
    // JSON round trip of the whole `PendingStateFinished` struct for both result kinds.
    // `Deserialize` is derived for it under `cfg(test)` (or the `test` feature).
    #[rstest(result_kind => [
        PendingStateFinishedResultKind::Ok,
        PendingStateFinishedResultKind::Err(PendingStateFinishedError::ExecutionFailure(ExecutionFailureKind::TimedOut)),
    ])]
    #[test]
    fn serde_pending_state_finished_should_work(result_kind: PendingStateFinishedResultKind) {
        let expected = PendingStateFinished {
            version: 0,
            finished_at: Utc::now(),
            result_kind,
        };

        let ser = serde_json::to_string(&expected).unwrap();
        let actual: PendingStateFinished = serde_json::from_str(&ser).unwrap();
        assert_eq!(expected, actual);
    }
2587
    // Round trip of a `result<option<string>, string>` return value holding `Ok(None)`.
    // The serialized JSON is additionally pinned by an insta snapshot.
    #[test]
    fn join_set_deser_with_result_ok_option_none_should_work() {
        let expected = SupportedFunctionReturnValue::Ok(Some(WastValWithType {
            r#type: TypeWrapper::Result {
                ok: Some(Box::new(TypeWrapper::Option(Box::new(TypeWrapper::String)))),
                err: Some(Box::new(TypeWrapper::String)),
            },
            value: WastVal::Result(Ok(Some(Box::new(WastVal::Option(None))))),
        }));
        let json = serde_json::to_string(&expected).unwrap();
        assert_snapshot!(json);

        let actual: SupportedFunctionReturnValue = serde_json::from_str(&json).unwrap();

        assert_eq!(expected, actual);
    }
2604
    // `u32::MAX` seconds after the Unix epoch must still resolve; the result lands in 2106.
    #[test]
    fn as_date_time_should_work_with_duration_u32_max_secs() {
        let duration = Duration::from_secs(u64::from(u32::MAX));
        let schedule_at = HistoryEventScheduleAt::In(duration);
        let resolved = schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap();
        assert_eq!(2106, resolved.year());
    }
2612
    // Largest whole number of seconds whose millisecond count still fits into `i64`
    // (presumably mirroring chrono's `TimeDelta` limit - confirm).
    const MILLIS_PER_SEC: i64 = 1000;
    const TIMEDELTA_MAX_SECS: i64 = i64::MAX / MILLIS_PER_SEC;

    // Despite the name, the duration tested is `TIMEDELTA_MAX_SECS + 1` seconds, i.e. one
    // second past the limit above rather than past `i64::MAX` seconds.
    #[test]
    fn as_date_time_should_fail_on_duration_secs_greater_than_i64_max() {
        // Fails on duration -> timedelta conversion, but a smaller duration can fail on datetime + timedelta
        let duration = Duration::from_secs(
            u64::try_from(TIMEDELTA_MAX_SECS).expect("positive number must not fail") + 1,
        );
        let schedule_at = HistoryEventScheduleAt::In(duration);
        schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap_err();
    }
2625
2626    #[test]
2627    fn join_next_try_outcome_new_format() {
2628        let json = r#"{"type":"join_next_try","join_set_id":"n:test","outcome":"found"}"#;
2629        let event: HistoryEvent = serde_json::from_str(json).unwrap();
2630        assert_eq!(
2631            event,
2632            HistoryEvent::JoinNextTry {
2633                join_set_id: JoinSetId::new(
2634                    crate::JoinSetKind::Named,
2635                    crate::StrVariant::Static("test")
2636                )
2637                .unwrap(),
2638                outcome: JoinNextTryOutcome::Found,
2639            }
2640        );
2641
2642        let json = r#"{"type":"join_next_try","join_set_id":"n:test","outcome":"all_processed"}"#;
2643        let event: HistoryEvent = serde_json::from_str(json).unwrap();
2644        assert_eq!(
2645            event,
2646            HistoryEvent::JoinNextTry {
2647                join_set_id: JoinSetId::new(
2648                    crate::JoinSetKind::Named,
2649                    crate::StrVariant::Static("test")
2650                )
2651                .unwrap(),
2652                outcome: JoinNextTryOutcome::AllProcessed,
2653            }
2654        );
2655    }
2656
2657    #[test]
2658    fn join_next_try_outcome_serializes_new_format() {
2659        let event = HistoryEvent::JoinNextTry {
2660            join_set_id: JoinSetId::new(
2661                crate::JoinSetKind::Named,
2662                crate::StrVariant::Static("test"),
2663            )
2664            .unwrap(),
2665            outcome: JoinNextTryOutcome::AllProcessed,
2666        };
2667        let json = serde_json::to_string(&event).unwrap();
2668        assert!(
2669            json.contains(r#""outcome":"all_processed""#),
2670            "expected outcome field, got: {json}"
2671        );
2672        assert!(
2673            !json.contains("found_response"),
2674            "should not contain old field, got: {json}"
2675        );
2676    }
2677
2678    mod stub_retval_hash {
2679        use super::super::{StubRetVal, StubRetValHash};
2680        use crate::SupportedFunctionReturnValue;
2681        use val_json::type_wrapper::TypeWrapper;
2682        use val_json::wast_val::{WastVal, WastValWithType};
2683
2684        #[test]
2685        fn typed_variant_hash_is_stable() {
2686            let retval =
2687                StubRetVal::Typed(SupportedFunctionReturnValue::Ok(Some(WastValWithType {
2688                    r#type: TypeWrapper::String,
2689                    value: WastVal::String("hello".into()),
2690                })));
2691            let hash = retval.hash();
2692            // Hash should start with version byte 0x01
2693            assert_eq!(hash.to_string().chars().take(2).collect::<String>(), "01");
2694            // Hash should be 66 hex characters (33 bytes * 2)
2695            assert_eq!(hash.to_string().len(), 66);
2696        }
2697
2698        #[test]
2699        fn untyped_variant_hash_is_stable() {
2700            let retval = StubRetVal::Untyped(r#"{"ok": "hello"}"#.to_string());
2701            let hash = retval.hash();
2702            // Hash should start with version byte 0x01
2703            assert_eq!(hash.to_string().chars().take(2).collect::<String>(), "01");
2704            // Hash should be 66 hex characters (33 bytes * 2)
2705            assert_eq!(hash.to_string().len(), 66);
2706        }
2707
2708        #[test]
2709        fn different_values_produce_different_hashes() {
2710            let typed1 = StubRetVal::Typed(SupportedFunctionReturnValue::Ok(None));
2711            let typed2 = StubRetVal::Typed(SupportedFunctionReturnValue::Err(None));
2712            let untyped1 = StubRetVal::Untyped("value1".to_string());
2713            let untyped2 = StubRetVal::Untyped("value2".to_string());
2714
2715            let hashes: Vec<_> = [typed1, typed2, untyped1, untyped2]
2716                .into_iter()
2717                .map(|r| r.hash().to_string())
2718                .collect();
2719
2720            // All hashes should be unique
2721            for (i, h1) in hashes.iter().enumerate() {
2722                for h2 in hashes.iter().skip(i + 1) {
2723                    assert_ne!(h1, h2, "hashes should be different");
2724                }
2725            }
2726        }
2727
2728        #[test]
2729        fn same_values_produce_same_hashes() {
2730            let retval1 = StubRetVal::Typed(SupportedFunctionReturnValue::Ok(None));
2731            let retval2 = StubRetVal::Typed(SupportedFunctionReturnValue::Ok(None));
2732            assert_eq!(retval1.hash(), retval2.hash());
2733
2734            let untyped1 = StubRetVal::Untyped("test".to_string());
2735            let untyped2 = StubRetVal::Untyped("test".to_string());
2736            assert_eq!(untyped1.hash(), untyped2.hash());
2737        }
2738
2739        #[test]
2740        fn hash_serialization_roundtrip() {
2741            let retval = StubRetVal::Typed(SupportedFunctionReturnValue::Ok(None));
2742            let hash = retval.hash();
2743
2744            let serialized = serde_json::to_string(&hash).unwrap();
2745            let deserialized: StubRetValHash = serde_json::from_str(&serialized).unwrap();
2746
2747            assert_eq!(hash, deserialized);
2748        }
2749
2750        #[test]
2751        fn hash_display_and_fromstr_roundtrip() {
2752            let retval = StubRetVal::Untyped("test value".to_string());
2753            let hash = retval.hash();
2754
2755            let display = hash.to_string();
2756            let parsed: StubRetValHash = display.parse().unwrap();
2757
2758            assert_eq!(hash, parsed);
2759        }
2760
2761        #[test]
2762        fn typed_and_untyped_with_same_content_produce_different_hashes() {
2763            // Even if the JSON content is the same, Typed vs Untyped should hash differently
2764            let typed = StubRetVal::Typed(SupportedFunctionReturnValue::Ok(None));
2765            let json_of_typed =
2766                serde_json::to_string(&SupportedFunctionReturnValue::Ok(None)).unwrap();
2767            let untyped = StubRetVal::Untyped(json_of_typed);
2768
2769            assert_ne!(typed.hash(), untyped.hash());
2770        }
2771    }
2772}