1pub mod component_id;
2mod error_conversions;
3#[cfg(feature = "postgres")]
4mod postgres_ext;
5#[cfg(feature = "rusqlite")]
6mod rusqlite_ext;
7pub mod storage;
8pub mod time;
9
10pub use crate::component_id::{
11 ComponentId, ComponentType, ContentDigest, InvalidNameError, check_name,
12};
13use ::serde::{Deserialize, Serialize};
14use assert_matches::assert_matches;
15pub use indexmap;
16use indexmap::IndexMap;
17use opentelemetry::propagation::{Extractor, Injector};
18pub use prefixed_ulid::ExecutionId;
19use serde_json::Value;
20use std::{
21 borrow::Borrow,
22 fmt::{Debug, Display},
23 hash::Hash,
24 marker::PhantomData,
25 ops::Deref,
26 str::FromStr,
27 sync::Arc,
28 time::Duration,
29};
30use storage::{PendingStateFinishedError, PendingStateFinishedResultKind};
31use tracing::{Span, error};
32use val_json::{
33 type_wrapper::{TypeConversionError, TypeWrapper},
34 wast_val::{WastVal, WastValWithType},
35 wast_val_ser::params,
36};
37use wasmtime::component::{Type, Val};
38
/// Namespace string compared against by `IfcFqnName::is_namespace_obelisk`.
pub const NAMESPACE_OBELISK: &str = "obelisk";
/// Namespace string compared against by `IfcFqnName::is_namespace_wasi`.
const NAMESPACE_WASI: &str = "wasi";
/// Package-name suffix of `PackageExtension::ObeliskExt`.
pub const SUFFIX_PKG_EXT: &str = "-obelisk-ext";
/// Package-name suffix of `PackageExtension::ObeliskSchedule`.
pub const SUFFIX_PKG_SCHEDULE: &str = "-obelisk-schedule";
/// Package-name suffix of `PackageExtension::ObeliskStub`.
pub const SUFFIX_PKG_STUB: &str = "-obelisk-stub";
44
/// Error value describing an execution that finished with a failure.
///
/// The `Display` output is `execution error: {kind}`; `reason` and `detail` are not part
/// of that message.
#[derive(thiserror::Error, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[error("execution error: {kind}")]
pub struct FinishedExecutionError {
    /// Category of the failure.
    pub kind: ExecutionFailureKind,
    /// Optional text; left out of the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
    /// Optional text; left out of the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
}
54impl FinishedExecutionError {
55 #[must_use]
56 pub fn as_pending_state_finished_error(&self) -> PendingStateFinishedError {
57 PendingStateFinishedError::ExecutionFailure(self.kind)
58 }
59}
60
/// Category of a failed execution, stored in `FinishedExecutionError::kind`.
///
/// Serde uses snake_case variant names (`#[serde(rename_all = "snake_case")]`).
#[derive(Debug, Clone, Copy, derive_more::Display, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ExecutionFailureKind {
    TimedOut,
    NondeterminismDetected,
    OutOfFuel,
    Cancelled,
    // NOTE(review): presumably the catch-all for failures not covered by the other
    // variants — confirm against the code that constructs it.
    Uncategorized,
}
74
/// Trap classification.
///
/// Each variant has an explicit `Display` string (see the `#[display(...)]` attributes);
/// serde uses snake_case variant names instead.
#[derive(Debug, Clone, Copy, derive_more::Display, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TrapKind {
    #[display("trap")]
    Trap,
    #[display("post_return_trap")]
    PostReturnTrap,
    #[display("out of fuel")]
    OutOfFuel,
    #[display("host function error")]
    HostFunctionError,
}
87
/// String that is stored either as a `&'static str` or as a shared `Arc<str>`.
///
/// The hand-written `PartialEq` and `Hash` impls in this file work on the string content,
/// so the same text compares and hashes equally regardless of the variant holding it.
#[derive(Clone, Eq, derive_more::Display)]
pub enum StrVariant {
    /// Borrowed string with `'static` lifetime.
    Static(&'static str),
    /// Reference-counted string.
    Arc(Arc<str>),
}
93
94impl StrVariant {
95 #[must_use]
96 pub const fn empty() -> StrVariant {
97 StrVariant::Static("")
98 }
99}
100
101impl From<String> for StrVariant {
102 fn from(value: String) -> Self {
103 StrVariant::Arc(Arc::from(value))
104 }
105}
106
107impl From<&'static str> for StrVariant {
108 fn from(value: &'static str) -> Self {
109 StrVariant::Static(value)
110 }
111}
112
113impl PartialEq for StrVariant {
114 fn eq(&self, other: &Self) -> bool {
115 match (self, other) {
116 (Self::Static(left), Self::Static(right)) => left == right,
117 (Self::Static(left), Self::Arc(right)) => *left == right.deref(),
118 (Self::Arc(left), Self::Arc(right)) => left == right,
119 (Self::Arc(left), Self::Static(right)) => left.deref() == *right,
120 }
121 }
122}
123
124impl Hash for StrVariant {
125 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
126 match self {
127 StrVariant::Static(val) => val.hash(state),
128 StrVariant::Arc(val) => {
129 let str: &str = val.deref();
130 str.hash(state);
131 }
132 }
133 }
134}
135
136impl Debug for StrVariant {
137 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138 Display::fmt(self, f)
139 }
140}
141
142impl Deref for StrVariant {
143 type Target = str;
144 fn deref(&self) -> &Self::Target {
145 match self {
146 Self::Arc(v) => v,
147 Self::Static(v) => v,
148 }
149 }
150}
151
152impl AsRef<str> for StrVariant {
153 fn as_ref(&self) -> &str {
154 match self {
155 Self::Arc(v) => v,
156 Self::Static(v) => v,
157 }
158 }
159}
160
161mod serde_strvariant {
162 use crate::StrVariant;
163 use serde::{
164 Deserialize, Deserializer, Serialize, Serializer,
165 de::{self, Visitor},
166 };
167 use std::{ops::Deref, sync::Arc};
168
169 impl Serialize for StrVariant {
170 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
171 where
172 S: Serializer,
173 {
174 serializer.serialize_str(self.deref())
175 }
176 }
177
178 impl<'de> Deserialize<'de> for StrVariant {
179 fn deserialize<D>(deserializer: D) -> Result<StrVariant, D::Error>
180 where
181 D: Deserializer<'de>,
182 {
183 deserializer.deserialize_str(StrVariantVisitor)
184 }
185 }
186
187 struct StrVariantVisitor;
188
189 impl Visitor<'_> for StrVariantVisitor {
190 type Value = StrVariant;
191
192 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
193 formatter.write_str("a string")
194 }
195
196 fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
197 where
198 E: de::Error,
199 {
200 Ok(StrVariant::Arc(Arc::from(v)))
201 }
202 }
203}
204
/// String wrapper tagged with a marker type `T`.
///
/// Displayed as the inner string and, because of `#[serde(transparent)]`, serialized as
/// the inner `StrVariant`. `IfcFqnName` and `FnName` are aliases of this type with
/// different markers.
#[derive(Hash, Clone, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
#[display("{value}")]
#[serde(transparent)]
pub struct Name<T> {
    /// The wrapped text.
    pub value: StrVariant,
    /// Marker only; skipped by serde.
    #[serde(skip)]
    phantom_data: PhantomData<fn(T) -> T>,
}
213
214impl<T> Name<T> {
215 #[must_use]
216 pub fn new_arc(value: Arc<str>) -> Self {
217 Self {
218 value: StrVariant::Arc(value),
219 phantom_data: PhantomData,
220 }
221 }
222
223 #[must_use]
224 pub const fn new_static(value: &'static str) -> Self {
225 Self {
226 value: StrVariant::Static(value),
227 phantom_data: PhantomData,
228 }
229 }
230}
231
232impl<T> Debug for Name<T> {
233 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
234 Display::fmt(&self, f)
235 }
236}
237
238impl<T> Deref for Name<T> {
239 type Target = str;
240
241 fn deref(&self) -> &Self::Target {
242 self.value.deref()
243 }
244}
245
246impl<T> Borrow<str> for Name<T> {
247 fn borrow(&self) -> &str {
248 self.deref()
249 }
250}
251
252impl<T> From<String> for Name<T> {
253 fn from(value: String) -> Self {
254 Self::new_arc(Arc::from(value))
255 }
256}
257
/// Kind of extension package, identified by a package-name suffix.
///
/// `Display` prints the suffix returned by `suffix()`.
#[derive(Clone, Copy, Debug, PartialEq, strum::EnumIter, derive_more::Display)]
#[display("{}", self.suffix())]
pub enum PackageExtension {
    /// Suffix `SUFFIX_PKG_EXT`.
    ObeliskExt,
    /// Suffix `SUFFIX_PKG_SCHEDULE`.
    ObeliskSchedule,
    /// Suffix `SUFFIX_PKG_STUB`.
    ObeliskStub,
}
265impl PackageExtension {
266 fn suffix(&self) -> &'static str {
267 match self {
268 PackageExtension::ObeliskExt => SUFFIX_PKG_EXT,
269 PackageExtension::ObeliskSchedule => SUFFIX_PKG_SCHEDULE,
270 PackageExtension::ObeliskStub => SUFFIX_PKG_STUB,
271 }
272 }
273}
274
/// Package identifier made of a namespace, a package name and an optional version.
///
/// `Display` renders it as `namespace:package_name` followed by `@version` when a version
/// is present. `Serialize` is only derived when the `test` feature is enabled.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[cfg_attr(feature = "test", derive(Serialize))]
pub struct PkgFqn {
    /// Part printed before the `:`.
    pub namespace: String,
    /// Part printed after the `:`; may end with one of the `SUFFIX_PKG_*` suffixes.
    pub package_name: String,
    /// Optional part printed after `@`.
    pub version: Option<String>,
}
282impl PkgFqn {
283 #[must_use]
284 pub fn is_extension(&self) -> bool {
285 Self::is_package_name_ext(&self.package_name)
286 }
287
288 #[must_use]
289 pub fn split_ext(&self) -> Option<(PkgFqn, PackageExtension)> {
290 use strum::IntoEnumIterator;
291 for package_ext in PackageExtension::iter() {
292 if let Some(package_name) = self.package_name.strip_suffix(package_ext.suffix()) {
293 return Some((
294 PkgFqn {
295 namespace: self.namespace.clone(),
296 package_name: package_name.to_string(),
297 version: self.version.clone(),
298 },
299 package_ext,
300 ));
301 }
302 }
303 None
304 }
305
306 fn is_package_name_ext(package_name: &str) -> bool {
307 package_name.ends_with(SUFFIX_PKG_EXT)
308 || package_name.ends_with(SUFFIX_PKG_SCHEDULE)
309 || package_name.ends_with(SUFFIX_PKG_STUB)
310 }
311}
312impl Display for PkgFqn {
313 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
314 let PkgFqn {
315 namespace,
316 package_name,
317 version,
318 } = self;
319 if let Some(version) = version {
320 write!(f, "{namespace}:{package_name}@{version}")
321 } else {
322 write!(f, "{namespace}:{package_name}")
323 }
324 }
325}
326
/// Marker type used as the `T` parameter of `Name` to form `IfcFqnName`.
#[derive(Hash, Clone, PartialEq, Eq)]
pub struct IfcFqnMarker;
329
330pub type IfcFqnName = Name<IfcFqnMarker>; impl IfcFqnName {
333 #[must_use]
334 pub fn namespace(&self) -> &str {
335 self.deref().split_once(':').unwrap().0
336 }
337
338 #[must_use]
339 pub fn package_name(&self) -> &str {
340 let after_colon = self.deref().split_once(':').unwrap().1;
341 after_colon.split_once('/').unwrap().0
342 }
343
344 #[must_use]
345 pub fn version(&self) -> Option<&str> {
346 self.deref().split_once('@').map(|(_, version)| version)
347 }
348
349 #[must_use]
350 pub fn pkg_fqn_name(&self) -> PkgFqn {
351 let (namespace, rest) = self.deref().split_once(':').unwrap();
352 let (package_name, rest) = rest.split_once('/').unwrap();
353 let version = rest.split_once('@').map(|(_, version)| version);
354 PkgFqn {
355 namespace: namespace.to_string(),
356 package_name: package_name.to_string(),
357 version: version.map(std::string::ToString::to_string),
358 }
359 }
360
361 #[must_use]
362 pub fn ifc_name(&self) -> &str {
363 let after_colon = self.deref().split_once(':').unwrap().1;
364 let after_slash = after_colon.split_once('/').unwrap().1;
365 after_slash
366 .split_once('@')
367 .map_or(after_slash, |(ifc, _)| ifc)
368 }
369
370 #[must_use]
371 pub fn from_parts(
372 namespace: &str,
373 package_name: &str,
374 ifc_name: &str,
375 version: Option<&str>,
376 ) -> Self {
377 let mut str = format!("{namespace}:{package_name}/{ifc_name}");
378 if let Some(version) = version {
379 str += "@";
380 str += version;
381 }
382 Self::new_arc(Arc::from(str))
383 }
384
385 #[must_use]
386 pub fn is_extension(&self) -> bool {
388 PkgFqn::is_package_name_ext(self.package_name())
389 }
390
391 #[must_use]
392 pub fn package_strip_obelisk_ext_suffix(&self) -> Option<&str> {
393 self.package_name().strip_suffix(SUFFIX_PKG_EXT)
394 }
395
396 #[must_use]
397 pub fn package_strip_obelisk_schedule_suffix(&self) -> Option<&str> {
398 self.package_name().strip_suffix(SUFFIX_PKG_SCHEDULE)
399 }
400
401 #[must_use]
402 pub fn package_strip_obelisk_stub_suffix(&self) -> Option<&str> {
403 self.package_name().strip_suffix(SUFFIX_PKG_STUB)
404 }
405
406 #[must_use]
407 pub fn is_namespace_obelisk(&self) -> bool {
408 self.namespace() == NAMESPACE_OBELISK
409 }
410
411 #[must_use]
412 pub fn is_namespace_wasi(&self) -> bool {
413 self.namespace() == NAMESPACE_WASI
414 }
415}
416
/// Marker type used as the `T` parameter of `Name` to form `FnName`.
#[derive(Hash, Clone, PartialEq, Eq)]
pub struct FnMarker;

