1use crate::ComponentId;
2use crate::ComponentRetryConfig;
3use crate::ComponentType;
4use crate::ExecutionFailureKind;
5use crate::ExecutionId;
6use crate::ExecutionMetadata;
7use crate::FinishedExecutionError;
8use crate::FunctionFqn;
9use crate::JoinSetId;
10use crate::Params;
11use crate::StrVariant;
12use crate::SupportedFunctionReturnValue;
13use crate::component_id::ComponentDigest;
14use crate::prefixed_ulid::DelayId;
15use crate::prefixed_ulid::DeploymentId;
16use crate::prefixed_ulid::ExecutionIdDerived;
17use crate::prefixed_ulid::ExecutorId;
18use crate::prefixed_ulid::RunId;
19use assert_matches::assert_matches;
20use async_trait::async_trait;
21use chrono::TimeDelta;
22use chrono::{DateTime, Utc};
23use http_client_trace::HttpClientTrace;
24use serde::Deserialize;
25use serde::Serialize;
26use std::fmt::Debug;
27use std::fmt::Display;
28use std::panic::Location;
29use std::pin::Pin;
30use std::sync::Arc;
31use std::time::Duration;
32use tracing::debug;
33use tracing::instrument;
34use tracing_error::SpanTrace;
35
// String labels for execution states.
// NOTE(review): presumably the textual form of the pending-state variants used by
// storage/API layers — confirm against the usage sites.

/// Label for the "pending at" state.
pub const STATE_PENDING_AT: &str = "pending_at";
/// Label for the "blocked by join set" state.
pub const STATE_BLOCKED_BY_JOIN_SET: &str = "blocked_by_join_set";
/// Label for the "locked" state.
pub const STATE_LOCKED: &str = "locked";
/// Label for the "finished" state.
pub const STATE_FINISHED: &str = "finished";
/// Label for the `join_next` history event type.
/// NOTE(review): the string equals the snake_case name of `HistoryEvent::JoinNext`;
/// presumably used when filtering stored events — confirm.
pub const HISTORY_EVENT_TYPE_JOIN_NEXT: &str = "join_next";

/// Full in-memory view of a single execution: its ordered events, the responses
/// it has received, and summary fields describing its current state.
///
/// Several accessors on this type assume `events` is non-empty and that the
/// first event is `ExecutionRequest::Created`; they panic otherwise.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct ExecutionLog {
    /// Identifier of the execution this log belongs to.
    pub execution_id: ExecutionId,
    /// Events of the execution, in the order they are stored.
    pub events: Vec<ExecutionEvent>,
    /// Join set responses together with their cursors.
    pub responses: Vec<ResponseWithCursor>,
    /// NOTE(review): presumably the version the next appended event must carry — confirm.
    pub next_version: Version,
    /// Current pending state of the execution.
    pub pending_state: PendingState,
    /// Digest of the component associated with the execution.
    pub component_digest: ComponentDigest,
    /// Type of the component associated with the execution.
    pub component_type: ComponentType,
    /// Deployment associated with the execution.
    pub deployment_id: DeploymentId,
}
54
55impl ExecutionLog {
56 #[must_use]
59 pub fn can_be_retried_after(
60 temporary_event_count: u32,
61 max_retries: Option<u32>,
62 retry_exp_backoff: Duration,
63 ) -> Option<Duration> {
64 if temporary_event_count <= max_retries.unwrap_or(u32::MAX) {
66 let duration = retry_exp_backoff * 2_u32.saturating_pow(temporary_event_count - 1);
68 Some(duration)
69 } else {
70 None
71 }
72 }
73
74 #[must_use]
75 pub fn compute_retry_duration_when_retrying_forever(
76 temporary_event_count: u32,
77 retry_exp_backoff: Duration,
78 ) -> Duration {
79 Self::can_be_retried_after(temporary_event_count, None, retry_exp_backoff)
80 .expect("`max_retries` set to MAX must never return None")
81 }
82
83 #[must_use]
84 pub fn ffqn(&self) -> &FunctionFqn {
85 assert_matches!(self.events.first(), Some(ExecutionEvent {
86 event: ExecutionRequest::Created { ffqn, .. },
87 ..
88 }) => ffqn)
89 }
90
91 #[must_use]
92 pub fn params(&self) -> &Params {
93 assert_matches!(self.events.first(), Some(ExecutionEvent {
94 event: ExecutionRequest::Created { params, .. },
95 ..
96 }) => params)
97 }
98
99 #[must_use]
100 pub fn parent(&self) -> Option<(ExecutionId, JoinSetId)> {
101 assert_matches!(self.events.first(), Some(ExecutionEvent {
102 event: ExecutionRequest::Created { parent, .. },
103 ..
104 }) => parent.clone())
105 }
106
107 #[must_use]
108 pub fn last_event(&self) -> &ExecutionEvent {
109 self.events.last().expect("must contain at least one event")
110 }
111
112 #[must_use]
113 pub fn is_finished(&self) -> bool {
114 matches!(
115 self.events.last(),
116 Some(ExecutionEvent {
117 event: ExecutionRequest::Finished { .. },
118 ..
119 })
120 )
121 }
122
123 #[must_use]
124 pub fn as_finished_result(&self) -> Option<SupportedFunctionReturnValue> {
125 if let ExecutionEvent {
126 event: ExecutionRequest::Finished { retval: result, .. },
127 ..
128 } = self.events.last().expect("must contain at least one event")
129 {
130 Some(result.clone())
131 } else {
132 None
133 }
134 }
135
136 pub fn event_history(&self) -> impl Iterator<Item = (HistoryEvent, Version)> + '_ {
137 self.events.iter().filter_map(|event| {
138 if let ExecutionRequest::HistoryEvent { event: eh, .. } = &event.event {
139 Some((eh.clone(), event.version.clone()))
140 } else {
141 None
142 }
143 })
144 }
145
146 #[cfg(feature = "test")]
147 #[must_use]
148 pub fn find_join_set_request(&self, join_set_id: &JoinSetId) -> Option<&JoinSetRequest> {
149 self.events
150 .iter()
151 .find_map(move |event| match &event.event {
152 ExecutionRequest::HistoryEvent {
153 event:
154 HistoryEvent::JoinSetRequest {
155 join_set_id: found,
156 request,
157 },
158 ..
159 } if *join_set_id == *found => Some(request),
160 _ => None,
161 })
162 }
163}
164
/// Integer type backing `Version`.
pub type VersionType = u32;

// Version number of an execution event.
// Serialized transparently, i.e. as the bare integer.
// (Plain comments are used here on purpose: the type derives `JsonSchema`.)
#[derive(
    Debug,
    Default,
    Clone,
    PartialEq,
    Eq,
    Hash,
    derive_more::Display,
    derive_more::Into,
    serde::Serialize,
    serde::Deserialize,
    schemars::JsonSchema,
)]
#[serde(transparent)]
#[schemars(transparent)]
pub struct Version(pub VersionType);
182impl Version {
183 #[must_use]
184 pub fn new(arg: VersionType) -> Version {
185 Version(arg)
186 }
187
188 #[must_use]
189 pub fn increment(&self) -> Version {
190 Version(self.0 + 1)
191 }
192}
193impl TryFrom<i64> for Version {
194 type Error = VersionParseError;
195 fn try_from(value: i64) -> Result<Self, Self::Error> {
196 VersionType::try_from(value)
197 .map(Version::new)
198 .map_err(|_| VersionParseError)
199 }
200}
201impl From<Version> for usize {
202 fn from(value: Version) -> Self {
203 usize::try_from(value.0).expect("16 bit systems are unsupported")
204 }
205}
206impl From<&Version> for usize {
207 fn from(value: &Version) -> Self {
208 usize::try_from(value.0).expect("16 bit systems are unsupported")
209 }
210}
211
/// Error returned when an integer cannot be converted into a `Version`
/// because it does not fit into `u32`.
#[derive(Debug, thiserror::Error)]
#[error("version must be u32")]
pub struct VersionParseError;
215
// A single stored event of an execution together with its metadata.
// `Display` prints only the inner `event`.
#[derive(
    Clone,
    Debug,
    derive_more::Display,
    PartialEq,
    Eq,
    serde::Serialize,
    serde::Deserialize,
    schemars::JsonSchema,
)]
#[display("{event}")]
pub struct ExecutionEvent {
    // Timestamp attached to the event.
    pub created_at: DateTime<Utc>,
    // The event payload.
    pub event: ExecutionRequest,
    // Omitted from serialized output when `None`.
    // NOTE(review): presumably the version under which a backtrace was stored — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub backtrace_id: Option<Version>,
    // Version of this event.
    pub version: Version,
}
234
// Cursor identifying a stored response; wraps a `u32`.
// NOTE(review): presumably a monotonically increasing position used for pagination — confirm.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    derive_more::Display,
    derive_more::Into,
    Serialize,
    schemars::JsonSchema,
)]
pub struct ResponseCursor(pub u32);
247
// A join set response paired with the cursor it is stored under.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, schemars::JsonSchema)]
pub struct ResponseWithCursor {
    // The response and its creation timestamp.
    pub event: JoinSetResponseEventOuter,
    // Cursor of this response.
    pub cursor: ResponseCursor,
}
253
/// Result of listing execution events.
#[derive(Debug)]
pub struct ListExecutionEventsResponse {
    /// The returned events.
    pub events: Vec<ExecutionEvent>,
    /// NOTE(review): presumably the highest version stored for the execution,
    /// regardless of pagination — confirm with the implementations.
    pub max_version: Version,
}
259
/// Result of listing join set responses.
#[derive(Debug)]
pub struct ListResponsesResponse {
    /// The returned responses with their cursors.
    pub responses: Vec<ResponseWithCursor>,
    /// NOTE(review): presumably the highest cursor stored for the execution,
    /// regardless of pagination — confirm with the implementations.
    pub max_cursor: ResponseCursor,
}
265
// A join set response event together with its creation timestamp.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, schemars::JsonSchema)]
pub struct JoinSetResponseEventOuter {
    // Timestamp attached to the response.
    pub created_at: DateTime<Utc>,
    // The response itself.
    pub event: JoinSetResponseEvent,
}
271
// A response addressed to a particular join set.
#[derive(
    Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema,
)]
pub struct JoinSetResponseEvent {
    // Join set the response belongs to.
    pub join_set_id: JoinSetId,
    // The response payload.
    pub event: JoinSetResponse,
}
279
// Payload of a join set response. Serialized as an internally tagged enum
// (`"type"` field) with snake_case variant names.
#[derive(
    Clone, Debug, PartialEq, Eq, Serialize, Deserialize, derive_more::Display, schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum JoinSetResponse {
    // A delay completed. Per the display string, `Ok(())` is shown as
    // "finished" and `Err(())` as "cancelled".
    #[display("delay {}: {delay_id}", if result.is_ok() { "finished" } else { "cancelled"})]
    DelayFinished {
        delay_id: DelayId,
        result: Result<(), ()>,
    },
    // A child execution finished with `result`.
    #[display("{result}: {child_execution_id}")]
    ChildExecutionFinished {
        child_execution_id: ExecutionIdDerived,
        // NOTE(review): presumably the version of the child's `Finished` event — confirm.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Version(2)))]
        finished_version: Version,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
        result: SupportedFunctionReturnValue,
    },
}
300
/// Placeholder `Created` event: empty function name and params, no parent,
/// scheduled at timestamp zero, dummy component and deployment ids.
/// NOTE(review): presumably used where only the event variant matters — confirm.
pub const DUMMY_CREATED: ExecutionRequest = ExecutionRequest::Created {
    ffqn: FunctionFqn::new_static("", ""),
    params: Params::empty(),
    parent: None,
    scheduled_at: DateTime::from_timestamp_nanos(0),
    component_id: ComponentId::dummy_activity(),
    deployment_id: DeploymentId::from_parts(0, 0),
    metadata: ExecutionMetadata::empty(),
    scheduled_by: None,
};
/// Placeholder `HistoryEvent` event wrapping a `JoinSetCreate` for a one-off
/// join set with an empty name.
/// NOTE(review): presumably used where only the event variant matters — confirm.
pub const DUMMY_HISTORY_EVENT: ExecutionRequest = ExecutionRequest::HistoryEvent {
    event: HistoryEvent::JoinSetCreate {
        join_set_id: JoinSetId {
            kind: crate::JoinSetKind::OneOff,
            name: StrVariant::empty(),
        },
    },
};
319
// Payload of an execution event. Serialized with snake_case variant names.
// The first event of an execution is expected to be `Created` (the accessors
// on `ExecutionLog` rely on that).
#[derive(
    Clone,
    derive_more::Debug,
    derive_more::Display,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(rename_all = "snake_case")]
