greentic_types/
lib.rs

1#![cfg_attr(not(feature = "std"), no_std)]
2#![forbid(unsafe_code)]
3#![warn(missing_docs, clippy::unwrap_used, clippy::expect_used)]
4
5//! Shared types and helpers for Greentic multi-tenant flows.
6
7extern crate alloc;
8
9pub mod pack_spec;
10
11pub mod context;
12pub mod error;
13pub mod outcome;
14pub mod pack;
15pub mod policy;
16pub mod session;
17pub mod state;
18pub mod telemetry;
19pub mod tenant;
20
21pub use context::{Cloud, DeploymentCtx, Platform};
22pub use error::{ErrorCode, GResult, GreenticError};
23pub use outcome::Outcome;
24pub use pack::{PackRef, Signature, SignatureAlgorithm};
25pub use pack_spec::{PackSpec, ToolSpec};
26pub use policy::{AllowList, NetworkPolicy, PolicyDecision, Protocol};
27pub use session::{SessionCursor, SessionKey};
28pub use state::{StateKey, StatePath};
29pub use telemetry::SpanContext;
30pub use tenant::{Impersonation, TenantIdentity};
31
32use alloc::{borrow::ToOwned, format, string::String, vec::Vec};
33use core::fmt;
34#[cfg(feature = "schemars")]
35use schemars::JsonSchema;
36#[cfg(feature = "time")]
37use time::OffsetDateTime;
38
39#[cfg(feature = "serde")]
40use serde::{Deserialize, Serialize};
41
42#[cfg(feature = "std")]
43use alloc::boxed::Box;
44
45#[cfg(feature = "std")]
46use std::error::Error as StdError;
47
48macro_rules! id_newtype {
49    ($name:ident, $doc:literal) => {
50        #[doc = $doc]
51        #[derive(Clone, Debug, Eq, PartialEq, Hash)]
52        #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
53        #[cfg_attr(feature = "schemars", derive(JsonSchema))]
54        #[cfg_attr(feature = "serde", serde(transparent))]
55        pub struct $name(pub String);
56
57        impl $name {
58            /// Returns the identifier as a string slice.
59            pub fn as_str(&self) -> &str {
60                &self.0
61            }
62        }
63
64        impl From<String> for $name {
65            fn from(value: String) -> Self {
66                Self(value)
67            }
68        }
69
70        impl From<&str> for $name {
71            fn from(value: &str) -> Self {
72                Self(value.to_owned())
73            }
74        }
75
76        impl From<$name> for String {
77            fn from(value: $name) -> Self {
78                value.0
79            }
80        }
81
82        impl AsRef<str> for $name {
83            fn as_ref(&self) -> &str {
84                self.as_str()
85            }
86        }
87
88        impl fmt::Display for $name {
89            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
90                f.write_str(self.as_str())
91            }
92        }
93    };
94}
95
96id_newtype!(EnvId, "Environment identifier for a tenant context.");
97id_newtype!(TenantId, "Tenant identifier within an environment.");
98id_newtype!(TeamId, "Team identifier belonging to a tenant.");
99id_newtype!(UserId, "User identifier within a tenant.");
100
101/// Deadline metadata for an invocation, stored as Unix epoch milliseconds.
102#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
103#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
104#[cfg_attr(feature = "schemars", derive(JsonSchema))]
105pub struct InvocationDeadline {
106    unix_millis: i128,
107}
108
109impl InvocationDeadline {
110    /// Creates a deadline from a Unix timestamp expressed in milliseconds.
111    pub const fn from_unix_millis(unix_millis: i128) -> Self {
112        Self { unix_millis }
113    }
114
115    /// Returns the deadline as Unix epoch milliseconds.
116    pub const fn unix_millis(&self) -> i128 {
117        self.unix_millis
118    }
119
120    /// Converts the deadline into an [`OffsetDateTime`].
121    #[cfg(feature = "time")]
122    pub fn to_offset_date_time(&self) -> Result<OffsetDateTime, time::error::ComponentRange> {
123        OffsetDateTime::from_unix_timestamp_nanos(self.unix_millis * 1_000_000)
124    }
125
126    /// Creates a deadline from an [`OffsetDateTime`], truncating to milliseconds.
127    #[cfg(feature = "time")]
128    pub fn from_offset_date_time(value: OffsetDateTime) -> Self {
129        let nanos = value.unix_timestamp_nanos();
130        Self {
131            unix_millis: nanos / 1_000_000,
132        }
133    }
134}
135
136/// Context that accompanies every invocation across Greentic runtimes.
137#[derive(Clone, Debug, PartialEq, Eq, Hash)]
138#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
139#[cfg_attr(feature = "schemars", derive(JsonSchema))]
140pub struct TenantCtx {
141    /// Environment scope (for example `dev`, `staging`, or `prod`).
142    pub env: EnvId,
143    /// Tenant identifier for the current execution.
144    pub tenant: TenantId,
145    /// Stable tenant identifier reference used across systems.
146    pub tenant_id: TenantId,
147    /// Optional team identifier scoped to the tenant.
148    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
149    pub team: Option<TeamId>,
150    /// Optional team identifier accessible via the shared schema.
151    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
152    pub team_id: Option<TeamId>,
153    /// Optional user identifier scoped to the tenant.
154    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
155    pub user: Option<UserId>,
156    /// Optional user identifier aligned with the shared schema.
157    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
158    pub user_id: Option<UserId>,
159    /// Distributed tracing identifier when available.
160    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
161    pub trace_id: Option<String>,
162    /// Correlation identifier for linking related events.
163    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
164    pub correlation_id: Option<String>,
165    /// Deadline when the invocation should finish.
166    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
167    pub deadline: Option<InvocationDeadline>,
168    /// Attempt counter for retried invocations (starting at zero).
169    pub attempt: u32,
170    /// Stable idempotency key propagated across retries.
171    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
172    pub idempotency_key: Option<String>,
173    /// Optional impersonation context describing the acting identity.
174    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
175    pub impersonation: Option<Impersonation>,
176}
177
178impl TenantCtx {
179    /// Creates a new tenant context with the provided environment and tenant identifiers.
180    pub fn new(env: EnvId, tenant: TenantId) -> Self {
181        let tenant_id = tenant.clone();
182        Self {
183            env,
184            tenant: tenant.clone(),
185            tenant_id,
186            team: None,
187            team_id: None,
188            user: None,
189            user_id: None,
190            trace_id: None,
191            correlation_id: None,
192            deadline: None,
193            attempt: 0,
194            idempotency_key: None,
195            impersonation: None,
196        }
197    }
198
199    /// Updates the team information ensuring legacy and shared fields stay aligned.
200    pub fn with_team(mut self, team: Option<TeamId>) -> Self {
201        self.team = team.clone();
202        self.team_id = team;
203        self
204    }
205
206    /// Updates the user information ensuring legacy and shared fields stay aligned.
207    pub fn with_user(mut self, user: Option<UserId>) -> Self {
208        self.user = user.clone();
209        self.user_id = user;
210        self
211    }
212
213    /// Sets the impersonation context.
214    pub fn with_impersonation(mut self, impersonation: Option<Impersonation>) -> Self {
215        self.impersonation = impersonation;
216        self
217    }
218
219    /// Returns a copy of the context with the provided attempt value.
220    pub fn with_attempt(mut self, attempt: u32) -> Self {
221        self.attempt = attempt;
222        self
223    }
224
225    /// Updates the deadline metadata for subsequent invocations.
226    pub fn with_deadline(mut self, deadline: Option<InvocationDeadline>) -> Self {
227        self.deadline = deadline;
228        self
229    }
230}
231
232/// Primary payload representation shared across envelopes.
233pub type BinaryPayload = Vec<u8>;
234
235/// Normalized ingress payload delivered to nodes.
236#[derive(Clone, Debug, PartialEq, Eq)]
237#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
238#[cfg_attr(feature = "schemars", derive(JsonSchema))]
239pub struct InvocationEnvelope {
240    /// Tenant context for the invocation.
241    pub ctx: TenantCtx,
242    /// Flow identifier the event belongs to.
243    pub flow_id: String,
244    /// Optional node identifier within the flow.
245    pub node_id: Option<String>,
246    /// Operation being invoked (for example `on_message` or `tick`).
247    pub op: String,
248    /// Normalized payload for the invocation.
249    pub payload: BinaryPayload,
250    /// Raw metadata propagated from the ingress surface.
251    pub metadata: BinaryPayload,
252}
253
254/// Structured detail payload attached to a node error.
255#[derive(Clone, Debug, PartialEq, Eq)]
256#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
257#[cfg_attr(feature = "schemars", derive(JsonSchema))]
258pub enum ErrorDetail {
259    /// UTF-8 encoded detail payload.
260    Text(String),
261    /// Binary payload detail (for example message pack or CBOR).
262    Binary(BinaryPayload),
263}
264
265/// Error type emitted by Greentic nodes.
266#[derive(Debug)]
267#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
268#[cfg_attr(feature = "schemars", derive(JsonSchema))]
269pub struct NodeError {
270    /// Machine readable error code.
271    pub code: String,
272    /// Human readable message explaining the failure.
273    pub message: String,
274    /// Whether the failure is retryable by the runtime.
275    pub retryable: bool,
276    /// Optional backoff duration in milliseconds for the next retry.
277    pub backoff_ms: Option<u64>,
278    /// Optional structured error detail payload.
279    pub details: Option<ErrorDetail>,
280    #[cfg(feature = "std")]
281    #[cfg_attr(feature = "serde", serde(skip, default = "default_source"))]
282    #[cfg_attr(feature = "schemars", schemars(skip))]
283    source: Option<Box<dyn StdError + Send + Sync>>,
284}
285
286#[cfg(all(feature = "std", feature = "serde"))]
287fn default_source() -> Option<Box<dyn StdError + Send + Sync>> {
288    None
289}
290
291impl NodeError {
292    /// Constructs a non-retryable failure with the supplied code and message.
293    pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
294        Self {
295            code: code.into(),
296            message: message.into(),
297            retryable: false,
298            backoff_ms: None,
299            details: None,
300            #[cfg(feature = "std")]
301            source: None,
302        }
303    }
304
305    /// Marks the error as retryable with an optional backoff value.
306    pub fn with_retry(mut self, backoff_ms: Option<u64>) -> Self {
307        self.retryable = true;
308        self.backoff_ms = backoff_ms;
309        self
310    }
311
312    /// Attaches structured details to the error.
313    pub fn with_detail(mut self, detail: ErrorDetail) -> Self {
314        self.details = Some(detail);
315        self
316    }
317
318    /// Attaches a textual detail payload to the error.
319    pub fn with_detail_text(mut self, detail: impl Into<String>) -> Self {
320        self.details = Some(ErrorDetail::Text(detail.into()));
321        self
322    }
323
324    /// Attaches a binary detail payload to the error.
325    pub fn with_detail_binary(mut self, detail: BinaryPayload) -> Self {
326        self.details = Some(ErrorDetail::Binary(detail));
327        self
328    }
329
330    #[cfg(feature = "std")]
331    /// Attaches a source error to the failure for debugging purposes.
332    pub fn with_source<E>(mut self, source: E) -> Self
333    where
334        E: StdError + Send + Sync + 'static,
335    {
336        self.source = Some(Box::new(source));
337        self
338    }
339
340    /// Returns the structured details, when available.
341    pub fn detail(&self) -> Option<&ErrorDetail> {
342        self.details.as_ref()
343    }
344
345    #[cfg(feature = "std")]
346    /// Returns the attached source error when one has been provided.
347    pub fn source(&self) -> Option<&(dyn StdError + Send + Sync + 'static)> {
348        self.source.as_deref()
349    }
350}
351
352impl fmt::Display for NodeError {
353    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
354        write!(f, "{}: {}", self.code, self.message)
355    }
356}
357
358#[cfg(feature = "std")]
359impl StdError for NodeError {
360    fn source(&self) -> Option<&(dyn StdError + 'static)> {
361        self.source
362            .as_ref()
363            .map(|err| err.as_ref() as &(dyn StdError + 'static))
364    }
365}
366
367/// Alias for results returned by node handlers.
368pub type NodeResult<T> = Result<T, NodeError>;
369
370/// Generates a stable idempotency key for a node invocation.
371///
372/// The key uses tenant, flow, node, and correlation identifiers. Missing
373/// correlation values fall back to the value stored on the context.
374pub fn make_idempotency_key(
375    ctx: &TenantCtx,
376    flow_id: &str,
377    node_id: Option<&str>,
378    correlation: Option<&str>,
379) -> String {
380    let node_segment = node_id.unwrap_or_default();
381    let correlation_segment = correlation
382        .or(ctx.correlation_id.as_deref())
383        .unwrap_or_default();
384    let input = format!(
385        "{}|{}|{}|{}",
386        ctx.tenant_id.as_str(),
387        flow_id,
388        node_segment,
389        correlation_segment
390    );
391    fnv1a_128_hex(input.as_bytes())
392}
393
394const FNV_OFFSET_BASIS: u128 = 0x6c62272e07bb014262b821756295c58d;
395const FNV_PRIME: u128 = 0x0000000001000000000000000000013b;
396
397fn fnv1a_128_hex(bytes: &[u8]) -> String {
398    let mut hash = FNV_OFFSET_BASIS;
399    for &byte in bytes {
400        hash ^= byte as u128;
401        hash = hash.wrapping_mul(FNV_PRIME);
402    }
403
404    let mut output = String::with_capacity(32);
405    for shift in (0..32).rev() {
406        let nibble = ((hash >> (shift * 4)) & 0x0f) as u8;
407        output.push(match nibble {
408            0..=9 => (b'0' + nibble) as char,
409            _ => (b'a' + (nibble - 10)) as char,
410        });
411    }
412    output
413}
414
415#[cfg(test)]
416mod tests {
417    use super::*;
418    use time::OffsetDateTime;
419
420    fn sample_ctx() -> TenantCtx {
421        let mut ctx = TenantCtx::new(EnvId::from("prod"), TenantId::from("tenant-123"))
422            .with_team(Some(TeamId::from("team-456")))
423            .with_user(Some(UserId::from("user-789")))
424            .with_attempt(2)
425            .with_deadline(Some(InvocationDeadline::from_unix_millis(
426                1_700_000_000_000,
427            )));
428        ctx.trace_id = Some("trace-abc".to_owned());
429        ctx.correlation_id = Some("corr-xyz".to_owned());
430        ctx.idempotency_key = Some("key-123".to_owned());
431        ctx
432    }
433
434    #[test]
435    fn idempotent_key_stable() {
436        let ctx = sample_ctx();
437        let key_a = make_idempotency_key(&ctx, "flow-1", Some("node-1"), Some("corr-override"));
438        let key_b = make_idempotency_key(&ctx, "flow-1", Some("node-1"), Some("corr-override"));
439        assert_eq!(key_a, key_b);
440        assert_eq!(key_a.len(), 32);
441    }
442
443    #[test]
444    fn idempotent_key_uses_context_correlation() {
445        let ctx = sample_ctx();
446        let key = make_idempotency_key(&ctx, "flow-1", None, None);
447        let expected = make_idempotency_key(&ctx, "flow-1", None, ctx.correlation_id.as_deref());
448        assert_eq!(key, expected);
449    }
450
451    #[test]
452    #[cfg(feature = "time")]
453    fn deadline_roundtrips_through_offset_datetime() {
454        let dt = OffsetDateTime::from_unix_timestamp(1_700_000_000)
455            .unwrap_or_else(|err| panic!("valid timestamp: {err}"));
456        let deadline = InvocationDeadline::from_offset_date_time(dt);
457        let roundtrip = deadline
458            .to_offset_date_time()
459            .unwrap_or_else(|err| panic!("round-trip conversion failed: {err}"));
460        let millis = dt.unix_timestamp_nanos() / 1_000_000;
461        assert_eq!(deadline.unix_millis(), millis);
462        assert_eq!(roundtrip.unix_timestamp_nanos() / 1_000_000, millis);
463    }
464
465    #[test]
466    fn node_error_builder_sets_fields() {
467        let err = NodeError::new("TEST", "example")
468            .with_retry(Some(500))
469            .with_detail_text("context");
470
471        assert!(err.retryable);
472        assert_eq!(err.backoff_ms, Some(500));
473        match err.detail() {
474            Some(ErrorDetail::Text(detail)) => assert_eq!(detail, "context"),
475            other => panic!("unexpected detail {:?}", other),
476        }
477    }
478
479    #[cfg(feature = "std")]
480    #[test]
481    fn node_error_source_roundtrips() {
482        use std::io::Error;
483
484        let source = Error::other("boom");
485        let err = NodeError::new("TEST", "example").with_source(source);
486        assert!(err.source().is_some());
487    }
488}