1use crate::ComponentId;
2use crate::ComponentRetryConfig;
3use crate::ComponentType;
4use crate::ExecutionFailureKind;
5use crate::ExecutionId;
6use crate::ExecutionMetadata;
7use crate::FinishedExecutionFailure;
8use crate::FunctionExtension;
9use crate::FunctionFqn;
10use crate::FunctionMetadata;
11use crate::JoinSetId;
12use crate::Params;
13use crate::StrVariant;
14use crate::SupportedFunctionReturnValue;
15use crate::component_id::ComponentDigest;
16use crate::prefixed_ulid::DelayId;
17use crate::prefixed_ulid::DeploymentId;
18use crate::prefixed_ulid::ExecutionIdDerived;
19use crate::prefixed_ulid::ExecutorId;
20use crate::prefixed_ulid::RunId;
21use assert_matches::assert_matches;
22use async_trait::async_trait;
23use chrono::TimeDelta;
24use chrono::{DateTime, Utc};
25use http_client_trace::HttpClientTrace;
26use serde::Deserialize;
27use serde::Serialize;
28use std::fmt::Debug;
29use std::fmt::Display;
30use std::panic::Location;
31use std::pin::Pin;
32use std::sync::Arc;
33use std::time::Duration;
34use tracing::debug;
35use tracing::instrument;
36use tracing_error::SpanTrace;
37
38pub const STATE_PENDING_AT: &str = "pending_at";
40pub const STATE_BLOCKED_BY_JOIN_SET: &str = "blocked_by_join_set";
41pub const STATE_LOCKED: &str = "locked";
42pub const STATE_FINISHED: &str = "finished";
43pub const HISTORY_EVENT_TYPE_JOIN_NEXT: &str = "join_next"; #[derive(Debug, PartialEq, Eq, Clone)]
46pub struct ExecutionLog {
47 pub execution_id: ExecutionId,
48 pub events: Vec<ExecutionEvent>,
49 pub responses: Vec<ResponseWithCursor>,
50 pub next_version: Version, pub pending_state: PendingState, pub component_digest: ComponentDigest, pub component_type: ComponentType,
54 pub deployment_id: DeploymentId, }
56
57impl ExecutionLog {
58 #[must_use]
61 pub fn can_be_retried_after(
62 temporary_event_count: u32,
63 max_retries: Option<u32>,
64 retry_exp_backoff: Duration,
65 ) -> Option<Duration> {
66 if temporary_event_count <= max_retries.unwrap_or(u32::MAX) {
68 let duration = retry_exp_backoff * 2_u32.saturating_pow(temporary_event_count - 1);
70 Some(duration)
71 } else {
72 None
73 }
74 }
75
76 #[must_use]
77 pub fn compute_retry_duration_when_retrying_forever(
78 temporary_event_count: u32,
79 retry_exp_backoff: Duration,
80 ) -> Duration {
81 Self::can_be_retried_after(temporary_event_count, None, retry_exp_backoff)
82 .expect("`max_retries` set to MAX must never return None")
83 }
84
85 #[must_use]
86 pub fn get_create_request(&self) -> CreateRequest {
87 assert_matches!(self.events.first().cloned(), Some(ExecutionEvent {
88 event:ExecutionRequest::Created{
89 ffqn,params,parent,scheduled_at,component_id,deployment_id,metadata,scheduled_by},
90 created_at, .. }) => CreateRequest { created_at, execution_id:
91 self.execution_id.clone(), ffqn, params, parent, scheduled_at,
92 component_id, deployment_id, metadata, scheduled_by, paused: false })
93 }
94
95 #[must_use]
96 pub fn ffqn(&self) -> &FunctionFqn {
97 assert_matches!(self.events.first(), Some(ExecutionEvent {
98 event: ExecutionRequest::Created { ffqn, .. },
99 ..
100 }) => ffqn)
101 }
102
103 #[must_use]
104 pub fn params(&self) -> &Params {
105 assert_matches!(self.events.first(), Some(ExecutionEvent {
106 event: ExecutionRequest::Created { params, .. },
107 ..
108 }) => params)
109 }
110
111 #[must_use]
112 pub fn parent(&self) -> Option<(ExecutionId, JoinSetId)> {
113 assert_matches!(self.events.first(), Some(ExecutionEvent {
114 event: ExecutionRequest::Created { parent, .. },
115 ..
116 }) => parent.clone())
117 }
118
119 #[must_use]
120 pub fn last_event(&self) -> &ExecutionEvent {
121 self.events.last().expect("must contain at least one event")
122 }
123
124 #[must_use]
125 pub fn is_finished(&self) -> bool {
126 matches!(
127 self.events.last(),
128 Some(ExecutionEvent {
129 event: ExecutionRequest::Finished { .. },
130 ..
131 })
132 )
133 }
134
135 #[must_use]
136 pub fn as_finished_result(&self) -> Option<SupportedFunctionReturnValue> {
137 if let ExecutionEvent {
138 event: ExecutionRequest::Finished { retval: result, .. },
139 ..
140 } = self.events.last().expect("must contain at least one event")
141 {
142 Some(result.clone())
143 } else {
144 None
145 }
146 }
147
148 pub fn event_history(&self) -> impl Iterator<Item = (HistoryEvent, Version)> + '_ {
149 self.events.iter().filter_map(|event| {
150 if let ExecutionRequest::HistoryEvent { event: eh, .. } = &event.event {
151 Some((eh.clone(), event.version.clone()))
152 } else {
153 None
154 }
155 })
156 }
157
158 #[cfg(feature = "test")]
159 #[must_use]
160 pub fn find_join_set_request(&self, join_set_id: &JoinSetId) -> Option<&JoinSetRequest> {
161 self.events
162 .iter()
163 .find_map(move |event| match &event.event {
164 ExecutionRequest::HistoryEvent {
165 event:
166 HistoryEvent::JoinSetRequest {
167 join_set_id: found,
168 request,
169 },
170 ..
171 } if *join_set_id == *found => Some(request),
172 _ => None,
173 })
174 }
175}
176
177pub type VersionType = u32;
178#[derive(
179 Debug,
180 Default,
181 Clone,
182 PartialEq,
183 PartialOrd,
184 Ord,
185 Eq,
186 Hash,
187 derive_more::Display,
188 derive_more::Into,
189 serde::Serialize,
190 serde::Deserialize,
191 schemars::JsonSchema,
192)]
193#[serde(transparent)]
194#[schemars(transparent)]
195pub struct Version(pub VersionType);
196impl Version {
197 #[must_use]
198 pub fn new(arg: VersionType) -> Version {
199 Version(arg)
200 }
201
202 #[must_use]
203 pub fn increment(&self) -> Version {
204 Version(self.0 + 1)
205 }
206}
207impl TryFrom<i64> for Version {
208 type Error = VersionParseError;
209 fn try_from(value: i64) -> Result<Self, Self::Error> {
210 VersionType::try_from(value)
211 .map(Version::new)
212 .map_err(|_| VersionParseError)
213 }
214}
215impl From<Version> for usize {
216 fn from(value: Version) -> Self {
217 usize::try_from(value.0).expect("16 bit systems are unsupported")
218 }
219}
220impl From<&Version> for usize {
221 fn from(value: &Version) -> Self {
222 usize::try_from(value.0).expect("16 bit systems are unsupported")
223 }
224}
225
226#[derive(Debug, thiserror::Error)]
227#[error("version must be u32")]
228pub struct VersionParseError;
229
230#[derive(
231 Clone,
232 Debug,
233 derive_more::Display,
234 PartialEq,
235 Eq,
236 serde::Serialize,
237 serde::Deserialize,
238 schemars::JsonSchema,
239)]
240#[display("{event}")]
241pub struct ExecutionEvent {
242 pub created_at: DateTime<Utc>,
243 pub event: ExecutionRequest,
244 #[serde(skip_serializing_if = "Option::is_none")]
245 pub backtrace_id: Option<Version>,
246 pub version: Version,
247}
248
249#[derive(
250 Debug,
251 Clone,
252 Copy,
253 PartialEq,
254 Eq,
255 derive_more::Display,
256 derive_more::Into,
257 Serialize, schemars::JsonSchema,
259)]
260pub struct ResponseCursor(pub u32);
261
262#[derive(Debug, Clone, PartialEq, Eq, Serialize , schemars::JsonSchema)]
263pub struct ResponseWithCursor {
264 pub event: JoinSetResponseEventOuter,
265 pub cursor: ResponseCursor,
266}
267
268#[derive(Debug)]
269pub struct ListExecutionEventsResponse {
270 pub events: Vec<ExecutionEvent>,
271 pub max_version: Version,
272}
273
274#[derive(Debug)]
275pub struct ListResponsesResponse {
276 pub responses: Vec<ResponseWithCursor>,
277 pub max_cursor: ResponseCursor,
278}
279
280#[derive(Debug, Clone, PartialEq, Eq, Serialize , schemars::JsonSchema)]
281pub struct JoinSetResponseEventOuter {
282 pub created_at: DateTime<Utc>,
283 pub event: JoinSetResponseEvent,
284}
285
286#[derive(
287 Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema,
288)]
289pub struct JoinSetResponseEvent {
290 pub join_set_id: JoinSetId,
291 pub event: JoinSetResponse,
292}
293
294#[derive(
295 Clone, Debug, PartialEq, Eq, Serialize, Deserialize, derive_more::Display, schemars::JsonSchema,
296)]
297#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
298#[serde(tag = "type", rename_all = "snake_case")]
299pub enum JoinSetResponse {
300 #[display("delay {}: {delay_id}", if result.is_ok() { "finished" } else { "cancelled"})]
301 DelayFinished {
302 delay_id: DelayId,
303 result: Result<(), ()>,
304 },
305 #[display("{result}: {child_execution_id}")] ChildExecutionFinished {
307 child_execution_id: ExecutionIdDerived,
308 #[cfg_attr(any(test, feature = "test"), arbitrary(value = Version(2)))]
309 finished_version: Version,
310 #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
311 result: SupportedFunctionReturnValue,
312 },
313}
314
315pub const DUMMY_CREATED: ExecutionRequest = ExecutionRequest::Created {
316 ffqn: FunctionFqn::new_static("", ""),
317 params: Params::empty(),
318 parent: None,
319 scheduled_at: DateTime::from_timestamp_nanos(0),
320 component_id: ComponentId::dummy_activity(),
321 deployment_id: DeploymentId::from_parts(0, 0),
322 metadata: ExecutionMetadata::empty(),
323 scheduled_by: None,
324};
325pub const DUMMY_HISTORY_EVENT: ExecutionRequest = ExecutionRequest::HistoryEvent {
326 event: HistoryEvent::JoinSetCreate {
327 join_set_id: JoinSetId {
328 kind: crate::JoinSetKind::OneOff,
329 name: StrVariant::empty(),
330 },
331 },
332};
333
334#[derive(
335 Clone,
336 derive_more::Debug,
337 derive_more::Display,
338 PartialEq,
339 Eq,
340 Serialize,
341 Deserialize,
342 schemars::JsonSchema,
343)]
344#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
345#[serde(rename_all = "snake_case")]
346pub enum ExecutionRequest {
347 #[display("Created({ffqn}, `{scheduled_at}`)")]
348 Created {
349 ffqn: FunctionFqn,
350 #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
351 #[debug(skip)]
352 params: Params,
353 parent: Option<(ExecutionId, JoinSetId)>,
354 scheduled_at: DateTime<Utc>,
355 #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
356 component_id: ComponentId,
357 deployment_id: DeploymentId,
358 #[cfg_attr(any(test, feature = "test"), arbitrary(default))]
359 metadata: ExecutionMetadata,
360 scheduled_by: Option<ExecutionId>,
361 },
362 Locked(Locked),
363 #[display("Unlocked(`{backoff_expires_at}`)")]
370 Unlocked {
371 backoff_expires_at: DateTime<Utc>,
372 #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
373 reason: StrVariant,
374 },
375 #[display("TemporarilyFailed(`{backoff_expires_at}`)")]
378 TemporarilyFailed {
379 backoff_expires_at: DateTime<Utc>,
380 #[cfg_attr(any(test, feature = "test"), arbitrary(value = StrVariant::Static("reason")))]
381 reason: StrVariant,
382 detail: Option<String>,
383 #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
384 http_client_traces: Option<Vec<HttpClientTrace>>,
385 },
386 #[display("TemporarilyTimedOut(`{backoff_expires_at}`)")]
389 TemporarilyTimedOut {
390 backoff_expires_at: DateTime<Utc>,
391 #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
392 http_client_traces: Option<Vec<HttpClientTrace>>,
393 },
394 #[display("Finished: {retval}")]
396 Finished {
397 #[cfg_attr(any(test, feature = "test"), arbitrary(value = crate::SUPPORTED_RETURN_VALUE_OK_EMPTY))]
398 retval: SupportedFunctionReturnValue,
399 #[cfg_attr(any(test, feature = "test"), arbitrary(value = None))]
400 http_client_traces: Option<Vec<HttpClientTrace>>,
401 },
402
403 #[display("HistoryEvent({event})")]
404 HistoryEvent {
405 event: HistoryEvent,
406 },
407 #[display("Paused")]
408 Paused,
409 #[display("Unpaused")]
410 Unpaused,
411}
412
413impl ExecutionRequest {
414 #[must_use]
415 pub fn is_temporary_event(&self) -> bool {
416 matches!(
417 self,
418 Self::TemporarilyFailed { .. } | Self::TemporarilyTimedOut { .. }
419 )
420 }
421
422 #[must_use]
424 pub const fn variant(&self) -> &'static str {
425 match self {
426 ExecutionRequest::Created { .. } => "created",
427 ExecutionRequest::Locked(_) => "locked",
428 ExecutionRequest::Unlocked { .. } => "unlocked",
429 ExecutionRequest::TemporarilyFailed { .. } => "temporarily_failed",
430 ExecutionRequest::TemporarilyTimedOut { .. } => "temporarily_timed_out",
431 ExecutionRequest::Finished { .. } => "finished",
432 ExecutionRequest::HistoryEvent { .. } => "history_event",
433 ExecutionRequest::Paused => "paused",
434 ExecutionRequest::Unpaused => "unpaused",
435 }
436 }
437
438 #[must_use]
439 pub fn join_set_id(&self) -> Option<&JoinSetId> {
440 match self {
441 Self::Created {
442 parent: Some((_parent_id, join_set_id)),
443 ..
444 } => Some(join_set_id),
445 Self::HistoryEvent {
446 event:
447 HistoryEvent::JoinSetCreate { join_set_id, .. }
448 | HistoryEvent::JoinSetRequest { join_set_id, .. }
449 | HistoryEvent::JoinNext { join_set_id, .. },
450 } => Some(join_set_id),
451 _ => None,
452 }
453 }
454}
455
456#[derive(
457 Clone,
458 derive_more::Debug,
459 derive_more::Display,
460 PartialEq,
461 Eq,
462 Serialize,
463 Deserialize,
464 schemars::JsonSchema,
465)]
466#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
467#[display("Locked(`{lock_expires_at}`, {component_id})")]
468pub struct Locked {
469 #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentId::dummy_activity()))]
470 pub component_id: ComponentId,
471 pub executor_id: ExecutorId,
472 pub deployment_id: DeploymentId,
473 pub run_id: RunId,
474 pub lock_expires_at: DateTime<Utc>,
475 #[cfg_attr(any(test, feature = "test"), arbitrary(value = ComponentRetryConfig::ZERO))]
476 pub retry_config: ComponentRetryConfig,
477}
478
479#[derive(
480 Debug,
481 Clone,
482 Copy,
483 PartialEq,
484 Eq,
485 derive_more::Display,
486 Serialize,
487 Deserialize,
488 schemars::JsonSchema,
489)]
490#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
491#[serde(tag = "type", rename_all = "snake_case")]
492pub enum PersistKind {
493 #[display("RandomU64({min}, {max_inclusive})")]
494 RandomU64 {
495 min: u64,
496 max_inclusive: u64,
497 },
498 #[display("RandomString({min_length}, {max_length_exclusive})")]
499 RandomString {
500 min_length: u64,
501 max_length_exclusive: u64,
502 },
503 ExecutionId,
504}
505
506#[must_use]
507pub fn from_u64_to_bytes(value: u64) -> [u8; 8] {
508 value.to_be_bytes()
509}
510
511#[derive(
512 derive_more::Debug,
513 Clone,
514 PartialEq,
515 Eq,
516 derive_more::Display,
517 Serialize,
518 Deserialize,
519 schemars::JsonSchema,
520)]
521#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
522#[serde(tag = "type", rename_all = "snake_case")]
523pub enum HistoryEvent {
525 #[display("Persist")]
527 Persist {
528 #[debug(skip)]
529 value: Vec<u8>, kind: PersistKind,
531 },
532 #[display("JoinSetCreate({join_set_id})")]
533 JoinSetCreate { join_set_id: JoinSetId },
534 #[display("JoinSetRequest({request})")]
535 JoinSetRequest {
537 join_set_id: JoinSetId,
538 request: JoinSetRequest,
539 },
540 #[display("JoinNext({join_set_id})")]
546 JoinNext {
547 join_set_id: JoinSetId,
548 run_expires_at: DateTime<Utc>,
551 requested_ffqn: Option<FunctionFqn>,
554 closing: bool,
556 },
557 #[display("JoinNextTry({join_set_id}, {outcome})")]
559 JoinNextTry {
560 join_set_id: JoinSetId,
561 outcome: JoinNextTryOutcome,
562 },
563 #[display("JoinNextTooMany({join_set_id})")]
565 JoinNextTooMany {
566 join_set_id: JoinSetId,
567 requested_ffqn: Option<FunctionFqn>,
570 },
571 #[display("Schedule({execution_id}, {schedule_at})")]
572 Schedule {
573 execution_id: ExecutionId,
574 schedule_at: HistoryEventScheduleAt, #[cfg_attr(any(test, feature = "test"), arbitrary(value = Ok(())))]
576 result: Result<(), ScheduleRequestError>,
577 },
578 #[display("Stub({target_execution_id})")]
579 Stub {
580 target_execution_id: ExecutionIdDerived,
581 #[cfg_attr(any(test, feature = "test"), arbitrary(value = StubRetVal::Typed(crate::SUPPORTED_RETURN_VALUE_OK_EMPTY).hash()))]
582 retval_hash: StubRetValHash,
583 #[cfg_attr(any(test, feature = "test"), arbitrary(value = Ok(())))]
584 result: Result<(), StubError>,
585 },
586}
587
588#[derive(derive_more::Debug, Clone, PartialEq, Eq)]
590#[cfg_attr(any(test, feature = "test"), derive(Serialize, Deserialize))]
591#[cfg_attr(any(test, feature = "test"), serde(rename_all = "snake_case"))]
592pub enum StubRetVal {
593 Typed(SupportedFunctionReturnValue),
594 Untyped(String),
595}
596
597impl StubRetVal {
598 #[must_use]
600 pub fn hash(&self) -> StubRetValHash {
601 use sha2::{Digest as _, Sha256};
602 const STUB_RETVAL_HASH_VERSION: u8 = 1;
603 let mut hasher = Sha256::default();
604
605 match self {
606 StubRetVal::Typed(val) => {
607 hasher.update(b"T|");
608 let json = serde_json::to_string(val)
610 .expect("SupportedFunctionReturnValue is always serializable");
611 hasher.update(json.as_bytes());
612 }
613 StubRetVal::Untyped(s) => {
614 hasher.update(b"U|");
615 hasher.update(s.as_bytes());
616 }
617 }
618
619 let hash_bytes = hasher.finalize();
620 let mut result = [0u8; 33];
621 result[0] = STUB_RETVAL_HASH_VERSION;
622 result[1..].copy_from_slice(&hash_bytes);
623
624 StubRetValHash(result)
625 }
626}
627
628#[derive(
631 Clone,
632 PartialEq,
633 Eq,
634 serde_with::SerializeDisplay,
635 serde_with::DeserializeFromStr,
636 schemars::JsonSchema,
637)]
638#[schemars(with = "String")]
639pub struct StubRetValHash([u8; 33]);
640
641impl Display for StubRetValHash {
642 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
643 for b in self.0 {
644 write!(f, "{b:02x}")?;
645 }
646 Ok(())
647 }
648}
649
650impl Debug for StubRetValHash {
651 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
652 Display::fmt(self, f)
653 }
654}
655
656impl std::str::FromStr for StubRetValHash {
657 type Err = StubRetValHashParseError;
658
659 fn from_str(s: &str) -> Result<Self, Self::Err> {
660 if s.len() != 66 {
661 return Err(StubRetValHashParseError::InvalidLength(s.len()));
663 }
664 let mut bytes = [0u8; 33];
665 for i in 0..33 {
666 let chunk = &s[i * 2..i * 2 + 2];
667 bytes[i] =
668 u8::from_str_radix(chunk, 16).map_err(|_| StubRetValHashParseError::InvalidHex)?;
669 }
670 Ok(StubRetValHash(bytes))
671 }
672}
673
674#[derive(Debug, thiserror::Error)]
675pub enum StubRetValHashParseError {
676 #[error("invalid length: expected 66 hex chars, got {0}")]
677 InvalidLength(usize),
678 #[error("invalid hex character")]
679 InvalidHex,
680}
681
682#[derive(
685 Debug, Clone, thiserror::Error, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema,
686)]
687#[serde(rename_all = "snake_case")]
688pub enum StubError {
689 #[error("execution not found")]
690 ExecutionNotFound,
691 #[error("type check error: {0}")]
692 TypeCheckError(String),
693 #[error("conflict")]
694 Conflict,
695}
696
697#[derive(
699 Debug, Clone, thiserror::Error, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema,
700)]
701#[serde(rename_all = "snake_case")]
702pub enum ScheduleRequestError {
703 #[error("function not found")]
704 FunctionNotFound,
705 #[error("params parsing error: {0}")]
706 TypeCheckError(String),
707}
708
709#[derive(
711 Debug, Clone, thiserror::Error, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema,
712)]
713#[serde(rename_all = "snake_case")]
714pub enum ChildExecutionRequestError {
715 #[error("function not found")]
716 FunctionNotFound,
717 #[error("params parsing error: {0}")]
718 TypeCheckError(String),
719}
720
721#[derive(
722 Debug,
723 Clone,
724 Copy,
725 PartialEq,
726 Eq,
727 derive_more::Display,
728 Serialize,
729 Deserialize,
730 schemars::JsonSchema,
731)]
732#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
733#[serde(rename_all = "snake_case")]
734pub enum JoinNextTryOutcome {
735 #[display("found")]
737 Found,
738 #[display("pending")]
740 Pending,
741 #[display("all_processed")]
743 AllProcessed,
744}
745
746impl From<bool> for JoinNextTryOutcome {
747 fn from(found_response: bool) -> Self {
751 if found_response {
752 JoinNextTryOutcome::Found
753 } else {
754 JoinNextTryOutcome::Pending
755 }
756 }
757}
758
759#[derive(
760 Debug,
761 Clone,
762 Copy,
763 PartialEq,
764 Eq,
765 derive_more::Display,
766 Serialize,
767 Deserialize,
768 schemars::JsonSchema,
769)]
770#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
771#[serde(rename_all = "snake_case")]
772pub enum HistoryEventScheduleAt {
773 Now,
774 #[display("At(`{_0}`)")]
775 At(DateTime<Utc>),
776 #[display("In({_0:?})")]
777 In(Duration),
778}
779
780#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
781pub enum ScheduleAtConversionError {
782 #[error("source duration value is out of range")]
783 OutOfRangeError,
784}
785
786impl HistoryEventScheduleAt {
787 pub fn as_date_time(
788 &self,
789 now: DateTime<Utc>,
790 ) -> Result<DateTime<Utc>, ScheduleAtConversionError> {
791 match self {
792 Self::Now => Ok(now),
793 Self::At(date_time) => Ok(*date_time),
794 Self::In(duration) => {
795 let time_delta = TimeDelta::from_std(*duration)
796 .map_err(|_| ScheduleAtConversionError::OutOfRangeError)?;
797 now.checked_add_signed(time_delta)
798 .ok_or(ScheduleAtConversionError::OutOfRangeError)
799 }
800 }
801 }
802}
803
804#[derive(
805 Clone, Debug, PartialEq, Eq, derive_more::Display, Serialize, Deserialize, schemars::JsonSchema,
806)]
807#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
808#[serde(tag = "type", rename_all = "snake_case")]
809pub enum JoinSetRequest {
810 #[display("DelayRequest({delay_id}, expires_at: `{expires_at}`, schedule_at: `{schedule_at}`)")]
812 DelayRequest {
813 delay_id: DelayId,
814 expires_at: DateTime<Utc>,
815 schedule_at: HistoryEventScheduleAt,
816 #[serde(default)]
817 paused: bool,
818 },
819 #[display("ChildExecutionRequest({child_execution_id}, {target_ffqn}, params: {params})")]
821 ChildExecutionRequest {
822 child_execution_id: ExecutionIdDerived,
823 target_ffqn: FunctionFqn,
824 #[cfg_attr(any(test, feature = "test"), arbitrary(value = Params::empty()))]
825 params: Params,
826 #[cfg_attr(any(test, feature = "test"), arbitrary(value = Ok(())))]
827 result: Result<(), ChildExecutionRequestError>,
828 },
829}
830
831#[derive(Debug, Clone, thiserror::Error, derive_more::PartialEq, derive_more::Eq)]
833pub enum DbErrorGeneric {
834 #[error("database error: {reason}")]
835 Uncategorized {
836 reason: StrVariant,
837 #[eq(skip)]
838 #[partial_eq(skip)]
839 context: SpanTrace,
840 #[eq(skip)]
841 #[partial_eq(skip)]
842 #[source]
843 source: Option<Arc<dyn std::error::Error + Send + Sync>>,
844 loc: &'static Location<'static>,
845 },
846 #[error("database was closed")]
847 Close,
848}
849
850#[derive(thiserror::Error, Clone, Debug, derive_more::PartialEq, derive_more::Eq)]
851pub enum DbErrorWriteNonRetriable {
852 #[error("validation failed: {0}")]
853 ValidationFailed(StrVariant),
854 #[error("conflict")]
855 Conflict,
856 #[error("already finished")]
857 AlreadyFinished,
858 #[error("illegal state: {reason}")]
859 IllegalState {
860 reason: StrVariant,
861 #[eq(skip)]
862 #[partial_eq(skip)]
863 context: SpanTrace,
864 #[eq(skip)]
865 #[partial_eq(skip)]
866 #[source]
867 source: Option<Arc<dyn std::error::Error + Send + Sync>>,
868 loc: &'static Location<'static>,
869 },
870 #[error("version conflict: expected: {expected}, got: {requested}")]
871 VersionConflict {
872 expected: Version,
873 requested: Version,
874 },
875}
876
877#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
879pub enum DbErrorWrite {
880 #[error("cannot write - row not found")]
881 NotFound,
882 #[error("non-retriable error: {0}")]
883 NonRetriable(#[from] DbErrorWriteNonRetriable),
884 #[error(transparent)]
885 Generic(#[from] DbErrorGeneric),
886}
887
888#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
890pub enum DbErrorStubResponse {
891 #[error("stub conflict: already finished with a different value")]
892 StubConflict,
893 #[error(transparent)]
894 Write(#[from] DbErrorWrite),
895}
896
897#[derive(Debug, Clone, thiserror::Error, PartialEq)]
899pub enum DbErrorRead {
900 #[error("cannot read - row not found")]
901 NotFound,
902 #[error(transparent)]
903 Generic(#[from] DbErrorGeneric),
904}
905
906#[derive(Debug, thiserror::Error, PartialEq)]
907pub enum DbErrorReadWithTimeout {
908 #[error("timeout")]
909 Timeout(TimeoutOutcome),
910 #[error(transparent)]
911 DbErrorRead(#[from] DbErrorRead),
912}
913
914pub type AppendResponse = Version;
917pub type PendingExecution = (ExecutionId, Version, Params, Option<DateTime<Utc>>);
918
919#[derive(Debug, Clone)]
920pub struct LockedExecution {
921 pub execution_id: ExecutionId,
922 pub next_version: Version,
923 pub metadata: ExecutionMetadata,
924 pub locked_event: Locked,
925 pub ffqn: FunctionFqn,
926 pub params: Params,
927 pub event_history: Vec<(HistoryEvent, Version)>,
928 pub responses: Vec<ResponseWithCursor>,
929 pub parent: Option<(ExecutionId, JoinSetId)>,
930 pub intermittent_event_count: u32,
931}
932
933pub type LockPendingResponse = Vec<LockedExecution>;
934pub type AppendBatchResponse = Version;
935
936#[derive(Debug, Clone, PartialEq, derive_more::Display, Serialize, Deserialize)]
937#[display("{event}")]
938pub struct AppendRequest {
939 pub created_at: DateTime<Utc>,
940 pub event: ExecutionRequest,
941}
942
943#[derive(Debug, Clone, PartialEq)]
944#[cfg_attr(feature = "test", derive(Serialize))]
945pub struct CreateRequest {
946 pub created_at: DateTime<Utc>,
947 pub execution_id: ExecutionId,
948 pub ffqn: FunctionFqn,
949 pub params: Params,
950 pub parent: Option<(ExecutionId, JoinSetId)>,
951 pub scheduled_at: DateTime<Utc>,
952 pub component_id: ComponentId,
953 pub deployment_id: DeploymentId,
954 pub metadata: ExecutionMetadata,
955 pub scheduled_by: Option<ExecutionId>,
956 pub paused: bool,
957}
958
959impl From<CreateRequest> for ExecutionRequest {
960 fn from(value: CreateRequest) -> Self {
961 Self::Created {
962 ffqn: value.ffqn,
963 params: value.params,
964 parent: value.parent,
965 scheduled_at: value.scheduled_at,
966 component_id: value.component_id,
967 deployment_id: value.deployment_id,
968 metadata: value.metadata,
969 scheduled_by: value.scheduled_by,
970 }
971 }
972}
973
974#[async_trait]
975pub trait DbPool: Send + Sync {
976 async fn db_exec_conn(&self) -> Result<Box<dyn DbExecutor>, DbErrorGeneric>;
977
978 async fn connection(&self) -> Result<Box<dyn DbConnection>, DbErrorGeneric>;
979
980 async fn external_api_conn(&self) -> Result<Box<dyn DbExternalApi>, DbErrorGeneric>;
981
982 #[cfg(feature = "test")]
983 async fn connection_test(&self) -> Result<Box<dyn DbConnectionTest>, DbErrorGeneric>;
984}
985
986#[async_trait]
987pub trait DbPoolCloseable {
988 async fn close(&self);
989}
990
991#[derive(Clone, Debug, PartialEq)]
992#[cfg_attr(feature = "test", derive(Serialize))]
993pub struct AppendEventsToExecution {
994 pub execution_id: ExecutionId,
995 pub version: Version,
996 pub batch: Vec<AppendRequest>,
997}
998
999#[derive(Clone, Debug, PartialEq)]
1000#[cfg_attr(feature = "test", derive(Serialize))]
1001pub struct AppendResponseToExecution {
1002 pub parent_execution_id: ExecutionId,
1003 pub created_at: DateTime<Utc>,
1004 pub join_set_id: JoinSetId,
1005 pub child_execution_id: ExecutionIdDerived,
1006 pub finished_version: Version,
1007 pub result: SupportedFunctionReturnValue,
1008}
1009
1010#[derive(Debug, Clone, PartialEq)]
1014#[cfg_attr(feature = "test", derive(Serialize))]
1015pub enum CapturedDbWrite {
1016 Append {
1017 execution_id: ExecutionId,
1018 version: Version,
1019 req: AppendRequest,
1020 backtraces: Vec<BacktraceInfo>,
1021 },
1022 AppendBatch {
1023 current_time: DateTime<Utc>,
1024 batch: Vec<AppendRequest>,
1025 execution_id: ExecutionId,
1026 version: Version,
1027 backtraces: Vec<BacktraceInfo>,
1028 },
1029 AppendBatchCreateNewExecution {
1030 current_time: DateTime<Utc>,
1031 batch: Vec<AppendRequest>,
1032 execution_id: ExecutionId,
1033 version: Version,
1034 child_req: Vec<CreateRequest>,
1035 backtraces: Vec<BacktraceInfo>,
1036 },
1037 AppendStubResponse {
1038 events: AppendEventsToExecution,
1039 response: AppendResponseToExecution,
1040 current_time: DateTime<Utc>,
1041 backtraces: Vec<BacktraceInfo>,
1042 },
1043 AppendFinished {
1044 execution_id: ExecutionId,
1046 version: Version,
1047 current_time: DateTime<Utc>,
1048 retval: SupportedFunctionReturnValue,
1049 parent: Option<(ExecutionId, JoinSetId)>,
1050 },
1051}
1052
1053#[async_trait]
1054pub trait DbExecutor: Send + Sync {
1055 #[expect(clippy::too_many_arguments)]
1056 async fn lock_pending_by_ffqns(
1057 &self,
1058 batch_size: u32,
1059 pending_at_or_sooner: DateTime<Utc>,
1060 ffqns: Arc<[FunctionFqn]>,
1061 created_at: DateTime<Utc>,
1062 component_id: ComponentId,
1063 deployment_id: DeploymentId,
1064 executor_id: ExecutorId,
1065 lock_expires_at: DateTime<Utc>,
1066 run_id: RunId,
1067 retry_config: ComponentRetryConfig,
1068 ) -> Result<LockPendingResponse, DbErrorWrite>;
1069
1070 #[expect(clippy::too_many_arguments)]
1071 async fn lock_pending_by_component_digest(
1072 &self,
1073 batch_size: u32,
1074 pending_at_or_sooner: DateTime<Utc>,
1075 component_id: &ComponentId,
1076 deployment_id: DeploymentId,
1077 created_at: DateTime<Utc>,
1078 executor_id: ExecutorId,
1079 lock_expires_at: DateTime<Utc>,
1080 run_id: RunId,
1081 retry_config: ComponentRetryConfig,
1082 ) -> Result<LockPendingResponse, DbErrorWrite>;
1083
1084 #[cfg(feature = "test")]
1085 #[expect(clippy::too_many_arguments)]
1086 async fn lock_one(
1087 &self,
1088 created_at: DateTime<Utc>,
1089 component_id: ComponentId,
1090 deployment_id: DeploymentId,
1091 execution_id: &ExecutionId,
1092 run_id: RunId,
1093 version: Version,
1094 executor_id: ExecutorId,
1095 lock_expires_at: DateTime<Utc>,
1096 retry_config: ComponentRetryConfig,
1097 ) -> Result<LockedExecution, DbErrorWrite>;
1098
1099 async fn append(
1102 &self,
1103 execution_id: ExecutionId,
1104 version: Version,
1105 req: AppendRequest,
1106 ) -> Result<AppendResponse, DbErrorWrite>;
1107
1108 async fn append_batch_respond_to_parent(
1111 &self,
1112 events: AppendEventsToExecution,
1113 response: AppendResponseToExecution,
1114 current_time: DateTime<Utc>, ) -> Result<AppendBatchResponse, DbErrorWrite>;
1116
1117 async fn wait_for_pending_by_ffqn(
1122 &self,
1123 pending_at_or_sooner: DateTime<Utc>,
1124 ffqns: Arc<[FunctionFqn]>,
1125 timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
1126 );
1127
1128 async fn wait_for_pending_by_component_digest(
1133 &self,
1134 pending_at_or_sooner: DateTime<Utc>,
1135 component_digest: &ComponentDigest,
1136 timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
1137 );
1138
1139 async fn cancel_activity_with_retries(
1140 &self,
1141 execution_id: &ExecutionId,
1142 cancelled_at: DateTime<Utc>,
1143 ) -> Result<CancelOutcome, DbErrorWrite> {
1144 let mut retries = 5;
1145 loop {
1146 let res = self.cancel_activity(execution_id, cancelled_at).await;
1147 if res.is_ok() || retries == 0 {
1148 return res;
1149 }
1150 retries -= 1;
1151 }
1152 }
1153
1154 async fn get_last_execution_event(
1156 &self,
1157 execution_id: &ExecutionId,
1158 ) -> Result<ExecutionEvent, DbErrorRead>;
1159
1160 async fn cancel_activity(
1161 &self,
1162 execution_id: &ExecutionId,
1163 cancelled_at: DateTime<Utc>,
1164 ) -> Result<CancelOutcome, DbErrorWrite> {
1165 debug!("Determining cancellation state of {execution_id}");
1166
1167 let last_event = self
1168 .get_last_execution_event(execution_id)
1169 .await
1170 .map_err(DbErrorWrite::from)?;
1171 if let ExecutionRequest::Finished {
1172 retval:
1173 SupportedFunctionReturnValue::ExecutionFailure(FinishedExecutionFailure {
1174 kind: ExecutionFailureKind::Cancelled,
1175 ..
1176 }),
1177 ..
1178 } = last_event.event
1179 {
1180 return Ok(CancelOutcome::Cancelled);
1181 } else if matches!(last_event.event, ExecutionRequest::Finished { .. }) {
1182 debug!("Not cancelling, {execution_id} is already finished");
1183 return Ok(CancelOutcome::AlreadyFinished);
1184 }
1185 let finished_version = last_event.version.increment();
1186 let child_result =
1187 SupportedFunctionReturnValue::ExecutionFailure(FinishedExecutionFailure {
1188 reason: None,
1189 kind: ExecutionFailureKind::Cancelled,
1190 detail: None,
1191 });
1192 let cancel_request = AppendRequest {
1193 created_at: cancelled_at,
1194 event: ExecutionRequest::Finished {
1195 retval: child_result.clone(),
1196 http_client_traces: None,
1197 },
1198 };
1199 debug!("Cancelling activity {execution_id} at {finished_version}");
1200 if let ExecutionId::Derived(execution_id) = execution_id {
1201 let (parent_execution_id, join_set_id) = execution_id.split_to_parts();
1202 let child_execution_id = ExecutionId::Derived(execution_id.clone());
1203 self.append_batch_respond_to_parent(
1204 AppendEventsToExecution {
1205 execution_id: child_execution_id,
1206 version: finished_version.clone(),
1207 batch: vec![cancel_request],
1208 },
1209 AppendResponseToExecution {
1210 parent_execution_id,
1211 created_at: cancelled_at,
1212 join_set_id: join_set_id.clone(),
1213 child_execution_id: execution_id.clone(),
1214 finished_version,
1215 result: child_result,
1216 },
1217 cancelled_at,
1218 )
1219 .await?;
1220 } else {
1221 self.append(execution_id.clone(), finished_version, cancel_request)
1222 .await?;
1223 }
1224 debug!("Cancelled {execution_id}");
1225 Ok(CancelOutcome::Cancelled)
1226 }
1227}
1228
1229pub enum AppendDelayResponseOutcome {
1230 Success,
1231 AlreadyFinished,
1232 AlreadyCancelled,
1233}
1234
1235#[derive(Debug, Clone, Default)]
1236pub struct ListExecutionsFilter {
1237 pub function_name_filter: Option<FunctionNameFilter>,
1238 pub show_derived: bool,
1239 pub hide_finished: bool,
1240 pub execution_id_prefix: Option<String>,
1241 pub component_digest: Option<ComponentDigest>,
1242 pub deployment_id: Option<DeploymentId>,
1243}
1244
1245#[derive(Debug, Clone, PartialEq, Eq)]
1246pub enum FunctionNameFilter {
1247 PackageName(String),
1248 InterfaceName(String),
1249 FunctionName(String),
1250}
1251
1252impl FunctionNameFilter {
1253 #[must_use]
1254 pub fn like_pattern(&self) -> String {
1255 match self {
1256 Self::FunctionName(function_name) | Self::InterfaceName(function_name) => {
1257 format!("{function_name}%")
1258 }
1259 Self::PackageName(package_name) => {
1260 if let Some((pkg_fqn_without_version, version)) = package_name.rsplit_once('@')
1261 && !version.is_empty()
1262 && pkg_fqn_without_version.contains(':')
1263 {
1264 format!("{pkg_fqn_without_version}/%@{version}.%")
1265 } else {
1266 format!("{package_name}%")
1267 }
1268 }
1269 }
1270 }
1271}
1272
1273#[async_trait]
1274pub trait DbExternalApi: DbConnection {
1275 async fn get_backtrace(
1277 &self,
1278 execution_id: &ExecutionId,
1279 filter: BacktraceFilter,
1280 ) -> Result<BacktraceInfo, DbErrorRead>;
1281
1282 async fn upsert_source_file(
1286 &self,
1287 component_digest: &ComponentDigest,
1288 frame_key: &str,
1289 is_suffix: bool,
1290 content: &str,
1291 ) -> Result<(), DbErrorWrite>;
1292
1293 async fn get_source_file(
1297 &self,
1298 component_digest: &ComponentDigest,
1299 file: &str,
1300 ) -> Result<Option<String>, DbErrorRead>;
1301
1302 async fn upsert_component_metadata(
1304 &self,
1305 records: Vec<ComponentMetadataRecord>,
1306 ) -> Result<(), DbErrorWrite>;
1307
1308 async fn insert_deployment_components(
1310 &self,
1311 deployment_id: DeploymentId,
1312 records: Vec<DeploymentComponentRecord>,
1313 ) -> Result<(), DbErrorWrite>;
1314
1315 async fn list_deployment_components(
1317 &self,
1318 deployment_id: DeploymentId,
1319 ) -> Result<Vec<DeploymentComponentDetail>, DbErrorRead>;
1320
1321 async fn get_deployment_component_wit(
1323 &self,
1324 deployment_id: DeploymentId,
1325 component_digest: &ComponentDigest,
1326 ) -> Result<Option<String>, DbErrorRead>;
1327
1328 async fn list_executions(
1330 &self,
1331 filter: ListExecutionsFilter,
1332 pagination: ExecutionListPagination,
1333 ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric>;
1334
1335 async fn list_execution_events(
1340 &self,
1341 execution_id: &ExecutionId,
1342 pagination: Pagination<VersionType>,
1343 include_backtrace_id: bool,
1344 ) -> Result<ListExecutionEventsResponse, DbErrorRead>;
1345
1346 async fn list_responses(
1355 &self,
1356 execution_id: &ExecutionId,
1357 pagination: Pagination<u32>,
1358 ) -> Result<ListResponsesResponse, DbErrorRead>;
1359
1360 async fn list_execution_events_responses(
1361 &self,
1362 execution_id: &ExecutionId,
1363 req_since: &Version,
1364 req_max_length: VersionType,
1365 req_include_backtrace_id: bool,
1366 resp_pagination: Pagination<VersionType>,
1367 ) -> Result<ExecutionWithStateRequestsResponses, DbErrorRead>;
1368
1369 async fn upgrade_execution_component(
1370 &self,
1371 execution_id: &ExecutionId,
1372 old: &ComponentDigest,
1373 new: &ComponentDigest,
1374 ) -> Result<(), DbErrorWrite>;
1375
1376 async fn list_logs(
1377 &self,
1378 execution_id: &ExecutionId,
1379 show_derived: bool,
1380 filter: LogFilter,
1381 pagination: Pagination<DateTime<Utc>>,
1382 ) -> Result<ListLogsResponse, DbErrorRead>;
1383
1384 async fn list_deployment_states(
1385 &self,
1386 current_time: DateTime<Utc>,
1387 pagination: Pagination<Option<DeploymentId>>,
1388 include_config_json: bool,
1389 ) -> Result<Vec<DeploymentState>, DbErrorRead>;
1390
1391 async fn insert_deployment(&self, record: DeploymentRecord) -> Result<(), DbErrorWrite>;
1394
1395 async fn activate_deployment(
1396 &self,
1397 deployment_id: DeploymentId,
1398 now: DateTime<Utc>,
1399 ) -> Result<(), DbErrorWrite>;
1400
1401 async fn enqueue_deployment(&self, deployment_id: DeploymentId) -> Result<(), DbErrorWrite>;
1405
1406 async fn get_deployment(
1408 &self,
1409 deployment_id: DeploymentId,
1410 ) -> Result<Option<DeploymentRecord>, DbErrorRead>;
1411
1412 #[cfg(feature = "test")]
1415 async fn get_active_deployment(&self) -> Result<Option<DeploymentRecord>, DbErrorRead>;
1416
1417 async fn get_current_deployment(&self) -> Result<Option<DeploymentRecord>, DbErrorRead>;
1420
1421 async fn list_deployments(
1422 &self,
1423 pagination: Pagination<Option<DeploymentId>>,
1424 ) -> Result<Vec<DeploymentRecord>, DbErrorRead>;
1425
1426 async fn pause_execution(
1430 &self,
1431 execution_id: &ExecutionId,
1432 paused_at: DateTime<Utc>,
1433 ) -> Result<AppendResponse, DbErrorWrite>;
1434
1435 async fn unpause_execution(
1437 &self,
1438 execution_id: &ExecutionId,
1439 unpaused_at: DateTime<Utc>,
1440 ) -> Result<AppendResponse, DbErrorWrite>;
1441
1442 async fn pause_delay(&self, delay_id: &DelayId) -> Result<(), DbErrorWrite>;
1446
1447 async fn unpause_delay(&self, delay_id: &DelayId) -> Result<(), DbErrorWrite>;
1451}
1452pub const LIST_DEPLOYMENT_STATES_DEFAULT_LENGTH: u16 = 20;
1453pub const LIST_DEPLOYMENT_STATES_DEFAULT_PAGINATION: Pagination<Option<DeploymentId>> =
1454 Pagination::OlderThan {
1455 length: LIST_DEPLOYMENT_STATES_DEFAULT_LENGTH,
1456 cursor: None,
1457 including_cursor: false,
1458 };
1459
1460pub struct DeploymentState {
1461 pub deployment_id: DeploymentId,
1462 pub locked: u32,
1463 pub pending: u32,
1465 pub scheduled: u32,
1467 pub blocked: u32,
1468 pub finished: u32,
1469 pub config_json: Option<String>,
1471 pub created_at: DateTime<Utc>,
1472 pub last_active_at: Option<DateTime<Utc>>,
1474 pub status: DeploymentStatus,
1475}
1476
1477#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1478pub enum DeploymentStatus {
1479 Inactive,
1480 Enqueued,
1482 Active,
1483}
1484
1485impl DeploymentStatus {
1486 #[must_use]
1487 pub fn as_str(&self) -> &'static str {
1488 match self {
1489 DeploymentStatus::Inactive => "inactive",
1490 DeploymentStatus::Enqueued => "enqueued",
1491 DeploymentStatus::Active => "active",
1492 }
1493 }
1494}
1495
1496impl std::str::FromStr for DeploymentStatus {
1497 type Err = StrVariant;
1498 fn from_str(s: &str) -> Result<Self, Self::Err> {
1499 match s {
1500 "inactive" => Ok(DeploymentStatus::Inactive),
1501 "enqueued" => Ok(DeploymentStatus::Enqueued),
1502 "active" => Ok(DeploymentStatus::Active),
1503 _ => Err(StrVariant::from(format!("unknown deployment status: {s}"))),
1504 }
1505 }
1506}
1507
1508#[derive(Debug, Clone)]
1509pub struct DeploymentRecord {
1510 pub deployment_id: DeploymentId,
1511 pub created_at: DateTime<Utc>,
1512 pub last_active_at: Option<DateTime<Utc>>,
1514 pub status: DeploymentStatus,
1515 pub config_json: String, pub obelisk_version: String,
1517 pub created_by: Option<String>,
1518}
1519
1520#[derive(Debug, Clone)]
1521pub struct ComponentMetadataRecord {
1522 pub component_digest: ComponentDigest,
1523 pub imports: Vec<PersistedFunctionMetadata>,
1524 pub exports: Vec<PersistedFunctionMetadata>,
1525 pub wit: String,
1526 pub wit_origin: String,
1527}
1528
1529#[derive(Debug, Clone)]
1531pub struct DeploymentComponentRecord {
1532 pub deployment_id: DeploymentId,
1533 pub component_name: StrVariant,
1534 pub component_digest: ComponentDigest,
1535 pub component_type: ComponentType,
1536}
1537
1538#[derive(Debug, Clone)]
1539pub struct DeploymentComponentDetail {
1540 pub component_id: ComponentId,
1541 pub imports: Vec<PersistedFunctionMetadata>,
1542 pub exports: Vec<PersistedFunctionMetadata>,
1543 pub wit: String,
1544}
1545
1546#[derive(
1547 Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, schemars::JsonSchema,
1548)]
1549pub struct PersistedFunctionMetadata {
1550 pub ffqn: FunctionFqn,
1551 pub parameter_types: Vec<PersistedParameterType>,
1552 pub return_type: String,
1553 pub extension: Option<FunctionExtension>,
1554 pub submittable: bool,
1555}
1556
1557#[derive(
1558 Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, schemars::JsonSchema,
1559)]
1560pub struct PersistedParameterType {
1561 pub name: String,
1562 pub wit_type: String,
1563}
1564
1565impl From<FunctionMetadata> for PersistedFunctionMetadata {
1566 fn from(value: FunctionMetadata) -> Self {
1567 PersistedFunctionMetadata {
1568 ffqn: value.ffqn,
1569 parameter_types: value
1570 .parameter_types
1571 .0
1572 .into_iter()
1573 .map(|param| PersistedParameterType {
1574 name: param.name.to_string(),
1575 wit_type: param.wit_type.to_string(),
1576 })
1577 .collect(),
1578 return_type: value.return_type.wit_type().to_string(),
1579 extension: value.extension,
1580 submittable: value.submittable,
1581 }
1582 }
1583}
1584
1585#[derive(Debug)]
1586pub struct ListLogsResponse {
1587 pub items: Vec<LogEntryRow>,
1588 pub next_page: Pagination<DateTime<Utc>>, pub prev_page: Option<Pagination<DateTime<Utc>>>, }
1591
1592#[derive(Debug)]
1593pub struct LogFilter {
1594 show_logs: bool,
1595 show_streams: bool,
1596 levels: Vec<LogLevel>, stream_types: Vec<LogStreamType>, }
1599impl LogFilter {
1600 #[must_use]
1602 pub fn show_logs(levels: Vec<LogLevel>) -> LogFilter {
1603 LogFilter {
1604 show_logs: true,
1605 show_streams: false,
1606 levels,
1607 stream_types: Vec::new(),
1608 }
1609 }
1610 #[must_use]
1612 pub fn show_streams(stream_types: Vec<LogStreamType>) -> LogFilter {
1613 LogFilter {
1614 show_logs: false,
1615 show_streams: true,
1616 levels: Vec::new(),
1617 stream_types,
1618 }
1619 }
1620 #[must_use]
1622 pub fn show_combined(levels: Vec<LogLevel>, stream_types: Vec<LogStreamType>) -> LogFilter {
1623 LogFilter {
1624 show_logs: true,
1625 show_streams: true,
1626 levels,
1627 stream_types,
1628 }
1629 }
1630 #[must_use]
1632 pub fn should_show_logs(&self) -> bool {
1633 self.show_logs
1634 }
1635 #[must_use]
1636 pub fn should_show_streams(&self) -> bool {
1637 self.show_streams
1638 }
1639 #[must_use]
1640 pub fn levels(&self) -> &Vec<LogLevel> {
1641 &self.levels
1642 }
1643 #[must_use]
1644 pub fn stream_types(&self) -> &Vec<LogStreamType> {
1645 &self.stream_types
1646 }
1647}
1648
1649#[derive(Debug, Clone)]
1650pub struct ExecutionWithStateRequestsResponses {
1651 pub execution_with_state: ExecutionWithState,
1652 pub events: Vec<ExecutionEvent>,
1653 pub responses: Vec<ResponseWithCursor>,
1654 pub max_version: Version,
1655 pub max_cursor: ResponseCursor,
1656}
1657
1658#[async_trait]
1659pub trait DbConnection: DbExecutor {
1660 async fn get(&self, execution_id: &ExecutionId) -> Result<ExecutionLog, DbErrorRead>;
1662
1663 async fn append_delay_response(
1664 &self,
1665 created_at: DateTime<Utc>,
1666 execution_id: ExecutionId,
1667 join_set_id: JoinSetId,
1668 delay_id: DelayId,
1669 outcome: Result<(), ()>, ) -> Result<AppendDelayResponseOutcome, DbErrorWrite>;
1671
1672 async fn append_batch(
1675 &self,
1676 current_time: DateTime<Utc>, batch: Vec<AppendRequest>,
1678 execution_id: ExecutionId,
1679 version: Version,
1680 ) -> Result<AppendBatchResponse, DbErrorWrite>;
1681
1682 async fn append_batch_create_new_execution(
1685 &self,
1686 current_time: DateTime<Utc>, batch: Vec<AppendRequest>, execution_id: ExecutionId,
1689 version: Version,
1690 child_req: Vec<CreateRequest>,
1691 backtraces: Vec<BacktraceInfo>,
1692 ) -> Result<AppendBatchResponse, DbErrorWrite>;
1693
1694 async fn get_execution_event(
1696 &self,
1697 execution_id: &ExecutionId,
1698 version: &Version,
1699 ) -> Result<ExecutionEvent, DbErrorRead>;
1700
1701 async fn upsert_stub_response(
1705 &self,
1706 execution_id: ExecutionIdDerived,
1707 version: Version,
1708 req: AppendRequest,
1709 response: AppendResponseToExecution,
1710 current_time: DateTime<Utc>,
1711 ) -> Result<(), DbErrorStubResponse>;
1712
1713 #[instrument(skip(self))]
1714 async fn get_create_request(
1715 &self,
1716 execution_id: &ExecutionId,
1717 ) -> Result<CreateRequest, DbErrorRead> {
1718 let execution_event = self
1719 .get_execution_event(execution_id, &Version::new(0))
1720 .await?;
1721 if let ExecutionRequest::Created {
1722 ffqn,
1723 params,
1724 parent,
1725 scheduled_at,
1726 component_id,
1727 deployment_id,
1728 metadata,
1729 scheduled_by,
1730 } = execution_event.event
1731 {
1732 Ok(CreateRequest {
1733 created_at: execution_event.created_at,
1734 execution_id: execution_id.clone(),
1735 ffqn,
1736 params,
1737 parent,
1738 scheduled_at,
1739 component_id,
1740 deployment_id,
1741 metadata,
1742 scheduled_by,
1743 paused: false,
1744 })
1745 } else {
1746 Err(DbErrorRead::Generic(DbErrorGeneric::Uncategorized {
1747 reason: "execution log must start with creation".into(),
1748 context: SpanTrace::capture(),
1749 source: None,
1750 loc: Location::caller(),
1751 }))
1752 }
1753 }
1754
1755 async fn get_pending_state(
1756 &self,
1757 execution_id: &ExecutionId,
1758 ) -> Result<ExecutionWithState, DbErrorRead>;
1759
1760 async fn get_expired_timers(
1762 &self,
1763 at: DateTime<Utc>,
1764 ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric>;
1765
1766 async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite>;
1768
1769 async fn subscribe_to_next_responses(
1776 &self,
1777 execution_id: &ExecutionId,
1778 last_response: ResponseCursor,
1779 timeout_fut: Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>,
1780 ) -> Result<Vec<ResponseWithCursor>, DbErrorReadWithTimeout>;
1781
1782 async fn wait_for_finished_result(
1789 &self,
1790 execution_id: &ExecutionId,
1791 timeout_fut: Option<Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>>,
1792 ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout>;
1793
1794 async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite>;
1795
1796 async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite>;
1797
1798 async fn append_log(&self, row: LogInfoAppendRow) -> Result<(), DbErrorWrite>;
1799
1800 async fn append_log_batch(&self, batch: &[LogInfoAppendRow]) -> Result<(), DbErrorWrite>;
1801
1802 #[cfg(feature = "test")]
1804 async fn get_finished_result(
1805 &self,
1806 execution_id: &ExecutionId,
1807 ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout> {
1808 self.wait_for_finished_result(
1809 execution_id,
1810 Some(Box::pin(std::future::ready(TimeoutOutcome::Timeout))),
1811 )
1812 .await
1813 }
1814}
1815
1816#[derive(Clone, Debug)]
1817pub struct LogInfoAppendRow {
1818 pub execution_id: ExecutionId,
1819 pub run_id: RunId,
1820 pub log_entry: LogEntry,
1821}
1822
1823#[derive(Debug, Clone)]
1824pub struct LogEntryRow {
1825 pub cursor: DateTime<Utc>,
1826 pub run_id: RunId,
1827 pub log_entry: LogEntry,
1828 pub execution_id: ExecutionId,
1829}
1830
1831#[derive(Debug, Clone)]
1832pub enum LogEntry {
1833 Log {
1834 created_at: DateTime<Utc>,
1835 level: LogLevel,
1836 message: String,
1837 },
1838 Stream {
1839 created_at: DateTime<Utc>,
1840 payload: Vec<u8>,
1841 stream_type: LogStreamType,
1842 },
1843}
1844impl LogEntry {
1845 #[must_use]
1846 pub fn created_at(&self) -> DateTime<Utc> {
1847 match self {
1848 LogEntry::Log { created_at, .. } | LogEntry::Stream { created_at, .. } => *created_at,
1849 }
1850 }
1851}
1852
1853#[derive(
1854 Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, derive_more::TryFrom, strum::EnumIter,
1855)]
1856#[try_from(repr)]
1857#[repr(u8)]
1858pub enum LogLevel {
1859 Trace = 1,
1860 Debug,
1861 Info,
1862 Warn,
1863 Error,
1864}
1865#[derive(Debug, Clone, Copy, PartialEq, Eq, derive_more::TryFrom, strum::EnumIter)]
1866#[try_from(repr)]
1867#[repr(u8)]
1868pub enum LogStreamType {
1869 StdOut = 1,
1870 StdErr,
1871}
1872
1873#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1874pub enum TimeoutOutcome {
1875 Timeout,
1876 Cancel,
1877}
1878
1879#[cfg(feature = "test")]
1880#[async_trait]
1881pub trait DbConnectionTest: DbConnection {
1882 async fn append_response(
1883 &self,
1884 created_at: DateTime<Utc>,
1885 execution_id: ExecutionId,
1886 response_event: JoinSetResponseEvent,
1887 ) -> Result<(), DbErrorWrite>;
1888}
1889
1890#[derive(Clone, Copy, Debug, PartialEq, Eq)]
1891pub enum CancelOutcome {
1892 Cancelled,
1893 AlreadyFinished,
1894}
1895
1896#[instrument(skip(db_connection))]
1897pub async fn stub_execution(
1898 db_connection: &dyn DbConnection,
1899 execution_id: ExecutionIdDerived,
1900 parent_execution_id: ExecutionId,
1901 join_set_id: JoinSetId,
1902 created_at: DateTime<Utc>,
1903 return_value: SupportedFunctionReturnValue,
1904) -> Result<(), DbErrorWrite> {
1905 let stub_finished_version = Version::new(1); let finished_req = AppendRequest {
1907 created_at,
1908 event: ExecutionRequest::Finished {
1909 retval: return_value.clone(),
1910 http_client_traces: None,
1911 },
1912 };
1913 db_connection
1914 .upsert_stub_response(
1915 execution_id.clone(),
1916 stub_finished_version.clone(),
1917 finished_req,
1918 AppendResponseToExecution {
1919 parent_execution_id,
1920 created_at,
1921 join_set_id,
1922 child_execution_id: execution_id,
1923 finished_version: stub_finished_version,
1924 result: return_value,
1925 },
1926 created_at,
1927 )
1928 .await
1929 .map_err(|err| match err {
1930 DbErrorStubResponse::StubConflict => {
1931 DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::Conflict)
1932 }
1933 DbErrorStubResponse::Write(db_err) => db_err,
1934 })
1935}
1936
1937pub async fn cancel_delay(
1938 db_connection: &dyn DbConnection,
1939 delay_id: DelayId,
1940 cancelled_at: DateTime<Utc>,
1941) -> Result<CancelOutcome, DbErrorWrite> {
1942 let (parent_execution_id, join_set_id) = delay_id.split_to_parts();
1943 db_connection
1944 .append_delay_response(
1945 cancelled_at,
1946 parent_execution_id,
1947 join_set_id,
1948 delay_id,
1949 Err(()), )
1951 .await
1952 .map(|ok| match ok {
1953 AppendDelayResponseOutcome::Success | AppendDelayResponseOutcome::AlreadyCancelled => {
1954 CancelOutcome::Cancelled
1955 }
1956 AppendDelayResponseOutcome::AlreadyFinished => CancelOutcome::AlreadyFinished,
1957 })
1958}
1959
1960#[derive(Clone, Debug)]
1961pub enum BacktraceFilter {
1962 First,
1963 Last,
1964 Specific(Version),
1965}
1966
1967#[derive(Clone, Debug, PartialEq, Eq)]
1968#[cfg_attr(feature = "test", derive(Serialize))]
1969pub struct BacktraceInfo {
1970 pub execution_id: ExecutionId,
1971 pub component_id: ComponentId,
1972 pub version_min_including: Version,
1973 pub version_max_excluding: Version,
1974 pub wasm_backtrace: WasmBacktrace,
1975}
1976
1977#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, schemars::JsonSchema)]
1978pub struct WasmBacktrace {
1979 pub frames: Vec<FrameInfo>,
1980}
1981
1982#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, schemars::JsonSchema)]
1983pub struct FrameInfo {
1984 pub module: String,
1985 pub func_name: String,
1986 pub symbols: Vec<FrameSymbol>,
1987}
1988
1989#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, schemars::JsonSchema)]
1990pub struct FrameSymbol {
1991 pub func_name: Option<String>,
1992 pub file: Option<String>,
1993 pub line: Option<u32>,
1994 pub col: Option<u32>,
1995}
1996
1997mod wasm_backtrace {
1998 use super::{FrameInfo, FrameSymbol, WasmBacktrace};
1999
2000 impl WasmBacktrace {
2001 pub fn maybe_from(backtrace: &wasmtime::WasmBacktrace) -> Option<Self> {
2002 if backtrace.frames().is_empty() {
2003 None
2004 } else {
2005 Some(Self {
2006 frames: backtrace.frames().iter().map(FrameInfo::from).collect(),
2007 })
2008 }
2009 }
2010 }
2011
2012 impl From<&wasmtime::FrameInfo> for FrameInfo {
2013 fn from(frame: &wasmtime::FrameInfo) -> Self {
2014 let module_name = frame.module().name().unwrap_or("<unknown>").to_string();
2015 let mut func_name = String::new();
2016 wasmtime_environ::demangle_function_name_or_index(
2017 &mut func_name,
2018 frame.func_name(),
2019 frame.func_index() as usize,
2020 )
2021 .expect("writing to string must succeed");
2022 Self {
2023 module: module_name,
2024 func_name,
2025 symbols: frame
2026 .symbols()
2027 .iter()
2028 .map(std::convert::Into::into)
2029 .collect(),
2030 }
2031 }
2032 }
2033
2034 impl From<&wasmtime::FrameSymbol> for FrameSymbol {
2035 fn from(symbol: &wasmtime::FrameSymbol) -> Self {
2036 let func_name = symbol.name().map(|name| {
2037 let mut writer = String::new();
2038 wasmtime_environ::demangle_function_name(&mut writer, name)
2039 .expect("writing to string must succeed");
2040 writer
2041 });
2042
2043 Self {
2044 func_name,
2045 file: symbol.file().map(ToString::to_string),
2046 line: symbol.line(),
2047 col: symbol.column(),
2048 }
2049 }
2050 }
2051}
2052#[derive(Debug, Clone, derive_more::Display)]
2053#[display("{execution_id} {pending_state} {component_digest}")]
2054pub struct ExecutionWithState {
2055 pub execution_id: ExecutionId,
2056 pub ffqn: FunctionFqn,
2057 pub pending_state: PendingState,
2058 pub created_at: DateTime<Utc>,
2059 pub first_scheduled_at: DateTime<Utc>,
2060 pub component_digest: ComponentDigest,
2061 pub component_type: ComponentType,
2062 pub deployment_id: DeploymentId,
2063}
2064
2065#[derive(Debug, Clone)]
2066pub enum ExecutionListPagination {
2067 CreatedBy(Pagination<Option<DateTime<Utc>>>),
2068 ExecutionId(Pagination<Option<ExecutionId>>),
2069}
2070impl Default for ExecutionListPagination {
2071 fn default() -> ExecutionListPagination {
2072 ExecutionListPagination::CreatedBy(Pagination::OlderThan {
2073 length: 20,
2074 cursor: None,
2075 including_cursor: false, })
2077 }
2078}
2079impl ExecutionListPagination {
2080 #[must_use]
2081 pub fn length(&self) -> u16 {
2082 match self {
2083 ExecutionListPagination::CreatedBy(pagination) => pagination.length(),
2084 ExecutionListPagination::ExecutionId(pagination) => pagination.length(),
2085 }
2086 }
2087}
2088
2089#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2090pub enum Pagination<T> {
2091 NewerThan {
2092 length: u16,
2093 cursor: T,
2094 including_cursor: bool,
2095 },
2096 OlderThan {
2097 length: u16,
2098 cursor: T,
2099 including_cursor: bool,
2100 },
2101}
2102impl<T: Clone> Pagination<T> {
2103 pub fn length(&self) -> u16 {
2104 match self {
2105 Pagination::NewerThan { length, .. } | Pagination::OlderThan { length, .. } => *length,
2106 }
2107 }
2108
2109 pub fn rel(&self) -> &'static str {
2110 match self {
2111 Pagination::NewerThan {
2112 including_cursor: false,
2113 ..
2114 } => ">",
2115 Pagination::NewerThan {
2116 including_cursor: true,
2117 ..
2118 } => ">=",
2119 Pagination::OlderThan {
2120 including_cursor: false,
2121 ..
2122 } => "<",
2123 Pagination::OlderThan {
2124 including_cursor: true,
2125 ..
2126 } => "<=",
2127 }
2128 }
2129
2130 pub fn is_desc(&self) -> bool {
2131 matches!(self, Pagination::OlderThan { .. })
2132 }
2133
2134 pub fn asc_or_desc(&self) -> &'static str {
2135 if self.is_asc() { "asc" } else { "desc" }
2136 }
2137
2138 pub fn is_asc(&self) -> bool {
2139 !self.is_desc()
2140 }
2141
2142 pub fn cursor(&self) -> &T {
2143 match self {
2144 Pagination::NewerThan { cursor, .. } | Pagination::OlderThan { cursor, .. } => cursor,
2145 }
2146 }
2147
2148 #[must_use]
2149 pub fn invert(&self) -> Self {
2150 match self {
2151 Pagination::NewerThan {
2152 length,
2153 cursor,
2154 including_cursor,
2155 } => Pagination::OlderThan {
2156 length: *length,
2157 cursor: cursor.clone(),
2158 including_cursor: !including_cursor,
2159 },
2160 Pagination::OlderThan {
2161 length,
2162 cursor,
2163 including_cursor,
2164 } => Pagination::NewerThan {
2165 length: *length,
2166 cursor: cursor.clone(),
2167 including_cursor: !including_cursor,
2168 },
2169 }
2170 }
2171}
2172
2173#[cfg(feature = "test")]
2174pub async fn wait_for_pending_state_fn<T: Debug>(
2175 db_connection: &dyn DbConnectionTest,
2176 execution_id: &ExecutionId,
2177 predicate: impl Fn(ExecutionLog) -> Option<T> + Send,
2178 timeout: Option<Duration>,
2179) -> Result<T, DbErrorReadWithTimeout> {
2180 tracing::trace!(%execution_id, "Waiting for predicate");
2181 let fut = async move {
2182 loop {
2183 let execution_log = db_connection.get(execution_id).await?;
2184 if let Some(t) = predicate(execution_log) {
2185 tracing::debug!(%execution_id, "Found: {t:?}");
2186 return Ok(t);
2187 }
2188 tokio::time::sleep(Duration::from_millis(10)).await;
2189 }
2190 };
2191
2192 if let Some(timeout) = timeout {
2193 tokio::select! { res = fut => res,
2195 () = tokio::time::sleep(timeout) => Err(DbErrorReadWithTimeout::Timeout(TimeoutOutcome::Timeout))
2196 }
2197 } else {
2198 fut.await
2199 }
2200}
2201
2202#[derive(Debug, Clone, PartialEq, Eq)]
2203pub enum ExpiredTimer {
2204 Lock(ExpiredLock),
2205 Delay(ExpiredDelay),
2206}
2207
2208#[derive(Debug, Clone, PartialEq, Eq)]
2209pub struct ExpiredLock {
2210 pub execution_id: ExecutionId,
2211 pub locked_at_version: Version,
2213 pub next_version: Version,
2214 pub intermittent_event_count: u32,
2216 pub max_retries: Option<u32>,
2217 pub retry_exp_backoff: Duration,
2218 pub locked_by: LockedBy,
2219}
2220
2221#[derive(Debug, Clone, PartialEq, Eq)]
2222pub struct ExpiredDelay {
2223 pub execution_id: ExecutionId,
2224 pub join_set_id: JoinSetId,
2225 pub delay_id: DelayId,
2226}
2227
2228#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, schemars::JsonSchema)]
2229#[serde(tag = "status", rename_all = "snake_case")]
2230pub enum PendingState {
2231 Locked(PendingStateLocked),
2233
2234 #[display("PendingAt(`{_0}`)")]
2235 PendingAt(PendingStatePendingAt),
2236
2237 #[display("BlockedByJoinSet({_0})")]
2239 BlockedByJoinSet(PendingStateBlockedByJoinSet),
2240
2241 #[display("Paused({_0})")]
2242 Paused(PendingStatePaused),
2243
2244 #[display("Finished: {_0}")]
2245 Finished(PendingStateFinished),
2246}
2247
2248pub enum PendingStateMergedPause {
2249 Locked {
2250 state: PendingStateLocked,
2251 paused: bool,
2252 },
2253 PendingAt {
2254 state: PendingStatePendingAt,
2255 paused: bool,
2256 },
2257 BlockedByJoinSet {
2258 state: PendingStateBlockedByJoinSet,
2259 paused: bool,
2260 },
2261 Finished(PendingStateFinished),
2262}
2263impl From<PendingState> for PendingStateMergedPause {
2264 fn from(state: PendingState) -> Self {
2265 match state {
2266 PendingState::Locked(s) => PendingStateMergedPause::Locked {
2267 state: s,
2268 paused: false,
2269 },
2270
2271 PendingState::PendingAt(s) => PendingStateMergedPause::PendingAt {
2272 state: s,
2273 paused: false,
2274 },
2275
2276 PendingState::BlockedByJoinSet(s) => PendingStateMergedPause::BlockedByJoinSet {
2277 state: s,
2278 paused: false,
2279 },
2280
2281 PendingState::Paused(paused) => match paused {
2282 PendingStatePaused::PendingAt(s) => PendingStateMergedPause::PendingAt {
2283 state: s,
2284 paused: true,
2285 },
2286 PendingStatePaused::BlockedByJoinSet(s) => {
2287 PendingStateMergedPause::BlockedByJoinSet {
2288 state: s,
2289 paused: true,
2290 }
2291 }
2292 },
2293
2294 PendingState::Finished(s) => PendingStateMergedPause::Finished(s),
2295 }
2296 }
2297}
2298
2299#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, schemars::JsonSchema)]
2300#[display("Locked(`{lock_expires_at}`, {}, {})", locked_by.executor_id, locked_by.run_id)]
2301pub struct PendingStateLocked {
2302 pub locked_by: LockedBy,
2303 pub lock_expires_at: DateTime<Utc>,
2304}
2305
2306#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, schemars::JsonSchema)]
2307#[display("`{scheduled_at}`, last_lock={last_lock:?}")]
2308pub struct PendingStatePendingAt {
2309 pub scheduled_at: DateTime<Utc>,
2310 pub last_lock: Option<LockedBy>,
2312}
2313
2314#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, schemars::JsonSchema)]
2315#[display("{join_set_id}, `{lock_expires_at}`, closing={closing}")]
2316pub struct PendingStateBlockedByJoinSet {
2317 pub join_set_id: JoinSetId,
2318 pub lock_expires_at: DateTime<Utc>,
2320 pub closing: bool,
2322}
2323
2324#[derive(Debug, Clone, derive_more::Display, PartialEq, Eq, Serialize, schemars::JsonSchema)]
2328pub enum PendingStatePaused {
2329 #[display("PendingAt({_0})")]
2330 PendingAt(PendingStatePendingAt),
2331 #[display("BlockedByJoinSet({_0})")]
2332 BlockedByJoinSet(PendingStateBlockedByJoinSet),
2333}
2334
2335#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
2336pub struct LockedBy {
2337 pub executor_id: ExecutorId,
2338 pub run_id: RunId,
2339}
2340impl From<&Locked> for LockedBy {
2341 fn from(value: &Locked) -> Self {
2342 LockedBy {
2343 executor_id: value.executor_id,
2344 run_id: value.run_id,
2345 }
2346 }
2347}
2348
2349#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, schemars::JsonSchema)]
2350#[cfg_attr(any(test, feature = "test"), derive(Deserialize))]
2351pub struct PendingStateFinished {
2352 pub version: VersionType, pub finished_at: DateTime<Utc>,
2354 pub result_kind: PendingStateFinishedResultKind,
2355}
2356impl Display for PendingStateFinished {
2357 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2358 match self.result_kind {
2359 PendingStateFinishedResultKind::Ok => write!(f, "OK"),
2360 PendingStateFinishedResultKind::Err(err) => write!(f, "{err}"),
2361 }
2362 }
2363}
2364
2365#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
2367#[serde(rename_all = "snake_case")]
2368pub enum PendingStateFinishedResultKind {
2369 Ok,
2370 Err(PendingStateFinishedError),
2371}
2372impl PendingStateFinishedResultKind {
2373 pub fn as_result(&self) -> Result<(), &PendingStateFinishedError> {
2374 match self {
2375 PendingStateFinishedResultKind::Ok => Ok(()),
2376 PendingStateFinishedResultKind::Err(err) => Err(err),
2377 }
2378 }
2379}
2380
2381impl From<&SupportedFunctionReturnValue> for PendingStateFinishedResultKind {
2382 fn from(result: &SupportedFunctionReturnValue) -> Self {
2383 result.as_pending_state_finished_result()
2384 }
2385}
2386
2387#[derive(
2388 Debug,
2389 Clone,
2390 Copy,
2391 PartialEq,
2392 Eq,
2393 Serialize,
2394 Deserialize,
2395 derive_more::Display,
2396 schemars::JsonSchema,
2397)]
2398#[serde(rename_all = "snake_case")]
2399pub enum PendingStateFinishedError {
2400 #[display("Execution failure ({_0})")]
2401 ExecutionFailure(ExecutionFailureKind),
2402 #[display("Error")]
2403 Error,
2404}
2405
2406impl PendingState {
2407 #[instrument(skip(self))]
2408 pub fn can_append_lock(
2409 &self,
2410 created_at: DateTime<Utc>,
2411 executor_id: ExecutorId,
2412 run_id: RunId,
2413 lock_expires_at: DateTime<Utc>,
2414 ) -> Result<LockKind, DbErrorWriteNonRetriable> {
2415 if lock_expires_at <= created_at {
2416 return Err(DbErrorWriteNonRetriable::ValidationFailed(
2417 "invalid expiry date".into(),
2418 ));
2419 }
2420 match self {
2421 PendingState::PendingAt(PendingStatePendingAt {
2422 scheduled_at,
2423 last_lock,
2424 }) => {
2425 if *scheduled_at <= created_at {
2426 Ok(LockKind::CreatingNewLock)
2428 } else if let Some(LockedBy {
2429 executor_id: last_executor_id,
2430 run_id: last_run_id,
2431 }) = last_lock
2432 && executor_id == *last_executor_id
2433 && run_id == *last_run_id
2434 {
2435 Ok(LockKind::Extending)
2437 } else {
2438 Err(DbErrorWriteNonRetriable::ValidationFailed(
2439 "cannot lock, not yet pending".into(),
2440 ))
2441 }
2442 }
2443 PendingState::Locked(PendingStateLocked {
2444 locked_by:
2445 LockedBy {
2446 executor_id: current_pending_state_executor_id,
2447 run_id: current_pending_state_run_id,
2448 },
2449 lock_expires_at: _,
2450 }) => {
2451 if executor_id == *current_pending_state_executor_id
2452 && run_id == *current_pending_state_run_id
2453 {
2454 Ok(LockKind::Extending)
2456 } else {
2457 Err(DbErrorWriteNonRetriable::IllegalState {
2458 reason: "cannot lock, already locked".into(),
2459 context: SpanTrace::capture(),
2460 source: None,
2461 loc: Location::caller(),
2462 })
2463 }
2464 }
2465 PendingState::BlockedByJoinSet { .. } => Err(DbErrorWriteNonRetriable::IllegalState {
2466 reason: "cannot append Locked event when in BlockedByJoinSet state".into(),
2467 context: SpanTrace::capture(),
2468 source: None,
2469 loc: Location::caller(),
2470 }),
2471 PendingState::Finished { .. } => Err(DbErrorWriteNonRetriable::IllegalState {
2472 reason: "already finished".into(),
2473 context: SpanTrace::capture(),
2474 source: None,
2475 loc: Location::caller(),
2476 }),
2477 PendingState::Paused(..) => Err(DbErrorWriteNonRetriable::IllegalState {
2478 reason: "cannot lock, execution is paused".into(),
2479 context: SpanTrace::capture(),
2480 source: None,
2481 loc: Location::caller(),
2482 }),
2483 }
2484 }
2485
2486 #[must_use]
2487 pub fn is_finished(&self) -> bool {
2488 matches!(self, PendingState::Finished { .. })
2489 }
2490
2491 #[must_use]
2492 pub fn is_paused(&self) -> bool {
2493 matches!(self, PendingState::Paused(_))
2494 }
2495}
2496
2497#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2498pub enum LockKind {
2499 Extending,
2500 CreatingNewLock,
2501}
2502
2503pub mod http_client_trace {
2504 use chrono::{DateTime, Utc};
2505 use serde::{Deserialize, Serialize};
2506
2507 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
2508 pub struct HttpClientTrace {
2509 pub req: RequestTrace,
2510 pub resp: Option<ResponseTrace>,
2511 }
2512
2513 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
2514 pub struct RequestTrace {
2515 pub sent_at: DateTime<Utc>,
2516 pub uri: String,
2517 pub method: String,
2518 }
2519
2520 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
2521 pub struct ResponseTrace {
2522 pub finished_at: DateTime<Utc>,
2523 pub status: Result<u16, String>,
2524 }
2525}
2526
2527#[derive(schemars::JsonSchema)]
2529pub struct DbStorageSchema {
2530 pub execution_event: ExecutionEvent,
2531 pub pending_state: PendingState,
2532 pub join_set_response: JoinSetResponse,
2533 pub wasm_backtrace: WasmBacktrace,
2534 pub persisted_function_metadata: PersistedFunctionMetadata,
2535}
2536
2537#[cfg(test)]
2538mod tests {
2539 use super::HistoryEvent;
2540 use super::HistoryEventScheduleAt;
2541 use super::JoinNextTryOutcome;
2542 use super::PendingStateFinished;
2543 use super::PendingStateFinishedError;
2544 use super::PendingStateFinishedResultKind;
2545 use crate::ExecutionFailureKind;
2546 use crate::JoinSetId;
2547 use crate::SupportedFunctionReturnValue;
2548 use chrono::DateTime;
2549 use chrono::Datelike;
2550 use chrono::Utc;
2551 use insta::assert_snapshot;
2552 use rstest::rstest;
2553 use std::time::Duration;
2554 use val_json::type_wrapper::TypeWrapper;
2555 use val_json::wast_val::WastVal;
2556 use val_json::wast_val::WastValWithType;
2557
2558 #[rstest(expected => [
2559 PendingStateFinishedResultKind::Ok,
2560 PendingStateFinishedResultKind::Err(PendingStateFinishedError::ExecutionFailure(ExecutionFailureKind::TimedOut)),
2561 ])]
2562 #[test]
2563 fn serde_pending_state_finished_result_kind_should_work(
2564 expected: PendingStateFinishedResultKind,
2565 ) {
2566 let ser = serde_json::to_string(&expected).unwrap();
2567 let actual: PendingStateFinishedResultKind = serde_json::from_str(&ser).unwrap();
2568 assert_eq!(expected, actual);
2569 }
2570
2571 #[rstest(result_kind => [
2572 PendingStateFinishedResultKind::Ok,
2573 PendingStateFinishedResultKind::Err(PendingStateFinishedError::ExecutionFailure(ExecutionFailureKind::TimedOut)),
2574 ])]
2575 #[test]
2576 fn serde_pending_state_finished_should_work(result_kind: PendingStateFinishedResultKind) {
2577 let expected = PendingStateFinished {
2578 version: 0,
2579 finished_at: Utc::now(),
2580 result_kind,
2581 };
2582
2583 let ser = serde_json::to_string(&expected).unwrap();
2584 let actual: PendingStateFinished = serde_json::from_str(&ser).unwrap();
2585 assert_eq!(expected, actual);
2586 }
2587
2588 #[test]
2589 fn join_set_deser_with_result_ok_option_none_should_work() {
2590 let expected = SupportedFunctionReturnValue::Ok(Some(WastValWithType {
2591 r#type: TypeWrapper::Result {
2592 ok: Some(Box::new(TypeWrapper::Option(Box::new(TypeWrapper::String)))),
2593 err: Some(Box::new(TypeWrapper::String)),
2594 },
2595 value: WastVal::Result(Ok(Some(Box::new(WastVal::Option(None))))),
2596 }));
2597 let json = serde_json::to_string(&expected).unwrap();
2598 assert_snapshot!(json);
2599
2600 let actual: SupportedFunctionReturnValue = serde_json::from_str(&json).unwrap();
2601
2602 assert_eq!(expected, actual);
2603 }
2604
2605 #[test]
2606 fn as_date_time_should_work_with_duration_u32_max_secs() {
2607 let duration = Duration::from_secs(u64::from(u32::MAX));
2608 let schedule_at = HistoryEventScheduleAt::In(duration);
2609 let resolved = schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap();
2610 assert_eq!(2106, resolved.year());
2611 }
2612
2613 const MILLIS_PER_SEC: i64 = 1000;
2614 const TIMEDELTA_MAX_SECS: i64 = i64::MAX / MILLIS_PER_SEC;
2615
2616 #[test]
2617 fn as_date_time_should_fail_on_duration_secs_greater_than_i64_max() {
2618 let duration = Duration::from_secs(
2620 u64::try_from(TIMEDELTA_MAX_SECS).expect("positive number must not fail") + 1,
2621 );
2622 let schedule_at = HistoryEventScheduleAt::In(duration);
2623 schedule_at.as_date_time(DateTime::UNIX_EPOCH).unwrap_err();
2624 }
2625
2626 #[test]
2627 fn join_next_try_outcome_new_format() {
2628 let json = r#"{"type":"join_next_try","join_set_id":"n:test","outcome":"found"}"#;
2629 let event: HistoryEvent = serde_json::from_str(json).unwrap();
2630 assert_eq!(
2631 event,
2632 HistoryEvent::JoinNextTry {
2633 join_set_id: JoinSetId::new(
2634 crate::JoinSetKind::Named,
2635 crate::StrVariant::Static("test")
2636 )
2637 .unwrap(),
2638 outcome: JoinNextTryOutcome::Found,
2639 }
2640 );
2641
2642 let json = r#"{"type":"join_next_try","join_set_id":"n:test","outcome":"all_processed"}"#;
2643 let event: HistoryEvent = serde_json::from_str(json).unwrap();
2644 assert_eq!(
2645 event,
2646 HistoryEvent::JoinNextTry {
2647 join_set_id: JoinSetId::new(
2648 crate::JoinSetKind::Named,
2649 crate::StrVariant::Static("test")
2650 )
2651 .unwrap(),
2652 outcome: JoinNextTryOutcome::AllProcessed,
2653 }
2654 );
2655 }
2656
2657 #[test]
2658 fn join_next_try_outcome_serializes_new_format() {
2659 let event = HistoryEvent::JoinNextTry {
2660 join_set_id: JoinSetId::new(
2661 crate::JoinSetKind::Named,
2662 crate::StrVariant::Static("test"),
2663 )
2664 .unwrap(),
2665 outcome: JoinNextTryOutcome::AllProcessed,
2666 };
2667 let json = serde_json::to_string(&event).unwrap();
2668 assert!(
2669 json.contains(r#""outcome":"all_processed""#),
2670 "expected outcome field, got: {json}"
2671 );
2672 assert!(
2673 !json.contains("found_response"),
2674 "should not contain old field, got: {json}"
2675 );
2676 }
2677
2678 mod stub_retval_hash {
2679 use super::super::{StubRetVal, StubRetValHash};
2680 use crate::SupportedFunctionReturnValue;
2681 use val_json::type_wrapper::TypeWrapper;
2682 use val_json::wast_val::{WastVal, WastValWithType};
2683
2684 #[test]
2685 fn typed_variant_hash_is_stable() {
2686 let retval =
2687 StubRetVal::Typed(SupportedFunctionReturnValue::Ok(Some(WastValWithType {
2688 r#type: TypeWrapper::String,
2689 value: WastVal::String("hello".into()),
2690 })));
2691 let hash = retval.hash();
2692 assert_eq!(hash.to_string().chars().take(2).collect::<String>(), "01");
2694 assert_eq!(hash.to_string().len(), 66);
2696 }
2697
2698 #[test]
2699 fn untyped_variant_hash_is_stable() {
2700 let retval = StubRetVal::Untyped(r#"{"ok": "hello"}"#.to_string());
2701 let hash = retval.hash();
2702 assert_eq!(hash.to_string().chars().take(2).collect::<String>(), "01");
2704 assert_eq!(hash.to_string().len(), 66);
2706 }
2707
2708 #[test]
2709 fn different_values_produce_different_hashes() {
2710 let typed1 = StubRetVal::Typed(SupportedFunctionReturnValue::Ok(None));
2711 let typed2 = StubRetVal::Typed(SupportedFunctionReturnValue::Err(None));
2712 let untyped1 = StubRetVal::Untyped("value1".to_string());
2713 let untyped2 = StubRetVal::Untyped("value2".to_string());
2714
2715 let hashes: Vec<_> = [typed1, typed2, untyped1, untyped2]
2716 .into_iter()
2717 .map(|r| r.hash().to_string())
2718 .collect();
2719
2720 for (i, h1) in hashes.iter().enumerate() {
2722 for h2 in hashes.iter().skip(i + 1) {
2723 assert_ne!(h1, h2, "hashes should be different");
2724 }
2725 }
2726 }
2727
2728 #[test]
2729 fn same_values_produce_same_hashes() {
2730 let retval1 = StubRetVal::Typed(SupportedFunctionReturnValue::Ok(None));
2731 let retval2 = StubRetVal::Typed(SupportedFunctionReturnValue::Ok(None));
2732 assert_eq!(retval1.hash(), retval2.hash());
2733
2734 let untyped1 = StubRetVal::Untyped("test".to_string());
2735 let untyped2 = StubRetVal::Untyped("test".to_string());
2736 assert_eq!(untyped1.hash(), untyped2.hash());
2737 }
2738
2739 #[test]
2740 fn hash_serialization_roundtrip() {
2741 let retval = StubRetVal::Typed(SupportedFunctionReturnValue::Ok(None));
2742 let hash = retval.hash();
2743
2744 let serialized = serde_json::to_string(&hash).unwrap();
2745 let deserialized: StubRetValHash = serde_json::from_str(&serialized).unwrap();
2746
2747 assert_eq!(hash, deserialized);
2748 }
2749
2750 #[test]
2751 fn hash_display_and_fromstr_roundtrip() {
2752 let retval = StubRetVal::Untyped("test value".to_string());
2753 let hash = retval.hash();
2754
2755 let display = hash.to_string();
2756 let parsed: StubRetValHash = display.parse().unwrap();
2757
2758 assert_eq!(hash, parsed);
2759 }
2760
2761 #[test]
2762 fn typed_and_untyped_with_same_content_produce_different_hashes() {
2763 let typed = StubRetVal::Typed(SupportedFunctionReturnValue::Ok(None));
2765 let json_of_typed =
2766 serde_json::to_string(&SupportedFunctionReturnValue::Ok(None)).unwrap();
2767 let untyped = StubRetVal::Untyped(json_of_typed);
2768
2769 assert_ne!(typed.hash(), untyped.hash());
2770 }
2771 }
2772}