1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::cache::{ArtifactKey, CacheConfig, CacheManager, CpuPolicy, EngineProfile};
10use crate::component_api::{
11 self, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
12};
13use crate::identify_hint::IdentifyInstanceHint;
14use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
15use crate::provider::{ProviderBinding, ProviderRegistry};
16use crate::provider_core::{
17 schema_core::SchemaCorePre as LegacySchemaCorePre,
18 schema_core_path::SchemaCorePre as PathSchemaCorePre,
19 schema_core_schema::SchemaCorePre as SchemaSchemaCorePre,
20};
21use crate::provider_core_only;
22use crate::runtime_refs::RuntimeRefsInjection;
23use crate::runtime_wasmtime::{Component, Engine, InstancePre, Linker, ResourceTable};
24use anyhow::{Context, Result, anyhow, bail};
25use futures::executor::block_on;
26use greentic_distributor_client::dist::{
27 CachePolicy, DistClient, DistError, DistOptions, ResolvePolicy,
28};
29use greentic_interfaces_wasmtime::host_helpers::v1::{
30 self as host_v1, HostFns, add_all_v1_to_linker,
31 runner_host_http::RunnerHostHttp,
32 runner_host_kv::RunnerHostKv,
33 runtime_config::{ConfigError, RuntimeConfigHost},
34 secrets_store::{SecretsError, SecretsErrorV1_1, SecretsStoreHost, SecretsStoreHostV1_1},
35 state_store::{
36 OpAck as StateOpAck, StateKey as HostStateKey, StateStoreError as StateError,
37 StateStoreHost, TenantCtx as StateTenantCtx,
38 },
39 telemetry_logger::{
40 OpAck as TelemetryAck, SpanContext as TelemetrySpanContext,
41 TelemetryLoggerError as TelemetryError, TelemetryLoggerHost,
42 TenantCtx as TelemetryTenantCtx,
43 },
44};
45use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::http::http_client as http_client_client_alias;
46use greentic_interfaces_wasmtime::instance_identity_instance_identity_describe_v0_1::InstanceIdentityDescribePre;
47use greentic_interfaces_wasmtime::instance_identity_v0_1::InstanceIdentityPre;
48use greentic_interfaces_wasmtime::{
49 http_client_client_v1_0::greentic::interfaces_types::types as http_types_v1_0,
50 http_client_client_v1_1::greentic::interfaces_types::types as http_types_v1_1,
51};
52use greentic_pack::builder as legacy_pack;
53use greentic_types::flow::FlowHasher;
54use greentic_types::{
55 ArtifactLocationV1, ComponentId, ComponentManifest, ComponentSourceRef, ComponentSourcesV1,
56 EXT_COMPONENT_SOURCES_V1, EnvId, ExtensionRef, Flow, FlowComponentRef, FlowId, FlowKind,
57 FlowMetadata, InputMapping, Node, NodeId, OutputMapping, Routing, StateKey as StoreStateKey,
58 TeamId, TelemetryHints, TenantCtx as TypesTenantCtx, TenantId, UserId, decode_pack_manifest,
59 pack_manifest::ExtensionInline,
60};
61use host_v1::http_client as host_http_client;
62use host_v1::http_client::{
63 HttpClientError, HttpClientErrorV1_1, HttpClientHost, HttpClientHostV1_1,
64 Request as HttpRequest, RequestOptionsV1_1 as HttpRequestOptionsV1_1,
65 RequestV1_1 as HttpRequestV1_1, Response as HttpResponse, ResponseV1_1 as HttpResponseV1_1,
66 TenantCtx as HttpTenantCtx, TenantCtxV1_1 as HttpTenantCtxV1_1,
67};
68use indexmap::IndexMap;
69use once_cell::sync::Lazy;
70use parking_lot::{Mutex, RwLock};
71use reqwest::blocking::Client as BlockingClient;
72use runner_core::normalize_under_root;
73use serde::{Deserialize, Serialize};
74use serde_cbor;
75use serde_json::{self, Value};
76use sha2::Digest;
77use tempfile::TempDir;
78use tokio::fs;
79use wasmparser::{Parser, Payload};
80use wasmtime::{Store, StoreContextMut};
81use wasmtime_wasi_http::WasiHttpCtx;
82use wasmtime_wasi_http::p2::{
83 WasiHttpCtxView, WasiHttpView, add_only_http_to_linker_sync as add_wasi_http_to_linker,
84};
85use wasmtime_wasi_tls::p2::LinkOptions;
86use wasmtime_wasi_tls::{WasiTlsCtx, WasiTlsCtxBuilder, WasiTlsCtxView, WasiTlsView};
87use zip::ZipArchive;
88
89use crate::runner::engine::{FlowContext, FlowEngine, FlowStatus};
90use crate::runner::flow_adapter::{FlowIR, flow_doc_to_ir, flow_ir_to_flow};
91use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
92#[cfg(feature = "fault-injection")]
93use crate::testing::fault_injection::{FaultContext, FaultPoint, maybe_fail};
94
95use crate::config::HostConfig;
96use crate::fault;
97use crate::secrets::{
98 DynSecretsManager, canonicalize_secret_key, read_secret_blocking, write_secret_blocking,
99};
100use crate::storage::state::STATE_PREFIX;
101use crate::storage::{DynSessionStore, DynStateStore};
102use crate::verify;
103use crate::wasi::{PreopenSpec, RunnerWasiPolicy};
104use tracing::warn;
105use wasmtime_wasi::p2::add_to_linker_sync as add_wasi_to_linker;
106use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
107
108use greentic_flow::model::FlowDoc;
109
/// A loaded pack together with the host-side resources kept alongside it.
///
/// The per-field notes below are derived from the field names and types only; the code
/// that populates and consumes these fields lives outside this section.
#[allow(dead_code)]
pub struct PackRuntime {
    /// Filesystem path associated with the pack.
    path: PathBuf,
    /// Optional path to an archive for this pack (presumably the original pack archive — TODO confirm).
    archive_path: Option<PathBuf>,
    /// Shared host configuration.
    config: Arc<HostConfig>,
    /// Wasmtime engine handle.
    engine: Engine,
    /// Pack metadata.
    metadata: PackMetadata,
    /// Pack manifest in the `greentic_types` format, when available.
    manifest: Option<greentic_types::PackManifest>,
    /// Pack manifest in the legacy `greentic_pack::builder` format, when available.
    legacy_manifest: Option<Box<legacy_pack::PackManifest>>,
    /// Component manifests keyed by string (presumably the component id — verify against the loader).
    component_manifests: HashMap<String, ComponentManifest>,
    /// Optional mock layer.
    mocks: Option<Arc<MockLayer>>,
    /// Flows carried by the pack, when available.
    flows: Option<PackFlows>,
    /// Compiled components keyed by string (presumably the component reference — verify against the loader).
    components: HashMap<String, PackComponent>,
    /// Shared blocking HTTP client.
    http_client: Arc<BlockingClient>,
    /// Mutex-guarded map of `InstancePre` values keyed by string.
    pre_cache: Mutex<HashMap<String, InstancePre<ComponentState>>>,
    /// Optional session store.
    session_store: Option<DynSessionStore>,
    /// Optional state store.
    state_store: Option<DynStateStore>,
    /// Shared WASI policy.
    wasi_policy: Arc<RunnerWasiPolicy>,
    /// Optional temporary directory (presumably holding extracted pack assets — TODO confirm).
    assets_tempdir: Option<TempDir>,
    /// Provider registry behind a read/write lock; `None` until one is stored.
    provider_registry: RwLock<Option<ProviderRegistry>>,
    /// Identify hints keyed by string behind a read/write lock; each entry may be `None`.
    identify_hint_cache: RwLock<HashMap<String, Option<IdentifyInstanceHint>>>,
    /// Secrets manager handle.
    secrets: DynSecretsManager,
    /// Optional OAuth broker configuration.
    oauth_config: Option<OAuthBrokerConfig>,
    /// Cache manager.
    cache: CacheManager,
    /// Optional shared map of non-secret runtime configuration values.
    runtime_config_non_secret: Option<Arc<BTreeMap<String, Value>>>,
    /// Optional runtime-refs injection.
    runtime_refs: Option<RuntimeRefsInjection>,
}
157
/// A single component held by a pack: its name, version and shared compiled `Component`.
struct PackComponent {
    /// Component name (currently unread, hence the lint allowance).
    #[allow(dead_code)]
    name: String,
    /// Component version string (currently unread, hence the lint allowance).
    #[allow(dead_code)]
    version: String,
    /// Shared handle to the Wasmtime component.
    component: Arc<Component>,
}
165
/// Outcome of an identify attempt.
///
/// When several outcomes are combined with [`IdentifyOutcome::merge_in`], the variants
/// are ordered `Unsupported` < `NoMatch` < `Identified`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IdentifyOutcome {
    /// Lowest-priority outcome.
    Unsupported,
    /// Replaces `Unsupported` when merged, but never replaces `Identified`.
    NoMatch,
    /// Highest-priority outcome, carrying a string payload.
    Identified(String),
}

impl IdentifyOutcome {
    /// Folds `other` into `self`.
    ///
    /// `self` is replaced only when `other` ranks strictly higher, so the first
    /// `Identified` value wins and equal-ranked outcomes leave `self` untouched.
    pub fn merge_in(&mut self, other: IdentifyOutcome) {
        fn rank(outcome: &IdentifyOutcome) -> u8 {
            match outcome {
                IdentifyOutcome::Unsupported => 0,
                IdentifyOutcome::NoMatch => 1,
                IdentifyOutcome::Identified(_) => 2,
            }
        }

        if rank(&other) > rank(&*self) {
            *self = other;
        }
    }
}
199
200fn run_on_wasi_thread<F, T>(task_name: &'static str, task: F) -> Result<T>
201where
202 F: FnOnce() -> Result<T> + Send + 'static,
203 T: Send + 'static,
204{
205 let builder = std::thread::Builder::new().name(format!("greentic-wasmtime-{task_name}"));
206 let handle = builder
207 .spawn(move || {
208 let pid = std::process::id();
209 let thread_id = std::thread::current().id();
210 let tokio_handle_present = tokio::runtime::Handle::try_current().is_ok();
211 tracing::info!(
212 event = "wasmtime.thread.start",
213 task = task_name,
214 pid,
215 thread_id = ?thread_id,
216 tokio_handle_present,
217 "starting Wasmtime thread"
218 );
219 task()
220 })
221 .context("failed to spawn Wasmtime thread")?;
222 handle
223 .join()
224 .map_err(|err| {
225 let reason = if let Some(msg) = err.downcast_ref::<&str>() {
226 msg.to_string()
227 } else if let Some(msg) = err.downcast_ref::<String>() {
228 msg.clone()
229 } else {
230 "unknown panic".to_string()
231 };
232 anyhow!("Wasmtime thread panicked: {reason}")
233 })
234 .and_then(|res| res)
235}
236
/// Options describing how component artifacts are located.
///
/// The notes below are inferred from field names and types; the resolver that reads
/// them is outside this section.
#[derive(Debug, Default, Clone)]
pub struct ComponentResolution {
    /// Optional root directory (presumably holding already-materialized components — TODO confirm).
    pub materialized_root: Option<PathBuf>,
    /// Per-key path overrides (presumably keyed by component id — verify against the resolver).
    pub overrides: HashMap<String, PathBuf>,
    /// Flag presumably selecting offline distributor behaviour — TODO confirm.
    pub dist_offline: bool,
    /// Optional cache directory for the distributor client.
    pub dist_cache_dir: Option<PathBuf>,
    /// Flag presumably permitting components without a recorded hash — TODO confirm.
    pub allow_missing_hash: bool,
}
250
251fn build_blocking_client() -> BlockingClient {
252 std::thread::spawn(|| {
253 BlockingClient::builder()
254 .no_proxy()
255 .build()
256 .expect("blocking client")
257 })
258 .join()
259 .expect("client build thread panicked")
260}
261
262fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
263 let (root, candidate) = if path.is_absolute() {
264 let parent = path
265 .parent()
266 .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
267 let root = parent
268 .canonicalize()
269 .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
270 let file = path
271 .file_name()
272 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
273 (root, PathBuf::from(file))
274 } else {
275 let cwd = std::env::current_dir().context("failed to resolve current directory")?;
276 let base = if let Some(parent) = path.parent() {
277 cwd.join(parent)
278 } else {
279 cwd
280 };
281 let root = base
282 .canonicalize()
283 .with_context(|| format!("failed to canonicalize {}", base.display()))?;
284 let file = path
285 .file_name()
286 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
287 (root, PathBuf::from(file))
288 };
289 let safe = normalize_under_root(&root, &candidate)?;
290 Ok((root, safe))
291}
292
/// Lazily initialized, shared blocking HTTP client; built by `build_blocking_client` on first access.
static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
294
/// Serializable description of a flow.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowDescriptor {
    /// Flow identifier.
    pub id: String,
    /// Flow type; serialized under the key `type`.
    #[serde(rename = "type")]
    pub flow_type: String,
    /// Identifier of the pack associated with the flow.
    pub pack_id: String,
    /// Profile string.
    pub profile: String,
    /// Version string.
    pub version: String,
    /// Optional description; defaults to `None` when absent from the input.
    #[serde(default)]
    pub description: Option<String>,
}
306
/// Host-side state backing the host interfaces implemented in this file
/// (secrets, runtime config, HTTP client, state store).
pub struct HostState {
    /// Pack identifier; passed to the secrets read/write helpers.
    #[allow(dead_code)]
    pack_id: String,
    /// Shared host configuration (tenant, secrets policy, HTTP enablement).
    config: Arc<HostConfig>,
    /// Blocking HTTP client used for outbound requests.
    http_client: Arc<BlockingClient>,
    /// Environment name used when a guest-supplied tenant context is absent;
    /// read from `GREENTIC_ENV` in `new`, falling back to `local`.
    default_env: String,
    /// Optional session store (not read in this section).
    #[allow(dead_code)]
    session_store: Option<DynSessionStore>,
    /// Optional state store backing the state-store host interface.
    state_store: Option<DynStateStore>,
    /// Optional mock layer consulted for secrets and HTTP requests.
    mocks: Option<Arc<MockLayer>>,
    /// Secrets manager handle.
    secrets: DynSecretsManager,
    /// Optional OAuth broker configuration.
    oauth_config: Option<OAuthBrokerConfig>,
    /// OAuth broker host; default-constructed in `new`.
    oauth_host: OAuthBrokerHost,
    /// Execution context of the current component invocation, when any.
    exec_ctx: Option<ComponentExecCtx>,
    /// Reference of the component this state belongs to; `None` is logged as `<pack>`.
    component_ref: Option<String>,
    /// Whether this state belongs to a provider-core component; such components may
    /// still write secrets in provider-core-only mode.
    provider_core_component: bool,
    /// Non-secret runtime configuration values, consulted first by the runtime-config host.
    runtime_config_non_secret: Option<Arc<BTreeMap<String, Value>>>,
    /// Runtime-refs injection, consulted second by the runtime-config host.
    runtime_refs: Option<RuntimeRefsInjection>,
}
334
335impl HostState {
336 #[allow(clippy::default_constructed_unit_structs)]
337 #[allow(clippy::too_many_arguments)]
338 pub fn new(
339 pack_id: String,
340 config: Arc<HostConfig>,
341 http_client: Arc<BlockingClient>,
342 mocks: Option<Arc<MockLayer>>,
343 session_store: Option<DynSessionStore>,
344 state_store: Option<DynStateStore>,
345 secrets: DynSecretsManager,
346 oauth_config: Option<OAuthBrokerConfig>,
347 exec_ctx: Option<ComponentExecCtx>,
348 component_ref: Option<String>,
349 provider_core_component: bool,
350 runtime_config_non_secret: Option<Arc<BTreeMap<String, Value>>>,
351 runtime_refs: Option<RuntimeRefsInjection>,
352 ) -> Result<Self> {
353 let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
354 Ok(Self {
355 pack_id,
356 config,
357 http_client,
358 default_env,
359 session_store,
360 state_store,
361 mocks,
362 secrets,
363 oauth_config,
364 oauth_host: OAuthBrokerHost::default(),
365 exec_ctx,
366 component_ref,
367 provider_core_component,
368 runtime_config_non_secret,
369 runtime_refs,
370 })
371 }
372
373 fn instantiate_component_result(
374 linker: &mut Linker<ComponentState>,
375 store: &mut Store<ComponentState>,
376 component: &Component,
377 ctx: &ComponentExecCtx,
378 component_ref: &str,
379 operation: &str,
380 input_json: &str,
381 ) -> Result<InvokeResult> {
382 let pre_instance = linker.instantiate_pre(component)?;
383 match component_api::v0_6::ComponentPre::new(pre_instance) {
384 Ok(pre) => {
385 let envelope = component_api::envelope_v0_6(ctx, component_ref, input_json)?;
386 let operation_owned = operation.to_string();
387 let result = block_on(async {
388 let bindings = pre.instantiate_async(&mut *store).await?;
389 let node = bindings.greentic_component_node();
390 node.call_invoke(&mut *store, &operation_owned, &envelope)
391 })?;
392 component_api::invoke_result_from_v0_6(result)
393 }
394 Err(err_v06) => {
395 if !is_missing_node_export(&err_v06, "0.6.0") {
396 return Err(err_v06.into());
397 }
398 let pre_instance = linker.instantiate_pre(component)?;
399 match component_api::v0_5::ComponentPre::new(pre_instance) {
400 Ok(pre) => {
401 let result = block_on(async {
402 let bindings = pre.instantiate_async(&mut *store).await?;
403 let node = bindings.greentic_component_node();
404 let ctx_v05 = component_api::exec_ctx_v0_5(ctx);
405 let operation_owned = operation.to_string();
406 let input_owned = input_json.to_string();
407 node.call_invoke(&mut *store, &ctx_v05, &operation_owned, &input_owned)
408 })?;
409 Ok(component_api::invoke_result_from_v0_5(result))
410 }
411 Err(err) => {
412 if !is_missing_node_export(&err, "0.5.0") {
413 return Err(err.into());
414 }
415 let pre_instance = linker.instantiate_pre(component)?;
416 match component_api::v0_4::ComponentPre::new(pre_instance) {
417 Ok(pre) => {
418 let result = block_on(async {
419 let bindings = pre.instantiate_async(&mut *store).await?;
420 let node = bindings.greentic_component_node();
421 let ctx_v04 = component_api::exec_ctx_v0_4(ctx);
422 let operation_owned = operation.to_string();
423 let input_owned = input_json.to_string();
424 node.call_invoke(
425 &mut *store,
426 &ctx_v04,
427 &operation_owned,
428 &input_owned,
429 )
430 })?;
431 Ok(component_api::invoke_result_from_v0_4(result))
432 }
433 Err(err_v04) => {
434 if is_missing_node_export(&err_v04, "0.4.0") {
435 Self::try_v06_runtime(linker, store, component, input_json)
436 } else {
437 Err(err_v04.into())
438 }
439 }
440 }
441 }
442 }
443 }
444 }
445 }
446
447 fn try_v06_runtime(
450 linker: &mut Linker<ComponentState>,
451 store: &mut Store<ComponentState>,
452 component: &Component,
453 input_json: &str,
454 ) -> Result<InvokeResult> {
455 let pre_instance = linker.instantiate_pre(component)?;
456 let pre = component_api::v0_6_runtime::ComponentV0V6RuntimePre::new(pre_instance).map_err(
457 |err| err.context("component exports neither node@0.5/0.4 nor component-runtime@0.6"),
458 )?;
459
460 let result = block_on(async {
461 let bindings = pre.instantiate_async(&mut *store).await?;
462 let runtime = bindings.greentic_component_component_runtime();
463
464 let input_value: Value = serde_json::from_str(input_json).unwrap_or(Value::Null);
466 let input_cbor =
467 serde_cbor::to_vec(&input_value).context("encode input as CBOR for v0.6")?;
468 let empty_state = serde_cbor::to_vec(&Value::Object(Default::default()))
469 .context("encode empty state")?;
470
471 let run_result = runtime
472 .call_run(&mut *store, &input_cbor, &empty_state)
473 .map_err(|err| err.context("v0.6 component-runtime::run call failed"))?;
474
475 let output_value: Value = serde_cbor::from_slice(&run_result.output)
477 .context("decode v0.6 run output CBOR")?;
478 let output_json = serde_json::to_string(&output_value)
479 .context("serialize v0.6 run output to JSON")?;
480
481 Ok::<_, anyhow::Error>(output_json)
482 })?;
483
484 Ok(InvokeResult::Ok(result))
485 }
486
487 fn convert_invoke_result(result: InvokeResult) -> Result<Value> {
488 match result {
489 InvokeResult::Ok(body) => {
490 if body.is_empty() {
491 return Ok(Value::Null);
492 }
493 serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
494 }
495 InvokeResult::Err(NodeError {
496 code,
497 message,
498 retryable,
499 backoff_ms,
500 details,
501 }) => {
502 let mut obj = serde_json::Map::new();
503 obj.insert("ok".into(), Value::Bool(false));
504 let mut error = serde_json::Map::new();
505 error.insert("code".into(), Value::String(code));
506 error.insert("message".into(), Value::String(message));
507 error.insert("retryable".into(), Value::Bool(retryable));
508 if let Some(backoff) = backoff_ms {
509 error.insert("backoff_ms".into(), Value::Number(backoff.into()));
510 }
511 if let Some(details) = details {
512 error.insert(
513 "details".into(),
514 serde_json::from_str(&details).unwrap_or(Value::String(details)),
515 );
516 }
517 obj.insert("error".into(), Value::Object(error));
518 Ok(Value::Object(obj))
519 }
520 }
521 }
522
523 fn secrets_tenant_ctx(&self) -> TypesTenantCtx {
527 let mut ctx = self.config.tenant_ctx();
528 if let Some(exec_ctx) = self.exec_ctx.as_ref()
529 && let Some(team) = exec_ctx.tenant.team.as_ref()
530 && let Ok(team_id) = TeamId::from_str(team)
531 {
532 ctx = ctx.with_team(Some(team_id));
533 }
534 ctx
535 }
536
537 pub fn get_secret(&self, key: &str) -> Result<String> {
538 if provider_core_only::is_enabled() {
539 bail!(provider_core_only::blocked_message("secrets"))
540 }
541 if !self.config.secrets_policy.is_allowed(key) {
542 bail!("secret {key} is not permitted by bindings policy");
543 }
544 if let Some(mock) = &self.mocks
545 && let Some(value) = mock.secrets_lookup(key)
546 {
547 return Ok(value);
548 }
549 let ctx = self.secrets_tenant_ctx();
550 let canonical_key = canonicalize_secret_key(key);
551 let bytes = read_secret_blocking(&self.secrets, &ctx, &self.pack_id, &canonical_key)
552 .context("failed to read secret from manager")?;
553 let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
554 Ok(value)
555 }
556
557 fn allows_secret_write_in_provider_core_only(&self) -> bool {
558 self.provider_core_component || self.component_ref.is_none()
559 }
560
561 fn tenant_ctx_from_v1(&self, ctx: Option<StateTenantCtx>) -> Result<TypesTenantCtx> {
562 let tenant_raw = ctx
563 .as_ref()
564 .map(|ctx| ctx.tenant.clone())
565 .or_else(|| self.exec_ctx.as_ref().map(|ctx| ctx.tenant.tenant.clone()))
566 .unwrap_or_else(|| self.config.tenant.clone());
567 let env_raw = ctx
568 .as_ref()
569 .map(|ctx| ctx.env.clone())
570 .unwrap_or_else(|| self.default_env.clone());
571 let tenant_id = TenantId::from_str(&tenant_raw)
572 .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
573 let env_id = EnvId::from_str(&env_raw)
574 .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
575 let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
576 if let Some(exec_ctx) = self.exec_ctx.as_ref() {
577 if let Some(team) = exec_ctx.tenant.team.as_ref() {
578 let team_id =
579 TeamId::from_str(team).with_context(|| format!("invalid team id `{team}`"))?;
580 tenant_ctx = tenant_ctx.with_team(Some(team_id));
581 }
582 if let Some(user) = exec_ctx.tenant.user.as_ref() {
583 let user_id =
584 UserId::from_str(user).with_context(|| format!("invalid user id `{user}`"))?;
585 tenant_ctx = tenant_ctx.with_user(Some(user_id));
586 }
587 tenant_ctx = tenant_ctx.with_flow(exec_ctx.flow_id.clone());
588 if let Some(node) = exec_ctx.node_id.as_ref() {
589 tenant_ctx = tenant_ctx.with_node(node.clone());
590 }
591 if let Some(session) = exec_ctx.tenant.correlation_id.as_ref() {
592 tenant_ctx = tenant_ctx.with_session(session.clone());
593 }
594 tenant_ctx.trace_id = exec_ctx.tenant.trace_id.clone();
595 }
596
597 if let Some(ctx) = ctx {
598 if let Some(team) = ctx.team.or(ctx.team_id) {
599 let team_id =
600 TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
601 tenant_ctx = tenant_ctx.with_team(Some(team_id));
602 }
603 if let Some(user) = ctx.user.or(ctx.user_id) {
604 let user_id =
605 UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
606 tenant_ctx = tenant_ctx.with_user(Some(user_id));
607 }
608 if let Some(flow) = ctx.flow_id {
609 tenant_ctx = tenant_ctx.with_flow(flow);
610 }
611 if let Some(node) = ctx.node_id {
612 tenant_ctx = tenant_ctx.with_node(node);
613 }
614 if let Some(provider) = ctx.provider_id {
615 tenant_ctx = tenant_ctx.with_provider(provider);
616 }
617 if let Some(session) = ctx.session_id {
618 tenant_ctx = tenant_ctx.with_session(session);
619 }
620 tenant_ctx.trace_id = ctx.trace_id;
621 }
622 Ok(tenant_ctx)
623 }
624
    /// Sends an outbound HTTP request on behalf of a guest, honouring host policy and
    /// the optional mock layer.
    ///
    /// * `req`  — method, URL, headers and optional body of the request.
    /// * `opts` — optional v1.1 request options; only `timeout_ms` is applied, while
    ///   `allow_insecure` and `follow_redirects = false` are logged as unsupported.
    /// * `_ctx` — guest tenant context; currently unused.
    ///
    /// Returns the response status, headers and body. The body is `None` when reading
    /// it fails.
    ///
    /// # Errors
    ///
    /// * `denied` when HTTP is disabled in the host config or the mock layer denies the
    ///   request.
    /// * `unavailable` when sending the request fails.
    fn send_http_request(
        &mut self,
        req: HttpRequest,
        opts: Option<HttpRequestOptionsV1_1>,
        _ctx: Option<HttpTenantCtx>,
    ) -> Result<HttpResponse, HttpClientError> {
        // Policy gate: refuse everything when the host config disables HTTP.
        if !self.config.http_enabled {
            return Err(HttpClientError {
                code: "denied".into(),
                message: "http client disabled by policy".into(),
            });
        }

        // Consult the mock layer first. It can answer the request, deny it, or let it
        // pass through (optionally asking for the real response to be recorded).
        // The mock layer is skipped when the mock request descriptor cannot be built.
        let mut mock_state = None;
        let raw_body = req.body.clone();
        if let Some(mock) = &self.mocks
            && let Ok(meta) = HttpMockRequest::new(&req.method, &req.url, raw_body.as_deref())
        {
            match mock.http_begin(&meta) {
                HttpDecision::Mock(response) => {
                    let headers = response
                        .headers
                        .iter()
                        .map(|(k, v)| (k.clone(), v.clone()))
                        .collect();
                    return Ok(HttpResponse {
                        status: response.status,
                        headers,
                        body: response.body.clone().map(|b| b.into_bytes()),
                    });
                }
                HttpDecision::Deny(reason) => {
                    return Err(HttpClientError {
                        code: "denied".into(),
                        message: reason,
                    });
                }
                HttpDecision::Passthrough { record } => {
                    mock_state = Some((meta, record));
                }
            }
        }

        // A method string that does not parse falls back to GET.
        let method = req.method.parse().unwrap_or(reqwest::Method::GET);
        let mut builder = self.http_client.request(method, &req.url);
        // Headers whose name or value is not valid are skipped rather than rejected.
        for (key, value) in req.headers {
            if let Ok(header) = reqwest::header::HeaderName::from_bytes(key.as_bytes())
                && let Ok(header_value) = reqwest::header::HeaderValue::from_str(&value)
            {
                builder = builder.header(header, header_value);
            }
        }

        if let Some(body) = raw_body.clone() {
            builder = builder.body(body);
        }

        if let Some(opts) = opts {
            if let Some(timeout_ms) = opts.timeout_ms {
                builder = builder.timeout(Duration::from_millis(timeout_ms as u64));
            }
            // The two options below are accepted but not honoured; only a warning is logged.
            if opts.allow_insecure == Some(true) {
                warn!(url = %req.url, "allow-insecure not supported; using default TLS validation");
            }
            if let Some(follow_redirects) = opts.follow_redirects
                && !follow_redirects
            {
                warn!(url = %req.url, "follow-redirects=false not supported; using default client behaviour");
            }
        }

        let response = match builder.send() {
            Ok(resp) => resp,
            Err(err) => {
                warn!(url = %req.url, error = %err, "http client request failed");
                return Err(HttpClientError {
                    code: "unavailable".into(),
                    message: err.to_string(),
                });
            }
        };

        let status = response.status().as_u16();
        // Header values that are not valid visible ASCII are reported as empty strings.
        let headers_vec = response
            .headers()
            .iter()
            .map(|(k, v)| {
                (
                    k.as_str().to_string(),
                    v.to_str().unwrap_or_default().to_string(),
                )
            })
            .collect::<Vec<_>>();
        // A failure while reading the body is swallowed and surfaces as `None`.
        let body_bytes = response.bytes().ok().map(|b| b.to_vec());

        // Record the real response only when the mock layer asked for it (`record == true`).
        if let Some((meta, true)) = mock_state.take()
            && let Some(mock) = &self.mocks
        {
            let recorded = HttpMockResponse::new(
                status,
                headers_vec.clone().into_iter().collect(),
                body_bytes
                    .as_ref()
                    .map(|b| String::from_utf8_lossy(b).into_owned()),
            );
            mock.http_record(&meta, &recorded);
        }

        Ok(HttpResponse {
            status,
            headers: headers_vec,
            body: body_bytes,
        })
    }
739}
740
/// Pins the key canonicalization applied before secrets are read or written.
#[cfg(test)]
mod canonicalize_tests {
    use crate::secrets::canonicalize_secret_key;

