1#![cfg_attr(not(feature = "std"), no_std)]
2#![forbid(unsafe_code)]
3#![deny(missing_docs)]
4#![warn(clippy::unwrap_used, clippy::expect_used)]
5
6extern crate alloc;
51
/// Version of this crate, taken from `CARGO_PKG_VERSION` at compile time.
pub const VERSION: &str = env!("CARGO_PKG_VERSION");
/// URL prefix shared by the schema identifiers declared in [`ids`].
pub const SCHEMA_BASE_URL: &str = "https://greentic-ai.github.io/greentic-types/schemas/v1";
56
57pub mod capabilities;
58pub mod pack_spec;
59
60pub mod context;
61pub mod error;
62pub mod outcome;
63pub mod pack;
64pub mod policy;
65pub mod run;
66#[cfg(all(feature = "schemars", feature = "std"))]
67pub mod schema;
68pub mod session;
69pub mod state;
70pub mod telemetry;
71pub mod tenant;
72
73pub use capabilities::{
74 Capabilities, FsCaps, HttpCaps, KvCaps, Limits, NetCaps, SecretsCaps, TelemetrySpec, ToolsCaps,
75};
76pub use context::{Cloud, DeploymentCtx, Platform};
77pub use error::{ErrorCode, GResult, GreenticError};
78pub use outcome::Outcome;
79pub use pack::{PackRef, Signature, SignatureAlgorithm};
80pub use pack_spec::{PackSpec, ToolSpec};
81pub use policy::{AllowList, NetworkPolicy, PolicyDecision, Protocol};
82#[cfg(feature = "time")]
83pub use run::RunResult;
84pub use run::{NodeFailure, NodeStatus, NodeSummary, RunStatus, TranscriptOffset};
85pub use session::{SessionCursor, SessionData, SessionKey};
86pub use state::{StateKey, StatePath};
87#[cfg(feature = "otel-keys")]
88pub use telemetry::OtlpKeys;
89pub use telemetry::SpanContext;
90#[cfg(feature = "telemetry-autoinit")]
91pub use telemetry::TelemetryCtx;
92pub use tenant::{Impersonation, TenantIdentity};
93
94#[cfg(feature = "schemars")]
95use alloc::borrow::Cow;
96use alloc::{borrow::ToOwned, format, string::String, vec::Vec};
97use core::fmt;
98use core::str::FromStr;
99#[cfg(feature = "schemars")]
100use schemars::JsonSchema;
101use semver::VersionReq;
102#[cfg(feature = "time")]
103use time::OffsetDateTime;
104
105#[cfg(feature = "serde")]
106use serde::{Deserialize, Serialize};
107
108#[cfg(feature = "std")]
109use alloc::boxed::Box;
110
111#[cfg(feature = "std")]
112use std::error::Error as StdError;
113
114fn validate_identifier(value: &str, label: &str) -> GResult<()> {
115 if value.is_empty() {
116 return Err(GreenticError::new(
117 ErrorCode::InvalidInput,
118 format!("{label} must not be empty"),
119 ));
120 }
121 if value
122 .chars()
123 .any(|c| !(c.is_ascii_alphanumeric() || matches!(c, '.' | '_' | '-')))
124 {
125 return Err(GreenticError::new(
126 ErrorCode::InvalidInput,
127 format!("{label} must contain only ASCII letters, digits, '.', '-', or '_'"),
128 ));
129 }
130 Ok(())
131}
132
/// Full URLs of the v1 JSON schema documents, one constant per schema.
pub mod ids {
    // Expands a schema file stem into its full URL under the v1 schema base,
    // so the shared prefix and suffix are spelled exactly once.
    macro_rules! schema_url {
        ($stem:literal) => {
            concat!(
                "https://greentic-ai.github.io/greentic-types/schemas/v1/",
                $stem,
                ".schema.json"
            )
        };
    }