pub enum ExecutionRequest {
    // The execution was created.
    #[display("Created({ffqn}, `{scheduled_at}`)")]
    Created {
        // Function to be executed.
        ffqn: FunctionFqn,
        // Call parameters; left out of `Debug` output.
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
        #[debug(skip)]
        params: Params,
        // Parent execution and the join set within it, if any.
        parent: Option<(ExecutionId, JoinSetId)>,
        scheduled_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
        component_id: ComponentId,
        deployment_id: DeploymentId,
        #[cfg_attr(any(test, feature = "test"), arbitrary(default))]
        metadata: ExecutionMetadata,
        // NOTE(review): presumably the execution that scheduled this one — confirm.
        scheduled_by: Option<ExecutionId>,
    },
    // The execution was locked by an executor; see `Locked`.
    Locked(Locked),
    // The lock was released.
    // NOTE(review): presumably the execution becomes pending again at
    // `backoff_expires_at` — confirm.
    #[display("Unlocked(`{backoff_expires_at}`)")]
    Unlocked {
        backoff_expires_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
        reason: StrVariant,
    },
    // Temporary failure; counted as a temporary event by `is_temporary_event`.
    #[display("TemporarilyFailed(`{backoff_expires_at}`)")]
    TemporarilyFailed {
        backoff_expires_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
        reason: StrVariant,
        detail: Option<String>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },
    // Temporary timeout; counted as a temporary event by `is_temporary_event`.
    #[display("TemporarilyTimedOut(`{backoff_expires_at}`)")]
    TemporarilyTimedOut {
        backoff_expires_at: DateTime<Utc>,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },
    // The execution finished with `retval` (which may itself describe an
    // execution error, e.g. a cancellation).
    #[display("Finished")]
    Finished {
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
        retval: SupportedFunctionReturnValue,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
        http_client_traces: Option<Vec<HttpClientTrace>>,
    },

    // A history event recorded by the execution; see `HistoryEvent`.
    #[display("HistoryEvent({event})")]
    HistoryEvent {
        event: HistoryEvent,
    },
    // The execution was paused.
    #[display("Paused")]
    Paused,
    // The execution was unpaused.
    #[display("Unpaused")]
    Unpaused,
}
397
398impl ExecutionRequest {
399 #[must_use]
400 pub fn is_temporary_event(&self) -> bool {
401 matches!(
402 self,
403 Self::TemporarilyFailed { .. } | Self::TemporarilyTimedOut { .. }
404 )
405 }
406
407 #[must_use]
409 pub const fn variant(&self) -> &'static str {
410 match self {
411 ExecutionRequest::Created { .. } => "created",
412 ExecutionRequest::Locked(_) => "locked",
413 ExecutionRequest::Unlocked { .. } => "unlocked",
414 ExecutionRequest::TemporarilyFailed { .. } => "temporarily_failed",
415 ExecutionRequest::TemporarilyTimedOut { .. } => "temporarily_timed_out",
416 ExecutionRequest::Finished { .. } => "finished",
417 ExecutionRequest::HistoryEvent { .. } => "history_event",
418 ExecutionRequest::Paused => "paused",
419 ExecutionRequest::Unpaused => "unpaused",
420 }
421 }
422
423 #[must_use]
424 pub fn join_set_id(&self) -> Option<&JoinSetId> {
425 match self {
426 Self::Created {
427 parent: Some((_parent_id, join_set_id)),
428 ..
429 } => Some(join_set_id),
430 Self::HistoryEvent {
431 event:
432 HistoryEvent::JoinSetCreate { join_set_id, .. }
433 | HistoryEvent::JoinSetRequest { join_set_id, .. }
434 | HistoryEvent::JoinNext { join_set_id, .. },
435 } => Some(join_set_id),
436 _ => None,
437 }
438 }
439}
440
// Data recorded when an executor locks an execution.
// `Display` prints the lock expiry and the component id.
#[derive(
    Clone,
    derive_more::Debug,
    derive_more::Display,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[display("Locked(`{lock_expires_at}`, {component_id})")]
pub struct Locked {
    #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
    pub component_id: ComponentId,
    // Executor holding the lock.
    pub executor_id: ExecutorId,
    pub deployment_id: DeploymentId,
    // NOTE(review): presumably identifies this particular run/attempt — confirm.
    pub run_id: RunId,
    // Time at which the lock expires.
    pub lock_expires_at: DateTime<Utc>,
    #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentRetryConfig::ZERO))]
    pub retry_config: ComponentRetryConfig,
}
463
// Kind of value stored by a `HistoryEvent::Persist` event.
// Serialized as an internally tagged enum (`"type"` field), snake_case names.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    derive_more::Display,
    Serialize,
    Deserialize,
    schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum PersistKind {
    // Random `u64`; the upper bound is inclusive per the field name.
    #[display("RandomU64({min}, {max_inclusive})")]
    RandomU64 {
        min: u64,
        max_inclusive: u64,
    },
    // Random string; the upper length bound is exclusive per the field name.
    #[display("RandomString({min_length}, {max_length_exclusive})")]
    RandomString {
        min_length: u64,
        max_length_exclusive: u64,
    },
    // NOTE(review): presumably a generated execution id — confirm.
    ExecutionId,
}
490
/// Encodes `value` as its 8-byte big-endian representation
/// (most significant byte first).
#[must_use]
pub fn from_u64_to_bytes(value: u64) -> [u8; 8] {
    u64::to_be_bytes(value)
}
495
// Event recorded in the history of an execution.
// Serialized as an internally tagged enum (`"type"` field), snake_case names.
#[derive(
    derive_more::Debug,
    Clone,
    PartialEq,
    Eq,
    derive_more::Display,
    Serialize,
    Deserialize,
    schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum HistoryEvent {
    // A persisted value of the given kind; the raw bytes are left out of
    // `Debug` output.
    #[display("Persist")]
    Persist {
        #[debug(skip)]
        value: Vec<u8>,
        kind: PersistKind,
    },
    // A join set was created.
    #[display("JoinSetCreate({join_set_id})")]
    JoinSetCreate { join_set_id: JoinSetId },
    // A request (delay or child execution) was submitted to a join set.
    #[display("JoinSetRequest({request})")]
    JoinSetRequest {
        join_set_id: JoinSetId,
        request: JoinSetRequest,
    },
    // NOTE(review): presumably a blocking wait for the next response of the
    // join set — confirm the meaning of `run_expires_at`, `requested_ffqn`
    // and `closing` with the callers.
    #[display("JoinNext({join_set_id})")]
    JoinNext {
        join_set_id: JoinSetId,
        run_expires_at: DateTime<Utc>,
        requested_ffqn: Option<FunctionFqn>,
        closing: bool,
    },
    // NOTE(review): presumably a non-blocking attempt to obtain the next
    // response; see `JoinNextTryOutcome` — confirm.
    #[display("JoinNextTry({join_set_id}, {outcome})")]
    JoinNextTry {
        join_set_id: JoinSetId,
        outcome: JoinNextTryOutcome,
    },
    // NOTE(review): presumably recorded when more responses were awaited than
    // requests were submitted — confirm.
    #[display("JoinNextTooMany({join_set_id})")]
    JoinNextTooMany {
        join_set_id: JoinSetId,
        requested_ffqn: Option<FunctionFqn>,
    },
    // Another execution was scheduled; `result` records whether the request
    // was accepted.
    #[display("Schedule({execution_id}, {schedule_at})")]
    Schedule {
        execution_id: ExecutionId,
        schedule_at: HistoryEventScheduleAt,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Ok(())))]
        result: Result<(), ScheduleRequestError>,
    },
    // A return value was stubbed for the target execution; only the hash of
    // the return value is recorded.
    #[display("Stub({target_execution_id})")]
    Stub {
        target_execution_id: ExecutionIdDerived,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = StubRetVal::Typed(crate::SUPPORTED_RETURN_VALUE_OK_EMPTY).hash()))]
        retval_hash: StubRetValHash,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Ok(())))]
        result: Result<(), StubError>,
    },
}
572
/// Return value supplied when stubbing an execution.
/// Serde support is only compiled in for tests.
#[derive(derive_more::Debug, Clone, PartialEq, Eq)]
#[cfg_attr(any(test, feature = "test"), derive(Serialize, Deserialize))]
#[cfg_attr(any(test, feature = "test"), serde(rename_all = "snake_case"))]
pub enum StubRetVal {
    /// An already typed return value.
    Typed(SupportedFunctionReturnValue),
    /// A return value kept as a string.
    /// NOTE(review): presumably not yet type-checked text — confirm.
    Untyped(String),
}
581
582impl StubRetVal {
583 #[must_use]
585 pub fn hash(&self) -> StubRetValHash {
586 use sha2::{Digest as _, Sha256};
587 const STUB_RETVAL_HASH_VERSION: u8 = 1;
588 let mut hasher = Sha256::default();
589
590 match self {
591 StubRetVal::Typed(val) => {
592 hasher.update(b"T|");
593 let json = serde_json::to_string(val)
595 .expect("SupportedFunctionReturnValue is always serializable");
596 hasher.update(json.as_bytes());
597 }
598 StubRetVal::Untyped(s) => {
599 hasher.update(b"U|");
600 hasher.update(s.as_bytes());
601 }
602 }
603
604 let hash_bytes = hasher.finalize();
605 let mut result = [0u8; 33];
606 result[0] = STUB_RETVAL_HASH_VERSION;
607 result[1..].copy_from_slice(&hash_bytes);
608
609 StubRetValHash(result)
610 }
611}
612
// 33-byte hash of a stubbed return value: one version byte followed by a
// 32-byte digest (see `StubRetVal::hash`).
// Serialized through `Display` and deserialized through `FromStr`, i.e. as a
// hex string; the JSON schema is that of a `String`.
#[derive(
    Clone,
    PartialEq,
    Eq,
    serde_with::SerializeDisplay,
    serde_with::DeserializeFromStr,
    schemars::JsonSchema,
)]
#[schemars(with = "String")]
pub struct StubRetValHash([u8; 33]);
625
626impl Display for StubRetValHash {
627 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
628 for b in self.0 {
629 write!(f, "{b:02x}")?;
630 }
631 Ok(())
632 }
633}
634
635impl Debug for StubRetValHash {
636 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
637 Display::fmt(self, f)
638 }
639}
640
641impl std::str::FromStr for StubRetValHash {
642 type Err = StubRetValHashParseError;
643
644 fn from_str(s: &str) -> Result<Self, Self::Err> {
645 if s.len() != 66 {
646 return Err(StubRetValHashParseError::InvalidLength(s.len()));
648 }
649 let mut bytes = [0u8; 33];
650 for i in 0..33 {
651 let chunk = &s[i * 2..i * 2 + 2];
652 bytes[i] =
653 u8::from_str_radix(chunk, 16).map_err(|_| StubRetValHashParseError::InvalidHex)?;
654 }
655 Ok(StubRetValHash(bytes))
656 }
657}
658
/// Error returned when parsing a `StubRetValHash` from a string fails.
#[derive(Debug, thiserror::Error)]
pub enum StubRetValHashParseError {
    /// The input did not have the expected length; carries the actual length.
    #[error("invalid length: expected 66 hex chars, got {0}")]
    InvalidLength(usize),
    /// The input contained something that is not valid hex.
    #[error("invalid hex character")]
    InvalidHex,
}
666
// Reasons a stub request can fail; stored in `HistoryEvent::Stub::result`.
#[derive(
    Debug, Clone, thiserror::Error, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema,
)]
#[serde(rename_all = "snake_case")]
pub enum StubError {
    // The target execution does not exist.
    #[error("execution not found")]
    ExecutionNotFound,
    // The supplied return value failed type checking; carries the message.
    #[error("type check error: {0}")]
    TypeCheckError(String),
    // NOTE(review): presumably the target already has a different result — confirm.
    #[error("conflict")]
    Conflict,
}
681
// Reasons a schedule request can fail; stored in `HistoryEvent::Schedule::result`.
#[derive(
    Debug, Clone, thiserror::Error, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema,
)]
#[serde(rename_all = "snake_case")]
pub enum ScheduleRequestError {
    // The requested function does not exist.
    #[error("function not found")]
    FunctionNotFound,
    // The parameters could not be parsed; carries the message.
    #[error("params parsing error: {0}")]
    TypeCheckError(String),
}
693
// Reasons a child execution request can fail; stored in
// `JoinSetRequest::ChildExecutionRequest::result`.
#[derive(
    Debug, Clone, thiserror::Error, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema,
)]
#[serde(rename_all = "snake_case")]
pub enum ChildExecutionRequestError {
    // The requested function does not exist.
    #[error("function not found")]
    FunctionNotFound,
    // The parameters could not be parsed; carries the message.
    #[error("params parsing error: {0}")]
    TypeCheckError(String),
}
705
// Outcome recorded by `HistoryEvent::JoinNextTry`.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    derive_more::Display,
    Serialize,
    Deserialize,
    schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(rename_all = "snake_case")]