    /// Upper-case snake keys are lower-cased.
    #[test]
    fn upper_snake_to_lower_snake() {
        let canonical = canonicalize_secret_key("TELEGRAM_BOT_TOKEN");
        assert_eq!(canonical, "telegram_bot_token");
    }

    /// Surrounding whitespace is trimmed and dashes become underscores.
    #[test]
    fn trim_and_replace_non_alphanumeric() {
        let canonical = canonicalize_secret_key(" webex-bot-token ");
        assert_eq!(canonical, "webex_bot_token");
    }

    /// Mixed case is lower-cased while repeated underscores are kept as-is.
    #[test]
    fn preserve_existing_lower_snake_with_extra_underscores() {
        let canonical = canonicalize_secret_key("MiXeD__Case");
        assert_eq!(canonical, "mixed__case");
    }
}
766
767impl SecretsStoreHost for HostState {
768 fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsError> {
769 if provider_core_only::is_enabled() {
770 warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
771 return Err(SecretsError::Denied);
772 }
773 if !self.config.secrets_policy.is_allowed(&key) {
774 return Err(SecretsError::Denied);
775 }
776 if let Some(mock) = &self.mocks
777 && let Some(value) = mock.secrets_lookup(&key)
778 {
779 return Ok(Some(value.into_bytes()));
780 }
781 let ctx = self.secrets_tenant_ctx();
782 let canonical_key = canonicalize_secret_key(&key);
783 match read_secret_blocking(&self.secrets, &ctx, &self.pack_id, &canonical_key) {
784 Ok(bytes) => Ok(Some(bytes)),
785 Err(err) => {
786 warn!(secret = %key, canonical = %canonical_key, error = %err, "secret lookup failed");
787 Err(SecretsError::NotFound)
788 }
789 }
790 }
791}
792
793impl SecretsStoreHostV1_1 for HostState {
794 fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsErrorV1_1> {
795 if provider_core_only::is_enabled() {
796 warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
797 return Err(SecretsErrorV1_1::Denied);
798 }
799 if !self.config.secrets_policy.is_allowed(&key) {
800 return Err(SecretsErrorV1_1::Denied);
801 }
802 if let Some(mock) = &self.mocks
803 && let Some(value) = mock.secrets_lookup(&key)
804 {
805 return Ok(Some(value.into_bytes()));
806 }
807 let ctx = self.secrets_tenant_ctx();
808 let canonical_key = canonicalize_secret_key(&key);
809 match read_secret_blocking(&self.secrets, &ctx, &self.pack_id, &canonical_key) {
810 Ok(bytes) => Ok(Some(bytes)),
811 Err(err) => {
812 warn!(secret = %key, canonical = %canonical_key, error = %err, "secret lookup failed");
813 Err(SecretsErrorV1_1::NotFound)
814 }
815 }
816 }
817
818 fn put(&mut self, key: String, value: Vec<u8>) {
819 if key.trim().is_empty() {
820 warn!(secret = %key, "secret write blocked: empty key");
821 panic!("secret write denied for key {key}: invalid key");
822 }
823 if provider_core_only::is_enabled() && !self.allows_secret_write_in_provider_core_only() {
824 warn!(
825 secret = %key,
826 component = self.component_ref.as_deref().unwrap_or("<pack>"),
827 "provider-core only mode enabled; blocking secrets store write"
828 );
829 panic!("secret write denied for key {key}: provider-core-only mode");
830 }
831 if !self.config.secrets_policy.is_allowed(&key) {
832 warn!(secret = %key, "secret write denied by bindings policy");
833 panic!("secret write denied for key {key}: policy");
834 }
835 let ctx = self.secrets_tenant_ctx();
836 let canonical_key = canonicalize_secret_key(&key);
837 if let Err(err) =
838 write_secret_blocking(&self.secrets, &ctx, &self.pack_id, &canonical_key, &value)
839 {
840 warn!(secret = %key, canonical = %canonical_key, error = %err, "secret write failed");
841 panic!("secret write failed for key {key}");
842 }
843 }
844}
845
846static WARNED_COMPAT_KEYS: Lazy<Mutex<HashSet<String>>> = Lazy::new(|| Mutex::new(HashSet::new()));
851
852fn warn_compat_fallback_once(key: &str) {
853 let mut warned = WARNED_COMPAT_KEYS.lock();
854 if warned.insert(key.to_string()) {
855 warn!(
856 key = %key,
857 "runtime-config key resolved via secrets-store compat fallback; \
858 move this value into pack-config.v1.non_secret"
859 );
860 }
861}
862
863impl RuntimeConfigHost for HostState {
864 fn get(&mut self, key: String) -> Result<Option<String>, ConfigError> {
865 if key.trim().is_empty() {
866 return Err(ConfigError::InvalidKey);
867 }
868
869 if let Some(map) = self.runtime_config_non_secret.as_ref()
873 && let Some(value) = map.get(&key)
874 {
875 return serde_json::to_string(value).map(Some).map_err(|err| {
876 warn!(key = %key, error = %err, "runtime-config value JSON-encode failed");
877 ConfigError::Internal
878 });
879 }
880
881 if let Some(injection) = self.runtime_refs.as_ref()
886 && let Some(uri) = injection.refs.get(&key)
887 {
888 use crate::runtime_refs::RuntimeRefResolverError;
889 return match injection.resolver.resolve(uri) {
890 Ok(Some(value)) => serde_json::to_string(&value).map(Some).map_err(|err| {
891 warn!(key = %key, error = %err, "runtime-ref value JSON-encode failed");
892 ConfigError::Internal
893 }),
894 Ok(None) => Ok(None),
895 Err(err @ RuntimeRefResolverError::Invalid(_)) => {
896 warn!(key = %key, error = %err, "runtime-ref rejected");
897 Err(ConfigError::InvalidKey)
898 }
899 Err(err @ RuntimeRefResolverError::Internal(_)) => {
900 warn!(key = %key, error = %err, "runtime-ref resolution failed");
901 Err(ConfigError::Internal)
902 }
903 };
904 }
905
906 match SecretsStoreHost::get(self, key.clone()) {
909 Ok(Some(bytes)) => match String::from_utf8(bytes) {
910 Ok(value) => {
911 warn_compat_fallback_once(&key);
912 Ok(Some(value))
913 }
914 Err(_) => {
915 warn!(
916 key = %key,
917 "runtime-config compat fallback found non-UTF-8 secret bytes; \
918 returning not-found"
919 );
920 Err(ConfigError::Internal)
921 }
922 },
923 Ok(None) => Ok(None),
924 Err(SecretsError::NotFound) => Ok(None),
925 Err(SecretsError::Denied) => Err(ConfigError::Denied),
926 Err(SecretsError::InvalidKey) => Err(ConfigError::InvalidKey),
927 Err(SecretsError::Internal) => Err(ConfigError::Internal),
928 }
929 }
930}
931
932impl HttpClientHost for HostState {
933 fn send(
934 &mut self,
935 req: HttpRequest,
936 ctx: Option<HttpTenantCtx>,
937 ) -> Result<HttpResponse, HttpClientError> {
938 self.send_http_request(req, None, ctx)
939 }
940}
941
942impl HttpClientHostV1_1 for HostState {
943 fn send(
944 &mut self,
945 req: HttpRequestV1_1,
946 opts: Option<HttpRequestOptionsV1_1>,
947 ctx: Option<HttpTenantCtxV1_1>,
948 ) -> Result<HttpResponseV1_1, HttpClientErrorV1_1> {
949 let legacy_req = HttpRequest {
950 method: req.method,
951 url: req.url,
952 headers: req.headers,
953 body: req.body,
954 };
955 let legacy_ctx = ctx.map(|ctx| HttpTenantCtx {
956 env: ctx.env,
957 tenant: ctx.tenant,
958 tenant_id: ctx.tenant_id,
959 team: ctx.team,
960 team_id: ctx.team_id,
961 user: ctx.user,
962 user_id: ctx.user_id,
963 trace_id: ctx.trace_id,
964 correlation_id: ctx.correlation_id,
965 i18n_id: ctx.i18n_id,
966 attributes: ctx.attributes,
967 session_id: ctx.session_id,
968 flow_id: ctx.flow_id,
969 node_id: ctx.node_id,
970 provider_id: ctx.provider_id,
971 deadline_ms: ctx.deadline_ms,
972 attempt: ctx.attempt,
973 idempotency_key: ctx.idempotency_key,
974 impersonation: ctx.impersonation.map(|imp| http_types_v1_0::Impersonation {
975 actor_id: imp.actor_id,
976 reason: imp.reason,
977 }),
978 });
979
980 self.send_http_request(legacy_req, opts, legacy_ctx)
981 .map(|resp| HttpResponseV1_1 {
982 status: resp.status,
983 headers: resp.headers,
984 body: resp.body,
985 })
986 .map_err(|err| HttpClientErrorV1_1 {
987 code: err.code,
988 message: err.message,
989 })
990 }
991}
992
993impl StateStoreHost for HostState {
994 fn read(
995 &mut self,
996 key: HostStateKey,
997 ctx: Option<StateTenantCtx>,
998 ) -> Result<Vec<u8>, StateError> {
999 let store = match self.state_store.as_ref() {
1000 Some(store) => store.clone(),
1001 None => {
1002 return Err(StateError {
1003 code: "unavailable".into(),
1004 message: "state store not configured".into(),
1005 });
1006 }
1007 };
1008 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
1009 Ok(ctx) => ctx,
1010 Err(err) => {
1011 return Err(StateError {
1012 code: "invalid-ctx".into(),
1013 message: err.to_string(),
1014 });
1015 }
1016 };
1017 #[cfg(feature = "fault-injection")]
1018 {
1019 let exec_ctx = self.exec_ctx.as_ref();
1020 let flow_id = exec_ctx
1021 .map(|ctx| ctx.flow_id.as_str())
1022 .unwrap_or("unknown");
1023 let node_id = exec_ctx.and_then(|ctx| ctx.node_id.as_deref());
1024 let attempt = exec_ctx.map(|ctx| ctx.tenant.attempt).unwrap_or(1);
1025 let fault_ctx = FaultContext {
1026 pack_id: self.pack_id.as_str(),
1027 flow_id,
1028 node_id,
1029 attempt,
1030 };
1031 if let Err(err) = maybe_fail(FaultPoint::StateRead, fault_ctx) {
1032 return Err(StateError {
1033 code: "internal".into(),
1034 message: err.to_string(),
1035 });
1036 }
1037 }
1038 let key = StoreStateKey::from(key);
1039 match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
1040 Ok(Some(value)) => Ok(serde_json::to_vec(&value).unwrap_or_else(|_| Vec::new())),
1041 Ok(None) => Err(StateError {
1042 code: "not_found".into(),
1043 message: "state key not found".into(),
1044 }),
1045 Err(err) => Err(StateError {
1046 code: "internal".into(),
1047 message: err.to_string(),
1048 }),
1049 }
1050 }
1051
1052 fn write(
1053 &mut self,
1054 key: HostStateKey,
1055 bytes: Vec<u8>,
1056 ctx: Option<StateTenantCtx>,
1057 ) -> Result<StateOpAck, StateError> {
1058 let store = match self.state_store.as_ref() {
1059 Some(store) => store.clone(),
1060 None => {
1061 return Err(StateError {
1062 code: "unavailable".into(),
1063 message: "state store not configured".into(),
1064 });
1065 }
1066 };
1067 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
1068 Ok(ctx) => ctx,
1069 Err(err) => {
1070 return Err(StateError {
1071 code: "invalid-ctx".into(),
1072 message: err.to_string(),
1073 });
1074 }
1075 };
1076 #[cfg(feature = "fault-injection")]
1077 {
1078 let exec_ctx = self.exec_ctx.as_ref();
1079 let flow_id = exec_ctx
1080 .map(|ctx| ctx.flow_id.as_str())
1081 .unwrap_or("unknown");
1082 let node_id = exec_ctx.and_then(|ctx| ctx.node_id.as_deref());
1083 let attempt = exec_ctx.map(|ctx| ctx.tenant.attempt).unwrap_or(1);
1084 let fault_ctx = FaultContext {
1085 pack_id: self.pack_id.as_str(),
1086 flow_id,
1087 node_id,
1088 attempt,
1089 };
1090 if let Err(err) = maybe_fail(FaultPoint::StateWrite, fault_ctx) {
1091 return Err(StateError {
1092 code: "internal".into(),
1093 message: err.to_string(),
1094 });
1095 }
1096 }
1097 let key = StoreStateKey::from(key);
1098 let value = serde_json::from_slice(&bytes)
1099 .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()));
1100 match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
1101 Ok(()) => Ok(StateOpAck::Ok),
1102 Err(err) => Err(StateError {
1103 code: "internal".into(),
1104 message: err.to_string(),
1105 }),
1106 }
1107 }
1108
1109 fn delete(
1110 &mut self,
1111 key: HostStateKey,
1112 ctx: Option<StateTenantCtx>,
1113 ) -> Result<StateOpAck, StateError> {
1114 let store = match self.state_store.as_ref() {
1115 Some(store) => store.clone(),
1116 None => {
1117 return Err(StateError {
1118 code: "unavailable".into(),
1119 message: "state store not configured".into(),
1120 });
1121 }
1122 };
1123 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
1124 Ok(ctx) => ctx,
1125 Err(err) => {
1126 return Err(StateError {
1127 code: "invalid-ctx".into(),
1128 message: err.to_string(),
1129 });
1130 }
1131 };
1132 let key = StoreStateKey::from(key);
1133 match store.del(&tenant_ctx, STATE_PREFIX, &key) {
1134 Ok(_) => Ok(StateOpAck::Ok),
1135 Err(err) => Err(StateError {
1136 code: "internal".into(),
1137 message: err.to_string(),
1138 }),
1139 }
1140 }
1141}
1142
1143impl TelemetryLoggerHost for HostState {
1144 fn log(
1145 &mut self,
1146 span: TelemetrySpanContext,
1147 fields: Vec<(String, String)>,
1148 _ctx: Option<TelemetryTenantCtx>,
1149 ) -> Result<TelemetryAck, TelemetryError> {
1150 if let Some(mock) = &self.mocks
1151 && mock.telemetry_drain(&[("span_json", span.flow_id.as_str())])
1152 {
1153 return Ok(TelemetryAck::Ok);
1154 }
1155 let mut map = serde_json::Map::new();
1156 for (k, v) in fields {
1157 map.insert(k, Value::String(v));
1158 }
1159 tracing::info!(
1160 tenant = %span.tenant,
1161 flow_id = %span.flow_id,
1162 node = ?span.node_id,
1163 provider = %span.provider,
1164 fields = %serde_json::Value::Object(map.clone()),
1165 "telemetry log from pack"
1166 );
1167 Ok(TelemetryAck::Ok)
1168 }
1169}
1170
1171impl RunnerHostHttp for HostState {
1172 fn request(
1173 &mut self,
1174 method: String,
1175 url: String,
1176 headers: Vec<String>,
1177 body: Option<Vec<u8>>,
1178 ) -> Result<Vec<u8>, String> {
1179 let req = HttpRequest {
1180 method,
1181 url,
1182 headers: headers
1183 .chunks(2)
1184 .filter_map(|chunk| {
1185 if chunk.len() == 2 {
1186 Some((chunk[0].clone(), chunk[1].clone()))
1187 } else {
1188 None
1189 }
1190 })
1191 .collect(),
1192 body,
1193 };
1194 match HttpClientHost::send(self, req, None) {
1195 Ok(resp) => Ok(resp.body.unwrap_or_default()),
1196 Err(err) => Err(err.message),
1197 }
1198 }
1199}
1200
1201impl RunnerHostKv for HostState {
1202 fn get(&mut self, _ns: String, _key: String) -> Option<String> {
1203 None
1204 }
1205
1206 fn put(&mut self, _ns: String, _key: String, _val: String) {}
1207}
1208
/// Result of decoding a pack's `manifest.cbor`, together with the flows derived
/// from it.
enum ManifestLoad {
    /// The manifest decoded with `decode_pack_manifest` (current format).
    New {
        manifest: Box<greentic_types::PackManifest>,
        flows: PackFlows,
    },
    /// The manifest only decoded as a `legacy_pack::PackManifest`.
    Legacy {
        manifest: Box<legacy_pack::PackManifest>,
        flows: PackFlows,
    },
}
1219
1220fn load_manifest_and_flows(path: &Path) -> Result<ManifestLoad> {
1221 let mut archive = ZipArchive::new(File::open(path)?)
1222 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
1223 let bytes = read_entry(&mut archive, "manifest.cbor")
1224 .with_context(|| format!("missing manifest.cbor in {}", path.display()))?;
1225 match decode_pack_manifest(&bytes) {
1226 Ok(manifest) => {
1227 let cache = PackFlows::from_manifest(manifest.clone());
1228 Ok(ManifestLoad::New {
1229 manifest: Box::new(manifest),
1230 flows: cache,
1231 })
1232 }
1233 Err(err) => {
1234 tracing::debug!(
1235 error = %err,
1236 pack = %path.display(),
1237 "decode_pack_manifest failed for archive; falling back to legacy manifest"
1238 );
1239 let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
1240 .context("failed to decode legacy pack manifest from manifest.cbor")?;
1241 let flows = load_legacy_flows_from_archive(&mut archive, &legacy)?;
1242 Ok(ManifestLoad::Legacy {
1243 manifest: Box::new(legacy),
1244 flows,
1245 })
1246 }
1247 }
1248}
1249
1250fn load_manifest_and_flows_from_dir(root: &Path) -> Result<ManifestLoad> {
1251 let manifest_path = root.join("manifest.cbor");
1252 let bytes = std::fs::read(&manifest_path)
1253 .with_context(|| format!("missing manifest.cbor in {}", root.display()))?;
1254 match decode_pack_manifest(&bytes) {
1255 Ok(manifest) => {
1256 let cache = PackFlows::from_manifest(manifest.clone());
1257 Ok(ManifestLoad::New {
1258 manifest: Box::new(manifest),
1259 flows: cache,
1260 })
1261 }
1262 Err(err) => {
1263 tracing::debug!(
1264 error = %err,
1265 pack = %root.display(),
1266 "decode_pack_manifest failed for materialized pack; trying legacy manifest"
1267 );
1268 let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
1269 .context("failed to decode legacy pack manifest from manifest.cbor")?;
1270 let flows = load_legacy_flows_from_dir(root, &legacy)?;
1271 Ok(ManifestLoad::Legacy {
1272 manifest: Box::new(legacy),
1273 flows,
1274 })
1275 }
1276 }
1277}
1278
1279fn load_legacy_flows_from_dir(
1280 root: &Path,
1281 manifest: &legacy_pack::PackManifest,
1282) -> Result<PackFlows> {
1283 build_legacy_flows(manifest, |rel_path| {
1284 let path = root.join(rel_path);
1285 std::fs::read(&path).with_context(|| format!("missing flow json {}", path.display()))
1286 })
1287}
1288
1289fn load_legacy_flows_from_archive(
1290 archive: &mut ZipArchive<File>,
1291 manifest: &legacy_pack::PackManifest,
1292) -> Result<PackFlows> {
1293 build_legacy_flows(manifest, |rel_path| {
1294 read_entry(archive, rel_path).with_context(|| format!("missing flow json {}", rel_path))
1295 })
1296}
1297
1298fn build_legacy_flows(
1299 manifest: &legacy_pack::PackManifest,
1300 mut read_json: impl FnMut(&str) -> Result<Vec<u8>>,
1301) -> Result<PackFlows> {
1302 let mut flows = HashMap::new();
1303 let mut descriptors = Vec::new();
1304
1305 for entry in &manifest.flows {
1306 let bytes = read_json(&entry.file_json)
1307 .with_context(|| format!("missing flow json {}", entry.file_json))?;
1308 let doc = parse_flow_doc_with_legacy_aliases(&bytes)?;
1309 let normalized = normalize_flow_doc(doc);
1310 let flow_ir = flow_doc_to_ir(normalized)?;
1311 let flow = flow_ir_to_flow(flow_ir)?;
1312
1313 descriptors.push(FlowDescriptor {
1314 id: entry.id.clone(),
1315 flow_type: entry.kind.clone(),
1316 pack_id: manifest.meta.pack_id.clone(),
1317 profile: manifest.meta.pack_id.clone(),
1318 version: manifest.meta.version.to_string(),
1319 description: None,
1320 });
1321 flows.insert(entry.id.clone(), flow);
1322 }
1323
1324 let mut entry_flows = manifest.meta.entry_flows.clone();
1325 if entry_flows.is_empty() {
1326 entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
1327 }
1328 let metadata = PackMetadata {
1329 pack_id: manifest.meta.pack_id.clone(),
1330 version: manifest.meta.version.to_string(),
1331 entry_flows,
1332 secret_requirements: Vec::new(),
1333 };
1334
1335 Ok(PackFlows {
1336 descriptors,
1337 flows,
1338 metadata,
1339 })
1340}
1341
1342fn parse_flow_doc_with_legacy_aliases(bytes: &[u8]) -> Result<FlowDoc> {
1343 let mut value: Value =
1344 serde_json::from_slice(bytes).context("failed to decode flow doc JSON")?;
1345 if let Some(map) = value.as_object_mut()
1346 && !map.contains_key("type")
1347 && let Some(flow_type) = map.remove("flow_type")
1348 {
1349 map.insert("type".to_string(), flow_type);
1350 }
1351 serde_json::from_value(value).context("failed to decode flow doc structure")
1352}
1353
/// Per-store state for a component instance: the host services plus the WASI
/// contexts and the resource table they share.
pub struct ComponentState {
    /// Host-side services exposed to the component.
    pub host: HostState,
    /// WASI context, built from the runner's WASI policy in `ComponentState::new`.
    wasi_ctx: WasiCtx,
    /// Context handed out by the `WasiTlsView` implementation.
    wasi_tls_ctx: WasiTlsCtx,
    /// Context handed out by the `WasiHttpView` implementation.
    wasi_http_ctx: WasiHttpCtx,
    /// Resource table shared by the WASI, WASI HTTP and WASI TLS views.
    resource_table: ResourceTable,
}
1361
1362fn install_default_crypto_provider() {
1373 static ONCE: std::sync::Once = std::sync::Once::new();
1374 ONCE.call_once(|| {
1375 let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
1376 });
1377}
1378
1379impl ComponentState {
1380 pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
1381 install_default_crypto_provider();
1384 let wasi_ctx = policy
1385 .instantiate()
1386 .context("failed to build WASI context")?;
1387 Ok(Self {
1388 host,
1389 wasi_ctx,
1390 wasi_tls_ctx: WasiTlsCtxBuilder::new().build(),
1391 wasi_http_ctx: WasiHttpCtx::new(),
1392 resource_table: ResourceTable::new(),
1393 })
1394 }
1395
1396 fn host_mut(&mut self) -> &mut HostState {
1397 &mut self.host
1398 }
1399
1400 fn should_cancel_host(&mut self) -> bool {
1401 false
1402 }
1403
1404 fn yield_now_host(&mut self) {
1405 }
1407}
1408
1409impl component_api::v0_4::greentic::component::control::Host for ComponentState {
1410 fn should_cancel(&mut self) -> bool {
1411 self.should_cancel_host()
1412 }
1413
1414 fn yield_now(&mut self) {
1415 self.yield_now_host();
1416 }
1417}
1418
1419impl component_api::v0_5::greentic::component::control::Host for ComponentState {
1420 fn should_cancel(&mut self) -> bool {
1421 self.should_cancel_host()
1422 }
1423
1424 fn yield_now(&mut self) {
1425 self.yield_now_host();
1426 }
1427}
1428
1429fn add_component_control_instance(
1430 linker: &mut Linker<ComponentState>,
1431 name: &str,
1432) -> wasmtime::Result<()> {
1433 let mut inst = linker.instance(name)?;
1434 inst.func_wrap(
1435 "should-cancel",
1436 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
1437 let host = caller.data_mut();
1438 Ok((host.should_cancel_host(),))
1439 },
1440 )?;
1441 inst.func_wrap(
1442 "yield-now",
1443 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
1444 let host = caller.data_mut();
1445 host.yield_now_host();
1446 Ok(())
1447 },
1448 )?;
1449 Ok(())
1450}
1451
1452fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
1453 add_component_control_instance(linker, "greentic:component/control@0.5.0")?;
1454 add_component_control_instance(linker, "greentic:component/control@0.4.0")?;
1455 Ok(())
1456}
1457
1458pub fn register_identity_probe(linker: &mut Linker<ComponentState>) -> Result<()> {
1491 register_all(linker, false)
1494}
1495
#[cfg(test)]
mod register_identity_probe_tests {
    use super::*;

