1#[cfg(any(feature = "rusqlite", feature = "postgres"))]
2pub mod backtrace;
3pub mod component_id;
4mod error_conversions;
5#[cfg(feature = "postgres")]
6mod postgres_ext;
7#[cfg(feature = "rusqlite")]
8mod rusqlite_ext;
9pub mod storage;
10pub mod time;
11
12pub use crate::component_id::{
13 ComponentId, ComponentType, ContentDigest, InvalidNameError, check_name,
14};
15use ::serde::{Deserialize, Serialize};
16use indexmap::IndexMap;
17use opentelemetry::propagation::{Extractor, Injector};
18pub use prefixed_ulid::ExecutionId;
19use serde_json::Value;
20use std::collections::hash_map::DefaultHasher;
21use std::hash::BuildHasherDefault;
22use std::{
23 borrow::Borrow,
24 fmt::{Debug, Display, Write},
25 hash::Hash,
26 marker::PhantomData,
27 ops::Deref,
28 str::FromStr,
29 sync::Arc,
30 time::Duration,
31};
32use storage::{PendingStateFinishedError, PendingStateFinishedResultKind};
33use tracing::{Span, error};
34use val_json::{
35 type_wrapper::{TypeConversionError, TypeKey, TypeWrapper},
36 wast_val::{ValKey, WastVal, WastValWithType},
37 wast_val_ser::params,
38};
39use wasmtime::component::{Type, Val};
40
41pub const NAMESPACE_OBELISK: &str = "obelisk";
42const NAMESPACE_WASI: &str = "wasi";
43pub const SUFFIX_PKG_EXT: &str = "-obelisk-ext";
44pub const SUFFIX_PKG_SCHEDULE: &str = "-obelisk-schedule";
45pub const SUFFIX_PKG_STUB: &str = "-obelisk-stub";
46
47#[derive(
48 thiserror::Error, Clone, Debug, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema,
49)]
50#[error("{kind}")]
51pub struct FinishedExecutionFailure {
52 pub kind: ExecutionFailureKind,
53 #[serde(skip_serializing_if = "Option::is_none")]
54 pub reason: Option<String>,
55 #[serde(skip_serializing_if = "Option::is_none")]
56 pub detail: Option<String>,
57}
58impl FinishedExecutionFailure {
59 #[must_use]
60 pub fn as_pending_state_finished_error(&self) -> PendingStateFinishedError {
61 PendingStateFinishedError::ExecutionFailure(self.kind)
62 }
63}
64
65#[derive(
66 Debug,
67 Clone,
68 Copy,
69 derive_more::Display,
70 PartialEq,
71 Eq,
72 Serialize,
73 Deserialize,
74 schemars::JsonSchema,
75)]
76#[serde(rename_all = "snake_case")]
77pub enum ExecutionFailureKind {
78 TimedOut,
80 NondeterminismDetected,
82 OutOfFuel,
84 Cancelled,
86 Uncategorized,
87}
88
89#[derive(
90 Debug,
91 Clone,
92 Copy,
93 derive_more::Display,
94 PartialEq,
95 Eq,
96 Serialize,
97 Deserialize,
98 schemars::JsonSchema,
99)]
100#[serde(rename_all = "snake_case")]
101pub enum TrapKind {
102 #[display("trap")]
103 Trap,
104 #[display("out of fuel")]
105 OutOfFuel,
106 #[display("host function error")]
107 HostFunctionError,
108}
109
110#[derive(Clone, Eq, derive_more::Display, schemars::JsonSchema)]
111#[schemars(with = "String")]
112pub enum StrVariant {
113 Static(&'static str),
114 Arc(Arc<str>),
115}
116
117impl StrVariant {
118 #[must_use]
119 pub const fn empty() -> StrVariant {
120 StrVariant::Static("")
121 }
122}
123
124impl From<String> for StrVariant {
125 fn from(value: String) -> Self {
126 StrVariant::Arc(Arc::from(value))
127 }
128}
129
130impl From<&'static str> for StrVariant {
131 fn from(value: &'static str) -> Self {
132 StrVariant::Static(value)
133 }
134}
135
136impl PartialEq for StrVariant {
137 fn eq(&self, other: &Self) -> bool {
138 match (self, other) {
139 (Self::Static(left), Self::Static(right)) => left == right,
140 (Self::Static(left), Self::Arc(right)) => *left == right.deref(),
141 (Self::Arc(left), Self::Arc(right)) => left == right,
142 (Self::Arc(left), Self::Static(right)) => left.deref() == *right,
143 }
144 }
145}
146
147impl Hash for StrVariant {
148 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
149 match self {
150 StrVariant::Static(val) => val.hash(state),
151 StrVariant::Arc(val) => {
152 let str: &str = val.deref();
153 str.hash(state);
154 }
155 }
156 }
157}
158
159impl Debug for StrVariant {
160 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
161 Display::fmt(self, f)
162 }
163}
164
165impl Deref for StrVariant {
166 type Target = str;
167 fn deref(&self) -> &Self::Target {
168 match self {
169 Self::Arc(v) => v,
170 Self::Static(v) => v,
171 }
172 }
173}
174
175impl AsRef<str> for StrVariant {
176 fn as_ref(&self) -> &str {
177 match self {
178 Self::Arc(v) => v,
179 Self::Static(v) => v,
180 }
181 }
182}
183
184mod serde_strvariant {
185 use crate::StrVariant;
186 use serde::{
187 Deserialize, Deserializer, Serialize, Serializer,
188 de::{self, Visitor},
189 };
190 use std::{ops::Deref, sync::Arc};
191
192 impl Serialize for StrVariant {
193 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
194 where
195 S: Serializer,
196 {
197 serializer.serialize_str(self.deref())
198 }
199 }
200
201 impl<'de> Deserialize<'de> for StrVariant {
202 fn deserialize<D>(deserializer: D) -> Result<StrVariant, D::Error>
203 where
204 D: Deserializer<'de>,
205 {
206 deserializer.deserialize_str(StrVariantVisitor)
207 }
208 }
209
210 struct StrVariantVisitor;
211
212 impl Visitor<'_> for StrVariantVisitor {
213 type Value = StrVariant;
214
215 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
216 formatter.write_str("a string")
217 }
218
219 fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
220 where
221 E: de::Error,
222 {
223 Ok(StrVariant::Arc(Arc::from(v)))
224 }
225 }
226}
227
228#[derive(Hash, Clone, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
229#[display("{value}")]
230#[serde(transparent)]
231pub struct Name<T> {
232 pub value: StrVariant,
233 #[serde(skip)]
234 phantom_data: PhantomData<fn(T) -> T>,
235}
236
237impl<T> Name<T> {
238 #[must_use]
239 pub fn new_arc(value: Arc<str>) -> Self {
240 Self {
241 value: StrVariant::Arc(value),
242 phantom_data: PhantomData,
243 }
244 }
245
246 #[must_use]
247 pub const fn new_static(value: &'static str) -> Self {
248 Self {
249 value: StrVariant::Static(value),
250 phantom_data: PhantomData,
251 }
252 }
253}
254
255impl<T> Debug for Name<T> {
256 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
257 Display::fmt(&self, f)
258 }
259}
260
261impl<T> Deref for Name<T> {
262 type Target = str;
263
264 fn deref(&self) -> &Self::Target {
265 self.value.deref()
266 }
267}
268
269impl<T> Borrow<str> for Name<T> {
270 fn borrow(&self) -> &str {
271 self.deref()
272 }
273}
274
275impl<T> From<String> for Name<T> {
276 fn from(value: String) -> Self {
277 Self::new_arc(Arc::from(value))
278 }
279}
280
281#[derive(Clone, Copy, Debug, PartialEq, strum::EnumIter, derive_more::Display)]
282#[display("{}", self.suffix())]
283pub enum PackageExtension {
284 ObeliskExt,
285 ObeliskSchedule,
286 ObeliskStub,
287}
288impl PackageExtension {
289 fn suffix(&self) -> &'static str {
290 match self {
291 PackageExtension::ObeliskExt => SUFFIX_PKG_EXT,
292 PackageExtension::ObeliskSchedule => SUFFIX_PKG_SCHEDULE,
293 PackageExtension::ObeliskStub => SUFFIX_PKG_STUB,
294 }
295 }
296}
297
298#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
299#[cfg_attr(feature = "test", derive(Serialize))]
300pub struct PkgFqn {
301 pub namespace: String, pub package_name: String,
303 pub version: Option<String>,
304}
305impl PkgFqn {
306 #[must_use]
307 pub fn is_extension(&self) -> bool {
308 Self::is_package_name_ext(&self.package_name)
309 }
310
311 #[must_use]
312 pub fn split_ext(&self) -> Option<(PkgFqn, PackageExtension)> {
313 use strum::IntoEnumIterator;
314 for package_ext in PackageExtension::iter() {
315 if let Some(package_name) = self.package_name.strip_suffix(package_ext.suffix()) {
316 return Some((
317 PkgFqn {
318 namespace: self.namespace.clone(),
319 package_name: package_name.to_string(),
320 version: self.version.clone(),
321 },
322 package_ext,
323 ));
324 }
325 }
326 None
327 }
328
329 fn is_package_name_ext(package_name: &str) -> bool {
330 package_name.ends_with(SUFFIX_PKG_EXT)
331 || package_name.ends_with(SUFFIX_PKG_SCHEDULE)
332 || package_name.ends_with(SUFFIX_PKG_STUB)
333 }
334
335 fn fmt(&self, f: &mut dyn Write, delimiter: &'static str) -> std::fmt::Result {
336 let PkgFqn {
337 namespace,
338 package_name,
339 version,
340 } = self;
341 if let Some(version) = version {
342 write!(f, "{namespace}{delimiter}{package_name}@{version}")
343 } else {
344 write!(f, "{namespace}{delimiter}{package_name}")
345 }
346 }
347
348 #[must_use]
349 pub fn as_file_name(&self) -> String {
350 let mut buffer = String::new();
351 self.fmt(&mut buffer, "_").expect("writing to string");
352 buffer
353 }
354}
355impl Display for PkgFqn {
356 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
357 self.fmt(f, ":")
358 }
359}
360
361#[derive(Hash, Clone, PartialEq, Eq)]
362pub struct IfcFqnMarker;
363
364pub type IfcFqnName = Name<IfcFqnMarker>; impl IfcFqnName {
367 #[must_use]
368 pub fn from_parts(
369 namespace: &str,
370 package_name: &str,
371 ifc_name: &str,
372 version: Option<&str>,
373 ) -> Self {
374 let mut str = format!("{namespace}:{package_name}/{ifc_name}");
375 if let Some(version) = version {
376 str += "@";
377 str += version;
378 }
379 Self::new_arc(Arc::from(str))
380 }
381
382 #[must_use]
383 pub fn from_pkg_fqn(pkg_fqn: &PkgFqn, ifc_name: &str) -> Self {
384 let mut str = format!(
385 "{namespace}:{package_name}/{ifc_name}",
386 namespace = pkg_fqn.namespace,
387 package_name = pkg_fqn.package_name
388 );
389 if let Some(version) = &pkg_fqn.version {
390 str += "@";
391 str += version;
392 }
393 Self::new_arc(Arc::from(str))
394 }
395
396 #[must_use]
397 pub fn namespace(&self) -> &str {
398 self.deref().split_once(':').unwrap().0
399 }
400
401 #[must_use]
402 pub fn package_name(&self) -> &str {
403 let after_colon = self.deref().split_once(':').unwrap().1;
404 after_colon.split_once('/').unwrap().0
405 }
406
407 #[must_use]
408 pub fn version(&self) -> Option<&str> {
409 self.deref().split_once('@').map(|(_, version)| version)
410 }
411
412 #[must_use]
413 pub fn pkg_fqn_name(&self) -> PkgFqn {
414 let (namespace, rest) = self.deref().split_once(':').unwrap();
415 let (package_name, rest) = rest.split_once('/').unwrap();
416 let version = rest.split_once('@').map(|(_, version)| version);
417 PkgFqn {
418 namespace: namespace.to_string(),
419 package_name: package_name.to_string(),
420 version: version.map(std::string::ToString::to_string),
421 }
422 }
423
424 #[must_use]
425 pub fn ifc_name(&self) -> &str {
426 let after_colon = self.deref().split_once(':').unwrap().1;
427 let after_slash = after_colon.split_once('/').unwrap().1;
428 after_slash
429 .split_once('@')
430 .map_or(after_slash, |(ifc, _)| ifc)
431 }
432
433 #[must_use]
434 pub fn is_extension(&self) -> bool {
436 PkgFqn::is_package_name_ext(self.package_name())
437 }
438
439 #[must_use]
440 pub fn package_strip_obelisk_ext_suffix(&self) -> Option<&str> {
441 self.package_name().strip_suffix(SUFFIX_PKG_EXT)
442 }
443
444 #[must_use]
445 pub fn package_strip_obelisk_schedule_suffix(&self) -> Option<&str> {
446 self.package_name().strip_suffix(SUFFIX_PKG_SCHEDULE)
447 }
448
449 #[must_use]
450 pub fn package_strip_obelisk_stub_suffix(&self) -> Option<&str> {
451 self.package_name().strip_suffix(SUFFIX_PKG_STUB)
452 }
453
454 #[must_use]
455 pub fn is_namespace_obelisk(&self) -> bool {
456 self.namespace() == NAMESPACE_OBELISK
457 }
458
459 #[must_use]
460 pub fn is_namespace_wasi(&self) -> bool {
461 self.namespace() == NAMESPACE_WASI
462 }
463}
464impl FromStr for IfcFqnName {
465 type Err = IfcFqnParseError;
466
467 fn from_str(s: &str) -> Result<Self, Self::Err> {
468 let input = s.to_string();
469
470 let (namespace, rest) = s
471 .split_once(':')
472 .ok_or_else(|| IfcFqnParseError::MissingNamespace(input.clone()))?;
473 if namespace.is_empty() {
474 return Err(IfcFqnParseError::EmptyNamespace(input));
475 }
476 let (package_and_ifc, version) = match rest.split_once('@') {
477 Some((left, ver)) => (left, Some(ver)),
478 None => (rest, None),
479 };
480 let (package_name, ifc_name) = package_and_ifc
481 .split_once('/')
482 .ok_or_else(|| IfcFqnParseError::MissingPackageOrIfc(input.clone()))?;
483 if package_name.is_empty() {
484 return Err(IfcFqnParseError::EmptyPackageName(input));
485 }
486 if ifc_name.is_empty() {
487 return Err(IfcFqnParseError::EmptyIfcName(input));
488 }
489 if let Some(v) = version
490 && v.is_empty()
491 {
492 return Err(IfcFqnParseError::EmptyVersion(input));
493 }
494 Ok(Self::new_arc(Arc::from(s)))
495 }
496}
497
498#[derive(Debug, thiserror::Error, Clone)]
499pub enum IfcFqnParseError {
500 #[error("missing namespace separator ':' in `{0}`")]
501 MissingNamespace(String),
502
503 #[error("namespace is empty in `{0}`")]
504 EmptyNamespace(String),
505
506 #[error("missing package/ifc separator '/' in `{0}`")]
507 MissingPackageOrIfc(String),
508
509 #[error("package name is empty in `{0}`")]
510 EmptyPackageName(String),
511
512 #[error("ifc name is empty in `{0}`")]
513 EmptyIfcName(String),
514
515 #[error("version is empty in `{0}`")]
516 EmptyVersion(String),
517}
518
519#[derive(Hash, Clone, PartialEq, Eq)]
520pub struct FnMarker;
521
522pub type FnName = Name<FnMarker>;
523
524#[derive(
525 Hash,
526 Clone,
527 PartialEq,
528 Eq,
529 serde_with::SerializeDisplay,
530 serde_with::DeserializeFromStr,
531 schemars::JsonSchema,
532)]
533#[schemars(with = "String")]
534pub struct FunctionFqn {
535 pub ifc_fqn: IfcFqnName,
537 pub function_name: FnName,
538}
539
540impl FunctionFqn {
541 #[must_use]
542 pub fn new_arc(ifc_fqn: Arc<str>, function_name: Arc<str>) -> Self {
543 Self {
544 ifc_fqn: Name::new_arc(ifc_fqn),
545 function_name: Name::new_arc(function_name),
546 }
547 }
548
549 #[must_use]
550 pub const fn new_static(ifc_fqn: &'static str, function_name: &'static str) -> Self {
551 Self {
552 ifc_fqn: Name::new_static(ifc_fqn),
553 function_name: Name::new_static(function_name),
554 }
555 }
556
557 #[must_use]
558 pub const fn new_static_tuple(tuple: (&'static str, &'static str)) -> Self {
559 Self::new_static(tuple.0, tuple.1)
560 }
561
562 pub fn try_from_tuple(
563 ifc_fqn: &str,
564 function_name: &str,
565 ) -> Result<Self, FunctionFqnParseError> {
566 IfcFqnName::from_str(ifc_fqn)?;
568
569 if function_name.contains('.') {
570 Err(FunctionFqnParseError::DelimiterFoundInFunctionName(
571 function_name.to_string(),
572 ))
573 } else {
574 Ok(Self::new_arc(Arc::from(ifc_fqn), Arc::from(function_name)))
575 }
576 }
577}
578
579#[derive(Debug, thiserror::Error)]
580pub enum FunctionFqnParseError {
581 #[error(transparent)]
582 IfcFqnParseError(#[from] IfcFqnParseError),
583 #[error("delimiter `.` between interface FQN and function name not found in `{0}`")]
584 DelimiterNotFound(String),
585 #[error("delimiter `.` found in function name `{0}`")]
586 DelimiterFoundInFunctionName(String),
587}
588
589impl FromStr for FunctionFqn {
590 type Err = FunctionFqnParseError;
591
592 fn from_str(value: &str) -> Result<Self, Self::Err> {
593 if let Some((ifc_fqn, function_name)) = value.rsplit_once('.') {
594 IfcFqnName::from_str(ifc_fqn)?;
596 Ok(Self::new_arc(Arc::from(ifc_fqn), Arc::from(function_name)))
597 } else {
598 Err(FunctionFqnParseError::DelimiterNotFound(value.to_string()))
599 }
600 }
601}
602
603impl Display for FunctionFqn {
604 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
605 write!(
606 f,
607 "{ifc_fqn}.{function_name}",
608 ifc_fqn = self.ifc_fqn,
609 function_name = self.function_name
610 )
611 }
612}
613
614impl Debug for FunctionFqn {
615 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
616 Display::fmt(&self, f)
617 }
618}
619
620#[cfg(any(test, feature = "test"))]
621impl<'a> arbitrary::Arbitrary<'a> for FunctionFqn {
622 fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
623 let illegal = [':', '@', '.'];
624 let namespace = format!("ns{}", u.arbitrary::<String>()?.replace(illegal, "")); let pkg_name = format!("pkg{}", u.arbitrary::<String>()?.replace(illegal, ""));
627 let ifc_name = format!("ifc{}", u.arbitrary::<String>()?.replace(illegal, ""));
628 let fn_name = format!("fn{}", u.arbitrary::<String>()?.replace(illegal, ""));
629
630 Ok(FunctionFqn::new_arc(
631 Arc::from(format!("{namespace}:{pkg_name}/{ifc_name}")),
632 Arc::from(fn_name),
633 ))
634 }
635}
636
637#[derive(
638 Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, schemars::JsonSchema,
639)]
640pub struct TypeWrapperTopLevel {
641 pub ok: Option<Box<TypeWrapper>>,
642 pub err: Option<Box<TypeWrapper>>,
643}
644impl TypeWrapperTopLevel {
645 #[must_use]
646 pub fn is_result_of_units(&self) -> bool {
647 self.ok.is_none() && self.err.is_none()
648 }
649}
650impl From<TypeWrapperTopLevel> for TypeWrapper {
651 fn from(value: TypeWrapperTopLevel) -> TypeWrapper {
652 TypeWrapper::Result {
653 ok: value.ok,
654 err: value.err,
655 }
656 }
657}
658
659#[derive(
660 Clone, derive_more::Debug, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema,
661)]
662#[serde(rename_all = "snake_case")]
663pub enum SupportedFunctionReturnValue {
664 Ok(Option<WastValWithType>),
665 Err(Option<WastValWithType>),
666 ExecutionFailure(FinishedExecutionFailure),
667}
668impl Display for SupportedFunctionReturnValue {
669 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
670 match self.as_pending_state_finished_result() {
671 PendingStateFinishedResultKind::Ok => write!(f, "completed successfully"),
672 PendingStateFinishedResultKind::Err(err) => write!(f, "{err}"),
673 }
674 }
675}
676impl SupportedFunctionReturnValue {
677 #[must_use]
678 pub fn is_permanent_variant(&self) -> bool {
679 match self {
680 SupportedFunctionReturnValue::Err(Some(WastValWithType {
681 value: val_json::wast_val::WastVal::Variant(key, _),
682 ..
683 })) => key.as_snake_str().contains("permanent"),
684 _ => false,
685 }
686 }
687}
688
689pub const SUPPORTED_RETURN_VALUE_OK_EMPTY: SupportedFunctionReturnValue =
690 SupportedFunctionReturnValue::Ok(None);
691
692#[derive(Debug, thiserror::Error)]
693pub enum ResultParsingError {
694 #[error("return value must not be empty")]
695 NoValue,
696 #[error("return value cannot be parsed, multi-value results are not supported")]
697 MultiValue,
698 #[error("return value cannot be parsed, {0}")]
699 TypeConversionError(val_json::type_wrapper::TypeConversionError),
700 #[error(transparent)]
701 ResultParsingErrorFromVal(ResultParsingErrorFromVal),
702}
703
704#[derive(Debug, thiserror::Error)]
705pub enum ResultParsingErrorFromVal {
706 #[error("return value cannot be parsed, {0}")]
707 WastValConversionError(val_json::wast_val::WastValConversionError),
708 #[error("top level type must be a result")]
709 TopLevelTypeMustBeAResult,
710 #[error("value does not type check - {0}")]
711 TypeCheckError(String),
712}
713
714impl SupportedFunctionReturnValue {
715 pub fn new_from_iterator<
716 I: ExactSizeIterator<Item = (wasmtime::component::Val, wasmtime::component::Type)>,
717 >(
718 mut iter: I,
719 ) -> Result<Self, ResultParsingError> {
720 if iter.len() == 0 {
721 Err(ResultParsingError::NoValue)
722 } else if iter.len() == 1 {
723 let (val, r#type) = iter.next().unwrap();
724 let r#type =
725 TypeWrapper::try_from(r#type).map_err(ResultParsingError::TypeConversionError)?;
726 Self::from_val_and_type_wrapper(val, r#type)
727 .map_err(ResultParsingError::ResultParsingErrorFromVal)
728 } else {
729 Err(ResultParsingError::MultiValue)
730 }
731 }
732
733 pub fn new(
734 val: wasmtime::component::Val,
735 r#type: wasmtime::component::Type,
736 ) -> Result<Self, ResultParsingError> {
737 let r#type =
738 TypeWrapper::try_from(r#type).map_err(ResultParsingError::TypeConversionError)?;
739 Self::from_val_and_type_wrapper(val, r#type)
740 .map_err(ResultParsingError::ResultParsingErrorFromVal)
741 }
742
743 #[expect(clippy::result_unit_err)]
746 pub fn from_wast_val_with_type(
747 value: WastValWithType,
748 ) -> Result<SupportedFunctionReturnValue, ()> {
749 match value {
750 WastValWithType {
752 r#type: TypeWrapper::Result { ok: None, err: _ },
753 value: WastVal::Result(Ok(None)),
754 } => Ok(SupportedFunctionReturnValue::Ok(None)),
755
756 WastValWithType {
758 r#type:
759 TypeWrapper::Result {
760 ok: Some(ok),
761 err: _,
762 },
763 value: WastVal::Result(Ok(Some(value))),
764 } => Ok(SupportedFunctionReturnValue::Ok(Some(WastValWithType {
765 r#type: *ok,
766 value: *value,
767 }))),
768
769 WastValWithType {
771 r#type: TypeWrapper::Result { ok: _, err: None },
772 value: WastVal::Result(Err(None)),
773 } => Ok(SupportedFunctionReturnValue::Err(None)),
774
775 WastValWithType {
777 r#type:
778 TypeWrapper::Result {
779 ok: _,
780 err: Some(err),
781 },
782 value: WastVal::Result(Err(Some(value))),
783 } => Ok(SupportedFunctionReturnValue::Err(Some(WastValWithType {
784 r#type: *err,
785 value: *value,
786 }))),
787
788 _ => Err(()),
789 }
790 }
791
792 pub fn from_val_and_type_wrapper(
793 value: wasmtime::component::Val,
794 ty: TypeWrapper,
795 ) -> Result<Self, ResultParsingErrorFromVal> {
796 let TypeWrapper::Result { ok, err } = ty else {
797 return Err(ResultParsingErrorFromVal::TopLevelTypeMustBeAResult);
798 };
799 let ty = TypeWrapperTopLevel { ok, err };
800 Self::from_val_and_type_wrapper_tl(value, ty)
801 }
802
803 pub fn from_val_and_type_wrapper_tl(
804 value: wasmtime::component::Val,
805 ty: TypeWrapperTopLevel,
806 ) -> Result<Self, ResultParsingErrorFromVal> {
807 let wasmtime::component::Val::Result(value) = value else {
808 return Err(ResultParsingErrorFromVal::TopLevelTypeMustBeAResult);
809 };
810
811 match (ty.ok, ty.err, value) {
812 (None, _, Ok(None)) => Ok(SupportedFunctionReturnValue::Ok(None)),
813 (Some(ok_type), _, Ok(Some(value))) => {
814 Ok(SupportedFunctionReturnValue::Ok(Some(WastValWithType {
815 r#type: *ok_type,
816 value: WastVal::try_from(*value)
817 .map_err(ResultParsingErrorFromVal::WastValConversionError)?,
818 })))
819 }
820 (_, None, Err(None)) => Ok(SupportedFunctionReturnValue::Err(None)),
821 (_, Some(err_type), Err(Some(value))) => {
822 Ok(SupportedFunctionReturnValue::Err(Some(WastValWithType {
823 r#type: *err_type,
824 value: WastVal::try_from(*value)
825 .map_err(ResultParsingErrorFromVal::WastValConversionError)?,
826 })))
827 }
828 (ok_type, err_type, value) => Err(ResultParsingErrorFromVal::TypeCheckError(format!(
829 "invalid combination - ok type: {ok_type:?}, err type: {err_type:?}, value: {value:?}"
830 ))),
831 }
832 }
833
834 #[must_use]
835 pub fn into_wast_val(self, get_return_type: impl FnOnce() -> TypeWrapperTopLevel) -> WastVal {
836 WastVal::Result(self.into_wast_val_res(get_return_type))
837 }
838
839 pub fn into_wast_val_res(
840 self,
841 get_return_type: impl FnOnce() -> TypeWrapperTopLevel,
842 ) -> Result<Option<Box<WastVal>>, Option<Box<WastVal>>> {
843 match self {
844 SupportedFunctionReturnValue::Ok(None) => Ok(None),
845 SupportedFunctionReturnValue::Ok(Some(v)) => Ok(Some(Box::new(v.value))),
846 SupportedFunctionReturnValue::Err(None) => Err(None),
847 SupportedFunctionReturnValue::Err(Some(v)) => Err(Some(Box::new(v.value))),
848 SupportedFunctionReturnValue::ExecutionFailure(_) => {
849 Err(Self::execution_error_to_wast_val_err(&get_return_type()))
850 }
851 }
852 }
853
854 fn execution_error_to_wast_val_err(ret_type: &TypeWrapperTopLevel) -> Option<Box<WastVal>> {
855 match ret_type {
856 TypeWrapperTopLevel { ok: _, err: None } => None,
857 TypeWrapperTopLevel {
858 ok: _,
859 err: Some(inner),
860 } => match inner.as_ref() {
861 TypeWrapper::String => Some(Box::new(WastVal::String(
862 EXECUTION_FAILED_STRING_OR_VARIANT.to_string(),
863 ))),
864 TypeWrapper::Variant(variants)
865 if variants.get(&TypeKey::new_kebab(EXECUTION_FAILED_STRING_OR_VARIANT))
866 == Some(&None) =>
867 {
868 Some(Box::new(WastVal::Variant(
869 ValKey::from_kebab(EXECUTION_FAILED_STRING_OR_VARIANT),
870 None,
871 )))
872 }
873 _ => {
874 unreachable!(
875 "unexpected top-level return type {ret_type:?} cannot be `ReturnTypeExtendable`"
876 )
877 }
878 },
879 }
880 }
881
882 #[must_use]
883 pub fn as_pending_state_finished_result(&self) -> PendingStateFinishedResultKind {
884 match self {
885 SupportedFunctionReturnValue::Ok(_) => PendingStateFinishedResultKind::Ok,
886 SupportedFunctionReturnValue::Err(_) => {
887 PendingStateFinishedResultKind::Err(PendingStateFinishedError::Error)
888 }
889 SupportedFunctionReturnValue::ExecutionFailure(err) => {
890 PendingStateFinishedResultKind::Err(err.as_pending_state_finished_error())
891 }
892 }
893 }
894}
895
896#[derive(Debug, Clone, schemars::JsonSchema)]
897#[schemars(with = "Vec<serde_json::Value>")]
898pub struct Params(ParamsInternal);
899
900#[derive(derive_more::Debug, Clone)]
901enum ParamsInternal {
902 JsonValues(Arc<[Value]>),
903 Vals {
904 vals: Arc<[wasmtime::component::Val]>,
905 #[debug(skip)]
906 json_vals_cache: Arc<std::sync::RwLock<Option<Arc<[Value]>>>>, },
908 Empty,
909}
910
911pub const SUFFIX_FN_SUBMIT: &str = "-submit";
912pub const SUFFIX_FN_AWAIT_NEXT: &str = "-await-next";
913pub const SUFFIX_FN_SCHEDULE: &str = "-schedule";
914pub const SUFFIX_FN_STUB: &str = "-stub";
915pub const SUFFIX_FN_GET: &str = "-get";
916
917#[derive(
918 Debug,
919 Clone,
920 Copy,
921 serde::Serialize,
922 serde::Deserialize,
923 PartialEq,
924 Eq,
925 strum::EnumIter,
926 schemars::JsonSchema,
927)]
928#[serde(rename_all = "snake_case")]
929pub enum FunctionExtension {
930 Submit,
931 AwaitNext,
932 Schedule,
933 Stub,
934 Get,
935}
936impl FunctionExtension {
937 #[must_use]
938 pub fn suffix(&self) -> &'static str {
939 match self {
940 FunctionExtension::Submit => SUFFIX_FN_SUBMIT,
941 FunctionExtension::AwaitNext => SUFFIX_FN_AWAIT_NEXT,
942 FunctionExtension::Schedule => SUFFIX_FN_SCHEDULE,
943 FunctionExtension::Stub => SUFFIX_FN_STUB,
944 FunctionExtension::Get => SUFFIX_FN_GET,
945 }
946 }
947
948 #[must_use]
949 pub fn belongs_to(&self, pkg_ext: PackageExtension) -> bool {
950 matches!(
951 (pkg_ext, self),
952 (
953 PackageExtension::ObeliskExt,
954 FunctionExtension::Submit | FunctionExtension::AwaitNext | FunctionExtension::Get
955 ) | (
956 PackageExtension::ObeliskSchedule,
957 FunctionExtension::Schedule
958 ) | (PackageExtension::ObeliskStub, FunctionExtension::Stub)
959 )
960 }
961}
962
963#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
964pub struct FunctionMetadata {
965 pub ffqn: FunctionFqn,
966 pub parameter_types: ParameterTypes,
967 pub return_type: ReturnType,
968 pub extension: Option<FunctionExtension>,
969 pub submittable: bool,
971}
972impl FunctionMetadata {
973 #[must_use]
974 pub fn split_extension(&self) -> Option<(&str, FunctionExtension)> {
975 self.extension.map(|extension| {
976 let prefix = self
977 .ffqn
978 .function_name
979 .value
980 .strip_suffix(extension.suffix())
981 .unwrap_or_else(|| {
982 panic!(
983 "extension function {} must end with expected suffix {}",
984 self.ffqn.function_name,
985 extension.suffix()
986 )
987 });
988 (prefix, extension)
989 })
990 }
991}
992impl Display for FunctionMetadata {
993 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
994 write!(
995 f,
996 "{ffqn}: func{params} -> {return_type}",
997 ffqn = self.ffqn,
998 params = self.parameter_types,
999 return_type = self.return_type,
1000 )
1001 }
1002}
1003
1004pub mod serde_params {
1005 use crate::{Params, ParamsInternal};
1006 use serde::de::{SeqAccess, Visitor};
1007 use serde::ser::SerializeSeq;
1008 use serde::{Deserialize, Serialize};
1009 use serde_json::Value;
1010 use std::sync::Arc;
1011 use val_json::wast_val::WastVal;
1012
1013 impl Serialize for Params {
1014 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1015 where
1016 S: ::serde::Serializer,
1017 {
1018 let serialize_json_values = |slice: &[serde_json::Value], serializer: S| {
1019 let mut seq = serializer.serialize_seq(Some(slice.len()))?;
1020 for item in slice {
1021 seq.serialize_element(item)?;
1022 }
1023 seq.end()
1024 };
1025 match &self.0 {
1026 ParamsInternal::Vals {
1027 vals,
1028 json_vals_cache,
1029 } => {
1030 let guard = json_vals_cache.read().unwrap();
1031 if let Some(slice) = &*guard {
1032 serialize_json_values(slice, serializer)
1033 } else {
1034 drop(guard);
1035 let mut json_vals = Vec::with_capacity(vals.len());
1036 for val in vals.iter() {
1037 let value = WastVal::try_from(val.clone())
1038 .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
1039 let value = serde_json::to_value(&value)
1040 .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
1041 json_vals.push(value);
1042 }
1043 let res = serialize_json_values(&json_vals, serializer);
1044 *json_vals_cache.write().unwrap() = Some(Arc::from(json_vals));
1045 res
1046 }
1047 }
1048 ParamsInternal::Empty => serializer.serialize_seq(Some(0))?.end(),
1049 ParamsInternal::JsonValues(vec) => serialize_json_values(vec, serializer),
1050 }
1051 }
1052 }
1053
1054 pub struct VecVisitor;
1055
1056 impl<'de> Visitor<'de> for VecVisitor {
1057 type Value = Vec<Value>;
1058
1059 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
1060 formatter.write_str("a sequence of `Value`")
1061 }
1062
1063 #[inline]
1064 fn visit_seq<V>(self, mut visitor: V) -> Result<Self::Value, V::Error>
1065 where
1066 V: SeqAccess<'de>,
1067 {
1068 let mut vec = Vec::new();
1069 while let Some(elem) = visitor.next_element()? {
1070 vec.push(elem);
1071 }
1072 Ok(vec)
1073 }
1074 }
1075
1076 impl<'de> Deserialize<'de> for Params {
1077 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1078 where
1079 D: serde::Deserializer<'de>,
1080 {
1081 let vec: Vec<Value> = deserializer.deserialize_seq(VecVisitor)?;
1082 if vec.is_empty() {
1083 Ok(Self(ParamsInternal::Empty))
1084 } else {
1085 Ok(Self(ParamsInternal::JsonValues(Arc::from(vec))))
1086 }
1087 }
1088 }
1089}
1090
1091#[derive(Debug, thiserror::Error)]
1092pub enum ParamsParsingError {
1093 #[error("parameters cannot be parsed, cannot convert type of {idx}-th parameter")]
1094 ParameterTypeError {
1095 idx: usize,
1096 err: TypeConversionError,
1097 },
1098 #[error("parameters cannot be deserialized: {0}")]
1099 ParamsDeserializationError(#[source] serde_json::Error),
1100 #[error("parameter cardinality mismatch, expected: {expected}, specified: {specified}")]
1101 ParameterCardinalityMismatch { expected: usize, specified: usize },
1102}
1103
1104impl ParamsParsingError {
1105 #[must_use]
1106 pub fn detail(&self) -> Option<String> {
1107 match self {
1108 ParamsParsingError::ParameterTypeError { err, .. } => Some(format!("{err:?}")),
1109 ParamsParsingError::ParamsDeserializationError(err) => Some(format!("{err:?}")),
1110 ParamsParsingError::ParameterCardinalityMismatch { .. } => None,
1111 }
1112 }
1113}
1114
1115impl Params {
1116 #[must_use]
1117 pub const fn empty() -> Self {
1118 Self(ParamsInternal::Empty)
1119 }
1120
1121 #[must_use]
1122 pub fn from_wasmtime(vals: Arc<[wasmtime::component::Val]>) -> Self {
1123 if vals.is_empty() {
1124 Self::empty()
1125 } else {
1126 Self(ParamsInternal::Vals {
1127 vals,
1128 json_vals_cache: Arc::default(),
1129 })
1130 }
1131 }
1132
1133 #[cfg(any(test, feature = "test"))]
1134 #[must_use]
1135 pub fn from_json_values_test(vec: Vec<Value>) -> Self {
1137 if vec.is_empty() {
1138 Self::empty()
1139 } else {
1140 Self(ParamsInternal::JsonValues(Arc::from(vec)))
1141 }
1142 }
1143
1144 pub fn from_json_values<'a>(
1145 values: Arc<[Value]>,
1146 param_types: impl ExactSizeIterator<Item = &'a TypeWrapper>,
1147 ) -> Result<Self, ParamsParsingError> {
1148 if param_types.len() != values.len() {
1149 return Err(ParamsParsingError::ParameterCardinalityMismatch {
1150 expected: param_types.len(),
1151 specified: values.len(),
1152 });
1153 }
1154 if values.is_empty() {
1155 Ok(Self::empty())
1156 } else {
1157 params::deserialize_values(&values, param_types)
1159 .map_err(ParamsParsingError::ParamsDeserializationError)?;
1160
1161 Ok(Self(ParamsInternal::JsonValues(values)))
1162 }
1163 }
1164
1165 pub fn as_vals<'a>(
1166 &self,
1167 param_types: impl ExactSizeIterator<Item = (&'a str, Type)>,
1168 ) -> Result<Arc<[wasmtime::component::Val]>, ParamsParsingError> {
1169 if param_types.len() != self.len() {
1170 return Err(ParamsParsingError::ParameterCardinalityMismatch {
1171 expected: param_types.len(),
1172 specified: self.len(),
1173 });
1174 }
1175 match &self.0 {
1176 ParamsInternal::JsonValues(json_vec) => {
1177 let param_types = param_types
1178 .enumerate()
1179 .map(|(idx, (_param_name, ty))| {
1180 TypeWrapper::try_from(ty).map_err(|err| (idx, err))
1181 })
1182 .collect::<Result<Vec<_>, _>>()
1183 .map_err(|(idx, err)| ParamsParsingError::ParameterTypeError { idx, err })?;
1184 Ok(params::deserialize_values(json_vec, param_types.iter())
1185 .map_err(ParamsParsingError::ParamsDeserializationError)?
1186 .into_iter()
1187 .map(Val::from)
1188 .collect())
1189 }
1190 ParamsInternal::Vals { vals, .. } => Ok(vals.clone()),
1191 ParamsInternal::Empty => Ok(Arc::from([])),
1192 }
1193 }
1194
1195 #[must_use]
1196 pub fn len(&self) -> usize {
1197 match &self.0 {
1198 ParamsInternal::JsonValues(vec) => vec.len(),
1199 ParamsInternal::Vals { vals, .. } => vals.len(),
1200 ParamsInternal::Empty => 0,
1201 }
1202 }
1203
1204 #[must_use]
1205 pub fn is_empty(&self) -> bool {
1206 self.len() == 0
1207 }
1208
1209 #[must_use]
1210 pub fn as_json_values(&self) -> Option<&[serde_json::Value]> {
1211 match &self.0 {
1212 ParamsInternal::Vals { .. } => None,
1213 ParamsInternal::Empty => Some(&[]),
1214 ParamsInternal::JsonValues(vec) => Some(vec),
1215 }
1216 }
1217}
1218
1219impl PartialEq for Params {
1220 fn eq(&self, other: &Self) -> bool {
1221 if self.is_empty() && other.is_empty() {
1222 return true;
1223 }
1224 if self.len() != other.len() {
1225 return false;
1226 }
1227
1228 match (&self.0, &other.0) {
1229 (ParamsInternal::JsonValues(l), ParamsInternal::JsonValues(r)) => l == r,
1230
1231 (
1232 ParamsInternal::Vals {
1233 vals: l,
1234 json_vals_cache: _,
1235 },
1236 ParamsInternal::Vals {
1237 vals: r,
1238 json_vals_cache: _,
1239 },
1240 ) => l == r,
1241
1242 (
1243 ParamsInternal::JsonValues(json_vals),
1244 ParamsInternal::Vals {
1245 vals,
1246 json_vals_cache,
1247 },
1248 )
1249 | (
1250 ParamsInternal::Vals {
1251 vals,
1252 json_vals_cache,
1253 },
1254 ParamsInternal::JsonValues(json_vals),
1255 ) => {
1256 let Ok(vec) = to_json(vals) else { return false };
1257 let equals = *json_vals == vec;
1258 *json_vals_cache.write().unwrap() = Some(vec);
1259 equals
1260 }
1261
1262 (ParamsInternal::Empty, _) | (_, ParamsInternal::Empty) => {
1263 unreachable!("zero length and different lengths handled earlier")
1264 }
1265 }
1266 }
1267}
1268impl Eq for Params {}
1269
1270fn to_json(vals: &[wasmtime::component::Val]) -> Result<Arc<[serde_json::Value]>, ()> {
1272 let mut vec = Vec::with_capacity(vals.len());
1273 for val in vals {
1274 let value = match WastVal::try_from(val.clone()) {
1275 Ok(ok) => ok,
1276 Err(err) => {
1277 error!("cannot compare Params, cannot convert to WastVal: {err:?}");
1278 return Err(());
1279 }
1280 };
1281 let value = match serde_json::to_value(&value) {
1282 Ok(ok) => ok,
1283 Err(err) => {
1284 error!("cannot compare Params, cannot convert to JSON: {err:?}");
1285 return Err(());
1286 }
1287 };
1288 vec.push(value);
1289 }
1290 Ok(Arc::from(vec))
1291}
1292
1293impl Display for Params {
1294 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1295 let render_json_vals =
1296 |f: &mut std::fmt::Formatter<'_>, json_vals: &[Value]| -> std::fmt::Result {
1297 for (i, json_value) in json_vals.iter().enumerate() {
1298 if i > 0 {
1299 write!(f, ", ")?;
1300 }
1301 write!(f, "{json_value}")?;
1302 }
1303 Ok(())
1304 };
1305 write!(f, "[")?;
1306 match &self.0 {
1307 ParamsInternal::Empty => {}
1308 ParamsInternal::JsonValues(json_vals) => {
1309 render_json_vals(f, json_vals)?;
1310 }
1311 ParamsInternal::Vals {
1312 vals,
1313 json_vals_cache,
1314 } => {
1315 let guard = json_vals_cache.read().unwrap();
1316 if let Some(json_vals) = &*guard {
1317 render_json_vals(f, json_vals)?;
1318 } else {
1319 drop(guard);
1320 let Ok(json_vals) = to_json(vals) else {
1321 return write!(f, "<serialization error>]"); };
1323 render_json_vals(f, &json_vals)?;
1324 *json_vals_cache.write().unwrap() = Some(json_vals);
1325 }
1326 }
1327 }
1328 write!(f, "]")
1329 }
1330}
1331
1332pub mod prefixed_ulid {
1333 use crate::{
1334 EXECUTION_ID_INFIX, EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX, JoinSetId,
1335 JoinSetIdParseError,
1336 };
1337 use serde_with::{DeserializeFromStr, SerializeDisplay};
1338 use sha2::{Digest, Sha256};
1339 use std::{
1340 fmt::{Debug, Display},
1341 marker::PhantomData,
1342 num::ParseIntError,
1343 str::FromStr,
1344 sync::Arc,
1345 };
1346 use ulid::Ulid;
1347
1348 #[derive(derive_more::Display, SerializeDisplay, DeserializeFromStr, schemars::JsonSchema)]
1349 #[schemars(with = "String")]
1350 #[derive_where::derive_where(Clone, Copy)]
1351 #[display("{}_{ulid}", Self::prefix())]
1352 pub struct PrefixedUlid<T: 'static> {
1353 ulid: Ulid,
1354 phantom_data: PhantomData<fn(T) -> T>,
1355 }
1356
1357 impl<T> PrefixedUlid<T> {
1358 const fn new(ulid: Ulid) -> Self {
1359 Self {
1360 ulid,
1361 phantom_data: PhantomData,
1362 }
1363 }
1364
1365 fn prefix() -> &'static str {
1366 std::any::type_name::<T>().rsplit("::").next().unwrap()
1367 }
1368 }
1369
1370 impl<T> PrefixedUlid<T> {
1371 #[must_use]
1372 pub fn generate() -> Self {
1373 Self::new(Ulid::new())
1374 }
1375
1376 #[must_use]
1377 pub const fn from_parts(timestamp_ms: u64, random: u128) -> Self {
1378 Self::new(Ulid::from_parts(timestamp_ms, random))
1379 }
1380
1381 #[must_use]
1382 pub fn timestamp_part(&self) -> u64 {
1383 self.ulid.timestamp_ms()
1384 }
1385
1386 #[must_use]
1387 pub fn random_part(&self) -> u128 {
1389 self.ulid.random()
1390 }
1391
1392 #[must_use]
1393 pub fn ulid(&self) -> Ulid {
1394 self.ulid
1395 }
1396 }
1397
1398 #[derive(Debug, thiserror::Error)]
1399 pub enum PrefixedUlidParseError {
1400 #[error("wrong prefix in `{input}`, expected prefix `{expected}`")]
1401 WrongPrefix { input: String, expected: String },
1402 #[error("cannot parse ULID suffix from `{input}`")]
1403 CannotParseUlid { input: String },
1404 }
1405
1406 mod impls {
1407 use super::{PrefixedUlid, PrefixedUlidParseError, Ulid};
1408 use std::{fmt::Debug, fmt::Display, hash::Hash, marker::PhantomData, str::FromStr};
1409
1410 impl<T> FromStr for PrefixedUlid<T> {
1411 type Err = PrefixedUlidParseError;
1412
1413 fn from_str(input: &str) -> Result<Self, Self::Err> {
1414 let prefix = Self::prefix();
1415 let mut input_chars = input.chars();
1416 for exp in prefix.chars() {
1417 if input_chars.next() != Some(exp) {
1418 return Err(PrefixedUlidParseError::WrongPrefix {
1419 input: input.to_string(),
1420 expected: format!("{prefix}_"),
1421 });
1422 }
1423 }
1424 if input_chars.next() != Some('_') {
1425 return Err(PrefixedUlidParseError::WrongPrefix {
1426 input: input.to_string(),
1427 expected: format!("{prefix}_"),
1428 });
1429 }
1430 let Ok(ulid) = Ulid::from_string(input_chars.as_str()) else {
1431 return Err(PrefixedUlidParseError::CannotParseUlid {
1432 input: input.to_string(),
1433 });
1434 };
1435 Ok(Self {
1436 ulid,
1437 phantom_data: PhantomData,
1438 })
1439 }
1440 }
1441
1442 impl<T> Debug for PrefixedUlid<T> {
1443 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1444 Display::fmt(&self, f)
1445 }
1446 }
1447
1448 impl<T> Hash for PrefixedUlid<T> {
1449 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
1450 Self::prefix().hash(state);
1451 self.ulid.hash(state);
1452 self.phantom_data.hash(state);
1453 }
1454 }
1455
1456 impl<T> PartialEq for PrefixedUlid<T> {
1457 fn eq(&self, other: &Self) -> bool {
1458 self.ulid == other.ulid
1459 }
1460 }
1461
1462 impl<T> Eq for PrefixedUlid<T> {}
1463
1464 impl<T> PartialOrd for PrefixedUlid<T> {
1465 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1466 Some(self.cmp(other))
1467 }
1468 }
1469
1470 impl<T> Ord for PrefixedUlid<T> {
1471 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1472 self.ulid.cmp(&other.ulid)
1473 }
1474 }
1475 }
1476
1477 pub mod prefix {
1478 pub struct E;
1479 pub struct Exr;
1480 pub struct Run;
1481 pub struct Delay;
1482 pub struct Dep;
1483 }
1484
1485 pub type ExecutorId = PrefixedUlid<prefix::Exr>;
1486 pub type ExecutionIdTopLevel = PrefixedUlid<prefix::E>;
1487 pub type RunId = PrefixedUlid<prefix::Run>;
1488 pub type DelayIdTopLevel = PrefixedUlid<prefix::Delay>; pub type DeploymentId = PrefixedUlid<prefix::Dep>;
1490
1491 #[cfg(any(test, feature = "test"))]
1492 pub const DEPLOYMENT_ID_DUMMY: DeploymentId = DeploymentId::from_parts(0, 0);
1493
1494 #[cfg(any(test, feature = "test"))]
1495 pub const EXECUTION_ID_DUMMY: ExecutionId = ExecutionId::from_parts(0, 0);
1496
1497 #[cfg(any(test, feature = "test"))]
1498 impl<'a, T> arbitrary::Arbitrary<'a> for PrefixedUlid<T> {
1499 fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
1500 Ok(Self::new(ulid::Ulid::from_parts(
1501 u.arbitrary()?,
1502 u.arbitrary()?,
1503 )))
1504 }
1505 }
1506
1507 #[derive(
1508 Hash,
1509 PartialEq,
1510 Eq,
1511 PartialOrd,
1512 Ord,
1513 SerializeDisplay,
1514 DeserializeFromStr,
1515 Clone,
1516 schemars::JsonSchema,
1517 )]
1518 #[schemars(with = "String")]
1519 pub enum ExecutionId {
1520 TopLevel(ExecutionIdTopLevel),
1521 Derived(ExecutionIdDerived),
1522 }
1523
1524 #[derive(
1525 Hash,
1526 PartialEq,
1527 Eq,
1528 PartialOrd,
1529 Ord,
1530 Clone,
1531 SerializeDisplay,
1532 DeserializeFromStr,
1533 schemars::JsonSchema,
1534 )]
1535 #[schemars(with = "String")]
1536 pub struct ExecutionIdDerived {
1537 top_level: ExecutionIdTopLevel,
1538 infix: Arc<str>,
1539 idx: u64,
1540 }
1541 impl ExecutionIdDerived {
1542 #[must_use]
1543 pub fn get_incremented(&self) -> Self {
1544 self.get_incremented_by(1)
1545 }
1546 #[must_use]
1547 pub fn get_incremented_by(&self, count: u64) -> Self {
1548 ExecutionIdDerived {
1549 top_level: self.top_level,
1550 infix: self.infix.clone(),
1551 idx: self.idx + count,
1552 }
1553 }
1554 #[must_use]
1555 pub fn next_level(&self, join_set_id: &JoinSetId) -> ExecutionIdDerived {
1556 let ExecutionIdDerived {
1557 top_level,
1558 infix,
1559 idx,
1560 } = self;
1561 let infix = Arc::from(format!(
1562 "{infix}{EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX}{idx}{EXECUTION_ID_INFIX}{join_set_id}"
1563 ));
1564 ExecutionIdDerived {
1565 top_level: *top_level,
1566 infix,
1567 idx: EXECUTION_ID_START_IDX,
1568 }
1569 }
1570 fn display_or_debug(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1571 let ExecutionIdDerived {
1572 top_level,
1573 infix,
1574 idx,
1575 } = self;
1576 write!(
1577 f,
1578 "{top_level}{EXECUTION_ID_INFIX}{infix}{EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX}{idx}"
1579 )
1580 }
1581
1582 #[must_use]
1583 pub fn split_to_parts(&self) -> (ExecutionId, JoinSetId) {
1584 self.split_to_parts_res().expect("verified in from_str")
1585 }
1586
1587 fn split_to_parts_res(&self) -> Result<(ExecutionId, JoinSetId), DerivedIdSplitError> {
1588 if let Some((old_infix_and_index, join_set_id)) =
1592 self.infix.rsplit_once(EXECUTION_ID_INFIX)
1593 {
1594 let join_set_id = JoinSetId::from_str(join_set_id)
1595 .map_err(DerivedIdSplitError::JoinSetIdParseError)?;
1596 let Some((old_infix, old_idx)) =
1597 old_infix_and_index.rsplit_once(EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX)
1598 else {
1599 return Err(DerivedIdSplitError::CannotFindJoinSetDelimiter);
1600 };
1601 let parent = ExecutionIdDerived {
1602 top_level: self.top_level,
1603 infix: Arc::from(old_infix),
1604 idx: old_idx
1605 .parse()
1606 .map_err(DerivedIdSplitError::CannotParseOldIndex)?,
1607 };
1608 Ok((ExecutionId::Derived(parent), join_set_id))
1609 } else {
1610 Ok((
1611 ExecutionId::TopLevel(self.top_level),
1612 JoinSetId::from_str(&self.infix)
1613 .map_err(DerivedIdSplitError::JoinSetIdParseError)?,
1614 ))
1615 }
1616 }
1617 }
1618
1619 #[derive(Debug, thiserror::Error)]
1620 pub enum DerivedIdSplitError {
1621 #[error(transparent)]
1622 JoinSetIdParseError(JoinSetIdParseError),
1623 #[error("cannot parse index of parent execution - {0}")]
1624 CannotParseOldIndex(ParseIntError),
1625 #[error("cannot find join set delimiter")]
1626 CannotFindJoinSetDelimiter,
1627 }
1628
1629 impl Debug for ExecutionIdDerived {
1630 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1631 self.display_or_debug(f)
1632 }
1633 }
1634 impl Display for ExecutionIdDerived {
1635 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1636 self.display_or_debug(f)
1637 }
1638 }
1639 impl FromStr for ExecutionIdDerived {
1640 type Err = DerivedIdParseError;
1641
1642 fn from_str(input: &str) -> Result<Self, Self::Err> {
1643 let (top_level, infix, idx) = derived_from_str(input)?;
1644 let id = ExecutionIdDerived {
1645 top_level,
1646 infix,
1647 idx,
1648 };
1649 Ok(id)
1650 }
1651 }
1652
1653 fn derived_from_str<T: 'static>(
1654 input: &str,
1655 ) -> Result<(PrefixedUlid<T>, Arc<str>, u64), DerivedIdParseError> {
1656 let (prefix, full_suffix) = input
1658 .split_once(EXECUTION_ID_INFIX)
1659 .ok_or(DerivedIdParseError::FirstDelimiterNotFound)?;
1660
1661 let top_level =
1663 PrefixedUlid::from_str(prefix).map_err(DerivedIdParseError::PrefixedUlidParseError)?;
1664
1665 let mut last_idx = None;
1667 for segment in full_suffix.split(EXECUTION_ID_INFIX) {
1668 if segment.is_empty() {
1669 return Err(DerivedIdParseError::EmptySegment);
1670 }
1671 let (join_set_str, idx_str) = segment
1673 .split_once(EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX)
1674 .ok_or(DerivedIdParseError::SecondDelimiterNotFound)?;
1675 JoinSetId::from_str(join_set_str).map_err(DerivedIdParseError::JoinSetIdParseError)?;
1676 let idx = u64::from_str(idx_str).map_err(DerivedIdParseError::ParseIndexError)?;
1677 last_idx = Some((idx, idx_str));
1678 }
1679 let (idx, idx_str) = last_idx.expect("split must have returned at least one element");
1680 let infix = full_suffix
1681 .strip_suffix(idx_str)
1682 .expect("must have ended with index");
1683 let infix = infix
1684 .strip_suffix(EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX)
1685 .expect("must have ended with `_index`");
1686 Ok((top_level, Arc::from(infix), idx))
1687 }
1688
1689 #[cfg(any(test, feature = "test"))]
1690 impl<'a> arbitrary::Arbitrary<'a> for ExecutionIdDerived {
1691 fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
1692 let top_level = ExecutionId::TopLevel(ExecutionIdTopLevel::arbitrary(u)?);
1693 let join_set_id = JoinSetId::arbitrary(u)?;
1694 Ok(top_level.next_level(&join_set_id))
1695 }
1696 }
1697
1698 #[derive(Debug, thiserror::Error)]
1699 pub enum DerivedIdParseError {
1700 #[error(transparent)]
1701 PrefixedUlidParseError(PrefixedUlidParseError),
1702 #[error("cannot parse derived id - delimiter `{EXECUTION_ID_INFIX}` not found")]
1703 FirstDelimiterNotFound,
1704 #[error(
1705 "cannot parse derived id - delimiter `{EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX}` not found"
1706 )]
1707 SecondDelimiterNotFound,
1708 #[error(
1709 "cannot parse derived id - suffix after `{EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX}` must be a number"
1710 )]
1711 ParseIndexError(ParseIntError),
1712 #[error(transparent)]
1713 DerivedIdSplitError(DerivedIdSplitError),
1714 #[error(transparent)]
1715 JoinSetIdParseError(JoinSetIdParseError),
1716 #[error("empty segment")]
1717 EmptySegment,
1718 }
1719
1720 impl ExecutionId {
1721 #[must_use]
1722 pub fn generate() -> Self {
1723 ExecutionId::TopLevel(PrefixedUlid::generate())
1724 }
1725
1726 #[must_use]
1727 pub fn deterministic_at_unix_epoch(hash: &[u8; 32]) -> Self {
1731 let mut ts_bytes = [0u8; 8];
1736 ts_bytes[4..8].copy_from_slice(&hash[..4]); let timestamp_ms = u64::from_be_bytes(ts_bytes);
1739
1740 const RAND_LEN: usize = 10;
1743 const U128_BYTES_LEN: usize = std::mem::size_of::<u128>(); const START: usize = U128_BYTES_LEN - RAND_LEN;
1745 let mut rand_bytes = [0u8; U128_BYTES_LEN];
1746 rand_bytes[START..].copy_from_slice(&hash[4..4 + RAND_LEN]);
1747 let random = u128::from_be_bytes(rand_bytes);
1748
1749 ExecutionId::TopLevel(PrefixedUlid::new(Ulid::from_parts(timestamp_ms, random)))
1750 }
1751
1752 #[must_use]
1753 pub fn get_top_level(&self) -> ExecutionIdTopLevel {
1754 match &self {
1755 ExecutionId::TopLevel(prefixed_ulid) => *prefixed_ulid,
1756 ExecutionId::Derived(ExecutionIdDerived { top_level, .. }) => *top_level,
1757 }
1758 }
1759
1760 #[must_use]
1761 pub fn is_top_level(&self) -> bool {
1762 matches!(self, ExecutionId::TopLevel(_))
1763 }
1764
1765 #[must_use]
1766 pub fn random_seed(&self) -> u64 {
1767 let mut hasher = Sha256::new();
1768 hasher.update(self.get_top_level().ulid.0.to_le_bytes());
1770 if let ExecutionId::Derived(ExecutionIdDerived {
1771 top_level: _,
1772 infix,
1773 idx,
1774 }) = self
1775 {
1776 hasher.update(infix.as_bytes());
1778 hasher.update(idx.to_le_bytes());
1779 }
1780 let hash = hasher.finalize();
1781 u64::from_le_bytes(hash[..8].try_into().unwrap())
1783 }
1784
1785 #[must_use]
1786 pub const fn from_parts(timestamp_ms: u64, random_part: u128) -> Self {
1787 ExecutionId::TopLevel(ExecutionIdTopLevel::from_parts(timestamp_ms, random_part))
1788 }
1789
1790 #[must_use]
1791 pub fn next_level(&self, join_set_id: &JoinSetId) -> ExecutionIdDerived {
1792 match &self {
1793 ExecutionId::TopLevel(top_level) => ExecutionIdDerived {
1794 top_level: *top_level,
1795 infix: Arc::from(join_set_id.to_string()),
1796 idx: EXECUTION_ID_START_IDX,
1797 },
1798 ExecutionId::Derived(derived) => derived.next_level(join_set_id),
1799 }
1800 }
1801
1802 fn display_or_debug(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1803 match &self {
1804 ExecutionId::TopLevel(top_level) => Display::fmt(top_level, f),
1805 ExecutionId::Derived(derived) => Display::fmt(derived, f),
1806 }
1807 }
1808 }
1809
1810 const EXECUTION_ID_START_IDX: u64 = 1;
1811 pub const JOIN_SET_START_IDX: u64 = 1;
1812 const DELAY_ID_START_IDX: u64 = 1;
1813
1814 #[derive(Debug, thiserror::Error)]
1815 pub enum ExecutionIdParseError {
1816 #[error(transparent)]
1817 PrefixedUlidParseError(PrefixedUlidParseError),
1818 #[error(transparent)]
1819 DerivedIdParseError(DerivedIdParseError),
1820 }
1821
1822 impl FromStr for ExecutionId {
1823 type Err = ExecutionIdParseError;
1824
1825 fn from_str(input: &str) -> Result<Self, Self::Err> {
1826 if input.contains(EXECUTION_ID_INFIX) {
1827 ExecutionIdDerived::from_str(input)
1828 .map(ExecutionId::Derived)
1829 .map_err(ExecutionIdParseError::DerivedIdParseError)
1830 } else {
1831 Ok(ExecutionId::TopLevel(
1832 PrefixedUlid::from_str(input)
1833 .map_err(ExecutionIdParseError::PrefixedUlidParseError)?,
1834 ))
1835 }
1836 }
1837 }
1838
1839 #[derive(Debug, thiserror::Error)]
1840 pub enum ExecutionIdStructuralParseError {
1841 #[error(transparent)]
1842 ExecutionIdParseError(#[from] ExecutionIdParseError),
1843 #[error("execution-id must be a record with `id` field of type string")]
1844 TypeError,
1845 }
1846
1847 impl TryFrom<&wasmtime::component::Val> for ExecutionId {
1848 type Error = ExecutionIdStructuralParseError;
1849
1850 fn try_from(execution_id: &wasmtime::component::Val) -> Result<Self, Self::Error> {
1851 if let wasmtime::component::Val::Record(key_vals) = execution_id
1852 && key_vals.len() == 1
1853 && let Some((key, execution_id)) = key_vals.first()
1854 && key == "id"
1855 && let wasmtime::component::Val::String(execution_id) = execution_id
1856 {
1857 ExecutionId::from_str(execution_id)
1858 .map_err(ExecutionIdStructuralParseError::ExecutionIdParseError)
1859 } else {
1860 Err(ExecutionIdStructuralParseError::TypeError)
1861 }
1862 }
1863 }
1864
1865 impl Debug for ExecutionId {
1866 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1867 self.display_or_debug(f)
1868 }
1869 }
1870
1871 impl Display for ExecutionId {
1872 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1873 self.display_or_debug(f)
1874 }
1875 }
1876
1877 #[cfg(any(test, feature = "test"))]
1878 impl<'a> arbitrary::Arbitrary<'a> for ExecutionId {
1879 fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
1880 Ok(ExecutionId::TopLevel(PrefixedUlid::arbitrary(u)?))
1881 }
1882 }
1883
1884 #[derive(
1886 Hash,
1887 PartialEq,
1888 Eq,
1889 PartialOrd,
1890 Ord,
1891 Clone,
1892 SerializeDisplay,
1893 DeserializeFromStr,
1894 schemars::JsonSchema,
1895 )]
1896 #[schemars(with = "String")]
1897 pub struct DelayId {
1898 top_level: DelayIdTopLevel,
1899 infix: Arc<str>,
1900 idx: u64,
1901 }
1902 impl DelayId {
1903 #[must_use]
1904 pub fn new(execution_id: &ExecutionId, join_set_id: &JoinSetId) -> DelayId {
1905 Self::new_with_index(execution_id, join_set_id, DELAY_ID_START_IDX)
1906 }
1907
1908 #[must_use]
1909 pub fn new_with_index(
1910 execution_id: &ExecutionId,
1911 join_set_id: &JoinSetId,
1912 idx: u64,
1913 ) -> DelayId {
1914 let ExecutionIdDerived {
1915 top_level: PrefixedUlid { ulid, .. },
1916 infix,
1917 idx: _,
1918 } = execution_id.next_level(join_set_id);
1919 let top_level = DelayIdTopLevel::new(ulid);
1920 DelayId {
1921 top_level,
1922 infix,
1923 idx,
1924 }
1925 }
1926
1927 #[must_use]
1928 pub fn get_incremented(&self) -> Self {
1929 Self {
1930 top_level: self.top_level,
1931 infix: self.infix.clone(),
1932 idx: self.idx + 1,
1933 }
1934 }
1935
1936 #[must_use]
1937 pub fn split_to_parts(&self) -> (ExecutionId, JoinSetId) {
1938 self.split_to_parts_res().expect("verified in from_str")
1939 }
1940
1941 fn split_to_parts_res(&self) -> Result<(ExecutionId, JoinSetId), DerivedIdSplitError> {
1942 if let Some((old_infix_and_index, join_set_id)) =
1946 self.infix.rsplit_once(EXECUTION_ID_INFIX)
1947 {
1948 let join_set_id = JoinSetId::from_str(join_set_id)
1949 .map_err(DerivedIdSplitError::JoinSetIdParseError)?;
1950 let Some((old_infix, old_idx)) =
1951 old_infix_and_index.rsplit_once(EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX)
1952 else {
1953 return Err(DerivedIdSplitError::CannotFindJoinSetDelimiter);
1954 };
1955 let parent = ExecutionIdDerived {
1956 top_level: ExecutionIdTopLevel::new(self.top_level.ulid),
1957 infix: Arc::from(old_infix),
1958 idx: old_idx
1959 .parse()
1960 .map_err(DerivedIdSplitError::CannotParseOldIndex)?,
1961 };
1962 Ok((ExecutionId::Derived(parent), join_set_id))
1963 } else {
1964 Ok((
1965 ExecutionId::TopLevel(ExecutionIdTopLevel::new(self.top_level.ulid)),
1966 JoinSetId::from_str(&self.infix)
1967 .map_err(DerivedIdSplitError::JoinSetIdParseError)?,
1968 ))
1969 }
1970 }
1971
1972 fn display_or_debug(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1973 let DelayId {
1974 top_level,
1975 infix,
1976 idx,
1977 } = self;
1978 write!(
1979 f,
1980 "{top_level}{EXECUTION_ID_INFIX}{infix}{EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX}{idx}"
1981 )
1982 }
1983 }
1984
1985 pub mod delay_impl {
1986 use super::{DelayId, DerivedIdParseError, derived_from_str};
1987 use std::{
1988 fmt::{Debug, Display},
1989 str::FromStr,
1990 };
1991
1992 impl Debug for DelayId {
1993 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1994 self.display_or_debug(f)
1995 }
1996 }
1997
1998 impl Display for DelayId {
1999 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2000 self.display_or_debug(f)
2001 }
2002 }
2003
2004 impl FromStr for DelayId {
2005 type Err = DerivedIdParseError;
2006
2007 fn from_str(input: &str) -> Result<Self, Self::Err> {
2008 let (top_level, infix, idx) = derived_from_str(input)?;
2009 let id = DelayId {
2010 top_level,
2011 infix,
2012 idx,
2013 };
2014 Ok(id)
2015 }
2016 }
2017
2018 #[cfg(any(test, feature = "test"))]
2019 impl<'a> arbitrary::Arbitrary<'a> for DelayId {
2020 fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
2021 use super::{ExecutionId, JoinSetId};
2022 let execution_id = ExecutionId::arbitrary(u)?;
2023 let mut join_set_id = JoinSetId::arbitrary(u)?;
2024 join_set_id.kind = crate::JoinSetKind::OneOff;
2025 Ok(DelayId::new(&execution_id, &join_set_id))
2026 }
2027 }
2028 }
2029}
2030
2031#[derive(
2032 Debug,
2033 Clone,
2034 PartialEq,
2035 Eq,
2036 Hash,
2037 derive_more::Display,
2038 serde_with::SerializeDisplay,
2039 serde_with::DeserializeFromStr,
2040 schemars::JsonSchema,
2041)]
2042#[schemars(with = "String")]
2043#[non_exhaustive] #[display("{kind}{JOIN_SET_ID_INFIX}{name}")]
2045pub struct JoinSetId {
2046 pub kind: JoinSetKind,
2047 pub name: StrVariant,
2048}
2049
2050impl JoinSetId {
2051 pub fn new(kind: JoinSetKind, name: StrVariant) -> Result<Self, InvalidNameError<JoinSetId>> {
2052 Ok(Self {
2053 kind,
2054 name: check_name(name, CHARSET_EXTRA_JSON_SET)?,
2055 })
2056 }
2057}
2058
2059pub const CHARSET_ALPHANUMERIC: &str =
2060 "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
2061
2062#[derive(
2063 Debug,
2064 Clone,
2065 Copy,
2066 PartialEq,
2067 Eq,
2068 Hash,
2069 derive_more::Display,
2070 strum::EnumIter,
2071 schemars::JsonSchema,
2072)]
2073#[cfg_attr(any(test, feature = "test"), derive(arbitrary::Arbitrary))]
2074#[display("{}", self.as_code())]
2075pub enum JoinSetKind {
2076 OneOff,
2077 Named,
2078 Generated,
2079}
2080impl JoinSetKind {
2081 fn as_code(&self) -> &'static str {
2082 match self {
2083 JoinSetKind::OneOff => "o",
2084 JoinSetKind::Named => "n",
2085 JoinSetKind::Generated => "g",
2086 }
2087 }
2088}
2089impl FromStr for JoinSetKind {
2090 type Err = &'static str;
2091 fn from_str(s: &str) -> Result<Self, Self::Err> {
2092 use strum::IntoEnumIterator;
2093 Self::iter()
2094 .find(|variant| s == variant.as_code())
2095 .ok_or("unknown join set kind")
2096 }
2097}
2098
2099pub(crate) const EXECUTION_ID_INFIX: char = '.';
2100pub(crate) const EXECUTION_ID_INFIX_BETWEEN_JOIN_SET_AND_INDEX: char = '_';
2101
2102const JOIN_SET_ID_INFIX: char = ':';
2103const CHARSET_EXTRA_JSON_SET: &str = "-/";
2104
2105impl FromStr for JoinSetId {
2106 type Err = JoinSetIdParseError;
2107
2108 fn from_str(input: &str) -> Result<Self, Self::Err> {
2109 let Some((kind, name)) = input.split_once(JOIN_SET_ID_INFIX) else {
2110 return Err(JoinSetIdParseError::WrongParts);
2111 };
2112 let kind = kind
2113 .parse()
2114 .map_err(JoinSetIdParseError::JoinSetKindParseError)?;
2115 JoinSetId::new(kind, StrVariant::from(name.to_string()))
2116 .map_err(JoinSetIdParseError::InvalidName)
2117 }
2118}
2119
2120#[derive(Debug, thiserror::Error)]
2121pub enum JoinSetIdParseError {
2122 #[error("join set must consist of three parts separated by {JOIN_SET_ID_INFIX} ")]
2123 WrongParts,
2124 #[error("cannot parse join set kind - {0}")]
2125 JoinSetKindParseError(&'static str),
2126 #[error("cannot parse join set id - {0}")]
2127 InvalidName(InvalidNameError<JoinSetId>),
2128}
2129
2130#[cfg(any(test, feature = "test"))]
2131const CHARSET_JOIN_SET_NAME: &str =
2132 const_format::concatcp!(CHARSET_ALPHANUMERIC, CHARSET_EXTRA_JSON_SET);
2133#[cfg(any(test, feature = "test"))]
2134impl<'a> arbitrary::Arbitrary<'a> for JoinSetId {
2135 fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
2136 let name: String = {
2137 let length_inclusive = u.int_in_range(0..=10).unwrap();
2138 (0..=length_inclusive)
2139 .map(|_| {
2140 let idx = u.choose_index(CHARSET_JOIN_SET_NAME.len()).unwrap();
2141 CHARSET_JOIN_SET_NAME
2142 .chars()
2143 .nth(idx)
2144 .expect("idx is < charset.len()")
2145 })
2146 .collect()
2147 };
2148
2149 Ok(JoinSetId::new(JoinSetKind::Named, StrVariant::from(name)).unwrap())
2150 }
2151}
2152
2153const EXECUTION_FAILED_STRING_OR_VARIANT: &str = "execution-failed";
2154#[derive(
2155 Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, derive_more::Display,
2156)]
2157#[serde(rename_all = "snake_case")]
2158pub enum ReturnType {
2159 Extendable(ReturnTypeExtendable), NonExtendable(ReturnTypeNonExtendable), }
2162impl ReturnType {
2163 #[must_use]
2175 pub fn detect(type_wrapper: TypeWrapper, wit_type: StrVariant) -> ReturnType {
2176 if let TypeWrapper::Result { ok, err: None } = type_wrapper {
2177 return ReturnType::Extendable(ReturnTypeExtendable {
2178 type_wrapper_tl: TypeWrapperTopLevel { ok, err: None },
2179 wit_type,
2180 });
2181 } else if let TypeWrapper::Result { ok, err: Some(err) } = type_wrapper {
2182 if let TypeWrapper::String = err.as_ref() {
2183 return ReturnType::Extendable(ReturnTypeExtendable {
2184 type_wrapper_tl: TypeWrapperTopLevel { ok, err: Some(err) },
2185 wit_type,
2186 });
2187 } else if let TypeWrapper::Variant(fields) = err.as_ref()
2188 && let Some(None) =
2189 fields.get(&TypeKey::new_kebab(EXECUTION_FAILED_STRING_OR_VARIANT))
2190 {
2191 return ReturnType::Extendable(ReturnTypeExtendable {
2192 type_wrapper_tl: TypeWrapperTopLevel { ok, err: Some(err) },
2193 wit_type,
2194 });
2195 }
2196 return ReturnType::NonExtendable(ReturnTypeNonExtendable {
2197 type_wrapper: TypeWrapper::Result { ok, err: Some(err) },
2198 wit_type,
2199 });
2200 }
2201 ReturnType::NonExtendable(ReturnTypeNonExtendable {
2202 type_wrapper: type_wrapper.clone(),
2203 wit_type,
2204 })
2205 }
2206
2207 #[must_use]
2208 pub fn wit_type(&self) -> &str {
2209 match self {
2210 ReturnType::Extendable(compatible) => compatible.wit_type.as_ref(),
2211 ReturnType::NonExtendable(incompatible) => incompatible.wit_type.as_ref(),
2212 }
2213 }
2214
2215 #[must_use]
2216 pub fn type_wrapper(&self) -> TypeWrapper {
2217 match self {
2218 ReturnType::Extendable(compatible) => {
2219 TypeWrapper::from(compatible.type_wrapper_tl.clone())
2220 }
2221 ReturnType::NonExtendable(incompatible) => incompatible.type_wrapper.clone(),
2222 }
2223 }
2224}
2225
2226#[derive(
2227 Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, derive_more::Display,
2228)]
2229#[display("{wit_type}")]
2230pub struct ReturnTypeNonExtendable {
2231 pub type_wrapper: TypeWrapper,
2232 pub wit_type: StrVariant,
2233}
2234
2235#[derive(
2236 Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, derive_more::Display,
2237)]
2238#[display("{wit_type}")]
2239pub struct ReturnTypeExtendable {
2240 pub type_wrapper_tl: TypeWrapperTopLevel,
2241 pub wit_type: StrVariant,
2242}
2243
2244pub const RETURN_TYPE_DUMMY: ReturnType = ReturnType::Extendable(ReturnTypeExtendable {
2245 type_wrapper_tl: TypeWrapperTopLevel {
2246 ok: None,
2247 err: None,
2248 },
2249 wit_type: StrVariant::Static("result"),
2250});
2251
2252#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Eq, derive_more::Display)]
2253#[derive_where::derive_where(PartialEq)]
2254#[display("{name}: {wit_type}")]
2255pub struct ParameterType {
2256 pub type_wrapper: TypeWrapper,
2257 #[derive_where(skip)]
2258 pub name: StrVariant,
2260 pub wit_type: StrVariant,
2261}
2262
2263#[derive(
2264 Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, Default, derive_more::Deref,
2265)]
2266pub struct ParameterTypes(pub Vec<ParameterType>);
2267
2268impl Debug for ParameterTypes {
2269 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2270 write!(f, "(")?;
2271 let mut iter = self.0.iter().peekable();
2272 while let Some(p) = iter.next() {
2273 write!(f, "{p:?}")?;
2274 if iter.peek().is_some() {
2275 write!(f, ", ")?;
2276 }
2277 }
2278 write!(f, ")")
2279 }
2280}
2281
2282impl Display for ParameterTypes {
2283 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2284 write!(f, "(")?;
2285 let mut iter = self.0.iter().peekable();
2286 while let Some(p) = iter.next() {
2287 write!(f, "{p}")?;
2288 if iter.peek().is_some() {
2289 write!(f, ", ")?;
2290 }
2291 }
2292 write!(f, ")")
2293 }
2294}
2295
2296#[derive(Debug, Clone)]
2297pub struct PackageIfcFns {
2298 pub ifc_fqn: IfcFqnName,
2299 pub extension: bool, pub fns: IndexMap<FnName, FunctionMetadata>,
2301}
2302
2303#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
2304pub struct ComponentRetryConfig {
2305 pub max_retries: Option<u32>,
2306 pub retry_exp_backoff: Duration,
2307}
2308impl ComponentRetryConfig {
2309 pub const ZERO: ComponentRetryConfig = ComponentRetryConfig {
2310 max_retries: Some(0),
2311 retry_exp_backoff: Duration::ZERO,
2312 };
2313
2314 pub const WORKFLOW: ComponentRetryConfig = ComponentRetryConfig {
2315 max_retries: None,
2316 retry_exp_backoff: Duration::ZERO,
2317 };
2318
2319 pub const CRON: ComponentRetryConfig = ComponentRetryConfig {
2320 max_retries: None,
2321 retry_exp_backoff: Duration::ZERO,
2322 };
2323}
2324
2325pub trait FunctionRegistry: Send + Sync {
2326 fn get_by_exported_function(
2328 &self,
2329 ffqn: &FunctionFqn,
2330 ) -> Option<(FunctionMetadata, ComponentId)>;
2331
2332 fn get_ret_type(&self, ffqn: &FunctionFqn) -> Option<TypeWrapperTopLevel> {
2335 self.get_by_exported_function(ffqn)
2336 .and_then(|(fn_meta, _)| {
2337 if let ReturnType::Extendable(ReturnTypeExtendable {
2338 type_wrapper_tl: type_wrapper,
2339 wit_type: _,
2340 }) = fn_meta.return_type
2341 {
2342 Some(type_wrapper)
2343 } else {
2344 None
2345 }
2346 })
2347 }
2348
2349 fn all_exports(&self) -> &[PackageIfcFns];
2350}
2351
2352type MetadataMap = hashbrown::HashMap<String, String, BuildHasherDefault<DefaultHasher>>;
2354
2355#[derive(
2356 Debug,
2357 Default,
2358 Clone,
2359 Serialize,
2360 Deserialize,
2361 derive_more::Display,
2362 PartialEq,
2363 Eq,
2364 schemars::JsonSchema,
2365)]
2366#[schemars(with = "std::collections::HashMap<String, String>")]
2367#[display("{_0:?}")]
2368pub struct ExecutionMetadata(MetadataMap);
2369
2370impl ExecutionMetadata {
2371 const LINKED_KEY: &str = "obelisk-tracing-linked";
2372
2373 const EMPTY_MAP: MetadataMap = hashbrown::HashMap::with_hasher(BuildHasherDefault::new());
2374
2375 #[must_use]
2376 pub const fn empty() -> Self {
2377 Self(Self::EMPTY_MAP)
2378 }
2379
2380 #[must_use]
2381 pub fn from_parent_span(less_specific: &Span) -> Self {
2382 ExecutionMetadata::create(less_specific, false)
2383 }
2384
2385 #[must_use]
2386 pub fn from_linked_span(less_specific: &Span) -> Self {
2387 ExecutionMetadata::create(less_specific, true)
2388 }
2389
2390 #[must_use]
2395 fn create(span: &Span, link_marker: bool) -> Self {
2396 use tracing_opentelemetry::OpenTelemetrySpanExt as _;
2397 let mut metadata = Self(hashbrown::HashMap::default());
2398 let mut metadata_view = ExecutionMetadataInjectorView {
2399 metadata: &mut metadata,
2400 };
2401 fn inject(s: &Span, metadata_view: &mut ExecutionMetadataInjectorView) {
2403 opentelemetry::global::get_text_map_propagator(|propagator| {
2404 propagator.inject_context(&s.context(), metadata_view);
2405 });
2406 }
2407 inject(&Span::current(), &mut metadata_view);
2408 if metadata_view.is_empty() {
2409 inject(span, &mut metadata_view);
2411 }
2412 if link_marker {
2413 metadata_view.set(Self::LINKED_KEY, String::new());
2414 }
2415 metadata
2416 }
2417
2418 pub fn enrich(&self, span: &Span) {
2419 use opentelemetry::trace::TraceContextExt as _;
2420 use tracing_opentelemetry::OpenTelemetrySpanExt as _;
2421
2422 let metadata_view = ExecutionMetadataExtractorView { metadata: self };
2423 let otel_context = opentelemetry::global::get_text_map_propagator(|propagator| {
2424 propagator.extract(&metadata_view)
2425 });
2426 if metadata_view.get(Self::LINKED_KEY).is_some() {
2427 let linked_span_context = otel_context.span().span_context().clone();
2428 span.add_link(linked_span_context);
2429 } else {
2430 let _ = span.set_parent(otel_context);
2431 }
2432 }
2433}
2434
2435struct ExecutionMetadataInjectorView<'a> {
2436 metadata: &'a mut ExecutionMetadata,
2437}
2438
2439impl ExecutionMetadataInjectorView<'_> {
2440 fn is_empty(&self) -> bool {
2441 self.metadata.0.is_empty()
2442 }
2443}
2444
2445impl opentelemetry::propagation::Injector for ExecutionMetadataInjectorView<'_> {
2446 fn set(&mut self, key: &str, value: String) {
2447 let key = format!("tracing:{key}");
2448 self.metadata.0.insert(key, value);
2449 }
2450}
2451
2452struct ExecutionMetadataExtractorView<'a> {
2453 metadata: &'a ExecutionMetadata,
2454}
2455
2456impl opentelemetry::propagation::Extractor for ExecutionMetadataExtractorView<'_> {
2457 fn get(&self, key: &str) -> Option<&str> {
2458 self.metadata
2459 .0
2460 .get(&format!("tracing:{key}"))
2461 .map(std::string::String::as_str)
2462 }
2463
2464 fn keys(&self) -> Vec<&str> {
2465 self.metadata
2466 .0
2467 .keys()
2468 .filter_map(|key| key.strip_prefix("tracing:"))
2469 .collect()
2470 }
2471}
2472
2473#[cfg(test)]
2474mod tests {
2475
2476 use rstest::rstest;
2477
2478 use crate::{
2479 ExecutionId, FunctionFqn, JoinSetId, JoinSetKind, StrVariant, prefixed_ulid::ExecutorId,
2480 };
2481 use std::{
2482 hash::{DefaultHasher, Hash, Hasher},
2483 str::FromStr,
2484 sync::Arc,
2485 };
2486
2487 #[test]
2488 fn ulid_parsing() {
2489 let generated = ExecutorId::generate();
2490 let str = generated.to_string();
2491 let parsed = str.parse().unwrap();
2492 assert_eq!(generated, parsed);
2493 }
2494
2495 #[test]
2496 fn execution_id_parsing_top_level() {
2497 let generated = ExecutionId::generate();
2498 let str = generated.to_string();
2499 let parsed = str.parse().unwrap();
2500 assert_eq!(generated, parsed);
2501 }
2502
2503 #[test]
2504 fn execution_id_with_one_level_should_parse() {
2505 let top_level = ExecutionId::generate();
2506 let join_set_id = JoinSetId::new(JoinSetKind::Named, StrVariant::Static("name")).unwrap();
2507 let first_child = ExecutionId::Derived(top_level.next_level(&join_set_id));
2508 let ser = first_child.to_string();
2509 assert_eq!(format!("{top_level}.n:name_1"), ser);
2510 let parsed = ExecutionId::from_str(&ser).unwrap();
2511 assert_eq!(first_child, parsed);
2512 }
2513
2514 #[test]
2515 fn execution_id_increment_twice() {
2516 let top_level = ExecutionId::generate();
2517 let join_set_id = JoinSetId::new(JoinSetKind::Named, StrVariant::Static("name")).unwrap();
2518 let first_child = top_level.next_level(&join_set_id);
2519 let second_child = ExecutionId::Derived(first_child.get_incremented());
2520 let ser = second_child.to_string();
2521 assert_eq!(format!("{top_level}.n:name_2"), ser);
2522 let parsed = ExecutionId::from_str(&ser).unwrap();
2523 assert_eq!(second_child, parsed);
2524 }
2525
2526 #[test]
2527 fn execution_id_next_level_twice() {
2528 let top_level = ExecutionId::generate();
2529 let join_set_id_outer =
2530 JoinSetId::new(JoinSetKind::Generated, StrVariant::Static("gg")).unwrap();
2531 let join_set_id_inner =
2532 JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("oo")).unwrap();
2533 let execution_id = ExecutionId::Derived(
2534 top_level
2535 .next_level(&join_set_id_outer)
2536 .get_incremented()
2537 .next_level(&join_set_id_inner)
2538 .get_incremented(),
2539 );
2540 let ser = execution_id.to_string();
2541 assert_eq!(format!("{top_level}.g:gg_2.o:oo_2"), ser);
2542 let parsed = ExecutionId::from_str(&ser).unwrap();
2543 assert_eq!(execution_id, parsed);
2544 }
2545
2546 #[test]
2547 fn execution_id_split_first_level() {
2548 let top_level = ExecutionId::generate();
2549 let join_set_id =
2550 JoinSetId::new(JoinSetKind::Generated, StrVariant::Static("some")).unwrap();
2551 let execution_id = top_level.next_level(&join_set_id);
2552 let (actual_top_level, actual_join_set) = execution_id.split_to_parts();
2553 assert_eq!(top_level, actual_top_level);
2554 assert_eq!(join_set_id, actual_join_set);
2555 }
2556
2557 #[rstest]
2558 fn execution_id_split_second_level(#[values(0, 1)] outer_idx: u64) {
2559 let top_level = ExecutionId::generate();
2560 let join_set_id_outer =
2561 JoinSetId::new(JoinSetKind::Generated, StrVariant::Static("some")).unwrap();
2562 let first_level = top_level
2563 .next_level(&join_set_id_outer)
2564 .get_incremented_by(outer_idx);
2565
2566 let join_set_id_inner =
2567 JoinSetId::new(JoinSetKind::Generated, StrVariant::Static("other")).unwrap();
2568 let second_level = first_level.next_level(&join_set_id_inner);
2569
2570 let (actual_first_level, actual_join_set) = second_level.split_to_parts();
2571 assert_eq!(ExecutionId::Derived(first_level), actual_first_level);
2572 assert_eq!(join_set_id_inner, actual_join_set);
2573 }
2574
2575 #[test]
2576 fn invalid_execution_id_should_fail_to_parse() {
2577 ExecutionId::from_str("E_01KBNHM5FQW81KP5Y3XMNX31K4.g:gg._2.o:oo_2").unwrap_err();
2578 }
2579
2580 #[test]
2581 fn execution_id_hash_should_be_stable() {
2582 let parent = ExecutionId::from_parts(1, 2);
2583 let join_set_id = JoinSetId::new(JoinSetKind::Named, StrVariant::Static("name")).unwrap();
2584 let sibling_1 = parent.next_level(&join_set_id);
2585 let sibling_2 = ExecutionId::Derived(sibling_1.get_incremented());
2586 let sibling_1 = ExecutionId::Derived(sibling_1);
2587 let join_set_id_inner =
2588 JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("oo")).unwrap();
2589 let child =
2590 ExecutionId::Derived(sibling_1.next_level(&join_set_id_inner).get_incremented());
2591 let parent = parent.random_seed();
2592 let sibling_1 = sibling_1.random_seed();
2593 let sibling_2 = sibling_2.random_seed();
2594 let child = child.random_seed();
2595 let vec = vec![parent, sibling_1, sibling_2, child];
2596 insta::assert_debug_snapshot!(vec);
2597 let set: hashbrown::HashSet<_> = vec.into_iter().collect();
2599 assert_eq!(4, set.len());
2600 }
2601
2602 #[test]
2603 fn hash_of_str_variants_should_be_equal() {
2604 let input = "foo";
2605 let left = StrVariant::Arc(Arc::from(input));
2606 let right = StrVariant::Static(input);
2607 assert_eq!(left, right);
2608 let mut left_hasher = DefaultHasher::new();
2609 left.hash(&mut left_hasher);
2610 let mut right_hasher = DefaultHasher::new();
2611 right.hash(&mut right_hasher);
2612 let left_hasher = left_hasher.finish();
2613 let right_hasher = right_hasher.finish();
2614 println!("left: {left_hasher:x}, right: {right_hasher:x}");
2615 assert_eq!(left_hasher, right_hasher);
2616 }
2617
2618 #[test]
2619 fn ffqn_from_tuple_with_version_should_work() {
2620 let ffqn = FunctionFqn::try_from_tuple("wasi:cli/run@0.2.0", "run").unwrap();
2621 assert_eq!(FunctionFqn::new_static("wasi:cli/run@0.2.0", "run"), ffqn);
2622 }
2623
2624 #[test]
2625 fn ffqn_from_str_with_version_should_work() {
2626 let ffqn = FunctionFqn::from_str("wasi:cli/run@0.2.0.run").unwrap();
2627 assert_eq!(FunctionFqn::new_static("wasi:cli/run@0.2.0", "run"), ffqn);
2628 }
2629
2630 #[tokio::test]
2631 async fn join_set_serde_should_be_consistent() {
2632 use crate::{JoinSetId, JoinSetKind};
2633 use strum::IntoEnumIterator;
2634 for kind in JoinSetKind::iter() {
2635 let join_set_id = JoinSetId::new(kind, StrVariant::from("name")).unwrap();
2636 let ser = serde_json::to_string(&join_set_id).unwrap();
2637 let deser = serde_json::from_str(&ser).unwrap();
2638 assert_eq!(join_set_id, deser);
2639 }
2640 }
2641
2642 mod ifc_fqn {
2643 use crate::{IfcFqnName, IfcFqnParseError};
2644 use std::str::FromStr;
2645
2646 #[test]
2647 fn parses_valid_without_version() {
2648 let fqn = IfcFqnName::from_str("ns:pkg/ifc").unwrap();
2649 assert_eq!(fqn.namespace(), "ns");
2650 assert_eq!(fqn.package_name(), "pkg");
2651 assert_eq!(fqn.ifc_name(), "ifc");
2652 assert_eq!(fqn.version(), None);
2653 }
2654
2655 #[test]
2656 fn parses_valid_with_version() {
2657 let fqn = IfcFqnName::from_str("ns:pkg/ifc@1.2.3").unwrap();
2658 assert_eq!(fqn.version(), Some("1.2.3"));
2659 }
2660
2661 #[test]
2662 fn fails_missing_colon() {
2663 let err = IfcFqnName::from_str("pkg/ifc").unwrap_err();
2664 assert!(matches!(err, IfcFqnParseError::MissingNamespace(_)));
2665 }
2666
2667 #[test]
2668 fn fails_empty_namespace() {
2669 let err = IfcFqnName::from_str(":pkg/ifc").unwrap_err();
2670 assert!(matches!(err, IfcFqnParseError::EmptyNamespace(_)));
2671 }
2672
2673 #[test]
2674 fn fails_missing_slash() {
2675 let err = IfcFqnName::from_str("ns:pkgifc").unwrap_err();
2676 assert!(matches!(err, IfcFqnParseError::MissingPackageOrIfc(_)));
2677 }
2678
2679 #[test]
2680 fn fails_empty_package() {
2681 let err = IfcFqnName::from_str("ns:/ifc").unwrap_err();
2682 assert!(matches!(err, IfcFqnParseError::EmptyPackageName(_)));
2683 }
2684
2685 #[test]
2686 fn fails_empty_ifc() {
2687 let err = IfcFqnName::from_str("ns:pkg/").unwrap_err();
2688 assert!(matches!(err, IfcFqnParseError::EmptyIfcName(_)));
2689 }
2690
2691 #[test]
2692 fn fails_empty_version() {
2693 let err = IfcFqnName::from_str("ns:pkg/ifc@").unwrap_err();
2694 assert!(matches!(err, IfcFqnParseError::EmptyVersion(_)));
2695 }
2696 }
2697}