1use crate::ComponentId;
2use crate::ComponentRetryConfig;
3use crate::ExecutionFailureKind;
4use crate::ExecutionId;
5use crate::ExecutionMetadata;
6use crate::FinishedExecutionError;
7use crate::FunctionFqn;
8use crate::JoinSetId;
9use crate::Params;
10use crate::StrVariant;
11use crate::SupportedFunctionReturnValue;
12use crate::prefixed_ulid::DelayId;
13use crate::prefixed_ulid::ExecutionIdDerived;
14use crate::prefixed_ulid::ExecutorId;
15use crate::prefixed_ulid::RunId;
16use assert_matches::assert_matches;
17use async_trait::async_trait;
18use chrono::TimeDelta;
19use chrono::{DateTime, Utc};
20use http_client_trace::HttpClientTrace;
21use serde::Deserialize;
22use serde::Serialize;
23use std::fmt::Debug;
24use std::fmt::Display;
25use std::pin::Pin;
26use std::sync::Arc;
27use std::time::Duration;
28use strum::IntoStaticStr;
29use tracing::debug;
30use tracing::error;
31
/// Snapshot of a single execution: its identifier, events, join set responses,
/// next version and pending state.
///
/// `Serialize` is derived only when the `test` feature is enabled.
#[derive(Debug, PartialEq, Eq)]
#[cfg_attr(feature = "test", derive(Serialize))]
pub struct ExecutionLog {
    /// Identifier of the execution this log belongs to.
    pub execution_id: ExecutionId,
    /// Events of the execution. The accessors on this type expect the list to be
    /// non-empty and to start with `ExecutionEventInner::Created`.
    pub events: Vec<ExecutionEvent>,
    /// Join set responses recorded for this execution.
    pub responses: Vec<JoinSetResponseEventOuter>,
    /// NOTE(review): presumably the version the next appended event must use — confirm.
    pub next_version: Version,
    /// Pending state of the execution.
    pub pending_state: PendingState,
}
42
43impl ExecutionLog {
44 #[must_use]
45 pub fn can_be_retried_after(
46 temporary_event_count: u32,
47 max_retries: u32,
48 retry_exp_backoff: Duration,
49 ) -> Option<Duration> {
50 if temporary_event_count <= max_retries {
52 let duration = retry_exp_backoff * 2_u32.saturating_pow(temporary_event_count - 1);
54 Some(duration)
55 } else {
56 None
57 }
58 }
59
60 #[must_use]
61 pub fn compute_retry_duration_when_retrying_forever(
62 temporary_event_count: u32,
63 retry_exp_backoff: Duration,
64 ) -> Duration {
65 Self::can_be_retried_after(temporary_event_count, u32::MAX, retry_exp_backoff)
66 .expect("`max_retries` set to MAX must never return None")
67 }
68
69 #[must_use]
70 pub fn ffqn(&self) -> &FunctionFqn {
71 assert_matches!(self.events.first(), Some(ExecutionEvent {
72 event: ExecutionEventInner::Created { ffqn, .. },
73 ..
74 }) => ffqn)
75 }
76
77 #[must_use]
78 pub fn parent(&self) -> Option<(ExecutionId, JoinSetId)> {
79 assert_matches!(self.events.first(), Some(ExecutionEvent {
80 event: ExecutionEventInner::Created { parent, .. },
81 ..
82 }) => parent.clone())
83 }
84
85 #[must_use]
86 pub fn last_event(&self) -> &ExecutionEvent {
87 self.events.last().expect("must contain at least one event")
88 }
89
90 #[must_use]
91 pub fn into_finished_result(mut self) -> Option<SupportedFunctionReturnValue> {
92 if let ExecutionEvent {
93 event: ExecutionEventInner::Finished { result, .. },
94 ..
95 } = self.events.pop().expect("must contain at least one event")
96 {
97 Some(result)
98 } else {
99 None
100 }
101 }
102
103 #[cfg(feature = "test")]
104 pub fn event_history(&self) -> impl Iterator<Item = (HistoryEvent, Version)> + '_ {
105 self.events.iter().filter_map(|event| {
106 if let ExecutionEventInner::HistoryEvent { event: eh, .. } = &event.event {
107 Some((eh.clone(), event.version.clone()))
108 } else {
109 None
110 }
111 })
112 }
113
114 #[cfg(feature = "test")]
115 #[must_use]
116 pub fn find_join_set_request(&self, join_set_id: &JoinSetId) -> Option<&JoinSetRequest> {
117 self.events
118 .iter()
119 .find_map(move |event| match &event.event {
120 ExecutionEventInner::HistoryEvent {
121 event:
122 HistoryEvent::JoinSetRequest {
123 join_set_id: found,
124 request,
125 },
126 ..
127 } if *join_set_id == *found => Some(request),
128 _ => None,
129 })
130 }
131}
132
/// Integer type backing [`Version`].
pub type VersionType = u32;
/// Newtype around [`VersionType`].
///
/// Serialized transparently, i.e. as the bare inner integer.
#[derive(
    Debug,
    Default,
    Clone,
    PartialEq,
    Eq,
    Hash,
    derive_more::Display,
    derive_more::Into,
    serde::Serialize,
    serde::Deserialize,
)]
#[serde(transparent)]
pub struct Version(pub VersionType);
148impl Version {
149 #[must_use]
150 pub fn new(arg: VersionType) -> Version {
151 Version(arg)
152 }
153
154 #[must_use]
155 pub fn increment(&self) -> Version {
156 Version(self.0 + 1)
157 }
158}
159
/// A single event of an execution together with its metadata.
///
/// The `Display` implementation prints only the inner `event`.
#[derive(
    Clone, Debug, derive_more::Display, PartialEq, Eq, serde::Serialize, serde::Deserialize,
)]
#[display("{event}")]
pub struct ExecutionEvent {
    /// Timestamp attached to the event.
    pub created_at: DateTime<Utc>,
    /// The event payload.
    pub event: ExecutionEventInner,
    /// NOTE(review): presumably identifies the backtrace associated with this event, if any — confirm.
    pub backtrace_id: Option<Version>,
    /// Version of this event.
    pub version: Version,
}
170
/// A [`JoinSetResponseEvent`] together with its timestamp.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct JoinSetResponseEventOuter {
    /// Timestamp attached to the response.
    pub created_at: DateTime<Utc>,
    /// The response payload.
    pub event: JoinSetResponseEvent,
}
177
/// A response paired with the join set it belongs to.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct JoinSetResponseEvent {
    /// Join set the response belongs to.
    pub join_set_id: JoinSetId,
    /// The response itself.
    pub event: JoinSetResponse,
}
183
/// Response delivered to a join set.
///
/// Serialized with an internal `type` tag. When `arbitrary::Arbitrary` is derived
/// (tests / `test` feature), `finished_version` and `result` use fixed values.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type")]
pub enum JoinSetResponse {
    /// A delay has finished.
    DelayFinished {
        /// Identifier of the delay.
        delay_id: DelayId,
        /// Outcome of the delay; `cancel_delay` in this file reports a cancellation as `Err(())`.
        result: Result<(), ()>,
    },
    /// A child execution has finished.
    ChildExecutionFinished {
        /// Identifier of the finished child execution.
        child_execution_id: ExecutionIdDerived,
        /// Version of the child's `Finished` event.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Version(2)))]
        finished_version: Version,
        /// Return value of the child execution.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
        result: SupportedFunctionReturnValue,
    },
}
200
/// Placeholder `Created` event: empty function name, empty params, no parent,
/// a `scheduled_at` of 0 nanoseconds, the dummy activity component id, empty
/// metadata and no scheduler.
pub const DUMMY_CREATED: ExecutionEventInner = ExecutionEventInner::Created {
    ffqn: FunctionFqn::new_static("", ""),
    params: Params::empty(),
    parent: None,
    scheduled_at: DateTime::from_timestamp_nanos(0),
    component_id: ComponentId::dummy_activity(),
    metadata: ExecutionMetadata::empty(),
    scheduled_by: None,
};
/// Placeholder `HistoryEvent` event: a `JoinSetCreate` for a one-off join set
/// with an empty name.
pub const DUMMY_HISTORY_EVENT: ExecutionEventInner = ExecutionEventInner::HistoryEvent {
    event: HistoryEvent::JoinSetCreate {
        join_set_id: JoinSetId {
            kind: crate::JoinSetKind::OneOff,
            name: StrVariant::empty(),
        },
    },
};
218
/// Payload of an [`ExecutionEvent`].
///
/// `IntoStaticStr` is derived so that the variant name is available as a
/// `&'static str` (see `ExecutionEventInner::variant`).
#[derive(
    Clone,
    derive_more::Debug,
    derive_more::Display,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    IntoStaticStr,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[allow(clippy::large_enum_variant)]
