1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::cache::{ArtifactKey, CacheConfig, CacheManager, CpuPolicy, EngineProfile};
10use crate::component_api::{
11 self, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
12};
13use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
14use crate::provider::{ProviderBinding, ProviderRegistry};
15use crate::provider_core::SchemaCorePre as ProviderComponentPre;
16use crate::provider_core_only;
17use crate::runtime_wasmtime::{Component, Engine, InstancePre, Linker, ResourceTable};
18use anyhow::{Context, Result, anyhow, bail};
19use greentic_distributor_client::dist::{DistClient, DistError, DistOptions};
20use greentic_interfaces_wasmtime::host_helpers::v1::{
21 self as host_v1, HostFns, add_all_v1_to_linker,
22 runner_host_http::RunnerHostHttp,
23 runner_host_kv::RunnerHostKv,
24 secrets_store::{SecretsError, SecretsStoreHost},
25 state_store::{
26 OpAck as StateOpAck, StateKey as HostStateKey, StateStoreError as StateError,
27 StateStoreHost, TenantCtx as StateTenantCtx,
28 },
29 telemetry_logger::{
30 OpAck as TelemetryAck, SpanContext as TelemetrySpanContext,
31 TelemetryLoggerError as TelemetryError, TelemetryLoggerHost,
32 TenantCtx as TelemetryTenantCtx,
33 },
34};
35use greentic_interfaces_wasmtime::http_client_client_v1_0::greentic::interfaces_types::types::Impersonation as ImpersonationV1_0;
36use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::interfaces_types::types::Impersonation as ImpersonationV1_1;
37use greentic_pack::builder as legacy_pack;
38use greentic_types::flow::FlowHasher;
39use greentic_types::{
40 ArtifactLocationV1, ComponentId, ComponentManifest, ComponentSourceRef, ComponentSourcesV1,
41 EXT_COMPONENT_SOURCES_V1, EnvId, ExtensionRef, Flow, FlowComponentRef, FlowId, FlowKind,
42 FlowMetadata, InputMapping, Node, NodeId, OutputMapping, Routing, StateKey as StoreStateKey,
43 TeamId, TelemetryHints, TenantCtx as TypesTenantCtx, TenantId, UserId, decode_pack_manifest,
44 pack_manifest::ExtensionInline,
45};
46use host_v1::http_client::{
47 HttpClientError, HttpClientErrorV1_1, HttpClientHost, HttpClientHostV1_1,
48 Request as HttpRequest, RequestOptionsV1_1 as HttpRequestOptionsV1_1,
49 RequestV1_1 as HttpRequestV1_1, Response as HttpResponse, ResponseV1_1 as HttpResponseV1_1,
50 TenantCtx as HttpTenantCtx, TenantCtxV1_1 as HttpTenantCtxV1_1,
51};
52use indexmap::IndexMap;
53use once_cell::sync::Lazy;
54use parking_lot::{Mutex, RwLock};
55use reqwest::blocking::Client as BlockingClient;
56use runner_core::normalize_under_root;
57use serde::{Deserialize, Serialize};
58use serde_cbor;
59use serde_json::{self, Value};
60use sha2::Digest;
61use tokio::fs;
62use wasmparser::{Parser, Payload};
63use wasmtime::StoreContextMut;
64use zip::ZipArchive;
65
66use crate::runner::engine::{FlowContext, FlowEngine, FlowStatus};
67use crate::runner::flow_adapter::{FlowIR, flow_doc_to_ir, flow_ir_to_flow};
68use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
69#[cfg(feature = "fault-injection")]
70use crate::testing::fault_injection::{FaultContext, FaultPoint, maybe_fail};
71
72use crate::config::HostConfig;
73use crate::fault;
74use crate::secrets::{DynSecretsManager, read_secret_blocking};
75use crate::storage::state::STATE_PREFIX;
76use crate::storage::{DynSessionStore, DynStateStore};
77use crate::verify;
78use crate::wasi::RunnerWasiPolicy;
79use tracing::warn;
80use wasmtime_wasi::p2::add_to_linker_sync as add_wasi_to_linker;
81use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
82
83use greentic_flow::model::FlowDoc;
84
/// State held for one loaded pack: where it came from, its manifest data, its
/// components, and the host services handed to component instances.
///
/// NOTE(review): only part of the implementation is visible alongside this
/// definition, so field notes below are limited to what the types and the
/// visible methods establish.
#[allow(dead_code)]
pub struct PackRuntime {
    // NOTE(review): presumably the normalized pack path passed to `load` — confirm.
    path: PathBuf,
    // NOTE(review): presumably the archive the pack was read from, when there is one — confirm.
    archive_path: Option<PathBuf>,
    // Shared host configuration; `allows_state_store` reads `state_store_policy` from it.
    config: Arc<HostConfig>,
    engine: Engine,
    metadata: PackMetadata,
    // At most one of `manifest` / `legacy_manifest` is populated by the loading code
    // visible in `load` (new-format vs. legacy manifest).
    manifest: Option<greentic_types::PackManifest>,
    legacy_manifest: Option<Box<legacy_pack::PackManifest>>,
    // Looked up by component reference in `allows_state_store`.
    component_manifests: HashMap<String, ComponentManifest>,
    mocks: Option<Arc<MockLayer>>,
    flows: Option<PackFlows>,
    components: HashMap<String, PackComponent>,
    http_client: Arc<BlockingClient>,
    // Mutex-guarded map of pre-instantiated components.
    // NOTE(review): key is presumably the component reference — confirm.
    pre_cache: Mutex<HashMap<String, InstancePre<ComponentState>>>,
    session_store: Option<DynSessionStore>,
    // When `None`, `allows_state_store` denies state-store access for every component.
    state_store: Option<DynStateStore>,
    wasi_policy: Arc<RunnerWasiPolicy>,
    provider_registry: RwLock<Option<ProviderRegistry>>,
    secrets: DynSecretsManager,
    oauth_config: Option<OAuthBrokerConfig>,
    cache: CacheManager,
}
110
/// A component belonging to a pack: its name, version and the shared compiled
/// [`Component`] handle.
struct PackComponent {
    #[allow(dead_code)]
    name: String,
    #[allow(dead_code)]
    version: String,
    // Reference-counted so the same compiled component can be shared.
    component: Arc<Component>,
}
118
/// Options controlling how a pack's components are resolved when loading.
///
/// All fields default to empty / `false` via the derived `Default`.
#[derive(Debug, Default, Clone)]
pub struct ComponentResolution {
    /// Directory containing an already materialized pack. When set, `PackRuntime::load`
    /// reads `manifest.cbor` from this directory before looking at any archive.
    pub materialized_root: Option<PathBuf>,
    /// Per-name path overrides.
    /// NOTE(review): presumably keyed by component id/name — confirm against the resolver.
    pub overrides: HashMap<String, PathBuf>,
    /// NOTE(review): presumably disables network access for the distributor client — confirm.
    pub dist_offline: bool,
    /// NOTE(review): presumably the distributor client's cache directory — confirm.
    pub dist_cache_dir: Option<PathBuf>,
    /// NOTE(review): presumably tolerates component sources without a digest — confirm.
    pub allow_missing_hash: bool,
}
132
133fn build_blocking_client() -> BlockingClient {
134 std::thread::spawn(|| {
135 BlockingClient::builder()
136 .no_proxy()
137 .build()
138 .expect("blocking client")
139 })
140 .join()
141 .expect("client build thread panicked")
142}
143
144fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
145 let (root, candidate) = if path.is_absolute() {
146 let parent = path
147 .parent()
148 .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
149 let root = parent
150 .canonicalize()
151 .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
152 let file = path
153 .file_name()
154 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
155 (root, PathBuf::from(file))
156 } else {
157 let cwd = std::env::current_dir().context("failed to resolve current directory")?;
158 let base = if let Some(parent) = path.parent() {
159 cwd.join(parent)
160 } else {
161 cwd
162 };
163 let root = base
164 .canonicalize()
165 .with_context(|| format!("failed to canonicalize {}", base.display()))?;
166 let file = path
167 .file_name()
168 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
169 (root, PathBuf::from(file))
170 };
171 let safe = normalize_under_root(&root, &candidate)?;
172 Ok((root, safe))
173}
174
/// Shared blocking HTTP client, built lazily on first access via
/// `build_blocking_client` and handed out as an `Arc`.
static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
176
/// Serializable summary of a flow contained in a pack.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowDescriptor {
    /// Flow identifier.
    pub id: String,
    /// Flow kind; serialized under the key `type`.
    #[serde(rename = "type")]
    pub flow_type: String,
    /// Identifier of the pack that contains the flow.
    pub pack_id: String,
    /// Profile name. The legacy loaders in this file fill it with the pack id.
    pub profile: String,
    /// Pack version rendered as a string.
    pub version: String,
    /// Optional description; defaults to `None` when absent from the input.
    #[serde(default)]
    pub description: Option<String>,
}
188
/// Host-side state backing the host interfaces implemented in this file
/// (secrets, HTTP, state store, telemetry, runner host helpers).
pub struct HostState {
    #[allow(dead_code)]
    pack_id: String,
    // Policy source: `http_enabled`, `secrets_policy`, tenant defaults.
    config: Arc<HostConfig>,
    http_client: Arc<BlockingClient>,
    // Value of `GREENTIC_ENV` captured at construction, or `local` when unset.
    default_env: String,
    #[allow(dead_code)]
    session_store: Option<DynSessionStore>,
    // When `None`, every state-store call reports `unavailable`.
    state_store: Option<DynStateStore>,
    // Optional mock layer consulted before real secrets / HTTP / telemetry.
    mocks: Option<Arc<MockLayer>>,
    secrets: DynSecretsManager,
    oauth_config: Option<OAuthBrokerConfig>,
    oauth_host: OAuthBrokerHost,
    // Execution context of the current invocation, used as a fallback source
    // of tenant, team, user, flow, node, session and trace identifiers.
    exec_ctx: Option<ComponentExecCtx>,
}
204
205impl HostState {
206 #[allow(clippy::default_constructed_unit_structs)]
207 #[allow(clippy::too_many_arguments)]
208 pub fn new(
209 pack_id: String,
210 config: Arc<HostConfig>,
211 http_client: Arc<BlockingClient>,
212 mocks: Option<Arc<MockLayer>>,
213 session_store: Option<DynSessionStore>,
214 state_store: Option<DynStateStore>,
215 secrets: DynSecretsManager,
216 oauth_config: Option<OAuthBrokerConfig>,
217 exec_ctx: Option<ComponentExecCtx>,
218 ) -> Result<Self> {
219 let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
220 Ok(Self {
221 pack_id,
222 config,
223 http_client,
224 default_env,
225 session_store,
226 state_store,
227 mocks,
228 secrets,
229 oauth_config,
230 oauth_host: OAuthBrokerHost::default(),
231 exec_ctx,
232 })
233 }
234
235 pub fn get_secret(&self, key: &str) -> Result<String> {
236 if provider_core_only::is_enabled() {
237 bail!(provider_core_only::blocked_message("secrets"))
238 }
239 if !self.config.secrets_policy.is_allowed(key) {
240 bail!("secret {key} is not permitted by bindings policy");
241 }
242 if let Some(mock) = &self.mocks
243 && let Some(value) = mock.secrets_lookup(key)
244 {
245 return Ok(value);
246 }
247 let ctx = self.config.tenant_ctx();
248 let bytes = read_secret_blocking(&self.secrets, &ctx, key)
249 .context("failed to read secret from manager")?;
250 let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
251 Ok(value)
252 }
253
254 fn tenant_ctx_from_v1(&self, ctx: Option<StateTenantCtx>) -> Result<TypesTenantCtx> {
255 let tenant_raw = ctx
256 .as_ref()
257 .map(|ctx| ctx.tenant.clone())
258 .or_else(|| self.exec_ctx.as_ref().map(|ctx| ctx.tenant.tenant.clone()))
259 .unwrap_or_else(|| self.config.tenant.clone());
260 let env_raw = ctx
261 .as_ref()
262 .map(|ctx| ctx.env.clone())
263 .unwrap_or_else(|| self.default_env.clone());
264 let tenant_id = TenantId::from_str(&tenant_raw)
265 .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
266 let env_id = EnvId::from_str(&env_raw)
267 .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
268 let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
269 if let Some(exec_ctx) = self.exec_ctx.as_ref() {
270 if let Some(team) = exec_ctx.tenant.team.as_ref() {
271 let team_id =
272 TeamId::from_str(team).with_context(|| format!("invalid team id `{team}`"))?;
273 tenant_ctx = tenant_ctx.with_team(Some(team_id));
274 }
275 if let Some(user) = exec_ctx.tenant.user.as_ref() {
276 let user_id =
277 UserId::from_str(user).with_context(|| format!("invalid user id `{user}`"))?;
278 tenant_ctx = tenant_ctx.with_user(Some(user_id));
279 }
280 tenant_ctx = tenant_ctx.with_flow(exec_ctx.flow_id.clone());
281 if let Some(node) = exec_ctx.node_id.as_ref() {
282 tenant_ctx = tenant_ctx.with_node(node.clone());
283 }
284 if let Some(session) = exec_ctx.tenant.correlation_id.as_ref() {
285 tenant_ctx = tenant_ctx.with_session(session.clone());
286 }
287 tenant_ctx.trace_id = exec_ctx.tenant.trace_id.clone();
288 }
289
290 if let Some(ctx) = ctx {
291 if let Some(team) = ctx.team.or(ctx.team_id) {
292 let team_id =
293 TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
294 tenant_ctx = tenant_ctx.with_team(Some(team_id));
295 }
296 if let Some(user) = ctx.user.or(ctx.user_id) {
297 let user_id =
298 UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
299 tenant_ctx = tenant_ctx.with_user(Some(user_id));
300 }
301 if let Some(flow) = ctx.flow_id {
302 tenant_ctx = tenant_ctx.with_flow(flow);
303 }
304 if let Some(node) = ctx.node_id {
305 tenant_ctx = tenant_ctx.with_node(node);
306 }
307 if let Some(provider) = ctx.provider_id {
308 tenant_ctx = tenant_ctx.with_provider(provider);
309 }
310 if let Some(session) = ctx.session_id {
311 tenant_ctx = tenant_ctx.with_session(session);
312 }
313 tenant_ctx.trace_id = ctx.trace_id;
314 }
315 Ok(tenant_ctx)
316 }
317
318 fn send_http_request(
319 &mut self,
320 req: HttpRequest,
321 opts: Option<HttpRequestOptionsV1_1>,
322 _ctx: Option<HttpTenantCtx>,
323 ) -> Result<HttpResponse, HttpClientError> {
324 if !self.config.http_enabled {
325 return Err(HttpClientError {
326 code: "denied".into(),
327 message: "http client disabled by policy".into(),
328 });
329 }
330
331 let mut mock_state = None;
332 let raw_body = req.body.clone();
333 if let Some(mock) = &self.mocks
334 && let Ok(meta) = HttpMockRequest::new(&req.method, &req.url, raw_body.as_deref())
335 {
336 match mock.http_begin(&meta) {
337 HttpDecision::Mock(response) => {
338 let headers = response
339 .headers
340 .iter()
341 .map(|(k, v)| (k.clone(), v.clone()))
342 .collect();
343 return Ok(HttpResponse {
344 status: response.status,
345 headers,
346 body: response.body.clone().map(|b| b.into_bytes()),
347 });
348 }
349 HttpDecision::Deny(reason) => {
350 return Err(HttpClientError {
351 code: "denied".into(),
352 message: reason,
353 });
354 }
355 HttpDecision::Passthrough { record } => {
356 mock_state = Some((meta, record));
357 }
358 }
359 }
360
361 let method = req.method.parse().unwrap_or(reqwest::Method::GET);
362 let mut builder = self.http_client.request(method, &req.url);
363 for (key, value) in req.headers {
364 if let Ok(header) = reqwest::header::HeaderName::from_bytes(key.as_bytes())
365 && let Ok(header_value) = reqwest::header::HeaderValue::from_str(&value)
366 {
367 builder = builder.header(header, header_value);
368 }
369 }
370
371 if let Some(body) = raw_body.clone() {
372 builder = builder.body(body);
373 }
374
375 if let Some(opts) = opts {
376 if let Some(timeout_ms) = opts.timeout_ms {
377 builder = builder.timeout(Duration::from_millis(timeout_ms as u64));
378 }
379 if opts.allow_insecure == Some(true) {
380 warn!(url = %req.url, "allow-insecure not supported; using default TLS validation");
381 }
382 if let Some(follow_redirects) = opts.follow_redirects
383 && !follow_redirects
384 {
385 warn!(url = %req.url, "follow-redirects=false not supported; using default client behaviour");
386 }
387 }
388
389 let response = match builder.send() {
390 Ok(resp) => resp,
391 Err(err) => {
392 warn!(url = %req.url, error = %err, "http client request failed");
393 return Err(HttpClientError {
394 code: "unavailable".into(),
395 message: err.to_string(),
396 });
397 }
398 };
399
400 let status = response.status().as_u16();
401 let headers_vec = response
402 .headers()
403 .iter()
404 .map(|(k, v)| {
405 (
406 k.as_str().to_string(),
407 v.to_str().unwrap_or_default().to_string(),
408 )
409 })
410 .collect::<Vec<_>>();
411 let body_bytes = response.bytes().ok().map(|b| b.to_vec());
412
413 if let Some((meta, true)) = mock_state.take()
414 && let Some(mock) = &self.mocks
415 {
416 let recorded = HttpMockResponse::new(
417 status,
418 headers_vec.clone().into_iter().collect(),
419 body_bytes
420 .as_ref()
421 .map(|b| String::from_utf8_lossy(b).into_owned()),
422 );
423 mock.http_record(&meta, &recorded);
424 }
425
426 Ok(HttpResponse {
427 status,
428 headers: headers_vec,
429 body: body_bytes,
430 })
431 }
432}
433
434impl SecretsStoreHost for HostState {
435 fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsError> {
436 if provider_core_only::is_enabled() {
437 warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
438 return Err(SecretsError::Denied);
439 }
440 if !self.config.secrets_policy.is_allowed(&key) {
441 return Err(SecretsError::Denied);
442 }
443 if let Some(mock) = &self.mocks
444 && let Some(value) = mock.secrets_lookup(&key)
445 {
446 return Ok(Some(value.into_bytes()));
447 }
448 let ctx = self.config.tenant_ctx();
449 match read_secret_blocking(&self.secrets, &ctx, &key) {
450 Ok(bytes) => Ok(Some(bytes)),
451 Err(err) => {
452 warn!(secret = %key, error = %err, "secret lookup failed");
453 Err(SecretsError::NotFound)
454 }
455 }
456 }
457}
458
impl HttpClientHost for HostState {
    /// Sends `req` through `send_http_request` with no request options.
    fn send(
        &mut self,
        req: HttpRequest,
        ctx: Option<HttpTenantCtx>,
    ) -> Result<HttpResponse, HttpClientError> {
        self.send_http_request(req, None, ctx)
    }
}
468
469impl HttpClientHostV1_1 for HostState {
470 fn send(
471 &mut self,
472 req: HttpRequestV1_1,
473 opts: Option<HttpRequestOptionsV1_1>,
474 ctx: Option<HttpTenantCtxV1_1>,
475 ) -> Result<HttpResponseV1_1, HttpClientErrorV1_1> {
476 let legacy_req = HttpRequest {
477 method: req.method,
478 url: req.url,
479 headers: req.headers,
480 body: req.body,
481 };
482 let legacy_ctx = ctx.map(|ctx| HttpTenantCtx {
483 env: ctx.env,
484 tenant: ctx.tenant,
485 tenant_id: ctx.tenant_id,
486 team: ctx.team,
487 team_id: ctx.team_id,
488 user: ctx.user,
489 user_id: ctx.user_id,
490 trace_id: ctx.trace_id,
491 correlation_id: ctx.correlation_id,
492 attributes: ctx.attributes,
493 session_id: ctx.session_id,
494 flow_id: ctx.flow_id,
495 node_id: ctx.node_id,
496 provider_id: ctx.provider_id,
497 deadline_ms: ctx.deadline_ms,
498 attempt: ctx.attempt,
499 idempotency_key: ctx.idempotency_key,
500 impersonation: ctx
501 .impersonation
502 .map(|ImpersonationV1_1 { actor_id, reason }| ImpersonationV1_0 {
503 actor_id,
504 reason,
505 }),
506 });
507
508 self.send_http_request(legacy_req, opts, legacy_ctx)
509 .map(|resp| HttpResponseV1_1 {
510 status: resp.status,
511 headers: resp.headers,
512 body: resp.body,
513 })
514 .map_err(|err| HttpClientErrorV1_1 {
515 code: err.code,
516 message: err.message,
517 })
518 }
519}
520
521impl StateStoreHost for HostState {
522 fn read(
523 &mut self,
524 key: HostStateKey,
525 ctx: Option<StateTenantCtx>,
526 ) -> Result<Vec<u8>, StateError> {
527 let store = match self.state_store.as_ref() {
528 Some(store) => store.clone(),
529 None => {
530 return Err(StateError {
531 code: "unavailable".into(),
532 message: "state store not configured".into(),
533 });
534 }
535 };
536 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
537 Ok(ctx) => ctx,
538 Err(err) => {
539 return Err(StateError {
540 code: "invalid-ctx".into(),
541 message: err.to_string(),
542 });
543 }
544 };
545 #[cfg(feature = "fault-injection")]
546 {
547 let exec_ctx = self.exec_ctx.as_ref();
548 let flow_id = exec_ctx
549 .map(|ctx| ctx.flow_id.as_str())
550 .unwrap_or("unknown");
551 let node_id = exec_ctx.and_then(|ctx| ctx.node_id.as_deref());
552 let attempt = exec_ctx.map(|ctx| ctx.tenant.attempt).unwrap_or(1);
553 let fault_ctx = FaultContext {
554 pack_id: self.pack_id.as_str(),
555 flow_id,
556 node_id,
557 attempt,
558 };
559 if let Err(err) = maybe_fail(FaultPoint::StateRead, fault_ctx) {
560 return Err(StateError {
561 code: "internal".into(),
562 message: err.to_string(),
563 });
564 }
565 }
566 let key = StoreStateKey::from(key);
567 match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
568 Ok(Some(value)) => Ok(serde_json::to_vec(&value).unwrap_or_else(|_| Vec::new())),
569 Ok(None) => Err(StateError {
570 code: "not_found".into(),
571 message: "state key not found".into(),
572 }),
573 Err(err) => Err(StateError {
574 code: "internal".into(),
575 message: err.to_string(),
576 }),
577 }
578 }
579
580 fn write(
581 &mut self,
582 key: HostStateKey,
583 bytes: Vec<u8>,
584 ctx: Option<StateTenantCtx>,
585 ) -> Result<StateOpAck, StateError> {
586 let store = match self.state_store.as_ref() {
587 Some(store) => store.clone(),
588 None => {
589 return Err(StateError {
590 code: "unavailable".into(),
591 message: "state store not configured".into(),
592 });
593 }
594 };
595 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
596 Ok(ctx) => ctx,
597 Err(err) => {
598 return Err(StateError {
599 code: "invalid-ctx".into(),
600 message: err.to_string(),
601 });
602 }
603 };
604 #[cfg(feature = "fault-injection")]
605 {
606 let exec_ctx = self.exec_ctx.as_ref();
607 let flow_id = exec_ctx
608 .map(|ctx| ctx.flow_id.as_str())
609 .unwrap_or("unknown");
610 let node_id = exec_ctx.and_then(|ctx| ctx.node_id.as_deref());
611 let attempt = exec_ctx.map(|ctx| ctx.tenant.attempt).unwrap_or(1);
612 let fault_ctx = FaultContext {
613 pack_id: self.pack_id.as_str(),
614 flow_id,
615 node_id,
616 attempt,
617 };
618 if let Err(err) = maybe_fail(FaultPoint::StateWrite, fault_ctx) {
619 return Err(StateError {
620 code: "internal".into(),
621 message: err.to_string(),
622 });
623 }
624 }
625 let key = StoreStateKey::from(key);
626 let value = serde_json::from_slice(&bytes)
627 .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()));
628 match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
629 Ok(()) => Ok(StateOpAck::Ok),
630 Err(err) => Err(StateError {
631 code: "internal".into(),
632 message: err.to_string(),
633 }),
634 }
635 }
636
637 fn delete(
638 &mut self,
639 key: HostStateKey,
640 ctx: Option<StateTenantCtx>,
641 ) -> Result<StateOpAck, StateError> {
642 let store = match self.state_store.as_ref() {
643 Some(store) => store.clone(),
644 None => {
645 return Err(StateError {
646 code: "unavailable".into(),
647 message: "state store not configured".into(),
648 });
649 }
650 };
651 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
652 Ok(ctx) => ctx,
653 Err(err) => {
654 return Err(StateError {
655 code: "invalid-ctx".into(),
656 message: err.to_string(),
657 });
658 }
659 };
660 let key = StoreStateKey::from(key);
661 match store.del(&tenant_ctx, STATE_PREFIX, &key) {
662 Ok(_) => Ok(StateOpAck::Ok),
663 Err(err) => Err(StateError {
664 code: "internal".into(),
665 message: err.to_string(),
666 }),
667 }
668 }
669}
670
671impl TelemetryLoggerHost for HostState {
672 fn log(
673 &mut self,
674 span: TelemetrySpanContext,
675 fields: Vec<(String, String)>,
676 _ctx: Option<TelemetryTenantCtx>,
677 ) -> Result<TelemetryAck, TelemetryError> {
678 if let Some(mock) = &self.mocks
679 && mock.telemetry_drain(&[("span_json", span.flow_id.as_str())])
680 {
681 return Ok(TelemetryAck::Ok);
682 }
683 let mut map = serde_json::Map::new();
684 for (k, v) in fields {
685 map.insert(k, Value::String(v));
686 }
687 tracing::info!(
688 tenant = %span.tenant,
689 flow_id = %span.flow_id,
690 node = ?span.node_id,
691 provider = %span.provider,
692 fields = %serde_json::Value::Object(map.clone()),
693 "telemetry log from pack"
694 );
695 Ok(TelemetryAck::Ok)
696 }
697}
698
699impl RunnerHostHttp for HostState {
700 fn request(
701 &mut self,
702 method: String,
703 url: String,
704 headers: Vec<String>,
705 body: Option<Vec<u8>>,
706 ) -> Result<Vec<u8>, String> {
707 let req = HttpRequest {
708 method,
709 url,
710 headers: headers
711 .chunks(2)
712 .filter_map(|chunk| {
713 if chunk.len() == 2 {
714 Some((chunk[0].clone(), chunk[1].clone()))
715 } else {
716 None
717 }
718 })
719 .collect(),
720 body,
721 };
722 match HttpClientHost::send(self, req, None) {
723 Ok(resp) => Ok(resp.body.unwrap_or_default()),
724 Err(err) => Err(err.message),
725 }
726 }
727}
728
/// Stub key/value host: nothing is stored.
impl RunnerHostKv for HostState {
    /// Always returns `None`; the namespace and key are ignored.
    fn get(&mut self, _ns: String, _key: String) -> Option<String> {
        None
    }