/// `Name` specialised with `FnMarker`; used for `FunctionFqn::function_name`.
pub type FnName = Name<FnMarker>;
421
/// Function identifier consisting of an interface name and a function name.
///
/// Displayed as `{ifc_fqn}.{function_name}`. Serde goes through the `Display` and
/// `FromStr` implementations (`SerializeDisplay` / `DeserializeFromStr`).
#[derive(
    Hash, Clone, PartialEq, Eq, serde_with::SerializeDisplay, serde_with::DeserializeFromStr,
)]
pub struct FunctionFqn {
    /// Interface part, printed before the `.`.
    pub ifc_fqn: IfcFqnName,
    /// Function part, printed after the `.`.
    pub function_name: FnName,
}
429
430impl FunctionFqn {
431 #[must_use]
432 pub fn new_arc(ifc_fqn: Arc<str>, function_name: Arc<str>) -> Self {
433 Self {
434 ifc_fqn: Name::new_arc(ifc_fqn),
435 function_name: Name::new_arc(function_name),
436 }
437 }
438
439 #[must_use]
440 pub const fn new_static(ifc_fqn: &'static str, function_name: &'static str) -> Self {
441 Self {
442 ifc_fqn: Name::new_static(ifc_fqn),
443 function_name: Name::new_static(function_name),
444 }
445 }
446
447 #[must_use]
448 pub const fn new_static_tuple(tuple: (&'static str, &'static str)) -> Self {
449 Self::new_static(tuple.0, tuple.1)
450 }
451
452 pub fn try_from_tuple(
453 ifc_fqn: &str,
454 function_name: &str,
455 ) -> Result<Self, FunctionFqnParseError> {
456 if function_name.contains('.') {
457 Err(FunctionFqnParseError::DelimiterFoundInFunctionName)
458 } else {
459 Ok(Self::new_arc(Arc::from(ifc_fqn), Arc::from(function_name)))
460 }
461 }
462}
463
/// Errors produced when building a `FunctionFqn` from strings.
#[derive(Debug, thiserror::Error)]
pub enum FunctionFqnParseError {
    /// Returned by `FromStr` when the input contains no `.`.
    #[error("delimiter `.` not found")]
    DelimiterNotFound,
    /// Returned by `FunctionFqn::try_from_tuple` when the function name contains a `.`.
    #[error("delimiter `.` found in function name")]
    DelimiterFoundInFunctionName,
}
471
472impl FromStr for FunctionFqn {
473 type Err = FunctionFqnParseError;
474
475 fn from_str(s: &str) -> Result<Self, Self::Err> {
476 if let Some((ifc_fqn, function_name)) = s.rsplit_once('.') {
477 Ok(Self::new_arc(Arc::from(ifc_fqn), Arc::from(function_name)))
478 } else {
479 Err(FunctionFqnParseError::DelimiterNotFound)
480 }
481 }
482}
483
484impl Display for FunctionFqn {
485 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
486 write!(
487 f,
488 "{ifc_fqn}.{function_name}",
489 ifc_fqn = self.ifc_fqn,
490 function_name = self.function_name
491 )
492 }
493}
494
495impl Debug for FunctionFqn {
496 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
497 Display::fmt(&self, f)
498 }
499}
500
#[cfg(any(test, feature = "test"))]
impl<'a> arbitrary::Arbitrary<'a> for FunctionFqn {
    /// Generates four arbitrary strings (namespace, package, interface, function), strips
    /// the characters `:`, `@` and `.` from each, and assembles
    /// `namespace:package/interface` plus the function name.
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        const ILLEGAL: [char; 3] = [':', '@', '.'];
        // Draws the next string and removes the delimiter characters from it.
        let mut next_part = || -> arbitrary::Result<String> {
            let raw: String = u.arbitrary()?;
            Ok(raw.replace(ILLEGAL, ""))
        };
        let namespace = next_part()?;
        let pkg_name = next_part()?;
        let ifc_name = next_part()?;
        let fn_name = next_part()?;

        let ifc_fqn = format!("{namespace}:{pkg_name}/{ifc_name}");
        Ok(FunctionFqn::new_arc(Arc::from(ifc_fqn), Arc::from(fn_name)))
    }
}
516
/// The `ok` and `err` payload types of a top-level result type.
///
/// Converts into `TypeWrapper::Result` via the `From` impl in this file.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct TypeWrapperTopLevel {
    /// Type of the `ok` payload; `None` when there is no payload.
    pub ok: Option<Box<TypeWrapper>>,
    /// Type of the `err` payload; `None` when there is no payload.
    pub err: Option<Box<TypeWrapper>>,
}
522impl From<TypeWrapperTopLevel> for TypeWrapper {
523 fn from(value: TypeWrapperTopLevel) -> TypeWrapper {
524 TypeWrapper::Result {
525 ok: value.ok,
526 err: value.err,
527 }
528 }
529}
530
/// Outcome of a finished execution: an `ok` value, an `err` value, or an execution error.
///
/// The payloads of `Ok` and `Err` are excluded from the `Debug` output (`#[debug(skip)]`).
#[derive(Clone, derive_more::Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum SupportedFunctionReturnValue {
    /// The `ok` side of a result; `None` when it carries no payload.
    Ok {
        #[debug(skip)]
        ok: Option<WastValWithType>,
    },
    /// The `err` side of a result; `None` when it carries no payload.
    Err {
        #[debug(skip)]
        err: Option<WastValWithType>,
    },
    /// The execution itself failed; see `FinishedExecutionError`.
    ExecutionError(FinishedExecutionError),
}
543impl Display for SupportedFunctionReturnValue {
544 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
545 match self.as_pending_state_finished_result() {
546 PendingStateFinishedResultKind::Ok => write!(f, "execution completed successfully"),
547 PendingStateFinishedResultKind::Err(err) => write!(f, "{err}"),
548 }
549 }
550}
551
/// Constant for an `Ok` return value that carries no payload.
pub const SUPPORTED_RETURN_VALUE_OK_EMPTY: SupportedFunctionReturnValue =
    SupportedFunctionReturnValue::Ok { ok: None };