pub enum ExecutionEventInner {
    /// Creation of the execution. `get_create_request` reads this event at version 0.
    #[display("Created({ffqn}, `{scheduled_at}`)")]
    Created {
        /// Function to be executed.
        ffqn: FunctionFqn,
        /// Parameters of the function; omitted from `Debug` output.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
        #[debug(skip)]
        params: Params,
        /// Parent execution and the join set this execution belongs to, if any.
        parent: Option<(ExecutionId, JoinSetId)>,
        /// Time the execution is scheduled at.
        scheduled_at: DateTime<Utc>,
        /// Component associated with the execution.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
        component_id: ComponentId,
        /// Metadata attached to the execution.
        #[cfg_attr(any(test, feature = "test"), arbitrary(default))]
        metadata: ExecutionMetadata,
        /// NOTE(review): presumably the execution that scheduled this one — confirm.
        scheduled_by: Option<ExecutionId>,
    },
    /// The execution was locked; see [`Locked`].
    Locked(Locked),
    /// The execution was unlocked.
    #[display("Unlocked(`{backoff_expires_at}`)")]
    Unlocked {
        /// NOTE(review): presumably the earliest time the execution may be picked up again — confirm.
        backoff_expires_at: DateTime<Utc>,
        /// Reason for unlocking.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
        reason: StrVariant,
    },
    /// Temporary failure; counted by `is_temporary_event`.
    #[display("TemporarilyFailed(`{backoff_expires_at}`)")]
    TemporarilyFailed {
        /// NOTE(review): presumably the earliest time of the next attempt — confirm.
        backoff_expires_at: DateTime<Utc>,
        /// Reason of the failure.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
        reason: StrVariant,
        /// Optional additional detail.
        detail: Option<String>,
        /// Optional HTTP client traces.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },
    /// Temporary timeout; counted by `is_temporary_event`.
    #[display("TemporarilyTimedOut(`{backoff_expires_at}`)")]
    TemporarilyTimedOut {
        /// NOTE(review): presumably the earliest time of the next attempt — confirm.
        backoff_expires_at: DateTime<Utc>,
        /// Optional HTTP client traces.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },
    /// The execution finished with `result`.
    #[display("Finished")]
    Finished {
        /// Return value of the execution.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
        result: SupportedFunctionReturnValue,
        /// Optional HTTP client traces.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },

    /// Wrapper around a [`HistoryEvent`].
    #[display("HistoryEvent({event})")]
    HistoryEvent {
        /// The wrapped history event.
        event: HistoryEvent,
    },
}
290
291impl ExecutionEventInner {
292 #[must_use]
293 pub fn is_temporary_event(&self) -> bool {
294 matches!(
295 self,
296 Self::TemporarilyFailed { .. } | Self::TemporarilyTimedOut { .. }
297 )
298 }
299
300 #[must_use]
301 pub fn variant(&self) -> &'static str {
302 Into::<&'static str>::into(self)
303 }
304
305 #[must_use]
306 pub fn join_set_id(&self) -> Option<&JoinSetId> {
307 match self {
308 Self::Created {
309 parent: Some((_parent_id, join_set_id)),
310 ..
311 } => Some(join_set_id),
312 Self::HistoryEvent {
313 event:
314 HistoryEvent::JoinSetCreate { join_set_id, .. }
315 | HistoryEvent::JoinSetRequest { join_set_id, .. }
316 | HistoryEvent::JoinNext { join_set_id, .. },
317 } => Some(join_set_id),
318 _ => None,
319 }
320 }
321}
322
/// Payload of the `ExecutionEventInner::Locked` event.
///
/// Displayed as ``Locked(`lock_expires_at`, component_id)``.
#[derive(
    Clone, derive_more::Debug, derive_more::Display, PartialEq, Eq, Serialize, Deserialize,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[display("Locked(`{lock_expires_at}`, {component_id})")]
pub struct Locked {
    /// Component associated with the lock.
    #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
    pub component_id: ComponentId,
    /// Executor holding the lock.
    pub executor_id: ExecutorId,
    /// Run identifier associated with the lock.
    pub run_id: RunId,
    /// Time at which the lock expires.
    pub lock_expires_at: DateTime<Utc>,
    /// Retry configuration recorded together with the lock.
    #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentRetryConfig::ZERO))]
    pub retry_config: ComponentRetryConfig,
}
337
/// Kind of a persisted value; serialized with an internal `type` tag.
#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type")]
pub enum PersistKind {
    /// NOTE(review): presumably a random `u64` drawn from `min..=max_inclusive` — confirm.
    #[display("RandomU64({min}, {max_inclusive})")]
    RandomU64 { min: u64, max_inclusive: u64 },
    /// NOTE(review): presumably a random string whose length lies in
    /// `min_length..max_length_exclusive` — confirm.
    #[display("RandomString({min_length}, {max_length_exclusive})")]
    RandomString {
        min_length: u64,
        max_length_exclusive: u64,
    },
}
350
/// Encodes `value` as 8 bytes in big-endian order.
#[must_use]
pub fn from_u64_to_bytes(value: u64) -> [u8; 8] {
    u64::to_be_bytes(value)
}
355
/// Decodes 8 big-endian bytes into a `u64`.
#[must_use]
pub fn from_bytes_to_u64(bytes: [u8; 8]) -> u64 {
    let decoded = u64::from_be_bytes(bytes);
    decoded
}
360
/// Event stored inside `ExecutionEventInner::HistoryEvent`; serialized with an
/// internal `type` tag.
#[derive(
    derive_more::Debug, Clone, PartialEq, Eq, derive_more::Display, Serialize, Deserialize,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type")]
pub enum HistoryEvent {
    /// A persisted value of the given kind.
    #[display("Persist")]
    Persist {
        /// Raw bytes of the value; omitted from `Debug` output.
        #[debug(skip)]
        value: Vec<u8>,
        /// Kind of the persisted value.
        kind: PersistKind,
    },
    /// Creation of a join set.
    #[display("JoinSetCreate({join_set_id})")]
    JoinSetCreate { join_set_id: JoinSetId },
    /// A request submitted to a join set.
    #[display("JoinSetRequest({request})")]
    JoinSetRequest {
        /// Join set receiving the request.
        join_set_id: JoinSetId,
        /// The request itself.
        request: JoinSetRequest,
    },
    /// NOTE(review): presumably a request for the next response of the join set — confirm.
    #[display("JoinNext({join_set_id})")]
    JoinNext {
        join_set_id: JoinSetId,
        /// NOTE(review): semantics not visible here; presumably the expiry of the current run — confirm.
        run_expires_at: DateTime<Utc>,
        /// Optional function name associated with the request.
        requested_ffqn: Option<FunctionFqn>,
        /// NOTE(review): presumably set when the join set is being closed — confirm.
        closing: bool,
    },
    /// NOTE(review): presumably recorded when more responses were awaited than requests submitted — confirm.
    #[display("JoinNextTooMany({join_set_id})")]
    JoinNextTooMany {
        join_set_id: JoinSetId,
        /// Optional function name associated with the request.
        requested_ffqn: Option<FunctionFqn>,
    },
    /// Scheduling of another execution.
    #[display("Schedule({execution_id}, {schedule_at})")]
    Schedule {
        /// Identifier of the scheduled execution.
        execution_id: ExecutionId,
        /// When the execution should be scheduled.
        schedule_at: HistoryEventScheduleAt,
    },
    /// NOTE(review): presumably a stubbed result for `target_execution_id` — confirm.
    #[display("Stub({target_execution_id})")]
    Stub {
        target_execution_id: ExecutionIdDerived,
        /// The stubbed return value.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
        result: SupportedFunctionReturnValue,
        /// NOTE(review): presumably whether persisting the stub result succeeded — confirm.
        persist_result: Result<(), ()>,
    },
}
421
/// Point in time at which something should be scheduled; resolved to a concrete
/// timestamp by `HistoryEventScheduleAt::as_date_time`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
pub enum HistoryEventScheduleAt {
    /// Resolves to the `now` passed to `as_date_time`.
    Now,
    /// Resolves to the contained absolute timestamp.
    #[display("At(`{_0}`)")]
    At(DateTime<Utc>),
    /// Resolves to `now` plus the contained duration.
    #[display("In({_0:?})")]
    In(Duration),
}
431
/// Error returned by `HistoryEventScheduleAt::as_date_time`.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum ScheduleAtConversionError {
    /// The duration could not be converted to a `TimeDelta`, or adding it to
    /// `now` was rejected by `checked_add_signed`.
    #[error("source duration value is out of range")]
    OutOfRangeError,
}
437
438impl HistoryEventScheduleAt {
439 pub fn as_date_time(
440 &self,
441 now: DateTime<Utc>,
442 ) -> Result<DateTime<Utc>, ScheduleAtConversionError> {
443 match self {
444 Self::Now => Ok(now),
445 Self::At(date_time) => Ok(*date_time),
446 Self::In(duration) => {
447 let time_delta = TimeDelta::from_std(*duration)
448 .map_err(|_| ScheduleAtConversionError::OutOfRangeError)?;
449 now.checked_add_signed(time_delta)
450 .ok_or(ScheduleAtConversionError::OutOfRangeError)
451 }
452 }
453 }
454}
455
/// Request submitted to a join set; serialized with an internal `type` tag.
#[derive(Clone, Debug, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type")]
pub enum JoinSetRequest {
    /// Request for a delay.
    #[display("DelayRequest({delay_id}, expires_at: `{expires_at}`, schedule_at: `{schedule_at}`)")]
    DelayRequest {
        /// Identifier of the delay.
        delay_id: DelayId,
        /// Time at which the delay expires.
        expires_at: DateTime<Utc>,
        /// The schedule the delay was requested with.
        schedule_at: HistoryEventScheduleAt,
    },
    /// Request for a child execution.
    #[display("ChildExecutionRequest({child_execution_id}, {target_ffqn}, params: {params})")]
    ChildExecutionRequest {
        /// Identifier of the child execution.
        child_execution_id: ExecutionIdDerived,
        /// Function the child should execute.
        target_ffqn: FunctionFqn,
        /// Parameters passed to the child.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
        params: Params,
    },
}
476
/// Errors shared by read and write database operations.
#[derive(Debug, Clone, thiserror::Error, PartialEq)]
pub enum DbErrorGeneric {
    /// Any database error without a more specific category; carries a message.
    #[error("database error: {0}")]
    Uncategorized(StrVariant),
    /// The database was closed.
    #[error("database was closed")]
    Close,
}
485
/// Write errors classified as non-retriable.
#[derive(thiserror::Error, Clone, Debug, PartialEq, Eq)]
pub enum DbErrorWriteNonRetriable {
    /// Validation of the request failed.
    #[error("validation failed: {0}")]
    ValidationFailed(StrVariant),
    /// The write is illegal in the current state. `cancel_delay` in this file maps
    /// this error to `CancelOutcome::AlreadyFinished`.
    #[error("illegal state: {0}")]
    IllegalState(StrVariant),
    /// The requested version does not match the expected one.
    #[error("version conflict: expected: {expected}, got: {requested}")]
    VersionConflict {
        expected: Version,
        requested: Version,
    },
}
498
/// Error returned by database write operations.
#[derive(Debug, Clone, thiserror::Error, PartialEq)]
pub enum DbErrorWrite {
    /// The targeted row does not exist.
    #[error("cannot write - row not found")]
    NotFound,
    /// A non-retriable error; convertible via `From<DbErrorWriteNonRetriable>`.
    #[error("non-retriable error: {0}")]
    NonRetriable(#[from] DbErrorWriteNonRetriable),
    /// A generic database error; displayed transparently.
    #[error(transparent)]
    Generic(#[from] DbErrorGeneric),
}
509
/// Error returned by database read operations.
#[derive(Debug, Clone, thiserror::Error, PartialEq)]
pub enum DbErrorRead {
    /// The requested row does not exist.
    #[error("cannot read - row not found")]
    NotFound,
    /// A generic database error; displayed transparently.
    #[error(transparent)]
    Generic(#[from] DbErrorGeneric),
}
518
519impl From<DbErrorRead> for DbErrorWrite {
520 fn from(value: DbErrorRead) -> DbErrorWrite {
521 match value {
522 DbErrorRead::NotFound => DbErrorWrite::NotFound,
523 DbErrorRead::Generic(err) => DbErrorWrite::Generic(err),
524 }
525 }
526}
527
/// Error returned by read operations that can also time out.
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum DbErrorReadWithTimeout {
    /// The operation timed out.
    #[error("timeout")]
    Timeout,
    /// The underlying read failed; displayed transparently.
    #[error(transparent)]
    DbErrorRead(#[from] DbErrorRead),
}
535impl From<DbErrorGeneric> for DbErrorReadWithTimeout {
536 fn from(value: DbErrorGeneric) -> DbErrorReadWithTimeout {
537 Self::from(DbErrorRead::from(value))
538 }
539}
540
/// Value returned by a successful append.
/// NOTE(review): presumably the next version of the execution — confirm.
pub type AppendResponse = Version;
/// Tuple of execution id, version, params and an optional timestamp.
/// NOTE(review): meaning of the timestamp is not visible here — confirm.
pub type PendingExecution = (ExecutionId, Version, Params, Option<DateTime<Utc>>);
545
/// Data about a locked execution, as returned by `DbExecutor::lock_pending`
/// and `DbExecutor::lock_one`.
#[derive(Debug, Clone)]
pub struct LockedExecution {
    /// Identifier of the locked execution.
    pub execution_id: ExecutionId,
    /// NOTE(review): presumably the version to use for the next append — confirm.
    pub next_version: Version,
    /// Metadata of the execution.
    pub metadata: ExecutionMetadata,
    /// The lock event.
    pub locked_event: Locked,
    /// Function to execute.
    pub ffqn: FunctionFqn,
    /// Parameters of the function.
    pub params: Params,
    /// History events paired with their versions.
    pub event_history: Vec<(HistoryEvent, Version)>,
    /// Join set responses of the execution.
    pub responses: Vec<JoinSetResponseEventOuter>,
    /// Parent execution and join set, if any.
    pub parent: Option<(ExecutionId, JoinSetId)>,
    /// NOTE(review): presumably the number of temporary failure/timeout events so far — confirm.
    pub intermittent_event_count: u32,
}
559
/// Executions returned by `DbExecutor::lock_pending`.
pub type LockPendingResponse = Vec<LockedExecution>;
/// Value returned by a successful batch append.
/// NOTE(review): presumably the next version of the execution — confirm.
pub type AppendBatchResponse = Version;
562
/// An event to be appended together with its timestamp.
///
/// The `Display` implementation prints only the inner `event`.
#[derive(Debug, Clone, derive_more::Display, Serialize, Deserialize)]
#[display("{event}")]
pub struct AppendRequest {
    /// Timestamp of the event.
    pub created_at: DateTime<Utc>,
    /// The event to append.
    pub event: ExecutionEventInner,
}
569
/// Request to create a new execution.
///
/// All fields except `created_at` and `execution_id` are carried over into
/// `ExecutionEventInner::Created` by the `From<CreateRequest>` conversion.
#[derive(Debug, Clone)]
pub struct CreateRequest {
    /// Timestamp of the creation.
    pub created_at: DateTime<Utc>,
    /// Identifier of the execution to create.
    pub execution_id: ExecutionId,
    /// Function to execute.
    pub ffqn: FunctionFqn,
    /// Parameters of the function.
    pub params: Params,
    /// Parent execution and join set, if any.
    pub parent: Option<(ExecutionId, JoinSetId)>,
    /// Time the execution is scheduled at.
    pub scheduled_at: DateTime<Utc>,
    /// Component associated with the execution.
    pub component_id: ComponentId,
    /// Metadata attached to the execution.
    pub metadata: ExecutionMetadata,
    /// NOTE(review): presumably the execution that scheduled this one — confirm.
    pub scheduled_by: Option<ExecutionId>,
}
582
583impl From<CreateRequest> for ExecutionEventInner {
584 fn from(value: CreateRequest) -> Self {
585 Self::Created {
586 ffqn: value.ffqn,
587 params: value.params,
588 parent: value.parent,
589 scheduled_at: value.scheduled_at,
590 component_id: value.component_id,
591 metadata: value.metadata,
592 scheduled_by: value.scheduled_by,
593 }
594 }
595}
596
/// Source of database connections.
#[async_trait]
pub trait DbPool: Send + Sync {
    /// Returns a boxed [`DbConnection`].
    fn connection(&self) -> Box<dyn DbConnection>;
}
601
/// A pool that can be closed.
#[async_trait]
pub trait DbPoolCloseable {
    /// Closes the pool, consuming it.
    async fn close(self);
}
606
/// Batch of events to append to one execution; used by
/// `DbExecutor::append_batch_respond_to_parent`.
#[derive(Clone, Debug)]
pub struct AppendEventsToExecution {
    /// Execution receiving the events.
    pub execution_id: ExecutionId,
    /// Version passed along with the batch; `cancel_activity` uses the version
    /// following the last event.
    pub version: Version,
    /// Events to append.
    pub batch: Vec<AppendRequest>,
}
613
/// Response about a finished child to be delivered to its parent; used by
/// `DbExecutor::append_batch_respond_to_parent`.
#[derive(Clone, Debug)]
pub struct AppendResponseToExecution {
    /// Parent execution receiving the response.
    pub parent_execution_id: ExecutionId,
    /// Timestamp of the response.
    pub created_at: DateTime<Utc>,
    /// Join set of the parent the child belongs to.
    pub join_set_id: JoinSetId,
    /// Identifier of the finished child execution.
    pub child_execution_id: ExecutionIdDerived,
    /// Version of the child's `Finished` event.
    pub finished_version: Version,
    /// Return value of the child.
    pub result: SupportedFunctionReturnValue,
}
623
/// Database operations for locking executions and appending events.
///
/// Semantics are defined by the implementations; the notes below are derived
/// from the signatures only.
#[async_trait]
pub trait DbExecutor: Send + Sync {
    /// NOTE(review): presumably locks up to `batch_size` executions of the given
    /// `ffqns` that are pending at or before `pending_at_or_sooner` — confirm.
    #[expect(clippy::too_many_arguments)]
    async fn lock_pending(
        &self,
        batch_size: usize,
        pending_at_or_sooner: DateTime<Utc>,
        ffqns: Arc<[FunctionFqn]>,
        created_at: DateTime<Utc>,
        component_id: ComponentId,
        executor_id: ExecutorId,
        lock_expires_at: DateTime<Utc>,
        run_id: RunId,
        retry_config: ComponentRetryConfig,
    ) -> Result<LockPendingResponse, DbErrorGeneric>;

    /// NOTE(review): presumably locks the single execution `execution_id` at `version` — confirm.
    #[expect(clippy::too_many_arguments)]
    async fn lock_one(
        &self,
        created_at: DateTime<Utc>,
        component_id: ComponentId,
        execution_id: &ExecutionId,
        run_id: RunId,
        version: Version,
        executor_id: ExecutorId,
        lock_expires_at: DateTime<Utc>,
        retry_config: ComponentRetryConfig,
    ) -> Result<LockedExecution, DbErrorWrite>;

    /// Appends a single event to `execution_id` at `version`.
    async fn append(
        &self,
        execution_id: ExecutionId,
        version: Version,
        req: AppendRequest,
    ) -> Result<AppendResponse, DbErrorWrite>;

    /// Appends `events` to a child execution and `response` to its parent.
    /// NOTE(review): presumably performed atomically — confirm against implementations.
    async fn append_batch_respond_to_parent(
        &self,
        events: AppendEventsToExecution,
        response: AppendResponseToExecution,
        current_time: DateTime<Utc>,
    ) -> Result<AppendBatchResponse, DbErrorWrite>;

    /// NOTE(review): presumably returns once a matching pending execution exists or
    /// `timeout_fut` completes — confirm.
    async fn wait_for_pending(
        &self,
        pending_at_or_sooner: DateTime<Utc>,
        ffqns: Arc<[FunctionFqn]>,
        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
    );
}
683
/// Full set of database operations; extends [`DbExecutor`].
///
/// Only `get_create_request` and `get_finished_result` have default
/// implementations. Notes on the remaining methods are derived from their
/// signatures only.
#[async_trait]
pub trait DbConnection: DbExecutor {
    /// Appends a join set response to `execution_id` (only with the `test` feature).
    #[cfg(feature = "test")]
    async fn append_response(
        &self,
        created_at: DateTime<Utc>,
        execution_id: ExecutionId,
        response_event: JoinSetResponseEvent,
    ) -> Result<(), DbErrorWrite>;

    /// Appends the outcome of a delay to the join set of `execution_id`.
    /// `cancel_delay` in this file passes `Err(())` to report a cancellation.
    async fn append_delay_response(
        &self,
        created_at: DateTime<Utc>,
        execution_id: ExecutionId,
        join_set_id: JoinSetId,
        delay_id: DelayId,
        outcome: Result<(), ()>,
    ) -> Result<(), DbErrorWrite>;

    /// Appends a batch of events to `execution_id` at `version`.
    async fn append_batch(
        &self,
        current_time: DateTime<Utc>,
        batch: Vec<AppendRequest>,
        execution_id: ExecutionId,
        version: Version,
    ) -> Result<AppendBatchResponse, DbErrorWrite>;

    /// Appends a batch of events to `execution_id` and creates the executions
    /// described by `child_req`.
    async fn append_batch_create_new_execution(
        &self,
        current_time: DateTime<Utc>,
        batch: Vec<AppendRequest>,
        execution_id: ExecutionId,
        version: Version,
        child_req: Vec<CreateRequest>,
    ) -> Result<AppendBatchResponse, DbErrorWrite>;

    /// Loads the whole [`ExecutionLog`] (only with the `test` feature).
    #[cfg(feature = "test")]
    async fn get(&self, execution_id: &ExecutionId) -> Result<ExecutionLog, DbErrorRead>;

    /// Lists events of an execution.
    /// NOTE(review): presumably starting at `since` and returning at most `max_length` events — confirm.
    async fn list_execution_events(
        &self,
        execution_id: &ExecutionId,
        since: &Version,
        max_length: VersionType,
        include_backtrace_id: bool,
    ) -> Result<Vec<ExecutionEvent>, DbErrorRead>;

    /// Loads the event of `execution_id` with the given `version`.
    async fn get_execution_event(
        &self,
        execution_id: &ExecutionId,
        version: &Version,
    ) -> Result<ExecutionEvent, DbErrorRead>;

    /// Loads the last event of `execution_id`.
    async fn get_last_execution_event(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<ExecutionEvent, DbErrorRead>;

    /// Reconstructs the [`CreateRequest`] from the event stored at version 0.
    ///
    /// # Errors
    /// Propagates errors of `get_execution_event`. If the event at version 0 is
    /// not `Created`, logs an error and returns `DbErrorGeneric::Uncategorized`.
    async fn get_create_request(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<CreateRequest, DbErrorRead> {
        let execution_event = self
            .get_execution_event(execution_id, &Version::new(0))
            .await?;
        if let ExecutionEventInner::Created {
            ffqn,
            params,
            parent,
            scheduled_at,
            component_id,
            metadata,
            scheduled_by,
        } = execution_event.event
        {
            Ok(CreateRequest {
                created_at: execution_event.created_at,
                execution_id: execution_id.clone(),
                ffqn,
                params,
                parent,
                scheduled_at,
                component_id,
                metadata,
                scheduled_by,
            })
        } else {
            error!(%execution_id, "Execution log must start with creation");
            Err(DbErrorRead::Generic(DbErrorGeneric::Uncategorized(
                "execution log must start with creation".into(),
            )))
        }
    }

    /// Loads the event at `finished.version` and returns its result if it is a
    /// `Finished` event, `None` otherwise.
    ///
    /// # Errors
    /// Propagates errors of `get_execution_event`.
    async fn get_finished_result(
        &self,
        execution_id: &ExecutionId,
        finished: PendingStateFinished,
    ) -> Result<Option<SupportedFunctionReturnValue>, DbErrorRead> {
        let last_event = self
            .get_execution_event(execution_id, &Version::new(finished.version))
            .await?;
        if let ExecutionEventInner::Finished { result, .. } = last_event.event {
            Ok(Some(result))
        } else {
            Ok(None)
        }
    }

    /// Loads the pending state of `execution_id`.
    async fn get_pending_state(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<PendingState, DbErrorRead>;

    /// NOTE(review): presumably returns locks and delays that are expired at `at` — confirm.
    async fn get_expired_timers(
        &self,
        at: DateTime<Utc>,
    ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric>;

    /// Creates a new execution.
    async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite>;

    /// NOTE(review): presumably returns responses starting at index `start_idx`,
    /// waiting until some exist or `timeout_fut` completes — confirm.
    async fn subscribe_to_next_responses(
        &self,
        execution_id: &ExecutionId,
        start_idx: usize,
        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
    ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorReadWithTimeout>;

    /// NOTE(review): presumably waits until `execution_id` finishes or the optional
    /// `timeout_fut` completes — confirm.
    async fn wait_for_finished_result(
        &self,
        execution_id: &ExecutionId,
        timeout_fut: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
    ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout>;

    /// Stores a single backtrace.
    async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite>;

    /// Stores multiple backtraces.
    async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite>;

    /// Loads a backtrace of `execution_id` selected by `filter`.
    async fn get_backtrace(
        &self,
        execution_id: &ExecutionId,
        filter: BacktraceFilter,
    ) -> Result<BacktraceInfo, DbErrorRead>;

    /// Lists executions.
    /// NOTE(review): presumably filtered by `ffqn` and `top_level_only` — confirm.
    async fn list_executions(
        &self,
        ffqn: Option<FunctionFqn>,
        top_level_only: bool,
        pagination: ExecutionListPagination,
    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric>;

    /// Lists join set responses of `execution_id` together with their cursors.
    async fn list_responses(
        &self,
        execution_id: &ExecutionId,
        pagination: Pagination<u32>,
    ) -> Result<Vec<ResponseWithCursor>, DbErrorRead>;
}
863
864pub async fn cancel_activity_with_retries(
866 db_connection: &dyn DbConnection,
867 child_execution_id_derived: &ExecutionIdDerived,
868 created_at: DateTime<Utc>,
869 mut retries: u8,
870) -> Result<CancelOutcome, DbErrorWrite> {
871 loop {
872 let res = cancel_activity(db_connection, child_execution_id_derived, created_at).await;
873 if res.is_ok() || retries == 0 {
874 return res;
875 }
876 retries -= 1;
877 }
878}
879
/// Outcome of a cancellation request.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CancelOutcome {
    /// The cancellation was written.
    Success,
    /// Nothing was cancelled because the target had already finished.
    AlreadyFinished,
}
885
886pub async fn cancel_delay(
887 db_connection: &dyn DbConnection,
888 delay_id: DelayId,
889 created_at: DateTime<Utc>,
890) -> Result<CancelOutcome, DbErrorWrite> {
891 let (parent_execution_id, join_set_id) = delay_id.split_to_parts();
892 let res = db_connection
893 .append_delay_response(
894 created_at,
895 parent_execution_id,
896 join_set_id,
897 delay_id,
898 Err(()), )
900 .await;
901 match res {
902 Ok(()) => Ok(CancelOutcome::Success),
903 Err(DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::IllegalState(_))) => {
904 Ok(CancelOutcome::AlreadyFinished)
905 }
906 Err(err) => Err(err),
907 }
908}
909
910pub async fn cancel_activity(
911 db_connection: &dyn DbConnection,
912 child_execution_id_derived: &ExecutionIdDerived,
913 created_at: DateTime<Utc>,
914) -> Result<CancelOutcome, DbErrorWrite> {
915 debug!("Determining cancellation state of {child_execution_id_derived}");
916 let (parent_execution_id, join_set_id) = child_execution_id_derived.split_to_parts();
917 let child_execution_id = ExecutionId::Derived(child_execution_id_derived.clone());
918 let last_event = db_connection
919 .get_last_execution_event(&child_execution_id)
920 .await
921 .map_err(DbErrorWrite::from)?;
922 if matches!(last_event.event, ExecutionEventInner::Finished { .. }) {
923 debug!("Not cancelling, {child_execution_id_derived} is already finished");
924 Ok(CancelOutcome::AlreadyFinished)
925 } else {
926 let finished_version = last_event.version.increment();
927 debug!("Cancelling activity {child_execution_id_derived} at {finished_version}");
928 let child_result = SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError {
929 reason: None,
930 kind: ExecutionFailureKind::Cancelled,
931 detail: None,
932 });
933 db_connection
934 .append_batch_respond_to_parent(
935 AppendEventsToExecution {
936 execution_id: child_execution_id,
937 version: finished_version.clone(),
938 batch: vec![AppendRequest {
939 created_at,
940 event: ExecutionEventInner::Finished {
941 result: child_result.clone(),
942 http_client_traces: None,
943 },
944 }],
945 },
946 AppendResponseToExecution {
947 parent_execution_id,
948 created_at,
949 join_set_id: join_set_id.clone(),
950 child_execution_id: child_execution_id_derived.clone(),
951 finished_version,
952 result: child_result,
953 },
954 created_at,
955 )
956 .await?;
957 debug!("Cancelled {child_execution_id_derived}");
958 Ok(CancelOutcome::Success)
959 }
960}
961
/// Selects which backtrace `DbConnection::get_backtrace` should return.
#[derive(Clone)]
pub enum BacktraceFilter {
    /// NOTE(review): presumably the first stored backtrace — confirm.
    First,
    /// NOTE(review): presumably the last stored backtrace — confirm.
    Last,
    /// The backtrace associated with a specific version.
    Specific(Version),
}
968
/// A WASM backtrace together with the execution and version range it belongs to.
pub struct BacktraceInfo {
    /// Execution the backtrace belongs to.
    pub execution_id: ExecutionId,
    /// Component associated with the backtrace.
    pub component_id: ComponentId,
    /// Lower bound of the version range; per the field name, inclusive.
    pub version_min_including: Version,
    /// Upper bound of the version range; per the field name, exclusive.
    pub version_max_excluding: Version,
    /// The captured backtrace.
    pub wasm_backtrace: WasmBacktrace,
}
976
/// Serializable backtrace; built from `wasmtime::WasmBacktrace` by
/// `WasmBacktrace::maybe_from`.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct WasmBacktrace {
    /// Frames of the backtrace.
    pub frames: Vec<FrameInfo>,
}
981
/// A single backtrace frame.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct FrameInfo {
    /// Name of the module; the conversion from `wasmtime::FrameInfo` uses
    /// `<unknown>` when the module has no name.
    pub module: String,
    /// Demangled function name, or the function index when no name is available.
    pub func_name: String,
    /// Symbols attached to the frame.
    pub symbols: Vec<FrameSymbol>,
}
988
/// Symbol information of a backtrace frame; every part is optional.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct FrameSymbol {
    /// Demangled function name.
    pub func_name: Option<String>,
    /// Source file.
    pub file: Option<String>,
    /// Line in the source file.
    pub line: Option<u32>,
    /// Column in the source file.
    pub col: Option<u32>,
}
996
997mod wasm_backtrace {
998 use super::{FrameInfo, FrameSymbol, WasmBacktrace};
999
1000 impl WasmBacktrace {
1001 pub fn maybe_from(backtrace: &wasmtime::WasmBacktrace) -> Option<Self> {
1002 if backtrace.frames().is_empty() {
1003 None
1004 } else {
1005 Some(Self {
1006 frames: backtrace.frames().iter().map(FrameInfo::from).collect(),
1007 })
1008 }
1009 }
1010 }
1011
1012 impl From<&wasmtime::FrameInfo> for FrameInfo {
1013 fn from(frame: &wasmtime::FrameInfo) -> Self {
1014 let module_name = frame.module().name().unwrap_or("<unknown>").to_string();
1015 let mut func_name = String::new();
1016 wasmtime_environ::demangle_function_name_or_index(
1017 &mut func_name,
1018 frame.func_name(),
1019 frame.func_index() as usize,
1020 )
1021 .expect("writing to string must succeed");
1022 Self {
1023 module: module_name,
1024 func_name,
1025 symbols: frame
1026 .symbols()
1027 .iter()
1028 .map(std::convert::Into::into)
1029 .collect(),
1030 }
1031 }
1032 }
1033
1034 impl From<&wasmtime::FrameSymbol> for FrameSymbol {
1035 fn from(symbol: &wasmtime::FrameSymbol) -> Self {
1036 let func_name = symbol.name().map(|name| {
1037 let mut writer = String::new();
1038 wasmtime_environ::demangle_function_name(&mut writer, name)
1039 .expect("writing to string must succeed");
1040 writer
1041 });
1042
1043 Self {
1044 func_name,
1045 file: symbol.file().map(ToString::to_string),
1046 line: symbol.line(),
1047 col: symbol.column(),
1048 }
1049 }
1050 }
1051}
1052
/// Integer type used for response cursors and for pagination lengths.
pub type ResponseCursorType = u32;
/// A join set response together with its cursor; returned by
/// `DbConnection::list_responses`.
#[derive(Debug, Clone)]
pub struct ResponseWithCursor {
    /// The response.
    pub event: JoinSetResponseEventOuter,
    /// Cursor of the response.
    pub cursor: ResponseCursorType,
}
1060
/// Summary of an execution; returned by `DbConnection::list_executions`.
#[derive(Debug)]
pub struct ExecutionWithState {
    /// Identifier of the execution.
    pub execution_id: ExecutionId,
    /// Function of the execution.
    pub ffqn: FunctionFqn,
    /// Pending state of the execution.
    pub pending_state: PendingState,
    /// Creation timestamp.
    pub created_at: DateTime<Utc>,
    /// Time the execution is scheduled at.
    pub scheduled_at: DateTime<Utc>,
}
1069
/// Pagination of `DbConnection::list_executions`; the cursor is either a
/// timestamp or an execution id, and is optional in both cases.
pub enum ExecutionListPagination {
    /// Paginate using an optional timestamp cursor.
    CreatedBy(Pagination<Option<DateTime<Utc>>>),
    /// Paginate using an optional execution id cursor.
    ExecutionId(Pagination<Option<ExecutionId>>),
}
1074
/// Pagination request relative to a cursor of type `T`.
#[derive(Debug, Clone, Copy)]
pub enum Pagination<T> {
    /// Items newer than the cursor; `rel` yields `>` or `>=`.
    NewerThan {
        /// Requested page length.
        length: ResponseCursorType,
        /// Cursor the page is relative to.
        cursor: T,
        /// Whether the item at the cursor itself is included.
        including_cursor: bool,
    },
    /// Items older than the cursor; `rel` yields `<` or `<=` and `is_desc` is `true`.
    OlderThan {
        /// Requested page length.
        length: ResponseCursorType,
        /// Cursor the page is relative to.
        cursor: T,
        /// Whether the item at the cursor itself is included.
        including_cursor: bool,
    },
}
1088impl<T> Pagination<T> {
1089 pub fn length(&self) -> ResponseCursorType {
1090 match self {
1091 Pagination::NewerThan { length, .. } | Pagination::OlderThan { length, .. } => *length,
1092 }
1093 }
1094 pub fn rel(&self) -> &'static str {
1095 match self {
1096 Pagination::NewerThan {
1097 including_cursor: false,
1098 ..
1099 } => ">",
1100 Pagination::NewerThan {
1101 including_cursor: true,
1102 ..
1103 } => ">=",
1104 Pagination::OlderThan {
1105 including_cursor: false,
1106 ..
1107 } => "<",
1108 Pagination::OlderThan {
1109 including_cursor: true,
1110 ..
1111 } => "<=",
1112 }
1113 }
1114 pub fn is_desc(&self) -> bool {
1115 matches!(self, Pagination::OlderThan { .. })
1116 }
1117}
1118
/// Polls the execution log every 10 ms until `predicate` returns `Some`, and
/// returns that value (only with the `test` feature).
///
/// # Errors
/// Returns `DbErrorReadWithTimeout::Timeout` when `timeout` is set and elapses
/// first; read errors from loading the log are propagated.
#[cfg(feature = "test")]
pub async fn wait_for_pending_state_fn<T: Debug>(
    db_connection: &dyn DbConnection,
    execution_id: &ExecutionId,
    predicate: impl Fn(ExecutionLog) -> Option<T> + Send,
    timeout: Option<Duration>,
) -> Result<T, DbErrorReadWithTimeout> {
    tracing::trace!(%execution_id, "Waiting for predicate");
    let poll_until_match = async move {
        loop {
            let execution_log = db_connection.get(execution_id).await?;
            match predicate(execution_log) {
                Some(t) => {
                    tracing::debug!(%execution_id, "Found: {t:?}");
                    return Ok(t);
                }
                None => tokio::time::sleep(Duration::from_millis(10)).await,
            }
        }
    };

    match timeout {
        Some(timeout) => {
            tokio::select! {
                res = poll_until_match => res,
                () = tokio::time::sleep(timeout) => Err(DbErrorReadWithTimeout::Timeout)
            }
        }
        None => poll_until_match.await,
    }
}
1147
/// An expired timer, as returned by `DbConnection::get_expired_timers`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExpiredTimer {
    /// An expired lock.
    Lock(ExpiredLock),
    /// An expired delay.
    Delay(ExpiredDelay),
}
1153
/// Details of an expired lock.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExpiredLock {
    /// Execution whose lock expired.
    pub execution_id: ExecutionId,
    /// NOTE(review): presumably the version of the `Locked` event — confirm.
    pub locked_at_version: Version,
    /// NOTE(review): presumably the version to use for the next append — confirm.
    pub next_version: Version,
    /// NOTE(review): presumably the number of temporary failure/timeout events so far — confirm.
    pub intermittent_event_count: u32,
    /// Maximum number of retries.
    pub max_retries: u32,
    /// Base duration of the exponential retry backoff.
    pub retry_exp_backoff: Duration,
    /// Executor and run that held the lock.
    pub locked_by: LockedBy,
}
1166
/// Details of an expired delay.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExpiredDelay {
    /// Execution the delay belongs to.
    pub execution_id: ExecutionId,
    /// Join set the delay belongs to.
    pub join_set_id: JoinSetId,
    /// Identifier of the delay.
    pub delay_id: DelayId,
}
1173
/// Pending state of an execution.
///
/// With the `test` feature enabled, `Serialize` is derived using an internal
/// `type` tag.
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq)]
#[cfg_attr(feature = "test", derive(Serialize))]
#[cfg_attr(feature = "test", serde(tag = "type"))]
pub enum PendingState {
    /// The execution is locked.
    Locked(PendingStateLocked),
    /// The execution is pending at `scheduled_at`.
    #[display("PendingAt(`{scheduled_at}`)")]
    PendingAt {
        /// Time the execution is scheduled at.
        scheduled_at: DateTime<Utc>,
        /// NOTE(review): presumably the holder of the previous lock, if any — confirm.
        last_lock: Option<LockedBy>,
    },
    /// The execution is blocked by a join set.
    #[display("BlockedByJoinSet({join_set_id},`{lock_expires_at}`)")]
    BlockedByJoinSet {
        /// Join set the execution is blocked by.
        join_set_id: JoinSetId,
        /// Time at which the lock expires.
        lock_expires_at: DateTime<Utc>,
        /// NOTE(review): presumably set when the join set is being closed — confirm.
        closing: bool,
    },
    /// The execution has finished.
    #[display("Finished({finished})")]
    Finished {
        /// Details about the finished execution.
        finished: PendingStateFinished,
    },
}
1198
/// Payload of [`PendingState::Locked`].
///
/// `Display` renders the expiry followed by the executor id and run id of the
/// lock holder.
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq)]
#[cfg_attr(feature = "test", derive(Serialize))]
#[display("Locked(`{lock_expires_at}`, {}, {})", locked_by.executor_id, locked_by.run_id)]
pub struct PendingStateLocked {
    /// Executor and run holding the lock.
    pub locked_by: LockedBy,
    /// Instant at which the lock expires.
    pub lock_expires_at: DateTime<Utc>,
}
1206
/// Identifies a lock holder by executor and run.
///
/// `PendingState::can_append_lock` treats a lock request as an extension only
/// when both ids match the recorded holder.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LockedBy {
    /// Executor holding the lock.
    pub executor_id: ExecutorId,
    /// Run holding the lock.
    pub run_id: RunId,
}
1212
/// Summary of a finished execution, stored in [`PendingState::Finished`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct PendingStateFinished {
    /// NOTE(review): presumably the version of the event that finished the
    /// execution — confirm.
    pub version: VersionType,
    /// Instant at which the execution finished.
    pub finished_at: DateTime<Utc>,
    /// Whether the execution finished successfully or with an error.
    pub result_kind: PendingStateFinishedResultKind,
}
1219impl Display for PendingStateFinished {
1220 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1221 match self.result_kind {
1222 PendingStateFinishedResultKind::Ok => write!(f, "ok"),
1223 PendingStateFinishedResultKind::Err(err) => write!(f, "{err}"),
1224 }
1225 }
1226}
1227impl Display for SupportedFunctionReturnValue {
1228 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1229 match self.as_pending_state_finished_result() {
1230 PendingStateFinishedResultKind::Ok => write!(f, "ok"),
1231 PendingStateFinishedResultKind::Err(err) => write!(f, "{err}"),
1232 }
1233 }
1234}
1235
/// Outcome category of a finished execution.
///
/// Serialized with snake_case variant names (`ok` / `err`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PendingStateFinishedResultKind {
    /// The execution finished successfully.
    Ok,
    /// The execution finished with the given error category.
    Err(PendingStateFinishedError),
}
1243impl PendingStateFinishedResultKind {
1244 pub fn as_result(&self) -> Result<(), &PendingStateFinishedError> {
1245 match self {
1246 PendingStateFinishedResultKind::Ok => Ok(()),
1247 PendingStateFinishedResultKind::Err(err) => Err(err),
1248 }
1249 }
1250}
1251
1252impl From<&SupportedFunctionReturnValue> for PendingStateFinishedResultKind {
1253 fn from(result: &SupportedFunctionReturnValue) -> Self {
1254 result.as_pending_state_finished_result()
1255 }
1256}
1257
/// Error category of a finished execution.
///
/// `Display` is derived with `derive_more`; the attributes below give the text.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, derive_more::Display)]
pub enum PendingStateFinishedError {
    /// The execution itself failed; rendered as `execution failed: <kind>`.
    #[display("execution failed: {_0}")]
    ExecutionFailure(ExecutionFailureKind),
    /// Rendered as `returned error variant`.
    /// NOTE(review): presumably the function returned the `err` variant of its
    /// result type — confirm.
    #[display("returned error variant")]
    FallibleError,
}
1265
1266impl PendingState {
1267 pub fn can_append_lock(
1268 &self,
1269 created_at: DateTime<Utc>,
1270 executor_id: ExecutorId,
1271 run_id: RunId,
1272 lock_expires_at: DateTime<Utc>,
1273 ) -> Result<LockKind, DbErrorWriteNonRetriable> {
1274 if lock_expires_at <= created_at {
1275 return Err(DbErrorWriteNonRetriable::ValidationFailed(
1276 "invalid expiry date".into(),
1277 ));
1278 }
1279 match self {
1280 PendingState::PendingAt {
1281 scheduled_at,
1282 last_lock,
1283 } => {
1284 if *scheduled_at <= created_at {
1285 Ok(LockKind::CreatingNewLock)
1287 } else if let Some(LockedBy {
1288 executor_id: last_executor_id,
1289 run_id: last_run_id,
1290 }) = last_lock
1291 && executor_id == *last_executor_id
1292 && run_id == *last_run_id
1293 {
1294 Ok(LockKind::Extending)
1296 } else {
1297 Err(DbErrorWriteNonRetriable::ValidationFailed(
1298 "cannot lock, not yet pending".into(),
1299 ))
1300 }
1301 }
1302 PendingState::Locked(PendingStateLocked {
1303 locked_by:
1304 LockedBy {
1305 executor_id: current_pending_state_executor_id,
1306 run_id: current_pending_state_run_id,
1307 },
1308 lock_expires_at: _,
1309 }) => {
1310 if executor_id == *current_pending_state_executor_id
1311 && run_id == *current_pending_state_run_id
1312 {
1313 Ok(LockKind::Extending)
1315 } else {
1316 Err(DbErrorWriteNonRetriable::IllegalState(
1317 "cannot lock, already locked".into(),
1318 ))
1319 }
1320 }
1321 PendingState::BlockedByJoinSet { .. } => Err(DbErrorWriteNonRetriable::IllegalState(
1322 "cannot append Locked event when in BlockedByJoinSet state".into(),
1323 )),
1324 PendingState::Finished { .. } => Err(DbErrorWriteNonRetriable::IllegalState(
1325 "already finished".into(),
1326 )),
1327 }
1328 }
1329
1330 #[must_use]
1331 pub fn is_finished(&self) -> bool {
1332 matches!(self, PendingState::Finished { .. })
1333 }
1334}
1335
/// Classification returned by `PendingState::can_append_lock`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LockKind {
    /// The requester already holds (or last held) the lock and is extending it.
    Extending,
    /// A fresh lock is being created for a pending execution.
    CreatingNewLock,
}
1341
/// Serializable records describing an HTTP client request and its response.
pub mod http_client_trace {
    use chrono::{DateTime, Utc};
    use serde::{Deserialize, Serialize};

