1pub mod component_id;
2mod error_conversions;
3#[cfg(feature = "postgres")]
4mod postgres_ext;
5#[cfg(feature = "rusqlite")]
6mod rusqlite_ext;
7pub mod storage;
8pub mod time;
9
10pub use crate::component_id::{
11 ComponentId, ComponentType, ContentDigest, InvalidNameError, check_name,
12};
13use ::serde::{Deserialize, Serialize};
14use assert_matches::assert_matches;
15pub use indexmap;
16use indexmap::IndexMap;
17use opentelemetry::propagation::{Extractor, Injector};
18pub use prefixed_ulid::ExecutionId;
19use serde_json::Value;
20use std::{
21 borrow::Borrow,
22 fmt::{Debug, Display, Write},
23 hash::Hash,
24 marker::PhantomData,
25 ops::Deref,
26 str::FromStr,
27 sync::Arc,
28 time::Duration,
29};
30use storage::{PendingStateFinishedError, PendingStateFinishedResultKind};
31use tracing::{Span, error};
32use val_json::{
33 type_wrapper::{TypeConversionError, TypeWrapper},
34 wast_val::{WastVal, WastValWithType},
35 wast_val_ser::params,
36};
37use wasmtime::component::{Type, Val};
38
39pub const NAMESPACE_OBELISK: &str = "obelisk";
40const NAMESPACE_WASI: &str = "wasi";
41pub const SUFFIX_PKG_EXT: &str = "-obelisk-ext";
42pub const SUFFIX_PKG_SCHEDULE: &str = "-obelisk-schedule";
43pub const SUFFIX_PKG_STUB: &str = "-obelisk-stub";
44
45#[derive(thiserror::Error, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
46#[error("execution error: {kind}")]
47pub struct FinishedExecutionError {
48 pub kind: ExecutionFailureKind,
49 #[serde(skip_serializing_if = "Option::is_none")]
50 pub reason: Option<String>,
51 #[serde(skip_serializing_if = "Option::is_none")]
52 pub detail: Option<String>,
53}
54impl FinishedExecutionError {
55 #[must_use]
56 pub fn as_pending_state_finished_error(&self) -> PendingStateFinishedError {
57 PendingStateFinishedError::ExecutionFailure(self.kind)
58 }
59}
60
61#[derive(Debug, Clone, Copy, derive_more::Display, PartialEq, Eq, Serialize, Deserialize)]
62#[serde(rename_all = "snake_case")]
63pub enum ExecutionFailureKind {
64 TimedOut,
66 NondeterminismDetected,
68 OutOfFuel,
70 Cancelled,
72 Uncategorized,
73}
74
75#[derive(Debug, Clone, Copy, derive_more::Display, PartialEq, Eq, Serialize, Deserialize)]
76#[serde(rename_all = "snake_case")]
77pub enum TrapKind {
78 #[display("trap")]
79 Trap,
80 #[display("post_return_trap")]
81 PostReturnTrap,
82 #[display("out of fuel")]
83 OutOfFuel,
84 #[display("host function error")]
85 HostFunctionError,
86}
87
88#[derive(Clone, Eq, derive_more::Display)]
89pub enum StrVariant {
90 Static(&'static str),
91 Arc(Arc<str>),
92}
93
94impl StrVariant {
95 #[must_use]
96 pub const fn empty() -> StrVariant {
97 StrVariant::Static("")
98 }
99}
100
101impl From<String> for StrVariant {
102 fn from(value: String) -> Self {
103 StrVariant::Arc(Arc::from(value))
104 }
105}
106
107impl From<&'static str> for StrVariant {
108 fn from(value: &'static str) -> Self {
109 StrVariant::Static(value)
110 }
111}
112
113impl PartialEq for StrVariant {
114 fn eq(&self, other: &Self) -> bool {
115 match (self, other) {
116 (Self::Static(left), Self::Static(right)) => left == right,
117 (Self::Static(left), Self::Arc(right)) => *left == right.deref(),
118 (Self::Arc(left), Self::Arc(right)) => left == right,
119 (Self::Arc(left), Self::Static(right)) => left.deref() == *right,
120 }
121 }
122}
123
124impl Hash for StrVariant {
125 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
126 match self {
127 StrVariant::Static(val) => val.hash(state),
128 StrVariant::Arc(val) => {
129 let str: &str = val.deref();
130 str.hash(state);
131 }
132 }
133 }
134}
135
136impl Debug for StrVariant {
137 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138 Display::fmt(self, f)
139 }
140}
141
142impl Deref for StrVariant {
143 type Target = str;
144 fn deref(&self) -> &Self::Target {
145 match self {
146 Self::Arc(v) => v,
147 Self::Static(v) => v,
148 }
149 }
150}
151
152impl AsRef<str> for StrVariant {
153 fn as_ref(&self) -> &str {
154 match self {
155 Self::Arc(v) => v,
156 Self::Static(v) => v,
157 }
158 }
159}
160
161mod serde_strvariant {
162 use crate::StrVariant;
163 use serde::{
164 Deserialize, Deserializer, Serialize, Serializer,
165 de::{self, Visitor},
166 };
167 use std::{ops::Deref, sync::Arc};
168
169 impl Serialize for StrVariant {
170 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
171 where
172 S: Serializer,
173 {
174 serializer.serialize_str(self.deref())
175 }
176 }
177
178 impl<'de> Deserialize<'de> for StrVariant {
179 fn deserialize<D>(deserializer: D) -> Result<StrVariant, D::Error>
180 where
181 D: Deserializer<'de>,
182 {
183 deserializer.deserialize_str(StrVariantVisitor)
184 }
185 }
186
187 struct StrVariantVisitor;
188
189 impl Visitor<'_> for StrVariantVisitor {
190 type Value = StrVariant;
191
192 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
193 formatter.write_str("a string")
194 }
195
196 fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
197 where
198 E: de::Error,
199 {
200 Ok(StrVariant::Arc(Arc::from(v)))
201 }
202 }
203}
204
205#[derive(Hash, Clone, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
206#[display("{value}")]
207#[serde(transparent)]
208pub struct Name<T> {
209 pub value: StrVariant,
210 #[serde(skip)]
211 phantom_data: PhantomData<fn(T) -> T>,
212}
213
214impl<T> Name<T> {
215 #[must_use]
216 pub fn new_arc(value: Arc<str>) -> Self {
217 Self {
218 value: StrVariant::Arc(value),
219 phantom_data: PhantomData,
220 }
221 }
222
223 #[must_use]
224 pub const fn new_static(value: &'static str) -> Self {
225 Self {
226 value: StrVariant::Static(value),
227 phantom_data: PhantomData,
228 }
229 }
230}
231
232impl<T> Debug for Name<T> {
233 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
234 Display::fmt(&self, f)
235 }
236}
237
238impl<T> Deref for Name<T> {
239 type Target = str;
240
241 fn deref(&self) -> &Self::Target {
242 self.value.deref()
243 }
244}
245
246impl<T> Borrow<str> for Name<T> {
247 fn borrow(&self) -> &str {
248 self.deref()
249 }
250}
251
252impl<T> From<String> for Name<T> {
253 fn from(value: String) -> Self {
254 Self::new_arc(Arc::from(value))
255 }
256}
257
258#[derive(Clone, Copy, Debug, PartialEq, strum::EnumIter, derive_more::Display)]
259#[display("{}", self.suffix())]
260pub enum PackageExtension {
261 ObeliskExt,
262 ObeliskSchedule,
263 ObeliskStub,
264}
265impl PackageExtension {
266 fn suffix(&self) -> &'static str {
267 match self {
268 PackageExtension::ObeliskExt => SUFFIX_PKG_EXT,
269 PackageExtension::ObeliskSchedule => SUFFIX_PKG_SCHEDULE,
270 PackageExtension::ObeliskStub => SUFFIX_PKG_STUB,
271 }
272 }
273}
274
275#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
276#[cfg_attr(feature = "test", derive(Serialize))]
277pub struct PkgFqn {
278 pub namespace: String, pub package_name: String,
280 pub version: Option<String>,
281}
282impl PkgFqn {
283 #[must_use]
284 pub fn is_extension(&self) -> bool {
285 Self::is_package_name_ext(&self.package_name)
286 }
287
288 #[must_use]
289 pub fn split_ext(&self) -> Option<(PkgFqn, PackageExtension)> {
290 use strum::IntoEnumIterator;
291 for package_ext in PackageExtension::iter() {
292 if let Some(package_name) = self.package_name.strip_suffix(package_ext.suffix()) {
293 return Some((
294 PkgFqn {
295 namespace: self.namespace.clone(),
296 package_name: package_name.to_string(),
297 version: self.version.clone(),
298 },
299 package_ext,
300 ));
301 }
302 }
303 None
304 }
305
306 fn is_package_name_ext(package_name: &str) -> bool {
307 package_name.ends_with(SUFFIX_PKG_EXT)
308 || package_name.ends_with(SUFFIX_PKG_SCHEDULE)
309 || package_name.ends_with(SUFFIX_PKG_STUB)
310 }
311
312 fn fmt(&self, f: &mut dyn Write, delimiter: &'static str) -> std::fmt::Result {
313 let PkgFqn {
314 namespace,
315 package_name,
316 version,
317 } = self;
318 if let Some(version) = version {
319 write!(f, "{namespace}{delimiter}{package_name}@{version}")
320 } else {
321 write!(f, "{namespace}{delimiter}{package_name}")
322 }
323 }
324
325 #[must_use]
326 pub fn as_file_name(&self) -> String {
327 let mut buffer = String::new();
328 self.fmt(&mut buffer, "_").expect("writing to string");
329 buffer
330 }
331}
332impl Display for PkgFqn {
333 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
334 self.fmt(f, ":")
335 }
336}
337
338#[derive(Hash, Clone, PartialEq, Eq)]
339pub struct IfcFqnMarker;
340
341pub type IfcFqnName = Name<IfcFqnMarker>; impl IfcFqnName {
344 #[must_use]
345 pub fn from_parts(
346 namespace: &str,
347 package_name: &str,
348 ifc_name: &str,
349 version: Option<&str>,
350 ) -> Self {
351 let mut str = format!("{namespace}:{package_name}/{ifc_name}");
352 if let Some(version) = version {
353 str += "@";
354 str += version;
355 }
356 Self::new_arc(Arc::from(str))
357 }
358
359 #[must_use]
360 pub fn from_pkg_fqn(pkg_fqn: &PkgFqn, ifc_name: &str) -> Self {
361 let mut str = format!(
362 "{namespace}:{package_name}/{ifc_name}",
363 namespace = pkg_fqn.namespace,
364 package_name = pkg_fqn.package_name
365 );
366 if let Some(version) = &pkg_fqn.version {
367 str += "@";
368 str += version;
369 }
370 Self::new_arc(Arc::from(str))
371 }
372
373 #[must_use]
374 pub fn namespace(&self) -> &str {
375 self.deref().split_once(':').unwrap().0
376 }
377
378 #[must_use]
379 pub fn package_name(&self) -> &str {
380 let after_colon = self.deref().split_once(':').unwrap().1;
381 after_colon.split_once('/').unwrap().0
382 }
383
384 #[must_use]
385 pub fn version(&self) -> Option<&str> {
386 self.deref().split_once('@').map(|(_, version)| version)
387 }
388
389 #[must_use]
390 pub fn pkg_fqn_name(&self) -> PkgFqn {
391 let (namespace, rest) = self.deref().split_once(':').unwrap();
392 let (package_name, rest) = rest.split_once('/').unwrap();
393 let version = rest.split_once('@').map(|(_, version)| version);
394 PkgFqn {
395 namespace: namespace.to_string(),
396 package_name: package_name.to_string(),
397 version: version.map(std::string::ToString::to_string),
398 }
399 }
400
401 #[must_use]
402 pub fn ifc_name(&self) -> &str {
403 let after_colon = self.deref().split_once(':').unwrap().1;
404 let after_slash = after_colon.split_once('/').unwrap().1;
405 after_slash
406 .split_once('@')
407 .map_or(after_slash, |(ifc, _)| ifc)
408 }
409
410 #[must_use]
411 pub fn is_extension(&self) -> bool {
413 PkgFqn::is_package_name_ext(self.package_name())
414 }
415
416 #[must_use]
417 pub fn package_strip_obelisk_ext_suffix(&self) -> Option<&str> {
418 self.package_name().strip_suffix(SUFFIX_PKG_EXT)
419 }
420
421 #[must_use]
422 pub fn package_strip_obelisk_schedule_suffix(&self) -> Option<&str> {
423 self.package_name().strip_suffix(SUFFIX_PKG_SCHEDULE)
424 }
425
426 #[must_use]
427 pub fn package_strip_obelisk_stub_suffix(&self) -> Option<&str> {
428 self.package_name().strip_suffix(SUFFIX_PKG_STUB)
429 }
430
431 #[must_use]
432 pub fn is_namespace_obelisk(&self) -> bool {
433 self.namespace() == NAMESPACE_OBELISK
434 }
435
436 #[must_use]
437 pub fn is_namespace_wasi(&self) -> bool {
438 self.namespace() == NAMESPACE_WASI
439 }
440}
441
442#[derive(Hash, Clone, PartialEq, Eq)]
443pub struct FnMarker;
444
445pub type FnName = Name<FnMarker>;
446
447#[derive(
448 Hash, Clone, PartialEq, Eq, serde_with::SerializeDisplay, serde_with::DeserializeFromStr,
449)]
450pub struct FunctionFqn {
451 pub ifc_fqn: IfcFqnName,
452 pub function_name: FnName,
453}
454
455impl FunctionFqn {
456 #[must_use]
457 pub fn new_arc(ifc_fqn: Arc<str>, function_name: Arc<str>) -> Self {
458 Self {
459 ifc_fqn: Name::new_arc(ifc_fqn),
460 function_name: Name::new_arc(function_name),
461 }
462 }
463
464 #[must_use]
465 pub const fn new_static(ifc_fqn: &'static str, function_name: &'static str) -> Self {
466 Self {
467 ifc_fqn: Name::new_static(ifc_fqn),
468 function_name: Name::new_static(function_name),
469 }
470 }
471
472 #[must_use]
473 pub const fn new_static_tuple(tuple: (&'static str, &'static str)) -> Self {
474 Self::new_static(tuple.0, tuple.1)
475 }
476
477 pub fn try_from_tuple(
478 ifc_fqn: &str,
479 function_name: &str,
480 ) -> Result<Self, FunctionFqnParseError> {
481 if function_name.contains('.') {
482 Err(FunctionFqnParseError::DelimiterFoundInFunctionName)
483 } else {
484 Ok(Self::new_arc(Arc::from(ifc_fqn), Arc::from(function_name)))
485 }
486 }
487}
488
489#[derive(Debug, thiserror::Error)]
490pub enum FunctionFqnParseError {
491 #[error("delimiter `.` not found")]
492 DelimiterNotFound,
493 #[error("delimiter `.` found in function name")]
494 DelimiterFoundInFunctionName,
495}
496
497impl FromStr for FunctionFqn {
498 type Err = FunctionFqnParseError;
499
500 fn from_str(s: &str) -> Result<Self, Self::Err> {
501 if let Some((ifc_fqn, function_name)) = s.rsplit_once('.') {
502 Ok(Self::new_arc(Arc::from(ifc_fqn), Arc::from(function_name)))
503 } else {
504 Err(FunctionFqnParseError::DelimiterNotFound)
505 }
506 }
507}
508
509impl Display for FunctionFqn {
510 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
511 write!(
512 f,
513 "{ifc_fqn}.{function_name}",
514 ifc_fqn = self.ifc_fqn,
515 function_name = self.function_name
516 )
517 }
518}
519
520impl Debug for FunctionFqn {
521 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
522 Display::fmt(&self, f)
523 }
524}
525
526#[cfg(any(test, feature = "test"))]
527impl<'a> arbitrary::Arbitrary<'a> for FunctionFqn {
528 fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
529 let illegal = [':', '@', '.'];
530 let namespace = u.arbitrary::<String>()?.replace(illegal, "");
531 let pkg_name = u.arbitrary::<String>()?.replace(illegal, "");
532 let ifc_name = u.arbitrary::<String>()?.replace(illegal, "");
533 let fn_name = u.arbitrary::<String>()?.replace(illegal, "");
534
535 Ok(FunctionFqn::new_arc(
536 Arc::from(format!("{namespace}:{pkg_name}/{ifc_name}")),
537 Arc::from(fn_name),
538 ))
539 }
540}
541
542#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
543pub struct TypeWrapperTopLevel {
544 pub ok: Option<Box<TypeWrapper>>,
545 pub err: Option<Box<TypeWrapper>>,
546}
547impl From<TypeWrapperTopLevel> for TypeWrapper {
548 fn from(value: TypeWrapperTopLevel) -> TypeWrapper {
549 TypeWrapper::Result {
550 ok: value.ok,
551 err: value.err,
552 }
553 }
554}
555
556#[derive(Clone, derive_more::Debug, PartialEq, Eq, Serialize, Deserialize)]
557#[serde(rename_all = "snake_case")]
558pub enum SupportedFunctionReturnValue {
559 Ok {
560 #[debug(skip)]
561 ok: Option<WastValWithType>,
562 },
563 Err {
564 #[debug(skip)]
565 err: Option<WastValWithType>,
566 },
567 ExecutionError(FinishedExecutionError),
568}
569impl Display for SupportedFunctionReturnValue {
570 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
571 match self.as_pending_state_finished_result() {
572 PendingStateFinishedResultKind::Ok => write!(f, "execution completed successfully"),
573 PendingStateFinishedResultKind::Err(err) => write!(f, "{err}"),
574 }
575 }
576}
577
578pub const SUPPORTED_RETURN_VALUE_OK_EMPTY: SupportedFunctionReturnValue =
579 SupportedFunctionReturnValue::Ok { ok: None };
580
581#[derive(Debug, thiserror::Error)]
582pub enum ResultParsingError {
583 #[error("return value must not be empty")]
584 NoValue,
585 #[error("return value cannot be parsed, multi-value results are not supported")]
586 MultiValue,
587 #[error("return value cannot be parsed, {0}")]
588 TypeConversionError(val_json::type_wrapper::TypeConversionError),
589 #[error(transparent)]
590 ResultParsingErrorFromVal(ResultParsingErrorFromVal),
591}
592
593#[derive(Debug, thiserror::Error)]
594pub enum ResultParsingErrorFromVal {
595 #[error("return value cannot be parsed, {0}")]
596 WastValConversionError(val_json::wast_val::WastValConversionError),
597 #[error("top level type must be a result")]
598 TopLevelTypeMustBeAResult,
599 #[error("value does not type check")]
600 TypeCheckError,
601}
602
603impl SupportedFunctionReturnValue {
604 pub fn new<
605 I: ExactSizeIterator<Item = (wasmtime::component::Val, wasmtime::component::Type)>,
606 >(
607 mut iter: I,
608 ) -> Result<Self, ResultParsingError> {
609 if iter.len() == 0 {
610 Err(ResultParsingError::NoValue)
611 } else if iter.len() == 1 {
612 let (val, r#type) = iter.next().unwrap();
613 let r#type =
614 TypeWrapper::try_from(r#type).map_err(ResultParsingError::TypeConversionError)?;
615 Self::from_val_and_type_wrapper(val, r#type)
616 .map_err(ResultParsingError::ResultParsingErrorFromVal)
617 } else {
618 Err(ResultParsingError::MultiValue)
619 }
620 }
621
622 #[expect(clippy::result_unit_err)]
623 pub fn from_wast_val_with_type(
624 value: WastValWithType,
625 ) -> Result<SupportedFunctionReturnValue, ()> {
626 match value {
627 WastValWithType {
628 r#type: TypeWrapper::Result { ok: None, err: _ },
629 value: WastVal::Result(Ok(None)),
630 } => Ok(SupportedFunctionReturnValue::Ok { ok: None }),
631 WastValWithType {
632 r#type:
633 TypeWrapper::Result {
634 ok: Some(ok),
635 err: _,
636 },
637 value: WastVal::Result(Ok(Some(value))),
638 } => Ok(SupportedFunctionReturnValue::Ok {
639 ok: Some(WastValWithType {
640 r#type: *ok,
641 value: *value,
642 }),
643 }),
644 WastValWithType {
645 r#type: TypeWrapper::Result { ok: _, err: None },
646 value: WastVal::Result(Err(None)),
647 } => Ok(SupportedFunctionReturnValue::Err { err: None }),
648 WastValWithType {
649 r#type:
650 TypeWrapper::Result {
651 ok: _,
652 err: Some(err),
653 },
654 value: WastVal::Result(Err(Some(value))),
655 } => Ok(SupportedFunctionReturnValue::Err {
656 err: Some(WastValWithType {
657 r#type: *err,
658 value: *value,
659 }),
660 }),
661 _ => Err(()),
662 }
663 }
664
665 pub fn from_val_and_type_wrapper(
666 value: wasmtime::component::Val,
667 ty: TypeWrapper,
668 ) -> Result<Self, ResultParsingErrorFromVal> {
669 let TypeWrapper::Result { ok, err } = ty else {
670 return Err(ResultParsingErrorFromVal::TopLevelTypeMustBeAResult);
671 };
672 let ty = TypeWrapperTopLevel { ok, err };
673 Self::from_val_and_type_wrapper_tl(value, ty)
674 }
675
676 pub fn from_val_and_type_wrapper_tl(
677 value: wasmtime::component::Val,
678 ty: TypeWrapperTopLevel,
679 ) -> Result<Self, ResultParsingErrorFromVal> {
680 let wasmtime::component::Val::Result(value) = value else {
681 return Err(ResultParsingErrorFromVal::TopLevelTypeMustBeAResult);
682 };
683
684 match (ty.ok, ty.err, value) {
685 (None, _, Ok(None)) => Ok(SupportedFunctionReturnValue::Ok { ok: None }),
686 (Some(ok_type), _, Ok(Some(value))) => Ok(SupportedFunctionReturnValue::Ok {
687 ok: Some(WastValWithType {
688 r#type: *ok_type,
689 value: WastVal::try_from(*value)
690 .map_err(ResultParsingErrorFromVal::WastValConversionError)?,
691 }),
692 }),
693 (_, None, Err(None)) => Ok(SupportedFunctionReturnValue::Err { err: None }),
694 (_, Some(err_type), Err(Some(value))) => Ok(SupportedFunctionReturnValue::Err {
695 err: Some(WastValWithType {
696 r#type: *err_type,
697 value: WastVal::try_from(*value)
698 .map_err(ResultParsingErrorFromVal::WastValConversionError)?,
699 }),
700 }),
701 _other => Err(ResultParsingErrorFromVal::TypeCheckError),
702 }
703 }
704
705 #[must_use]
706 pub fn into_wast_val(self, get_return_type: impl FnOnce() -> TypeWrapperTopLevel) -> WastVal {
707 match self {
708 SupportedFunctionReturnValue::Ok { ok: None } => WastVal::Result(Ok(None)),
709 SupportedFunctionReturnValue::Ok { ok: Some(v) } => {
710 WastVal::Result(Ok(Some(Box::new(v.value))))
711 }
712 SupportedFunctionReturnValue::Err { err: None } => WastVal::Result(Err(None)),
713 SupportedFunctionReturnValue::Err { err: Some(v) } => {
714 WastVal::Result(Err(Some(Box::new(v.value))))
715 }
716 SupportedFunctionReturnValue::ExecutionError(_) => {
717 execution_error_to_wast_val(&get_return_type())
718 }
719 }
720 }
721
722 #[must_use]
723 pub fn as_pending_state_finished_result(&self) -> PendingStateFinishedResultKind {
724 match self {
725 SupportedFunctionReturnValue::Ok { ok: _ } => PendingStateFinishedResultKind::Ok,
726 SupportedFunctionReturnValue::Err { err: _ } => {
727 PendingStateFinishedResultKind::Err(PendingStateFinishedError::Error)
728 }
729 SupportedFunctionReturnValue::ExecutionError(err) => {
730 PendingStateFinishedResultKind::Err(err.as_pending_state_finished_error())
731 }
732 }
733 }
734}
735
736#[must_use]
737pub fn execution_error_to_wast_val(ret_type: &TypeWrapperTopLevel) -> WastVal {
738 match ret_type {
739 TypeWrapperTopLevel { ok: _, err: None } => return WastVal::Result(Err(None)),
740 TypeWrapperTopLevel {
741 ok: _,
742 err: Some(inner),
743 } => match inner.as_ref() {
744 TypeWrapper::String => {
745 return WastVal::Result(Err(Some(Box::new(WastVal::String(
746 EXECUTION_FAILED_STRING_OR_VARIANT.to_string(),
747 )))));
748 }
749 TypeWrapper::Variant(variants) => {
750 if variants.get(EXECUTION_FAILED_STRING_OR_VARIANT) == Some(&None) {
751 return WastVal::Result(Err(Some(Box::new(WastVal::Variant(
752 EXECUTION_FAILED_STRING_OR_VARIANT.to_string(),
753 None,
754 )))));
755 }
756 }
757 _ => {}
758 },
759 }
760 unreachable!("unexpected top-level return type {ret_type:?} cannot be ReturnTypeCompatible")
761}
762
763#[derive(Debug, Clone)]
764pub struct Params(ParamsInternal);
765
766#[derive(derive_more::Debug, Clone)]
767enum ParamsInternal {
768 JsonValues(Arc<[Value]>),
769 Vals {
770 vals: Arc<[wasmtime::component::Val]>,
771 #[debug(skip)]
772 json_vals_cache: Arc<std::sync::RwLock<Option<Arc<[Value]>>>>, },
774 Empty,
775}
776
777pub const SUFFIX_FN_SUBMIT: &str = "-submit";
778pub const SUFFIX_FN_AWAIT_NEXT: &str = "-await-next";
779pub const SUFFIX_FN_SCHEDULE: &str = "-schedule";
780pub const SUFFIX_FN_STUB: &str = "-stub";
781pub const SUFFIX_FN_GET: &str = "-get";
782
783#[derive(
784 Debug, Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq, Eq, strum::EnumIter,
785)]
786#[serde(rename_all = "snake_case")]
787pub enum FunctionExtension {
788 Submit,
789 AwaitNext,
790 Schedule,
791 Stub,
792 Get,
793}
794impl FunctionExtension {
795 #[must_use]
796 pub fn suffix(&self) -> &'static str {
797 match self {
798 FunctionExtension::Submit => SUFFIX_FN_SUBMIT,
799 FunctionExtension::AwaitNext => SUFFIX_FN_AWAIT_NEXT,
800 FunctionExtension::Schedule => SUFFIX_FN_SCHEDULE,
801 FunctionExtension::Stub => SUFFIX_FN_STUB,
802 FunctionExtension::Get => SUFFIX_FN_GET,
803 }
804 }
805
806 #[must_use]
807 pub fn belongs_to(&self, pkg_ext: PackageExtension) -> bool {
808 matches!(
809 (pkg_ext, self),
810 (
811 PackageExtension::ObeliskExt,
812 FunctionExtension::Submit | FunctionExtension::AwaitNext | FunctionExtension::Get
813 ) | (
814 PackageExtension::ObeliskSchedule,
815 FunctionExtension::Schedule
816 ) | (PackageExtension::ObeliskStub, FunctionExtension::Stub)
817 )
818 }
819}
820
821#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
822pub struct FunctionMetadata {
823 pub ffqn: FunctionFqn,
824 pub parameter_types: ParameterTypes,
825 pub return_type: ReturnType,
826 pub extension: Option<FunctionExtension>,
827 pub submittable: bool,
829}
830impl FunctionMetadata {
831 #[must_use]
832 pub fn split_extension(&self) -> Option<(&str, FunctionExtension)> {
833 self.extension.map(|extension| {
834 let prefix = self
835 .ffqn
836 .function_name
837 .value
838 .strip_suffix(extension.suffix())
839 .unwrap_or_else(|| {
840 panic!(
841 "extension function {} must end with expected suffix {}",
842 self.ffqn.function_name,
843 extension.suffix()
844 )
845 });
846 (prefix, extension)
847 })
848 }
849}
850impl Display for FunctionMetadata {
851 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
852 write!(
853 f,
854 "{ffqn}: func{params} -> {return_type}",
855 ffqn = self.ffqn,
856 params = self.parameter_types,
857 return_type = self.return_type,
858 )
859 }
860}
861
862pub mod serde_params {
863 use crate::{Params, ParamsInternal};
864 use serde::de::{SeqAccess, Visitor};
865 use serde::ser::SerializeSeq;
866 use serde::{Deserialize, Serialize};
867 use serde_json::Value;
868 use std::sync::Arc;
869 use val_json::wast_val::WastVal;
870
871 impl Serialize for Params {
872 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
873 where
874 S: ::serde::Serializer,
875 {
876 let serialize_json_values = |slice: &[serde_json::Value], serializer: S| {
877 let mut seq = serializer.serialize_seq(Some(slice.len()))?;
878 for item in slice {
879 seq.serialize_element(item)?;
880 }
881 seq.end()
882 };
883 match &self.0 {
884 ParamsInternal::Vals {
885 vals,
886 json_vals_cache,
887 } => {
888 let guard = json_vals_cache.read().unwrap();
889 if let Some(slice) = &*guard {
890 serialize_json_values(slice, serializer)
891 } else {
892 drop(guard);
893 let mut json_vals = Vec::with_capacity(vals.len());
894 for val in vals.iter() {
895 let value = WastVal::try_from(val.clone())
896 .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
897 let value = serde_json::to_value(&value)
898 .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
899 json_vals.push(value);
900 }
901 let res = serialize_json_values(&json_vals, serializer);
902 *json_vals_cache.write().unwrap() = Some(Arc::from(json_vals));
903 res
904 }
905 }
906 ParamsInternal::Empty => serializer.serialize_seq(Some(0))?.end(),
907 ParamsInternal::JsonValues(vec) => serialize_json_values(vec, serializer),
908 }
909 }
910 }
911
912 pub struct VecVisitor;
913
914 impl<'de> Visitor<'de> for VecVisitor {
915 type Value = Vec<Value>;
916
917 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
918 formatter.write_str("a sequence of `Value`")
919 }
920
921 #[inline]
922 fn visit_seq<V>(self, mut visitor: V) -> Result<Self::Value, V::Error>
923 where
924 V: SeqAccess<'de>,
925 {
926 let mut vec = Vec::new();
927 while let Some(elem) = visitor.next_element()? {
928 vec.push(elem);
929 }
930 Ok(vec)
931 }
932 }
933
934 impl<'de> Deserialize<'de> for Params {
935 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
936 where
937 D: serde::Deserializer<'de>,
938 {
939 let vec: Vec<Value> = deserializer.deserialize_seq(VecVisitor)?;
940 if vec.is_empty() {
941 Ok(Self(ParamsInternal::Empty))
942 } else {
943 Ok(Self(ParamsInternal::JsonValues(Arc::from(vec))))
944 }
945 }
946 }
947}
948
949#[derive(Debug, thiserror::Error)]
950pub enum ParamsParsingError {
951 #[error("parameters cannot be parsed, cannot convert type of {idx}-th parameter")]
952 ParameterTypeError {
953 idx: usize,
954 err: TypeConversionError,
955 },
956 #[error("parameters cannot be deserialized: {0}")]
957 ParamsDeserializationError(serde_json::Error),
958 #[error("parameter cardinality mismatch, expected: {expected}, specified: {specified}")]
959 ParameterCardinalityMismatch { expected: usize, specified: usize },
960}
961
962impl ParamsParsingError {
963 #[must_use]
964 pub fn detail(&self) -> Option<String> {
965 match self {
966 ParamsParsingError::ParameterTypeError { err, .. } => Some(format!("{err:?}")),
967 ParamsParsingError::ParamsDeserializationError(err) => Some(format!("{err:?}")),
968 ParamsParsingError::ParameterCardinalityMismatch { .. } => None,
969 }
970 }
971}
972
973#[derive(Debug, thiserror::Error)]
974pub enum ParamsFromJsonError {
975 #[error("value must be a json array containing function parameters")]
976 MustBeArray,
977}
978
979impl Params {
980 #[must_use]
981 pub const fn empty() -> Self {
982 Self(ParamsInternal::Empty)
983 }
984
985 #[must_use]
986 pub fn from_wasmtime(vals: Arc<[wasmtime::component::Val]>) -> Self {
987 if vals.is_empty() {
988 Self::empty()
989 } else {
990 Self(ParamsInternal::Vals {
991 vals,
992 json_vals_cache: Arc::default(),
993 })
994 }
995 }
996
997 #[cfg(any(test, feature = "test"))]
998 #[must_use]
999 pub fn from_json_values_test(vec: Vec<Value>) -> Self {
1000 if vec.is_empty() {
1001 Self::empty()
1002 } else {
1003 Self(ParamsInternal::JsonValues(Arc::from(vec)))
1004 }
1005 }
1006
1007 #[must_use]
1008 pub fn from_json_values(values: Arc<[Value]>) -> Self {
1009 if values.is_empty() {
1010 Self::empty()
1011 } else {
1012 Self(ParamsInternal::JsonValues(values))
1013 }
1014 }
1015
1016 pub fn typecheck<'a>(
1017 &self,
1018 param_types: impl ExactSizeIterator<Item = &'a TypeWrapper>,
1019 ) -> Result<(), ParamsParsingError> {
1020 if param_types.len() != self.len() {
1021 return Err(ParamsParsingError::ParameterCardinalityMismatch {
1022 expected: param_types.len(),
1023 specified: self.len(),
1024 });
1025 }
1026 match &self.0 {
1027 ParamsInternal::Vals { .. } | ParamsInternal::Empty => {}
1028 ParamsInternal::JsonValues(params) => {
1029 params::deserialize_values(params, param_types)
1030 .map_err(ParamsParsingError::ParamsDeserializationError)?;
1031 }
1032 }
1033 Ok(())
1034 }
1035
1036 pub fn as_vals<'a>(
1037 &self,
1038 param_types: impl ExactSizeIterator<Item = (&'a str, Type)>,
1039 ) -> Result<Arc<[wasmtime::component::Val]>, ParamsParsingError> {
1040 if param_types.len() != self.len() {
1041 return Err(ParamsParsingError::ParameterCardinalityMismatch {
1042 expected: param_types.len(),
1043 specified: self.len(),
1044 });
1045 }
1046 match &self.0 {
1047 ParamsInternal::JsonValues(json_vec) => {
1048 let param_types = param_types
1049 .enumerate()
1050 .map(|(idx, (_param_name, ty))| {
1051 TypeWrapper::try_from(ty).map_err(|err| (idx, err))
1052 })
1053 .collect::<Result<Vec<_>, _>>()
1054 .map_err(|(idx, err)| ParamsParsingError::ParameterTypeError { idx, err })?;
1055 Ok(params::deserialize_values(json_vec, param_types.iter())
1056 .map_err(ParamsParsingError::ParamsDeserializationError)?
1057 .into_iter()
1058 .map(Val::from)
1059 .collect())
1060 }
1061 ParamsInternal::Vals { vals, .. } => Ok(vals.clone()),
1062 ParamsInternal::Empty => Ok(Arc::from([])),
1063 }
1064 }
1065
1066 #[must_use]
1067 pub fn len(&self) -> usize {
1068 match &self.0 {
1069 ParamsInternal::JsonValues(vec) => vec.len(),
1070 ParamsInternal::Vals { vals, .. } => vals.len(),
1071 ParamsInternal::Empty => 0,
1072 }
1073 }
1074
1075 #[must_use]
1076 pub fn is_empty(&self) -> bool {
1077 self.len() == 0
1078 }
1079}
1080
1081impl PartialEq for Params {
1082 fn eq(&self, other: &Self) -> bool {
1083 if self.is_empty() && other.is_empty() {
1084 return true;
1085 }
1086 if self.len() != other.len() {
1087 return false;
1088 }
1089
1090 match (&self.0, &other.0) {
1091 (ParamsInternal::JsonValues(l), ParamsInternal::JsonValues(r)) => l == r,
1092
1093 (
1094 ParamsInternal::Vals {
1095 vals: l,
1096 json_vals_cache: _,
1097 },
1098 ParamsInternal::Vals {
1099 vals: r,
1100 json_vals_cache: _,
1101 },
1102 ) => l == r,
1103
1104 (
1105 ParamsInternal::JsonValues(json_vals),
1106 ParamsInternal::Vals {
1107 vals,
1108 json_vals_cache,
1109 },
1110 )
1111 | (
1112 ParamsInternal::Vals {
1113 vals,
1114 json_vals_cache,
1115 },
1116 ParamsInternal::JsonValues(json_vals),
1117 ) => {
1118 let Ok(vec) = to_json(vals) else { return false };
1119 let equals = *json_vals == vec;
1120 *json_vals_cache.write().unwrap() = Some(vec);
1121 equals
1122 }
1123
1124 (ParamsInternal::Empty, _) | (_, ParamsInternal::Empty) => {
1125 unreachable!("zero length and different lengths handled earlier")
1126 }
1127 }
1128 }
1129}
1130impl Eq for Params {}
1131
1132fn to_json(vals: &[wasmtime::component::Val]) -> Result<Arc<[serde_json::Value]>, ()> {
1134 let mut vec = Vec::with_capacity(vals.len());
1135 for val in vals {
1136 let value = match WastVal::try_from(val.clone()) {
1137 Ok(ok) => ok,
1138 Err(err) => {
1139 error!("cannot compare Params, cannot convert to WastVal: {err:?}");
1140 return Err(());
1141 }
1142 };
1143 let value = match serde_json::to_value(&value) {
1144 Ok(ok) => ok,
1145 Err(err) => {
1146 error!("cannot compare Params, cannot convert to JSON: {err:?}");
1147 return Err(());
1148 }
1149 };
1150 vec.push(value);
1151 }
1152 Ok(Arc::from(vec))
1153}
1154
1155impl Display for Params {
1156 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1157 let render_json_vals =
1158 |f: &mut std::fmt::Formatter<'_>, json_vals: &[Value]| -> std::fmt::Result {
1159 for (i, json_value) in json_vals.iter().enumerate() {
1160 if i > 0 {
1161 write!(f, ", ")?;
1162 }
1163 write!(f, "{json_value}")?;
1164 }
1165 Ok(())
1166 };
1167 write!(f, "[")?;
1168 match &self.0 {
1169 ParamsInternal::Empty => {}
1170 ParamsInternal::JsonValues(json_vals) => {
1171 render_json_vals(f, json_vals)?;
1172 }
1173 ParamsInternal::Vals {
1174 vals,
1175 json_vals_cache,
1176 } => {
1177 let guard = json_vals_cache.read().unwrap();
1178 if let Some(json_vals) = &*guard {
1179 render_json_vals(f, json_vals)?;
1180 } else {
1181 drop(guard);
1182 let Ok(json_vals) = to_json(vals) else {
1183 return write!(f, "<serialization error>]"); };
1185 render_json_vals(f, &json_vals)?;
1186 *json_vals_cache.write().unwrap() = Some(json_vals);
1187 }
1188 }
1189 }
1190 write!(f, "]")
1191 }
1192}
1193
1194pub mod prefixed_ulid {
1195 use crate::{
1196 EXECUTION_ID_INFIX, EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX, JoinSetId,
1197 JoinSetIdParseError,
1198 };
1199 use serde_with::{DeserializeFromStr, SerializeDisplay};
1200 use std::{
1201 fmt::{Debug, Display},
1202 hash::Hasher,
1203 marker::PhantomData,
1204 num::ParseIntError,
1205 str::FromStr,
1206 sync::Arc,
1207 };
1208 use ulid::Ulid;
1209
1210 #[derive(derive_more::Display, SerializeDisplay, DeserializeFromStr)]
1211 #[derive_where::derive_where(Clone, Copy)]
1212 #[display("{}_{ulid}", Self::prefix())]
1213 pub struct PrefixedUlid<T: 'static> {
1214 ulid: Ulid,
1215 phantom_data: PhantomData<fn(T) -> T>,
1216 }
1217
1218 impl<T> PrefixedUlid<T> {
1219 const fn new(ulid: Ulid) -> Self {
1220 Self {
1221 ulid,
1222 phantom_data: PhantomData,
1223 }
1224 }
1225
1226 fn prefix() -> &'static str {
1227 std::any::type_name::<T>().rsplit("::").next().unwrap()
1228 }
1229 }
1230
1231 impl<T> PrefixedUlid<T> {
1232 #[must_use]
1233 pub fn generate() -> Self {
1234 Self::new(Ulid::new())
1235 }
1236
1237 #[must_use]
1238 pub const fn from_parts(timestamp_ms: u64, random: u128) -> Self {
1239 Self::new(Ulid::from_parts(timestamp_ms, random))
1240 }
1241
1242 #[must_use]
1243 pub fn timestamp_part(&self) -> u64 {
1244 self.ulid.timestamp_ms()
1245 }
1246
1247 #[must_use]
1248 pub fn random_part(&self) -> u128 {
1250 self.ulid.random()
1251 }
1252 }
1253
1254 #[derive(Debug, thiserror::Error)]
1255 pub enum PrefixedUlidParseError {
1256 #[error("wrong prefix in `{input}`, expected prefix `{expected}`")]
1257 WrongPrefix { input: String, expected: String },
1258 #[error("cannot parse ULID suffix from `{input}`")]
1259 CannotParseUlid { input: String },
1260 }
1261
1262 mod impls {
1263 use super::{PrefixedUlid, PrefixedUlidParseError, Ulid};
1264 use std::{fmt::Debug, fmt::Display, hash::Hash, marker::PhantomData, str::FromStr};
1265
1266 impl<T> FromStr for PrefixedUlid<T> {
1267 type Err = PrefixedUlidParseError;
1268
1269 fn from_str(input: &str) -> Result<Self, Self::Err> {
1270 let prefix = Self::prefix();
1271 let mut input_chars = input.chars();
1272 for exp in prefix.chars() {
1273 if input_chars.next() != Some(exp) {
1274 return Err(PrefixedUlidParseError::WrongPrefix {
1275 input: input.to_string(),
1276 expected: format!("{prefix}_"),
1277 });
1278 }
1279 }
1280 if input_chars.next() != Some('_') {
1281 return Err(PrefixedUlidParseError::WrongPrefix {
1282 input: input.to_string(),
1283 expected: format!("{prefix}_"),
1284 });
1285 }
1286 let Ok(ulid) = Ulid::from_string(input_chars.as_str()) else {
1287 return Err(PrefixedUlidParseError::CannotParseUlid {
1288 input: input.to_string(),
1289 });
1290 };
1291 Ok(Self {
1292 ulid,
1293 phantom_data: PhantomData,
1294 })
1295 }
1296 }
1297
1298 impl<T> Debug for PrefixedUlid<T> {
1299 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1300 Display::fmt(&self, f)
1301 }
1302 }
1303
1304 impl<T> Hash for PrefixedUlid<T> {
1305 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
1306 Self::prefix().hash(state);
1307 self.ulid.hash(state);
1308 self.phantom_data.hash(state);
1309 }
1310 }
1311
1312 impl<T> PartialEq for PrefixedUlid<T> {
1313 fn eq(&self, other: &Self) -> bool {
1314 self.ulid == other.ulid
1315 }
1316 }
1317
1318 impl<T> Eq for PrefixedUlid<T> {}
1319
1320 impl<T> PartialOrd for PrefixedUlid<T> {
1321 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1322 Some(self.cmp(other))
1323 }
1324 }
1325
1326 impl<T> Ord for PrefixedUlid<T> {
1327 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1328 self.ulid.cmp(&other.ulid)
1329 }
1330 }
1331 }
1332
1333 pub mod prefix {
1334 pub struct E;
1335 pub struct Exr;
1336 pub struct Run;
1337 pub struct Delay;
1338 }
1339
1340 pub type ExecutorId = PrefixedUlid<prefix::Exr>;
1341 pub type ExecutionIdTopLevel = PrefixedUlid<prefix::E>;
1342 pub type RunId = PrefixedUlid<prefix::Run>;
1343 pub type DelayIdTopLevel = PrefixedUlid<prefix::Delay>; #[cfg(any(test, feature = "test"))]
1346 impl<'a, T> arbitrary::Arbitrary<'a> for PrefixedUlid<T> {
1347 fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
1348 Ok(Self::new(ulid::Ulid::from_parts(
1349 u.arbitrary()?,
1350 u.arbitrary()?,
1351 )))
1352 }
1353 }
1354
1355 #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, SerializeDisplay, DeserializeFromStr, Clone)]
1356 pub enum ExecutionId {
1357 TopLevel(ExecutionIdTopLevel),
1358 Derived(ExecutionIdDerived),
1359 }
1360
1361 #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, SerializeDisplay, DeserializeFromStr)]
1362 pub struct ExecutionIdDerived {
1363 top_level: ExecutionIdTopLevel,
1364 infix: Arc<str>,
1365 idx: u64,
1366 }
1367 impl ExecutionIdDerived {
1368 #[must_use]
1369 pub fn get_incremented(&self) -> Self {
1370 self.get_incremented_by(1)
1371 }
1372 #[must_use]
1373 pub fn get_incremented_by(&self, count: u64) -> Self {
1374 ExecutionIdDerived {
1375 top_level: self.top_level,
1376 infix: self.infix.clone(),
1377 idx: self.idx + count,
1378 }
1379 }
1380 #[must_use]
1381 pub fn next_level(&self, join_set_id: &JoinSetId) -> ExecutionIdDerived {
1382 let ExecutionIdDerived {
1383 top_level,
1384 infix,
1385 idx,
1386 } = self;
1387 let infix = Arc::from(format!(
1388 "{infix}{EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX}{idx}{EXECUTION_ID_INFIX}{join_set_id}"
1389 ));
1390 ExecutionIdDerived {
1391 top_level: *top_level,
1392 infix,
1393 idx: EXECUTION_ID_START_IDX,
1394 }
1395 }
1396 fn display_or_debug(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1397 let ExecutionIdDerived {
1398 top_level,
1399 infix,
1400 idx,
1401 } = self;
1402 write!(
1403 f,
1404 "{top_level}{EXECUTION_ID_INFIX}{infix}{EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX}{idx}"
1405 )
1406 }
1407
1408 #[must_use]
1409 pub fn split_to_parts(&self) -> (ExecutionId, JoinSetId) {
1410 self.split_to_parts_res().expect("verified in from_str")
1411 }
1412
1413 fn split_to_parts_res(&self) -> Result<(ExecutionId, JoinSetId), DerivedIdSplitError> {
1414 if let Some((old_infix_and_index, join_set_id)) =
1418 self.infix.rsplit_once(EXECUTION_ID_INFIX)
1419 {
1420 let join_set_id = JoinSetId::from_str(join_set_id)
1421 .map_err(DerivedIdSplitError::JoinSetIdParseError)?;
1422 let Some((old_infix, old_idx)) =
1423 old_infix_and_index.rsplit_once(EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX)
1424 else {
1425 return Err(DerivedIdSplitError::CannotFindJoinSetDelimiter);
1426 };
1427 let parent = ExecutionIdDerived {
1428 top_level: self.top_level,
1429 infix: Arc::from(old_infix),
1430 idx: old_idx
1431 .parse()
1432 .map_err(DerivedIdSplitError::CannotParseOldIndex)?,
1433 };
1434 Ok((ExecutionId::Derived(parent), join_set_id))
1435 } else {
1436 Ok((
1437 ExecutionId::TopLevel(self.top_level),
1438 JoinSetId::from_str(&self.infix)
1439 .map_err(DerivedIdSplitError::JoinSetIdParseError)?,
1440 ))
1441 }
1442 }
1443 }
1444
1445 #[derive(Debug, thiserror::Error)]
1446 pub enum DerivedIdSplitError {
1447 #[error(transparent)]
1448 JoinSetIdParseError(JoinSetIdParseError),
1449 #[error("cannot parse index of parent execution - {0}")]
1450 CannotParseOldIndex(ParseIntError),
1451 #[error("cannot find join set delimiter")]
1452 CannotFindJoinSetDelimiter,
1453 }
1454
1455 impl Debug for ExecutionIdDerived {
1456 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1457 self.display_or_debug(f)
1458 }
1459 }
1460 impl Display for ExecutionIdDerived {
1461 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1462 self.display_or_debug(f)
1463 }
1464 }
1465 impl FromStr for ExecutionIdDerived {
1466 type Err = DerivedIdParseError;
1467
1468 fn from_str(input: &str) -> Result<Self, Self::Err> {
1469 let (top_level, infix, idx) = derived_from_str(input)?;
1470 let id = ExecutionIdDerived {
1471 top_level,
1472 infix,
1473 idx,
1474 };
1475 Ok(id)
1476 }
1477 }
1478
1479 fn derived_from_str<T: 'static>(
1480 input: &str,
1481 ) -> Result<(PrefixedUlid<T>, Arc<str>, u64), DerivedIdParseError> {
1482 let (prefix, full_suffix) = input
1484 .split_once(EXECUTION_ID_INFIX)
1485 .ok_or(DerivedIdParseError::FirstDelimiterNotFound)?;
1486
1487 let top_level =
1489 PrefixedUlid::from_str(prefix).map_err(DerivedIdParseError::PrefixedUlidParseError)?;
1490
1491 let mut last_idx = None;
1493 for segment in full_suffix.split(EXECUTION_ID_INFIX) {
1494 if segment.is_empty() {
1495 return Err(DerivedIdParseError::EmptySegment);
1496 }
1497 let (join_set_str, idx_str) = segment
1499 .split_once(EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX)
1500 .ok_or(DerivedIdParseError::SecondDelimiterNotFound)?;
1501 JoinSetId::from_str(join_set_str).map_err(DerivedIdParseError::JoinSetIdParseError)?;
1502 let idx = u64::from_str(idx_str).map_err(DerivedIdParseError::ParseIndexError)?;
1503 last_idx = Some((idx, idx_str));
1504 }
1505 let (idx, idx_str) = last_idx.expect("split must have returned at least one element");
1506 let infix = full_suffix
1507 .strip_suffix(idx_str)
1508 .expect("must have ended with index");
1509 let infix = infix
1510 .strip_suffix(EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX)
1511 .expect("must have ended with `_index`");
1512 Ok((top_level, Arc::from(infix), idx))
1513 }
1514
1515 #[cfg(any(test, feature = "test"))]
1516 impl<'a> arbitrary::Arbitrary<'a> for ExecutionIdDerived {
1517 fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
1518 let top_level = ExecutionId::TopLevel(ExecutionIdTopLevel::arbitrary(u)?);
1519 let join_set_id = JoinSetId::arbitrary(u)?;
1520 Ok(top_level.next_level(&join_set_id))
1521 }
1522 }
1523
1524 #[derive(Debug, thiserror::Error)]
1525 pub enum DerivedIdParseError {
1526 #[error(transparent)]
1527 PrefixedUlidParseError(PrefixedUlidParseError),
1528 #[error("cannot parse derived id - delimiter `{EXECUTION_ID_INFIX}` not found")]
1529 FirstDelimiterNotFound,
1530 #[error(
1531 "cannot parse derived id - delimiter `{EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX}` not found"
1532 )]
1533 SecondDelimiterNotFound,
1534 #[error(
1535 "cannot parse derived id - suffix after `{EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX}` must be a number"
1536 )]
1537 ParseIndexError(ParseIntError),
1538 #[error(transparent)]
1539 DerivedIdSplitError(DerivedIdSplitError),
1540 #[error(transparent)]
1541 JoinSetIdParseError(JoinSetIdParseError),
1542 #[error("empty segment")]
1543 EmptySegment,
1544 }
1545
1546 impl ExecutionId {
1547 #[must_use]
1548 pub fn generate() -> Self {
1549 ExecutionId::TopLevel(PrefixedUlid::generate())
1550 }
1551
1552 #[must_use]
1553 pub fn get_top_level(&self) -> ExecutionIdTopLevel {
1554 match &self {
1555 ExecutionId::TopLevel(prefixed_ulid) => *prefixed_ulid,
1556 ExecutionId::Derived(ExecutionIdDerived { top_level, .. }) => *top_level,
1557 }
1558 }
1559
1560 #[must_use]
1561 pub fn is_top_level(&self) -> bool {
1562 matches!(self, ExecutionId::TopLevel(_))
1563 }
1564
1565 #[must_use]
1566 pub fn random_seed(&self) -> u64 {
1567 let mut hasher = fxhash::FxHasher::default();
1568 #[expect(clippy::cast_possible_truncation)]
1572 let random_part = self.get_top_level().random_part() as u64;
1573 hasher.write_u64(random_part);
1574 hasher.write_u64(self.get_top_level().timestamp_part());
1575 if let ExecutionId::Derived(ExecutionIdDerived {
1576 top_level: _,
1577 infix,
1578 idx,
1579 }) = self
1580 {
1581 hasher.write(infix.as_bytes());
1583 hasher.write_u64(*idx);
1584 }
1585 hasher.finish()
1586 }
1587
1588 #[must_use]
1589 pub const fn from_parts(timestamp_ms: u64, random_part: u128) -> Self {
1590 ExecutionId::TopLevel(ExecutionIdTopLevel::from_parts(timestamp_ms, random_part))
1591 }
1592
1593 #[must_use]
1594 pub fn next_level(&self, join_set_id: &JoinSetId) -> ExecutionIdDerived {
1595 match &self {
1596 ExecutionId::TopLevel(top_level) => ExecutionIdDerived {
1597 top_level: *top_level,
1598 infix: Arc::from(join_set_id.to_string()),
1599 idx: EXECUTION_ID_START_IDX,
1600 },
1601 ExecutionId::Derived(derived) => derived.next_level(join_set_id),
1602 }
1603 }
1604
1605 fn display_or_debug(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1606 match &self {
1607 ExecutionId::TopLevel(top_level) => Display::fmt(top_level, f),
1608 ExecutionId::Derived(derived) => Display::fmt(derived, f),
1609 }
1610 }
1611 }
1612
1613 const EXECUTION_ID_START_IDX: u64 = 1;
1614 pub const JOIN_SET_START_IDX: u64 = 1;
1615 const DELAY_ID_START_IDX: u64 = 1;
1616
1617 #[derive(Debug, thiserror::Error)]
1618 pub enum ExecutionIdParseError {
1619 #[error(transparent)]
1620 PrefixedUlidParseError(PrefixedUlidParseError),
1621 #[error(transparent)]
1622 DerivedIdParseError(DerivedIdParseError),
1623 }
1624
1625 impl FromStr for ExecutionId {
1626 type Err = ExecutionIdParseError;
1627
1628 fn from_str(input: &str) -> Result<Self, Self::Err> {
1629 if input.contains(EXECUTION_ID_INFIX) {
1630 ExecutionIdDerived::from_str(input)
1631 .map(ExecutionId::Derived)
1632 .map_err(ExecutionIdParseError::DerivedIdParseError)
1633 } else {
1634 Ok(ExecutionId::TopLevel(
1635 PrefixedUlid::from_str(input)
1636 .map_err(ExecutionIdParseError::PrefixedUlidParseError)?,
1637 ))
1638 }
1639 }
1640 }
1641
1642 #[derive(Debug, thiserror::Error)]
1643 pub enum ExecutionIdStructuralParseError {
1644 #[error(transparent)]
1645 ExecutionIdParseError(#[from] ExecutionIdParseError),
1646 #[error("execution-id must be a record with `id` field of type string")]
1647 TypeError,
1648 }
1649
1650 impl TryFrom<&wasmtime::component::Val> for ExecutionId {
1651 type Error = ExecutionIdStructuralParseError;
1652
1653 fn try_from(execution_id: &wasmtime::component::Val) -> Result<Self, Self::Error> {
1654 if let wasmtime::component::Val::Record(key_vals) = execution_id
1655 && key_vals.len() == 1
1656 && let Some((key, execution_id)) = key_vals.first()
1657 && key == "id"
1658 && let wasmtime::component::Val::String(execution_id) = execution_id
1659 {
1660 ExecutionId::from_str(execution_id)
1661 .map_err(ExecutionIdStructuralParseError::ExecutionIdParseError)
1662 } else {
1663 Err(ExecutionIdStructuralParseError::TypeError)
1664 }
1665 }
1666 }
1667
1668 impl Debug for ExecutionId {
1669 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1670 self.display_or_debug(f)
1671 }
1672 }
1673
1674 impl Display for ExecutionId {
1675 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1676 self.display_or_debug(f)
1677 }
1678 }
1679
1680 #[cfg(any(test, feature = "test"))]
1681 impl<'a> arbitrary::Arbitrary<'a> for ExecutionId {
1682 fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
1683 Ok(ExecutionId::TopLevel(PrefixedUlid::arbitrary(u)?))
1684 }
1685 }
1686
1687 #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, SerializeDisplay, DeserializeFromStr)]
1689 pub struct DelayId {
1690 top_level: DelayIdTopLevel,
1691 infix: Arc<str>,
1692 idx: u64,
1693 }
1694 impl DelayId {
1695 #[must_use]
1696 pub fn new(execution_id: &ExecutionId, join_set_id: &JoinSetId) -> DelayId {
1697 Self::new_with_index(execution_id, join_set_id, DELAY_ID_START_IDX)
1698 }
1699
1700 #[must_use]
1701 pub fn new_with_index(
1702 execution_id: &ExecutionId,
1703 join_set_id: &JoinSetId,
1704 idx: u64,
1705 ) -> DelayId {
1706 let ExecutionIdDerived {
1707 top_level: PrefixedUlid { ulid, .. },
1708 infix,
1709 idx: _,
1710 } = execution_id.next_level(join_set_id);
1711 let top_level = DelayIdTopLevel::new(ulid);
1712 DelayId {
1713 top_level,
1714 infix,
1715 idx,
1716 }
1717 }
1718
1719 #[must_use]
1720 pub fn get_incremented(&self) -> Self {
1721 Self {
1722 top_level: self.top_level,
1723 infix: self.infix.clone(),
1724 idx: self.idx + 1,
1725 }
1726 }
1727
1728 #[must_use]
1729 pub fn split_to_parts(&self) -> (ExecutionId, JoinSetId) {
1730 self.split_to_parts_res().expect("verified in from_str")
1731 }
1732
1733 fn split_to_parts_res(&self) -> Result<(ExecutionId, JoinSetId), DerivedIdSplitError> {
1734 if let Some((old_infix_and_index, join_set_id)) =
1738 self.infix.rsplit_once(EXECUTION_ID_INFIX)
1739 {
1740 let join_set_id = JoinSetId::from_str(join_set_id)
1741 .map_err(DerivedIdSplitError::JoinSetIdParseError)?;
1742 let Some((old_infix, old_idx)) =
1743 old_infix_and_index.rsplit_once(EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX)
1744 else {
1745 return Err(DerivedIdSplitError::CannotFindJoinSetDelimiter);
1746 };
1747 let parent = ExecutionIdDerived {
1748 top_level: ExecutionIdTopLevel::new(self.top_level.ulid),
1749 infix: Arc::from(old_infix),
1750 idx: old_idx
1751 .parse()
1752 .map_err(DerivedIdSplitError::CannotParseOldIndex)?,
1753 };
1754 Ok((ExecutionId::Derived(parent), join_set_id))
1755 } else {
1756 Ok((
1757 ExecutionId::TopLevel(ExecutionIdTopLevel::new(self.top_level.ulid)),
1758 JoinSetId::from_str(&self.infix)
1759 .map_err(DerivedIdSplitError::JoinSetIdParseError)?,
1760 ))
1761 }
1762 }
1763
1764 fn display_or_debug(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1765 let DelayId {
1766 top_level,
1767 infix,
1768 idx,
1769 } = self;
1770 write!(
1771 f,
1772 "{top_level}{EXECUTION_ID_INFIX}{infix}{EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX}{idx}"
1773 )
1774 }
1775 }
1776
1777 pub mod delay_impl {
1778 use super::{DelayId, DerivedIdParseError, derived_from_str};
1779 use std::{
1780 fmt::{Debug, Display},
1781 str::FromStr,
1782 };
1783
1784 impl Debug for DelayId {
1785 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1786 self.display_or_debug(f)
1787 }
1788 }
1789
1790 impl Display for DelayId {
1791 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1792 self.display_or_debug(f)
1793 }
1794 }
1795
1796 impl FromStr for DelayId {
1797 type Err = DerivedIdParseError;
1798
1799 fn from_str(input: &str) -> Result<Self, Self::Err> {
1800 let (top_level, infix, idx) = derived_from_str(input)?;
1801 let id = DelayId {
1802 top_level,
1803 infix,
1804 idx,
1805 };
1806 Ok(id)
1807 }
1808 }
1809
1810 #[cfg(any(test, feature = "test"))]
1811 impl<'a> arbitrary::Arbitrary<'a> for DelayId {
1812 fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
1813 use super::{ExecutionId, JoinSetId};
1814 let execution_id = ExecutionId::arbitrary(u)?;
1815 let mut join_set_id = JoinSetId::arbitrary(u)?;
1816 join_set_id.kind = crate::JoinSetKind::OneOff;
1817 Ok(DelayId::new(&execution_id, &join_set_id))
1818 }
1819 }
1820 }
1821}
1822
1823#[derive(
1824 Debug,
1825 Clone,
1826 PartialEq,
1827 Eq,
1828 Hash,
1829 derive_more::Display,
1830 serde_with::SerializeDisplay,
1831 serde_with::DeserializeFromStr,
1832)]
1833#[non_exhaustive] #[display("{kind}{JOIN_SET_ID_INFIX}{name}")]
1835pub struct JoinSetId {
1836 pub kind: JoinSetKind,
1837 pub name: StrVariant,
1838}
1839
1840impl JoinSetId {
1841 pub fn new(kind: JoinSetKind, name: StrVariant) -> Result<Self, InvalidNameError<JoinSetId>> {
1842 Ok(Self {
1843 kind,
1844 name: check_name(name, CHARSET_EXTRA_JSON_SET)?,
1845 })
1846 }
1847}
1848
1849pub const CHARSET_ALPHANUMERIC: &str =
1850 "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
1851
1852#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, derive_more::Display, strum::EnumIter)]
1853#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
1854#[display("{}", self.as_code())]
1855pub enum JoinSetKind {
1856 OneOff,
1857 Named,
1858 Generated,
1859}
1860impl JoinSetKind {
1861 fn as_code(&self) -> &'static str {
1862 match self {
1863 JoinSetKind::OneOff => "o",
1864 JoinSetKind::Named => "n",
1865 JoinSetKind::Generated => "g",
1866 }
1867 }
1868}
1869impl FromStr for JoinSetKind {
1870 type Err = &'static str;
1871 fn from_str(s: &str) -> Result<Self, Self::Err> {
1872 use strum::IntoEnumIterator;
1873 Self::iter()
1874 .find(|variant| s == variant.as_code())
1875 .ok_or("unknown join set kind")
1876 }
1877}
1878
1879pub(crate) const EXECUTION_ID_INFIX: char = '.';
1880pub(crate) const EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX: char = '_';
1881
1882const JOIN_SET_ID_INFIX: char = ':';
1883const CHARSET_EXTRA_JSON_SET: &str = "-/";
1884
1885impl FromStr for JoinSetId {
1886 type Err = JoinSetIdParseError;
1887
1888 fn from_str(input: &str) -> Result<Self, Self::Err> {
1889 let Some((kind, name)) = input.split_once(JOIN_SET_ID_INFIX) else {
1890 return Err(JoinSetIdParseError::WrongParts);
1891 };
1892 let kind = kind
1893 .parse()
1894 .map_err(JoinSetIdParseError::JoinSetKindParseError)?;
1895 JoinSetId::new(kind, StrVariant::from(name.to_string()))
1896 .map_err(JoinSetIdParseError::InvalidName)
1897 }
1898}
1899
1900#[derive(Debug, thiserror::Error)]
1901pub enum JoinSetIdParseError {
1902 #[error("join set must consist of three parts separated by {JOIN_SET_ID_INFIX} ")]
1903 WrongParts,
1904 #[error("cannot parse join set kind - {0}")]
1905 JoinSetKindParseError(&'static str),
1906 #[error("cannot parse join set id - {0}")]
1907 InvalidName(InvalidNameError<JoinSetId>),
1908}
1909
1910#[cfg(any(test, feature = "test"))]
1911const CHARSET_JOIN_SET_NAME: &str =
1912 const_format::concatcp!(CHARSET_ALPHANUMERIC, CHARSET_EXTRA_JSON_SET);
1913#[cfg(any(test, feature = "test"))]
1914impl<'a> arbitrary::Arbitrary<'a> for JoinSetId {
1915 fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
1916 let name: String = {
1917 let length_inclusive = u.int_in_range(0..=10).unwrap();
1918 (0..=length_inclusive)
1919 .map(|_| {
1920 let idx = u.choose_index(CHARSET_JOIN_SET_NAME.len()).unwrap();
1921 CHARSET_JOIN_SET_NAME
1922 .chars()
1923 .nth(idx)
1924 .expect("idx is < charset.len()")
1925 })
1926 .collect()
1927 };
1928
1929 Ok(JoinSetId::new(JoinSetKind::Named, StrVariant::from(name)).unwrap())
1930 }
1931}
1932
1933const EXECUTION_FAILED_STRING_OR_VARIANT: &str = "execution-failed";
1934#[derive(
1935 Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, derive_more::Display,
1936)]
1937#[serde(rename_all = "snake_case")]
1938pub enum ReturnType {
1939 Extendable(ReturnTypeExtendable), NonExtendable(ReturnTypeNonExtendable), }
1942impl ReturnType {
1943 #[must_use]
1951 pub fn detect(type_wrapper: TypeWrapper, wit_type: StrVariant) -> ReturnType {
1952 if let TypeWrapper::Result { ok, err: None } = type_wrapper {
1953 return ReturnType::Extendable(ReturnTypeExtendable {
1954 type_wrapper_tl: TypeWrapperTopLevel { ok, err: None },
1955 wit_type,
1956 });
1957 } else if let TypeWrapper::Result { ok, err: Some(err) } = type_wrapper {
1958 if let TypeWrapper::String = err.as_ref() {
1959 return ReturnType::Extendable(ReturnTypeExtendable {
1960 type_wrapper_tl: TypeWrapperTopLevel { ok, err: Some(err) },
1961 wit_type,
1962 });
1963 } else if let TypeWrapper::Variant(fields) = err.as_ref()
1964 && let Some(None) = fields.get(EXECUTION_FAILED_STRING_OR_VARIANT)
1965 {
1966 return ReturnType::Extendable(ReturnTypeExtendable {
1967 type_wrapper_tl: TypeWrapperTopLevel { ok, err: Some(err) },
1968 wit_type,
1969 });
1970 }
1971 return ReturnType::NonExtendable(ReturnTypeNonExtendable {
1972 type_wrapper: TypeWrapper::Result { ok, err: Some(err) },
1973 wit_type,
1974 });
1975 }
1976 ReturnType::NonExtendable(ReturnTypeNonExtendable {
1977 type_wrapper: type_wrapper.clone(),
1978 wit_type,
1979 })
1980 }
1981
1982 #[must_use]
1983 pub fn wit_type(&self) -> &str {
1984 match self {
1985 ReturnType::Extendable(compatible) => compatible.wit_type.as_ref(),
1986 ReturnType::NonExtendable(incompatible) => incompatible.wit_type.as_ref(),
1987 }
1988 }
1989
1990 #[must_use]
1991 pub fn type_wrapper(&self) -> TypeWrapper {
1992 match self {
1993 ReturnType::Extendable(compatible) => {
1994 TypeWrapper::from(compatible.type_wrapper_tl.clone())
1995 }
1996 ReturnType::NonExtendable(incompatible) => incompatible.type_wrapper.clone(),
1997 }
1998 }
1999}
2000
2001#[derive(
2002 Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, derive_more::Display,
2003)]
2004#[display("{wit_type}")]
2005pub struct ReturnTypeNonExtendable {
2006 pub type_wrapper: TypeWrapper,
2007 pub wit_type: StrVariant,
2008}
2009
2010#[derive(
2011 Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, derive_more::Display,
2012)]
2013#[display("{wit_type}")]
2014pub struct ReturnTypeExtendable {
2015 pub type_wrapper_tl: TypeWrapperTopLevel,
2016 pub wit_type: StrVariant,
2017}
2018
2019#[cfg(any(test, feature = "test"))]
2020pub const RETURN_TYPE_DUMMY: ReturnType = ReturnType::Extendable(ReturnTypeExtendable {
2021 type_wrapper_tl: TypeWrapperTopLevel {
2022 ok: None,
2023 err: None,
2024 },
2025 wit_type: StrVariant::Static("result"),
2026});
2027
2028#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Eq, derive_more::Display)]
2029#[derive_where::derive_where(PartialEq)]
2030#[display("{name}: {wit_type}")]
2031pub struct ParameterType {
2032 pub type_wrapper: TypeWrapper,
2033 #[derive_where(skip)]
2034 pub name: StrVariant,
2036 pub wit_type: StrVariant,
2037}
2038
2039#[derive(
2040 Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, Default, derive_more::Deref,
2041)]
2042pub struct ParameterTypes(pub Vec<ParameterType>);
2043
2044impl Debug for ParameterTypes {
2045 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2046 write!(f, "(")?;
2047 let mut iter = self.0.iter().peekable();
2048 while let Some(p) = iter.next() {
2049 write!(f, "{p:?}")?;
2050 if iter.peek().is_some() {
2051 write!(f, ", ")?;
2052 }
2053 }
2054 write!(f, ")")
2055 }
2056}
2057
2058impl Display for ParameterTypes {
2059 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2060 write!(f, "(")?;
2061 let mut iter = self.0.iter().peekable();
2062 while let Some(p) = iter.next() {
2063 write!(f, "{p}")?;
2064 if iter.peek().is_some() {
2065 write!(f, ", ")?;
2066 }
2067 }
2068 write!(f, ")")
2069 }
2070}
2071
2072#[derive(Debug, Clone)]
2073pub struct PackageIfcFns {
2074 pub ifc_fqn: IfcFqnName,
2075 pub extension: bool, pub fns: IndexMap<FnName, FunctionMetadata>,
2077}
2078
2079#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
2080pub struct ComponentRetryConfig {
2081 pub max_retries: Option<u32>,
2082 pub retry_exp_backoff: Duration,
2083}
2084impl ComponentRetryConfig {
2085 pub const ZERO: ComponentRetryConfig = ComponentRetryConfig {
2086 max_retries: Some(0),
2087 retry_exp_backoff: Duration::ZERO,
2088 };
2089
2090 #[cfg(feature = "test")]
2091 pub const WORKFLOW_TEST: ComponentRetryConfig = ComponentRetryConfig {
2092 max_retries: None,
2093 retry_exp_backoff: Duration::ZERO,
2094 };
2095}
2096
2097pub trait FunctionRegistry: Send + Sync {
2099 fn get_by_exported_function(
2100 &self,
2101 ffqn: &FunctionFqn,
2102 ) -> Option<(FunctionMetadata, ComponentId)>;
2103
2104 fn get_ret_type(&self, ffqn: &FunctionFqn) -> Option<TypeWrapperTopLevel> {
2107 self.get_by_exported_function(ffqn)
2108 .and_then(|(fn_meta, _)| {
2109 if let ReturnType::Extendable(ReturnTypeExtendable {
2110 type_wrapper_tl: type_wrapper,
2111 wit_type: _,
2112 }) = fn_meta.return_type
2113 {
2114 Some(type_wrapper)
2115 } else {
2116 None
2117 }
2118 })
2119 }
2120
2121 fn all_exports(&self) -> &[PackageIfcFns];
2122}
2123
2124#[derive(Debug, Default, Clone, Serialize, Deserialize, derive_more::Display, PartialEq, Eq)]
2125#[display("{_0:?}")]
2126pub struct ExecutionMetadata(Option<hashbrown::HashMap<String, String>>);
2127
2128impl ExecutionMetadata {
2129 const LINKED_KEY: &str = "obelisk-tracing-linked";
2130 #[must_use]
2131 pub const fn empty() -> Self {
2132 Self(None)
2134 }
2135
2136 #[must_use]
2137 pub fn from_parent_span(less_specific: &Span) -> Self {
2138 ExecutionMetadata::create(less_specific, false)
2139 }
2140
2141 #[must_use]
2142 pub fn from_linked_span(less_specific: &Span) -> Self {
2143 ExecutionMetadata::create(less_specific, true)
2144 }
2145
2146 #[must_use]
2151 #[expect(clippy::items_after_statements)]
2152 fn create(span: &Span, link_marker: bool) -> Self {
2153 use tracing_opentelemetry::OpenTelemetrySpanExt as _;
2154 let mut metadata = Self(Some(hashbrown::HashMap::default()));
2155 let mut metadata_view = ExecutionMetadataInjectorView {
2156 metadata: &mut metadata,
2157 };
2158 fn inject(s: &Span, metadata_view: &mut ExecutionMetadataInjectorView) {
2160 opentelemetry::global::get_text_map_propagator(|propagator| {
2161 propagator.inject_context(&s.context(), metadata_view);
2162 });
2163 }
2164 inject(&Span::current(), &mut metadata_view);
2165 if metadata_view.is_empty() {
2166 inject(span, &mut metadata_view);
2168 }
2169 if link_marker {
2170 metadata_view.set(Self::LINKED_KEY, String::new());
2171 }
2172 metadata
2173 }
2174
2175 pub fn enrich(&self, span: &Span) {
2176 use opentelemetry::trace::TraceContextExt as _;
2177 use tracing_opentelemetry::OpenTelemetrySpanExt as _;
2178
2179 let metadata_view = ExecutionMetadataExtractorView { metadata: self };
2180 let otel_context = opentelemetry::global::get_text_map_propagator(|propagator| {
2181 propagator.extract(&metadata_view)
2182 });
2183 if metadata_view.get(Self::LINKED_KEY).is_some() {
2184 let linked_span_context = otel_context.span().span_context().clone();
2185 span.add_link(linked_span_context);
2186 } else {
2187 let _ = span.set_parent(otel_context);
2188 }
2189 }
2190}
2191
2192struct ExecutionMetadataInjectorView<'a> {
2193 metadata: &'a mut ExecutionMetadata,
2194}
2195
2196impl ExecutionMetadataInjectorView<'_> {
2197 fn is_empty(&self) -> bool {
2198 self.metadata
2199 .0
2200 .as_ref()
2201 .is_some_and(hashbrown::HashMap::is_empty)
2202 }
2203}
2204
2205impl opentelemetry::propagation::Injector for ExecutionMetadataInjectorView<'_> {
2206 fn set(&mut self, key: &str, value: String) {
2207 let key = format!("tracing:{key}");
2208 let map = if let Some(map) = self.metadata.0.as_mut() {
2209 map
2210 } else {
2211 self.metadata.0 = Some(hashbrown::HashMap::new());
2212 assert_matches!(&mut self.metadata.0, Some(map) => map)
2213 };
2214 map.insert(key, value);
2215 }
2216}
2217
2218struct ExecutionMetadataExtractorView<'a> {
2219 metadata: &'a ExecutionMetadata,
2220}
2221
2222impl opentelemetry::propagation::Extractor for ExecutionMetadataExtractorView<'_> {
2223 fn get(&self, key: &str) -> Option<&str> {
2224 self.metadata
2225 .0
2226 .as_ref()
2227 .and_then(|map| map.get(&format!("tracing:{key}")))
2228 .map(std::string::String::as_str)
2229 }
2230
2231 fn keys(&self) -> Vec<&str> {
2232 match &self.metadata.0.as_ref() {
2233 Some(map) => map
2234 .keys()
2235 .filter_map(|key| key.strip_prefix("tracing:"))
2236 .collect(),
2237 None => vec![],
2238 }
2239 }
2240}
2241
2242#[cfg(test)]
2243mod tests {
2244
2245 use rstest::rstest;
2246
2247 use crate::{
2248 ExecutionId, FunctionFqn, JoinSetId, JoinSetKind, StrVariant, prefixed_ulid::ExecutorId,
2249 };
2250 use std::{
2251 hash::{DefaultHasher, Hash, Hasher},
2252 str::FromStr,
2253 sync::Arc,
2254 };
2255
2256 #[test]
2257 fn ulid_parsing() {
2258 let generated = ExecutorId::generate();
2259 let str = generated.to_string();
2260 let parsed = str.parse().unwrap();
2261 assert_eq!(generated, parsed);
2262 }
2263
2264 #[test]
2265 fn execution_id_parsing_top_level() {
2266 let generated = ExecutionId::generate();
2267 let str = generated.to_string();
2268 let parsed = str.parse().unwrap();
2269 assert_eq!(generated, parsed);
2270 }
2271
2272 #[test]
2273 fn execution_id_with_one_level_should_parse() {
2274 let top_level = ExecutionId::generate();
2275 let join_set_id = JoinSetId::new(JoinSetKind::Named, StrVariant::Static("name")).unwrap();
2276 let first_child = ExecutionId::Derived(top_level.next_level(&join_set_id));
2277 let ser = first_child.to_string();
2278 assert_eq!(format!("{top_level}.n:name_1"), ser);
2279 let parsed = ExecutionId::from_str(&ser).unwrap();
2280 assert_eq!(first_child, parsed);
2281 }
2282
2283 #[test]
2284 fn execution_id_increment_twice() {
2285 let top_level = ExecutionId::generate();
2286 let join_set_id = JoinSetId::new(JoinSetKind::Named, StrVariant::Static("name")).unwrap();
2287 let first_child = top_level.next_level(&join_set_id);
2288 let second_child = ExecutionId::Derived(first_child.get_incremented());
2289 let ser = second_child.to_string();
2290 assert_eq!(format!("{top_level}.n:name_2"), ser);
2291 let parsed = ExecutionId::from_str(&ser).unwrap();
2292 assert_eq!(second_child, parsed);
2293 }
2294
2295 #[test]
2296 fn execution_id_next_level_twice() {
2297 let top_level = ExecutionId::generate();
2298 let join_set_id_outer =
2299 JoinSetId::new(JoinSetKind::Generated, StrVariant::Static("gg")).unwrap();
2300 let join_set_id_inner =
2301 JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("oo")).unwrap();
2302 let execution_id = ExecutionId::Derived(
2303 top_level
2304 .next_level(&join_set_id_outer)
2305 .get_incremented()
2306 .next_level(&join_set_id_inner)
2307 .get_incremented(),
2308 );
2309 let ser = execution_id.to_string();
2310 assert_eq!(format!("{top_level}.g:gg_2.o:oo_2"), ser);
2311 let parsed = ExecutionId::from_str(&ser).unwrap();
2312 assert_eq!(execution_id, parsed);
2313 }
2314
2315 #[test]
2316 fn execution_id_split_first_level() {
2317 let top_level = ExecutionId::generate();
2318 let join_set_id =
2319 JoinSetId::new(JoinSetKind::Generated, StrVariant::Static("some")).unwrap();
2320 let execution_id = top_level.next_level(&join_set_id);
2321 let (actual_top_level, actual_join_set) = execution_id.split_to_parts();
2322 assert_eq!(top_level, actual_top_level);
2323 assert_eq!(join_set_id, actual_join_set);
2324 }
2325
2326 #[rstest]
2327 fn execution_id_split_second_level(#[values(0, 1)] outer_idx: u64) {
2328 let top_level = ExecutionId::generate();
2329 let join_set_id_outer =
2330 JoinSetId::new(JoinSetKind::Generated, StrVariant::Static("some")).unwrap();
2331 let first_level = top_level
2332 .next_level(&join_set_id_outer)
2333 .get_incremented_by(outer_idx);
2334
2335 let join_set_id_inner =
2336 JoinSetId::new(JoinSetKind::Generated, StrVariant::Static("other")).unwrap();
2337 let second_level = first_level.next_level(&join_set_id_inner);
2338
2339 let (actual_first_level, actual_join_set) = second_level.split_to_parts();
2340 assert_eq!(ExecutionId::Derived(first_level), actual_first_level);
2341 assert_eq!(join_set_id_inner, actual_join_set);
2342 }
2343
2344 #[test]
2345 fn invalid_execution_id_should_fail_to_parse() {
2346 ExecutionId::from_str("E_01KBNHM5FQW81KP5Y3XMNX31K4.g:gg._2.o:oo_2").unwrap_err();
2347 }
2348
2349 #[test]
2350 fn execution_id_hash_should_be_stable() {
2351 let parent = ExecutionId::from_parts(1, 2);
2352 let join_set_id = JoinSetId::new(JoinSetKind::Named, StrVariant::Static("name")).unwrap();
2353 let sibling_1 = parent.next_level(&join_set_id);
2354 let sibling_2 = ExecutionId::Derived(sibling_1.get_incremented());
2355 let sibling_1 = ExecutionId::Derived(sibling_1);
2356 let join_set_id_inner =
2357 JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("oo")).unwrap();
2358 let child =
2359 ExecutionId::Derived(sibling_1.next_level(&join_set_id_inner).get_incremented());
2360 let parent = parent.random_seed();
2361 let sibling_1 = sibling_1.random_seed();
2362 let sibling_2 = sibling_2.random_seed();
2363 let child = child.random_seed();
2364 let vec = vec![parent, sibling_1, sibling_2, child];
2365 insta::assert_debug_snapshot!(vec);
2366 let set: hashbrown::HashSet<_> = vec.into_iter().collect();
2368 assert_eq!(4, set.len());
2369 }
2370
2371 #[test]
2372 fn hash_of_str_variants_should_be_equal() {
2373 let input = "foo";
2374 let left = StrVariant::Arc(Arc::from(input));
2375 let right = StrVariant::Static(input);
2376 assert_eq!(left, right);
2377 let mut left_hasher = DefaultHasher::new();
2378 left.hash(&mut left_hasher);
2379 let mut right_hasher = DefaultHasher::new();
2380 right.hash(&mut right_hasher);
2381 let left_hasher = left_hasher.finish();
2382 let right_hasher = right_hasher.finish();
2383 println!("left: {left_hasher:x}, right: {right_hasher:x}");
2384 assert_eq!(left_hasher, right_hasher);
2385 }
2386
2387 #[test]
2388 fn ffqn_from_tuple_with_version_should_work() {
2389 let ffqn = FunctionFqn::try_from_tuple("wasi:cli/run@0.2.0", "run").unwrap();
2390 assert_eq!(FunctionFqn::new_static("wasi:cli/run@0.2.0", "run"), ffqn);
2391 }
2392
2393 #[test]
2394 fn ffqn_from_str_with_version_should_work() {
2395 let ffqn = FunctionFqn::from_str("wasi:cli/run@0.2.0.run").unwrap();
2396 assert_eq!(FunctionFqn::new_static("wasi:cli/run@0.2.0", "run"), ffqn);
2397 }
2398
2399 #[tokio::test]
2400 async fn join_set_serde_should_be_consistent() {
2401 use crate::{JoinSetId, JoinSetKind};
2402 use strum::IntoEnumIterator;
2403 for kind in JoinSetKind::iter() {
2404 let join_set_id = JoinSetId::new(kind, StrVariant::from("name")).unwrap();
2405 let ser = serde_json::to_string(&join_set_id).unwrap();
2406 let deser = serde_json::from_str(&ser).unwrap();
2407 assert_eq!(join_set_id, deser);
2408 }
2409 }
2410}