1use crate::ComponentId;
2use crate::ComponentRetryConfig;
3use crate::ComponentType;
4use crate::ExecutionFailureKind;
5use crate::ExecutionId;
6use crate::ExecutionMetadata;
7use crate::FinishedExecutionError;
8use crate::FunctionFqn;
9use crate::JoinSetId;
10use crate::Params;
11use crate::StrVariant;
12use crate::SupportedFunctionReturnValue;
13use crate::component_id::ComponentDigest;
14use crate::prefixed_ulid::DelayId;
15use crate::prefixed_ulid::DeploymentId;
16use crate::prefixed_ulid::ExecutionIdDerived;
17use crate::prefixed_ulid::ExecutorId;
18use crate::prefixed_ulid::RunId;
19use assert_matches::assert_matches;
20use async_trait::async_trait;
21use chrono::TimeDelta;
22use chrono::{DateTime, Utc};
23use http_client_trace::HttpClientTrace;
24use serde::Deserialize;
25use serde::Serialize;
26use std::fmt::Debug;
27use std::fmt::Display;
28use std::panic::Location;
29use std::pin::Pin;
30use std::sync::Arc;
31use std::time::Duration;
32use tracing::debug;
33use tracing::instrument;
34use tracing_error::SpanTrace;
35
// String identifiers for execution states.
// NOTE(review): presumably these are the values stored/compared by the
// database layer — confirm against the implementations that use them.

/// Identifier of the "pending at" state.
pub const STATE_PENDING_AT: &str = "pending_at";
/// Identifier of the "blocked by join set" state.
pub const STATE_BLOCKED_BY_JOIN_SET: &str = "blocked_by_join_set";
/// Identifier of the "locked" state.
pub const STATE_LOCKED: &str = "locked";
/// Identifier of the "finished" state.
pub const STATE_FINISHED: &str = "finished";
/// Identifier of the `join_next` history event type.
pub const HISTORY_EVENT_TYPE_JOIN_NEXT: &str = "join_next";

/// Snapshot of one execution: its events, the responses it received and
/// assorted metadata.
///
/// The accessor methods on this type expect `events` to be non-empty and to
/// start with an `ExecutionRequest::Created` event.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct ExecutionLog {
    /// Identifier of the execution this log belongs to.
    pub execution_id: ExecutionId,
    /// Events of the execution, in log order.
    pub events: Vec<ExecutionEvent>,
    /// Join set responses together with their cursors.
    pub responses: Vec<ResponseWithCursor>,
    /// NOTE(review): presumably the version the next appended event must use — confirm.
    pub next_version: Version,
    pub pending_state: PendingState,
    pub component_digest: ComponentDigest,
    pub component_type: ComponentType,
    pub deployment_id: DeploymentId,
}
54
55impl ExecutionLog {
56 #[must_use]
59 pub fn can_be_retried_after(
60 temporary_event_count: u32,
61 max_retries: Option<u32>,
62 retry_exp_backoff: Duration,
63 ) -> Option<Duration> {
64 if temporary_event_count <= max_retries.unwrap_or(u32::MAX) {
66 let duration = retry_exp_backoff * 2_u32.saturating_pow(temporary_event_count - 1);
68 Some(duration)
69 } else {
70 None
71 }
72 }
73
74 #[must_use]
75 pub fn compute_retry_duration_when_retrying_forever(
76 temporary_event_count: u32,
77 retry_exp_backoff: Duration,
78 ) -> Duration {
79 Self::can_be_retried_after(temporary_event_count, None, retry_exp_backoff)
80 .expect("`max_retries` set to MAX must never return None")
81 }
82
    /// Returns the `ffqn` stored in the first (`Created`) event.
    ///
    /// # Panics
    ///
    /// Panics via `assert_matches!` when the log is empty or its first event
    /// is not `ExecutionRequest::Created`.
    #[must_use]
    pub fn ffqn(&self) -> &FunctionFqn {
        assert_matches!(self.events.first(), Some(ExecutionEvent {
            event: ExecutionRequest::Created { ffqn, .. },
            ..
        }) => ffqn)
    }
90
    /// Returns the `params` stored in the first (`Created`) event.
    ///
    /// # Panics
    ///
    /// Panics via `assert_matches!` when the log is empty or its first event
    /// is not `ExecutionRequest::Created`.
    #[must_use]
    pub fn params(&self) -> &Params {
        assert_matches!(self.events.first(), Some(ExecutionEvent {
            event: ExecutionRequest::Created { params, .. },
            ..
        }) => params)
    }
98
    /// Returns a clone of the `parent` stored in the first (`Created`) event:
    /// the parent execution id and join set id, or `None` if no parent was recorded.
    ///
    /// # Panics
    ///
    /// Panics via `assert_matches!` when the log is empty or its first event
    /// is not `ExecutionRequest::Created`.
    #[must_use]
    pub fn parent(&self) -> Option<(ExecutionId, JoinSetId)> {
        assert_matches!(self.events.first(), Some(ExecutionEvent {
            event: ExecutionRequest::Created { parent, .. },
            ..
        }) => parent.clone())
    }
106
107 #[must_use]
108 pub fn last_event(&self) -> &ExecutionEvent {
109 self.events.last().expect("must contain at least one event")
110 }
111
112 #[must_use]
113 pub fn is_finished(&self) -> bool {
114 matches!(
115 self.events.last(),
116 Some(ExecutionEvent {
117 event: ExecutionRequest::Finished { .. },
118 ..
119 })
120 )
121 }
122
123 #[must_use]
124 pub fn as_finished_result(&self) -> Option<SupportedFunctionReturnValue> {
125 if let ExecutionEvent {
126 event: ExecutionRequest::Finished { retval: result, .. },
127 ..
128 } = self.events.last().expect("must contain at least one event")
129 {
130 Some(result.clone())
131 } else {
132 None
133 }
134 }
135
136 pub fn event_history(&self) -> impl Iterator<Item = (HistoryEvent, Version)> + '_ {
137 self.events.iter().filter_map(|event| {
138 if let ExecutionRequest::HistoryEvent { event: eh, .. } = &event.event {
139 Some((eh.clone(), event.version.clone()))
140 } else {
141 None
142 }
143 })
144 }
145
146 #[cfg(feature = "test")]
147 #[must_use]
148 pub fn find_join_set_request(&self, join_set_id: &JoinSetId) -> Option<&JoinSetRequest> {
149 self.events
150 .iter()
151 .find_map(move |event| match &event.event {
152 ExecutionRequest::HistoryEvent {
153 event:
154 HistoryEvent::JoinSetRequest {
155 join_set_id: found,
156 request,
157 },
158 ..
159 } if *join_set_id == *found => Some(request),
160 _ => None,
161 })
162 }
163}
164
/// Integer type backing [`Version`].
pub type VersionType = u32;
/// Newtype around [`VersionType`].
///
/// `#[serde(transparent)]` makes it (de)serialize as the bare integer, and the
/// default value is `Version(0)`.
#[derive(
    Debug,
    Default,
    Clone,
    PartialEq,
    Eq,
    Hash,
    derive_more::Display,
    derive_more::Into,
    serde::Serialize,
    serde::Deserialize,
    schemars::JsonSchema,
)]
#[serde(transparent)]
#[schemars(transparent)]
pub struct Version(pub VersionType);
182impl Version {
183 #[must_use]
184 pub fn new(arg: VersionType) -> Version {
185 Version(arg)
186 }
187
188 #[must_use]
189 pub fn increment(&self) -> Version {
190 Version(self.0 + 1)
191 }
192}
193impl TryFrom<i64> for Version {
194 type Error = VersionParseError;
195 fn try_from(value: i64) -> Result<Self, Self::Error> {
196 VersionType::try_from(value)
197 .map(Version::new)
198 .map_err(|_| VersionParseError)
199 }
200}
201impl From<Version> for usize {
202 fn from(value: Version) -> Self {
203 usize::try_from(value.0).expect("16 bit systems are unsupported")
204 }
205}
206impl From<&Version> for usize {
207 fn from(value: &Version) -> Self {
208 usize::try_from(value.0).expect("16 bit systems are unsupported")
209 }
210}
211
/// Error returned when an integer cannot be represented as a [`Version`].
#[derive(Debug, thiserror::Error)]
#[error("version must be u32")]
pub struct VersionParseError;
215
/// One entry of an execution's event log.
///
/// Its `Display` output is that of the inner `event`.
#[derive(
    Clone,
    Debug,
    derive_more::Display,
    PartialEq,
    Eq,
    serde::Serialize,
    serde::Deserialize,
    schemars::JsonSchema,
)]
#[display("{event}")]
pub struct ExecutionEvent {
    /// Timestamp attached to the event.
    pub created_at: DateTime<Utc>,
    /// What happened.
    pub event: ExecutionRequest,
    /// Optional backtrace reference; left out of the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub backtrace_id: Option<Version>,
    /// Version of this entry.
    pub version: Version,
}
234
/// Cursor identifying a position in a list of responses; a newtype over `u32`.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    derive_more::Display,
    derive_more::Into,
    Serialize,
    schemars::JsonSchema,
)]
pub struct ResponseCursor(pub u32);
247
/// A join set response event paired with its cursor.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, schemars::JsonSchema)]
pub struct ResponseWithCursor {
    pub event: JoinSetResponseEventOuter,
    pub cursor: ResponseCursor,
}
253
/// Result of listing execution events.
#[derive(Debug)]
pub struct ListExecutionEventsResponse {
    /// The returned events.
    pub events: Vec<ExecutionEvent>,
    /// NOTE(review): presumably the highest version present for the execution,
    /// independent of pagination — confirm with the implementations.
    pub max_version: Version,
}
259
/// Result of listing responses.
#[derive(Debug)]
pub struct ListResponsesResponse {
    /// The returned responses.
    pub responses: Vec<ResponseWithCursor>,
    /// NOTE(review): presumably the highest cursor present for the execution,
    /// independent of pagination — confirm with the implementations.
    pub max_cursor: ResponseCursor,
}
265
/// A [`JoinSetResponseEvent`] together with its timestamp.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, schemars::JsonSchema)]
pub struct JoinSetResponseEventOuter {
    pub created_at: DateTime<Utc>,
    pub event: JoinSetResponseEvent,
}
271
/// A response together with the join set it belongs to.
#[derive(
    Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema,
)]
pub struct JoinSetResponseEvent {
    pub join_set_id: JoinSetId,
    pub event: JoinSetResponse,
}
279
/// Response delivered to a join set.
///
/// Serialized with an internal `type` tag holding the snake_case variant name.
#[derive(
    Clone, Debug, PartialEq, Eq, Serialize, Deserialize, derive_more::Display, schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum JoinSetResponse {
    /// A delay ended; displayed as "finished" when `result` is `Ok` and as
    /// "cancelled" when it is `Err`.
    #[display("delay {}: {delay_id}", if result.is_ok() { "finished" } else { "cancelled"})]
    DelayFinished {
        delay_id: DelayId,
        result: Result<(), ()>,
    },
    /// A child execution finished with `result`.
    #[display("{result}: {child_execution_id}")]
    ChildExecutionFinished {
        child_execution_id: ExecutionIdDerived,
        // NOTE(review): presumably the version of the child's `Finished` event — confirm.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Version(2)))]
        finished_version: Version,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
        result: SupportedFunctionReturnValue,
    },
}
300
/// Placeholder `Created` request built from empty/zero values: empty function
/// name and params, no parent, timestamp 0, a dummy activity component id and
/// a zero deployment id.
pub const DUMMY_CREATED: ExecutionRequest = ExecutionRequest::Created {
    ffqn: FunctionFqn::new_static("", ""),
    params: Params::empty(),
    parent: None,
    scheduled_at: DateTime::from_timestamp_nanos(0),
    component_id: ComponentId::dummy_activity(),
    deployment_id: DeploymentId::from_parts(0, 0),
    metadata: ExecutionMetadata::empty(),
    scheduled_by: None,
};
/// Placeholder `HistoryEvent` request: a `JoinSetCreate` for a one-off join
/// set with an empty name.
pub const DUMMY_HISTORY_EVENT: ExecutionRequest = ExecutionRequest::HistoryEvent {
    event: HistoryEvent::JoinSetCreate {
        join_set_id: JoinSetId {
            kind: crate::JoinSetKind::OneOff,
            name: StrVariant::empty(),
        },
    },
};
319
/// Payload of an execution log entry.
///
/// Serialized with snake_case variant names.
#[derive(
    Clone,
    derive_more::Debug,
    derive_more::Display,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(rename_all = "snake_case")]
