1use crate::ClosingStrategy;
2use crate::ComponentId;
3use crate::ExecutionId;
4use crate::ExecutionMetadata;
5use crate::FunctionFqn;
6use crate::JoinSetId;
7use crate::Params;
8use crate::StrVariant;
9use crate::SupportedFunctionReturnValue;
10use crate::prefixed_ulid::DelayId;
11use crate::prefixed_ulid::ExecutionIdDerived;
12use crate::prefixed_ulid::ExecutorId;
13use crate::prefixed_ulid::RunId;
14use assert_matches::assert_matches;
15use async_trait::async_trait;
16use chrono::TimeDelta;
17use chrono::{DateTime, Utc};
18use http_client_trace::HttpClientTrace;
19use serde::Deserialize;
20use serde::Serialize;
21use std::fmt::Debug;
22use std::fmt::Display;
23use std::pin::Pin;
24use std::str::FromStr;
25use std::sync::Arc;
26use std::time::Duration;
27use strum::IntoStaticStr;
28use tracing::error;
29
/// Full record of one execution: its events, the join set responses and
/// the bookkeeping values stored next to them.
#[derive(Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct ExecutionLog {
    /// Identifier of the execution this log belongs to.
    pub execution_id: ExecutionId,
    /// Events of the execution. The accessor methods on this type expect the
    /// first element to be `ExecutionEventInner::Created` and panic otherwise.
    pub events: Vec<ExecutionEvent>,
    /// Join set responses recorded for the execution.
    pub responses: Vec<JoinSetResponseEventOuter>,
    // NOTE(review): presumably the version the next appended event will carry — confirm.
    pub next_version: Version,
    /// Pending state stored together with the log.
    pub pending_state: PendingState,
}
39
40impl ExecutionLog {
/// Computes the delay after which a temporarily failed execution may be retried.
///
/// Returns `None` when `temporary_event_count` exceeds `max_retries`.
/// Otherwise returns `retry_exp_backoff * 2^(temporary_event_count - 1)`.
///
/// The computation never panics: a `temporary_event_count` of `0` is treated
/// like `1` (no doubling applied), and both the power of two and the final
/// multiplication saturate instead of overflowing.
#[must_use]
pub fn can_be_retried_after(
    temporary_event_count: u32,
    max_retries: u32,
    retry_exp_backoff: Duration,
) -> Option<Duration> {
    if temporary_event_count > max_retries {
        return None;
    }
    // `saturating_sub` guards against underflow for a count of zero.
    let exponent = temporary_event_count.saturating_sub(1);
    let multiplier = 2_u32.saturating_pow(exponent);
    // `Duration * u32` panics on overflow; clamp to `Duration::MAX` instead.
    Some(retry_exp_backoff.saturating_mul(multiplier))
}
56
57 #[must_use]
58 pub fn compute_retry_duration_when_retrying_forever(
59 temporary_event_count: u32,
60 retry_exp_backoff: Duration,
61 ) -> Duration {
62 Self::can_be_retried_after(temporary_event_count, u32::MAX, retry_exp_backoff)
63 .expect("`max_retries` set to MAX must never return None")
64 }
65
/// Returns the `retry_exp_backoff` stored in the first event.
///
/// # Panics
/// Panics if `events` is empty or its first element is not
/// `ExecutionEventInner::Created`.
#[must_use]
pub fn retry_exp_backoff(&self) -> Duration {
    assert_matches!(self.events.first(), Some(ExecutionEvent {
        event: ExecutionEventInner::Created { retry_exp_backoff, .. },
        ..
    }) => *retry_exp_backoff)
}
73
/// Returns the `max_retries` stored in the first event.
///
/// # Panics
/// Panics if `events` is empty or its first element is not
/// `ExecutionEventInner::Created`.
#[must_use]
pub fn max_retries(&self) -> u32 {
    assert_matches!(self.events.first(), Some(ExecutionEvent {
        event: ExecutionEventInner::Created { max_retries, .. },
        ..
    }) => *max_retries)
}
81
/// Borrows the `ffqn` stored in the first event.
///
/// # Panics
/// Panics if `events` is empty or its first element is not
/// `ExecutionEventInner::Created`.
#[must_use]
pub fn ffqn(&self) -> &FunctionFqn {
    assert_matches!(self.events.first(), Some(ExecutionEvent {
        event: ExecutionEventInner::Created { ffqn, .. },
        ..
    }) => ffqn)
}
89
/// Returns a clone of the `parent` stored in the first event, if any.
///
/// # Panics
/// Panics if `events` is empty or its first element is not
/// `ExecutionEventInner::Created`.
#[must_use]
pub fn parent(&self) -> Option<(ExecutionId, JoinSetId)> {
    assert_matches!(self.events.first(), Some(ExecutionEvent {
        event: ExecutionEventInner::Created { parent, .. },
        ..
    }) => parent.clone())
}
97
98 #[must_use]
99 pub fn last_event(&self) -> &ExecutionEvent {
100 self.events.last().expect("must contain at least one event")
101 }
102
103 #[must_use]
104 pub fn into_finished_result(mut self) -> Option<SupportedFunctionReturnValue> {
105 if let ExecutionEvent {
106 event: ExecutionEventInner::Finished { result, .. },
107 ..
108 } = self.events.pop().expect("must contain at least one event")
109 {
110 Some(result)
111 } else {
112 None
113 }
114 }
115
116 pub fn event_history(&self) -> impl Iterator<Item = HistoryEvent> + '_ {
117 self.events.iter().filter_map(|event| {
118 if let ExecutionEventInner::HistoryEvent { event: eh, .. } = &event.event {
119 Some(eh.clone())
120 } else {
121 None
122 }
123 })
124 }
125
/// Finds the first `HistoryEvent::JoinSetRequest` whose join set id equals
/// `join_set_id` and borrows its request.
#[cfg(feature = "test")]
#[must_use]
pub fn find_join_set_request(&self, join_set_id: &JoinSetId) -> Option<&JoinSetRequest> {
    for entry in &self.events {
        if let ExecutionEventInner::HistoryEvent {
            event:
                HistoryEvent::JoinSetRequest {
                    join_set_id: found,
                    request,
                },
            ..
        } = &entry.event
        {
            if *join_set_id == *found {
                return Some(request);
            }
        }
    }
    None
}
143}
144
/// Primitive integer type backing [`Version`].
pub type VersionType = u32;
/// Newtype around [`VersionType`]; serialized transparently as the bare number.
#[derive(
    Debug,
    Default,
    Clone,
    PartialEq,
    Eq,
    Hash,
    derive_more::Display,
    derive_more::Into,
    serde::Serialize,
    serde::Deserialize,
)]
#[serde(transparent)]
pub struct Version(pub VersionType);
160impl Version {
161 #[must_use]
162 pub fn new(arg: VersionType) -> Version {
163 Version(arg)
164 }
165
166 #[must_use]
167 pub fn increment(&self) -> Version {
168 Version(self.0 + 1)
169 }
170}
171
/// A timestamped event of an execution. `Display` prints only the inner `event`.
#[derive(
    Clone, Debug, derive_more::Display, PartialEq, Eq, serde::Serialize, serde::Deserialize,
)]
#[display("{event}")]
pub struct ExecutionEvent {
    /// Timestamp stored with the event.
    pub created_at: DateTime<Utc>,
    /// The event payload.
    pub event: ExecutionEventInner,
    // NOTE(review): presumably links to a stored backtrace by version — confirm.
    pub backtrace_id: Option<Version>,
}
181
/// A [`JoinSetResponseEvent`] together with its timestamp.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct JoinSetResponseEventOuter {
    /// Timestamp stored with the response.
    pub created_at: DateTime<Utc>,
    /// The response payload.
    pub event: JoinSetResponseEvent,
}
188
/// A response paired with the join set it is addressed to.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct JoinSetResponseEvent {
    /// Join set this response belongs to.
    pub join_set_id: JoinSetId,
    /// The response itself.
    pub event: JoinSetResponse,
}
194
/// Kinds of responses that can be delivered to a join set.
/// Serialized with an internal `type` tag.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type")]
pub enum JoinSetResponse {
    /// A delay identified by `delay_id` has finished.
    DelayFinished {
        delay_id: DelayId,
    },
    /// A child execution has finished with `result`.
    ChildExecutionFinished {
        child_execution_id: ExecutionIdDerived,
        // NOTE(review): presumably the version of the child's `Finished` event — confirm.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Version(2)))]
        finished_version: Version,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
        result: SupportedFunctionReturnValue,
    },
}
210
/// Placeholder `Created` event built from empty / zero values only.
pub const DUMMY_CREATED: ExecutionEventInner = ExecutionEventInner::Created {
    ffqn: FunctionFqn::new_static("", ""),
    params: Params::empty(),
    parent: None,
    scheduled_at: DateTime::from_timestamp_nanos(0),
    retry_exp_backoff: Duration::ZERO,
    max_retries: 0,
    component_id: ComponentId::dummy_activity(),
    metadata: ExecutionMetadata::empty(),
    scheduled_by: None,
};
/// Placeholder `HistoryEvent` wrapping a `JoinSetCreate` for a one-off join set
/// with an empty name.
pub const DUMMY_HISTORY_EVENT: ExecutionEventInner = ExecutionEventInner::HistoryEvent {
    event: HistoryEvent::JoinSetCreate {
        join_set_id: JoinSetId {
            kind: crate::JoinSetKind::OneOff,
            name: StrVariant::empty(),
        },
        closing_strategy: ClosingStrategy::Complete,
    },
};
231
/// Payload of an [`ExecutionEvent`].
///
/// `IntoStaticStr` provides the variant name as `&'static str`
/// (see `ExecutionEventInner::variant`).
#[derive(
    Clone,
    derive_more::Debug,
    derive_more::Display,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    IntoStaticStr,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[allow(clippy::large_enum_variant)]