    // Smoke test: registering the probe imports on a fresh linker must not fail
    // (for example because an import was defined twice).
    #[test]
    fn register_identity_probe_links_successfully() {
        let engine = wasmtime::Engine::default();
        let mut linker = Linker::<ComponentState>::new(&engine);
        register_identity_probe(&mut linker).expect("probe linker registers all imports");
    }

    // The probe WASI policy must expose nothing from the host: no stdio, no
    // preopened directories, and no environment variables (allowed or set).
    #[test]
    fn probe_wasi_policy_is_locked_down() {
        let policy = RunnerWasiPolicy::probe();
        assert!(!policy.inherit_stdio, "probe WASI must not inherit stdio");
        assert!(
            policy.preopens.is_empty(),
            "probe WASI must have no preopens"
        );
        assert!(
            policy.env_allow.is_empty(),
            "probe WASI must not allow env vars"
        );
        assert!(
            policy.env_set.is_empty(),
            "probe WASI must not set env vars"
        );
    }
}
1535
/// Registers every host import a pack component may link against.
///
/// In order: WASI, WASI TLS, WASI HTTP, the greentic v1 host interfaces, and the
/// `greentic:http/client` world aliases. The state-store interface is only
/// registered when `allow_state_store` is true; the OAuth broker and the legacy
/// secrets store are never registered here.
///
/// # Errors
/// Propagates the first registration failure.
pub fn register_all(linker: &mut Linker<ComponentState>, allow_state_store: bool) -> Result<()> {
    add_wasi_to_linker(linker)?;

    // Enable the TLS option before adding the WASI TLS interfaces.
    let mut opts = LinkOptions::default();
    opts.tls(true);
    wasmtime_wasi_tls::p2::add_to_linker(linker, &opts)?;

    add_wasi_http_to_linker(linker)?;

    // Each `Some(..)` entry is an accessor from the store state to the
    // `HostState` that implements the interface; `None` leaves it unregistered.
    add_all_v1_to_linker(
        linker,
        HostFns {
            http_client_v1_1: Some(|state: &mut ComponentState| state.host_mut()),
            http_client: Some(|state: &mut ComponentState| state.host_mut()),
            oauth_broker: None,
            runner_host_http: Some(|state: &mut ComponentState| state.host_mut()),
            runner_host_kv: Some(|state: &mut ComponentState| state.host_mut()),
            telemetry_logger: Some(|state: &mut ComponentState| state.host_mut()),
            state_store: allow_state_store.then_some(|state: &mut ComponentState| state.host_mut()),
            secrets_store_v1_1: Some(|state: &mut ComponentState| state.host_mut()),
            secrets_store: None,
            runtime_config: Some(|state: &mut ComponentState| state.host_mut()),
        },
    )?;
    add_http_client_client_world_aliases(linker)?;
    Ok(())
}
1565
1566fn add_http_client_client_world_aliases(linker: &mut Linker<ComponentState>) -> Result<()> {
1567 let mut inst_v1_1 = linker.instance("greentic:http/client@1.1.0")?;
1568 inst_v1_1.func_wrap(
1569 "send",
1570 move |mut caller: StoreContextMut<'_, ComponentState>,
1571 (req, opts, ctx): (
1572 http_client_client_alias::Request,
1573 Option<http_client_client_alias::RequestOptions>,
1574 Option<http_client_client_alias::TenantCtx>,
1575 )| {
1576 let host = caller.data_mut().host_mut();
1577 let result = HttpClientHostV1_1::send(
1578 host,
1579 alias_request_to_host(req),
1580 opts.map(alias_request_options_to_host),
1581 ctx.map(alias_tenant_ctx_to_host),
1582 );
1583 Ok((match result {
1584 Ok(resp) => Ok(alias_response_from_host(resp)),
1585 Err(err) => Err(alias_error_from_host(err)),
1586 },))
1587 },
1588 )?;
1589 let mut inst_v1_0 = linker.instance("greentic:http/client@1.0.0")?;
1590 inst_v1_0.func_wrap(
1591 "send",
1592 move |mut caller: StoreContextMut<'_, ComponentState>,
1593 (req, ctx): (
1594 host_http_client::Request,
1595 Option<host_http_client::TenantCtx>,
1596 )| {
1597 let host = caller.data_mut().host_mut();
1598 let result = HttpClientHost::send(host, req, ctx);
1599 Ok((result,))
1600 },
1601 )?;
1602 Ok(())
1603}
1604
1605fn alias_request_to_host(req: http_client_client_alias::Request) -> host_http_client::RequestV1_1 {
1606 host_http_client::RequestV1_1 {
1607 method: req.method,
1608 url: req.url,
1609 headers: req.headers,
1610 body: req.body,
1611 }
1612}
1613
1614fn alias_request_options_to_host(
1615 opts: http_client_client_alias::RequestOptions,
1616) -> host_http_client::RequestOptionsV1_1 {
1617 host_http_client::RequestOptionsV1_1 {
1618 timeout_ms: opts.timeout_ms,
1619 allow_insecure: opts.allow_insecure,
1620 follow_redirects: opts.follow_redirects,
1621 }
1622}
1623
1624fn alias_tenant_ctx_to_host(
1625 ctx: http_client_client_alias::TenantCtx,
1626) -> host_http_client::TenantCtxV1_1 {
1627 host_http_client::TenantCtxV1_1 {
1628 env: ctx.env,
1629 tenant: ctx.tenant,
1630 tenant_id: ctx.tenant_id,
1631 team: ctx.team,
1632 team_id: ctx.team_id,
1633 user: ctx.user,
1634 user_id: ctx.user_id,
1635 trace_id: ctx.trace_id,
1636 correlation_id: ctx.correlation_id,
1637 i18n_id: ctx.i18n_id,
1638 attributes: ctx.attributes,
1639 session_id: ctx.session_id,
1640 flow_id: ctx.flow_id,
1641 node_id: ctx.node_id,
1642 provider_id: ctx.provider_id,
1643 deadline_ms: ctx.deadline_ms,
1644 attempt: ctx.attempt,
1645 idempotency_key: ctx.idempotency_key,
1646 impersonation: ctx.impersonation.map(|imp| http_types_v1_1::Impersonation {
1647 actor_id: imp.actor_id,
1648 reason: imp.reason,
1649 }),
1650 }
1651}
1652
1653fn alias_response_from_host(
1654 resp: host_http_client::ResponseV1_1,
1655) -> http_client_client_alias::Response {
1656 http_client_client_alias::Response {
1657 status: resp.status,
1658 headers: resp.headers,
1659 body: resp.body,
1660 }
1661}
1662
1663fn alias_error_from_host(
1664 err: host_http_client::HttpClientErrorV1_1,
1665) -> http_client_client_alias::HostError {
1666 http_client_client_alias::HostError {
1667 code: err.code,
1668 message: err.message,
1669 }
1670}
1671
1672impl OAuthHostContext for ComponentState {
1673 fn tenant_id(&self) -> &str {
1674 &self.host.config.tenant
1675 }
1676
1677 fn env(&self) -> &str {
1678 &self.host.default_env
1679 }
1680
1681 fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
1682 &mut self.host.oauth_host
1683 }
1684
1685 fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
1686 self.host.oauth_config.as_ref()
1687 }
1688}
1689
1690impl WasiView for ComponentState {
1691 fn ctx(&mut self) -> WasiCtxView<'_> {
1692 WasiCtxView {
1693 ctx: &mut self.wasi_ctx,
1694 table: &mut self.resource_table,
1695 }
1696 }
1697}
1698
1699impl WasiHttpView for ComponentState {
1700 fn http(&mut self) -> WasiHttpCtxView<'_> {
1701 WasiHttpCtxView {
1702 ctx: &mut self.wasi_http_ctx,
1703 table: &mut self.resource_table,
1704 hooks: Default::default(),
1705 }
1706 }
1707}
1708
1709impl WasiTlsView for ComponentState {
1710 fn tls(&mut self) -> WasiTlsCtxView<'_> {
1711 WasiTlsCtxView {
1712 ctx: &mut self.wasi_tls_ctx,
1713 table: &mut self.resource_table,
1714 }
1715 }
1716}
1717
// Manually asserts that `ComponentState` may be sent to and shared between threads.
// NOTE(review): the invariant that makes these impls sound is not visible in this
// part of the file — presumably some field is not automatically `Send`/`Sync`.
// TODO: confirm and record the justification here as a `SAFETY:` comment.
#[allow(unsafe_code)]
unsafe impl Send for ComponentState {}
#[allow(unsafe_code)]
unsafe impl Sync for ComponentState {}
1722
1723impl PackRuntime {
1724 fn allows_state_store(&self, component_ref: &str) -> bool {
1725 if self.state_store.is_none() {
1726 return false;
1727 }
1728 if !self.config.state_store_policy.allow {
1729 return false;
1730 }
1731 let Some(manifest) = self.component_manifests.get(component_ref) else {
1732 return true;
1734 };
1735 manifest
1740 .capabilities
1741 .host
1742 .state
1743 .as_ref()
1744 .map(|caps| caps.read || caps.write)
1745 .unwrap_or(true)
1746 }
1747
    /// Returns `true` when a component is loaded under exactly the key `component_ref`.
    pub fn contains_component(&self, component_ref: &str) -> bool {
        self.components.contains_key(component_ref)
    }
1751
    /// Returns a clone of the state store handle this pack was loaded with, if any.
    pub fn state_store_handle(&self) -> Option<crate::storage::DynStateStore> {
        self.state_store.clone()
    }
1759
    /// Loads a pack from `path` and builds a `PackRuntime` for it.
    ///
    /// `path` is classified as a materialized pack directory, a single `.wasm`
    /// component (extension compared case-insensitively), or, for any other
    /// file, a pack archive. `archive_source`, when given, names the archive to
    /// use instead of `path`.
    ///
    /// Steps, in order:
    /// 1. Optionally verify the archive (`verify_archive`), but only when an
    ///    archive path is known and is a regular file.
    /// 2. Locate the pack assets directory.
    /// 3. Load the manifest and flows: first from the materialized root (a
    ///    failure there is only logged), then from the archive if nothing was
    ///    loaded (a failure there is an error).
    /// 4. Load the pack lock from the first candidate root that has one, and
    ///    derive the component sources table from the lock or, without a lock,
    ///    from the manifest's component sources extension.
    /// 5. Compile components: the single `.wasm` file, or every component spec
    ///    resolved through overrides, component sources, the materialized
    ///    directory and the archive, in that order.
    /// 6. Clone the WASI policy and, when an assets directory exists, preopen it
    ///    read-only at `/assets`.
    ///
    /// # Errors
    /// Fails on path normalization, verification, archive manifest loading,
    /// pack lock or component source errors, compilation errors, or when any
    /// component spec remains unresolved after all sources were searched.
    #[allow(clippy::too_many_arguments)]
    pub async fn load(
        path: impl AsRef<Path>,
        config: Arc<HostConfig>,
        mocks: Option<Arc<MockLayer>>,
        archive_source: Option<&Path>,
        session_store: Option<DynSessionStore>,
        state_store: Option<DynStateStore>,
        wasi_policy: Arc<RunnerWasiPolicy>,
        secrets: DynSecretsManager,
        oauth_config: Option<OAuthBrokerConfig>,
        verify_archive: bool,
        component_resolution: ComponentResolution,
    ) -> Result<Self> {
        let path = path.as_ref();
        let (_pack_root, safe_path) = normalize_pack_path(path)?;
        // Classify the input; a path whose metadata cannot be read is treated as
        // "not a directory".
        let path_meta = std::fs::metadata(&safe_path).ok();
        let is_dir = path_meta
            .as_ref()
            .map(|meta| meta.is_dir())
            .unwrap_or(false);
        let is_component = !is_dir
            && safe_path
                .extension()
                .and_then(|ext| ext.to_str())
                .map(|ext| ext.eq_ignore_ascii_case("wasm"))
                .unwrap_or(false);
        // An explicit archive source wins; otherwise only a plain file that is
        // not a `.wasm` component is assumed to be the archive itself.
        let archive_hint_path = if let Some(source) = archive_source {
            let (_, normalized) = normalize_pack_path(source)?;
            Some(normalized)
        } else if is_component || is_dir {
            None
        } else {
            Some(safe_path.clone())
        };
        let archive_hint = archive_hint_path.as_deref();
        if verify_archive {
            // Verification needs an archive hint that is an existing regular file.
            if let Some(verify_target) = archive_hint.and_then(|p| {
                std::fs::metadata(p)
                    .ok()
                    .filter(|meta| meta.is_file())
                    .map(|_| p)
            }) {
                verify::verify_pack(verify_target).await?;
                tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
            } else {
                tracing::debug!("skipping archive verification (no archive source)");
            }
        }
        let engine = Engine::default();
        let engine_profile =
            EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
        let cache = CacheManager::new(CacheConfig::default(), engine_profile);
        // Start from path-derived fallback metadata; replaced below once a
        // manifest (or the component binary) provides real metadata.
        let mut metadata = PackMetadata::fallback(&safe_path);
        let mut manifest = None;
        let mut legacy_manifest: Option<Box<legacy_pack::PackManifest>> = None;
        let mut flows = None;
        // Prefer the resolver-provided materialized root; a directory input is
        // its own root.
        let materialized_root = component_resolution.materialized_root.clone().or_else(|| {
            if is_dir {
                Some(safe_path.clone())
            } else {
                None
            }
        });
        let (pack_assets_dir, assets_tempdir) =
            locate_pack_assets(materialized_root.as_deref(), archive_hint)?;
        let setup_yaml_exists = pack_assets_dir
            .as_ref()
            .map(|dir| dir.join("setup.yaml").is_file())
            .unwrap_or(false);
        tracing::info!(
            pack_root = %safe_path.display(),
            assets_setup_yaml_exists = setup_yaml_exists,
            "pack unpack metadata"
        );

        // First attempt: manifest from the materialized directory. A failure is
        // logged and the archive is tried next.
        if let Some(root) = materialized_root.as_ref() {
            match load_manifest_and_flows_from_dir(root) {
                Ok(ManifestLoad::New {
                    manifest: m,
                    flows: cache,
                }) => {
                    metadata = cache.metadata.clone();
                    manifest = Some(*m);
                    flows = Some(cache);
                }
                Ok(ManifestLoad::Legacy {
                    manifest: m,
                    flows: cache,
                }) => {
                    metadata = cache.metadata.clone();
                    legacy_manifest = Some(m);
                    flows = Some(cache);
                }
                Err(err) => {
                    warn!(error = %err, pack = %root.display(), "failed to parse materialized pack manifest");
                }
            }
        }

        // Second attempt: manifest from the archive, only when nothing was
        // loaded above. Here a failure aborts the load.
        if manifest.is_none()
            && legacy_manifest.is_none()
            && let Some(archive_path) = archive_hint
        {
            let manifest_load = load_manifest_and_flows(archive_path).with_context(|| {
                format!(
                    "failed to load manifest.cbor from {}",
                    archive_path.display()
                )
            })?;
            match manifest_load {
                ManifestLoad::New {
                    manifest: m,
                    flows: cache,
                } => {
                    metadata = cache.metadata.clone();
                    manifest = Some(*m);
                    flows = Some(cache);
                }
                ManifestLoad::Legacy {
                    manifest: m,
                    flows: cache,
                } => {
                    metadata = cache.metadata.clone();
                    legacy_manifest = Some(m);
                    flows = Some(cache);
                }
            }
        }
        #[cfg(feature = "fault-injection")]
        {
            let fault_ctx = FaultContext {
                pack_id: metadata.pack_id.as_str(),
                flow_id: "unknown",
                node_id: None,
                attempt: 1,
            };
            maybe_fail(FaultPoint::PackResolve, fault_ctx)
                .map_err(|err| anyhow!(err.to_string()))?;
        }
        // Use the first candidate root that yields a pack lock.
        let mut pack_lock = None;
        for root in find_pack_lock_roots(&safe_path, is_dir, archive_hint) {
            pack_lock = load_pack_lock(&root)?;
            if pack_lock.is_some() {
                break;
            }
        }
        // The manifest's component sources extension is only consulted when
        // there is no pack lock.
        let component_sources_payload = if pack_lock.is_none() {
            if let Some(manifest) = manifest.as_ref() {
                manifest
                    .get_component_sources_v1()
                    .context("invalid component sources extension")?
            } else {
                None
            }
        } else {
            None
        };
        let component_sources = if let Some(lock) = pack_lock.as_ref() {
            Some(component_sources_table_from_pack_lock(
                lock,
                component_resolution.allow_missing_hash,
            )?)
        } else {
            component_sources_table(component_sources_payload.as_ref())?
        };
        let components = if is_component {
            // Single-component mode: the `.wasm` file is the whole pack and is
            // registered under its file stem.
            let wasm_bytes = fs::read(&safe_path).await?;
            metadata = PackMetadata::from_wasm(&wasm_bytes)
                .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
            let name = safe_path
                .file_stem()
                .map(|s| s.to_string_lossy().to_string())
                .unwrap_or_else(|| "component".to_string());
            let component = compile_component_with_cache(&cache, &engine, None, wasm_bytes).await?;
            let mut map = HashMap::new();
            map.insert(
                name.clone(),
                PackComponent {
                    name,
                    version: metadata.version.clone(),
                    component,
                },
            );
            map
        } else {
            let specs = component_specs(
                manifest.as_ref(),
                legacy_manifest.as_deref(),
                component_sources_payload.as_ref(),
                pack_lock.as_ref(),
            );
            if specs.is_empty() {
                HashMap::new()
            } else {
                // `missing` starts with every spec id and is passed to each
                // loader; `searched` records the places tried, for the error
                // message below.
                let mut loaded = HashMap::new();
                let mut missing: HashSet<String> =
                    specs.iter().map(|spec| spec.id.clone()).collect();
                let mut searched = Vec::new();

                if !component_resolution.overrides.is_empty() {
                    load_components_from_overrides(
                        &cache,
                        &engine,
                        &component_resolution.overrides,
                        &specs,
                        &mut missing,
                        &mut loaded,
                    )
                    .await?;
                    searched.push("override map".to_string());
                }

                if let Some(component_sources) = component_sources.as_ref() {
                    load_components_from_sources(
                        &cache,
                        &engine,
                        component_sources,
                        &component_resolution,
                        &specs,
                        &mut missing,
                        &mut loaded,
                        materialized_root.as_deref(),
                        archive_hint,
                    )
                    .await?;
                    searched.push(format!("extension {}", EXT_COMPONENT_SOURCES_V1));
                }

                if let Some(root) = materialized_root.as_ref() {
                    load_components_from_dir(
                        &cache,
                        &engine,
                        root,
                        &specs,
                        &mut missing,
                        &mut loaded,
                    )
                    .await?;
                    searched.push(format!("components dir {}", root.display()));
                }

                if let Some(archive_path) = archive_hint {
                    load_components_from_archive(
                        &cache,
                        &engine,
                        archive_path,
                        &specs,
                        &mut missing,
                        &mut loaded,
                    )
                    .await?;
                    searched.push(format!("archive {}", archive_path.display()));
                }

                if !missing.is_empty() {
                    let missing_list = missing.into_iter().collect::<Vec<_>>().join(", ");
                    let sources = if searched.is_empty() {
                        "no component sources".to_string()
                    } else {
                        searched.join(", ")
                    };
                    bail!(
                        "components missing: {}; looked in {}",
                        missing_list,
                        sources
                    );
                }

                loaded
            }
        };
        let http_client = Arc::clone(&HTTP_CLIENT);
        // Index component manifests by id (current manifest format only).
        let mut component_manifests = HashMap::new();
        if let Some(manifest) = manifest.as_ref() {
            for component in &manifest.components {
                component_manifests.insert(component.id.as_str().to_string(), component.clone());
            }
        }
        // Extend a private copy of the WASI policy with the pack assets preopen.
        let mut pack_policy = (*wasi_policy).clone();
        if let Some(dir) = pack_assets_dir {
            tracing::debug!(path = %dir.display(), "preopening pack assets directory for WASI /assets");
            pack_policy =
                pack_policy.with_preopen(PreopenSpec::new(dir, "/assets").read_only(true));
        }
        let wasi_policy = Arc::new(pack_policy);
        Ok(Self {
            path: safe_path,
            archive_path: archive_hint.map(Path::to_path_buf),
            config,
            engine,
            metadata,
            manifest,
            legacy_manifest,
            component_manifests,
            mocks,
            flows,
            components,
            http_client,
            pre_cache: Mutex::new(HashMap::new()),
            session_store,
            state_store,
            wasi_policy,
            assets_tempdir,
            provider_registry: RwLock::new(None),
            identify_hint_cache: RwLock::new(HashMap::new()),
            secrets,
            oauth_config,
            cache,
            runtime_config_non_secret: None,
            runtime_refs: None,
        })
    }
2073
    /// Replaces the non-secret runtime configuration map; `None` clears it.
    /// The stored value is cloned into the host state on each `invoke_component` call.
    pub fn set_runtime_config_non_secret(&mut self, map: Option<Arc<BTreeMap<String, Value>>>) {
        self.runtime_config_non_secret = map;
    }
2080
    /// Returns the non-secret runtime configuration map, if one has been set.
    pub fn runtime_config_non_secret(&self) -> Option<&Arc<BTreeMap<String, Value>>> {
        self.runtime_config_non_secret.as_ref()
    }
2086
    /// Replaces the runtime refs injection; `None` clears it.
    /// The stored value is cloned into the host state on each `invoke_component` call.
    pub fn set_runtime_refs(&mut self, injection: Option<RuntimeRefsInjection>) {
        self.runtime_refs = injection;
    }
2094
    /// Returns the runtime refs injection, if one has been set.
    pub fn runtime_refs(&self) -> Option<&RuntimeRefsInjection> {
        self.runtime_refs.as_ref()
    }
2100
2101 pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
2102 if let Some(cache) = &self.flows {
2103 return Ok(cache.descriptors.clone());
2104 }
2105 if let Some(manifest) = &self.manifest {
2106 let descriptors = manifest
2107 .flows
2108 .iter()
2109 .map(|flow| FlowDescriptor {
2110 id: flow.id.as_str().to_string(),
2111 flow_type: flow_kind_to_str(flow.kind).to_string(),
2112 pack_id: manifest.pack_id.as_str().to_string(),
2113 profile: manifest.pack_id.as_str().to_string(),
2114 version: manifest.version.to_string(),
2115 description: None,
2116 })
2117 .collect();
2118 return Ok(descriptors);
2119 }
2120 Ok(Vec::new())
2121 }
2122
2123 #[allow(dead_code)]
2124 pub async fn run_flow(
2125 &self,
2126 flow_id: &str,
2127 input: serde_json::Value,
2128 ) -> Result<serde_json::Value> {
2129 let pack = Arc::new(
2130 PackRuntime::load(
2131 &self.path,
2132 Arc::clone(&self.config),
2133 self.mocks.clone(),
2134 self.archive_path.as_deref(),
2135 self.session_store.clone(),
2136 self.state_store.clone(),
2137 Arc::clone(&self.wasi_policy),
2138 self.secrets.clone(),
2139 self.oauth_config.clone(),
2140 false,
2141 ComponentResolution::default(),
2142 )
2143 .await?,
2144 );
2145
2146 let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&self.config)).await?;
2147 let retry_config = self.config.retry_config().into();
2148 let mocks = pack.mocks.as_deref();
2149 let tenant = self.config.tenant.as_str();
2150
2151 let ctx = FlowContext {
2152 tenant,
2153 pack_id: pack.metadata().pack_id.as_str(),
2154 flow_id,
2155 node_id: None,
2156 tool: None,
2157 action: None,
2158 session_id: None,
2159 provider_id: None,
2160 retry_config,
2161 attempt: 1,
2162 observer: None,
2163 mocks,
2164 };
2165
2166 let execution = engine.execute(ctx, input).await?;
2167 match execution.status {
2168 FlowStatus::Completed => Ok(execution.output),
2169 FlowStatus::Waiting(wait) => Ok(serde_json::json!({
2170 "status": "pending",
2171 "reason": wait.reason,
2172 "resume": wait.snapshot,
2173 "response": execution.output,
2174 })),
2175 }
2176 }
2177
/// Invokes `operation` on a component of this pack and returns its result as JSON.
///
/// The component key is first resolved with `resolve_component_key`, so a
/// reference of the form `<key>.<operation>` falls back to `<key>` when only
/// the latter is registered. When `config_json` is provided it is merged into
/// `input_json` by `merge_component_config_into_input_json` before the call.
///
/// A fresh `Linker`, `HostState` and `wasmtime::Store` are built for every
/// invocation, and the call itself runs inside `run_on_wasi_thread`.
///
/// # Errors
/// Fails when the component is not present in the pack, when the config
/// cannot be merged into the input, or when linking, host-state construction,
/// instantiation or the invocation itself fails.
pub async fn invoke_component(
    &self,
    component_ref: &str,
    ctx: ComponentExecCtx,
    operation: &str,
    config_json: Option<String>,
    input_json: String,
) -> Result<Value> {
    // Shadow the caller's reference with the key that is actually registered.
    let component_ref = resolve_component_key(component_ref, operation, |key| {
        self.components.contains_key(key)
    });
    let pack_component = self
        .components
        .get(component_ref)
        .with_context(|| format!("component '{component_ref}' not found in pack"))?;
    // Everything below is cloned into owned values because the closure handed
    // to `run_on_wasi_thread` is a `move` closure.
    let engine = self.engine.clone();
    let config = Arc::clone(&self.config);
    let http_client = Arc::clone(&self.http_client);
    let mocks = self.mocks.clone();
    let session_store = self.session_store.clone();
    let state_store = self.state_store.clone();
    let secrets = Arc::clone(&self.secrets);
    let oauth_config = self.oauth_config.clone();
    let wasi_policy = Arc::clone(&self.wasi_policy);
    let pack_id = self.metadata().pack_id.clone();
    let allow_state_store = self.allows_state_store(component_ref);
    let component = pack_component.component.clone();
    let component_ref_owned = component_ref.to_string();
    let operation_owned = operation.to_string();
    // With a config present the payload becomes `{ "config": ..., "input": ... }`;
    // without one the input is forwarded untouched.
    let input_owned =
        Self::merge_component_config_into_input_json(config_json.as_deref(), &input_json)
            .context("merge component config into invocation payload")?;
    let ctx_owned = ctx;
    let runtime_config_non_secret = self.runtime_config_non_secret.clone();
    let runtime_refs = self.runtime_refs.clone();

    run_on_wasi_thread("component.invoke", move || {
        let mut linker = Linker::new(&engine);
        register_all(&mut linker, allow_state_store)?;
        add_component_control_to_linker(&mut linker)?;

        let host_state = HostState::new(
            pack_id.clone(),
            config,
            http_client,
            mocks,
            session_store,
            state_store,
            secrets,
            oauth_config,
            Some(ctx_owned.clone()),
            Some(component_ref_owned.clone()),
            // NOTE(review): the provider entry points in this file pass `true`
            // here; the flag's meaning is defined by `HostState::new` — confirm.
            false,
            runtime_config_non_secret,
            runtime_refs,
        )?;
        let store_state = ComponentState::new(host_state, wasi_policy)?;
        let mut store = wasmtime::Store::new(&engine, store_state);

        let invoke_result = HostState::instantiate_component_result(
            &mut linker,
            &mut store,
            &component,
            &ctx_owned,
            &component_ref_owned,
            &operation_owned,
            &input_owned,
        )?;
        HostState::convert_invoke_result(invoke_result)
    })
}
2249
2250 fn merge_component_config_into_input_json(
2251 config_json: Option<&str>,
2252 input_json: &str,
2253 ) -> Result<String> {
2254 let Some(config_json) = config_json else {
2255 return Ok(input_json.to_string());
2256 };
2257
2258 let config_value: Value =
2259 serde_json::from_str(config_json).context("parse component config JSON")?;
2260
2261 if let Ok(mut invocation) =
2262 serde_json::from_str::<greentic_types::InvocationEnvelope>(input_json)
2263 {
2264 let payload_value = serde_json::from_slice(&invocation.payload).unwrap_or_else(|_| {
2265 Value::String(String::from_utf8_lossy(&invocation.payload).into_owned())
2266 });
2267 invocation.payload = serde_json::to_vec(&serde_json::json!({
2268 "config": config_value,
2269 "input": payload_value,
2270 }))
2271 .context("serialize merged invocation payload")?;
2272 return serde_json::to_string(&invocation)
2273 .context("serialize merged invocation envelope");
2274 }
2275
2276 let input_value = serde_json::from_str(input_json)
2277 .unwrap_or_else(|_| Value::String(input_json.to_string()));
2278 serde_json::to_string(&serde_json::json!({
2279 "config": config_value,
2280 "input": input_value,
2281 }))
2282 .context("serialize merged component input")
2283 }
2284
2285 pub fn resolve_provider(
2286 &self,
2287 provider_id: Option<&str>,
2288 provider_type: Option<&str>,
2289 ) -> Result<ProviderBinding> {
2290 let registry = self.provider_registry()?;
2291 registry.resolve(provider_id, provider_type)
2292 }
2293
/// Invokes operation `op` on the provider component named by `binding` and
/// decodes the returned bytes with `deserialize_json_bytes`.
///
/// The binding flavour is chosen from `binding.world`:
/// * contains `provider-schema-core` — the `SchemaSchemaCorePre` bindings;
/// * contains `provider/schema-core` — the `PathSchemaCorePre` bindings, with
///   one retry through `SchemaSchemaCorePre` when the first attempt reports
///   "no exported instance named";
/// * anything else — the `LegacySchemaCorePre` bindings.
///
/// A fresh `Linker`, `HostState` and `wasmtime::Store` are built per call and
/// the work runs inside `run_on_wasi_thread`.
///
/// # Errors
/// Fails when the provider component is not in the pack, or when linking,
/// instantiation, the invocation or decoding of the result fails.
pub async fn invoke_provider(
    &self,
    binding: &ProviderBinding,
    ctx: ComponentExecCtx,
    op: &str,
    input_json: Vec<u8>,
) -> Result<Value> {
    let component_ref_owned = binding.component_ref.clone();
    let pack_component = self.components.get(&component_ref_owned).with_context(|| {
        format!("provider component '{component_ref_owned}' not found in pack")
    })?;
    let component = pack_component.component.clone();

    // Owned copies for the `move` closure below.
    let engine = self.engine.clone();
    let config = Arc::clone(&self.config);
    let http_client = Arc::clone(&self.http_client);
    let mocks = self.mocks.clone();
    let session_store = self.session_store.clone();
    let state_store = self.state_store.clone();
    let secrets = Arc::clone(&self.secrets);
    let oauth_config = self.oauth_config.clone();
    let wasi_policy = Arc::clone(&self.wasi_policy);
    let pack_id = self.metadata().pack_id.clone();
    let allow_state_store = self.allows_state_store(&component_ref_owned);
    let input_owned = input_json;
    let op_owned = op.to_string();
    let ctx_owned = ctx;
    let world = binding.world.clone();
    let runtime_config_non_secret = self.runtime_config_non_secret.clone();
    let runtime_refs = self.runtime_refs.clone();

    run_on_wasi_thread("provider.invoke", move || {
        let mut linker = Linker::new(&engine);
        register_all(&mut linker, allow_state_store)?;
        add_component_control_to_linker(&mut linker)?;
        let host_state = HostState::new(
            pack_id.clone(),
            config,
            http_client,
            mocks,
            session_store,
            state_store,
            secrets,
            oauth_config,
            Some(ctx_owned.clone()),
            Some(component_ref_owned.clone()),
            // NOTE(review): `invoke_component` passes `false` here; the flag's
            // meaning is defined by `HostState::new` — confirm.
            true,
            runtime_config_non_secret,
            runtime_refs,
        )?;
        let store_state = ComponentState::new(host_state, wasi_policy)?;
        let mut store = wasmtime::Store::new(&engine, store_state);
        // The world string decides which generated bindings are tried.
        let use_schema_core_schema = world.contains("provider-schema-core");
        let use_schema_core_path = world.contains("provider/schema-core");
        let result = if use_schema_core_schema {
            let pre_instance = linker.instantiate_pre(component.as_ref())?;
            let pre: SchemaSchemaCorePre<ComponentState> =
                SchemaSchemaCorePre::new(pre_instance)?;
            let bindings = block_on(async { pre.instantiate_async(&mut store).await })?;
            let provider = bindings.greentic_provider_schema_core_schema_core_api();
            provider.call_invoke(&mut store, &op_owned, &input_owned)?
        } else if use_schema_core_path {
            let pre_instance = linker.instantiate_pre(component.as_ref())?;
            // Run the path-style attempt in a closure so that its error can be
            // inspected instead of being propagated immediately.
            let path_attempt = (|| -> Result<Vec<u8>> {
                let pre: PathSchemaCorePre<ComponentState> =
                    PathSchemaCorePre::new(pre_instance)?;
                let bindings = block_on(async { pre.instantiate_async(&mut store).await })?;
                let provider = bindings.greentic_provider_schema_core_api();
                Ok(provider.call_invoke(&mut store, &op_owned, &input_owned)?)
            })();
            match path_attempt {
                Ok(value) => value,
                // Only a missing export triggers the retry with the other
                // binding flavour; any other failure is returned as-is.
                Err(path_err)
                    if path_err.to_string().contains("no exported instance named") =>
                {
                    let pre_instance = linker.instantiate_pre(component.as_ref())?;
                    let pre: SchemaSchemaCorePre<ComponentState> =
                        SchemaSchemaCorePre::new(pre_instance)?;
                    let bindings = block_on(async { pre.instantiate_async(&mut store).await })?;
                    let provider = bindings.greentic_provider_schema_core_schema_core_api();
                    provider.call_invoke(&mut store, &op_owned, &input_owned)?
                }
                Err(path_err) => return Err(path_err),
            }
        } else {
            let pre_instance = linker.instantiate_pre(component.as_ref())?;
            let pre: LegacySchemaCorePre<ComponentState> =
                LegacySchemaCorePre::new(pre_instance)?;
            let bindings = block_on(async { pre.instantiate_async(&mut store).await })?;
            let provider = bindings.greentic_provider_core_schema_core_api();
            provider.call_invoke(&mut store, &op_owned, &input_owned)?
        };
        deserialize_json_bytes(result)
    })
}
2389
/// Asks the provider component named by `binding` to identify an instance
/// from `payload`.
///
/// The outcome is:
/// * `IdentifyOutcome::Unsupported` when building the instance-identity
///   bindings fails with an error that `is_missing_export_error` recognises;
/// * `IdentifyOutcome::Identified(id)` when the component returns `Some(id)`;
/// * `IdentifyOutcome::NoMatch` when it returns `None`.
///
/// Unlike `invoke_provider`, the linker is populated by
/// `register_identity_probe`, the WASI policy comes from
/// `RunnerWasiPolicy::probe()` rather than from `self.wasi_policy`, and no
/// execution context is handed to `HostState::new`.
///
/// # Errors
/// Fails when the provider component is not in the pack, or when linking,
/// instantiation or the call fails for a reason other than a missing export.
pub async fn invoke_identify_instance(
    &self,
    binding: &ProviderBinding,
    payload: Vec<u8>,
) -> Result<IdentifyOutcome> {
    let component_ref_owned = binding.component_ref.clone();
    let pack_component = self.components.get(&component_ref_owned).with_context(|| {
        format!("provider component '{component_ref_owned}' not found in pack")
    })?;
    let component = pack_component.component.clone();

    // Owned copies for the `move` closure below.
    let engine = self.engine.clone();
    let config = Arc::clone(&self.config);
    let http_client = Arc::clone(&self.http_client);
    let mocks = self.mocks.clone();
    let session_store = self.session_store.clone();
    let state_store = self.state_store.clone();
    let secrets = Arc::clone(&self.secrets);
    let oauth_config = self.oauth_config.clone();
    let pack_id = self.metadata().pack_id.clone();

    // A dedicated probe policy is used instead of the pack's configured one.
    let wasi_policy = Arc::new(RunnerWasiPolicy::probe());
    let runtime_config_non_secret = self.runtime_config_non_secret.clone();
    let runtime_refs = self.runtime_refs.clone();
    run_on_wasi_thread("provider.identify_instance", move || {
        let mut linker = Linker::new(&engine);
        register_identity_probe(&mut linker)?;
        let host_state = HostState::new(
            pack_id.clone(),
            config,
            http_client,
            mocks,
            session_store,
            state_store,
            secrets,
            oauth_config,
            None,
            Some(component_ref_owned.clone()),
            true,
            runtime_config_non_secret,
            runtime_refs,
        )?;
        let store_state = ComponentState::new(host_state, wasi_policy)?;
        let mut store = wasmtime::Store::new(&engine, store_state);

        let pre_instance = linker.instantiate_pre(component.as_ref())?;
        let pre = match InstanceIdentityPre::<ComponentState>::new(pre_instance) {
            Ok(pre) => pre,
            // `{err:#}` renders the whole error chain, so the missing-export
            // check also sees messages from nested causes.
            Err(err) if is_missing_export_error(&format!("{err:#}")) => {
                return Ok(IdentifyOutcome::Unsupported);
            }
            Err(err) => return Err(err.into()),
        };
        let bindings = block_on(async { pre.instantiate_async(&mut store).await })?;
        let api = bindings.greentic_provider_instance_identity_instance_identity_api();
        let result = api.call_identify_instance(&mut store, &payload)?;
        Ok(match result {
            Some(id) => IdentifyOutcome::Identified(id),
            None => IdentifyOutcome::NoMatch,
        })
    })
}
2482
/// Asks the provider component named by `binding` for its identify-instance
/// hint.
///
/// Returns `Ok(None)` in three situations: the describe bindings cannot be
/// built because of a missing export (per `is_missing_export_error`), the
/// component returns no bytes, or the returned bytes are rejected by
/// `IdentifyInstanceHint::from_json` (logged with `warn!` and otherwise
/// ignored).
///
/// The linker is populated by `register_identity_probe` and the WASI policy
/// comes from `RunnerWasiPolicy::probe()` rather than from `self.wasi_policy`.
///
/// # Errors
/// Fails when the provider component is not in the pack, or when linking,
/// instantiation or the call fails for a reason other than a missing export.
pub async fn invoke_describe_identify_instance(
    &self,
    binding: &ProviderBinding,
) -> Result<Option<IdentifyInstanceHint>> {
    let component_ref_owned = binding.component_ref.clone();
    let pack_component = self.components.get(&component_ref_owned).with_context(|| {
        format!("provider component '{component_ref_owned}' not found in pack")
    })?;
    let component = pack_component.component.clone();

    // Owned copies for the `move` closure below.
    let engine = self.engine.clone();
    let config = Arc::clone(&self.config);
    let http_client = Arc::clone(&self.http_client);
    let mocks = self.mocks.clone();
    let session_store = self.session_store.clone();
    let state_store = self.state_store.clone();
    let secrets = Arc::clone(&self.secrets);
    let oauth_config = self.oauth_config.clone();
    let pack_id = self.metadata().pack_id.clone();

    // A dedicated probe policy is used instead of the pack's configured one.
    let wasi_policy = Arc::new(RunnerWasiPolicy::probe());
    let runtime_config_non_secret = self.runtime_config_non_secret.clone();
    let runtime_refs = self.runtime_refs.clone();
    run_on_wasi_thread("provider.describe_identify_instance", move || {
        let mut linker = Linker::new(&engine);
        register_identity_probe(&mut linker)?;
        let host_state = HostState::new(
            pack_id.clone(),
            config,
            http_client,
            mocks,
            session_store,
            state_store,
            secrets,
            oauth_config,
            None,
            Some(component_ref_owned.clone()),
            true,
            runtime_config_non_secret,
            runtime_refs,
        )?;
        let store_state = ComponentState::new(host_state, wasi_policy)?;
        let mut store = wasmtime::Store::new(&engine, store_state);

        let pre_instance = linker.instantiate_pre(component.as_ref())?;
        let pre = match InstanceIdentityDescribePre::<ComponentState>::new(pre_instance) {
            Ok(pre) => pre,
            // A component without the describe export simply has no hint.
            Err(err) if is_missing_export_error(&format!("{err:#}")) => {
                return Ok(None);
            }
            Err(err) => return Err(err.into()),
        };
        let bindings = block_on(async { pre.instantiate_async(&mut store).await })?;
        let api = bindings.greentic_provider_instance_identity_instance_identity_describe_api();
        let raw = api.call_describe_identify_instance(&mut store)?;
        let Some(bytes) = raw else {
            return Ok(None);
        };
        match IdentifyInstanceHint::from_json(&bytes) {
            Ok(hint) => Ok(Some(hint)),
            // A malformed hint is downgraded to "no hint" rather than an error.
            Err(err) => {
                warn!(
                    event = "provider.describe_identify_instance.malformed",
                    component_ref = %component_ref_owned,
                    error = %err,
                    "ignoring malformed describe-identify-instance hint; \
                     falling back to unhinted wrapper"
                );
                Ok(None)
            }
        }
    })
}
2580
2581 pub async fn resolve_identify_hint(
2597 &self,
2598 binding: &ProviderBinding,
2599 ) -> Option<IdentifyInstanceHint> {
2600 if let Some(cached) = self.identify_hint_cache.read().get(&binding.component_ref) {
2601 return cached.clone();
2602 }
2603 let hint = match self.invoke_describe_identify_instance(binding).await {
2604 Ok(hint) => hint,
2605 Err(err) => {
2606 warn!(
2607 event = "provider.describe_identify_instance.failed",
2608 component_ref = %binding.component_ref,
2609 error = %err,
2610 "describe-identify-instance probe failed; \
2611 falling back to unhinted wrapper for this component"
2612 );
2613 None
2614 }
2615 };
2616 self.identify_hint_cache
2621 .write()
2622 .insert(binding.component_ref.clone(), hint.clone());
2623 hint
2624 }
2625
2626 pub async fn describe_identify_hints_by_provider_type(
2649 &self,
2650 provider_types: &[&str],
2651 ) -> Result<HashMap<String, Option<IdentifyInstanceHint>>> {
2652 let mut out = HashMap::with_capacity(provider_types.len());
2653 let registry = match self.provider_registry_optional()? {
2654 Some(registry) => registry,
2655 None => {
2656 for ty in provider_types {
2657 out.insert((*ty).to_string(), None);
2658 }
2659 return Ok(out);
2660 }
2661 };
2662 for ty in provider_types {
2663 let Some(binding) = registry.try_resolve(None, Some(ty))? else {
2664 out.insert((*ty).to_string(), None);
2665 continue;
2666 };
2667 let hint = self.resolve_identify_hint(&binding).await;
2668 out.insert((*ty).to_string(), hint);
2669 }
2670 Ok(out)
2671 }
2672
2673 pub async fn identify_endpoints_by_provider_type(
2686 &self,
2687 provider_types: &[&str],
2688 payload: &[u8],
2689 ) -> Result<HashMap<String, IdentifyOutcome>> {
2690 let mut out = HashMap::with_capacity(provider_types.len());
2691 let registry = match self.provider_registry_optional()? {
2692 Some(registry) => registry,
2693 None => {
2694 for ty in provider_types {
2695 out.insert((*ty).to_string(), IdentifyOutcome::Unsupported);
2696 }
2697 return Ok(out);
2698 }
2699 };
2700 for ty in provider_types {
2701 let Some(binding) = registry.try_resolve(None, Some(ty))? else {
2702 out.insert((*ty).to_string(), IdentifyOutcome::Unsupported);
2703 continue;
2704 };
2705 let outcome = self
2706 .invoke_identify_instance(&binding, payload.to_vec())
2707 .await?;
2708 out.insert((*ty).to_string(), outcome);
2709 }
2710 Ok(out)
2711 }
2712
2713 pub async fn identify_endpoints_by_provider_type_scoped(
2730 &self,
2731 provider_types: &[&str],
2732 headers: &[(String, String)],
2733 body: &Value,
2734 ) -> Result<HashMap<String, IdentifyOutcome>> {
2735 let mut out = HashMap::with_capacity(provider_types.len());
2736 let registry = match self.provider_registry_optional()? {
2737 Some(registry) => registry,
2738 None => {
2739 for ty in provider_types {
2740 out.insert((*ty).to_string(), IdentifyOutcome::Unsupported);
2741 }
2742 return Ok(out);
2743 }
2744 };
2745 for ty in provider_types {
2746 let Some(binding) = registry.try_resolve(None, Some(ty))? else {
2747 out.insert((*ty).to_string(), IdentifyOutcome::Unsupported);
2748 continue;
2749 };
2750 let hint = self.resolve_identify_hint(&binding).await;
2751 let payload = build_scoped_identify_payload(headers, body, hint.as_ref());
2752 let outcome = self.invoke_identify_instance(&binding, payload).await?;
2753 out.insert((*ty).to_string(), outcome);
2754 }
2755 Ok(out)
2756 }
2757
2758 pub(crate) fn provider_registry(&self) -> Result<ProviderRegistry> {
2759 if let Some(registry) = self.provider_registry.read().clone() {
2760 return Ok(registry);
2761 }
2762 let manifest = self
2763 .manifest
2764 .as_ref()
2765 .context("pack manifest required for provider resolution")?;
2766 let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
2767 let registry = ProviderRegistry::new(
2768 manifest,
2769 self.state_store.clone(),
2770 &self.config.tenant,
2771 &env,
2772 )?;
2773 *self.provider_registry.write() = Some(registry.clone());
2774 Ok(registry)
2775 }
2776
2777 pub(crate) fn provider_registry_optional(&self) -> Result<Option<ProviderRegistry>> {
2778 if self.manifest.is_none() {
2779 return Ok(None);
2780 }
2781 Ok(Some(self.provider_registry()?))
2782 }
2783
2784 pub fn load_flow(&self, flow_id: &str) -> Result<Flow> {
2785 if let Some(cache) = &self.flows {
2786 return cache
2787 .flows
2788 .get(flow_id)
2789 .cloned()
2790 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
2791 }
2792 if let Some(manifest) = &self.manifest {
2793 let entry = manifest
2794 .flows
2795 .iter()
2796 .find(|f| f.id.as_str() == flow_id)
2797 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
2798 return Ok(entry.flow.clone());
2799 }
2800 bail!("flow '{flow_id}' not available (pack exports disabled)")
2801 }
2802
2803 pub fn metadata(&self) -> &PackMetadata {
2804 &self.metadata
2805 }
2806
2807 pub fn read_asset(&self, asset_path: &str) -> Result<Vec<u8>> {
2812 let normalized = asset_path
2813 .trim_start_matches("assets/")
2814 .trim_start_matches("/assets/");
2815 if let Some(tempdir) = &self.assets_tempdir {
2817 let full = tempdir.path().join("assets").join(normalized);
2818 if full.exists() {
2819 return std::fs::read(&full)
2820 .with_context(|| format!("read asset {}", full.display()));
2821 }
2822 }
2823 let full = self.path.join("assets").join(normalized);
2825 if full.exists() {
2826 return std::fs::read(&full).with_context(|| format!("read asset {}", full.display()));
2827 }
2828 bail!("asset not found: {}", asset_path)
2829 }
2830
2831 pub fn component_manifest(&self, component_ref: &str) -> Option<&ComponentManifest> {
2832 self.component_manifests.get(component_ref)
2833 }
2834
2835 pub fn describe_component_contract_v0_6(&self, component_ref: &str) -> Result<Option<Value>> {
2836 let pack_component = self
2837 .components
2838 .get(component_ref)
2839 .with_context(|| format!("component '{component_ref}' not found in pack"))?;
2840 let engine = self.engine.clone();
2841 let config = Arc::clone(&self.config);
2842 let http_client = Arc::clone(&self.http_client);
2843 let mocks = self.mocks.clone();
2844 let session_store = self.session_store.clone();
2845 let state_store = self.state_store.clone();
2846 let secrets = Arc::clone(&self.secrets);
2847 let oauth_config = self.oauth_config.clone();
2848 let wasi_policy = Arc::clone(&self.wasi_policy);
2849 let pack_id = self.metadata().pack_id.clone();
2850 let allow_state_store = self.allows_state_store(component_ref);
2851 let component = pack_component.component.clone();
2852 let component_ref_owned = component_ref.to_string();
2853 let runtime_config_non_secret = self.runtime_config_non_secret.clone();
2854 let runtime_refs = self.runtime_refs.clone();
2855
2856 run_on_wasi_thread("component.describe", move || {
2857 let mut linker = Linker::new(&engine);
2858 register_all(&mut linker, allow_state_store)?;
2859 add_component_control_to_linker(&mut linker)?;
2860
2861 let host_state = HostState::new(
2862 pack_id.clone(),
2863 config,
2864 http_client,
2865 mocks,
2866 session_store,
2867 state_store,
2868 secrets,
2869 oauth_config,
2870 None,
2871 Some(component_ref_owned),
2872 false,
2873 runtime_config_non_secret,
2874 runtime_refs,
2875 )?;
2876 let store_state = ComponentState::new(host_state, wasi_policy)?;
2877 let mut store = wasmtime::Store::new(&engine, store_state);
2878 let pre_instance = linker.instantiate_pre(&component)?;
2879 let pre = match component_api::v0_6_descriptor::ComponentV0V6V0Pre::new(pre_instance) {
2880 Ok(pre) => pre,
2881 Err(_) => return Ok(None),
2882 };
2883 let bytes = block_on(async {
2884 let bindings = pre.instantiate_async(&mut store).await?;
2885 let descriptor = bindings.greentic_component_component_descriptor();
2886 descriptor.call_describe(&mut store)
2887 })?;
2888
2889 if bytes.is_empty() {
2890 return Ok(Some(Value::Null));
2891 }
2892 if let Ok(value) = serde_cbor::from_slice::<Value>(&bytes) {
2893 return Ok(Some(value));
2894 }
2895 if let Ok(value) = serde_json::from_slice::<Value>(&bytes) {
2896 return Ok(Some(value));
2897 }
2898 if let Ok(text) = String::from_utf8(bytes) {
2899 if let Ok(value) = serde_json::from_str::<Value>(&text) {
2900 return Ok(Some(value));
2901 }
2902 return Ok(Some(Value::String(text)));
2903 }
2904 Ok(Some(Value::Null))
2905 })
2906 }
2907
2908 pub fn load_schema_json(&self, schema_ref: &str) -> Result<Option<Value>> {
2909 let rel = normalize_schema_ref(schema_ref)?;
2910 if self.path.is_dir() {
2911 let candidate = self.path.join(&rel);
2912 if candidate.exists() {
2913 let bytes = std::fs::read(&candidate).with_context(|| {
2914 format!("failed to read schema file {}", candidate.display())
2915 })?;
2916 let value = serde_json::from_slice::<Value>(&bytes)
2917 .with_context(|| format!("invalid schema JSON in {}", candidate.display()))?;
2918 return Ok(Some(value));
2919 }
2920 }
2921
2922 if let Some(archive_path) = self
2923 .archive_path
2924 .as_ref()
2925 .or_else(|| path_is_gtpack(&self.path).then_some(&self.path))
2926 {
2927 let file = File::open(archive_path)
2928 .with_context(|| format!("failed to open {}", archive_path.display()))?;
2929 let mut archive = ZipArchive::new(file)
2930 .with_context(|| format!("failed to read pack {}", archive_path.display()))?;
2931 match archive.by_name(&rel) {
2932 Ok(mut entry) => {
2933 let mut bytes = Vec::new();
2934 entry.read_to_end(&mut bytes)?;
2935 let value = serde_json::from_slice::<Value>(&bytes).with_context(|| {
2936 format!("invalid schema JSON in {}:{}", archive_path.display(), rel)
2937 })?;
2938 Ok(Some(value))
2939 }
2940 Err(zip::result::ZipError::FileNotFound) => Ok(None),
2941 Err(err) => Err(anyhow!(err)).with_context(|| {
2942 format!(
2943 "failed to read schema `{}` from {}",
2944 rel,
2945 archive_path.display()
2946 )
2947 }),
2948 }
2949 } else {
2950 Ok(None)
2951 }
2952 }
2953
2954 pub fn required_secrets(&self) -> &[greentic_types::SecretRequirement] {
2955 &self.metadata.secret_requirements
2956 }
2957
2958 pub fn missing_secrets(
2959 &self,
2960 tenant_ctx: &TypesTenantCtx,
2961 ) -> Vec<greentic_types::SecretRequirement> {
2962 let env = tenant_ctx.env.as_str().to_string();
2963 let tenant = tenant_ctx.tenant.as_str().to_string();
2964 let team = tenant_ctx.team.as_ref().map(|t| t.as_str().to_string());
2965 self.required_secrets()
2966 .iter()
2967 .filter(|req| {
2968 if let Some(scope) = &req.scope {
2970 if scope.env != env {
2971 return false;
2972 }
2973 if scope.tenant != tenant {
2974 return false;
2975 }
2976 if let Some(ref team_req) = scope.team
2977 && team.as_ref() != Some(team_req)
2978 {
2979 return false;
2980 }
2981 }
2982 let ctx = self.config.tenant_ctx();
2983 read_secret_blocking(
2984 &self.secrets,
2985 &ctx,
2986 &self.metadata.pack_id,
2987 canonicalize_secret_key(req.key.as_str()).as_str(),
2988 )
2989 .is_err()
2990 })
2991 .cloned()
2992 .collect()
2993 }
2994
/// Builds a runtime directly from component artifacts and in-memory flows,
/// without a pack manifest or archive.
///
/// Each `(name, path)` pair in `components` is read from disk and compiled
/// with a default `Engine`. Each `(id, FlowIR)` pair in `flows` is converted
/// with `flow_ir_to_flow` and registered under `id`; every flow id is also
/// listed as an entry flow. Versions are fixed to `"0.0.0"` and the flow
/// profile to `"test"`.
///
/// # Errors
/// Fails when a component artifact is missing, unreadable or does not
/// compile, when a flow cannot be converted, or when the default secrets
/// manager cannot be created.
pub fn for_component_test(
    components: Vec<(String, PathBuf)>,
    flows: HashMap<String, FlowIR>,
    pack_id: &str,
    config: Arc<HostConfig>,
) -> Result<Self> {
    let engine = Engine::default();
    let engine_profile =
        EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
    let cache = CacheManager::new(CacheConfig::default(), engine_profile);
    let mut component_map = HashMap::new();
    for (name, path) in components {
        // Report a missing artifact explicitly instead of a bare I/O error.
        if !path.exists() {
            bail!("component artifact missing: {}", path.display());
        }
        let wasm_bytes = std::fs::read(&path)?;
        let component =
            Arc::new(Component::from_binary(&engine, &wasm_bytes).map_err(|err| {
                anyhow!("failed to compile component {}: {err}", path.display())
            })?);
        component_map.insert(
            name.clone(),
            PackComponent {
                name,
                version: "0.0.0".into(),
                component,
            },
        );
    }

    let mut flow_map = HashMap::new();
    let mut descriptors = Vec::new();
    for (id, ir) in flows {
        // Grab the flow type before `ir` is consumed by the conversion.
        let flow_type = ir.flow_type.clone();
        let flow = flow_ir_to_flow(ir)?;
        flow_map.insert(id.clone(), flow);
        descriptors.push(FlowDescriptor {
            id: id.clone(),
            flow_type,
            pack_id: pack_id.to_string(),
            profile: "test".into(),
            version: "0.0.0".into(),
            description: None,
        });
    }
    // Every supplied flow is treated as an entry flow.
    let entry_flows = descriptors.iter().map(|flow| flow.id.clone()).collect();
    let metadata = PackMetadata {
        pack_id: pack_id.to_string(),
        version: "0.0.0".into(),
        entry_flows,
        secret_requirements: Vec::new(),
    };
    let flows_cache = PackFlows {
        descriptors: descriptors.clone(),
        flows: flow_map,
        metadata: metadata.clone(),
    };

    // No manifest, mocks, stores, OAuth config or runtime config are wired in.
    Ok(Self {
        path: PathBuf::new(),
        archive_path: None,
        config,
        engine,
        metadata,
        manifest: None,
        legacy_manifest: None,
        component_manifests: HashMap::new(),
        mocks: None,
        flows: Some(flows_cache),
        components: component_map,
        http_client: Arc::clone(&HTTP_CLIENT),
        pre_cache: Mutex::new(HashMap::new()),
        session_store: None,
        state_store: None,
        wasi_policy: Arc::new(RunnerWasiPolicy::new()),
        assets_tempdir: None,
        provider_registry: RwLock::new(None),
        identify_hint_cache: RwLock::new(HashMap::new()),
        secrets: crate::secrets::default_manager()?,
        oauth_config: None,
        cache,
        runtime_config_non_secret: None,
        runtime_refs: None,
    })
}
3080}
3081
/// Chooses the key under which a component reference should be looked up.
///
/// * A reference that is registered as-is is returned unchanged.
/// * Otherwise, when the reference ends in `.<operation>` and the part before
///   that final dot is registered, that prefix is returned.
/// * In every other case the original reference is returned unchanged, so
///   the caller's lookup fails on the name the user actually wrote.
fn resolve_component_key<'a>(
    component_ref: &'a str,
    operation: &str,
    is_registered: impl Fn(&str) -> bool,
) -> &'a str {
    if is_registered(component_ref) {
        return component_ref;
    }
    match component_ref.rsplit_once('.') {
        Some((head, tail)) if tail == operation && is_registered(head) => head,
        _ => component_ref,
    }
}
3115
#[cfg(test)]
mod resolve_component_key_tests {
    use super::resolve_component_key;