    /// Discards the value; the call has no effect.
    fn put(&mut self, _ns: String, _key: String, _val: String) {}
}
736
/// Result of decoding `manifest.cbor`, together with the flows derived from it.
enum ManifestLoad {
    /// The manifest decoded with `decode_pack_manifest`.
    New {
        manifest: Box<greentic_types::PackManifest>,
        flows: PackFlows,
    },
    /// The manifest only decoded as a legacy pack manifest.
    Legacy {
        manifest: Box<legacy_pack::PackManifest>,
        flows: PackFlows,
    },
}
747
748fn load_manifest_and_flows(path: &Path) -> Result<ManifestLoad> {
749 let mut archive = ZipArchive::new(File::open(path)?)
750 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
751 let bytes = read_entry(&mut archive, "manifest.cbor")
752 .with_context(|| format!("missing manifest.cbor in {}", path.display()))?;
753 match decode_pack_manifest(&bytes) {
754 Ok(manifest) => {
755 let cache = PackFlows::from_manifest(manifest.clone());
756 Ok(ManifestLoad::New {
757 manifest: Box::new(manifest),
758 flows: cache,
759 })
760 }
761 Err(err) => {
762 tracing::debug!(error = %err, pack = %path.display(), "decode_pack_manifest failed; trying legacy manifest");
763 let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
765 .context("failed to decode legacy pack manifest from manifest.cbor")?;
766 let flows = load_legacy_flows(&mut archive, &legacy)?;
767 Ok(ManifestLoad::Legacy {
768 manifest: Box::new(legacy),
769 flows,
770 })
771 }
772 }
773}
774
775fn load_manifest_and_flows_from_dir(root: &Path) -> Result<ManifestLoad> {
776 let manifest_path = root.join("manifest.cbor");
777 let bytes = std::fs::read(&manifest_path)
778 .with_context(|| format!("missing manifest.cbor in {}", root.display()))?;
779 match decode_pack_manifest(&bytes) {
780 Ok(manifest) => {
781 let cache = PackFlows::from_manifest(manifest.clone());
782 Ok(ManifestLoad::New {
783 manifest: Box::new(manifest),
784 flows: cache,
785 })
786 }
787 Err(err) => {
788 tracing::debug!(
789 error = %err,
790 pack = %root.display(),
791 "decode_pack_manifest failed for materialized pack; trying legacy manifest"
792 );
793 let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
794 .context("failed to decode legacy pack manifest from manifest.cbor")?;
795 let flows = load_legacy_flows_from_dir(root, &legacy)?;
796 Ok(ManifestLoad::Legacy {
797 manifest: Box::new(legacy),
798 flows,
799 })
800 }
801 }
802}
803
804fn load_legacy_flows(
805 archive: &mut ZipArchive<File>,
806 manifest: &legacy_pack::PackManifest,
807) -> Result<PackFlows> {
808 let mut flows = HashMap::new();
809 let mut descriptors = Vec::new();
810
811 for entry in &manifest.flows {
812 let bytes = read_entry(archive, &entry.file_json)
813 .with_context(|| format!("missing flow json {}", entry.file_json))?;
814 let doc: FlowDoc = serde_json::from_slice(&bytes)
815 .with_context(|| format!("failed to decode flow doc {}", entry.file_json))?;
816 let normalized = normalize_flow_doc(doc);
817 let flow_ir = flow_doc_to_ir(normalized)?;
818 let flow = flow_ir_to_flow(flow_ir)?;
819
820 descriptors.push(FlowDescriptor {
821 id: entry.id.clone(),
822 flow_type: entry.kind.clone(),
823 pack_id: manifest.meta.pack_id.clone(),
824 profile: manifest.meta.pack_id.clone(),
825 version: manifest.meta.version.to_string(),
826 description: None,
827 });
828 flows.insert(entry.id.clone(), flow);
829 }
830
831 let mut entry_flows = manifest.meta.entry_flows.clone();
832 if entry_flows.is_empty() {
833 entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
834 }
835 let metadata = PackMetadata {
836 pack_id: manifest.meta.pack_id.clone(),
837 version: manifest.meta.version.to_string(),
838 entry_flows,
839 secret_requirements: Vec::new(),
840 };
841
842 Ok(PackFlows {
843 descriptors,
844 flows,
845 metadata,
846 })
847}
848
849fn load_legacy_flows_from_dir(
850 root: &Path,
851 manifest: &legacy_pack::PackManifest,
852) -> Result<PackFlows> {
853 let mut flows = HashMap::new();
854 let mut descriptors = Vec::new();
855
856 for entry in &manifest.flows {
857 let path = root.join(&entry.file_json);
858 let bytes = std::fs::read(&path)
859 .with_context(|| format!("missing flow json {}", path.display()))?;
860 let doc: FlowDoc = serde_json::from_slice(&bytes)
861 .with_context(|| format!("failed to decode flow doc {}", path.display()))?;
862 let normalized = normalize_flow_doc(doc);
863 let flow_ir = flow_doc_to_ir(normalized)?;
864 let flow = flow_ir_to_flow(flow_ir)?;
865
866 descriptors.push(FlowDescriptor {
867 id: entry.id.clone(),
868 flow_type: entry.kind.clone(),
869 pack_id: manifest.meta.pack_id.clone(),
870 profile: manifest.meta.pack_id.clone(),
871 version: manifest.meta.version.to_string(),
872 description: None,
873 });
874 flows.insert(entry.id.clone(), flow);
875 }
876
877 let mut entry_flows = manifest.meta.entry_flows.clone();
878 if entry_flows.is_empty() {
879 entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
880 }
881 let metadata = PackMetadata {
882 pack_id: manifest.meta.pack_id.clone(),
883 version: manifest.meta.version.to_string(),
884 entry_flows,
885 secret_requirements: Vec::new(),
886 };
887
888 Ok(PackFlows {
889 descriptors,
890 flows,
891 metadata,
892 })
893}
894
/// Store data for a component instance: the host state plus the WASI context
/// and resource table exposed through `WasiView`.
pub struct ComponentState {
    /// Host services backing the host interfaces.
    pub host: HostState,
    wasi_ctx: WasiCtx,
    resource_table: ResourceTable,
}
900
901impl ComponentState {
902 pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
903 let wasi_ctx = policy
904 .instantiate()
905 .context("failed to build WASI context")?;
906 Ok(Self {
907 host,
908 wasi_ctx,
909 resource_table: ResourceTable::new(),
910 })
911 }
912
913 fn host_mut(&mut self) -> &mut HostState {
914 &mut self.host
915 }
916
917 fn should_cancel_host(&mut self) -> bool {
918 false
919 }
920
921 fn yield_now_host(&mut self) {
922 }
924}
925
/// `control` interface (component API v0.4), delegating to the shared host hooks.
impl component_api::v0_4::greentic::component::control::Host for ComponentState {
    /// Delegates to `should_cancel_host`.
    fn should_cancel(&mut self) -> bool {
        self.should_cancel_host()
    }