pub enum ExecutionEventInner {
    /// Creation record. `ExecutionLog` accessors expect this to be the first
    /// event of a log.
    #[display("Created({ffqn}, `{scheduled_at}`)")]
    Created {
        ffqn: FunctionFqn,
        // Excluded from `Debug` output.
        #[cfg_attr(any(test, feature = "test"), arbitrary(default))]
        #[debug(skip)]
        params: Params,
        parent: Option<(ExecutionId, JoinSetId)>,
        scheduled_at: DateTime<Utc>,
        retry_exp_backoff: Duration,
        max_retries: u32,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
        component_id: ComponentId,
        #[cfg_attr(any(test, feature = "test"), arbitrary(default))]
        metadata: ExecutionMetadata,
        scheduled_by: Option<ExecutionId>,
    },
    /// The execution was locked until `lock_expires_at`.
    #[display("Locked(`{lock_expires_at}`, {component_id})")]
    Locked {
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
        component_id: ComponentId,
        executor_id: ExecutorId,
        run_id: RunId,
        lock_expires_at: DateTime<Utc>,
    },
    /// The execution was unlocked; carries a backoff expiry and a reason.
    #[display("Unlocked(`{backoff_expires_at}`)")]
    Unlocked {
        backoff_expires_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
        reason: StrVariant,
    },
    /// Temporary failure; counted by `is_temporary_event`.
    #[display("TemporarilyFailed(`{backoff_expires_at}`)")]
    TemporarilyFailed {
        backoff_expires_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
        reason_full: StrVariant,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason inner")))]
        reason_inner: StrVariant,
        detail: Option<String>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },
    /// Temporary timeout; counted by `is_temporary_event`.
    #[display("TemporarilyTimedOut(`{backoff_expires_at}`)")]
    TemporarilyTimedOut {
        backoff_expires_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },
    /// The execution finished with `result`.
    #[display("Finished")]
    Finished {
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
        result: SupportedFunctionReturnValue,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },

