1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::cache::{ArtifactKey, CacheConfig, CacheManager, CpuPolicy, EngineProfile};
10use crate::component_api::{
11 self, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
12};
13use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
14use crate::provider::{ProviderBinding, ProviderRegistry};
15use crate::provider_core::{
16 schema_core::SchemaCorePre as LegacySchemaCorePre,
17 schema_core_path::SchemaCorePre as PathSchemaCorePre,
18 schema_core_schema::SchemaCorePre as SchemaSchemaCorePre,
19};
20use crate::provider_core_only;
21use crate::runtime_wasmtime::{Component, Engine, InstancePre, Linker, ResourceTable};
22use anyhow::{Context, Result, anyhow, bail};
23use futures::executor::block_on;
24use greentic_distributor_client::dist::{
25 CachePolicy, DistClient, DistError, DistOptions, ResolvePolicy,
26};
27use greentic_interfaces_wasmtime::host_helpers::v1::{
28 self as host_v1, HostFns, add_all_v1_to_linker,
29 runner_host_http::RunnerHostHttp,
30 runner_host_kv::RunnerHostKv,
31 secrets_store::{SecretsError, SecretsErrorV1_1, SecretsStoreHost, SecretsStoreHostV1_1},
32 state_store::{
33 OpAck as StateOpAck, StateKey as HostStateKey, StateStoreError as StateError,
34 StateStoreHost, TenantCtx as StateTenantCtx,
35 },
36 telemetry_logger::{
37 OpAck as TelemetryAck, SpanContext as TelemetrySpanContext,
38 TelemetryLoggerError as TelemetryError, TelemetryLoggerHost,
39 TenantCtx as TelemetryTenantCtx,
40 },
41};
42use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::http::http_client as http_client_client_alias;
43use greentic_interfaces_wasmtime::{
44 http_client_client_v1_0::greentic::interfaces_types::types as http_types_v1_0,
45 http_client_client_v1_1::greentic::interfaces_types::types as http_types_v1_1,
46};
47use greentic_pack::builder as legacy_pack;
48use greentic_types::flow::FlowHasher;
49use greentic_types::{
50 ArtifactLocationV1, ComponentId, ComponentManifest, ComponentSourceRef, ComponentSourcesV1,
51 EXT_COMPONENT_SOURCES_V1, EnvId, ExtensionRef, Flow, FlowComponentRef, FlowId, FlowKind,
52 FlowMetadata, InputMapping, Node, NodeId, OutputMapping, Routing, StateKey as StoreStateKey,
53 TeamId, TelemetryHints, TenantCtx as TypesTenantCtx, TenantId, UserId, decode_pack_manifest,
54 pack_manifest::ExtensionInline,
55};
56use host_v1::http_client as host_http_client;
57use host_v1::http_client::{
58 HttpClientError, HttpClientErrorV1_1, HttpClientHost, HttpClientHostV1_1,
59 Request as HttpRequest, RequestOptionsV1_1 as HttpRequestOptionsV1_1,
60 RequestV1_1 as HttpRequestV1_1, Response as HttpResponse, ResponseV1_1 as HttpResponseV1_1,
61 TenantCtx as HttpTenantCtx, TenantCtxV1_1 as HttpTenantCtxV1_1,
62};
63use indexmap::IndexMap;
64use once_cell::sync::Lazy;
65use parking_lot::{Mutex, RwLock};
66use reqwest::blocking::Client as BlockingClient;
67use runner_core::normalize_under_root;
68use serde::{Deserialize, Serialize};
69use serde_cbor;
70use serde_json::{self, Value};
71use sha2::Digest;
72use tempfile::TempDir;
73use tokio::fs;
74use wasmparser::{Parser, Payload};
75use wasmtime::{Store, StoreContextMut};
76use wasmtime_wasi_http::WasiHttpCtx;
77use wasmtime_wasi_http::p2::{
78 WasiHttpCtxView, WasiHttpView, add_only_http_to_linker_sync as add_wasi_http_to_linker,
79};
80use wasmtime_wasi_tls::{LinkOptions, WasiTls, WasiTlsCtx, WasiTlsCtxBuilder};
81use zip::ZipArchive;
82
83use crate::runner::engine::{FlowContext, FlowEngine, FlowStatus};
84use crate::runner::flow_adapter::{FlowIR, flow_doc_to_ir, flow_ir_to_flow};
85use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
86#[cfg(feature = "fault-injection")]
87use crate::testing::fault_injection::{FaultContext, FaultPoint, maybe_fail};
88
89use crate::config::HostConfig;
90use crate::fault;
91use crate::secrets::{DynSecretsManager, read_secret_blocking, write_secret_blocking};
92use crate::storage::state::STATE_PREFIX;
93use crate::storage::{DynSessionStore, DynStateStore};
94use crate::verify;
95use crate::wasi::{PreopenSpec, RunnerWasiPolicy};
96use tracing::warn;
97use wasmtime_wasi::p2::add_to_linker_sync as add_wasi_to_linker;
98use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
99
100use greentic_flow::model::FlowDoc;
101
/// Host-side runtime state for a single pack.
///
/// NOTE(review): the code that builds and consumes this struct is outside the visible
/// chunk. The field notes below are derived from names and types only — confirm them
/// against the constructor before relying on them.
// `dead_code` is silenced for the whole struct; presumably some fields are only held to
// keep resources alive or are read under specific features — TODO confirm and narrow.
#[allow(dead_code)]
pub struct PackRuntime {
    /// Pack location (presumably the path the pack was loaded from — TODO confirm).
    path: PathBuf,
    /// Optional archive path (presumably set when the pack came from an archive).
    archive_path: Option<PathBuf>,
    /// Shared host configuration.
    config: Arc<HostConfig>,
    /// Wasmtime engine handle.
    engine: Engine,
    /// Pack metadata.
    metadata: PackMetadata,
    /// Current-format pack manifest, when one is available.
    manifest: Option<greentic_types::PackManifest>,
    /// Legacy-format pack manifest, when one is available (boxed).
    legacy_manifest: Option<Box<legacy_pack::PackManifest>>,
    /// Component manifests keyed by string (presumably the component id — TODO confirm).
    component_manifests: HashMap<String, ComponentManifest>,
    /// Optional mock layer.
    mocks: Option<Arc<MockLayer>>,
    /// Flows belonging to the pack, when loaded.
    flows: Option<PackFlows>,
    /// Components keyed by string (presumably the component reference — TODO confirm).
    components: HashMap<String, PackComponent>,
    /// Shared blocking HTTP client.
    http_client: Arc<BlockingClient>,
    /// Mutex-guarded map of pre-instantiated components, keyed by string.
    pre_cache: Mutex<HashMap<String, InstancePre<ComponentState>>>,
    /// Optional session store.
    session_store: Option<DynSessionStore>,
    /// Optional state store.
    state_store: Option<DynStateStore>,
    /// Shared WASI policy.
    wasi_policy: Arc<RunnerWasiPolicy>,
    /// Optional temporary directory (presumably holding extracted assets; a `TempDir`
    /// removes its directory when dropped).
    assets_tempdir: Option<TempDir>,
    /// Provider registry behind a read-write lock; `None` until populated.
    provider_registry: RwLock<Option<ProviderRegistry>>,
    /// Secrets manager handle.
    secrets: DynSecretsManager,
    /// Optional OAuth broker configuration.
    oauth_config: Option<OAuthBrokerConfig>,
    /// Cache manager.
    cache: CacheManager,
}
128
/// A component entry of a pack: identifying strings plus the shared compiled component.
struct PackComponent {
    /// Component name; currently unread, hence the `dead_code` allowance.
    #[allow(dead_code)]
    name: String,
    /// Component version string; currently unread, hence the `dead_code` allowance.
    #[allow(dead_code)]
    version: String,
    /// Reference-counted handle to the Wasmtime `Component`.
    component: Arc<Component>,
}
136
137fn run_on_wasi_thread<F, T>(task_name: &'static str, task: F) -> Result<T>
138where
139 F: FnOnce() -> Result<T> + Send + 'static,
140 T: Send + 'static,
141{
142 let builder = std::thread::Builder::new().name(format!("greentic-wasmtime-{task_name}"));
143 let handle = builder
144 .spawn(move || {
145 let pid = std::process::id();
146 let thread_id = std::thread::current().id();
147 let tokio_handle_present = tokio::runtime::Handle::try_current().is_ok();
148 tracing::info!(
149 event = "wasmtime.thread.start",
150 task = task_name,
151 pid,
152 thread_id = ?thread_id,
153 tokio_handle_present,
154 "starting Wasmtime thread"
155 );
156 task()
157 })
158 .context("failed to spawn Wasmtime thread")?;
159 handle
160 .join()
161 .map_err(|err| {
162 let reason = if let Some(msg) = err.downcast_ref::<&str>() {
163 msg.to_string()
164 } else if let Some(msg) = err.downcast_ref::<String>() {
165 msg.clone()
166 } else {
167 "unknown panic".to_string()
168 };
169 anyhow!("Wasmtime thread panicked: {reason}")
170 })
171 .and_then(|res| res)
172}
173
/// Options that control how pack components are located.
///
/// `Default` yields no materialized root, no overrides, no cache directory and both
/// flags `false`.
///
/// NOTE(review): the consumers of these fields are outside the visible chunk; the
/// per-field notes are inferred from names and types — confirm against the resolver.
#[derive(Debug, Default, Clone)]
pub struct ComponentResolution {
    /// Optional root directory (presumably holding already-materialized components).
    pub materialized_root: Option<PathBuf>,
    /// Per-key path overrides (presumably keyed by component id or reference).
    pub overrides: HashMap<String, PathBuf>,
    /// Offline switch (presumably disables network access in the distributor client).
    pub dist_offline: bool,
    /// Optional cache directory (presumably for the distributor client).
    pub dist_cache_dir: Option<PathBuf>,
    /// Presumably tolerates components that carry no content hash — TODO confirm.
    pub allow_missing_hash: bool,
}
187
188fn build_blocking_client() -> BlockingClient {
189 std::thread::spawn(|| {
190 BlockingClient::builder()
191 .no_proxy()
192 .build()
193 .expect("blocking client")
194 })
195 .join()
196 .expect("client build thread panicked")
197}
198
199fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
200 let (root, candidate) = if path.is_absolute() {
201 let parent = path
202 .parent()
203 .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
204 let root = parent
205 .canonicalize()
206 .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
207 let file = path
208 .file_name()
209 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
210 (root, PathBuf::from(file))
211 } else {
212 let cwd = std::env::current_dir().context("failed to resolve current directory")?;
213 let base = if let Some(parent) = path.parent() {
214 cwd.join(parent)
215 } else {
216 cwd
217 };
218 let root = base
219 .canonicalize()
220 .with_context(|| format!("failed to canonicalize {}", base.display()))?;
221 let file = path
222 .file_name()
223 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
224 (root, PathBuf::from(file))
225 };
226 let safe = normalize_under_root(&root, &candidate)?;
227 Ok((root, safe))
228}
229
/// Shared blocking HTTP client, created lazily by `build_blocking_client` on first
/// access and handed out behind an `Arc`.
static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
231
/// Serializable summary of a flow.
///
/// With serde, `flow_type` is exchanged under the key `type`, and `description` falls
/// back to `None` when the key is missing on input.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowDescriptor {
    /// Flow identifier.
    pub id: String,
    /// Flow type; serialized as `type`.
    #[serde(rename = "type")]
    pub flow_type: String,
    /// Identifier of the pack the flow belongs to.
    pub pack_id: String,
    /// Profile string (semantics not visible here — TODO confirm).
    pub profile: String,
    /// Version string.
    pub version: String,
    /// Optional free-form description.
    #[serde(default)]
    pub description: Option<String>,
}
243
/// Per-invocation host state exposed to a component through the host interfaces
/// implemented in this file (secrets, HTTP client, state store, telemetry, runner HTTP
/// and KV).
pub struct HostState {
    /// Pack identifier; passed to the secrets manager as the scope for reads and writes.
    #[allow(dead_code)]
    pack_id: String,
    /// Shared host configuration (tenant, secrets policy, `http_enabled`, ...).
    config: Arc<HostConfig>,
    /// Blocking HTTP client used for outbound requests.
    http_client: Arc<BlockingClient>,
    /// Environment name used when a state-store call carries no tenant context; taken
    /// from `GREENTIC_ENV` (falling back to `local`) in `HostState::new`.
    default_env: String,
    /// Optional session store; not read by the code visible in this chunk.
    #[allow(dead_code)]
    session_store: Option<DynSessionStore>,
    /// Optional state store backing the `StateStoreHost` implementation.
    state_store: Option<DynStateStore>,
    /// Optional mock layer consulted for secrets, HTTP and telemetry.
    mocks: Option<Arc<MockLayer>>,
    /// Secrets manager handle used for blocking secret reads and writes.
    secrets: DynSecretsManager,
    /// Optional OAuth broker configuration.
    oauth_config: Option<OAuthBrokerConfig>,
    /// OAuth broker host; default-constructed in `HostState::new`.
    oauth_host: OAuthBrokerHost,
    /// Execution context of the current invocation, when known; supplies team, user,
    /// flow, node, correlation and trace identifiers for tenant contexts.
    exec_ctx: Option<ComponentExecCtx>,
    /// Reference of the component being executed; `None` is treated as "the pack
    /// itself" by the secret-write checks.
    component_ref: Option<String>,
    /// Marks a provider-core component; such components may still write secrets while
    /// provider-core-only mode is enabled.
    provider_core_component: bool,
}
261
262impl HostState {
263 #[allow(clippy::default_constructed_unit_structs)]
264 #[allow(clippy::too_many_arguments)]
265 pub fn new(
266 pack_id: String,
267 config: Arc<HostConfig>,
268 http_client: Arc<BlockingClient>,
269 mocks: Option<Arc<MockLayer>>,
270 session_store: Option<DynSessionStore>,
271 state_store: Option<DynStateStore>,
272 secrets: DynSecretsManager,
273 oauth_config: Option<OAuthBrokerConfig>,
274 exec_ctx: Option<ComponentExecCtx>,
275 component_ref: Option<String>,
276 provider_core_component: bool,
277 ) -> Result<Self> {
278 let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
279 Ok(Self {
280 pack_id,
281 config,
282 http_client,
283 default_env,
284 session_store,
285 state_store,
286 mocks,
287 secrets,
288 oauth_config,
289 oauth_host: OAuthBrokerHost::default(),
290 exec_ctx,
291 component_ref,
292 provider_core_component,
293 })
294 }
295
296 fn instantiate_component_result(
297 linker: &mut Linker<ComponentState>,
298 store: &mut Store<ComponentState>,
299 component: &Component,
300 ctx: &ComponentExecCtx,
301 component_ref: &str,
302 operation: &str,
303 input_json: &str,
304 ) -> Result<InvokeResult> {
305 let pre_instance = linker.instantiate_pre(component)?;
306 match component_api::v0_6::ComponentPre::new(pre_instance) {
307 Ok(pre) => {
308 let envelope = component_api::envelope_v0_6(ctx, component_ref, input_json)?;
309 let operation_owned = operation.to_string();
310 let result = block_on(async {
311 let bindings = pre.instantiate_async(&mut *store).await?;
312 let node = bindings.greentic_component_node();
313 node.call_invoke(&mut *store, &operation_owned, &envelope)
314 })?;
315 component_api::invoke_result_from_v0_6(result)
316 }
317 Err(err_v06) => {
318 if !is_missing_node_export(&err_v06, "0.6.0") {
319 return Err(err_v06.into());
320 }
321 let pre_instance = linker.instantiate_pre(component)?;
322 match component_api::v0_5::ComponentPre::new(pre_instance) {
323 Ok(pre) => {
324 let result = block_on(async {
325 let bindings = pre.instantiate_async(&mut *store).await?;
326 let node = bindings.greentic_component_node();
327 let ctx_v05 = component_api::exec_ctx_v0_5(ctx);
328 let operation_owned = operation.to_string();
329 let input_owned = input_json.to_string();
330 node.call_invoke(&mut *store, &ctx_v05, &operation_owned, &input_owned)
331 })?;
332 Ok(component_api::invoke_result_from_v0_5(result))
333 }
334 Err(err) => {
335 if !is_missing_node_export(&err, "0.5.0") {
336 return Err(err.into());
337 }
338 let pre_instance = linker.instantiate_pre(component)?;
339 match component_api::v0_4::ComponentPre::new(pre_instance) {
340 Ok(pre) => {
341 let result = block_on(async {
342 let bindings = pre.instantiate_async(&mut *store).await?;
343 let node = bindings.greentic_component_node();
344 let ctx_v04 = component_api::exec_ctx_v0_4(ctx);
345 let operation_owned = operation.to_string();
346 let input_owned = input_json.to_string();
347 node.call_invoke(
348 &mut *store,
349 &ctx_v04,
350 &operation_owned,
351 &input_owned,
352 )
353 })?;
354 Ok(component_api::invoke_result_from_v0_4(result))
355 }
356 Err(err_v04) => {
357 if is_missing_node_export(&err_v04, "0.4.0") {
358 Self::try_v06_runtime(linker, store, component, input_json)
359 } else {
360 Err(err_v04.into())
361 }
362 }
363 }
364 }
365 }
366 }
367 }
368 }
369
370 fn try_v06_runtime(
373 linker: &mut Linker<ComponentState>,
374 store: &mut Store<ComponentState>,
375 component: &Component,
376 input_json: &str,
377 ) -> Result<InvokeResult> {
378 let pre_instance = linker.instantiate_pre(component)?;
379 let pre = component_api::v0_6_runtime::ComponentV0V6RuntimePre::new(pre_instance).map_err(
380 |err| err.context("component exports neither node@0.5/0.4 nor component-runtime@0.6"),
381 )?;
382
383 let result = block_on(async {
384 let bindings = pre.instantiate_async(&mut *store).await?;
385 let runtime = bindings.greentic_component_component_runtime();
386
387 let input_value: Value = serde_json::from_str(input_json).unwrap_or(Value::Null);
389 let input_cbor =
390 serde_cbor::to_vec(&input_value).context("encode input as CBOR for v0.6")?;
391 let empty_state = serde_cbor::to_vec(&Value::Object(Default::default()))
392 .context("encode empty state")?;
393
394 let run_result = runtime
395 .call_run(&mut *store, &input_cbor, &empty_state)
396 .map_err(|err| err.context("v0.6 component-runtime::run call failed"))?;
397
398 let output_value: Value = serde_cbor::from_slice(&run_result.output)
400 .context("decode v0.6 run output CBOR")?;
401 let output_json = serde_json::to_string(&output_value)
402 .context("serialize v0.6 run output to JSON")?;
403
404 Ok::<_, anyhow::Error>(output_json)
405 })?;
406
407 Ok(InvokeResult::Ok(result))
408 }
409
410 fn convert_invoke_result(result: InvokeResult) -> Result<Value> {
411 match result {
412 InvokeResult::Ok(body) => {
413 if body.is_empty() {
414 return Ok(Value::Null);
415 }
416 serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
417 }
418 InvokeResult::Err(NodeError {
419 code,
420 message,
421 retryable,
422 backoff_ms,
423 details,
424 }) => {
425 let mut obj = serde_json::Map::new();
426 obj.insert("ok".into(), Value::Bool(false));
427 let mut error = serde_json::Map::new();
428 error.insert("code".into(), Value::String(code));
429 error.insert("message".into(), Value::String(message));
430 error.insert("retryable".into(), Value::Bool(retryable));
431 if let Some(backoff) = backoff_ms {
432 error.insert("backoff_ms".into(), Value::Number(backoff.into()));
433 }
434 if let Some(details) = details {
435 error.insert(
436 "details".into(),
437 serde_json::from_str(&details).unwrap_or(Value::String(details)),
438 );
439 }
440 obj.insert("error".into(), Value::Object(error));
441 Ok(Value::Object(obj))
442 }
443 }
444 }
445
446 fn secrets_tenant_ctx(&self) -> TypesTenantCtx {
450 let mut ctx = self.config.tenant_ctx();
451 if let Some(exec_ctx) = self.exec_ctx.as_ref()
452 && let Some(team) = exec_ctx.tenant.team.as_ref()
453 && let Ok(team_id) = TeamId::from_str(team)
454 {
455 ctx = ctx.with_team(Some(team_id));
456 }
457 ctx
458 }
459
460 pub fn get_secret(&self, key: &str) -> Result<String> {
461 if provider_core_only::is_enabled() {
462 bail!(provider_core_only::blocked_message("secrets"))
463 }
464 if !self.config.secrets_policy.is_allowed(key) {
465 bail!("secret {key} is not permitted by bindings policy");
466 }
467 if let Some(mock) = &self.mocks
468 && let Some(value) = mock.secrets_lookup(key)
469 {
470 return Ok(value);
471 }
472 let ctx = self.secrets_tenant_ctx();
473 let bytes = read_secret_blocking(&self.secrets, &ctx, &self.pack_id, key)
474 .context("failed to read secret from manager")?;
475 let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
476 Ok(value)
477 }
478
479 fn allows_secret_write_in_provider_core_only(&self) -> bool {
480 self.provider_core_component || self.component_ref.is_none()
481 }
482
483 fn tenant_ctx_from_v1(&self, ctx: Option<StateTenantCtx>) -> Result<TypesTenantCtx> {
484 let tenant_raw = ctx
485 .as_ref()
486 .map(|ctx| ctx.tenant.clone())
487 .or_else(|| self.exec_ctx.as_ref().map(|ctx| ctx.tenant.tenant.clone()))
488 .unwrap_or_else(|| self.config.tenant.clone());
489 let env_raw = ctx
490 .as_ref()
491 .map(|ctx| ctx.env.clone())
492 .unwrap_or_else(|| self.default_env.clone());
493 let tenant_id = TenantId::from_str(&tenant_raw)
494 .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
495 let env_id = EnvId::from_str(&env_raw)
496 .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
497 let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
498 if let Some(exec_ctx) = self.exec_ctx.as_ref() {
499 if let Some(team) = exec_ctx.tenant.team.as_ref() {
500 let team_id =
501 TeamId::from_str(team).with_context(|| format!("invalid team id `{team}`"))?;
502 tenant_ctx = tenant_ctx.with_team(Some(team_id));
503 }
504 if let Some(user) = exec_ctx.tenant.user.as_ref() {
505 let user_id =
506 UserId::from_str(user).with_context(|| format!("invalid user id `{user}`"))?;
507 tenant_ctx = tenant_ctx.with_user(Some(user_id));
508 }
509 tenant_ctx = tenant_ctx.with_flow(exec_ctx.flow_id.clone());
510 if let Some(node) = exec_ctx.node_id.as_ref() {
511 tenant_ctx = tenant_ctx.with_node(node.clone());
512 }
513 if let Some(session) = exec_ctx.tenant.correlation_id.as_ref() {
514 tenant_ctx = tenant_ctx.with_session(session.clone());
515 }
516 tenant_ctx.trace_id = exec_ctx.tenant.trace_id.clone();
517 }
518
519 if let Some(ctx) = ctx {
520 if let Some(team) = ctx.team.or(ctx.team_id) {
521 let team_id =
522 TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
523 tenant_ctx = tenant_ctx.with_team(Some(team_id));
524 }
525 if let Some(user) = ctx.user.or(ctx.user_id) {
526 let user_id =
527 UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
528 tenant_ctx = tenant_ctx.with_user(Some(user_id));
529 }
530 if let Some(flow) = ctx.flow_id {
531 tenant_ctx = tenant_ctx.with_flow(flow);
532 }
533 if let Some(node) = ctx.node_id {
534 tenant_ctx = tenant_ctx.with_node(node);
535 }
536 if let Some(provider) = ctx.provider_id {
537 tenant_ctx = tenant_ctx.with_provider(provider);
538 }
539 if let Some(session) = ctx.session_id {
540 tenant_ctx = tenant_ctx.with_session(session);
541 }
542 tenant_ctx.trace_id = ctx.trace_id;
543 }
544 Ok(tenant_ctx)
545 }
546
547 fn send_http_request(
548 &mut self,
549 req: HttpRequest,
550 opts: Option<HttpRequestOptionsV1_1>,
551 _ctx: Option<HttpTenantCtx>,
552 ) -> Result<HttpResponse, HttpClientError> {
553 if !self.config.http_enabled {
554 return Err(HttpClientError {
555 code: "denied".into(),
556 message: "http client disabled by policy".into(),
557 });
558 }
559
560 let mut mock_state = None;
561 let raw_body = req.body.clone();
562 if let Some(mock) = &self.mocks
563 && let Ok(meta) = HttpMockRequest::new(&req.method, &req.url, raw_body.as_deref())
564 {
565 match mock.http_begin(&meta) {
566 HttpDecision::Mock(response) => {
567 let headers = response
568 .headers
569 .iter()
570 .map(|(k, v)| (k.clone(), v.clone()))
571 .collect();
572 return Ok(HttpResponse {
573 status: response.status,
574 headers,
575 body: response.body.clone().map(|b| b.into_bytes()),
576 });
577 }
578 HttpDecision::Deny(reason) => {
579 return Err(HttpClientError {
580 code: "denied".into(),
581 message: reason,
582 });
583 }
584 HttpDecision::Passthrough { record } => {
585 mock_state = Some((meta, record));
586 }
587 }
588 }
589
590 let method = req.method.parse().unwrap_or(reqwest::Method::GET);
591 let mut builder = self.http_client.request(method, &req.url);
592 for (key, value) in req.headers {
593 if let Ok(header) = reqwest::header::HeaderName::from_bytes(key.as_bytes())
594 && let Ok(header_value) = reqwest::header::HeaderValue::from_str(&value)
595 {
596 builder = builder.header(header, header_value);
597 }
598 }
599
600 if let Some(body) = raw_body.clone() {
601 builder = builder.body(body);
602 }
603
604 if let Some(opts) = opts {
605 if let Some(timeout_ms) = opts.timeout_ms {
606 builder = builder.timeout(Duration::from_millis(timeout_ms as u64));
607 }
608 if opts.allow_insecure == Some(true) {
609 warn!(url = %req.url, "allow-insecure not supported; using default TLS validation");
610 }
611 if let Some(follow_redirects) = opts.follow_redirects
612 && !follow_redirects
613 {
614 warn!(url = %req.url, "follow-redirects=false not supported; using default client behaviour");
615 }
616 }
617
618 let response = match builder.send() {
619 Ok(resp) => resp,
620 Err(err) => {
621 warn!(url = %req.url, error = %err, "http client request failed");
622 return Err(HttpClientError {
623 code: "unavailable".into(),
624 message: err.to_string(),
625 });
626 }
627 };
628
629 let status = response.status().as_u16();
630 let headers_vec = response
631 .headers()
632 .iter()
633 .map(|(k, v)| {
634 (
635 k.as_str().to_string(),
636 v.to_str().unwrap_or_default().to_string(),
637 )
638 })
639 .collect::<Vec<_>>();
640 let body_bytes = response.bytes().ok().map(|b| b.to_vec());
641
642 if let Some((meta, true)) = mock_state.take()
643 && let Some(mock) = &self.mocks
644 {
645 let recorded = HttpMockResponse::new(
646 status,
647 headers_vec.clone().into_iter().collect(),
648 body_bytes
649 .as_ref()
650 .map(|b| String::from_utf8_lossy(b).into_owned()),
651 );
652 mock.http_record(&meta, &recorded);
653 }
654
655 Ok(HttpResponse {
656 status,
657 headers: headers_vec,
658 body: body_bytes,
659 })
660 }
661}
662
/// Canonicalizes a secret key coming from a WASM guest.
///
/// Surrounding whitespace is trimmed, ASCII letters are lower-cased, and every
/// character that is not `a`-`z`, `0`-`9` or `_` afterwards (including any non-ASCII
/// character) is replaced by `_`. Runs of underscores are kept as they are.
fn canonicalize_wasm_secret_key(raw: &str) -> String {
    let trimmed = raw.trim();
    let mut canonical = String::with_capacity(trimmed.len());
    for original in trimmed.chars() {
        let lowered = original.to_ascii_lowercase();
        let allowed = lowered.is_ascii_lowercase() || lowered.is_ascii_digit() || lowered == '_';
        canonical.push(if allowed { lowered } else { '_' });
    }
    canonical
}
675
#[cfg(test)]
mod canonicalize_tests {
    use super::canonicalize_wasm_secret_key;