    /// Delegates to `yield_now_host`.
    fn yield_now(&mut self) {
        self.yield_now_host();
    }
}
935
/// `control` interface (component API v0.5), delegating to the shared host hooks.
impl component_api::v0_5::greentic::component::control::Host for ComponentState {
    /// Delegates to `should_cancel_host`.
    fn should_cancel(&mut self) -> bool {
        self.should_cancel_host()
    }

    /// Delegates to `yield_now_host`.
    fn yield_now(&mut self) {
        self.yield_now_host();
    }
}
945
946fn add_component_control_instance(
947 linker: &mut Linker<ComponentState>,
948 name: &str,
949) -> wasmtime::Result<()> {
950 let mut inst = linker.instance(name)?;
951 inst.func_wrap(
952 "should-cancel",
953 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
954 let host = caller.data_mut();
955 Ok((host.should_cancel_host(),))
956 },
957 )?;
958 inst.func_wrap(
959 "yield-now",
960 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
961 let host = caller.data_mut();
962 host.yield_now_host();
963 Ok(())
964 },
965 )?;
966 Ok(())
967}
968
969fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
970 add_component_control_instance(linker, "greentic:component/control@0.5.0")?;
971 add_component_control_instance(linker, "greentic:component/control@0.4.0")?;
972 Ok(())
973}
974
/// Registers WASI and the v1 host interfaces on `linker`.
///
/// Every host interface is served by the `HostState` inside `ComponentState`.
/// The OAuth broker is not registered here, and the state store is only
/// registered when `allow_state_store` is true.
///
/// # Errors
/// Propagates any error from adding WASI or the host interfaces to the linker.
pub fn register_all(linker: &mut Linker<ComponentState>, allow_state_store: bool) -> Result<()> {
    add_wasi_to_linker(linker)?;
    add_all_v1_to_linker(
        linker,
        HostFns {
            http_client_v1_1: Some(|state| state.host_mut()),
            http_client: Some(|state| state.host_mut()),
            oauth_broker: None,
            runner_host_http: Some(|state| state.host_mut()),
            runner_host_kv: Some(|state| state.host_mut()),
            telemetry_logger: Some(|state| state.host_mut()),
            // `None` when state-store access is not allowed.
            state_store: allow_state_store.then_some(|state| state.host_mut()),
            secrets_store: Some(|state| state.host_mut()),
        },
    )?;
    Ok(())
}
992
/// Exposes the OAuth-related parts of the host state.
impl OAuthHostContext for ComponentState {
    /// Tenant from the host configuration.
    fn tenant_id(&self) -> &str {
        &self.host.config.tenant
    }

    /// Default environment captured when the host state was created.
    fn env(&self) -> &str {
        &self.host.default_env
    }

    /// Mutable access to the OAuth broker host.
    fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
        &mut self.host.oauth_host
    }

    /// OAuth broker configuration, if one was provided.
    fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
        self.host.oauth_config.as_ref()
    }
}
1010
impl WasiView for ComponentState {
    /// Returns a view over this state's WASI context and resource table.
    fn ctx(&mut self) -> WasiCtxView<'_> {
        WasiCtxView {
            ctx: &mut self.wasi_ctx,
            table: &mut self.resource_table,
        }
    }
}
1019
// SAFETY: NOTE(review): no justification is recorded here. These impls assert
// that `ComponentState` (and therefore `HostState`, `WasiCtx` and
// `ResourceTable`) may be moved to and shared between threads. TODO: confirm
// which field is not automatically `Send`/`Sync` and document the invariant
// that makes this sound.
#[allow(unsafe_code)]
unsafe impl Send for ComponentState {}
// SAFETY: see the note on the `Send` impl directly above — TODO confirm.
#[allow(unsafe_code)]
unsafe impl Sync for ComponentState {}
1024
1025impl PackRuntime {
1026 fn allows_state_store(&self, component_ref: &str) -> bool {
1027 if self.state_store.is_none() {
1028 return false;
1029 }
1030 if !self.config.state_store_policy.allow {
1031 return false;
1032 }
1033 let Some(manifest) = self.component_manifests.get(component_ref) else {
1034 return false;
1035 };
1036 manifest
1037 .capabilities
1038 .host
1039 .state
1040 .as_ref()
1041 .map(|caps| caps.read || caps.write)
1042 .unwrap_or(false)
1043 }
1044
    /// Loads a pack from `path` and assembles a [`PackRuntime`] for it.
    ///
    /// `path` is classified as a directory, a single `.wasm` component (by file extension,
    /// case-insensitive), or anything else, which is treated as an archive. The manifest and
    /// flows are read from the materialized root first (`component_resolution.materialized_root`
    /// or, for a directory pack, the directory itself) and, if that yields nothing, from the
    /// archive. Manifest parse failures are logged and tolerated; the pack then keeps the
    /// fallback metadata and has no flows.
    ///
    /// Component sources are consulted in this order: the override map, the component-sources
    /// table (built from a `pack.lock` when one is found, otherwise from the manifest
    /// extension), the materialized directory, and finally the archive.
    ///
    /// # Errors
    ///
    /// Returns an error when a path cannot be normalized, when archive verification fails
    /// (only attempted if `verify_archive` is set and the archive hint is an existing file),
    /// when the lock file or component-sources extension is invalid, when any component
    /// loader fails, or when required components are still missing after every source was
    /// searched.
    #[allow(clippy::too_many_arguments)]
    pub async fn load(
        path: impl AsRef<Path>,
        config: Arc<HostConfig>,
        mocks: Option<Arc<MockLayer>>,
        archive_source: Option<&Path>,
        session_store: Option<DynSessionStore>,
        state_store: Option<DynStateStore>,
        wasi_policy: Arc<RunnerWasiPolicy>,
        secrets: DynSecretsManager,
        oauth_config: Option<OAuthBrokerConfig>,
        verify_archive: bool,
        component_resolution: ComponentResolution,
    ) -> Result<Self> {
        let path = path.as_ref();
        let (_pack_root, safe_path) = normalize_pack_path(path)?;
        // A path whose metadata cannot be read is treated as "not a directory".
        let path_meta = std::fs::metadata(&safe_path).ok();
        let is_dir = path_meta
            .as_ref()
            .map(|meta| meta.is_dir())
            .unwrap_or(false);
        let is_component = !is_dir
            && safe_path
                .extension()
                .and_then(|ext| ext.to_str())
                .map(|ext| ext.eq_ignore_ascii_case("wasm"))
                .unwrap_or(false);
        // An explicit archive source wins; otherwise the pack path itself is the archive
        // unless it is a directory or a bare component.
        let archive_hint_path = if let Some(source) = archive_source {
            let (_, normalized) = normalize_pack_path(source)?;
            Some(normalized)
        } else if is_component || is_dir {
            None
        } else {
            Some(safe_path.clone())
        };
        let archive_hint = archive_hint_path.as_deref();
        if verify_archive {
            // Only an archive hint that exists as a regular file is verified.
            if let Some(verify_target) = archive_hint.and_then(|p| {
                std::fs::metadata(p)
                    .ok()
                    .filter(|meta| meta.is_file())
                    .map(|_| p)
            }) {
                verify::verify_pack(verify_target).await?;
                tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
            } else {
                tracing::debug!("skipping archive verification (no archive source)");
            }
        }
        let engine = Engine::default();
        let engine_profile =
            EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
        let cache = CacheManager::new(CacheConfig::default(), engine_profile);
        // Start from fallback metadata; it is replaced once a manifest (or the component
        // binary itself) provides better information.
        let mut metadata = PackMetadata::fallback(&safe_path);
        let mut manifest = None;
        let mut legacy_manifest: Option<Box<legacy_pack::PackManifest>> = None;
        let mut flows = None;
        let materialized_root = component_resolution.materialized_root.clone().or_else(|| {
            if is_dir {
                Some(safe_path.clone())
            } else {
                None
            }
        });

        // First attempt: read manifest and flows from the materialized directory.
        // Note: `cache` inside the match arms shadows the `CacheManager` above and names
        // the loaded flows.
        if let Some(root) = materialized_root.as_ref() {
            match load_manifest_and_flows_from_dir(root) {
                Ok(ManifestLoad::New {
                    manifest: m,
                    flows: cache,
                }) => {
                    metadata = cache.metadata.clone();
                    manifest = Some(*m);
                    flows = Some(cache);
                }
                Ok(ManifestLoad::Legacy {
                    manifest: m,
                    flows: cache,
                }) => {
                    metadata = cache.metadata.clone();
                    legacy_manifest = Some(m);
                    flows = Some(cache);
                }
                Err(err) => {
                    warn!(error = %err, pack = %root.display(), "failed to parse materialized pack manifest");
                }
            }
        }

        // Second attempt: fall back to the archive when no manifest was found above.
        if manifest.is_none()
            && legacy_manifest.is_none()
            && let Some(archive_path) = archive_hint
        {
            match load_manifest_and_flows(archive_path) {
                Ok(ManifestLoad::New {
                    manifest: m,
                    flows: cache,
                }) => {
                    metadata = cache.metadata.clone();
                    manifest = Some(*m);
                    flows = Some(cache);
                }
                Ok(ManifestLoad::Legacy {
                    manifest: m,
                    flows: cache,
                }) => {
                    metadata = cache.metadata.clone();
                    legacy_manifest = Some(m);
                    flows = Some(cache);
                }
                Err(err) => {
                    warn!(error = %err, pack = %archive_path.display(), "failed to parse pack manifest; skipping flows");
                }
            }
        }
        // Only compiled with the `fault-injection` feature: gives `maybe_fail` a chance to
        // return an error at the pack-resolve point.
        #[cfg(feature = "fault-injection")]
        {
            let fault_ctx = FaultContext {
                pack_id: metadata.pack_id.as_str(),
                flow_id: "unknown",
                node_id: None,
                attempt: 1,
            };
            maybe_fail(FaultPoint::PackResolve, fault_ctx)
                .map_err(|err| anyhow!(err.to_string()))?;
        }
        // Use the first candidate root that actually contains a lock file.
        let mut pack_lock = None;
        for root in find_pack_lock_roots(&safe_path, is_dir, archive_hint) {
            pack_lock = load_pack_lock(&root)?;
            if pack_lock.is_some() {
                break;
            }
        }
        // The manifest's component-sources extension is only consulted when no lock file
        // was found.
        let component_sources_payload = if pack_lock.is_none() {
            if let Some(manifest) = manifest.as_ref() {
                manifest
                    .get_component_sources_v1()
                    .context("invalid component sources extension")?
            } else {
                None
            }
        } else {
            None
        };
        let component_sources = if let Some(lock) = pack_lock.as_ref() {
            Some(component_sources_table_from_pack_lock(
                lock,
                component_resolution.allow_missing_hash,
            )?)
        } else {
            component_sources_table(component_sources_payload.as_ref())?
        };
        let components = if is_component {
            // Single-component pack: compile the file itself and register it under its
            // file stem.
            let wasm_bytes = fs::read(&safe_path).await?;
            metadata = PackMetadata::from_wasm(&wasm_bytes)
                .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
            let name = safe_path
                .file_stem()
                .map(|s| s.to_string_lossy().to_string())
                .unwrap_or_else(|| "component".to_string());
            let component = compile_component_with_cache(&cache, &engine, None, wasm_bytes).await?;
            let mut map = HashMap::new();
            map.insert(
                name.clone(),
                PackComponent {
                    name,
                    version: metadata.version.clone(),
                    component,
                },
            );
            map
        } else {
            let specs = component_specs(
                manifest.as_ref(),
                legacy_manifest.as_deref(),
                component_sources_payload.as_ref(),
                pack_lock.as_ref(),
            );
            if specs.is_empty() {
                HashMap::new()
            } else {
                let mut loaded = HashMap::new();
                // Every spec starts out missing; each loader below is handed `missing` and
                // `loaded` to update.
                let mut missing: HashSet<String> =
                    specs.iter().map(|spec| spec.id.clone()).collect();
                // Human-readable list of the places searched, used in the error below.
                let mut searched = Vec::new();

                if !component_resolution.overrides.is_empty() {
                    load_components_from_overrides(
                        &cache,
                        &engine,
                        &component_resolution.overrides,
                        &specs,
                        &mut missing,
                        &mut loaded,
                    )
                    .await?;
                    searched.push("override map".to_string());
                }

                if let Some(component_sources) = component_sources.as_ref() {
                    load_components_from_sources(
                        &cache,
                        &engine,
                        component_sources,
                        &component_resolution,
                        &specs,
                        &mut missing,
                        &mut loaded,
                        materialized_root.as_deref(),
                        archive_hint,
                    )
                    .await?;
                    searched.push(format!("extension {}", EXT_COMPONENT_SOURCES_V1));
                }

                if let Some(root) = materialized_root.as_ref() {
                    load_components_from_dir(
                        &cache,
                        &engine,
                        root,
                        &specs,
                        &mut missing,
                        &mut loaded,
                    )
                    .await?;
                    searched.push(format!("components dir {}", root.display()));
                }

                if let Some(archive_path) = archive_hint {
                    load_components_from_archive(
                        &cache,
                        &engine,
                        archive_path,
                        &specs,
                        &mut missing,
                        &mut loaded,
                    )
                    .await?;
                    searched.push(format!("archive {}", archive_path.display()));
                }

                // Anything still missing after all sources were tried is a hard error.
                if !missing.is_empty() {
                    let missing_list = missing.into_iter().collect::<Vec<_>>().join(", ");
                    let sources = if searched.is_empty() {
                        "no component sources".to_string()
                    } else {
                        searched.join(", ")
                    };
                    bail!(
                        "components missing: {}; looked in {}",
                        missing_list,
                        sources
                    );
                }

                loaded
            }
        };
        let http_client = Arc::clone(&HTTP_CLIENT);
        // Index component manifests by id; only the new-format manifest provides them.
        let mut component_manifests = HashMap::new();
        if let Some(manifest) = manifest.as_ref() {
            for component in &manifest.components {
                component_manifests.insert(component.id.as_str().to_string(), component.clone());
            }
        }
        Ok(Self {
            path: safe_path,
            archive_path: archive_hint.map(Path::to_path_buf),
            config,
            engine,
            metadata,
            manifest,
            legacy_manifest,
            component_manifests,
            mocks,
            flows,
            components,
            http_client,
            pre_cache: Mutex::new(HashMap::new()),
            session_store,
            state_store,
            wasi_policy,
            provider_registry: RwLock::new(None),
            secrets,
            oauth_config,
            cache,
        })
    }
1333
1334 pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
1335 if let Some(cache) = &self.flows {
1336 return Ok(cache.descriptors.clone());
1337 }
1338 if let Some(manifest) = &self.manifest {
1339 let descriptors = manifest
1340 .flows
1341 .iter()
1342 .map(|flow| FlowDescriptor {
1343 id: flow.id.as_str().to_string(),
1344 flow_type: flow_kind_to_str(flow.kind).to_string(),
1345 pack_id: manifest.pack_id.as_str().to_string(),
1346 profile: manifest.pack_id.as_str().to_string(),
1347 version: manifest.version.to_string(),
1348 description: None,
1349 })
1350 .collect();
1351 return Ok(descriptors);
1352 }
1353 Ok(Vec::new())
1354 }
1355
1356 #[allow(dead_code)]
1357 pub async fn run_flow(
1358 &self,
1359 flow_id: &str,
1360 input: serde_json::Value,
1361 ) -> Result<serde_json::Value> {
1362 let pack = Arc::new(
1363 PackRuntime::load(
1364 &self.path,
1365 Arc::clone(&self.config),
1366 self.mocks.clone(),
1367 self.archive_path.as_deref(),
1368 self.session_store.clone(),
1369 self.state_store.clone(),
1370 Arc::clone(&self.wasi_policy),
1371 self.secrets.clone(),
1372 self.oauth_config.clone(),
1373 false,
1374 ComponentResolution::default(),
1375 )
1376 .await?,
1377 );
1378
1379 let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&self.config)).await?;
1380 let retry_config = self.config.retry_config().into();
1381 let mocks = pack.mocks.as_deref();
1382 let tenant = self.config.tenant.as_str();
1383
1384 let ctx = FlowContext {
1385 tenant,
1386 pack_id: pack.metadata().pack_id.as_str(),
1387 flow_id,
1388 node_id: None,
1389 tool: None,
1390 action: None,
1391 session_id: None,
1392 provider_id: None,
1393 retry_config,
1394 attempt: 1,
1395 observer: None,
1396 mocks,
1397 };
1398
1399 let execution = engine.execute(ctx, input).await?;
1400 match execution.status {
1401 FlowStatus::Completed => Ok(execution.output),
1402 FlowStatus::Waiting(wait) => Ok(serde_json::json!({
1403 "status": "pending",
1404 "reason": wait.reason,
1405 "resume": wait.snapshot,
1406 "response": execution.output,
1407 })),
1408 }
1409 }
1410
    /// Instantiates the component registered as `component_ref` and calls its node `invoke`
    /// export with `operation` and `input_json`.
    ///
    /// The v0.5 node interface is tried first. If building those bindings fails because the
    /// component does not export `greentic:component/node@0.5.0`, the v0.4 interface is used
    /// instead. `_config_json` is accepted but not used here.
    ///
    /// # Returns
    ///
    /// * On success: the response body parsed as JSON, `Value::Null` for an empty body, or
    ///   the raw body as a JSON string when it is not valid JSON.
    /// * On a node-level error: `Ok` with an object `{"ok": false, "error": {...}}` carrying
    ///   `code`, `message`, `retryable` and, when present, `backoff_ms` and `details`.
    ///
    /// # Errors
    ///
    /// Fails when the component is unknown, when linking, instantiation or the call itself
    /// fails, or when the v0.5 bindings fail for any reason other than the missing export.
    pub async fn invoke_component(
        &self,
        component_ref: &str,
        ctx: ComponentExecCtx,
        operation: &str,
        _config_json: Option<String>,
        input_json: String,
    ) -> Result<Value> {
        let pack_component = self
            .components
            .get(component_ref)
            .with_context(|| format!("component '{component_ref}' not found in pack"))?;

        // A fresh linker is built per call; state-store access is decided per component.
        let mut linker = Linker::new(&self.engine);
        let allow_state_store = self.allows_state_store(component_ref);
        register_all(&mut linker, allow_state_store)?;
        add_component_control_to_linker(&mut linker)?;

        let host_state = HostState::new(
            self.metadata().pack_id.clone(),
            Arc::clone(&self.config),
            Arc::clone(&self.http_client),
            self.mocks.clone(),
            self.session_store.clone(),
            self.state_store.clone(),
            Arc::clone(&self.secrets),
            self.oauth_config.clone(),
            Some(ctx.clone()),
        )?;
        let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
        let mut store = wasmtime::Store::new(&self.engine, store_state);
        let pre_instance = linker.instantiate_pre(pack_component.component.as_ref())?;
        let result = match component_api::v0_5::ComponentPre::new(pre_instance) {
            Ok(pre) => {
                let bindings = pre.instantiate_async(&mut store).await?;
                let node = bindings.greentic_component_node();
                let ctx_v05 = component_api::exec_ctx_v0_5(&ctx);
                let result = node.call_invoke(&mut store, &ctx_v05, operation, &input_json)?;
                component_api::invoke_result_from_v0_5(result)
            }
            Err(err) => {
                // Only a missing v0.5 export triggers the v0.4 fallback; any other error
                // is returned as-is.
                if is_missing_node_export(&err, "0.5.0") {
                    // `pre_instance` was consumed above, so it is prepared again.
                    let pre_instance = linker.instantiate_pre(pack_component.component.as_ref())?;
                    let pre: component_api::v0_4::ComponentPre<ComponentState> =
                        component_api::v0_4::ComponentPre::new(pre_instance)?;
                    let bindings = pre.instantiate_async(&mut store).await?;
                    let node = bindings.greentic_component_node();
                    let ctx_v04 = component_api::exec_ctx_v0_4(&ctx);
                    let result = node.call_invoke(&mut store, &ctx_v04, operation, &input_json)?;
                    component_api::invoke_result_from_v0_4(result)
                } else {
                    return Err(err);
                }
            }
        };

        match result {
            InvokeResult::Ok(body) => {
                if body.is_empty() {
                    return Ok(Value::Null);
                }
                // Bodies that are not valid JSON are passed through as a JSON string.
                serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
            }
            InvokeResult::Err(NodeError {
                code,
                message,
                retryable,
                backoff_ms,
                details,
            }) => {
                // Node errors are reported in-band as a JSON object, not as `Err`.
                let mut obj = serde_json::Map::new();
                obj.insert("ok".into(), Value::Bool(false));
                let mut error = serde_json::Map::new();
                error.insert("code".into(), Value::String(code));
                error.insert("message".into(), Value::String(message));
                error.insert("retryable".into(), Value::Bool(retryable));
                if let Some(backoff) = backoff_ms {
                    error.insert("backoff_ms".into(), Value::Number(backoff.into()));
                }
                if let Some(details) = details {
                    // Details are parsed as JSON when possible, else kept as a string.
                    error.insert(
                        "details".into(),
                        serde_json::from_str(&details).unwrap_or(Value::String(details)),
                    );
                }
                obj.insert("error".into(), Value::Object(error));
                Ok(Value::Object(obj))
            }
        }
    }