    /// Wrapper around a [`HistoryEvent`].
    #[display("HistoryEvent({event})")]
    HistoryEvent { event: HistoryEvent },
}
320
321impl ExecutionEventInner {
322 #[must_use]
323 pub fn is_temporary_event(&self) -> bool {
324 matches!(
325 self,
326 Self::TemporarilyFailed { .. } | Self::TemporarilyTimedOut { .. }
327 )
328 }
329
330 #[must_use]
331 pub fn variant(&self) -> &'static str {
332 Into::<&'static str>::into(self)
333 }
334
335 #[must_use]
336 pub fn join_set_id(&self) -> Option<&JoinSetId> {
337 match self {
338 Self::Created {
339 parent: Some((_parent_id, join_set_id)),
340 ..
341 } => Some(join_set_id),
342 Self::HistoryEvent {
343 event:
344 HistoryEvent::JoinSetCreate { join_set_id, .. }
345 | HistoryEvent::JoinSetRequest { join_set_id, .. }
346 | HistoryEvent::JoinNext { join_set_id, .. },
347 } => Some(join_set_id),
348 _ => None,
349 }
350 }
351}
352
/// Describes what kind of value a `HistoryEvent::Persist` carries.
/// Serialized with an internal `type` tag.
#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type")]
pub enum PersistKind {
    /// Random `u64`; the bounds are named `min` and `max_inclusive`.
    #[display("RandomU64({min}, {max_inclusive})")]
    RandomU64 { min: u64, max_inclusive: u64 },
    /// Random string; the length bounds are named `min_length` and
    /// `max_length_exclusive`.
    #[display("RandomString({min_length}, {max_length_exclusive})")]
    RandomString {
        min_length: u64,
        max_length_exclusive: u64,
    },
}
365
/// Encodes `value` as eight bytes in big-endian order (most significant byte first).
#[must_use]
pub fn from_u64_to_bytes(value: u64) -> [u8; 8] {
    u64::to_be_bytes(value)
}
370
/// Decodes eight big-endian bytes (most significant byte first) into a `u64`.
#[must_use]
pub fn from_bytes_to_u64(bytes: [u8; 8]) -> u64 {
    // Shift the accumulator left by one byte per step so the first byte ends
    // up in the most significant position.
    bytes
        .iter()
        .fold(0_u64, |acc, byte| (acc << 8) | u64::from(*byte))
}
375
/// Events wrapped by `ExecutionEventInner::HistoryEvent`.
/// Serialized with an internal `type` tag.
#[derive(
    derive_more::Debug, Clone, PartialEq, Eq, derive_more::Display, Serialize, Deserialize,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type")]
