1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::cache::{ArtifactKey, CacheConfig, CacheManager, CpuPolicy, EngineProfile};
10use crate::component_api::{
11 self, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
12};
13use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
14use crate::provider::{ProviderBinding, ProviderRegistry};
15use crate::provider_core::{
16 schema_core::SchemaCorePre as LegacySchemaCorePre,
17 schema_core_path::SchemaCorePre as PathSchemaCorePre,
18 schema_core_schema::SchemaCorePre as SchemaSchemaCorePre,
19};
20use crate::provider_core_only;
21use crate::runtime_wasmtime::{Component, Engine, InstancePre, Linker, ResourceTable};
22use anyhow::{Context, Result, anyhow, bail};
23use futures::executor::block_on;
24use greentic_distributor_client::dist::{
25 CachePolicy, DistClient, DistError, DistOptions, ResolvePolicy,
26};
27use greentic_interfaces_wasmtime::host_helpers::v1::{
28 self as host_v1, HostFns, add_all_v1_to_linker,
29 runner_host_http::RunnerHostHttp,
30 runner_host_kv::RunnerHostKv,
31 secrets_store::{SecretsError, SecretsErrorV1_1, SecretsStoreHost, SecretsStoreHostV1_1},
32 state_store::{
33 OpAck as StateOpAck, StateKey as HostStateKey, StateStoreError as StateError,
34 StateStoreHost, TenantCtx as StateTenantCtx,
35 },
36 telemetry_logger::{
37 OpAck as TelemetryAck, SpanContext as TelemetrySpanContext,
38 TelemetryLoggerError as TelemetryError, TelemetryLoggerHost,
39 TenantCtx as TelemetryTenantCtx,
40 },
41};
42use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::http::http_client as http_client_client_alias;
43use greentic_interfaces_wasmtime::{
44 http_client_client_v1_0::greentic::interfaces_types::types as http_types_v1_0,
45 http_client_client_v1_1::greentic::interfaces_types::types as http_types_v1_1,
46};
47use greentic_pack::builder as legacy_pack;
48use greentic_types::flow::FlowHasher;
49use greentic_types::{
50 ArtifactLocationV1, ComponentId, ComponentManifest, ComponentSourceRef, ComponentSourcesV1,
51 EXT_COMPONENT_SOURCES_V1, EnvId, ExtensionRef, Flow, FlowComponentRef, FlowId, FlowKind,
52 FlowMetadata, InputMapping, Node, NodeId, OutputMapping, Routing, StateKey as StoreStateKey,
53 TeamId, TelemetryHints, TenantCtx as TypesTenantCtx, TenantId, UserId, decode_pack_manifest,
54 pack_manifest::ExtensionInline,
55};
56use host_v1::http_client as host_http_client;
57use host_v1::http_client::{
58 HttpClientError, HttpClientErrorV1_1, HttpClientHost, HttpClientHostV1_1,
59 Request as HttpRequest, RequestOptionsV1_1 as HttpRequestOptionsV1_1,
60 RequestV1_1 as HttpRequestV1_1, Response as HttpResponse, ResponseV1_1 as HttpResponseV1_1,
61 TenantCtx as HttpTenantCtx, TenantCtxV1_1 as HttpTenantCtxV1_1,
62};
63use indexmap::IndexMap;
64use once_cell::sync::Lazy;
65use parking_lot::{Mutex, RwLock};
66use reqwest::blocking::Client as BlockingClient;
67use runner_core::normalize_under_root;
68use serde::{Deserialize, Serialize};
69use serde_cbor;
70use serde_json::{self, Value};
71use sha2::Digest;
72use tempfile::TempDir;
73use tokio::fs;
74use wasmparser::{Parser, Payload};
75use wasmtime::{Store, StoreContextMut};
76use zip::ZipArchive;
77
78use crate::runner::engine::{FlowContext, FlowEngine, FlowStatus};
79use crate::runner::flow_adapter::{FlowIR, flow_doc_to_ir, flow_ir_to_flow};
80use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
81#[cfg(feature = "fault-injection")]
82use crate::testing::fault_injection::{FaultContext, FaultPoint, maybe_fail};
83
84use crate::config::HostConfig;
85use crate::fault;
86use crate::secrets::{DynSecretsManager, read_secret_blocking, write_secret_blocking};
87use crate::storage::state::STATE_PREFIX;
88use crate::storage::{DynSessionStore, DynStateStore};
89use crate::verify;
90use crate::wasi::{PreopenSpec, RunnerWasiPolicy};
91use tracing::warn;
92use wasmtime_wasi::p2::add_to_linker_sync as add_wasi_to_linker;
93use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
94
95use greentic_flow::model::FlowDoc;
96
97#[allow(dead_code)]
98pub struct PackRuntime {
99 path: PathBuf,
101 archive_path: Option<PathBuf>,
103 config: Arc<HostConfig>,
104 engine: Engine,
105 metadata: PackMetadata,
106 manifest: Option<greentic_types::PackManifest>,
107 legacy_manifest: Option<Box<legacy_pack::PackManifest>>,
108 component_manifests: HashMap<String, ComponentManifest>,
109 mocks: Option<Arc<MockLayer>>,
110 flows: Option<PackFlows>,
111 components: HashMap<String, PackComponent>,
112 http_client: Arc<BlockingClient>,
113 pre_cache: Mutex<HashMap<String, InstancePre<ComponentState>>>,
114 session_store: Option<DynSessionStore>,
115 state_store: Option<DynStateStore>,
116 wasi_policy: Arc<RunnerWasiPolicy>,
117 assets_tempdir: Option<TempDir>,
118 provider_registry: RwLock<Option<ProviderRegistry>>,
119 secrets: DynSecretsManager,
120 oauth_config: Option<OAuthBrokerConfig>,
121 cache: CacheManager,
122}
123
124struct PackComponent {
125 #[allow(dead_code)]
126 name: String,
127 #[allow(dead_code)]
128 version: String,
129 component: Arc<Component>,
130}
131
132fn run_on_wasi_thread<F, T>(task_name: &'static str, task: F) -> Result<T>
133where
134 F: FnOnce() -> Result<T> + Send + 'static,
135 T: Send + 'static,
136{
137 let builder = std::thread::Builder::new().name(format!("greentic-wasmtime-{task_name}"));
138 let handle = builder
139 .spawn(move || {
140 let pid = std::process::id();
141 let thread_id = std::thread::current().id();
142 let tokio_handle_present = tokio::runtime::Handle::try_current().is_ok();
143 tracing::info!(
144 event = "wasmtime.thread.start",
145 task = task_name,
146 pid,
147 thread_id = ?thread_id,
148 tokio_handle_present,
149 "starting Wasmtime thread"
150 );
151 task()
152 })
153 .context("failed to spawn Wasmtime thread")?;
154 handle
155 .join()
156 .map_err(|err| {
157 let reason = if let Some(msg) = err.downcast_ref::<&str>() {
158 msg.to_string()
159 } else if let Some(msg) = err.downcast_ref::<String>() {
160 msg.clone()
161 } else {
162 "unknown panic".to_string()
163 };
164 anyhow!("Wasmtime thread panicked: {reason}")
165 })
166 .and_then(|res| res)
167}
168
169#[derive(Debug, Default, Clone)]
170pub struct ComponentResolution {
171 pub materialized_root: Option<PathBuf>,
173 pub overrides: HashMap<String, PathBuf>,
175 pub dist_offline: bool,
177 pub dist_cache_dir: Option<PathBuf>,
179 pub allow_missing_hash: bool,
181}
182
183fn build_blocking_client() -> BlockingClient {
184 std::thread::spawn(|| {
185 BlockingClient::builder()
186 .no_proxy()
187 .build()
188 .expect("blocking client")
189 })
190 .join()
191 .expect("client build thread panicked")
192}
193
194fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
195 let (root, candidate) = if path.is_absolute() {
196 let parent = path
197 .parent()
198 .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
199 let root = parent
200 .canonicalize()
201 .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
202 let file = path
203 .file_name()
204 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
205 (root, PathBuf::from(file))
206 } else {
207 let cwd = std::env::current_dir().context("failed to resolve current directory")?;
208 let base = if let Some(parent) = path.parent() {
209 cwd.join(parent)
210 } else {
211 cwd
212 };
213 let root = base
214 .canonicalize()
215 .with_context(|| format!("failed to canonicalize {}", base.display()))?;
216 let file = path
217 .file_name()
218 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
219 (root, PathBuf::from(file))
220 };
221 let safe = normalize_under_root(&root, &candidate)?;
222 Ok((root, safe))
223}
224
225static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
226
227#[derive(Debug, Clone, Serialize, Deserialize)]
228pub struct FlowDescriptor {
229 pub id: String,
230 #[serde(rename = "type")]
231 pub flow_type: String,
232 pub pack_id: String,
233 pub profile: String,
234 pub version: String,
235 #[serde(default)]
236 pub description: Option<String>,
237}
238
239pub struct HostState {
240 #[allow(dead_code)]
241 pack_id: String,
242 config: Arc<HostConfig>,
243 http_client: Arc<BlockingClient>,
244 default_env: String,
245 #[allow(dead_code)]
246 session_store: Option<DynSessionStore>,
247 state_store: Option<DynStateStore>,
248 mocks: Option<Arc<MockLayer>>,
249 secrets: DynSecretsManager,
250 oauth_config: Option<OAuthBrokerConfig>,
251 oauth_host: OAuthBrokerHost,
252 exec_ctx: Option<ComponentExecCtx>,
253 component_ref: Option<String>,
254 provider_core_component: bool,
255}
256
257impl HostState {
258 #[allow(clippy::default_constructed_unit_structs)]
259 #[allow(clippy::too_many_arguments)]
260 pub fn new(
261 pack_id: String,
262 config: Arc<HostConfig>,
263 http_client: Arc<BlockingClient>,
264 mocks: Option<Arc<MockLayer>>,
265 session_store: Option<DynSessionStore>,
266 state_store: Option<DynStateStore>,
267 secrets: DynSecretsManager,
268 oauth_config: Option<OAuthBrokerConfig>,
269 exec_ctx: Option<ComponentExecCtx>,
270 component_ref: Option<String>,
271 provider_core_component: bool,
272 ) -> Result<Self> {
273 let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
274 Ok(Self {
275 pack_id,
276 config,
277 http_client,
278 default_env,
279 session_store,
280 state_store,
281 mocks,
282 secrets,
283 oauth_config,
284 oauth_host: OAuthBrokerHost::default(),
285 exec_ctx,
286 component_ref,
287 provider_core_component,
288 })
289 }
290
291 fn instantiate_component_result(
292 linker: &mut Linker<ComponentState>,
293 store: &mut Store<ComponentState>,
294 component: &Component,
295 ctx: &ComponentExecCtx,
296 component_ref: &str,
297 operation: &str,
298 input_json: &str,
299 ) -> Result<InvokeResult> {
300 let pre_instance = linker.instantiate_pre(component)?;
301 match component_api::v0_6::ComponentPre::new(pre_instance) {
302 Ok(pre) => {
303 let envelope = component_api::envelope_v0_6(ctx, component_ref, input_json)?;
304 let operation_owned = operation.to_string();
305 let result = block_on(async {
306 let bindings = pre.instantiate_async(&mut *store).await?;
307 let node = bindings.greentic_component_node();
308 node.call_invoke(&mut *store, &operation_owned, &envelope)
309 })?;
310 component_api::invoke_result_from_v0_6(result)
311 }
312 Err(err_v06) => {
313 if !is_missing_node_export(&err_v06, "0.6.0") {
314 return Err(err_v06.into());
315 }
316 let pre_instance = linker.instantiate_pre(component)?;
317 match component_api::v0_5::ComponentPre::new(pre_instance) {
318 Ok(pre) => {
319 let result = block_on(async {
320 let bindings = pre.instantiate_async(&mut *store).await?;
321 let node = bindings.greentic_component_node();
322 let ctx_v05 = component_api::exec_ctx_v0_5(ctx);
323 let operation_owned = operation.to_string();
324 let input_owned = input_json.to_string();
325 node.call_invoke(&mut *store, &ctx_v05, &operation_owned, &input_owned)
326 })?;
327 Ok(component_api::invoke_result_from_v0_5(result))
328 }
329 Err(err) => {
330 if !is_missing_node_export(&err, "0.5.0") {
331 return Err(err.into());
332 }
333 let pre_instance = linker.instantiate_pre(component)?;
334 match component_api::v0_4::ComponentPre::new(pre_instance) {
335 Ok(pre) => {
336 let result = block_on(async {
337 let bindings = pre.instantiate_async(&mut *store).await?;
338 let node = bindings.greentic_component_node();
339 let ctx_v04 = component_api::exec_ctx_v0_4(ctx);
340 let operation_owned = operation.to_string();
341 let input_owned = input_json.to_string();
342 node.call_invoke(
343 &mut *store,
344 &ctx_v04,
345 &operation_owned,
346 &input_owned,
347 )
348 })?;
349 Ok(component_api::invoke_result_from_v0_4(result))
350 }
351 Err(err_v04) => {
352 if is_missing_node_export(&err_v04, "0.4.0") {
353 Self::try_v06_runtime(linker, store, component, input_json)
354 } else {
355 Err(err_v04.into())
356 }
357 }
358 }
359 }
360 }
361 }
362 }
363 }
364
365 fn try_v06_runtime(
368 linker: &mut Linker<ComponentState>,
369 store: &mut Store<ComponentState>,
370 component: &Component,
371 input_json: &str,
372 ) -> Result<InvokeResult> {
373 let pre_instance = linker.instantiate_pre(component)?;
374 let pre = component_api::v0_6_runtime::ComponentV0V6RuntimePre::new(pre_instance).map_err(
375 |err| err.context("component exports neither node@0.5/0.4 nor component-runtime@0.6"),
376 )?;
377
378 let result = block_on(async {
379 let bindings = pre.instantiate_async(&mut *store).await?;
380 let runtime = bindings.greentic_component_component_runtime();
381
382 let input_value: Value = serde_json::from_str(input_json).unwrap_or(Value::Null);
384 let input_cbor =
385 serde_cbor::to_vec(&input_value).context("encode input as CBOR for v0.6")?;
386 let empty_state = serde_cbor::to_vec(&Value::Object(Default::default()))
387 .context("encode empty state")?;
388
389 let run_result = runtime
390 .call_run(&mut *store, &input_cbor, &empty_state)
391 .map_err(|err| err.context("v0.6 component-runtime::run call failed"))?;
392
393 let output_value: Value = serde_cbor::from_slice(&run_result.output)
395 .context("decode v0.6 run output CBOR")?;
396 let output_json = serde_json::to_string(&output_value)
397 .context("serialize v0.6 run output to JSON")?;
398
399 Ok::<_, anyhow::Error>(output_json)
400 })?;
401
402 Ok(InvokeResult::Ok(result))
403 }
404
405 fn convert_invoke_result(result: InvokeResult) -> Result<Value> {
406 match result {
407 InvokeResult::Ok(body) => {
408 if body.is_empty() {
409 return Ok(Value::Null);
410 }
411 serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
412 }
413 InvokeResult::Err(NodeError {
414 code,
415 message,
416 retryable,
417 backoff_ms,
418 details,
419 }) => {
420 let mut obj = serde_json::Map::new();
421 obj.insert("ok".into(), Value::Bool(false));
422 let mut error = serde_json::Map::new();
423 error.insert("code".into(), Value::String(code));
424 error.insert("message".into(), Value::String(message));
425 error.insert("retryable".into(), Value::Bool(retryable));
426 if let Some(backoff) = backoff_ms {
427 error.insert("backoff_ms".into(), Value::Number(backoff.into()));
428 }
429 if let Some(details) = details {
430 error.insert(
431 "details".into(),
432 serde_json::from_str(&details).unwrap_or(Value::String(details)),
433 );
434 }
435 obj.insert("error".into(), Value::Object(error));
436 Ok(Value::Object(obj))
437 }
438 }
439 }
440
441 fn secrets_tenant_ctx(&self) -> TypesTenantCtx {
445 let mut ctx = self.config.tenant_ctx();
446 if let Some(exec_ctx) = self.exec_ctx.as_ref()
447 && let Some(team) = exec_ctx.tenant.team.as_ref()
448 && let Ok(team_id) = TeamId::from_str(team)
449 {
450 ctx = ctx.with_team(Some(team_id));
451 }
452 ctx
453 }
454
455 pub fn get_secret(&self, key: &str) -> Result<String> {
456 if provider_core_only::is_enabled() {
457 bail!(provider_core_only::blocked_message("secrets"))
458 }
459 if !self.config.secrets_policy.is_allowed(key) {
460 bail!("secret {key} is not permitted by bindings policy");
461 }
462 if let Some(mock) = &self.mocks
463 && let Some(value) = mock.secrets_lookup(key)
464 {
465 return Ok(value);
466 }
467 let ctx = self.secrets_tenant_ctx();
468 let bytes = read_secret_blocking(&self.secrets, &ctx, &self.pack_id, key)
469 .context("failed to read secret from manager")?;
470 let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
471 Ok(value)
472 }
473
474 fn allows_secret_write_in_provider_core_only(&self) -> bool {
475 self.provider_core_component || self.component_ref.is_none()
476 }
477
478 fn tenant_ctx_from_v1(&self, ctx: Option<StateTenantCtx>) -> Result<TypesTenantCtx> {
479 let tenant_raw = ctx
480 .as_ref()
481 .map(|ctx| ctx.tenant.clone())
482 .or_else(|| self.exec_ctx.as_ref().map(|ctx| ctx.tenant.tenant.clone()))
483 .unwrap_or_else(|| self.config.tenant.clone());
484 let env_raw = ctx
485 .as_ref()
486 .map(|ctx| ctx.env.clone())
487 .unwrap_or_else(|| self.default_env.clone());
488 let tenant_id = TenantId::from_str(&tenant_raw)
489 .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
490 let env_id = EnvId::from_str(&env_raw)
491 .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
492 let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
493 if let Some(exec_ctx) = self.exec_ctx.as_ref() {
494 if let Some(team) = exec_ctx.tenant.team.as_ref() {
495 let team_id =
496 TeamId::from_str(team).with_context(|| format!("invalid team id `{team}`"))?;
497 tenant_ctx = tenant_ctx.with_team(Some(team_id));
498 }
499 if let Some(user) = exec_ctx.tenant.user.as_ref() {
500 let user_id =
501 UserId::from_str(user).with_context(|| format!("invalid user id `{user}`"))?;
502 tenant_ctx = tenant_ctx.with_user(Some(user_id));
503 }
504 tenant_ctx = tenant_ctx.with_flow(exec_ctx.flow_id.clone());
505 if let Some(node) = exec_ctx.node_id.as_ref() {
506 tenant_ctx = tenant_ctx.with_node(node.clone());
507 }
508 if let Some(session) = exec_ctx.tenant.correlation_id.as_ref() {
509 tenant_ctx = tenant_ctx.with_session(session.clone());
510 }
511 tenant_ctx.trace_id = exec_ctx.tenant.trace_id.clone();
512 }
513
514 if let Some(ctx) = ctx {
515 if let Some(team) = ctx.team.or(ctx.team_id) {
516 let team_id =
517 TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
518 tenant_ctx = tenant_ctx.with_team(Some(team_id));
519 }
520 if let Some(user) = ctx.user.or(ctx.user_id) {
521 let user_id =
522 UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
523 tenant_ctx = tenant_ctx.with_user(Some(user_id));
524 }
525 if let Some(flow) = ctx.flow_id {
526 tenant_ctx = tenant_ctx.with_flow(flow);
527 }
528 if let Some(node) = ctx.node_id {
529 tenant_ctx = tenant_ctx.with_node(node);
530 }
531 if let Some(provider) = ctx.provider_id {
532 tenant_ctx = tenant_ctx.with_provider(provider);
533 }
534 if let Some(session) = ctx.session_id {
535 tenant_ctx = tenant_ctx.with_session(session);
536 }
537 tenant_ctx.trace_id = ctx.trace_id;
538 }
539 Ok(tenant_ctx)
540 }
541
542 fn send_http_request(
543 &mut self,
544 req: HttpRequest,
545 opts: Option<HttpRequestOptionsV1_1>,
546 _ctx: Option<HttpTenantCtx>,
547 ) -> Result<HttpResponse, HttpClientError> {
548 if !self.config.http_enabled {
549 return Err(HttpClientError {
550 code: "denied".into(),
551 message: "http client disabled by policy".into(),
552 });
553 }
554
555 let mut mock_state = None;
556 let raw_body = req.body.clone();
557 if let Some(mock) = &self.mocks
558 && let Ok(meta) = HttpMockRequest::new(&req.method, &req.url, raw_body.as_deref())
559 {
560 match mock.http_begin(&meta) {
561 HttpDecision::Mock(response) => {
562 let headers = response
563 .headers
564 .iter()
565 .map(|(k, v)| (k.clone(), v.clone()))
566 .collect();
567 return Ok(HttpResponse {
568 status: response.status,
569 headers,
570 body: response.body.clone().map(|b| b.into_bytes()),
571 });
572 }
573 HttpDecision::Deny(reason) => {
574 return Err(HttpClientError {
575 code: "denied".into(),
576 message: reason,
577 });
578 }
579 HttpDecision::Passthrough { record } => {
580 mock_state = Some((meta, record));
581 }
582 }
583 }
584
585 let method = req.method.parse().unwrap_or(reqwest::Method::GET);
586 let mut builder = self.http_client.request(method, &req.url);
587 for (key, value) in req.headers {
588 if let Ok(header) = reqwest::header::HeaderName::from_bytes(key.as_bytes())
589 && let Ok(header_value) = reqwest::header::HeaderValue::from_str(&value)
590 {
591 builder = builder.header(header, header_value);
592 }
593 }
594
595 if let Some(body) = raw_body.clone() {
596 builder = builder.body(body);
597 }
598
599 if let Some(opts) = opts {
600 if let Some(timeout_ms) = opts.timeout_ms {
601 builder = builder.timeout(Duration::from_millis(timeout_ms as u64));
602 }
603 if opts.allow_insecure == Some(true) {
604 warn!(url = %req.url, "allow-insecure not supported; using default TLS validation");
605 }
606 if let Some(follow_redirects) = opts.follow_redirects
607 && !follow_redirects
608 {
609 warn!(url = %req.url, "follow-redirects=false not supported; using default client behaviour");
610 }
611 }
612
613 let response = match builder.send() {
614 Ok(resp) => resp,
615 Err(err) => {
616 warn!(url = %req.url, error = %err, "http client request failed");
617 return Err(HttpClientError {
618 code: "unavailable".into(),
619 message: err.to_string(),
620 });
621 }
622 };
623
624 let status = response.status().as_u16();
625 let headers_vec = response
626 .headers()
627 .iter()
628 .map(|(k, v)| {
629 (
630 k.as_str().to_string(),
631 v.to_str().unwrap_or_default().to_string(),
632 )
633 })
634 .collect::<Vec<_>>();
635 let body_bytes = response.bytes().ok().map(|b| b.to_vec());
636
637 if let Some((meta, true)) = mock_state.take()
638 && let Some(mock) = &self.mocks
639 {
640 let recorded = HttpMockResponse::new(
641 status,
642 headers_vec.clone().into_iter().collect(),
643 body_bytes
644 .as_ref()
645 .map(|b| String::from_utf8_lossy(b).into_owned()),
646 );
647 mock.http_record(&meta, &recorded);
648 }
649
650 Ok(HttpResponse {
651 status,
652 headers: headers_vec,
653 body: body_bytes,
654 })
655 }
656}
657
658fn canonicalize_wasm_secret_key(raw: &str) -> String {
659 raw.trim()
660 .chars()
661 .map(|ch| {
662 let ch = ch.to_ascii_lowercase();
663 match ch {
664 'a'..='z' | '0'..='9' | '_' => ch,
665 _ => '_',
666 }
667 })
668 .collect()
669}
670
671#[cfg(test)]
672mod canonicalize_tests {
673 use super::canonicalize_wasm_secret_key;
674
675 #[test]
676 fn upper_snake_to_lower_snake() {
677 assert_eq!(
678 canonicalize_wasm_secret_key("TELEGRAM_BOT_TOKEN"),
679 "telegram_bot_token"
680 );
681 }
682
683 #[test]
684 fn trim_and_replace_non_alphanumeric() {
685 assert_eq!(
686 canonicalize_wasm_secret_key(" webex-bot-token "),
687 "webex_bot_token"
688 );
689 }
690
691 #[test]
692 fn preserve_existing_lower_snake_with_extra_underscores() {
693 assert_eq!(canonicalize_wasm_secret_key("MiXeD__Case"), "mixed__case");
694 }
695}
696
697impl SecretsStoreHost for HostState {
698 fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsError> {
699 if provider_core_only::is_enabled() {
700 warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
701 return Err(SecretsError::Denied);
702 }
703 if !self.config.secrets_policy.is_allowed(&key) {
704 return Err(SecretsError::Denied);
705 }
706 if let Some(mock) = &self.mocks
707 && let Some(value) = mock.secrets_lookup(&key)
708 {
709 return Ok(Some(value.into_bytes()));
710 }
711 let ctx = self.secrets_tenant_ctx();
712 let canonical_key = canonicalize_wasm_secret_key(&key);
713 match read_secret_blocking(&self.secrets, &ctx, &self.pack_id, &canonical_key) {
714 Ok(bytes) => Ok(Some(bytes)),
715 Err(err) => {
716 warn!(secret = %key, canonical = %canonical_key, error = %err, "secret lookup failed");
717 Err(SecretsError::NotFound)
718 }
719 }
720 }
721}
722
723impl SecretsStoreHostV1_1 for HostState {
724 fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsErrorV1_1> {
725 if provider_core_only::is_enabled() {
726 warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
727 return Err(SecretsErrorV1_1::Denied);
728 }
729 if !self.config.secrets_policy.is_allowed(&key) {
730 return Err(SecretsErrorV1_1::Denied);
731 }
732 if let Some(mock) = &self.mocks
733 && let Some(value) = mock.secrets_lookup(&key)
734 {
735 return Ok(Some(value.into_bytes()));
736 }
737 let ctx = self.secrets_tenant_ctx();
738 let canonical_key = canonicalize_wasm_secret_key(&key);
739 match read_secret_blocking(&self.secrets, &ctx, &self.pack_id, &canonical_key) {
740 Ok(bytes) => Ok(Some(bytes)),
741 Err(err) => {
742 warn!(secret = %key, canonical = %canonical_key, error = %err, "secret lookup failed");
743 Err(SecretsErrorV1_1::NotFound)
744 }
745 }
746 }
747
748 fn put(&mut self, key: String, value: Vec<u8>) {
749 if key.trim().is_empty() {
750 warn!(secret = %key, "secret write blocked: empty key");
751 panic!("secret write denied for key {key}: invalid key");
752 }
753 if provider_core_only::is_enabled() && !self.allows_secret_write_in_provider_core_only() {
754 warn!(
755 secret = %key,
756 component = self.component_ref.as_deref().unwrap_or("<pack>"),
757 "provider-core only mode enabled; blocking secrets store write"
758 );
759 panic!("secret write denied for key {key}: provider-core-only mode");
760 }
761 if !self.config.secrets_policy.is_allowed(&key) {
762 warn!(secret = %key, "secret write denied by bindings policy");
763 panic!("secret write denied for key {key}: policy");
764 }
765 let ctx = self.secrets_tenant_ctx();
766 let canonical_key = canonicalize_wasm_secret_key(&key);
767 if let Err(err) =
768 write_secret_blocking(&self.secrets, &ctx, &self.pack_id, &canonical_key, &value)
769 {
770 warn!(secret = %key, canonical = %canonical_key, error = %err, "secret write failed");
771 panic!("secret write failed for key {key}");
772 }
773 }
774}
775
776impl HttpClientHost for HostState {
777 fn send(
778 &mut self,
779 req: HttpRequest,
780 ctx: Option<HttpTenantCtx>,
781 ) -> Result<HttpResponse, HttpClientError> {
782 self.send_http_request(req, None, ctx)
783 }
784}
785
786impl HttpClientHostV1_1 for HostState {
787 fn send(
788 &mut self,
789 req: HttpRequestV1_1,
790 opts: Option<HttpRequestOptionsV1_1>,
791 ctx: Option<HttpTenantCtxV1_1>,
792 ) -> Result<HttpResponseV1_1, HttpClientErrorV1_1> {
793 let legacy_req = HttpRequest {
794 method: req.method,
795 url: req.url,
796 headers: req.headers,
797 body: req.body,
798 };
799 let legacy_ctx = ctx.map(|ctx| HttpTenantCtx {
800 env: ctx.env,
801 tenant: ctx.tenant,
802 tenant_id: ctx.tenant_id,
803 team: ctx.team,
804 team_id: ctx.team_id,
805 user: ctx.user,
806 user_id: ctx.user_id,
807 trace_id: ctx.trace_id,
808 correlation_id: ctx.correlation_id,
809 i18n_id: ctx.i18n_id,
810 attributes: ctx.attributes,
811 session_id: ctx.session_id,
812 flow_id: ctx.flow_id,
813 node_id: ctx.node_id,
814 provider_id: ctx.provider_id,
815 deadline_ms: ctx.deadline_ms,
816 attempt: ctx.attempt,
817 idempotency_key: ctx.idempotency_key,
818 impersonation: ctx.impersonation.map(|imp| http_types_v1_0::Impersonation {
819 actor_id: imp.actor_id,
820 reason: imp.reason,
821 }),
822 });
823
824 self.send_http_request(legacy_req, opts, legacy_ctx)
825 .map(|resp| HttpResponseV1_1 {
826 status: resp.status,
827 headers: resp.headers,
828 body: resp.body,
829 })
830 .map_err(|err| HttpClientErrorV1_1 {
831 code: err.code,
832 message: err.message,
833 })
834 }
835}
836
837impl StateStoreHost for HostState {
838 fn read(
839 &mut self,
840 key: HostStateKey,
841 ctx: Option<StateTenantCtx>,
842 ) -> Result<Vec<u8>, StateError> {
843 let store = match self.state_store.as_ref() {
844 Some(store) => store.clone(),
845 None => {
846 return Err(StateError {
847 code: "unavailable".into(),
848 message: "state store not configured".into(),
849 });
850 }
851 };
852 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
853 Ok(ctx) => ctx,
854 Err(err) => {
855 return Err(StateError {
856 code: "invalid-ctx".into(),
857 message: err.to_string(),
858 });
859 }
860 };
861 #[cfg(feature = "fault-injection")]
862 {
863 let exec_ctx = self.exec_ctx.as_ref();
864 let flow_id = exec_ctx
865 .map(|ctx| ctx.flow_id.as_str())
866 .unwrap_or("unknown");
867 let node_id = exec_ctx.and_then(|ctx| ctx.node_id.as_deref());
868 let attempt = exec_ctx.map(|ctx| ctx.tenant.attempt).unwrap_or(1);
869 let fault_ctx = FaultContext {
870 pack_id: self.pack_id.as_str(),
871 flow_id,
872 node_id,
873 attempt,
874 };
875 if let Err(err) = maybe_fail(FaultPoint::StateRead, fault_ctx) {
876 return Err(StateError {
877 code: "internal".into(),
878 message: err.to_string(),
879 });
880 }
881 }
882 let key = StoreStateKey::from(key);
883 match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
884 Ok(Some(value)) => Ok(serde_json::to_vec(&value).unwrap_or_else(|_| Vec::new())),
885 Ok(None) => Err(StateError {
886 code: "not_found".into(),
887 message: "state key not found".into(),
888 }),
889 Err(err) => Err(StateError {
890 code: "internal".into(),
891 message: err.to_string(),
892 }),
893 }
894 }
895
896 fn write(
897 &mut self,
898 key: HostStateKey,
899 bytes: Vec<u8>,
900 ctx: Option<StateTenantCtx>,
901 ) -> Result<StateOpAck, StateError> {
902 let store = match self.state_store.as_ref() {
903 Some(store) => store.clone(),
904 None => {
905 return Err(StateError {
906 code: "unavailable".into(),
907 message: "state store not configured".into(),
908 });
909 }
910 };
911 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
912 Ok(ctx) => ctx,
913 Err(err) => {
914 return Err(StateError {
915 code: "invalid-ctx".into(),
916 message: err.to_string(),
917 });
918 }
919 };
920 #[cfg(feature = "fault-injection")]
921 {
922 let exec_ctx = self.exec_ctx.as_ref();
923 let flow_id = exec_ctx
924 .map(|ctx| ctx.flow_id.as_str())
925 .unwrap_or("unknown");
926 let node_id = exec_ctx.and_then(|ctx| ctx.node_id.as_deref());
927 let attempt = exec_ctx.map(|ctx| ctx.tenant.attempt).unwrap_or(1);
928 let fault_ctx = FaultContext {
929 pack_id: self.pack_id.as_str(),
930 flow_id,
931 node_id,
932 attempt,
933 };
934 if let Err(err) = maybe_fail(FaultPoint::StateWrite, fault_ctx) {
935 return Err(StateError {
936 code: "internal".into(),
937 message: err.to_string(),
938 });
939 }
940 }
941 let key = StoreStateKey::from(key);
942 let value = serde_json::from_slice(&bytes)
943 .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()));
944 match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
945 Ok(()) => Ok(StateOpAck::Ok),
946 Err(err) => Err(StateError {
947 code: "internal".into(),
948 message: err.to_string(),
949 }),
950 }
951 }
952
953 fn delete(
954 &mut self,
955 key: HostStateKey,
956 ctx: Option<StateTenantCtx>,
957 ) -> Result<StateOpAck, StateError> {
958 let store = match self.state_store.as_ref() {
959 Some(store) => store.clone(),
960 None => {
961 return Err(StateError {
962 code: "unavailable".into(),
963 message: "state store not configured".into(),
964 });
965 }
966 };
967 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
968 Ok(ctx) => ctx,
969 Err(err) => {
970 return Err(StateError {
971 code: "invalid-ctx".into(),
972 message: err.to_string(),
973 });
974 }
975 };
976 let key = StoreStateKey::from(key);
977 match store.del(&tenant_ctx, STATE_PREFIX, &key) {
978 Ok(_) => Ok(StateOpAck::Ok),
979 Err(err) => Err(StateError {
980 code: "internal".into(),
981 message: err.to_string(),
982 }),
983 }
984 }
985}
986
987impl TelemetryLoggerHost for HostState {
988 fn log(
989 &mut self,
990 span: TelemetrySpanContext,
991 fields: Vec<(String, String)>,
992 _ctx: Option<TelemetryTenantCtx>,
993 ) -> Result<TelemetryAck, TelemetryError> {
994 if let Some(mock) = &self.mocks
995 && mock.telemetry_drain(&[("span_json", span.flow_id.as_str())])
996 {
997 return Ok(TelemetryAck::Ok);
998 }
999 let mut map = serde_json::Map::new();
1000 for (k, v) in fields {
1001 map.insert(k, Value::String(v));
1002 }
1003 tracing::info!(
1004 tenant = %span.tenant,
1005 flow_id = %span.flow_id,
1006 node = ?span.node_id,
1007 provider = %span.provider,
1008 fields = %serde_json::Value::Object(map.clone()),
1009 "telemetry log from pack"
1010 );
1011 Ok(TelemetryAck::Ok)
1012 }
1013}
1014
1015impl RunnerHostHttp for HostState {
1016 fn request(
1017 &mut self,
1018 method: String,
1019 url: String,
1020 headers: Vec<String>,
1021 body: Option<Vec<u8>>,
1022 ) -> Result<Vec<u8>, String> {
1023 let req = HttpRequest {
1024 method,
1025 url,
1026 headers: headers
1027 .chunks(2)
1028 .filter_map(|chunk| {
1029 if chunk.len() == 2 {
1030 Some((chunk[0].clone(), chunk[1].clone()))
1031 } else {
1032 None
1033 }
1034 })
1035 .collect(),
1036 body,
1037 };
1038 match HttpClientHost::send(self, req, None) {
1039 Ok(resp) => Ok(resp.body.unwrap_or_default()),
1040 Err(err) => Err(err.message),
1041 }
1042 }
1043}
1044
1045impl RunnerHostKv for HostState {
1046 fn get(&mut self, _ns: String, _key: String) -> Option<String> {
1047 None
1048 }
1049
1050 fn put(&mut self, _ns: String, _key: String, _val: String) {}
1051}
1052
1053enum ManifestLoad {
1054 New {
1055 manifest: Box<greentic_types::PackManifest>,
1056 flows: PackFlows,
1057 },
1058 Legacy {
1059 manifest: Box<legacy_pack::PackManifest>,
1060 flows: PackFlows,
1061 },
1062}
1063
1064fn load_manifest_and_flows(path: &Path) -> Result<ManifestLoad> {
1065 let mut archive = ZipArchive::new(File::open(path)?)
1066 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
1067 let bytes = read_entry(&mut archive, "manifest.cbor")
1068 .with_context(|| format!("missing manifest.cbor in {}", path.display()))?;
1069 match decode_pack_manifest(&bytes) {
1070 Ok(manifest) => {
1071 let cache = PackFlows::from_manifest(manifest.clone());
1072 Ok(ManifestLoad::New {
1073 manifest: Box::new(manifest),
1074 flows: cache,
1075 })
1076 }
1077 Err(err) => {
1078 tracing::debug!(
1079 error = %err,
1080 pack = %path.display(),
1081 "decode_pack_manifest failed for archive; falling back to legacy manifest"
1082 );
1083 let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
1084 .context("failed to decode legacy pack manifest from manifest.cbor")?;
1085 let flows = load_legacy_flows_from_archive(&mut archive, path, &legacy)?;
1086 Ok(ManifestLoad::Legacy {
1087 manifest: Box::new(legacy),
1088 flows,
1089 })
1090 }
1091 }
1092}
1093
1094fn load_manifest_and_flows_from_dir(root: &Path) -> Result<ManifestLoad> {
1095 let manifest_path = root.join("manifest.cbor");
1096 let bytes = std::fs::read(&manifest_path)
1097 .with_context(|| format!("missing manifest.cbor in {}", root.display()))?;
1098 match decode_pack_manifest(&bytes) {
1099 Ok(manifest) => {
1100 let cache = PackFlows::from_manifest(manifest.clone());
1101 Ok(ManifestLoad::New {
1102 manifest: Box::new(manifest),
1103 flows: cache,
1104 })
1105 }
1106 Err(err) => {
1107 tracing::debug!(
1108 error = %err,
1109 pack = %root.display(),
1110 "decode_pack_manifest failed for materialized pack; trying legacy manifest"
1111 );
1112 let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
1113 .context("failed to decode legacy pack manifest from manifest.cbor")?;
1114 let flows = load_legacy_flows_from_dir(root, &legacy)?;
1115 Ok(ManifestLoad::Legacy {
1116 manifest: Box::new(legacy),
1117 flows,
1118 })
1119 }
1120 }
1121}
1122
1123fn load_legacy_flows_from_dir(
1124 root: &Path,
1125 manifest: &legacy_pack::PackManifest,
1126) -> Result<PackFlows> {
1127 build_legacy_flows(manifest, |rel_path| {
1128 let path = root.join(rel_path);
1129 std::fs::read(&path).with_context(|| format!("missing flow json {}", path.display()))
1130 })
1131}
1132
1133fn load_legacy_flows_from_archive(
1134 archive: &mut ZipArchive<File>,
1135 pack_path: &Path,
1136 manifest: &legacy_pack::PackManifest,
1137) -> Result<PackFlows> {
1138 build_legacy_flows(manifest, |rel_path| {
1139 read_entry(archive, rel_path)
1140 .with_context(|| format!("missing flow json {} in {}", rel_path, pack_path.display()))
1141 })
1142}
1143
1144fn build_legacy_flows(
1145 manifest: &legacy_pack::PackManifest,
1146 mut read_json: impl FnMut(&str) -> Result<Vec<u8>>,
1147) -> Result<PackFlows> {
1148 let mut flows = HashMap::new();
1149 let mut descriptors = Vec::new();
1150
1151 for entry in &manifest.flows {
1152 let bytes = read_json(&entry.file_json)
1153 .with_context(|| format!("missing flow json {}", entry.file_json))?;
1154 let doc = parse_flow_doc_with_legacy_aliases(&bytes, &entry.file_json)?;
1155 let normalized = normalize_flow_doc(doc);
1156 let flow_ir = flow_doc_to_ir(normalized)?;
1157 let flow = flow_ir_to_flow(flow_ir)?;
1158
1159 descriptors.push(FlowDescriptor {
1160 id: entry.id.clone(),
1161 flow_type: entry.kind.clone(),
1162 pack_id: manifest.meta.pack_id.clone(),
1163 profile: manifest.meta.pack_id.clone(),
1164 version: manifest.meta.version.to_string(),
1165 description: None,
1166 });
1167 flows.insert(entry.id.clone(), flow);
1168 }
1169
1170 let mut entry_flows = manifest.meta.entry_flows.clone();
1171 if entry_flows.is_empty() {
1172 entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
1173 }
1174 let metadata = PackMetadata {
1175 pack_id: manifest.meta.pack_id.clone(),
1176 version: manifest.meta.version.to_string(),
1177 entry_flows,
1178 secret_requirements: Vec::new(),
1179 };
1180
1181 Ok(PackFlows {
1182 descriptors,
1183 flows,
1184 metadata,
1185 })
1186}
1187
1188fn parse_flow_doc_with_legacy_aliases(bytes: &[u8], path: &str) -> Result<FlowDoc> {
1189 let mut value: Value = serde_json::from_slice(bytes)
1190 .with_context(|| format!("failed to decode flow doc {}", path))?;
1191 if let Some(map) = value.as_object_mut()
1192 && !map.contains_key("type")
1193 && let Some(flow_type) = map.remove("flow_type")
1194 {
1195 map.insert("type".to_string(), flow_type);
1196 }
1197 serde_json::from_value(value).with_context(|| format!("failed to decode flow doc {}", path))
1198}
1199
1200pub struct ComponentState {
1201 pub host: HostState,
1202 wasi_ctx: WasiCtx,
1203 resource_table: ResourceTable,
1204}
1205
1206impl ComponentState {
1207 pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
1208 let wasi_ctx = policy
1209 .instantiate()
1210 .context("failed to build WASI context")?;
1211 Ok(Self {
1212 host,
1213 wasi_ctx,
1214 resource_table: ResourceTable::new(),
1215 })
1216 }
1217
1218 fn host_mut(&mut self) -> &mut HostState {
1219 &mut self.host
1220 }
1221
1222 fn should_cancel_host(&mut self) -> bool {
1223 false
1224 }
1225
1226 fn yield_now_host(&mut self) {
1227 }
1229}
1230
1231impl component_api::v0_4::greentic::component::control::Host for ComponentState {
1232 fn should_cancel(&mut self) -> bool {
1233 self.should_cancel_host()
1234 }
1235
1236 fn yield_now(&mut self) {
1237 self.yield_now_host();
1238 }
1239}
1240
1241impl component_api::v0_5::greentic::component::control::Host for ComponentState {
1242 fn should_cancel(&mut self) -> bool {
1243 self.should_cancel_host()
1244 }
1245
1246 fn yield_now(&mut self) {
1247 self.yield_now_host();
1248 }
1249}
1250
1251fn add_component_control_instance(
1252 linker: &mut Linker<ComponentState>,
1253 name: &str,
1254) -> wasmtime::Result<()> {
1255 let mut inst = linker.instance(name)?;
1256 inst.func_wrap(
1257 "should-cancel",
1258 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
1259 let host = caller.data_mut();
1260 Ok((host.should_cancel_host(),))
1261 },
1262 )?;
1263 inst.func_wrap(
1264 "yield-now",
1265 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
1266 let host = caller.data_mut();
1267 host.yield_now_host();
1268 Ok(())
1269 },
1270 )?;
1271 Ok(())
1272}
1273
1274fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
1275 add_component_control_instance(linker, "greentic:component/control@0.5.0")?;
1276 add_component_control_instance(linker, "greentic:component/control@0.4.0")?;
1277 Ok(())
1278}
1279
1280pub fn register_all(linker: &mut Linker<ComponentState>, allow_state_store: bool) -> Result<()> {
1281 add_wasi_to_linker(linker)?;
1282 add_all_v1_to_linker(
1283 linker,
1284 HostFns {
1285 http_client_v1_1: Some(|state: &mut ComponentState| state.host_mut()),
1286 http_client: Some(|state: &mut ComponentState| state.host_mut()),
1287 oauth_broker: None,
1288 runner_host_http: Some(|state: &mut ComponentState| state.host_mut()),
1289 runner_host_kv: Some(|state: &mut ComponentState| state.host_mut()),
1290 telemetry_logger: Some(|state: &mut ComponentState| state.host_mut()),
1291 state_store: allow_state_store.then_some(|state: &mut ComponentState| state.host_mut()),
1292 secrets_store_v1_1: Some(|state: &mut ComponentState| state.host_mut()),
1293 secrets_store: None,
1294 },
1295 )?;
1296 add_http_client_client_world_aliases(linker)?;
1297 Ok(())
1298}
1299
1300fn add_http_client_client_world_aliases(linker: &mut Linker<ComponentState>) -> Result<()> {
1301 let mut inst_v1_1 = linker.instance("greentic:http/client@1.1.0")?;
1302 inst_v1_1.func_wrap(
1303 "send",
1304 move |mut caller: StoreContextMut<'_, ComponentState>,
1305 (req, opts, ctx): (
1306 http_client_client_alias::Request,
1307 Option<http_client_client_alias::RequestOptions>,
1308 Option<http_client_client_alias::TenantCtx>,
1309 )| {
1310 let host = caller.data_mut().host_mut();
1311 let result = HttpClientHostV1_1::send(
1312 host,
1313 alias_request_to_host(req),
1314 opts.map(alias_request_options_to_host),
1315 ctx.map(alias_tenant_ctx_to_host),
1316 );
1317 Ok((match result {
1318 Ok(resp) => Ok(alias_response_from_host(resp)),
1319 Err(err) => Err(alias_error_from_host(err)),
1320 },))
1321 },
1322 )?;
1323 let mut inst_v1_0 = linker.instance("greentic:http/client@1.0.0")?;
1324 inst_v1_0.func_wrap(
1325 "send",
1326 move |mut caller: StoreContextMut<'_, ComponentState>,
1327 (req, ctx): (
1328 host_http_client::Request,
1329 Option<host_http_client::TenantCtx>,
1330 )| {
1331 let host = caller.data_mut().host_mut();
1332 let result = HttpClientHost::send(host, req, ctx);
1333 Ok((result,))
1334 },
1335 )?;
1336 Ok(())
1337}
1338
1339fn alias_request_to_host(req: http_client_client_alias::Request) -> host_http_client::RequestV1_1 {
1340 host_http_client::RequestV1_1 {
1341 method: req.method,
1342 url: req.url,
1343 headers: req.headers,
1344 body: req.body,
1345 }
1346}
1347
1348fn alias_request_options_to_host(
1349 opts: http_client_client_alias::RequestOptions,
1350) -> host_http_client::RequestOptionsV1_1 {
1351 host_http_client::RequestOptionsV1_1 {
1352 timeout_ms: opts.timeout_ms,
1353 allow_insecure: opts.allow_insecure,
1354 follow_redirects: opts.follow_redirects,
1355 }
1356}
1357
1358fn alias_tenant_ctx_to_host(
1359 ctx: http_client_client_alias::TenantCtx,
1360) -> host_http_client::TenantCtxV1_1 {
1361 host_http_client::TenantCtxV1_1 {
1362 env: ctx.env,
1363 tenant: ctx.tenant,
1364 tenant_id: ctx.tenant_id,
1365 team: ctx.team,
1366 team_id: ctx.team_id,
1367 user: ctx.user,
1368 user_id: ctx.user_id,
1369 trace_id: ctx.trace_id,
1370 correlation_id: ctx.correlation_id,
1371 i18n_id: ctx.i18n_id,
1372 attributes: ctx.attributes,
1373 session_id: ctx.session_id,
1374 flow_id: ctx.flow_id,
1375 node_id: ctx.node_id,
1376 provider_id: ctx.provider_id,
1377 deadline_ms: ctx.deadline_ms,
1378 attempt: ctx.attempt,
1379 idempotency_key: ctx.idempotency_key,
1380 impersonation: ctx.impersonation.map(|imp| http_types_v1_1::Impersonation {
1381 actor_id: imp.actor_id,
1382 reason: imp.reason,
1383 }),
1384 }
1385}
1386
1387fn alias_response_from_host(
1388 resp: host_http_client::ResponseV1_1,
1389) -> http_client_client_alias::Response {
1390 http_client_client_alias::Response {
1391 status: resp.status,
1392 headers: resp.headers,
1393 body: resp.body,
1394 }
1395}
1396
1397fn alias_error_from_host(
1398 err: host_http_client::HttpClientErrorV1_1,
1399) -> http_client_client_alias::HostError {
1400 http_client_client_alias::HostError {
1401 code: err.code,
1402 message: err.message,
1403 }
1404}
1405
1406impl OAuthHostContext for ComponentState {
1407 fn tenant_id(&self) -> &str {
1408 &self.host.config.tenant
1409 }
1410
1411 fn env(&self) -> &str {
1412 &self.host.default_env
1413 }
1414
1415 fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
1416 &mut self.host.oauth_host
1417 }
1418
1419 fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
1420 self.host.oauth_config.as_ref()
1421 }
1422}
1423
1424impl WasiView for ComponentState {
1425 fn ctx(&mut self) -> WasiCtxView<'_> {
1426 WasiCtxView {
1427 ctx: &mut self.wasi_ctx,
1428 table: &mut self.resource_table,
1429 }
1430 }
1431}
1432
1433#[allow(unsafe_code)]
1434unsafe impl Send for ComponentState {}
1435#[allow(unsafe_code)]
1436unsafe impl Sync for ComponentState {}
1437
1438impl PackRuntime {
1439 fn allows_state_store(&self, component_ref: &str) -> bool {
1440 if self.state_store.is_none() {
1441 return false;
1442 }
1443 if !self.config.state_store_policy.allow {
1444 return false;
1445 }
1446 let Some(manifest) = self.component_manifests.get(component_ref) else {
1447 return true;
1449 };
1450 manifest
1455 .capabilities
1456 .host
1457 .state
1458 .as_ref()
1459 .map(|caps| caps.read || caps.write)
1460 .unwrap_or(false)
1461 }
1462
1463 pub fn contains_component(&self, component_ref: &str) -> bool {
1464 self.components.contains_key(component_ref)
1465 }
1466
1467 #[allow(clippy::too_many_arguments)]
1468 pub async fn load(
1469 path: impl AsRef<Path>,
1470 config: Arc<HostConfig>,
1471 mocks: Option<Arc<MockLayer>>,
1472 archive_source: Option<&Path>,
1473 session_store: Option<DynSessionStore>,
1474 state_store: Option<DynStateStore>,
1475 wasi_policy: Arc<RunnerWasiPolicy>,
1476 secrets: DynSecretsManager,
1477 oauth_config: Option<OAuthBrokerConfig>,
1478 verify_archive: bool,
1479 component_resolution: ComponentResolution,
1480 ) -> Result<Self> {
1481 let path = path.as_ref();
1482 let (_pack_root, safe_path) = normalize_pack_path(path)?;
1483 let path_meta = std::fs::metadata(&safe_path).ok();
1484 let is_dir = path_meta
1485 .as_ref()
1486 .map(|meta| meta.is_dir())
1487 .unwrap_or(false);
1488 let is_component = !is_dir
1489 && safe_path
1490 .extension()
1491 .and_then(|ext| ext.to_str())
1492 .map(|ext| ext.eq_ignore_ascii_case("wasm"))
1493 .unwrap_or(false);
1494 let archive_hint_path = if let Some(source) = archive_source {
1495 let (_, normalized) = normalize_pack_path(source)?;
1496 Some(normalized)
1497 } else if is_component || is_dir {
1498 None
1499 } else {
1500 Some(safe_path.clone())
1501 };
1502 let archive_hint = archive_hint_path.as_deref();
1503 if verify_archive {
1504 if let Some(verify_target) = archive_hint.and_then(|p| {
1505 std::fs::metadata(p)
1506 .ok()
1507 .filter(|meta| meta.is_file())
1508 .map(|_| p)
1509 }) {
1510 verify::verify_pack(verify_target).await?;
1511 tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
1512 } else {
1513 tracing::debug!("skipping archive verification (no archive source)");
1514 }
1515 }
1516 let engine = Engine::default();
1517 let engine_profile =
1518 EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
1519 let cache = CacheManager::new(CacheConfig::default(), engine_profile);
1520 let mut metadata = PackMetadata::fallback(&safe_path);
1521 let mut manifest = None;
1522 let mut legacy_manifest: Option<Box<legacy_pack::PackManifest>> = None;
1523 let mut flows = None;
1524 let materialized_root = component_resolution.materialized_root.clone().or_else(|| {
1525 if is_dir {
1526 Some(safe_path.clone())
1527 } else {
1528 None
1529 }
1530 });
1531 let (pack_assets_dir, assets_tempdir) =
1532 locate_pack_assets(materialized_root.as_deref(), archive_hint)?;
1533 let setup_yaml_exists = pack_assets_dir
1534 .as_ref()
1535 .map(|dir| dir.join("setup.yaml").is_file())
1536 .unwrap_or(false);
1537 tracing::info!(
1538 pack_root = %safe_path.display(),
1539 assets_setup_yaml_exists = setup_yaml_exists,
1540 "pack unpack metadata"
1541 );
1542
1543 if let Some(root) = materialized_root.as_ref() {
1544 match load_manifest_and_flows_from_dir(root) {
1545 Ok(ManifestLoad::New {
1546 manifest: m,
1547 flows: cache,
1548 }) => {
1549 metadata = cache.metadata.clone();
1550 manifest = Some(*m);
1551 flows = Some(cache);
1552 }
1553 Ok(ManifestLoad::Legacy {
1554 manifest: m,
1555 flows: cache,
1556 }) => {
1557 metadata = cache.metadata.clone();
1558 legacy_manifest = Some(m);
1559 flows = Some(cache);
1560 }
1561 Err(err) => {
1562 warn!(error = %err, pack = %root.display(), "failed to parse materialized pack manifest");
1563 }
1564 }
1565 }
1566
1567 if manifest.is_none()
1568 && legacy_manifest.is_none()
1569 && let Some(archive_path) = archive_hint
1570 {
1571 let manifest_load = load_manifest_and_flows(archive_path).with_context(|| {
1572 format!(
1573 "failed to load manifest.cbor from {}",
1574 archive_path.display()
1575 )
1576 })?;
1577 match manifest_load {
1578 ManifestLoad::New {
1579 manifest: m,
1580 flows: cache,
1581 } => {
1582 metadata = cache.metadata.clone();
1583 manifest = Some(*m);
1584 flows = Some(cache);
1585 }
1586 ManifestLoad::Legacy {
1587 manifest: m,
1588 flows: cache,
1589 } => {
1590 metadata = cache.metadata.clone();
1591 legacy_manifest = Some(m);
1592 flows = Some(cache);
1593 }
1594 }
1595 }
1596 #[cfg(feature = "fault-injection")]
1597 {
1598 let fault_ctx = FaultContext {
1599 pack_id: metadata.pack_id.as_str(),
1600 flow_id: "unknown",
1601 node_id: None,
1602 attempt: 1,
1603 };
1604 maybe_fail(FaultPoint::PackResolve, fault_ctx)
1605 .map_err(|err| anyhow!(err.to_string()))?;
1606 }
1607 let mut pack_lock = None;
1608 for root in find_pack_lock_roots(&safe_path, is_dir, archive_hint) {
1609 pack_lock = load_pack_lock(&root)?;
1610 if pack_lock.is_some() {
1611 break;
1612 }
1613 }
1614 let component_sources_payload = if pack_lock.is_none() {
1615 if let Some(manifest) = manifest.as_ref() {
1616 manifest
1617 .get_component_sources_v1()
1618 .context("invalid component sources extension")?
1619 } else {
1620 None
1621 }
1622 } else {
1623 None
1624 };
1625 let component_sources = if let Some(lock) = pack_lock.as_ref() {
1626 Some(component_sources_table_from_pack_lock(
1627 lock,
1628 component_resolution.allow_missing_hash,
1629 )?)
1630 } else {
1631 component_sources_table(component_sources_payload.as_ref())?
1632 };
1633 let components = if is_component {
1634 let wasm_bytes = fs::read(&safe_path).await?;
1635 metadata = PackMetadata::from_wasm(&wasm_bytes)
1636 .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
1637 let name = safe_path
1638 .file_stem()
1639 .map(|s| s.to_string_lossy().to_string())
1640 .unwrap_or_else(|| "component".to_string());
1641 let component = compile_component_with_cache(&cache, &engine, None, wasm_bytes).await?;
1642 let mut map = HashMap::new();
1643 map.insert(
1644 name.clone(),
1645 PackComponent {
1646 name,
1647 version: metadata.version.clone(),
1648 component,
1649 },
1650 );
1651 map
1652 } else {
1653 let specs = component_specs(
1654 manifest.as_ref(),
1655 legacy_manifest.as_deref(),
1656 component_sources_payload.as_ref(),
1657 pack_lock.as_ref(),
1658 );
1659 if specs.is_empty() {
1660 HashMap::new()
1661 } else {
1662 let mut loaded = HashMap::new();
1663 let mut missing: HashSet<String> =
1664 specs.iter().map(|spec| spec.id.clone()).collect();
1665 let mut searched = Vec::new();
1666
1667 if !component_resolution.overrides.is_empty() {
1668 load_components_from_overrides(
1669 &cache,
1670 &engine,
1671 &component_resolution.overrides,
1672 &specs,
1673 &mut missing,
1674 &mut loaded,
1675 )
1676 .await?;
1677 searched.push("override map".to_string());
1678 }
1679
1680 if let Some(component_sources) = component_sources.as_ref() {
1681 load_components_from_sources(
1682 &cache,
1683 &engine,
1684 component_sources,
1685 &component_resolution,
1686 &specs,
1687 &mut missing,
1688 &mut loaded,
1689 materialized_root.as_deref(),
1690 archive_hint,
1691 )
1692 .await?;
1693 searched.push(format!("extension {}", EXT_COMPONENT_SOURCES_V1));
1694 }
1695
1696 if let Some(root) = materialized_root.as_ref() {
1697 load_components_from_dir(
1698 &cache,
1699 &engine,
1700 root,
1701 &specs,
1702 &mut missing,
1703 &mut loaded,
1704 )
1705 .await?;
1706 searched.push(format!("components dir {}", root.display()));
1707 }
1708
1709 if let Some(archive_path) = archive_hint {
1710 load_components_from_archive(
1711 &cache,
1712 &engine,
1713 archive_path,
1714 &specs,
1715 &mut missing,
1716 &mut loaded,
1717 )
1718 .await?;
1719 searched.push(format!("archive {}", archive_path.display()));
1720 }
1721
1722 if !missing.is_empty() {
1723 let missing_list = missing.into_iter().collect::<Vec<_>>().join(", ");
1724 let sources = if searched.is_empty() {
1725 "no component sources".to_string()
1726 } else {
1727 searched.join(", ")
1728 };
1729 bail!(
1730 "components missing: {}; looked in {}",
1731 missing_list,
1732 sources
1733 );
1734 }
1735
1736 loaded
1737 }
1738 };
1739 let http_client = Arc::clone(&HTTP_CLIENT);
1740 let mut component_manifests = HashMap::new();
1741 if let Some(manifest) = manifest.as_ref() {
1742 for component in &manifest.components {
1743 component_manifests.insert(component.id.as_str().to_string(), component.clone());
1744 }
1745 }
1746 let mut pack_policy = (*wasi_policy).clone();
1747 if let Some(dir) = pack_assets_dir {
1748 tracing::debug!(path = %dir.display(), "preopening pack assets directory for WASI /assets");
1749 pack_policy =
1750 pack_policy.with_preopen(PreopenSpec::new(dir, "/assets").read_only(true));
1751 }
1752 let wasi_policy = Arc::new(pack_policy);
1753 Ok(Self {
1754 path: safe_path,
1755 archive_path: archive_hint.map(Path::to_path_buf),
1756 config,
1757 engine,
1758 metadata,
1759 manifest,
1760 legacy_manifest,
1761 component_manifests,
1762 mocks,
1763 flows,
1764 components,
1765 http_client,
1766 pre_cache: Mutex::new(HashMap::new()),
1767 session_store,
1768 state_store,
1769 wasi_policy,
1770 assets_tempdir,
1771 provider_registry: RwLock::new(None),
1772 secrets,
1773 oauth_config,
1774 cache,
1775 })
1776 }
1777
1778 pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
1779 if let Some(cache) = &self.flows {
1780 return Ok(cache.descriptors.clone());
1781 }
1782 if let Some(manifest) = &self.manifest {
1783 let descriptors = manifest
1784 .flows
1785 .iter()
1786 .map(|flow| FlowDescriptor {
1787 id: flow.id.as_str().to_string(),
1788 flow_type: flow_kind_to_str(flow.kind).to_string(),
1789 pack_id: manifest.pack_id.as_str().to_string(),
1790 profile: manifest.pack_id.as_str().to_string(),
1791 version: manifest.version.to_string(),
1792 description: None,
1793 })
1794 .collect();
1795 return Ok(descriptors);
1796 }
1797 Ok(Vec::new())
1798 }
1799
1800 #[allow(dead_code)]
1801 pub async fn run_flow(
1802 &self,
1803 flow_id: &str,
1804 input: serde_json::Value,
1805 ) -> Result<serde_json::Value> {
1806 let pack = Arc::new(
1807 PackRuntime::load(
1808 &self.path,
1809 Arc::clone(&self.config),
1810 self.mocks.clone(),
1811 self.archive_path.as_deref(),
1812 self.session_store.clone(),
1813 self.state_store.clone(),
1814 Arc::clone(&self.wasi_policy),
1815 self.secrets.clone(),
1816 self.oauth_config.clone(),
1817 false,
1818 ComponentResolution::default(),
1819 )
1820 .await?,
1821 );
1822
1823 let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&self.config)).await?;
1824 let retry_config = self.config.retry_config().into();
1825 let mocks = pack.mocks.as_deref();
1826 let tenant = self.config.tenant.as_str();
1827
1828 let ctx = FlowContext {
1829 tenant,
1830 pack_id: pack.metadata().pack_id.as_str(),
1831 flow_id,
1832 node_id: None,
1833 tool: None,
1834 action: None,
1835 session_id: None,
1836 provider_id: None,
1837 retry_config,
1838 attempt: 1,
1839 observer: None,
1840 mocks,
1841 };
1842
1843 let execution = engine.execute(ctx, input).await?;
1844 match execution.status {
1845 FlowStatus::Completed => Ok(execution.output),
1846 FlowStatus::Waiting(wait) => Ok(serde_json::json!({
1847 "status": "pending",
1848 "reason": wait.reason,
1849 "resume": wait.snapshot,
1850 "response": execution.output,
1851 })),
1852 }
1853 }
1854
1855 pub async fn invoke_component(
1856 &self,
1857 component_ref: &str,
1858 ctx: ComponentExecCtx,
1859 operation: &str,
1860 _config_json: Option<String>,
1861 input_json: String,
1862 ) -> Result<Value> {
1863 let pack_component = self
1864 .components
1865 .get(component_ref)
1866 .with_context(|| format!("component '{component_ref}' not found in pack"))?;
1867 let engine = self.engine.clone();
1868 let config = Arc::clone(&self.config);
1869 let http_client = Arc::clone(&self.http_client);
1870 let mocks = self.mocks.clone();
1871 let session_store = self.session_store.clone();
1872 let state_store = self.state_store.clone();
1873 let secrets = Arc::clone(&self.secrets);
1874 let oauth_config = self.oauth_config.clone();
1875 let wasi_policy = Arc::clone(&self.wasi_policy);
1876 let pack_id = self.metadata().pack_id.clone();
1877 let allow_state_store = self.allows_state_store(component_ref);
1878 let component = pack_component.component.clone();
1879 let component_ref_owned = component_ref.to_string();
1880 let operation_owned = operation.to_string();
1881 let input_owned = input_json;
1882 let ctx_owned = ctx;
1883
1884 run_on_wasi_thread("component.invoke", move || {
1885 let mut linker = Linker::new(&engine);
1886 register_all(&mut linker, allow_state_store)?;
1887 add_component_control_to_linker(&mut linker)?;
1888
1889 let host_state = HostState::new(
1890 pack_id.clone(),
1891 config,
1892 http_client,
1893 mocks,
1894 session_store,
1895 state_store,
1896 secrets,
1897 oauth_config,
1898 Some(ctx_owned.clone()),
1899 Some(component_ref_owned.clone()),
1900 false,
1901 )?;
1902 let store_state = ComponentState::new(host_state, wasi_policy)?;
1903 let mut store = wasmtime::Store::new(&engine, store_state);
1904
1905 let invoke_result = HostState::instantiate_component_result(
1906 &mut linker,
1907 &mut store,
1908 &component,
1909 &ctx_owned,
1910 &component_ref_owned,
1911 &operation_owned,
1912 &input_owned,
1913 )?;
1914 HostState::convert_invoke_result(invoke_result)
1915 })
1916 }
1917
1918 pub fn resolve_provider(
1919 &self,
1920 provider_id: Option<&str>,
1921 provider_type: Option<&str>,
1922 ) -> Result<ProviderBinding> {
1923 let registry = self.provider_registry()?;
1924 registry.resolve(provider_id, provider_type)
1925 }
1926
1927 pub async fn invoke_provider(
1928 &self,
1929 binding: &ProviderBinding,
1930 ctx: ComponentExecCtx,
1931 op: &str,
1932 input_json: Vec<u8>,
1933 ) -> Result<Value> {
1934 let component_ref_owned = binding.component_ref.clone();
1935 let pack_component = self.components.get(&component_ref_owned).with_context(|| {
1936 format!("provider component '{component_ref_owned}' not found in pack")
1937 })?;
1938 let component = pack_component.component.clone();
1939
1940 let engine = self.engine.clone();
1941 let config = Arc::clone(&self.config);
1942 let http_client = Arc::clone(&self.http_client);
1943 let mocks = self.mocks.clone();
1944 let session_store = self.session_store.clone();
1945 let state_store = self.state_store.clone();
1946 let secrets = Arc::clone(&self.secrets);
1947 let oauth_config = self.oauth_config.clone();
1948 let wasi_policy = Arc::clone(&self.wasi_policy);
1949 let pack_id = self.metadata().pack_id.clone();
1950 let allow_state_store = self.allows_state_store(&component_ref_owned);
1951 let input_owned = input_json;
1952 let op_owned = op.to_string();
1953 let ctx_owned = ctx;
1954 let world = binding.world.clone();
1955
1956 run_on_wasi_thread("provider.invoke", move || {
1957 let mut linker = Linker::new(&engine);
1958 register_all(&mut linker, allow_state_store)?;
1959 add_component_control_to_linker(&mut linker)?;
1960 let host_state = HostState::new(
1961 pack_id.clone(),
1962 config,
1963 http_client,
1964 mocks,
1965 session_store,
1966 state_store,
1967 secrets,
1968 oauth_config,
1969 Some(ctx_owned.clone()),
1970 Some(component_ref_owned.clone()),
1971 true,
1972 )?;
1973 let store_state = ComponentState::new(host_state, wasi_policy)?;
1974 let mut store = wasmtime::Store::new(&engine, store_state);
1975 let use_schema_core_schema = world.contains("provider-schema-core");
1976 let use_schema_core_path = world.contains("provider/schema-core");
1977 let result = if use_schema_core_schema {
1978 let pre_instance = linker.instantiate_pre(component.as_ref())?;
1979 let pre: SchemaSchemaCorePre<ComponentState> =
1980 SchemaSchemaCorePre::new(pre_instance)?;
1981 let bindings = block_on(async { pre.instantiate_async(&mut store).await })?;
1982 let provider = bindings.greentic_provider_schema_core_schema_core_api();
1983 provider.call_invoke(&mut store, &op_owned, &input_owned)?
1984 } else if use_schema_core_path {
1985 let pre_instance = linker.instantiate_pre(component.as_ref())?;
1986 let path_attempt = (|| -> Result<Vec<u8>> {
1987 let pre: PathSchemaCorePre<ComponentState> =
1988 PathSchemaCorePre::new(pre_instance)?;
1989 let bindings = block_on(async { pre.instantiate_async(&mut store).await })?;
1990 let provider = bindings.greentic_provider_schema_core_api();
1991 Ok(provider.call_invoke(&mut store, &op_owned, &input_owned)?)
1992 })();
1993 match path_attempt {
1994 Ok(value) => value,
1995 Err(path_err)
1996 if path_err.to_string().contains("no exported instance named") =>
1997 {
1998 let pre_instance = linker.instantiate_pre(component.as_ref())?;
1999 let pre: SchemaSchemaCorePre<ComponentState> =
2000 SchemaSchemaCorePre::new(pre_instance)?;
2001 let bindings = block_on(async { pre.instantiate_async(&mut store).await })?;
2002 let provider = bindings.greentic_provider_schema_core_schema_core_api();
2003 provider.call_invoke(&mut store, &op_owned, &input_owned)?
2004 }
2005 Err(path_err) => return Err(path_err),
2006 }
2007 } else {
2008 let pre_instance = linker.instantiate_pre(component.as_ref())?;
2009 let pre: LegacySchemaCorePre<ComponentState> =
2010 LegacySchemaCorePre::new(pre_instance)?;
2011 let bindings = block_on(async { pre.instantiate_async(&mut store).await })?;
2012 let provider = bindings.greentic_provider_core_schema_core_api();
2013 provider.call_invoke(&mut store, &op_owned, &input_owned)?
2014 };
2015 deserialize_json_bytes(result)
2016 })
2017 }
2018
2019 pub(crate) fn provider_registry(&self) -> Result<ProviderRegistry> {
2020 if let Some(registry) = self.provider_registry.read().clone() {
2021 return Ok(registry);
2022 }
2023 let manifest = self
2024 .manifest
2025 .as_ref()
2026 .context("pack manifest required for provider resolution")?;
2027 let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
2028 let registry = ProviderRegistry::new(
2029 manifest,
2030 self.state_store.clone(),
2031 &self.config.tenant,
2032 &env,
2033 )?;
2034 *self.provider_registry.write() = Some(registry.clone());
2035 Ok(registry)
2036 }
2037
2038 pub(crate) fn provider_registry_optional(&self) -> Result<Option<ProviderRegistry>> {
2039 if self.manifest.is_none() {
2040 return Ok(None);
2041 }
2042 Ok(Some(self.provider_registry()?))
2043 }
2044
2045 pub fn load_flow(&self, flow_id: &str) -> Result<Flow> {
2046 if let Some(cache) = &self.flows {
2047 return cache
2048 .flows
2049 .get(flow_id)
2050 .cloned()
2051 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
2052 }
2053 if let Some(manifest) = &self.manifest {
2054 let entry = manifest
2055 .flows
2056 .iter()
2057 .find(|f| f.id.as_str() == flow_id)
2058 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
2059 return Ok(entry.flow.clone());
2060 }
2061 bail!("flow '{flow_id}' not available (pack exports disabled)")
2062 }
2063
2064 pub fn metadata(&self) -> &PackMetadata {
2065 &self.metadata
2066 }
2067
2068 pub fn read_asset(&self, asset_path: &str) -> Result<Vec<u8>> {
2073 let normalized = asset_path
2074 .trim_start_matches("assets/")
2075 .trim_start_matches("/assets/");
2076 if let Some(tempdir) = &self.assets_tempdir {
2078 let full = tempdir.path().join("assets").join(normalized);
2079 if full.exists() {
2080 return std::fs::read(&full)
2081 .with_context(|| format!("read asset {}", full.display()));
2082 }
2083 }
2084 let full = self.path.join("assets").join(normalized);
2086 if full.exists() {
2087 return std::fs::read(&full).with_context(|| format!("read asset {}", full.display()));
2088 }
2089 bail!("asset not found: {}", asset_path)
2090 }
2091
2092 pub fn component_manifest(&self, component_ref: &str) -> Option<&ComponentManifest> {
2093 self.component_manifests.get(component_ref)
2094 }
2095
2096 pub fn describe_component_contract_v0_6(&self, component_ref: &str) -> Result<Option<Value>> {
2097 let pack_component = self
2098 .components
2099 .get(component_ref)
2100 .with_context(|| format!("component '{component_ref}' not found in pack"))?;
2101 let engine = self.engine.clone();
2102 let config = Arc::clone(&self.config);
2103 let http_client = Arc::clone(&self.http_client);
2104 let mocks = self.mocks.clone();
2105 let session_store = self.session_store.clone();
2106 let state_store = self.state_store.clone();
2107 let secrets = Arc::clone(&self.secrets);
2108 let oauth_config = self.oauth_config.clone();
2109 let wasi_policy = Arc::clone(&self.wasi_policy);
2110 let pack_id = self.metadata().pack_id.clone();
2111 let allow_state_store = self.allows_state_store(component_ref);
2112 let component = pack_component.component.clone();
2113 let component_ref_owned = component_ref.to_string();
2114
2115 run_on_wasi_thread("component.describe", move || {
2116 let mut linker = Linker::new(&engine);
2117 register_all(&mut linker, allow_state_store)?;
2118 add_component_control_to_linker(&mut linker)?;
2119
2120 let host_state = HostState::new(
2121 pack_id.clone(),
2122 config,
2123 http_client,
2124 mocks,
2125 session_store,
2126 state_store,
2127 secrets,
2128 oauth_config,
2129 None,
2130 Some(component_ref_owned),
2131 false,
2132 )?;
2133 let store_state = ComponentState::new(host_state, wasi_policy)?;
2134 let mut store = wasmtime::Store::new(&engine, store_state);
2135 let pre_instance = linker.instantiate_pre(&component)?;
2136 let pre = match component_api::v0_6_descriptor::ComponentV0V6V0Pre::new(pre_instance) {
2137 Ok(pre) => pre,
2138 Err(_) => return Ok(None),
2139 };
2140 let bytes = block_on(async {
2141 let bindings = pre.instantiate_async(&mut store).await?;
2142 let descriptor = bindings.greentic_component_component_descriptor();
2143 descriptor.call_describe(&mut store)
2144 })?;
2145
2146 if bytes.is_empty() {
2147 return Ok(Some(Value::Null));
2148 }
2149 if let Ok(value) = serde_cbor::from_slice::<Value>(&bytes) {
2150 return Ok(Some(value));
2151 }
2152 if let Ok(value) = serde_json::from_slice::<Value>(&bytes) {
2153 return Ok(Some(value));
2154 }
2155 if let Ok(text) = String::from_utf8(bytes) {
2156 if let Ok(value) = serde_json::from_str::<Value>(&text) {
2157 return Ok(Some(value));
2158 }
2159 return Ok(Some(Value::String(text)));
2160 }
2161 Ok(Some(Value::Null))
2162 })
2163 }
2164
2165 pub fn load_schema_json(&self, schema_ref: &str) -> Result<Option<Value>> {
2166 let rel = normalize_schema_ref(schema_ref)?;
2167 if self.path.is_dir() {
2168 let candidate = self.path.join(&rel);
2169 if candidate.exists() {
2170 let bytes = std::fs::read(&candidate).with_context(|| {
2171 format!("failed to read schema file {}", candidate.display())
2172 })?;
2173 let value = serde_json::from_slice::<Value>(&bytes)
2174 .with_context(|| format!("invalid schema JSON in {}", candidate.display()))?;
2175 return Ok(Some(value));
2176 }
2177 }
2178
2179 if let Some(archive_path) = self
2180 .archive_path
2181 .as_ref()
2182 .or_else(|| path_is_gtpack(&self.path).then_some(&self.path))
2183 {
2184 let file = File::open(archive_path)
2185 .with_context(|| format!("failed to open {}", archive_path.display()))?;
2186 let mut archive = ZipArchive::new(file)
2187 .with_context(|| format!("failed to read pack {}", archive_path.display()))?;
2188 match archive.by_name(&rel) {
2189 Ok(mut entry) => {
2190 let mut bytes = Vec::new();
2191 entry.read_to_end(&mut bytes)?;
2192 let value = serde_json::from_slice::<Value>(&bytes).with_context(|| {
2193 format!("invalid schema JSON in {}:{}", archive_path.display(), rel)
2194 })?;
2195 Ok(Some(value))
2196 }
2197 Err(zip::result::ZipError::FileNotFound) => Ok(None),
2198 Err(err) => Err(anyhow!(err)).with_context(|| {
2199 format!(
2200 "failed to read schema `{}` from {}",
2201 rel,
2202 archive_path.display()
2203 )
2204 }),
2205 }
2206 } else {
2207 Ok(None)
2208 }
2209 }
2210
2211 pub fn required_secrets(&self) -> &[greentic_types::SecretRequirement] {
2212 &self.metadata.secret_requirements
2213 }
2214
2215 pub fn missing_secrets(
2216 &self,
2217 tenant_ctx: &TypesTenantCtx,
2218 ) -> Vec<greentic_types::SecretRequirement> {
2219 let env = tenant_ctx.env.as_str().to_string();
2220 let tenant = tenant_ctx.tenant.as_str().to_string();
2221 let team = tenant_ctx.team.as_ref().map(|t| t.as_str().to_string());
2222 self.required_secrets()
2223 .iter()
2224 .filter(|req| {
2225 if let Some(scope) = &req.scope {
2227 if scope.env != env {
2228 return false;
2229 }
2230 if scope.tenant != tenant {
2231 return false;
2232 }
2233 if let Some(ref team_req) = scope.team
2234 && team.as_ref() != Some(team_req)
2235 {
2236 return false;
2237 }
2238 }
2239 let ctx = self.config.tenant_ctx();
2240 read_secret_blocking(
2241 &self.secrets,
2242 &ctx,
2243 &self.metadata.pack_id,
2244 req.key.as_str(),
2245 )
2246 .is_err()
2247 })
2248 .cloned()
2249 .collect()
2250 }
2251
2252 pub fn for_component_test(
2253 components: Vec<(String, PathBuf)>,
2254 flows: HashMap<String, FlowIR>,
2255 pack_id: &str,
2256 config: Arc<HostConfig>,
2257 ) -> Result<Self> {
2258 let engine = Engine::default();
2259 let engine_profile =
2260 EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
2261 let cache = CacheManager::new(CacheConfig::default(), engine_profile);
2262 let mut component_map = HashMap::new();
2263 for (name, path) in components {
2264 if !path.exists() {
2265 bail!("component artifact missing: {}", path.display());
2266 }
2267 let wasm_bytes = std::fs::read(&path)?;
2268 let component =
2269 Arc::new(Component::from_binary(&engine, &wasm_bytes).map_err(|err| {
2270 anyhow!("failed to compile component {}: {err}", path.display())
2271 })?);
2272 component_map.insert(
2273 name.clone(),
2274 PackComponent {
2275 name,
2276 version: "0.0.0".into(),
2277 component,
2278 },
2279 );
2280 }
2281
2282 let mut flow_map = HashMap::new();
2283 let mut descriptors = Vec::new();
2284 for (id, ir) in flows {
2285 let flow_type = ir.flow_type.clone();
2286 let flow = flow_ir_to_flow(ir)?;
2287 flow_map.insert(id.clone(), flow);
2288 descriptors.push(FlowDescriptor {
2289 id: id.clone(),
2290 flow_type,
2291 pack_id: pack_id.to_string(),
2292 profile: "test".into(),
2293 version: "0.0.0".into(),
2294 description: None,
2295 });
2296 }
2297 let entry_flows = descriptors.iter().map(|flow| flow.id.clone()).collect();
2298 let metadata = PackMetadata {
2299 pack_id: pack_id.to_string(),
2300 version: "0.0.0".into(),
2301 entry_flows,
2302 secret_requirements: Vec::new(),
2303 };
2304 let flows_cache = PackFlows {
2305 descriptors: descriptors.clone(),
2306 flows: flow_map,
2307 metadata: metadata.clone(),
2308 };
2309
2310 Ok(Self {
2311 path: PathBuf::new(),
2312 archive_path: None,
2313 config,
2314 engine,
2315 metadata,
2316 manifest: None,
2317 legacy_manifest: None,
2318 component_manifests: HashMap::new(),
2319 mocks: None,
2320 flows: Some(flows_cache),
2321 components: component_map,
2322 http_client: Arc::clone(&HTTP_CLIENT),
2323 pre_cache: Mutex::new(HashMap::new()),
2324 session_store: None,
2325 state_store: None,
2326 wasi_policy: Arc::new(RunnerWasiPolicy::new()),
2327 assets_tempdir: None,
2328 provider_registry: RwLock::new(None),
2329 secrets: crate::secrets::default_manager()?,
2330 oauth_config: None,
2331 cache,
2332 })
2333 }
2334}
2335
2336fn normalize_schema_ref(schema_ref: &str) -> Result<String> {
2337 let candidate = schema_ref.trim();
2338 if candidate.is_empty() {
2339 bail!("schema ref cannot be empty");
2340 }
2341 let path = Path::new(candidate);
2342 if path.is_absolute() {
2343 bail!("schema ref must be relative: {}", schema_ref);
2344 }
2345 let mut normalized = PathBuf::new();
2346 for component in path.components() {
2347 match component {
2348 std::path::Component::Normal(part) => normalized.push(part),
2349 std::path::Component::CurDir => {}
2350 _ => bail!("schema ref must not contain traversal: {}", schema_ref),
2351 }
2352 }
2353 let normalized = normalized
2354 .to_str()
2355 .map(ToString::to_string)
2356 .ok_or_else(|| anyhow!("schema ref must be valid UTF-8"))?;
2357 if normalized.is_empty() {
2358 bail!("schema ref cannot normalize to empty path");
2359 }
2360 Ok(normalized)
2361}
2362
2363fn path_is_gtpack(path: &Path) -> bool {
2364 path.extension()
2365 .and_then(|ext| ext.to_str())
2366 .map(|ext| ext.eq_ignore_ascii_case("gtpack"))
2367 .unwrap_or(false)
2368}
2369
2370fn is_missing_node_export(err: &wasmtime::Error, version: &str) -> bool {
2371 let message = err.to_string();
2372 message.contains("no exported instance named")
2373 && message.contains(&format!("greentic:component/node@{version}"))
2374}
2375
2376struct PackFlows {
2377 descriptors: Vec<FlowDescriptor>,
2378 flows: HashMap<String, Flow>,
2379 metadata: PackMetadata,
2380}
2381
2382const RUNTIME_FLOW_EXTENSION_IDS: [&str; 3] = [
2383 "greentic.pack.runtime_flow",
2384 "greentic.pack.flow_runtime",
2385 "greentic.pack.runtime_flows",
2386];
2387
2388#[derive(Debug, Deserialize)]
2389struct RuntimeFlowBundle {
2390 flows: Vec<RuntimeFlow>,
2391}
2392
2393#[derive(Debug, Deserialize)]
2394struct RuntimeFlow {
2395 id: String,
2396 #[serde(alias = "flow_type")]
2397 kind: FlowKind,
2398 #[serde(default)]
2399 schema_version: Option<String>,
2400 #[serde(default)]
2401 start: Option<String>,
2402 #[serde(default)]
2403 entrypoints: BTreeMap<String, Value>,
2404 nodes: BTreeMap<String, RuntimeNode>,
2405 #[serde(default)]
2406 metadata: Option<FlowMetadata>,
2407}
2408
2409#[derive(Debug, Deserialize)]
2410struct RuntimeNode {
2411 #[serde(alias = "component")]
2412 component_id: String,
2413 #[serde(default, alias = "operation")]
2414 operation_name: Option<String>,
2415 #[serde(default, alias = "payload", alias = "input")]
2416 operation_payload: Value,
2417 #[serde(default)]
2418 routing: Option<Routing>,
2419 #[serde(default)]
2420 telemetry: Option<TelemetryHints>,
2421}
2422
2423fn deserialize_json_bytes(bytes: Vec<u8>) -> Result<Value> {
2424 if bytes.is_empty() {
2425 return Ok(Value::Null);
2426 }
2427 serde_json::from_slice(&bytes).or_else(|_| {
2428 String::from_utf8(bytes)
2429 .map(Value::String)
2430 .map_err(|err| anyhow!(err))
2431 })
2432}
2433
2434impl PackFlows {
2435 fn from_manifest(manifest: greentic_types::PackManifest) -> Self {
2436 if let Some(flows) = flows_from_runtime_extension(&manifest) {
2437 return flows;
2438 }
2439 let descriptors = manifest
2440 .flows
2441 .iter()
2442 .map(|entry| FlowDescriptor {
2443 id: entry.id.as_str().to_string(),
2444 flow_type: flow_kind_to_str(entry.kind).to_string(),
2445 pack_id: manifest.pack_id.as_str().to_string(),
2446 profile: manifest.pack_id.as_str().to_string(),
2447 version: manifest.version.to_string(),
2448 description: None,
2449 })
2450 .collect();
2451 let mut flows = HashMap::new();
2452 for entry in &manifest.flows {
2453 flows.insert(entry.id.as_str().to_string(), entry.flow.clone());
2454 }
2455 Self {
2456 metadata: PackMetadata::from_manifest(&manifest),
2457 descriptors,
2458 flows,
2459 }
2460 }
2461}
2462
2463fn flows_from_runtime_extension(manifest: &greentic_types::PackManifest) -> Option<PackFlows> {
2464 let extensions = manifest.extensions.as_ref()?;
2465 let extension = extensions.iter().find_map(|(key, ext)| {
2466 if RUNTIME_FLOW_EXTENSION_IDS
2467 .iter()
2468 .any(|candidate| candidate == key)
2469 {
2470 Some(ext)
2471 } else {
2472 None
2473 }
2474 })?;
2475 let runtime_flows = match decode_runtime_flow_extension(extension) {
2476 Some(flows) if !flows.is_empty() => flows,
2477 _ => return None,
2478 };
2479
2480 let descriptors = runtime_flows
2481 .iter()
2482 .map(|flow| FlowDescriptor {
2483 id: flow.id.as_str().to_string(),
2484 flow_type: flow_kind_to_str(flow.kind).to_string(),
2485 pack_id: manifest.pack_id.as_str().to_string(),
2486 profile: manifest.pack_id.as_str().to_string(),
2487 version: manifest.version.to_string(),
2488 description: None,
2489 })
2490 .collect::<Vec<_>>();
2491 let flows = runtime_flows
2492 .into_iter()
2493 .map(|flow| (flow.id.as_str().to_string(), flow))
2494 .collect();
2495
2496 Some(PackFlows {
2497 metadata: PackMetadata::from_manifest(manifest),
2498 descriptors,
2499 flows,
2500 })
2501}
2502
2503fn decode_runtime_flow_extension(extension: &ExtensionRef) -> Option<Vec<Flow>> {
2504 let value = match extension.inline.as_ref()? {
2505 ExtensionInline::Other(value) => value.clone(),
2506 _ => return None,
2507 };
2508
2509 if let Ok(bundle) = serde_json::from_value::<RuntimeFlowBundle>(value.clone()) {
2510 return Some(collect_runtime_flows(bundle.flows));
2511 }
2512
2513 if let Ok(flows) = serde_json::from_value::<Vec<RuntimeFlow>>(value.clone()) {
2514 return Some(collect_runtime_flows(flows));
2515 }
2516
2517 if let Ok(flows) = serde_json::from_value::<Vec<Flow>>(value) {
2518 return Some(flows);
2519 }
2520
2521 warn!(
2522 extension = %extension.kind,
2523 version = %extension.version,
2524 "runtime flow extension present but could not be decoded"
2525 );
2526 None
2527}
2528
2529fn collect_runtime_flows(flows: Vec<RuntimeFlow>) -> Vec<Flow> {
2530 flows
2531 .into_iter()
2532 .filter_map(|flow| match runtime_flow_to_flow(flow) {
2533 Ok(flow) => Some(flow),
2534 Err(err) => {
2535 warn!(error = %err, "failed to decode runtime flow");
2536 None
2537 }
2538 })
2539 .collect()
2540}
2541
2542fn runtime_flow_to_flow(runtime: RuntimeFlow) -> Result<Flow> {
2543 let flow_id = FlowId::from_str(&runtime.id)
2544 .with_context(|| format!("invalid flow id `{}`", runtime.id))?;
2545 let mut entrypoints = runtime.entrypoints;
2546 if entrypoints.is_empty()
2547 && let Some(start) = &runtime.start
2548 {
2549 entrypoints.insert("default".into(), Value::String(start.clone()));
2550 }
2551
2552 let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
2553 for (id, node) in runtime.nodes {
2554 let node_id = NodeId::from_str(&id).with_context(|| format!("invalid node id `{id}`"))?;
2555 let component_id = ComponentId::from_str(&node.component_id)
2556 .with_context(|| format!("invalid component id `{}`", node.component_id))?;
2557 let component = FlowComponentRef {
2558 id: component_id,
2559 pack_alias: None,
2560 operation: node.operation_name,
2561 };
2562 let routing = node.routing.unwrap_or(Routing::End);
2563 let telemetry = node.telemetry.unwrap_or_default();
2564 nodes.insert(
2565 node_id.clone(),
2566 Node {
2567 id: node_id,
2568 component,
2569 input: InputMapping {
2570 mapping: node.operation_payload,
2571 },
2572 output: OutputMapping {
2573 mapping: Value::Null,
2574 },
2575 routing,
2576 telemetry,
2577 },
2578 );
2579 }
2580
2581 Ok(Flow {
2582 schema_version: runtime.schema_version.unwrap_or_else(|| "1.0".to_string()),
2583 id: flow_id,
2584 kind: runtime.kind,
2585 entrypoints,
2586 nodes,
2587 metadata: runtime.metadata.unwrap_or_default(),
2588 })
2589}
2590
2591fn flow_kind_to_str(kind: greentic_types::FlowKind) -> &'static str {
2592 match kind {
2593 greentic_types::FlowKind::Messaging => "messaging",
2594 greentic_types::FlowKind::Event => "event",
2595 greentic_types::FlowKind::ComponentConfig => "component-config",
2596 greentic_types::FlowKind::Job => "job",
2597 greentic_types::FlowKind::Http => "http",
2598 }
2599}
2600
2601fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
2602 let mut file = archive
2603 .by_name(name)
2604 .with_context(|| format!("entry {name} missing from archive"))?;
2605 let mut buf = Vec::new();
2606 file.read_to_end(&mut buf)?;
2607 Ok(buf)
2608}
2609
2610fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
2611 for node in doc.nodes.values_mut() {
2612 let Some((component_ref, payload)) = node
2613 .raw
2614 .iter()
2615 .next()
2616 .map(|(key, value)| (key.clone(), value.clone()))
2617 else {
2618 continue;
2619 };
2620 if component_ref.starts_with("emit.") {
2621 node.operation = Some(component_ref);
2622 node.payload = payload;
2623 node.raw.clear();
2624 continue;
2625 }
2626 let (target_component, operation, input, config) =
2627 infer_component_exec(&payload, &component_ref);
2628 let mut payload_obj = serde_json::Map::new();
2629 payload_obj.insert("component".into(), Value::String(target_component));
2631 payload_obj.insert("operation".into(), Value::String(operation));
2632 payload_obj.insert("input".into(), input);
2633 if let Some(cfg) = config {
2634 payload_obj.insert("config".into(), cfg);
2635 }
2636 node.operation = Some("component.exec".to_string());
2637 node.payload = Value::Object(payload_obj);
2638 node.raw.clear();
2639 }
2640 doc
2641}
2642
2643fn infer_component_exec(
2644 payload: &Value,
2645 component_ref: &str,
2646) -> (String, String, Value, Option<Value>) {
2647 let default_op = if component_ref.starts_with("templating.") {
2648 "render"
2649 } else {
2650 "invoke"
2651 }
2652 .to_string();
2653
2654 if let Value::Object(map) = payload {
2655 let op = map
2656 .get("op")
2657 .or_else(|| map.get("operation"))
2658 .and_then(Value::as_str)
2659 .map(|s| s.to_string())
2660 .unwrap_or_else(|| default_op.clone());
2661
2662 let mut input = map.clone();
2663 let config = input.remove("config");
2664 let component = input
2665 .get("component")
2666 .or_else(|| input.get("component_ref"))
2667 .and_then(Value::as_str)
2668 .map(|s| s.to_string())
2669 .unwrap_or_else(|| component_ref.to_string());
2670 input.remove("component");
2671 input.remove("component_ref");
2672 input.remove("op");
2673 input.remove("operation");
2674 return (component, op, Value::Object(input), config);
2675 }
2676
2677 (component_ref.to_string(), default_op, payload.clone(), None)
2678}
2679
2680#[derive(Clone, Debug)]
2681struct ComponentSpec {
2682 id: String,
2683 version: String,
2684 legacy_path: Option<String>,
2685}
2686
2687#[derive(Clone, Debug)]
2688struct ComponentSourceInfo {
2689 digest: Option<String>,
2690 source: ComponentSourceRef,
2691 artifact: ComponentArtifactLocation,
2692 expected_wasm_sha256: Option<String>,
2693 skip_digest_verification: bool,
2694}
2695
2696#[derive(Clone, Debug)]
2697enum ComponentArtifactLocation {
2698 Inline { wasm_path: String },
2699 Remote,
2700}
2701
2702#[derive(Clone, Debug, Deserialize)]
2703struct PackLockV1 {
2704 schema_version: u32,
2705 components: Vec<PackLockComponent>,
2706}
2707
2708#[derive(Clone, Debug, Deserialize)]
2709struct PackLockComponent {
2710 name: String,
2711 #[serde(default, rename = "source_ref")]
2712 source_ref: Option<String>,
2713 #[serde(default, rename = "ref")]
2714 legacy_ref: Option<String>,
2715 #[serde(default)]
2716 component_id: Option<ComponentId>,
2717 #[serde(default)]
2718 bundled: Option<bool>,
2719 #[serde(default, rename = "bundled_path")]
2720 bundled_path: Option<String>,
2721 #[serde(default, rename = "path")]
2722 legacy_path: Option<String>,
2723 #[serde(default)]
2724 wasm_sha256: Option<String>,
2725 #[serde(default, rename = "sha256")]
2726 legacy_sha256: Option<String>,
2727 #[serde(default)]
2728 resolved_digest: Option<String>,
2729 #[serde(default)]
2730 digest: Option<String>,
2731}
2732
2733fn component_specs(
2734 manifest: Option<&greentic_types::PackManifest>,
2735 legacy_manifest: Option<&legacy_pack::PackManifest>,
2736 component_sources: Option<&ComponentSourcesV1>,
2737 pack_lock: Option<&PackLockV1>,
2738) -> Vec<ComponentSpec> {
2739 if let Some(manifest) = manifest {
2740 if !manifest.components.is_empty() {
2741 return manifest
2742 .components
2743 .iter()
2744 .map(|entry| ComponentSpec {
2745 id: entry.id.as_str().to_string(),
2746 version: entry.version.to_string(),
2747 legacy_path: None,
2748 })
2749 .collect();
2750 }
2751 if let Some(lock) = pack_lock {
2752 let mut seen = HashSet::new();
2753 let mut specs = Vec::new();
2754 for entry in &lock.components {
2755 let id = entry
2756 .component_id
2757 .as_ref()
2758 .map(|id| id.as_str())
2759 .unwrap_or(entry.name.as_str());
2760 if seen.insert(id.to_string()) {
2761 specs.push(ComponentSpec {
2762 id: id.to_string(),
2763 version: "0.0.0".to_string(),
2764 legacy_path: None,
2765 });
2766 }
2767 }
2768 return specs;
2769 }
2770 if let Some(sources) = component_sources {
2771 let mut seen = HashSet::new();
2772 let mut specs = Vec::new();
2773 for entry in &sources.components {
2774 let id = entry
2775 .component_id
2776 .as_ref()
2777 .map(|id| id.as_str())
2778 .unwrap_or(entry.name.as_str());
2779 if seen.insert(id.to_string()) {
2780 specs.push(ComponentSpec {
2781 id: id.to_string(),
2782 version: "0.0.0".to_string(),
2783 legacy_path: None,
2784 });
2785 }
2786 }
2787 return specs;
2788 }
2789 }
2790 if let Some(legacy_manifest) = legacy_manifest {
2791 return legacy_manifest
2792 .components
2793 .iter()
2794 .map(|entry| ComponentSpec {
2795 id: entry.name.clone(),
2796 version: entry.version.to_string(),
2797 legacy_path: Some(entry.file_wasm.clone()),
2798 })
2799 .collect();
2800 }
2801 Vec::new()
2802}
2803
2804fn component_sources_table(
2805 sources: Option<&ComponentSourcesV1>,
2806) -> Result<Option<HashMap<String, ComponentSourceInfo>>> {
2807 let Some(sources) = sources else {
2808 return Ok(None);
2809 };
2810 let mut table = HashMap::new();
2811 for entry in &sources.components {
2812 let artifact = match &entry.artifact {
2813 ArtifactLocationV1::Inline { wasm_path, .. } => ComponentArtifactLocation::Inline {
2814 wasm_path: wasm_path.clone(),
2815 },
2816 ArtifactLocationV1::Remote => ComponentArtifactLocation::Remote,
2817 };
2818 let info = ComponentSourceInfo {
2819 digest: Some(entry.resolved.digest.clone()),
2820 source: entry.source.clone(),
2821 artifact,
2822 expected_wasm_sha256: None,
2823 skip_digest_verification: false,
2824 };
2825 if let Some(component_id) = entry.component_id.as_ref() {
2826 table.insert(component_id.as_str().to_string(), info.clone());
2827 }
2828 table.insert(entry.name.clone(), info);
2829 }
2830 Ok(Some(table))
2831}
2832
2833fn load_pack_lock(path: &Path) -> Result<Option<PackLockV1>> {
2834 let lock_path = if path.is_dir() {
2835 let candidate = path.join("pack.lock");
2836 if candidate.exists() {
2837 Some(candidate)
2838 } else {
2839 let candidate = path.join("pack.lock.json");
2840 candidate.exists().then_some(candidate)
2841 }
2842 } else {
2843 None
2844 };
2845 let Some(lock_path) = lock_path else {
2846 return Ok(None);
2847 };
2848 let raw = std::fs::read_to_string(&lock_path)
2849 .with_context(|| format!("failed to read {}", lock_path.display()))?;
2850 let lock: PackLockV1 = serde_json::from_str(&raw).context("failed to parse pack.lock")?;
2851 if lock.schema_version != 1 {
2852 bail!("pack.lock schema_version must be 1");
2853 }
2854 Ok(Some(lock))
2855}
2856
2857fn find_pack_lock_roots(
2858 pack_path: &Path,
2859 is_dir: bool,
2860 archive_hint: Option<&Path>,
2861) -> Vec<PathBuf> {
2862 if is_dir {
2863 return vec![pack_path.to_path_buf()];
2864 }
2865 let mut roots = Vec::new();
2866 if let Some(archive_path) = archive_hint {
2867 if let Some(parent) = archive_path.parent() {
2868 roots.push(parent.to_path_buf());
2869 if let Some(grandparent) = parent.parent() {
2870 roots.push(grandparent.to_path_buf());
2871 }
2872 }
2873 } else if let Some(parent) = pack_path.parent() {
2874 roots.push(parent.to_path_buf());
2875 if let Some(grandparent) = parent.parent() {
2876 roots.push(grandparent.to_path_buf());
2877 }
2878 }
2879 roots
2880}
2881
2882fn normalize_sha256(digest: &str) -> Result<String> {
2883 let trimmed = digest.trim();
2884 if trimmed.is_empty() {
2885 bail!("sha256 digest cannot be empty");
2886 }
2887 if let Some(stripped) = trimmed.strip_prefix("sha256:") {
2888 if stripped.is_empty() {
2889 bail!("sha256 digest must include hex bytes after sha256:");
2890 }
2891 return Ok(trimmed.to_string());
2892 }
2893 if trimmed.chars().all(|c| c.is_ascii_hexdigit()) {
2894 return Ok(format!("sha256:{trimmed}"));
2895 }
2896 bail!("sha256 digest must be hex or sha256:<hex>");
2897}
2898
2899fn component_sources_table_from_pack_lock(
2900 lock: &PackLockV1,
2901 allow_missing_hash: bool,
2902) -> Result<HashMap<String, ComponentSourceInfo>> {
2903 let mut table = HashMap::new();
2904 let mut names = HashSet::new();
2905 for entry in &lock.components {
2906 if !names.insert(entry.name.clone()) {
2907 bail!(
2908 "pack.lock contains duplicate component name `{}`",
2909 entry.name
2910 );
2911 }
2912 let source_ref = match (&entry.source_ref, &entry.legacy_ref) {
2913 (Some(primary), Some(legacy)) => {
2914 if primary != legacy {
2915 bail!(
2916 "pack.lock component {} has conflicting refs: {} vs {}",
2917 entry.name,
2918 primary,
2919 legacy
2920 );
2921 }
2922 primary.as_str()
2923 }
2924 (Some(primary), None) => primary.as_str(),
2925 (None, Some(legacy)) => legacy.as_str(),
2926 (None, None) => {
2927 bail!("pack.lock component {} missing source_ref", entry.name);
2928 }
2929 };
2930 let source: ComponentSourceRef = source_ref
2931 .parse()
2932 .with_context(|| format!("invalid component ref `{}`", source_ref))?;
2933 let bundled_path = match (&entry.bundled_path, &entry.legacy_path) {
2934 (Some(primary), Some(legacy)) => {
2935 if primary != legacy {
2936 bail!(
2937 "pack.lock component {} has conflicting bundled paths: {} vs {}",
2938 entry.name,
2939 primary,
2940 legacy
2941 );
2942 }
2943 Some(primary.clone())
2944 }
2945 (Some(primary), None) => Some(primary.clone()),
2946 (None, Some(legacy)) => Some(legacy.clone()),
2947 (None, None) => None,
2948 };
2949 let bundled = entry.bundled.unwrap_or(false) || bundled_path.is_some();
2950 let (artifact, digest, expected_wasm_sha256, skip_digest_verification) = if bundled {
2951 let wasm_path = bundled_path.ok_or_else(|| {
2952 anyhow!(
2953 "pack.lock component {} marked bundled but bundled_path is missing",
2954 entry.name
2955 )
2956 })?;
2957 let expected_raw = match (&entry.wasm_sha256, &entry.legacy_sha256) {
2958 (Some(primary), Some(legacy)) => {
2959 if primary != legacy {
2960 bail!(
2961 "pack.lock component {} has conflicting wasm_sha256 values: {} vs {}",
2962 entry.name,
2963 primary,
2964 legacy
2965 );
2966 }
2967 Some(primary.as_str())
2968 }
2969 (Some(primary), None) => Some(primary.as_str()),
2970 (None, Some(legacy)) => Some(legacy.as_str()),
2971 (None, None) => None,
2972 };
2973 let expected = match expected_raw {
2974 Some(value) => Some(normalize_sha256(value)?),
2975 None => None,
2976 };
2977 if expected.is_none() && !allow_missing_hash {
2978 bail!(
2979 "pack.lock component {} missing wasm_sha256 for bundled component",
2980 entry.name
2981 );
2982 }
2983 (
2984 ComponentArtifactLocation::Inline { wasm_path },
2985 expected.clone(),
2986 expected,
2987 allow_missing_hash && expected_raw.is_none(),
2988 )
2989 } else {
2990 if source.is_tag() {
2991 bail!(
2992 "component {} uses tag ref {} but is not bundled; rebuild the pack",
2993 entry.name,
2994 source
2995 );
2996 }
2997 let expected = entry
2998 .resolved_digest
2999 .as_deref()
3000 .or(entry.digest.as_deref())
3001 .ok_or_else(|| {
3002 anyhow!(
3003 "pack.lock component {} missing resolved_digest for remote component",
3004 entry.name
3005 )
3006 })?;
3007 (
3008 ComponentArtifactLocation::Remote,
3009 Some(normalize_digest(expected)),
3010 None,
3011 false,
3012 )
3013 };
3014 let info = ComponentSourceInfo {
3015 digest,
3016 source,
3017 artifact,
3018 expected_wasm_sha256,
3019 skip_digest_verification,
3020 };
3021 if let Some(component_id) = entry.component_id.as_ref() {
3022 let key = component_id.as_str().to_string();
3023 if table.contains_key(&key) {
3024 bail!(
3025 "pack.lock contains duplicate component id `{}`",
3026 component_id.as_str()
3027 );
3028 }
3029 table.insert(key, info.clone());
3030 }
3031 if entry.name
3032 != entry
3033 .component_id
3034 .as_ref()
3035 .map(|id| id.as_str())
3036 .unwrap_or("")
3037 {
3038 table.insert(entry.name.clone(), info);
3039 }
3040 }
3041 Ok(table)
3042}
3043
3044fn component_path_for_spec(root: &Path, spec: &ComponentSpec) -> PathBuf {
3045 if let Some(path) = &spec.legacy_path {
3046 return root.join(path);
3047 }
3048 root.join("components").join(format!("{}.wasm", spec.id))
3049}
3050
3051fn normalize_digest(digest: &str) -> String {
3052 if digest.starts_with("sha256:") || digest.starts_with("blake3:") {
3053 digest.to_string()
3054 } else {
3055 format!("sha256:{digest}")
3056 }
3057}
3058
3059fn compute_digest_for(bytes: &[u8], digest: &str) -> Result<String> {
3060 if digest.starts_with("blake3:") {
3061 let hash = blake3::hash(bytes);
3062 return Ok(format!("blake3:{}", hash.to_hex()));
3063 }
3064 let mut hasher = sha2::Sha256::new();
3065 hasher.update(bytes);
3066 Ok(format!("sha256:{:x}", hasher.finalize()))
3067}
3068
3069fn compute_sha256_digest_for(bytes: &[u8]) -> String {
3070 let mut hasher = sha2::Sha256::new();
3071 hasher.update(bytes);
3072 format!("sha256:{:x}", hasher.finalize())
3073}
3074
3075fn build_artifact_key(cache: &CacheManager, digest: Option<&str>, bytes: &[u8]) -> ArtifactKey {
3076 let wasm_digest = digest
3077 .map(normalize_digest)
3078 .unwrap_or_else(|| compute_sha256_digest_for(bytes));
3079 ArtifactKey::new(cache.engine_profile_id().to_string(), wasm_digest)
3080}
3081
3082async fn compile_component_with_cache(
3083 cache: &CacheManager,
3084 engine: &Engine,
3085 digest: Option<&str>,
3086 bytes: Vec<u8>,
3087) -> Result<Arc<Component>> {
3088 let key = build_artifact_key(cache, digest, &bytes);
3089 cache.get_component(engine, &key, || Ok(bytes)).await
3090}
3091
3092fn verify_component_digest(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
3093 let normalized_expected = normalize_digest(expected);
3094 let actual = compute_digest_for(bytes, &normalized_expected)?;
3095 if normalize_digest(&actual) != normalized_expected {
3096 bail!(
3097 "component {component_id} digest mismatch: expected {normalized_expected}, got {actual}"
3098 );
3099 }
3100 Ok(())
3101}
3102
3103fn verify_wasm_sha256(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
3104 let normalized_expected = normalize_sha256(expected)?;
3105 let actual = compute_sha256_digest_for(bytes);
3106 if actual != normalized_expected {
3107 bail!(
3108 "component {component_id} bundled digest mismatch: expected {normalized_expected}, got {actual}"
3109 );
3110 }
3111 Ok(())
3112}
3113
3114#[cfg(test)]
3115mod pack_lock_tests {
3116 use super::*;
3117 use tempfile::TempDir;
3118
3119 #[test]
3120 fn pack_lock_tag_ref_requires_bundle() {
3121 let lock = PackLockV1 {
3122 schema_version: 1,
3123 components: vec![PackLockComponent {
3124 name: "templates".to_string(),
3125 source_ref: Some("oci://registry.test/templates:latest".to_string()),
3126 legacy_ref: None,
3127 component_id: None,
3128 bundled: Some(false),
3129 bundled_path: None,
3130 legacy_path: None,
3131 wasm_sha256: None,
3132 legacy_sha256: None,
3133 resolved_digest: None,
3134 digest: None,
3135 }],
3136 };
3137 let err = component_sources_table_from_pack_lock(&lock, false).unwrap_err();
3138 assert!(
3139 err.to_string().contains("tag ref") && err.to_string().contains("rebuild the pack"),
3140 "unexpected error: {err}"
3141 );
3142 }
3143
3144 #[test]
3145 fn bundled_hash_mismatch_errors() {
3146 let rt = tokio::runtime::Runtime::new().expect("runtime");
3147 let temp = TempDir::new().expect("temp dir");
3148 let engine = Engine::default();
3149 let engine_profile =
3150 EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
3151 let cache_config = CacheConfig {
3152 root: temp.path().join("cache"),
3153 ..CacheConfig::default()
3154 };
3155 let cache = CacheManager::new(cache_config, engine_profile);
3156 let wasm_path = temp.path().join("component.wasm");
3157 let fixture_wasm = Path::new(env!("CARGO_MANIFEST_DIR"))
3158 .join("../../tests/fixtures/packs/secrets_store_smoke/components/echo_secret.wasm");
3159 let bytes = std::fs::read(&fixture_wasm).expect("read fixture wasm");
3160 std::fs::write(&wasm_path, &bytes).expect("write temp wasm");
3161
3162 let spec = ComponentSpec {
3163 id: "qa.process".to_string(),
3164 version: "0.0.0".to_string(),
3165 legacy_path: None,
3166 };
3167 let mut missing = HashSet::new();
3168 missing.insert(spec.id.clone());
3169
3170 let mut sources = HashMap::new();
3171 sources.insert(
3172 spec.id.clone(),
3173 ComponentSourceInfo {
3174 digest: Some("sha256:deadbeef".to_string()),
3175 source: ComponentSourceRef::Oci("registry.test/qa.process@sha256:deadbeef".into()),
3176 artifact: ComponentArtifactLocation::Inline {
3177 wasm_path: "component.wasm".to_string(),
3178 },
3179 expected_wasm_sha256: Some("sha256:deadbeef".to_string()),
3180 skip_digest_verification: false,
3181 },
3182 );
3183
3184 let mut loaded = HashMap::new();
3185 let result = rt.block_on(load_components_from_sources(
3186 &cache,
3187 &engine,
3188 &sources,
3189 &ComponentResolution::default(),
3190 &[spec],
3191 &mut missing,
3192 &mut loaded,
3193 Some(temp.path()),
3194 None,
3195 ));
3196 let err = result.unwrap_err();
3197 assert!(
3198 err.to_string().contains("bundled digest mismatch"),
3199 "unexpected error: {err}"
3200 );
3201 }
3202}
3203
3204#[cfg(test)]
3205mod pack_resolution_prop_tests {
3206 use super::*;
3207 use greentic_types::{ArtifactLocationV1, ComponentSourceEntryV1, ResolvedComponentV1};
3208 use proptest::prelude::*;
3209 use proptest::test_runner::{Config as ProptestConfig, RngAlgorithm, TestRng, TestRunner};
3210 use std::collections::BTreeSet;
3211 use std::path::Path;
3212 use std::str::FromStr;
3213
3214 #[derive(Clone, Debug)]
3215 enum ResolveRequest {
3216 ById(String),
3217 ByName(String),
3218 }
3219
3220 #[derive(Clone, Debug, PartialEq, Eq)]
3221 struct ResolvedComponent {
3222 key: String,
3223 source: String,
3224 artifact: String,
3225 digest: Option<String>,
3226 expected_wasm_sha256: Option<String>,
3227 skip_digest_verification: bool,
3228 }
3229
3230 #[derive(Clone, Debug, PartialEq, Eq)]
3231 struct ResolveError {
3232 code: String,
3233 message: String,
3234 context_key: String,
3235 }
3236
3237 #[derive(Clone, Debug)]
3238 struct Scenario {
3239 pack_lock: Option<PackLockV1>,
3240 component_sources: Option<ComponentSourcesV1>,
3241 request: ResolveRequest,
3242 expected_sha256: Option<String>,
3243 bytes: Vec<u8>,
3244 }
3245
3246 fn resolve_component_test(
3247 sources: Option<&ComponentSourcesV1>,
3248 lock: Option<&PackLockV1>,
3249 request: &ResolveRequest,
3250 ) -> Result<ResolvedComponent, ResolveError> {
3251 let table = if let Some(lock) = lock {
3252 component_sources_table_from_pack_lock(lock, false).map_err(|err| ResolveError {
3253 code: classify_pack_lock_error(err.to_string().as_str()).to_string(),
3254 message: err.to_string(),
3255 context_key: request_key(request).to_string(),
3256 })?
3257 } else {
3258 let sources = component_sources_table(sources).map_err(|err| ResolveError {
3259 code: "component_sources_error".to_string(),
3260 message: err.to_string(),
3261 context_key: request_key(request).to_string(),
3262 })?;
3263 sources.ok_or_else(|| ResolveError {
3264 code: "missing_component_sources".to_string(),
3265 message: "component sources not provided".to_string(),
3266 context_key: request_key(request).to_string(),
3267 })?
3268 };
3269
3270 let key = request_key(request);
3271 let source = table.get(key).ok_or_else(|| ResolveError {
3272 code: "component_not_found".to_string(),
3273 message: format!("component {key} not found"),
3274 context_key: key.to_string(),
3275 })?;
3276
3277 Ok(ResolvedComponent {
3278 key: key.to_string(),
3279 source: source.source.to_string(),
3280 artifact: match source.artifact {
3281 ComponentArtifactLocation::Inline { .. } => "inline".to_string(),
3282 ComponentArtifactLocation::Remote => "remote".to_string(),
3283 },
3284 digest: source.digest.clone(),
3285 expected_wasm_sha256: source.expected_wasm_sha256.clone(),
3286 skip_digest_verification: source.skip_digest_verification,
3287 })
3288 }
3289
3290 fn request_key(request: &ResolveRequest) -> &str {
3291 match request {
3292 ResolveRequest::ById(value) => value.as_str(),
3293 ResolveRequest::ByName(value) => value.as_str(),
3294 }
3295 }
3296
3297 fn classify_pack_lock_error(message: &str) -> &'static str {
3298 if message.contains("duplicate component name") {
3299 "duplicate_name"
3300 } else if message.contains("duplicate component id") {
3301 "duplicate_id"
3302 } else if message.contains("conflicting refs") {
3303 "conflicting_ref"
3304 } else if message.contains("conflicting bundled paths") {
3305 "conflicting_bundled_path"
3306 } else if message.contains("conflicting wasm_sha256") {
3307 "conflicting_wasm_sha256"
3308 } else if message.contains("missing source_ref") {
3309 "missing_source_ref"
3310 } else if message.contains("marked bundled but bundled_path is missing") {
3311 "missing_bundled_path"
3312 } else if message.contains("missing wasm_sha256") {
3313 "missing_wasm_sha256"
3314 } else if message.contains("tag ref") && message.contains("not bundled") {
3315 "tag_ref_requires_bundle"
3316 } else if message.contains("missing resolved_digest") {
3317 "missing_resolved_digest"
3318 } else if message.contains("invalid component ref") {
3319 "invalid_component_ref"
3320 } else if message.contains("sha256 digest") {
3321 "invalid_sha256"
3322 } else {
3323 "unknown_error"
3324 }
3325 }
3326
3327 fn known_error_codes() -> BTreeSet<&'static str> {
3328 [
3329 "component_sources_error",
3330 "missing_component_sources",
3331 "component_not_found",
3332 "duplicate_name",
3333 "duplicate_id",
3334 "conflicting_ref",
3335 "conflicting_bundled_path",
3336 "conflicting_wasm_sha256",
3337 "missing_source_ref",
3338 "missing_bundled_path",
3339 "missing_wasm_sha256",
3340 "tag_ref_requires_bundle",
3341 "missing_resolved_digest",
3342 "invalid_component_ref",
3343 "invalid_sha256",
3344 "unknown_error",
3345 ]
3346 .into_iter()
3347 .collect()
3348 }
3349
3350 fn proptest_config() -> ProptestConfig {
3351 let cases = std::env::var("PROPTEST_CASES")
3352 .ok()
3353 .and_then(|value| value.parse::<u32>().ok())
3354 .unwrap_or(128);
3355 ProptestConfig {
3356 cases,
3357 failure_persistence: None,
3358 ..ProptestConfig::default()
3359 }
3360 }
3361
3362 fn proptest_seed() -> Option<[u8; 32]> {
3363 let seed = std::env::var("PROPTEST_SEED")
3364 .ok()
3365 .and_then(|value| value.parse::<u64>().ok())?;
3366 let mut bytes = [0u8; 32];
3367 bytes[..8].copy_from_slice(&seed.to_le_bytes());
3368 Some(bytes)
3369 }
3370
3371 fn run_cases(strategy: impl Strategy<Value = Scenario>, cases: u32, seed: Option<[u8; 32]>) {
3372 let config = ProptestConfig {
3373 cases,
3374 failure_persistence: None,
3375 ..ProptestConfig::default()
3376 };
3377 let mut runner = match seed {
3378 Some(bytes) => {
3379 TestRunner::new_with_rng(config, TestRng::from_seed(RngAlgorithm::ChaCha, &bytes))
3380 }
3381 None => TestRunner::new(config),
3382 };
3383 runner
3384 .run(&strategy, |scenario| {
3385 run_scenario(&scenario);
3386 Ok(())
3387 })
3388 .unwrap();
3389 }
3390
3391 fn run_scenario(scenario: &Scenario) {
3392 let known_codes = known_error_codes();
3393 let first = resolve_component_test(
3394 scenario.component_sources.as_ref(),
3395 scenario.pack_lock.as_ref(),
3396 &scenario.request,
3397 );
3398 let second = resolve_component_test(
3399 scenario.component_sources.as_ref(),
3400 scenario.pack_lock.as_ref(),
3401 &scenario.request,
3402 );
3403 assert_eq!(normalize_result(&first), normalize_result(&second));
3404
3405 if let Some(lock) = scenario.pack_lock.as_ref() {
3406 let lock_only = resolve_component_test(None, Some(lock), &scenario.request);
3407 assert_eq!(normalize_result(&first), normalize_result(&lock_only));
3408 }
3409
3410 if let Err(err) = first.as_ref() {
3411 assert!(
3412 known_codes.contains(err.code.as_str()),
3413 "unexpected error code {}: {}",
3414 err.code,
3415 err.message
3416 );
3417 }
3418
3419 if let Some(expected) = scenario.expected_sha256.as_deref() {
3420 let expected_ok =
3421 verify_wasm_sha256("test.component", expected, &scenario.bytes).is_ok();
3422 let actual = compute_sha256_digest_for(&scenario.bytes);
3423 if actual == normalize_sha256(expected).unwrap_or_default() {
3424 assert!(expected_ok, "expected sha256 match to succeed");
3425 } else {
3426 assert!(!expected_ok, "expected sha256 mismatch to fail");
3427 }
3428 }
3429 }
3430
3431 fn normalize_result(
3432 result: &Result<ResolvedComponent, ResolveError>,
3433 ) -> Result<ResolvedComponent, ResolveError> {
3434 match result {
3435 Ok(value) => Ok(value.clone()),
3436 Err(err) => Err(err.clone()),
3437 }
3438 }
3439
3440 fn scenario_strategy() -> impl Strategy<Value = Scenario> {
3441 let name = any::<u8>().prop_map(|n| format!("component{n}.core"));
3442 let alt_name = any::<u8>().prop_map(|n| format!("component_alt{n}.core"));
3443 let tag_ref = any::<bool>();
3444 let bundled = any::<bool>();
3445 let include_sha = any::<bool>();
3446 let include_component_id = any::<bool>();
3447 let request_by_id = any::<bool>();
3448 let use_lock = any::<bool>();
3449 let use_sources = any::<bool>();
3450 let bytes = prop::collection::vec(any::<u8>(), 1..64);
3451
3452 (
3453 name,
3454 alt_name,
3455 tag_ref,
3456 bundled,
3457 include_sha,
3458 include_component_id,
3459 request_by_id,
3460 use_lock,
3461 use_sources,
3462 bytes,
3463 )
3464 .prop_map(
3465 |(
3466 name,
3467 alt_name,
3468 tag_ref,
3469 bundled,
3470 include_sha,
3471 include_component_id,
3472 request_by_id,
3473 use_lock,
3474 use_sources,
3475 bytes,
3476 )| {
3477 let component_id_str = if include_component_id {
3478 alt_name.clone()
3479 } else {
3480 name.clone()
3481 };
3482 let component_id = ComponentId::from_str(&component_id_str).ok();
3483 let source_ref = if tag_ref {
3484 format!("oci://registry.test/{name}:v1")
3485 } else {
3486 format!(
3487 "oci://registry.test/{name}@sha256:{}",
3488 hex::encode([0x11u8; 32])
3489 )
3490 };
3491 let expected_sha256 = if bundled && include_sha {
3492 Some(compute_sha256_digest_for(&bytes))
3493 } else {
3494 None
3495 };
3496
3497 let lock_component = PackLockComponent {
3498 name: name.clone(),
3499 source_ref: Some(source_ref),
3500 legacy_ref: None,
3501 component_id,
3502 bundled: Some(bundled),
3503 bundled_path: if bundled {
3504 Some(format!("components/{name}.wasm"))
3505 } else {
3506 None
3507 },
3508 legacy_path: None,
3509 wasm_sha256: expected_sha256.clone(),
3510 legacy_sha256: None,
3511 resolved_digest: if bundled {
3512 None
3513 } else {
3514 Some("sha256:deadbeef".to_string())
3515 },
3516 digest: None,
3517 };
3518
3519 let pack_lock = if use_lock {
3520 Some(PackLockV1 {
3521 schema_version: 1,
3522 components: vec![lock_component],
3523 })
3524 } else {
3525 None
3526 };
3527
3528 let component_sources = if use_sources {
3529 Some(ComponentSourcesV1::new(vec![ComponentSourceEntryV1 {
3530 name: name.clone(),
3531 component_id: ComponentId::from_str(&name).ok(),
3532 source: ComponentSourceRef::from_str(
3533 "oci://registry.test/component@sha256:deadbeef",
3534 )
3535 .expect("component ref"),
3536 resolved: ResolvedComponentV1 {
3537 digest: "sha256:deadbeef".to_string(),
3538 signature: None,
3539 signed_by: None,
3540 },
3541 artifact: if bundled {
3542 ArtifactLocationV1::Inline {
3543 wasm_path: format!("components/{name}.wasm"),
3544 manifest_path: None,
3545 }
3546 } else {
3547 ArtifactLocationV1::Remote
3548 },
3549 licensing_hint: None,
3550 metering_hint: None,
3551 }]))
3552 } else {
3553 None
3554 };
3555
3556 let request = if request_by_id {
3557 ResolveRequest::ById(component_id_str.clone())
3558 } else {
3559 ResolveRequest::ByName(name.clone())
3560 };
3561
3562 Scenario {
3563 pack_lock,
3564 component_sources,
3565 request,
3566 expected_sha256,
3567 bytes,
3568 }
3569 },
3570 )
3571 }
3572
3573 #[test]
3574 fn pack_resolution_proptest() {
3575 let seed = proptest_seed();
3576 run_cases(scenario_strategy(), proptest_config().cases, seed);
3577 }
3578
3579 #[test]
3580 fn pack_resolution_regression_seeds() {
3581 let seeds_path =
3582 Path::new(env!("CARGO_MANIFEST_DIR")).join("../../tests/fixtures/proptest-seeds.txt");
3583 let raw = std::fs::read_to_string(&seeds_path).expect("read proptest seeds");
3584 for line in raw.lines() {
3585 let line = line.trim();
3586 if line.is_empty() || line.starts_with('#') {
3587 continue;
3588 }
3589 let seed = line.parse::<u64>().expect("seed must be an integer");
3590 let mut bytes = [0u8; 32];
3591 bytes[..8].copy_from_slice(&seed.to_le_bytes());
3592 run_cases(scenario_strategy(), 1, Some(bytes));
3593 }
3594 }
3595}
3596
3597fn locate_pack_assets(
3598 materialized_root: Option<&Path>,
3599 archive_hint: Option<&Path>,
3600) -> Result<(Option<PathBuf>, Option<TempDir>)> {
3601 if let Some(root) = materialized_root {
3602 let assets = root.join("assets");
3603 if assets.is_dir() {
3604 return Ok((Some(assets), None));
3605 }
3606 }
3607 if let Some(path) = archive_hint
3608 && let Some((tempdir, assets)) = extract_assets_from_archive(path)?
3609 {
3610 return Ok((Some(assets), Some(tempdir)));
3611 }
3612 Ok((None, None))
3613}
3614
3615fn extract_assets_from_archive(path: &Path) -> Result<Option<(TempDir, PathBuf)>> {
3616 let file =
3617 File::open(path).with_context(|| format!("failed to open pack {}", path.display()))?;
3618 let mut archive =
3619 ZipArchive::new(file).with_context(|| format!("failed to read pack {}", path.display()))?;
3620 let temp = TempDir::new().context("failed to create temporary assets directory")?;
3621 let mut found = false;
3622 for idx in 0..archive.len() {
3623 let mut entry = archive.by_index(idx)?;
3624 let name = entry.name();
3625 if !name.starts_with("assets/") {
3626 continue;
3627 }
3628 let dest = temp.path().join(name);
3629 if name.ends_with('/') {
3630 std::fs::create_dir_all(&dest)?;
3631 found = true;
3632 continue;
3633 }
3634 if let Some(parent) = dest.parent() {
3635 std::fs::create_dir_all(parent)?;
3636 }
3637 let mut outfile = std::fs::File::create(&dest)?;
3638 std::io::copy(&mut entry, &mut outfile)?;
3639 found = true;
3640 }
3641 if found {
3642 let assets_path = temp.path().join("assets");
3643 Ok(Some((temp, assets_path)))
3644 } else {
3645 Ok(None)
3646 }
3647}
3648
3649fn dist_options_from(component_resolution: &ComponentResolution) -> DistOptions {
3650 let mut opts = DistOptions {
3651 allow_tags: true,
3652 ..DistOptions::default()
3653 };
3654 if let Some(cache_dir) = component_resolution.dist_cache_dir.clone() {
3655 opts.cache_dir = cache_dir;
3656 }
3657 if component_resolution.dist_offline {
3658 opts.offline = true;
3659 }
3660 opts
3661}
3662
3663#[allow(clippy::too_many_arguments)]
3664async fn load_components_from_sources(
3665 cache: &CacheManager,
3666 engine: &Engine,
3667 component_sources: &HashMap<String, ComponentSourceInfo>,
3668 component_resolution: &ComponentResolution,
3669 specs: &[ComponentSpec],
3670 missing: &mut HashSet<String>,
3671 into: &mut HashMap<String, PackComponent>,
3672 materialized_root: Option<&Path>,
3673 archive_hint: Option<&Path>,
3674) -> Result<()> {
3675 let mut archive = if let Some(path) = archive_hint {
3676 Some(
3677 ZipArchive::new(File::open(path)?)
3678 .with_context(|| format!("{} is not a valid gtpack", path.display()))?,
3679 )
3680 } else {
3681 None
3682 };
3683 let mut dist_client: Option<DistClient> = None;
3684
3685 for spec in specs {
3686 if !missing.contains(&spec.id) {
3687 continue;
3688 }
3689 let Some(source) = component_sources.get(&spec.id) else {
3690 continue;
3691 };
3692
3693 let bytes = match &source.artifact {
3694 ComponentArtifactLocation::Inline { wasm_path } => {
3695 if let Some(root) = materialized_root {
3696 let path = root.join(wasm_path);
3697 if path.exists() {
3698 std::fs::read(&path).with_context(|| {
3699 format!(
3700 "failed to read inline component {} from {}",
3701 spec.id,
3702 path.display()
3703 )
3704 })?
3705 } else if archive.is_none() {
3706 bail!("inline component {} missing at {}", spec.id, path.display());
3707 } else {
3708 read_entry(
3709 archive.as_mut().expect("archive present when needed"),
3710 wasm_path,
3711 )
3712 .with_context(|| {
3713 format!(
3714 "inline component {} missing at {} in pack archive",
3715 spec.id, wasm_path
3716 )
3717 })?
3718 }
3719 } else if let Some(archive) = archive.as_mut() {
3720 read_entry(archive, wasm_path).with_context(|| {
3721 format!(
3722 "inline component {} missing at {} in pack archive",
3723 spec.id, wasm_path
3724 )
3725 })?
3726 } else {
3727 bail!(
3728 "inline component {} missing and no pack source available",
3729 spec.id
3730 );
3731 }
3732 }
3733 ComponentArtifactLocation::Remote => {
3734 if source.source.is_tag() {
3735 bail!(
3736 "component {} uses tag ref {} but is not bundled; rebuild the pack",
3737 spec.id,
3738 source.source
3739 );
3740 }
3741 let client = dist_client.get_or_insert_with(|| {
3742 DistClient::new(dist_options_from(component_resolution))
3743 });
3744 let reference = source.source.to_string();
3745 fault::maybe_fail_asset(&reference)
3746 .await
3747 .with_context(|| format!("fault injection blocked asset {reference}"))?;
3748 let digest = source.digest.as_deref().ok_or_else(|| {
3749 anyhow!(
3750 "component {} missing expected digest for remote component",
3751 spec.id
3752 )
3753 })?;
3754 let cache_path = if let Ok(cache_path) = client.fetch_digest(digest).await {
3755 cache_path
3756 } else if component_resolution.dist_offline {
3757 client
3758 .fetch_digest(digest)
3759 .await
3760 .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?
3761 } else {
3762 let source = client
3763 .parse_source(&reference)
3764 .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?;
3765 let descriptor = client
3766 .resolve(source, ResolvePolicy)
3767 .await
3768 .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?;
3769 let resolved = client
3770 .fetch(&descriptor, CachePolicy)
3771 .await
3772 .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?;
3773 let expected = normalize_digest(digest);
3774 let actual = normalize_digest(&resolved.digest);
3775 if expected != actual {
3776 bail!(
3777 "component {} digest mismatch after fetch: expected {}, got {}",
3778 spec.id,
3779 expected,
3780 actual
3781 );
3782 }
3783 resolved.cache_path.ok_or_else(|| {
3784 anyhow!(
3785 "component {} resolved from {} but cache path is missing",
3786 spec.id,
3787 reference
3788 )
3789 })?
3790 };
3791 std::fs::read(&cache_path).with_context(|| {
3792 format!(
3793 "failed to read cached component {} from {}",
3794 spec.id,
3795 cache_path.display()
3796 )
3797 })?
3798 }
3799 };
3800
3801 if let Some(expected) = source.expected_wasm_sha256.as_deref() {
3802 verify_wasm_sha256(&spec.id, expected, &bytes)?;
3803 } else if source.skip_digest_verification {
3804 let actual = compute_sha256_digest_for(&bytes);
3805 warn!(
3806 component_id = %spec.id,
3807 digest = %actual,
3808 "bundled component missing wasm_sha256; allowing due to flag"
3809 );
3810 } else {
3811 let expected = source.digest.as_deref().ok_or_else(|| {
3812 anyhow!(
3813 "component {} missing expected digest for verification",
3814 spec.id
3815 )
3816 })?;
3817 verify_component_digest(&spec.id, expected, &bytes)?;
3818 }
3819 let component =
3820 compile_component_with_cache(cache, engine, source.digest.as_deref(), bytes)
3821 .await
3822 .with_context(|| format!("failed to compile component {}", spec.id))?;
3823 into.insert(
3824 spec.id.clone(),
3825 PackComponent {
3826 name: spec.id.clone(),
3827 version: spec.version.clone(),
3828 component,
3829 },
3830 );
3831 missing.remove(&spec.id);
3832 }
3833
3834 Ok(())
3835}
3836
3837fn dist_error_for_component(err: DistError, component_id: &str, reference: &str) -> anyhow::Error {
3838 match err {
3839 DistError::NotFound { reference: missing } => anyhow!(
3840 "remote component {} is not cached for {}. Run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
3841 component_id,
3842 missing,
3843 reference
3844 ),
3845 DistError::Offline { reference: blocked } => anyhow!(
3846 "offline mode blocked fetching component {} from {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
3847 component_id,
3848 blocked,
3849 reference
3850 ),
3851 DistError::Unauthorized { target } => anyhow!(
3852 "component {} requires authenticated source {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
3853 component_id,
3854 target,
3855 reference
3856 ),
3857 other => anyhow!(
3858 "failed to resolve component {} from {}: {}",
3859 component_id,
3860 reference,
3861 other
3862 ),
3863 }
3864}
3865
3866async fn load_components_from_overrides(
3867 cache: &CacheManager,
3868 engine: &Engine,
3869 overrides: &HashMap<String, PathBuf>,
3870 specs: &[ComponentSpec],
3871 missing: &mut HashSet<String>,
3872 into: &mut HashMap<String, PackComponent>,
3873) -> Result<()> {
3874 for spec in specs {
3875 if !missing.contains(&spec.id) {
3876 continue;
3877 }
3878 let Some(path) = overrides.get(&spec.id) else {
3879 continue;
3880 };
3881 let bytes = std::fs::read(path)
3882 .with_context(|| format!("failed to read override component {}", path.display()))?;
3883 let component = compile_component_with_cache(cache, engine, None, bytes)
3884 .await
3885 .with_context(|| {
3886 format!(
3887 "failed to compile component {} from override {}",
3888 spec.id,
3889 path.display()
3890 )
3891 })?;
3892 into.insert(
3893 spec.id.clone(),
3894 PackComponent {
3895 name: spec.id.clone(),
3896 version: spec.version.clone(),
3897 component,
3898 },
3899 );
3900 missing.remove(&spec.id);
3901 }
3902 Ok(())
3903}
3904
3905async fn load_components_from_dir(
3906 cache: &CacheManager,
3907 engine: &Engine,
3908 root: &Path,
3909 specs: &[ComponentSpec],
3910 missing: &mut HashSet<String>,
3911 into: &mut HashMap<String, PackComponent>,
3912) -> Result<()> {
3913 for spec in specs {
3914 if !missing.contains(&spec.id) {
3915 continue;
3916 }
3917 let path = component_path_for_spec(root, spec);
3918 if !path.exists() {
3919 tracing::debug!(component = %spec.id, path = %path.display(), "materialized component missing; will try other sources");
3920 continue;
3921 }
3922 let bytes = std::fs::read(&path)
3923 .with_context(|| format!("failed to read component {}", path.display()))?;
3924 let component = compile_component_with_cache(cache, engine, None, bytes)
3925 .await
3926 .with_context(|| {
3927 format!(
3928 "failed to compile component {} from {}",
3929 spec.id,
3930 path.display()
3931 )
3932 })?;
3933 into.insert(
3934 spec.id.clone(),
3935 PackComponent {
3936 name: spec.id.clone(),
3937 version: spec.version.clone(),
3938 component,
3939 },
3940 );
3941 missing.remove(&spec.id);
3942 }
3943 Ok(())
3944}
3945
3946async fn load_components_from_archive(
3947 cache: &CacheManager,
3948 engine: &Engine,
3949 path: &Path,
3950 specs: &[ComponentSpec],
3951 missing: &mut HashSet<String>,
3952 into: &mut HashMap<String, PackComponent>,
3953) -> Result<()> {
3954 let mut archive = ZipArchive::new(File::open(path)?)
3955 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
3956 for spec in specs {
3957 if !missing.contains(&spec.id) {
3958 continue;
3959 }
3960 let file_name = spec
3961 .legacy_path
3962 .clone()
3963 .unwrap_or_else(|| format!("components/{}.wasm", spec.id));
3964 let bytes = match read_entry(&mut archive, &file_name) {
3965 Ok(bytes) => bytes,
3966 Err(err) => {
3967 warn!(component = %spec.id, pack = %path.display(), error = %err, "component entry missing in pack archive");
3968 continue;
3969 }
3970 };
3971 let component = compile_component_with_cache(cache, engine, None, bytes)
3972 .await
3973 .with_context(|| format!("failed to compile component {}", spec.id))?;
3974 into.insert(
3975 spec.id.clone(),
3976 PackComponent {
3977 name: spec.id.clone(),
3978 version: spec.version.clone(),
3979 component,
3980 },
3981 );
3982 missing.remove(&spec.id);
3983 }
3984 Ok(())
3985}
3986
3987#[cfg(test)]
3988mod tests {
3989 use super::*;
3990 use greentic_flow::model::{FlowDoc, NodeDoc};
3991 use indexmap::IndexMap;
3992 use serde_json::json;
3993
3994 #[test]
3995 fn normalizes_raw_component_to_component_exec() {
3996 let mut nodes = IndexMap::new();
3997 let mut raw = IndexMap::new();
3998 raw.insert(
3999 "templating.handlebars".into(),
4000 json!({ "template": "Hi {{name}}" }),
4001 );
4002 nodes.insert(
4003 "start".into(),
4004 NodeDoc {
4005 raw,
4006 routing: json!([{"out": true}]),
4007 ..Default::default()
4008 },
4009 );
4010 let doc = FlowDoc {
4011 id: "welcome".into(),
4012 title: None,
4013 description: None,
4014 flow_type: "messaging".into(),
4015 start: Some("start".into()),
4016 parameters: json!({}),
4017 tags: Vec::new(),
4018 schema_version: None,
4019 entrypoints: IndexMap::new(),
4020 meta: None,
4021 nodes,
4022 };
4023
4024 let normalized = normalize_flow_doc(doc);
4025 let node = normalized.nodes.get("start").expect("node exists");
4026 assert_eq!(node.operation.as_deref(), Some("component.exec"));
4027 assert!(node.raw.is_empty());
4028 let payload = node.payload.as_object().expect("payload object");
4029 assert_eq!(
4030 payload.get("component"),
4031 Some(&Value::String("templating.handlebars".into()))
4032 );
4033 assert_eq!(
4034 payload.get("operation"),
4035 Some(&Value::String("render".into()))
4036 );
4037 let input = payload.get("input").unwrap();
4038 assert_eq!(input, &json!({ "template": "Hi {{name}}" }));
4039 }
4040}
4041
4042#[derive(Clone, Debug, Default, Serialize, Deserialize)]
4043pub struct PackMetadata {
4044 pub pack_id: String,
4045 pub version: String,
4046 #[serde(default)]
4047 pub entry_flows: Vec<String>,
4048 #[serde(default)]
4049 pub secret_requirements: Vec<greentic_types::SecretRequirement>,
4050}
4051
4052impl PackMetadata {
4053 fn from_wasm(bytes: &[u8]) -> Option<Self> {
4054 let parser = Parser::new(0);
4055 for payload in parser.parse_all(bytes) {
4056 let payload = payload.ok()?;
4057 match payload {
4058 Payload::CustomSection(section) => {
4059 if section.name() == "greentic.manifest"
4060 && let Ok(meta) = Self::from_bytes(section.data())
4061 {
4062 return Some(meta);
4063 }
4064 }
4065 Payload::DataSection(reader) => {
4066 for segment in reader.into_iter().flatten() {
4067 if let Ok(meta) = Self::from_bytes(segment.data) {
4068 return Some(meta);
4069 }
4070 }
4071 }
4072 _ => {}
4073 }
4074 }
4075 None
4076 }
4077
4078 fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
4079 #[derive(Deserialize)]
4080 struct RawManifest {
4081 pack_id: String,
4082 version: String,
4083 #[serde(default)]
4084 entry_flows: Vec<String>,
4085 #[serde(default)]
4086 flows: Vec<RawFlow>,
4087 #[serde(default)]
4088 secret_requirements: Vec<greentic_types::SecretRequirement>,
4089 }
4090
4091 #[derive(Deserialize)]
4092 struct RawFlow {
4093 id: String,
4094 }
4095
4096 let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
4097 let mut entry_flows = if manifest.entry_flows.is_empty() {
4098 manifest.flows.iter().map(|f| f.id.clone()).collect()
4099 } else {
4100 manifest.entry_flows.clone()
4101 };
4102 entry_flows.retain(|id| !id.is_empty());
4103 Ok(Self {
4104 pack_id: manifest.pack_id,
4105 version: manifest.version,
4106 entry_flows,
4107 secret_requirements: manifest.secret_requirements,
4108 })
4109 }
4110
4111 pub fn fallback(path: &Path) -> Self {
4112 let pack_id = path
4113 .file_stem()
4114 .map(|s| s.to_string_lossy().into_owned())
4115 .unwrap_or_else(|| "unknown-pack".to_string());
4116 Self {
4117 pack_id,
4118 version: "0.0.0".to_string(),
4119 entry_flows: Vec::new(),
4120 secret_requirements: Vec::new(),
4121 }
4122 }
4123
4124 pub fn from_manifest(manifest: &greentic_types::PackManifest) -> Self {
4125 let entry_flows = manifest
4126 .flows
4127 .iter()
4128 .map(|flow| flow.id.as_str().to_string())
4129 .collect::<Vec<_>>();
4130 Self {
4131 pack_id: manifest.pack_id.as_str().to_string(),
4132 version: manifest.version.to_string(),
4133 entry_flows,
4134 secret_requirements: manifest.secret_requirements.clone(),
4135 }
4136 }
4137}