    /// Upper-case snake case is lowered and otherwise left alone.
    #[test]
    fn upper_snake_to_lower_snake() {
        let canonical = canonicalize_wasm_secret_key("TELEGRAM_BOT_TOKEN");
        assert_eq!(canonical, "telegram_bot_token");
    }

    /// Surrounding blanks are dropped and dashes become underscores.
    #[test]
    fn trim_and_replace_non_alphanumeric() {
        let canonical = canonicalize_wasm_secret_key(" webex-bot-token ");
        assert_eq!(canonical, "webex_bot_token");
    }

    /// Mixed case is lowered while doubled underscores are kept.
    #[test]
    fn preserve_existing_lower_snake_with_extra_underscores() {
        let canonical = canonicalize_wasm_secret_key("MiXeD__Case");
        assert_eq!(canonical, "mixed__case");
    }
}
701
702impl SecretsStoreHost for HostState {
703 fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsError> {
704 if provider_core_only::is_enabled() {
705 warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
706 return Err(SecretsError::Denied);
707 }
708 if !self.config.secrets_policy.is_allowed(&key) {
709 return Err(SecretsError::Denied);
710 }
711 if let Some(mock) = &self.mocks
712 && let Some(value) = mock.secrets_lookup(&key)
713 {
714 return Ok(Some(value.into_bytes()));
715 }
716 let ctx = self.secrets_tenant_ctx();
717 let canonical_key = canonicalize_wasm_secret_key(&key);
718 match read_secret_blocking(&self.secrets, &ctx, &self.pack_id, &canonical_key) {
719 Ok(bytes) => Ok(Some(bytes)),
720 Err(err) => {
721 warn!(secret = %key, canonical = %canonical_key, error = %err, "secret lookup failed");
722 Err(SecretsError::NotFound)
723 }
724 }
725 }
726}
727
728impl SecretsStoreHostV1_1 for HostState {
729 fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsErrorV1_1> {
730 if provider_core_only::is_enabled() {
731 warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
732 return Err(SecretsErrorV1_1::Denied);
733 }
734 if !self.config.secrets_policy.is_allowed(&key) {
735 return Err(SecretsErrorV1_1::Denied);
736 }
737 if let Some(mock) = &self.mocks
738 && let Some(value) = mock.secrets_lookup(&key)
739 {
740 return Ok(Some(value.into_bytes()));
741 }
742 let ctx = self.secrets_tenant_ctx();
743 let canonical_key = canonicalize_wasm_secret_key(&key);
744 match read_secret_blocking(&self.secrets, &ctx, &self.pack_id, &canonical_key) {
745 Ok(bytes) => Ok(Some(bytes)),
746 Err(err) => {
747 warn!(secret = %key, canonical = %canonical_key, error = %err, "secret lookup failed");
748 Err(SecretsErrorV1_1::NotFound)
749 }
750 }
751 }
752
753 fn put(&mut self, key: String, value: Vec<u8>) {
754 if key.trim().is_empty() {
755 warn!(secret = %key, "secret write blocked: empty key");
756 panic!("secret write denied for key {key}: invalid key");
757 }
758 if provider_core_only::is_enabled() && !self.allows_secret_write_in_provider_core_only() {
759 warn!(
760 secret = %key,
761 component = self.component_ref.as_deref().unwrap_or("<pack>"),
762 "provider-core only mode enabled; blocking secrets store write"
763 );
764 panic!("secret write denied for key {key}: provider-core-only mode");
765 }
766 if !self.config.secrets_policy.is_allowed(&key) {
767 warn!(secret = %key, "secret write denied by bindings policy");
768 panic!("secret write denied for key {key}: policy");
769 }
770 let ctx = self.secrets_tenant_ctx();
771 let canonical_key = canonicalize_wasm_secret_key(&key);
772 if let Err(err) =
773 write_secret_blocking(&self.secrets, &ctx, &self.pack_id, &canonical_key, &value)
774 {
775 warn!(secret = %key, canonical = %canonical_key, error = %err, "secret write failed");
776 panic!("secret write failed for key {key}");
777 }
778 }
779}
780
781impl HttpClientHost for HostState {
782 fn send(
783 &mut self,
784 req: HttpRequest,
785 ctx: Option<HttpTenantCtx>,
786 ) -> Result<HttpResponse, HttpClientError> {
787 self.send_http_request(req, None, ctx)
788 }
789}
790
791impl HttpClientHostV1_1 for HostState {
792 fn send(
793 &mut self,
794 req: HttpRequestV1_1,
795 opts: Option<HttpRequestOptionsV1_1>,
796 ctx: Option<HttpTenantCtxV1_1>,
797 ) -> Result<HttpResponseV1_1, HttpClientErrorV1_1> {
798 let legacy_req = HttpRequest {
799 method: req.method,
800 url: req.url,
801 headers: req.headers,
802 body: req.body,
803 };
804 let legacy_ctx = ctx.map(|ctx| HttpTenantCtx {
805 env: ctx.env,
806 tenant: ctx.tenant,
807 tenant_id: ctx.tenant_id,
808 team: ctx.team,
809 team_id: ctx.team_id,
810 user: ctx.user,
811 user_id: ctx.user_id,
812 trace_id: ctx.trace_id,
813 correlation_id: ctx.correlation_id,
814 i18n_id: ctx.i18n_id,
815 attributes: ctx.attributes,
816 session_id: ctx.session_id,
817 flow_id: ctx.flow_id,
818 node_id: ctx.node_id,
819 provider_id: ctx.provider_id,
820 deadline_ms: ctx.deadline_ms,
821 attempt: ctx.attempt,
822 idempotency_key: ctx.idempotency_key,
823 impersonation: ctx.impersonation.map(|imp| http_types_v1_0::Impersonation {
824 actor_id: imp.actor_id,
825 reason: imp.reason,
826 }),
827 });
828
829 self.send_http_request(legacy_req, opts, legacy_ctx)
830 .map(|resp| HttpResponseV1_1 {
831 status: resp.status,
832 headers: resp.headers,
833 body: resp.body,
834 })
835 .map_err(|err| HttpClientErrorV1_1 {
836 code: err.code,
837 message: err.message,
838 })
839 }
840}
841
842impl StateStoreHost for HostState {
843 fn read(
844 &mut self,
845 key: HostStateKey,
846 ctx: Option<StateTenantCtx>,
847 ) -> Result<Vec<u8>, StateError> {
848 let store = match self.state_store.as_ref() {
849 Some(store) => store.clone(),
850 None => {
851 return Err(StateError {
852 code: "unavailable".into(),
853 message: "state store not configured".into(),
854 });
855 }
856 };
857 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
858 Ok(ctx) => ctx,
859 Err(err) => {
860 return Err(StateError {
861 code: "invalid-ctx".into(),
862 message: err.to_string(),
863 });
864 }
865 };
866 #[cfg(feature = "fault-injection")]
867 {
868 let exec_ctx = self.exec_ctx.as_ref();
869 let flow_id = exec_ctx
870 .map(|ctx| ctx.flow_id.as_str())
871 .unwrap_or("unknown");
872 let node_id = exec_ctx.and_then(|ctx| ctx.node_id.as_deref());
873 let attempt = exec_ctx.map(|ctx| ctx.tenant.attempt).unwrap_or(1);
874 let fault_ctx = FaultContext {
875 pack_id: self.pack_id.as_str(),
876 flow_id,
877 node_id,
878 attempt,
879 };
880 if let Err(err) = maybe_fail(FaultPoint::StateRead, fault_ctx) {
881 return Err(StateError {
882 code: "internal".into(),
883 message: err.to_string(),
884 });
885 }
886 }
887 let key = StoreStateKey::from(key);
888 match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
889 Ok(Some(value)) => Ok(serde_json::to_vec(&value).unwrap_or_else(|_| Vec::new())),
890 Ok(None) => Err(StateError {
891 code: "not_found".into(),
892 message: "state key not found".into(),
893 }),
894 Err(err) => Err(StateError {
895 code: "internal".into(),
896 message: err.to_string(),
897 }),
898 }
899 }
900
901 fn write(
902 &mut self,
903 key: HostStateKey,
904 bytes: Vec<u8>,
905 ctx: Option<StateTenantCtx>,
906 ) -> Result<StateOpAck, StateError> {
907 let store = match self.state_store.as_ref() {
908 Some(store) => store.clone(),
909 None => {
910 return Err(StateError {
911 code: "unavailable".into(),
912 message: "state store not configured".into(),
913 });
914 }
915 };
916 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
917 Ok(ctx) => ctx,
918 Err(err) => {
919 return Err(StateError {
920 code: "invalid-ctx".into(),
921 message: err.to_string(),
922 });
923 }
924 };
925 #[cfg(feature = "fault-injection")]
926 {
927 let exec_ctx = self.exec_ctx.as_ref();
928 let flow_id = exec_ctx
929 .map(|ctx| ctx.flow_id.as_str())
930 .unwrap_or("unknown");
931 let node_id = exec_ctx.and_then(|ctx| ctx.node_id.as_deref());
932 let attempt = exec_ctx.map(|ctx| ctx.tenant.attempt).unwrap_or(1);
933 let fault_ctx = FaultContext {
934 pack_id: self.pack_id.as_str(),
935 flow_id,
936 node_id,
937 attempt,
938 };
939 if let Err(err) = maybe_fail(FaultPoint::StateWrite, fault_ctx) {
940 return Err(StateError {
941 code: "internal".into(),
942 message: err.to_string(),
943 });
944 }
945 }
946 let key = StoreStateKey::from(key);
947 let value = serde_json::from_slice(&bytes)
948 .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()));
949 match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
950 Ok(()) => Ok(StateOpAck::Ok),
951 Err(err) => Err(StateError {
952 code: "internal".into(),
953 message: err.to_string(),
954 }),
955 }
956 }
957
958 fn delete(
959 &mut self,
960 key: HostStateKey,
961 ctx: Option<StateTenantCtx>,
962 ) -> Result<StateOpAck, StateError> {
963 let store = match self.state_store.as_ref() {
964 Some(store) => store.clone(),
965 None => {
966 return Err(StateError {
967 code: "unavailable".into(),
968 message: "state store not configured".into(),
969 });
970 }
971 };
972 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
973 Ok(ctx) => ctx,
974 Err(err) => {
975 return Err(StateError {
976 code: "invalid-ctx".into(),
977 message: err.to_string(),
978 });
979 }
980 };
981 let key = StoreStateKey::from(key);
982 match store.del(&tenant_ctx, STATE_PREFIX, &key) {
983 Ok(_) => Ok(StateOpAck::Ok),
984 Err(err) => Err(StateError {
985 code: "internal".into(),
986 message: err.to_string(),
987 }),
988 }
989 }
990}
991
992impl TelemetryLoggerHost for HostState {
993 fn log(
994 &mut self,
995 span: TelemetrySpanContext,
996 fields: Vec<(String, String)>,
997 _ctx: Option<TelemetryTenantCtx>,
998 ) -> Result<TelemetryAck, TelemetryError> {
999 if let Some(mock) = &self.mocks
1000 && mock.telemetry_drain(&[("span_json", span.flow_id.as_str())])
1001 {
1002 return Ok(TelemetryAck::Ok);
1003 }
1004 let mut map = serde_json::Map::new();
1005 for (k, v) in fields {
1006 map.insert(k, Value::String(v));
1007 }
1008 tracing::info!(
1009 tenant = %span.tenant,
1010 flow_id = %span.flow_id,
1011 node = ?span.node_id,
1012 provider = %span.provider,
1013 fields = %serde_json::Value::Object(map.clone()),
1014 "telemetry log from pack"
1015 );
1016 Ok(TelemetryAck::Ok)
1017 }
1018}
1019
1020impl RunnerHostHttp for HostState {
1021 fn request(
1022 &mut self,
1023 method: String,
1024 url: String,
1025 headers: Vec<String>,
1026 body: Option<Vec<u8>>,
1027 ) -> Result<Vec<u8>, String> {
1028 let req = HttpRequest {
1029 method,
1030 url,
1031 headers: headers
1032 .chunks(2)
1033 .filter_map(|chunk| {
1034 if chunk.len() == 2 {
1035 Some((chunk[0].clone(), chunk[1].clone()))
1036 } else {
1037 None
1038 }
1039 })
1040 .collect(),
1041 body,
1042 };
1043 match HttpClientHost::send(self, req, None) {
1044 Ok(resp) => Ok(resp.body.unwrap_or_default()),
1045 Err(err) => Err(err.message),
1046 }
1047 }
1048}
1049
/// Stub key-value host: nothing is stored, so reads always miss and writes are discarded.
impl RunnerHostKv for HostState {
    /// Always returns `None`; the namespace and key are ignored.
    fn get(&mut self, _ns: String, _key: String) -> Option<String> {
        None
    }

