1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::cache::{ArtifactKey, CacheConfig, CacheManager, CpuPolicy, EngineProfile};
10use crate::component_api::{
11 self, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
12};
13use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
14use crate::provider::{ProviderBinding, ProviderRegistry};
15use crate::provider_core::{
16 schema_core::SchemaCorePre as LegacySchemaCorePre,
17 schema_core_schema::SchemaCorePre as SchemaSchemaCorePre,
18};
19use crate::provider_core_only;
20use crate::runtime_wasmtime::{Component, Engine, InstancePre, Linker, ResourceTable};
21use anyhow::{Context, Result, anyhow, bail};
22use futures::executor::block_on;
23use greentic_distributor_client::dist::{DistClient, DistError, DistOptions};
24use greentic_interfaces_wasmtime::host_helpers::v1::{
25 self as host_v1, HostFns, add_all_v1_to_linker,
26 runner_host_http::RunnerHostHttp,
27 runner_host_kv::RunnerHostKv,
28 secrets_store::{SecretsError, SecretsErrorV1_1, SecretsStoreHost, SecretsStoreHostV1_1},
29 state_store::{
30 OpAck as StateOpAck, StateKey as HostStateKey, StateStoreError as StateError,
31 StateStoreHost, TenantCtx as StateTenantCtx,
32 },
33 telemetry_logger::{
34 OpAck as TelemetryAck, SpanContext as TelemetrySpanContext,
35 TelemetryLoggerError as TelemetryError, TelemetryLoggerHost,
36 TenantCtx as TelemetryTenantCtx,
37 },
38};
39use greentic_interfaces_wasmtime::http_client_client_v1_0::greentic::interfaces_types::types::Impersonation as ImpersonationV1_0;
40use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::http::http_client as http_client_client_alias;
41use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::interfaces_types::types::Impersonation as ImpersonationV1_1;
42use greentic_pack::builder as legacy_pack;
43use greentic_types::flow::FlowHasher;
44use greentic_types::{
45 ArtifactLocationV1, ComponentId, ComponentManifest, ComponentSourceRef, ComponentSourcesV1,
46 EXT_COMPONENT_SOURCES_V1, EnvId, ExtensionRef, Flow, FlowComponentRef, FlowId, FlowKind,
47 FlowMetadata, InputMapping, Node, NodeId, OutputMapping, Routing, StateKey as StoreStateKey,
48 TeamId, TelemetryHints, TenantCtx as TypesTenantCtx, TenantId, UserId, decode_pack_manifest,
49 pack_manifest::ExtensionInline,
50};
51use host_v1::http_client as host_http_client;
52use host_v1::http_client::{
53 HttpClientError, HttpClientErrorV1_1, HttpClientHost, HttpClientHostV1_1,
54 Request as HttpRequest, RequestOptionsV1_1 as HttpRequestOptionsV1_1,
55 RequestV1_1 as HttpRequestV1_1, Response as HttpResponse, ResponseV1_1 as HttpResponseV1_1,
56 TenantCtx as HttpTenantCtx, TenantCtxV1_1 as HttpTenantCtxV1_1,
57};
58use indexmap::IndexMap;
59use once_cell::sync::Lazy;
60use parking_lot::{Mutex, RwLock};
61use reqwest::blocking::Client as BlockingClient;
62use runner_core::normalize_under_root;
63use serde::{Deserialize, Serialize};
64use serde_cbor;
65use serde_json::{self, Value};
66use sha2::Digest;
67use tempfile::TempDir;
68use tokio::fs;
69use wasmparser::{Parser, Payload};
70use wasmtime::{Store, StoreContextMut};
71use zip::ZipArchive;
72
73use crate::runner::engine::{FlowContext, FlowEngine, FlowStatus};
74use crate::runner::flow_adapter::{FlowIR, flow_doc_to_ir, flow_ir_to_flow};
75use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
76#[cfg(feature = "fault-injection")]
77use crate::testing::fault_injection::{FaultContext, FaultPoint, maybe_fail};
78
79use crate::config::HostConfig;
80use crate::fault;
81use crate::secrets::{DynSecretsManager, read_secret_blocking, write_secret_blocking};
82use crate::storage::state::STATE_PREFIX;
83use crate::storage::{DynSessionStore, DynStateStore};
84use crate::verify;
85use crate::wasi::{PreopenSpec, RunnerWasiPolicy};
86use tracing::warn;
87use wasmtime_wasi::p2::add_to_linker_sync as add_wasi_to_linker;
88use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
89
90use greentic_flow::model::FlowDoc;
91
92#[allow(dead_code)]
93pub struct PackRuntime {
94 path: PathBuf,
96 archive_path: Option<PathBuf>,
98 config: Arc<HostConfig>,
99 engine: Engine,
100 metadata: PackMetadata,
101 manifest: Option<greentic_types::PackManifest>,
102 legacy_manifest: Option<Box<legacy_pack::PackManifest>>,
103 component_manifests: HashMap<String, ComponentManifest>,
104 mocks: Option<Arc<MockLayer>>,
105 flows: Option<PackFlows>,
106 components: HashMap<String, PackComponent>,
107 http_client: Arc<BlockingClient>,
108 pre_cache: Mutex<HashMap<String, InstancePre<ComponentState>>>,
109 session_store: Option<DynSessionStore>,
110 state_store: Option<DynStateStore>,
111 wasi_policy: Arc<RunnerWasiPolicy>,
112 assets_tempdir: Option<TempDir>,
113 provider_registry: RwLock<Option<ProviderRegistry>>,
114 secrets: DynSecretsManager,
115 oauth_config: Option<OAuthBrokerConfig>,
116 cache: CacheManager,
117}
118
119struct PackComponent {
120 #[allow(dead_code)]
121 name: String,
122 #[allow(dead_code)]
123 version: String,
124 component: Arc<Component>,
125}
126
127fn run_on_wasi_thread<F, T>(task_name: &'static str, task: F) -> Result<T>
128where
129 F: FnOnce() -> Result<T> + Send + 'static,
130 T: Send + 'static,
131{
132 let builder = std::thread::Builder::new().name(format!("greentic-wasmtime-{task_name}"));
133 let handle = builder
134 .spawn(move || {
135 let pid = std::process::id();
136 let thread_id = std::thread::current().id();
137 let tokio_handle_present = tokio::runtime::Handle::try_current().is_ok();
138 tracing::info!(
139 event = "wasmtime.thread.start",
140 task = task_name,
141 pid,
142 thread_id = ?thread_id,
143 tokio_handle_present,
144 "starting Wasmtime thread"
145 );
146 task()
147 })
148 .context("failed to spawn Wasmtime thread")?;
149 handle
150 .join()
151 .map_err(|err| {
152 let reason = if let Some(msg) = err.downcast_ref::<&str>() {
153 msg.to_string()
154 } else if let Some(msg) = err.downcast_ref::<String>() {
155 msg.clone()
156 } else {
157 "unknown panic".to_string()
158 };
159 anyhow!("Wasmtime thread panicked: {reason}")
160 })
161 .and_then(|res| res)
162}
163
/// Options controlling how a pack's components are located.
///
/// NOTE(review): the code that consumes these fields is not part of this
/// section; the per-field notes below are read off the names and types and
/// should be confirmed against the resolver.
#[derive(Debug, Default, Clone)]
pub struct ComponentResolution {
    /// Optional root directory of a materialized pack (presumably searched for component files).
    pub materialized_root: Option<PathBuf>,
    /// Per-key path overrides (presumably keyed by component identifier).
    pub overrides: HashMap<String, PathBuf>,
    /// Offline flag for the distributor (presumably disables network fetches).
    pub dist_offline: bool,
    /// Optional cache directory for the distributor.
    pub dist_cache_dir: Option<PathBuf>,
    /// Whether a missing hash is tolerated (presumably during artifact verification).
    pub allow_missing_hash: bool,
}
177
178fn build_blocking_client() -> BlockingClient {
179 std::thread::spawn(|| {
180 BlockingClient::builder()
181 .no_proxy()
182 .build()
183 .expect("blocking client")
184 })
185 .join()
186 .expect("client build thread panicked")
187}
188
189fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
190 let (root, candidate) = if path.is_absolute() {
191 let parent = path
192 .parent()
193 .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
194 let root = parent
195 .canonicalize()
196 .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
197 let file = path
198 .file_name()
199 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
200 (root, PathBuf::from(file))
201 } else {
202 let cwd = std::env::current_dir().context("failed to resolve current directory")?;
203 let base = if let Some(parent) = path.parent() {
204 cwd.join(parent)
205 } else {
206 cwd
207 };
208 let root = base
209 .canonicalize()
210 .with_context(|| format!("failed to canonicalize {}", base.display()))?;
211 let file = path
212 .file_name()
213 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
214 (root, PathBuf::from(file))
215 };
216 let safe = normalize_under_root(&root, &candidate)?;
217 Ok((root, safe))
218}
219
220static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
221
222#[derive(Debug, Clone, Serialize, Deserialize)]
223pub struct FlowDescriptor {
224 pub id: String,
225 #[serde(rename = "type")]
226 pub flow_type: String,
227 pub pack_id: String,
228 pub profile: String,
229 pub version: String,
230 #[serde(default)]
231 pub description: Option<String>,
232}
233
234pub struct HostState {
235 #[allow(dead_code)]
236 pack_id: String,
237 config: Arc<HostConfig>,
238 http_client: Arc<BlockingClient>,
239 default_env: String,
240 #[allow(dead_code)]
241 session_store: Option<DynSessionStore>,
242 state_store: Option<DynStateStore>,
243 mocks: Option<Arc<MockLayer>>,
244 secrets: DynSecretsManager,
245 oauth_config: Option<OAuthBrokerConfig>,
246 oauth_host: OAuthBrokerHost,
247 exec_ctx: Option<ComponentExecCtx>,
248 component_ref: Option<String>,
249 provider_core_component: bool,
250}
251
252impl HostState {
253 #[allow(clippy::default_constructed_unit_structs)]
254 #[allow(clippy::too_many_arguments)]
255 pub fn new(
256 pack_id: String,
257 config: Arc<HostConfig>,
258 http_client: Arc<BlockingClient>,
259 mocks: Option<Arc<MockLayer>>,
260 session_store: Option<DynSessionStore>,
261 state_store: Option<DynStateStore>,
262 secrets: DynSecretsManager,
263 oauth_config: Option<OAuthBrokerConfig>,
264 exec_ctx: Option<ComponentExecCtx>,
265 component_ref: Option<String>,
266 provider_core_component: bool,
267 ) -> Result<Self> {
268 let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
269 Ok(Self {
270 pack_id,
271 config,
272 http_client,
273 default_env,
274 session_store,
275 state_store,
276 mocks,
277 secrets,
278 oauth_config,
279 oauth_host: OAuthBrokerHost::default(),
280 exec_ctx,
281 component_ref,
282 provider_core_component,
283 })
284 }
285
286 fn instantiate_component_result(
287 linker: &mut Linker<ComponentState>,
288 store: &mut Store<ComponentState>,
289 component: &Component,
290 ctx: &ComponentExecCtx,
291 operation: &str,
292 input_json: &str,
293 ) -> Result<InvokeResult> {
294 let pre_instance = linker.instantiate_pre(component)?;
295 match component_api::v0_5::ComponentPre::new(pre_instance) {
296 Ok(pre) => {
297 let result = block_on(async {
298 let bindings = pre.instantiate_async(&mut *store).await?;
299 let node = bindings.greentic_component_node();
300 let ctx_v05 = component_api::exec_ctx_v0_5(ctx);
301 let operation_owned = operation.to_string();
302 let input_owned = input_json.to_string();
303 node.call_invoke(&mut *store, &ctx_v05, &operation_owned, &input_owned)
304 })?;
305 Ok(component_api::invoke_result_from_v0_5(result))
306 }
307 Err(err) => {
308 if is_missing_node_export(&err, "0.5.0") {
309 let pre_instance = linker.instantiate_pre(component)?;
310 let pre: component_api::v0_4::ComponentPre<ComponentState> =
311 component_api::v0_4::ComponentPre::new(pre_instance)?;
312 let result = block_on(async {
313 let bindings = pre.instantiate_async(&mut *store).await?;
314 let node = bindings.greentic_component_node();
315 let ctx_v04 = component_api::exec_ctx_v0_4(ctx);
316 let operation_owned = operation.to_string();
317 let input_owned = input_json.to_string();
318 node.call_invoke(&mut *store, &ctx_v04, &operation_owned, &input_owned)
319 })?;
320 Ok(component_api::invoke_result_from_v0_4(result))
321 } else {
322 Err(err)
323 }
324 }
325 }
326 }
327
328 fn convert_invoke_result(result: InvokeResult) -> Result<Value> {
329 match result {
330 InvokeResult::Ok(body) => {
331 if body.is_empty() {
332 return Ok(Value::Null);
333 }
334 serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
335 }
336 InvokeResult::Err(NodeError {
337 code,
338 message,
339 retryable,
340 backoff_ms,
341 details,
342 }) => {
343 let mut obj = serde_json::Map::new();
344 obj.insert("ok".into(), Value::Bool(false));
345 let mut error = serde_json::Map::new();
346 error.insert("code".into(), Value::String(code));
347 error.insert("message".into(), Value::String(message));
348 error.insert("retryable".into(), Value::Bool(retryable));
349 if let Some(backoff) = backoff_ms {
350 error.insert("backoff_ms".into(), Value::Number(backoff.into()));
351 }
352 if let Some(details) = details {
353 error.insert(
354 "details".into(),
355 serde_json::from_str(&details).unwrap_or(Value::String(details)),
356 );
357 }
358 obj.insert("error".into(), Value::Object(error));
359 Ok(Value::Object(obj))
360 }
361 }
362 }
363
364 pub fn get_secret(&self, key: &str) -> Result<String> {
365 if provider_core_only::is_enabled() {
366 bail!(provider_core_only::blocked_message("secrets"))
367 }
368 if !self.config.secrets_policy.is_allowed(key) {
369 bail!("secret {key} is not permitted by bindings policy");
370 }
371 if let Some(mock) = &self.mocks
372 && let Some(value) = mock.secrets_lookup(key)
373 {
374 return Ok(value);
375 }
376 let ctx = self.config.tenant_ctx();
377 let bytes = read_secret_blocking(&self.secrets, &ctx, &self.pack_id, key)
378 .context("failed to read secret from manager")?;
379 let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
380 Ok(value)
381 }
382
383 fn allows_secret_write_in_provider_core_only(&self) -> bool {
384 self.provider_core_component || self.component_ref.is_none()
385 }
386
387 fn tenant_ctx_from_v1(&self, ctx: Option<StateTenantCtx>) -> Result<TypesTenantCtx> {
388 let tenant_raw = ctx
389 .as_ref()
390 .map(|ctx| ctx.tenant.clone())
391 .or_else(|| self.exec_ctx.as_ref().map(|ctx| ctx.tenant.tenant.clone()))
392 .unwrap_or_else(|| self.config.tenant.clone());
393 let env_raw = ctx
394 .as_ref()
395 .map(|ctx| ctx.env.clone())
396 .unwrap_or_else(|| self.default_env.clone());
397 let tenant_id = TenantId::from_str(&tenant_raw)
398 .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
399 let env_id = EnvId::from_str(&env_raw)
400 .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
401 let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
402 if let Some(exec_ctx) = self.exec_ctx.as_ref() {
403 if let Some(team) = exec_ctx.tenant.team.as_ref() {
404 let team_id =
405 TeamId::from_str(team).with_context(|| format!("invalid team id `{team}`"))?;
406 tenant_ctx = tenant_ctx.with_team(Some(team_id));
407 }
408 if let Some(user) = exec_ctx.tenant.user.as_ref() {
409 let user_id =
410 UserId::from_str(user).with_context(|| format!("invalid user id `{user}`"))?;
411 tenant_ctx = tenant_ctx.with_user(Some(user_id));
412 }
413 tenant_ctx = tenant_ctx.with_flow(exec_ctx.flow_id.clone());
414 if let Some(node) = exec_ctx.node_id.as_ref() {
415 tenant_ctx = tenant_ctx.with_node(node.clone());
416 }
417 if let Some(session) = exec_ctx.tenant.correlation_id.as_ref() {
418 tenant_ctx = tenant_ctx.with_session(session.clone());
419 }
420 tenant_ctx.trace_id = exec_ctx.tenant.trace_id.clone();
421 }
422
423 if let Some(ctx) = ctx {
424 if let Some(team) = ctx.team.or(ctx.team_id) {
425 let team_id =
426 TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
427 tenant_ctx = tenant_ctx.with_team(Some(team_id));
428 }
429 if let Some(user) = ctx.user.or(ctx.user_id) {
430 let user_id =
431 UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
432 tenant_ctx = tenant_ctx.with_user(Some(user_id));
433 }
434 if let Some(flow) = ctx.flow_id {
435 tenant_ctx = tenant_ctx.with_flow(flow);
436 }
437 if let Some(node) = ctx.node_id {
438 tenant_ctx = tenant_ctx.with_node(node);
439 }
440 if let Some(provider) = ctx.provider_id {
441 tenant_ctx = tenant_ctx.with_provider(provider);
442 }
443 if let Some(session) = ctx.session_id {
444 tenant_ctx = tenant_ctx.with_session(session);
445 }
446 tenant_ctx.trace_id = ctx.trace_id;
447 }
448 Ok(tenant_ctx)
449 }
450
451 fn send_http_request(
452 &mut self,
453 req: HttpRequest,
454 opts: Option<HttpRequestOptionsV1_1>,
455 _ctx: Option<HttpTenantCtx>,
456 ) -> Result<HttpResponse, HttpClientError> {
457 if !self.config.http_enabled {
458 return Err(HttpClientError {
459 code: "denied".into(),
460 message: "http client disabled by policy".into(),
461 });
462 }
463
464 let mut mock_state = None;
465 let raw_body = req.body.clone();
466 if let Some(mock) = &self.mocks
467 && let Ok(meta) = HttpMockRequest::new(&req.method, &req.url, raw_body.as_deref())
468 {
469 match mock.http_begin(&meta) {
470 HttpDecision::Mock(response) => {
471 let headers = response
472 .headers
473 .iter()
474 .map(|(k, v)| (k.clone(), v.clone()))
475 .collect();
476 return Ok(HttpResponse {
477 status: response.status,
478 headers,
479 body: response.body.clone().map(|b| b.into_bytes()),
480 });
481 }
482 HttpDecision::Deny(reason) => {
483 return Err(HttpClientError {
484 code: "denied".into(),
485 message: reason,
486 });
487 }
488 HttpDecision::Passthrough { record } => {
489 mock_state = Some((meta, record));
490 }
491 }
492 }
493
494 let method = req.method.parse().unwrap_or(reqwest::Method::GET);
495 let mut builder = self.http_client.request(method, &req.url);
496 for (key, value) in req.headers {
497 if let Ok(header) = reqwest::header::HeaderName::from_bytes(key.as_bytes())
498 && let Ok(header_value) = reqwest::header::HeaderValue::from_str(&value)
499 {
500 builder = builder.header(header, header_value);
501 }
502 }
503
504 if let Some(body) = raw_body.clone() {
505 builder = builder.body(body);
506 }
507
508 if let Some(opts) = opts {
509 if let Some(timeout_ms) = opts.timeout_ms {
510 builder = builder.timeout(Duration::from_millis(timeout_ms as u64));
511 }
512 if opts.allow_insecure == Some(true) {
513 warn!(url = %req.url, "allow-insecure not supported; using default TLS validation");
514 }
515 if let Some(follow_redirects) = opts.follow_redirects
516 && !follow_redirects
517 {
518 warn!(url = %req.url, "follow-redirects=false not supported; using default client behaviour");
519 }
520 }
521
522 let response = match builder.send() {
523 Ok(resp) => resp,
524 Err(err) => {
525 warn!(url = %req.url, error = %err, "http client request failed");
526 return Err(HttpClientError {
527 code: "unavailable".into(),
528 message: err.to_string(),
529 });
530 }
531 };
532
533 let status = response.status().as_u16();
534 let headers_vec = response
535 .headers()
536 .iter()
537 .map(|(k, v)| {
538 (
539 k.as_str().to_string(),
540 v.to_str().unwrap_or_default().to_string(),
541 )
542 })
543 .collect::<Vec<_>>();
544 let body_bytes = response.bytes().ok().map(|b| b.to_vec());
545
546 if let Some((meta, true)) = mock_state.take()
547 && let Some(mock) = &self.mocks
548 {
549 let recorded = HttpMockResponse::new(
550 status,
551 headers_vec.clone().into_iter().collect(),
552 body_bytes
553 .as_ref()
554 .map(|b| String::from_utf8_lossy(b).into_owned()),
555 );
556 mock.http_record(&meta, &recorded);
557 }
558
559 Ok(HttpResponse {
560 status,
561 headers: headers_vec,
562 body: body_bytes,
563 })
564 }
565}
566
/// Canonicalizes a secret key supplied by a component.
///
/// Surrounding whitespace is trimmed, ASCII letters are lower-cased, and every
/// character other than `a`-`z`, `0`-`9`, or `_` is replaced by a single `_`.
fn canonicalize_wasm_secret_key(raw: &str) -> String {
    let trimmed = raw.trim();
    let mut canonical = String::with_capacity(trimmed.len());
    for original in trimmed.chars() {
        let lowered = original.to_ascii_lowercase();
        let keep = lowered.is_ascii_lowercase() || lowered.is_ascii_digit() || lowered == '_';
        canonical.push(if keep { lowered } else { '_' });
    }
    canonical
}
579
#[cfg(test)]
mod canonicalize_tests {
    use super::canonicalize_wasm_secret_key;

