1use crate::ComponentId;
2use crate::ComponentRetryConfig;
3use crate::ExecutionFailureKind;
4use crate::ExecutionId;
5use crate::ExecutionMetadata;
6use crate::FinishedExecutionError;
7use crate::FunctionFqn;
8use crate::JoinSetId;
9use crate::Params;
10use crate::StrVariant;
11use crate::SupportedFunctionReturnValue;
12use crate::component_id::InputContentDigest;
13use crate::prefixed_ulid::DelayId;
14use crate::prefixed_ulid::ExecutionIdDerived;
15use crate::prefixed_ulid::ExecutorId;
16use crate::prefixed_ulid::RunId;
17use assert_matches::assert_matches;
18use async_trait::async_trait;
19use chrono::TimeDelta;
20use chrono::{DateTime, Utc};
21use http_client_trace::HttpClientTrace;
22use serde::Deserialize;
23use serde::Serialize;
24use std::fmt::Debug;
25use std::fmt::Display;
26use std::pin::Pin;
27use std::sync::Arc;
28use std::time::Duration;
29use strum::IntoStaticStr;
30use tracing::debug;
31use tracing::error;
32
33pub const STATE_PENDING_AT: &str = "PendingAt";
34pub const STATE_BLOCKED_BY_JOIN_SET: &str = "BlockedByJoinSet";
35pub const STATE_LOCKED: &str = "Locked";
36pub const STATE_FINISHED: &str = "Finished";
37pub const HISTORY_EVENT_TYPE_JOIN_NEXT: &str = "JoinNext";
38
39#[derive(Debug, PartialEq, Eq)]
41#[cfg_attr(feature = "test", derive(Serialize))]
42pub struct ExecutionLog {
43 pub execution_id: ExecutionId,
44 pub events: Vec<ExecutionEvent>,
45 pub responses: Vec<JoinSetResponseEventOuter>,
46 pub next_version: Version, pub pending_state: PendingState,
48}
49
50impl ExecutionLog {
51 #[must_use]
54 pub fn can_be_retried_after(
55 temporary_event_count: u32,
56 max_retries: Option<u32>,
57 retry_exp_backoff: Duration,
58 ) -> Option<Duration> {
59 if temporary_event_count <= max_retries.unwrap_or(u32::MAX) {
61 let duration = retry_exp_backoff * 2_u32.saturating_pow(temporary_event_count - 1);
63 Some(duration)
64 } else {
65 None
66 }
67 }
68
69 #[must_use]
70 pub fn compute_retry_duration_when_retrying_forever(
71 temporary_event_count: u32,
72 retry_exp_backoff: Duration,
73 ) -> Duration {
74 Self::can_be_retried_after(temporary_event_count, None, retry_exp_backoff)
75 .expect("`max_retries` set to MAX must never return None")
76 }
77
78 #[must_use]
79 pub fn ffqn(&self) -> &FunctionFqn {
80 assert_matches!(self.events.first(), Some(ExecutionEvent {
81 event: ExecutionRequest::Created { ffqn, .. },
82 ..
83 }) => ffqn)
84 }
85
86 #[must_use]
87 pub fn parent(&self) -> Option<(ExecutionId, JoinSetId)> {
88 assert_matches!(self.events.first(), Some(ExecutionEvent {
89 event: ExecutionRequest::Created { parent, .. },
90 ..
91 }) => parent.clone())
92 }
93
94 #[must_use]
95 pub fn last_event(&self) -> &ExecutionEvent {
96 self.events.last().expect("must contain at least one event")
97 }
98
99 #[must_use]
100 pub fn into_finished_result(mut self) -> Option<SupportedFunctionReturnValue> {
101 if let ExecutionEvent {
102 event: ExecutionRequest::Finished { result, .. },
103 ..
104 } = self.events.pop().expect("must contain at least one event")
105 {
106 Some(result)
107 } else {
108 None
109 }
110 }
111
112 #[cfg(feature = "test")]
113 pub fn event_history(&self) -> impl Iterator<Item = (HistoryEvent, Version)> + '_ {
114 self.events.iter().filter_map(|event| {
115 if let ExecutionRequest::HistoryEvent { event: eh, .. } = &event.event {
116 Some((eh.clone(), event.version.clone()))
117 } else {
118 None
119 }
120 })
121 }
122
123 #[cfg(feature = "test")]
124 #[must_use]
125 pub fn find_join_set_request(&self, join_set_id: &JoinSetId) -> Option<&JoinSetRequest> {
126 self.events
127 .iter()
128 .find_map(move |event| match &event.event {
129 ExecutionRequest::HistoryEvent {
130 event:
131 HistoryEvent::JoinSetRequest {
132 join_set_id: found,
133 request,
134 },
135 ..
136 } if *join_set_id == *found => Some(request),
137 _ => None,
138 })
139 }
140}
141
142pub type VersionType = u32;
143#[derive(
144 Debug,
145 Default,
146 Clone,
147 PartialEq,
148 Eq,
149 Hash,
150 derive_more::Display,
151 derive_more::Into,
152 serde::Serialize,
153 serde::Deserialize,
154)]
155#[serde(transparent)]
156pub struct Version(pub VersionType);
157impl Version {
158 #[must_use]
159 pub fn new(arg: VersionType) -> Version {
160 Version(arg)
161 }
162
163 #[must_use]
164 pub fn increment(&self) -> Version {
165 Version(self.0 + 1)
166 }
167}
168impl TryFrom<i64> for Version {
169 type Error = VersionParseError;
170 fn try_from(value: i64) -> Result<Self, Self::Error> {
171 VersionType::try_from(value)
172 .map(Version::new)
173 .map_err(|_| VersionParseError)
174 }
175}
176impl From<Version> for usize {
177 fn from(value: Version) -> Self {
178 usize::try_from(value.0).expect("16 bit systems are unsupported")
179 }
180}
181impl From<&Version> for usize {
182 fn from(value: &Version) -> Self {
183 usize::try_from(value.0).expect("16 bit systems are unsupported")
184 }
185}
186
187#[derive(Debug, thiserror::Error)]
188#[error("version must be u32")]
189pub struct VersionParseError;
190
191#[derive(
192 Clone, Debug, derive_more::Display, PartialEq, Eq, serde::Serialize, serde::Deserialize,
193)]
194#[display("{event}")]
195pub struct ExecutionEvent {
196 pub created_at: DateTime<Utc>,
197 pub event: ExecutionRequest,
198 #[serde(skip_serializing_if = "Option::is_none")]
199 pub backtrace_id: Option<Version>,
200 pub version: Version,
201}
202
203#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
205pub struct JoinSetResponseEventOuter {
206 pub created_at: DateTime<Utc>,
207 pub event: JoinSetResponseEvent,
208}
209
210#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
211pub struct JoinSetResponseEvent {
212 pub join_set_id: JoinSetId,
213 pub event: JoinSetResponse,
214}
215
216#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, derive_more::Display)]
217#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
218#[serde(tag = "type")]
219pub enum JoinSetResponse {
220 #[display("delay {}: {delay_id}", if result.is_ok() { "finished" } else { "cancelled"})]
221 DelayFinished {
222 delay_id: DelayId,
223 result: Result<(), ()>,
224 },
225 #[display("{result}: {child_execution_id}")] ChildExecutionFinished {
227 child_execution_id: ExecutionIdDerived,
228 #[cfg_attr(any(test, feature = "test"), arbitrary(value = Version(2)))]
229 finished_version: Version,
230 #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
231 result: SupportedFunctionReturnValue,
232 },
233}
234
235pub const DUMMY_CREATED: ExecutionRequest = ExecutionRequest::Created {
236 ffqn: FunctionFqn::new_static("", ""),
237 params: Params::empty(),
238 parent: None,
239 scheduled_at: DateTime::from_timestamp_nanos(0),
240 component_id: ComponentId::dummy_activity(),
241 metadata: ExecutionMetadata::empty(),
242 scheduled_by: None,
243};
244pub const DUMMY_HISTORY_EVENT: ExecutionRequest = ExecutionRequest::HistoryEvent {
245 event: HistoryEvent::JoinSetCreate {
246 join_set_id: JoinSetId {
247 kind: crate::JoinSetKind::OneOff,
248 name: StrVariant::empty(),
249 },
250 },
251};
252
253#[derive(
254 Clone,
255 derive_more::Debug,
256 derive_more::Display,
257 PartialEq,
258 Eq,
259 Serialize,
260 Deserialize,
261 IntoStaticStr,
262)]
263#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
264#[allow(clippy::large_enum_variant)]
265pub enum ExecutionRequest {
266 #[display("Created({ffqn}, `{scheduled_at}`)")]
267 Created {
268 ffqn: FunctionFqn,
269 #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
270 #[debug(skip)]
271 params: Params,
272 parent: Option<(ExecutionId, JoinSetId)>,
273 scheduled_at: DateTime<Utc>,
274 #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
275 component_id: ComponentId,
276 #[cfg_attr(any(test, feature = "test"), arbitrary(default))]
277 metadata: ExecutionMetadata,
278 scheduled_by: Option<ExecutionId>,
279 },
280 Locked(Locked),
281 #[display("Unlocked(`{backoff_expires_at}`)")]
286 Unlocked {
287 backoff_expires_at: DateTime<Utc>,
288 #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
289 reason: StrVariant,
290 },
291 #[display("TemporarilyFailed(`{backoff_expires_at}`)")]
294 TemporarilyFailed {
295 backoff_expires_at: DateTime<Utc>,
296 #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
297 reason: StrVariant,
298 detail: Option<String>,
299 #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
300 http_client_traces: Option<Vec<HttpClientTrace>>,
301 },
302 #[display("TemporarilyTimedOut(`{backoff_expires_at}`)")]
305 TemporarilyTimedOut {
306 backoff_expires_at: DateTime<Utc>,
307 #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
308 http_client_traces: Option<Vec<HttpClientTrace>>,
309 },
310 #[display("Finished")]
312 Finished {
313 #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
314 result: SupportedFunctionReturnValue,
315 #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
316 http_client_traces: Option<Vec<HttpClientTrace>>,
317 },
318
319 #[display("HistoryEvent({event})")]
320 HistoryEvent {
321 event: HistoryEvent,
322 },
323}
324
325impl ExecutionRequest {
326 #[must_use]
327 pub fn is_temporary_event(&self) -> bool {
328 matches!(
329 self,
330 Self::TemporarilyFailed { .. } | Self::TemporarilyTimedOut { .. }
331 )
332 }
333
334 #[must_use]
335 pub fn variant(&self) -> &'static str {
336 Into::<&'static str>::into(self)
337 }
338
339 #[must_use]
340 pub fn join_set_id(&self) -> Option<&JoinSetId> {
341 match self {
342 Self::Created {
343 parent: Some((_parent_id, join_set_id)),
344 ..
345 } => Some(join_set_id),
346 Self::HistoryEvent {
347 event:
348 HistoryEvent::JoinSetCreate { join_set_id, .. }
349 | HistoryEvent::JoinSetRequest { join_set_id, .. }
350 | HistoryEvent::JoinNext { join_set_id, .. },
351 } => Some(join_set_id),
352 _ => None,
353 }
354 }
355}
356
357#[derive(
358 Clone, derive_more::Debug, derive_more::Display, PartialEq, Eq, Serialize, Deserialize,
359)]
360#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
361#[display("Locked(`{lock_expires_at}`, {component_id})")]
362pub struct Locked {
363 #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
364 pub component_id: ComponentId,
365 pub executor_id: ExecutorId,
366 pub run_id: RunId,
367 pub lock_expires_at: DateTime<Utc>,
368 #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentRetryConfig::ZERO))]
369 pub retry_config: ComponentRetryConfig,
370}
371
372#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
373#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
374#[serde(tag = "type")]
375pub enum PersistKind {
376 #[display("RandomU64({min}, {max_inclusive})")]
377 RandomU64 { min: u64, max_inclusive: u64 },
378 #[display("RandomString({min_length}, {max_length_exclusive})")]
379 RandomString {
380 min_length: u64,
381 max_length_exclusive: u64,
382 },
383}
384
385#[must_use]
386pub fn from_u64_to_bytes(value: u64) -> [u8; 8] {
387 value.to_be_bytes()
388}
389
390#[must_use]
391pub fn from_bytes_to_u64(bytes: [u8; 8]) -> u64 {
392 u64::from_be_bytes(bytes)
393}
394
395#[derive(
396 derive_more::Debug, Clone, PartialEq, Eq, derive_more::Display, Serialize, Deserialize,
397)]
398#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
399#[serde(tag = "type")]
400pub enum HistoryEvent {
402 #[display("Persist")]
404 Persist {
405 #[debug(skip)]
406 value: Vec<u8>, kind: PersistKind,
408 },
409 #[display("JoinSetCreate({join_set_id})")]
410 JoinSetCreate { join_set_id: JoinSetId },
411 #[display("JoinSetRequest({request})")]
412 JoinSetRequest {
414 join_set_id: JoinSetId,
415 request: JoinSetRequest,
416 },
417 #[display("JoinNext({join_set_id})")]
423 JoinNext {
424 join_set_id: JoinSetId,
425 run_expires_at: DateTime<Utc>,
428 requested_ffqn: Option<FunctionFqn>,
431 closing: bool,
433 },
434 #[display("JoinNextTooMany({join_set_id})")]
436 JoinNextTooMany {
437 join_set_id: JoinSetId,
438 requested_ffqn: Option<FunctionFqn>,
441 },
442 #[display("Schedule({execution_id}, {schedule_at})")]
443 Schedule {
444 execution_id: ExecutionId,
445 schedule_at: HistoryEventScheduleAt, },
447 #[display("Stub({target_execution_id})")]
448 Stub {
449 target_execution_id: ExecutionIdDerived,
450 #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
451 result: SupportedFunctionReturnValue, persist_result: Result<(), ()>, },
454}
455
456#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
457#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
458pub enum HistoryEventScheduleAt {
459 Now,
460 #[display("At(`{_0}`)")]
461 At(DateTime<Utc>),
462 #[display("In({_0:?})")]
463 In(Duration),
464}
465
466#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
467pub enum ScheduleAtConversionError {
468 #[error("source duration value is out of range")]
469 OutOfRangeError,
470}
471
472impl HistoryEventScheduleAt {
473 pub fn as_date_time(
474 &self,
475 now: DateTime<Utc>,
476 ) -> Result<DateTime<Utc>, ScheduleAtConversionError> {
477 match self {
478 Self::Now => Ok(now),
479 Self::At(date_time) => Ok(*date_time),
480 Self::In(duration) => {
481 let time_delta = TimeDelta::from_std(*duration)
482 .map_err(|_| ScheduleAtConversionError::OutOfRangeError)?;
483 now.checked_add_signed(time_delta)
484 .ok_or(ScheduleAtConversionError::OutOfRangeError)
485 }
486 }
487 }
488}
489
490#[derive(Clone, Debug, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
491#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
492#[serde(tag = "type")]
493pub enum JoinSetRequest {
494 #[display("DelayRequest({delay_id}, expires_at: `{expires_at}`, schedule_at: `{schedule_at}`)")]
496 DelayRequest {
497 delay_id: DelayId,
498 expires_at: DateTime<Utc>,
499 schedule_at: HistoryEventScheduleAt,
500 },
501 #[display("ChildExecutionRequest({child_execution_id}, {target_ffqn}, params: {params})")]
503 ChildExecutionRequest {
504 child_execution_id: ExecutionIdDerived,
505 target_ffqn: FunctionFqn,
506 #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
507 params: Params,
508 },
509}
510
511#[derive(Debug, Clone, thiserror::Error, PartialEq)]
513pub enum DbErrorGeneric {
514 #[error("database error: {0}")]
515 Uncategorized(StrVariant),
516 #[error("database was closed")]
517 Close,
518}
519
520#[derive(thiserror::Error, Clone, Debug, PartialEq, Eq)]
521pub enum DbErrorWriteNonRetriable {
522 #[error("validation failed: {0}")]
523 ValidationFailed(StrVariant),
524 #[error("conflict")]
525 Conflict,
526 #[error("illegal state: {0}")]
527 IllegalState(StrVariant),
528 #[error("version conflict: expected: {expected}, got: {requested}")]
529 VersionConflict {
530 expected: Version,
531 requested: Version,
532 },
533}
534
535#[derive(Debug, Clone, thiserror::Error, PartialEq)]
537pub enum DbErrorWrite {
538 #[error("cannot write - row not found")]
539 NotFound,
540 #[error("non-retriable error: {0}")]
541 NonRetriable(#[from] DbErrorWriteNonRetriable),
542 #[error(transparent)]
543 Generic(#[from] DbErrorGeneric),
544}
545
546#[derive(Debug, Clone, thiserror::Error, PartialEq)]
548pub enum DbErrorRead {
549 #[error("cannot read - row not found")]
550 NotFound,
551 #[error(transparent)]
552 Generic(#[from] DbErrorGeneric),
553}
554
555impl From<DbErrorRead> for DbErrorWrite {
556 fn from(value: DbErrorRead) -> DbErrorWrite {
557 match value {
558 DbErrorRead::NotFound => DbErrorWrite::NotFound,
559 DbErrorRead::Generic(err) => DbErrorWrite::Generic(err),
560 }
561 }
562}
563
564#[derive(Debug, thiserror::Error, PartialEq)]
565pub enum DbErrorReadWithTimeout {
566 #[error("timeout")]
567 Timeout(TimeoutOutcome),
568 #[error(transparent)]
569 DbErrorRead(#[from] DbErrorRead),
570}
571impl From<DbErrorGeneric> for DbErrorReadWithTimeout {
572 fn from(value: DbErrorGeneric) -> DbErrorReadWithTimeout {
573 Self::from(DbErrorRead::from(value))
574 }
575}
576
577pub type AppendResponse = Version;
580pub type PendingExecution = (ExecutionId, Version, Params, Option<DateTime<Utc>>);
581
582#[derive(Debug, Clone)]
583pub struct LockedExecution {
584 pub execution_id: ExecutionId,
585 pub next_version: Version,
586 pub metadata: ExecutionMetadata,
587 pub locked_event: Locked,
588 pub ffqn: FunctionFqn,
589 pub params: Params,
590 pub event_history: Vec<(HistoryEvent, Version)>,
591 pub responses: Vec<JoinSetResponseEventOuter>,
592 pub parent: Option<(ExecutionId, JoinSetId)>,
593 pub intermittent_event_count: u32,
594}
595
596pub type LockPendingResponse = Vec<LockedExecution>;
597pub type AppendBatchResponse = Version;
598
599#[derive(Debug, Clone, derive_more::Display, Serialize, Deserialize)]
600#[display("{event}")]
601pub struct AppendRequest {
602 pub created_at: DateTime<Utc>,
603 pub event: ExecutionRequest,
604}
605
606#[derive(Debug, Clone)]
607pub struct CreateRequest {
608 pub created_at: DateTime<Utc>,
609 pub execution_id: ExecutionId,
610 pub ffqn: FunctionFqn,
611 pub params: Params,
612 pub parent: Option<(ExecutionId, JoinSetId)>,
613 pub scheduled_at: DateTime<Utc>,
614 pub component_id: ComponentId,
615 pub metadata: ExecutionMetadata,
616 pub scheduled_by: Option<ExecutionId>,
617}
618
619impl From<CreateRequest> for ExecutionRequest {
620 fn from(value: CreateRequest) -> Self {
621 Self::Created {
622 ffqn: value.ffqn,
623 params: value.params,
624 parent: value.parent,
625 scheduled_at: value.scheduled_at,
626 component_id: value.component_id,
627 metadata: value.metadata,
628 scheduled_by: value.scheduled_by,
629 }
630 }
631}
632
633#[async_trait]
634pub trait DbPool: Send + Sync {
635 async fn db_exec_conn(&self) -> Result<Box<dyn DbExecutor>, DbErrorGeneric>;
636
637 async fn connection(&self) -> Result<Box<dyn DbConnection>, DbErrorGeneric>;
638
639 async fn external_api_conn(&self) -> Result<Box<dyn DbExternalApi>, DbErrorGeneric>;
640
641 #[cfg(feature = "test")]
642 async fn connection_test(&self) -> Result<Box<dyn DbConnectionTest>, DbErrorGeneric>;
643}
644
645#[async_trait]
646pub trait DbPoolCloseable {
647 async fn close(&self);
648}
649
650#[derive(Clone, Debug)]
651pub struct AppendEventsToExecution {
652 pub execution_id: ExecutionId,
653 pub version: Version,
654 pub batch: Vec<AppendRequest>,
655}
656
657#[derive(Clone, Debug)]
658pub struct AppendResponseToExecution {
659 pub parent_execution_id: ExecutionId,
660 pub created_at: DateTime<Utc>,
661 pub join_set_id: JoinSetId,
662 pub child_execution_id: ExecutionIdDerived,
663 pub finished_version: Version,
664 pub result: SupportedFunctionReturnValue,
665}
666
667#[async_trait]
668pub trait DbExecutor: Send + Sync {
669 #[expect(clippy::too_many_arguments)]
670 async fn lock_pending_by_ffqns(
671 &self,
672 batch_size: u32,
673 pending_at_or_sooner: DateTime<Utc>,
674 ffqns: Arc<[FunctionFqn]>,
675 created_at: DateTime<Utc>,
676 component_id: ComponentId,
677 executor_id: ExecutorId,
678 lock_expires_at: DateTime<Utc>,
679 run_id: RunId,
680 retry_config: ComponentRetryConfig,
681 ) -> Result<LockPendingResponse, DbErrorGeneric>;
682
683 #[expect(clippy::too_many_arguments)]
684 async fn lock_pending_by_component_id(
685 &self,
686 batch_size: u32,
687 pending_at_or_sooner: DateTime<Utc>,
688 component_id: &ComponentId,
689 created_at: DateTime<Utc>,
690 executor_id: ExecutorId,
691 lock_expires_at: DateTime<Utc>,
692 run_id: RunId,
693 retry_config: ComponentRetryConfig,
694 ) -> Result<LockPendingResponse, DbErrorGeneric>;
695
696 #[expect(clippy::too_many_arguments)]
698 async fn lock_one(
699 &self,
700 created_at: DateTime<Utc>,
701 component_id: ComponentId,
702 execution_id: &ExecutionId,
703 run_id: RunId,
704 version: Version,
705 executor_id: ExecutorId,
706 lock_expires_at: DateTime<Utc>,
707 retry_config: ComponentRetryConfig,
708 ) -> Result<LockedExecution, DbErrorWrite>;
709
710 async fn append(
713 &self,
714 execution_id: ExecutionId,
715 version: Version,
716 req: AppendRequest,
717 ) -> Result<AppendResponse, DbErrorWrite>;
718
719 async fn append_batch_respond_to_parent(
722 &self,
723 events: AppendEventsToExecution,
724 response: AppendResponseToExecution,
725 current_time: DateTime<Utc>, ) -> Result<AppendBatchResponse, DbErrorWrite>;
727
728 async fn wait_for_pending_by_ffqn(
733 &self,
734 pending_at_or_sooner: DateTime<Utc>,
735 ffqns: Arc<[FunctionFqn]>,
736 timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
737 );
738
739 async fn wait_for_pending_by_component_id(
744 &self,
745 pending_at_or_sooner: DateTime<Utc>,
746 component_id: &ComponentId,
747 timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
748 );
749
750 async fn cancel_activity_with_retries(
751 &self,
752 execution_id: &ExecutionId,
753 cancelled_at: DateTime<Utc>,
754 ) -> Result<CancelOutcome, DbErrorWrite> {
755 let mut retries = 5;
756 loop {
757 let res = self.cancel_activity(execution_id, cancelled_at).await;
758 if res.is_ok() || retries == 0 {
759 return res;
760 }
761 retries -= 1;
762 }
763 }
764
765 async fn get_last_execution_event(
767 &self,
768 execution_id: &ExecutionId,
769 ) -> Result<ExecutionEvent, DbErrorRead>;
770
771 async fn cancel_activity(
772 &self,
773 execution_id: &ExecutionId,
774 cancelled_at: DateTime<Utc>,
775 ) -> Result<CancelOutcome, DbErrorWrite> {
776 debug!("Determining cancellation state of {execution_id}");
777
778 let last_event = self
779 .get_last_execution_event(execution_id)
780 .await
781 .map_err(DbErrorWrite::from)?;
782 if let ExecutionRequest::Finished {
783 result:
784 SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError {
785 kind: ExecutionFailureKind::Cancelled,
786 ..
787 }),
788 ..
789 } = last_event.event
790 {
791 return Ok(CancelOutcome::Cancelled);
792 } else if matches!(last_event.event, ExecutionRequest::Finished { .. }) {
793 debug!("Not cancelling, {execution_id} is already finished");
794 return Ok(CancelOutcome::AlreadyFinished);
795 }
796 let finished_version = last_event.version.increment();
797 let child_result = SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError {
798 reason: None,
799 kind: ExecutionFailureKind::Cancelled,
800 detail: None,
801 });
802 let cancel_request = AppendRequest {
803 created_at: cancelled_at,
804 event: ExecutionRequest::Finished {
805 result: child_result.clone(),
806 http_client_traces: None,
807 },
808 };
809 debug!("Cancelling activity {execution_id} at {finished_version}");
810 if let ExecutionId::Derived(execution_id) = execution_id {
811 let (parent_execution_id, join_set_id) = execution_id.split_to_parts();
812 let child_execution_id = ExecutionId::Derived(execution_id.clone());
813 self.append_batch_respond_to_parent(
814 AppendEventsToExecution {
815 execution_id: child_execution_id,
816 version: finished_version.clone(),
817 batch: vec![cancel_request],
818 },
819 AppendResponseToExecution {
820 parent_execution_id,
821 created_at: cancelled_at,
822 join_set_id: join_set_id.clone(),
823 child_execution_id: execution_id.clone(),
824 finished_version,
825 result: child_result,
826 },
827 cancelled_at,
828 )
829 .await?;
830 } else {
831 self.append(execution_id.clone(), finished_version, cancel_request)
832 .await?;
833 }
834 debug!("Cancelled {execution_id}");
835 Ok(CancelOutcome::Cancelled)
836 }
837}
838
839pub enum AppendDelayResponseOutcome {
840 Success,
841 AlreadyFinished,
842 AlreadyCancelled,
843}
844
845#[async_trait]
846pub trait DbExternalApi: DbConnection {
847 async fn get_backtrace(
849 &self,
850 execution_id: &ExecutionId,
851 filter: BacktraceFilter,
852 ) -> Result<BacktraceInfo, DbErrorRead>;
853
854 async fn list_executions(
856 &self,
857 ffqn: Option<FunctionFqn>,
858 top_level_only: bool,
859 pagination: ExecutionListPagination,
860 ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric>;
861
862 async fn list_execution_events(
863 &self,
864 execution_id: &ExecutionId,
865 since: &Version,
866 max_length: VersionType,
867 include_backtrace_id: bool,
868 ) -> Result<Vec<ExecutionEvent>, DbErrorRead>;
869
870 async fn list_responses(
873 &self,
874 execution_id: &ExecutionId,
875 pagination: Pagination<u32>,
876 ) -> Result<Vec<ResponseWithCursor>, DbErrorRead>;
877
878 async fn upgrade_execution_component(
879 &self,
880 execution_id: &ExecutionId,
881 old: &InputContentDigest,
882 new: &InputContentDigest,
883 ) -> Result<(), DbErrorWrite>;
884}
885
886#[async_trait]
887pub trait DbConnection: DbExecutor {
888 async fn append_delay_response(
889 &self,
890 created_at: DateTime<Utc>,
891 execution_id: ExecutionId,
892 join_set_id: JoinSetId,
893 delay_id: DelayId,
894 outcome: Result<(), ()>, ) -> Result<AppendDelayResponseOutcome, DbErrorWrite>;
896
897 async fn append_batch(
900 &self,
901 current_time: DateTime<Utc>, batch: Vec<AppendRequest>,
903 execution_id: ExecutionId,
904 version: Version,
905 ) -> Result<AppendBatchResponse, DbErrorWrite>;
906
907 async fn append_batch_create_new_execution(
910 &self,
911 current_time: DateTime<Utc>, batch: Vec<AppendRequest>, execution_id: ExecutionId,
914 version: Version,
915 child_req: Vec<CreateRequest>,
916 ) -> Result<AppendBatchResponse, DbErrorWrite>;
917
918 async fn get_execution_event(
920 &self,
921 execution_id: &ExecutionId,
922 version: &Version,
923 ) -> Result<ExecutionEvent, DbErrorRead>;
924
925 async fn get_create_request(
926 &self,
927 execution_id: &ExecutionId,
928 ) -> Result<CreateRequest, DbErrorRead> {
929 let execution_event = self
930 .get_execution_event(execution_id, &Version::new(0))
931 .await?;
932 if let ExecutionRequest::Created {
933 ffqn,
934 params,
935 parent,
936 scheduled_at,
937 component_id,
938 metadata,
939 scheduled_by,
940 } = execution_event.event
941 {
942 Ok(CreateRequest {
943 created_at: execution_event.created_at,
944 execution_id: execution_id.clone(),
945 ffqn,
946 params,
947 parent,
948 scheduled_at,
949 component_id,
950 metadata,
951 scheduled_by,
952 })
953 } else {
954 error!(%execution_id, "Execution log must start with creation");
955 Err(DbErrorRead::Generic(DbErrorGeneric::Uncategorized(
956 "execution log must start with creation".into(),
957 )))
958 }
959 }
960
961 async fn get_pending_state(
962 &self,
963 execution_id: &ExecutionId,
964 ) -> Result<PendingState, DbErrorRead>;
965
966 async fn get_expired_timers(
968 &self,
969 at: DateTime<Utc>,
970 ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric>;
971
972 async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite>;
974
975 async fn subscribe_to_next_responses(
982 &self,
983 execution_id: &ExecutionId,
984 start_idx: u32,
985 timeout_fut: Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>,
986 ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorReadWithTimeout>;
987
988 async fn wait_for_finished_result(
993 &self,
994 execution_id: &ExecutionId,
995 timeout_fut: Option<Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>>,
996 ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout>;
998
999 async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite>;
1000
1001 async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite>;
1002}
1003
1004#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1005pub enum TimeoutOutcome {
1006 Timeout,
1007 Cancel,
1008}
1009
1010#[cfg(feature = "test")]
1011#[async_trait]
1012pub trait DbConnectionTest: DbConnection {
1013 async fn append_response(
1014 &self,
1015 created_at: DateTime<Utc>,
1016 execution_id: ExecutionId,
1017 response_event: JoinSetResponseEvent,
1018 ) -> Result<(), DbErrorWrite>;
1019
1020 async fn get(&self, execution_id: &ExecutionId) -> Result<ExecutionLog, DbErrorRead>;
1022}
1023
1024#[derive(Clone, Copy, Debug, PartialEq, Eq)]
1025pub enum CancelOutcome {
1026 Cancelled,
1027 AlreadyFinished,
1028}
1029
1030pub async fn stub_execution(
1031 db_connection: &dyn DbConnection,
1032 execution_id: ExecutionIdDerived,
1033 parent_execution_id: ExecutionId,
1034 join_set_id: JoinSetId,
1035 created_at: DateTime<Utc>,
1036 return_value: SupportedFunctionReturnValue,
1037) -> Result<(), DbErrorWrite> {
1038 let stub_finished_version = Version::new(1); let write_attempt = {
1041 let finished_req = AppendRequest {
1042 created_at,
1043 event: ExecutionRequest::Finished {
1044 result: return_value.clone(),
1045 http_client_traces: None,
1046 },
1047 };
1048 db_connection
1049 .append_batch_respond_to_parent(
1050 AppendEventsToExecution {
1051 execution_id: ExecutionId::Derived(execution_id.clone()),
1052 version: stub_finished_version.clone(),
1053 batch: vec![finished_req],
1054 },
1055 AppendResponseToExecution {
1056 parent_execution_id,
1057 created_at,
1058 join_set_id,
1059 child_execution_id: execution_id.clone(),
1060 finished_version: stub_finished_version.clone(),
1061 result: return_value.clone(),
1062 },
1063 created_at,
1064 )
1065 .await
1066 };
1067 if let Err(write_attempt) = write_attempt {
1068 debug!("Stub write attempt failed - {write_attempt:?}");
1070
1071 let found = db_connection
1072 .get_execution_event(&ExecutionId::Derived(execution_id), &stub_finished_version)
1073 .await?; match found.event {
1075 ExecutionRequest::Finished {
1076 result: found_result,
1077 ..
1078 } if return_value == found_result => {
1079 Ok(())
1081 }
1082 ExecutionRequest::Finished { .. } => Err(DbErrorWrite::NonRetriable(
1083 DbErrorWriteNonRetriable::Conflict,
1084 )),
1085 _other => Err(DbErrorWrite::NonRetriable(
1086 DbErrorWriteNonRetriable::IllegalState(
1087 "unexpected execution event at stubbed execution".into(),
1088 ),
1089 )),
1090 }
1091 } else {
1092 Ok(())
1093 }
1094}
1095
1096pub async fn cancel_delay(
1097 db_connection: &dyn DbConnection,
1098 delay_id: DelayId,
1099 created_at: DateTime<Utc>,
1100) -> Result<CancelOutcome, DbErrorWrite> {
1101 let (parent_execution_id, join_set_id) = delay_id.split_to_parts();
1102 db_connection
1103 .append_delay_response(
1104 created_at,
1105 parent_execution_id,
1106 join_set_id,
1107 delay_id,
1108 Err(()), )
1110 .await
1111 .map(|ok| match ok {
1112 AppendDelayResponseOutcome::Success | AppendDelayResponseOutcome::AlreadyCancelled => {
1113 CancelOutcome::Cancelled
1114 }
1115 AppendDelayResponseOutcome::AlreadyFinished => CancelOutcome::AlreadyFinished,
1116 })
1117}
1118
1119#[derive(Clone)]
1120pub enum BacktraceFilter {
1121 First,
1122 Last,
1123 Specific(Version),
1124}
1125
1126#[derive(Clone, Debug, PartialEq, Eq)]
1127pub struct BacktraceInfo {
1128 pub execution_id: ExecutionId,
1129 pub component_id: ComponentId,
1130 pub version_min_including: Version,
1131 pub version_max_excluding: Version,
1132 pub wasm_backtrace: WasmBacktrace,
1133}
1134
1135#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
1136pub struct WasmBacktrace {
1137 pub frames: Vec<FrameInfo>,
1138}
1139
1140#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
1141pub struct FrameInfo {
1142 pub module: String,
1143 pub func_name: String,
1144 pub symbols: Vec<FrameSymbol>,
1145}
1146
1147#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
1148pub struct FrameSymbol {
1149 pub func_name: Option<String>,
1150 pub file: Option<String>,
1151 pub line: Option<u32>,
1152 pub col: Option<u32>,
1153}
1154
1155mod wasm_backtrace {
1156 use super::{FrameInfo, FrameSymbol, WasmBacktrace};
1157
1158 impl WasmBacktrace {
1159 pub fn maybe_from(backtrace: &wasmtime::WasmBacktrace) -> Option<Self> {
1160 if backtrace.frames().is_empty() {
1161 None
1162 } else {
1163 Some(Self {
1164 frames: backtrace.frames().iter().map(FrameInfo::from).collect(),
1165 })
1166 }
1167 }
1168 }
1169
1170 impl From<&wasmtime::FrameInfo> for FrameInfo {
1171 fn from(frame: &wasmtime::FrameInfo) -> Self {
1172 let module_name = frame.module().name().unwrap_or("<unknown>").to_string();
1173 let mut func_name = String::new();
1174 wasmtime_environ::demangle_function_name_or_index(
1175 &mut func_name,
1176 frame.func_name(),
1177 frame.func_index() as usize,
1178 )
1179 .expect("writing to string must succeed");
1180 Self {
1181 module: module_name,
1182 func_name,
1183 symbols: frame
1184 .symbols()
1185 .iter()
1186 .map(std::convert::Into::into)
1187 .collect(),
1188 }
1189 }
1190 }
1191
1192 impl From<&wasmtime::FrameSymbol> for FrameSymbol {
1193 fn from(symbol: &wasmtime::FrameSymbol) -> Self {
1194 let func_name = symbol.name().map(|name| {
1195 let mut writer = String::new();
1196 wasmtime_environ::demangle_function_name(&mut writer, name)
1197 .expect("writing to string must succeed");
1198 writer
1199 });
1200
1201 Self {
1202 func_name,
1203 file: symbol.file().map(ToString::to_string),
1204 line: symbol.line(),
1205 col: symbol.column(),
1206 }
1207 }
1208 }
1209}
1210
1211pub type ResponseCursorType = u32;
1212
1213#[derive(Debug, Clone, Serialize)]
1214pub struct ResponseWithCursor {
1215 pub event: JoinSetResponseEventOuter,
1216 pub cursor: ResponseCursorType,
1217}
1218
1219#[derive(Debug)]
1220pub struct ExecutionWithState {
1221 pub execution_id: ExecutionId,
1222 pub ffqn: FunctionFqn,
1223 pub pending_state: PendingState,
1224 pub created_at: DateTime<Utc>,
1225 pub first_scheduled_at: DateTime<Utc>,
1226 pub component_digest: InputContentDigest,
1227}
1228
1229#[derive(Debug, Clone)]
1230pub enum ExecutionListPagination {
1231 CreatedBy(Pagination<Option<DateTime<Utc>>>),
1232 ExecutionId(Pagination<Option<ExecutionId>>),
1233}
1234impl Default for ExecutionListPagination {
1235 fn default() -> ExecutionListPagination {
1236 ExecutionListPagination::CreatedBy(Pagination::OlderThan {
1237 length: 20,
1238 cursor: None,
1239 including_cursor: false, })
1241 }
1242}
1243impl ExecutionListPagination {
1244 #[must_use]
1245 pub fn length(&self) -> u16 {
1246 match self {
1247 ExecutionListPagination::CreatedBy(pagination) => pagination.length(),
1248 ExecutionListPagination::ExecutionId(pagination) => pagination.length(),
1249 }
1250 }
1251}
1252
1253#[derive(Debug, Clone, Copy)]
1254pub enum Pagination<T> {
1255 NewerThan {
1256 length: u16,
1257 cursor: T,
1258 including_cursor: bool,
1259 },
1260 OlderThan {
1261 length: u16,
1262 cursor: T,
1263 including_cursor: bool,
1264 },
1265}
1266impl<T> Pagination<T> {
1267 pub fn length(&self) -> u16 {
1268 match self {
1269 Pagination::NewerThan { length, .. } | Pagination::OlderThan { length, .. } => *length,
1270 }
1271 }
1272 pub fn rel(&self) -> &'static str {
1273 match self {
1274 Pagination::NewerThan {
1275 including_cursor: false,
1276 ..
1277 } => ">",
1278 Pagination::NewerThan {
1279 including_cursor: true,
1280 ..
1281 } => ">=",
1282 Pagination::OlderThan {
1283 including_cursor: false,
1284 ..
1285 } => "<",
1286 Pagination::OlderThan {
1287 including_cursor: true,
1288 ..
1289 } => "<=",
1290 }
1291 }
1292 pub fn is_desc(&self) -> bool {
1293 matches!(self, Pagination::OlderThan { .. })
1294 }
1295 pub fn cursor(&self) -> &T {
1296 match self {
1297 Pagination::NewerThan { cursor, .. } | Pagination::OlderThan { cursor, .. } => cursor,
1298 }
1299 }
1300}
1301
1302#[cfg(feature = "test")]
1303pub async fn wait_for_pending_state_fn<T: Debug>(
1304 db_connection: &dyn DbConnectionTest,
1305 execution_id: &ExecutionId,
1306 predicate: impl Fn(ExecutionLog) -> Option<T> + Send,
1307 timeout: Option<Duration>,
1308) -> Result<T, DbErrorReadWithTimeout> {
1309 tracing::trace!(%execution_id, "Waiting for predicate");
1310 let fut = async move {
1311 loop {
1312 let execution_log = db_connection.get(execution_id).await?;
1313 if let Some(t) = predicate(execution_log) {
1314 tracing::debug!(%execution_id, "Found: {t:?}");
1315 return Ok(t);
1316 }
1317 tokio::time::sleep(Duration::from_millis(10)).await;
1318 }
1319 };
1320
1321 if let Some(timeout) = timeout {
1322 tokio::select! { res = fut => res,
1324 () = tokio::time::sleep(timeout) => Err(DbErrorReadWithTimeout::Timeout(TimeoutOutcome::Timeout))
1325 }
1326 } else {
1327 fut.await
1328 }
1329}
1330
1331#[derive(Debug, Clone, PartialEq, Eq)]
1332pub enum ExpiredTimer {
1333 Lock(ExpiredLock),
1334 Delay(ExpiredDelay),
1335}
1336
1337#[derive(Debug, Clone, PartialEq, Eq)]
1338pub struct ExpiredLock {
1339 pub execution_id: ExecutionId,
1340 pub locked_at_version: Version,
1342 pub next_version: Version,
1343 pub intermittent_event_count: u32,
1345 pub max_retries: Option<u32>,
1346 pub retry_exp_backoff: Duration,
1347 pub locked_by: LockedBy,
1348}
1349
1350#[derive(Debug, Clone, PartialEq, Eq)]
1351pub struct ExpiredDelay {
1352 pub execution_id: ExecutionId,
1353 pub join_set_id: JoinSetId,
1354 pub delay_id: DelayId,
1355}
1356
1357#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize)]
1358#[serde(tag = "status")]
1359pub enum PendingState {
1360 Locked(PendingStateLocked),
1361 #[display("PendingAt(`{scheduled_at}`)")]
1362 PendingAt {
1363 scheduled_at: DateTime<Utc>,
1364 last_lock: Option<LockedBy>, component_id_input_digest: InputContentDigest,
1366 }, #[display("BlockedByJoinSet({join_set_id},`{lock_expires_at}`)")]
1368 BlockedByJoinSet {
1370 join_set_id: JoinSetId,
1371 lock_expires_at: DateTime<Utc>,
1373 closing: bool,
1375 component_id_input_digest: InputContentDigest,
1376 },
1377 #[display("Finished({finished})")]
1378 Finished {
1379 #[serde(flatten)]
1380 finished: PendingStateFinished,
1381 component_id_input_digest: InputContentDigest,
1382 },
1383}
1384impl PendingState {
1385 #[must_use]
1386 pub fn get_component_id_input_digest(&self) -> &InputContentDigest {
1387 match self {
1388 PendingState::Locked(pending_state_locked) => {
1389 &pending_state_locked.component_id_input_digest
1390 }
1391 PendingState::PendingAt {
1392 component_id_input_digest,
1393 ..
1394 }
1395 | PendingState::BlockedByJoinSet {
1396 component_id_input_digest,
1397 ..
1398 }
1399 | PendingState::Finished {
1400 component_id_input_digest,
1401 ..
1402 } => component_id_input_digest,
1403 }
1404 }
1405}
1406
1407#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize)]
1408#[display("Locked(`{lock_expires_at}`, {}, {})", locked_by.executor_id, locked_by.run_id)]
1409pub struct PendingStateLocked {
1410 pub locked_by: LockedBy,
1411 pub lock_expires_at: DateTime<Utc>,
1412 pub component_id_input_digest: InputContentDigest,
1413}
1414
1415#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1416pub struct LockedBy {
1417 pub executor_id: ExecutorId,
1418 pub run_id: RunId,
1419}
1420
1421#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
1422pub struct PendingStateFinished {
1423 pub version: VersionType, pub finished_at: DateTime<Utc>,
1425 pub result_kind: PendingStateFinishedResultKind,
1426}
1427impl Display for PendingStateFinished {
1428 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1429 match self.result_kind {
1430 PendingStateFinishedResultKind::Ok => write!(f, "ok"),
1431 PendingStateFinishedResultKind::Err(err) => write!(f, "{err}"),
1432 }
1433 }
1434}
1435
1436#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
1438#[serde(rename_all = "snake_case")]
1439pub enum PendingStateFinishedResultKind {
1440 Ok,
1441 Err(PendingStateFinishedError),
1442}
1443impl PendingStateFinishedResultKind {
1444 pub fn as_result(&self) -> Result<(), &PendingStateFinishedError> {
1445 match self {
1446 PendingStateFinishedResultKind::Ok => Ok(()),
1447 PendingStateFinishedResultKind::Err(err) => Err(err),
1448 }
1449 }
1450}
1451
1452impl From<&SupportedFunctionReturnValue> for PendingStateFinishedResultKind {
1453 fn from(result: &SupportedFunctionReturnValue) -> Self {
1454 result.as_pending_state_finished_result()
1455 }
1456}
1457
1458#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, derive_more::Display)]
1459pub enum PendingStateFinishedError {
1460 #[display("execution terminated: {_0}")]
1461 ExecutionFailure(ExecutionFailureKind),
1462 #[display("execution completed with an error")]
1463 FallibleError,
1464}
1465
1466impl PendingState {
1467 pub fn can_append_lock(
1468 &self,
1469 created_at: DateTime<Utc>,
1470 executor_id: ExecutorId,
1471 run_id: RunId,
1472 lock_expires_at: DateTime<Utc>,
1473 ) -> Result<LockKind, DbErrorWriteNonRetriable> {
1474 if lock_expires_at <= created_at {
1475 return Err(DbErrorWriteNonRetriable::ValidationFailed(
1476 "invalid expiry date".into(),
1477 ));
1478 }
1479 match self {
1480 PendingState::PendingAt {
1481 scheduled_at,
1482 last_lock,
1483 component_id_input_digest: _,
1484 } => {
1485 if *scheduled_at <= created_at {
1486 Ok(LockKind::CreatingNewLock)
1488 } else if let Some(LockedBy {
1489 executor_id: last_executor_id,
1490 run_id: last_run_id,
1491 }) = last_lock
1492 && executor_id == *last_executor_id
1493 && run_id == *last_run_id
1494 {
1495 Ok(LockKind::Extending)
1497 } else {
1498 Err(DbErrorWriteNonRetriable::ValidationFailed(
1499 "cannot lock, not yet pending".into(),
1500 ))
1501 }
1502 }
1503 PendingState::Locked(PendingStateLocked {
1504 locked_by:
1505 LockedBy {
1506 executor_id: current_pending_state_executor_id,
1507 run_id: current_pending_state_run_id,
1508 },
1509 lock_expires_at: _,
1510 component_id_input_digest: _,
1511 }) => {
1512 if executor_id == *current_pending_state_executor_id
1513 && run_id == *current_pending_state_run_id
1514 {
1515 Ok(LockKind::Extending)
1517 } else {
1518 Err(DbErrorWriteNonRetriable::IllegalState(
1519 "cannot lock, already locked".into(),
1520 ))
1521 }
1522 }
1523 PendingState::BlockedByJoinSet { .. } => Err(DbErrorWriteNonRetriable::IllegalState(
1524 "cannot append Locked event when in BlockedByJoinSet state".into(),
1525 )),
1526 PendingState::Finished { .. } => Err(DbErrorWriteNonRetriable::IllegalState(
1527 "already finished".into(),
1528 )),
1529 }
1530 }
1531
1532 #[must_use]
1533 pub fn is_finished(&self) -> bool {
1534 matches!(self, PendingState::Finished { .. })
1535 }
1536}
1537
1538#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1539pub enum LockKind {
1540 Extending,
1541 CreatingNewLock,
1542}
1543
1544pub mod http_client_trace {
1545 use chrono::{DateTime, Utc};
1546 use serde::{Deserialize, Serialize};
1547
1548 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1549 pub struct HttpClientTrace {
1550 pub req: RequestTrace,
1551 pub resp: Option<ResponseTrace>,
1552 }
1553
1554 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1555 pub struct RequestTrace {
1556 pub sent_at: DateTime<Utc>,
1557 pub uri: String,
1558 pub method: String,
1559 }
1560
1561 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1562 pub struct ResponseTrace {
1563 pub finished_at: DateTime<Utc>,
1564 pub status: Result<u16, String>,
1565 }
1566}
1567
1568#[cfg(test)]
1569mod tests {
1570 use super::HistoryEventScheduleAt;
1571 use super::PendingStateFinished;
1572 use super::PendingStateFinishedError;
1573 use super::PendingStateFinishedResultKind;
1574 use crate::ExecutionFailureKind;
1575 use crate::SupportedFunctionReturnValue;
1576 use chrono::DateTime;
1577 use chrono::Datelike;
1578 use chrono::Utc;
1579 use insta::assert_snapshot;
1580 use rstest::rstest;
1581 use std::time::Duration;
1582 use val_json::type_wrapper::TypeWrapper;
1583 use val_json::wast_val::WastVal;
1584 use val_json::wast_val::WastValWithType;
1585
1586 #[rstest(expected => [
1587 PendingStateFinishedResultKind::Ok,
1588 PendingStateFinishedResultKind::Err(PendingStateFinishedError::ExecutionFailure(ExecutionFailureKind::TimedOut)),
1589 ])]
1590 #[test]
1591 fn serde_pending_state_finished_result_kind_should_work(
1592 expected: PendingStateFinishedResultKind,
1593 ) {
1594 let ser = serde_json::to_string(&expected).unwrap();
1595 let actual: PendingStateFinishedResultKind = serde_json::from_str(&ser).unwrap();
1596 assert_eq!(expected, actual);
1597 }
1598
1599 #[rstest(result_kind => [
1600 PendingStateFinishedResultKind::Ok,
1601 PendingStateFinishedResultKind::Err(PendingStateFinishedError::ExecutionFailure(ExecutionFailureKind::TimedOut)),
1602 ])]
1603 #[test]
1604 fn serde_pending_state_finished_should_work(result_kind: PendingStateFinishedResultKind) {
1605 let expected = PendingStateFinished {
1606 version: 0,
1607 finished_at: Utc::now(),
1608 result_kind,
1609 };
1610
1611 let ser = serde_json::to_string(&expected).unwrap();
1612 let actual: PendingStateFinished = serde_json::from_str(&ser).unwrap();
1613 assert_eq!(expected, actual);
1614 }
1615
1616 #[test]
1617 fn join_set_deser_with_result_ok_option_none_should_work() {
1618 let expected = SupportedFunctionReturnValue::Ok {
1619 ok: Some(WastValWithType {
1620 r#type: TypeWrapper::Result {
1621 ok: Some(Box::new(TypeWrapper::Option(Box::new(TypeWrapper::String)))),
1622 err: Some(Box::new(TypeWrapper::String)),
1623 },
1624 value: WastVal::Result(Ok(Some(Box::new(WastVal::Option(None))))),
1625 }),
1626 };
1627 let json = serde_json::to_string(&expected).unwrap();
1628 assert_snapshot!(json);
1629
1630 let actual: SupportedFunctionReturnValue = serde_json::from_str(&json).unwrap();
1631
1632 assert_eq!(expected, actual);
1633 }
1634
1635 #[test]
1636 fn as_date_time_should_work_with_duration_u32_max_secs() {
1637 let duration = Duration::from_secs(u64::from(u32::MAX));
1638 let schedule_at = HistoryEventScheduleAt::In(duration);
1639 let resolved = schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap();
1640 assert_eq!(2106, resolved.year());
1641 }
1642
1643 const MILLIS_PER_SEC: i64 = 1000;
1644 const TIMEDELTA_MAX_SECS: i64 = i64::MAX / MILLIS_PER_SEC;
1645
1646 #[test]
1647 fn as_date_time_should_fail_on_duration_secs_greater_than_i64_max() {
1648 let duration = Duration::from_secs(
1650 u64::try_from(TIMEDELTA_MAX_SECS).expect("positive number must not fail") + 1,
1651 );
1652 let schedule_at = HistoryEventScheduleAt::In(duration);
1653 schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap_err();
1654 }
1655}