pub enum HistoryEvent {
    /// A persisted value of the given `kind`; the bytes are excluded from `Debug` output.
    #[display("Persist")]
    Persist {
        #[debug(skip)]
        value: Vec<u8>,
        kind: PersistKind,
    },
    /// Creation of a join set.
    #[display("JoinSetCreate({join_set_id})")]
    JoinSetCreate {
        join_set_id: JoinSetId,
        closing_strategy: ClosingStrategy,
    },
    /// A request submitted to a join set.
    #[display("JoinSetRequest({join_set_id}, {request})")]
    JoinSetRequest {
        join_set_id: JoinSetId,
        request: JoinSetRequest,
    },
    // NOTE(review): presumably records waiting for the next response of the join set — confirm.
    #[display("JoinNext({join_set_id})")]
    JoinNext {
        join_set_id: JoinSetId,
        run_expires_at: DateTime<Utc>,
        requested_ffqn: Option<FunctionFqn>,
        closing: bool,
    },
    // NOTE(review): semantics not visible in this file — confirm against the caller.
    #[display("JoinNextTooMany({join_set_id})")]
    JoinNextTooMany {
        join_set_id: JoinSetId,
        requested_ffqn: Option<FunctionFqn>,
    },
    /// Scheduling of another execution at `schedule_at`.
    #[display("Schedule({execution_id}, {schedule_at})")]
    Schedule {
        execution_id: ExecutionId,
        schedule_at: HistoryEventScheduleAt,
    },
    // NOTE(review): presumably stubs the result of `target_execution_id` — confirm.
    #[display("Stub({target_execution_id})")]
    Stub {
        target_execution_id: ExecutionIdDerived,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
        result: SupportedFunctionReturnValue,
        persist_result: Result<(), ()>,
    },
}
438
/// Point in time expressed as "now", an absolute timestamp or a relative duration.
/// `as_date_time` resolves it against a supplied `now`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
pub enum HistoryEventScheduleAt {
    /// Resolves to the supplied `now`.
    Now,
    /// Resolves to the contained timestamp.
    At(DateTime<Utc>),
    /// Resolves to `now` plus the contained duration.
    #[display("In({_0:?})")]
    In(Duration),
}
447
/// Error returned by `HistoryEventScheduleAt::as_date_time`.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum ScheduleAtConversionError {
    /// The duration cannot be converted to a `TimeDelta`, or adding it to
    /// `now` is out of range.
    #[error("source duration value is out of range")]
    OutOfRangeError,
}
453
454impl HistoryEventScheduleAt {
455 pub fn as_date_time(
456 &self,
457 now: DateTime<Utc>,
458 ) -> Result<DateTime<Utc>, ScheduleAtConversionError> {
459 match self {
460 Self::Now => Ok(now),
461 Self::At(date_time) => Ok(*date_time),
462 Self::In(duration) => {
463 let time_delta = TimeDelta::from_std(*duration)
464 .map_err(|_| ScheduleAtConversionError::OutOfRangeError)?;
465 now.checked_add_signed(time_delta)
466 .ok_or(ScheduleAtConversionError::OutOfRangeError)
467 }
468 }
469 }
470}
471
/// Request carried by `HistoryEvent::JoinSetRequest`.
/// Serialized with an internal `type` tag.
#[derive(Clone, Debug, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type")]
pub enum JoinSetRequest {
    /// Request for a delay.
    #[display("DelayRequest({delay_id}, expires_at: `{expires_at}`, schedule_at: `{schedule_at}`)")]
    DelayRequest {
        delay_id: DelayId,
        expires_at: DateTime<Utc>,
        schedule_at: HistoryEventScheduleAt,
    },
    /// Request for a child execution.
    #[display("ChildExecutionRequest({child_execution_id})")]
    ChildExecutionRequest {
        child_execution_id: ExecutionIdDerived,
    },
}
489
/// Errors shared by the read and write database error types.
#[derive(Debug, Clone, thiserror::Error, PartialEq)]
pub enum DbErrorGeneric {
    /// Any database error without a more specific category.
    #[error("database error: {0}")]
    Uncategorized(StrVariant),
    /// The database was closed.
    #[error("database was closed")]
    Close,
}
498
/// Write errors wrapped by `DbErrorWrite::Permanent`.
#[derive(thiserror::Error, Clone, Debug, PartialEq, Eq)]
pub enum DbErrorWritePermanent {
    /// The request failed validation.
    #[error("validation failed: {0}")]
    ValidationFailed(StrVariant),
    /// The write was refused for the stated `reason`.
    #[error("cannot write: {reason}")]
    CannotWrite {
        reason: StrVariant,
        // NOTE(review): presumably the version the database expected — confirm.
        expected_version: Option<Version>,
    },
}
510
/// Error type returned by write operations.
#[derive(Debug, Clone, thiserror::Error, PartialEq)]
pub enum DbErrorWrite {
    /// The targeted row does not exist.
    #[error("cannot write - row not found")]
    NotFound,
    /// Wraps a [`DbErrorWritePermanent`].
    #[error("permanent error: {0}")]
    Permanent(#[from] DbErrorWritePermanent),
    /// Wraps a [`DbErrorGeneric`]; displayed transparently.
    #[error(transparent)]
    DbErrorGeneric(#[from] DbErrorGeneric),
}
522
/// Error type returned by read operations.
#[derive(Debug, Clone, thiserror::Error, PartialEq)]
pub enum DbErrorRead {
    /// The requested row does not exist.
    #[error("cannot read - row not found")]
    NotFound,
    /// Wraps a [`DbErrorGeneric`]; displayed transparently.
    #[error(transparent)]
    DbErrorGeneric(#[from] DbErrorGeneric),
}
531
532impl From<DbErrorRead> for DbErrorWrite {
533 fn from(value: DbErrorRead) -> DbErrorWrite {
534 match value {
535 DbErrorRead::NotFound => DbErrorWrite::NotFound,
536 DbErrorRead::DbErrorGeneric(err) => DbErrorWrite::DbErrorGeneric(err),
537 }
538 }
539}
540
/// Read error extended with a timeout variant.
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum DbErrorReadWithTimeout {
    /// The operation timed out.
    #[error("timeout")]
    Timeout,
    /// Wraps a [`DbErrorRead`]; displayed transparently.
    #[error(transparent)]
    DbErrorRead(#[from] DbErrorRead),
}
548impl From<DbErrorGeneric> for DbErrorReadWithTimeout {
549 fn from(value: DbErrorGeneric) -> DbErrorReadWithTimeout {
550 Self::from(DbErrorRead::from(value))
551 }
552}
553
/// Value returned by single-event append operations; an alias of [`Version`].
pub type AppendResponse = Version;
/// Tuple of execution id, version, parameters and an optional timestamp.
// NOTE(review): the meaning of the optional timestamp is not visible here — confirm.
pub type PendingExecution = (ExecutionId, Version, Params, Option<DateTime<Utc>>);
558
/// Data returned by `DbExecutor::lock_pending` and `DbExecutor::lock_one`
/// for an execution that has been locked.
#[derive(Debug, Clone)]
pub struct LockedExecution {
    pub execution_id: ExecutionId,
    pub next_version: Version,
    pub metadata: ExecutionMetadata,
    pub run_id: RunId,
    pub ffqn: FunctionFqn,
    pub params: Params,
    pub event_history: Vec<HistoryEvent>,
    pub responses: Vec<JoinSetResponseEventOuter>,
    pub retry_exp_backoff: Duration,
    pub max_retries: u32,
    pub parent: Option<(ExecutionId, JoinSetId)>,
    // NOTE(review): presumably the number of temporary failure/timeout events so far — confirm.
    pub intermittent_event_count: u32,
}
574
/// Return value of `DbExecutor::lock_pending`: the executions that were locked.
pub type LockPendingResponse = Vec<LockedExecution>;
/// Value returned by batch append operations; an alias of [`Version`].
pub type AppendBatchResponse = Version;
577
/// An event to be appended, with its timestamp. `Display` prints only `event`.
#[derive(Debug, Clone, derive_more::Display, Serialize, Deserialize)]
#[display("{event}")]
pub struct AppendRequest {
    pub created_at: DateTime<Utc>,
    pub event: ExecutionEventInner,
}
584
/// Input of `DbConnection::create`.
///
/// Apart from `created_at` and `execution_id`, the fields mirror those of
/// `ExecutionEventInner::Created`.
#[derive(Debug, Clone)]
pub struct CreateRequest {
    pub created_at: DateTime<Utc>,
    pub execution_id: ExecutionId,
    pub ffqn: FunctionFqn,
    pub params: Params,
    pub parent: Option<(ExecutionId, JoinSetId)>,
    pub scheduled_at: DateTime<Utc>,
    pub retry_exp_backoff: Duration,
    pub max_retries: u32,
    pub component_id: ComponentId,
    pub metadata: ExecutionMetadata,
    pub scheduled_by: Option<ExecutionId>,
}
599
600impl From<CreateRequest> for ExecutionEventInner {
601 fn from(value: CreateRequest) -> Self {
602 Self::Created {
603 ffqn: value.ffqn,
604 params: value.params,
605 parent: value.parent,
606 scheduled_at: value.scheduled_at,
607 retry_exp_backoff: value.retry_exp_backoff,
608 max_retries: value.max_retries,
609 component_id: value.component_id,
610 metadata: value.metadata,
611 scheduled_by: value.scheduled_by,
612 }
613 }
614}
615
/// Source of database connections.
#[async_trait]
pub trait DbPool: Send + Sync {
    /// Returns a boxed [`DbConnection`].
    fn connection(&self) -> Box<dyn DbConnection>;
}
620
/// A pool that can be closed.
#[async_trait]
pub trait DbPoolCloseable {
    /// Closes the pool, consuming it.
    async fn close(self);
}
625
/// A batch of events addressed to one execution at a given version; used as
/// input of `DbExecutor::append_batch_respond_to_parent`.
#[derive(Clone, Debug)]
pub struct AppendEventsToExecution {
    pub execution_id: ExecutionId,
    pub version: Version,
    pub batch: Vec<AppendRequest>,
}
632
/// A join set response addressed to a parent execution; used as input of
/// `DbExecutor::append_batch_respond_to_parent`.
#[derive(Clone, Debug)]
pub struct AppendResponseToExecution {
    pub parent_execution_id: ExecutionId,
    pub parent_response_event: JoinSetResponseEventOuter,
}
638
/// Database operations for locking executions and appending their events.
/// Supertrait of [`DbConnection`].
#[async_trait]
pub trait DbExecutor: Send + Sync {
    /// Locks executions and returns them.
    // NOTE(review): presumably locks at most `batch_size` executions of the given
    // `ffqns` that are pending at or before `pending_at_or_sooner` — confirm
    // against the implementations.
    #[expect(clippy::too_many_arguments)]
    async fn lock_pending(
        &self,
        batch_size: usize,
        pending_at_or_sooner: DateTime<Utc>,
        ffqns: Arc<[FunctionFqn]>,
        created_at: DateTime<Utc>,
        component_id: ComponentId,
        executor_id: ExecutorId,
        lock_expires_at: DateTime<Utc>,
        run_id: RunId,
    ) -> Result<LockPendingResponse, DbErrorGeneric>;

    /// Locks a single execution identified by `execution_id` at `version`.
    #[expect(clippy::too_many_arguments)]
    async fn lock_one(
        &self,
        created_at: DateTime<Utc>,
        component_id: ComponentId,
        execution_id: &ExecutionId,
        run_id: RunId,
        version: Version,
        executor_id: ExecutorId,
        lock_expires_at: DateTime<Utc>,
    ) -> Result<LockedExecution, DbErrorWrite>;

    /// Appends one event to the execution at `version` and returns a [`Version`].
    async fn append(
        &self,
        execution_id: ExecutionId,
        version: Version,
        req: AppendRequest,
    ) -> Result<AppendResponse, DbErrorWrite>;

    /// Appends a batch of events to one execution and a response to its parent.
    // NOTE(review): whether both writes happen atomically is not visible here — confirm.
    async fn append_batch_respond_to_parent(
        &self,
        events: AppendEventsToExecution,
        response: AppendResponseToExecution,
        current_time: DateTime<Utc>,
    ) -> Result<AppendBatchResponse, DbErrorWrite>;

    /// Waits for a pending execution or for `timeout_fut`; returns nothing.
    // NOTE(review): presumably returns when either an execution matching the
    // arguments becomes pending or `timeout_fut` completes — confirm.
    async fn wait_for_pending(
        &self,
        pending_at_or_sooner: DateTime<Utc>,
        ffqns: Arc<[FunctionFqn]>,
        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
    );
}
696
/// Full database API: everything in [`DbExecutor`] plus creation, querying,
/// responses and backtraces.
#[async_trait]
pub trait DbConnection: DbExecutor {
    /// Appends a join set response to the execution.
    async fn append_response(
        &self,
        created_at: DateTime<Utc>,
        execution_id: ExecutionId,
        response_event: JoinSetResponseEvent,
    ) -> Result<(), DbErrorWrite>;

    /// Appends a batch of events to the execution at `version`.
    async fn append_batch(
        &self,
        current_time: DateTime<Utc>,
        batch: Vec<AppendRequest>,
        execution_id: ExecutionId,
        version: Version,
    ) -> Result<AppendBatchResponse, DbErrorWrite>;

    /// Appends a batch of events to the execution and creates the executions
    /// described by `child_req`.
    async fn append_batch_create_new_execution(
        &self,
        current_time: DateTime<Utc>,
        batch: Vec<AppendRequest>,
        execution_id: ExecutionId,
        version: Version,
        child_req: Vec<CreateRequest>,
    ) -> Result<AppendBatchResponse, DbErrorWrite>;

    /// Loads the whole [`ExecutionLog`]; available only with the `test` feature.
    #[cfg(feature = "test")]
    async fn get(&self, execution_id: &ExecutionId) -> Result<ExecutionLog, DbErrorRead>;

    /// Lists events of the execution.
    // NOTE(review): presumably starts at version `since` and returns at most
    // `max_length` events — confirm inclusivity against the implementations.
    async fn list_execution_events(
        &self,
        execution_id: &ExecutionId,
        since: &Version,
        max_length: VersionType,
        include_backtrace_id: bool,
    ) -> Result<Vec<ExecutionEvent>, DbErrorRead>;

    /// Fetches the event stored at `version`.
    async fn get_execution_event(
        &self,
        execution_id: &ExecutionId,
        version: &Version,
    ) -> Result<ExecutionEvent, DbErrorRead>;

    /// Rebuilds the [`CreateRequest`] of an execution from its event at version 0.
    ///
    /// # Errors
    /// Propagates errors of `get_execution_event`. Returns
    /// `DbErrorGeneric::Uncategorized` (and logs an error) when the event at
    /// version 0 is not `Created`.
    async fn get_create_request(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<CreateRequest, DbErrorRead> {
        let execution_event = self
            .get_execution_event(execution_id, &Version::new(0))
            .await?;
        if let ExecutionEventInner::Created {
            ffqn,
            params,
            parent,
            scheduled_at,
            retry_exp_backoff,
            max_retries,
            component_id,
            metadata,
            scheduled_by,
        } = execution_event.event
        {
            Ok(CreateRequest {
                created_at: execution_event.created_at,
                execution_id: execution_id.clone(),
                ffqn,
                params,
                parent,
                scheduled_at,
                retry_exp_backoff,
                max_retries,
                component_id,
                metadata,
                scheduled_by,
            })
        } else {
            error!(%execution_id, "Execution log must start with creation");
            Err(DbErrorRead::DbErrorGeneric(DbErrorGeneric::Uncategorized(
                "execution log must start with creation".into(),
            )))
        }
    }

    /// Fetches the event at `finished.version` and returns its result when it
    /// is a `Finished` event, `None` otherwise.
    async fn get_finished_result(
        &self,
        execution_id: &ExecutionId,
        finished: PendingStateFinished,
    ) -> Result<Option<SupportedFunctionReturnValue>, DbErrorRead> {
        let last_event = self
            .get_execution_event(execution_id, &Version::new(finished.version))
            .await?;
        if let ExecutionEventInner::Finished { result, .. } = last_event.event {
            Ok(Some(result))
        } else {
            Ok(None)
        }
    }

    /// Returns the [`PendingState`] of the execution.
    async fn get_pending_state(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<PendingState, DbErrorRead>;

    /// Returns expired timers (locks and delays).
    // NOTE(review): presumably those expired at or before `at` — confirm.
    async fn get_expired_timers(
        &self,
        at: DateTime<Utc>,
    ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric>;

    /// Creates a new execution from `req`.
    async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite>;

    /// Returns join set responses of the execution, or a timeout error.
    // NOTE(review): presumably waits until responses at index `start_idx` or
    // later exist, or `timeout_fut` completes — confirm.
    async fn subscribe_to_next_responses(
        &self,
        execution_id: &ExecutionId,
        start_idx: usize,
        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
    ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorReadWithTimeout>;

    /// Returns the finished result of the execution, or a timeout error.
    // NOTE(review): presumably waits without limit when `timeout_fut` is `None` — confirm.
    async fn wait_for_finished_result(
        &self,
        execution_id: &ExecutionId,
        timeout_fut: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
    ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout>;

    /// Stores one backtrace.
    async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite>;

    /// Stores several backtraces.
    async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite>;

    /// Fetches a backtrace of the execution selected by `filter`.
    async fn get_backtrace(
        &self,
        execution_id: &ExecutionId,
        filter: BacktraceFilter,
    ) -> Result<BacktraceInfo, DbErrorRead>;

    /// Lists executions with their state.
    // NOTE(review): presumably `ffqn` and `top_level_only` act as filters — confirm.
    async fn list_executions(
        &self,
        ffqn: Option<FunctionFqn>,
        top_level_only: bool,
        pagination: ExecutionListPagination,
    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric>;

    /// Lists join set responses of the execution together with their cursors.
    async fn list_responses(
        &self,
        execution_id: &ExecutionId,
        pagination: Pagination<u32>,
    ) -> Result<Vec<ResponseWithCursor>, DbErrorRead>;
}
864
/// Selects which backtrace `DbConnection::get_backtrace` returns.
#[derive(Clone)]
pub enum BacktraceFilter {
    First,
    Last,
    /// Selected by a specific version.
    Specific(Version),
}
871
/// A WASM backtrace tied to an execution, a component and a version range.
pub struct BacktraceInfo {
    pub execution_id: ExecutionId,
    pub component_id: ComponentId,
    /// Lower bound of the version range (inclusive, per the field name).
    pub version_min_including: Version,
    /// Upper bound of the version range (exclusive, per the field name).
    pub version_max_excluding: Version,
    pub wasm_backtrace: WasmBacktrace,
}
879
/// Serializable backtrace: a list of frames.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct WasmBacktrace {
    pub frames: Vec<FrameInfo>,
}
884
/// One frame of a [`WasmBacktrace`].
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct FrameInfo {
    /// Module name.
    pub module: String,
    /// Function name.
    pub func_name: String,
    /// Symbols attached to the frame.
    pub symbols: Vec<FrameSymbol>,
}
891
/// Symbol information of a frame; every piece is optional.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct FrameSymbol {
    pub func_name: Option<String>,
    pub file: Option<String>,
    pub line: Option<u32>,
    pub col: Option<u32>,
}
899
900mod wasm_backtrace {
901 use super::{FrameInfo, FrameSymbol, WasmBacktrace};
902
903 impl WasmBacktrace {
904 pub fn maybe_from(backtrace: &wasmtime::WasmBacktrace) -> Option<Self> {
905 if backtrace.frames().is_empty() {
906 None
907 } else {
908 Some(Self {
909 frames: backtrace.frames().iter().map(FrameInfo::from).collect(),
910 })
911 }
912 }
913 }
914
915 impl From<&wasmtime::FrameInfo> for FrameInfo {
916 fn from(frame: &wasmtime::FrameInfo) -> Self {
917 let module_name = frame.module().name().unwrap_or("<unknown>").to_string();
918 let mut func_name = String::new();
919 wasmtime_environ::demangle_function_name_or_index(
920 &mut func_name,
921 frame.func_name(),
922 frame.func_index() as usize,
923 )
924 .expect("writing to string must succeed");
925 Self {
926 module: module_name,
927 func_name,
928 symbols: frame
929 .symbols()
930 .iter()
931 .map(std::convert::Into::into)
932 .collect(),
933 }
934 }
935 }
936
937 impl From<&wasmtime::FrameSymbol> for FrameSymbol {
938 fn from(symbol: &wasmtime::FrameSymbol) -> Self {
939 let func_name = symbol.name().map(|name| {
940 let mut writer = String::new();
941 wasmtime_environ::demangle_function_name(&mut writer, name)
942 .expect("writing to string must succeed");
943 writer
944 });
945
946 Self {
947 func_name,
948 file: symbol.file().map(ToString::to_string),
949 line: symbol.line(),
950 col: symbol.column(),
951 }
952 }
953 }
954}
955
/// Integer type used for response cursors and for pagination lengths.
pub type ResponseCursorType = u32;
/// A join set response together with its cursor value.
#[derive(Debug, Clone)]
pub struct ResponseWithCursor {
    pub event: JoinSetResponseEventOuter,
    pub cursor: ResponseCursorType,
}
963
/// Summary row returned by `DbConnection::list_executions`.
#[derive(Debug)]
pub struct ExecutionWithState {
    pub execution_id: ExecutionId,
    pub ffqn: FunctionFqn,
    pub pending_state: PendingState,
    pub created_at: DateTime<Utc>,
    pub scheduled_at: DateTime<Utc>,
}
972
/// Pagination for `DbConnection::list_executions`, with either a timestamp
/// cursor or an execution id cursor. The cursor itself is optional.
pub enum ExecutionListPagination {
    CreatedBy(Pagination<Option<DateTime<Utc>>>),
    ExecutionId(Pagination<Option<ExecutionId>>),
}
977
/// Cursor-based pagination in one of two directions.
///
/// `rel` maps the direction and `including_cursor` to a comparison operator;
/// `is_desc` is `true` for `OlderThan`.
#[derive(Debug, Clone, Copy)]
pub enum Pagination<T> {
    /// Items newer than (or, with `including_cursor`, starting at) the cursor.
    NewerThan {
        length: ResponseCursorType,
        cursor: T,
        including_cursor: bool,
    },
    /// Items older than (or, with `including_cursor`, starting at) the cursor.
    OlderThan {
        length: ResponseCursorType,
        cursor: T,
        including_cursor: bool,
    },
}
991impl<T> Pagination<T> {
992 pub fn length(&self) -> ResponseCursorType {
993 match self {
994 Pagination::NewerThan { length, .. } | Pagination::OlderThan { length, .. } => *length,
995 }
996 }
997 pub fn rel(&self) -> &'static str {
998 match self {
999 Pagination::NewerThan {
1000 including_cursor: false,
1001 ..
1002 } => ">",
1003 Pagination::NewerThan {
1004 including_cursor: true,
1005 ..
1006 } => ">=",
1007 Pagination::OlderThan {
1008 including_cursor: false,
1009 ..
1010 } => "<",
1011 Pagination::OlderThan {
1012 including_cursor: true,
1013 ..
1014 } => "<=",
1015 }
1016 }
1017 pub fn is_desc(&self) -> bool {
1018 matches!(self, Pagination::OlderThan { .. })
1019 }
1020}
1021
/// Polls the execution log every 10 ms until `predicate` returns `Some`, and
/// returns the contained value. Available only with the `test` feature.
///
/// # Errors
/// Returns `DbErrorReadWithTimeout::Timeout` when `timeout` is set and
/// elapses first; read errors of `DbConnection::get` are propagated.
#[cfg(feature = "test")]
pub async fn wait_for_pending_state_fn<T: Debug>(
    db_connection: &dyn DbConnection,
    execution_id: &ExecutionId,
    predicate: impl Fn(ExecutionLog) -> Option<T> + Send,
    timeout: Option<Duration>,
) -> Result<T, DbErrorReadWithTimeout> {
    tracing::trace!(%execution_id, "Waiting for predicate");
    let poll_until_match = async move {
        loop {
            let snapshot = db_connection.get(execution_id).await?;
            match predicate(snapshot) {
                Some(t) => {
                    tracing::debug!(%execution_id, "Found: {t:?}");
                    return Ok(t);
                }
                None => tokio::time::sleep(Duration::from_millis(10)).await,
            }
        }
    };

    match timeout {
        None => poll_until_match.await,
        // Race the polling loop against the timeout.
        Some(limit) => tokio::select! {
            res = poll_until_match => res,
            () = tokio::time::sleep(limit) => Err(DbErrorReadWithTimeout::Timeout)
        },
    }
}
1050
/// Item returned by `DbConnection::get_expired_timers`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExpiredTimer {
    /// An expired lock.
    Lock(ExpiredLock),
    /// An expired delay.
    Delay(ExpiredDelay),
}
1056
/// Details of an expired lock, carried by `ExpiredTimer::Lock`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExpiredLock {
    pub execution_id: ExecutionId,
    // NOTE(review): presumably the version of the `Locked` event — confirm.
    pub locked_at_version: Version,
    pub next_version: Version,
    // NOTE(review): presumably the number of temporary failure/timeout events so far — confirm.
    pub intermittent_event_count: u32,
    pub max_retries: u32,
    pub retry_exp_backoff: Duration,
    pub parent: Option<(ExecutionId, JoinSetId)>,
}
1069
/// Details of an expired delay, carried by `ExpiredTimer::Delay`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExpiredDelay {
    pub execution_id: ExecutionId,
    pub join_set_id: JoinSetId,
    pub delay_id: DelayId,
}
1076
/// State of an execution as used by `PendingState::can_append_lock`.
/// Serialized with an internal `type` tag.
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum PendingState {
    /// Locked by `executor_id` / `run_id` until `lock_expires_at`.
    #[display("Locked(`{lock_expires_at}`, {executor_id}, {run_id})")]
    Locked {
        executor_id: ExecutorId,
        run_id: RunId,
        lock_expires_at: DateTime<Utc>,
    },
    /// Pending from `scheduled_at`; a lock may be created once that time has
    /// been reached.
    #[display("PendingAt(`{scheduled_at}`)")]
    PendingAt { scheduled_at: DateTime<Utc> },
    /// Blocked by a join set; locking is refused in this state.
    #[display("BlockedByJoinSet({join_set_id},`{lock_expires_at}`)")]
    BlockedByJoinSet {
        join_set_id: JoinSetId,
        lock_expires_at: DateTime<Utc>,
        closing: bool,
    },
    /// Finished; locking is refused in this state.
    #[display("Finished({finished})")]
    Finished { finished: PendingStateFinished },
}
1100
/// Details of a finished execution. `Display` prints only `result_kind`.
#[derive(Debug, Clone, Copy, derive_more::Display, PartialEq, Eq, Serialize, Deserialize)]
#[display("{result_kind}")]
pub struct PendingStateFinished {
    /// Version used by `DbConnection::get_finished_result` to load the final event.
    pub version: VersionType,
    pub finished_at: DateTime<Utc>,
    pub result_kind: PendingStateFinishedResultKind,
}
1108
/// Outcome category of a finished execution: `Ok(())` or a
/// [`PendingStateFinishedError`].
///
/// Serialized and deserialized as a string through the `Display` and
/// `FromStr` implementations.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, serde_with::SerializeDisplay, serde_with::DeserializeFromStr,
)]
pub struct PendingStateFinishedResultKind(pub Result<(), PendingStateFinishedError>);
/// String form of the successful `PendingStateFinishedResultKind`.
const OK_VARIANT: &str = "ok";
1114impl FromStr for PendingStateFinishedResultKind {
1115 type Err = strum::ParseError;
1116
1117 fn from_str(s: &str) -> Result<Self, Self::Err> {
1118 if s == OK_VARIANT {
1119 Ok(PendingStateFinishedResultKind(Ok(())))
1120 } else {
1121 let err = PendingStateFinishedError::from_str(s)?;
1122 Ok(PendingStateFinishedResultKind(Err(err)))
1123 }
1124 }
1125}
1126
1127impl Display for PendingStateFinishedResultKind {
1128 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1129 match self.0 {
1130 Ok(()) => write!(f, "{OK_VARIANT}"),
1131 Err(err) => write!(f, "{err}"),
1132 }
1133 }
1134}
1135
/// Derives the result kind from a return value by delegating to
/// `as_pending_state_finished_result`.
impl From<&SupportedFunctionReturnValue> for PendingStateFinishedResultKind {
    fn from(result: &SupportedFunctionReturnValue) -> Self {
        result.as_pending_state_finished_result()
    }
}
1141
/// Error categories of a finished execution.
///
/// Parsed and displayed in `snake_case` (`timeout`, `execution_failure`,
/// `fallible_error`) via the `strum` derives.
#[derive(Debug, Clone, Copy, PartialEq, Eq, strum::EnumString, strum::Display)]
#[cfg_attr(test, derive(strum::VariantArray))]
#[strum(serialize_all = "snake_case")]
pub enum PendingStateFinishedError {
    Timeout,
    ExecutionFailure,
    FallibleError,
}
1150
1151impl PendingState {
1152 pub fn can_append_lock(
1153 &self,
1154 created_at: DateTime<Utc>,
1155 executor_id: ExecutorId,
1156 run_id: RunId,
1157 lock_expires_at: DateTime<Utc>,
1158 ) -> Result<LockKind, DbErrorWritePermanent> {
1159 if lock_expires_at <= created_at {
1160 return Err(DbErrorWritePermanent::ValidationFailed(
1161 "invalid expiry date".into(),
1162 ));
1163 }
1164 match self {
1165 PendingState::PendingAt { scheduled_at } => {
1166 if *scheduled_at <= created_at {
1167 Ok(LockKind::CreatingNewLock)
1169 } else {
1170 Err(DbErrorWritePermanent::ValidationFailed(
1171 "cannot lock, not yet pending".into(),
1172 ))
1173 }
1174 }
1175 PendingState::Locked {
1176 executor_id: current_pending_state_executor_id,
1177 run_id: current_pending_state_run_id,
1178 ..
1179 } => {
1180 if executor_id == *current_pending_state_executor_id
1181 && run_id == *current_pending_state_run_id
1182 {
1183 Ok(LockKind::Extending)
1185 } else {
1186 Err(DbErrorWritePermanent::CannotWrite {
1187 reason: "cannot lock, already locked".into(),
1188 expected_version: None,
1189 })
1190 }
1191 }
1192 PendingState::BlockedByJoinSet { .. } => Err(DbErrorWritePermanent::CannotWrite {
1193 reason: "cannot append Locked event when in BlockedByJoinSet state".into(),
1194 expected_version: None,
1195 }),
1196 PendingState::Finished { .. } => Err(DbErrorWritePermanent::CannotWrite {
1197 reason: "already finished".into(),
1198 expected_version: None,
1199 }),
1200 }
1201 }
1202
1203 #[must_use]
1204 pub fn is_finished(&self) -> bool {
1205 matches!(self, PendingState::Finished { .. })
1206 }
1207}
1208
/// Successful outcome of `PendingState::can_append_lock`, telling the caller
/// what kind of lock operation the current state permits.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LockKind {
    /// The state is already `Locked` by the same executor id and run id.
    Extending,
    /// The state is `PendingAt` and its `scheduled_at` is not later than the
    /// lock's `created_at`.
    CreatingNewLock,
}
1214
/// Serializable records describing an HTTP client request and, optionally, its response.
pub mod http_client_trace {
    use chrono::{DateTime, Utc};
    use serde::{Deserialize, Serialize};