554
/// Errors returned by `SupportedFunctionReturnValue::new`.
#[derive(Debug, thiserror::Error)]
pub enum ResultParsingError {
    /// The iterator of return values was empty.
    #[error("return value must not be empty")]
    NoValue,
    /// The iterator yielded more than one return value.
    #[error("return value cannot be parsed, multi-value results are not supported")]
    MultiValue,
    /// The wasmtime type could not be converted into a `TypeWrapper`.
    #[error("return value cannot be parsed, {0}")]
    TypeConversionError(val_json::type_wrapper::TypeConversionError),
    /// The value could not be interpreted against its type; message of the inner error is
    /// used as-is.
    #[error(transparent)]
    ResultParsingErrorFromVal(ResultParsingErrorFromVal),
}
566
/// Errors returned when interpreting a wasmtime value against a result type.
#[derive(Debug, thiserror::Error)]
pub enum ResultParsingErrorFromVal {
    /// The payload could not be converted into a `WastVal`.
    #[error("return value cannot be parsed, {0}")]
    WastValConversionError(val_json::wast_val::WastValConversionError),
    /// Either the type or the value is not a result.
    #[error("top level type must be a result")]
    TopLevelTypeMustBeAResult,
    /// Payload presence in the value does not agree with the declared `ok`/`err` types.
    #[error("value does not type check")]
    TypeCheckError,
}
576
577impl SupportedFunctionReturnValue {
578 pub fn new<
579 I: ExactSizeIterator<Item = (wasmtime::component::Val, wasmtime::component::Type)>,
580 >(
581 mut iter: I,
582 ) -> Result<Self, ResultParsingError> {
583 if iter.len() == 0 {
584 Err(ResultParsingError::NoValue)
585 } else if iter.len() == 1 {
586 let (val, r#type) = iter.next().unwrap();
587 let r#type =
588 TypeWrapper::try_from(r#type).map_err(ResultParsingError::TypeConversionError)?;
589 Self::from_val_and_type_wrapper(val, r#type)
590 .map_err(ResultParsingError::ResultParsingErrorFromVal)
591 } else {
592 Err(ResultParsingError::MultiValue)
593 }
594 }
595
596 #[expect(clippy::result_unit_err)]
597 pub fn from_wast_val_with_type(
598 value: WastValWithType,
599 ) -> Result<SupportedFunctionReturnValue, ()> {
600 match value {
601 WastValWithType {
602 r#type: TypeWrapper::Result { ok: None, err: _ },
603 value: WastVal::Result(Ok(None)),
604 } => Ok(SupportedFunctionReturnValue::Ok { ok: None }),
605 WastValWithType {
606 r#type:
607 TypeWrapper::Result {
608 ok: Some(ok),
609 err: _,
610 },
611 value: WastVal::Result(Ok(Some(value))),
612 } => Ok(SupportedFunctionReturnValue::Ok {
613 ok: Some(WastValWithType {
614 r#type: *ok,
615 value: *value,
616 }),
617 }),
618 WastValWithType {
619 r#type: TypeWrapper::Result { ok: _, err: None },
620 value: WastVal::Result(Err(None)),
621 } => Ok(SupportedFunctionReturnValue::Err { err: None }),
622 WastValWithType {
623 r#type:
624 TypeWrapper::Result {
625 ok: _,
626 err: Some(err),
627 },
628 value: WastVal::Result(Err(Some(value))),
629 } => Ok(SupportedFunctionReturnValue::Err {
630 err: Some(WastValWithType {
631 r#type: *err,
632 value: *value,
633 }),
634 }),
635 _ => Err(()),
636 }
637 }
638
639 pub fn from_val_and_type_wrapper(
640 value: wasmtime::component::Val,
641 ty: TypeWrapper,
642 ) -> Result<Self, ResultParsingErrorFromVal> {
643 let TypeWrapper::Result { ok, err } = ty else {
644 return Err(ResultParsingErrorFromVal::TopLevelTypeMustBeAResult);
645 };
646 let ty = TypeWrapperTopLevel { ok, err };
647 Self::from_val_and_type_wrapper_tl(value, ty)
648 }
649
650 pub fn from_val_and_type_wrapper_tl(
651 value: wasmtime::component::Val,
652 ty: TypeWrapperTopLevel,
653 ) -> Result<Self, ResultParsingErrorFromVal> {
654 let wasmtime::component::Val::Result(value) = value else {
655 return Err(ResultParsingErrorFromVal::TopLevelTypeMustBeAResult);
656 };
657
658 match (ty.ok, ty.err, value) {
659 (None, _, Ok(None)) => Ok(SupportedFunctionReturnValue::Ok { ok: None }),
660 (Some(ok_type), _, Ok(Some(value))) => Ok(SupportedFunctionReturnValue::Ok {
661 ok: Some(WastValWithType {
662 r#type: *ok_type,
663 value: WastVal::try_from(*value)
664 .map_err(ResultParsingErrorFromVal::WastValConversionError)?,
665 }),
666 }),
667 (_, None, Err(None)) => Ok(SupportedFunctionReturnValue::Err { err: None }),
668 (_, Some(err_type), Err(Some(value))) => Ok(SupportedFunctionReturnValue::Err {
669 err: Some(WastValWithType {
670 r#type: *err_type,
671 value: WastVal::try_from(*value)
672 .map_err(ResultParsingErrorFromVal::WastValConversionError)?,
673 }),
674 }),
675 _other => Err(ResultParsingErrorFromVal::TypeCheckError),
676 }
677 }
678
679 #[must_use]
680 pub fn into_wast_val(self, get_return_type: impl FnOnce() -> TypeWrapperTopLevel) -> WastVal {
681 match self {
682 SupportedFunctionReturnValue::Ok { ok: None } => WastVal::Result(Ok(None)),
683 SupportedFunctionReturnValue::Ok { ok: Some(v) } => {
684 WastVal::Result(Ok(Some(Box::new(v.value))))
685 }
686 SupportedFunctionReturnValue::Err { err: None } => WastVal::Result(Err(None)),
687 SupportedFunctionReturnValue::Err { err: Some(v) } => {
688 WastVal::Result(Err(Some(Box::new(v.value))))
689 }
690 SupportedFunctionReturnValue::ExecutionError(_) => {
691 execution_error_to_wast_val(&get_return_type())
692 }
693 }
694 }
695
696 #[must_use]
697 pub fn as_pending_state_finished_result(&self) -> PendingStateFinishedResultKind {
698 match self {
699 SupportedFunctionReturnValue::Ok { ok: _ } => PendingStateFinishedResultKind::Ok,
700 SupportedFunctionReturnValue::Err { err: _ } => {
701 PendingStateFinishedResultKind::Err(PendingStateFinishedError::Error)
702 }
703 SupportedFunctionReturnValue::ExecutionError(err) => {
704 PendingStateFinishedResultKind::Err(err.as_pending_state_finished_error())
705 }
706 }
707 }
708}
709
710#[must_use]
711pub fn execution_error_to_wast_val(ret_type: &TypeWrapperTopLevel) -> WastVal {
712 match ret_type {
713 TypeWrapperTopLevel { ok: _, err: None } => return WastVal::Result(Err(None)),
714 TypeWrapperTopLevel {
715 ok: _,
716 err: Some(inner),
717 } => match inner.as_ref() {
718 TypeWrapper::String => {
719 return WastVal::Result(Err(Some(Box::new(WastVal::String(
720 EXECUTION_FAILED_STRING_OR_VARIANT.to_string(),
721 )))));
722 }
723 TypeWrapper::Variant(variants) => {
724 if variants.get(EXECUTION_FAILED_STRING_OR_VARIANT) == Some(&None) {
725 return WastVal::Result(Err(Some(Box::new(WastVal::Variant(
726 EXECUTION_FAILED_STRING_OR_VARIANT.to_string(),
727 None,
728 )))));
729 }
730 }
731 _ => {}
732 },
733 }
734 unreachable!("unexpected top-level return type {ret_type:?} cannot be ReturnTypeCompatible")
735}
736
/// Function parameters; an opaque wrapper around the private `ParamsInternal`.
#[derive(Debug, Clone)]
pub struct Params(ParamsInternal);
739
/// Storage behind `Params`.
#[derive(derive_more::Debug, Clone)]
enum ParamsInternal {
    /// Parameters held as JSON values.
    JsonValues(Arc<[Value]>),
    /// Parameters held as wasmtime values.
    Vals {
        vals: Arc<[wasmtime::component::Val]>,
        // JSON rendering of `vals`; starts as `None` and is filled in by the `Serialize`,
        // `Display` and `PartialEq` implementations in this file. Skipped in `Debug`.
        #[debug(skip)]
        json_vals_cache: Arc<std::sync::RwLock<Option<Arc<[Value]>>>>,
    },
    /// No parameters.
    Empty,
}
750
/// Function-name suffix of `FunctionExtension::Submit`.
pub const SUFFIX_FN_SUBMIT: &str = "-submit";
/// Function-name suffix of `FunctionExtension::AwaitNext`.
pub const SUFFIX_FN_AWAIT_NEXT: &str = "-await-next";
/// Function-name suffix of `FunctionExtension::Schedule`.
pub const SUFFIX_FN_SCHEDULE: &str = "-schedule";
/// Function-name suffix of `FunctionExtension::Stub`.
pub const SUFFIX_FN_STUB: &str = "-stub";
/// Function-name suffix of `FunctionExtension::Get`.
pub const SUFFIX_FN_GET: &str = "-get";
756
/// Kind of extension function, identified by a function-name suffix (see `suffix`).
///
/// Serde uses snake_case variant names.
#[derive(
    Debug, Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq, Eq, strum::EnumIter,
)]
#[serde(rename_all = "snake_case")]
pub enum FunctionExtension {
    /// Suffix `SUFFIX_FN_SUBMIT`.
    Submit,
    /// Suffix `SUFFIX_FN_AWAIT_NEXT`.
    AwaitNext,
    /// Suffix `SUFFIX_FN_SCHEDULE`.
    Schedule,
    /// Suffix `SUFFIX_FN_STUB`.
    Stub,
    /// Suffix `SUFFIX_FN_GET`.
    Get,
}
768impl FunctionExtension {
769 #[must_use]
770 pub fn suffix(&self) -> &'static str {
771 match self {
772 FunctionExtension::Submit => SUFFIX_FN_SUBMIT,
773 FunctionExtension::AwaitNext => SUFFIX_FN_AWAIT_NEXT,
774 FunctionExtension::Schedule => SUFFIX_FN_SCHEDULE,
775 FunctionExtension::Stub => SUFFIX_FN_STUB,
776 FunctionExtension::Get => SUFFIX_FN_GET,
777 }
778 }
779
780 #[must_use]
781 pub fn belongs_to(&self, pkg_ext: PackageExtension) -> bool {
782 matches!(
783 (pkg_ext, self),
784 (
785 PackageExtension::ObeliskExt,
786 FunctionExtension::Submit | FunctionExtension::AwaitNext | FunctionExtension::Get
787 ) | (
788 PackageExtension::ObeliskSchedule,
789 FunctionExtension::Schedule
790 ) | (PackageExtension::ObeliskStub, FunctionExtension::Stub)
791 )
792 }
793}
794
/// Description of a function: its name, parameter types, return type and extension kind.
///
/// Displayed as `{ffqn}: func{parameter_types} -> {return_type}`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct FunctionMetadata {
    /// Fully qualified function name.
    pub ffqn: FunctionFqn,
    /// Types of the parameters.
    pub parameter_types: ParameterTypes,
    /// Type of the return value.
    pub return_type: ReturnType,
    /// `Some` for extension functions; `split_extension` then expects the function name to
    /// end with the extension's suffix.
    pub extension: Option<FunctionExtension>,
    // NOTE(review): presumably tells whether the function can be submitted directly —
    // confirm against the code that sets it.
    pub submittable: bool,
}
804impl FunctionMetadata {
805 #[must_use]
806 pub fn split_extension(&self) -> Option<(&str, FunctionExtension)> {
807 self.extension.map(|extension| {
808 let prefix = self
809 .ffqn
810 .function_name
811 .value
812 .strip_suffix(extension.suffix())
813 .unwrap_or_else(|| {
814 panic!(
815 "extension function {} must end with expected suffix {}",
816 self.ffqn.function_name,
817 extension.suffix()
818 )
819 });
820 (prefix, extension)
821 })
822 }
823}
824impl Display for FunctionMetadata {
825 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
826 write!(
827 f,
828 "{ffqn}: func{params} -> {return_type}",
829 ffqn = self.ffqn,
830 params = self.parameter_types,
831 return_type = self.return_type,
832 )
833 }
834}
835
836pub mod serde_params {
837 use crate::{Params, ParamsInternal};
838 use serde::de::{SeqAccess, Visitor};
839 use serde::ser::SerializeSeq;
840 use serde::{Deserialize, Serialize};
841 use serde_json::Value;
842 use std::sync::Arc;
843 use val_json::wast_val::WastVal;
844
845 impl Serialize for Params {
846 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
847 where
848 S: ::serde::Serializer,
849 {
850 let serialize_json_values = |slice: &[serde_json::Value], serializer: S| {
851 let mut seq = serializer.serialize_seq(Some(slice.len()))?;
852 for item in slice {
853 seq.serialize_element(item)?;
854 }
855 seq.end()
856 };
857 match &self.0 {
858 ParamsInternal::Vals {
859 vals,
860 json_vals_cache,
861 } => {
862 let guard = json_vals_cache.read().unwrap();
863 if let Some(slice) = &*guard {
864 serialize_json_values(slice, serializer)
865 } else {
866 drop(guard);
867 let mut json_vals = Vec::with_capacity(vals.len());
868 for val in vals.iter() {
869 let value = WastVal::try_from(val.clone())
870 .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
871 let value = serde_json::to_value(&value)
872 .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
873 json_vals.push(value);
874 }
875 let res = serialize_json_values(&json_vals, serializer);
876 *json_vals_cache.write().unwrap() = Some(Arc::from(json_vals));
877 res
878 }
879 }
880 ParamsInternal::Empty => serializer.serialize_seq(Some(0))?.end(),
881 ParamsInternal::JsonValues(vec) => serialize_json_values(vec, serializer),
882 }
883 }
884 }
885
886 pub struct VecVisitor;
887
888 impl<'de> Visitor<'de> for VecVisitor {
889 type Value = Vec<Value>;
890
891 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
892 formatter.write_str("a sequence of `Value`")
893 }
894
895 #[inline]
896 fn visit_seq<V>(self, mut visitor: V) -> Result<Self::Value, V::Error>
897 where
898 V: SeqAccess<'de>,
899 {
900 let mut vec = Vec::new();
901 while let Some(elem) = visitor.next_element()? {
902 vec.push(elem);
903 }
904 Ok(vec)
905 }
906 }
907
908 impl<'de> Deserialize<'de> for Params {
909 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
910 where
911 D: serde::Deserializer<'de>,
912 {
913 let vec: Vec<Value> = deserializer.deserialize_seq(VecVisitor)?;
914 if vec.is_empty() {
915 Ok(Self(ParamsInternal::Empty))
916 } else {
917 Ok(Self(ParamsInternal::JsonValues(Arc::from(vec))))
918 }
919 }
920 }
921}
922
/// Errors returned by `Params::typecheck` and `Params::as_vals`.
#[derive(Debug, thiserror::Error)]
pub enum ParamsParsingError {
    /// The type of the parameter at position `idx` could not be converted into a
    /// `TypeWrapper`. `idx` is the value produced by `enumerate()`, i.e. zero-based.
    #[error("parameters cannot be parsed, cannot convert type of {idx}-th parameter")]
    ParameterTypeError {
        idx: usize,
        err: TypeConversionError,
    },
    /// The JSON values could not be deserialized against the parameter types.
    #[error("parameters cannot be deserialized: {0}")]
    ParamsDeserializationError(serde_json::Error),
    /// The number of supplied parameters differs from the number of parameter types.
    #[error("parameter cardinality mismatch, expected: {expected}, specified: {specified}")]
    ParameterCardinalityMismatch { expected: usize, specified: usize },
}
935
936impl ParamsParsingError {
937 #[must_use]
938 pub fn detail(&self) -> Option<String> {
939 match self {
940 ParamsParsingError::ParameterTypeError { err, .. } => Some(format!("{err:?}")),
941 ParamsParsingError::ParamsDeserializationError(err) => Some(format!("{err:?}")),
942 ParamsParsingError::ParameterCardinalityMismatch { .. } => None,
943 }
944 }
945}
946
/// Error for building parameters from a JSON value.
#[derive(Debug, thiserror::Error)]
pub enum ParamsFromJsonError {
    /// The JSON value is not an array.
    #[error("value must be a json array containing function parameters")]
    MustBeArray,
}
952
953impl Params {
954 #[must_use]
955 pub const fn empty() -> Self {
956 Self(ParamsInternal::Empty)
957 }
958
959 #[must_use]
960 pub fn from_wasmtime(vals: Arc<[wasmtime::component::Val]>) -> Self {
961 if vals.is_empty() {
962 Self::empty()
963 } else {
964 Self(ParamsInternal::Vals {
965 vals,
966 json_vals_cache: Arc::default(),
967 })
968 }
969 }
970
971 #[cfg(any(test, feature = "test"))]
972 #[must_use]
973 pub fn from_json_values_test(vec: Vec<Value>) -> Self {
974 if vec.is_empty() {
975 Self::empty()
976 } else {
977 Self(ParamsInternal::JsonValues(Arc::from(vec)))
978 }
979 }
980
981 #[must_use]
982 pub fn from_json_values(values: Arc<[Value]>) -> Self {
983 if values.is_empty() {
984 Self::empty()
985 } else {
986 Self(ParamsInternal::JsonValues(values))
987 }
988 }
989
990 pub fn typecheck<'a>(
991 &self,
992 param_types: impl ExactSizeIterator<Item = &'a TypeWrapper>,
993 ) -> Result<(), ParamsParsingError> {
994 if param_types.len() != self.len() {
995 return Err(ParamsParsingError::ParameterCardinalityMismatch {
996 expected: param_types.len(),
997 specified: self.len(),
998 });
999 }
1000 match &self.0 {
1001 ParamsInternal::Vals { .. } | ParamsInternal::Empty => {}
1002 ParamsInternal::JsonValues(params) => {
1003 params::deserialize_values(params, param_types)
1004 .map_err(ParamsParsingError::ParamsDeserializationError)?;
1005 }
1006 }
1007 Ok(())
1008 }
1009
1010 pub fn as_vals<'a>(
1011 &self,
1012 param_types: impl ExactSizeIterator<Item = (&'a str, Type)>,
1013 ) -> Result<Arc<[wasmtime::component::Val]>, ParamsParsingError> {
1014 if param_types.len() != self.len() {
1015 return Err(ParamsParsingError::ParameterCardinalityMismatch {
1016 expected: param_types.len(),
1017 specified: self.len(),
1018 });
1019 }
1020 match &self.0 {
1021 ParamsInternal::JsonValues(json_vec) => {
1022 let param_types = param_types
1023 .enumerate()
1024 .map(|(idx, (_param_name, ty))| {
1025 TypeWrapper::try_from(ty).map_err(|err| (idx, err))
1026 })
1027 .collect::<Result<Vec<_>, _>>()
1028 .map_err(|(idx, err)| ParamsParsingError::ParameterTypeError { idx, err })?;
1029 Ok(params::deserialize_values(json_vec, param_types.iter())
1030 .map_err(ParamsParsingError::ParamsDeserializationError)?
1031 .into_iter()
1032 .map(Val::from)
1033 .collect())
1034 }
1035 ParamsInternal::Vals { vals, .. } => Ok(vals.clone()),
1036 ParamsInternal::Empty => Ok(Arc::from([])),
1037 }
1038 }
1039
1040 #[must_use]
1041 pub fn len(&self) -> usize {
1042 match &self.0 {
1043 ParamsInternal::JsonValues(vec) => vec.len(),
1044 ParamsInternal::Vals { vals, .. } => vals.len(),
1045 ParamsInternal::Empty => 0,
1046 }
1047 }
1048
1049 #[must_use]
1050 pub fn is_empty(&self) -> bool {
1051 self.len() == 0
1052 }
1053}
1054
1055impl PartialEq for Params {
1056 fn eq(&self, other: &Self) -> bool {
1057 if self.is_empty() && other.is_empty() {
1058 return true;
1059 }
1060 if self.len() != other.len() {
1061 return false;
1062 }
1063
1064 match (&self.0, &other.0) {
1065 (ParamsInternal::JsonValues(l), ParamsInternal::JsonValues(r)) => l == r,
1066
1067 (
1068 ParamsInternal::Vals {
1069 vals: l,
1070 json_vals_cache: _,
1071 },
1072 ParamsInternal::Vals {
1073 vals: r,
1074 json_vals_cache: _,
1075 },
1076 ) => l == r,
1077
1078 (
1079 ParamsInternal::JsonValues(json_vals),
1080 ParamsInternal::Vals {
1081 vals,
1082 json_vals_cache,
1083 },
1084 )
1085 | (
1086 ParamsInternal::Vals {
1087 vals,
1088 json_vals_cache,
1089 },
1090 ParamsInternal::JsonValues(json_vals),
1091 ) => {
1092 let Ok(vec) = to_json(vals) else { return false };
1093 let equals = *json_vals == vec;
1094 *json_vals_cache.write().unwrap() = Some(vec);
1095 equals
1096 }
1097
1098 (ParamsInternal::Empty, _) | (_, ParamsInternal::Empty) => {
1099 unreachable!("zero length and different lengths handled earlier")
1100 }
1101 }
1102 }
1103}
1104impl Eq for Params {}
1105
1106fn to_json(vals: &[wasmtime::component::Val]) -> Result<Arc<[serde_json::Value]>, ()> {
1108 let mut vec = Vec::with_capacity(vals.len());
1109 for val in vals {
1110 let value = match WastVal::try_from(val.clone()) {
1111 Ok(ok) => ok,
1112 Err(err) => {
1113 error!("cannot compare Params, cannot convert to WastVal: {err:?}");
1114 return Err(());
1115 }
1116 };
1117 let value = match serde_json::to_value(&value) {
1118 Ok(ok) => ok,
1119 Err(err) => {
1120 error!("cannot compare Params, cannot convert to JSON: {err:?}");
1121 return Err(());
1122 }
1123 };
1124 vec.push(value);
1125 }
1126 Ok(Arc::from(vec))
1127}
1128
1129impl Display for Params {
1130 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1131 let render_json_vals =
1132 |f: &mut std::fmt::Formatter<'_>, json_vals: &[Value]| -> std::fmt::Result {
1133 for (i, json_value) in json_vals.iter().enumerate() {
1134 if i > 0 {
1135 write!(f, ", ")?;
1136 }
1137 write!(f, "{json_value}")?;
1138 }
1139 Ok(())
1140 };
1141 write!(f, "[")?;
1142 match &self.0 {
1143 ParamsInternal::Empty => {}
1144 ParamsInternal::JsonValues(json_vals) => {
1145 render_json_vals(f, json_vals)?;
1146 }
1147 ParamsInternal::Vals {
1148 vals,
1149 json_vals_cache,
1150 } => {
1151 let guard = json_vals_cache.read().unwrap();
1152 if let Some(json_vals) = &*guard {
1153 render_json_vals(f, json_vals)?;
1154 } else {
1155 drop(guard);
1156 let Ok(json_vals) = to_json(vals) else {
1157 return write!(f, "<serialization error>]"); };
1159 render_json_vals(f, &json_vals)?;
1160 *json_vals_cache.write().unwrap() = Some(json_vals);
1161 }
1162 }
1163 }
1164 write!(f, "]")
1165 }
1166}
1167
1168pub mod prefixed_ulid {
1169 use crate::{
1170 EXECUTION_ID_INFIX, EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX, JoinSetId,
1171 JoinSetIdParseError,
1172 };
1173 use serde_with::{DeserializeFromStr, SerializeDisplay};
1174 use std::{
1175 fmt::{Debug, Display},
1176 hash::Hasher,
1177 marker::PhantomData,
1178 num::ParseIntError,
1179 str::FromStr,
1180 sync::Arc,
1181 };
1182 use ulid::Ulid;
1183
1184 #[derive(derive_more::Display, SerializeDisplay, DeserializeFromStr)]
1185 #[derive_where::derive_where(Clone, Copy)]
1186 #[display("{}_{ulid}", Self::prefix())]
1187 pub struct PrefixedUlid<T: 'static> {
1188 ulid: Ulid,
1189 phantom_data: PhantomData<fn(T) -> T>,
1190 }
1191
1192 impl<T> PrefixedUlid<T> {
1193 const fn new(ulid: Ulid) -> Self {
1194 Self {
1195 ulid,
1196 phantom_data: PhantomData,
1197 }
1198 }
1199
1200 fn prefix() -> &'static str {
1201 std::any::type_name::<T>().rsplit("::").next().unwrap()
1202 }
1203 }
1204
1205 impl<T> PrefixedUlid<T> {
1206 #[must_use]
1207 pub fn generate() -> Self {
1208 Self::new(Ulid::new())
1209 }
1210
1211 #[must_use]
1212 pub const fn from_parts(timestamp_ms: u64, random: u128) -> Self {
1213 Self::new(Ulid::from_parts(timestamp_ms, random))
1214 }
1215
1216 #[must_use]
1217 pub fn timestamp_part(&self) -> u64 {
1218 self.ulid.timestamp_ms()
1219 }
1220
1221 #[must_use]
1222 pub fn random_part(&self) -> u128 {
1224 self.ulid.random()
1225 }
1226 }
1227
/// Errors returned when parsing a [`PrefixedUlid`] from a string.
#[derive(Debug, thiserror::Error)]
pub enum PrefixedUlidParseError {
    /// The input does not start with the expected prefix followed by `_`.
    #[error("wrong prefix in `{input}`, expected prefix `{expected}`")]
    WrongPrefix { input: String, expected: String },
    /// The text after the prefix is not a valid ULID.
    #[error("cannot parse ULID suffix from `{input}`")]
    CannotParseUlid { input: String },
}
1235
1236 mod impls {
1237 use super::{PrefixedUlid, PrefixedUlidParseError, Ulid};
1238 use std::{fmt::Debug, fmt::Display, hash::Hash, marker::PhantomData, str::FromStr};
1239
1240 impl<T> FromStr for PrefixedUlid<T> {
1241 type Err = PrefixedUlidParseError;
1242
1243 fn from_str(input: &str) -> Result<Self, Self::Err> {
1244 let prefix = Self::prefix();
1245 let mut input_chars = input.chars();
1246 for exp in prefix.chars() {
1247 if input_chars.next() != Some(exp) {
1248 return Err(PrefixedUlidParseError::WrongPrefix {
1249 input: input.to_string(),
1250 expected: format!("{prefix}_"),
1251 });
1252 }
1253 }
1254 if input_chars.next() != Some('_') {
1255 return Err(PrefixedUlidParseError::WrongPrefix {
1256 input: input.to_string(),
1257 expected: format!("{prefix}_"),
1258 });
1259 }
1260 let Ok(ulid) = Ulid::from_string(input_chars.as_str()) else {
1261 return Err(PrefixedUlidParseError::CannotParseUlid {
1262 input: input.to_string(),
1263 });
1264 };
1265 Ok(Self {
1266 ulid,
1267 phantom_data: PhantomData,
1268 })
1269 }
1270 }
1271
1272 impl<T> Debug for PrefixedUlid<T> {
1273 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1274 Display::fmt(&self, f)
1275 }
1276 }
1277
1278 impl<T> Hash for PrefixedUlid<T> {
1279 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
1280 Self::prefix().hash(state);
1281 self.ulid.hash(state);
1282 self.phantom_data.hash(state);
1283 }
1284 }
1285
1286 impl<T> PartialEq for PrefixedUlid<T> {
1287 fn eq(&self, other: &Self) -> bool {
1288 self.ulid == other.ulid
1289 }
1290 }
1291
1292 impl<T> Eq for PrefixedUlid<T> {}
1293
1294 impl<T> PartialOrd for PrefixedUlid<T> {
1295 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1296 Some(self.cmp(other))
1297 }
1298 }
1299
1300 impl<T> Ord for PrefixedUlid<T> {
1301 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1302 self.ulid.cmp(&other.ulid)
1303 }
1304 }
1305 }
1306
/// Marker types used as the `T` parameter of `PrefixedUlid`; the type name
/// becomes the textual prefix of the id.
pub mod prefix {
    /// Marker used by `ExecutionIdTopLevel`.
    pub struct E;
    /// Marker used by `ExecutorId`.
    pub struct Exr;
    /// Marker used by `RunId`.
    pub struct Run;
    /// Marker used by `DelayIdTopLevel`.
    pub struct Delay;
}
1313
1314 pub type ExecutorId = PrefixedUlid<prefix::Exr>;
1315 pub type ExecutionIdTopLevel = PrefixedUlid<prefix::E>;
1316 pub type RunId = PrefixedUlid<prefix::Run>;
1317 pub type DelayIdTopLevel = PrefixedUlid<prefix::Delay>; #[cfg(any(test, feature = "test"))]
1320 impl<'a, T> arbitrary::Arbitrary<'a> for PrefixedUlid<T> {
1321 fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
1322 Ok(Self::new(ulid::Ulid::from_parts(
1323 u.arbitrary()?,
1324 u.arbitrary()?,
1325 )))
1326 }
1327 }
1328
/// Identifier of an execution: either a top-level one or one derived from a parent.
///
/// Serialized through `Display` and deserialized through `FromStr`.
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, SerializeDisplay, DeserializeFromStr, Clone)]
pub enum ExecutionId {
    /// Execution identified by a prefixed ULID only.
    TopLevel(ExecutionIdTopLevel),
    /// Execution whose id extends the id of another execution.
    Derived(ExecutionIdDerived),
}
1334
1335 #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, SerializeDisplay, DeserializeFromStr)]
1336 pub struct ExecutionIdDerived {
1337 top_level: ExecutionIdTopLevel,
1338 infix: Arc<str>,
1339 idx: u64,
1340 }
1341 impl ExecutionIdDerived {
1342 #[must_use]
1343 pub fn get_incremented(&self) -> Self {
1344 self.get_incremented_by(1)
1345 }
1346 #[must_use]
1347 pub fn get_incremented_by(&self, count: u64) -> Self {
1348 ExecutionIdDerived {
1349 top_level: self.top_level,
1350 infix: self.infix.clone(),
1351 idx: self.idx + count,
1352 }
1353 }
1354 #[must_use]
1355 pub fn next_level(&self, join_set_id: &JoinSetId) -> ExecutionIdDerived {
1356 let ExecutionIdDerived {
1357 top_level,
1358 infix,
1359 idx,
1360 } = self;
1361 let infix = Arc::from(format!(
1362 "{infix}{EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX}{idx}{EXECUTION_ID_INFIX}{join_set_id}"
1363 ));
1364 ExecutionIdDerived {
1365 top_level: *top_level,
1366 infix,
1367 idx: EXECUTION_ID_START_IDX,
1368 }
1369 }
1370 fn display_or_debug(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1371 let ExecutionIdDerived {
1372 top_level,
1373 infix,
1374 idx,
1375 } = self;
1376 write!(
1377 f,
1378 "{top_level}{EXECUTION_ID_INFIX}{infix}{EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX}{idx}"
1379 )
1380 }
1381
1382 #[must_use]
1383 pub fn split_to_parts(&self) -> (ExecutionId, JoinSetId) {
1384 self.split_to_parts_res().expect("verified in from_str")
1385 }
1386
1387 fn split_to_parts_res(&self) -> Result<(ExecutionId, JoinSetId), DerivedIdSplitError> {
1388 if let Some((old_infix_and_index, join_set_id)) =
1392 self.infix.rsplit_once(EXECUTION_ID_INFIX)
1393 {
1394 let join_set_id = JoinSetId::from_str(join_set_id)
1395 .map_err(DerivedIdSplitError::JoinSetIdParseError)?;
1396 let Some((old_infix, old_idx)) =
1397 old_infix_and_index.rsplit_once(EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX)
1398 else {
1399 return Err(DerivedIdSplitError::CannotFindJoinSetDelimiter);
1400 };
1401 let parent = ExecutionIdDerived {
1402 top_level: self.top_level,
1403 infix: Arc::from(old_infix),
1404 idx: old_idx
1405 .parse()
1406 .map_err(DerivedIdSplitError::CannotParseOldIndex)?,
1407 };
1408 Ok((ExecutionId::Derived(parent), join_set_id))
1409 } else {
1410 Ok((
1411 ExecutionId::TopLevel(self.top_level),
1412 JoinSetId::from_str(&self.infix)
1413 .map_err(DerivedIdSplitError::JoinSetIdParseError)?,
1414 ))
1415 }
1416 }
1417 }
1418
/// Errors produced while splitting a derived id into its parent id and join set id.
#[derive(Debug, thiserror::Error)]
pub enum DerivedIdSplitError {
    /// The join set part is not a valid `JoinSetId`.
    #[error(transparent)]
    JoinSetIdParseError(JoinSetIdParseError),
    /// The parent's index is not a valid `u64`.
    #[error("cannot parse index of parent execution - {0}")]
    CannotParseOldIndex(ParseIntError),
    /// The parent part does not contain the delimiter between join set and index.
    #[error("cannot find join set delimiter")]
    CannotFindJoinSetDelimiter,
}
1428
1429 impl Debug for ExecutionIdDerived {
1430 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1431 self.display_or_debug(f)
1432 }
1433 }
1434 impl Display for ExecutionIdDerived {
1435 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1436 self.display_or_debug(f)
1437 }
1438 }
1439 impl FromStr for ExecutionIdDerived {
1440 type Err = DerivedIdParseError;
1441
1442 fn from_str(input: &str) -> Result<Self, Self::Err> {
1443 let (top_level, infix, idx) = derived_from_str(input)?;
1444 let id = ExecutionIdDerived {
1445 top_level,
1446 infix,
1447 idx,
1448 };
1449 Ok(id)
1450 }
1451 }
1452
1453 fn derived_from_str<T: 'static>(
1454 input: &str,
1455 ) -> Result<(PrefixedUlid<T>, Arc<str>, u64), DerivedIdParseError> {
1456 let (prefix, full_suffix) = input
1458 .split_once(EXECUTION_ID_INFIX)
1459 .ok_or(DerivedIdParseError::FirstDelimiterNotFound)?;
1460
1461 let top_level =
1463 PrefixedUlid::from_str(prefix).map_err(DerivedIdParseError::PrefixedUlidParseError)?;
1464
1465 let mut last_idx = None;
1467 for segment in full_suffix.split(EXECUTION_ID_INFIX) {
1468 if segment.is_empty() {
1469 return Err(DerivedIdParseError::EmptySegment);
1470 }
1471 let (join_set_str, idx_str) = segment
1473 .split_once(EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX)
1474 .ok_or(DerivedIdParseError::SecondDelimiterNotFound)?;
1475 JoinSetId::from_str(join_set_str).map_err(DerivedIdParseError::JoinSetIdParseError)?;
1476 let idx = u64::from_str(idx_str).map_err(DerivedIdParseError::ParseIndexError)?;
1477 last_idx = Some((idx, idx_str));
1478 }
1479 let (idx, idx_str) = last_idx.expect("split must have returned at least one element");
1480 let infix = full_suffix
1481 .strip_suffix(idx_str)
1482 .expect("must have ended with index");
1483 let infix = infix
1484 .strip_suffix(EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX)
1485 .expect("must have ended with `_index`");
1486 Ok((top_level, Arc::from(infix), idx))
1487 }
1488
/// Generates the first child of an arbitrary top-level execution in an arbitrary join set.
#[cfg(any(test, feature = "test"))]
impl<'a> arbitrary::Arbitrary<'a> for ExecutionIdDerived {
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        // The parent is drawn before the join set id.
        let parent = ExecutionIdTopLevel::arbitrary(u).map(ExecutionId::TopLevel)?;
        JoinSetId::arbitrary(u).map(|join_set_id| parent.next_level(&join_set_id))
    }
}
1497
/// Errors returned when parsing a derived id (`ExecutionIdDerived`, `DelayId`) from a string.
#[derive(Debug, thiserror::Error)]
pub enum DerivedIdParseError {
    /// The part before the first `EXECUTION_ID_INFIX` is not a valid prefixed ULID.
    #[error(transparent)]
    PrefixedUlidParseError(PrefixedUlidParseError),
    /// The input does not contain `EXECUTION_ID_INFIX` at all.
    #[error("cannot parse derived id - delimiter `{EXECUTION_ID_INFIX}` not found")]
    FirstDelimiterNotFound,
    /// A segment does not contain the delimiter between join set and index.
    #[error(
        "cannot parse derived id - delimiter `{EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX}` not found"
    )]
    SecondDelimiterNotFound,
    /// The index of a segment is not a valid `u64`.
    #[error(
        "cannot parse derived id - suffix after `{EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX}` must be a number"
    )]
    ParseIndexError(ParseIntError),
    /// Wraps a `DerivedIdSplitError`.
    #[error(transparent)]
    DerivedIdSplitError(DerivedIdSplitError),
    /// The join set part of a segment is not a valid `JoinSetId`.
    #[error(transparent)]
    JoinSetIdParseError(JoinSetIdParseError),
    /// Two delimiters follow each other, or the input ends with a delimiter.
    #[error("empty segment")]
    EmptySegment,
}
1519
1520 impl ExecutionId {
1521 #[must_use]
1522 pub fn generate() -> Self {
1523 ExecutionId::TopLevel(PrefixedUlid::generate())
1524 }
1525
1526 #[must_use]
1527 pub fn get_top_level(&self) -> ExecutionIdTopLevel {
1528 match &self {
1529 ExecutionId::TopLevel(prefixed_ulid) => *prefixed_ulid,
1530 ExecutionId::Derived(ExecutionIdDerived { top_level, .. }) => *top_level,
1531 }
1532 }
1533
1534 #[must_use]
1535 pub fn is_top_level(&self) -> bool {
1536 matches!(self, ExecutionId::TopLevel(_))
1537 }
1538
1539 #[must_use]
1540 pub fn random_seed(&self) -> u64 {
1541 let mut hasher = fxhash::FxHasher::default();
1542 #[expect(clippy::cast_possible_truncation)]
1546 let random_part = self.get_top_level().random_part() as u64;
1547 hasher.write_u64(random_part);
1548 hasher.write_u64(self.get_top_level().timestamp_part());
1549 if let ExecutionId::Derived(ExecutionIdDerived {
1550 top_level: _,
1551 infix,
1552 idx,
1553 }) = self
1554 {
1555 hasher.write(infix.as_bytes());
1557 hasher.write_u64(*idx);
1558 }
1559 hasher.finish()
1560 }
1561
1562 #[must_use]
1563 pub const fn from_parts(timestamp_ms: u64, random_part: u128) -> Self {
1564 ExecutionId::TopLevel(ExecutionIdTopLevel::from_parts(timestamp_ms, random_part))
1565 }
1566
1567 #[must_use]
1568 pub fn next_level(&self, join_set_id: &JoinSetId) -> ExecutionIdDerived {
1569 match &self {
1570 ExecutionId::TopLevel(top_level) => ExecutionIdDerived {
1571 top_level: *top_level,
1572 infix: Arc::from(join_set_id.to_string()),
1573 idx: EXECUTION_ID_START_IDX,
1574 },
1575 ExecutionId::Derived(derived) => derived.next_level(join_set_id),
1576 }
1577 }
1578
1579 fn display_or_debug(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1580 match &self {
1581 ExecutionId::TopLevel(top_level) => Display::fmt(top_level, f),
1582 ExecutionId::Derived(derived) => Display::fmt(derived, f),
1583 }
1584 }
1585 }
1586
/// Index assigned to the first child execution created by `next_level`.
const EXECUTION_ID_START_IDX: u64 = 1;
/// Start index for join sets.
/// NOTE(review): not used in this part of the file; presumably consumed by callers — confirm.
pub const JOIN_SET_START_IDX: u64 = 1;
/// Index used by `DelayId::new`.
const DELAY_ID_START_IDX: u64 = 1;
1590
1591 #[derive(Debug, thiserror::Error)]
1592 pub enum ExecutionIdParseError {
1593 #[error(transparent)]
1594 PrefixedUlidParseError(PrefixedUlidParseError),
1595 #[error(transparent)]
1596 DerivedIdParseError(DerivedIdParseError),
1597 }
1598
1599 impl FromStr for ExecutionId {
1600 type Err = ExecutionIdParseError;
1601
1602 fn from_str(input: &str) -> Result<Self, Self::Err> {
1603 if input.contains(EXECUTION_ID_INFIX) {
1604 ExecutionIdDerived::from_str(input)
1605 .map(ExecutionId::Derived)
1606 .map_err(ExecutionIdParseError::DerivedIdParseError)
1607 } else {
1608 Ok(ExecutionId::TopLevel(
1609 PrefixedUlid::from_str(input)
1610 .map_err(ExecutionIdParseError::PrefixedUlidParseError)?,
1611 ))
1612 }
1613 }
1614 }
1615
1616 #[derive(Debug, thiserror::Error)]
1617 pub enum ExecutionIdStructuralParseError {
1618 #[error(transparent)]
1619 ExecutionIdParseError(#[from] ExecutionIdParseError),
1620 #[error("execution-id must be a record with `id` field of type string")]
1621 TypeError,
1622 }
1623
1624 impl TryFrom<&wasmtime::component::Val> for ExecutionId {
1625 type Error = ExecutionIdStructuralParseError;
1626
1627 fn try_from(execution_id: &wasmtime::component::Val) -> Result<Self, Self::Error> {
1628 if let wasmtime::component::Val::Record(key_vals) = execution_id
1629 && key_vals.len() == 1
1630 && let Some((key, execution_id)) = key_vals.first()
1631 && key == "id"
1632 && let wasmtime::component::Val::String(execution_id) = execution_id
1633 {
1634 ExecutionId::from_str(execution_id)
1635 .map_err(ExecutionIdStructuralParseError::ExecutionIdParseError)
1636 } else {
1637 Err(ExecutionIdStructuralParseError::TypeError)
1638 }
1639 }
1640 }
1641
1642 impl Debug for ExecutionId {
1643 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1644 self.display_or_debug(f)
1645 }
1646 }
1647
1648 impl Display for ExecutionId {
1649 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1650 self.display_or_debug(f)
1651 }
1652 }
1653
1654 #[cfg(any(test, feature = "test"))]
1655 impl<'a> arbitrary::Arbitrary<'a> for ExecutionId {
1656 fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
1657 Ok(ExecutionId::TopLevel(PrefixedUlid::arbitrary(u)?))
1658 }
1659 }
1660
1661 #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, SerializeDisplay, DeserializeFromStr)]
1663 pub struct DelayId {
1664 top_level: DelayIdTopLevel,
1665 infix: Arc<str>,
1666 idx: u64,
1667 }
1668 impl DelayId {
1669 #[must_use]
1670 pub fn new(execution_id: &ExecutionId, join_set_id: &JoinSetId) -> DelayId {
1671 Self::new_with_index(execution_id, join_set_id, DELAY_ID_START_IDX)
1672 }
1673
1674 #[must_use]
1675 pub fn new_with_index(
1676 execution_id: &ExecutionId,
1677 join_set_id: &JoinSetId,
1678 idx: u64,
1679 ) -> DelayId {
1680 let ExecutionIdDerived {
1681 top_level: PrefixedUlid { ulid, .. },
1682 infix,
1683 idx: _,
1684 } = execution_id.next_level(join_set_id);
1685 let top_level = DelayIdTopLevel::new(ulid);
1686 DelayId {
1687 top_level,
1688 infix,
1689 idx,
1690 }
1691 }
1692
1693 #[must_use]
1694 pub fn get_incremented(&self) -> Self {
1695 Self {
1696 top_level: self.top_level,
1697 infix: self.infix.clone(),
1698 idx: self.idx + 1,
1699 }
1700 }
1701
1702 #[must_use]
1703 pub fn split_to_parts(&self) -> (ExecutionId, JoinSetId) {
1704 self.split_to_parts_res().expect("verified in from_str")
1705 }
1706
1707 fn split_to_parts_res(&self) -> Result<(ExecutionId, JoinSetId), DerivedIdSplitError> {
1708 if let Some((old_infix_and_index, join_set_id)) =
1712 self.infix.rsplit_once(EXECUTION_ID_INFIX)
1713 {
1714 let join_set_id = JoinSetId::from_str(join_set_id)
1715 .map_err(DerivedIdSplitError::JoinSetIdParseError)?;
1716 let Some((old_infix, old_idx)) =
1717 old_infix_and_index.rsplit_once(EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX)
1718 else {
1719 return Err(DerivedIdSplitError::CannotFindJoinSetDelimiter);
1720 };
1721 let parent = ExecutionIdDerived {
1722 top_level: ExecutionIdTopLevel::new(self.top_level.ulid),
1723 infix: Arc::from(old_infix),
1724 idx: old_idx
1725 .parse()
1726 .map_err(DerivedIdSplitError::CannotParseOldIndex)?,
1727 };
1728 Ok((ExecutionId::Derived(parent), join_set_id))
1729 } else {
1730 Ok((
1731 ExecutionId::TopLevel(ExecutionIdTopLevel::new(self.top_level.ulid)),
1732 JoinSetId::from_str(&self.infix)
1733 .map_err(DerivedIdSplitError::JoinSetIdParseError)?,
1734 ))
1735 }
1736 }
1737
1738 fn display_or_debug(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1739 let DelayId {
1740 top_level,
1741 infix,
1742 idx,
1743 } = self;
1744 write!(
1745 f,
1746 "{top_level}{EXECUTION_ID_INFIX}{infix}{EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX}{idx}"
1747 )
1748 }
1749 }
1750
1751 pub mod delay_impl {
1752 use super::{DelayId, DerivedIdParseError, derived_from_str};
1753 use std::{
1754 fmt::{Debug, Display},
1755 str::FromStr,
1756 };
1757
1758 impl Debug for DelayId {
1759 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1760 self.display_or_debug(f)
1761 }
1762 }
1763
1764 impl Display for DelayId {
1765 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1766 self.display_or_debug(f)
1767 }
1768 }
1769
1770 impl FromStr for DelayId {
1771 type Err = DerivedIdParseError;
1772
1773 fn from_str(input: &str) -> Result<Self, Self::Err> {
1774 let (top_level, infix, idx) = derived_from_str(input)?;
1775 let id = DelayId {
1776 top_level,
1777 infix,
1778 idx,
1779 };
1780 Ok(id)
1781 }
1782 }
1783
1784 #[cfg(any(test, feature = "test"))]
1785 impl<'a> arbitrary::Arbitrary<'a> for DelayId {
1786 fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
1787 use super::{ExecutionId, JoinSetId};
1788 let execution_id = ExecutionId::arbitrary(u)?;
1789 let mut join_set_id = JoinSetId::arbitrary(u)?;
1790 join_set_id.kind = crate::JoinSetKind::OneOff;
1791 Ok(DelayId::new(&execution_id, &join_set_id))
1792 }
1793 }
1794 }
1795}
1796
1797#[derive(
1798 Debug,
1799 Clone,
1800 PartialEq,
1801 Eq,
1802 Hash,
1803 derive_more::Display,
1804 serde_with::SerializeDisplay,
1805 serde_with::DeserializeFromStr,
1806)]
1807#[non_exhaustive] #[display("{kind}{JOIN_SET_ID_INFIX}{name}")]
1809pub struct JoinSetId {
1810 pub kind: JoinSetKind,
1811 pub name: StrVariant,
1812}
1813
1814impl JoinSetId {
1815 pub fn new(kind: JoinSetKind, name: StrVariant) -> Result<Self, InvalidNameError<JoinSetId>> {
1816 Ok(Self {
1817 kind,
1818 name: check_name(name, CHARSET_EXTRA_JSON_SET)?,
1819 })
1820 }
1821}
1822
/// ASCII letters (lower and upper case) followed by the ASCII digits.
pub const CHARSET_ALPHANUMERIC: &str =
    "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
