1use crate::ComponentId;
2use crate::ComponentRetryConfig;
3use crate::ExecutionFailureKind;
4use crate::ExecutionId;
5use crate::ExecutionMetadata;
6use crate::FinishedExecutionError;
7use crate::FunctionFqn;
8use crate::JoinSetId;
9use crate::Params;
10use crate::StrVariant;
11use crate::SupportedFunctionReturnValue;
12use crate::component_id::InputContentDigest;
13use crate::prefixed_ulid::DelayId;
14use crate::prefixed_ulid::ExecutionIdDerived;
15use crate::prefixed_ulid::ExecutorId;
16use crate::prefixed_ulid::RunId;
17use assert_matches::assert_matches;
18use async_trait::async_trait;
19use chrono::TimeDelta;
20use chrono::{DateTime, Utc};
21use http_client_trace::HttpClientTrace;
22use serde::Deserialize;
23use serde::Serialize;
24use std::fmt::Debug;
25use std::fmt::Display;
26use std::panic::Location;
27use std::pin::Pin;
28use std::sync::Arc;
29use std::time::Duration;
30use tracing::debug;
31use tracing::instrument;
32use tracing_error::SpanTrace;
33
/// String identifier `pending_at`.
/// NOTE(review): presumably the stored name of the matching pending state - confirm.
pub const STATE_PENDING_AT: &str = "pending_at";
/// String identifier `blocked_by_join_set`.
/// NOTE(review): presumably the stored name of the matching pending state - confirm.
pub const STATE_BLOCKED_BY_JOIN_SET: &str = "blocked_by_join_set";
/// String identifier `locked`.
/// NOTE(review): presumably the stored name of the matching pending state - confirm.
pub const STATE_LOCKED: &str = "locked";
/// String identifier `finished`.
/// NOTE(review): presumably the stored name of the matching pending state - confirm.
pub const STATE_FINISHED: &str = "finished";
/// Equals the serde `type` tag that `HistoryEvent::JoinNext` serializes to
/// (`tag = "type"`, `rename_all = "snake_case"`).
///
/// Declared as `const` like the sibling identifiers above.
pub const HISTORY_EVENT_TYPE_JOIN_NEXT: &str = "join_next";
39
/// In-memory view of one execution: its events, the join set responses it
/// received and its derived state.
#[derive(Debug, PartialEq, Eq)]
#[cfg_attr(feature = "test", derive(Serialize))]
pub struct ExecutionLog {
    /// Identifier of the execution this log describes.
    pub execution_id: ExecutionId,
    /// Execution events. The accessors on this type expect the first entry to
    /// be `ExecutionRequest::Created` and panic otherwise.
    pub events: Vec<ExecutionEvent>,
    /// Join set responses recorded for this execution.
    pub responses: Vec<JoinSetResponseEventOuter>,
    /// NOTE(review): presumably the version the next appended event must
    /// carry - confirm against the database implementations.
    pub next_version: Version,
    /// Pending state; copied verbatim by `as_execution_with_state`.
    pub pending_state: PendingState,
    /// Component digest; copied verbatim by `as_execution_with_state`.
    pub component_digest: InputContentDigest,
}
51
52impl ExecutionLog {
53 #[must_use]
54 pub fn as_execution_with_state(&self) -> ExecutionWithState {
55 let (created_at, first_scheduled_at, ffqn) = assert_matches!(self.events.first(), Some(ExecutionEvent {
56 event: ExecutionRequest::Created { ffqn, scheduled_at: first_scheduled_at,.. },
57 created_at,
58 ..
59 }) => (created_at, first_scheduled_at, ffqn));
60
61 ExecutionWithState {
62 execution_id: self.execution_id.clone(),
63 ffqn: ffqn.clone(),
64 pending_state: self.pending_state.clone(),
65 created_at: *created_at,
66 first_scheduled_at: *first_scheduled_at,
67 component_digest: self.component_digest.clone(),
68 }
69 }
70
71 #[must_use]
74 pub fn can_be_retried_after(
75 temporary_event_count: u32,
76 max_retries: Option<u32>,
77 retry_exp_backoff: Duration,
78 ) -> Option<Duration> {
79 if temporary_event_count <= max_retries.unwrap_or(u32::MAX) {
81 let duration = retry_exp_backoff * 2_u32.saturating_pow(temporary_event_count - 1);
83 Some(duration)
84 } else {
85 None
86 }
87 }
88
89 #[must_use]
90 pub fn compute_retry_duration_when_retrying_forever(
91 temporary_event_count: u32,
92 retry_exp_backoff: Duration,
93 ) -> Duration {
94 Self::can_be_retried_after(temporary_event_count, None, retry_exp_backoff)
95 .expect("`max_retries` set to MAX must never return None")
96 }
97
    /// Returns the function name stored in the first event.
    ///
    /// # Panics
    ///
    /// Panics (via `assert_matches!`) when the log is empty or its first event
    /// is not `ExecutionRequest::Created`.
    #[must_use]
    pub fn ffqn(&self) -> &FunctionFqn {
        assert_matches!(self.events.first(), Some(ExecutionEvent {
            event: ExecutionRequest::Created { ffqn, .. },
            ..
        }) => ffqn)
    }
105
    /// Returns a clone of the `parent` field of the first event: the parent
    /// execution id and join set id, or `None` when no parent was recorded.
    ///
    /// # Panics
    ///
    /// Panics (via `assert_matches!`) when the log is empty or its first event
    /// is not `ExecutionRequest::Created`.
    #[must_use]
    pub fn parent(&self) -> Option<(ExecutionId, JoinSetId)> {
        assert_matches!(self.events.first(), Some(ExecutionEvent {
            event: ExecutionRequest::Created { parent, .. },
            ..
        }) => parent.clone())
    }
113
114 #[must_use]
115 pub fn last_event(&self) -> &ExecutionEvent {
116 self.events.last().expect("must contain at least one event")
117 }
118
119 #[must_use]
120 pub fn into_finished_result(mut self) -> Option<SupportedFunctionReturnValue> {
121 if let ExecutionEvent {
122 event: ExecutionRequest::Finished { result, .. },
123 ..
124 } = self.events.pop().expect("must contain at least one event")
125 {
126 Some(result)
127 } else {
128 None
129 }
130 }
131
132 #[cfg(feature = "test")]
133 pub fn event_history(&self) -> impl Iterator<Item = (HistoryEvent, Version)> + '_ {
134 self.events.iter().filter_map(|event| {
135 if let ExecutionRequest::HistoryEvent { event: eh, .. } = &event.event {
136 Some((eh.clone(), event.version.clone()))
137 } else {
138 None
139 }
140 })
141 }
142
143 #[cfg(feature = "test")]
144 #[must_use]
145 pub fn find_join_set_request(&self, join_set_id: &JoinSetId) -> Option<&JoinSetRequest> {
146 self.events
147 .iter()
148 .find_map(move |event| match &event.event {
149 ExecutionRequest::HistoryEvent {
150 event:
151 HistoryEvent::JoinSetRequest {
152 join_set_id: found,
153 request,
154 },
155 ..
156 } if *join_set_id == *found => Some(request),
157 _ => None,
158 })
159 }
160}
161
/// Underlying integer type of `Version`.
pub type VersionType = u32;
/// Version number attached to execution events (see `ExecutionEvent::version`).
///
/// Serialized transparently as the inner integer; `Display` and `Into` are
/// derived via `derive_more`.
#[derive(
    Debug,
    Default,
    Clone,
    PartialEq,
    Eq,
    Hash,
    derive_more::Display,
    derive_more::Into,
    serde::Serialize,
    serde::Deserialize,
)]
#[serde(transparent)]
pub struct Version(pub VersionType);
177impl Version {
178 #[must_use]
179 pub fn new(arg: VersionType) -> Version {
180 Version(arg)
181 }
182
183 #[must_use]
184 pub fn increment(&self) -> Version {
185 Version(self.0 + 1)
186 }
187}
188impl TryFrom<i64> for Version {
189 type Error = VersionParseError;
190 fn try_from(value: i64) -> Result<Self, Self::Error> {
191 VersionType::try_from(value)
192 .map(Version::new)
193 .map_err(|_| VersionParseError)
194 }
195}
196impl From<Version> for usize {
197 fn from(value: Version) -> Self {
198 usize::try_from(value.0).expect("16 bit systems are unsupported")
199 }
200}
201impl From<&Version> for usize {
202 fn from(value: &Version) -> Self {
203 usize::try_from(value.0).expect("16 bit systems are unsupported")
204 }
205}
206
/// Error returned by `Version::try_from(i64)` when the value does not fit into
/// `VersionType`.
#[derive(Debug, thiserror::Error)]
#[error("version must be u32")]
pub struct VersionParseError;
210
/// An execution request together with its creation time and version.
///
/// `Display` delegates to the inner `event`.
#[derive(
    Clone, Debug, derive_more::Display, PartialEq, Eq, serde::Serialize, serde::Deserialize,
)]
#[display("{event}")]
pub struct ExecutionEvent {
    /// Time at which the event was created.
    pub created_at: DateTime<Utc>,
    /// The request carried by this event.
    pub event: ExecutionRequest,
    /// Left out of the serialized form when `None`.
    /// NOTE(review): presumably identifies the backtrace stored for this
    /// event - confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub backtrace_id: Option<Version>,
    /// Version of this event.
    pub version: Version,
}
222
/// A join set response event together with its creation time.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct JoinSetResponseEventOuter {
    /// Time at which the response was created.
    pub created_at: DateTime<Utc>,
    /// The response and the join set it belongs to.
    pub event: JoinSetResponseEvent,
}
229
/// A response paired with the id of the join set it belongs to.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct JoinSetResponseEvent {
    /// Join set the response belongs to.
    pub join_set_id: JoinSetId,
    /// The response itself.
    pub event: JoinSetResponse,
}
235
/// Response delivered to a join set.
///
/// Serialized with an internal `type` tag in snake_case.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, derive_more::Display)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum JoinSetResponse {
    /// Outcome of a delay. Displayed as `delay finished: <id>` when `result`
    /// is `Ok` and as `delay cancelled: <id>` when it is `Err`.
    #[display("delay {}: {delay_id}", if result.is_ok() { "finished" } else { "cancelled"})]
    DelayFinished {
        /// Identifier of the delay.
        delay_id: DelayId,
        /// `Ok(())` is displayed as finished, `Err(())` as cancelled.
        result: Result<(), ()>,
    },
    /// Outcome of a child execution. Displayed as `<result>: <child id>`.
    #[display("{result}: {child_execution_id}")]
    ChildExecutionFinished {
        /// Identifier of the child execution.
        child_execution_id: ExecutionIdDerived,
        /// NOTE(review): presumably the version of the child's `Finished`
        /// event - confirm.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Version(2)))]
        finished_version: Version,
        /// Return value of the child execution.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
        result: SupportedFunctionReturnValue,
    },
}
254
/// Placeholder `Created` request: empty function name, empty params, no
/// parent, `scheduled_at` at timestamp 0, the dummy activity component id,
/// empty metadata and no scheduler.
pub const DUMMY_CREATED: ExecutionRequest = ExecutionRequest::Created {
    ffqn: FunctionFqn::new_static("", ""),
    params: Params::empty(),
    parent: None,
    scheduled_at: DateTime::from_timestamp_nanos(0),
    component_id: ComponentId::dummy_activity(),
    metadata: ExecutionMetadata::empty(),
    scheduled_by: None,
};
/// Placeholder history event: a `JoinSetCreate` for a one-off join set with an
/// empty name.
pub const DUMMY_HISTORY_EVENT: ExecutionRequest = ExecutionRequest::HistoryEvent {
    event: HistoryEvent::JoinSetCreate {
        join_set_id: JoinSetId {
            kind: crate::JoinSetKind::OneOff,
            name: StrVariant::empty(),
        },
    },
};
272
/// Request recorded as an event of an execution.
///
/// Variant names are serialized in snake_case. `variant()` returns a matching
/// static name for each variant.
#[derive(
    Clone, derive_more::Debug, derive_more::Display, PartialEq, Eq, Serialize, Deserialize,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(rename_all = "snake_case")]