1501
1502 pub fn resolve_provider(
1503 &self,
1504 provider_id: Option<&str>,
1505 provider_type: Option<&str>,
1506 ) -> Result<ProviderBinding> {
1507 let registry = self.provider_registry()?;
1508 registry.resolve(provider_id, provider_type)
1509 }
1510
1511 pub async fn invoke_provider(
1512 &self,
1513 binding: &ProviderBinding,
1514 ctx: ComponentExecCtx,
1515 op: &str,
1516 input_json: Vec<u8>,
1517 ) -> Result<Value> {
1518 let component_ref = &binding.component_ref;
1519 let pack_component = self
1520 .components
1521 .get(component_ref)
1522 .with_context(|| format!("provider component '{component_ref}' not found in pack"))?;
1523
1524 let mut linker = Linker::new(&self.engine);
1525 let allow_state_store = self.allows_state_store(component_ref);
1526 register_all(&mut linker, allow_state_store)?;
1527 add_component_control_to_linker(&mut linker)?;
1528 let pre_instance = linker.instantiate_pre(pack_component.component.as_ref())?;
1529 let pre: ProviderComponentPre<ComponentState> = ProviderComponentPre::new(pre_instance)?;
1530
1531 let host_state = HostState::new(
1532 self.metadata().pack_id.clone(),
1533 Arc::clone(&self.config),
1534 Arc::clone(&self.http_client),
1535 self.mocks.clone(),
1536 self.session_store.clone(),
1537 self.state_store.clone(),
1538 Arc::clone(&self.secrets),
1539 self.oauth_config.clone(),
1540 Some(ctx),
1541 )?;
1542 let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1543 let mut store = wasmtime::Store::new(&self.engine, store_state);
1544 let bindings: crate::provider_core::SchemaCore = pre.instantiate_async(&mut store).await?;
1545 let provider = bindings.greentic_provider_core_schema_core_api();
1546
1547 let result = provider.call_invoke(&mut store, op, &input_json)?;
1548 deserialize_json_bytes(result)
1549 }
1550
1551 fn provider_registry(&self) -> Result<ProviderRegistry> {
1552 if let Some(registry) = self.provider_registry.read().clone() {
1553 return Ok(registry);
1554 }
1555 let manifest = self
1556 .manifest
1557 .as_ref()
1558 .context("pack manifest required for provider resolution")?;
1559 let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
1560 let registry = ProviderRegistry::new(
1561 manifest,
1562 self.state_store.clone(),
1563 &self.config.tenant,
1564 &env,
1565 )?;
1566 *self.provider_registry.write() = Some(registry.clone());
1567 Ok(registry)
1568 }
1569
1570 pub fn load_flow(&self, flow_id: &str) -> Result<Flow> {
1571 if let Some(cache) = &self.flows {
1572 return cache
1573 .flows
1574 .get(flow_id)
1575 .cloned()
1576 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
1577 }
1578 if let Some(manifest) = &self.manifest {
1579 let entry = manifest
1580 .flows
1581 .iter()
1582 .find(|f| f.id.as_str() == flow_id)
1583 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
1584 return Ok(entry.flow.clone());
1585 }
1586 bail!("flow '{flow_id}' not available (pack exports disabled)")
1587 }
1588
1589 pub fn metadata(&self) -> &PackMetadata {
1590 &self.metadata
1591 }
1592
1593 pub fn required_secrets(&self) -> &[greentic_types::SecretRequirement] {
1594 &self.metadata.secret_requirements
1595 }
1596
1597 pub fn missing_secrets(
1598 &self,
1599 tenant_ctx: &TypesTenantCtx,
1600 ) -> Vec<greentic_types::SecretRequirement> {
1601 let env = tenant_ctx.env.as_str().to_string();
1602 let tenant = tenant_ctx.tenant.as_str().to_string();
1603 let team = tenant_ctx.team.as_ref().map(|t| t.as_str().to_string());
1604 self.required_secrets()
1605 .iter()
1606 .filter(|req| {
1607 if let Some(scope) = &req.scope {
1609 if scope.env != env {
1610 return false;
1611 }
1612 if scope.tenant != tenant {
1613 return false;
1614 }
1615 if let Some(ref team_req) = scope.team
1616 && team.as_ref() != Some(team_req)
1617 {
1618 return false;
1619 }
1620 }
1621 let ctx = self.config.tenant_ctx();
1622 read_secret_blocking(&self.secrets, &ctx, req.key.as_str()).is_err()
1623 })
1624 .cloned()
1625 .collect()
1626 }
1627
1628 pub fn for_component_test(
1629 components: Vec<(String, PathBuf)>,
1630 flows: HashMap<String, FlowIR>,
1631 pack_id: &str,
1632 config: Arc<HostConfig>,
1633 ) -> Result<Self> {
1634 let engine = Engine::default();
1635 let engine_profile =
1636 EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
1637 let cache = CacheManager::new(CacheConfig::default(), engine_profile);
1638 let mut component_map = HashMap::new();
1639 for (name, path) in components {
1640 if !path.exists() {
1641 bail!("component artifact missing: {}", path.display());
1642 }
1643 let wasm_bytes = std::fs::read(&path)?;
1644 let component = Arc::new(
1645 Component::from_binary(&engine, &wasm_bytes)
1646 .with_context(|| format!("failed to compile component {}", path.display()))?,
1647 );
1648 component_map.insert(
1649 name.clone(),
1650 PackComponent {
1651 name,
1652 version: "0.0.0".into(),
1653 component,
1654 },
1655 );
1656 }
1657
1658 let mut flow_map = HashMap::new();
1659 let mut descriptors = Vec::new();
1660 for (id, ir) in flows {
1661 let flow_type = ir.flow_type.clone();
1662 let flow = flow_ir_to_flow(ir)?;
1663 flow_map.insert(id.clone(), flow);
1664 descriptors.push(FlowDescriptor {
1665 id: id.clone(),
1666 flow_type,
1667 pack_id: pack_id.to_string(),
1668 profile: "test".into(),
1669 version: "0.0.0".into(),
1670 description: None,
1671 });
1672 }
1673 let entry_flows = descriptors.iter().map(|flow| flow.id.clone()).collect();
1674 let metadata = PackMetadata {
1675 pack_id: pack_id.to_string(),
1676 version: "0.0.0".into(),
1677 entry_flows,
1678 secret_requirements: Vec::new(),
1679 };
1680 let flows_cache = PackFlows {
1681 descriptors: descriptors.clone(),
1682 flows: flow_map,
1683 metadata: metadata.clone(),
1684 };
1685
1686 Ok(Self {
1687 path: PathBuf::new(),
1688 archive_path: None,
1689 config,
1690 engine,
1691 metadata,
1692 manifest: None,
1693 legacy_manifest: None,
1694 component_manifests: HashMap::new(),
1695 mocks: None,
1696 flows: Some(flows_cache),
1697 components: component_map,
1698 http_client: Arc::clone(&HTTP_CLIENT),
1699 pre_cache: Mutex::new(HashMap::new()),
1700 session_store: None,
1701 state_store: None,
1702 wasi_policy: Arc::new(RunnerWasiPolicy::new()),
1703 provider_registry: RwLock::new(None),
1704 secrets: crate::secrets::default_manager()?,
1705 oauth_config: None,
1706 cache,
1707 })
1708 }
1709}
1710
1711fn is_missing_node_export(err: &wasmtime::Error, version: &str) -> bool {
1712 let message = err.to_string();
1713 message.contains("no exported instance named")
1714 && message.contains(&format!("greentic:component/node@{version}"))
1715}
1716
/// Flows loaded for a pack, together with the descriptors and metadata derived from them.
struct PackFlows {
    /// One descriptor per flow; this is what `PackRuntime::list_flows` returns.
    descriptors: Vec<FlowDescriptor>,
    /// Flow definitions keyed by flow id; this is what `PackRuntime::load_flow` looks up.
    flows: HashMap<String, Flow>,
    /// Pack metadata recorded alongside the flows.
    metadata: PackMetadata,
}
1722
/// Manifest extension keys that are recognised as carrying runtime flow definitions.
const RUNTIME_FLOW_EXTENSION_IDS: [&str; 3] = [
    "greentic.pack.runtime_flow",
    "greentic.pack.flow_runtime",
    "greentic.pack.runtime_flows",
];
1728
/// Wrapper shape of a runtime flow extension payload: an object with a `flows` array.
#[derive(Debug, Deserialize)]
struct RuntimeFlowBundle {
    /// The runtime flows contained in the bundle.
    flows: Vec<RuntimeFlow>,
}
1733
/// A flow as serialized in a runtime flow extension, before conversion to a [`Flow`].
#[derive(Debug, Deserialize)]
struct RuntimeFlow {
    /// Flow id; must parse as a `FlowId` during conversion.
    id: String,
    /// Flow kind; also accepted under the key `flow_type`.
    #[serde(alias = "flow_type")]
    kind: FlowKind,
    /// Optional schema version; conversion falls back to `"1.0"` when absent.
    #[serde(default)]
    schema_version: Option<String>,
    /// Optional start node; used as the `"default"` entrypoint when `entrypoints` is empty.
    #[serde(default)]
    start: Option<String>,
    /// Named entrypoints of the flow.
    #[serde(default)]
    entrypoints: BTreeMap<String, Value>,
    /// Nodes keyed by node id.
    nodes: BTreeMap<String, RuntimeNode>,
    /// Optional flow metadata; conversion uses the default value when absent.
    #[serde(default)]
    metadata: Option<FlowMetadata>,
}
1749
/// A node as serialized in a runtime flow extension, before conversion to a [`Node`].
#[derive(Debug, Deserialize)]
struct RuntimeNode {
    /// Component id; also accepted under the key `component`.
    #[serde(alias = "component")]
    component_id: String,
    /// Optional operation name; also accepted under the key `operation`.
    #[serde(default, alias = "operation")]
    operation_name: Option<String>,
    /// Operation payload; also accepted as `payload` or `input`. It becomes the node's input
    /// mapping during conversion.
    #[serde(default, alias = "payload", alias = "input")]
    operation_payload: Value,
    /// Optional routing; conversion falls back to `Routing::End` when absent.
    #[serde(default)]
    routing: Option<Routing>,
    /// Optional telemetry hints; conversion uses the default value when absent.
    #[serde(default)]
    telemetry: Option<TelemetryHints>,
}
1763
1764fn deserialize_json_bytes(bytes: Vec<u8>) -> Result<Value> {
1765 if bytes.is_empty() {
1766 return Ok(Value::Null);
1767 }
1768 serde_json::from_slice(&bytes).or_else(|_| {
1769 String::from_utf8(bytes)
1770 .map(Value::String)
1771 .map_err(|err| anyhow!(err))
1772 })
1773}
1774
1775impl PackFlows {
1776 fn from_manifest(manifest: greentic_types::PackManifest) -> Self {
1777 if let Some(flows) = flows_from_runtime_extension(&manifest) {
1778 return flows;
1779 }
1780 let descriptors = manifest
1781 .flows
1782 .iter()
1783 .map(|entry| FlowDescriptor {
1784 id: entry.id.as_str().to_string(),
1785 flow_type: flow_kind_to_str(entry.kind).to_string(),
1786 pack_id: manifest.pack_id.as_str().to_string(),
1787 profile: manifest.pack_id.as_str().to_string(),
1788 version: manifest.version.to_string(),
1789 description: None,
1790 })
1791 .collect();
1792 let mut flows = HashMap::new();
1793 for entry in &manifest.flows {
1794 flows.insert(entry.id.as_str().to_string(), entry.flow.clone());
1795 }
1796 Self {
1797 metadata: PackMetadata::from_manifest(&manifest),
1798 descriptors,
1799 flows,
1800 }
1801 }
1802}
1803
1804fn flows_from_runtime_extension(manifest: &greentic_types::PackManifest) -> Option<PackFlows> {
1805 let extensions = manifest.extensions.as_ref()?;
1806 let extension = extensions.iter().find_map(|(key, ext)| {
1807 if RUNTIME_FLOW_EXTENSION_IDS
1808 .iter()
1809 .any(|candidate| candidate == key)
1810 {
1811 Some(ext)
1812 } else {
1813 None
1814 }
1815 })?;
1816 let runtime_flows = match decode_runtime_flow_extension(extension) {
1817 Some(flows) if !flows.is_empty() => flows,
1818 _ => return None,
1819 };
1820
1821 let descriptors = runtime_flows
1822 .iter()
1823 .map(|flow| FlowDescriptor {
1824 id: flow.id.as_str().to_string(),
1825 flow_type: flow_kind_to_str(flow.kind).to_string(),
1826 pack_id: manifest.pack_id.as_str().to_string(),
1827 profile: manifest.pack_id.as_str().to_string(),
1828 version: manifest.version.to_string(),
1829 description: None,
1830 })
1831 .collect::<Vec<_>>();
1832 let flows = runtime_flows
1833 .into_iter()
1834 .map(|flow| (flow.id.as_str().to_string(), flow))
1835 .collect();
1836
1837 Some(PackFlows {
1838 metadata: PackMetadata::from_manifest(manifest),
1839 descriptors,
1840 flows,
1841 })
1842}
1843
1844fn decode_runtime_flow_extension(extension: &ExtensionRef) -> Option<Vec<Flow>> {
1845 let value = match extension.inline.as_ref()? {
1846 ExtensionInline::Other(value) => value.clone(),
1847 _ => return None,
1848 };
1849
1850 if let Ok(bundle) = serde_json::from_value::<RuntimeFlowBundle>(value.clone()) {
1851 return Some(collect_runtime_flows(bundle.flows));
1852 }
1853
1854 if let Ok(flows) = serde_json::from_value::<Vec<RuntimeFlow>>(value.clone()) {
1855 return Some(collect_runtime_flows(flows));
1856 }
1857
1858 if let Ok(flows) = serde_json::from_value::<Vec<Flow>>(value) {
1859 return Some(flows);
1860 }
1861
1862 warn!(
1863 extension = %extension.kind,
1864 version = %extension.version,
1865 "runtime flow extension present but could not be decoded"
1866 );
1867 None
1868}
1869
1870fn collect_runtime_flows(flows: Vec<RuntimeFlow>) -> Vec<Flow> {
1871 flows
1872 .into_iter()
1873 .filter_map(|flow| match runtime_flow_to_flow(flow) {
1874 Ok(flow) => Some(flow),
1875 Err(err) => {
1876 warn!(error = %err, "failed to decode runtime flow");
1877 None
1878 }
1879 })
1880 .collect()
1881}
1882
1883fn runtime_flow_to_flow(runtime: RuntimeFlow) -> Result<Flow> {
1884 let flow_id = FlowId::from_str(&runtime.id)
1885 .with_context(|| format!("invalid flow id `{}`", runtime.id))?;
1886 let mut entrypoints = runtime.entrypoints;
1887 if entrypoints.is_empty()
1888 && let Some(start) = &runtime.start
1889 {
1890 entrypoints.insert("default".into(), Value::String(start.clone()));
1891 }
1892
1893 let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
1894 for (id, node) in runtime.nodes {
1895 let node_id = NodeId::from_str(&id).with_context(|| format!("invalid node id `{id}`"))?;
1896 let component_id = ComponentId::from_str(&node.component_id)
1897 .with_context(|| format!("invalid component id `{}`", node.component_id))?;
1898 let component = FlowComponentRef {
1899 id: component_id,
1900 pack_alias: None,
1901 operation: node.operation_name,
1902 };
1903 let routing = node.routing.unwrap_or(Routing::End);
1904 let telemetry = node.telemetry.unwrap_or_default();
1905 nodes.insert(
1906 node_id.clone(),
1907 Node {
1908 id: node_id,
1909 component,
1910 input: InputMapping {
1911 mapping: node.operation_payload,
1912 },
1913 output: OutputMapping {
1914 mapping: Value::Null,
1915 },
1916 routing,
1917 telemetry,
1918 },
1919 );
1920 }
1921
1922 Ok(Flow {
1923 schema_version: runtime.schema_version.unwrap_or_else(|| "1.0".to_string()),
1924 id: flow_id,
1925 kind: runtime.kind,
1926 entrypoints,
1927 nodes,
1928 metadata: runtime.metadata.unwrap_or_default(),
1929 })
1930}
1931
1932fn flow_kind_to_str(kind: greentic_types::FlowKind) -> &'static str {
1933 match kind {
1934 greentic_types::FlowKind::Messaging => "messaging",
1935 greentic_types::FlowKind::Event => "event",
1936 greentic_types::FlowKind::ComponentConfig => "component-config",
1937 greentic_types::FlowKind::Job => "job",
1938 greentic_types::FlowKind::Http => "http",
1939 }
1940}
1941
1942fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
1943 let mut file = archive
1944 .by_name(name)
1945 .with_context(|| format!("entry {name} missing from archive"))?;
1946 let mut buf = Vec::new();
1947 file.read_to_end(&mut buf)?;
1948 Ok(buf)
1949}
1950
1951fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
1952 for node in doc.nodes.values_mut() {
1953 let Some((component_ref, payload)) = node
1954 .raw
1955 .iter()
1956 .next()
1957 .map(|(key, value)| (key.clone(), value.clone()))
1958 else {
1959 continue;
1960 };
1961 if component_ref.starts_with("emit.") {
1962 node.operation = Some(component_ref);
1963 node.payload = payload;
1964 node.raw.clear();
1965 continue;
1966 }
1967 let (target_component, operation, input, config) =
1968 infer_component_exec(&payload, &component_ref);
1969 let mut payload_obj = serde_json::Map::new();
1970 payload_obj.insert("component".into(), Value::String(target_component));
1972 payload_obj.insert("operation".into(), Value::String(operation));
1973 payload_obj.insert("input".into(), input);
1974 if let Some(cfg) = config {
1975 payload_obj.insert("config".into(), cfg);
1976 }
1977 node.operation = Some("component.exec".to_string());
1978 node.payload = Value::Object(payload_obj);
1979 node.raw.clear();
1980 }
1981 doc
1982}
1983
1984fn infer_component_exec(
1985 payload: &Value,
1986 component_ref: &str,
1987) -> (String, String, Value, Option<Value>) {
1988 let default_op = if component_ref.starts_with("templating.") {
1989 "render"
1990 } else {
1991 "invoke"
1992 }
1993 .to_string();
1994
1995 if let Value::Object(map) = payload {
1996 let op = map
1997 .get("op")
1998 .or_else(|| map.get("operation"))
1999 .and_then(Value::as_str)
2000 .map(|s| s.to_string())
2001 .unwrap_or_else(|| default_op.clone());
2002
2003 let mut input = map.clone();
2004 let config = input.remove("config");
2005 let component = input
2006 .get("component")
2007 .or_else(|| input.get("component_ref"))
2008 .and_then(Value::as_str)
2009 .map(|s| s.to_string())
2010 .unwrap_or_else(|| component_ref.to_string());
2011 input.remove("component");
2012 input.remove("component_ref");
2013 input.remove("op");
2014 input.remove("operation");
2015 return (component, op, Value::Object(input), config);
2016 }
2017
2018 (component_ref.to_string(), default_op, payload.clone(), None)
2019}
2020
/// A component the pack expects to be loaded, as listed by `component_specs`.
#[derive(Clone, Debug)]
struct ComponentSpec {
    /// Component id (or name, for legacy manifests and entries without an id).
    id: String,
    /// Component version; `"0.0.0"` when derived from a lock file or component sources.
    version: String,
    /// Wasm file path taken from a legacy manifest entry; `None` for all other origins.
    legacy_path: Option<String>,
}
2027
/// Where a component comes from and which digests are known for it.
#[derive(Clone, Debug)]
struct ComponentSourceInfo {
    /// Resolved digest of the component, when known.
    digest: Option<String>,
    /// Source reference the component was resolved from.
    source: ComponentSourceRef,
    /// Whether the artifact ships inline with the pack or has to be fetched remotely.
    artifact: ComponentArtifactLocation,
    /// Expected SHA-256 of the wasm binary, when known.
    /// NOTE(review): how this is checked is not visible in this part of the file.
    expected_wasm_sha256: Option<String>,
    /// Presumably disables digest verification for this component — confirm at the use site.
    skip_digest_verification: bool,
}
2036
/// Location of a component artifact, mirroring the variants of `ArtifactLocationV1`.
#[derive(Clone, Debug)]
enum ComponentArtifactLocation {
    /// The wasm file is bundled with the pack at `wasm_path`.
    Inline { wasm_path: String },
    /// The artifact is not bundled with the pack.
    Remote,
}
2042
/// Deserialized contents of a `pack.lock` / `pack.lock.json` file.
#[derive(Clone, Debug, Deserialize)]
struct PackLockV1 {
    /// Schema version of the lock file; `load_pack_lock` only accepts `1`.
    schema_version: u32,
    /// Locked components.
    components: Vec<PackLockComponent>,
}
2048
/// One component entry of a pack lock file.
///
/// Several fields exist in a current and a legacy spelling (`source_ref`/`ref`,
/// `bundled_path`/`path`, `wasm_sha256`/`sha256`). Every field except `name` is optional.
/// NOTE(review): how the current and legacy spellings are reconciled is not visible in this
/// part of the file.
#[derive(Clone, Debug, Deserialize)]
struct PackLockComponent {
    /// Component name; used as the component id when `component_id` is absent.
    name: String,
    /// Source reference (key `source_ref`).
    #[serde(default, rename = "source_ref")]
    source_ref: Option<String>,
    /// Legacy source reference (key `ref`).
    #[serde(default, rename = "ref")]
    legacy_ref: Option<String>,
    /// Explicit component id; preferred over `name` when present.
    #[serde(default)]
    component_id: Option<ComponentId>,
    /// Presumably whether the component is bundled with the pack — confirm at the use site.
    #[serde(default)]
    bundled: Option<bool>,
    /// Path of the bundled artifact (key `bundled_path`).
    #[serde(default, rename = "bundled_path")]
    bundled_path: Option<String>,
    /// Legacy artifact path (key `path`).
    #[serde(default, rename = "path")]
    legacy_path: Option<String>,
    /// SHA-256 of the wasm binary (key `wasm_sha256`).
    #[serde(default)]
    wasm_sha256: Option<String>,
    /// Legacy SHA-256 field (key `sha256`).
    #[serde(default, rename = "sha256")]
    legacy_sha256: Option<String>,
    /// Resolved digest (key `resolved_digest`).
    #[serde(default)]
    resolved_digest: Option<String>,
    /// Digest (key `digest`).
    #[serde(default)]
    digest: Option<String>,
}
2073
2074fn component_specs(
2075 manifest: Option<&greentic_types::PackManifest>,
2076 legacy_manifest: Option<&legacy_pack::PackManifest>,
2077 component_sources: Option<&ComponentSourcesV1>,
2078 pack_lock: Option<&PackLockV1>,
2079) -> Vec<ComponentSpec> {
2080 if let Some(manifest) = manifest {
2081 if !manifest.components.is_empty() {
2082 return manifest
2083 .components
2084 .iter()
2085 .map(|entry| ComponentSpec {
2086 id: entry.id.as_str().to_string(),
2087 version: entry.version.to_string(),
2088 legacy_path: None,
2089 })
2090 .collect();
2091 }
2092 if let Some(lock) = pack_lock {
2093 let mut seen = HashSet::new();
2094 let mut specs = Vec::new();
2095 for entry in &lock.components {
2096 let id = entry
2097 .component_id
2098 .as_ref()
2099 .map(|id| id.as_str())
2100 .unwrap_or(entry.name.as_str());
2101 if seen.insert(id.to_string()) {
2102 specs.push(ComponentSpec {
2103 id: id.to_string(),
2104 version: "0.0.0".to_string(),
2105 legacy_path: None,
2106 });
2107 }
2108 }
2109 return specs;
2110 }
2111 if let Some(sources) = component_sources {
2112 let mut seen = HashSet::new();
2113 let mut specs = Vec::new();
2114 for entry in &sources.components {
2115 let id = entry
2116 .component_id
2117 .as_ref()
2118 .map(|id| id.as_str())
2119 .unwrap_or(entry.name.as_str());
2120 if seen.insert(id.to_string()) {
2121 specs.push(ComponentSpec {
2122 id: id.to_string(),
2123 version: "0.0.0".to_string(),
2124 legacy_path: None,
2125 });
2126 }
2127 }
2128 return specs;
2129 }
2130 }
2131 if let Some(legacy_manifest) = legacy_manifest {
2132 return legacy_manifest
2133 .components
2134 .iter()
2135 .map(|entry| ComponentSpec {
2136 id: entry.name.clone(),
2137 version: entry.version.to_string(),
2138 legacy_path: Some(entry.file_wasm.clone()),
2139 })
2140 .collect();
2141 }
2142 Vec::new()
2143}
2144
2145fn component_sources_table(
2146 sources: Option<&ComponentSourcesV1>,
2147) -> Result<Option<HashMap<String, ComponentSourceInfo>>> {
2148 let Some(sources) = sources else {
2149 return Ok(None);
2150 };
2151 let mut table = HashMap::new();
2152 for entry in &sources.components {
2153 let artifact = match &entry.artifact {
2154 ArtifactLocationV1::Inline { wasm_path, .. } => ComponentArtifactLocation::Inline {
2155 wasm_path: wasm_path.clone(),
2156 },
2157 ArtifactLocationV1::Remote => ComponentArtifactLocation::Remote,
2158 };
2159 let info = ComponentSourceInfo {
2160 digest: Some(entry.resolved.digest.clone()),
2161 source: entry.source.clone(),
2162 artifact,
2163 expected_wasm_sha256: None,
2164 skip_digest_verification: false,
2165 };
2166 if let Some(component_id) = entry.component_id.as_ref() {
2167 table.insert(component_id.as_str().to_string(), info.clone());
2168 }
2169 table.insert(entry.name.clone(), info);
2170 }
2171 Ok(Some(table))
2172}
2173
2174fn load_pack_lock(path: &Path) -> Result<Option<PackLockV1>> {
2175 let lock_path = if path.is_dir() {
2176 let candidate = path.join("pack.lock");
2177 if candidate.exists() {
2178 Some(candidate)
2179 } else {
2180 let candidate = path.join("pack.lock.json");
2181 candidate.exists().then_some(candidate)
2182 }
2183 } else {
2184 None
2185 };
2186 let Some(lock_path) = lock_path else {
2187 return Ok(None);
2188 };
2189 let raw = std::fs::read_to_string(&lock_path)
2190 .with_context(|| format!("failed to read {}", lock_path.display()))?;
2191 let lock: PackLockV1 = serde_json::from_str(&raw).context("failed to parse pack.lock")?;
2192 if lock.schema_version != 1 {
2193 bail!("pack.lock schema_version must be 1");
2194 }
2195 Ok(Some(lock))
2196}
2197
/// Lists the directories in which a `pack.lock` for the given pack may live.
///
/// * When `is_dir` is true the pack directory itself is the only root.
/// * Otherwise the roots are the parent and grandparent directory of
///   `archive_hint` when it is provided, or of `pack_path` when it is not.
///
/// A relative path such as `demo.gtpack` or `dist/demo.gtpack` has an empty
/// ancestor (`Path::parent` returns `Some("")`). An empty path is not a usable
/// directory (`Path::is_dir` is false for it), so such ancestors are reported
/// as `.` instead, which lets a lock file in the working directory be found.
fn find_pack_lock_roots(
    pack_path: &Path,
    is_dir: bool,
    archive_hint: Option<&Path>,
) -> Vec<PathBuf> {
    if is_dir {
        return vec![pack_path.to_path_buf()];
    }
    // The archive hint, when present, replaces `pack_path` as the anchor.
    let anchor = archive_hint.unwrap_or(pack_path);
    anchor
        .ancestors()
        .skip(1) // drop the anchor itself
        .take(2) // parent, then grandparent
        .map(|dir| {
            if dir.as_os_str().is_empty() {
                PathBuf::from(".")
            } else {
                dir.to_path_buf()
            }
        })
        .collect()
}
2222
2223fn normalize_sha256(digest: &str) -> Result<String> {
2224 let trimmed = digest.trim();
2225 if trimmed.is_empty() {
2226 bail!("sha256 digest cannot be empty");
2227 }
2228 if let Some(stripped) = trimmed.strip_prefix("sha256:") {
2229 if stripped.is_empty() {
2230 bail!("sha256 digest must include hex bytes after sha256:");
2231 }
2232 return Ok(trimmed.to_string());
2233 }
2234 if trimmed.chars().all(|c| c.is_ascii_hexdigit()) {
2235 return Ok(format!("sha256:{trimmed}"));
2236 }
2237 bail!("sha256 digest must be hex or sha256:<hex>");
2238}
2239
2240fn component_sources_table_from_pack_lock(
2241 lock: &PackLockV1,
2242 allow_missing_hash: bool,
2243) -> Result<HashMap<String, ComponentSourceInfo>> {
2244 let mut table = HashMap::new();
2245 let mut names = HashSet::new();
2246 for entry in &lock.components {
2247 if !names.insert(entry.name.clone()) {
2248 bail!(
2249 "pack.lock contains duplicate component name `{}`",
2250 entry.name
2251 );
2252 }
2253 let source_ref = match (&entry.source_ref, &entry.legacy_ref) {
2254 (Some(primary), Some(legacy)) => {
2255 if primary != legacy {
2256 bail!(
2257 "pack.lock component {} has conflicting refs: {} vs {}",
2258 entry.name,
2259 primary,
2260 legacy
2261 );
2262 }
2263 primary.as_str()
2264 }
2265 (Some(primary), None) => primary.as_str(),
2266 (None, Some(legacy)) => legacy.as_str(),
2267 (None, None) => {
2268 bail!("pack.lock component {} missing source_ref", entry.name);
2269 }
2270 };
2271 let source: ComponentSourceRef = source_ref
2272 .parse()
2273 .with_context(|| format!("invalid component ref `{}`", source_ref))?;
2274 let bundled_path = match (&entry.bundled_path, &entry.legacy_path) {
2275 (Some(primary), Some(legacy)) => {
2276 if primary != legacy {
2277 bail!(
2278 "pack.lock component {} has conflicting bundled paths: {} vs {}",
2279 entry.name,
2280 primary,
2281 legacy
2282 );
2283 }
2284 Some(primary.clone())
2285 }
2286 (Some(primary), None) => Some(primary.clone()),
2287 (None, Some(legacy)) => Some(legacy.clone()),
2288 (None, None) => None,
2289 };
2290 let bundled = entry.bundled.unwrap_or(false) || bundled_path.is_some();
2291 let (artifact, digest, expected_wasm_sha256, skip_digest_verification) = if bundled {
2292 let wasm_path = bundled_path.ok_or_else(|| {
2293 anyhow!(
2294 "pack.lock component {} marked bundled but bundled_path is missing",
2295 entry.name
2296 )
2297 })?;
2298 let expected_raw = match (&entry.wasm_sha256, &entry.legacy_sha256) {
2299 (Some(primary), Some(legacy)) => {
2300 if primary != legacy {
2301 bail!(
2302 "pack.lock component {} has conflicting wasm_sha256 values: {} vs {}",
2303 entry.name,
2304 primary,
2305 legacy
2306 );
2307 }
2308 Some(primary.as_str())
2309 }
2310 (Some(primary), None) => Some(primary.as_str()),
2311 (None, Some(legacy)) => Some(legacy.as_str()),
2312 (None, None) => None,
2313 };
2314 let expected = match expected_raw {
2315 Some(value) => Some(normalize_sha256(value)?),
2316 None => None,
2317 };
2318 if expected.is_none() && !allow_missing_hash {
2319 bail!(
2320 "pack.lock component {} missing wasm_sha256 for bundled component",
2321 entry.name
2322 );
2323 }
2324 (
2325 ComponentArtifactLocation::Inline { wasm_path },
2326 expected.clone(),
2327 expected,
2328 allow_missing_hash && expected_raw.is_none(),
2329 )
2330 } else {
2331 if source.is_tag() {
2332 bail!(
2333 "component {} uses tag ref {} but is not bundled; rebuild the pack",
2334 entry.name,
2335 source
2336 );
2337 }
2338 let expected = entry
2339 .resolved_digest
2340 .as_deref()
2341 .or(entry.digest.as_deref())
2342 .ok_or_else(|| {
2343 anyhow!(
2344 "pack.lock component {} missing resolved_digest for remote component",
2345 entry.name
2346 )
2347 })?;
2348 (
2349 ComponentArtifactLocation::Remote,
2350 Some(normalize_digest(expected)),
2351 None,
2352 false,
2353 )
2354 };
2355 let info = ComponentSourceInfo {
2356 digest,
2357 source,
2358 artifact,
2359 expected_wasm_sha256,
2360 skip_digest_verification,
2361 };
2362 if let Some(component_id) = entry.component_id.as_ref() {
2363 let key = component_id.as_str().to_string();
2364 if table.contains_key(&key) {
2365 bail!(
2366 "pack.lock contains duplicate component id `{}`",
2367 component_id.as_str()
2368 );
2369 }
2370 table.insert(key, info.clone());
2371 }
2372 if entry.name
2373 != entry
2374 .component_id
2375 .as_ref()
2376 .map(|id| id.as_str())
2377 .unwrap_or("")
2378 {
2379 table.insert(entry.name.clone(), info);
2380 }
2381 }
2382 Ok(table)
2383}
2384
2385fn component_path_for_spec(root: &Path, spec: &ComponentSpec) -> PathBuf {
2386 if let Some(path) = &spec.legacy_path {
2387 return root.join(path);
2388 }
2389 root.join("components").join(format!("{}.wasm", spec.id))
2390}
2391
/// Ensures a digest string carries an algorithm prefix.
///
/// Digests already starting with `sha256:` or `blake3:` are returned as-is;
/// anything else is assumed to be a bare SHA-256 value and gets `sha256:`
/// prepended. No trimming or validation is performed.
fn normalize_digest(digest: &str) -> String {
    const KNOWN_PREFIXES: [&str; 2] = ["sha256:", "blake3:"];
    let has_prefix = KNOWN_PREFIXES
        .iter()
        .any(|prefix| digest.starts_with(prefix));
    if !has_prefix {
        return format!("sha256:{digest}");
    }
    digest.to_owned()
}
2399
2400fn compute_digest_for(bytes: &[u8], digest: &str) -> Result<String> {
2401 if digest.starts_with("blake3:") {
2402 let hash = blake3::hash(bytes);
2403 return Ok(format!("blake3:{}", hash.to_hex()));
2404 }
2405 let mut hasher = sha2::Sha256::new();
2406 hasher.update(bytes);
2407 Ok(format!("sha256:{:x}", hasher.finalize()))
2408}
2409
2410fn compute_sha256_digest_for(bytes: &[u8]) -> String {
2411 let mut hasher = sha2::Sha256::new();
2412 hasher.update(bytes);
2413 format!("sha256:{:x}", hasher.finalize())
2414}
2415
2416fn build_artifact_key(cache: &CacheManager, digest: Option<&str>, bytes: &[u8]) -> ArtifactKey {
2417 let wasm_digest = digest
2418 .map(normalize_digest)
2419 .unwrap_or_else(|| compute_sha256_digest_for(bytes));
2420 ArtifactKey::new(cache.engine_profile_id().to_string(), wasm_digest)
2421}
2422
2423async fn compile_component_with_cache(
2424 cache: &CacheManager,
2425 engine: &Engine,
2426 digest: Option<&str>,
2427 bytes: Vec<u8>,
2428) -> Result<Arc<Component>> {
2429 let key = build_artifact_key(cache, digest, &bytes);
2430 cache.get_component(engine, &key, || Ok(bytes)).await
2431}
2432
2433fn verify_component_digest(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
2434 let normalized_expected = normalize_digest(expected);
2435 let actual = compute_digest_for(bytes, &normalized_expected)?;
2436 if normalize_digest(&actual) != normalized_expected {
2437 bail!(
2438 "component {component_id} digest mismatch: expected {normalized_expected}, got {actual}"
2439 );
2440 }
2441 Ok(())
2442}
2443
2444fn verify_wasm_sha256(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
2445 let normalized_expected = normalize_sha256(expected)?;
2446 let actual = compute_sha256_digest_for(bytes);
2447 if actual != normalized_expected {
2448 bail!(
2449 "component {component_id} bundled digest mismatch: expected {normalized_expected}, got {actual}"
2450 );
2451 }
2452 Ok(())
2453}
2454
/// Unit tests for pack.lock handling: lock-table construction and digest
/// verification of bundled components.
#[cfg(test)]
mod pack_lock_tests {
    use super::*;
    use tempfile::TempDir;