pub enum JoinNextTryOutcome {
    // A response was found.
    #[display("found")]
    Found,
    // No response was found.
    // NOTE(review): presumably some requests are still outstanding — confirm.
    #[display("pending")]
    Pending,
    // NOTE(review): presumably every response of the join set was already consumed — confirm.
    #[display("all_processed")]
    AllProcessed,
}
730
731impl From<bool> for JoinNextTryOutcome {
732 fn from(found_response: bool) -> Self {
736 if found_response {
737 JoinNextTryOutcome::Found
738 } else {
739 JoinNextTryOutcome::Pending
740 }
741 }
742}
743
// When something should be scheduled: immediately, at an absolute time, or
// after a duration relative to "now" (see `as_date_time`).
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    derive_more::Display,
    Serialize,
    Deserialize,
    schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(rename_all = "snake_case")]
pub enum HistoryEventScheduleAt {
    // Schedule at the current time.
    Now,
    // Schedule at the given absolute time.
    #[display("At(`{_0}`)")]
    At(DateTime<Utc>),
    // Schedule after the given duration.
    #[display("In({_0:?})")]
    In(Duration),
}
764
/// Error returned by `HistoryEventScheduleAt::as_date_time`.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum ScheduleAtConversionError {
    /// The duration cannot be represented, or adding it to the base time
    /// overflows.
    #[error("source duration value is out of range")]
    OutOfRangeError,
}
770
771impl HistoryEventScheduleAt {
772 pub fn as_date_time(
773 &self,
774 now: DateTime<Utc>,
775 ) -> Result<DateTime<Utc>, ScheduleAtConversionError> {
776 match self {
777 Self::Now => Ok(now),
778 Self::At(date_time) => Ok(*date_time),
779 Self::In(duration) => {
780 let time_delta = TimeDelta::from_std(*duration)
781 .map_err(|_| ScheduleAtConversionError::OutOfRangeError)?;
782 now.checked_add_signed(time_delta)
783 .ok_or(ScheduleAtConversionError::OutOfRangeError)
784 }
785 }
786 }
787}
788
// Request submitted to a join set. Serialized as an internally tagged enum
// (`"type"` field), snake_case names.
#[derive(
    Clone, Debug, PartialEq, Eq, derive_more::Display, Serialize, Deserialize, schemars::JsonSchema,
)]
#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum JoinSetRequest {
    // A delay request.
    // NOTE(review): presumably `expires_at` is `schedule_at` resolved to an
    // absolute time — confirm.
    #[display("DelayRequest({delay_id}, expires_at: `{expires_at}`, schedule_at: `{schedule_at}`)")]
    DelayRequest {
        delay_id: DelayId,
        expires_at: DateTime<Utc>,
        schedule_at: HistoryEventScheduleAt,
    },
    // A request to run a child execution; `result` records whether the
    // request was accepted.
    #[display("ChildExecutionRequest({child_execution_id}, {target_ffqn}, params: {params})")]
    ChildExecutionRequest {
        child_execution_id: ExecutionIdDerived,
        target_ffqn: FunctionFqn,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
        params: Params,
        #[cfg_attr(any(test, feature = "test"), arbitrary(value = Ok(())))]
        result: Result<(), ChildExecutionRequestError>,
    },
}
813
/// Database errors shared by read and write operations.
#[derive(Debug, Clone, thiserror::Error, derive_more::PartialEq, derive_more::Eq)]
pub enum DbErrorGeneric {
    /// An error that does not fit a more specific category.
    /// Equality ignores `context` and `source`.
    #[error("database error: {reason}")]
    Uncategorized {
        /// Human-readable reason, shown in the error message.
        reason: StrVariant,
        /// Tracing span trace attached to the error.
        #[eq(skip)]
        #[partial_eq(skip)]
        context: SpanTrace,
        /// Underlying error, if any.
        #[eq(skip)]
        #[partial_eq(skip)]
        #[source]
        source: Option<Arc<dyn std::error::Error + Send + Sync>>,
        /// Source code location attached to the error.
        loc: &'static Location<'static>,
    },
    /// The database was closed.
    #[error("database was closed")]
    Close,
}
832
/// Write errors classified as non-retriable.
#[derive(thiserror::Error, Clone, Debug, derive_more::PartialEq, derive_more::Eq)]
pub enum DbErrorWriteNonRetriable {
    /// The request failed validation; carries the reason.
    #[error("validation failed: {0}")]
    ValidationFailed(StrVariant),
    /// The write conflicts with existing data.
    #[error("conflict")]
    Conflict,
    /// The execution is already finished.
    #[error("already finished")]
    AlreadyFinished,
    /// The stored data is in an unexpected state.
    /// Equality ignores `context` and `source`.
    #[error("illegal state: {reason}")]
    IllegalState {
        /// Human-readable reason, shown in the error message.
        reason: StrVariant,
        /// Tracing span trace attached to the error.
        #[eq(skip)]
        #[partial_eq(skip)]
        context: SpanTrace,
        /// Underlying error, if any.
        #[eq(skip)]
        #[partial_eq(skip)]
        #[source]
        source: Option<Arc<dyn std::error::Error + Send + Sync>>,
        /// Source code location attached to the error.
        loc: &'static Location<'static>,
    },
    /// The version supplied with the write does not match the expected one.
    #[error("version conflict: expected: {expected}, got: {requested}")]
    VersionConflict {
        expected: Version,
        requested: Version,
    },
}
859
/// Error returned by database write operations.
#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
pub enum DbErrorWrite {
    /// The row to be written to does not exist.
    #[error("cannot write - row not found")]
    NotFound,
    /// A non-retriable failure; convertible via `From`.
    #[error("non-retriable error: {0}")]
    NonRetriable(#[from] DbErrorWriteNonRetriable),
    /// A generic database failure; displayed transparently, convertible via `From`.
    #[error(transparent)]
    Generic(#[from] DbErrorGeneric),
}
870
/// Error returned by database read operations.
#[derive(Debug, Clone, thiserror::Error, PartialEq)]
pub enum DbErrorRead {
    /// The requested row does not exist.
    #[error("cannot read - row not found")]
    NotFound,
    /// A generic database failure; displayed transparently, convertible via `From`.
    #[error(transparent)]
    Generic(#[from] DbErrorGeneric),
}
879
/// Error returned by read operations that can also time out.
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum DbErrorReadWithTimeout {
    /// The operation timed out; carries the timeout outcome.
    #[error("timeout")]
    Timeout(TimeoutOutcome),
    /// The underlying read failed; displayed transparently, convertible via `From`.
    #[error(transparent)]
    DbErrorRead(#[from] DbErrorRead),
}
887
/// Value returned by a successful `append`.
/// NOTE(review): presumably the next expected version — confirm.
pub type AppendResponse = Version;
/// Tuple describing a pending execution: id, version, params and an optional timestamp.
/// NOTE(review): the meaning of the trailing `Option<DateTime<Utc>>` is not
/// visible here — confirm with the producers.
pub type PendingExecution = (ExecutionId, Version, Params, Option<DateTime<Utc>>);
892
/// Everything returned about an execution once it has been locked.
#[derive(Debug, Clone)]
pub struct LockedExecution {
    pub execution_id: ExecutionId,
    /// NOTE(review): presumably the version the next appended event must carry — confirm.
    pub next_version: Version,
    pub metadata: ExecutionMetadata,
    /// The lock data recorded for this execution.
    pub locked_event: Locked,
    /// Function to be executed.
    pub ffqn: FunctionFqn,
    /// Call parameters.
    pub params: Params,
    /// History events with the versions of the events that carried them.
    pub event_history: Vec<(HistoryEvent, Version)>,
    /// Join set responses received so far, with their cursors.
    pub responses: Vec<ResponseWithCursor>,
    /// Parent execution and the join set within it, if any.
    pub parent: Option<(ExecutionId, JoinSetId)>,
    /// NOTE(review): presumably the number of temporary failure/timeout events
    /// so far, used for retry backoff — confirm.
    pub intermittent_event_count: u32,
}
906
/// Executions locked by a `lock_pending_*` call.
pub type LockPendingResponse = Vec<LockedExecution>;
/// Value returned by a successful batch append.
/// NOTE(review): presumably the next expected version — confirm.
pub type AppendBatchResponse = Version;
909
/// An event to be appended to an execution, with its creation timestamp.
/// `Display` prints only the inner `event`.
#[derive(Debug, Clone, derive_more::Display, Serialize, Deserialize)]
#[display("{event}")]
pub struct AppendRequest {
    /// Timestamp to attach to the event.
    pub created_at: DateTime<Utc>,
    /// The event payload.
    pub event: ExecutionRequest,
}
916
/// Request to create a new execution.
///
/// Apart from `created_at` and `execution_id`, the fields map one-to-one onto
/// `ExecutionRequest::Created` (see the `From<CreateRequest>` conversion).
#[derive(Debug, Clone)]
pub struct CreateRequest {
    /// Timestamp of the creation.
    pub created_at: DateTime<Utc>,
    /// Identifier of the execution to create.
    pub execution_id: ExecutionId,
    /// Function to be executed.
    pub ffqn: FunctionFqn,
    /// Call parameters.
    pub params: Params,
    /// Parent execution and the join set within it, if any.
    pub parent: Option<(ExecutionId, JoinSetId)>,
    pub scheduled_at: DateTime<Utc>,
    pub component_id: ComponentId,
    pub deployment_id: DeploymentId,
    pub metadata: ExecutionMetadata,
    /// NOTE(review): presumably the execution that scheduled this one — confirm.
    pub scheduled_by: Option<ExecutionId>,
}
930
931impl From<CreateRequest> for ExecutionRequest {
932 fn from(value: CreateRequest) -> Self {
933 Self::Created {
934 ffqn: value.ffqn,
935 params: value.params,
936 parent: value.parent,
937 scheduled_at: value.scheduled_at,
938 component_id: value.component_id,
939 deployment_id: value.deployment_id,
940 metadata: value.metadata,
941 scheduled_by: value.scheduled_by,
942 }
943 }
944}
945
/// Factory handing out boxed database handles for the different API surfaces.
#[async_trait]
pub trait DbPool: Send + Sync {
    /// Returns a handle exposing the executor API (`DbExecutor`).
    async fn db_exec_conn(&self) -> Result<Box<dyn DbExecutor>, DbErrorGeneric>;

    /// Returns a handle exposing the `DbConnection` API.
    async fn connection(&self) -> Result<Box<dyn DbConnection>, DbErrorGeneric>;

    /// Returns a handle exposing the external API (`DbExternalApi`).
    async fn external_api_conn(&self) -> Result<Box<dyn DbExternalApi>, DbErrorGeneric>;

    /// Returns a handle exposing the test-only API; only available with the
    /// `test` feature.
    #[cfg(feature = "test")]
    async fn connection_test(&self) -> Result<Box<dyn DbConnectionTest>, DbErrorGeneric>;
}
957
/// A database pool that can be closed.
#[async_trait]
pub trait DbPoolCloseable {
    /// Closes the pool.
    /// NOTE(review): presumably later operations fail with
    /// `DbErrorGeneric::Close` — confirm with the implementations.
    async fn close(&self);
}
962
/// A batch of events to append to one execution.
#[derive(Clone, Debug)]
pub struct AppendEventsToExecution {
    /// Execution receiving the events.
    pub execution_id: ExecutionId,
    /// NOTE(review): presumably the version expected for the first event of the
    /// batch — confirm.
    pub version: Version,
    /// Events to append, in order.
    pub batch: Vec<AppendRequest>,
}
969
/// Response about a finished child execution, to be delivered to its parent.
#[derive(Clone, Debug)]
pub struct AppendResponseToExecution {
    /// Parent execution receiving the response.
    pub parent_execution_id: ExecutionId,
    /// Timestamp to attach to the response.
    pub created_at: DateTime<Utc>,
    /// Join set in the parent that the child belongs to.
    pub join_set_id: JoinSetId,
    /// The child execution that finished.
    pub child_execution_id: ExecutionIdDerived,
    /// Version of the child's finishing event.
    pub finished_version: Version,
    /// Result of the child execution.
    pub result: SupportedFunctionReturnValue,
}
979
/// Database operations used by executors: locking pending executions,
/// appending events, waiting for pending work and cancelling activities.
#[async_trait]
pub trait DbExecutor: Send + Sync {
    /// Locks up to `batch_size` executions of the given functions.
    /// NOTE(review): presumably only executions pending at or before
    /// `pending_at_or_sooner` are considered, and the remaining arguments are
    /// recorded in the resulting `Locked` events — confirm with the
    /// implementations.
    #[expect(clippy::too_many_arguments)]
    async fn lock_pending_by_ffqns(
        &self,
        batch_size: u32,
        pending_at_or_sooner: DateTime<Utc>,
        ffqns: Arc<[FunctionFqn]>,
        created_at: DateTime<Utc>,
        component_id: ComponentId,
        deployment_id: DeploymentId,
        executor_id: ExecutorId,
        lock_expires_at: DateTime<Utc>,
        run_id: RunId,
        retry_config: ComponentRetryConfig,
    ) -> Result<LockPendingResponse, DbErrorWrite>;

    /// Like `lock_pending_by_ffqns`, but selects executions by component
    /// instead of by a list of functions.
    /// NOTE(review): presumably matched on the digest of `component_id` — confirm.
    #[expect(clippy::too_many_arguments)]
    async fn lock_pending_by_component_digest(
        &self,
        batch_size: u32,
        pending_at_or_sooner: DateTime<Utc>,
        component_id: &ComponentId,
        deployment_id: DeploymentId,
        created_at: DateTime<Utc>,
        executor_id: ExecutorId,
        lock_expires_at: DateTime<Utc>,
        run_id: RunId,
        retry_config: ComponentRetryConfig,
    ) -> Result<LockPendingResponse, DbErrorWrite>;

    /// Test-only: locks the single execution `execution_id` at `version`.
    #[cfg(feature = "test")]
    #[expect(clippy::too_many_arguments)]
    async fn lock_one(
        &self,
        created_at: DateTime<Utc>,
        component_id: ComponentId,
        deployment_id: DeploymentId,
        execution_id: &ExecutionId,
        run_id: RunId,
        version: Version,
        executor_id: ExecutorId,
        lock_expires_at: DateTime<Utc>,
        retry_config: ComponentRetryConfig,
    ) -> Result<LockedExecution, DbErrorWrite>;

    /// Appends a single event to `execution_id` at `version`.
    async fn append(
        &self,
        execution_id: ExecutionId,
        version: Version,
        req: AppendRequest,
    ) -> Result<AppendResponse, DbErrorWrite>;

    /// Appends a batch of events to a child execution and delivers the
    /// corresponding response to its parent.
    /// NOTE(review): presumably both writes happen atomically — confirm with
    /// the implementations.
    async fn append_batch_respond_to_parent(
        &self,
        events: AppendEventsToExecution,
        response: AppendResponseToExecution,
        current_time: DateTime<Utc>,
    ) -> Result<AppendBatchResponse, DbErrorWrite>;

    /// Waits for pending work for the given functions.
    /// NOTE(review): presumably returns when such an execution becomes pending
    /// or when `timeout_fut` completes, whichever is first — confirm.
    async fn wait_for_pending_by_ffqn(
        &self,
        pending_at_or_sooner: DateTime<Utc>,
        ffqns: Arc<[FunctionFqn]>,
        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
    );

    /// Waits for pending work for the given component digest.
    /// NOTE(review): presumably returns when such an execution becomes pending
    /// or when `timeout_fut` completes, whichever is first — confirm.
    async fn wait_for_pending_by_component_digest(
        &self,
        pending_at_or_sooner: DateTime<Utc>,
        component_digest: &ComponentDigest,
        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
    );

    /// Calls `cancel_activity` repeatedly until it succeeds.
    ///
    /// Makes at most six attempts (the initial one plus five retries), with no
    /// delay in between, and retries on every kind of error. Returns the first
    /// `Ok` or the error of the last attempt.
    async fn cancel_activity_with_retries(
        &self,
        execution_id: &ExecutionId,
        cancelled_at: DateTime<Utc>,
    ) -> Result<CancelOutcome, DbErrorWrite> {
        let mut retries = 5;
        loop {
            let res = self.cancel_activity(execution_id, cancelled_at).await;
            if res.is_ok() || retries == 0 {
                return res;
            }
            retries -= 1;
        }
    }

    /// Returns the most recent event of `execution_id`.
    async fn get_last_execution_event(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<ExecutionEvent, DbErrorRead>;

    /// Cancels an execution by appending a `Finished` event whose return
    /// value is an execution error of kind `Cancelled`.
    ///
    /// * If the last event is already a cancelled `Finished`, returns
    ///   `CancelOutcome::Cancelled` without writing anything.
    /// * If the last event is any other `Finished`, returns
    ///   `CancelOutcome::AlreadyFinished` without writing anything.
    /// * Otherwise the cancellation is appended at the version following the
    ///   last event. For a derived (child) execution the parent is notified in
    ///   the same call via `append_batch_respond_to_parent`.
    ///
    /// # Errors
    /// Propagates errors from reading the last event (converted into
    /// `DbErrorWrite`) and from the append itself.
    async fn cancel_activity(
        &self,
        execution_id: &ExecutionId,
        cancelled_at: DateTime<Utc>,
    ) -> Result<CancelOutcome, DbErrorWrite> {
        debug!("Determining cancellation state of {execution_id}");

        let last_event = self
            .get_last_execution_event(execution_id)
            .await
            .map_err(DbErrorWrite::from)?;
        // Already cancelled: report success without another write.
        if let ExecutionRequest::Finished {
            retval:
                SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError {
                    kind: ExecutionFailureKind::Cancelled,
                    ..
                }),
            ..
        } = last_event.event
        {
            return Ok(CancelOutcome::Cancelled);
        } else if matches!(last_event.event, ExecutionRequest::Finished { .. }) {
            debug!("Not cancelling, {execution_id} is already finished");
            return Ok(CancelOutcome::AlreadyFinished);
        }
        // The cancellation event takes the version right after the last event.
        let finished_version = last_event.version.increment();
        let child_result = SupportedFunctionReturnValue::ExecutionError(FinishedExecutionError {
            reason: None,
            kind: ExecutionFailureKind::Cancelled,
            detail: None,
        });
        let cancel_request = AppendRequest {
            created_at: cancelled_at,
            event: ExecutionRequest::Finished {
                retval: child_result.clone(),
                http_client_traces: None,
            },
        };
        debug!("Cancelling activity {execution_id} at {finished_version}");
        if let ExecutionId::Derived(execution_id) = execution_id {
            // Child execution: finish it and respond to the parent's join set.
            let (parent_execution_id, join_set_id) = execution_id.split_to_parts();
            let child_execution_id = ExecutionId::Derived(execution_id.clone());
            self.append_batch_respond_to_parent(
                AppendEventsToExecution {
                    execution_id: child_execution_id,
                    version: finished_version.clone(),
                    batch: vec![cancel_request],
                },
                AppendResponseToExecution {
                    parent_execution_id,
                    created_at: cancelled_at,
                    join_set_id: join_set_id.clone(),
                    child_execution_id: execution_id.clone(),
                    finished_version,
                    result: child_result,
                },
                cancelled_at,
            )
            .await?;
        } else {
            // No parent to notify: a plain append is enough.
            self.append(execution_id.clone(), finished_version, cancel_request)
                .await?;
        }
        debug!("Cancelled {execution_id}");
        Ok(CancelOutcome::Cancelled)
    }
}
1154
/// Outcome of appending a delay response.
pub enum AppendDelayResponseOutcome {
    /// The response was appended.
    Success,
    /// NOTE(review): presumably the delay had already finished — confirm.
    AlreadyFinished,
    /// NOTE(review): presumably the delay had already been cancelled — confirm.
    AlreadyCancelled,
}
1160
/// Filter applied when listing executions. The `Default` value has every
/// option unset and both flags `false`.
#[derive(Debug, Clone, Default)]
pub struct ListExecutionsFilter {
    /// Restrict to functions whose name starts with this prefix.
    pub ffqn_prefix: Option<String>,
    /// NOTE(review): presumably includes derived (child) executions when `true` — confirm.
    pub show_derived: bool,
    /// NOTE(review): presumably excludes finished executions when `true` — confirm.
    pub hide_finished: bool,
    /// Restrict to execution ids starting with this prefix.
    pub execution_id_prefix: Option<String>,
    /// Restrict to executions of this component digest.
    pub component_digest: Option<ComponentDigest>,
    /// Restrict to executions of this deployment.
    pub deployment_id: Option<DeploymentId>,
}
1170
/// Database operations layered on top of [`DbConnection`]: backtrace and source-file
/// access, listing of executions / events / responses / logs, deployment management,
/// and pausing / unpausing of executions.
///
/// NOTE(review): the per-method notes are derived from the signatures visible here;
/// confirm the exact semantics against the concrete implementations.
#[async_trait]
pub trait DbExternalApi: DbConnection {
    /// Reads a backtrace of `execution_id`, selected by `filter`.
    async fn get_backtrace(
        &self,
        execution_id: &ExecutionId,
        filter: BacktraceFilter,
    ) -> Result<BacktraceInfo, DbErrorRead>;

    /// Stores `content` for the given component digest under `frame_key`.
    /// NOTE(review): the meaning of `is_suffix` (presumably suffix matching of
    /// `frame_key`) is not visible here - confirm.
    async fn upsert_source_file(
        &self,
        component_digest: &ComponentDigest,
        frame_key: &str,
        is_suffix: bool,
        content: &str,
    ) -> Result<(), DbErrorWrite>;

    /// Looks up the stored source of `file` for the given component digest;
    /// the `Option` result allows for "no such file".
    async fn get_source_file(
        &self,
        component_digest: &ComponentDigest,
        file: &str,
    ) -> Result<Option<String>, DbErrorRead>;

    /// Lists executions matching `filter`, one page at a time.
    async fn list_executions(
        &self,
        filter: ListExecutionsFilter,
        pagination: ExecutionListPagination,
    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric>;

    /// Lists events of one execution, paginated by version.
    /// NOTE(review): `include_backtrace_id` presumably controls whether backtrace ids
    /// are attached to the returned events - confirm.
    async fn list_execution_events(
        &self,
        execution_id: &ExecutionId,
        pagination: Pagination<VersionType>,
        include_backtrace_id: bool,
    ) -> Result<ListExecutionEventsResponse, DbErrorRead>;

    /// Lists join set responses of one execution, paginated by a `u32` cursor.
    async fn list_responses(
        &self,
        execution_id: &ExecutionId,
        pagination: Pagination<u32>,
    ) -> Result<ListResponsesResponse, DbErrorRead>;

    /// Combined read returning the execution state together with its events and responses.
    /// The `req_*` parameters address the events, `resp_pagination` the responses.
    async fn list_execution_events_responses(
        &self,
        execution_id: &ExecutionId,
        req_since: &Version,
        req_max_length: VersionType,
        req_include_backtrace_id: bool,
        resp_pagination: Pagination<VersionType>,
    ) -> Result<ExecutionWithStateRequestsResponses, DbErrorRead>;

    /// Switches the component digest of an execution from `old` to `new`.
    /// NOTE(review): presumably fails when the stored digest differs from `old` - confirm.
    async fn upgrade_execution_component(
        &self,
        execution_id: &ExecutionId,
        old: &ComponentDigest,
        new: &ComponentDigest,
    ) -> Result<(), DbErrorWrite>;

    /// Lists log entries of an execution, filtered by `filter` and paginated by timestamp.
    /// NOTE(review): `show_derived` presumably also includes logs of derived executions.
    async fn list_logs(
        &self,
        execution_id: &ExecutionId,
        show_derived: bool,
        filter: LogFilter,
        pagination: Pagination<DateTime<Utc>>,
    ) -> Result<ListLogsResponse, DbErrorRead>;

    /// Lists per-deployment state summaries (see [`DeploymentState`]).
    /// NOTE(review): `include_config_json` presumably decides whether
    /// `DeploymentState::config_json` is populated - confirm.
    async fn list_deployment_states(
        &self,
        current_time: DateTime<Utc>,
        pagination: Pagination<Option<DeploymentId>>,
        include_config_json: bool,
    ) -> Result<Vec<DeploymentState>, DbErrorRead>;

    /// Persists a new deployment record.
    async fn insert_deployment(&self, record: DeploymentRecord) -> Result<(), DbErrorWrite>;

    /// Marks the deployment as active as of `now`.
    async fn activate_deployment(
        &self,
        deployment_id: DeploymentId,
        now: DateTime<Utc>,
    ) -> Result<(), DbErrorWrite>;

    /// Marks the deployment as enqueued.
    async fn enqueue_deployment(&self, deployment_id: DeploymentId) -> Result<(), DbErrorWrite>;

    /// Fetches one deployment record; the `Option` result allows for an unknown id.
    async fn get_deployment(
        &self,
        deployment_id: DeploymentId,
    ) -> Result<Option<DeploymentRecord>, DbErrorRead>;

    /// Only compiled with the `test` feature.
    /// NOTE(review): presumably returns the deployment whose status is `Active`.
    #[cfg(feature = "test")]
    async fn get_active_deployment(&self) -> Result<Option<DeploymentRecord>, DbErrorRead>;

    /// NOTE(review): how the "current" deployment is selected (vs. the active one)
    /// is not visible here - confirm with implementations.
    async fn get_current_deployment(&self) -> Result<Option<DeploymentRecord>, DbErrorRead>;

    /// Lists deployment records, paginated by deployment id.
    async fn list_deployments(
        &self,
        pagination: Pagination<Option<DeploymentId>>,
    ) -> Result<Vec<DeploymentRecord>, DbErrorRead>;

    /// Pauses the execution, recording `paused_at`.
    async fn pause_execution(
        &self,
        execution_id: &ExecutionId,
        paused_at: DateTime<Utc>,
    ) -> Result<AppendResponse, DbErrorWrite>;

    /// Unpauses the execution, recording `unpaused_at`.
    async fn unpause_execution(
        &self,
        execution_id: &ExecutionId,
        unpaused_at: DateTime<Utc>,
    ) -> Result<AppendResponse, DbErrorWrite>;
}
/// Default page length for listing deployment states.
pub const LIST_DEPLOYMENT_STATES_DEFAULT_LENGTH: u16 = 20;
/// Default pagination for listing deployment states: `OlderThan` with no cursor,
/// cursor excluded, and [`LIST_DEPLOYMENT_STATES_DEFAULT_LENGTH`] items.
pub const LIST_DEPLOYMENT_STATES_DEFAULT_PAGINATION: Pagination<Option<DeploymentId>> =
    Pagination::OlderThan {
        length: LIST_DEPLOYMENT_STATES_DEFAULT_LENGTH,
        cursor: None,
        including_cursor: false,
    };
1319
1320pub struct DeploymentState {
1321 pub deployment_id: DeploymentId,
1322 pub locked: u32,
1323 pub pending: u32,
1325 pub scheduled: u32,
1327 pub blocked: u32,
1328 pub finished: u32,
1329 pub config_json: Option<String>,
1331 pub created_at: DateTime<Utc>,
1332 pub last_active_at: Option<DateTime<Utc>>,
1334 pub status: DeploymentStatus,
1335}
1336
/// Status of a deployment; the textual form is provided by [`DeploymentStatus::as_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeploymentStatus {
    Inactive,
    Enqueued,
    Active,
}

impl DeploymentStatus {
    /// Returns the lowercase name of the status: `"inactive"`, `"enqueued"` or `"active"`.
    #[must_use]
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Active => "active",
            Self::Enqueued => "enqueued",
            Self::Inactive => "inactive",
        }
    }
}
1355
1356impl std::str::FromStr for DeploymentStatus {
1357 type Err = StrVariant;
1358 fn from_str(s: &str) -> Result<Self, Self::Err> {
1359 match s {
1360 "inactive" => Ok(DeploymentStatus::Inactive),
1361 "enqueued" => Ok(DeploymentStatus::Enqueued),
1362 "active" => Ok(DeploymentStatus::Active),
1363 _ => Err(StrVariant::from(format!("unknown deployment status: {s}"))),
1364 }
1365 }
1366}
1367
/// A deployment as stored in the database; inserted via `DbExternalApi::insert_deployment`
/// and returned by the deployment getters/listing.
#[derive(Debug, Clone)]
pub struct DeploymentRecord {
    pub deployment_id: DeploymentId,
    pub created_at: DateTime<Utc>,
    /// `None` presumably means the deployment has never been active - confirm.
    pub last_active_at: Option<DateTime<Utc>>,
    pub status: DeploymentStatus,
    /// Deployment configuration serialized as JSON.
    pub config_json: String,
    /// NOTE(review): presumably the Obelisk version that created the record - confirm.
    pub obelisk_version: String,
    /// Optional identification of the creator.
    pub created_by: Option<String>,
}
1379
/// One page of log entries returned by `DbExternalApi::list_logs`.
#[derive(Debug)]
pub struct ListLogsResponse {
    /// Entries of this page.
    pub items: Vec<LogEntryRow>,
    /// Pagination to request the next page.
    pub next_page: Pagination<DateTime<Utc>>,
    /// Pagination to request the previous page, when there is one.
    pub prev_page: Option<Pagination<DateTime<Utc>>>,
}
1386
/// Selects which kinds of log entries are listed. Fields are private; build the
/// filter with `show_logs`, `show_streams` or `show_combined`.
#[derive(Debug)]
pub struct LogFilter {
    /// Whether `LogEntry::Log` entries are requested.
    show_logs: bool,
    /// Whether `LogEntry::Stream` entries are requested.
    show_streams: bool,
    /// Requested log levels. NOTE(review): the meaning of an empty list
    /// (presumably "all levels") is decided by the implementations - confirm.
    levels: Vec<LogLevel>,
    /// Requested stream types. NOTE(review): same caveat for an empty list.
    stream_types: Vec<LogStreamType>,
}
1394impl LogFilter {
1395 #[must_use]
1397 pub fn show_logs(levels: Vec<LogLevel>) -> LogFilter {
1398 LogFilter {
1399 show_logs: true,
1400 show_streams: false,
1401 levels,
1402 stream_types: Vec::new(),
1403 }
1404 }
1405 #[must_use]
1407 pub fn show_streams(stream_types: Vec<LogStreamType>) -> LogFilter {
1408 LogFilter {
1409 show_logs: false,
1410 show_streams: true,
1411 levels: Vec::new(),
1412 stream_types,
1413 }
1414 }
1415 #[must_use]
1417 pub fn show_combined(levels: Vec<LogLevel>, stream_types: Vec<LogStreamType>) -> LogFilter {
1418 LogFilter {
1419 show_logs: true,
1420 show_streams: true,
1421 levels,
1422 stream_types,
1423 }
1424 }
1425 #[must_use]
1427 pub fn should_show_logs(&self) -> bool {
1428 self.show_logs
1429 }
1430 #[must_use]
1431 pub fn should_show_streams(&self) -> bool {
1432 self.show_streams
1433 }
1434 #[must_use]
1435 pub fn levels(&self) -> &Vec<LogLevel> {
1436 &self.levels
1437 }
1438 #[must_use]
1439 pub fn stream_types(&self) -> &Vec<LogStreamType> {
1440 &self.stream_types
1441 }
1442}
1443
/// Result of `DbExternalApi::list_execution_events_responses`: the execution state
/// plus a slice of its events and responses.
#[derive(Debug, Clone)]
pub struct ExecutionWithStateRequestsResponses {
    pub execution_with_state: ExecutionWithState,
    pub events: Vec<ExecutionEvent>,
    pub responses: Vec<ResponseWithCursor>,
    /// NOTE(review): presumably the highest event version known for the execution - confirm.
    pub max_version: Version,
    /// NOTE(review): presumably the highest response cursor known for the execution - confirm.
    pub max_cursor: ResponseCursor,
}
1452
/// Core database connection API built on top of `DbExecutor`: reading execution
/// logs and events, appending events/responses, creating executions, subscribing to
/// responses, and persisting backtraces and logs.
///
/// NOTE(review): notes on the abstract methods are derived from their signatures;
/// confirm details against the implementations.
#[async_trait]
pub trait DbConnection: DbExecutor {
    /// Loads the execution log of `execution_id`.
    async fn get(&self, execution_id: &ExecutionId) -> Result<ExecutionLog, DbErrorRead>;

    /// Appends a response for the delay `delay_id` in `join_set_id` of `execution_id`.
    /// `cancel_delay` passes `Err(())` as `outcome` to signal cancellation;
    /// `Ok(())` presumably marks a regularly finished delay.
    async fn append_delay_response(
        &self,
        created_at: DateTime<Utc>,
        execution_id: ExecutionId,
        join_set_id: JoinSetId,
        delay_id: DelayId,
        outcome: Result<(), ()>,
    ) -> Result<AppendDelayResponseOutcome, DbErrorWrite>;

    /// Appends a batch of events to one execution, starting at `version`.
    async fn append_batch(
        &self,
        current_time: DateTime<Utc>,
        batch: Vec<AppendRequest>,
        execution_id: ExecutionId,
        version: Version,
    ) -> Result<AppendBatchResponse, DbErrorWrite>;

    /// Like `append_batch`, additionally taking create requests for new (child)
    /// executions and backtraces to store.
    async fn append_batch_create_new_execution(
        &self,
        current_time: DateTime<Utc>,
        batch: Vec<AppendRequest>,
        execution_id: ExecutionId,
        version: Version,
        child_req: Vec<CreateRequest>,
        backtraces: Vec<BacktraceInfo>,
    ) -> Result<AppendBatchResponse, DbErrorWrite>;

    /// Reads the event stored at `version` of the given execution.
    async fn get_execution_event(
        &self,
        execution_id: &ExecutionId,
        version: &Version,
    ) -> Result<ExecutionEvent, DbErrorRead>;

    /// Reconstructs the original `CreateRequest` from the event at version 0.
    ///
    /// # Errors
    /// Propagates the read error, or returns an `Uncategorized` error when the
    /// first event is not `ExecutionRequest::Created`.
    #[instrument(skip(self))]
    async fn get_create_request(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<CreateRequest, DbErrorRead> {
        let execution_event = self
            .get_execution_event(execution_id, &Version::new(0))
            .await?;
        if let ExecutionRequest::Created {
            ffqn,
            params,
            parent,
            scheduled_at,
            component_id,
            deployment_id,
            metadata,
            scheduled_by,
        } = execution_event.event
        {
            // `created_at` comes from the event envelope, the rest from its payload.
            Ok(CreateRequest {
                created_at: execution_event.created_at,
                execution_id: execution_id.clone(),
                ffqn,
                params,
                parent,
                scheduled_at,
                component_id,
                deployment_id,
                metadata,
                scheduled_by,
            })
        } else {
            Err(DbErrorRead::Generic(DbErrorGeneric::Uncategorized {
                reason: "execution log must start with creation".into(),
                context: SpanTrace::capture(),
                source: None,
                loc: Location::caller(),
            }))
        }
    }

    /// Reads the current pending state (with execution metadata) of the execution.
    async fn get_pending_state(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<ExecutionWithState, DbErrorRead>;

    /// Returns timers (locks and delays, see `ExpiredTimer`) that are expired at `at`.
    async fn get_expired_timers(
        &self,
        at: DateTime<Utc>,
    ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric>;

    /// Creates a new execution.
    async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite>;

    /// Waits for responses of the execution that come after `last_response`.
    /// NOTE(review): presumably resolves with the timeout error variant once
    /// `timeout_fut` completes first - confirm.
    async fn subscribe_to_next_responses(
        &self,
        execution_id: &ExecutionId,
        last_response: ResponseCursor,
        timeout_fut: Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>,
    ) -> Result<Vec<ResponseWithCursor>, DbErrorReadWithTimeout>;

    /// Waits until the execution is finished and returns its result.
    /// NOTE(review): `timeout_fut == None` presumably means waiting without a timeout.
    async fn wait_for_finished_result(
        &self,
        execution_id: &ExecutionId,
        timeout_fut: Option<Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>>,
    ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout>;

    /// Stores a single backtrace.
    async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite>;

    /// Stores several backtraces.
    async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite>;

    /// Stores a single log row.
    async fn append_log(&self, row: LogInfoAppendRow) -> Result<(), DbErrorWrite>;

    /// Stores several log rows.
    async fn append_log_batch(&self, batch: &[LogInfoAppendRow]) -> Result<(), DbErrorWrite>;

    /// Test-only helper: calls `wait_for_finished_result` with a timeout future that
    /// is immediately ready with `TimeoutOutcome::Timeout`.
    /// NOTE(review): presumably this makes the call return without blocking when the
    /// execution is not finished yet - confirm.
    #[cfg(feature = "test")]
    async fn get_finished_result(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout> {
        self.wait_for_finished_result(
            execution_id,
            Some(Box::pin(std::future::ready(TimeoutOutcome::Timeout))),
        )
        .await
    }
}
1597
/// Input row for `DbConnection::append_log` / `append_log_batch`:
/// a log entry together with the execution and run that produced it.
#[derive(Clone, Debug)]
pub struct LogInfoAppendRow {
    pub execution_id: ExecutionId,
    pub run_id: RunId,
    pub log_entry: LogEntry,
}
1604
/// Log entry as returned by log listing (item of `ListLogsResponse`).
#[derive(Debug, Clone)]
pub struct LogEntryRow {
    /// Pagination cursor of this row (log listing is paginated by `DateTime<Utc>`).
    pub cursor: DateTime<Utc>,
    pub run_id: RunId,
    pub log_entry: LogEntry,
    pub execution_id: ExecutionId,
}
1612
/// A single log record: either a leveled message or a chunk of an output stream.
#[derive(Debug, Clone)]
pub enum LogEntry {
    /// Leveled textual message.
    Log {
        created_at: DateTime<Utc>,
        level: LogLevel,
        message: String,
    },
    /// Raw bytes captured from the stream identified by `stream_type`.
    Stream {
        created_at: DateTime<Utc>,
        payload: Vec<u8>,
        stream_type: LogStreamType,
    },
}
1626impl LogEntry {
1627 #[must_use]
1628 pub fn created_at(&self) -> DateTime<Utc> {
1629 match self {
1630 LogEntry::Log { created_at, .. } | LogEntry::Stream { created_at, .. } => *created_at,
1631 }
1632 }
1633}
1634
/// Log severity. Represented as `u8` starting at 1 (`Trace = 1` .. `Error = 5`);
/// `TryFrom` on the repr type is derived via `derive_more`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::TryFrom, strum::EnumIter)]
#[try_from(repr)]
#[repr(u8)]
pub enum LogLevel {
    Trace = 1,
    Debug,
    Info,
    Warn,
    Error,
}
/// Kind of captured output stream. Represented as `u8` starting at 1
/// (`StdOut = 1`, `StdErr = 2`); `TryFrom` on the repr type is derived via `derive_more`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::TryFrom, strum::EnumIter)]
#[try_from(repr)]
#[repr(u8)]
pub enum LogStreamType {
    StdOut = 1,
    StdErr,
}
1652
/// Value produced by the timeout futures passed to `subscribe_to_next_responses`
/// and `wait_for_finished_result`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimeoutOutcome {
    /// The wait timed out.
    Timeout,
    /// NOTE(review): presumably the wait was cancelled by the caller - confirm.
    Cancel,
}
1658
/// Test-only extension of `DbConnection` (compiled with the `test` feature).
#[cfg(feature = "test")]
#[async_trait]
pub trait DbConnectionTest: DbConnection {
    /// Appends a join set response event directly to `execution_id`.
    async fn append_response(
        &self,
        created_at: DateTime<Utc>,
        execution_id: ExecutionId,
        response_event: JoinSetResponseEvent,
    ) -> Result<(), DbErrorWrite>;
}
1669
/// Result of a cancellation request (e.g. `cancel_delay`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CancelOutcome {
    /// The target is cancelled (newly, or it had been cancelled before).
    Cancelled,
    /// The target had already finished, so there was nothing to cancel.
    AlreadyFinished,
}
1675
1676#[instrument(skip(db_connection))]
1677pub async fn stub_execution(
1678 db_connection: &dyn DbConnection,
1679 execution_id: ExecutionIdDerived,
1680 parent_execution_id: ExecutionId,
1681 join_set_id: JoinSetId,
1682 created_at: DateTime<Utc>,
1683 return_value: SupportedFunctionReturnValue,
1684) -> Result<(), DbErrorWrite> {
1685 let stub_finished_version = Version::new(1); let write_attempt = {
1688 let finished_req = AppendRequest {
1689 created_at,
1690 event: ExecutionRequest::Finished {
1691 retval: return_value.clone(),
1692 http_client_traces: None,
1693 },
1694 };
1695 db_connection
1696 .append_batch_respond_to_parent(
1697 AppendEventsToExecution {
1698 execution_id: ExecutionId::Derived(execution_id.clone()),
1699 version: stub_finished_version.clone(),
1700 batch: vec![finished_req],
1701 },
1702 AppendResponseToExecution {
1703 parent_execution_id,
1704 created_at,
1705 join_set_id,
1706 child_execution_id: execution_id.clone(),
1707 finished_version: stub_finished_version.clone(),
1708 result: return_value.clone(),
1709 },
1710 created_at,
1711 )
1712 .await
1713 };
1714 if let Err(write_attempt) = write_attempt {
1715 debug!("Stub write attempt failed - {write_attempt:?}");
1717
1718 let found = db_connection
1719 .get_execution_event(&ExecutionId::Derived(execution_id), &stub_finished_version)
1720 .await?; match found.event {
1722 ExecutionRequest::Finished {
1723 retval: found_result,
1724 ..
1725 } if return_value == found_result => {
1726 Ok(())
1728 }
1729 ExecutionRequest::Finished { .. } => Err(DbErrorWrite::NonRetriable(
1730 DbErrorWriteNonRetriable::Conflict,
1731 )),
1732 _other => Err(DbErrorWrite::NonRetriable(
1733 DbErrorWriteNonRetriable::IllegalState {
1734 reason: "unexpected execution event at stubbed execution".into(),
1735 context: SpanTrace::capture(),
1736 source: None,
1737 loc: Location::caller(),
1738 },
1739 )),
1740 }
1741 } else {
1742 Ok(())
1743 }
1744}
1745
1746pub async fn cancel_delay(
1747 db_connection: &dyn DbConnection,
1748 delay_id: DelayId,
1749 created_at: DateTime<Utc>,
1750) -> Result<CancelOutcome, DbErrorWrite> {
1751 let (parent_execution_id, join_set_id) = delay_id.split_to_parts();
1752 db_connection
1753 .append_delay_response(
1754 created_at,
1755 parent_execution_id,
1756 join_set_id,
1757 delay_id,
1758 Err(()), )
1760 .await
1761 .map(|ok| match ok {
1762 AppendDelayResponseOutcome::Success | AppendDelayResponseOutcome::AlreadyCancelled => {
1763 CancelOutcome::Cancelled
1764 }
1765 AppendDelayResponseOutcome::AlreadyFinished => CancelOutcome::AlreadyFinished,
1766 })
1767}
1768
/// Selects which backtrace `DbExternalApi::get_backtrace` returns.
#[derive(Clone, Debug)]
pub enum BacktraceFilter {
    /// The first stored backtrace.
    First,
    /// The last stored backtrace.
    Last,
    /// The backtrace associated with the given version.
    Specific(Version),
}
1775
/// A WASM backtrace attached to a range of event versions of one execution.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BacktraceInfo {
    pub execution_id: ExecutionId,
    pub component_id: ComponentId,
    /// Lower bound of the version range (inclusive, per the field name).
    pub version_min_including: Version,
    /// Upper bound of the version range (exclusive, per the field name).
    pub version_max_excluding: Version,
    pub wasm_backtrace: WasmBacktrace,
}
1784
// Serializable list of backtrace frames; built from `wasmtime::WasmBacktrace` by
// `WasmBacktrace::maybe_from`.
// (Plain `//` comments on purpose: `schemars` copies doc comments into the schema.)
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, schemars::JsonSchema)]
pub struct WasmBacktrace {
    pub frames: Vec<FrameInfo>,
}
1789
// One backtrace frame: module name, demangled function name and its debug symbols.
// `module` is "<unknown>" when the wasmtime module has no name (see the `From` impl).
// (Plain `//` comments on purpose: `schemars` copies doc comments into the schema.)
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, schemars::JsonSchema)]
pub struct FrameInfo {
    pub module: String,
    pub func_name: String,
    pub symbols: Vec<FrameSymbol>,
}
1796
// Debug symbol of a frame; every piece of information is optional.
// (Plain `//` comments on purpose: `schemars` copies doc comments into the schema.)
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, schemars::JsonSchema)]
pub struct FrameSymbol {
    // Demangled function name, when the symbol has a name.
    pub func_name: Option<String>,
    // Source file of the symbol.
    pub file: Option<String>,
    // Line in `file`.
    pub line: Option<u32>,
    // Column in `file`.
    pub col: Option<u32>,
}
1804
1805mod wasm_backtrace {
1806 use super::{FrameInfo, FrameSymbol, WasmBacktrace};
1807
1808 impl WasmBacktrace {
1809 pub fn maybe_from(backtrace: &wasmtime::WasmBacktrace) -> Option<Self> {
1810 if backtrace.frames().is_empty() {
1811 None
1812 } else {
1813 Some(Self {
1814 frames: backtrace.frames().iter().map(FrameInfo::from).collect(),
1815 })
1816 }
1817 }
1818 }
1819
1820 impl From<&wasmtime::FrameInfo> for FrameInfo {
1821 fn from(frame: &wasmtime::FrameInfo) -> Self {
1822 let module_name = frame.module().name().unwrap_or("<unknown>").to_string();
1823 let mut func_name = String::new();
1824 wasmtime_environ::demangle_function_name_or_index(
1825 &mut func_name,
1826 frame.func_name(),
1827 frame.func_index() as usize,
1828 )
1829 .expect("writing to string must succeed");
1830 Self {
1831 module: module_name,
1832 func_name,
1833 symbols: frame
1834 .symbols()
1835 .iter()
1836 .map(std::convert::Into::into)
1837 .collect(),
1838 }
1839 }
1840 }
1841
1842 impl From<&wasmtime::FrameSymbol> for FrameSymbol {
1843 fn from(symbol: &wasmtime::FrameSymbol) -> Self {
1844 let func_name = symbol.name().map(|name| {
1845 let mut writer = String::new();
1846 wasmtime_environ::demangle_function_name(&mut writer, name)
1847 .expect("writing to string must succeed");
1848 writer
1849 });
1850
1851 Self {
1852 func_name,
1853 file: symbol.file().map(ToString::to_string),
1854 line: symbol.line(),
1855 col: symbol.column(),
1856 }
1857 }
1858 }
1859}
/// An execution together with its current pending state and component/deployment
/// metadata. `Display` prints the execution id, pending state and component digest.
#[derive(Debug, Clone, derive_more::Display)]
#[display("{execution_id} {pending_state} {component_digest}")]
pub struct ExecutionWithState {
    pub execution_id: ExecutionId,
    pub ffqn: FunctionFqn,
    pub pending_state: PendingState,
    pub created_at: DateTime<Utc>,
    pub first_scheduled_at: DateTime<Utc>,
    pub component_digest: ComponentDigest,
    pub component_type: ComponentType,
    pub deployment_id: DeploymentId,
}
1872
/// Pagination for `DbExternalApi::list_executions`, keyed either by a timestamp
/// or by an execution id. The cursor is optional in both cases.
#[derive(Debug, Clone)]
pub enum ExecutionListPagination {
    /// Paginate with an optional `DateTime<Utc>` cursor.
    /// NOTE(review): despite the name, the cursor looks like a creation time - confirm.
    CreatedBy(Pagination<Option<DateTime<Utc>>>),
    /// Paginate with an optional `ExecutionId` cursor.
    ExecutionId(Pagination<Option<ExecutionId>>),
}
1878impl Default for ExecutionListPagination {
1879 fn default() -> ExecutionListPagination {
1880 ExecutionListPagination::CreatedBy(Pagination::OlderThan {
1881 length: 20,
1882 cursor: None,
1883 including_cursor: false, })
1885 }
1886}
1887impl ExecutionListPagination {
1888 #[must_use]
1889 pub fn length(&self) -> u16 {
1890 match self {
1891 ExecutionListPagination::CreatedBy(pagination) => pagination.length(),
1892 ExecutionListPagination::ExecutionId(pagination) => pagination.length(),
1893 }
1894 }
1895}
1896
/// Cursor-based pagination request.
///
/// `NewerThan` pages in ascending order, `OlderThan` in descending order
/// (see [`Pagination::is_asc`] / [`Pagination::is_desc`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Pagination<T> {
    NewerThan {
        /// Requested number of items.
        length: u16,
        /// Position the page is relative to.
        cursor: T,
        /// Whether the item at `cursor` itself belongs to the page.
        including_cursor: bool,
    },
    OlderThan {
        /// Requested number of items.
        length: u16,
        /// Position the page is relative to.
        cursor: T,
        /// Whether the item at `cursor` itself belongs to the page.
        including_cursor: bool,
    },
}