pub enum ExecutionRequest {
    /// The execution was created. `ExecutionLog` accessors expect this to be
    /// the first event of a log.
    #[display("Created({ffqn}, `{scheduled_at}`)")]
    Created {
        ffqn: FunctionFqn,
        // Left out of the `Debug` output.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
        #[debug(skip)]
        params: Params,
        /// Parent execution and join set, if any.
        parent: Option<(ExecutionId, JoinSetId)>,
        scheduled_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
        component_id: ComponentId,
        deployment_id: DeploymentId,
        #[cfg_attr(any(test, feature = "test"), arbitrary(default))]
        metadata: ExecutionMetadata,
        scheduled_by: Option<ExecutionId>,
    },
    /// The execution was locked; see [`Locked`] for the details.
    Locked(Locked),
    /// NOTE(review): presumably the lock was released and the execution
    /// becomes eligible again at `backoff_expires_at` — confirm.
    #[display("Unlocked(`{backoff_expires_at}`)")]
    Unlocked {
        backoff_expires_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
        reason: StrVariant,
    },
    /// A temporary failure; counted as a temporary event by `is_temporary_event`.
    #[display("TemporarilyFailed(`{backoff_expires_at}`)")]
    TemporarilyFailed {
        backoff_expires_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
        reason: StrVariant,
        detail: Option<String>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },
    /// A temporary timeout; counted as a temporary event by `is_temporary_event`.
    #[display("TemporarilyTimedOut(`{backoff_expires_at}`)")]
    TemporarilyTimedOut {
        backoff_expires_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },
    /// The execution finished with `retval`.
    #[display("Finished")]
    Finished {
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
        retval: SupportedFunctionReturnValue,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },

    /// Wraps a [`HistoryEvent`].
    #[display("HistoryEvent({event})")]
    HistoryEvent {
        event: HistoryEvent,
    },
    #[display("Paused")]
    Paused,
    #[display("Unpaused")]
    Unpaused,
}
397
398impl ExecutionRequest {
399 #[must_use]
400 pub fn is_temporary_event(&self) -> bool {
401 matches!(
402 self,
403 Self::TemporarilyFailed { .. } | Self::TemporarilyTimedOut { .. }
404 )
405 }
406
407 #[must_use]
409 pub const fn variant(&self) -> &'static str {
410 match self {
411 ExecutionRequest::Created { .. } => "created",
412 ExecutionRequest::Locked(_) => "locked",
413 ExecutionRequest::Unlocked { .. } => "unlocked",
414 ExecutionRequest::TemporarilyFailed { .. } => "temporarily_failed",
415 ExecutionRequest::TemporarilyTimedOut { .. } => "temporarily_timed_out",
416 ExecutionRequest::Finished { .. } => "finished",
417 ExecutionRequest::HistoryEvent { .. } => "history_event",
418 ExecutionRequest::Paused => "paused",
419 ExecutionRequest::Unpaused => "unpaused",
420 }
421 }
422
423 #[must_use]
424 pub fn join_set_id(&self) -> Option<&JoinSetId> {
425 match self {
426 Self::Created {
427 parent: Some((_parent_id, join_set_id)),
428 ..
429 } => Some(join_set_id),
430 Self::HistoryEvent {
431 event:
432 HistoryEvent::JoinSetCreate { join_set_id, .. }
433 | HistoryEvent::JoinSetRequest { join_set_id, .. }
434 | HistoryEvent::JoinNext { join_set_id, .. },
435 } => Some(join_set_id),
436 _ => None,
437 }
438 }
439}
440
/// Details of a lock placed on an execution; payload of
/// `ExecutionRequest::Locked`.
#[derive(
    Clone,
    derive_more::Debug,
    derive_more::Display,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[display("Locked(`{lock_expires_at}`, {component_id})")]
pub struct Locked {
    #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
    pub component_id: ComponentId,
    pub executor_id: ExecutorId,
    pub deployment_id: DeploymentId,
    pub run_id: RunId,
    /// NOTE(review): presumably the instant at which the lock stops being valid — confirm.
    pub lock_expires_at: DateTime<Utc>,
    #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentRetryConfig::ZERO))]
    pub retry_config: ComponentRetryConfig,
}
463
/// Kind of value recorded by a `HistoryEvent::Persist` event.
///
/// Serialized with an internal `type` tag holding the snake_case variant name.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    derive_more::Display,
    Serialize,
    Deserialize,
    schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum PersistKind {
    /// A random `u64` requested with the bounds `min` and `max_inclusive`.
    #[display("RandomU64({min}, {max_inclusive})")]
    RandomU64 {
        min: u64,
        max_inclusive: u64,
    },
    /// A random string requested with the bounds `min_length` and `max_length_exclusive`.
    #[display("RandomString({min_length}, {max_length_exclusive})")]
    RandomString {
        min_length: u64,
        max_length_exclusive: u64,
    },
    /// NOTE(review): presumably a persisted generated execution id — confirm.
    ExecutionId,
}
490
/// Encodes `value` as eight bytes in big-endian order (most significant byte first).
#[must_use]
pub fn from_u64_to_bytes(value: u64) -> [u8; 8] {
    u64::to_be_bytes(value)
}
495
/// Event recorded in an execution's history.
///
/// Serialized with an internal `type` tag holding the snake_case variant name.
#[derive(
    derive_more::Debug,
    Clone,
    PartialEq,
    Eq,
    derive_more::Display,
    Serialize,
    Deserialize,
    schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum HistoryEvent {
    /// A persisted value of the given `kind`.
    #[display("Persist")]
    Persist {
        // Raw bytes; left out of the `Debug` output.
        #[debug(skip)]
        value: Vec<u8>,
        kind: PersistKind,
    },
    /// A join set was created.
    #[display("JoinSetCreate({join_set_id})")]
    JoinSetCreate { join_set_id: JoinSetId },
    /// A request (delay or child execution) was submitted to a join set.
    #[display("JoinSetRequest({request})")]
    JoinSetRequest {
        join_set_id: JoinSetId,
        request: JoinSetRequest,
    },
    /// NOTE(review): presumably a wait for the next response of the join set — confirm.
    #[display("JoinNext({join_set_id})")]
    JoinNext {
        join_set_id: JoinSetId,
        run_expires_at: DateTime<Utc>,
        requested_ffqn: Option<FunctionFqn>,
        closing: bool,
    },
    /// NOTE(review): presumably a non-blocking attempt to obtain the next
    /// response, with its `outcome` — confirm.
    #[display("JoinNextTry({join_set_id}, {outcome})")]
    JoinNextTry {
        join_set_id: JoinSetId,
        outcome: JoinNextTryOutcome,
    },
    #[display("JoinNextTooMany({join_set_id})")]
    JoinNextTooMany {
        join_set_id: JoinSetId,
        requested_ffqn: Option<FunctionFqn>,
    },
    /// An execution was scheduled; `result` records whether the request was accepted.
    #[display("Schedule({execution_id}, {schedule_at})")]
    Schedule {
        execution_id: ExecutionId,
        schedule_at: HistoryEventScheduleAt,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Ok(())))]
        result: Result<(), ScheduleRequestError>,
    },
    /// A stub request for `target_execution_id`; only the hash of the return
    /// value is stored.
    #[display("Stub({target_execution_id})")]
    Stub {
        target_execution_id: ExecutionIdDerived,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StubRetVal::Typed(crate::SUPPORTED_RETURN_VALUE_OK_EMPTY).hash()))]
        retval_hash: StubRetValHash,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Ok(())))]
        result: Result<(), StubError>,
    },
}
572
/// Return value supplied for a stub, either already typed or as a raw string.
///
/// (De)serialization is only derived in test builds.
#[derive(derive_more::Debug, Clone, PartialEq, Eq)]
#[cfg_attr(any(test, feature = "test"), derive(Serialize, Deserialize))]
#[cfg_attr(any(test, feature = "test"), serde(rename_all = "snake_case"))]
pub enum StubRetVal {
    Typed(SupportedFunctionReturnValue),
    Untyped(String),
}
581
582impl StubRetVal {
583 #[must_use]
585 pub fn hash(&self) -> StubRetValHash {
586 use sha2::{Digest as _, Sha256};
587 const STUB_RETVAL_HASH_VERSION: u8 = 1;
588 let mut hasher = Sha256::default();
589
590 match self {
591 StubRetVal::Typed(val) => {
592 hasher.update(b"T|");
593 let json = serde_json::to_string(val)
595 .expect("SupportedFunctionReturnValue is always serializable");
596 hasher.update(json.as_bytes());
597 }
598 StubRetVal::Untyped(s) => {
599 hasher.update(b"U|");
600 hasher.update(s.as_bytes());
601 }
602 }
603
604 let hash_bytes = hasher.finalize();
605 let mut result = [0u8; 33];
606 result[0] = STUB_RETVAL_HASH_VERSION;
607 result[1..].copy_from_slice(&hash_bytes);
608
609 StubRetValHash(result)
610 }
611}
612
/// 33-byte hash of a [`StubRetVal`]: a version byte followed by a 32-byte digest.
///
/// Serialized through its `Display` form and deserialized through `FromStr`;
/// the JSON schema describes it as a string.
#[derive(
    Clone,
    PartialEq,
    Eq,
    serde_with::SerializeDisplay,
    serde_with::DeserializeFromStr,
    schemars::JsonSchema,
)]
#[schemars(with = "String")]
pub struct StubRetValHash([u8; 33]);
625
626impl Display for StubRetValHash {
627 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
628 for b in self.0 {
629 write!(f, "{b:02x}")?;
630 }
631 Ok(())
632 }
633}
634
635impl Debug for StubRetValHash {
636 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
637 Display::fmt(self, f)
638 }
639}
640
641impl std::str::FromStr for StubRetValHash {
642 type Err = StubRetValHashParseError;
643
644 fn from_str(s: &str) -> Result<Self, Self::Err> {
645 if s.len() != 66 {
646 return Err(StubRetValHashParseError::InvalidLength(s.len()));
648 }
649 let mut bytes = [0u8; 33];
650 for i in 0..33 {
651 let chunk = &s[i * 2..i * 2 + 2];
652 bytes[i] =
653 u8::from_str_radix(chunk, 16).map_err(|_| StubRetValHashParseError::InvalidHex)?;
654 }
655 Ok(StubRetValHash(bytes))
656 }
657}
658
/// Error returned when parsing a [`StubRetValHash`] from a string fails.
#[derive(Debug, thiserror::Error)]
pub enum StubRetValHashParseError {
    /// The input did not have the expected length; carries the actual length.
    #[error("invalid length: expected 66 hex chars, got {0}")]
    InvalidLength(usize),
    /// The input contained something that is not valid hexadecimal.
    #[error("invalid hex character")]
    InvalidHex,
}
666
/// Failure recorded in the `result` of a `HistoryEvent::Stub`.
///
/// Serialized with snake_case variant names.
#[derive(
    Debug, Clone, thiserror::Error, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema,
)]
#[serde(rename_all = "snake_case")]
pub enum StubError {
    #[error("execution not found")]
    ExecutionNotFound,
    /// Carries a description of the type check failure.
    #[error("type check error: {0}")]
    TypeCheckError(String),
    #[error("conflict")]
    Conflict,
}
681
/// Failure recorded in the `result` of a `HistoryEvent::Schedule`.
///
/// Serialized with snake_case variant names.
#[derive(
    Debug, Clone, thiserror::Error, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema,
)]
#[serde(rename_all = "snake_case")]
pub enum ScheduleRequestError {
    #[error("function not found")]
    FunctionNotFound,
    /// Carries a description of the failure; displayed as a params parsing error.
    #[error("params parsing error: {0}")]
    TypeCheckError(String),
}
693
/// Failure recorded in the `result` of a `JoinSetRequest::ChildExecutionRequest`.
///
/// Serialized with snake_case variant names.
#[derive(
    Debug, Clone, thiserror::Error, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema,
)]
#[serde(rename_all = "snake_case")]
pub enum ChildExecutionRequestError {
    #[error("function not found")]
    FunctionNotFound,
    /// Carries a description of the failure; displayed as a params parsing error.
    #[error("params parsing error: {0}")]
    TypeCheckError(String),
}
705
/// Outcome stored in a `HistoryEvent::JoinNextTry`.
///
/// Serialized with snake_case variant names, which match the `Display` output.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    derive_more::Display,
    Serialize,
    Deserialize,
    schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(rename_all = "snake_case")]