    /// One traced HTTP exchange: the request and, when present, its response.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    pub struct HttpClientTrace {
        /// The request that was sent.
        pub req: RequestTrace,
        /// The response, if one was recorded.
        pub resp: Option<ResponseTrace>,
    }

    /// Request half of an [`HttpClientTrace`].
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    pub struct RequestTrace {
        /// Instant at which the request was sent.
        pub sent_at: DateTime<Utc>,
        /// Request URI, stored as text.
        pub uri: String,
        /// Request method, stored as text.
        pub method: String,
    }

    /// Response half of an [`HttpClientTrace`].
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    pub struct ResponseTrace {
        /// Instant at which the response finished.
        pub finished_at: DateTime<Utc>,
        /// NOTE(review): presumably `Ok` holds the HTTP status code and `Err` a
        /// textual description of the failure — confirm against the producer.
        pub status: Result<u16, String>,
    }
}
1365
// Unit tests: serde round-trips for the finished-state types and boundary
// checks for `HistoryEventScheduleAt::as_date_time`.
#[cfg(test)]
mod tests {
    use super::HistoryEventScheduleAt;
    use super::PendingStateFinished;
    use super::PendingStateFinishedError;
    use super::PendingStateFinishedResultKind;
    use crate::ExecutionFailureKind;
    use crate::SupportedFunctionReturnValue;
    use chrono::DateTime;
    use chrono::Datelike;
    use chrono::Utc;
    use insta::assert_snapshot;
    use rstest::rstest;
    use std::time::Duration;
    use val_json::type_wrapper::TypeWrapper;
    use val_json::wast_val::WastVal;
    use val_json::wast_val::WastValWithType;

