1use ::serde::{Deserialize, Serialize};
2use arbitrary::Arbitrary;
3use assert_matches::assert_matches;
4use async_trait::async_trait;
5pub use indexmap;
6use indexmap::IndexMap;
7use opentelemetry::propagation::{Extractor, Injector};
8pub use prefixed_ulid::ExecutionId;
9use prefixed_ulid::{ExecutionIdDerived, ExecutionIdParseError};
10use serde_json::Value;
11use std::{
12 borrow::Borrow,
13 fmt::{Debug, Display},
14 hash::Hash,
15 marker::PhantomData,
16 ops::Deref,
17 str::FromStr,
18 sync::Arc,
19 time::Duration,
20};
21use storage::{PendingStateFinishedError, PendingStateFinishedResultKind};
22use tracing::Span;
23use val_json::{
24 type_wrapper::{TypeConversionError, TypeWrapper},
25 wast_val::{WastVal, WastValWithType},
26 wast_val_ser::params,
27};
28use wasmtime::component::{Type, Val};
29
30#[cfg(feature = "rusqlite")]
31mod rusqlite_ext;
32pub mod storage;
33pub mod time;
34
/// Namespace that the `is_namespace_obelisk` helpers compare against.
pub const NAMESPACE_OBELISK: &str = "obelisk";
/// Package-name suffix that the `is_extension` / `package_strip_extension_suffix`
/// helpers look for.
pub const SUFFIX_PKG_EXT: &str = "-obelisk-ext";
37
38pub type FinishedExecutionResult = Result<SupportedFunctionReturnValue, FinishedExecutionError>;
39
40#[derive(thiserror::Error, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
41pub enum FinishedExecutionError {
42 #[error("permanent timeout")]
44 PermanentTimeout,
45 #[error("unhandled child execution error {child_execution_id}")]
47 UnhandledChildExecutionError {
48 child_execution_id: ExecutionIdDerived,
49 root_cause_id: ExecutionIdDerived,
50 },
51 #[error("permanent failure: {reason_full}")]
52 PermanentFailure {
53 reason_inner: String,
55 reason_full: String,
57 kind: PermanentFailureKind,
58 detail: Option<String>,
59 },
60}
61impl FinishedExecutionError {
62 #[must_use]
63 pub fn as_pending_state_finished_error(&self) -> PendingStateFinishedError {
64 match self {
65 FinishedExecutionError::PermanentTimeout => PendingStateFinishedError::Timeout,
66 FinishedExecutionError::UnhandledChildExecutionError { .. } => {
67 PendingStateFinishedError::UnhandledChildExecutionError
68 }
69 FinishedExecutionError::PermanentFailure { .. } => {
70 PendingStateFinishedError::ExecutionFailure
71 }
72 }
73 }
74}
75
76#[derive(Debug, Clone, Copy, derive_more::Display, PartialEq, Eq, Serialize, Deserialize)]
77#[serde(rename_all = "snake_case")]
78pub enum PermanentFailureKind {
79 NondeterminismDetected,
81 ParamsParsingError,
83 CannotInstantiate,
85 ResultParsingError,
87 ImportedFunctionCallError,
89 ActivityTrap,
91 WorkflowTrap,
93 JoinSetNameConflict,
95 WebhookEndpointError,
97}
98
99#[derive(Debug, Clone, Copy, derive_more::Display, PartialEq, Eq, Serialize, Deserialize)]
100#[serde(rename_all = "snake_case")]
101pub enum TrapKind {
102 #[display("trap")]
103 Trap,
104 #[display("post_return_trap")]
105 PostReturnTrap,
106}
107
108#[derive(Clone, Eq, derive_more::Display)]
109pub enum StrVariant {
110 Static(&'static str),
111 Arc(Arc<str>),
112}
113
114impl StrVariant {
115 #[must_use]
116 pub const fn empty() -> StrVariant {
117 StrVariant::Static("")
118 }
119}
120
121impl From<String> for StrVariant {
122 fn from(value: String) -> Self {
123 StrVariant::Arc(Arc::from(value))
124 }
125}
126
127impl From<&'static str> for StrVariant {
128 fn from(value: &'static str) -> Self {
129 StrVariant::Static(value)
130 }
131}
132
133impl PartialEq for StrVariant {
134 fn eq(&self, other: &Self) -> bool {
135 match (self, other) {
136 (Self::Static(left), Self::Static(right)) => left == right,
137 (Self::Static(left), Self::Arc(right)) => *left == right.deref(),
138 (Self::Arc(left), Self::Arc(right)) => left == right,
139 (Self::Arc(left), Self::Static(right)) => left.deref() == *right,
140 }
141 }
142}
143
144impl Hash for StrVariant {
145 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
146 match self {
147 StrVariant::Static(val) => val.hash(state),
148 StrVariant::Arc(val) => {
149 let str: &str = val.deref();
150 str.hash(state);
151 }
152 }
153 }
154}
155
156impl Debug for StrVariant {
157 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158 Display::fmt(self, f)
159 }
160}
161
162impl Deref for StrVariant {
163 type Target = str;
164 fn deref(&self) -> &Self::Target {
165 match self {
166 Self::Arc(v) => v,
167 Self::Static(v) => v,
168 }
169 }
170}
171
172impl AsRef<str> for StrVariant {
173 fn as_ref(&self) -> &str {
174 match self {
175 Self::Arc(v) => v,
176 Self::Static(v) => v,
177 }
178 }
179}
180
181mod serde_strvariant {
182 use crate::StrVariant;
183 use serde::{
184 Deserialize, Deserializer, Serialize, Serializer,
185 de::{self, Visitor},
186 };
187 use std::{ops::Deref, sync::Arc};
188
189 impl Serialize for StrVariant {
190 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
191 where
192 S: Serializer,
193 {
194 serializer.serialize_str(self.deref())
195 }
196 }
197
198 impl<'de> Deserialize<'de> for StrVariant {
199 fn deserialize<D>(deserializer: D) -> Result<StrVariant, D::Error>
200 where
201 D: Deserializer<'de>,
202 {
203 deserializer.deserialize_str(StrVariantVisitor)
204 }
205 }
206
207 struct StrVariantVisitor;
208
209 impl Visitor<'_> for StrVariantVisitor {
210 type Value = StrVariant;
211
212 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
213 formatter.write_str("a string")
214 }
215
216 fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
217 where
218 E: de::Error,
219 {
220 Ok(StrVariant::Arc(Arc::from(v)))
221 }
222 }
223}
224
225#[derive(Hash, Clone, PartialEq, Eq, derive_more::Display, Serialize, Deserialize)]
226#[display("{value}")]
227#[serde(transparent)]
228pub struct Name<T> {
229 value: StrVariant,
230 #[serde(skip)]
231 phantom_data: PhantomData<fn(T) -> T>,
232}
233
234impl<T> Name<T> {
235 #[must_use]
236 pub fn new_arc(value: Arc<str>) -> Self {
237 Self {
238 value: StrVariant::Arc(value),
239 phantom_data: PhantomData,
240 }
241 }
242
243 #[must_use]
244 pub const fn new_static(value: &'static str) -> Self {
245 Self {
246 value: StrVariant::Static(value),
247 phantom_data: PhantomData,
248 }
249 }
250}
251
252impl<T> Debug for Name<T> {
253 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
254 Display::fmt(&self, f)
255 }
256}
257
258impl<T> Deref for Name<T> {
259 type Target = str;
260
261 fn deref(&self) -> &Self::Target {
262 self.value.deref()
263 }
264}
265
266impl<T> Borrow<str> for Name<T> {
267 fn borrow(&self) -> &str {
268 self.deref()
269 }
270}
271
272impl<T> From<String> for Name<T> {
273 fn from(value: String) -> Self {
274 Self::new_arc(Arc::from(value))
275 }
276}
277
/// Package identifier made of a namespace, a package name and an optional version.
///
/// Displayed as `namespace:package_name` followed by `@version` when a version is set.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct PkgFqn {
    /// Part before the `:`.
    pub namespace: String,
    /// Part after the `:`.
    pub package_name: String,
    /// Version appended after `@`, if any.
    pub version: Option<String>,
}