1825
1826#[derive(
1827 Debug,
1828 Clone,
1829 Copy,
1830 PartialEq,
1831 Eq,
1832 Hash,
1833 derive_more::Display,
1834 Serialize,
1835 Deserialize,
1836 strum::EnumIter,
1837)]
1838#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
1839#[display("{}", self.as_code())]
1840pub enum JoinSetKind {
1841 OneOff,
1842 Named,
1843 Generated,
1844}
1845impl JoinSetKind {
1846 fn as_code(&self) -> &'static str {
1847 match self {
1848 JoinSetKind::OneOff => "o",
1849 JoinSetKind::Named => "n",
1850 JoinSetKind::Generated => "g",
1851 }
1852 }
1853}
1854impl FromStr for JoinSetKind {
1855 type Err = &'static str;
1856 fn from_str(s: &str) -> Result<Self, Self::Err> {
1857 use strum::IntoEnumIterator;
1858 Self::iter()
1859 .find(|variant| s == variant.as_code())
1860 .ok_or("unknown join set kind")
1861 }
1862}
1863
/// Separator between the top-level id and each further level of a derived id.
pub(crate) const EXECUTION_ID_INFIX: char = '.';
/// Separator between a join set id and the index that follows it.
pub(crate) const EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX: char = '_';

/// Separator between the kind code and the name of a `JoinSetId`.
const JOIN_SET_ID_INFIX: char = ':';
/// Extra characters passed to `check_name` when validating join set names.
/// NOTE(review): "JSON" in the name looks like a typo for "JOIN"; left unchanged.
const CHARSET_EXTRA_JSON_SET: &str = "-/";
1869
1870impl FromStr for JoinSetId {
1871 type Err = JoinSetIdParseError;
1872
1873 fn from_str(input: &str) -> Result<Self, Self::Err> {
1874 let Some((kind, name)) = input.split_once(JOIN_SET_ID_INFIX) else {
1875 return Err(JoinSetIdParseError::WrongParts);
1876 };
1877 let kind = kind
1878 .parse()
1879 .map_err(JoinSetIdParseError::JoinSetKindParseError)?;
1880 JoinSetId::new(kind, StrVariant::from(name.to_string()))
1881 .map_err(JoinSetIdParseError::InvalidName)
1882 }
1883}
1884
1885#[derive(Debug, thiserror::Error)]
1886pub enum JoinSetIdParseError {
1887 #[error("join set must consist of three parts separated by {JOIN_SET_ID_INFIX} ")]
1888 WrongParts,
1889 #[error("cannot parse join set kind - {0}")]
1890 JoinSetKindParseError(&'static str),
1891 #[error("cannot parse join set id - {0}")]
1892 InvalidName(InvalidNameError<JoinSetId>),
1893}
1894
/// Characters used when generating arbitrary join set names.
#[cfg(any(test, feature = "test"))]
const CHARSET_JOIN_SET_NAME: &str =
    const_format::concatcp!(CHARSET_ALPHANUMERIC, CHARSET_EXTRA_JSON_SET);

/// Generates a `Named` join set id with a name of 1 to 11 characters drawn
/// from `CHARSET_JOIN_SET_NAME`.
#[cfg(any(test, feature = "test"))]
impl<'a> arbitrary::Arbitrary<'a> for JoinSetId {
    fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
        // Errors from `Unstructured` are propagated instead of panicking.
        let length_inclusive = u.int_in_range(0..=10)?;
        // The inclusive range yields `length_inclusive + 1` characters, so the
        // name is never empty.
        let name = (0..=length_inclusive)
            .map(|_| -> arbitrary::Result<char> {
                let idx = u.choose_index(CHARSET_JOIN_SET_NAME.len())?;
                Ok(CHARSET_JOIN_SET_NAME
                    .chars()
                    .nth(idx)
                    .expect("idx is < charset.len()"))
            })
            .collect::<arbitrary::Result<String>>()?;

        Ok(JoinSetId::new(JoinSetKind::Named, StrVariant::from(name))
            .expect("generated name uses only characters from the join set charset"))
    }
}
1917
/// Name of the variant case that `ReturnType::detect` looks up in a `result`'s error variant.
const EXECUTION_FAILED_STRING_OR_VARIANT: &str = "execution-failed";
/// Classification of a function's return type, produced by `ReturnType::detect`.
#[derive(
    Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, derive_more::Display,
)]
pub enum ReturnType {
    /// A `result` whose error part is absent, a string, or a variant that
    /// contains a payload-less `execution-failed` case.
    Extendable(ReturnTypeExtendable),
    /// Any other return type.
    NonExtendable(ReturnTypeNonExtendable),
}
1926impl ReturnType {
1927 #[must_use]
1935 pub fn detect(type_wrapper: TypeWrapper, wit_type: StrVariant) -> ReturnType {
1936 if let TypeWrapper::Result { ok, err: None } = type_wrapper {
1937 return ReturnType::Extendable(ReturnTypeExtendable {
1938 type_wrapper_tl: TypeWrapperTopLevel { ok, err: None },
1939 wit_type,
1940 });
1941 } else if let TypeWrapper::Result { ok, err: Some(err) } = type_wrapper {
1942 if let TypeWrapper::String = err.as_ref() {
1943 return ReturnType::Extendable(ReturnTypeExtendable {
1944 type_wrapper_tl: TypeWrapperTopLevel { ok, err: Some(err) },
1945 wit_type,
1946 });
1947 } else if let TypeWrapper::Variant(fields) = err.as_ref()
1948 && let Some(None) = fields.get(EXECUTION_FAILED_STRING_OR_VARIANT)
1949 {
1950 return ReturnType::Extendable(ReturnTypeExtendable {
1951 type_wrapper_tl: TypeWrapperTopLevel { ok, err: Some(err) },
1952 wit_type,
1953 });
1954 }
1955 return ReturnType::NonExtendable(ReturnTypeNonExtendable {
1956 type_wrapper: TypeWrapper::Result { ok, err: Some(err) },
1957 wit_type,
1958 });
1959 }
1960 ReturnType::NonExtendable(ReturnTypeNonExtendable {
1961 type_wrapper: type_wrapper.clone(),
1962 wit_type,
1963 })
1964 }
1965
1966 #[must_use]
1967 pub fn wit_type(&self) -> &str {
1968 match self {
1969 ReturnType::Extendable(compatible) => compatible.wit_type.as_ref(),
1970 ReturnType::NonExtendable(incompatible) => incompatible.wit_type.as_ref(),
1971 }
1972 }
1973
1974 #[must_use]
1975 pub fn type_wrapper(&self) -> TypeWrapper {
1976 match self {
1977 ReturnType::Extendable(compatible) => {
1978 TypeWrapper::from(compatible.type_wrapper_tl.clone())
1979 }
1980 ReturnType::NonExtendable(incompatible) => incompatible.type_wrapper.clone(),
1981 }
1982 }
1983}
1984
/// Return type that `ReturnType::detect` classified as non-extendable.
/// Displayed as its WIT type text.
#[derive(
    Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, derive_more::Display,
)]
#[display("{wit_type}")]
pub struct ReturnTypeNonExtendable {
    pub type_wrapper: TypeWrapper,
    pub wit_type: StrVariant,
}