    // JSON round-trip of the result kind, for both the `Ok` and an `Err` case.
    #[rstest(expected => [
        PendingStateFinishedResultKind::Ok,
        PendingStateFinishedResultKind::Err(PendingStateFinishedError::ExecutionFailure(ExecutionFailureKind::TimedOut)),
    ])]
    #[test]
    fn serde_pending_state_finished_result_kind_should_work(
        expected: PendingStateFinishedResultKind,
    ) {
        let ser = serde_json::to_string(&expected).unwrap();
        let actual: PendingStateFinishedResultKind = serde_json::from_str(&ser).unwrap();
        assert_eq!(expected, actual);
    }

    // JSON round-trip of the whole `PendingStateFinished` struct.
    #[rstest(result_kind => [
        PendingStateFinishedResultKind::Ok,
        PendingStateFinishedResultKind::Err(PendingStateFinishedError::ExecutionFailure(ExecutionFailureKind::TimedOut)),
    ])]
    #[test]
    fn serde_pending_state_finished_should_work(result_kind: PendingStateFinishedResultKind) {
        let expected = PendingStateFinished {
            version: 0,
            finished_at: Utc::now(),
            result_kind,
        };

        let ser = serde_json::to_string(&expected).unwrap();
        let actual: PendingStateFinished = serde_json::from_str(&ser).unwrap();
        assert_eq!(expected, actual);
    }

    // A `result<option<string>, string>` holding `ok(none)` must serialize to the
    // stored snapshot and deserialize back to an equal value.
    #[test]
    fn join_set_deser_with_result_ok_option_none_should_work() {
        let expected = SupportedFunctionReturnValue::Ok {
            ok: Some(WastValWithType {
                r#type: TypeWrapper::Result {
                    ok: Some(Box::new(TypeWrapper::Option(Box::new(TypeWrapper::String)))),
                    err: Some(Box::new(TypeWrapper::String)),
                },
                value: WastVal::Result(Ok(Some(Box::new(WastVal::Option(None))))),
            }),
        };
        let json = serde_json::to_string(&expected).unwrap();
        assert_snapshot!(json);

        let actual: SupportedFunctionReturnValue = serde_json::from_str(&json).unwrap();

        assert_eq!(expected, actual);
    }

    // `u32::MAX` seconds after the Unix epoch must resolve to a date in 2106.
    #[test]
    fn as_date_time_should_work_with_duration_u32_max_secs() {
        let duration = Duration::from_secs(u64::from(u32::MAX));
        let schedule_at = HistoryEventScheduleAt::In(duration);
        let resolved = schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap();
        assert_eq!(2106, resolved.year());
    }

    // Largest whole number of seconds whose millisecond count still fits in an
    // `i64`.
    // NOTE(review): presumably mirrors the upper bound of `chrono::TimeDelta` —
    // confirm.
    const MILLIS_PER_SEC: i64 = 1000;
    const TIMEDELTA_MAX_SECS: i64 = i64::MAX / MILLIS_PER_SEC;

    // One second beyond `TIMEDELTA_MAX_SECS` must make the conversion fail.
    #[test]
    fn as_date_time_should_fail_on_duration_secs_greater_than_i64_max() {
        let duration = Duration::from_secs(
            u64::try_from(TIMEDELTA_MAX_SECS).expect("positive number must not fail") + 1,
        );
        let schedule_at = HistoryEventScheduleAt::In(duration);
        schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap_err();
    }
}