// Accessors need no bound on `T`; only `invert` has to clone the cursor.
impl<T> Pagination<T> {
    /// Requested number of items.
    #[must_use]
    pub fn length(&self) -> u16 {
        match self {
            Pagination::NewerThan { length, .. } | Pagination::OlderThan { length, .. } => *length,
        }
    }

    /// Comparison operator relating items to the cursor:
    /// `>` / `>=` for `NewerThan`, `<` / `<=` for `OlderThan`
    /// (the `=` form when `including_cursor` is set).
    #[must_use]
    pub fn rel(&self) -> &'static str {
        match self {
            Pagination::NewerThan {
                including_cursor: false,
                ..
            } => ">",
            Pagination::NewerThan {
                including_cursor: true,
                ..
            } => ">=",
            Pagination::OlderThan {
                including_cursor: false,
                ..
            } => "<",
            Pagination::OlderThan {
                including_cursor: true,
                ..
            } => "<=",
        }
    }

    /// `true` for `OlderThan`.
    #[must_use]
    pub fn is_desc(&self) -> bool {
        matches!(self, Pagination::OlderThan { .. })
    }

    /// `"asc"` for `NewerThan`, `"desc"` for `OlderThan`.
    #[must_use]
    pub fn asc_or_desc(&self) -> &'static str {
        if self.is_asc() { "asc" } else { "desc" }
    }

    /// `true` for `NewerThan`.
    #[must_use]
    pub fn is_asc(&self) -> bool {
        !self.is_desc()
    }

    /// The cursor the page is relative to.
    #[must_use]
    pub fn cursor(&self) -> &T {
        match self {
            Pagination::NewerThan { cursor, .. } | Pagination::OlderThan { cursor, .. } => cursor,
        }
    }
}