    /// No-op; the value is dropped without being persisted.
    fn put(&mut self, _ns: String, _key: String, _val: String) {}
}
1057
/// Result of decoding a pack's `manifest.cbor`, together with the flows loaded for it.
enum ManifestLoad {
    /// The bytes decoded via `decode_pack_manifest`; flows are derived from the manifest.
    New {
        manifest: Box<greentic_types::PackManifest>,
        flows: PackFlows,
    },
    /// The bytes only decoded as a `legacy_pack::PackManifest`; flows are read from the
    /// flow JSON files the legacy manifest references.
    Legacy {
        manifest: Box<legacy_pack::PackManifest>,
        flows: PackFlows,
    },
}
1068
1069fn load_manifest_and_flows(path: &Path) -> Result<ManifestLoad> {
1070 let mut archive = ZipArchive::new(File::open(path)?)
1071 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
1072 let bytes = read_entry(&mut archive, "manifest.cbor")
1073 .with_context(|| format!("missing manifest.cbor in {}", path.display()))?;
1074 match decode_pack_manifest(&bytes) {
1075 Ok(manifest) => {
1076 let cache = PackFlows::from_manifest(manifest.clone());
1077 Ok(ManifestLoad::New {
1078 manifest: Box::new(manifest),
1079 flows: cache,
1080 })
1081 }
1082 Err(err) => {
1083 tracing::debug!(
1084 error = %err,
1085 pack = %path.display(),
1086 "decode_pack_manifest failed for archive; falling back to legacy manifest"
1087 );
1088 let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
1089 .context("failed to decode legacy pack manifest from manifest.cbor")?;
1090 let flows = load_legacy_flows_from_archive(&mut archive, path, &legacy)?;
1091 Ok(ManifestLoad::Legacy {
1092 manifest: Box::new(legacy),
1093 flows,
1094 })
1095 }
1096 }
1097}
1098
1099fn load_manifest_and_flows_from_dir(root: &Path) -> Result<ManifestLoad> {
1100 let manifest_path = root.join("manifest.cbor");
1101 let bytes = std::fs::read(&manifest_path)
1102 .with_context(|| format!("missing manifest.cbor in {}", root.display()))?;
1103 match decode_pack_manifest(&bytes) {
1104 Ok(manifest) => {
1105 let cache = PackFlows::from_manifest(manifest.clone());
1106 Ok(ManifestLoad::New {
1107 manifest: Box::new(manifest),
1108 flows: cache,
1109 })
1110 }
1111 Err(err) => {
1112 tracing::debug!(
1113 error = %err,
1114 pack = %root.display(),
1115 "decode_pack_manifest failed for materialized pack; trying legacy manifest"
1116 );
1117 let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
1118 .context("failed to decode legacy pack manifest from manifest.cbor")?;
1119 let flows = load_legacy_flows_from_dir(root, &legacy)?;
1120 Ok(ManifestLoad::Legacy {
1121 manifest: Box::new(legacy),
1122 flows,
1123 })
1124 }
1125 }
1126}
1127
1128fn load_legacy_flows_from_dir(
1129 root: &Path,
1130 manifest: &legacy_pack::PackManifest,
1131) -> Result<PackFlows> {
1132 build_legacy_flows(manifest, |rel_path| {
1133 let path = root.join(rel_path);
1134 std::fs::read(&path).with_context(|| format!("missing flow json {}", path.display()))
1135 })
1136}
1137
1138fn load_legacy_flows_from_archive(
1139 archive: &mut ZipArchive<File>,
1140 pack_path: &Path,
1141 manifest: &legacy_pack::PackManifest,
1142) -> Result<PackFlows> {
1143 build_legacy_flows(manifest, |rel_path| {
1144 read_entry(archive, rel_path)
1145 .with_context(|| format!("missing flow json {} in {}", rel_path, pack_path.display()))
1146 })
1147}
1148
1149fn build_legacy_flows(
1150 manifest: &legacy_pack::PackManifest,
1151 mut read_json: impl FnMut(&str) -> Result<Vec<u8>>,
1152) -> Result<PackFlows> {
1153 let mut flows = HashMap::new();
1154 let mut descriptors = Vec::new();
1155
1156 for entry in &manifest.flows {
1157 let bytes = read_json(&entry.file_json)
1158 .with_context(|| format!("missing flow json {}", entry.file_json))?;
1159 let doc = parse_flow_doc_with_legacy_aliases(&bytes, &entry.file_json)?;
1160 let normalized = normalize_flow_doc(doc);
1161 let flow_ir = flow_doc_to_ir(normalized)?;
1162 let flow = flow_ir_to_flow(flow_ir)?;
1163
1164 descriptors.push(FlowDescriptor {
1165 id: entry.id.clone(),
1166 flow_type: entry.kind.clone(),
1167 pack_id: manifest.meta.pack_id.clone(),
1168 profile: manifest.meta.pack_id.clone(),
1169 version: manifest.meta.version.to_string(),
1170 description: None,
1171 });
1172 flows.insert(entry.id.clone(), flow);
1173 }
1174
1175 let mut entry_flows = manifest.meta.entry_flows.clone();
1176 if entry_flows.is_empty() {
1177 entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
1178 }
1179 let metadata = PackMetadata {
1180 pack_id: manifest.meta.pack_id.clone(),
1181 version: manifest.meta.version.to_string(),
1182 entry_flows,
1183 secret_requirements: Vec::new(),
1184 };
1185
1186 Ok(PackFlows {
1187 descriptors,
1188 flows,
1189 metadata,
1190 })
1191}
1192
1193fn parse_flow_doc_with_legacy_aliases(bytes: &[u8], path: &str) -> Result<FlowDoc> {
1194 let mut value: Value = serde_json::from_slice(bytes)
1195 .with_context(|| format!("failed to decode flow doc {}", path))?;
1196 if let Some(map) = value.as_object_mut()
1197 && !map.contains_key("type")
1198 && let Some(flow_type) = map.remove("flow_type")
1199 {
1200 map.insert("type".to_string(), flow_type);
1201 }
1202 serde_json::from_value(value).with_context(|| format!("failed to decode flow doc {}", path))
1203}
1204
/// Per-store state for a component instance: the Greentic host plus the WASI contexts and
/// the resource table handed out through the `WasiView` / `WasiHttpView` implementations.
pub struct ComponentState {
    /// Greentic host services exposed to the guest.
    pub host: HostState,
    // WASI core context built from the runner's WASI policy.
    wasi_ctx: WasiCtx,
    // Context passed to `WasiTls::new` when linking wasi-tls.
    wasi_tls_ctx: WasiTlsCtx,
    // Context handed out by the `WasiHttpView` implementation.
    wasi_http_ctx: WasiHttpCtx,
    // Resource table shared by the WASI, TLS and HTTP views.
    resource_table: ResourceTable,
}
1212
1213impl ComponentState {
1214 pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
1215 let wasi_ctx = policy
1216 .instantiate()
1217 .context("failed to build WASI context")?;
1218 Ok(Self {
1219 host,
1220 wasi_ctx,
1221 wasi_tls_ctx: WasiTlsCtxBuilder::new().build(),
1222 wasi_http_ctx: WasiHttpCtx::new(),
1223 resource_table: ResourceTable::new(),
1224 })
1225 }
1226
1227 fn host_mut(&mut self) -> &mut HostState {
1228 &mut self.host
1229 }
1230
1231 fn should_cancel_host(&mut self) -> bool {
1232 false
1233 }
1234
1235 fn yield_now_host(&mut self) {
1236 }
1238}
1239
/// Control interface (v0.4) for components; both calls delegate to the shared
/// `ComponentState` hooks.
impl component_api::v0_4::greentic::component::control::Host for ComponentState {
    /// Delegates to `should_cancel_host`.
    fn should_cancel(&mut self) -> bool {
        self.should_cancel_host()
    }

    /// Delegates to `yield_now_host`.
    fn yield_now(&mut self) {
        self.yield_now_host();
    }
}
1249
/// Control interface (v0.5) for components; both calls delegate to the shared
/// `ComponentState` hooks.
impl component_api::v0_5::greentic::component::control::Host for ComponentState {
    /// Delegates to `should_cancel_host`.
    fn should_cancel(&mut self) -> bool {
        self.should_cancel_host()
    }

    /// Delegates to `yield_now_host`.
    fn yield_now(&mut self) {
        self.yield_now_host();
    }
}
1259
1260fn add_component_control_instance(
1261 linker: &mut Linker<ComponentState>,
1262 name: &str,
1263) -> wasmtime::Result<()> {
1264 let mut inst = linker.instance(name)?;
1265 inst.func_wrap(
1266 "should-cancel",
1267 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
1268 let host = caller.data_mut();
1269 Ok((host.should_cancel_host(),))
1270 },
1271 )?;
1272 inst.func_wrap(
1273 "yield-now",
1274 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
1275 let host = caller.data_mut();
1276 host.yield_now_host();
1277 Ok(())
1278 },
1279 )?;
1280 Ok(())
1281}
1282
1283fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
1284 add_component_control_instance(linker, "greentic:component/control@0.5.0")?;
1285 add_component_control_instance(linker, "greentic:component/control@0.4.0")?;
1286 Ok(())
1287}
1288
1289pub fn register_all(linker: &mut Linker<ComponentState>, allow_state_store: bool) -> Result<()> {
1290 add_wasi_to_linker(linker)?;
1291
1292 let mut opts = LinkOptions::default();
1294 opts.tls(true);
1295 wasmtime_wasi_tls::add_to_linker(linker, &mut opts, |h: &mut ComponentState| {
1296 WasiTls::new(&h.wasi_tls_ctx, &mut h.resource_table)
1297 })?;
1298
1299 add_wasi_http_to_linker(linker)?;
1301
1302 add_all_v1_to_linker(
1303 linker,
1304 HostFns {
1305 http_client_v1_1: Some(|state: &mut ComponentState| state.host_mut()),
1306 http_client: Some(|state: &mut ComponentState| state.host_mut()),
1307 oauth_broker: None,
1308 runner_host_http: Some(|state: &mut ComponentState| state.host_mut()),
1309 runner_host_kv: Some(|state: &mut ComponentState| state.host_mut()),
1310 telemetry_logger: Some(|state: &mut ComponentState| state.host_mut()),
1311 state_store: allow_state_store.then_some(|state: &mut ComponentState| state.host_mut()),
1312 secrets_store_v1_1: Some(|state: &mut ComponentState| state.host_mut()),
1313 secrets_store: None,
1314 },
1315 )?;
1316 add_http_client_client_world_aliases(linker)?;
1317 Ok(())
1318}
1319
1320fn add_http_client_client_world_aliases(linker: &mut Linker<ComponentState>) -> Result<()> {
1321 let mut inst_v1_1 = linker.instance("greentic:http/client@1.1.0")?;
1322 inst_v1_1.func_wrap(
1323 "send",
1324 move |mut caller: StoreContextMut<'_, ComponentState>,
1325 (req, opts, ctx): (
1326 http_client_client_alias::Request,
1327 Option<http_client_client_alias::RequestOptions>,
1328 Option<http_client_client_alias::TenantCtx>,
1329 )| {
1330 let host = caller.data_mut().host_mut();
1331 let result = HttpClientHostV1_1::send(
1332 host,
1333 alias_request_to_host(req),
1334 opts.map(alias_request_options_to_host),
1335 ctx.map(alias_tenant_ctx_to_host),
1336 );
1337 Ok((match result {
1338 Ok(resp) => Ok(alias_response_from_host(resp)),
1339 Err(err) => Err(alias_error_from_host(err)),
1340 },))
1341 },
1342 )?;
1343 let mut inst_v1_0 = linker.instance("greentic:http/client@1.0.0")?;
1344 inst_v1_0.func_wrap(
1345 "send",
1346 move |mut caller: StoreContextMut<'_, ComponentState>,
1347 (req, ctx): (
1348 host_http_client::Request,
1349 Option<host_http_client::TenantCtx>,
1350 )| {
1351 let host = caller.data_mut().host_mut();
1352 let result = HttpClientHost::send(host, req, ctx);
1353 Ok((result,))
1354 },
1355 )?;
1356 Ok(())
1357}
1358
1359fn alias_request_to_host(req: http_client_client_alias::Request) -> host_http_client::RequestV1_1 {
1360 host_http_client::RequestV1_1 {
1361 method: req.method,
1362 url: req.url,
1363 headers: req.headers,
1364 body: req.body,
1365 }
1366}
1367
1368fn alias_request_options_to_host(
1369 opts: http_client_client_alias::RequestOptions,
1370) -> host_http_client::RequestOptionsV1_1 {
1371 host_http_client::RequestOptionsV1_1 {
1372 timeout_ms: opts.timeout_ms,
1373 allow_insecure: opts.allow_insecure,
1374 follow_redirects: opts.follow_redirects,
1375 }
1376}
1377
1378fn alias_tenant_ctx_to_host(
1379 ctx: http_client_client_alias::TenantCtx,
1380) -> host_http_client::TenantCtxV1_1 {
1381 host_http_client::TenantCtxV1_1 {
1382 env: ctx.env,
1383 tenant: ctx.tenant,
1384 tenant_id: ctx.tenant_id,
1385 team: ctx.team,
1386 team_id: ctx.team_id,
1387 user: ctx.user,
1388 user_id: ctx.user_id,
1389 trace_id: ctx.trace_id,
1390 correlation_id: ctx.correlation_id,
1391 i18n_id: ctx.i18n_id,
1392 attributes: ctx.attributes,
1393 session_id: ctx.session_id,
1394 flow_id: ctx.flow_id,
1395 node_id: ctx.node_id,
1396 provider_id: ctx.provider_id,
1397 deadline_ms: ctx.deadline_ms,
1398 attempt: ctx.attempt,
1399 idempotency_key: ctx.idempotency_key,
1400 impersonation: ctx.impersonation.map(|imp| http_types_v1_1::Impersonation {
1401 actor_id: imp.actor_id,
1402 reason: imp.reason,
1403 }),
1404 }
1405}
1406
1407fn alias_response_from_host(
1408 resp: host_http_client::ResponseV1_1,
1409) -> http_client_client_alias::Response {
1410 http_client_client_alias::Response {
1411 status: resp.status,
1412 headers: resp.headers,
1413 body: resp.body,
1414 }
1415}
1416
1417fn alias_error_from_host(
1418 err: host_http_client::HttpClientErrorV1_1,
1419) -> http_client_client_alias::HostError {
1420 http_client_client_alias::HostError {
1421 code: err.code,
1422 message: err.message,
1423 }
1424}
1425
/// Gives the OAuth broker bindings access to the tenant, environment and broker state held
/// by the wrapped `HostState`.
impl OAuthHostContext for ComponentState {
    /// Tenant taken from the host configuration.
    fn tenant_id(&self) -> &str {
        &self.host.config.tenant
    }

    /// The host's default environment name.
    fn env(&self) -> &str {
        &self.host.default_env
    }