pub enum JoinNextTryOutcome {
    /// A response was found.
    #[display("found")]
    Found,
    /// No response was found (this is what `From<bool>` produces for `false`).
    #[display("pending")]
    Pending,
    /// NOTE(review): presumably every response of the join set has already
    /// been consumed — confirm.
    #[display("all_processed")]
    AllProcessed,
}
730
731impl From<bool> for JoinNextTryOutcome {
732 fn from(found_response: bool) -> Self {
736 if found_response {
737 JoinNextTryOutcome::Found
738 } else {
739 JoinNextTryOutcome::Pending
740 }
741 }
742}
743
/// When something should be scheduled: immediately, at an absolute time, or
/// after a duration. Resolved to a concrete time by `as_date_time`.
///
/// Serialized with snake_case variant names.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    derive_more::Display,
    Serialize,
    Deserialize,
    schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(rename_all = "snake_case")]
pub enum HistoryEventScheduleAt {
    /// Resolves to the current time passed to `as_date_time`.
    Now,
    /// Resolves to the contained absolute time.
    #[display("At(`{_0}`)")]
    At(DateTime<Utc>),
    /// Resolves to the current time plus the contained duration.
    #[display("In({_0:?})")]
    In(Duration),
}
764
/// Error returned by `HistoryEventScheduleAt::as_date_time`.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum ScheduleAtConversionError {
    /// The duration could not be converted or the resulting time is not representable.
    #[error("source duration value is out of range")]
    OutOfRangeError,
}
770
771impl HistoryEventScheduleAt {
772 pub fn as_date_time(
773 &self,
774 now: DateTime<Utc>,
775 ) -> Result<DateTime<Utc>, ScheduleAtConversionError> {
776 match self {
777 Self::Now => Ok(now),
778 Self::At(date_time) => Ok(*date_time),
779 Self::In(duration) => {
780 let time_delta = TimeDelta::from_std(*duration)
781 .map_err(|_| ScheduleAtConversionError::OutOfRangeError)?;
782 now.checked_add_signed(time_delta)
783 .ok_or(ScheduleAtConversionError::OutOfRangeError)
784 }
785 }
786 }
787}
788
/// Request submitted to a join set; payload of `HistoryEvent::JoinSetRequest`.
///
/// Serialized with an internal `type` tag holding the snake_case variant name.
#[derive(
    Clone, Debug, PartialEq, Eq, derive_more::Display, Serialize, Deserialize, schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum JoinSetRequest {
    /// A delay request.
    // NOTE(review): `expires_at` is presumably the absolute time resolved from
    // `schedule_at` — confirm.
    #[display("DelayRequest({delay_id}, expires_at: `{expires_at}`, schedule_at: `{schedule_at}`)")]
    DelayRequest {
        delay_id: DelayId,
        expires_at: DateTime<Utc>,
        schedule_at: HistoryEventScheduleAt,
    },
    /// A request to run `target_ffqn` as a child execution; `result` records
    /// whether the request was accepted.
    #[display("ChildExecutionRequest({child_execution_id}, {target_ffqn}, params: {params})")]
    ChildExecutionRequest {
        child_execution_id: ExecutionIdDerived,
        target_ffqn: FunctionFqn,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
        params: Params,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Ok(())))]
        result: Result<(), ChildExecutionRequestError>,
    },
}
813
/// Database errors that are not specific to reading or writing.
///
/// Equality ignores the `context` and `source` fields of `Uncategorized`.
#[derive(Debug, Clone, thiserror::Error, derive_more::PartialEq, derive_more::Eq)]
pub enum DbErrorGeneric {
    /// Any database error without a more specific category.
    #[error("database error: {reason}")]
    Uncategorized {
        reason: StrVariant,
        // Captured span trace; skipped when comparing.
        #[eq(skip)]
        #[partial_eq(skip)]
        context: SpanTrace,
        // Underlying error, if any; skipped when comparing.
        #[eq(skip)]
        #[partial_eq(skip)]
        #[source]
        source: Option<Arc<dyn std::error::Error + Send + Sync>>,
        // Source code location attached to the error.
        loc: &'static Location<'static>,
    },
    /// The database was closed.
    #[error("database was closed")]
    Close,
}
832
/// Write errors classified as non-retriable.
///
/// Equality ignores the `context` and `source` fields of `IllegalState`.
#[derive(thiserror::Error, Clone, Debug, derive_more::PartialEq, derive_more::Eq)]
pub enum DbErrorWriteNonRetriable {
    #[error("validation failed: {0}")]
    ValidationFailed(StrVariant),
    #[error("conflict")]
    Conflict,
    #[error("already finished")]
    AlreadyFinished,
    #[error("illegal state: {reason}")]
    IllegalState {
        reason: StrVariant,
        // Captured span trace; skipped when comparing.
        #[eq(skip)]
        #[partial_eq(skip)]
        context: SpanTrace,
        // Underlying error, if any; skipped when comparing.
        #[eq(skip)]
        #[partial_eq(skip)]
        #[source]
        source: Option<Arc<dyn std::error::Error + Send + Sync>>,
        // Source code location attached to the error.
        loc: &'static Location<'static>,
    },
    /// The requested version differs from the expected one.
    #[error("version conflict: expected: {expected}, got: {requested}")]
    VersionConflict {
        expected: Version,
        requested: Version,
    },
}
859
/// Error returned by database write operations.
#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
pub enum DbErrorWrite {
    #[error("cannot write - row not found")]
    NotFound,
    /// Wraps a [`DbErrorWriteNonRetriable`]; convertible with `?` via `From`.
    #[error("non-retriable error: {0}")]
    NonRetriable(#[from] DbErrorWriteNonRetriable),
    /// Wraps a [`DbErrorGeneric`], forwarding its message and source.
    #[error(transparent)]
    Generic(#[from] DbErrorGeneric),
}
870
/// Error returned by database read operations.
#[derive(Debug, Clone, thiserror::Error, PartialEq)]
pub enum DbErrorRead {
    #[error("cannot read - row not found")]
    NotFound,
    /// Wraps a [`DbErrorGeneric`], forwarding its message and source.
    #[error(transparent)]
    Generic(#[from] DbErrorGeneric),
}
879
/// Error returned by read operations that can also time out.
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum DbErrorReadWithTimeout {
    /// The operation timed out; carries the timeout outcome.
    #[error("timeout")]
    Timeout(TimeoutOutcome),
    /// Wraps a [`DbErrorRead`], forwarding its message and source.
    #[error(transparent)]
    DbErrorRead(#[from] DbErrorRead),
}
887
/// Value returned by `DbExecutor::append`.
// NOTE(review): presumably the version to use for the next append — confirm.
pub type AppendResponse = Version;
/// Tuple of execution id, version, params and an optional timestamp.
// NOTE(review): the meaning of the optional timestamp is not visible here — confirm.
pub type PendingExecution = (ExecutionId, Version, Params, Option<DateTime<Utc>>);
892
/// An execution returned by the locking methods of `DbExecutor`, bundled with
/// the data read alongside the lock.
#[derive(Debug, Clone)]
pub struct LockedExecution {
    pub execution_id: ExecutionId,
    pub next_version: Version,
    pub metadata: ExecutionMetadata,
    /// The lock details.
    pub locked_event: Locked,
    pub ffqn: FunctionFqn,
    pub params: Params,
    /// History events paired with their versions.
    pub event_history: Vec<(HistoryEvent, Version)>,
    pub responses: Vec<ResponseWithCursor>,
    pub parent: Option<(ExecutionId, JoinSetId)>,
    // NOTE(review): presumably the number of temporary failure/timeout events
    // seen so far — confirm.
    pub intermittent_event_count: u32,
}
906
/// Executions returned by the `lock_pending_*` methods of `DbExecutor`.
pub type LockPendingResponse = Vec<LockedExecution>;
/// Value returned by `DbExecutor::append_batch_respond_to_parent`.
pub type AppendBatchResponse = Version;
909
/// An event to append, together with its timestamp.
///
/// Its `Display` output is that of the inner `event`.
#[derive(Debug, Clone, derive_more::Display, Serialize, Deserialize)]
#[display("{event}")]
pub struct AppendRequest {
    pub created_at: DateTime<Utc>,
    pub event: ExecutionRequest,
}
916
/// Request describing an execution to create.
///
/// Converts into `ExecutionRequest::Created`; `created_at` and `execution_id`
/// are not part of that event.
#[derive(Debug, Clone)]
pub struct CreateRequest {
    pub created_at: DateTime<Utc>,
    pub execution_id: ExecutionId,
    pub ffqn: FunctionFqn,
    pub params: Params,
    /// Parent execution and join set, if any.
    pub parent: Option<(ExecutionId, JoinSetId)>,
    pub scheduled_at: DateTime<Utc>,
    pub component_id: ComponentId,
    pub deployment_id: DeploymentId,
    pub metadata: ExecutionMetadata,
    pub scheduled_by: Option<ExecutionId>,
}
930
931impl From<CreateRequest> for ExecutionRequest {
932 fn from(value: CreateRequest) -> Self {
933 Self::Created {
934 ffqn: value.ffqn,
935 params: value.params,
936 parent: value.parent,
937 scheduled_at: value.scheduled_at,
938 component_id: value.component_id,
939 deployment_id: value.deployment_id,
940 metadata: value.metadata,
941 scheduled_by: value.scheduled_by,
942 }
943 }
944}
945
/// Source of database handles, one method per API surface.
#[async_trait]
pub trait DbPool: Send + Sync {
    /// Returns a handle implementing [`DbExecutor`].
    async fn db_exec_conn(&self) -> Result<Box<dyn DbExecutor>, DbErrorGeneric>;

    /// Returns a handle implementing `DbConnection`.
    async fn connection(&self) -> Result<Box<dyn DbConnection>, DbErrorGeneric>;

    /// Returns a handle implementing [`DbExternalApi`].
    async fn external_api_conn(&self) -> Result<Box<dyn DbExternalApi>, DbErrorGeneric>;

    /// Returns a handle implementing `DbConnectionTest`; only available with
    /// the `test` feature.
    #[cfg(feature = "test")]
    async fn connection_test(&self) -> Result<Box<dyn DbConnectionTest>, DbErrorGeneric>;
}
957
/// A pool that can be closed.
#[async_trait]
pub trait DbPoolCloseable {
    /// Closes the pool.
    async fn close(&self);
}
962
/// A batch of events to append to one execution.
#[derive(Clone, Debug)]
pub struct AppendEventsToExecution {
    pub execution_id: ExecutionId,
    // NOTE(review): presumably the version expected for the first event of the
    // batch — confirm.
    pub version: Version,
    pub batch: Vec<AppendRequest>,
}
969
/// Response about a finished child execution, addressed to its parent.
#[derive(Clone, Debug)]
pub struct AppendResponseToExecution {
    pub parent_execution_id: ExecutionId,
    pub created_at: DateTime<Utc>,
    pub join_set_id: JoinSetId,
    pub child_execution_id: ExecutionIdDerived,
    pub finished_version: Version,
    pub result: SupportedFunctionReturnValue,
}
979
980#[async_trait]
981pub trait DbExecutor: Send + Sync {
    /// Locks pending executions selected by `ffqns` and returns them.
    // NOTE(review): presumably locks at most `batch_size` executions that are
    // pending at or before `pending_at_or_sooner` — confirm with the
    // implementations.
    #[expect(clippy::too_many_arguments)]
    async fn lock_pending_by_ffqns(
        &self,
        batch_size: u32,
        pending_at_or_sooner: DateTime<Utc>,
        ffqns: Arc<[FunctionFqn]>,
        created_at: DateTime<Utc>,
        component_id: ComponentId,
        deployment_id: DeploymentId,
        executor_id: ExecutorId,
        lock_expires_at: DateTime<Utc>,
        run_id: RunId,
        retry_config: ComponentRetryConfig,
    ) -> Result<LockPendingResponse, DbErrorWrite>;

    /// Locks pending executions selected through `component_id` and returns them.
    // NOTE(review): the name suggests selection by the component's digest — confirm.
    #[expect(clippy::too_many_arguments)]
    async fn lock_pending_by_component_digest(
        &self,
        batch_size: u32,
        pending_at_or_sooner: DateTime<Utc>,
        component_id: &ComponentId,
        deployment_id: DeploymentId,
        created_at: DateTime<Utc>,
        executor_id: ExecutorId,
        lock_expires_at: DateTime<Utc>,
        run_id: RunId,
        retry_config: ComponentRetryConfig,
    ) -> Result<LockPendingResponse, DbErrorWrite>;

    /// Locks the single execution `execution_id`; only available with the
    /// `test` feature.
    #[cfg(feature = "test")]
    #[expect(clippy::too_many_arguments)]
    async fn lock_one(
        &self,
        created_at: DateTime<Utc>,
        component_id: ComponentId,
        deployment_id: DeploymentId,
        execution_id: &ExecutionId,
        run_id: RunId,
        version: Version,
        executor_id: ExecutorId,
        lock_expires_at: DateTime<Utc>,
        retry_config: ComponentRetryConfig,
    ) -> Result<LockedExecution, DbErrorWrite>;

    /// Appends a single event to `execution_id` at `version`.
    async fn append(
        &self,
        execution_id: ExecutionId,
        version: Version,
        req: AppendRequest,
    ) -> Result<AppendResponse, DbErrorWrite>;

    /// Appends a batch of events to a child execution and a response to its parent.
    // NOTE(review): presumably both writes happen atomically — confirm with the
    // implementations.
    async fn append_batch_respond_to_parent(
        &self,
        events: AppendEventsToExecution,
        response: AppendResponseToExecution,
        current_time: DateTime<Utc>,
    ) -> Result<AppendBatchResponse, DbErrorWrite>;

    /// Waits for a pending execution matching `ffqns`, or for `timeout_fut`.
    // NOTE(review): presumably returns when either a matching execution becomes
    // pending or `timeout_fut` completes — confirm.
    async fn wait_for_pending_by_ffqn(
        &self,
        pending_at_or_sooner: DateTime<Utc>,
        ffqns: Arc<[FunctionFqn]>,
        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
    );

    /// Waits for a pending execution matching `component_digest`, or for `timeout_fut`.
    // NOTE(review): presumably returns when either a matching execution becomes
    // pending or `timeout_fut` completes — confirm.
    async fn wait_for_pending_by_component_digest(
        &self,
        pending_at_or_sooner: DateTime<Utc>,
        component_digest: &ComponentDigest,
        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
    );
1065
1066 async fn cancel_activity_with_retries(
1067 &self,
1068 execution_id: &ExecutionId,
1069 cancelled_at: DateTime<Utc>,
1070 ) -> Result<CancelOutcome, DbErrorWrite> {
1071 let mut retries = 5;
1072 loop {
1073 let res = self.cancel_activity(execution_id, cancelled_at).await;
1074 if res.is_ok() || retries == 0 {
1075 return res;
1076 }
1077 retries -= 1;
1078 }
1079 }
1080
    /// Returns the most recent event of `execution_id`.
    async fn get_last_execution_event(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<ExecutionEvent, DbErrorRead>;
1086
1087 async fn cancel_activity(
1088 &self,
1089 execution_id: &ExecutionId,
1090 cancelled_at: DateTime<Utc>,
1091 ) -> Result<CancelOutcome, DbErrorWrite> {
1092 debug!("Determining cancellation state of {execution_id}");
1093
1094 let last_event = self
1095 .get_last_execution_event(execution_id)
1096 .await
1097 .map_err(DbErrorWrite::from)?;
1098 if let ExecutionRequest::Finished {
1099 retval:
1100 SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError {
1101 kind: ExecutionFailureKind::Cancelled,
1102 ..
1103 }),
1104 ..
1105 } = last_event.event
1106 {
1107 return Ok(CancelOutcome::Cancelled);
1108 } else if matches!(last_event.event, ExecutionRequest::Finished { .. }) {
1109 debug!("Not cancelling, {execution_id} is already finished");
1110 return Ok(CancelOutcome::AlreadyFinished);
1111 }
1112 let finished_version = last_event.version.increment();
1113 let child_result = SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError {
1114 reason: None,
1115 kind: ExecutionFailureKind::Cancelled,
1116 detail: None,
1117 });
1118 let cancel_request = AppendRequest {
1119 created_at: cancelled_at,
1120 event: ExecutionRequest::Finished {
1121 retval: child_result.clone(),
1122 http_client_traces: None,
1123 },
1124 };
1125 debug!("Cancelling activity {execution_id} at {finished_version}");
1126 if let ExecutionId::Derived(execution_id) = execution_id {
1127 let (parent_execution_id, join_set_id) = execution_id.split_to_parts();
1128 let child_execution_id = ExecutionId::Derived(execution_id.clone());
1129 self.append_batch_respond_to_parent(
1130 AppendEventsToExecution {
1131 execution_id: child_execution_id,
1132 version: finished_version.clone(),
1133 batch: vec![cancel_request],
1134 },
1135 AppendResponseToExecution {
1136 parent_execution_id,
1137 created_at: cancelled_at,
1138 join_set_id: join_set_id.clone(),
1139 child_execution_id: execution_id.clone(),
1140 finished_version,
1141 result: child_result,
1142 },
1143 cancelled_at,
1144 )
1145 .await?;
1146 } else {
1147 self.append(execution_id.clone(), finished_version, cancel_request)
1148 .await?;
1149 }
1150 debug!("Cancelled {execution_id}");
1151 Ok(CancelOutcome::Cancelled)
1152 }
1153}
1154
/// Outcome of appending a delay response.
// NOTE(review): presumably `AlreadyFinished` / `AlreadyCancelled` mean the
// delay had been resolved before this call — confirm with the method that
// returns this type.
pub enum AppendDelayResponseOutcome {
    Success,
    AlreadyFinished,
    AlreadyCancelled,
}
1160
/// Filter passed to `DbExternalApi::list_executions`.
///
/// The default value has every option unset and every flag `false`.
#[derive(Debug, Clone, Default)]
pub struct ListExecutionsFilter {
    pub ffqn_prefix: Option<String>,
    // NOTE(review): presumably includes derived (child) executions when `true` — confirm.
    pub show_derived: bool,
    // NOTE(review): presumably excludes finished executions when `true` — confirm.
    pub hide_finished: bool,
    pub execution_id_prefix: Option<String>,
    pub component_digest: Option<ComponentDigest>,
    pub deployment_id: Option<DeploymentId>,
}
1170
/// Asynchronous database operations layered on top of [`DbConnection`].
///
/// Judging by the name, these are the calls backing the externally exposed API
/// (listing, source files, deployments, pausing). All methods report failures through
/// one of the `DbError*` types. The descriptions below restate the signatures; the
/// precise semantics are defined by the implementations.
#[async_trait]
pub trait DbExternalApi: DbConnection {
    /// Returns the backtrace of `execution_id` selected by `filter`.
    async fn get_backtrace(
        &self,
        execution_id: &ExecutionId,
        filter: BacktraceFilter,
    ) -> Result<BacktraceInfo, DbErrorRead>;

    /// Inserts or updates the source file `content` stored under `frame_key` for a component.
    ///
    /// NOTE(review): `is_suffix` presumably marks `frame_key` as a path suffix rather
    /// than a full path - confirm with the implementations.
    async fn upsert_source_file(
        &self,
        component_digest: &ComponentDigest,
        frame_key: &str,
        is_suffix: bool,
        content: &str,
    ) -> Result<(), DbErrorWrite>;

    /// Looks up the stored source of `file` for a component; `Ok(None)` presumably
    /// means no matching source is stored.
    async fn get_source_file(
        &self,
        component_digest: &ComponentDigest,
        file: &str,
    ) -> Result<Option<String>, DbErrorRead>;

    /// Lists executions matching `filter`, one page at a time.
    async fn list_executions(
        &self,
        filter: ListExecutionsFilter,
        pagination: ExecutionListPagination,
    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric>;

    /// Lists a page of events of one execution, paginated by version.
    async fn list_execution_events(
        &self,
        execution_id: &ExecutionId,
        pagination: Pagination<VersionType>,
        include_backtrace_id: bool,
    ) -> Result<ListExecutionEventsResponse, DbErrorRead>;

    /// Lists a page of responses of one execution, paginated by a `u32` cursor.
    async fn list_responses(
        &self,
        execution_id: &ExecutionId,
        pagination: Pagination<u32>,
    ) -> Result<ListResponsesResponse, DbErrorRead>;

    /// Fetches the execution state together with its events and responses in one call.
    ///
    /// The `req_*` parameters select the events, `resp_pagination` selects the responses.
    async fn list_execution_events_responses(
        &self,
        execution_id: &ExecutionId,
        req_since: &Version,
        req_max_length: VersionType,
        req_include_backtrace_id: bool,
        resp_pagination: Pagination<VersionType>,
    ) -> Result<ExecutionWithStateRequestsResponses, DbErrorRead>;

    /// Switches the component of an execution from digest `old` to digest `new`.
    async fn upgrade_execution_component(
        &self,
        execution_id: &ExecutionId,
        old: &ComponentDigest,
        new: &ComponentDigest,
    ) -> Result<(), DbErrorWrite>;

    /// Lists a page of log and/or stream entries of one execution selected by `filter`.
    async fn list_logs(
        &self,
        execution_id: &ExecutionId,
        filter: LogFilter,
        pagination: Pagination<u32>,
    ) -> Result<ListLogsResponse, DbErrorRead>;

    /// Lists a page of per-deployment state summaries.
    ///
    /// NOTE(review): `include_config_json` presumably controls whether
    /// `DeploymentState::config_json` is populated - confirm.
    async fn list_deployment_states(
        &self,
        current_time: DateTime<Utc>,
        pagination: Pagination<Option<DeploymentId>>,
        include_config_json: bool,
    ) -> Result<Vec<DeploymentState>, DbErrorRead>;

    /// Stores a new deployment record.
    async fn insert_deployment(&self, record: DeploymentRecord) -> Result<(), DbErrorWrite>;

    /// Activates the given deployment at time `now`.
    async fn activate_deployment(
        &self,
        deployment_id: DeploymentId,
        now: DateTime<Utc>,
    ) -> Result<(), DbErrorWrite>;

    /// Enqueues the given deployment.
    async fn enqueue_deployment(&self, deployment_id: DeploymentId) -> Result<(), DbErrorWrite>;

    /// Looks up a deployment record by id.
    async fn get_deployment(
        &self,
        deployment_id: DeploymentId,
    ) -> Result<Option<DeploymentRecord>, DbErrorRead>;

    /// Returns the active deployment, if any. Only compiled with the `test` feature.
    #[cfg(feature = "test")]
    async fn get_active_deployment(&self) -> Result<Option<DeploymentRecord>, DbErrorRead>;

    /// Returns the current deployment, if any.
    ///
    /// NOTE(review): how "current" differs from "active" is not visible in this file.
    async fn get_current_deployment(&self) -> Result<Option<DeploymentRecord>, DbErrorRead>;

    /// Lists a page of deployment records.
    async fn list_deployments(
        &self,
        pagination: Pagination<Option<DeploymentId>>,
    ) -> Result<Vec<DeploymentRecord>, DbErrorRead>;

    /// Pauses an execution, recording `paused_at`.
    async fn pause_execution(
        &self,
        execution_id: &ExecutionId,
        paused_at: DateTime<Utc>,
    ) -> Result<AppendResponse, DbErrorWrite>;

    /// Unpauses an execution, recording `unpaused_at`.
    async fn unpause_execution(
        &self,
        execution_id: &ExecutionId,
        unpaused_at: DateTime<Utc>,
    ) -> Result<AppendResponse, DbErrorWrite>;
}
/// Page length used by [`LIST_DEPLOYMENT_STATES_DEFAULT_PAGINATION`].
pub const LIST_DEPLOYMENT_STATES_DEFAULT_LENGTH: u16 = 20;
/// Default pagination for deployment state listings: an `OlderThan` page of
/// [`LIST_DEPLOYMENT_STATES_DEFAULT_LENGTH`] items with no cursor set.
pub const LIST_DEPLOYMENT_STATES_DEFAULT_PAGINATION: Pagination<Option<DeploymentId>> =
    Pagination::OlderThan {
        length: LIST_DEPLOYMENT_STATES_DEFAULT_LENGTH,
        cursor: None,
        including_cursor: false,
    };
1318
/// Per-deployment summary returned by `DbExternalApi::list_deployment_states`.
///
/// NOTE(review): the `u32` fields are presumably counts of executions in the
/// respective state; the exact counting rules are defined by the implementations.
pub struct DeploymentState {
    /// Deployment this summary belongs to.
    pub deployment_id: DeploymentId,
    /// Presumably the number of locked executions.
    pub locked: u32,
    /// Presumably the number of pending executions.
    pub pending: u32,
    /// Presumably the number of executions scheduled for later.
    pub scheduled: u32,
    /// Presumably the number of executions blocked by a join set.
    pub blocked: u32,
    /// Presumably the number of finished executions.
    pub finished: u32,
    /// Deployment configuration as JSON, when available.
    pub config_json: Option<String>,
    /// Creation time of the deployment.
    pub created_at: DateTime<Utc>,
    /// Last time the deployment was active, if recorded.
    pub last_active_at: Option<DateTime<Utc>>,
    /// Current status of the deployment.
    pub status: DeploymentStatus,
}
1335
/// Status of a deployment.
///
/// Each variant has a lowercase textual form, produced by [`DeploymentStatus::as_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeploymentStatus {
    /// Textual form: `"inactive"`.
    Inactive,
    /// Textual form: `"enqueued"`.
    Enqueued,
    /// Textual form: `"active"`.
    Active,
}

impl DeploymentStatus {
    /// Returns the lowercase textual form of the status.
    #[must_use]
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Inactive => "inactive",
            Self::Enqueued => "enqueued",
            Self::Active => "active",
        }
    }
}
1354
1355impl std::str::FromStr for DeploymentStatus {
1356 type Err = StrVariant;
1357 fn from_str(s: &str) -> Result<Self, Self::Err> {
1358 match s {
1359 "inactive" => Ok(DeploymentStatus::Inactive),
1360 "enqueued" => Ok(DeploymentStatus::Enqueued),
1361 "active" => Ok(DeploymentStatus::Active),
1362 _ => Err(StrVariant::from(format!("unknown deployment status: {s}"))),
1363 }
1364 }
1365}
1366
/// A stored deployment, as written by `DbExternalApi::insert_deployment` and returned
/// by the deployment lookup and listing methods.
#[derive(Debug, Clone)]
pub struct DeploymentRecord {
    /// Identifier of the deployment.
    pub deployment_id: DeploymentId,
    /// Creation time of the record.
    pub created_at: DateTime<Utc>,
    /// Last time the deployment was active, if recorded.
    pub last_active_at: Option<DateTime<Utc>>,
    /// Current status of the deployment.
    pub status: DeploymentStatus,
    /// Deployment configuration serialized as JSON.
    pub config_json: String,
    /// Version string; presumably the Obelisk version that created the deployment.
    pub obelisk_version: String,
    /// Optional creator of the deployment.
    pub created_by: Option<String>,
}
1378
/// One page of results from `DbExternalApi::list_logs`.
#[derive(Debug)]
pub struct ListLogsResponse {
    /// Entries contained in this page.
    pub items: Vec<LogEntryRow>,
    /// Pagination to request the next page.
    pub next_page: Pagination<u32>,
    /// Pagination to request the previous page; `None` presumably means there is none.
    pub prev_page: Option<Pagination<u32>>,
}
1385
/// Selects which kinds of entries `DbExternalApi::list_logs` should return.
///
/// Fields are private; instances are built with `LogFilter::show_logs`,
/// `LogFilter::show_streams` or `LogFilter::show_combined`.
#[derive(Debug)]
pub struct LogFilter {
    /// Whether log entries are requested.
    show_logs: bool,
    /// Whether stream entries are requested.
    show_streams: bool,
    /// Log levels supplied by the caller.
    // NOTE(review): how an empty list is interpreted is decided by the implementations.
    levels: Vec<LogLevel>,
    /// Stream types supplied by the caller.
    stream_types: Vec<LogStreamType>,
}
1393impl LogFilter {
1394 #[must_use]
1396 pub fn show_logs(levels: Vec<LogLevel>) -> LogFilter {
1397 LogFilter {
1398 show_logs: true,
1399 show_streams: false,
1400 levels,
1401 stream_types: Vec::new(),
1402 }
1403 }
1404 #[must_use]
1406 pub fn show_streams(stream_types: Vec<LogStreamType>) -> LogFilter {
1407 LogFilter {
1408 show_logs: false,
1409 show_streams: true,
1410 levels: Vec::new(),
1411 stream_types,
1412 }
1413 }
1414 #[must_use]
1416 pub fn show_combined(levels: Vec<LogLevel>, stream_types: Vec<LogStreamType>) -> LogFilter {
1417 LogFilter {
1418 show_logs: true,
1419 show_streams: true,
1420 levels,
1421 stream_types,
1422 }
1423 }
1424 #[must_use]
1426 pub fn should_show_logs(&self) -> bool {
1427 self.show_logs
1428 }
1429 #[must_use]
1430 pub fn should_show_streams(&self) -> bool {
1431 self.show_streams
1432 }
1433 #[must_use]
1434 pub fn levels(&self) -> &Vec<LogLevel> {
1435 &self.levels
1436 }
1437 #[must_use]
1438 pub fn stream_types(&self) -> &Vec<LogStreamType> {
1439 &self.stream_types
1440 }
1441}
1442
/// Combined result of `DbExternalApi::list_execution_events_responses`.
#[derive(Debug, Clone)]
pub struct ExecutionWithStateRequestsResponses {
    /// The execution together with its pending state.
    pub execution_with_state: ExecutionWithState,
    /// Returned execution events.
    pub events: Vec<ExecutionEvent>,
    /// Returned responses, each with its cursor.
    pub responses: Vec<ResponseWithCursor>,
    /// NOTE(review): presumably the highest event version of the execution - confirm.
    pub max_version: Version,
    /// NOTE(review): presumably the highest response cursor of the execution - confirm.
    pub max_cursor: ResponseCursor,
}
1451
1452#[async_trait]
1453pub trait DbConnection: DbExecutor {
1454 async fn get(&self, execution_id: &ExecutionId) -> Result<ExecutionLog, DbErrorRead>;
1456
1457 async fn append_delay_response(
1458 &self,
1459 created_at: DateTime<Utc>,
1460 execution_id: ExecutionId,
1461 join_set_id: JoinSetId,
1462 delay_id: DelayId,
1463 outcome: Result<(), ()>, ) -> Result<AppendDelayResponseOutcome, DbErrorWrite>;
1465
1466 async fn append_batch(
1469 &self,
1470 current_time: DateTime<Utc>, batch: Vec<AppendRequest>,
1472 execution_id: ExecutionId,
1473 version: Version,
1474 ) -> Result<AppendBatchResponse, DbErrorWrite>;
1475
1476 async fn append_batch_create_new_execution(
1479 &self,
1480 current_time: DateTime<Utc>, batch: Vec<AppendRequest>, execution_id: ExecutionId,
1483 version: Version,
1484 child_req: Vec<CreateRequest>,
1485 backtraces: Vec<BacktraceInfo>,
1486 ) -> Result<AppendBatchResponse, DbErrorWrite>;
1487
1488 async fn get_execution_event(
1490 &self,
1491 execution_id: &ExecutionId,
1492 version: &Version,
1493 ) -> Result<ExecutionEvent, DbErrorRead>;
1494
1495 #[instrument(skip(self))]
1496 async fn get_create_request(
1497 &self,
1498 execution_id: &ExecutionId,
1499 ) -> Result<CreateRequest, DbErrorRead> {
1500 let execution_event = self
1501 .get_execution_event(execution_id, &Version::new(0))
1502 .await?;
1503 if let ExecutionRequest::Created {
1504 ffqn,
1505 params,
1506 parent,
1507 scheduled_at,
1508 component_id,
1509 deployment_id,
1510 metadata,
1511 scheduled_by,
1512 } = execution_event.event
1513 {
1514 Ok(CreateRequest {
1515 created_at: execution_event.created_at,
1516 execution_id: execution_id.clone(),
1517 ffqn,
1518 params,
1519 parent,
1520 scheduled_at,
1521 component_id,
1522 deployment_id,
1523 metadata,
1524 scheduled_by,
1525 })
1526 } else {
1527 Err(DbErrorRead::Generic(DbErrorGeneric::Uncategorized {
1528 reason: "execution log must start with creation".into(),
1529 context: SpanTrace::capture(),
1530 source: None,
1531 loc: Location::caller(),
1532 }))
1533 }
1534 }
1535
1536 async fn get_pending_state(
1537 &self,
1538 execution_id: &ExecutionId,
1539 ) -> Result<ExecutionWithState, DbErrorRead>;
1540
1541 async fn get_expired_timers(
1543 &self,
1544 at: DateTime<Utc>,
1545 ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric>;
1546
1547 async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite>;
1549
1550 async fn subscribe_to_next_responses(
1557 &self,
1558 execution_id: &ExecutionId,
1559 last_response: ResponseCursor,
1560 timeout_fut: Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>,
1561 ) -> Result<Vec<ResponseWithCursor>, DbErrorReadWithTimeout>;
1562
1563 async fn wait_for_finished_result(
1570 &self,
1571 execution_id: &ExecutionId,
1572 timeout_fut: Option<Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>>,
1573 ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout>;
1574
1575 async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite>;
1576
1577 async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite>;
1578
1579 async fn append_log(&self, row: LogInfoAppendRow) -> Result<(), DbErrorWrite>;
1580
1581 async fn append_log_batch(&self, batch: &[LogInfoAppendRow]) -> Result<(), DbErrorWrite>;
1582
1583 #[cfg(feature = "test")]
1585 async fn get_finished_result(
1586 &self,
1587 execution_id: &ExecutionId,
1588 ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout> {
1589 self.wait_for_finished_result(
1590 execution_id,
1591 Some(Box::pin(std::future::ready(TimeoutOutcome::Timeout))),
1592 )
1593 .await
1594 }
1595}
1596
/// A log entry to be stored via `DbConnection::append_log` / `append_log_batch`.
#[derive(Clone, Debug)]
pub struct LogInfoAppendRow {
    /// Execution the entry belongs to.
    pub execution_id: ExecutionId,
    /// Run the entry belongs to.
    pub run_id: RunId,
    /// The log or stream payload.
    pub log_entry: LogEntry,
}
1603
/// A stored log entry as returned inside [`ListLogsResponse`].
#[derive(Debug, Clone)]
pub struct LogEntryRow {
    /// Cursor of this row, usable with `Pagination<u32>`.
    pub cursor: u32,
    /// Run the entry belongs to.
    pub run_id: RunId,
    /// The log or stream payload.
    pub log_entry: LogEntry,
}
1610
/// A single recorded entry: either a leveled log message or a chunk of stream output.
#[derive(Debug, Clone)]
pub enum LogEntry {
    /// A log message with a severity level.
    Log {
        /// Creation time of the entry.
        created_at: DateTime<Utc>,
        /// Severity of the message.
        level: LogLevel,
        /// Message text.
        message: String,
    },
    /// Raw bytes captured from a stream.
    Stream {
        /// Creation time of the entry.
        created_at: DateTime<Utc>,
        /// Captured bytes.
        payload: Vec<u8>,
        /// Which stream the bytes came from.
        stream_type: LogStreamType,
    },
}
1624impl LogEntry {
1625 #[must_use]
1626 pub fn created_at(&self) -> DateTime<Utc> {
1627 match self {
1628 LogEntry::Log { created_at, .. } | LogEntry::Stream { created_at, .. } => *created_at,
1629 }
1630 }
1631}
1632
/// Severity of a log entry.
///
/// Represented as `u8`: discriminants start at 1 (`Trace`) and increase by one per
/// variant up to 5 (`Error`). `TryFrom` is derived for the `u8` representation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::TryFrom, strum::EnumIter)]
#[try_from(repr)]
#[repr(u8)]
pub enum LogLevel {
    Trace = 1,
    Debug,
    Info,
    Warn,
    Error,
}
/// Origin of a captured stream entry.
///
/// Represented as `u8`: `StdOut` is 1 and `StdErr` is 2. `TryFrom` is derived for the
/// `u8` representation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::TryFrom, strum::EnumIter)]
#[try_from(repr)]
#[repr(u8)]
pub enum LogStreamType {
    StdOut = 1,
    StdErr,
}
1650
/// Value produced by the timeout futures passed to the waiting methods of
/// `DbConnection`, and carried by `DbErrorReadWithTimeout::Timeout`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimeoutOutcome {
    /// The wait timed out.
    Timeout,
    /// NOTE(review): presumably the wait was cancelled rather than timed out - confirm.
    Cancel,
}
1656
/// Extra database operations that are only compiled with the `test` feature.
#[cfg(feature = "test")]
#[async_trait]
pub trait DbConnectionTest: DbConnection {
    /// Appends `response_event` as a response of `execution_id`.
    async fn append_response(
        &self,
        created_at: DateTime<Utc>,
        execution_id: ExecutionId,
        response_event: JoinSetResponseEvent,
    ) -> Result<(), DbErrorWrite>;
}
1667
/// Result of a cancellation request, e.g. from `cancel_delay`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CancelOutcome {
    /// The target is cancelled (for delays this includes "was already cancelled").
    Cancelled,
    /// The target had already finished, so there was nothing to cancel.
    AlreadyFinished,
}
1673
1674#[instrument(skip(db_connection))]
1675pub async fn stub_execution(
1676 db_connection: &dyn DbConnection,
1677 execution_id: ExecutionIdDerived,
1678 parent_execution_id: ExecutionId,
1679 join_set_id: JoinSetId,
1680 created_at: DateTime<Utc>,
1681 return_value: SupportedFunctionReturnValue,
1682) -> Result<(), DbErrorWrite> {
1683 let stub_finished_version = Version::new(1); let write_attempt = {
1686 let finished_req = AppendRequest {
1687 created_at,
1688 event: ExecutionRequest::Finished {
1689 retval: return_value.clone(),
1690 http_client_traces: None,
1691 },
1692 };
1693 db_connection
1694 .append_batch_respond_to_parent(
1695 AppendEventsToExecution {
1696 execution_id: ExecutionId::Derived(execution_id.clone()),
1697 version: stub_finished_version.clone(),
1698 batch: vec![finished_req],
1699 },
1700 AppendResponseToExecution {
1701 parent_execution_id,
1702 created_at,
1703 join_set_id,
1704 child_execution_id: execution_id.clone(),
1705 finished_version: stub_finished_version.clone(),
1706 result: return_value.clone(),
1707 },
1708 created_at,
1709 )
1710 .await
1711 };
1712 if let Err(write_attempt) = write_attempt {
1713 debug!("Stub write attempt failed - {write_attempt:?}");
1715
1716 let found = db_connection
1717 .get_execution_event(&ExecutionId::Derived(execution_id), &stub_finished_version)
1718 .await?; match found.event {
1720 ExecutionRequest::Finished {
1721 retval: found_result,
1722 ..
1723 } if return_value == found_result => {
1724 Ok(())
1726 }
1727 ExecutionRequest::Finished { .. } => Err(DbErrorWrite::NonRetriable(
1728 DbErrorWriteNonRetriable::Conflict,
1729 )),
1730 _other => Err(DbErrorWrite::NonRetriable(
1731 DbErrorWriteNonRetriable::IllegalState {
1732 reason: "unexpected execution event at stubbed execution".into(),
1733 context: SpanTrace::capture(),
1734 source: None,
1735 loc: Location::caller(),
1736 },
1737 )),
1738 }
1739 } else {
1740 Ok(())
1741 }
1742}
1743
1744pub async fn cancel_delay(
1745 db_connection: &dyn DbConnection,
1746 delay_id: DelayId,
1747 created_at: DateTime<Utc>,
1748) -> Result<CancelOutcome, DbErrorWrite> {
1749 let (parent_execution_id, join_set_id) = delay_id.split_to_parts();
1750 db_connection
1751 .append_delay_response(
1752 created_at,
1753 parent_execution_id,
1754 join_set_id,
1755 delay_id,
1756 Err(()), )
1758 .await
1759 .map(|ok| match ok {
1760 AppendDelayResponseOutcome::Success | AppendDelayResponseOutcome::AlreadyCancelled => {
1761 CancelOutcome::Cancelled
1762 }
1763 AppendDelayResponseOutcome::AlreadyFinished => CancelOutcome::AlreadyFinished,
1764 })
1765}
1766
/// Selects which backtrace `DbExternalApi::get_backtrace` should return.
#[derive(Clone, Debug)]
pub enum BacktraceFilter {
    /// The first backtrace of the execution.
    First,
    /// The last backtrace of the execution.
    Last,
    /// The backtrace associated with the given version.
    Specific(Version),
}
1773
/// A captured WASM backtrace together with the execution and version range it covers.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BacktraceInfo {
    /// Execution the backtrace belongs to.
    pub execution_id: ExecutionId,
    /// Component the backtrace belongs to.
    pub component_id: ComponentId,
    /// Lower bound of the covered version range (inclusive, per the field name).
    pub version_min_including: Version,
    /// Upper bound of the covered version range (exclusive, per the field name).
    pub version_max_excluding: Version,
    /// The captured frames.
    pub wasm_backtrace: WasmBacktrace,
}
1782
// Serializable copy of a WASM backtrace: an ordered list of frames.
// Built from `wasmtime::WasmBacktrace` by `WasmBacktrace::maybe_from`.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, schemars::JsonSchema)]
pub struct WasmBacktrace {
    pub frames: Vec<FrameInfo>,
}
1787
// One frame of a `WasmBacktrace`.
// `module` is the module name (or "<unknown>" when the conversion from wasmtime finds
// none), `func_name` is the demangled function name or index, and `symbols` are the
// debug symbols attached to the frame.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, schemars::JsonSchema)]
pub struct FrameInfo {
    pub module: String,
    pub func_name: String,
    pub symbols: Vec<FrameSymbol>,
}
1794
// Debug symbol of a frame; every piece of information is optional.
// `func_name` is stored demangled; `file`, `line` and `col` give the source location.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, schemars::JsonSchema)]
pub struct FrameSymbol {
    pub func_name: Option<String>,
    pub file: Option<String>,
    pub line: Option<u32>,
    pub col: Option<u32>,
}
1802
1803mod wasm_backtrace {
1804 use super::{FrameInfo, FrameSymbol, WasmBacktrace};
1805
1806 impl WasmBacktrace {
1807 pub fn maybe_from(backtrace: &wasmtime::WasmBacktrace) -> Option<Self> {
1808 if backtrace.frames().is_empty() {
1809 None
1810 } else {
1811 Some(Self {
1812 frames: backtrace.frames().iter().map(FrameInfo::from).collect(),
1813 })
1814 }
1815 }
1816 }
1817
1818 impl From<&wasmtime::FrameInfo> for FrameInfo {
1819 fn from(frame: &wasmtime::FrameInfo) -> Self {
1820 let module_name = frame.module().name().unwrap_or("<unknown>").to_string();
1821 let mut func_name = String::new();
1822 wasmtime_environ::demangle_function_name_or_index(
1823 &mut func_name,
1824 frame.func_name(),
1825 frame.func_index() as usize,
1826 )
1827 .expect("writing to string must succeed");
1828 Self {
1829 module: module_name,
1830 func_name,
1831 symbols: frame
1832 .symbols()
1833 .iter()
1834 .map(std::convert::Into::into)
1835 .collect(),
1836 }
1837 }
1838 }
1839
1840 impl From<&wasmtime::FrameSymbol> for FrameSymbol {
1841 fn from(symbol: &wasmtime::FrameSymbol) -> Self {
1842 let func_name = symbol.name().map(|name| {
1843 let mut writer = String::new();
1844 wasmtime_environ::demangle_function_name(&mut writer, name)
1845 .expect("writing to string must succeed");
1846 writer
1847 });
1848
1849 Self {
1850 func_name,
1851 file: symbol.file().map(ToString::to_string),
1852 line: symbol.line(),
1853 col: symbol.column(),
1854 }
1855 }
1856 }
1857}
/// An execution together with its current pending state and related metadata.
///
/// `Display` prints the execution id, the pending state and the component digest,
/// separated by spaces.
#[derive(Debug, Clone, derive_more::Display)]
#[display("{execution_id} {pending_state} {component_digest}")]
pub struct ExecutionWithState {
    /// Identifier of the execution.
    pub execution_id: ExecutionId,
    /// Fully qualified name of the executed function.
    pub ffqn: FunctionFqn,
    /// Current pending state.
    pub pending_state: PendingState,
    /// Creation time of the execution.
    pub created_at: DateTime<Utc>,
    /// Time the execution was first scheduled for.
    pub first_scheduled_at: DateTime<Utc>,
    /// Digest of the component running the execution.
    pub component_digest: ComponentDigest,
    /// Type of the component running the execution.
    pub component_type: ComponentType,
    /// Deployment the execution belongs to.
    pub deployment_id: DeploymentId,
}
1870
/// Pagination for `DbExternalApi::list_executions`, keyed either by a timestamp cursor
/// or by an execution id cursor.
#[derive(Debug, Clone)]
pub enum ExecutionListPagination {
    /// Paginate with an optional timestamp cursor.
    /// NOTE(review): despite the variant name, the cursor is a `DateTime`; presumably
    /// the creation time - confirm.
    CreatedBy(Pagination<Option<DateTime<Utc>>>),
    /// Paginate with an optional execution id cursor.
    ExecutionId(Pagination<Option<ExecutionId>>),
}
1876impl Default for ExecutionListPagination {
1877 fn default() -> ExecutionListPagination {
1878 ExecutionListPagination::CreatedBy(Pagination::OlderThan {
1879 length: 20,
1880 cursor: None,
1881 including_cursor: false, })
1883 }
1884}
1885impl ExecutionListPagination {
1886 #[must_use]
1887 pub fn length(&self) -> u16 {
1888 match self {
1889 ExecutionListPagination::CreatedBy(pagination) => pagination.length(),
1890 ExecutionListPagination::ExecutionId(pagination) => pagination.length(),
1891 }
1892 }
1893}
1894
/// Cursor-based page request.
///
/// `NewerThan` pages are ascending, `OlderThan` pages are descending (see
/// [`Pagination::is_asc`] / [`Pagination::is_desc`]). `including_cursor` decides whether
/// the item at the cursor itself belongs to the page.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Pagination<T> {
    /// Items after the cursor.
    NewerThan {
        /// Maximum number of items in the page.
        length: u16,
        /// Position the page is relative to.
        cursor: T,
        /// Whether the item at `cursor` is part of the page.
        including_cursor: bool,
    },
    /// Items before the cursor.
    OlderThan {
        /// Maximum number of items in the page.
        length: u16,
        /// Position the page is relative to.
        cursor: T,
        /// Whether the item at `cursor` is part of the page.
        including_cursor: bool,
    },
}

