1use crate::ComponentId;
2use crate::ComponentRetryConfig;
3use crate::ExecutionFailureKind;
4use crate::ExecutionId;
5use crate::ExecutionMetadata;
6use crate::FinishedExecutionError;
7use crate::FunctionFqn;
8use crate::JoinSetId;
9use crate::Params;
10use crate::StrVariant;
11use crate::SupportedFunctionReturnValue;
12use crate::component_id::InputContentDigest;
13use crate::prefixed_ulid::DelayId;
14use crate::prefixed_ulid::ExecutionIdDerived;
15use crate::prefixed_ulid::ExecutorId;
16use crate::prefixed_ulid::RunId;
17use assert_matches::assert_matches;
18use async_trait::async_trait;
19use chrono::TimeDelta;
20use chrono::{DateTime, Utc};
21use http_client_trace::HttpClientTrace;
22use serde::Deserialize;
23use serde::Serialize;
24use std::fmt::Debug;
25use std::fmt::Display;
26use std::pin::Pin;
27use std::sync::Arc;
28use std::time::Duration;
29use strum::IntoStaticStr;
30use tracing::debug;
31use tracing::error;
32
/// Complete record of one execution: its events, the join set responses it
/// received, the next version and its pending state.
#[derive(Debug, PartialEq, Eq)]
#[cfg_attr(feature = "test", derive(Serialize))]
pub struct ExecutionLog {
    /// Identifier of the execution this log belongs to.
    pub execution_id: ExecutionId,
    /// Events of the execution. The accessors on this type expect the first
    /// entry to be `ExecutionRequest::Created` and the vector to be non-empty.
    pub events: Vec<ExecutionEvent>,
    /// Join set responses recorded for this execution.
    pub responses: Vec<JoinSetResponseEventOuter>,
    // NOTE(review): presumably the version the next appended event must carry — confirm.
    pub next_version: Version,
    /// Pending state of the execution.
    pub pending_state: PendingState,
}
43
44impl ExecutionLog {
45 #[must_use]
46 pub fn can_be_retried_after(
47 temporary_event_count: u32,
48 max_retries: u32,
49 retry_exp_backoff: Duration,
50 ) -> Option<Duration> {
51 if temporary_event_count <= max_retries {
53 let duration = retry_exp_backoff * 2_u32.saturating_pow(temporary_event_count - 1);
55 Some(duration)
56 } else {
57 None
58 }
59 }
60
61 #[must_use]
62 pub fn compute_retry_duration_when_retrying_forever(
63 temporary_event_count: u32,
64 retry_exp_backoff: Duration,
65 ) -> Duration {
66 Self::can_be_retried_after(temporary_event_count, u32::MAX, retry_exp_backoff)
67 .expect("`max_retries` set to MAX must never return None")
68 }
69
70 #[must_use]
71 pub fn ffqn(&self) -> &FunctionFqn {
72 assert_matches!(self.events.first(), Some(ExecutionEvent {
73 event: ExecutionRequest::Created { ffqn, .. },
74 ..
75 }) => ffqn)
76 }
77
78 #[must_use]
79 pub fn parent(&self) -> Option<(ExecutionId, JoinSetId)> {
80 assert_matches!(self.events.first(), Some(ExecutionEvent {
81 event: ExecutionRequest::Created { parent, .. },
82 ..
83 }) => parent.clone())
84 }
85
86 #[must_use]
87 pub fn last_event(&self) -> &ExecutionEvent {
88 self.events.last().expect("must contain at least one event")
89 }
90
91 #[must_use]
92 pub fn into_finished_result(mut self) -> Option<SupportedFunctionReturnValue> {
93 if let ExecutionEvent {
94 event: ExecutionRequest::Finished { result, .. },
95 ..
96 } = self.events.pop().expect("must contain at least one event")
97 {
98 Some(result)
99 } else {
100 None
101 }
102 }
103
104 #[cfg(feature = "test")]
105 pub fn event_history(&self) -> impl Iterator<Item = (HistoryEvent, Version)> + '_ {
106 self.events.iter().filter_map(|event| {
107 if let ExecutionRequest::HistoryEvent { event: eh, .. } = &event.event {
108 Some((eh.clone(), event.version.clone()))
109 } else {
110 None
111 }
112 })
113 }
114
115 #[cfg(feature = "test")]
116 #[must_use]
117 pub fn find_join_set_request(&self, join_set_id: &JoinSetId) -> Option<&JoinSetRequest> {
118 self.events
119 .iter()
120 .find_map(move |event| match &event.event {
121 ExecutionRequest::HistoryEvent {
122 event:
123 HistoryEvent::JoinSetRequest {
124 join_set_id: found,
125 request,
126 },
127 ..
128 } if *join_set_id == *found => Some(request),
129 _ => None,
130 })
131 }
132}
133
/// Underlying integer type of [`Version`].
pub type VersionType = u32;
/// Version number attached to execution events.
///
/// Serialized transparently as the inner integer.
#[derive(
    Debug,
    Default,
    Clone,
    PartialEq,
    Eq,
    Hash,
    derive_more::Display,
    derive_more::Into,
    serde::Serialize,
    serde::Deserialize,
)]
#[serde(transparent)]
pub struct Version(pub VersionType);
149impl Version {
150 #[must_use]
151 pub fn new(arg: VersionType) -> Version {
152 Version(arg)
153 }
154
155 #[must_use]
156 pub fn increment(&self) -> Version {
157 Version(self.0 + 1)
158 }
159}
160
/// A single entry of an execution's event log.
///
/// Displayed as the contained `event`.
#[derive(
    Clone, Debug, derive_more::Display, PartialEq, Eq, serde::Serialize, serde::Deserialize,
)]
#[display("{event}")]
pub struct ExecutionEvent {
    /// Timestamp at which the event was created.
    pub created_at: DateTime<Utc>,
    /// Payload of the event.
    pub event: ExecutionRequest,
    /// Optional backtrace reference; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub backtrace_id: Option<Version>,
    /// Version of this event within the execution log.
    pub version: Version,
}
172
/// A [`JoinSetResponseEvent`] together with its creation timestamp.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct JoinSetResponseEventOuter {
    /// Timestamp at which the response was created.
    pub created_at: DateTime<Utc>,
    /// The response and the join set it belongs to.
    pub event: JoinSetResponseEvent,
}
179
/// A response addressed to a particular join set.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct JoinSetResponseEvent {
    /// Join set that receives the response.
    pub join_set_id: JoinSetId,
    /// The response itself.
    pub event: JoinSetResponse,
}
185
/// Response delivered to a join set: either a delay outcome or a finished
/// child execution.
///
/// Serialized with an internal `type` tag.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, derive_more::Display)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type")]
pub enum JoinSetResponse {
    /// Outcome of a delay. The `Display` output reads "finished" when `result`
    /// is `Ok` and "cancelled" when it is `Err`.
    #[display("delay {}: {delay_id}", if result.is_ok() { "finished" } else { "cancelled"})]
    DelayFinished {
        delay_id: DelayId,
        result: Result<(), ()>,
    },
    /// A child execution finished with `result`.
    #[display("{result}: {child_execution_id}")]
    ChildExecutionFinished {
        child_execution_id: ExecutionIdDerived,
        // NOTE(review): presumably the version of the child's `Finished` event — confirm.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Version(2)))]
        finished_version: Version,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
        result: SupportedFunctionReturnValue,
    },
}
204
/// Placeholder `Created` request built from empty / dummy values: empty
/// function name and params, no parent, timestamp 0 and a dummy activity
/// component id.
pub const DUMMY_CREATED: ExecutionRequest = ExecutionRequest::Created {
    ffqn: FunctionFqn::new_static("", ""),
    params: Params::empty(),
    parent: None,
    scheduled_at: DateTime::from_timestamp_nanos(0),
    component_id: ComponentId::dummy_activity(),
    metadata: ExecutionMetadata::empty(),
    scheduled_by: None,
};
/// Placeholder `HistoryEvent` request: a `JoinSetCreate` for a one-off join
/// set with an empty name.
pub const DUMMY_HISTORY_EVENT: ExecutionRequest = ExecutionRequest::HistoryEvent {
    event: HistoryEvent::JoinSetCreate {
        join_set_id: JoinSetId {
            kind: crate::JoinSetKind::OneOff,
            name: StrVariant::empty(),
        },
    },
};
222
/// Payload of an [`ExecutionEvent`]: every kind of entry that can appear in
/// an execution log.
///
/// `IntoStaticStr` provides the variant name as a `&'static str`
/// (see `ExecutionRequest::variant`).
#[derive(
    Clone,
    derive_more::Debug,
    derive_more::Display,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    IntoStaticStr,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[allow(clippy::large_enum_variant)]