impl<T: Clone> Pagination<T> {
    /// Returns the pagination covering the opposite side of the cursor: the direction
    /// is swapped and `including_cursor` is negated; `length` and `cursor` are kept.
    #[must_use]
    pub fn invert(&self) -> Self {
        match self {
            Pagination::NewerThan {
                length,
                cursor,
                including_cursor,
            } => Pagination::OlderThan {
                length: *length,
                cursor: cursor.clone(),
                including_cursor: !including_cursor,
            },
            Pagination::OlderThan {
                length,
                cursor,
                including_cursor,
            } => Pagination::NewerThan {
                length: *length,
                cursor: cursor.clone(),
                including_cursor: !including_cursor,
            },
        }
    }
}
1980
/// Test helper: re-reads the execution log every 10 ms until `predicate` returns
/// `Some`, and yields that value.
///
/// With `timeout` set, the polling is raced against a sleep of that duration and
/// `DbErrorReadWithTimeout::Timeout(TimeoutOutcome::Timeout)` is returned when the
/// sleep wins. Without it the polling continues until the predicate matches.
///
/// # Errors
/// Propagates read errors from `db_connection.get`.
#[cfg(feature = "test")]
pub async fn wait_for_pending_state_fn<T: Debug>(
    db_connection: &dyn DbConnectionTest,
    execution_id: &ExecutionId,
    predicate: impl Fn(ExecutionLog) -> Option<T> + Send,
    timeout: Option<Duration>,
) -> Result<T, DbErrorReadWithTimeout> {
    tracing::trace!(%execution_id, "Waiting for predicate");
    let poll_until_match = async move {
        loop {
            let execution_log = db_connection.get(execution_id).await?;
            match predicate(execution_log) {
                Some(t) => {
                    tracing::debug!(%execution_id, "Found: {t:?}");
                    return Ok(t);
                }
                None => tokio::time::sleep(Duration::from_millis(10)).await,
            }
        }
    };

    match timeout {
        Some(timeout) => {
            tokio::select! {
                res = poll_until_match => res,
                () = tokio::time::sleep(timeout) => Err(DbErrorReadWithTimeout::Timeout(TimeoutOutcome::Timeout))
            }
        }
        None => poll_until_match.await,
    }
}
2009
/// An expired timer as returned by `DbConnection::get_expired_timers`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExpiredTimer {
    /// An execution lock that expired.
    Lock(ExpiredLock),
    /// A delay that expired.
    Delay(ExpiredDelay),
}
2015
/// Details of an expired execution lock.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExpiredLock {
    pub execution_id: ExecutionId,
    /// Version at which the lock was taken.
    pub locked_at_version: Version,
    /// Version the next appended event has to use.
    pub next_version: Version,
    /// NOTE(review): presumably the number of temporary failures/timeouts so far;
    /// used with `max_retries` / `retry_exp_backoff` for retry decisions - confirm.
    pub intermittent_event_count: u32,
    /// Retry limit; `None` presumably means unlimited - confirm.
    pub max_retries: Option<u32>,
    /// Base duration of the exponential retry backoff.
    pub retry_exp_backoff: Duration,
    /// Executor and run that held the lock.
    pub locked_by: LockedBy,
}
2028
/// Details of an expired delay: the owning execution, its join set and the delay id.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExpiredDelay {
    pub execution_id: ExecutionId,
    pub join_set_id: JoinSetId,
    pub delay_id: DelayId,
}
2035
// Current scheduling state of an execution.
// Serialized with an internal `status` tag in snake_case.
// (Plain `//` comments on purpose: `schemars` copies doc comments into the schema.)
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, schemars::JsonSchema)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum PendingState {
    // Locked by an executor until `lock_expires_at`.
    Locked(PendingStateLocked),

    // Waiting to be picked up at or after `scheduled_at`.
    #[display("PendingAt(`{_0}`)")]
    PendingAt(PendingStatePendingAt),

    // Waiting on a join set.
    #[display("BlockedByJoinSet({_0})")]
    BlockedByJoinSet(PendingStateBlockedByJoinSet),

    // Paused; wraps the state the execution was in.
    #[display("Paused({_0})")]
    Paused(PendingStatePaused),

    // Terminal state.
    #[display("Finished({_0})")]
    Finished(PendingStateFinished),
}
2055
2056pub enum PendingStateMergedPause {
2057 Locked {
2058 state: PendingStateLocked,
2059 paused: bool,
2060 },
2061 PendingAt {
2062 state: PendingStatePendingAt,
2063 paused: bool,
2064 },
2065 BlockedByJoinSet {
2066 state: PendingStateBlockedByJoinSet,
2067 paused: bool,
2068 },
2069 Finished(PendingStateFinished),
2070}
2071impl From<PendingState> for PendingStateMergedPause {
2072 fn from(state: PendingState) -> Self {
2073 match state {
2074 PendingState::Locked(s) => PendingStateMergedPause::Locked {
2075 state: s,
2076 paused: false,
2077 },
2078
2079 PendingState::PendingAt(s) => PendingStateMergedPause::PendingAt {
2080 state: s,
2081 paused: false,
2082 },
2083
2084 PendingState::BlockedByJoinSet(s) => PendingStateMergedPause::BlockedByJoinSet {
2085 state: s,
2086 paused: false,
2087 },
2088
2089 PendingState::Paused(paused) => match paused {
2090 PendingStatePaused::Locked(s) => PendingStateMergedPause::Locked {
2091 state: s,
2092 paused: true,
2093 },
2094 PendingStatePaused::PendingAt(s) => PendingStateMergedPause::PendingAt {
2095 state: s,
2096 paused: true,
2097 },
2098 PendingStatePaused::BlockedByJoinSet(s) => {
2099 PendingStateMergedPause::BlockedByJoinSet {
2100 state: s,
2101 paused: true,
2102 }
2103 }
2104 },
2105
2106 PendingState::Finished(s) => PendingStateMergedPause::Finished(s),
2107 }
2108 }
2109}
2110
// Payload of `PendingState::Locked`: who holds the lock and when it expires.
// `Display` prints the expiry, executor id and run id.
// (Plain `//` comments on purpose: `schemars` copies doc comments into the schema.)
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, schemars::JsonSchema)]
#[display("Locked(`{lock_expires_at}`, {}, {})", locked_by.executor_id, locked_by.run_id)]
pub struct PendingStateLocked {
    pub locked_by: LockedBy,
    pub lock_expires_at: DateTime<Utc>,
}
2117
// Payload of `PendingState::PendingAt`.
// (Plain `//` comments on purpose: `schemars` copies doc comments into the schema.)
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, schemars::JsonSchema)]
#[display("`{scheduled_at}`, last_lock={last_lock:?}")]
pub struct PendingStatePendingAt {
    // Time from which the execution may be locked (see `PendingState::can_append_lock`).
    pub scheduled_at: DateTime<Utc>,
    // Previous lock holder, if any; `can_append_lock` lets that same executor/run
    // extend its lock even before `scheduled_at`.
    pub last_lock: Option<LockedBy>,
}
2125
// Payload of `PendingState::BlockedByJoinSet`.
// (Plain `//` comments on purpose: `schemars` copies doc comments into the schema.)
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, schemars::JsonSchema)]
#[display("{join_set_id}, `{lock_expires_at}`, closing={closing}")]
pub struct PendingStateBlockedByJoinSet {
    // Join set the execution is waiting on.
    pub join_set_id: JoinSetId,
    // NOTE(review): presumably the expiry of the lock held when the execution
    // became blocked - confirm.
    pub lock_expires_at: DateTime<Utc>,
    // NOTE(review): presumably `true` when the join set is being closed - confirm.
    pub closing: bool,
}
2135
// Payload of `PendingState::Paused`: the state that was paused.
// There is no `Finished` variant, so a finished state is never wrapped as paused.
// (Plain `//` comments on purpose: `schemars` copies doc comments into the schema.)
#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, schemars::JsonSchema)]
pub enum PendingStatePaused {
    #[display("Locked({_0})")]
    Locked(PendingStateLocked),
    #[display("PendingAt({_0})")]
    PendingAt(PendingStatePendingAt),
    #[display("BlockedByJoinSet({_0})")]
    BlockedByJoinSet(PendingStateBlockedByJoinSet),
}
2146
// Identifies a lock holder: the executor and the particular run.
// (Plain `//` comments on purpose: `schemars` copies doc comments into the schema.)
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
pub struct LockedBy {
    pub executor_id: ExecutorId,
    pub run_id: RunId,
}
2152impl From<&Locked> for LockedBy {
2153 fn from(value: &Locked) -> Self {
2154 LockedBy {
2155 executor_id: value.executor_id,
2156 run_id: value.run_id,
2157 }
2158 }
2159}
2160
// Payload of `PendingState::Finished`. `Deserialize` is only derived for tests.
// (Plain `//` comments on purpose: `schemars` copies doc comments into the schema.)
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, schemars::JsonSchema)]
#[cfg_attr(any(test, feature = "test"), derive(Deserialize))]
pub struct PendingStateFinished {
    // NOTE(review): presumably the version of the `Finished` event - confirm.
    pub version: VersionType,
    pub finished_at: DateTime<Utc>,
    pub result_kind: PendingStateFinishedResultKind,
}
2168impl Display for PendingStateFinished {
2169 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2170 match self.result_kind {
2171 PendingStateFinishedResultKind::Ok => write!(f, "ok"),
2172 PendingStateFinishedResultKind::Err(err) => write!(f, "{err}"),
2173 }
2174 }
2175}
2176
// Coarse result of a finished execution, serialized in snake_case.
// (Plain `//` comments on purpose: `schemars` copies doc comments into the schema.)
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
#[serde(rename_all = "snake_case")]
pub enum PendingStateFinishedResultKind {
    Ok,
    Err(PendingStateFinishedError),
}
2184impl PendingStateFinishedResultKind {
2185 pub fn as_result(&self) -> Result<(), &PendingStateFinishedError> {
2186 match self {
2187 PendingStateFinishedResultKind::Ok => Ok(()),
2188 PendingStateFinishedResultKind::Err(err) => Err(err),
2189 }
2190 }
2191}
2192
/// Derives the coarse result kind from a return value by delegating to
/// `SupportedFunctionReturnValue::as_pending_state_finished_result`.
impl From<&SupportedFunctionReturnValue> for PendingStateFinishedResultKind {
    fn from(result: &SupportedFunctionReturnValue) -> Self {
        result.as_pending_state_finished_result()
    }
}
2198
// Error side of `PendingStateFinishedResultKind`, serialized in snake_case.
// (Plain `//` comments on purpose: `schemars` copies doc comments into the schema.)
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    derive_more::Display,
    schemars::JsonSchema,
)]
#[serde(rename_all = "snake_case")]
pub enum PendingStateFinishedError {
    // Execution failure of the given kind; displayed using the kind's own text.
    #[display("{_0}")]
    ExecutionFailure(ExecutionFailureKind),
    // NOTE(review): presumably the function itself returned an error value - confirm.
    #[display("completed with an error")]
    Error,
}
2217
2218impl PendingState {
2219 #[instrument(skip(self))]
2220 pub fn can_append_lock(
2221 &self,
2222 created_at: DateTime<Utc>,
2223 executor_id: ExecutorId,
2224 run_id: RunId,
2225 lock_expires_at: DateTime<Utc>,
2226 ) -> Result<LockKind, DbErrorWriteNonRetriable> {
2227 if lock_expires_at <= created_at {
2228 return Err(DbErrorWriteNonRetriable::ValidationFailed(
2229 "invalid expiry date".into(),
2230 ));
2231 }
2232 match self {
2233 PendingState::PendingAt(PendingStatePendingAt {
2234 scheduled_at,
2235 last_lock,
2236 }) => {
2237 if *scheduled_at <= created_at {
2238 Ok(LockKind::CreatingNewLock)
2240 } else if let Some(LockedBy {
2241 executor_id: last_executor_id,
2242 run_id: last_run_id,
2243 }) = last_lock
2244 && executor_id == *last_executor_id
2245 && run_id == *last_run_id
2246 {
2247 Ok(LockKind::Extending)
2249 } else {
2250 Err(DbErrorWriteNonRetriable::ValidationFailed(
2251 "cannot lock, not yet pending".into(),
2252 ))
2253 }
2254 }
2255 PendingState::Locked(PendingStateLocked {
2256 locked_by:
2257 LockedBy {
2258 executor_id: current_pending_state_executor_id,
2259 run_id: current_pending_state_run_id,
2260 },
2261 lock_expires_at: _,
2262 }) => {
2263 if executor_id == *current_pending_state_executor_id
2264 && run_id == *current_pending_state_run_id
2265 {
2266 Ok(LockKind::Extending)
2268 } else {
2269 Err(DbErrorWriteNonRetriable::IllegalState {
2270 reason: "cannot lock, already locked".into(),
2271 context: SpanTrace::capture(),
2272 source: None,
2273 loc: Location::caller(),
2274 })
2275 }
2276 }
2277 PendingState::BlockedByJoinSet { .. } => Err(DbErrorWriteNonRetriable::IllegalState {
2278 reason: "cannot append Locked event when in BlockedByJoinSet state".into(),
2279 context: SpanTrace::capture(),
2280 source: None,
2281 loc: Location::caller(),
2282 }),
2283 PendingState::Finished { .. } => Err(DbErrorWriteNonRetriable::IllegalState {
2284 reason: "already finished".into(),
2285 context: SpanTrace::capture(),
2286 source: None,
2287 loc: Location::caller(),
2288 }),
2289 PendingState::Paused(..) => Err(DbErrorWriteNonRetriable::IllegalState {
2290 reason: "cannot lock, execution is paused".into(),
2291 context: SpanTrace::capture(),
2292 source: None,
2293 loc: Location::caller(),
2294 }),
2295 }
2296 }
2297
2298 #[must_use]
2299 pub fn is_finished(&self) -> bool {
2300 matches!(self, PendingState::Finished { .. })
2301 }
2302
2303 #[must_use]
2304 pub fn is_paused(&self) -> bool {
2305 matches!(self, PendingState::Paused(_))
2306 }
2307}
2308
/// Kind of lock permitted by `PendingState::can_append_lock`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LockKind {
    /// The requesting executor/run already holds (or last held) the lock and extends it.
    Extending,
    /// A fresh lock on a pending execution.
    CreatingNewLock,
}
2314
// Serializable trace records of outgoing HTTP requests.
// (Plain `//` comments on purpose: `schemars` copies doc comments into the schema.)
pub mod http_client_trace {
    use chrono::{DateTime, Utc};
    use serde::{Deserialize, Serialize};

