1use crate::ComponentId;
2use crate::ComponentRetryConfig;
3use crate::ComponentType;
4use crate::ExecutionFailureKind;
5use crate::ExecutionId;
6use crate::ExecutionMetadata;
7use crate::FinishedExecutionError;
8use crate::FunctionFqn;
9use crate::JoinSetId;
10use crate::Params;
11use crate::StrVariant;
12use crate::SupportedFunctionReturnValue;
13use crate::component_id::ComponentDigest;
14use crate::prefixed_ulid::DelayId;
15use crate::prefixed_ulid::DeploymentId;
16use crate::prefixed_ulid::ExecutionIdDerived;
17use crate::prefixed_ulid::ExecutorId;
18use crate::prefixed_ulid::RunId;
19use assert_matches::assert_matches;
20use async_trait::async_trait;
21use chrono::TimeDelta;
22use chrono::{DateTime, Utc};
23use http_client_trace::HttpClientTrace;
24use serde::Deserialize;
25use serde::Serialize;
26use std::fmt::Debug;
27use std::fmt::Display;
28use std::panic::Location;
29use std::pin::Pin;
30use std::sync::Arc;
31use std::time::Duration;
32use tracing::debug;
33use tracing::instrument;
34use tracing_error::SpanTrace;
35
36pub const STATE_PENDING_AT: &str = "pending_at";
38pub const STATE_BLOCKED_BY_JOIN_SET: &str = "blocked_by_join_set";
39pub const STATE_LOCKED: &str = "locked";
40pub const STATE_FINISHED: &str = "finished";
41pub const HISTORY_EVENT_TYPE_JOIN_NEXT: &str = "join_next"; #[derive(Debug, PartialEq, Eq, Clone)]
44pub struct ExecutionLog {
45 pub execution_id: ExecutionId,
46 pub events: Vec<ExecutionEvent>,
47 pub responses: Vec<ResponseWithCursor>,
48 pub next_version: Version, pub pending_state: PendingState, pub component_digest: ComponentDigest, pub component_type: ComponentType,
52 pub deployment_id: DeploymentId, }
54
55impl ExecutionLog {
56 #[must_use]
59 pub fn can_be_retried_after(
60 temporary_event_count: u32,
61 max_retries: Option<u32>,
62 retry_exp_backoff: Duration,
63 ) -> Option<Duration> {
64 if temporary_event_count <= max_retries.unwrap_or(u32::MAX) {
66 let duration = retry_exp_backoff * 2_u32.saturating_pow(temporary_event_count - 1);
68 Some(duration)
69 } else {
70 None
71 }
72 }
73
74 #[must_use]
75 pub fn compute_retry_duration_when_retrying_forever(
76 temporary_event_count: u32,
77 retry_exp_backoff: Duration,
78 ) -> Duration {
79 Self::can_be_retried_after(temporary_event_count, None, retry_exp_backoff)
80 .expect("`max_retries` set to MAX must never return None")
81 }
82
83 #[must_use]
84 pub fn get_create_request(&self) -> CreateRequest {
85 assert_matches!(self.events.first().cloned(), Some(ExecutionEvent {
86 event:ExecutionRequest::Created{
87 ffqn,params,parent,scheduled_at,component_id,deployment_id,metadata,scheduled_by},
88 created_at, .. }) => CreateRequest { created_at, execution_id:
89 self.execution_id.clone(), ffqn, params, parent, scheduled_at,
90 component_id, deployment_id, metadata, scheduled_by })
91 }
92
93 #[must_use]
94 pub fn ffqn(&self) -> &FunctionFqn {
95 assert_matches!(self.events.first(), Some(ExecutionEvent {
96 event: ExecutionRequest::Created { ffqn, .. },
97 ..
98 }) => ffqn)
99 }
100
101 #[must_use]
102 pub fn params(&self) -> &Params {
103 assert_matches!(self.events.first(), Some(ExecutionEvent {
104 event: ExecutionRequest::Created { params, .. },
105 ..
106 }) => params)
107 }
108
109 #[must_use]
110 pub fn parent(&self) -> Option<(ExecutionId, JoinSetId)> {
111 assert_matches!(self.events.first(), Some(ExecutionEvent {
112 event: ExecutionRequest::Created { parent, .. },
113 ..
114 }) => parent.clone())
115 }
116
117 #[must_use]
118 pub fn last_event(&self) -> &ExecutionEvent {
119 self.events.last().expect("must contain at least one event")
120 }
121
122 #[must_use]
123 pub fn is_finished(&self) -> bool {
124 matches!(
125 self.events.last(),
126 Some(ExecutionEvent {
127 event: ExecutionRequest::Finished { .. },
128 ..
129 })
130 )
131 }
132
133 #[must_use]
134 pub fn as_finished_result(&self) -> Option<SupportedFunctionReturnValue> {
135 if let ExecutionEvent {
136 event: ExecutionRequest::Finished { retval: result, .. },
137 ..
138 } = self.events.last().expect("must contain at least one event")
139 {
140 Some(result.clone())
141 } else {
142 None
143 }
144 }
145
146 pub fn event_history(&self) -> impl Iterator<Item = (HistoryEvent, Version)> + '_ {
147 self.events.iter().filter_map(|event| {
148 if let ExecutionRequest::HistoryEvent { event: eh, .. } = &event.event {
149 Some((eh.clone(), event.version.clone()))
150 } else {
151 None
152 }
153 })
154 }
155
156 #[cfg(feature = "test")]
157 #[must_use]
158 pub fn find_join_set_request(&self, join_set_id: &JoinSetId) -> Option<&JoinSetRequest> {
159 self.events
160 .iter()
161 .find_map(move |event| match &event.event {
162 ExecutionRequest::HistoryEvent {
163 event:
164 HistoryEvent::JoinSetRequest {
165 join_set_id: found,
166 request,
167 },
168 ..
169 } if *join_set_id == *found => Some(request),
170 _ => None,
171 })
172 }
173}
174
175pub type VersionType = u32;
176#[derive(
177 Debug,
178 Default,
179 Clone,
180 PartialEq,
181 Eq,
182 Hash,
183 derive_more::Display,
184 derive_more::Into,
185 serde::Serialize,
186 serde::Deserialize,
187 schemars::JsonSchema,
188)]
189#[serde(transparent)]
190#[schemars(transparent)]
191pub struct Version(pub VersionType);
192impl Version {
193 #[must_use]
194 pub fn new(arg: VersionType) -> Version {
195 Version(arg)
196 }
197
198 #[must_use]
199 pub fn increment(&self) -> Version {
200 Version(self.0 + 1)
201 }
202}
203impl TryFrom<i64> for Version {
204 type Error = VersionParseError;
205 fn try_from(value: i64) -> Result<Self, Self::Error> {
206 VersionType::try_from(value)
207 .map(Version::new)
208 .map_err(|_| VersionParseError)
209 }
210}
211impl From<Version> for usize {
212 fn from(value: Version) -> Self {
213 usize::try_from(value.0).expect("16 bit systems are unsupported")
214 }
215}
216impl From<&Version> for usize {
217 fn from(value: &Version) -> Self {
218 usize::try_from(value.0).expect("16 bit systems are unsupported")
219 }
220}
221
222#[derive(Debug, thiserror::Error)]
223#[error("version must be u32")]
224pub struct VersionParseError;
225
226#[derive(
227 Clone,
228 Debug,
229 derive_more::Display,
230 PartialEq,
231 Eq,
232 serde::Serialize,
233 serde::Deserialize,
234 schemars::JsonSchema,
235)]
236#[display("{event}")]
237pub struct ExecutionEvent {
238 pub created_at: DateTime<Utc>,
239 pub event: ExecutionRequest,
240 #[serde(skip_serializing_if = "Option::is_none")]
241 pub backtrace_id: Option<Version>,
242 pub version: Version,
243}
244
245#[derive(
246 Debug,
247 Clone,
248 Copy,
249 PartialEq,
250 Eq,
251 derive_more::Display,
252 derive_more::Into,
253 Serialize, schemars::JsonSchema,
255)]
256pub struct ResponseCursor(pub u32);
257
258#[derive(Debug, Clone, PartialEq, Eq, Serialize , schemars::JsonSchema)]
259pub struct ResponseWithCursor {
260 pub event: JoinSetResponseEventOuter,
261 pub cursor: ResponseCursor,
262}
263
264#[derive(Debug)]
265pub struct ListExecutionEventsResponse {
266 pub events: Vec<ExecutionEvent>,
267 pub max_version: Version,
268}
269
270#[derive(Debug)]
271pub struct ListResponsesResponse {
272 pub responses: Vec<ResponseWithCursor>,
273 pub max_cursor: ResponseCursor,
274}
275
276#[derive(Debug, Clone, PartialEq, Eq, Serialize , schemars::JsonSchema)]
277pub struct JoinSetResponseEventOuter {
278 pub created_at: DateTime<Utc>,
279 pub event: JoinSetResponseEvent,
280}
281
282#[derive(
283 Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema,
284)]
285pub struct JoinSetResponseEvent {
286 pub join_set_id: JoinSetId,
287 pub event: JoinSetResponse,
288}
289
290#[derive(
291 Clone, Debug, PartialEq, Eq, Serialize, Deserialize, derive_more::Display, schemars::JsonSchema,
292)]
293#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
294#[serde(tag = "type", rename_all = "snake_case")]
295pub enum JoinSetResponse {
296 #[display("delay {}: {delay_id}", if result.is_ok() { "finished" } else { "cancelled"})]
297 DelayFinished {
298 delay_id: DelayId,
299 result: Result<(), ()>,
300 },
301 #[display("{result}: {child_execution_id}")] ChildExecutionFinished {
303 child_execution_id: ExecutionIdDerived,
304 #[cfg_attr(any(test, feature = "test"), arbitrary(value = Version(2)))]
305 finished_version: Version,
306 #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
307 result: SupportedFunctionReturnValue,
308 },
309}
310
311pub const DUMMY_CREATED: ExecutionRequest = ExecutionRequest::Created {
312 ffqn: FunctionFqn::new_static("", ""),
313 params: Params::empty(),
314 parent: None,
315 scheduled_at: DateTime::from_timestamp_nanos(0),
316 component_id: ComponentId::dummy_activity(),
317 deployment_id: DeploymentId::from_parts(0, 0),
318 metadata: ExecutionMetadata::empty(),
319 scheduled_by: None,
320};
321pub const DUMMY_HISTORY_EVENT: ExecutionRequest = ExecutionRequest::HistoryEvent {
322 event: HistoryEvent::JoinSetCreate {
323 join_set_id: JoinSetId {
324 kind: crate::JoinSetKind::OneOff,
325 name: StrVariant::empty(),
326 },
327 },
328};
329
330#[derive(
331 Clone,
332 derive_more::Debug,
333 derive_more::Display,
334 PartialEq,
335 Eq,
336 Serialize,
337 Deserialize,
338 schemars::JsonSchema,
339)]
340#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
341#[serde(rename_all = "snake_case")]
342pub enum ExecutionRequest {
343 #[display("Created({ffqn}, `{scheduled_at}`)")]
344 Created {
345 ffqn: FunctionFqn,
346 #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
347 #[debug(skip)]
348 params: Params,
349 parent: Option<(ExecutionId, JoinSetId)>,
350 scheduled_at: DateTime<Utc>,
351 #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
352 component_id: ComponentId,
353 deployment_id: DeploymentId,
354 #[cfg_attr(any(test, feature = "test"), arbitrary(default))]
355 metadata: ExecutionMetadata,
356 scheduled_by: Option<ExecutionId>,
357 },
358 Locked(Locked),
359 #[display("Unlocked(`{backoff_expires_at}`)")]
365 Unlocked {
366 backoff_expires_at: DateTime<Utc>,
367 #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
368 reason: StrVariant,
369 },
370 #[display("TemporarilyFailed(`{backoff_expires_at}`)")]
373 TemporarilyFailed {
374 backoff_expires_at: DateTime<Utc>,
375 #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
376 reason: StrVariant,
377 detail: Option<String>,
378 #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
379 http_client_traces: Option<Vec<HttpClientTrace>>,
380 },
381 #[display("TemporarilyTimedOut(`{backoff_expires_at}`)")]
384 TemporarilyTimedOut {
385 backoff_expires_at: DateTime<Utc>,
386 #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
387 http_client_traces: Option<Vec<HttpClientTrace>>,
388 },
389 #[display("Finished")]
391 Finished {
392 #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
393 retval: SupportedFunctionReturnValue,
394 #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
395 http_client_traces: Option<Vec<HttpClientTrace>>,
396 },
397
398 #[display("HistoryEvent({event})")]
399 HistoryEvent {
400 event: HistoryEvent,
401 },
402 #[display("Paused")]
403 Paused,
404 #[display("Unpaused")]
405 Unpaused,
406}
407
408impl ExecutionRequest {
409 #[must_use]
410 pub fn is_temporary_event(&self) -> bool {
411 matches!(
412 self,
413 Self::TemporarilyFailed { .. } | Self::TemporarilyTimedOut { .. }
414 )
415 }
416
417 #[must_use]
419 pub const fn variant(&self) -> &'static str {
420 match self {
421 ExecutionRequest::Created { .. } => "created",
422 ExecutionRequest::Locked(_) => "locked",
423 ExecutionRequest::Unlocked { .. } => "unlocked",
424 ExecutionRequest::TemporarilyFailed { .. } => "temporarily_failed",
425 ExecutionRequest::TemporarilyTimedOut { .. } => "temporarily_timed_out",
426 ExecutionRequest::Finished { .. } => "finished",
427 ExecutionRequest::HistoryEvent { .. } => "history_event",
428 ExecutionRequest::Paused => "paused",
429 ExecutionRequest::Unpaused => "unpaused",
430 }
431 }
432
433 #[must_use]
434 pub fn join_set_id(&self) -> Option<&JoinSetId> {
435 match self {
436 Self::Created {
437 parent: Some((_parent_id, join_set_id)),
438 ..
439 } => Some(join_set_id),
440 Self::HistoryEvent {
441 event:
442 HistoryEvent::JoinSetCreate { join_set_id, .. }
443 | HistoryEvent::JoinSetRequest { join_set_id, .. }
444 | HistoryEvent::JoinNext { join_set_id, .. },
445 } => Some(join_set_id),
446 _ => None,
447 }
448 }
449}
450
451#[derive(
452 Clone,
453 derive_more::Debug,
454 derive_more::Display,
455 PartialEq,
456 Eq,
457 Serialize,
458 Deserialize,
459 schemars::JsonSchema,
460)]
461#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
462#[display("Locked(`{lock_expires_at}`, {component_id})")]
463pub struct Locked {
464 #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
465 pub component_id: ComponentId,
466 pub executor_id: ExecutorId,
467 pub deployment_id: DeploymentId,
468 pub run_id: RunId,
469 pub lock_expires_at: DateTime<Utc>,
470 #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentRetryConfig::ZERO))]
471 pub retry_config: ComponentRetryConfig,
472}
473
474#[derive(
475 Debug,
476 Clone,
477 Copy,
478 PartialEq,
479 Eq,
480 derive_more::Display,
481 Serialize,
482 Deserialize,
483 schemars::JsonSchema,
484)]
485#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
486#[serde(tag = "type", rename_all = "snake_case")]
487pub enum PersistKind {
488 #[display("RandomU64({min}, {max_inclusive})")]
489 RandomU64 {
490 min: u64,
491 max_inclusive: u64,
492 },
493 #[display("RandomString({min_length}, {max_length_exclusive})")]
494 RandomString {
495 min_length: u64,
496 max_length_exclusive: u64,
497 },
498 ExecutionId,
499}
500
501#[must_use]
502pub fn from_u64_to_bytes(value: u64) -> [u8; 8] {
503 value.to_be_bytes()
504}
505
506#[derive(
507 derive_more::Debug,
508 Clone,
509 PartialEq,
510 Eq,
511 derive_more::Display,
512 Serialize,
513 Deserialize,
514 schemars::JsonSchema,
515)]
516#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
517#[serde(tag = "type", rename_all = "snake_case")]
518pub enum HistoryEvent {
520 #[display("Persist")]
522 Persist {
523 #[debug(skip)]
524 value: Vec<u8>, kind: PersistKind,
526 },
527 #[display("JoinSetCreate({join_set_id})")]
528 JoinSetCreate { join_set_id: JoinSetId },
529 #[display("JoinSetRequest({request})")]
530 JoinSetRequest {
532 join_set_id: JoinSetId,
533 request: JoinSetRequest,
534 },
535 #[display("JoinNext({join_set_id})")]
541 JoinNext {
542 join_set_id: JoinSetId,
543 run_expires_at: DateTime<Utc>,
546 requested_ffqn: Option<FunctionFqn>,
549 closing: bool,
551 },
552 #[display("JoinNextTry({join_set_id}, {outcome})")]
554 JoinNextTry {
555 join_set_id: JoinSetId,
556 outcome: JoinNextTryOutcome,
557 },
558 #[display("JoinNextTooMany({join_set_id})")]
560 JoinNextTooMany {
561 join_set_id: JoinSetId,
562 requested_ffqn: Option<FunctionFqn>,
565 },
566 #[display("Schedule({execution_id}, {schedule_at})")]
567 Schedule {
568 execution_id: ExecutionId,
569 schedule_at: HistoryEventScheduleAt, #[cfg_attr(any(test, feature = "test"), arbitrary(value = Ok(())))]
571 result: Result<(), ScheduleRequestError>,
572 },
573 #[display("Stub({target_execution_id})")]
574 Stub {
575 target_execution_id: ExecutionIdDerived,
576 #[cfg_attr(any(test, feature = "test"), arbitrary(value = StubRetVal::Typed(crate::SUPPORTED_RETURN_VALUE_OK_EMPTY).hash()))]
577 retval_hash: StubRetValHash,
578 #[cfg_attr(any(test, feature = "test"), arbitrary(value = Ok(())))]
579 result: Result<(), StubError>,
580 },
581}
582
583#[derive(derive_more::Debug, Clone, PartialEq, Eq)]
585#[cfg_attr(any(test, feature = "test"), derive(Serialize, Deserialize))]
586#[cfg_attr(any(test, feature = "test"), serde(rename_all = "snake_case"))]
587pub enum StubRetVal {
588 Typed(SupportedFunctionReturnValue),
589 Untyped(String),
590}
591
592impl StubRetVal {
593 #[must_use]
595 pub fn hash(&self) -> StubRetValHash {
596 use sha2::{Digest as _, Sha256};
597 const STUB_RETVAL_HASH_VERSION: u8 = 1;
598 let mut hasher = Sha256::default();
599
600 match self {
601 StubRetVal::Typed(val) => {
602 hasher.update(b"T|");
603 let json = serde_json::to_string(val)
605 .expect("SupportedFunctionReturnValue is always serializable");
606 hasher.update(json.as_bytes());
607 }
608 StubRetVal::Untyped(s) => {
609 hasher.update(b"U|");
610 hasher.update(s.as_bytes());
611 }
612 }
613
614 let hash_bytes = hasher.finalize();
615 let mut result = [0u8; 33];
616 result[0] = STUB_RETVAL_HASH_VERSION;
617 result[1..].copy_from_slice(&hash_bytes);
618
619 StubRetValHash(result)
620 }
621}
622
623#[derive(
626 Clone,
627 PartialEq,
628 Eq,
629 serde_with::SerializeDisplay,
630 serde_with::DeserializeFromStr,
631 schemars::JsonSchema,
632)]
633#[schemars(with = "String")]
634pub struct StubRetValHash([u8; 33]);
635
636impl Display for StubRetValHash {
637 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
638 for b in self.0 {
639 write!(f, "{b:02x}")?;
640 }
641 Ok(())
642 }
643}
644
645impl Debug for StubRetValHash {
646 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
647 Display::fmt(self, f)
648 }
649}
650
651impl std::str::FromStr for StubRetValHash {
652 type Err = StubRetValHashParseError;
653
654 fn from_str(s: &str) -> Result<Self, Self::Err> {
655 if s.len() != 66 {
656 return Err(StubRetValHashParseError::InvalidLength(s.len()));
658 }
659 let mut bytes = [0u8; 33];
660 for i in 0..33 {
661 let chunk = &s[i * 2..i * 2 + 2];
662 bytes[i] =
663 u8::from_str_radix(chunk, 16).map_err(|_| StubRetValHashParseError::InvalidHex)?;
664 }
665 Ok(StubRetValHash(bytes))
666 }
667}
668
669#[derive(Debug, thiserror::Error)]
670pub enum StubRetValHashParseError {
671 #[error("invalid length: expected 66 hex chars, got {0}")]
672 InvalidLength(usize),
673 #[error("invalid hex character")]
674 InvalidHex,
675}
676
677#[derive(
680 Debug, Clone, thiserror::Error, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema,
681)]
682#[serde(rename_all = "snake_case")]
683pub enum StubError {
684 #[error("execution not found")]
685 ExecutionNotFound,
686 #[error("type check error: {0}")]
687 TypeCheckError(String),
688 #[error("conflict")]
689 Conflict,
690}
691
692#[derive(
694 Debug, Clone, thiserror::Error, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema,
695)]
696#[serde(rename_all = "snake_case")]
697pub enum ScheduleRequestError {
698 #[error("function not found")]
699 FunctionNotFound,
700 #[error("params parsing error: {0}")]
701 TypeCheckError(String),
702}
703
704#[derive(
706 Debug, Clone, thiserror::Error, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema,
707)]
708#[serde(rename_all = "snake_case")]
709pub enum ChildExecutionRequestError {
710 #[error("function not found")]
711 FunctionNotFound,
712 #[error("params parsing error: {0}")]
713 TypeCheckError(String),
714}
715
716#[derive(
717 Debug,
718 Clone,
719 Copy,
720 PartialEq,
721 Eq,
722 derive_more::Display,
723 Serialize,
724 Deserialize,
725 schemars::JsonSchema,
726)]
727#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
728#[serde(rename_all = "snake_case")]
729pub enum JoinNextTryOutcome {
730 #[display("found")]
732 Found,
733 #[display("pending")]
735 Pending,
736 #[display("all_processed")]
738 AllProcessed,
739}
740
741impl From<bool> for JoinNextTryOutcome {
742 fn from(found_response: bool) -> Self {
746 if found_response {
747 JoinNextTryOutcome::Found
748 } else {
749 JoinNextTryOutcome::Pending
750 }
751 }
752}
753
754#[derive(
755 Debug,
756 Clone,
757 Copy,
758 PartialEq,
759 Eq,
760 derive_more::Display,
761 Serialize,
762 Deserialize,
763 schemars::JsonSchema,
764)]
765#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
766#[serde(rename_all = "snake_case")]
767pub enum HistoryEventScheduleAt {
768 Now,
769 #[display("At(`{_0}`)")]
770 At(DateTime<Utc>),
771 #[display("In({_0:?})")]
772 In(Duration),
773}
774
775#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
776pub enum ScheduleAtConversionError {
777 #[error("source duration value is out of range")]
778 OutOfRangeError,
779}
780
781impl HistoryEventScheduleAt {
782 pub fn as_date_time(
783 &self,
784 now: DateTime<Utc>,
785 ) -> Result<DateTime<Utc>, ScheduleAtConversionError> {
786 match self {
787 Self::Now => Ok(now),
788 Self::At(date_time) => Ok(*date_time),
789 Self::In(duration) => {
790 let time_delta = TimeDelta::from_std(*duration)
791 .map_err(|_| ScheduleAtConversionError::OutOfRangeError)?;
792 now.checked_add_signed(time_delta)
793 .ok_or(ScheduleAtConversionError::OutOfRangeError)
794 }
795 }
796 }
797}
798
799#[derive(
800 Clone, Debug, PartialEq, Eq, derive_more::Display, Serialize, Deserialize, schemars::JsonSchema,
801)]
802#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
803#[serde(tag = "type", rename_all = "snake_case")]
804pub enum JoinSetRequest {
805 #[display("DelayRequest({delay_id}, expires_at: `{expires_at}`, schedule_at: `{schedule_at}`)")]
807 DelayRequest {
808 delay_id: DelayId,
809 expires_at: DateTime<Utc>,
810 schedule_at: HistoryEventScheduleAt,
811 },
812 #[display("ChildExecutionRequest({child_execution_id}, {target_ffqn}, params: {params})")]
814 ChildExecutionRequest {
815 child_execution_id: ExecutionIdDerived,
816 target_ffqn: FunctionFqn,
817 #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
818 params: Params,
819 #[cfg_attr(any(test, feature = "test"), arbitrary(value = Ok(())))]
820 result: Result<(), ChildExecutionRequestError>,
821 },
822}
823
824#[derive(Debug, Clone, thiserror::Error, derive_more::PartialEq, derive_more::Eq)]
826pub enum DbErrorGeneric {
827 #[error("database error: {reason}")]
828 Uncategorized {
829 reason: StrVariant,
830 #[eq(skip)]
831 #[partial_eq(skip)]
832 context: SpanTrace,
833 #[eq(skip)]
834 #[partial_eq(skip)]
835 #[source]
836 source: Option<Arc<dyn std::error::Error + Send + Sync>>,
837 loc: &'static Location<'static>,
838 },
839 #[error("database was closed")]
840 Close,
841}
842
843#[derive(thiserror::Error, Clone, Debug, derive_more::PartialEq, derive_more::Eq)]
844pub enum DbErrorWriteNonRetriable {
845 #[error("validation failed: {0}")]
846 ValidationFailed(StrVariant),
847 #[error("conflict")]
848 Conflict,
849 #[error("already finished")]
850 AlreadyFinished,
851 #[error("illegal state: {reason}")]
852 IllegalState {
853 reason: StrVariant,
854 #[eq(skip)]
855 #[partial_eq(skip)]
856 context: SpanTrace,
857 #[eq(skip)]
858 #[partial_eq(skip)]
859 #[source]
860 source: Option<Arc<dyn std::error::Error + Send + Sync>>,
861 loc: &'static Location<'static>,
862 },
863 #[error("version conflict: expected: {expected}, got: {requested}")]
864 VersionConflict {
865 expected: Version,
866 requested: Version,
867 },
868}
869
870#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
872pub enum DbErrorWrite {
873 #[error("cannot write - row not found")]
874 NotFound,
875 #[error("non-retriable error: {0}")]
876 NonRetriable(#[from] DbErrorWriteNonRetriable),
877 #[error(transparent)]
878 Generic(#[from] DbErrorGeneric),
879}
880
881#[derive(Debug, Clone, thiserror::Error, PartialEq)]
883pub enum DbErrorRead {
884 #[error("cannot read - row not found")]
885 NotFound,
886 #[error(transparent)]
887 Generic(#[from] DbErrorGeneric),
888}
889
890#[derive(Debug, thiserror::Error, PartialEq)]
891pub enum DbErrorReadWithTimeout {
892 #[error("timeout")]
893 Timeout(TimeoutOutcome),
894 #[error(transparent)]
895 DbErrorRead(#[from] DbErrorRead),
896}
897
898pub type AppendResponse = Version;
901pub type PendingExecution = (ExecutionId, Version, Params, Option<DateTime<Utc>>);
902
903#[derive(Debug, Clone)]
904pub struct LockedExecution {
905 pub execution_id: ExecutionId,
906 pub next_version: Version,
907 pub metadata: ExecutionMetadata,
908 pub locked_event: Locked,
909 pub ffqn: FunctionFqn,
910 pub params: Params,
911 pub event_history: Vec<(HistoryEvent, Version)>,
912 pub responses: Vec<ResponseWithCursor>,
913 pub parent: Option<(ExecutionId, JoinSetId)>,
914 pub intermittent_event_count: u32,
915}
916
917pub type LockPendingResponse = Vec<LockedExecution>;
918pub type AppendBatchResponse = Version;
919
920#[derive(Debug, Clone, derive_more::Display, Serialize, Deserialize)]
921#[display("{event}")]
922pub struct AppendRequest {
923 pub created_at: DateTime<Utc>,
924 pub event: ExecutionRequest,
925}
926
927#[derive(Debug, Clone)]
928pub struct CreateRequest {
929 pub created_at: DateTime<Utc>,
930 pub execution_id: ExecutionId,
931 pub ffqn: FunctionFqn,
932 pub params: Params,
933 pub parent: Option<(ExecutionId, JoinSetId)>,
934 pub scheduled_at: DateTime<Utc>,
935 pub component_id: ComponentId,
936 pub deployment_id: DeploymentId,
937 pub metadata: ExecutionMetadata,
938 pub scheduled_by: Option<ExecutionId>,
939}
940
941impl From<CreateRequest> for ExecutionRequest {
942 fn from(value: CreateRequest) -> Self {
943 Self::Created {
944 ffqn: value.ffqn,
945 params: value.params,
946 parent: value.parent,
947 scheduled_at: value.scheduled_at,
948 component_id: value.component_id,
949 deployment_id: value.deployment_id,
950 metadata: value.metadata,
951 scheduled_by: value.scheduled_by,
952 }
953 }
954}
955
956#[async_trait]
957pub trait DbPool: Send + Sync {
958 async fn db_exec_conn(&self) -> Result<Box<dyn DbExecutor>, DbErrorGeneric>;
959
960 async fn connection(&self) -> Result<Box<dyn DbConnection>, DbErrorGeneric>;
961
962 async fn external_api_conn(&self) -> Result<Box<dyn DbExternalApi>, DbErrorGeneric>;
963
964 #[cfg(feature = "test")]
965 async fn connection_test(&self) -> Result<Box<dyn DbConnectionTest>, DbErrorGeneric>;
966}
967
968#[async_trait]
969pub trait DbPoolCloseable {
970 async fn close(&self);
971}
972
973#[derive(Clone, Debug)]
974pub struct AppendEventsToExecution {
975 pub execution_id: ExecutionId,
976 pub version: Version,
977 pub batch: Vec<AppendRequest>,
978}
979
980#[derive(Clone, Debug)]
981pub struct AppendResponseToExecution {
982 pub parent_execution_id: ExecutionId,
983 pub created_at: DateTime<Utc>,
984 pub join_set_id: JoinSetId,
985 pub child_execution_id: ExecutionIdDerived,
986 pub finished_version: Version,
987 pub result: SupportedFunctionReturnValue,
988}
989
990#[async_trait]
991pub trait DbExecutor: Send + Sync {
992 #[expect(clippy::too_many_arguments)]
993 async fn lock_pending_by_ffqns(
994 &self,
995 batch_size: u32,
996 pending_at_or_sooner: DateTime<Utc>,
997 ffqns: Arc<[FunctionFqn]>,
998 created_at: DateTime<Utc>,
999 component_id: ComponentId,
1000 deployment_id: DeploymentId,
1001 executor_id: ExecutorId,
1002 lock_expires_at: DateTime<Utc>,
1003 run_id: RunId,
1004 retry_config: ComponentRetryConfig,
1005 ) -> Result<LockPendingResponse, DbErrorWrite>;
1006
1007 #[expect(clippy::too_many_arguments)]
1008 async fn lock_pending_by_component_digest(
1009 &self,
1010 batch_size: u32,
1011 pending_at_or_sooner: DateTime<Utc>,
1012 component_id: &ComponentId,
1013 deployment_id: DeploymentId,
1014 created_at: DateTime<Utc>,
1015 executor_id: ExecutorId,
1016 lock_expires_at: DateTime<Utc>,
1017 run_id: RunId,
1018 retry_config: ComponentRetryConfig,
1019 ) -> Result<LockPendingResponse, DbErrorWrite>;
1020
1021 #[cfg(feature = "test")]
1022 #[expect(clippy::too_many_arguments)]
1023 async fn lock_one(
1024 &self,
1025 created_at: DateTime<Utc>,
1026 component_id: ComponentId,
1027 deployment_id: DeploymentId,
1028 execution_id: &ExecutionId,
1029 run_id: RunId,
1030 version: Version,
1031 executor_id: ExecutorId,
1032 lock_expires_at: DateTime<Utc>,
1033 retry_config: ComponentRetryConfig,
1034 ) -> Result<LockedExecution, DbErrorWrite>;
1035
1036 async fn append(
1039 &self,
1040 execution_id: ExecutionId,
1041 version: Version,
1042 req: AppendRequest,
1043 ) -> Result<AppendResponse, DbErrorWrite>;
1044
1045 async fn append_batch_respond_to_parent(
1048 &self,
1049 events: AppendEventsToExecution,
1050 response: AppendResponseToExecution,
1051 current_time: DateTime<Utc>, ) -> Result<AppendBatchResponse, DbErrorWrite>;
1053
1054 async fn wait_for_pending_by_ffqn(
1059 &self,
1060 pending_at_or_sooner: DateTime<Utc>,
1061 ffqns: Arc<[FunctionFqn]>,
1062 timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
1063 );
1064
1065 async fn wait_for_pending_by_component_digest(
1070 &self,
1071 pending_at_or_sooner: DateTime<Utc>,
1072 component_digest: &ComponentDigest,
1073 timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
1074 );
1075
1076 async fn cancel_activity_with_retries(
1077 &self,
1078 execution_id: &ExecutionId,
1079 cancelled_at: DateTime<Utc>,
1080 ) -> Result<CancelOutcome, DbErrorWrite> {
1081 let mut retries = 5;
1082 loop {
1083 let res = self.cancel_activity(execution_id, cancelled_at).await;
1084 if res.is_ok() || retries == 0 {
1085 return res;
1086 }
1087 retries -= 1;
1088 }
1089 }
1090
1091 async fn get_last_execution_event(
1093 &self,
1094 execution_id: &ExecutionId,
1095 ) -> Result<ExecutionEvent, DbErrorRead>;
1096
1097 async fn cancel_activity(
1098 &self,
1099 execution_id: &ExecutionId,
1100 cancelled_at: DateTime<Utc>,
1101 ) -> Result<CancelOutcome, DbErrorWrite> {
1102 debug!("Determining cancellation state of {execution_id}");
1103
1104 let last_event = self
1105 .get_last_execution_event(execution_id)
1106 .await
1107 .map_err(DbErrorWrite::from)?;
1108 if let ExecutionRequest::Finished {
1109 retval:
1110 SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError {
1111 kind: ExecutionFailureKind::Cancelled,
1112 ..
1113 }),
1114 ..
1115 } = last_event.event
1116 {
1117 return Ok(CancelOutcome::Cancelled);
1118 } else if matches!(last_event.event, ExecutionRequest::Finished { .. }) {
1119 debug!("Not cancelling, {execution_id} is already finished");
1120 return Ok(CancelOutcome::AlreadyFinished);
1121 }
1122 let finished_version = last_event.version.increment();
1123 let child_result = SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError {
1124 reason: None,
1125 kind: ExecutionFailureKind::Cancelled,
1126 detail: None,
1127 });
1128 let cancel_request = AppendRequest {
1129 created_at: cancelled_at,
1130 event: ExecutionRequest::Finished {
1131 retval: child_result.clone(),
1132 http_client_traces: None,
1133 },
1134 };
1135 debug!("Cancelling activity {execution_id} at {finished_version}");
1136 if let ExecutionId::Derived(execution_id) = execution_id {
1137 let (parent_execution_id, join_set_id) = execution_id.split_to_parts();
1138 let child_execution_id = ExecutionId::Derived(execution_id.clone());
1139 self.append_batch_respond_to_parent(
1140 AppendEventsToExecution {
1141 execution_id: child_execution_id,
1142 version: finished_version.clone(),
1143 batch: vec![cancel_request],
1144 },
1145 AppendResponseToExecution {
1146 parent_execution_id,
1147 created_at: cancelled_at,
1148 join_set_id: join_set_id.clone(),
1149 child_execution_id: execution_id.clone(),
1150 finished_version,
1151 result: child_result,
1152 },
1153 cancelled_at,
1154 )
1155 .await?;
1156 } else {
1157 self.append(execution_id.clone(), finished_version, cancel_request)
1158 .await?;
1159 }
1160 debug!("Cancelled {execution_id}");
1161 Ok(CancelOutcome::Cancelled)
1162 }
1163}
1164
1165pub enum AppendDelayResponseOutcome {
1166 Success,
1167 AlreadyFinished,
1168 AlreadyCancelled,
1169}
1170
1171#[derive(Debug, Clone, Default)]
1172pub struct ListExecutionsFilter {
1173 pub ffqn_prefix: Option<String>,
1174 pub show_derived: bool,
1175 pub hide_finished: bool,
1176 pub execution_id_prefix: Option<String>,
1177 pub component_digest: Option<ComponentDigest>,
1178 pub deployment_id: Option<DeploymentId>,
1179}
1180
1181#[async_trait]
1182pub trait DbExternalApi: DbConnection {
1183 async fn get_backtrace(
1185 &self,
1186 execution_id: &ExecutionId,
1187 filter: BacktraceFilter,
1188 ) -> Result<BacktraceInfo, DbErrorRead>;
1189
1190 async fn upsert_source_file(
1194 &self,
1195 component_digest: &ComponentDigest,
1196 frame_key: &str,
1197 is_suffix: bool,
1198 content: &str,
1199 ) -> Result<(), DbErrorWrite>;
1200
1201 async fn get_source_file(
1205 &self,
1206 component_digest: &ComponentDigest,
1207 file: &str,
1208 ) -> Result<Option<String>, DbErrorRead>;
1209
1210 async fn list_executions(
1212 &self,
1213 filter: ListExecutionsFilter,
1214 pagination: ExecutionListPagination,
1215 ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric>;
1216
1217 async fn list_execution_events(
1222 &self,
1223 execution_id: &ExecutionId,
1224 pagination: Pagination<VersionType>,
1225 include_backtrace_id: bool,
1226 ) -> Result<ListExecutionEventsResponse, DbErrorRead>;
1227
1228 async fn list_responses(
1237 &self,
1238 execution_id: &ExecutionId,
1239 pagination: Pagination<u32>,
1240 ) -> Result<ListResponsesResponse, DbErrorRead>;
1241
1242 async fn list_execution_events_responses(
1243 &self,
1244 execution_id: &ExecutionId,
1245 req_since: &Version,
1246 req_max_length: VersionType,
1247 req_include_backtrace_id: bool,
1248 resp_pagination: Pagination<VersionType>,
1249 ) -> Result<ExecutionWithStateRequestsResponses, DbErrorRead>;
1250
1251 async fn upgrade_execution_component(
1252 &self,
1253 execution_id: &ExecutionId,
1254 old: &ComponentDigest,
1255 new: &ComponentDigest,
1256 ) -> Result<(), DbErrorWrite>;
1257
1258 async fn list_logs(
1259 &self,
1260 execution_id: &ExecutionId,
1261 show_derived: bool,
1262 filter: LogFilter,
1263 pagination: Pagination<DateTime<Utc>>,
1264 ) -> Result<ListLogsResponse, DbErrorRead>;
1265
1266 async fn list_deployment_states(
1267 &self,
1268 current_time: DateTime<Utc>,
1269 pagination: Pagination<Option<DeploymentId>>,
1270 include_config_json: bool,
1271 ) -> Result<Vec<DeploymentState>, DbErrorRead>;
1272
1273 async fn insert_deployment(&self, record: DeploymentRecord) -> Result<(), DbErrorWrite>;
1276
1277 async fn activate_deployment(
1278 &self,
1279 deployment_id: DeploymentId,
1280 now: DateTime<Utc>,
1281 ) -> Result<(), DbErrorWrite>;
1282
1283 async fn enqueue_deployment(&self, deployment_id: DeploymentId) -> Result<(), DbErrorWrite>;
1287
1288 async fn get_deployment(
1290 &self,
1291 deployment_id: DeploymentId,
1292 ) -> Result<Option<DeploymentRecord>, DbErrorRead>;
1293
1294 #[cfg(feature = "test")]
1297 async fn get_active_deployment(&self) -> Result<Option<DeploymentRecord>, DbErrorRead>;
1298
1299 async fn get_current_deployment(&self) -> Result<Option<DeploymentRecord>, DbErrorRead>;
1302
1303 async fn list_deployments(
1304 &self,
1305 pagination: Pagination<Option<DeploymentId>>,
1306 ) -> Result<Vec<DeploymentRecord>, DbErrorRead>;
1307
1308 async fn pause_execution(
1310 &self,
1311 execution_id: &ExecutionId,
1312 paused_at: DateTime<Utc>,
1313 ) -> Result<AppendResponse, DbErrorWrite>;
1314
1315 async fn unpause_execution(
1317 &self,
1318 execution_id: &ExecutionId,
1319 unpaused_at: DateTime<Utc>,
1320 ) -> Result<AppendResponse, DbErrorWrite>;
1321}
1322pub const LIST_DEPLOYMENT_STATES_DEFAULT_LENGTH: u16 = 20;
1323pub const LIST_DEPLOYMENT_STATES_DEFAULT_PAGINATION: Pagination<Option<DeploymentId>> =
1324 Pagination::OlderThan {
1325 length: LIST_DEPLOYMENT_STATES_DEFAULT_LENGTH,
1326 cursor: None,
1327 including_cursor: false,
1328 };
1329
1330pub struct DeploymentState {
1331 pub deployment_id: DeploymentId,
1332 pub locked: u32,
1333 pub pending: u32,
1335 pub scheduled: u32,
1337 pub blocked: u32,
1338 pub finished: u32,
1339 pub config_json: Option<String>,
1341 pub created_at: DateTime<Utc>,
1342 pub last_active_at: Option<DateTime<Utc>>,
1344 pub status: DeploymentStatus,
1345}
1346
1347#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1348pub enum DeploymentStatus {
1349 Inactive,
1350 Enqueued,
1352 Active,
1353}
1354
1355impl DeploymentStatus {
1356 #[must_use]
1357 pub fn as_str(&self) -> &'static str {
1358 match self {
1359 DeploymentStatus::Inactive => "inactive",
1360 DeploymentStatus::Enqueued => "enqueued",
1361 DeploymentStatus::Active => "active",
1362 }
1363 }
1364}
1365
1366impl std::str::FromStr for DeploymentStatus {
1367 type Err = StrVariant;
1368 fn from_str(s: &str) -> Result<Self, Self::Err> {
1369 match s {
1370 "inactive" => Ok(DeploymentStatus::Inactive),
1371 "enqueued" => Ok(DeploymentStatus::Enqueued),
1372 "active" => Ok(DeploymentStatus::Active),
1373 _ => Err(StrVariant::from(format!("unknown deployment status: {s}"))),
1374 }
1375 }
1376}
1377
1378#[derive(Debug, Clone)]
1379pub struct DeploymentRecord {
1380 pub deployment_id: DeploymentId,
1381 pub created_at: DateTime<Utc>,
1382 pub last_active_at: Option<DateTime<Utc>>,
1384 pub status: DeploymentStatus,
1385 pub config_json: String,
1386 pub obelisk_version: String,
1387 pub created_by: Option<String>,
1388}
1389
1390#[derive(Debug)]
1391pub struct ListLogsResponse {
1392 pub items: Vec<LogEntryRow>,
1393 pub next_page: Pagination<DateTime<Utc>>, pub prev_page: Option<Pagination<DateTime<Utc>>>, }
1396
1397#[derive(Debug)]
1398pub struct LogFilter {
1399 show_logs: bool,
1400 show_streams: bool,
1401 levels: Vec<LogLevel>, stream_types: Vec<LogStreamType>, }
1404impl LogFilter {
1405 #[must_use]
1407 pub fn show_logs(levels: Vec<LogLevel>) -> LogFilter {
1408 LogFilter {
1409 show_logs: true,
1410 show_streams: false,
1411 levels,
1412 stream_types: Vec::new(),
1413 }
1414 }
1415 #[must_use]
1417 pub fn show_streams(stream_types: Vec<LogStreamType>) -> LogFilter {
1418 LogFilter {
1419 show_logs: false,
1420 show_streams: true,
1421 levels: Vec::new(),
1422 stream_types,
1423 }
1424 }
1425 #[must_use]
1427 pub fn show_combined(levels: Vec<LogLevel>, stream_types: Vec<LogStreamType>) -> LogFilter {
1428 LogFilter {
1429 show_logs: true,
1430 show_streams: true,
1431 levels,
1432 stream_types,
1433 }
1434 }
1435 #[must_use]
1437 pub fn should_show_logs(&self) -> bool {
1438 self.show_logs
1439 }
1440 #[must_use]
1441 pub fn should_show_streams(&self) -> bool {
1442 self.show_streams
1443 }
1444 #[must_use]
1445 pub fn levels(&self) -> &Vec<LogLevel> {
1446 &self.levels
1447 }
1448 #[must_use]
1449 pub fn stream_types(&self) -> &Vec<LogStreamType> {
1450 &self.stream_types
1451 }
1452}
1453
1454#[derive(Debug, Clone)]
1455pub struct ExecutionWithStateRequestsResponses {
1456 pub execution_with_state: ExecutionWithState,
1457 pub events: Vec<ExecutionEvent>,
1458 pub responses: Vec<ResponseWithCursor>,
1459 pub max_version: Version,
1460 pub max_cursor: ResponseCursor,
1461}
1462
1463#[async_trait]
1464pub trait DbConnection: DbExecutor {
1465 async fn get(&self, execution_id: &ExecutionId) -> Result<ExecutionLog, DbErrorRead>;
1467
1468 async fn append_delay_response(
1469 &self,
1470 created_at: DateTime<Utc>,
1471 execution_id: ExecutionId,
1472 join_set_id: JoinSetId,
1473 delay_id: DelayId,
1474 outcome: Result<(), ()>, ) -> Result<AppendDelayResponseOutcome, DbErrorWrite>;
1476
1477 async fn append_batch(
1480 &self,
1481 current_time: DateTime<Utc>, batch: Vec<AppendRequest>,
1483 execution_id: ExecutionId,
1484 version: Version,
1485 ) -> Result<AppendBatchResponse, DbErrorWrite>;
1486
1487 async fn append_batch_create_new_execution(
1490 &self,
1491 current_time: DateTime<Utc>, batch: Vec<AppendRequest>, execution_id: ExecutionId,
1494 version: Version,
1495 child_req: Vec<CreateRequest>,
1496 backtraces: Vec<BacktraceInfo>,
1497 ) -> Result<AppendBatchResponse, DbErrorWrite>;
1498
1499 async fn get_execution_event(
1501 &self,
1502 execution_id: &ExecutionId,
1503 version: &Version,
1504 ) -> Result<ExecutionEvent, DbErrorRead>;
1505
1506 #[instrument(skip(self))]
1507 async fn get_create_request(
1508 &self,
1509 execution_id: &ExecutionId,
1510 ) -> Result<CreateRequest, DbErrorRead> {
1511 let execution_event = self
1512 .get_execution_event(execution_id, &Version::new(0))
1513 .await?;
1514 if let ExecutionRequest::Created {
1515 ffqn,
1516 params,
1517 parent,
1518 scheduled_at,
1519 component_id,
1520 deployment_id,
1521 metadata,
1522 scheduled_by,
1523 } = execution_event.event
1524 {
1525 Ok(CreateRequest {
1526 created_at: execution_event.created_at,
1527 execution_id: execution_id.clone(),
1528 ffqn,
1529 params,
1530 parent,
1531 scheduled_at,
1532 component_id,
1533 deployment_id,
1534 metadata,
1535 scheduled_by,
1536 })
1537 } else {
1538 Err(DbErrorRead::Generic(DbErrorGeneric::Uncategorized {
1539 reason: "execution log must start with creation".into(),
1540 context: SpanTrace::capture(),
1541 source: None,
1542 loc: Location::caller(),
1543 }))
1544 }
1545 }
1546
1547 async fn get_pending_state(
1548 &self,
1549 execution_id: &ExecutionId,
1550 ) -> Result<ExecutionWithState, DbErrorRead>;
1551
1552 async fn get_expired_timers(
1554 &self,
1555 at: DateTime<Utc>,
1556 ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric>;
1557
1558 async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite>;
1560
1561 async fn subscribe_to_next_responses(
1568 &self,
1569 execution_id: &ExecutionId,
1570 last_response: ResponseCursor,
1571 timeout_fut: Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>,
1572 ) -> Result<Vec<ResponseWithCursor>, DbErrorReadWithTimeout>;
1573
1574 async fn wait_for_finished_result(
1581 &self,
1582 execution_id: &ExecutionId,
1583 timeout_fut: Option<Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>>,
1584 ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout>;
1585
1586 async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite>;
1587
1588 async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite>;
1589
1590 async fn append_log(&self, row: LogInfoAppendRow) -> Result<(), DbErrorWrite>;
1591
1592 async fn append_log_batch(&self, batch: &[LogInfoAppendRow]) -> Result<(), DbErrorWrite>;
1593
1594 #[cfg(feature = "test")]
1596 async fn get_finished_result(
1597 &self,
1598 execution_id: &ExecutionId,
1599 ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout> {
1600 self.wait_for_finished_result(
1601 execution_id,
1602 Some(Box::pin(std::future::ready(TimeoutOutcome::Timeout))),
1603 )
1604 .await
1605 }
1606}
1607
1608#[derive(Clone, Debug)]
1609pub struct LogInfoAppendRow {
1610 pub execution_id: ExecutionId,
1611 pub run_id: RunId,
1612 pub log_entry: LogEntry,
1613}
1614
1615#[derive(Debug, Clone)]
1616pub struct LogEntryRow {
1617 pub cursor: DateTime<Utc>,
1618 pub run_id: RunId,
1619 pub log_entry: LogEntry,
1620 pub execution_id: ExecutionId,
1621}
1622
1623#[derive(Debug, Clone)]
1624pub enum LogEntry {
1625 Log {
1626 created_at: DateTime<Utc>,
1627 level: LogLevel,
1628 message: String,
1629 },
1630 Stream {
1631 created_at: DateTime<Utc>,
1632 payload: Vec<u8>,
1633 stream_type: LogStreamType,
1634 },
1635}
1636impl LogEntry {
1637 #[must_use]
1638 pub fn created_at(&self) -> DateTime<Utc> {
1639 match self {
1640 LogEntry::Log { created_at, .. } | LogEntry::Stream { created_at, .. } => *created_at,
1641 }
1642 }
1643}
1644
1645#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::TryFrom, strum::EnumIter)]
1646#[try_from(repr)]
1647#[repr(u8)]
1648pub enum LogLevel {
1649 Trace = 1,
1650 Debug,
1651 Info,
1652 Warn,
1653 Error,
1654}
1655#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::TryFrom, strum::EnumIter)]
1656#[try_from(repr)]
1657#[repr(u8)]
1658pub enum LogStreamType {
1659 StdOut = 1,
1660 StdErr,
1661}
1662
1663#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1664pub enum TimeoutOutcome {
1665 Timeout,
1666 Cancel,
1667}
1668
1669#[cfg(feature = "test")]
1670#[async_trait]
1671pub trait DbConnectionTest: DbConnection {
1672 async fn append_response(
1673 &self,
1674 created_at: DateTime<Utc>,
1675 execution_id: ExecutionId,
1676 response_event: JoinSetResponseEvent,
1677 ) -> Result<(), DbErrorWrite>;
1678}
1679
1680#[derive(Clone, Copy, Debug, PartialEq, Eq)]
1681pub enum CancelOutcome {
1682 Cancelled,
1683 AlreadyFinished,
1684}
1685
1686#[instrument(skip(db_connection))]
1687pub async fn stub_execution(
1688 db_connection: &dyn DbConnection,
1689 execution_id: ExecutionIdDerived,
1690 parent_execution_id: ExecutionId,
1691 join_set_id: JoinSetId,
1692 created_at: DateTime<Utc>,
1693 return_value: SupportedFunctionReturnValue,
1694) -> Result<(), DbErrorWrite> {
1695 let stub_finished_version = Version::new(1); let write_attempt = {
1698 let finished_req = AppendRequest {
1699 created_at,
1700 event: ExecutionRequest::Finished {
1701 retval: return_value.clone(),
1702 http_client_traces: None,
1703 },
1704 };
1705 db_connection
1706 .append_batch_respond_to_parent(
1707 AppendEventsToExecution {
1708 execution_id: ExecutionId::Derived(execution_id.clone()),
1709 version: stub_finished_version.clone(),
1710 batch: vec![finished_req],
1711 },
1712 AppendResponseToExecution {
1713 parent_execution_id,
1714 created_at,
1715 join_set_id,
1716 child_execution_id: execution_id.clone(),
1717 finished_version: stub_finished_version.clone(),
1718 result: return_value.clone(),
1719 },
1720 created_at,
1721 )
1722 .await
1723 };
1724 if let Err(write_attempt) = write_attempt {
1725 debug!("Stub write attempt failed - {write_attempt:?}");
1727
1728 let found = db_connection
1729 .get_execution_event(&ExecutionId::Derived(execution_id), &stub_finished_version)
1730 .await?; match found.event {
1732 ExecutionRequest::Finished {
1733 retval: found_result,
1734 ..
1735 } if return_value == found_result => {
1736 Ok(())
1738 }
1739 ExecutionRequest::Finished { .. } => Err(DbErrorWrite::NonRetriable(
1740 DbErrorWriteNonRetriable::Conflict,
1741 )),
1742 _other => Err(DbErrorWrite::NonRetriable(
1743 DbErrorWriteNonRetriable::IllegalState {
1744 reason: "unexpected execution event at stubbed execution".into(),
1745 context: SpanTrace::capture(),
1746 source: None,
1747 loc: Location::caller(),
1748 },
1749 )),
1750 }
1751 } else {
1752 Ok(())
1753 }
1754}
1755
1756pub async fn cancel_delay(
1757 db_connection: &dyn DbConnection,
1758 delay_id: DelayId,
1759 cancelled_at: DateTime<Utc>,
1760) -> Result<CancelOutcome, DbErrorWrite> {
1761 let (parent_execution_id, join_set_id) = delay_id.split_to_parts();
1762 db_connection
1763 .append_delay_response(
1764 cancelled_at,
1765 parent_execution_id,
1766 join_set_id,
1767 delay_id,
1768 Err(()), )
1770 .await
1771 .map(|ok| match ok {
1772 AppendDelayResponseOutcome::Success | AppendDelayResponseOutcome::AlreadyCancelled => {
1773 CancelOutcome::Cancelled
1774 }
1775 AppendDelayResponseOutcome::AlreadyFinished => CancelOutcome::AlreadyFinished,
1776 })
1777}
1778
1779#[derive(Clone, Debug)]
1780pub enum BacktraceFilter {
1781 First,
1782 Last,
1783 Specific(Version),
1784}
1785
1786#[derive(Clone, Debug, PartialEq, Eq)]
1787pub struct BacktraceInfo {
1788 pub execution_id: ExecutionId,
1789 pub component_id: ComponentId,
1790 pub version_min_including: Version,
1791 pub version_max_excluding: Version,
1792 pub wasm_backtrace: WasmBacktrace,
1793}
1794
1795#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, schemars::JsonSchema)]
1796pub struct WasmBacktrace {
1797 pub frames: Vec<FrameInfo>,
1798}
1799
1800#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, schemars::JsonSchema)]
1801pub struct FrameInfo {
1802 pub module: String,
1803 pub func_name: String,
1804 pub symbols: Vec<FrameSymbol>,
1805}
1806
1807#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, schemars::JsonSchema)]
1808pub struct FrameSymbol {
1809 pub func_name: Option<String>,
1810 pub file: Option<String>,
1811 pub line: Option<u32>,
1812 pub col: Option<u32>,
1813}
1814
1815mod wasm_backtrace {
1816 use super::{FrameInfo, FrameSymbol, WasmBacktrace};
1817
1818 impl WasmBacktrace {
1819 pub fn maybe_from(backtrace: &wasmtime::WasmBacktrace) -> Option<Self> {
1820 if backtrace.frames().is_empty() {
1821 None
1822 } else {
1823 Some(Self {
1824 frames: backtrace.frames().iter().map(FrameInfo::from).collect(),
1825 })
1826 }
1827 }
1828 }
1829
1830 impl From<&wasmtime::FrameInfo> for FrameInfo {
1831 fn from(frame: &wasmtime::FrameInfo) -> Self {
1832 let module_name = frame.module().name().unwrap_or("<unknown>").to_string();
1833 let mut func_name = String::new();
1834 wasmtime_environ::demangle_function_name_or_index(
1835 &mut func_name,
1836 frame.func_name(),
1837 frame.func_index() as usize,
1838 )
1839 .expect("writing to string must succeed");
1840 Self {
1841 module: module_name,
1842 func_name,
1843 symbols: frame
1844 .symbols()
1845 .iter()
1846 .map(std::convert::Into::into)
1847 .collect(),
1848 }
1849 }
1850 }
1851
1852 impl From<&wasmtime::FrameSymbol> for FrameSymbol {
1853 fn from(symbol: &wasmtime::FrameSymbol) -> Self {
1854 let func_name = symbol.name().map(|name| {
1855 let mut writer = String::new();
1856 wasmtime_environ::demangle_function_name(&mut writer, name)
1857 .expect("writing to string must succeed");
1858 writer
1859 });
1860
1861 Self {
1862 func_name,
1863 file: symbol.file().map(ToString::to_string),
1864 line: symbol.line(),
1865 col: symbol.column(),
1866 }
1867 }
1868 }
1869}
1870#[derive(Debug, Clone, derive_more::Display)]
1871#[display("{execution_id} {pending_state} {component_digest}")]
1872pub struct ExecutionWithState {
1873 pub execution_id: ExecutionId,
1874 pub ffqn: FunctionFqn,
1875 pub pending_state: PendingState,
1876 pub created_at: DateTime<Utc>,
1877 pub first_scheduled_at: DateTime<Utc>,
1878 pub component_digest: ComponentDigest,
1879 pub component_type: ComponentType,
1880 pub deployment_id: DeploymentId,
1881}
1882
1883#[derive(Debug, Clone)]
1884pub enum ExecutionListPagination {
1885 CreatedBy(Pagination<Option<DateTime<Utc>>>),
1886 ExecutionId(Pagination<Option<ExecutionId>>),
1887}
1888impl Default for ExecutionListPagination {
1889 fn default() -> ExecutionListPagination {
1890 ExecutionListPagination::CreatedBy(Pagination::OlderThan {
1891 length: 20,
1892 cursor: None,
1893 including_cursor: false, })
1895 }
1896}
1897impl ExecutionListPagination {
1898 #[must_use]
1899 pub fn length(&self) -> u16 {
1900 match self {
1901 ExecutionListPagination::CreatedBy(pagination) => pagination.length(),
1902 ExecutionListPagination::ExecutionId(pagination) => pagination.length(),
1903 }
1904 }
1905}
1906
1907#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1908pub enum Pagination<T> {
1909 NewerThan {
1910 length: u16,
1911 cursor: T,
1912 including_cursor: bool,
1913 },
1914 OlderThan {
1915 length: u16,
1916 cursor: T,
1917 including_cursor: bool,
1918 },
1919}
1920impl<T: Clone> Pagination<T> {
1921 pub fn length(&self) -> u16 {
1922 match self {
1923 Pagination::NewerThan { length, .. } | Pagination::OlderThan { length, .. } => *length,
1924 }
1925 }
1926
1927 pub fn rel(&self) -> &'static str {
1928 match self {
1929 Pagination::NewerThan {
1930 including_cursor: false,
1931 ..
1932 } => ">",
1933 Pagination::NewerThan {
1934 including_cursor: true,
1935 ..
1936 } => ">=",
1937 Pagination::OlderThan {
1938 including_cursor: false,
1939 ..
1940 } => "<",
1941 Pagination::OlderThan {
1942 including_cursor: true,
1943 ..
1944 } => "<=",
1945 }
1946 }
1947
1948 pub fn is_desc(&self) -> bool {
1949 matches!(self, Pagination::OlderThan { .. })
1950 }
1951
1952 pub fn asc_or_desc(&self) -> &'static str {
1953 if self.is_asc() { "asc" } else { "desc" }
1954 }
1955
1956 pub fn is_asc(&self) -> bool {
1957 !self.is_desc()
1958 }
1959
1960 pub fn cursor(&self) -> &T {
1961 match self {
1962 Pagination::NewerThan { cursor, .. } | Pagination::OlderThan { cursor, .. } => cursor,
1963 }
1964 }
1965
1966 #[must_use]
1967 pub fn invert(&self) -> Self {
1968 match self {
1969 Pagination::NewerThan {
1970 length,
1971 cursor,
1972 including_cursor,
1973 } => Pagination::OlderThan {
1974 length: *length,
1975 cursor: cursor.clone(),
1976 including_cursor: !including_cursor,
1977 },
1978 Pagination::OlderThan {
1979 length,
1980 cursor,
1981 including_cursor,
1982 } => Pagination::NewerThan {
1983 length: *length,
1984 cursor: cursor.clone(),
1985 including_cursor: !including_cursor,
1986 },
1987 }
1988 }
1989}
1990
1991#[cfg(feature = "test")]
1992pub async fn wait_for_pending_state_fn<T: Debug>(
1993 db_connection: &dyn DbConnectionTest,
1994 execution_id: &ExecutionId,
1995 predicate: impl Fn(ExecutionLog) -> Option<T> + Send,
1996 timeout: Option<Duration>,
1997) -> Result<T, DbErrorReadWithTimeout> {
1998 tracing::trace!(%execution_id, "Waiting for predicate");
1999 let fut = async move {
2000 loop {
2001 let execution_log = db_connection.get(execution_id).await?;
2002 if let Some(t) = predicate(execution_log) {
2003 tracing::debug!(%execution_id, "Found: {t:?}");
2004 return Ok(t);
2005 }
2006 tokio::time::sleep(Duration::from_millis(10)).await;
2007 }
2008 };
2009
2010 if let Some(timeout) = timeout {
2011 tokio::select! { res = fut => res,
2013 () = tokio::time::sleep(timeout) => Err(DbErrorReadWithTimeout::Timeout(TimeoutOutcome::Timeout))
2014 }
2015 } else {
2016 fut.await
2017 }
2018}
2019
2020#[derive(Debug, Clone, PartialEq, Eq)]
2021pub enum ExpiredTimer {
2022 Lock(ExpiredLock),
2023 Delay(ExpiredDelay),
2024}
2025
2026#[derive(Debug, Clone, PartialEq, Eq)]
2027pub struct ExpiredLock {
2028 pub execution_id: ExecutionId,
2029 pub locked_at_version: Version,
2031 pub next_version: Version,
2032 pub intermittent_event_count: u32,
2034 pub max_retries: Option<u32>,
2035 pub retry_exp_backoff: Duration,
2036 pub locked_by: LockedBy,
2037}
2038
2039#[derive(Debug, Clone, PartialEq, Eq)]
2040pub struct ExpiredDelay {
2041 pub execution_id: ExecutionId,
2042 pub join_set_id: JoinSetId,
2043 pub delay_id: DelayId,
2044}
2045
2046#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, schemars::JsonSchema)]
2047#[serde(tag = "status", rename_all = "snake_case")]
2048pub enum PendingState {
2049 Locked(PendingStateLocked),
2051
2052 #[display("PendingAt(`{_0}`)")]
2053 PendingAt(PendingStatePendingAt),
2054
2055 #[display("BlockedByJoinSet({_0})")]
2057 BlockedByJoinSet(PendingStateBlockedByJoinSet),
2058
2059 #[display("Paused({_0})")]
2060 Paused(PendingStatePaused),
2061
2062 #[display("Finished({_0})")]
2063 Finished(PendingStateFinished),
2064}
2065
2066pub enum PendingStateMergedPause {
2067 Locked {
2068 state: PendingStateLocked,
2069 paused: bool,
2070 },
2071 PendingAt {
2072 state: PendingStatePendingAt,
2073 paused: bool,
2074 },
2075 BlockedByJoinSet {
2076 state: PendingStateBlockedByJoinSet,
2077 paused: bool,
2078 },
2079 Finished(PendingStateFinished),
2080}
2081impl From<PendingState> for PendingStateMergedPause {
2082 fn from(state: PendingState) -> Self {
2083 match state {
2084 PendingState::Locked(s) => PendingStateMergedPause::Locked {
2085 state: s,
2086 paused: false,
2087 },
2088
2089 PendingState::PendingAt(s) => PendingStateMergedPause::PendingAt {
2090 state: s,
2091 paused: false,
2092 },
2093
2094 PendingState::BlockedByJoinSet(s) => PendingStateMergedPause::BlockedByJoinSet {
2095 state: s,
2096 paused: false,
2097 },
2098
2099 PendingState::Paused(paused) => match paused {
2100 PendingStatePaused::Locked(s) => PendingStateMergedPause::Locked {
2101 state: s,
2102 paused: true,
2103 },
2104 PendingStatePaused::PendingAt(s) => PendingStateMergedPause::PendingAt {
2105 state: s,
2106 paused: true,
2107 },
2108 PendingStatePaused::BlockedByJoinSet(s) => {
2109 PendingStateMergedPause::BlockedByJoinSet {
2110 state: s,
2111 paused: true,
2112 }
2113 }
2114 },
2115
2116 PendingState::Finished(s) => PendingStateMergedPause::Finished(s),
2117 }
2118 }
2119}
2120
2121#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, schemars::JsonSchema)]
2122#[display("Locked(`{lock_expires_at}`, {}, {})", locked_by.executor_id, locked_by.run_id)]
2123pub struct PendingStateLocked {
2124 pub locked_by: LockedBy,
2125 pub lock_expires_at: DateTime<Utc>,
2126}
2127
2128#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, schemars::JsonSchema)]
2129#[display("`{scheduled_at}`, last_lock={last_lock:?}")]
2130pub struct PendingStatePendingAt {
2131 pub scheduled_at: DateTime<Utc>,
2132 pub last_lock: Option<LockedBy>,
2134}
2135
2136#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, schemars::JsonSchema)]
2137#[display("{join_set_id}, `{lock_expires_at}`, closing={closing}")]
2138pub struct PendingStateBlockedByJoinSet {
2139 pub join_set_id: JoinSetId,
2140 pub lock_expires_at: DateTime<Utc>,
2142 pub closing: bool,
2144}
2145
2146#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, schemars::JsonSchema)]
2148pub enum PendingStatePaused {
2149 #[display("Locked({_0})")]
2150 Locked(PendingStateLocked),
2151 #[display("PendingAt({_0})")]
2152 PendingAt(PendingStatePendingAt),
2153 #[display("BlockedByJoinSet({_0})")]
2154 BlockedByJoinSet(PendingStateBlockedByJoinSet),
2155}
2156
2157#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
2158pub struct LockedBy {
2159 pub executor_id: ExecutorId,
2160 pub run_id: RunId,
2161}
2162impl From<&Locked> for LockedBy {
2163 fn from(value: &Locked) -> Self {
2164 LockedBy {
2165 executor_id: value.executor_id,
2166 run_id: value.run_id,
2167 }
2168 }
2169}
2170
2171#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, schemars::JsonSchema)]
2172#[cfg_attr(any(test, feature = "test"), derive(Deserialize))]
2173pub struct PendingStateFinished {
2174 pub version: VersionType, pub finished_at: DateTime<Utc>,
2176 pub result_kind: PendingStateFinishedResultKind,
2177}
2178impl Display for PendingStateFinished {
2179 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2180 match self.result_kind {
2181 PendingStateFinishedResultKind::Ok => write!(f, "ok"),
2182 PendingStateFinishedResultKind::Err(err) => write!(f, "{err}"),
2183 }
2184 }
2185}
2186
2187#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
2189#[serde(rename_all = "snake_case")]
2190pub enum PendingStateFinishedResultKind {
2191 Ok,
2192 Err(PendingStateFinishedError),
2193}
2194impl PendingStateFinishedResultKind {
2195 pub fn as_result(&self) -> Result<(), &PendingStateFinishedError> {
2196 match self {
2197 PendingStateFinishedResultKind::Ok => Ok(()),
2198 PendingStateFinishedResultKind::Err(err) => Err(err),
2199 }
2200 }
2201}
2202
2203impl From<&SupportedFunctionReturnValue> for PendingStateFinishedResultKind {
2204 fn from(result: &SupportedFunctionReturnValue) -> Self {
2205 result.as_pending_state_finished_result()
2206 }
2207}
2208
2209#[derive(
2210 Debug,
2211 Clone,
2212 Copy,
2213 PartialEq,
2214 Eq,
2215 Serialize,
2216 Deserialize,
2217 derive_more::Display,
2218 schemars::JsonSchema,
2219)]
2220#[serde(rename_all = "snake_case")]
2221pub enum PendingStateFinishedError {
2222 #[display("{_0}")]
2223 ExecutionFailure(ExecutionFailureKind),
2224 #[display("completed with an error")]
2225 Error,
2226}
2227
2228impl PendingState {
2229 #[instrument(skip(self))]
2230 pub fn can_append_lock(
2231 &self,
2232 created_at: DateTime<Utc>,
2233 executor_id: ExecutorId,
2234 run_id: RunId,
2235 lock_expires_at: DateTime<Utc>,
2236 ) -> Result<LockKind, DbErrorWriteNonRetriable> {
2237 if lock_expires_at <= created_at {
2238 return Err(DbErrorWriteNonRetriable::ValidationFailed(
2239 "invalid expiry date".into(),
2240 ));
2241 }
2242 match self {
2243 PendingState::PendingAt(PendingStatePendingAt {
2244 scheduled_at,
2245 last_lock,
2246 }) => {
2247 if *scheduled_at <= created_at {
2248 Ok(LockKind::CreatingNewLock)
2250 } else if let Some(LockedBy {
2251 executor_id: last_executor_id,
2252 run_id: last_run_id,
2253 }) = last_lock
2254 && executor_id == *last_executor_id
2255 && run_id == *last_run_id
2256 {
2257 Ok(LockKind::Extending)
2259 } else {
2260 Err(DbErrorWriteNonRetriable::ValidationFailed(
2261 "cannot lock, not yet pending".into(),
2262 ))
2263 }
2264 }
2265 PendingState::Locked(PendingStateLocked {
2266 locked_by:
2267 LockedBy {
2268 executor_id: current_pending_state_executor_id,
2269 run_id: current_pending_state_run_id,
2270 },
2271 lock_expires_at: _,
2272 }) => {
2273 if executor_id == *current_pending_state_executor_id
2274 && run_id == *current_pending_state_run_id
2275 {
2276 Ok(LockKind::Extending)
2278 } else {
2279 Err(DbErrorWriteNonRetriable::IllegalState {
2280 reason: "cannot lock, already locked".into(),
2281 context: SpanTrace::capture(),
2282 source: None,
2283 loc: Location::caller(),
2284 })
2285 }
2286 }
2287 PendingState::BlockedByJoinSet { .. } => Err(DbErrorWriteNonRetriable::IllegalState {
2288 reason: "cannot append Locked event when in BlockedByJoinSet state".into(),
2289 context: SpanTrace::capture(),
2290 source: None,
2291 loc: Location::caller(),
2292 }),
2293 PendingState::Finished { .. } => Err(DbErrorWriteNonRetriable::IllegalState {
2294 reason: "already finished".into(),
2295 context: SpanTrace::capture(),
2296 source: None,
2297 loc: Location::caller(),
2298 }),
2299 PendingState::Paused(..) => Err(DbErrorWriteNonRetriable::IllegalState {
2300 reason: "cannot lock, execution is paused".into(),
2301 context: SpanTrace::capture(),
2302 source: None,
2303 loc: Location::caller(),
2304 }),
2305 }
2306 }
2307
2308 #[must_use]
2309 pub fn is_finished(&self) -> bool {
2310 matches!(self, PendingState::Finished { .. })
2311 }
2312
2313 #[must_use]
2314 pub fn is_paused(&self) -> bool {
2315 matches!(self, PendingState::Paused(_))
2316 }
2317}
2318
2319#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2320pub enum LockKind {
2321 Extending,
2322 CreatingNewLock,
2323}
2324
2325pub mod http_client_trace {
2326 use chrono::{DateTime, Utc};
2327 use serde::{Deserialize, Serialize};
2328
2329 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
2330 pub struct HttpClientTrace {
2331 pub req: RequestTrace,
2332 pub resp: Option<ResponseTrace>,
2333 }
2334
2335 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
2336 pub struct RequestTrace {
2337 pub sent_at: DateTime<Utc>,
2338 pub uri: String,
2339 pub method: String,
2340 }
2341
2342 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
2343 pub struct ResponseTrace {
2344 pub finished_at: DateTime<Utc>,
2345 pub status: Result<u16, String>,
2346 }
2347}
2348
2349#[derive(schemars::JsonSchema)]
2351pub struct DbStorageSchema {
2352 pub execution_event: ExecutionEvent,
2353 pub pending_state: PendingState,
2354 pub join_set_response: JoinSetResponse,
2355 pub wasm_backtrace: WasmBacktrace,
2356}
2357
2358#[cfg(test)]
2359mod tests {
2360 use super::HistoryEvent;
2361 use super::HistoryEventScheduleAt;
2362 use super::JoinNextTryOutcome;
2363 use super::PendingStateFinished;
2364 use super::PendingStateFinishedError;
2365 use super::PendingStateFinishedResultKind;
2366 use crate::ExecutionFailureKind;
2367 use crate::JoinSetId;
2368 use crate::SupportedFunctionReturnValue;
2369 use chrono::DateTime;
2370 use chrono::Datelike;
2371 use chrono::Utc;
2372 use insta::assert_snapshot;
2373 use rstest::rstest;
2374 use std::time::Duration;
2375 use val_json::type_wrapper::TypeWrapper;
2376 use val_json::wast_val::WastVal;
2377 use val_json::wast_val::WastValWithType;
2378
2379 #[rstest(expected => [
2380 PendingStateFinishedResultKind::Ok,
2381 PendingStateFinishedResultKind::Err(PendingStateFinishedError::ExecutionFailure(ExecutionFailureKind::TimedOut)),
2382 ])]
2383 #[test]
2384 fn serde_pending_state_finished_result_kind_should_work(
2385 expected: PendingStateFinishedResultKind,
2386 ) {
2387 let ser = serde_json::to_string(&expected).unwrap();
2388 let actual: PendingStateFinishedResultKind = serde_json::from_str(&ser).unwrap();
2389 assert_eq!(expected, actual);
2390 }
2391
2392 #[rstest(result_kind => [
2393 PendingStateFinishedResultKind::Ok,
2394 PendingStateFinishedResultKind::Err(PendingStateFinishedError::ExecutionFailure(ExecutionFailureKind::TimedOut)),
2395 ])]
2396 #[test]
2397 fn serde_pending_state_finished_should_work(result_kind: PendingStateFinishedResultKind) {
2398 let expected = PendingStateFinished {
2399 version: 0,
2400 finished_at: Utc::now(),
2401 result_kind,
2402 };
2403
2404 let ser = serde_json::to_string(&expected).unwrap();
2405 let actual: PendingStateFinished = serde_json::from_str(&ser).unwrap();
2406 assert_eq!(expected, actual);
2407 }
2408
2409 #[test]
2410 fn join_set_deser_with_result_ok_option_none_should_work() {
2411 let expected = SupportedFunctionReturnValue::Ok(Some(WastValWithType {
2412 r#type: TypeWrapper::Result {
2413 ok: Some(Box::new(TypeWrapper::Option(Box::new(TypeWrapper::String)))),
2414 err: Some(Box::new(TypeWrapper::String)),
2415 },
2416 value: WastVal::Result(Ok(Some(Box::new(WastVal::Option(None))))),
2417 }));
2418 let json = serde_json::to_string(&expected).unwrap();
2419 assert_snapshot!(json);
2420
2421 let actual: SupportedFunctionReturnValue = serde_json::from_str(&json).unwrap();
2422
2423 assert_eq!(expected, actual);
2424 }
2425
2426 #[test]
2427 fn as_date_time_should_work_with_duration_u32_max_secs() {
2428 let duration = Duration::from_secs(u64::from(u32::MAX));
2429 let schedule_at = HistoryEventScheduleAt::In(duration);
2430 let resolved = schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap();
2431 assert_eq!(2106, resolved.year());
2432 }
2433
2434 const MILLIS_PER_SEC: i64 = 1000;
2435 const TIMEDELTA_MAX_SECS: i64 = i64::MAX / MILLIS_PER_SEC;
2436
2437 #[test]
2438 fn as_date_time_should_fail_on_duration_secs_greater_than_i64_max() {
2439 let duration = Duration::from_secs(
2441 u64::try_from(TIMEDELTA_MAX_SECS).expect("positive number must not fail") + 1,
2442 );
2443 let schedule_at = HistoryEventScheduleAt::In(duration);
2444 schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap_err();
2445 }
2446
2447 #[test]
2448 fn join_next_try_outcome_new_format() {
2449 let json = r#"{"type":"join_next_try","join_set_id":"n:test","outcome":"found"}"#;
2450 let event: HistoryEvent = serde_json::from_str(json).unwrap();
2451 assert_eq!(
2452 event,
2453 HistoryEvent::JoinNextTry {
2454 join_set_id: JoinSetId::new(
2455 crate::JoinSetKind::Named,
2456 crate::StrVariant::Static("test")
2457 )
2458 .unwrap(),
2459 outcome: JoinNextTryOutcome::Found,
2460 }
2461 );
2462
2463 let json = r#"{"type":"join_next_try","join_set_id":"n:test","outcome":"all_processed"}"#;
2464 let event: HistoryEvent = serde_json::from_str(json).unwrap();
2465 assert_eq!(
2466 event,
2467 HistoryEvent::JoinNextTry {
2468 join_set_id: JoinSetId::new(
2469 crate::JoinSetKind::Named,
2470 crate::StrVariant::Static("test")
2471 )
2472 .unwrap(),
2473 outcome: JoinNextTryOutcome::AllProcessed,
2474 }
2475 );
2476 }
2477
2478 #[test]
2479 fn join_next_try_outcome_serializes_new_format() {
2480 let event = HistoryEvent::JoinNextTry {
2481 join_set_id: JoinSetId::new(
2482 crate::JoinSetKind::Named,
2483 crate::StrVariant::Static("test"),
2484 )
2485 .unwrap(),
2486 outcome: JoinNextTryOutcome::AllProcessed,
2487 };
2488 let json = serde_json::to_string(&event).unwrap();
2489 assert!(
2490 json.contains(r#""outcome":"all_processed""#),
2491 "expected outcome field, got: {json}"
2492 );
2493 assert!(
2494 !json.contains("found_response"),
2495 "should not contain old field, got: {json}"
2496 );
2497 }
2498
2499 mod stub_retval_hash {
2500 use super::super::{StubRetVal, StubRetValHash};
2501 use crate::SupportedFunctionReturnValue;
2502 use val_json::type_wrapper::TypeWrapper;
2503 use val_json::wast_val::{WastVal, WastValWithType};
2504
2505 #[test]
2506 fn typed_variant_hash_is_stable() {
2507 let retval =
2508 StubRetVal::Typed(SupportedFunctionReturnValue::Ok(Some(WastValWithType {
2509 r#type: TypeWrapper::String,
2510 value: WastVal::String("hello".into()),
2511 })));
2512 let hash = retval.hash();
2513 assert_eq!(hash.to_string().chars().take(2).collect::<String>(), "01");
2515 assert_eq!(hash.to_string().len(), 66);
2517 }
2518
2519 #[test]
2520 fn untyped_variant_hash_is_stable() {
2521 let retval = StubRetVal::Untyped(r#"{"ok": "hello"}"#.to_string());
2522 let hash = retval.hash();
2523 assert_eq!(hash.to_string().chars().take(2).collect::<String>(), "01");
2525 assert_eq!(hash.to_string().len(), 66);
2527 }
2528
2529 #[test]
2530 fn different_values_produce_different_hashes() {
2531 let typed1 = StubRetVal::Typed(SupportedFunctionReturnValue::Ok(None));
2532 let typed2 = StubRetVal::Typed(SupportedFunctionReturnValue::Err(None));
2533 let untyped1 = StubRetVal::Untyped("value1".to_string());
2534 let untyped2 = StubRetVal::Untyped("value2".to_string());
2535
2536 let hashes: Vec<_> = [typed1, typed2, untyped1, untyped2]
2537 .into_iter()
2538 .map(|r| r.hash().to_string())
2539 .collect();
2540
2541 for (i, h1) in hashes.iter().enumerate() {
2543 for h2 in hashes.iter().skip(i + 1) {
2544 assert_ne!(h1, h2, "hashes should be different");
2545 }
2546 }
2547 }
2548
2549 #[test]
2550 fn same_values_produce_same_hashes() {
2551 let retval1 = StubRetVal::Typed(SupportedFunctionReturnValue::Ok(None));
2552 let retval2 = StubRetVal::Typed(SupportedFunctionReturnValue::Ok(None));
2553 assert_eq!(retval1.hash(), retval2.hash());
2554
2555 let untyped1 = StubRetVal::Untyped("test".to_string());
2556 let untyped2 = StubRetVal::Untyped("test".to_string());
2557 assert_eq!(untyped1.hash(), untyped2.hash());
2558 }
2559
2560 #[test]
2561 fn hash_serialization_roundtrip() {
2562 let retval = StubRetVal::Typed(SupportedFunctionReturnValue::Ok(None));
2563 let hash = retval.hash();
2564
2565 let serialized = serde_json::to_string(&hash).unwrap();
2566 let deserialized: StubRetValHash = serde_json::from_str(&serialized).unwrap();
2567
2568 assert_eq!(hash, deserialized);
2569 }
2570
2571 #[test]
2572 fn hash_display_and_fromstr_roundtrip() {
2573 let retval = StubRetVal::Untyped("test value".to_string());
2574 let hash = retval.hash();
2575
2576 let display = hash.to_string();
2577 let parsed: StubRetValHash = display.parse().unwrap();
2578
2579 assert_eq!(hash, parsed);
2580 }
2581
2582 #[test]
2583 fn typed_and_untyped_with_same_content_produce_different_hashes() {
2584 let typed = StubRetVal::Typed(SupportedFunctionReturnValue::Ok(None));
2586 let json_of_typed =
2587 serde_json::to_string(&SupportedFunctionReturnValue::Ok(None)).unwrap();
2588 let untyped = StubRetVal::Untyped(json_of_typed);
2589
2590 assert_ne!(typed.hash(), untyped.hash());
2591 }
2592 }
2593}