1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::cache::{ArtifactKey, CacheConfig, CacheManager, CpuPolicy, EngineProfile};
10use crate::component_api::{
11 self, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
12};
13use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
14use crate::provider::{ProviderBinding, ProviderRegistry};
15use crate::provider_core::SchemaCorePre as ProviderComponentPre;
16use crate::provider_core_only;
17use crate::runtime_wasmtime::{Component, Engine, InstancePre, Linker, ResourceTable};
18use anyhow::{Context, Result, anyhow, bail};
19use futures::executor::block_on;
20use greentic_distributor_client::dist::{DistClient, DistError, DistOptions};
21use greentic_interfaces_wasmtime::host_helpers::v1::{
22 self as host_v1, HostFns, add_all_v1_to_linker,
23 runner_host_http::RunnerHostHttp,
24 runner_host_kv::RunnerHostKv,
25 secrets_store::{SecretsError, SecretsErrorV1_1, SecretsStoreHost, SecretsStoreHostV1_1},
26 state_store::{
27 OpAck as StateOpAck, StateKey as HostStateKey, StateStoreError as StateError,
28 StateStoreHost, TenantCtx as StateTenantCtx,
29 },
30 telemetry_logger::{
31 OpAck as TelemetryAck, SpanContext as TelemetrySpanContext,
32 TelemetryLoggerError as TelemetryError, TelemetryLoggerHost,
33 TenantCtx as TelemetryTenantCtx,
34 },
35};
36use greentic_interfaces_wasmtime::http_client_client_v1_0::greentic::interfaces_types::types::Impersonation as ImpersonationV1_0;
37use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::interfaces_types::types::Impersonation as ImpersonationV1_1;
38use greentic_pack::builder as legacy_pack;
39use greentic_types::flow::FlowHasher;
40use greentic_types::{
41 ArtifactLocationV1, ComponentId, ComponentManifest, ComponentSourceRef, ComponentSourcesV1,
42 EXT_COMPONENT_SOURCES_V1, EnvId, ExtensionRef, Flow, FlowComponentRef, FlowId, FlowKind,
43 FlowMetadata, InputMapping, Node, NodeId, OutputMapping, Routing, StateKey as StoreStateKey,
44 TeamId, TelemetryHints, TenantCtx as TypesTenantCtx, TenantId, UserId, decode_pack_manifest,
45 pack_manifest::ExtensionInline,
46};
47use host_v1::http_client::{
48 HttpClientError, HttpClientErrorV1_1, HttpClientHost, HttpClientHostV1_1,
49 Request as HttpRequest, RequestOptionsV1_1 as HttpRequestOptionsV1_1,
50 RequestV1_1 as HttpRequestV1_1, Response as HttpResponse, ResponseV1_1 as HttpResponseV1_1,
51 TenantCtx as HttpTenantCtx, TenantCtxV1_1 as HttpTenantCtxV1_1,
52};
53use indexmap::IndexMap;
54use once_cell::sync::Lazy;
55use parking_lot::{Mutex, RwLock};
56use reqwest::blocking::Client as BlockingClient;
57use runner_core::normalize_under_root;
58use serde::{Deserialize, Serialize};
59use serde_cbor;
60use serde_json::{self, Value};
61use sha2::Digest;
62use tempfile::TempDir;
63use tokio::fs;
64use wasmparser::{Parser, Payload};
65use wasmtime::{Store, StoreContextMut};
66use zip::ZipArchive;
67
68use crate::runner::engine::{FlowContext, FlowEngine, FlowStatus};
69use crate::runner::flow_adapter::{FlowIR, flow_doc_to_ir, flow_ir_to_flow};
70use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
71#[cfg(feature = "fault-injection")]
72use crate::testing::fault_injection::{FaultContext, FaultPoint, maybe_fail};
73
74use crate::config::HostConfig;
75use crate::fault;
76use crate::secrets::{DynSecretsManager, read_secret_blocking, write_secret_blocking};
77use crate::storage::state::STATE_PREFIX;
78use crate::storage::{DynSessionStore, DynStateStore};
79use crate::verify;
80use crate::wasi::{PreopenSpec, RunnerWasiPolicy};
81use tracing::warn;
82use wasmtime_wasi::p2::add_to_linker_sync as add_wasi_to_linker;
83use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
84
85use greentic_flow::model::FlowDoc;
86
87#[allow(dead_code)]
88pub struct PackRuntime {
89 path: PathBuf,
91 archive_path: Option<PathBuf>,
93 config: Arc<HostConfig>,
94 engine: Engine,
95 metadata: PackMetadata,
96 manifest: Option<greentic_types::PackManifest>,
97 legacy_manifest: Option<Box<legacy_pack::PackManifest>>,
98 component_manifests: HashMap<String, ComponentManifest>,
99 mocks: Option<Arc<MockLayer>>,
100 flows: Option<PackFlows>,
101 components: HashMap<String, PackComponent>,
102 http_client: Arc<BlockingClient>,
103 pre_cache: Mutex<HashMap<String, InstancePre<ComponentState>>>,
104 session_store: Option<DynSessionStore>,
105 state_store: Option<DynStateStore>,
106 wasi_policy: Arc<RunnerWasiPolicy>,
107 assets_tempdir: Option<TempDir>,
108 provider_registry: RwLock<Option<ProviderRegistry>>,
109 secrets: DynSecretsManager,
110 oauth_config: Option<OAuthBrokerConfig>,
111 cache: CacheManager,
112}
113
114struct PackComponent {
115 #[allow(dead_code)]
116 name: String,
117 #[allow(dead_code)]
118 version: String,
119 component: Arc<Component>,
120}
121
122fn run_on_wasi_thread<F, T>(task_name: &'static str, task: F) -> Result<T>
123where
124 F: FnOnce() -> Result<T> + Send + 'static,
125 T: Send + 'static,
126{
127 let builder = std::thread::Builder::new().name(format!("greentic-wasmtime-{task_name}"));
128 let handle = builder
129 .spawn(move || {
130 let pid = std::process::id();
131 let thread_id = std::thread::current().id();
132 let tokio_handle_present = tokio::runtime::Handle::try_current().is_ok();
133 tracing::info!(
134 event = "wasmtime.thread.start",
135 task = task_name,
136 pid,
137 thread_id = ?thread_id,
138 tokio_handle_present,
139 "starting Wasmtime thread"
140 );
141 task()
142 })
143 .context("failed to spawn Wasmtime thread")?;
144 handle
145 .join()
146 .map_err(|err| {
147 let reason = if let Some(msg) = err.downcast_ref::<&str>() {
148 msg.to_string()
149 } else if let Some(msg) = err.downcast_ref::<String>() {
150 msg.clone()
151 } else {
152 "unknown panic".to_string()
153 };
154 anyhow!("Wasmtime thread panicked: {reason}")
155 })
156 .and_then(|res| res)
157}
158
159#[derive(Debug, Default, Clone)]
160pub struct ComponentResolution {
161 pub materialized_root: Option<PathBuf>,
163 pub overrides: HashMap<String, PathBuf>,
165 pub dist_offline: bool,
167 pub dist_cache_dir: Option<PathBuf>,
169 pub allow_missing_hash: bool,
171}
172
173fn build_blocking_client() -> BlockingClient {
174 std::thread::spawn(|| {
175 BlockingClient::builder()
176 .no_proxy()
177 .build()
178 .expect("blocking client")
179 })
180 .join()
181 .expect("client build thread panicked")
182}
183
184fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
185 let (root, candidate) = if path.is_absolute() {
186 let parent = path
187 .parent()
188 .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
189 let root = parent
190 .canonicalize()
191 .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
192 let file = path
193 .file_name()
194 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
195 (root, PathBuf::from(file))
196 } else {
197 let cwd = std::env::current_dir().context("failed to resolve current directory")?;
198 let base = if let Some(parent) = path.parent() {
199 cwd.join(parent)
200 } else {
201 cwd
202 };
203 let root = base
204 .canonicalize()
205 .with_context(|| format!("failed to canonicalize {}", base.display()))?;
206 let file = path
207 .file_name()
208 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
209 (root, PathBuf::from(file))
210 };
211 let safe = normalize_under_root(&root, &candidate)?;
212 Ok((root, safe))
213}
214
215static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
216
217#[derive(Debug, Clone, Serialize, Deserialize)]
218pub struct FlowDescriptor {
219 pub id: String,
220 #[serde(rename = "type")]
221 pub flow_type: String,
222 pub pack_id: String,
223 pub profile: String,
224 pub version: String,
225 #[serde(default)]
226 pub description: Option<String>,
227}
228
229pub struct HostState {
230 #[allow(dead_code)]
231 pack_id: String,
232 config: Arc<HostConfig>,
233 http_client: Arc<BlockingClient>,
234 default_env: String,
235 #[allow(dead_code)]
236 session_store: Option<DynSessionStore>,
237 state_store: Option<DynStateStore>,
238 mocks: Option<Arc<MockLayer>>,
239 secrets: DynSecretsManager,
240 oauth_config: Option<OAuthBrokerConfig>,
241 oauth_host: OAuthBrokerHost,
242 exec_ctx: Option<ComponentExecCtx>,
243 component_ref: Option<String>,
244 provider_core_component: bool,
245}
246
247impl HostState {
248 #[allow(clippy::default_constructed_unit_structs)]
249 #[allow(clippy::too_many_arguments)]
250 pub fn new(
251 pack_id: String,
252 config: Arc<HostConfig>,
253 http_client: Arc<BlockingClient>,
254 mocks: Option<Arc<MockLayer>>,
255 session_store: Option<DynSessionStore>,
256 state_store: Option<DynStateStore>,
257 secrets: DynSecretsManager,
258 oauth_config: Option<OAuthBrokerConfig>,
259 exec_ctx: Option<ComponentExecCtx>,
260 component_ref: Option<String>,
261 provider_core_component: bool,
262 ) -> Result<Self> {
263 let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
264 Ok(Self {
265 pack_id,
266 config,
267 http_client,
268 default_env,
269 session_store,
270 state_store,
271 mocks,
272 secrets,
273 oauth_config,
274 oauth_host: OAuthBrokerHost::default(),
275 exec_ctx,
276 component_ref,
277 provider_core_component,
278 })
279 }
280
281 fn instantiate_component_result(
282 linker: &mut Linker<ComponentState>,
283 store: &mut Store<ComponentState>,
284 component: &Component,
285 ctx: &ComponentExecCtx,
286 operation: &str,
287 input_json: &str,
288 ) -> Result<InvokeResult> {
289 let pre_instance = linker.instantiate_pre(component)?;
290 match component_api::v0_5::ComponentPre::new(pre_instance) {
291 Ok(pre) => {
292 let result = block_on(async {
293 let bindings = pre.instantiate_async(&mut *store).await?;
294 let node = bindings.greentic_component_node();
295 let ctx_v05 = component_api::exec_ctx_v0_5(ctx);
296 let operation_owned = operation.to_string();
297 let input_owned = input_json.to_string();
298 node.call_invoke(&mut *store, &ctx_v05, &operation_owned, &input_owned)
299 })?;
300 Ok(component_api::invoke_result_from_v0_5(result))
301 }
302 Err(err) => {
303 if is_missing_node_export(&err, "0.5.0") {
304 let pre_instance = linker.instantiate_pre(component)?;
305 let pre: component_api::v0_4::ComponentPre<ComponentState> =
306 component_api::v0_4::ComponentPre::new(pre_instance)?;
307 let result = block_on(async {
308 let bindings = pre.instantiate_async(&mut *store).await?;
309 let node = bindings.greentic_component_node();
310 let ctx_v04 = component_api::exec_ctx_v0_4(ctx);
311 let operation_owned = operation.to_string();
312 let input_owned = input_json.to_string();
313 node.call_invoke(&mut *store, &ctx_v04, &operation_owned, &input_owned)
314 })?;
315 Ok(component_api::invoke_result_from_v0_4(result))
316 } else {
317 Err(err)
318 }
319 }
320 }
321 }
322
323 fn convert_invoke_result(result: InvokeResult) -> Result<Value> {
324 match result {
325 InvokeResult::Ok(body) => {
326 if body.is_empty() {
327 return Ok(Value::Null);
328 }
329 serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
330 }
331 InvokeResult::Err(NodeError {
332 code,
333 message,
334 retryable,
335 backoff_ms,
336 details,
337 }) => {
338 let mut obj = serde_json::Map::new();
339 obj.insert("ok".into(), Value::Bool(false));
340 let mut error = serde_json::Map::new();
341 error.insert("code".into(), Value::String(code));
342 error.insert("message".into(), Value::String(message));
343 error.insert("retryable".into(), Value::Bool(retryable));
344 if let Some(backoff) = backoff_ms {
345 error.insert("backoff_ms".into(), Value::Number(backoff.into()));
346 }
347 if let Some(details) = details {
348 error.insert(
349 "details".into(),
350 serde_json::from_str(&details).unwrap_or(Value::String(details)),
351 );
352 }
353 obj.insert("error".into(), Value::Object(error));
354 Ok(Value::Object(obj))
355 }
356 }
357 }
358
359 pub fn get_secret(&self, key: &str) -> Result<String> {
360 if provider_core_only::is_enabled() {
361 bail!(provider_core_only::blocked_message("secrets"))
362 }
363 if !self.config.secrets_policy.is_allowed(key) {
364 bail!("secret {key} is not permitted by bindings policy");
365 }
366 if let Some(mock) = &self.mocks
367 && let Some(value) = mock.secrets_lookup(key)
368 {
369 return Ok(value);
370 }
371 let ctx = self.config.tenant_ctx();
372 let bytes = read_secret_blocking(&self.secrets, &ctx, key)
373 .context("failed to read secret from manager")?;
374 let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
375 Ok(value)
376 }
377
378 fn allows_secret_write_in_provider_core_only(&self) -> bool {
379 self.provider_core_component || self.component_ref.is_none()
380 }
381
382 fn tenant_ctx_from_v1(&self, ctx: Option<StateTenantCtx>) -> Result<TypesTenantCtx> {
383 let tenant_raw = ctx
384 .as_ref()
385 .map(|ctx| ctx.tenant.clone())
386 .or_else(|| self.exec_ctx.as_ref().map(|ctx| ctx.tenant.tenant.clone()))
387 .unwrap_or_else(|| self.config.tenant.clone());
388 let env_raw = ctx
389 .as_ref()
390 .map(|ctx| ctx.env.clone())
391 .unwrap_or_else(|| self.default_env.clone());
392 let tenant_id = TenantId::from_str(&tenant_raw)
393 .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
394 let env_id = EnvId::from_str(&env_raw)
395 .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
396 let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
397 if let Some(exec_ctx) = self.exec_ctx.as_ref() {
398 if let Some(team) = exec_ctx.tenant.team.as_ref() {
399 let team_id =
400 TeamId::from_str(team).with_context(|| format!("invalid team id `{team}`"))?;
401 tenant_ctx = tenant_ctx.with_team(Some(team_id));
402 }
403 if let Some(user) = exec_ctx.tenant.user.as_ref() {
404 let user_id =
405 UserId::from_str(user).with_context(|| format!("invalid user id `{user}`"))?;
406 tenant_ctx = tenant_ctx.with_user(Some(user_id));
407 }
408 tenant_ctx = tenant_ctx.with_flow(exec_ctx.flow_id.clone());
409 if let Some(node) = exec_ctx.node_id.as_ref() {
410 tenant_ctx = tenant_ctx.with_node(node.clone());
411 }
412 if let Some(session) = exec_ctx.tenant.correlation_id.as_ref() {
413 tenant_ctx = tenant_ctx.with_session(session.clone());
414 }
415 tenant_ctx.trace_id = exec_ctx.tenant.trace_id.clone();
416 }
417
418 if let Some(ctx) = ctx {
419 if let Some(team) = ctx.team.or(ctx.team_id) {
420 let team_id =
421 TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
422 tenant_ctx = tenant_ctx.with_team(Some(team_id));
423 }
424 if let Some(user) = ctx.user.or(ctx.user_id) {
425 let user_id =
426 UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
427 tenant_ctx = tenant_ctx.with_user(Some(user_id));
428 }
429 if let Some(flow) = ctx.flow_id {
430 tenant_ctx = tenant_ctx.with_flow(flow);
431 }
432 if let Some(node) = ctx.node_id {
433 tenant_ctx = tenant_ctx.with_node(node);
434 }
435 if let Some(provider) = ctx.provider_id {
436 tenant_ctx = tenant_ctx.with_provider(provider);
437 }
438 if let Some(session) = ctx.session_id {
439 tenant_ctx = tenant_ctx.with_session(session);
440 }
441 tenant_ctx.trace_id = ctx.trace_id;
442 }
443 Ok(tenant_ctx)
444 }
445
446 fn send_http_request(
447 &mut self,
448 req: HttpRequest,
449 opts: Option<HttpRequestOptionsV1_1>,
450 _ctx: Option<HttpTenantCtx>,
451 ) -> Result<HttpResponse, HttpClientError> {
452 if !self.config.http_enabled {
453 return Err(HttpClientError {
454 code: "denied".into(),
455 message: "http client disabled by policy".into(),
456 });
457 }
458
459 let mut mock_state = None;
460 let raw_body = req.body.clone();
461 if let Some(mock) = &self.mocks
462 && let Ok(meta) = HttpMockRequest::new(&req.method, &req.url, raw_body.as_deref())
463 {
464 match mock.http_begin(&meta) {
465 HttpDecision::Mock(response) => {
466 let headers = response
467 .headers
468 .iter()
469 .map(|(k, v)| (k.clone(), v.clone()))
470 .collect();
471 return Ok(HttpResponse {
472 status: response.status,
473 headers,
474 body: response.body.clone().map(|b| b.into_bytes()),
475 });
476 }
477 HttpDecision::Deny(reason) => {
478 return Err(HttpClientError {
479 code: "denied".into(),
480 message: reason,
481 });
482 }
483 HttpDecision::Passthrough { record } => {
484 mock_state = Some((meta, record));
485 }
486 }
487 }
488
489 let method = req.method.parse().unwrap_or(reqwest::Method::GET);
490 let mut builder = self.http_client.request(method, &req.url);
491 for (key, value) in req.headers {
492 if let Ok(header) = reqwest::header::HeaderName::from_bytes(key.as_bytes())
493 && let Ok(header_value) = reqwest::header::HeaderValue::from_str(&value)
494 {
495 builder = builder.header(header, header_value);
496 }
497 }
498
499 if let Some(body) = raw_body.clone() {
500 builder = builder.body(body);
501 }
502
503 if let Some(opts) = opts {
504 if let Some(timeout_ms) = opts.timeout_ms {
505 builder = builder.timeout(Duration::from_millis(timeout_ms as u64));
506 }
507 if opts.allow_insecure == Some(true) {
508 warn!(url = %req.url, "allow-insecure not supported; using default TLS validation");
509 }
510 if let Some(follow_redirects) = opts.follow_redirects
511 && !follow_redirects
512 {
513 warn!(url = %req.url, "follow-redirects=false not supported; using default client behaviour");
514 }
515 }
516
517 let response = match builder.send() {
518 Ok(resp) => resp,
519 Err(err) => {
520 warn!(url = %req.url, error = %err, "http client request failed");
521 return Err(HttpClientError {
522 code: "unavailable".into(),
523 message: err.to_string(),
524 });
525 }
526 };
527
528 let status = response.status().as_u16();
529 let headers_vec = response
530 .headers()
531 .iter()
532 .map(|(k, v)| {
533 (
534 k.as_str().to_string(),
535 v.to_str().unwrap_or_default().to_string(),
536 )
537 })
538 .collect::<Vec<_>>();
539 let body_bytes = response.bytes().ok().map(|b| b.to_vec());
540
541 if let Some((meta, true)) = mock_state.take()
542 && let Some(mock) = &self.mocks
543 {
544 let recorded = HttpMockResponse::new(
545 status,
546 headers_vec.clone().into_iter().collect(),
547 body_bytes
548 .as_ref()
549 .map(|b| String::from_utf8_lossy(b).into_owned()),
550 );
551 mock.http_record(&meta, &recorded);
552 }
553
554 Ok(HttpResponse {
555 status,
556 headers: headers_vec,
557 body: body_bytes,
558 })
559 }
560}
561
562impl SecretsStoreHost for HostState {
563 fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsError> {
564 if provider_core_only::is_enabled() {
565 warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
566 return Err(SecretsError::Denied);
567 }
568 if !self.config.secrets_policy.is_allowed(&key) {
569 return Err(SecretsError::Denied);
570 }
571 if let Some(mock) = &self.mocks
572 && let Some(value) = mock.secrets_lookup(&key)
573 {
574 return Ok(Some(value.into_bytes()));
575 }
576 let ctx = self.config.tenant_ctx();
577 match read_secret_blocking(&self.secrets, &ctx, &key) {
578 Ok(bytes) => Ok(Some(bytes)),
579 Err(err) => {
580 warn!(secret = %key, error = %err, "secret lookup failed");
581 Err(SecretsError::NotFound)
582 }
583 }
584 }
585}
586
587impl SecretsStoreHostV1_1 for HostState {
588 fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsErrorV1_1> {
589 if provider_core_only::is_enabled() {
590 warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
591 return Err(SecretsErrorV1_1::Denied);
592 }
593 if !self.config.secrets_policy.is_allowed(&key) {
594 return Err(SecretsErrorV1_1::Denied);
595 }
596 if let Some(mock) = &self.mocks
597 && let Some(value) = mock.secrets_lookup(&key)
598 {
599 return Ok(Some(value.into_bytes()));
600 }
601 let ctx = self.config.tenant_ctx();
602 match read_secret_blocking(&self.secrets, &ctx, &key) {
603 Ok(bytes) => Ok(Some(bytes)),
604 Err(err) => {
605 warn!(secret = %key, error = %err, "secret lookup failed");
606 Err(SecretsErrorV1_1::NotFound)
607 }
608 }
609 }
610
611 fn put(&mut self, key: String, value: Vec<u8>) {
612 if key.trim().is_empty() {
613 warn!(secret = %key, "secret write blocked: empty key");
614 panic!("secret write denied for key {key}: invalid key");
615 }
616 if provider_core_only::is_enabled() && !self.allows_secret_write_in_provider_core_only() {
617 warn!(
618 secret = %key,
619 component = self.component_ref.as_deref().unwrap_or("<pack>"),
620 "provider-core only mode enabled; blocking secrets store write"
621 );
622 panic!("secret write denied for key {key}: provider-core-only mode");
623 }
624 if !self.config.secrets_policy.is_allowed(&key) {
625 warn!(secret = %key, "secret write denied by bindings policy");
626 panic!("secret write denied for key {key}: policy");
627 }
628 let ctx = self.config.tenant_ctx();
629 if let Err(err) = write_secret_blocking(&self.secrets, &ctx, &key, &value) {
630 warn!(secret = %key, error = %err, "secret write failed");
631 panic!("secret write failed for key {key}");
632 }
633 }
634}
635
636impl HttpClientHost for HostState {
637 fn send(
638 &mut self,
639 req: HttpRequest,
640 ctx: Option<HttpTenantCtx>,
641 ) -> Result<HttpResponse, HttpClientError> {
642 self.send_http_request(req, None, ctx)
643 }
644}
645
646impl HttpClientHostV1_1 for HostState {
647 fn send(
648 &mut self,
649 req: HttpRequestV1_1,
650 opts: Option<HttpRequestOptionsV1_1>,
651 ctx: Option<HttpTenantCtxV1_1>,
652 ) -> Result<HttpResponseV1_1, HttpClientErrorV1_1> {
653 let legacy_req = HttpRequest {
654 method: req.method,
655 url: req.url,
656 headers: req.headers,
657 body: req.body,
658 };
659 let legacy_ctx = ctx.map(|ctx| HttpTenantCtx {
660 env: ctx.env,
661 tenant: ctx.tenant,
662 tenant_id: ctx.tenant_id,
663 team: ctx.team,
664 team_id: ctx.team_id,
665 user: ctx.user,
666 user_id: ctx.user_id,
667 trace_id: ctx.trace_id,
668 correlation_id: ctx.correlation_id,
669 attributes: ctx.attributes,
670 session_id: ctx.session_id,
671 flow_id: ctx.flow_id,
672 node_id: ctx.node_id,
673 provider_id: ctx.provider_id,
674 deadline_ms: ctx.deadline_ms,
675 attempt: ctx.attempt,
676 idempotency_key: ctx.idempotency_key,
677 impersonation: ctx
678 .impersonation
679 .map(|ImpersonationV1_1 { actor_id, reason }| ImpersonationV1_0 {
680 actor_id,
681 reason,
682 }),
683 });
684
685 self.send_http_request(legacy_req, opts, legacy_ctx)
686 .map(|resp| HttpResponseV1_1 {
687 status: resp.status,
688 headers: resp.headers,
689 body: resp.body,
690 })
691 .map_err(|err| HttpClientErrorV1_1 {
692 code: err.code,
693 message: err.message,
694 })
695 }
696}
697
698impl StateStoreHost for HostState {
699 fn read(
700 &mut self,
701 key: HostStateKey,
702 ctx: Option<StateTenantCtx>,
703 ) -> Result<Vec<u8>, StateError> {
704 let store = match self.state_store.as_ref() {
705 Some(store) => store.clone(),
706 None => {
707 return Err(StateError {
708 code: "unavailable".into(),
709 message: "state store not configured".into(),
710 });
711 }
712 };
713 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
714 Ok(ctx) => ctx,
715 Err(err) => {
716 return Err(StateError {
717 code: "invalid-ctx".into(),
718 message: err.to_string(),
719 });
720 }
721 };
722 #[cfg(feature = "fault-injection")]
723 {
724 let exec_ctx = self.exec_ctx.as_ref();
725 let flow_id = exec_ctx
726 .map(|ctx| ctx.flow_id.as_str())
727 .unwrap_or("unknown");
728 let node_id = exec_ctx.and_then(|ctx| ctx.node_id.as_deref());
729 let attempt = exec_ctx.map(|ctx| ctx.tenant.attempt).unwrap_or(1);
730 let fault_ctx = FaultContext {
731 pack_id: self.pack_id.as_str(),
732 flow_id,
733 node_id,
734 attempt,
735 };
736 if let Err(err) = maybe_fail(FaultPoint::StateRead, fault_ctx) {
737 return Err(StateError {
738 code: "internal".into(),
739 message: err.to_string(),
740 });
741 }
742 }
743 let key = StoreStateKey::from(key);
744 match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
745 Ok(Some(value)) => Ok(serde_json::to_vec(&value).unwrap_or_else(|_| Vec::new())),
746 Ok(None) => Err(StateError {
747 code: "not_found".into(),
748 message: "state key not found".into(),
749 }),
750 Err(err) => Err(StateError {
751 code: "internal".into(),
752 message: err.to_string(),
753 }),
754 }
755 }
756
757 fn write(
758 &mut self,
759 key: HostStateKey,
760 bytes: Vec<u8>,
761 ctx: Option<StateTenantCtx>,
762 ) -> Result<StateOpAck, StateError> {
763 let store = match self.state_store.as_ref() {
764 Some(store) => store.clone(),
765 None => {
766 return Err(StateError {
767 code: "unavailable".into(),
768 message: "state store not configured".into(),
769 });
770 }
771 };
772 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
773 Ok(ctx) => ctx,
774 Err(err) => {
775 return Err(StateError {
776 code: "invalid-ctx".into(),
777 message: err.to_string(),
778 });
779 }
780 };
781 #[cfg(feature = "fault-injection")]
782 {
783 let exec_ctx = self.exec_ctx.as_ref();
784 let flow_id = exec_ctx
785 .map(|ctx| ctx.flow_id.as_str())
786 .unwrap_or("unknown");
787 let node_id = exec_ctx.and_then(|ctx| ctx.node_id.as_deref());
788 let attempt = exec_ctx.map(|ctx| ctx.tenant.attempt).unwrap_or(1);
789 let fault_ctx = FaultContext {
790 pack_id: self.pack_id.as_str(),
791 flow_id,
792 node_id,
793 attempt,
794 };
795 if let Err(err) = maybe_fail(FaultPoint::StateWrite, fault_ctx) {
796 return Err(StateError {
797 code: "internal".into(),
798 message: err.to_string(),
799 });
800 }
801 }
802 let key = StoreStateKey::from(key);
803 let value = serde_json::from_slice(&bytes)
804 .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()));
805 match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
806 Ok(()) => Ok(StateOpAck::Ok),
807 Err(err) => Err(StateError {
808 code: "internal".into(),
809 message: err.to_string(),
810 }),
811 }
812 }
813
814 fn delete(
815 &mut self,
816 key: HostStateKey,
817 ctx: Option<StateTenantCtx>,
818 ) -> Result<StateOpAck, StateError> {
819 let store = match self.state_store.as_ref() {
820 Some(store) => store.clone(),
821 None => {
822 return Err(StateError {
823 code: "unavailable".into(),
824 message: "state store not configured".into(),
825 });
826 }
827 };
828 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
829 Ok(ctx) => ctx,
830 Err(err) => {
831 return Err(StateError {
832 code: "invalid-ctx".into(),
833 message: err.to_string(),
834 });
835 }
836 };
837 let key = StoreStateKey::from(key);
838 match store.del(&tenant_ctx, STATE_PREFIX, &key) {
839 Ok(_) => Ok(StateOpAck::Ok),
840 Err(err) => Err(StateError {
841 code: "internal".into(),
842 message: err.to_string(),
843 }),
844 }
845 }
846}
847
848impl TelemetryLoggerHost for HostState {
849 fn log(
850 &mut self,
851 span: TelemetrySpanContext,
852 fields: Vec<(String, String)>,
853 _ctx: Option<TelemetryTenantCtx>,
854 ) -> Result<TelemetryAck, TelemetryError> {
855 if let Some(mock) = &self.mocks
856 && mock.telemetry_drain(&[("span_json", span.flow_id.as_str())])
857 {
858 return Ok(TelemetryAck::Ok);
859 }
860 let mut map = serde_json::Map::new();
861 for (k, v) in fields {
862 map.insert(k, Value::String(v));
863 }
864 tracing::info!(
865 tenant = %span.tenant,
866 flow_id = %span.flow_id,
867 node = ?span.node_id,
868 provider = %span.provider,
869 fields = %serde_json::Value::Object(map.clone()),
870 "telemetry log from pack"
871 );
872 Ok(TelemetryAck::Ok)
873 }
874}
875
876impl RunnerHostHttp for HostState {
877 fn request(
878 &mut self,
879 method: String,
880 url: String,
881 headers: Vec<String>,
882 body: Option<Vec<u8>>,
883 ) -> Result<Vec<u8>, String> {
884 let req = HttpRequest {
885 method,
886 url,
887 headers: headers
888 .chunks(2)
889 .filter_map(|chunk| {
890 if chunk.len() == 2 {
891 Some((chunk[0].clone(), chunk[1].clone()))
892 } else {
893 None
894 }
895 })
896 .collect(),
897 body,
898 };
899 match HttpClientHost::send(self, req, None) {
900 Ok(resp) => Ok(resp.body.unwrap_or_default()),
901 Err(err) => Err(err.message),
902 }
903 }
904}
905
906impl RunnerHostKv for HostState {
907 fn get(&mut self, _ns: String, _key: String) -> Option<String> {
908 None
909 }
910
911 fn put(&mut self, _ns: String, _key: String, _val: String) {}
912}
913
914enum ManifestLoad {
915 New {
916 manifest: Box<greentic_types::PackManifest>,
917 flows: PackFlows,
918 },
919 Legacy {
920 manifest: Box<legacy_pack::PackManifest>,
921 flows: PackFlows,
922 },
923}
924
925fn load_manifest_and_flows(path: &Path) -> Result<ManifestLoad> {
926 let mut archive = ZipArchive::new(File::open(path)?)
927 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
928 let bytes = read_entry(&mut archive, "manifest.cbor")
929 .with_context(|| format!("missing manifest.cbor in {}", path.display()))?;
930 match decode_pack_manifest(&bytes) {
931 Ok(manifest) => {
932 let cache = PackFlows::from_manifest(manifest.clone());
933 Ok(ManifestLoad::New {
934 manifest: Box::new(manifest),
935 flows: cache,
936 })
937 }
938 Err(err) => {
939 tracing::debug!(
940 error = %err,
941 pack = %path.display(),
942 "decode_pack_manifest failed for archive; falling back to legacy manifest"
943 );
944 let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
945 .context("failed to decode legacy pack manifest from manifest.cbor")?;
946 let flows = load_legacy_flows_from_archive(&mut archive, path, &legacy)?;
947 Ok(ManifestLoad::Legacy {
948 manifest: Box::new(legacy),
949 flows,
950 })
951 }
952 }
953}
954
955fn load_manifest_and_flows_from_dir(root: &Path) -> Result<ManifestLoad> {
956 let manifest_path = root.join("manifest.cbor");
957 let bytes = std::fs::read(&manifest_path)
958 .with_context(|| format!("missing manifest.cbor in {}", root.display()))?;
959 match decode_pack_manifest(&bytes) {
960 Ok(manifest) => {
961 let cache = PackFlows::from_manifest(manifest.clone());
962 Ok(ManifestLoad::New {
963 manifest: Box::new(manifest),
964 flows: cache,
965 })
966 }
967 Err(err) => {
968 tracing::debug!(
969 error = %err,
970 pack = %root.display(),
971 "decode_pack_manifest failed for materialized pack; trying legacy manifest"
972 );
973 let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
974 .context("failed to decode legacy pack manifest from manifest.cbor")?;
975 let flows = load_legacy_flows_from_dir(root, &legacy)?;
976 Ok(ManifestLoad::Legacy {
977 manifest: Box::new(legacy),
978 flows,
979 })
980 }
981 }
982}
983
984fn load_legacy_flows_from_dir(
985 root: &Path,
986 manifest: &legacy_pack::PackManifest,
987) -> Result<PackFlows> {
988 build_legacy_flows(manifest, |rel_path| {
989 let path = root.join(rel_path);
990 std::fs::read(&path).with_context(|| format!("missing flow json {}", path.display()))
991 })
992}
993
994fn load_legacy_flows_from_archive(
995 archive: &mut ZipArchive<File>,
996 pack_path: &Path,
997 manifest: &legacy_pack::PackManifest,
998) -> Result<PackFlows> {
999 build_legacy_flows(manifest, |rel_path| {
1000 read_entry(archive, rel_path)
1001 .with_context(|| format!("missing flow json {} in {}", rel_path, pack_path.display()))
1002 })
1003}
1004
1005fn build_legacy_flows(
1006 manifest: &legacy_pack::PackManifest,
1007 mut read_json: impl FnMut(&str) -> Result<Vec<u8>>,
1008) -> Result<PackFlows> {
1009 let mut flows = HashMap::new();
1010 let mut descriptors = Vec::new();
1011
1012 for entry in &manifest.flows {
1013 let bytes = read_json(&entry.file_json)
1014 .with_context(|| format!("missing flow json {}", entry.file_json))?;
1015 let doc = parse_flow_doc_with_legacy_aliases(&bytes, &entry.file_json)?;
1016 let normalized = normalize_flow_doc(doc);
1017 let flow_ir = flow_doc_to_ir(normalized)?;
1018 let flow = flow_ir_to_flow(flow_ir)?;
1019
1020 descriptors.push(FlowDescriptor {
1021 id: entry.id.clone(),
1022 flow_type: entry.kind.clone(),
1023 pack_id: manifest.meta.pack_id.clone(),
1024 profile: manifest.meta.pack_id.clone(),
1025 version: manifest.meta.version.to_string(),
1026 description: None,
1027 });
1028 flows.insert(entry.id.clone(), flow);
1029 }
1030
1031 let mut entry_flows = manifest.meta.entry_flows.clone();
1032 if entry_flows.is_empty() {
1033 entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
1034 }
1035 let metadata = PackMetadata {
1036 pack_id: manifest.meta.pack_id.clone(),
1037 version: manifest.meta.version.to_string(),
1038 entry_flows,
1039 secret_requirements: Vec::new(),
1040 };
1041
1042 Ok(PackFlows {
1043 descriptors,
1044 flows,
1045 metadata,
1046 })
1047}
1048
1049fn parse_flow_doc_with_legacy_aliases(bytes: &[u8], path: &str) -> Result<FlowDoc> {
1050 let mut value: Value = serde_json::from_slice(bytes)
1051 .with_context(|| format!("failed to decode flow doc {}", path))?;
1052 if let Some(map) = value.as_object_mut()
1053 && !map.contains_key("type")
1054 && let Some(flow_type) = map.remove("flow_type")
1055 {
1056 map.insert("type".to_string(), flow_type);
1057 }
1058 serde_json::from_value(value).with_context(|| format!("failed to decode flow doc {}", path))
1059}
1060
1061pub struct ComponentState {
1062 pub host: HostState,
1063 wasi_ctx: WasiCtx,
1064 resource_table: ResourceTable,
1065}
1066
1067impl ComponentState {
1068 pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
1069 let wasi_ctx = policy
1070 .instantiate()
1071 .context("failed to build WASI context")?;
1072 Ok(Self {
1073 host,
1074 wasi_ctx,
1075 resource_table: ResourceTable::new(),
1076 })
1077 }
1078
1079 fn host_mut(&mut self) -> &mut HostState {
1080 &mut self.host
1081 }
1082
1083 fn should_cancel_host(&mut self) -> bool {
1084 false
1085 }
1086
1087 fn yield_now_host(&mut self) {
1088 }
1090}
1091
1092impl component_api::v0_4::greentic::component::control::Host for ComponentState {
1093 fn should_cancel(&mut self) -> bool {
1094 self.should_cancel_host()
1095 }
1096
1097 fn yield_now(&mut self) {
1098 self.yield_now_host();
1099 }
1100}
1101
1102impl component_api::v0_5::greentic::component::control::Host for ComponentState {
1103 fn should_cancel(&mut self) -> bool {
1104 self.should_cancel_host()
1105 }
1106
1107 fn yield_now(&mut self) {
1108 self.yield_now_host();
1109 }
1110}
1111
1112fn add_component_control_instance(
1113 linker: &mut Linker<ComponentState>,
1114 name: &str,
1115) -> wasmtime::Result<()> {
1116 let mut inst = linker.instance(name)?;
1117 inst.func_wrap(
1118 "should-cancel",
1119 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
1120 let host = caller.data_mut();
1121 Ok((host.should_cancel_host(),))
1122 },
1123 )?;
1124 inst.func_wrap(
1125 "yield-now",
1126 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
1127 let host = caller.data_mut();
1128 host.yield_now_host();
1129 Ok(())
1130 },
1131 )?;
1132 Ok(())
1133}
1134
1135fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
1136 add_component_control_instance(linker, "greentic:component/control@0.5.0")?;
1137 add_component_control_instance(linker, "greentic:component/control@0.4.0")?;
1138 Ok(())
1139}
1140
1141pub fn register_all(linker: &mut Linker<ComponentState>, allow_state_store: bool) -> Result<()> {
1142 add_wasi_to_linker(linker)?;
1143 add_all_v1_to_linker(
1144 linker,
1145 HostFns {
1146 http_client_v1_1: Some(|state: &mut ComponentState| state.host_mut()),
1147 http_client: Some(|state: &mut ComponentState| state.host_mut()),
1148 oauth_broker: None,
1149 runner_host_http: Some(|state: &mut ComponentState| state.host_mut()),
1150 runner_host_kv: Some(|state: &mut ComponentState| state.host_mut()),
1151 telemetry_logger: Some(|state: &mut ComponentState| state.host_mut()),
1152 state_store: allow_state_store.then_some(|state: &mut ComponentState| state.host_mut()),
1153 secrets_store_v1_1: Some(|state: &mut ComponentState| state.host_mut()),
1154 secrets_store: None,
1155 },
1156 )?;
1157 Ok(())
1158}
1159
1160impl OAuthHostContext for ComponentState {
1161 fn tenant_id(&self) -> &str {
1162 &self.host.config.tenant
1163 }
1164
1165 fn env(&self) -> &str {
1166 &self.host.default_env
1167 }
1168
1169 fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
1170 &mut self.host.oauth_host
1171 }
1172
1173 fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
1174 self.host.oauth_config.as_ref()
1175 }
1176}
1177
1178impl WasiView for ComponentState {
1179 fn ctx(&mut self) -> WasiCtxView<'_> {
1180 WasiCtxView {
1181 ctx: &mut self.wasi_ctx,
1182 table: &mut self.resource_table,
1183 }
1184 }
1185}
1186
1187#[allow(unsafe_code)]
1188unsafe impl Send for ComponentState {}
1189#[allow(unsafe_code)]
1190unsafe impl Sync for ComponentState {}
1191
1192impl PackRuntime {
1193 fn allows_state_store(&self, component_ref: &str) -> bool {
1194 if self.state_store.is_none() {
1195 return false;
1196 }
1197 if !self.config.state_store_policy.allow {
1198 return false;
1199 }
1200 let Some(manifest) = self.component_manifests.get(component_ref) else {
1201 return false;
1202 };
1203 manifest
1204 .capabilities
1205 .host
1206 .state
1207 .as_ref()
1208 .map(|caps| caps.read || caps.write)
1209 .unwrap_or(false)
1210 }
1211
1212 pub fn contains_component(&self, component_ref: &str) -> bool {
1213 self.components.contains_key(component_ref)
1214 }
1215
1216 #[allow(clippy::too_many_arguments)]
1217 pub async fn load(
1218 path: impl AsRef<Path>,
1219 config: Arc<HostConfig>,
1220 mocks: Option<Arc<MockLayer>>,
1221 archive_source: Option<&Path>,
1222 session_store: Option<DynSessionStore>,
1223 state_store: Option<DynStateStore>,
1224 wasi_policy: Arc<RunnerWasiPolicy>,
1225 secrets: DynSecretsManager,
1226 oauth_config: Option<OAuthBrokerConfig>,
1227 verify_archive: bool,
1228 component_resolution: ComponentResolution,
1229 ) -> Result<Self> {
1230 let path = path.as_ref();
1231 let (_pack_root, safe_path) = normalize_pack_path(path)?;
1232 let path_meta = std::fs::metadata(&safe_path).ok();
1233 let is_dir = path_meta
1234 .as_ref()
1235 .map(|meta| meta.is_dir())
1236 .unwrap_or(false);
1237 let is_component = !is_dir
1238 && safe_path
1239 .extension()
1240 .and_then(|ext| ext.to_str())
1241 .map(|ext| ext.eq_ignore_ascii_case("wasm"))
1242 .unwrap_or(false);
1243 let archive_hint_path = if let Some(source) = archive_source {
1244 let (_, normalized) = normalize_pack_path(source)?;
1245 Some(normalized)
1246 } else if is_component || is_dir {
1247 None
1248 } else {
1249 Some(safe_path.clone())
1250 };
1251 let archive_hint = archive_hint_path.as_deref();
1252 if verify_archive {
1253 if let Some(verify_target) = archive_hint.and_then(|p| {
1254 std::fs::metadata(p)
1255 .ok()
1256 .filter(|meta| meta.is_file())
1257 .map(|_| p)
1258 }) {
1259 verify::verify_pack(verify_target).await?;
1260 tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
1261 } else {
1262 tracing::debug!("skipping archive verification (no archive source)");
1263 }
1264 }
1265 let engine = Engine::default();
1266 let engine_profile =
1267 EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
1268 let cache = CacheManager::new(CacheConfig::default(), engine_profile);
1269 let mut metadata = PackMetadata::fallback(&safe_path);
1270 let mut manifest = None;
1271 let mut legacy_manifest: Option<Box<legacy_pack::PackManifest>> = None;
1272 let mut flows = None;
1273 let materialized_root = component_resolution.materialized_root.clone().or_else(|| {
1274 if is_dir {
1275 Some(safe_path.clone())
1276 } else {
1277 None
1278 }
1279 });
1280 let (pack_assets_dir, assets_tempdir) =
1281 locate_pack_assets(materialized_root.as_deref(), archive_hint)?;
1282 let setup_yaml_exists = pack_assets_dir
1283 .as_ref()
1284 .map(|dir| dir.join("setup.yaml").is_file())
1285 .unwrap_or(false);
1286 tracing::info!(
1287 pack_root = %safe_path.display(),
1288 assets_setup_yaml_exists = setup_yaml_exists,
1289 "pack unpack metadata"
1290 );
1291
1292 if let Some(root) = materialized_root.as_ref() {
1293 match load_manifest_and_flows_from_dir(root) {
1294 Ok(ManifestLoad::New {
1295 manifest: m,
1296 flows: cache,
1297 }) => {
1298 metadata = cache.metadata.clone();
1299 manifest = Some(*m);
1300 flows = Some(cache);
1301 }
1302 Ok(ManifestLoad::Legacy {
1303 manifest: m,
1304 flows: cache,
1305 }) => {
1306 metadata = cache.metadata.clone();
1307 legacy_manifest = Some(m);
1308 flows = Some(cache);
1309 }
1310 Err(err) => {
1311 warn!(error = %err, pack = %root.display(), "failed to parse materialized pack manifest");
1312 }
1313 }
1314 }
1315
1316 if manifest.is_none()
1317 && legacy_manifest.is_none()
1318 && let Some(archive_path) = archive_hint
1319 {
1320 let manifest_load = load_manifest_and_flows(archive_path).with_context(|| {
1321 format!(
1322 "failed to load manifest.cbor from {}",
1323 archive_path.display()
1324 )
1325 })?;
1326 match manifest_load {
1327 ManifestLoad::New {
1328 manifest: m,
1329 flows: cache,
1330 } => {
1331 metadata = cache.metadata.clone();
1332 manifest = Some(*m);
1333 flows = Some(cache);
1334 }
1335 ManifestLoad::Legacy {
1336 manifest: m,
1337 flows: cache,
1338 } => {
1339 metadata = cache.metadata.clone();
1340 legacy_manifest = Some(m);
1341 flows = Some(cache);
1342 }
1343 }
1344 }
1345 #[cfg(feature = "fault-injection")]
1346 {
1347 let fault_ctx = FaultContext {
1348 pack_id: metadata.pack_id.as_str(),
1349 flow_id: "unknown",
1350 node_id: None,
1351 attempt: 1,
1352 };
1353 maybe_fail(FaultPoint::PackResolve, fault_ctx)
1354 .map_err(|err| anyhow!(err.to_string()))?;
1355 }
1356 let mut pack_lock = None;
1357 for root in find_pack_lock_roots(&safe_path, is_dir, archive_hint) {
1358 pack_lock = load_pack_lock(&root)?;
1359 if pack_lock.is_some() {
1360 break;
1361 }
1362 }
1363 let component_sources_payload = if pack_lock.is_none() {
1364 if let Some(manifest) = manifest.as_ref() {
1365 manifest
1366 .get_component_sources_v1()
1367 .context("invalid component sources extension")?
1368 } else {
1369 None
1370 }
1371 } else {
1372 None
1373 };
1374 let component_sources = if let Some(lock) = pack_lock.as_ref() {
1375 Some(component_sources_table_from_pack_lock(
1376 lock,
1377 component_resolution.allow_missing_hash,
1378 )?)
1379 } else {
1380 component_sources_table(component_sources_payload.as_ref())?
1381 };
1382 let components = if is_component {
1383 let wasm_bytes = fs::read(&safe_path).await?;
1384 metadata = PackMetadata::from_wasm(&wasm_bytes)
1385 .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
1386 let name = safe_path
1387 .file_stem()
1388 .map(|s| s.to_string_lossy().to_string())
1389 .unwrap_or_else(|| "component".to_string());
1390 let component = compile_component_with_cache(&cache, &engine, None, wasm_bytes).await?;
1391 let mut map = HashMap::new();
1392 map.insert(
1393 name.clone(),
1394 PackComponent {
1395 name,
1396 version: metadata.version.clone(),
1397 component,
1398 },
1399 );
1400 map
1401 } else {
1402 let specs = component_specs(
1403 manifest.as_ref(),
1404 legacy_manifest.as_deref(),
1405 component_sources_payload.as_ref(),
1406 pack_lock.as_ref(),
1407 );
1408 if specs.is_empty() {
1409 HashMap::new()
1410 } else {
1411 let mut loaded = HashMap::new();
1412 let mut missing: HashSet<String> =
1413 specs.iter().map(|spec| spec.id.clone()).collect();
1414 let mut searched = Vec::new();
1415
1416 if !component_resolution.overrides.is_empty() {
1417 load_components_from_overrides(
1418 &cache,
1419 &engine,
1420 &component_resolution.overrides,
1421 &specs,
1422 &mut missing,
1423 &mut loaded,
1424 )
1425 .await?;
1426 searched.push("override map".to_string());
1427 }
1428
1429 if let Some(component_sources) = component_sources.as_ref() {
1430 load_components_from_sources(
1431 &cache,
1432 &engine,
1433 component_sources,
1434 &component_resolution,
1435 &specs,
1436 &mut missing,
1437 &mut loaded,
1438 materialized_root.as_deref(),
1439 archive_hint,
1440 )
1441 .await?;
1442 searched.push(format!("extension {}", EXT_COMPONENT_SOURCES_V1));
1443 }
1444
1445 if let Some(root) = materialized_root.as_ref() {
1446 load_components_from_dir(
1447 &cache,
1448 &engine,
1449 root,
1450 &specs,
1451 &mut missing,
1452 &mut loaded,
1453 )
1454 .await?;
1455 searched.push(format!("components dir {}", root.display()));
1456 }
1457
1458 if let Some(archive_path) = archive_hint {
1459 load_components_from_archive(
1460 &cache,
1461 &engine,
1462 archive_path,
1463 &specs,
1464 &mut missing,
1465 &mut loaded,
1466 )
1467 .await?;
1468 searched.push(format!("archive {}", archive_path.display()));
1469 }
1470
1471 if !missing.is_empty() {
1472 let missing_list = missing.into_iter().collect::<Vec<_>>().join(", ");
1473 let sources = if searched.is_empty() {
1474 "no component sources".to_string()
1475 } else {
1476 searched.join(", ")
1477 };
1478 bail!(
1479 "components missing: {}; looked in {}",
1480 missing_list,
1481 sources
1482 );
1483 }
1484
1485 loaded
1486 }
1487 };
1488 let http_client = Arc::clone(&HTTP_CLIENT);
1489 let mut component_manifests = HashMap::new();
1490 if let Some(manifest) = manifest.as_ref() {
1491 for component in &manifest.components {
1492 component_manifests.insert(component.id.as_str().to_string(), component.clone());
1493 }
1494 }
1495 let mut pack_policy = (*wasi_policy).clone();
1496 if let Some(dir) = pack_assets_dir {
1497 tracing::debug!(path = %dir.display(), "preopening pack assets directory for WASI /assets");
1498 pack_policy =
1499 pack_policy.with_preopen(PreopenSpec::new(dir, "/assets").read_only(true));
1500 }
1501 let wasi_policy = Arc::new(pack_policy);
1502 Ok(Self {
1503 path: safe_path,
1504 archive_path: archive_hint.map(Path::to_path_buf),
1505 config,
1506 engine,
1507 metadata,
1508 manifest,
1509 legacy_manifest,
1510 component_manifests,
1511 mocks,
1512 flows,
1513 components,
1514 http_client,
1515 pre_cache: Mutex::new(HashMap::new()),
1516 session_store,
1517 state_store,
1518 wasi_policy,
1519 assets_tempdir,
1520 provider_registry: RwLock::new(None),
1521 secrets,
1522 oauth_config,
1523 cache,
1524 })
1525 }
1526
1527 pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
1528 if let Some(cache) = &self.flows {
1529 return Ok(cache.descriptors.clone());
1530 }
1531 if let Some(manifest) = &self.manifest {
1532 let descriptors = manifest
1533 .flows
1534 .iter()
1535 .map(|flow| FlowDescriptor {
1536 id: flow.id.as_str().to_string(),
1537 flow_type: flow_kind_to_str(flow.kind).to_string(),
1538 pack_id: manifest.pack_id.as_str().to_string(),
1539 profile: manifest.pack_id.as_str().to_string(),
1540 version: manifest.version.to_string(),
1541 description: None,
1542 })
1543 .collect();
1544 return Ok(descriptors);
1545 }
1546 Ok(Vec::new())
1547 }
1548
1549 #[allow(dead_code)]
1550 pub async fn run_flow(
1551 &self,
1552 flow_id: &str,
1553 input: serde_json::Value,
1554 ) -> Result<serde_json::Value> {
1555 let pack = Arc::new(
1556 PackRuntime::load(
1557 &self.path,
1558 Arc::clone(&self.config),
1559 self.mocks.clone(),
1560 self.archive_path.as_deref(),
1561 self.session_store.clone(),
1562 self.state_store.clone(),
1563 Arc::clone(&self.wasi_policy),
1564 self.secrets.clone(),
1565 self.oauth_config.clone(),
1566 false,
1567 ComponentResolution::default(),
1568 )
1569 .await?,
1570 );
1571
1572 let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&self.config)).await?;
1573 let retry_config = self.config.retry_config().into();
1574 let mocks = pack.mocks.as_deref();
1575 let tenant = self.config.tenant.as_str();
1576
1577 let ctx = FlowContext {
1578 tenant,
1579 pack_id: pack.metadata().pack_id.as_str(),
1580 flow_id,
1581 node_id: None,
1582 tool: None,
1583 action: None,
1584 session_id: None,
1585 provider_id: None,
1586 retry_config,
1587 attempt: 1,
1588 observer: None,
1589 mocks,
1590 };
1591
1592 let execution = engine.execute(ctx, input).await?;
1593 match execution.status {
1594 FlowStatus::Completed => Ok(execution.output),
1595 FlowStatus::Waiting(wait) => Ok(serde_json::json!({
1596 "status": "pending",
1597 "reason": wait.reason,
1598 "resume": wait.snapshot,
1599 "response": execution.output,
1600 })),
1601 }
1602 }
1603
1604 pub async fn invoke_component(
1605 &self,
1606 component_ref: &str,
1607 ctx: ComponentExecCtx,
1608 operation: &str,
1609 _config_json: Option<String>,
1610 input_json: String,
1611 ) -> Result<Value> {
1612 let pack_component = self
1613 .components
1614 .get(component_ref)
1615 .with_context(|| format!("component '{component_ref}' not found in pack"))?;
1616 let engine = self.engine.clone();
1617 let config = Arc::clone(&self.config);
1618 let http_client = Arc::clone(&self.http_client);
1619 let mocks = self.mocks.clone();
1620 let session_store = self.session_store.clone();
1621 let state_store = self.state_store.clone();
1622 let secrets = Arc::clone(&self.secrets);
1623 let oauth_config = self.oauth_config.clone();
1624 let wasi_policy = Arc::clone(&self.wasi_policy);
1625 let pack_id = self.metadata().pack_id.clone();
1626 let allow_state_store = self.allows_state_store(component_ref);
1627 let component = pack_component.component.clone();
1628 let component_ref_owned = component_ref.to_string();
1629 let operation_owned = operation.to_string();
1630 let input_owned = input_json;
1631 let ctx_owned = ctx;
1632
1633 run_on_wasi_thread("component.invoke", move || {
1634 let mut linker = Linker::new(&engine);
1635 register_all(&mut linker, allow_state_store)?;
1636 add_component_control_to_linker(&mut linker)?;
1637
1638 let host_state = HostState::new(
1639 pack_id.clone(),
1640 config,
1641 http_client,
1642 mocks,
1643 session_store,
1644 state_store,
1645 secrets,
1646 oauth_config,
1647 Some(ctx_owned.clone()),
1648 Some(component_ref_owned.clone()),
1649 false,
1650 )?;
1651 let store_state = ComponentState::new(host_state, wasi_policy)?;
1652 let mut store = wasmtime::Store::new(&engine, store_state);
1653
1654 let invoke_result = HostState::instantiate_component_result(
1655 &mut linker,
1656 &mut store,
1657 &component,
1658 &ctx_owned,
1659 &operation_owned,
1660 &input_owned,
1661 )?;
1662 HostState::convert_invoke_result(invoke_result)
1663 })
1664 }
1665
1666 pub fn resolve_provider(
1667 &self,
1668 provider_id: Option<&str>,
1669 provider_type: Option<&str>,
1670 ) -> Result<ProviderBinding> {
1671 let registry = self.provider_registry()?;
1672 registry.resolve(provider_id, provider_type)
1673 }
1674
1675 pub async fn invoke_provider(
1676 &self,
1677 binding: &ProviderBinding,
1678 ctx: ComponentExecCtx,
1679 op: &str,
1680 input_json: Vec<u8>,
1681 ) -> Result<Value> {
1682 let component_ref_owned = binding.component_ref.clone();
1683 let pack_component = self.components.get(&component_ref_owned).with_context(|| {
1684 format!("provider component '{component_ref_owned}' not found in pack")
1685 })?;
1686 let component = pack_component.component.clone();
1687
1688 let engine = self.engine.clone();
1689 let config = Arc::clone(&self.config);
1690 let http_client = Arc::clone(&self.http_client);
1691 let mocks = self.mocks.clone();
1692 let session_store = self.session_store.clone();
1693 let state_store = self.state_store.clone();
1694 let secrets = Arc::clone(&self.secrets);
1695 let oauth_config = self.oauth_config.clone();
1696 let wasi_policy = Arc::clone(&self.wasi_policy);
1697 let pack_id = self.metadata().pack_id.clone();
1698 let allow_state_store = self.allows_state_store(&component_ref_owned);
1699 let input_owned = input_json;
1700 let op_owned = op.to_string();
1701 let ctx_owned = ctx;
1702
1703 run_on_wasi_thread("provider.invoke", move || {
1704 let mut linker = Linker::new(&engine);
1705 register_all(&mut linker, allow_state_store)?;
1706 add_component_control_to_linker(&mut linker)?;
1707 let pre_instance = linker.instantiate_pre(component.as_ref())?;
1708 let pre: ProviderComponentPre<ComponentState> =
1709 ProviderComponentPre::new(pre_instance)?;
1710
1711 let host_state = HostState::new(
1712 pack_id.clone(),
1713 config,
1714 http_client,
1715 mocks,
1716 session_store,
1717 state_store,
1718 secrets,
1719 oauth_config,
1720 Some(ctx_owned.clone()),
1721 Some(component_ref_owned.clone()),
1722 true,
1723 )?;
1724 let store_state = ComponentState::new(host_state, wasi_policy)?;
1725 let mut store = wasmtime::Store::new(&engine, store_state);
1726 let bindings: crate::provider_core::SchemaCore =
1727 block_on(async { pre.instantiate_async(&mut store).await })?;
1728 let provider = bindings.greentic_provider_core_schema_core_api();
1729
1730 let result = provider.call_invoke(&mut store, &op_owned, &input_owned)?;
1731 deserialize_json_bytes(result)
1732 })
1733 }
1734
1735 pub(crate) fn provider_registry(&self) -> Result<ProviderRegistry> {
1736 if let Some(registry) = self.provider_registry.read().clone() {
1737 return Ok(registry);
1738 }
1739 let manifest = self
1740 .manifest
1741 .as_ref()
1742 .context("pack manifest required for provider resolution")?;
1743 let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
1744 let registry = ProviderRegistry::new(
1745 manifest,
1746 self.state_store.clone(),
1747 &self.config.tenant,
1748 &env,
1749 )?;
1750 *self.provider_registry.write() = Some(registry.clone());
1751 Ok(registry)
1752 }
1753
1754 pub(crate) fn provider_registry_optional(&self) -> Result<Option<ProviderRegistry>> {
1755 if self.manifest.is_none() {
1756 return Ok(None);
1757 }
1758 Ok(Some(self.provider_registry()?))
1759 }
1760
1761 pub fn load_flow(&self, flow_id: &str) -> Result<Flow> {
1762 if let Some(cache) = &self.flows {
1763 return cache
1764 .flows
1765 .get(flow_id)
1766 .cloned()
1767 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
1768 }
1769 if let Some(manifest) = &self.manifest {
1770 let entry = manifest
1771 .flows
1772 .iter()
1773 .find(|f| f.id.as_str() == flow_id)
1774 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
1775 return Ok(entry.flow.clone());
1776 }
1777 bail!("flow '{flow_id}' not available (pack exports disabled)")
1778 }
1779
1780 pub fn metadata(&self) -> &PackMetadata {
1781 &self.metadata
1782 }
1783
1784 pub fn required_secrets(&self) -> &[greentic_types::SecretRequirement] {
1785 &self.metadata.secret_requirements
1786 }
1787
1788 pub fn missing_secrets(
1789 &self,
1790 tenant_ctx: &TypesTenantCtx,
1791 ) -> Vec<greentic_types::SecretRequirement> {
1792 let env = tenant_ctx.env.as_str().to_string();
1793 let tenant = tenant_ctx.tenant.as_str().to_string();
1794 let team = tenant_ctx.team.as_ref().map(|t| t.as_str().to_string());
1795 self.required_secrets()
1796 .iter()
1797 .filter(|req| {
1798 if let Some(scope) = &req.scope {
1800 if scope.env != env {
1801 return false;
1802 }
1803 if scope.tenant != tenant {
1804 return false;
1805 }
1806 if let Some(ref team_req) = scope.team
1807 && team.as_ref() != Some(team_req)
1808 {
1809 return false;
1810 }
1811 }
1812 let ctx = self.config.tenant_ctx();
1813 read_secret_blocking(&self.secrets, &ctx, req.key.as_str()).is_err()
1814 })
1815 .cloned()
1816 .collect()
1817 }
1818
1819 pub fn for_component_test(
1820 components: Vec<(String, PathBuf)>,
1821 flows: HashMap<String, FlowIR>,
1822 pack_id: &str,
1823 config: Arc<HostConfig>,
1824 ) -> Result<Self> {
1825 let engine = Engine::default();
1826 let engine_profile =
1827 EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
1828 let cache = CacheManager::new(CacheConfig::default(), engine_profile);
1829 let mut component_map = HashMap::new();
1830 for (name, path) in components {
1831 if !path.exists() {
1832 bail!("component artifact missing: {}", path.display());
1833 }
1834 let wasm_bytes = std::fs::read(&path)?;
1835 let component = Arc::new(
1836 Component::from_binary(&engine, &wasm_bytes)
1837 .with_context(|| format!("failed to compile component {}", path.display()))?,
1838 );
1839 component_map.insert(
1840 name.clone(),
1841 PackComponent {
1842 name,
1843 version: "0.0.0".into(),
1844 component,
1845 },
1846 );
1847 }
1848
1849 let mut flow_map = HashMap::new();
1850 let mut descriptors = Vec::new();
1851 for (id, ir) in flows {
1852 let flow_type = ir.flow_type.clone();
1853 let flow = flow_ir_to_flow(ir)?;
1854 flow_map.insert(id.clone(), flow);
1855 descriptors.push(FlowDescriptor {
1856 id: id.clone(),
1857 flow_type,
1858 pack_id: pack_id.to_string(),
1859 profile: "test".into(),
1860 version: "0.0.0".into(),
1861 description: None,
1862 });
1863 }
1864 let entry_flows = descriptors.iter().map(|flow| flow.id.clone()).collect();
1865 let metadata = PackMetadata {
1866 pack_id: pack_id.to_string(),
1867 version: "0.0.0".into(),
1868 entry_flows,
1869 secret_requirements: Vec::new(),
1870 };
1871 let flows_cache = PackFlows {
1872 descriptors: descriptors.clone(),
1873 flows: flow_map,
1874 metadata: metadata.clone(),
1875 };
1876
1877 Ok(Self {
1878 path: PathBuf::new(),
1879 archive_path: None,
1880 config,
1881 engine,
1882 metadata,
1883 manifest: None,
1884 legacy_manifest: None,
1885 component_manifests: HashMap::new(),
1886 mocks: None,
1887 flows: Some(flows_cache),
1888 components: component_map,
1889 http_client: Arc::clone(&HTTP_CLIENT),
1890 pre_cache: Mutex::new(HashMap::new()),
1891 session_store: None,
1892 state_store: None,
1893 wasi_policy: Arc::new(RunnerWasiPolicy::new()),
1894 assets_tempdir: None,
1895 provider_registry: RwLock::new(None),
1896 secrets: crate::secrets::default_manager()?,
1897 oauth_config: None,
1898 cache,
1899 })
1900 }
1901}
1902
1903fn is_missing_node_export(err: &wasmtime::Error, version: &str) -> bool {
1904 let message = err.to_string();
1905 message.contains("no exported instance named")
1906 && message.contains(&format!("greentic:component/node@{version}"))
1907}
1908
1909struct PackFlows {
1910 descriptors: Vec<FlowDescriptor>,
1911 flows: HashMap<String, Flow>,
1912 metadata: PackMetadata,
1913}
1914
1915const RUNTIME_FLOW_EXTENSION_IDS: [&str; 3] = [
1916 "greentic.pack.runtime_flow",
1917 "greentic.pack.flow_runtime",
1918 "greentic.pack.runtime_flows",
1919];
1920
1921#[derive(Debug, Deserialize)]
1922struct RuntimeFlowBundle {
1923 flows: Vec<RuntimeFlow>,
1924}
1925
1926#[derive(Debug, Deserialize)]
1927struct RuntimeFlow {
1928 id: String,
1929 #[serde(alias = "flow_type")]
1930 kind: FlowKind,
1931 #[serde(default)]
1932 schema_version: Option<String>,
1933 #[serde(default)]
1934 start: Option<String>,
1935 #[serde(default)]
1936 entrypoints: BTreeMap<String, Value>,
1937 nodes: BTreeMap<String, RuntimeNode>,
1938 #[serde(default)]
1939 metadata: Option<FlowMetadata>,
1940}
1941
1942#[derive(Debug, Deserialize)]
1943struct RuntimeNode {
1944 #[serde(alias = "component")]
1945 component_id: String,
1946 #[serde(default, alias = "operation")]
1947 operation_name: Option<String>,
1948 #[serde(default, alias = "payload", alias = "input")]
1949 operation_payload: Value,
1950 #[serde(default)]
1951 routing: Option<Routing>,
1952 #[serde(default)]
1953 telemetry: Option<TelemetryHints>,
1954}
1955
1956fn deserialize_json_bytes(bytes: Vec<u8>) -> Result<Value> {
1957 if bytes.is_empty() {
1958 return Ok(Value::Null);
1959 }
1960 serde_json::from_slice(&bytes).or_else(|_| {
1961 String::from_utf8(bytes)
1962 .map(Value::String)
1963 .map_err(|err| anyhow!(err))
1964 })
1965}
1966
1967impl PackFlows {
1968 fn from_manifest(manifest: greentic_types::PackManifest) -> Self {
1969 if let Some(flows) = flows_from_runtime_extension(&manifest) {
1970 return flows;
1971 }
1972 let descriptors = manifest
1973 .flows
1974 .iter()
1975 .map(|entry| FlowDescriptor {
1976 id: entry.id.as_str().to_string(),
1977 flow_type: flow_kind_to_str(entry.kind).to_string(),
1978 pack_id: manifest.pack_id.as_str().to_string(),
1979 profile: manifest.pack_id.as_str().to_string(),
1980 version: manifest.version.to_string(),
1981 description: None,
1982 })
1983 .collect();
1984 let mut flows = HashMap::new();
1985 for entry in &manifest.flows {
1986 flows.insert(entry.id.as_str().to_string(), entry.flow.clone());
1987 }
1988 Self {
1989 metadata: PackMetadata::from_manifest(&manifest),
1990 descriptors,
1991 flows,
1992 }
1993 }
1994}
1995
1996fn flows_from_runtime_extension(manifest: &greentic_types::PackManifest) -> Option<PackFlows> {
1997 let extensions = manifest.extensions.as_ref()?;
1998 let extension = extensions.iter().find_map(|(key, ext)| {
1999 if RUNTIME_FLOW_EXTENSION_IDS
2000 .iter()
2001 .any(|candidate| candidate == key)
2002 {
2003 Some(ext)
2004 } else {
2005 None
2006 }
2007 })?;
2008 let runtime_flows = match decode_runtime_flow_extension(extension) {
2009 Some(flows) if !flows.is_empty() => flows,
2010 _ => return None,
2011 };
2012
2013 let descriptors = runtime_flows
2014 .iter()
2015 .map(|flow| FlowDescriptor {
2016 id: flow.id.as_str().to_string(),
2017 flow_type: flow_kind_to_str(flow.kind).to_string(),
2018 pack_id: manifest.pack_id.as_str().to_string(),
2019 profile: manifest.pack_id.as_str().to_string(),
2020 version: manifest.version.to_string(),
2021 description: None,
2022 })
2023 .collect::<Vec<_>>();
2024 let flows = runtime_flows
2025 .into_iter()
2026 .map(|flow| (flow.id.as_str().to_string(), flow))
2027 .collect();
2028
2029 Some(PackFlows {
2030 metadata: PackMetadata::from_manifest(manifest),
2031 descriptors,
2032 flows,
2033 })
2034}
2035
2036fn decode_runtime_flow_extension(extension: &ExtensionRef) -> Option<Vec<Flow>> {
2037 let value = match extension.inline.as_ref()? {
2038 ExtensionInline::Other(value) => value.clone(),
2039 _ => return None,
2040 };
2041
2042 if let Ok(bundle) = serde_json::from_value::<RuntimeFlowBundle>(value.clone()) {
2043 return Some(collect_runtime_flows(bundle.flows));
2044 }
2045
2046 if let Ok(flows) = serde_json::from_value::<Vec<RuntimeFlow>>(value.clone()) {
2047 return Some(collect_runtime_flows(flows));
2048 }
2049
2050 if let Ok(flows) = serde_json::from_value::<Vec<Flow>>(value) {
2051 return Some(flows);
2052 }
2053
2054 warn!(
2055 extension = %extension.kind,
2056 version = %extension.version,
2057 "runtime flow extension present but could not be decoded"
2058 );
2059 None
2060}
2061
2062fn collect_runtime_flows(flows: Vec<RuntimeFlow>) -> Vec<Flow> {
2063 flows
2064 .into_iter()
2065 .filter_map(|flow| match runtime_flow_to_flow(flow) {
2066 Ok(flow) => Some(flow),
2067 Err(err) => {
2068 warn!(error = %err, "failed to decode runtime flow");
2069 None
2070 }
2071 })
2072 .collect()
2073}
2074
2075fn runtime_flow_to_flow(runtime: RuntimeFlow) -> Result<Flow> {
2076 let flow_id = FlowId::from_str(&runtime.id)
2077 .with_context(|| format!("invalid flow id `{}`", runtime.id))?;
2078 let mut entrypoints = runtime.entrypoints;
2079 if entrypoints.is_empty()
2080 && let Some(start) = &runtime.start
2081 {
2082 entrypoints.insert("default".into(), Value::String(start.clone()));
2083 }
2084
2085 let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
2086 for (id, node) in runtime.nodes {
2087 let node_id = NodeId::from_str(&id).with_context(|| format!("invalid node id `{id}`"))?;
2088 let component_id = ComponentId::from_str(&node.component_id)
2089 .with_context(|| format!("invalid component id `{}`", node.component_id))?;
2090 let component = FlowComponentRef {
2091 id: component_id,
2092 pack_alias: None,
2093 operation: node.operation_name,
2094 };
2095 let routing = node.routing.unwrap_or(Routing::End);
2096 let telemetry = node.telemetry.unwrap_or_default();
2097 nodes.insert(
2098 node_id.clone(),
2099 Node {
2100 id: node_id,
2101 component,
2102 input: InputMapping {
2103 mapping: node.operation_payload,
2104 },
2105 output: OutputMapping {
2106 mapping: Value::Null,
2107 },
2108 routing,
2109 telemetry,
2110 },
2111 );
2112 }
2113
2114 Ok(Flow {
2115 schema_version: runtime.schema_version.unwrap_or_else(|| "1.0".to_string()),
2116 id: flow_id,
2117 kind: runtime.kind,
2118 entrypoints,
2119 nodes,
2120 metadata: runtime.metadata.unwrap_or_default(),
2121 })
2122}
2123
2124fn flow_kind_to_str(kind: greentic_types::FlowKind) -> &'static str {
2125 match kind {
2126 greentic_types::FlowKind::Messaging => "messaging",
2127 greentic_types::FlowKind::Event => "event",
2128 greentic_types::FlowKind::ComponentConfig => "component-config",
2129 greentic_types::FlowKind::Job => "job",
2130 greentic_types::FlowKind::Http => "http",
2131 }
2132}
2133
2134fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
2135 let mut file = archive
2136 .by_name(name)
2137 .with_context(|| format!("entry {name} missing from archive"))?;
2138 let mut buf = Vec::new();
2139 file.read_to_end(&mut buf)?;
2140 Ok(buf)
2141}
2142
2143fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
2144 for node in doc.nodes.values_mut() {
2145 let Some((component_ref, payload)) = node
2146 .raw
2147 .iter()
2148 .next()
2149 .map(|(key, value)| (key.clone(), value.clone()))
2150 else {
2151 continue;
2152 };
2153 if component_ref.starts_with("emit.") {
2154 node.operation = Some(component_ref);
2155 node.payload = payload;
2156 node.raw.clear();
2157 continue;
2158 }
2159 let (target_component, operation, input, config) =
2160 infer_component_exec(&payload, &component_ref);
2161 let mut payload_obj = serde_json::Map::new();
2162 payload_obj.insert("component".into(), Value::String(target_component));
2164 payload_obj.insert("operation".into(), Value::String(operation));
2165 payload_obj.insert("input".into(), input);
2166 if let Some(cfg) = config {
2167 payload_obj.insert("config".into(), cfg);
2168 }
2169 node.operation = Some("component.exec".to_string());
2170 node.payload = Value::Object(payload_obj);
2171 node.raw.clear();
2172 }
2173 doc
2174}
2175
2176fn infer_component_exec(
2177 payload: &Value,
2178 component_ref: &str,
2179) -> (String, String, Value, Option<Value>) {
2180 let default_op = if component_ref.starts_with("templating.") {
2181 "render"
2182 } else {
2183 "invoke"
2184 }
2185 .to_string();
2186
2187 if let Value::Object(map) = payload {
2188 let op = map
2189 .get("op")
2190 .or_else(|| map.get("operation"))
2191 .and_then(Value::as_str)
2192 .map(|s| s.to_string())
2193 .unwrap_or_else(|| default_op.clone());
2194
2195 let mut input = map.clone();
2196 let config = input.remove("config");
2197 let component = input
2198 .get("component")
2199 .or_else(|| input.get("component_ref"))
2200 .and_then(Value::as_str)
2201 .map(|s| s.to_string())
2202 .unwrap_or_else(|| component_ref.to_string());
2203 input.remove("component");
2204 input.remove("component_ref");
2205 input.remove("op");
2206 input.remove("operation");
2207 return (component, op, Value::Object(input), config);
2208 }
2209
2210 (component_ref.to_string(), default_op, payload.clone(), None)
2211}
2212
2213#[derive(Clone, Debug)]
2214struct ComponentSpec {
2215 id: String,
2216 version: String,
2217 legacy_path: Option<String>,
2218}
2219
2220#[derive(Clone, Debug)]
2221struct ComponentSourceInfo {
2222 digest: Option<String>,
2223 source: ComponentSourceRef,
2224 artifact: ComponentArtifactLocation,
2225 expected_wasm_sha256: Option<String>,
2226 skip_digest_verification: bool,
2227}
2228
2229#[derive(Clone, Debug)]
2230enum ComponentArtifactLocation {
2231 Inline { wasm_path: String },
2232 Remote,
2233}
2234
2235#[derive(Clone, Debug, Deserialize)]
2236struct PackLockV1 {
2237 schema_version: u32,
2238 components: Vec<PackLockComponent>,
2239}
2240
2241#[derive(Clone, Debug, Deserialize)]
2242struct PackLockComponent {
2243 name: String,
2244 #[serde(default, rename = "source_ref")]
2245 source_ref: Option<String>,
2246 #[serde(default, rename = "ref")]
2247 legacy_ref: Option<String>,
2248 #[serde(default)]
2249 component_id: Option<ComponentId>,
2250 #[serde(default)]
2251 bundled: Option<bool>,
2252 #[serde(default, rename = "bundled_path")]
2253 bundled_path: Option<String>,
2254 #[serde(default, rename = "path")]
2255 legacy_path: Option<String>,
2256 #[serde(default)]
2257 wasm_sha256: Option<String>,
2258 #[serde(default, rename = "sha256")]
2259 legacy_sha256: Option<String>,
2260 #[serde(default)]
2261 resolved_digest: Option<String>,
2262 #[serde(default)]
2263 digest: Option<String>,
2264}
2265
2266fn component_specs(
2267 manifest: Option<&greentic_types::PackManifest>,
2268 legacy_manifest: Option<&legacy_pack::PackManifest>,
2269 component_sources: Option<&ComponentSourcesV1>,
2270 pack_lock: Option<&PackLockV1>,
2271) -> Vec<ComponentSpec> {
2272 if let Some(manifest) = manifest {
2273 if !manifest.components.is_empty() {
2274 return manifest
2275 .components
2276 .iter()
2277 .map(|entry| ComponentSpec {
2278 id: entry.id.as_str().to_string(),
2279 version: entry.version.to_string(),
2280 legacy_path: None,
2281 })
2282 .collect();
2283 }
2284 if let Some(lock) = pack_lock {
2285 let mut seen = HashSet::new();
2286 let mut specs = Vec::new();
2287 for entry in &lock.components {
2288 let id = entry
2289 .component_id
2290 .as_ref()
2291 .map(|id| id.as_str())
2292 .unwrap_or(entry.name.as_str());
2293 if seen.insert(id.to_string()) {
2294 specs.push(ComponentSpec {
2295 id: id.to_string(),
2296 version: "0.0.0".to_string(),
2297 legacy_path: None,
2298 });
2299 }
2300 }
2301 return specs;
2302 }
2303 if let Some(sources) = component_sources {
2304 let mut seen = HashSet::new();
2305 let mut specs = Vec::new();
2306 for entry in &sources.components {
2307 let id = entry
2308 .component_id
2309 .as_ref()
2310 .map(|id| id.as_str())
2311 .unwrap_or(entry.name.as_str());
2312 if seen.insert(id.to_string()) {
2313 specs.push(ComponentSpec {
2314 id: id.to_string(),
2315 version: "0.0.0".to_string(),
2316 legacy_path: None,
2317 });
2318 }
2319 }
2320 return specs;
2321 }
2322 }
2323 if let Some(legacy_manifest) = legacy_manifest {
2324 return legacy_manifest
2325 .components
2326 .iter()
2327 .map(|entry| ComponentSpec {
2328 id: entry.name.clone(),
2329 version: entry.version.to_string(),
2330 legacy_path: Some(entry.file_wasm.clone()),
2331 })
2332 .collect();
2333 }
2334 Vec::new()
2335}
2336
2337fn component_sources_table(
2338 sources: Option<&ComponentSourcesV1>,
2339) -> Result<Option<HashMap<String, ComponentSourceInfo>>> {
2340 let Some(sources) = sources else {
2341 return Ok(None);
2342 };
2343 let mut table = HashMap::new();
2344 for entry in &sources.components {
2345 let artifact = match &entry.artifact {
2346 ArtifactLocationV1::Inline { wasm_path, .. } => ComponentArtifactLocation::Inline {
2347 wasm_path: wasm_path.clone(),
2348 },
2349 ArtifactLocationV1::Remote => ComponentArtifactLocation::Remote,
2350 };
2351 let info = ComponentSourceInfo {
2352 digest: Some(entry.resolved.digest.clone()),
2353 source: entry.source.clone(),
2354 artifact,
2355 expected_wasm_sha256: None,
2356 skip_digest_verification: false,
2357 };
2358 if let Some(component_id) = entry.component_id.as_ref() {
2359 table.insert(component_id.as_str().to_string(), info.clone());
2360 }
2361 table.insert(entry.name.clone(), info);
2362 }
2363 Ok(Some(table))
2364}
2365
2366fn load_pack_lock(path: &Path) -> Result<Option<PackLockV1>> {
2367 let lock_path = if path.is_dir() {
2368 let candidate = path.join("pack.lock");
2369 if candidate.exists() {
2370 Some(candidate)
2371 } else {
2372 let candidate = path.join("pack.lock.json");
2373 candidate.exists().then_some(candidate)
2374 }
2375 } else {
2376 None
2377 };
2378 let Some(lock_path) = lock_path else {
2379 return Ok(None);
2380 };
2381 let raw = std::fs::read_to_string(&lock_path)
2382 .with_context(|| format!("failed to read {}", lock_path.display()))?;
2383 let lock: PackLockV1 = serde_json::from_str(&raw).context("failed to parse pack.lock")?;
2384 if lock.schema_version != 1 {
2385 bail!("pack.lock schema_version must be 1");
2386 }
2387 Ok(Some(lock))
2388}
2389
2390fn find_pack_lock_roots(
2391 pack_path: &Path,
2392 is_dir: bool,
2393 archive_hint: Option<&Path>,
2394) -> Vec<PathBuf> {
2395 if is_dir {
2396 return vec![pack_path.to_path_buf()];
2397 }
2398 let mut roots = Vec::new();
2399 if let Some(archive_path) = archive_hint {
2400 if let Some(parent) = archive_path.parent() {
2401 roots.push(parent.to_path_buf());
2402 if let Some(grandparent) = parent.parent() {
2403 roots.push(grandparent.to_path_buf());
2404 }
2405 }
2406 } else if let Some(parent) = pack_path.parent() {
2407 roots.push(parent.to_path_buf());
2408 if let Some(grandparent) = parent.parent() {
2409 roots.push(grandparent.to_path_buf());
2410 }
2411 }
2412 roots
2413}
2414
2415fn normalize_sha256(digest: &str) -> Result<String> {
2416 let trimmed = digest.trim();
2417 if trimmed.is_empty() {
2418 bail!("sha256 digest cannot be empty");
2419 }
2420 if let Some(stripped) = trimmed.strip_prefix("sha256:") {
2421 if stripped.is_empty() {
2422 bail!("sha256 digest must include hex bytes after sha256:");
2423 }
2424 return Ok(trimmed.to_string());
2425 }
2426 if trimmed.chars().all(|c| c.is_ascii_hexdigit()) {
2427 return Ok(format!("sha256:{trimmed}"));
2428 }
2429 bail!("sha256 digest must be hex or sha256:<hex>");
2430}
2431
2432fn component_sources_table_from_pack_lock(
2433 lock: &PackLockV1,
2434 allow_missing_hash: bool,
2435) -> Result<HashMap<String, ComponentSourceInfo>> {
2436 let mut table = HashMap::new();
2437 let mut names = HashSet::new();
2438 for entry in &lock.components {
2439 if !names.insert(entry.name.clone()) {
2440 bail!(
2441 "pack.lock contains duplicate component name `{}`",
2442 entry.name
2443 );
2444 }
2445 let source_ref = match (&entry.source_ref, &entry.legacy_ref) {
2446 (Some(primary), Some(legacy)) => {
2447 if primary != legacy {
2448 bail!(
2449 "pack.lock component {} has conflicting refs: {} vs {}",
2450 entry.name,
2451 primary,
2452 legacy
2453 );
2454 }
2455 primary.as_str()
2456 }
2457 (Some(primary), None) => primary.as_str(),
2458 (None, Some(legacy)) => legacy.as_str(),
2459 (None, None) => {
2460 bail!("pack.lock component {} missing source_ref", entry.name);
2461 }
2462 };
2463 let source: ComponentSourceRef = source_ref
2464 .parse()
2465 .with_context(|| format!("invalid component ref `{}`", source_ref))?;
2466 let bundled_path = match (&entry.bundled_path, &entry.legacy_path) {
2467 (Some(primary), Some(legacy)) => {
2468 if primary != legacy {
2469 bail!(
2470 "pack.lock component {} has conflicting bundled paths: {} vs {}",
2471 entry.name,
2472 primary,
2473 legacy
2474 );
2475 }
2476 Some(primary.clone())
2477 }
2478 (Some(primary), None) => Some(primary.clone()),
2479 (None, Some(legacy)) => Some(legacy.clone()),
2480 (None, None) => None,
2481 };
2482 let bundled = entry.bundled.unwrap_or(false) || bundled_path.is_some();
2483 let (artifact, digest, expected_wasm_sha256, skip_digest_verification) = if bundled {
2484 let wasm_path = bundled_path.ok_or_else(|| {
2485 anyhow!(
2486 "pack.lock component {} marked bundled but bundled_path is missing",
2487 entry.name
2488 )
2489 })?;
2490 let expected_raw = match (&entry.wasm_sha256, &entry.legacy_sha256) {
2491 (Some(primary), Some(legacy)) => {
2492 if primary != legacy {
2493 bail!(
2494 "pack.lock component {} has conflicting wasm_sha256 values: {} vs {}",
2495 entry.name,
2496 primary,
2497 legacy
2498 );
2499 }
2500 Some(primary.as_str())
2501 }
2502 (Some(primary), None) => Some(primary.as_str()),
2503 (None, Some(legacy)) => Some(legacy.as_str()),
2504 (None, None) => None,
2505 };
2506 let expected = match expected_raw {
2507 Some(value) => Some(normalize_sha256(value)?),
2508 None => None,
2509 };
2510 if expected.is_none() && !allow_missing_hash {
2511 bail!(
2512 "pack.lock component {} missing wasm_sha256 for bundled component",
2513 entry.name
2514 );
2515 }
2516 (
2517 ComponentArtifactLocation::Inline { wasm_path },
2518 expected.clone(),
2519 expected,
2520 allow_missing_hash && expected_raw.is_none(),
2521 )
2522 } else {
2523 if source.is_tag() {
2524 bail!(
2525 "component {} uses tag ref {} but is not bundled; rebuild the pack",
2526 entry.name,
2527 source
2528 );
2529 }
2530 let expected = entry
2531 .resolved_digest
2532 .as_deref()
2533 .or(entry.digest.as_deref())
2534 .ok_or_else(|| {
2535 anyhow!(
2536 "pack.lock component {} missing resolved_digest for remote component",
2537 entry.name
2538 )
2539 })?;
2540 (
2541 ComponentArtifactLocation::Remote,
2542 Some(normalize_digest(expected)),
2543 None,
2544 false,
2545 )
2546 };
2547 let info = ComponentSourceInfo {
2548 digest,
2549 source,
2550 artifact,
2551 expected_wasm_sha256,
2552 skip_digest_verification,
2553 };
2554 if let Some(component_id) = entry.component_id.as_ref() {
2555 let key = component_id.as_str().to_string();
2556 if table.contains_key(&key) {
2557 bail!(
2558 "pack.lock contains duplicate component id `{}`",
2559 component_id.as_str()
2560 );
2561 }
2562 table.insert(key, info.clone());
2563 }
2564 if entry.name
2565 != entry
2566 .component_id
2567 .as_ref()
2568 .map(|id| id.as_str())
2569 .unwrap_or("")
2570 {
2571 table.insert(entry.name.clone(), info);
2572 }
2573 }
2574 Ok(table)
2575}
2576
2577fn component_path_for_spec(root: &Path, spec: &ComponentSpec) -> PathBuf {
2578 if let Some(path) = &spec.legacy_path {
2579 return root.join(path);
2580 }
2581 root.join("components").join(format!("{}.wasm", spec.id))
2582}
2583
2584fn normalize_digest(digest: &str) -> String {
2585 if digest.starts_with("sha256:") || digest.starts_with("blake3:") {
2586 digest.to_string()
2587 } else {
2588 format!("sha256:{digest}")
2589 }
2590}
2591
2592fn compute_digest_for(bytes: &[u8], digest: &str) -> Result<String> {
2593 if digest.starts_with("blake3:") {
2594 let hash = blake3::hash(bytes);
2595 return Ok(format!("blake3:{}", hash.to_hex()));
2596 }
2597 let mut hasher = sha2::Sha256::new();
2598 hasher.update(bytes);
2599 Ok(format!("sha256:{:x}", hasher.finalize()))
2600}
2601
2602fn compute_sha256_digest_for(bytes: &[u8]) -> String {
2603 let mut hasher = sha2::Sha256::new();
2604 hasher.update(bytes);
2605 format!("sha256:{:x}", hasher.finalize())
2606}
2607
2608fn build_artifact_key(cache: &CacheManager, digest: Option<&str>, bytes: &[u8]) -> ArtifactKey {
2609 let wasm_digest = digest
2610 .map(normalize_digest)
2611 .unwrap_or_else(|| compute_sha256_digest_for(bytes));
2612 ArtifactKey::new(cache.engine_profile_id().to_string(), wasm_digest)
2613}
2614
2615async fn compile_component_with_cache(
2616 cache: &CacheManager,
2617 engine: &Engine,
2618 digest: Option<&str>,
2619 bytes: Vec<u8>,
2620) -> Result<Arc<Component>> {
2621 let key = build_artifact_key(cache, digest, &bytes);
2622 cache.get_component(engine, &key, || Ok(bytes)).await
2623}
2624
2625fn verify_component_digest(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
2626 let normalized_expected = normalize_digest(expected);
2627 let actual = compute_digest_for(bytes, &normalized_expected)?;
2628 if normalize_digest(&actual) != normalized_expected {
2629 bail!(
2630 "component {component_id} digest mismatch: expected {normalized_expected}, got {actual}"
2631 );
2632 }
2633 Ok(())
2634}
2635
2636fn verify_wasm_sha256(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
2637 let normalized_expected = normalize_sha256(expected)?;
2638 let actual = compute_sha256_digest_for(bytes);
2639 if actual != normalized_expected {
2640 bail!(
2641 "component {component_id} bundled digest mismatch: expected {normalized_expected}, got {actual}"
2642 );
2643 }
2644 Ok(())
2645}
2646
2647#[cfg(test)]
2648mod pack_lock_tests {
2649 use super::*;
2650 use tempfile::TempDir;
2651
2652 #[test]
2653 fn pack_lock_tag_ref_requires_bundle() {
2654 let lock = PackLockV1 {
2655 schema_version: 1,
2656 components: vec![PackLockComponent {
2657 name: "templates".to_string(),
2658 source_ref: Some("oci://registry.test/templates:latest".to_string()),
2659 legacy_ref: None,
2660 component_id: None,
2661 bundled: Some(false),
2662 bundled_path: None,
2663 legacy_path: None,
2664 wasm_sha256: None,
2665 legacy_sha256: None,
2666 resolved_digest: None,
2667 digest: None,
2668 }],
2669 };
2670 let err = component_sources_table_from_pack_lock(&lock, false).unwrap_err();
2671 assert!(
2672 err.to_string().contains("tag ref") && err.to_string().contains("rebuild the pack"),
2673 "unexpected error: {err}"
2674 );
2675 }
2676
2677 #[test]
2678 fn bundled_hash_mismatch_errors() {
2679 let rt = tokio::runtime::Runtime::new().expect("runtime");
2680 let temp = TempDir::new().expect("temp dir");
2681 let engine = Engine::default();
2682 let engine_profile =
2683 EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
2684 let cache_config = CacheConfig {
2685 root: temp.path().join("cache"),
2686 ..CacheConfig::default()
2687 };
2688 let cache = CacheManager::new(cache_config, engine_profile);
2689 let wasm_path = temp.path().join("component.wasm");
2690 let fixture_wasm = Path::new(env!("CARGO_MANIFEST_DIR"))
2691 .join("../../tests/fixtures/packs/secrets_store_smoke/components/echo_secret.wasm");
2692 let bytes = std::fs::read(&fixture_wasm).expect("read fixture wasm");
2693 std::fs::write(&wasm_path, &bytes).expect("write temp wasm");
2694
2695 let spec = ComponentSpec {
2696 id: "qa.process".to_string(),
2697 version: "0.0.0".to_string(),
2698 legacy_path: None,
2699 };
2700 let mut missing = HashSet::new();
2701 missing.insert(spec.id.clone());
2702
2703 let mut sources = HashMap::new();
2704 sources.insert(
2705 spec.id.clone(),
2706 ComponentSourceInfo {
2707 digest: Some("sha256:deadbeef".to_string()),
2708 source: ComponentSourceRef::Oci("registry.test/qa.process@sha256:deadbeef".into()),
2709 artifact: ComponentArtifactLocation::Inline {
2710 wasm_path: "component.wasm".to_string(),
2711 },
2712 expected_wasm_sha256: Some("sha256:deadbeef".to_string()),
2713 skip_digest_verification: false,
2714 },
2715 );
2716
2717 let mut loaded = HashMap::new();
2718 let result = rt.block_on(load_components_from_sources(
2719 &cache,
2720 &engine,
2721 &sources,
2722 &ComponentResolution::default(),
2723 &[spec],
2724 &mut missing,
2725 &mut loaded,
2726 Some(temp.path()),
2727 None,
2728 ));
2729 let err = result.unwrap_err();
2730 assert!(
2731 err.to_string().contains("bundled digest mismatch"),
2732 "unexpected error: {err}"
2733 );
2734 }
2735}
2736
2737#[cfg(test)]
2738mod pack_resolution_prop_tests {
2739 use super::*;
2740 use greentic_types::{ArtifactLocationV1, ComponentSourceEntryV1, ResolvedComponentV1};
2741 use proptest::prelude::*;
2742 use proptest::test_runner::{Config as ProptestConfig, RngAlgorithm, TestRng, TestRunner};
2743 use std::collections::BTreeSet;
2744 use std::path::Path;
2745 use std::str::FromStr;
2746
2747 #[derive(Clone, Debug)]
2748 enum ResolveRequest {
2749 ById(String),
2750 ByName(String),
2751 }
2752
2753 #[derive(Clone, Debug, PartialEq, Eq)]
2754 struct ResolvedComponent {
2755 key: String,
2756 source: String,
2757 artifact: String,
2758 digest: Option<String>,
2759 expected_wasm_sha256: Option<String>,
2760 skip_digest_verification: bool,
2761 }
2762
2763 #[derive(Clone, Debug, PartialEq, Eq)]
2764 struct ResolveError {
2765 code: String,
2766 message: String,
2767 context_key: String,
2768 }
2769
2770 #[derive(Clone, Debug)]
2771 struct Scenario {
2772 pack_lock: Option<PackLockV1>,
2773 component_sources: Option<ComponentSourcesV1>,
2774 request: ResolveRequest,
2775 expected_sha256: Option<String>,
2776 bytes: Vec<u8>,
2777 }
2778
2779 fn resolve_component_test(
2780 sources: Option<&ComponentSourcesV1>,
2781 lock: Option<&PackLockV1>,
2782 request: &ResolveRequest,
2783 ) -> Result<ResolvedComponent, ResolveError> {
2784 let table = if let Some(lock) = lock {
2785 component_sources_table_from_pack_lock(lock, false).map_err(|err| ResolveError {
2786 code: classify_pack_lock_error(err.to_string().as_str()).to_string(),
2787 message: err.to_string(),
2788 context_key: request_key(request).to_string(),
2789 })?
2790 } else {
2791 let sources = component_sources_table(sources).map_err(|err| ResolveError {
2792 code: "component_sources_error".to_string(),
2793 message: err.to_string(),
2794 context_key: request_key(request).to_string(),
2795 })?;
2796 sources.ok_or_else(|| ResolveError {
2797 code: "missing_component_sources".to_string(),
2798 message: "component sources not provided".to_string(),
2799 context_key: request_key(request).to_string(),
2800 })?
2801 };
2802
2803 let key = request_key(request);
2804 let source = table.get(key).ok_or_else(|| ResolveError {
2805 code: "component_not_found".to_string(),
2806 message: format!("component {key} not found"),
2807 context_key: key.to_string(),
2808 })?;
2809
2810 Ok(ResolvedComponent {
2811 key: key.to_string(),
2812 source: source.source.to_string(),
2813 artifact: match source.artifact {
2814 ComponentArtifactLocation::Inline { .. } => "inline".to_string(),
2815 ComponentArtifactLocation::Remote => "remote".to_string(),
2816 },
2817 digest: source.digest.clone(),
2818 expected_wasm_sha256: source.expected_wasm_sha256.clone(),
2819 skip_digest_verification: source.skip_digest_verification,
2820 })
2821 }
2822
2823 fn request_key(request: &ResolveRequest) -> &str {
2824 match request {
2825 ResolveRequest::ById(value) => value.as_str(),
2826 ResolveRequest::ByName(value) => value.as_str(),
2827 }
2828 }
2829
2830 fn classify_pack_lock_error(message: &str) -> &'static str {
2831 if message.contains("duplicate component name") {
2832 "duplicate_name"
2833 } else if message.contains("duplicate component id") {
2834 "duplicate_id"
2835 } else if message.contains("conflicting refs") {
2836 "conflicting_ref"
2837 } else if message.contains("conflicting bundled paths") {
2838 "conflicting_bundled_path"
2839 } else if message.contains("conflicting wasm_sha256") {
2840 "conflicting_wasm_sha256"
2841 } else if message.contains("missing source_ref") {
2842 "missing_source_ref"
2843 } else if message.contains("marked bundled but bundled_path is missing") {
2844 "missing_bundled_path"
2845 } else if message.contains("missing wasm_sha256") {
2846 "missing_wasm_sha256"
2847 } else if message.contains("tag ref") && message.contains("not bundled") {
2848 "tag_ref_requires_bundle"
2849 } else if message.contains("missing resolved_digest") {
2850 "missing_resolved_digest"
2851 } else if message.contains("invalid component ref") {
2852 "invalid_component_ref"
2853 } else if message.contains("sha256 digest") {
2854 "invalid_sha256"
2855 } else {
2856 "unknown_error"
2857 }
2858 }
2859
2860 fn known_error_codes() -> BTreeSet<&'static str> {
2861 [
2862 "component_sources_error",
2863 "missing_component_sources",
2864 "component_not_found",
2865 "duplicate_name",
2866 "duplicate_id",
2867 "conflicting_ref",
2868 "conflicting_bundled_path",
2869 "conflicting_wasm_sha256",
2870 "missing_source_ref",
2871 "missing_bundled_path",
2872 "missing_wasm_sha256",
2873 "tag_ref_requires_bundle",
2874 "missing_resolved_digest",
2875 "invalid_component_ref",
2876 "invalid_sha256",
2877 "unknown_error",
2878 ]
2879 .into_iter()
2880 .collect()
2881 }
2882
2883 fn proptest_config() -> ProptestConfig {
2884 let cases = std::env::var("PROPTEST_CASES")
2885 .ok()
2886 .and_then(|value| value.parse::<u32>().ok())
2887 .unwrap_or(128);
2888 ProptestConfig {
2889 cases,
2890 failure_persistence: None,
2891 ..ProptestConfig::default()
2892 }
2893 }
2894
2895 fn proptest_seed() -> Option<[u8; 32]> {
2896 let seed = std::env::var("PROPTEST_SEED")
2897 .ok()
2898 .and_then(|value| value.parse::<u64>().ok())?;
2899 let mut bytes = [0u8; 32];
2900 bytes[..8].copy_from_slice(&seed.to_le_bytes());
2901 Some(bytes)
2902 }
2903
2904 fn run_cases(strategy: impl Strategy<Value = Scenario>, cases: u32, seed: Option<[u8; 32]>) {
2905 let config = ProptestConfig {
2906 cases,
2907 failure_persistence: None,
2908 ..ProptestConfig::default()
2909 };
2910 let mut runner = match seed {
2911 Some(bytes) => {
2912 TestRunner::new_with_rng(config, TestRng::from_seed(RngAlgorithm::ChaCha, &bytes))
2913 }
2914 None => TestRunner::new(config),
2915 };
2916 runner
2917 .run(&strategy, |scenario| {
2918 run_scenario(&scenario);
2919 Ok(())
2920 })
2921 .unwrap();
2922 }
2923
2924 fn run_scenario(scenario: &Scenario) {
2925 let known_codes = known_error_codes();
2926 let first = resolve_component_test(
2927 scenario.component_sources.as_ref(),
2928 scenario.pack_lock.as_ref(),
2929 &scenario.request,
2930 );
2931 let second = resolve_component_test(
2932 scenario.component_sources.as_ref(),
2933 scenario.pack_lock.as_ref(),
2934 &scenario.request,
2935 );
2936 assert_eq!(normalize_result(&first), normalize_result(&second));
2937
2938 if let Some(lock) = scenario.pack_lock.as_ref() {
2939 let lock_only = resolve_component_test(None, Some(lock), &scenario.request);
2940 assert_eq!(normalize_result(&first), normalize_result(&lock_only));
2941 }
2942
2943 if let Err(err) = first.as_ref() {
2944 assert!(
2945 known_codes.contains(err.code.as_str()),
2946 "unexpected error code {}: {}",
2947 err.code,
2948 err.message
2949 );
2950 }
2951
2952 if let Some(expected) = scenario.expected_sha256.as_deref() {
2953 let expected_ok =
2954 verify_wasm_sha256("test.component", expected, &scenario.bytes).is_ok();
2955 let actual = compute_sha256_digest_for(&scenario.bytes);
2956 if actual == normalize_sha256(expected).unwrap_or_default() {
2957 assert!(expected_ok, "expected sha256 match to succeed");
2958 } else {
2959 assert!(!expected_ok, "expected sha256 mismatch to fail");
2960 }
2961 }
2962 }
2963
2964 fn normalize_result(
2965 result: &Result<ResolvedComponent, ResolveError>,
2966 ) -> Result<ResolvedComponent, ResolveError> {
2967 match result {
2968 Ok(value) => Ok(value.clone()),
2969 Err(err) => Err(err.clone()),
2970 }
2971 }
2972
2973 fn scenario_strategy() -> impl Strategy<Value = Scenario> {
2974 let name = any::<u8>().prop_map(|n| format!("component{n}.core"));
2975 let alt_name = any::<u8>().prop_map(|n| format!("component_alt{n}.core"));
2976 let tag_ref = any::<bool>();
2977 let bundled = any::<bool>();
2978 let include_sha = any::<bool>();
2979 let include_component_id = any::<bool>();
2980 let request_by_id = any::<bool>();
2981 let use_lock = any::<bool>();
2982 let use_sources = any::<bool>();
2983 let bytes = prop::collection::vec(any::<u8>(), 1..64);
2984
2985 (
2986 name,
2987 alt_name,
2988 tag_ref,
2989 bundled,
2990 include_sha,
2991 include_component_id,
2992 request_by_id,
2993 use_lock,
2994 use_sources,
2995 bytes,
2996 )
2997 .prop_map(
2998 |(
2999 name,
3000 alt_name,
3001 tag_ref,
3002 bundled,
3003 include_sha,
3004 include_component_id,
3005 request_by_id,
3006 use_lock,
3007 use_sources,
3008 bytes,
3009 )| {
3010 let component_id_str = if include_component_id {
3011 alt_name.clone()
3012 } else {
3013 name.clone()
3014 };
3015 let component_id = ComponentId::from_str(&component_id_str).ok();
3016 let source_ref = if tag_ref {
3017 format!("oci://registry.test/{name}:v1")
3018 } else {
3019 format!(
3020 "oci://registry.test/{name}@sha256:{}",
3021 hex::encode([0x11u8; 32])
3022 )
3023 };
3024 let expected_sha256 = if bundled && include_sha {
3025 Some(compute_sha256_digest_for(&bytes))
3026 } else {
3027 None
3028 };
3029
3030 let lock_component = PackLockComponent {
3031 name: name.clone(),
3032 source_ref: Some(source_ref),
3033 legacy_ref: None,
3034 component_id,
3035 bundled: Some(bundled),
3036 bundled_path: if bundled {
3037 Some(format!("components/{name}.wasm"))
3038 } else {
3039 None
3040 },
3041 legacy_path: None,
3042 wasm_sha256: expected_sha256.clone(),
3043 legacy_sha256: None,
3044 resolved_digest: if bundled {
3045 None
3046 } else {
3047 Some("sha256:deadbeef".to_string())
3048 },
3049 digest: None,
3050 };
3051
3052 let pack_lock = if use_lock {
3053 Some(PackLockV1 {
3054 schema_version: 1,
3055 components: vec![lock_component],
3056 })
3057 } else {
3058 None
3059 };
3060
3061 let component_sources = if use_sources {
3062 Some(ComponentSourcesV1::new(vec![ComponentSourceEntryV1 {
3063 name: name.clone(),
3064 component_id: ComponentId::from_str(&name).ok(),
3065 source: ComponentSourceRef::from_str(
3066 "oci://registry.test/component@sha256:deadbeef",
3067 )
3068 .expect("component ref"),
3069 resolved: ResolvedComponentV1 {
3070 digest: "sha256:deadbeef".to_string(),
3071 signature: None,
3072 signed_by: None,
3073 },
3074 artifact: if bundled {
3075 ArtifactLocationV1::Inline {
3076 wasm_path: format!("components/{name}.wasm"),
3077 manifest_path: None,
3078 }
3079 } else {
3080 ArtifactLocationV1::Remote
3081 },
3082 licensing_hint: None,
3083 metering_hint: None,
3084 }]))
3085 } else {
3086 None
3087 };
3088
3089 let request = if request_by_id {
3090 ResolveRequest::ById(component_id_str.clone())
3091 } else {
3092 ResolveRequest::ByName(name.clone())
3093 };
3094
3095 Scenario {
3096 pack_lock,
3097 component_sources,
3098 request,
3099 expected_sha256,
3100 bytes,
3101 }
3102 },
3103 )
3104 }
3105
3106 #[test]
3107 fn pack_resolution_proptest() {
3108 let seed = proptest_seed();
3109 run_cases(scenario_strategy(), proptest_config().cases, seed);
3110 }
3111
3112 #[test]
3113 fn pack_resolution_regression_seeds() {
3114 let seeds_path =
3115 Path::new(env!("CARGO_MANIFEST_DIR")).join("../../tests/fixtures/proptest-seeds.txt");
3116 let raw = std::fs::read_to_string(&seeds_path).expect("read proptest seeds");
3117 for line in raw.lines() {
3118 let line = line.trim();
3119 if line.is_empty() || line.starts_with('#') {
3120 continue;
3121 }
3122 let seed = line.parse::<u64>().expect("seed must be an integer");
3123 let mut bytes = [0u8; 32];
3124 bytes[..8].copy_from_slice(&seed.to_le_bytes());
3125 run_cases(scenario_strategy(), 1, Some(bytes));
3126 }
3127 }
3128}
3129
3130fn locate_pack_assets(
3131 materialized_root: Option<&Path>,
3132 archive_hint: Option<&Path>,
3133) -> Result<(Option<PathBuf>, Option<TempDir>)> {
3134 if let Some(root) = materialized_root {
3135 let assets = root.join("assets");
3136 if assets.is_dir() {
3137 return Ok((Some(assets), None));
3138 }
3139 }
3140 if let Some(path) = archive_hint
3141 && let Some((tempdir, assets)) = extract_assets_from_archive(path)?
3142 {
3143 return Ok((Some(assets), Some(tempdir)));
3144 }
3145 Ok((None, None))
3146}
3147
3148fn extract_assets_from_archive(path: &Path) -> Result<Option<(TempDir, PathBuf)>> {
3149 let file =
3150 File::open(path).with_context(|| format!("failed to open pack {}", path.display()))?;
3151 let mut archive =
3152 ZipArchive::new(file).with_context(|| format!("failed to read pack {}", path.display()))?;
3153 let temp = TempDir::new().context("failed to create temporary assets directory")?;
3154 let mut found = false;
3155 for idx in 0..archive.len() {
3156 let mut entry = archive.by_index(idx)?;
3157 let name = entry.name();
3158 if !name.starts_with("assets/") {
3159 continue;
3160 }
3161 let dest = temp.path().join(name);
3162 if name.ends_with('/') {
3163 std::fs::create_dir_all(&dest)?;
3164 found = true;
3165 continue;
3166 }
3167 if let Some(parent) = dest.parent() {
3168 std::fs::create_dir_all(parent)?;
3169 }
3170 let mut outfile = std::fs::File::create(&dest)?;
3171 std::io::copy(&mut entry, &mut outfile)?;
3172 found = true;
3173 }
3174 if found {
3175 let assets_path = temp.path().join("assets");
3176 Ok(Some((temp, assets_path)))
3177 } else {
3178 Ok(None)
3179 }
3180}
3181
3182fn dist_options_from(component_resolution: &ComponentResolution) -> DistOptions {
3183 let mut opts = DistOptions {
3184 allow_tags: true,
3185 ..DistOptions::default()
3186 };
3187 if let Some(cache_dir) = component_resolution.dist_cache_dir.clone() {
3188 opts.cache_dir = cache_dir;
3189 }
3190 if component_resolution.dist_offline {
3191 opts.offline = true;
3192 }
3193 opts
3194}
3195
3196#[allow(clippy::too_many_arguments)]
3197async fn load_components_from_sources(
3198 cache: &CacheManager,
3199 engine: &Engine,
3200 component_sources: &HashMap<String, ComponentSourceInfo>,
3201 component_resolution: &ComponentResolution,
3202 specs: &[ComponentSpec],
3203 missing: &mut HashSet<String>,
3204 into: &mut HashMap<String, PackComponent>,
3205 materialized_root: Option<&Path>,
3206 archive_hint: Option<&Path>,
3207) -> Result<()> {
3208 let mut archive = if let Some(path) = archive_hint {
3209 Some(
3210 ZipArchive::new(File::open(path)?)
3211 .with_context(|| format!("{} is not a valid gtpack", path.display()))?,
3212 )
3213 } else {
3214 None
3215 };
3216 let mut dist_client: Option<DistClient> = None;
3217
3218 for spec in specs {
3219 if !missing.contains(&spec.id) {
3220 continue;
3221 }
3222 let Some(source) = component_sources.get(&spec.id) else {
3223 continue;
3224 };
3225
3226 let bytes = match &source.artifact {
3227 ComponentArtifactLocation::Inline { wasm_path } => {
3228 if let Some(root) = materialized_root {
3229 let path = root.join(wasm_path);
3230 if path.exists() {
3231 std::fs::read(&path).with_context(|| {
3232 format!(
3233 "failed to read inline component {} from {}",
3234 spec.id,
3235 path.display()
3236 )
3237 })?
3238 } else if archive.is_none() {
3239 bail!("inline component {} missing at {}", spec.id, path.display());
3240 } else {
3241 read_entry(
3242 archive.as_mut().expect("archive present when needed"),
3243 wasm_path,
3244 )
3245 .with_context(|| {
3246 format!(
3247 "inline component {} missing at {} in pack archive",
3248 spec.id, wasm_path
3249 )
3250 })?
3251 }
3252 } else if let Some(archive) = archive.as_mut() {
3253 read_entry(archive, wasm_path).with_context(|| {
3254 format!(
3255 "inline component {} missing at {} in pack archive",
3256 spec.id, wasm_path
3257 )
3258 })?
3259 } else {
3260 bail!(
3261 "inline component {} missing and no pack source available",
3262 spec.id
3263 );
3264 }
3265 }
3266 ComponentArtifactLocation::Remote => {
3267 if source.source.is_tag() {
3268 bail!(
3269 "component {} uses tag ref {} but is not bundled; rebuild the pack",
3270 spec.id,
3271 source.source
3272 );
3273 }
3274 let client = dist_client.get_or_insert_with(|| {
3275 DistClient::new(dist_options_from(component_resolution))
3276 });
3277 let reference = source.source.to_string();
3278 fault::maybe_fail_asset(&reference)
3279 .await
3280 .with_context(|| format!("fault injection blocked asset {reference}"))?;
3281 let digest = source.digest.as_deref().ok_or_else(|| {
3282 anyhow!(
3283 "component {} missing expected digest for remote component",
3284 spec.id
3285 )
3286 })?;
3287 let cache_path = if component_resolution.dist_offline {
3288 client
3289 .fetch_digest(digest)
3290 .await
3291 .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?
3292 } else {
3293 let resolved = client
3294 .resolve_ref(&reference)
3295 .await
3296 .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?;
3297 let expected = normalize_digest(digest);
3298 let actual = normalize_digest(&resolved.digest);
3299 if expected != actual {
3300 bail!(
3301 "component {} digest mismatch after fetch: expected {}, got {}",
3302 spec.id,
3303 expected,
3304 actual
3305 );
3306 }
3307 resolved.cache_path.ok_or_else(|| {
3308 anyhow!(
3309 "component {} resolved from {} but cache path is missing",
3310 spec.id,
3311 reference
3312 )
3313 })?
3314 };
3315 std::fs::read(&cache_path).with_context(|| {
3316 format!(
3317 "failed to read cached component {} from {}",
3318 spec.id,
3319 cache_path.display()
3320 )
3321 })?
3322 }
3323 };
3324
3325 if let Some(expected) = source.expected_wasm_sha256.as_deref() {
3326 verify_wasm_sha256(&spec.id, expected, &bytes)?;
3327 } else if source.skip_digest_verification {
3328 let actual = compute_sha256_digest_for(&bytes);
3329 warn!(
3330 component_id = %spec.id,
3331 digest = %actual,
3332 "bundled component missing wasm_sha256; allowing due to flag"
3333 );
3334 } else {
3335 let expected = source.digest.as_deref().ok_or_else(|| {
3336 anyhow!(
3337 "component {} missing expected digest for verification",
3338 spec.id
3339 )
3340 })?;
3341 verify_component_digest(&spec.id, expected, &bytes)?;
3342 }
3343 let component =
3344 compile_component_with_cache(cache, engine, source.digest.as_deref(), bytes)
3345 .await
3346 .with_context(|| format!("failed to compile component {}", spec.id))?;
3347 into.insert(
3348 spec.id.clone(),
3349 PackComponent {
3350 name: spec.id.clone(),
3351 version: spec.version.clone(),
3352 component,
3353 },
3354 );
3355 missing.remove(&spec.id);
3356 }
3357
3358 Ok(())
3359}
3360
3361fn dist_error_for_component(err: DistError, component_id: &str, reference: &str) -> anyhow::Error {
3362 match err {
3363 DistError::CacheMiss { reference: missing } => anyhow!(
3364 "remote component {} is not cached for {}. Run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
3365 component_id,
3366 missing,
3367 reference
3368 ),
3369 DistError::Offline { reference: blocked } => anyhow!(
3370 "offline mode blocked fetching component {} from {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
3371 component_id,
3372 blocked,
3373 reference
3374 ),
3375 DistError::AuthRequired { target } => anyhow!(
3376 "component {} requires authenticated source {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
3377 component_id,
3378 target,
3379 reference
3380 ),
3381 other => anyhow!(
3382 "failed to resolve component {} from {}: {}",
3383 component_id,
3384 reference,
3385 other
3386 ),
3387 }
3388}
3389
3390async fn load_components_from_overrides(
3391 cache: &CacheManager,
3392 engine: &Engine,
3393 overrides: &HashMap<String, PathBuf>,
3394 specs: &[ComponentSpec],
3395 missing: &mut HashSet<String>,
3396 into: &mut HashMap<String, PackComponent>,
3397) -> Result<()> {
3398 for spec in specs {
3399 if !missing.contains(&spec.id) {
3400 continue;
3401 }
3402 let Some(path) = overrides.get(&spec.id) else {
3403 continue;
3404 };
3405 let bytes = std::fs::read(path)
3406 .with_context(|| format!("failed to read override component {}", path.display()))?;
3407 let component = compile_component_with_cache(cache, engine, None, bytes)
3408 .await
3409 .with_context(|| {
3410 format!(
3411 "failed to compile component {} from override {}",
3412 spec.id,
3413 path.display()
3414 )
3415 })?;
3416 into.insert(
3417 spec.id.clone(),
3418 PackComponent {
3419 name: spec.id.clone(),
3420 version: spec.version.clone(),
3421 component,
3422 },
3423 );
3424 missing.remove(&spec.id);
3425 }
3426 Ok(())
3427}
3428
3429async fn load_components_from_dir(
3430 cache: &CacheManager,
3431 engine: &Engine,
3432 root: &Path,
3433 specs: &[ComponentSpec],
3434 missing: &mut HashSet<String>,
3435 into: &mut HashMap<String, PackComponent>,
3436) -> Result<()> {
3437 for spec in specs {
3438 if !missing.contains(&spec.id) {
3439 continue;
3440 }
3441 let path = component_path_for_spec(root, spec);
3442 if !path.exists() {
3443 tracing::debug!(component = %spec.id, path = %path.display(), "materialized component missing; will try other sources");
3444 continue;
3445 }
3446 let bytes = std::fs::read(&path)
3447 .with_context(|| format!("failed to read component {}", path.display()))?;
3448 let component = compile_component_with_cache(cache, engine, None, bytes)
3449 .await
3450 .with_context(|| {
3451 format!(
3452 "failed to compile component {} from {}",
3453 spec.id,
3454 path.display()
3455 )
3456 })?;
3457 into.insert(
3458 spec.id.clone(),
3459 PackComponent {
3460 name: spec.id.clone(),
3461 version: spec.version.clone(),
3462 component,
3463 },
3464 );
3465 missing.remove(&spec.id);
3466 }
3467 Ok(())
3468}
3469
3470async fn load_components_from_archive(
3471 cache: &CacheManager,
3472 engine: &Engine,
3473 path: &Path,
3474 specs: &[ComponentSpec],
3475 missing: &mut HashSet<String>,
3476 into: &mut HashMap<String, PackComponent>,
3477) -> Result<()> {
3478 let mut archive = ZipArchive::new(File::open(path)?)
3479 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
3480 for spec in specs {
3481 if !missing.contains(&spec.id) {
3482 continue;
3483 }
3484 let file_name = spec
3485 .legacy_path
3486 .clone()
3487 .unwrap_or_else(|| format!("components/{}.wasm", spec.id));
3488 let bytes = match read_entry(&mut archive, &file_name) {
3489 Ok(bytes) => bytes,
3490 Err(err) => {
3491 warn!(component = %spec.id, pack = %path.display(), error = %err, "component entry missing in pack archive");
3492 continue;
3493 }
3494 };
3495 let component = compile_component_with_cache(cache, engine, None, bytes)
3496 .await
3497 .with_context(|| format!("failed to compile component {}", spec.id))?;
3498 into.insert(
3499 spec.id.clone(),
3500 PackComponent {
3501 name: spec.id.clone(),
3502 version: spec.version.clone(),
3503 component,
3504 },
3505 );
3506 missing.remove(&spec.id);
3507 }
3508 Ok(())
3509}
3510
3511#[cfg(test)]
3512mod tests {
3513 use super::*;
3514 use greentic_flow::model::{FlowDoc, NodeDoc};
3515 use indexmap::IndexMap;
3516 use serde_json::json;
3517
3518 #[test]
3519 fn normalizes_raw_component_to_component_exec() {
3520 let mut nodes = IndexMap::new();
3521 let mut raw = IndexMap::new();
3522 raw.insert(
3523 "templating.handlebars".into(),
3524 json!({ "template": "Hi {{name}}" }),
3525 );
3526 nodes.insert(
3527 "start".into(),
3528 NodeDoc {
3529 raw,
3530 routing: json!([{"out": true}]),
3531 ..Default::default()
3532 },
3533 );
3534 let doc = FlowDoc {
3535 id: "welcome".into(),
3536 title: None,
3537 description: None,
3538 flow_type: "messaging".into(),
3539 start: Some("start".into()),
3540 parameters: json!({}),
3541 tags: Vec::new(),
3542 schema_version: None,
3543 entrypoints: IndexMap::new(),
3544 nodes,
3545 };
3546
3547 let normalized = normalize_flow_doc(doc);
3548 let node = normalized.nodes.get("start").expect("node exists");
3549 assert_eq!(node.operation.as_deref(), Some("component.exec"));
3550 assert!(node.raw.is_empty());
3551 let payload = node.payload.as_object().expect("payload object");
3552 assert_eq!(
3553 payload.get("component"),
3554 Some(&Value::String("templating.handlebars".into()))
3555 );
3556 assert_eq!(
3557 payload.get("operation"),
3558 Some(&Value::String("render".into()))
3559 );
3560 let input = payload.get("input").unwrap();
3561 assert_eq!(input, &json!({ "template": "Hi {{name}}" }));
3562 }
3563}
3564
3565#[derive(Clone, Debug, Default, Serialize, Deserialize)]
3566pub struct PackMetadata {
3567 pub pack_id: String,
3568 pub version: String,
3569 #[serde(default)]
3570 pub entry_flows: Vec<String>,
3571 #[serde(default)]
3572 pub secret_requirements: Vec<greentic_types::SecretRequirement>,
3573}
3574
3575impl PackMetadata {
3576 fn from_wasm(bytes: &[u8]) -> Option<Self> {
3577 let parser = Parser::new(0);
3578 for payload in parser.parse_all(bytes) {
3579 let payload = payload.ok()?;
3580 match payload {
3581 Payload::CustomSection(section) => {
3582 if section.name() == "greentic.manifest"
3583 && let Ok(meta) = Self::from_bytes(section.data())
3584 {
3585 return Some(meta);
3586 }
3587 }
3588 Payload::DataSection(reader) => {
3589 for segment in reader.into_iter().flatten() {
3590 if let Ok(meta) = Self::from_bytes(segment.data) {
3591 return Some(meta);
3592 }
3593 }
3594 }
3595 _ => {}
3596 }
3597 }
3598 None
3599 }
3600
3601 fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
3602 #[derive(Deserialize)]
3603 struct RawManifest {
3604 pack_id: String,
3605 version: String,
3606 #[serde(default)]
3607 entry_flows: Vec<String>,
3608 #[serde(default)]
3609 flows: Vec<RawFlow>,
3610 #[serde(default)]
3611 secret_requirements: Vec<greentic_types::SecretRequirement>,
3612 }
3613
3614 #[derive(Deserialize)]
3615 struct RawFlow {
3616 id: String,
3617 }
3618
3619 let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
3620 let mut entry_flows = if manifest.entry_flows.is_empty() {
3621 manifest.flows.iter().map(|f| f.id.clone()).collect()
3622 } else {
3623 manifest.entry_flows.clone()
3624 };
3625 entry_flows.retain(|id| !id.is_empty());
3626 Ok(Self {
3627 pack_id: manifest.pack_id,
3628 version: manifest.version,
3629 entry_flows,
3630 secret_requirements: manifest.secret_requirements,
3631 })
3632 }
3633
3634 pub fn fallback(path: &Path) -> Self {
3635 let pack_id = path
3636 .file_stem()
3637 .map(|s| s.to_string_lossy().into_owned())
3638 .unwrap_or_else(|| "unknown-pack".to_string());
3639 Self {
3640 pack_id,
3641 version: "0.0.0".to_string(),
3642 entry_flows: Vec::new(),
3643 secret_requirements: Vec::new(),
3644 }
3645 }
3646
3647 pub fn from_manifest(manifest: &greentic_types::PackManifest) -> Self {
3648 let entry_flows = manifest
3649 .flows
3650 .iter()
3651 .map(|flow| flow.id.as_str().to_string())
3652 .collect::<Vec<_>>();
3653 Self {
3654 pack_id: manifest.pack_id.as_str().to_string(),
3655 version: manifest.version.to_string(),
3656 entry_flows,
3657 secret_requirements: manifest.secret_requirements.clone(),
3658 }
3659 }
3660}