    /// Upper snake-case input is lower-cased and underscores are kept.
    #[test]
    fn upper_snake_to_lower_snake() {
        let canonical = canonicalize_wasm_secret_key("TELEGRAM_BOT_TOKEN");
        assert_eq!(canonical, "telegram_bot_token");
    }

    /// Surrounding whitespace is dropped and dashes become underscores.
    #[test]
    fn trim_and_replace_non_alphanumeric() {
        let canonical = canonicalize_wasm_secret_key(" webex-bot-token ");
        assert_eq!(canonical, "webex_bot_token");
    }

    /// Consecutive underscores are preserved while the case is folded.
    #[test]
    fn preserve_existing_lower_snake_with_extra_underscores() {
        let canonical = canonicalize_wasm_secret_key("MiXeD__Case");
        assert_eq!(canonical, "mixed__case");
    }
}
605
606impl SecretsStoreHost for HostState {
607 fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsError> {
608 if provider_core_only::is_enabled() {
609 warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
610 return Err(SecretsError::Denied);
611 }
612 if !self.config.secrets_policy.is_allowed(&key) {
613 return Err(SecretsError::Denied);
614 }
615 if let Some(mock) = &self.mocks
616 && let Some(value) = mock.secrets_lookup(&key)
617 {
618 return Ok(Some(value.into_bytes()));
619 }
620 let ctx = self.config.tenant_ctx();
621 let canonical_key = canonicalize_wasm_secret_key(&key);
622 match read_secret_blocking(&self.secrets, &ctx, &self.pack_id, &canonical_key) {
623 Ok(bytes) => Ok(Some(bytes)),
624 Err(err) => {
625 warn!(secret = %key, canonical = %canonical_key, error = %err, "secret lookup failed");
626 Err(SecretsError::NotFound)
627 }
628 }
629 }
630}
631
632impl SecretsStoreHostV1_1 for HostState {
633 fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsErrorV1_1> {
634 if provider_core_only::is_enabled() {
635 warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
636 return Err(SecretsErrorV1_1::Denied);
637 }
638 if !self.config.secrets_policy.is_allowed(&key) {
639 return Err(SecretsErrorV1_1::Denied);
640 }
641 if let Some(mock) = &self.mocks
642 && let Some(value) = mock.secrets_lookup(&key)
643 {
644 return Ok(Some(value.into_bytes()));
645 }
646 let ctx = self.config.tenant_ctx();
647 let canonical_key = canonicalize_wasm_secret_key(&key);
648 match read_secret_blocking(&self.secrets, &ctx, &self.pack_id, &canonical_key) {
649 Ok(bytes) => Ok(Some(bytes)),
650 Err(err) => {
651 warn!(secret = %key, canonical = %canonical_key, error = %err, "secret lookup failed");
652 Err(SecretsErrorV1_1::NotFound)
653 }
654 }
655 }
656
657 fn put(&mut self, key: String, value: Vec<u8>) {
658 if key.trim().is_empty() {
659 warn!(secret = %key, "secret write blocked: empty key");
660 panic!("secret write denied for key {key}: invalid key");
661 }
662 if provider_core_only::is_enabled() && !self.allows_secret_write_in_provider_core_only() {
663 warn!(
664 secret = %key,
665 component = self.component_ref.as_deref().unwrap_or("<pack>"),
666 "provider-core only mode enabled; blocking secrets store write"
667 );
668 panic!("secret write denied for key {key}: provider-core-only mode");
669 }
670 if !self.config.secrets_policy.is_allowed(&key) {
671 warn!(secret = %key, "secret write denied by bindings policy");
672 panic!("secret write denied for key {key}: policy");
673 }
674 let ctx = self.config.tenant_ctx();
675 let canonical_key = canonicalize_wasm_secret_key(&key);
676 if let Err(err) =
677 write_secret_blocking(&self.secrets, &ctx, &self.pack_id, &canonical_key, &value)
678 {
679 warn!(secret = %key, canonical = %canonical_key, error = %err, "secret write failed");
680 panic!("secret write failed for key {key}");
681 }
682 }
683}
684
685impl HttpClientHost for HostState {
686 fn send(
687 &mut self,
688 req: HttpRequest,
689 ctx: Option<HttpTenantCtx>,
690 ) -> Result<HttpResponse, HttpClientError> {
691 self.send_http_request(req, None, ctx)
692 }
693}
694
695impl HttpClientHostV1_1 for HostState {
696 fn send(
697 &mut self,
698 req: HttpRequestV1_1,
699 opts: Option<HttpRequestOptionsV1_1>,
700 ctx: Option<HttpTenantCtxV1_1>,
701 ) -> Result<HttpResponseV1_1, HttpClientErrorV1_1> {
702 let legacy_req = HttpRequest {
703 method: req.method,
704 url: req.url,
705 headers: req.headers,
706 body: req.body,
707 };
708 let legacy_ctx = ctx.map(|ctx| HttpTenantCtx {
709 env: ctx.env,
710 tenant: ctx.tenant,
711 tenant_id: ctx.tenant_id,
712 team: ctx.team,
713 team_id: ctx.team_id,
714 user: ctx.user,
715 user_id: ctx.user_id,
716 trace_id: ctx.trace_id,
717 correlation_id: ctx.correlation_id,
718 attributes: ctx.attributes,
719 session_id: ctx.session_id,
720 flow_id: ctx.flow_id,
721 node_id: ctx.node_id,
722 provider_id: ctx.provider_id,
723 deadline_ms: ctx.deadline_ms,
724 attempt: ctx.attempt,
725 idempotency_key: ctx.idempotency_key,
726 impersonation: ctx
727 .impersonation
728 .map(|ImpersonationV1_1 { actor_id, reason }| ImpersonationV1_0 {
729 actor_id,
730 reason,
731 }),
732 });
733
734 self.send_http_request(legacy_req, opts, legacy_ctx)
735 .map(|resp| HttpResponseV1_1 {
736 status: resp.status,
737 headers: resp.headers,
738 body: resp.body,
739 })
740 .map_err(|err| HttpClientErrorV1_1 {
741 code: err.code,
742 message: err.message,
743 })
744 }
745}
746
747impl StateStoreHost for HostState {
748 fn read(
749 &mut self,
750 key: HostStateKey,
751 ctx: Option<StateTenantCtx>,
752 ) -> Result<Vec<u8>, StateError> {
753 let store = match self.state_store.as_ref() {
754 Some(store) => store.clone(),
755 None => {
756 return Err(StateError {
757 code: "unavailable".into(),
758 message: "state store not configured".into(),
759 });
760 }
761 };
762 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
763 Ok(ctx) => ctx,
764 Err(err) => {
765 return Err(StateError {
766 code: "invalid-ctx".into(),
767 message: err.to_string(),
768 });
769 }
770 };
771 #[cfg(feature = "fault-injection")]
772 {
773 let exec_ctx = self.exec_ctx.as_ref();
774 let flow_id = exec_ctx
775 .map(|ctx| ctx.flow_id.as_str())
776 .unwrap_or("unknown");
777 let node_id = exec_ctx.and_then(|ctx| ctx.node_id.as_deref());
778 let attempt = exec_ctx.map(|ctx| ctx.tenant.attempt).unwrap_or(1);
779 let fault_ctx = FaultContext {
780 pack_id: self.pack_id.as_str(),
781 flow_id,
782 node_id,
783 attempt,
784 };
785 if let Err(err) = maybe_fail(FaultPoint::StateRead, fault_ctx) {
786 return Err(StateError {
787 code: "internal".into(),
788 message: err.to_string(),
789 });
790 }
791 }
792 let key = StoreStateKey::from(key);
793 match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
794 Ok(Some(value)) => Ok(serde_json::to_vec(&value).unwrap_or_else(|_| Vec::new())),
795 Ok(None) => Err(StateError {
796 code: "not_found".into(),
797 message: "state key not found".into(),
798 }),
799 Err(err) => Err(StateError {
800 code: "internal".into(),
801 message: err.to_string(),
802 }),
803 }
804 }
805
806 fn write(
807 &mut self,
808 key: HostStateKey,
809 bytes: Vec<u8>,
810 ctx: Option<StateTenantCtx>,
811 ) -> Result<StateOpAck, StateError> {
812 let store = match self.state_store.as_ref() {
813 Some(store) => store.clone(),
814 None => {
815 return Err(StateError {
816 code: "unavailable".into(),
817 message: "state store not configured".into(),
818 });
819 }
820 };
821 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
822 Ok(ctx) => ctx,
823 Err(err) => {
824 return Err(StateError {
825 code: "invalid-ctx".into(),
826 message: err.to_string(),
827 });
828 }
829 };
830 #[cfg(feature = "fault-injection")]
831 {
832 let exec_ctx = self.exec_ctx.as_ref();
833 let flow_id = exec_ctx
834 .map(|ctx| ctx.flow_id.as_str())
835 .unwrap_or("unknown");
836 let node_id = exec_ctx.and_then(|ctx| ctx.node_id.as_deref());
837 let attempt = exec_ctx.map(|ctx| ctx.tenant.attempt).unwrap_or(1);
838 let fault_ctx = FaultContext {
839 pack_id: self.pack_id.as_str(),
840 flow_id,
841 node_id,
842 attempt,
843 };
844 if let Err(err) = maybe_fail(FaultPoint::StateWrite, fault_ctx) {
845 return Err(StateError {
846 code: "internal".into(),
847 message: err.to_string(),
848 });
849 }
850 }
851 let key = StoreStateKey::from(key);
852 let value = serde_json::from_slice(&bytes)
853 .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()));
854 match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
855 Ok(()) => Ok(StateOpAck::Ok),
856 Err(err) => Err(StateError {
857 code: "internal".into(),
858 message: err.to_string(),
859 }),
860 }
861 }
862
863 fn delete(
864 &mut self,
865 key: HostStateKey,
866 ctx: Option<StateTenantCtx>,
867 ) -> Result<StateOpAck, StateError> {
868 let store = match self.state_store.as_ref() {
869 Some(store) => store.clone(),
870 None => {
871 return Err(StateError {
872 code: "unavailable".into(),
873 message: "state store not configured".into(),
874 });
875 }
876 };
877 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
878 Ok(ctx) => ctx,
879 Err(err) => {
880 return Err(StateError {
881 code: "invalid-ctx".into(),
882 message: err.to_string(),
883 });
884 }
885 };
886 let key = StoreStateKey::from(key);
887 match store.del(&tenant_ctx, STATE_PREFIX, &key) {
888 Ok(_) => Ok(StateOpAck::Ok),
889 Err(err) => Err(StateError {
890 code: "internal".into(),
891 message: err.to_string(),
892 }),
893 }
894 }
895}
896
897impl TelemetryLoggerHost for HostState {
898 fn log(
899 &mut self,
900 span: TelemetrySpanContext,
901 fields: Vec<(String, String)>,
902 _ctx: Option<TelemetryTenantCtx>,
903 ) -> Result<TelemetryAck, TelemetryError> {
904 if let Some(mock) = &self.mocks
905 && mock.telemetry_drain(&[("span_json", span.flow_id.as_str())])
906 {
907 return Ok(TelemetryAck::Ok);
908 }
909 let mut map = serde_json::Map::new();
910 for (k, v) in fields {
911 map.insert(k, Value::String(v));
912 }
913 tracing::info!(
914 tenant = %span.tenant,
915 flow_id = %span.flow_id,
916 node = ?span.node_id,
917 provider = %span.provider,
918 fields = %serde_json::Value::Object(map.clone()),
919 "telemetry log from pack"
920 );
921 Ok(TelemetryAck::Ok)
922 }
923}
924
925impl RunnerHostHttp for HostState {
926 fn request(
927 &mut self,
928 method: String,
929 url: String,
930 headers: Vec<String>,
931 body: Option<Vec<u8>>,
932 ) -> Result<Vec<u8>, String> {
933 let req = HttpRequest {
934 method,
935 url,
936 headers: headers
937 .chunks(2)
938 .filter_map(|chunk| {
939 if chunk.len() == 2 {
940 Some((chunk[0].clone(), chunk[1].clone()))
941 } else {
942 None
943 }
944 })
945 .collect(),
946 body,
947 };
948 match HttpClientHost::send(self, req, None) {
949 Ok(resp) => Ok(resp.body.unwrap_or_default()),
950 Err(err) => Err(err.message),
951 }
952 }
953}
954
955impl RunnerHostKv for HostState {
956 fn get(&mut self, _ns: String, _key: String) -> Option<String> {
957 None
958 }
959
960 fn put(&mut self, _ns: String, _key: String, _val: String) {}
961}
962
963enum ManifestLoad {
964 New {
965 manifest: Box<greentic_types::PackManifest>,
966 flows: PackFlows,
967 },
968 Legacy {
969 manifest: Box<legacy_pack::PackManifest>,
970 flows: PackFlows,
971 },
972}
973
974fn load_manifest_and_flows(path: &Path) -> Result<ManifestLoad> {
975 let mut archive = ZipArchive::new(File::open(path)?)
976 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
977 let bytes = read_entry(&mut archive, "manifest.cbor")
978 .with_context(|| format!("missing manifest.cbor in {}", path.display()))?;
979 match decode_pack_manifest(&bytes) {
980 Ok(manifest) => {
981 let cache = PackFlows::from_manifest(manifest.clone());
982 Ok(ManifestLoad::New {
983 manifest: Box::new(manifest),
984 flows: cache,
985 })
986 }
987 Err(err) => {
988 tracing::debug!(
989 error = %err,
990 pack = %path.display(),
991 "decode_pack_manifest failed for archive; falling back to legacy manifest"
992 );
993 let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
994 .context("failed to decode legacy pack manifest from manifest.cbor")?;
995 let flows = load_legacy_flows_from_archive(&mut archive, path, &legacy)?;
996 Ok(ManifestLoad::Legacy {
997 manifest: Box::new(legacy),
998 flows,
999 })
1000 }
1001 }
1002}
1003
1004fn load_manifest_and_flows_from_dir(root: &Path) -> Result<ManifestLoad> {
1005 let manifest_path = root.join("manifest.cbor");
1006 let bytes = std::fs::read(&manifest_path)
1007 .with_context(|| format!("missing manifest.cbor in {}", root.display()))?;
1008 match decode_pack_manifest(&bytes) {
1009 Ok(manifest) => {
1010 let cache = PackFlows::from_manifest(manifest.clone());
1011 Ok(ManifestLoad::New {
1012 manifest: Box::new(manifest),
1013 flows: cache,
1014 })
1015 }
1016 Err(err) => {
1017 tracing::debug!(
1018 error = %err,
1019 pack = %root.display(),
1020 "decode_pack_manifest failed for materialized pack; trying legacy manifest"
1021 );
1022 let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
1023 .context("failed to decode legacy pack manifest from manifest.cbor")?;
1024 let flows = load_legacy_flows_from_dir(root, &legacy)?;
1025 Ok(ManifestLoad::Legacy {
1026 manifest: Box::new(legacy),
1027 flows,
1028 })
1029 }
1030 }
1031}
1032
1033fn load_legacy_flows_from_dir(
1034 root: &Path,
1035 manifest: &legacy_pack::PackManifest,
1036) -> Result<PackFlows> {
1037 build_legacy_flows(manifest, |rel_path| {
1038 let path = root.join(rel_path);
1039 std::fs::read(&path).with_context(|| format!("missing flow json {}", path.display()))
1040 })
1041}
1042
1043fn load_legacy_flows_from_archive(
1044 archive: &mut ZipArchive<File>,
1045 pack_path: &Path,
1046 manifest: &legacy_pack::PackManifest,
1047) -> Result<PackFlows> {
1048 build_legacy_flows(manifest, |rel_path| {
1049 read_entry(archive, rel_path)
1050 .with_context(|| format!("missing flow json {} in {}", rel_path, pack_path.display()))
1051 })
1052}
1053
1054fn build_legacy_flows(
1055 manifest: &legacy_pack::PackManifest,
1056 mut read_json: impl FnMut(&str) -> Result<Vec<u8>>,
1057) -> Result<PackFlows> {
1058 let mut flows = HashMap::new();
1059 let mut descriptors = Vec::new();
1060
1061 for entry in &manifest.flows {
1062 let bytes = read_json(&entry.file_json)
1063 .with_context(|| format!("missing flow json {}", entry.file_json))?;
1064 let doc = parse_flow_doc_with_legacy_aliases(&bytes, &entry.file_json)?;
1065 let normalized = normalize_flow_doc(doc);
1066 let flow_ir = flow_doc_to_ir(normalized)?;
1067 let flow = flow_ir_to_flow(flow_ir)?;
1068
1069 descriptors.push(FlowDescriptor {
1070 id: entry.id.clone(),
1071 flow_type: entry.kind.clone(),
1072 pack_id: manifest.meta.pack_id.clone(),
1073 profile: manifest.meta.pack_id.clone(),
1074 version: manifest.meta.version.to_string(),
1075 description: None,
1076 });
1077 flows.insert(entry.id.clone(), flow);
1078 }
1079
1080 let mut entry_flows = manifest.meta.entry_flows.clone();
1081 if entry_flows.is_empty() {
1082 entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
1083 }
1084 let metadata = PackMetadata {
1085 pack_id: manifest.meta.pack_id.clone(),
1086 version: manifest.meta.version.to_string(),
1087 entry_flows,
1088 secret_requirements: Vec::new(),
1089 };
1090
1091 Ok(PackFlows {
1092 descriptors,
1093 flows,
1094 metadata,
1095 })
1096}
1097
1098fn parse_flow_doc_with_legacy_aliases(bytes: &[u8], path: &str) -> Result<FlowDoc> {
1099 let mut value: Value = serde_json::from_slice(bytes)
1100 .with_context(|| format!("failed to decode flow doc {}", path))?;
1101 if let Some(map) = value.as_object_mut()
1102 && !map.contains_key("type")
1103 && let Some(flow_type) = map.remove("flow_type")
1104 {
1105 map.insert("type".to_string(), flow_type);
1106 }
1107 serde_json::from_value(value).with_context(|| format!("failed to decode flow doc {}", path))
1108}
1109
/// Store data for a component instance: the host state plus the WASI context
/// and resource table exposed through the `WasiView` implementation.
pub struct ComponentState {
    /// Host state; `host_mut` hands out a mutable reference to it.
    pub host: HostState,
    /// WASI context built from the WASI policy in `ComponentState::new`.
    wasi_ctx: WasiCtx,
    /// Resource table returned alongside `wasi_ctx` by the `WasiView` impl.
    resource_table: ResourceTable,
}
1115
1116impl ComponentState {
1117 pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
1118 let wasi_ctx = policy
1119 .instantiate()
1120 .context("failed to build WASI context")?;
1121 Ok(Self {
1122 host,
1123 wasi_ctx,
1124 resource_table: ResourceTable::new(),
1125 })
1126 }
1127
1128 fn host_mut(&mut self) -> &mut HostState {
1129 &mut self.host
1130 }
1131
1132 fn should_cancel_host(&mut self) -> bool {
1133 false
1134 }
1135
1136 fn yield_now_host(&mut self) {
1137 }
1139}
1140
1141impl component_api::v0_4::greentic::component::control::Host for ComponentState {
1142 fn should_cancel(&mut self) -> bool {
1143 self.should_cancel_host()
1144 }
1145
1146 fn yield_now(&mut self) {
1147 self.yield_now_host();
1148 }
1149}
1150
1151impl component_api::v0_5::greentic::component::control::Host for ComponentState {
1152 fn should_cancel(&mut self) -> bool {
1153 self.should_cancel_host()
1154 }
1155
1156 fn yield_now(&mut self) {
1157 self.yield_now_host();
1158 }
1159}
1160
1161fn add_component_control_instance(
1162 linker: &mut Linker<ComponentState>,
1163 name: &str,
1164) -> wasmtime::Result<()> {
1165 let mut inst = linker.instance(name)?;
1166 inst.func_wrap(
1167 "should-cancel",
1168 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
1169 let host = caller.data_mut();
1170 Ok((host.should_cancel_host(),))
1171 },
1172 )?;
1173 inst.func_wrap(
1174 "yield-now",
1175 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
1176 let host = caller.data_mut();
1177 host.yield_now_host();
1178 Ok(())
1179 },
1180 )?;
1181 Ok(())
1182}
1183
1184fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
1185 add_component_control_instance(linker, "greentic:component/control@0.5.0")?;
1186 add_component_control_instance(linker, "greentic:component/control@0.4.0")?;
1187 Ok(())
1188}
1189
/// Registers the WASI imports and the greentic v1 host interfaces on `linker`.
///
/// Every enabled interface resolves its host implementation through
/// `ComponentState::host_mut`. `oauth_broker` and the non-versioned
/// `secrets_store` entry are left unregistered (`None`), and `state_store` is
/// only registered when `allow_state_store` is true. The
/// `greentic:http/client` aliases are added last.
///
/// # Errors
/// Propagates any failure from the registration calls.
pub fn register_all(linker: &mut Linker<ComponentState>, allow_state_store: bool) -> Result<()> {
    add_wasi_to_linker(linker)?;
    add_all_v1_to_linker(
        linker,
        HostFns {
            http_client_v1_1: Some(|state: &mut ComponentState| state.host_mut()),
            http_client: Some(|state: &mut ComponentState| state.host_mut()),
            oauth_broker: None,
            runner_host_http: Some(|state: &mut ComponentState| state.host_mut()),
            runner_host_kv: Some(|state: &mut ComponentState| state.host_mut()),
            telemetry_logger: Some(|state: &mut ComponentState| state.host_mut()),
            // Gated per component by the caller.
            state_store: allow_state_store.then_some(|state: &mut ComponentState| state.host_mut()),
            secrets_store_v1_1: Some(|state: &mut ComponentState| state.host_mut()),
            secrets_store: None,
        },
    )?;
    add_http_client_client_world_aliases(linker)?;
    Ok(())
}
1209
1210fn add_http_client_client_world_aliases(linker: &mut Linker<ComponentState>) -> Result<()> {
1211 let mut inst_v1_1 = linker.instance("greentic:http/client@1.1.0")?;
1212 inst_v1_1.func_wrap(
1213 "send",
1214 move |mut caller: StoreContextMut<'_, ComponentState>,
1215 (req, opts, ctx): (
1216 http_client_client_alias::Request,
1217 Option<http_client_client_alias::RequestOptions>,
1218 Option<http_client_client_alias::TenantCtx>,
1219 )| {
1220 let host = caller.data_mut().host_mut();
1221 let result = HttpClientHostV1_1::send(
1222 host,
1223 alias_request_to_host(req),
1224 opts.map(alias_request_options_to_host),
1225 ctx.map(alias_tenant_ctx_to_host),
1226 );
1227 Ok((match result {
1228 Ok(resp) => Ok(alias_response_from_host(resp)),
1229 Err(err) => Err(alias_error_from_host(err)),
1230 },))
1231 },
1232 )?;
1233 let mut inst_v1_0 = linker.instance("greentic:http/client@1.0.0")?;
1234 inst_v1_0.func_wrap(
1235 "send",
1236 move |mut caller: StoreContextMut<'_, ComponentState>,
1237 (req, ctx): (
1238 host_http_client::Request,
1239 Option<host_http_client::TenantCtx>,
1240 )| {
1241 let host = caller.data_mut().host_mut();
1242 let result = HttpClientHost::send(host, req, ctx);
1243 Ok((result,))
1244 },
1245 )?;
1246 Ok(())
1247}
1248
1249fn alias_request_to_host(req: http_client_client_alias::Request) -> host_http_client::RequestV1_1 {
1250 host_http_client::RequestV1_1 {
1251 method: req.method,
1252 url: req.url,
1253 headers: req.headers,
1254 body: req.body,
1255 }
1256}
1257
1258fn alias_request_options_to_host(
1259 opts: http_client_client_alias::RequestOptions,
1260) -> host_http_client::RequestOptionsV1_1 {
1261 host_http_client::RequestOptionsV1_1 {
1262 timeout_ms: opts.timeout_ms,
1263 allow_insecure: opts.allow_insecure,
1264 follow_redirects: opts.follow_redirects,
1265 }
1266}
1267
1268fn alias_tenant_ctx_to_host(
1269 ctx: http_client_client_alias::TenantCtx,
1270) -> host_http_client::TenantCtxV1_1 {
1271 host_http_client::TenantCtxV1_1 {
1272 env: ctx.env,
1273 tenant: ctx.tenant,
1274 tenant_id: ctx.tenant_id,
1275 team: ctx.team,
1276 team_id: ctx.team_id,
1277 user: ctx.user,
1278 user_id: ctx.user_id,
1279 trace_id: ctx.trace_id,
1280 correlation_id: ctx.correlation_id,
1281 attributes: ctx.attributes,
1282 session_id: ctx.session_id,
1283 flow_id: ctx.flow_id,
1284 node_id: ctx.node_id,
1285 provider_id: ctx.provider_id,
1286 deadline_ms: ctx.deadline_ms,
1287 attempt: ctx.attempt,
1288 idempotency_key: ctx.idempotency_key,
1289 impersonation: ctx.impersonation.map(|imp| ImpersonationV1_1 {
1290 actor_id: imp.actor_id,
1291 reason: imp.reason,
1292 }),
1293 }
1294}
1295
1296fn alias_response_from_host(
1297 resp: host_http_client::ResponseV1_1,
1298) -> http_client_client_alias::Response {
1299 http_client_client_alias::Response {
1300 status: resp.status,
1301 headers: resp.headers,
1302 body: resp.body,
1303 }
1304}
1305
1306fn alias_error_from_host(
1307 err: host_http_client::HttpClientErrorV1_1,
1308) -> http_client_client_alias::HostError {
1309 http_client_client_alias::HostError {
1310 code: err.code,
1311 message: err.message,
1312 }
1313}
1314
/// Exposes the OAuth-related parts of the host state to the OAuth broker.
impl OAuthHostContext for ComponentState {
    /// Tenant taken from the host configuration.
    fn tenant_id(&self) -> &str {
        &self.host.config.tenant
    }

