1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::component_api::component::greentic::component::control::Host as ComponentControlHost;
10use crate::component_api::{
11 ComponentPre, control, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
12};
13use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
14use crate::provider::{ProviderBinding, ProviderRegistry};
15use crate::provider_core::SchemaCorePre as ProviderComponentPre;
16use crate::provider_core_only;
17use crate::runtime_wasmtime::{Component, Engine, Linker, ResourceTable};
18use anyhow::{Context, Result, anyhow, bail};
19use greentic_interfaces_wasmtime::host_helpers::v1::{
20 self as host_v1, HostFns, add_all_v1_to_linker,
21 runner_host_http::RunnerHostHttp,
22 runner_host_kv::RunnerHostKv,
23 secrets_store::{SecretsError, SecretsStoreHost},
24 state_store::{
25 OpAck as StateOpAck, StateKey as HostStateKey, StateStoreError as StateError,
26 StateStoreHost, TenantCtx as StateTenantCtx,
27 },
28 telemetry_logger::{
29 OpAck as TelemetryAck, SpanContext as TelemetrySpanContext,
30 TelemetryLoggerError as TelemetryError, TelemetryLoggerHost,
31 TenantCtx as TelemetryTenantCtx,
32 },
33};
34use greentic_interfaces_wasmtime::http_client_client_v1_0::greentic::interfaces_types::types::Impersonation as ImpersonationV1_0;
35use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::interfaces_types::types::Impersonation as ImpersonationV1_1;
36use greentic_pack::builder as legacy_pack;
37use greentic_types::flow::FlowHasher;
38use greentic_types::{
39 ComponentId, EnvId, ExtensionRef, Flow, FlowComponentRef, FlowId, FlowKind, FlowMetadata,
40 InputMapping, Node, NodeId, OutputMapping, Routing, StateKey as StoreStateKey, TeamId,
41 TelemetryHints, TenantCtx as TypesTenantCtx, TenantId, UserId, decode_pack_manifest,
42 pack_manifest::ExtensionInline,
43};
44use host_v1::http_client::{
45 HttpClientError, HttpClientErrorV1_1, HttpClientHost, HttpClientHostV1_1,
46 Request as HttpRequest, RequestOptionsV1_1 as HttpRequestOptionsV1_1,
47 RequestV1_1 as HttpRequestV1_1, Response as HttpResponse, ResponseV1_1 as HttpResponseV1_1,
48 TenantCtx as HttpTenantCtx, TenantCtxV1_1 as HttpTenantCtxV1_1,
49};
50use indexmap::IndexMap;
51use once_cell::sync::Lazy;
52use parking_lot::{Mutex, RwLock};
53use reqwest::blocking::Client as BlockingClient;
54use runner_core::normalize_under_root;
55use serde::{Deserialize, Serialize};
56use serde_cbor;
57use serde_json::{self, Value};
58use tokio::fs;
59use wasmparser::{Parser, Payload};
60use wasmtime::StoreContextMut;
61use zip::ZipArchive;
62
63use crate::runner::engine::{FlowContext, FlowEngine, FlowStatus};
64use crate::runner::flow_adapter::{FlowIR, flow_doc_to_ir, flow_ir_to_flow};
65use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
66
67use crate::config::HostConfig;
68use crate::secrets::{DynSecretsManager, read_secret_blocking};
69use crate::storage::state::STATE_PREFIX;
70use crate::storage::{DynSessionStore, DynStateStore};
71use crate::verify;
72use crate::wasi::RunnerWasiPolicy;
73use tracing::warn;
74use wasmtime_wasi::p2::add_to_linker_sync as add_wasi_to_linker;
75use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
76
77use greentic_flow::model::FlowDoc;
78
79#[allow(dead_code)]
80pub struct PackRuntime {
81 path: PathBuf,
83 archive_path: Option<PathBuf>,
85 config: Arc<HostConfig>,
86 engine: Engine,
87 metadata: PackMetadata,
88 manifest: Option<greentic_types::PackManifest>,
89 legacy_manifest: Option<Box<legacy_pack::PackManifest>>,
90 mocks: Option<Arc<MockLayer>>,
91 flows: Option<PackFlows>,
92 components: HashMap<String, PackComponent>,
93 http_client: Arc<BlockingClient>,
94 pre_cache: Mutex<HashMap<String, ComponentPre<ComponentState>>>,
95 session_store: Option<DynSessionStore>,
96 state_store: Option<DynStateStore>,
97 wasi_policy: Arc<RunnerWasiPolicy>,
98 provider_registry: RwLock<Option<ProviderRegistry>>,
99 secrets: DynSecretsManager,
100 oauth_config: Option<OAuthBrokerConfig>,
101}
102
103struct PackComponent {
104 #[allow(dead_code)]
105 name: String,
106 #[allow(dead_code)]
107 version: String,
108 component: Component,
109}
110
111#[derive(Debug, Default, Clone)]
112pub struct ComponentResolution {
113 pub materialized_root: Option<PathBuf>,
115 pub overrides: HashMap<String, PathBuf>,
117}
118
119fn build_blocking_client() -> BlockingClient {
120 std::thread::spawn(|| {
121 BlockingClient::builder()
122 .no_proxy()
123 .build()
124 .expect("blocking client")
125 })
126 .join()
127 .expect("client build thread panicked")
128}
129
130fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
131 let (root, candidate) = if path.is_absolute() {
132 let parent = path
133 .parent()
134 .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
135 let root = parent
136 .canonicalize()
137 .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
138 let file = path
139 .file_name()
140 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
141 (root, PathBuf::from(file))
142 } else {
143 let cwd = std::env::current_dir().context("failed to resolve current directory")?;
144 let base = if let Some(parent) = path.parent() {
145 cwd.join(parent)
146 } else {
147 cwd
148 };
149 let root = base
150 .canonicalize()
151 .with_context(|| format!("failed to canonicalize {}", base.display()))?;
152 let file = path
153 .file_name()
154 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
155 (root, PathBuf::from(file))
156 };
157 let safe = normalize_under_root(&root, &candidate)?;
158 Ok((root, safe))
159}
160
161static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
162
163#[derive(Debug, Clone, Serialize, Deserialize)]
164pub struct FlowDescriptor {
165 pub id: String,
166 #[serde(rename = "type")]
167 pub flow_type: String,
168 pub profile: String,
169 pub version: String,
170 #[serde(default)]
171 pub description: Option<String>,
172}
173
174pub struct HostState {
175 config: Arc<HostConfig>,
176 http_client: Arc<BlockingClient>,
177 default_env: String,
178 #[allow(dead_code)]
179 session_store: Option<DynSessionStore>,
180 state_store: Option<DynStateStore>,
181 mocks: Option<Arc<MockLayer>>,
182 secrets: DynSecretsManager,
183 oauth_config: Option<OAuthBrokerConfig>,
184 oauth_host: OAuthBrokerHost,
185}
186
187impl HostState {
188 #[allow(clippy::default_constructed_unit_structs)]
189 pub fn new(
190 config: Arc<HostConfig>,
191 http_client: Arc<BlockingClient>,
192 mocks: Option<Arc<MockLayer>>,
193 session_store: Option<DynSessionStore>,
194 state_store: Option<DynStateStore>,
195 secrets: DynSecretsManager,
196 oauth_config: Option<OAuthBrokerConfig>,
197 ) -> Result<Self> {
198 let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
199 Ok(Self {
200 config,
201 http_client,
202 default_env,
203 session_store,
204 state_store,
205 mocks,
206 secrets,
207 oauth_config,
208 oauth_host: OAuthBrokerHost::default(),
209 })
210 }
211
212 pub fn get_secret(&self, key: &str) -> Result<String> {
213 if provider_core_only::is_enabled() {
214 bail!(provider_core_only::blocked_message("secrets"))
215 }
216 if !self.config.secrets_policy.is_allowed(key) {
217 bail!("secret {key} is not permitted by bindings policy");
218 }
219 if let Some(mock) = &self.mocks
220 && let Some(value) = mock.secrets_lookup(key)
221 {
222 return Ok(value);
223 }
224 let bytes = read_secret_blocking(&self.secrets, key)
225 .context("failed to read secret from manager")?;
226 let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
227 Ok(value)
228 }
229
230 fn tenant_ctx_from_v1(&self, ctx: Option<StateTenantCtx>) -> Result<TypesTenantCtx> {
231 let tenant_raw = ctx
232 .as_ref()
233 .map(|ctx| ctx.tenant.clone())
234 .unwrap_or_else(|| self.config.tenant.clone());
235 let env_raw = ctx
236 .as_ref()
237 .map(|ctx| ctx.env.clone())
238 .unwrap_or_else(|| self.default_env.clone());
239 let tenant_id = TenantId::from_str(&tenant_raw)
240 .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
241 let env_id = EnvId::from_str(&env_raw)
242 .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
243 let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
244 if let Some(ctx) = ctx {
245 if let Some(team) = ctx.team.or(ctx.team_id) {
246 let team_id =
247 TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
248 tenant_ctx = tenant_ctx.with_team(Some(team_id));
249 }
250 if let Some(user) = ctx.user.or(ctx.user_id) {
251 let user_id =
252 UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
253 tenant_ctx = tenant_ctx.with_user(Some(user_id));
254 }
255 if let Some(flow) = ctx.flow_id {
256 tenant_ctx = tenant_ctx.with_flow(flow);
257 }
258 if let Some(node) = ctx.node_id {
259 tenant_ctx = tenant_ctx.with_node(node);
260 }
261 if let Some(provider) = ctx.provider_id {
262 tenant_ctx = tenant_ctx.with_provider(provider);
263 }
264 if let Some(session) = ctx.session_id {
265 tenant_ctx = tenant_ctx.with_session(session);
266 }
267 tenant_ctx.trace_id = ctx.trace_id;
268 }
269 Ok(tenant_ctx)
270 }
271
272 fn send_http_request(
273 &mut self,
274 req: HttpRequest,
275 opts: Option<HttpRequestOptionsV1_1>,
276 _ctx: Option<HttpTenantCtx>,
277 ) -> Result<HttpResponse, HttpClientError> {
278 if !self.config.http_enabled {
279 return Err(HttpClientError {
280 code: "denied".into(),
281 message: "http client disabled by policy".into(),
282 });
283 }
284
285 let mut mock_state = None;
286 let raw_body = req.body.clone();
287 if let Some(mock) = &self.mocks
288 && let Ok(meta) = HttpMockRequest::new(&req.method, &req.url, raw_body.as_deref())
289 {
290 match mock.http_begin(&meta) {
291 HttpDecision::Mock(response) => {
292 let headers = response
293 .headers
294 .iter()
295 .map(|(k, v)| (k.clone(), v.clone()))
296 .collect();
297 return Ok(HttpResponse {
298 status: response.status,
299 headers,
300 body: response.body.clone().map(|b| b.into_bytes()),
301 });
302 }
303 HttpDecision::Deny(reason) => {
304 return Err(HttpClientError {
305 code: "denied".into(),
306 message: reason,
307 });
308 }
309 HttpDecision::Passthrough { record } => {
310 mock_state = Some((meta, record));
311 }
312 }
313 }
314
315 let method = req.method.parse().unwrap_or(reqwest::Method::GET);
316 let mut builder = self.http_client.request(method, &req.url);
317 for (key, value) in req.headers {
318 if let Ok(header) = reqwest::header::HeaderName::from_bytes(key.as_bytes())
319 && let Ok(header_value) = reqwest::header::HeaderValue::from_str(&value)
320 {
321 builder = builder.header(header, header_value);
322 }
323 }
324
325 if let Some(body) = raw_body.clone() {
326 builder = builder.body(body);
327 }
328
329 if let Some(opts) = opts {
330 if let Some(timeout_ms) = opts.timeout_ms {
331 builder = builder.timeout(Duration::from_millis(timeout_ms as u64));
332 }
333 if opts.allow_insecure == Some(true) {
334 warn!(url = %req.url, "allow-insecure not supported; using default TLS validation");
335 }
336 if let Some(follow_redirects) = opts.follow_redirects
337 && !follow_redirects
338 {
339 warn!(url = %req.url, "follow-redirects=false not supported; using default client behaviour");
340 }
341 }
342
343 let response = match builder.send() {
344 Ok(resp) => resp,
345 Err(err) => {
346 warn!(url = %req.url, error = %err, "http client request failed");
347 return Err(HttpClientError {
348 code: "unavailable".into(),
349 message: err.to_string(),
350 });
351 }
352 };
353
354 let status = response.status().as_u16();
355 let headers_vec = response
356 .headers()
357 .iter()
358 .map(|(k, v)| {
359 (
360 k.as_str().to_string(),
361 v.to_str().unwrap_or_default().to_string(),
362 )
363 })
364 .collect::<Vec<_>>();
365 let body_bytes = response.bytes().ok().map(|b| b.to_vec());
366
367 if let Some((meta, true)) = mock_state.take()
368 && let Some(mock) = &self.mocks
369 {
370 let recorded = HttpMockResponse::new(
371 status,
372 headers_vec.clone().into_iter().collect(),
373 body_bytes
374 .as_ref()
375 .map(|b| String::from_utf8_lossy(b).into_owned()),
376 );
377 mock.http_record(&meta, &recorded);
378 }
379
380 Ok(HttpResponse {
381 status,
382 headers: headers_vec,
383 body: body_bytes,
384 })
385 }
386}
387
388impl SecretsStoreHost for HostState {
389 fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsError> {
390 if provider_core_only::is_enabled() {
391 warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
392 return Err(SecretsError::Denied);
393 }
394 if !self.config.secrets_policy.is_allowed(&key) {
395 return Err(SecretsError::Denied);
396 }
397 if let Some(mock) = &self.mocks
398 && let Some(value) = mock.secrets_lookup(&key)
399 {
400 return Ok(Some(value.into_bytes()));
401 }
402 match read_secret_blocking(&self.secrets, &key) {
403 Ok(bytes) => Ok(Some(bytes)),
404 Err(err) => {
405 warn!(secret = %key, error = %err, "secret lookup failed");
406 Err(SecretsError::NotFound)
407 }
408 }
409 }
410}
411
412impl HttpClientHost for HostState {
413 fn send(
414 &mut self,
415 req: HttpRequest,
416 ctx: Option<HttpTenantCtx>,
417 ) -> Result<HttpResponse, HttpClientError> {
418 self.send_http_request(req, None, ctx)
419 }
420}
421
422impl HttpClientHostV1_1 for HostState {
423 fn send(
424 &mut self,
425 req: HttpRequestV1_1,
426 opts: Option<HttpRequestOptionsV1_1>,
427 ctx: Option<HttpTenantCtxV1_1>,
428 ) -> Result<HttpResponseV1_1, HttpClientErrorV1_1> {
429 let legacy_req = HttpRequest {
430 method: req.method,
431 url: req.url,
432 headers: req.headers,
433 body: req.body,
434 };
435 let legacy_ctx = ctx.map(|ctx| HttpTenantCtx {
436 env: ctx.env,
437 tenant: ctx.tenant,
438 tenant_id: ctx.tenant_id,
439 team: ctx.team,
440 team_id: ctx.team_id,
441 user: ctx.user,
442 user_id: ctx.user_id,
443 trace_id: ctx.trace_id,
444 correlation_id: ctx.correlation_id,
445 attributes: ctx.attributes,
446 session_id: ctx.session_id,
447 flow_id: ctx.flow_id,
448 node_id: ctx.node_id,
449 provider_id: ctx.provider_id,
450 deadline_ms: ctx.deadline_ms,
451 attempt: ctx.attempt,
452 idempotency_key: ctx.idempotency_key,
453 impersonation: ctx
454 .impersonation
455 .map(|ImpersonationV1_1 { actor_id, reason }| ImpersonationV1_0 {
456 actor_id,
457 reason,
458 }),
459 });
460
461 self.send_http_request(legacy_req, opts, legacy_ctx)
462 .map(|resp| HttpResponseV1_1 {
463 status: resp.status,
464 headers: resp.headers,
465 body: resp.body,
466 })
467 .map_err(|err| HttpClientErrorV1_1 {
468 code: err.code,
469 message: err.message,
470 })
471 }
472}
473
474impl StateStoreHost for HostState {
475 fn read(
476 &mut self,
477 key: HostStateKey,
478 ctx: Option<StateTenantCtx>,
479 ) -> Result<Vec<u8>, StateError> {
480 let store = match self.state_store.as_ref() {
481 Some(store) => store.clone(),
482 None => {
483 return Err(StateError {
484 code: "unavailable".into(),
485 message: "state store not configured".into(),
486 });
487 }
488 };
489 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
490 Ok(ctx) => ctx,
491 Err(err) => {
492 return Err(StateError {
493 code: "invalid-ctx".into(),
494 message: err.to_string(),
495 });
496 }
497 };
498 let key = StoreStateKey::from(key);
499 match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
500 Ok(Some(value)) => Ok(serde_json::to_vec(&value).unwrap_or_else(|_| Vec::new())),
501 Ok(None) => Err(StateError {
502 code: "not_found".into(),
503 message: "state key not found".into(),
504 }),
505 Err(err) => Err(StateError {
506 code: "internal".into(),
507 message: err.to_string(),
508 }),
509 }
510 }
511
512 fn write(
513 &mut self,
514 key: HostStateKey,
515 bytes: Vec<u8>,
516 ctx: Option<StateTenantCtx>,
517 ) -> Result<StateOpAck, StateError> {
518 let store = match self.state_store.as_ref() {
519 Some(store) => store.clone(),
520 None => {
521 return Err(StateError {
522 code: "unavailable".into(),
523 message: "state store not configured".into(),
524 });
525 }
526 };
527 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
528 Ok(ctx) => ctx,
529 Err(err) => {
530 return Err(StateError {
531 code: "invalid-ctx".into(),
532 message: err.to_string(),
533 });
534 }
535 };
536 let key = StoreStateKey::from(key);
537 let value = serde_json::from_slice(&bytes)
538 .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()));
539 match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
540 Ok(()) => Ok(StateOpAck::Ok),
541 Err(err) => Err(StateError {
542 code: "internal".into(),
543 message: err.to_string(),
544 }),
545 }
546 }
547
548 fn delete(
549 &mut self,
550 key: HostStateKey,
551 ctx: Option<StateTenantCtx>,
552 ) -> Result<StateOpAck, StateError> {
553 let store = match self.state_store.as_ref() {
554 Some(store) => store.clone(),
555 None => {
556 return Err(StateError {
557 code: "unavailable".into(),
558 message: "state store not configured".into(),
559 });
560 }
561 };
562 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
563 Ok(ctx) => ctx,
564 Err(err) => {
565 return Err(StateError {
566 code: "invalid-ctx".into(),
567 message: err.to_string(),
568 });
569 }
570 };
571 let key = StoreStateKey::from(key);
572 match store.del(&tenant_ctx, STATE_PREFIX, &key) {
573 Ok(_) => Ok(StateOpAck::Ok),
574 Err(err) => Err(StateError {
575 code: "internal".into(),
576 message: err.to_string(),
577 }),
578 }
579 }
580}
581
582impl TelemetryLoggerHost for HostState {
583 fn log(
584 &mut self,
585 span: TelemetrySpanContext,
586 fields: Vec<(String, String)>,
587 _ctx: Option<TelemetryTenantCtx>,
588 ) -> Result<TelemetryAck, TelemetryError> {
589 if let Some(mock) = &self.mocks
590 && mock.telemetry_drain(&[("span_json", span.flow_id.as_str())])
591 {
592 return Ok(TelemetryAck::Ok);
593 }
594 let mut map = serde_json::Map::new();
595 for (k, v) in fields {
596 map.insert(k, Value::String(v));
597 }
598 tracing::info!(
599 tenant = %span.tenant,
600 flow_id = %span.flow_id,
601 node = ?span.node_id,
602 provider = %span.provider,
603 fields = %serde_json::Value::Object(map.clone()),
604 "telemetry log from pack"
605 );
606 Ok(TelemetryAck::Ok)
607 }
608}
609
610impl RunnerHostHttp for HostState {
611 fn request(
612 &mut self,
613 method: String,
614 url: String,
615 headers: Vec<String>,
616 body: Option<Vec<u8>>,
617 ) -> Result<Vec<u8>, String> {
618 let req = HttpRequest {
619 method,
620 url,
621 headers: headers
622 .chunks(2)
623 .filter_map(|chunk| {
624 if chunk.len() == 2 {
625 Some((chunk[0].clone(), chunk[1].clone()))
626 } else {
627 None
628 }
629 })
630 .collect(),
631 body,
632 };
633 match HttpClientHost::send(self, req, None) {
634 Ok(resp) => Ok(resp.body.unwrap_or_default()),
635 Err(err) => Err(err.message),
636 }
637 }
638}
639
640impl RunnerHostKv for HostState {
641 fn get(&mut self, _ns: String, _key: String) -> Option<String> {
642 None
643 }
644
645 fn put(&mut self, _ns: String, _key: String, _val: String) {}
646}
647
648enum ManifestLoad {
649 New {
650 manifest: Box<greentic_types::PackManifest>,
651 flows: PackFlows,
652 },
653 Legacy {
654 manifest: Box<legacy_pack::PackManifest>,
655 flows: PackFlows,
656 },
657}
658
659fn load_manifest_and_flows(path: &Path) -> Result<ManifestLoad> {
660 let mut archive = ZipArchive::new(File::open(path)?)
661 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
662 let bytes = read_entry(&mut archive, "manifest.cbor")
663 .with_context(|| format!("missing manifest.cbor in {}", path.display()))?;
664 match decode_pack_manifest(&bytes) {
665 Ok(manifest) => {
666 let cache = PackFlows::from_manifest(manifest.clone());
667 Ok(ManifestLoad::New {
668 manifest: Box::new(manifest),
669 flows: cache,
670 })
671 }
672 Err(err) => {
673 tracing::debug!(error = %err, pack = %path.display(), "decode_pack_manifest failed; trying legacy manifest");
674 let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
676 .context("failed to decode legacy pack manifest from manifest.cbor")?;
677 let flows = load_legacy_flows(&mut archive, &legacy)?;
678 Ok(ManifestLoad::Legacy {
679 manifest: Box::new(legacy),
680 flows,
681 })
682 }
683 }
684}
685
686fn load_manifest_and_flows_from_dir(root: &Path) -> Result<ManifestLoad> {
687 let manifest_path = root.join("manifest.cbor");
688 let bytes = std::fs::read(&manifest_path)
689 .with_context(|| format!("missing manifest.cbor in {}", root.display()))?;
690 match decode_pack_manifest(&bytes) {
691 Ok(manifest) => {
692 let cache = PackFlows::from_manifest(manifest.clone());
693 Ok(ManifestLoad::New {
694 manifest: Box::new(manifest),
695 flows: cache,
696 })
697 }
698 Err(err) => {
699 tracing::debug!(
700 error = %err,
701 pack = %root.display(),
702 "decode_pack_manifest failed for materialized pack; trying legacy manifest"
703 );
704 let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
705 .context("failed to decode legacy pack manifest from manifest.cbor")?;
706 let flows = load_legacy_flows_from_dir(root, &legacy)?;
707 Ok(ManifestLoad::Legacy {
708 manifest: Box::new(legacy),
709 flows,
710 })
711 }
712 }
713}
714
715fn load_legacy_flows(
716 archive: &mut ZipArchive<File>,
717 manifest: &legacy_pack::PackManifest,
718) -> Result<PackFlows> {
719 let mut flows = HashMap::new();
720 let mut descriptors = Vec::new();
721
722 for entry in &manifest.flows {
723 let bytes = read_entry(archive, &entry.file_json)
724 .with_context(|| format!("missing flow json {}", entry.file_json))?;
725 let doc: FlowDoc = serde_json::from_slice(&bytes)
726 .with_context(|| format!("failed to decode flow doc {}", entry.file_json))?;
727 let normalized = normalize_flow_doc(doc);
728 let flow_ir = flow_doc_to_ir(normalized)?;
729 let flow = flow_ir_to_flow(flow_ir)?;
730
731 descriptors.push(FlowDescriptor {
732 id: entry.id.clone(),
733 flow_type: entry.kind.clone(),
734 profile: manifest.meta.pack_id.clone(),
735 version: manifest.meta.version.to_string(),
736 description: None,
737 });
738 flows.insert(entry.id.clone(), flow);
739 }
740
741 let mut entry_flows = manifest.meta.entry_flows.clone();
742 if entry_flows.is_empty() {
743 entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
744 }
745 let metadata = PackMetadata {
746 pack_id: manifest.meta.pack_id.clone(),
747 version: manifest.meta.version.to_string(),
748 entry_flows,
749 secret_requirements: Vec::new(),
750 };
751
752 Ok(PackFlows {
753 descriptors,
754 flows,
755 metadata,
756 })
757}
758
759fn load_legacy_flows_from_dir(
760 root: &Path,
761 manifest: &legacy_pack::PackManifest,
762) -> Result<PackFlows> {
763 let mut flows = HashMap::new();
764 let mut descriptors = Vec::new();
765
766 for entry in &manifest.flows {
767 let path = root.join(&entry.file_json);
768 let bytes = std::fs::read(&path)
769 .with_context(|| format!("missing flow json {}", path.display()))?;
770 let doc: FlowDoc = serde_json::from_slice(&bytes)
771 .with_context(|| format!("failed to decode flow doc {}", path.display()))?;
772 let normalized = normalize_flow_doc(doc);
773 let flow_ir = flow_doc_to_ir(normalized)?;
774 let flow = flow_ir_to_flow(flow_ir)?;
775
776 descriptors.push(FlowDescriptor {
777 id: entry.id.clone(),
778 flow_type: entry.kind.clone(),
779 profile: manifest.meta.pack_id.clone(),
780 version: manifest.meta.version.to_string(),
781 description: None,
782 });
783 flows.insert(entry.id.clone(), flow);
784 }
785
786 let mut entry_flows = manifest.meta.entry_flows.clone();
787 if entry_flows.is_empty() {
788 entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
789 }
790 let metadata = PackMetadata {
791 pack_id: manifest.meta.pack_id.clone(),
792 version: manifest.meta.version.to_string(),
793 entry_flows,
794 secret_requirements: Vec::new(),
795 };
796
797 Ok(PackFlows {
798 descriptors,
799 flows,
800 metadata,
801 })
802}
803
804pub struct ComponentState {
805 pub host: HostState,
806 wasi_ctx: WasiCtx,
807 resource_table: ResourceTable,
808}
809
810impl ComponentState {
811 pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
812 let wasi_ctx = policy
813 .instantiate()
814 .context("failed to build WASI context")?;
815 Ok(Self {
816 host,
817 wasi_ctx,
818 resource_table: ResourceTable::new(),
819 })
820 }
821
822 fn host_mut(&mut self) -> &mut HostState {
823 &mut self.host
824 }
825}
826
827impl control::Host for ComponentState {
828 fn should_cancel(&mut self) -> bool {
829 false
830 }
831
832 fn yield_now(&mut self) {
833 }
835}
836
837fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
838 let mut inst = linker.instance("greentic:component/control@0.4.0")?;
839 inst.func_wrap(
840 "should-cancel",
841 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
842 let host = caller.data_mut();
843 Ok((ComponentControlHost::should_cancel(host),))
844 },
845 )?;
846 inst.func_wrap(
847 "yield-now",
848 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
849 let host = caller.data_mut();
850 ComponentControlHost::yield_now(host);
851 Ok(())
852 },
853 )?;
854 Ok(())
855}
856
857pub fn register_all(linker: &mut Linker<ComponentState>) -> Result<()> {
858 add_wasi_to_linker(linker)?;
859 add_all_v1_to_linker(
860 linker,
861 HostFns {
862 http_client_v1_1: Some(|state| state.host_mut()),
863 http_client: Some(|state| state.host_mut()),
864 oauth_broker: None,
865 runner_host_http: Some(|state| state.host_mut()),
866 runner_host_kv: Some(|state| state.host_mut()),
867 telemetry_logger: Some(|state| state.host_mut()),
868 state_store: Some(|state| state.host_mut()),
869 secrets_store: Some(|state| state.host_mut()),
870 },
871 )?;
872 Ok(())
873}
874
875impl OAuthHostContext for ComponentState {
876 fn tenant_id(&self) -> &str {
877 &self.host.config.tenant
878 }
879
880 fn env(&self) -> &str {
881 &self.host.default_env
882 }
883
884 fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
885 &mut self.host.oauth_host
886 }
887
888 fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
889 self.host.oauth_config.as_ref()
890 }
891}
892
893impl WasiView for ComponentState {
894 fn ctx(&mut self) -> WasiCtxView<'_> {
895 WasiCtxView {
896 ctx: &mut self.wasi_ctx,
897 table: &mut self.resource_table,
898 }
899 }
900}
901
902#[allow(unsafe_code)]
903unsafe impl Send for ComponentState {}
904#[allow(unsafe_code)]
905unsafe impl Sync for ComponentState {}
906
907impl PackRuntime {
908 #[allow(clippy::too_many_arguments)]
909 pub async fn load(
910 path: impl AsRef<Path>,
911 config: Arc<HostConfig>,
912 mocks: Option<Arc<MockLayer>>,
913 archive_source: Option<&Path>,
914 session_store: Option<DynSessionStore>,
915 state_store: Option<DynStateStore>,
916 wasi_policy: Arc<RunnerWasiPolicy>,
917 secrets: DynSecretsManager,
918 oauth_config: Option<OAuthBrokerConfig>,
919 verify_archive: bool,
920 component_resolution: ComponentResolution,
921 ) -> Result<Self> {
922 let path = path.as_ref();
923 let (_pack_root, safe_path) = normalize_pack_path(path)?;
924 let path_meta = std::fs::metadata(&safe_path).ok();
925 let is_dir = path_meta
926 .as_ref()
927 .map(|meta| meta.is_dir())
928 .unwrap_or(false);
929 let is_component = !is_dir
930 && safe_path
931 .extension()
932 .and_then(|ext| ext.to_str())
933 .map(|ext| ext.eq_ignore_ascii_case("wasm"))
934 .unwrap_or(false);
935 let archive_hint_path = if let Some(source) = archive_source {
936 let (_, normalized) = normalize_pack_path(source)?;
937 Some(normalized)
938 } else if is_component || is_dir {
939 None
940 } else {
941 Some(safe_path.clone())
942 };
943 let archive_hint = archive_hint_path.as_deref();
944 if verify_archive {
945 if let Some(verify_target) = archive_hint.and_then(|p| {
946 std::fs::metadata(p)
947 .ok()
948 .filter(|meta| meta.is_file())
949 .map(|_| p)
950 }) {
951 verify::verify_pack(verify_target).await?;
952 tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
953 } else {
954 tracing::debug!("skipping archive verification (no archive source)");
955 }
956 }
957 let engine = Engine::default();
958 let mut metadata = PackMetadata::fallback(&safe_path);
959 let mut manifest = None;
960 let mut legacy_manifest: Option<Box<legacy_pack::PackManifest>> = None;
961 let mut flows = None;
962 let materialized_root = component_resolution.materialized_root.clone().or_else(|| {
963 if is_dir {
964 Some(safe_path.clone())
965 } else {
966 None
967 }
968 });
969
970 if let Some(root) = materialized_root.as_ref() {
971 match load_manifest_and_flows_from_dir(root) {
972 Ok(ManifestLoad::New {
973 manifest: m,
974 flows: cache,
975 }) => {
976 metadata = cache.metadata.clone();
977 manifest = Some(*m);
978 flows = Some(cache);
979 }
980 Ok(ManifestLoad::Legacy {
981 manifest: m,
982 flows: cache,
983 }) => {
984 metadata = cache.metadata.clone();
985 legacy_manifest = Some(m);
986 flows = Some(cache);
987 }
988 Err(err) => {
989 warn!(error = %err, pack = %root.display(), "failed to parse materialized pack manifest");
990 }
991 }
992 }
993
994 if manifest.is_none()
995 && legacy_manifest.is_none()
996 && let Some(archive_path) = archive_hint
997 {
998 match load_manifest_and_flows(archive_path) {
999 Ok(ManifestLoad::New {
1000 manifest: m,
1001 flows: cache,
1002 }) => {
1003 metadata = cache.metadata.clone();
1004 manifest = Some(*m);
1005 flows = Some(cache);
1006 }
1007 Ok(ManifestLoad::Legacy {
1008 manifest: m,
1009 flows: cache,
1010 }) => {
1011 metadata = cache.metadata.clone();
1012 legacy_manifest = Some(m);
1013 flows = Some(cache);
1014 }
1015 Err(err) => {
1016 warn!(error = %err, pack = %archive_path.display(), "failed to parse pack manifest; skipping flows");
1017 }
1018 }
1019 }
1020 let components = if is_component {
1021 let wasm_bytes = fs::read(&safe_path).await?;
1022 metadata = PackMetadata::from_wasm(&wasm_bytes)
1023 .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
1024 let name = safe_path
1025 .file_stem()
1026 .map(|s| s.to_string_lossy().to_string())
1027 .unwrap_or_else(|| "component".to_string());
1028 let component = Component::from_binary(&engine, &wasm_bytes)?;
1029 let mut map = HashMap::new();
1030 map.insert(
1031 name.clone(),
1032 PackComponent {
1033 name,
1034 version: metadata.version.clone(),
1035 component,
1036 },
1037 );
1038 map
1039 } else {
1040 let specs = component_specs(manifest.as_ref(), legacy_manifest.as_deref());
1041 if specs.is_empty() {
1042 HashMap::new()
1043 } else {
1044 let mut loaded = HashMap::new();
1045 let mut missing: HashSet<String> =
1046 specs.iter().map(|spec| spec.id.clone()).collect();
1047 let mut searched = Vec::new();
1048
1049 if !component_resolution.overrides.is_empty() {
1050 load_components_from_overrides(
1051 &engine,
1052 &component_resolution.overrides,
1053 &specs,
1054 &mut missing,
1055 &mut loaded,
1056 )?;
1057 searched.push("override map".to_string());
1058 }
1059
1060 if let Some(root) = materialized_root.as_ref() {
1061 load_components_from_dir(&engine, root, &specs, &mut missing, &mut loaded)?;
1062 searched.push(format!("components dir {}", root.display()));
1063 }
1064
1065 if let Some(archive_path) = archive_hint {
1066 load_components_from_archive(
1067 &engine,
1068 archive_path,
1069 &specs,
1070 &mut missing,
1071 &mut loaded,
1072 )?;
1073 searched.push(format!("archive {}", archive_path.display()));
1074 }
1075
1076 if !missing.is_empty() {
1077 let missing_list = missing.into_iter().collect::<Vec<_>>().join(", ");
1078 let sources = if searched.is_empty() {
1079 "no component sources".to_string()
1080 } else {
1081 searched.join(", ")
1082 };
1083 bail!(
1084 "components missing: {}; looked in {}",
1085 missing_list,
1086 sources
1087 );
1088 }
1089
1090 loaded
1091 }
1092 };
1093 let http_client = Arc::clone(&HTTP_CLIENT);
1094 Ok(Self {
1095 path: safe_path,
1096 archive_path: archive_hint.map(Path::to_path_buf),
1097 config,
1098 engine,
1099 metadata,
1100 manifest,
1101 legacy_manifest,
1102 mocks,
1103 flows,
1104 components,
1105 http_client,
1106 pre_cache: Mutex::new(HashMap::new()),
1107 session_store,
1108 state_store,
1109 wasi_policy,
1110 provider_registry: RwLock::new(None),
1111 secrets,
1112 oauth_config,
1113 })
1114 }
1115
1116 pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
1117 if let Some(cache) = &self.flows {
1118 return Ok(cache.descriptors.clone());
1119 }
1120 if let Some(manifest) = &self.manifest {
1121 let descriptors = manifest
1122 .flows
1123 .iter()
1124 .map(|flow| FlowDescriptor {
1125 id: flow.id.as_str().to_string(),
1126 flow_type: flow_kind_to_str(flow.kind).to_string(),
1127 profile: manifest.pack_id.as_str().to_string(),
1128 version: manifest.version.to_string(),
1129 description: None,
1130 })
1131 .collect();
1132 return Ok(descriptors);
1133 }
1134 Ok(Vec::new())
1135 }
1136
1137 #[allow(dead_code)]
1138 pub async fn run_flow(
1139 &self,
1140 flow_id: &str,
1141 input: serde_json::Value,
1142 ) -> Result<serde_json::Value> {
1143 let pack = Arc::new(
1144 PackRuntime::load(
1145 &self.path,
1146 Arc::clone(&self.config),
1147 self.mocks.clone(),
1148 self.archive_path.as_deref(),
1149 self.session_store.clone(),
1150 self.state_store.clone(),
1151 Arc::clone(&self.wasi_policy),
1152 self.secrets.clone(),
1153 self.oauth_config.clone(),
1154 false,
1155 ComponentResolution::default(),
1156 )
1157 .await?,
1158 );
1159
1160 let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&self.config)).await?;
1161 let retry_config = self.config.retry_config().into();
1162 let mocks = pack.mocks.as_deref();
1163 let tenant = self.config.tenant.as_str();
1164
1165 let ctx = FlowContext {
1166 tenant,
1167 flow_id,
1168 node_id: None,
1169 tool: None,
1170 action: None,
1171 session_id: None,
1172 provider_id: None,
1173 retry_config,
1174 observer: None,
1175 mocks,
1176 };
1177
1178 let execution = engine.execute(ctx, input).await?;
1179 match execution.status {
1180 FlowStatus::Completed => Ok(execution.output),
1181 FlowStatus::Waiting(wait) => Ok(serde_json::json!({
1182 "status": "pending",
1183 "reason": wait.reason,
1184 "resume": wait.snapshot,
1185 "response": execution.output,
1186 })),
1187 }
1188 }
1189
1190 pub async fn invoke_component(
1191 &self,
1192 component_ref: &str,
1193 ctx: ComponentExecCtx,
1194 operation: &str,
1195 _config_json: Option<String>,
1196 input_json: String,
1197 ) -> Result<Value> {
1198 let pack_component = self
1199 .components
1200 .get(component_ref)
1201 .with_context(|| format!("component '{component_ref}' not found in pack"))?;
1202
1203 let mut linker = Linker::new(&self.engine);
1204 register_all(&mut linker)?;
1205 add_component_control_to_linker(&mut linker)?;
1206 let pre_instance = linker.instantiate_pre(&pack_component.component)?;
1207 let pre: ComponentPre<ComponentState> = ComponentPre::new(pre_instance)?;
1208
1209 let host_state = HostState::new(
1210 Arc::clone(&self.config),
1211 Arc::clone(&self.http_client),
1212 self.mocks.clone(),
1213 self.session_store.clone(),
1214 self.state_store.clone(),
1215 Arc::clone(&self.secrets),
1216 self.oauth_config.clone(),
1217 )?;
1218 let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1219 let mut store = wasmtime::Store::new(&self.engine, store_state);
1220 let bindings: crate::component_api::Component = pre.instantiate_async(&mut store).await?;
1221 let node = bindings.greentic_component_node();
1222
1223 let result = node.call_invoke(&mut store, &ctx, operation, &input_json)?;
1224
1225 match result {
1226 InvokeResult::Ok(body) => {
1227 if body.is_empty() {
1228 return Ok(Value::Null);
1229 }
1230 serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
1231 }
1232 InvokeResult::Err(NodeError {
1233 code,
1234 message,
1235 retryable,
1236 backoff_ms,
1237 details,
1238 }) => {
1239 let mut obj = serde_json::Map::new();
1240 obj.insert("ok".into(), Value::Bool(false));
1241 let mut error = serde_json::Map::new();
1242 error.insert("code".into(), Value::String(code));
1243 error.insert("message".into(), Value::String(message));
1244 error.insert("retryable".into(), Value::Bool(retryable));
1245 if let Some(backoff) = backoff_ms {
1246 error.insert("backoff_ms".into(), Value::Number(backoff.into()));
1247 }
1248 if let Some(details) = details {
1249 error.insert(
1250 "details".into(),
1251 serde_json::from_str(&details).unwrap_or(Value::String(details)),
1252 );
1253 }
1254 obj.insert("error".into(), Value::Object(error));
1255 Ok(Value::Object(obj))
1256 }
1257 }
1258 }
1259
1260 pub fn resolve_provider(
1261 &self,
1262 provider_id: Option<&str>,
1263 provider_type: Option<&str>,
1264 ) -> Result<ProviderBinding> {
1265 let registry = self.provider_registry()?;
1266 registry.resolve(provider_id, provider_type)
1267 }
1268
1269 pub async fn invoke_provider(
1270 &self,
1271 binding: &ProviderBinding,
1272 _ctx: ComponentExecCtx,
1273 op: &str,
1274 input_json: Vec<u8>,
1275 ) -> Result<Value> {
1276 let component_ref = &binding.component_ref;
1277 let pack_component = self
1278 .components
1279 .get(component_ref)
1280 .with_context(|| format!("provider component '{component_ref}' not found in pack"))?;
1281
1282 let mut linker = Linker::new(&self.engine);
1283 register_all(&mut linker)?;
1284 add_component_control_to_linker(&mut linker)?;
1285 let pre_instance = linker.instantiate_pre(&pack_component.component)?;
1286 let pre: ProviderComponentPre<ComponentState> = ProviderComponentPre::new(pre_instance)?;
1287
1288 let host_state = HostState::new(
1289 Arc::clone(&self.config),
1290 Arc::clone(&self.http_client),
1291 self.mocks.clone(),
1292 self.session_store.clone(),
1293 self.state_store.clone(),
1294 Arc::clone(&self.secrets),
1295 self.oauth_config.clone(),
1296 )?;
1297 let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1298 let mut store = wasmtime::Store::new(&self.engine, store_state);
1299 let bindings: crate::provider_core::SchemaCore = pre.instantiate_async(&mut store).await?;
1300 let provider = bindings.greentic_provider_core_schema_core_api();
1301
1302 let result = provider.call_invoke(&mut store, op, &input_json)?;
1303 deserialize_json_bytes(result)
1304 }
1305
1306 fn provider_registry(&self) -> Result<ProviderRegistry> {
1307 if let Some(registry) = self.provider_registry.read().clone() {
1308 return Ok(registry);
1309 }
1310 let manifest = self
1311 .manifest
1312 .as_ref()
1313 .context("pack manifest required for provider resolution")?;
1314 let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
1315 let registry = ProviderRegistry::new(
1316 manifest,
1317 self.state_store.clone(),
1318 &self.config.tenant,
1319 &env,
1320 )?;
1321 *self.provider_registry.write() = Some(registry.clone());
1322 Ok(registry)
1323 }
1324
1325 pub fn load_flow(&self, flow_id: &str) -> Result<Flow> {
1326 if let Some(cache) = &self.flows {
1327 return cache
1328 .flows
1329 .get(flow_id)
1330 .cloned()
1331 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
1332 }
1333 if let Some(manifest) = &self.manifest {
1334 let entry = manifest
1335 .flows
1336 .iter()
1337 .find(|f| f.id.as_str() == flow_id)
1338 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
1339 return Ok(entry.flow.clone());
1340 }
1341 bail!("flow '{flow_id}' not available (pack exports disabled)")
1342 }
1343
1344 pub fn metadata(&self) -> &PackMetadata {
1345 &self.metadata
1346 }
1347
1348 pub fn required_secrets(&self) -> &[greentic_types::SecretRequirement] {
1349 &self.metadata.secret_requirements
1350 }
1351
1352 pub fn missing_secrets(
1353 &self,
1354 tenant_ctx: &TypesTenantCtx,
1355 ) -> Vec<greentic_types::SecretRequirement> {
1356 let env = tenant_ctx.env.as_str().to_string();
1357 let tenant = tenant_ctx.tenant.as_str().to_string();
1358 let team = tenant_ctx.team.as_ref().map(|t| t.as_str().to_string());
1359 self.required_secrets()
1360 .iter()
1361 .filter(|req| {
1362 if let Some(scope) = &req.scope {
1364 if scope.env != env {
1365 return false;
1366 }
1367 if scope.tenant != tenant {
1368 return false;
1369 }
1370 if let Some(ref team_req) = scope.team
1371 && team.as_ref() != Some(team_req)
1372 {
1373 return false;
1374 }
1375 }
1376 read_secret_blocking(&self.secrets, req.key.as_str()).is_err()
1377 })
1378 .cloned()
1379 .collect()
1380 }
1381
1382 pub fn for_component_test(
1383 components: Vec<(String, PathBuf)>,
1384 flows: HashMap<String, FlowIR>,
1385 config: Arc<HostConfig>,
1386 ) -> Result<Self> {
1387 let engine = Engine::default();
1388 let mut component_map = HashMap::new();
1389 for (name, path) in components {
1390 if !path.exists() {
1391 bail!("component artifact missing: {}", path.display());
1392 }
1393 let wasm_bytes = std::fs::read(&path)?;
1394 let component = Component::from_binary(&engine, &wasm_bytes)
1395 .with_context(|| format!("failed to compile component {}", path.display()))?;
1396 component_map.insert(
1397 name.clone(),
1398 PackComponent {
1399 name,
1400 version: "0.0.0".into(),
1401 component,
1402 },
1403 );
1404 }
1405
1406 let mut flow_map = HashMap::new();
1407 let mut descriptors = Vec::new();
1408 for (id, ir) in flows {
1409 let flow_type = ir.flow_type.clone();
1410 let flow = flow_ir_to_flow(ir)?;
1411 flow_map.insert(id.clone(), flow);
1412 descriptors.push(FlowDescriptor {
1413 id: id.clone(),
1414 flow_type,
1415 profile: "test".into(),
1416 version: "0.0.0".into(),
1417 description: None,
1418 });
1419 }
1420 let flows_cache = PackFlows {
1421 descriptors: descriptors.clone(),
1422 flows: flow_map,
1423 metadata: PackMetadata::fallback(Path::new("component-test")),
1424 };
1425
1426 Ok(Self {
1427 path: PathBuf::new(),
1428 archive_path: None,
1429 config,
1430 engine,
1431 metadata: PackMetadata::fallback(Path::new("component-test")),
1432 manifest: None,
1433 legacy_manifest: None,
1434 mocks: None,
1435 flows: Some(flows_cache),
1436 components: component_map,
1437 http_client: Arc::clone(&HTTP_CLIENT),
1438 pre_cache: Mutex::new(HashMap::new()),
1439 session_store: None,
1440 state_store: None,
1441 wasi_policy: Arc::new(RunnerWasiPolicy::new()),
1442 provider_registry: RwLock::new(None),
1443 secrets: crate::secrets::default_manager(),
1444 oauth_config: None,
1445 })
1446 }
1447}
1448
1449struct PackFlows {
1450 descriptors: Vec<FlowDescriptor>,
1451 flows: HashMap<String, Flow>,
1452 metadata: PackMetadata,
1453}
1454
1455const RUNTIME_FLOW_EXTENSION_IDS: [&str; 3] = [
1456 "greentic.pack.runtime_flow",
1457 "greentic.pack.flow_runtime",
1458 "greentic.pack.runtime_flows",
1459];
1460
1461#[derive(Debug, Deserialize)]
1462struct RuntimeFlowBundle {
1463 flows: Vec<RuntimeFlow>,
1464}
1465
1466#[derive(Debug, Deserialize)]
1467struct RuntimeFlow {
1468 id: String,
1469 #[serde(alias = "flow_type")]
1470 kind: FlowKind,
1471 #[serde(default)]
1472 schema_version: Option<String>,
1473 #[serde(default)]
1474 start: Option<String>,
1475 #[serde(default)]
1476 entrypoints: BTreeMap<String, Value>,
1477 nodes: BTreeMap<String, RuntimeNode>,
1478 #[serde(default)]
1479 metadata: Option<FlowMetadata>,
1480}
1481
1482#[derive(Debug, Deserialize)]
1483struct RuntimeNode {
1484 #[serde(alias = "component")]
1485 component_id: String,
1486 #[serde(default, alias = "operation")]
1487 operation_name: Option<String>,
1488 #[serde(default, alias = "payload", alias = "input")]
1489 operation_payload: Value,
1490 #[serde(default)]
1491 routing: Option<Routing>,
1492 #[serde(default)]
1493 telemetry: Option<TelemetryHints>,
1494}
1495
1496fn deserialize_json_bytes(bytes: Vec<u8>) -> Result<Value> {
1497 if bytes.is_empty() {
1498 return Ok(Value::Null);
1499 }
1500 serde_json::from_slice(&bytes).or_else(|_| {
1501 String::from_utf8(bytes)
1502 .map(Value::String)
1503 .map_err(|err| anyhow!(err))
1504 })
1505}
1506
1507impl PackFlows {
1508 fn from_manifest(manifest: greentic_types::PackManifest) -> Self {
1509 if let Some(flows) = flows_from_runtime_extension(&manifest) {
1510 return flows;
1511 }
1512 let descriptors = manifest
1513 .flows
1514 .iter()
1515 .map(|entry| FlowDescriptor {
1516 id: entry.id.as_str().to_string(),
1517 flow_type: flow_kind_to_str(entry.kind).to_string(),
1518 profile: manifest.pack_id.as_str().to_string(),
1519 version: manifest.version.to_string(),
1520 description: None,
1521 })
1522 .collect();
1523 let mut flows = HashMap::new();
1524 for entry in &manifest.flows {
1525 flows.insert(entry.id.as_str().to_string(), entry.flow.clone());
1526 }
1527 Self {
1528 metadata: PackMetadata::from_manifest(&manifest),
1529 descriptors,
1530 flows,
1531 }
1532 }
1533}
1534
1535fn flows_from_runtime_extension(manifest: &greentic_types::PackManifest) -> Option<PackFlows> {
1536 let extensions = manifest.extensions.as_ref()?;
1537 let extension = extensions.iter().find_map(|(key, ext)| {
1538 if RUNTIME_FLOW_EXTENSION_IDS
1539 .iter()
1540 .any(|candidate| candidate == key)
1541 {
1542 Some(ext)
1543 } else {
1544 None
1545 }
1546 })?;
1547 let runtime_flows = match decode_runtime_flow_extension(extension) {
1548 Some(flows) if !flows.is_empty() => flows,
1549 _ => return None,
1550 };
1551
1552 let descriptors = runtime_flows
1553 .iter()
1554 .map(|flow| FlowDescriptor {
1555 id: flow.id.as_str().to_string(),
1556 flow_type: flow_kind_to_str(flow.kind).to_string(),
1557 profile: manifest.pack_id.as_str().to_string(),
1558 version: manifest.version.to_string(),
1559 description: None,
1560 })
1561 .collect::<Vec<_>>();
1562 let flows = runtime_flows
1563 .into_iter()
1564 .map(|flow| (flow.id.as_str().to_string(), flow))
1565 .collect();
1566
1567 Some(PackFlows {
1568 metadata: PackMetadata::from_manifest(manifest),
1569 descriptors,
1570 flows,
1571 })
1572}
1573
1574fn decode_runtime_flow_extension(extension: &ExtensionRef) -> Option<Vec<Flow>> {
1575 let value = match extension.inline.as_ref()? {
1576 ExtensionInline::Other(value) => value.clone(),
1577 _ => return None,
1578 };
1579
1580 if let Ok(bundle) = serde_json::from_value::<RuntimeFlowBundle>(value.clone()) {
1581 return Some(collect_runtime_flows(bundle.flows));
1582 }
1583
1584 if let Ok(flows) = serde_json::from_value::<Vec<RuntimeFlow>>(value.clone()) {
1585 return Some(collect_runtime_flows(flows));
1586 }
1587
1588 if let Ok(flows) = serde_json::from_value::<Vec<Flow>>(value) {
1589 return Some(flows);
1590 }
1591
1592 warn!(
1593 extension = %extension.kind,
1594 version = %extension.version,
1595 "runtime flow extension present but could not be decoded"
1596 );
1597 None
1598}
1599
1600fn collect_runtime_flows(flows: Vec<RuntimeFlow>) -> Vec<Flow> {
1601 flows
1602 .into_iter()
1603 .filter_map(|flow| match runtime_flow_to_flow(flow) {
1604 Ok(flow) => Some(flow),
1605 Err(err) => {
1606 warn!(error = %err, "failed to decode runtime flow");
1607 None
1608 }
1609 })
1610 .collect()
1611}
1612
1613fn runtime_flow_to_flow(runtime: RuntimeFlow) -> Result<Flow> {
1614 let flow_id = FlowId::from_str(&runtime.id)
1615 .with_context(|| format!("invalid flow id `{}`", runtime.id))?;
1616 let mut entrypoints = runtime.entrypoints;
1617 if entrypoints.is_empty()
1618 && let Some(start) = &runtime.start
1619 {
1620 entrypoints.insert("default".into(), Value::String(start.clone()));
1621 }
1622
1623 let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
1624 for (id, node) in runtime.nodes {
1625 let node_id = NodeId::from_str(&id).with_context(|| format!("invalid node id `{id}`"))?;
1626 let component_id = ComponentId::from_str(&node.component_id)
1627 .with_context(|| format!("invalid component id `{}`", node.component_id))?;
1628 let component = FlowComponentRef {
1629 id: component_id,
1630 pack_alias: None,
1631 operation: node.operation_name,
1632 };
1633 let routing = node.routing.unwrap_or(Routing::End);
1634 let telemetry = node.telemetry.unwrap_or_default();
1635 nodes.insert(
1636 node_id.clone(),
1637 Node {
1638 id: node_id,
1639 component,
1640 input: InputMapping {
1641 mapping: node.operation_payload,
1642 },
1643 output: OutputMapping {
1644 mapping: Value::Null,
1645 },
1646 routing,
1647 telemetry,
1648 },
1649 );
1650 }
1651
1652 Ok(Flow {
1653 schema_version: runtime.schema_version.unwrap_or_else(|| "1.0".to_string()),
1654 id: flow_id,
1655 kind: runtime.kind,
1656 entrypoints,
1657 nodes,
1658 metadata: runtime.metadata.unwrap_or_default(),
1659 })
1660}
1661
1662fn flow_kind_to_str(kind: greentic_types::FlowKind) -> &'static str {
1663 match kind {
1664 greentic_types::FlowKind::Messaging => "messaging",
1665 greentic_types::FlowKind::Event => "event",
1666 greentic_types::FlowKind::ComponentConfig => "component-config",
1667 greentic_types::FlowKind::Job => "job",
1668 greentic_types::FlowKind::Http => "http",
1669 }
1670}
1671
1672fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
1673 let mut file = archive
1674 .by_name(name)
1675 .with_context(|| format!("entry {name} missing from archive"))?;
1676 let mut buf = Vec::new();
1677 file.read_to_end(&mut buf)?;
1678 Ok(buf)
1679}
1680
1681fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
1682 for node in doc.nodes.values_mut() {
1683 let Some((component_ref, payload)) = node
1684 .raw
1685 .iter()
1686 .next()
1687 .map(|(key, value)| (key.clone(), value.clone()))
1688 else {
1689 continue;
1690 };
1691 if component_ref.starts_with("emit.") {
1692 node.operation = Some(component_ref);
1693 node.payload = payload;
1694 node.raw.clear();
1695 continue;
1696 }
1697 let (target_component, operation, input, config) =
1698 infer_component_exec(&payload, &component_ref);
1699 let mut payload_obj = serde_json::Map::new();
1700 payload_obj.insert("component".into(), Value::String(target_component));
1702 payload_obj.insert("operation".into(), Value::String(operation));
1703 payload_obj.insert("input".into(), input);
1704 if let Some(cfg) = config {
1705 payload_obj.insert("config".into(), cfg);
1706 }
1707 node.operation = Some("component.exec".to_string());
1708 node.payload = Value::Object(payload_obj);
1709 node.raw.clear();
1710 }
1711 doc
1712}
1713
1714fn infer_component_exec(
1715 payload: &Value,
1716 component_ref: &str,
1717) -> (String, String, Value, Option<Value>) {
1718 let default_op = if component_ref.starts_with("templating.") {
1719 "render"
1720 } else {
1721 "invoke"
1722 }
1723 .to_string();
1724
1725 if let Value::Object(map) = payload {
1726 let op = map
1727 .get("op")
1728 .or_else(|| map.get("operation"))
1729 .and_then(Value::as_str)
1730 .map(|s| s.to_string())
1731 .unwrap_or_else(|| default_op.clone());
1732
1733 let mut input = map.clone();
1734 let config = input.remove("config");
1735 let component = input
1736 .get("component")
1737 .or_else(|| input.get("component_ref"))
1738 .and_then(Value::as_str)
1739 .map(|s| s.to_string())
1740 .unwrap_or_else(|| component_ref.to_string());
1741 input.remove("component");
1742 input.remove("component_ref");
1743 input.remove("op");
1744 input.remove("operation");
1745 return (component, op, Value::Object(input), config);
1746 }
1747
1748 (component_ref.to_string(), default_op, payload.clone(), None)
1749}
1750
1751#[derive(Clone, Debug)]
1752struct ComponentSpec {
1753 id: String,
1754 version: String,
1755 legacy_path: Option<String>,
1756}
1757
1758fn component_specs(
1759 manifest: Option<&greentic_types::PackManifest>,
1760 legacy_manifest: Option<&legacy_pack::PackManifest>,
1761) -> Vec<ComponentSpec> {
1762 if let Some(manifest) = manifest {
1763 return manifest
1764 .components
1765 .iter()
1766 .map(|entry| ComponentSpec {
1767 id: entry.id.as_str().to_string(),
1768 version: entry.version.to_string(),
1769 legacy_path: None,
1770 })
1771 .collect();
1772 }
1773 if let Some(legacy_manifest) = legacy_manifest {
1774 return legacy_manifest
1775 .components
1776 .iter()
1777 .map(|entry| ComponentSpec {
1778 id: entry.name.clone(),
1779 version: entry.version.to_string(),
1780 legacy_path: Some(entry.file_wasm.clone()),
1781 })
1782 .collect();
1783 }
1784 Vec::new()
1785}
1786
1787fn component_path_for_spec(root: &Path, spec: &ComponentSpec) -> PathBuf {
1788 if let Some(path) = &spec.legacy_path {
1789 return root.join(path);
1790 }
1791 root.join("components").join(format!("{}.wasm", spec.id))
1792}
1793
1794fn load_components_from_overrides(
1795 engine: &Engine,
1796 overrides: &HashMap<String, PathBuf>,
1797 specs: &[ComponentSpec],
1798 missing: &mut HashSet<String>,
1799 into: &mut HashMap<String, PackComponent>,
1800) -> Result<()> {
1801 for spec in specs {
1802 if !missing.contains(&spec.id) {
1803 continue;
1804 }
1805 let Some(path) = overrides.get(&spec.id) else {
1806 continue;
1807 };
1808 let bytes = std::fs::read(path)
1809 .with_context(|| format!("failed to read override component {}", path.display()))?;
1810 let component = Component::from_binary(engine, &bytes).with_context(|| {
1811 format!(
1812 "failed to compile component {} from override {}",
1813 spec.id,
1814 path.display()
1815 )
1816 })?;
1817 into.insert(
1818 spec.id.clone(),
1819 PackComponent {
1820 name: spec.id.clone(),
1821 version: spec.version.clone(),
1822 component,
1823 },
1824 );
1825 missing.remove(&spec.id);
1826 }
1827 Ok(())
1828}
1829
1830fn load_components_from_dir(
1831 engine: &Engine,
1832 root: &Path,
1833 specs: &[ComponentSpec],
1834 missing: &mut HashSet<String>,
1835 into: &mut HashMap<String, PackComponent>,
1836) -> Result<()> {
1837 for spec in specs {
1838 if !missing.contains(&spec.id) {
1839 continue;
1840 }
1841 let path = component_path_for_spec(root, spec);
1842 if !path.exists() {
1843 tracing::debug!(component = %spec.id, path = %path.display(), "materialized component missing; will try other sources");
1844 continue;
1845 }
1846 let bytes = std::fs::read(&path)
1847 .with_context(|| format!("failed to read component {}", path.display()))?;
1848 let component = Component::from_binary(engine, &bytes).with_context(|| {
1849 format!(
1850 "failed to compile component {} from {}",
1851 spec.id,
1852 path.display()
1853 )
1854 })?;
1855 into.insert(
1856 spec.id.clone(),
1857 PackComponent {
1858 name: spec.id.clone(),
1859 version: spec.version.clone(),
1860 component,
1861 },
1862 );
1863 missing.remove(&spec.id);
1864 }
1865 Ok(())
1866}
1867
1868fn load_components_from_archive(
1869 engine: &Engine,
1870 path: &Path,
1871 specs: &[ComponentSpec],
1872 missing: &mut HashSet<String>,
1873 into: &mut HashMap<String, PackComponent>,
1874) -> Result<()> {
1875 let mut archive = ZipArchive::new(File::open(path)?)
1876 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
1877 for spec in specs {
1878 if !missing.contains(&spec.id) {
1879 continue;
1880 }
1881 let file_name = spec
1882 .legacy_path
1883 .clone()
1884 .unwrap_or_else(|| format!("components/{}.wasm", spec.id));
1885 let bytes = match read_entry(&mut archive, &file_name) {
1886 Ok(bytes) => bytes,
1887 Err(err) => {
1888 warn!(component = %spec.id, pack = %path.display(), error = %err, "component entry missing in pack archive");
1889 continue;
1890 }
1891 };
1892 let component = Component::from_binary(engine, &bytes)
1893 .with_context(|| format!("failed to compile component {}", spec.id))?;
1894 into.insert(
1895 spec.id.clone(),
1896 PackComponent {
1897 name: spec.id.clone(),
1898 version: spec.version.clone(),
1899 component,
1900 },
1901 );
1902 missing.remove(&spec.id);
1903 }
1904 Ok(())
1905}
1906
1907#[cfg(test)]
1908mod tests {
1909 use super::*;
1910 use greentic_flow::model::{FlowDoc, NodeDoc};
1911 use serde_json::json;
1912 use std::collections::BTreeMap;
1913
1914 #[test]
1915 fn normalizes_raw_component_to_component_exec() {
1916 let mut nodes = BTreeMap::new();
1917 let mut raw = BTreeMap::new();
1918 raw.insert(
1919 "templating.handlebars".into(),
1920 json!({ "template": "Hi {{name}}" }),
1921 );
1922 nodes.insert(
1923 "start".into(),
1924 NodeDoc {
1925 raw,
1926 routing: json!([{"out": true}]),
1927 ..Default::default()
1928 },
1929 );
1930 let doc = FlowDoc {
1931 id: "welcome".into(),
1932 title: None,
1933 description: None,
1934 flow_type: "messaging".into(),
1935 start: Some("start".into()),
1936 parameters: json!({}),
1937 tags: Vec::new(),
1938 schema_version: None,
1939 entrypoints: BTreeMap::new(),
1940 nodes,
1941 };
1942
1943 let normalized = normalize_flow_doc(doc);
1944 let node = normalized.nodes.get("start").expect("node exists");
1945 assert_eq!(node.operation.as_deref(), Some("component.exec"));
1946 assert!(node.raw.is_empty());
1947 let payload = node.payload.as_object().expect("payload object");
1948 assert_eq!(
1949 payload.get("component"),
1950 Some(&Value::String("templating.handlebars".into()))
1951 );
1952 assert_eq!(
1953 payload.get("operation"),
1954 Some(&Value::String("render".into()))
1955 );
1956 let input = payload.get("input").unwrap();
1957 assert_eq!(input, &json!({ "template": "Hi {{name}}" }));
1958 }
1959}
1960
1961#[derive(Clone, Debug, Default, Serialize, Deserialize)]
1962pub struct PackMetadata {
1963 pub pack_id: String,
1964 pub version: String,
1965 #[serde(default)]
1966 pub entry_flows: Vec<String>,
1967 #[serde(default)]
1968 pub secret_requirements: Vec<greentic_types::SecretRequirement>,
1969}
1970
1971impl PackMetadata {
1972 fn from_wasm(bytes: &[u8]) -> Option<Self> {
1973 let parser = Parser::new(0);
1974 for payload in parser.parse_all(bytes) {
1975 let payload = payload.ok()?;
1976 match payload {
1977 Payload::CustomSection(section) => {
1978 if section.name() == "greentic.manifest"
1979 && let Ok(meta) = Self::from_bytes(section.data())
1980 {
1981 return Some(meta);
1982 }
1983 }
1984 Payload::DataSection(reader) => {
1985 for segment in reader.into_iter().flatten() {
1986 if let Ok(meta) = Self::from_bytes(segment.data) {
1987 return Some(meta);
1988 }
1989 }
1990 }
1991 _ => {}
1992 }
1993 }
1994 None
1995 }
1996
1997 fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
1998 #[derive(Deserialize)]
1999 struct RawManifest {
2000 pack_id: String,
2001 version: String,
2002 #[serde(default)]
2003 entry_flows: Vec<String>,
2004 #[serde(default)]
2005 flows: Vec<RawFlow>,
2006 #[serde(default)]
2007 secret_requirements: Vec<greentic_types::SecretRequirement>,
2008 }
2009
2010 #[derive(Deserialize)]
2011 struct RawFlow {
2012 id: String,
2013 }
2014
2015 let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
2016 let mut entry_flows = if manifest.entry_flows.is_empty() {
2017 manifest.flows.iter().map(|f| f.id.clone()).collect()
2018 } else {
2019 manifest.entry_flows.clone()
2020 };
2021 entry_flows.retain(|id| !id.is_empty());
2022 Ok(Self {
2023 pack_id: manifest.pack_id,
2024 version: manifest.version,
2025 entry_flows,
2026 secret_requirements: manifest.secret_requirements,
2027 })
2028 }
2029
2030 pub fn fallback(path: &Path) -> Self {
2031 let pack_id = path
2032 .file_stem()
2033 .map(|s| s.to_string_lossy().into_owned())
2034 .unwrap_or_else(|| "unknown-pack".to_string());
2035 Self {
2036 pack_id,
2037 version: "0.0.0".to_string(),
2038 entry_flows: Vec::new(),
2039 secret_requirements: Vec::new(),
2040 }
2041 }
2042
2043 pub fn from_manifest(manifest: &greentic_types::PackManifest) -> Self {
2044 let entry_flows = manifest
2045 .flows
2046 .iter()
2047 .map(|flow| flow.id.as_str().to_string())
2048 .collect::<Vec<_>>();
2049 Self {
2050 pack_id: manifest.pack_id.as_str().to_string(),
2051 version: manifest.version.to_string(),
2052 entry_flows,
2053 secret_requirements: manifest.secret_requirements.clone(),
2054 }
2055 }
2056}