    // One HTTP exchange: the request and, when available, its response.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
    pub struct HttpClientTrace {
        pub req: RequestTrace,
        pub resp: Option<ResponseTrace>,
    }

    // Request part of the trace: send time, target URI and HTTP method.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
    pub struct RequestTrace {
        pub sent_at: DateTime<Utc>,
        pub uri: String,
        pub method: String,
    }

    // Response part of the trace.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
    pub struct ResponseTrace {
        pub finished_at: DateTime<Utc>,
        // `Ok` holds a `u16` (presumably the HTTP status code), `Err` an error description.
        pub status: Result<u16, String>,
    }
}
2338
// Aggregate used only to derive one JSON schema covering the listed types.
// NOTE(review): presumably these are the types persisted as JSON in storage - confirm.
// (Plain `//` comments on purpose: `schemars` copies doc comments into the schema.)
#[derive(schemars::JsonSchema)]
pub struct DbStorageSchema {
    pub execution_event: ExecutionEvent,
    pub pending_state: PendingState,
    pub join_set_response: JoinSetResponse,
    pub wasm_backtrace: WasmBacktrace,
}
2347
2348#[cfg(test)]
2349mod tests {
2350 use super::HistoryEvent;
2351 use super::HistoryEventScheduleAt;
2352 use super::JoinNextTryOutcome;
2353 use super::PendingStateFinished;
2354 use super::PendingStateFinishedError;
2355 use super::PendingStateFinishedResultKind;
2356 use crate::ExecutionFailureKind;
2357 use crate::JoinSetId;
2358 use crate::SupportedFunctionReturnValue;
2359 use chrono::DateTime;
2360 use chrono::Datelike;
2361 use chrono::Utc;
2362 use insta::assert_snapshot;
2363 use rstest::rstest;
2364 use std::time::Duration;
2365 use val_json::type_wrapper::TypeWrapper;
2366 use val_json::wast_val::WastVal;
2367 use val_json::wast_val::WastValWithType;
2368
2369 #[rstest(expected => [
2370 PendingStateFinishedResultKind::Ok,
2371 PendingStateFinishedResultKind::Err(PendingStateFinishedError::ExecutionFailure(ExecutionFailureKind::TimedOut)),
2372 ])]
2373 #[test]
2374 fn serde_pending_state_finished_result_kind_should_work(
2375 expected: PendingStateFinishedResultKind,
2376 ) {
2377 let ser = serde_json::to_string(&expected).unwrap();
2378 let actual: PendingStateFinishedResultKind = serde_json::from_str(&ser).unwrap();
2379 assert_eq!(expected, actual);
2380 }
2381
2382 #[rstest(result_kind => [
2383 PendingStateFinishedResultKind::Ok,
2384 PendingStateFinishedResultKind::Err(PendingStateFinishedError::ExecutionFailure(ExecutionFailureKind::TimedOut)),
2385 ])]
2386 #[test]
2387 fn serde_pending_state_finished_should_work(result_kind: PendingStateFinishedResultKind) {
2388 let expected = PendingStateFinished {
2389 version: 0,
2390 finished_at: Utc::now(),
2391 result_kind,
2392 };
2393
2394 let ser = serde_json::to_string(&expected).unwrap();
2395 let actual: PendingStateFinished = serde_json::from_str(&ser).unwrap();
2396 assert_eq!(expected, actual);
2397 }
2398
/// Builds an `Ok` return value typed as a result of `option<string>` / `string` whose
/// payload is `Ok(None)`, snapshots its JSON form, and checks that deserializing
/// that JSON gives back an equal value.
#[test]
fn join_set_deser_with_result_ok_option_none_should_work() {
    let result_type = TypeWrapper::Result {
        ok: Some(Box::new(TypeWrapper::Option(Box::new(TypeWrapper::String)))),
        err: Some(Box::new(TypeWrapper::String)),
    };
    let ok_none = WastVal::Result(Ok(Some(Box::new(WastVal::Option(None)))));
    let expected = SupportedFunctionReturnValue::Ok(Some(WastValWithType {
        r#type: result_type,
        value: ok_none,
    }));

    // The binding is named `json` because the snapshot macro records the expression text.
    let json = serde_json::to_string(&expected).unwrap();
    assert_snapshot!(json);

    let actual = serde_json::from_str::<SupportedFunctionReturnValue>(&json).unwrap();
    assert_eq!(expected, actual);
}
2415
/// A relative schedule of `u32::MAX` seconds, resolved against the Unix epoch,
/// must succeed and fall in the year 2106.
#[test]
fn as_date_time_should_work_with_duration_u32_max_secs() {
    let secs = u64::from(u32::MAX);
    let schedule_at = HistoryEventScheduleAt::In(Duration::from_secs(secs));
    let year = schedule_at
        .as_date_time(DateTime::UNIX_EPOCH)
        .unwrap()
        .year();
    assert_eq!(2106, year);
}
2423
// Milliseconds in one second; used as the divisor for the bound below.
const MILLIS_PER_SEC: i64 = 1000;
// Largest whole number of seconds whose millisecond count still fits in an `i64`.
// NOTE(review): presumably mirrors chrono's `TimeDelta` upper bound — confirm.
const TIMEDELTA_MAX_SECS: i64 = i64::MAX / MILLIS_PER_SEC;