pub enum ExecutionRequest {
    /// The execution was created. `ExecutionLog` accessors expect this to be
    /// the first event of a log.
    #[display("Created({ffqn}, `{scheduled_at}`)")]
    Created {
        ffqn: FunctionFqn,
        /// Parameters of the call; skipped in `Debug` output.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
        #[debug(skip)]
        params: Params,
        /// Parent execution and the join set the child belongs to, if any.
        parent: Option<(ExecutionId, JoinSetId)>,
        scheduled_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
        component_id: ComponentId,
        #[cfg_attr(any(test, feature = "test"), arbitrary(default))]
        metadata: ExecutionMetadata,
        scheduled_by: Option<ExecutionId>,
    },
    /// The execution was locked; see [`Locked`] for details.
    Locked(Locked),
    /// The lock was released; carries a backoff expiry and a reason.
    #[display("Unlocked(`{backoff_expires_at}`)")]
    Unlocked {
        backoff_expires_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
        reason: StrVariant,
    },
    /// Temporary failure; counted as a temporary event by `is_temporary_event`.
    #[display("TemporarilyFailed(`{backoff_expires_at}`)")]
    TemporarilyFailed {
        backoff_expires_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
        reason: StrVariant,
        detail: Option<String>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },
    /// Temporary timeout; counted as a temporary event by `is_temporary_event`.
    #[display("TemporarilyTimedOut(`{backoff_expires_at}`)")]
    TemporarilyTimedOut {
        backoff_expires_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },
    /// The execution finished with `result`.
    #[display("Finished")]
    Finished {
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
        result: SupportedFunctionReturnValue,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },

    /// Wrapper around a [`HistoryEvent`].
    #[display("HistoryEvent({event})")]
    HistoryEvent {
        event: HistoryEvent,
    },
}
294
295impl ExecutionRequest {
296 #[must_use]
297 pub fn is_temporary_event(&self) -> bool {
298 matches!(
299 self,
300 Self::TemporarilyFailed { .. } | Self::TemporarilyTimedOut { .. }
301 )
302 }
303
304 #[must_use]
305 pub fn variant(&self) -> &'static str {
306 Into::<&'static str>::into(self)
307 }
308
309 #[must_use]
310 pub fn join_set_id(&self) -> Option<&JoinSetId> {
311 match self {
312 Self::Created {
313 parent: Some((_parent_id, join_set_id)),
314 ..
315 } => Some(join_set_id),
316 Self::HistoryEvent {
317 event:
318 HistoryEvent::JoinSetCreate { join_set_id, .. }
319 | HistoryEvent::JoinSetRequest { join_set_id, .. }
320 | HistoryEvent::JoinNext { join_set_id, .. },
321 } => Some(join_set_id),
322 _ => None,
323 }
324 }
325}
326
/// Payload of the `ExecutionRequest::Locked` event.
///
/// Displayed as ``Locked(`<lock_expires_at>`, <component_id>)``.
#[derive(
    Clone, derive_more::Debug, derive_more::Display, PartialEq, Eq, Serialize, Deserialize,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[display("Locked(`{lock_expires_at}`, {component_id})")]
pub struct Locked {
    /// Component recorded with the lock.
    #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
    pub component_id: ComponentId,
    /// Executor recorded with the lock.
    pub executor_id: ExecutorId,
    /// Run recorded with the lock.
    pub run_id: RunId,
    /// Point in time at which the lock expires.
    pub lock_expires_at: DateTime<Utc>,
    /// Retry configuration recorded with the lock.
    #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentRetryConfig::ZERO))]
    pub retry_config: ComponentRetryConfig,
}
341
/// Kind of value stored by a `HistoryEvent::Persist` event.
///
/// Serialized with an internal `type` tag.
#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type")]
pub enum PersistKind {
    /// Random `u64`; per the field name, the upper bound is inclusive.
    #[display("RandomU64({min}, {max_inclusive})")]
    RandomU64 { min: u64, max_inclusive: u64 },
    /// Random string; per the field name, the maximum length is exclusive.
    #[display("RandomString({min_length}, {max_length_exclusive})")]
    RandomString {
        min_length: u64,
        max_length_exclusive: u64,
    },
}
354
/// Encodes `value` as eight big-endian bytes.
///
/// Inverse of [`from_bytes_to_u64`].
#[must_use]
pub fn from_u64_to_bytes(value: u64) -> [u8; 8] {
    u64::to_be_bytes(value)
}
359
/// Decodes eight big-endian bytes into a `u64`.
///
/// Inverse of [`from_u64_to_bytes`].
#[must_use]
pub fn from_bytes_to_u64(bytes: [u8; 8]) -> u64 {
    let decoded = u64::from_be_bytes(bytes);
    decoded
}
364
/// Events carried by `ExecutionRequest::HistoryEvent`.
///
/// Serialized with an internal `type` tag.
#[derive(
    derive_more::Debug, Clone, PartialEq, Eq, derive_more::Display, Serialize, Deserialize,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type")]