impl Display for PkgFqn {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}:{}", self.namespace, self.package_name)?;
        match &self.version {
            Some(version) => write!(f, "@{version}"),
            None => Ok(()),
        }
    }
}
298
299impl PkgFqn {
300 #[must_use]
301 pub fn is_extension(&self) -> bool {
302 self.package_name.ends_with(SUFFIX_PKG_EXT)
303 }
304
305 #[must_use]
306 pub fn package_strip_extension_suffix(&self) -> Option<&str> {
307 self.package_name.as_str().strip_suffix(SUFFIX_PKG_EXT)
308 }
309
310 #[must_use]
311 pub fn is_namespace_obelisk(&self) -> bool {
312 self.namespace == NAMESPACE_OBELISK
313 }
314
315 #[must_use]
316 pub fn ifc_fqn_name(&self, ifc_name: &str) -> IfcFqnName {
317 IfcFqnName::from_parts(
318 &self.namespace,
319 &self.package_name,
320 ifc_name,
321 self.version.as_deref(),
322 )
323 }
324}
325
/// Marker type that distinguishes interface names ([`IfcFqnName`]) from other
/// [`Name`] instantiations.
#[derive(Hash, Clone, PartialEq, Eq)]
pub struct IfcFqnMarker;
328
329pub type IfcFqnName = Name<IfcFqnMarker>; impl IfcFqnName {
332 #[must_use]
333 pub fn namespace(&self) -> &str {
334 self.deref().split_once(':').unwrap().0
335 }
336
337 #[must_use]
338 pub fn package_name(&self) -> &str {
339 let after_colon = self.deref().split_once(':').unwrap().1;
340 after_colon.split_once('/').unwrap().0
341 }
342
343 #[must_use]
344 pub fn version(&self) -> Option<&str> {
345 self.deref().split_once('@').map(|(_, version)| version)
346 }
347
348 #[must_use]
349 pub fn pkg_fqn_name(&self) -> PkgFqn {
350 let (namespace, rest) = self.deref().split_once(':').unwrap();
351 let (package_name, rest) = rest.split_once('/').unwrap();
352 let version = rest.split_once('@').map(|(_, version)| version);
353 PkgFqn {
354 namespace: namespace.to_string(),
355 package_name: package_name.to_string(),
356 version: version.map(std::string::ToString::to_string),
357 }
358 }
359
360 #[must_use]
361 pub fn ifc_name(&self) -> &str {
362 let after_colon = self.deref().split_once(':').unwrap().1;
363 let after_slash = after_colon.split_once('/').unwrap().1;
364 after_slash
365 .split_once('@')
366 .map_or(after_slash, |(ifc, _)| ifc)
367 }
368
369 #[must_use]
370 pub fn from_parts(
371 namespace: &str,
372 package_name: &str,
373 ifc_name: &str,
374 version: Option<&str>,
375 ) -> Self {
376 let mut str = format!("{namespace}:{package_name}/{ifc_name}");
377 if let Some(version) = version {
378 str += "@";
379 str += version;
380 }
381 Self::new_arc(Arc::from(str))
382 }
383
384 #[must_use]
385 pub fn is_extension(&self) -> bool {
386 self.package_name().ends_with(SUFFIX_PKG_EXT)
387 }
388
389 #[must_use]
390 pub fn package_strip_extension_suffix(&self) -> Option<&str> {
391 self.package_name().strip_suffix(SUFFIX_PKG_EXT)
392 }
393
394 #[must_use]
395 pub fn is_namespace_obelisk(&self) -> bool {
396 self.namespace() == NAMESPACE_OBELISK
397 }
398}
399
400#[derive(Hash, Clone, PartialEq, Eq)]
401pub struct FnMarker;
402
403pub type FnName = Name<FnMarker>;
404
405#[derive(Hash, Clone, PartialEq, Eq, Serialize, Deserialize)]
406pub struct FunctionFqn {
407 pub ifc_fqn: IfcFqnName,
408 pub function_name: FnName,
409}
410
411impl FunctionFqn {
412 #[must_use]
413 pub fn new_arc(ifc_fqn: Arc<str>, function_name: Arc<str>) -> Self {
414 Self {
415 ifc_fqn: Name::new_arc(ifc_fqn),
416 function_name: Name::new_arc(function_name),
417 }
418 }
419
420 #[must_use]
421 pub const fn new_static(ifc_fqn: &'static str, function_name: &'static str) -> Self {
422 Self {
423 ifc_fqn: Name::new_static(ifc_fqn),
424 function_name: Name::new_static(function_name),
425 }
426 }
427
428 #[must_use]
429 pub const fn new_static_tuple(tuple: (&'static str, &'static str)) -> Self {
430 Self::new_static(tuple.0, tuple.1)
431 }
432
433 pub fn try_from_tuple(
434 ifc_fqn: &str,
435 function_name: &str,
436 ) -> Result<Self, FunctionFqnParseError> {
437 if function_name.contains('.') {
438 Err(FunctionFqnParseError::DelimiterFoundInFunctionName)
439 } else {
440 Ok(Self::new_arc(Arc::from(ifc_fqn), Arc::from(function_name)))
441 }
442 }
443}
444
445#[derive(Debug, thiserror::Error)]
446pub enum FunctionFqnParseError {
447 #[error("delimiter `.` not found")]
448 DelimiterNotFound,
449 #[error("delimiter `.` found in function name")]
450 DelimiterFoundInFunctionName,
451}
452
453impl FromStr for FunctionFqn {
454 type Err = FunctionFqnParseError;
455
456 fn from_str(s: &str) -> Result<Self, Self::Err> {
457 if let Some((ifc_fqn, function_name)) = s.rsplit_once('.') {
458 Ok(Self::new_arc(Arc::from(ifc_fqn), Arc::from(function_name)))
459 } else {
460 Err(FunctionFqnParseError::DelimiterNotFound)
461 }
462 }
463}
464
465impl Display for FunctionFqn {
466 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
467 write!(
468 f,
469 "{ifc_fqn}.{function_name}",
470 ifc_fqn = self.ifc_fqn,
471 function_name = self.function_name
472 )
473 }
474}
475
476impl Debug for FunctionFqn {
477 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
478 Display::fmt(&self, f)
479 }
480}
481
482impl<'a> arbitrary::Arbitrary<'a> for FunctionFqn {
483 fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
484 let illegal = [':', '@', '.'];
485 let namespace = u.arbitrary::<String>()?.replace(illegal, "");
486 let pkg_name = u.arbitrary::<String>()?.replace(illegal, "");
487 let ifc_name = u.arbitrary::<String>()?.replace(illegal, "");
488 let fn_name = u.arbitrary::<String>()?.replace(illegal, "");
489
490 Ok(FunctionFqn::new_arc(
491 Arc::from(format!("{namespace}:{pkg_name}/{ifc_name}")),
492 Arc::from(fn_name),
493 ))
494 }
495}
496
497#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
498pub enum SupportedFunctionReturnValue {
499 None,
500 FallibleResultErr(WastValWithType),
502 InfallibleOrResultOk(WastValWithType),
504}
505
506#[derive(Debug, thiserror::Error)]
507pub enum ResultParsingError {
508 #[error("result cannot be parsed, multi-value results are not supported")]
509 MultiValue,
510 #[error("result cannot be parsed, {0}")]
511 TypeConversionError(#[from] val_json::type_wrapper::TypeConversionError),
512 #[error("result cannot be parsed, {0}")]
513 ValueConversionError(#[from] val_json::wast_val::WastValConversionError),
514}
515
516impl SupportedFunctionReturnValue {
517 pub fn new<
518 I: ExactSizeIterator<Item = (wasmtime::component::Val, wasmtime::component::Type)>,
519 >(
520 mut iter: I,
521 ) -> Result<Self, ResultParsingError> {
522 if iter.len() == 0 {
523 Ok(Self::None)
524 } else if iter.len() == 1 {
525 let (val, r#type) = iter.next().unwrap();
526 let r#type = TypeWrapper::try_from(r#type)?;
527 let val = WastVal::try_from(val)?;
528 match &val {
529 WastVal::Result(Err(_)) => Ok(Self::FallibleResultErr(WastValWithType {
530 r#type,
531 value: val,
532 })),
533 _ => Ok(Self::InfallibleOrResultOk(WastValWithType {
534 r#type,
535 value: val,
536 })),
537 }
538 } else {
539 Err(ResultParsingError::MultiValue)
540 }
541 }
542
543 #[cfg(feature = "test")]
544 #[must_use]
545 pub fn fallible_err(&self) -> Option<Option<&WastVal>> {
546 match self {
547 SupportedFunctionReturnValue::FallibleResultErr(WastValWithType {
548 value: WastVal::Result(Err(err)),
549 ..
550 }) => Some(err.as_deref()),
551 _ => None,
552 }
553 }
554
555 #[cfg(feature = "test")]
556 #[must_use]
557 pub fn fallible_ok(&self) -> Option<Option<&WastVal>> {
558 match self {
559 SupportedFunctionReturnValue::InfallibleOrResultOk(WastValWithType {
560 value: WastVal::Result(Ok(ok)),
561 ..
562 }) => Some(ok.as_deref()),
563 _ => None,
564 }
565 }
566
567 #[cfg(feature = "test")]
568 #[must_use]
569 pub fn val_type(&self) -> Option<&TypeWrapper> {
570 match self {
571 SupportedFunctionReturnValue::None => None,
572 SupportedFunctionReturnValue::FallibleResultErr(v)
573 | SupportedFunctionReturnValue::InfallibleOrResultOk(v) => Some(&v.r#type),
574 }
575 }
576
577 #[must_use]
578 pub fn value(&self) -> Option<&WastVal> {
579 match self {
580 SupportedFunctionReturnValue::None => None,
581 SupportedFunctionReturnValue::FallibleResultErr(v)
582 | SupportedFunctionReturnValue::InfallibleOrResultOk(v) => Some(&v.value),
583 }
584 }
585
586 #[must_use]
587 pub fn into_value(self) -> Option<WastVal> {
588 match self {
589 SupportedFunctionReturnValue::None => None,
590 SupportedFunctionReturnValue::FallibleResultErr(v)
591 | SupportedFunctionReturnValue::InfallibleOrResultOk(v) => Some(v.value),
592 }
593 }
594
595 #[must_use]
596 pub fn len(&self) -> usize {
597 match self {
598 SupportedFunctionReturnValue::None => 0,
599 _ => 1,
600 }
601 }
602
603 #[must_use]
604 pub fn is_empty(&self) -> bool {
605 matches!(self, Self::None)
606 }
607
608 #[must_use]
609 pub fn as_pending_state_finished_result(&self) -> PendingStateFinishedResultKind {
610 if let SupportedFunctionReturnValue::FallibleResultErr(_) = self {
611 PendingStateFinishedResultKind(Err(PendingStateFinishedError::FallibleError))
612 } else {
613 PendingStateFinishedResultKind(Ok(()))
614 }
615 }
616}
617
618#[derive(Debug, Clone, PartialEq, Eq)]
619pub struct Params(ParamsInternal);
620
621#[derive(Debug, Clone, PartialEq, Eq)]
622enum ParamsInternal {
623 JsonValues(Vec<Value>),
624 Vals {
625 vals: Arc<[wasmtime::component::Val]>,
627 },
628 Empty,
629}
630
631impl Default for Params {
632 fn default() -> Self {
633 Self(ParamsInternal::Empty)
634 }
635}
636
637#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
638pub enum FunctionExtension {
639 Submit,
640 AwaitNext,
641 Schedule,
642}
643
644#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
645pub struct FunctionMetadata {
646 pub ffqn: FunctionFqn,
647 pub parameter_types: ParameterTypes,
648 pub return_type: Option<ReturnType>,
649 pub extension: Option<FunctionExtension>,
650 pub submittable: bool,
651}
652impl Display for FunctionMetadata {
653 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
654 write!(
655 f,
656 "{ffqn}: func{params}",
657 ffqn = self.ffqn,
658 params = self.parameter_types
659 )?;
660 if let Some(return_type) = &self.return_type {
661 write!(f, " -> {return_type}")?;
662 }
663 Ok(())
664 }
665}
666
667pub mod serde_params {
668 use crate::{Params, ParamsInternal};
669 use serde::de::{SeqAccess, Visitor};
670 use serde::ser::SerializeSeq;
671 use serde::{Deserialize, Serialize};
672 use serde_json::Value;
673 use val_json::wast_val::WastVal;
674
675 impl Serialize for Params {
676 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
677 where
678 S: ::serde::Serializer,
679 {
680 match &self.0 {
681 ParamsInternal::Vals { vals } => {
682 let mut seq = serializer.serialize_seq(Some(vals.len()))?; for val in vals.iter() {
684 let value = WastVal::try_from(val.clone())
685 .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
686 seq.serialize_element(&value)?;
687 }
688 seq.end()
689 }
690 ParamsInternal::Empty => serializer.serialize_seq(Some(0))?.end(),
691 ParamsInternal::JsonValues(vec) => {
692 let mut seq = serializer.serialize_seq(Some(vec.len()))?;
693 for item in vec {
694 seq.serialize_element(item)?;
695 }
696 seq.end()
697 }
698 }
699 }
700 }
701
702 pub struct VecVisitor;
703
704 impl<'de> Visitor<'de> for VecVisitor {
705 type Value = Vec<Value>;
706
707 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
708 formatter.write_str("a sequence of `Value`")
709 }
710
711 #[inline]
712 fn visit_seq<V>(self, mut visitor: V) -> Result<Self::Value, V::Error>
713 where
714 V: SeqAccess<'de>,
715 {
716 let mut vec = Vec::new();
717 while let Some(elem) = visitor.next_element()? {
718 vec.push(elem);
719 }
720 Ok(vec)
721 }
722 }
723
724 impl<'de> Deserialize<'de> for Params {
725 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
726 where
727 D: serde::Deserializer<'de>,
728 {
729 let vec: Vec<Value> = deserializer.deserialize_seq(VecVisitor)?;
730 if vec.is_empty() {
731 Ok(Self(ParamsInternal::Empty))
732 } else {
733 Ok(Self(ParamsInternal::JsonValues(vec)))
734 }
735 }
736 }
737}
738
739#[derive(Debug, thiserror::Error)]
740pub enum ParamsParsingError {
741 #[error("parameters cannot be parsed, cannot convert type of {idx}-th parameter")]
742 ParameterTypeError {
743 idx: usize,
744 err: TypeConversionError,
745 },
746 #[error("parameters cannot be deserialized: {0}")]
747 ParamsDeserializationError(serde_json::Error),
748 #[error("parameter cardinality mismatch, expected: {expected}, specified: {specified}")]
749 ParameterCardinalityMismatch { expected: usize, specified: usize },
750}
751
752impl ParamsParsingError {
753 #[must_use]
754 pub fn detail(&self) -> Option<String> {
755 match self {
756 ParamsParsingError::ParameterTypeError { err, .. } => Some(format!("{err:?}")),
757 ParamsParsingError::ParamsDeserializationError(err) => Some(format!("{err:?}")),
758 ParamsParsingError::ParameterCardinalityMismatch { .. } => None,
759 }
760 }
761}
762
763#[derive(Debug, thiserror::Error)]
764pub enum ParamsFromJsonError {
765 #[error("value must be a json array containing function parameters")]
766 MustBeArray,
767}
768
769impl Params {
770 #[must_use]
771 pub const fn empty() -> Self {
772 Self(ParamsInternal::Empty)
773 }
774
775 #[must_use]
776 pub fn from_wasmtime(vals: Arc<[wasmtime::component::Val]>) -> Self {
777 if vals.is_empty() {
778 Self::empty()
779 } else {
780 Self(ParamsInternal::Vals { vals })
781 }
782 }
783
784 #[must_use]
785 pub fn from_json_values(vec: Vec<Value>) -> Self {
786 if vec.is_empty() {
787 Self::empty()
788 } else {
789 Self(ParamsInternal::JsonValues(vec))
790 }
791 }
792
793 pub fn typecheck<'a>(
794 &self,
795 param_types: impl ExactSizeIterator<Item = &'a TypeWrapper>,
796 ) -> Result<(), ParamsParsingError> {
797 if param_types.len() != self.len() {
798 return Err(ParamsParsingError::ParameterCardinalityMismatch {
799 expected: param_types.len(),
800 specified: self.len(),
801 });
802 }
803 match &self.0 {
804 ParamsInternal::Vals { .. } | ParamsInternal::Empty => {}
805 ParamsInternal::JsonValues(params) => {
806 params::deserialize_values(params, param_types)
807 .map_err(ParamsParsingError::ParamsDeserializationError)?;
808 }
809 }
810 Ok(())
811 }
812
813 pub fn as_vals(
814 &self,
815 param_types: Box<[(String, Type)]>,
816 ) -> Result<Arc<[wasmtime::component::Val]>, ParamsParsingError> {
817 if param_types.len() != self.len() {
818 return Err(ParamsParsingError::ParameterCardinalityMismatch {
819 expected: param_types.len(),
820 specified: self.len(),
821 });
822 }
823 match &self.0 {
824 ParamsInternal::JsonValues(json_vec) => {
825 let param_types = param_types
826 .into_vec()
827 .into_iter()
828 .enumerate()
829 .map(|(idx, (_param_name, ty))| {
830 TypeWrapper::try_from(ty).map_err(|err| (idx, err))
831 })
832 .collect::<Result<Vec<_>, _>>()
833 .map_err(|(idx, err)| ParamsParsingError::ParameterTypeError { idx, err })?;
834 Ok(params::deserialize_values(json_vec, param_types.iter())
835 .map_err(ParamsParsingError::ParamsDeserializationError)?
836 .into_iter()
837 .map(Val::from)
838 .collect())
839 }
840 ParamsInternal::Vals { vals, .. } => Ok(vals.clone()),
841 ParamsInternal::Empty => Ok(Arc::from([])),
842 }
843 }
844
845 #[must_use]
846 pub fn len(&self) -> usize {
847 match &self.0 {
848 ParamsInternal::JsonValues(vec) => vec.len(),
849 ParamsInternal::Vals { vals, .. } => vals.len(),
850 ParamsInternal::Empty => 0,
851 }
852 }
853
854 #[must_use]
855 pub fn is_empty(&self) -> bool {
856 self.len() == 0
857 }
858}
859
860pub mod prefixed_ulid {
861 use arbitrary::Arbitrary;
862 use serde_with::{DeserializeFromStr, SerializeDisplay};
863 use std::{
864 fmt::{Debug, Display},
865 hash::Hasher,
866 marker::PhantomData,
867 num::ParseIntError,
868 str::FromStr,
869 sync::Arc,
870 };
871 use ulid::Ulid;
872
873 use crate::JoinSetId;
874
875 #[derive(derive_more::Display, SerializeDisplay, DeserializeFromStr)]
876 #[derive_where::derive_where(Clone, Copy)]
877 #[display("{}_{ulid}", Self::prefix())]
878 pub struct PrefixedUlid<T: 'static> {
879 ulid: Ulid,
880 phantom_data: PhantomData<fn(T) -> T>,
881 }
882
883 impl<T> PrefixedUlid<T> {
884 const fn new(ulid: Ulid) -> Self {
885 Self {
886 ulid,
887 phantom_data: PhantomData,
888 }
889 }
890
891 fn prefix() -> &'static str {
892 std::any::type_name::<T>().rsplit("::").next().unwrap()
893 }
894 }
895
896 impl<T> PrefixedUlid<T> {
897 #[must_use]
898 pub fn generate() -> Self {
899 Self::new(Ulid::new())
900 }
901
902 #[must_use]
903 pub const fn from_parts(timestamp_ms: u64, random: u128) -> Self {
904 Self::new(Ulid::from_parts(timestamp_ms, random))
905 }
906
907 #[must_use]
908 pub fn timestamp_part(&self) -> u64 {
909 self.ulid.timestamp_ms()
910 }
911
912 #[must_use]
913 #[expect(clippy::cast_possible_truncation)]
914 pub fn random_part(&self) -> u64 {
915 self.ulid.random() as u64
916 }
917 }
918
919 #[derive(Debug, thiserror::Error)]
920 pub enum PrefixedUlidParseError {
921 #[error("wrong prefix in `{input}`, expected prefix `{expected}`")]
922 WrongPrefix { input: String, expected: String },
923 #[error("cannot parse ULID suffix from `{input}`")]
924 CannotParseUlid { input: String },
925 }
926
927 mod impls {
928 use super::{PrefixedUlid, PrefixedUlidParseError, Ulid};
929 use std::{fmt::Debug, fmt::Display, hash::Hash, marker::PhantomData, str::FromStr};
930
931 impl<T> FromStr for PrefixedUlid<T> {
932 type Err = PrefixedUlidParseError;
933
934 fn from_str(input: &str) -> Result<Self, Self::Err> {
935 let prefix = Self::prefix();
936 let mut input_chars = input.chars();
937 for exp in prefix.chars() {
938 if input_chars.next() != Some(exp) {
939 return Err(PrefixedUlidParseError::WrongPrefix {
940 input: input.to_string(),
941 expected: format!("{prefix}_"),
942 });
943 }
944 }
945 if input_chars.next() != Some('_') {
946 return Err(PrefixedUlidParseError::WrongPrefix {
947 input: input.to_string(),
948 expected: format!("{prefix}_"),
949 });
950 }
951 let Ok(ulid) = Ulid::from_string(input_chars.as_str()) else {
952 return Err(PrefixedUlidParseError::CannotParseUlid {
953 input: input.to_string(),
954 });
955 };
956 Ok(Self {
957 ulid,
958 phantom_data: PhantomData,
959 })
960 }
961 }
962
963 impl<T> Debug for PrefixedUlid<T> {
964 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
965 Display::fmt(&self, f)
966 }
967 }
968
969 impl<T> Hash for PrefixedUlid<T> {
970 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
971 Self::prefix().hash(state);
972 self.ulid.hash(state);
973 self.phantom_data.hash(state);
974 }
975 }
976
977 impl<T> PartialEq for PrefixedUlid<T> {
978 fn eq(&self, other: &Self) -> bool {
979 self.ulid == other.ulid
980 }
981 }
982
983 impl<T> Eq for PrefixedUlid<T> {}
984
985 impl<T> PartialOrd for PrefixedUlid<T> {
986 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
987 Some(self.cmp(other))
988 }
989 }
990
991 impl<T> Ord for PrefixedUlid<T> {
992 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
993 self.ulid.cmp(&other.ulid)
994 }
995 }
996 }
997
998 pub mod prefix {
999 pub struct E;
1000 pub struct Exr;
1001 pub struct Run;
1002 pub struct Delay;
1003 }
1004
1005 pub type ExecutorId = PrefixedUlid<prefix::Exr>;
1006 pub type ExecutionIdTopLevel = PrefixedUlid<prefix::E>;
1007 pub type RunId = PrefixedUlid<prefix::Run>;
1008 pub type DelayId = PrefixedUlid<prefix::Delay>;
1009
1010 impl<'a, T> Arbitrary<'a> for PrefixedUlid<T> {
1011 fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
1012 Ok(Self::new(ulid::Ulid::from_parts(
1013 u.arbitrary()?,
1014 u.arbitrary()?,
1015 )))
1016 }
1017 }
1018
1019 #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, SerializeDisplay, DeserializeFromStr, Clone)]
1020 pub enum ExecutionId {
1021 TopLevel(ExecutionIdTopLevel),
1022 Derived(ExecutionIdDerived),
1023 }
1024
1025 #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, SerializeDisplay, DeserializeFromStr)]
1026 pub struct ExecutionIdDerived {
1027 top_level: ExecutionIdTopLevel,
1028 infix: Arc<str>,
1029 idx: u64,
1030 }
1031 impl ExecutionIdDerived {
1032 #[must_use]
1033 pub fn get_incremented(&self) -> Self {
1034 self.get_incremented_by(1)
1035 }
1036 #[must_use]
1037 pub fn get_incremented_by(&self, count: u64) -> Self {
1038 ExecutionIdDerived {
1039 top_level: self.top_level,
1040 infix: self.infix.clone(),
1041 idx: self.idx + count,
1042 }
1043 }
1044 #[must_use]
1045 pub fn next_level(&self, join_set_id: &JoinSetId) -> ExecutionIdDerived {
1046 let ExecutionIdDerived {
1047 top_level,
1048 infix,
1049 idx,
1050 } = self;
1051 let infix = Arc::from(format!(
1052 "{infix}{EXECUTION_ID_JOIN_SET_INFIX}{idx}{EXECUTION_ID_INFIX}{join_set_id}"
1053 ));
1054 ExecutionIdDerived {
1055 top_level: *top_level,
1056 infix,
1057 idx: EXECUTION_ID_START_IDX,
1058 }
1059 }
1060 fn display_or_debug(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1061 let ExecutionIdDerived {
1062 top_level,
1063 infix,
1064 idx,
1065 } = self;
1066 write!(
1067 f,
1068 "{top_level}{EXECUTION_ID_INFIX}{infix}{EXECUTION_ID_JOIN_SET_INFIX}{idx}"
1069 )
1070 }
1071 }
1072 impl Debug for ExecutionIdDerived {
1073 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1074 self.display_or_debug(f)
1075 }
1076 }
1077 impl Display for ExecutionIdDerived {
1078 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1079 self.display_or_debug(f)
1080 }
1081 }
1082 impl FromStr for ExecutionIdDerived {
1083 type Err = ExecutionIdDerivedParseError;
1084
1085 fn from_str(input: &str) -> Result<Self, Self::Err> {
1086 if let Some((prefix, suffix)) = input.split_once(EXECUTION_ID_INFIX) {
1087 let top_level = PrefixedUlid::from_str(prefix)
1088 .map_err(ExecutionIdDerivedParseError::PrefixedUlidParseError)?;
1089 let Some((infix, idx)) = suffix.rsplit_once(EXECUTION_ID_JOIN_SET_INFIX) else {
1090 return Err(ExecutionIdDerivedParseError::SecondDelimiterNotFound);
1091 };
1092 let infix = Arc::from(infix);
1093 let idx =
1094 u64::from_str(idx).map_err(ExecutionIdDerivedParseError::ParseIndexError)?;
1095 Ok(ExecutionIdDerived {
1096 top_level,
1097 infix,
1098 idx,
1099 })
1100 } else {
1101 Err(ExecutionIdDerivedParseError::FirstDelimiterNotFound)
1102 }
1103 }
1104 }
1105 impl<'a> Arbitrary<'a> for ExecutionIdDerived {
1106 fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
1107 let top_level = ExecutionId::TopLevel(ExecutionIdTopLevel::arbitrary(u)?);
1108 let join_set_id = JoinSetId::arbitrary(u)?;
1109 Ok(top_level.next_level(&join_set_id))
1110 }
1111 }
1112 #[derive(Debug, thiserror::Error)]
1113 pub enum ExecutionIdDerivedParseError {
1114 #[error(transparent)]
1115 PrefixedUlidParseError(PrefixedUlidParseError),
1116 #[error("cannot parse derived execution id - delimiter `{EXECUTION_ID_INFIX}` not found")]
1117 FirstDelimiterNotFound,
1118 #[error(
1119 "cannot parse derived execution id - delimiter `{EXECUTION_ID_JOIN_SET_INFIX}` not found"
1120 )]
1121 SecondDelimiterNotFound,
1122 #[error(
1123 "cannot parse derived execution id - suffix after `{EXECUTION_ID_JOIN_SET_INFIX}` must be a number"
1124 )]
1125 ParseIndexError(ParseIntError),
1126 }
1127
1128 impl ExecutionId {
1129 #[must_use]
1130 pub fn generate() -> Self {
1131 ExecutionId::TopLevel(PrefixedUlid::generate())
1132 }
1133
1134 #[must_use]
1135 pub fn get_top_level(&self) -> ExecutionIdTopLevel {
1136 match &self {
1137 ExecutionId::TopLevel(prefixed_ulid) => *prefixed_ulid,
1138 ExecutionId::Derived(ExecutionIdDerived { top_level, .. }) => *top_level,
1139 }
1140 }
1141
1142 #[must_use]
1143 pub fn is_top_level(&self) -> bool {
1144 matches!(self, ExecutionId::TopLevel(_))
1145 }
1146
1147 #[must_use]
1148 pub fn timestamp_part(&self) -> u64 {
1149 self.get_top_level().timestamp_part()
1150 }
1151
1152 #[must_use]
1153 pub fn random_seed(&self) -> u64 {
1154 let mut hasher = fxhash::FxHasher::default();
1155 hasher.write_u64(self.get_top_level().random_part());
1156 hasher.write_u64(self.timestamp_part());
1157 if let ExecutionId::Derived(ExecutionIdDerived {
1158 top_level: _,
1159 infix,
1160 idx,
1161 }) = self
1162 {
1163 hasher.write(infix.as_bytes());
1164 hasher.write_u64(*idx);
1165 }
1166 hasher.finish()
1167 }
1168
1169 #[must_use]
1170 pub const fn from_parts(timestamp_ms: u64, random_part: u128) -> Self {
1171 ExecutionId::TopLevel(ExecutionIdTopLevel::from_parts(timestamp_ms, random_part))
1172 }
1173
1174 #[must_use]
1175 pub fn next_level(&self, join_set_id: &JoinSetId) -> ExecutionIdDerived {
1176 match &self {
1177 ExecutionId::TopLevel(top_level) => ExecutionIdDerived {
1178 top_level: *top_level,
1179 infix: Arc::from(join_set_id.to_string()),
1180 idx: EXECUTION_ID_START_IDX,
1181 },
1182 ExecutionId::Derived(derived) => derived.next_level(join_set_id),
1183 }
1184 }
1185
1186 fn display_or_debug(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1187 match &self {
1188 ExecutionId::TopLevel(top_level) => Display::fmt(top_level, f),
1189 ExecutionId::Derived(derived) => Display::fmt(derived, f),
1190 }
1191 }
1192 }
1193
1194 const EXECUTION_ID_INFIX: char = '.';
1195 const EXECUTION_ID_JOIN_SET_INFIX: char = '_';
1196 const EXECUTION_ID_START_IDX: u64 = 1;
1197 pub const JOIN_SET_START_IDX: u64 = 1;
1198
    /// Errors returned by `ExecutionId::from_str`.
    #[derive(Debug, thiserror::Error)]
    pub enum ExecutionIdParseError {
        /// The ULID part could not be parsed; the underlying error is displayed as-is.
        #[error(transparent)]
        PrefixedUlidParseError(#[from] PrefixedUlidParseError),
        /// Not produced by `ExecutionId::from_str`, which checks for the first delimiter
        /// before delegating to the derived-id parser.
        #[error(
            "cannot parse derived execution id - first delimiter `{EXECUTION_ID_INFIX}` not found"
        )]
        FirstDelimiterNotFound,
        /// The derived-id parser reported a missing second delimiter.
        // NOTE(review): the message prints `EXECUTION_ID_INFIX` for the second delimiter as
        // well - confirm this matches the delimiter the derived-id parser actually looks for.
        #[error(
            "cannot parse derived execution id - second delimiter `{EXECUTION_ID_INFIX}` not found"
        )]
        SecondDelimiterNotFound,
        /// The trailing index of a derived id is not a valid integer.
        #[error("cannot parse derived execution id - last suffix must be a number")]
        ParseIndexError(#[from] ParseIntError),
    }