/// A relative schedule one second beyond `TIMEDELTA_MAX_SECS` must be rejected
/// when resolved against the Unix epoch.
#[test]
fn as_date_time_should_fail_on_duration_secs_greater_than_i64_max() {
    let max_secs = u64::try_from(TIMEDELTA_MAX_SECS).expect("positive number must not fail");
    let too_long = Duration::from_secs(max_secs + 1);
    let schedule_at = HistoryEventScheduleAt::In(too_long);
    schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap_err();
}
2436
/// Deserializes `join_next_try` events carrying an `outcome` field and checks each
/// one maps to the matching `JoinNextTryOutcome` variant on the named join set `test`.
#[test]
fn join_next_try_outcome_new_format() {
    // Each entry pairs the raw JSON with the outcome it is expected to decode to.
    let cases = [
        (
            r#"{"type":"join_next_try","join_set_id":"n:test","outcome":"found"}"#,
            JoinNextTryOutcome::Found,
        ),
        (
            r#"{"type":"join_next_try","join_set_id":"n:test","outcome":"all_processed"}"#,
            JoinNextTryOutcome::AllProcessed,
        ),
    ];

    for (json, outcome) in cases {
        let parsed = serde_json::from_str::<HistoryEvent>(json).unwrap();
        let join_set_id =
            JoinSetId::new(crate::JoinSetKind::Named, crate::StrVariant::Static("test"))
                .unwrap();
        assert_eq!(
            parsed,
            HistoryEvent::JoinNextTry {
                join_set_id,
                outcome,
            }
        );
    }
}
2467
/// Serializes a `JoinNextTry` event with the `AllProcessed` outcome and checks the
/// JSON contains `"outcome":"all_processed"` and no `found_response` text.
#[test]
fn join_next_try_outcome_serializes_new_format() {
    let join_set_id =
        JoinSetId::new(crate::JoinSetKind::Named, crate::StrVariant::Static("test")).unwrap();
    let event = HistoryEvent::JoinNextTry {
        join_set_id,
        outcome: JoinNextTryOutcome::AllProcessed,
    };

    let json = serde_json::to_string(&event).unwrap();
    let has_outcome_field = json.contains(r#""outcome":"all_processed""#);
    let has_legacy_field = json.contains("found_response");

    assert!(has_outcome_field, "expected outcome field, got: {json}");
    assert!(
        !has_legacy_field,
        "should not contain old field, got: {json}"
    );
}
2488
2489 mod stub_retval_hash {
2490 use super::super::{StubRetVal, StubRetValHash};
2491 use crate::SupportedFunctionReturnValue;
2492 use val_json::type_wrapper::TypeWrapper;
2493 use val_json::wast_val::{WastVal, WastValWithType};
2494
/// Hashes a typed `Ok` string return value and checks the rendered hash begins
/// with `01` and is 66 bytes long.
#[test]
fn typed_variant_hash_is_stable() {
    let payload = WastValWithType {
        r#type: TypeWrapper::String,
        value: WastVal::String("hello".into()),
    };
    let retval = StubRetVal::Typed(SupportedFunctionReturnValue::Ok(Some(payload)));
    let rendered = retval.hash().to_string();
    assert_eq!(rendered.get(..2), Some("01"));
    assert_eq!(rendered.len(), 66);
}
2508
/// Hashes an untyped (raw string) return value and checks the rendered hash begins
/// with `01` and is 66 bytes long.
#[test]
fn untyped_variant_hash_is_stable() {
    let raw = r#"{"ok": "hello"}"#.to_string();
    let retval = StubRetVal::Untyped(raw);
    let rendered = retval.hash().to_string();
    assert_eq!(rendered.get(..2), Some("01"));
    assert_eq!(rendered.len(), 66);
}
2518
/// Four distinct stub return values (two typed, two untyped) must render to four
/// pairwise-different hash strings.
#[test]
fn different_values_produce_different_hashes() {
    let rendered: Vec<String> = [
        StubRetVal::Typed(SupportedFunctionReturnValue::Ok(None)),
        StubRetVal::Typed(SupportedFunctionReturnValue::Err(None)),
        StubRetVal::Untyped("value1".to_string()),
        StubRetVal::Untyped("value2".to_string()),
    ]
    .into_iter()
    .map(|retval| retval.hash().to_string())
    .collect();

    // Peel off the head and compare it with everything after it, so each
    // unordered pair is checked exactly once.
    let mut remaining = rendered.as_slice();
    while let Some((current, rest)) = remaining.split_first() {
        for other in rest {
            assert_ne!(current, other, "hashes should be different");
        }
        remaining = rest;
    }
}
2538
/// Two independently built but equal stub return values must hash identically,
/// for both the typed and the untyped variant.
#[test]
fn same_values_produce_same_hashes() {
    let make_typed = || StubRetVal::Typed(SupportedFunctionReturnValue::Ok(None));
    let (typed_a, typed_b) = (make_typed(), make_typed());
    assert_eq!(typed_a.hash(), typed_b.hash());

    let make_untyped = || StubRetVal::Untyped("test".to_string());
    let (untyped_a, untyped_b) = (make_untyped(), make_untyped());
    assert_eq!(untyped_a.hash(), untyped_b.hash());
}
2549
/// A hash written to JSON and read back must equal the original hash.
#[test]
fn hash_serialization_roundtrip() {
    let retval = StubRetVal::Typed(SupportedFunctionReturnValue::Ok(None));
    let original = retval.hash();

    let json = serde_json::to_string(&original).unwrap();
    let restored = serde_json::from_str::<StubRetValHash>(&json).unwrap();

    assert_eq!(original, restored);
}
2560
/// Rendering a hash with `to_string` and parsing that text back must give an
/// equal hash.
#[test]
fn hash_display_and_fromstr_roundtrip() {
    let retval = StubRetVal::Untyped("test value".to_string());
    let original = retval.hash();

    let text = original.to_string();
    let reparsed = text.parse::<StubRetValHash>().unwrap();

    assert_eq!(original, reparsed);
}
2571
/// A typed value and an untyped value holding that typed value's JSON text must
/// not share a hash.
#[test]
fn typed_and_untyped_with_same_content_produce_different_hashes() {
    let inner = SupportedFunctionReturnValue::Ok(None);
    // Serialize before moving `inner` into the typed wrapper.
    let json_of_typed = serde_json::to_string(&inner).unwrap();

    let typed = StubRetVal::Typed(inner);
    let untyped = StubRetVal::Untyped(json_of_typed);

    assert_ne!(typed.hash(), untyped.hash());
}
2582 }
2583}