    /// Mutable access to the host's OAuth broker state.
    fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
        &mut self.host.oauth_host
    }

    /// OAuth broker configuration, if one was supplied to the host.
    fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
        self.host.oauth_config.as_ref()
    }
}
1443
/// Exposes the WASI context and the shared resource table to the WASI host bindings.
impl WasiView for ComponentState {
    /// Borrows the WASI context together with the resource table.
    fn ctx(&mut self) -> WasiCtxView<'_> {
        WasiCtxView {
            ctx: &mut self.wasi_ctx,
            table: &mut self.resource_table,
        }
    }
}
1452
/// Exposes the wasi-http context and the shared resource table to the wasi-http bindings.
impl WasiHttpView for ComponentState {
    /// Borrows the HTTP context and resource table; hooks are left at their defaults.
    fn http(&mut self) -> WasiHttpCtxView<'_> {
        WasiHttpCtxView {
            ctx: &mut self.wasi_http_ctx,
            table: &mut self.resource_table,
            hooks: Default::default(),
        }
    }
}
1462
// SAFETY(review): these impls assert that `ComponentState` may be moved to and shared
// between threads. Presumably at least one field does not get `Send`/`Sync` automatically;
// nothing visible here shows which field that is or why the assertion is sound.
// TODO: confirm the invariant (e.g. the state is only ever touched by the thread that owns
// its store) and record it here.
#[allow(unsafe_code)]
unsafe impl Send for ComponentState {}
#[allow(unsafe_code)]
unsafe impl Sync for ComponentState {}
1467
1468impl PackRuntime {
1469 fn allows_state_store(&self, component_ref: &str) -> bool {
1470 if self.state_store.is_none() {
1471 return false;
1472 }
1473 if !self.config.state_store_policy.allow {
1474 return false;
1475 }
1476 let Some(manifest) = self.component_manifests.get(component_ref) else {
1477 return true;
1479 };
1480 manifest
1485 .capabilities
1486 .host
1487 .state
1488 .as_ref()
1489 .map(|caps| caps.read || caps.write)
1490 .unwrap_or(false)
1491 }
1492
    /// Returns `true` when a compiled component is registered under `component_ref`.
    pub fn contains_component(&self, component_ref: &str) -> bool {
        self.components.contains_key(component_ref)
    }
1496
1497 #[allow(clippy::too_many_arguments)]
1498 pub async fn load(
1499 path: impl AsRef<Path>,
1500 config: Arc<HostConfig>,
1501 mocks: Option<Arc<MockLayer>>,
1502 archive_source: Option<&Path>,
1503 session_store: Option<DynSessionStore>,
1504 state_store: Option<DynStateStore>,
1505 wasi_policy: Arc<RunnerWasiPolicy>,
1506 secrets: DynSecretsManager,
1507 oauth_config: Option<OAuthBrokerConfig>,
1508 verify_archive: bool,
1509 component_resolution: ComponentResolution,
1510 ) -> Result<Self> {
1511 let path = path.as_ref();
1512 let (_pack_root, safe_path) = normalize_pack_path(path)?;
1513 let path_meta = std::fs::metadata(&safe_path).ok();
1514 let is_dir = path_meta
1515 .as_ref()
1516 .map(|meta| meta.is_dir())
1517 .unwrap_or(false);
1518 let is_component = !is_dir
1519 && safe_path
1520 .extension()
1521 .and_then(|ext| ext.to_str())
1522 .map(|ext| ext.eq_ignore_ascii_case("wasm"))
1523 .unwrap_or(false);
1524 let archive_hint_path = if let Some(source) = archive_source {
1525 let (_, normalized) = normalize_pack_path(source)?;
1526 Some(normalized)
1527 } else if is_component || is_dir {
1528 None
1529 } else {
1530 Some(safe_path.clone())
1531 };
1532 let archive_hint = archive_hint_path.as_deref();
1533 if verify_archive {
1534 if let Some(verify_target) = archive_hint.and_then(|p| {
1535 std::fs::metadata(p)
1536 .ok()
1537 .filter(|meta| meta.is_file())
1538 .map(|_| p)
1539 }) {
1540 verify::verify_pack(verify_target).await?;
1541 tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
1542 } else {
1543 tracing::debug!("skipping archive verification (no archive source)");
1544 }
1545 }
1546 let engine = Engine::default();
1547 let engine_profile =
1548 EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
1549 let cache = CacheManager::new(CacheConfig::default(), engine_profile);
1550 let mut metadata = PackMetadata::fallback(&safe_path);
1551 let mut manifest = None;
1552 let mut legacy_manifest: Option<Box<legacy_pack::PackManifest>> = None;
1553 let mut flows = None;
1554 let materialized_root = component_resolution.materialized_root.clone().or_else(|| {
1555 if is_dir {
1556 Some(safe_path.clone())
1557 } else {
1558 None
1559 }
1560 });
1561 let (pack_assets_dir, assets_tempdir) =
1562 locate_pack_assets(materialized_root.as_deref(), archive_hint)?;
1563 let setup_yaml_exists = pack_assets_dir
1564 .as_ref()
1565 .map(|dir| dir.join("setup.yaml").is_file())
1566 .unwrap_or(false);
1567 tracing::info!(
1568 pack_root = %safe_path.display(),
1569 assets_setup_yaml_exists = setup_yaml_exists,
1570 "pack unpack metadata"
1571 );
1572
1573 if let Some(root) = materialized_root.as_ref() {
1574 match load_manifest_and_flows_from_dir(root) {
1575 Ok(ManifestLoad::New {
1576 manifest: m,
1577 flows: cache,
1578 }) => {
1579 metadata = cache.metadata.clone();
1580 manifest = Some(*m);
1581 flows = Some(cache);
1582 }
1583 Ok(ManifestLoad::Legacy {
1584 manifest: m,
1585 flows: cache,
1586 }) => {
1587 metadata = cache.metadata.clone();
1588 legacy_manifest = Some(m);
1589 flows = Some(cache);
1590 }
1591 Err(err) => {
1592 warn!(error = %err, pack = %root.display(), "failed to parse materialized pack manifest");
1593 }
1594 }
1595 }
1596
1597 if manifest.is_none()
1598 && legacy_manifest.is_none()
1599 && let Some(archive_path) = archive_hint
1600 {
1601 let manifest_load = load_manifest_and_flows(archive_path).with_context(|| {
1602 format!(
1603 "failed to load manifest.cbor from {}",
1604 archive_path.display()
1605 )
1606 })?;
1607 match manifest_load {
1608 ManifestLoad::New {
1609 manifest: m,
1610 flows: cache,
1611 } => {
1612 metadata = cache.metadata.clone();
1613 manifest = Some(*m);
1614 flows = Some(cache);
1615 }
1616 ManifestLoad::Legacy {
1617 manifest: m,
1618 flows: cache,
1619 } => {
1620 metadata = cache.metadata.clone();
1621 legacy_manifest = Some(m);
1622 flows = Some(cache);
1623 }
1624 }
1625 }
1626 #[cfg(feature = "fault-injection")]
1627 {
1628 let fault_ctx = FaultContext {
1629 pack_id: metadata.pack_id.as_str(),
1630 flow_id: "unknown",
1631 node_id: None,
1632 attempt: 1,
1633 };
1634 maybe_fail(FaultPoint::PackResolve, fault_ctx)
1635 .map_err(|err| anyhow!(err.to_string()))?;
1636 }
1637 let mut pack_lock = None;
1638 for root in find_pack_lock_roots(&safe_path, is_dir, archive_hint) {
1639 pack_lock = load_pack_lock(&root)?;
1640 if pack_lock.is_some() {
1641 break;
1642 }
1643 }
1644 let component_sources_payload = if pack_lock.is_none() {
1645 if let Some(manifest) = manifest.as_ref() {
1646 manifest
1647 .get_component_sources_v1()
1648 .context("invalid component sources extension")?
1649 } else {
1650 None
1651 }
1652 } else {
1653 None
1654 };
1655 let component_sources = if let Some(lock) = pack_lock.as_ref() {
1656 Some(component_sources_table_from_pack_lock(
1657 lock,
1658 component_resolution.allow_missing_hash,
1659 )?)
1660 } else {
1661 component_sources_table(component_sources_payload.as_ref())?
1662 };
1663 let components = if is_component {
1664 let wasm_bytes = fs::read(&safe_path).await?;
1665 metadata = PackMetadata::from_wasm(&wasm_bytes)
1666 .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
1667 let name = safe_path
1668 .file_stem()
1669 .map(|s| s.to_string_lossy().to_string())
1670 .unwrap_or_else(|| "component".to_string());
1671 let component = compile_component_with_cache(&cache, &engine, None, wasm_bytes).await?;
1672 let mut map = HashMap::new();
1673 map.insert(
1674 name.clone(),
1675 PackComponent {
1676 name,
1677 version: metadata.version.clone(),
1678 component,
1679 },
1680 );
1681 map
1682 } else {
1683 let specs = component_specs(
1684 manifest.as_ref(),
1685 legacy_manifest.as_deref(),
1686 component_sources_payload.as_ref(),
1687 pack_lock.as_ref(),
1688 );
1689 if specs.is_empty() {
1690 HashMap::new()
1691 } else {
1692 let mut loaded = HashMap::new();
1693 let mut missing: HashSet<String> =
1694 specs.iter().map(|spec| spec.id.clone()).collect();
1695 let mut searched = Vec::new();
1696
1697 if !component_resolution.overrides.is_empty() {
1698 load_components_from_overrides(
1699 &cache,
1700 &engine,
1701 &component_resolution.overrides,
1702 &specs,
1703 &mut missing,
1704 &mut loaded,
1705 )
1706 .await?;
1707 searched.push("override map".to_string());
1708 }
1709
1710 if let Some(component_sources) = component_sources.as_ref() {
1711 load_components_from_sources(
1712 &cache,
1713 &engine,
1714 component_sources,
1715 &component_resolution,
1716 &specs,
1717 &mut missing,
1718 &mut loaded,
1719 materialized_root.as_deref(),
1720 archive_hint,
1721 )
1722 .await?;
1723 searched.push(format!("extension {}", EXT_COMPONENT_SOURCES_V1));
1724 }
1725
1726 if let Some(root) = materialized_root.as_ref() {
1727 load_components_from_dir(
1728 &cache,
1729 &engine,
1730 root,
1731 &specs,
1732 &mut missing,
1733 &mut loaded,
1734 )
1735 .await?;
1736 searched.push(format!("components dir {}", root.display()));
1737 }
1738
1739 if let Some(archive_path) = archive_hint {
1740 load_components_from_archive(
1741 &cache,
1742 &engine,
1743 archive_path,
1744 &specs,
1745 &mut missing,
1746 &mut loaded,
1747 )
1748 .await?;
1749 searched.push(format!("archive {}", archive_path.display()));
1750 }
1751
1752 if !missing.is_empty() {
1753 let missing_list = missing.into_iter().collect::<Vec<_>>().join(", ");
1754 let sources = if searched.is_empty() {
1755 "no component sources".to_string()
1756 } else {
1757 searched.join(", ")
1758 };
1759 bail!(
1760 "components missing: {}; looked in {}",
1761 missing_list,
1762 sources
1763 );
1764 }
1765
1766 loaded
1767 }
1768 };
1769 let http_client = Arc::clone(&HTTP_CLIENT);
1770 let mut component_manifests = HashMap::new();
1771 if let Some(manifest) = manifest.as_ref() {
1772 for component in &manifest.components {
1773 component_manifests.insert(component.id.as_str().to_string(), component.clone());
1774 }
1775 }
1776 let mut pack_policy = (*wasi_policy).clone();
1777 if let Some(dir) = pack_assets_dir {
1778 tracing::debug!(path = %dir.display(), "preopening pack assets directory for WASI /assets");
1779 pack_policy =
1780 pack_policy.with_preopen(PreopenSpec::new(dir, "/assets").read_only(true));
1781 }
1782 let wasi_policy = Arc::new(pack_policy);
1783 Ok(Self {
1784 path: safe_path,
1785 archive_path: archive_hint.map(Path::to_path_buf),
1786 config,
1787 engine,
1788 metadata,
1789 manifest,
1790 legacy_manifest,
1791 component_manifests,
1792 mocks,
1793 flows,
1794 components,
1795 http_client,
1796 pre_cache: Mutex::new(HashMap::new()),
1797 session_store,
1798 state_store,
1799 wasi_policy,
1800 assets_tempdir,
1801 provider_registry: RwLock::new(None),
1802 secrets,
1803 oauth_config,
1804 cache,
1805 })
1806 }
1807
1808 pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
1809 if let Some(cache) = &self.flows {
1810 return Ok(cache.descriptors.clone());
1811 }
1812 if let Some(manifest) = &self.manifest {
1813 let descriptors = manifest
1814 .flows
1815 .iter()
1816 .map(|flow| FlowDescriptor {
1817 id: flow.id.as_str().to_string(),
1818 flow_type: flow_kind_to_str(flow.kind).to_string(),
1819 pack_id: manifest.pack_id.as_str().to_string(),
1820 profile: manifest.pack_id.as_str().to_string(),
1821 version: manifest.version.to_string(),
1822 description: None,
1823 })
1824 .collect();
1825 return Ok(descriptors);
1826 }
1827 Ok(Vec::new())
1828 }
1829
1830 #[allow(dead_code)]
1831 pub async fn run_flow(
1832 &self,
1833 flow_id: &str,
1834 input: serde_json::Value,
1835 ) -> Result<serde_json::Value> {
1836 let pack = Arc::new(
1837 PackRuntime::load(
1838 &self.path,
1839 Arc::clone(&self.config),
1840 self.mocks.clone(),
1841 self.archive_path.as_deref(),
1842 self.session_store.clone(),
1843 self.state_store.clone(),
1844 Arc::clone(&self.wasi_policy),
1845 self.secrets.clone(),
1846 self.oauth_config.clone(),
1847 false,
1848 ComponentResolution::default(),
1849 )
1850 .await?,
1851 );
1852
1853 let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&self.config)).await?;
1854 let retry_config = self.config.retry_config().into();
1855 let mocks = pack.mocks.as_deref();
1856 let tenant = self.config.tenant.as_str();
1857
1858 let ctx = FlowContext {
1859 tenant,
1860 pack_id: pack.metadata().pack_id.as_str(),
1861 flow_id,
1862 node_id: None,
1863 tool: None,
1864 action: None,
1865 session_id: None,
1866 provider_id: None,
1867 retry_config,
1868 attempt: 1,
1869 observer: None,
1870 mocks,
1871 };
1872
1873 let execution = engine.execute(ctx, input).await?;
1874 match execution.status {
1875 FlowStatus::Completed => Ok(execution.output),
1876 FlowStatus::Waiting(wait) => Ok(serde_json::json!({
1877 "status": "pending",
1878 "reason": wait.reason,
1879 "resume": wait.snapshot,
1880 "response": execution.output,
1881 })),
1882 }
1883 }
1884
1885 pub async fn invoke_component(
1886 &self,
1887 component_ref: &str,
1888 ctx: ComponentExecCtx,
1889 operation: &str,
1890 _config_json: Option<String>,
1891 input_json: String,
1892 ) -> Result<Value> {
1893 let pack_component = self
1894 .components
1895 .get(component_ref)
1896 .with_context(|| format!("component '{component_ref}' not found in pack"))?;
1897 let engine = self.engine.clone();
1898 let config = Arc::clone(&self.config);
1899 let http_client = Arc::clone(&self.http_client);
1900 let mocks = self.mocks.clone();
1901 let session_store = self.session_store.clone();
1902 let state_store = self.state_store.clone();
1903 let secrets = Arc::clone(&self.secrets);
1904 let oauth_config = self.oauth_config.clone();
1905 let wasi_policy = Arc::clone(&self.wasi_policy);
1906 let pack_id = self.metadata().pack_id.clone();
1907 let allow_state_store = self.allows_state_store(component_ref);
1908 let component = pack_component.component.clone();
1909 let component_ref_owned = component_ref.to_string();
1910 let operation_owned = operation.to_string();
1911 let input_owned = input_json;
1912 let ctx_owned = ctx;
1913
1914 run_on_wasi_thread("component.invoke", move || {
1915 let mut linker = Linker::new(&engine);
1916 register_all(&mut linker, allow_state_store)?;
1917 add_component_control_to_linker(&mut linker)?;
1918
1919 let host_state = HostState::new(
1920 pack_id.clone(),
1921 config,
1922 http_client,
1923 mocks,
1924 session_store,
1925 state_store,
1926 secrets,
1927 oauth_config,
1928 Some(ctx_owned.clone()),
1929 Some(component_ref_owned.clone()),
1930 false,
1931 )?;
1932 let store_state = ComponentState::new(host_state, wasi_policy)?;
1933 let mut store = wasmtime::Store::new(&engine, store_state);
1934
1935 let invoke_result = HostState::instantiate_component_result(
1936 &mut linker,
1937 &mut store,
1938 &component,
1939 &ctx_owned,
1940 &component_ref_owned,
1941 &operation_owned,
1942 &input_owned,
1943 )?;
1944 HostState::convert_invoke_result(invoke_result)
1945 })
1946 }
1947
1948 pub fn resolve_provider(
1949 &self,
1950 provider_id: Option<&str>,
1951 provider_type: Option<&str>,
1952 ) -> Result<ProviderBinding> {
1953 let registry = self.provider_registry()?;
1954 registry.resolve(provider_id, provider_type)
1955 }
1956
1957 pub async fn invoke_provider(
1958 &self,
1959 binding: &ProviderBinding,
1960 ctx: ComponentExecCtx,
1961 op: &str,
1962 input_json: Vec<u8>,
1963 ) -> Result<Value> {
1964 let component_ref_owned = binding.component_ref.clone();
1965 let pack_component = self.components.get(&component_ref_owned).with_context(|| {
1966 format!("provider component '{component_ref_owned}' not found in pack")
1967 })?;
1968 let component = pack_component.component.clone();
1969
1970 let engine = self.engine.clone();
1971 let config = Arc::clone(&self.config);
1972 let http_client = Arc::clone(&self.http_client);
1973 let mocks = self.mocks.clone();
1974 let session_store = self.session_store.clone();
1975 let state_store = self.state_store.clone();
1976 let secrets = Arc::clone(&self.secrets);
1977 let oauth_config = self.oauth_config.clone();
1978 let wasi_policy = Arc::clone(&self.wasi_policy);
1979 let pack_id = self.metadata().pack_id.clone();
1980 let allow_state_store = self.allows_state_store(&component_ref_owned);
1981 let input_owned = input_json;
1982 let op_owned = op.to_string();
1983 let ctx_owned = ctx;
1984 let world = binding.world.clone();
1985
1986 run_on_wasi_thread("provider.invoke", move || {
1987 let mut linker = Linker::new(&engine);
1988 register_all(&mut linker, allow_state_store)?;
1989 add_component_control_to_linker(&mut linker)?;
1990 let host_state = HostState::new(
1991 pack_id.clone(),
1992 config,
1993 http_client,
1994 mocks,
1995 session_store,
1996 state_store,
1997 secrets,
1998 oauth_config,
1999 Some(ctx_owned.clone()),
2000 Some(component_ref_owned.clone()),
2001 true,
2002 )?;
2003 let store_state = ComponentState::new(host_state, wasi_policy)?;
2004 let mut store = wasmtime::Store::new(&engine, store_state);
2005 let use_schema_core_schema = world.contains("provider-schema-core");
2006 let use_schema_core_path = world.contains("provider/schema-core");
2007 let result = if use_schema_core_schema {
2008 let pre_instance = linker.instantiate_pre(component.as_ref())?;
2009 let pre: SchemaSchemaCorePre<ComponentState> =
2010 SchemaSchemaCorePre::new(pre_instance)?;
2011 let bindings = block_on(async { pre.instantiate_async(&mut store).await })?;
2012 let provider = bindings.greentic_provider_schema_core_schema_core_api();
2013 provider.call_invoke(&mut store, &op_owned, &input_owned)?
2014 } else if use_schema_core_path {
2015 let pre_instance = linker.instantiate_pre(component.as_ref())?;
2016 let path_attempt = (|| -> Result<Vec<u8>> {
2017 let pre: PathSchemaCorePre<ComponentState> =
2018 PathSchemaCorePre::new(pre_instance)?;
2019 let bindings = block_on(async { pre.instantiate_async(&mut store).await })?;
2020 let provider = bindings.greentic_provider_schema_core_api();
2021 Ok(provider.call_invoke(&mut store, &op_owned, &input_owned)?)
2022 })();
2023 match path_attempt {
2024 Ok(value) => value,
2025 Err(path_err)
2026 if path_err.to_string().contains("no exported instance named") =>
2027 {
2028 let pre_instance = linker.instantiate_pre(component.as_ref())?;
2029 let pre: SchemaSchemaCorePre<ComponentState> =
2030 SchemaSchemaCorePre::new(pre_instance)?;
2031 let bindings = block_on(async { pre.instantiate_async(&mut store).await })?;
2032 let provider = bindings.greentic_provider_schema_core_schema_core_api();
2033 provider.call_invoke(&mut store, &op_owned, &input_owned)?
2034 }
2035 Err(path_err) => return Err(path_err),
2036 }
2037 } else {
2038 let pre_instance = linker.instantiate_pre(component.as_ref())?;
2039 let pre: LegacySchemaCorePre<ComponentState> =
2040 LegacySchemaCorePre::new(pre_instance)?;
2041 let bindings = block_on(async { pre.instantiate_async(&mut store).await })?;
2042 let provider = bindings.greentic_provider_core_schema_core_api();
2043 provider.call_invoke(&mut store, &op_owned, &input_owned)?
2044 };
2045 deserialize_json_bytes(result)
2046 })
2047 }
2048
2049 pub(crate) fn provider_registry(&self) -> Result<ProviderRegistry> {
2050 if let Some(registry) = self.provider_registry.read().clone() {
2051 return Ok(registry);
2052 }
2053 let manifest = self
2054 .manifest
2055 .as_ref()
2056 .context("pack manifest required for provider resolution")?;
2057 let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
2058 let registry = ProviderRegistry::new(
2059 manifest,
2060 self.state_store.clone(),
2061 &self.config.tenant,
2062 &env,
2063 )?;
2064 *self.provider_registry.write() = Some(registry.clone());
2065 Ok(registry)
2066 }
2067
2068 pub(crate) fn provider_registry_optional(&self) -> Result<Option<ProviderRegistry>> {
2069 if self.manifest.is_none() {
2070 return Ok(None);
2071 }
2072 Ok(Some(self.provider_registry()?))
2073 }
2074
2075 pub fn load_flow(&self, flow_id: &str) -> Result<Flow> {
2076 if let Some(cache) = &self.flows {
2077 return cache
2078 .flows
2079 .get(flow_id)
2080 .cloned()
2081 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
2082 }
2083 if let Some(manifest) = &self.manifest {
2084 let entry = manifest
2085 .flows
2086 .iter()
2087 .find(|f| f.id.as_str() == flow_id)
2088 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
2089 return Ok(entry.flow.clone());
2090 }
2091 bail!("flow '{flow_id}' not available (pack exports disabled)")
2092 }
2093
    /// Returns a reference to the metadata stored for this pack.
    pub fn metadata(&self) -> &PackMetadata {
        &self.metadata
    }