    /// URL of the `pack-id` schema.
    pub const PACK_ID: &str = schema_url!("pack-id");
    /// URL of the `component-id` schema.
    pub const COMPONENT_ID: &str = schema_url!("component-id");
    /// URL of the `flow-id` schema.
    pub const FLOW_ID: &str = schema_url!("flow-id");
    /// URL of the `node-id` schema.
    pub const NODE_ID: &str = schema_url!("node-id");
    /// URL of the `tenant-context` schema.
    pub const TENANT_CONTEXT: &str = schema_url!("tenant-context");
    /// URL of the `hash-digest` schema.
    pub const HASH_DIGEST: &str = schema_url!("hash-digest");
    /// URL of the `semver-req` schema.
    pub const SEMVER_REQ: &str = schema_url!("semver-req");
    /// URL of the `redaction-path` schema.
    pub const REDACTION_PATH: &str = schema_url!("redaction-path");
    /// URL of the `capabilities` schema.
    pub const CAPABILITIES: &str = schema_url!("capabilities");
    /// URL of the `limits` schema.
    pub const LIMITS: &str = schema_url!("limits");
    /// URL of the `telemetry-spec` schema.
    pub const TELEMETRY_SPEC: &str = schema_url!("telemetry-spec");
    /// URL of the `node-summary` schema.
    pub const NODE_SUMMARY: &str = schema_url!("node-summary");
    /// URL of the `node-failure` schema.
    pub const NODE_FAILURE: &str = schema_url!("node-failure");
    /// URL of the `node-status` schema.
    pub const NODE_STATUS: &str = schema_url!("node-status");
    /// URL of the `run-status` schema.
    pub const RUN_STATUS: &str = schema_url!("run-status");
    /// URL of the `transcript-offset` schema.
    pub const TRANSCRIPT_OFFSET: &str = schema_url!("transcript-offset");
    /// URL of the `tools-caps` schema.
    pub const TOOLS_CAPS: &str = schema_url!("tools-caps");
    /// URL of the `secrets-caps` schema.
    pub const SECRETS_CAPS: &str = schema_url!("secrets-caps");
    /// URL of the `otlp-keys` schema.
    pub const OTLP_KEYS: &str = schema_url!("otlp-keys");
    /// URL of the `run-result` schema.
    pub const RUN_RESULT: &str = schema_url!("run-result");
}
196
/// Generates every registered JSON schema and writes each one beneath `out_dir`.
///
/// `out_dir` is created if it does not exist. For each entry yielded by
/// `crate::schema::entries()` the schema is produced by the entry's generator,
/// pretty-printed as JSON and written to `out_dir` joined with the entry's
/// file name, after making sure the file's parent directory exists.
///
/// # Errors
///
/// Returns an error, annotated with the offending path where applicable, when
/// a directory cannot be created, a schema cannot be serialized, or a file
/// cannot be written. Processing stops at the first failure.
#[cfg(all(feature = "schema", feature = "std"))]
pub fn write_all_schemas(out_dir: &std::path::Path) -> anyhow::Result<()> {
    use anyhow::Context;
    use std::fs;

    // Shared "mkdir -p" step with a path-specific error message.
    let ensure_dir = |dir: &std::path::Path| {
        fs::create_dir_all(dir).with_context(|| format!("failed to create {}", dir.display()))
    };

    ensure_dir(out_dir)?;

    for entry in crate::schema::entries() {
        let schema = (entry.generator)();
        let target = out_dir.join(entry.file_name);

        // The file name may contain sub-directories, so the parent is ensured per file.
        if let Some(parent) = target.parent() {
            ensure_dir(parent)?;
        }

        let bytes =
            serde_json::to_vec_pretty(&schema).context("failed to serialize schema to JSON")?;
        fs::write(&target, bytes)
            .with_context(|| format!("failed to write {}", target.display()))?;
    }

    Ok(())
}
221
// Declares a validated, string-backed identifier newtype.
//
// `id_newtype!(Name, "doc")` expands to a tuple struct `Name(pub String)`
// documented with `doc`, plus:
// - `Name::new` / `FromStr` / `TryFrom<&str>` / `TryFrom<String>`, all of which
//   run `validate_identifier` and fail with `GreenticError` on bad input;
// - `as_str`, `AsRef<str>`, `Display` and `From<Name> for String` to read the
//   value back out.
//
// With the `serde` feature the type (de)serializes as a plain string and
// deserialization goes through `TryFrom<String>`, so it is validated too.
macro_rules! id_newtype {
    ($name:ident, $doc:literal) => {
        #[doc = $doc]
        #[derive(Clone, Debug, Eq, PartialEq, Hash)]
        #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
        #[cfg_attr(feature = "schemars", derive(JsonSchema))]
        #[cfg_attr(feature = "serde", serde(try_from = "String", into = "String"))]
        pub struct $name(pub String);

        impl $name {
            /// Returns the identifier as a string slice.
            pub fn as_str(&self) -> &str {
                &self.0
            }

            /// Validates `value` and wraps it in the identifier type.
            ///
            /// # Errors
            ///
            /// Fails when `value` is empty or contains characters other than
            /// ASCII letters, digits, `.`, `-` or `_`.
            pub fn new(value: impl AsRef<str>) -> GResult<Self> {
                value.as_ref().parse()
            }
        }

        impl From<$name> for String {
            fn from(value: $name) -> Self {
                value.0
            }
        }

        impl AsRef<str> for $name {
            fn as_ref(&self) -> &str {
                self.as_str()
            }
        }

        impl fmt::Display for $name {
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                f.write_str(self.as_str())
            }
        }

        impl FromStr for $name {
            type Err = GreenticError;

            fn from_str(value: &str) -> Result<Self, Self::Err> {
                validate_identifier(value, stringify!($name))?;
                Ok(Self(value.to_owned()))
            }
        }

        impl TryFrom<String> for $name {
            type Error = GreenticError;

            fn try_from(value: String) -> Result<Self, Self::Error> {
                // Validate in place and keep the caller's allocation instead of
                // copying the string a second time through `from_str`.
                validate_identifier(&value, stringify!($name))?;
                Ok(Self(value))
            }
        }

        impl TryFrom<&str> for $name {
            type Error = GreenticError;

            fn try_from(value: &str) -> Result<Self, Self::Error> {
                $name::from_str(value)
            }
        }
    };
}
287
// Identifier newtypes generated by `id_newtype!`; each validates its contents
// with `validate_identifier` on construction and deserialization.
id_newtype!(EnvId, "Environment identifier for a tenant context.");
id_newtype!(TenantId, "Tenant identifier within an environment.");
id_newtype!(TeamId, "Team identifier belonging to a tenant.");
id_newtype!(UserId, "User identifier within a tenant.");
id_newtype!(PackId, "Globally unique pack identifier.");
id_newtype!(
    ComponentId,
    "Identifier referencing a component binding in a pack."
);
id_newtype!(FlowId, "Identifier referencing a flow inside a pack.");
id_newtype!(NodeId, "Identifier referencing a node inside a flow graph.");
299
300#[derive(Clone, Debug, PartialEq, Eq, Hash)]
302#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
303#[cfg_attr(feature = "schemars", derive(JsonSchema))]
304pub struct TenantContext {
305 pub tenant_id: TenantId,
307 #[cfg_attr(
309 feature = "serde",
310 serde(default, skip_serializing_if = "Option::is_none")
311 )]
312 pub team_id: Option<TeamId>,
313 #[cfg_attr(
315 feature = "serde",
316 serde(default, skip_serializing_if = "Option::is_none")
317 )]
318 pub user_id: Option<UserId>,
319 #[cfg_attr(
321 feature = "serde",
322 serde(default, skip_serializing_if = "Option::is_none")
323 )]
324 pub session_id: Option<String>,
325}
326
327impl TenantContext {
328 pub fn new(tenant_id: TenantId) -> Self {
330 Self {
331 tenant_id,
332 team_id: None,
333 user_id: None,
334 session_id: None,
335 }
336 }
337}
338
339impl From<&TenantCtx> for TenantContext {
340 fn from(ctx: &TenantCtx) -> Self {
341 Self {
342 tenant_id: ctx.tenant_id.clone(),
343 team_id: ctx.team_id.clone().or_else(|| ctx.team.clone()),
344 user_id: ctx.user_id.clone().or_else(|| ctx.user.clone()),
345 session_id: ctx.session_id.clone(),
346 }
347 }
348}
349
/// Hash algorithm that produced a [`HashDigest`].
///
/// With the `serde` feature the variant names are serialized in `snake_case`.
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "schemars", derive(JsonSchema))]
#[cfg_attr(feature = "serde", serde(rename_all = "snake_case"))]
pub enum HashAlgorithm {
    /// The BLAKE3 algorithm.
    Blake3,
    /// Any other algorithm, identified by the contained name.
    Other(String),
}
361
362#[derive(Clone, Debug, PartialEq, Eq, Hash)]
364#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
365#[cfg_attr(
366 feature = "serde",
367 serde(into = "HashDigestRepr", try_from = "HashDigestRepr")
368)]
369#[cfg_attr(feature = "schemars", derive(JsonSchema))]
370pub struct HashDigest {
371 pub algo: HashAlgorithm,
373 pub hex: String,
375}
376
377impl HashDigest {
378 pub fn new(algo: HashAlgorithm, hex: impl Into<String>) -> GResult<Self> {
380 let hex = hex.into();
381 validate_hex(&hex)?;
382 Ok(Self { algo, hex })
383 }
384
385 pub fn blake3(hex: impl Into<String>) -> GResult<Self> {
387 Self::new(HashAlgorithm::Blake3, hex)
388 }
389}
390
391#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
392#[cfg_attr(feature = "schemars", derive(JsonSchema))]
393struct HashDigestRepr {
394 algo: HashAlgorithm,
395 hex: String,
396}
397
398impl From<HashDigest> for HashDigestRepr {
399 fn from(value: HashDigest) -> Self {
400 Self {
401 algo: value.algo,
402 hex: value.hex,
403 }
404 }
405}
406
407impl TryFrom<HashDigestRepr> for HashDigest {
408 type Error = GreenticError;
409
410 fn try_from(value: HashDigestRepr) -> Result<Self, Self::Error> {
411 HashDigest::new(value.algo, value.hex)
412 }
413}
414
415fn validate_hex(hex: &str) -> GResult<()> {
416 if hex.is_empty() {
417 return Err(GreenticError::new(
418 ErrorCode::InvalidInput,
419 "digest hex payload must not be empty",
420 ));
421 }
422 if hex.len() % 2 != 0 {
423 return Err(GreenticError::new(
424 ErrorCode::InvalidInput,
425 "digest hex payload must have an even number of digits",
426 ));
427 }
428 if !hex.chars().all(|c| c.is_ascii_hexdigit()) {
429 return Err(GreenticError::new(
430 ErrorCode::InvalidInput,
431 "digest hex payload must be hexadecimal",
432 ));
433 }
434 Ok(())
435}
436
437#[derive(Clone, Debug, PartialEq, Eq, Hash)]
439#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
440#[cfg_attr(feature = "serde", serde(into = "String", try_from = "String"))]
441pub struct SemverReq(String);
442
443impl SemverReq {
444 pub fn parse(value: impl AsRef<str>) -> GResult<Self> {
446 let value = value.as_ref();
447 VersionReq::parse(value).map_err(|err| {
448 GreenticError::new(
449 ErrorCode::InvalidInput,
450 format!("invalid semver requirement '{value}': {err}"),
451 )
452 })?;
453 Ok(Self(value.to_owned()))
454 }
455
456 pub fn as_str(&self) -> &str {
458 &self.0
459 }
460
461 pub fn to_version_req(&self) -> VersionReq {
463 VersionReq::parse(&self.0)
464 .unwrap_or_else(|err| unreachable!("SemverReq::parse validated inputs: {err}"))
465 }
466}
467
468impl fmt::Display for SemverReq {
469 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
470 f.write_str(self.as_str())
471 }
472}
473
474impl From<SemverReq> for String {
475 fn from(value: SemverReq) -> Self {
476 value.0
477 }
478}
479
480impl TryFrom<String> for SemverReq {
481 type Error = GreenticError;
482
483 fn try_from(value: String) -> Result<Self, Self::Error> {
484 SemverReq::parse(&value)
485 }
486}
487
488impl TryFrom<&str> for SemverReq {
489 type Error = GreenticError;
490
491 fn try_from(value: &str) -> Result<Self, Self::Error> {
492 SemverReq::parse(value)
493 }
494}
495
496impl FromStr for SemverReq {
497 type Err = GreenticError;
498
499 fn from_str(s: &str) -> Result<Self, Self::Err> {
500 SemverReq::parse(s)
501 }
502}
503
504#[derive(Clone, Debug, PartialEq, Eq, Hash)]
506#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
507#[cfg_attr(feature = "serde", serde(into = "String", try_from = "String"))]
508pub struct RedactionPath(String);
509
510impl RedactionPath {
511 pub fn parse(value: impl AsRef<str>) -> GResult<Self> {
513 let value = value.as_ref();
514 validate_jsonpath(value)?;
515 Ok(Self(value.to_owned()))
516 }
517
518 pub fn as_str(&self) -> &str {
520 &self.0
521 }
522}
523
524impl fmt::Display for RedactionPath {
525 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
526 f.write_str(self.as_str())
527 }
528}
529
530impl From<RedactionPath> for String {
531 fn from(value: RedactionPath) -> Self {
532 value.0
533 }
534}
535
536impl TryFrom<String> for RedactionPath {
537 type Error = GreenticError;
538
539 fn try_from(value: String) -> Result<Self, Self::Error> {
540 RedactionPath::parse(&value)
541 }
542}
543
544impl TryFrom<&str> for RedactionPath {
545 type Error = GreenticError;
546
547 fn try_from(value: &str) -> Result<Self, Self::Error> {
548 RedactionPath::parse(value)
549 }
550}
551
552fn validate_jsonpath(path: &str) -> GResult<()> {
553 if path.is_empty() {
554 return Err(GreenticError::new(
555 ErrorCode::InvalidInput,
556 "redaction path cannot be empty",
557 ));
558 }
559 if !path.starts_with('$') {
560 return Err(GreenticError::new(
561 ErrorCode::InvalidInput,
562 "redaction path must start with '$'",
563 ));
564 }
565 if path.chars().any(|c| c.is_control()) {
566 return Err(GreenticError::new(
567 ErrorCode::InvalidInput,
568 "redaction path cannot contain control characters",
569 ));
570 }
571 Ok(())
572}
573
/// Hand-written schema: a `SemverReq` is described as a plain string schema
/// with a descriptive `description` attached.
#[cfg(feature = "schemars")]
impl JsonSchema for SemverReq {
    fn schema_name() -> Cow<'static, str> {
        Cow::Borrowed("SemverReq")
    }

    fn json_schema(generator: &mut schemars::SchemaGenerator) -> schemars::Schema {
        let mut schema = String::json_schema(generator);
        // Leave an existing description untouched.
        if schema.get("description").is_some() {
            return schema;
        }
        schema.insert(
            "description".into(),
            "Validated semantic version requirement string".into(),
        );
        schema
    }
}
591
/// Hand-written schema: a `RedactionPath` is described as a plain string
/// schema with a descriptive `description` attached.
#[cfg(feature = "schemars")]
impl JsonSchema for RedactionPath {
    fn schema_name() -> Cow<'static, str> {
        Cow::Borrowed("RedactionPath")
    }

    fn json_schema(generator: &mut schemars::SchemaGenerator) -> schemars::Schema {
        let mut schema = String::json_schema(generator);
        // Leave an existing description untouched.
        if schema.get("description").is_some() {
            return schema;
        }
        schema.insert(
            "description".into(),
            "JSONPath expression used for runtime redaction".into(),
        );
        schema
    }
}
609
/// Deadline for an invocation, stored as milliseconds relative to the Unix epoch.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "schemars", derive(JsonSchema))]
pub struct InvocationDeadline {
    unix_millis: i128,
}