    /// Builds a registration predicate that accepts exactly `keys`.
    fn registered(keys: &[&'static str]) -> impl Fn(&str) -> bool {
        let known: Vec<&'static str> = keys.to_vec();
        move |candidate: &str| known.iter().any(|key| *key == candidate)
    }

    /// A reference that is itself registered wins over any registered prefix.
    #[test]
    fn full_reference_is_used_when_registered() {
        let lookup = registered(&["ai.greentic.component-templates", "ai.greentic"]);
        let resolved =
            resolve_component_key("ai.greentic.component-templates", "handle_message", lookup);
        assert_eq!(resolved, "ai.greentic.component-templates");
    }

    /// `<key>.<operation>` falls back to `<key>` when only `<key>` is registered.
    #[test]
    fn legacy_packed_id_falls_back_when_suffix_is_operation() {
        let lookup = registered(&["qa"]);
        let resolved = resolve_component_key("qa.process", "process", lookup);
        assert_eq!(resolved, "qa");
    }

    /// A dotted reference whose last segment is not the operation is left alone,
    /// even though its prefix is registered.
    #[test]
    fn drifted_dotted_reference_does_not_fall_back_to_prefix() {
        let lookup = registered(&["ai.greentic"]);
        let resolved =
            resolve_component_key("ai.greentic.component-templates", "handle_message", lookup);
        assert_eq!(resolved, "ai.greentic.component-templates");
    }

    /// With nothing registered the input comes back untouched.
    #[test]
    fn unregistered_reference_is_returned_unchanged() {
        let lookup = registered(&[]);
        let resolved = resolve_component_key("foo", "bar", lookup);
        assert_eq!(resolved, "foo");
    }
}
3161
3162fn normalize_schema_ref(schema_ref: &str) -> Result<String> {
3163 let candidate = schema_ref.trim();
3164 if candidate.is_empty() {
3165 bail!("schema ref cannot be empty");
3166 }
3167 let path = Path::new(candidate);
3168 if path.is_absolute() {
3169 bail!("schema ref must be relative: {}", schema_ref);
3170 }
3171 let mut normalized = PathBuf::new();
3172 for component in path.components() {
3173 match component {
3174 std::path::Component::Normal(part) => normalized.push(part),
3175 std::path::Component::CurDir => {}
3176 _ => bail!("schema ref must not contain traversal: {}", schema_ref),
3177 }
3178 }
3179 let normalized = normalized
3180 .to_str()
3181 .map(ToString::to_string)
3182 .ok_or_else(|| anyhow!("schema ref must be valid UTF-8"))?;
3183 if normalized.is_empty() {
3184 bail!("schema ref cannot normalize to empty path");
3185 }
3186 Ok(normalized)
3187}
3188
/// Reports whether `path` has a `gtpack` extension, compared
/// ASCII-case-insensitively. Paths without an extension, or with one that is
/// not valid UTF-8, yield `false`.
fn path_is_gtpack(path: &Path) -> bool {
    match path.extension().and_then(|ext| ext.to_str()) {
        Some(ext) => ext.eq_ignore_ascii_case("gtpack"),
        None => false,
    }
}
3195
3196fn is_missing_node_export(err: &wasmtime::Error, version: &str) -> bool {
3197 let message = err.to_string();
3198 message.contains("no exported instance named")
3199 && message.contains(&format!("greentic:component/node@{version}"))
3200}
3201
/// The flows of a pack together with their descriptors and the pack metadata.
struct PackFlows {
    /// One descriptor per flow.
    descriptors: Vec<FlowDescriptor>,
    /// Flow definitions keyed by flow id.
    flows: HashMap<String, Flow>,
    /// Metadata of the pack the flows belong to.
    metadata: PackMetadata,
}
3207
/// Extension identifiers under which a runtime-flow bundle may be published;
/// three spellings are listed.
// NOTE(review): presumably consumed by `flows_from_runtime_extension`, whose
// body is outside this part of the file — confirm lookup order there.
const RUNTIME_FLOW_EXTENSION_IDS: [&str; 3] = [
    "greentic.pack.runtime_flow",
    "greentic.pack.flow_runtime",
    "greentic.pack.runtime_flows",
];
3213
/// Deserialized container holding a list of runtime flows.
#[derive(Debug, Deserialize)]
struct RuntimeFlowBundle {
    /// The flows carried by the bundle; the field is required.
    flows: Vec<RuntimeFlow>,
}
3218
/// Deserialized form of a single runtime flow.
///
/// `id`, `kind` and `nodes` are required; every other field falls back to its
/// default when absent from the input.
#[derive(Debug, Deserialize)]
struct RuntimeFlow {
    /// Flow identifier.
    id: String,
    /// Flow kind; also accepted under the key `flow_type`.
    #[serde(alias = "flow_type")]
    kind: FlowKind,
    /// Optional schema version string.
    #[serde(default)]
    schema_version: Option<String>,
    /// Optional start value. NOTE(review): presumably the id of the first
    /// node — confirm against the conversion code.
    #[serde(default)]
    start: Option<String>,
    /// Entrypoints keyed by name, each an arbitrary JSON value.
    #[serde(default)]
    entrypoints: BTreeMap<String, Value>,
    /// Nodes of the flow keyed by node id.
    nodes: BTreeMap<String, RuntimeNode>,
    /// Optional flow metadata.
    #[serde(default)]
    metadata: Option<FlowMetadata>,
}
3234
/// Deserialized form of a single node inside a runtime flow.
///
/// Only `component_id` is required; every other field falls back to its
/// default when absent from the input.
#[derive(Debug, Deserialize)]
struct RuntimeNode {
    /// Component the node refers to; also accepted under the key `component`.
    #[serde(alias = "component")]
    component_id: String,
    /// Optional operation name; also accepted under the key `operation`.
    #[serde(default, alias = "operation")]
    operation_name: Option<String>,
    /// Operation payload; also accepted under the keys `payload` or `input`.
    #[serde(default, alias = "payload", alias = "input")]
    operation_payload: Value,
    /// Node configuration as an arbitrary JSON value.
    #[serde(default)]
    config: Value,
    /// Optional routing information.
    #[serde(default)]
    routing: Option<Routing>,
    /// Optional telemetry hints.
    #[serde(default)]
    telemetry: Option<TelemetryHints>,
}
3250
3251fn deserialize_json_bytes(bytes: Vec<u8>) -> Result<Value> {
3252 if bytes.is_empty() {
3253 return Ok(Value::Null);
3254 }
3255 serde_json::from_slice(&bytes).or_else(|_| {
3256 String::from_utf8(bytes)
3257 .map(Value::String)
3258 .map_err(|err| anyhow!(err))
3259 })
3260}
3261
/// Reports whether an error message describes a missing instance-identity
/// export.
///
/// Both conditions must hold: the message contains one of the generic
/// "no exported instance/function named" markers, and it mentions one of the
/// identity-related interface or function names.
fn is_missing_export_error(message: &str) -> bool {
    const BROAD_MARKERS: [&str; 2] = [
        "no exported instance named",
        "no exported function named",
    ];
    const IDENTITY_SEGMENTS: [&str; 4] = [
        "instance-identity-api",
        "identify-instance",
        "instance-identity-describe-api",
        "describe-identify-instance",
    ];

    fn mentions_any(haystack: &str, needles: &[&str]) -> bool {
        needles.iter().any(|needle| haystack.contains(*needle))
    }

    mentions_any(message, &BROAD_MARKERS) && mentions_any(message, &IDENTITY_SEGMENTS)
}
3286
3287fn build_scoped_identify_payload(
3305 headers: &[(String, String)],
3306 body: &Value,
3307 hint: Option<&IdentifyInstanceHint>,
3308) -> Vec<u8> {
3309 let scoped_headers: Vec<&(String, String)> = match hint {
3310 Some(hint) => {
3313 let allowed = hint.header_names();
3314 headers
3315 .iter()
3316 .filter(|(name, _)| allowed.contains(&name.as_str()))
3317 .collect()
3318 }
3319 None => headers.iter().collect(),
3320 };
3321 let wrapper = serde_json::json!({
3322 "headers": scoped_headers
3323 .iter()
3324 .map(|(name, value)| serde_json::json!({ "name": name, "value": value }))
3325 .collect::<Vec<_>>(),
3326 "body": body,
3327 });
3328 serde_json::to_vec(&wrapper).expect("wrapper payload always serializes")
3329}
3330
#[cfg(test)]
mod build_scoped_identify_payload_tests {
    use super::*;
    use crate::identify_hint::HintSource;
    use serde_json::json;