pub enum ExecutionRequest {
    /// Creation of the execution. The accessors of `ExecutionLog` expect this
    /// to be the first event.
    #[display("Created({ffqn}, `{scheduled_at}`)")]
    Created {
        /// Function to be executed.
        ffqn: FunctionFqn,
        /// Parameters of the call; omitted from `Debug` output.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
        #[debug(skip)]
        params: Params,
        /// Parent execution and join set, if any.
        parent: Option<(ExecutionId, JoinSetId)>,
        /// Time the execution is scheduled at.
        scheduled_at: DateTime<Utc>,
        /// Component associated with the execution.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
        component_id: ComponentId,
        /// Metadata attached to the execution.
        #[cfg_attr(any(test, feature = "test"), arbitrary(default))]
        metadata: ExecutionMetadata,
        /// NOTE(review): presumably the execution that scheduled this one -
        /// confirm.
        scheduled_by: Option<ExecutionId>,
    },
    /// The execution was locked; see `Locked`.
    Locked(Locked),
    /// The execution was unlocked.
    #[display("Unlocked(`{backoff_expires_at}`)")]
    Unlocked {
        /// Time at which the backoff expires.
        backoff_expires_at: DateTime<Utc>,
        /// Reason for unlocking.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
        reason: StrVariant,
    },
    /// A temporary failure; counted by `is_temporary_event`.
    #[display("TemporarilyFailed(`{backoff_expires_at}`)")]
    TemporarilyFailed {
        /// Time at which the backoff expires.
        backoff_expires_at: DateTime<Utc>,
        /// Reason for the failure.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
        reason: StrVariant,
        /// Optional additional detail.
        detail: Option<String>,
        /// Optional HTTP client traces.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },
    /// A temporary timeout; counted by `is_temporary_event`.
    #[display("TemporarilyTimedOut(`{backoff_expires_at}`)")]
    TemporarilyTimedOut {
        /// Time at which the backoff expires.
        backoff_expires_at: DateTime<Utc>,
        /// Optional HTTP client traces.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },
    /// The execution finished with `result`.
    #[display("Finished")]
    Finished {
        /// Return value of the execution.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
        result: SupportedFunctionReturnValue,
        /// Optional HTTP client traces.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },

    /// A history event; see `HistoryEvent`.
    #[display("HistoryEvent({event})")]
    HistoryEvent {
        /// The wrapped history event.
        event: HistoryEvent,
    },
}
337
338impl ExecutionRequest {
339 #[must_use]
340 pub fn is_temporary_event(&self) -> bool {
341 matches!(
342 self,
343 Self::TemporarilyFailed { .. } | Self::TemporarilyTimedOut { .. }
344 )
345 }
346
347 #[must_use]
348 pub const fn variant(&self) -> &'static str {
349 match self {
350 ExecutionRequest::Created { .. } => "created",
351 ExecutionRequest::Locked(_) => "locked",
352 ExecutionRequest::Unlocked { .. } => "unlocked",
353 ExecutionRequest::TemporarilyFailed { .. } => "temporarily_failed",
354 ExecutionRequest::TemporarilyTimedOut { .. } => "temporarily_timed_out",
355 ExecutionRequest::Finished { .. } => "finished",
356 ExecutionRequest::HistoryEvent { .. } => "history_event",
357 }
358 }
359
360 #[must_use]
361 pub fn join_set_id(&self) -> Option<&JoinSetId> {
362 match self {
363 Self::Created {
364 parent: Some((_parent_id, join_set_id)),
365 ..
366 } => Some(join_set_id),
367 Self::HistoryEvent {
368 event:
369 HistoryEvent::JoinSetCreate { join_set_id, .. }
370 | HistoryEvent::JoinSetRequest { join_set_id, .. }
371 | HistoryEvent::JoinNext { join_set_id, .. },
372 } => Some(join_set_id),
373 _ => None,
374 }
375 }
376}
377
/// Payload of `ExecutionRequest::Locked`.
///
/// Displayed as ``Locked(`<lock_expires_at>`, <component_id>)``.
#[derive(
    Clone, derive_more::Debug, derive_more::Display, PartialEq, Eq, Serialize, Deserialize,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[display("Locked(`{lock_expires_at}`, {component_id})")]
pub struct Locked {
    /// Component associated with the lock.
    #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
    pub component_id: ComponentId,
    /// Executor associated with the lock.
    pub executor_id: ExecutorId,
    /// Run associated with the lock.
    pub run_id: RunId,
    /// Time at which the lock expires.
    pub lock_expires_at: DateTime<Utc>,
    /// Retry configuration recorded with the lock.
    #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentRetryConfig::ZERO))]
    pub retry_config: ComponentRetryConfig,
}
392
/// Kind of value stored by `HistoryEvent::Persist`.
///
/// Serialized with an internal `type` tag in snake_case.
#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum PersistKind {
    /// Random `u64`; the field names say `min` and inclusive `max_inclusive`.
    #[display("RandomU64({min}, {max_inclusive})")]
    RandomU64 { min: u64, max_inclusive: u64 },
    /// Random string; the field names say `min_length` and exclusive
    /// `max_length_exclusive`.
    #[display("RandomString({min_length}, {max_length_exclusive})")]
    RandomString {
        min_length: u64,
        max_length_exclusive: u64,
    },
}
405
/// Encodes `value` as 8 bytes in big-endian order (most significant byte
/// first).
#[must_use]
pub fn from_u64_to_bytes(value: u64) -> [u8; 8] {
    let big_endian: [u8; 8] = u64::to_be_bytes(value);
    big_endian
}
410
/// Decodes 8 big-endian bytes (most significant byte first) into a `u64`.
#[must_use]
pub fn from_bytes_to_u64(bytes: [u8; 8]) -> u64 {
    let decoded: u64 = <u64>::from_be_bytes(bytes);
    decoded
}
415
/// History event wrapped by `ExecutionRequest::HistoryEvent`.
///
/// Serialized with an internal `type` tag in snake_case.
#[derive(
    derive_more::Debug, Clone, PartialEq, Eq, derive_more::Display, Serialize, Deserialize,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum HistoryEvent {
    /// A persisted value of the given kind.
    #[display("Persist")]
    Persist {
        /// Raw bytes of the value; omitted from `Debug` output.
        #[debug(skip)]
        value: Vec<u8>,
        /// Kind of the persisted value.
        kind: PersistKind,
    },
    /// Creation of a join set.
    #[display("JoinSetCreate({join_set_id})")]
    JoinSetCreate { join_set_id: JoinSetId },
    /// A request submitted to a join set.
    #[display("JoinSetRequest({request})")]
    JoinSetRequest {
        /// Join set the request belongs to.
        join_set_id: JoinSetId,
        /// The request itself.
        request: JoinSetRequest,
    },
    /// NOTE(review): presumably a request for the next response of the join
    /// set - confirm.
    #[display("JoinNext({join_set_id})")]
    JoinNext {
        /// Join set being joined.
        join_set_id: JoinSetId,
        /// NOTE(review): presumably the time at which the current run
        /// expires - confirm.
        run_expires_at: DateTime<Utc>,
        /// Optional function name associated with the request.
        requested_ffqn: Option<FunctionFqn>,
        /// NOTE(review): presumably set when the join set is being closed -
        /// confirm.
        closing: bool,
    },
    /// NOTE(review): presumably recorded when `JoinNext` is requested more
    /// often than there are responses - confirm.
    #[display("JoinNextTooMany({join_set_id})")]
    JoinNextTooMany {
        /// Join set being joined.
        join_set_id: JoinSetId,
        /// Optional function name associated with the request.
        requested_ffqn: Option<FunctionFqn>,
    },
    /// Scheduling of another execution.
    #[display("Schedule({execution_id}, {schedule_at})")]
    Schedule {
        /// Execution being scheduled.
        execution_id: ExecutionId,
        /// When the execution is scheduled.
        schedule_at: HistoryEventScheduleAt,
    },
    /// Stubbing of a derived execution with a return value.
    #[display("Stub({target_execution_id})")]
    Stub {
        /// Execution being stubbed.
        target_execution_id: ExecutionIdDerived,
        /// Return value supplied for the stubbed execution.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
        result: SupportedFunctionReturnValue,
        /// NOTE(review): presumably whether writing the stub succeeded -
        /// confirm.
        persist_result: Result<(), ()>,
    },
}
476
/// Point in time expressed as "now", an absolute time, or an offset.
///
/// `as_date_time` resolves it against a supplied "now". Variant names are
/// serialized in snake_case.
#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(rename_all = "snake_case")]
pub enum HistoryEventScheduleAt {
    /// Resolves to the supplied "now".
    Now,
    /// Resolves to the contained absolute time.
    #[display("At(`{_0}`)")]
    At(DateTime<Utc>),
    /// Resolves to "now" plus the contained duration.
    #[display("In({_0:?})")]
    In(Duration),
}
487
/// Error returned by `HistoryEventScheduleAt::as_date_time`.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum ScheduleAtConversionError {
    /// The duration cannot be converted to a `TimeDelta`, or adding it to
    /// "now" overflows.
    #[error("source duration value is out of range")]
    OutOfRangeError,
}
493
494impl HistoryEventScheduleAt {
495 pub fn as_date_time(
496 &self,
497 now: DateTime<Utc>,
498 ) -> Result<DateTime<Utc>, ScheduleAtConversionError> {
499 match self {
500 Self::Now => Ok(now),
501 Self::At(date_time) => Ok(*date_time),
502 Self::In(duration) => {
503 let time_delta = TimeDelta::from_std(*duration)
504 .map_err(|_| ScheduleAtConversionError::OutOfRangeError)?;
505 now.checked_add_signed(time_delta)
506 .ok_or(ScheduleAtConversionError::OutOfRangeError)
507 }
508 }
509 }
510}
511
/// Request submitted to a join set; carried by `HistoryEvent::JoinSetRequest`.
///
/// Serialized with an internal `type` tag in snake_case.
#[derive(Clone, Debug, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum JoinSetRequest {
    /// Request for a delay.
    #[display("DelayRequest({delay_id}, expires_at: `{expires_at}`, schedule_at: `{schedule_at}`)")]
    DelayRequest {
        /// Identifier of the delay.
        delay_id: DelayId,
        /// Time at which the delay expires.
        expires_at: DateTime<Utc>,
        /// NOTE(review): presumably the schedule the expiry was derived
        /// from - confirm.
        schedule_at: HistoryEventScheduleAt,
    },
    /// Request for a child execution.
    #[display("ChildExecutionRequest({child_execution_id}, {target_ffqn}, params: {params})")]
    ChildExecutionRequest {
        /// Identifier of the child execution.
        child_execution_id: ExecutionIdDerived,
        /// Function the child executes.
        target_ffqn: FunctionFqn,
        /// Parameters passed to the child.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
        params: Params,
    },
}
532
/// Database error shared by read and write operations.
///
/// Equality comparison skips the `context` and `source` fields.
#[derive(Debug, Clone, thiserror::Error, derive_more::PartialEq, derive_more::Eq)]
pub enum DbErrorGeneric {
    /// Error without a more specific category.
    #[error("database error: {reason}")]
    Uncategorized {
        /// Reason shown in the error message.
        reason: StrVariant,
        /// Captured span trace; skipped when comparing.
        #[eq(skip)]
        #[partial_eq(skip)]
        context: SpanTrace,
        /// Optional underlying error; skipped when comparing.
        #[eq(skip)]
        #[partial_eq(skip)]
        #[source]
        source: Option<Arc<dyn std::error::Error + Send + Sync>>,
        /// Source code location recorded with the error.
        loc: &'static Location<'static>,
    },
    /// The database was closed.
    #[error("database was closed")]
    Close,
}
551
/// Write errors wrapped by `DbErrorWrite::NonRetriable`.
///
/// Equality comparison skips the `context` and `source` fields.
#[derive(thiserror::Error, Clone, Debug, derive_more::PartialEq, derive_more::Eq)]
pub enum DbErrorWriteNonRetriable {
    /// Validation failed; the payload is the message.
    #[error("validation failed: {0}")]
    ValidationFailed(StrVariant),
    /// A conflict was detected.
    #[error("conflict")]
    Conflict,
    /// The stored data is in an unexpected state.
    #[error("illegal state: {reason}")]
    IllegalState {
        /// Reason shown in the error message.
        reason: StrVariant,
        /// Captured span trace; skipped when comparing.
        #[eq(skip)]
        #[partial_eq(skip)]
        context: SpanTrace,
        /// Optional underlying error; skipped when comparing.
        #[eq(skip)]
        #[partial_eq(skip)]
        #[source]
        source: Option<Arc<dyn std::error::Error + Send + Sync>>,
        /// Source code location recorded with the error.
        loc: &'static Location<'static>,
    },
    /// The requested version differs from the expected one.
    #[error("version conflict: expected: {expected}, got: {requested}")]
    VersionConflict {
        expected: Version,
        requested: Version,
    },
}
576
/// Error returned by write operations.
#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
pub enum DbErrorWrite {
    /// The row to write to was not found.
    #[error("cannot write - row not found")]
    NotFound,
    /// Wraps a `DbErrorWriteNonRetriable`; convertible via `From`.
    #[error("non-retriable error: {0}")]
    NonRetriable(#[from] DbErrorWriteNonRetriable),
    /// Wraps a `DbErrorGeneric` transparently; convertible via `From`.
    #[error(transparent)]
    Generic(#[from] DbErrorGeneric),
}
587
/// Error returned by read operations.
#[derive(Debug, Clone, thiserror::Error, PartialEq)]
pub enum DbErrorRead {
    /// The row to read was not found.
    #[error("cannot read - row not found")]
    NotFound,
    /// Wraps a `DbErrorGeneric` transparently; convertible via `From`.
    #[error(transparent)]
    Generic(#[from] DbErrorGeneric),
}
596
/// Error returned by read operations that accept a timeout future.
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum DbErrorReadWithTimeout {
    /// The timeout future completed; the payload is its outcome.
    #[error("timeout")]
    Timeout(TimeoutOutcome),
    /// Wraps a `DbErrorRead` transparently; convertible via `From`.
    #[error(transparent)]
    DbErrorRead(#[from] DbErrorRead),
}
604
/// Version returned by `append` and `create`.
/// NOTE(review): presumably the next expected version - confirm.
pub type AppendResponse = Version;
/// Tuple of execution id, version, params and an optional timestamp.
/// NOTE(review): the meaning of the timestamp is not visible here - confirm.
pub type PendingExecution = (ExecutionId, Version, Params, Option<DateTime<Utc>>);
609
/// Execution returned by the locking methods of `DbExecutor`.
#[derive(Debug, Clone)]
pub struct LockedExecution {
    /// Identifier of the locked execution.
    pub execution_id: ExecutionId,
    /// NOTE(review): presumably the version the next appended event must
    /// carry - confirm.
    pub next_version: Version,
    /// Metadata of the execution.
    pub metadata: ExecutionMetadata,
    /// The `Locked` payload associated with this lock.
    pub locked_event: Locked,
    /// Function being executed.
    pub ffqn: FunctionFqn,
    /// Parameters of the call.
    pub params: Params,
    /// History events paired with their versions.
    pub event_history: Vec<(HistoryEvent, Version)>,
    /// Join set responses of the execution.
    pub responses: Vec<JoinSetResponseEventOuter>,
    /// Parent execution and join set, if any.
    pub parent: Option<(ExecutionId, JoinSetId)>,
    /// NOTE(review): presumably the number of temporary failures and
    /// timeouts so far - confirm.
    pub intermittent_event_count: u32,
}
623
/// Executions returned by the `lock_pending_*` methods of `DbExecutor`.
pub type LockPendingResponse = Vec<LockedExecution>;
/// Version returned by the batch append methods.
/// NOTE(review): presumably the next expected version - confirm.
pub type AppendBatchResponse = Version;
626
/// Request to append one event to an execution.
///
/// `Display` delegates to the inner `event`.
#[derive(Debug, Clone, derive_more::Display, Serialize, Deserialize)]
#[display("{event}")]
pub struct AppendRequest {
    /// Creation time of the event.
    pub created_at: DateTime<Utc>,
    /// The event to append.
    pub event: ExecutionRequest,
}
633
/// Request to create a new execution.
///
/// Converting it into `ExecutionRequest` yields the `Created` variant; the
/// conversion drops `created_at` and `execution_id` and carries the remaining
/// fields over unchanged.
#[derive(Debug, Clone)]
pub struct CreateRequest {
    /// Creation time of the request.
    pub created_at: DateTime<Utc>,
    /// Identifier of the execution to create.
    pub execution_id: ExecutionId,
    /// Function to be executed.
    pub ffqn: FunctionFqn,
    /// Parameters of the call.
    pub params: Params,
    /// Parent execution and join set, if any.
    pub parent: Option<(ExecutionId, JoinSetId)>,
    /// Time the execution is scheduled at.
    pub scheduled_at: DateTime<Utc>,
    /// Component associated with the execution.
    pub component_id: ComponentId,
    /// Metadata attached to the execution.
    pub metadata: ExecutionMetadata,
    /// NOTE(review): presumably the execution that scheduled this one -
    /// confirm.
    pub scheduled_by: Option<ExecutionId>,
}
646
647impl From<CreateRequest> for ExecutionRequest {
648 fn from(value: CreateRequest) -> Self {
649 Self::Created {
650 ffqn: value.ffqn,
651 params: value.params,
652 parent: value.parent,
653 scheduled_at: value.scheduled_at,
654 component_id: value.component_id,
655 metadata: value.metadata,
656 scheduled_by: value.scheduled_by,
657 }
658 }
659}
660
/// Source of database connections, one method per connection trait.
#[async_trait]
pub trait DbPool: Send + Sync {
    /// Returns a connection exposing the `DbExecutor` API.
    async fn db_exec_conn(&self) -> Result<Box<dyn DbExecutor>, DbErrorGeneric>;

    /// Returns a connection exposing the `DbConnection` API.
    async fn connection(&self) -> Result<Box<dyn DbConnection>, DbErrorGeneric>;

    /// Returns a connection exposing the `DbExternalApi` API.
    async fn external_api_conn(&self) -> Result<Box<dyn DbExternalApi>, DbErrorGeneric>;

    /// Returns a connection exposing the `DbConnectionTest` API; only
    /// available with the `test` feature.
    #[cfg(feature = "test")]
    async fn connection_test(&self) -> Result<Box<dyn DbConnectionTest>, DbErrorGeneric>;
}
672
/// A pool that can be closed.
#[async_trait]
pub trait DbPoolCloseable {
    /// Closes the pool.
    /// NOTE(review): what happens to outstanding connections is up to the
    /// implementation - confirm.
    async fn close(&self);
}
677
/// Batch of events to append to one execution; first argument of
/// `DbExecutor::append_batch_respond_to_parent`.
#[derive(Clone, Debug)]
pub struct AppendEventsToExecution {
    /// Execution receiving the events.
    pub execution_id: ExecutionId,
    /// NOTE(review): presumably the version of the first appended event -
    /// confirm.
    pub version: Version,
    /// Events to append.
    pub batch: Vec<AppendRequest>,
}
684
/// Child-finished response to append to a parent execution; second argument
/// of `DbExecutor::append_batch_respond_to_parent`.
#[derive(Clone, Debug)]
pub struct AppendResponseToExecution {
    /// Parent execution receiving the response.
    pub parent_execution_id: ExecutionId,
    /// Creation time of the response.
    pub created_at: DateTime<Utc>,
    /// Join set of the parent that the child belongs to.
    pub join_set_id: JoinSetId,
    /// Child execution that finished.
    pub child_execution_id: ExecutionIdDerived,
    /// NOTE(review): presumably the version of the child's `Finished`
    /// event - confirm.
    pub finished_version: Version,
    /// Return value of the child execution.
    pub result: SupportedFunctionReturnValue,
}
694
695#[async_trait]
696pub trait DbExecutor: Send + Sync {
697 #[expect(clippy::too_many_arguments)]
698 async fn lock_pending_by_ffqns(
699 &self,
700 batch_size: u32,
701 pending_at_or_sooner: DateTime<Utc>,
702 ffqns: Arc<[FunctionFqn]>,
703 created_at: DateTime<Utc>,
704 component_id: ComponentId,
705 executor_id: ExecutorId,
706 lock_expires_at: DateTime<Utc>,
707 run_id: RunId,
708 retry_config: ComponentRetryConfig,
709 ) -> Result<LockPendingResponse, DbErrorGeneric>;
710
711 #[expect(clippy::too_many_arguments)]
712 async fn lock_pending_by_component_id(
713 &self,
714 batch_size: u32,
715 pending_at_or_sooner: DateTime<Utc>,
716 component_id: &ComponentId,
717 created_at: DateTime<Utc>,
718 executor_id: ExecutorId,
719 lock_expires_at: DateTime<Utc>,
720 run_id: RunId,
721 retry_config: ComponentRetryConfig,
722 ) -> Result<LockPendingResponse, DbErrorGeneric>;
723
724 #[expect(clippy::too_many_arguments)]
726 async fn lock_one(
727 &self,
728 created_at: DateTime<Utc>,
729 component_id: ComponentId,
730 execution_id: &ExecutionId,
731 run_id: RunId,
732 version: Version,
733 executor_id: ExecutorId,
734 lock_expires_at: DateTime<Utc>,
735 retry_config: ComponentRetryConfig,
736 ) -> Result<LockedExecution, DbErrorWrite>;
737
738 async fn append(
741 &self,
742 execution_id: ExecutionId,
743 version: Version,
744 req: AppendRequest,
745 ) -> Result<AppendResponse, DbErrorWrite>;
746
747 async fn append_batch_respond_to_parent(
750 &self,
751 events: AppendEventsToExecution,
752 response: AppendResponseToExecution,
753 current_time: DateTime<Utc>, ) -> Result<AppendBatchResponse, DbErrorWrite>;
755
756 async fn wait_for_pending_by_ffqn(
761 &self,
762 pending_at_or_sooner: DateTime<Utc>,
763 ffqns: Arc<[FunctionFqn]>,
764 timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
765 );
766
767 async fn wait_for_pending_by_component_id(
772 &self,
773 pending_at_or_sooner: DateTime<Utc>,
774 component_id: &ComponentId,
775 timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
776 );
777
778 async fn cancel_activity_with_retries(
779 &self,
780 execution_id: &ExecutionId,
781 cancelled_at: DateTime<Utc>,
782 ) -> Result<CancelOutcome, DbErrorWrite> {
783 let mut retries = 5;
784 loop {
785 let res = self.cancel_activity(execution_id, cancelled_at).await;
786 if res.is_ok() || retries == 0 {
787 return res;
788 }
789 retries -= 1;
790 }
791 }
792
793 async fn get_last_execution_event(
795 &self,
796 execution_id: &ExecutionId,
797 ) -> Result<ExecutionEvent, DbErrorRead>;
798
799 async fn cancel_activity(
800 &self,
801 execution_id: &ExecutionId,
802 cancelled_at: DateTime<Utc>,
803 ) -> Result<CancelOutcome, DbErrorWrite> {
804 debug!("Determining cancellation state of {execution_id}");
805
806 let last_event = self
807 .get_last_execution_event(execution_id)
808 .await
809 .map_err(DbErrorWrite::from)?;
810 if let ExecutionRequest::Finished {
811 result:
812 SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError {
813 kind: ExecutionFailureKind::Cancelled,
814 ..
815 }),
816 ..
817 } = last_event.event
818 {
819 return Ok(CancelOutcome::Cancelled);
820 } else if matches!(last_event.event, ExecutionRequest::Finished { .. }) {
821 debug!("Not cancelling, {execution_id} is already finished");
822 return Ok(CancelOutcome::AlreadyFinished);
823 }
824 let finished_version = last_event.version.increment();
825 let child_result = SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError {
826 reason: None,
827 kind: ExecutionFailureKind::Cancelled,
828 detail: None,
829 });
830 let cancel_request = AppendRequest {
831 created_at: cancelled_at,
832 event: ExecutionRequest::Finished {
833 result: child_result.clone(),
834 http_client_traces: None,
835 },
836 };
837 debug!("Cancelling activity {execution_id} at {finished_version}");
838 if let ExecutionId::Derived(execution_id) = execution_id {
839 let (parent_execution_id, join_set_id) = execution_id.split_to_parts();
840 let child_execution_id = ExecutionId::Derived(execution_id.clone());
841 self.append_batch_respond_to_parent(
842 AppendEventsToExecution {
843 execution_id: child_execution_id,
844 version: finished_version.clone(),
845 batch: vec![cancel_request],
846 },
847 AppendResponseToExecution {
848 parent_execution_id,
849 created_at: cancelled_at,
850 join_set_id: join_set_id.clone(),
851 child_execution_id: execution_id.clone(),
852 finished_version,
853 result: child_result,
854 },
855 cancelled_at,
856 )
857 .await?;
858 } else {
859 self.append(execution_id.clone(), finished_version, cancel_request)
860 .await?;
861 }
862 debug!("Cancelled {execution_id}");
863 Ok(CancelOutcome::Cancelled)
864 }
865}
866
/// Outcome of `DbConnection::append_delay_response`.
///
/// `cancel_delay` maps `Success` and `AlreadyCancelled` to
/// `CancelOutcome::Cancelled` and `AlreadyFinished` to
/// `CancelOutcome::AlreadyFinished`.
///
/// Derives the same basic traits as `CancelOutcome` so values can be logged
/// and compared.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AppendDelayResponseOutcome {
    /// The delay response was appended.
    Success,
    /// The delay had already finished.
    AlreadyFinished,
    /// The delay had already been cancelled.
    AlreadyCancelled,
}
872
/// Filter passed to `DbExternalApi::list_executions`.
///
/// `Default` yields no prefixes and both flags `false`.
#[derive(Debug, Clone, Default)]
pub struct ListExecutionsFilter {
    /// Optional prefix of the function name.
    pub ffqn_prefix: Option<String>,
    /// NOTE(review): presumably includes derived (child) executions when
    /// `true` - confirm.
    pub show_derived: bool,
    /// NOTE(review): presumably excludes finished executions when `true` -
    /// confirm.
    pub hide_finished: bool,
    /// Optional prefix of the execution id.
    pub execution_id_prefix: Option<String>,
}
880
/// Read-mostly operations layered on top of `DbConnection`.
#[async_trait]
pub trait DbExternalApi: DbConnection {
    /// Returns the backtrace of `execution_id` selected by `filter`.
    async fn get_backtrace(
        &self,
        execution_id: &ExecutionId,
        filter: BacktraceFilter,
    ) -> Result<BacktraceInfo, DbErrorRead>;

