1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::cache::{ArtifactKey, CacheConfig, CacheManager, CpuPolicy, EngineProfile};
10use crate::component_api::{
11 self, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
12};
13use crate::identify_hint::IdentifyInstanceHint;
14use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
15use crate::provider::{ProviderBinding, ProviderRegistry};
16use crate::provider_core::{
17 schema_core::SchemaCorePre as LegacySchemaCorePre,
18 schema_core_path::SchemaCorePre as PathSchemaCorePre,
19 schema_core_schema::SchemaCorePre as SchemaSchemaCorePre,
20};
21use crate::provider_core_only;
22use crate::runtime_refs::RuntimeRefsInjection;
23use crate::runtime_wasmtime::{Component, Engine, InstancePre, Linker, ResourceTable};
24use anyhow::{Context, Result, anyhow, bail};
25use futures::executor::block_on;
26use greentic_distributor_client::dist::{
27 CachePolicy, DistClient, DistError, DistOptions, ResolvePolicy,
28};
29use greentic_interfaces_wasmtime::host_helpers::v1::{
30 self as host_v1, HostFns, add_all_v1_to_linker,
31 runner_host_http::RunnerHostHttp,
32 runner_host_kv::RunnerHostKv,
33 runtime_config::{ConfigError, RuntimeConfigHost},
34 secrets_store::{SecretsError, SecretsErrorV1_1, SecretsStoreHost, SecretsStoreHostV1_1},
35 state_store::{
36 OpAck as StateOpAck, StateKey as HostStateKey, StateStoreError as StateError,
37 StateStoreHost, TenantCtx as StateTenantCtx,
38 },
39 telemetry_logger::{
40 OpAck as TelemetryAck, SpanContext as TelemetrySpanContext,
41 TelemetryLoggerError as TelemetryError, TelemetryLoggerHost,
42 TenantCtx as TelemetryTenantCtx,
43 },
44};
45use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::http::http_client as http_client_client_alias;
46use greentic_interfaces_wasmtime::instance_identity_instance_identity_describe_v0_1::InstanceIdentityDescribePre;
47use greentic_interfaces_wasmtime::instance_identity_v0_1::InstanceIdentityPre;
48use greentic_interfaces_wasmtime::{
49 http_client_client_v1_0::greentic::interfaces_types::types as http_types_v1_0,
50 http_client_client_v1_1::greentic::interfaces_types::types as http_types_v1_1,
51};
52use greentic_pack::builder as legacy_pack;
53use greentic_types::flow::FlowHasher;
54use greentic_types::{
55 ArtifactLocationV1, ComponentId, ComponentManifest, ComponentSourceRef, ComponentSourcesV1,
56 EXT_COMPONENT_SOURCES_V1, EnvId, ExtensionRef, Flow, FlowComponentRef, FlowId, FlowKind,
57 FlowMetadata, InputMapping, Node, NodeId, OutputMapping, Routing, StateKey as StoreStateKey,
58 TeamId, TelemetryHints, TenantCtx as TypesTenantCtx, TenantId, UserId, decode_pack_manifest,
59 pack_manifest::ExtensionInline,
60};
61use host_v1::http_client as host_http_client;
62use host_v1::http_client::{
63 HttpClientError, HttpClientErrorV1_1, HttpClientHost, HttpClientHostV1_1,
64 Request as HttpRequest, RequestOptionsV1_1 as HttpRequestOptionsV1_1,
65 RequestV1_1 as HttpRequestV1_1, Response as HttpResponse, ResponseV1_1 as HttpResponseV1_1,
66 TenantCtx as HttpTenantCtx, TenantCtxV1_1 as HttpTenantCtxV1_1,
67};
68use indexmap::IndexMap;
69use once_cell::sync::Lazy;
70use parking_lot::{Mutex, RwLock};
71use reqwest::blocking::Client as BlockingClient;
72use runner_core::normalize_under_root;
73use serde::{Deserialize, Serialize};
74use serde_cbor;
75use serde_json::{self, Value};
76use sha2::Digest;
77use tempfile::TempDir;
78use tokio::fs;
79use wasmparser::{Parser, Payload};
80use wasmtime::{Store, StoreContextMut};
81use wasmtime_wasi_http::WasiHttpCtx;
82use wasmtime_wasi_http::p2::{
83 WasiHttpCtxView, WasiHttpView, add_only_http_to_linker_sync as add_wasi_http_to_linker,
84};
85use wasmtime_wasi_tls::p2::LinkOptions;
86use wasmtime_wasi_tls::{WasiTlsCtx, WasiTlsCtxBuilder, WasiTlsCtxView, WasiTlsView};
87use zip::ZipArchive;
88
89use crate::runner::engine::{FlowContext, FlowEngine, FlowStatus};
90use crate::runner::flow_adapter::{FlowIR, flow_doc_to_ir, flow_ir_to_flow};
91use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
92#[cfg(feature = "fault-injection")]
93use crate::testing::fault_injection::{FaultContext, FaultPoint, maybe_fail};
94
95use crate::config::HostConfig;
96use crate::fault;
97use crate::secrets::{
98 DynSecretsManager, canonicalize_secret_key, read_secret_blocking, write_secret_blocking,
99};
100use crate::storage::state::STATE_PREFIX;
101use crate::storage::{DynSessionStore, DynStateStore};
102use crate::verify;
103use crate::wasi::{PreopenSpec, RunnerWasiPolicy};
104use tracing::warn;
105use wasmtime_wasi::p2::add_to_linker_sync as add_wasi_to_linker;
106use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
107
108use greentic_flow::model::FlowDoc;
109
110#[allow(dead_code)]
111pub struct PackRuntime {
112 path: PathBuf,
114 archive_path: Option<PathBuf>,
116 config: Arc<HostConfig>,
117 engine: Engine,
118 metadata: PackMetadata,
119 manifest: Option<greentic_types::PackManifest>,
120 legacy_manifest: Option<Box<legacy_pack::PackManifest>>,
121 component_manifests: HashMap<String, ComponentManifest>,
122 mocks: Option<Arc<MockLayer>>,
123 flows: Option<PackFlows>,
124 components: HashMap<String, PackComponent>,
125 http_client: Arc<BlockingClient>,
126 pre_cache: Mutex<HashMap<String, InstancePre<ComponentState>>>,
127 session_store: Option<DynSessionStore>,
128 state_store: Option<DynStateStore>,
129 wasi_policy: Arc<RunnerWasiPolicy>,
130 assets_tempdir: Option<TempDir>,
131 provider_registry: RwLock<Option<ProviderRegistry>>,
132 identify_hint_cache: RwLock<HashMap<String, Option<IdentifyInstanceHint>>>,
140 secrets: DynSecretsManager,
141 oauth_config: Option<OAuthBrokerConfig>,
142 cache: CacheManager,
143 runtime_config_non_secret: Option<Arc<BTreeMap<String, Value>>>,
149 runtime_refs: Option<RuntimeRefsInjection>,
156}
157
158struct PackComponent {
159 #[allow(dead_code)]
160 name: String,
161 #[allow(dead_code)]
162 version: String,
163 component: Arc<Component>,
164}
165
166#[derive(Debug, Clone, PartialEq, Eq)]
170pub enum IdentifyOutcome {
171 Unsupported,
174 NoMatch,
177 Identified(String),
180}
181
182impl IdentifyOutcome {
183 pub fn merge_in(&mut self, other: IdentifyOutcome) {
188 match (&*self, &other) {
189 (IdentifyOutcome::Identified(_), _) => {}
191 (_, IdentifyOutcome::Identified(_)) => *self = other,
193 (IdentifyOutcome::Unsupported, IdentifyOutcome::NoMatch) => *self = other,
195 _ => {}
196 }
197 }
198}
199
200fn run_on_wasi_thread<F, T>(task_name: &'static str, task: F) -> Result<T>
201where
202 F: FnOnce() -> Result<T> + Send + 'static,
203 T: Send + 'static,
204{
205 let builder = std::thread::Builder::new().name(format!("greentic-wasmtime-{task_name}"));
206 let handle = builder
207 .spawn(move || {
208 let pid = std::process::id();
209 let thread_id = std::thread::current().id();
210 let tokio_handle_present = tokio::runtime::Handle::try_current().is_ok();
211 tracing::info!(
212 event = "wasmtime.thread.start",
213 task = task_name,
214 pid,
215 thread_id = ?thread_id,
216 tokio_handle_present,
217 "starting Wasmtime thread"
218 );
219 task()
220 })
221 .context("failed to spawn Wasmtime thread")?;
222 handle
223 .join()
224 .map_err(|err| {
225 let reason = if let Some(msg) = err.downcast_ref::<&str>() {
226 msg.to_string()
227 } else if let Some(msg) = err.downcast_ref::<String>() {
228 msg.clone()
229 } else {
230 "unknown panic".to_string()
231 };
232 anyhow!("Wasmtime thread panicked: {reason}")
233 })
234 .and_then(|res| res)
235}
236
237#[derive(Debug, Default, Clone)]
238pub struct ComponentResolution {
239 pub materialized_root: Option<PathBuf>,
241 pub overrides: HashMap<String, PathBuf>,
243 pub dist_offline: bool,
245 pub dist_cache_dir: Option<PathBuf>,
247 pub allow_missing_hash: bool,
249}
250
251fn build_blocking_client() -> BlockingClient {
252 std::thread::spawn(|| {
253 BlockingClient::builder()
254 .no_proxy()
255 .build()
256 .expect("blocking client")
257 })
258 .join()
259 .expect("client build thread panicked")
260}
261
262fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
263 let (root, candidate) = if path.is_absolute() {
264 let parent = path
265 .parent()
266 .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
267 let root = parent
268 .canonicalize()
269 .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
270 let file = path
271 .file_name()
272 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
273 (root, PathBuf::from(file))
274 } else {
275 let cwd = std::env::current_dir().context("failed to resolve current directory")?;
276 let base = if let Some(parent) = path.parent() {
277 cwd.join(parent)
278 } else {
279 cwd
280 };
281 let root = base
282 .canonicalize()
283 .with_context(|| format!("failed to canonicalize {}", base.display()))?;
284 let file = path
285 .file_name()
286 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
287 (root, PathBuf::from(file))
288 };
289 let safe = normalize_under_root(&root, &candidate)?;
290 Ok((root, safe))
291}
292
293static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
294
295#[derive(Debug, Clone, Serialize, Deserialize)]
296pub struct FlowDescriptor {
297 pub id: String,
298 #[serde(rename = "type")]
299 pub flow_type: String,
300 pub pack_id: String,
301 pub profile: String,
302 pub version: String,
303 #[serde(default)]
304 pub description: Option<String>,
305}
306
307pub struct HostState {
308 #[allow(dead_code)]
309 pack_id: String,
310 config: Arc<HostConfig>,
311 http_client: Arc<BlockingClient>,
312 default_env: String,
313 #[allow(dead_code)]
314 session_store: Option<DynSessionStore>,
315 state_store: Option<DynStateStore>,
316 mocks: Option<Arc<MockLayer>>,
317 secrets: DynSecretsManager,
318 oauth_config: Option<OAuthBrokerConfig>,
319 oauth_host: OAuthBrokerHost,
320 exec_ctx: Option<ComponentExecCtx>,
321 component_ref: Option<String>,
322 provider_core_component: bool,
323 runtime_config_non_secret: Option<Arc<BTreeMap<String, Value>>>,
329 runtime_refs: Option<RuntimeRefsInjection>,
333}
334
335impl HostState {
336 #[allow(clippy::default_constructed_unit_structs)]
337 #[allow(clippy::too_many_arguments)]
338 pub fn new(
339 pack_id: String,
340 config: Arc<HostConfig>,
341 http_client: Arc<BlockingClient>,
342 mocks: Option<Arc<MockLayer>>,
343 session_store: Option<DynSessionStore>,
344 state_store: Option<DynStateStore>,
345 secrets: DynSecretsManager,
346 oauth_config: Option<OAuthBrokerConfig>,
347 exec_ctx: Option<ComponentExecCtx>,
348 component_ref: Option<String>,
349 provider_core_component: bool,
350 runtime_config_non_secret: Option<Arc<BTreeMap<String, Value>>>,
351 runtime_refs: Option<RuntimeRefsInjection>,
352 ) -> Result<Self> {
353 let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
354 Ok(Self {
355 pack_id,
356 config,
357 http_client,
358 default_env,
359 session_store,
360 state_store,
361 mocks,
362 secrets,
363 oauth_config,
364 oauth_host: OAuthBrokerHost::default(),
365 exec_ctx,
366 component_ref,
367 provider_core_component,
368 runtime_config_non_secret,
369 runtime_refs,
370 })
371 }
372
373 fn instantiate_component_result(
374 linker: &mut Linker<ComponentState>,
375 store: &mut Store<ComponentState>,
376 component: &Component,
377 ctx: &ComponentExecCtx,
378 component_ref: &str,
379 operation: &str,
380 input_json: &str,
381 ) -> Result<InvokeResult> {
382 let pre_instance = linker.instantiate_pre(component)?;
383 match component_api::v0_6::ComponentPre::new(pre_instance) {
384 Ok(pre) => {
385 let envelope = component_api::envelope_v0_6(ctx, component_ref, input_json)?;
386 let operation_owned = operation.to_string();
387 let result = block_on(async {
388 let bindings = pre.instantiate_async(&mut *store).await?;
389 let node = bindings.greentic_component_node();
390 node.call_invoke(&mut *store, &operation_owned, &envelope)
391 })?;
392 component_api::invoke_result_from_v0_6(result)
393 }
394 Err(err_v06) => {
395 if !is_missing_node_export(&err_v06, "0.6.0") {
396 return Err(err_v06.into());
397 }
398 let pre_instance = linker.instantiate_pre(component)?;
399 match component_api::v0_5::ComponentPre::new(pre_instance) {
400 Ok(pre) => {
401 let result = block_on(async {
402 let bindings = pre.instantiate_async(&mut *store).await?;
403 let node = bindings.greentic_component_node();
404 let ctx_v05 = component_api::exec_ctx_v0_5(ctx);
405 let operation_owned = operation.to_string();
406 let input_owned = input_json.to_string();
407 node.call_invoke(&mut *store, &ctx_v05, &operation_owned, &input_owned)
408 })?;
409 Ok(component_api::invoke_result_from_v0_5(result))
410 }
411 Err(err) => {
412 if !is_missing_node_export(&err, "0.5.0") {
413 return Err(err.into());
414 }
415 let pre_instance = linker.instantiate_pre(component)?;
416 match component_api::v0_4::ComponentPre::new(pre_instance) {
417 Ok(pre) => {
418 let result = block_on(async {
419 let bindings = pre.instantiate_async(&mut *store).await?;
420 let node = bindings.greentic_component_node();
421 let ctx_v04 = component_api::exec_ctx_v0_4(ctx);
422 let operation_owned = operation.to_string();
423 let input_owned = input_json.to_string();
424 node.call_invoke(
425 &mut *store,
426 &ctx_v04,
427 &operation_owned,
428 &input_owned,
429 )
430 })?;
431 Ok(component_api::invoke_result_from_v0_4(result))
432 }
433 Err(err_v04) => {
434 if is_missing_node_export(&err_v04, "0.4.0") {
435 Self::try_v06_runtime(linker, store, component, input_json)
436 } else {
437 Err(err_v04.into())
438 }
439 }
440 }
441 }
442 }
443 }
444 }
445 }
446
447 fn try_v06_runtime(
450 linker: &mut Linker<ComponentState>,
451 store: &mut Store<ComponentState>,
452 component: &Component,
453 input_json: &str,
454 ) -> Result<InvokeResult> {
455 let pre_instance = linker.instantiate_pre(component)?;
456 let pre = component_api::v0_6_runtime::ComponentV0V6RuntimePre::new(pre_instance).map_err(
457 |err| err.context("component exports neither node@0.5/0.4 nor component-runtime@0.6"),
458 )?;
459
460 let result = block_on(async {
461 let bindings = pre.instantiate_async(&mut *store).await?;
462 let runtime = bindings.greentic_component_component_runtime();
463
464 let input_value: Value = serde_json::from_str(input_json).unwrap_or(Value::Null);
466 let input_cbor =
467 serde_cbor::to_vec(&input_value).context("encode input as CBOR for v0.6")?;
468 let empty_state = serde_cbor::to_vec(&Value::Object(Default::default()))
469 .context("encode empty state")?;
470
471 let run_result = runtime
472 .call_run(&mut *store, &input_cbor, &empty_state)
473 .map_err(|err| err.context("v0.6 component-runtime::run call failed"))?;
474
475 let output_value: Value = serde_cbor::from_slice(&run_result.output)
477 .context("decode v0.6 run output CBOR")?;
478 let output_json = serde_json::to_string(&output_value)
479 .context("serialize v0.6 run output to JSON")?;
480
481 Ok::<_, anyhow::Error>(output_json)
482 })?;
483
484 Ok(InvokeResult::Ok(result))
485 }
486
487 fn convert_invoke_result(result: InvokeResult) -> Result<Value> {
488 match result {
489 InvokeResult::Ok(body) => {
490 if body.is_empty() {
491 return Ok(Value::Null);
492 }
493 serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
494 }
495 InvokeResult::Err(NodeError {
496 code,
497 message,
498 retryable,
499 backoff_ms,
500 details,
501 }) => {
502 let mut obj = serde_json::Map::new();
503 obj.insert("ok".into(), Value::Bool(false));
504 let mut error = serde_json::Map::new();
505 error.insert("code".into(), Value::String(code));
506 error.insert("message".into(), Value::String(message));
507 error.insert("retryable".into(), Value::Bool(retryable));
508 if let Some(backoff) = backoff_ms {
509 error.insert("backoff_ms".into(), Value::Number(backoff.into()));
510 }
511 if let Some(details) = details {
512 error.insert(
513 "details".into(),
514 serde_json::from_str(&details).unwrap_or(Value::String(details)),
515 );
516 }
517 obj.insert("error".into(), Value::Object(error));
518 Ok(Value::Object(obj))
519 }
520 }
521 }
522
523 fn secrets_tenant_ctx(&self) -> TypesTenantCtx {
527 let mut ctx = self.config.tenant_ctx();
528 if let Some(exec_ctx) = self.exec_ctx.as_ref()
529 && let Some(team) = exec_ctx.tenant.team.as_ref()
530 && let Ok(team_id) = TeamId::from_str(team)
531 {
532 ctx = ctx.with_team(Some(team_id));
533 }
534 ctx
535 }
536
537 pub fn get_secret(&self, key: &str) -> Result<String> {
538 if provider_core_only::is_enabled() {
539 bail!(provider_core_only::blocked_message("secrets"))
540 }
541 if !self.config.secrets_policy.is_allowed(key) {
542 bail!("secret {key} is not permitted by bindings policy");
543 }
544 if let Some(mock) = &self.mocks
545 && let Some(value) = mock.secrets_lookup(key)
546 {
547 return Ok(value);
548 }
549 let ctx = self.secrets_tenant_ctx();
550 let canonical_key = canonicalize_secret_key(key);
551 let bytes = read_secret_blocking(&self.secrets, &ctx, &self.pack_id, &canonical_key)
552 .context("failed to read secret from manager")?;
553 let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
554 Ok(value)
555 }
556
557 fn allows_secret_write_in_provider_core_only(&self) -> bool {
558 self.provider_core_component || self.component_ref.is_none()
559 }
560
561 fn tenant_ctx_from_v1(&self, ctx: Option<StateTenantCtx>) -> Result<TypesTenantCtx> {
562 let tenant_raw = ctx
563 .as_ref()
564 .map(|ctx| ctx.tenant.clone())
565 .or_else(|| self.exec_ctx.as_ref().map(|ctx| ctx.tenant.tenant.clone()))
566 .unwrap_or_else(|| self.config.tenant.clone());
567 let env_raw = ctx
568 .as_ref()
569 .map(|ctx| ctx.env.clone())
570 .unwrap_or_else(|| self.default_env.clone());
571 let tenant_id = TenantId::from_str(&tenant_raw)
572 .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
573 let env_id = EnvId::from_str(&env_raw)
574 .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
575 let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
576 if let Some(exec_ctx) = self.exec_ctx.as_ref() {
577 if let Some(team) = exec_ctx.tenant.team.as_ref() {
578 let team_id =
579 TeamId::from_str(team).with_context(|| format!("invalid team id `{team}`"))?;
580 tenant_ctx = tenant_ctx.with_team(Some(team_id));
581 }
582 if let Some(user) = exec_ctx.tenant.user.as_ref() {
583 let user_id =
584 UserId::from_str(user).with_context(|| format!("invalid user id `{user}`"))?;
585 tenant_ctx = tenant_ctx.with_user(Some(user_id));
586 }
587 tenant_ctx = tenant_ctx.with_flow(exec_ctx.flow_id.clone());
588 if let Some(node) = exec_ctx.node_id.as_ref() {
589 tenant_ctx = tenant_ctx.with_node(node.clone());
590 }
591 if let Some(session) = exec_ctx.tenant.correlation_id.as_ref() {
592 tenant_ctx = tenant_ctx.with_session(session.clone());
593 }
594 tenant_ctx.trace_id = exec_ctx.tenant.trace_id.clone();
595 }
596
597 if let Some(ctx) = ctx {
598 if let Some(team) = ctx.team.or(ctx.team_id) {
599 let team_id =
600 TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
601 tenant_ctx = tenant_ctx.with_team(Some(team_id));
602 }
603 if let Some(user) = ctx.user.or(ctx.user_id) {
604 let user_id =
605 UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
606 tenant_ctx = tenant_ctx.with_user(Some(user_id));
607 }
608 if let Some(flow) = ctx.flow_id {
609 tenant_ctx = tenant_ctx.with_flow(flow);
610 }
611 if let Some(node) = ctx.node_id {
612 tenant_ctx = tenant_ctx.with_node(node);
613 }
614 if let Some(provider) = ctx.provider_id {
615 tenant_ctx = tenant_ctx.with_provider(provider);
616 }
617 if let Some(session) = ctx.session_id {
618 tenant_ctx = tenant_ctx.with_session(session);
619 }
620 tenant_ctx.trace_id = ctx.trace_id;
621 }
622 Ok(tenant_ctx)
623 }
624
625 fn send_http_request(
626 &mut self,
627 req: HttpRequest,
628 opts: Option<HttpRequestOptionsV1_1>,
629 _ctx: Option<HttpTenantCtx>,
630 ) -> Result<HttpResponse, HttpClientError> {
631 if !self.config.http_enabled {
632 return Err(HttpClientError {
633 code: "denied".into(),
634 message: "http client disabled by policy".into(),
635 });
636 }
637
638 let mut mock_state = None;
639 let raw_body = req.body.clone();
640 if let Some(mock) = &self.mocks
641 && let Ok(meta) = HttpMockRequest::new(&req.method, &req.url, raw_body.as_deref())
642 {
643 match mock.http_begin(&meta) {
644 HttpDecision::Mock(response) => {
645 let headers = response
646 .headers
647 .iter()
648 .map(|(k, v)| (k.clone(), v.clone()))
649 .collect();
650 return Ok(HttpResponse {
651 status: response.status,
652 headers,
653 body: response.body.clone().map(|b| b.into_bytes()),
654 });
655 }
656 HttpDecision::Deny(reason) => {
657 return Err(HttpClientError {
658 code: "denied".into(),
659 message: reason,
660 });
661 }
662 HttpDecision::Passthrough { record } => {
663 mock_state = Some((meta, record));
664 }
665 }
666 }
667
668 let method = req.method.parse().unwrap_or(reqwest::Method::GET);
669 let mut builder = self.http_client.request(method, &req.url);
670 for (key, value) in req.headers {
671 if let Ok(header) = reqwest::header::HeaderName::from_bytes(key.as_bytes())
672 && let Ok(header_value) = reqwest::header::HeaderValue::from_str(&value)
673 {
674 builder = builder.header(header, header_value);
675 }
676 }
677
678 if let Some(body) = raw_body.clone() {
679 builder = builder.body(body);
680 }
681
682 if let Some(opts) = opts {
683 if let Some(timeout_ms) = opts.timeout_ms {
684 builder = builder.timeout(Duration::from_millis(timeout_ms as u64));
685 }
686 if opts.allow_insecure == Some(true) {
687 warn!(url = %req.url, "allow-insecure not supported; using default TLS validation");
688 }
689 if let Some(follow_redirects) = opts.follow_redirects
690 && !follow_redirects
691 {
692 warn!(url = %req.url, "follow-redirects=false not supported; using default client behaviour");
693 }
694 }
695
696 let response = match builder.send() {
697 Ok(resp) => resp,
698 Err(err) => {
699 warn!(url = %req.url, error = %err, "http client request failed");
700 return Err(HttpClientError {
701 code: "unavailable".into(),
702 message: err.to_string(),
703 });
704 }
705 };
706
707 let status = response.status().as_u16();
708 let headers_vec = response
709 .headers()
710 .iter()
711 .map(|(k, v)| {
712 (
713 k.as_str().to_string(),
714 v.to_str().unwrap_or_default().to_string(),
715 )
716 })
717 .collect::<Vec<_>>();
718 let body_bytes = response.bytes().ok().map(|b| b.to_vec());
719
720 if let Some((meta, true)) = mock_state.take()
721 && let Some(mock) = &self.mocks
722 {
723 let recorded = HttpMockResponse::new(
724 status,
725 headers_vec.clone().into_iter().collect(),
726 body_bytes
727 .as_ref()
728 .map(|b| String::from_utf8_lossy(b).into_owned()),
729 );
730 mock.http_record(&meta, &recorded);
731 }
732
733 Ok(HttpResponse {
734 status,
735 headers: headers_vec,
736 body: body_bytes,
737 })
738 }
739}
740
741#[cfg(test)]
742mod canonicalize_tests {
743 use crate::secrets::canonicalize_secret_key;
744
745 #[test]
746 fn upper_snake_to_lower_snake() {
747 assert_eq!(
748 canonicalize_secret_key("TELEGRAM_BOT_TOKEN"),
749 "telegram_bot_token"
750 );
751 }
752
753 #[test]
754 fn trim_and_replace_non_alphanumeric() {
755 assert_eq!(
756 canonicalize_secret_key(" webex-bot-token "),
757 "webex_bot_token"
758 );
759 }
760
761 #[test]
762 fn preserve_existing_lower_snake_with_extra_underscores() {
763 assert_eq!(canonicalize_secret_key("MiXeD__Case"), "mixed__case");
764 }
765}
766
767impl SecretsStoreHost for HostState {
768 fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsError> {
769 if provider_core_only::is_enabled() {
770 warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
771 return Err(SecretsError::Denied);
772 }
773 if !self.config.secrets_policy.is_allowed(&key) {
774 return Err(SecretsError::Denied);
775 }
776 if let Some(mock) = &self.mocks
777 && let Some(value) = mock.secrets_lookup(&key)
778 {
779 return Ok(Some(value.into_bytes()));
780 }
781 let ctx = self.secrets_tenant_ctx();
782 let canonical_key = canonicalize_secret_key(&key);
783 match read_secret_blocking(&self.secrets, &ctx, &self.pack_id, &canonical_key) {
784 Ok(bytes) => Ok(Some(bytes)),
785 Err(err) => {
786 warn!(secret = %key, canonical = %canonical_key, error = %err, "secret lookup failed");
787 Err(SecretsError::NotFound)
788 }
789 }
790 }
791}
792
793impl SecretsStoreHostV1_1 for HostState {
794 fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsErrorV1_1> {
795 if provider_core_only::is_enabled() {
796 warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
797 return Err(SecretsErrorV1_1::Denied);
798 }
799 if !self.config.secrets_policy.is_allowed(&key) {
800 return Err(SecretsErrorV1_1::Denied);
801 }
802 if let Some(mock) = &self.mocks
803 && let Some(value) = mock.secrets_lookup(&key)
804 {
805 return Ok(Some(value.into_bytes()));
806 }
807 let ctx = self.secrets_tenant_ctx();
808 let canonical_key = canonicalize_secret_key(&key);
809 match read_secret_blocking(&self.secrets, &ctx, &self.pack_id, &canonical_key) {
810 Ok(bytes) => Ok(Some(bytes)),
811 Err(err) => {
812 warn!(secret = %key, canonical = %canonical_key, error = %err, "secret lookup failed");
813 Err(SecretsErrorV1_1::NotFound)
814 }
815 }
816 }
817
818 fn put(&mut self, key: String, value: Vec<u8>) {
819 if key.trim().is_empty() {
820 warn!(secret = %key, "secret write blocked: empty key");
821 panic!("secret write denied for key {key}: invalid key");
822 }
823 if provider_core_only::is_enabled() && !self.allows_secret_write_in_provider_core_only() {
824 warn!(
825 secret = %key,
826 component = self.component_ref.as_deref().unwrap_or("<pack>"),
827 "provider-core only mode enabled; blocking secrets store write"
828 );
829 panic!("secret write denied for key {key}: provider-core-only mode");
830 }
831 if !self.config.secrets_policy.is_allowed(&key) {
832 warn!(secret = %key, "secret write denied by bindings policy");
833 panic!("secret write denied for key {key}: policy");
834 }
835 let ctx = self.secrets_tenant_ctx();
836 let canonical_key = canonicalize_secret_key(&key);
837 if let Err(err) =
838 write_secret_blocking(&self.secrets, &ctx, &self.pack_id, &canonical_key, &value)
839 {
840 warn!(secret = %key, canonical = %canonical_key, error = %err, "secret write failed");
841 panic!("secret write failed for key {key}");
842 }
843 }
844}
845
846static WARNED_COMPAT_KEYS: Lazy<Mutex<HashSet<String>>> = Lazy::new(|| Mutex::new(HashSet::new()));
851
852fn warn_compat_fallback_once(key: &str) {
853 let mut warned = WARNED_COMPAT_KEYS.lock();
854 if warned.insert(key.to_string()) {
855 warn!(
856 key = %key,
857 "runtime-config key resolved via secrets-store compat fallback; \
858 move this value into pack-config.v1.non_secret"
859 );
860 }
861}
862
863impl RuntimeConfigHost for HostState {
864 fn get(&mut self, key: String) -> Result<Option<String>, ConfigError> {
865 if key.trim().is_empty() {
866 return Err(ConfigError::InvalidKey);
867 }
868
869 if let Some(map) = self.runtime_config_non_secret.as_ref()
873 && let Some(value) = map.get(&key)
874 {
875 return serde_json::to_string(value).map(Some).map_err(|err| {
876 warn!(key = %key, error = %err, "runtime-config value JSON-encode failed");
877 ConfigError::Internal
878 });
879 }
880
881 if let Some(injection) = self.runtime_refs.as_ref()
886 && let Some(uri) = injection.refs.get(&key)
887 {
888 use crate::runtime_refs::RuntimeRefResolverError;
889 return match injection.resolver.resolve(uri) {
890 Ok(Some(value)) => serde_json::to_string(&value).map(Some).map_err(|err| {
891 warn!(key = %key, error = %err, "runtime-ref value JSON-encode failed");
892 ConfigError::Internal
893 }),
894 Ok(None) => Ok(None),
895 Err(err @ RuntimeRefResolverError::Invalid(_)) => {
896 warn!(key = %key, error = %err, "runtime-ref rejected");
897 Err(ConfigError::InvalidKey)
898 }
899 Err(err @ RuntimeRefResolverError::Internal(_)) => {
900 warn!(key = %key, error = %err, "runtime-ref resolution failed");
901 Err(ConfigError::Internal)
902 }
903 };
904 }
905
906 match SecretsStoreHost::get(self, key.clone()) {
909 Ok(Some(bytes)) => match String::from_utf8(bytes) {
910 Ok(value) => {
911 warn_compat_fallback_once(&key);
912 Ok(Some(value))
913 }
914 Err(_) => {
915 warn!(
916 key = %key,
917 "runtime-config compat fallback found non-UTF-8 secret bytes; \
918 returning not-found"
919 );
920 Err(ConfigError::Internal)
921 }
922 },
923 Ok(None) => Ok(None),
924 Err(SecretsError::NotFound) => Ok(None),
925 Err(SecretsError::Denied) => Err(ConfigError::Denied),
926 Err(SecretsError::InvalidKey) => Err(ConfigError::InvalidKey),
927 Err(SecretsError::Internal) => Err(ConfigError::Internal),
928 }
929 }
930}
931
932impl HttpClientHost for HostState {
933 fn send(
934 &mut self,
935 req: HttpRequest,
936 ctx: Option<HttpTenantCtx>,
937 ) -> Result<HttpResponse, HttpClientError> {
938 self.send_http_request(req, None, ctx)
939 }
940}
941
942impl HttpClientHostV1_1 for HostState {
943 fn send(
944 &mut self,
945 req: HttpRequestV1_1,
946 opts: Option<HttpRequestOptionsV1_1>,
947 ctx: Option<HttpTenantCtxV1_1>,
948 ) -> Result<HttpResponseV1_1, HttpClientErrorV1_1> {
949 let legacy_req = HttpRequest {
950 method: req.method,
951 url: req.url,
952 headers: req.headers,
953 body: req.body,
954 };
955 let legacy_ctx = ctx.map(|ctx| HttpTenantCtx {
956 env: ctx.env,
957 tenant: ctx.tenant,
958 tenant_id: ctx.tenant_id,
959 team: ctx.team,
960 team_id: ctx.team_id,
961 user: ctx.user,
962 user_id: ctx.user_id,
963 trace_id: ctx.trace_id,
964 correlation_id: ctx.correlation_id,
965 i18n_id: ctx.i18n_id,
966 attributes: ctx.attributes,
967 session_id: ctx.session_id,
968 flow_id: ctx.flow_id,
969 node_id: ctx.node_id,
970 provider_id: ctx.provider_id,
971 deadline_ms: ctx.deadline_ms,
972 attempt: ctx.attempt,
973 idempotency_key: ctx.idempotency_key,
974 impersonation: ctx.impersonation.map(|imp| http_types_v1_0::Impersonation {
975 actor_id: imp.actor_id,
976 reason: imp.reason,
977 }),
978 });
979
980 self.send_http_request(legacy_req, opts, legacy_ctx)
981 .map(|resp| HttpResponseV1_1 {
982 status: resp.status,
983 headers: resp.headers,
984 body: resp.body,
985 })
986 .map_err(|err| HttpClientErrorV1_1 {
987 code: err.code,
988 message: err.message,
989 })
990 }
991}
992
993impl StateStoreHost for HostState {
994 fn read(
995 &mut self,
996 key: HostStateKey,
997 ctx: Option<StateTenantCtx>,
998 ) -> Result<Vec<u8>, StateError> {
999 let store = match self.state_store.as_ref() {
1000 Some(store) => store.clone(),
1001 None => {
1002 return Err(StateError {
1003 code: "unavailable".into(),
1004 message: "state store not configured".into(),
1005 });
1006 }
1007 };
1008 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
1009 Ok(ctx) => ctx,
1010 Err(err) => {
1011 return Err(StateError {
1012 code: "invalid-ctx".into(),
1013 message: err.to_string(),
1014 });
1015 }
1016 };
1017 #[cfg(feature = "fault-injection")]
1018 {
1019 let exec_ctx = self.exec_ctx.as_ref();
1020 let flow_id = exec_ctx
1021 .map(|ctx| ctx.flow_id.as_str())
1022 .unwrap_or("unknown");
1023 let node_id = exec_ctx.and_then(|ctx| ctx.node_id.as_deref());
1024 let attempt = exec_ctx.map(|ctx| ctx.tenant.attempt).unwrap_or(1);
1025 let fault_ctx = FaultContext {
1026 pack_id: self.pack_id.as_str(),
1027 flow_id,
1028 node_id,
1029 attempt,
1030 };
1031 if let Err(err) = maybe_fail(FaultPoint::StateRead, fault_ctx) {
1032 return Err(StateError {
1033 code: "internal".into(),
1034 message: err.to_string(),
1035 });
1036 }
1037 }
1038 let key = StoreStateKey::from(key);
1039 match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
1040 Ok(Some(value)) => Ok(serde_json::to_vec(&value).unwrap_or_else(|_| Vec::new())),
1041 Ok(None) => Err(StateError {
1042 code: "not_found".into(),
1043 message: "state key not found".into(),
1044 }),
1045 Err(err) => Err(StateError {
1046 code: "internal".into(),
1047 message: err.to_string(),
1048 }),
1049 }
1050 }
1051
1052 fn write(
1053 &mut self,
1054 key: HostStateKey,
1055 bytes: Vec<u8>,
1056 ctx: Option<StateTenantCtx>,
1057 ) -> Result<StateOpAck, StateError> {
1058 let store = match self.state_store.as_ref() {
1059 Some(store) => store.clone(),
1060 None => {
1061 return Err(StateError {
1062 code: "unavailable".into(),
1063 message: "state store not configured".into(),
1064 });
1065 }
1066 };
1067 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
1068 Ok(ctx) => ctx,
1069 Err(err) => {
1070 return Err(StateError {
1071 code: "invalid-ctx".into(),
1072 message: err.to_string(),
1073 });
1074 }
1075 };
1076 #[cfg(feature = "fault-injection")]
1077 {
1078 let exec_ctx = self.exec_ctx.as_ref();
1079 let flow_id = exec_ctx
1080 .map(|ctx| ctx.flow_id.as_str())
1081 .unwrap_or("unknown");
1082 let node_id = exec_ctx.and_then(|ctx| ctx.node_id.as_deref());
1083 let attempt = exec_ctx.map(|ctx| ctx.tenant.attempt).unwrap_or(1);
1084 let fault_ctx = FaultContext {
1085 pack_id: self.pack_id.as_str(),
1086 flow_id,
1087 node_id,
1088 attempt,
1089 };
1090 if let Err(err) = maybe_fail(FaultPoint::StateWrite, fault_ctx) {
1091 return Err(StateError {
1092 code: "internal".into(),
1093 message: err.to_string(),
1094 });
1095 }
1096 }
1097 let key = StoreStateKey::from(key);
1098 let value = serde_json::from_slice(&bytes)
1099 .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()));
1100 match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
1101 Ok(()) => Ok(StateOpAck::Ok),
1102 Err(err) => Err(StateError {
1103 code: "internal".into(),
1104 message: err.to_string(),
1105 }),
1106 }
1107 }
1108
1109 fn delete(
1110 &mut self,
1111 key: HostStateKey,
1112 ctx: Option<StateTenantCtx>,
1113 ) -> Result<StateOpAck, StateError> {
1114 let store = match self.state_store.as_ref() {
1115 Some(store) => store.clone(),
1116 None => {
1117 return Err(StateError {
1118 code: "unavailable".into(),
1119 message: "state store not configured".into(),
1120 });
1121 }
1122 };
1123 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
1124 Ok(ctx) => ctx,
1125 Err(err) => {
1126 return Err(StateError {
1127 code: "invalid-ctx".into(),
1128 message: err.to_string(),
1129 });
1130 }
1131 };
1132 let key = StoreStateKey::from(key);
1133 match store.del(&tenant_ctx, STATE_PREFIX, &key) {
1134 Ok(_) => Ok(StateOpAck::Ok),
1135 Err(err) => Err(StateError {
1136 code: "internal".into(),
1137 message: err.to_string(),
1138 }),
1139 }
1140 }
1141}
1142
1143impl TelemetryLoggerHost for HostState {
1144 fn log(
1145 &mut self,
1146 span: TelemetrySpanContext,
1147 fields: Vec<(String, String)>,
1148 _ctx: Option<TelemetryTenantCtx>,
1149 ) -> Result<TelemetryAck, TelemetryError> {
1150 if let Some(mock) = &self.mocks
1151 && mock.telemetry_drain(&[("span_json", span.flow_id.as_str())])
1152 {
1153 return Ok(TelemetryAck::Ok);
1154 }
1155 let mut map = serde_json::Map::new();
1156 for (k, v) in fields {
1157 map.insert(k, Value::String(v));
1158 }
1159 tracing::info!(
1160 tenant = %span.tenant,
1161 flow_id = %span.flow_id,
1162 node = ?span.node_id,
1163 provider = %span.provider,
1164 fields = %serde_json::Value::Object(map.clone()),
1165 "telemetry log from pack"
1166 );
1167 Ok(TelemetryAck::Ok)
1168 }
1169}
1170
1171impl RunnerHostHttp for HostState {
1172 fn request(
1173 &mut self,
1174 method: String,
1175 url: String,
1176 headers: Vec<String>,
1177 body: Option<Vec<u8>>,
1178 ) -> Result<Vec<u8>, String> {
1179 let req = HttpRequest {
1180 method,
1181 url,
1182 headers: headers
1183 .chunks(2)
1184 .filter_map(|chunk| {
1185 if chunk.len() == 2 {
1186 Some((chunk[0].clone(), chunk[1].clone()))
1187 } else {
1188 None
1189 }
1190 })
1191 .collect(),
1192 body,
1193 };
1194 match HttpClientHost::send(self, req, None) {
1195 Ok(resp) => Ok(resp.body.unwrap_or_default()),
1196 Err(err) => Err(err.message),
1197 }
1198 }
1199}
1200
1201impl RunnerHostKv for HostState {
1202 fn get(&mut self, _ns: String, _key: String) -> Option<String> {
1203 None
1204 }
1205
1206 fn put(&mut self, _ns: String, _key: String, _val: String) {}
1207}
1208
1209enum ManifestLoad {
1210 New {
1211 manifest: Box<greentic_types::PackManifest>,
1212 flows: PackFlows,
1213 },
1214 Legacy {
1215 manifest: Box<legacy_pack::PackManifest>,
1216 flows: PackFlows,
1217 },
1218}
1219
1220fn load_manifest_and_flows(path: &Path) -> Result<ManifestLoad> {
1221 let mut archive = ZipArchive::new(File::open(path)?)
1222 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
1223 let bytes = read_entry(&mut archive, "manifest.cbor")
1224 .with_context(|| format!("missing manifest.cbor in {}", path.display()))?;
1225 match decode_pack_manifest(&bytes) {
1226 Ok(manifest) => {
1227 let cache = PackFlows::from_manifest(manifest.clone());
1228 Ok(ManifestLoad::New {
1229 manifest: Box::new(manifest),
1230 flows: cache,
1231 })
1232 }
1233 Err(err) => {
1234 tracing::debug!(
1235 error = %err,
1236 pack = %path.display(),
1237 "decode_pack_manifest failed for archive; falling back to legacy manifest"
1238 );
1239 let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
1240 .context("failed to decode legacy pack manifest from manifest.cbor")?;
1241 let flows = load_legacy_flows_from_archive(&mut archive, &legacy)?;
1242 Ok(ManifestLoad::Legacy {
1243 manifest: Box::new(legacy),
1244 flows,
1245 })
1246 }
1247 }
1248}
1249
1250fn load_manifest_and_flows_from_dir(root: &Path) -> Result<ManifestLoad> {
1251 let manifest_path = root.join("manifest.cbor");
1252 let bytes = std::fs::read(&manifest_path)
1253 .with_context(|| format!("missing manifest.cbor in {}", root.display()))?;
1254 match decode_pack_manifest(&bytes) {
1255 Ok(manifest) => {
1256 let cache = PackFlows::from_manifest(manifest.clone());
1257 Ok(ManifestLoad::New {
1258 manifest: Box::new(manifest),
1259 flows: cache,
1260 })
1261 }
1262 Err(err) => {
1263 tracing::debug!(
1264 error = %err,
1265 pack = %root.display(),
1266 "decode_pack_manifest failed for materialized pack; trying legacy manifest"
1267 );
1268 let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
1269 .context("failed to decode legacy pack manifest from manifest.cbor")?;
1270 let flows = load_legacy_flows_from_dir(root, &legacy)?;
1271 Ok(ManifestLoad::Legacy {
1272 manifest: Box::new(legacy),
1273 flows,
1274 })
1275 }
1276 }
1277}
1278
1279fn load_legacy_flows_from_dir(
1280 root: &Path,
1281 manifest: &legacy_pack::PackManifest,
1282) -> Result<PackFlows> {
1283 build_legacy_flows(manifest, |rel_path| {
1284 let path = root.join(rel_path);
1285 std::fs::read(&path).with_context(|| format!("missing flow json {}", path.display()))
1286 })
1287}
1288
1289fn load_legacy_flows_from_archive(
1290 archive: &mut ZipArchive<File>,
1291 manifest: &legacy_pack::PackManifest,
1292) -> Result<PackFlows> {
1293 build_legacy_flows(manifest, |rel_path| {
1294 read_entry(archive, rel_path).with_context(|| format!("missing flow json {}", rel_path))
1295 })
1296}
1297
1298fn build_legacy_flows(
1299 manifest: &legacy_pack::PackManifest,
1300 mut read_json: impl FnMut(&str) -> Result<Vec<u8>>,
1301) -> Result<PackFlows> {
1302 let mut flows = HashMap::new();
1303 let mut descriptors = Vec::new();
1304
1305 for entry in &manifest.flows {
1306 let bytes = read_json(&entry.file_json)
1307 .with_context(|| format!("missing flow json {}", entry.file_json))?;
1308 let doc = parse_flow_doc_with_legacy_aliases(&bytes)?;
1309 let normalized = normalize_flow_doc(doc);
1310 let flow_ir = flow_doc_to_ir(normalized)?;
1311 let flow = flow_ir_to_flow(flow_ir)?;
1312
1313 descriptors.push(FlowDescriptor {
1314 id: entry.id.clone(),
1315 flow_type: entry.kind.clone(),
1316 pack_id: manifest.meta.pack_id.clone(),
1317 profile: manifest.meta.pack_id.clone(),
1318 version: manifest.meta.version.to_string(),
1319 description: None,
1320 });
1321 flows.insert(entry.id.clone(), flow);
1322 }
1323
1324 let mut entry_flows = manifest.meta.entry_flows.clone();
1325 if entry_flows.is_empty() {
1326 entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
1327 }
1328 let metadata = PackMetadata {
1329 pack_id: manifest.meta.pack_id.clone(),
1330 version: manifest.meta.version.to_string(),
1331 entry_flows,
1332 secret_requirements: Vec::new(),
1333 };
1334
1335 Ok(PackFlows {
1336 descriptors,
1337 flows,
1338 metadata,
1339 })
1340}
1341
1342fn parse_flow_doc_with_legacy_aliases(bytes: &[u8]) -> Result<FlowDoc> {
1343 let mut value: Value =
1344 serde_json::from_slice(bytes).context("failed to decode flow doc JSON")?;
1345 if let Some(map) = value.as_object_mut()
1346 && !map.contains_key("type")
1347 && let Some(flow_type) = map.remove("flow_type")
1348 {
1349 map.insert("type".to_string(), flow_type);
1350 }
1351 serde_json::from_value(value).context("failed to decode flow doc structure")
1352}
1353
1354pub struct ComponentState {
1355 pub host: HostState,
1356 wasi_ctx: WasiCtx,
1357 wasi_tls_ctx: WasiTlsCtx,
1358 wasi_http_ctx: WasiHttpCtx,
1359 resource_table: ResourceTable,
1360}
1361
1362fn install_default_crypto_provider() {
1373 static ONCE: std::sync::Once = std::sync::Once::new();
1374 ONCE.call_once(|| {
1375 let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
1376 });
1377}
1378
1379impl ComponentState {
1380 pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
1381 install_default_crypto_provider();
1384 let wasi_ctx = policy
1385 .instantiate()
1386 .context("failed to build WASI context")?;
1387 Ok(Self {
1388 host,
1389 wasi_ctx,
1390 wasi_tls_ctx: WasiTlsCtxBuilder::new().build(),
1391 wasi_http_ctx: WasiHttpCtx::new(),
1392 resource_table: ResourceTable::new(),
1393 })
1394 }
1395
1396 fn host_mut(&mut self) -> &mut HostState {
1397 &mut self.host
1398 }
1399
1400 fn should_cancel_host(&mut self) -> bool {
1401 false
1402 }
1403
1404 fn yield_now_host(&mut self) {
1405 }
1407}
1408
1409impl component_api::v0_4::greentic::component::control::Host for ComponentState {
1410 fn should_cancel(&mut self) -> bool {
1411 self.should_cancel_host()
1412 }
1413
1414 fn yield_now(&mut self) {
1415 self.yield_now_host();
1416 }
1417}
1418
1419impl component_api::v0_5::greentic::component::control::Host for ComponentState {
1420 fn should_cancel(&mut self) -> bool {
1421 self.should_cancel_host()
1422 }
1423
1424 fn yield_now(&mut self) {
1425 self.yield_now_host();
1426 }
1427}
1428
1429fn add_component_control_instance(
1430 linker: &mut Linker<ComponentState>,
1431 name: &str,
1432) -> wasmtime::Result<()> {
1433 let mut inst = linker.instance(name)?;
1434 inst.func_wrap(
1435 "should-cancel",
1436 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
1437 let host = caller.data_mut();
1438 Ok((host.should_cancel_host(),))
1439 },
1440 )?;
1441 inst.func_wrap(
1442 "yield-now",
1443 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
1444 let host = caller.data_mut();
1445 host.yield_now_host();
1446 Ok(())
1447 },
1448 )?;
1449 Ok(())
1450}
1451
1452fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
1453 add_component_control_instance(linker, "greentic:component/control@0.5.0")?;
1454 add_component_control_instance(linker, "greentic:component/control@0.4.0")?;
1455 Ok(())
1456}
1457
1458pub fn register_identity_probe(linker: &mut Linker<ComponentState>) -> Result<()> {
1491 register_all(linker, false)
1494}
1495
1496#[cfg(test)]
1497mod register_identity_probe_tests {
1498 use super::*;
1499
1500 #[test]
1509 fn register_identity_probe_links_successfully() {
1510 let engine = wasmtime::Engine::default();
1511 let mut linker = Linker::<ComponentState>::new(&engine);
1512 register_identity_probe(&mut linker).expect("probe linker registers all imports");
1513 }
1514
1515 #[test]
1518 fn probe_wasi_policy_is_locked_down() {
1519 let policy = RunnerWasiPolicy::probe();
1520 assert!(!policy.inherit_stdio, "probe WASI must not inherit stdio");
1521 assert!(
1522 policy.preopens.is_empty(),
1523 "probe WASI must have no preopens"
1524 );
1525 assert!(
1526 policy.env_allow.is_empty(),
1527 "probe WASI must not allow env vars"
1528 );
1529 assert!(
1530 policy.env_set.is_empty(),
1531 "probe WASI must not set env vars"
1532 );
1533 }
1534}
1535
1536pub fn register_all(linker: &mut Linker<ComponentState>, allow_state_store: bool) -> Result<()> {
1537 add_wasi_to_linker(linker)?;
1538
1539 let mut opts = LinkOptions::default();
1541 opts.tls(true);
1542 wasmtime_wasi_tls::p2::add_to_linker(linker, &opts)?;
1543
1544 add_wasi_http_to_linker(linker)?;
1546
1547 add_all_v1_to_linker(
1548 linker,
1549 HostFns {
1550 http_client_v1_1: Some(|state: &mut ComponentState| state.host_mut()),
1551 http_client: Some(|state: &mut ComponentState| state.host_mut()),
1552 oauth_broker: None,
1553 runner_host_http: Some(|state: &mut ComponentState| state.host_mut()),
1554 runner_host_kv: Some(|state: &mut ComponentState| state.host_mut()),
1555 telemetry_logger: Some(|state: &mut ComponentState| state.host_mut()),
1556 state_store: allow_state_store.then_some(|state: &mut ComponentState| state.host_mut()),
1557 secrets_store_v1_1: Some(|state: &mut ComponentState| state.host_mut()),
1558 secrets_store: None,
1559 runtime_config: Some(|state: &mut ComponentState| state.host_mut()),
1560 },
1561 )?;
1562 add_http_client_client_world_aliases(linker)?;
1563 Ok(())
1564}
1565
1566fn add_http_client_client_world_aliases(linker: &mut Linker<ComponentState>) -> Result<()> {
1567 let mut inst_v1_1 = linker.instance("greentic:http/client@1.1.0")?;
1568 inst_v1_1.func_wrap(
1569 "send",
1570 move |mut caller: StoreContextMut<'_, ComponentState>,
1571 (req, opts, ctx): (
1572 http_client_client_alias::Request,
1573 Option<http_client_client_alias::RequestOptions>,
1574 Option<http_client_client_alias::TenantCtx>,
1575 )| {
1576 let host = caller.data_mut().host_mut();
1577 let result = HttpClientHostV1_1::send(
1578 host,
1579 alias_request_to_host(req),
1580 opts.map(alias_request_options_to_host),
1581 ctx.map(alias_tenant_ctx_to_host),
1582 );
1583 Ok((match result {
1584 Ok(resp) => Ok(alias_response_from_host(resp)),
1585 Err(err) => Err(alias_error_from_host(err)),
1586 },))
1587 },
1588 )?;
1589 let mut inst_v1_0 = linker.instance("greentic:http/client@1.0.0")?;
1590 inst_v1_0.func_wrap(
1591 "send",
1592 move |mut caller: StoreContextMut<'_, ComponentState>,
1593 (req, ctx): (
1594 host_http_client::Request,
1595 Option<host_http_client::TenantCtx>,
1596 )| {
1597 let host = caller.data_mut().host_mut();
1598 let result = HttpClientHost::send(host, req, ctx);
1599 Ok((result,))
1600 },
1601 )?;
1602 Ok(())
1603}
1604
1605fn alias_request_to_host(req: http_client_client_alias::Request) -> host_http_client::RequestV1_1 {
1606 host_http_client::RequestV1_1 {
1607 method: req.method,
1608 url: req.url,
1609 headers: req.headers,
1610 body: req.body,
1611 }
1612}
1613
1614fn alias_request_options_to_host(
1615 opts: http_client_client_alias::RequestOptions,
1616) -> host_http_client::RequestOptionsV1_1 {
1617 host_http_client::RequestOptionsV1_1 {
1618 timeout_ms: opts.timeout_ms,
1619 allow_insecure: opts.allow_insecure,
1620 follow_redirects: opts.follow_redirects,
1621 }
1622}
1623
1624fn alias_tenant_ctx_to_host(
1625 ctx: http_client_client_alias::TenantCtx,
1626) -> host_http_client::TenantCtxV1_1 {
1627 host_http_client::TenantCtxV1_1 {
1628 env: ctx.env,
1629 tenant: ctx.tenant,
1630 tenant_id: ctx.tenant_id,
1631 team: ctx.team,
1632 team_id: ctx.team_id,
1633 user: ctx.user,
1634 user_id: ctx.user_id,
1635 trace_id: ctx.trace_id,
1636 correlation_id: ctx.correlation_id,
1637 i18n_id: ctx.i18n_id,
1638 attributes: ctx.attributes,
1639 session_id: ctx.session_id,
1640 flow_id: ctx.flow_id,
1641 node_id: ctx.node_id,
1642 provider_id: ctx.provider_id,
1643 deadline_ms: ctx.deadline_ms,
1644 attempt: ctx.attempt,
1645 idempotency_key: ctx.idempotency_key,
1646 impersonation: ctx.impersonation.map(|imp| http_types_v1_1::Impersonation {
1647 actor_id: imp.actor_id,
1648 reason: imp.reason,
1649 }),
1650 }
1651}
1652
1653fn alias_response_from_host(
1654 resp: host_http_client::ResponseV1_1,
1655) -> http_client_client_alias::Response {
1656 http_client_client_alias::Response {
1657 status: resp.status,
1658 headers: resp.headers,
1659 body: resp.body,
1660 }
1661}
1662
1663fn alias_error_from_host(
1664 err: host_http_client::HttpClientErrorV1_1,
1665) -> http_client_client_alias::HostError {
1666 http_client_client_alias::HostError {
1667 code: err.code,
1668 message: err.message,
1669 }
1670}
1671
1672impl OAuthHostContext for ComponentState {
1673 fn tenant_id(&self) -> &str {
1674 &self.host.config.tenant
1675 }
1676
1677 fn env(&self) -> &str {
1678 &self.host.default_env
1679 }
1680
1681 fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
1682 &mut self.host.oauth_host
1683 }
1684
1685 fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
1686 self.host.oauth_config.as_ref()
1687 }
1688}
1689
1690impl WasiView for ComponentState {
1691 fn ctx(&mut self) -> WasiCtxView<'_> {
1692 WasiCtxView {
1693 ctx: &mut self.wasi_ctx,
1694 table: &mut self.resource_table,
1695 }
1696 }
1697}
1698
1699impl WasiHttpView for ComponentState {
1700 fn http(&mut self) -> WasiHttpCtxView<'_> {
1701 WasiHttpCtxView {
1702 ctx: &mut self.wasi_http_ctx,
1703 table: &mut self.resource_table,
1704 hooks: Default::default(),
1705 }
1706 }
1707}
1708
1709impl WasiTlsView for ComponentState {
1710 fn tls(&mut self) -> WasiTlsCtxView<'_> {
1711 WasiTlsCtxView {
1712 ctx: &mut self.wasi_tls_ctx,
1713 table: &mut self.resource_table,
1714 }
1715 }
1716}
1717
1718#[allow(unsafe_code)]
1719unsafe impl Send for ComponentState {}
1720#[allow(unsafe_code)]
1721unsafe impl Sync for ComponentState {}
1722
1723impl PackRuntime {
1724 fn allows_state_store(&self, component_ref: &str) -> bool {
1725 if self.state_store.is_none() {
1726 return false;
1727 }
1728 if !self.config.state_store_policy.allow {
1729 return false;
1730 }
1731 let Some(manifest) = self.component_manifests.get(component_ref) else {
1732 return true;
1734 };
1735 manifest
1740 .capabilities
1741 .host
1742 .state
1743 .as_ref()
1744 .map(|caps| caps.read || caps.write)
1745 .unwrap_or(true)
1746 }
1747
1748 pub fn contains_component(&self, component_ref: &str) -> bool {
1749 self.components.contains_key(component_ref)
1750 }
1751
1752 pub fn state_store_handle(&self) -> Option<crate::storage::DynStateStore> {
1757 self.state_store.clone()
1758 }
1759
1760 #[allow(clippy::too_many_arguments)]
1761 pub async fn load(
1762 path: impl AsRef<Path>,
1763 config: Arc<HostConfig>,
1764 mocks: Option<Arc<MockLayer>>,
1765 archive_source: Option<&Path>,
1766 session_store: Option<DynSessionStore>,
1767 state_store: Option<DynStateStore>,
1768 wasi_policy: Arc<RunnerWasiPolicy>,
1769 secrets: DynSecretsManager,
1770 oauth_config: Option<OAuthBrokerConfig>,
1771 verify_archive: bool,
1772 component_resolution: ComponentResolution,
1773 ) -> Result<Self> {
1774 let path = path.as_ref();
1775 let (_pack_root, safe_path) = normalize_pack_path(path)?;
1776 let path_meta = std::fs::metadata(&safe_path).ok();
1777 let is_dir = path_meta
1778 .as_ref()
1779 .map(|meta| meta.is_dir())
1780 .unwrap_or(false);
1781 let is_component = !is_dir
1782 && safe_path
1783 .extension()
1784 .and_then(|ext| ext.to_str())
1785 .map(|ext| ext.eq_ignore_ascii_case("wasm"))
1786 .unwrap_or(false);
1787 let archive_hint_path = if let Some(source) = archive_source {
1788 let (_, normalized) = normalize_pack_path(source)?;
1789 Some(normalized)
1790 } else if is_component || is_dir {
1791 None
1792 } else {
1793 Some(safe_path.clone())
1794 };
1795 let archive_hint = archive_hint_path.as_deref();
1796 if verify_archive {
1797 if let Some(verify_target) = archive_hint.and_then(|p| {
1798 std::fs::metadata(p)
1799 .ok()
1800 .filter(|meta| meta.is_file())
1801 .map(|_| p)
1802 }) {
1803 verify::verify_pack(verify_target).await?;
1804 tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
1805 } else {
1806 tracing::debug!("skipping archive verification (no archive source)");
1807 }
1808 }
1809 let engine = Engine::default();
1810 let engine_profile =
1811 EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
1812 let cache = CacheManager::new(CacheConfig::default(), engine_profile);
1813 let mut metadata = PackMetadata::fallback(&safe_path);
1814 let mut manifest = None;
1815 let mut legacy_manifest: Option<Box<legacy_pack::PackManifest>> = None;
1816 let mut flows = None;
1817 let materialized_root = component_resolution.materialized_root.clone().or_else(|| {
1818 if is_dir {
1819 Some(safe_path.clone())
1820 } else {
1821 None
1822 }
1823 });
1824 let (pack_assets_dir, assets_tempdir) =
1825 locate_pack_assets(materialized_root.as_deref(), archive_hint)?;
1826 let setup_yaml_exists = pack_assets_dir
1827 .as_ref()
1828 .map(|dir| dir.join("setup.yaml").is_file())
1829 .unwrap_or(false);
1830 tracing::info!(
1831 pack_root = %safe_path.display(),
1832 assets_setup_yaml_exists = setup_yaml_exists,
1833 "pack unpack metadata"
1834 );
1835
1836 if let Some(root) = materialized_root.as_ref() {
1837 match load_manifest_and_flows_from_dir(root) {
1838 Ok(ManifestLoad::New {
1839 manifest: m,
1840 flows: cache,
1841 }) => {
1842 metadata = cache.metadata.clone();
1843 manifest = Some(*m);
1844 flows = Some(cache);
1845 }
1846 Ok(ManifestLoad::Legacy {
1847 manifest: m,
1848 flows: cache,
1849 }) => {
1850 metadata = cache.metadata.clone();
1851 legacy_manifest = Some(m);
1852 flows = Some(cache);
1853 }
1854 Err(err) => {
1855 warn!(error = %err, pack = %root.display(), "failed to parse materialized pack manifest");
1856 }
1857 }
1858 }
1859
1860 if manifest.is_none()
1861 && legacy_manifest.is_none()
1862 && let Some(archive_path) = archive_hint
1863 {
1864 let manifest_load = load_manifest_and_flows(archive_path).with_context(|| {
1865 format!(
1866 "failed to load manifest.cbor from {}",
1867 archive_path.display()
1868 )
1869 })?;
1870 match manifest_load {
1871 ManifestLoad::New {
1872 manifest: m,
1873 flows: cache,
1874 } => {
1875 metadata = cache.metadata.clone();
1876 manifest = Some(*m);
1877 flows = Some(cache);
1878 }
1879 ManifestLoad::Legacy {
1880 manifest: m,
1881 flows: cache,
1882 } => {
1883 metadata = cache.metadata.clone();
1884 legacy_manifest = Some(m);
1885 flows = Some(cache);
1886 }
1887 }
1888 }
1889 #[cfg(feature = "fault-injection")]
1890 {
1891 let fault_ctx = FaultContext {
1892 pack_id: metadata.pack_id.as_str(),
1893 flow_id: "unknown",
1894 node_id: None,
1895 attempt: 1,
1896 };
1897 maybe_fail(FaultPoint::PackResolve, fault_ctx)
1898 .map_err(|err| anyhow!(err.to_string()))?;
1899 }
1900 let mut pack_lock = None;
1901 for root in find_pack_lock_roots(&safe_path, is_dir, archive_hint) {
1902 pack_lock = load_pack_lock(&root)?;
1903 if pack_lock.is_some() {
1904 break;
1905 }
1906 }
1907 let component_sources_payload = if pack_lock.is_none() {
1908 if let Some(manifest) = manifest.as_ref() {
1909 manifest
1910 .get_component_sources_v1()
1911 .context("invalid component sources extension")?
1912 } else {
1913 None
1914 }
1915 } else {
1916 None
1917 };
1918 let component_sources = if let Some(lock) = pack_lock.as_ref() {
1919 Some(component_sources_table_from_pack_lock(
1920 lock,
1921 component_resolution.allow_missing_hash,
1922 )?)
1923 } else {
1924 component_sources_table(component_sources_payload.as_ref())?
1925 };
1926 let components = if is_component {
1927 let wasm_bytes = fs::read(&safe_path).await?;
1928 metadata = PackMetadata::from_wasm(&wasm_bytes)
1929 .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
1930 let name = safe_path
1931 .file_stem()
1932 .map(|s| s.to_string_lossy().to_string())
1933 .unwrap_or_else(|| "component".to_string());
1934 let component = compile_component_with_cache(&cache, &engine, None, wasm_bytes).await?;
1935 let mut map = HashMap::new();
1936 map.insert(
1937 name.clone(),
1938 PackComponent {
1939 name,
1940 version: metadata.version.clone(),
1941 component,
1942 },
1943 );
1944 map
1945 } else {
1946 let specs = component_specs(
1947 manifest.as_ref(),
1948 legacy_manifest.as_deref(),
1949 component_sources_payload.as_ref(),
1950 pack_lock.as_ref(),
1951 );
1952 if specs.is_empty() {
1953 HashMap::new()
1954 } else {
1955 let mut loaded = HashMap::new();
1956 let mut missing: HashSet<String> =
1957 specs.iter().map(|spec| spec.id.clone()).collect();
1958 let mut searched = Vec::new();
1959
1960 if !component_resolution.overrides.is_empty() {
1961 load_components_from_overrides(
1962 &cache,
1963 &engine,
1964 &component_resolution.overrides,
1965 &specs,
1966 &mut missing,
1967 &mut loaded,
1968 )
1969 .await?;
1970 searched.push("override map".to_string());
1971 }
1972
1973 if let Some(component_sources) = component_sources.as_ref() {
1974 load_components_from_sources(
1975 &cache,
1976 &engine,
1977 component_sources,
1978 &component_resolution,
1979 &specs,
1980 &mut missing,
1981 &mut loaded,
1982 materialized_root.as_deref(),
1983 archive_hint,
1984 )
1985 .await?;
1986 searched.push(format!("extension {}", EXT_COMPONENT_SOURCES_V1));
1987 }
1988
1989 if let Some(root) = materialized_root.as_ref() {
1990 load_components_from_dir(
1991 &cache,
1992 &engine,
1993 root,
1994 &specs,
1995 &mut missing,
1996 &mut loaded,
1997 )
1998 .await?;
1999 searched.push(format!("components dir {}", root.display()));
2000 }
2001
2002 if let Some(archive_path) = archive_hint {
2003 load_components_from_archive(
2004 &cache,
2005 &engine,
2006 archive_path,
2007 &specs,
2008 &mut missing,
2009 &mut loaded,
2010 )
2011 .await?;
2012 searched.push(format!("archive {}", archive_path.display()));
2013 }
2014
2015 if !missing.is_empty() {
2016 let missing_list = missing.into_iter().collect::<Vec<_>>().join(", ");
2017 let sources = if searched.is_empty() {
2018 "no component sources".to_string()
2019 } else {
2020 searched.join(", ")
2021 };
2022 bail!(
2023 "components missing: {}; looked in {}",
2024 missing_list,
2025 sources
2026 );
2027 }
2028
2029 loaded
2030 }
2031 };
2032 let http_client = Arc::clone(&HTTP_CLIENT);
2033 let mut component_manifests = HashMap::new();
2034 if let Some(manifest) = manifest.as_ref() {
2035 for component in &manifest.components {
2036 component_manifests.insert(component.id.as_str().to_string(), component.clone());
2037 }
2038 }
2039 let mut pack_policy = (*wasi_policy).clone();
2040 if let Some(dir) = pack_assets_dir {
2041 tracing::debug!(path = %dir.display(), "preopening pack assets directory for WASI /assets");
2042 pack_policy =
2043 pack_policy.with_preopen(PreopenSpec::new(dir, "/assets").read_only(true));
2044 }
2045 let wasi_policy = Arc::new(pack_policy);
2046 Ok(Self {
2047 path: safe_path,
2048 archive_path: archive_hint.map(Path::to_path_buf),
2049 config,
2050 engine,
2051 metadata,
2052 manifest,
2053 legacy_manifest,
2054 component_manifests,
2055 mocks,
2056 flows,
2057 components,
2058 http_client,
2059 pre_cache: Mutex::new(HashMap::new()),
2060 session_store,
2061 state_store,
2062 wasi_policy,
2063 assets_tempdir,
2064 provider_registry: RwLock::new(None),
2065 identify_hint_cache: RwLock::new(HashMap::new()),
2066 secrets,
2067 oauth_config,
2068 cache,
2069 runtime_config_non_secret: None,
2070 runtime_refs: None,
2071 })
2072 }
2073
2074 pub fn set_runtime_config_non_secret(&mut self, map: Option<Arc<BTreeMap<String, Value>>>) {
2078 self.runtime_config_non_secret = map;
2079 }
2080
2081 pub fn runtime_config_non_secret(&self) -> Option<&Arc<BTreeMap<String, Value>>> {
2084 self.runtime_config_non_secret.as_ref()
2085 }
2086
2087 pub fn set_runtime_refs(&mut self, injection: Option<RuntimeRefsInjection>) {
2092 self.runtime_refs = injection;
2093 }
2094
2095 pub fn runtime_refs(&self) -> Option<&RuntimeRefsInjection> {
2098 self.runtime_refs.as_ref()
2099 }
2100
2101 pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
2102 if let Some(cache) = &self.flows {
2103 return Ok(cache.descriptors.clone());
2104 }
2105 if let Some(manifest) = &self.manifest {
2106 let descriptors = manifest
2107 .flows
2108 .iter()
2109 .map(|flow| FlowDescriptor {
2110 id: flow.id.as_str().to_string(),
2111 flow_type: flow_kind_to_str(flow.kind).to_string(),
2112 pack_id: manifest.pack_id.as_str().to_string(),
2113 profile: manifest.pack_id.as_str().to_string(),
2114 version: manifest.version.to_string(),
2115 description: None,
2116 })
2117 .collect();
2118 return Ok(descriptors);
2119 }
2120 Ok(Vec::new())
2121 }
2122
2123 #[allow(dead_code)]
2124 pub async fn run_flow(
2125 &self,
2126 flow_id: &str,
2127 input: serde_json::Value,
2128 ) -> Result<serde_json::Value> {
2129 let pack = Arc::new(
2130 PackRuntime::load(
2131 &self.path,
2132 Arc::clone(&self.config),
2133 self.mocks.clone(),
2134 self.archive_path.as_deref(),
2135 self.session_store.clone(),
2136 self.state_store.clone(),
2137 Arc::clone(&self.wasi_policy),
2138 self.secrets.clone(),
2139 self.oauth_config.clone(),
2140 false,
2141 ComponentResolution::default(),
2142 )
2143 .await?,
2144 );
2145
2146 let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&self.config)).await?;
2147 let retry_config = self.config.retry_config().into();
2148 let mocks = pack.mocks.as_deref();
2149 let tenant = self.config.tenant.as_str();
2150
2151 let ctx = FlowContext {
2152 tenant,
2153 pack_id: pack.metadata().pack_id.as_str(),
2154 flow_id,
2155 node_id: None,
2156 tool: None,
2157 action: None,
2158 session_id: None,
2159 provider_id: None,
2160 retry_config,
2161 attempt: 1,
2162 observer: None,
2163 mocks,
2164 };
2165
2166 let execution = engine.execute(ctx, input).await?;
2167 match execution.status {
2168 FlowStatus::Completed => Ok(execution.output),
2169 FlowStatus::Waiting(wait) => Ok(serde_json::json!({
2170 "status": "pending",
2171 "reason": wait.reason,
2172 "resume": wait.snapshot,
2173 "response": execution.output,
2174 })),
2175 }
2176 }
2177
2178 pub async fn invoke_component(
2179 &self,
2180 component_ref: &str,
2181 ctx: ComponentExecCtx,
2182 operation: &str,
2183 config_json: Option<String>,
2184 input_json: String,
2185 ) -> Result<Value> {
2186 let pack_component = self
2187 .components
2188 .get(component_ref)
2189 .with_context(|| format!("component '{component_ref}' not found in pack"))?;
2190 let engine = self.engine.clone();
2191 let config = Arc::clone(&self.config);
2192 let http_client = Arc::clone(&self.http_client);
2193 let mocks = self.mocks.clone();
2194 let session_store = self.session_store.clone();
2195 let state_store = self.state_store.clone();
2196 let secrets = Arc::clone(&self.secrets);
2197 let oauth_config = self.oauth_config.clone();
2198 let wasi_policy = Arc::clone(&self.wasi_policy);
2199 let pack_id = self.metadata().pack_id.clone();
2200 let allow_state_store = self.allows_state_store(component_ref);
2201 let component = pack_component.component.clone();
2202 let component_ref_owned = component_ref.to_string();
2203 let operation_owned = operation.to_string();
2204 let input_owned =
2205 Self::merge_component_config_into_input_json(config_json.as_deref(), &input_json)
2206 .context("merge component config into invocation payload")?;
2207 let ctx_owned = ctx;
2208 let runtime_config_non_secret = self.runtime_config_non_secret.clone();
2209 let runtime_refs = self.runtime_refs.clone();
2210
2211 run_on_wasi_thread("component.invoke", move || {
2212 let mut linker = Linker::new(&engine);
2213 register_all(&mut linker, allow_state_store)?;
2214 add_component_control_to_linker(&mut linker)?;
2215
2216 let host_state = HostState::new(
2217 pack_id.clone(),
2218 config,
2219 http_client,
2220 mocks,
2221 session_store,
2222 state_store,
2223 secrets,
2224 oauth_config,
2225 Some(ctx_owned.clone()),
2226 Some(component_ref_owned.clone()),
2227 false,
2228 runtime_config_non_secret,
2229 runtime_refs,
2230 )?;
2231 let store_state = ComponentState::new(host_state, wasi_policy)?;
2232 let mut store = wasmtime::Store::new(&engine, store_state);
2233
2234 let invoke_result = HostState::instantiate_component_result(
2235 &mut linker,
2236 &mut store,
2237 &component,
2238 &ctx_owned,
2239 &component_ref_owned,
2240 &operation_owned,
2241 &input_owned,
2242 )?;
2243 HostState::convert_invoke_result(invoke_result)
2244 })
2245 }
2246
2247 fn merge_component_config_into_input_json(
2248 config_json: Option<&str>,
2249 input_json: &str,
2250 ) -> Result<String> {
2251 let Some(config_json) = config_json else {
2252 return Ok(input_json.to_string());
2253 };
2254
2255 let config_value: Value =
2256 serde_json::from_str(config_json).context("parse component config JSON")?;
2257
2258 if let Ok(mut invocation) =
2259 serde_json::from_str::<greentic_types::InvocationEnvelope>(input_json)
2260 {
2261 let payload_value = serde_json::from_slice(&invocation.payload).unwrap_or_else(|_| {
2262 Value::String(String::from_utf8_lossy(&invocation.payload).into_owned())
2263 });
2264 invocation.payload = serde_json::to_vec(&serde_json::json!({
2265 "config": config_value,
2266 "input": payload_value,
2267 }))
2268 .context("serialize merged invocation payload")?;
2269 return serde_json::to_string(&invocation)
2270 .context("serialize merged invocation envelope");
2271 }
2272
2273 let input_value = serde_json::from_str(input_json)
2274 .unwrap_or_else(|_| Value::String(input_json.to_string()));
2275 serde_json::to_string(&serde_json::json!({
2276 "config": config_value,
2277 "input": input_value,
2278 }))
2279 .context("serialize merged component input")
2280 }
2281
2282 pub fn resolve_provider(
2283 &self,
2284 provider_id: Option<&str>,
2285 provider_type: Option<&str>,
2286 ) -> Result<ProviderBinding> {
2287 let registry = self.provider_registry()?;
2288 registry.resolve(provider_id, provider_type)
2289 }
2290
2291 pub async fn invoke_provider(
2292 &self,
2293 binding: &ProviderBinding,
2294 ctx: ComponentExecCtx,
2295 op: &str,
2296 input_json: Vec<u8>,
2297 ) -> Result<Value> {
2298 let component_ref_owned = binding.component_ref.clone();
2299 let pack_component = self.components.get(&component_ref_owned).with_context(|| {
2300 format!("provider component '{component_ref_owned}' not found in pack")
2301 })?;
2302 let component = pack_component.component.clone();
2303
2304 let engine = self.engine.clone();
2305 let config = Arc::clone(&self.config);
2306 let http_client = Arc::clone(&self.http_client);
2307 let mocks = self.mocks.clone();
2308 let session_store = self.session_store.clone();
2309 let state_store = self.state_store.clone();
2310 let secrets = Arc::clone(&self.secrets);
2311 let oauth_config = self.oauth_config.clone();
2312 let wasi_policy = Arc::clone(&self.wasi_policy);
2313 let pack_id = self.metadata().pack_id.clone();
2314 let allow_state_store = self.allows_state_store(&component_ref_owned);
2315 let input_owned = input_json;
2316 let op_owned = op.to_string();
2317 let ctx_owned = ctx;
2318 let world = binding.world.clone();
2319 let runtime_config_non_secret = self.runtime_config_non_secret.clone();
2320 let runtime_refs = self.runtime_refs.clone();
2321
2322 run_on_wasi_thread("provider.invoke", move || {
2323 let mut linker = Linker::new(&engine);
2324 register_all(&mut linker, allow_state_store)?;
2325 add_component_control_to_linker(&mut linker)?;
2326 let host_state = HostState::new(
2327 pack_id.clone(),
2328 config,
2329 http_client,
2330 mocks,
2331 session_store,
2332 state_store,
2333 secrets,
2334 oauth_config,
2335 Some(ctx_owned.clone()),
2336 Some(component_ref_owned.clone()),
2337 true,
2338 runtime_config_non_secret,
2339 runtime_refs,
2340 )?;
2341 let store_state = ComponentState::new(host_state, wasi_policy)?;
2342 let mut store = wasmtime::Store::new(&engine, store_state);
2343 let use_schema_core_schema = world.contains("provider-schema-core");
2344 let use_schema_core_path = world.contains("provider/schema-core");
2345 let result = if use_schema_core_schema {
2346 let pre_instance = linker.instantiate_pre(component.as_ref())?;
2347 let pre: SchemaSchemaCorePre<ComponentState> =
2348 SchemaSchemaCorePre::new(pre_instance)?;
2349 let bindings = block_on(async { pre.instantiate_async(&mut store).await })?;
2350 let provider = bindings.greentic_provider_schema_core_schema_core_api();
2351 provider.call_invoke(&mut store, &op_owned, &input_owned)?
2352 } else if use_schema_core_path {
2353 let pre_instance = linker.instantiate_pre(component.as_ref())?;
2354 let path_attempt = (|| -> Result<Vec<u8>> {
2355 let pre: PathSchemaCorePre<ComponentState> =
2356 PathSchemaCorePre::new(pre_instance)?;
2357 let bindings = block_on(async { pre.instantiate_async(&mut store).await })?;
2358 let provider = bindings.greentic_provider_schema_core_api();
2359 Ok(provider.call_invoke(&mut store, &op_owned, &input_owned)?)
2360 })();
2361 match path_attempt {
2362 Ok(value) => value,
2363 Err(path_err)
2364 if path_err.to_string().contains("no exported instance named") =>
2365 {
2366 let pre_instance = linker.instantiate_pre(component.as_ref())?;
2367 let pre: SchemaSchemaCorePre<ComponentState> =
2368 SchemaSchemaCorePre::new(pre_instance)?;
2369 let bindings = block_on(async { pre.instantiate_async(&mut store).await })?;
2370 let provider = bindings.greentic_provider_schema_core_schema_core_api();
2371 provider.call_invoke(&mut store, &op_owned, &input_owned)?
2372 }
2373 Err(path_err) => return Err(path_err),
2374 }
2375 } else {
2376 let pre_instance = linker.instantiate_pre(component.as_ref())?;
2377 let pre: LegacySchemaCorePre<ComponentState> =
2378 LegacySchemaCorePre::new(pre_instance)?;
2379 let bindings = block_on(async { pre.instantiate_async(&mut store).await })?;
2380 let provider = bindings.greentic_provider_core_schema_core_api();
2381 provider.call_invoke(&mut store, &op_owned, &input_owned)?
2382 };
2383 deserialize_json_bytes(result)
2384 })
2385 }
2386
2387 pub async fn invoke_identify_instance(
2414 &self,
2415 binding: &ProviderBinding,
2416 payload: Vec<u8>,
2417 ) -> Result<IdentifyOutcome> {
2418 let component_ref_owned = binding.component_ref.clone();
2419 let pack_component = self.components.get(&component_ref_owned).with_context(|| {
2420 format!("provider component '{component_ref_owned}' not found in pack")
2421 })?;
2422 let component = pack_component.component.clone();
2423
2424 let engine = self.engine.clone();
2425 let config = Arc::clone(&self.config);
2426 let http_client = Arc::clone(&self.http_client);
2427 let mocks = self.mocks.clone();
2428 let session_store = self.session_store.clone();
2429 let state_store = self.state_store.clone();
2430 let secrets = Arc::clone(&self.secrets);
2431 let oauth_config = self.oauth_config.clone();
2432 let pack_id = self.metadata().pack_id.clone();
2433
2434 let wasi_policy = Arc::new(RunnerWasiPolicy::probe());
2439 let runtime_config_non_secret = self.runtime_config_non_secret.clone();
2440 let runtime_refs = self.runtime_refs.clone();
2441 run_on_wasi_thread("provider.identify_instance", move || {
2442 let mut linker = Linker::new(&engine);
2443 register_identity_probe(&mut linker)?;
2444 let host_state = HostState::new(
2445 pack_id.clone(),
2446 config,
2447 http_client,
2448 mocks,
2449 session_store,
2450 state_store,
2451 secrets,
2452 oauth_config,
2453 None,
2454 Some(component_ref_owned.clone()),
2455 true,
2456 runtime_config_non_secret,
2457 runtime_refs,
2458 )?;
2459 let store_state = ComponentState::new(host_state, wasi_policy)?;
2460 let mut store = wasmtime::Store::new(&engine, store_state);
2461
2462 let pre_instance = linker.instantiate_pre(component.as_ref())?;
2463 let pre = match InstanceIdentityPre::<ComponentState>::new(pre_instance) {
2464 Ok(pre) => pre,
2465 Err(err) if is_missing_export_error(&format!("{err:#}")) => {
2466 return Ok(IdentifyOutcome::Unsupported);
2467 }
2468 Err(err) => return Err(err.into()),
2469 };
2470 let bindings = block_on(async { pre.instantiate_async(&mut store).await })?;
2471 let api = bindings.greentic_provider_instance_identity_instance_identity_api();
2472 let result = api.call_identify_instance(&mut store, &payload)?;
2473 Ok(match result {
2474 Some(id) => IdentifyOutcome::Identified(id),
2475 None => IdentifyOutcome::NoMatch,
2476 })
2477 })
2478 }
2479
2480 pub async fn invoke_describe_identify_instance(
2496 &self,
2497 binding: &ProviderBinding,
2498 ) -> Result<Option<IdentifyInstanceHint>> {
2499 let component_ref_owned = binding.component_ref.clone();
2500 let pack_component = self.components.get(&component_ref_owned).with_context(|| {
2501 format!("provider component '{component_ref_owned}' not found in pack")
2502 })?;
2503 let component = pack_component.component.clone();
2504
2505 let engine = self.engine.clone();
2506 let config = Arc::clone(&self.config);
2507 let http_client = Arc::clone(&self.http_client);
2508 let mocks = self.mocks.clone();
2509 let session_store = self.session_store.clone();
2510 let state_store = self.state_store.clone();
2511 let secrets = Arc::clone(&self.secrets);
2512 let oauth_config = self.oauth_config.clone();
2513 let pack_id = self.metadata().pack_id.clone();
2514
2515 let wasi_policy = Arc::new(RunnerWasiPolicy::probe());
2518 let runtime_config_non_secret = self.runtime_config_non_secret.clone();
2519 let runtime_refs = self.runtime_refs.clone();
2520 run_on_wasi_thread("provider.describe_identify_instance", move || {
2521 let mut linker = Linker::new(&engine);
2522 register_identity_probe(&mut linker)?;
2523 let host_state = HostState::new(
2524 pack_id.clone(),
2525 config,
2526 http_client,
2527 mocks,
2528 session_store,
2529 state_store,
2530 secrets,
2531 oauth_config,
2532 None,
2533 Some(component_ref_owned.clone()),
2534 true,
2535 runtime_config_non_secret,
2536 runtime_refs,
2537 )?;
2538 let store_state = ComponentState::new(host_state, wasi_policy)?;
2539 let mut store = wasmtime::Store::new(&engine, store_state);
2540
2541 let pre_instance = linker.instantiate_pre(component.as_ref())?;
2542 let pre = match InstanceIdentityDescribePre::<ComponentState>::new(pre_instance) {
2543 Ok(pre) => pre,
2544 Err(err) if is_missing_export_error(&format!("{err:#}")) => {
2545 return Ok(None);
2546 }
2547 Err(err) => return Err(err.into()),
2548 };
2549 let bindings = block_on(async { pre.instantiate_async(&mut store).await })?;
2550 let api = bindings.greentic_provider_instance_identity_instance_identity_describe_api();
2551 let raw = api.call_describe_identify_instance(&mut store)?;
2552 let Some(bytes) = raw else {
2553 return Ok(None);
2557 };
2558 match IdentifyInstanceHint::from_json(&bytes) {
2559 Ok(hint) => Ok(Some(hint)),
2560 Err(err) => {
2561 warn!(
2566 event = "provider.describe_identify_instance.malformed",
2567 component_ref = %component_ref_owned,
2568 error = %err,
2569 "ignoring malformed describe-identify-instance hint; \
2570 falling back to unhinted wrapper"
2571 );
2572 Ok(None)
2573 }
2574 }
2575 })
2576 }
2577
2578 pub async fn resolve_identify_hint(
2594 &self,
2595 binding: &ProviderBinding,
2596 ) -> Option<IdentifyInstanceHint> {
2597 if let Some(cached) = self.identify_hint_cache.read().get(&binding.component_ref) {
2598 return cached.clone();
2599 }
2600 let hint = match self.invoke_describe_identify_instance(binding).await {
2601 Ok(hint) => hint,
2602 Err(err) => {
2603 warn!(
2604 event = "provider.describe_identify_instance.failed",
2605 component_ref = %binding.component_ref,
2606 error = %err,
2607 "describe-identify-instance probe failed; \
2608 falling back to unhinted wrapper for this component"
2609 );
2610 None
2611 }
2612 };
2613 self.identify_hint_cache
2618 .write()
2619 .insert(binding.component_ref.clone(), hint.clone());
2620 hint
2621 }
2622
2623 pub async fn describe_identify_hints_by_provider_type(
2646 &self,
2647 provider_types: &[&str],
2648 ) -> Result<HashMap<String, Option<IdentifyInstanceHint>>> {
2649 let mut out = HashMap::with_capacity(provider_types.len());
2650 let registry = match self.provider_registry_optional()? {
2651 Some(registry) => registry,
2652 None => {
2653 for ty in provider_types {
2654 out.insert((*ty).to_string(), None);
2655 }
2656 return Ok(out);
2657 }
2658 };
2659 for ty in provider_types {
2660 let Some(binding) = registry.try_resolve(None, Some(ty))? else {
2661 out.insert((*ty).to_string(), None);
2662 continue;
2663 };
2664 let hint = self.resolve_identify_hint(&binding).await;
2665 out.insert((*ty).to_string(), hint);
2666 }
2667 Ok(out)
2668 }
2669
2670 pub async fn identify_endpoints_by_provider_type(
2683 &self,
2684 provider_types: &[&str],
2685 payload: &[u8],
2686 ) -> Result<HashMap<String, IdentifyOutcome>> {
2687 let mut out = HashMap::with_capacity(provider_types.len());
2688 let registry = match self.provider_registry_optional()? {
2689 Some(registry) => registry,
2690 None => {
2691 for ty in provider_types {
2692 out.insert((*ty).to_string(), IdentifyOutcome::Unsupported);
2693 }
2694 return Ok(out);
2695 }
2696 };
2697 for ty in provider_types {
2698 let Some(binding) = registry.try_resolve(None, Some(ty))? else {
2699 out.insert((*ty).to_string(), IdentifyOutcome::Unsupported);
2700 continue;
2701 };
2702 let outcome = self
2703 .invoke_identify_instance(&binding, payload.to_vec())
2704 .await?;
2705 out.insert((*ty).to_string(), outcome);
2706 }
2707 Ok(out)
2708 }
2709
2710 pub async fn identify_endpoints_by_provider_type_scoped(
2727 &self,
2728 provider_types: &[&str],
2729 headers: &[(String, String)],
2730 body: &Value,
2731 ) -> Result<HashMap<String, IdentifyOutcome>> {
2732 let mut out = HashMap::with_capacity(provider_types.len());
2733 let registry = match self.provider_registry_optional()? {
2734 Some(registry) => registry,
2735 None => {
2736 for ty in provider_types {
2737 out.insert((*ty).to_string(), IdentifyOutcome::Unsupported);
2738 }
2739 return Ok(out);
2740 }
2741 };
2742 for ty in provider_types {
2743 let Some(binding) = registry.try_resolve(None, Some(ty))? else {
2744 out.insert((*ty).to_string(), IdentifyOutcome::Unsupported);
2745 continue;
2746 };
2747 let hint = self.resolve_identify_hint(&binding).await;
2748 let payload = build_scoped_identify_payload(headers, body, hint.as_ref());
2749 let outcome = self.invoke_identify_instance(&binding, payload).await?;
2750 out.insert((*ty).to_string(), outcome);
2751 }
2752 Ok(out)
2753 }
2754
2755 pub(crate) fn provider_registry(&self) -> Result<ProviderRegistry> {
2756 if let Some(registry) = self.provider_registry.read().clone() {
2757 return Ok(registry);
2758 }
2759 let manifest = self
2760 .manifest
2761 .as_ref()
2762 .context("pack manifest required for provider resolution")?;
2763 let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
2764 let registry = ProviderRegistry::new(
2765 manifest,
2766 self.state_store.clone(),
2767 &self.config.tenant,
2768 &env,
2769 )?;
2770 *self.provider_registry.write() = Some(registry.clone());
2771 Ok(registry)
2772 }
2773
2774 pub(crate) fn provider_registry_optional(&self) -> Result<Option<ProviderRegistry>> {
2775 if self.manifest.is_none() {
2776 return Ok(None);
2777 }
2778 Ok(Some(self.provider_registry()?))
2779 }
2780
2781 pub fn load_flow(&self, flow_id: &str) -> Result<Flow> {
2782 if let Some(cache) = &self.flows {
2783 return cache
2784 .flows
2785 .get(flow_id)
2786 .cloned()
2787 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
2788 }
2789 if let Some(manifest) = &self.manifest {
2790 let entry = manifest
2791 .flows
2792 .iter()
2793 .find(|f| f.id.as_str() == flow_id)
2794 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
2795 return Ok(entry.flow.clone());
2796 }
2797 bail!("flow '{flow_id}' not available (pack exports disabled)")
2798 }
2799
2800 pub fn metadata(&self) -> &PackMetadata {
2801 &self.metadata
2802 }
2803
2804 pub fn read_asset(&self, asset_path: &str) -> Result<Vec<u8>> {
2809 let normalized = asset_path
2810 .trim_start_matches("assets/")
2811 .trim_start_matches("/assets/");
2812 if let Some(tempdir) = &self.assets_tempdir {
2814 let full = tempdir.path().join("assets").join(normalized);
2815 if full.exists() {
2816 return std::fs::read(&full)
2817 .with_context(|| format!("read asset {}", full.display()));
2818 }
2819 }
2820 let full = self.path.join("assets").join(normalized);
2822 if full.exists() {
2823 return std::fs::read(&full).with_context(|| format!("read asset {}", full.display()));
2824 }
2825 bail!("asset not found: {}", asset_path)
2826 }
2827
2828 pub fn component_manifest(&self, component_ref: &str) -> Option<&ComponentManifest> {
2829 self.component_manifests.get(component_ref)
2830 }
2831
2832 pub fn describe_component_contract_v0_6(&self, component_ref: &str) -> Result<Option<Value>> {
2833 let pack_component = self
2834 .components
2835 .get(component_ref)
2836 .with_context(|| format!("component '{component_ref}' not found in pack"))?;
2837 let engine = self.engine.clone();
2838 let config = Arc::clone(&self.config);
2839 let http_client = Arc::clone(&self.http_client);
2840 let mocks = self.mocks.clone();
2841 let session_store = self.session_store.clone();
2842 let state_store = self.state_store.clone();
2843 let secrets = Arc::clone(&self.secrets);
2844 let oauth_config = self.oauth_config.clone();
2845 let wasi_policy = Arc::clone(&self.wasi_policy);
2846 let pack_id = self.metadata().pack_id.clone();
2847 let allow_state_store = self.allows_state_store(component_ref);
2848 let component = pack_component.component.clone();
2849 let component_ref_owned = component_ref.to_string();
2850 let runtime_config_non_secret = self.runtime_config_non_secret.clone();
2851 let runtime_refs = self.runtime_refs.clone();
2852
2853 run_on_wasi_thread("component.describe", move || {
2854 let mut linker = Linker::new(&engine);
2855 register_all(&mut linker, allow_state_store)?;
2856 add_component_control_to_linker(&mut linker)?;
2857
2858 let host_state = HostState::new(
2859 pack_id.clone(),
2860 config,
2861 http_client,
2862 mocks,
2863 session_store,
2864 state_store,
2865 secrets,
2866 oauth_config,
2867 None,
2868 Some(component_ref_owned),
2869 false,
2870 runtime_config_non_secret,
2871 runtime_refs,
2872 )?;
2873 let store_state = ComponentState::new(host_state, wasi_policy)?;
2874 let mut store = wasmtime::Store::new(&engine, store_state);
2875 let pre_instance = linker.instantiate_pre(&component)?;
2876 let pre = match component_api::v0_6_descriptor::ComponentV0V6V0Pre::new(pre_instance) {
2877 Ok(pre) => pre,
2878 Err(_) => return Ok(None),
2879 };
2880 let bytes = block_on(async {
2881 let bindings = pre.instantiate_async(&mut store).await?;
2882 let descriptor = bindings.greentic_component_component_descriptor();
2883 descriptor.call_describe(&mut store)
2884 })?;
2885
2886 if bytes.is_empty() {
2887 return Ok(Some(Value::Null));
2888 }
2889 if let Ok(value) = serde_cbor::from_slice::<Value>(&bytes) {
2890 return Ok(Some(value));
2891 }
2892 if let Ok(value) = serde_json::from_slice::<Value>(&bytes) {
2893 return Ok(Some(value));
2894 }
2895 if let Ok(text) = String::from_utf8(bytes) {
2896 if let Ok(value) = serde_json::from_str::<Value>(&text) {
2897 return Ok(Some(value));
2898 }
2899 return Ok(Some(Value::String(text)));
2900 }
2901 Ok(Some(Value::Null))
2902 })
2903 }
2904
2905 pub fn load_schema_json(&self, schema_ref: &str) -> Result<Option<Value>> {
2906 let rel = normalize_schema_ref(schema_ref)?;
2907 if self.path.is_dir() {
2908 let candidate = self.path.join(&rel);
2909 if candidate.exists() {
2910 let bytes = std::fs::read(&candidate).with_context(|| {
2911 format!("failed to read schema file {}", candidate.display())
2912 })?;
2913 let value = serde_json::from_slice::<Value>(&bytes)
2914 .with_context(|| format!("invalid schema JSON in {}", candidate.display()))?;
2915 return Ok(Some(value));
2916 }
2917 }
2918
2919 if let Some(archive_path) = self
2920 .archive_path
2921 .as_ref()
2922 .or_else(|| path_is_gtpack(&self.path).then_some(&self.path))
2923 {
2924 let file = File::open(archive_path)
2925 .with_context(|| format!("failed to open {}", archive_path.display()))?;
2926 let mut archive = ZipArchive::new(file)
2927 .with_context(|| format!("failed to read pack {}", archive_path.display()))?;
2928 match archive.by_name(&rel) {
2929 Ok(mut entry) => {
2930 let mut bytes = Vec::new();
2931 entry.read_to_end(&mut bytes)?;
2932 let value = serde_json::from_slice::<Value>(&bytes).with_context(|| {
2933 format!("invalid schema JSON in {}:{}", archive_path.display(), rel)
2934 })?;
2935 Ok(Some(value))
2936 }
2937 Err(zip::result::ZipError::FileNotFound) => Ok(None),
2938 Err(err) => Err(anyhow!(err)).with_context(|| {
2939 format!(
2940 "failed to read schema `{}` from {}",
2941 rel,
2942 archive_path.display()
2943 )
2944 }),
2945 }
2946 } else {
2947 Ok(None)
2948 }
2949 }
2950
2951 pub fn required_secrets(&self) -> &[greentic_types::SecretRequirement] {
2952 &self.metadata.secret_requirements
2953 }
2954
2955 pub fn missing_secrets(
2956 &self,
2957 tenant_ctx: &TypesTenantCtx,
2958 ) -> Vec<greentic_types::SecretRequirement> {
2959 let env = tenant_ctx.env.as_str().to_string();
2960 let tenant = tenant_ctx.tenant.as_str().to_string();
2961 let team = tenant_ctx.team.as_ref().map(|t| t.as_str().to_string());
2962 self.required_secrets()
2963 .iter()
2964 .filter(|req| {
2965 if let Some(scope) = &req.scope {
2967 if scope.env != env {
2968 return false;
2969 }
2970 if scope.tenant != tenant {
2971 return false;
2972 }
2973 if let Some(ref team_req) = scope.team
2974 && team.as_ref() != Some(team_req)
2975 {
2976 return false;
2977 }
2978 }
2979 let ctx = self.config.tenant_ctx();
2980 read_secret_blocking(
2981 &self.secrets,
2982 &ctx,
2983 &self.metadata.pack_id,
2984 canonicalize_secret_key(req.key.as_str()).as_str(),
2985 )
2986 .is_err()
2987 })
2988 .cloned()
2989 .collect()
2990 }
2991
2992 pub fn for_component_test(
2993 components: Vec<(String, PathBuf)>,
2994 flows: HashMap<String, FlowIR>,
2995 pack_id: &str,
2996 config: Arc<HostConfig>,
2997 ) -> Result<Self> {
2998 let engine = Engine::default();
2999 let engine_profile =
3000 EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
3001 let cache = CacheManager::new(CacheConfig::default(), engine_profile);
3002 let mut component_map = HashMap::new();
3003 for (name, path) in components {
3004 if !path.exists() {
3005 bail!("component artifact missing: {}", path.display());
3006 }
3007 let wasm_bytes = std::fs::read(&path)?;
3008 let component =
3009 Arc::new(Component::from_binary(&engine, &wasm_bytes).map_err(|err| {
3010 anyhow!("failed to compile component {}: {err}", path.display())
3011 })?);
3012 component_map.insert(
3013 name.clone(),
3014 PackComponent {
3015 name,
3016 version: "0.0.0".into(),
3017 component,
3018 },
3019 );
3020 }
3021
3022 let mut flow_map = HashMap::new();
3023 let mut descriptors = Vec::new();
3024 for (id, ir) in flows {
3025 let flow_type = ir.flow_type.clone();
3026 let flow = flow_ir_to_flow(ir)?;
3027 flow_map.insert(id.clone(), flow);
3028 descriptors.push(FlowDescriptor {
3029 id: id.clone(),
3030 flow_type,
3031 pack_id: pack_id.to_string(),
3032 profile: "test".into(),
3033 version: "0.0.0".into(),
3034 description: None,
3035 });
3036 }
3037 let entry_flows = descriptors.iter().map(|flow| flow.id.clone()).collect();
3038 let metadata = PackMetadata {
3039 pack_id: pack_id.to_string(),
3040 version: "0.0.0".into(),
3041 entry_flows,
3042 secret_requirements: Vec::new(),
3043 };
3044 let flows_cache = PackFlows {
3045 descriptors: descriptors.clone(),
3046 flows: flow_map,
3047 metadata: metadata.clone(),
3048 };
3049
3050 Ok(Self {
3051 path: PathBuf::new(),
3052 archive_path: None,
3053 config,
3054 engine,
3055 metadata,
3056 manifest: None,
3057 legacy_manifest: None,
3058 component_manifests: HashMap::new(),
3059 mocks: None,
3060 flows: Some(flows_cache),
3061 components: component_map,
3062 http_client: Arc::clone(&HTTP_CLIENT),
3063 pre_cache: Mutex::new(HashMap::new()),
3064 session_store: None,
3065 state_store: None,
3066 wasi_policy: Arc::new(RunnerWasiPolicy::new()),
3067 assets_tempdir: None,
3068 provider_registry: RwLock::new(None),
3069 identify_hint_cache: RwLock::new(HashMap::new()),
3070 secrets: crate::secrets::default_manager()?,
3071 oauth_config: None,
3072 cache,
3073 runtime_config_non_secret: None,
3074 runtime_refs: None,
3075 })
3076 }
3077}
3078
3079fn normalize_schema_ref(schema_ref: &str) -> Result<String> {
3080 let candidate = schema_ref.trim();
3081 if candidate.is_empty() {
3082 bail!("schema ref cannot be empty");
3083 }
3084 let path = Path::new(candidate);
3085 if path.is_absolute() {
3086 bail!("schema ref must be relative: {}", schema_ref);
3087 }
3088 let mut normalized = PathBuf::new();
3089 for component in path.components() {
3090 match component {
3091 std::path::Component::Normal(part) => normalized.push(part),
3092 std::path::Component::CurDir => {}
3093 _ => bail!("schema ref must not contain traversal: {}", schema_ref),
3094 }
3095 }
3096 let normalized = normalized
3097 .to_str()
3098 .map(ToString::to_string)
3099 .ok_or_else(|| anyhow!("schema ref must be valid UTF-8"))?;
3100 if normalized.is_empty() {
3101 bail!("schema ref cannot normalize to empty path");
3102 }
3103 Ok(normalized)
3104}
3105
3106fn path_is_gtpack(path: &Path) -> bool {
3107 path.extension()
3108 .and_then(|ext| ext.to_str())
3109 .map(|ext| ext.eq_ignore_ascii_case("gtpack"))
3110 .unwrap_or(false)
3111}
3112
3113fn is_missing_node_export(err: &wasmtime::Error, version: &str) -> bool {
3114 let message = err.to_string();
3115 message.contains("no exported instance named")
3116 && message.contains(&format!("greentic:component/node@{version}"))
3117}
3118
3119struct PackFlows {
3120 descriptors: Vec<FlowDescriptor>,
3121 flows: HashMap<String, Flow>,
3122 metadata: PackMetadata,
3123}
3124
3125const RUNTIME_FLOW_EXTENSION_IDS: [&str; 3] = [
3126 "greentic.pack.runtime_flow",
3127 "greentic.pack.flow_runtime",
3128 "greentic.pack.runtime_flows",
3129];
3130
3131#[derive(Debug, Deserialize)]
3132struct RuntimeFlowBundle {
3133 flows: Vec<RuntimeFlow>,
3134}
3135
3136#[derive(Debug, Deserialize)]
3137struct RuntimeFlow {
3138 id: String,
3139 #[serde(alias = "flow_type")]
3140 kind: FlowKind,
3141 #[serde(default)]
3142 schema_version: Option<String>,
3143 #[serde(default)]
3144 start: Option<String>,
3145 #[serde(default)]
3146 entrypoints: BTreeMap<String, Value>,
3147 nodes: BTreeMap<String, RuntimeNode>,
3148 #[serde(default)]
3149 metadata: Option<FlowMetadata>,
3150}
3151
3152#[derive(Debug, Deserialize)]
3153struct RuntimeNode {
3154 #[serde(alias = "component")]
3155 component_id: String,
3156 #[serde(default, alias = "operation")]
3157 operation_name: Option<String>,
3158 #[serde(default, alias = "payload", alias = "input")]
3159 operation_payload: Value,
3160 #[serde(default)]
3161 config: Value,
3162 #[serde(default)]
3163 routing: Option<Routing>,
3164 #[serde(default)]
3165 telemetry: Option<TelemetryHints>,
3166}
3167
3168fn deserialize_json_bytes(bytes: Vec<u8>) -> Result<Value> {
3169 if bytes.is_empty() {
3170 return Ok(Value::Null);
3171 }
3172 serde_json::from_slice(&bytes).or_else(|_| {
3173 String::from_utf8(bytes)
3174 .map(Value::String)
3175 .map_err(|err| anyhow!(err))
3176 })
3177}
3178
3179fn is_missing_export_error(message: &str) -> bool {
3195 let has_broad_marker = message.contains("no exported instance named")
3196 || message.contains("no exported function named");
3197 let has_identity_segment = message.contains("instance-identity-api")
3198 || message.contains("identify-instance")
3199 || message.contains("instance-identity-describe-api")
3200 || message.contains("describe-identify-instance");
3201 has_broad_marker && has_identity_segment
3202}
3203
3204fn build_scoped_identify_payload(
3222 headers: &[(String, String)],
3223 body: &Value,
3224 hint: Option<&IdentifyInstanceHint>,
3225) -> Vec<u8> {
3226 let scoped_headers: Vec<&(String, String)> = match hint {
3227 Some(hint) => {
3230 let allowed = hint.header_names();
3231 headers
3232 .iter()
3233 .filter(|(name, _)| allowed.contains(&name.as_str()))
3234 .collect()
3235 }
3236 None => headers.iter().collect(),
3237 };
3238 let wrapper = serde_json::json!({
3239 "headers": scoped_headers
3240 .iter()
3241 .map(|(name, value)| serde_json::json!({ "name": name, "value": value }))
3242 .collect::<Vec<_>>(),
3243 "body": body,
3244 });
3245 serde_json::to_vec(&wrapper).expect("wrapper payload always serializes")
3246}
3247
3248#[cfg(test)]
3249mod build_scoped_identify_payload_tests {
3250 use super::*;
3251 use crate::identify_hint::HintSource;
3252 use serde_json::json;
3253
3254 fn hint(sources: Vec<HintSource>) -> IdentifyInstanceHint {
3255 IdentifyInstanceHint { sources }
3256 }
3257
3258 #[test]
3259 fn unhinted_passes_all_input_headers_through() {
3260 let headers = vec![
3264 (
3265 "x-telegram-bot-api-secret-token".into(),
3266 "telegram-tok".into(),
3267 ),
3268 ("x-future-routing-tag".into(), "abc".into()),
3269 ];
3270 let body = json!({ "update_id": 1 });
3271 let bytes = build_scoped_identify_payload(&headers, &body, None);
3272 let parsed: Value = serde_json::from_slice(&bytes).unwrap();
3273 assert_eq!(
3274 parsed["headers"],
3275 json!([
3276 { "name": "x-telegram-bot-api-secret-token", "value": "telegram-tok" },
3277 { "name": "x-future-routing-tag", "value": "abc" }
3278 ])
3279 );
3280 assert_eq!(parsed["body"], body);
3281 }
3282
3283 #[test]
3284 fn header_hint_filters_to_declared_names_only() {
3285 let h = hint(vec![HintSource::Header {
3289 name: "x-telegram-bot-api-secret-token".into(),
3290 }]);
3291 let headers = vec![
3292 (
3293 "x-telegram-bot-api-secret-token".into(),
3294 "telegram-tok".into(),
3295 ),
3296 ("x-slack-signature".into(), "v0=sig".into()),
3297 ];
3298 let body = json!({});
3299 let bytes = build_scoped_identify_payload(&headers, &body, Some(&h));
3300 let parsed: Value = serde_json::from_slice(&bytes).unwrap();
3301 assert_eq!(
3302 parsed["headers"],
3303 json!([
3304 { "name": "x-telegram-bot-api-secret-token", "value": "telegram-tok" }
3305 ])
3306 );
3307 }
3308
3309 #[test]
3310 fn hints_without_header_sources_drop_all_headers() {
3311 let headers = vec![(
3316 "x-telegram-bot-api-secret-token".into(),
3317 "should-not-leak".into(),
3318 )];
3319 let body = json!({ "anything": true });
3320 for h in [
3321 hint(vec![HintSource::BodyPath {
3322 json_pointer: "/recipient/id".into(),
3323 }]),
3324 hint(vec![]),
3325 ] {
3326 let bytes = build_scoped_identify_payload(&headers, &body, Some(&h));
3327 let parsed: Value = serde_json::from_slice(&bytes).unwrap();
3328 assert_eq!(parsed["headers"], json!([]), "hint={:?}", h.sources);
3329 assert_eq!(parsed["body"], body);
3330 }
3331 }
3332
3333 #[test]
3334 fn header_filter_preserves_input_order_and_dups() {
3335 let h = hint(vec![HintSource::Header {
3340 name: "x-route".into(),
3341 }]);
3342 let headers = vec![
3343 ("x-route".into(), "a".into()),
3344 ("x-other".into(), "skip".into()),
3345 ("x-route".into(), "b".into()),
3346 ];
3347 let body = json!({});
3348 let bytes = build_scoped_identify_payload(&headers, &body, Some(&h));
3349 let parsed: Value = serde_json::from_slice(&bytes).unwrap();
3350 assert_eq!(
3351 parsed["headers"],
3352 json!([
3353 { "name": "x-route", "value": "a" },
3354 { "name": "x-route", "value": "b" }
3355 ])
3356 );
3357 }
3358}
3359
3360impl PackFlows {
3361 fn from_manifest(manifest: greentic_types::PackManifest) -> Self {
3362 if let Some(flows) = flows_from_runtime_extension(&manifest) {
3363 return flows;
3364 }
3365 let descriptors = manifest
3366 .flows
3367 .iter()
3368 .map(|entry| FlowDescriptor {
3369 id: entry.id.as_str().to_string(),
3370 flow_type: flow_kind_to_str(entry.kind).to_string(),
3371 pack_id: manifest.pack_id.as_str().to_string(),
3372 profile: manifest.pack_id.as_str().to_string(),
3373 version: manifest.version.to_string(),
3374 description: None,
3375 })
3376 .collect();
3377 let mut flows = HashMap::new();
3378 for entry in &manifest.flows {
3379 flows.insert(entry.id.as_str().to_string(), entry.flow.clone());
3380 }
3381 Self {
3382 metadata: PackMetadata::from_manifest(&manifest),
3383 descriptors,
3384 flows,
3385 }
3386 }
3387}
3388
3389fn flows_from_runtime_extension(manifest: &greentic_types::PackManifest) -> Option<PackFlows> {
3390 let extensions = manifest.extensions.as_ref()?;
3391 let extension = extensions.iter().find_map(|(key, ext)| {
3392 if RUNTIME_FLOW_EXTENSION_IDS
3393 .iter()
3394 .any(|candidate| candidate == key)
3395 {
3396 Some(ext)
3397 } else {
3398 None
3399 }
3400 })?;
3401 let runtime_flows = match decode_runtime_flow_extension(extension) {
3402 Some(flows) if !flows.is_empty() => flows,
3403 _ => return None,
3404 };
3405
3406 let descriptors = runtime_flows
3407 .iter()
3408 .map(|flow| FlowDescriptor {
3409 id: flow.id.as_str().to_string(),
3410 flow_type: flow_kind_to_str(flow.kind).to_string(),
3411 pack_id: manifest.pack_id.as_str().to_string(),
3412 profile: manifest.pack_id.as_str().to_string(),
3413 version: manifest.version.to_string(),
3414 description: None,
3415 })
3416 .collect::<Vec<_>>();
3417 let flows = runtime_flows
3418 .into_iter()
3419 .map(|flow| (flow.id.as_str().to_string(), flow))
3420 .collect();
3421
3422 Some(PackFlows {
3423 metadata: PackMetadata::from_manifest(manifest),
3424 descriptors,
3425 flows,
3426 })
3427}
3428
3429fn decode_runtime_flow_extension(extension: &ExtensionRef) -> Option<Vec<Flow>> {
3430 let value = match extension.inline.as_ref()? {
3431 ExtensionInline::Other(value) => value.clone(),
3432 _ => return None,
3433 };
3434
3435 if let Ok(bundle) = serde_json::from_value::<RuntimeFlowBundle>(value.clone()) {
3436 return Some(collect_runtime_flows(bundle.flows));
3437 }
3438
3439 if let Ok(flows) = serde_json::from_value::<Vec<RuntimeFlow>>(value.clone()) {
3440 return Some(collect_runtime_flows(flows));
3441 }
3442
3443 if let Ok(flows) = serde_json::from_value::<Vec<Flow>>(value) {
3444 return Some(flows);
3445 }
3446
3447 warn!(
3448 extension = %extension.kind,
3449 version = %extension.version,
3450 "runtime flow extension present but could not be decoded"
3451 );
3452 None
3453}
3454
3455fn collect_runtime_flows(flows: Vec<RuntimeFlow>) -> Vec<Flow> {
3456 flows
3457 .into_iter()
3458 .filter_map(|flow| match runtime_flow_to_flow(flow) {
3459 Ok(flow) => Some(flow),
3460 Err(err) => {
3461 warn!(error = %err, "failed to decode runtime flow");
3462 None
3463 }
3464 })
3465 .collect()
3466}
3467
3468fn runtime_flow_to_flow(runtime: RuntimeFlow) -> Result<Flow> {
3469 let flow_id = FlowId::from_str(&runtime.id)
3470 .with_context(|| format!("invalid flow id `{}`", runtime.id))?;
3471 let mut entrypoints = runtime.entrypoints;
3472 if entrypoints.is_empty()
3473 && let Some(start) = &runtime.start
3474 {
3475 entrypoints.insert("default".into(), Value::String(start.clone()));
3476 }
3477
3478 let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
3479 for (id, node) in runtime.nodes {
3480 let node_id = NodeId::from_str(&id).with_context(|| format!("invalid node id `{id}`"))?;
3481 let component_id = ComponentId::from_str(&node.component_id)
3482 .with_context(|| format!("invalid component id `{}`", node.component_id))?;
3483 let operation_payload = if node.config.is_null() {
3484 node.operation_payload
3485 } else {
3486 serde_json::json!({
3487 "input": node.operation_payload,
3488 "config": node.config,
3489 })
3490 };
3491 let component = FlowComponentRef {
3492 id: component_id,
3493 pack_alias: None,
3494 operation: node.operation_name,
3495 };
3496 let routing = node.routing.unwrap_or(Routing::End);
3497 let telemetry = node.telemetry.unwrap_or_default();
3498 nodes.insert(
3499 node_id.clone(),
3500 Node {
3501 id: node_id,
3502 component,
3503 input: InputMapping {
3504 mapping: operation_payload,
3505 },
3506 output: OutputMapping {
3507 mapping: Value::Null,
3508 },
3509 err_map: None,
3510 routing,
3511 telemetry,
3512 },
3513 );
3514 }
3515
3516 Ok(Flow {
3517 schema_version: runtime.schema_version.unwrap_or_else(|| "1.0".to_string()),
3518 id: flow_id,
3519 kind: runtime.kind,
3520 entrypoints,
3521 nodes,
3522 metadata: runtime.metadata.unwrap_or_default(),
3523 })
3524}
3525
3526fn flow_kind_to_str(kind: greentic_types::FlowKind) -> &'static str {
3527 match kind {
3528 greentic_types::FlowKind::Messaging => "messaging",
3529 greentic_types::FlowKind::Event => "event",
3530 greentic_types::FlowKind::ComponentConfig => "component-config",
3531 greentic_types::FlowKind::Job => "job",
3532 greentic_types::FlowKind::Http => "http",
3533 }
3534}
3535
3536fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
3537 let mut file = archive
3538 .by_name(name)
3539 .with_context(|| format!("entry {name} missing from archive"))?;
3540 let mut buf = Vec::new();
3541 file.read_to_end(&mut buf)?;
3542 Ok(buf)
3543}
3544
3545fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
3546 for node in doc.nodes.values_mut() {
3547 let Some((component_ref, payload)) = node
3548 .raw
3549 .iter()
3550 .next()
3551 .map(|(key, value)| (key.clone(), value.clone()))
3552 else {
3553 continue;
3554 };
3555 if component_ref.starts_with("emit.") {
3556 node.operation = Some(component_ref);
3557 node.payload = payload;
3558 node.raw.clear();
3559 continue;
3560 }
3561 let (target_component, operation, input, config) =
3562 infer_component_exec(&payload, &component_ref);
3563 let mut payload_obj = serde_json::Map::new();
3564 payload_obj.insert("component".into(), Value::String(target_component));
3566 payload_obj.insert("operation".into(), Value::String(operation));
3567 payload_obj.insert("input".into(), input);
3568 if let Some(cfg) = config {
3569 payload_obj.insert("config".into(), cfg);
3570 }
3571 node.operation = Some("component.exec".to_string());
3572 node.payload = Value::Object(payload_obj);
3573 node.raw.clear();
3574 }
3575 doc
3576}
3577
3578fn infer_component_exec(
3579 payload: &Value,
3580 component_ref: &str,
3581) -> (String, String, Value, Option<Value>) {
3582 let default_op = if component_ref.starts_with("templating.") {
3583 "render"
3584 } else {
3585 "invoke"
3586 }
3587 .to_string();
3588
3589 if let Value::Object(map) = payload {
3590 let has_embedded_component =
3591 map.get("component").is_some() || map.get("component_ref").is_some();
3592 let op = map
3593 .get("op")
3594 .or_else(|| map.get("operation"))
3595 .and_then(Value::as_str)
3596 .map(|s| s.to_string())
3597 .unwrap_or_else(|| {
3598 if has_embedded_component {
3599 component_ref.to_string()
3600 } else {
3601 default_op.clone()
3602 }
3603 });
3604
3605 let mut input = map.clone();
3606 let config = input.remove("config");
3607 let canonical_input = if has_embedded_component {
3608 input.get("input").cloned()
3609 } else {
3610 None
3611 };
3612 let component = input
3613 .get("component")
3614 .or_else(|| input.get("component_ref"))
3615 .and_then(Value::as_str)
3616 .map(|s| s.to_string())
3617 .unwrap_or_else(|| component_ref.to_string());
3618 input.remove("component");
3619 input.remove("component_ref");
3620 input.remove("op");
3621 input.remove("operation");
3622 let input = canonical_input.unwrap_or(Value::Object(input));
3623 return (component, op, input, config);
3624 }
3625
3626 (component_ref.to_string(), default_op, payload.clone(), None)
3627}
3628
3629#[derive(Clone, Debug)]
3630struct ComponentSpec {
3631 id: String,
3632 version: String,
3633 legacy_path: Option<String>,
3634}
3635
3636#[derive(Clone, Debug)]
3637struct ComponentSourceInfo {
3638 digest: Option<String>,
3639 source: ComponentSourceRef,
3640 artifact: ComponentArtifactLocation,
3641 expected_wasm_sha256: Option<String>,
3642 skip_digest_verification: bool,
3643}
3644
3645#[derive(Clone, Debug)]
3646enum ComponentArtifactLocation {
3647 Inline { wasm_path: String },
3648 Remote,
3649}
3650
3651#[derive(Clone, Debug, Deserialize)]
3652struct PackLockV1 {
3653 schema_version: u32,
3654 components: Vec<PackLockComponent>,
3655}
3656
3657#[derive(Clone, Debug, Deserialize)]
3658struct PackLockComponent {
3659 name: String,
3660 #[serde(default, rename = "source_ref")]
3661 source_ref: Option<String>,
3662 #[serde(default, rename = "ref")]
3663 legacy_ref: Option<String>,
3664 #[serde(default)]
3665 component_id: Option<ComponentId>,
3666 #[serde(default)]
3667 bundled: Option<bool>,
3668 #[serde(default, rename = "bundled_path")]
3669 bundled_path: Option<String>,
3670 #[serde(default, rename = "path")]
3671 legacy_path: Option<String>,
3672 #[serde(default)]
3673 wasm_sha256: Option<String>,
3674 #[serde(default, rename = "sha256")]
3675 legacy_sha256: Option<String>,
3676 #[serde(default)]
3677 resolved_digest: Option<String>,
3678 #[serde(default)]
3679 digest: Option<String>,
3680}
3681
3682fn component_specs(
3683 manifest: Option<&greentic_types::PackManifest>,
3684 legacy_manifest: Option<&legacy_pack::PackManifest>,
3685 component_sources: Option<&ComponentSourcesV1>,
3686 pack_lock: Option<&PackLockV1>,
3687) -> Vec<ComponentSpec> {
3688 if let Some(manifest) = manifest {
3689 if !manifest.components.is_empty() {
3690 return manifest
3691 .components
3692 .iter()
3693 .map(|entry| ComponentSpec {
3694 id: entry.id.as_str().to_string(),
3695 version: entry.version.to_string(),
3696 legacy_path: None,
3697 })
3698 .collect();
3699 }
3700 if let Some(lock) = pack_lock {
3701 let mut seen = HashSet::new();
3702 let mut specs = Vec::new();
3703 for entry in &lock.components {
3704 let id = entry
3705 .component_id
3706 .as_ref()
3707 .map(|id| id.as_str())
3708 .unwrap_or(entry.name.as_str());
3709 if seen.insert(id.to_string()) {
3710 specs.push(ComponentSpec {
3711 id: id.to_string(),
3712 version: "0.0.0".to_string(),
3713 legacy_path: None,
3714 });
3715 }
3716 }
3717 return specs;
3718 }
3719 if let Some(sources) = component_sources {
3720 let mut seen = HashSet::new();
3721 let mut specs = Vec::new();
3722 for entry in &sources.components {
3723 let id = entry
3724 .component_id
3725 .as_ref()
3726 .map(|id| id.as_str())
3727 .unwrap_or(entry.name.as_str());
3728 if seen.insert(id.to_string()) {
3729 specs.push(ComponentSpec {
3730 id: id.to_string(),
3731 version: "0.0.0".to_string(),
3732 legacy_path: None,
3733 });
3734 }
3735 }
3736 return specs;
3737 }
3738 }
3739 if let Some(legacy_manifest) = legacy_manifest {
3740 return legacy_manifest
3741 .components
3742 .iter()
3743 .map(|entry| ComponentSpec {
3744 id: entry.name.clone(),
3745 version: entry.version.to_string(),
3746 legacy_path: Some(entry.file_wasm.clone()),
3747 })
3748 .collect();
3749 }
3750 Vec::new()
3751}
3752
3753fn component_sources_table(
3754 sources: Option<&ComponentSourcesV1>,
3755) -> Result<Option<HashMap<String, ComponentSourceInfo>>> {
3756 let Some(sources) = sources else {
3757 return Ok(None);
3758 };
3759 let mut table = HashMap::new();
3760 for entry in &sources.components {
3761 let artifact = match &entry.artifact {
3762 ArtifactLocationV1::Inline { wasm_path, .. } => ComponentArtifactLocation::Inline {
3763 wasm_path: wasm_path.clone(),
3764 },
3765 ArtifactLocationV1::Remote => ComponentArtifactLocation::Remote,
3766 };
3767 let info = ComponentSourceInfo {
3768 digest: Some(entry.resolved.digest.clone()),
3769 source: entry.source.clone(),
3770 artifact,
3771 expected_wasm_sha256: None,
3772 skip_digest_verification: false,
3773 };
3774 if let Some(component_id) = entry.component_id.as_ref() {
3775 table.insert(component_id.as_str().to_string(), info.clone());
3776 }
3777 table.insert(entry.name.clone(), info);
3778 }
3779 Ok(Some(table))
3780}
3781
3782fn load_pack_lock(path: &Path) -> Result<Option<PackLockV1>> {
3783 let lock_path = if path.is_dir() {
3784 let candidate = path.join("pack.lock");
3785 if candidate.exists() {
3786 Some(candidate)
3787 } else {
3788 let candidate = path.join("pack.lock.json");
3789 candidate.exists().then_some(candidate)
3790 }
3791 } else {
3792 None
3793 };
3794 let Some(lock_path) = lock_path else {
3795 return Ok(None);
3796 };
3797 let raw = std::fs::read_to_string(&lock_path)
3798 .with_context(|| format!("failed to read {}", lock_path.display()))?;
3799 let lock: PackLockV1 = serde_json::from_str(&raw).context("failed to parse pack.lock")?;
3800 if lock.schema_version != 1 {
3801 bail!("pack.lock schema_version must be 1");
3802 }
3803 Ok(Some(lock))
3804}
3805
3806fn find_pack_lock_roots(
3807 pack_path: &Path,
3808 is_dir: bool,
3809 archive_hint: Option<&Path>,
3810) -> Vec<PathBuf> {
3811 if is_dir {
3812 return vec![pack_path.to_path_buf()];
3813 }
3814 let mut roots = Vec::new();
3815 if let Some(archive_path) = archive_hint {
3816 if let Some(parent) = archive_path.parent() {
3817 roots.push(parent.to_path_buf());
3818 if let Some(grandparent) = parent.parent() {
3819 roots.push(grandparent.to_path_buf());
3820 }
3821 }
3822 } else if let Some(parent) = pack_path.parent() {
3823 roots.push(parent.to_path_buf());
3824 if let Some(grandparent) = parent.parent() {
3825 roots.push(grandparent.to_path_buf());
3826 }
3827 }
3828 roots
3829}
3830
3831fn normalize_sha256(digest: &str) -> Result<String> {
3832 let trimmed = digest.trim();
3833 if trimmed.is_empty() {
3834 bail!("sha256 digest cannot be empty");
3835 }
3836 if let Some(stripped) = trimmed.strip_prefix("sha256:") {
3837 if stripped.is_empty() {
3838 bail!("sha256 digest must include hex bytes after sha256:");
3839 }
3840 return Ok(trimmed.to_string());
3841 }
3842 if trimmed.chars().all(|c| c.is_ascii_hexdigit()) {
3843 return Ok(format!("sha256:{trimmed}"));
3844 }
3845 bail!("sha256 digest must be hex or sha256:<hex>");
3846}
3847
3848fn component_sources_table_from_pack_lock(
3849 lock: &PackLockV1,
3850 allow_missing_hash: bool,
3851) -> Result<HashMap<String, ComponentSourceInfo>> {
3852 let mut table = HashMap::new();
3853 let mut names = HashSet::new();
3854 for entry in &lock.components {
3855 if !names.insert(entry.name.clone()) {
3856 bail!(
3857 "pack.lock contains duplicate component name `{}`",
3858 entry.name
3859 );
3860 }
3861 let source_ref = match (&entry.source_ref, &entry.legacy_ref) {
3862 (Some(primary), Some(legacy)) => {
3863 if primary != legacy {
3864 bail!(
3865 "pack.lock component {} has conflicting refs: {} vs {}",
3866 entry.name,
3867 primary,
3868 legacy
3869 );
3870 }
3871 primary.as_str()
3872 }
3873 (Some(primary), None) => primary.as_str(),
3874 (None, Some(legacy)) => legacy.as_str(),
3875 (None, None) => {
3876 bail!("pack.lock component {} missing source_ref", entry.name);
3877 }
3878 };
3879 let source: ComponentSourceRef = source_ref
3880 .parse()
3881 .with_context(|| format!("invalid component ref `{}`", source_ref))?;
3882 let bundled_path = match (&entry.bundled_path, &entry.legacy_path) {
3883 (Some(primary), Some(legacy)) => {
3884 if primary != legacy {
3885 bail!(
3886 "pack.lock component {} has conflicting bundled paths: {} vs {}",
3887 entry.name,
3888 primary,
3889 legacy
3890 );
3891 }
3892 Some(primary.clone())
3893 }
3894 (Some(primary), None) => Some(primary.clone()),
3895 (None, Some(legacy)) => Some(legacy.clone()),
3896 (None, None) => None,
3897 };
3898 let bundled = entry.bundled.unwrap_or(false) || bundled_path.is_some();
3899 let (artifact, digest, expected_wasm_sha256, skip_digest_verification) = if bundled {
3900 let wasm_path = bundled_path.ok_or_else(|| {
3901 anyhow!(
3902 "pack.lock component {} marked bundled but bundled_path is missing",
3903 entry.name
3904 )
3905 })?;
3906 let expected_raw = match (&entry.wasm_sha256, &entry.legacy_sha256) {
3907 (Some(primary), Some(legacy)) => {
3908 if primary != legacy {
3909 bail!(
3910 "pack.lock component {} has conflicting wasm_sha256 values: {} vs {}",
3911 entry.name,
3912 primary,
3913 legacy
3914 );
3915 }
3916 Some(primary.as_str())
3917 }
3918 (Some(primary), None) => Some(primary.as_str()),
3919 (None, Some(legacy)) => Some(legacy.as_str()),
3920 (None, None) => None,
3921 };
3922 let expected = match expected_raw {
3923 Some(value) => Some(normalize_sha256(value)?),
3924 None => None,
3925 };
3926 if expected.is_none() && !allow_missing_hash {
3927 bail!(
3928 "pack.lock component {} missing wasm_sha256 for bundled component",
3929 entry.name
3930 );
3931 }
3932 (
3933 ComponentArtifactLocation::Inline { wasm_path },
3934 expected.clone(),
3935 expected,
3936 allow_missing_hash && expected_raw.is_none(),
3937 )
3938 } else {
3939 if source.is_tag() {
3940 bail!(
3941 "component {} uses tag ref {} but is not bundled; rebuild the pack",
3942 entry.name,
3943 source
3944 );
3945 }
3946 let expected = entry
3947 .resolved_digest
3948 .as_deref()
3949 .or(entry.digest.as_deref())
3950 .ok_or_else(|| {
3951 anyhow!(
3952 "pack.lock component {} missing resolved_digest for remote component",
3953 entry.name
3954 )
3955 })?;
3956 (
3957 ComponentArtifactLocation::Remote,
3958 Some(normalize_digest(expected)),
3959 None,
3960 false,
3961 )
3962 };
3963 let info = ComponentSourceInfo {
3964 digest,
3965 source,
3966 artifact,
3967 expected_wasm_sha256,
3968 skip_digest_verification,
3969 };
3970 if let Some(component_id) = entry.component_id.as_ref() {
3971 let key = component_id.as_str().to_string();
3972 if table.contains_key(&key) {
3973 bail!(
3974 "pack.lock contains duplicate component id `{}`",
3975 component_id.as_str()
3976 );
3977 }
3978 table.insert(key, info.clone());
3979 }
3980 if entry.name
3981 != entry
3982 .component_id
3983 .as_ref()
3984 .map(|id| id.as_str())
3985 .unwrap_or("")
3986 {
3987 table.insert(entry.name.clone(), info);
3988 }
3989 }
3990 Ok(table)
3991}
3992
3993fn component_path_for_spec(root: &Path, spec: &ComponentSpec) -> PathBuf {
3994 if let Some(path) = &spec.legacy_path {
3995 return root.join(path);
3996 }
3997 root.join("components").join(format!("{}.wasm", spec.id))
3998}
3999
4000fn normalize_digest(digest: &str) -> String {
4001 if digest.starts_with("sha256:") || digest.starts_with("blake3:") {
4002 digest.to_string()
4003 } else {
4004 format!("sha256:{digest}")
4005 }
4006}
4007
4008fn compute_digest_for(bytes: &[u8], digest: &str) -> Result<String> {
4009 if digest.starts_with("blake3:") {
4010 let hash = blake3::hash(bytes);
4011 return Ok(format!("blake3:{}", hash.to_hex()));
4012 }
4013 let mut hasher = sha2::Sha256::new();
4014 hasher.update(bytes);
4015 Ok(format!("sha256:{}", to_hex(&hasher.finalize())))
4016}
4017
4018fn compute_sha256_digest_for(bytes: &[u8]) -> String {
4019 let mut hasher = sha2::Sha256::new();
4020 hasher.update(bytes);
4021 format!("sha256:{}", to_hex(&hasher.finalize()))
4022}
4023
4024fn build_artifact_key(cache: &CacheManager, digest: Option<&str>, bytes: &[u8]) -> ArtifactKey {
4025 let wasm_digest = digest
4026 .map(normalize_digest)
4027 .unwrap_or_else(|| compute_sha256_digest_for(bytes));
4028 ArtifactKey::new(cache.engine_profile_id().to_string(), wasm_digest)
4029}
4030
4031async fn compile_component_with_cache(
4032 cache: &CacheManager,
4033 engine: &Engine,
4034 digest: Option<&str>,
4035 bytes: Vec<u8>,
4036) -> Result<Arc<Component>> {
4037 let key = build_artifact_key(cache, digest, &bytes);
4038 cache.get_component(engine, &key, || Ok(bytes)).await
4039}
4040
4041fn verify_component_digest(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
4042 let normalized_expected = normalize_digest(expected);
4043 let actual = compute_digest_for(bytes, &normalized_expected)?;
4044 if normalize_digest(&actual) != normalized_expected {
4045 bail!(
4046 "component {component_id} digest mismatch: expected {normalized_expected}, got {actual}"
4047 );
4048 }
4049 Ok(())
4050}
4051
4052fn verify_wasm_sha256(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
4053 let normalized_expected = normalize_sha256(expected)?;
4054 let actual = compute_sha256_digest_for(bytes);
4055 if actual != normalized_expected {
4056 bail!(
4057 "component {component_id} bundled digest mismatch: expected {normalized_expected}, got {actual}"
4058 );
4059 }
4060 Ok(())
4061}
4062
4063fn to_hex(digest: &[u8]) -> String {
4064 digest.iter().map(|byte| format!("{byte:02x}")).collect()
4065}
4066
4067#[cfg(test)]
4068mod pack_lock_tests {
4069 use super::*;
4070 use tempfile::TempDir;
4071
4072 #[test]
4073 fn pack_lock_tag_ref_requires_bundle() {
4074 let lock = PackLockV1 {
4075 schema_version: 1,
4076 components: vec![PackLockComponent {
4077 name: "templates".to_string(),
4078 source_ref: Some("oci://registry.test/templates:latest".to_string()),
4079 legacy_ref: None,
4080 component_id: None,
4081 bundled: Some(false),
4082 bundled_path: None,
4083 legacy_path: None,
4084 wasm_sha256: None,
4085 legacy_sha256: None,
4086 resolved_digest: None,
4087 digest: None,
4088 }],
4089 };
4090 let err = component_sources_table_from_pack_lock(&lock, false).unwrap_err();
4091 assert!(
4092 err.to_string().contains("tag ref") && err.to_string().contains("rebuild the pack"),
4093 "unexpected error: {err}"
4094 );
4095 }
4096
4097 #[test]
4098 fn bundled_hash_mismatch_errors() {
4099 let rt = tokio::runtime::Runtime::new().expect("runtime");
4100 let temp = TempDir::new().expect("temp dir");
4101 let engine = Engine::default();
4102 let engine_profile =
4103 EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
4104 let cache_config = CacheConfig {
4105 root: temp.path().join("cache"),
4106 ..CacheConfig::default()
4107 };
4108 let cache = CacheManager::new(cache_config, engine_profile);
4109 let wasm_path = temp.path().join("component.wasm");
4110 let fixture_wasm = Path::new(env!("CARGO_MANIFEST_DIR"))
4111 .join("../../tests/fixtures/packs/secrets_store_smoke/components/echo_secret.wasm");
4112 let bytes = std::fs::read(&fixture_wasm).expect("read fixture wasm");
4113 std::fs::write(&wasm_path, &bytes).expect("write temp wasm");
4114
4115 let spec = ComponentSpec {
4116 id: "qa.process".to_string(),
4117 version: "0.0.0".to_string(),
4118 legacy_path: None,
4119 };
4120 let mut missing = HashSet::new();
4121 missing.insert(spec.id.clone());
4122
4123 let mut sources = HashMap::new();
4124 sources.insert(
4125 spec.id.clone(),
4126 ComponentSourceInfo {
4127 digest: Some("sha256:deadbeef".to_string()),
4128 source: ComponentSourceRef::Oci("registry.test/qa.process@sha256:deadbeef".into()),
4129 artifact: ComponentArtifactLocation::Inline {
4130 wasm_path: "component.wasm".to_string(),
4131 },
4132 expected_wasm_sha256: Some("sha256:deadbeef".to_string()),
4133 skip_digest_verification: false,
4134 },
4135 );
4136
4137 let mut loaded = HashMap::new();
4138 let result = rt.block_on(load_components_from_sources(
4139 &cache,
4140 &engine,
4141 &sources,
4142 &ComponentResolution::default(),
4143 &[spec],
4144 &mut missing,
4145 &mut loaded,
4146 Some(temp.path()),
4147 None,
4148 ));
4149 let err = result.unwrap_err();
4150 assert!(
4151 err.to_string().contains("bundled digest mismatch"),
4152 "unexpected error: {err}"
4153 );
4154 }
4155}
4156
4157#[cfg(test)]
4158mod pack_resolution_prop_tests {
4159 use super::*;
4160 use greentic_types::{ArtifactLocationV1, ComponentSourceEntryV1, ResolvedComponentV1};
4161 use proptest::prelude::*;
4162 use proptest::test_runner::{Config as ProptestConfig, RngAlgorithm, TestRng, TestRunner};
4163 use std::collections::BTreeSet;
4164 use std::path::Path;
4165 use std::str::FromStr;
4166
4167 #[derive(Clone, Debug)]
4168 enum ResolveRequest {
4169 ById(String),
4170 ByName(String),
4171 }
4172
4173 #[derive(Clone, Debug, PartialEq, Eq)]
4174 struct ResolvedComponent {
4175 key: String,
4176 source: String,
4177 artifact: String,
4178 digest: Option<String>,
4179 expected_wasm_sha256: Option<String>,
4180 skip_digest_verification: bool,
4181 }
4182
4183 #[derive(Clone, Debug, PartialEq, Eq)]
4184 struct ResolveError {
4185 code: String,
4186 message: String,
4187 context_key: String,
4188 }
4189
4190 #[derive(Clone, Debug)]
4191 struct Scenario {
4192 pack_lock: Option<PackLockV1>,
4193 component_sources: Option<ComponentSourcesV1>,
4194 request: ResolveRequest,
4195 expected_sha256: Option<String>,
4196 bytes: Vec<u8>,
4197 }
4198
4199 fn resolve_component_test(
4200 sources: Option<&ComponentSourcesV1>,
4201 lock: Option<&PackLockV1>,
4202 request: &ResolveRequest,
4203 ) -> Result<ResolvedComponent, ResolveError> {
4204 let table = if let Some(lock) = lock {
4205 component_sources_table_from_pack_lock(lock, false).map_err(|err| ResolveError {
4206 code: classify_pack_lock_error(err.to_string().as_str()).to_string(),
4207 message: err.to_string(),
4208 context_key: request_key(request).to_string(),
4209 })?
4210 } else {
4211 let sources = component_sources_table(sources).map_err(|err| ResolveError {
4212 code: "component_sources_error".to_string(),
4213 message: err.to_string(),
4214 context_key: request_key(request).to_string(),
4215 })?;
4216 sources.ok_or_else(|| ResolveError {
4217 code: "missing_component_sources".to_string(),
4218 message: "component sources not provided".to_string(),
4219 context_key: request_key(request).to_string(),
4220 })?
4221 };
4222
4223 let key = request_key(request);
4224 let source = table.get(key).ok_or_else(|| ResolveError {
4225 code: "component_not_found".to_string(),
4226 message: format!("component {key} not found"),
4227 context_key: key.to_string(),
4228 })?;
4229
4230 Ok(ResolvedComponent {
4231 key: key.to_string(),
4232 source: source.source.to_string(),
4233 artifact: match source.artifact {
4234 ComponentArtifactLocation::Inline { .. } => "inline".to_string(),
4235 ComponentArtifactLocation::Remote => "remote".to_string(),
4236 },
4237 digest: source.digest.clone(),
4238 expected_wasm_sha256: source.expected_wasm_sha256.clone(),
4239 skip_digest_verification: source.skip_digest_verification,
4240 })
4241 }
4242
4243 fn request_key(request: &ResolveRequest) -> &str {
4244 match request {
4245 ResolveRequest::ById(value) => value.as_str(),
4246 ResolveRequest::ByName(value) => value.as_str(),
4247 }
4248 }
4249
4250 fn classify_pack_lock_error(message: &str) -> &'static str {
4251 if message.contains("duplicate component name") {
4252 "duplicate_name"
4253 } else if message.contains("duplicate component id") {
4254 "duplicate_id"
4255 } else if message.contains("conflicting refs") {
4256 "conflicting_ref"
4257 } else if message.contains("conflicting bundled paths") {
4258 "conflicting_bundled_path"
4259 } else if message.contains("conflicting wasm_sha256") {
4260 "conflicting_wasm_sha256"
4261 } else if message.contains("missing source_ref") {
4262 "missing_source_ref"
4263 } else if message.contains("marked bundled but bundled_path is missing") {
4264 "missing_bundled_path"
4265 } else if message.contains("missing wasm_sha256") {
4266 "missing_wasm_sha256"
4267 } else if message.contains("tag ref") && message.contains("not bundled") {
4268 "tag_ref_requires_bundle"
4269 } else if message.contains("missing resolved_digest") {
4270 "missing_resolved_digest"
4271 } else if message.contains("invalid component ref") {
4272 "invalid_component_ref"
4273 } else if message.contains("sha256 digest") {
4274 "invalid_sha256"
4275 } else {
4276 "unknown_error"
4277 }
4278 }
4279
4280 fn known_error_codes() -> BTreeSet<&'static str> {
4281 [
4282 "component_sources_error",
4283 "missing_component_sources",
4284 "component_not_found",
4285 "duplicate_name",
4286 "duplicate_id",
4287 "conflicting_ref",
4288 "conflicting_bundled_path",
4289 "conflicting_wasm_sha256",
4290 "missing_source_ref",
4291 "missing_bundled_path",
4292 "missing_wasm_sha256",
4293 "tag_ref_requires_bundle",
4294 "missing_resolved_digest",
4295 "invalid_component_ref",
4296 "invalid_sha256",
4297 "unknown_error",
4298 ]
4299 .into_iter()
4300 .collect()
4301 }
4302
4303 fn proptest_config() -> ProptestConfig {
4304 let cases = std::env::var("PROPTEST_CASES")
4305 .ok()
4306 .and_then(|value| value.parse::<u32>().ok())
4307 .unwrap_or(128);
4308 ProptestConfig {
4309 cases,
4310 failure_persistence: None,
4311 ..ProptestConfig::default()
4312 }
4313 }
4314
4315 fn proptest_seed() -> Option<[u8; 32]> {
4316 let seed = std::env::var("PROPTEST_SEED")
4317 .ok()
4318 .and_then(|value| value.parse::<u64>().ok())?;
4319 let mut bytes = [0u8; 32];
4320 bytes[..8].copy_from_slice(&seed.to_le_bytes());
4321 Some(bytes)
4322 }
4323
4324 fn run_cases(strategy: impl Strategy<Value = Scenario>, cases: u32, seed: Option<[u8; 32]>) {
4325 let config = ProptestConfig {
4326 cases,
4327 failure_persistence: None,
4328 ..ProptestConfig::default()
4329 };
4330 let mut runner = match seed {
4331 Some(bytes) => {
4332 TestRunner::new_with_rng(config, TestRng::from_seed(RngAlgorithm::ChaCha, &bytes))
4333 }
4334 None => TestRunner::new(config),
4335 };
4336 runner
4337 .run(&strategy, |scenario| {
4338 run_scenario(&scenario);
4339 Ok(())
4340 })
4341 .unwrap();
4342 }
4343
4344 fn run_scenario(scenario: &Scenario) {
4345 let known_codes = known_error_codes();
4346 let first = resolve_component_test(
4347 scenario.component_sources.as_ref(),
4348 scenario.pack_lock.as_ref(),
4349 &scenario.request,
4350 );
4351 let second = resolve_component_test(
4352 scenario.component_sources.as_ref(),
4353 scenario.pack_lock.as_ref(),
4354 &scenario.request,
4355 );
4356 assert_eq!(normalize_result(&first), normalize_result(&second));
4357
4358 if let Some(lock) = scenario.pack_lock.as_ref() {
4359 let lock_only = resolve_component_test(None, Some(lock), &scenario.request);
4360 assert_eq!(normalize_result(&first), normalize_result(&lock_only));
4361 }
4362
4363 if let Err(err) = first.as_ref() {
4364 assert!(
4365 known_codes.contains(err.code.as_str()),
4366 "unexpected error code {}: {}",
4367 err.code,
4368 err.message
4369 );
4370 }
4371
4372 if let Some(expected) = scenario.expected_sha256.as_deref() {
4373 let expected_ok =
4374 verify_wasm_sha256("test.component", expected, &scenario.bytes).is_ok();
4375 let actual = compute_sha256_digest_for(&scenario.bytes);
4376 if actual == normalize_sha256(expected).unwrap_or_default() {
4377 assert!(expected_ok, "expected sha256 match to succeed");
4378 } else {
4379 assert!(!expected_ok, "expected sha256 mismatch to fail");
4380 }
4381 }
4382 }
4383
4384 fn normalize_result(
4385 result: &Result<ResolvedComponent, ResolveError>,
4386 ) -> Result<ResolvedComponent, ResolveError> {
4387 match result {
4388 Ok(value) => Ok(value.clone()),
4389 Err(err) => Err(err.clone()),
4390 }
4391 }
4392
4393 fn scenario_strategy() -> impl Strategy<Value = Scenario> {
4394 let name = any::<u8>().prop_map(|n| format!("component{n}.core"));
4395 let alt_name = any::<u8>().prop_map(|n| format!("component_alt{n}.core"));
4396 let tag_ref = any::<bool>();
4397 let bundled = any::<bool>();
4398 let include_sha = any::<bool>();
4399 let include_component_id = any::<bool>();
4400 let request_by_id = any::<bool>();
4401 let use_lock = any::<bool>();
4402 let use_sources = any::<bool>();
4403 let bytes = prop::collection::vec(any::<u8>(), 1..64);
4404
4405 (
4406 name,
4407 alt_name,
4408 tag_ref,
4409 bundled,
4410 include_sha,
4411 include_component_id,
4412 request_by_id,
4413 use_lock,
4414 use_sources,
4415 bytes,
4416 )
4417 .prop_map(
4418 |(
4419 name,
4420 alt_name,
4421 tag_ref,
4422 bundled,
4423 include_sha,
4424 include_component_id,
4425 request_by_id,
4426 use_lock,
4427 use_sources,
4428 bytes,
4429 )| {
4430 let component_id_str = if include_component_id {
4431 alt_name.clone()
4432 } else {
4433 name.clone()
4434 };
4435 let component_id = ComponentId::from_str(&component_id_str).ok();
4436 let source_ref = if tag_ref {
4437 format!("oci://registry.test/{name}:v1")
4438 } else {
4439 format!(
4440 "oci://registry.test/{name}@sha256:{}",
4441 hex::encode([0x11u8; 32])
4442 )
4443 };
4444 let expected_sha256 = if bundled && include_sha {
4445 Some(compute_sha256_digest_for(&bytes))
4446 } else {
4447 None
4448 };
4449
4450 let lock_component = PackLockComponent {
4451 name: name.clone(),
4452 source_ref: Some(source_ref),
4453 legacy_ref: None,
4454 component_id,
4455 bundled: Some(bundled),
4456 bundled_path: if bundled {
4457 Some(format!("components/{name}.wasm"))
4458 } else {
4459 None
4460 },
4461 legacy_path: None,
4462 wasm_sha256: expected_sha256.clone(),
4463 legacy_sha256: None,
4464 resolved_digest: if bundled {
4465 None
4466 } else {
4467 Some("sha256:deadbeef".to_string())
4468 },
4469 digest: None,
4470 };
4471
4472 let pack_lock = if use_lock {
4473 Some(PackLockV1 {
4474 schema_version: 1,
4475 components: vec![lock_component],
4476 })
4477 } else {
4478 None
4479 };
4480
4481 let component_sources = if use_sources {
4482 Some(ComponentSourcesV1::new(vec![ComponentSourceEntryV1 {
4483 name: name.clone(),
4484 component_id: ComponentId::from_str(&name).ok(),
4485 source: ComponentSourceRef::from_str(
4486 "oci://registry.test/component@sha256:deadbeef",
4487 )
4488 .expect("component ref"),
4489 resolved: ResolvedComponentV1 {
4490 digest: "sha256:deadbeef".to_string(),
4491 signature: None,
4492 signed_by: None,
4493 },
4494 artifact: if bundled {
4495 ArtifactLocationV1::Inline {
4496 wasm_path: format!("components/{name}.wasm"),
4497 manifest_path: None,
4498 }
4499 } else {
4500 ArtifactLocationV1::Remote
4501 },
4502 licensing_hint: None,
4503 metering_hint: None,
4504 }]))
4505 } else {
4506 None
4507 };
4508
4509 let request = if request_by_id {
4510 ResolveRequest::ById(component_id_str.clone())
4511 } else {
4512 ResolveRequest::ByName(name.clone())
4513 };
4514
4515 Scenario {
4516 pack_lock,
4517 component_sources,
4518 request,
4519 expected_sha256,
4520 bytes,
4521 }
4522 },
4523 )
4524 }
4525
4526 #[test]
4527 fn pack_resolution_proptest() {
4528 let seed = proptest_seed();
4529 run_cases(scenario_strategy(), proptest_config().cases, seed);
4530 }
4531
4532 #[test]
4533 fn pack_resolution_regression_seeds() {
4534 let seeds_path =
4535 Path::new(env!("CARGO_MANIFEST_DIR")).join("../../tests/fixtures/proptest-seeds.txt");
4536 let raw = std::fs::read_to_string(&seeds_path).expect("read proptest seeds");
4537 for line in raw.lines() {
4538 let line = line.trim();
4539 if line.is_empty() || line.starts_with('#') {
4540 continue;
4541 }
4542 let seed = line.parse::<u64>().expect("seed must be an integer");
4543 let mut bytes = [0u8; 32];
4544 bytes[..8].copy_from_slice(&seed.to_le_bytes());
4545 run_cases(scenario_strategy(), 1, Some(bytes));
4546 }
4547 }
4548}
4549
4550fn locate_pack_assets(
4551 materialized_root: Option<&Path>,
4552 archive_hint: Option<&Path>,
4553) -> Result<(Option<PathBuf>, Option<TempDir>)> {
4554 if let Some(root) = materialized_root {
4555 let assets = root.join("assets");
4556 if assets.is_dir() {
4557 return Ok((Some(assets), None));
4558 }
4559 }
4560 if let Some(path) = archive_hint
4561 && let Some((tempdir, assets)) = extract_assets_from_archive(path)?
4562 {
4563 return Ok((Some(assets), Some(tempdir)));
4564 }
4565 Ok((None, None))
4566}
4567
4568fn extract_assets_from_archive(path: &Path) -> Result<Option<(TempDir, PathBuf)>> {
4569 let file =
4570 File::open(path).with_context(|| format!("failed to open pack {}", path.display()))?;
4571 let mut archive =
4572 ZipArchive::new(file).with_context(|| format!("failed to read pack {}", path.display()))?;
4573 let temp = TempDir::new().context("failed to create temporary assets directory")?;
4574 let mut found = false;
4575 for idx in 0..archive.len() {
4576 let mut entry = archive.by_index(idx)?;
4577 let name = entry.name();
4578 if !name.starts_with("assets/") {
4579 continue;
4580 }
4581 let dest = temp.path().join(name);
4582 if name.ends_with('/') {
4583 std::fs::create_dir_all(&dest)?;
4584 found = true;
4585 continue;
4586 }
4587 if let Some(parent) = dest.parent() {
4588 std::fs::create_dir_all(parent)?;
4589 }
4590 let mut outfile = std::fs::File::create(&dest)?;
4591 std::io::copy(&mut entry, &mut outfile)?;
4592 found = true;
4593 }
4594 if found {
4595 let assets_path = temp.path().join("assets");
4596 Ok(Some((temp, assets_path)))
4597 } else {
4598 Ok(None)
4599 }
4600}
4601
4602fn dist_options_from(component_resolution: &ComponentResolution) -> DistOptions {
4603 let mut opts = DistOptions {
4604 allow_tags: true,
4605 ..DistOptions::default()
4606 };
4607 if let Some(cache_dir) = component_resolution.dist_cache_dir.clone() {
4608 opts.cache_dir = cache_dir;
4609 }
4610 if component_resolution.dist_offline {
4611 opts.offline = true;
4612 }
4613 opts
4614}
4615
4616#[allow(clippy::too_many_arguments)]
4617async fn load_components_from_sources(
4618 cache: &CacheManager,
4619 engine: &Engine,
4620 component_sources: &HashMap<String, ComponentSourceInfo>,
4621 component_resolution: &ComponentResolution,
4622 specs: &[ComponentSpec],
4623 missing: &mut HashSet<String>,
4624 into: &mut HashMap<String, PackComponent>,
4625 materialized_root: Option<&Path>,
4626 archive_hint: Option<&Path>,
4627) -> Result<()> {
4628 let mut archive = if let Some(path) = archive_hint {
4629 Some(
4630 ZipArchive::new(File::open(path)?)
4631 .with_context(|| format!("{} is not a valid gtpack", path.display()))?,
4632 )
4633 } else {
4634 None
4635 };
4636 let mut dist_client: Option<DistClient> = None;
4637
4638 for spec in specs {
4639 if !missing.contains(&spec.id) {
4640 continue;
4641 }
4642 let Some(source) = component_sources.get(&spec.id) else {
4643 continue;
4644 };
4645
4646 let bytes = match &source.artifact {
4647 ComponentArtifactLocation::Inline { wasm_path } => {
4648 if let Some(root) = materialized_root {
4649 let path = root.join(wasm_path);
4650 if path.exists() {
4651 std::fs::read(&path).with_context(|| {
4652 format!(
4653 "failed to read inline component {} from {}",
4654 spec.id,
4655 path.display()
4656 )
4657 })?
4658 } else if archive.is_none() {
4659 bail!("inline component {} missing at {}", spec.id, path.display());
4660 } else {
4661 read_entry(
4662 archive.as_mut().expect("archive present when needed"),
4663 wasm_path,
4664 )
4665 .with_context(|| {
4666 format!(
4667 "inline component {} missing at {} in pack archive",
4668 spec.id, wasm_path
4669 )
4670 })?
4671 }
4672 } else if let Some(archive) = archive.as_mut() {
4673 read_entry(archive, wasm_path).with_context(|| {
4674 format!(
4675 "inline component {} missing at {} in pack archive",
4676 spec.id, wasm_path
4677 )
4678 })?
4679 } else {
4680 bail!(
4681 "inline component {} missing and no pack source available",
4682 spec.id
4683 );
4684 }
4685 }
4686 ComponentArtifactLocation::Remote => {
4687 if source.source.is_tag() {
4688 bail!(
4689 "component {} uses tag ref {} but is not bundled; rebuild the pack",
4690 spec.id,
4691 source.source
4692 );
4693 }
4694 let client = dist_client.get_or_insert_with(|| {
4695 DistClient::new(dist_options_from(component_resolution))
4696 });
4697 let reference = source.source.to_string();
4698 fault::maybe_fail_asset(&reference)
4699 .await
4700 .with_context(|| format!("fault injection blocked asset {reference}"))?;
4701 let digest = source.digest.as_deref().ok_or_else(|| {
4702 anyhow!(
4703 "component {} missing expected digest for remote component",
4704 spec.id
4705 )
4706 })?;
4707 let cache_path = if let Ok(cache_path) = client.fetch_digest(digest).await {
4708 cache_path
4709 } else if component_resolution.dist_offline {
4710 client
4711 .fetch_digest(digest)
4712 .await
4713 .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?
4714 } else {
4715 let source = client
4716 .parse_source(&reference)
4717 .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?;
4718 let descriptor = client
4719 .resolve(source, ResolvePolicy)
4720 .await
4721 .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?;
4722 let resolved = client
4723 .fetch(&descriptor, CachePolicy)
4724 .await
4725 .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?;
4726 let expected = normalize_digest(digest);
4727 let actual = normalize_digest(&resolved.digest);
4728 if expected != actual {
4729 bail!(
4730 "component {} digest mismatch after fetch: expected {}, got {}",
4731 spec.id,
4732 expected,
4733 actual
4734 );
4735 }
4736 resolved.cache_path.ok_or_else(|| {
4737 anyhow!(
4738 "component {} resolved from {} but cache path is missing",
4739 spec.id,
4740 reference
4741 )
4742 })?
4743 };
4744 std::fs::read(&cache_path).with_context(|| {
4745 format!(
4746 "failed to read cached component {} from {}",
4747 spec.id,
4748 cache_path.display()
4749 )
4750 })?
4751 }
4752 };
4753
4754 if let Some(expected) = source.expected_wasm_sha256.as_deref() {
4755 verify_wasm_sha256(&spec.id, expected, &bytes)?;
4756 } else if source.skip_digest_verification {
4757 let actual = compute_sha256_digest_for(&bytes);
4758 warn!(
4759 component_id = %spec.id,
4760 digest = %actual,
4761 "bundled component missing wasm_sha256; allowing due to flag"
4762 );
4763 } else {
4764 let expected = source.digest.as_deref().ok_or_else(|| {
4765 anyhow!(
4766 "component {} missing expected digest for verification",
4767 spec.id
4768 )
4769 })?;
4770 verify_component_digest(&spec.id, expected, &bytes)?;
4771 }
4772 let component =
4773 compile_component_with_cache(cache, engine, source.digest.as_deref(), bytes)
4774 .await
4775 .with_context(|| format!("failed to compile component {}", spec.id))?;
4776 into.insert(
4777 spec.id.clone(),
4778 PackComponent {
4779 name: spec.id.clone(),
4780 version: spec.version.clone(),
4781 component,
4782 },
4783 );
4784 missing.remove(&spec.id);
4785 }
4786
4787 Ok(())
4788}
4789
4790fn dist_error_for_component(err: DistError, component_id: &str, reference: &str) -> anyhow::Error {
4791 match err {
4792 DistError::NotFound { reference: missing } => anyhow!(
4793 "remote component {} is not cached for {}. Run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
4794 component_id,
4795 missing,
4796 reference
4797 ),
4798 DistError::Offline { reference: blocked } => anyhow!(
4799 "offline mode blocked fetching component {} from {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
4800 component_id,
4801 blocked,
4802 reference
4803 ),
4804 DistError::Unauthorized { target } => anyhow!(
4805 "component {} requires authenticated source {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
4806 component_id,
4807 target,
4808 reference
4809 ),
4810 other => anyhow!(
4811 "failed to resolve component {} from {}: {}",
4812 component_id,
4813 reference,
4814 other
4815 ),
4816 }
4817}
4818
4819async fn load_components_from_overrides(
4820 cache: &CacheManager,
4821 engine: &Engine,
4822 overrides: &HashMap<String, PathBuf>,
4823 specs: &[ComponentSpec],
4824 missing: &mut HashSet<String>,
4825 into: &mut HashMap<String, PackComponent>,
4826) -> Result<()> {
4827 for spec in specs {
4828 if !missing.contains(&spec.id) {
4829 continue;
4830 }
4831 let Some(path) = overrides.get(&spec.id) else {
4832 continue;
4833 };
4834 let bytes = std::fs::read(path)
4835 .with_context(|| format!("failed to read override component {}", path.display()))?;
4836 let component = compile_component_with_cache(cache, engine, None, bytes)
4837 .await
4838 .with_context(|| {
4839 format!(
4840 "failed to compile component {} from override {}",
4841 spec.id,
4842 path.display()
4843 )
4844 })?;
4845 into.insert(
4846 spec.id.clone(),
4847 PackComponent {
4848 name: spec.id.clone(),
4849 version: spec.version.clone(),
4850 component,
4851 },
4852 );
4853 missing.remove(&spec.id);
4854 }
4855 Ok(())
4856}
4857
4858async fn load_components_from_dir(
4859 cache: &CacheManager,
4860 engine: &Engine,
4861 root: &Path,
4862 specs: &[ComponentSpec],
4863 missing: &mut HashSet<String>,
4864 into: &mut HashMap<String, PackComponent>,
4865) -> Result<()> {
4866 for spec in specs {
4867 if !missing.contains(&spec.id) {
4868 continue;
4869 }
4870 let path = component_path_for_spec(root, spec);
4871 if !path.exists() {
4872 tracing::debug!(component = %spec.id, path = %path.display(), "materialized component missing; will try other sources");
4873 continue;
4874 }
4875 let bytes = std::fs::read(&path)
4876 .with_context(|| format!("failed to read component {}", path.display()))?;
4877 let component = compile_component_with_cache(cache, engine, None, bytes)
4878 .await
4879 .with_context(|| {
4880 format!(
4881 "failed to compile component {} from {}",
4882 spec.id,
4883 path.display()
4884 )
4885 })?;
4886 into.insert(
4887 spec.id.clone(),
4888 PackComponent {
4889 name: spec.id.clone(),
4890 version: spec.version.clone(),
4891 component,
4892 },
4893 );
4894 missing.remove(&spec.id);
4895 }
4896 Ok(())
4897}
4898
4899async fn load_components_from_archive(
4900 cache: &CacheManager,
4901 engine: &Engine,
4902 path: &Path,
4903 specs: &[ComponentSpec],
4904 missing: &mut HashSet<String>,
4905 into: &mut HashMap<String, PackComponent>,
4906) -> Result<()> {
4907 let mut archive = ZipArchive::new(File::open(path)?)
4908 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
4909 for spec in specs {
4910 if !missing.contains(&spec.id) {
4911 continue;
4912 }
4913 let file_name = spec
4914 .legacy_path
4915 .clone()
4916 .unwrap_or_else(|| format!("components/{}.wasm", spec.id));
4917 let bytes = match read_entry(&mut archive, &file_name) {
4918 Ok(bytes) => bytes,
4919 Err(err) => {
4920 warn!(component = %spec.id, pack = %path.display(), error = %err, "component entry missing in pack archive");
4921 continue;
4922 }
4923 };
4924 let component = compile_component_with_cache(cache, engine, None, bytes)
4925 .await
4926 .with_context(|| format!("failed to compile component {}", spec.id))?;
4927 into.insert(
4928 spec.id.clone(),
4929 PackComponent {
4930 name: spec.id.clone(),
4931 version: spec.version.clone(),
4932 component,
4933 },
4934 );
4935 missing.remove(&spec.id);
4936 }
4937 Ok(())
4938}
4939
4940#[cfg(test)]
4941mod tests {
4942 use super::*;
4943 use greentic_flow::model::{FlowDoc, NodeDoc};
4944 use indexmap::IndexMap;
4945 use serde_json::json;
4946
4947 #[test]
4948 fn normalizes_raw_component_to_component_exec() {
4949 let mut nodes = IndexMap::new();
4950 let mut raw = IndexMap::new();
4951 raw.insert(
4952 "templating.handlebars".into(),
4953 json!({ "template": "Hi {{name}}" }),
4954 );
4955 nodes.insert(
4956 "start".into(),
4957 NodeDoc {
4958 raw,
4959 routing: json!([{"out": true}]),
4960 ..Default::default()
4961 },
4962 );
4963 let doc = FlowDoc {
4964 id: "welcome".into(),
4965 title: None,
4966 description: None,
4967 flow_type: "messaging".into(),
4968 start: Some("start".into()),
4969 parameters: json!({}),
4970 tags: Vec::new(),
4971 schema_version: None,
4972 entrypoints: IndexMap::new(),
4973 meta: None,
4974 slot_schema: None,
4975 nodes,
4976 };
4977
4978 let normalized = normalize_flow_doc(doc);
4979 let node = normalized.nodes.get("start").expect("node exists");
4980 assert_eq!(node.operation.as_deref(), Some("component.exec"));
4981 assert!(node.raw.is_empty());
4982 let payload = node.payload.as_object().expect("payload object");
4983 assert_eq!(
4984 payload.get("component"),
4985 Some(&Value::String("templating.handlebars".into()))
4986 );
4987 assert_eq!(
4988 payload.get("operation"),
4989 Some(&Value::String("render".into()))
4990 );
4991 let input = payload.get("input").unwrap();
4992 assert_eq!(input, &json!({ "template": "Hi {{name}}" }));
4993 }
4994
4995 #[test]
4996 fn normalizes_canonical_operation_node_to_component_exec_with_config() {
4997 let mut nodes = IndexMap::new();
4998 let mut raw = IndexMap::new();
4999 raw.insert(
5000 "handle_message".into(),
5001 json!({
5002 "component": "oci://ghcr.io/greenticai/component/component-llm-openai:stable",
5003 "config": {
5004 "provider": "ollama",
5005 "base_url": "http://127.0.0.1:11434/v1",
5006 "default_model": "llama3.2"
5007 },
5008 "input": {
5009 "messages": [{
5010 "role": "user",
5011 "content": "Say hello from Ollama."
5012 }]
5013 }
5014 }),
5015 );
5016 nodes.insert(
5017 "llm".into(),
5018 NodeDoc {
5019 raw,
5020 routing: json!([{"out": true}]),
5021 ..Default::default()
5022 },
5023 );
5024 let doc = FlowDoc {
5025 id: "ollama-repro".into(),
5026 title: None,
5027 description: None,
5028 flow_type: "messaging".into(),
5029 start: Some("llm".into()),
5030 parameters: json!({}),
5031 tags: Vec::new(),
5032 schema_version: None,
5033 entrypoints: IndexMap::new(),
5034 meta: None,
5035 slot_schema: None,
5036 nodes,
5037 };
5038
5039 let normalized = normalize_flow_doc(doc);
5040 let node = normalized.nodes.get("llm").expect("node exists");
5041 assert_eq!(node.operation.as_deref(), Some("component.exec"));
5042 assert!(node.raw.is_empty());
5043 let payload = node.payload.as_object().expect("payload object");
5044 assert_eq!(
5045 payload.get("component"),
5046 Some(&Value::String(
5047 "oci://ghcr.io/greenticai/component/component-llm-openai:stable".into()
5048 ))
5049 );
5050 assert_eq!(
5051 payload.get("operation"),
5052 Some(&Value::String("handle_message".into()))
5053 );
5054 assert_eq!(
5055 payload.get("config"),
5056 Some(&json!({
5057 "provider": "ollama",
5058 "base_url": "http://127.0.0.1:11434/v1",
5059 "default_model": "llama3.2"
5060 }))
5061 );
5062 assert_eq!(
5063 payload.get("input"),
5064 Some(&json!({
5065 "messages": [{
5066 "role": "user",
5067 "content": "Say hello from Ollama."
5068 }]
5069 }))
5070 );
5071 }
5072
5073 #[test]
5074 fn missing_export_error_detection_recognises_bindgen_shapes() {
5075 assert!(is_missing_export_error(
5077 "instantiation: no exported instance named \
5078 `greentic:provider-instance-identity/instance-identity-api@0.1.0`"
5079 ));
5080 assert!(is_missing_export_error(
5082 "instantiation: no exported function named `identify-instance`"
5083 ));
5084 assert!(!is_missing_export_error(
5086 "Wasm trap: out of bounds memory access"
5087 ));
5088 assert!(!is_missing_export_error(
5091 "instantiation: no exported instance named \
5092 `greentic:provider-schema-core/schema-core-api@1.0.0`"
5093 ));
5094 assert!(!is_missing_export_error(
5096 "instantiation: no exported function named `invoke`"
5097 ));
5098 }
5099
5100 #[test]
5101 fn identify_outcome_merge_in_follows_lattice() {
5102 let unsupported = || IdentifyOutcome::Unsupported;
5103 let no_match = || IdentifyOutcome::NoMatch;
5104 let id_a = || IdentifyOutcome::Identified("a".to_string());
5105 let id_b = || IdentifyOutcome::Identified("b".to_string());
5106
5107 let mut x = unsupported();
5109 x.merge_in(unsupported());
5110 assert_eq!(x, unsupported());
5111 let mut x = unsupported();
5112 x.merge_in(no_match());
5113 assert_eq!(x, no_match());
5114 let mut x = unsupported();
5115 x.merge_in(id_a());
5116 assert_eq!(x, id_a());
5117
5118 let mut x = no_match();
5120 x.merge_in(unsupported());
5121 assert_eq!(x, no_match(), "NoMatch must not downgrade to Unsupported");
5122 let mut x = no_match();
5123 x.merge_in(no_match());
5124 assert_eq!(x, no_match());
5125 let mut x = no_match();
5126 x.merge_in(id_a());
5127 assert_eq!(x, id_a(), "Identified must override NoMatch");
5128
5129 let mut x = id_a();
5131 x.merge_in(unsupported());
5132 assert_eq!(x, id_a());
5133 let mut x = id_a();
5134 x.merge_in(no_match());
5135 assert_eq!(x, id_a());
5136 let mut x = id_a();
5137 x.merge_in(id_b());
5138 assert_eq!(
5139 x,
5140 id_a(),
5141 "first Identified wins; later id does not replace"
5142 );
5143 }
5144}
5145
5146#[cfg(test)]
5147mod identify_endpoints_pack_tests {
5148 use super::*;
5149 use crate::config::{
5150 FlowRetryConfig, HostConfig, OperatorPolicy, RateLimits, SecretsPolicy, StateStorePolicy,
5151 WebhookPolicy,
5152 };
5153 use crate::trace::TraceConfig;
5154 use crate::validate::ValidationConfig;
5155
5156 fn test_host_config() -> HostConfig {
5157 HostConfig {
5158 tenant: "test".to_string(),
5159 bindings_path: PathBuf::from("/tmp/bindings.yaml"),
5160 flow_type_bindings: HashMap::new(),
5161 rate_limits: RateLimits::default(),
5162 retry: FlowRetryConfig::default(),
5163 http_enabled: false,
5164 secrets_policy: SecretsPolicy::allow_all(),
5165 state_store_policy: StateStorePolicy::default(),
5166 webhook_policy: WebhookPolicy::default(),
5167 timers: Vec::new(),
5168 oauth: None,
5169 mocks: None,
5170 pack_bindings: Vec::new(),
5171 env_passthrough: Vec::new(),
5172 trace: TraceConfig::from_env(),
5173 validation: ValidationConfig::from_env(),
5174 operator_policy: OperatorPolicy::allow_all(),
5175 fast2flow: Default::default(),
5176 }
5177 }
5178
5179 #[tokio::test]
5180 async fn no_manifest_returns_unsupported_for_all_types() {
5181 let pack = PackRuntime::for_component_test(
5187 Vec::new(),
5188 HashMap::new(),
5189 "test-pack",
5190 Arc::new(test_host_config()),
5191 )
5192 .expect("empty pack construction");
5193 let result = pack
5194 .identify_endpoints_by_provider_type(&["teams", "slack", "telegram"], b"{}")
5195 .await
5196 .expect("no-manifest path must succeed");
5197 assert_eq!(result.len(), 3);
5198 for ty in &["teams", "slack", "telegram"] {
5199 assert_eq!(
5200 result.get(*ty),
5201 Some(&IdentifyOutcome::Unsupported),
5202 "type '{ty}' must be Unsupported when pack has no manifest"
5203 );
5204 }
5205 }
5206
5207 #[tokio::test]
5208 async fn empty_provider_types_returns_empty_map() {
5209 let pack = PackRuntime::for_component_test(
5210 Vec::new(),
5211 HashMap::new(),
5212 "test-pack",
5213 Arc::new(test_host_config()),
5214 )
5215 .expect("empty pack construction");
5216 let result = pack
5217 .identify_endpoints_by_provider_type(&[], b"{}")
5218 .await
5219 .expect("empty types fast path");
5220 assert!(result.is_empty());
5221 }
5222}
5223
5224#[derive(Clone, Debug, Default, Serialize, Deserialize)]
5225pub struct PackMetadata {
5226 pub pack_id: String,
5227 pub version: String,
5228 #[serde(default)]
5229 pub entry_flows: Vec<String>,
5230 #[serde(default)]
5231 pub secret_requirements: Vec<greentic_types::SecretRequirement>,
5232}
5233
5234impl PackMetadata {
5235 fn from_wasm(bytes: &[u8]) -> Option<Self> {
5236 let parser = Parser::new(0);
5237 for payload in parser.parse_all(bytes) {
5238 let payload = payload.ok()?;
5239 match payload {
5240 Payload::CustomSection(section) => {
5241 if section.name() == "greentic.manifest"
5242 && let Ok(meta) = Self::from_bytes(section.data())
5243 {
5244 return Some(meta);
5245 }
5246 }
5247 Payload::DataSection(reader) => {
5248 for segment in reader.into_iter().flatten() {
5249 if let Ok(meta) = Self::from_bytes(segment.data) {
5250 return Some(meta);
5251 }
5252 }
5253 }
5254 _ => {}
5255 }
5256 }
5257 None
5258 }
5259
5260 fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
5261 #[derive(Deserialize)]
5262 struct RawManifest {
5263 pack_id: String,
5264 version: String,
5265 #[serde(default)]
5266 entry_flows: Vec<String>,
5267 #[serde(default)]
5268 flows: Vec<RawFlow>,
5269 #[serde(default)]
5270 secret_requirements: Vec<greentic_types::SecretRequirement>,
5271 }
5272
5273 #[derive(Deserialize)]
5274 struct RawFlow {
5275 id: String,
5276 }
5277
5278 let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
5279 let mut entry_flows = if manifest.entry_flows.is_empty() {
5280 manifest.flows.iter().map(|f| f.id.clone()).collect()
5281 } else {
5282 manifest.entry_flows.clone()
5283 };
5284 entry_flows.retain(|id| !id.is_empty());
5285 Ok(Self {
5286 pack_id: manifest.pack_id,
5287 version: manifest.version,
5288 entry_flows,
5289 secret_requirements: manifest.secret_requirements,
5290 })
5291 }
5292
5293 pub fn fallback(path: &Path) -> Self {
5294 let pack_id = path
5295 .file_stem()
5296 .map(|s| s.to_string_lossy().into_owned())
5297 .unwrap_or_else(|| "unknown-pack".to_string());
5298 Self {
5299 pack_id,
5300 version: "0.0.0".to_string(),
5301 entry_flows: Vec::new(),
5302 secret_requirements: Vec::new(),
5303 }
5304 }
5305
5306 pub fn from_manifest(manifest: &greentic_types::PackManifest) -> Self {
5307 let entry_flows = manifest
5308 .flows
5309 .iter()
5310 .map(|flow| flow.id.as_str().to_string())
5311 .collect::<Vec<_>>();
5312 Self {
5313 pack_id: manifest.pack_id.as_str().to_string(),
5314 version: manifest.version.to_string(),
5315 entry_flows,
5316 secret_requirements: manifest.secret_requirements.clone(),
5317 }
5318 }
5319}