pub enum HistoryEvent {
    /// A persisted value of the given `kind`. The raw bytes are skipped in
    /// `Debug` output.
    #[display("Persist")]
    Persist {
        #[debug(skip)]
        value: Vec<u8>,
        kind: PersistKind,
    },
    /// A join set was created.
    #[display("JoinSetCreate({join_set_id})")]
    JoinSetCreate { join_set_id: JoinSetId },
    /// A request (delay or child execution) was submitted to a join set.
    #[display("JoinSetRequest({request})")]
    JoinSetRequest {
        join_set_id: JoinSetId,
        request: JoinSetRequest,
    },
    /// A "join next" on the given join set.
    // NOTE(review): exact meaning of `run_expires_at`, `requested_ffqn` and
    // `closing` is not visible in this file — confirm against the callers.
    #[display("JoinNext({join_set_id})")]
    JoinNext {
        join_set_id: JoinSetId,
        run_expires_at: DateTime<Utc>,
        requested_ffqn: Option<FunctionFqn>,
        closing: bool,
    },
    /// NOTE(review): presumably a "join next" issued with no response left to
    /// await — confirm.
    #[display("JoinNextTooMany({join_set_id})")]
    JoinNextTooMany {
        join_set_id: JoinSetId,
        requested_ffqn: Option<FunctionFqn>,
    },
    /// An execution was scheduled.
    #[display("Schedule({execution_id}, {schedule_at})")]
    Schedule {
        execution_id: ExecutionId,
        schedule_at: HistoryEventScheduleAt,
    },
    /// A target execution was stubbed with `result`.
    // NOTE(review): `persist_result` presumably records whether writing the
    // stubbed result succeeded — confirm.
    #[display("Stub({target_execution_id})")]
    Stub {
        target_execution_id: ExecutionIdDerived,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
        result: SupportedFunctionReturnValue,
        persist_result: Result<(), ()>,
    },
}
425
/// When something should be scheduled: immediately, at an absolute time, or
/// after a relative duration.
///
/// Use `as_date_time` to resolve it against a concrete "now".
#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
pub enum HistoryEventScheduleAt {
    /// Resolves to the supplied "now".
    Now,
    /// Resolves to the contained absolute timestamp.
    #[display("At(`{_0}`)")]
    At(DateTime<Utc>),
    /// Resolves to "now" plus the contained duration.
    #[display("In({_0:?})")]
    In(Duration),
}
435
/// Error returned by `HistoryEventScheduleAt::as_date_time`.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum ScheduleAtConversionError {
    /// The duration cannot be converted to a `TimeDelta`, or adding it to
    /// "now" is not representable.
    #[error("source duration value is out of range")]
    OutOfRangeError,
}
441
442impl HistoryEventScheduleAt {
443 pub fn as_date_time(
444 &self,
445 now: DateTime<Utc>,
446 ) -> Result<DateTime<Utc>, ScheduleAtConversionError> {
447 match self {
448 Self::Now => Ok(now),
449 Self::At(date_time) => Ok(*date_time),
450 Self::In(duration) => {
451 let time_delta = TimeDelta::from_std(*duration)
452 .map_err(|_| ScheduleAtConversionError::OutOfRangeError)?;
453 now.checked_add_signed(time_delta)
454 .ok_or(ScheduleAtConversionError::OutOfRangeError)
455 }
456 }
457 }
458}
459
/// Request submitted to a join set: a delay or a child execution.
///
/// Serialized with an internal `type` tag.
#[derive(Clone, Debug, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type")]
pub enum JoinSetRequest {
    /// Request for a delay.
    // NOTE(review): `expires_at` is presumably `schedule_at` resolved to an
    // absolute time — confirm.
    #[display("DelayRequest({delay_id}, expires_at: `{expires_at}`, schedule_at: `{schedule_at}`)")]
    DelayRequest {
        delay_id: DelayId,
        expires_at: DateTime<Utc>,
        schedule_at: HistoryEventScheduleAt,
    },
    /// Request to run a child execution of `target_ffqn` with `params`.
    #[display("ChildExecutionRequest({child_execution_id}, {target_ffqn}, params: {params})")]
    ChildExecutionRequest {
        child_execution_id: ExecutionIdDerived,
        target_ffqn: FunctionFqn,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
        params: Params,
    },
}
480
/// Database errors shared by the read and write error types.
#[derive(Debug, Clone, thiserror::Error, PartialEq)]
pub enum DbErrorGeneric {
    /// Error without a more specific category; carries a message.
    #[error("database error: {0}")]
    Uncategorized(StrVariant),
    /// The database was closed.
    #[error("database was closed")]
    Close,
}
489
/// Write errors classified as non-retriable.
#[derive(thiserror::Error, Clone, Debug, PartialEq, Eq)]
pub enum DbErrorWriteNonRetriable {
    /// The request failed validation; carries a message.
    #[error("validation failed: {0}")]
    ValidationFailed(StrVariant),
    /// The write conflicts with existing data.
    #[error("conflict")]
    Conflict,
    /// The stored state does not allow the write; carries a message.
    #[error("illegal state: {0}")]
    IllegalState(StrVariant),
    /// The requested version differs from the expected one.
    #[error("version conflict: expected: {expected}, got: {requested}")]
    VersionConflict {
        expected: Version,
        requested: Version,
    },
}
504
/// Error returned by database write operations.
///
/// Convertible from `DbErrorWriteNonRetriable`, `DbErrorGeneric` and
/// `DbErrorRead`.
#[derive(Debug, Clone, thiserror::Error, PartialEq)]
pub enum DbErrorWrite {
    /// The row to write to was not found.
    #[error("cannot write - row not found")]
    NotFound,
    /// A non-retriable write error.
    #[error("non-retriable error: {0}")]
    NonRetriable(#[from] DbErrorWriteNonRetriable),
    /// A generic database error, displayed transparently.
    #[error(transparent)]
    Generic(#[from] DbErrorGeneric),
}
515
/// Error returned by database read operations.
#[derive(Debug, Clone, thiserror::Error, PartialEq)]
pub enum DbErrorRead {
    /// The requested row was not found.
    #[error("cannot read - row not found")]
    NotFound,
    /// A generic database error, displayed transparently.
    #[error(transparent)]
    Generic(#[from] DbErrorGeneric),
}
524
525impl From<DbErrorRead> for DbErrorWrite {
526 fn from(value: DbErrorRead) -> DbErrorWrite {
527 match value {
528 DbErrorRead::NotFound => DbErrorWrite::NotFound,
529 DbErrorRead::Generic(err) => DbErrorWrite::Generic(err),
530 }
531 }
532}
533
/// Error returned by read operations that can also time out.
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum DbErrorReadWithTimeout {
    /// The operation timed out.
    #[error("timeout")]
    Timeout,
    /// An underlying read error, displayed transparently.
    #[error(transparent)]
    DbErrorRead(#[from] DbErrorRead),
}
541impl From<DbErrorGeneric> for DbErrorReadWithTimeout {
542 fn from(value: DbErrorGeneric) -> DbErrorReadWithTimeout {
543 Self::from(DbErrorRead::from(value))
544 }
545}
546
/// Value returned by successful append / create operations: a [`Version`].
// NOTE(review): presumably the next version of the execution after the append — confirm.
pub type AppendResponse = Version;
/// Tuple of an execution id, a version, params and an optional timestamp.
// NOTE(review): presumably describes an execution waiting to be locked — confirm.
pub type PendingExecution = (ExecutionId, Version, Params, Option<DateTime<Utc>>);
551
/// Everything returned for an execution that was successfully locked.
#[derive(Debug, Clone)]
pub struct LockedExecution {
    pub execution_id: ExecutionId,
    // NOTE(review): presumably the version to use for the next append — confirm.
    pub next_version: Version,
    pub metadata: ExecutionMetadata,
    /// The `Locked` event payload of this lock.
    pub locked_event: Locked,
    pub ffqn: FunctionFqn,
    pub params: Params,
    /// History events of the execution, each paired with a version.
    pub event_history: Vec<(HistoryEvent, Version)>,
    /// Join set responses of the execution.
    pub responses: Vec<JoinSetResponseEventOuter>,
    /// Parent execution and join set, if this is a child execution.
    pub parent: Option<(ExecutionId, JoinSetId)>,
    // NOTE(review): presumably the number of temporary failures / timeouts so far — confirm.
    pub intermittent_event_count: u32,
}
565
/// Result of the `lock_pending_*` operations: the executions that were locked.
pub type LockPendingResponse = Vec<LockedExecution>;
/// Value returned by successful batch appends: a [`Version`].
pub type AppendBatchResponse = Version;
568
/// An event to be appended to an execution log, with its creation timestamp.
///
/// Displayed as the contained `event`.
#[derive(Debug, Clone, derive_more::Display, Serialize, Deserialize)]
#[display("{event}")]
pub struct AppendRequest {
    /// Timestamp to record for the event.
    pub created_at: DateTime<Utc>,
    /// The event to append.
    pub event: ExecutionRequest,
}
575
/// Request to create a new execution.
///
/// Converts into `ExecutionRequest::Created`; `created_at` and
/// `execution_id` are not part of that event.
#[derive(Debug, Clone)]
pub struct CreateRequest {
    pub created_at: DateTime<Utc>,
    pub execution_id: ExecutionId,
    pub ffqn: FunctionFqn,
    pub params: Params,
    /// Parent execution and join set, if this is a child execution.
    pub parent: Option<(ExecutionId, JoinSetId)>,
    pub scheduled_at: DateTime<Utc>,
    pub component_id: ComponentId,
    pub metadata: ExecutionMetadata,
    pub scheduled_by: Option<ExecutionId>,
}
588
589impl From<CreateRequest> for ExecutionRequest {
590 fn from(value: CreateRequest) -> Self {
591 Self::Created {
592 ffqn: value.ffqn,
593 params: value.params,
594 parent: value.parent,
595 scheduled_at: value.scheduled_at,
596 component_id: value.component_id,
597 metadata: value.metadata,
598 scheduled_by: value.scheduled_by,
599 }
600 }
601}
602
/// Source of database connections.
#[async_trait]
pub trait DbPool: Send + Sync {
    /// Returns a boxed [`DbConnection`].
    fn connection(&self) -> Box<dyn DbConnection>;
}
607
/// A pool that can be closed.
#[async_trait]
pub trait DbPoolCloseable {
    /// Closes the pool, consuming it.
    async fn close(self);
}
612
/// A batch of events to append to one execution, used by
/// `DbExecutor::append_batch_respond_to_parent`.
#[derive(Clone, Debug)]
pub struct AppendEventsToExecution {
    /// Execution that receives the events.
    pub execution_id: ExecutionId,
    // NOTE(review): presumably the version of the first event in `batch` — confirm.
    pub version: Version,
    /// Events to append.
    pub batch: Vec<AppendRequest>,
}
619
/// Response about a finished child execution to deliver to its parent, used
/// by `DbExecutor::append_batch_respond_to_parent`.
#[derive(Clone, Debug)]
pub struct AppendResponseToExecution {
    /// Parent execution that receives the response.
    pub parent_execution_id: ExecutionId,
    pub created_at: DateTime<Utc>,
    /// Join set of the parent the child belongs to.
    pub join_set_id: JoinSetId,
    pub child_execution_id: ExecutionIdDerived,
    /// Version of the child's finishing event.
    pub finished_version: Version,
    /// Result of the child execution.
    pub result: SupportedFunctionReturnValue,
}
629
/// Database operations for locking executions and appending their events.
///
/// Implementors supply the storage primitives; `cancel_activity` and
/// `cancel_activity_with_retries` are provided on top of them.
#[async_trait]
pub trait DbExecutor: Send + Sync {
    /// Locks pending executions selected by function names.
    // NOTE(review): presumably locks at most `batch_size` executions of the
    // given `ffqns` that are pending at or before `pending_at_or_sooner` —
    // confirm against the implementations.
    #[expect(clippy::too_many_arguments)]
    async fn lock_pending_by_ffqns(
        &self,
        batch_size: usize,
        pending_at_or_sooner: DateTime<Utc>,
        ffqns: Arc<[FunctionFqn]>,
        created_at: DateTime<Utc>,
        component_id: ComponentId,
        executor_id: ExecutorId,
        lock_expires_at: DateTime<Utc>,
        run_id: RunId,
        retry_config: ComponentRetryConfig,
    ) -> Result<LockPendingResponse, DbErrorGeneric>;

    /// Locks pending executions selected by component id.
    // NOTE(review): presumably the component-id counterpart of
    // `lock_pending_by_ffqns` — confirm against the implementations.
    #[expect(clippy::too_many_arguments)]
    async fn lock_pending_by_component_id(
        &self,
        batch_size: usize,
        pending_at_or_sooner: DateTime<Utc>,
        component_id: &ComponentId,
        created_at: DateTime<Utc>,
        executor_id: ExecutorId,
        lock_expires_at: DateTime<Utc>,
        run_id: RunId,
        retry_config: ComponentRetryConfig,
    ) -> Result<LockPendingResponse, DbErrorGeneric>;

    /// Locks a single execution identified by `execution_id` at `version`.
    #[expect(clippy::too_many_arguments)]
    async fn lock_one(
        &self,
        created_at: DateTime<Utc>,
        component_id: ComponentId,
        execution_id: &ExecutionId,
        run_id: RunId,
        version: Version,
        executor_id: ExecutorId,
        lock_expires_at: DateTime<Utc>,
        retry_config: ComponentRetryConfig,
    ) -> Result<LockedExecution, DbErrorWrite>;

    /// Appends a single event to `execution_id` at `version`.
    async fn append(
        &self,
        execution_id: ExecutionId,
        version: Version,
        req: AppendRequest,
    ) -> Result<AppendResponse, DbErrorWrite>;

    /// Appends a batch of events to a child execution and delivers the
    /// corresponding response to its parent.
    // NOTE(review): presumably both writes happen atomically — confirm.
    async fn append_batch_respond_to_parent(
        &self,
        events: AppendEventsToExecution,
        response: AppendResponseToExecution,
        current_time: DateTime<Utc>,
    ) -> Result<AppendBatchResponse, DbErrorWrite>;

    /// Waits for a pending execution of one of `ffqns`.
    // NOTE(review): presumably returns when such an execution becomes
    // pending or when `timeout_fut` completes, whichever is first — confirm.
    async fn wait_for_pending_by_ffqn(
        &self,
        pending_at_or_sooner: DateTime<Utc>,
        ffqns: Arc<[FunctionFqn]>,
        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
    );

    /// Waits for a pending execution of `component_id`.
    // NOTE(review): presumably the component-id counterpart of
    // `wait_for_pending_by_ffqn` — confirm.
    async fn wait_for_pending_by_component_id(
        &self,
        pending_at_or_sooner: DateTime<Utc>,
        component_id: &ComponentId,
        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
    );

    /// Calls [`Self::cancel_activity`] repeatedly until it succeeds.
    ///
    /// After the initial attempt up to 5 retries are made, without any delay
    /// in between and regardless of the error kind. The result of the last
    /// attempt is returned.
    async fn cancel_activity_with_retries(
        &self,
        execution_id: &ExecutionId,
        cancelled_at: DateTime<Utc>,
    ) -> Result<CancelOutcome, DbErrorWrite> {
        let mut retries = 5;
        loop {
            let res = self.cancel_activity(execution_id, cancelled_at).await;
            if res.is_ok() || retries == 0 {
                return res;
            }
            retries -= 1;
        }
    }

    /// Returns the most recent event of `execution_id`.
    async fn get_last_execution_event(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<ExecutionEvent, DbErrorRead>;

    /// Cancels an execution by appending a `Finished` event carrying an
    /// `ExecutionFailureKind::Cancelled` error.
    ///
    /// Returns `CancelOutcome::Cancelled` when the execution was cancelled by
    /// this call or had already been cancelled, and
    /// `CancelOutcome::AlreadyFinished` when it finished with any other result.
    ///
    /// # Errors
    /// Propagates errors from reading the last event and from the append.
    async fn cancel_activity(
        &self,
        execution_id: &ExecutionId,
        cancelled_at: DateTime<Utc>,
    ) -> Result<CancelOutcome, DbErrorWrite> {
        debug!("Determining cancellation state of {execution_id}");

        let last_event = self
            .get_last_execution_event(execution_id)
            .await
            .map_err(DbErrorWrite::from)?;
        // Already cancelled: nothing to write.
        if let ExecutionRequest::Finished {
            result:
                SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError {
                    kind: ExecutionFailureKind::Cancelled,
                    ..
                }),
            ..
        } = last_event.event
        {
            return Ok(CancelOutcome::Cancelled);
        } else if matches!(last_event.event, ExecutionRequest::Finished { .. }) {
            debug!("Not cancelling, {execution_id} is already finished");
            return Ok(CancelOutcome::AlreadyFinished);
        }
        // The cancellation is written right after the last event that was read.
        let finished_version = last_event.version.increment();
        let child_result = SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError {
            reason: None,
            kind: ExecutionFailureKind::Cancelled,
            detail: None,
        });
        let cancel_request = AppendRequest {
            created_at: cancelled_at,
            event: ExecutionRequest::Finished {
                result: child_result.clone(),
                http_client_traces: None,
            },
        };
        debug!("Cancelling activity {execution_id} at {finished_version}");
        if let ExecutionId::Derived(execution_id) = execution_id {
            // Derived executions have a parent that must receive the result too.
            let (parent_execution_id, join_set_id) = execution_id.split_to_parts();
            let child_execution_id = ExecutionId::Derived(execution_id.clone());
            self.append_batch_respond_to_parent(
                AppendEventsToExecution {
                    execution_id: child_execution_id,
                    version: finished_version.clone(),
                    batch: vec![cancel_request],
                },
                AppendResponseToExecution {
                    parent_execution_id,
                    created_at: cancelled_at,
                    join_set_id: join_set_id.clone(),
                    child_execution_id: execution_id.clone(),
                    finished_version,
                    result: child_result,
                },
                cancelled_at,
            )
            .await?;
        } else {
            self.append(execution_id.clone(), finished_version, cancel_request)
                .await?;
        }
        debug!("Cancelled {execution_id}");
        Ok(CancelOutcome::Cancelled)
    }
}
797
/// Outcome of `DbConnection::append_delay_response`.
pub enum AppendDelayResponseOutcome {
    /// The delay response was appended.
    Success,
    // NOTE(review): presumably a "finished" response for the delay was already recorded — confirm.
    AlreadyFinished,
    // NOTE(review): presumably a "cancelled" response for the delay was already recorded — confirm.
    AlreadyCancelled,
}
803
/// Full database API: everything from [`DbExecutor`] plus creation, reads,
/// subscriptions, backtraces and listing.
#[async_trait]
pub trait DbConnection: DbExecutor {
    /// Appends a join set response to `execution_id`. Only available with the
    /// `test` feature.
    #[cfg(feature = "test")]
    async fn append_response(
        &self,
        created_at: DateTime<Utc>,
        execution_id: ExecutionId,
        response_event: JoinSetResponseEvent,
    ) -> Result<(), DbErrorWrite>;

    /// Appends the outcome of a delay to the join set of `execution_id`.
    ///
    /// `cancel_delay` passes `Err(())` as `outcome` to cancel a delay.
    async fn append_delay_response(
        &self,
        created_at: DateTime<Utc>,
        execution_id: ExecutionId,
        join_set_id: JoinSetId,
        delay_id: DelayId,
        outcome: Result<(), ()>,
    ) -> Result<AppendDelayResponseOutcome, DbErrorWrite>;

    /// Appends a batch of events to `execution_id` at `version`.
    async fn append_batch(
        &self,
        current_time: DateTime<Utc>,
        batch: Vec<AppendRequest>,
        execution_id: ExecutionId,
        version: Version,
    ) -> Result<AppendBatchResponse, DbErrorWrite>;

    /// Appends a batch of events to `execution_id` and creates the executions
    /// described by `child_req`.
    // NOTE(review): presumably atomic — confirm.
    async fn append_batch_create_new_execution(
        &self,
        current_time: DateTime<Utc>,
        batch: Vec<AppendRequest>,
        execution_id: ExecutionId,
        version: Version,
        child_req: Vec<CreateRequest>,
    ) -> Result<AppendBatchResponse, DbErrorWrite>;

    /// Loads the whole [`ExecutionLog`]. Only available with the `test` feature.
    #[cfg(feature = "test")]
    async fn get(&self, execution_id: &ExecutionId) -> Result<ExecutionLog, DbErrorRead>;

    /// Lists events of `execution_id`.
    // NOTE(review): presumably starting at version `since`, returning at most
    // `max_length` events — confirm inclusivity against the implementations.
    async fn list_execution_events(
        &self,
        execution_id: &ExecutionId,
        since: &Version,
        max_length: VersionType,
        include_backtrace_id: bool,
    ) -> Result<Vec<ExecutionEvent>, DbErrorRead>;

    /// Returns the event of `execution_id` stored at `version`.
    async fn get_execution_event(
        &self,
        execution_id: &ExecutionId,
        version: &Version,
    ) -> Result<ExecutionEvent, DbErrorRead>;

    /// Reconstructs the [`CreateRequest`] of an execution from its event at
    /// version 0.
    ///
    /// # Errors
    /// Propagates read errors. If the event at version 0 is not `Created`,
    /// logs an error and returns `DbErrorGeneric::Uncategorized`.
    async fn get_create_request(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<CreateRequest, DbErrorRead> {
        let execution_event = self
            .get_execution_event(execution_id, &Version::new(0))
            .await?;
        if let ExecutionRequest::Created {
            ffqn,
            params,
            parent,
            scheduled_at,
            component_id,
            metadata,
            scheduled_by,
        } = execution_event.event
        {
            Ok(CreateRequest {
                created_at: execution_event.created_at,
                execution_id: execution_id.clone(),
                ffqn,
                params,
                parent,
                scheduled_at,
                component_id,
                metadata,
                scheduled_by,
            })
        } else {
            error!(%execution_id, "Execution log must start with creation");
            Err(DbErrorRead::Generic(DbErrorGeneric::Uncategorized(
                "execution log must start with creation".into(),
            )))
        }
    }

    /// Returns the pending state of `execution_id`.
    async fn get_pending_state(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<PendingState, DbErrorRead>;

    /// Returns the timers that are expired at `at`.
    async fn get_expired_timers(
        &self,
        at: DateTime<Utc>,
    ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric>;

    /// Creates a new execution from `req`.
    async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite>;

    /// Returns join set responses of `execution_id`.
    // NOTE(review): presumably returns responses from index `start_idx` on,
    // waiting until at least one exists or `timeout_fut` completes — confirm.
    async fn subscribe_to_next_responses(
        &self,
        execution_id: &ExecutionId,
        start_idx: usize,
        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
    ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorReadWithTimeout>;

    /// Waits for `execution_id` to finish and returns its result.
    // NOTE(review): presumably fails with `Timeout` once `timeout_fut`
    // completes and waits indefinitely when it is `None` — confirm.
    async fn wait_for_finished_result(
        &self,
        execution_id: &ExecutionId,
        timeout_fut: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
    ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout>;

    /// Stores a single backtrace.
    async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite>;

    /// Stores several backtraces.
    async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite>;

    /// Returns the backtrace of `execution_id` selected by `filter`.
    async fn get_backtrace(
        &self,
        execution_id: &ExecutionId,
        filter: BacktraceFilter,
    ) -> Result<BacktraceInfo, DbErrorRead>;

    /// Lists executions with their state.
    // NOTE(review): presumably filtered by `ffqn` when given and restricted
    // to executions without a parent when `top_level_only` is set — confirm.
    async fn list_executions(
        &self,
        ffqn: Option<FunctionFqn>,
        top_level_only: bool,
        pagination: ExecutionListPagination,
    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric>;

    /// Lists join set responses of `execution_id`, each with its cursor.
    async fn list_responses(
        &self,
        execution_id: &ExecutionId,
        pagination: Pagination<u32>,
    ) -> Result<Vec<ResponseWithCursor>, DbErrorRead>;

    /// Switches the component digest of `execution_id` from `old` to `new`.
    // NOTE(review): presumably fails when the stored digest differs from `old` — confirm.
    async fn upgrade_execution_component(
        &self,
        execution_id: &ExecutionId,
        old: &InputContentDigest,
        new: &InputContentDigest,
    ) -> Result<(), DbErrorWrite>;
}
969
/// Outcome of a cancellation request (`cancel_activity`, `cancel_delay`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CancelOutcome {
    /// The target is cancelled, either by this request or by an earlier one.
    Cancelled,
    /// The target had already finished and was not cancelled.
    AlreadyFinished,
}
975
976pub async fn stub_execution(
977 db_connection: &dyn DbConnection,
978 execution_id: ExecutionIdDerived,
979 parent_execution_id: ExecutionId,
980 join_set_id: JoinSetId,
981 created_at: DateTime<Utc>,
982 return_value: SupportedFunctionReturnValue,
983) -> Result<(), DbErrorWrite> {
984 let stub_finished_version = Version::new(1); let write_attempt = {
987 let finished_req = AppendRequest {
988 created_at,
989 event: ExecutionRequest::Finished {
990 result: return_value.clone(),
991 http_client_traces: None,
992 },
993 };
994 db_connection
995 .append_batch_respond_to_parent(
996 AppendEventsToExecution {
997 execution_id: ExecutionId::Derived(execution_id.clone()),
998 version: stub_finished_version.clone(),
999 batch: vec![finished_req],
1000 },
1001 AppendResponseToExecution {
1002 parent_execution_id,
1003 created_at,
1004 join_set_id,
1005 child_execution_id: execution_id.clone(),
1006 finished_version: stub_finished_version.clone(),
1007 result: return_value.clone(),
1008 },
1009 created_at,
1010 )
1011 .await
1012 };
1013 if let Err(write_attempt) = write_attempt {
1014 debug!("Stub write attempt failed - {write_attempt:?}");
1016
1017 let found = db_connection
1018 .get_execution_event(&ExecutionId::Derived(execution_id), &stub_finished_version)
1019 .await?; match found.event {
1021 ExecutionRequest::Finished {
1022 result: found_result,
1023 ..
1024 } if return_value == found_result => {
1025 Ok(())
1027 }
1028 ExecutionRequest::Finished { .. } => Err(DbErrorWrite::NonRetriable(
1029 DbErrorWriteNonRetriable::Conflict,
1030 )),
1031 _other => Err(DbErrorWrite::NonRetriable(
1032 DbErrorWriteNonRetriable::IllegalState(
1033 "unexpected execution event at stubbed execution".into(),
1034 ),
1035 )),
1036 }
1037 } else {
1038 Ok(())
1039 }
1040}
1041
1042pub async fn cancel_delay(
1043 db_connection: &dyn DbConnection,
1044 delay_id: DelayId,
1045 created_at: DateTime<Utc>,
1046) -> Result<CancelOutcome, DbErrorWrite> {
1047 let (parent_execution_id, join_set_id) = delay_id.split_to_parts();
1048 db_connection
1049 .append_delay_response(
1050 created_at,
1051 parent_execution_id,
1052 join_set_id,
1053 delay_id,
1054 Err(()), )
1056 .await
1057 .map(|ok| match ok {
1058 AppendDelayResponseOutcome::Success | AppendDelayResponseOutcome::AlreadyCancelled => {
1059 CancelOutcome::Cancelled
1060 }
1061 AppendDelayResponseOutcome::AlreadyFinished => CancelOutcome::AlreadyFinished,
1062 })
1063}
1064
/// Selects which backtrace `DbConnection::get_backtrace` returns.
#[derive(Clone)]
pub enum BacktraceFilter {
    /// The first backtrace.
    First,
    /// The last backtrace.
    Last,
    /// The backtrace identified by the given version.
    Specific(Version),
}
1071
/// A WASM backtrace together with the execution, component and version
/// range it belongs to.
pub struct BacktraceInfo {
    pub execution_id: ExecutionId,
    pub component_id: ComponentId,
    /// Lower bound of the version range (inclusive, per the field name).
    pub version_min_including: Version,
    /// Upper bound of the version range (exclusive, per the field name).
    pub version_max_excluding: Version,
    pub wasm_backtrace: WasmBacktrace,
}
1079
/// Serializable copy of a `wasmtime::WasmBacktrace`; see
/// `WasmBacktrace::maybe_from`.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct WasmBacktrace {
    /// Frames of the backtrace.
    pub frames: Vec<FrameInfo>,
}
1084
/// Serializable copy of a `wasmtime::FrameInfo`.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct FrameInfo {
    /// Module name, or `"<unknown>"` when the module has no name.
    pub module: String,
    /// Demangled function name, or a form derived from the function index.
    pub func_name: String,
    /// Symbols associated with the frame.
    pub symbols: Vec<FrameSymbol>,
}
1091
/// Serializable copy of a `wasmtime::FrameSymbol`; every field is optional.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct FrameSymbol {
    /// Demangled function name.
    pub func_name: Option<String>,
    /// Source file.
    pub file: Option<String>,
    /// Line number.
    pub line: Option<u32>,
    /// Column number.
    pub col: Option<u32>,
}
1099
1100mod wasm_backtrace {
1101 use super::{FrameInfo, FrameSymbol, WasmBacktrace};
1102
1103 impl WasmBacktrace {
1104 pub fn maybe_from(backtrace: &wasmtime::WasmBacktrace) -> Option<Self> {
1105 if backtrace.frames().is_empty() {
1106 None
1107 } else {
1108 Some(Self {
1109 frames: backtrace.frames().iter().map(FrameInfo::from).collect(),
1110 })
1111 }
1112 }
1113 }
1114
1115 impl From<&wasmtime::FrameInfo> for FrameInfo {
1116 fn from(frame: &wasmtime::FrameInfo) -> Self {
1117 let module_name = frame.module().name().unwrap_or("<unknown>").to_string();
1118 let mut func_name = String::new();
1119 wasmtime_environ::demangle_function_name_or_index(
1120 &mut func_name,
1121 frame.func_name(),
1122 frame.func_index() as usize,
1123 )
1124 .expect("writing to string must succeed");
1125 Self {
1126 module: module_name,
1127 func_name,
1128 symbols: frame
1129 .symbols()
1130 .iter()
1131 .map(std::convert::Into::into)
1132 .collect(),
1133 }
1134 }
1135 }
1136
1137 impl From<&wasmtime::FrameSymbol> for FrameSymbol {
1138 fn from(symbol: &wasmtime::FrameSymbol) -> Self {
1139 let func_name = symbol.name().map(|name| {
1140 let mut writer = String::new();
1141 wasmtime_environ::demangle_function_name(&mut writer, name)
1142 .expect("writing to string must succeed");
1143 writer
1144 });
1145
1146 Self {
1147 func_name,
1148 file: symbol.file().map(ToString::to_string),
1149 line: symbol.line(),
1150 col: symbol.column(),
1151 }
1152 }
1153 }
1154}
1155
/// Integer type of the cursor attached to listed responses.
pub type ResponseCursorType = u32;
/// A join set response paired with its cursor, as returned by
/// `DbConnection::list_responses`.
#[derive(Debug, Clone, Serialize)]
pub struct ResponseWithCursor {
    /// The response.
    pub event: JoinSetResponseEventOuter,
    /// Cursor of the response.
    pub cursor: ResponseCursorType,
}
1163
/// Summary of an execution as returned by `DbConnection::list_executions`.
#[derive(Debug)]
pub struct ExecutionWithState {
    pub execution_id: ExecutionId,
    pub ffqn: FunctionFqn,
    pub pending_state: PendingState,
    pub created_at: DateTime<Utc>,
    // NOTE(review): presumably the `scheduled_at` of the `Created` event — confirm.
    pub first_scheduled_at: DateTime<Utc>,
    pub component_digest: InputContentDigest,
}
1173
/// Pagination of execution listings, keyed either by a timestamp or by an
/// execution id.
///
/// The default is `CreatedBy`, older-than, length 20, no cursor.
#[derive(Debug, Clone)]
pub enum ExecutionListPagination {
    /// Paginate with an optional timestamp cursor.
    CreatedBy(Pagination<Option<DateTime<Utc>>>),
    /// Paginate with an optional execution id cursor.
    ExecutionId(Pagination<Option<ExecutionId>>),
}
1179impl Default for ExecutionListPagination {
1180 fn default() -> ExecutionListPagination {
1181 ExecutionListPagination::CreatedBy(Pagination::OlderThan {
1182 length: 20,
1183 cursor: None,
1184 including_cursor: false, })
1186 }
1187}
1188impl ExecutionListPagination {
1189 #[must_use]
1190 pub fn length(&self) -> u8 {
1191 match self {
1192 ExecutionListPagination::CreatedBy(pagination) => pagination.length(),
1193 ExecutionListPagination::ExecutionId(pagination) => pagination.length(),
1194 }
1195 }
1196}
1197
/// Cursor-based pagination request, generic over the cursor type `T`.
///
/// The variant selects the direction relative to the cursor; both variants carry the same fields.
#[derive(Debug, Clone, Copy)]
pub enum Pagination<T> {
    /// Request items on the "greater than" side of the cursor (see [`Pagination::rel`]).
    NewerThan {
        /// Requested page length.
        length: u8,
        /// Position the page is relative to.
        cursor: T,
        /// Whether the item at the cursor itself belongs to the page.
        including_cursor: bool,
    },
    /// Request items on the "less than" side of the cursor (see [`Pagination::rel`]).
    OlderThan {
        /// Requested page length.
        length: u8,
        /// Position the page is relative to.
        cursor: T,
        /// Whether the item at the cursor itself belongs to the page.
        including_cursor: bool,
    },
}

impl<T> Pagination<T> {
    /// Returns the requested page length of either variant.
    #[must_use]
    pub fn length(&self) -> u8 {
        match self {
            Self::NewerThan { length, .. } | Self::OlderThan { length, .. } => *length,
        }
    }

    /// Returns the comparison operator matching the direction and cursor inclusiveness:
    /// `>` / `>=` for [`Pagination::NewerThan`], `<` / `<=` for [`Pagination::OlderThan`].
    ///
    /// NOTE(review): presumably spliced into a query comparing a column with the cursor - confirm
    /// against the callers.
    #[must_use]
    pub fn rel(&self) -> &'static str {
        match self {
            Self::NewerThan {
                including_cursor: false,
                ..
            } => ">",
            Self::NewerThan {
                including_cursor: true,
                ..
            } => ">=",
            Self::OlderThan {
                including_cursor: false,
                ..
            } => "<",
            Self::OlderThan {
                including_cursor: true,
                ..
            } => "<=",
        }
    }

    /// Returns `true` only for [`Pagination::OlderThan`].
    ///
    /// NOTE(review): the name suggests callers use this to pick a descending sort order - confirm.
    #[must_use]
    pub fn is_desc(&self) -> bool {
        matches!(self, Self::OlderThan { .. })
    }
}
1241
/// Polls the execution log of `execution_id` until `predicate` yields a value.
///
/// The log is fetched through `db_connection.get`, handed to `predicate`, and when the predicate
/// returns `None` the loop sleeps for 10 ms before trying again.
///
/// # Errors
/// * Any error returned by `db_connection.get`, converted into `DbErrorReadWithTimeout`.
/// * `DbErrorReadWithTimeout::Timeout` when `timeout` is `Some` and elapses before the predicate
///   matches. With `None` the function polls without a time limit.
#[cfg(feature = "test")]
pub async fn wait_for_pending_state_fn<T: Debug>(
    db_connection: &dyn DbConnection,
    execution_id: &ExecutionId,
    predicate: impl Fn(ExecutionLog) -> Option<T> + Send,
    timeout: Option<Duration>,
) -> Result<T, DbErrorReadWithTimeout> {
    tracing::trace!(%execution_id, "Waiting for predicate");
    let poll_until_match = async move {
        loop {
            let execution_log = db_connection.get(execution_id).await?;
            match predicate(execution_log) {
                Some(t) => {
                    tracing::debug!(%execution_id, "Found: {t:?}");
                    return Ok(t);
                }
                None => tokio::time::sleep(Duration::from_millis(10)).await,
            }
        }
    };

    // Without a timeout there is nothing to race against.
    let Some(timeout) = timeout else {
        return poll_until_match.await;
    };
    tokio::select! {
        res = poll_until_match => res,
        () = tokio::time::sleep(timeout) => Err(DbErrorReadWithTimeout::Timeout),
    }
}
1270
/// A timer that has expired: either an execution lock or a delay.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExpiredTimer {
    /// An execution lock that has expired.
    Lock(ExpiredLock),
    /// A delay that has expired.
    Delay(ExpiredDelay),
}
1276
/// Details of an expired execution lock, see [`ExpiredTimer::Lock`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExpiredLock {
    /// Execution whose lock expired.
    pub execution_id: ExecutionId,
    /// NOTE(review): presumably the version at which the expired lock was appended - confirm.
    pub locked_at_version: Version,
    /// NOTE(review): presumably the version to use for the next appended event - confirm.
    pub next_version: Version,
    /// NOTE(review): presumably the number of temporary failures recorded so far, matching the
    /// `temporary_event_count` argument of `ExecutionLog::can_be_retried_after` - confirm.
    pub intermittent_event_count: u32,
    /// NOTE(review): presumably the retry limit passed to
    /// `ExecutionLog::can_be_retried_after` - confirm.
    pub max_retries: u32,
    /// NOTE(review): presumably the base backoff passed to
    /// `ExecutionLog::can_be_retried_after` - confirm.
    pub retry_exp_backoff: Duration,
    /// Executor and run that held the expired lock.
    pub locked_by: LockedBy,
}
1289
/// Details of an expired delay, see [`ExpiredTimer::Delay`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExpiredDelay {
    /// Execution the delay belongs to.
    pub execution_id: ExecutionId,
    /// Join set the delay belongs to.
    pub join_set_id: JoinSetId,
    /// Identifier of the delay itself.
    pub delay_id: DelayId,
}
1296
/// Current scheduling state of an execution.
///
/// Serialized with an internal `status` tag naming the variant. Every variant carries a
/// `component_id_input_digest`, readable through
/// [`PendingState::get_component_id_input_digest`].
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize)]
#[serde(tag = "status")]
pub enum PendingState {
    /// The execution is locked by an executor run.
    Locked(PendingStateLocked),
    /// The execution is waiting to be locked at or after `scheduled_at`.
    #[display("PendingAt(`{scheduled_at}`)")]
    PendingAt {
        /// Time from which a new lock may be created (see [`PendingState::can_append_lock`]).
        scheduled_at: DateTime<Utc>,
        /// Previous lock holder, if any. `can_append_lock` lets this holder extend its lock even
        /// before `scheduled_at`.
        last_lock: Option<LockedBy>,
        /// Input content digest associated with this state.
        component_id_input_digest: InputContentDigest,
    },
    /// The execution is blocked by a join set; `can_append_lock` rejects locking in this state.
    #[display("BlockedByJoinSet({join_set_id},`{lock_expires_at}`)")]
    BlockedByJoinSet {
        /// Join set the execution is blocked by.
        join_set_id: JoinSetId,
        /// NOTE(review): presumably the expiry of the lock held when the execution became
        /// blocked - confirm.
        lock_expires_at: DateTime<Utc>,
        /// NOTE(review): presumably set when the block comes from closing the join set - confirm.
        closing: bool,
        /// Input content digest associated with this state.
        component_id_input_digest: InputContentDigest,
    },
    /// The execution has finished; no further lock can be appended.
    #[display("Finished({finished})")]
    Finished {
        /// Details of the finished execution, flattened into this variant when serialized.
        #[serde(flatten)]
        finished: PendingStateFinished,
        /// Input content digest associated with this state.
        component_id_input_digest: InputContentDigest,
    },
}
1324impl PendingState {
1325 #[must_use]
1326 pub fn get_component_id_input_digest(&self) -> &InputContentDigest {
1327 match self {
1328 PendingState::Locked(pending_state_locked) => {
1329 &pending_state_locked.component_id_input_digest
1330 }
1331 PendingState::PendingAt {
1332 component_id_input_digest,
1333 ..
1334 }
1335 | PendingState::BlockedByJoinSet {
1336 component_id_input_digest,
1337 ..
1338 }
1339 | PendingState::Finished {
1340 component_id_input_digest,
1341 ..
1342 } => component_id_input_digest,
1343 }
1344 }
1345}
1346
/// Payload of [`PendingState::Locked`]: who holds the lock and until when.
///
/// Displayed as ``Locked(`<lock_expires_at>`, <executor_id>, <run_id>)``.
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize)]
#[display("Locked(`{lock_expires_at}`, {}, {})", locked_by.executor_id, locked_by.run_id)]
pub struct PendingStateLocked {
    /// Executor and run holding the lock.
    pub locked_by: LockedBy,
    /// Time at which the lock expires.
    pub lock_expires_at: DateTime<Utc>,
    /// Input content digest associated with this state.
    pub component_id_input_digest: InputContentDigest,
}
1354
/// Identifies a lock holder as the pair of an executor and one of its runs.
///
/// `PendingState::can_append_lock` treats a request as coming from the same holder only when
/// both ids match.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LockedBy {
    /// Executor holding the lock.
    pub executor_id: ExecutorId,
    /// Run of that executor.
    pub run_id: RunId,
}
1360
/// Summary of a finished execution, stored in [`PendingState::Finished`].
///
/// Its `Display` implementation prints `ok` or the error description from `result_kind`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct PendingStateFinished {
    /// NOTE(review): presumably the version of the event that finished the execution - confirm.
    pub version: VersionType,
    /// Time at which the execution finished.
    pub finished_at: DateTime<Utc>,
    /// Whether the execution finished successfully or with an error.
    pub result_kind: PendingStateFinishedResultKind,
}
1367impl Display for PendingStateFinished {
1368 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1369 match self.result_kind {
1370 PendingStateFinishedResultKind::Ok => write!(f, "ok"),
1371 PendingStateFinishedResultKind::Err(err) => write!(f, "{err}"),
1372 }
1373 }
1374}
1375
/// Coarse outcome of a finished execution.
///
/// Variant names are serialized in `snake_case` (`ok`, `err`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PendingStateFinishedResultKind {
    /// The execution finished successfully.
    Ok,
    /// The execution finished with the given error.
    Err(PendingStateFinishedError),
}
1383impl PendingStateFinishedResultKind {
1384 pub fn as_result(&self) -> Result<(), &PendingStateFinishedError> {
1385 match self {
1386 PendingStateFinishedResultKind::Ok => Ok(()),
1387 PendingStateFinishedResultKind::Err(err) => Err(err),
1388 }
1389 }
1390}
1391
1392impl From<&SupportedFunctionReturnValue> for PendingStateFinishedResultKind {
1393 fn from(result: &SupportedFunctionReturnValue) -> Self {
1394 result.as_pending_state_finished_result()
1395 }
1396}
1397
/// Reason a finished execution counts as failed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, derive_more::Display)]
pub enum PendingStateFinishedError {
    /// The execution failed with the given kind; displayed as `execution terminated: <kind>`.
    #[display("execution terminated: {_0}")]
    ExecutionFailure(ExecutionFailureKind),
    /// Displayed as `execution completed with an error`.
    ///
    /// NOTE(review): presumably the function ran to completion but returned its error
    /// variant - confirm.
    #[display("execution completed with an error")]
    FallibleError,
}
1405
1406impl PendingState {
1407 pub fn can_append_lock(
1408 &self,
1409 created_at: DateTime<Utc>,
1410 executor_id: ExecutorId,
1411 run_id: RunId,
1412 lock_expires_at: DateTime<Utc>,
1413 ) -> Result<LockKind, DbErrorWriteNonRetriable> {
1414 if lock_expires_at <= created_at {
1415 return Err(DbErrorWriteNonRetriable::ValidationFailed(
1416 "invalid expiry date".into(),
1417 ));
1418 }
1419 match self {
1420 PendingState::PendingAt {
1421 scheduled_at,
1422 last_lock,
1423 component_id_input_digest: _,
1424 } => {
1425 if *scheduled_at <= created_at {
1426 Ok(LockKind::CreatingNewLock)
1428 } else if let Some(LockedBy {
1429 executor_id: last_executor_id,
1430 run_id: last_run_id,
1431 }) = last_lock
1432 && executor_id == *last_executor_id
1433 && run_id == *last_run_id
1434 {
1435 Ok(LockKind::Extending)
1437 } else {
1438 Err(DbErrorWriteNonRetriable::ValidationFailed(
1439 "cannot lock, not yet pending".into(),
1440 ))
1441 }
1442 }
1443 PendingState::Locked(PendingStateLocked {
1444 locked_by:
1445 LockedBy {
1446 executor_id: current_pending_state_executor_id,
1447 run_id: current_pending_state_run_id,
1448 },
1449 lock_expires_at: _,
1450 component_id_input_digest: _,
1451 }) => {
1452 if executor_id == *current_pending_state_executor_id
1453 && run_id == *current_pending_state_run_id
1454 {
1455 Ok(LockKind::Extending)
1457 } else {
1458 Err(DbErrorWriteNonRetriable::IllegalState(
1459 "cannot lock, already locked".into(),
1460 ))
1461 }
1462 }
1463 PendingState::BlockedByJoinSet { .. } => Err(DbErrorWriteNonRetriable::IllegalState(
1464 "cannot append Locked event when in BlockedByJoinSet state".into(),
1465 )),
1466 PendingState::Finished { .. } => Err(DbErrorWriteNonRetriable::IllegalState(
1467 "already finished".into(),
1468 )),
1469 }
1470 }
1471
1472 #[must_use]
1473 pub fn is_finished(&self) -> bool {
1474 matches!(self, PendingState::Finished { .. })
1475 }
1476}
1477
/// Successful outcome of `PendingState::can_append_lock`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LockKind {
    /// The requester already holds the lock, or was the last holder of a not-yet-due pending
    /// execution, and is extending it.
    Extending,
    /// The execution was pending and due, so a new lock is being created.
    CreatingNewLock,
}
1483
/// Serializable records describing an HTTP request and its outcome.
pub mod http_client_trace {
    use chrono::{DateTime, Utc};
    use serde::{Deserialize, Serialize};