    /// Wraps a list of sources into a hint.
    fn hint(sources: Vec<HintSource>) -> IdentifyInstanceHint {
        IdentifyInstanceHint { sources }
    }

    /// Builds an owned header pair from string slices.
    fn header(name: &str, value: &str) -> (String, String) {
        (name.to_string(), value.to_string())
    }

    /// Builds the payload and parses it back into a JSON value.
    fn render(
        headers: &[(String, String)],
        body: &Value,
        hint: Option<&IdentifyInstanceHint>,
    ) -> Value {
        let encoded = build_scoped_identify_payload(headers, body, hint);
        serde_json::from_slice(&encoded).unwrap()
    }

    /// Without a hint nothing is filtered out.
    #[test]
    fn unhinted_passes_all_input_headers_through() {
        let input = vec![
            header("x-telegram-bot-api-secret-token", "telegram-tok"),
            header("x-future-routing-tag", "abc"),
        ];
        let body = json!({ "update_id": 1 });

        let wrapper = render(&input, &body, None);

        let expected_headers = json!([
            { "name": "x-telegram-bot-api-secret-token", "value": "telegram-tok" },
            { "name": "x-future-routing-tag", "value": "abc" }
        ]);
        assert_eq!(wrapper["headers"], expected_headers);
        assert_eq!(wrapper["body"], body);
    }

