1use crate::ComponentId;
2use crate::ComponentRetryConfig;
3use crate::ComponentType;
4use crate::ExecutionFailureKind;
5use crate::ExecutionId;
6use crate::ExecutionMetadata;
7use crate::FinishedExecutionError;
8use crate::FunctionFqn;
9use crate::JoinSetId;
10use crate::Params;
11use crate::StrVariant;
12use crate::SupportedFunctionReturnValue;
13use crate::component_id::InputContentDigest;
14use crate::prefixed_ulid::DelayId;
15use crate::prefixed_ulid::DeploymentId;
16use crate::prefixed_ulid::ExecutionIdDerived;
17use crate::prefixed_ulid::ExecutorId;
18use crate::prefixed_ulid::RunId;
19use assert_matches::assert_matches;
20use async_trait::async_trait;
21use chrono::TimeDelta;
22use chrono::{DateTime, Utc};
23use http_client_trace::HttpClientTrace;
24use serde::Deserialize;
25use serde::Serialize;
26use std::fmt::Debug;
27use std::fmt::Display;
28use std::panic::Location;
29use std::pin::Pin;
30use std::sync::Arc;
31use std::time::Duration;
32use tracing::debug;
33use tracing::instrument;
34use tracing_error::SpanTrace;
35
36pub const STATE_PENDING_AT: &str = "pending_at";
38pub const STATE_BLOCKED_BY_JOIN_SET: &str = "blocked_by_join_set";
39pub const STATE_LOCKED: &str = "locked";
40pub const STATE_FINISHED: &str = "finished";
41pub const HISTORY_EVENT_TYPE_JOIN_NEXT: &str = "join_next"; #[derive(Debug, PartialEq, Eq, Clone)]
44pub struct ExecutionLog {
45 pub execution_id: ExecutionId,
46 pub events: Vec<ExecutionEvent>,
47 pub responses: Vec<ResponseWithCursor>,
48 pub next_version: Version, pub pending_state: PendingState, pub component_digest: InputContentDigest, pub component_type: ComponentType,
52 pub deployment_id: DeploymentId, }
54
55impl ExecutionLog {
56 #[must_use]
59 pub fn can_be_retried_after(
60 temporary_event_count: u32,
61 max_retries: Option<u32>,
62 retry_exp_backoff: Duration,
63 ) -> Option<Duration> {
64 if temporary_event_count <= max_retries.unwrap_or(u32::MAX) {
66 let duration = retry_exp_backoff * 2_u32.saturating_pow(temporary_event_count - 1);
68 Some(duration)
69 } else {
70 None
71 }
72 }
73
74 #[must_use]
75 pub fn compute_retry_duration_when_retrying_forever(
76 temporary_event_count: u32,
77 retry_exp_backoff: Duration,
78 ) -> Duration {
79 Self::can_be_retried_after(temporary_event_count, None, retry_exp_backoff)
80 .expect("`max_retries` set to MAX must never return None")
81 }
82
83 #[must_use]
84 pub fn ffqn(&self) -> &FunctionFqn {
85 assert_matches!(self.events.first(), Some(ExecutionEvent {
86 event: ExecutionRequest::Created { ffqn, .. },
87 ..
88 }) => ffqn)
89 }
90
91 #[must_use]
92 pub fn params(&self) -> &Params {
93 assert_matches!(self.events.first(), Some(ExecutionEvent {
94 event: ExecutionRequest::Created { params, .. },
95 ..
96 }) => params)
97 }
98
99 #[must_use]
100 pub fn parent(&self) -> Option<(ExecutionId, JoinSetId)> {
101 assert_matches!(self.events.first(), Some(ExecutionEvent {
102 event: ExecutionRequest::Created { parent, .. },
103 ..
104 }) => parent.clone())
105 }
106
107 #[must_use]
108 pub fn last_event(&self) -> &ExecutionEvent {
109 self.events.last().expect("must contain at least one event")
110 }
111
112 #[must_use]
113 pub fn is_finished(&self) -> bool {
114 matches!(
115 self.events.last(),
116 Some(ExecutionEvent {
117 event: ExecutionRequest::Finished { .. },
118 ..
119 })
120 )
121 }
122
123 #[must_use]
124 pub fn as_finished_result(&self) -> Option<SupportedFunctionReturnValue> {
125 if let ExecutionEvent {
126 event: ExecutionRequest::Finished { result, .. },
127 ..
128 } = self.events.last().expect("must contain at least one event")
129 {
130 Some(result.clone())
131 } else {
132 None
133 }
134 }
135
136 pub fn event_history(&self) -> impl Iterator<Item = (HistoryEvent, Version)> + '_ {
137 self.events.iter().filter_map(|event| {
138 if let ExecutionRequest::HistoryEvent { event: eh, .. } = &event.event {
139 Some((eh.clone(), event.version.clone()))
140 } else {
141 None
142 }
143 })
144 }
145
146 #[cfg(feature = "test")]
147 #[must_use]
148 pub fn find_join_set_request(&self, join_set_id: &JoinSetId) -> Option<&JoinSetRequest> {
149 self.events
150 .iter()
151 .find_map(move |event| match &event.event {
152 ExecutionRequest::HistoryEvent {
153 event:
154 HistoryEvent::JoinSetRequest {
155 join_set_id: found,
156 request,
157 },
158 ..
159 } if *join_set_id == *found => Some(request),
160 _ => None,
161 })
162 }
163}
164
165pub type VersionType = u32;
166#[derive(
167 Debug,
168 Default,
169 Clone,
170 PartialEq,
171 Eq,
172 Hash,
173 derive_more::Display,
174 derive_more::Into,
175 serde::Serialize,
176 serde::Deserialize,
177)]
178#[serde(transparent)]
179pub struct Version(pub VersionType);
180impl Version {
181 #[must_use]
182 pub fn new(arg: VersionType) -> Version {
183 Version(arg)
184 }
185
186 #[must_use]
187 pub fn increment(&self) -> Version {
188 Version(self.0 + 1)
189 }
190}
191impl TryFrom<i64> for Version {
192 type Error = VersionParseError;
193 fn try_from(value: i64) -> Result<Self, Self::Error> {
194 VersionType::try_from(value)
195 .map(Version::new)
196 .map_err(|_| VersionParseError)
197 }
198}
199impl From<Version> for usize {
200 fn from(value: Version) -> Self {
201 usize::try_from(value.0).expect("16 bit systems are unsupported")
202 }
203}
204impl From<&Version> for usize {
205 fn from(value: &Version) -> Self {
206 usize::try_from(value.0).expect("16 bit systems are unsupported")
207 }
208}
209
210#[derive(Debug, thiserror::Error)]
211#[error("version must be u32")]
212pub struct VersionParseError;
213
214#[derive(
215 Clone, Debug, derive_more::Display, PartialEq, Eq, serde::Serialize, serde::Deserialize,
216)]
217#[display("{event}")]
218pub struct ExecutionEvent {
219 pub created_at: DateTime<Utc>,
220 pub event: ExecutionRequest,
221 #[serde(skip_serializing_if = "Option::is_none")]
222 pub backtrace_id: Option<Version>,
223 pub version: Version,
224}
225
226#[derive(
227 Debug,
228 Clone,
229 Copy,
230 PartialEq,
231 Eq,
232 derive_more::Display,
233 derive_more::Into,
234 Serialize, )]
236pub struct ResponseCursor(pub u32);
237
238#[derive(Debug, Clone, PartialEq, Eq, Serialize )]
239pub struct ResponseWithCursor {
240 pub event: JoinSetResponseEventOuter,
241 pub cursor: ResponseCursor,
242}
243
244#[derive(Debug)]
245pub struct ListExecutionEventsResponse {
246 pub events: Vec<ExecutionEvent>,
247 pub max_version: Version,
248}
249
250#[derive(Debug)]
251pub struct ListResponsesResponse {
252 pub responses: Vec<ResponseWithCursor>,
253 pub max_cursor: ResponseCursor,
254}
255
256#[derive(Debug, Clone, PartialEq, Eq, Serialize )]
257pub struct JoinSetResponseEventOuter {
258 pub created_at: DateTime<Utc>,
259 pub event: JoinSetResponseEvent,
260}
261
262#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
263pub struct JoinSetResponseEvent {
264 pub join_set_id: JoinSetId,
265 pub event: JoinSetResponse,
266}
267
268#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, derive_more::Display)]
269#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
270#[serde(tag = "type", rename_all = "snake_case")]
271pub enum JoinSetResponse {
272 #[display("delay {}: {delay_id}", if result.is_ok() { "finished" } else { "cancelled"})]
273 DelayFinished {
274 delay_id: DelayId,
275 result: Result<(), ()>,
276 },
277 #[display("{result}: {child_execution_id}")] ChildExecutionFinished {
279 child_execution_id: ExecutionIdDerived,
280 #[cfg_attr(any(test, feature = "test"), arbitrary(value = Version(2)))]
281 finished_version: Version,
282 #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
283 result: SupportedFunctionReturnValue,
284 },
285}
286
287pub const DUMMY_CREATED: ExecutionRequest = ExecutionRequest::Created {
288 ffqn: FunctionFqn::new_static("", ""),
289 params: Params::empty(),
290 parent: None,
291 scheduled_at: DateTime::from_timestamp_nanos(0),
292 component_id: ComponentId::dummy_activity(),
293 deployment_id: DeploymentId::from_parts(0, 0),
294 metadata: ExecutionMetadata::empty(),
295 scheduled_by: None,
296};
297pub const DUMMY_HISTORY_EVENT: ExecutionRequest = ExecutionRequest::HistoryEvent {
298 event: HistoryEvent::JoinSetCreate {
299 join_set_id: JoinSetId {
300 kind: crate::JoinSetKind::OneOff,
301 name: StrVariant::empty(),
302 },
303 },
304};
305
306#[derive(
307 Clone, derive_more::Debug, derive_more::Display, PartialEq, Eq, Serialize, Deserialize,
308)]
309#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
310#[serde(rename_all = "snake_case")]
311pub enum ExecutionRequest {
312 #[display("Created({ffqn}, `{scheduled_at}`)")]
313 Created {
314 ffqn: FunctionFqn,
315 #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
316 #[debug(skip)]
317 params: Params,
318 parent: Option<(ExecutionId, JoinSetId)>,
319 scheduled_at: DateTime<Utc>,
320 #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
321 component_id: ComponentId,
322 deployment_id: DeploymentId,
323 #[cfg_attr(any(test, feature = "test"), arbitrary(default))]
324 metadata: ExecutionMetadata,
325 scheduled_by: Option<ExecutionId>,
326 },
327 Locked(Locked),
328 #[display("Unlocked(`{backoff_expires_at}`)")]
333 Unlocked {
334 backoff_expires_at: DateTime<Utc>,
335 #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
336 reason: StrVariant,
337 },
338 #[display("TemporarilyFailed(`{backoff_expires_at}`)")]
341 TemporarilyFailed {
342 backoff_expires_at: DateTime<Utc>,
343 #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
344 reason: StrVariant,
345 detail: Option<String>,
346 #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
347 http_client_traces: Option<Vec<HttpClientTrace>>,
348 },
349 #[display("TemporarilyTimedOut(`{backoff_expires_at}`)")]
352 TemporarilyTimedOut {
353 backoff_expires_at: DateTime<Utc>,
354 #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
355 http_client_traces: Option<Vec<HttpClientTrace>>,
356 },
357 #[display("Finished")]
359 Finished {
360 #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
361 result: SupportedFunctionReturnValue,
362 #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
363 http_client_traces: Option<Vec<HttpClientTrace>>,
364 },
365
366 #[display("HistoryEvent({event})")]
367 HistoryEvent {
368 event: HistoryEvent,
369 },
370 #[display("Paused")]
371 Paused,
372 #[display("Unpaused")]
373 Unpaused,
374}
375
376impl ExecutionRequest {
377 #[must_use]
378 pub fn is_temporary_event(&self) -> bool {
379 matches!(
380 self,
381 Self::TemporarilyFailed { .. } | Self::TemporarilyTimedOut { .. }
382 )
383 }
384
385 #[must_use]
387 pub const fn variant(&self) -> &'static str {
388 match self {
389 ExecutionRequest::Created { .. } => "created",
390 ExecutionRequest::Locked(_) => "locked",
391 ExecutionRequest::Unlocked { .. } => "unlocked",
392 ExecutionRequest::TemporarilyFailed { .. } => "temporarily_failed",
393 ExecutionRequest::TemporarilyTimedOut { .. } => "temporarily_timed_out",
394 ExecutionRequest::Finished { .. } => "finished",
395 ExecutionRequest::HistoryEvent { .. } => "history_event",
396 ExecutionRequest::Paused => "paused",
397 ExecutionRequest::Unpaused => "unpaused",
398 }
399 }
400
401 #[must_use]
402 pub fn join_set_id(&self) -> Option<&JoinSetId> {
403 match self {
404 Self::Created {
405 parent: Some((_parent_id, join_set_id)),
406 ..
407 } => Some(join_set_id),
408 Self::HistoryEvent {
409 event:
410 HistoryEvent::JoinSetCreate { join_set_id, .. }
411 | HistoryEvent::JoinSetRequest { join_set_id, .. }
412 | HistoryEvent::JoinNext { join_set_id, .. },
413 } => Some(join_set_id),
414 _ => None,
415 }
416 }
417}
418
419#[derive(
420 Clone, derive_more::Debug, derive_more::Display, PartialEq, Eq, Serialize, Deserialize,
421)]
422#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
423#[display("Locked(`{lock_expires_at}`, {component_id})")]
424pub struct Locked {
425 #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
426 pub component_id: ComponentId,
427 pub executor_id: ExecutorId,
428 pub deployment_id: DeploymentId,
429 pub run_id: RunId,
430 pub lock_expires_at: DateTime<Utc>,
431 #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentRetryConfig::ZERO))]
432 pub retry_config: ComponentRetryConfig,
433}
434
435#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
436#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
437#[serde(tag = "type", rename_all = "snake_case")]
438pub enum PersistKind {
439 #[display("RandomU64({min}, {max_inclusive})")]
440 RandomU64 { min: u64, max_inclusive: u64 },
441 #[display("RandomString({min_length}, {max_length_exclusive})")]
442 RandomString {
443 min_length: u64,
444 max_length_exclusive: u64,
445 },
446}
447
448#[must_use]
449pub fn from_u64_to_bytes(value: u64) -> [u8; 8] {
450 value.to_be_bytes()
451}
452
453#[must_use]
454pub fn from_bytes_to_u64(bytes: [u8; 8]) -> u64 {
455 u64::from_be_bytes(bytes)
456}
457
458#[derive(
459 derive_more::Debug, Clone, PartialEq, Eq, derive_more::Display, Serialize, Deserialize,
460)]
461#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
462#[serde(tag = "type", rename_all = "snake_case")]
463pub enum HistoryEvent {
465 #[display("Persist")]
467 Persist {
468 #[debug(skip)]
469 value: Vec<u8>, kind: PersistKind,
471 },
472 #[display("JoinSetCreate({join_set_id})")]
473 JoinSetCreate { join_set_id: JoinSetId },
474 #[display("JoinSetRequest({request})")]
475 JoinSetRequest {
477 join_set_id: JoinSetId,
478 request: JoinSetRequest,
479 },
480 #[display("JoinNext({join_set_id})")]
486 JoinNext {
487 join_set_id: JoinSetId,
488 run_expires_at: DateTime<Utc>,
491 requested_ffqn: Option<FunctionFqn>,
494 closing: bool,
496 },
497 #[display("JoinNextTry({join_set_id})")]
499 JoinNextTry {
500 join_set_id: JoinSetId,
501 found_response: bool, },
503 #[display("JoinNextTooMany({join_set_id})")]
505 JoinNextTooMany {
506 join_set_id: JoinSetId,
507 requested_ffqn: Option<FunctionFqn>,
510 },
511 #[display("Schedule({execution_id}, {schedule_at})")]
512 Schedule {
513 execution_id: ExecutionId,
514 schedule_at: HistoryEventScheduleAt, },
516 #[display("Stub({target_execution_id})")]
517 Stub {
518 target_execution_id: ExecutionIdDerived,
519 #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
520 result: SupportedFunctionReturnValue, persist_result: Result<(), ()>, },
523}
524
525#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
526#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
527#[serde(rename_all = "snake_case")]
528pub enum HistoryEventScheduleAt {
529 Now,
530 #[display("At(`{_0}`)")]
531 At(DateTime<Utc>),
532 #[display("In({_0:?})")]
533 In(Duration),
534}
535
536#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
537pub enum ScheduleAtConversionError {
538 #[error("source duration value is out of range")]
539 OutOfRangeError,
540}
541
542impl HistoryEventScheduleAt {
543 pub fn as_date_time(
544 &self,
545 now: DateTime<Utc>,
546 ) -> Result<DateTime<Utc>, ScheduleAtConversionError> {
547 match self {
548 Self::Now => Ok(now),
549 Self::At(date_time) => Ok(*date_time),
550 Self::In(duration) => {
551 let time_delta = TimeDelta::from_std(*duration)
552 .map_err(|_| ScheduleAtConversionError::OutOfRangeError)?;
553 now.checked_add_signed(time_delta)
554 .ok_or(ScheduleAtConversionError::OutOfRangeError)
555 }
556 }
557 }
558}
559
560#[derive(Clone, Debug, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
561#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
562#[serde(tag = "type", rename_all = "snake_case")]
563pub enum JoinSetRequest {
564 #[display("DelayRequest({delay_id}, expires_at: `{expires_at}`, schedule_at: `{schedule_at}`)")]
566 DelayRequest {
567 delay_id: DelayId,
568 expires_at: DateTime<Utc>,
569 schedule_at: HistoryEventScheduleAt,
570 },
571 #[display("ChildExecutionRequest({child_execution_id}, {target_ffqn}, params: {params})")]
573 ChildExecutionRequest {
574 child_execution_id: ExecutionIdDerived,
575 target_ffqn: FunctionFqn,
576 #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
577 params: Params,
578 },
579}
580
581#[derive(Debug, Clone, thiserror::Error, derive_more::PartialEq, derive_more::Eq)]
583pub enum DbErrorGeneric {
584 #[error("database error: {reason}")]
585 Uncategorized {
586 reason: StrVariant,
587 #[eq(skip)]
588 #[partial_eq(skip)]
589 context: SpanTrace,
590 #[eq(skip)]
591 #[partial_eq(skip)]
592 #[source]
593 source: Option<Arc<dyn std::error::Error + Send + Sync>>,
594 loc: &'static Location<'static>,
595 },
596 #[error("database was closed")]
597 Close,
598}
599
600#[derive(thiserror::Error, Clone, Debug, derive_more::PartialEq, derive_more::Eq)]
601pub enum DbErrorWriteNonRetriable {
602 #[error("validation failed: {0}")]
603 ValidationFailed(StrVariant),
604 #[error("conflict")]
605 Conflict,
606 #[error("illegal state: {reason}")]
607 IllegalState {
608 reason: StrVariant,
609 #[eq(skip)]
610 #[partial_eq(skip)]
611 context: SpanTrace,
612 #[eq(skip)]
613 #[partial_eq(skip)]
614 #[source]
615 source: Option<Arc<dyn std::error::Error + Send + Sync>>,
616 loc: &'static Location<'static>,
617 },
618 #[error("version conflict: expected: {expected}, got: {requested}")]
619 VersionConflict {
620 expected: Version,
621 requested: Version,
622 },
623}
624
625#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
627pub enum DbErrorWrite {
628 #[error("cannot write - row not found")]
629 NotFound,
630 #[error("non-retriable error: {0}")]
631 NonRetriable(#[from] DbErrorWriteNonRetriable),
632 #[error(transparent)]
633 Generic(#[from] DbErrorGeneric),
634}
635
636#[derive(Debug, Clone, thiserror::Error, PartialEq)]
638pub enum DbErrorRead {
639 #[error("cannot read - row not found")]
640 NotFound,
641 #[error(transparent)]
642 Generic(#[from] DbErrorGeneric),
643}
644
645#[derive(Debug, thiserror::Error, PartialEq)]
646pub enum DbErrorReadWithTimeout {
647 #[error("timeout")]
648 Timeout(TimeoutOutcome),
649 #[error(transparent)]
650 DbErrorRead(#[from] DbErrorRead),
651}
652
653pub type AppendResponse = Version;
656pub type PendingExecution = (ExecutionId, Version, Params, Option<DateTime<Utc>>);
657
658#[derive(Debug, Clone)]
659pub struct LockedExecution {
660 pub execution_id: ExecutionId,
661 pub next_version: Version,
662 pub metadata: ExecutionMetadata,
663 pub locked_event: Locked,
664 pub ffqn: FunctionFqn,
665 pub params: Params,
666 pub event_history: Vec<(HistoryEvent, Version)>,
667 pub responses: Vec<ResponseWithCursor>,
668 pub parent: Option<(ExecutionId, JoinSetId)>,
669 pub intermittent_event_count: u32,
670}
671
672pub type LockPendingResponse = Vec<LockedExecution>;
673pub type AppendBatchResponse = Version;
674
675#[derive(Debug, Clone, derive_more::Display, Serialize, Deserialize)]
676#[display("{event}")]
677pub struct AppendRequest {
678 pub created_at: DateTime<Utc>,
679 pub event: ExecutionRequest,
680}
681
682#[derive(Debug, Clone)]
683pub struct CreateRequest {
684 pub created_at: DateTime<Utc>,
685 pub execution_id: ExecutionId,
686 pub ffqn: FunctionFqn,
687 pub params: Params,
688 pub parent: Option<(ExecutionId, JoinSetId)>,
689 pub scheduled_at: DateTime<Utc>,
690 pub component_id: ComponentId,
691 pub deployment_id: DeploymentId,
692 pub metadata: ExecutionMetadata,
693 pub scheduled_by: Option<ExecutionId>,
694}
695
696impl From<CreateRequest> for ExecutionRequest {
697 fn from(value: CreateRequest) -> Self {
698 Self::Created {
699 ffqn: value.ffqn,
700 params: value.params,
701 parent: value.parent,
702 scheduled_at: value.scheduled_at,
703 component_id: value.component_id,
704 deployment_id: value.deployment_id,
705 metadata: value.metadata,
706 scheduled_by: value.scheduled_by,
707 }
708 }
709}
710
711#[async_trait]
712pub trait DbPool: Send + Sync {
713 async fn db_exec_conn(&self) -> Result<Box<dyn DbExecutor>, DbErrorGeneric>;
714
715 async fn connection(&self) -> Result<Box<dyn DbConnection>, DbErrorGeneric>;
716
717 async fn external_api_conn(&self) -> Result<Box<dyn DbExternalApi>, DbErrorGeneric>;
718
719 #[cfg(feature = "test")]
720 async fn connection_test(&self) -> Result<Box<dyn DbConnectionTest>, DbErrorGeneric>;
721}
722
723#[async_trait]
724pub trait DbPoolCloseable {
725 async fn close(&self);
726}
727
728#[derive(Clone, Debug)]
729pub struct AppendEventsToExecution {
730 pub execution_id: ExecutionId,
731 pub version: Version,
732 pub batch: Vec<AppendRequest>,
733}
734
735#[derive(Clone, Debug)]
736pub struct AppendResponseToExecution {
737 pub parent_execution_id: ExecutionId,
738 pub created_at: DateTime<Utc>,
739 pub join_set_id: JoinSetId,
740 pub child_execution_id: ExecutionIdDerived,
741 pub finished_version: Version,
742 pub result: SupportedFunctionReturnValue,
743}
744
745#[async_trait]
746pub trait DbExecutor: Send + Sync {
747 #[expect(clippy::too_many_arguments)]
748 async fn lock_pending_by_ffqns(
749 &self,
750 batch_size: u32,
751 pending_at_or_sooner: DateTime<Utc>,
752 ffqns: Arc<[FunctionFqn]>,
753 created_at: DateTime<Utc>,
754 component_id: ComponentId,
755 deployment_id: DeploymentId,
756 executor_id: ExecutorId,
757 lock_expires_at: DateTime<Utc>,
758 run_id: RunId,
759 retry_config: ComponentRetryConfig,
760 ) -> Result<LockPendingResponse, DbErrorWrite>;
761
762 #[expect(clippy::too_many_arguments)]
763 async fn lock_pending_by_component_digest(
764 &self,
765 batch_size: u32,
766 pending_at_or_sooner: DateTime<Utc>,
767 component_id: &ComponentId,
768 deployment_id: DeploymentId,
769 created_at: DateTime<Utc>,
770 executor_id: ExecutorId,
771 lock_expires_at: DateTime<Utc>,
772 run_id: RunId,
773 retry_config: ComponentRetryConfig,
774 ) -> Result<LockPendingResponse, DbErrorWrite>;
775
776 #[cfg(feature = "test")]
777 #[expect(clippy::too_many_arguments)]
778 async fn lock_one(
779 &self,
780 created_at: DateTime<Utc>,
781 component_id: ComponentId,
782 deployment_id: DeploymentId,
783 execution_id: &ExecutionId,
784 run_id: RunId,
785 version: Version,
786 executor_id: ExecutorId,
787 lock_expires_at: DateTime<Utc>,
788 retry_config: ComponentRetryConfig,
789 ) -> Result<LockedExecution, DbErrorWrite>;
790
791 async fn append(
794 &self,
795 execution_id: ExecutionId,
796 version: Version,
797 req: AppendRequest,
798 ) -> Result<AppendResponse, DbErrorWrite>;
799
800 async fn append_batch_respond_to_parent(
803 &self,
804 events: AppendEventsToExecution,
805 response: AppendResponseToExecution,
806 current_time: DateTime<Utc>, ) -> Result<AppendBatchResponse, DbErrorWrite>;
808
809 async fn wait_for_pending_by_ffqn(
814 &self,
815 pending_at_or_sooner: DateTime<Utc>,
816 ffqns: Arc<[FunctionFqn]>,
817 timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
818 );
819
820 async fn wait_for_pending_by_component_digest(
825 &self,
826 pending_at_or_sooner: DateTime<Utc>,
827 component_digest: &InputContentDigest,
828 timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
829 );
830
831 async fn cancel_activity_with_retries(
832 &self,
833 execution_id: &ExecutionId,
834 cancelled_at: DateTime<Utc>,
835 ) -> Result<CancelOutcome, DbErrorWrite> {
836 let mut retries = 5;
837 loop {
838 let res = self.cancel_activity(execution_id, cancelled_at).await;
839 if res.is_ok() || retries == 0 {
840 return res;
841 }
842 retries -= 1;
843 }
844 }
845
846 async fn get_last_execution_event(
848 &self,
849 execution_id: &ExecutionId,
850 ) -> Result<ExecutionEvent, DbErrorRead>;
851
852 async fn cancel_activity(
853 &self,
854 execution_id: &ExecutionId,
855 cancelled_at: DateTime<Utc>,
856 ) -> Result<CancelOutcome, DbErrorWrite> {
857 debug!("Determining cancellation state of {execution_id}");
858
859 let last_event = self
860 .get_last_execution_event(execution_id)
861 .await
862 .map_err(DbErrorWrite::from)?;
863 if let ExecutionRequest::Finished {
864 result:
865 SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError {
866 kind: ExecutionFailureKind::Cancelled,
867 ..
868 }),
869 ..
870 } = last_event.event
871 {
872 return Ok(CancelOutcome::Cancelled);
873 } else if matches!(last_event.event, ExecutionRequest::Finished { .. }) {
874 debug!("Not cancelling, {execution_id} is already finished");
875 return Ok(CancelOutcome::AlreadyFinished);
876 }
877 let finished_version = last_event.version.increment();
878 let child_result = SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError {
879 reason: None,
880 kind: ExecutionFailureKind::Cancelled,
881 detail: None,
882 });
883 let cancel_request = AppendRequest {
884 created_at: cancelled_at,
885 event: ExecutionRequest::Finished {
886 result: child_result.clone(),
887 http_client_traces: None,
888 },
889 };
890 debug!("Cancelling activity {execution_id} at {finished_version}");
891 if let ExecutionId::Derived(execution_id) = execution_id {
892 let (parent_execution_id, join_set_id) = execution_id.split_to_parts();
893 let child_execution_id = ExecutionId::Derived(execution_id.clone());
894 self.append_batch_respond_to_parent(
895 AppendEventsToExecution {
896 execution_id: child_execution_id,
897 version: finished_version.clone(),
898 batch: vec![cancel_request],
899 },
900 AppendResponseToExecution {
901 parent_execution_id,
902 created_at: cancelled_at,
903 join_set_id: join_set_id.clone(),
904 child_execution_id: execution_id.clone(),
905 finished_version,
906 result: child_result,
907 },
908 cancelled_at,
909 )
910 .await?;
911 } else {
912 self.append(execution_id.clone(), finished_version, cancel_request)
913 .await?;
914 }
915 debug!("Cancelled {execution_id}");
916 Ok(CancelOutcome::Cancelled)
917 }
918}
919
920pub enum AppendDelayResponseOutcome {
921 Success,
922 AlreadyFinished,
923 AlreadyCancelled,
924}
925
926#[derive(Debug, Clone, Default)]
927pub struct ListExecutionsFilter {
928 pub ffqn_prefix: Option<String>,
929 pub show_derived: bool,
930 pub hide_finished: bool,
931 pub execution_id_prefix: Option<String>,
932 pub component_digest: Option<InputContentDigest>,
933 pub deployment_id: Option<DeploymentId>,
934}
935
936#[async_trait]
937pub trait DbExternalApi: DbConnection {
938 async fn get_backtrace(
940 &self,
941 execution_id: &ExecutionId,
942 filter: BacktraceFilter,
943 ) -> Result<BacktraceInfo, DbErrorRead>;
944
945 async fn list_executions(
947 &self,
948 filter: ListExecutionsFilter,
949 pagination: ExecutionListPagination,
950 ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric>;
951
952 async fn list_execution_events(
957 &self,
958 execution_id: &ExecutionId,
959 pagination: Pagination<VersionType>,
960 include_backtrace_id: bool,
961 ) -> Result<ListExecutionEventsResponse, DbErrorRead>;
962
963 async fn list_responses(
972 &self,
973 execution_id: &ExecutionId,
974 pagination: Pagination<u32>,
975 ) -> Result<ListResponsesResponse, DbErrorRead>;
976
977 async fn list_execution_events_responses(
978 &self,
979 execution_id: &ExecutionId,
980 req_since: &Version,
981 req_max_length: VersionType,
982 req_include_backtrace_id: bool,
983 resp_pagination: Pagination<VersionType>,
984 ) -> Result<ExecutionWithStateRequestsResponses, DbErrorRead>;
985
986 async fn upgrade_execution_component(
987 &self,
988 execution_id: &ExecutionId,
989 old: &InputContentDigest,
990 new: &InputContentDigest,
991 ) -> Result<(), DbErrorWrite>;
992
993 async fn list_logs(
994 &self,
995 execution_id: &ExecutionId,
996 filter: LogFilter,
997 pagination: Pagination<u32>,
998 ) -> Result<ListLogsResponse, DbErrorRead>;
999
1000 async fn list_deployment_states(
1001 &self,
1002 current_time: DateTime<Utc>,
1003 pagination: Pagination<Option<DeploymentId>>,
1004 ) -> Result<Vec<DeploymentState>, DbErrorRead>;
1005
1006 async fn pause_execution(
1008 &self,
1009 execution_id: &ExecutionId,
1010 paused_at: DateTime<Utc>,
1011 ) -> Result<AppendResponse, DbErrorWrite>;
1012
1013 async fn unpause_execution(
1015 &self,
1016 execution_id: &ExecutionId,
1017 unpaused_at: DateTime<Utc>,
1018 ) -> Result<AppendResponse, DbErrorWrite>;
1019}
1020pub const LIST_DEPLOYMENT_STATES_DEFAULT_LENGTH: u16 = 20;
1021pub const LIST_DEPLOYMENT_STATES_DEFAULT_PAGINATION: Pagination<Option<DeploymentId>> =
1022 Pagination::OlderThan {
1023 length: LIST_DEPLOYMENT_STATES_DEFAULT_LENGTH,
1024 cursor: None,
1025 including_cursor: false,
1026 };
1027
1028pub struct DeploymentState {
1029 pub deployment_id: DeploymentId,
1030 pub locked: u32,
1031 pub pending: u32,
1033 pub scheduled: u32,
1035 pub blocked: u32,
1036 pub finished: u32,
1037}
1038impl DeploymentState {
1039 #[must_use]
1040 pub fn new(deployment_id: DeploymentId) -> Self {
1041 DeploymentState {
1042 deployment_id,
1043 locked: 0,
1044 pending: 0,
1045 scheduled: 0,
1046 blocked: 0,
1047 finished: 0,
1048 }
1049 }
1050}
1051
1052#[derive(Debug)]
1053pub struct ListLogsResponse {
1054 pub items: Vec<LogEntryRow>,
1055 pub next_page: Pagination<u32>, pub prev_page: Option<Pagination<u32>>, }
1058
1059#[derive(Debug)]
1060pub struct LogFilter {
1061 show_logs: bool,
1062 show_streams: bool,
1063 levels: Vec<LogLevel>, stream_types: Vec<LogStreamType>, }
1066impl LogFilter {
1067 #[must_use]
1069 pub fn show_logs(levels: Vec<LogLevel>) -> LogFilter {
1070 LogFilter {
1071 show_logs: true,
1072 show_streams: false,
1073 levels,
1074 stream_types: Vec::new(),
1075 }
1076 }
1077 #[must_use]
1079 pub fn show_streams(stream_types: Vec<LogStreamType>) -> LogFilter {
1080 LogFilter {
1081 show_logs: false,
1082 show_streams: true,
1083 levels: Vec::new(),
1084 stream_types,
1085 }
1086 }
1087 #[must_use]
1089 pub fn show_combined(levels: Vec<LogLevel>, stream_types: Vec<LogStreamType>) -> LogFilter {
1090 LogFilter {
1091 show_logs: true,
1092 show_streams: true,
1093 levels,
1094 stream_types,
1095 }
1096 }
1097 #[must_use]
1099 pub fn should_show_logs(&self) -> bool {
1100 self.show_logs
1101 }
1102 #[must_use]
1103 pub fn should_show_streams(&self) -> bool {
1104 self.show_streams
1105 }
1106 #[must_use]
1107 pub fn levels(&self) -> &Vec<LogLevel> {
1108 &self.levels
1109 }
1110 #[must_use]
1111 pub fn stream_types(&self) -> &Vec<LogStreamType> {
1112 &self.stream_types
1113 }
1114}
1115
1116#[derive(Debug, Clone)]
1117pub struct ExecutionWithStateRequestsResponses {
1118 pub execution_with_state: ExecutionWithState,
1119 pub events: Vec<ExecutionEvent>,
1120 pub responses: Vec<ResponseWithCursor>,
1121 pub max_version: Version,
1122 pub max_cursor: ResponseCursor,
1123}
1124
1125#[async_trait]
1126pub trait DbConnection: DbExecutor {
1127 async fn get(&self, execution_id: &ExecutionId) -> Result<ExecutionLog, DbErrorRead>;
1129
1130 async fn append_delay_response(
1131 &self,
1132 created_at: DateTime<Utc>,
1133 execution_id: ExecutionId,
1134 join_set_id: JoinSetId,
1135 delay_id: DelayId,
1136 outcome: Result<(), ()>, ) -> Result<AppendDelayResponseOutcome, DbErrorWrite>;
1138
1139 async fn append_batch(
1142 &self,
1143 current_time: DateTime<Utc>, batch: Vec<AppendRequest>,
1145 execution_id: ExecutionId,
1146 version: Version,
1147 ) -> Result<AppendBatchResponse, DbErrorWrite>;
1148
1149 async fn append_batch_create_new_execution(
1152 &self,
1153 current_time: DateTime<Utc>, batch: Vec<AppendRequest>, execution_id: ExecutionId,
1156 version: Version,
1157 child_req: Vec<CreateRequest>,
1158 backtraces: Vec<BacktraceInfo>,
1159 ) -> Result<AppendBatchResponse, DbErrorWrite>;
1160
1161 async fn get_execution_event(
1163 &self,
1164 execution_id: &ExecutionId,
1165 version: &Version,
1166 ) -> Result<ExecutionEvent, DbErrorRead>;
1167
1168 #[instrument(skip(self))]
1169 async fn get_create_request(
1170 &self,
1171 execution_id: &ExecutionId,
1172 ) -> Result<CreateRequest, DbErrorRead> {
1173 let execution_event = self
1174 .get_execution_event(execution_id, &Version::new(0))
1175 .await?;
1176 if let ExecutionRequest::Created {
1177 ffqn,
1178 params,
1179 parent,
1180 scheduled_at,
1181 component_id,
1182 deployment_id,
1183 metadata,
1184 scheduled_by,
1185 } = execution_event.event
1186 {
1187 Ok(CreateRequest {
1188 created_at: execution_event.created_at,
1189 execution_id: execution_id.clone(),
1190 ffqn,
1191 params,
1192 parent,
1193 scheduled_at,
1194 component_id,
1195 deployment_id,
1196 metadata,
1197 scheduled_by,
1198 })
1199 } else {
1200 Err(DbErrorRead::Generic(DbErrorGeneric::Uncategorized {
1201 reason: "execution log must start with creation".into(),
1202 context: SpanTrace::capture(),
1203 source: None,
1204 loc: Location::caller(),
1205 }))
1206 }
1207 }
1208
1209 async fn get_pending_state(
1210 &self,
1211 execution_id: &ExecutionId,
1212 ) -> Result<ExecutionWithState, DbErrorRead>;
1213
1214 async fn get_expired_timers(
1216 &self,
1217 at: DateTime<Utc>,
1218 ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric>;
1219
1220 async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite>;
1222
1223 async fn subscribe_to_next_responses(
1230 &self,
1231 execution_id: &ExecutionId,
1232 last_response: ResponseCursor,
1233 timeout_fut: Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>,
1234 ) -> Result<Vec<ResponseWithCursor>, DbErrorReadWithTimeout>;
1235
1236 async fn wait_for_finished_result(
1243 &self,
1244 execution_id: &ExecutionId,
1245 timeout_fut: Option<Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>>,
1246 ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout>;
1247
1248 async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite>;
1249
1250 async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite>;
1251
1252 async fn append_log(&self, row: LogInfoAppendRow) -> Result<(), DbErrorWrite>;
1253
1254 async fn append_log_batch(&self, batch: &[LogInfoAppendRow]) -> Result<(), DbErrorWrite>;
1255
1256 #[cfg(feature = "test")]
1258 async fn get_finished_result(
1259 &self,
1260 execution_id: &ExecutionId,
1261 ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout> {
1262 self.wait_for_finished_result(
1263 execution_id,
1264 Some(Box::pin(std::future::ready(TimeoutOutcome::Timeout))),
1265 )
1266 .await
1267 }
1268}
1269
1270#[derive(Clone, Debug)]
1271pub struct LogInfoAppendRow {
1272 pub execution_id: ExecutionId,
1273 pub run_id: RunId,
1274 pub log_entry: LogEntry,
1275}
1276
1277#[derive(Debug, Clone)]
1278pub struct LogEntryRow {
1279 pub cursor: u32,
1280 pub run_id: RunId,
1281 pub log_entry: LogEntry,
1282}
1283
1284#[derive(Debug, Clone)]
1285pub enum LogEntry {
1286 Log {
1287 created_at: DateTime<Utc>,
1288 level: LogLevel,
1289 message: String,
1290 },
1291 Stream {
1292 created_at: DateTime<Utc>,
1293 payload: Vec<u8>,
1294 stream_type: LogStreamType,
1295 },
1296}
1297impl LogEntry {
1298 #[must_use]
1299 pub fn created_at(&self) -> DateTime<Utc> {
1300 match self {
1301 LogEntry::Log { created_at, .. } | LogEntry::Stream { created_at, .. } => *created_at,
1302 }
1303 }
1304}
1305
1306#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::TryFrom, strum::EnumIter)]
1307#[try_from(repr)]
1308#[repr(u8)]
1309pub enum LogLevel {
1310 Trace = 1,
1311 Debug,
1312 Info,
1313 Warn,
1314 Error,
1315}
1316#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::TryFrom, strum::EnumIter)]
1317#[try_from(repr)]
1318#[repr(u8)]
1319pub enum LogStreamType {
1320 StdOut = 1,
1321 StdErr,
1322}
1323
1324#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1325pub enum TimeoutOutcome {
1326 Timeout,
1327 Cancel,
1328}
1329
1330#[cfg(feature = "test")]
1331#[async_trait]
1332pub trait DbConnectionTest: DbConnection {
1333 async fn append_response(
1334 &self,
1335 created_at: DateTime<Utc>,
1336 execution_id: ExecutionId,
1337 response_event: JoinSetResponseEvent,
1338 ) -> Result<(), DbErrorWrite>;
1339}
1340
1341#[derive(Clone, Copy, Debug, PartialEq, Eq)]
1342pub enum CancelOutcome {
1343 Cancelled,
1344 AlreadyFinished,
1345}
1346
1347#[instrument(skip(db_connection))]
1348pub async fn stub_execution(
1349 db_connection: &dyn DbConnection,
1350 execution_id: ExecutionIdDerived,
1351 parent_execution_id: ExecutionId,
1352 join_set_id: JoinSetId,
1353 created_at: DateTime<Utc>,
1354 return_value: SupportedFunctionReturnValue,
1355) -> Result<(), DbErrorWrite> {
1356 let stub_finished_version = Version::new(1); let write_attempt = {
1359 let finished_req = AppendRequest {
1360 created_at,
1361 event: ExecutionRequest::Finished {
1362 result: return_value.clone(),
1363 http_client_traces: None,
1364 },
1365 };
1366 db_connection
1367 .append_batch_respond_to_parent(
1368 AppendEventsToExecution {
1369 execution_id: ExecutionId::Derived(execution_id.clone()),
1370 version: stub_finished_version.clone(),
1371 batch: vec![finished_req],
1372 },
1373 AppendResponseToExecution {
1374 parent_execution_id,
1375 created_at,
1376 join_set_id,
1377 child_execution_id: execution_id.clone(),
1378 finished_version: stub_finished_version.clone(),
1379 result: return_value.clone(),
1380 },
1381 created_at,
1382 )
1383 .await
1384 };
1385 if let Err(write_attempt) = write_attempt {
1386 debug!("Stub write attempt failed - {write_attempt:?}");
1388
1389 let found = db_connection
1390 .get_execution_event(&ExecutionId::Derived(execution_id), &stub_finished_version)
1391 .await?; match found.event {
1393 ExecutionRequest::Finished {
1394 result: found_result,
1395 ..
1396 } if return_value == found_result => {
1397 Ok(())
1399 }
1400 ExecutionRequest::Finished { .. } => Err(DbErrorWrite::NonRetriable(
1401 DbErrorWriteNonRetriable::Conflict,
1402 )),
1403 _other => Err(DbErrorWrite::NonRetriable(
1404 DbErrorWriteNonRetriable::IllegalState {
1405 reason: "unexpected execution event at stubbed execution".into(),
1406 context: SpanTrace::capture(),
1407 source: None,
1408 loc: Location::caller(),
1409 },
1410 )),
1411 }
1412 } else {
1413 Ok(())
1414 }
1415}
1416
1417pub async fn cancel_delay(
1418 db_connection: &dyn DbConnection,
1419 delay_id: DelayId,
1420 created_at: DateTime<Utc>,
1421) -> Result<CancelOutcome, DbErrorWrite> {
1422 let (parent_execution_id, join_set_id) = delay_id.split_to_parts();
1423 db_connection
1424 .append_delay_response(
1425 created_at,
1426 parent_execution_id,
1427 join_set_id,
1428 delay_id,
1429 Err(()), )
1431 .await
1432 .map(|ok| match ok {
1433 AppendDelayResponseOutcome::Success | AppendDelayResponseOutcome::AlreadyCancelled => {
1434 CancelOutcome::Cancelled
1435 }
1436 AppendDelayResponseOutcome::AlreadyFinished => CancelOutcome::AlreadyFinished,
1437 })
1438}
1439
1440#[derive(Clone, Debug)]
1441pub enum BacktraceFilter {
1442 First,
1443 Last,
1444 Specific(Version),
1445}
1446
1447#[derive(Clone, Debug, PartialEq, Eq)]
1448pub struct BacktraceInfo {
1449 pub execution_id: ExecutionId,
1450 pub component_id: ComponentId,
1451 pub version_min_including: Version,
1452 pub version_max_excluding: Version,
1453 pub wasm_backtrace: WasmBacktrace,
1454}
1455
1456#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
1457pub struct WasmBacktrace {
1458 pub frames: Vec<FrameInfo>,
1459}
1460
1461#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
1462pub struct FrameInfo {
1463 pub module: String,
1464 pub func_name: String,
1465 pub symbols: Vec<FrameSymbol>,
1466}
1467
1468#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
1469pub struct FrameSymbol {
1470 pub func_name: Option<String>,
1471 pub file: Option<String>,
1472 pub line: Option<u32>,
1473 pub col: Option<u32>,
1474}
1475
1476mod wasm_backtrace {
1477 use super::{FrameInfo, FrameSymbol, WasmBacktrace};
1478
1479 impl WasmBacktrace {
1480 pub fn maybe_from(backtrace: &wasmtime::WasmBacktrace) -> Option<Self> {
1481 if backtrace.frames().is_empty() {
1482 None
1483 } else {
1484 Some(Self {
1485 frames: backtrace.frames().iter().map(FrameInfo::from).collect(),
1486 })
1487 }
1488 }
1489 }
1490
1491 impl From<&wasmtime::FrameInfo> for FrameInfo {
1492 fn from(frame: &wasmtime::FrameInfo) -> Self {
1493 let module_name = frame.module().name().unwrap_or("<unknown>").to_string();
1494 let mut func_name = String::new();
1495 wasmtime_environ::demangle_function_name_or_index(
1496 &mut func_name,
1497 frame.func_name(),
1498 frame.func_index() as usize,
1499 )
1500 .expect("writing to string must succeed");
1501 Self {
1502 module: module_name,
1503 func_name,
1504 symbols: frame
1505 .symbols()
1506 .iter()
1507 .map(std::convert::Into::into)
1508 .collect(),
1509 }
1510 }
1511 }
1512
1513 impl From<&wasmtime::FrameSymbol> for FrameSymbol {
1514 fn from(symbol: &wasmtime::FrameSymbol) -> Self {
1515 let func_name = symbol.name().map(|name| {
1516 let mut writer = String::new();
1517 wasmtime_environ::demangle_function_name(&mut writer, name)
1518 .expect("writing to string must succeed");
1519 writer
1520 });
1521
1522 Self {
1523 func_name,
1524 file: symbol.file().map(ToString::to_string),
1525 line: symbol.line(),
1526 col: symbol.column(),
1527 }
1528 }
1529 }
1530}
1531#[derive(Debug, Clone, derive_more::Display)]
1532#[display("{execution_id} {pending_state} {component_digest}")]
1533pub struct ExecutionWithState {
1534 pub execution_id: ExecutionId,
1535 pub ffqn: FunctionFqn,
1536 pub pending_state: PendingState,
1537 pub created_at: DateTime<Utc>,
1538 pub first_scheduled_at: DateTime<Utc>,
1539 pub component_digest: InputContentDigest,
1540 pub component_type: ComponentType,
1541 pub deployment_id: DeploymentId,
1542}
1543
1544#[derive(Debug, Clone)]
1545pub enum ExecutionListPagination {
1546 CreatedBy(Pagination<Option<DateTime<Utc>>>),
1547 ExecutionId(Pagination<Option<ExecutionId>>),
1548}
1549impl Default for ExecutionListPagination {
1550 fn default() -> ExecutionListPagination {
1551 ExecutionListPagination::CreatedBy(Pagination::OlderThan {
1552 length: 20,
1553 cursor: None,
1554 including_cursor: false, })
1556 }
1557}
1558impl ExecutionListPagination {
1559 #[must_use]
1560 pub fn length(&self) -> u16 {
1561 match self {
1562 ExecutionListPagination::CreatedBy(pagination) => pagination.length(),
1563 ExecutionListPagination::ExecutionId(pagination) => pagination.length(),
1564 }
1565 }
1566}
1567
1568#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1569pub enum Pagination<T> {
1570 NewerThan {
1571 length: u16,
1572 cursor: T,
1573 including_cursor: bool,
1574 },
1575 OlderThan {
1576 length: u16,
1577 cursor: T,
1578 including_cursor: bool,
1579 },
1580}
1581impl<T: Clone> Pagination<T> {
1582 pub fn length(&self) -> u16 {
1583 match self {
1584 Pagination::NewerThan { length, .. } | Pagination::OlderThan { length, .. } => *length,
1585 }
1586 }
1587
1588 pub fn rel(&self) -> &'static str {
1589 match self {
1590 Pagination::NewerThan {
1591 including_cursor: false,
1592 ..
1593 } => ">",
1594 Pagination::NewerThan {
1595 including_cursor: true,
1596 ..
1597 } => ">=",
1598 Pagination::OlderThan {
1599 including_cursor: false,
1600 ..
1601 } => "<",
1602 Pagination::OlderThan {
1603 including_cursor: true,
1604 ..
1605 } => "<=",
1606 }
1607 }
1608
1609 pub fn is_desc(&self) -> bool {
1610 matches!(self, Pagination::OlderThan { .. })
1611 }
1612
1613 pub fn asc_or_desc(&self) -> &'static str {
1614 if self.is_asc() { "asc" } else { "desc" }
1615 }
1616
1617 pub fn is_asc(&self) -> bool {
1618 !self.is_desc()
1619 }
1620
1621 pub fn cursor(&self) -> &T {
1622 match self {
1623 Pagination::NewerThan { cursor, .. } | Pagination::OlderThan { cursor, .. } => cursor,
1624 }
1625 }
1626
1627 #[must_use]
1628 pub fn invert(&self) -> Self {
1629 match self {
1630 Pagination::NewerThan {
1631 length,
1632 cursor,
1633 including_cursor,
1634 } => Pagination::OlderThan {
1635 length: *length,
1636 cursor: cursor.clone(),
1637 including_cursor: !including_cursor,
1638 },
1639 Pagination::OlderThan {
1640 length,
1641 cursor,
1642 including_cursor,
1643 } => Pagination::NewerThan {
1644 length: *length,
1645 cursor: cursor.clone(),
1646 including_cursor: !including_cursor,
1647 },
1648 }
1649 }
1650}
1651
1652#[cfg(feature = "test")]
1653pub async fn wait_for_pending_state_fn<T: Debug>(
1654 db_connection: &dyn DbConnectionTest,
1655 execution_id: &ExecutionId,
1656 predicate: impl Fn(ExecutionLog) -> Option<T> + Send,
1657 timeout: Option<Duration>,
1658) -> Result<T, DbErrorReadWithTimeout> {
1659 tracing::trace!(%execution_id, "Waiting for predicate");
1660 let fut = async move {
1661 loop {
1662 let execution_log = db_connection.get(execution_id).await?;
1663 if let Some(t) = predicate(execution_log) {
1664 tracing::debug!(%execution_id, "Found: {t:?}");
1665 return Ok(t);
1666 }
1667 tokio::time::sleep(Duration::from_millis(10)).await;
1668 }
1669 };
1670
1671 if let Some(timeout) = timeout {
1672 tokio::select! { res = fut => res,
1674 () = tokio::time::sleep(timeout) => Err(DbErrorReadWithTimeout::Timeout(TimeoutOutcome::Timeout))
1675 }
1676 } else {
1677 fut.await
1678 }
1679}
1680
1681#[derive(Debug, Clone, PartialEq, Eq)]
1682pub enum ExpiredTimer {
1683 Lock(ExpiredLock),
1684 Delay(ExpiredDelay),
1685}
1686
1687#[derive(Debug, Clone, PartialEq, Eq)]
1688pub struct ExpiredLock {
1689 pub execution_id: ExecutionId,
1690 pub locked_at_version: Version,
1692 pub next_version: Version,
1693 pub intermittent_event_count: u32,
1695 pub max_retries: Option<u32>,
1696 pub retry_exp_backoff: Duration,
1697 pub locked_by: LockedBy,
1698}
1699
1700#[derive(Debug, Clone, PartialEq, Eq)]
1701pub struct ExpiredDelay {
1702 pub execution_id: ExecutionId,
1703 pub join_set_id: JoinSetId,
1704 pub delay_id: DelayId,
1705}
1706
1707#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize)]
1708#[serde(tag = "status", rename_all = "snake_case")]
1709pub enum PendingState {
1710 Locked(PendingStateLocked),
1712
1713 #[display("PendingAt(`{_0}`)")]
1714 PendingAt(PendingStatePendingAt),
1715
1716 #[display("BlockedByJoinSet({_0})")]
1718 BlockedByJoinSet(PendingStateBlockedByJoinSet),
1719
1720 #[display("Paused({_0})")]
1721 Paused(PendingStatePaused),
1722
1723 #[display("Finished({_0})")]
1724 Finished(PendingStateFinished),
1725}
1726
1727pub enum PendingStateMergedPause {
1728 Locked {
1729 state: PendingStateLocked,
1730 paused: bool,
1731 },
1732 PendingAt {
1733 state: PendingStatePendingAt,
1734 paused: bool,
1735 },
1736 BlockedByJoinSet {
1737 state: PendingStateBlockedByJoinSet,
1738 paused: bool,
1739 },
1740 Finished(PendingStateFinished),
1741}
1742impl From<PendingState> for PendingStateMergedPause {
1743 fn from(state: PendingState) -> Self {
1744 match state {
1745 PendingState::Locked(s) => PendingStateMergedPause::Locked {
1746 state: s,
1747 paused: false,
1748 },
1749
1750 PendingState::PendingAt(s) => PendingStateMergedPause::PendingAt {
1751 state: s,
1752 paused: false,
1753 },
1754
1755 PendingState::BlockedByJoinSet(s) => PendingStateMergedPause::BlockedByJoinSet {
1756 state: s,
1757 paused: false,
1758 },
1759
1760 PendingState::Paused(paused) => match paused {
1761 PendingStatePaused::Locked(s) => PendingStateMergedPause::Locked {
1762 state: s,
1763 paused: true,
1764 },
1765 PendingStatePaused::PendingAt(s) => PendingStateMergedPause::PendingAt {
1766 state: s,
1767 paused: true,
1768 },
1769 PendingStatePaused::BlockedByJoinSet(s) => {
1770 PendingStateMergedPause::BlockedByJoinSet {
1771 state: s,
1772 paused: true,
1773 }
1774 }
1775 },
1776
1777 PendingState::Finished(s) => PendingStateMergedPause::Finished(s),
1778 }
1779 }
1780}
1781
1782#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize)]
1783#[display("Locked(`{lock_expires_at}`, {}, {})", locked_by.executor_id, locked_by.run_id)]
1784pub struct PendingStateLocked {
1785 pub locked_by: LockedBy,
1786 pub lock_expires_at: DateTime<Utc>,
1787}
1788
1789#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize)]
1790#[display("`{scheduled_at}`, last_lock={last_lock:?}")]
1791pub struct PendingStatePendingAt {
1792 pub scheduled_at: DateTime<Utc>,
1793 pub last_lock: Option<LockedBy>,
1795}
1796
1797#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize)]
1798#[display("{join_set_id}, `{lock_expires_at}`, closing={closing}")]
1799pub struct PendingStateBlockedByJoinSet {
1800 pub join_set_id: JoinSetId,
1801 pub lock_expires_at: DateTime<Utc>,
1803 pub closing: bool,
1805}
1806
1807#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize)]
1809pub enum PendingStatePaused {
1810 #[display("Locked({_0})")]
1811 Locked(PendingStateLocked),
1812 #[display("PendingAt({_0})")]
1813 PendingAt(PendingStatePendingAt),
1814 #[display("BlockedByJoinSet({_0})")]
1815 BlockedByJoinSet(PendingStateBlockedByJoinSet),
1816}
1817
1818#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1819pub struct LockedBy {
1820 pub executor_id: ExecutorId,
1821 pub run_id: RunId,
1822}
1823impl From<&Locked> for LockedBy {
1824 fn from(value: &Locked) -> Self {
1825 LockedBy {
1826 executor_id: value.executor_id,
1827 run_id: value.run_id,
1828 }
1829 }
1830}
1831
1832#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
1833#[cfg_attr(any(test, feature = "test"), derive(Deserialize))]
1834pub struct PendingStateFinished {
1835 pub version: VersionType, pub finished_at: DateTime<Utc>,
1837 pub result_kind: PendingStateFinishedResultKind,
1838}
1839impl Display for PendingStateFinished {
1840 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1841 match self.result_kind {
1842 PendingStateFinishedResultKind::Ok => write!(f, "ok"),
1843 PendingStateFinishedResultKind::Err(err) => write!(f, "{err}"),
1844 }
1845 }
1846}
1847
1848#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
1850#[serde(rename_all = "snake_case")]
1851pub enum PendingStateFinishedResultKind {
1852 Ok,
1853 Err(PendingStateFinishedError),
1854}
1855impl PendingStateFinishedResultKind {
1856 pub fn as_result(&self) -> Result<(), &PendingStateFinishedError> {
1857 match self {
1858 PendingStateFinishedResultKind::Ok => Ok(()),
1859 PendingStateFinishedResultKind::Err(err) => Err(err),
1860 }
1861 }
1862}
1863
1864impl From<&SupportedFunctionReturnValue> for PendingStateFinishedResultKind {
1865 fn from(result: &SupportedFunctionReturnValue) -> Self {
1866 result.as_pending_state_finished_result()
1867 }
1868}
1869
1870#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, derive_more::Display)]
1871#[serde(rename_all = "snake_case")]
1872pub enum PendingStateFinishedError {
1873 #[display("execution terminated: {_0}")]
1874 ExecutionFailure(ExecutionFailureKind),
1875 #[display("execution completed with an error")]
1876 Error,
1877}
1878
1879impl PendingState {
1880 #[instrument(skip(self))]
1881 pub fn can_append_lock(
1882 &self,
1883 created_at: DateTime<Utc>,
1884 executor_id: ExecutorId,
1885 run_id: RunId,
1886 lock_expires_at: DateTime<Utc>,
1887 ) -> Result<LockKind, DbErrorWriteNonRetriable> {
1888 if lock_expires_at <= created_at {
1889 return Err(DbErrorWriteNonRetriable::ValidationFailed(
1890 "invalid expiry date".into(),
1891 ));
1892 }
1893 match self {
1894 PendingState::PendingAt(PendingStatePendingAt {
1895 scheduled_at,
1896 last_lock,
1897 }) => {
1898 if *scheduled_at <= created_at {
1899 Ok(LockKind::CreatingNewLock)
1901 } else if let Some(LockedBy {
1902 executor_id: last_executor_id,
1903 run_id: last_run_id,
1904 }) = last_lock
1905 && executor_id == *last_executor_id
1906 && run_id == *last_run_id
1907 {
1908 Ok(LockKind::Extending)
1910 } else {
1911 Err(DbErrorWriteNonRetriable::ValidationFailed(
1912 "cannot lock, not yet pending".into(),
1913 ))
1914 }
1915 }
1916 PendingState::Locked(PendingStateLocked {
1917 locked_by:
1918 LockedBy {
1919 executor_id: current_pending_state_executor_id,
1920 run_id: current_pending_state_run_id,
1921 },
1922 lock_expires_at: _,
1923 }) => {
1924 if executor_id == *current_pending_state_executor_id
1925 && run_id == *current_pending_state_run_id
1926 {
1927 Ok(LockKind::Extending)
1929 } else {
1930 Err(DbErrorWriteNonRetriable::IllegalState {
1931 reason: "cannot lock, already locked".into(),
1932 context: SpanTrace::capture(),
1933 source: None,
1934 loc: Location::caller(),
1935 })
1936 }
1937 }
1938 PendingState::BlockedByJoinSet { .. } => Err(DbErrorWriteNonRetriable::IllegalState {
1939 reason: "cannot append Locked event when in BlockedByJoinSet state".into(),
1940 context: SpanTrace::capture(),
1941 source: None,
1942 loc: Location::caller(),
1943 }),
1944 PendingState::Finished { .. } => Err(DbErrorWriteNonRetriable::IllegalState {
1945 reason: "already finished".into(),
1946 context: SpanTrace::capture(),
1947 source: None,
1948 loc: Location::caller(),
1949 }),
1950 PendingState::Paused(..) => Err(DbErrorWriteNonRetriable::IllegalState {
1951 reason: "cannot lock, execution is paused".into(),
1952 context: SpanTrace::capture(),
1953 source: None,
1954 loc: Location::caller(),
1955 }),
1956 }
1957 }
1958
1959 #[must_use]
1960 pub fn is_finished(&self) -> bool {
1961 matches!(self, PendingState::Finished { .. })
1962 }
1963
1964 #[must_use]
1965 pub fn is_paused(&self) -> bool {
1966 matches!(self, PendingState::Paused(_))
1967 }
1968}
1969
1970#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1971pub enum LockKind {
1972 Extending,
1973 CreatingNewLock,
1974}
1975
1976pub mod http_client_trace {
1977 use chrono::{DateTime, Utc};
1978 use serde::{Deserialize, Serialize};
1979
1980 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1981 pub struct HttpClientTrace {
1982 pub req: RequestTrace,
1983 pub resp: Option<ResponseTrace>,
1984 }
1985
1986 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1987 pub struct RequestTrace {
1988 pub sent_at: DateTime<Utc>,
1989 pub uri: String,
1990 pub method: String,
1991 }
1992
1993 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1994 pub struct ResponseTrace {
1995 pub finished_at: DateTime<Utc>,
1996 pub status: Result<u16, String>,
1997 }
1998}
1999
2000#[cfg(test)]
2001mod tests {
2002 use super::HistoryEventScheduleAt;
2003 use super::PendingStateFinished;
2004 use super::PendingStateFinishedError;
2005 use super::PendingStateFinishedResultKind;
2006 use crate::ExecutionFailureKind;
2007 use crate::SupportedFunctionReturnValue;
2008 use chrono::DateTime;
2009 use chrono::Datelike;
2010 use chrono::Utc;
2011 use insta::assert_snapshot;
2012 use rstest::rstest;
2013 use std::time::Duration;
2014 use val_json::type_wrapper::TypeWrapper;
2015 use val_json::wast_val::WastVal;
2016 use val_json::wast_val::WastValWithType;
2017
2018 #[rstest(expected => [
2019 PendingStateFinishedResultKind::Ok,
2020 PendingStateFinishedResultKind::Err(PendingStateFinishedError::ExecutionFailure(ExecutionFailureKind::TimedOut)),
2021 ])]
2022 #[test]
2023 fn serde_pending_state_finished_result_kind_should_work(
2024 expected: PendingStateFinishedResultKind,
2025 ) {
2026 let ser = serde_json::to_string(&expected).unwrap();
2027 let actual: PendingStateFinishedResultKind = serde_json::from_str(&ser).unwrap();
2028 assert_eq!(expected, actual);
2029 }
2030
2031 #[rstest(result_kind => [
2032 PendingStateFinishedResultKind::Ok,
2033 PendingStateFinishedResultKind::Err(PendingStateFinishedError::ExecutionFailure(ExecutionFailureKind::TimedOut)),
2034 ])]
2035 #[test]
2036 fn serde_pending_state_finished_should_work(result_kind: PendingStateFinishedResultKind) {
2037 let expected = PendingStateFinished {
2038 version: 0,
2039 finished_at: Utc::now(),
2040 result_kind,
2041 };
2042
2043 let ser = serde_json::to_string(&expected).unwrap();
2044 let actual: PendingStateFinished = serde_json::from_str(&ser).unwrap();
2045 assert_eq!(expected, actual);
2046 }
2047
2048 #[test]
2049 fn join_set_deser_with_result_ok_option_none_should_work() {
2050 let expected = SupportedFunctionReturnValue::Ok {
2051 ok: Some(WastValWithType {
2052 r#type: TypeWrapper::Result {
2053 ok: Some(Box::new(TypeWrapper::Option(Box::new(TypeWrapper::String)))),
2054 err: Some(Box::new(TypeWrapper::String)),
2055 },
2056 value: WastVal::Result(Ok(Some(Box::new(WastVal::Option(None))))),
2057 }),
2058 };
2059 let json = serde_json::to_string(&expected).unwrap();
2060 assert_snapshot!(json);
2061
2062 let actual: SupportedFunctionReturnValue = serde_json::from_str(&json).unwrap();
2063
2064 assert_eq!(expected, actual);
2065 }
2066
2067 #[test]
2068 fn as_date_time_should_work_with_duration_u32_max_secs() {
2069 let duration = Duration::from_secs(u64::from(u32::MAX));
2070 let schedule_at = HistoryEventScheduleAt::In(duration);
2071 let resolved = schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap();
2072 assert_eq!(2106, resolved.year());
2073 }
2074
2075 const MILLIS_PER_SEC: i64 = 1000;
2076 const TIMEDELTA_MAX_SECS: i64 = i64::MAX / MILLIS_PER_SEC;
2077
2078 #[test]
2079 fn as_date_time_should_fail_on_duration_secs_greater_than_i64_max() {
2080 let duration = Duration::from_secs(
2082 u64::try_from(TIMEDELTA_MAX_SECS).expect("positive number must not fail") + 1,
2083 );
2084 let schedule_at = HistoryEventScheduleAt::In(duration);
2085 schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap_err();
2086 }
2087}