    /// One traced HTTP call: the request and, when available, its response.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    pub struct HttpClientTrace {
        /// The request that was sent.
        pub req: RequestTrace,
        /// The response part of the trace.
        /// NOTE(review): `None` presumably means no outcome was recorded - confirm.
        pub resp: Option<ResponseTrace>,
    }

    /// Request part of an [`HttpClientTrace`].
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    pub struct RequestTrace {
        /// Time the request was sent.
        pub sent_at: DateTime<Utc>,
        /// Target URI, kept as a plain string.
        pub uri: String,
        /// HTTP method, kept as a plain string.
        pub method: String,
    }

    /// Response part of an [`HttpClientTrace`].
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    pub struct ResponseTrace {
        /// Time the request finished.
        pub finished_at: DateTime<Utc>,
        /// NOTE(review): presumably `Ok` holds the HTTP status code and `Err` a description of
        /// why no status was obtained - confirm.
        pub status: Result<u16, String>,
    }
}
1507
// Unit tests: serde round trips of the finished-state types and boundary checks of
// `HistoryEventScheduleAt::as_date_time`.
#[cfg(test)]
mod tests {
    use super::HistoryEventScheduleAt;
    use super::PendingStateFinished;
    use super::PendingStateFinishedError;
    use super::PendingStateFinishedResultKind;
    use crate::ExecutionFailureKind;
    use crate::SupportedFunctionReturnValue;
    use chrono::DateTime;
    use chrono::Datelike;
    use chrono::Utc;
    use insta::assert_snapshot;
    use rstest::rstest;
    use std::time::Duration;
    use val_json::type_wrapper::TypeWrapper;
    use val_json::wast_val::WastVal;
    use val_json::wast_val::WastValWithType;