    /// A non-bundled component that references a tag (`:latest`) must be
    /// rejected while building the lock table, with a hint to rebuild the pack.
    #[test]
    fn pack_lock_tag_ref_requires_bundle() {
        let lock = PackLockV1 {
            schema_version: 1,
            components: vec![PackLockComponent {
                name: "templates".to_string(),
                source_ref: Some("oci://registry.test/templates:latest".to_string()),
                legacy_ref: None,
                component_id: None,
                bundled: Some(false),
                bundled_path: None,
                legacy_path: None,
                wasm_sha256: None,
                legacy_sha256: None,
                resolved_digest: None,
                digest: None,
            }],
        };
        let err = component_sources_table_from_pack_lock(&lock, false).unwrap_err();
        assert!(
            err.to_string().contains("tag ref") && err.to_string().contains("rebuild the pack"),
            "unexpected error: {err}"
        );
    }

    /// Loading an inline component whose expected sha256 does not match the
    /// bytes on disk must fail with a "bundled digest mismatch" error.
    #[test]
    fn bundled_hash_mismatch_errors() {
        let rt = tokio::runtime::Runtime::new().expect("runtime");
        let temp = TempDir::new().expect("temp dir");
        let engine = Engine::default();
        let engine_profile =
            EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
        // Keep the artifact cache inside the temp dir so the test leaves nothing behind.
        let cache_config = CacheConfig {
            root: temp.path().join("cache"),
            ..CacheConfig::default()
        };
        let cache = CacheManager::new(cache_config, engine_profile);
        let wasm_path = temp.path().join("component.wasm");
        // Copy a fixture component into the temp dir to act as the materialized pack root.
        let fixture_wasm = Path::new(env!("CARGO_MANIFEST_DIR"))
            .join("../../tests/fixtures/packs/secrets_store_smoke/components/echo_secret.wasm");
        let bytes = std::fs::read(&fixture_wasm).expect("read fixture wasm");
        std::fs::write(&wasm_path, &bytes).expect("write temp wasm");

        let spec = ComponentSpec {
            id: "qa.process".to_string(),
            version: "0.0.0".to_string(),
            legacy_path: None,
        };
        let mut missing = HashSet::new();
        missing.insert(spec.id.clone());

        // The declared hash ("deadbeef") deliberately differs from the fixture's real hash.
        let mut sources = HashMap::new();
        sources.insert(
            spec.id.clone(),
            ComponentSourceInfo {
                digest: Some("sha256:deadbeef".to_string()),
                source: ComponentSourceRef::Oci("registry.test/qa.process@sha256:deadbeef".into()),
                artifact: ComponentArtifactLocation::Inline {
                    wasm_path: "component.wasm".to_string(),
                },
                expected_wasm_sha256: Some("sha256:deadbeef".to_string()),
                skip_digest_verification: false,
            },
        );

        let mut loaded = HashMap::new();
        let result = rt.block_on(load_components_from_sources(
            &cache,
            &engine,
            &sources,
            &ComponentResolution::default(),
            &[spec],
            &mut missing,
            &mut loaded,
            Some(temp.path()),
            None,
        ));
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("bundled digest mismatch"),
            "unexpected error: {err}"
        );
    }
}
2544
/// Property tests for component resolution through a pack.lock or through the
/// component-sources extension.
///
/// The generated scenarios depend on the shape and order of the strategy in
/// `scenario_strategy`, so changing it may alter which cases the regression
/// seeds reproduce.
#[cfg(test)]
mod pack_resolution_prop_tests {
    use super::*;
    use greentic_types::{ArtifactLocationV1, ComponentSourceEntryV1, ResolvedComponentV1};
    use proptest::prelude::*;
    use proptest::test_runner::{Config as ProptestConfig, RngAlgorithm, TestRng, TestRunner};
    use std::collections::BTreeSet;
    use std::path::Path;
    use std::str::FromStr;