2097
2098 pub fn read_asset(&self, asset_path: &str) -> Result<Vec<u8>> {
2103 let normalized = asset_path
2104 .trim_start_matches("assets/")
2105 .trim_start_matches("/assets/");
2106 if let Some(tempdir) = &self.assets_tempdir {
2108 let full = tempdir.path().join("assets").join(normalized);
2109 if full.exists() {
2110 return std::fs::read(&full)
2111 .with_context(|| format!("read asset {}", full.display()));
2112 }
2113 }
2114 let full = self.path.join("assets").join(normalized);
2116 if full.exists() {
2117 return std::fs::read(&full).with_context(|| format!("read asset {}", full.display()));
2118 }
2119 bail!("asset not found: {}", asset_path)
2120 }
2121
    /// Returns the component manifest stored under `component_ref`, or `None`
    /// when no entry exists for that key.
    pub fn component_manifest(&self, component_ref: &str) -> Option<&ComponentManifest> {
        self.component_manifests.get(component_ref)
    }
2125
2126 pub fn describe_component_contract_v0_6(&self, component_ref: &str) -> Result<Option<Value>> {
2127 let pack_component = self
2128 .components
2129 .get(component_ref)
2130 .with_context(|| format!("component '{component_ref}' not found in pack"))?;
2131 let engine = self.engine.clone();
2132 let config = Arc::clone(&self.config);
2133 let http_client = Arc::clone(&self.http_client);
2134 let mocks = self.mocks.clone();
2135 let session_store = self.session_store.clone();
2136 let state_store = self.state_store.clone();
2137 let secrets = Arc::clone(&self.secrets);
2138 let oauth_config = self.oauth_config.clone();
2139 let wasi_policy = Arc::clone(&self.wasi_policy);
2140 let pack_id = self.metadata().pack_id.clone();
2141 let allow_state_store = self.allows_state_store(component_ref);
2142 let component = pack_component.component.clone();
2143 let component_ref_owned = component_ref.to_string();
2144
2145 run_on_wasi_thread("component.describe", move || {
2146 let mut linker = Linker::new(&engine);
2147 register_all(&mut linker, allow_state_store)?;
2148 add_component_control_to_linker(&mut linker)?;
2149
2150 let host_state = HostState::new(
2151 pack_id.clone(),
2152 config,
2153 http_client,
2154 mocks,
2155 session_store,
2156 state_store,
2157 secrets,
2158 oauth_config,
2159 None,
2160 Some(component_ref_owned),
2161 false,
2162 )?;
2163 let store_state = ComponentState::new(host_state, wasi_policy)?;
2164 let mut store = wasmtime::Store::new(&engine, store_state);
2165 let pre_instance = linker.instantiate_pre(&component)?;
2166 let pre = match component_api::v0_6_descriptor::ComponentV0V6V0Pre::new(pre_instance) {
2167 Ok(pre) => pre,
2168 Err(_) => return Ok(None),
2169 };
2170 let bytes = block_on(async {
2171 let bindings = pre.instantiate_async(&mut store).await?;
2172 let descriptor = bindings.greentic_component_component_descriptor();
2173 descriptor.call_describe(&mut store)
2174 })?;
2175
2176 if bytes.is_empty() {
2177 return Ok(Some(Value::Null));
2178 }
2179 if let Ok(value) = serde_cbor::from_slice::<Value>(&bytes) {
2180 return Ok(Some(value));
2181 }
2182 if let Ok(value) = serde_json::from_slice::<Value>(&bytes) {
2183 return Ok(Some(value));
2184 }
2185 if let Ok(text) = String::from_utf8(bytes) {
2186 if let Ok(value) = serde_json::from_str::<Value>(&text) {
2187 return Ok(Some(value));
2188 }
2189 return Ok(Some(Value::String(text)));
2190 }
2191 Ok(Some(Value::Null))
2192 })
2193 }
2194
2195 pub fn load_schema_json(&self, schema_ref: &str) -> Result<Option<Value>> {
2196 let rel = normalize_schema_ref(schema_ref)?;
2197 if self.path.is_dir() {
2198 let candidate = self.path.join(&rel);
2199 if candidate.exists() {
2200 let bytes = std::fs::read(&candidate).with_context(|| {
2201 format!("failed to read schema file {}", candidate.display())
2202 })?;
2203 let value = serde_json::from_slice::<Value>(&bytes)
2204 .with_context(|| format!("invalid schema JSON in {}", candidate.display()))?;
2205 return Ok(Some(value));
2206 }
2207 }
2208
2209 if let Some(archive_path) = self
2210 .archive_path
2211 .as_ref()
2212 .or_else(|| path_is_gtpack(&self.path).then_some(&self.path))
2213 {
2214 let file = File::open(archive_path)
2215 .with_context(|| format!("failed to open {}", archive_path.display()))?;
2216 let mut archive = ZipArchive::new(file)
2217 .with_context(|| format!("failed to read pack {}", archive_path.display()))?;
2218 match archive.by_name(&rel) {
2219 Ok(mut entry) => {
2220 let mut bytes = Vec::new();
2221 entry.read_to_end(&mut bytes)?;
2222 let value = serde_json::from_slice::<Value>(&bytes).with_context(|| {
2223 format!("invalid schema JSON in {}:{}", archive_path.display(), rel)
2224 })?;
2225 Ok(Some(value))
2226 }
2227 Err(zip::result::ZipError::FileNotFound) => Ok(None),
2228 Err(err) => Err(anyhow!(err)).with_context(|| {
2229 format!(
2230 "failed to read schema `{}` from {}",
2231 rel,
2232 archive_path.display()
2233 )
2234 }),
2235 }
2236 } else {
2237 Ok(None)
2238 }
2239 }
2240
    /// Returns the secret requirements recorded in the pack metadata.
    pub fn required_secrets(&self) -> &[greentic_types::SecretRequirement] {
        &self.metadata.secret_requirements
    }
