1use crate::ClosingStrategy;
2use crate::ComponentId;
3use crate::ComponentRetryConfig;
4use crate::ExecutionId;
5use crate::ExecutionMetadata;
6use crate::FunctionFqn;
7use crate::JoinSetId;
8use crate::Params;
9use crate::StrVariant;
10use crate::SupportedFunctionReturnValue;
11use crate::prefixed_ulid::DelayId;
12use crate::prefixed_ulid::ExecutionIdDerived;
13use crate::prefixed_ulid::ExecutorId;
14use crate::prefixed_ulid::RunId;
15use assert_matches::assert_matches;
16use async_trait::async_trait;
17use chrono::TimeDelta;
18use chrono::{DateTime, Utc};
19use http_client_trace::HttpClientTrace;
20use serde::Deserialize;
21use serde::Serialize;
22use std::fmt::Debug;
23use std::fmt::Display;
24use std::pin::Pin;
25use std::str::FromStr;
26use std::sync::Arc;
27use std::time::Duration;
28use strum::IntoStaticStr;
29use tracing::error;
30
31#[derive(Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
33pub struct ExecutionLog {
34 pub execution_id: ExecutionId,
35 pub events: Vec<ExecutionEvent>,
36 pub responses: Vec<JoinSetResponseEventOuter>,
37 pub next_version: Version, pub pending_state: PendingState,
39}
40
41impl ExecutionLog {
42 #[must_use]
43 pub fn can_be_retried_after(
44 temporary_event_count: u32,
45 max_retries: u32,
46 retry_exp_backoff: Duration,
47 ) -> Option<Duration> {
48 if temporary_event_count <= max_retries {
50 let duration = retry_exp_backoff * 2_u32.saturating_pow(temporary_event_count - 1);
52 Some(duration)
53 } else {
54 None
55 }
56 }
57
58 #[must_use]
59 pub fn compute_retry_duration_when_retrying_forever(
60 temporary_event_count: u32,
61 retry_exp_backoff: Duration,
62 ) -> Duration {
63 Self::can_be_retried_after(temporary_event_count, u32::MAX, retry_exp_backoff)
64 .expect("`max_retries` set to MAX must never return None")
65 }
66
67 #[must_use]
68 pub fn ffqn(&self) -> &FunctionFqn {
69 assert_matches!(self.events.first(), Some(ExecutionEvent {
70 event: ExecutionEventInner::Created { ffqn, .. },
71 ..
72 }) => ffqn)
73 }
74
75 #[must_use]
76 pub fn parent(&self) -> Option<(ExecutionId, JoinSetId)> {
77 assert_matches!(self.events.first(), Some(ExecutionEvent {
78 event: ExecutionEventInner::Created { parent, .. },
79 ..
80 }) => parent.clone())
81 }
82
83 #[must_use]
84 pub fn last_event(&self) -> &ExecutionEvent {
85 self.events.last().expect("must contain at least one event")
86 }
87
88 #[must_use]
89 pub fn into_finished_result(mut self) -> Option<SupportedFunctionReturnValue> {
90 if let ExecutionEvent {
91 event: ExecutionEventInner::Finished { result, .. },
92 ..
93 } = self.events.pop().expect("must contain at least one event")
94 {
95 Some(result)
96 } else {
97 None
98 }
99 }
100
101 pub fn event_history(&self) -> impl Iterator<Item = HistoryEvent> + '_ {
102 self.events.iter().filter_map(|event| {
103 if let ExecutionEventInner::HistoryEvent { event: eh, .. } = &event.event {
104 Some(eh.clone())
105 } else {
106 None
107 }
108 })
109 }
110
111 #[cfg(feature = "test")]
112 #[must_use]
113 pub fn find_join_set_request(&self, join_set_id: &JoinSetId) -> Option<&JoinSetRequest> {
114 self.events
115 .iter()
116 .find_map(move |event| match &event.event {
117 ExecutionEventInner::HistoryEvent {
118 event:
119 HistoryEvent::JoinSetRequest {
120 join_set_id: found,
121 request,
122 },
123 ..
124 } if *join_set_id == *found => Some(request),
125 _ => None,
126 })
127 }
128}
129
130pub type VersionType = u32;
131#[derive(
132 Debug,
133 Default,
134 Clone,
135 PartialEq,
136 Eq,
137 Hash,
138 derive_more::Display,
139 derive_more::Into,
140 serde::Serialize,
141 serde::Deserialize,
142)]
143#[serde(transparent)]
144pub struct Version(pub VersionType);
145impl Version {
146 #[must_use]
147 pub fn new(arg: VersionType) -> Version {
148 Version(arg)
149 }
150
151 #[must_use]
152 pub fn increment(&self) -> Version {
153 Version(self.0 + 1)
154 }
155}
156
157#[derive(
158 Clone, Debug, derive_more::Display, PartialEq, Eq, serde::Serialize, serde::Deserialize,
159)]
160#[display("{event}")]
161pub struct ExecutionEvent {
162 pub created_at: DateTime<Utc>,
163 pub event: ExecutionEventInner,
164 pub backtrace_id: Option<Version>,
165}
166
167#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
169pub struct JoinSetResponseEventOuter {
170 pub created_at: DateTime<Utc>,
171 pub event: JoinSetResponseEvent,
172}
173
174#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
175pub struct JoinSetResponseEvent {
176 pub join_set_id: JoinSetId,
177 pub event: JoinSetResponse,
178}
179
180#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
181#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
182#[serde(tag = "type")]
183pub enum JoinSetResponse {
184 DelayFinished {
185 delay_id: DelayId,
186 },
187 ChildExecutionFinished {
188 child_execution_id: ExecutionIdDerived,
189 #[cfg_attr(any(test, feature = "test"), arbitrary(value = Version(2)))]
190 finished_version: Version,
191 #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
192 result: SupportedFunctionReturnValue,
193 },
194}
195
196pub const DUMMY_CREATED: ExecutionEventInner = ExecutionEventInner::Created {
197 ffqn: FunctionFqn::new_static("", ""),
198 params: Params::empty(),
199 parent: None,
200 scheduled_at: DateTime::from_timestamp_nanos(0),
201 component_id: ComponentId::dummy_activity(),
202 metadata: ExecutionMetadata::empty(),
203 scheduled_by: None,
204};
205pub const DUMMY_HISTORY_EVENT: ExecutionEventInner = ExecutionEventInner::HistoryEvent {
206 event: HistoryEvent::JoinSetCreate {
207 join_set_id: JoinSetId {
208 kind: crate::JoinSetKind::OneOff,
209 name: StrVariant::empty(),
210 },
211 closing_strategy: ClosingStrategy::Complete,
212 },
213};
214
215#[derive(
216 Clone,
217 derive_more::Debug,
218 derive_more::Display,
219 PartialEq,
220 Eq,
221 Serialize,
222 Deserialize,
223 IntoStaticStr,
224)]
225#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
226#[allow(clippy::large_enum_variant)]
227pub enum ExecutionEventInner {
228 #[display("Created({ffqn}, `{scheduled_at}`)")]
229 Created {
230 ffqn: FunctionFqn,
231 #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
232 #[debug(skip)]
233 params: Params,
234 parent: Option<(ExecutionId, JoinSetId)>,
235 scheduled_at: DateTime<Utc>,
236 #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
237 component_id: ComponentId,
238 #[cfg_attr(any(test, feature = "test"), arbitrary(default))]
239 metadata: ExecutionMetadata,
240 scheduled_by: Option<ExecutionId>,
241 },
242 #[display("Locked(`{lock_expires_at}`, {component_id})")]
243 Locked {
244 #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
245 component_id: ComponentId,
246 executor_id: ExecutorId,
247 run_id: RunId,
248 lock_expires_at: DateTime<Utc>,
249 #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentRetryConfig::ZERO))]
250 retry_config: ComponentRetryConfig,
251 },
252 #[display("Unlocked(`{backoff_expires_at}`)")]
257 Unlocked {
258 backoff_expires_at: DateTime<Utc>,
259 #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
260 reason: StrVariant,
261 },
262 #[display("TemporarilyFailed(`{backoff_expires_at}`)")]
265 TemporarilyFailed {
266 backoff_expires_at: DateTime<Utc>,
267 #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
268 reason_full: StrVariant,
269 #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason inner")))]
270 reason_inner: StrVariant,
271 detail: Option<String>,
272 #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
273 http_client_traces: Option<Vec<HttpClientTrace>>,
274 },
275 #[display("TemporarilyTimedOut(`{backoff_expires_at}`)")]
278 TemporarilyTimedOut {
279 backoff_expires_at: DateTime<Utc>,
280 #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
281 http_client_traces: Option<Vec<HttpClientTrace>>,
282 },
283 #[display("Finished")]
285 Finished {
286 #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
287 result: SupportedFunctionReturnValue,
288 #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
289 http_client_traces: Option<Vec<HttpClientTrace>>,
290 },
291
292 #[display("HistoryEvent({event})")]
293 HistoryEvent { event: HistoryEvent },
294}
295
296impl ExecutionEventInner {
297 #[must_use]
298 pub fn is_temporary_event(&self) -> bool {
299 matches!(
300 self,
301 Self::TemporarilyFailed { .. } | Self::TemporarilyTimedOut { .. }
302 )
303 }
304
305 #[must_use]
306 pub fn variant(&self) -> &'static str {
307 Into::<&'static str>::into(self)
308 }
309
310 #[must_use]
311 pub fn join_set_id(&self) -> Option<&JoinSetId> {
312 match self {
313 Self::Created {
314 parent: Some((_parent_id, join_set_id)),
315 ..
316 } => Some(join_set_id),
317 Self::HistoryEvent {
318 event:
319 HistoryEvent::JoinSetCreate { join_set_id, .. }
320 | HistoryEvent::JoinSetRequest { join_set_id, .. }
321 | HistoryEvent::JoinNext { join_set_id, .. },
322 } => Some(join_set_id),
323 _ => None,
324 }
325 }
326}
327
328#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
329#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
330#[serde(tag = "type")]
331pub enum PersistKind {
332 #[display("RandomU64({min}, {max_inclusive})")]
333 RandomU64 { min: u64, max_inclusive: u64 },
334 #[display("RandomString({min_length}, {max_length_exclusive})")]
335 RandomString {
336 min_length: u64,
337 max_length_exclusive: u64,
338 },
339}
340
341#[must_use]
342pub fn from_u64_to_bytes(value: u64) -> [u8; 8] {
343 value.to_be_bytes()
344}
345
346#[must_use]
347pub fn from_bytes_to_u64(bytes: [u8; 8]) -> u64 {
348 u64::from_be_bytes(bytes)
349}
350
351#[derive(
352 derive_more::Debug, Clone, PartialEq, Eq, derive_more::Display, Serialize, Deserialize,
353)]
354#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
355#[serde(tag = "type")]
356pub enum HistoryEvent {
358 #[display("Persist")]
360 Persist {
361 #[debug(skip)]
362 value: Vec<u8>, kind: PersistKind,
364 },
365 #[display("JoinSetCreate({join_set_id})")]
366 JoinSetCreate {
367 join_set_id: JoinSetId,
368 closing_strategy: ClosingStrategy,
369 },
370 #[display("JoinSetRequest({request})")]
371 JoinSetRequest {
373 join_set_id: JoinSetId,
374 request: JoinSetRequest,
375 },
376 #[display("JoinNext({join_set_id})")]
382 JoinNext {
383 join_set_id: JoinSetId,
384 run_expires_at: DateTime<Utc>,
387 requested_ffqn: Option<FunctionFqn>,
390 closing: bool,
392 },
393 #[display("JoinNextTooMany({join_set_id})")]
395 JoinNextTooMany {
396 join_set_id: JoinSetId,
397 requested_ffqn: Option<FunctionFqn>,
400 },
401 #[display("Schedule({execution_id}, {schedule_at})")]
402 Schedule {
403 execution_id: ExecutionId,
404 schedule_at: HistoryEventScheduleAt, },
406 #[display("Stub({target_execution_id})")]
407 Stub {
408 target_execution_id: ExecutionIdDerived,
409 #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
410 result: SupportedFunctionReturnValue, persist_result: Result<(), ()>, },
413}
414
415#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
416#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
417pub enum HistoryEventScheduleAt {
418 Now,
419 #[display("At(`{_0}`)")]
420 At(DateTime<Utc>),
421 #[display("In({_0:?})")]
422 In(Duration),
423}
424
425#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
426pub enum ScheduleAtConversionError {
427 #[error("source duration value is out of range")]
428 OutOfRangeError,
429}
430
431impl HistoryEventScheduleAt {
432 pub fn as_date_time(
433 &self,
434 now: DateTime<Utc>,
435 ) -> Result<DateTime<Utc>, ScheduleAtConversionError> {
436 match self {
437 Self::Now => Ok(now),
438 Self::At(date_time) => Ok(*date_time),
439 Self::In(duration) => {
440 let time_delta = TimeDelta::from_std(*duration)
441 .map_err(|_| ScheduleAtConversionError::OutOfRangeError)?;
442 now.checked_add_signed(time_delta)
443 .ok_or(ScheduleAtConversionError::OutOfRangeError)
444 }
445 }
446 }
447}
448
449#[derive(Clone, Debug, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
450#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
451#[serde(tag = "type")]
452pub enum JoinSetRequest {
453 #[display("DelayRequest({delay_id}, expires_at: `{expires_at}`, schedule_at: `{schedule_at}`)")]
455 DelayRequest {
456 delay_id: DelayId,
457 expires_at: DateTime<Utc>,
458 schedule_at: HistoryEventScheduleAt,
459 },
460 #[display("ChildExecutionRequest({child_execution_id}, {target_ffqn}, params: {params})")]
462 ChildExecutionRequest {
463 child_execution_id: ExecutionIdDerived,
464 target_ffqn: FunctionFqn,
465 #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
466 params: Params,
467 },
468}
469
470#[derive(Debug, Clone, thiserror::Error, PartialEq)]
472pub enum DbErrorGeneric {
473 #[error("database error: {0}")]
474 Uncategorized(StrVariant), #[error("database was closed")]
476 Close,
477}
478
479#[derive(thiserror::Error, Clone, Debug, PartialEq, Eq)]
480pub enum DbErrorWritePermanent {
481 #[error("validation failed: {0}")]
483 ValidationFailed(StrVariant),
484 #[error("cannot write: {reason}")]
485 CannotWrite {
486 reason: StrVariant,
487 expected_version: Option<Version>,
488 },
489}
490
491#[derive(Debug, Clone, thiserror::Error, PartialEq)]
493pub enum DbErrorWrite {
494 #[error("cannot write - row not found")]
495 NotFound,
496 #[error("permanent error: {0}")]
498 Permanent(#[from] DbErrorWritePermanent),
499 #[error(transparent)]
500 DbErrorGeneric(#[from] DbErrorGeneric),
501}
502
503#[derive(Debug, Clone, thiserror::Error, PartialEq)]
505pub enum DbErrorRead {
506 #[error("cannot read - row not found")]
507 NotFound,
508 #[error(transparent)]
509 DbErrorGeneric(#[from] DbErrorGeneric),
510}
511
512impl From<DbErrorRead> for DbErrorWrite {
513 fn from(value: DbErrorRead) -> DbErrorWrite {
514 match value {
515 DbErrorRead::NotFound => DbErrorWrite::NotFound,
516 DbErrorRead::DbErrorGeneric(err) => DbErrorWrite::DbErrorGeneric(err),
517 }
518 }
519}
520
521#[derive(Debug, thiserror::Error, PartialEq)]
522pub enum DbErrorReadWithTimeout {
523 #[error("timeout")]
524 Timeout,
525 #[error(transparent)]
526 DbErrorRead(#[from] DbErrorRead),
527}
528impl From<DbErrorGeneric> for DbErrorReadWithTimeout {
529 fn from(value: DbErrorGeneric) -> DbErrorReadWithTimeout {
530 Self::from(DbErrorRead::from(value))
531 }
532}
533
534pub type AppendResponse = Version;
537pub type PendingExecution = (ExecutionId, Version, Params, Option<DateTime<Utc>>);
538
539#[derive(Debug, Clone)]
540pub struct LockedExecution {
541 pub execution_id: ExecutionId,
542 pub next_version: Version,
543 pub metadata: ExecutionMetadata,
544 pub run_id: RunId,
545 pub ffqn: FunctionFqn,
546 pub params: Params,
547 pub event_history: Vec<HistoryEvent>, pub responses: Vec<JoinSetResponseEventOuter>,
549 pub parent: Option<(ExecutionId, JoinSetId)>,
550 pub intermittent_event_count: u32,
551}
552
553pub type LockPendingResponse = Vec<LockedExecution>;
554pub type AppendBatchResponse = Version;
555
556#[derive(Debug, Clone, derive_more::Display, Serialize, Deserialize)]
557#[display("{event}")]
558pub struct AppendRequest {
559 pub created_at: DateTime<Utc>,
560 pub event: ExecutionEventInner,
561}
562
563#[derive(Debug, Clone)]
564pub struct CreateRequest {
565 pub created_at: DateTime<Utc>,
566 pub execution_id: ExecutionId,
567 pub ffqn: FunctionFqn,
568 pub params: Params,
569 pub parent: Option<(ExecutionId, JoinSetId)>,
570 pub scheduled_at: DateTime<Utc>,
571 pub component_id: ComponentId,
572 pub metadata: ExecutionMetadata,
573 pub scheduled_by: Option<ExecutionId>,
574}
575
576impl From<CreateRequest> for ExecutionEventInner {
577 fn from(value: CreateRequest) -> Self {
578 Self::Created {
579 ffqn: value.ffqn,
580 params: value.params,
581 parent: value.parent,
582 scheduled_at: value.scheduled_at,
583 component_id: value.component_id,
584 metadata: value.metadata,
585 scheduled_by: value.scheduled_by,
586 }
587 }
588}
589
590#[async_trait]
591pub trait DbPool: Send + Sync {
592 fn connection(&self) -> Box<dyn DbConnection>;
593}
594
595#[async_trait]
596pub trait DbPoolCloseable {
597 async fn close(self);
598}
599
600#[derive(Clone, Debug)]
601pub struct AppendEventsToExecution {
602 pub execution_id: ExecutionId,
603 pub version: Version,
604 pub batch: Vec<AppendRequest>,
605}
606
607#[derive(Clone, Debug)]
608pub struct AppendResponseToExecution {
609 pub parent_execution_id: ExecutionId,
610 pub parent_response_event: JoinSetResponseEventOuter,
611}
612
613#[async_trait]
614pub trait DbExecutor: Send + Sync {
615 #[expect(clippy::too_many_arguments)]
616 async fn lock_pending(
617 &self,
618 batch_size: usize,
619 pending_at_or_sooner: DateTime<Utc>,
620 ffqns: Arc<[FunctionFqn]>,
621 created_at: DateTime<Utc>,
622 component_id: ComponentId,
623 executor_id: ExecutorId,
624 lock_expires_at: DateTime<Utc>,
625 run_id: RunId,
626 retry_config: ComponentRetryConfig,
627 ) -> Result<LockPendingResponse, DbErrorGeneric>;
628
629 #[expect(clippy::too_many_arguments)]
631 async fn lock_one(
632 &self,
633 created_at: DateTime<Utc>,
634 component_id: ComponentId,
635 execution_id: &ExecutionId,
636 run_id: RunId,
637 version: Version,
638 executor_id: ExecutorId,
639 lock_expires_at: DateTime<Utc>,
640 retry_config: ComponentRetryConfig,
641 ) -> Result<LockedExecution, DbErrorWrite>;
642
643 async fn append(
646 &self,
647 execution_id: ExecutionId,
648 version: Version,
649 req: AppendRequest,
650 ) -> Result<AppendResponse, DbErrorWrite>;
651
652 async fn append_batch_respond_to_parent(
655 &self,
656 events: AppendEventsToExecution,
657 response: AppendResponseToExecution,
658 current_time: DateTime<Utc>, ) -> Result<AppendBatchResponse, DbErrorWrite>;
660
661 async fn wait_for_pending(
666 &self,
667 pending_at_or_sooner: DateTime<Utc>,
668 ffqns: Arc<[FunctionFqn]>,
669 timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
670 );
671}
672
673#[async_trait]
674pub trait DbConnection: DbExecutor {
675 async fn append_response(
676 &self,
677 created_at: DateTime<Utc>,
678 execution_id: ExecutionId,
679 response_event: JoinSetResponseEvent,
680 ) -> Result<(), DbErrorWrite>;
681
682 async fn append_batch(
685 &self,
686 current_time: DateTime<Utc>, batch: Vec<AppendRequest>,
688 execution_id: ExecutionId,
689 version: Version,
690 ) -> Result<AppendBatchResponse, DbErrorWrite>;
691
692 async fn append_batch_create_new_execution(
695 &self,
696 current_time: DateTime<Utc>, batch: Vec<AppendRequest>, execution_id: ExecutionId,
699 version: Version,
700 child_req: Vec<CreateRequest>,
701 ) -> Result<AppendBatchResponse, DbErrorWrite>;
702
703 #[cfg(feature = "test")]
704 async fn get(&self, execution_id: &ExecutionId) -> Result<ExecutionLog, DbErrorRead>;
706
707 async fn list_execution_events(
708 &self,
709 execution_id: &ExecutionId,
710 since: &Version,
711 max_length: VersionType,
712 include_backtrace_id: bool,
713 ) -> Result<Vec<ExecutionEvent>, DbErrorRead>;
714
715 async fn get_execution_event(
717 &self,
718 execution_id: &ExecutionId,
719 version: &Version,
720 ) -> Result<ExecutionEvent, DbErrorRead>;
721
722 async fn get_create_request(
723 &self,
724 execution_id: &ExecutionId,
725 ) -> Result<CreateRequest, DbErrorRead> {
726 let execution_event = self
727 .get_execution_event(execution_id, &Version::new(0))
728 .await?;
729 if let ExecutionEventInner::Created {
730 ffqn,
731 params,
732 parent,
733 scheduled_at,
734 component_id,
735 metadata,
736 scheduled_by,
737 } = execution_event.event
738 {
739 Ok(CreateRequest {
740 created_at: execution_event.created_at,
741 execution_id: execution_id.clone(),
742 ffqn,
743 params,
744 parent,
745 scheduled_at,
746 component_id,
747 metadata,
748 scheduled_by,
749 })
750 } else {
751 error!(%execution_id, "Execution log must start with creation");
752 Err(DbErrorRead::DbErrorGeneric(DbErrorGeneric::Uncategorized(
753 "execution log must start with creation".into(),
754 )))
755 }
756 }
757
758 async fn get_finished_result(
759 &self,
760 execution_id: &ExecutionId,
761 finished: PendingStateFinished,
762 ) -> Result<Option<SupportedFunctionReturnValue>, DbErrorRead> {
763 let last_event = self
764 .get_execution_event(execution_id, &Version::new(finished.version))
765 .await?;
766 if let ExecutionEventInner::Finished { result, .. } = last_event.event {
767 Ok(Some(result))
768 } else {
769 Ok(None)
770 }
771 }
772
773 async fn get_pending_state(
774 &self,
775 execution_id: &ExecutionId,
776 ) -> Result<PendingState, DbErrorRead>;
777
778 async fn get_expired_timers(
780 &self,
781 at: DateTime<Utc>,
782 ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric>;
783
784 async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite>;
786
787 async fn subscribe_to_next_responses(
792 &self,
793 execution_id: &ExecutionId,
794 start_idx: usize,
795 timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
796 ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorReadWithTimeout>;
797
798 async fn wait_for_finished_result(
801 &self,
802 execution_id: &ExecutionId,
803 timeout_fut: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
804 ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout>;
805
806 async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite>;
807
808 async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite>;
809
810 async fn get_backtrace(
813 &self,
814 execution_id: &ExecutionId,
815 filter: BacktraceFilter,
816 ) -> Result<BacktraceInfo, DbErrorRead>;
817
818 async fn list_executions(
821 &self,
822 ffqn: Option<FunctionFqn>,
823 top_level_only: bool,
824 pagination: ExecutionListPagination,
825 ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric>;
826
827 async fn list_responses(
831 &self,
832 execution_id: &ExecutionId,
833 pagination: Pagination<u32>,
834 ) -> Result<Vec<ResponseWithCursor>, DbErrorRead>;
835}
836
837#[derive(Clone)]
838pub enum BacktraceFilter {
839 First,
840 Last,
841 Specific(Version),
842}
843
844pub struct BacktraceInfo {
845 pub execution_id: ExecutionId,
846 pub component_id: ComponentId,
847 pub version_min_including: Version,
848 pub version_max_excluding: Version,
849 pub wasm_backtrace: WasmBacktrace,
850}
851
852#[derive(Serialize, Deserialize, Debug, Clone)]
853pub struct WasmBacktrace {
854 pub frames: Vec<FrameInfo>,
855}
856
857#[derive(Serialize, Deserialize, Debug, Clone)]
858pub struct FrameInfo {
859 pub module: String,
860 pub func_name: String,
861 pub symbols: Vec<FrameSymbol>,
862}
863
864#[derive(Serialize, Deserialize, Debug, Clone)]
865pub struct FrameSymbol {
866 pub func_name: Option<String>,
867 pub file: Option<String>,
868 pub line: Option<u32>,
869 pub col: Option<u32>,
870}
871
872mod wasm_backtrace {
873 use super::{FrameInfo, FrameSymbol, WasmBacktrace};
874
875 impl WasmBacktrace {
876 pub fn maybe_from(backtrace: &wasmtime::WasmBacktrace) -> Option<Self> {
877 if backtrace.frames().is_empty() {
878 None
879 } else {
880 Some(Self {
881 frames: backtrace.frames().iter().map(FrameInfo::from).collect(),
882 })
883 }
884 }
885 }
886
887 impl From<&wasmtime::FrameInfo> for FrameInfo {
888 fn from(frame: &wasmtime::FrameInfo) -> Self {
889 let module_name = frame.module().name().unwrap_or("<unknown>").to_string();
890 let mut func_name = String::new();
891 wasmtime_environ::demangle_function_name_or_index(
892 &mut func_name,
893 frame.func_name(),
894 frame.func_index() as usize,
895 )
896 .expect("writing to string must succeed");
897 Self {
898 module: module_name,
899 func_name,
900 symbols: frame
901 .symbols()
902 .iter()
903 .map(std::convert::Into::into)
904 .collect(),
905 }
906 }
907 }
908
909 impl From<&wasmtime::FrameSymbol> for FrameSymbol {
910 fn from(symbol: &wasmtime::FrameSymbol) -> Self {
911 let func_name = symbol.name().map(|name| {
912 let mut writer = String::new();
913 wasmtime_environ::demangle_function_name(&mut writer, name)
914 .expect("writing to string must succeed");
915 writer
916 });
917
918 Self {
919 func_name,
920 file: symbol.file().map(ToString::to_string),
921 line: symbol.line(),
922 col: symbol.column(),
923 }
924 }
925 }
926}
927
928pub type ResponseCursorType = u32; #[derive(Debug, Clone)]
931pub struct ResponseWithCursor {
932 pub event: JoinSetResponseEventOuter,
933 pub cursor: ResponseCursorType,
934}
935
936#[derive(Debug)]
937pub struct ExecutionWithState {
938 pub execution_id: ExecutionId,
939 pub ffqn: FunctionFqn,
940 pub pending_state: PendingState,
941 pub created_at: DateTime<Utc>,
942 pub scheduled_at: DateTime<Utc>,
943}
944
945pub enum ExecutionListPagination {
946 CreatedBy(Pagination<Option<DateTime<Utc>>>),
947 ExecutionId(Pagination<Option<ExecutionId>>),
948}
949
950#[derive(Debug, Clone, Copy)]
951pub enum Pagination<T> {
952 NewerThan {
953 length: ResponseCursorType,
954 cursor: T,
955 including_cursor: bool,
956 },
957 OlderThan {
958 length: ResponseCursorType,
959 cursor: T,
960 including_cursor: bool,
961 },
962}
963impl<T> Pagination<T> {
964 pub fn length(&self) -> ResponseCursorType {
965 match self {
966 Pagination::NewerThan { length, .. } | Pagination::OlderThan { length, .. } => *length,
967 }
968 }
969 pub fn rel(&self) -> &'static str {
970 match self {
971 Pagination::NewerThan {
972 including_cursor: false,
973 ..
974 } => ">",
975 Pagination::NewerThan {
976 including_cursor: true,
977 ..
978 } => ">=",
979 Pagination::OlderThan {
980 including_cursor: false,
981 ..
982 } => "<",
983 Pagination::OlderThan {
984 including_cursor: true,
985 ..
986 } => "<=",
987 }
988 }
989 pub fn is_desc(&self) -> bool {
990 matches!(self, Pagination::OlderThan { .. })
991 }
992}
993
994#[cfg(feature = "test")]
995pub async fn wait_for_pending_state_fn<T: Debug>(
996 db_connection: &dyn DbConnection,
997 execution_id: &ExecutionId,
998 predicate: impl Fn(ExecutionLog) -> Option<T> + Send,
999 timeout: Option<Duration>,
1000) -> Result<T, DbErrorReadWithTimeout> {
1001 tracing::trace!(%execution_id, "Waiting for predicate");
1002 let fut = async move {
1003 loop {
1004 let execution_log = db_connection.get(execution_id).await?;
1005 if let Some(t) = predicate(execution_log) {
1006 tracing::debug!(%execution_id, "Found: {t:?}");
1007 return Ok(t);
1008 }
1009 tokio::time::sleep(Duration::from_millis(10)).await;
1010 }
1011 };
1012
1013 if let Some(timeout) = timeout {
1014 tokio::select! { res = fut => res,
1016 () = tokio::time::sleep(timeout) => Err(DbErrorReadWithTimeout::Timeout)
1017 }
1018 } else {
1019 fut.await
1020 }
1021}
1022
1023#[derive(Debug, Clone, PartialEq, Eq)]
1024pub enum ExpiredTimer {
1025 Lock(ExpiredLock),
1026 Delay(ExpiredDelay),
1027}
1028
1029#[derive(Debug, Clone, PartialEq, Eq)]
1030pub struct ExpiredLock {
1031 pub execution_id: ExecutionId,
1032 pub locked_at_version: Version,
1034 pub next_version: Version,
1035 pub intermittent_event_count: u32,
1037 pub max_retries: u32,
1038 pub retry_exp_backoff: Duration,
1039}
1040
1041#[derive(Debug, Clone, PartialEq, Eq)]
1042pub struct ExpiredDelay {
1043 pub execution_id: ExecutionId,
1044 pub join_set_id: JoinSetId,
1045 pub delay_id: DelayId,
1046}
1047
1048#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, Deserialize)]
1049#[serde(tag = "type")]
1050pub enum PendingState {
1051 #[display("Locked(`{lock_expires_at}`, {executor_id}, {run_id})")]
1052 Locked {
1053 executor_id: ExecutorId,
1054 run_id: RunId,
1055 lock_expires_at: DateTime<Utc>,
1056 },
1057 #[display("PendingAt(`{scheduled_at}`)")]
1058 PendingAt { scheduled_at: DateTime<Utc> }, #[display("BlockedByJoinSet({join_set_id},`{lock_expires_at}`)")]
1060 BlockedByJoinSet {
1062 join_set_id: JoinSetId,
1063 lock_expires_at: DateTime<Utc>,
1065 closing: bool,
1067 },
1068 #[display("Finished({finished})")]
1069 Finished { finished: PendingStateFinished },
1070}
1071
1072#[derive(Debug, Clone, Copy, derive_more::Display, PartialEq, Eq, Serialize, Deserialize)]
1073#[display("{result_kind}")]
1074pub struct PendingStateFinished {
1075 pub version: VersionType, pub finished_at: DateTime<Utc>,
1077 pub result_kind: PendingStateFinishedResultKind,
1078}
1079
1080#[derive(
1081 Debug, Clone, Copy, PartialEq, Eq, serde_with::SerializeDisplay, serde_with::DeserializeFromStr,
1082)]
1083pub struct PendingStateFinishedResultKind(pub Result<(), PendingStateFinishedError>);
1084const OK_VARIANT: &str = "ok";
1085impl FromStr for PendingStateFinishedResultKind {
1086 type Err = strum::ParseError;
1087
1088 fn from_str(s: &str) -> Result<Self, Self::Err> {
1089 if s == OK_VARIANT {
1090 Ok(PendingStateFinishedResultKind(Ok(())))
1091 } else {
1092 let err = PendingStateFinishedError::from_str(s)?;
1093 Ok(PendingStateFinishedResultKind(Err(err)))
1094 }
1095 }
1096}
1097
1098impl Display for PendingStateFinishedResultKind {
1099 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1100 match self.0 {
1101 Ok(()) => write!(f, "{OK_VARIANT}"),
1102 Err(err) => write!(f, "{err}"),
1103 }
1104 }
1105}
1106
1107impl From<&SupportedFunctionReturnValue> for PendingStateFinishedResultKind {
1108 fn from(result: &SupportedFunctionReturnValue) -> Self {
1109 result.as_pending_state_finished_result()
1110 }
1111}
1112
1113#[derive(Debug, Clone, Copy, PartialEq, Eq, strum::EnumString, strum::Display)]
1114#[cfg_attr(test, derive(strum::VariantArray))]
1115#[strum(serialize_all = "snake_case")]
1116pub enum PendingStateFinishedError {
1117 Timeout,
1118 ExecutionFailure,
1119 FallibleError,
1120}
1121
1122impl PendingState {
1123 pub fn can_append_lock(
1124 &self,
1125 created_at: DateTime<Utc>,
1126 executor_id: ExecutorId,
1127 run_id: RunId,
1128 lock_expires_at: DateTime<Utc>,
1129 ) -> Result<LockKind, DbErrorWritePermanent> {
1130 if lock_expires_at <= created_at {
1131 return Err(DbErrorWritePermanent::ValidationFailed(
1132 "invalid expiry date".into(),
1133 ));
1134 }
1135 match self {
1136 PendingState::PendingAt { scheduled_at } => {
1137 if *scheduled_at <= created_at {
1138 Ok(LockKind::CreatingNewLock)
1140 } else {
1141 Err(DbErrorWritePermanent::ValidationFailed(
1142 "cannot lock, not yet pending".into(),
1143 ))
1144 }
1145 }
1146 PendingState::Locked {
1147 executor_id: current_pending_state_executor_id,
1148 run_id: current_pending_state_run_id,
1149 ..
1150 } => {
1151 if executor_id == *current_pending_state_executor_id
1152 && run_id == *current_pending_state_run_id
1153 {
1154 Ok(LockKind::Extending)
1156 } else {
1157 Err(DbErrorWritePermanent::CannotWrite {
1158 reason: "cannot lock, already locked".into(),
1159 expected_version: None,
1160 })
1161 }
1162 }
1163 PendingState::BlockedByJoinSet { .. } => Err(DbErrorWritePermanent::CannotWrite {
1164 reason: "cannot append Locked event when in BlockedByJoinSet state".into(),
1165 expected_version: None,
1166 }),
1167 PendingState::Finished { .. } => Err(DbErrorWritePermanent::CannotWrite {
1168 reason: "already finished".into(),
1169 expected_version: None,
1170 }),
1171 }
1172 }
1173
1174 #[must_use]
1175 pub fn is_finished(&self) -> bool {
1176 matches!(self, PendingState::Finished { .. })
1177 }
1178}
1179
1180#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1181pub enum LockKind {
1182 Extending,
1183 CreatingNewLock,
1184}
1185
1186pub mod http_client_trace {
1187 use chrono::{DateTime, Utc};
1188 use serde::{Deserialize, Serialize};
1189
1190 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1191 pub struct HttpClientTrace {
1192 pub req: RequestTrace,
1193 pub resp: Option<ResponseTrace>,
1194 }
1195
1196 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1197 pub struct RequestTrace {
1198 pub sent_at: DateTime<Utc>,
1199 pub uri: String,
1200 pub method: String,
1201 }
1202
1203 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1204 pub struct ResponseTrace {
1205 pub finished_at: DateTime<Utc>,
1206 pub status: Result<u16, String>,
1207 }
1208}
1209
1210#[cfg(test)]
1211mod tests {
1212 use super::HistoryEventScheduleAt;
1213 use super::PendingStateFinished;
1214 use super::PendingStateFinishedError;
1215 use super::PendingStateFinishedResultKind;
1216 use crate::SupportedFunctionReturnValue;
1217 use chrono::DateTime;
1218 use chrono::Datelike;
1219 use chrono::Utc;
1220 use insta::assert_snapshot;
1221 use rstest::rstest;
1222 use std::time::Duration;
1223 use strum::VariantArray as _;
1224 use val_json::type_wrapper::TypeWrapper;
1225 use val_json::wast_val::WastVal;
1226 use val_json::wast_val::WastValWithType;
1227
1228 #[rstest(expected => [
1229 PendingStateFinishedResultKind(Result::Ok(())),
1230 PendingStateFinishedResultKind(Result::Err(PendingStateFinishedError::Timeout)),
1231 ])]
1232 #[test]
1233 fn serde_pending_state_finished_result_kind_should_work(
1234 expected: PendingStateFinishedResultKind,
1235 ) {
1236 let ser = serde_json::to_string(&expected).unwrap();
1237 let actual: PendingStateFinishedResultKind = serde_json::from_str(&ser).unwrap();
1238 assert_eq!(expected, actual);
1239 }
1240
1241 #[rstest(result_kind => [
1242 PendingStateFinishedResultKind(Result::Ok(())),
1243 PendingStateFinishedResultKind(Result::Err(PendingStateFinishedError::Timeout)),
1244 ])]
1245 #[test]
1246 fn serde_pending_state_finished_should_work(result_kind: PendingStateFinishedResultKind) {
1247 let expected = PendingStateFinished {
1248 version: 0,
1249 finished_at: Utc::now(),
1250 result_kind,
1251 };
1252
1253 let ser = serde_json::to_string(&expected).unwrap();
1254 let actual: PendingStateFinished = serde_json::from_str(&ser).unwrap();
1255 assert_eq!(expected, actual);
1256 }
1257
1258 #[test]
1259 fn join_set_deser_with_result_ok_option_none_should_work() {
1260 let expected = SupportedFunctionReturnValue::Ok {
1261 ok: Some(WastValWithType {
1262 r#type: TypeWrapper::Result {
1263 ok: Some(Box::new(TypeWrapper::Option(Box::new(TypeWrapper::String)))),
1264 err: Some(Box::new(TypeWrapper::String)),
1265 },
1266 value: WastVal::Result(Ok(Some(Box::new(WastVal::Option(None))))),
1267 }),
1268 };
1269 let json = serde_json::to_string(&expected).unwrap();
1270 assert_snapshot!(json);
1271
1272 let actual: SupportedFunctionReturnValue = serde_json::from_str(&json).unwrap();
1273
1274 assert_eq!(expected, actual);
1275 }
1276
1277 #[test]
1278 fn verify_pending_state_finished_result_kind_serde() {
1279 let variants: Vec<_> = PendingStateFinishedError::VARIANTS
1280 .iter()
1281 .map(|var| PendingStateFinishedResultKind(Err(*var)))
1282 .chain(std::iter::once(PendingStateFinishedResultKind(Ok(()))))
1283 .collect();
1284 let ser = serde_json::to_string_pretty(&variants).unwrap();
1285 insta::assert_snapshot!(ser);
1286 let deser: Vec<PendingStateFinishedResultKind> = serde_json::from_str(&ser).unwrap();
1287 assert_eq!(variants, deser);
1288 }
1289
1290 #[test]
1291 fn as_date_time_should_work_with_duration_u32_max_secs() {
1292 let duration = Duration::from_secs(u64::from(u32::MAX));
1293 let schedule_at = HistoryEventScheduleAt::In(duration);
1294 let resolved = schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap();
1295 assert_eq!(2106, resolved.year());
1296 }
1297
1298 const MILLIS_PER_SEC: i64 = 1000;
1299 const TIMEDELTA_MAX_SECS: i64 = i64::MAX / MILLIS_PER_SEC;
1300
1301 #[test]
1302 fn as_date_time_should_fail_on_duration_secs_greater_than_i64_max() {
1303 let duration = Duration::from_secs(
1305 u64::try_from(TIMEDELTA_MAX_SECS).expect("positive number must not fail") + 1,
1306 );
1307 let schedule_at = HistoryEventScheduleAt::In(duration);
1308 schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap_err();
1309 }
1310}