1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::cache::{ArtifactKey, CacheConfig, CacheManager, CpuPolicy, EngineProfile};
10use crate::component_api::{
11 self, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
12};
13use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
14use crate::provider::{ProviderBinding, ProviderRegistry};
15use crate::provider_core::{
16 schema_core::SchemaCorePre as LegacySchemaCorePre,
17 schema_core_schema::SchemaCorePre as SchemaSchemaCorePre,
18};
19use crate::provider_core_only;
20use crate::runtime_wasmtime::{Component, Engine, InstancePre, Linker, ResourceTable};
21use anyhow::{Context, Result, anyhow, bail};
22use futures::executor::block_on;
23use greentic_distributor_client::dist::{DistClient, DistError, DistOptions};
24use greentic_interfaces_wasmtime::host_helpers::v1::{
25 self as host_v1, HostFns, add_all_v1_to_linker,
26 runner_host_http::RunnerHostHttp,
27 runner_host_kv::RunnerHostKv,
28 secrets_store::{SecretsError, SecretsErrorV1_1, SecretsStoreHost, SecretsStoreHostV1_1},
29 state_store::{
30 OpAck as StateOpAck, StateKey as HostStateKey, StateStoreError as StateError,
31 StateStoreHost, TenantCtx as StateTenantCtx,
32 },
33 telemetry_logger::{
34 OpAck as TelemetryAck, SpanContext as TelemetrySpanContext,
35 TelemetryLoggerError as TelemetryError, TelemetryLoggerHost,
36 TenantCtx as TelemetryTenantCtx,
37 },
38};
39use greentic_interfaces_wasmtime::http_client_client_v1_0::greentic::interfaces_types::types::Impersonation as ImpersonationV1_0;
40use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::http::http_client as http_client_client_alias;
41use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::interfaces_types::types::Impersonation as ImpersonationV1_1;
42use greentic_pack::builder as legacy_pack;
43use greentic_types::flow::FlowHasher;
44use greentic_types::{
45 ArtifactLocationV1, ComponentId, ComponentManifest, ComponentSourceRef, ComponentSourcesV1,
46 EXT_COMPONENT_SOURCES_V1, EnvId, ExtensionRef, Flow, FlowComponentRef, FlowId, FlowKind,
47 FlowMetadata, InputMapping, Node, NodeId, OutputMapping, Routing, StateKey as StoreStateKey,
48 TeamId, TelemetryHints, TenantCtx as TypesTenantCtx, TenantId, UserId, decode_pack_manifest,
49 pack_manifest::ExtensionInline,
50};
51use host_v1::http_client as host_http_client;
52use host_v1::http_client::{
53 HttpClientError, HttpClientErrorV1_1, HttpClientHost, HttpClientHostV1_1,
54 Request as HttpRequest, RequestOptionsV1_1 as HttpRequestOptionsV1_1,
55 RequestV1_1 as HttpRequestV1_1, Response as HttpResponse, ResponseV1_1 as HttpResponseV1_1,
56 TenantCtx as HttpTenantCtx, TenantCtxV1_1 as HttpTenantCtxV1_1,
57};
58use indexmap::IndexMap;
59use once_cell::sync::Lazy;
60use parking_lot::{Mutex, RwLock};
61use reqwest::blocking::Client as BlockingClient;
62use runner_core::normalize_under_root;
63use serde::{Deserialize, Serialize};
64use serde_cbor;
65use serde_json::{self, Value};
66use sha2::Digest;
67use tempfile::TempDir;
68use tokio::fs;
69use wasmparser::{Parser, Payload};
70use wasmtime::{Store, StoreContextMut};
71use zip::ZipArchive;
72
73use crate::runner::engine::{FlowContext, FlowEngine, FlowStatus};
74use crate::runner::flow_adapter::{FlowIR, flow_doc_to_ir, flow_ir_to_flow};
75use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
76#[cfg(feature = "fault-injection")]
77use crate::testing::fault_injection::{FaultContext, FaultPoint, maybe_fail};
78
79use crate::config::HostConfig;
80use crate::fault;
81use crate::secrets::{DynSecretsManager, read_secret_blocking, write_secret_blocking};
82use crate::storage::state::STATE_PREFIX;
83use crate::storage::{DynSessionStore, DynStateStore};
84use crate::verify;
85use crate::wasi::{PreopenSpec, RunnerWasiPolicy};
86use tracing::warn;
87use wasmtime_wasi::p2::add_to_linker_sync as add_wasi_to_linker;
88use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
89
90use greentic_flow::model::FlowDoc;
91
92#[allow(dead_code)]
93pub struct PackRuntime {
94 path: PathBuf,
96 archive_path: Option<PathBuf>,
98 config: Arc<HostConfig>,
99 engine: Engine,
100 metadata: PackMetadata,
101 manifest: Option<greentic_types::PackManifest>,
102 legacy_manifest: Option<Box<legacy_pack::PackManifest>>,
103 component_manifests: HashMap<String, ComponentManifest>,
104 mocks: Option<Arc<MockLayer>>,
105 flows: Option<PackFlows>,
106 components: HashMap<String, PackComponent>,
107 http_client: Arc<BlockingClient>,
108 pre_cache: Mutex<HashMap<String, InstancePre<ComponentState>>>,
109 session_store: Option<DynSessionStore>,
110 state_store: Option<DynStateStore>,
111 wasi_policy: Arc<RunnerWasiPolicy>,
112 assets_tempdir: Option<TempDir>,
113 provider_registry: RwLock<Option<ProviderRegistry>>,
114 secrets: DynSecretsManager,
115 oauth_config: Option<OAuthBrokerConfig>,
116 cache: CacheManager,
117}
118
119struct PackComponent {
120 #[allow(dead_code)]
121 name: String,
122 #[allow(dead_code)]
123 version: String,
124 component: Arc<Component>,
125}
126
127fn run_on_wasi_thread<F, T>(task_name: &'static str, task: F) -> Result<T>
128where
129 F: FnOnce() -> Result<T> + Send + 'static,
130 T: Send + 'static,
131{
132 let builder = std::thread::Builder::new().name(format!("greentic-wasmtime-{task_name}"));
133 let handle = builder
134 .spawn(move || {
135 let pid = std::process::id();
136 let thread_id = std::thread::current().id();
137 let tokio_handle_present = tokio::runtime::Handle::try_current().is_ok();
138 tracing::info!(
139 event = "wasmtime.thread.start",
140 task = task_name,
141 pid,
142 thread_id = ?thread_id,
143 tokio_handle_present,
144 "starting Wasmtime thread"
145 );
146 task()
147 })
148 .context("failed to spawn Wasmtime thread")?;
149 handle
150 .join()
151 .map_err(|err| {
152 let reason = if let Some(msg) = err.downcast_ref::<&str>() {
153 msg.to_string()
154 } else if let Some(msg) = err.downcast_ref::<String>() {
155 msg.clone()
156 } else {
157 "unknown panic".to_string()
158 };
159 anyhow!("Wasmtime thread panicked: {reason}")
160 })
161 .and_then(|res| res)
162}
163
164#[derive(Debug, Default, Clone)]
165pub struct ComponentResolution {
166 pub materialized_root: Option<PathBuf>,
168 pub overrides: HashMap<String, PathBuf>,
170 pub dist_offline: bool,
172 pub dist_cache_dir: Option<PathBuf>,
174 pub allow_missing_hash: bool,
176}
177
178fn build_blocking_client() -> BlockingClient {
179 std::thread::spawn(|| {
180 BlockingClient::builder()
181 .no_proxy()
182 .build()
183 .expect("blocking client")
184 })
185 .join()
186 .expect("client build thread panicked")
187}
188
189fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
190 let (root, candidate) = if path.is_absolute() {
191 let parent = path
192 .parent()
193 .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
194 let root = parent
195 .canonicalize()
196 .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
197 let file = path
198 .file_name()
199 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
200 (root, PathBuf::from(file))
201 } else {
202 let cwd = std::env::current_dir().context("failed to resolve current directory")?;
203 let base = if let Some(parent) = path.parent() {
204 cwd.join(parent)
205 } else {
206 cwd
207 };
208 let root = base
209 .canonicalize()
210 .with_context(|| format!("failed to canonicalize {}", base.display()))?;
211 let file = path
212 .file_name()
213 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
214 (root, PathBuf::from(file))
215 };
216 let safe = normalize_under_root(&root, &candidate)?;
217 Ok((root, safe))
218}
219
220static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
221
222#[derive(Debug, Clone, Serialize, Deserialize)]
223pub struct FlowDescriptor {
224 pub id: String,
225 #[serde(rename = "type")]
226 pub flow_type: String,
227 pub pack_id: String,
228 pub profile: String,
229 pub version: String,
230 #[serde(default)]
231 pub description: Option<String>,
232}
233
234pub struct HostState {
235 #[allow(dead_code)]
236 pack_id: String,
237 config: Arc<HostConfig>,
238 http_client: Arc<BlockingClient>,
239 default_env: String,
240 #[allow(dead_code)]
241 session_store: Option<DynSessionStore>,
242 state_store: Option<DynStateStore>,
243 mocks: Option<Arc<MockLayer>>,
244 secrets: DynSecretsManager,
245 oauth_config: Option<OAuthBrokerConfig>,
246 oauth_host: OAuthBrokerHost,
247 exec_ctx: Option<ComponentExecCtx>,
248 component_ref: Option<String>,
249 provider_core_component: bool,
250}
251
252impl HostState {
253 #[allow(clippy::default_constructed_unit_structs)]
254 #[allow(clippy::too_many_arguments)]
255 pub fn new(
256 pack_id: String,
257 config: Arc<HostConfig>,
258 http_client: Arc<BlockingClient>,
259 mocks: Option<Arc<MockLayer>>,
260 session_store: Option<DynSessionStore>,
261 state_store: Option<DynStateStore>,
262 secrets: DynSecretsManager,
263 oauth_config: Option<OAuthBrokerConfig>,
264 exec_ctx: Option<ComponentExecCtx>,
265 component_ref: Option<String>,
266 provider_core_component: bool,
267 ) -> Result<Self> {
268 let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
269 Ok(Self {
270 pack_id,
271 config,
272 http_client,
273 default_env,
274 session_store,
275 state_store,
276 mocks,
277 secrets,
278 oauth_config,
279 oauth_host: OAuthBrokerHost::default(),
280 exec_ctx,
281 component_ref,
282 provider_core_component,
283 })
284 }
285
286 fn instantiate_component_result(
287 linker: &mut Linker<ComponentState>,
288 store: &mut Store<ComponentState>,
289 component: &Component,
290 ctx: &ComponentExecCtx,
291 operation: &str,
292 input_json: &str,
293 ) -> Result<InvokeResult> {
294 let pre_instance = linker.instantiate_pre(component)?;
295 match component_api::v0_5::ComponentPre::new(pre_instance) {
296 Ok(pre) => {
297 let result = block_on(async {
298 let bindings = pre.instantiate_async(&mut *store).await?;
299 let node = bindings.greentic_component_node();
300 let ctx_v05 = component_api::exec_ctx_v0_5(ctx);
301 let operation_owned = operation.to_string();
302 let input_owned = input_json.to_string();
303 node.call_invoke(&mut *store, &ctx_v05, &operation_owned, &input_owned)
304 })?;
305 Ok(component_api::invoke_result_from_v0_5(result))
306 }
307 Err(err) => {
308 if is_missing_node_export(&err, "0.5.0") {
309 let pre_instance = linker.instantiate_pre(component)?;
310 let pre: component_api::v0_4::ComponentPre<ComponentState> =
311 component_api::v0_4::ComponentPre::new(pre_instance)?;
312 let result = block_on(async {
313 let bindings = pre.instantiate_async(&mut *store).await?;
314 let node = bindings.greentic_component_node();
315 let ctx_v04 = component_api::exec_ctx_v0_4(ctx);
316 let operation_owned = operation.to_string();
317 let input_owned = input_json.to_string();
318 node.call_invoke(&mut *store, &ctx_v04, &operation_owned, &input_owned)
319 })?;
320 Ok(component_api::invoke_result_from_v0_4(result))
321 } else {
322 Err(err)
323 }
324 }
325 }
326 }
327
328 fn convert_invoke_result(result: InvokeResult) -> Result<Value> {
329 match result {
330 InvokeResult::Ok(body) => {
331 if body.is_empty() {
332 return Ok(Value::Null);
333 }
334 serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
335 }
336 InvokeResult::Err(NodeError {
337 code,
338 message,
339 retryable,
340 backoff_ms,
341 details,
342 }) => {
343 let mut obj = serde_json::Map::new();
344 obj.insert("ok".into(), Value::Bool(false));
345 let mut error = serde_json::Map::new();
346 error.insert("code".into(), Value::String(code));
347 error.insert("message".into(), Value::String(message));
348 error.insert("retryable".into(), Value::Bool(retryable));
349 if let Some(backoff) = backoff_ms {
350 error.insert("backoff_ms".into(), Value::Number(backoff.into()));
351 }
352 if let Some(details) = details {
353 error.insert(
354 "details".into(),
355 serde_json::from_str(&details).unwrap_or(Value::String(details)),
356 );
357 }
358 obj.insert("error".into(), Value::Object(error));
359 Ok(Value::Object(obj))
360 }
361 }
362 }
363
364 pub fn get_secret(&self, key: &str) -> Result<String> {
365 if provider_core_only::is_enabled() {
366 bail!(provider_core_only::blocked_message("secrets"))
367 }
368 if !self.config.secrets_policy.is_allowed(key) {
369 bail!("secret {key} is not permitted by bindings policy");
370 }
371 if let Some(mock) = &self.mocks
372 && let Some(value) = mock.secrets_lookup(key)
373 {
374 return Ok(value);
375 }
376 let ctx = self.config.tenant_ctx();
377 let bytes = read_secret_blocking(&self.secrets, &ctx, key)
378 .context("failed to read secret from manager")?;
379 let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
380 Ok(value)
381 }
382
383 fn allows_secret_write_in_provider_core_only(&self) -> bool {
384 self.provider_core_component || self.component_ref.is_none()
385 }
386
387 fn tenant_ctx_from_v1(&self, ctx: Option<StateTenantCtx>) -> Result<TypesTenantCtx> {
388 let tenant_raw = ctx
389 .as_ref()
390 .map(|ctx| ctx.tenant.clone())
391 .or_else(|| self.exec_ctx.as_ref().map(|ctx| ctx.tenant.tenant.clone()))
392 .unwrap_or_else(|| self.config.tenant.clone());
393 let env_raw = ctx
394 .as_ref()
395 .map(|ctx| ctx.env.clone())
396 .unwrap_or_else(|| self.default_env.clone());
397 let tenant_id = TenantId::from_str(&tenant_raw)
398 .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
399 let env_id = EnvId::from_str(&env_raw)
400 .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
401 let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
402 if let Some(exec_ctx) = self.exec_ctx.as_ref() {
403 if let Some(team) = exec_ctx.tenant.team.as_ref() {
404 let team_id =
405 TeamId::from_str(team).with_context(|| format!("invalid team id `{team}`"))?;
406 tenant_ctx = tenant_ctx.with_team(Some(team_id));
407 }
408 if let Some(user) = exec_ctx.tenant.user.as_ref() {
409 let user_id =
410 UserId::from_str(user).with_context(|| format!("invalid user id `{user}`"))?;
411 tenant_ctx = tenant_ctx.with_user(Some(user_id));
412 }
413 tenant_ctx = tenant_ctx.with_flow(exec_ctx.flow_id.clone());
414 if let Some(node) = exec_ctx.node_id.as_ref() {
415 tenant_ctx = tenant_ctx.with_node(node.clone());
416 }
417 if let Some(session) = exec_ctx.tenant.correlation_id.as_ref() {
418 tenant_ctx = tenant_ctx.with_session(session.clone());
419 }
420 tenant_ctx.trace_id = exec_ctx.tenant.trace_id.clone();
421 }
422
423 if let Some(ctx) = ctx {
424 if let Some(team) = ctx.team.or(ctx.team_id) {
425 let team_id =
426 TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
427 tenant_ctx = tenant_ctx.with_team(Some(team_id));
428 }
429 if let Some(user) = ctx.user.or(ctx.user_id) {
430 let user_id =
431 UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
432 tenant_ctx = tenant_ctx.with_user(Some(user_id));
433 }
434 if let Some(flow) = ctx.flow_id {
435 tenant_ctx = tenant_ctx.with_flow(flow);
436 }
437 if let Some(node) = ctx.node_id {
438 tenant_ctx = tenant_ctx.with_node(node);
439 }
440 if let Some(provider) = ctx.provider_id {
441 tenant_ctx = tenant_ctx.with_provider(provider);
442 }
443 if let Some(session) = ctx.session_id {
444 tenant_ctx = tenant_ctx.with_session(session);
445 }
446 tenant_ctx.trace_id = ctx.trace_id;
447 }
448 Ok(tenant_ctx)
449 }
450
451 fn send_http_request(
452 &mut self,
453 req: HttpRequest,
454 opts: Option<HttpRequestOptionsV1_1>,
455 _ctx: Option<HttpTenantCtx>,
456 ) -> Result<HttpResponse, HttpClientError> {
457 if !self.config.http_enabled {
458 return Err(HttpClientError {
459 code: "denied".into(),
460 message: "http client disabled by policy".into(),
461 });
462 }
463
464 let mut mock_state = None;
465 let raw_body = req.body.clone();
466 if let Some(mock) = &self.mocks
467 && let Ok(meta) = HttpMockRequest::new(&req.method, &req.url, raw_body.as_deref())
468 {
469 match mock.http_begin(&meta) {
470 HttpDecision::Mock(response) => {
471 let headers = response
472 .headers
473 .iter()
474 .map(|(k, v)| (k.clone(), v.clone()))
475 .collect();
476 return Ok(HttpResponse {
477 status: response.status,
478 headers,
479 body: response.body.clone().map(|b| b.into_bytes()),
480 });
481 }
482 HttpDecision::Deny(reason) => {
483 return Err(HttpClientError {
484 code: "denied".into(),
485 message: reason,
486 });
487 }
488 HttpDecision::Passthrough { record } => {
489 mock_state = Some((meta, record));
490 }
491 }
492 }
493
494 let method = req.method.parse().unwrap_or(reqwest::Method::GET);
495 let mut builder = self.http_client.request(method, &req.url);
496 for (key, value) in req.headers {
497 if let Ok(header) = reqwest::header::HeaderName::from_bytes(key.as_bytes())
498 && let Ok(header_value) = reqwest::header::HeaderValue::from_str(&value)
499 {
500 builder = builder.header(header, header_value);
501 }
502 }
503
504 if let Some(body) = raw_body.clone() {
505 builder = builder.body(body);
506 }
507
508 if let Some(opts) = opts {
509 if let Some(timeout_ms) = opts.timeout_ms {
510 builder = builder.timeout(Duration::from_millis(timeout_ms as u64));
511 }
512 if opts.allow_insecure == Some(true) {
513 warn!(url = %req.url, "allow-insecure not supported; using default TLS validation");
514 }
515 if let Some(follow_redirects) = opts.follow_redirects
516 && !follow_redirects
517 {
518 warn!(url = %req.url, "follow-redirects=false not supported; using default client behaviour");
519 }
520 }
521
522 let response = match builder.send() {
523 Ok(resp) => resp,
524 Err(err) => {
525 warn!(url = %req.url, error = %err, "http client request failed");
526 return Err(HttpClientError {
527 code: "unavailable".into(),
528 message: err.to_string(),
529 });
530 }
531 };
532
533 let status = response.status().as_u16();
534 let headers_vec = response
535 .headers()
536 .iter()
537 .map(|(k, v)| {
538 (
539 k.as_str().to_string(),
540 v.to_str().unwrap_or_default().to_string(),
541 )
542 })
543 .collect::<Vec<_>>();
544 let body_bytes = response.bytes().ok().map(|b| b.to_vec());
545
546 if let Some((meta, true)) = mock_state.take()
547 && let Some(mock) = &self.mocks
548 {
549 let recorded = HttpMockResponse::new(
550 status,
551 headers_vec.clone().into_iter().collect(),
552 body_bytes
553 .as_ref()
554 .map(|b| String::from_utf8_lossy(b).into_owned()),
555 );
556 mock.http_record(&meta, &recorded);
557 }
558
559 Ok(HttpResponse {
560 status,
561 headers: headers_vec,
562 body: body_bytes,
563 })
564 }
565}
566
567impl SecretsStoreHost for HostState {
568 fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsError> {
569 if provider_core_only::is_enabled() {
570 warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
571 return Err(SecretsError::Denied);
572 }
573 if !self.config.secrets_policy.is_allowed(&key) {
574 return Err(SecretsError::Denied);
575 }
576 if let Some(mock) = &self.mocks
577 && let Some(value) = mock.secrets_lookup(&key)
578 {
579 return Ok(Some(value.into_bytes()));
580 }
581 let ctx = self.config.tenant_ctx();
582 match read_secret_blocking(&self.secrets, &ctx, &key) {
583 Ok(bytes) => Ok(Some(bytes)),
584 Err(err) => {
585 warn!(secret = %key, error = %err, "secret lookup failed");
586 Err(SecretsError::NotFound)
587 }
588 }
589 }
590}
591
592impl SecretsStoreHostV1_1 for HostState {
593 fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsErrorV1_1> {
594 if provider_core_only::is_enabled() {
595 warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
596 return Err(SecretsErrorV1_1::Denied);
597 }
598 if !self.config.secrets_policy.is_allowed(&key) {
599 return Err(SecretsErrorV1_1::Denied);
600 }
601 if let Some(mock) = &self.mocks
602 && let Some(value) = mock.secrets_lookup(&key)
603 {
604 return Ok(Some(value.into_bytes()));
605 }
606 let ctx = self.config.tenant_ctx();
607 match read_secret_blocking(&self.secrets, &ctx, &key) {
608 Ok(bytes) => Ok(Some(bytes)),
609 Err(err) => {
610 warn!(secret = %key, error = %err, "secret lookup failed");
611 Err(SecretsErrorV1_1::NotFound)
612 }
613 }
614 }
615
616 fn put(&mut self, key: String, value: Vec<u8>) {
617 if key.trim().is_empty() {
618 warn!(secret = %key, "secret write blocked: empty key");
619 panic!("secret write denied for key {key}: invalid key");
620 }
621 if provider_core_only::is_enabled() && !self.allows_secret_write_in_provider_core_only() {
622 warn!(
623 secret = %key,
624 component = self.component_ref.as_deref().unwrap_or("<pack>"),
625 "provider-core only mode enabled; blocking secrets store write"
626 );
627 panic!("secret write denied for key {key}: provider-core-only mode");
628 }
629 if !self.config.secrets_policy.is_allowed(&key) {
630 warn!(secret = %key, "secret write denied by bindings policy");
631 panic!("secret write denied for key {key}: policy");
632 }
633 let ctx = self.config.tenant_ctx();
634 if let Err(err) = write_secret_blocking(&self.secrets, &ctx, &key, &value) {
635 warn!(secret = %key, error = %err, "secret write failed");
636 panic!("secret write failed for key {key}");
637 }
638 }
639}
640
641impl HttpClientHost for HostState {
642 fn send(
643 &mut self,
644 req: HttpRequest,
645 ctx: Option<HttpTenantCtx>,
646 ) -> Result<HttpResponse, HttpClientError> {
647 self.send_http_request(req, None, ctx)
648 }
649}
650
651impl HttpClientHostV1_1 for HostState {
652 fn send(
653 &mut self,
654 req: HttpRequestV1_1,
655 opts: Option<HttpRequestOptionsV1_1>,
656 ctx: Option<HttpTenantCtxV1_1>,
657 ) -> Result<HttpResponseV1_1, HttpClientErrorV1_1> {
658 let legacy_req = HttpRequest {
659 method: req.method,
660 url: req.url,
661 headers: req.headers,
662 body: req.body,
663 };
664 let legacy_ctx = ctx.map(|ctx| HttpTenantCtx {
665 env: ctx.env,
666 tenant: ctx.tenant,
667 tenant_id: ctx.tenant_id,
668 team: ctx.team,
669 team_id: ctx.team_id,
670 user: ctx.user,
671 user_id: ctx.user_id,
672 trace_id: ctx.trace_id,
673 correlation_id: ctx.correlation_id,
674 attributes: ctx.attributes,
675 session_id: ctx.session_id,
676 flow_id: ctx.flow_id,
677 node_id: ctx.node_id,
678 provider_id: ctx.provider_id,
679 deadline_ms: ctx.deadline_ms,
680 attempt: ctx.attempt,
681 idempotency_key: ctx.idempotency_key,
682 impersonation: ctx
683 .impersonation
684 .map(|ImpersonationV1_1 { actor_id, reason }| ImpersonationV1_0 {
685 actor_id,
686 reason,
687 }),
688 });
689
690 self.send_http_request(legacy_req, opts, legacy_ctx)
691 .map(|resp| HttpResponseV1_1 {
692 status: resp.status,
693 headers: resp.headers,
694 body: resp.body,
695 })
696 .map_err(|err| HttpClientErrorV1_1 {
697 code: err.code,
698 message: err.message,
699 })
700 }
701}
702
703impl StateStoreHost for HostState {
704 fn read(
705 &mut self,
706 key: HostStateKey,
707 ctx: Option<StateTenantCtx>,
708 ) -> Result<Vec<u8>, StateError> {
709 let store = match self.state_store.as_ref() {
710 Some(store) => store.clone(),
711 None => {
712 return Err(StateError {
713 code: "unavailable".into(),
714 message: "state store not configured".into(),
715 });
716 }
717 };
718 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
719 Ok(ctx) => ctx,
720 Err(err) => {
721 return Err(StateError {
722 code: "invalid-ctx".into(),
723 message: err.to_string(),
724 });
725 }
726 };
727 #[cfg(feature = "fault-injection")]
728 {
729 let exec_ctx = self.exec_ctx.as_ref();
730 let flow_id = exec_ctx
731 .map(|ctx| ctx.flow_id.as_str())
732 .unwrap_or("unknown");
733 let node_id = exec_ctx.and_then(|ctx| ctx.node_id.as_deref());
734 let attempt = exec_ctx.map(|ctx| ctx.tenant.attempt).unwrap_or(1);
735 let fault_ctx = FaultContext {
736 pack_id: self.pack_id.as_str(),
737 flow_id,
738 node_id,
739 attempt,
740 };
741 if let Err(err) = maybe_fail(FaultPoint::StateRead, fault_ctx) {
742 return Err(StateError {
743 code: "internal".into(),
744 message: err.to_string(),
745 });
746 }
747 }
748 let key = StoreStateKey::from(key);
749 match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
750 Ok(Some(value)) => Ok(serde_json::to_vec(&value).unwrap_or_else(|_| Vec::new())),
751 Ok(None) => Err(StateError {
752 code: "not_found".into(),
753 message: "state key not found".into(),
754 }),
755 Err(err) => Err(StateError {
756 code: "internal".into(),
757 message: err.to_string(),
758 }),
759 }
760 }
761
762 fn write(
763 &mut self,
764 key: HostStateKey,
765 bytes: Vec<u8>,
766 ctx: Option<StateTenantCtx>,
767 ) -> Result<StateOpAck, StateError> {
768 let store = match self.state_store.as_ref() {
769 Some(store) => store.clone(),
770 None => {
771 return Err(StateError {
772 code: "unavailable".into(),
773 message: "state store not configured".into(),
774 });
775 }
776 };
777 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
778 Ok(ctx) => ctx,
779 Err(err) => {
780 return Err(StateError {
781 code: "invalid-ctx".into(),
782 message: err.to_string(),
783 });
784 }
785 };
786 #[cfg(feature = "fault-injection")]
787 {
788 let exec_ctx = self.exec_ctx.as_ref();
789 let flow_id = exec_ctx
790 .map(|ctx| ctx.flow_id.as_str())
791 .unwrap_or("unknown");
792 let node_id = exec_ctx.and_then(|ctx| ctx.node_id.as_deref());
793 let attempt = exec_ctx.map(|ctx| ctx.tenant.attempt).unwrap_or(1);
794 let fault_ctx = FaultContext {
795 pack_id: self.pack_id.as_str(),
796 flow_id,
797 node_id,
798 attempt,
799 };
800 if let Err(err) = maybe_fail(FaultPoint::StateWrite, fault_ctx) {
801 return Err(StateError {
802 code: "internal".into(),
803 message: err.to_string(),
804 });
805 }
806 }
807 let key = StoreStateKey::from(key);
808 let value = serde_json::from_slice(&bytes)
809 .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()));
810 match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
811 Ok(()) => Ok(StateOpAck::Ok),
812 Err(err) => Err(StateError {
813 code: "internal".into(),
814 message: err.to_string(),
815 }),
816 }
817 }
818
819 fn delete(
820 &mut self,
821 key: HostStateKey,
822 ctx: Option<StateTenantCtx>,
823 ) -> Result<StateOpAck, StateError> {
824 let store = match self.state_store.as_ref() {
825 Some(store) => store.clone(),
826 None => {
827 return Err(StateError {
828 code: "unavailable".into(),
829 message: "state store not configured".into(),
830 });
831 }
832 };
833 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
834 Ok(ctx) => ctx,
835 Err(err) => {
836 return Err(StateError {
837 code: "invalid-ctx".into(),
838 message: err.to_string(),
839 });
840 }
841 };
842 let key = StoreStateKey::from(key);
843 match store.del(&tenant_ctx, STATE_PREFIX, &key) {
844 Ok(_) => Ok(StateOpAck::Ok),
845 Err(err) => Err(StateError {
846 code: "internal".into(),
847 message: err.to_string(),
848 }),
849 }
850 }
851}
852
853impl TelemetryLoggerHost for HostState {
854 fn log(
855 &mut self,
856 span: TelemetrySpanContext,
857 fields: Vec<(String, String)>,
858 _ctx: Option<TelemetryTenantCtx>,
859 ) -> Result<TelemetryAck, TelemetryError> {
860 if let Some(mock) = &self.mocks
861 && mock.telemetry_drain(&[("span_json", span.flow_id.as_str())])
862 {
863 return Ok(TelemetryAck::Ok);
864 }
865 let mut map = serde_json::Map::new();
866 for (k, v) in fields {
867 map.insert(k, Value::String(v));
868 }
869 tracing::info!(
870 tenant = %span.tenant,
871 flow_id = %span.flow_id,
872 node = ?span.node_id,
873 provider = %span.provider,
874 fields = %serde_json::Value::Object(map.clone()),
875 "telemetry log from pack"
876 );
877 Ok(TelemetryAck::Ok)
878 }
879}
880
881impl RunnerHostHttp for HostState {
882 fn request(
883 &mut self,
884 method: String,
885 url: String,
886 headers: Vec<String>,
887 body: Option<Vec<u8>>,
888 ) -> Result<Vec<u8>, String> {
889 let req = HttpRequest {
890 method,
891 url,
892 headers: headers
893 .chunks(2)
894 .filter_map(|chunk| {
895 if chunk.len() == 2 {
896 Some((chunk[0].clone(), chunk[1].clone()))
897 } else {
898 None
899 }
900 })
901 .collect(),
902 body,
903 };
904 match HttpClientHost::send(self, req, None) {
905 Ok(resp) => Ok(resp.body.unwrap_or_default()),
906 Err(err) => Err(err.message),
907 }
908 }
909}
910
911impl RunnerHostKv for HostState {
912 fn get(&mut self, _ns: String, _key: String) -> Option<String> {
913 None
914 }
915
916 fn put(&mut self, _ns: String, _key: String, _val: String) {}
917}
918
919enum ManifestLoad {
920 New {
921 manifest: Box<greentic_types::PackManifest>,
922 flows: PackFlows,
923 },
924 Legacy {
925 manifest: Box<legacy_pack::PackManifest>,
926 flows: PackFlows,
927 },
928}
929
930fn load_manifest_and_flows(path: &Path) -> Result<ManifestLoad> {
931 let mut archive = ZipArchive::new(File::open(path)?)
932 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
933 let bytes = read_entry(&mut archive, "manifest.cbor")
934 .with_context(|| format!("missing manifest.cbor in {}", path.display()))?;
935 match decode_pack_manifest(&bytes) {
936 Ok(manifest) => {
937 let cache = PackFlows::from_manifest(manifest.clone());
938 Ok(ManifestLoad::New {
939 manifest: Box::new(manifest),
940 flows: cache,
941 })
942 }
943 Err(err) => {
944 tracing::debug!(
945 error = %err,
946 pack = %path.display(),
947 "decode_pack_manifest failed for archive; falling back to legacy manifest"
948 );
949 let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
950 .context("failed to decode legacy pack manifest from manifest.cbor")?;
951 let flows = load_legacy_flows_from_archive(&mut archive, path, &legacy)?;
952 Ok(ManifestLoad::Legacy {
953 manifest: Box::new(legacy),
954 flows,
955 })
956 }
957 }
958}
959
960fn load_manifest_and_flows_from_dir(root: &Path) -> Result<ManifestLoad> {
961 let manifest_path = root.join("manifest.cbor");
962 let bytes = std::fs::read(&manifest_path)
963 .with_context(|| format!("missing manifest.cbor in {}", root.display()))?;
964 match decode_pack_manifest(&bytes) {
965 Ok(manifest) => {
966 let cache = PackFlows::from_manifest(manifest.clone());
967 Ok(ManifestLoad::New {
968 manifest: Box::new(manifest),
969 flows: cache,
970 })
971 }
972 Err(err) => {
973 tracing::debug!(
974 error = %err,
975 pack = %root.display(),
976 "decode_pack_manifest failed for materialized pack; trying legacy manifest"
977 );
978 let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
979 .context("failed to decode legacy pack manifest from manifest.cbor")?;
980 let flows = load_legacy_flows_from_dir(root, &legacy)?;
981 Ok(ManifestLoad::Legacy {
982 manifest: Box::new(legacy),
983 flows,
984 })
985 }
986 }
987}
988
989fn load_legacy_flows_from_dir(
990 root: &Path,
991 manifest: &legacy_pack::PackManifest,
992) -> Result<PackFlows> {
993 build_legacy_flows(manifest, |rel_path| {
994 let path = root.join(rel_path);
995 std::fs::read(&path).with_context(|| format!("missing flow json {}", path.display()))
996 })
997}
998
999fn load_legacy_flows_from_archive(
1000 archive: &mut ZipArchive<File>,
1001 pack_path: &Path,
1002 manifest: &legacy_pack::PackManifest,
1003) -> Result<PackFlows> {
1004 build_legacy_flows(manifest, |rel_path| {
1005 read_entry(archive, rel_path)
1006 .with_context(|| format!("missing flow json {} in {}", rel_path, pack_path.display()))
1007 })
1008}
1009
1010fn build_legacy_flows(
1011 manifest: &legacy_pack::PackManifest,
1012 mut read_json: impl FnMut(&str) -> Result<Vec<u8>>,
1013) -> Result<PackFlows> {
1014 let mut flows = HashMap::new();
1015 let mut descriptors = Vec::new();
1016
1017 for entry in &manifest.flows {
1018 let bytes = read_json(&entry.file_json)
1019 .with_context(|| format!("missing flow json {}", entry.file_json))?;
1020 let doc = parse_flow_doc_with_legacy_aliases(&bytes, &entry.file_json)?;
1021 let normalized = normalize_flow_doc(doc);
1022 let flow_ir = flow_doc_to_ir(normalized)?;
1023 let flow = flow_ir_to_flow(flow_ir)?;
1024
1025 descriptors.push(FlowDescriptor {
1026 id: entry.id.clone(),
1027 flow_type: entry.kind.clone(),
1028 pack_id: manifest.meta.pack_id.clone(),
1029 profile: manifest.meta.pack_id.clone(),
1030 version: manifest.meta.version.to_string(),
1031 description: None,
1032 });
1033 flows.insert(entry.id.clone(), flow);
1034 }
1035
1036 let mut entry_flows = manifest.meta.entry_flows.clone();
1037 if entry_flows.is_empty() {
1038 entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
1039 }
1040 let metadata = PackMetadata {
1041 pack_id: manifest.meta.pack_id.clone(),
1042 version: manifest.meta.version.to_string(),
1043 entry_flows,
1044 secret_requirements: Vec::new(),
1045 };
1046
1047 Ok(PackFlows {
1048 descriptors,
1049 flows,
1050 metadata,
1051 })
1052}
1053
1054fn parse_flow_doc_with_legacy_aliases(bytes: &[u8], path: &str) -> Result<FlowDoc> {
1055 let mut value: Value = serde_json::from_slice(bytes)
1056 .with_context(|| format!("failed to decode flow doc {}", path))?;
1057 if let Some(map) = value.as_object_mut()
1058 && !map.contains_key("type")
1059 && let Some(flow_type) = map.remove("flow_type")
1060 {
1061 map.insert("type".to_string(), flow_type);
1062 }
1063 serde_json::from_value(value).with_context(|| format!("failed to decode flow doc {}", path))
1064}
1065
1066pub struct ComponentState {
1067 pub host: HostState,
1068 wasi_ctx: WasiCtx,
1069 resource_table: ResourceTable,
1070}
1071
1072impl ComponentState {
1073 pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
1074 let wasi_ctx = policy
1075 .instantiate()
1076 .context("failed to build WASI context")?;
1077 Ok(Self {
1078 host,
1079 wasi_ctx,
1080 resource_table: ResourceTable::new(),
1081 })
1082 }
1083
1084 fn host_mut(&mut self) -> &mut HostState {
1085 &mut self.host
1086 }
1087
1088 fn should_cancel_host(&mut self) -> bool {
1089 false
1090 }
1091
1092 fn yield_now_host(&mut self) {
1093 }
1095}
1096
1097impl component_api::v0_4::greentic::component::control::Host for ComponentState {
1098 fn should_cancel(&mut self) -> bool {
1099 self.should_cancel_host()
1100 }
1101
1102 fn yield_now(&mut self) {
1103 self.yield_now_host();
1104 }
1105}
1106
1107impl component_api::v0_5::greentic::component::control::Host for ComponentState {
1108 fn should_cancel(&mut self) -> bool {
1109 self.should_cancel_host()
1110 }
1111
1112 fn yield_now(&mut self) {
1113 self.yield_now_host();
1114 }
1115}
1116
1117fn add_component_control_instance(
1118 linker: &mut Linker<ComponentState>,
1119 name: &str,
1120) -> wasmtime::Result<()> {
1121 let mut inst = linker.instance(name)?;
1122 inst.func_wrap(
1123 "should-cancel",
1124 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
1125 let host = caller.data_mut();
1126 Ok((host.should_cancel_host(),))
1127 },
1128 )?;
1129 inst.func_wrap(
1130 "yield-now",
1131 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
1132 let host = caller.data_mut();
1133 host.yield_now_host();
1134 Ok(())
1135 },
1136 )?;
1137 Ok(())
1138}
1139
1140fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
1141 add_component_control_instance(linker, "greentic:component/control@0.5.0")?;
1142 add_component_control_instance(linker, "greentic:component/control@0.4.0")?;
1143 Ok(())
1144}
1145
1146pub fn register_all(linker: &mut Linker<ComponentState>, allow_state_store: bool) -> Result<()> {
1147 add_wasi_to_linker(linker)?;
1148 add_all_v1_to_linker(
1149 linker,
1150 HostFns {
1151 http_client_v1_1: Some(|state: &mut ComponentState| state.host_mut()),
1152 http_client: Some(|state: &mut ComponentState| state.host_mut()),
1153 oauth_broker: None,
1154 runner_host_http: Some(|state: &mut ComponentState| state.host_mut()),
1155 runner_host_kv: Some(|state: &mut ComponentState| state.host_mut()),
1156 telemetry_logger: Some(|state: &mut ComponentState| state.host_mut()),
1157 state_store: allow_state_store.then_some(|state: &mut ComponentState| state.host_mut()),
1158 secrets_store_v1_1: Some(|state: &mut ComponentState| state.host_mut()),
1159 secrets_store: None,
1160 },
1161 )?;
1162 add_http_client_client_world_aliases(linker)?;
1163 Ok(())
1164}
1165
1166fn add_http_client_client_world_aliases(linker: &mut Linker<ComponentState>) -> Result<()> {
1167 let mut inst_v1_1 = linker.instance("greentic:http/client@1.1.0")?;
1168 inst_v1_1.func_wrap(
1169 "send",
1170 move |mut caller: StoreContextMut<'_, ComponentState>,
1171 (req, opts, ctx): (
1172 http_client_client_alias::Request,
1173 Option<http_client_client_alias::RequestOptions>,
1174 Option<http_client_client_alias::TenantCtx>,
1175 )| {
1176 let host = caller.data_mut().host_mut();
1177 let result = HttpClientHostV1_1::send(
1178 host,
1179 alias_request_to_host(req),
1180 opts.map(alias_request_options_to_host),
1181 ctx.map(alias_tenant_ctx_to_host),
1182 );
1183 Ok((match result {
1184 Ok(resp) => Ok(alias_response_from_host(resp)),
1185 Err(err) => Err(alias_error_from_host(err)),
1186 },))
1187 },
1188 )?;
1189 let mut inst_v1_0 = linker.instance("greentic:http/client@1.0.0")?;
1190 inst_v1_0.func_wrap(
1191 "send",
1192 move |mut caller: StoreContextMut<'_, ComponentState>,
1193 (req, ctx): (
1194 host_http_client::Request,
1195 Option<host_http_client::TenantCtx>,
1196 )| {
1197 let host = caller.data_mut().host_mut();
1198 let result = HttpClientHost::send(host, req, ctx);
1199 Ok((result,))
1200 },
1201 )?;
1202 Ok(())
1203}
1204
1205fn alias_request_to_host(req: http_client_client_alias::Request) -> host_http_client::RequestV1_1 {
1206 host_http_client::RequestV1_1 {
1207 method: req.method,
1208 url: req.url,
1209 headers: req.headers,
1210 body: req.body,
1211 }
1212}
1213
1214fn alias_request_options_to_host(
1215 opts: http_client_client_alias::RequestOptions,
1216) -> host_http_client::RequestOptionsV1_1 {
1217 host_http_client::RequestOptionsV1_1 {
1218 timeout_ms: opts.timeout_ms,
1219 allow_insecure: opts.allow_insecure,
1220 follow_redirects: opts.follow_redirects,
1221 }
1222}
1223
1224fn alias_tenant_ctx_to_host(
1225 ctx: http_client_client_alias::TenantCtx,
1226) -> host_http_client::TenantCtxV1_1 {
1227 host_http_client::TenantCtxV1_1 {
1228 env: ctx.env,
1229 tenant: ctx.tenant,
1230 tenant_id: ctx.tenant_id,
1231 team: ctx.team,
1232 team_id: ctx.team_id,
1233 user: ctx.user,
1234 user_id: ctx.user_id,
1235 trace_id: ctx.trace_id,
1236 correlation_id: ctx.correlation_id,
1237 attributes: ctx.attributes,
1238 session_id: ctx.session_id,
1239 flow_id: ctx.flow_id,
1240 node_id: ctx.node_id,
1241 provider_id: ctx.provider_id,
1242 deadline_ms: ctx.deadline_ms,
1243 attempt: ctx.attempt,
1244 idempotency_key: ctx.idempotency_key,
1245 impersonation: ctx.impersonation.map(|imp| ImpersonationV1_1 {
1246 actor_id: imp.actor_id,
1247 reason: imp.reason,
1248 }),
1249 }
1250}
1251
1252fn alias_response_from_host(
1253 resp: host_http_client::ResponseV1_1,
1254) -> http_client_client_alias::Response {
1255 http_client_client_alias::Response {
1256 status: resp.status,
1257 headers: resp.headers,
1258 body: resp.body,
1259 }
1260}
1261
1262fn alias_error_from_host(
1263 err: host_http_client::HttpClientErrorV1_1,
1264) -> http_client_client_alias::HostError {
1265 http_client_client_alias::HostError {
1266 code: err.code,
1267 message: err.message,
1268 }
1269}
1270
1271impl OAuthHostContext for ComponentState {
1272 fn tenant_id(&self) -> &str {
1273 &self.host.config.tenant
1274 }
1275
1276 fn env(&self) -> &str {
1277 &self.host.default_env
1278 }
1279
1280 fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
1281 &mut self.host.oauth_host
1282 }
1283
1284 fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
1285 self.host.oauth_config.as_ref()
1286 }
1287}
1288
1289impl WasiView for ComponentState {
1290 fn ctx(&mut self) -> WasiCtxView<'_> {
1291 WasiCtxView {
1292 ctx: &mut self.wasi_ctx,
1293 table: &mut self.resource_table,
1294 }
1295 }
1296}
1297
1298#[allow(unsafe_code)]
1299unsafe impl Send for ComponentState {}
1300#[allow(unsafe_code)]
1301unsafe impl Sync for ComponentState {}
1302
1303impl PackRuntime {
1304 fn allows_state_store(&self, component_ref: &str) -> bool {
1305 if self.state_store.is_none() {
1306 return false;
1307 }
1308 if !self.config.state_store_policy.allow {
1309 return false;
1310 }
1311 let Some(manifest) = self.component_manifests.get(component_ref) else {
1312 return false;
1313 };
1314 manifest
1315 .capabilities
1316 .host
1317 .state
1318 .as_ref()
1319 .map(|caps| caps.read || caps.write)
1320 .unwrap_or(false)
1321 }
1322
1323 pub fn contains_component(&self, component_ref: &str) -> bool {
1324 self.components.contains_key(component_ref)
1325 }
1326
1327 #[allow(clippy::too_many_arguments)]
1328 pub async fn load(
1329 path: impl AsRef<Path>,
1330 config: Arc<HostConfig>,
1331 mocks: Option<Arc<MockLayer>>,
1332 archive_source: Option<&Path>,
1333 session_store: Option<DynSessionStore>,
1334 state_store: Option<DynStateStore>,
1335 wasi_policy: Arc<RunnerWasiPolicy>,
1336 secrets: DynSecretsManager,
1337 oauth_config: Option<OAuthBrokerConfig>,
1338 verify_archive: bool,
1339 component_resolution: ComponentResolution,
1340 ) -> Result<Self> {
1341 let path = path.as_ref();
1342 let (_pack_root, safe_path) = normalize_pack_path(path)?;
1343 let path_meta = std::fs::metadata(&safe_path).ok();
1344 let is_dir = path_meta
1345 .as_ref()
1346 .map(|meta| meta.is_dir())
1347 .unwrap_or(false);
1348 let is_component = !is_dir
1349 && safe_path
1350 .extension()
1351 .and_then(|ext| ext.to_str())
1352 .map(|ext| ext.eq_ignore_ascii_case("wasm"))
1353 .unwrap_or(false);
1354 let archive_hint_path = if let Some(source) = archive_source {
1355 let (_, normalized) = normalize_pack_path(source)?;
1356 Some(normalized)
1357 } else if is_component || is_dir {
1358 None
1359 } else {
1360 Some(safe_path.clone())
1361 };
1362 let archive_hint = archive_hint_path.as_deref();
1363 if verify_archive {
1364 if let Some(verify_target) = archive_hint.and_then(|p| {
1365 std::fs::metadata(p)
1366 .ok()
1367 .filter(|meta| meta.is_file())
1368 .map(|_| p)
1369 }) {
1370 verify::verify_pack(verify_target).await?;
1371 tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
1372 } else {
1373 tracing::debug!("skipping archive verification (no archive source)");
1374 }
1375 }
1376 let engine = Engine::default();
1377 let engine_profile =
1378 EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
1379 let cache = CacheManager::new(CacheConfig::default(), engine_profile);
1380 let mut metadata = PackMetadata::fallback(&safe_path);
1381 let mut manifest = None;
1382 let mut legacy_manifest: Option<Box<legacy_pack::PackManifest>> = None;
1383 let mut flows = None;
1384 let materialized_root = component_resolution.materialized_root.clone().or_else(|| {
1385 if is_dir {
1386 Some(safe_path.clone())
1387 } else {
1388 None
1389 }
1390 });
1391 let (pack_assets_dir, assets_tempdir) =
1392 locate_pack_assets(materialized_root.as_deref(), archive_hint)?;
1393 let setup_yaml_exists = pack_assets_dir
1394 .as_ref()
1395 .map(|dir| dir.join("setup.yaml").is_file())
1396 .unwrap_or(false);
1397 tracing::info!(
1398 pack_root = %safe_path.display(),
1399 assets_setup_yaml_exists = setup_yaml_exists,
1400 "pack unpack metadata"
1401 );
1402
1403 if let Some(root) = materialized_root.as_ref() {
1404 match load_manifest_and_flows_from_dir(root) {
1405 Ok(ManifestLoad::New {
1406 manifest: m,
1407 flows: cache,
1408 }) => {
1409 metadata = cache.metadata.clone();
1410 manifest = Some(*m);
1411 flows = Some(cache);
1412 }
1413 Ok(ManifestLoad::Legacy {
1414 manifest: m,
1415 flows: cache,
1416 }) => {
1417 metadata = cache.metadata.clone();
1418 legacy_manifest = Some(m);
1419 flows = Some(cache);
1420 }
1421 Err(err) => {
1422 warn!(error = %err, pack = %root.display(), "failed to parse materialized pack manifest");
1423 }
1424 }
1425 }
1426
1427 if manifest.is_none()
1428 && legacy_manifest.is_none()
1429 && let Some(archive_path) = archive_hint
1430 {
1431 let manifest_load = load_manifest_and_flows(archive_path).with_context(|| {
1432 format!(
1433 "failed to load manifest.cbor from {}",
1434 archive_path.display()
1435 )
1436 })?;
1437 match manifest_load {
1438 ManifestLoad::New {
1439 manifest: m,
1440 flows: cache,
1441 } => {
1442 metadata = cache.metadata.clone();
1443 manifest = Some(*m);
1444 flows = Some(cache);
1445 }
1446 ManifestLoad::Legacy {
1447 manifest: m,
1448 flows: cache,
1449 } => {
1450 metadata = cache.metadata.clone();
1451 legacy_manifest = Some(m);
1452 flows = Some(cache);
1453 }
1454 }
1455 }
1456 #[cfg(feature = "fault-injection")]
1457 {
1458 let fault_ctx = FaultContext {
1459 pack_id: metadata.pack_id.as_str(),
1460 flow_id: "unknown",
1461 node_id: None,
1462 attempt: 1,
1463 };
1464 maybe_fail(FaultPoint::PackResolve, fault_ctx)
1465 .map_err(|err| anyhow!(err.to_string()))?;
1466 }
1467 let mut pack_lock = None;
1468 for root in find_pack_lock_roots(&safe_path, is_dir, archive_hint) {
1469 pack_lock = load_pack_lock(&root)?;
1470 if pack_lock.is_some() {
1471 break;
1472 }
1473 }
1474 let component_sources_payload = if pack_lock.is_none() {
1475 if let Some(manifest) = manifest.as_ref() {
1476 manifest
1477 .get_component_sources_v1()
1478 .context("invalid component sources extension")?
1479 } else {
1480 None
1481 }
1482 } else {
1483 None
1484 };
1485 let component_sources = if let Some(lock) = pack_lock.as_ref() {
1486 Some(component_sources_table_from_pack_lock(
1487 lock,
1488 component_resolution.allow_missing_hash,
1489 )?)
1490 } else {
1491 component_sources_table(component_sources_payload.as_ref())?
1492 };
1493 let components = if is_component {
1494 let wasm_bytes = fs::read(&safe_path).await?;
1495 metadata = PackMetadata::from_wasm(&wasm_bytes)
1496 .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
1497 let name = safe_path
1498 .file_stem()
1499 .map(|s| s.to_string_lossy().to_string())
1500 .unwrap_or_else(|| "component".to_string());
1501 let component = compile_component_with_cache(&cache, &engine, None, wasm_bytes).await?;
1502 let mut map = HashMap::new();
1503 map.insert(
1504 name.clone(),
1505 PackComponent {
1506 name,
1507 version: metadata.version.clone(),
1508 component,
1509 },
1510 );
1511 map
1512 } else {
1513 let specs = component_specs(
1514 manifest.as_ref(),
1515 legacy_manifest.as_deref(),
1516 component_sources_payload.as_ref(),
1517 pack_lock.as_ref(),
1518 );
1519 if specs.is_empty() {
1520 HashMap::new()
1521 } else {
1522 let mut loaded = HashMap::new();
1523 let mut missing: HashSet<String> =
1524 specs.iter().map(|spec| spec.id.clone()).collect();
1525 let mut searched = Vec::new();
1526
1527 if !component_resolution.overrides.is_empty() {
1528 load_components_from_overrides(
1529 &cache,
1530 &engine,
1531 &component_resolution.overrides,
1532 &specs,
1533 &mut missing,
1534 &mut loaded,
1535 )
1536 .await?;
1537 searched.push("override map".to_string());
1538 }
1539
1540 if let Some(component_sources) = component_sources.as_ref() {
1541 load_components_from_sources(
1542 &cache,
1543 &engine,
1544 component_sources,
1545 &component_resolution,
1546 &specs,
1547 &mut missing,
1548 &mut loaded,
1549 materialized_root.as_deref(),
1550 archive_hint,
1551 )
1552 .await?;
1553 searched.push(format!("extension {}", EXT_COMPONENT_SOURCES_V1));
1554 }
1555
1556 if let Some(root) = materialized_root.as_ref() {
1557 load_components_from_dir(
1558 &cache,
1559 &engine,
1560 root,
1561 &specs,
1562 &mut missing,
1563 &mut loaded,
1564 )
1565 .await?;
1566 searched.push(format!("components dir {}", root.display()));
1567 }
1568
1569 if let Some(archive_path) = archive_hint {
1570 load_components_from_archive(
1571 &cache,
1572 &engine,
1573 archive_path,
1574 &specs,
1575 &mut missing,
1576 &mut loaded,
1577 )
1578 .await?;
1579 searched.push(format!("archive {}", archive_path.display()));
1580 }
1581
1582 if !missing.is_empty() {
1583 let missing_list = missing.into_iter().collect::<Vec<_>>().join(", ");
1584 let sources = if searched.is_empty() {
1585 "no component sources".to_string()
1586 } else {
1587 searched.join(", ")
1588 };
1589 bail!(
1590 "components missing: {}; looked in {}",
1591 missing_list,
1592 sources
1593 );
1594 }
1595
1596 loaded
1597 }
1598 };
1599 let http_client = Arc::clone(&HTTP_CLIENT);
1600 let mut component_manifests = HashMap::new();
1601 if let Some(manifest) = manifest.as_ref() {
1602 for component in &manifest.components {
1603 component_manifests.insert(component.id.as_str().to_string(), component.clone());
1604 }
1605 }
1606 let mut pack_policy = (*wasi_policy).clone();
1607 if let Some(dir) = pack_assets_dir {
1608 tracing::debug!(path = %dir.display(), "preopening pack assets directory for WASI /assets");
1609 pack_policy =
1610 pack_policy.with_preopen(PreopenSpec::new(dir, "/assets").read_only(true));
1611 }
1612 let wasi_policy = Arc::new(pack_policy);
1613 Ok(Self {
1614 path: safe_path,
1615 archive_path: archive_hint.map(Path::to_path_buf),
1616 config,
1617 engine,
1618 metadata,
1619 manifest,
1620 legacy_manifest,
1621 component_manifests,
1622 mocks,
1623 flows,
1624 components,
1625 http_client,
1626 pre_cache: Mutex::new(HashMap::new()),
1627 session_store,
1628 state_store,
1629 wasi_policy,
1630 assets_tempdir,
1631 provider_registry: RwLock::new(None),
1632 secrets,
1633 oauth_config,
1634 cache,
1635 })
1636 }
1637
1638 pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
1639 if let Some(cache) = &self.flows {
1640 return Ok(cache.descriptors.clone());
1641 }
1642 if let Some(manifest) = &self.manifest {
1643 let descriptors = manifest
1644 .flows
1645 .iter()
1646 .map(|flow| FlowDescriptor {
1647 id: flow.id.as_str().to_string(),
1648 flow_type: flow_kind_to_str(flow.kind).to_string(),
1649 pack_id: manifest.pack_id.as_str().to_string(),
1650 profile: manifest.pack_id.as_str().to_string(),
1651 version: manifest.version.to_string(),
1652 description: None,
1653 })
1654 .collect();
1655 return Ok(descriptors);
1656 }
1657 Ok(Vec::new())
1658 }
1659
1660 #[allow(dead_code)]
1661 pub async fn run_flow(
1662 &self,
1663 flow_id: &str,
1664 input: serde_json::Value,
1665 ) -> Result<serde_json::Value> {
1666 let pack = Arc::new(
1667 PackRuntime::load(
1668 &self.path,
1669 Arc::clone(&self.config),
1670 self.mocks.clone(),
1671 self.archive_path.as_deref(),
1672 self.session_store.clone(),
1673 self.state_store.clone(),
1674 Arc::clone(&self.wasi_policy),
1675 self.secrets.clone(),
1676 self.oauth_config.clone(),
1677 false,
1678 ComponentResolution::default(),
1679 )
1680 .await?,
1681 );
1682
1683 let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&self.config)).await?;
1684 let retry_config = self.config.retry_config().into();
1685 let mocks = pack.mocks.as_deref();
1686 let tenant = self.config.tenant.as_str();
1687
1688 let ctx = FlowContext {
1689 tenant,
1690 pack_id: pack.metadata().pack_id.as_str(),
1691 flow_id,
1692 node_id: None,
1693 tool: None,
1694 action: None,
1695 session_id: None,
1696 provider_id: None,
1697 retry_config,
1698 attempt: 1,
1699 observer: None,
1700 mocks,
1701 };
1702
1703 let execution = engine.execute(ctx, input).await?;
1704 match execution.status {
1705 FlowStatus::Completed => Ok(execution.output),
1706 FlowStatus::Waiting(wait) => Ok(serde_json::json!({
1707 "status": "pending",
1708 "reason": wait.reason,
1709 "resume": wait.snapshot,
1710 "response": execution.output,
1711 })),
1712 }
1713 }
1714
1715 pub async fn invoke_component(
1716 &self,
1717 component_ref: &str,
1718 ctx: ComponentExecCtx,
1719 operation: &str,
1720 _config_json: Option<String>,
1721 input_json: String,
1722 ) -> Result<Value> {
1723 let pack_component = self
1724 .components
1725 .get(component_ref)
1726 .with_context(|| format!("component '{component_ref}' not found in pack"))?;
1727 let engine = self.engine.clone();
1728 let config = Arc::clone(&self.config);
1729 let http_client = Arc::clone(&self.http_client);
1730 let mocks = self.mocks.clone();
1731 let session_store = self.session_store.clone();
1732 let state_store = self.state_store.clone();
1733 let secrets = Arc::clone(&self.secrets);
1734 let oauth_config = self.oauth_config.clone();
1735 let wasi_policy = Arc::clone(&self.wasi_policy);
1736 let pack_id = self.metadata().pack_id.clone();
1737 let allow_state_store = self.allows_state_store(component_ref);
1738 let component = pack_component.component.clone();
1739 let component_ref_owned = component_ref.to_string();
1740 let operation_owned = operation.to_string();
1741 let input_owned = input_json;
1742 let ctx_owned = ctx;
1743
1744 run_on_wasi_thread("component.invoke", move || {
1745 let mut linker = Linker::new(&engine);
1746 register_all(&mut linker, allow_state_store)?;
1747 add_component_control_to_linker(&mut linker)?;
1748
1749 let host_state = HostState::new(
1750 pack_id.clone(),
1751 config,
1752 http_client,
1753 mocks,
1754 session_store,
1755 state_store,
1756 secrets,
1757 oauth_config,
1758 Some(ctx_owned.clone()),
1759 Some(component_ref_owned.clone()),
1760 false,
1761 )?;
1762 let store_state = ComponentState::new(host_state, wasi_policy)?;
1763 let mut store = wasmtime::Store::new(&engine, store_state);
1764
1765 let invoke_result = HostState::instantiate_component_result(
1766 &mut linker,
1767 &mut store,
1768 &component,
1769 &ctx_owned,
1770 &operation_owned,
1771 &input_owned,
1772 )?;
1773 HostState::convert_invoke_result(invoke_result)
1774 })
1775 }
1776
1777 pub fn resolve_provider(
1778 &self,
1779 provider_id: Option<&str>,
1780 provider_type: Option<&str>,
1781 ) -> Result<ProviderBinding> {
1782 let registry = self.provider_registry()?;
1783 registry.resolve(provider_id, provider_type)
1784 }
1785
1786 pub async fn invoke_provider(
1787 &self,
1788 binding: &ProviderBinding,
1789 ctx: ComponentExecCtx,
1790 op: &str,
1791 input_json: Vec<u8>,
1792 ) -> Result<Value> {
1793 let component_ref_owned = binding.component_ref.clone();
1794 let pack_component = self.components.get(&component_ref_owned).with_context(|| {
1795 format!("provider component '{component_ref_owned}' not found in pack")
1796 })?;
1797 let component = pack_component.component.clone();
1798
1799 let engine = self.engine.clone();
1800 let config = Arc::clone(&self.config);
1801 let http_client = Arc::clone(&self.http_client);
1802 let mocks = self.mocks.clone();
1803 let session_store = self.session_store.clone();
1804 let state_store = self.state_store.clone();
1805 let secrets = Arc::clone(&self.secrets);
1806 let oauth_config = self.oauth_config.clone();
1807 let wasi_policy = Arc::clone(&self.wasi_policy);
1808 let pack_id = self.metadata().pack_id.clone();
1809 let allow_state_store = self.allows_state_store(&component_ref_owned);
1810 let input_owned = input_json;
1811 let op_owned = op.to_string();
1812 let ctx_owned = ctx;
1813 let world = binding.world.clone();
1814
1815 run_on_wasi_thread("provider.invoke", move || {
1816 let mut linker = Linker::new(&engine);
1817 register_all(&mut linker, allow_state_store)?;
1818 add_component_control_to_linker(&mut linker)?;
1819 let mut pre_instance = Some(linker.instantiate_pre(component.as_ref())?);
1820 let host_state = HostState::new(
1821 pack_id.clone(),
1822 config,
1823 http_client,
1824 mocks,
1825 session_store,
1826 state_store,
1827 secrets,
1828 oauth_config,
1829 Some(ctx_owned.clone()),
1830 Some(component_ref_owned.clone()),
1831 true,
1832 )?;
1833 let store_state = ComponentState::new(host_state, wasi_policy)?;
1834 let mut store = wasmtime::Store::new(&engine, store_state);
1835 let use_schema_core =
1836 world.contains("provider-schema-core") || world.contains("provider/schema-core");
1837 let result = if use_schema_core {
1838 let pre_instance = pre_instance
1839 .take()
1840 .ok_or_else(|| anyhow!("provider pre_instance already consumed"))?;
1841 let pre: SchemaSchemaCorePre<ComponentState> =
1842 SchemaSchemaCorePre::new(pre_instance)?;
1843 let bindings = block_on(async { pre.instantiate_async(&mut store).await })?;
1844 let provider = bindings.greentic_provider_schema_core_schema_core_api();
1845 provider.call_invoke(&mut store, &op_owned, &input_owned)?
1846 } else {
1847 let pre_instance = pre_instance
1848 .take()
1849 .ok_or_else(|| anyhow!("provider pre_instance already consumed"))?;
1850 let pre: LegacySchemaCorePre<ComponentState> =
1851 LegacySchemaCorePre::new(pre_instance)?;
1852 let bindings = block_on(async { pre.instantiate_async(&mut store).await })?;
1853 let provider = bindings.greentic_provider_core_schema_core_api();
1854 provider.call_invoke(&mut store, &op_owned, &input_owned)?
1855 };
1856 deserialize_json_bytes(result)
1857 })
1858 }
1859
1860 pub(crate) fn provider_registry(&self) -> Result<ProviderRegistry> {
1861 if let Some(registry) = self.provider_registry.read().clone() {
1862 return Ok(registry);
1863 }
1864 let manifest = self
1865 .manifest
1866 .as_ref()
1867 .context("pack manifest required for provider resolution")?;
1868 let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
1869 let registry = ProviderRegistry::new(
1870 manifest,
1871 self.state_store.clone(),
1872 &self.config.tenant,
1873 &env,
1874 )?;
1875 *self.provider_registry.write() = Some(registry.clone());
1876 Ok(registry)
1877 }
1878
1879 pub(crate) fn provider_registry_optional(&self) -> Result<Option<ProviderRegistry>> {
1880 if self.manifest.is_none() {
1881 return Ok(None);
1882 }
1883 Ok(Some(self.provider_registry()?))
1884 }
1885
1886 pub fn load_flow(&self, flow_id: &str) -> Result<Flow> {
1887 if let Some(cache) = &self.flows {
1888 return cache
1889 .flows
1890 .get(flow_id)
1891 .cloned()
1892 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
1893 }
1894 if let Some(manifest) = &self.manifest {
1895 let entry = manifest
1896 .flows
1897 .iter()
1898 .find(|f| f.id.as_str() == flow_id)
1899 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
1900 return Ok(entry.flow.clone());
1901 }
1902 bail!("flow '{flow_id}' not available (pack exports disabled)")
1903 }
1904
1905 pub fn metadata(&self) -> &PackMetadata {
1906 &self.metadata
1907 }
1908
1909 pub fn required_secrets(&self) -> &[greentic_types::SecretRequirement] {
1910 &self.metadata.secret_requirements
1911 }
1912
1913 pub fn missing_secrets(
1914 &self,
1915 tenant_ctx: &TypesTenantCtx,
1916 ) -> Vec<greentic_types::SecretRequirement> {
1917 let env = tenant_ctx.env.as_str().to_string();
1918 let tenant = tenant_ctx.tenant.as_str().to_string();
1919 let team = tenant_ctx.team.as_ref().map(|t| t.as_str().to_string());
1920 self.required_secrets()
1921 .iter()
1922 .filter(|req| {
1923 if let Some(scope) = &req.scope {
1925 if scope.env != env {
1926 return false;
1927 }
1928 if scope.tenant != tenant {
1929 return false;
1930 }
1931 if let Some(ref team_req) = scope.team
1932 && team.as_ref() != Some(team_req)
1933 {
1934 return false;
1935 }
1936 }
1937 let ctx = self.config.tenant_ctx();
1938 read_secret_blocking(&self.secrets, &ctx, req.key.as_str()).is_err()
1939 })
1940 .cloned()
1941 .collect()
1942 }
1943
1944 pub fn for_component_test(
1945 components: Vec<(String, PathBuf)>,
1946 flows: HashMap<String, FlowIR>,
1947 pack_id: &str,
1948 config: Arc<HostConfig>,
1949 ) -> Result<Self> {
1950 let engine = Engine::default();
1951 let engine_profile =
1952 EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
1953 let cache = CacheManager::new(CacheConfig::default(), engine_profile);
1954 let mut component_map = HashMap::new();
1955 for (name, path) in components {
1956 if !path.exists() {
1957 bail!("component artifact missing: {}", path.display());
1958 }
1959 let wasm_bytes = std::fs::read(&path)?;
1960 let component = Arc::new(
1961 Component::from_binary(&engine, &wasm_bytes)
1962 .with_context(|| format!("failed to compile component {}", path.display()))?,
1963 );
1964 component_map.insert(
1965 name.clone(),
1966 PackComponent {
1967 name,
1968 version: "0.0.0".into(),
1969 component,
1970 },
1971 );
1972 }
1973
1974 let mut flow_map = HashMap::new();
1975 let mut descriptors = Vec::new();
1976 for (id, ir) in flows {
1977 let flow_type = ir.flow_type.clone();
1978 let flow = flow_ir_to_flow(ir)?;
1979 flow_map.insert(id.clone(), flow);
1980 descriptors.push(FlowDescriptor {
1981 id: id.clone(),
1982 flow_type,
1983 pack_id: pack_id.to_string(),
1984 profile: "test".into(),
1985 version: "0.0.0".into(),
1986 description: None,
1987 });
1988 }
1989 let entry_flows = descriptors.iter().map(|flow| flow.id.clone()).collect();
1990 let metadata = PackMetadata {
1991 pack_id: pack_id.to_string(),
1992 version: "0.0.0".into(),
1993 entry_flows,
1994 secret_requirements: Vec::new(),
1995 };
1996 let flows_cache = PackFlows {
1997 descriptors: descriptors.clone(),
1998 flows: flow_map,
1999 metadata: metadata.clone(),
2000 };
2001
2002 Ok(Self {
2003 path: PathBuf::new(),
2004 archive_path: None,
2005 config,
2006 engine,
2007 metadata,
2008 manifest: None,
2009 legacy_manifest: None,
2010 component_manifests: HashMap::new(),
2011 mocks: None,
2012 flows: Some(flows_cache),
2013 components: component_map,
2014 http_client: Arc::clone(&HTTP_CLIENT),
2015 pre_cache: Mutex::new(HashMap::new()),
2016 session_store: None,
2017 state_store: None,
2018 wasi_policy: Arc::new(RunnerWasiPolicy::new()),
2019 assets_tempdir: None,
2020 provider_registry: RwLock::new(None),
2021 secrets: crate::secrets::default_manager()?,
2022 oauth_config: None,
2023 cache,
2024 })
2025 }
2026}
2027
2028fn is_missing_node_export(err: &wasmtime::Error, version: &str) -> bool {
2029 let message = err.to_string();
2030 message.contains("no exported instance named")
2031 && message.contains(&format!("greentic:component/node@{version}"))
2032}
2033
2034struct PackFlows {
2035 descriptors: Vec<FlowDescriptor>,
2036 flows: HashMap<String, Flow>,
2037 metadata: PackMetadata,
2038}
2039
2040const RUNTIME_FLOW_EXTENSION_IDS: [&str; 3] = [
2041 "greentic.pack.runtime_flow",
2042 "greentic.pack.flow_runtime",
2043 "greentic.pack.runtime_flows",
2044];
2045
2046#[derive(Debug, Deserialize)]
2047struct RuntimeFlowBundle {
2048 flows: Vec<RuntimeFlow>,
2049}
2050
2051#[derive(Debug, Deserialize)]
2052struct RuntimeFlow {
2053 id: String,
2054 #[serde(alias = "flow_type")]
2055 kind: FlowKind,
2056 #[serde(default)]
2057 schema_version: Option<String>,
2058 #[serde(default)]
2059 start: Option<String>,
2060 #[serde(default)]
2061 entrypoints: BTreeMap<String, Value>,
2062 nodes: BTreeMap<String, RuntimeNode>,
2063 #[serde(default)]
2064 metadata: Option<FlowMetadata>,
2065}
2066
2067#[derive(Debug, Deserialize)]
2068struct RuntimeNode {
2069 #[serde(alias = "component")]
2070 component_id: String,
2071 #[serde(default, alias = "operation")]
2072 operation_name: Option<String>,
2073 #[serde(default, alias = "payload", alias = "input")]
2074 operation_payload: Value,
2075 #[serde(default)]
2076 routing: Option<Routing>,
2077 #[serde(default)]
2078 telemetry: Option<TelemetryHints>,
2079}
2080
2081fn deserialize_json_bytes(bytes: Vec<u8>) -> Result<Value> {
2082 if bytes.is_empty() {
2083 return Ok(Value::Null);
2084 }
2085 serde_json::from_slice(&bytes).or_else(|_| {
2086 String::from_utf8(bytes)
2087 .map(Value::String)
2088 .map_err(|err| anyhow!(err))
2089 })
2090}
2091
2092impl PackFlows {
2093 fn from_manifest(manifest: greentic_types::PackManifest) -> Self {
2094 if let Some(flows) = flows_from_runtime_extension(&manifest) {
2095 return flows;
2096 }
2097 let descriptors = manifest
2098 .flows
2099 .iter()
2100 .map(|entry| FlowDescriptor {
2101 id: entry.id.as_str().to_string(),
2102 flow_type: flow_kind_to_str(entry.kind).to_string(),
2103 pack_id: manifest.pack_id.as_str().to_string(),
2104 profile: manifest.pack_id.as_str().to_string(),
2105 version: manifest.version.to_string(),
2106 description: None,
2107 })
2108 .collect();
2109 let mut flows = HashMap::new();
2110 for entry in &manifest.flows {
2111 flows.insert(entry.id.as_str().to_string(), entry.flow.clone());
2112 }
2113 Self {
2114 metadata: PackMetadata::from_manifest(&manifest),
2115 descriptors,
2116 flows,
2117 }
2118 }
2119}
2120
2121fn flows_from_runtime_extension(manifest: &greentic_types::PackManifest) -> Option<PackFlows> {
2122 let extensions = manifest.extensions.as_ref()?;
2123 let extension = extensions.iter().find_map(|(key, ext)| {
2124 if RUNTIME_FLOW_EXTENSION_IDS
2125 .iter()
2126 .any(|candidate| candidate == key)
2127 {
2128 Some(ext)
2129 } else {
2130 None
2131 }
2132 })?;
2133 let runtime_flows = match decode_runtime_flow_extension(extension) {
2134 Some(flows) if !flows.is_empty() => flows,
2135 _ => return None,
2136 };
2137
2138 let descriptors = runtime_flows
2139 .iter()
2140 .map(|flow| FlowDescriptor {
2141 id: flow.id.as_str().to_string(),
2142 flow_type: flow_kind_to_str(flow.kind).to_string(),
2143 pack_id: manifest.pack_id.as_str().to_string(),
2144 profile: manifest.pack_id.as_str().to_string(),
2145 version: manifest.version.to_string(),
2146 description: None,
2147 })
2148 .collect::<Vec<_>>();
2149 let flows = runtime_flows
2150 .into_iter()
2151 .map(|flow| (flow.id.as_str().to_string(), flow))
2152 .collect();
2153
2154 Some(PackFlows {
2155 metadata: PackMetadata::from_manifest(manifest),
2156 descriptors,
2157 flows,
2158 })
2159}
2160
2161fn decode_runtime_flow_extension(extension: &ExtensionRef) -> Option<Vec<Flow>> {
2162 let value = match extension.inline.as_ref()? {
2163 ExtensionInline::Other(value) => value.clone(),
2164 _ => return None,
2165 };
2166
2167 if let Ok(bundle) = serde_json::from_value::<RuntimeFlowBundle>(value.clone()) {
2168 return Some(collect_runtime_flows(bundle.flows));
2169 }
2170
2171 if let Ok(flows) = serde_json::from_value::<Vec<RuntimeFlow>>(value.clone()) {
2172 return Some(collect_runtime_flows(flows));
2173 }
2174
2175 if let Ok(flows) = serde_json::from_value::<Vec<Flow>>(value) {
2176 return Some(flows);
2177 }
2178
2179 warn!(
2180 extension = %extension.kind,
2181 version = %extension.version,
2182 "runtime flow extension present but could not be decoded"
2183 );
2184 None
2185}
2186
2187fn collect_runtime_flows(flows: Vec<RuntimeFlow>) -> Vec<Flow> {
2188 flows
2189 .into_iter()
2190 .filter_map(|flow| match runtime_flow_to_flow(flow) {
2191 Ok(flow) => Some(flow),
2192 Err(err) => {
2193 warn!(error = %err, "failed to decode runtime flow");
2194 None
2195 }
2196 })
2197 .collect()
2198}
2199
2200fn runtime_flow_to_flow(runtime: RuntimeFlow) -> Result<Flow> {
2201 let flow_id = FlowId::from_str(&runtime.id)
2202 .with_context(|| format!("invalid flow id `{}`", runtime.id))?;
2203 let mut entrypoints = runtime.entrypoints;
2204 if entrypoints.is_empty()
2205 && let Some(start) = &runtime.start
2206 {
2207 entrypoints.insert("default".into(), Value::String(start.clone()));
2208 }
2209
2210 let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
2211 for (id, node) in runtime.nodes {
2212 let node_id = NodeId::from_str(&id).with_context(|| format!("invalid node id `{id}`"))?;
2213 let component_id = ComponentId::from_str(&node.component_id)
2214 .with_context(|| format!("invalid component id `{}`", node.component_id))?;
2215 let component = FlowComponentRef {
2216 id: component_id,
2217 pack_alias: None,
2218 operation: node.operation_name,
2219 };
2220 let routing = node.routing.unwrap_or(Routing::End);
2221 let telemetry = node.telemetry.unwrap_or_default();
2222 nodes.insert(
2223 node_id.clone(),
2224 Node {
2225 id: node_id,
2226 component,
2227 input: InputMapping {
2228 mapping: node.operation_payload,
2229 },
2230 output: OutputMapping {
2231 mapping: Value::Null,
2232 },
2233 routing,
2234 telemetry,
2235 },
2236 );
2237 }
2238
2239 Ok(Flow {
2240 schema_version: runtime.schema_version.unwrap_or_else(|| "1.0".to_string()),
2241 id: flow_id,
2242 kind: runtime.kind,
2243 entrypoints,
2244 nodes,
2245 metadata: runtime.metadata.unwrap_or_default(),
2246 })
2247}
2248
2249fn flow_kind_to_str(kind: greentic_types::FlowKind) -> &'static str {
2250 match kind {
2251 greentic_types::FlowKind::Messaging => "messaging",
2252 greentic_types::FlowKind::Event => "event",
2253 greentic_types::FlowKind::ComponentConfig => "component-config",
2254 greentic_types::FlowKind::Job => "job",
2255 greentic_types::FlowKind::Http => "http",
2256 }
2257}
2258
2259fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
2260 let mut file = archive
2261 .by_name(name)
2262 .with_context(|| format!("entry {name} missing from archive"))?;
2263 let mut buf = Vec::new();
2264 file.read_to_end(&mut buf)?;
2265 Ok(buf)
2266}
2267
2268fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
2269 for node in doc.nodes.values_mut() {
2270 let Some((component_ref, payload)) = node
2271 .raw
2272 .iter()
2273 .next()
2274 .map(|(key, value)| (key.clone(), value.clone()))
2275 else {
2276 continue;
2277 };
2278 if component_ref.starts_with("emit.") {
2279 node.operation = Some(component_ref);
2280 node.payload = payload;
2281 node.raw.clear();
2282 continue;
2283 }
2284 let (target_component, operation, input, config) =
2285 infer_component_exec(&payload, &component_ref);
2286 let mut payload_obj = serde_json::Map::new();
2287 payload_obj.insert("component".into(), Value::String(target_component));
2289 payload_obj.insert("operation".into(), Value::String(operation));
2290 payload_obj.insert("input".into(), input);
2291 if let Some(cfg) = config {
2292 payload_obj.insert("config".into(), cfg);
2293 }
2294 node.operation = Some("component.exec".to_string());
2295 node.payload = Value::Object(payload_obj);
2296 node.raw.clear();
2297 }
2298 doc
2299}
2300
2301fn infer_component_exec(
2302 payload: &Value,
2303 component_ref: &str,
2304) -> (String, String, Value, Option<Value>) {
2305 let default_op = if component_ref.starts_with("templating.") {
2306 "render"
2307 } else {
2308 "invoke"
2309 }
2310 .to_string();
2311
2312 if let Value::Object(map) = payload {
2313 let op = map
2314 .get("op")
2315 .or_else(|| map.get("operation"))
2316 .and_then(Value::as_str)
2317 .map(|s| s.to_string())
2318 .unwrap_or_else(|| default_op.clone());
2319
2320 let mut input = map.clone();
2321 let config = input.remove("config");
2322 let component = input
2323 .get("component")
2324 .or_else(|| input.get("component_ref"))
2325 .and_then(Value::as_str)
2326 .map(|s| s.to_string())
2327 .unwrap_or_else(|| component_ref.to_string());
2328 input.remove("component");
2329 input.remove("component_ref");
2330 input.remove("op");
2331 input.remove("operation");
2332 return (component, op, Value::Object(input), config);
2333 }
2334
2335 (component_ref.to_string(), default_op, payload.clone(), None)
2336}
2337
2338#[derive(Clone, Debug)]
2339struct ComponentSpec {
2340 id: String,
2341 version: String,
2342 legacy_path: Option<String>,
2343}
2344
2345#[derive(Clone, Debug)]
2346struct ComponentSourceInfo {
2347 digest: Option<String>,
2348 source: ComponentSourceRef,
2349 artifact: ComponentArtifactLocation,
2350 expected_wasm_sha256: Option<String>,
2351 skip_digest_verification: bool,
2352}
2353
2354#[derive(Clone, Debug)]
2355enum ComponentArtifactLocation {
2356 Inline { wasm_path: String },
2357 Remote,
2358}
2359
2360#[derive(Clone, Debug, Deserialize)]
2361struct PackLockV1 {
2362 schema_version: u32,
2363 components: Vec<PackLockComponent>,
2364}
2365
2366#[derive(Clone, Debug, Deserialize)]
2367struct PackLockComponent {
2368 name: String,
2369 #[serde(default, rename = "source_ref")]
2370 source_ref: Option<String>,
2371 #[serde(default, rename = "ref")]
2372 legacy_ref: Option<String>,
2373 #[serde(default)]
2374 component_id: Option<ComponentId>,
2375 #[serde(default)]
2376 bundled: Option<bool>,
2377 #[serde(default, rename = "bundled_path")]
2378 bundled_path: Option<String>,
2379 #[serde(default, rename = "path")]
2380 legacy_path: Option<String>,
2381 #[serde(default)]
2382 wasm_sha256: Option<String>,
2383 #[serde(default, rename = "sha256")]
2384 legacy_sha256: Option<String>,
2385 #[serde(default)]
2386 resolved_digest: Option<String>,
2387 #[serde(default)]
2388 digest: Option<String>,
2389}
2390
2391fn component_specs(
2392 manifest: Option<&greentic_types::PackManifest>,
2393 legacy_manifest: Option<&legacy_pack::PackManifest>,
2394 component_sources: Option<&ComponentSourcesV1>,
2395 pack_lock: Option<&PackLockV1>,
2396) -> Vec<ComponentSpec> {
2397 if let Some(manifest) = manifest {
2398 if !manifest.components.is_empty() {
2399 return manifest
2400 .components
2401 .iter()
2402 .map(|entry| ComponentSpec {
2403 id: entry.id.as_str().to_string(),
2404 version: entry.version.to_string(),
2405 legacy_path: None,
2406 })
2407 .collect();
2408 }
2409 if let Some(lock) = pack_lock {
2410 let mut seen = HashSet::new();
2411 let mut specs = Vec::new();
2412 for entry in &lock.components {
2413 let id = entry
2414 .component_id
2415 .as_ref()
2416 .map(|id| id.as_str())
2417 .unwrap_or(entry.name.as_str());
2418 if seen.insert(id.to_string()) {
2419 specs.push(ComponentSpec {
2420 id: id.to_string(),
2421 version: "0.0.0".to_string(),
2422 legacy_path: None,
2423 });
2424 }
2425 }
2426 return specs;
2427 }
2428 if let Some(sources) = component_sources {
2429 let mut seen = HashSet::new();
2430 let mut specs = Vec::new();
2431 for entry in &sources.components {
2432 let id = entry
2433 .component_id
2434 .as_ref()
2435 .map(|id| id.as_str())
2436 .unwrap_or(entry.name.as_str());
2437 if seen.insert(id.to_string()) {
2438 specs.push(ComponentSpec {
2439 id: id.to_string(),
2440 version: "0.0.0".to_string(),
2441 legacy_path: None,
2442 });
2443 }
2444 }
2445 return specs;
2446 }
2447 }
2448 if let Some(legacy_manifest) = legacy_manifest {
2449 return legacy_manifest
2450 .components
2451 .iter()
2452 .map(|entry| ComponentSpec {
2453 id: entry.name.clone(),
2454 version: entry.version.to_string(),
2455 legacy_path: Some(entry.file_wasm.clone()),
2456 })
2457 .collect();
2458 }
2459 Vec::new()
2460}
2461
2462fn component_sources_table(
2463 sources: Option<&ComponentSourcesV1>,
2464) -> Result<Option<HashMap<String, ComponentSourceInfo>>> {
2465 let Some(sources) = sources else {
2466 return Ok(None);
2467 };
2468 let mut table = HashMap::new();
2469 for entry in &sources.components {
2470 let artifact = match &entry.artifact {
2471 ArtifactLocationV1::Inline { wasm_path, .. } => ComponentArtifactLocation::Inline {
2472 wasm_path: wasm_path.clone(),
2473 },
2474 ArtifactLocationV1::Remote => ComponentArtifactLocation::Remote,
2475 };
2476 let info = ComponentSourceInfo {
2477 digest: Some(entry.resolved.digest.clone()),
2478 source: entry.source.clone(),
2479 artifact,
2480 expected_wasm_sha256: None,
2481 skip_digest_verification: false,
2482 };
2483 if let Some(component_id) = entry.component_id.as_ref() {
2484 table.insert(component_id.as_str().to_string(), info.clone());
2485 }
2486 table.insert(entry.name.clone(), info);
2487 }
2488 Ok(Some(table))
2489}
2490
2491fn load_pack_lock(path: &Path) -> Result<Option<PackLockV1>> {
2492 let lock_path = if path.is_dir() {
2493 let candidate = path.join("pack.lock");
2494 if candidate.exists() {
2495 Some(candidate)
2496 } else {
2497 let candidate = path.join("pack.lock.json");
2498 candidate.exists().then_some(candidate)
2499 }
2500 } else {
2501 None
2502 };
2503 let Some(lock_path) = lock_path else {
2504 return Ok(None);
2505 };
2506 let raw = std::fs::read_to_string(&lock_path)
2507 .with_context(|| format!("failed to read {}", lock_path.display()))?;
2508 let lock: PackLockV1 = serde_json::from_str(&raw).context("failed to parse pack.lock")?;
2509 if lock.schema_version != 1 {
2510 bail!("pack.lock schema_version must be 1");
2511 }
2512 Ok(Some(lock))
2513}
2514
2515fn find_pack_lock_roots(
2516 pack_path: &Path,
2517 is_dir: bool,
2518 archive_hint: Option<&Path>,
2519) -> Vec<PathBuf> {
2520 if is_dir {
2521 return vec![pack_path.to_path_buf()];
2522 }
2523 let mut roots = Vec::new();
2524 if let Some(archive_path) = archive_hint {
2525 if let Some(parent) = archive_path.parent() {
2526 roots.push(parent.to_path_buf());
2527 if let Some(grandparent) = parent.parent() {
2528 roots.push(grandparent.to_path_buf());
2529 }
2530 }
2531 } else if let Some(parent) = pack_path.parent() {
2532 roots.push(parent.to_path_buf());
2533 if let Some(grandparent) = parent.parent() {
2534 roots.push(grandparent.to_path_buf());
2535 }
2536 }
2537 roots
2538}
2539
2540fn normalize_sha256(digest: &str) -> Result<String> {
2541 let trimmed = digest.trim();
2542 if trimmed.is_empty() {
2543 bail!("sha256 digest cannot be empty");
2544 }
2545 if let Some(stripped) = trimmed.strip_prefix("sha256:") {
2546 if stripped.is_empty() {
2547 bail!("sha256 digest must include hex bytes after sha256:");
2548 }
2549 return Ok(trimmed.to_string());
2550 }
2551 if trimmed.chars().all(|c| c.is_ascii_hexdigit()) {
2552 return Ok(format!("sha256:{trimmed}"));
2553 }
2554 bail!("sha256 digest must be hex or sha256:<hex>");
2555}
2556
2557fn component_sources_table_from_pack_lock(
2558 lock: &PackLockV1,
2559 allow_missing_hash: bool,
2560) -> Result<HashMap<String, ComponentSourceInfo>> {
2561 let mut table = HashMap::new();
2562 let mut names = HashSet::new();
2563 for entry in &lock.components {
2564 if !names.insert(entry.name.clone()) {
2565 bail!(
2566 "pack.lock contains duplicate component name `{}`",
2567 entry.name
2568 );
2569 }
2570 let source_ref = match (&entry.source_ref, &entry.legacy_ref) {
2571 (Some(primary), Some(legacy)) => {
2572 if primary != legacy {
2573 bail!(
2574 "pack.lock component {} has conflicting refs: {} vs {}",
2575 entry.name,
2576 primary,
2577 legacy
2578 );
2579 }
2580 primary.as_str()
2581 }
2582 (Some(primary), None) => primary.as_str(),
2583 (None, Some(legacy)) => legacy.as_str(),
2584 (None, None) => {
2585 bail!("pack.lock component {} missing source_ref", entry.name);
2586 }
2587 };
2588 let source: ComponentSourceRef = source_ref
2589 .parse()
2590 .with_context(|| format!("invalid component ref `{}`", source_ref))?;
2591 let bundled_path = match (&entry.bundled_path, &entry.legacy_path) {
2592 (Some(primary), Some(legacy)) => {
2593 if primary != legacy {
2594 bail!(
2595 "pack.lock component {} has conflicting bundled paths: {} vs {}",
2596 entry.name,
2597 primary,
2598 legacy
2599 );
2600 }
2601 Some(primary.clone())
2602 }
2603 (Some(primary), None) => Some(primary.clone()),
2604 (None, Some(legacy)) => Some(legacy.clone()),
2605 (None, None) => None,
2606 };
2607 let bundled = entry.bundled.unwrap_or(false) || bundled_path.is_some();
2608 let (artifact, digest, expected_wasm_sha256, skip_digest_verification) = if bundled {
2609 let wasm_path = bundled_path.ok_or_else(|| {
2610 anyhow!(
2611 "pack.lock component {} marked bundled but bundled_path is missing",
2612 entry.name
2613 )
2614 })?;
2615 let expected_raw = match (&entry.wasm_sha256, &entry.legacy_sha256) {
2616 (Some(primary), Some(legacy)) => {
2617 if primary != legacy {
2618 bail!(
2619 "pack.lock component {} has conflicting wasm_sha256 values: {} vs {}",
2620 entry.name,
2621 primary,
2622 legacy
2623 );
2624 }
2625 Some(primary.as_str())
2626 }
2627 (Some(primary), None) => Some(primary.as_str()),
2628 (None, Some(legacy)) => Some(legacy.as_str()),
2629 (None, None) => None,
2630 };
2631 let expected = match expected_raw {
2632 Some(value) => Some(normalize_sha256(value)?),
2633 None => None,
2634 };
2635 if expected.is_none() && !allow_missing_hash {
2636 bail!(
2637 "pack.lock component {} missing wasm_sha256 for bundled component",
2638 entry.name
2639 );
2640 }
2641 (
2642 ComponentArtifactLocation::Inline { wasm_path },
2643 expected.clone(),
2644 expected,
2645 allow_missing_hash && expected_raw.is_none(),
2646 )
2647 } else {
2648 if source.is_tag() {
2649 bail!(
2650 "component {} uses tag ref {} but is not bundled; rebuild the pack",
2651 entry.name,
2652 source
2653 );
2654 }
2655 let expected = entry
2656 .resolved_digest
2657 .as_deref()
2658 .or(entry.digest.as_deref())
2659 .ok_or_else(|| {
2660 anyhow!(
2661 "pack.lock component {} missing resolved_digest for remote component",
2662 entry.name
2663 )
2664 })?;
2665 (
2666 ComponentArtifactLocation::Remote,
2667 Some(normalize_digest(expected)),
2668 None,
2669 false,
2670 )
2671 };
2672 let info = ComponentSourceInfo {
2673 digest,
2674 source,
2675 artifact,
2676 expected_wasm_sha256,
2677 skip_digest_verification,
2678 };
2679 if let Some(component_id) = entry.component_id.as_ref() {
2680 let key = component_id.as_str().to_string();
2681 if table.contains_key(&key) {
2682 bail!(
2683 "pack.lock contains duplicate component id `{}`",
2684 component_id.as_str()
2685 );
2686 }
2687 table.insert(key, info.clone());
2688 }
2689 if entry.name
2690 != entry
2691 .component_id
2692 .as_ref()
2693 .map(|id| id.as_str())
2694 .unwrap_or("")
2695 {
2696 table.insert(entry.name.clone(), info);
2697 }
2698 }
2699 Ok(table)
2700}
2701
2702fn component_path_for_spec(root: &Path, spec: &ComponentSpec) -> PathBuf {
2703 if let Some(path) = &spec.legacy_path {
2704 return root.join(path);
2705 }
2706 root.join("components").join(format!("{}.wasm", spec.id))
2707}
2708
2709fn normalize_digest(digest: &str) -> String {
2710 if digest.starts_with("sha256:") || digest.starts_with("blake3:") {
2711 digest.to_string()
2712 } else {
2713 format!("sha256:{digest}")
2714 }
2715}
2716
2717fn compute_digest_for(bytes: &[u8], digest: &str) -> Result<String> {
2718 if digest.starts_with("blake3:") {
2719 let hash = blake3::hash(bytes);
2720 return Ok(format!("blake3:{}", hash.to_hex()));
2721 }
2722 let mut hasher = sha2::Sha256::new();
2723 hasher.update(bytes);
2724 Ok(format!("sha256:{:x}", hasher.finalize()))
2725}
2726
2727fn compute_sha256_digest_for(bytes: &[u8]) -> String {
2728 let mut hasher = sha2::Sha256::new();
2729 hasher.update(bytes);
2730 format!("sha256:{:x}", hasher.finalize())
2731}
2732
2733fn build_artifact_key(cache: &CacheManager, digest: Option<&str>, bytes: &[u8]) -> ArtifactKey {
2734 let wasm_digest = digest
2735 .map(normalize_digest)
2736 .unwrap_or_else(|| compute_sha256_digest_for(bytes));
2737 ArtifactKey::new(cache.engine_profile_id().to_string(), wasm_digest)
2738}
2739
2740async fn compile_component_with_cache(
2741 cache: &CacheManager,
2742 engine: &Engine,
2743 digest: Option<&str>,
2744 bytes: Vec<u8>,
2745) -> Result<Arc<Component>> {
2746 let key = build_artifact_key(cache, digest, &bytes);
2747 cache.get_component(engine, &key, || Ok(bytes)).await
2748}
2749
2750fn verify_component_digest(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
2751 let normalized_expected = normalize_digest(expected);
2752 let actual = compute_digest_for(bytes, &normalized_expected)?;
2753 if normalize_digest(&actual) != normalized_expected {
2754 bail!(
2755 "component {component_id} digest mismatch: expected {normalized_expected}, got {actual}"
2756 );
2757 }
2758 Ok(())
2759}
2760
2761fn verify_wasm_sha256(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
2762 let normalized_expected = normalize_sha256(expected)?;
2763 let actual = compute_sha256_digest_for(bytes);
2764 if actual != normalized_expected {
2765 bail!(
2766 "component {component_id} bundled digest mismatch: expected {normalized_expected}, got {actual}"
2767 );
2768 }
2769 Ok(())
2770}
2771
2772#[cfg(test)]
2773mod pack_lock_tests {
2774 use super::*;
2775 use tempfile::TempDir;
2776
2777 #[test]
2778 fn pack_lock_tag_ref_requires_bundle() {
2779 let lock = PackLockV1 {
2780 schema_version: 1,
2781 components: vec![PackLockComponent {
2782 name: "templates".to_string(),
2783 source_ref: Some("oci://registry.test/templates:latest".to_string()),
2784 legacy_ref: None,
2785 component_id: None,
2786 bundled: Some(false),
2787 bundled_path: None,
2788 legacy_path: None,
2789 wasm_sha256: None,
2790 legacy_sha256: None,
2791 resolved_digest: None,
2792 digest: None,
2793 }],
2794 };
2795 let err = component_sources_table_from_pack_lock(&lock, false).unwrap_err();
2796 assert!(
2797 err.to_string().contains("tag ref") && err.to_string().contains("rebuild the pack"),
2798 "unexpected error: {err}"
2799 );
2800 }
2801
2802 #[test]
2803 fn bundled_hash_mismatch_errors() {
2804 let rt = tokio::runtime::Runtime::new().expect("runtime");
2805 let temp = TempDir::new().expect("temp dir");
2806 let engine = Engine::default();
2807 let engine_profile =
2808 EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
2809 let cache_config = CacheConfig {
2810 root: temp.path().join("cache"),
2811 ..CacheConfig::default()
2812 };
2813 let cache = CacheManager::new(cache_config, engine_profile);
2814 let wasm_path = temp.path().join("component.wasm");
2815 let fixture_wasm = Path::new(env!("CARGO_MANIFEST_DIR"))
2816 .join("../../tests/fixtures/packs/secrets_store_smoke/components/echo_secret.wasm");
2817 let bytes = std::fs::read(&fixture_wasm).expect("read fixture wasm");
2818 std::fs::write(&wasm_path, &bytes).expect("write temp wasm");
2819
2820 let spec = ComponentSpec {
2821 id: "qa.process".to_string(),
2822 version: "0.0.0".to_string(),
2823 legacy_path: None,
2824 };
2825 let mut missing = HashSet::new();
2826 missing.insert(spec.id.clone());
2827
2828 let mut sources = HashMap::new();
2829 sources.insert(
2830 spec.id.clone(),
2831 ComponentSourceInfo {
2832 digest: Some("sha256:deadbeef".to_string()),
2833 source: ComponentSourceRef::Oci("registry.test/qa.process@sha256:deadbeef".into()),
2834 artifact: ComponentArtifactLocation::Inline {
2835 wasm_path: "component.wasm".to_string(),
2836 },
2837 expected_wasm_sha256: Some("sha256:deadbeef".to_string()),
2838 skip_digest_verification: false,
2839 },
2840 );
2841
2842 let mut loaded = HashMap::new();
2843 let result = rt.block_on(load_components_from_sources(
2844 &cache,
2845 &engine,
2846 &sources,
2847 &ComponentResolution::default(),
2848 &[spec],
2849 &mut missing,
2850 &mut loaded,
2851 Some(temp.path()),
2852 None,
2853 ));
2854 let err = result.unwrap_err();
2855 assert!(
2856 err.to_string().contains("bundled digest mismatch"),
2857 "unexpected error: {err}"
2858 );
2859 }
2860}
2861
2862#[cfg(test)]
2863mod pack_resolution_prop_tests {
2864 use super::*;
2865 use greentic_types::{ArtifactLocationV1, ComponentSourceEntryV1, ResolvedComponentV1};
2866 use proptest::prelude::*;
2867 use proptest::test_runner::{Config as ProptestConfig, RngAlgorithm, TestRng, TestRunner};
2868 use std::collections::BTreeSet;
2869 use std::path::Path;
2870 use std::str::FromStr;
2871
2872 #[derive(Clone, Debug)]
2873 enum ResolveRequest {
2874 ById(String),
2875 ByName(String),
2876 }
2877
2878 #[derive(Clone, Debug, PartialEq, Eq)]
2879 struct ResolvedComponent {
2880 key: String,
2881 source: String,
2882 artifact: String,
2883 digest: Option<String>,
2884 expected_wasm_sha256: Option<String>,
2885 skip_digest_verification: bool,
2886 }
2887
2888 #[derive(Clone, Debug, PartialEq, Eq)]
2889 struct ResolveError {
2890 code: String,
2891 message: String,
2892 context_key: String,
2893 }
2894
2895 #[derive(Clone, Debug)]
2896 struct Scenario {
2897 pack_lock: Option<PackLockV1>,
2898 component_sources: Option<ComponentSourcesV1>,
2899 request: ResolveRequest,
2900 expected_sha256: Option<String>,
2901 bytes: Vec<u8>,
2902 }
2903
2904 fn resolve_component_test(
2905 sources: Option<&ComponentSourcesV1>,
2906 lock: Option<&PackLockV1>,
2907 request: &ResolveRequest,
2908 ) -> Result<ResolvedComponent, ResolveError> {
2909 let table = if let Some(lock) = lock {
2910 component_sources_table_from_pack_lock(lock, false).map_err(|err| ResolveError {
2911 code: classify_pack_lock_error(err.to_string().as_str()).to_string(),
2912 message: err.to_string(),
2913 context_key: request_key(request).to_string(),
2914 })?
2915 } else {
2916 let sources = component_sources_table(sources).map_err(|err| ResolveError {
2917 code: "component_sources_error".to_string(),
2918 message: err.to_string(),
2919 context_key: request_key(request).to_string(),
2920 })?;
2921 sources.ok_or_else(|| ResolveError {
2922 code: "missing_component_sources".to_string(),
2923 message: "component sources not provided".to_string(),
2924 context_key: request_key(request).to_string(),
2925 })?
2926 };
2927
2928 let key = request_key(request);
2929 let source = table.get(key).ok_or_else(|| ResolveError {
2930 code: "component_not_found".to_string(),
2931 message: format!("component {key} not found"),
2932 context_key: key.to_string(),
2933 })?;
2934
2935 Ok(ResolvedComponent {
2936 key: key.to_string(),
2937 source: source.source.to_string(),
2938 artifact: match source.artifact {
2939 ComponentArtifactLocation::Inline { .. } => "inline".to_string(),
2940 ComponentArtifactLocation::Remote => "remote".to_string(),
2941 },
2942 digest: source.digest.clone(),
2943 expected_wasm_sha256: source.expected_wasm_sha256.clone(),
2944 skip_digest_verification: source.skip_digest_verification,
2945 })
2946 }
2947
2948 fn request_key(request: &ResolveRequest) -> &str {
2949 match request {
2950 ResolveRequest::ById(value) => value.as_str(),
2951 ResolveRequest::ByName(value) => value.as_str(),
2952 }
2953 }
2954
2955 fn classify_pack_lock_error(message: &str) -> &'static str {
2956 if message.contains("duplicate component name") {
2957 "duplicate_name"
2958 } else if message.contains("duplicate component id") {
2959 "duplicate_id"
2960 } else if message.contains("conflicting refs") {
2961 "conflicting_ref"
2962 } else if message.contains("conflicting bundled paths") {
2963 "conflicting_bundled_path"
2964 } else if message.contains("conflicting wasm_sha256") {
2965 "conflicting_wasm_sha256"
2966 } else if message.contains("missing source_ref") {
2967 "missing_source_ref"
2968 } else if message.contains("marked bundled but bundled_path is missing") {
2969 "missing_bundled_path"
2970 } else if message.contains("missing wasm_sha256") {
2971 "missing_wasm_sha256"
2972 } else if message.contains("tag ref") && message.contains("not bundled") {
2973 "tag_ref_requires_bundle"
2974 } else if message.contains("missing resolved_digest") {
2975 "missing_resolved_digest"
2976 } else if message.contains("invalid component ref") {
2977 "invalid_component_ref"
2978 } else if message.contains("sha256 digest") {
2979 "invalid_sha256"
2980 } else {
2981 "unknown_error"
2982 }
2983 }
2984
2985 fn known_error_codes() -> BTreeSet<&'static str> {
2986 [
2987 "component_sources_error",
2988 "missing_component_sources",
2989 "component_not_found",
2990 "duplicate_name",
2991 "duplicate_id",
2992 "conflicting_ref",
2993 "conflicting_bundled_path",
2994 "conflicting_wasm_sha256",
2995 "missing_source_ref",
2996 "missing_bundled_path",
2997 "missing_wasm_sha256",
2998 "tag_ref_requires_bundle",
2999 "missing_resolved_digest",
3000 "invalid_component_ref",
3001 "invalid_sha256",
3002 "unknown_error",
3003 ]
3004 .into_iter()
3005 .collect()
3006 }
3007
3008 fn proptest_config() -> ProptestConfig {
3009 let cases = std::env::var("PROPTEST_CASES")
3010 .ok()
3011 .and_then(|value| value.parse::<u32>().ok())
3012 .unwrap_or(128);
3013 ProptestConfig {
3014 cases,
3015 failure_persistence: None,
3016 ..ProptestConfig::default()
3017 }
3018 }
3019
3020 fn proptest_seed() -> Option<[u8; 32]> {
3021 let seed = std::env::var("PROPTEST_SEED")
3022 .ok()
3023 .and_then(|value| value.parse::<u64>().ok())?;
3024 let mut bytes = [0u8; 32];
3025 bytes[..8].copy_from_slice(&seed.to_le_bytes());
3026 Some(bytes)
3027 }
3028
3029 fn run_cases(strategy: impl Strategy<Value = Scenario>, cases: u32, seed: Option<[u8; 32]>) {
3030 let config = ProptestConfig {
3031 cases,
3032 failure_persistence: None,
3033 ..ProptestConfig::default()
3034 };
3035 let mut runner = match seed {
3036 Some(bytes) => {
3037 TestRunner::new_with_rng(config, TestRng::from_seed(RngAlgorithm::ChaCha, &bytes))
3038 }
3039 None => TestRunner::new(config),
3040 };
3041 runner
3042 .run(&strategy, |scenario| {
3043 run_scenario(&scenario);
3044 Ok(())
3045 })
3046 .unwrap();
3047 }
3048
3049 fn run_scenario(scenario: &Scenario) {
3050 let known_codes = known_error_codes();
3051 let first = resolve_component_test(
3052 scenario.component_sources.as_ref(),
3053 scenario.pack_lock.as_ref(),
3054 &scenario.request,
3055 );
3056 let second = resolve_component_test(
3057 scenario.component_sources.as_ref(),
3058 scenario.pack_lock.as_ref(),
3059 &scenario.request,
3060 );
3061 assert_eq!(normalize_result(&first), normalize_result(&second));
3062
3063 if let Some(lock) = scenario.pack_lock.as_ref() {
3064 let lock_only = resolve_component_test(None, Some(lock), &scenario.request);
3065 assert_eq!(normalize_result(&first), normalize_result(&lock_only));
3066 }
3067
3068 if let Err(err) = first.as_ref() {
3069 assert!(
3070 known_codes.contains(err.code.as_str()),
3071 "unexpected error code {}: {}",
3072 err.code,
3073 err.message
3074 );
3075 }
3076
3077 if let Some(expected) = scenario.expected_sha256.as_deref() {
3078 let expected_ok =
3079 verify_wasm_sha256("test.component", expected, &scenario.bytes).is_ok();
3080 let actual = compute_sha256_digest_for(&scenario.bytes);
3081 if actual == normalize_sha256(expected).unwrap_or_default() {
3082 assert!(expected_ok, "expected sha256 match to succeed");
3083 } else {
3084 assert!(!expected_ok, "expected sha256 mismatch to fail");
3085 }
3086 }
3087 }
3088
3089 fn normalize_result(
3090 result: &Result<ResolvedComponent, ResolveError>,
3091 ) -> Result<ResolvedComponent, ResolveError> {
3092 match result {
3093 Ok(value) => Ok(value.clone()),
3094 Err(err) => Err(err.clone()),
3095 }
3096 }
3097
3098 fn scenario_strategy() -> impl Strategy<Value = Scenario> {
3099 let name = any::<u8>().prop_map(|n| format!("component{n}.core"));
3100 let alt_name = any::<u8>().prop_map(|n| format!("component_alt{n}.core"));
3101 let tag_ref = any::<bool>();
3102 let bundled = any::<bool>();
3103 let include_sha = any::<bool>();
3104 let include_component_id = any::<bool>();
3105 let request_by_id = any::<bool>();
3106 let use_lock = any::<bool>();
3107 let use_sources = any::<bool>();
3108 let bytes = prop::collection::vec(any::<u8>(), 1..64);
3109
3110 (
3111 name,
3112 alt_name,
3113 tag_ref,
3114 bundled,
3115 include_sha,
3116 include_component_id,
3117 request_by_id,
3118 use_lock,
3119 use_sources,
3120 bytes,
3121 )
3122 .prop_map(
3123 |(
3124 name,
3125 alt_name,
3126 tag_ref,
3127 bundled,
3128 include_sha,
3129 include_component_id,
3130 request_by_id,
3131 use_lock,
3132 use_sources,
3133 bytes,
3134 )| {
3135 let component_id_str = if include_component_id {
3136 alt_name.clone()
3137 } else {
3138 name.clone()
3139 };
3140 let component_id = ComponentId::from_str(&component_id_str).ok();
3141 let source_ref = if tag_ref {
3142 format!("oci://registry.test/{name}:v1")
3143 } else {
3144 format!(
3145 "oci://registry.test/{name}@sha256:{}",
3146 hex::encode([0x11u8; 32])
3147 )
3148 };
3149 let expected_sha256 = if bundled && include_sha {
3150 Some(compute_sha256_digest_for(&bytes))
3151 } else {
3152 None
3153 };
3154
3155 let lock_component = PackLockComponent {
3156 name: name.clone(),
3157 source_ref: Some(source_ref),
3158 legacy_ref: None,
3159 component_id,
3160 bundled: Some(bundled),
3161 bundled_path: if bundled {
3162 Some(format!("components/{name}.wasm"))
3163 } else {
3164 None
3165 },
3166 legacy_path: None,
3167 wasm_sha256: expected_sha256.clone(),
3168 legacy_sha256: None,
3169 resolved_digest: if bundled {
3170 None
3171 } else {
3172 Some("sha256:deadbeef".to_string())
3173 },
3174 digest: None,
3175 };
3176
3177 let pack_lock = if use_lock {
3178 Some(PackLockV1 {
3179 schema_version: 1,
3180 components: vec![lock_component],
3181 })
3182 } else {
3183 None
3184 };
3185
3186 let component_sources = if use_sources {
3187 Some(ComponentSourcesV1::new(vec![ComponentSourceEntryV1 {
3188 name: name.clone(),
3189 component_id: ComponentId::from_str(&name).ok(),
3190 source: ComponentSourceRef::from_str(
3191 "oci://registry.test/component@sha256:deadbeef",
3192 )
3193 .expect("component ref"),
3194 resolved: ResolvedComponentV1 {
3195 digest: "sha256:deadbeef".to_string(),
3196 signature: None,
3197 signed_by: None,
3198 },
3199 artifact: if bundled {
3200 ArtifactLocationV1::Inline {
3201 wasm_path: format!("components/{name}.wasm"),
3202 manifest_path: None,
3203 }
3204 } else {
3205 ArtifactLocationV1::Remote
3206 },
3207 licensing_hint: None,
3208 metering_hint: None,
3209 }]))
3210 } else {
3211 None
3212 };
3213
3214 let request = if request_by_id {
3215 ResolveRequest::ById(component_id_str.clone())
3216 } else {
3217 ResolveRequest::ByName(name.clone())
3218 };
3219
3220 Scenario {
3221 pack_lock,
3222 component_sources,
3223 request,
3224 expected_sha256,
3225 bytes,
3226 }
3227 },
3228 )
3229 }
3230
3231 #[test]
3232 fn pack_resolution_proptest() {
3233 let seed = proptest_seed();
3234 run_cases(scenario_strategy(), proptest_config().cases, seed);
3235 }
3236
3237 #[test]
3238 fn pack_resolution_regression_seeds() {
3239 let seeds_path =
3240 Path::new(env!("CARGO_MANIFEST_DIR")).join("../../tests/fixtures/proptest-seeds.txt");
3241 let raw = std::fs::read_to_string(&seeds_path).expect("read proptest seeds");
3242 for line in raw.lines() {
3243 let line = line.trim();
3244 if line.is_empty() || line.starts_with('#') {
3245 continue;
3246 }
3247 let seed = line.parse::<u64>().expect("seed must be an integer");
3248 let mut bytes = [0u8; 32];
3249 bytes[..8].copy_from_slice(&seed.to_le_bytes());
3250 run_cases(scenario_strategy(), 1, Some(bytes));
3251 }
3252 }
3253}
3254
3255fn locate_pack_assets(
3256 materialized_root: Option<&Path>,
3257 archive_hint: Option<&Path>,
3258) -> Result<(Option<PathBuf>, Option<TempDir>)> {
3259 if let Some(root) = materialized_root {
3260 let assets = root.join("assets");
3261 if assets.is_dir() {
3262 return Ok((Some(assets), None));
3263 }
3264 }
3265 if let Some(path) = archive_hint
3266 && let Some((tempdir, assets)) = extract_assets_from_archive(path)?
3267 {
3268 return Ok((Some(assets), Some(tempdir)));
3269 }
3270 Ok((None, None))
3271}
3272
3273fn extract_assets_from_archive(path: &Path) -> Result<Option<(TempDir, PathBuf)>> {
3274 let file =
3275 File::open(path).with_context(|| format!("failed to open pack {}", path.display()))?;
3276 let mut archive =
3277 ZipArchive::new(file).with_context(|| format!("failed to read pack {}", path.display()))?;
3278 let temp = TempDir::new().context("failed to create temporary assets directory")?;
3279 let mut found = false;
3280 for idx in 0..archive.len() {
3281 let mut entry = archive.by_index(idx)?;
3282 let name = entry.name();
3283 if !name.starts_with("assets/") {
3284 continue;
3285 }
3286 let dest = temp.path().join(name);
3287 if name.ends_with('/') {
3288 std::fs::create_dir_all(&dest)?;
3289 found = true;
3290 continue;
3291 }
3292 if let Some(parent) = dest.parent() {
3293 std::fs::create_dir_all(parent)?;
3294 }
3295 let mut outfile = std::fs::File::create(&dest)?;
3296 std::io::copy(&mut entry, &mut outfile)?;
3297 found = true;
3298 }
3299 if found {
3300 let assets_path = temp.path().join("assets");
3301 Ok(Some((temp, assets_path)))
3302 } else {
3303 Ok(None)
3304 }
3305}
3306
3307fn dist_options_from(component_resolution: &ComponentResolution) -> DistOptions {
3308 let mut opts = DistOptions {
3309 allow_tags: true,
3310 ..DistOptions::default()
3311 };
3312 if let Some(cache_dir) = component_resolution.dist_cache_dir.clone() {
3313 opts.cache_dir = cache_dir;
3314 }
3315 if component_resolution.dist_offline {
3316 opts.offline = true;
3317 }
3318 opts
3319}
3320
3321#[allow(clippy::too_many_arguments)]
3322async fn load_components_from_sources(
3323 cache: &CacheManager,
3324 engine: &Engine,
3325 component_sources: &HashMap<String, ComponentSourceInfo>,
3326 component_resolution: &ComponentResolution,
3327 specs: &[ComponentSpec],
3328 missing: &mut HashSet<String>,
3329 into: &mut HashMap<String, PackComponent>,
3330 materialized_root: Option<&Path>,
3331 archive_hint: Option<&Path>,
3332) -> Result<()> {
3333 let mut archive = if let Some(path) = archive_hint {
3334 Some(
3335 ZipArchive::new(File::open(path)?)
3336 .with_context(|| format!("{} is not a valid gtpack", path.display()))?,
3337 )
3338 } else {
3339 None
3340 };
3341 let mut dist_client: Option<DistClient> = None;
3342
3343 for spec in specs {
3344 if !missing.contains(&spec.id) {
3345 continue;
3346 }
3347 let Some(source) = component_sources.get(&spec.id) else {
3348 continue;
3349 };
3350
3351 let bytes = match &source.artifact {
3352 ComponentArtifactLocation::Inline { wasm_path } => {
3353 if let Some(root) = materialized_root {
3354 let path = root.join(wasm_path);
3355 if path.exists() {
3356 std::fs::read(&path).with_context(|| {
3357 format!(
3358 "failed to read inline component {} from {}",
3359 spec.id,
3360 path.display()
3361 )
3362 })?
3363 } else if archive.is_none() {
3364 bail!("inline component {} missing at {}", spec.id, path.display());
3365 } else {
3366 read_entry(
3367 archive.as_mut().expect("archive present when needed"),
3368 wasm_path,
3369 )
3370 .with_context(|| {
3371 format!(
3372 "inline component {} missing at {} in pack archive",
3373 spec.id, wasm_path
3374 )
3375 })?
3376 }
3377 } else if let Some(archive) = archive.as_mut() {
3378 read_entry(archive, wasm_path).with_context(|| {
3379 format!(
3380 "inline component {} missing at {} in pack archive",
3381 spec.id, wasm_path
3382 )
3383 })?
3384 } else {
3385 bail!(
3386 "inline component {} missing and no pack source available",
3387 spec.id
3388 );
3389 }
3390 }
3391 ComponentArtifactLocation::Remote => {
3392 if source.source.is_tag() {
3393 bail!(
3394 "component {} uses tag ref {} but is not bundled; rebuild the pack",
3395 spec.id,
3396 source.source
3397 );
3398 }
3399 let client = dist_client.get_or_insert_with(|| {
3400 DistClient::new(dist_options_from(component_resolution))
3401 });
3402 let reference = source.source.to_string();
3403 fault::maybe_fail_asset(&reference)
3404 .await
3405 .with_context(|| format!("fault injection blocked asset {reference}"))?;
3406 let digest = source.digest.as_deref().ok_or_else(|| {
3407 anyhow!(
3408 "component {} missing expected digest for remote component",
3409 spec.id
3410 )
3411 })?;
3412 let cache_path = if component_resolution.dist_offline {
3413 client
3414 .fetch_digest(digest)
3415 .await
3416 .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?
3417 } else {
3418 let resolved = client
3419 .resolve_ref(&reference)
3420 .await
3421 .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?;
3422 let expected = normalize_digest(digest);
3423 let actual = normalize_digest(&resolved.digest);
3424 if expected != actual {
3425 bail!(
3426 "component {} digest mismatch after fetch: expected {}, got {}",
3427 spec.id,
3428 expected,
3429 actual
3430 );
3431 }
3432 resolved.cache_path.ok_or_else(|| {
3433 anyhow!(
3434 "component {} resolved from {} but cache path is missing",
3435 spec.id,
3436 reference
3437 )
3438 })?
3439 };
3440 std::fs::read(&cache_path).with_context(|| {
3441 format!(
3442 "failed to read cached component {} from {}",
3443 spec.id,
3444 cache_path.display()
3445 )
3446 })?
3447 }
3448 };
3449
3450 if let Some(expected) = source.expected_wasm_sha256.as_deref() {
3451 verify_wasm_sha256(&spec.id, expected, &bytes)?;
3452 } else if source.skip_digest_verification {
3453 let actual = compute_sha256_digest_for(&bytes);
3454 warn!(
3455 component_id = %spec.id,
3456 digest = %actual,
3457 "bundled component missing wasm_sha256; allowing due to flag"
3458 );
3459 } else {
3460 let expected = source.digest.as_deref().ok_or_else(|| {
3461 anyhow!(
3462 "component {} missing expected digest for verification",
3463 spec.id
3464 )
3465 })?;
3466 verify_component_digest(&spec.id, expected, &bytes)?;
3467 }
3468 let component =
3469 compile_component_with_cache(cache, engine, source.digest.as_deref(), bytes)
3470 .await
3471 .with_context(|| format!("failed to compile component {}", spec.id))?;
3472 into.insert(
3473 spec.id.clone(),
3474 PackComponent {
3475 name: spec.id.clone(),
3476 version: spec.version.clone(),
3477 component,
3478 },
3479 );
3480 missing.remove(&spec.id);
3481 }
3482
3483 Ok(())
3484}
3485
3486fn dist_error_for_component(err: DistError, component_id: &str, reference: &str) -> anyhow::Error {
3487 match err {
3488 DistError::CacheMiss { reference: missing } => anyhow!(
3489 "remote component {} is not cached for {}. Run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
3490 component_id,
3491 missing,
3492 reference
3493 ),
3494 DistError::Offline { reference: blocked } => anyhow!(
3495 "offline mode blocked fetching component {} from {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
3496 component_id,
3497 blocked,
3498 reference
3499 ),
3500 DistError::AuthRequired { target } => anyhow!(
3501 "component {} requires authenticated source {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
3502 component_id,
3503 target,
3504 reference
3505 ),
3506 other => anyhow!(
3507 "failed to resolve component {} from {}: {}",
3508 component_id,
3509 reference,
3510 other
3511 ),
3512 }
3513}
3514
3515async fn load_components_from_overrides(
3516 cache: &CacheManager,
3517 engine: &Engine,
3518 overrides: &HashMap<String, PathBuf>,
3519 specs: &[ComponentSpec],
3520 missing: &mut HashSet<String>,
3521 into: &mut HashMap<String, PackComponent>,
3522) -> Result<()> {
3523 for spec in specs {
3524 if !missing.contains(&spec.id) {
3525 continue;
3526 }
3527 let Some(path) = overrides.get(&spec.id) else {
3528 continue;
3529 };
3530 let bytes = std::fs::read(path)
3531 .with_context(|| format!("failed to read override component {}", path.display()))?;
3532 let component = compile_component_with_cache(cache, engine, None, bytes)
3533 .await
3534 .with_context(|| {
3535 format!(
3536 "failed to compile component {} from override {}",
3537 spec.id,
3538 path.display()
3539 )
3540 })?;
3541 into.insert(
3542 spec.id.clone(),
3543 PackComponent {
3544 name: spec.id.clone(),
3545 version: spec.version.clone(),
3546 component,
3547 },
3548 );
3549 missing.remove(&spec.id);
3550 }
3551 Ok(())
3552}
3553
3554async fn load_components_from_dir(
3555 cache: &CacheManager,
3556 engine: &Engine,
3557 root: &Path,
3558 specs: &[ComponentSpec],
3559 missing: &mut HashSet<String>,
3560 into: &mut HashMap<String, PackComponent>,
3561) -> Result<()> {
3562 for spec in specs {
3563 if !missing.contains(&spec.id) {
3564 continue;
3565 }
3566 let path = component_path_for_spec(root, spec);
3567 if !path.exists() {
3568 tracing::debug!(component = %spec.id, path = %path.display(), "materialized component missing; will try other sources");
3569 continue;
3570 }
3571 let bytes = std::fs::read(&path)
3572 .with_context(|| format!("failed to read component {}", path.display()))?;
3573 let component = compile_component_with_cache(cache, engine, None, bytes)
3574 .await
3575 .with_context(|| {
3576 format!(
3577 "failed to compile component {} from {}",
3578 spec.id,
3579 path.display()
3580 )
3581 })?;
3582 into.insert(
3583 spec.id.clone(),
3584 PackComponent {
3585 name: spec.id.clone(),
3586 version: spec.version.clone(),
3587 component,
3588 },
3589 );
3590 missing.remove(&spec.id);
3591 }
3592 Ok(())
3593}
3594
3595async fn load_components_from_archive(
3596 cache: &CacheManager,
3597 engine: &Engine,
3598 path: &Path,
3599 specs: &[ComponentSpec],
3600 missing: &mut HashSet<String>,
3601 into: &mut HashMap<String, PackComponent>,
3602) -> Result<()> {
3603 let mut archive = ZipArchive::new(File::open(path)?)
3604 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
3605 for spec in specs {
3606 if !missing.contains(&spec.id) {
3607 continue;
3608 }
3609 let file_name = spec
3610 .legacy_path
3611 .clone()
3612 .unwrap_or_else(|| format!("components/{}.wasm", spec.id));
3613 let bytes = match read_entry(&mut archive, &file_name) {
3614 Ok(bytes) => bytes,
3615 Err(err) => {
3616 warn!(component = %spec.id, pack = %path.display(), error = %err, "component entry missing in pack archive");
3617 continue;
3618 }
3619 };
3620 let component = compile_component_with_cache(cache, engine, None, bytes)
3621 .await
3622 .with_context(|| format!("failed to compile component {}", spec.id))?;
3623 into.insert(
3624 spec.id.clone(),
3625 PackComponent {
3626 name: spec.id.clone(),
3627 version: spec.version.clone(),
3628 component,
3629 },
3630 );
3631 missing.remove(&spec.id);
3632 }
3633 Ok(())
3634}
3635
3636#[cfg(test)]
3637mod tests {
3638 use super::*;
3639 use greentic_flow::model::{FlowDoc, NodeDoc};
3640 use indexmap::IndexMap;
3641 use serde_json::json;
3642
3643 #[test]
3644 fn normalizes_raw_component_to_component_exec() {
3645 let mut nodes = IndexMap::new();
3646 let mut raw = IndexMap::new();
3647 raw.insert(
3648 "templating.handlebars".into(),
3649 json!({ "template": "Hi {{name}}" }),
3650 );
3651 nodes.insert(
3652 "start".into(),
3653 NodeDoc {
3654 raw,
3655 routing: json!([{"out": true}]),
3656 ..Default::default()
3657 },
3658 );
3659 let doc = FlowDoc {
3660 id: "welcome".into(),
3661 title: None,
3662 description: None,
3663 flow_type: "messaging".into(),
3664 start: Some("start".into()),
3665 parameters: json!({}),
3666 tags: Vec::new(),
3667 schema_version: None,
3668 entrypoints: IndexMap::new(),
3669 nodes,
3670 };
3671
3672 let normalized = normalize_flow_doc(doc);
3673 let node = normalized.nodes.get("start").expect("node exists");
3674 assert_eq!(node.operation.as_deref(), Some("component.exec"));
3675 assert!(node.raw.is_empty());
3676 let payload = node.payload.as_object().expect("payload object");
3677 assert_eq!(
3678 payload.get("component"),
3679 Some(&Value::String("templating.handlebars".into()))
3680 );
3681 assert_eq!(
3682 payload.get("operation"),
3683 Some(&Value::String("render".into()))
3684 );
3685 let input = payload.get("input").unwrap();
3686 assert_eq!(input, &json!({ "template": "Hi {{name}}" }));
3687 }
3688}
3689
3690#[derive(Clone, Debug, Default, Serialize, Deserialize)]
3691pub struct PackMetadata {
3692 pub pack_id: String,
3693 pub version: String,
3694 #[serde(default)]
3695 pub entry_flows: Vec<String>,
3696 #[serde(default)]
3697 pub secret_requirements: Vec<greentic_types::SecretRequirement>,
3698}
3699
3700impl PackMetadata {
3701 fn from_wasm(bytes: &[u8]) -> Option<Self> {
3702 let parser = Parser::new(0);
3703 for payload in parser.parse_all(bytes) {
3704 let payload = payload.ok()?;
3705 match payload {
3706 Payload::CustomSection(section) => {
3707 if section.name() == "greentic.manifest"
3708 && let Ok(meta) = Self::from_bytes(section.data())
3709 {
3710 return Some(meta);
3711 }
3712 }
3713 Payload::DataSection(reader) => {
3714 for segment in reader.into_iter().flatten() {
3715 if let Ok(meta) = Self::from_bytes(segment.data) {
3716 return Some(meta);
3717 }
3718 }
3719 }
3720 _ => {}
3721 }
3722 }
3723 None
3724 }
3725
3726 fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
3727 #[derive(Deserialize)]
3728 struct RawManifest {
3729 pack_id: String,
3730 version: String,
3731 #[serde(default)]
3732 entry_flows: Vec<String>,
3733 #[serde(default)]
3734 flows: Vec<RawFlow>,
3735 #[serde(default)]
3736 secret_requirements: Vec<greentic_types::SecretRequirement>,
3737 }
3738
3739 #[derive(Deserialize)]
3740 struct RawFlow {
3741 id: String,
3742 }
3743
3744 let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
3745 let mut entry_flows = if manifest.entry_flows.is_empty() {
3746 manifest.flows.iter().map(|f| f.id.clone()).collect()
3747 } else {
3748 manifest.entry_flows.clone()
3749 };
3750 entry_flows.retain(|id| !id.is_empty());
3751 Ok(Self {
3752 pack_id: manifest.pack_id,
3753 version: manifest.version,
3754 entry_flows,
3755 secret_requirements: manifest.secret_requirements,
3756 })
3757 }
3758
3759 pub fn fallback(path: &Path) -> Self {
3760 let pack_id = path
3761 .file_stem()
3762 .map(|s| s.to_string_lossy().into_owned())
3763 .unwrap_or_else(|| "unknown-pack".to_string());
3764 Self {
3765 pack_id,
3766 version: "0.0.0".to_string(),
3767 entry_flows: Vec::new(),
3768 secret_requirements: Vec::new(),
3769 }
3770 }
3771
3772 pub fn from_manifest(manifest: &greentic_types::PackManifest) -> Self {
3773 let entry_flows = manifest
3774 .flows
3775 .iter()
3776 .map(|flow| flow.id.as_str().to_string())
3777 .collect::<Vec<_>>();
3778 Self {
3779 pack_id: manifest.pack_id.as_str().to_string(),
3780 version: manifest.version.to_string(),
3781 entry_flows,
3782 secret_requirements: manifest.secret_requirements.clone(),
3783 }
3784 }
3785}