    /// The host state's default environment.
    fn env(&self) -> &str {
        &self.host.default_env
    }

    /// Mutable access to the OAuth broker host stored in the host state.
    fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
        &mut self.host.oauth_host
    }

    /// The OAuth broker configuration, if the host state has one.
    fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
        self.host.oauth_config.as_ref()
    }
}
1332
/// Gives the WASI implementation access to this state's WASI context and
/// resource table.
impl WasiView for ComponentState {
    fn ctx(&mut self) -> WasiCtxView<'_> {
        WasiCtxView {
            ctx: &mut self.wasi_ctx,
            table: &mut self.resource_table,
        }
    }
}
1341
// NOTE(review): these impls assert that `ComponentState` may be sent to and
// shared between threads, but no SAFETY justification is recorded here.
// TODO confirm that every field (`HostState`, `WasiCtx`, `ResourceTable`)
// upholds that contract and write the invariant down as a `// SAFETY:` note.
#[allow(unsafe_code)]
unsafe impl Send for ComponentState {}
#[allow(unsafe_code)]
unsafe impl Sync for ComponentState {}
1346
1347impl PackRuntime {
1348 fn allows_state_store(&self, component_ref: &str) -> bool {
1349 if self.state_store.is_none() {
1350 return false;
1351 }
1352 if !self.config.state_store_policy.allow {
1353 return false;
1354 }
1355 let Some(manifest) = self.component_manifests.get(component_ref) else {
1356 return false;
1357 };
1358 manifest
1359 .capabilities
1360 .host
1361 .state
1362 .as_ref()
1363 .map(|caps| caps.read || caps.write)
1364 .unwrap_or(false)
1365 }
1366
    /// Returns `true` when a component was loaded under the key `component_ref`.
    pub fn contains_component(&self, component_ref: &str) -> bool {
        self.components.contains_key(component_ref)
    }
1370
    /// Loads a pack runtime from `path`.
    ///
    /// `path` may be a directory, a single `.wasm` file, or anything else,
    /// which is then treated as a pack archive. The steps are:
    ///
    /// 1. Normalize the path and work out the archive hint: `archive_source`
    ///    when given, otherwise the path itself unless it is a directory or a
    ///    `.wasm` file.
    /// 2. When `verify_archive` is set and the hint is an existing regular
    ///    file, verify it.
    /// 3. Load the manifest and flows, first from the materialized root (a
    ///    failure there is only logged), then from the archive hint (a
    ///    failure there is returned).
    /// 4. Look for a pack lock; use it for component sources when present,
    ///    otherwise use the manifest's component sources extension.
    /// 5. Compile components: a `.wasm` path becomes the only component;
    ///    otherwise the component specs are tried against overrides,
    ///    component sources, the materialized directory and the archive, in
    ///    that order.
    /// 6. Extend the WASI policy with a read-only `/assets` preopen when a
    ///    pack assets directory was located.
    ///
    /// # Errors
    /// Fails on path normalization, archive verification, archive manifest
    /// loading, pack lock or component source parsing, component compilation,
    /// or when any required component is still missing after all sources were
    /// tried.
    #[allow(clippy::too_many_arguments)]
    pub async fn load(
        path: impl AsRef<Path>,
        config: Arc<HostConfig>,
        mocks: Option<Arc<MockLayer>>,
        archive_source: Option<&Path>,
        session_store: Option<DynSessionStore>,
        state_store: Option<DynStateStore>,
        wasi_policy: Arc<RunnerWasiPolicy>,
        secrets: DynSecretsManager,
        oauth_config: Option<OAuthBrokerConfig>,
        verify_archive: bool,
        component_resolution: ComponentResolution,
    ) -> Result<Self> {
        let path = path.as_ref();
        let (_pack_root, safe_path) = normalize_pack_path(path)?;
        // A metadata failure is treated the same as "not a directory".
        let path_meta = std::fs::metadata(&safe_path).ok();
        let is_dir = path_meta
            .as_ref()
            .map(|meta| meta.is_dir())
            .unwrap_or(false);
        // A non-directory path with a `.wasm` extension is a bare component.
        let is_component = !is_dir
            && safe_path
                .extension()
                .and_then(|ext| ext.to_str())
                .map(|ext| ext.eq_ignore_ascii_case("wasm"))
                .unwrap_or(false);
        // An explicit archive source wins; otherwise the path itself is the
        // archive unless it is a directory or a bare component.
        let archive_hint_path = if let Some(source) = archive_source {
            let (_, normalized) = normalize_pack_path(source)?;
            Some(normalized)
        } else if is_component || is_dir {
            None
        } else {
            Some(safe_path.clone())
        };
        let archive_hint = archive_hint_path.as_deref();
        if verify_archive {
            // Only an existing regular file can be verified.
            if let Some(verify_target) = archive_hint.and_then(|p| {
                std::fs::metadata(p)
                    .ok()
                    .filter(|meta| meta.is_file())
                    .map(|_| p)
            }) {
                verify::verify_pack(verify_target).await?;
                tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
            } else {
                tracing::debug!("skipping archive verification (no archive source)");
            }
        }
        let engine = Engine::default();
        let engine_profile =
            EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
        let cache = CacheManager::new(CacheConfig::default(), engine_profile);
        // Fallback metadata is replaced below once a manifest has been loaded.
        let mut metadata = PackMetadata::fallback(&safe_path);
        let mut manifest = None;
        let mut legacy_manifest: Option<Box<legacy_pack::PackManifest>> = None;
        let mut flows = None;
        // Prefer the caller-supplied materialized root; a directory path is
        // its own materialized root.
        let materialized_root = component_resolution.materialized_root.clone().or_else(|| {
            if is_dir {
                Some(safe_path.clone())
            } else {
                None
            }
        });
        let (pack_assets_dir, assets_tempdir) =
            locate_pack_assets(materialized_root.as_deref(), archive_hint)?;
        let setup_yaml_exists = pack_assets_dir
            .as_ref()
            .map(|dir| dir.join("setup.yaml").is_file())
            .unwrap_or(false);
        tracing::info!(
            pack_root = %safe_path.display(),
            assets_setup_yaml_exists = setup_yaml_exists,
            "pack unpack metadata"
        );

        // First attempt: manifest from the materialized directory. A failure
        // is only logged so the archive can still be tried.
        if let Some(root) = materialized_root.as_ref() {
            match load_manifest_and_flows_from_dir(root) {
                Ok(ManifestLoad::New {
                    manifest: m,
                    flows: cache,
                }) => {
                    metadata = cache.metadata.clone();
                    manifest = Some(*m);
                    flows = Some(cache);
                }
                Ok(ManifestLoad::Legacy {
                    manifest: m,
                    flows: cache,
                }) => {
                    metadata = cache.metadata.clone();
                    legacy_manifest = Some(m);
                    flows = Some(cache);
                }
                Err(err) => {
                    warn!(error = %err, pack = %root.display(), "failed to parse materialized pack manifest");
                }
            }
        }

        // Second attempt: manifest from the archive. Here a failure is fatal.
        if manifest.is_none()
            && legacy_manifest.is_none()
            && let Some(archive_path) = archive_hint
        {
            let manifest_load = load_manifest_and_flows(archive_path).with_context(|| {
                format!(
                    "failed to load manifest.cbor from {}",
                    archive_path.display()
                )
            })?;
            match manifest_load {
                ManifestLoad::New {
                    manifest: m,
                    flows: cache,
                } => {
                    metadata = cache.metadata.clone();
                    manifest = Some(*m);
                    flows = Some(cache);
                }
                ManifestLoad::Legacy {
                    manifest: m,
                    flows: cache,
                } => {
                    metadata = cache.metadata.clone();
                    legacy_manifest = Some(m);
                    flows = Some(cache);
                }
            }
        }
        #[cfg(feature = "fault-injection")]
        {
            let fault_ctx = FaultContext {
                pack_id: metadata.pack_id.as_str(),
                flow_id: "unknown",
                node_id: None,
                attempt: 1,
            };
            maybe_fail(FaultPoint::PackResolve, fault_ctx)
                .map_err(|err| anyhow!(err.to_string()))?;
        }
        // Use the first candidate root that yields a pack lock.
        let mut pack_lock = None;
        for root in find_pack_lock_roots(&safe_path, is_dir, archive_hint) {
            pack_lock = load_pack_lock(&root)?;
            if pack_lock.is_some() {
                break;
            }
        }
        // The manifest extension is only consulted when there is no pack lock.
        let component_sources_payload = if pack_lock.is_none() {
            if let Some(manifest) = manifest.as_ref() {
                manifest
                    .get_component_sources_v1()
                    .context("invalid component sources extension")?
            } else {
                None
            }
        } else {
            None
        };
        let component_sources = if let Some(lock) = pack_lock.as_ref() {
            Some(component_sources_table_from_pack_lock(
                lock,
                component_resolution.allow_missing_hash,
            )?)
        } else {
            component_sources_table(component_sources_payload.as_ref())?
        };
        let components = if is_component {
            // Bare component: compile the file and register it under its
            // file stem.
            let wasm_bytes = fs::read(&safe_path).await?;
            metadata = PackMetadata::from_wasm(&wasm_bytes)
                .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
            let name = safe_path
                .file_stem()
                .map(|s| s.to_string_lossy().to_string())
                .unwrap_or_else(|| "component".to_string());
            let component = compile_component_with_cache(&cache, &engine, None, wasm_bytes).await?;
            let mut map = HashMap::new();
            map.insert(
                name.clone(),
                PackComponent {
                    name,
                    version: metadata.version.clone(),
                    component,
                },
            );
            map
        } else {
            let specs = component_specs(
                manifest.as_ref(),
                legacy_manifest.as_deref(),
                component_sources_payload.as_ref(),
                pack_lock.as_ref(),
            );
            if specs.is_empty() {
                HashMap::new()
            } else {
                let mut loaded = HashMap::new();
                // Starts with every spec id; the loaders receive it mutably
                // (presumably removing ids as they resolve them; TODO confirm).
                let mut missing: HashSet<String> =
                    specs.iter().map(|spec| spec.id.clone()).collect();
                // Names of the sources tried, used in the error message below.
                let mut searched = Vec::new();

                if !component_resolution.overrides.is_empty() {
                    load_components_from_overrides(
                        &cache,
                        &engine,
                        &component_resolution.overrides,
                        &specs,
                        &mut missing,
                        &mut loaded,
                    )
                    .await?;
                    searched.push("override map".to_string());
                }

                if let Some(component_sources) = component_sources.as_ref() {
                    load_components_from_sources(
                        &cache,
                        &engine,
                        component_sources,
                        &component_resolution,
                        &specs,
                        &mut missing,
                        &mut loaded,
                        materialized_root.as_deref(),
                        archive_hint,
                    )
                    .await?;
                    searched.push(format!("extension {}", EXT_COMPONENT_SOURCES_V1));
                }

                if let Some(root) = materialized_root.as_ref() {
                    load_components_from_dir(
                        &cache,
                        &engine,
                        root,
                        &specs,
                        &mut missing,
                        &mut loaded,
                    )
                    .await?;
                    searched.push(format!("components dir {}", root.display()));
                }

                if let Some(archive_path) = archive_hint {
                    load_components_from_archive(
                        &cache,
                        &engine,
                        archive_path,
                        &specs,
                        &mut missing,
                        &mut loaded,
                    )
                    .await?;
                    searched.push(format!("archive {}", archive_path.display()));
                }

                if !missing.is_empty() {
                    let missing_list = missing.into_iter().collect::<Vec<_>>().join(", ");
                    let sources = if searched.is_empty() {
                        "no component sources".to_string()
                    } else {
                        searched.join(", ")
                    };
                    bail!(
                        "components missing: {}; looked in {}",
                        missing_list,
                        sources
                    );
                }

                loaded
            }
        };
        let http_client = Arc::clone(&HTTP_CLIENT);
        // Index component manifests by id; only the new-format manifest
        // contributes entries.
        let mut component_manifests = HashMap::new();
        if let Some(manifest) = manifest.as_ref() {
            for component in &manifest.components {
                component_manifests.insert(component.id.as_str().to_string(), component.clone());
            }
        }
        // Work on a copy of the caller's policy so the preopen stays
        // pack-specific.
        let mut pack_policy = (*wasi_policy).clone();
        if let Some(dir) = pack_assets_dir {
            tracing::debug!(path = %dir.display(), "preopening pack assets directory for WASI /assets");
            pack_policy =
                pack_policy.with_preopen(PreopenSpec::new(dir, "/assets").read_only(true));
        }
        let wasi_policy = Arc::new(pack_policy);
        Ok(Self {
            path: safe_path,
            archive_path: archive_hint.map(Path::to_path_buf),
            config,
            engine,
            metadata,
            manifest,
            legacy_manifest,
            component_manifests,
            mocks,
            flows,
            components,
            http_client,
            pre_cache: Mutex::new(HashMap::new()),
            session_store,
            state_store,
            wasi_policy,
            assets_tempdir,
            provider_registry: RwLock::new(None),
            secrets,
            oauth_config,
            cache,
        })
    }
1681
1682 pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
1683 if let Some(cache) = &self.flows {
1684 return Ok(cache.descriptors.clone());
1685 }
1686 if let Some(manifest) = &self.manifest {
1687 let descriptors = manifest
1688 .flows
1689 .iter()
1690 .map(|flow| FlowDescriptor {
1691 id: flow.id.as_str().to_string(),
1692 flow_type: flow_kind_to_str(flow.kind).to_string(),
1693 pack_id: manifest.pack_id.as_str().to_string(),
1694 profile: manifest.pack_id.as_str().to_string(),
1695 version: manifest.version.to_string(),
1696 description: None,
1697 })
1698 .collect();
1699 return Ok(descriptors);
1700 }
1701 Ok(Vec::new())
1702 }
1703
1704 #[allow(dead_code)]
1705 pub async fn run_flow(
1706 &self,
1707 flow_id: &str,
1708 input: serde_json::Value,
1709 ) -> Result<serde_json::Value> {
1710 let pack = Arc::new(
1711 PackRuntime::load(
1712 &self.path,
1713 Arc::clone(&self.config),
1714 self.mocks.clone(),
1715 self.archive_path.as_deref(),
1716 self.session_store.clone(),
1717 self.state_store.clone(),
1718 Arc::clone(&self.wasi_policy),
1719 self.secrets.clone(),
1720 self.oauth_config.clone(),
1721 false,
1722 ComponentResolution::default(),
1723 )
1724 .await?,
1725 );
1726
1727 let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&self.config)).await?;
1728 let retry_config = self.config.retry_config().into();
1729 let mocks = pack.mocks.as_deref();
1730 let tenant = self.config.tenant.as_str();
1731
1732 let ctx = FlowContext {
1733 tenant,
1734 pack_id: pack.metadata().pack_id.as_str(),
1735 flow_id,
1736 node_id: None,
1737 tool: None,
1738 action: None,
1739 session_id: None,
1740 provider_id: None,
1741 retry_config,
1742 attempt: 1,
1743 observer: None,
1744 mocks,
1745 };
1746
1747 let execution = engine.execute(ctx, input).await?;
1748 match execution.status {
1749 FlowStatus::Completed => Ok(execution.output),
1750 FlowStatus::Waiting(wait) => Ok(serde_json::json!({
1751 "status": "pending",
1752 "reason": wait.reason,
1753 "resume": wait.snapshot,
1754 "response": execution.output,
1755 })),
1756 }
1757 }
1758
1759 pub async fn invoke_component(
1760 &self,
1761 component_ref: &str,
1762 ctx: ComponentExecCtx,
1763 operation: &str,
1764 _config_json: Option<String>,
1765 input_json: String,
1766 ) -> Result<Value> {
1767 let pack_component = self
1768 .components
1769 .get(component_ref)
1770 .with_context(|| format!("component '{component_ref}' not found in pack"))?;
1771 let engine = self.engine.clone();
1772 let config = Arc::clone(&self.config);
1773 let http_client = Arc::clone(&self.http_client);
1774 let mocks = self.mocks.clone();
1775 let session_store = self.session_store.clone();
1776 let state_store = self.state_store.clone();
1777 let secrets = Arc::clone(&self.secrets);
1778 let oauth_config = self.oauth_config.clone();
1779 let wasi_policy = Arc::clone(&self.wasi_policy);
1780 let pack_id = self.metadata().pack_id.clone();
1781 let allow_state_store = self.allows_state_store(component_ref);
1782 let component = pack_component.component.clone();
1783 let component_ref_owned = component_ref.to_string();
1784 let operation_owned = operation.to_string();
1785 let input_owned = input_json;
1786 let ctx_owned = ctx;
1787
1788 run_on_wasi_thread("component.invoke", move || {
1789 let mut linker = Linker::new(&engine);
1790 register_all(&mut linker, allow_state_store)?;
1791 add_component_control_to_linker(&mut linker)?;
1792
1793 let host_state = HostState::new(
1794 pack_id.clone(),
1795 config,
1796 http_client,
1797 mocks,
1798 session_store,
1799 state_store,
1800 secrets,
1801 oauth_config,
1802 Some(ctx_owned.clone()),
1803 Some(component_ref_owned.clone()),
1804 false,
1805 )?;
1806 let store_state = ComponentState::new(host_state, wasi_policy)?;
1807 let mut store = wasmtime::Store::new(&engine, store_state);
1808
1809 let invoke_result = HostState::instantiate_component_result(
1810 &mut linker,
1811 &mut store,
1812 &component,
1813 &ctx_owned,
1814 &operation_owned,
1815 &input_owned,
1816 )?;
1817 HostState::convert_invoke_result(invoke_result)
1818 })
1819 }
1820
1821 pub fn resolve_provider(
1822 &self,
1823 provider_id: Option<&str>,
1824 provider_type: Option<&str>,
1825 ) -> Result<ProviderBinding> {
1826 let registry = self.provider_registry()?;
1827 registry.resolve(provider_id, provider_type)
1828 }
1829
1830 pub async fn invoke_provider(
1831 &self,
1832 binding: &ProviderBinding,
1833 ctx: ComponentExecCtx,
1834 op: &str,
1835 input_json: Vec<u8>,
1836 ) -> Result<Value> {
1837 let component_ref_owned = binding.component_ref.clone();
1838 let pack_component = self.components.get(&component_ref_owned).with_context(|| {
1839 format!("provider component '{component_ref_owned}' not found in pack")
1840 })?;
1841 let component = pack_component.component.clone();
1842
1843 let engine = self.engine.clone();
1844 let config = Arc::clone(&self.config);
1845 let http_client = Arc::clone(&self.http_client);
1846 let mocks = self.mocks.clone();
1847 let session_store = self.session_store.clone();
1848 let state_store = self.state_store.clone();
1849 let secrets = Arc::clone(&self.secrets);
1850 let oauth_config = self.oauth_config.clone();
1851 let wasi_policy = Arc::clone(&self.wasi_policy);
1852 let pack_id = self.metadata().pack_id.clone();
1853 let allow_state_store = self.allows_state_store(&component_ref_owned);
1854 let input_owned = input_json;
1855 let op_owned = op.to_string();
1856 let ctx_owned = ctx;
1857 let world = binding.world.clone();
1858
1859 run_on_wasi_thread("provider.invoke", move || {
1860 let mut linker = Linker::new(&engine);
1861 register_all(&mut linker, allow_state_store)?;
1862 add_component_control_to_linker(&mut linker)?;
1863 let mut pre_instance = Some(linker.instantiate_pre(component.as_ref())?);
1864 let host_state = HostState::new(
1865 pack_id.clone(),
1866 config,
1867 http_client,
1868 mocks,
1869 session_store,
1870 state_store,
1871 secrets,
1872 oauth_config,
1873 Some(ctx_owned.clone()),
1874 Some(component_ref_owned.clone()),
1875 true,
1876 )?;
1877 let store_state = ComponentState::new(host_state, wasi_policy)?;
1878 let mut store = wasmtime::Store::new(&engine, store_state);
1879 let use_schema_core =
1880 world.contains("provider-schema-core") || world.contains("provider/schema-core");
1881 let result = if use_schema_core {
1882 let pre_instance = pre_instance
1883 .take()
1884 .ok_or_else(|| anyhow!("provider pre_instance already consumed"))?;
1885 let pre: SchemaSchemaCorePre<ComponentState> =
1886 SchemaSchemaCorePre::new(pre_instance)?;
1887 let bindings = block_on(async { pre.instantiate_async(&mut store).await })?;
1888 let provider = bindings.greentic_provider_schema_core_schema_core_api();
1889 provider.call_invoke(&mut store, &op_owned, &input_owned)?
1890 } else {
1891 let pre_instance = pre_instance
1892 .take()
1893 .ok_or_else(|| anyhow!("provider pre_instance already consumed"))?;
1894 let pre: LegacySchemaCorePre<ComponentState> =
1895 LegacySchemaCorePre::new(pre_instance)?;
1896 let bindings = block_on(async { pre.instantiate_async(&mut store).await })?;
1897 let provider = bindings.greentic_provider_core_schema_core_api();
1898 provider.call_invoke(&mut store, &op_owned, &input_owned)?
1899 };
1900 deserialize_json_bytes(result)
1901 })
1902 }
1903
1904 pub(crate) fn provider_registry(&self) -> Result<ProviderRegistry> {
1905 if let Some(registry) = self.provider_registry.read().clone() {
1906 return Ok(registry);
1907 }
1908 let manifest = self
1909 .manifest
1910 .as_ref()
1911 .context("pack manifest required for provider resolution")?;
1912 let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
1913 let registry = ProviderRegistry::new(
1914 manifest,
1915 self.state_store.clone(),
1916 &self.config.tenant,
1917 &env,
1918 )?;
1919 *self.provider_registry.write() = Some(registry.clone());
1920 Ok(registry)
1921 }
1922
1923 pub(crate) fn provider_registry_optional(&self) -> Result<Option<ProviderRegistry>> {
1924 if self.manifest.is_none() {
1925 return Ok(None);
1926 }
1927 Ok(Some(self.provider_registry()?))
1928 }
1929
1930 pub fn load_flow(&self, flow_id: &str) -> Result<Flow> {
1931 if let Some(cache) = &self.flows {
1932 return cache
1933 .flows
1934 .get(flow_id)
1935 .cloned()
1936 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
1937 }
1938 if let Some(manifest) = &self.manifest {
1939 let entry = manifest
1940 .flows
1941 .iter()
1942 .find(|f| f.id.as_str() == flow_id)
1943 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
1944 return Ok(entry.flow.clone());
1945 }
1946 bail!("flow '{flow_id}' not available (pack exports disabled)")
1947 }
1948
    /// Returns the pack metadata held by this runtime.
    pub fn metadata(&self) -> &PackMetadata {
        &self.metadata
    }