    /// Lists executions matching `filter`, paged by `pagination`.
    async fn list_executions(
        &self,
        filter: ListExecutionsFilter,
        pagination: ExecutionListPagination,
    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric>;

    /// Lists events of `execution_id`.
    /// NOTE(review): presumably starting at version `since`, returning at
    /// most `max_length` events - confirm.
    async fn list_execution_events(
        &self,
        execution_id: &ExecutionId,
        since: &Version,
        max_length: VersionType,
        include_backtrace_id: bool,
    ) -> Result<Vec<ExecutionEvent>, DbErrorRead>;

    /// Lists join set responses of `execution_id`, paged by `pagination`.
    async fn list_responses(
        &self,
        execution_id: &ExecutionId,
        pagination: Pagination<u32>,
    ) -> Result<Vec<ResponseWithCursor>, DbErrorRead>;

    /// Returns the state, events and responses of `execution_id` in one call.
    /// NOTE(review): the `req_*` arguments presumably mirror
    /// `list_execution_events` and `resp_pagination` mirrors
    /// `list_responses` - confirm.
    async fn list_execution_events_responses(
        &self,
        execution_id: &ExecutionId,
        req_since: &Version,
        req_max_length: VersionType,
        req_include_backtrace_id: bool,
        resp_pagination: Pagination<u32>,
    ) -> Result<ExecutionWithStateRequestsResponses, DbErrorRead>;

    /// NOTE(review): presumably switches the component digest of
    /// `execution_id` from `old` to `new` - confirm.
    async fn upgrade_execution_component(
        &self,
        execution_id: &ExecutionId,
        old: &InputContentDigest,
        new: &InputContentDigest,
    ) -> Result<(), DbErrorWrite>;
}
929
/// Result of `DbExternalApi::list_execution_events_responses`.
#[derive(Debug, Clone)]
pub struct ExecutionWithStateRequestsResponses {
    /// Summary of the execution and its state.
    pub execution_with_state: ExecutionWithState,
    /// Events of the execution.
    pub events: Vec<ExecutionEvent>,
    /// Responses of the execution, each with its cursor.
    pub responses: Vec<ResponseWithCursor>,
}
936
937#[async_trait]
938pub trait DbConnection: DbExecutor {
939 async fn append_delay_response(
940 &self,
941 created_at: DateTime<Utc>,
942 execution_id: ExecutionId,
943 join_set_id: JoinSetId,
944 delay_id: DelayId,
945 outcome: Result<(), ()>, ) -> Result<AppendDelayResponseOutcome, DbErrorWrite>;
947
948 async fn append_batch(
951 &self,
952 current_time: DateTime<Utc>, batch: Vec<AppendRequest>,
954 execution_id: ExecutionId,
955 version: Version,
956 ) -> Result<AppendBatchResponse, DbErrorWrite>;
957
958 async fn append_batch_create_new_execution(
961 &self,
962 current_time: DateTime<Utc>, batch: Vec<AppendRequest>, execution_id: ExecutionId,
965 version: Version,
966 child_req: Vec<CreateRequest>,
967 ) -> Result<AppendBatchResponse, DbErrorWrite>;
968
969 async fn get_execution_event(
971 &self,
972 execution_id: &ExecutionId,
973 version: &Version,
974 ) -> Result<ExecutionEvent, DbErrorRead>;
975
976 #[instrument(skip(self))]
977 async fn get_create_request(
978 &self,
979 execution_id: &ExecutionId,
980 ) -> Result<CreateRequest, DbErrorRead> {
981 let execution_event = self
982 .get_execution_event(execution_id, &Version::new(0))
983 .await?;
984 if let ExecutionRequest::Created {
985 ffqn,
986 params,
987 parent,
988 scheduled_at,
989 component_id,
990 metadata,
991 scheduled_by,
992 } = execution_event.event
993 {
994 Ok(CreateRequest {
995 created_at: execution_event.created_at,
996 execution_id: execution_id.clone(),
997 ffqn,
998 params,
999 parent,
1000 scheduled_at,
1001 component_id,
1002 metadata,
1003 scheduled_by,
1004 })
1005 } else {
1006 Err(DbErrorRead::Generic(DbErrorGeneric::Uncategorized {
1007 reason: "execution log must start with creation".into(),
1008 context: SpanTrace::capture(),
1009 source: None,
1010 loc: Location::caller(),
1011 }))
1012 }
1013 }
1014
1015 async fn get_pending_state(
1016 &self,
1017 execution_id: &ExecutionId,
1018 ) -> Result<ExecutionWithState, DbErrorRead>;
1019
1020 async fn get_expired_timers(
1022 &self,
1023 at: DateTime<Utc>,
1024 ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric>;
1025
1026 async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite>;
1028
1029 async fn subscribe_to_next_responses(
1036 &self,
1037 execution_id: &ExecutionId,
1038 start_idx: u32,
1039 timeout_fut: Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>,
1040 ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorReadWithTimeout>;
1041
1042 async fn wait_for_finished_result(
1047 &self,
1048 execution_id: &ExecutionId,
1049 timeout_fut: Option<Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>>,
1050 ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout>;
1052
1053 async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite>;
1054
1055 async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite>;
1056}
1057
/// Output of the timeout futures passed to the waiting methods of
/// `DbConnection`; carried by `DbErrorReadWithTimeout::Timeout`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimeoutOutcome {
    /// NOTE(review): presumably the deadline elapsed - confirm.
    Timeout,
    /// NOTE(review): presumably the wait was cancelled - confirm.
    Cancel,
}
1063
/// Extra operations available only with the `test` feature.
#[cfg(feature = "test")]
#[async_trait]
pub trait DbConnectionTest: DbConnection {
    /// Appends `response_event` to `execution_id`.
    async fn append_response(
        &self,
        created_at: DateTime<Utc>,
        execution_id: ExecutionId,
        response_event: JoinSetResponseEvent,
    ) -> Result<(), DbErrorWrite>;

    /// Returns the `ExecutionLog` of `execution_id`.
    async fn get(&self, execution_id: &ExecutionId) -> Result<ExecutionLog, DbErrorRead>;
}
1077
/// Outcome of a cancellation request (`DbExecutor::cancel_activity`,
/// `cancel_delay`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CancelOutcome {
    /// The target is cancelled, either by this call or already before it.
    Cancelled,
    /// The target had already finished without being cancelled.
    AlreadyFinished,
}
1083
1084#[instrument(skip(db_connection))]
1085pub async fn stub_execution(
1086 db_connection: &dyn DbConnection,
1087 execution_id: ExecutionIdDerived,
1088 parent_execution_id: ExecutionId,
1089 join_set_id: JoinSetId,
1090 created_at: DateTime<Utc>,
1091 return_value: SupportedFunctionReturnValue,
1092) -> Result<(), DbErrorWrite> {
1093 let stub_finished_version = Version::new(1); let write_attempt = {
1096 let finished_req = AppendRequest {
1097 created_at,
1098 event: ExecutionRequest::Finished {
1099 result: return_value.clone(),
1100 http_client_traces: None,
1101 },
1102 };
1103 db_connection
1104 .append_batch_respond_to_parent(
1105 AppendEventsToExecution {
1106 execution_id: ExecutionId::Derived(execution_id.clone()),
1107 version: stub_finished_version.clone(),
1108 batch: vec![finished_req],
1109 },
1110 AppendResponseToExecution {
1111 parent_execution_id,
1112 created_at,
1113 join_set_id,
1114 child_execution_id: execution_id.clone(),
1115 finished_version: stub_finished_version.clone(),
1116 result: return_value.clone(),
1117 },
1118 created_at,
1119 )
1120 .await
1121 };
1122 if let Err(write_attempt) = write_attempt {
1123 debug!("Stub write attempt failed - {write_attempt:?}");
1125
1126 let found = db_connection
1127 .get_execution_event(&ExecutionId::Derived(execution_id), &stub_finished_version)
1128 .await?; match found.event {
1130 ExecutionRequest::Finished {
1131 result: found_result,
1132 ..
1133 } if return_value == found_result => {
1134 Ok(())
1136 }
1137 ExecutionRequest::Finished { .. } => Err(DbErrorWrite::NonRetriable(
1138 DbErrorWriteNonRetriable::Conflict,
1139 )),
1140 _other => Err(DbErrorWrite::NonRetriable(
1141 DbErrorWriteNonRetriable::IllegalState {
1142 reason: "unexpected execution event at stubbed execution".into(),
1143 context: SpanTrace::capture(),
1144 source: None,
1145 loc: Location::caller(),
1146 },
1147 )),
1148 }
1149 } else {
1150 Ok(())
1151 }
1152}
1153
1154pub async fn cancel_delay(
1155 db_connection: &dyn DbConnection,
1156 delay_id: DelayId,
1157 created_at: DateTime<Utc>,
1158) -> Result<CancelOutcome, DbErrorWrite> {
1159 let (parent_execution_id, join_set_id) = delay_id.split_to_parts();
1160 db_connection
1161 .append_delay_response(
1162 created_at,
1163 parent_execution_id,
1164 join_set_id,
1165 delay_id,
1166 Err(()), )
1168 .await
1169 .map(|ok| match ok {
1170 AppendDelayResponseOutcome::Success | AppendDelayResponseOutcome::AlreadyCancelled => {
1171 CancelOutcome::Cancelled
1172 }
1173 AppendDelayResponseOutcome::AlreadyFinished => CancelOutcome::AlreadyFinished,
1174 })
1175}
1176
/// Selects which stored backtrace of an execution to look up.
///
/// NOTE(review): the exact lookup semantics are implemented by the consumer of this filter,
/// which is outside this section - confirm there.
#[derive(Clone, Debug)]
pub enum BacktraceFilter {
    /// Presumably the earliest stored backtrace - confirm.
    First,
    /// Presumably the most recent stored backtrace - confirm.
    Last,
    /// The backtrace associated with the given version.
    Specific(Version),
}
1183
/// A WASM backtrace together with the execution, component and version range it belongs to.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BacktraceInfo {
    /// Execution the backtrace was captured for.
    pub execution_id: ExecutionId,
    /// Component associated with the backtrace (presumably the one that was running - confirm).
    pub component_id: ComponentId,
    /// Lower bound of the covered version range; inclusive, per the field name.
    pub version_min_including: Version,
    /// Upper bound of the covered version range; exclusive, per the field name.
    pub version_max_excluding: Version,
    /// The captured frames.
    pub wasm_backtrace: WasmBacktrace,
}
1192
/// Serializable copy of a `wasmtime::WasmBacktrace`.
///
/// Built with `WasmBacktrace::maybe_from`, which yields `None` for a backtrace without frames.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct WasmBacktrace {
    /// Frames in the order reported by `wasmtime`.
    pub frames: Vec<FrameInfo>,
}
1197
/// Serializable copy of a single `wasmtime::FrameInfo`.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct FrameInfo {
    /// Name of the module the frame belongs to; `<unknown>` when the module has no name.
    pub module: String,
    /// Demangled function name, or an index-based name when the function has none.
    pub func_name: String,
    /// Symbols reported by `wasmtime` for this frame.
    pub symbols: Vec<FrameSymbol>,
}
1204
/// Serializable copy of a `wasmtime::FrameSymbol`; every field is optional because `wasmtime`
/// reports each of them as an `Option`.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct FrameSymbol {
    /// Demangled name of the function the symbol refers to.
    pub func_name: Option<String>,
    /// Source file of the symbol.
    pub file: Option<String>,
    /// Line within `file`.
    pub line: Option<u32>,
    /// Column within `line`.
    pub col: Option<u32>,
}
1212
1213mod wasm_backtrace {
1214 use super::{FrameInfo, FrameSymbol, WasmBacktrace};
1215
1216 impl WasmBacktrace {
1217 pub fn maybe_from(backtrace: &wasmtime::WasmBacktrace) -> Option<Self> {
1218 if backtrace.frames().is_empty() {
1219 None
1220 } else {
1221 Some(Self {
1222 frames: backtrace.frames().iter().map(FrameInfo::from).collect(),
1223 })
1224 }
1225 }
1226 }
1227
1228 impl From<&wasmtime::FrameInfo> for FrameInfo {
1229 fn from(frame: &wasmtime::FrameInfo) -> Self {
1230 let module_name = frame.module().name().unwrap_or("<unknown>").to_string();
1231 let mut func_name = String::new();
1232 wasmtime_environ::demangle_function_name_or_index(
1233 &mut func_name,
1234 frame.func_name(),
1235 frame.func_index() as usize,
1236 )
1237 .expect("writing to string must succeed");
1238 Self {
1239 module: module_name,
1240 func_name,
1241 symbols: frame
1242 .symbols()
1243 .iter()
1244 .map(std::convert::Into::into)
1245 .collect(),
1246 }
1247 }
1248 }
1249
1250 impl From<&wasmtime::FrameSymbol> for FrameSymbol {
1251 fn from(symbol: &wasmtime::FrameSymbol) -> Self {
1252 let func_name = symbol.name().map(|name| {
1253 let mut writer = String::new();
1254 wasmtime_environ::demangle_function_name(&mut writer, name)
1255 .expect("writing to string must succeed");
1256 writer
1257 });
1258
1259 Self {
1260 func_name,
1261 file: symbol.file().map(ToString::to_string),
1262 line: symbol.line(),
1263 col: symbol.column(),
1264 }
1265 }
1266 }
1267}
1268
/// Integer type of the cursor carried by [`ResponseWithCursor`].
pub type ResponseCursorType = u32;
1270
/// A join set response paired with a cursor value.
///
/// NOTE(review): the cursor presumably identifies the response's position for pagination -
/// confirm against the code that produces it.
#[derive(Debug, Clone, Serialize)]
pub struct ResponseWithCursor {
    /// The response event.
    pub event: JoinSetResponseEventOuter,
    /// Cursor value associated with `event`.
    pub cursor: ResponseCursorType,
}
1276
/// Summary of an execution: identity, function, current pending state and key timestamps.
///
/// Produced by `ExecutionLog::as_execution_with_state`. Its `Display` output is
/// `{execution_id} {pending_state} {component_digest}`.
#[derive(Debug, Clone, Serialize, derive_more::Display)]
#[display("{execution_id} {pending_state} {component_digest}")]
pub struct ExecutionWithState {
    /// Identifier of the execution.
    pub execution_id: ExecutionId,
    /// Function the execution runs, taken from its `Created` event.
    pub ffqn: FunctionFqn,
    /// Current pending state.
    pub pending_state: PendingState,
    /// Creation time of the `Created` event.
    pub created_at: DateTime<Utc>,
    /// The `scheduled_at` value of the `Created` event.
    pub first_scheduled_at: DateTime<Utc>,
    /// Content digest of the component associated with the execution.
    pub component_digest: InputContentDigest,
}
1287
/// Pagination request for listing executions, parameterized by the kind of cursor used.
///
/// In both variants the cursor is optional; the default uses no cursor.
#[derive(Debug, Clone)]
pub enum ExecutionListPagination {
    /// Paginate with a timestamp cursor (presumably compared against the creation time -
    /// confirm with the query implementation).
    CreatedBy(Pagination<Option<DateTime<Utc>>>),
    /// Paginate with an execution id cursor.
    ExecutionId(Pagination<Option<ExecutionId>>),
}
1293impl Default for ExecutionListPagination {
1294 fn default() -> ExecutionListPagination {
1295 ExecutionListPagination::CreatedBy(Pagination::OlderThan {
1296 length: 20,
1297 cursor: None,
1298 including_cursor: false, })
1300 }
1301}
1302impl ExecutionListPagination {
1303 #[must_use]
1304 pub fn length(&self) -> u16 {
1305 match self {
1306 ExecutionListPagination::CreatedBy(pagination) => pagination.length(),
1307 ExecutionListPagination::ExecutionId(pagination) => pagination.length(),
1308 }
1309 }
1310}
1311
/// Cursor-based page request: a direction relative to `cursor`, a page length and whether the
/// cursor item itself belongs to the page.
#[derive(Debug, Clone, Copy)]
pub enum Pagination<T> {
    /// Items newer than the cursor.
    NewerThan {
        /// Requested number of items.
        length: u16,
        /// Position the page is relative to.
        cursor: T,
        /// Whether the item at `cursor` is part of the page.
        including_cursor: bool,
    },
    /// Items older than the cursor.
    OlderThan {
        /// Requested number of items.
        length: u16,
        /// Position the page is relative to.
        cursor: T,
        /// Whether the item at `cursor` is part of the page.
        including_cursor: bool,
    },
}
impl<T> Pagination<T> {
    /// Returns the requested page length.
    pub fn length(&self) -> u16 {
        let (Self::NewerThan { length, .. } | Self::OlderThan { length, .. }) = self;
        *length
    }