/// Return type that `ReturnType::detect` classified as extendable: a `result`
/// stored as its `ok` and `err` parts. Displayed as its WIT type text.
#[derive(
    Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, derive_more::Display,
)]
#[display("{wit_type}")]
pub struct ReturnTypeExtendable {
    pub type_wrapper_tl: TypeWrapperTopLevel,
    pub wit_type: StrVariant,
}
2002
/// Test-only return type: an extendable `result` with neither `ok` nor `err` part.
#[cfg(any(test, feature = "test"))]
pub const RETURN_TYPE_DUMMY: ReturnType = ReturnType::Extendable(ReturnTypeExtendable {
    type_wrapper_tl: TypeWrapperTopLevel {
        ok: None,
        err: None,
    },
    wit_type: StrVariant::Static("result"),
});
2011
/// Type of a single function parameter. Displayed as `{name}: {wit_type}`.
///
/// Equality ignores `name`: the field is skipped in the derived `PartialEq`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Eq, derive_more::Display)]
#[derive_where::derive_where(PartialEq)]
#[display("{name}: {wit_type}")]
pub struct ParameterType {
    pub type_wrapper: TypeWrapper,
    // Not part of equality.
    #[derive_where(skip)]
    pub name: StrVariant,
    pub wit_type: StrVariant,
}
2022
2023#[derive(
2024 Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, Default, derive_more::Deref,
2025)]
2026pub struct ParameterTypes(pub Vec<ParameterType>);
2027
2028impl Debug for ParameterTypes {
2029 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2030 write!(f, "(")?;
2031 let mut iter = self.0.iter().peekable();
2032 while let Some(p) = iter.next() {
2033 write!(f, "{p:?}")?;
2034 if iter.peek().is_some() {
2035 write!(f, ", ")?;
2036 }
2037 }
2038 write!(f, ")")
2039 }
2040}
2041
2042impl Display for ParameterTypes {
2043 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2044 write!(f, "(")?;
2045 let mut iter = self.0.iter().peekable();
2046 while let Some(p) = iter.next() {
2047 write!(f, "{p}")?;
2048 if iter.peek().is_some() {
2049 write!(f, ", ")?;
2050 }
2051 }
2052 write!(f, ")")
2053 }
2054}
2055
/// Functions of one interface, keyed by function name.
#[derive(Debug, Clone)]
pub struct PackageIfcFns {
    pub ifc_fqn: IfcFqnName,
    // NOTE(review): presumably marks generated extension interfaces (cf. the
    // `SUFFIX_PKG_*` constants) — confirm.
    pub extension: bool,
    pub fns: IndexMap<FnName, FunctionMetadata>,
}
2062
/// Retry settings of a component.
// NOTE(review): `max_retries: None` presumably means "no upper bound" — confirm against the executor.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct ComponentRetryConfig {
    pub max_retries: Option<u32>,
    pub retry_exp_backoff: Duration,
}
impl ComponentRetryConfig {
    /// Zero retries and a zero backoff.
    pub const ZERO: ComponentRetryConfig = ComponentRetryConfig {
        max_retries: Some(0),
        retry_exp_backoff: Duration::ZERO,
    };

