1use crate::ClosingStrategy;
2use crate::ComponentId;
3use crate::ComponentRetryConfig;
4use crate::ExecutionId;
5use crate::ExecutionMetadata;
6use crate::FunctionFqn;
7use crate::JoinSetId;
8use crate::Params;
9use crate::StrVariant;
10use crate::SupportedFunctionReturnValue;
11use crate::prefixed_ulid::DelayId;
12use crate::prefixed_ulid::ExecutionIdDerived;
13use crate::prefixed_ulid::ExecutorId;
14use crate::prefixed_ulid::RunId;
15use assert_matches::assert_matches;
16use async_trait::async_trait;
17use chrono::TimeDelta;
18use chrono::{DateTime, Utc};
19use http_client_trace::HttpClientTrace;
20use serde::Deserialize;
21use serde::Serialize;
22use std::fmt::Debug;
23use std::fmt::Display;
24use std::pin::Pin;
25use std::str::FromStr;
26use std::sync::Arc;
27use std::time::Duration;
28use strum::IntoStaticStr;
29use tracing::error;
30
/// In-memory view of a single execution: its identifier, its events, the
/// join set responses recorded for it, and bookkeeping state.
#[derive(Debug, PartialEq, Eq)]
#[cfg_attr(feature = "test", derive(Serialize))]
pub struct ExecutionLog {
    /// Identifier of the execution this log belongs to.
    pub execution_id: ExecutionId,
    /// Events of the execution. The accessors `ffqn` and `parent` assert that
    /// the first element is `ExecutionEventInner::Created`.
    pub events: Vec<ExecutionEvent>,
    /// Join set responses recorded for this execution.
    pub responses: Vec<JoinSetResponseEventOuter>,
    /// NOTE(review): presumably the version the next appended event must carry — confirm.
    pub next_version: Version,
    /// Pending state associated with this log.
    pub pending_state: PendingState,
}
41
42impl ExecutionLog {
43 #[must_use]
44 pub fn can_be_retried_after(
45 temporary_event_count: u32,
46 max_retries: u32,
47 retry_exp_backoff: Duration,
48 ) -> Option<Duration> {
49 if temporary_event_count <= max_retries {
51 let duration = retry_exp_backoff * 2_u32.saturating_pow(temporary_event_count - 1);
53 Some(duration)
54 } else {
55 None
56 }
57 }
58
59 #[must_use]
60 pub fn compute_retry_duration_when_retrying_forever(
61 temporary_event_count: u32,
62 retry_exp_backoff: Duration,
63 ) -> Duration {
64 Self::can_be_retried_after(temporary_event_count, u32::MAX, retry_exp_backoff)
65 .expect("`max_retries` set to MAX must never return None")
66 }
67
68 #[must_use]
69 pub fn ffqn(&self) -> &FunctionFqn {
70 assert_matches!(self.events.first(), Some(ExecutionEvent {
71 event: ExecutionEventInner::Created { ffqn, .. },
72 ..
73 }) => ffqn)
74 }
75
76 #[must_use]
77 pub fn parent(&self) -> Option<(ExecutionId, JoinSetId)> {
78 assert_matches!(self.events.first(), Some(ExecutionEvent {
79 event: ExecutionEventInner::Created { parent, .. },
80 ..
81 }) => parent.clone())
82 }
83
84 #[must_use]
85 pub fn last_event(&self) -> &ExecutionEvent {
86 self.events.last().expect("must contain at least one event")
87 }
88
89 #[must_use]
90 pub fn into_finished_result(mut self) -> Option<SupportedFunctionReturnValue> {
91 if let ExecutionEvent {
92 event: ExecutionEventInner::Finished { result, .. },
93 ..
94 } = self.events.pop().expect("must contain at least one event")
95 {
96 Some(result)
97 } else {
98 None
99 }
100 }
101
102 pub fn event_history(&self) -> impl Iterator<Item = HistoryEvent> + '_ {
103 self.events.iter().filter_map(|event| {
104 if let ExecutionEventInner::HistoryEvent { event: eh, .. } = &event.event {
105 Some(eh.clone())
106 } else {
107 None
108 }
109 })
110 }
111
112 #[cfg(feature = "test")]
113 #[must_use]
114 pub fn find_join_set_request(&self, join_set_id: &JoinSetId) -> Option<&JoinSetRequest> {
115 self.events
116 .iter()
117 .find_map(move |event| match &event.event {
118 ExecutionEventInner::HistoryEvent {
119 event:
120 HistoryEvent::JoinSetRequest {
121 join_set_id: found,
122 request,
123 },
124 ..
125 } if *join_set_id == *found => Some(request),
126 _ => None,
127 })
128 }
129}
130
/// Raw integer type wrapped by [`Version`].
pub type VersionType = u32;
/// Newtype around [`VersionType`].
///
/// Serialized transparently, i.e. as the bare inner integer.
#[derive(
    Debug,
    Default,
    Clone,
    PartialEq,
    Eq,
    Hash,
    derive_more::Display,
    derive_more::Into,
    serde::Serialize,
    serde::Deserialize,
)]
#[serde(transparent)]
pub struct Version(pub VersionType);
146impl Version {
147 #[must_use]
148 pub fn new(arg: VersionType) -> Version {
149 Version(arg)
150 }
151
152 #[must_use]
153 pub fn increment(&self) -> Version {
154 Version(self.0 + 1)
155 }
156}
157
/// An event together with its creation timestamp and an optional backtrace reference.
///
/// `Display` delegates to the inner `event`.
#[derive(
    Clone, Debug, derive_more::Display, PartialEq, Eq, serde::Serialize, serde::Deserialize,
)]
#[display("{event}")]
pub struct ExecutionEvent {
    /// Timestamp attached to the event.
    pub created_at: DateTime<Utc>,
    /// The event payload.
    pub event: ExecutionEventInner,
    /// NOTE(review): presumably the version under which a backtrace for this
    /// event is stored, if any — confirm against the DB implementation.
    pub backtrace_id: Option<Version>,
}
167
/// A [`JoinSetResponseEvent`] together with its creation timestamp.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct JoinSetResponseEventOuter {
    /// Timestamp attached to the response.
    pub created_at: DateTime<Utc>,
    /// The response and the join set it belongs to.
    pub event: JoinSetResponseEvent,
}
174
/// A [`JoinSetResponse`] paired with the id of the join set it targets.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct JoinSetResponseEvent {
    /// Join set the response belongs to.
    pub join_set_id: JoinSetId,
    /// The response itself.
    pub event: JoinSetResponse,
}
180
/// Response delivered to a join set.
///
/// Serialized as an internally tagged enum using the `type` field.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type")]
pub enum JoinSetResponse {
    /// The delay identified by `delay_id` has finished.
    DelayFinished {
        delay_id: DelayId,
    },
    /// The child execution identified by `child_execution_id` has finished.
    ChildExecutionFinished {
        child_execution_id: ExecutionIdDerived,
        /// NOTE(review): presumably the version of the child's `Finished` event — confirm.
        /// Generated as the fixed value `Version(2)` by `Arbitrary`.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Version(2)))]
        finished_version: Version,
        /// Return value of the child execution.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
        result: SupportedFunctionReturnValue,
    },
}
196
/// Placeholder `Created` event built only from empty / dummy values
/// (empty function name, empty params, no parent, timestamp `0`).
pub const DUMMY_CREATED: ExecutionEventInner = ExecutionEventInner::Created {
    ffqn: FunctionFqn::new_static("", ""),
    params: Params::empty(),
    parent: None,
    scheduled_at: DateTime::from_timestamp_nanos(0),
    component_id: ComponentId::dummy_activity(),
    metadata: ExecutionMetadata::empty(),
    scheduled_by: None,
};
/// Placeholder `HistoryEvent` wrapping a `JoinSetCreate` for a one-off join
/// set with an empty name and the `Complete` closing strategy.
pub const DUMMY_HISTORY_EVENT: ExecutionEventInner = ExecutionEventInner::HistoryEvent {
    event: HistoryEvent::JoinSetCreate {
        join_set_id: JoinSetId {
            kind: crate::JoinSetKind::OneOff,
            name: StrVariant::empty(),
        },
        closing_strategy: ClosingStrategy::Complete,
    },
};
215
/// Payload of an [`ExecutionEvent`].
///
/// The `IntoStaticStr` derive backs `ExecutionEventInner::variant`.
#[derive(
    Clone,
    derive_more::Debug,
    derive_more::Display,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    IntoStaticStr,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[allow(clippy::large_enum_variant)]