1214
1215 impl FromStr for ExecutionId {
1216 type Err = ExecutionIdParseError;
1217
1218 fn from_str(input: &str) -> Result<Self, Self::Err> {
1219 if input.contains(EXECUTION_ID_INFIX) {
1220 ExecutionIdDerived::from_str(input)
1221 .map(ExecutionId::Derived)
1222 .map_err(|err| match err {
1223 ExecutionIdDerivedParseError::FirstDelimiterNotFound => {
1224 unreachable!("first delimiter checked")
1225 }
1226 ExecutionIdDerivedParseError::SecondDelimiterNotFound => {
1227 ExecutionIdParseError::SecondDelimiterNotFound
1228 }
1229 ExecutionIdDerivedParseError::PrefixedUlidParseError(err) => {
1230 ExecutionIdParseError::PrefixedUlidParseError(err)
1231 }
1232 ExecutionIdDerivedParseError::ParseIndexError(err) => {
1233 ExecutionIdParseError::ParseIndexError(err)
1234 }
1235 })
1236 } else {
1237 Ok(ExecutionId::TopLevel(PrefixedUlid::from_str(input)?))
1238 }
1239 }
1240 }
1241
1242 impl Debug for ExecutionId {
1243 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1244 self.display_or_debug(f)
1245 }
1246 }
1247
1248 impl Display for ExecutionId {
1249 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1250 self.display_or_debug(f)
1251 }
1252 }
1253
1254 impl<'a> Arbitrary<'a> for ExecutionId {
1255 fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
1256 Ok(ExecutionId::TopLevel(PrefixedUlid::arbitrary(u)?))
1257 }
1258 }
1259}
1260
/// Identifier of a join set, made of a kind and a name.
///
/// It is displayed, and serialized through `Display`/`FromStr`, as
/// `{kind}{JOIN_SET_ID_INFIX}{name}`. Prefer [`JoinSetId::new`], which validates the name.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Eq,
    Hash,
    derive_more::Display,
    serde_with::SerializeDisplay,
    serde_with::DeserializeFromStr,
)]
#[non_exhaustive]
#[display("{kind}{JOIN_SET_ID_INFIX}{name}")]
pub struct JoinSetId {
    /// Kind of the join set; rendered as its one-letter code.
    pub kind: JoinSetKind,
    /// Name of the join set; checked by [`JoinSetId::new`].
    pub name: StrVariant,
}
1277
/// Closing strategy; serialized by serde in `snake_case`.
// NOTE(review): presumably describes how a join set is closed - confirm against the usages,
// which are not visible in this part of the file.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    derive_more::Display,
    arbitrary::Arbitrary,
    Serialize,
    Deserialize,
)]
#[serde(rename_all = "snake_case")]
pub enum ClosingStrategy {
    /// The only strategy currently defined.
    Complete,
}
1293
1294impl JoinSetId {
1295 pub fn new(kind: JoinSetKind, name: StrVariant) -> Result<Self, InvalidNameError<JoinSetId>> {
1296 Ok(Self {
1297 kind,
1298 name: check_name(name, CHARSET_EXTRA_JSON_SET)?,
1299 })
1300 }
1301}
/// All characters allowed in a join set name: the alphanumeric set followed by the extra
/// characters from `CHARSET_EXTRA_JSON_SET`. Used when generating arbitrary join set names.
const CHARSET_JOIN_SET_NAME: &str =
    const_format::concatcp!(CHARSET_ALPHANUMERIC, CHARSET_EXTRA_JSON_SET);