1952
    /// Returns the secret requirements recorded in the pack metadata.
    pub fn required_secrets(&self) -> &[greentic_types::SecretRequirement] {
        &self.metadata.secret_requirements
    }
1956
1957 pub fn missing_secrets(
1958 &self,
1959 tenant_ctx: &TypesTenantCtx,
1960 ) -> Vec<greentic_types::SecretRequirement> {
1961 let env = tenant_ctx.env.as_str().to_string();
1962 let tenant = tenant_ctx.tenant.as_str().to_string();
1963 let team = tenant_ctx.team.as_ref().map(|t| t.as_str().to_string());
1964 self.required_secrets()
1965 .iter()
1966 .filter(|req| {
1967 if let Some(scope) = &req.scope {
1969 if scope.env != env {
1970 return false;
1971 }
1972 if scope.tenant != tenant {
1973 return false;
1974 }
1975 if let Some(ref team_req) = scope.team
1976 && team.as_ref() != Some(team_req)
1977 {
1978 return false;
1979 }
1980 }
1981 let ctx = self.config.tenant_ctx();
1982 read_secret_blocking(
1983 &self.secrets,
1984 &ctx,
1985 &self.metadata.pack_id,
1986 req.key.as_str(),
1987 )
1988 .is_err()
1989 })
1990 .cloned()
1991 .collect()
1992 }
1993
1994 pub fn for_component_test(
1995 components: Vec<(String, PathBuf)>,
1996 flows: HashMap<String, FlowIR>,
1997 pack_id: &str,
1998 config: Arc<HostConfig>,
1999 ) -> Result<Self> {
2000 let engine = Engine::default();
2001 let engine_profile =
2002 EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
2003 let cache = CacheManager::new(CacheConfig::default(), engine_profile);
2004 let mut component_map = HashMap::new();
2005 for (name, path) in components {
2006 if !path.exists() {
2007 bail!("component artifact missing: {}", path.display());
2008 }
2009 let wasm_bytes = std::fs::read(&path)?;
2010 let component = Arc::new(
2011 Component::from_binary(&engine, &wasm_bytes)
2012 .with_context(|| format!("failed to compile component {}", path.display()))?,
2013 );
2014 component_map.insert(
2015 name.clone(),
2016 PackComponent {
2017 name,
2018 version: "0.0.0".into(),
2019 component,
2020 },
2021 );
2022 }
2023
2024 let mut flow_map = HashMap::new();
2025 let mut descriptors = Vec::new();
2026 for (id, ir) in flows {
2027 let flow_type = ir.flow_type.clone();
2028 let flow = flow_ir_to_flow(ir)?;
2029 flow_map.insert(id.clone(), flow);
2030 descriptors.push(FlowDescriptor {
2031 id: id.clone(),
2032 flow_type,
2033 pack_id: pack_id.to_string(),
2034 profile: "test".into(),
2035 version: "0.0.0".into(),
2036 description: None,
2037 });
2038 }
2039 let entry_flows = descriptors.iter().map(|flow| flow.id.clone()).collect();
2040 let metadata = PackMetadata {
2041 pack_id: pack_id.to_string(),
2042 version: "0.0.0".into(),
2043 entry_flows,
2044 secret_requirements: Vec::new(),
2045 };
2046 let flows_cache = PackFlows {
2047 descriptors: descriptors.clone(),
2048 flows: flow_map,
2049 metadata: metadata.clone(),
2050 };
2051
2052 Ok(Self {
2053 path: PathBuf::new(),
2054 archive_path: None,
2055 config,
2056 engine,
2057 metadata,
2058 manifest: None,
2059 legacy_manifest: None,
2060 component_manifests: HashMap::new(),
2061 mocks: None,
2062 flows: Some(flows_cache),
2063 components: component_map,
2064 http_client: Arc::clone(&HTTP_CLIENT),
2065 pre_cache: Mutex::new(HashMap::new()),
2066 session_store: None,
2067 state_store: None,
2068 wasi_policy: Arc::new(RunnerWasiPolicy::new()),
2069 assets_tempdir: None,
2070 provider_registry: RwLock::new(None),
2071 secrets: crate::secrets::default_manager()?,
2072 oauth_config: None,
2073 cache,
2074 })
2075 }
2076}
2077
2078fn is_missing_node_export(err: &wasmtime::Error, version: &str) -> bool {
2079 let message = err.to_string();
2080 message.contains("no exported instance named")
2081 && message.contains(&format!("greentic:component/node@{version}"))
2082}
2083
/// Flows loaded for a pack, together with their descriptors and the pack
/// metadata derived while loading them.
struct PackFlows {
    /// One descriptor per flow, as returned by `PackRuntime::list_flows`.
    descriptors: Vec<FlowDescriptor>,
    /// Flows keyed by flow id.
    flows: HashMap<String, Flow>,
    /// Pack metadata (pack id, version, entry flows, secret requirements).
    metadata: PackMetadata,
}
2089
/// Manifest extension keys that are accepted as carrying runtime flows; an
/// extension is picked when its key equals any of these.
const RUNTIME_FLOW_EXTENSION_IDS: [&str; 3] = [
    "greentic.pack.runtime_flow",
    "greentic.pack.flow_runtime",
    "greentic.pack.runtime_flows",
];
2095
/// Deserialized form of a runtime flow payload: a list of flows.
#[derive(Debug, Deserialize)]
struct RuntimeFlowBundle {
    /// The flows in the bundle.
    flows: Vec<RuntimeFlow>,
}
2100
/// A single flow inside a [`RuntimeFlowBundle`].
///
/// Only `id`, `kind` and `nodes` are required; every other field falls back
/// to its default when absent.
#[derive(Debug, Deserialize)]
struct RuntimeFlow {
    /// Flow identifier.
    id: String,
    /// Flow kind; also accepted under the key `flow_type`.
    #[serde(alias = "flow_type")]
    kind: FlowKind,
    /// Optional schema version string.
    #[serde(default)]
    schema_version: Option<String>,
    /// Optional start value (presumably the id of the first node; TODO confirm).
    #[serde(default)]
    start: Option<String>,
    /// Named entrypoints with arbitrary JSON values; empty when absent.
    #[serde(default)]
    entrypoints: BTreeMap<String, Value>,
    /// Nodes keyed by name.
    nodes: BTreeMap<String, RuntimeNode>,
    /// Optional flow metadata.
    #[serde(default)]
    metadata: Option<FlowMetadata>,
}
2116
2117#[derive(Debug, Deserialize)]
2118struct RuntimeNode {
2119 #[serde(alias = "component")]
2120 component_id: String,
2121 #[serde(default, alias = "operation")]
2122 operation_name: Option<String>,
2123 #[serde(default, alias = "payload", alias = "input")]
2124 operation_payload: Value,
2125 #[serde(default)]
2126 routing: Option<Routing>,
2127 #[serde(default)]
2128 telemetry: Option<TelemetryHints>,
2129}
2130
2131fn deserialize_json_bytes(bytes: Vec<u8>) -> Result<Value> {
2132 if bytes.is_empty() {
2133 return Ok(Value::Null);
2134 }
2135 serde_json::from_slice(&bytes).or_else(|_| {
2136 String::from_utf8(bytes)
2137 .map(Value::String)
2138 .map_err(|err| anyhow!(err))
2139 })
2140}
2141
2142impl PackFlows {
2143 fn from_manifest(manifest: greentic_types::PackManifest) -> Self {
2144 if let Some(flows) = flows_from_runtime_extension(&manifest) {
2145 return flows;
2146 }
2147 let descriptors = manifest
2148 .flows
2149 .iter()
2150 .map(|entry| FlowDescriptor {
2151 id: entry.id.as_str().to_string(),
2152 flow_type: flow_kind_to_str(entry.kind).to_string(),
2153 pack_id: manifest.pack_id.as_str().to_string(),
2154 profile: manifest.pack_id.as_str().to_string(),
2155 version: manifest.version.to_string(),
2156 description: None,
2157 })
2158 .collect();
2159 let mut flows = HashMap::new();
2160 for entry in &manifest.flows {
2161 flows.insert(entry.id.as_str().to_string(), entry.flow.clone());
2162 }
2163 Self {
2164 metadata: PackMetadata::from_manifest(&manifest),
2165 descriptors,
2166 flows,
2167 }
2168 }
2169}
2170
2171fn flows_from_runtime_extension(manifest: &greentic_types::PackManifest) -> Option<PackFlows> {
2172 let extensions = manifest.extensions.as_ref()?;
2173 let extension = extensions.iter().find_map(|(key, ext)| {
2174 if RUNTIME_FLOW_EXTENSION_IDS
2175 .iter()
2176 .any(|candidate| candidate == key)
2177 {
2178 Some(ext)
2179 } else {
2180 None
2181 }
2182 })?;
2183 let runtime_flows = match decode_runtime_flow_extension(extension) {
2184 Some(flows) if !flows.is_empty() => flows,
2185 _ => return None,
2186 };
2187
2188 let descriptors = runtime_flows
2189 .iter()
2190 .map(|flow| FlowDescriptor {
2191 id: flow.id.as_str().to_string(),
2192 flow_type: flow_kind_to_str(flow.kind).to_string(),
2193 pack_id: manifest.pack_id.as_str().to_string(),
2194 profile: manifest.pack_id.as_str().to_string(),
2195 version: manifest.version.to_string(),
2196 description: None,
2197 })
2198 .collect::<Vec<_>>();
2199 let flows = runtime_flows
2200 .into_iter()
2201 .map(|flow| (flow.id.as_str().to_string(), flow))
2202 .collect();
2203
2204 Some(PackFlows {
2205 metadata: PackMetadata::from_manifest(manifest),
2206 descriptors,
2207 flows,
2208 })
2209}
2210
2211fn decode_runtime_flow_extension(extension: &ExtensionRef) -> Option<Vec<Flow>> {
2212 let value = match extension.inline.as_ref()? {
2213 ExtensionInline::Other(value) => value.clone(),
2214 _ => return None,
2215 };
2216
2217 if let Ok(bundle) = serde_json::from_value::<RuntimeFlowBundle>(value.clone()) {
2218 return Some(collect_runtime_flows(bundle.flows));
2219 }
2220
2221 if let Ok(flows) = serde_json::from_value::<Vec<RuntimeFlow>>(value.clone()) {
2222 return Some(collect_runtime_flows(flows));
2223 }
2224
2225 if let Ok(flows) = serde_json::from_value::<Vec<Flow>>(value) {
2226 return Some(flows);
2227 }
2228
2229 warn!(
2230 extension = %extension.kind,
2231 version = %extension.version,
2232 "runtime flow extension present but could not be decoded"
2233 );
2234 None
2235}
2236
2237fn collect_runtime_flows(flows: Vec<RuntimeFlow>) -> Vec<Flow> {
2238 flows
2239 .into_iter()
2240 .filter_map(|flow| match runtime_flow_to_flow(flow) {
2241 Ok(flow) => Some(flow),
2242 Err(err) => {
2243 warn!(error = %err, "failed to decode runtime flow");
2244 None
2245 }
2246 })
2247 .collect()
2248}
2249
2250fn runtime_flow_to_flow(runtime: RuntimeFlow) -> Result<Flow> {
2251 let flow_id = FlowId::from_str(&runtime.id)
2252 .with_context(|| format!("invalid flow id `{}`", runtime.id))?;
2253 let mut entrypoints = runtime.entrypoints;
2254 if entrypoints.is_empty()
2255 && let Some(start) = &runtime.start
2256 {
2257 entrypoints.insert("default".into(), Value::String(start.clone()));
2258 }
2259
2260 let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
2261 for (id, node) in runtime.nodes {
2262 let node_id = NodeId::from_str(&id).with_context(|| format!("invalid node id `{id}`"))?;
2263 let component_id = ComponentId::from_str(&node.component_id)
2264 .with_context(|| format!("invalid component id `{}`", node.component_id))?;
2265 let component = FlowComponentRef {
2266 id: component_id,
2267 pack_alias: None,
2268 operation: node.operation_name,
2269 };
2270 let routing = node.routing.unwrap_or(Routing::End);
2271 let telemetry = node.telemetry.unwrap_or_default();
2272 nodes.insert(
2273 node_id.clone(),
2274 Node {
2275 id: node_id,
2276 component,
2277 input: InputMapping {
2278 mapping: node.operation_payload,
2279 },
2280 output: OutputMapping {
2281 mapping: Value::Null,
2282 },
2283 routing,
2284 telemetry,
2285 },
2286 );
2287 }
2288
2289 Ok(Flow {
2290 schema_version: runtime.schema_version.unwrap_or_else(|| "1.0".to_string()),
2291 id: flow_id,
2292 kind: runtime.kind,
2293 entrypoints,
2294 nodes,
2295 metadata: runtime.metadata.unwrap_or_default(),
2296 })
2297}
2298
2299fn flow_kind_to_str(kind: greentic_types::FlowKind) -> &'static str {
2300 match kind {
2301 greentic_types::FlowKind::Messaging => "messaging",
2302 greentic_types::FlowKind::Event => "event",
2303 greentic_types::FlowKind::ComponentConfig => "component-config",
2304 greentic_types::FlowKind::Job => "job",
2305 greentic_types::FlowKind::Http => "http",
2306 }
2307}
2308
2309fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
2310 let mut file = archive
2311 .by_name(name)
2312 .with_context(|| format!("entry {name} missing from archive"))?;
2313 let mut buf = Vec::new();
2314 file.read_to_end(&mut buf)?;
2315 Ok(buf)
2316}
2317
2318fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
2319 for node in doc.nodes.values_mut() {
2320 let Some((component_ref, payload)) = node
2321 .raw
2322 .iter()
2323 .next()
2324 .map(|(key, value)| (key.clone(), value.clone()))
2325 else {
2326 continue;
2327 };
2328 if component_ref.starts_with("emit.") {
2329 node.operation = Some(component_ref);
2330 node.payload = payload;
2331 node.raw.clear();
2332 continue;
2333 }
2334 let (target_component, operation, input, config) =
2335 infer_component_exec(&payload, &component_ref);
2336 let mut payload_obj = serde_json::Map::new();
2337 payload_obj.insert("component".into(), Value::String(target_component));
2339 payload_obj.insert("operation".into(), Value::String(operation));
2340 payload_obj.insert("input".into(), input);
2341 if let Some(cfg) = config {
2342 payload_obj.insert("config".into(), cfg);
2343 }
2344 node.operation = Some("component.exec".to_string());
2345 node.payload = Value::Object(payload_obj);
2346 node.raw.clear();
2347 }
2348 doc
2349}
2350
2351fn infer_component_exec(
2352 payload: &Value,
2353 component_ref: &str,
2354) -> (String, String, Value, Option<Value>) {
2355 let default_op = if component_ref.starts_with("templating.") {
2356 "render"
2357 } else {
2358 "invoke"
2359 }
2360 .to_string();
2361
2362 if let Value::Object(map) = payload {
2363 let op = map
2364 .get("op")
2365 .or_else(|| map.get("operation"))
2366 .and_then(Value::as_str)
2367 .map(|s| s.to_string())
2368 .unwrap_or_else(|| default_op.clone());
2369
2370 let mut input = map.clone();
2371 let config = input.remove("config");
2372 let component = input
2373 .get("component")
2374 .or_else(|| input.get("component_ref"))
2375 .and_then(Value::as_str)
2376 .map(|s| s.to_string())
2377 .unwrap_or_else(|| component_ref.to_string());
2378 input.remove("component");
2379 input.remove("component_ref");
2380 input.remove("op");
2381 input.remove("operation");
2382 return (component, op, Value::Object(input), config);
2383 }
2384
2385 (component_ref.to_string(), default_op, payload.clone(), None)
2386}
2387
/// Identity of a component a pack expects to load.
#[derive(Clone, Debug)]
struct ComponentSpec {
    /// Component identifier.
    id: String,
    /// Component version as a string.
    version: String,
    /// Path of the wasm file relative to the pack root, when the spec comes
    /// from a legacy manifest.
    legacy_path: Option<String>,
}
2394
2395#[derive(Clone, Debug)]
2396struct ComponentSourceInfo {
2397 digest: Option<String>,
2398 source: ComponentSourceRef,
2399 artifact: ComponentArtifactLocation,
2400 expected_wasm_sha256: Option<String>,
2401 skip_digest_verification: bool,
2402}
2403
/// Location of a component's wasm artifact.
#[derive(Clone, Debug)]
enum ComponentArtifactLocation {
    /// The wasm is bundled with the pack at `wasm_path`.
    Inline { wasm_path: String },
    /// The wasm is not bundled with the pack.
    Remote,
}
2409
2410#[derive(Clone, Debug, Deserialize)]
2411struct PackLockV1 {
2412 schema_version: u32,
2413 components: Vec<PackLockComponent>,
2414}
2415
2416#[derive(Clone, Debug, Deserialize)]
2417struct PackLockComponent {
2418 name: String,
2419 #[serde(default, rename = "source_ref")]
2420 source_ref: Option<String>,
2421 #[serde(default, rename = "ref")]
2422 legacy_ref: Option<String>,
2423 #[serde(default)]
2424 component_id: Option<ComponentId>,
2425 #[serde(default)]
2426 bundled: Option<bool>,
2427 #[serde(default, rename = "bundled_path")]
2428 bundled_path: Option<String>,
2429 #[serde(default, rename = "path")]
2430 legacy_path: Option<String>,
2431 #[serde(default)]
2432 wasm_sha256: Option<String>,
2433 #[serde(default, rename = "sha256")]
2434 legacy_sha256: Option<String>,
2435 #[serde(default)]
2436 resolved_digest: Option<String>,
2437 #[serde(default)]
2438 digest: Option<String>,
2439}
2440
2441fn component_specs(
2442 manifest: Option<&greentic_types::PackManifest>,
2443 legacy_manifest: Option<&legacy_pack::PackManifest>,
2444 component_sources: Option<&ComponentSourcesV1>,
2445 pack_lock: Option<&PackLockV1>,
2446) -> Vec<ComponentSpec> {
2447 if let Some(manifest) = manifest {
2448 if !manifest.components.is_empty() {
2449 return manifest
2450 .components
2451 .iter()
2452 .map(|entry| ComponentSpec {
2453 id: entry.id.as_str().to_string(),
2454 version: entry.version.to_string(),
2455 legacy_path: None,
2456 })
2457 .collect();
2458 }
2459 if let Some(lock) = pack_lock {
2460 let mut seen = HashSet::new();
2461 let mut specs = Vec::new();
2462 for entry in &lock.components {
2463 let id = entry
2464 .component_id
2465 .as_ref()
2466 .map(|id| id.as_str())
2467 .unwrap_or(entry.name.as_str());
2468 if seen.insert(id.to_string()) {
2469 specs.push(ComponentSpec {
2470 id: id.to_string(),
2471 version: "0.0.0".to_string(),
2472 legacy_path: None,
2473 });
2474 }
2475 }
2476 return specs;
2477 }
2478 if let Some(sources) = component_sources {
2479 let mut seen = HashSet::new();
2480 let mut specs = Vec::new();
2481 for entry in &sources.components {
2482 let id = entry
2483 .component_id
2484 .as_ref()
2485 .map(|id| id.as_str())
2486 .unwrap_or(entry.name.as_str());
2487 if seen.insert(id.to_string()) {
2488 specs.push(ComponentSpec {
2489 id: id.to_string(),
2490 version: "0.0.0".to_string(),
2491 legacy_path: None,
2492 });
2493 }
2494 }
2495 return specs;
2496 }
2497 }
2498 if let Some(legacy_manifest) = legacy_manifest {
2499 return legacy_manifest
2500 .components
2501 .iter()
2502 .map(|entry| ComponentSpec {
2503 id: entry.name.clone(),
2504 version: entry.version.to_string(),
2505 legacy_path: Some(entry.file_wasm.clone()),
2506 })
2507 .collect();
2508 }
2509 Vec::new()
2510}
2511
2512fn component_sources_table(
2513 sources: Option<&ComponentSourcesV1>,
2514) -> Result<Option<HashMap<String, ComponentSourceInfo>>> {
2515 let Some(sources) = sources else {
2516 return Ok(None);
2517 };
2518 let mut table = HashMap::new();
2519 for entry in &sources.components {
2520 let artifact = match &entry.artifact {
2521 ArtifactLocationV1::Inline { wasm_path, .. } => ComponentArtifactLocation::Inline {
2522 wasm_path: wasm_path.clone(),
2523 },
2524 ArtifactLocationV1::Remote => ComponentArtifactLocation::Remote,
2525 };
2526 let info = ComponentSourceInfo {
2527 digest: Some(entry.resolved.digest.clone()),
2528 source: entry.source.clone(),
2529 artifact,
2530 expected_wasm_sha256: None,
2531 skip_digest_verification: false,
2532 };
2533 if let Some(component_id) = entry.component_id.as_ref() {
2534 table.insert(component_id.as_str().to_string(), info.clone());
2535 }
2536 table.insert(entry.name.clone(), info);
2537 }
2538 Ok(Some(table))
2539}
2540
2541fn load_pack_lock(path: &Path) -> Result<Option<PackLockV1>> {
2542 let lock_path = if path.is_dir() {
2543 let candidate = path.join("pack.lock");
2544 if candidate.exists() {
2545 Some(candidate)
2546 } else {
2547 let candidate = path.join("pack.lock.json");
2548 candidate.exists().then_some(candidate)
2549 }
2550 } else {
2551 None
2552 };
2553 let Some(lock_path) = lock_path else {
2554 return Ok(None);
2555 };
2556 let raw = std::fs::read_to_string(&lock_path)
2557 .with_context(|| format!("failed to read {}", lock_path.display()))?;
2558 let lock: PackLockV1 = serde_json::from_str(&raw).context("failed to parse pack.lock")?;
2559 if lock.schema_version != 1 {
2560 bail!("pack.lock schema_version must be 1");
2561 }
2562 Ok(Some(lock))
2563}
2564
/// Lists the directories in which a pack lock may be looked up.
///
/// * For a directory pack, the pack directory itself.
/// * Otherwise, the parent and grandparent (where they exist) of
///   `archive_hint` when given, else of `pack_path`. A hint is never mixed
///   with `pack_path`: if the hint has no parent the result is empty.
fn find_pack_lock_roots(
    pack_path: &Path,
    is_dir: bool,
    archive_hint: Option<&Path>,
) -> Vec<PathBuf> {
    if is_dir {
        return vec![pack_path.to_path_buf()];
    }
    let anchor = archive_hint.unwrap_or(pack_path);
    // `ancestors` yields the path itself first, then parent, grandparent, ...
    anchor
        .ancestors()
        .skip(1)
        .take(2)
        .map(Path::to_path_buf)
        .collect()
}
2589
2590fn normalize_sha256(digest: &str) -> Result<String> {
2591 let trimmed = digest.trim();
2592 if trimmed.is_empty() {
2593 bail!("sha256 digest cannot be empty");
2594 }
2595 if let Some(stripped) = trimmed.strip_prefix("sha256:") {
2596 if stripped.is_empty() {
2597 bail!("sha256 digest must include hex bytes after sha256:");
2598 }
2599 return Ok(trimmed.to_string());
2600 }
2601 if trimmed.chars().all(|c| c.is_ascii_hexdigit()) {
2602 return Ok(format!("sha256:{trimmed}"));
2603 }
2604 bail!("sha256 digest must be hex or sha256:<hex>");
2605}
2606
2607fn component_sources_table_from_pack_lock(
2608 lock: &PackLockV1,
2609 allow_missing_hash: bool,
2610) -> Result<HashMap<String, ComponentSourceInfo>> {
2611 let mut table = HashMap::new();
2612 let mut names = HashSet::new();
2613 for entry in &lock.components {
2614 if !names.insert(entry.name.clone()) {
2615 bail!(
2616 "pack.lock contains duplicate component name `{}`",
2617 entry.name
2618 );
2619 }
2620 let source_ref = match (&entry.source_ref, &entry.legacy_ref) {
2621 (Some(primary), Some(legacy)) => {
2622 if primary != legacy {
2623 bail!(
2624 "pack.lock component {} has conflicting refs: {} vs {}",
2625 entry.name,
2626 primary,
2627 legacy
2628 );
2629 }
2630 primary.as_str()
2631 }
2632 (Some(primary), None) => primary.as_str(),
2633 (None, Some(legacy)) => legacy.as_str(),
2634 (None, None) => {
2635 bail!("pack.lock component {} missing source_ref", entry.name);
2636 }
2637 };
2638 let source: ComponentSourceRef = source_ref
2639 .parse()
2640 .with_context(|| format!("invalid component ref `{}`", source_ref))?;
2641 let bundled_path = match (&entry.bundled_path, &entry.legacy_path) {
2642 (Some(primary), Some(legacy)) => {
2643 if primary != legacy {
2644 bail!(
2645 "pack.lock component {} has conflicting bundled paths: {} vs {}",
2646 entry.name,
2647 primary,
2648 legacy
2649 );
2650 }
2651 Some(primary.clone())
2652 }
2653 (Some(primary), None) => Some(primary.clone()),
2654 (None, Some(legacy)) => Some(legacy.clone()),
2655 (None, None) => None,
2656 };
2657 let bundled = entry.bundled.unwrap_or(false) || bundled_path.is_some();
2658 let (artifact, digest, expected_wasm_sha256, skip_digest_verification) = if bundled {
2659 let wasm_path = bundled_path.ok_or_else(|| {
2660 anyhow!(
2661 "pack.lock component {} marked bundled but bundled_path is missing",
2662 entry.name
2663 )
2664 })?;
2665 let expected_raw = match (&entry.wasm_sha256, &entry.legacy_sha256) {
2666 (Some(primary), Some(legacy)) => {
2667 if primary != legacy {
2668 bail!(
2669 "pack.lock component {} has conflicting wasm_sha256 values: {} vs {}",
2670 entry.name,
2671 primary,
2672 legacy
2673 );
2674 }
2675 Some(primary.as_str())
2676 }
2677 (Some(primary), None) => Some(primary.as_str()),
2678 (None, Some(legacy)) => Some(legacy.as_str()),
2679 (None, None) => None,
2680 };
2681 let expected = match expected_raw {
2682 Some(value) => Some(normalize_sha256(value)?),
2683 None => None,
2684 };
2685 if expected.is_none() && !allow_missing_hash {
2686 bail!(
2687 "pack.lock component {} missing wasm_sha256 for bundled component",
2688 entry.name
2689 );
2690 }
2691 (
2692 ComponentArtifactLocation::Inline { wasm_path },
2693 expected.clone(),
2694 expected,
2695 allow_missing_hash && expected_raw.is_none(),
2696 )
2697 } else {
2698 if source.is_tag() {
2699 bail!(
2700 "component {} uses tag ref {} but is not bundled; rebuild the pack",
2701 entry.name,
2702 source
2703 );
2704 }
2705 let expected = entry
2706 .resolved_digest
2707 .as_deref()
2708 .or(entry.digest.as_deref())
2709 .ok_or_else(|| {
2710 anyhow!(
2711 "pack.lock component {} missing resolved_digest for remote component",
2712 entry.name
2713 )
2714 })?;
2715 (
2716 ComponentArtifactLocation::Remote,
2717 Some(normalize_digest(expected)),
2718 None,
2719 false,
2720 )
2721 };
2722 let info = ComponentSourceInfo {
2723 digest,
2724 source,
2725 artifact,
2726 expected_wasm_sha256,
2727 skip_digest_verification,
2728 };
2729 if let Some(component_id) = entry.component_id.as_ref() {
2730 let key = component_id.as_str().to_string();
2731 if table.contains_key(&key) {
2732 bail!(
2733 "pack.lock contains duplicate component id `{}`",
2734 component_id.as_str()
2735 );
2736 }
2737 table.insert(key, info.clone());
2738 }
2739 if entry.name
2740 != entry
2741 .component_id
2742 .as_ref()
2743 .map(|id| id.as_str())
2744 .unwrap_or("")
2745 {
2746 table.insert(entry.name.clone(), info);
2747 }
2748 }
2749 Ok(table)
2750}
2751
2752fn component_path_for_spec(root: &Path, spec: &ComponentSpec) -> PathBuf {
2753 if let Some(path) = &spec.legacy_path {
2754 return root.join(path);
2755 }
2756 root.join("components").join(format!("{}.wasm", spec.id))
2757}
2758
/// Ensures a digest carries an algorithm prefix.
///
/// Digests already starting with `sha256:` or `blake3:` are returned
/// unchanged; anything else is assumed to be sha256 and gets that prefix.
fn normalize_digest(digest: &str) -> String {
    const KNOWN_PREFIXES: [&str; 2] = ["sha256:", "blake3:"];

    let has_prefix = KNOWN_PREFIXES
        .iter()
        .any(|prefix| digest.starts_with(*prefix));
    if has_prefix {
        digest.to_owned()
    } else {
        format!("sha256:{digest}")
    }
}
2766
2767fn compute_digest_for(bytes: &[u8], digest: &str) -> Result<String> {
2768 if digest.starts_with("blake3:") {
2769 let hash = blake3::hash(bytes);
2770 return Ok(format!("blake3:{}", hash.to_hex()));
2771 }
2772 let mut hasher = sha2::Sha256::new();
2773 hasher.update(bytes);
2774 Ok(format!("sha256:{:x}", hasher.finalize()))
2775}
2776
2777fn compute_sha256_digest_for(bytes: &[u8]) -> String {
2778 let mut hasher = sha2::Sha256::new();
2779 hasher.update(bytes);
2780 format!("sha256:{:x}", hasher.finalize())
2781}
2782
2783fn build_artifact_key(cache: &CacheManager, digest: Option<&str>, bytes: &[u8]) -> ArtifactKey {
2784 let wasm_digest = digest
2785 .map(normalize_digest)
2786 .unwrap_or_else(|| compute_sha256_digest_for(bytes));
2787 ArtifactKey::new(cache.engine_profile_id().to_string(), wasm_digest)
2788}
2789
2790async fn compile_component_with_cache(
2791 cache: &CacheManager,
2792 engine: &Engine,
2793 digest: Option<&str>,
2794 bytes: Vec<u8>,
2795) -> Result<Arc<Component>> {
2796 let key = build_artifact_key(cache, digest, &bytes);
2797 cache.get_component(engine, &key, || Ok(bytes)).await
2798}
2799
2800fn verify_component_digest(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
2801 let normalized_expected = normalize_digest(expected);
2802 let actual = compute_digest_for(bytes, &normalized_expected)?;
2803 if normalize_digest(&actual) != normalized_expected {
2804 bail!(
2805 "component {component_id} digest mismatch: expected {normalized_expected}, got {actual}"
2806 );
2807 }
2808 Ok(())
2809}
2810
2811fn verify_wasm_sha256(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
2812 let normalized_expected = normalize_sha256(expected)?;
2813 let actual = compute_sha256_digest_for(bytes);
2814 if actual != normalized_expected {
2815 bail!(
2816 "component {component_id} bundled digest mismatch: expected {normalized_expected}, got {actual}"
2817 );
2818 }
2819 Ok(())
2820}
2821
#[cfg(test)]
mod pack_lock_tests {
    use super::*;
    use tempfile::TempDir;