    /// A request trace paired with an optional response trace.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    pub struct HttpClientTrace {
        /// The traced request.
        pub req: RequestTrace,
        /// The traced response, if any.
        // NOTE(review): presumably `None` when no response was recorded (e.g. still in flight) — confirm.
        pub resp: Option<ResponseTrace>,
    }

    /// Request part of an [`HttpClientTrace`].
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    pub struct RequestTrace {
        /// UTC timestamp associated with sending the request.
        pub sent_at: DateTime<Utc>,
        /// Request URI, stored as a plain string.
        pub uri: String,
        /// Request method, stored as a plain string.
        pub method: String,
    }

    /// Response part of an [`HttpClientTrace`].
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    pub struct ResponseTrace {
        /// UTC timestamp associated with the end of the exchange.
        pub finished_at: DateTime<Utc>,
        // NOTE(review): presumably `Ok` carries the HTTP status code and `Err` an error
        // description — confirm against the code that fills this in.
        pub status: Result<u16, String>,
    }
}
1238
#[cfg(test)]
mod tests {
    use super::{
        HistoryEventScheduleAt, PendingStateFinished, PendingStateFinishedError,
        PendingStateFinishedResultKind,
    };
    use crate::SupportedFunctionReturnValue;
    use chrono::{DateTime, Datelike, Utc};
    use insta::assert_snapshot;
    use rstest::rstest;
    use std::time::Duration;
    use strum::VariantArray as _;
    use val_json::type_wrapper::TypeWrapper;
    use val_json::wast_val::{WastVal, WastValWithType};