/// ASCII letters (lower case, then upper case) followed by the digits `0`-`9`.
pub const CHARSET_ALPHANUMERIC: &str =
    "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
1307
/// Kind of a join set.
///
/// `Display` prints the one-letter code returned by `as_code`; `FromStr` accepts the same
/// codes.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    Hash,
    derive_more::Display,
    Serialize,
    Deserialize,
    strum::EnumIter,
    Arbitrary,
)]
#[display("{}", self.as_code())]
pub enum JoinSetKind {
    /// Code `o`.
    OneOff,
    /// Code `n`.
    Named,
    /// Code `g`.
    Generated,
}
1327impl JoinSetKind {
1328 fn as_code(&self) -> &'static str {
1329 match self {
1330 JoinSetKind::OneOff => "o",
1331 JoinSetKind::Named => "n",
1332 JoinSetKind::Generated => "g",
1333 }
1334 }
1335}
1336impl FromStr for JoinSetKind {
1337 type Err = &'static str;
1338 fn from_str(s: &str) -> Result<Self, Self::Err> {
1339 use strum::IntoEnumIterator;
1340 Self::iter()
1341 .find(|variant| s == variant.as_code())
1342 .ok_or("unknown join set kind")
1343 }
1344}
1345
/// Separator between the kind code and the name in the textual form of a `JoinSetId`.
pub const JOIN_SET_ID_INFIX: char = ':';
/// Non-alphanumeric characters that are additionally allowed in a join set name.
const CHARSET_EXTRA_JSON_SET: &str = "_-/";
1348
1349impl FromStr for JoinSetId {
1350 type Err = JoinSetIdParseError;
1351
1352 fn from_str(input: &str) -> Result<Self, Self::Err> {
1353 let Some((kind, name)) = input.split_once(JOIN_SET_ID_INFIX) else {
1354 return Err(JoinSetIdParseError::WrongParts);
1355 };
1356 let kind = kind
1357 .parse()
1358 .map_err(JoinSetIdParseError::JoinSetKindParseError)?;
1359 Ok(JoinSetId::new(kind, StrVariant::from(name.to_string()))?)
1360 }
1361}
1362
1363#[derive(Debug, thiserror::Error)]
1364pub enum JoinSetIdParseError {
1365 #[error("join set must consist of three parts separated by {JOIN_SET_ID_INFIX} ")]
1366 WrongParts,
1367 #[error("cannot parse join set id's execution id - {0}")]
1368 ExecutionIdParseError(#[from] ExecutionIdParseError),
1369 #[error("cannot parse join set kind - {0}")]
1370 JoinSetKindParseError(&'static str),
1371 #[error("cannot parse join set id - {0}")]
1372 InvalidName(#[from] InvalidNameError<JoinSetId>),
1373}
1374
1375impl<'a> Arbitrary<'a> for JoinSetId {
1376 fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
1377 let name: String = {
1378 let length_inclusive = u.int_in_range(0..=10).unwrap();
1379 (0..=length_inclusive)
1380 .map(|_| {
1381 let idx = u.choose_index(CHARSET_JOIN_SET_NAME.len()).unwrap();
1382 CHARSET_JOIN_SET_NAME
1383 .chars()
1384 .nth(idx)
1385 .expect("idx is < charset.len()")
1386 })
1387 .collect()
1388 };
1389
1390 Ok(JoinSetId::new(JoinSetKind::Generated, StrVariant::from(name)).unwrap())
1391 }
1392}
1393
/// Type of a component.
///
/// Displayed, parsed and (de)serialized using the `snake_case` form of the variant name.
#[derive(
    Debug,
    Clone,
    Copy,
    strum::Display,
    PartialEq,
    Eq,
    strum::EnumString,
    Hash,
    serde_with::SerializeDisplay,
    serde_with::DeserializeFromStr,
)]
#[strum(serialize_all = "snake_case")]
pub enum ComponentType {
    /// Rendered as `activity_wasm`.
    ActivityWasm,
    /// Rendered as `workflow`.
    Workflow,
    /// Rendered as `webhook_endpoint`.
    WebhookEndpoint,
}
1412
/// Identifier of a component: its type and its name.
///
/// `Display` and `Debug` both print `{component_type}:{name}`; serde goes through the same
/// string form. Prefer [`ComponentId::new`], which validates the name.
#[derive(
    derive_more::Debug,
    Clone,
    PartialEq,
    Eq,
    Hash,
    serde_with::SerializeDisplay,
    serde_with::DeserializeFromStr,
    derive_more::Display,
)]
#[display("{component_type}:{name}")]
#[debug("{}", self)]
#[non_exhaustive]
pub struct ComponentId {
    /// Type of the component; printed before the `:` delimiter.
    pub component_type: ComponentType,
    /// Name of the component; printed after the `:` delimiter.
    pub name: StrVariant,
}
1430impl ComponentId {
1431 pub fn new(
1432 component_type: ComponentType,
1433 name: StrVariant,
1434 ) -> Result<Self, InvalidNameError<Self>> {
1435 Ok(Self {
1436 component_type,
1437 name: check_name(name, "_")?,
1438 })
1439 }
1440
1441 #[must_use]
1442 pub const fn dummy_activity() -> Self {
1443 Self {
1444 component_type: ComponentType::ActivityWasm,
1445 name: StrVariant::empty(),
1446 }
1447 }
1448
1449 #[must_use]
1450 pub const fn dummy_workflow() -> ComponentId {
1451 ComponentId {
1452 component_type: ComponentType::Workflow,
1453 name: StrVariant::empty(),
1454 }
1455 }
1456}
1457
1458pub fn check_name<T>(
1459 name: StrVariant,
1460 special: &'static str,
1461) -> Result<StrVariant, InvalidNameError<T>> {
1462 if let Some(invalid) = name
1463 .as_ref()
1464 .chars()
1465 .find(|c| !c.is_ascii_alphanumeric() && !special.contains(*c))
1466 {
1467 Err(InvalidNameError::<T> {
1468 invalid,
1469 name: name.as_ref().to_string(),
1470 special,
1471 phantom_data: PhantomData,
1472 })
1473 } else {
1474 Ok(name)
1475 }
1476}
/// Error produced by [`check_name`] when a name contains a character that is not allowed.
///
/// The message starts with the last path segment of `T`'s type name, so `T` identifies the
/// kind of entity whose name was rejected.
#[derive(Debug, thiserror::Error)]
#[error(
    "name of {} `{name}` contains invalid character `{invalid}`, must only contain alphanumeric characters and following characters {special}",
    std::any::type_name::<T>().rsplit("::").next().unwrap()
)]
pub struct InvalidNameError<T> {
    /// First character of `name` that failed the check.
    invalid: char,
    /// The rejected name.
    name: String,
    /// Extra characters that were allowed besides ASCII alphanumerics.
    special: &'static str,
    /// Ties the error to the named entity type without storing a value of it.
    phantom_data: PhantomData<T>,
}
1488
1489#[derive(Debug, thiserror::Error)]
1490pub enum ConfigIdParseError {
1491 #[error("cannot parse ComponentConfigHash - delimiter ':' not found")]
1492 DelimiterNotFound,
1493 #[error("cannot parse prefix of ComponentConfigHash - {0}")]
1494 ComponentTypeParseError(#[from] strum::ParseError),
1495}
1496
1497impl FromStr for ComponentId {
1498 type Err = ConfigIdParseError;
1499
1500 fn from_str(input: &str) -> Result<Self, Self::Err> {
1501 let (component_type, name) = input.split_once(':').ok_or(Self::Err::DelimiterNotFound)?;
1502 let component_type = component_type.parse()?;
1503 Ok(Self {
1504 component_type,
1505 name: StrVariant::from(name.to_string()),
1506 })
1507 }
1508}
1509
/// Hash algorithm of a digest.
///
/// Displayed, parsed and (de)serialized using the `snake_case` form of the variant name.
#[derive(
    Debug,
    Clone,
    Copy,
    strum::Display,
    strum::EnumString,
    PartialEq,
    Eq,
    Hash,
    serde_with::SerializeDisplay,
    serde_with::DeserializeFromStr,
)]
#[strum(serialize_all = "snake_case")]
pub enum HashType {
    /// Rendered as `sha256`.
    Sha256,
}
1526
/// Newtype around [`Digest`].
///
/// Display, parsing, dereferencing and serde are all forwarded to the wrapped digest.
#[derive(
    Debug,
    Clone,
    derive_more::Display,
    derive_more::FromStr,
    derive_more::Deref,
    PartialEq,
    Eq,
    Hash,
    serde_with::SerializeDisplay,
    serde_with::DeserializeFromStr,
)]
pub struct ContentDigest(pub Digest);
/// Placeholder digest: `Sha256` hash type with an empty hex string.
///
/// Its string form has an empty suffix, which `Digest::from_str` rejects because it
/// requires exactly 64 hex digits.
pub const CONTENT_DIGEST_DUMMY: ContentDigest = ContentDigest(Digest {
    hash_type: HashType::Sha256,
    hash_base16: StrVariant::empty(),
});
1544
1545impl ContentDigest {
1546 #[must_use]
1547 pub fn new(hash_type: HashType, hash_base16: String) -> Self {
1548 Self(Digest::new(hash_type, hash_base16))
1549 }
1550}
1551
/// A hash digest: the algorithm plus the hex-encoded hash value.
///
/// Displayed and (de)serialized as `{hash_type}:{hash_base16}`.
#[derive(
    Debug,
    Clone,
    derive_more::Display,
    PartialEq,
    Eq,
    Hash,
    serde_with::SerializeDisplay,
    serde_with::DeserializeFromStr,
)]
#[display("{hash_type}:{hash_base16}")]
pub struct Digest {
    /// Algorithm that produced the hash.
    hash_type: HashType,
    /// Hash value in base16 (hex) text form.
    hash_base16: StrVariant,
}
1567impl Digest {
1568 #[must_use]
1569 pub fn new(hash_type: HashType, hash_base16: String) -> Self {
1570 Self {
1571 hash_type,
1572 hash_base16: StrVariant::Arc(Arc::from(hash_base16)),
1573 }
1574 }
1575
1576 #[must_use]
1577 pub fn hash_type(&self) -> HashType {
1578 self.hash_type
1579 }
1580
1581 #[must_use]
1582 pub fn digest_base16(&self) -> &str {
1583 &self.hash_base16
1584 }
1585}
1586
/// Errors returned by `Digest::from_str`.
#[derive(Debug, thiserror::Error)]
pub enum DigestParseErrror {
    /// The input does not contain the `:` delimiter.
    #[error("cannot parse ContentDigest - delimiter ':' not found")]
    DelimiterNotFound,
    /// The part before the delimiter is not a known hash type.
    #[error("cannot parse ContentDigest - invalid prefix `{hash_type}`")]
    TypeParseError { hash_type: String },
    /// The part after the delimiter is not 64 bytes long; carries the actual length.
    #[error("cannot parse ContentDigest - invalid suffix length, expected 64 hex digits, got {0}")]
    SuffixLength(usize),
    /// The part after the delimiter contains a non-hex character; carries the first one.
    #[error("cannot parse ContentDigest - suffix must be hex-encoded, got invalid character `{0}`")]
    SuffixInvalid(char),
}
1598
1599impl FromStr for Digest {
1600 type Err = DigestParseErrror;
1601
1602 fn from_str(input: &str) -> Result<Self, Self::Err> {
1603 let (hash_type, hash_base16) = input.split_once(':').ok_or(Self::Err::DelimiterNotFound)?;
1604 let hash_type =
1605 HashType::from_str(hash_type).map_err(|_err| Self::Err::TypeParseError {
1606 hash_type: hash_type.to_string(),
1607 })?;
1608 if hash_base16.len() != 64 {
1609 return Err(Self::Err::SuffixLength(hash_base16.len()));
1610 }
1611 if let Some(invalid) = hash_base16.chars().find(|c| !c.is_ascii_hexdigit()) {
1612 return Err(Self::Err::SuffixInvalid(invalid));
1613 }
1614 Ok(Self {
1615 hash_type,
1616 hash_base16: StrVariant::Arc(Arc::from(hash_base16)),
1617 })
1618 }
1619}
1620
/// Return type of a function, kept both as a `TypeWrapper` and as text.
///
/// `Display` prints only `wit_type`.
#[derive(
    Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, derive_more::Display,
)]
#[display("{wit_type}")]
pub struct ReturnType {
    /// Structured representation of the type.
    pub type_wrapper: TypeWrapper,
    /// Textual representation of the type; this is what `Display` prints.
    pub wit_type: StrVariant,
}
1629
/// A single function parameter: its name and its type.
///
/// `Display` prints `{name}: {wit_type}`. The generated `PartialEq` skips `name`, so two
/// parameters that differ only in name compare equal.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Eq, derive_more::Display)]
#[derive_where::derive_where(PartialEq)]
#[display("{name}: {wit_type}")]
pub struct ParameterType {
    /// Structured representation of the parameter type.
    pub type_wrapper: TypeWrapper,
    /// Parameter name; excluded from equality comparison.
    #[derive_where(skip)]
    pub name: StrVariant,
    /// Textual representation of the parameter type.
    pub wit_type: StrVariant,
}
1640
/// Ordered list of function parameters; dereferences to the inner `Vec`.
///
/// `Debug` and `Display` are implemented by hand and print a parenthesized,
/// comma-separated list.
#[derive(
    Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, Default, derive_more::Deref,
)]
pub struct ParameterTypes(pub Vec<ParameterType>);
1645
1646impl Debug for ParameterTypes {
1647 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1648 write!(f, "(")?;
1649 let mut iter = self.0.iter().peekable();
1650 while let Some(p) = iter.next() {
1651 write!(f, "{p:?}")?;
1652 if iter.peek().is_some() {
1653 write!(f, ", ")?;
1654 }
1655 }
1656 write!(f, ")")
1657 }
1658}
1659
1660impl Display for ParameterTypes {
1661 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1662 write!(f, "(")?;
1663 let mut iter = self.0.iter().peekable();
1664 while let Some(p) = iter.next() {
1665 write!(f, "{p}")?;
1666 if iter.peek().is_some() {
1667 write!(f, ", ")?;
1668 }
1669 }
1670 write!(f, ")")
1671 }
1672}
1673
/// Functions of a single interface, keyed by function name.
#[derive(Debug, Clone)]
pub struct PackageIfcFns {
    /// Fully qualified name of the interface.
    pub ifc_fqn: IfcFqnName,
    // NOTE(review): presumably marks interfaces from an extension package (see
    // `SUFFIX_PKG_EXT`) - confirm against the code that fills this field.
    pub extension: bool,
    /// Functions of the interface, in insertion order.
    pub fns: IndexMap<FnName, FunctionMetadata>,
}
1680
/// Retry settings of a component.
#[derive(Debug, Clone, Copy)]
pub struct ComponentRetryConfig {
    /// Maximum number of retries.
    pub max_retries: u32,
    // NOTE(review): presumably the base delay of an exponential backoff between retries -
    // confirm against the code that consumes this value.
    pub retry_exp_backoff: Duration,
}
1686
/// Lookup of exported functions and the components that provide them.
#[async_trait]
pub trait FunctionRegistry: Send + Sync {
    /// Looks up an exported function by its fully qualified name.
    ///
    /// Returns the function metadata together with the id and the retry configuration of
    /// the component, or `None` when nothing is registered under `ffqn`.
    async fn get_by_exported_function(
        &self,
        ffqn: &FunctionFqn,
    ) -> Option<(FunctionMetadata, ComponentId, ComponentRetryConfig)>;