    /// Configuration available with the `test` feature: `max_retries` unset, zero backoff.
    #[cfg(feature = "test")]
    pub const WORKFLOW_TEST: ComponentRetryConfig = ComponentRetryConfig {
        max_retries: None,
        retry_exp_backoff: Duration::ZERO,
    };
}
2080
2081pub trait FunctionRegistry: Send + Sync {
2083 fn get_by_exported_function(
2084 &self,
2085 ffqn: &FunctionFqn,
2086 ) -> Option<(FunctionMetadata, ComponentId)>;
2087
2088 fn get_ret_type(&self, ffqn: &FunctionFqn) -> Option<TypeWrapperTopLevel> {
2091 self.get_by_exported_function(ffqn)
2092 .and_then(|(fn_meta, _)| {
2093 if let ReturnType::Extendable(ReturnTypeExtendable {
2094 type_wrapper_tl: type_wrapper,
2095 wit_type: _,
2096 }) = fn_meta.return_type
2097 {
2098 Some(type_wrapper)
2099 } else {
2100 None
2101 }
2102 })
2103 }
2104
2105 fn all_exports(&self) -> &[PackageIfcFns];
2106}
2107
/// Optional string key/value map attached to an execution; `None` is the empty state.
///
/// `Display` prints the `Debug` form of the inner option.
#[derive(Debug, Default, Clone, Serialize, Deserialize, derive_more::Display, PartialEq, Eq)]
#[display("{_0:?}")]
pub struct ExecutionMetadata(Option<hashbrown::HashMap<String, String>>);
2111
2112impl ExecutionMetadata {
2113 const LINKED_KEY: &str = "obelisk-tracing-linked";
2114 #[must_use]
2115 pub const fn empty() -> Self {
2116 Self(None)
2118 }
2119
2120 #[must_use]
2121 pub fn from_parent_span(less_specific: &Span) -> Self {
2122 ExecutionMetadata::create(less_specific, false)
2123 }
2124
2125 #[must_use]
2126 pub fn from_linked_span(less_specific: &Span) -> Self {
2127 ExecutionMetadata::create(less_specific, true)
2128 }
2129
2130 #[must_use]
2135 #[expect(clippy::items_after_statements)]
2136 fn create(span: &Span, link_marker: bool) -> Self {
2137 use tracing_opentelemetry::OpenTelemetrySpanExt as _;
2138 let mut metadata = Self(Some(hashbrown::HashMap::default()));
2139 let mut metadata_view = ExecutionMetadataInjectorView {
2140 metadata: &mut metadata,
2141 };
2142 fn inject(s: &Span, metadata_view: &mut ExecutionMetadataInjectorView) {
2144 opentelemetry::global::get_text_map_propagator(|propagator| {
2145 propagator.inject_context(&s.context(), metadata_view);
2146 });
2147 }
2148 inject(&Span::current(), &mut metadata_view);
2149 if metadata_view.is_empty() {
2150 inject(span, &mut metadata_view);
2152 }
2153 if link_marker {
2154 metadata_view.set(Self::LINKED_KEY, String::new());
2155 }
2156 metadata
2157 }
2158
2159 pub fn enrich(&self, span: &Span) {
2160 use opentelemetry::trace::TraceContextExt as _;
2161 use tracing_opentelemetry::OpenTelemetrySpanExt as _;
2162
2163 let metadata_view = ExecutionMetadataExtractorView { metadata: self };
2164 let otel_context = opentelemetry::global::get_text_map_propagator(|propagator| {
2165 propagator.extract(&metadata_view)
2166 });
2167 if metadata_view.get(Self::LINKED_KEY).is_some() {
2168 let linked_span_context = otel_context.span().span_context().clone();
2169 span.add_link(linked_span_context);
2170 } else {
2171 let _ = span.set_parent(otel_context);
2172 }
2173 }
2174}
2175
2176struct ExecutionMetadataInjectorView<'a> {
2177 metadata: &'a mut ExecutionMetadata,
2178}
2179
2180impl ExecutionMetadataInjectorView<'_> {
2181 fn is_empty(&self) -> bool {
2182 self.metadata
2183 .0
2184 .as_ref()
2185 .is_some_and(hashbrown::HashMap::is_empty)
2186 }
2187}
2188
2189impl opentelemetry::propagation::Injector for ExecutionMetadataInjectorView<'_> {
2190 fn set(&mut self, key: &str, value: String) {
2191 let key = format!("tracing:{key}");
2192 let map = if let Some(map) = self.metadata.0.as_mut() {
2193 map
2194 } else {
2195 self.metadata.0 = Some(hashbrown::HashMap::new());
2196 assert_matches!(&mut self.metadata.0, Some(map) => map)
2197 };
2198 map.insert(key, value);
2199 }
2200}
2201
2202struct ExecutionMetadataExtractorView<'a> {
2203 metadata: &'a ExecutionMetadata,
2204}
2205
2206impl opentelemetry::propagation::Extractor for ExecutionMetadataExtractorView<'_> {
2207 fn get(&self, key: &str) -> Option<&str> {
2208 self.metadata
2209 .0
2210 .as_ref()
2211 .and_then(|map| map.get(&format!("tracing:{key}")))
2212 .map(std::string::String::as_str)
2213 }
2214
2215 fn keys(&self) -> Vec<&str> {
2216 match &self.metadata.0.as_ref() {
2217 Some(map) => map
2218 .keys()
2219 .filter_map(|key| key.strip_prefix("tracing:"))
2220 .collect(),
2221 None => vec![],
2222 }
2223 }
2224}
2225
/// Unit tests for identifier parsing/formatting, hashing stability and serde
/// round trips of the types defined in this crate root.
#[cfg(test)]
mod tests {