    /// A header hint keeps only the headers it names.
    #[test]
    fn header_hint_filters_to_declared_names_only() {
        let only_telegram = hint(vec![HintSource::Header {
            name: "x-telegram-bot-api-secret-token".into(),
        }]);
        let input = vec![
            header("x-telegram-bot-api-secret-token", "telegram-tok"),
            header("x-slack-signature", "v0=sig"),
        ];
        let body = json!({});

        let wrapper = render(&input, &body, Some(&only_telegram));

        let expected_headers = json!([
            { "name": "x-telegram-bot-api-secret-token", "value": "telegram-tok" }
        ]);
        assert_eq!(wrapper["headers"], expected_headers);
    }

    /// A hint that names no headers (body-only or empty) forwards none.
    #[test]
    fn hints_without_header_sources_drop_all_headers() {
        let input = vec![header(
            "x-telegram-bot-api-secret-token",
            "should-not-leak",
        )];
        let body = json!({ "anything": true });
        let headerless_hints = [
            hint(vec![HintSource::BodyPath {
                json_pointer: "/recipient/id".into(),
            }]),
            hint(vec![]),
        ];

        for h in headerless_hints {
            let wrapper = render(&input, &body, Some(&h));
            assert_eq!(wrapper["headers"], json!([]), "hint={:?}", h.sources);
            assert_eq!(wrapper["body"], body);
        }
    }

    /// Filtering keeps the original order and repeated header names.
    #[test]
    fn header_filter_preserves_input_order_and_dups() {
        let route_only = hint(vec![HintSource::Header {
            name: "x-route".into(),
        }]);
        let input = vec![
            header("x-route", "a"),
            header("x-other", "skip"),
            header("x-route", "b"),
        ];
        let body = json!({});

        let wrapper = render(&input, &body, Some(&route_only));

        let expected_headers = json!([
            { "name": "x-route", "value": "a" },
            { "name": "x-route", "value": "b" }
        ]);
        assert_eq!(wrapper["headers"], expected_headers);
    }
}
3442
3443impl PackFlows {
3444 fn from_manifest(manifest: greentic_types::PackManifest) -> Self {
3445 if let Some(flows) = flows_from_runtime_extension(&manifest) {
3446 return flows;
3447 }
3448 let descriptors = manifest
3449 .flows
3450 .iter()
3451 .map(|entry| FlowDescriptor {
3452 id: entry.id.as_str().to_string(),
3453 flow_type: flow_kind_to_str(entry.kind).to_string(),
3454 pack_id: manifest.pack_id.as_str().to_string(),
3455 profile: manifest.pack_id.as_str().to_string(),
3456 version: manifest.version.to_string(),
3457 description: None,
3458 })
3459 .collect();
3460 let mut flows = HashMap::new();
3461 for entry in &manifest.flows {
3462 flows.insert(entry.id.as_str().to_string(), entry.flow.clone());
3463 }
3464 Self {
3465 metadata: PackMetadata::from_manifest(&manifest),
3466 descriptors,
3467 flows,
3468 }
3469 }
3470}
3471
3472fn flows_from_runtime_extension(manifest: &greentic_types::PackManifest) -> Option<PackFlows> {
3473 let extensions = manifest.extensions.as_ref()?;
3474 let extension = extensions.iter().find_map(|(key, ext)| {
3475 if RUNTIME_FLOW_EXTENSION_IDS
3476 .iter()
3477 .any(|candidate| candidate == key)
3478 {
3479 Some(ext)
3480 } else {
3481 None
3482 }
3483 })?;
3484 let runtime_flows = match decode_runtime_flow_extension(extension) {
3485 Some(flows) if !flows.is_empty() => flows,
3486 _ => return None,
3487 };
3488
3489 let descriptors = runtime_flows
3490 .iter()
3491 .map(|flow| FlowDescriptor {
3492 id: flow.id.as_str().to_string(),
3493 flow_type: flow_kind_to_str(flow.kind).to_string(),
3494 pack_id: manifest.pack_id.as_str().to_string(),
3495 profile: manifest.pack_id.as_str().to_string(),
3496 version: manifest.version.to_string(),
3497 description: None,
3498 })
3499 .collect::<Vec<_>>();
3500 let flows = runtime_flows
3501 .into_iter()
3502 .map(|flow| (flow.id.as_str().to_string(), flow))
3503 .collect();
3504
3505 Some(PackFlows {
3506 metadata: PackMetadata::from_manifest(manifest),
3507 descriptors,
3508 flows,
3509 })
3510}
3511
3512fn decode_runtime_flow_extension(extension: &ExtensionRef) -> Option<Vec<Flow>> {
3513 let value = match extension.inline.as_ref()? {
3514 ExtensionInline::Other(value) => value.clone(),
3515 _ => return None,
3516 };
3517
3518 if let Ok(bundle) = serde_json::from_value::<RuntimeFlowBundle>(value.clone()) {
3519 return Some(collect_runtime_flows(bundle.flows));
3520 }
3521
3522 if let Ok(flows) = serde_json::from_value::<Vec<RuntimeFlow>>(value.clone()) {
3523 return Some(collect_runtime_flows(flows));
3524 }
3525
3526 if let Ok(flows) = serde_json::from_value::<Vec<Flow>>(value) {
3527 return Some(flows);
3528 }
3529
3530 warn!(
3531 extension = %extension.kind,
3532 version = %extension.version,
3533 "runtime flow extension present but could not be decoded"
3534 );
3535 None
3536}
3537
3538fn collect_runtime_flows(flows: Vec<RuntimeFlow>) -> Vec<Flow> {
3539 flows
3540 .into_iter()
3541 .filter_map(|flow| match runtime_flow_to_flow(flow) {
3542 Ok(flow) => Some(flow),
3543 Err(err) => {
3544 warn!(error = %err, "failed to decode runtime flow");
3545 None
3546 }
3547 })
3548 .collect()
3549}
3550
3551fn runtime_flow_to_flow(runtime: RuntimeFlow) -> Result<Flow> {
3552 let flow_id = FlowId::from_str(&runtime.id)
3553 .with_context(|| format!("invalid flow id `{}`", runtime.id))?;
3554 let mut entrypoints = runtime.entrypoints;
3555 if entrypoints.is_empty()
3556 && let Some(start) = &runtime.start
3557 {
3558 entrypoints.insert("default".into(), Value::String(start.clone()));
3559 }
3560
3561 let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
3562 for (id, node) in runtime.nodes {
3563 let node_id = NodeId::from_str(&id).with_context(|| format!("invalid node id `{id}`"))?;
3564 let component_id = ComponentId::from_str(&node.component_id)
3565 .with_context(|| format!("invalid component id `{}`", node.component_id))?;
3566 let operation_payload = if node.config.is_null() {
3567 node.operation_payload
3568 } else {
3569 serde_json::json!({
3570 "input": node.operation_payload,
3571 "config": node.config,
3572 })
3573 };
3574 let component = FlowComponentRef {
3575 id: component_id,
3576 pack_alias: None,
3577 operation: node.operation_name,
3578 };
3579 let routing = node.routing.unwrap_or(Routing::End);
3580 let telemetry = node.telemetry.unwrap_or_default();
3581 nodes.insert(
3582 node_id.clone(),
3583 Node {
3584 id: node_id,
3585 component,
3586 input: InputMapping {
3587 mapping: operation_payload,
3588 },
3589 output: OutputMapping {
3590 mapping: Value::Null,
3591 },
3592 err_map: None,
3593 routing,
3594 telemetry,
3595 },
3596 );
3597 }
3598
3599 Ok(Flow {
3600 schema_version: runtime.schema_version.unwrap_or_else(|| "1.0".to_string()),
3601 id: flow_id,
3602 kind: runtime.kind,
3603 entrypoints,
3604 nodes,
3605 metadata: runtime.metadata.unwrap_or_default(),
3606 })
3607}
3608
3609fn flow_kind_to_str(kind: greentic_types::FlowKind) -> &'static str {
3610 match kind {
3611 greentic_types::FlowKind::Messaging => "messaging",
3612 greentic_types::FlowKind::Event => "event",
3613 greentic_types::FlowKind::ComponentConfig => "component-config",
3614 greentic_types::FlowKind::Job => "job",
3615 greentic_types::FlowKind::Http => "http",
3616 }
3617}
3618
3619fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
3620 let mut file = archive
3621 .by_name(name)
3622 .with_context(|| format!("entry {name} missing from archive"))?;
3623 let mut buf = Vec::new();
3624 file.read_to_end(&mut buf)?;
3625 Ok(buf)
3626}
3627
3628fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
3629 for node in doc.nodes.values_mut() {
3630 let Some((component_ref, payload)) = node
3631 .raw
3632 .iter()
3633 .next()
3634 .map(|(key, value)| (key.clone(), value.clone()))
3635 else {
3636 continue;
3637 };
3638 if component_ref.starts_with("emit.") {
3639 node.operation = Some(component_ref);
3640 node.payload = payload;
3641 node.raw.clear();
3642 continue;
3643 }
3644 let (target_component, operation, input, config) =
3645 infer_component_exec(&payload, &component_ref);
3646 let mut payload_obj = serde_json::Map::new();
3647 payload_obj.insert("component".into(), Value::String(target_component));
3649 payload_obj.insert("operation".into(), Value::String(operation));
3650 payload_obj.insert("input".into(), input);
3651 if let Some(cfg) = config {
3652 payload_obj.insert("config".into(), cfg);
3653 }
3654 node.operation = Some("component.exec".to_string());
3655 node.payload = Value::Object(payload_obj);
3656 node.raw.clear();
3657 }
3658 doc
3659}
3660
3661fn infer_component_exec(
3662 payload: &Value,
3663 component_ref: &str,
3664) -> (String, String, Value, Option<Value>) {
3665 let default_op = if component_ref.starts_with("templating.") {
3666 "render"
3667 } else {
3668 "invoke"
3669 }
3670 .to_string();
3671
3672 if let Value::Object(map) = payload {
3673 let has_embedded_component =
3674 map.get("component").is_some() || map.get("component_ref").is_some();
3675 let op = map
3676 .get("op")
3677 .or_else(|| map.get("operation"))
3678 .and_then(Value::as_str)
3679 .map(|s| s.to_string())
3680 .unwrap_or_else(|| {
3681 if has_embedded_component {
3682 component_ref.to_string()
3683 } else {
3684 default_op.clone()
3685 }
3686 });
3687
3688 let mut input = map.clone();
3689 let config = input.remove("config");
3690 let canonical_input = if has_embedded_component {
3691 input.get("input").cloned()
3692 } else {
3693 None
3694 };
3695 let component = input
3696 .get("component")
3697 .or_else(|| input.get("component_ref"))
3698 .and_then(Value::as_str)
3699 .map(|s| s.to_string())
3700 .unwrap_or_else(|| component_ref.to_string());
3701 input.remove("component");
3702 input.remove("component_ref");
3703 input.remove("op");
3704 input.remove("operation");
3705 let input = canonical_input.unwrap_or(Value::Object(input));
3706 return (component, op, input, config);
3707 }
3708
3709 (component_ref.to_string(), default_op, payload.clone(), None)
3710}
3711
/// Identifies one component a pack is expected to provide.
#[derive(Clone, Debug)]
struct ComponentSpec {
    /// Component identifier; `component_path_for_spec` also uses it as the
    /// wasm file stem when no legacy path is set.
    id: String,
    /// Version string. `component_specs` fills in `"0.0.0"` for entries
    /// derived from a pack lock or a component-sources list.
    version: String,
    /// Wasm path taken from a legacy manifest entry (`file_wasm`); `None` for
    /// every other origin.
    legacy_path: Option<String>,
}
3718
/// Where a component comes from and which digests are recorded for it.
#[derive(Clone, Debug)]
struct ComponentSourceInfo {
    /// Recorded digest. From a component-sources list this is the resolved
    /// digest; from a pack lock it is the normalized wasm hash (bundled) or
    /// the normalized resolved digest (remote).
    digest: Option<String>,
    /// Parsed source reference of the component.
    source: ComponentSourceRef,
    /// Whether the wasm is shipped inline with the pack or fetched remotely.
    artifact: ComponentArtifactLocation,
    /// Normalized SHA-256 declared for a bundled pack-lock entry; `None` for
    /// remote entries and for component-sources entries.
    expected_wasm_sha256: Option<String>,
    /// Set only for a bundled pack-lock entry that declares no hash while
    /// missing hashes are allowed.
    // NOTE(review): the consumer of this flag is outside this chunk — confirm
    // it gates digest verification as the name suggests.
    skip_digest_verification: bool,
}
3727
/// Location of a component's wasm artifact.
#[derive(Clone, Debug)]
enum ComponentArtifactLocation {
    /// The wasm ships with the pack at `wasm_path`.
    // NOTE(review): presumably relative to the pack root — confirm against the loader.
    Inline { wasm_path: String },
    /// The wasm is not shipped with the pack.
    Remote,
}
3733
/// Deserialized contents of a `pack.lock` / `pack.lock.json` file.
#[derive(Clone, Debug, Deserialize)]
struct PackLockV1 {
    /// Lock format version; `load_pack_lock` rejects anything other than `1`.
    schema_version: u32,
    /// Locked components, in file order.
    components: Vec<PackLockComponent>,
}
3739
/// One component entry of a pack lock.
///
/// Several values can be supplied under either a current key or a legacy key
/// (`ref`, `path`, `sha256`). `component_sources_table_from_pack_lock` accepts
/// either one and rejects entries where both are present but differ.
#[derive(Clone, Debug, Deserialize)]
struct PackLockComponent {
    /// Component name; must be unique within the lock.
    name: String,
    /// Source reference (current key `source_ref`).
    #[serde(default, rename = "source_ref")]
    source_ref: Option<String>,
    /// Source reference under the legacy key `ref`.
    #[serde(default, rename = "ref")]
    legacy_ref: Option<String>,
    /// Optional component id, used as an additional lookup key next to `name`.
    #[serde(default)]
    component_id: Option<ComponentId>,
    /// Explicit "bundled" flag; an entry with a bundled path counts as bundled
    /// even when this is absent or `false`.
    #[serde(default)]
    bundled: Option<bool>,
    /// Path of the bundled wasm (current key `bundled_path`).
    #[serde(default, rename = "bundled_path")]
    bundled_path: Option<String>,
    /// Path of the bundled wasm under the legacy key `path`.
    #[serde(default, rename = "path")]
    legacy_path: Option<String>,
    /// SHA-256 of the bundled wasm (current key `wasm_sha256`).
    #[serde(default)]
    wasm_sha256: Option<String>,
    /// SHA-256 of the bundled wasm under the legacy key `sha256`.
    #[serde(default, rename = "sha256")]
    legacy_sha256: Option<String>,
    /// Digest of a remote component; preferred over `digest` when both are set.
    #[serde(default)]
    resolved_digest: Option<String>,
    /// Fallback digest of a remote component, used when `resolved_digest` is absent.
    #[serde(default)]
    digest: Option<String>,
}
3764
3765fn component_specs(
3766 manifest: Option<&greentic_types::PackManifest>,
3767 legacy_manifest: Option<&legacy_pack::PackManifest>,
3768 component_sources: Option<&ComponentSourcesV1>,
3769 pack_lock: Option<&PackLockV1>,
3770) -> Vec<ComponentSpec> {
3771 if let Some(manifest) = manifest {
3772 if !manifest.components.is_empty() {
3773 return manifest
3774 .components
3775 .iter()
3776 .map(|entry| ComponentSpec {
3777 id: entry.id.as_str().to_string(),
3778 version: entry.version.to_string(),
3779 legacy_path: None,
3780 })
3781 .collect();
3782 }
3783 if let Some(lock) = pack_lock {
3784 let mut seen = HashSet::new();
3785 let mut specs = Vec::new();
3786 for entry in &lock.components {
3787 let id = entry
3788 .component_id
3789 .as_ref()
3790 .map(|id| id.as_str())
3791 .unwrap_or(entry.name.as_str());
3792 if seen.insert(id.to_string()) {
3793 specs.push(ComponentSpec {
3794 id: id.to_string(),
3795 version: "0.0.0".to_string(),
3796 legacy_path: None,
3797 });
3798 }
3799 }
3800 return specs;
3801 }
3802 if let Some(sources) = component_sources {
3803 let mut seen = HashSet::new();
3804 let mut specs = Vec::new();
3805 for entry in &sources.components {
3806 let id = entry
3807 .component_id
3808 .as_ref()
3809 .map(|id| id.as_str())
3810 .unwrap_or(entry.name.as_str());
3811 if seen.insert(id.to_string()) {
3812 specs.push(ComponentSpec {
3813 id: id.to_string(),
3814 version: "0.0.0".to_string(),
3815 legacy_path: None,
3816 });
3817 }
3818 }
3819 return specs;
3820 }
3821 }
3822 if let Some(legacy_manifest) = legacy_manifest {
3823 return legacy_manifest
3824 .components
3825 .iter()
3826 .map(|entry| ComponentSpec {
3827 id: entry.name.clone(),
3828 version: entry.version.to_string(),
3829 legacy_path: Some(entry.file_wasm.clone()),
3830 })
3831 .collect();
3832 }
3833 Vec::new()
3834}
3835
3836fn component_sources_table(
3837 sources: Option<&ComponentSourcesV1>,
3838) -> Result<Option<HashMap<String, ComponentSourceInfo>>> {
3839 let Some(sources) = sources else {
3840 return Ok(None);
3841 };
3842 let mut table = HashMap::new();
3843 for entry in &sources.components {
3844 let artifact = match &entry.artifact {
3845 ArtifactLocationV1::Inline { wasm_path, .. } => ComponentArtifactLocation::Inline {
3846 wasm_path: wasm_path.clone(),
3847 },
3848 ArtifactLocationV1::Remote => ComponentArtifactLocation::Remote,
3849 };
3850 let info = ComponentSourceInfo {
3851 digest: Some(entry.resolved.digest.clone()),
3852 source: entry.source.clone(),
3853 artifact,
3854 expected_wasm_sha256: None,
3855 skip_digest_verification: false,
3856 };
3857 if let Some(component_id) = entry.component_id.as_ref() {
3858 table.insert(component_id.as_str().to_string(), info.clone());
3859 }
3860 table.insert(entry.name.clone(), info);
3861 }
3862 Ok(Some(table))
3863}
3864
3865fn load_pack_lock(path: &Path) -> Result<Option<PackLockV1>> {
3866 let lock_path = if path.is_dir() {
3867 let candidate = path.join("pack.lock");
3868 if candidate.exists() {
3869 Some(candidate)
3870 } else {
3871 let candidate = path.join("pack.lock.json");
3872 candidate.exists().then_some(candidate)
3873 }
3874 } else {
3875 None
3876 };
3877 let Some(lock_path) = lock_path else {
3878 return Ok(None);
3879 };
3880 let raw = std::fs::read_to_string(&lock_path)
3881 .with_context(|| format!("failed to read {}", lock_path.display()))?;
3882 let lock: PackLockV1 = serde_json::from_str(&raw).context("failed to parse pack.lock")?;
3883 if lock.schema_version != 1 {
3884 bail!("pack.lock schema_version must be 1");
3885 }
3886 Ok(Some(lock))
3887}
3888
/// Lists the directories in which a pack lock should be looked for.
///
/// * For a directory pack the only root is `pack_path` itself.
/// * Otherwise the roots are the parent and grandparent (in that order, as far
///   as they exist) of `archive_hint` when it is given, else of `pack_path`.
fn find_pack_lock_roots(
    pack_path: &Path,
    is_dir: bool,
    archive_hint: Option<&Path>,
) -> Vec<PathBuf> {
    if is_dir {
        return vec![pack_path.to_path_buf()];
    }

    let anchor = archive_hint.unwrap_or(pack_path);
    // `ancestors` yields the path itself first, then each successive parent.
    anchor
        .ancestors()
        .skip(1)
        .take(2)
        .map(Path::to_path_buf)
        .collect()
}
3913
3914fn normalize_sha256(digest: &str) -> Result<String> {
3915 let trimmed = digest.trim();
3916 if trimmed.is_empty() {
3917 bail!("sha256 digest cannot be empty");
3918 }
3919 if let Some(stripped) = trimmed.strip_prefix("sha256:") {
3920 if stripped.is_empty() {
3921 bail!("sha256 digest must include hex bytes after sha256:");
3922 }
3923 return Ok(trimmed.to_string());
3924 }
3925 if trimmed.chars().all(|c| c.is_ascii_hexdigit()) {
3926 return Ok(format!("sha256:{trimmed}"));
3927 }
3928 bail!("sha256 digest must be hex or sha256:<hex>");
3929}
3930
3931fn component_sources_table_from_pack_lock(
3932 lock: &PackLockV1,
3933 allow_missing_hash: bool,
3934) -> Result<HashMap<String, ComponentSourceInfo>> {
3935 let mut table = HashMap::new();
3936 let mut names = HashSet::new();
3937 for entry in &lock.components {
3938 if !names.insert(entry.name.clone()) {
3939 bail!(
3940 "pack.lock contains duplicate component name `{}`",
3941 entry.name
3942 );
3943 }
3944 let source_ref = match (&entry.source_ref, &entry.legacy_ref) {
3945 (Some(primary), Some(legacy)) => {
3946 if primary != legacy {
3947 bail!(
3948 "pack.lock component {} has conflicting refs: {} vs {}",
3949 entry.name,
3950 primary,
3951 legacy
3952 );
3953 }
3954 primary.as_str()
3955 }
3956 (Some(primary), None) => primary.as_str(),
3957 (None, Some(legacy)) => legacy.as_str(),
3958 (None, None) => {
3959 bail!("pack.lock component {} missing source_ref", entry.name);
3960 }
3961 };
3962 let source: ComponentSourceRef = source_ref
3963 .parse()
3964 .with_context(|| format!("invalid component ref `{}`", source_ref))?;
3965 let bundled_path = match (&entry.bundled_path, &entry.legacy_path) {
3966 (Some(primary), Some(legacy)) => {
3967 if primary != legacy {
3968 bail!(
3969 "pack.lock component {} has conflicting bundled paths: {} vs {}",
3970 entry.name,
3971 primary,
3972 legacy
3973 );
3974 }
3975 Some(primary.clone())
3976 }
3977 (Some(primary), None) => Some(primary.clone()),
3978 (None, Some(legacy)) => Some(legacy.clone()),
3979 (None, None) => None,
3980 };
3981 let bundled = entry.bundled.unwrap_or(false) || bundled_path.is_some();
3982 let (artifact, digest, expected_wasm_sha256, skip_digest_verification) = if bundled {
3983 let wasm_path = bundled_path.ok_or_else(|| {
3984 anyhow!(
3985 "pack.lock component {} marked bundled but bundled_path is missing",
3986 entry.name
3987 )
3988 })?;
3989 let expected_raw = match (&entry.wasm_sha256, &entry.legacy_sha256) {
3990 (Some(primary), Some(legacy)) => {
3991 if primary != legacy {
3992 bail!(
3993 "pack.lock component {} has conflicting wasm_sha256 values: {} vs {}",
3994 entry.name,
3995 primary,
3996 legacy
3997 );
3998 }
3999 Some(primary.as_str())
4000 }
4001 (Some(primary), None) => Some(primary.as_str()),
4002 (None, Some(legacy)) => Some(legacy.as_str()),
4003 (None, None) => None,
4004 };
4005 let expected = match expected_raw {
4006 Some(value) => Some(normalize_sha256(value)?),
4007 None => None,
4008 };
4009 if expected.is_none() && !allow_missing_hash {
4010 bail!(
4011 "pack.lock component {} missing wasm_sha256 for bundled component",
4012 entry.name
4013 );
4014 }
4015 (
4016 ComponentArtifactLocation::Inline { wasm_path },
4017 expected.clone(),
4018 expected,
4019 allow_missing_hash && expected_raw.is_none(),
4020 )
4021 } else {
4022 if source.is_tag() {
4023 bail!(
4024 "component {} uses tag ref {} but is not bundled; rebuild the pack",
4025 entry.name,
4026 source
4027 );
4028 }
4029 let expected = entry
4030 .resolved_digest
4031 .as_deref()
4032 .or(entry.digest.as_deref())
4033 .ok_or_else(|| {
4034 anyhow!(
4035 "pack.lock component {} missing resolved_digest for remote component",
4036 entry.name
4037 )
4038 })?;
4039 (
4040 ComponentArtifactLocation::Remote,
4041 Some(normalize_digest(expected)),
4042 None,
4043 false,
4044 )
4045 };
4046 let info = ComponentSourceInfo {
4047 digest,
4048 source,
4049 artifact,
4050 expected_wasm_sha256,
4051 skip_digest_verification,
4052 };
4053 if let Some(component_id) = entry.component_id.as_ref() {
4054 let key = component_id.as_str().to_string();
4055 if table.contains_key(&key) {
4056 bail!(
4057 "pack.lock contains duplicate component id `{}`",
4058 component_id.as_str()
4059 );
4060 }
4061 table.insert(key, info.clone());
4062 }
4063 if entry.name
4064 != entry
4065 .component_id
4066 .as_ref()
4067 .map(|id| id.as_str())
4068 .unwrap_or("")
4069 {
4070 table.insert(entry.name.clone(), info);
4071 }
4072 }
4073 Ok(table)
4074}
4075
4076fn component_path_for_spec(root: &Path, spec: &ComponentSpec) -> PathBuf {
4077 if let Some(path) = &spec.legacy_path {
4078 return root.join(path);
4079 }
4080 root.join("components").join(format!("{}.wasm", spec.id))
4081}
4082
/// Ensures `digest` carries an algorithm prefix.
///
/// Values already starting with `sha256:` or `blake3:` are returned unchanged;
/// anything else is assumed to be bare SHA-256 and gets the `sha256:` prefix.
/// No trimming, case folding, or hex validation is performed.
fn normalize_digest(digest: &str) -> String {
    const KNOWN_PREFIXES: [&str; 2] = ["sha256:", "blake3:"];

    let already_prefixed = KNOWN_PREFIXES
        .iter()
        .any(|prefix| digest.starts_with(*prefix));
    if already_prefixed {
        return digest.to_owned();
    }
    format!("sha256:{digest}")
}
4090
4091fn compute_digest_for(bytes: &[u8], digest: &str) -> Result<String> {
4092 if digest.starts_with("blake3:") {
4093 let hash = blake3::hash(bytes);
4094 return Ok(format!("blake3:{}", hash.to_hex()));
4095 }
4096 let mut hasher = sha2::Sha256::new();
4097 hasher.update(bytes);
4098 Ok(format!("sha256:{}", to_hex(&hasher.finalize())))
4099}
4100
4101fn compute_sha256_digest_for(bytes: &[u8]) -> String {
4102 let mut hasher = sha2::Sha256::new();
4103 hasher.update(bytes);
4104 format!("sha256:{}", to_hex(&hasher.finalize()))
4105}
4106
4107fn build_artifact_key(cache: &CacheManager, digest: Option<&str>, bytes: &[u8]) -> ArtifactKey {
4108 let wasm_digest = digest
4109 .map(normalize_digest)
4110 .unwrap_or_else(|| compute_sha256_digest_for(bytes));
4111 ArtifactKey::new(cache.engine_profile_id().to_string(), wasm_digest)
4112}
4113
4114async fn compile_component_with_cache(
4115 cache: &CacheManager,
4116 engine: &Engine,
4117 digest: Option<&str>,
4118 bytes: Vec<u8>,
4119) -> Result<Arc<Component>> {
4120 let key = build_artifact_key(cache, digest, &bytes);
4121 cache.get_component(engine, &key, || Ok(bytes)).await
4122}
4123
4124fn verify_component_digest(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
4125 let normalized_expected = normalize_digest(expected);
4126 let actual = compute_digest_for(bytes, &normalized_expected)?;
4127 if normalize_digest(&actual) != normalized_expected {
4128 bail!(
4129 "component {component_id} digest mismatch: expected {normalized_expected}, got {actual}"
4130 );
4131 }
4132 Ok(())
4133}
4134
4135fn verify_wasm_sha256(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
4136 let normalized_expected = normalize_sha256(expected)?;
4137 let actual = compute_sha256_digest_for(bytes);
4138 if actual != normalized_expected {
4139 bail!(
4140 "component {component_id} bundled digest mismatch: expected {normalized_expected}, got {actual}"
4141 );
4142 }
4143 Ok(())
4144}
4145
/// Renders `digest` as lowercase hexadecimal, two characters per byte.
///
/// The output is written into one preallocated buffer rather than building a
/// temporary `String` for each byte.
fn to_hex(digest: &[u8]) -> String {
    use std::fmt::Write;

    let mut hex = String::with_capacity(digest.len() * 2);
    for byte in digest {
        // Writing into a `String` never returns an error, so the result can be ignored.
        let _ = write!(hex, "{byte:02x}");
    }
    hex
}
4149
#[cfg(test)]
mod pack_lock_tests {
    use super::*;
    use tempfile::TempDir;