    /// Returns all exported interfaces with their functions.
    fn all_exports(&self) -> &[PackageIfcFns];
}
1697
/// Optional string key-value map attached to an execution, used here to carry tracing
/// context between spans.
///
/// `Display` prints the `Debug` form of the inner option.
#[derive(Debug, Default, Clone, Serialize, Deserialize, derive_more::Display, PartialEq, Eq)]
#[display("{_0:?}")]
pub struct ExecutionMetadata(Option<hashbrown::HashMap<String, String>>);
1701
1702impl ExecutionMetadata {
1703 const LINKED_KEY: &str = "obelisk-tracing-linked";
1704 #[must_use]
1705 pub const fn empty() -> Self {
1706 Self(None)
1708 }
1709
1710 #[must_use]
1711 pub fn from_parent_span(less_specific: &Span) -> Self {
1712 ExecutionMetadata::create(less_specific, false)
1713 }
1714
1715 #[must_use]
1716 pub fn from_linked_span(less_specific: &Span) -> Self {
1717 ExecutionMetadata::create(less_specific, true)
1718 }
1719
1720 #[must_use]
1725 #[expect(clippy::items_after_statements)]
1726 fn create(span: &Span, link_marker: bool) -> Self {
1727 use tracing_opentelemetry::OpenTelemetrySpanExt as _;
1728 let mut metadata = Self(Some(hashbrown::HashMap::default()));
1729 let mut metadata_view = ExecutionMetadataInjectorView {
1730 metadata: &mut metadata,
1731 };
1732 fn inject(s: &Span, metadata_view: &mut ExecutionMetadataInjectorView) {
1734 opentelemetry::global::get_text_map_propagator(|propagator| {
1735 propagator.inject_context(&s.context(), metadata_view);
1736 });
1737 }
1738 inject(&Span::current(), &mut metadata_view);
1739 if metadata_view.is_empty() {
1740 inject(span, &mut metadata_view);
1742 }
1743 if link_marker {
1744 metadata_view.set(Self::LINKED_KEY, String::new());
1745 }
1746 metadata
1747 }
1748
1749 pub fn enrich(&self, span: &Span) {
1750 use opentelemetry::trace::TraceContextExt as _;
1751 use tracing_opentelemetry::OpenTelemetrySpanExt as _;
1752
1753 let metadata_view = ExecutionMetadataExtractorView { metadata: self };
1754 let otel_context = opentelemetry::global::get_text_map_propagator(|propagator| {
1755 propagator.extract(&metadata_view)
1756 });
1757 if metadata_view.get(Self::LINKED_KEY).is_some() {
1758 let linked_span_context = otel_context.span().span_context().clone();
1759 span.add_link(linked_span_context);
1760 } else {
1761 span.set_parent(otel_context);
1762 }
1763 }
1764}
1765
/// Mutable view over `ExecutionMetadata` that implements the OpenTelemetry `Injector`.
struct ExecutionMetadataInjectorView<'a> {
    /// Metadata that receives the injected key-value pairs.
    metadata: &'a mut ExecutionMetadata,
}
1769
1770impl ExecutionMetadataInjectorView<'_> {
1771 fn is_empty(&self) -> bool {
1772 self.metadata
1773 .0
1774 .as_ref()
1775 .is_some_and(hashbrown::HashMap::is_empty)
1776 }
1777}
1778
1779impl opentelemetry::propagation::Injector for ExecutionMetadataInjectorView<'_> {
1780 fn set(&mut self, key: &str, value: String) {
1781 let key = format!("tracing:{key}");
1782 let map = if let Some(map) = self.metadata.0.as_mut() {
1783 map
1784 } else {
1785 self.metadata.0 = Some(hashbrown::HashMap::new());
1786 assert_matches!(&mut self.metadata.0, Some(map) => map)
1787 };
1788 map.insert(key, value);
1789 }
1790}
1791
/// Read-only view over `ExecutionMetadata` that implements the OpenTelemetry `Extractor`.
struct ExecutionMetadataExtractorView<'a> {
    /// Metadata the keys and values are read from.
    metadata: &'a ExecutionMetadata,
}
1795
1796impl opentelemetry::propagation::Extractor for ExecutionMetadataExtractorView<'_> {
1797 fn get(&self, key: &str) -> Option<&str> {
1798 self.metadata
1799 .0
1800 .as_ref()
1801 .and_then(|map| map.get(&format!("tracing:{key}")))
1802 .map(std::string::String::as_str)
1803 }
1804
1805 fn keys(&self) -> Vec<&str> {
1806 match &self.metadata.0.as_ref() {
1807 Some(map) => map
1808 .keys()
1809 .filter_map(|key| key.strip_prefix("tracing:"))
1810 .collect(),
1811 None => vec![],
1812 }
1813 }
1814}
1815
// Unit tests for id parsing/formatting, seed stability, `StrVariant` hashing and
// `FunctionFqn` construction.
#[cfg(test)]
mod tests {