    use rstest::rstest;

    use crate::{
        ExecutionId, FunctionFqn, JoinSetId, JoinSetKind, StrVariant, prefixed_ulid::ExecutorId,
    };
    use std::{
        hash::{DefaultHasher, Hash, Hasher},
        str::FromStr,
        sync::Arc,
    };

    /// A generated `ExecutorId` must survive a `to_string` / `parse` round trip.
    #[test]
    fn ulid_parsing() {
        let generated = ExecutorId::generate();
        let serialized = generated.to_string();
        let parsed = serialized.parse().unwrap();
        assert_eq!(generated, parsed);
    }

    /// A generated `ExecutionId` must survive a `to_string` / `parse` round trip.
    #[test]
    fn execution_id_parsing_top_level() {
        let generated = ExecutionId::generate();
        let serialized = generated.to_string();
        let parsed = serialized.parse().unwrap();
        assert_eq!(generated, parsed);
    }

    /// The first child in a named join set renders as `<parent>.n:name_1` and parses back.
    #[test]
    fn execution_id_with_one_level_should_parse() {
        let top_level = ExecutionId::generate();
        let join_set_id = JoinSetId::new(JoinSetKind::Named, StrVariant::Static("name")).unwrap();
        let first_child = ExecutionId::Derived(top_level.next_level(&join_set_id));
        let ser = first_child.to_string();
        assert_eq!(format!("{top_level}.n:name_1"), ser);
        let parsed = ExecutionId::from_str(&ser).unwrap();
        assert_eq!(first_child, parsed);
    }