    /// How a scenario looks a component up; both variants carry the table key to use.
    #[derive(Clone, Debug)]
    enum ResolveRequest {
        ById(String),
        ByName(String),
    }

    /// Comparable snapshot of a successful lookup.
    #[derive(Clone, Debug, PartialEq, Eq)]
    struct ResolvedComponent {
        key: String,
        source: String,
        artifact: String,
        digest: Option<String>,
        expected_wasm_sha256: Option<String>,
        skip_digest_verification: bool,
    }

    /// Comparable snapshot of a failed lookup: a coarse error code, the full
    /// message, and the key that was requested.
    #[derive(Clone, Debug, PartialEq, Eq)]
    struct ResolveError {
        code: String,
        message: String,
        context_key: String,
    }

    /// One generated test case.
    #[derive(Clone, Debug)]
    struct Scenario {
        pack_lock: Option<PackLockV1>,
        component_sources: Option<ComponentSourcesV1>,
        request: ResolveRequest,
        expected_sha256: Option<String>,
        bytes: Vec<u8>,
    }

    /// Resolves `request` against the table built from `lock` when a lock is
    /// given, otherwise against the table built from `sources`.
    ///
    /// Failures are mapped to a `ResolveError` with a classified code.
    fn resolve_component_test(
        sources: Option<&ComponentSourcesV1>,
        lock: Option<&PackLockV1>,
        request: &ResolveRequest,
    ) -> Result<ResolvedComponent, ResolveError> {
        // A lock, when present, is used exclusively; `sources` is then ignored.
        let table = if let Some(lock) = lock {
            component_sources_table_from_pack_lock(lock, false).map_err(|err| ResolveError {
                code: classify_pack_lock_error(err.to_string().as_str()).to_string(),
                message: err.to_string(),
                context_key: request_key(request).to_string(),
            })?
        } else {
            let sources = component_sources_table(sources).map_err(|err| ResolveError {
                code: "component_sources_error".to_string(),
                message: err.to_string(),
                context_key: request_key(request).to_string(),
            })?;
            sources.ok_or_else(|| ResolveError {
                code: "missing_component_sources".to_string(),
                message: "component sources not provided".to_string(),
                context_key: request_key(request).to_string(),
            })?
        };

        let key = request_key(request);
        let source = table.get(key).ok_or_else(|| ResolveError {
            code: "component_not_found".to_string(),
            message: format!("component {key} not found"),
            context_key: key.to_string(),
        })?;

        Ok(ResolvedComponent {
            key: key.to_string(),
            source: source.source.to_string(),
            artifact: match source.artifact {
                ComponentArtifactLocation::Inline { .. } => "inline".to_string(),
                ComponentArtifactLocation::Remote => "remote".to_string(),
            },
            digest: source.digest.clone(),
            expected_wasm_sha256: source.expected_wasm_sha256.clone(),
            skip_digest_verification: source.skip_digest_verification,
        })
    }