    /// A non-bundled lock entry that uses a tag reference must be rejected
    /// with advice to rebuild the pack.
    #[test]
    fn pack_lock_tag_ref_requires_bundle() {
        let tagged_remote = PackLockComponent {
            name: "templates".to_string(),
            source_ref: Some("oci://registry.test/templates:latest".to_string()),
            legacy_ref: None,
            component_id: None,
            bundled: Some(false),
            bundled_path: None,
            legacy_path: None,
            wasm_sha256: None,
            legacy_sha256: None,
            resolved_digest: None,
            digest: None,
        };
        let lock = PackLockV1 {
            schema_version: 1,
            components: vec![tagged_remote],
        };

        let err = component_sources_table_from_pack_lock(&lock, false)
            .unwrap_err()
            .to_string();

        assert!(
            err.contains("tag ref") && err.contains("rebuild the pack"),
            "unexpected error: {err}"
        );
    }

    /// Loading an inline component whose bytes do not match the expected wasm
    /// SHA-256 must fail with a bundled-digest-mismatch error.
    #[test]
    fn bundled_hash_mismatch_errors() {
        let rt = tokio::runtime::Runtime::new().expect("runtime");
        let temp = TempDir::new().expect("temp dir");

        // Cache rooted inside the temp dir so the test leaves nothing behind.
        let engine = Engine::default();
        let engine_profile =
            EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
        let cache = CacheManager::new(
            CacheConfig {
                root: temp.path().join("cache"),
                ..CacheConfig::default()
            },
            engine_profile,
        );

        // Copy a real wasm fixture to the path the source entry points at.
        let fixture_wasm = Path::new(env!("CARGO_MANIFEST_DIR"))
            .join("../../tests/fixtures/packs/secrets_store_smoke/components/echo_secret.wasm");
        let bytes = std::fs::read(&fixture_wasm).expect("read fixture wasm");
        let wasm_path = temp.path().join("component.wasm");
        std::fs::write(&wasm_path, &bytes).expect("write temp wasm");

        let spec = ComponentSpec {
            id: "qa.process".to_string(),
            version: "0.0.0".to_string(),
            legacy_path: None,
        };
        let mut missing = HashSet::new();
        missing.insert(spec.id.clone());

        // The expected hash deliberately does not match the fixture bytes.
        let mismatching = ComponentSourceInfo {
            digest: Some("sha256:deadbeef".to_string()),
            source: ComponentSourceRef::Oci("registry.test/qa.process@sha256:deadbeef".into()),
            artifact: ComponentArtifactLocation::Inline {
                wasm_path: "component.wasm".to_string(),
            },
            expected_wasm_sha256: Some("sha256:deadbeef".to_string()),
            skip_digest_verification: false,
        };
        let mut sources = HashMap::new();
        sources.insert(spec.id.clone(), mismatching);

        let mut loaded = HashMap::new();
        let result = rt.block_on(load_components_from_sources(
            &cache,
            &engine,
            &sources,
            &ComponentResolution::default(),
            &[spec],
            &mut missing,
            &mut loaded,
            Some(temp.path()),
            None,
        ));

        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("bundled digest mismatch"),
            "unexpected error: {err}"
        );
    }
}
4239
4240#[cfg(test)]
4241mod pack_resolution_prop_tests {
4242 use super::*;
4243 use greentic_types::{ArtifactLocationV1, ComponentSourceEntryV1, ResolvedComponentV1};
4244 use proptest::prelude::*;
4245 use proptest::test_runner::{Config as ProptestConfig, RngAlgorithm, TestRng, TestRunner};
4246 use std::collections::BTreeSet;
4247 use std::path::Path;
4248 use std::str::FromStr;
4249
4250 #[derive(Clone, Debug)]
4251 enum ResolveRequest {
4252 ById(String),
4253 ByName(String),
4254 }
4255
4256 #[derive(Clone, Debug, PartialEq, Eq)]
4257 struct ResolvedComponent {
4258 key: String,
4259 source: String,
4260 artifact: String,
4261 digest: Option<String>,
4262 expected_wasm_sha256: Option<String>,
4263 skip_digest_verification: bool,
4264 }
4265
4266 #[derive(Clone, Debug, PartialEq, Eq)]
4267 struct ResolveError {
4268 code: String,
4269 message: String,
4270 context_key: String,
4271 }
4272
4273 #[derive(Clone, Debug)]
4274 struct Scenario {
4275 pack_lock: Option<PackLockV1>,
4276 component_sources: Option<ComponentSourcesV1>,
4277 request: ResolveRequest,
4278 expected_sha256: Option<String>,
4279 bytes: Vec<u8>,
4280 }
4281
4282 fn resolve_component_test(
4283 sources: Option<&ComponentSourcesV1>,
4284 lock: Option<&PackLockV1>,
4285 request: &ResolveRequest,
4286 ) -> Result<ResolvedComponent, ResolveError> {
4287 let table = if let Some(lock) = lock {
4288 component_sources_table_from_pack_lock(lock, false).map_err(|err| ResolveError {
4289 code: classify_pack_lock_error(err.to_string().as_str()).to_string(),
4290 message: err.to_string(),
4291 context_key: request_key(request).to_string(),
4292 })?
4293 } else {
4294 let sources = component_sources_table(sources).map_err(|err| ResolveError {
4295 code: "component_sources_error".to_string(),
4296 message: err.to_string(),
4297 context_key: request_key(request).to_string(),
4298 })?;
4299 sources.ok_or_else(|| ResolveError {
4300 code: "missing_component_sources".to_string(),
4301 message: "component sources not provided".to_string(),
4302 context_key: request_key(request).to_string(),
4303 })?
4304 };
4305
4306 let key = request_key(request);
4307 let source = table.get(key).ok_or_else(|| ResolveError {
4308 code: "component_not_found".to_string(),
4309 message: format!("component {key} not found"),
4310 context_key: key.to_string(),
4311 })?;
4312
4313 Ok(ResolvedComponent {
4314 key: key.to_string(),
4315 source: source.source.to_string(),
4316 artifact: match source.artifact {
4317 ComponentArtifactLocation::Inline { .. } => "inline".to_string(),
4318 ComponentArtifactLocation::Remote => "remote".to_string(),
4319 },
4320 digest: source.digest.clone(),
4321 expected_wasm_sha256: source.expected_wasm_sha256.clone(),
4322 skip_digest_verification: source.skip_digest_verification,
4323 })
4324 }
4325
4326 fn request_key(request: &ResolveRequest) -> &str {
4327 match request {
4328 ResolveRequest::ById(value) => value.as_str(),
4329 ResolveRequest::ByName(value) => value.as_str(),
4330 }
4331 }
4332
4333 fn classify_pack_lock_error(message: &str) -> &'static str {
4334 if message.contains("duplicate component name") {
4335 "duplicate_name"
4336 } else if message.contains("duplicate component id") {
4337 "duplicate_id"
4338 } else if message.contains("conflicting refs") {
4339 "conflicting_ref"
4340 } else if message.contains("conflicting bundled paths") {
4341 "conflicting_bundled_path"
4342 } else if message.contains("conflicting wasm_sha256") {
4343 "conflicting_wasm_sha256"
4344 } else if message.contains("missing source_ref") {
4345 "missing_source_ref"
4346 } else if message.contains("marked bundled but bundled_path is missing") {
4347 "missing_bundled_path"
4348 } else if message.contains("missing wasm_sha256") {
4349 "missing_wasm_sha256"
4350 } else if message.contains("tag ref") && message.contains("not bundled") {
4351 "tag_ref_requires_bundle"
4352 } else if message.contains("missing resolved_digest") {
4353 "missing_resolved_digest"
4354 } else if message.contains("invalid component ref") {
4355 "invalid_component_ref"
4356 } else if message.contains("sha256 digest") {
4357 "invalid_sha256"
4358 } else {
4359 "unknown_error"
4360 }
4361 }
4362
4363 fn known_error_codes() -> BTreeSet<&'static str> {
4364 [
4365 "component_sources_error",
4366 "missing_component_sources",
4367 "component_not_found",
4368 "duplicate_name",
4369 "duplicate_id",
4370 "conflicting_ref",
4371 "conflicting_bundled_path",
4372 "conflicting_wasm_sha256",
4373 "missing_source_ref",
4374 "missing_bundled_path",
4375 "missing_wasm_sha256",
4376 "tag_ref_requires_bundle",
4377 "missing_resolved_digest",
4378 "invalid_component_ref",
4379 "invalid_sha256",
4380 "unknown_error",
4381 ]
4382 .into_iter()
4383 .collect()
4384 }
4385
4386 fn proptest_config() -> ProptestConfig {
4387 let cases = std::env::var("PROPTEST_CASES")
4388 .ok()
4389 .and_then(|value| value.parse::<u32>().ok())
4390 .unwrap_or(128);
4391 ProptestConfig {
4392 cases,
4393 failure_persistence: None,
4394 ..ProptestConfig::default()
4395 }
4396 }
4397
4398 fn proptest_seed() -> Option<[u8; 32]> {
4399 let seed = std::env::var("PROPTEST_SEED")
4400 .ok()
4401 .and_then(|value| value.parse::<u64>().ok())?;
4402 let mut bytes = [0u8; 32];
4403 bytes[..8].copy_from_slice(&seed.to_le_bytes());
4404 Some(bytes)
4405 }
4406
4407 fn run_cases(strategy: impl Strategy<Value = Scenario>, cases: u32, seed: Option<[u8; 32]>) {
4408 let config = ProptestConfig {
4409 cases,
4410 failure_persistence: None,
4411 ..ProptestConfig::default()
4412 };
4413 let mut runner = match seed {
4414 Some(bytes) => {
4415 TestRunner::new_with_rng(config, TestRng::from_seed(RngAlgorithm::ChaCha, &bytes))
4416 }
4417 None => TestRunner::new(config),
4418 };
4419 runner
4420 .run(&strategy, |scenario| {
4421 run_scenario(&scenario);
4422 Ok(())
4423 })
4424 .unwrap();
4425 }
4426
4427 fn run_scenario(scenario: &Scenario) {
4428 let known_codes = known_error_codes();
4429 let first = resolve_component_test(
4430 scenario.component_sources.as_ref(),
4431 scenario.pack_lock.as_ref(),
4432 &scenario.request,
4433 );
4434 let second = resolve_component_test(
4435 scenario.component_sources.as_ref(),
4436 scenario.pack_lock.as_ref(),
4437 &scenario.request,
4438 );
4439 assert_eq!(normalize_result(&first), normalize_result(&second));
4440
4441 if let Some(lock) = scenario.pack_lock.as_ref() {
4442 let lock_only = resolve_component_test(None, Some(lock), &scenario.request);
4443 assert_eq!(normalize_result(&first), normalize_result(&lock_only));
4444 }
4445
4446 if let Err(err) = first.as_ref() {
4447 assert!(
4448 known_codes.contains(err.code.as_str()),
4449 "unexpected error code {}: {}",
4450 err.code,
4451 err.message
4452 );
4453 }
4454
4455 if let Some(expected) = scenario.expected_sha256.as_deref() {
4456 let expected_ok =
4457 verify_wasm_sha256("test.component", expected, &scenario.bytes).is_ok();
4458 let actual = compute_sha256_digest_for(&scenario.bytes);
4459 if actual == normalize_sha256(expected).unwrap_or_default() {
4460 assert!(expected_ok, "expected sha256 match to succeed");
4461 } else {
4462 assert!(!expected_ok, "expected sha256 mismatch to fail");
4463 }
4464 }
4465 }
4466
4467 fn normalize_result(
4468 result: &Result<ResolvedComponent, ResolveError>,
4469 ) -> Result<ResolvedComponent, ResolveError> {
4470 match result {
4471 Ok(value) => Ok(value.clone()),
4472 Err(err) => Err(err.clone()),
4473 }
4474 }
4475
4476 fn scenario_strategy() -> impl Strategy<Value = Scenario> {
4477 let name = any::<u8>().prop_map(|n| format!("component{n}.core"));
4478 let alt_name = any::<u8>().prop_map(|n| format!("component_alt{n}.core"));
4479 let tag_ref = any::<bool>();
4480 let bundled = any::<bool>();
4481 let include_sha = any::<bool>();
4482 let include_component_id = any::<bool>();
4483 let request_by_id = any::<bool>();
4484 let use_lock = any::<bool>();
4485 let use_sources = any::<bool>();
4486 let bytes = prop::collection::vec(any::<u8>(), 1..64);
4487
4488 (
4489 name,
4490 alt_name,
4491 tag_ref,
4492 bundled,
4493 include_sha,
4494 include_component_id,
4495 request_by_id,
4496 use_lock,
4497 use_sources,
4498 bytes,
4499 )
4500 .prop_map(
4501 |(
4502 name,
4503 alt_name,
4504 tag_ref,
4505 bundled,
4506 include_sha,
4507 include_component_id,
4508 request_by_id,
4509 use_lock,
4510 use_sources,
4511 bytes,
4512 )| {
4513 let component_id_str = if include_component_id {
4514 alt_name.clone()
4515 } else {
4516 name.clone()
4517 };
4518 let component_id = ComponentId::from_str(&component_id_str).ok();
4519 let source_ref = if tag_ref {
4520 format!("oci://registry.test/{name}:v1")
4521 } else {
4522 format!(
4523 "oci://registry.test/{name}@sha256:{}",
4524 hex::encode([0x11u8; 32])
4525 )
4526 };
4527 let expected_sha256 = if bundled && include_sha {
4528 Some(compute_sha256_digest_for(&bytes))
4529 } else {
4530 None
4531 };
4532
4533 let lock_component = PackLockComponent {
4534 name: name.clone(),
4535 source_ref: Some(source_ref),
4536 legacy_ref: None,
4537 component_id,
4538 bundled: Some(bundled),
4539 bundled_path: if bundled {
4540 Some(format!("components/{name}.wasm"))
4541 } else {
4542 None
4543 },
4544 legacy_path: None,
4545 wasm_sha256: expected_sha256.clone(),
4546 legacy_sha256: None,
4547 resolved_digest: if bundled {
4548 None
4549 } else {
4550 Some("sha256:deadbeef".to_string())
4551 },
4552 digest: None,
4553 };
4554
4555 let pack_lock = if use_lock {
4556 Some(PackLockV1 {
4557 schema_version: 1,
4558 components: vec![lock_component],
4559 })
4560 } else {
4561 None
4562 };
4563
4564 let component_sources = if use_sources {
4565 Some(ComponentSourcesV1::new(vec![ComponentSourceEntryV1 {
4566 name: name.clone(),
4567 component_id: ComponentId::from_str(&name).ok(),
4568 source: ComponentSourceRef::from_str(
4569 "oci://registry.test/component@sha256:deadbeef",
4570 )
4571 .expect("component ref"),
4572 resolved: ResolvedComponentV1 {
4573 digest: "sha256:deadbeef".to_string(),
4574 signature: None,
4575 signed_by: None,
4576 },
4577 artifact: if bundled {
4578 ArtifactLocationV1::Inline {
4579 wasm_path: format!("components/{name}.wasm"),
4580 manifest_path: None,
4581 }
4582 } else {
4583 ArtifactLocationV1::Remote
4584 },
4585 licensing_hint: None,
4586 metering_hint: None,
4587 }]))
4588 } else {
4589 None
4590 };
4591
4592 let request = if request_by_id {
4593 ResolveRequest::ById(component_id_str.clone())
4594 } else {
4595 ResolveRequest::ByName(name.clone())
4596 };
4597
4598 Scenario {
4599 pack_lock,
4600 component_sources,
4601 request,
4602 expected_sha256,
4603 bytes,
4604 }
4605 },
4606 )
4607 }
4608
4609 #[test]
4610 fn pack_resolution_proptest() {
4611 let seed = proptest_seed();
4612 run_cases(scenario_strategy(), proptest_config().cases, seed);
4613 }
4614
4615 #[test]
4616 fn pack_resolution_regression_seeds() {
4617 let seeds_path =
4618 Path::new(env!("CARGO_MANIFEST_DIR")).join("../../tests/fixtures/proptest-seeds.txt");
4619 let raw = std::fs::read_to_string(&seeds_path).expect("read proptest seeds");
4620 for line in raw.lines() {
4621 let line = line.trim();
4622 if line.is_empty() || line.starts_with('#') {
4623 continue;
4624 }
4625 let seed = line.parse::<u64>().expect("seed must be an integer");
4626 let mut bytes = [0u8; 32];
4627 bytes[..8].copy_from_slice(&seed.to_le_bytes());
4628 run_cases(scenario_strategy(), 1, Some(bytes));
4629 }
4630 }
4631}
4632
4633fn locate_pack_assets(
4634 materialized_root: Option<&Path>,
4635 archive_hint: Option<&Path>,
4636) -> Result<(Option<PathBuf>, Option<TempDir>)> {
4637 if let Some(root) = materialized_root {
4638 let assets = root.join("assets");
4639 if assets.is_dir() {
4640 return Ok((Some(assets), None));
4641 }
4642 }
4643 if let Some(path) = archive_hint
4644 && let Some((tempdir, assets)) = extract_assets_from_archive(path)?
4645 {
4646 return Ok((Some(assets), Some(tempdir)));
4647 }
4648 Ok((None, None))
4649}
4650
4651fn extract_assets_from_archive(path: &Path) -> Result<Option<(TempDir, PathBuf)>> {
4652 let file =
4653 File::open(path).with_context(|| format!("failed to open pack {}", path.display()))?;
4654 let mut archive =
4655 ZipArchive::new(file).with_context(|| format!("failed to read pack {}", path.display()))?;
4656 let temp = TempDir::new().context("failed to create temporary assets directory")?;
4657 let mut found = false;
4658 for idx in 0..archive.len() {
4659 let mut entry = archive.by_index(idx)?;
4660 let name = entry.name();
4661 if !name.starts_with("assets/") {
4662 continue;
4663 }
4664 let dest = temp.path().join(name);
4665 if name.ends_with('/') {
4666 std::fs::create_dir_all(&dest)?;
4667 found = true;
4668 continue;
4669 }
4670 if let Some(parent) = dest.parent() {
4671 std::fs::create_dir_all(parent)?;
4672 }
4673 let mut outfile = std::fs::File::create(&dest)?;
4674 std::io::copy(&mut entry, &mut outfile)?;
4675 found = true;
4676 }
4677 if found {
4678 let assets_path = temp.path().join("assets");
4679 Ok(Some((temp, assets_path)))
4680 } else {
4681 Ok(None)
4682 }
4683}
4684
4685fn dist_options_from(component_resolution: &ComponentResolution) -> DistOptions {
4686 let mut opts = DistOptions {
4687 allow_tags: true,
4688 ..DistOptions::default()
4689 };
4690 if let Some(cache_dir) = component_resolution.dist_cache_dir.clone() {
4691 opts.cache_dir = cache_dir;
4692 }
4693 if component_resolution.dist_offline {
4694 opts.offline = true;
4695 }
4696 opts
4697}
4698
4699#[allow(clippy::too_many_arguments)]
4700async fn load_components_from_sources(
4701 cache: &CacheManager,
4702 engine: &Engine,
4703 component_sources: &HashMap<String, ComponentSourceInfo>,
4704 component_resolution: &ComponentResolution,
4705 specs: &[ComponentSpec],
4706 missing: &mut HashSet<String>,
4707 into: &mut HashMap<String, PackComponent>,
4708 materialized_root: Option<&Path>,
4709 archive_hint: Option<&Path>,
4710) -> Result<()> {
4711 let mut archive = if let Some(path) = archive_hint {
4712 Some(
4713 ZipArchive::new(File::open(path)?)
4714 .with_context(|| format!("{} is not a valid gtpack", path.display()))?,
4715 )
4716 } else {
4717 None
4718 };
4719 let mut dist_client: Option<DistClient> = None;
4720
4721 for spec in specs {
4722 if !missing.contains(&spec.id) {
4723 continue;
4724 }
4725 let Some(source) = component_sources.get(&spec.id) else {
4726 continue;
4727 };
4728
4729 let bytes = match &source.artifact {
4730 ComponentArtifactLocation::Inline { wasm_path } => {
4731 if let Some(root) = materialized_root {
4732 let path = root.join(wasm_path);
4733 if path.exists() {
4734 std::fs::read(&path).with_context(|| {
4735 format!(
4736 "failed to read inline component {} from {}",
4737 spec.id,
4738 path.display()
4739 )
4740 })?
4741 } else if archive.is_none() {
4742 bail!("inline component {} missing at {}", spec.id, path.display());
4743 } else {
4744 read_entry(
4745 archive.as_mut().expect("archive present when needed"),
4746 wasm_path,
4747 )
4748 .with_context(|| {
4749 format!(
4750 "inline component {} missing at {} in pack archive",
4751 spec.id, wasm_path
4752 )
4753 })?
4754 }
4755 } else if let Some(archive) = archive.as_mut() {
4756 read_entry(archive, wasm_path).with_context(|| {
4757 format!(
4758 "inline component {} missing at {} in pack archive",
4759 spec.id, wasm_path
4760 )
4761 })?
4762 } else {
4763 bail!(
4764 "inline component {} missing and no pack source available",
4765 spec.id
4766 );
4767 }
4768 }
4769 ComponentArtifactLocation::Remote => {
4770 if source.source.is_tag() {
4771 bail!(
4772 "component {} uses tag ref {} but is not bundled; rebuild the pack",
4773 spec.id,
4774 source.source
4775 );
4776 }
4777 let client = dist_client.get_or_insert_with(|| {
4778 DistClient::new(dist_options_from(component_resolution))
4779 });
4780 let reference = source.source.to_string();
4781 fault::maybe_fail_asset(&reference)
4782 .await
4783 .with_context(|| format!("fault injection blocked asset {reference}"))?;
4784 let digest = source.digest.as_deref().ok_or_else(|| {
4785 anyhow!(
4786 "component {} missing expected digest for remote component",
4787 spec.id
4788 )
4789 })?;
4790 let cache_path = if let Ok(cache_path) = client.fetch_digest(digest).await {
4791 cache_path
4792 } else if component_resolution.dist_offline {
4793 client
4794 .fetch_digest(digest)
4795 .await
4796 .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?
4797 } else {
4798 let source = client
4799 .parse_source(&reference)
4800 .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?;
4801 let descriptor = client
4802 .resolve(source, ResolvePolicy)
4803 .await
4804 .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?;
4805 let resolved = client
4806 .fetch(&descriptor, CachePolicy)
4807 .await
4808 .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?;
4809 let expected = normalize_digest(digest);
4810 let actual = normalize_digest(&resolved.digest);
4811 if expected != actual {
4812 bail!(
4813 "component {} digest mismatch after fetch: expected {}, got {}",
4814 spec.id,
4815 expected,
4816 actual
4817 );
4818 }
4819 resolved.cache_path.ok_or_else(|| {
4820 anyhow!(
4821 "component {} resolved from {} but cache path is missing",
4822 spec.id,
4823 reference
4824 )
4825 })?
4826 };
4827 std::fs::read(&cache_path).with_context(|| {
4828 format!(
4829 "failed to read cached component {} from {}",
4830 spec.id,
4831 cache_path.display()
4832 )
4833 })?
4834 }
4835 };
4836
4837 if let Some(expected) = source.expected_wasm_sha256.as_deref() {
4838 verify_wasm_sha256(&spec.id, expected, &bytes)?;
4839 } else if source.skip_digest_verification {
4840 let actual = compute_sha256_digest_for(&bytes);
4841 warn!(
4842 component_id = %spec.id,
4843 digest = %actual,
4844 "bundled component missing wasm_sha256; allowing due to flag"
4845 );
4846 } else {
4847 let expected = source.digest.as_deref().ok_or_else(|| {
4848 anyhow!(
4849 "component {} missing expected digest for verification",
4850 spec.id
4851 )
4852 })?;
4853 verify_component_digest(&spec.id, expected, &bytes)?;
4854 }
4855 let component =
4856 compile_component_with_cache(cache, engine, source.digest.as_deref(), bytes)
4857 .await
4858 .with_context(|| format!("failed to compile component {}", spec.id))?;
4859 into.insert(
4860 spec.id.clone(),
4861 PackComponent {
4862 name: spec.id.clone(),
4863 version: spec.version.clone(),
4864 component,
4865 },
4866 );
4867 missing.remove(&spec.id);
4868 }
4869
4870 Ok(())
4871}
4872
4873fn dist_error_for_component(err: DistError, component_id: &str, reference: &str) -> anyhow::Error {
4874 match err {
4875 DistError::NotFound { reference: missing } => anyhow!(
4876 "remote component {} is not cached for {}. Run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
4877 component_id,
4878 missing,
4879 reference
4880 ),
4881 DistError::Offline { reference: blocked } => anyhow!(
4882 "offline mode blocked fetching component {} from {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
4883 component_id,
4884 blocked,
4885 reference
4886 ),
4887 DistError::Unauthorized { target } => anyhow!(
4888 "component {} requires authenticated source {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
4889 component_id,
4890 target,
4891 reference
4892 ),
4893 other => anyhow!(
4894 "failed to resolve component {} from {}: {}",
4895 component_id,
4896 reference,
4897 other
4898 ),
4899 }
4900}
4901
4902async fn load_components_from_overrides(
4903 cache: &CacheManager,
4904 engine: &Engine,
4905 overrides: &HashMap<String, PathBuf>,
4906 specs: &[ComponentSpec],
4907 missing: &mut HashSet<String>,
4908 into: &mut HashMap<String, PackComponent>,
4909) -> Result<()> {
4910 for spec in specs {
4911 if !missing.contains(&spec.id) {
4912 continue;
4913 }
4914 let Some(path) = overrides.get(&spec.id) else {
4915 continue;
4916 };
4917 let bytes = std::fs::read(path)
4918 .with_context(|| format!("failed to read override component {}", path.display()))?;
4919 let component = compile_component_with_cache(cache, engine, None, bytes)
4920 .await
4921 .with_context(|| {
4922 format!(
4923 "failed to compile component {} from override {}",
4924 spec.id,
4925 path.display()
4926 )
4927 })?;
4928 into.insert(
4929 spec.id.clone(),
4930 PackComponent {
4931 name: spec.id.clone(),
4932 version: spec.version.clone(),
4933 component,
4934 },
4935 );
4936 missing.remove(&spec.id);
4937 }
4938 Ok(())
4939}
4940
4941async fn load_components_from_dir(
4942 cache: &CacheManager,
4943 engine: &Engine,
4944 root: &Path,
4945 specs: &[ComponentSpec],
4946 missing: &mut HashSet<String>,
4947 into: &mut HashMap<String, PackComponent>,
4948) -> Result<()> {
4949 for spec in specs {
4950 if !missing.contains(&spec.id) {
4951 continue;
4952 }
4953 let path = component_path_for_spec(root, spec);
4954 if !path.exists() {
4955 tracing::debug!(component = %spec.id, path = %path.display(), "materialized component missing; will try other sources");
4956 continue;
4957 }
4958 let bytes = std::fs::read(&path)
4959 .with_context(|| format!("failed to read component {}", path.display()))?;
4960 let component = compile_component_with_cache(cache, engine, None, bytes)
4961 .await
4962 .with_context(|| {
4963 format!(
4964 "failed to compile component {} from {}",
4965 spec.id,
4966 path.display()
4967 )
4968 })?;
4969 into.insert(
4970 spec.id.clone(),
4971 PackComponent {
4972 name: spec.id.clone(),
4973 version: spec.version.clone(),
4974 component,
4975 },
4976 );
4977 missing.remove(&spec.id);
4978 }
4979 Ok(())
4980}
4981
4982async fn load_components_from_archive(
4983 cache: &CacheManager,
4984 engine: &Engine,
4985 path: &Path,
4986 specs: &[ComponentSpec],
4987 missing: &mut HashSet<String>,
4988 into: &mut HashMap<String, PackComponent>,
4989) -> Result<()> {
4990 let mut archive = ZipArchive::new(File::open(path)?)
4991 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
4992 for spec in specs {
4993 if !missing.contains(&spec.id) {
4994 continue;
4995 }
4996 let file_name = spec
4997 .legacy_path
4998 .clone()
4999 .unwrap_or_else(|| format!("components/{}.wasm", spec.id));
5000 let bytes = match read_entry(&mut archive, &file_name) {
5001 Ok(bytes) => bytes,
5002 Err(err) => {
5003 warn!(component = %spec.id, pack = %path.display(), error = %err, "component entry missing in pack archive");
5004 continue;
5005 }
5006 };
5007 let component = compile_component_with_cache(cache, engine, None, bytes)
5008 .await
5009 .with_context(|| format!("failed to compile component {}", spec.id))?;
5010 into.insert(
5011 spec.id.clone(),
5012 PackComponent {
5013 name: spec.id.clone(),
5014 version: spec.version.clone(),
5015 component,
5016 },
5017 );
5018 missing.remove(&spec.id);
5019 }
5020 Ok(())
5021}
5022
#[cfg(test)]
mod tests {
    use super::*;
    use greentic_flow::model::{FlowDoc, NodeDoc};
    use indexmap::IndexMap;
    use serde_json::json;