// Accessors: none of them clones the cursor, so they place no bound on `T`.
impl<T> Pagination<T> {
    /// Returns the requested page length.
    pub fn length(&self) -> u16 {
        match self {
            Pagination::NewerThan { length, .. } | Pagination::OlderThan { length, .. } => *length,
        }
    }

    /// Returns the comparison operator relating items to the cursor:
    /// `>` / `>=` for `NewerThan`, `<` / `<=` for `OlderThan`, where the `=` form is
    /// used when `including_cursor` is set.
    pub fn rel(&self) -> &'static str {
        match self {
            Pagination::NewerThan {
                including_cursor: false,
                ..
            } => ">",
            Pagination::NewerThan {
                including_cursor: true,
                ..
            } => ">=",
            Pagination::OlderThan {
                including_cursor: false,
                ..
            } => "<",
            Pagination::OlderThan {
                including_cursor: true,
                ..
            } => "<=",
        }
    }

    /// Returns `true` for `OlderThan` pages.
    pub fn is_desc(&self) -> bool {
        matches!(self, Pagination::OlderThan { .. })
    }

    /// Returns `"asc"` for `NewerThan` pages and `"desc"` for `OlderThan` pages.
    pub fn asc_or_desc(&self) -> &'static str {
        if self.is_asc() { "asc" } else { "desc" }
    }

    /// Returns `true` for `NewerThan` pages.
    pub fn is_asc(&self) -> bool {
        !self.is_desc()
    }

    /// Borrows the cursor.
    pub fn cursor(&self) -> &T {
        match self {
            Pagination::NewerThan { cursor, .. } | Pagination::OlderThan { cursor, .. } => cursor,
        }
    }
}