pub enum ExecutionEventInner {
    /// Creation of the execution. `ExecutionLog::ffqn` / `ExecutionLog::parent`
    /// and `DbConnection::get_create_request` expect this to be the first event.
    #[display("Created({ffqn}, `{scheduled_at}`)")]
    Created {
        /// Function to be executed.
        ffqn: FunctionFqn,
        /// Function parameters; omitted from the `Debug` output.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
        #[debug(skip)]
        params: Params,
        /// Parent execution and the join set through which it was requested, if any.
        parent: Option<(ExecutionId, JoinSetId)>,
        /// Time the execution is scheduled for.
        scheduled_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
        component_id: ComponentId,
        #[cfg_attr(any(test, feature = "test"), arbitrary(default))]
        metadata: ExecutionMetadata,
        /// NOTE(review): presumably the execution that scheduled this one — confirm.
        scheduled_by: Option<ExecutionId>,
    },
    /// The execution was locked; see [`Locked`].
    Locked(Locked),
    /// The execution was unlocked.
    #[display("Unlocked(`{backoff_expires_at}`)")]
    Unlocked {
        backoff_expires_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
        reason: StrVariant,
    },
    /// Temporary failure; counted by `is_temporary_event`.
    #[display("TemporarilyFailed(`{backoff_expires_at}`)")]
    TemporarilyFailed {
        backoff_expires_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
        reason_full: StrVariant,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason inner")))]
        reason_inner: StrVariant,
        detail: Option<String>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },
    /// Temporary timeout; counted by `is_temporary_event`.
    #[display("TemporarilyTimedOut(`{backoff_expires_at}`)")]
    TemporarilyTimedOut {
        backoff_expires_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },
    /// The execution finished with `result`.
    #[display("Finished")]
    Finished {
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
        result: SupportedFunctionReturnValue,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },

    /// Wrapper around a [`HistoryEvent`].
    #[display("HistoryEvent({event})")]
    HistoryEvent {
        event: HistoryEvent,
    },
}
289
290impl ExecutionEventInner {
291 #[must_use]
292 pub fn is_temporary_event(&self) -> bool {
293 matches!(
294 self,
295 Self::TemporarilyFailed { .. } | Self::TemporarilyTimedOut { .. }
296 )
297 }
298
299 #[must_use]
300 pub fn variant(&self) -> &'static str {
301 Into::<&'static str>::into(self)
302 }
303
304 #[must_use]
305 pub fn join_set_id(&self) -> Option<&JoinSetId> {
306 match self {
307 Self::Created {
308 parent: Some((_parent_id, join_set_id)),
309 ..
310 } => Some(join_set_id),
311 Self::HistoryEvent {
312 event:
313 HistoryEvent::JoinSetCreate { join_set_id, .. }
314 | HistoryEvent::JoinSetRequest { join_set_id, .. }
315 | HistoryEvent::JoinNext { join_set_id, .. },
316 } => Some(join_set_id),
317 _ => None,
318 }
319 }
320}
321
/// Payload of the `Locked` execution event.
///
/// Displays as ``Locked(`<lock_expires_at>`, <component_id>)``.
#[derive(
    Clone, derive_more::Debug, derive_more::Display, PartialEq, Eq, Serialize, Deserialize,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[display("Locked(`{lock_expires_at}`, {component_id})")]
pub struct Locked {
    #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
    pub component_id: ComponentId,
    /// Executor holding the lock.
    pub executor_id: ExecutorId,
    /// Run holding the lock.
    pub run_id: RunId,
    /// Time at which the lock expires.
    pub lock_expires_at: DateTime<Utc>,
    #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentRetryConfig::ZERO))]
    pub retry_config: ComponentRetryConfig,
}
336
/// Kind of value recorded by `HistoryEvent::Persist`.
///
/// Serialized as an internally tagged enum using the `type` field.
#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type")]
pub enum PersistKind {
    /// Random `u64`; the field names indicate an inclusive upper bound.
    #[display("RandomU64({min}, {max_inclusive})")]
    RandomU64 { min: u64, max_inclusive: u64 },
    /// Random string; the field names indicate an exclusive upper length bound.
    #[display("RandomString({min_length}, {max_length_exclusive})")]
    RandomString {
        min_length: u64,
        max_length_exclusive: u64,
    },
}
349
/// Encodes `value` as 8 bytes in big-endian order.
#[must_use]
pub fn from_u64_to_bytes(value: u64) -> [u8; 8] {
    u64::to_be_bytes(value)
}
354
/// Decodes 8 big-endian bytes into a `u64`.
#[must_use]
pub fn from_bytes_to_u64(bytes: [u8; 8]) -> u64 {
    <u64>::from_be_bytes(bytes)
}
359
/// Event stored inside `ExecutionEventInner::HistoryEvent`.
///
/// Serialized as an internally tagged enum using the `type` field.
#[derive(
    derive_more::Debug, Clone, PartialEq, Eq, derive_more::Display, Serialize, Deserialize,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type")]
