greentic_types/
lib.rs

1#![cfg_attr(not(feature = "std"), no_std)]
2#![forbid(unsafe_code)]
3#![warn(missing_docs, clippy::unwrap_used, clippy::expect_used)]
4
5//! Shared types and helpers for Greentic multi-tenant flows.
6
7extern crate alloc;
8
9pub mod pack_spec;
10
11pub mod context;
12pub mod error;
13pub mod outcome;
14pub mod pack;
15pub mod policy;
16pub mod session;
17pub mod state;
18pub mod telemetry;
19pub mod tenant;
20
21pub use context::{Cloud, DeploymentCtx, Platform};
22pub use error::{ErrorCode, GResult, GreenticError};
23pub use outcome::Outcome;
24pub use pack::{PackRef, Signature, SignatureAlgorithm};
25pub use pack_spec::{PackSpec, ToolSpec};
26pub use policy::{AllowList, NetworkPolicy, PolicyDecision, Protocol};
27pub use session::{SessionCursor, SessionKey};
28pub use state::{StateKey, StatePath};
29pub use telemetry::SpanContext;
30#[cfg(feature = "telemetry-autoinit")]
31pub use telemetry::TelemetryCtx;
32pub use tenant::{Impersonation, TenantIdentity};
33
34use alloc::{borrow::ToOwned, format, string::String, vec::Vec};
35use core::fmt;
36#[cfg(feature = "schemars")]
37use schemars::JsonSchema;
38#[cfg(feature = "time")]
39use time::OffsetDateTime;
40
41#[cfg(feature = "serde")]
42use serde::{Deserialize, Serialize};
43
44#[cfg(feature = "std")]
45use alloc::boxed::Box;
46
47#[cfg(feature = "std")]
48use std::error::Error as StdError;
49
50macro_rules! id_newtype {
51    ($name:ident, $doc:literal) => {
52        #[doc = $doc]
53        #[derive(Clone, Debug, Eq, PartialEq, Hash)]
54        #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
55        #[cfg_attr(feature = "schemars", derive(JsonSchema))]
56        #[cfg_attr(feature = "serde", serde(transparent))]
57        pub struct $name(pub String);
58
59        impl $name {
60            /// Returns the identifier as a string slice.
61            pub fn as_str(&self) -> &str {
62                &self.0
63            }
64        }
65
66        impl From<String> for $name {
67            fn from(value: String) -> Self {
68                Self(value)
69            }
70        }
71
72        impl From<&str> for $name {
73            fn from(value: &str) -> Self {
74                Self(value.to_owned())
75            }
76        }
77
78        impl From<$name> for String {
79            fn from(value: $name) -> Self {
80                value.0
81            }
82        }
83
84        impl AsRef<str> for $name {
85            fn as_ref(&self) -> &str {
86                self.as_str()
87            }
88        }
89
90        impl fmt::Display for $name {
91            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
92                f.write_str(self.as_str())
93            }
94        }
95    };
96}
97
98id_newtype!(EnvId, "Environment identifier for a tenant context.");
99id_newtype!(TenantId, "Tenant identifier within an environment.");
100id_newtype!(TeamId, "Team identifier belonging to a tenant.");
101id_newtype!(UserId, "User identifier within a tenant.");
102
103/// Deadline metadata for an invocation, stored as Unix epoch milliseconds.
104#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
105#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
106#[cfg_attr(feature = "schemars", derive(JsonSchema))]
107pub struct InvocationDeadline {
108    unix_millis: i128,
109}
110
111impl InvocationDeadline {
112    /// Creates a deadline from a Unix timestamp expressed in milliseconds.
113    pub const fn from_unix_millis(unix_millis: i128) -> Self {
114        Self { unix_millis }
115    }
116
117    /// Returns the deadline as Unix epoch milliseconds.
118    pub const fn unix_millis(&self) -> i128 {
119        self.unix_millis
120    }
121
122    /// Converts the deadline into an [`OffsetDateTime`].
123    #[cfg(feature = "time")]
124    pub fn to_offset_date_time(&self) -> Result<OffsetDateTime, time::error::ComponentRange> {
125        OffsetDateTime::from_unix_timestamp_nanos(self.unix_millis * 1_000_000)
126    }
127
128    /// Creates a deadline from an [`OffsetDateTime`], truncating to milliseconds.
129    #[cfg(feature = "time")]
130    pub fn from_offset_date_time(value: OffsetDateTime) -> Self {
131        let nanos = value.unix_timestamp_nanos();
132        Self {
133            unix_millis: nanos / 1_000_000,
134        }
135    }
136}
137
138/// Context that accompanies every invocation across Greentic runtimes.
139#[derive(Clone, Debug, PartialEq, Eq, Hash)]
140#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
141#[cfg_attr(feature = "schemars", derive(JsonSchema))]
142pub struct TenantCtx {
143    /// Environment scope (for example `dev`, `staging`, or `prod`).
144    pub env: EnvId,
145    /// Tenant identifier for the current execution.
146    pub tenant: TenantId,
147    /// Stable tenant identifier reference used across systems.
148    pub tenant_id: TenantId,
149    /// Optional team identifier scoped to the tenant.
150    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
151    pub team: Option<TeamId>,
152    /// Optional team identifier accessible via the shared schema.
153    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
154    pub team_id: Option<TeamId>,
155    /// Optional user identifier scoped to the tenant.
156    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
157    pub user: Option<UserId>,
158    /// Optional user identifier aligned with the shared schema.
159    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
160    pub user_id: Option<UserId>,
161    /// Optional session identifier propagated by the runtime.
162    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
163    pub session_id: Option<String>,
164    /// Optional flow identifier for the current execution.
165    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
166    pub flow_id: Option<String>,
167    /// Optional node identifier within the flow.
168    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
169    pub node_id: Option<String>,
170    /// Optional provider identifier describing the runtime surface.
171    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
172    pub provider_id: Option<String>,
173    /// Distributed tracing identifier when available.
174    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
175    pub trace_id: Option<String>,
176    /// Correlation identifier for linking related events.
177    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
178    pub correlation_id: Option<String>,
179    /// Deadline when the invocation should finish.
180    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
181    pub deadline: Option<InvocationDeadline>,
182    /// Attempt counter for retried invocations (starting at zero).
183    pub attempt: u32,
184    /// Stable idempotency key propagated across retries.
185    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
186    pub idempotency_key: Option<String>,
187    /// Optional impersonation context describing the acting identity.
188    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
189    pub impersonation: Option<Impersonation>,
190}
191
192impl TenantCtx {
193    /// Creates a new tenant context with the provided environment and tenant identifiers.
194    pub fn new(env: EnvId, tenant: TenantId) -> Self {
195        let tenant_id = tenant.clone();
196        Self {
197            env,
198            tenant: tenant.clone(),
199            tenant_id,
200            team: None,
201            team_id: None,
202            user: None,
203            user_id: None,
204            session_id: None,
205            flow_id: None,
206            node_id: None,
207            provider_id: None,
208            trace_id: None,
209            correlation_id: None,
210            deadline: None,
211            attempt: 0,
212            idempotency_key: None,
213            impersonation: None,
214        }
215    }
216
217    /// Updates the team information ensuring legacy and shared fields stay aligned.
218    pub fn with_team(mut self, team: Option<TeamId>) -> Self {
219        self.team = team.clone();
220        self.team_id = team;
221        self
222    }
223
224    /// Updates the user information ensuring legacy and shared fields stay aligned.
225    pub fn with_user(mut self, user: Option<UserId>) -> Self {
226        self.user = user.clone();
227        self.user_id = user;
228        self
229    }
230
231    /// Updates the session identifier.
232    pub fn with_session(mut self, session: impl Into<String>) -> Self {
233        self.session_id = Some(session.into());
234        self
235    }
236
237    /// Updates the flow identifier.
238    pub fn with_flow(mut self, flow: impl Into<String>) -> Self {
239        self.flow_id = Some(flow.into());
240        self
241    }
242
243    /// Updates the node identifier.
244    pub fn with_node(mut self, node: impl Into<String>) -> Self {
245        self.node_id = Some(node.into());
246        self
247    }
248
249    /// Updates the provider identifier.
250    pub fn with_provider(mut self, provider: impl Into<String>) -> Self {
251        self.provider_id = Some(provider.into());
252        self
253    }
254
255    /// Sets the impersonation context.
256    pub fn with_impersonation(mut self, impersonation: Option<Impersonation>) -> Self {
257        self.impersonation = impersonation;
258        self
259    }
260
261    /// Returns a copy of the context with the provided attempt value.
262    pub fn with_attempt(mut self, attempt: u32) -> Self {
263        self.attempt = attempt;
264        self
265    }
266
267    /// Updates the deadline metadata for subsequent invocations.
268    pub fn with_deadline(mut self, deadline: Option<InvocationDeadline>) -> Self {
269        self.deadline = deadline;
270        self
271    }
272
273    /// Returns the session identifier, when present.
274    pub fn session_id(&self) -> Option<&str> {
275        self.session_id.as_deref()
276    }
277
278    /// Returns the flow identifier, when present.
279    pub fn flow_id(&self) -> Option<&str> {
280        self.flow_id.as_deref()
281    }
282
283    /// Returns the node identifier, when present.
284    pub fn node_id(&self) -> Option<&str> {
285        self.node_id.as_deref()
286    }
287
288    /// Returns the provider identifier, when present.
289    pub fn provider_id(&self) -> Option<&str> {
290        self.provider_id.as_deref()
291    }
292}
293
294/// Primary payload representation shared across envelopes.
295pub type BinaryPayload = Vec<u8>;
296
297/// Normalized ingress payload delivered to nodes.
298#[derive(Clone, Debug, PartialEq, Eq)]
299#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
300#[cfg_attr(feature = "schemars", derive(JsonSchema))]
301pub struct InvocationEnvelope {
302    /// Tenant context for the invocation.
303    pub ctx: TenantCtx,
304    /// Flow identifier the event belongs to.
305    pub flow_id: String,
306    /// Optional node identifier within the flow.
307    pub node_id: Option<String>,
308    /// Operation being invoked (for example `on_message` or `tick`).
309    pub op: String,
310    /// Normalized payload for the invocation.
311    pub payload: BinaryPayload,
312    /// Raw metadata propagated from the ingress surface.
313    pub metadata: BinaryPayload,
314}
315
316/// Structured detail payload attached to a node error.
317#[derive(Clone, Debug, PartialEq, Eq)]
318#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
319#[cfg_attr(feature = "schemars", derive(JsonSchema))]
320pub enum ErrorDetail {
321    /// UTF-8 encoded detail payload.
322    Text(String),
323    /// Binary payload detail (for example message pack or CBOR).
324    Binary(BinaryPayload),
325}
326
327/// Error type emitted by Greentic nodes.
328#[derive(Debug)]
329#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
330#[cfg_attr(feature = "schemars", derive(JsonSchema))]
331pub struct NodeError {
332    /// Machine readable error code.
333    pub code: String,
334    /// Human readable message explaining the failure.
335    pub message: String,
336    /// Whether the failure is retryable by the runtime.
337    pub retryable: bool,
338    /// Optional backoff duration in milliseconds for the next retry.
339    pub backoff_ms: Option<u64>,
340    /// Optional structured error detail payload.
341    pub details: Option<ErrorDetail>,
342    #[cfg(feature = "std")]
343    #[cfg_attr(feature = "serde", serde(skip, default = "default_source"))]
344    #[cfg_attr(feature = "schemars", schemars(skip))]
345    source: Option<Box<dyn StdError + Send + Sync>>,
346}
347
348#[cfg(all(feature = "std", feature = "serde"))]
349fn default_source() -> Option<Box<dyn StdError + Send + Sync>> {
350    None
351}
352
353impl NodeError {
354    /// Constructs a non-retryable failure with the supplied code and message.
355    pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
356        Self {
357            code: code.into(),
358            message: message.into(),
359            retryable: false,
360            backoff_ms: None,
361            details: None,
362            #[cfg(feature = "std")]
363            source: None,
364        }
365    }
366
367    /// Marks the error as retryable with an optional backoff value.
368    pub fn with_retry(mut self, backoff_ms: Option<u64>) -> Self {
369        self.retryable = true;
370        self.backoff_ms = backoff_ms;
371        self
372    }
373
374    /// Attaches structured details to the error.
375    pub fn with_detail(mut self, detail: ErrorDetail) -> Self {
376        self.details = Some(detail);
377        self
378    }
379
380    /// Attaches a textual detail payload to the error.
381    pub fn with_detail_text(mut self, detail: impl Into<String>) -> Self {
382        self.details = Some(ErrorDetail::Text(detail.into()));
383        self
384    }
385
386    /// Attaches a binary detail payload to the error.
387    pub fn with_detail_binary(mut self, detail: BinaryPayload) -> Self {
388        self.details = Some(ErrorDetail::Binary(detail));
389        self
390    }
391
392    #[cfg(feature = "std")]
393    /// Attaches a source error to the failure for debugging purposes.
394    pub fn with_source<E>(mut self, source: E) -> Self
395    where
396        E: StdError + Send + Sync + 'static,
397    {
398        self.source = Some(Box::new(source));
399        self
400    }
401
402    /// Returns the structured details, when available.
403    pub fn detail(&self) -> Option<&ErrorDetail> {
404        self.details.as_ref()
405    }
406
407    #[cfg(feature = "std")]
408    /// Returns the attached source error when one has been provided.
409    pub fn source(&self) -> Option<&(dyn StdError + Send + Sync + 'static)> {
410        self.source.as_deref()
411    }
412}
413
414impl fmt::Display for NodeError {
415    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
416        write!(f, "{}: {}", self.code, self.message)
417    }
418}
419
420#[cfg(feature = "std")]
421impl StdError for NodeError {
422    fn source(&self) -> Option<&(dyn StdError + 'static)> {
423        self.source
424            .as_ref()
425            .map(|err| err.as_ref() as &(dyn StdError + 'static))
426    }
427}
428
429/// Alias for results returned by node handlers.
430pub type NodeResult<T> = Result<T, NodeError>;
431
432/// Generates a stable idempotency key for a node invocation.
433///
434/// The key uses tenant, flow, node, and correlation identifiers. Missing
435/// correlation values fall back to the value stored on the context.
436pub fn make_idempotency_key(
437    ctx: &TenantCtx,
438    flow_id: &str,
439    node_id: Option<&str>,
440    correlation: Option<&str>,
441) -> String {
442    let node_segment = node_id.unwrap_or_default();
443    let correlation_segment = correlation
444        .or(ctx.correlation_id.as_deref())
445        .unwrap_or_default();
446    let input = format!(
447        "{}|{}|{}|{}",
448        ctx.tenant_id.as_str(),
449        flow_id,
450        node_segment,
451        correlation_segment
452    );
453    fnv1a_128_hex(input.as_bytes())
454}
455
456const FNV_OFFSET_BASIS: u128 = 0x6c62272e07bb014262b821756295c58d;
457const FNV_PRIME: u128 = 0x0000000001000000000000000000013b;
458
459fn fnv1a_128_hex(bytes: &[u8]) -> String {
460    let mut hash = FNV_OFFSET_BASIS;
461    for &byte in bytes {
462        hash ^= byte as u128;
463        hash = hash.wrapping_mul(FNV_PRIME);
464    }
465
466    let mut output = String::with_capacity(32);
467    for shift in (0..32).rev() {
468        let nibble = ((hash >> (shift * 4)) & 0x0f) as u8;
469        output.push(match nibble {
470            0..=9 => (b'0' + nibble) as char,
471            _ => (b'a' + (nibble - 10)) as char,
472        });
473    }
474    output
475}
476
477#[cfg(test)]
478mod tests {
479    use super::*;
480    use time::OffsetDateTime;
481
482    fn sample_ctx() -> TenantCtx {
483        let mut ctx = TenantCtx::new(EnvId::from("prod"), TenantId::from("tenant-123"))
484            .with_team(Some(TeamId::from("team-456")))
485            .with_user(Some(UserId::from("user-789")))
486            .with_attempt(2)
487            .with_deadline(Some(InvocationDeadline::from_unix_millis(
488                1_700_000_000_000,
489            )));
490        ctx.trace_id = Some("trace-abc".to_owned());
491        ctx.correlation_id = Some("corr-xyz".to_owned());
492        ctx.idempotency_key = Some("key-123".to_owned());
493        ctx
494    }
495
496    #[test]
497    fn idempotent_key_stable() {
498        let ctx = sample_ctx();
499        let key_a = make_idempotency_key(&ctx, "flow-1", Some("node-1"), Some("corr-override"));
500        let key_b = make_idempotency_key(&ctx, "flow-1", Some("node-1"), Some("corr-override"));
501        assert_eq!(key_a, key_b);
502        assert_eq!(key_a.len(), 32);
503    }
504
505    #[test]
506    fn idempotent_key_uses_context_correlation() {
507        let ctx = sample_ctx();
508        let key = make_idempotency_key(&ctx, "flow-1", None, None);
509        let expected = make_idempotency_key(&ctx, "flow-1", None, ctx.correlation_id.as_deref());
510        assert_eq!(key, expected);
511    }
512
513    #[test]
514    #[cfg(feature = "time")]
515    fn deadline_roundtrips_through_offset_datetime() {
516        let dt = OffsetDateTime::from_unix_timestamp(1_700_000_000)
517            .unwrap_or_else(|err| panic!("valid timestamp: {err}"));
518        let deadline = InvocationDeadline::from_offset_date_time(dt);
519        let roundtrip = deadline
520            .to_offset_date_time()
521            .unwrap_or_else(|err| panic!("round-trip conversion failed: {err}"));
522        let millis = dt.unix_timestamp_nanos() / 1_000_000;
523        assert_eq!(deadline.unix_millis(), millis);
524        assert_eq!(roundtrip.unix_timestamp_nanos() / 1_000_000, millis);
525    }
526
527    #[test]
528    fn node_error_builder_sets_fields() {
529        let err = NodeError::new("TEST", "example")
530            .with_retry(Some(500))
531            .with_detail_text("context");
532
533        assert!(err.retryable);
534        assert_eq!(err.backoff_ms, Some(500));
535        match err.detail() {
536            Some(ErrorDetail::Text(detail)) => assert_eq!(detail, "context"),
537            other => panic!("unexpected detail {:?}", other),
538        }
539    }
540
541    #[cfg(feature = "std")]
542    #[test]
543    fn node_error_source_roundtrips() {
544        use std::io::Error;
545
546        let source = Error::other("boom");
547        let err = NodeError::new("TEST", "example").with_source(source);
548        assert!(err.source().is_some());
549    }
550}