1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::cache::{ArtifactKey, CacheConfig, CacheManager, CpuPolicy, EngineProfile};
10use crate::component_api::{
11 self, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
12};
13use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
14use crate::provider::{ProviderBinding, ProviderRegistry};
15use crate::provider_core::SchemaCorePre as ProviderComponentPre;
16use crate::provider_core_only;
17use crate::runtime_wasmtime::{Component, Engine, InstancePre, Linker, ResourceTable};
18use anyhow::{Context, Result, anyhow, bail};
19use greentic_distributor_client::dist::{DistClient, DistError, DistOptions};
20use greentic_interfaces_wasmtime::host_helpers::v1::{
21 self as host_v1, HostFns, add_all_v1_to_linker,
22 runner_host_http::RunnerHostHttp,
23 runner_host_kv::RunnerHostKv,
24 secrets_store::{SecretsError, SecretsErrorV1_1, SecretsStoreHost, SecretsStoreHostV1_1},
25 state_store::{
26 OpAck as StateOpAck, StateKey as HostStateKey, StateStoreError as StateError,
27 StateStoreHost, TenantCtx as StateTenantCtx,
28 },
29 telemetry_logger::{
30 OpAck as TelemetryAck, SpanContext as TelemetrySpanContext,
31 TelemetryLoggerError as TelemetryError, TelemetryLoggerHost,
32 TenantCtx as TelemetryTenantCtx,
33 },
34};
35use greentic_interfaces_wasmtime::http_client_client_v1_0::greentic::interfaces_types::types::Impersonation as ImpersonationV1_0;
36use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::interfaces_types::types::Impersonation as ImpersonationV1_1;
37use greentic_pack::builder as legacy_pack;
38use greentic_types::flow::FlowHasher;
39use greentic_types::{
40 ArtifactLocationV1, ComponentId, ComponentManifest, ComponentSourceRef, ComponentSourcesV1,
41 EXT_COMPONENT_SOURCES_V1, EnvId, ExtensionRef, Flow, FlowComponentRef, FlowId, FlowKind,
42 FlowMetadata, InputMapping, Node, NodeId, OutputMapping, Routing, StateKey as StoreStateKey,
43 TeamId, TelemetryHints, TenantCtx as TypesTenantCtx, TenantId, UserId, decode_pack_manifest,
44 pack_manifest::ExtensionInline,
45};
46use host_v1::http_client::{
47 HttpClientError, HttpClientErrorV1_1, HttpClientHost, HttpClientHostV1_1,
48 Request as HttpRequest, RequestOptionsV1_1 as HttpRequestOptionsV1_1,
49 RequestV1_1 as HttpRequestV1_1, Response as HttpResponse, ResponseV1_1 as HttpResponseV1_1,
50 TenantCtx as HttpTenantCtx, TenantCtxV1_1 as HttpTenantCtxV1_1,
51};
52use indexmap::IndexMap;
53use once_cell::sync::Lazy;
54use parking_lot::{Mutex, RwLock};
55use reqwest::blocking::Client as BlockingClient;
56use runner_core::normalize_under_root;
57use serde::{Deserialize, Serialize};
58use serde_cbor;
59use serde_json::{self, Value};
60use sha2::Digest;
61use tokio::fs;
62use wasmparser::{Parser, Payload};
63use wasmtime::StoreContextMut;
64use zip::ZipArchive;
65
66use crate::runner::engine::{FlowContext, FlowEngine, FlowStatus};
67use crate::runner::flow_adapter::{FlowIR, flow_doc_to_ir, flow_ir_to_flow};
68use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
69#[cfg(feature = "fault-injection")]
70use crate::testing::fault_injection::{FaultContext, FaultPoint, maybe_fail};
71
72use crate::config::HostConfig;
73use crate::fault;
74use crate::secrets::{DynSecretsManager, read_secret_blocking, write_secret_blocking};
75use crate::storage::state::STATE_PREFIX;
76use crate::storage::{DynSessionStore, DynStateStore};
77use crate::verify;
78use crate::wasi::RunnerWasiPolicy;
79use tracing::warn;
80use wasmtime_wasi::p2::add_to_linker_sync as add_wasi_to_linker;
81use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
82
83use greentic_flow::model::FlowDoc;
84
85#[allow(dead_code)]
86pub struct PackRuntime {
87 path: PathBuf,
89 archive_path: Option<PathBuf>,
91 config: Arc<HostConfig>,
92 engine: Engine,
93 metadata: PackMetadata,
94 manifest: Option<greentic_types::PackManifest>,
95 legacy_manifest: Option<Box<legacy_pack::PackManifest>>,
96 component_manifests: HashMap<String, ComponentManifest>,
97 mocks: Option<Arc<MockLayer>>,
98 flows: Option<PackFlows>,
99 components: HashMap<String, PackComponent>,
100 http_client: Arc<BlockingClient>,
101 pre_cache: Mutex<HashMap<String, InstancePre<ComponentState>>>,
102 session_store: Option<DynSessionStore>,
103 state_store: Option<DynStateStore>,
104 wasi_policy: Arc<RunnerWasiPolicy>,
105 provider_registry: RwLock<Option<ProviderRegistry>>,
106 secrets: DynSecretsManager,
107 oauth_config: Option<OAuthBrokerConfig>,
108 cache: CacheManager,
109}
110
111struct PackComponent {
112 #[allow(dead_code)]
113 name: String,
114 #[allow(dead_code)]
115 version: String,
116 component: Arc<Component>,
117}
118
119#[derive(Debug, Default, Clone)]
120pub struct ComponentResolution {
121 pub materialized_root: Option<PathBuf>,
123 pub overrides: HashMap<String, PathBuf>,
125 pub dist_offline: bool,
127 pub dist_cache_dir: Option<PathBuf>,
129 pub allow_missing_hash: bool,
131}
132
133fn build_blocking_client() -> BlockingClient {
134 std::thread::spawn(|| {
135 BlockingClient::builder()
136 .no_proxy()
137 .build()
138 .expect("blocking client")
139 })
140 .join()
141 .expect("client build thread panicked")
142}
143
144fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
145 let (root, candidate) = if path.is_absolute() {
146 let parent = path
147 .parent()
148 .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
149 let root = parent
150 .canonicalize()
151 .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
152 let file = path
153 .file_name()
154 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
155 (root, PathBuf::from(file))
156 } else {
157 let cwd = std::env::current_dir().context("failed to resolve current directory")?;
158 let base = if let Some(parent) = path.parent() {
159 cwd.join(parent)
160 } else {
161 cwd
162 };
163 let root = base
164 .canonicalize()
165 .with_context(|| format!("failed to canonicalize {}", base.display()))?;
166 let file = path
167 .file_name()
168 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
169 (root, PathBuf::from(file))
170 };
171 let safe = normalize_under_root(&root, &candidate)?;
172 Ok((root, safe))
173}
174
175static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
176
177#[derive(Debug, Clone, Serialize, Deserialize)]
178pub struct FlowDescriptor {
179 pub id: String,
180 #[serde(rename = "type")]
181 pub flow_type: String,
182 pub pack_id: String,
183 pub profile: String,
184 pub version: String,
185 #[serde(default)]
186 pub description: Option<String>,
187}
188
189pub struct HostState {
190 #[allow(dead_code)]
191 pack_id: String,
192 config: Arc<HostConfig>,
193 http_client: Arc<BlockingClient>,
194 default_env: String,
195 #[allow(dead_code)]
196 session_store: Option<DynSessionStore>,
197 state_store: Option<DynStateStore>,
198 mocks: Option<Arc<MockLayer>>,
199 secrets: DynSecretsManager,
200 oauth_config: Option<OAuthBrokerConfig>,
201 oauth_host: OAuthBrokerHost,
202 exec_ctx: Option<ComponentExecCtx>,
203 component_ref: Option<String>,
204 provider_core_component: bool,
205}
206
207impl HostState {
208 #[allow(clippy::default_constructed_unit_structs)]
209 #[allow(clippy::too_many_arguments)]
210 pub fn new(
211 pack_id: String,
212 config: Arc<HostConfig>,
213 http_client: Arc<BlockingClient>,
214 mocks: Option<Arc<MockLayer>>,
215 session_store: Option<DynSessionStore>,
216 state_store: Option<DynStateStore>,
217 secrets: DynSecretsManager,
218 oauth_config: Option<OAuthBrokerConfig>,
219 exec_ctx: Option<ComponentExecCtx>,
220 component_ref: Option<String>,
221 provider_core_component: bool,
222 ) -> Result<Self> {
223 let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
224 Ok(Self {
225 pack_id,
226 config,
227 http_client,
228 default_env,
229 session_store,
230 state_store,
231 mocks,
232 secrets,
233 oauth_config,
234 oauth_host: OAuthBrokerHost::default(),
235 exec_ctx,
236 component_ref,
237 provider_core_component,
238 })
239 }
240
241 pub fn get_secret(&self, key: &str) -> Result<String> {
242 if provider_core_only::is_enabled() {
243 bail!(provider_core_only::blocked_message("secrets"))
244 }
245 if !self.config.secrets_policy.is_allowed(key) {
246 bail!("secret {key} is not permitted by bindings policy");
247 }
248 if let Some(mock) = &self.mocks
249 && let Some(value) = mock.secrets_lookup(key)
250 {
251 return Ok(value);
252 }
253 let ctx = self.config.tenant_ctx();
254 let bytes = read_secret_blocking(&self.secrets, &ctx, key)
255 .context("failed to read secret from manager")?;
256 let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
257 Ok(value)
258 }
259
260 fn allows_secret_write_in_provider_core_only(&self) -> bool {
261 self.provider_core_component || self.component_ref.is_none()
262 }
263
264 fn tenant_ctx_from_v1(&self, ctx: Option<StateTenantCtx>) -> Result<TypesTenantCtx> {
265 let tenant_raw = ctx
266 .as_ref()
267 .map(|ctx| ctx.tenant.clone())
268 .or_else(|| self.exec_ctx.as_ref().map(|ctx| ctx.tenant.tenant.clone()))
269 .unwrap_or_else(|| self.config.tenant.clone());
270 let env_raw = ctx
271 .as_ref()
272 .map(|ctx| ctx.env.clone())
273 .unwrap_or_else(|| self.default_env.clone());
274 let tenant_id = TenantId::from_str(&tenant_raw)
275 .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
276 let env_id = EnvId::from_str(&env_raw)
277 .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
278 let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
279 if let Some(exec_ctx) = self.exec_ctx.as_ref() {
280 if let Some(team) = exec_ctx.tenant.team.as_ref() {
281 let team_id =
282 TeamId::from_str(team).with_context(|| format!("invalid team id `{team}`"))?;
283 tenant_ctx = tenant_ctx.with_team(Some(team_id));
284 }
285 if let Some(user) = exec_ctx.tenant.user.as_ref() {
286 let user_id =
287 UserId::from_str(user).with_context(|| format!("invalid user id `{user}`"))?;
288 tenant_ctx = tenant_ctx.with_user(Some(user_id));
289 }
290 tenant_ctx = tenant_ctx.with_flow(exec_ctx.flow_id.clone());
291 if let Some(node) = exec_ctx.node_id.as_ref() {
292 tenant_ctx = tenant_ctx.with_node(node.clone());
293 }
294 if let Some(session) = exec_ctx.tenant.correlation_id.as_ref() {
295 tenant_ctx = tenant_ctx.with_session(session.clone());
296 }
297 tenant_ctx.trace_id = exec_ctx.tenant.trace_id.clone();
298 }
299
300 if let Some(ctx) = ctx {
301 if let Some(team) = ctx.team.or(ctx.team_id) {
302 let team_id =
303 TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
304 tenant_ctx = tenant_ctx.with_team(Some(team_id));
305 }
306 if let Some(user) = ctx.user.or(ctx.user_id) {
307 let user_id =
308 UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
309 tenant_ctx = tenant_ctx.with_user(Some(user_id));
310 }
311 if let Some(flow) = ctx.flow_id {
312 tenant_ctx = tenant_ctx.with_flow(flow);
313 }
314 if let Some(node) = ctx.node_id {
315 tenant_ctx = tenant_ctx.with_node(node);
316 }
317 if let Some(provider) = ctx.provider_id {
318 tenant_ctx = tenant_ctx.with_provider(provider);
319 }
320 if let Some(session) = ctx.session_id {
321 tenant_ctx = tenant_ctx.with_session(session);
322 }
323 tenant_ctx.trace_id = ctx.trace_id;
324 }
325 Ok(tenant_ctx)
326 }
327
328 fn send_http_request(
329 &mut self,
330 req: HttpRequest,
331 opts: Option<HttpRequestOptionsV1_1>,
332 _ctx: Option<HttpTenantCtx>,
333 ) -> Result<HttpResponse, HttpClientError> {
334 if !self.config.http_enabled {
335 return Err(HttpClientError {
336 code: "denied".into(),
337 message: "http client disabled by policy".into(),
338 });
339 }
340
341 let mut mock_state = None;
342 let raw_body = req.body.clone();
343 if let Some(mock) = &self.mocks
344 && let Ok(meta) = HttpMockRequest::new(&req.method, &req.url, raw_body.as_deref())
345 {
346 match mock.http_begin(&meta) {
347 HttpDecision::Mock(response) => {
348 let headers = response
349 .headers
350 .iter()
351 .map(|(k, v)| (k.clone(), v.clone()))
352 .collect();
353 return Ok(HttpResponse {
354 status: response.status,
355 headers,
356 body: response.body.clone().map(|b| b.into_bytes()),
357 });
358 }
359 HttpDecision::Deny(reason) => {
360 return Err(HttpClientError {
361 code: "denied".into(),
362 message: reason,
363 });
364 }
365 HttpDecision::Passthrough { record } => {
366 mock_state = Some((meta, record));
367 }
368 }
369 }
370
371 let method = req.method.parse().unwrap_or(reqwest::Method::GET);
372 let mut builder = self.http_client.request(method, &req.url);
373 for (key, value) in req.headers {
374 if let Ok(header) = reqwest::header::HeaderName::from_bytes(key.as_bytes())
375 && let Ok(header_value) = reqwest::header::HeaderValue::from_str(&value)
376 {
377 builder = builder.header(header, header_value);
378 }
379 }
380
381 if let Some(body) = raw_body.clone() {
382 builder = builder.body(body);
383 }
384
385 if let Some(opts) = opts {
386 if let Some(timeout_ms) = opts.timeout_ms {
387 builder = builder.timeout(Duration::from_millis(timeout_ms as u64));
388 }
389 if opts.allow_insecure == Some(true) {
390 warn!(url = %req.url, "allow-insecure not supported; using default TLS validation");
391 }
392 if let Some(follow_redirects) = opts.follow_redirects
393 && !follow_redirects
394 {
395 warn!(url = %req.url, "follow-redirects=false not supported; using default client behaviour");
396 }
397 }
398
399 let response = match builder.send() {
400 Ok(resp) => resp,
401 Err(err) => {
402 warn!(url = %req.url, error = %err, "http client request failed");
403 return Err(HttpClientError {
404 code: "unavailable".into(),
405 message: err.to_string(),
406 });
407 }
408 };
409
410 let status = response.status().as_u16();
411 let headers_vec = response
412 .headers()
413 .iter()
414 .map(|(k, v)| {
415 (
416 k.as_str().to_string(),
417 v.to_str().unwrap_or_default().to_string(),
418 )
419 })
420 .collect::<Vec<_>>();
421 let body_bytes = response.bytes().ok().map(|b| b.to_vec());
422
423 if let Some((meta, true)) = mock_state.take()
424 && let Some(mock) = &self.mocks
425 {
426 let recorded = HttpMockResponse::new(
427 status,
428 headers_vec.clone().into_iter().collect(),
429 body_bytes
430 .as_ref()
431 .map(|b| String::from_utf8_lossy(b).into_owned()),
432 );
433 mock.http_record(&meta, &recorded);
434 }
435
436 Ok(HttpResponse {
437 status,
438 headers: headers_vec,
439 body: body_bytes,
440 })
441 }
442}
443
444impl SecretsStoreHost for HostState {
445 fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsError> {
446 if provider_core_only::is_enabled() {
447 warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
448 return Err(SecretsError::Denied);
449 }
450 if !self.config.secrets_policy.is_allowed(&key) {
451 return Err(SecretsError::Denied);
452 }
453 if let Some(mock) = &self.mocks
454 && let Some(value) = mock.secrets_lookup(&key)
455 {
456 return Ok(Some(value.into_bytes()));
457 }
458 let ctx = self.config.tenant_ctx();
459 match read_secret_blocking(&self.secrets, &ctx, &key) {
460 Ok(bytes) => Ok(Some(bytes)),
461 Err(err) => {
462 warn!(secret = %key, error = %err, "secret lookup failed");
463 Err(SecretsError::NotFound)
464 }
465 }
466 }
467}
468
469impl SecretsStoreHostV1_1 for HostState {
470 fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsErrorV1_1> {
471 if provider_core_only::is_enabled() {
472 warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
473 return Err(SecretsErrorV1_1::Denied);
474 }
475 if !self.config.secrets_policy.is_allowed(&key) {
476 return Err(SecretsErrorV1_1::Denied);
477 }
478 if let Some(mock) = &self.mocks
479 && let Some(value) = mock.secrets_lookup(&key)
480 {
481 return Ok(Some(value.into_bytes()));
482 }
483 let ctx = self.config.tenant_ctx();
484 match read_secret_blocking(&self.secrets, &ctx, &key) {
485 Ok(bytes) => Ok(Some(bytes)),
486 Err(err) => {
487 warn!(secret = %key, error = %err, "secret lookup failed");
488 Err(SecretsErrorV1_1::NotFound)
489 }
490 }
491 }
492
493 fn put(&mut self, key: String, value: Vec<u8>) {
494 if key.trim().is_empty() {
495 warn!(secret = %key, "secret write blocked: empty key");
496 panic!("secret write denied for key {key}: invalid key");
497 }
498 if provider_core_only::is_enabled() && !self.allows_secret_write_in_provider_core_only() {
499 warn!(
500 secret = %key,
501 component = self.component_ref.as_deref().unwrap_or("<pack>"),
502 "provider-core only mode enabled; blocking secrets store write"
503 );
504 panic!("secret write denied for key {key}: provider-core-only mode");
505 }
506 if !self.config.secrets_policy.is_allowed(&key) {
507 warn!(secret = %key, "secret write denied by bindings policy");
508 panic!("secret write denied for key {key}: policy");
509 }
510 let ctx = self.config.tenant_ctx();
511 if let Err(err) = write_secret_blocking(&self.secrets, &ctx, &key, &value) {
512 warn!(secret = %key, error = %err, "secret write failed");
513 panic!("secret write failed for key {key}");
514 }
515 }
516}
517
518impl HttpClientHost for HostState {
519 fn send(
520 &mut self,
521 req: HttpRequest,
522 ctx: Option<HttpTenantCtx>,
523 ) -> Result<HttpResponse, HttpClientError> {
524 self.send_http_request(req, None, ctx)
525 }
526}
527
528impl HttpClientHostV1_1 for HostState {
529 fn send(
530 &mut self,
531 req: HttpRequestV1_1,
532 opts: Option<HttpRequestOptionsV1_1>,
533 ctx: Option<HttpTenantCtxV1_1>,
534 ) -> Result<HttpResponseV1_1, HttpClientErrorV1_1> {
535 let legacy_req = HttpRequest {
536 method: req.method,
537 url: req.url,
538 headers: req.headers,
539 body: req.body,
540 };
541 let legacy_ctx = ctx.map(|ctx| HttpTenantCtx {
542 env: ctx.env,
543 tenant: ctx.tenant,
544 tenant_id: ctx.tenant_id,
545 team: ctx.team,
546 team_id: ctx.team_id,
547 user: ctx.user,
548 user_id: ctx.user_id,
549 trace_id: ctx.trace_id,
550 correlation_id: ctx.correlation_id,
551 attributes: ctx.attributes,
552 session_id: ctx.session_id,
553 flow_id: ctx.flow_id,
554 node_id: ctx.node_id,
555 provider_id: ctx.provider_id,
556 deadline_ms: ctx.deadline_ms,
557 attempt: ctx.attempt,
558 idempotency_key: ctx.idempotency_key,
559 impersonation: ctx
560 .impersonation
561 .map(|ImpersonationV1_1 { actor_id, reason }| ImpersonationV1_0 {
562 actor_id,
563 reason,
564 }),
565 });
566
567 self.send_http_request(legacy_req, opts, legacy_ctx)
568 .map(|resp| HttpResponseV1_1 {
569 status: resp.status,
570 headers: resp.headers,
571 body: resp.body,
572 })
573 .map_err(|err| HttpClientErrorV1_1 {
574 code: err.code,
575 message: err.message,
576 })
577 }
578}
579
580impl StateStoreHost for HostState {
581 fn read(
582 &mut self,
583 key: HostStateKey,
584 ctx: Option<StateTenantCtx>,
585 ) -> Result<Vec<u8>, StateError> {
586 let store = match self.state_store.as_ref() {
587 Some(store) => store.clone(),
588 None => {
589 return Err(StateError {
590 code: "unavailable".into(),
591 message: "state store not configured".into(),
592 });
593 }
594 };
595 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
596 Ok(ctx) => ctx,
597 Err(err) => {
598 return Err(StateError {
599 code: "invalid-ctx".into(),
600 message: err.to_string(),
601 });
602 }
603 };
604 #[cfg(feature = "fault-injection")]
605 {
606 let exec_ctx = self.exec_ctx.as_ref();
607 let flow_id = exec_ctx
608 .map(|ctx| ctx.flow_id.as_str())
609 .unwrap_or("unknown");
610 let node_id = exec_ctx.and_then(|ctx| ctx.node_id.as_deref());
611 let attempt = exec_ctx.map(|ctx| ctx.tenant.attempt).unwrap_or(1);
612 let fault_ctx = FaultContext {
613 pack_id: self.pack_id.as_str(),
614 flow_id,
615 node_id,
616 attempt,
617 };
618 if let Err(err) = maybe_fail(FaultPoint::StateRead, fault_ctx) {
619 return Err(StateError {
620 code: "internal".into(),
621 message: err.to_string(),
622 });
623 }
624 }
625 let key = StoreStateKey::from(key);
626 match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
627 Ok(Some(value)) => Ok(serde_json::to_vec(&value).unwrap_or_else(|_| Vec::new())),
628 Ok(None) => Err(StateError {
629 code: "not_found".into(),
630 message: "state key not found".into(),
631 }),
632 Err(err) => Err(StateError {
633 code: "internal".into(),
634 message: err.to_string(),
635 }),
636 }
637 }
638
639 fn write(
640 &mut self,
641 key: HostStateKey,
642 bytes: Vec<u8>,
643 ctx: Option<StateTenantCtx>,
644 ) -> Result<StateOpAck, StateError> {
645 let store = match self.state_store.as_ref() {
646 Some(store) => store.clone(),
647 None => {
648 return Err(StateError {
649 code: "unavailable".into(),
650 message: "state store not configured".into(),
651 });
652 }
653 };
654 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
655 Ok(ctx) => ctx,
656 Err(err) => {
657 return Err(StateError {
658 code: "invalid-ctx".into(),
659 message: err.to_string(),
660 });
661 }
662 };
663 #[cfg(feature = "fault-injection")]
664 {
665 let exec_ctx = self.exec_ctx.as_ref();
666 let flow_id = exec_ctx
667 .map(|ctx| ctx.flow_id.as_str())
668 .unwrap_or("unknown");
669 let node_id = exec_ctx.and_then(|ctx| ctx.node_id.as_deref());
670 let attempt = exec_ctx.map(|ctx| ctx.tenant.attempt).unwrap_or(1);
671 let fault_ctx = FaultContext {
672 pack_id: self.pack_id.as_str(),
673 flow_id,
674 node_id,
675 attempt,
676 };
677 if let Err(err) = maybe_fail(FaultPoint::StateWrite, fault_ctx) {
678 return Err(StateError {
679 code: "internal".into(),
680 message: err.to_string(),
681 });
682 }
683 }
684 let key = StoreStateKey::from(key);
685 let value = serde_json::from_slice(&bytes)
686 .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()));
687 match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
688 Ok(()) => Ok(StateOpAck::Ok),
689 Err(err) => Err(StateError {
690 code: "internal".into(),
691 message: err.to_string(),
692 }),
693 }
694 }
695
696 fn delete(
697 &mut self,
698 key: HostStateKey,
699 ctx: Option<StateTenantCtx>,
700 ) -> Result<StateOpAck, StateError> {
701 let store = match self.state_store.as_ref() {
702 Some(store) => store.clone(),
703 None => {
704 return Err(StateError {
705 code: "unavailable".into(),
706 message: "state store not configured".into(),
707 });
708 }
709 };
710 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
711 Ok(ctx) => ctx,
712 Err(err) => {
713 return Err(StateError {
714 code: "invalid-ctx".into(),
715 message: err.to_string(),
716 });
717 }
718 };
719 let key = StoreStateKey::from(key);
720 match store.del(&tenant_ctx, STATE_PREFIX, &key) {
721 Ok(_) => Ok(StateOpAck::Ok),
722 Err(err) => Err(StateError {
723 code: "internal".into(),
724 message: err.to_string(),
725 }),
726 }
727 }
728}
729
730impl TelemetryLoggerHost for HostState {
731 fn log(
732 &mut self,
733 span: TelemetrySpanContext,
734 fields: Vec<(String, String)>,
735 _ctx: Option<TelemetryTenantCtx>,
736 ) -> Result<TelemetryAck, TelemetryError> {
737 if let Some(mock) = &self.mocks
738 && mock.telemetry_drain(&[("span_json", span.flow_id.as_str())])
739 {
740 return Ok(TelemetryAck::Ok);
741 }
742 let mut map = serde_json::Map::new();
743 for (k, v) in fields {
744 map.insert(k, Value::String(v));
745 }
746 tracing::info!(
747 tenant = %span.tenant,
748 flow_id = %span.flow_id,
749 node = ?span.node_id,
750 provider = %span.provider,
751 fields = %serde_json::Value::Object(map.clone()),
752 "telemetry log from pack"
753 );
754 Ok(TelemetryAck::Ok)
755 }
756}
757
758impl RunnerHostHttp for HostState {
759 fn request(
760 &mut self,
761 method: String,
762 url: String,
763 headers: Vec<String>,
764 body: Option<Vec<u8>>,
765 ) -> Result<Vec<u8>, String> {
766 let req = HttpRequest {
767 method,
768 url,
769 headers: headers
770 .chunks(2)
771 .filter_map(|chunk| {
772 if chunk.len() == 2 {
773 Some((chunk[0].clone(), chunk[1].clone()))
774 } else {
775 None
776 }
777 })
778 .collect(),
779 body,
780 };
781 match HttpClientHost::send(self, req, None) {
782 Ok(resp) => Ok(resp.body.unwrap_or_default()),
783 Err(err) => Err(err.message),
784 }
785 }
786}
787
788impl RunnerHostKv for HostState {
789 fn get(&mut self, _ns: String, _key: String) -> Option<String> {
790 None
791 }
792
793 fn put(&mut self, _ns: String, _key: String, _val: String) {}
794}
795
796enum ManifestLoad {
797 New {
798 manifest: Box<greentic_types::PackManifest>,
799 flows: PackFlows,
800 },
801 Legacy {
802 manifest: Box<legacy_pack::PackManifest>,
803 flows: PackFlows,
804 },
805}
806
807fn load_manifest_and_flows(path: &Path) -> Result<ManifestLoad> {
808 let mut archive = ZipArchive::new(File::open(path)?)
809 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
810 let bytes = read_entry(&mut archive, "manifest.cbor")
811 .with_context(|| format!("missing manifest.cbor in {}", path.display()))?;
812 match decode_pack_manifest(&bytes) {
813 Ok(manifest) => {
814 let cache = PackFlows::from_manifest(manifest.clone());
815 Ok(ManifestLoad::New {
816 manifest: Box::new(manifest),
817 flows: cache,
818 })
819 }
820 Err(err) => {
821 tracing::debug!(error = %err, pack = %path.display(), "decode_pack_manifest failed; trying legacy manifest");
822 let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
824 .context("failed to decode legacy pack manifest from manifest.cbor")?;
825 let flows = load_legacy_flows(&mut archive, &legacy)?;
826 Ok(ManifestLoad::Legacy {
827 manifest: Box::new(legacy),
828 flows,
829 })
830 }
831 }
832}
833
834fn load_manifest_and_flows_from_dir(root: &Path) -> Result<ManifestLoad> {
835 let manifest_path = root.join("manifest.cbor");
836 let bytes = std::fs::read(&manifest_path)
837 .with_context(|| format!("missing manifest.cbor in {}", root.display()))?;
838 match decode_pack_manifest(&bytes) {
839 Ok(manifest) => {
840 let cache = PackFlows::from_manifest(manifest.clone());
841 Ok(ManifestLoad::New {
842 manifest: Box::new(manifest),
843 flows: cache,
844 })
845 }
846 Err(err) => {
847 tracing::debug!(
848 error = %err,
849 pack = %root.display(),
850 "decode_pack_manifest failed for materialized pack; trying legacy manifest"
851 );
852 let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
853 .context("failed to decode legacy pack manifest from manifest.cbor")?;
854 let flows = load_legacy_flows_from_dir(root, &legacy)?;
855 Ok(ManifestLoad::Legacy {
856 manifest: Box::new(legacy),
857 flows,
858 })
859 }
860 }
861}
862
863fn load_legacy_flows(
864 archive: &mut ZipArchive<File>,
865 manifest: &legacy_pack::PackManifest,
866) -> Result<PackFlows> {
867 let mut flows = HashMap::new();
868 let mut descriptors = Vec::new();
869
870 for entry in &manifest.flows {
871 let bytes = read_entry(archive, &entry.file_json)
872 .with_context(|| format!("missing flow json {}", entry.file_json))?;
873 let doc: FlowDoc = serde_json::from_slice(&bytes)
874 .with_context(|| format!("failed to decode flow doc {}", entry.file_json))?;
875 let normalized = normalize_flow_doc(doc);
876 let flow_ir = flow_doc_to_ir(normalized)?;
877 let flow = flow_ir_to_flow(flow_ir)?;
878
879 descriptors.push(FlowDescriptor {
880 id: entry.id.clone(),
881 flow_type: entry.kind.clone(),
882 pack_id: manifest.meta.pack_id.clone(),
883 profile: manifest.meta.pack_id.clone(),
884 version: manifest.meta.version.to_string(),
885 description: None,
886 });
887 flows.insert(entry.id.clone(), flow);
888 }
889
890 let mut entry_flows = manifest.meta.entry_flows.clone();
891 if entry_flows.is_empty() {
892 entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
893 }
894 let metadata = PackMetadata {
895 pack_id: manifest.meta.pack_id.clone(),
896 version: manifest.meta.version.to_string(),
897 entry_flows,
898 secret_requirements: Vec::new(),
899 };
900
901 Ok(PackFlows {
902 descriptors,
903 flows,
904 metadata,
905 })
906}
907
908fn load_legacy_flows_from_dir(
909 root: &Path,
910 manifest: &legacy_pack::PackManifest,
911) -> Result<PackFlows> {
912 let mut flows = HashMap::new();
913 let mut descriptors = Vec::new();
914
915 for entry in &manifest.flows {
916 let path = root.join(&entry.file_json);
917 let bytes = std::fs::read(&path)
918 .with_context(|| format!("missing flow json {}", path.display()))?;
919 let doc: FlowDoc = serde_json::from_slice(&bytes)
920 .with_context(|| format!("failed to decode flow doc {}", path.display()))?;
921 let normalized = normalize_flow_doc(doc);
922 let flow_ir = flow_doc_to_ir(normalized)?;
923 let flow = flow_ir_to_flow(flow_ir)?;
924
925 descriptors.push(FlowDescriptor {
926 id: entry.id.clone(),
927 flow_type: entry.kind.clone(),
928 pack_id: manifest.meta.pack_id.clone(),
929 profile: manifest.meta.pack_id.clone(),
930 version: manifest.meta.version.to_string(),
931 description: None,
932 });
933 flows.insert(entry.id.clone(), flow);
934 }
935
936 let mut entry_flows = manifest.meta.entry_flows.clone();
937 if entry_flows.is_empty() {
938 entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
939 }
940 let metadata = PackMetadata {
941 pack_id: manifest.meta.pack_id.clone(),
942 version: manifest.meta.version.to_string(),
943 entry_flows,
944 secret_requirements: Vec::new(),
945 };
946
947 Ok(PackFlows {
948 descriptors,
949 flows,
950 metadata,
951 })
952}
953
954pub struct ComponentState {
955 pub host: HostState,
956 wasi_ctx: WasiCtx,
957 resource_table: ResourceTable,
958}
959
960impl ComponentState {
961 pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
962 let wasi_ctx = policy
963 .instantiate()
964 .context("failed to build WASI context")?;
965 Ok(Self {
966 host,
967 wasi_ctx,
968 resource_table: ResourceTable::new(),
969 })
970 }
971
972 fn host_mut(&mut self) -> &mut HostState {
973 &mut self.host
974 }
975
976 fn should_cancel_host(&mut self) -> bool {
977 false
978 }
979
980 fn yield_now_host(&mut self) {
981 }
983}
984
985impl component_api::v0_4::greentic::component::control::Host for ComponentState {
986 fn should_cancel(&mut self) -> bool {
987 self.should_cancel_host()
988 }
989
990 fn yield_now(&mut self) {
991 self.yield_now_host();
992 }
993}
994
995impl component_api::v0_5::greentic::component::control::Host for ComponentState {
996 fn should_cancel(&mut self) -> bool {
997 self.should_cancel_host()
998 }
999
1000 fn yield_now(&mut self) {
1001 self.yield_now_host();
1002 }
1003}
1004
1005fn add_component_control_instance(
1006 linker: &mut Linker<ComponentState>,
1007 name: &str,
1008) -> wasmtime::Result<()> {
1009 let mut inst = linker.instance(name)?;
1010 inst.func_wrap(
1011 "should-cancel",
1012 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
1013 let host = caller.data_mut();
1014 Ok((host.should_cancel_host(),))
1015 },
1016 )?;
1017 inst.func_wrap(
1018 "yield-now",
1019 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
1020 let host = caller.data_mut();
1021 host.yield_now_host();
1022 Ok(())
1023 },
1024 )?;
1025 Ok(())
1026}
1027
1028fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
1029 add_component_control_instance(linker, "greentic:component/control@0.5.0")?;
1030 add_component_control_instance(linker, "greentic:component/control@0.4.0")?;
1031 Ok(())
1032}
1033
1034pub fn register_all(linker: &mut Linker<ComponentState>, allow_state_store: bool) -> Result<()> {
1035 add_wasi_to_linker(linker)?;
1036 add_all_v1_to_linker(
1037 linker,
1038 HostFns {
1039 http_client_v1_1: Some(|state: &mut ComponentState| state.host_mut()),
1040 http_client: Some(|state: &mut ComponentState| state.host_mut()),
1041 oauth_broker: None,
1042 runner_host_http: Some(|state: &mut ComponentState| state.host_mut()),
1043 runner_host_kv: Some(|state: &mut ComponentState| state.host_mut()),
1044 telemetry_logger: Some(|state: &mut ComponentState| state.host_mut()),
1045 state_store: allow_state_store.then_some(|state: &mut ComponentState| state.host_mut()),
1046 secrets_store_v1_1: Some(|state: &mut ComponentState| state.host_mut()),
1047 secrets_store: None,
1048 },
1049 )?;
1050 Ok(())
1051}
1052
1053impl OAuthHostContext for ComponentState {
1054 fn tenant_id(&self) -> &str {
1055 &self.host.config.tenant
1056 }
1057
1058 fn env(&self) -> &str {
1059 &self.host.default_env
1060 }
1061
1062 fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
1063 &mut self.host.oauth_host
1064 }
1065
1066 fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
1067 self.host.oauth_config.as_ref()
1068 }
1069}
1070
1071impl WasiView for ComponentState {
1072 fn ctx(&mut self) -> WasiCtxView<'_> {
1073 WasiCtxView {
1074 ctx: &mut self.wasi_ctx,
1075 table: &mut self.resource_table,
1076 }
1077 }
1078}
1079
1080#[allow(unsafe_code)]
1081unsafe impl Send for ComponentState {}
1082#[allow(unsafe_code)]
1083unsafe impl Sync for ComponentState {}
1084
1085impl PackRuntime {
1086 fn allows_state_store(&self, component_ref: &str) -> bool {
1087 if self.state_store.is_none() {
1088 return false;
1089 }
1090 if !self.config.state_store_policy.allow {
1091 return false;
1092 }
1093 let Some(manifest) = self.component_manifests.get(component_ref) else {
1094 return false;
1095 };
1096 manifest
1097 .capabilities
1098 .host
1099 .state
1100 .as_ref()
1101 .map(|caps| caps.read || caps.write)
1102 .unwrap_or(false)
1103 }
1104
1105 #[allow(clippy::too_many_arguments)]
1106 pub async fn load(
1107 path: impl AsRef<Path>,
1108 config: Arc<HostConfig>,
1109 mocks: Option<Arc<MockLayer>>,
1110 archive_source: Option<&Path>,
1111 session_store: Option<DynSessionStore>,
1112 state_store: Option<DynStateStore>,
1113 wasi_policy: Arc<RunnerWasiPolicy>,
1114 secrets: DynSecretsManager,
1115 oauth_config: Option<OAuthBrokerConfig>,
1116 verify_archive: bool,
1117 component_resolution: ComponentResolution,
1118 ) -> Result<Self> {
1119 let path = path.as_ref();
1120 let (_pack_root, safe_path) = normalize_pack_path(path)?;
1121 let path_meta = std::fs::metadata(&safe_path).ok();
1122 let is_dir = path_meta
1123 .as_ref()
1124 .map(|meta| meta.is_dir())
1125 .unwrap_or(false);
1126 let is_component = !is_dir
1127 && safe_path
1128 .extension()
1129 .and_then(|ext| ext.to_str())
1130 .map(|ext| ext.eq_ignore_ascii_case("wasm"))
1131 .unwrap_or(false);
1132 let archive_hint_path = if let Some(source) = archive_source {
1133 let (_, normalized) = normalize_pack_path(source)?;
1134 Some(normalized)
1135 } else if is_component || is_dir {
1136 None
1137 } else {
1138 Some(safe_path.clone())
1139 };
1140 let archive_hint = archive_hint_path.as_deref();
1141 if verify_archive {
1142 if let Some(verify_target) = archive_hint.and_then(|p| {
1143 std::fs::metadata(p)
1144 .ok()
1145 .filter(|meta| meta.is_file())
1146 .map(|_| p)
1147 }) {
1148 verify::verify_pack(verify_target).await?;
1149 tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
1150 } else {
1151 tracing::debug!("skipping archive verification (no archive source)");
1152 }
1153 }
1154 let engine = Engine::default();
1155 let engine_profile =
1156 EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
1157 let cache = CacheManager::new(CacheConfig::default(), engine_profile);
1158 let mut metadata = PackMetadata::fallback(&safe_path);
1159 let mut manifest = None;
1160 let mut legacy_manifest: Option<Box<legacy_pack::PackManifest>> = None;
1161 let mut flows = None;
1162 let materialized_root = component_resolution.materialized_root.clone().or_else(|| {
1163 if is_dir {
1164 Some(safe_path.clone())
1165 } else {
1166 None
1167 }
1168 });
1169
1170 if let Some(root) = materialized_root.as_ref() {
1171 match load_manifest_and_flows_from_dir(root) {
1172 Ok(ManifestLoad::New {
1173 manifest: m,
1174 flows: cache,
1175 }) => {
1176 metadata = cache.metadata.clone();
1177 manifest = Some(*m);
1178 flows = Some(cache);
1179 }
1180 Ok(ManifestLoad::Legacy {
1181 manifest: m,
1182 flows: cache,
1183 }) => {
1184 metadata = cache.metadata.clone();
1185 legacy_manifest = Some(m);
1186 flows = Some(cache);
1187 }
1188 Err(err) => {
1189 warn!(error = %err, pack = %root.display(), "failed to parse materialized pack manifest");
1190 }
1191 }
1192 }
1193
1194 if manifest.is_none()
1195 && legacy_manifest.is_none()
1196 && let Some(archive_path) = archive_hint
1197 {
1198 match load_manifest_and_flows(archive_path) {
1199 Ok(ManifestLoad::New {
1200 manifest: m,
1201 flows: cache,
1202 }) => {
1203 metadata = cache.metadata.clone();
1204 manifest = Some(*m);
1205 flows = Some(cache);
1206 }
1207 Ok(ManifestLoad::Legacy {
1208 manifest: m,
1209 flows: cache,
1210 }) => {
1211 metadata = cache.metadata.clone();
1212 legacy_manifest = Some(m);
1213 flows = Some(cache);
1214 }
1215 Err(err) => {
1216 warn!(error = %err, pack = %archive_path.display(), "failed to parse pack manifest; skipping flows");
1217 }
1218 }
1219 }
1220 #[cfg(feature = "fault-injection")]
1221 {
1222 let fault_ctx = FaultContext {
1223 pack_id: metadata.pack_id.as_str(),
1224 flow_id: "unknown",
1225 node_id: None,
1226 attempt: 1,
1227 };
1228 maybe_fail(FaultPoint::PackResolve, fault_ctx)
1229 .map_err(|err| anyhow!(err.to_string()))?;
1230 }
1231 let mut pack_lock = None;
1232 for root in find_pack_lock_roots(&safe_path, is_dir, archive_hint) {
1233 pack_lock = load_pack_lock(&root)?;
1234 if pack_lock.is_some() {
1235 break;
1236 }
1237 }
1238 let component_sources_payload = if pack_lock.is_none() {
1239 if let Some(manifest) = manifest.as_ref() {
1240 manifest
1241 .get_component_sources_v1()
1242 .context("invalid component sources extension")?
1243 } else {
1244 None
1245 }
1246 } else {
1247 None
1248 };
1249 let component_sources = if let Some(lock) = pack_lock.as_ref() {
1250 Some(component_sources_table_from_pack_lock(
1251 lock,
1252 component_resolution.allow_missing_hash,
1253 )?)
1254 } else {
1255 component_sources_table(component_sources_payload.as_ref())?
1256 };
1257 let components = if is_component {
1258 let wasm_bytes = fs::read(&safe_path).await?;
1259 metadata = PackMetadata::from_wasm(&wasm_bytes)
1260 .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
1261 let name = safe_path
1262 .file_stem()
1263 .map(|s| s.to_string_lossy().to_string())
1264 .unwrap_or_else(|| "component".to_string());
1265 let component = compile_component_with_cache(&cache, &engine, None, wasm_bytes).await?;
1266 let mut map = HashMap::new();
1267 map.insert(
1268 name.clone(),
1269 PackComponent {
1270 name,
1271 version: metadata.version.clone(),
1272 component,
1273 },
1274 );
1275 map
1276 } else {
1277 let specs = component_specs(
1278 manifest.as_ref(),
1279 legacy_manifest.as_deref(),
1280 component_sources_payload.as_ref(),
1281 pack_lock.as_ref(),
1282 );
1283 if specs.is_empty() {
1284 HashMap::new()
1285 } else {
1286 let mut loaded = HashMap::new();
1287 let mut missing: HashSet<String> =
1288 specs.iter().map(|spec| spec.id.clone()).collect();
1289 let mut searched = Vec::new();
1290
1291 if !component_resolution.overrides.is_empty() {
1292 load_components_from_overrides(
1293 &cache,
1294 &engine,
1295 &component_resolution.overrides,
1296 &specs,
1297 &mut missing,
1298 &mut loaded,
1299 )
1300 .await?;
1301 searched.push("override map".to_string());
1302 }
1303
1304 if let Some(component_sources) = component_sources.as_ref() {
1305 load_components_from_sources(
1306 &cache,
1307 &engine,
1308 component_sources,
1309 &component_resolution,
1310 &specs,
1311 &mut missing,
1312 &mut loaded,
1313 materialized_root.as_deref(),
1314 archive_hint,
1315 )
1316 .await?;
1317 searched.push(format!("extension {}", EXT_COMPONENT_SOURCES_V1));
1318 }
1319
1320 if let Some(root) = materialized_root.as_ref() {
1321 load_components_from_dir(
1322 &cache,
1323 &engine,
1324 root,
1325 &specs,
1326 &mut missing,
1327 &mut loaded,
1328 )
1329 .await?;
1330 searched.push(format!("components dir {}", root.display()));
1331 }
1332
1333 if let Some(archive_path) = archive_hint {
1334 load_components_from_archive(
1335 &cache,
1336 &engine,
1337 archive_path,
1338 &specs,
1339 &mut missing,
1340 &mut loaded,
1341 )
1342 .await?;
1343 searched.push(format!("archive {}", archive_path.display()));
1344 }
1345
1346 if !missing.is_empty() {
1347 let missing_list = missing.into_iter().collect::<Vec<_>>().join(", ");
1348 let sources = if searched.is_empty() {
1349 "no component sources".to_string()
1350 } else {
1351 searched.join(", ")
1352 };
1353 bail!(
1354 "components missing: {}; looked in {}",
1355 missing_list,
1356 sources
1357 );
1358 }
1359
1360 loaded
1361 }
1362 };
1363 let http_client = Arc::clone(&HTTP_CLIENT);
1364 let mut component_manifests = HashMap::new();
1365 if let Some(manifest) = manifest.as_ref() {
1366 for component in &manifest.components {
1367 component_manifests.insert(component.id.as_str().to_string(), component.clone());
1368 }
1369 }
1370 Ok(Self {
1371 path: safe_path,
1372 archive_path: archive_hint.map(Path::to_path_buf),
1373 config,
1374 engine,
1375 metadata,
1376 manifest,
1377 legacy_manifest,
1378 component_manifests,
1379 mocks,
1380 flows,
1381 components,
1382 http_client,
1383 pre_cache: Mutex::new(HashMap::new()),
1384 session_store,
1385 state_store,
1386 wasi_policy,
1387 provider_registry: RwLock::new(None),
1388 secrets,
1389 oauth_config,
1390 cache,
1391 })
1392 }
1393
1394 pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
1395 if let Some(cache) = &self.flows {
1396 return Ok(cache.descriptors.clone());
1397 }
1398 if let Some(manifest) = &self.manifest {
1399 let descriptors = manifest
1400 .flows
1401 .iter()
1402 .map(|flow| FlowDescriptor {
1403 id: flow.id.as_str().to_string(),
1404 flow_type: flow_kind_to_str(flow.kind).to_string(),
1405 pack_id: manifest.pack_id.as_str().to_string(),
1406 profile: manifest.pack_id.as_str().to_string(),
1407 version: manifest.version.to_string(),
1408 description: None,
1409 })
1410 .collect();
1411 return Ok(descriptors);
1412 }
1413 Ok(Vec::new())
1414 }
1415
1416 #[allow(dead_code)]
1417 pub async fn run_flow(
1418 &self,
1419 flow_id: &str,
1420 input: serde_json::Value,
1421 ) -> Result<serde_json::Value> {
1422 let pack = Arc::new(
1423 PackRuntime::load(
1424 &self.path,
1425 Arc::clone(&self.config),
1426 self.mocks.clone(),
1427 self.archive_path.as_deref(),
1428 self.session_store.clone(),
1429 self.state_store.clone(),
1430 Arc::clone(&self.wasi_policy),
1431 self.secrets.clone(),
1432 self.oauth_config.clone(),
1433 false,
1434 ComponentResolution::default(),
1435 )
1436 .await?,
1437 );
1438
1439 let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&self.config)).await?;
1440 let retry_config = self.config.retry_config().into();
1441 let mocks = pack.mocks.as_deref();
1442 let tenant = self.config.tenant.as_str();
1443
1444 let ctx = FlowContext {
1445 tenant,
1446 pack_id: pack.metadata().pack_id.as_str(),
1447 flow_id,
1448 node_id: None,
1449 tool: None,
1450 action: None,
1451 session_id: None,
1452 provider_id: None,
1453 retry_config,
1454 attempt: 1,
1455 observer: None,
1456 mocks,
1457 };
1458
1459 let execution = engine.execute(ctx, input).await?;
1460 match execution.status {
1461 FlowStatus::Completed => Ok(execution.output),
1462 FlowStatus::Waiting(wait) => Ok(serde_json::json!({
1463 "status": "pending",
1464 "reason": wait.reason,
1465 "resume": wait.snapshot,
1466 "response": execution.output,
1467 })),
1468 }
1469 }
1470
1471 pub async fn invoke_component(
1472 &self,
1473 component_ref: &str,
1474 ctx: ComponentExecCtx,
1475 operation: &str,
1476 _config_json: Option<String>,
1477 input_json: String,
1478 ) -> Result<Value> {
1479 let pack_component = self
1480 .components
1481 .get(component_ref)
1482 .with_context(|| format!("component '{component_ref}' not found in pack"))?;
1483
1484 let mut linker = Linker::new(&self.engine);
1485 let allow_state_store = self.allows_state_store(component_ref);
1486 register_all(&mut linker, allow_state_store)?;
1487 add_component_control_to_linker(&mut linker)?;
1488
1489 let host_state = HostState::new(
1490 self.metadata().pack_id.clone(),
1491 Arc::clone(&self.config),
1492 Arc::clone(&self.http_client),
1493 self.mocks.clone(),
1494 self.session_store.clone(),
1495 self.state_store.clone(),
1496 Arc::clone(&self.secrets),
1497 self.oauth_config.clone(),
1498 Some(ctx.clone()),
1499 Some(component_ref.to_string()),
1500 false,
1501 )?;
1502 let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1503 let mut store = wasmtime::Store::new(&self.engine, store_state);
1504 let pre_instance = linker.instantiate_pre(pack_component.component.as_ref())?;
1505 let result = match component_api::v0_5::ComponentPre::new(pre_instance) {
1506 Ok(pre) => {
1507 let bindings = pre.instantiate_async(&mut store).await?;
1508 let node = bindings.greentic_component_node();
1509 let ctx_v05 = component_api::exec_ctx_v0_5(&ctx);
1510 let result = node.call_invoke(&mut store, &ctx_v05, operation, &input_json)?;
1511 component_api::invoke_result_from_v0_5(result)
1512 }
1513 Err(err) => {
1514 if is_missing_node_export(&err, "0.5.0") {
1515 let pre_instance = linker.instantiate_pre(pack_component.component.as_ref())?;
1516 let pre: component_api::v0_4::ComponentPre<ComponentState> =
1517 component_api::v0_4::ComponentPre::new(pre_instance)?;
1518 let bindings = pre.instantiate_async(&mut store).await?;
1519 let node = bindings.greentic_component_node();
1520 let ctx_v04 = component_api::exec_ctx_v0_4(&ctx);
1521 let result = node.call_invoke(&mut store, &ctx_v04, operation, &input_json)?;
1522 component_api::invoke_result_from_v0_4(result)
1523 } else {
1524 return Err(err);
1525 }
1526 }
1527 };
1528
1529 match result {
1530 InvokeResult::Ok(body) => {
1531 if body.is_empty() {
1532 return Ok(Value::Null);
1533 }
1534 serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
1535 }
1536 InvokeResult::Err(NodeError {
1537 code,
1538 message,
1539 retryable,
1540 backoff_ms,
1541 details,
1542 }) => {
1543 let mut obj = serde_json::Map::new();
1544 obj.insert("ok".into(), Value::Bool(false));
1545 let mut error = serde_json::Map::new();
1546 error.insert("code".into(), Value::String(code));
1547 error.insert("message".into(), Value::String(message));
1548 error.insert("retryable".into(), Value::Bool(retryable));
1549 if let Some(backoff) = backoff_ms {
1550 error.insert("backoff_ms".into(), Value::Number(backoff.into()));
1551 }
1552 if let Some(details) = details {
1553 error.insert(
1554 "details".into(),
1555 serde_json::from_str(&details).unwrap_or(Value::String(details)),
1556 );
1557 }
1558 obj.insert("error".into(), Value::Object(error));
1559 Ok(Value::Object(obj))
1560 }
1561 }
1562 }
1563
1564 pub fn resolve_provider(
1565 &self,
1566 provider_id: Option<&str>,
1567 provider_type: Option<&str>,
1568 ) -> Result<ProviderBinding> {
1569 let registry = self.provider_registry()?;
1570 registry.resolve(provider_id, provider_type)
1571 }
1572
1573 pub async fn invoke_provider(
1574 &self,
1575 binding: &ProviderBinding,
1576 ctx: ComponentExecCtx,
1577 op: &str,
1578 input_json: Vec<u8>,
1579 ) -> Result<Value> {
1580 let component_ref = &binding.component_ref;
1581 let pack_component = self
1582 .components
1583 .get(component_ref)
1584 .with_context(|| format!("provider component '{component_ref}' not found in pack"))?;
1585
1586 let mut linker = Linker::new(&self.engine);
1587 let allow_state_store = self.allows_state_store(component_ref);
1588 register_all(&mut linker, allow_state_store)?;
1589 add_component_control_to_linker(&mut linker)?;
1590 let pre_instance = linker.instantiate_pre(pack_component.component.as_ref())?;
1591 let pre: ProviderComponentPre<ComponentState> = ProviderComponentPre::new(pre_instance)?;
1592
1593 let host_state = HostState::new(
1594 self.metadata().pack_id.clone(),
1595 Arc::clone(&self.config),
1596 Arc::clone(&self.http_client),
1597 self.mocks.clone(),
1598 self.session_store.clone(),
1599 self.state_store.clone(),
1600 Arc::clone(&self.secrets),
1601 self.oauth_config.clone(),
1602 Some(ctx),
1603 Some(component_ref.to_string()),
1604 true,
1605 )?;
1606 let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1607 let mut store = wasmtime::Store::new(&self.engine, store_state);
1608 let bindings: crate::provider_core::SchemaCore = pre.instantiate_async(&mut store).await?;
1609 let provider = bindings.greentic_provider_core_schema_core_api();
1610
1611 let result = provider.call_invoke(&mut store, op, &input_json)?;
1612 deserialize_json_bytes(result)
1613 }
1614
1615 fn provider_registry(&self) -> Result<ProviderRegistry> {
1616 if let Some(registry) = self.provider_registry.read().clone() {
1617 return Ok(registry);
1618 }
1619 let manifest = self
1620 .manifest
1621 .as_ref()
1622 .context("pack manifest required for provider resolution")?;
1623 let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
1624 let registry = ProviderRegistry::new(
1625 manifest,
1626 self.state_store.clone(),
1627 &self.config.tenant,
1628 &env,
1629 )?;
1630 *self.provider_registry.write() = Some(registry.clone());
1631 Ok(registry)
1632 }
1633
1634 pub fn load_flow(&self, flow_id: &str) -> Result<Flow> {
1635 if let Some(cache) = &self.flows {
1636 return cache
1637 .flows
1638 .get(flow_id)
1639 .cloned()
1640 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
1641 }
1642 if let Some(manifest) = &self.manifest {
1643 let entry = manifest
1644 .flows
1645 .iter()
1646 .find(|f| f.id.as_str() == flow_id)
1647 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
1648 return Ok(entry.flow.clone());
1649 }
1650 bail!("flow '{flow_id}' not available (pack exports disabled)")
1651 }
1652
1653 pub fn metadata(&self) -> &PackMetadata {
1654 &self.metadata
1655 }
1656
1657 pub fn required_secrets(&self) -> &[greentic_types::SecretRequirement] {
1658 &self.metadata.secret_requirements
1659 }
1660
1661 pub fn missing_secrets(
1662 &self,
1663 tenant_ctx: &TypesTenantCtx,
1664 ) -> Vec<greentic_types::SecretRequirement> {
1665 let env = tenant_ctx.env.as_str().to_string();
1666 let tenant = tenant_ctx.tenant.as_str().to_string();
1667 let team = tenant_ctx.team.as_ref().map(|t| t.as_str().to_string());
1668 self.required_secrets()
1669 .iter()
1670 .filter(|req| {
1671 if let Some(scope) = &req.scope {
1673 if scope.env != env {
1674 return false;
1675 }
1676 if scope.tenant != tenant {
1677 return false;
1678 }
1679 if let Some(ref team_req) = scope.team
1680 && team.as_ref() != Some(team_req)
1681 {
1682 return false;
1683 }
1684 }
1685 let ctx = self.config.tenant_ctx();
1686 read_secret_blocking(&self.secrets, &ctx, req.key.as_str()).is_err()
1687 })
1688 .cloned()
1689 .collect()
1690 }
1691
1692 pub fn for_component_test(
1693 components: Vec<(String, PathBuf)>,
1694 flows: HashMap<String, FlowIR>,
1695 pack_id: &str,
1696 config: Arc<HostConfig>,
1697 ) -> Result<Self> {
1698 let engine = Engine::default();
1699 let engine_profile =
1700 EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
1701 let cache = CacheManager::new(CacheConfig::default(), engine_profile);
1702 let mut component_map = HashMap::new();
1703 for (name, path) in components {
1704 if !path.exists() {
1705 bail!("component artifact missing: {}", path.display());
1706 }
1707 let wasm_bytes = std::fs::read(&path)?;
1708 let component = Arc::new(
1709 Component::from_binary(&engine, &wasm_bytes)
1710 .with_context(|| format!("failed to compile component {}", path.display()))?,
1711 );
1712 component_map.insert(
1713 name.clone(),
1714 PackComponent {
1715 name,
1716 version: "0.0.0".into(),
1717 component,
1718 },
1719 );
1720 }
1721
1722 let mut flow_map = HashMap::new();
1723 let mut descriptors = Vec::new();
1724 for (id, ir) in flows {
1725 let flow_type = ir.flow_type.clone();
1726 let flow = flow_ir_to_flow(ir)?;
1727 flow_map.insert(id.clone(), flow);
1728 descriptors.push(FlowDescriptor {
1729 id: id.clone(),
1730 flow_type,
1731 pack_id: pack_id.to_string(),
1732 profile: "test".into(),
1733 version: "0.0.0".into(),
1734 description: None,
1735 });
1736 }
1737 let entry_flows = descriptors.iter().map(|flow| flow.id.clone()).collect();
1738 let metadata = PackMetadata {
1739 pack_id: pack_id.to_string(),
1740 version: "0.0.0".into(),
1741 entry_flows,
1742 secret_requirements: Vec::new(),
1743 };
1744 let flows_cache = PackFlows {
1745 descriptors: descriptors.clone(),
1746 flows: flow_map,
1747 metadata: metadata.clone(),
1748 };
1749
1750 Ok(Self {
1751 path: PathBuf::new(),
1752 archive_path: None,
1753 config,
1754 engine,
1755 metadata,
1756 manifest: None,
1757 legacy_manifest: None,
1758 component_manifests: HashMap::new(),
1759 mocks: None,
1760 flows: Some(flows_cache),
1761 components: component_map,
1762 http_client: Arc::clone(&HTTP_CLIENT),
1763 pre_cache: Mutex::new(HashMap::new()),
1764 session_store: None,
1765 state_store: None,
1766 wasi_policy: Arc::new(RunnerWasiPolicy::new()),
1767 provider_registry: RwLock::new(None),
1768 secrets: crate::secrets::default_manager()?,
1769 oauth_config: None,
1770 cache,
1771 })
1772 }
1773}
1774
1775fn is_missing_node_export(err: &wasmtime::Error, version: &str) -> bool {
1776 let message = err.to_string();
1777 message.contains("no exported instance named")
1778 && message.contains(&format!("greentic:component/node@{version}"))
1779}
1780
1781struct PackFlows {
1782 descriptors: Vec<FlowDescriptor>,
1783 flows: HashMap<String, Flow>,
1784 metadata: PackMetadata,
1785}
1786
1787const RUNTIME_FLOW_EXTENSION_IDS: [&str; 3] = [
1788 "greentic.pack.runtime_flow",
1789 "greentic.pack.flow_runtime",
1790 "greentic.pack.runtime_flows",
1791];
1792
1793#[derive(Debug, Deserialize)]
1794struct RuntimeFlowBundle {
1795 flows: Vec<RuntimeFlow>,
1796}
1797
1798#[derive(Debug, Deserialize)]
1799struct RuntimeFlow {
1800 id: String,
1801 #[serde(alias = "flow_type")]
1802 kind: FlowKind,
1803 #[serde(default)]
1804 schema_version: Option<String>,
1805 #[serde(default)]
1806 start: Option<String>,
1807 #[serde(default)]
1808 entrypoints: BTreeMap<String, Value>,
1809 nodes: BTreeMap<String, RuntimeNode>,
1810 #[serde(default)]
1811 metadata: Option<FlowMetadata>,
1812}
1813
1814#[derive(Debug, Deserialize)]
1815struct RuntimeNode {
1816 #[serde(alias = "component")]
1817 component_id: String,
1818 #[serde(default, alias = "operation")]
1819 operation_name: Option<String>,
1820 #[serde(default, alias = "payload", alias = "input")]
1821 operation_payload: Value,
1822 #[serde(default)]
1823 routing: Option<Routing>,
1824 #[serde(default)]
1825 telemetry: Option<TelemetryHints>,
1826}
1827
1828fn deserialize_json_bytes(bytes: Vec<u8>) -> Result<Value> {
1829 if bytes.is_empty() {
1830 return Ok(Value::Null);
1831 }
1832 serde_json::from_slice(&bytes).or_else(|_| {
1833 String::from_utf8(bytes)
1834 .map(Value::String)
1835 .map_err(|err| anyhow!(err))
1836 })
1837}
1838
1839impl PackFlows {
1840 fn from_manifest(manifest: greentic_types::PackManifest) -> Self {
1841 if let Some(flows) = flows_from_runtime_extension(&manifest) {
1842 return flows;
1843 }
1844 let descriptors = manifest
1845 .flows
1846 .iter()
1847 .map(|entry| FlowDescriptor {
1848 id: entry.id.as_str().to_string(),
1849 flow_type: flow_kind_to_str(entry.kind).to_string(),
1850 pack_id: manifest.pack_id.as_str().to_string(),
1851 profile: manifest.pack_id.as_str().to_string(),
1852 version: manifest.version.to_string(),
1853 description: None,
1854 })
1855 .collect();
1856 let mut flows = HashMap::new();
1857 for entry in &manifest.flows {
1858 flows.insert(entry.id.as_str().to_string(), entry.flow.clone());
1859 }
1860 Self {
1861 metadata: PackMetadata::from_manifest(&manifest),
1862 descriptors,
1863 flows,
1864 }
1865 }
1866}
1867
1868fn flows_from_runtime_extension(manifest: &greentic_types::PackManifest) -> Option<PackFlows> {
1869 let extensions = manifest.extensions.as_ref()?;
1870 let extension = extensions.iter().find_map(|(key, ext)| {
1871 if RUNTIME_FLOW_EXTENSION_IDS
1872 .iter()
1873 .any(|candidate| candidate == key)
1874 {
1875 Some(ext)
1876 } else {
1877 None
1878 }
1879 })?;
1880 let runtime_flows = match decode_runtime_flow_extension(extension) {
1881 Some(flows) if !flows.is_empty() => flows,
1882 _ => return None,
1883 };
1884
1885 let descriptors = runtime_flows
1886 .iter()
1887 .map(|flow| FlowDescriptor {
1888 id: flow.id.as_str().to_string(),
1889 flow_type: flow_kind_to_str(flow.kind).to_string(),
1890 pack_id: manifest.pack_id.as_str().to_string(),
1891 profile: manifest.pack_id.as_str().to_string(),
1892 version: manifest.version.to_string(),
1893 description: None,
1894 })
1895 .collect::<Vec<_>>();
1896 let flows = runtime_flows
1897 .into_iter()
1898 .map(|flow| (flow.id.as_str().to_string(), flow))
1899 .collect();
1900
1901 Some(PackFlows {
1902 metadata: PackMetadata::from_manifest(manifest),
1903 descriptors,
1904 flows,
1905 })
1906}
1907
1908fn decode_runtime_flow_extension(extension: &ExtensionRef) -> Option<Vec<Flow>> {
1909 let value = match extension.inline.as_ref()? {
1910 ExtensionInline::Other(value) => value.clone(),
1911 _ => return None,
1912 };
1913
1914 if let Ok(bundle) = serde_json::from_value::<RuntimeFlowBundle>(value.clone()) {
1915 return Some(collect_runtime_flows(bundle.flows));
1916 }
1917
1918 if let Ok(flows) = serde_json::from_value::<Vec<RuntimeFlow>>(value.clone()) {
1919 return Some(collect_runtime_flows(flows));
1920 }
1921
1922 if let Ok(flows) = serde_json::from_value::<Vec<Flow>>(value) {
1923 return Some(flows);
1924 }
1925
1926 warn!(
1927 extension = %extension.kind,
1928 version = %extension.version,
1929 "runtime flow extension present but could not be decoded"
1930 );
1931 None
1932}
1933
1934fn collect_runtime_flows(flows: Vec<RuntimeFlow>) -> Vec<Flow> {
1935 flows
1936 .into_iter()
1937 .filter_map(|flow| match runtime_flow_to_flow(flow) {
1938 Ok(flow) => Some(flow),
1939 Err(err) => {
1940 warn!(error = %err, "failed to decode runtime flow");
1941 None
1942 }
1943 })
1944 .collect()
1945}
1946
1947fn runtime_flow_to_flow(runtime: RuntimeFlow) -> Result<Flow> {
1948 let flow_id = FlowId::from_str(&runtime.id)
1949 .with_context(|| format!("invalid flow id `{}`", runtime.id))?;
1950 let mut entrypoints = runtime.entrypoints;
1951 if entrypoints.is_empty()
1952 && let Some(start) = &runtime.start
1953 {
1954 entrypoints.insert("default".into(), Value::String(start.clone()));
1955 }
1956
1957 let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
1958 for (id, node) in runtime.nodes {
1959 let node_id = NodeId::from_str(&id).with_context(|| format!("invalid node id `{id}`"))?;
1960 let component_id = ComponentId::from_str(&node.component_id)
1961 .with_context(|| format!("invalid component id `{}`", node.component_id))?;
1962 let component = FlowComponentRef {
1963 id: component_id,
1964 pack_alias: None,
1965 operation: node.operation_name,
1966 };
1967 let routing = node.routing.unwrap_or(Routing::End);
1968 let telemetry = node.telemetry.unwrap_or_default();
1969 nodes.insert(
1970 node_id.clone(),
1971 Node {
1972 id: node_id,
1973 component,
1974 input: InputMapping {
1975 mapping: node.operation_payload,
1976 },
1977 output: OutputMapping {
1978 mapping: Value::Null,
1979 },
1980 routing,
1981 telemetry,
1982 },
1983 );
1984 }
1985
1986 Ok(Flow {
1987 schema_version: runtime.schema_version.unwrap_or_else(|| "1.0".to_string()),
1988 id: flow_id,
1989 kind: runtime.kind,
1990 entrypoints,
1991 nodes,
1992 metadata: runtime.metadata.unwrap_or_default(),
1993 })
1994}
1995
1996fn flow_kind_to_str(kind: greentic_types::FlowKind) -> &'static str {
1997 match kind {
1998 greentic_types::FlowKind::Messaging => "messaging",
1999 greentic_types::FlowKind::Event => "event",
2000 greentic_types::FlowKind::ComponentConfig => "component-config",
2001 greentic_types::FlowKind::Job => "job",
2002 greentic_types::FlowKind::Http => "http",
2003 }
2004}
2005
2006fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
2007 let mut file = archive
2008 .by_name(name)
2009 .with_context(|| format!("entry {name} missing from archive"))?;
2010 let mut buf = Vec::new();
2011 file.read_to_end(&mut buf)?;
2012 Ok(buf)
2013}
2014
2015fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
2016 for node in doc.nodes.values_mut() {
2017 let Some((component_ref, payload)) = node
2018 .raw
2019 .iter()
2020 .next()
2021 .map(|(key, value)| (key.clone(), value.clone()))
2022 else {
2023 continue;
2024 };
2025 if component_ref.starts_with("emit.") {
2026 node.operation = Some(component_ref);
2027 node.payload = payload;
2028 node.raw.clear();
2029 continue;
2030 }
2031 let (target_component, operation, input, config) =
2032 infer_component_exec(&payload, &component_ref);
2033 let mut payload_obj = serde_json::Map::new();
2034 payload_obj.insert("component".into(), Value::String(target_component));
2036 payload_obj.insert("operation".into(), Value::String(operation));
2037 payload_obj.insert("input".into(), input);
2038 if let Some(cfg) = config {
2039 payload_obj.insert("config".into(), cfg);
2040 }
2041 node.operation = Some("component.exec".to_string());
2042 node.payload = Value::Object(payload_obj);
2043 node.raw.clear();
2044 }
2045 doc
2046}
2047
2048fn infer_component_exec(
2049 payload: &Value,
2050 component_ref: &str,
2051) -> (String, String, Value, Option<Value>) {
2052 let default_op = if component_ref.starts_with("templating.") {
2053 "render"
2054 } else {
2055 "invoke"
2056 }
2057 .to_string();
2058
2059 if let Value::Object(map) = payload {
2060 let op = map
2061 .get("op")
2062 .or_else(|| map.get("operation"))
2063 .and_then(Value::as_str)
2064 .map(|s| s.to_string())
2065 .unwrap_or_else(|| default_op.clone());
2066
2067 let mut input = map.clone();
2068 let config = input.remove("config");
2069 let component = input
2070 .get("component")
2071 .or_else(|| input.get("component_ref"))
2072 .and_then(Value::as_str)
2073 .map(|s| s.to_string())
2074 .unwrap_or_else(|| component_ref.to_string());
2075 input.remove("component");
2076 input.remove("component_ref");
2077 input.remove("op");
2078 input.remove("operation");
2079 return (component, op, Value::Object(input), config);
2080 }
2081
2082 (component_ref.to_string(), default_op, payload.clone(), None)
2083}
2084
2085#[derive(Clone, Debug)]
2086struct ComponentSpec {
2087 id: String,
2088 version: String,
2089 legacy_path: Option<String>,
2090}
2091
2092#[derive(Clone, Debug)]
2093struct ComponentSourceInfo {
2094 digest: Option<String>,
2095 source: ComponentSourceRef,
2096 artifact: ComponentArtifactLocation,
2097 expected_wasm_sha256: Option<String>,
2098 skip_digest_verification: bool,
2099}
2100
2101#[derive(Clone, Debug)]
2102enum ComponentArtifactLocation {
2103 Inline { wasm_path: String },
2104 Remote,
2105}
2106
2107#[derive(Clone, Debug, Deserialize)]
2108struct PackLockV1 {
2109 schema_version: u32,
2110 components: Vec<PackLockComponent>,
2111}
2112
2113#[derive(Clone, Debug, Deserialize)]
2114struct PackLockComponent {
2115 name: String,
2116 #[serde(default, rename = "source_ref")]
2117 source_ref: Option<String>,
2118 #[serde(default, rename = "ref")]
2119 legacy_ref: Option<String>,
2120 #[serde(default)]
2121 component_id: Option<ComponentId>,
2122 #[serde(default)]
2123 bundled: Option<bool>,
2124 #[serde(default, rename = "bundled_path")]
2125 bundled_path: Option<String>,
2126 #[serde(default, rename = "path")]
2127 legacy_path: Option<String>,
2128 #[serde(default)]
2129 wasm_sha256: Option<String>,
2130 #[serde(default, rename = "sha256")]
2131 legacy_sha256: Option<String>,
2132 #[serde(default)]
2133 resolved_digest: Option<String>,
2134 #[serde(default)]
2135 digest: Option<String>,
2136}
2137
2138fn component_specs(
2139 manifest: Option<&greentic_types::PackManifest>,
2140 legacy_manifest: Option<&legacy_pack::PackManifest>,
2141 component_sources: Option<&ComponentSourcesV1>,
2142 pack_lock: Option<&PackLockV1>,
2143) -> Vec<ComponentSpec> {
2144 if let Some(manifest) = manifest {
2145 if !manifest.components.is_empty() {
2146 return manifest
2147 .components
2148 .iter()
2149 .map(|entry| ComponentSpec {
2150 id: entry.id.as_str().to_string(),
2151 version: entry.version.to_string(),
2152 legacy_path: None,
2153 })
2154 .collect();
2155 }
2156 if let Some(lock) = pack_lock {
2157 let mut seen = HashSet::new();
2158 let mut specs = Vec::new();
2159 for entry in &lock.components {
2160 let id = entry
2161 .component_id
2162 .as_ref()
2163 .map(|id| id.as_str())
2164 .unwrap_or(entry.name.as_str());
2165 if seen.insert(id.to_string()) {
2166 specs.push(ComponentSpec {
2167 id: id.to_string(),
2168 version: "0.0.0".to_string(),
2169 legacy_path: None,
2170 });
2171 }
2172 }
2173 return specs;
2174 }
2175 if let Some(sources) = component_sources {
2176 let mut seen = HashSet::new();
2177 let mut specs = Vec::new();
2178 for entry in &sources.components {
2179 let id = entry
2180 .component_id
2181 .as_ref()
2182 .map(|id| id.as_str())
2183 .unwrap_or(entry.name.as_str());
2184 if seen.insert(id.to_string()) {
2185 specs.push(ComponentSpec {
2186 id: id.to_string(),
2187 version: "0.0.0".to_string(),
2188 legacy_path: None,
2189 });
2190 }
2191 }
2192 return specs;
2193 }
2194 }
2195 if let Some(legacy_manifest) = legacy_manifest {
2196 return legacy_manifest
2197 .components
2198 .iter()
2199 .map(|entry| ComponentSpec {
2200 id: entry.name.clone(),
2201 version: entry.version.to_string(),
2202 legacy_path: Some(entry.file_wasm.clone()),
2203 })
2204 .collect();
2205 }
2206 Vec::new()
2207}
2208
2209fn component_sources_table(
2210 sources: Option<&ComponentSourcesV1>,
2211) -> Result<Option<HashMap<String, ComponentSourceInfo>>> {
2212 let Some(sources) = sources else {
2213 return Ok(None);
2214 };
2215 let mut table = HashMap::new();
2216 for entry in &sources.components {
2217 let artifact = match &entry.artifact {
2218 ArtifactLocationV1::Inline { wasm_path, .. } => ComponentArtifactLocation::Inline {
2219 wasm_path: wasm_path.clone(),
2220 },
2221 ArtifactLocationV1::Remote => ComponentArtifactLocation::Remote,
2222 };
2223 let info = ComponentSourceInfo {
2224 digest: Some(entry.resolved.digest.clone()),
2225 source: entry.source.clone(),
2226 artifact,
2227 expected_wasm_sha256: None,
2228 skip_digest_verification: false,
2229 };
2230 if let Some(component_id) = entry.component_id.as_ref() {
2231 table.insert(component_id.as_str().to_string(), info.clone());
2232 }
2233 table.insert(entry.name.clone(), info);
2234 }
2235 Ok(Some(table))
2236}
2237
2238fn load_pack_lock(path: &Path) -> Result<Option<PackLockV1>> {
2239 let lock_path = if path.is_dir() {
2240 let candidate = path.join("pack.lock");
2241 if candidate.exists() {
2242 Some(candidate)
2243 } else {
2244 let candidate = path.join("pack.lock.json");
2245 candidate.exists().then_some(candidate)
2246 }
2247 } else {
2248 None
2249 };
2250 let Some(lock_path) = lock_path else {
2251 return Ok(None);
2252 };
2253 let raw = std::fs::read_to_string(&lock_path)
2254 .with_context(|| format!("failed to read {}", lock_path.display()))?;
2255 let lock: PackLockV1 = serde_json::from_str(&raw).context("failed to parse pack.lock")?;
2256 if lock.schema_version != 1 {
2257 bail!("pack.lock schema_version must be 1");
2258 }
2259 Ok(Some(lock))
2260}
2261
2262fn find_pack_lock_roots(
2263 pack_path: &Path,
2264 is_dir: bool,
2265 archive_hint: Option<&Path>,
2266) -> Vec<PathBuf> {
2267 if is_dir {
2268 return vec![pack_path.to_path_buf()];
2269 }
2270 let mut roots = Vec::new();
2271 if let Some(archive_path) = archive_hint {
2272 if let Some(parent) = archive_path.parent() {
2273 roots.push(parent.to_path_buf());
2274 if let Some(grandparent) = parent.parent() {
2275 roots.push(grandparent.to_path_buf());
2276 }
2277 }
2278 } else if let Some(parent) = pack_path.parent() {
2279 roots.push(parent.to_path_buf());
2280 if let Some(grandparent) = parent.parent() {
2281 roots.push(grandparent.to_path_buf());
2282 }
2283 }
2284 roots
2285}
2286
2287fn normalize_sha256(digest: &str) -> Result<String> {
2288 let trimmed = digest.trim();
2289 if trimmed.is_empty() {
2290 bail!("sha256 digest cannot be empty");
2291 }
2292 if let Some(stripped) = trimmed.strip_prefix("sha256:") {
2293 if stripped.is_empty() {
2294 bail!("sha256 digest must include hex bytes after sha256:");
2295 }
2296 return Ok(trimmed.to_string());
2297 }
2298 if trimmed.chars().all(|c| c.is_ascii_hexdigit()) {
2299 return Ok(format!("sha256:{trimmed}"));
2300 }
2301 bail!("sha256 digest must be hex or sha256:<hex>");
2302}
2303
2304fn component_sources_table_from_pack_lock(
2305 lock: &PackLockV1,
2306 allow_missing_hash: bool,
2307) -> Result<HashMap<String, ComponentSourceInfo>> {
2308 let mut table = HashMap::new();
2309 let mut names = HashSet::new();
2310 for entry in &lock.components {
2311 if !names.insert(entry.name.clone()) {
2312 bail!(
2313 "pack.lock contains duplicate component name `{}`",
2314 entry.name
2315 );
2316 }
2317 let source_ref = match (&entry.source_ref, &entry.legacy_ref) {
2318 (Some(primary), Some(legacy)) => {
2319 if primary != legacy {
2320 bail!(
2321 "pack.lock component {} has conflicting refs: {} vs {}",
2322 entry.name,
2323 primary,
2324 legacy
2325 );
2326 }
2327 primary.as_str()
2328 }
2329 (Some(primary), None) => primary.as_str(),
2330 (None, Some(legacy)) => legacy.as_str(),
2331 (None, None) => {
2332 bail!("pack.lock component {} missing source_ref", entry.name);
2333 }
2334 };
2335 let source: ComponentSourceRef = source_ref
2336 .parse()
2337 .with_context(|| format!("invalid component ref `{}`", source_ref))?;
2338 let bundled_path = match (&entry.bundled_path, &entry.legacy_path) {
2339 (Some(primary), Some(legacy)) => {
2340 if primary != legacy {
2341 bail!(
2342 "pack.lock component {} has conflicting bundled paths: {} vs {}",
2343 entry.name,
2344 primary,
2345 legacy
2346 );
2347 }
2348 Some(primary.clone())
2349 }
2350 (Some(primary), None) => Some(primary.clone()),
2351 (None, Some(legacy)) => Some(legacy.clone()),
2352 (None, None) => None,
2353 };
2354 let bundled = entry.bundled.unwrap_or(false) || bundled_path.is_some();
2355 let (artifact, digest, expected_wasm_sha256, skip_digest_verification) = if bundled {
2356 let wasm_path = bundled_path.ok_or_else(|| {
2357 anyhow!(
2358 "pack.lock component {} marked bundled but bundled_path is missing",
2359 entry.name
2360 )
2361 })?;
2362 let expected_raw = match (&entry.wasm_sha256, &entry.legacy_sha256) {
2363 (Some(primary), Some(legacy)) => {
2364 if primary != legacy {
2365 bail!(
2366 "pack.lock component {} has conflicting wasm_sha256 values: {} vs {}",
2367 entry.name,
2368 primary,
2369 legacy
2370 );
2371 }
2372 Some(primary.as_str())
2373 }
2374 (Some(primary), None) => Some(primary.as_str()),
2375 (None, Some(legacy)) => Some(legacy.as_str()),
2376 (None, None) => None,
2377 };
2378 let expected = match expected_raw {
2379 Some(value) => Some(normalize_sha256(value)?),
2380 None => None,
2381 };
2382 if expected.is_none() && !allow_missing_hash {
2383 bail!(
2384 "pack.lock component {} missing wasm_sha256 for bundled component",
2385 entry.name
2386 );
2387 }
2388 (
2389 ComponentArtifactLocation::Inline { wasm_path },
2390 expected.clone(),
2391 expected,
2392 allow_missing_hash && expected_raw.is_none(),
2393 )
2394 } else {
2395 if source.is_tag() {
2396 bail!(
2397 "component {} uses tag ref {} but is not bundled; rebuild the pack",
2398 entry.name,
2399 source
2400 );
2401 }
2402 let expected = entry
2403 .resolved_digest
2404 .as_deref()
2405 .or(entry.digest.as_deref())
2406 .ok_or_else(|| {
2407 anyhow!(
2408 "pack.lock component {} missing resolved_digest for remote component",
2409 entry.name
2410 )
2411 })?;
2412 (
2413 ComponentArtifactLocation::Remote,
2414 Some(normalize_digest(expected)),
2415 None,
2416 false,
2417 )
2418 };
2419 let info = ComponentSourceInfo {
2420 digest,
2421 source,
2422 artifact,
2423 expected_wasm_sha256,
2424 skip_digest_verification,
2425 };
2426 if let Some(component_id) = entry.component_id.as_ref() {
2427 let key = component_id.as_str().to_string();
2428 if table.contains_key(&key) {
2429 bail!(
2430 "pack.lock contains duplicate component id `{}`",
2431 component_id.as_str()
2432 );
2433 }
2434 table.insert(key, info.clone());
2435 }
2436 if entry.name
2437 != entry
2438 .component_id
2439 .as_ref()
2440 .map(|id| id.as_str())
2441 .unwrap_or("")
2442 {
2443 table.insert(entry.name.clone(), info);
2444 }
2445 }
2446 Ok(table)
2447}
2448
2449fn component_path_for_spec(root: &Path, spec: &ComponentSpec) -> PathBuf {
2450 if let Some(path) = &spec.legacy_path {
2451 return root.join(path);
2452 }
2453 root.join("components").join(format!("{}.wasm", spec.id))
2454}
2455
2456fn normalize_digest(digest: &str) -> String {
2457 if digest.starts_with("sha256:") || digest.starts_with("blake3:") {
2458 digest.to_string()
2459 } else {
2460 format!("sha256:{digest}")
2461 }
2462}
2463
2464fn compute_digest_for(bytes: &[u8], digest: &str) -> Result<String> {
2465 if digest.starts_with("blake3:") {
2466 let hash = blake3::hash(bytes);
2467 return Ok(format!("blake3:{}", hash.to_hex()));
2468 }
2469 let mut hasher = sha2::Sha256::new();
2470 hasher.update(bytes);
2471 Ok(format!("sha256:{:x}", hasher.finalize()))
2472}
2473
2474fn compute_sha256_digest_for(bytes: &[u8]) -> String {
2475 let mut hasher = sha2::Sha256::new();
2476 hasher.update(bytes);
2477 format!("sha256:{:x}", hasher.finalize())
2478}
2479
2480fn build_artifact_key(cache: &CacheManager, digest: Option<&str>, bytes: &[u8]) -> ArtifactKey {
2481 let wasm_digest = digest
2482 .map(normalize_digest)
2483 .unwrap_or_else(|| compute_sha256_digest_for(bytes));
2484 ArtifactKey::new(cache.engine_profile_id().to_string(), wasm_digest)
2485}
2486
2487async fn compile_component_with_cache(
2488 cache: &CacheManager,
2489 engine: &Engine,
2490 digest: Option<&str>,
2491 bytes: Vec<u8>,
2492) -> Result<Arc<Component>> {
2493 let key = build_artifact_key(cache, digest, &bytes);
2494 cache.get_component(engine, &key, || Ok(bytes)).await
2495}
2496
2497fn verify_component_digest(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
2498 let normalized_expected = normalize_digest(expected);
2499 let actual = compute_digest_for(bytes, &normalized_expected)?;
2500 if normalize_digest(&actual) != normalized_expected {
2501 bail!(
2502 "component {component_id} digest mismatch: expected {normalized_expected}, got {actual}"
2503 );
2504 }
2505 Ok(())
2506}
2507
2508fn verify_wasm_sha256(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
2509 let normalized_expected = normalize_sha256(expected)?;
2510 let actual = compute_sha256_digest_for(bytes);
2511 if actual != normalized_expected {
2512 bail!(
2513 "component {component_id} bundled digest mismatch: expected {normalized_expected}, got {actual}"
2514 );
2515 }
2516 Ok(())
2517}
2518
2519#[cfg(test)]
2520mod pack_lock_tests {
2521 use super::*;
2522 use tempfile::TempDir;
2523
2524 #[test]
2525 fn pack_lock_tag_ref_requires_bundle() {
2526 let lock = PackLockV1 {
2527 schema_version: 1,
2528 components: vec![PackLockComponent {
2529 name: "templates".to_string(),
2530 source_ref: Some("oci://registry.test/templates:latest".to_string()),
2531 legacy_ref: None,
2532 component_id: None,
2533 bundled: Some(false),
2534 bundled_path: None,
2535 legacy_path: None,
2536 wasm_sha256: None,
2537 legacy_sha256: None,
2538 resolved_digest: None,
2539 digest: None,
2540 }],
2541 };
2542 let err = component_sources_table_from_pack_lock(&lock, false).unwrap_err();
2543 assert!(
2544 err.to_string().contains("tag ref") && err.to_string().contains("rebuild the pack"),
2545 "unexpected error: {err}"
2546 );
2547 }
2548
2549 #[test]
2550 fn bundled_hash_mismatch_errors() {
2551 let rt = tokio::runtime::Runtime::new().expect("runtime");
2552 let temp = TempDir::new().expect("temp dir");
2553 let engine = Engine::default();
2554 let engine_profile =
2555 EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
2556 let cache_config = CacheConfig {
2557 root: temp.path().join("cache"),
2558 ..CacheConfig::default()
2559 };
2560 let cache = CacheManager::new(cache_config, engine_profile);
2561 let wasm_path = temp.path().join("component.wasm");
2562 let fixture_wasm = Path::new(env!("CARGO_MANIFEST_DIR"))
2563 .join("../../tests/fixtures/packs/secrets_store_smoke/components/echo_secret.wasm");
2564 let bytes = std::fs::read(&fixture_wasm).expect("read fixture wasm");
2565 std::fs::write(&wasm_path, &bytes).expect("write temp wasm");
2566
2567 let spec = ComponentSpec {
2568 id: "qa.process".to_string(),
2569 version: "0.0.0".to_string(),
2570 legacy_path: None,
2571 };
2572 let mut missing = HashSet::new();
2573 missing.insert(spec.id.clone());
2574
2575 let mut sources = HashMap::new();
2576 sources.insert(
2577 spec.id.clone(),
2578 ComponentSourceInfo {
2579 digest: Some("sha256:deadbeef".to_string()),
2580 source: ComponentSourceRef::Oci("registry.test/qa.process@sha256:deadbeef".into()),
2581 artifact: ComponentArtifactLocation::Inline {
2582 wasm_path: "component.wasm".to_string(),
2583 },
2584 expected_wasm_sha256: Some("sha256:deadbeef".to_string()),
2585 skip_digest_verification: false,
2586 },
2587 );
2588
2589 let mut loaded = HashMap::new();
2590 let result = rt.block_on(load_components_from_sources(
2591 &cache,
2592 &engine,
2593 &sources,
2594 &ComponentResolution::default(),
2595 &[spec],
2596 &mut missing,
2597 &mut loaded,
2598 Some(temp.path()),
2599 None,
2600 ));
2601 let err = result.unwrap_err();
2602 assert!(
2603 err.to_string().contains("bundled digest mismatch"),
2604 "unexpected error: {err}"
2605 );
2606 }
2607}
2608
2609#[cfg(test)]
2610mod pack_resolution_prop_tests {
2611 use super::*;
2612 use greentic_types::{ArtifactLocationV1, ComponentSourceEntryV1, ResolvedComponentV1};
2613 use proptest::prelude::*;
2614 use proptest::test_runner::{Config as ProptestConfig, RngAlgorithm, TestRng, TestRunner};
2615 use std::collections::BTreeSet;
2616 use std::path::Path;
2617 use std::str::FromStr;
2618
2619 #[derive(Clone, Debug)]
2620 enum ResolveRequest {
2621 ById(String),
2622 ByName(String),
2623 }
2624
2625 #[derive(Clone, Debug, PartialEq, Eq)]
2626 struct ResolvedComponent {
2627 key: String,
2628 source: String,
2629 artifact: String,
2630 digest: Option<String>,
2631 expected_wasm_sha256: Option<String>,
2632 skip_digest_verification: bool,
2633 }
2634
2635 #[derive(Clone, Debug, PartialEq, Eq)]
2636 struct ResolveError {
2637 code: String,
2638 message: String,
2639 context_key: String,
2640 }
2641
2642 #[derive(Clone, Debug)]
2643 struct Scenario {
2644 pack_lock: Option<PackLockV1>,
2645 component_sources: Option<ComponentSourcesV1>,
2646 request: ResolveRequest,
2647 expected_sha256: Option<String>,
2648 bytes: Vec<u8>,
2649 }
2650
2651 fn resolve_component_test(
2652 sources: Option<&ComponentSourcesV1>,
2653 lock: Option<&PackLockV1>,
2654 request: &ResolveRequest,
2655 ) -> Result<ResolvedComponent, ResolveError> {
2656 let table = if let Some(lock) = lock {
2657 component_sources_table_from_pack_lock(lock, false).map_err(|err| ResolveError {
2658 code: classify_pack_lock_error(err.to_string().as_str()).to_string(),
2659 message: err.to_string(),
2660 context_key: request_key(request).to_string(),
2661 })?
2662 } else {
2663 let sources = component_sources_table(sources).map_err(|err| ResolveError {
2664 code: "component_sources_error".to_string(),
2665 message: err.to_string(),
2666 context_key: request_key(request).to_string(),
2667 })?;
2668 sources.ok_or_else(|| ResolveError {
2669 code: "missing_component_sources".to_string(),
2670 message: "component sources not provided".to_string(),
2671 context_key: request_key(request).to_string(),
2672 })?
2673 };
2674
2675 let key = request_key(request);
2676 let source = table.get(key).ok_or_else(|| ResolveError {
2677 code: "component_not_found".to_string(),
2678 message: format!("component {key} not found"),
2679 context_key: key.to_string(),
2680 })?;
2681
2682 Ok(ResolvedComponent {
2683 key: key.to_string(),
2684 source: source.source.to_string(),
2685 artifact: match source.artifact {
2686 ComponentArtifactLocation::Inline { .. } => "inline".to_string(),
2687 ComponentArtifactLocation::Remote => "remote".to_string(),
2688 },
2689 digest: source.digest.clone(),
2690 expected_wasm_sha256: source.expected_wasm_sha256.clone(),
2691 skip_digest_verification: source.skip_digest_verification,
2692 })
2693 }
2694
2695 fn request_key(request: &ResolveRequest) -> &str {
2696 match request {
2697 ResolveRequest::ById(value) => value.as_str(),
2698 ResolveRequest::ByName(value) => value.as_str(),
2699 }
2700 }
2701
2702 fn classify_pack_lock_error(message: &str) -> &'static str {
2703 if message.contains("duplicate component name") {
2704 "duplicate_name"
2705 } else if message.contains("duplicate component id") {
2706 "duplicate_id"
2707 } else if message.contains("conflicting refs") {
2708 "conflicting_ref"
2709 } else if message.contains("conflicting bundled paths") {
2710 "conflicting_bundled_path"
2711 } else if message.contains("conflicting wasm_sha256") {
2712 "conflicting_wasm_sha256"
2713 } else if message.contains("missing source_ref") {
2714 "missing_source_ref"
2715 } else if message.contains("marked bundled but bundled_path is missing") {
2716 "missing_bundled_path"
2717 } else if message.contains("missing wasm_sha256") {
2718 "missing_wasm_sha256"
2719 } else if message.contains("tag ref") && message.contains("not bundled") {
2720 "tag_ref_requires_bundle"
2721 } else if message.contains("missing resolved_digest") {
2722 "missing_resolved_digest"
2723 } else if message.contains("invalid component ref") {
2724 "invalid_component_ref"
2725 } else if message.contains("sha256 digest") {
2726 "invalid_sha256"
2727 } else {
2728 "unknown_error"
2729 }
2730 }
2731
2732 fn known_error_codes() -> BTreeSet<&'static str> {
2733 [
2734 "component_sources_error",
2735 "missing_component_sources",
2736 "component_not_found",
2737 "duplicate_name",
2738 "duplicate_id",
2739 "conflicting_ref",
2740 "conflicting_bundled_path",
2741 "conflicting_wasm_sha256",
2742 "missing_source_ref",
2743 "missing_bundled_path",
2744 "missing_wasm_sha256",
2745 "tag_ref_requires_bundle",
2746 "missing_resolved_digest",
2747 "invalid_component_ref",
2748 "invalid_sha256",
2749 "unknown_error",
2750 ]
2751 .into_iter()
2752 .collect()
2753 }
2754
2755 fn proptest_config() -> ProptestConfig {
2756 let cases = std::env::var("PROPTEST_CASES")
2757 .ok()
2758 .and_then(|value| value.parse::<u32>().ok())
2759 .unwrap_or(128);
2760 ProptestConfig {
2761 cases,
2762 failure_persistence: None,
2763 ..ProptestConfig::default()
2764 }
2765 }
2766
2767 fn proptest_seed() -> Option<[u8; 32]> {
2768 let seed = std::env::var("PROPTEST_SEED")
2769 .ok()
2770 .and_then(|value| value.parse::<u64>().ok())?;
2771 let mut bytes = [0u8; 32];
2772 bytes[..8].copy_from_slice(&seed.to_le_bytes());
2773 Some(bytes)
2774 }
2775
2776 fn run_cases(strategy: impl Strategy<Value = Scenario>, cases: u32, seed: Option<[u8; 32]>) {
2777 let config = ProptestConfig {
2778 cases,
2779 failure_persistence: None,
2780 ..ProptestConfig::default()
2781 };
2782 let mut runner = match seed {
2783 Some(bytes) => {
2784 TestRunner::new_with_rng(config, TestRng::from_seed(RngAlgorithm::ChaCha, &bytes))
2785 }
2786 None => TestRunner::new(config),
2787 };
2788 runner
2789 .run(&strategy, |scenario| {
2790 run_scenario(&scenario);
2791 Ok(())
2792 })
2793 .unwrap();
2794 }
2795
2796 fn run_scenario(scenario: &Scenario) {
2797 let known_codes = known_error_codes();
2798 let first = resolve_component_test(
2799 scenario.component_sources.as_ref(),
2800 scenario.pack_lock.as_ref(),
2801 &scenario.request,
2802 );
2803 let second = resolve_component_test(
2804 scenario.component_sources.as_ref(),
2805 scenario.pack_lock.as_ref(),
2806 &scenario.request,
2807 );
2808 assert_eq!(normalize_result(&first), normalize_result(&second));
2809
2810 if let Some(lock) = scenario.pack_lock.as_ref() {
2811 let lock_only = resolve_component_test(None, Some(lock), &scenario.request);
2812 assert_eq!(normalize_result(&first), normalize_result(&lock_only));
2813 }
2814
2815 if let Err(err) = first.as_ref() {
2816 assert!(
2817 known_codes.contains(err.code.as_str()),
2818 "unexpected error code {}: {}",
2819 err.code,
2820 err.message
2821 );
2822 }
2823
2824 if let Some(expected) = scenario.expected_sha256.as_deref() {
2825 let expected_ok =
2826 verify_wasm_sha256("test.component", expected, &scenario.bytes).is_ok();
2827 let actual = compute_sha256_digest_for(&scenario.bytes);
2828 if actual == normalize_sha256(expected).unwrap_or_default() {
2829 assert!(expected_ok, "expected sha256 match to succeed");
2830 } else {
2831 assert!(!expected_ok, "expected sha256 mismatch to fail");
2832 }
2833 }
2834 }
2835
2836 fn normalize_result(
2837 result: &Result<ResolvedComponent, ResolveError>,
2838 ) -> Result<ResolvedComponent, ResolveError> {
2839 match result {
2840 Ok(value) => Ok(value.clone()),
2841 Err(err) => Err(err.clone()),
2842 }
2843 }
2844
2845 fn scenario_strategy() -> impl Strategy<Value = Scenario> {
2846 let name = any::<u8>().prop_map(|n| format!("component{n}.core"));
2847 let alt_name = any::<u8>().prop_map(|n| format!("component_alt{n}.core"));
2848 let tag_ref = any::<bool>();
2849 let bundled = any::<bool>();
2850 let include_sha = any::<bool>();
2851 let include_component_id = any::<bool>();
2852 let request_by_id = any::<bool>();
2853 let use_lock = any::<bool>();
2854 let use_sources = any::<bool>();
2855 let bytes = prop::collection::vec(any::<u8>(), 1..64);
2856
2857 (
2858 name,
2859 alt_name,
2860 tag_ref,
2861 bundled,
2862 include_sha,
2863 include_component_id,
2864 request_by_id,
2865 use_lock,
2866 use_sources,
2867 bytes,
2868 )
2869 .prop_map(
2870 |(
2871 name,
2872 alt_name,
2873 tag_ref,
2874 bundled,
2875 include_sha,
2876 include_component_id,
2877 request_by_id,
2878 use_lock,
2879 use_sources,
2880 bytes,
2881 )| {
2882 let component_id_str = if include_component_id {
2883 alt_name.clone()
2884 } else {
2885 name.clone()
2886 };
2887 let component_id = ComponentId::from_str(&component_id_str).ok();
2888 let source_ref = if tag_ref {
2889 format!("oci://registry.test/{name}:v1")
2890 } else {
2891 format!(
2892 "oci://registry.test/{name}@sha256:{}",
2893 hex::encode([0x11u8; 32])
2894 )
2895 };
2896 let expected_sha256 = if bundled && include_sha {
2897 Some(compute_sha256_digest_for(&bytes))
2898 } else {
2899 None
2900 };
2901
2902 let lock_component = PackLockComponent {
2903 name: name.clone(),
2904 source_ref: Some(source_ref),
2905 legacy_ref: None,
2906 component_id,
2907 bundled: Some(bundled),
2908 bundled_path: if bundled {
2909 Some(format!("components/{name}.wasm"))
2910 } else {
2911 None
2912 },
2913 legacy_path: None,
2914 wasm_sha256: expected_sha256.clone(),
2915 legacy_sha256: None,
2916 resolved_digest: if bundled {
2917 None
2918 } else {
2919 Some("sha256:deadbeef".to_string())
2920 },
2921 digest: None,
2922 };
2923
2924 let pack_lock = if use_lock {
2925 Some(PackLockV1 {
2926 schema_version: 1,
2927 components: vec![lock_component],
2928 })
2929 } else {
2930 None
2931 };
2932
2933 let component_sources = if use_sources {
2934 Some(ComponentSourcesV1::new(vec![ComponentSourceEntryV1 {
2935 name: name.clone(),
2936 component_id: ComponentId::from_str(&name).ok(),
2937 source: ComponentSourceRef::from_str(
2938 "oci://registry.test/component@sha256:deadbeef",
2939 )
2940 .expect("component ref"),
2941 resolved: ResolvedComponentV1 {
2942 digest: "sha256:deadbeef".to_string(),
2943 signature: None,
2944 signed_by: None,
2945 },
2946 artifact: if bundled {
2947 ArtifactLocationV1::Inline {
2948 wasm_path: format!("components/{name}.wasm"),
2949 manifest_path: None,
2950 }
2951 } else {
2952 ArtifactLocationV1::Remote
2953 },
2954 licensing_hint: None,
2955 metering_hint: None,
2956 }]))
2957 } else {
2958 None
2959 };
2960
2961 let request = if request_by_id {
2962 ResolveRequest::ById(component_id_str.clone())
2963 } else {
2964 ResolveRequest::ByName(name.clone())
2965 };
2966
2967 Scenario {
2968 pack_lock,
2969 component_sources,
2970 request,
2971 expected_sha256,
2972 bytes,
2973 }
2974 },
2975 )
2976 }
2977
2978 #[test]
2979 fn pack_resolution_proptest() {
2980 let seed = proptest_seed();
2981 run_cases(scenario_strategy(), proptest_config().cases, seed);
2982 }
2983
2984 #[test]
2985 fn pack_resolution_regression_seeds() {
2986 let seeds_path =
2987 Path::new(env!("CARGO_MANIFEST_DIR")).join("../../tests/fixtures/proptest-seeds.txt");
2988 let raw = std::fs::read_to_string(&seeds_path).expect("read proptest seeds");
2989 for line in raw.lines() {
2990 let line = line.trim();
2991 if line.is_empty() || line.starts_with('#') {
2992 continue;
2993 }
2994 let seed = line.parse::<u64>().expect("seed must be an integer");
2995 let mut bytes = [0u8; 32];
2996 bytes[..8].copy_from_slice(&seed.to_le_bytes());
2997 run_cases(scenario_strategy(), 1, Some(bytes));
2998 }
2999 }
3000}
3001
3002fn dist_options_from(component_resolution: &ComponentResolution) -> DistOptions {
3003 let mut opts = DistOptions {
3004 allow_tags: true,
3005 ..DistOptions::default()
3006 };
3007 if let Some(cache_dir) = component_resolution.dist_cache_dir.clone() {
3008 opts.cache_dir = cache_dir;
3009 }
3010 if component_resolution.dist_offline {
3011 opts.offline = true;
3012 }
3013 opts
3014}
3015
3016#[allow(clippy::too_many_arguments)]
3017async fn load_components_from_sources(
3018 cache: &CacheManager,
3019 engine: &Engine,
3020 component_sources: &HashMap<String, ComponentSourceInfo>,
3021 component_resolution: &ComponentResolution,
3022 specs: &[ComponentSpec],
3023 missing: &mut HashSet<String>,
3024 into: &mut HashMap<String, PackComponent>,
3025 materialized_root: Option<&Path>,
3026 archive_hint: Option<&Path>,
3027) -> Result<()> {
3028 let mut archive = if let Some(path) = archive_hint {
3029 Some(
3030 ZipArchive::new(File::open(path)?)
3031 .with_context(|| format!("{} is not a valid gtpack", path.display()))?,
3032 )
3033 } else {
3034 None
3035 };
3036 let mut dist_client: Option<DistClient> = None;
3037
3038 for spec in specs {
3039 if !missing.contains(&spec.id) {
3040 continue;
3041 }
3042 let Some(source) = component_sources.get(&spec.id) else {
3043 continue;
3044 };
3045
3046 let bytes = match &source.artifact {
3047 ComponentArtifactLocation::Inline { wasm_path } => {
3048 if let Some(root) = materialized_root {
3049 let path = root.join(wasm_path);
3050 if path.exists() {
3051 std::fs::read(&path).with_context(|| {
3052 format!(
3053 "failed to read inline component {} from {}",
3054 spec.id,
3055 path.display()
3056 )
3057 })?
3058 } else if archive.is_none() {
3059 bail!("inline component {} missing at {}", spec.id, path.display());
3060 } else {
3061 read_entry(
3062 archive.as_mut().expect("archive present when needed"),
3063 wasm_path,
3064 )
3065 .with_context(|| {
3066 format!(
3067 "inline component {} missing at {} in pack archive",
3068 spec.id, wasm_path
3069 )
3070 })?
3071 }
3072 } else if let Some(archive) = archive.as_mut() {
3073 read_entry(archive, wasm_path).with_context(|| {
3074 format!(
3075 "inline component {} missing at {} in pack archive",
3076 spec.id, wasm_path
3077 )
3078 })?
3079 } else {
3080 bail!(
3081 "inline component {} missing and no pack source available",
3082 spec.id
3083 );
3084 }
3085 }
3086 ComponentArtifactLocation::Remote => {
3087 if source.source.is_tag() {
3088 bail!(
3089 "component {} uses tag ref {} but is not bundled; rebuild the pack",
3090 spec.id,
3091 source.source
3092 );
3093 }
3094 let client = dist_client.get_or_insert_with(|| {
3095 DistClient::new(dist_options_from(component_resolution))
3096 });
3097 let reference = source.source.to_string();
3098 fault::maybe_fail_asset(&reference)
3099 .await
3100 .with_context(|| format!("fault injection blocked asset {reference}"))?;
3101 let digest = source.digest.as_deref().ok_or_else(|| {
3102 anyhow!(
3103 "component {} missing expected digest for remote component",
3104 spec.id
3105 )
3106 })?;
3107 let cache_path = if component_resolution.dist_offline {
3108 client
3109 .fetch_digest(digest)
3110 .await
3111 .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?
3112 } else {
3113 let resolved = client
3114 .resolve_ref(&reference)
3115 .await
3116 .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?;
3117 let expected = normalize_digest(digest);
3118 let actual = normalize_digest(&resolved.digest);
3119 if expected != actual {
3120 bail!(
3121 "component {} digest mismatch after fetch: expected {}, got {}",
3122 spec.id,
3123 expected,
3124 actual
3125 );
3126 }
3127 resolved.cache_path.ok_or_else(|| {
3128 anyhow!(
3129 "component {} resolved from {} but cache path is missing",
3130 spec.id,
3131 reference
3132 )
3133 })?
3134 };
3135 std::fs::read(&cache_path).with_context(|| {
3136 format!(
3137 "failed to read cached component {} from {}",
3138 spec.id,
3139 cache_path.display()
3140 )
3141 })?
3142 }
3143 };
3144
3145 if let Some(expected) = source.expected_wasm_sha256.as_deref() {
3146 verify_wasm_sha256(&spec.id, expected, &bytes)?;
3147 } else if source.skip_digest_verification {
3148 let actual = compute_sha256_digest_for(&bytes);
3149 warn!(
3150 component_id = %spec.id,
3151 digest = %actual,
3152 "bundled component missing wasm_sha256; allowing due to flag"
3153 );
3154 } else {
3155 let expected = source.digest.as_deref().ok_or_else(|| {
3156 anyhow!(
3157 "component {} missing expected digest for verification",
3158 spec.id
3159 )
3160 })?;
3161 verify_component_digest(&spec.id, expected, &bytes)?;
3162 }
3163 let component =
3164 compile_component_with_cache(cache, engine, source.digest.as_deref(), bytes)
3165 .await
3166 .with_context(|| format!("failed to compile component {}", spec.id))?;
3167 into.insert(
3168 spec.id.clone(),
3169 PackComponent {
3170 name: spec.id.clone(),
3171 version: spec.version.clone(),
3172 component,
3173 },
3174 );
3175 missing.remove(&spec.id);
3176 }
3177
3178 Ok(())
3179}
3180
3181fn dist_error_for_component(err: DistError, component_id: &str, reference: &str) -> anyhow::Error {
3182 match err {
3183 DistError::CacheMiss { reference: missing } => anyhow!(
3184 "remote component {} is not cached for {}. Run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
3185 component_id,
3186 missing,
3187 reference
3188 ),
3189 DistError::Offline { reference: blocked } => anyhow!(
3190 "offline mode blocked fetching component {} from {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
3191 component_id,
3192 blocked,
3193 reference
3194 ),
3195 DistError::AuthRequired { target } => anyhow!(
3196 "component {} requires authenticated source {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
3197 component_id,
3198 target,
3199 reference
3200 ),
3201 other => anyhow!(
3202 "failed to resolve component {} from {}: {}",
3203 component_id,
3204 reference,
3205 other
3206 ),
3207 }
3208}
3209
3210async fn load_components_from_overrides(
3211 cache: &CacheManager,
3212 engine: &Engine,
3213 overrides: &HashMap<String, PathBuf>,
3214 specs: &[ComponentSpec],
3215 missing: &mut HashSet<String>,
3216 into: &mut HashMap<String, PackComponent>,
3217) -> Result<()> {
3218 for spec in specs {
3219 if !missing.contains(&spec.id) {
3220 continue;
3221 }
3222 let Some(path) = overrides.get(&spec.id) else {
3223 continue;
3224 };
3225 let bytes = std::fs::read(path)
3226 .with_context(|| format!("failed to read override component {}", path.display()))?;
3227 let component = compile_component_with_cache(cache, engine, None, bytes)
3228 .await
3229 .with_context(|| {
3230 format!(
3231 "failed to compile component {} from override {}",
3232 spec.id,
3233 path.display()
3234 )
3235 })?;
3236 into.insert(
3237 spec.id.clone(),
3238 PackComponent {
3239 name: spec.id.clone(),
3240 version: spec.version.clone(),
3241 component,
3242 },
3243 );
3244 missing.remove(&spec.id);
3245 }
3246 Ok(())
3247}
3248
3249async fn load_components_from_dir(
3250 cache: &CacheManager,
3251 engine: &Engine,
3252 root: &Path,
3253 specs: &[ComponentSpec],
3254 missing: &mut HashSet<String>,
3255 into: &mut HashMap<String, PackComponent>,
3256) -> Result<()> {
3257 for spec in specs {
3258 if !missing.contains(&spec.id) {
3259 continue;
3260 }
3261 let path = component_path_for_spec(root, spec);
3262 if !path.exists() {
3263 tracing::debug!(component = %spec.id, path = %path.display(), "materialized component missing; will try other sources");
3264 continue;
3265 }
3266 let bytes = std::fs::read(&path)
3267 .with_context(|| format!("failed to read component {}", path.display()))?;
3268 let component = compile_component_with_cache(cache, engine, None, bytes)
3269 .await
3270 .with_context(|| {
3271 format!(
3272 "failed to compile component {} from {}",
3273 spec.id,
3274 path.display()
3275 )
3276 })?;
3277 into.insert(
3278 spec.id.clone(),
3279 PackComponent {
3280 name: spec.id.clone(),
3281 version: spec.version.clone(),
3282 component,
3283 },
3284 );
3285 missing.remove(&spec.id);
3286 }
3287 Ok(())
3288}
3289
3290async fn load_components_from_archive(
3291 cache: &CacheManager,
3292 engine: &Engine,
3293 path: &Path,
3294 specs: &[ComponentSpec],
3295 missing: &mut HashSet<String>,
3296 into: &mut HashMap<String, PackComponent>,
3297) -> Result<()> {
3298 let mut archive = ZipArchive::new(File::open(path)?)
3299 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
3300 for spec in specs {
3301 if !missing.contains(&spec.id) {
3302 continue;
3303 }
3304 let file_name = spec
3305 .legacy_path
3306 .clone()
3307 .unwrap_or_else(|| format!("components/{}.wasm", spec.id));
3308 let bytes = match read_entry(&mut archive, &file_name) {
3309 Ok(bytes) => bytes,
3310 Err(err) => {
3311 warn!(component = %spec.id, pack = %path.display(), error = %err, "component entry missing in pack archive");
3312 continue;
3313 }
3314 };
3315 let component = compile_component_with_cache(cache, engine, None, bytes)
3316 .await
3317 .with_context(|| format!("failed to compile component {}", spec.id))?;
3318 into.insert(
3319 spec.id.clone(),
3320 PackComponent {
3321 name: spec.id.clone(),
3322 version: spec.version.clone(),
3323 component,
3324 },
3325 );
3326 missing.remove(&spec.id);
3327 }
3328 Ok(())
3329}
3330
3331#[cfg(test)]
3332mod tests {
3333 use super::*;
3334 use greentic_flow::model::{FlowDoc, NodeDoc};
3335 use indexmap::IndexMap;
3336 use serde_json::json;
3337
3338 #[test]
3339 fn normalizes_raw_component_to_component_exec() {
3340 let mut nodes = IndexMap::new();
3341 let mut raw = IndexMap::new();
3342 raw.insert(
3343 "templating.handlebars".into(),
3344 json!({ "template": "Hi {{name}}" }),
3345 );
3346 nodes.insert(
3347 "start".into(),
3348 NodeDoc {
3349 raw,
3350 routing: json!([{"out": true}]),
3351 ..Default::default()
3352 },
3353 );
3354 let doc = FlowDoc {
3355 id: "welcome".into(),
3356 title: None,
3357 description: None,
3358 flow_type: "messaging".into(),
3359 start: Some("start".into()),
3360 parameters: json!({}),
3361 tags: Vec::new(),
3362 schema_version: None,
3363 entrypoints: IndexMap::new(),
3364 nodes,
3365 };
3366
3367 let normalized = normalize_flow_doc(doc);
3368 let node = normalized.nodes.get("start").expect("node exists");
3369 assert_eq!(node.operation.as_deref(), Some("component.exec"));
3370 assert!(node.raw.is_empty());
3371 let payload = node.payload.as_object().expect("payload object");
3372 assert_eq!(
3373 payload.get("component"),
3374 Some(&Value::String("templating.handlebars".into()))
3375 );
3376 assert_eq!(
3377 payload.get("operation"),
3378 Some(&Value::String("render".into()))
3379 );
3380 let input = payload.get("input").unwrap();
3381 assert_eq!(input, &json!({ "template": "Hi {{name}}" }));
3382 }
3383}
3384
3385#[derive(Clone, Debug, Default, Serialize, Deserialize)]
3386pub struct PackMetadata {
3387 pub pack_id: String,
3388 pub version: String,
3389 #[serde(default)]
3390 pub entry_flows: Vec<String>,
3391 #[serde(default)]
3392 pub secret_requirements: Vec<greentic_types::SecretRequirement>,
3393}
3394
3395impl PackMetadata {
3396 fn from_wasm(bytes: &[u8]) -> Option<Self> {
3397 let parser = Parser::new(0);
3398 for payload in parser.parse_all(bytes) {
3399 let payload = payload.ok()?;
3400 match payload {
3401 Payload::CustomSection(section) => {
3402 if section.name() == "greentic.manifest"
3403 && let Ok(meta) = Self::from_bytes(section.data())
3404 {
3405 return Some(meta);
3406 }
3407 }
3408 Payload::DataSection(reader) => {
3409 for segment in reader.into_iter().flatten() {
3410 if let Ok(meta) = Self::from_bytes(segment.data) {
3411 return Some(meta);
3412 }
3413 }
3414 }
3415 _ => {}
3416 }
3417 }
3418 None
3419 }
3420
3421 fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
3422 #[derive(Deserialize)]
3423 struct RawManifest {
3424 pack_id: String,
3425 version: String,
3426 #[serde(default)]
3427 entry_flows: Vec<String>,
3428 #[serde(default)]
3429 flows: Vec<RawFlow>,
3430 #[serde(default)]
3431 secret_requirements: Vec<greentic_types::SecretRequirement>,
3432 }
3433
3434 #[derive(Deserialize)]
3435 struct RawFlow {
3436 id: String,
3437 }
3438
3439 let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
3440 let mut entry_flows = if manifest.entry_flows.is_empty() {
3441 manifest.flows.iter().map(|f| f.id.clone()).collect()
3442 } else {
3443 manifest.entry_flows.clone()
3444 };
3445 entry_flows.retain(|id| !id.is_empty());
3446 Ok(Self {
3447 pack_id: manifest.pack_id,
3448 version: manifest.version,
3449 entry_flows,
3450 secret_requirements: manifest.secret_requirements,
3451 })
3452 }
3453
3454 pub fn fallback(path: &Path) -> Self {
3455 let pack_id = path
3456 .file_stem()
3457 .map(|s| s.to_string_lossy().into_owned())
3458 .unwrap_or_else(|| "unknown-pack".to_string());
3459 Self {
3460 pack_id,
3461 version: "0.0.0".to_string(),
3462 entry_flows: Vec::new(),
3463 secret_requirements: Vec::new(),
3464 }
3465 }
3466
3467 pub fn from_manifest(manifest: &greentic_types::PackManifest) -> Self {
3468 let entry_flows = manifest
3469 .flows
3470 .iter()
3471 .map(|flow| flow.id.as_str().to_string())
3472 .collect::<Vec<_>>();
3473 Self {
3474 pack_id: manifest.pack_id.as_str().to_string(),
3475 version: manifest.version.to_string(),
3476 entry_flows,
3477 secret_requirements: manifest.secret_requirements.clone(),
3478 }
3479 }
3480}