impl InvocationDeadline {
    /// Wraps a raw millisecond timestamp; the value is not range-checked.
    pub const fn from_unix_millis(unix_millis: i128) -> Self {
        Self { unix_millis }
    }

    /// Returns the deadline as milliseconds relative to the Unix epoch.
    pub const fn unix_millis(&self) -> i128 {
        self.unix_millis
    }

    /// Converts the deadline into an [`OffsetDateTime`].
    ///
    /// # Errors
    ///
    /// Returns `ComponentRange` when the timestamp lies outside the range the
    /// `time` crate can represent.
    #[cfg(feature = "time")]
    pub fn to_offset_date_time(&self) -> Result<OffsetDateTime, time::error::ComponentRange> {
        // `from_unix_millis` accepts any `i128`, so a plain multiplication could
        // overflow (panic in debug builds, wrap in release). Saturating keeps
        // the value far out of range, which the conversion reports as an error.
        OffsetDateTime::from_unix_timestamp_nanos(self.unix_millis.saturating_mul(1_000_000))
    }

    /// Builds a deadline from an [`OffsetDateTime`], discarding any
    /// sub-millisecond precision (the division truncates toward zero).
    #[cfg(feature = "time")]
    pub fn from_offset_date_time(value: OffsetDateTime) -> Self {
        let nanos = value.unix_timestamp_nanos();
        Self {
            unix_millis: nanos / 1_000_000,
        }
    }
}
644
645#[derive(Clone, Debug, PartialEq, Eq, Hash)]
647#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
648#[cfg_attr(feature = "schemars", derive(JsonSchema))]
649pub struct TenantCtx {
650 pub env: EnvId,
652 pub tenant: TenantId,
654 pub tenant_id: TenantId,
656 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
658 pub team: Option<TeamId>,
659 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
661 pub team_id: Option<TeamId>,
662 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
664 pub user: Option<UserId>,
665 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
667 pub user_id: Option<UserId>,
668 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
670 pub session_id: Option<String>,
671 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
673 pub flow_id: Option<String>,
674 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
676 pub node_id: Option<String>,
677 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
679 pub provider_id: Option<String>,
680 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
682 pub trace_id: Option<String>,
683 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
685 pub correlation_id: Option<String>,
686 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
688 pub deadline: Option<InvocationDeadline>,
689 pub attempt: u32,
691 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
693 pub idempotency_key: Option<String>,
694 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
696 pub impersonation: Option<Impersonation>,
697}
698
699impl TenantCtx {
700 pub fn new(env: EnvId, tenant: TenantId) -> Self {
702 let tenant_id = tenant.clone();
703 Self {
704 env,
705 tenant: tenant.clone(),
706 tenant_id,
707 team: None,
708 team_id: None,
709 user: None,
710 user_id: None,
711 session_id: None,
712 flow_id: None,
713 node_id: None,
714 provider_id: None,
715 trace_id: None,
716 correlation_id: None,
717 deadline: None,
718 attempt: 0,
719 idempotency_key: None,
720 impersonation: None,
721 }
722 }
723
724 pub fn with_team(mut self, team: Option<TeamId>) -> Self {
726 self.team = team.clone();
727 self.team_id = team;
728 self
729 }
730
731 pub fn with_user(mut self, user: Option<UserId>) -> Self {
733 self.user = user.clone();
734 self.user_id = user;
735 self
736 }
737
738 pub fn with_session(mut self, session: impl Into<String>) -> Self {
740 self.session_id = Some(session.into());
741 self
742 }
743
744 pub fn with_flow(mut self, flow: impl Into<String>) -> Self {
746 self.flow_id = Some(flow.into());
747 self
748 }
749
750 pub fn with_node(mut self, node: impl Into<String>) -> Self {
752 self.node_id = Some(node.into());
753 self
754 }
755
756 pub fn with_provider(mut self, provider: impl Into<String>) -> Self {
758 self.provider_id = Some(provider.into());
759 self
760 }
761
762 pub fn with_impersonation(mut self, impersonation: Option<Impersonation>) -> Self {
764 self.impersonation = impersonation;
765 self
766 }
767
768 pub fn with_attempt(mut self, attempt: u32) -> Self {
770 self.attempt = attempt;
771 self
772 }
773
774 pub fn with_deadline(mut self, deadline: Option<InvocationDeadline>) -> Self {
776 self.deadline = deadline;
777 self
778 }
779
780 pub fn session_id(&self) -> Option<&str> {
782 self.session_id.as_deref()
783 }
784
785 pub fn flow_id(&self) -> Option<&str> {
787 self.flow_id.as_deref()
788 }
789
790 pub fn node_id(&self) -> Option<&str> {
792 self.node_id.as_deref()
793 }
794
795 pub fn provider_id(&self) -> Option<&str> {
797 self.provider_id.as_deref()
798 }
799}
800
/// Owned byte buffer used for payloads, metadata and binary error details.
pub type BinaryPayload = Vec<u8>;
803
804#[derive(Clone, Debug, PartialEq, Eq)]
806#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
807#[cfg_attr(feature = "schemars", derive(JsonSchema))]
808pub struct InvocationEnvelope {
809 pub ctx: TenantCtx,
811 pub flow_id: String,
813 pub node_id: Option<String>,
815 pub op: String,
817 pub payload: BinaryPayload,
819 pub metadata: BinaryPayload,
821}
822
823#[derive(Clone, Debug, PartialEq, Eq)]
825#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
826#[cfg_attr(feature = "schemars", derive(JsonSchema))]
827pub enum ErrorDetail {
828 Text(String),
830 Binary(BinaryPayload),
832}
833
834#[derive(Debug)]
836#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
837#[cfg_attr(feature = "schemars", derive(JsonSchema))]
838pub struct NodeError {
839 pub code: String,
841 pub message: String,
843 pub retryable: bool,
845 pub backoff_ms: Option<u64>,
847 pub details: Option<ErrorDetail>,
849 #[cfg(feature = "std")]
850 #[cfg_attr(feature = "serde", serde(skip, default = "default_source"))]
851 #[cfg_attr(feature = "schemars", schemars(skip))]
852 source: Option<Box<dyn StdError + Send + Sync>>,
853}
854
855#[cfg(all(feature = "std", feature = "serde"))]
856fn default_source() -> Option<Box<dyn StdError + Send + Sync>> {
857 None
858}
859
860impl NodeError {
861 pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
863 Self {
864 code: code.into(),
865 message: message.into(),
866 retryable: false,
867 backoff_ms: None,
868 details: None,
869 #[cfg(feature = "std")]
870 source: None,
871 }
872 }
873
874 pub fn with_retry(mut self, backoff_ms: Option<u64>) -> Self {
876 self.retryable = true;
877 self.backoff_ms = backoff_ms;
878 self
879 }
880
881 pub fn with_detail(mut self, detail: ErrorDetail) -> Self {
883 self.details = Some(detail);
884 self
885 }
886
887 pub fn with_detail_text(mut self, detail: impl Into<String>) -> Self {
889 self.details = Some(ErrorDetail::Text(detail.into()));
890 self
891 }
892
893 pub fn with_detail_binary(mut self, detail: BinaryPayload) -> Self {
895 self.details = Some(ErrorDetail::Binary(detail));
896 self
897 }
898
899 #[cfg(feature = "std")]
900 pub fn with_source<E>(mut self, source: E) -> Self
902 where
903 E: StdError + Send + Sync + 'static,
904 {
905 self.source = Some(Box::new(source));
906 self
907 }
908
909 pub fn detail(&self) -> Option<&ErrorDetail> {
911 self.details.as_ref()
912 }
913
914 #[cfg(feature = "std")]
915 pub fn source(&self) -> Option<&(dyn StdError + Send + Sync + 'static)> {
917 self.source.as_deref()
918 }
919}
920
921impl fmt::Display for NodeError {
922 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
923 write!(f, "{}: {}", self.code, self.message)
924 }
925}
926
927#[cfg(feature = "std")]
928impl StdError for NodeError {
929 fn source(&self) -> Option<&(dyn StdError + 'static)> {
930 self.source
931 .as_ref()
932 .map(|err| err.as_ref() as &(dyn StdError + 'static))
933 }
934}
935
/// Result alias whose error type is [`NodeError`].
pub type NodeResult<T> = Result<T, NodeError>;
938
939pub fn make_idempotency_key(
944 ctx: &TenantCtx,
945 flow_id: &str,
946 node_id: Option<&str>,
947 correlation: Option<&str>,
948) -> String {
949 let node_segment = node_id.unwrap_or_default();
950 let correlation_segment = correlation
951 .or(ctx.correlation_id.as_deref())
952 .unwrap_or_default();
953 let input = format!(
954 "{}|{}|{}|{}",
955 ctx.tenant_id.as_str(),
956 flow_id,
957 node_segment,
958 correlation_segment
959 );
960 fnv1a_128_hex(input.as_bytes())
961}
962
/// Initial hash state (offset basis) of 128-bit FNV.
const FNV_OFFSET_BASIS: u128 = 0x6c62272e07bb014262b821756295c58d;
/// Multiplier (prime) of 128-bit FNV.
const FNV_PRIME: u128 = 0x0000000001000000000000000000013b;

/// Hashes `bytes` with 128-bit FNV-1a and returns the result as 32 lowercase,
/// zero-padded hexadecimal digits.
fn fnv1a_128_hex(bytes: &[u8]) -> String {
    // FNV-1a: xor each byte into the state, then multiply by the prime (mod 2^128).
    let hash = bytes.iter().fold(FNV_OFFSET_BASIS, |state, &byte| {
        (state ^ u128::from(byte)).wrapping_mul(FNV_PRIME)
    });
    format!("{hash:032x}")
}
983
#[cfg(test)]
mod tests {
    use super::*;
    use core::convert::TryFrom;
    // Only the `time`-gated test below needs this import; gating it keeps the
    // test module building when the optional `time` feature is disabled.
    #[cfg(feature = "time")]
    use time::OffsetDateTime;

    /// Builds a fully populated context shared by the idempotency-key tests.
    fn sample_ctx() -> TenantCtx {
        let env = EnvId::try_from("prod").unwrap_or_else(|err| panic!("{err}"));
        let tenant = TenantId::try_from("tenant-123").unwrap_or_else(|err| panic!("{err}"));
        let team = TeamId::try_from("team-456").unwrap_or_else(|err| panic!("{err}"));
        let user = UserId::try_from("user-789").unwrap_or_else(|err| panic!("{err}"));

        let mut ctx = TenantCtx::new(env, tenant)
            .with_team(Some(team))
            .with_user(Some(user))
            .with_attempt(2)
            .with_deadline(Some(InvocationDeadline::from_unix_millis(
                1_700_000_000_000,
            )));
        ctx.trace_id = Some("trace-abc".to_owned());
        ctx.correlation_id = Some("corr-xyz".to_owned());
        ctx.idempotency_key = Some("key-123".to_owned());
        ctx
    }

    // Same inputs must always produce the same 32-character key.
    #[test]
    fn idempotent_key_stable() {
        let ctx = sample_ctx();
        let key_a = make_idempotency_key(&ctx, "flow-1", Some("node-1"), Some("corr-override"));
        let key_b = make_idempotency_key(&ctx, "flow-1", Some("node-1"), Some("corr-override"));
        assert_eq!(key_a, key_b);
        assert_eq!(key_a.len(), 32);
    }

    // Omitting the correlation falls back to the one stored on the context.
    #[test]
    fn idempotent_key_uses_context_correlation() {
        let ctx = sample_ctx();
        let key = make_idempotency_key(&ctx, "flow-1", None, None);
        let expected = make_idempotency_key(&ctx, "flow-1", None, ctx.correlation_id.as_deref());
        assert_eq!(key, expected);
    }

    // Converting to `OffsetDateTime` and back preserves millisecond precision.
    #[test]
    #[cfg(feature = "time")]
    fn deadline_roundtrips_through_offset_datetime() {
        let dt = OffsetDateTime::from_unix_timestamp(1_700_000_000)
            .unwrap_or_else(|err| panic!("valid timestamp: {err}"));
        let deadline = InvocationDeadline::from_offset_date_time(dt);
        let roundtrip = deadline
            .to_offset_date_time()
            .unwrap_or_else(|err| panic!("round-trip conversion failed: {err}"));
        let millis = dt.unix_timestamp_nanos() / 1_000_000;
        assert_eq!(deadline.unix_millis(), millis);
        assert_eq!(roundtrip.unix_timestamp_nanos() / 1_000_000, millis);
    }

    // Builder methods set the retry hint and the textual detail.
    #[test]
    fn node_error_builder_sets_fields() {
        let err = NodeError::new("TEST", "example")
            .with_retry(Some(500))
            .with_detail_text("context");

        assert!(err.retryable);
        assert_eq!(err.backoff_ms, Some(500));
        match err.detail() {
            Some(ErrorDetail::Text(detail)) => assert_eq!(detail, "context"),
            other => panic!("unexpected detail {:?}", other),
        }
    }

    // A source attached with `with_source` is returned by `source()`.
    #[cfg(feature = "std")]
    #[test]
    fn node_error_source_roundtrips() {
        use std::io::Error;

        let source = Error::other("boom");
        let err = NodeError::new("TEST", "example").with_source(source);
        assert!(err.source().is_some());
    }
}