greentic_runner_host/runner/
invocation.rs1use anyhow::{Context, Result};
2use greentic_types::{EnvId, InvocationEnvelope, TenantCtx, TenantId};
3use serde_json::Value;
4use std::str::FromStr;
5
6#[derive(Clone, Copy)]
10pub struct InvocationMeta<'a> {
11 pub env: &'a str,
12 pub tenant: &'a str,
13 pub flow_id: &'a str,
14 pub node_id: Option<&'a str>,
15 pub provider_id: Option<&'a str>,
16 pub session_id: Option<&'a str>,
17 pub attempt: u32,
18}
19
20pub fn build_invocation_envelope(
21 meta: InvocationMeta<'_>,
22 operation: &str,
23 payload: Value,
24) -> Result<InvocationEnvelope> {
25 let parsed = InvocationPayload::parse(payload);
26 let env_id = EnvId::from_str(meta.env)
27 .unwrap_or_else(|_| EnvId::from_str("local").expect("local env id is valid"));
28 let tenant_id = TenantId::from_str(meta.tenant).unwrap_or_else(|_| {
29 tracing::warn!(
30 tenant = meta.tenant,
31 "invalid tenant id in invocation envelope, falling back to tenant.default"
32 );
33 TenantId::from_str("tenant.default").expect("tenant fallback must be valid")
34 });
35 let mut ctx = TenantCtx::new(env_id, tenant_id).with_flow(meta.flow_id.to_string());
36 if let Some(provider) = meta.provider_id {
37 ctx = ctx.with_provider(provider.to_string());
38 }
39 if let Some(session) = meta.session_id {
40 ctx = ctx.with_session(session.to_string());
41 }
42 if let Some(node) = meta.node_id {
43 ctx = ctx.with_node(node.to_string());
44 }
45 ctx = ctx.with_attempt(meta.attempt);
46
47 let payload_bytes =
48 to_binary_payload(&parsed.payload).context("serialize payload for invocation envelope")?;
49 let metadata_bytes = to_binary_payload(&parsed.metadata)
50 .context("serialize metadata for invocation envelope")?;
51
52 let op = parsed.op.unwrap_or_else(|| operation.to_string());
53 tracing::trace!(
54 flow_id = %meta.flow_id,
55 node_id = ?meta.node_id,
56 op = %op,
57 payload = %parsed.payload,
58 "built invocation envelope; runtime owns ctx; flows must not embed ctx"
59 );
60
61 Ok(InvocationEnvelope {
62 ctx,
63 flow_id: meta.flow_id.to_string(),
64 node_id: meta.node_id.map(str::to_string),
65 op,
66 payload: payload_bytes,
67 metadata: metadata_bytes,
68 })
69}
70
71fn to_binary_payload(value: &Value) -> Result<Vec<u8>> {
72 serde_json::to_vec(value).context("failed to encode payload for envelope")
73}
74
75struct InvocationPayload {
76 op: Option<String>,
77 payload: Value,
78 metadata: Value,
79}
80
81impl InvocationPayload {
82 fn parse(value: Value) -> Self {
83 if let Value::Object(mut map) = value {
84 if let Some(envelope) = map.remove("envelope") {
85 return Self::from_envelope(envelope);
86 }
87 let op = extract_string(map.remove("op").or_else(|| map.remove("operation")));
91 let payload = map
92 .remove("payload")
93 .or_else(|| map.remove("input"))
94 .unwrap_or(Value::Null);
95 let metadata = map
96 .remove("metadata")
97 .or_else(|| map.remove("config"))
98 .unwrap_or(Value::Null);
99 if op.is_some() || !payload.is_null() || !metadata.is_null() {
100 if payload.is_null() && !map.is_empty() {
101 return Self {
102 op,
103 payload: Value::Object(map),
104 metadata,
105 };
106 }
107 return Self {
108 op,
109 payload,
110 metadata,
111 };
112 }
113 return Self {
114 op: None,
115 payload: Value::Object(map),
116 metadata: Value::Null,
117 };
118 }
119 InvocationPayload {
120 op: None,
121 payload: value,
122 metadata: Value::Null,
123 }
124 }
125
126 fn from_envelope(value: Value) -> Self {
127 if let Value::Object(map) = value {
128 let op = extract_string(map.get("op").cloned());
129 let payload = map.get("payload").cloned().unwrap_or(Value::Null);
130 let metadata = map.get("metadata").cloned().unwrap_or(Value::Null);
131 return Self {
132 op,
133 payload,
134 metadata,
135 };
136 }
137 InvocationPayload {
138 op: None,
139 payload: value,
140 metadata: Value::Null,
141 }
142 }
143}
144
145fn extract_string(value: Option<Value>) -> Option<String> {
146 value.and_then(|v| v.as_str().map(|s| s.to_string()))
147}