    // Both the unit-like `Ok` variant and an `Err` carrying a payload must survive a JSON round trip.
    #[rstest(expected => [
        PendingStateFinishedResultKind::Ok,
        PendingStateFinishedResultKind::Err(PendingStateFinishedError::ExecutionFailure(ExecutionFailureKind::TimedOut)),
    ])]
    #[test]
    fn serde_pending_state_finished_result_kind_should_work(
        expected: PendingStateFinishedResultKind,
    ) {
        let ser = serde_json::to_string(&expected).unwrap();
        let actual: PendingStateFinishedResultKind = serde_json::from_str(&ser).unwrap();
        assert_eq!(expected, actual);
    }

    // Same round trip for the enclosing struct, for each result kind.
    #[rstest(result_kind => [
        PendingStateFinishedResultKind::Ok,
        PendingStateFinishedResultKind::Err(PendingStateFinishedError::ExecutionFailure(ExecutionFailureKind::TimedOut)),
    ])]
    #[test]
    fn serde_pending_state_finished_should_work(result_kind: PendingStateFinishedResultKind) {
        let expected = PendingStateFinished {
            version: 0,
            finished_at: Utc::now(),
            result_kind,
        };

        let ser = serde_json::to_string(&expected).unwrap();
        let actual: PendingStateFinished = serde_json::from_str(&ser).unwrap();
        assert_eq!(expected, actual);
    }

    // An `Ok` result wrapping `Option::None` must serialize to the recorded snapshot and
    // deserialize back to an equal value.
    #[test]
    fn join_set_deser_with_result_ok_option_none_should_work() {
        let expected = SupportedFunctionReturnValue::Ok {
            ok: Some(WastValWithType {
                r#type: TypeWrapper::Result {
                    ok: Some(Box::new(TypeWrapper::Option(Box::new(TypeWrapper::String)))),
                    err: Some(Box::new(TypeWrapper::String)),
                },
                value: WastVal::Result(Ok(Some(Box::new(WastVal::Option(None))))),
            }),
        };
        let json = serde_json::to_string(&expected).unwrap();
        assert_snapshot!(json);

        let actual: SupportedFunctionReturnValue = serde_json::from_str(&json).unwrap();

        assert_eq!(expected, actual);
    }

    // `u32::MAX` seconds after the Unix epoch must resolve to a date in the year 2106.
    #[test]
    fn as_date_time_should_work_with_duration_u32_max_secs() {
        let duration = Duration::from_secs(u64::from(u32::MAX));
        let schedule_at = HistoryEventScheduleAt::In(duration);
        let resolved = schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap();
        assert_eq!(2106, resolved.year());
    }

    // Largest whole number of seconds whose millisecond count still fits into an `i64`.
    // NOTE(review): presumably mirrors the upper bound of `chrono::TimeDelta` - confirm.
    const MILLIS_PER_SEC: i64 = 1000;
    const TIMEDELTA_MAX_SECS: i64 = i64::MAX / MILLIS_PER_SEC;

    // One second past `TIMEDELTA_MAX_SECS` must be rejected instead of wrapping or panicking.
    #[test]
    fn as_date_time_should_fail_on_duration_secs_greater_than_i64_max() {
        let duration = Duration::from_secs(
            u64::try_from(TIMEDELTA_MAX_SECS).expect("positive number must not fail") + 1,
        );
        let schedule_at = HistoryEventScheduleAt::In(duration);
        schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap_err();
    }
}