    use crate::{
        ExecutionId, FunctionFqn, JoinSetId, JoinSetKind, StrVariant, prefixed_ulid::ExecutorId,
    };
    use std::{
        hash::{DefaultHasher, Hash, Hasher},
        str::FromStr,
        sync::Arc,
    };

    // Two madsim runtimes sharing the same seed must generate the same ULID.
    #[cfg(madsim)]
    #[test]
    fn ulid_generation_should_be_deterministic() {
        let mut builder_a = madsim::runtime::Builder::from_env();
        builder_a.check = true;

        let mut builder_b = madsim::runtime::Builder::from_env();
        builder_b.check = true;
        builder_b.seed = builder_a.seed;

        assert_eq!(
            builder_a.run(|| async { ulid::Ulid::new() }),
            builder_b.run(|| async { ulid::Ulid::new() })
        );
    }

    // String round trip of a generated executor id.
    #[test]
    fn ulid_parsing() {
        let generated = ExecutorId::generate();
        let str = generated.to_string();
        let parsed = str.parse().unwrap();
        assert_eq!(generated, parsed);
    }

    // String round trip of a generated top-level execution id.
    #[test]
    fn execution_id_parsing_top_level() {
        let generated = ExecutionId::generate();
        let str = generated.to_string();
        let parsed = str.parse().unwrap();
        assert_eq!(generated, parsed);
    }