// Only `invert` has to clone the cursor, hence the separate `Clone`-bounded impl.
impl<T: Clone> Pagination<T> {
    /// Returns the page request pointing the opposite way from the same cursor.
    ///
    /// The direction is swapped and `including_cursor` is negated; `length` and the
    /// cursor are kept. Inverting twice yields the original value.
    #[must_use]
    pub fn invert(&self) -> Self {
        match self {
            Pagination::NewerThan {
                length,
                cursor,
                including_cursor,
            } => Pagination::OlderThan {
                length: *length,
                cursor: cursor.clone(),
                including_cursor: !including_cursor,
            },
            Pagination::OlderThan {
                length,
                cursor,
                including_cursor,
            } => Pagination::NewerThan {
                length: *length,
                cursor: cursor.clone(),
                including_cursor: !including_cursor,
            },
        }
    }
}
1978
/// Test helper: polls the execution log of `execution_id` every 10 ms until
/// `predicate` returns `Some`, then returns that value.
///
/// With `timeout` set, the polling races against a sleep of that duration and yields
/// `DbErrorReadWithTimeout::Timeout(TimeoutOutcome::Timeout)` if the sleep wins;
/// without it, polling continues until the predicate matches or a read fails.
///
/// # Errors
/// Propagates any error returned by `get`, in addition to the timeout above.
#[cfg(feature = "test")]
pub async fn wait_for_pending_state_fn<T: Debug>(
    db_connection: &dyn DbConnectionTest,
    execution_id: &ExecutionId,
    predicate: impl Fn(ExecutionLog) -> Option<T> + Send,
    timeout: Option<Duration>,
) -> Result<T, DbErrorReadWithTimeout> {
    tracing::trace!(%execution_id, "Waiting for predicate");
    let poll_until_match = async move {
        loop {
            let execution_log = db_connection.get(execution_id).await?;
            match predicate(execution_log) {
                Some(t) => {
                    tracing::debug!(%execution_id, "Found: {t:?}");
                    return Ok(t);
                }
                None => tokio::time::sleep(Duration::from_millis(10)).await,
            }
        }
    };

    let Some(timeout) = timeout else {
        return poll_until_match.await;
    };
    tokio::select! {
        res = poll_until_match => res,
        () = tokio::time::sleep(timeout) => Err(DbErrorReadWithTimeout::Timeout(TimeoutOutcome::Timeout))
    }
}
2007
/// An expired timer as returned by `DbConnection::get_expired_timers`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExpiredTimer {
    /// An execution lock that has expired.
    Lock(ExpiredLock),
    /// A delay that has expired.
    Delay(ExpiredDelay),
}
2013
/// Details of an expired execution lock.
///
/// NOTE(review): `intermittent_event_count`, `max_retries` and `retry_exp_backoff`
/// look like the inputs of `ExecutionLog::can_be_retried_after` - confirm at the call site.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExpiredLock {
    /// Execution whose lock expired.
    pub execution_id: ExecutionId,
    /// Version at which the lock was recorded.
    pub locked_at_version: Version,
    /// Next version of the execution log.
    pub next_version: Version,
    /// Presumably the number of intermittent (temporary) failure events so far.
    pub intermittent_event_count: u32,
    /// Retry limit; the meaning of `None` is defined by the consumer of this value.
    pub max_retries: Option<u32>,
    /// Base duration of the exponential retry backoff.
    pub retry_exp_backoff: Duration,
    /// Executor and run that held the lock.
    pub locked_by: LockedBy,
}
2026
/// Details of an expired delay.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExpiredDelay {
    /// Execution that owns the delay.
    pub execution_id: ExecutionId,
    /// Join set the delay belongs to.
    pub join_set_id: JoinSetId,
    /// Identifier of the delay.
    pub delay_id: DelayId,
}
2033
// Current scheduling state of an execution.
//
// Serialized as an internally tagged enum: the tag field is `status` and variant names
// are converted to snake_case. `Display` wraps the inner state in the variant name,
// except `Locked`, which has no `#[display]` attribute of its own.
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, schemars::JsonSchema)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum PendingState {
    // Locked by an executor run until the lock expires.
    Locked(PendingStateLocked),

    // Waiting to be picked up at or after `scheduled_at`.
    #[display("PendingAt(`{_0}`)")]
    PendingAt(PendingStatePendingAt),

    // Waiting on a join set.
    #[display("BlockedByJoinSet({_0})")]
    BlockedByJoinSet(PendingStateBlockedByJoinSet),

    // Paused on top of one of the non-finished states.
    #[display("Paused({_0})")]
    Paused(PendingStatePaused),

    // Terminal state.
    #[display("Finished({_0})")]
    Finished(PendingStateFinished),
}
2053
/// Flattened view of [`PendingState`] in which "paused" is a flag on the underlying
/// state instead of a separate wrapper variant.
///
/// Obtained through `From<PendingState>`; `Finished` carries no flag because
/// `PendingStatePaused` has no finished variant.
pub enum PendingStateMergedPause {
    /// Locked, either running (`paused == false`) or paused.
    Locked {
        state: PendingStateLocked,
        paused: bool,
    },
    /// Pending, either active (`paused == false`) or paused.
    PendingAt {
        state: PendingStatePendingAt,
        paused: bool,
    },
    /// Blocked by a join set, either active (`paused == false`) or paused.
    BlockedByJoinSet {
        state: PendingStateBlockedByJoinSet,
        paused: bool,
    },
    /// Terminal state.
    Finished(PendingStateFinished),
}
2069impl From<PendingState> for PendingStateMergedPause {
2070 fn from(state: PendingState) -> Self {
2071 match state {
2072 PendingState::Locked(s) => PendingStateMergedPause::Locked {
2073 state: s,
2074 paused: false,
2075 },
2076
2077 PendingState::PendingAt(s) => PendingStateMergedPause::PendingAt {
2078 state: s,
2079 paused: false,
2080 },
2081
2082 PendingState::BlockedByJoinSet(s) => PendingStateMergedPause::BlockedByJoinSet {
2083 state: s,
2084 paused: false,
2085 },
2086
2087 PendingState::Paused(paused) => match paused {
2088 PendingStatePaused::Locked(s) => PendingStateMergedPause::Locked {
2089 state: s,
2090 paused: true,
2091 },
2092 PendingStatePaused::PendingAt(s) => PendingStateMergedPause::PendingAt {
2093 state: s,
2094 paused: true,
2095 },
2096 PendingStatePaused::BlockedByJoinSet(s) => {
2097 PendingStateMergedPause::BlockedByJoinSet {
2098 state: s,
2099 paused: true,
2100 }
2101 }
2102 },
2103
2104 PendingState::Finished(s) => PendingStateMergedPause::Finished(s),
2105 }
2106 }
2107}
2108
// Payload of the locked state: who holds the lock and when it expires.
// `Display` prints the expiry time followed by the executor id and run id.
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, schemars::JsonSchema)]
#[display("Locked(`{lock_expires_at}`, {}, {})", locked_by.executor_id, locked_by.run_id)]
pub struct PendingStateLocked {
    pub locked_by: LockedBy,
    pub lock_expires_at: DateTime<Utc>,
}
2115
// Payload of the pending state.
// `scheduled_at` is compared against the lock creation time in
// `PendingState::can_append_lock`; `last_lock` lets the previous lock holder extend
// its lock even before `scheduled_at`.
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, schemars::JsonSchema)]
#[display("`{scheduled_at}`, last_lock={last_lock:?}")]
pub struct PendingStatePendingAt {
    pub scheduled_at: DateTime<Utc>,
    pub last_lock: Option<LockedBy>,
}
2123
// Payload of the blocked-by-join-set state.
// NOTE(review): `closing` presumably marks that the join set is being closed - confirm.
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, schemars::JsonSchema)]
#[display("{join_set_id}, `{lock_expires_at}`, closing={closing}")]
pub struct PendingStateBlockedByJoinSet {
    pub join_set_id: JoinSetId,
    pub lock_expires_at: DateTime<Utc>,
    pub closing: bool,
}
2133
// The state an execution was in when it was paused.
// Mirrors the non-finished variants of `PendingState`; `Display` wraps the inner state
// in the variant name.
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, schemars::JsonSchema)]
pub enum PendingStatePaused {
    #[display("Locked({_0})")]
    Locked(PendingStateLocked),
    #[display("PendingAt({_0})")]
    PendingAt(PendingStatePendingAt),
    #[display("BlockedByJoinSet({_0})")]
    BlockedByJoinSet(PendingStateBlockedByJoinSet),
}
2144
// Identifies a lock holder: the executor and the specific run.
// Both ids must match for `PendingState::can_append_lock` to treat a lock as an extension.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
pub struct LockedBy {
    pub executor_id: ExecutorId,
    pub run_id: RunId,
}
2150impl From<&Locked> for LockedBy {
2151 fn from(value: &Locked) -> Self {
2152 LockedBy {
2153 executor_id: value.executor_id,
2154 run_id: value.run_id,
2155 }
2156 }
2157}
2158
// Payload of the finished state.
// `Deserialize` is only derived for tests (`cfg(test)` or the `test` feature).
// `version` is the version associated with the finish, `finished_at` its timestamp and
// `result_kind` the coarse outcome (ok or error kind).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, schemars::JsonSchema)]
#[cfg_attr(any(test, feature = "test"), derive(Deserialize))]
pub struct PendingStateFinished {
    pub version: VersionType,
    pub finished_at: DateTime<Utc>,
    pub result_kind: PendingStateFinishedResultKind,
}
2166impl Display for PendingStateFinished {
2167 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2168 match self.result_kind {
2169 PendingStateFinishedResultKind::Ok => write!(f, "ok"),
2170 PendingStateFinishedResultKind::Err(err) => write!(f, "{err}"),
2171 }
2172 }
2173}
2174
// Coarse outcome of a finished execution: success, or an error with its kind.
// Serialized with snake_case variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
#[serde(rename_all = "snake_case")]
pub enum PendingStateFinishedResultKind {
    Ok,
    Err(PendingStateFinishedError),
}
2182impl PendingStateFinishedResultKind {
2183 pub fn as_result(&self) -> Result<(), &PendingStateFinishedError> {
2184 match self {
2185 PendingStateFinishedResultKind::Ok => Ok(()),
2186 PendingStateFinishedResultKind::Err(err) => Err(err),
2187 }
2188 }
2189}
2190
impl From<&SupportedFunctionReturnValue> for PendingStateFinishedResultKind {
    /// Derives the coarse outcome from a return value by delegating to
    /// `SupportedFunctionReturnValue::as_pending_state_finished_result`.
    fn from(result: &SupportedFunctionReturnValue) -> Self {
        result.as_pending_state_finished_result()
    }
}
2196
// Error kind of a finished execution. Serialized with snake_case variant names.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    derive_more::Display,
    schemars::JsonSchema,
)]
#[serde(rename_all = "snake_case")]
pub enum PendingStateFinishedError {
    // The execution was terminated; the payload says why.
    #[display("execution terminated: {_0}")]
    ExecutionFailure(ExecutionFailureKind),
    // The execution ran to completion but produced an error value.
    #[display("execution completed with an error")]
    Error,
}
2215
2216impl PendingState {
2217 #[instrument(skip(self))]
2218 pub fn can_append_lock(
2219 &self,
2220 created_at: DateTime<Utc>,
2221 executor_id: ExecutorId,
2222 run_id: RunId,
2223 lock_expires_at: DateTime<Utc>,
2224 ) -> Result<LockKind, DbErrorWriteNonRetriable> {
2225 if lock_expires_at <= created_at {
2226 return Err(DbErrorWriteNonRetriable::ValidationFailed(
2227 "invalid expiry date".into(),
2228 ));
2229 }
2230 match self {
2231 PendingState::PendingAt(PendingStatePendingAt {
2232 scheduled_at,
2233 last_lock,
2234 }) => {
2235 if *scheduled_at <= created_at {
2236 Ok(LockKind::CreatingNewLock)
2238 } else if let Some(LockedBy {
2239 executor_id: last_executor_id,
2240 run_id: last_run_id,
2241 }) = last_lock
2242 && executor_id == *last_executor_id
2243 && run_id == *last_run_id
2244 {
2245 Ok(LockKind::Extending)
2247 } else {
2248 Err(DbErrorWriteNonRetriable::ValidationFailed(
2249 "cannot lock, not yet pending".into(),
2250 ))
2251 }
2252 }
2253 PendingState::Locked(PendingStateLocked {
2254 locked_by:
2255 LockedBy {
2256 executor_id: current_pending_state_executor_id,
2257 run_id: current_pending_state_run_id,
2258 },
2259 lock_expires_at: _,
2260 }) => {
2261 if executor_id == *current_pending_state_executor_id
2262 && run_id == *current_pending_state_run_id
2263 {
2264 Ok(LockKind::Extending)
2266 } else {
2267 Err(DbErrorWriteNonRetriable::IllegalState {
2268 reason: "cannot lock, already locked".into(),
2269 context: SpanTrace::capture(),
2270 source: None,
2271 loc: Location::caller(),
2272 })
2273 }
2274 }
2275 PendingState::BlockedByJoinSet { .. } => Err(DbErrorWriteNonRetriable::IllegalState {
2276 reason: "cannot append Locked event when in BlockedByJoinSet state".into(),
2277 context: SpanTrace::capture(),
2278 source: None,
2279 loc: Location::caller(),
2280 }),
2281 PendingState::Finished { .. } => Err(DbErrorWriteNonRetriable::IllegalState {
2282 reason: "already finished".into(),
2283 context: SpanTrace::capture(),
2284 source: None,
2285 loc: Location::caller(),
2286 }),
2287 PendingState::Paused(..) => Err(DbErrorWriteNonRetriable::IllegalState {
2288 reason: "cannot lock, execution is paused".into(),
2289 context: SpanTrace::capture(),
2290 source: None,
2291 loc: Location::caller(),
2292 }),
2293 }
2294 }
2295
2296 #[must_use]
2297 pub fn is_finished(&self) -> bool {
2298 matches!(self, PendingState::Finished { .. })
2299 }
2300
2301 #[must_use]
2302 pub fn is_paused(&self) -> bool {
2303 matches!(self, PendingState::Paused(_))
2304 }
2305}
2306
/// Kind of lock that `PendingState::can_append_lock` allows.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LockKind {
    /// The requester already holds (or last held) the lock and is extending it.
    Extending,
    /// A new lock is taken on a pending execution.
    CreatingNewLock,
}
2312
// Serializable trace records of outgoing HTTP requests.
pub mod http_client_trace {
    use chrono::{DateTime, Utc};
    use serde::{Deserialize, Serialize};