2244
2245 pub fn missing_secrets(
2246 &self,
2247 tenant_ctx: &TypesTenantCtx,
2248 ) -> Vec<greentic_types::SecretRequirement> {
2249 let env = tenant_ctx.env.as_str().to_string();
2250 let tenant = tenant_ctx.tenant.as_str().to_string();
2251 let team = tenant_ctx.team.as_ref().map(|t| t.as_str().to_string());
2252 self.required_secrets()
2253 .iter()
2254 .filter(|req| {
2255 if let Some(scope) = &req.scope {
2257 if scope.env != env {
2258 return false;
2259 }
2260 if scope.tenant != tenant {
2261 return false;
2262 }
2263 if let Some(ref team_req) = scope.team
2264 && team.as_ref() != Some(team_req)
2265 {
2266 return false;
2267 }
2268 }
2269 let ctx = self.config.tenant_ctx();
2270 read_secret_blocking(
2271 &self.secrets,
2272 &ctx,
2273 &self.metadata.pack_id,
2274 req.key.as_str(),
2275 )
2276 .is_err()
2277 })
2278 .cloned()
2279 .collect()
2280 }
2281
2282 pub fn for_component_test(
2283 components: Vec<(String, PathBuf)>,
2284 flows: HashMap<String, FlowIR>,
2285 pack_id: &str,
2286 config: Arc<HostConfig>,
2287 ) -> Result<Self> {
2288 let engine = Engine::default();
2289 let engine_profile =
2290 EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
2291 let cache = CacheManager::new(CacheConfig::default(), engine_profile);
2292 let mut component_map = HashMap::new();
2293 for (name, path) in components {
2294 if !path.exists() {
2295 bail!("component artifact missing: {}", path.display());
2296 }
2297 let wasm_bytes = std::fs::read(&path)?;
2298 let component =
2299 Arc::new(Component::from_binary(&engine, &wasm_bytes).map_err(|err| {
2300 anyhow!("failed to compile component {}: {err}", path.display())
2301 })?);
2302 component_map.insert(
2303 name.clone(),
2304 PackComponent {
2305 name,
2306 version: "0.0.0".into(),
2307 component,
2308 },
2309 );
2310 }
2311
2312 let mut flow_map = HashMap::new();
2313 let mut descriptors = Vec::new();
2314 for (id, ir) in flows {
2315 let flow_type = ir.flow_type.clone();
2316 let flow = flow_ir_to_flow(ir)?;
2317 flow_map.insert(id.clone(), flow);
2318 descriptors.push(FlowDescriptor {
2319 id: id.clone(),
2320 flow_type,
2321 pack_id: pack_id.to_string(),
2322 profile: "test".into(),
2323 version: "0.0.0".into(),
2324 description: None,
2325 });
2326 }
2327 let entry_flows = descriptors.iter().map(|flow| flow.id.clone()).collect();
2328 let metadata = PackMetadata {
2329 pack_id: pack_id.to_string(),
2330 version: "0.0.0".into(),
2331 entry_flows,
2332 secret_requirements: Vec::new(),
2333 };
2334 let flows_cache = PackFlows {
2335 descriptors: descriptors.clone(),
2336 flows: flow_map,
2337 metadata: metadata.clone(),
2338 };
2339
2340 Ok(Self {
2341 path: PathBuf::new(),
2342 archive_path: None,
2343 config,
2344 engine,
2345 metadata,
2346 manifest: None,
2347 legacy_manifest: None,
2348 component_manifests: HashMap::new(),
2349 mocks: None,
2350 flows: Some(flows_cache),
2351 components: component_map,
2352 http_client: Arc::clone(&HTTP_CLIENT),
2353 pre_cache: Mutex::new(HashMap::new()),
2354 session_store: None,
2355 state_store: None,
2356 wasi_policy: Arc::new(RunnerWasiPolicy::new()),
2357 assets_tempdir: None,
2358 provider_registry: RwLock::new(None),
2359 secrets: crate::secrets::default_manager()?,
2360 oauth_config: None,
2361 cache,
2362 })
2363 }
2364}
2365
2366fn normalize_schema_ref(schema_ref: &str) -> Result<String> {
2367 let candidate = schema_ref.trim();
2368 if candidate.is_empty() {
2369 bail!("schema ref cannot be empty");
2370 }
2371 let path = Path::new(candidate);
2372 if path.is_absolute() {
2373 bail!("schema ref must be relative: {}", schema_ref);
2374 }
2375 let mut normalized = PathBuf::new();
2376 for component in path.components() {
2377 match component {
2378 std::path::Component::Normal(part) => normalized.push(part),
2379 std::path::Component::CurDir => {}
2380 _ => bail!("schema ref must not contain traversal: {}", schema_ref),
2381 }
2382 }
2383 let normalized = normalized
2384 .to_str()
2385 .map(ToString::to_string)
2386 .ok_or_else(|| anyhow!("schema ref must be valid UTF-8"))?;
2387 if normalized.is_empty() {
2388 bail!("schema ref cannot normalize to empty path");
2389 }
2390 Ok(normalized)
2391}
2392
/// Reports whether `path` has a `gtpack` extension, compared without regard
/// to ASCII case. Paths without an extension, or with a non-UTF-8 extension,
/// yield `false`.
fn path_is_gtpack(path: &Path) -> bool {
    match path.extension().and_then(|ext| ext.to_str()) {
        Some(ext) => ext.eq_ignore_ascii_case("gtpack"),
        None => false,
    }
}
2399
2400fn is_missing_node_export(err: &wasmtime::Error, version: &str) -> bool {
2401 let message = err.to_string();
2402 message.contains("no exported instance named")
2403 && message.contains(&format!("greentic:component/node@{version}"))
2404}
2405
/// The flows of a pack together with their descriptors and the pack metadata.
struct PackFlows {
    /// One descriptor per flow.
    descriptors: Vec<FlowDescriptor>,
    /// Flow definitions keyed by flow id.
    flows: HashMap<String, Flow>,
    /// Metadata of the pack the flows belong to.
    metadata: PackMetadata,
}
2411
/// Manifest extension keys that are recognised as carrying runtime flow
/// definitions (see `flows_from_runtime_extension`).
const RUNTIME_FLOW_EXTENSION_IDS: [&str; 3] = [
    "greentic.pack.runtime_flow",
    "greentic.pack.flow_runtime",
    "greentic.pack.runtime_flows",
];
2417
/// Object-shaped runtime flow extension payload: `{ "flows": [...] }`.
#[derive(Debug, Deserialize)]
struct RuntimeFlowBundle {
    /// The runtime flows contained in the bundle.
    flows: Vec<RuntimeFlow>,
}
2422
/// Serialized form of a flow as found in a runtime flow extension; converted
/// into a `Flow` by `runtime_flow_to_flow`.
#[derive(Debug, Deserialize)]
struct RuntimeFlow {
    /// Flow identifier; parsed into a `FlowId` during conversion.
    id: String,
    /// Flow kind; also accepted under the key `flow_type`.
    #[serde(alias = "flow_type")]
    kind: FlowKind,
    /// Optional schema version; conversion falls back to `"1.0"`.
    #[serde(default)]
    schema_version: Option<String>,
    /// Optional start node; becomes the `default` entrypoint when
    /// `entrypoints` is empty.
    #[serde(default)]
    start: Option<String>,
    /// Named entrypoints of the flow.
    #[serde(default)]
    entrypoints: BTreeMap<String, Value>,
    /// Nodes keyed by node id.
    nodes: BTreeMap<String, RuntimeNode>,
    /// Optional flow metadata; conversion falls back to the default value.
    #[serde(default)]
    metadata: Option<FlowMetadata>,
}
2438
/// Serialized form of a single node inside a `RuntimeFlow`.
#[derive(Debug, Deserialize)]
struct RuntimeNode {
    /// Component identifier; also accepted under the key `component`.
    #[serde(alias = "component")]
    component_id: String,
    /// Optional operation name; also accepted under the key `operation`.
    #[serde(default, alias = "operation")]
    operation_name: Option<String>,
    /// Operation payload; also accepted under the keys `payload` and `input`.
    /// Used as the node's input mapping during conversion.
    #[serde(default, alias = "payload", alias = "input")]
    operation_payload: Value,
    /// Optional routing; conversion falls back to `Routing::End`.
    #[serde(default)]
    routing: Option<Routing>,
    /// Optional telemetry hints; conversion falls back to the default value.
    #[serde(default)]
    telemetry: Option<TelemetryHints>,
}
2452
2453fn deserialize_json_bytes(bytes: Vec<u8>) -> Result<Value> {
2454 if bytes.is_empty() {
2455 return Ok(Value::Null);
2456 }
2457 serde_json::from_slice(&bytes).or_else(|_| {
2458 String::from_utf8(bytes)
2459 .map(Value::String)
2460 .map_err(|err| anyhow!(err))
2461 })
2462}
2463
2464impl PackFlows {
2465 fn from_manifest(manifest: greentic_types::PackManifest) -> Self {
2466 if let Some(flows) = flows_from_runtime_extension(&manifest) {
2467 return flows;
2468 }
2469 let descriptors = manifest
2470 .flows
2471 .iter()
2472 .map(|entry| FlowDescriptor {
2473 id: entry.id.as_str().to_string(),
2474 flow_type: flow_kind_to_str(entry.kind).to_string(),
2475 pack_id: manifest.pack_id.as_str().to_string(),
2476 profile: manifest.pack_id.as_str().to_string(),
2477 version: manifest.version.to_string(),
2478 description: None,
2479 })
2480 .collect();
2481 let mut flows = HashMap::new();
2482 for entry in &manifest.flows {
2483 flows.insert(entry.id.as_str().to_string(), entry.flow.clone());
2484 }
2485 Self {
2486 metadata: PackMetadata::from_manifest(&manifest),
2487 descriptors,
2488 flows,
2489 }
2490 }
2491}
2492
2493fn flows_from_runtime_extension(manifest: &greentic_types::PackManifest) -> Option<PackFlows> {
2494 let extensions = manifest.extensions.as_ref()?;
2495 let extension = extensions.iter().find_map(|(key, ext)| {
2496 if RUNTIME_FLOW_EXTENSION_IDS
2497 .iter()
2498 .any(|candidate| candidate == key)
2499 {
2500 Some(ext)
2501 } else {
2502 None
2503 }
2504 })?;
2505 let runtime_flows = match decode_runtime_flow_extension(extension) {
2506 Some(flows) if !flows.is_empty() => flows,
2507 _ => return None,
2508 };
2509
2510 let descriptors = runtime_flows
2511 .iter()
2512 .map(|flow| FlowDescriptor {
2513 id: flow.id.as_str().to_string(),
2514 flow_type: flow_kind_to_str(flow.kind).to_string(),
2515 pack_id: manifest.pack_id.as_str().to_string(),
2516 profile: manifest.pack_id.as_str().to_string(),
2517 version: manifest.version.to_string(),
2518 description: None,
2519 })
2520 .collect::<Vec<_>>();
2521 let flows = runtime_flows
2522 .into_iter()
2523 .map(|flow| (flow.id.as_str().to_string(), flow))
2524 .collect();
2525
2526 Some(PackFlows {
2527 metadata: PackMetadata::from_manifest(manifest),
2528 descriptors,
2529 flows,
2530 })
2531}
2532
2533fn decode_runtime_flow_extension(extension: &ExtensionRef) -> Option<Vec<Flow>> {
2534 let value = match extension.inline.as_ref()? {
2535 ExtensionInline::Other(value) => value.clone(),
2536 _ => return None,
2537 };
2538
2539 if let Ok(bundle) = serde_json::from_value::<RuntimeFlowBundle>(value.clone()) {
2540 return Some(collect_runtime_flows(bundle.flows));
2541 }
2542
2543 if let Ok(flows) = serde_json::from_value::<Vec<RuntimeFlow>>(value.clone()) {
2544 return Some(collect_runtime_flows(flows));
2545 }
2546
2547 if let Ok(flows) = serde_json::from_value::<Vec<Flow>>(value) {
2548 return Some(flows);
2549 }
2550
2551 warn!(
2552 extension = %extension.kind,
2553 version = %extension.version,
2554 "runtime flow extension present but could not be decoded"
2555 );
2556 None
2557}
2558
2559fn collect_runtime_flows(flows: Vec<RuntimeFlow>) -> Vec<Flow> {
2560 flows
2561 .into_iter()
2562 .filter_map(|flow| match runtime_flow_to_flow(flow) {
2563 Ok(flow) => Some(flow),
2564 Err(err) => {
2565 warn!(error = %err, "failed to decode runtime flow");
2566 None
2567 }
2568 })
2569 .collect()
2570}
2571
2572fn runtime_flow_to_flow(runtime: RuntimeFlow) -> Result<Flow> {
2573 let flow_id = FlowId::from_str(&runtime.id)
2574 .with_context(|| format!("invalid flow id `{}`", runtime.id))?;
2575 let mut entrypoints = runtime.entrypoints;
2576 if entrypoints.is_empty()
2577 && let Some(start) = &runtime.start
2578 {
2579 entrypoints.insert("default".into(), Value::String(start.clone()));
2580 }
2581
2582 let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
2583 for (id, node) in runtime.nodes {
2584 let node_id = NodeId::from_str(&id).with_context(|| format!("invalid node id `{id}`"))?;
2585 let component_id = ComponentId::from_str(&node.component_id)
2586 .with_context(|| format!("invalid component id `{}`", node.component_id))?;
2587 let component = FlowComponentRef {
2588 id: component_id,
2589 pack_alias: None,
2590 operation: node.operation_name,
2591 };
2592 let routing = node.routing.unwrap_or(Routing::End);
2593 let telemetry = node.telemetry.unwrap_or_default();
2594 nodes.insert(
2595 node_id.clone(),
2596 Node {
2597 id: node_id,
2598 component,
2599 input: InputMapping {
2600 mapping: node.operation_payload,
2601 },
2602 output: OutputMapping {
2603 mapping: Value::Null,
2604 },
2605 routing,
2606 telemetry,
2607 },
2608 );
2609 }
2610
2611 Ok(Flow {
2612 schema_version: runtime.schema_version.unwrap_or_else(|| "1.0".to_string()),
2613 id: flow_id,
2614 kind: runtime.kind,
2615 entrypoints,
2616 nodes,
2617 metadata: runtime.metadata.unwrap_or_default(),
2618 })
2619}
2620
2621fn flow_kind_to_str(kind: greentic_types::FlowKind) -> &'static str {
2622 match kind {
2623 greentic_types::FlowKind::Messaging => "messaging",
2624 greentic_types::FlowKind::Event => "event",
2625 greentic_types::FlowKind::ComponentConfig => "component-config",
2626 greentic_types::FlowKind::Job => "job",
2627 greentic_types::FlowKind::Http => "http",
2628 }
2629}
2630
2631fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
2632 let mut file = archive
2633 .by_name(name)
2634 .with_context(|| format!("entry {name} missing from archive"))?;
2635 let mut buf = Vec::new();
2636 file.read_to_end(&mut buf)?;
2637 Ok(buf)
2638}
2639
2640fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
2641 for node in doc.nodes.values_mut() {
2642 let Some((component_ref, payload)) = node
2643 .raw
2644 .iter()
2645 .next()
2646 .map(|(key, value)| (key.clone(), value.clone()))
2647 else {
2648 continue;
2649 };
2650 if component_ref.starts_with("emit.") {
2651 node.operation = Some(component_ref);
2652 node.payload = payload;
2653 node.raw.clear();
2654 continue;
2655 }
2656 let (target_component, operation, input, config) =
2657 infer_component_exec(&payload, &component_ref);
2658 let mut payload_obj = serde_json::Map::new();
2659 payload_obj.insert("component".into(), Value::String(target_component));
2661 payload_obj.insert("operation".into(), Value::String(operation));
2662 payload_obj.insert("input".into(), input);
2663 if let Some(cfg) = config {
2664 payload_obj.insert("config".into(), cfg);
2665 }
2666 node.operation = Some("component.exec".to_string());
2667 node.payload = Value::Object(payload_obj);
2668 node.raw.clear();
2669 }
2670 doc
2671}
2672
2673fn infer_component_exec(
2674 payload: &Value,
2675 component_ref: &str,
2676) -> (String, String, Value, Option<Value>) {
2677 let default_op = if component_ref.starts_with("templating.") {
2678 "render"
2679 } else {
2680 "invoke"
2681 }
2682 .to_string();
2683
2684 if let Value::Object(map) = payload {
2685 let op = map
2686 .get("op")
2687 .or_else(|| map.get("operation"))
2688 .and_then(Value::as_str)
2689 .map(|s| s.to_string())
2690 .unwrap_or_else(|| default_op.clone());
2691
2692 let mut input = map.clone();
2693 let config = input.remove("config");
2694 let component = input
2695 .get("component")
2696 .or_else(|| input.get("component_ref"))
2697 .and_then(Value::as_str)
2698 .map(|s| s.to_string())
2699 .unwrap_or_else(|| component_ref.to_string());
2700 input.remove("component");
2701 input.remove("component_ref");
2702 input.remove("op");
2703 input.remove("operation");
2704 return (component, op, Value::Object(input), config);
2705 }
2706
2707 (component_ref.to_string(), default_op, payload.clone(), None)
2708}
2709
/// Identifies one component a pack is expected to provide.
#[derive(Clone, Debug)]
struct ComponentSpec {
    /// Component identifier.
    id: String,
    /// Component version; `"0.0.0"` when derived from a pack lock or a
    /// component sources table.
    version: String,
    /// Relative wasm path taken from a legacy manifest, if any; when set it
    /// overrides the default `components/<id>.wasm` location.
    legacy_path: Option<String>,
}
2716
/// Describes where a component comes from and which digests it is expected
/// to match.
#[derive(Clone, Debug)]
struct ComponentSourceInfo {
    /// Expected digest of the component, if known.
    digest: Option<String>,
    /// Parsed source reference of the component.
    source: ComponentSourceRef,
    /// Whether the artifact is bundled inline or has to be fetched remotely.
    artifact: ComponentArtifactLocation,
    /// Expected sha256 of a bundled wasm file; only set for pack.lock entries
    /// of bundled components.
    expected_wasm_sha256: Option<String>,
    /// Set when a bundled pack.lock entry carries no hash and missing hashes
    /// are allowed. Presumably tells the loader to skip digest verification —
    /// confirm against the consumer.
    skip_digest_verification: bool,
}
2725
/// Location of a component's wasm artifact.
#[derive(Clone, Debug)]
enum ComponentArtifactLocation {
    /// The artifact is bundled with the pack at `wasm_path`.
    Inline { wasm_path: String },
    /// The artifact is not bundled with the pack.
    Remote,
}
2731
/// Deserialized contents of a pack lock file.
#[derive(Clone, Debug, Deserialize)]
struct PackLockV1 {
    /// Schema version of the lock file; `load_pack_lock` only accepts `1`.
    schema_version: u32,
    /// The locked components.
    components: Vec<PackLockComponent>,
}
2737
/// One component entry of a pack lock file.
///
/// Several fields exist under both a current and a legacy key; when both are
/// present `component_sources_table_from_pack_lock` requires them to agree.
#[derive(Clone, Debug, Deserialize)]
struct PackLockComponent {
    /// Component name; must be unique within the lock file.
    name: String,
    /// Source reference of the component (key `source_ref`).
    #[serde(default, rename = "source_ref")]
    source_ref: Option<String>,
    /// Legacy spelling of the source reference (key `ref`).
    #[serde(default, rename = "ref")]
    legacy_ref: Option<String>,
    /// Optional component id; used as an additional lookup key.
    #[serde(default)]
    component_id: Option<ComponentId>,
    /// Whether the component is bundled with the pack.
    #[serde(default)]
    bundled: Option<bool>,
    /// Path of the bundled wasm file (key `bundled_path`); its presence alone
    /// marks the component as bundled.
    #[serde(default, rename = "bundled_path")]
    bundled_path: Option<String>,
    /// Legacy spelling of the bundled path (key `path`).
    #[serde(default, rename = "path")]
    legacy_path: Option<String>,
    /// Expected sha256 of the bundled wasm file.
    #[serde(default)]
    wasm_sha256: Option<String>,
    /// Legacy spelling of the wasm hash (key `sha256`).
    #[serde(default, rename = "sha256")]
    legacy_sha256: Option<String>,
    /// Resolved digest of a remote component; preferred over `digest`.
    #[serde(default)]
    resolved_digest: Option<String>,
    /// Fallback digest of a remote component.
    #[serde(default)]
    digest: Option<String>,
}
2762
2763fn component_specs(
2764 manifest: Option<&greentic_types::PackManifest>,
2765 legacy_manifest: Option<&legacy_pack::PackManifest>,
2766 component_sources: Option<&ComponentSourcesV1>,
2767 pack_lock: Option<&PackLockV1>,
2768) -> Vec<ComponentSpec> {
2769 if let Some(manifest) = manifest {
2770 if !manifest.components.is_empty() {
2771 return manifest
2772 .components
2773 .iter()
2774 .map(|entry| ComponentSpec {
2775 id: entry.id.as_str().to_string(),
2776 version: entry.version.to_string(),
2777 legacy_path: None,
2778 })
2779 .collect();
2780 }
2781 if let Some(lock) = pack_lock {
2782 let mut seen = HashSet::new();
2783 let mut specs = Vec::new();
2784 for entry in &lock.components {
2785 let id = entry
2786 .component_id
2787 .as_ref()
2788 .map(|id| id.as_str())
2789 .unwrap_or(entry.name.as_str());
2790 if seen.insert(id.to_string()) {
2791 specs.push(ComponentSpec {
2792 id: id.to_string(),
2793 version: "0.0.0".to_string(),
2794 legacy_path: None,
2795 });
2796 }
2797 }
2798 return specs;
2799 }
2800 if let Some(sources) = component_sources {
2801 let mut seen = HashSet::new();
2802 let mut specs = Vec::new();
2803 for entry in &sources.components {
2804 let id = entry
2805 .component_id
2806 .as_ref()
2807 .map(|id| id.as_str())
2808 .unwrap_or(entry.name.as_str());
2809 if seen.insert(id.to_string()) {
2810 specs.push(ComponentSpec {
2811 id: id.to_string(),
2812 version: "0.0.0".to_string(),
2813 legacy_path: None,
2814 });
2815 }
2816 }
2817 return specs;
2818 }
2819 }
2820 if let Some(legacy_manifest) = legacy_manifest {
2821 return legacy_manifest
2822 .components
2823 .iter()
2824 .map(|entry| ComponentSpec {
2825 id: entry.name.clone(),
2826 version: entry.version.to_string(),
2827 legacy_path: Some(entry.file_wasm.clone()),
2828 })
2829 .collect();
2830 }
2831 Vec::new()
2832}
2833
2834fn component_sources_table(
2835 sources: Option<&ComponentSourcesV1>,
2836) -> Result<Option<HashMap<String, ComponentSourceInfo>>> {
2837 let Some(sources) = sources else {
2838 return Ok(None);
2839 };
2840 let mut table = HashMap::new();
2841 for entry in &sources.components {
2842 let artifact = match &entry.artifact {
2843 ArtifactLocationV1::Inline { wasm_path, .. } => ComponentArtifactLocation::Inline {
2844 wasm_path: wasm_path.clone(),
2845 },
2846 ArtifactLocationV1::Remote => ComponentArtifactLocation::Remote,
2847 };
2848 let info = ComponentSourceInfo {
2849 digest: Some(entry.resolved.digest.clone()),
2850 source: entry.source.clone(),
2851 artifact,
2852 expected_wasm_sha256: None,
2853 skip_digest_verification: false,
2854 };
2855 if let Some(component_id) = entry.component_id.as_ref() {
2856 table.insert(component_id.as_str().to_string(), info.clone());
2857 }
2858 table.insert(entry.name.clone(), info);
2859 }
2860 Ok(Some(table))
2861}
2862
2863fn load_pack_lock(path: &Path) -> Result<Option<PackLockV1>> {
2864 let lock_path = if path.is_dir() {
2865 let candidate = path.join("pack.lock");
2866 if candidate.exists() {
2867 Some(candidate)
2868 } else {
2869 let candidate = path.join("pack.lock.json");
2870 candidate.exists().then_some(candidate)
2871 }
2872 } else {
2873 None
2874 };
2875 let Some(lock_path) = lock_path else {
2876 return Ok(None);
2877 };
2878 let raw = std::fs::read_to_string(&lock_path)
2879 .with_context(|| format!("failed to read {}", lock_path.display()))?;
2880 let lock: PackLockV1 = serde_json::from_str(&raw).context("failed to parse pack.lock")?;
2881 if lock.schema_version != 1 {
2882 bail!("pack.lock schema_version must be 1");
2883 }
2884 Ok(Some(lock))
2885}
2886
/// Lists the directories in which a pack lock file should be looked for.
///
/// For a directory pack this is the pack directory itself. Otherwise it is
/// the parent and grandparent (where they exist) of `archive_hint`, or of
/// `pack_path` when no hint is given.
fn find_pack_lock_roots(
    pack_path: &Path,
    is_dir: bool,
    archive_hint: Option<&Path>,
) -> Vec<PathBuf> {
    if is_dir {
        return vec![pack_path.to_path_buf()];
    }
    let anchor = archive_hint.unwrap_or(pack_path);
    // `ancestors` yields the path itself first, then parent, grandparent, ...
    anchor
        .ancestors()
        .skip(1)
        .take(2)
        .map(Path::to_path_buf)
        .collect()
}
2911
2912fn normalize_sha256(digest: &str) -> Result<String> {
2913 let trimmed = digest.trim();
2914 if trimmed.is_empty() {
2915 bail!("sha256 digest cannot be empty");
2916 }
2917 if let Some(stripped) = trimmed.strip_prefix("sha256:") {
2918 if stripped.is_empty() {
2919 bail!("sha256 digest must include hex bytes after sha256:");
2920 }
2921 return Ok(trimmed.to_string());
2922 }
2923 if trimmed.chars().all(|c| c.is_ascii_hexdigit()) {
2924 return Ok(format!("sha256:{trimmed}"));
2925 }
2926 bail!("sha256 digest must be hex or sha256:<hex>");
2927}
2928
2929fn component_sources_table_from_pack_lock(
2930 lock: &PackLockV1,
2931 allow_missing_hash: bool,
2932) -> Result<HashMap<String, ComponentSourceInfo>> {
2933 let mut table = HashMap::new();
2934 let mut names = HashSet::new();
2935 for entry in &lock.components {
2936 if !names.insert(entry.name.clone()) {
2937 bail!(
2938 "pack.lock contains duplicate component name `{}`",
2939 entry.name
2940 );
2941 }
2942 let source_ref = match (&entry.source_ref, &entry.legacy_ref) {
2943 (Some(primary), Some(legacy)) => {
2944 if primary != legacy {
2945 bail!(
2946 "pack.lock component {} has conflicting refs: {} vs {}",
2947 entry.name,
2948 primary,
2949 legacy
2950 );
2951 }
2952 primary.as_str()
2953 }
2954 (Some(primary), None) => primary.as_str(),
2955 (None, Some(legacy)) => legacy.as_str(),
2956 (None, None) => {
2957 bail!("pack.lock component {} missing source_ref", entry.name);
2958 }
2959 };
2960 let source: ComponentSourceRef = source_ref
2961 .parse()
2962 .with_context(|| format!("invalid component ref `{}`", source_ref))?;
2963 let bundled_path = match (&entry.bundled_path, &entry.legacy_path) {
2964 (Some(primary), Some(legacy)) => {
2965 if primary != legacy {
2966 bail!(
2967 "pack.lock component {} has conflicting bundled paths: {} vs {}",
2968 entry.name,
2969 primary,
2970 legacy
2971 );
2972 }
2973 Some(primary.clone())
2974 }
2975 (Some(primary), None) => Some(primary.clone()),
2976 (None, Some(legacy)) => Some(legacy.clone()),
2977 (None, None) => None,
2978 };
2979 let bundled = entry.bundled.unwrap_or(false) || bundled_path.is_some();
2980 let (artifact, digest, expected_wasm_sha256, skip_digest_verification) = if bundled {
2981 let wasm_path = bundled_path.ok_or_else(|| {
2982 anyhow!(
2983 "pack.lock component {} marked bundled but bundled_path is missing",
2984 entry.name
2985 )
2986 })?;
2987 let expected_raw = match (&entry.wasm_sha256, &entry.legacy_sha256) {
2988 (Some(primary), Some(legacy)) => {
2989 if primary != legacy {
2990 bail!(
2991 "pack.lock component {} has conflicting wasm_sha256 values: {} vs {}",
2992 entry.name,
2993 primary,
2994 legacy
2995 );
2996 }
2997 Some(primary.as_str())
2998 }
2999 (Some(primary), None) => Some(primary.as_str()),
3000 (None, Some(legacy)) => Some(legacy.as_str()),
3001 (None, None) => None,
3002 };
3003 let expected = match expected_raw {
3004 Some(value) => Some(normalize_sha256(value)?),
3005 None => None,
3006 };
3007 if expected.is_none() && !allow_missing_hash {
3008 bail!(
3009 "pack.lock component {} missing wasm_sha256 for bundled component",
3010 entry.name
3011 );
3012 }
3013 (
3014 ComponentArtifactLocation::Inline { wasm_path },
3015 expected.clone(),
3016 expected,
3017 allow_missing_hash && expected_raw.is_none(),
3018 )
3019 } else {
3020 if source.is_tag() {
3021 bail!(
3022 "component {} uses tag ref {} but is not bundled; rebuild the pack",
3023 entry.name,
3024 source
3025 );
3026 }
3027 let expected = entry
3028 .resolved_digest
3029 .as_deref()
3030 .or(entry.digest.as_deref())
3031 .ok_or_else(|| {
3032 anyhow!(
3033 "pack.lock component {} missing resolved_digest for remote component",
3034 entry.name
3035 )
3036 })?;
3037 (
3038 ComponentArtifactLocation::Remote,
3039 Some(normalize_digest(expected)),
3040 None,
3041 false,
3042 )
3043 };
3044 let info = ComponentSourceInfo {
3045 digest,
3046 source,
3047 artifact,
3048 expected_wasm_sha256,
3049 skip_digest_verification,
3050 };
3051 if let Some(component_id) = entry.component_id.as_ref() {
3052 let key = component_id.as_str().to_string();
3053 if table.contains_key(&key) {
3054 bail!(
3055 "pack.lock contains duplicate component id `{}`",
3056 component_id.as_str()
3057 );
3058 }
3059 table.insert(key, info.clone());
3060 }
3061 if entry.name
3062 != entry
3063 .component_id
3064 .as_ref()
3065 .map(|id| id.as_str())
3066 .unwrap_or("")
3067 {
3068 table.insert(entry.name.clone(), info);
3069 }
3070 }
3071 Ok(table)
3072}
3073
3074fn component_path_for_spec(root: &Path, spec: &ComponentSpec) -> PathBuf {
3075 if let Some(path) = &spec.legacy_path {
3076 return root.join(path);
3077 }
3078 root.join("components").join(format!("{}.wasm", spec.id))
3079}
3080
/// Ensures a digest carries an algorithm prefix.
///
/// Digests that already start with `sha256:` or `blake3:` are returned
/// unchanged; anything else is prefixed with `sha256:`.
fn normalize_digest(digest: &str) -> String {
    let has_known_prefix = ["sha256:", "blake3:"]
        .iter()
        .any(|prefix| digest.starts_with(*prefix));
    if has_known_prefix {
        return digest.to_string();
    }
    format!("sha256:{digest}")
}
3088
3089fn compute_digest_for(bytes: &[u8], digest: &str) -> Result<String> {
3090 if digest.starts_with("blake3:") {
3091 let hash = blake3::hash(bytes);
3092 return Ok(format!("blake3:{}", hash.to_hex()));
3093 }
3094 let mut hasher = sha2::Sha256::new();
3095 hasher.update(bytes);
3096 Ok(format!("sha256:{:x}", hasher.finalize()))
3097}
3098
3099fn compute_sha256_digest_for(bytes: &[u8]) -> String {
3100 let mut hasher = sha2::Sha256::new();
3101 hasher.update(bytes);
3102 format!("sha256:{:x}", hasher.finalize())
3103}
3104
3105fn build_artifact_key(cache: &CacheManager, digest: Option<&str>, bytes: &[u8]) -> ArtifactKey {
3106 let wasm_digest = digest
3107 .map(normalize_digest)
3108 .unwrap_or_else(|| compute_sha256_digest_for(bytes));
3109 ArtifactKey::new(cache.engine_profile_id().to_string(), wasm_digest)
3110}
3111
3112async fn compile_component_with_cache(
3113 cache: &CacheManager,
3114 engine: &Engine,
3115 digest: Option<&str>,
3116 bytes: Vec<u8>,
3117) -> Result<Arc<Component>> {
3118 let key = build_artifact_key(cache, digest, &bytes);
3119 cache.get_component(engine, &key, || Ok(bytes)).await
3120}
3121
3122fn verify_component_digest(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
3123 let normalized_expected = normalize_digest(expected);
3124 let actual = compute_digest_for(bytes, &normalized_expected)?;
3125 if normalize_digest(&actual) != normalized_expected {
3126 bail!(
3127 "component {component_id} digest mismatch: expected {normalized_expected}, got {actual}"
3128 );
3129 }
3130 Ok(())
3131}
3132
3133fn verify_wasm_sha256(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
3134 let normalized_expected = normalize_sha256(expected)?;
3135 let actual = compute_sha256_digest_for(bytes);
3136 if actual != normalized_expected {
3137 bail!(
3138 "component {component_id} bundled digest mismatch: expected {normalized_expected}, got {actual}"
3139 );
3140 }
3141 Ok(())
3142}
3143
#[cfg(test)]
mod pack_lock_tests {
    use super::*;
    use tempfile::TempDir;