pub enum HistoryEvent {
    /// A persisted value of the given [`PersistKind`].
    #[display("Persist")]
    Persist {
        /// Raw bytes of the value; omitted from the `Debug` output.
        #[debug(skip)]
        value: Vec<u8>,
        kind: PersistKind,
    },
    /// Creation of a join set.
    #[display("JoinSetCreate({join_set_id})")]
    JoinSetCreate {
        join_set_id: JoinSetId,
        closing_strategy: ClosingStrategy,
    },
    /// A request submitted to a join set.
    #[display("JoinSetRequest({request})")]
    JoinSetRequest {
        join_set_id: JoinSetId,
        request: JoinSetRequest,
    },
    /// NOTE(review): presumably a wait for the next response of the join set — confirm.
    #[display("JoinNext({join_set_id})")]
    JoinNext {
        join_set_id: JoinSetId,
        run_expires_at: DateTime<Utc>,
        requested_ffqn: Option<FunctionFqn>,
        closing: bool,
    },
    /// NOTE(review): presumably a `JoinNext` issued when no further response
    /// can arrive — confirm. Not reported by `ExecutionEventInner::join_set_id`.
    #[display("JoinNextTooMany({join_set_id})")]
    JoinNextTooMany {
        join_set_id: JoinSetId,
        requested_ffqn: Option<FunctionFqn>,
    },
    /// Scheduling of another execution.
    #[display("Schedule({execution_id}, {schedule_at})")]
    Schedule {
        execution_id: ExecutionId,
        schedule_at: HistoryEventScheduleAt,
    },
    /// Stubbing of `target_execution_id` with `result`.
    #[display("Stub({target_execution_id})")]
    Stub {
        target_execution_id: ExecutionIdDerived,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
        result: SupportedFunctionReturnValue,
        /// NOTE(review): presumably whether persisting the stubbed result succeeded — confirm.
        persist_result: Result<(), ()>,
    },
}
423
/// Point in time used when scheduling; resolved to a concrete timestamp by
/// `HistoryEventScheduleAt::as_date_time`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
pub enum HistoryEventScheduleAt {
    /// Resolves to the `now` passed to `as_date_time`.
    Now,
    /// Resolves to the contained absolute timestamp.
    #[display("At(`{_0}`)")]
    At(DateTime<Utc>),
    /// Resolves to `now` plus the contained duration.
    #[display("In({_0:?})")]
    In(Duration),
}
433
/// Error returned by `HistoryEventScheduleAt::as_date_time`.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum ScheduleAtConversionError {
    /// The duration could not be converted to a `TimeDelta`, or adding it to
    /// `now` was out of range.
    #[error("source duration value is out of range")]
    OutOfRangeError,
}
439
440impl HistoryEventScheduleAt {
441 pub fn as_date_time(
442 &self,
443 now: DateTime<Utc>,
444 ) -> Result<DateTime<Utc>, ScheduleAtConversionError> {
445 match self {
446 Self::Now => Ok(now),
447 Self::At(date_time) => Ok(*date_time),
448 Self::In(duration) => {
449 let time_delta = TimeDelta::from_std(*duration)
450 .map_err(|_| ScheduleAtConversionError::OutOfRangeError)?;
451 now.checked_add_signed(time_delta)
452 .ok_or(ScheduleAtConversionError::OutOfRangeError)
453 }
454 }
455 }
456}
457
/// Request submitted to a join set.
///
/// Serialized as an internally tagged enum using the `type` field.
#[derive(Clone, Debug, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type")]
pub enum JoinSetRequest {
    /// Request for a delay.
    #[display("DelayRequest({delay_id}, expires_at: `{expires_at}`, schedule_at: `{schedule_at}`)")]
    DelayRequest {
        delay_id: DelayId,
        expires_at: DateTime<Utc>,
        schedule_at: HistoryEventScheduleAt,
    },
    /// Request for a child execution of `target_ffqn` with `params`.
    #[display("ChildExecutionRequest({child_execution_id}, {target_ffqn}, params: {params})")]
    ChildExecutionRequest {
        child_execution_id: ExecutionIdDerived,
        target_ffqn: FunctionFqn,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
        params: Params,
    },
}
478
/// Database error shared by the read and write error types.
#[derive(Debug, Clone, thiserror::Error, PartialEq)]
pub enum DbErrorGeneric {
    /// Error that fits no other category; carries a message.
    #[error("database error: {0}")]
    Uncategorized(StrVariant),
    /// The database was closed.
    #[error("database was closed")]
    Close,
}
487
/// Write errors classified as non-retriable.
#[derive(thiserror::Error, Clone, Debug, PartialEq, Eq)]
pub enum DbErrorWriteNonRetriable {
    /// The request failed validation.
    #[error("validation failed: {0}")]
    ValidationFailed(StrVariant),
    /// The operation is not allowed in the current state.
    #[error("illegal state: {0}")]
    IllegalState(StrVariant),
    /// Version conflict; carries a [`Version`].
    /// NOTE(review): whether this is the expected or the actual version is not visible here.
    #[error("version conflict: {0}")]
    VersionConflict(Version),
}
497
/// Error returned by write operations.
#[derive(Debug, Clone, thiserror::Error, PartialEq)]
pub enum DbErrorWrite {
    /// The targeted row does not exist.
    #[error("cannot write - row not found")]
    NotFound,
    /// Non-retriable failure; convertible from [`DbErrorWriteNonRetriable`].
    #[error("non-retriable error: {0}")]
    NonRetriable(#[from] DbErrorWriteNonRetriable),
    /// Generic database failure; displayed transparently.
    #[error(transparent)]
    Generic(#[from] DbErrorGeneric),
}
508
/// Error returned by read operations.
#[derive(Debug, Clone, thiserror::Error, PartialEq)]
pub enum DbErrorRead {
    /// The requested row does not exist.
    #[error("cannot read - row not found")]
    NotFound,
    /// Generic database failure; displayed transparently.
    #[error(transparent)]
    Generic(#[from] DbErrorGeneric),
}
517
518impl From<DbErrorRead> for DbErrorWrite {
519 fn from(value: DbErrorRead) -> DbErrorWrite {
520 match value {
521 DbErrorRead::NotFound => DbErrorWrite::NotFound,
522 DbErrorRead::Generic(err) => DbErrorWrite::Generic(err),
523 }
524 }
525}
526
/// Read error extended with a timeout variant.
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum DbErrorReadWithTimeout {
    /// The operation timed out.
    #[error("timeout")]
    Timeout,
    /// Underlying read error; displayed transparently.
    #[error(transparent)]
    DbErrorRead(#[from] DbErrorRead),
}
534impl From<DbErrorGeneric> for DbErrorReadWithTimeout {
535 fn from(value: DbErrorGeneric) -> DbErrorReadWithTimeout {
536 Self::from(DbErrorRead::from(value))
537 }
538}
539
/// Version returned by append operations.
/// NOTE(review): presumably the next version after the appended event — confirm.
pub type AppendResponse = Version;
/// Tuple of execution id, version, params and an optional timestamp.
/// NOTE(review): the meaning of the timestamp is not visible in this file.
pub type PendingExecution = (ExecutionId, Version, Params, Option<DateTime<Utc>>);
544
/// Data returned by `DbExecutor::lock_pending` / `DbExecutor::lock_one` for a
/// locked execution.
#[derive(Debug, Clone)]
pub struct LockedExecution {
    pub execution_id: ExecutionId,
    pub next_version: Version,
    pub metadata: ExecutionMetadata,
    /// The `Locked` event payload.
    pub locked_event: Locked,
    pub ffqn: FunctionFqn,
    pub params: Params,
    /// History events of the execution.
    pub event_history: Vec<HistoryEvent>,
    /// Join set responses of the execution.
    pub responses: Vec<JoinSetResponseEventOuter>,
    /// Parent execution and join set, if any.
    pub parent: Option<(ExecutionId, JoinSetId)>,
    /// NOTE(review): presumably the number of temporary failure / timeout events so far — confirm.
    pub intermittent_event_count: u32,
}
558
/// Return value of `DbExecutor::lock_pending`.
pub type LockPendingResponse = Vec<LockedExecution>;
/// Version returned by the batch append operations.
pub type AppendBatchResponse = Version;
561
/// An event to be appended together with its creation timestamp.
///
/// `Display` delegates to the inner `event`.
#[derive(Debug, Clone, derive_more::Display, Serialize, Deserialize)]
#[display("{event}")]
pub struct AppendRequest {
    pub created_at: DateTime<Utc>,
    pub event: ExecutionEventInner,
}
568
/// Request to create an execution.
///
/// Converts into `ExecutionEventInner::Created`; that conversion drops
/// `created_at` and `execution_id`, which have no counterpart in the event payload.
#[derive(Debug, Clone)]
pub struct CreateRequest {
    pub created_at: DateTime<Utc>,
    pub execution_id: ExecutionId,
    pub ffqn: FunctionFqn,
    pub params: Params,
    /// Parent execution and join set, if any.
    pub parent: Option<(ExecutionId, JoinSetId)>,
    pub scheduled_at: DateTime<Utc>,
    pub component_id: ComponentId,
    pub metadata: ExecutionMetadata,
    pub scheduled_by: Option<ExecutionId>,
}
581
582impl From<CreateRequest> for ExecutionEventInner {
583 fn from(value: CreateRequest) -> Self {
584 Self::Created {
585 ffqn: value.ffqn,
586 params: value.params,
587 parent: value.parent,
588 scheduled_at: value.scheduled_at,
589 component_id: value.component_id,
590 metadata: value.metadata,
591 scheduled_by: value.scheduled_by,
592 }
593 }
594}
595
/// Factory of [`DbConnection`] trait objects.
#[async_trait]
pub trait DbPool: Send + Sync {
    /// Returns a boxed connection.
    fn connection(&self) -> Box<dyn DbConnection>;
}
600
/// A pool that can be closed; closing consumes the pool.
#[async_trait]
pub trait DbPoolCloseable {
    /// Closes the pool.
    async fn close(self);
}
605
/// Batch of events to append to one execution at a given version.
#[derive(Clone, Debug)]
pub struct AppendEventsToExecution {
    pub execution_id: ExecutionId,
    pub version: Version,
    pub batch: Vec<AppendRequest>,
}
612
/// Join set response to append to a parent execution.
#[derive(Clone, Debug)]
pub struct AppendResponseToExecution {
    pub parent_execution_id: ExecutionId,
    pub parent_response_event: JoinSetResponseEventOuter,
}
618
/// Storage operations for locking executions and appending events to them.
///
/// NOTE(review): the precise contract of each method is defined by the
/// implementations, which are not visible in this file; the comments below
/// restate what the signatures show and mark everything else as an assumption.
#[async_trait]
pub trait DbExecutor: Send + Sync {
    /// Locks pending executions and returns them as [`LockedExecution`]s.
    ///
    /// NOTE(review): presumably returns at most `batch_size` executions of
    /// `ffqns` that are pending at or before `pending_at_or_sooner` — confirm.
    #[expect(clippy::too_many_arguments)]
    async fn lock_pending(
        &self,
        batch_size: usize,
        pending_at_or_sooner: DateTime<Utc>,
        ffqns: Arc<[FunctionFqn]>,
        created_at: DateTime<Utc>,
        component_id: ComponentId,
        executor_id: ExecutorId,
        lock_expires_at: DateTime<Utc>,
        run_id: RunId,
        retry_config: ComponentRetryConfig,
    ) -> Result<LockPendingResponse, DbErrorGeneric>;

    /// Locks the single execution `execution_id` at `version`.
    #[expect(clippy::too_many_arguments)]
    async fn lock_one(
        &self,
        created_at: DateTime<Utc>,
        component_id: ComponentId,
        execution_id: &ExecutionId,
        run_id: RunId,
        version: Version,
        executor_id: ExecutorId,
        lock_expires_at: DateTime<Utc>,
        retry_config: ComponentRetryConfig,
    ) -> Result<LockedExecution, DbErrorWrite>;

    /// Appends one event to `execution_id` at `version`.
    async fn append(
        &self,
        execution_id: ExecutionId,
        version: Version,
        req: AppendRequest,
    ) -> Result<AppendResponse, DbErrorWrite>;

    /// Appends a batch of events to an execution and a response to its parent.
    ///
    /// NOTE(review): presumably both writes happen atomically — confirm.
    async fn append_batch_respond_to_parent(
        &self,
        events: AppendEventsToExecution,
        response: AppendResponseToExecution,
        current_time: DateTime<Utc>,
    ) -> Result<AppendBatchResponse, DbErrorWrite>;

    /// Waits for a pending execution or for `timeout_fut`.
    ///
    /// NOTE(review): presumably returns when an execution of `ffqns` becomes
    /// pending at or before `pending_at_or_sooner`, or when `timeout_fut`
    /// completes, whichever happens first — confirm.
    async fn wait_for_pending(
        &self,
        pending_at_or_sooner: DateTime<Utc>,
        ffqns: Arc<[FunctionFqn]>,
        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
    );
}
678
679#[async_trait]
680pub trait DbConnection: DbExecutor {
681 async fn append_response(
682 &self,
683 created_at: DateTime<Utc>,
684 execution_id: ExecutionId,
685 response_event: JoinSetResponseEvent,
686 ) -> Result<(), DbErrorWrite>;
687
688 async fn append_batch(
691 &self,
692 current_time: DateTime<Utc>, batch: Vec<AppendRequest>,
694 execution_id: ExecutionId,
695 version: Version,
696 ) -> Result<AppendBatchResponse, DbErrorWrite>;
697
698 async fn append_batch_create_new_execution(
701 &self,
702 current_time: DateTime<Utc>, batch: Vec<AppendRequest>, execution_id: ExecutionId,
705 version: Version,
706 child_req: Vec<CreateRequest>,
707 ) -> Result<AppendBatchResponse, DbErrorWrite>;
708
709 #[cfg(feature = "test")]
710 async fn get(&self, execution_id: &ExecutionId) -> Result<ExecutionLog, DbErrorRead>;
712
713 async fn list_execution_events(
714 &self,
715 execution_id: &ExecutionId,
716 since: &Version,
717 max_length: VersionType,
718 include_backtrace_id: bool,
719 ) -> Result<Vec<ExecutionEvent>, DbErrorRead>;
720
721 async fn get_execution_event(
723 &self,
724 execution_id: &ExecutionId,
725 version: &Version,
726 ) -> Result<ExecutionEvent, DbErrorRead>;
727
728 async fn get_create_request(
729 &self,
730 execution_id: &ExecutionId,
731 ) -> Result<CreateRequest, DbErrorRead> {
732 let execution_event = self
733 .get_execution_event(execution_id, &Version::new(0))
734 .await?;
735 if let ExecutionEventInner::Created {
736 ffqn,
737 params,
738 parent,
739 scheduled_at,
740 component_id,
741 metadata,
742 scheduled_by,
743 } = execution_event.event
744 {
745 Ok(CreateRequest {
746 created_at: execution_event.created_at,
747 execution_id: execution_id.clone(),
748 ffqn,
749 params,
750 parent,
751 scheduled_at,
752 component_id,
753 metadata,
754 scheduled_by,
755 })
756 } else {
757 error!(%execution_id, "Execution log must start with creation");
758 Err(DbErrorRead::Generic(DbErrorGeneric::Uncategorized(
759 "execution log must start with creation".into(),
760 )))
761 }
762 }
763
764 async fn get_finished_result(
765 &self,
766 execution_id: &ExecutionId,
767 finished: PendingStateFinished,
768 ) -> Result<Option<SupportedFunctionReturnValue>, DbErrorRead> {
769 let last_event = self
770 .get_execution_event(execution_id, &Version::new(finished.version))
771 .await?;
772 if let ExecutionEventInner::Finished { result, .. } = last_event.event {
773 Ok(Some(result))
774 } else {
775 Ok(None)
776 }
777 }
778
779 async fn get_pending_state(
780 &self,
781 execution_id: &ExecutionId,
782 ) -> Result<PendingState, DbErrorRead>;
783
784 async fn get_expired_timers(
786 &self,
787 at: DateTime<Utc>,
788 ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric>;
789
790 async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite>;
792
793 async fn subscribe_to_next_responses(
798 &self,
799 execution_id: &ExecutionId,
800 start_idx: usize,
801 timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
802 ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorReadWithTimeout>;
803
804 async fn wait_for_finished_result(
807 &self,
808 execution_id: &ExecutionId,
809 timeout_fut: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
810 ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout>;
811
812 async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite>;
813
814 async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite>;
815
816 async fn get_backtrace(
819 &self,
820 execution_id: &ExecutionId,
821 filter: BacktraceFilter,
822 ) -> Result<BacktraceInfo, DbErrorRead>;
823
824 async fn list_executions(
827 &self,
828 ffqn: Option<FunctionFqn>,
829 top_level_only: bool,
830 pagination: ExecutionListPagination,
831 ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric>;
832
833 async fn list_responses(
837 &self,
838 execution_id: &ExecutionId,
839 pagination: Pagination<u32>,
840 ) -> Result<Vec<ResponseWithCursor>, DbErrorRead>;
841}
842
/// Selects which backtrace `DbConnection::get_backtrace` returns.
#[derive(Clone)]
pub enum BacktraceFilter {
    /// The first backtrace.
    First,
    /// The last backtrace.
    Last,
    /// The backtrace for a specific version.
    Specific(Version),
}
849
/// A WASM backtrace attached to a version range of an execution.
pub struct BacktraceInfo {
    pub execution_id: ExecutionId,
    pub component_id: ComponentId,
    /// Lower bound of the version range (inclusive, per the field name).
    pub version_min_including: Version,
    /// Upper bound of the version range (exclusive, per the field name).
    pub version_max_excluding: Version,
    pub wasm_backtrace: WasmBacktrace,
}
857
/// Serializable list of backtrace frames; built from a
/// `wasmtime::WasmBacktrace` via `WasmBacktrace::maybe_from`.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct WasmBacktrace {
    pub frames: Vec<FrameInfo>,
}
862
/// One frame of a [`WasmBacktrace`].
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct FrameInfo {
    /// Module name, or `"<unknown>"` when the module has no name.
    pub module: String,
    /// Demangled function name (or index-based name).
    pub func_name: String,
    /// Symbols associated with the frame.
    pub symbols: Vec<FrameSymbol>,
}
869
/// Symbol information of a [`FrameInfo`]; every part is optional.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct FrameSymbol {
    /// Demangled function name.
    pub func_name: Option<String>,
    /// Source file.
    pub file: Option<String>,
    /// Source line.
    pub line: Option<u32>,
    /// Source column.
    pub col: Option<u32>,
}
877
878mod wasm_backtrace {
879 use super::{FrameInfo, FrameSymbol, WasmBacktrace};
880
881 impl WasmBacktrace {
882 pub fn maybe_from(backtrace: &wasmtime::WasmBacktrace) -> Option<Self> {
883 if backtrace.frames().is_empty() {
884 None
885 } else {
886 Some(Self {
887 frames: backtrace.frames().iter().map(FrameInfo::from).collect(),
888 })
889 }
890 }
891 }
892
893 impl From<&wasmtime::FrameInfo> for FrameInfo {
894 fn from(frame: &wasmtime::FrameInfo) -> Self {
895 let module_name = frame.module().name().unwrap_or("<unknown>").to_string();
896 let mut func_name = String::new();
897 wasmtime_environ::demangle_function_name_or_index(
898 &mut func_name,
899 frame.func_name(),
900 frame.func_index() as usize,
901 )
902 .expect("writing to string must succeed");
903 Self {
904 module: module_name,
905 func_name,
906 symbols: frame
907 .symbols()
908 .iter()
909 .map(std::convert::Into::into)
910 .collect(),
911 }
912 }
913 }
914
915 impl From<&wasmtime::FrameSymbol> for FrameSymbol {
916 fn from(symbol: &wasmtime::FrameSymbol) -> Self {
917 let func_name = symbol.name().map(|name| {
918 let mut writer = String::new();
919 wasmtime_environ::demangle_function_name(&mut writer, name)
920 .expect("writing to string must succeed");
921 writer
922 });
923
924 Self {
925 func_name,
926 file: symbol.file().map(ToString::to_string),
927 line: symbol.line(),
928 col: symbol.column(),
929 }
930 }
931 }
932}
933
/// Integer type used for response cursors and for pagination lengths.
pub type ResponseCursorType = u32;

/// A join set response paired with its cursor.
#[derive(Debug, Clone)]
pub struct ResponseWithCursor {
    pub event: JoinSetResponseEventOuter,
    pub cursor: ResponseCursorType,
}
941
/// Item returned by `DbConnection::list_executions`.
#[derive(Debug)]
pub struct ExecutionWithState {
    pub execution_id: ExecutionId,
    pub ffqn: FunctionFqn,
    pub pending_state: PendingState,
    pub created_at: DateTime<Utc>,
    pub scheduled_at: DateTime<Utc>,
}
950
/// Pagination of `DbConnection::list_executions`, keyed either by a
/// timestamp or by an execution id; the cursor is optional in both cases.
pub enum ExecutionListPagination {
    /// Paginate using a timestamp cursor.
    CreatedBy(Pagination<Option<DateTime<Utc>>>),
    /// Paginate using an execution id cursor.
    ExecutionId(Pagination<Option<ExecutionId>>),
}
955
/// Direction, page length and cursor of a paginated query.
#[derive(Debug, Clone, Copy)]
pub enum Pagination<T> {
    /// Items newer than the cursor; `rel` maps this to `>` / `>=`.
    NewerThan {
        /// Page length.
        length: ResponseCursorType,
        /// Cursor the page is relative to.
        cursor: T,
        /// Whether the item at the cursor itself is included.
        including_cursor: bool,
    },
    /// Items older than the cursor; `rel` maps this to `<` / `<=`.
    OlderThan {
        /// Page length.
        length: ResponseCursorType,
        /// Cursor the page is relative to.
        cursor: T,
        /// Whether the item at the cursor itself is included.
        including_cursor: bool,
    },
}
969impl<T> Pagination<T> {
970 pub fn length(&self) -> ResponseCursorType {
971 match self {
972 Pagination::NewerThan { length, .. } | Pagination::OlderThan { length, .. } => *length,
973 }
974 }
975 pub fn rel(&self) -> &'static str {
976 match self {
977 Pagination::NewerThan {
978 including_cursor: false,
979 ..
980 } => ">",
981 Pagination::NewerThan {
982 including_cursor: true,
983 ..
984 } => ">=",
985 Pagination::OlderThan {
986 including_cursor: false,
987 ..
988 } => "<",
989 Pagination::OlderThan {
990 including_cursor: true,
991 ..
992 } => "<=",
993 }
994 }
995 pub fn is_desc(&self) -> bool {
996 matches!(self, Pagination::OlderThan { .. })
997 }
998}
999
/// Polls the execution log every 10 ms until `predicate` returns `Some`.
///
/// Returns the value produced by `predicate`.
///
/// # Errors
///
/// Propagates read errors from `DbConnection::get`; returns
/// `DbErrorReadWithTimeout::Timeout` when `timeout` is set and elapses first.
#[cfg(feature = "test")]
pub async fn wait_for_pending_state_fn<T: Debug>(
    db_connection: &dyn DbConnection,
    execution_id: &ExecutionId,
    predicate: impl Fn(ExecutionLog) -> Option<T> + Send,
    timeout: Option<Duration>,
) -> Result<T, DbErrorReadWithTimeout> {
    tracing::trace!(%execution_id, "Waiting for predicate");
    let poll_until_found = async move {
        loop {
            let execution_log = db_connection.get(execution_id).await?;
            match predicate(execution_log) {
                Some(t) => {
                    tracing::debug!(%execution_id, "Found: {t:?}");
                    return Ok(t);
                }
                None => tokio::time::sleep(Duration::from_millis(10)).await,
            }
        }
    };

    match timeout {
        // Race the polling loop against the timeout.
        Some(timeout) => tokio::select! {
            res = poll_until_found => res,
            () = tokio::time::sleep(timeout) => Err(DbErrorReadWithTimeout::Timeout)
        },
        None => poll_until_found.await,
    }
}
1028
/// Item returned by `DbConnection::get_expired_timers`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExpiredTimer {
    /// An expired lock.
    Lock(ExpiredLock),
    /// An expired delay.
    Delay(ExpiredDelay),
}
1034
/// Details of an expired lock.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExpiredLock {
    pub execution_id: ExecutionId,
    /// Version at which the execution was locked.
    pub locked_at_version: Version,
    pub next_version: Version,
    /// NOTE(review): presumably the number of temporary failure / timeout events so far — confirm.
    pub intermittent_event_count: u32,
    pub max_retries: u32,
    pub retry_exp_backoff: Duration,
    /// Executor and run that held the lock.
    pub locked_by: LockedBy,
}
1047
/// Details of an expired delay.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExpiredDelay {
    pub execution_id: ExecutionId,
    pub join_set_id: JoinSetId,
    pub delay_id: DelayId,
}
1054
/// State of an execution as used by `PendingState::can_append_lock`.
///
/// With the `test` feature it is serializable as an internally tagged enum
/// using the `type` field.
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq)]
#[cfg_attr(feature = "test", derive(Serialize))]
#[cfg_attr(feature = "test", serde(tag = "type"))]
pub enum PendingState {
    /// Currently locked.
    Locked(PendingStateLocked),
    /// Pending at `scheduled_at`.
    #[display("PendingAt(`{scheduled_at}`)")]
    PendingAt {
        scheduled_at: DateTime<Utc>,
        /// Previous lock holder, if any; `can_append_lock` lets that holder
        /// extend its lock before `scheduled_at`.
        last_lock: Option<LockedBy>,
    },
    /// Blocked by a join set.
    #[display("BlockedByJoinSet({join_set_id},`{lock_expires_at}`)")]
    BlockedByJoinSet {
        join_set_id: JoinSetId,
        lock_expires_at: DateTime<Utc>,
        closing: bool,
    },
    /// Finished.
    #[display("Finished({finished})")]
    Finished {
        finished: PendingStateFinished,
    },
}
1079
/// Payload of `PendingState::Locked`.
///
/// Displays as ``Locked(`<lock_expires_at>`, <executor_id>, <run_id>)``.
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq)]
#[cfg_attr(feature = "test", derive(Serialize))]
#[display("Locked(`{lock_expires_at}`, {}, {})", locked_by.executor_id, locked_by.run_id)]
pub struct PendingStateLocked {
    /// Executor and run holding the lock.
    pub locked_by: LockedBy,
    /// Time at which the lock expires.
    pub lock_expires_at: DateTime<Utc>,
}
1087
/// Identifies a lock holder by executor id and run id.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LockedBy {
    pub executor_id: ExecutorId,
    pub run_id: RunId,
}
1093
/// Payload of `PendingState::Finished`.
///
/// `Display` delegates to `result_kind`.
#[derive(Debug, Clone, Copy, derive_more::Display, PartialEq, Eq, Serialize, Deserialize)]
#[display("{result_kind}")]
pub struct PendingStateFinished {
    /// Version of the event read by `DbConnection::get_finished_result`.
    pub version: VersionType,
    pub finished_at: DateTime<Utc>,
    pub result_kind: PendingStateFinishedResultKind,
}
1101
/// Outcome of a finished execution: `Ok(())` or a [`PendingStateFinishedError`].
///
/// Serialized through its `Display` implementation and deserialized through
/// its `FromStr` implementation.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, serde_with::SerializeDisplay, serde_with::DeserializeFromStr,
)]
pub struct PendingStateFinishedResultKind(pub Result<(), PendingStateFinishedError>);
/// Textual form of the `Ok(())` outcome of [`PendingStateFinishedResultKind`].
const OK_VARIANT: &str = "ok";
1107impl FromStr for PendingStateFinishedResultKind {
1108 type Err = strum::ParseError;
1109
1110 fn from_str(s: &str) -> Result<Self, Self::Err> {
1111 if s == OK_VARIANT {
1112 Ok(PendingStateFinishedResultKind(Ok(())))
1113 } else {
1114 let err = PendingStateFinishedError::from_str(s)?;
1115 Ok(PendingStateFinishedResultKind(Err(err)))
1116 }
1117 }
1118}
1119
1120impl Display for PendingStateFinishedResultKind {
1121 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1122 match self.0 {
1123 Ok(()) => write!(f, "{OK_VARIANT}"),
1124 Err(err) => write!(f, "{err}"),
1125 }
1126 }
1127}
1128
/// Derives the result kind from a return value by delegating to
/// `SupportedFunctionReturnValue::as_pending_state_finished_result`.
impl From<&SupportedFunctionReturnValue> for PendingStateFinishedResultKind {
    fn from(result: &SupportedFunctionReturnValue) -> Self {
        result.as_pending_state_finished_result()
    }
}
1134
/// Error kinds of a finished execution.
///
/// Parsed and displayed in `snake_case` via `strum`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, strum::EnumString, strum::Display)]
#[cfg_attr(test, derive(strum::VariantArray))]
#[strum(serialize_all = "snake_case")]
pub enum PendingStateFinishedError {
    Timeout,
    ExecutionFailure,
    FallibleError,
}
1143
1144impl PendingState {
1145 pub fn can_append_lock(
1146 &self,
1147 created_at: DateTime<Utc>,
1148 executor_id: ExecutorId,
1149 run_id: RunId,
1150 lock_expires_at: DateTime<Utc>,
1151 ) -> Result<LockKind, DbErrorWriteNonRetriable> {
1152 if lock_expires_at <= created_at {
1153 return Err(DbErrorWriteNonRetriable::ValidationFailed(
1154 "invalid expiry date".into(),
1155 ));
1156 }
1157 match self {
1158 PendingState::PendingAt {
1159 scheduled_at,
1160 last_lock,
1161 } => {
1162 if *scheduled_at <= created_at {
1163 Ok(LockKind::CreatingNewLock)
1165 } else if let Some(LockedBy {
1166 executor_id: last_executor_id,
1167 run_id: last_run_id,
1168 }) = last_lock
1169 && executor_id == *last_executor_id
1170 && run_id == *last_run_id
1171 {
1172 Ok(LockKind::Extending)
1174 } else {
1175 Err(DbErrorWriteNonRetriable::ValidationFailed(
1176 "cannot lock, not yet pending".into(),
1177 ))
1178 }
1179 }
1180 PendingState::Locked(PendingStateLocked {
1181 locked_by:
1182 LockedBy {
1183 executor_id: current_pending_state_executor_id,
1184 run_id: current_pending_state_run_id,
1185 },
1186 lock_expires_at: _,
1187 }) => {
1188 if executor_id == *current_pending_state_executor_id
1189 && run_id == *current_pending_state_run_id
1190 {
1191 Ok(LockKind::Extending)
1193 } else {
1194 Err(DbErrorWriteNonRetriable::IllegalState(
1195 "cannot lock, already locked".into(),
1196 ))
1197 }
1198 }
1199 PendingState::BlockedByJoinSet { .. } => Err(DbErrorWriteNonRetriable::IllegalState(
1200 "cannot append Locked event when in BlockedByJoinSet state".into(),
1201 )),
1202 PendingState::Finished { .. } => Err(DbErrorWriteNonRetriable::IllegalState(
1203 "already finished".into(),
1204 )),
1205 }
1206 }
1207
1208 #[must_use]
1209 pub fn is_finished(&self) -> bool {
1210 matches!(self, PendingState::Finished { .. })
1211 }
1212}
1213
/// Kind of lock operation permitted by a successful `PendingState::can_append_lock` check.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LockKind {
    /// The requesting executor run already holds the lock, or is the last recorded
    /// lock holder of a pending execution that is scheduled after the lock's creation time.
    Extending,
    /// The execution is pending and its scheduled time has been reached, so a new lock is created.
    CreatingNewLock,
}
1219
/// Serializable data types describing a traced HTTP client request and its
/// optional response.
pub mod http_client_trace {
    use chrono::{DateTime, Utc};
    use serde::{Deserialize, Serialize};