    // One traced HTTP exchange: the request and, when available, its response.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
    pub struct HttpClientTrace {
        pub req: RequestTrace,
        pub resp: Option<ResponseTrace>,
    }

    // Request part of a trace: send time, target URI and HTTP method.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
    pub struct RequestTrace {
        pub sent_at: DateTime<Utc>,
        pub uri: String,
        pub method: String,
    }

    // Response part of a trace: completion time and either a `u16` status
    // (presumably the HTTP status code) or an error description.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
    pub struct ResponseTrace {
        pub finished_at: DateTime<Utc>,
        pub status: Result<u16, String>,
    }
}
2336
// Aggregate used only to derive one JSON schema covering the listed types.
// NOTE(review): presumably these are the types persisted as JSON by the database
// backends - confirm where the schema is generated.
#[derive(schemars::JsonSchema)]
pub struct DbStorageSchema {
    pub execution_event: ExecutionEvent,
    pub pending_state: PendingState,
    pub join_set_response: JoinSetResponse,
    pub wasm_backtrace: WasmBacktrace,
}
2345
2346#[cfg(test)]
2347mod tests {
2348 use super::HistoryEvent;
2349 use super::HistoryEventScheduleAt;
2350 use super::JoinNextTryOutcome;
2351 use super::PendingStateFinished;
2352 use super::PendingStateFinishedError;
2353 use super::PendingStateFinishedResultKind;
2354 use crate::ExecutionFailureKind;
2355 use crate::JoinSetId;
2356 use crate::SupportedFunctionReturnValue;
2357 use chrono::DateTime;
2358 use chrono::Datelike;
2359 use chrono::Utc;
2360 use insta::assert_snapshot;
2361 use rstest::rstest;
2362 use std::time::Duration;
2363 use val_json::type_wrapper::TypeWrapper;
2364 use val_json::wast_val::WastVal;
2365 use val_json::wast_val::WastValWithType;
2366
2367 #[rstest(expected => [
2368 PendingStateFinishedResultKind::Ok,
2369 PendingStateFinishedResultKind::Err(PendingStateFinishedError::ExecutionFailure(ExecutionFailureKind::TimedOut)),
2370 ])]
2371 #[test]
2372 fn serde_pending_state_finished_result_kind_should_work(
2373 expected: PendingStateFinishedResultKind,
2374 ) {
2375 let ser = serde_json::to_string(&expected).unwrap();
2376 let actual: PendingStateFinishedResultKind = serde_json::from_str(&ser).unwrap();
2377 assert_eq!(expected, actual);
2378 }
2379
2380 #[rstest(result_kind => [
2381 PendingStateFinishedResultKind::Ok,
2382 PendingStateFinishedResultKind::Err(PendingStateFinishedError::ExecutionFailure(ExecutionFailureKind::TimedOut)),
2383 ])]
2384 #[test]
2385 fn serde_pending_state_finished_should_work(result_kind: PendingStateFinishedResultKind) {
2386 let expected = PendingStateFinished {
2387 version: 0,
2388 finished_at: Utc::now(),
2389 result_kind,
2390 };
2391
2392 let ser = serde_json::to_string(&expected).unwrap();
2393 let actual: PendingStateFinished = serde_json::from_str(&ser).unwrap();
2394 assert_eq!(expected, actual);
2395 }
2396
/// Round-trips a `result<option<string>, string>` return value whose `ok`
/// payload is `none`: the JSON form is pinned by a snapshot and must
/// deserialize back to an equal value.
#[test]
fn join_set_deser_with_result_ok_option_none_should_work() {
    let result_type = TypeWrapper::Result {
        ok: Some(Box::new(TypeWrapper::Option(Box::new(TypeWrapper::String)))),
        err: Some(Box::new(TypeWrapper::String)),
    };
    let ok_with_none = WastVal::Result(Ok(Some(Box::new(WastVal::Option(None)))));
    let expected = SupportedFunctionReturnValue::Ok(Some(WastValWithType {
        r#type: result_type,
        value: ok_with_none,
    }));

    // The binding name is left as `json` because it is the expression handed
    // to the snapshot macro.
    let json = serde_json::to_string(&expected).unwrap();
    assert_snapshot!(json);

    let actual = serde_json::from_str::<SupportedFunctionReturnValue>(&json).unwrap();
    assert_eq!(expected, actual);
}
2413
/// Scheduling `u32::MAX` seconds after the Unix epoch must resolve to a
/// timestamp in the year 2106.
#[test]
fn as_date_time_should_work_with_duration_u32_max_secs() {
    let max_u32_secs = u64::from(u32::MAX);
    let schedule_at = HistoryEventScheduleAt::In(Duration::from_secs(max_u32_secs));

    let resolved = schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap();

    assert_eq!(2106, resolved.year());
}
2421
/// Number of milliseconds in one second.
const MILLIS_PER_SEC: i64 = 1_000;
/// Largest whole number of seconds whose millisecond count still fits in an
/// `i64`.
// NOTE(review): presumably mirrors the upper bound of `chrono::TimeDelta` — confirm.
const TIMEDELTA_MAX_SECS: i64 = i64::MAX / MILLIS_PER_SEC;
2424
/// A delay one second longer than `TIMEDELTA_MAX_SECS` must make
/// `as_date_time` return an error instead of a timestamp.
#[test]
fn as_date_time_should_fail_on_duration_secs_greater_than_i64_max() {
    let max_secs = u64::try_from(TIMEDELTA_MAX_SECS).expect("positive number must not fail");
    let too_long = Duration::from_secs(max_secs + 1);
    let schedule_at = HistoryEventScheduleAt::In(too_long);

    let outcome = schedule_at.as_date_time(DateTime::UNIX_EPOCH);

    // Panics (failing the test) if the conversion unexpectedly succeeds.
    outcome.unwrap_err();
}
2434
/// Deserializes `join_next_try` events carrying the `outcome` field and
/// checks that each JSON document maps to the matching `JoinNextTryOutcome`
/// for the named join set `test`.
#[test]
fn join_next_try_outcome_new_format() {
    // (input JSON, outcome expected after deserialization), checked in order.
    let cases = [
        (
            r#"{"type":"join_next_try","join_set_id":"n:test","outcome":"found"}"#,
            JoinNextTryOutcome::Found,
        ),
        (
            r#"{"type":"join_next_try","join_set_id":"n:test","outcome":"all_processed"}"#,
            JoinNextTryOutcome::AllProcessed,
        ),
    ];

    for (json, outcome) in cases {
        let event: HistoryEvent = serde_json::from_str(json).unwrap();
        let join_set_id =
            JoinSetId::new(crate::JoinSetKind::Named, crate::StrVariant::Static("test")).unwrap();
        assert_eq!(
            event,
            HistoryEvent::JoinNextTry {
                join_set_id,
                outcome,
            }
        );
    }
}
2465
/// Serializes a `JoinNextTry` event and checks that the JSON contains the
/// `outcome` field and does not contain the string `found_response`.
#[test]
fn join_next_try_outcome_serializes_new_format() {
    let join_set_id =
        JoinSetId::new(crate::JoinSetKind::Named, crate::StrVariant::Static("test")).unwrap();
    let event = HistoryEvent::JoinNextTry {
        join_set_id,
        outcome: JoinNextTryOutcome::AllProcessed,
    };

    let json = serde_json::to_string(&event).unwrap();
    let has_outcome_field = json.contains(r#""outcome":"all_processed""#);
    let has_old_field = json.contains("found_response");

    assert!(has_outcome_field, "expected outcome field, got: {json}");
    assert!(!has_old_field, "should not contain old field, got: {json}");
}
2486
2487 mod stub_retval_hash {
2488 use super::super::{StubRetVal, StubRetValHash};
2489 use crate::SupportedFunctionReturnValue;
2490 use val_json::type_wrapper::TypeWrapper;
2491 use val_json::wast_val::{WastVal, WastValWithType};
2492
/// Hashes a typed stub return value and checks the shape of its textual
/// form: it starts with `01` and is 66 bytes long.
#[test]
fn typed_variant_hash_is_stable() {
    let payload = WastValWithType {
        r#type: TypeWrapper::String,
        value: WastVal::String("hello".into()),
    };
    let retval = StubRetVal::Typed(SupportedFunctionReturnValue::Ok(Some(payload)));

    let rendered = retval.hash().to_string();
    let prefix: String = rendered.chars().take(2).collect();

    assert_eq!(prefix, "01");
    assert_eq!(rendered.len(), 66);
}
2506
/// Hashes an untyped (raw string) stub return value and checks the shape of
/// its textual form: it starts with `01` and is 66 bytes long.
#[test]
fn untyped_variant_hash_is_stable() {
    let retval = StubRetVal::Untyped(r#"{"ok": "hello"}"#.to_string());

    let rendered = retval.hash().to_string();
    let prefix: String = rendered.chars().take(2).collect();

    assert_eq!(prefix, "01");
    assert_eq!(rendered.len(), 66);
}
2516
/// Four distinct stub return values (two typed, two untyped) must yield
/// pairwise different hash strings.
#[test]
fn different_values_produce_different_hashes() {
    let inputs = [
        StubRetVal::Typed(SupportedFunctionReturnValue::Ok(None)),
        StubRetVal::Typed(SupportedFunctionReturnValue::Err(None)),
        StubRetVal::Untyped("value1".to_string()),
        StubRetVal::Untyped("value2".to_string()),
    ];
    let hashes: Vec<String> = inputs
        .into_iter()
        .map(|retval| retval.hash().to_string())
        .collect();

    // Compare every hash with each hash that follows it, so each unordered
    // pair is checked exactly once.
    let mut remaining = hashes.as_slice();
    while let Some((current, later)) = remaining.split_first() {
        for other in later {
            assert_ne!(current, other, "hashes should be different");
        }
        remaining = later;
    }
}
2536
/// Two independently constructed but equal stub return values must hash to
/// the same value, for both the typed and the untyped variant.
#[test]
fn same_values_produce_same_hashes() {
    // Each closure call builds a fresh value, so the two sides of every
    // assertion come from separate instances.
    let typed_hash = || StubRetVal::Typed(SupportedFunctionReturnValue::Ok(None)).hash();
    assert_eq!(typed_hash(), typed_hash());

    let untyped_hash = || StubRetVal::Untyped("test".to_string()).hash();
    assert_eq!(untyped_hash(), untyped_hash());
}
2547
/// A `StubRetValHash` must be unchanged after being serialized to JSON and
/// deserialized again.
#[test]
fn hash_serialization_roundtrip() {
    let hash = StubRetVal::Typed(SupportedFunctionReturnValue::Ok(None)).hash();

    let wire = serde_json::to_string(&hash).unwrap();
    let restored = serde_json::from_str::<StubRetValHash>(&wire).unwrap();

    assert_eq!(hash, restored);
}
2558
/// Formatting a `StubRetValHash` with `to_string` and parsing the text back
/// must produce an equal hash.
#[test]
fn hash_display_and_fromstr_roundtrip() {
    let hash = StubRetVal::Untyped("test value".to_string()).hash();

    let text = hash.to_string();
    let reparsed = text.parse::<StubRetValHash>().unwrap();

    assert_eq!(hash, reparsed);
}
2569
/// A typed value and an untyped value holding the JSON serialization of
/// that same typed value must not hash to the same result.
#[test]
fn typed_and_untyped_with_same_content_produce_different_hashes() {
    let value = SupportedFunctionReturnValue::Ok(None);
    // Serialize first so the value can then be moved into the typed variant.
    let json_of_typed = serde_json::to_string(&value).unwrap();

    let typed_hash = StubRetVal::Typed(value).hash();
    let untyped_hash = StubRetVal::Untyped(json_of_typed).hash();

    assert_ne!(typed_hash, untyped_hash);
}
2580 }
2581}