    /// Applies `incoming` to `base` through `merge_in` and returns the merged outcome.
    fn merged(mut base: IdentifyOutcome, incoming: IdentifyOutcome) -> IdentifyOutcome {
        base.merge_in(incoming);
        base
    }

    /// A node whose raw map holds a single `<component>` key is rewritten into a
    /// `component.exec` node with the `render` operation and the raw value as input.
    #[test]
    fn normalizes_raw_component_to_component_exec() {
        let template_input = json!({ "template": "Hi {{name}}" });

        let mut raw = IndexMap::new();
        raw.insert("templating.handlebars".into(), template_input.clone());
        let start_node = NodeDoc {
            raw,
            routing: json!([{"out": true}]),
            ..Default::default()
        };
        let mut nodes = IndexMap::new();
        nodes.insert("start".into(), start_node);

        let doc = FlowDoc {
            id: "welcome".into(),
            title: None,
            description: None,
            flow_type: "messaging".into(),
            start: Some("start".into()),
            parameters: json!({}),
            tags: Vec::new(),
            schema_version: None,
            entrypoints: IndexMap::new(),
            meta: None,
            slot_schema: None,
            nodes,
        };

        let normalized = normalize_flow_doc(doc);
        let node = normalized.nodes.get("start").expect("node exists");
        assert_eq!(node.operation.as_deref(), Some("component.exec"));
        assert!(node.raw.is_empty());

        let payload = node.payload.as_object().expect("payload object");
        assert_eq!(
            payload.get("component"),
            Some(&json!("templating.handlebars"))
        );
        assert_eq!(payload.get("operation"), Some(&json!("render")));
        assert_eq!(payload.get("input").unwrap(), &template_input);
    }

    /// A node keyed by an operation name with `component`, `config` and `input` fields is
    /// rewritten into a `component.exec` node that keeps all three and records the key as
    /// the operation.
    #[test]
    fn normalizes_canonical_operation_node_to_component_exec_with_config() {
        let component_ref = "oci://ghcr.io/greenticai/component/component-llm-openai:stable";
        let config = json!({
            "provider": "ollama",
            "base_url": "http://127.0.0.1:11434/v1",
            "default_model": "llama3.2"
        });
        let input = json!({
            "messages": [{
                "role": "user",
                "content": "Say hello from Ollama."
            }]
        });

        let mut raw = IndexMap::new();
        raw.insert(
            "handle_message".into(),
            json!({
                "component": component_ref,
                "config": config.clone(),
                "input": input.clone()
            }),
        );
        let llm_node = NodeDoc {
            raw,
            routing: json!([{"out": true}]),
            ..Default::default()
        };
        let mut nodes = IndexMap::new();
        nodes.insert("llm".into(), llm_node);

        let doc = FlowDoc {
            id: "ollama-repro".into(),
            title: None,
            description: None,
            flow_type: "messaging".into(),
            start: Some("llm".into()),
            parameters: json!({}),
            tags: Vec::new(),
            schema_version: None,
            entrypoints: IndexMap::new(),
            meta: None,
            slot_schema: None,
            nodes,
        };

        let normalized = normalize_flow_doc(doc);
        let node = normalized.nodes.get("llm").expect("node exists");
        assert_eq!(node.operation.as_deref(), Some("component.exec"));
        assert!(node.raw.is_empty());

        let payload = node.payload.as_object().expect("payload object");
        assert_eq!(payload.get("component"), Some(&json!(component_ref)));
        assert_eq!(payload.get("operation"), Some(&json!("handle_message")));
        assert_eq!(payload.get("config"), Some(&config));
        assert_eq!(payload.get("input"), Some(&input));
    }

    /// Only messages about the instance-identity export or the `identify-instance`
    /// function count as "missing export"; traps and messages naming other exports do not.
    #[test]
    fn missing_export_error_detection_recognises_bindgen_shapes() {
        let missing_identify_exports = [
            "instantiation: no exported instance named \
             `greentic:provider-instance-identity/instance-identity-api@0.1.0`",
            "instantiation: no exported function named `identify-instance`",
        ];
        for message in missing_identify_exports {
            assert!(is_missing_export_error(message));
        }

        let unrelated_failures = [
            "Wasm trap: out of bounds memory access",
            "instantiation: no exported instance named \
             `greentic:provider-schema-core/schema-core-api@1.0.0`",
            "instantiation: no exported function named `invoke`",
        ];
        for message in unrelated_failures {
            assert!(!is_missing_export_error(message));
        }
    }

    /// Exercises every pairing of the three outcome kinds through `merge_in`.
    #[test]
    fn identify_outcome_merge_in_follows_lattice() {
        let unsupported = || IdentifyOutcome::Unsupported;
        let no_match = || IdentifyOutcome::NoMatch;
        let id_a = || IdentifyOutcome::Identified("a".to_string());
        let id_b = || IdentifyOutcome::Identified("b".to_string());

        // Starting from Unsupported: the incoming outcome is taken as-is.
        assert_eq!(merged(unsupported(), unsupported()), unsupported());
        assert_eq!(merged(unsupported(), no_match()), no_match());
        assert_eq!(merged(unsupported(), id_a()), id_a());

        // Starting from NoMatch: only Identified changes the outcome.
        assert_eq!(
            merged(no_match(), unsupported()),
            no_match(),
            "NoMatch must not downgrade to Unsupported"
        );
        assert_eq!(merged(no_match(), no_match()), no_match());
        assert_eq!(
            merged(no_match(), id_a()),
            id_a(),
            "Identified must override NoMatch"
        );

        // Starting from Identified: nothing replaces it.
        assert_eq!(merged(id_a(), unsupported()), id_a());
        assert_eq!(merged(id_a(), no_match()), id_a());
        assert_eq!(
            merged(id_a(), id_b()),
            id_a(),
            "first Identified wins; later id does not replace"
        );
    }
}
5228
#[cfg(test)]
mod identify_endpoints_pack_tests {
    use super::*;
    use crate::config::{
        FlowRetryConfig, HostConfig, OperatorPolicy, RateLimits, SecretsPolicy, StateStorePolicy,
        WebhookPolicy,
    };
    use crate::trace::TraceConfig;
    use crate::validate::ValidationConfig;

    /// Provider types queried by the no-manifest test.
    const PROVIDER_TYPES: [&str; 3] = ["teams", "slack", "telegram"];

    /// Minimal host configuration for the tests below: tenant `test`, HTTP disabled,
    /// allow-all secrets and operator policies, and defaults or empty collections for
    /// everything else.
    fn test_host_config() -> HostConfig {
        HostConfig {
            tenant: "test".to_string(),
            bindings_path: PathBuf::from("/tmp/bindings.yaml"),
            flow_type_bindings: HashMap::new(),
            rate_limits: RateLimits::default(),
            retry: FlowRetryConfig::default(),
            http_enabled: false,
            secrets_policy: SecretsPolicy::allow_all(),
            state_store_policy: StateStorePolicy::default(),
            webhook_policy: WebhookPolicy::default(),
            timers: Vec::new(),
            oauth: None,
            mocks: None,
            pack_bindings: Vec::new(),
            env_passthrough: Vec::new(),
            trace: TraceConfig::from_env(),
            validation: ValidationConfig::from_env(),
            operator_policy: OperatorPolicy::allow_all(),
            fast2flow: Default::default(),
        }
    }

    /// A pack built without any components reports every requested provider type as
    /// `Unsupported`.
    #[tokio::test]
    async fn no_manifest_returns_unsupported_for_all_types() {
        let host_config = Arc::new(test_host_config());
        let pack =
            PackRuntime::for_component_test(Vec::new(), HashMap::new(), "test-pack", host_config)
                .expect("empty pack construction");

        let result = pack
            .identify_endpoints_by_provider_type(&PROVIDER_TYPES, b"{}")
            .await
            .expect("no-manifest path must succeed");

        assert_eq!(result.len(), PROVIDER_TYPES.len());
        for ty in PROVIDER_TYPES {
            assert_eq!(
                result.get(ty),
                Some(&IdentifyOutcome::Unsupported),
                "type '{ty}' must be Unsupported when pack has no manifest"
            );
        }
    }

    /// Asking about no provider types yields an empty result map.
    #[tokio::test]
    async fn empty_provider_types_returns_empty_map() {
        let host_config = Arc::new(test_host_config());
        let pack =
            PackRuntime::for_component_test(Vec::new(), HashMap::new(), "test-pack", host_config)
                .expect("empty pack construction");

        let result = pack
            .identify_endpoints_by_provider_type(&[], b"{}")
            .await
            .expect("empty types fast path");

        assert!(result.is_empty());
    }
}
5306
/// Summary of a pack: its identity, the flows it exposes as entry points, and the secrets
/// it requires.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct PackMetadata {
    /// Identifier of the pack.
    pub pack_id: String,
    /// Pack version, stored as a string.
    pub version: String,
    /// Ids of the pack's entry flows; empty when absent from the serialized form.
    #[serde(default)]
    pub entry_flows: Vec<String>,
    /// Secrets the pack declares it needs; empty when absent from the serialized form.
    #[serde(default)]
    pub secret_requirements: Vec<greentic_types::SecretRequirement>,
}
5316
5317impl PackMetadata {
5318 fn from_wasm(bytes: &[u8]) -> Option<Self> {
5319 let parser = Parser::new(0);
5320 for payload in parser.parse_all(bytes) {
5321 let payload = payload.ok()?;
5322 match payload {
5323 Payload::CustomSection(section) => {
5324 if section.name() == "greentic.manifest"
5325 && let Ok(meta) = Self::from_bytes(section.data())
5326 {
5327 return Some(meta);
5328 }
5329 }
5330 Payload::DataSection(reader) => {
5331 for segment in reader.into_iter().flatten() {
5332 if let Ok(meta) = Self::from_bytes(segment.data) {
5333 return Some(meta);
5334 }
5335 }
5336 }
5337 _ => {}
5338 }
5339 }
5340 None
5341 }
5342
5343 fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
5344 #[derive(Deserialize)]
5345 struct RawManifest {
5346 pack_id: String,
5347 version: String,
5348 #[serde(default)]
5349 entry_flows: Vec<String>,
5350 #[serde(default)]
5351 flows: Vec<RawFlow>,
5352 #[serde(default)]
5353 secret_requirements: Vec<greentic_types::SecretRequirement>,
5354 }
5355
5356 #[derive(Deserialize)]
5357 struct RawFlow {
5358 id: String,
5359 }
5360
5361 let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
5362 let mut entry_flows = if manifest.entry_flows.is_empty() {
5363 manifest.flows.iter().map(|f| f.id.clone()).collect()
5364 } else {
5365 manifest.entry_flows.clone()
5366 };
5367 entry_flows.retain(|id| !id.is_empty());
5368 Ok(Self {
5369 pack_id: manifest.pack_id,
5370 version: manifest.version,
5371 entry_flows,
5372 secret_requirements: manifest.secret_requirements,
5373 })
5374 }
5375
5376 pub fn fallback(path: &Path) -> Self {
5377 let pack_id = path
5378 .file_stem()
5379 .map(|s| s.to_string_lossy().into_owned())
5380 .unwrap_or_else(|| "unknown-pack".to_string());
5381 Self {
5382 pack_id,
5383 version: "0.0.0".to_string(),
5384 entry_flows: Vec::new(),
5385 secret_requirements: Vec::new(),
5386 }
5387 }
5388
5389 pub fn from_manifest(manifest: &greentic_types::PackManifest) -> Self {
5390 let entry_flows = manifest
5391 .flows
5392 .iter()
5393 .map(|flow| flow.id.as_str().to_string())
5394 .collect::<Vec<_>>();
5395 Self {
5396 pack_id: manifest.pack_id.as_str().to_string(),
5397 version: manifest.version.to_string(),
5398 entry_flows,
5399 secret_requirements: manifest.secret_requirements.clone(),
5400 }
5401 }
5402}