    /// Incrementing the first child renders as `<parent>.n:name_2` and parses back.
    #[test]
    fn execution_id_increment_twice() {
        let top_level = ExecutionId::generate();
        let join_set_id = JoinSetId::new(JoinSetKind::Named, StrVariant::Static("name")).unwrap();
        let first_child = top_level.next_level(&join_set_id);
        let second_child = ExecutionId::Derived(first_child.get_incremented());
        let ser = second_child.to_string();
        assert_eq!(format!("{top_level}.n:name_2"), ser);
        let parsed = ExecutionId::from_str(&ser).unwrap();
        assert_eq!(second_child, parsed);
    }

    /// Two nested levels (generated, then one-off join set) render as
    /// `<parent>.g:gg_2.o:oo_2` and parse back.
    #[test]
    fn execution_id_next_level_twice() {
        let top_level = ExecutionId::generate();
        let join_set_id_outer =
            JoinSetId::new(JoinSetKind::Generated, StrVariant::Static("gg")).unwrap();
        let join_set_id_inner =
            JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("oo")).unwrap();
        let execution_id = ExecutionId::Derived(
            top_level
                .next_level(&join_set_id_outer)
                .get_incremented()
                .next_level(&join_set_id_inner)
                .get_incremented(),
        );
        let ser = execution_id.to_string();
        assert_eq!(format!("{top_level}.g:gg_2.o:oo_2"), ser);
        let parsed = ExecutionId::from_str(&ser).unwrap();
        assert_eq!(execution_id, parsed);
    }

    /// Splitting a first-level derived id returns the top-level id and the join set id.
    #[test]
    fn execution_id_split_first_level() {
        let top_level = ExecutionId::generate();
        let join_set_id =
            JoinSetId::new(JoinSetKind::Generated, StrVariant::Static("some")).unwrap();
        let execution_id = top_level.next_level(&join_set_id);
        let (actual_top_level, actual_join_set) = execution_id.split_to_parts();
        assert_eq!(top_level, actual_top_level);
        assert_eq!(join_set_id, actual_join_set);
    }

    /// Splitting a second-level derived id returns the first-level id (for both tested
    /// outer increments) and the inner join set id.
    #[rstest]
    fn execution_id_split_second_level(#[values(0, 1)] outer_idx: u64) {
        let top_level = ExecutionId::generate();
        let join_set_id_outer =
            JoinSetId::new(JoinSetKind::Generated, StrVariant::Static("some")).unwrap();
        let first_level = top_level
            .next_level(&join_set_id_outer)
            .get_incremented_by(outer_idx);

        let join_set_id_inner =
            JoinSetId::new(JoinSetKind::Generated, StrVariant::Static("other")).unwrap();
        let second_level = first_level.next_level(&join_set_id_inner);

        let (actual_first_level, actual_join_set) = second_level.split_to_parts();
        assert_eq!(ExecutionId::Derived(first_level), actual_first_level);
        assert_eq!(join_set_id_inner, actual_join_set);
    }

    /// A malformed derived id (`.g:gg._2`) must be rejected by the parser.
    #[test]
    fn invalid_execution_id_should_fail_to_parse() {
        ExecutionId::from_str("E_01KBNHM5FQW81KP5Y3XMNX31K4.g:gg._2.o:oo_2").unwrap_err();
    }

    /// Snapshots the `random_seed` values of a fixed parent, two siblings and a nested
    /// child, and checks that the four values are pairwise distinct.
    #[test]
    fn execution_id_hash_should_be_stable() {
        let parent = ExecutionId::from_parts(1, 2);
        let join_set_id = JoinSetId::new(JoinSetKind::Named, StrVariant::Static("name")).unwrap();
        let sibling_1 = parent.next_level(&join_set_id);
        let sibling_2 = ExecutionId::Derived(sibling_1.get_incremented());
        let sibling_1 = ExecutionId::Derived(sibling_1);
        let join_set_id_inner =
            JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("oo")).unwrap();
        let child =
            ExecutionId::Derived(sibling_1.next_level(&join_set_id_inner).get_incremented());
        let parent = parent.random_seed();
        let sibling_1 = sibling_1.random_seed();
        let sibling_2 = sibling_2.random_seed();
        let child = child.random_seed();
        // The binding name `vec` is kept as-is: it is the expression passed to the snapshot macro.
        let vec = vec![parent, sibling_1, sibling_2, child];
        insta::assert_debug_snapshot!(vec);
        let set: hashbrown::HashSet<_> = vec.into_iter().collect();
        assert_eq!(4, set.len());
    }

    /// `StrVariant::Arc` and `StrVariant::Static` holding the same text must compare
    /// equal and hash to the same value.
    #[test]
    fn hash_of_str_variants_should_be_equal() {
        let input = "foo";
        let left = StrVariant::Arc(Arc::from(input));
        let right = StrVariant::Static(input);
        assert_eq!(left, right);
        let mut left_hasher = DefaultHasher::new();
        left.hash(&mut left_hasher);
        let mut right_hasher = DefaultHasher::new();
        right.hash(&mut right_hasher);
        let left_hash = left_hasher.finish();
        let right_hash = right_hasher.finish();
        println!("left: {left_hash:x}, right: {right_hash:x}");
        assert_eq!(left_hash, right_hash);
    }

    /// Building an FFQN from an (interface, function) tuple keeps the interface version.
    #[test]
    fn ffqn_from_tuple_with_version_should_work() {
        let ffqn = FunctionFqn::try_from_tuple("wasi:cli/run@0.2.0", "run").unwrap();
        assert_eq!(FunctionFqn::new_static("wasi:cli/run@0.2.0", "run"), ffqn);
    }

    /// Parsing `wasi:cli/run@0.2.0.run` splits off the function name after the version.
    #[test]
    fn ffqn_from_str_with_version_should_work() {
        let ffqn = FunctionFqn::from_str("wasi:cli/run@0.2.0.run").unwrap();
        assert_eq!(FunctionFqn::new_static("wasi:cli/run@0.2.0", "run"), ffqn);
    }

    /// A `JoinSetId` of every `JoinSetKind` must survive a JSON round trip.
    ///
    /// This is a plain synchronous test: nothing in the body awaits, so no async
    /// runtime is needed.
    #[test]
    fn join_set_serde_should_be_consistent() {
        use strum::IntoEnumIterator;
        for kind in JoinSetKind::iter() {
            let join_set_id = JoinSetId::new(kind, StrVariant::from("name")).unwrap();
            let ser = serde_json::to_string(&join_set_id).unwrap();
            let deser = serde_json::from_str(&ser).unwrap();
            assert_eq!(join_set_id, deser);
        }
    }
}