    const MILLIS_PER_SEC: i64 = 1000;
    // Largest whole number of seconds used as the "still representable" bound below.
    const TIMEDELTA_MAX_SECS: i64 = i64::MAX / MILLIS_PER_SEC;

    // A result kind must come back unchanged from a JSON round trip.
    #[rstest(expected => [
        PendingStateFinishedResultKind(Result::Ok(())),
        PendingStateFinishedResultKind(Result::Err(PendingStateFinishedError::Timeout)),
    ])]
    #[test]
    fn serde_pending_state_finished_result_kind_should_work(
        expected: PendingStateFinishedResultKind,
    ) {
        let encoded = serde_json::to_string(&expected).unwrap();
        let actual = serde_json::from_str::<PendingStateFinishedResultKind>(&encoded).unwrap();
        assert_eq!(expected, actual);
    }

    // The whole finished state must come back unchanged from a JSON round trip.
    #[rstest(result_kind => [
        PendingStateFinishedResultKind(Result::Ok(())),
        PendingStateFinishedResultKind(Result::Err(PendingStateFinishedError::Timeout)),
    ])]
    #[test]
    fn serde_pending_state_finished_should_work(result_kind: PendingStateFinishedResultKind) {
        let finished_at = Utc::now();
        let expected = PendingStateFinished {
            version: 0,
            finished_at,
            result_kind,
        };

        let encoded = serde_json::to_string(&expected).unwrap();
        let actual = serde_json::from_str::<PendingStateFinished>(&encoded).unwrap();
        assert_eq!(expected, actual);
    }

    // `result<option<string>, string>` holding `ok(none)`: snapshot the JSON and round-trip it.
    #[test]
    fn join_set_deser_with_result_ok_option_none_should_work() {
        let result_type = TypeWrapper::Result {
            ok: Some(Box::new(TypeWrapper::Option(Box::new(TypeWrapper::String)))),
            err: Some(Box::new(TypeWrapper::String)),
        };
        let result_value = WastVal::Result(Ok(Some(Box::new(WastVal::Option(None)))));
        let expected = SupportedFunctionReturnValue::Ok {
            ok: Some(WastValWithType {
                r#type: result_type,
                value: result_value,
            }),
        };

        let json = serde_json::to_string(&expected).unwrap();
        assert_snapshot!(json);

        let actual = serde_json::from_str::<SupportedFunctionReturnValue>(&json).unwrap();
        assert_eq!(expected, actual);
    }

    // Snapshot the serialized form of every error variant followed by the `Ok` case.
    #[test]
    fn verify_pending_state_finished_result_kind_serde() {
        let mut variants: Vec<PendingStateFinishedResultKind> = PendingStateFinishedError::VARIANTS
            .iter()
            .copied()
            .map(|err| PendingStateFinishedResultKind(Err(err)))
            .collect();
        // The success case goes last, after all error variants.
        variants.push(PendingStateFinishedResultKind(Ok(())));

        let ser = serde_json::to_string_pretty(&variants).unwrap();
        assert_snapshot!(ser);

        let deser = serde_json::from_str::<Vec<PendingStateFinishedResultKind>>(&ser).unwrap();
        assert_eq!(variants, deser);
    }

    // `u32::MAX` seconds past the Unix epoch resolves to a date in the year 2106.
    #[test]
    fn as_date_time_should_work_with_duration_u32_max_secs() {
        let secs = u64::from(u32::MAX);
        let resolved = HistoryEventScheduleAt::In(Duration::from_secs(secs))
            .as_date_time(DateTime::UNIX_EPOCH)
            .unwrap();
        assert_eq!(2106, resolved.year());
    }

    // One second beyond `TIMEDELTA_MAX_SECS` must be rejected.
    #[test]
    fn as_date_time_should_fail_on_duration_secs_greater_than_i64_max() {
        let max_secs = u64::try_from(TIMEDELTA_MAX_SECS).expect("positive number must not fail");
        let schedule_at = HistoryEventScheduleAt::In(Duration::from_secs(max_secs + 1));
        schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap_err();
    }
}