1use crate::ClosingStrategy;
2use crate::ComponentId;
3use crate::ExecutionId;
4use crate::ExecutionMetadata;
5use crate::FunctionFqn;
6use crate::JoinSetId;
7use crate::Params;
8use crate::StrVariant;
9use crate::SupportedFunctionReturnValue;
10use crate::prefixed_ulid::DelayId;
11use crate::prefixed_ulid::ExecutionIdDerived;
12use crate::prefixed_ulid::ExecutorId;
13use crate::prefixed_ulid::RunId;
14use assert_matches::assert_matches;
15use async_trait::async_trait;
16use chrono::TimeDelta;
17use chrono::{DateTime, Utc};
18use http_client_trace::HttpClientTrace;
19use serde::Deserialize;
20use serde::Serialize;
21use std::fmt::Debug;
22use std::fmt::Display;
23use std::pin::Pin;
24use std::str::FromStr;
25use std::sync::Arc;
26use std::time::Duration;
27use strum::IntoStaticStr;
28use tracing::error;
29
30#[derive(Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
32pub struct ExecutionLog {
33 pub execution_id: ExecutionId,
34 pub events: Vec<ExecutionEvent>,
35 pub responses: Vec<JoinSetResponseEventOuter>,
36 pub next_version: Version, pub pending_state: PendingState,
38}
39
40impl ExecutionLog {
41 #[must_use]
42 pub fn can_be_retried_after(
43 temporary_event_count: u32,
44 max_retries: u32,
45 retry_exp_backoff: Duration,
46 ) -> Option<Duration> {
47 if temporary_event_count <= max_retries {
49 let duration = retry_exp_backoff * 2_u32.saturating_pow(temporary_event_count - 1);
51 Some(duration)
52 } else {
53 None
54 }
55 }
56
57 #[must_use]
58 pub fn compute_retry_duration_when_retrying_forever(
59 temporary_event_count: u32,
60 retry_exp_backoff: Duration,
61 ) -> Duration {
62 Self::can_be_retried_after(temporary_event_count, u32::MAX, retry_exp_backoff)
63 .expect("`max_retries` set to MAX must never return None")
64 }
65
66 #[must_use]
67 pub fn retry_exp_backoff(&self) -> Duration {
68 assert_matches!(self.events.first(), Some(ExecutionEvent {
69 event: ExecutionEventInner::Created { retry_exp_backoff, .. },
70 ..
71 }) => *retry_exp_backoff)
72 }
73
74 #[must_use]
75 pub fn max_retries(&self) -> u32 {
76 assert_matches!(self.events.first(), Some(ExecutionEvent {
77 event: ExecutionEventInner::Created { max_retries, .. },
78 ..
79 }) => *max_retries)
80 }
81
82 #[must_use]
83 pub fn ffqn(&self) -> &FunctionFqn {
84 assert_matches!(self.events.first(), Some(ExecutionEvent {
85 event: ExecutionEventInner::Created { ffqn, .. },
86 ..
87 }) => ffqn)
88 }
89
90 #[must_use]
91 pub fn parent(&self) -> Option<(ExecutionId, JoinSetId)> {
92 assert_matches!(self.events.first(), Some(ExecutionEvent {
93 event: ExecutionEventInner::Created { parent, .. },
94 ..
95 }) => parent.clone())
96 }
97
98 #[must_use]
99 pub fn last_event(&self) -> &ExecutionEvent {
100 self.events.last().expect("must contain at least one event")
101 }
102
103 #[must_use]
104 pub fn into_finished_result(mut self) -> Option<SupportedFunctionReturnValue> {
105 if let ExecutionEvent {
106 event: ExecutionEventInner::Finished { result, .. },
107 ..
108 } = self.events.pop().expect("must contain at least one event")
109 {
110 Some(result)
111 } else {
112 None
113 }
114 }
115
116 pub fn event_history(&self) -> impl Iterator<Item = HistoryEvent> + '_ {
117 self.events.iter().filter_map(|event| {
118 if let ExecutionEventInner::HistoryEvent { event: eh, .. } = &event.event {
119 Some(eh.clone())
120 } else {
121 None
122 }
123 })
124 }
125
126 #[cfg(feature = "test")]
127 #[must_use]
128 pub fn find_join_set_request(&self, join_set_id: &JoinSetId) -> Option<&JoinSetRequest> {
129 self.events
130 .iter()
131 .find_map(move |event| match &event.event {
132 ExecutionEventInner::HistoryEvent {
133 event:
134 HistoryEvent::JoinSetRequest {
135 join_set_id: found,
136 request,
137 },
138 ..
139 } if *join_set_id == *found => Some(request),
140 _ => None,
141 })
142 }
143}
144
145pub type VersionType = u32;
146#[derive(
147 Debug,
148 Default,
149 Clone,
150 PartialEq,
151 Eq,
152 Hash,
153 derive_more::Display,
154 derive_more::Into,
155 serde::Serialize,
156 serde::Deserialize,
157)]
158#[serde(transparent)]
159pub struct Version(pub VersionType);
160impl Version {
161 #[must_use]
162 pub fn new(arg: VersionType) -> Version {
163 Version(arg)
164 }
165
166 #[must_use]
167 pub fn increment(&self) -> Version {
168 Version(self.0 + 1)
169 }
170}
171
172#[derive(
173 Clone, Debug, derive_more::Display, PartialEq, Eq, serde::Serialize, serde::Deserialize,
174)]
175#[display("{event}")]
176pub struct ExecutionEvent {
177 pub created_at: DateTime<Utc>,
178 pub event: ExecutionEventInner,
179 pub backtrace_id: Option<Version>,
180}
181
182#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
184pub struct JoinSetResponseEventOuter {
185 pub created_at: DateTime<Utc>,
186 pub event: JoinSetResponseEvent,
187}
188
189#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
190pub struct JoinSetResponseEvent {
191 pub join_set_id: JoinSetId,
192 pub event: JoinSetResponse,
193}
194
195#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
196#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
197#[serde(tag = "type")]
198pub enum JoinSetResponse {
199 DelayFinished {
200 delay_id: DelayId,
201 },
202 ChildExecutionFinished {
203 child_execution_id: ExecutionIdDerived,
204 #[cfg_attr(any(test, feature = "test"), arbitrary(value = Version(2)))]
205 finished_version: Version,
206 #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
207 result: SupportedFunctionReturnValue,
208 },
209}
210
211pub const DUMMY_CREATED: ExecutionEventInner = ExecutionEventInner::Created {
212 ffqn: FunctionFqn::new_static("", ""),
213 params: Params::empty(),
214 parent: None,
215 scheduled_at: DateTime::from_timestamp_nanos(0),
216 retry_exp_backoff: Duration::ZERO,
217 max_retries: 0,
218 component_id: ComponentId::dummy_activity(),
219 metadata: ExecutionMetadata::empty(),
220 scheduled_by: None,
221};
222pub const DUMMY_HISTORY_EVENT: ExecutionEventInner = ExecutionEventInner::HistoryEvent {
223 event: HistoryEvent::JoinSetCreate {
224 join_set_id: JoinSetId {
225 kind: crate::JoinSetKind::OneOff,
226 name: StrVariant::empty(),
227 },
228 closing_strategy: ClosingStrategy::Complete,
229 },
230};
231
232#[derive(
233 Clone,
234 derive_more::Debug,
235 derive_more::Display,
236 PartialEq,
237 Eq,
238 Serialize,
239 Deserialize,
240 IntoStaticStr,
241)]
242#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
243#[allow(clippy::large_enum_variant)]
244pub enum ExecutionEventInner {
245 #[display("Created({ffqn}, `{scheduled_at}`)")]
249 Created {
250 ffqn: FunctionFqn,
251 #[cfg_attr(any(test, feature = "test"), arbitrary(default))]
252 #[debug(skip)]
253 params: Params,
254 parent: Option<(ExecutionId, JoinSetId)>,
255 scheduled_at: DateTime<Utc>,
256 retry_exp_backoff: Duration,
257 max_retries: u32,
258 #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
259 component_id: ComponentId,
260 #[cfg_attr(any(test, feature = "test"), arbitrary(default))]
261 metadata: ExecutionMetadata,
262 scheduled_by: Option<ExecutionId>,
263 },
264 #[display("Locked(`{lock_expires_at}`, {component_id})")]
268 Locked {
269 #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
270 component_id: ComponentId,
271 executor_id: ExecutorId,
272 run_id: RunId,
273 lock_expires_at: DateTime<Utc>,
274 },
275 #[display("Unlocked(`{backoff_expires_at}`)")]
280 Unlocked {
281 backoff_expires_at: DateTime<Utc>,
282 #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
283 reason: StrVariant,
284 },
285 #[display("TemporarilyFailed(`{backoff_expires_at}`)")]
288 TemporarilyFailed {
289 backoff_expires_at: DateTime<Utc>,
290 #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
291 reason_full: StrVariant,
292 #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason inner")))]
293 reason_inner: StrVariant,
294 detail: Option<String>,
295 #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
296 http_client_traces: Option<Vec<HttpClientTrace>>,
297 },
298 #[display("TemporarilyTimedOut(`{backoff_expires_at}`)")]
301 TemporarilyTimedOut {
302 backoff_expires_at: DateTime<Utc>,
303 #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
304 http_client_traces: Option<Vec<HttpClientTrace>>,
305 },
306 #[display("Finished")]
310 Finished {
311 #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
312 result: SupportedFunctionReturnValue,
313 #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
314 http_client_traces: Option<Vec<HttpClientTrace>>,
315 },
316
317 #[display("HistoryEvent({event})")]
318 HistoryEvent { event: HistoryEvent },
319}
320
321impl ExecutionEventInner {
322 #[must_use]
323 pub fn is_temporary_event(&self) -> bool {
324 matches!(
325 self,
326 Self::TemporarilyFailed { .. } | Self::TemporarilyTimedOut { .. }
327 )
328 }
329
330 #[must_use]
331 pub fn variant(&self) -> &'static str {
332 Into::<&'static str>::into(self)
333 }
334
335 #[must_use]
336 pub fn join_set_id(&self) -> Option<&JoinSetId> {
337 match self {
338 Self::Created {
339 parent: Some((_parent_id, join_set_id)),
340 ..
341 } => Some(join_set_id),
342 Self::HistoryEvent {
343 event:
344 HistoryEvent::JoinSetCreate { join_set_id, .. }
345 | HistoryEvent::JoinSetRequest { join_set_id, .. }
346 | HistoryEvent::JoinNext { join_set_id, .. },
347 } => Some(join_set_id),
348 _ => None,
349 }
350 }
351}
352
353#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
354#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
355#[serde(tag = "type")]
356pub enum PersistKind {
357 #[display("RandomU64({min}, {max_inclusive})")]
358 RandomU64 { min: u64, max_inclusive: u64 },
359 #[display("RandomString({min_length}, {max_length_exclusive})")]
360 RandomString {
361 min_length: u64,
362 max_length_exclusive: u64,
363 },
364}
365
366#[must_use]
367pub fn from_u64_to_bytes(value: u64) -> [u8; 8] {
368 value.to_be_bytes()
369}
370
371#[must_use]
372pub fn from_bytes_to_u64(bytes: [u8; 8]) -> u64 {
373 u64::from_be_bytes(bytes)
374}
375
376#[derive(
377 derive_more::Debug, Clone, PartialEq, Eq, derive_more::Display, Serialize, Deserialize,
378)]
379#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
380#[serde(tag = "type")]
381pub enum HistoryEvent {
383 #[display("Persist")]
385 Persist {
386 #[debug(skip)]
387 value: Vec<u8>, kind: PersistKind,
389 },
390 #[display("JoinSetCreate({join_set_id})")]
391 JoinSetCreate {
392 join_set_id: JoinSetId,
393 closing_strategy: ClosingStrategy,
394 },
395 #[display("JoinSetRequest({join_set_id}, {request})")]
396 JoinSetRequest {
397 join_set_id: JoinSetId,
398 request: JoinSetRequest,
399 },
400 #[display("JoinNext({join_set_id})")]
406 JoinNext {
407 join_set_id: JoinSetId,
408 run_expires_at: DateTime<Utc>,
411 requested_ffqn: Option<FunctionFqn>,
414 closing: bool,
416 },
417 #[display("JoinNextTooMany({join_set_id})")]
419 JoinNextTooMany {
420 join_set_id: JoinSetId,
421 requested_ffqn: Option<FunctionFqn>,
424 },
425 #[display("Schedule({execution_id}, {schedule_at})")]
426 Schedule {
427 execution_id: ExecutionId,
428 schedule_at: HistoryEventScheduleAt, },
430 #[display("Stub({target_execution_id})")]
431 Stub {
432 target_execution_id: ExecutionIdDerived,
433 #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
434 result: SupportedFunctionReturnValue, persist_result: Result<(), ()>, },
437}
438
439#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
440#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
441pub enum HistoryEventScheduleAt {
442 Now,
443 At(DateTime<Utc>),
444 #[display("In({_0:?})")]
445 In(Duration),
446}
447
448#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
449pub enum ScheduleAtConversionError {
450 #[error("source duration value is out of range")]
451 OutOfRangeError,
452}
453
454impl HistoryEventScheduleAt {
455 pub fn as_date_time(
456 &self,
457 now: DateTime<Utc>,
458 ) -> Result<DateTime<Utc>, ScheduleAtConversionError> {
459 match self {
460 Self::Now => Ok(now),
461 Self::At(date_time) => Ok(*date_time),
462 Self::In(duration) => {
463 let time_delta = TimeDelta::from_std(*duration)
464 .map_err(|_| ScheduleAtConversionError::OutOfRangeError)?;
465 now.checked_add_signed(time_delta)
466 .ok_or(ScheduleAtConversionError::OutOfRangeError)
467 }
468 }
469 }
470}
471
472#[derive(Clone, Debug, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
473#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
474#[serde(tag = "type")]
475pub enum JoinSetRequest {
476 #[display("DelayRequest({delay_id}, expires_at: `{expires_at}`, schedule_at: `{schedule_at}`)")]
478 DelayRequest {
479 delay_id: DelayId,
480 expires_at: DateTime<Utc>,
481 schedule_at: HistoryEventScheduleAt,
482 },
483 #[display("ChildExecutionRequest({child_execution_id})")]
485 ChildExecutionRequest {
486 child_execution_id: ExecutionIdDerived,
487 },
488}
489
490#[derive(Debug, Clone, thiserror::Error, PartialEq)]
492pub enum DbErrorGeneric {
493 #[error("database error: {0}")]
494 Uncategorized(StrVariant), #[error("database was closed")]
496 Close,
497}
498
499#[derive(thiserror::Error, Clone, Debug, PartialEq, Eq)]
500pub enum DbErrorWritePermanent {
501 #[error("validation failed: {0}")]
503 ValidationFailed(StrVariant),
504 #[error("cannot write: {reason}")]
505 CannotWrite {
506 reason: StrVariant,
507 expected_version: Option<Version>,
508 },
509}
510
511#[derive(Debug, Clone, thiserror::Error, PartialEq)]
513pub enum DbErrorWrite {
514 #[error("cannot write - row not found")]
515 NotFound,
516 #[error("permanent error: {0}")]
518 Permanent(#[from] DbErrorWritePermanent),
519 #[error(transparent)]
520 DbErrorGeneric(#[from] DbErrorGeneric),
521}
522
523#[derive(Debug, Clone, thiserror::Error, PartialEq)]
525pub enum DbErrorRead {
526 #[error("cannot read - row not found")]
527 NotFound,
528 #[error(transparent)]
529 DbErrorGeneric(#[from] DbErrorGeneric),
530}
531
532impl From<DbErrorRead> for DbErrorWrite {
533 fn from(value: DbErrorRead) -> DbErrorWrite {
534 match value {
535 DbErrorRead::NotFound => DbErrorWrite::NotFound,
536 DbErrorRead::DbErrorGeneric(err) => DbErrorWrite::DbErrorGeneric(err),
537 }
538 }
539}
540
541#[derive(Debug, thiserror::Error, PartialEq)]
542pub enum DbErrorReadWithTimeout {
543 #[error("timeout")]
544 Timeout,
545 #[error(transparent)]
546 DbErrorRead(#[from] DbErrorRead),
547}
548impl From<DbErrorGeneric> for DbErrorReadWithTimeout {
549 fn from(value: DbErrorGeneric) -> DbErrorReadWithTimeout {
550 Self::from(DbErrorRead::from(value))
551 }
552}
553
554pub type AppendResponse = Version;
557pub type PendingExecution = (ExecutionId, Version, Params, Option<DateTime<Utc>>);
558
559#[derive(Debug, Clone)]
560pub struct LockedExecution {
561 pub execution_id: ExecutionId,
562 pub next_version: Version,
563 pub metadata: ExecutionMetadata,
564 pub run_id: RunId,
565 pub ffqn: FunctionFqn,
566 pub params: Params,
567 pub event_history: Vec<HistoryEvent>,
568 pub responses: Vec<JoinSetResponseEventOuter>,
569 pub retry_exp_backoff: Duration,
570 pub max_retries: u32,
571 pub parent: Option<(ExecutionId, JoinSetId)>,
572 pub intermittent_event_count: u32,
573}
574
575pub type LockPendingResponse = Vec<LockedExecution>;
576pub type AppendBatchResponse = Version;
577
578#[derive(Debug, Clone, derive_more::Display, Serialize, Deserialize)]
579#[display("{event}")]
580pub struct AppendRequest {
581 pub created_at: DateTime<Utc>,
582 pub event: ExecutionEventInner,
583}
584
585#[derive(Debug, Clone)]
586pub struct CreateRequest {
587 pub created_at: DateTime<Utc>,
588 pub execution_id: ExecutionId,
589 pub ffqn: FunctionFqn,
590 pub params: Params,
591 pub parent: Option<(ExecutionId, JoinSetId)>,
592 pub scheduled_at: DateTime<Utc>,
593 pub retry_exp_backoff: Duration,
594 pub max_retries: u32,
595 pub component_id: ComponentId,
596 pub metadata: ExecutionMetadata,
597 pub scheduled_by: Option<ExecutionId>,
598}
599
600impl From<CreateRequest> for ExecutionEventInner {
601 fn from(value: CreateRequest) -> Self {
602 Self::Created {
603 ffqn: value.ffqn,
604 params: value.params,
605 parent: value.parent,
606 scheduled_at: value.scheduled_at,
607 retry_exp_backoff: value.retry_exp_backoff,
608 max_retries: value.max_retries,
609 component_id: value.component_id,
610 metadata: value.metadata,
611 scheduled_by: value.scheduled_by,
612 }
613 }
614}
615
616#[async_trait]
617pub trait DbPool: Send + Sync {
618 fn connection(&self) -> Box<dyn DbConnection>;
619}
620
621#[async_trait]
622pub trait DbPoolCloseable {
623 async fn close(self);
624}
625
626#[async_trait]
627pub trait DbExecutor: Send + Sync {
628 #[expect(clippy::too_many_arguments)]
629 async fn lock_pending(
630 &self,
631 batch_size: usize,
632 pending_at_or_sooner: DateTime<Utc>,
633 ffqns: Arc<[FunctionFqn]>,
634 created_at: DateTime<Utc>,
635 component_id: ComponentId,
636 executor_id: ExecutorId,
637 lock_expires_at: DateTime<Utc>,
638 run_id: RunId,
639 ) -> Result<LockPendingResponse, DbErrorGeneric>;
640
641 #[expect(clippy::too_many_arguments)]
643 async fn lock_one(
644 &self,
645 created_at: DateTime<Utc>,
646 component_id: ComponentId,
647 execution_id: &ExecutionId,
648 run_id: RunId,
649 version: Version,
650 executor_id: ExecutorId,
651 lock_expires_at: DateTime<Utc>,
652 ) -> Result<LockedExecution, DbErrorWrite>;
653
654 async fn append(
657 &self,
658 execution_id: ExecutionId,
659 version: Version,
660 req: AppendRequest,
661 ) -> Result<AppendResponse, DbErrorWrite>;
662
663 async fn append_batch_respond_to_parent(
666 &self,
667 execution_id: ExecutionIdDerived,
668 current_time: DateTime<Utc>, batch: Vec<AppendRequest>,
670 version: Version,
671 parent_execution_id: ExecutionId,
672 parent_response_event: JoinSetResponseEventOuter,
673 ) -> Result<AppendBatchResponse, DbErrorWrite>;
674
675 async fn wait_for_pending(
680 &self,
681 pending_at_or_sooner: DateTime<Utc>,
682 ffqns: Arc<[FunctionFqn]>,
683 timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
684 );
685}
686
687#[async_trait]
688pub trait DbConnection: DbExecutor {
689 async fn append_response(
690 &self,
691 created_at: DateTime<Utc>,
692 execution_id: ExecutionId,
693 response_event: JoinSetResponseEvent,
694 ) -> Result<(), DbErrorWrite>;
695
696 async fn append_batch(
699 &self,
700 current_time: DateTime<Utc>, batch: Vec<AppendRequest>,
702 execution_id: ExecutionId,
703 version: Version,
704 ) -> Result<AppendBatchResponse, DbErrorWrite>;
705
706 async fn append_batch_create_new_execution(
709 &self,
710 current_time: DateTime<Utc>, batch: Vec<AppendRequest>, execution_id: ExecutionId,
713 version: Version,
714 child_req: Vec<CreateRequest>,
715 ) -> Result<AppendBatchResponse, DbErrorWrite>;
716
717 #[cfg(feature = "test")]
718 async fn get(&self, execution_id: &ExecutionId) -> Result<ExecutionLog, DbErrorRead>;
720
721 async fn list_execution_events(
722 &self,
723 execution_id: &ExecutionId,
724 since: &Version,
725 max_length: VersionType,
726 include_backtrace_id: bool,
727 ) -> Result<Vec<ExecutionEvent>, DbErrorRead>;
728
729 async fn get_execution_event(
731 &self,
732 execution_id: &ExecutionId,
733 version: &Version,
734 ) -> Result<ExecutionEvent, DbErrorRead>;
735
736 async fn get_create_request(
737 &self,
738 execution_id: &ExecutionId,
739 ) -> Result<CreateRequest, DbErrorRead> {
740 let execution_event = self
741 .get_execution_event(execution_id, &Version::new(0))
742 .await?;
743 if let ExecutionEventInner::Created {
744 ffqn,
745 params,
746 parent,
747 scheduled_at,
748 retry_exp_backoff,
749 max_retries,
750 component_id,
751 metadata,
752 scheduled_by,
753 } = execution_event.event
754 {
755 Ok(CreateRequest {
756 created_at: execution_event.created_at,
757 execution_id: execution_id.clone(),
758 ffqn,
759 params,
760 parent,
761 scheduled_at,
762 retry_exp_backoff,
763 max_retries,
764 component_id,
765 metadata,
766 scheduled_by,
767 })
768 } else {
769 error!(%execution_id, "Execution log must start with creation");
770 Err(DbErrorRead::DbErrorGeneric(DbErrorGeneric::Uncategorized(
771 "execution log must start with creation".into(),
772 )))
773 }
774 }
775
776 async fn get_finished_result(
777 &self,
778 execution_id: &ExecutionId,
779 finished: PendingStateFinished,
780 ) -> Result<Option<SupportedFunctionReturnValue>, DbErrorRead> {
781 let last_event = self
782 .get_execution_event(execution_id, &Version::new(finished.version))
783 .await?;
784 if let ExecutionEventInner::Finished { result, .. } = last_event.event {
785 Ok(Some(result))
786 } else {
787 Ok(None)
788 }
789 }
790
791 async fn get_pending_state(
792 &self,
793 execution_id: &ExecutionId,
794 ) -> Result<PendingState, DbErrorRead>;
795
796 async fn get_expired_timers(
798 &self,
799 at: DateTime<Utc>,
800 ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric>;
801
802 async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite>;
804
805 async fn subscribe_to_next_responses(
810 &self,
811 execution_id: &ExecutionId,
812 start_idx: usize,
813 timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
814 ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorReadWithTimeout>;
815
816 async fn wait_for_finished_result(
819 &self,
820 execution_id: &ExecutionId,
821 timeout_fut: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
822 ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout>;
823
824 async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite>;
825
826 async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite>;
827
828 async fn get_backtrace(
831 &self,
832 execution_id: &ExecutionId,
833 filter: BacktraceFilter,
834 ) -> Result<BacktraceInfo, DbErrorRead>;
835
836 async fn list_executions(
839 &self,
840 ffqn: Option<FunctionFqn>,
841 top_level_only: bool,
842 pagination: ExecutionListPagination,
843 ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric>;
844
845 async fn list_responses(
849 &self,
850 execution_id: &ExecutionId,
851 pagination: Pagination<u32>,
852 ) -> Result<Vec<ResponseWithCursor>, DbErrorRead>;
853}
854
855#[derive(Clone)]
856pub enum BacktraceFilter {
857 First,
858 Last,
859 Specific(Version),
860}
861
862pub struct BacktraceInfo {
863 pub execution_id: ExecutionId,
864 pub component_id: ComponentId,
865 pub version_min_including: Version,
866 pub version_max_excluding: Version,
867 pub wasm_backtrace: WasmBacktrace,
868}
869
870#[derive(Serialize, Deserialize, Debug, Clone)]
871pub struct WasmBacktrace {
872 pub frames: Vec<FrameInfo>,
873}
874
875#[derive(Serialize, Deserialize, Debug, Clone)]
876pub struct FrameInfo {
877 pub module: String,
878 pub func_name: String,
879 pub symbols: Vec<FrameSymbol>,
880}
881
882#[derive(Serialize, Deserialize, Debug, Clone)]
883pub struct FrameSymbol {
884 pub func_name: Option<String>,
885 pub file: Option<String>,
886 pub line: Option<u32>,
887 pub col: Option<u32>,
888}
889
890mod wasm_backtrace {
891 use super::{FrameInfo, FrameSymbol, WasmBacktrace};
892
893 impl WasmBacktrace {
894 pub fn maybe_from(backtrace: &wasmtime::WasmBacktrace) -> Option<Self> {
895 if backtrace.frames().is_empty() {
896 None
897 } else {
898 Some(Self {
899 frames: backtrace.frames().iter().map(FrameInfo::from).collect(),
900 })
901 }
902 }
903 }
904
905 impl From<&wasmtime::FrameInfo> for FrameInfo {
906 fn from(frame: &wasmtime::FrameInfo) -> Self {
907 let module_name = frame.module().name().unwrap_or("<unknown>").to_string();
908 let mut func_name = String::new();
909 wasmtime_environ::demangle_function_name_or_index(
910 &mut func_name,
911 frame.func_name(),
912 frame.func_index() as usize,
913 )
914 .expect("writing to string must succeed");
915 Self {
916 module: module_name,
917 func_name,
918 symbols: frame
919 .symbols()
920 .iter()
921 .map(std::convert::Into::into)
922 .collect(),
923 }
924 }
925 }
926
927 impl From<&wasmtime::FrameSymbol> for FrameSymbol {
928 fn from(symbol: &wasmtime::FrameSymbol) -> Self {
929 let func_name = symbol.name().map(|name| {
930 let mut writer = String::new();
931 wasmtime_environ::demangle_function_name(&mut writer, name)
932 .expect("writing to string must succeed");
933 writer
934 });
935
936 Self {
937 func_name,
938 file: symbol.file().map(ToString::to_string),
939 line: symbol.line(),
940 col: symbol.column(),
941 }
942 }
943 }
944}
945
946pub type ResponseCursorType = u32; #[derive(Debug, Clone)]
949pub struct ResponseWithCursor {
950 pub event: JoinSetResponseEventOuter,
951 pub cursor: ResponseCursorType,
952}
953
954#[derive(Debug)]
955pub struct ExecutionWithState {
956 pub execution_id: ExecutionId,
957 pub ffqn: FunctionFqn,
958 pub pending_state: PendingState,
959 pub created_at: DateTime<Utc>,
960 pub scheduled_at: DateTime<Utc>,
961}
962
963pub enum ExecutionListPagination {
964 CreatedBy(Pagination<Option<DateTime<Utc>>>),
965 ExecutionId(Pagination<Option<ExecutionId>>),
966}
967
968#[derive(Debug, Clone, Copy)]
969pub enum Pagination<T> {
970 NewerThan {
971 length: ResponseCursorType,
972 cursor: T,
973 including_cursor: bool,
974 },
975 OlderThan {
976 length: ResponseCursorType,
977 cursor: T,
978 including_cursor: bool,
979 },
980}
981impl<T> Pagination<T> {
982 pub fn length(&self) -> ResponseCursorType {
983 match self {
984 Pagination::NewerThan { length, .. } | Pagination::OlderThan { length, .. } => *length,
985 }
986 }
987 pub fn rel(&self) -> &'static str {
988 match self {
989 Pagination::NewerThan {
990 including_cursor: false,
991 ..
992 } => ">",
993 Pagination::NewerThan {
994 including_cursor: true,
995 ..
996 } => ">=",
997 Pagination::OlderThan {
998 including_cursor: false,
999 ..
1000 } => "<",
1001 Pagination::OlderThan {
1002 including_cursor: true,
1003 ..
1004 } => "<=",
1005 }
1006 }
1007 pub fn is_desc(&self) -> bool {
1008 matches!(self, Pagination::OlderThan { .. })
1009 }
1010}
1011
1012#[cfg(feature = "test")]
1013pub async fn wait_for_pending_state_fn<T: Debug>(
1014 db_connection: &dyn DbConnection,
1015 execution_id: &ExecutionId,
1016 predicate: impl Fn(ExecutionLog) -> Option<T> + Send,
1017 timeout: Option<Duration>,
1018) -> Result<T, DbErrorReadWithTimeout> {
1019 tracing::trace!(%execution_id, "Waiting for predicate");
1020 let fut = async move {
1021 loop {
1022 let execution_log = db_connection.get(execution_id).await?;
1023 if let Some(t) = predicate(execution_log) {
1024 tracing::debug!(%execution_id, "Found: {t:?}");
1025 return Ok(t);
1026 }
1027 tokio::time::sleep(Duration::from_millis(10)).await;
1028 }
1029 };
1030
1031 if let Some(timeout) = timeout {
1032 tokio::select! { res = fut => res,
1034 () = tokio::time::sleep(timeout) => Err(DbErrorReadWithTimeout::Timeout)
1035 }
1036 } else {
1037 fut.await
1038 }
1039}
1040
1041#[derive(Debug, Clone, PartialEq, Eq)]
1042pub enum ExpiredTimer {
1043 Lock(ExpiredLock),
1044 Delay(ExpiredDelay),
1045}
1046
1047#[derive(Debug, Clone, PartialEq, Eq)]
1048pub struct ExpiredLock {
1049 pub execution_id: ExecutionId,
1050 pub locked_at_version: Version,
1052 pub next_version: Version,
1053 pub intermittent_event_count: u32,
1055 pub max_retries: u32,
1056 pub retry_exp_backoff: Duration,
1057 pub parent: Option<(ExecutionId, JoinSetId)>, }
1059
1060#[derive(Debug, Clone, PartialEq, Eq)]
1061pub struct ExpiredDelay {
1062 pub execution_id: ExecutionId,
1063 pub join_set_id: JoinSetId,
1064 pub delay_id: DelayId,
1065}
1066
1067#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, Deserialize)]
1068#[serde(tag = "type")]
1069pub enum PendingState {
1070 #[display("Locked(`{lock_expires_at}`, {executor_id}, {run_id})")]
1071 Locked {
1072 executor_id: ExecutorId,
1073 run_id: RunId,
1074 lock_expires_at: DateTime<Utc>,
1075 },
1076 #[display("PendingAt(`{scheduled_at}`)")]
1077 PendingAt { scheduled_at: DateTime<Utc> }, #[display("BlockedByJoinSet({join_set_id},`{lock_expires_at}`)")]
1079 BlockedByJoinSet {
1081 join_set_id: JoinSetId,
1082 lock_expires_at: DateTime<Utc>,
1084 closing: bool,
1086 },
1087 #[display("Finished({finished})")]
1088 Finished { finished: PendingStateFinished },
1089}
1090
1091#[derive(Debug, Clone, Copy, derive_more::Display, PartialEq, Eq, Serialize, Deserialize)]
1092#[display("{result_kind}")]
1093pub struct PendingStateFinished {
1094 pub version: VersionType, pub finished_at: DateTime<Utc>,
1096 pub result_kind: PendingStateFinishedResultKind,
1097}
1098
1099#[derive(
1100 Debug, Clone, Copy, PartialEq, Eq, serde_with::SerializeDisplay, serde_with::DeserializeFromStr,
1101)]
1102pub struct PendingStateFinishedResultKind(pub Result<(), PendingStateFinishedError>);
1103const OK_VARIANT: &str = "ok";
1104impl FromStr for PendingStateFinishedResultKind {
1105 type Err = strum::ParseError;
1106
1107 fn from_str(s: &str) -> Result<Self, Self::Err> {
1108 if s == OK_VARIANT {
1109 Ok(PendingStateFinishedResultKind(Ok(())))
1110 } else {
1111 let err = PendingStateFinishedError::from_str(s)?;
1112 Ok(PendingStateFinishedResultKind(Err(err)))
1113 }
1114 }
1115}
1116
1117impl Display for PendingStateFinishedResultKind {
1118 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1119 match self.0 {
1120 Ok(()) => write!(f, "{OK_VARIANT}"),
1121 Err(err) => write!(f, "{err}"),
1122 }
1123 }
1124}
1125
1126impl From<&SupportedFunctionReturnValue> for PendingStateFinishedResultKind {
1127 fn from(result: &SupportedFunctionReturnValue) -> Self {
1128 result.as_pending_state_finished_result()
1129 }
1130}
1131
1132#[derive(Debug, Clone, Copy, PartialEq, Eq, strum::EnumString, strum::Display)]
1133#[cfg_attr(test, derive(strum::VariantArray))]
1134#[strum(serialize_all = "snake_case")]
1135pub enum PendingStateFinishedError {
1136 Timeout,
1137 ExecutionFailure,
1138 FallibleError,
1139}
1140
1141impl PendingState {
1142 pub fn can_append_lock(
1143 &self,
1144 created_at: DateTime<Utc>,
1145 executor_id: ExecutorId,
1146 run_id: RunId,
1147 lock_expires_at: DateTime<Utc>,
1148 ) -> Result<LockKind, DbErrorWritePermanent> {
1149 if lock_expires_at <= created_at {
1150 return Err(DbErrorWritePermanent::ValidationFailed(
1151 "invalid expiry date".into(),
1152 ));
1153 }
1154 match self {
1155 PendingState::PendingAt { scheduled_at } => {
1156 if *scheduled_at <= created_at {
1157 Ok(LockKind::CreatingNewLock)
1159 } else {
1160 Err(DbErrorWritePermanent::ValidationFailed(
1161 "cannot lock, not yet pending".into(),
1162 ))
1163 }
1164 }
1165 PendingState::Locked {
1166 executor_id: current_pending_state_executor_id,
1167 run_id: current_pending_state_run_id,
1168 ..
1169 } => {
1170 if executor_id == *current_pending_state_executor_id
1171 && run_id == *current_pending_state_run_id
1172 {
1173 Ok(LockKind::Extending)
1175 } else {
1176 Err(DbErrorWritePermanent::CannotWrite {
1177 reason: "cannot lock, already locked".into(),
1178 expected_version: None,
1179 })
1180 }
1181 }
1182 PendingState::BlockedByJoinSet { .. } => Err(DbErrorWritePermanent::CannotWrite {
1183 reason: "cannot append Locked event when in BlockedByJoinSet state".into(),
1184 expected_version: None,
1185 }),
1186 PendingState::Finished { .. } => Err(DbErrorWritePermanent::CannotWrite {
1187 reason: "already finished".into(),
1188 expected_version: None,
1189 }),
1190 }
1191 }
1192
1193 #[must_use]
1194 pub fn is_finished(&self) -> bool {
1195 matches!(self, PendingState::Finished { .. })
1196 }
1197}
1198
1199#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1200pub enum LockKind {
1201 Extending,
1202 CreatingNewLock,
1203}
1204
1205pub mod http_client_trace {
1206 use chrono::{DateTime, Utc};
1207 use serde::{Deserialize, Serialize};
1208
1209 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1210 pub struct HttpClientTrace {
1211 pub req: RequestTrace,
1212 pub resp: Option<ResponseTrace>,
1213 }
1214
1215 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1216 pub struct RequestTrace {
1217 pub sent_at: DateTime<Utc>,
1218 pub uri: String,
1219 pub method: String,
1220 }
1221
1222 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1223 pub struct ResponseTrace {
1224 pub finished_at: DateTime<Utc>,
1225 pub status: Result<u16, String>,
1226 }
1227}
1228
1229#[cfg(test)]
1230mod tests {
1231 use super::HistoryEventScheduleAt;
1232 use super::PendingStateFinished;
1233 use super::PendingStateFinishedError;
1234 use super::PendingStateFinishedResultKind;
1235 use crate::SupportedFunctionReturnValue;
1236 use chrono::DateTime;
1237 use chrono::Datelike;
1238 use chrono::Utc;
1239 use insta::assert_snapshot;
1240 use rstest::rstest;
1241 use std::time::Duration;
1242 use strum::VariantArray as _;
1243 use val_json::type_wrapper::TypeWrapper;
1244 use val_json::wast_val::WastVal;
1245 use val_json::wast_val::WastValWithType;
1246
1247 #[rstest(expected => [
1248 PendingStateFinishedResultKind(Result::Ok(())),
1249 PendingStateFinishedResultKind(Result::Err(PendingStateFinishedError::Timeout)),
1250 ])]
1251 #[test]
1252 fn serde_pending_state_finished_result_kind_should_work(
1253 expected: PendingStateFinishedResultKind,
1254 ) {
1255 let ser = serde_json::to_string(&expected).unwrap();
1256 let actual: PendingStateFinishedResultKind = serde_json::from_str(&ser).unwrap();
1257 assert_eq!(expected, actual);
1258 }
1259
1260 #[rstest(result_kind => [
1261 PendingStateFinishedResultKind(Result::Ok(())),
1262 PendingStateFinishedResultKind(Result::Err(PendingStateFinishedError::Timeout)),
1263 ])]
1264 #[test]
1265 fn serde_pending_state_finished_should_work(result_kind: PendingStateFinishedResultKind) {
1266 let expected = PendingStateFinished {
1267 version: 0,
1268 finished_at: Utc::now(),
1269 result_kind,
1270 };
1271
1272 let ser = serde_json::to_string(&expected).unwrap();
1273 let actual: PendingStateFinished = serde_json::from_str(&ser).unwrap();
1274 assert_eq!(expected, actual);
1275 }
1276
1277 #[test]
1278 fn join_set_deser_with_result_ok_option_none_should_work() {
1279 let expected = SupportedFunctionReturnValue::Ok {
1280 ok: Some(WastValWithType {
1281 r#type: TypeWrapper::Result {
1282 ok: Some(Box::new(TypeWrapper::Option(Box::new(TypeWrapper::String)))),
1283 err: Some(Box::new(TypeWrapper::String)),
1284 },
1285 value: WastVal::Result(Ok(Some(Box::new(WastVal::Option(None))))),
1286 }),
1287 };
1288 let json = serde_json::to_string(&expected).unwrap();
1289 assert_snapshot!(json);
1290
1291 let actual: SupportedFunctionReturnValue = serde_json::from_str(&json).unwrap();
1292
1293 assert_eq!(expected, actual);
1294 }
1295
1296 #[test]
1297 fn verify_pending_state_finished_result_kind_serde() {
1298 let variants: Vec<_> = PendingStateFinishedError::VARIANTS
1299 .iter()
1300 .map(|var| PendingStateFinishedResultKind(Err(*var)))
1301 .chain(std::iter::once(PendingStateFinishedResultKind(Ok(()))))
1302 .collect();
1303 let ser = serde_json::to_string_pretty(&variants).unwrap();
1304 insta::assert_snapshot!(ser);
1305 let deser: Vec<PendingStateFinishedResultKind> = serde_json::from_str(&ser).unwrap();
1306 assert_eq!(variants, deser);
1307 }
1308
1309 #[test]
1310 fn as_date_time_should_work_with_duration_u32_max_secs() {
1311 let duration = Duration::from_secs(u64::from(u32::MAX));
1312 let schedule_at = HistoryEventScheduleAt::In(duration);
1313 let resolved = schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap();
1314 assert_eq!(2106, resolved.year());
1315 }
1316
1317 const MILLIS_PER_SEC: i64 = 1000;
1318 const TIMEDELTA_MAX_SECS: i64 = i64::MAX / MILLIS_PER_SEC;
1319
1320 #[test]
1321 fn as_date_time_should_fail_on_duration_secs_greater_than_i64_max() {
1322 let duration = Duration::from_secs(
1324 u64::try_from(TIMEDELTA_MAX_SECS).expect("positive number must not fail") + 1,
1325 );
1326 let schedule_at = HistoryEventScheduleAt::In(duration);
1327 schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap_err();
1328 }
1329}