    /// Returns the comparison operator matching the direction and cursor inclusion:
    /// `>` / `>=` for `NewerThan`, `<` / `<=` for `OlderThan`.
    pub fn rel(&self) -> &'static str {
        let (Self::NewerThan {
            including_cursor, ..
        }
        | Self::OlderThan {
            including_cursor, ..
        }) = self;
        match (self.is_desc(), *including_cursor) {
            (false, false) => ">",
            (false, true) => ">=",
            (true, false) => "<",
            (true, true) => "<=",
        }
    }

    /// Returns `true` for `OlderThan`, `false` for `NewerThan`.
    pub fn is_desc(&self) -> bool {
        match self {
            Self::OlderThan { .. } => true,
            Self::NewerThan { .. } => false,
        }
    }

    /// Returns a reference to the cursor.
    pub fn cursor(&self) -> &T {
        let (Self::NewerThan { cursor, .. } | Self::OlderThan { cursor, .. }) = self;
        cursor
    }
}
1360
/// Test helper: polls the execution log every 10 ms until `predicate` returns `Some`.
///
/// With `timeout` set, the polling is raced against a timer and
/// `DbErrorReadWithTimeout::Timeout` is returned when the timer fires first. Without a timeout
/// the polling continues until the predicate matches or a read fails.
///
/// # Errors
///
/// Propagates read errors from `db_connection.get`, or reports the timeout described above.
#[cfg(feature = "test")]
pub async fn wait_for_pending_state_fn<T: Debug>(
    db_connection: &dyn DbConnectionTest,
    execution_id: &ExecutionId,
    predicate: impl Fn(ExecutionLog) -> Option<T> + Send,
    timeout: Option<Duration>,
) -> Result<T, DbErrorReadWithTimeout> {
    tracing::trace!(%execution_id, "Waiting for predicate");
    let poll_until_match = async move {
        loop {
            if let Some(t) = predicate(db_connection.get(execution_id).await?) {
                tracing::debug!(%execution_id, "Found: {t:?}");
                return Ok(t);
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    };

    let Some(timeout) = timeout else {
        // No deadline requested: wait for as long as it takes.
        return poll_until_match.await;
    };
    tokio::select! {
        res = poll_until_match => res,
        () = tokio::time::sleep(timeout) => Err(DbErrorReadWithTimeout::Timeout(TimeoutOutcome::Timeout))
    }
}
1389
/// A timer that has expired: either an execution lock or a delay.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExpiredTimer {
    /// An expired execution lock.
    Lock(ExpiredLock),
    /// An expired delay.
    Delay(ExpiredDelay),
}
1395
/// Details of an execution whose lock has expired.
///
/// NOTE(review): the field notes below are inferred from names and types only; confirm them
/// against the code that produces and consumes this struct.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExpiredLock {
    /// Execution holding the expired lock.
    pub execution_id: ExecutionId,
    /// Presumably the version of the event that acquired the lock.
    pub locked_at_version: Version,
    /// Presumably the version the next appended event must use.
    pub next_version: Version,
    /// Presumably the number of intermittent (temporary) failure events recorded so far.
    pub intermittent_event_count: u32,
    /// Retry limit; `None` presumably means no limit.
    pub max_retries: Option<u32>,
    /// Base duration for the exponential retry backoff, per the field name.
    pub retry_exp_backoff: Duration,
    /// Executor and run that held the lock.
    pub locked_by: LockedBy,
}
1408
/// Identifies a delay that has expired.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExpiredDelay {
    /// Execution the delay belongs to.
    pub execution_id: ExecutionId,
    /// Join set the delay belongs to.
    pub join_set_id: JoinSetId,
    /// The expired delay.
    pub delay_id: DelayId,
}
1415
/// Current scheduling state of an execution.
///
/// Serialized as an internally tagged enum: the variant name goes into the `status` field in
/// snake case.
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum PendingState {
    /// The execution is locked by an executor.
    Locked(PendingStateLocked),
    /// The execution is waiting to be locked at or after `scheduled_at`.
    #[display("PendingAt(`{scheduled_at}`)")]
    PendingAt {
        /// Time from which a new lock may be created (see `can_append_lock`).
        scheduled_at: DateTime<Utc>,
        /// Previous lock holder, if any. `can_append_lock` lets that same executor and run
        /// lock again even before `scheduled_at`.
        last_lock: Option<LockedBy>,
    },
    /// The execution is waiting on a join set.
    #[display("BlockedByJoinSet({join_set_id},`{lock_expires_at}`)")]
    BlockedByJoinSet {
        /// Join set the execution is blocked by.
        join_set_id: JoinSetId,
        /// NOTE(review): presumably the expiry of the lock held when the execution became
        /// blocked - confirm.
        lock_expires_at: DateTime<Utc>,
        /// NOTE(review): presumably set when the join set is being closed - confirm.
        closing: bool,
    },
    /// The execution has finished; the details are flattened into the serialized object.
    #[display("Finished({finished})")]
    Finished {
        #[serde(flatten)]
        finished: PendingStateFinished,
    },
}
1440
/// Payload of [`PendingState::Locked`]: who holds the lock and until when.
///
/// Displayed as ``Locked(`<lock_expires_at>`, <executor_id>, <run_id>)``.
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize)]
#[display("Locked(`{lock_expires_at}`, {}, {})", locked_by.executor_id, locked_by.run_id)]
pub struct PendingStateLocked {
    /// Executor and run holding the lock.
    pub locked_by: LockedBy,
    /// Time at which the lock expires.
    pub lock_expires_at: DateTime<Utc>,
}
1447
/// Identifies a lock holder by executor and run.
///
/// `PendingState::can_append_lock` requires both ids to match before a lock may be extended.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LockedBy {
    /// Executor holding the lock.
    pub executor_id: ExecutorId,
    /// Run holding the lock.
    pub run_id: RunId,
}
1453
/// Payload of [`PendingState::Finished`].
///
/// Its `Display` output is `ok` for a successful result and the error's text otherwise.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct PendingStateFinished {
    /// NOTE(review): presumably the version of the `Finished` event - confirm.
    pub version: VersionType,
    /// Time at which the execution finished.
    pub finished_at: DateTime<Utc>,
    /// Whether the execution succeeded and, if not, how it failed.
    pub result_kind: PendingStateFinishedResultKind,
}
1460impl Display for PendingStateFinished {
1461 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1462 match self.result_kind {
1463 PendingStateFinishedResultKind::Ok => write!(f, "ok"),
1464 PendingStateFinishedResultKind::Err(err) => write!(f, "{err}"),
1465 }
1466 }
1467}
1468
/// Coarse outcome of a finished execution; variant names are serialized in snake case.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PendingStateFinishedResultKind {
    /// The execution finished successfully.
    Ok,
    /// The execution finished with the given kind of error.
    Err(PendingStateFinishedError),
}
1476impl PendingStateFinishedResultKind {
1477 pub fn as_result(&self) -> Result<(), &PendingStateFinishedError> {
1478 match self {
1479 PendingStateFinishedResultKind::Ok => Ok(()),
1480 PendingStateFinishedResultKind::Err(err) => Err(err),
1481 }
1482 }
1483}
1484
/// Derives the coarse result kind from a function's return value.
impl From<&SupportedFunctionReturnValue> for PendingStateFinishedResultKind {
    /// Delegates to `SupportedFunctionReturnValue::as_pending_state_finished_result`.
    fn from(result: &SupportedFunctionReturnValue) -> Self {
        result.as_pending_state_finished_result()
    }
}
1490
/// Why a finished execution counts as failed; variant names are serialized in snake case.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, derive_more::Display)]
#[serde(rename_all = "snake_case")]
pub enum PendingStateFinishedError {
    /// The execution was terminated with the given failure kind.
    #[display("execution terminated: {_0}")]
    ExecutionFailure(ExecutionFailureKind),
    /// The execution ran to completion but returned an error.
    #[display("execution completed with an error")]
    Error,
}
1499
1500impl PendingState {
1501 #[instrument(skip(self))]
1502 pub fn can_append_lock(
1503 &self,
1504 created_at: DateTime<Utc>,
1505 executor_id: ExecutorId,
1506 run_id: RunId,
1507 lock_expires_at: DateTime<Utc>,
1508 ) -> Result<LockKind, DbErrorWriteNonRetriable> {
1509 if lock_expires_at <= created_at {
1510 return Err(DbErrorWriteNonRetriable::ValidationFailed(
1511 "invalid expiry date".into(),
1512 ));
1513 }
1514 match self {
1515 PendingState::PendingAt {
1516 scheduled_at,
1517 last_lock,
1518 } => {
1519 if *scheduled_at <= created_at {
1520 Ok(LockKind::CreatingNewLock)
1522 } else if let Some(LockedBy {
1523 executor_id: last_executor_id,
1524 run_id: last_run_id,
1525 }) = last_lock
1526 && executor_id == *last_executor_id
1527 && run_id == *last_run_id
1528 {
1529 Ok(LockKind::Extending)
1531 } else {
1532 Err(DbErrorWriteNonRetriable::ValidationFailed(
1533 "cannot lock, not yet pending".into(),
1534 ))
1535 }
1536 }
1537 PendingState::Locked(PendingStateLocked {
1538 locked_by:
1539 LockedBy {
1540 executor_id: current_pending_state_executor_id,
1541 run_id: current_pending_state_run_id,
1542 },
1543 lock_expires_at: _,
1544 }) => {
1545 if executor_id == *current_pending_state_executor_id
1546 && run_id == *current_pending_state_run_id
1547 {
1548 Ok(LockKind::Extending)
1550 } else {
1551 Err(DbErrorWriteNonRetriable::IllegalState {
1552 reason: "cannot lock, already locked".into(),
1553 context: SpanTrace::capture(),
1554 source: None,
1555 loc: Location::caller(),
1556 })
1557 }
1558 }
1559 PendingState::BlockedByJoinSet { .. } => Err(DbErrorWriteNonRetriable::IllegalState {
1560 reason: "cannot append Locked event when in BlockedByJoinSet state".into(),
1561 context: SpanTrace::capture(),
1562 source: None,
1563 loc: Location::caller(),
1564 }),
1565 PendingState::Finished { .. } => Err(DbErrorWriteNonRetriable::IllegalState {
1566 reason: "already finished".into(),
1567 context: SpanTrace::capture(),
1568 source: None,
1569 loc: Location::caller(),
1570 }),
1571 }
1572 }
1573
1574 #[must_use]
1575 pub fn is_finished(&self) -> bool {
1576 matches!(self, PendingState::Finished { .. })
1577 }
1578}
1579
/// Kind of lock that `PendingState::can_append_lock` permits.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LockKind {
    /// The requester already holds (or last held) the lock and is prolonging it.
    Extending,
    /// The execution is pending and a fresh lock is being created.
    CreatingNewLock,
}
1585
/// Serializable records of outgoing HTTP requests and their responses.
pub mod http_client_trace {
    use chrono::{DateTime, Utc};
    use serde::{Deserialize, Serialize};