    /// A component that is not bundled and is referenced by a tag must be
    /// rejected with an error telling the user to rebuild the pack.
    #[test]
    fn pack_lock_tag_ref_requires_bundle() {
        let tagged_component = PackLockComponent {
            name: "templates".to_string(),
            source_ref: Some("oci://registry.test/templates:latest".to_string()),
            legacy_ref: None,
            component_id: None,
            bundled: Some(false),
            bundled_path: None,
            legacy_path: None,
            wasm_sha256: None,
            legacy_sha256: None,
            resolved_digest: None,
            digest: None,
        };
        let lock = PackLockV1 {
            schema_version: 1,
            components: vec![tagged_component],
        };

        let err = component_sources_table_from_pack_lock(&lock, false)
            .unwrap_err()
            .to_string();
        let mentions_tag = err.contains("tag ref");
        let mentions_rebuild = err.contains("rebuild the pack");
        assert!(mentions_tag && mentions_rebuild, "unexpected error: {err}");
    }

    /// Loading an inline component whose bytes do not hash to the recorded
    /// `expected_wasm_sha256` must fail with a bundled digest mismatch.
    #[test]
    fn bundled_hash_mismatch_errors() {
        let runtime = tokio::runtime::Runtime::new().expect("runtime");
        let workdir = TempDir::new().expect("temp dir");
        let engine = Engine::default();
        let cache = CacheManager::new(
            CacheConfig {
                root: workdir.path().join("cache"),
                ..CacheConfig::default()
            },
            EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string()),
        );

        // Copy a real component fixture next to the materialized pack root.
        let fixture_wasm = Path::new(env!("CARGO_MANIFEST_DIR"))
            .join("../../tests/fixtures/packs/secrets_store_smoke/components/echo_secret.wasm");
        let wasm_bytes = std::fs::read(&fixture_wasm).expect("read fixture wasm");
        std::fs::write(workdir.path().join("component.wasm"), &wasm_bytes)
            .expect("write temp wasm");

        let spec = ComponentSpec {
            id: "qa.process".to_string(),
            version: "0.0.0".to_string(),
            legacy_path: None,
        };
        let mut missing = HashSet::from([spec.id.clone()]);

        // The recorded digests deliberately do not match the fixture bytes.
        let source_info = ComponentSourceInfo {
            digest: Some("sha256:deadbeef".to_string()),
            source: ComponentSourceRef::Oci("registry.test/qa.process@sha256:deadbeef".into()),
            artifact: ComponentArtifactLocation::Inline {
                wasm_path: "component.wasm".to_string(),
            },
            expected_wasm_sha256: Some("sha256:deadbeef".to_string()),
            skip_digest_verification: false,
        };
        let sources = HashMap::from([(spec.id.clone(), source_info)]);

        let mut loaded = HashMap::new();
        let outcome = runtime.block_on(load_components_from_sources(
            &cache,
            &engine,
            &sources,
            &ComponentResolution::default(),
            &[spec],
            &mut missing,
            &mut loaded,
            Some(workdir.path()),
            None,
        ));

        let err = outcome.unwrap_err();
        assert!(
            err.to_string().contains("bundled digest mismatch"),
            "unexpected error: {err}"
        );
    }
}
3233
#[cfg(test)]
mod pack_resolution_prop_tests {
    use super::*;
    use greentic_types::{ArtifactLocationV1, ComponentSourceEntryV1, ResolvedComponentV1};
    use proptest::prelude::*;
    use proptest::test_runner::{Config as ProptestConfig, RngAlgorithm, TestRng, TestRunner};
    use std::collections::BTreeSet;
    use std::path::Path;
    use std::str::FromStr;

    /// How a scenario looks a component up in the resolved table.
    #[derive(Clone, Debug)]
    enum ResolveRequest {
        ById(String),
        ByName(String),
    }

    /// Flattened, comparable view of a successfully resolved component.
    #[derive(Clone, Debug, PartialEq, Eq)]
    struct ResolvedComponent {
        key: String,
        source: String,
        artifact: String,
        digest: Option<String>,
        expected_wasm_sha256: Option<String>,
        skip_digest_verification: bool,
    }

    /// Comparable description of a failed resolution.
    #[derive(Clone, Debug, PartialEq, Eq)]
    struct ResolveError {
        code: String,
        message: String,
        context_key: String,
    }

    /// One generated test case: the optional inputs, the lookup to perform,
    /// and the bytes/digest pair used for the sha256 check.
    #[derive(Clone, Debug)]
    struct Scenario {
        pack_lock: Option<PackLockV1>,
        component_sources: Option<ComponentSourcesV1>,
        request: ResolveRequest,
        expected_sha256: Option<String>,
        bytes: Vec<u8>,
    }

    /// Builds the component source table (from the lock when one is given,
    /// otherwise from `sources`) and looks up the requested key in it.
    fn resolve_component_test(
        sources: Option<&ComponentSourcesV1>,
        lock: Option<&PackLockV1>,
        request: &ResolveRequest,
    ) -> Result<ResolvedComponent, ResolveError> {
        let key = request_key(request);
        let failure = |code: &str, message: String| ResolveError {
            code: code.to_string(),
            message,
            context_key: key.to_string(),
        };

        let table = match lock {
            Some(lock) => component_sources_table_from_pack_lock(lock, false).map_err(|err| {
                let message = err.to_string();
                failure(classify_pack_lock_error(&message), message)
            })?,
            None => component_sources_table(sources)
                .map_err(|err| failure("component_sources_error", err.to_string()))?
                .ok_or_else(|| {
                    failure(
                        "missing_component_sources",
                        "component sources not provided".to_string(),
                    )
                })?,
        };

        let Some(source) = table.get(key) else {
            return Err(failure(
                "component_not_found",
                format!("component {key} not found"),
            ));
        };

        let artifact = match source.artifact {
            ComponentArtifactLocation::Inline { .. } => "inline",
            ComponentArtifactLocation::Remote => "remote",
        };
        Ok(ResolvedComponent {
            key: key.to_string(),
            source: source.source.to_string(),
            artifact: artifact.to_string(),
            digest: source.digest.clone(),
            expected_wasm_sha256: source.expected_wasm_sha256.clone(),
            skip_digest_verification: source.skip_digest_verification,
        })
    }

    /// Returns the lookup key carried by either request variant.
    fn request_key(request: &ResolveRequest) -> &str {
        match request {
            ResolveRequest::ById(value) | ResolveRequest::ByName(value) => value.as_str(),
        }
    }

    /// Maps a pack-lock error message to a stable error code.
    ///
    /// Rules are tried in order and the first rule whose fragments all occur
    /// in `message` wins; anything else is reported as `unknown_error`.
    fn classify_pack_lock_error(message: &str) -> &'static str {
        const RULES: &[(&[&str], &str)] = &[
            (&["duplicate component name"], "duplicate_name"),
            (&["duplicate component id"], "duplicate_id"),
            (&["conflicting refs"], "conflicting_ref"),
            (&["conflicting bundled paths"], "conflicting_bundled_path"),
            (&["conflicting wasm_sha256"], "conflicting_wasm_sha256"),
            (&["missing source_ref"], "missing_source_ref"),
            (
                &["marked bundled but bundled_path is missing"],
                "missing_bundled_path",
            ),
            (&["missing wasm_sha256"], "missing_wasm_sha256"),
            (&["tag ref", "not bundled"], "tag_ref_requires_bundle"),
            (&["missing resolved_digest"], "missing_resolved_digest"),
            (&["invalid component ref"], "invalid_component_ref"),
            (&["sha256 digest"], "invalid_sha256"),
        ];