    /// Returns the lookup key carried by either request variant.
    fn request_key(request: &ResolveRequest) -> &str {
        match request {
            ResolveRequest::ById(value) => value.as_str(),
            ResolveRequest::ByName(value) => value.as_str(),
        }
    }

    /// Maps a pack.lock error message to a stable code by substring matching.
    ///
    /// The first matching branch wins; messages that match nothing are
    /// reported as `unknown_error`.
    fn classify_pack_lock_error(message: &str) -> &'static str {
        if message.contains("duplicate component name") {
            "duplicate_name"
        } else if message.contains("duplicate component id") {
            "duplicate_id"
        } else if message.contains("conflicting refs") {
            "conflicting_ref"
        } else if message.contains("conflicting bundled paths") {
            "conflicting_bundled_path"
        } else if message.contains("conflicting wasm_sha256") {
            "conflicting_wasm_sha256"
        } else if message.contains("missing source_ref") {
            "missing_source_ref"
        } else if message.contains("marked bundled but bundled_path is missing") {
            "missing_bundled_path"
        } else if message.contains("missing wasm_sha256") {
            "missing_wasm_sha256"
        } else if message.contains("tag ref") && message.contains("not bundled") {
            "tag_ref_requires_bundle"
        } else if message.contains("missing resolved_digest") {
            "missing_resolved_digest"
        } else if message.contains("invalid component ref") {
            "invalid_component_ref"
        } else if message.contains("sha256 digest") {
            "invalid_sha256"
        } else {
            "unknown_error"
        }
    }

    /// The full set of error codes a scenario is allowed to produce.
    fn known_error_codes() -> BTreeSet<&'static str> {
        [
            "component_sources_error",
            "missing_component_sources",
            "component_not_found",
            "duplicate_name",
            "duplicate_id",
            "conflicting_ref",
            "conflicting_bundled_path",
            "conflicting_wasm_sha256",
            "missing_source_ref",
            "missing_bundled_path",
            "missing_wasm_sha256",
            "tag_ref_requires_bundle",
            "missing_resolved_digest",
            "invalid_component_ref",
            "invalid_sha256",
            "unknown_error",
        ]
        .into_iter()
        .collect()
    }

    /// Proptest configuration: the case count comes from `PROPTEST_CASES`
    /// (default 128) and failure persistence is disabled.
    fn proptest_config() -> ProptestConfig {
        let cases = std::env::var("PROPTEST_CASES")
            .ok()
            .and_then(|value| value.parse::<u32>().ok())
            .unwrap_or(128);
        ProptestConfig {
            cases,
            failure_persistence: None,
            ..ProptestConfig::default()
        }
    }

    /// Reads an optional `u64` seed from `PROPTEST_SEED` and expands it to the
    /// 32-byte form used for the RNG (little-endian in the first 8 bytes,
    /// zeros elsewhere). Returns `None` when the variable is unset or not a
    /// valid `u64`.
    fn proptest_seed() -> Option<[u8; 32]> {
        let seed = std::env::var("PROPTEST_SEED")
            .ok()
            .and_then(|value| value.parse::<u64>().ok())?;
        let mut bytes = [0u8; 32];
        bytes[..8].copy_from_slice(&seed.to_le_bytes());
        Some(bytes)
    }

    /// Runs `cases` generated scenarios, seeding a ChaCha RNG from `seed` when
    /// one is provided. Panics if the runner reports a failure.
    fn run_cases(strategy: impl Strategy<Value = Scenario>, cases: u32, seed: Option<[u8; 32]>) {
        let config = ProptestConfig {
            cases,
            failure_persistence: None,
            ..ProptestConfig::default()
        };
        let mut runner = match seed {
            Some(bytes) => {
                TestRunner::new_with_rng(config, TestRng::from_seed(RngAlgorithm::ChaCha, &bytes))
            }
            None => TestRunner::new(config),
        };
        runner
            .run(&strategy, |scenario| {
                run_scenario(&scenario);
                Ok(())
            })
            .unwrap();
    }

    /// Checks the invariants of a single scenario:
    /// resolution is repeatable, a lock alone gives the same answer as a lock
    /// plus sources, every error code is a known one, and sha256 verification
    /// succeeds exactly when the hashes match.
    fn run_scenario(scenario: &Scenario) {
        let known_codes = known_error_codes();
        let first = resolve_component_test(
            scenario.component_sources.as_ref(),
            scenario.pack_lock.as_ref(),
            &scenario.request,
        );
        let second = resolve_component_test(
            scenario.component_sources.as_ref(),
            scenario.pack_lock.as_ref(),
            &scenario.request,
        );
        assert_eq!(normalize_result(&first), normalize_result(&second));

        if let Some(lock) = scenario.pack_lock.as_ref() {
            let lock_only = resolve_component_test(None, Some(lock), &scenario.request);
            assert_eq!(normalize_result(&first), normalize_result(&lock_only));
        }

        if let Err(err) = first.as_ref() {
            assert!(
                known_codes.contains(err.code.as_str()),
                "unexpected error code {}: {}",
                err.code,
                err.message
            );
        }

        if let Some(expected) = scenario.expected_sha256.as_deref() {
            let expected_ok =
                verify_wasm_sha256("test.component", expected, &scenario.bytes).is_ok();
            let actual = compute_sha256_digest_for(&scenario.bytes);
            if actual == normalize_sha256(expected).unwrap_or_default() {
                assert!(expected_ok, "expected sha256 match to succeed");
            } else {
                assert!(!expected_ok, "expected sha256 mismatch to fail");
            }
        }
    }

    /// Produces an owned copy of a borrowed result so two results can be
    /// compared with `assert_eq!`.
    fn normalize_result(
        result: &Result<ResolvedComponent, ResolveError>,
    ) -> Result<ResolvedComponent, ResolveError> {
        match result {
            Ok(value) => Ok(value.clone()),
            Err(err) => Err(err.clone()),
        }
    }

    /// Generates scenarios with a single component, toggling independently:
    /// tag vs digest ref, bundled vs remote, hash present, distinct component
    /// id, lookup by id vs name, and whether a lock and/or sources are given.
    fn scenario_strategy() -> impl Strategy<Value = Scenario> {
        let name = any::<u8>().prop_map(|n| format!("component{n}.core"));
        let alt_name = any::<u8>().prop_map(|n| format!("component_alt{n}.core"));
        let tag_ref = any::<bool>();
        let bundled = any::<bool>();
        let include_sha = any::<bool>();
        let include_component_id = any::<bool>();
        let request_by_id = any::<bool>();
        let use_lock = any::<bool>();
        let use_sources = any::<bool>();
        let bytes = prop::collection::vec(any::<u8>(), 1..64);

        (
            name,
            alt_name,
            tag_ref,
            bundled,
            include_sha,
            include_component_id,
            request_by_id,
            use_lock,
            use_sources,
            bytes,
        )
            .prop_map(
                |(
                    name,
                    alt_name,
                    tag_ref,
                    bundled,
                    include_sha,
                    include_component_id,
                    request_by_id,
                    use_lock,
                    use_sources,
                    bytes,
                )| {
                    // The component id either differs from the name or repeats it.
                    let component_id_str = if include_component_id {
                        alt_name.clone()
                    } else {
                        name.clone()
                    };
                    let component_id = ComponentId::from_str(&component_id_str).ok();
                    let source_ref = if tag_ref {
                        format!("oci://registry.test/{name}:v1")
                    } else {
                        format!(
                            "oci://registry.test/{name}@sha256:{}",
                            hex::encode([0x11u8; 32])
                        )
                    };
                    // A hash is only declared for bundled components, and it
                    // is the real hash of the generated bytes.
                    let expected_sha256 = if bundled && include_sha {
                        Some(compute_sha256_digest_for(&bytes))
                    } else {
                        None
                    };

                    let lock_component = PackLockComponent {
                        name: name.clone(),
                        source_ref: Some(source_ref),
                        legacy_ref: None,
                        component_id,
                        bundled: Some(bundled),
                        bundled_path: if bundled {
                            Some(format!("components/{name}.wasm"))
                        } else {
                            None
                        },
                        legacy_path: None,
                        wasm_sha256: expected_sha256.clone(),
                        legacy_sha256: None,
                        resolved_digest: if bundled {
                            None
                        } else {
                            Some("sha256:deadbeef".to_string())
                        },
                        digest: None,
                    };

                    let pack_lock = if use_lock {
                        Some(PackLockV1 {
                            schema_version: 1,
                            components: vec![lock_component],
                        })
                    } else {
                        None
                    };

                    let component_sources = if use_sources {
                        Some(ComponentSourcesV1::new(vec![ComponentSourceEntryV1 {
                            name: name.clone(),
                            component_id: ComponentId::from_str(&name).ok(),
                            source: ComponentSourceRef::from_str(
                                "oci://registry.test/component@sha256:deadbeef",
                            )
                            .expect("component ref"),
                            resolved: ResolvedComponentV1 {
                                digest: "sha256:deadbeef".to_string(),
                                signature: None,
                                signed_by: None,
                            },
                            artifact: if bundled {
                                ArtifactLocationV1::Inline {
                                    wasm_path: format!("components/{name}.wasm"),
                                    manifest_path: None,
                                }
                            } else {
                                ArtifactLocationV1::Remote
                            },
                            licensing_hint: None,
                            metering_hint: None,
                        }]))
                    } else {
                        None
                    };

                    let request = if request_by_id {
                        ResolveRequest::ById(component_id_str.clone())
                    } else {
                        ResolveRequest::ByName(name.clone())
                    };

                    Scenario {
                        pack_lock,
                        component_sources,
                        request,
                        expected_sha256,
                        bytes,
                    }
                },
            )
    }

    /// Runs the property test with the configured case count and optional seed.
    #[test]
    fn pack_resolution_proptest() {
        let seed = proptest_seed();
        run_cases(scenario_strategy(), proptest_config().cases, seed);
    }

    /// Replays one case per seed listed in the fixture file; blank lines and
    /// lines starting with `#` are ignored.
    #[test]
    fn pack_resolution_regression_seeds() {
        let seeds_path =
            Path::new(env!("CARGO_MANIFEST_DIR")).join("../../tests/fixtures/proptest-seeds.txt");
        let raw = std::fs::read_to_string(&seeds_path).expect("read proptest seeds");
        for line in raw.lines() {
            let line = line.trim();
            if line.is_empty() || line.starts_with('#') {
                continue;
            }
            let seed = line.parse::<u64>().expect("seed must be an integer");
            // Same seed expansion as `proptest_seed`.
            let mut bytes = [0u8; 32];
            bytes[..8].copy_from_slice(&seed.to_le_bytes());
            run_cases(scenario_strategy(), 1, Some(bytes));
        }
    }
}
2937
2938fn dist_options_from(component_resolution: &ComponentResolution) -> DistOptions {
2939 let mut opts = DistOptions {
2940 allow_tags: true,
2941 ..DistOptions::default()
2942 };
2943 if let Some(cache_dir) = component_resolution.dist_cache_dir.clone() {
2944 opts.cache_dir = cache_dir;
2945 }
2946 if component_resolution.dist_offline {
2947 opts.offline = true;
2948 }
2949 opts
2950}
2951
/// Loads every still-missing component that has an entry in `component_sources`.
///
/// For each spec in `specs` whose id is in `missing` and in
/// `component_sources`, the wasm bytes are obtained (inline from the
/// materialized root or the pack archive, or remotely through the distributor
/// client), verified against the expected digest, compiled through the cache
/// and stored in `into`; the id is then removed from `missing`. Specs without
/// a source entry are left untouched.
///
/// # Errors
/// Fails when the archive cannot be opened, an inline artifact cannot be
/// found or read, a remote ref is a tag, a remote digest is missing or does
/// not match, the distributor reports an error, digest verification fails, or
/// compilation fails.
// The function threads through all loader state explicitly, hence the long parameter list.
#[allow(clippy::too_many_arguments)]
async fn load_components_from_sources(
    cache: &CacheManager,
    engine: &Engine,
    component_sources: &HashMap<String, ComponentSourceInfo>,
    component_resolution: &ComponentResolution,
    specs: &[ComponentSpec],
    missing: &mut HashSet<String>,
    into: &mut HashMap<String, PackComponent>,
    materialized_root: Option<&Path>,
    archive_hint: Option<&Path>,
) -> Result<()> {
    // Open the pack archive once up front; it is only consulted for inline artifacts.
    let mut archive = if let Some(path) = archive_hint {
        Some(
            ZipArchive::new(File::open(path)?)
                .with_context(|| format!("{} is not a valid gtpack", path.display()))?,
        )
    } else {
        None
    };
    // Created lazily on the first remote component.
    let mut dist_client: Option<DistClient> = None;

    for spec in specs {
        if !missing.contains(&spec.id) {
            continue;
        }
        let Some(source) = component_sources.get(&spec.id) else {
            continue;
        };

        let bytes = match &source.artifact {
            ComponentArtifactLocation::Inline { wasm_path } => {
                // Prefer the materialized directory; fall back to the archive entry.
                if let Some(root) = materialized_root {
                    let path = root.join(wasm_path);
                    if path.exists() {
                        std::fs::read(&path).with_context(|| {
                            format!(
                                "failed to read inline component {} from {}",
                                spec.id,
                                path.display()
                            )
                        })?
                    } else if archive.is_none() {
                        bail!("inline component {} missing at {}", spec.id, path.display());
                    } else {
                        read_entry(
                            archive.as_mut().expect("archive present when needed"),
                            wasm_path,
                        )
                        .with_context(|| {
                            format!(
                                "inline component {} missing at {} in pack archive",
                                spec.id, wasm_path
                            )
                        })?
                    }
                } else if let Some(archive) = archive.as_mut() {
                    read_entry(archive, wasm_path).with_context(|| {
                        format!(
                            "inline component {} missing at {} in pack archive",
                            spec.id, wasm_path
                        )
                    })?
                } else {
                    bail!(
                        "inline component {} missing and no pack source available",
                        spec.id
                    );
                }
            }
            ComponentArtifactLocation::Remote => {
                // Tag refs are mutable, so a remote component must be pinned by digest.
                if source.source.is_tag() {
                    bail!(
                        "component {} uses tag ref {} but is not bundled; rebuild the pack",
                        spec.id,
                        source.source
                    );
                }
                let client = dist_client.get_or_insert_with(|| {
                    DistClient::new(dist_options_from(component_resolution))
                });
                let reference = source.source.to_string();
                // NOTE(review): presumably a test-only fault-injection hook — confirm in `fault`.
                fault::maybe_fail_asset(&reference)
                    .await
                    .with_context(|| format!("fault injection blocked asset {reference}"))?;
                let digest = source.digest.as_deref().ok_or_else(|| {
                    anyhow!(
                        "component {} missing expected digest for remote component",
                        spec.id
                    )
                })?;
                // Offline: fetch by digest only. Online: resolve the ref and
                // require the resolved digest to equal the expected one.
                let cache_path = if component_resolution.dist_offline {
                    client
                        .fetch_digest(digest)
                        .await
                        .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?
                } else {
                    let resolved = client
                        .resolve_ref(&reference)
                        .await
                        .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?;
                    let expected = normalize_digest(digest);
                    let actual = normalize_digest(&resolved.digest);
                    if expected != actual {
                        bail!(
                            "component {} digest mismatch after fetch: expected {}, got {}",
                            spec.id,
                            expected,
                            actual
                        );
                    }
                    resolved.cache_path.ok_or_else(|| {
                        anyhow!(
                            "component {} resolved from {} but cache path is missing",
                            spec.id,
                            reference
                        )
                    })?
                };
                std::fs::read(&cache_path).with_context(|| {
                    format!(
                        "failed to read cached component {} from {}",
                        spec.id,
                        cache_path.display()
                    )
                })?
            }
        };

        // Verification precedence: explicit bundled sha256, then the
        // "skip" flag (warn only), then the general expected digest.
        if let Some(expected) = source.expected_wasm_sha256.as_deref() {
            verify_wasm_sha256(&spec.id, expected, &bytes)?;
        } else if source.skip_digest_verification {
            let actual = compute_sha256_digest_for(&bytes);
            warn!(
                component_id = %spec.id,
                digest = %actual,
                "bundled component missing wasm_sha256; allowing due to flag"
            );
        } else {
            let expected = source.digest.as_deref().ok_or_else(|| {
                anyhow!(
                    "component {} missing expected digest for verification",
                    spec.id
                )
            })?;
            verify_component_digest(&spec.id, expected, &bytes)?;
        }
        let component =
            compile_component_with_cache(cache, engine, source.digest.as_deref(), bytes)
                .await
                .with_context(|| format!("failed to compile component {}", spec.id))?;
        into.insert(
            spec.id.clone(),
            PackComponent {
                name: spec.id.clone(),
                version: spec.version.clone(),
                component,
            },
        );
        missing.remove(&spec.id);
    }

    Ok(())
}
3116
3117fn dist_error_for_component(err: DistError, component_id: &str, reference: &str) -> anyhow::Error {
3118 match err {
3119 DistError::CacheMiss { reference: missing } => anyhow!(
3120 "remote component {} is not cached for {}. Run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
3121 component_id,
3122 missing,
3123 reference
3124 ),
3125 DistError::Offline { reference: blocked } => anyhow!(
3126 "offline mode blocked fetching component {} from {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
3127 component_id,
3128 blocked,
3129 reference
3130 ),
3131 DistError::AuthRequired { target } => anyhow!(
3132 "component {} requires authenticated source {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
3133 component_id,
3134 target,
3135 reference
3136 ),
3137 other => anyhow!(
3138 "failed to resolve component {} from {}: {}",
3139 component_id,
3140 reference,
3141 other
3142 ),
3143 }
3144}
3145
3146async fn load_components_from_overrides(
3147 cache: &CacheManager,
3148 engine: &Engine,
3149 overrides: &HashMap<String, PathBuf>,
3150 specs: &[ComponentSpec],
3151 missing: &mut HashSet<String>,
3152 into: &mut HashMap<String, PackComponent>,
3153) -> Result<()> {
3154 for spec in specs {
3155 if !missing.contains(&spec.id) {
3156 continue;
3157 }
3158 let Some(path) = overrides.get(&spec.id) else {
3159 continue;
3160 };
3161 let bytes = std::fs::read(path)
3162 .with_context(|| format!("failed to read override component {}", path.display()))?;
3163 let component = compile_component_with_cache(cache, engine, None, bytes)
3164 .await
3165 .with_context(|| {
3166 format!(
3167 "failed to compile component {} from override {}",
3168 spec.id,
3169 path.display()
3170 )
3171 })?;
3172 into.insert(
3173 spec.id.clone(),
3174 PackComponent {
3175 name: spec.id.clone(),
3176 version: spec.version.clone(),
3177 component,
3178 },
3179 );
3180 missing.remove(&spec.id);
3181 }
3182 Ok(())
3183}
3184
3185async fn load_components_from_dir(
3186 cache: &CacheManager,
3187 engine: &Engine,
3188 root: &Path,
3189 specs: &[ComponentSpec],
3190 missing: &mut HashSet<String>,
3191 into: &mut HashMap<String, PackComponent>,
3192) -> Result<()> {
3193 for spec in specs {
3194 if !missing.contains(&spec.id) {
3195 continue;
3196 }
3197 let path = component_path_for_spec(root, spec);
3198 if !path.exists() {
3199 tracing::debug!(component = %spec.id, path = %path.display(), "materialized component missing; will try other sources");
3200 continue;
3201 }
3202 let bytes = std::fs::read(&path)
3203 .with_context(|| format!("failed to read component {}", path.display()))?;
3204 let component = compile_component_with_cache(cache, engine, None, bytes)
3205 .await
3206 .with_context(|| {
3207 format!(
3208 "failed to compile component {} from {}",
3209 spec.id,
3210 path.display()
3211 )
3212 })?;
3213 into.insert(
3214 spec.id.clone(),
3215 PackComponent {
3216 name: spec.id.clone(),
3217 version: spec.version.clone(),
3218 component,
3219 },
3220 );
3221 missing.remove(&spec.id);
3222 }
3223 Ok(())
3224}
3225
3226async fn load_components_from_archive(
3227 cache: &CacheManager,
3228 engine: &Engine,
3229 path: &Path,
3230 specs: &[ComponentSpec],
3231 missing: &mut HashSet<String>,
3232 into: &mut HashMap<String, PackComponent>,
3233) -> Result<()> {
3234 let mut archive = ZipArchive::new(File::open(path)?)
3235 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
3236 for spec in specs {
3237 if !missing.contains(&spec.id) {
3238 continue;
3239 }
3240 let file_name = spec
3241 .legacy_path
3242 .clone()
3243 .unwrap_or_else(|| format!("components/{}.wasm", spec.id));
3244 let bytes = match read_entry(&mut archive, &file_name) {
3245 Ok(bytes) => bytes,
3246 Err(err) => {
3247 warn!(component = %spec.id, pack = %path.display(), error = %err, "component entry missing in pack archive");
3248 continue;
3249 }
3250 };
3251 let component = compile_component_with_cache(cache, engine, None, bytes)
3252 .await
3253 .with_context(|| format!("failed to compile component {}", spec.id))?;
3254 into.insert(
3255 spec.id.clone(),
3256 PackComponent {
3257 name: spec.id.clone(),
3258 version: spec.version.clone(),
3259 component,
3260 },
3261 );
3262 missing.remove(&spec.id);
3263 }
3264 Ok(())
3265}
3266
#[cfg(test)]
mod tests {
    use super::*;
    use greentic_flow::model::{FlowDoc, NodeDoc};
    use indexmap::IndexMap;
    use serde_json::json;