    /// One traced HTTP exchange: the request and, when available, its response.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    pub struct HttpClientTrace {
        /// The request that was sent.
        pub req: RequestTrace,
        /// The response; `None` presumably means none was recorded - confirm with the producer.
        pub resp: Option<ResponseTrace>,
    }

    /// Request part of a trace.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    pub struct RequestTrace {
        /// Time the request was sent.
        pub sent_at: DateTime<Utc>,
        /// Request URI.
        pub uri: String,
        /// HTTP method.
        pub method: String,
    }

    /// Response part of a trace.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    pub struct ResponseTrace {
        /// Time the response finished.
        pub finished_at: DateTime<Utc>,
        /// Presumably the HTTP status code on success, or an error description - confirm.
        pub status: Result<u16, String>,
    }
}
1609
// Unit tests for serde round-trips of the finished-state types and for resolving
// `HistoryEventScheduleAt` into a concrete timestamp.
#[cfg(test)]
mod tests {
    use super::HistoryEventScheduleAt;
    use super::PendingStateFinished;
    use super::PendingStateFinishedError;
    use super::PendingStateFinishedResultKind;
    use crate::ExecutionFailureKind;
    use crate::SupportedFunctionReturnValue;
    use chrono::DateTime;
    use chrono::Datelike;
    use chrono::Utc;
    use insta::assert_snapshot;
    use rstest::rstest;
    use std::time::Duration;
    use val_json::type_wrapper::TypeWrapper;
    use val_json::wast_val::WastVal;
    use val_json::wast_val::WastValWithType;