    /// A non-bundled component that is pinned only by tag must be rejected.
    #[test]
    fn pack_lock_tag_ref_requires_bundle() {
        let component = PackLockComponent {
            name: "templates".to_string(),
            source_ref: Some("oci://registry.test/templates:latest".to_string()),
            legacy_ref: None,
            component_id: None,
            bundled: Some(false),
            bundled_path: None,
            legacy_path: None,
            wasm_sha256: None,
            legacy_sha256: None,
            resolved_digest: None,
            digest: None,
        };
        let lock = PackLockV1 {
            schema_version: 1,
            components: vec![component],
        };

        let err = component_sources_table_from_pack_lock(&lock, false).unwrap_err();
        let message = err.to_string();
        assert!(
            message.contains("tag ref") && message.contains("rebuild the pack"),
            "unexpected error: {err}"
        );
    }

    /// Loading a bundled component whose bytes do not match the recorded
    /// sha256 must fail with a digest mismatch.
    #[test]
    fn bundled_hash_mismatch_errors() {
        let rt = tokio::runtime::Runtime::new().expect("runtime");
        let temp = TempDir::new().expect("temp dir");

        // Cache and engine rooted in the temporary directory.
        let engine = Engine::default();
        let engine_profile =
            EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
        let cache = CacheManager::new(
            CacheConfig {
                root: temp.path().join("cache"),
                ..CacheConfig::default()
            },
            engine_profile,
        );

        // Copy a real component next to the fake pack root.
        let fixture_wasm = Path::new(env!("CARGO_MANIFEST_DIR"))
            .join("../../tests/fixtures/packs/secrets_store_smoke/components/echo_secret.wasm");
        let bytes = std::fs::read(&fixture_wasm).expect("read fixture wasm");
        let wasm_path = temp.path().join("component.wasm");
        std::fs::write(&wasm_path, &bytes).expect("write temp wasm");

        let spec = ComponentSpec {
            id: "qa.process".to_string(),
            version: "0.0.0".to_string(),
            legacy_path: None,
        };
        let mut missing = HashSet::new();
        missing.insert(spec.id.clone());

        // The recorded hash deliberately does not match the fixture bytes.
        let source_info = ComponentSourceInfo {
            digest: Some("sha256:deadbeef".to_string()),
            source: ComponentSourceRef::Oci("registry.test/qa.process@sha256:deadbeef".into()),
            artifact: ComponentArtifactLocation::Inline {
                wasm_path: "component.wasm".to_string(),
            },
            expected_wasm_sha256: Some("sha256:deadbeef".to_string()),
            skip_digest_verification: false,
        };
        let mut sources = HashMap::new();
        sources.insert(spec.id.clone(), source_info);

        let mut loaded = HashMap::new();
        let result = rt.block_on(load_components_from_sources(
            &cache,
            &engine,
            &sources,
            &ComponentResolution::default(),
            &[spec],
            &mut missing,
            &mut loaded,
            Some(temp.path()),
            None,
        ));

        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("bundled digest mismatch"),
            "unexpected error: {err}"
        );
    }
}
2911
2912#[cfg(test)]
2913mod pack_resolution_prop_tests {
2914 use super::*;
2915 use greentic_types::{ArtifactLocationV1, ComponentSourceEntryV1, ResolvedComponentV1};
2916 use proptest::prelude::*;
2917 use proptest::test_runner::{Config as ProptestConfig, RngAlgorithm, TestRng, TestRunner};
2918 use std::collections::BTreeSet;
2919 use std::path::Path;
2920 use std::str::FromStr;
2921
2922 #[derive(Clone, Debug)]
2923 enum ResolveRequest {
2924 ById(String),
2925 ByName(String),
2926 }
2927
2928 #[derive(Clone, Debug, PartialEq, Eq)]
2929 struct ResolvedComponent {
2930 key: String,
2931 source: String,
2932 artifact: String,
2933 digest: Option<String>,
2934 expected_wasm_sha256: Option<String>,
2935 skip_digest_verification: bool,
2936 }
2937
2938 #[derive(Clone, Debug, PartialEq, Eq)]
2939 struct ResolveError {
2940 code: String,
2941 message: String,
2942 context_key: String,
2943 }
2944
2945 #[derive(Clone, Debug)]
2946 struct Scenario {
2947 pack_lock: Option<PackLockV1>,
2948 component_sources: Option<ComponentSourcesV1>,
2949 request: ResolveRequest,
2950 expected_sha256: Option<String>,
2951 bytes: Vec<u8>,
2952 }
2953
2954 fn resolve_component_test(
2955 sources: Option<&ComponentSourcesV1>,
2956 lock: Option<&PackLockV1>,
2957 request: &ResolveRequest,
2958 ) -> Result<ResolvedComponent, ResolveError> {
2959 let table = if let Some(lock) = lock {
2960 component_sources_table_from_pack_lock(lock, false).map_err(|err| ResolveError {
2961 code: classify_pack_lock_error(err.to_string().as_str()).to_string(),
2962 message: err.to_string(),
2963 context_key: request_key(request).to_string(),
2964 })?
2965 } else {
2966 let sources = component_sources_table(sources).map_err(|err| ResolveError {
2967 code: "component_sources_error".to_string(),
2968 message: err.to_string(),
2969 context_key: request_key(request).to_string(),
2970 })?;
2971 sources.ok_or_else(|| ResolveError {
2972 code: "missing_component_sources".to_string(),
2973 message: "component sources not provided".to_string(),
2974 context_key: request_key(request).to_string(),
2975 })?
2976 };
2977
2978 let key = request_key(request);
2979 let source = table.get(key).ok_or_else(|| ResolveError {
2980 code: "component_not_found".to_string(),
2981 message: format!("component {key} not found"),
2982 context_key: key.to_string(),
2983 })?;
2984
2985 Ok(ResolvedComponent {
2986 key: key.to_string(),
2987 source: source.source.to_string(),
2988 artifact: match source.artifact {
2989 ComponentArtifactLocation::Inline { .. } => "inline".to_string(),
2990 ComponentArtifactLocation::Remote => "remote".to_string(),
2991 },
2992 digest: source.digest.clone(),
2993 expected_wasm_sha256: source.expected_wasm_sha256.clone(),
2994 skip_digest_verification: source.skip_digest_verification,
2995 })
2996 }
2997
2998 fn request_key(request: &ResolveRequest) -> &str {
2999 match request {
3000 ResolveRequest::ById(value) => value.as_str(),
3001 ResolveRequest::ByName(value) => value.as_str(),
3002 }
3003 }
3004
3005 fn classify_pack_lock_error(message: &str) -> &'static str {
3006 if message.contains("duplicate component name") {
3007 "duplicate_name"
3008 } else if message.contains("duplicate component id") {
3009 "duplicate_id"
3010 } else if message.contains("conflicting refs") {
3011 "conflicting_ref"
3012 } else if message.contains("conflicting bundled paths") {
3013 "conflicting_bundled_path"
3014 } else if message.contains("conflicting wasm_sha256") {
3015 "conflicting_wasm_sha256"
3016 } else if message.contains("missing source_ref") {
3017 "missing_source_ref"
3018 } else if message.contains("marked bundled but bundled_path is missing") {
3019 "missing_bundled_path"
3020 } else if message.contains("missing wasm_sha256") {
3021 "missing_wasm_sha256"
3022 } else if message.contains("tag ref") && message.contains("not bundled") {
3023 "tag_ref_requires_bundle"
3024 } else if message.contains("missing resolved_digest") {
3025 "missing_resolved_digest"
3026 } else if message.contains("invalid component ref") {
3027 "invalid_component_ref"
3028 } else if message.contains("sha256 digest") {
3029 "invalid_sha256"
3030 } else {
3031 "unknown_error"
3032 }
3033 }
3034
3035 fn known_error_codes() -> BTreeSet<&'static str> {
3036 [
3037 "component_sources_error",
3038 "missing_component_sources",
3039 "component_not_found",
3040 "duplicate_name",
3041 "duplicate_id",
3042 "conflicting_ref",
3043 "conflicting_bundled_path",
3044 "conflicting_wasm_sha256",
3045 "missing_source_ref",
3046 "missing_bundled_path",
3047 "missing_wasm_sha256",
3048 "tag_ref_requires_bundle",
3049 "missing_resolved_digest",
3050 "invalid_component_ref",
3051 "invalid_sha256",
3052 "unknown_error",
3053 ]
3054 .into_iter()
3055 .collect()
3056 }
3057
3058 fn proptest_config() -> ProptestConfig {
3059 let cases = std::env::var("PROPTEST_CASES")
3060 .ok()
3061 .and_then(|value| value.parse::<u32>().ok())
3062 .unwrap_or(128);
3063 ProptestConfig {
3064 cases,
3065 failure_persistence: None,
3066 ..ProptestConfig::default()
3067 }
3068 }
3069
3070 fn proptest_seed() -> Option<[u8; 32]> {
3071 let seed = std::env::var("PROPTEST_SEED")
3072 .ok()
3073 .and_then(|value| value.parse::<u64>().ok())?;
3074 let mut bytes = [0u8; 32];
3075 bytes[..8].copy_from_slice(&seed.to_le_bytes());
3076 Some(bytes)
3077 }
3078
3079 fn run_cases(strategy: impl Strategy<Value = Scenario>, cases: u32, seed: Option<[u8; 32]>) {
3080 let config = ProptestConfig {
3081 cases,
3082 failure_persistence: None,
3083 ..ProptestConfig::default()
3084 };
3085 let mut runner = match seed {
3086 Some(bytes) => {
3087 TestRunner::new_with_rng(config, TestRng::from_seed(RngAlgorithm::ChaCha, &bytes))
3088 }
3089 None => TestRunner::new(config),
3090 };
3091 runner
3092 .run(&strategy, |scenario| {
3093 run_scenario(&scenario);
3094 Ok(())
3095 })
3096 .unwrap();
3097 }
3098
3099 fn run_scenario(scenario: &Scenario) {
3100 let known_codes = known_error_codes();
3101 let first = resolve_component_test(
3102 scenario.component_sources.as_ref(),
3103 scenario.pack_lock.as_ref(),
3104 &scenario.request,
3105 );
3106 let second = resolve_component_test(
3107 scenario.component_sources.as_ref(),
3108 scenario.pack_lock.as_ref(),
3109 &scenario.request,
3110 );
3111 assert_eq!(normalize_result(&first), normalize_result(&second));
3112
3113 if let Some(lock) = scenario.pack_lock.as_ref() {
3114 let lock_only = resolve_component_test(None, Some(lock), &scenario.request);
3115 assert_eq!(normalize_result(&first), normalize_result(&lock_only));
3116 }
3117
3118 if let Err(err) = first.as_ref() {
3119 assert!(
3120 known_codes.contains(err.code.as_str()),
3121 "unexpected error code {}: {}",
3122 err.code,
3123 err.message
3124 );
3125 }
3126
3127 if let Some(expected) = scenario.expected_sha256.as_deref() {
3128 let expected_ok =
3129 verify_wasm_sha256("test.component", expected, &scenario.bytes).is_ok();
3130 let actual = compute_sha256_digest_for(&scenario.bytes);
3131 if actual == normalize_sha256(expected).unwrap_or_default() {
3132 assert!(expected_ok, "expected sha256 match to succeed");
3133 } else {
3134 assert!(!expected_ok, "expected sha256 mismatch to fail");
3135 }
3136 }
3137 }
3138
3139 fn normalize_result(
3140 result: &Result<ResolvedComponent, ResolveError>,
3141 ) -> Result<ResolvedComponent, ResolveError> {
3142 match result {
3143 Ok(value) => Ok(value.clone()),
3144 Err(err) => Err(err.clone()),
3145 }
3146 }
3147
3148 fn scenario_strategy() -> impl Strategy<Value = Scenario> {
3149 let name = any::<u8>().prop_map(|n| format!("component{n}.core"));
3150 let alt_name = any::<u8>().prop_map(|n| format!("component_alt{n}.core"));
3151 let tag_ref = any::<bool>();
3152 let bundled = any::<bool>();
3153 let include_sha = any::<bool>();
3154 let include_component_id = any::<bool>();
3155 let request_by_id = any::<bool>();
3156 let use_lock = any::<bool>();
3157 let use_sources = any::<bool>();
3158 let bytes = prop::collection::vec(any::<u8>(), 1..64);
3159
3160 (
3161 name,
3162 alt_name,
3163 tag_ref,
3164 bundled,
3165 include_sha,
3166 include_component_id,
3167 request_by_id,
3168 use_lock,
3169 use_sources,
3170 bytes,
3171 )
3172 .prop_map(
3173 |(
3174 name,
3175 alt_name,
3176 tag_ref,
3177 bundled,
3178 include_sha,
3179 include_component_id,
3180 request_by_id,
3181 use_lock,
3182 use_sources,
3183 bytes,
3184 )| {
3185 let component_id_str = if include_component_id {
3186 alt_name.clone()
3187 } else {
3188 name.clone()
3189 };
3190 let component_id = ComponentId::from_str(&component_id_str).ok();
3191 let source_ref = if tag_ref {
3192 format!("oci://registry.test/{name}:v1")
3193 } else {
3194 format!(
3195 "oci://registry.test/{name}@sha256:{}",
3196 hex::encode([0x11u8; 32])
3197 )
3198 };
3199 let expected_sha256 = if bundled && include_sha {
3200 Some(compute_sha256_digest_for(&bytes))
3201 } else {
3202 None
3203 };
3204
3205 let lock_component = PackLockComponent {
3206 name: name.clone(),
3207 source_ref: Some(source_ref),
3208 legacy_ref: None,
3209 component_id,
3210 bundled: Some(bundled),
3211 bundled_path: if bundled {
3212 Some(format!("components/{name}.wasm"))
3213 } else {
3214 None
3215 },
3216 legacy_path: None,
3217 wasm_sha256: expected_sha256.clone(),
3218 legacy_sha256: None,
3219 resolved_digest: if bundled {
3220 None
3221 } else {
3222 Some("sha256:deadbeef".to_string())
3223 },
3224 digest: None,
3225 };
3226
3227 let pack_lock = if use_lock {
3228 Some(PackLockV1 {
3229 schema_version: 1,
3230 components: vec![lock_component],
3231 })
3232 } else {
3233 None
3234 };
3235
3236 let component_sources = if use_sources {
3237 Some(ComponentSourcesV1::new(vec![ComponentSourceEntryV1 {
3238 name: name.clone(),
3239 component_id: ComponentId::from_str(&name).ok(),
3240 source: ComponentSourceRef::from_str(
3241 "oci://registry.test/component@sha256:deadbeef",
3242 )
3243 .expect("component ref"),
3244 resolved: ResolvedComponentV1 {
3245 digest: "sha256:deadbeef".to_string(),
3246 signature: None,
3247 signed_by: None,
3248 },
3249 artifact: if bundled {
3250 ArtifactLocationV1::Inline {
3251 wasm_path: format!("components/{name}.wasm"),
3252 manifest_path: None,
3253 }
3254 } else {
3255 ArtifactLocationV1::Remote
3256 },
3257 licensing_hint: None,
3258 metering_hint: None,
3259 }]))
3260 } else {
3261 None
3262 };
3263
3264 let request = if request_by_id {
3265 ResolveRequest::ById(component_id_str.clone())
3266 } else {
3267 ResolveRequest::ByName(name.clone())
3268 };
3269
3270 Scenario {
3271 pack_lock,
3272 component_sources,
3273 request,
3274 expected_sha256,
3275 bytes,
3276 }
3277 },
3278 )
3279 }
3280
/// Runs the resolution property over `proptest_config().cases` generated
/// scenarios, seeded with whatever `proptest_seed()` returns.
#[test]
fn pack_resolution_proptest() {
    let seed = proptest_seed();
    let strategy = scenario_strategy();
    let case_count = proptest_config().cases;
    run_cases(strategy, case_count, seed);
}
3286
/// Replays the seeds stored in `tests/fixtures/proptest-seeds.txt`.
///
/// The fixture holds one unsigned integer per line; blank lines and lines
/// starting with `#` are ignored. Each integer is written little-endian into
/// the first eight bytes of an otherwise zeroed 32-byte seed, and a single
/// case is run with that seed.
#[test]
fn pack_resolution_regression_seeds() {
    let seeds_path =
        Path::new(env!("CARGO_MANIFEST_DIR")).join("../../tests/fixtures/proptest-seeds.txt");
    let raw = std::fs::read_to_string(&seeds_path).expect("read proptest seeds");

    let seed_lines = raw
        .lines()
        .map(str::trim)
        .filter(|entry| !entry.is_empty() && !entry.starts_with('#'));

    for entry in seed_lines {
        let seed: u64 = entry.parse().expect("seed must be an integer");
        let mut rng_seed = [0u8; 32];
        rng_seed[..8].copy_from_slice(&seed.to_le_bytes());
        run_cases(scenario_strategy(), 1, Some(rng_seed));
    }
}
3303}
3304
3305fn locate_pack_assets(
3306 materialized_root: Option<&Path>,
3307 archive_hint: Option<&Path>,
3308) -> Result<(Option<PathBuf>, Option<TempDir>)> {
3309 if let Some(root) = materialized_root {
3310 let assets = root.join("assets");
3311 if assets.is_dir() {
3312 return Ok((Some(assets), None));
3313 }
3314 }
3315 if let Some(path) = archive_hint
3316 && let Some((tempdir, assets)) = extract_assets_from_archive(path)?
3317 {
3318 return Ok((Some(assets), Some(tempdir)));
3319 }
3320 Ok((None, None))
3321}
3322
3323fn extract_assets_from_archive(path: &Path) -> Result<Option<(TempDir, PathBuf)>> {
3324 let file =
3325 File::open(path).with_context(|| format!("failed to open pack {}", path.display()))?;
3326 let mut archive =
3327 ZipArchive::new(file).with_context(|| format!("failed to read pack {}", path.display()))?;
3328 let temp = TempDir::new().context("failed to create temporary assets directory")?;
3329 let mut found = false;
3330 for idx in 0..archive.len() {
3331 let mut entry = archive.by_index(idx)?;
3332 let name = entry.name();
3333 if !name.starts_with("assets/") {
3334 continue;
3335 }
3336 let dest = temp.path().join(name);
3337 if name.ends_with('/') {
3338 std::fs::create_dir_all(&dest)?;
3339 found = true;
3340 continue;
3341 }
3342 if let Some(parent) = dest.parent() {
3343 std::fs::create_dir_all(parent)?;
3344 }
3345 let mut outfile = std::fs::File::create(&dest)?;
3346 std::io::copy(&mut entry, &mut outfile)?;
3347 found = true;
3348 }
3349 if found {
3350 let assets_path = temp.path().join("assets");
3351 Ok(Some((temp, assets_path)))
3352 } else {
3353 Ok(None)
3354 }
3355}
3356
3357fn dist_options_from(component_resolution: &ComponentResolution) -> DistOptions {
3358 let mut opts = DistOptions {
3359 allow_tags: true,
3360 ..DistOptions::default()
3361 };
3362 if let Some(cache_dir) = component_resolution.dist_cache_dir.clone() {
3363 opts.cache_dir = cache_dir;
3364 }
3365 if component_resolution.dist_offline {
3366 opts.offline = true;
3367 }
3368 opts
3369}
3370
3371#[allow(clippy::too_many_arguments)]
3372async fn load_components_from_sources(
3373 cache: &CacheManager,
3374 engine: &Engine,
3375 component_sources: &HashMap<String, ComponentSourceInfo>,
3376 component_resolution: &ComponentResolution,
3377 specs: &[ComponentSpec],
3378 missing: &mut HashSet<String>,
3379 into: &mut HashMap<String, PackComponent>,
3380 materialized_root: Option<&Path>,
3381 archive_hint: Option<&Path>,
3382) -> Result<()> {
3383 let mut archive = if let Some(path) = archive_hint {
3384 Some(
3385 ZipArchive::new(File::open(path)?)
3386 .with_context(|| format!("{} is not a valid gtpack", path.display()))?,
3387 )
3388 } else {
3389 None
3390 };
3391 let mut dist_client: Option<DistClient> = None;
3392
3393 for spec in specs {
3394 if !missing.contains(&spec.id) {
3395 continue;
3396 }
3397 let Some(source) = component_sources.get(&spec.id) else {
3398 continue;
3399 };
3400
3401 let bytes = match &source.artifact {
3402 ComponentArtifactLocation::Inline { wasm_path } => {
3403 if let Some(root) = materialized_root {
3404 let path = root.join(wasm_path);
3405 if path.exists() {
3406 std::fs::read(&path).with_context(|| {
3407 format!(
3408 "failed to read inline component {} from {}",
3409 spec.id,
3410 path.display()
3411 )
3412 })?
3413 } else if archive.is_none() {
3414 bail!("inline component {} missing at {}", spec.id, path.display());
3415 } else {
3416 read_entry(
3417 archive.as_mut().expect("archive present when needed"),
3418 wasm_path,
3419 )
3420 .with_context(|| {
3421 format!(
3422 "inline component {} missing at {} in pack archive",
3423 spec.id, wasm_path
3424 )
3425 })?
3426 }
3427 } else if let Some(archive) = archive.as_mut() {
3428 read_entry(archive, wasm_path).with_context(|| {
3429 format!(
3430 "inline component {} missing at {} in pack archive",
3431 spec.id, wasm_path
3432 )
3433 })?
3434 } else {
3435 bail!(
3436 "inline component {} missing and no pack source available",
3437 spec.id
3438 );
3439 }
3440 }
3441 ComponentArtifactLocation::Remote => {
3442 if source.source.is_tag() {
3443 bail!(
3444 "component {} uses tag ref {} but is not bundled; rebuild the pack",
3445 spec.id,
3446 source.source
3447 );
3448 }
3449 let client = dist_client.get_or_insert_with(|| {
3450 DistClient::new(dist_options_from(component_resolution))
3451 });
3452 let reference = source.source.to_string();
3453 fault::maybe_fail_asset(&reference)
3454 .await
3455 .with_context(|| format!("fault injection blocked asset {reference}"))?;
3456 let digest = source.digest.as_deref().ok_or_else(|| {
3457 anyhow!(
3458 "component {} missing expected digest for remote component",
3459 spec.id
3460 )
3461 })?;
3462 let cache_path = if component_resolution.dist_offline {
3463 client
3464 .fetch_digest(digest)
3465 .await
3466 .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?
3467 } else {
3468 let resolved = client
3469 .resolve_ref(&reference)
3470 .await
3471 .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?;
3472 let expected = normalize_digest(digest);
3473 let actual = normalize_digest(&resolved.digest);
3474 if expected != actual {
3475 bail!(
3476 "component {} digest mismatch after fetch: expected {}, got {}",
3477 spec.id,
3478 expected,
3479 actual
3480 );
3481 }
3482 resolved.cache_path.ok_or_else(|| {
3483 anyhow!(
3484 "component {} resolved from {} but cache path is missing",
3485 spec.id,
3486 reference
3487 )
3488 })?
3489 };
3490 std::fs::read(&cache_path).with_context(|| {
3491 format!(
3492 "failed to read cached component {} from {}",
3493 spec.id,
3494 cache_path.display()
3495 )
3496 })?
3497 }
3498 };
3499
3500 if let Some(expected) = source.expected_wasm_sha256.as_deref() {
3501 verify_wasm_sha256(&spec.id, expected, &bytes)?;
3502 } else if source.skip_digest_verification {
3503 let actual = compute_sha256_digest_for(&bytes);
3504 warn!(
3505 component_id = %spec.id,
3506 digest = %actual,
3507 "bundled component missing wasm_sha256; allowing due to flag"
3508 );
3509 } else {
3510 let expected = source.digest.as_deref().ok_or_else(|| {
3511 anyhow!(
3512 "component {} missing expected digest for verification",
3513 spec.id
3514 )
3515 })?;
3516 verify_component_digest(&spec.id, expected, &bytes)?;
3517 }
3518 let component =
3519 compile_component_with_cache(cache, engine, source.digest.as_deref(), bytes)
3520 .await
3521 .with_context(|| format!("failed to compile component {}", spec.id))?;
3522 into.insert(
3523 spec.id.clone(),
3524 PackComponent {
3525 name: spec.id.clone(),
3526 version: spec.version.clone(),
3527 component,
3528 },
3529 );
3530 missing.remove(&spec.id);
3531 }
3532
3533 Ok(())
3534}
3535
3536fn dist_error_for_component(err: DistError, component_id: &str, reference: &str) -> anyhow::Error {
3537 match err {
3538 DistError::CacheMiss { reference: missing } => anyhow!(
3539 "remote component {} is not cached for {}. Run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
3540 component_id,
3541 missing,
3542 reference
3543 ),
3544 DistError::Offline { reference: blocked } => anyhow!(
3545 "offline mode blocked fetching component {} from {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
3546 component_id,
3547 blocked,
3548 reference
3549 ),
3550 DistError::AuthRequired { target } => anyhow!(
3551 "component {} requires authenticated source {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
3552 component_id,
3553 target,
3554 reference
3555 ),
3556 other => anyhow!(
3557 "failed to resolve component {} from {}: {}",
3558 component_id,
3559 reference,
3560 other
3561 ),
3562 }
3563}
3564
3565async fn load_components_from_overrides(
3566 cache: &CacheManager,
3567 engine: &Engine,
3568 overrides: &HashMap<String, PathBuf>,
3569 specs: &[ComponentSpec],
3570 missing: &mut HashSet<String>,
3571 into: &mut HashMap<String, PackComponent>,
3572) -> Result<()> {
3573 for spec in specs {
3574 if !missing.contains(&spec.id) {
3575 continue;
3576 }
3577 let Some(path) = overrides.get(&spec.id) else {
3578 continue;
3579 };
3580 let bytes = std::fs::read(path)
3581 .with_context(|| format!("failed to read override component {}", path.display()))?;
3582 let component = compile_component_with_cache(cache, engine, None, bytes)
3583 .await
3584 .with_context(|| {
3585 format!(
3586 "failed to compile component {} from override {}",
3587 spec.id,
3588 path.display()
3589 )
3590 })?;
3591 into.insert(
3592 spec.id.clone(),
3593 PackComponent {
3594 name: spec.id.clone(),
3595 version: spec.version.clone(),
3596 component,
3597 },
3598 );
3599 missing.remove(&spec.id);
3600 }
3601 Ok(())
3602}
3603
3604async fn load_components_from_dir(
3605 cache: &CacheManager,
3606 engine: &Engine,
3607 root: &Path,
3608 specs: &[ComponentSpec],
3609 missing: &mut HashSet<String>,
3610 into: &mut HashMap<String, PackComponent>,
3611) -> Result<()> {
3612 for spec in specs {
3613 if !missing.contains(&spec.id) {
3614 continue;
3615 }
3616 let path = component_path_for_spec(root, spec);
3617 if !path.exists() {
3618 tracing::debug!(component = %spec.id, path = %path.display(), "materialized component missing; will try other sources");
3619 continue;
3620 }
3621 let bytes = std::fs::read(&path)
3622 .with_context(|| format!("failed to read component {}", path.display()))?;
3623 let component = compile_component_with_cache(cache, engine, None, bytes)
3624 .await
3625 .with_context(|| {
3626 format!(
3627 "failed to compile component {} from {}",
3628 spec.id,
3629 path.display()
3630 )
3631 })?;
3632 into.insert(
3633 spec.id.clone(),
3634 PackComponent {
3635 name: spec.id.clone(),
3636 version: spec.version.clone(),
3637 component,
3638 },
3639 );
3640 missing.remove(&spec.id);
3641 }
3642 Ok(())
3643}
3644
3645async fn load_components_from_archive(
3646 cache: &CacheManager,
3647 engine: &Engine,
3648 path: &Path,
3649 specs: &[ComponentSpec],
3650 missing: &mut HashSet<String>,
3651 into: &mut HashMap<String, PackComponent>,
3652) -> Result<()> {
3653 let mut archive = ZipArchive::new(File::open(path)?)
3654 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
3655 for spec in specs {
3656 if !missing.contains(&spec.id) {
3657 continue;
3658 }
3659 let file_name = spec
3660 .legacy_path
3661 .clone()
3662 .unwrap_or_else(|| format!("components/{}.wasm", spec.id));
3663 let bytes = match read_entry(&mut archive, &file_name) {
3664 Ok(bytes) => bytes,
3665 Err(err) => {
3666 warn!(component = %spec.id, pack = %path.display(), error = %err, "component entry missing in pack archive");
3667 continue;
3668 }
3669 };
3670 let component = compile_component_with_cache(cache, engine, None, bytes)
3671 .await
3672 .with_context(|| format!("failed to compile component {}", spec.id))?;
3673 into.insert(
3674 spec.id.clone(),
3675 PackComponent {
3676 name: spec.id.clone(),
3677 version: spec.version.clone(),
3678 component,
3679 },
3680 );
3681 missing.remove(&spec.id);
3682 }
3683 Ok(())
3684}
3685
#[cfg(test)]
mod tests {
    use super::*;
    use greentic_flow::model::{FlowDoc, NodeDoc};
    use indexmap::IndexMap;
    use serde_json::json;