    /// A node declared only through a raw `component: config` entry must be
    /// normalized into a `component.exec` node whose payload names the
    /// component, the `render` operation, and the original config as input.
    #[test]
    fn normalizes_raw_component_to_component_exec() {
        let template_config = json!({ "template": "Hi {{name}}" });

        // Build the single raw node first, then register it under "start".
        let mut raw_entries = IndexMap::new();
        raw_entries.insert("templating.handlebars".into(), template_config.clone());
        let start_node = NodeDoc {
            raw: raw_entries,
            routing: json!([{"out": true}]),
            ..Default::default()
        };
        let mut nodes = IndexMap::new();
        nodes.insert("start".into(), start_node);

        let normalized = normalize_flow_doc(FlowDoc {
            id: "welcome".into(),
            title: None,
            description: None,
            flow_type: "messaging".into(),
            start: Some("start".into()),
            parameters: json!({}),
            tags: Vec::new(),
            schema_version: None,
            entrypoints: IndexMap::new(),
            nodes,
        });

        let node = normalized.nodes.get("start").expect("node exists");
        assert_eq!(node.operation.as_deref(), Some("component.exec"));
        assert!(node.raw.is_empty());

        let payload = node.payload.as_object().expect("payload object");
        assert_eq!(
            payload.get("component"),
            Some(&json!("templating.handlebars"))
        );
        assert_eq!(payload.get("operation"), Some(&json!("render")));
        assert_eq!(payload.get("input"), Some(&template_config));
    }
}
3320
/// Summary metadata describing a pack: its id, version, entry flows, and
/// declared secret requirements.
///
/// Serializable through serde; the two list fields fall back to empty when
/// they are absent from the serialized input.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct PackMetadata {
    /// Identifier of the pack.
    pub pack_id: String,
    /// Pack version, kept as a plain string.
    pub version: String,
    /// Ids of the pack's entry flows; empty when not present in the input.
    #[serde(default)]
    pub entry_flows: Vec<String>,
    /// Secrets the pack declares it requires; empty when not present in the input.
    #[serde(default)]
    pub secret_requirements: Vec<greentic_types::SecretRequirement>,
}
3330
3331impl PackMetadata {
3332 fn from_wasm(bytes: &[u8]) -> Option<Self> {
3333 let parser = Parser::new(0);
3334 for payload in parser.parse_all(bytes) {
3335 let payload = payload.ok()?;
3336 match payload {
3337 Payload::CustomSection(section) => {
3338 if section.name() == "greentic.manifest"
3339 && let Ok(meta) = Self::from_bytes(section.data())
3340 {
3341 return Some(meta);
3342 }
3343 }
3344 Payload::DataSection(reader) => {
3345 for segment in reader.into_iter().flatten() {
3346 if let Ok(meta) = Self::from_bytes(segment.data) {
3347 return Some(meta);
3348 }
3349 }
3350 }
3351 _ => {}
3352 }
3353 }
3354 None
3355 }
3356
3357 fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
3358 #[derive(Deserialize)]
3359 struct RawManifest {
3360 pack_id: String,
3361 version: String,
3362 #[serde(default)]
3363 entry_flows: Vec<String>,
3364 #[serde(default)]
3365 flows: Vec<RawFlow>,
3366 #[serde(default)]
3367 secret_requirements: Vec<greentic_types::SecretRequirement>,
3368 }
3369
3370 #[derive(Deserialize)]
3371 struct RawFlow {
3372 id: String,
3373 }
3374
3375 let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
3376 let mut entry_flows = if manifest.entry_flows.is_empty() {
3377 manifest.flows.iter().map(|f| f.id.clone()).collect()
3378 } else {
3379 manifest.entry_flows.clone()
3380 };
3381 entry_flows.retain(|id| !id.is_empty());
3382 Ok(Self {
3383 pack_id: manifest.pack_id,
3384 version: manifest.version,
3385 entry_flows,
3386 secret_requirements: manifest.secret_requirements,
3387 })
3388 }
3389
3390 pub fn fallback(path: &Path) -> Self {
3391 let pack_id = path
3392 .file_stem()
3393 .map(|s| s.to_string_lossy().into_owned())
3394 .unwrap_or_else(|| "unknown-pack".to_string());
3395 Self {
3396 pack_id,
3397 version: "0.0.0".to_string(),
3398 entry_flows: Vec::new(),
3399 secret_requirements: Vec::new(),
3400 }
3401 }
3402
3403 pub fn from_manifest(manifest: &greentic_types::PackManifest) -> Self {
3404 let entry_flows = manifest
3405 .flows
3406 .iter()
3407 .map(|flow| flow.id.as_str().to_string())
3408 .collect::<Vec<_>>();
3409 Self {
3410 pack_id: manifest.pack_id.as_str().to_string(),
3411 version: manifest.version.to_string(),
3412 entry_flows,
3413 secret_requirements: manifest.secret_requirements.clone(),
3414 }
3415 }
3416}