1use crate::ClosingStrategy;
2use crate::ComponentId;
3use crate::ExecutionId;
4use crate::ExecutionMetadata;
5use crate::FunctionFqn;
6use crate::JoinSetId;
7use crate::Params;
8use crate::StrVariant;
9use crate::SupportedFunctionReturnValue;
10use crate::prefixed_ulid::DelayId;
11use crate::prefixed_ulid::ExecutionIdDerived;
12use crate::prefixed_ulid::ExecutorId;
13use crate::prefixed_ulid::RunId;
14use assert_matches::assert_matches;
15use async_trait::async_trait;
16use chrono::TimeDelta;
17use chrono::{DateTime, Utc};
18use http_client_trace::HttpClientTrace;
19use serde::Deserialize;
20use serde::Serialize;
21use std::fmt::Debug;
22use std::fmt::Display;
23use std::pin::Pin;
24use std::str::FromStr;
25use std::sync::Arc;
26use std::time::Duration;
27use strum::IntoStaticStr;
28use tracing::error;
29
30#[derive(Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
32pub struct ExecutionLog {
33 pub execution_id: ExecutionId,
34 pub events: Vec<ExecutionEvent>,
35 pub responses: Vec<JoinSetResponseEventOuter>,
36 pub next_version: Version, pub pending_state: PendingState,
38}
39
40impl ExecutionLog {
41 #[must_use]
42 pub fn can_be_retried_after(
43 temporary_event_count: u32,
44 max_retries: u32,
45 retry_exp_backoff: Duration,
46 ) -> Option<Duration> {
47 if temporary_event_count <= max_retries {
49 let duration = retry_exp_backoff * 2_u32.saturating_pow(temporary_event_count - 1);
51 Some(duration)
52 } else {
53 None
54 }
55 }
56
57 #[must_use]
58 pub fn compute_retry_duration_when_retrying_forever(
59 temporary_event_count: u32,
60 retry_exp_backoff: Duration,
61 ) -> Duration {
62 Self::can_be_retried_after(temporary_event_count, u32::MAX, retry_exp_backoff)
63 .expect("`max_retries` set to MAX must never return None")
64 }
65
66 #[must_use]
67 pub fn retry_exp_backoff(&self) -> Duration {
68 assert_matches!(self.events.first(), Some(ExecutionEvent {
69 event: ExecutionEventInner::Created { retry_exp_backoff, .. },
70 ..
71 }) => *retry_exp_backoff)
72 }
73
74 #[must_use]
75 pub fn max_retries(&self) -> u32 {
76 assert_matches!(self.events.first(), Some(ExecutionEvent {
77 event: ExecutionEventInner::Created { max_retries, .. },
78 ..
79 }) => *max_retries)
80 }
81
82 #[must_use]
83 pub fn ffqn(&self) -> &FunctionFqn {
84 assert_matches!(self.events.first(), Some(ExecutionEvent {
85 event: ExecutionEventInner::Created { ffqn, .. },
86 ..
87 }) => ffqn)
88 }
89
90 #[must_use]
91 pub fn parent(&self) -> Option<(ExecutionId, JoinSetId)> {
92 assert_matches!(self.events.first(), Some(ExecutionEvent {
93 event: ExecutionEventInner::Created { parent, .. },
94 ..
95 }) => parent.clone())
96 }
97
98 #[must_use]
99 pub fn last_event(&self) -> &ExecutionEvent {
100 self.events.last().expect("must contain at least one event")
101 }
102
103 #[must_use]
104 pub fn into_finished_result(mut self) -> Option<SupportedFunctionReturnValue> {
105 if let ExecutionEvent {
106 event: ExecutionEventInner::Finished { result, .. },
107 ..
108 } = self.events.pop().expect("must contain at least one event")
109 {
110 Some(result)
111 } else {
112 None
113 }
114 }
115
116 pub fn event_history(&self) -> impl Iterator<Item = HistoryEvent> + '_ {
117 self.events.iter().filter_map(|event| {
118 if let ExecutionEventInner::HistoryEvent { event: eh, .. } = &event.event {
119 Some(eh.clone())
120 } else {
121 None
122 }
123 })
124 }
125
126 #[cfg(feature = "test")]
127 #[must_use]
128 pub fn find_join_set_request(&self, join_set_id: &JoinSetId) -> Option<&JoinSetRequest> {
129 self.events
130 .iter()
131 .find_map(move |event| match &event.event {
132 ExecutionEventInner::HistoryEvent {
133 event:
134 HistoryEvent::JoinSetRequest {
135 join_set_id: found,
136 request,
137 },
138 ..
139 } if *join_set_id == *found => Some(request),
140 _ => None,
141 })
142 }
143}
144
145pub type VersionType = u32;
146#[derive(
147 Debug,
148 Default,
149 Clone,
150 PartialEq,
151 Eq,
152 Hash,
153 derive_more::Display,
154 derive_more::Into,
155 serde::Serialize,
156 serde::Deserialize,
157)]
158#[serde(transparent)]
159pub struct Version(pub VersionType);
160impl Version {
161 #[must_use]
162 pub fn new(arg: VersionType) -> Self {
163 Self(arg)
164 }
165}
166
167#[derive(
168 Clone, Debug, derive_more::Display, PartialEq, Eq, serde::Serialize, serde::Deserialize,
169)]
170#[display("{event}")]
171pub struct ExecutionEvent {
172 pub created_at: DateTime<Utc>,
173 pub event: ExecutionEventInner,
174 pub backtrace_id: Option<Version>,
175}
176
177#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
179pub struct JoinSetResponseEventOuter {
180 pub created_at: DateTime<Utc>,
181 pub event: JoinSetResponseEvent,
182}
183
184#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
185pub struct JoinSetResponseEvent {
186 pub join_set_id: JoinSetId,
187 pub event: JoinSetResponse,
188}
189
190#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
191#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
192#[serde(tag = "type")]
193pub enum JoinSetResponse {
194 DelayFinished {
195 delay_id: DelayId,
196 },
197 ChildExecutionFinished {
198 child_execution_id: ExecutionIdDerived,
199 #[cfg_attr(any(test, feature = "test"), arbitrary(value = Version(2)))]
200 finished_version: Version,
201 #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
202 result: SupportedFunctionReturnValue,
203 },
204}
205
206pub const DUMMY_CREATED: ExecutionEventInner = ExecutionEventInner::Created {
207 ffqn: FunctionFqn::new_static("", ""),
208 params: Params::empty(),
209 parent: None,
210 scheduled_at: DateTime::from_timestamp_nanos(0),
211 retry_exp_backoff: Duration::ZERO,
212 max_retries: 0,
213 component_id: ComponentId::dummy_activity(),
214 metadata: ExecutionMetadata::empty(),
215 scheduled_by: None,
216};
217pub const DUMMY_HISTORY_EVENT: ExecutionEventInner = ExecutionEventInner::HistoryEvent {
218 event: HistoryEvent::JoinSetCreate {
219 join_set_id: JoinSetId {
220 kind: crate::JoinSetKind::OneOff,
221 name: StrVariant::empty(),
222 },
223 closing_strategy: ClosingStrategy::Complete,
224 },
225};
226pub const DUMMY_TEMPORARILY_TIMED_OUT: ExecutionEventInner =
227 ExecutionEventInner::TemporarilyTimedOut {
228 backoff_expires_at: DateTime::from_timestamp_nanos(0),
229 http_client_traces: None,
230 };
231pub const DUMMY_TEMPORARILY_FAILED: ExecutionEventInner = ExecutionEventInner::TemporarilyFailed {
232 backoff_expires_at: DateTime::from_timestamp_nanos(0),
233 reason_full: StrVariant::empty(),
234 reason_inner: StrVariant::empty(),
235 detail: None,
236 http_client_traces: None,
237};
238
239#[derive(
240 Clone,
241 derive_more::Debug,
242 derive_more::Display,
243 PartialEq,
244 Eq,
245 Serialize,
246 Deserialize,
247 IntoStaticStr,
248)]
249#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
250#[allow(clippy::large_enum_variant)]
251pub enum ExecutionEventInner {
252 #[display("Created({ffqn}, `{scheduled_at}`)")]
256 Created {
257 ffqn: FunctionFqn,
258 #[cfg_attr(any(test, feature = "test"), arbitrary(default))]
259 #[debug(skip)]
260 params: Params,
261 parent: Option<(ExecutionId, JoinSetId)>,
262 scheduled_at: DateTime<Utc>,
263 retry_exp_backoff: Duration,
264 max_retries: u32,
265 #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
266 component_id: ComponentId,
267 #[cfg_attr(any(test, feature = "test"), arbitrary(default))]
268 metadata: ExecutionMetadata,
269 scheduled_by: Option<ExecutionId>,
270 },
271 #[display("Locked(`{lock_expires_at}`, {component_id})")]
275 Locked {
276 #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
277 component_id: ComponentId,
278 executor_id: ExecutorId,
279 run_id: RunId,
280 lock_expires_at: DateTime<Utc>,
281 },
282 #[display("Unlocked(`{backoff_expires_at}`)")]
287 Unlocked {
288 backoff_expires_at: DateTime<Utc>,
289 #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
290 reason: StrVariant,
291 },
292 #[display("TemporarilyFailed(`{backoff_expires_at}`)")]
295 TemporarilyFailed {
296 backoff_expires_at: DateTime<Utc>,
297 #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
298 reason_full: StrVariant,
299 #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason inner")))]
300 reason_inner: StrVariant,
301 detail: Option<String>,
302 #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
303 http_client_traces: Option<Vec<HttpClientTrace>>,
304 },
305 #[display("TemporarilyTimedOut(`{backoff_expires_at}`)")]
308 TemporarilyTimedOut {
309 backoff_expires_at: DateTime<Utc>,
310 #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
311 http_client_traces: Option<Vec<HttpClientTrace>>,
312 },
313 #[display("Finished")]
317 Finished {
318 #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
319 result: SupportedFunctionReturnValue,
320 #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
321 http_client_traces: Option<Vec<HttpClientTrace>>,
322 },
323
324 #[display("HistoryEvent({event})")]
325 HistoryEvent { event: HistoryEvent },
326}
327
328impl ExecutionEventInner {
329 #[must_use]
330 pub fn is_temporary_event(&self) -> bool {
331 matches!(
332 self,
333 Self::TemporarilyFailed { .. } | Self::TemporarilyTimedOut { .. }
334 )
335 }
336
337 #[must_use]
338 pub fn variant(&self) -> &'static str {
339 Into::<&'static str>::into(self)
340 }
341
342 #[must_use]
343 pub fn join_set_id(&self) -> Option<&JoinSetId> {
344 match self {
345 Self::Created {
346 parent: Some((_parent_id, join_set_id)),
347 ..
348 } => Some(join_set_id),
349 Self::HistoryEvent {
350 event:
351 HistoryEvent::JoinSetCreate { join_set_id, .. }
352 | HistoryEvent::JoinSetRequest { join_set_id, .. }
353 | HistoryEvent::JoinNext { join_set_id, .. },
354 } => Some(join_set_id),
355 _ => None,
356 }
357 }
358}
359
360#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
361#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
362#[serde(tag = "type")]
363pub enum PersistKind {
364 #[display("RandomU64({min}, {max_inclusive})")]
365 RandomU64 { min: u64, max_inclusive: u64 },
366 #[display("RandomString({min_length}, {max_length_exclusive})")]
367 RandomString {
368 min_length: u64,
369 max_length_exclusive: u64,
370 },
371}
372
373#[must_use]
374pub fn from_u64_to_bytes(value: u64) -> [u8; 8] {
375 value.to_be_bytes()
376}
377
378#[must_use]
379pub fn from_bytes_to_u64(bytes: [u8; 8]) -> u64 {
380 u64::from_be_bytes(bytes)
381}
382
383#[derive(
384 derive_more::Debug, Clone, PartialEq, Eq, derive_more::Display, Serialize, Deserialize,
385)]
386#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
387#[serde(tag = "type")]
388pub enum HistoryEvent {
390 #[display("Persist")]
392 Persist {
393 #[debug(skip)]
394 value: Vec<u8>, kind: PersistKind,
396 },
397 #[display("JoinSetCreate({join_set_id})")]
398 JoinSetCreate {
399 join_set_id: JoinSetId,
400 closing_strategy: ClosingStrategy,
401 },
402 #[display("JoinSetRequest({join_set_id}, {request})")]
403 JoinSetRequest {
404 join_set_id: JoinSetId,
405 request: JoinSetRequest,
406 },
407 #[display("JoinNext({join_set_id})")]
413 JoinNext {
414 join_set_id: JoinSetId,
415 run_expires_at: DateTime<Utc>,
418 requested_ffqn: Option<FunctionFqn>,
421 closing: bool,
423 },
424 #[display("JoinNextTooMany({join_set_id})")]
426 JoinNextTooMany {
427 join_set_id: JoinSetId,
428 requested_ffqn: Option<FunctionFqn>,
431 },
432 #[display("Schedule({execution_id}, {schedule_at})")]
433 Schedule {
434 execution_id: ExecutionId,
435 schedule_at: HistoryEventScheduleAt, },
437 #[display("Stub({target_execution_id})")]
438 Stub {
439 target_execution_id: ExecutionIdDerived,
440 #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
441 result: SupportedFunctionReturnValue, persist_result: Result<(), ()>, },
444}
445
446#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
447#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
448pub enum HistoryEventScheduleAt {
449 Now,
450 At(DateTime<Utc>),
451 #[display("In({_0:?})")]
452 In(Duration),
453}
454
455#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
456pub enum ScheduleAtConversionError {
457 #[error("source duration value is out of range")]
458 OutOfRangeError,
459}
460
461impl HistoryEventScheduleAt {
462 pub fn as_date_time(
463 &self,
464 now: DateTime<Utc>,
465 ) -> Result<DateTime<Utc>, ScheduleAtConversionError> {
466 match self {
467 Self::Now => Ok(now),
468 Self::At(date_time) => Ok(*date_time),
469 Self::In(duration) => {
470 let time_delta = TimeDelta::from_std(*duration)
471 .map_err(|_| ScheduleAtConversionError::OutOfRangeError)?;
472 now.checked_add_signed(time_delta)
473 .ok_or(ScheduleAtConversionError::OutOfRangeError)
474 }
475 }
476 }
477}
478
479#[derive(Clone, Debug, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
480#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
481#[serde(tag = "type")]
482pub enum JoinSetRequest {
483 #[display("DelayRequest({delay_id}, expires_at: `{expires_at}`, schedule_at: `{schedule_at}`)")]
485 DelayRequest {
486 delay_id: DelayId,
487 expires_at: DateTime<Utc>,
488 schedule_at: HistoryEventScheduleAt,
489 },
490 #[display("ChildExecutionRequest({child_execution_id})")]
492 ChildExecutionRequest {
493 child_execution_id: ExecutionIdDerived,
494 },
495}
496
497#[derive(thiserror::Error, Clone, Debug, PartialEq, Eq)]
498pub enum DbConnectionError {
499 #[error("send error")]
500 SendError,
501 #[error("receive error")]
502 RecvError,
503}
504
505#[derive(thiserror::Error, Clone, Debug, PartialEq, Eq)]
506pub enum SpecificError {
507 #[error("validation failed: {0}")]
508 ValidationFailed(StrVariant),
509 #[error("version mismatch")]
510 VersionMismatch {
511 appending_version: Version,
512 expected_version: Version,
513 },
514 #[error("version missing")]
515 VersionMissing,
516 #[error("not found")]
517 NotFound,
518 #[error("consistency error: `{0}`")]
519 ConsistencyError(StrVariant),
520 #[error("{0}")]
521 GenericError(StrVariant),
522}
523
524#[derive(thiserror::Error, Debug, PartialEq, Eq, Clone)]
525pub enum DbError {
526 #[error(transparent)]
527 Connection(#[from] DbConnectionError),
528 #[error(transparent)]
529 Specific(#[from] SpecificError),
530}
531
532#[derive(thiserror::Error, Debug, PartialEq, Eq, Clone)]
533pub enum SubscribeError {
534 #[error("interrupted")]
535 Interrupted,
536 #[error(transparent)]
537 DbError(DbError),
538}
539
540pub type AppendResponse = Version;
541pub type PendingExecution = (ExecutionId, Version, Params, Option<DateTime<Utc>>);
542pub type LockResponse = (Vec<HistoryEvent>, Version);
543
544#[derive(Debug, Clone)]
545pub struct LockedExecution {
546 pub execution_id: ExecutionId,
547 pub metadata: ExecutionMetadata,
548 pub run_id: RunId,
549 pub version: Version,
550 pub ffqn: FunctionFqn,
551 pub params: Params,
552 pub event_history: Vec<HistoryEvent>,
553 pub responses: Vec<JoinSetResponseEventOuter>,
554 pub scheduled_at: DateTime<Utc>,
555 pub retry_exp_backoff: Duration,
556 pub max_retries: u32,
557 pub parent: Option<(ExecutionId, JoinSetId)>,
558 pub temporary_event_count: u32,
559}
560
561pub type LockPendingResponse = Vec<LockedExecution>;
562pub type AppendBatchResponse = Version;
563
564#[derive(Debug, Clone, derive_more::Display, Serialize, Deserialize)]
565#[display("{event}")]
566pub struct AppendRequest {
567 pub created_at: DateTime<Utc>,
568 pub event: ExecutionEventInner,
569}
570
571#[derive(Debug, Clone)]
572pub struct CreateRequest {
573 pub created_at: DateTime<Utc>,
574 pub execution_id: ExecutionId,
575 pub ffqn: FunctionFqn,
576 pub params: Params,
577 pub parent: Option<(ExecutionId, JoinSetId)>,
578 pub scheduled_at: DateTime<Utc>,
579 pub retry_exp_backoff: Duration,
580 pub max_retries: u32,
581 pub component_id: ComponentId,
582 pub metadata: ExecutionMetadata,
583 pub scheduled_by: Option<ExecutionId>,
584}
585
586impl From<CreateRequest> for ExecutionEventInner {
587 fn from(value: CreateRequest) -> Self {
588 Self::Created {
589 ffqn: value.ffqn,
590 params: value.params,
591 parent: value.parent,
592 scheduled_at: value.scheduled_at,
593 retry_exp_backoff: value.retry_exp_backoff,
594 max_retries: value.max_retries,
595 component_id: value.component_id,
596 metadata: value.metadata,
597 scheduled_by: value.scheduled_by,
598 }
599 }
600}
601
602#[async_trait]
603pub trait DbPool: Send + Sync {
604 fn connection(&self) -> Box<dyn DbConnection>;
605 fn is_closing(&self) -> bool;
606 async fn close(&self) -> Result<(), DbError>;
607}
608
609#[derive(Debug, thiserror::Error)]
610pub enum ClientError {
611 #[error("client timeout")]
612 Timeout,
613 #[error(transparent)]
614 DbError(#[from] DbError),
615}
616
617#[async_trait]
618pub trait DbConnection: Send + Sync {
619 #[expect(clippy::too_many_arguments)]
620 async fn lock_pending(
621 &self,
622 batch_size: usize,
623 pending_at_or_sooner: DateTime<Utc>,
624 ffqns: Arc<[FunctionFqn]>,
625 created_at: DateTime<Utc>,
626 component_id: ComponentId,
627 executor_id: ExecutorId,
628 lock_expires_at: DateTime<Utc>,
629 run_id: RunId,
630 ) -> Result<LockPendingResponse, DbError>;
631
632 #[expect(clippy::too_many_arguments)]
634 async fn lock(
635 &self,
636 created_at: DateTime<Utc>,
637 component_id: ComponentId,
638 execution_id: &ExecutionId,
639 run_id: RunId,
640 version: Version,
641 executor_id: ExecutorId,
642 lock_expires_at: DateTime<Utc>,
643 ) -> Result<LockResponse, DbError>;
644
645 async fn append(
647 &self,
648 execution_id: ExecutionId,
649 version: Version,
650 req: AppendRequest,
651 ) -> Result<AppendResponse, DbError>;
652
653 async fn append_response(
654 &self,
655 created_at: DateTime<Utc>,
656 execution_id: ExecutionId,
657 response_event: JoinSetResponseEvent,
658 ) -> Result<(), DbError>;
659
660 async fn append_batch(
662 &self,
663 current_time: DateTime<Utc>, batch: Vec<AppendRequest>,
665 execution_id: ExecutionId,
666 version: Version,
667 ) -> Result<AppendBatchResponse, DbError>;
668
669 async fn append_batch_create_new_execution(
671 &self,
672 current_time: DateTime<Utc>, batch: Vec<AppendRequest>, execution_id: ExecutionId,
675 version: Version,
676 child_req: Vec<CreateRequest>,
677 ) -> Result<AppendBatchResponse, DbError>;
678
679 async fn append_batch_respond_to_parent(
680 &self,
681 execution_id: ExecutionIdDerived,
682 current_time: DateTime<Utc>, batch: Vec<AppendRequest>,
684 version: Version,
685 parent_execution_id: ExecutionId,
686 parent_response_event: JoinSetResponseEventOuter,
687 ) -> Result<AppendBatchResponse, DbError>;
688
689 #[cfg(feature = "test")]
690 async fn get(&self, execution_id: &ExecutionId) -> Result<ExecutionLog, DbError>;
692
693 async fn list_execution_events(
694 &self,
695 execution_id: &ExecutionId,
696 since: &Version,
697 max_length: VersionType,
698 include_backtrace_id: bool,
699 ) -> Result<Vec<ExecutionEvent>, DbError>;
700
701 async fn get_execution_event(
703 &self,
704 execution_id: &ExecutionId,
705 version: &Version,
706 ) -> Result<ExecutionEvent, DbError>;
707
708 async fn get_create_request(
709 &self,
710 execution_id: &ExecutionId,
711 ) -> Result<CreateRequest, DbError> {
712 let execution_event = self
713 .get_execution_event(execution_id, &Version::new(0))
714 .await?;
715 if let ExecutionEventInner::Created {
716 ffqn,
717 params,
718 parent,
719 scheduled_at,
720 retry_exp_backoff,
721 max_retries,
722 component_id,
723 metadata,
724 scheduled_by,
725 } = execution_event.event
726 {
727 Ok(CreateRequest {
728 created_at: execution_event.created_at,
729 execution_id: execution_id.clone(),
730 ffqn,
731 params,
732 parent,
733 scheduled_at,
734 retry_exp_backoff,
735 max_retries,
736 component_id,
737 metadata,
738 scheduled_by,
739 })
740 } else {
741 error!(%execution_id, "Execution log must start with creation");
742 Err(DbError::Specific(SpecificError::ConsistencyError(
743 StrVariant::Static("execution log must start with creation"),
744 )))
745 }
746 }
747
748 async fn get_finished_result(
749 &self,
750 execution_id: &ExecutionId,
751 finished: PendingStateFinished,
752 ) -> Result<Option<SupportedFunctionReturnValue>, DbError> {
753 let last_event = self
754 .get_execution_event(execution_id, &Version::new(finished.version))
755 .await?;
756 if let ExecutionEventInner::Finished { result, .. } = last_event.event {
757 Ok(Some(result))
758 } else {
759 Ok(None)
760 }
761 }
762
763 async fn get_pending_state(&self, execution_id: &ExecutionId) -> Result<PendingState, DbError>;
764
765 async fn get_expired_timers(&self, at: DateTime<Utc>) -> Result<Vec<ExpiredTimer>, DbError>;
767
768 async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbError>;
770
771 async fn subscribe_to_next_responses(
775 &self,
776 execution_id: &ExecutionId,
777 start_idx: usize,
778 interrupt_after: Pin<Box<dyn Future<Output = ()> + Send>>,
779 ) -> Result<Vec<JoinSetResponseEventOuter>, SubscribeError>;
780
781 async fn wait_for_finished_result(
782 &self,
783 execution_id: &ExecutionId,
784 timeout: Option<Duration>,
785 ) -> Result<SupportedFunctionReturnValue, ClientError>;
786
787 async fn wait_for_pending(
793 &self,
794 pending_at_or_sooner: DateTime<Utc>,
795 ffqns: Arc<[FunctionFqn]>,
796 max_wait: Duration,
797 );
798
799 async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbError>;
800
801 async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbError>;
802
803 async fn get_backtrace(
806 &self,
807 execution_id: &ExecutionId,
808 filter: BacktraceFilter,
809 ) -> Result<BacktraceInfo, DbError>;
810
811 async fn list_executions(
814 &self,
815 ffqn: Option<FunctionFqn>,
816 top_level_only: bool,
817 pagination: ExecutionListPagination,
818 ) -> Result<Vec<ExecutionWithState>, DbError>;
819
820 async fn list_responses(
824 &self,
825 execution_id: &ExecutionId,
826 pagination: Pagination<u32>,
827 ) -> Result<Vec<ResponseWithCursor>, DbError>;
828}
829
830#[derive(Clone)]
831pub enum BacktraceFilter {
832 First,
833 Last,
834 Specific(Version),
835}
836
837pub struct BacktraceInfo {
838 pub execution_id: ExecutionId,
839 pub component_id: ComponentId,
840 pub version_min_including: Version,
841 pub version_max_excluding: Version,
842 pub wasm_backtrace: WasmBacktrace,
843}
844
845#[derive(Serialize, Deserialize, Debug, Clone)]
846pub struct WasmBacktrace {
847 pub frames: Vec<FrameInfo>,
848}
849
850#[derive(Serialize, Deserialize, Debug, Clone)]
851pub struct FrameInfo {
852 pub module: String,
853 pub func_name: String,
854 pub symbols: Vec<FrameSymbol>,
855}
856
857#[derive(Serialize, Deserialize, Debug, Clone)]
858pub struct FrameSymbol {
859 pub func_name: Option<String>,
860 pub file: Option<String>,
861 pub line: Option<u32>,
862 pub col: Option<u32>,
863}
864
865mod wasm_backtrace {
866 use super::{FrameInfo, FrameSymbol, WasmBacktrace};
867
868 impl WasmBacktrace {
869 pub fn maybe_from(backtrace: &wasmtime::WasmBacktrace) -> Option<Self> {
870 if backtrace.frames().is_empty() {
871 None
872 } else {
873 Some(Self {
874 frames: backtrace.frames().iter().map(FrameInfo::from).collect(),
875 })
876 }
877 }
878 }
879
880 impl From<&wasmtime::FrameInfo> for FrameInfo {
881 fn from(frame: &wasmtime::FrameInfo) -> Self {
882 let module_name = frame.module().name().unwrap_or("<unknown>").to_string();
883 let mut func_name = String::new();
884 wasmtime_environ::demangle_function_name_or_index(
885 &mut func_name,
886 frame.func_name(),
887 frame.func_index() as usize,
888 )
889 .expect("writing to string must succeed");
890 Self {
891 module: module_name,
892 func_name,
893 symbols: frame
894 .symbols()
895 .iter()
896 .map(std::convert::Into::into)
897 .collect(),
898 }
899 }
900 }
901
902 impl From<&wasmtime::FrameSymbol> for FrameSymbol {
903 fn from(symbol: &wasmtime::FrameSymbol) -> Self {
904 let func_name = symbol.name().map(|name| {
905 let mut writer = String::new();
906 wasmtime_environ::demangle_function_name(&mut writer, name)
907 .expect("writing to string must succeed");
908 writer
909 });
910
911 Self {
912 func_name,
913 file: symbol.file().map(ToString::to_string),
914 line: symbol.line(),
915 col: symbol.column(),
916 }
917 }
918 }
919}
920
921#[derive(Debug, Clone)]
922pub struct ResponseWithCursor {
923 pub event: JoinSetResponseEventOuter,
924 pub cursor: u32,
925}
926
927pub struct ExecutionWithState {
928 pub execution_id: ExecutionId,
929 pub ffqn: FunctionFqn,
930 pub pending_state: PendingState,
931 pub created_at: DateTime<Utc>,
932 pub scheduled_at: DateTime<Utc>,
933}
934
935pub enum ExecutionListPagination {
936 CreatedBy(Pagination<Option<DateTime<Utc>>>),
937 ExecutionId(Pagination<Option<ExecutionId>>),
938}
939
940#[derive(Debug, Clone, Copy)]
941pub enum Pagination<T> {
942 NewerThan {
943 length: u32,
944 cursor: T,
945 including_cursor: bool,
946 },
947 OlderThan {
948 length: u32,
949 cursor: T,
950 including_cursor: bool,
951 },
952}
953impl<T> Pagination<T> {
954 pub fn length(&self) -> u32 {
955 match self {
956 Pagination::NewerThan { length, .. } | Pagination::OlderThan { length, .. } => *length,
957 }
958 }
959 pub fn rel(&self) -> &'static str {
960 match self {
961 Pagination::NewerThan {
962 including_cursor: false,
963 ..
964 } => ">",
965 Pagination::NewerThan {
966 including_cursor: true,
967 ..
968 } => ">=",
969 Pagination::OlderThan {
970 including_cursor: false,
971 ..
972 } => "<",
973 Pagination::OlderThan {
974 including_cursor: true,
975 ..
976 } => "<=",
977 }
978 }
979 pub fn is_desc(&self) -> bool {
980 matches!(self, Pagination::OlderThan { .. })
981 }
982}
983
984#[cfg(feature = "test")]
985pub async fn wait_for_pending_state_fn<T: Debug>(
986 db_connection: &dyn DbConnection,
987 execution_id: &ExecutionId,
988 predicate: impl Fn(ExecutionLog) -> Option<T> + Send,
989 timeout: Option<Duration>,
990) -> Result<T, ClientError> {
991 tracing::trace!(%execution_id, "Waiting for predicate");
992 let fut = async move {
993 loop {
994 let execution_log = db_connection.get(execution_id).await?;
995 if let Some(t) = predicate(execution_log) {
996 tracing::debug!(%execution_id, "Found: {t:?}");
997 return Ok(t);
998 }
999 tokio::time::sleep(Duration::from_millis(10)).await;
1000 }
1001 };
1002
1003 if let Some(timeout) = timeout {
1004 tokio::select! { res = fut => res,
1006 () = tokio::time::sleep(timeout) => Err(ClientError::Timeout)
1007 }
1008 } else {
1009 fut.await
1010 }
1011}
1012
1013#[derive(Debug, Clone, PartialEq, Eq)]
1014pub enum ExpiredTimer {
1015 Lock {
1016 execution_id: ExecutionId,
1017 locked_at_version: Version,
1018 version: Version, temporary_event_count: u32,
1021 max_retries: u32,
1022 retry_exp_backoff: Duration,
1023 parent: Option<(ExecutionId, JoinSetId)>,
1024 },
1025 Delay {
1026 execution_id: ExecutionId,
1027 join_set_id: JoinSetId,
1028 delay_id: DelayId,
1029 },
1030}
1031
1032#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, Deserialize)]
1033#[serde(tag = "type")]
1034pub enum PendingState {
1035 #[display("Locked(`{lock_expires_at}`, {executor_id}, {run_id})")]
1036 Locked {
1037 executor_id: ExecutorId,
1038 run_id: RunId,
1039 lock_expires_at: DateTime<Utc>,
1040 },
1041 #[display("PendingAt(`{scheduled_at}`)")]
1042 PendingAt { scheduled_at: DateTime<Utc> }, #[display("BlockedByJoinSet({join_set_id},`{lock_expires_at}`)")]
1044 BlockedByJoinSet {
1046 join_set_id: JoinSetId,
1047 lock_expires_at: DateTime<Utc>,
1049 closing: bool,
1051 },
1052 #[display("Finished({finished})")]
1053 Finished { finished: PendingStateFinished },
1054}
1055
1056#[derive(Debug, Clone, Copy, derive_more::Display, PartialEq, Eq, Serialize, Deserialize)]
1057#[display("{result_kind}")]
1058pub struct PendingStateFinished {
1059 pub version: VersionType, pub finished_at: DateTime<Utc>,
1061 pub result_kind: PendingStateFinishedResultKind,
1062}
1063
1064#[derive(
1065 Debug, Clone, Copy, PartialEq, Eq, serde_with::SerializeDisplay, serde_with::DeserializeFromStr,
1066)]
1067pub struct PendingStateFinishedResultKind(pub Result<(), PendingStateFinishedError>);
1068const OK_VARIANT: &str = "ok";
1069impl FromStr for PendingStateFinishedResultKind {
1070 type Err = strum::ParseError;
1071
1072 fn from_str(s: &str) -> Result<Self, Self::Err> {
1073 if s == OK_VARIANT {
1074 Ok(PendingStateFinishedResultKind(Ok(())))
1075 } else {
1076 let err = PendingStateFinishedError::from_str(s)?;
1077 Ok(PendingStateFinishedResultKind(Err(err)))
1078 }
1079 }
1080}
1081
1082impl Display for PendingStateFinishedResultKind {
1083 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1084 match self.0 {
1085 Ok(()) => write!(f, "{OK_VARIANT}"),
1086 Err(err) => write!(f, "{err}"),
1087 }
1088 }
1089}
1090
1091impl From<&SupportedFunctionReturnValue> for PendingStateFinishedResultKind {
1092 fn from(result: &SupportedFunctionReturnValue) -> Self {
1093 result.as_pending_state_finished_result()
1094 }
1095}
1096
1097#[derive(Debug, Clone, Copy, PartialEq, Eq, strum::EnumString, strum::Display)]
1098#[cfg_attr(test, derive(strum::VariantArray))]
1099#[strum(serialize_all = "snake_case")]
1100pub enum PendingStateFinishedError {
1101 Timeout,
1102 ExecutionFailure,
1103 FallibleError,
1104}
1105
1106impl PendingState {
1107 pub fn can_append_lock(
1108 &self,
1109 created_at: DateTime<Utc>,
1110 executor_id: ExecutorId,
1111 run_id: RunId,
1112 lock_expires_at: DateTime<Utc>,
1113 ) -> Result<LockKind, SpecificError> {
1114 if lock_expires_at <= created_at {
1115 return Err(SpecificError::ValidationFailed(StrVariant::Static(
1116 "invalid expiry date",
1117 )));
1118 }
1119 match self {
1120 PendingState::PendingAt { scheduled_at } => {
1121 if *scheduled_at <= created_at {
1122 Ok(LockKind::CreatingNewLock)
1124 } else {
1125 Err(SpecificError::ValidationFailed(StrVariant::Static(
1126 "cannot lock, not yet pending",
1127 )))
1128 }
1129 }
1130 PendingState::Locked {
1131 executor_id: current_pending_state_executor_id,
1132 run_id: current_pending_state_run_id,
1133 ..
1134 } => {
1135 if executor_id == *current_pending_state_executor_id && run_id == *current_pending_state_run_id
1137 {
1138 Ok(LockKind::Extending)
1140 } else {
1141 Err(SpecificError::ValidationFailed(StrVariant::Static(
1142 "cannot lock, already locked",
1143 )))
1144 }
1145 }
1146 PendingState::BlockedByJoinSet { .. } => Err(SpecificError::ValidationFailed(
1147 StrVariant::Static("cannot append Locked event when in BlockedByJoinSet state"),
1148 )),
1149 PendingState::Finished { .. } => Err(SpecificError::ValidationFailed(
1150 StrVariant::Static("already finished"),
1151 )),
1152 }
1153 }
1154
1155 #[must_use]
1156 pub fn is_finished(&self) -> bool {
1157 matches!(self, PendingState::Finished { .. })
1158 }
1159}
1160
1161#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1162pub enum LockKind {
1163 Extending,
1164 CreatingNewLock,
1165}
1166
1167pub mod http_client_trace {
1168 use chrono::{DateTime, Utc};
1169 use serde::{Deserialize, Serialize};
1170
1171 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1172 pub struct HttpClientTrace {
1173 pub req: RequestTrace,
1174 pub resp: Option<ResponseTrace>,
1175 }
1176
1177 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1178 pub struct RequestTrace {
1179 pub sent_at: DateTime<Utc>,
1180 pub uri: String,
1181 pub method: String,
1182 }
1183
1184 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1185 pub struct ResponseTrace {
1186 pub finished_at: DateTime<Utc>,
1187 pub status: Result<u16, String>,
1188 }
1189}
1190
1191#[cfg(test)]
1192mod tests {
1193 use super::HistoryEventScheduleAt;
1194 use super::PendingStateFinished;
1195 use super::PendingStateFinishedError;
1196 use super::PendingStateFinishedResultKind;
1197 use crate::SupportedFunctionReturnValue;
1198 use chrono::DateTime;
1199 use chrono::Datelike;
1200 use chrono::Utc;
1201 use insta::assert_snapshot;
1202 use rstest::rstest;
1203 use std::time::Duration;
1204 use strum::VariantArray as _;
1205 use val_json::type_wrapper::TypeWrapper;
1206 use val_json::wast_val::WastVal;
1207 use val_json::wast_val::WastValWithType;
1208
1209 #[rstest(expected => [
1210 PendingStateFinishedResultKind(Result::Ok(())),
1211 PendingStateFinishedResultKind(Result::Err(PendingStateFinishedError::Timeout)),
1212 ])]
1213 #[test]
1214 fn serde_pending_state_finished_result_kind_should_work(
1215 expected: PendingStateFinishedResultKind,
1216 ) {
1217 let ser = serde_json::to_string(&expected).unwrap();
1218 let actual: PendingStateFinishedResultKind = serde_json::from_str(&ser).unwrap();
1219 assert_eq!(expected, actual);
1220 }
1221
1222 #[rstest(result_kind => [
1223 PendingStateFinishedResultKind(Result::Ok(())),
1224 PendingStateFinishedResultKind(Result::Err(PendingStateFinishedError::Timeout)),
1225 ])]
1226 #[test]
1227 fn serde_pending_state_finished_should_work(result_kind: PendingStateFinishedResultKind) {
1228 let expected = PendingStateFinished {
1229 version: 0,
1230 finished_at: Utc::now(),
1231 result_kind,
1232 };
1233
1234 let ser = serde_json::to_string(&expected).unwrap();
1235 let actual: PendingStateFinished = serde_json::from_str(&ser).unwrap();
1236 assert_eq!(expected, actual);
1237 }
1238
1239 #[test]
1240 fn join_set_deser_with_result_ok_option_none_should_work() {
1241 let expected = SupportedFunctionReturnValue::Ok {
1242 ok: Some(WastValWithType {
1243 r#type: TypeWrapper::Result {
1244 ok: Some(Box::new(TypeWrapper::Option(Box::new(TypeWrapper::String)))),
1245 err: Some(Box::new(TypeWrapper::String)),
1246 },
1247 value: WastVal::Result(Ok(Some(Box::new(WastVal::Option(None))))),
1248 }),
1249 };
1250 let json = serde_json::to_string(&expected).unwrap();
1251 assert_snapshot!(json);
1252
1253 let actual: SupportedFunctionReturnValue = serde_json::from_str(&json).unwrap();
1254
1255 assert_eq!(expected, actual);
1256 }
1257
1258 #[test]
1259 fn verify_pending_state_finished_result_kind_serde() {
1260 let variants: Vec<_> = PendingStateFinishedError::VARIANTS
1261 .iter()
1262 .map(|var| PendingStateFinishedResultKind(Err(*var)))
1263 .chain(std::iter::once(PendingStateFinishedResultKind(Ok(()))))
1264 .collect();
1265 let ser = serde_json::to_string_pretty(&variants).unwrap();
1266 insta::assert_snapshot!(ser);
1267 let deser: Vec<PendingStateFinishedResultKind> = serde_json::from_str(&ser).unwrap();
1268 assert_eq!(variants, deser);
1269 }
1270
1271 #[test]
1272 fn as_date_time_should_work_with_duration_u32_max_secs() {
1273 let duration = Duration::from_secs(u64::from(u32::MAX));
1274 let schedule_at = HistoryEventScheduleAt::In(duration);
1275 let resolved = schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap();
1276 assert_eq!(2106, resolved.year());
1277 }
1278
1279 const MILLIS_PER_SEC: i64 = 1000;
1280 const TIMEDELTA_MAX_SECS: i64 = i64::MAX / MILLIS_PER_SEC;
1281
1282 #[test]
1283 fn as_date_time_should_fail_on_duration_secs_greater_than_i64_max() {
1284 let duration = Duration::from_secs(
1286 u64::try_from(TIMEDELTA_MAX_SECS).expect("positive number must not fail") + 1,
1287 );
1288 let schedule_at = HistoryEventScheduleAt::In(duration);
1289 schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap_err();
1290 }
1291}