    /// A traced request paired with its response, if one has been recorded.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    pub struct HttpClientTrace {
        /// The request part of the trace.
        pub req: RequestTrace,
        /// The response part of the trace; `None` when no response is recorded.
        pub resp: Option<ResponseTrace>,
    }

    /// Request half of an [`HttpClientTrace`].
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    pub struct RequestTrace {
        /// NOTE(review): presumably the instant the request was sent — confirm with the producer.
        pub sent_at: DateTime<Utc>,
        /// Request URI, stored as a plain string.
        pub uri: String,
        /// Request method, stored as a plain string (presumably the HTTP verb — confirm).
        pub method: String,
    }

    /// Response half of an [`HttpClientTrace`].
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    pub struct ResponseTrace {
        /// NOTE(review): presumably the instant the response (or failure) was observed — confirm.
        pub finished_at: DateTime<Utc>,
        /// `Ok` carries a `u16` (presumably the HTTP status code), `Err` carries a
        /// `String` (presumably an error description) — confirm with the producer.
        pub status: Result<u16, String>,
    }
}
1243
#[cfg(test)]
mod tests {
    use super::HistoryEventScheduleAt;
    use super::PendingStateFinished;
    use super::PendingStateFinishedError;
    use super::PendingStateFinishedResultKind;
    use crate::SupportedFunctionReturnValue;
    use chrono::DateTime;
    use chrono::Datelike;
    use chrono::Utc;
    use insta::assert_snapshot;
    use rstest::rstest;
    use std::time::Duration;
    use strum::VariantArray as _;
    use val_json::type_wrapper::TypeWrapper;
    use val_json::wast_val::WastVal;
    use val_json::wast_val::WastValWithType;