    // The first child of a top-level id is rendered as `<top_level>.<join set>_1` and parses
    // back to the same value.
    #[test]
    fn execution_id_with_one_level_should_parse() {
        let top_level = ExecutionId::generate();
        let join_set_id = JoinSetId::new(JoinSetKind::Named, StrVariant::Static("name")).unwrap();
        let first_child = ExecutionId::Derived(top_level.next_level(&join_set_id));
        let ser = first_child.to_string();
        assert_eq!(format!("{top_level}.n:name_1"), ser);
        let parsed = ExecutionId::from_str(&ser).unwrap();
        assert_eq!(first_child, parsed);
    }

    // Incrementing the first child yields index 2 within the same join set.
    #[test]
    fn execution_id_increment_twice() {
        let top_level = ExecutionId::generate();
        let join_set_id = JoinSetId::new(JoinSetKind::Named, StrVariant::Static("name")).unwrap();
        let first_child = top_level.next_level(&join_set_id);
        let second_child = ExecutionId::Derived(first_child.get_incremented());
        let ser = second_child.to_string();
        assert_eq!(format!("{top_level}.n:name_2"), ser);
        let parsed = ExecutionId::from_str(&ser).unwrap();
        assert_eq!(second_child, parsed);
    }

    // Two nested levels, each incremented once, are rendered as two dot-separated segments.
    #[test]
    fn execution_id_next_level_twice() {
        let top_level = ExecutionId::generate();
        let join_set_id_outer =
            JoinSetId::new(JoinSetKind::Generated, StrVariant::Static("gg")).unwrap();
        let join_set_id_inner =
            JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("oo")).unwrap();
        let execution_id = ExecutionId::Derived(
            top_level
                .next_level(&join_set_id_outer)
                .get_incremented()
                .next_level(&join_set_id_inner)
                .get_incremented(),
        );
        let ser = execution_id.to_string();
        assert_eq!(format!("{top_level}.g:gg_2.o:oo_2"), ser);
        let parsed = ExecutionId::from_str(&ser).unwrap();
        assert_eq!(execution_id, parsed);
    }

    // `random_seed` values for fixed ids are pinned by a snapshot and must differ between a
    // parent, its two children and a grandchild.
    #[test]
    fn execution_id_hash_should_be_stable() {
        let parent = ExecutionId::from_parts(1, 2);
        let join_set_id = JoinSetId::new(JoinSetKind::Named, StrVariant::Static("name")).unwrap();
        let sibling_1 = parent.next_level(&join_set_id);
        let sibling_2 = ExecutionId::Derived(sibling_1.get_incremented());
        let sibling_1 = ExecutionId::Derived(sibling_1);
        let join_set_id_inner =
            JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("oo")).unwrap();
        let child =
            ExecutionId::Derived(sibling_1.next_level(&join_set_id_inner).get_incremented());
        let parent = parent.random_seed();
        let sibling_1 = sibling_1.random_seed();
        let sibling_2 = sibling_2.random_seed();
        let child = child.random_seed();
        let vec = vec![parent, sibling_1, sibling_2, child];
        insta::assert_debug_snapshot!(vec);
        // Collecting into a set removes duplicates, so four entries mean four distinct seeds.
        let set: hashbrown::HashSet<_> = vec.into_iter().collect();
        assert_eq!(4, set.len());
    }

    // Equal `StrVariant` values must hash equally regardless of the variant holding them.
    #[test]
    fn hash_of_str_variants_should_be_equal() {
        let input = "foo";
        let left = StrVariant::Arc(Arc::from(input));
        let right = StrVariant::Static(input);
        assert_eq!(left, right);
        let mut left_hasher = DefaultHasher::new();
        left.hash(&mut left_hasher);
        let mut right_hasher = DefaultHasher::new();
        right.hash(&mut right_hasher);
        let left_hasher = left_hasher.finish();
        let right_hasher = right_hasher.finish();
        println!("left: {left_hasher:x}, right: {right_hasher:x}");
        assert_eq!(left_hasher, right_hasher);
    }

    // A versioned interface name is accepted when passed as a separate tuple element.
    #[test]
    fn ffqn_from_tuple_with_version_should_work() {
        let ffqn = FunctionFqn::try_from_tuple("wasi:cli/run@0.2.0", "run").unwrap();
        assert_eq!(FunctionFqn::new_static("wasi:cli/run@0.2.0", "run"), ffqn);
    }

    // The dots inside the version must not be mistaken for the function name delimiter.
    #[test]
    fn ffqn_from_str_with_version_should_work() {
        let ffqn = FunctionFqn::from_str("wasi:cli/run@0.2.0.run").unwrap();
        assert_eq!(FunctionFqn::new_static("wasi:cli/run@0.2.0", "run"), ffqn);
    }

    // JSON round trip of a join set id for every `JoinSetKind`.
    #[cfg(madsim)]
    #[tokio::test]
    async fn join_set_serde_should_be_consistent() {
        use crate::{JoinSetId, JoinSetKind};
        use strum::IntoEnumIterator;
        for kind in JoinSetKind::iter() {
            let join_set_id = JoinSetId::new(kind, StrVariant::from("name")).unwrap();
            let ser = serde_json::to_string(&join_set_id).unwrap();
            let deser = serde_json::from_str(&ser).unwrap();
            assert_eq!(join_set_id, deser);
        }
    }
}