    // JSON round-trip must preserve both the `Ok` and the `Err` result kind.
    #[rstest(expected => [
        PendingStateFinishedResultKind::Ok,
        PendingStateFinishedResultKind::Err(PendingStateFinishedError::ExecutionFailure(ExecutionFailureKind::TimedOut)),
    ])]
    #[test]
    fn serde_pending_state_finished_result_kind_should_work(
        expected: PendingStateFinishedResultKind,
    ) {
        let ser = serde_json::to_string(&expected).unwrap();
        let actual: PendingStateFinishedResultKind = serde_json::from_str(&ser).unwrap();
        assert_eq!(expected, actual);
    }

    // JSON round-trip of the whole `PendingStateFinished` struct for both result kinds.
    #[rstest(result_kind => [
        PendingStateFinishedResultKind::Ok,
        PendingStateFinishedResultKind::Err(PendingStateFinishedError::ExecutionFailure(ExecutionFailureKind::TimedOut)),
    ])]
    #[test]
    fn serde_pending_state_finished_should_work(result_kind: PendingStateFinishedResultKind) {
        let expected = PendingStateFinished {
            version: 0,
            finished_at: Utc::now(),
            result_kind,
        };

        let ser = serde_json::to_string(&expected).unwrap();
        let actual: PendingStateFinished = serde_json::from_str(&ser).unwrap();
        assert_eq!(expected, actual);
    }

    // A `result<option<string>, string>` holding `Ok(None)` must survive a JSON round-trip;
    // the serialized form is additionally pinned by an insta snapshot.
    #[test]
    fn join_set_deser_with_result_ok_option_none_should_work() {
        let expected = SupportedFunctionReturnValue::Ok {
            ok: Some(WastValWithType {
                r#type: TypeWrapper::Result {
                    ok: Some(Box::new(TypeWrapper::Option(Box::new(TypeWrapper::String)))),
                    err: Some(Box::new(TypeWrapper::String)),
                },
                value: WastVal::Result(Ok(Some(Box::new(WastVal::Option(None))))),
            }),
        };
        let json = serde_json::to_string(&expected).unwrap();
        assert_snapshot!(json);

        let actual: SupportedFunctionReturnValue = serde_json::from_str(&json).unwrap();

        assert_eq!(expected, actual);
    }

    // `u32::MAX` seconds after the Unix epoch still resolves to a valid date (year 2106).
    #[test]
    fn as_date_time_should_work_with_duration_u32_max_secs() {
        let duration = Duration::from_secs(u64::from(u32::MAX));
        let schedule_at = HistoryEventScheduleAt::In(duration);
        let resolved = schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap();
        assert_eq!(2106, resolved.year());
    }

    // Largest whole number of seconds that fits into `i64::MAX` milliseconds, which is the
    // upper limit of chrono's `TimeDelta`.
    const MILLIS_PER_SEC: i64 = 1000;
    const TIMEDELTA_MAX_SECS: i64 = i64::MAX / MILLIS_PER_SEC;

    // One second past that limit must be rejected instead of overflowing.
    #[test]
    fn as_date_time_should_fail_on_duration_secs_greater_than_i64_max() {
        let duration = Duration::from_secs(
            u64::try_from(TIMEDELTA_MAX_SECS).expect("positive number must not fail") + 1,
        );
        let schedule_at = HistoryEventScheduleAt::In(duration);
        schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap_err();
    }
}