    /// A node written in the raw shorthand (`<component>: <input>`) must be
    /// rewritten into a `component.exec` node whose payload names the
    /// component, the `render` operation and the original input.
    #[test]
    fn normalizes_raw_component_to_component_exec() {
        let mut raw = IndexMap::new();
        raw.insert(
            "templating.handlebars".into(),
            json!({ "template": "Hi {{name}}" }),
        );
        let start_node = NodeDoc {
            raw,
            routing: json!([{"out": true}]),
            ..Default::default()
        };
        let mut nodes = IndexMap::new();
        nodes.insert("start".into(), start_node);

        let normalized = normalize_flow_doc(FlowDoc {
            id: "welcome".into(),
            title: None,
            description: None,
            flow_type: "messaging".into(),
            start: Some("start".into()),
            parameters: json!({}),
            tags: Vec::new(),
            schema_version: None,
            entrypoints: IndexMap::new(),
            nodes,
        });

        let node = normalized.nodes.get("start").expect("node exists");
        assert_eq!(node.operation.as_deref(), Some("component.exec"));
        assert!(node.raw.is_empty());

        let payload = node.payload.as_object().expect("payload object");
        for (key, expected) in [
            ("component", "templating.handlebars"),
            ("operation", "render"),
        ] {
            assert_eq!(payload.get(key), Some(&Value::String(expected.into())));
        }
        assert_eq!(
            payload.get("input"),
            Some(&json!({ "template": "Hi {{name}}" }))
        );
    }
}
3739
3740#[derive(Clone, Debug, Default, Serialize, Deserialize)]
3741pub struct PackMetadata {
3742 pub pack_id: String,
3743 pub version: String,
3744 #[serde(default)]
3745 pub entry_flows: Vec<String>,
3746 #[serde(default)]
3747 pub secret_requirements: Vec<greentic_types::SecretRequirement>,
3748}
3749
3750impl PackMetadata {
3751 fn from_wasm(bytes: &[u8]) -> Option<Self> {
3752 let parser = Parser::new(0);
3753 for payload in parser.parse_all(bytes) {
3754 let payload = payload.ok()?;
3755 match payload {
3756 Payload::CustomSection(section) => {
3757 if section.name() == "greentic.manifest"
3758 && let Ok(meta) = Self::from_bytes(section.data())
3759 {
3760 return Some(meta);
3761 }
3762 }
3763 Payload::DataSection(reader) => {
3764 for segment in reader.into_iter().flatten() {
3765 if let Ok(meta) = Self::from_bytes(segment.data) {
3766 return Some(meta);
3767 }
3768 }
3769 }
3770 _ => {}
3771 }
3772 }
3773 None
3774 }
3775
3776 fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
3777 #[derive(Deserialize)]
3778 struct RawManifest {
3779 pack_id: String,
3780 version: String,
3781 #[serde(default)]
3782 entry_flows: Vec<String>,
3783 #[serde(default)]
3784 flows: Vec<RawFlow>,
3785 #[serde(default)]
3786 secret_requirements: Vec<greentic_types::SecretRequirement>,
3787 }
3788
3789 #[derive(Deserialize)]
3790 struct RawFlow {
3791 id: String,
3792 }
3793
3794 let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
3795 let mut entry_flows = if manifest.entry_flows.is_empty() {
3796 manifest.flows.iter().map(|f| f.id.clone()).collect()
3797 } else {
3798 manifest.entry_flows.clone()
3799 };
3800 entry_flows.retain(|id| !id.is_empty());
3801 Ok(Self {
3802 pack_id: manifest.pack_id,
3803 version: manifest.version,
3804 entry_flows,
3805 secret_requirements: manifest.secret_requirements,
3806 })
3807 }
3808
3809 pub fn fallback(path: &Path) -> Self {
3810 let pack_id = path
3811 .file_stem()
3812 .map(|s| s.to_string_lossy().into_owned())
3813 .unwrap_or_else(|| "unknown-pack".to_string());
3814 Self {
3815 pack_id,
3816 version: "0.0.0".to_string(),
3817 entry_flows: Vec::new(),
3818 secret_requirements: Vec::new(),
3819 }
3820 }
3821
3822 pub fn from_manifest(manifest: &greentic_types::PackManifest) -> Self {
3823 let entry_flows = manifest
3824 .flows
3825 .iter()
3826 .map(|flow| flow.id.as_str().to_string())
3827 .collect::<Vec<_>>();
3828 Self {
3829 pack_id: manifest.pack_id.as_str().to_string(),
3830 version: manifest.version.to_string(),
3831 entry_flows,
3832 secret_requirements: manifest.secret_requirements.clone(),
3833 }
3834 }
3835}