    /// Milliseconds in one second.
    const MILLIS_PER_SEC: i64 = 1000;
    /// `i64::MAX` milliseconds expressed in whole seconds; one second more is
    /// expected to be rejected by `as_date_time`.
    const TIMEDELTA_MAX_SECS: i64 = i64::MAX / MILLIS_PER_SEC;

    /// A result kind must survive a JSON round trip unchanged.
    #[rstest]
    #[test]
    fn serde_pending_state_finished_result_kind_should_work(
        #[values(
            PendingStateFinishedResultKind(Ok(())),
            PendingStateFinishedResultKind(Err(PendingStateFinishedError::Timeout))
        )]
        expected: PendingStateFinishedResultKind,
    ) {
        let encoded = serde_json::to_string(&expected).unwrap();
        let decoded = serde_json::from_str::<PendingStateFinishedResultKind>(&encoded).unwrap();
        assert_eq!(expected, decoded);
    }

    /// A whole `PendingStateFinished` must survive a JSON round trip unchanged.
    #[rstest]
    #[test]
    fn serde_pending_state_finished_should_work(
        #[values(
            PendingStateFinishedResultKind(Ok(())),
            PendingStateFinishedResultKind(Err(PendingStateFinishedError::Timeout))
        )]
        result_kind: PendingStateFinishedResultKind,
    ) {
        let expected = PendingStateFinished {
            version: 0,
            finished_at: Utc::now(),
            result_kind,
        };

        let encoded = serde_json::to_string(&expected).unwrap();
        let decoded = serde_json::from_str::<PendingStateFinished>(&encoded).unwrap();
        assert_eq!(expected, decoded);
    }

    /// `result<option<string>, string>` holding `ok(none)` serializes to the stored
    /// snapshot and deserializes back to the same value.
    #[test]
    fn join_set_deser_with_result_ok_option_none_should_work() {
        let result_type = TypeWrapper::Result {
            ok: Some(Box::new(TypeWrapper::Option(Box::new(TypeWrapper::String)))),
            err: Some(Box::new(TypeWrapper::String)),
        };
        let result_value = WastVal::Result(Ok(Some(Box::new(WastVal::Option(None)))));
        let expected = SupportedFunctionReturnValue::Ok {
            ok: Some(WastValWithType {
                r#type: result_type,
                value: result_value,
            }),
        };

        let json = serde_json::to_string(&expected).unwrap();
        assert_snapshot!(json);

        let actual = serde_json::from_str::<SupportedFunctionReturnValue>(&json).unwrap();
        assert_eq!(expected, actual);
    }

    /// Every error variant followed by the `Ok` variant serializes to the stored
    /// snapshot and round-trips through JSON.
    #[test]
    fn verify_pending_state_finished_result_kind_serde() {
        let variants: Vec<_> = PendingStateFinishedError::VARIANTS
            .iter()
            .copied()
            .map(|err| PendingStateFinishedResultKind(Err(err)))
            .chain([PendingStateFinishedResultKind(Ok(()))])
            .collect();

        let ser = serde_json::to_string_pretty(&variants).unwrap();
        assert_snapshot!(ser);

        let deser = serde_json::from_str::<Vec<PendingStateFinishedResultKind>>(&ser).unwrap();
        assert_eq!(variants, deser);
    }

    /// `u32::MAX` seconds after the Unix epoch resolves to a date in the year 2106.
    #[test]
    fn as_date_time_should_work_with_duration_u32_max_secs() {
        let u32_max_secs = Duration::from_secs(u64::from(u32::MAX));
        let resolved = HistoryEventScheduleAt::In(u32_max_secs)
            .as_date_time(DateTime::UNIX_EPOCH)
            .unwrap();
        assert_eq!(2106, resolved.year());
    }

    /// One second beyond `TIMEDELTA_MAX_SECS` must be rejected.
    #[test]
    fn as_date_time_should_fail_on_duration_secs_greater_than_i64_max() {
        let max_secs =
            u64::try_from(TIMEDELTA_MAX_SECS).expect("positive number must not fail");
        let too_long = Duration::from_secs(max_secs + 1);
        HistoryEventScheduleAt::In(too_long)
            .as_date_time(DateTime::UNIX_EPOCH)
            .unwrap_err();
    }
}