        RULES
            .iter()
            .find(|(fragments, _)| fragments.iter().all(|fragment| message.contains(*fragment)))
            .map_or("unknown_error", |(_, code)| *code)
    }

    /// Every error code a scenario is allowed to produce.
    fn known_error_codes() -> BTreeSet<&'static str> {
        BTreeSet::from([
            "component_sources_error",
            "missing_component_sources",
            "component_not_found",
            "duplicate_name",
            "duplicate_id",
            "conflicting_ref",
            "conflicting_bundled_path",
            "conflicting_wasm_sha256",
            "missing_source_ref",
            "missing_bundled_path",
            "missing_wasm_sha256",
            "tag_ref_requires_bundle",
            "missing_resolved_digest",
            "invalid_component_ref",
            "invalid_sha256",
            "unknown_error",
        ])
    }

    /// Proptest configuration; the case count comes from `PROPTEST_CASES`
    /// when it parses as a `u32`, and is 128 otherwise.
    fn proptest_config() -> ProptestConfig {
        let cases = match std::env::var("PROPTEST_CASES") {
            Ok(raw) => raw.parse::<u32>().unwrap_or(128),
            Err(_) => 128,
        };
        ProptestConfig {
            cases,
            failure_persistence: None,
            ..ProptestConfig::default()
        }
    }

    /// Expands a `u64` seed into a 32-byte RNG seed: the little-endian seed
    /// bytes followed by zeros.
    fn seed_bytes(seed: u64) -> [u8; 32] {
        let mut bytes = [0u8; 32];
        bytes[..8].copy_from_slice(&seed.to_le_bytes());
        bytes
    }

    /// Reads an optional fixed seed from `PROPTEST_SEED`.
    fn proptest_seed() -> Option<[u8; 32]> {
        let raw = std::env::var("PROPTEST_SEED").ok()?;
        raw.parse::<u64>().ok().map(seed_bytes)
    }

    /// Runs `cases` generated scenarios, using a ChaCha RNG seeded from
    /// `seed` when one is supplied.
    fn run_cases(strategy: impl Strategy<Value = Scenario>, cases: u32, seed: Option<[u8; 32]>) {
        let config = ProptestConfig {
            cases,
            failure_persistence: None,
            ..ProptestConfig::default()
        };
        let mut runner = if let Some(bytes) = seed {
            TestRunner::new_with_rng(config, TestRng::from_seed(RngAlgorithm::ChaCha, &bytes))
        } else {
            TestRunner::new(config)
        };
        let outcome = runner.run(&strategy, |scenario| {
            run_scenario(&scenario);
            Ok(())
        });
        outcome.unwrap();
    }

    /// Checks the properties every scenario must satisfy: resolution is
    /// repeatable, a lock alone gives the same answer as lock plus sources,
    /// error codes are known, and sha256 verification agrees with a direct
    /// digest comparison.
    fn run_scenario(scenario: &Scenario) {
        let known_codes = known_error_codes();
        let resolve = |sources: Option<&ComponentSourcesV1>| {
            resolve_component_test(sources, scenario.pack_lock.as_ref(), &scenario.request)
        };

        let first = resolve(scenario.component_sources.as_ref());
        let second = resolve(scenario.component_sources.as_ref());
        assert_eq!(normalize_result(&first), normalize_result(&second));

        if scenario.pack_lock.is_some() {
            let lock_only = resolve(None);
            assert_eq!(normalize_result(&first), normalize_result(&lock_only));
        }

        if let Err(err) = &first {
            assert!(
                known_codes.contains(err.code.as_str()),
                "unexpected error code {}: {}",
                err.code,
                err.message
            );
        }

        let Some(expected) = scenario.expected_sha256.as_deref() else {
            return;
        };
        let expected_ok = verify_wasm_sha256("test.component", expected, &scenario.bytes).is_ok();
        let actual = compute_sha256_digest_for(&scenario.bytes);
        let digests_match = actual == normalize_sha256(expected).unwrap_or_default();
        if digests_match {
            assert!(expected_ok, "expected sha256 match to succeed");
        } else {
            assert!(!expected_ok, "expected sha256 mismatch to fail");
        }
    }

    /// Produces an owned copy of a resolution result so two results can be
    /// compared with `assert_eq!`.
    fn normalize_result(
        result: &Result<ResolvedComponent, ResolveError>,
    ) -> Result<ResolvedComponent, ResolveError> {
        result.clone()
    }

    /// Generates scenarios from independent random choices: component names,
    /// tag vs digest ref, bundled or not, whether a sha256 and a distinct
    /// component id are present, how to look up, and which inputs exist.
    fn scenario_strategy() -> impl Strategy<Value = Scenario> {
        let inputs = (
            any::<u8>().prop_map(|n| format!("component{n}.core")),
            any::<u8>().prop_map(|n| format!("component_alt{n}.core")),
            any::<bool>(),
            any::<bool>(),
            any::<bool>(),
            any::<bool>(),
            any::<bool>(),
            any::<bool>(),
            any::<bool>(),
            prop::collection::vec(any::<u8>(), 1..64),
        );

        inputs.prop_map(
            |(
                name,
                alt_name,
                tag_ref,
                bundled,
                include_sha,
                include_component_id,
                request_by_id,
                use_lock,
                use_sources,
                bytes,
            )| {
                let component_id_str = if include_component_id {
                    alt_name
                } else {
                    name.clone()
                };
                let component_id = ComponentId::from_str(&component_id_str).ok();

                let source_ref = if tag_ref {
                    format!("oci://registry.test/{name}:v1")
                } else {
                    format!(
                        "oci://registry.test/{name}@sha256:{}",
                        hex::encode([0x11u8; 32])
                    )
                };
                let expected_sha256 =
                    (bundled && include_sha).then(|| compute_sha256_digest_for(&bytes));

                let lock_component = PackLockComponent {
                    name: name.clone(),
                    source_ref: Some(source_ref),
                    legacy_ref: None,
                    component_id,
                    bundled: Some(bundled),
                    bundled_path: bundled.then(|| format!("components/{name}.wasm")),
                    legacy_path: None,
                    wasm_sha256: expected_sha256.clone(),
                    legacy_sha256: None,
                    resolved_digest: (!bundled).then(|| "sha256:deadbeef".to_string()),
                    digest: None,
                };
                let pack_lock = use_lock.then(|| PackLockV1 {
                    schema_version: 1,
                    components: vec![lock_component],
                });

                let component_sources = use_sources.then(|| {
                    let artifact = if bundled {
                        ArtifactLocationV1::Inline {
                            wasm_path: format!("components/{name}.wasm"),
                            manifest_path: None,
                        }
                    } else {
                        ArtifactLocationV1::Remote
                    };
                    ComponentSourcesV1::new(vec![ComponentSourceEntryV1 {
                        name: name.clone(),
                        component_id: ComponentId::from_str(&name).ok(),
                        source: ComponentSourceRef::from_str(
                            "oci://registry.test/component@sha256:deadbeef",
                        )
                        .expect("component ref"),
                        resolved: ResolvedComponentV1 {
                            digest: "sha256:deadbeef".to_string(),
                            signature: None,
                            signed_by: None,
                        },
                        artifact,
                        licensing_hint: None,
                        metering_hint: None,
                    }])
                });

                let request = if request_by_id {
                    ResolveRequest::ById(component_id_str)
                } else {
                    ResolveRequest::ByName(name)
                };

                Scenario {
                    pack_lock,
                    component_sources,
                    request,
                    expected_sha256,
                    bytes,
                }
            },
        )
    }

    #[test]
    fn pack_resolution_proptest() {
        let seed = proptest_seed();
        let cases = proptest_config().cases;
        run_cases(scenario_strategy(), cases, seed);
    }

    /// Replays each seed listed in the fixture file; blank lines and lines
    /// starting with `#` are ignored.
    #[test]
    fn pack_resolution_regression_seeds() {
        let seeds_path =
            Path::new(env!("CARGO_MANIFEST_DIR")).join("../../tests/fixtures/proptest-seeds.txt");
        let raw = std::fs::read_to_string(&seeds_path).expect("read proptest seeds");
        let seed_lines = raw
            .lines()
            .map(str::trim)
            .filter(|line| !line.is_empty() && !line.starts_with('#'));
        for line in seed_lines {
            let seed = line.parse::<u64>().expect("seed must be an integer");
            run_cases(scenario_strategy(), 1, Some(seed_bytes(seed)));
        }
    }
}
3626
3627fn locate_pack_assets(
3628 materialized_root: Option<&Path>,
3629 archive_hint: Option<&Path>,
3630) -> Result<(Option<PathBuf>, Option<TempDir>)> {
3631 if let Some(root) = materialized_root {
3632 let assets = root.join("assets");
3633 if assets.is_dir() {
3634 return Ok((Some(assets), None));
3635 }
3636 }
3637 if let Some(path) = archive_hint
3638 && let Some((tempdir, assets)) = extract_assets_from_archive(path)?
3639 {
3640 return Ok((Some(assets), Some(tempdir)));
3641 }
3642 Ok((None, None))
3643}
3644
3645fn extract_assets_from_archive(path: &Path) -> Result<Option<(TempDir, PathBuf)>> {
3646 let file =
3647 File::open(path).with_context(|| format!("failed to open pack {}", path.display()))?;
3648 let mut archive =
3649 ZipArchive::new(file).with_context(|| format!("failed to read pack {}", path.display()))?;
3650 let temp = TempDir::new().context("failed to create temporary assets directory")?;
3651 let mut found = false;
3652 for idx in 0..archive.len() {
3653 let mut entry = archive.by_index(idx)?;
3654 let name = entry.name();
3655 if !name.starts_with("assets/") {
3656 continue;
3657 }
3658 let dest = temp.path().join(name);
3659 if name.ends_with('/') {
3660 std::fs::create_dir_all(&dest)?;
3661 found = true;
3662 continue;
3663 }
3664 if let Some(parent) = dest.parent() {
3665 std::fs::create_dir_all(parent)?;
3666 }
3667 let mut outfile = std::fs::File::create(&dest)?;
3668 std::io::copy(&mut entry, &mut outfile)?;
3669 found = true;
3670 }
3671 if found {
3672 let assets_path = temp.path().join("assets");
3673 Ok(Some((temp, assets_path)))
3674 } else {
3675 Ok(None)
3676 }
3677}
3678
3679fn dist_options_from(component_resolution: &ComponentResolution) -> DistOptions {
3680 let mut opts = DistOptions {
3681 allow_tags: true,
3682 ..DistOptions::default()
3683 };
3684 if let Some(cache_dir) = component_resolution.dist_cache_dir.clone() {
3685 opts.cache_dir = cache_dir;
3686 }
3687 if component_resolution.dist_offline {
3688 opts.offline = true;
3689 }
3690 opts
3691}
3692
3693#[allow(clippy::too_many_arguments)]
3694async fn load_components_from_sources(
3695 cache: &CacheManager,
3696 engine: &Engine,
3697 component_sources: &HashMap<String, ComponentSourceInfo>,
3698 component_resolution: &ComponentResolution,
3699 specs: &[ComponentSpec],
3700 missing: &mut HashSet<String>,
3701 into: &mut HashMap<String, PackComponent>,
3702 materialized_root: Option<&Path>,
3703 archive_hint: Option<&Path>,
3704) -> Result<()> {
3705 let mut archive = if let Some(path) = archive_hint {
3706 Some(
3707 ZipArchive::new(File::open(path)?)
3708 .with_context(|| format!("{} is not a valid gtpack", path.display()))?,
3709 )
3710 } else {
3711 None
3712 };
3713 let mut dist_client: Option<DistClient> = None;
3714
3715 for spec in specs {
3716 if !missing.contains(&spec.id) {
3717 continue;
3718 }
3719 let Some(source) = component_sources.get(&spec.id) else {
3720 continue;
3721 };
3722
3723 let bytes = match &source.artifact {
3724 ComponentArtifactLocation::Inline { wasm_path } => {
3725 if let Some(root) = materialized_root {
3726 let path = root.join(wasm_path);
3727 if path.exists() {
3728 std::fs::read(&path).with_context(|| {
3729 format!(
3730 "failed to read inline component {} from {}",
3731 spec.id,
3732 path.display()
3733 )
3734 })?
3735 } else if archive.is_none() {
3736 bail!("inline component {} missing at {}", spec.id, path.display());
3737 } else {
3738 read_entry(
3739 archive.as_mut().expect("archive present when needed"),
3740 wasm_path,
3741 )
3742 .with_context(|| {
3743 format!(
3744 "inline component {} missing at {} in pack archive",
3745 spec.id, wasm_path
3746 )
3747 })?
3748 }
3749 } else if let Some(archive) = archive.as_mut() {
3750 read_entry(archive, wasm_path).with_context(|| {
3751 format!(
3752 "inline component {} missing at {} in pack archive",
3753 spec.id, wasm_path
3754 )
3755 })?
3756 } else {
3757 bail!(
3758 "inline component {} missing and no pack source available",
3759 spec.id
3760 );
3761 }
3762 }
3763 ComponentArtifactLocation::Remote => {
3764 if source.source.is_tag() {
3765 bail!(
3766 "component {} uses tag ref {} but is not bundled; rebuild the pack",
3767 spec.id,
3768 source.source
3769 );
3770 }
3771 let client = dist_client.get_or_insert_with(|| {
3772 DistClient::new(dist_options_from(component_resolution))
3773 });
3774 let reference = source.source.to_string();
3775 fault::maybe_fail_asset(&reference)
3776 .await
3777 .with_context(|| format!("fault injection blocked asset {reference}"))?;
3778 let digest = source.digest.as_deref().ok_or_else(|| {
3779 anyhow!(
3780 "component {} missing expected digest for remote component",
3781 spec.id
3782 )
3783 })?;
3784 let cache_path = if let Ok(cache_path) = client.fetch_digest(digest).await {
3785 cache_path
3786 } else if component_resolution.dist_offline {
3787 client
3788 .fetch_digest(digest)
3789 .await
3790 .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?
3791 } else {
3792 let source = client
3793 .parse_source(&reference)
3794 .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?;
3795 let descriptor = client
3796 .resolve(source, ResolvePolicy)
3797 .await
3798 .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?;
3799 let resolved = client
3800 .fetch(&descriptor, CachePolicy)
3801 .await
3802 .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?;
3803 let expected = normalize_digest(digest);
3804 let actual = normalize_digest(&resolved.digest);
3805 if expected != actual {
3806 bail!(
3807 "component {} digest mismatch after fetch: expected {}, got {}",
3808 spec.id,
3809 expected,
3810 actual
3811 );
3812 }
3813 resolved.cache_path.ok_or_else(|| {
3814 anyhow!(
3815 "component {} resolved from {} but cache path is missing",
3816 spec.id,
3817 reference
3818 )
3819 })?
3820 };
3821 std::fs::read(&cache_path).with_context(|| {
3822 format!(
3823 "failed to read cached component {} from {}",
3824 spec.id,
3825 cache_path.display()
3826 )
3827 })?
3828 }
3829 };
3830
3831 if let Some(expected) = source.expected_wasm_sha256.as_deref() {
3832 verify_wasm_sha256(&spec.id, expected, &bytes)?;
3833 } else if source.skip_digest_verification {
3834 let actual = compute_sha256_digest_for(&bytes);
3835 warn!(
3836 component_id = %spec.id,
3837 digest = %actual,
3838 "bundled component missing wasm_sha256; allowing due to flag"
3839 );
3840 } else {
3841 let expected = source.digest.as_deref().ok_or_else(|| {
3842 anyhow!(
3843 "component {} missing expected digest for verification",
3844 spec.id
3845 )
3846 })?;
3847 verify_component_digest(&spec.id, expected, &bytes)?;
3848 }
3849 let component =
3850 compile_component_with_cache(cache, engine, source.digest.as_deref(), bytes)
3851 .await
3852 .with_context(|| format!("failed to compile component {}", spec.id))?;
3853 into.insert(
3854 spec.id.clone(),
3855 PackComponent {
3856 name: spec.id.clone(),
3857 version: spec.version.clone(),
3858 component,
3859 },
3860 );
3861 missing.remove(&spec.id);
3862 }
3863
3864 Ok(())
3865}
3866
3867fn dist_error_for_component(err: DistError, component_id: &str, reference: &str) -> anyhow::Error {
3868 match err {
3869 DistError::NotFound { reference: missing } => anyhow!(
3870 "remote component {} is not cached for {}. Run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
3871 component_id,
3872 missing,
3873 reference
3874 ),
3875 DistError::Offline { reference: blocked } => anyhow!(
3876 "offline mode blocked fetching component {} from {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
3877 component_id,
3878 blocked,
3879 reference
3880 ),
3881 DistError::Unauthorized { target } => anyhow!(
3882 "component {} requires authenticated source {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
3883 component_id,
3884 target,
3885 reference
3886 ),
3887 other => anyhow!(
3888 "failed to resolve component {} from {}: {}",
3889 component_id,
3890 reference,
3891 other
3892 ),
3893 }
3894}
3895
3896async fn load_components_from_overrides(
3897 cache: &CacheManager,
3898 engine: &Engine,
3899 overrides: &HashMap<String, PathBuf>,
3900 specs: &[ComponentSpec],
3901 missing: &mut HashSet<String>,
3902 into: &mut HashMap<String, PackComponent>,
3903) -> Result<()> {
3904 for spec in specs {
3905 if !missing.contains(&spec.id) {
3906 continue;
3907 }
3908 let Some(path) = overrides.get(&spec.id) else {
3909 continue;
3910 };
3911 let bytes = std::fs::read(path)
3912 .with_context(|| format!("failed to read override component {}", path.display()))?;
3913 let component = compile_component_with_cache(cache, engine, None, bytes)
3914 .await
3915 .with_context(|| {
3916 format!(
3917 "failed to compile component {} from override {}",
3918 spec.id,
3919 path.display()
3920 )
3921 })?;
3922 into.insert(
3923 spec.id.clone(),
3924 PackComponent {
3925 name: spec.id.clone(),
3926 version: spec.version.clone(),
3927 component,
3928 },
3929 );
3930 missing.remove(&spec.id);
3931 }
3932 Ok(())
3933}
3934
3935async fn load_components_from_dir(
3936 cache: &CacheManager,
3937 engine: &Engine,
3938 root: &Path,
3939 specs: &[ComponentSpec],
3940 missing: &mut HashSet<String>,
3941 into: &mut HashMap<String, PackComponent>,
3942) -> Result<()> {
3943 for spec in specs {
3944 if !missing.contains(&spec.id) {
3945 continue;
3946 }
3947 let path = component_path_for_spec(root, spec);
3948 if !path.exists() {
3949 tracing::debug!(component = %spec.id, path = %path.display(), "materialized component missing; will try other sources");
3950 continue;
3951 }
3952 let bytes = std::fs::read(&path)
3953 .with_context(|| format!("failed to read component {}", path.display()))?;
3954 let component = compile_component_with_cache(cache, engine, None, bytes)
3955 .await
3956 .with_context(|| {
3957 format!(
3958 "failed to compile component {} from {}",
3959 spec.id,
3960 path.display()
3961 )
3962 })?;
3963 into.insert(
3964 spec.id.clone(),
3965 PackComponent {
3966 name: spec.id.clone(),
3967 version: spec.version.clone(),
3968 component,
3969 },
3970 );
3971 missing.remove(&spec.id);
3972 }
3973 Ok(())
3974}
3975
3976async fn load_components_from_archive(
3977 cache: &CacheManager,
3978 engine: &Engine,
3979 path: &Path,
3980 specs: &[ComponentSpec],
3981 missing: &mut HashSet<String>,
3982 into: &mut HashMap<String, PackComponent>,
3983) -> Result<()> {
3984 let mut archive = ZipArchive::new(File::open(path)?)
3985 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
3986 for spec in specs {
3987 if !missing.contains(&spec.id) {
3988 continue;
3989 }
3990 let file_name = spec
3991 .legacy_path
3992 .clone()
3993 .unwrap_or_else(|| format!("components/{}.wasm", spec.id));
3994 let bytes = match read_entry(&mut archive, &file_name) {
3995 Ok(bytes) => bytes,
3996 Err(err) => {
3997 warn!(component = %spec.id, pack = %path.display(), error = %err, "component entry missing in pack archive");
3998 continue;
3999 }
4000 };
4001 let component = compile_component_with_cache(cache, engine, None, bytes)
4002 .await
4003 .with_context(|| format!("failed to compile component {}", spec.id))?;
4004 into.insert(
4005 spec.id.clone(),
4006 PackComponent {
4007 name: spec.id.clone(),
4008 version: spec.version.clone(),
4009 component,
4010 },
4011 );
4012 missing.remove(&spec.id);
4013 }
4014 Ok(())
4015}
4016
#[cfg(test)]
mod tests {
    use super::*;
    use greentic_flow::model::{FlowDoc, NodeDoc};
    use indexmap::IndexMap;
    use serde_json::json;

    /// A node declared with a raw `<component>: <config>` entry must be
    /// rewritten into a `component.exec` node whose payload names the
    /// component, the `render` operation, and the original config as input.
    #[test]
    fn normalizes_raw_component_to_component_exec() {
        let mut raw = IndexMap::new();
        raw.insert(
            "templating.handlebars".into(),
            json!({ "template": "Hi {{name}}" }),
        );
        let start_node = NodeDoc {
            raw,
            routing: json!([{"out": true}]),
            ..Default::default()
        };

        let mut nodes = IndexMap::new();
        nodes.insert("start".into(), start_node);

        let doc = FlowDoc {
            id: "welcome".into(),
            title: None,
            description: None,
            flow_type: "messaging".into(),
            start: Some("start".into()),
            parameters: json!({}),
            tags: Vec::new(),
            schema_version: None,
            entrypoints: IndexMap::new(),
            meta: None,
            nodes,
        };

        let normalized = normalize_flow_doc(doc);
        let node = normalized.nodes.get("start").expect("node exists");
        assert_eq!(node.operation.as_deref(), Some("component.exec"));
        assert!(node.raw.is_empty());

        let payload = node.payload.as_object().expect("payload object");
        let expected_component = Value::String("templating.handlebars".into());
        assert_eq!(payload.get("component"), Some(&expected_component));
        let expected_operation = Value::String("render".into());
        assert_eq!(payload.get("operation"), Some(&expected_operation));

        let input = payload.get("input").unwrap();
        assert_eq!(input, &json!({ "template": "Hi {{name}}" }));
    }
}
4071
4072#[derive(Clone, Debug, Default, Serialize, Deserialize)]
4073pub struct PackMetadata {
4074 pub pack_id: String,
4075 pub version: String,
4076 #[serde(default)]
4077 pub entry_flows: Vec<String>,
4078 #[serde(default)]
4079 pub secret_requirements: Vec<greentic_types::SecretRequirement>,
4080}
4081
4082impl PackMetadata {
4083 fn from_wasm(bytes: &[u8]) -> Option<Self> {
4084 let parser = Parser::new(0);
4085 for payload in parser.parse_all(bytes) {
4086 let payload = payload.ok()?;
4087 match payload {
4088 Payload::CustomSection(section) => {
4089 if section.name() == "greentic.manifest"
4090 && let Ok(meta) = Self::from_bytes(section.data())
4091 {
4092 return Some(meta);
4093 }
4094 }
4095 Payload::DataSection(reader) => {
4096 for segment in reader.into_iter().flatten() {
4097 if let Ok(meta) = Self::from_bytes(segment.data) {
4098 return Some(meta);
4099 }
4100 }
4101 }
4102 _ => {}
4103 }
4104 }
4105 None
4106 }
4107
4108 fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
4109 #[derive(Deserialize)]
4110 struct RawManifest {
4111 pack_id: String,
4112 version: String,
4113 #[serde(default)]
4114 entry_flows: Vec<String>,
4115 #[serde(default)]
4116 flows: Vec<RawFlow>,
4117 #[serde(default)]
4118 secret_requirements: Vec<greentic_types::SecretRequirement>,
4119 }
4120
4121 #[derive(Deserialize)]
4122 struct RawFlow {
4123 id: String,
4124 }
4125
4126 let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
4127 let mut entry_flows = if manifest.entry_flows.is_empty() {
4128 manifest.flows.iter().map(|f| f.id.clone()).collect()
4129 } else {
4130 manifest.entry_flows.clone()
4131 };
4132 entry_flows.retain(|id| !id.is_empty());
4133 Ok(Self {
4134 pack_id: manifest.pack_id,
4135 version: manifest.version,
4136 entry_flows,
4137 secret_requirements: manifest.secret_requirements,
4138 })
4139 }
4140
4141 pub fn fallback(path: &Path) -> Self {
4142 let pack_id = path
4143 .file_stem()
4144 .map(|s| s.to_string_lossy().into_owned())
4145 .unwrap_or_else(|| "unknown-pack".to_string());
4146 Self {
4147 pack_id,
4148 version: "0.0.0".to_string(),
4149 entry_flows: Vec::new(),
4150 secret_requirements: Vec::new(),
4151 }
4152 }
4153
4154 pub fn from_manifest(manifest: &greentic_types::PackManifest) -> Self {
4155 let entry_flows = manifest
4156 .flows
4157 .iter()
4158 .map(|flow| flow.id.as_str().to_string())
4159 .collect::<Vec<_>>();
4160 Self {
4161 pack_id: manifest.pack_id.as_str().to_string(),
4162 version: manifest.version.to_string(),
4163 entry_flows,
4164 secret_requirements: manifest.secret_requirements.clone(),
4165 }
4166 }
4167}