1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::cache::{ArtifactKey, CacheConfig, CacheManager, CpuPolicy, EngineProfile};
10use crate::component_api::{
11 self, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
12};
13use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
14use crate::provider::{ProviderBinding, ProviderRegistry};
15use crate::provider_core::SchemaCorePre as ProviderComponentPre;
16use crate::provider_core_only;
17use crate::runtime_wasmtime::{Component, Engine, InstancePre, Linker, ResourceTable};
18use anyhow::{Context, Result, anyhow, bail};
19use greentic_distributor_client::dist::{DistClient, DistError, DistOptions};
20use greentic_interfaces_wasmtime::host_helpers::v1::{
21 self as host_v1, HostFns, add_all_v1_to_linker,
22 runner_host_http::RunnerHostHttp,
23 runner_host_kv::RunnerHostKv,
24 secrets_store::{SecretsError, SecretsStoreHost},
25 state_store::{
26 OpAck as StateOpAck, StateKey as HostStateKey, StateStoreError as StateError,
27 StateStoreHost, TenantCtx as StateTenantCtx,
28 },
29 telemetry_logger::{
30 OpAck as TelemetryAck, SpanContext as TelemetrySpanContext,
31 TelemetryLoggerError as TelemetryError, TelemetryLoggerHost,
32 TenantCtx as TelemetryTenantCtx,
33 },
34};
35use greentic_interfaces_wasmtime::http_client_client_v1_0::greentic::interfaces_types::types::Impersonation as ImpersonationV1_0;
36use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::interfaces_types::types::Impersonation as ImpersonationV1_1;
37use greentic_pack::builder as legacy_pack;
38use greentic_types::flow::FlowHasher;
39use greentic_types::{
40 ArtifactLocationV1, ComponentId, ComponentManifest, ComponentSourceRef, ComponentSourcesV1,
41 EXT_COMPONENT_SOURCES_V1, EnvId, ExtensionRef, Flow, FlowComponentRef, FlowId, FlowKind,
42 FlowMetadata, InputMapping, Node, NodeId, OutputMapping, Routing, StateKey as StoreStateKey,
43 TeamId, TelemetryHints, TenantCtx as TypesTenantCtx, TenantId, UserId, decode_pack_manifest,
44 pack_manifest::ExtensionInline,
45};
46use host_v1::http_client::{
47 HttpClientError, HttpClientErrorV1_1, HttpClientHost, HttpClientHostV1_1,
48 Request as HttpRequest, RequestOptionsV1_1 as HttpRequestOptionsV1_1,
49 RequestV1_1 as HttpRequestV1_1, Response as HttpResponse, ResponseV1_1 as HttpResponseV1_1,
50 TenantCtx as HttpTenantCtx, TenantCtxV1_1 as HttpTenantCtxV1_1,
51};
52use indexmap::IndexMap;
53use once_cell::sync::Lazy;
54use parking_lot::{Mutex, RwLock};
55use reqwest::blocking::Client as BlockingClient;
56use runner_core::normalize_under_root;
57use serde::{Deserialize, Serialize};
58use serde_cbor;
59use serde_json::{self, Value};
60use sha2::Digest;
61use tokio::fs;
62use wasmparser::{Parser, Payload};
63use wasmtime::StoreContextMut;
64use zip::ZipArchive;
65
66use crate::runner::engine::{FlowContext, FlowEngine, FlowStatus};
67use crate::runner::flow_adapter::{FlowIR, flow_doc_to_ir, flow_ir_to_flow};
68use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
69
70use crate::config::HostConfig;
71use crate::fault;
72use crate::secrets::{DynSecretsManager, read_secret_blocking};
73use crate::storage::state::STATE_PREFIX;
74use crate::storage::{DynSessionStore, DynStateStore};
75use crate::verify;
76use crate::wasi::RunnerWasiPolicy;
77use tracing::warn;
78use wasmtime_wasi::p2::add_to_linker_sync as add_wasi_to_linker;
79use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
80
81use greentic_flow::model::FlowDoc;
82
83#[allow(dead_code)]
84pub struct PackRuntime {
85 path: PathBuf,
87 archive_path: Option<PathBuf>,
89 config: Arc<HostConfig>,
90 engine: Engine,
91 metadata: PackMetadata,
92 manifest: Option<greentic_types::PackManifest>,
93 legacy_manifest: Option<Box<legacy_pack::PackManifest>>,
94 component_manifests: HashMap<String, ComponentManifest>,
95 mocks: Option<Arc<MockLayer>>,
96 flows: Option<PackFlows>,
97 components: HashMap<String, PackComponent>,
98 http_client: Arc<BlockingClient>,
99 pre_cache: Mutex<HashMap<String, InstancePre<ComponentState>>>,
100 session_store: Option<DynSessionStore>,
101 state_store: Option<DynStateStore>,
102 wasi_policy: Arc<RunnerWasiPolicy>,
103 provider_registry: RwLock<Option<ProviderRegistry>>,
104 secrets: DynSecretsManager,
105 oauth_config: Option<OAuthBrokerConfig>,
106 cache: CacheManager,
107}
108
109struct PackComponent {
110 #[allow(dead_code)]
111 name: String,
112 #[allow(dead_code)]
113 version: String,
114 component: Arc<Component>,
115}
116
117#[derive(Debug, Default, Clone)]
118pub struct ComponentResolution {
119 pub materialized_root: Option<PathBuf>,
121 pub overrides: HashMap<String, PathBuf>,
123 pub dist_offline: bool,
125 pub dist_cache_dir: Option<PathBuf>,
127 pub allow_missing_hash: bool,
129}
130
131fn build_blocking_client() -> BlockingClient {
132 std::thread::spawn(|| {
133 BlockingClient::builder()
134 .no_proxy()
135 .build()
136 .expect("blocking client")
137 })
138 .join()
139 .expect("client build thread panicked")
140}
141
142fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
143 let (root, candidate) = if path.is_absolute() {
144 let parent = path
145 .parent()
146 .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
147 let root = parent
148 .canonicalize()
149 .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
150 let file = path
151 .file_name()
152 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
153 (root, PathBuf::from(file))
154 } else {
155 let cwd = std::env::current_dir().context("failed to resolve current directory")?;
156 let base = if let Some(parent) = path.parent() {
157 cwd.join(parent)
158 } else {
159 cwd
160 };
161 let root = base
162 .canonicalize()
163 .with_context(|| format!("failed to canonicalize {}", base.display()))?;
164 let file = path
165 .file_name()
166 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
167 (root, PathBuf::from(file))
168 };
169 let safe = normalize_under_root(&root, &candidate)?;
170 Ok((root, safe))
171}
172
173static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
174
175#[derive(Debug, Clone, Serialize, Deserialize)]
176pub struct FlowDescriptor {
177 pub id: String,
178 #[serde(rename = "type")]
179 pub flow_type: String,
180 pub pack_id: String,
181 pub profile: String,
182 pub version: String,
183 #[serde(default)]
184 pub description: Option<String>,
185}
186
187pub struct HostState {
188 config: Arc<HostConfig>,
189 http_client: Arc<BlockingClient>,
190 default_env: String,
191 #[allow(dead_code)]
192 session_store: Option<DynSessionStore>,
193 state_store: Option<DynStateStore>,
194 mocks: Option<Arc<MockLayer>>,
195 secrets: DynSecretsManager,
196 oauth_config: Option<OAuthBrokerConfig>,
197 oauth_host: OAuthBrokerHost,
198 exec_ctx: Option<ComponentExecCtx>,
199}
200
201impl HostState {
202 #[allow(clippy::default_constructed_unit_structs)]
203 #[allow(clippy::too_many_arguments)]
204 pub fn new(
205 config: Arc<HostConfig>,
206 http_client: Arc<BlockingClient>,
207 mocks: Option<Arc<MockLayer>>,
208 session_store: Option<DynSessionStore>,
209 state_store: Option<DynStateStore>,
210 secrets: DynSecretsManager,
211 oauth_config: Option<OAuthBrokerConfig>,
212 exec_ctx: Option<ComponentExecCtx>,
213 ) -> Result<Self> {
214 let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
215 Ok(Self {
216 config,
217 http_client,
218 default_env,
219 session_store,
220 state_store,
221 mocks,
222 secrets,
223 oauth_config,
224 oauth_host: OAuthBrokerHost::default(),
225 exec_ctx,
226 })
227 }
228
229 pub fn get_secret(&self, key: &str) -> Result<String> {
230 if provider_core_only::is_enabled() {
231 bail!(provider_core_only::blocked_message("secrets"))
232 }
233 if !self.config.secrets_policy.is_allowed(key) {
234 bail!("secret {key} is not permitted by bindings policy");
235 }
236 if let Some(mock) = &self.mocks
237 && let Some(value) = mock.secrets_lookup(key)
238 {
239 return Ok(value);
240 }
241 let ctx = self.config.tenant_ctx();
242 let bytes = read_secret_blocking(&self.secrets, &ctx, key)
243 .context("failed to read secret from manager")?;
244 let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
245 Ok(value)
246 }
247
248 fn tenant_ctx_from_v1(&self, ctx: Option<StateTenantCtx>) -> Result<TypesTenantCtx> {
249 let tenant_raw = ctx
250 .as_ref()
251 .map(|ctx| ctx.tenant.clone())
252 .or_else(|| self.exec_ctx.as_ref().map(|ctx| ctx.tenant.tenant.clone()))
253 .unwrap_or_else(|| self.config.tenant.clone());
254 let env_raw = ctx
255 .as_ref()
256 .map(|ctx| ctx.env.clone())
257 .unwrap_or_else(|| self.default_env.clone());
258 let tenant_id = TenantId::from_str(&tenant_raw)
259 .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
260 let env_id = EnvId::from_str(&env_raw)
261 .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
262 let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
263 if let Some(exec_ctx) = self.exec_ctx.as_ref() {
264 if let Some(team) = exec_ctx.tenant.team.as_ref() {
265 let team_id =
266 TeamId::from_str(team).with_context(|| format!("invalid team id `{team}`"))?;
267 tenant_ctx = tenant_ctx.with_team(Some(team_id));
268 }
269 if let Some(user) = exec_ctx.tenant.user.as_ref() {
270 let user_id =
271 UserId::from_str(user).with_context(|| format!("invalid user id `{user}`"))?;
272 tenant_ctx = tenant_ctx.with_user(Some(user_id));
273 }
274 tenant_ctx = tenant_ctx.with_flow(exec_ctx.flow_id.clone());
275 if let Some(node) = exec_ctx.node_id.as_ref() {
276 tenant_ctx = tenant_ctx.with_node(node.clone());
277 }
278 if let Some(session) = exec_ctx.tenant.correlation_id.as_ref() {
279 tenant_ctx = tenant_ctx.with_session(session.clone());
280 }
281 tenant_ctx.trace_id = exec_ctx.tenant.trace_id.clone();
282 }
283
284 if let Some(ctx) = ctx {
285 if let Some(team) = ctx.team.or(ctx.team_id) {
286 let team_id =
287 TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
288 tenant_ctx = tenant_ctx.with_team(Some(team_id));
289 }
290 if let Some(user) = ctx.user.or(ctx.user_id) {
291 let user_id =
292 UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
293 tenant_ctx = tenant_ctx.with_user(Some(user_id));
294 }
295 if let Some(flow) = ctx.flow_id {
296 tenant_ctx = tenant_ctx.with_flow(flow);
297 }
298 if let Some(node) = ctx.node_id {
299 tenant_ctx = tenant_ctx.with_node(node);
300 }
301 if let Some(provider) = ctx.provider_id {
302 tenant_ctx = tenant_ctx.with_provider(provider);
303 }
304 if let Some(session) = ctx.session_id {
305 tenant_ctx = tenant_ctx.with_session(session);
306 }
307 tenant_ctx.trace_id = ctx.trace_id;
308 }
309 Ok(tenant_ctx)
310 }
311
312 fn send_http_request(
313 &mut self,
314 req: HttpRequest,
315 opts: Option<HttpRequestOptionsV1_1>,
316 _ctx: Option<HttpTenantCtx>,
317 ) -> Result<HttpResponse, HttpClientError> {
318 if !self.config.http_enabled {
319 return Err(HttpClientError {
320 code: "denied".into(),
321 message: "http client disabled by policy".into(),
322 });
323 }
324
325 let mut mock_state = None;
326 let raw_body = req.body.clone();
327 if let Some(mock) = &self.mocks
328 && let Ok(meta) = HttpMockRequest::new(&req.method, &req.url, raw_body.as_deref())
329 {
330 match mock.http_begin(&meta) {
331 HttpDecision::Mock(response) => {
332 let headers = response
333 .headers
334 .iter()
335 .map(|(k, v)| (k.clone(), v.clone()))
336 .collect();
337 return Ok(HttpResponse {
338 status: response.status,
339 headers,
340 body: response.body.clone().map(|b| b.into_bytes()),
341 });
342 }
343 HttpDecision::Deny(reason) => {
344 return Err(HttpClientError {
345 code: "denied".into(),
346 message: reason,
347 });
348 }
349 HttpDecision::Passthrough { record } => {
350 mock_state = Some((meta, record));
351 }
352 }
353 }
354
355 let method = req.method.parse().unwrap_or(reqwest::Method::GET);
356 let mut builder = self.http_client.request(method, &req.url);
357 for (key, value) in req.headers {
358 if let Ok(header) = reqwest::header::HeaderName::from_bytes(key.as_bytes())
359 && let Ok(header_value) = reqwest::header::HeaderValue::from_str(&value)
360 {
361 builder = builder.header(header, header_value);
362 }
363 }
364
365 if let Some(body) = raw_body.clone() {
366 builder = builder.body(body);
367 }
368
369 if let Some(opts) = opts {
370 if let Some(timeout_ms) = opts.timeout_ms {
371 builder = builder.timeout(Duration::from_millis(timeout_ms as u64));
372 }
373 if opts.allow_insecure == Some(true) {
374 warn!(url = %req.url, "allow-insecure not supported; using default TLS validation");
375 }
376 if let Some(follow_redirects) = opts.follow_redirects
377 && !follow_redirects
378 {
379 warn!(url = %req.url, "follow-redirects=false not supported; using default client behaviour");
380 }
381 }
382
383 let response = match builder.send() {
384 Ok(resp) => resp,
385 Err(err) => {
386 warn!(url = %req.url, error = %err, "http client request failed");
387 return Err(HttpClientError {
388 code: "unavailable".into(),
389 message: err.to_string(),
390 });
391 }
392 };
393
394 let status = response.status().as_u16();
395 let headers_vec = response
396 .headers()
397 .iter()
398 .map(|(k, v)| {
399 (
400 k.as_str().to_string(),
401 v.to_str().unwrap_or_default().to_string(),
402 )
403 })
404 .collect::<Vec<_>>();
405 let body_bytes = response.bytes().ok().map(|b| b.to_vec());
406
407 if let Some((meta, true)) = mock_state.take()
408 && let Some(mock) = &self.mocks
409 {
410 let recorded = HttpMockResponse::new(
411 status,
412 headers_vec.clone().into_iter().collect(),
413 body_bytes
414 .as_ref()
415 .map(|b| String::from_utf8_lossy(b).into_owned()),
416 );
417 mock.http_record(&meta, &recorded);
418 }
419
420 Ok(HttpResponse {
421 status,
422 headers: headers_vec,
423 body: body_bytes,
424 })
425 }
426}
427
428impl SecretsStoreHost for HostState {
429 fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsError> {
430 if provider_core_only::is_enabled() {
431 warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
432 return Err(SecretsError::Denied);
433 }
434 if !self.config.secrets_policy.is_allowed(&key) {
435 return Err(SecretsError::Denied);
436 }
437 if let Some(mock) = &self.mocks
438 && let Some(value) = mock.secrets_lookup(&key)
439 {
440 return Ok(Some(value.into_bytes()));
441 }
442 let ctx = self.config.tenant_ctx();
443 match read_secret_blocking(&self.secrets, &ctx, &key) {
444 Ok(bytes) => Ok(Some(bytes)),
445 Err(err) => {
446 warn!(secret = %key, error = %err, "secret lookup failed");
447 Err(SecretsError::NotFound)
448 }
449 }
450 }
451}
452
453impl HttpClientHost for HostState {
454 fn send(
455 &mut self,
456 req: HttpRequest,
457 ctx: Option<HttpTenantCtx>,
458 ) -> Result<HttpResponse, HttpClientError> {
459 self.send_http_request(req, None, ctx)
460 }
461}
462
463impl HttpClientHostV1_1 for HostState {
464 fn send(
465 &mut self,
466 req: HttpRequestV1_1,
467 opts: Option<HttpRequestOptionsV1_1>,
468 ctx: Option<HttpTenantCtxV1_1>,
469 ) -> Result<HttpResponseV1_1, HttpClientErrorV1_1> {
470 let legacy_req = HttpRequest {
471 method: req.method,
472 url: req.url,
473 headers: req.headers,
474 body: req.body,
475 };
476 let legacy_ctx = ctx.map(|ctx| HttpTenantCtx {
477 env: ctx.env,
478 tenant: ctx.tenant,
479 tenant_id: ctx.tenant_id,
480 team: ctx.team,
481 team_id: ctx.team_id,
482 user: ctx.user,
483 user_id: ctx.user_id,
484 trace_id: ctx.trace_id,
485 correlation_id: ctx.correlation_id,
486 attributes: ctx.attributes,
487 session_id: ctx.session_id,
488 flow_id: ctx.flow_id,
489 node_id: ctx.node_id,
490 provider_id: ctx.provider_id,
491 deadline_ms: ctx.deadline_ms,
492 attempt: ctx.attempt,
493 idempotency_key: ctx.idempotency_key,
494 impersonation: ctx
495 .impersonation
496 .map(|ImpersonationV1_1 { actor_id, reason }| ImpersonationV1_0 {
497 actor_id,
498 reason,
499 }),
500 });
501
502 self.send_http_request(legacy_req, opts, legacy_ctx)
503 .map(|resp| HttpResponseV1_1 {
504 status: resp.status,
505 headers: resp.headers,
506 body: resp.body,
507 })
508 .map_err(|err| HttpClientErrorV1_1 {
509 code: err.code,
510 message: err.message,
511 })
512 }
513}
514
515impl StateStoreHost for HostState {
516 fn read(
517 &mut self,
518 key: HostStateKey,
519 ctx: Option<StateTenantCtx>,
520 ) -> Result<Vec<u8>, StateError> {
521 let store = match self.state_store.as_ref() {
522 Some(store) => store.clone(),
523 None => {
524 return Err(StateError {
525 code: "unavailable".into(),
526 message: "state store not configured".into(),
527 });
528 }
529 };
530 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
531 Ok(ctx) => ctx,
532 Err(err) => {
533 return Err(StateError {
534 code: "invalid-ctx".into(),
535 message: err.to_string(),
536 });
537 }
538 };
539 let key = StoreStateKey::from(key);
540 match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
541 Ok(Some(value)) => Ok(serde_json::to_vec(&value).unwrap_or_else(|_| Vec::new())),
542 Ok(None) => Err(StateError {
543 code: "not_found".into(),
544 message: "state key not found".into(),
545 }),
546 Err(err) => Err(StateError {
547 code: "internal".into(),
548 message: err.to_string(),
549 }),
550 }
551 }
552
553 fn write(
554 &mut self,
555 key: HostStateKey,
556 bytes: Vec<u8>,
557 ctx: Option<StateTenantCtx>,
558 ) -> Result<StateOpAck, StateError> {
559 let store = match self.state_store.as_ref() {
560 Some(store) => store.clone(),
561 None => {
562 return Err(StateError {
563 code: "unavailable".into(),
564 message: "state store not configured".into(),
565 });
566 }
567 };
568 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
569 Ok(ctx) => ctx,
570 Err(err) => {
571 return Err(StateError {
572 code: "invalid-ctx".into(),
573 message: err.to_string(),
574 });
575 }
576 };
577 let key = StoreStateKey::from(key);
578 let value = serde_json::from_slice(&bytes)
579 .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()));
580 match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
581 Ok(()) => Ok(StateOpAck::Ok),
582 Err(err) => Err(StateError {
583 code: "internal".into(),
584 message: err.to_string(),
585 }),
586 }
587 }
588
589 fn delete(
590 &mut self,
591 key: HostStateKey,
592 ctx: Option<StateTenantCtx>,
593 ) -> Result<StateOpAck, StateError> {
594 let store = match self.state_store.as_ref() {
595 Some(store) => store.clone(),
596 None => {
597 return Err(StateError {
598 code: "unavailable".into(),
599 message: "state store not configured".into(),
600 });
601 }
602 };
603 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
604 Ok(ctx) => ctx,
605 Err(err) => {
606 return Err(StateError {
607 code: "invalid-ctx".into(),
608 message: err.to_string(),
609 });
610 }
611 };
612 let key = StoreStateKey::from(key);
613 match store.del(&tenant_ctx, STATE_PREFIX, &key) {
614 Ok(_) => Ok(StateOpAck::Ok),
615 Err(err) => Err(StateError {
616 code: "internal".into(),
617 message: err.to_string(),
618 }),
619 }
620 }
621}
622
623impl TelemetryLoggerHost for HostState {
624 fn log(
625 &mut self,
626 span: TelemetrySpanContext,
627 fields: Vec<(String, String)>,
628 _ctx: Option<TelemetryTenantCtx>,
629 ) -> Result<TelemetryAck, TelemetryError> {
630 if let Some(mock) = &self.mocks
631 && mock.telemetry_drain(&[("span_json", span.flow_id.as_str())])
632 {
633 return Ok(TelemetryAck::Ok);
634 }
635 let mut map = serde_json::Map::new();
636 for (k, v) in fields {
637 map.insert(k, Value::String(v));
638 }
639 tracing::info!(
640 tenant = %span.tenant,
641 flow_id = %span.flow_id,
642 node = ?span.node_id,
643 provider = %span.provider,
644 fields = %serde_json::Value::Object(map.clone()),
645 "telemetry log from pack"
646 );
647 Ok(TelemetryAck::Ok)
648 }
649}
650
651impl RunnerHostHttp for HostState {
652 fn request(
653 &mut self,
654 method: String,
655 url: String,
656 headers: Vec<String>,
657 body: Option<Vec<u8>>,
658 ) -> Result<Vec<u8>, String> {
659 let req = HttpRequest {
660 method,
661 url,
662 headers: headers
663 .chunks(2)
664 .filter_map(|chunk| {
665 if chunk.len() == 2 {
666 Some((chunk[0].clone(), chunk[1].clone()))
667 } else {
668 None
669 }
670 })
671 .collect(),
672 body,
673 };
674 match HttpClientHost::send(self, req, None) {
675 Ok(resp) => Ok(resp.body.unwrap_or_default()),
676 Err(err) => Err(err.message),
677 }
678 }
679}
680
681impl RunnerHostKv for HostState {
682 fn get(&mut self, _ns: String, _key: String) -> Option<String> {
683 None
684 }
685
686 fn put(&mut self, _ns: String, _key: String, _val: String) {}
687}
688
689enum ManifestLoad {
690 New {
691 manifest: Box<greentic_types::PackManifest>,
692 flows: PackFlows,
693 },
694 Legacy {
695 manifest: Box<legacy_pack::PackManifest>,
696 flows: PackFlows,
697 },
698}
699
700fn load_manifest_and_flows(path: &Path) -> Result<ManifestLoad> {
701 let mut archive = ZipArchive::new(File::open(path)?)
702 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
703 let bytes = read_entry(&mut archive, "manifest.cbor")
704 .with_context(|| format!("missing manifest.cbor in {}", path.display()))?;
705 match decode_pack_manifest(&bytes) {
706 Ok(manifest) => {
707 let cache = PackFlows::from_manifest(manifest.clone());
708 Ok(ManifestLoad::New {
709 manifest: Box::new(manifest),
710 flows: cache,
711 })
712 }
713 Err(err) => {
714 tracing::debug!(error = %err, pack = %path.display(), "decode_pack_manifest failed; trying legacy manifest");
715 let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
717 .context("failed to decode legacy pack manifest from manifest.cbor")?;
718 let flows = load_legacy_flows(&mut archive, &legacy)?;
719 Ok(ManifestLoad::Legacy {
720 manifest: Box::new(legacy),
721 flows,
722 })
723 }
724 }
725}
726
727fn load_manifest_and_flows_from_dir(root: &Path) -> Result<ManifestLoad> {
728 let manifest_path = root.join("manifest.cbor");
729 let bytes = std::fs::read(&manifest_path)
730 .with_context(|| format!("missing manifest.cbor in {}", root.display()))?;
731 match decode_pack_manifest(&bytes) {
732 Ok(manifest) => {
733 let cache = PackFlows::from_manifest(manifest.clone());
734 Ok(ManifestLoad::New {
735 manifest: Box::new(manifest),
736 flows: cache,
737 })
738 }
739 Err(err) => {
740 tracing::debug!(
741 error = %err,
742 pack = %root.display(),
743 "decode_pack_manifest failed for materialized pack; trying legacy manifest"
744 );
745 let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
746 .context("failed to decode legacy pack manifest from manifest.cbor")?;
747 let flows = load_legacy_flows_from_dir(root, &legacy)?;
748 Ok(ManifestLoad::Legacy {
749 manifest: Box::new(legacy),
750 flows,
751 })
752 }
753 }
754}
755
756fn load_legacy_flows(
757 archive: &mut ZipArchive<File>,
758 manifest: &legacy_pack::PackManifest,
759) -> Result<PackFlows> {
760 let mut flows = HashMap::new();
761 let mut descriptors = Vec::new();
762
763 for entry in &manifest.flows {
764 let bytes = read_entry(archive, &entry.file_json)
765 .with_context(|| format!("missing flow json {}", entry.file_json))?;
766 let doc: FlowDoc = serde_json::from_slice(&bytes)
767 .with_context(|| format!("failed to decode flow doc {}", entry.file_json))?;
768 let normalized = normalize_flow_doc(doc);
769 let flow_ir = flow_doc_to_ir(normalized)?;
770 let flow = flow_ir_to_flow(flow_ir)?;
771
772 descriptors.push(FlowDescriptor {
773 id: entry.id.clone(),
774 flow_type: entry.kind.clone(),
775 pack_id: manifest.meta.pack_id.clone(),
776 profile: manifest.meta.pack_id.clone(),
777 version: manifest.meta.version.to_string(),
778 description: None,
779 });
780 flows.insert(entry.id.clone(), flow);
781 }
782
783 let mut entry_flows = manifest.meta.entry_flows.clone();
784 if entry_flows.is_empty() {
785 entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
786 }
787 let metadata = PackMetadata {
788 pack_id: manifest.meta.pack_id.clone(),
789 version: manifest.meta.version.to_string(),
790 entry_flows,
791 secret_requirements: Vec::new(),
792 };
793
794 Ok(PackFlows {
795 descriptors,
796 flows,
797 metadata,
798 })
799}
800
801fn load_legacy_flows_from_dir(
802 root: &Path,
803 manifest: &legacy_pack::PackManifest,
804) -> Result<PackFlows> {
805 let mut flows = HashMap::new();
806 let mut descriptors = Vec::new();
807
808 for entry in &manifest.flows {
809 let path = root.join(&entry.file_json);
810 let bytes = std::fs::read(&path)
811 .with_context(|| format!("missing flow json {}", path.display()))?;
812 let doc: FlowDoc = serde_json::from_slice(&bytes)
813 .with_context(|| format!("failed to decode flow doc {}", path.display()))?;
814 let normalized = normalize_flow_doc(doc);
815 let flow_ir = flow_doc_to_ir(normalized)?;
816 let flow = flow_ir_to_flow(flow_ir)?;
817
818 descriptors.push(FlowDescriptor {
819 id: entry.id.clone(),
820 flow_type: entry.kind.clone(),
821 pack_id: manifest.meta.pack_id.clone(),
822 profile: manifest.meta.pack_id.clone(),
823 version: manifest.meta.version.to_string(),
824 description: None,
825 });
826 flows.insert(entry.id.clone(), flow);
827 }
828
829 let mut entry_flows = manifest.meta.entry_flows.clone();
830 if entry_flows.is_empty() {
831 entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
832 }
833 let metadata = PackMetadata {
834 pack_id: manifest.meta.pack_id.clone(),
835 version: manifest.meta.version.to_string(),
836 entry_flows,
837 secret_requirements: Vec::new(),
838 };
839
840 Ok(PackFlows {
841 descriptors,
842 flows,
843 metadata,
844 })
845}
846
847pub struct ComponentState {
848 pub host: HostState,
849 wasi_ctx: WasiCtx,
850 resource_table: ResourceTable,
851}
852
853impl ComponentState {
854 pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
855 let wasi_ctx = policy
856 .instantiate()
857 .context("failed to build WASI context")?;
858 Ok(Self {
859 host,
860 wasi_ctx,
861 resource_table: ResourceTable::new(),
862 })
863 }
864
865 fn host_mut(&mut self) -> &mut HostState {
866 &mut self.host
867 }
868
869 fn should_cancel_host(&mut self) -> bool {
870 false
871 }
872
873 fn yield_now_host(&mut self) {
874 }
876}
877
878impl component_api::v0_4::greentic::component::control::Host for ComponentState {
879 fn should_cancel(&mut self) -> bool {
880 self.should_cancel_host()
881 }
882
883 fn yield_now(&mut self) {
884 self.yield_now_host();
885 }
886}
887
888impl component_api::v0_5::greentic::component::control::Host for ComponentState {
889 fn should_cancel(&mut self) -> bool {
890 self.should_cancel_host()
891 }
892
893 fn yield_now(&mut self) {
894 self.yield_now_host();
895 }
896}
897
898fn add_component_control_instance(
899 linker: &mut Linker<ComponentState>,
900 name: &str,
901) -> wasmtime::Result<()> {
902 let mut inst = linker.instance(name)?;
903 inst.func_wrap(
904 "should-cancel",
905 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
906 let host = caller.data_mut();
907 Ok((host.should_cancel_host(),))
908 },
909 )?;
910 inst.func_wrap(
911 "yield-now",
912 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
913 let host = caller.data_mut();
914 host.yield_now_host();
915 Ok(())
916 },
917 )?;
918 Ok(())
919}
920
921fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
922 add_component_control_instance(linker, "greentic:component/control@0.5.0")?;
923 add_component_control_instance(linker, "greentic:component/control@0.4.0")?;
924 Ok(())
925}
926
927pub fn register_all(linker: &mut Linker<ComponentState>, allow_state_store: bool) -> Result<()> {
928 add_wasi_to_linker(linker)?;
929 add_all_v1_to_linker(
930 linker,
931 HostFns {
932 http_client_v1_1: Some(|state| state.host_mut()),
933 http_client: Some(|state| state.host_mut()),
934 oauth_broker: None,
935 runner_host_http: Some(|state| state.host_mut()),
936 runner_host_kv: Some(|state| state.host_mut()),
937 telemetry_logger: Some(|state| state.host_mut()),
938 state_store: allow_state_store.then_some(|state| state.host_mut()),
939 secrets_store: Some(|state| state.host_mut()),
940 },
941 )?;
942 Ok(())
943}
944
945impl OAuthHostContext for ComponentState {
946 fn tenant_id(&self) -> &str {
947 &self.host.config.tenant
948 }
949
950 fn env(&self) -> &str {
951 &self.host.default_env
952 }
953
954 fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
955 &mut self.host.oauth_host
956 }
957
958 fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
959 self.host.oauth_config.as_ref()
960 }
961}
962
963impl WasiView for ComponentState {
964 fn ctx(&mut self) -> WasiCtxView<'_> {
965 WasiCtxView {
966 ctx: &mut self.wasi_ctx,
967 table: &mut self.resource_table,
968 }
969 }
970}
971
972#[allow(unsafe_code)]
973unsafe impl Send for ComponentState {}
974#[allow(unsafe_code)]
975unsafe impl Sync for ComponentState {}
976
977impl PackRuntime {
978 fn allows_state_store(&self, component_ref: &str) -> bool {
979 if self.state_store.is_none() {
980 return false;
981 }
982 if !self.config.state_store_policy.allow {
983 return false;
984 }
985 let Some(manifest) = self.component_manifests.get(component_ref) else {
986 return false;
987 };
988 manifest
989 .capabilities
990 .host
991 .state
992 .as_ref()
993 .map(|caps| caps.read || caps.write)
994 .unwrap_or(false)
995 }
996
997 #[allow(clippy::too_many_arguments)]
998 pub async fn load(
999 path: impl AsRef<Path>,
1000 config: Arc<HostConfig>,
1001 mocks: Option<Arc<MockLayer>>,
1002 archive_source: Option<&Path>,
1003 session_store: Option<DynSessionStore>,
1004 state_store: Option<DynStateStore>,
1005 wasi_policy: Arc<RunnerWasiPolicy>,
1006 secrets: DynSecretsManager,
1007 oauth_config: Option<OAuthBrokerConfig>,
1008 verify_archive: bool,
1009 component_resolution: ComponentResolution,
1010 ) -> Result<Self> {
1011 let path = path.as_ref();
1012 let (_pack_root, safe_path) = normalize_pack_path(path)?;
1013 let path_meta = std::fs::metadata(&safe_path).ok();
1014 let is_dir = path_meta
1015 .as_ref()
1016 .map(|meta| meta.is_dir())
1017 .unwrap_or(false);
1018 let is_component = !is_dir
1019 && safe_path
1020 .extension()
1021 .and_then(|ext| ext.to_str())
1022 .map(|ext| ext.eq_ignore_ascii_case("wasm"))
1023 .unwrap_or(false);
1024 let archive_hint_path = if let Some(source) = archive_source {
1025 let (_, normalized) = normalize_pack_path(source)?;
1026 Some(normalized)
1027 } else if is_component || is_dir {
1028 None
1029 } else {
1030 Some(safe_path.clone())
1031 };
1032 let archive_hint = archive_hint_path.as_deref();
1033 if verify_archive {
1034 if let Some(verify_target) = archive_hint.and_then(|p| {
1035 std::fs::metadata(p)
1036 .ok()
1037 .filter(|meta| meta.is_file())
1038 .map(|_| p)
1039 }) {
1040 verify::verify_pack(verify_target).await?;
1041 tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
1042 } else {
1043 tracing::debug!("skipping archive verification (no archive source)");
1044 }
1045 }
1046 let engine = Engine::default();
1047 let engine_profile =
1048 EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
1049 let cache = CacheManager::new(CacheConfig::default(), engine_profile);
1050 let mut metadata = PackMetadata::fallback(&safe_path);
1051 let mut manifest = None;
1052 let mut legacy_manifest: Option<Box<legacy_pack::PackManifest>> = None;
1053 let mut flows = None;
1054 let materialized_root = component_resolution.materialized_root.clone().or_else(|| {
1055 if is_dir {
1056 Some(safe_path.clone())
1057 } else {
1058 None
1059 }
1060 });
1061
1062 if let Some(root) = materialized_root.as_ref() {
1063 match load_manifest_and_flows_from_dir(root) {
1064 Ok(ManifestLoad::New {
1065 manifest: m,
1066 flows: cache,
1067 }) => {
1068 metadata = cache.metadata.clone();
1069 manifest = Some(*m);
1070 flows = Some(cache);
1071 }
1072 Ok(ManifestLoad::Legacy {
1073 manifest: m,
1074 flows: cache,
1075 }) => {
1076 metadata = cache.metadata.clone();
1077 legacy_manifest = Some(m);
1078 flows = Some(cache);
1079 }
1080 Err(err) => {
1081 warn!(error = %err, pack = %root.display(), "failed to parse materialized pack manifest");
1082 }
1083 }
1084 }
1085
1086 if manifest.is_none()
1087 && legacy_manifest.is_none()
1088 && let Some(archive_path) = archive_hint
1089 {
1090 match load_manifest_and_flows(archive_path) {
1091 Ok(ManifestLoad::New {
1092 manifest: m,
1093 flows: cache,
1094 }) => {
1095 metadata = cache.metadata.clone();
1096 manifest = Some(*m);
1097 flows = Some(cache);
1098 }
1099 Ok(ManifestLoad::Legacy {
1100 manifest: m,
1101 flows: cache,
1102 }) => {
1103 metadata = cache.metadata.clone();
1104 legacy_manifest = Some(m);
1105 flows = Some(cache);
1106 }
1107 Err(err) => {
1108 warn!(error = %err, pack = %archive_path.display(), "failed to parse pack manifest; skipping flows");
1109 }
1110 }
1111 }
1112 let mut pack_lock = None;
1113 for root in find_pack_lock_roots(&safe_path, is_dir, archive_hint) {
1114 pack_lock = load_pack_lock(&root)?;
1115 if pack_lock.is_some() {
1116 break;
1117 }
1118 }
1119 let component_sources_payload = if pack_lock.is_none() {
1120 if let Some(manifest) = manifest.as_ref() {
1121 manifest
1122 .get_component_sources_v1()
1123 .context("invalid component sources extension")?
1124 } else {
1125 None
1126 }
1127 } else {
1128 None
1129 };
1130 let component_sources = if let Some(lock) = pack_lock.as_ref() {
1131 Some(component_sources_table_from_pack_lock(
1132 lock,
1133 component_resolution.allow_missing_hash,
1134 )?)
1135 } else {
1136 component_sources_table(component_sources_payload.as_ref())?
1137 };
1138 let components = if is_component {
1139 let wasm_bytes = fs::read(&safe_path).await?;
1140 metadata = PackMetadata::from_wasm(&wasm_bytes)
1141 .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
1142 let name = safe_path
1143 .file_stem()
1144 .map(|s| s.to_string_lossy().to_string())
1145 .unwrap_or_else(|| "component".to_string());
1146 let component = compile_component_with_cache(&cache, &engine, None, wasm_bytes).await?;
1147 let mut map = HashMap::new();
1148 map.insert(
1149 name.clone(),
1150 PackComponent {
1151 name,
1152 version: metadata.version.clone(),
1153 component,
1154 },
1155 );
1156 map
1157 } else {
1158 let specs = component_specs(
1159 manifest.as_ref(),
1160 legacy_manifest.as_deref(),
1161 component_sources_payload.as_ref(),
1162 pack_lock.as_ref(),
1163 );
1164 if specs.is_empty() {
1165 HashMap::new()
1166 } else {
1167 let mut loaded = HashMap::new();
1168 let mut missing: HashSet<String> =
1169 specs.iter().map(|spec| spec.id.clone()).collect();
1170 let mut searched = Vec::new();
1171
1172 if !component_resolution.overrides.is_empty() {
1173 load_components_from_overrides(
1174 &cache,
1175 &engine,
1176 &component_resolution.overrides,
1177 &specs,
1178 &mut missing,
1179 &mut loaded,
1180 )
1181 .await?;
1182 searched.push("override map".to_string());
1183 }
1184
1185 if let Some(component_sources) = component_sources.as_ref() {
1186 load_components_from_sources(
1187 &cache,
1188 &engine,
1189 component_sources,
1190 &component_resolution,
1191 &specs,
1192 &mut missing,
1193 &mut loaded,
1194 materialized_root.as_deref(),
1195 archive_hint,
1196 )
1197 .await?;
1198 searched.push(format!("extension {}", EXT_COMPONENT_SOURCES_V1));
1199 }
1200
1201 if let Some(root) = materialized_root.as_ref() {
1202 load_components_from_dir(
1203 &cache,
1204 &engine,
1205 root,
1206 &specs,
1207 &mut missing,
1208 &mut loaded,
1209 )
1210 .await?;
1211 searched.push(format!("components dir {}", root.display()));
1212 }
1213
1214 if let Some(archive_path) = archive_hint {
1215 load_components_from_archive(
1216 &cache,
1217 &engine,
1218 archive_path,
1219 &specs,
1220 &mut missing,
1221 &mut loaded,
1222 )
1223 .await?;
1224 searched.push(format!("archive {}", archive_path.display()));
1225 }
1226
1227 if !missing.is_empty() {
1228 let missing_list = missing.into_iter().collect::<Vec<_>>().join(", ");
1229 let sources = if searched.is_empty() {
1230 "no component sources".to_string()
1231 } else {
1232 searched.join(", ")
1233 };
1234 bail!(
1235 "components missing: {}; looked in {}",
1236 missing_list,
1237 sources
1238 );
1239 }
1240
1241 loaded
1242 }
1243 };
1244 let http_client = Arc::clone(&HTTP_CLIENT);
1245 let mut component_manifests = HashMap::new();
1246 if let Some(manifest) = manifest.as_ref() {
1247 for component in &manifest.components {
1248 component_manifests.insert(component.id.as_str().to_string(), component.clone());
1249 }
1250 }
1251 Ok(Self {
1252 path: safe_path,
1253 archive_path: archive_hint.map(Path::to_path_buf),
1254 config,
1255 engine,
1256 metadata,
1257 manifest,
1258 legacy_manifest,
1259 component_manifests,
1260 mocks,
1261 flows,
1262 components,
1263 http_client,
1264 pre_cache: Mutex::new(HashMap::new()),
1265 session_store,
1266 state_store,
1267 wasi_policy,
1268 provider_registry: RwLock::new(None),
1269 secrets,
1270 oauth_config,
1271 cache,
1272 })
1273 }
1274
1275 pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
1276 if let Some(cache) = &self.flows {
1277 return Ok(cache.descriptors.clone());
1278 }
1279 if let Some(manifest) = &self.manifest {
1280 let descriptors = manifest
1281 .flows
1282 .iter()
1283 .map(|flow| FlowDescriptor {
1284 id: flow.id.as_str().to_string(),
1285 flow_type: flow_kind_to_str(flow.kind).to_string(),
1286 pack_id: manifest.pack_id.as_str().to_string(),
1287 profile: manifest.pack_id.as_str().to_string(),
1288 version: manifest.version.to_string(),
1289 description: None,
1290 })
1291 .collect();
1292 return Ok(descriptors);
1293 }
1294 Ok(Vec::new())
1295 }
1296
1297 #[allow(dead_code)]
1298 pub async fn run_flow(
1299 &self,
1300 flow_id: &str,
1301 input: serde_json::Value,
1302 ) -> Result<serde_json::Value> {
1303 let pack = Arc::new(
1304 PackRuntime::load(
1305 &self.path,
1306 Arc::clone(&self.config),
1307 self.mocks.clone(),
1308 self.archive_path.as_deref(),
1309 self.session_store.clone(),
1310 self.state_store.clone(),
1311 Arc::clone(&self.wasi_policy),
1312 self.secrets.clone(),
1313 self.oauth_config.clone(),
1314 false,
1315 ComponentResolution::default(),
1316 )
1317 .await?,
1318 );
1319
1320 let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&self.config)).await?;
1321 let retry_config = self.config.retry_config().into();
1322 let mocks = pack.mocks.as_deref();
1323 let tenant = self.config.tenant.as_str();
1324
1325 let ctx = FlowContext {
1326 tenant,
1327 pack_id: pack.metadata().pack_id.as_str(),
1328 flow_id,
1329 node_id: None,
1330 tool: None,
1331 action: None,
1332 session_id: None,
1333 provider_id: None,
1334 retry_config,
1335 observer: None,
1336 mocks,
1337 };
1338
1339 let execution = engine.execute(ctx, input).await?;
1340 match execution.status {
1341 FlowStatus::Completed => Ok(execution.output),
1342 FlowStatus::Waiting(wait) => Ok(serde_json::json!({
1343 "status": "pending",
1344 "reason": wait.reason,
1345 "resume": wait.snapshot,
1346 "response": execution.output,
1347 })),
1348 }
1349 }
1350
1351 pub async fn invoke_component(
1352 &self,
1353 component_ref: &str,
1354 ctx: ComponentExecCtx,
1355 operation: &str,
1356 _config_json: Option<String>,
1357 input_json: String,
1358 ) -> Result<Value> {
1359 let pack_component = self
1360 .components
1361 .get(component_ref)
1362 .with_context(|| format!("component '{component_ref}' not found in pack"))?;
1363
1364 let mut linker = Linker::new(&self.engine);
1365 let allow_state_store = self.allows_state_store(component_ref);
1366 register_all(&mut linker, allow_state_store)?;
1367 add_component_control_to_linker(&mut linker)?;
1368
1369 let host_state = HostState::new(
1370 Arc::clone(&self.config),
1371 Arc::clone(&self.http_client),
1372 self.mocks.clone(),
1373 self.session_store.clone(),
1374 self.state_store.clone(),
1375 Arc::clone(&self.secrets),
1376 self.oauth_config.clone(),
1377 Some(ctx.clone()),
1378 )?;
1379 let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1380 let mut store = wasmtime::Store::new(&self.engine, store_state);
1381 let pre_instance = linker.instantiate_pre(pack_component.component.as_ref())?;
1382 let result = match component_api::v0_5::ComponentPre::new(pre_instance) {
1383 Ok(pre) => {
1384 let bindings = pre.instantiate_async(&mut store).await?;
1385 let node = bindings.greentic_component_node();
1386 let ctx_v05 = component_api::exec_ctx_v0_5(&ctx);
1387 let result = node.call_invoke(&mut store, &ctx_v05, operation, &input_json)?;
1388 component_api::invoke_result_from_v0_5(result)
1389 }
1390 Err(err) => {
1391 if is_missing_node_export(&err, "0.5.0") {
1392 let pre_instance = linker.instantiate_pre(pack_component.component.as_ref())?;
1393 let pre: component_api::v0_4::ComponentPre<ComponentState> =
1394 component_api::v0_4::ComponentPre::new(pre_instance)?;
1395 let bindings = pre.instantiate_async(&mut store).await?;
1396 let node = bindings.greentic_component_node();
1397 let ctx_v04 = component_api::exec_ctx_v0_4(&ctx);
1398 let result = node.call_invoke(&mut store, &ctx_v04, operation, &input_json)?;
1399 component_api::invoke_result_from_v0_4(result)
1400 } else {
1401 return Err(err);
1402 }
1403 }
1404 };
1405
1406 match result {
1407 InvokeResult::Ok(body) => {
1408 if body.is_empty() {
1409 return Ok(Value::Null);
1410 }
1411 serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
1412 }
1413 InvokeResult::Err(NodeError {
1414 code,
1415 message,
1416 retryable,
1417 backoff_ms,
1418 details,
1419 }) => {
1420 let mut obj = serde_json::Map::new();
1421 obj.insert("ok".into(), Value::Bool(false));
1422 let mut error = serde_json::Map::new();
1423 error.insert("code".into(), Value::String(code));
1424 error.insert("message".into(), Value::String(message));
1425 error.insert("retryable".into(), Value::Bool(retryable));
1426 if let Some(backoff) = backoff_ms {
1427 error.insert("backoff_ms".into(), Value::Number(backoff.into()));
1428 }
1429 if let Some(details) = details {
1430 error.insert(
1431 "details".into(),
1432 serde_json::from_str(&details).unwrap_or(Value::String(details)),
1433 );
1434 }
1435 obj.insert("error".into(), Value::Object(error));
1436 Ok(Value::Object(obj))
1437 }
1438 }
1439 }
1440
1441 pub fn resolve_provider(
1442 &self,
1443 provider_id: Option<&str>,
1444 provider_type: Option<&str>,
1445 ) -> Result<ProviderBinding> {
1446 let registry = self.provider_registry()?;
1447 registry.resolve(provider_id, provider_type)
1448 }
1449
1450 pub async fn invoke_provider(
1451 &self,
1452 binding: &ProviderBinding,
1453 ctx: ComponentExecCtx,
1454 op: &str,
1455 input_json: Vec<u8>,
1456 ) -> Result<Value> {
1457 let component_ref = &binding.component_ref;
1458 let pack_component = self
1459 .components
1460 .get(component_ref)
1461 .with_context(|| format!("provider component '{component_ref}' not found in pack"))?;
1462
1463 let mut linker = Linker::new(&self.engine);
1464 let allow_state_store = self.allows_state_store(component_ref);
1465 register_all(&mut linker, allow_state_store)?;
1466 add_component_control_to_linker(&mut linker)?;
1467 let pre_instance = linker.instantiate_pre(pack_component.component.as_ref())?;
1468 let pre: ProviderComponentPre<ComponentState> = ProviderComponentPre::new(pre_instance)?;
1469
1470 let host_state = HostState::new(
1471 Arc::clone(&self.config),
1472 Arc::clone(&self.http_client),
1473 self.mocks.clone(),
1474 self.session_store.clone(),
1475 self.state_store.clone(),
1476 Arc::clone(&self.secrets),
1477 self.oauth_config.clone(),
1478 Some(ctx),
1479 )?;
1480 let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1481 let mut store = wasmtime::Store::new(&self.engine, store_state);
1482 let bindings: crate::provider_core::SchemaCore = pre.instantiate_async(&mut store).await?;
1483 let provider = bindings.greentic_provider_core_schema_core_api();
1484
1485 let result = provider.call_invoke(&mut store, op, &input_json)?;
1486 deserialize_json_bytes(result)
1487 }
1488
1489 fn provider_registry(&self) -> Result<ProviderRegistry> {
1490 if let Some(registry) = self.provider_registry.read().clone() {
1491 return Ok(registry);
1492 }
1493 let manifest = self
1494 .manifest
1495 .as_ref()
1496 .context("pack manifest required for provider resolution")?;
1497 let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
1498 let registry = ProviderRegistry::new(
1499 manifest,
1500 self.state_store.clone(),
1501 &self.config.tenant,
1502 &env,
1503 )?;
1504 *self.provider_registry.write() = Some(registry.clone());
1505 Ok(registry)
1506 }
1507
1508 pub fn load_flow(&self, flow_id: &str) -> Result<Flow> {
1509 if let Some(cache) = &self.flows {
1510 return cache
1511 .flows
1512 .get(flow_id)
1513 .cloned()
1514 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
1515 }
1516 if let Some(manifest) = &self.manifest {
1517 let entry = manifest
1518 .flows
1519 .iter()
1520 .find(|f| f.id.as_str() == flow_id)
1521 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
1522 return Ok(entry.flow.clone());
1523 }
1524 bail!("flow '{flow_id}' not available (pack exports disabled)")
1525 }
1526
1527 pub fn metadata(&self) -> &PackMetadata {
1528 &self.metadata
1529 }
1530
1531 pub fn required_secrets(&self) -> &[greentic_types::SecretRequirement] {
1532 &self.metadata.secret_requirements
1533 }
1534
1535 pub fn missing_secrets(
1536 &self,
1537 tenant_ctx: &TypesTenantCtx,
1538 ) -> Vec<greentic_types::SecretRequirement> {
1539 let env = tenant_ctx.env.as_str().to_string();
1540 let tenant = tenant_ctx.tenant.as_str().to_string();
1541 let team = tenant_ctx.team.as_ref().map(|t| t.as_str().to_string());
1542 self.required_secrets()
1543 .iter()
1544 .filter(|req| {
1545 if let Some(scope) = &req.scope {
1547 if scope.env != env {
1548 return false;
1549 }
1550 if scope.tenant != tenant {
1551 return false;
1552 }
1553 if let Some(ref team_req) = scope.team
1554 && team.as_ref() != Some(team_req)
1555 {
1556 return false;
1557 }
1558 }
1559 let ctx = self.config.tenant_ctx();
1560 read_secret_blocking(&self.secrets, &ctx, req.key.as_str()).is_err()
1561 })
1562 .cloned()
1563 .collect()
1564 }
1565
1566 pub fn for_component_test(
1567 components: Vec<(String, PathBuf)>,
1568 flows: HashMap<String, FlowIR>,
1569 pack_id: &str,
1570 config: Arc<HostConfig>,
1571 ) -> Result<Self> {
1572 let engine = Engine::default();
1573 let engine_profile =
1574 EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
1575 let cache = CacheManager::new(CacheConfig::default(), engine_profile);
1576 let mut component_map = HashMap::new();
1577 for (name, path) in components {
1578 if !path.exists() {
1579 bail!("component artifact missing: {}", path.display());
1580 }
1581 let wasm_bytes = std::fs::read(&path)?;
1582 let component = Arc::new(
1583 Component::from_binary(&engine, &wasm_bytes)
1584 .with_context(|| format!("failed to compile component {}", path.display()))?,
1585 );
1586 component_map.insert(
1587 name.clone(),
1588 PackComponent {
1589 name,
1590 version: "0.0.0".into(),
1591 component,
1592 },
1593 );
1594 }
1595
1596 let mut flow_map = HashMap::new();
1597 let mut descriptors = Vec::new();
1598 for (id, ir) in flows {
1599 let flow_type = ir.flow_type.clone();
1600 let flow = flow_ir_to_flow(ir)?;
1601 flow_map.insert(id.clone(), flow);
1602 descriptors.push(FlowDescriptor {
1603 id: id.clone(),
1604 flow_type,
1605 pack_id: pack_id.to_string(),
1606 profile: "test".into(),
1607 version: "0.0.0".into(),
1608 description: None,
1609 });
1610 }
1611 let entry_flows = descriptors.iter().map(|flow| flow.id.clone()).collect();
1612 let metadata = PackMetadata {
1613 pack_id: pack_id.to_string(),
1614 version: "0.0.0".into(),
1615 entry_flows,
1616 secret_requirements: Vec::new(),
1617 };
1618 let flows_cache = PackFlows {
1619 descriptors: descriptors.clone(),
1620 flows: flow_map,
1621 metadata: metadata.clone(),
1622 };
1623
1624 Ok(Self {
1625 path: PathBuf::new(),
1626 archive_path: None,
1627 config,
1628 engine,
1629 metadata,
1630 manifest: None,
1631 legacy_manifest: None,
1632 component_manifests: HashMap::new(),
1633 mocks: None,
1634 flows: Some(flows_cache),
1635 components: component_map,
1636 http_client: Arc::clone(&HTTP_CLIENT),
1637 pre_cache: Mutex::new(HashMap::new()),
1638 session_store: None,
1639 state_store: None,
1640 wasi_policy: Arc::new(RunnerWasiPolicy::new()),
1641 provider_registry: RwLock::new(None),
1642 secrets: crate::secrets::default_manager()?,
1643 oauth_config: None,
1644 cache,
1645 })
1646 }
1647}
1648
1649fn is_missing_node_export(err: &wasmtime::Error, version: &str) -> bool {
1650 let message = err.to_string();
1651 message.contains("no exported instance named")
1652 && message.contains(&format!("greentic:component/node@{version}"))
1653}
1654
1655struct PackFlows {
1656 descriptors: Vec<FlowDescriptor>,
1657 flows: HashMap<String, Flow>,
1658 metadata: PackMetadata,
1659}
1660
1661const RUNTIME_FLOW_EXTENSION_IDS: [&str; 3] = [
1662 "greentic.pack.runtime_flow",
1663 "greentic.pack.flow_runtime",
1664 "greentic.pack.runtime_flows",
1665];
1666
1667#[derive(Debug, Deserialize)]
1668struct RuntimeFlowBundle {
1669 flows: Vec<RuntimeFlow>,
1670}
1671
1672#[derive(Debug, Deserialize)]
1673struct RuntimeFlow {
1674 id: String,
1675 #[serde(alias = "flow_type")]
1676 kind: FlowKind,
1677 #[serde(default)]
1678 schema_version: Option<String>,
1679 #[serde(default)]
1680 start: Option<String>,
1681 #[serde(default)]
1682 entrypoints: BTreeMap<String, Value>,
1683 nodes: BTreeMap<String, RuntimeNode>,
1684 #[serde(default)]
1685 metadata: Option<FlowMetadata>,
1686}
1687
1688#[derive(Debug, Deserialize)]
1689struct RuntimeNode {
1690 #[serde(alias = "component")]
1691 component_id: String,
1692 #[serde(default, alias = "operation")]
1693 operation_name: Option<String>,
1694 #[serde(default, alias = "payload", alias = "input")]
1695 operation_payload: Value,
1696 #[serde(default)]
1697 routing: Option<Routing>,
1698 #[serde(default)]
1699 telemetry: Option<TelemetryHints>,
1700}
1701
1702fn deserialize_json_bytes(bytes: Vec<u8>) -> Result<Value> {
1703 if bytes.is_empty() {
1704 return Ok(Value::Null);
1705 }
1706 serde_json::from_slice(&bytes).or_else(|_| {
1707 String::from_utf8(bytes)
1708 .map(Value::String)
1709 .map_err(|err| anyhow!(err))
1710 })
1711}
1712
1713impl PackFlows {
1714 fn from_manifest(manifest: greentic_types::PackManifest) -> Self {
1715 if let Some(flows) = flows_from_runtime_extension(&manifest) {
1716 return flows;
1717 }
1718 let descriptors = manifest
1719 .flows
1720 .iter()
1721 .map(|entry| FlowDescriptor {
1722 id: entry.id.as_str().to_string(),
1723 flow_type: flow_kind_to_str(entry.kind).to_string(),
1724 pack_id: manifest.pack_id.as_str().to_string(),
1725 profile: manifest.pack_id.as_str().to_string(),
1726 version: manifest.version.to_string(),
1727 description: None,
1728 })
1729 .collect();
1730 let mut flows = HashMap::new();
1731 for entry in &manifest.flows {
1732 flows.insert(entry.id.as_str().to_string(), entry.flow.clone());
1733 }
1734 Self {
1735 metadata: PackMetadata::from_manifest(&manifest),
1736 descriptors,
1737 flows,
1738 }
1739 }
1740}
1741
1742fn flows_from_runtime_extension(manifest: &greentic_types::PackManifest) -> Option<PackFlows> {
1743 let extensions = manifest.extensions.as_ref()?;
1744 let extension = extensions.iter().find_map(|(key, ext)| {
1745 if RUNTIME_FLOW_EXTENSION_IDS
1746 .iter()
1747 .any(|candidate| candidate == key)
1748 {
1749 Some(ext)
1750 } else {
1751 None
1752 }
1753 })?;
1754 let runtime_flows = match decode_runtime_flow_extension(extension) {
1755 Some(flows) if !flows.is_empty() => flows,
1756 _ => return None,
1757 };
1758
1759 let descriptors = runtime_flows
1760 .iter()
1761 .map(|flow| FlowDescriptor {
1762 id: flow.id.as_str().to_string(),
1763 flow_type: flow_kind_to_str(flow.kind).to_string(),
1764 pack_id: manifest.pack_id.as_str().to_string(),
1765 profile: manifest.pack_id.as_str().to_string(),
1766 version: manifest.version.to_string(),
1767 description: None,
1768 })
1769 .collect::<Vec<_>>();
1770 let flows = runtime_flows
1771 .into_iter()
1772 .map(|flow| (flow.id.as_str().to_string(), flow))
1773 .collect();
1774
1775 Some(PackFlows {
1776 metadata: PackMetadata::from_manifest(manifest),
1777 descriptors,
1778 flows,
1779 })
1780}
1781
1782fn decode_runtime_flow_extension(extension: &ExtensionRef) -> Option<Vec<Flow>> {
1783 let value = match extension.inline.as_ref()? {
1784 ExtensionInline::Other(value) => value.clone(),
1785 _ => return None,
1786 };
1787
1788 if let Ok(bundle) = serde_json::from_value::<RuntimeFlowBundle>(value.clone()) {
1789 return Some(collect_runtime_flows(bundle.flows));
1790 }
1791
1792 if let Ok(flows) = serde_json::from_value::<Vec<RuntimeFlow>>(value.clone()) {
1793 return Some(collect_runtime_flows(flows));
1794 }
1795
1796 if let Ok(flows) = serde_json::from_value::<Vec<Flow>>(value) {
1797 return Some(flows);
1798 }
1799
1800 warn!(
1801 extension = %extension.kind,
1802 version = %extension.version,
1803 "runtime flow extension present but could not be decoded"
1804 );
1805 None
1806}
1807
1808fn collect_runtime_flows(flows: Vec<RuntimeFlow>) -> Vec<Flow> {
1809 flows
1810 .into_iter()
1811 .filter_map(|flow| match runtime_flow_to_flow(flow) {
1812 Ok(flow) => Some(flow),
1813 Err(err) => {
1814 warn!(error = %err, "failed to decode runtime flow");
1815 None
1816 }
1817 })
1818 .collect()
1819}
1820
1821fn runtime_flow_to_flow(runtime: RuntimeFlow) -> Result<Flow> {
1822 let flow_id = FlowId::from_str(&runtime.id)
1823 .with_context(|| format!("invalid flow id `{}`", runtime.id))?;
1824 let mut entrypoints = runtime.entrypoints;
1825 if entrypoints.is_empty()
1826 && let Some(start) = &runtime.start
1827 {
1828 entrypoints.insert("default".into(), Value::String(start.clone()));
1829 }
1830
1831 let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
1832 for (id, node) in runtime.nodes {
1833 let node_id = NodeId::from_str(&id).with_context(|| format!("invalid node id `{id}`"))?;
1834 let component_id = ComponentId::from_str(&node.component_id)
1835 .with_context(|| format!("invalid component id `{}`", node.component_id))?;
1836 let component = FlowComponentRef {
1837 id: component_id,
1838 pack_alias: None,
1839 operation: node.operation_name,
1840 };
1841 let routing = node.routing.unwrap_or(Routing::End);
1842 let telemetry = node.telemetry.unwrap_or_default();
1843 nodes.insert(
1844 node_id.clone(),
1845 Node {
1846 id: node_id,
1847 component,
1848 input: InputMapping {
1849 mapping: node.operation_payload,
1850 },
1851 output: OutputMapping {
1852 mapping: Value::Null,
1853 },
1854 routing,
1855 telemetry,
1856 },
1857 );
1858 }
1859
1860 Ok(Flow {
1861 schema_version: runtime.schema_version.unwrap_or_else(|| "1.0".to_string()),
1862 id: flow_id,
1863 kind: runtime.kind,
1864 entrypoints,
1865 nodes,
1866 metadata: runtime.metadata.unwrap_or_default(),
1867 })
1868}
1869
1870fn flow_kind_to_str(kind: greentic_types::FlowKind) -> &'static str {
1871 match kind {
1872 greentic_types::FlowKind::Messaging => "messaging",
1873 greentic_types::FlowKind::Event => "event",
1874 greentic_types::FlowKind::ComponentConfig => "component-config",
1875 greentic_types::FlowKind::Job => "job",
1876 greentic_types::FlowKind::Http => "http",
1877 }
1878}
1879
1880fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
1881 let mut file = archive
1882 .by_name(name)
1883 .with_context(|| format!("entry {name} missing from archive"))?;
1884 let mut buf = Vec::new();
1885 file.read_to_end(&mut buf)?;
1886 Ok(buf)
1887}
1888
1889fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
1890 for node in doc.nodes.values_mut() {
1891 let Some((component_ref, payload)) = node
1892 .raw
1893 .iter()
1894 .next()
1895 .map(|(key, value)| (key.clone(), value.clone()))
1896 else {
1897 continue;
1898 };
1899 if component_ref.starts_with("emit.") {
1900 node.operation = Some(component_ref);
1901 node.payload = payload;
1902 node.raw.clear();
1903 continue;
1904 }
1905 let (target_component, operation, input, config) =
1906 infer_component_exec(&payload, &component_ref);
1907 let mut payload_obj = serde_json::Map::new();
1908 payload_obj.insert("component".into(), Value::String(target_component));
1910 payload_obj.insert("operation".into(), Value::String(operation));
1911 payload_obj.insert("input".into(), input);
1912 if let Some(cfg) = config {
1913 payload_obj.insert("config".into(), cfg);
1914 }
1915 node.operation = Some("component.exec".to_string());
1916 node.payload = Value::Object(payload_obj);
1917 node.raw.clear();
1918 }
1919 doc
1920}
1921
1922fn infer_component_exec(
1923 payload: &Value,
1924 component_ref: &str,
1925) -> (String, String, Value, Option<Value>) {
1926 let default_op = if component_ref.starts_with("templating.") {
1927 "render"
1928 } else {
1929 "invoke"
1930 }
1931 .to_string();
1932
1933 if let Value::Object(map) = payload {
1934 let op = map
1935 .get("op")
1936 .or_else(|| map.get("operation"))
1937 .and_then(Value::as_str)
1938 .map(|s| s.to_string())
1939 .unwrap_or_else(|| default_op.clone());
1940
1941 let mut input = map.clone();
1942 let config = input.remove("config");
1943 let component = input
1944 .get("component")
1945 .or_else(|| input.get("component_ref"))
1946 .and_then(Value::as_str)
1947 .map(|s| s.to_string())
1948 .unwrap_or_else(|| component_ref.to_string());
1949 input.remove("component");
1950 input.remove("component_ref");
1951 input.remove("op");
1952 input.remove("operation");
1953 return (component, op, Value::Object(input), config);
1954 }
1955
1956 (component_ref.to_string(), default_op, payload.clone(), None)
1957}
1958
1959#[derive(Clone, Debug)]
1960struct ComponentSpec {
1961 id: String,
1962 version: String,
1963 legacy_path: Option<String>,
1964}
1965
1966#[derive(Clone, Debug)]
1967struct ComponentSourceInfo {
1968 digest: Option<String>,
1969 source: ComponentSourceRef,
1970 artifact: ComponentArtifactLocation,
1971 expected_wasm_sha256: Option<String>,
1972 skip_digest_verification: bool,
1973}
1974
1975#[derive(Clone, Debug)]
1976enum ComponentArtifactLocation {
1977 Inline { wasm_path: String },
1978 Remote,
1979}
1980
1981#[derive(Clone, Debug, Deserialize)]
1982struct PackLockV1 {
1983 schema_version: u32,
1984 components: Vec<PackLockComponent>,
1985}
1986
1987#[derive(Clone, Debug, Deserialize)]
1988struct PackLockComponent {
1989 name: String,
1990 #[serde(default, rename = "source_ref")]
1991 source_ref: Option<String>,
1992 #[serde(default, rename = "ref")]
1993 legacy_ref: Option<String>,
1994 #[serde(default)]
1995 component_id: Option<ComponentId>,
1996 #[serde(default)]
1997 bundled: Option<bool>,
1998 #[serde(default, rename = "bundled_path")]
1999 bundled_path: Option<String>,
2000 #[serde(default, rename = "path")]
2001 legacy_path: Option<String>,
2002 #[serde(default)]
2003 wasm_sha256: Option<String>,
2004 #[serde(default, rename = "sha256")]
2005 legacy_sha256: Option<String>,
2006 #[serde(default)]
2007 resolved_digest: Option<String>,
2008 #[serde(default)]
2009 digest: Option<String>,
2010}
2011
2012fn component_specs(
2013 manifest: Option<&greentic_types::PackManifest>,
2014 legacy_manifest: Option<&legacy_pack::PackManifest>,
2015 component_sources: Option<&ComponentSourcesV1>,
2016 pack_lock: Option<&PackLockV1>,
2017) -> Vec<ComponentSpec> {
2018 if let Some(manifest) = manifest {
2019 if !manifest.components.is_empty() {
2020 return manifest
2021 .components
2022 .iter()
2023 .map(|entry| ComponentSpec {
2024 id: entry.id.as_str().to_string(),
2025 version: entry.version.to_string(),
2026 legacy_path: None,
2027 })
2028 .collect();
2029 }
2030 if let Some(lock) = pack_lock {
2031 let mut seen = HashSet::new();
2032 let mut specs = Vec::new();
2033 for entry in &lock.components {
2034 let id = entry
2035 .component_id
2036 .as_ref()
2037 .map(|id| id.as_str())
2038 .unwrap_or(entry.name.as_str());
2039 if seen.insert(id.to_string()) {
2040 specs.push(ComponentSpec {
2041 id: id.to_string(),
2042 version: "0.0.0".to_string(),
2043 legacy_path: None,
2044 });
2045 }
2046 }
2047 return specs;
2048 }
2049 if let Some(sources) = component_sources {
2050 let mut seen = HashSet::new();
2051 let mut specs = Vec::new();
2052 for entry in &sources.components {
2053 let id = entry
2054 .component_id
2055 .as_ref()
2056 .map(|id| id.as_str())
2057 .unwrap_or(entry.name.as_str());
2058 if seen.insert(id.to_string()) {
2059 specs.push(ComponentSpec {
2060 id: id.to_string(),
2061 version: "0.0.0".to_string(),
2062 legacy_path: None,
2063 });
2064 }
2065 }
2066 return specs;
2067 }
2068 }
2069 if let Some(legacy_manifest) = legacy_manifest {
2070 return legacy_manifest
2071 .components
2072 .iter()
2073 .map(|entry| ComponentSpec {
2074 id: entry.name.clone(),
2075 version: entry.version.to_string(),
2076 legacy_path: Some(entry.file_wasm.clone()),
2077 })
2078 .collect();
2079 }
2080 Vec::new()
2081}
2082
2083fn component_sources_table(
2084 sources: Option<&ComponentSourcesV1>,
2085) -> Result<Option<HashMap<String, ComponentSourceInfo>>> {
2086 let Some(sources) = sources else {
2087 return Ok(None);
2088 };
2089 let mut table = HashMap::new();
2090 for entry in &sources.components {
2091 let artifact = match &entry.artifact {
2092 ArtifactLocationV1::Inline { wasm_path, .. } => ComponentArtifactLocation::Inline {
2093 wasm_path: wasm_path.clone(),
2094 },
2095 ArtifactLocationV1::Remote => ComponentArtifactLocation::Remote,
2096 };
2097 let info = ComponentSourceInfo {
2098 digest: Some(entry.resolved.digest.clone()),
2099 source: entry.source.clone(),
2100 artifact,
2101 expected_wasm_sha256: None,
2102 skip_digest_verification: false,
2103 };
2104 if let Some(component_id) = entry.component_id.as_ref() {
2105 table.insert(component_id.as_str().to_string(), info.clone());
2106 }
2107 table.insert(entry.name.clone(), info);
2108 }
2109 Ok(Some(table))
2110}
2111
2112fn load_pack_lock(path: &Path) -> Result<Option<PackLockV1>> {
2113 let lock_path = if path.is_dir() {
2114 let candidate = path.join("pack.lock");
2115 if candidate.exists() {
2116 Some(candidate)
2117 } else {
2118 let candidate = path.join("pack.lock.json");
2119 candidate.exists().then_some(candidate)
2120 }
2121 } else {
2122 None
2123 };
2124 let Some(lock_path) = lock_path else {
2125 return Ok(None);
2126 };
2127 let raw = std::fs::read_to_string(&lock_path)
2128 .with_context(|| format!("failed to read {}", lock_path.display()))?;
2129 let lock: PackLockV1 = serde_json::from_str(&raw).context("failed to parse pack.lock")?;
2130 if lock.schema_version != 1 {
2131 bail!("pack.lock schema_version must be 1");
2132 }
2133 Ok(Some(lock))
2134}
2135
2136fn find_pack_lock_roots(
2137 pack_path: &Path,
2138 is_dir: bool,
2139 archive_hint: Option<&Path>,
2140) -> Vec<PathBuf> {
2141 if is_dir {
2142 return vec![pack_path.to_path_buf()];
2143 }
2144 let mut roots = Vec::new();
2145 if let Some(archive_path) = archive_hint {
2146 if let Some(parent) = archive_path.parent() {
2147 roots.push(parent.to_path_buf());
2148 if let Some(grandparent) = parent.parent() {
2149 roots.push(grandparent.to_path_buf());
2150 }
2151 }
2152 } else if let Some(parent) = pack_path.parent() {
2153 roots.push(parent.to_path_buf());
2154 if let Some(grandparent) = parent.parent() {
2155 roots.push(grandparent.to_path_buf());
2156 }
2157 }
2158 roots
2159}
2160
2161fn normalize_sha256(digest: &str) -> Result<String> {
2162 let trimmed = digest.trim();
2163 if trimmed.is_empty() {
2164 bail!("sha256 digest cannot be empty");
2165 }
2166 if let Some(stripped) = trimmed.strip_prefix("sha256:") {
2167 if stripped.is_empty() {
2168 bail!("sha256 digest must include hex bytes after sha256:");
2169 }
2170 return Ok(trimmed.to_string());
2171 }
2172 if trimmed.chars().all(|c| c.is_ascii_hexdigit()) {
2173 return Ok(format!("sha256:{trimmed}"));
2174 }
2175 bail!("sha256 digest must be hex or sha256:<hex>");
2176}
2177
2178fn component_sources_table_from_pack_lock(
2179 lock: &PackLockV1,
2180 allow_missing_hash: bool,
2181) -> Result<HashMap<String, ComponentSourceInfo>> {
2182 let mut table = HashMap::new();
2183 let mut names = HashSet::new();
2184 for entry in &lock.components {
2185 if !names.insert(entry.name.clone()) {
2186 bail!(
2187 "pack.lock contains duplicate component name `{}`",
2188 entry.name
2189 );
2190 }
2191 let source_ref = match (&entry.source_ref, &entry.legacy_ref) {
2192 (Some(primary), Some(legacy)) => {
2193 if primary != legacy {
2194 bail!(
2195 "pack.lock component {} has conflicting refs: {} vs {}",
2196 entry.name,
2197 primary,
2198 legacy
2199 );
2200 }
2201 primary.as_str()
2202 }
2203 (Some(primary), None) => primary.as_str(),
2204 (None, Some(legacy)) => legacy.as_str(),
2205 (None, None) => {
2206 bail!("pack.lock component {} missing source_ref", entry.name);
2207 }
2208 };
2209 let source: ComponentSourceRef = source_ref
2210 .parse()
2211 .with_context(|| format!("invalid component ref `{}`", source_ref))?;
2212 let bundled_path = match (&entry.bundled_path, &entry.legacy_path) {
2213 (Some(primary), Some(legacy)) => {
2214 if primary != legacy {
2215 bail!(
2216 "pack.lock component {} has conflicting bundled paths: {} vs {}",
2217 entry.name,
2218 primary,
2219 legacy
2220 );
2221 }
2222 Some(primary.clone())
2223 }
2224 (Some(primary), None) => Some(primary.clone()),
2225 (None, Some(legacy)) => Some(legacy.clone()),
2226 (None, None) => None,
2227 };
2228 let bundled = entry.bundled.unwrap_or(false) || bundled_path.is_some();
2229 let (artifact, digest, expected_wasm_sha256, skip_digest_verification) = if bundled {
2230 let wasm_path = bundled_path.ok_or_else(|| {
2231 anyhow!(
2232 "pack.lock component {} marked bundled but bundled_path is missing",
2233 entry.name
2234 )
2235 })?;
2236 let expected_raw = match (&entry.wasm_sha256, &entry.legacy_sha256) {
2237 (Some(primary), Some(legacy)) => {
2238 if primary != legacy {
2239 bail!(
2240 "pack.lock component {} has conflicting wasm_sha256 values: {} vs {}",
2241 entry.name,
2242 primary,
2243 legacy
2244 );
2245 }
2246 Some(primary.as_str())
2247 }
2248 (Some(primary), None) => Some(primary.as_str()),
2249 (None, Some(legacy)) => Some(legacy.as_str()),
2250 (None, None) => None,
2251 };
2252 let expected = match expected_raw {
2253 Some(value) => Some(normalize_sha256(value)?),
2254 None => None,
2255 };
2256 if expected.is_none() && !allow_missing_hash {
2257 bail!(
2258 "pack.lock component {} missing wasm_sha256 for bundled component",
2259 entry.name
2260 );
2261 }
2262 (
2263 ComponentArtifactLocation::Inline { wasm_path },
2264 expected.clone(),
2265 expected,
2266 allow_missing_hash && expected_raw.is_none(),
2267 )
2268 } else {
2269 if source.is_tag() {
2270 bail!(
2271 "component {} uses tag ref {} but is not bundled; rebuild the pack",
2272 entry.name,
2273 source
2274 );
2275 }
2276 let expected = entry
2277 .resolved_digest
2278 .as_deref()
2279 .or(entry.digest.as_deref())
2280 .ok_or_else(|| {
2281 anyhow!(
2282 "pack.lock component {} missing resolved_digest for remote component",
2283 entry.name
2284 )
2285 })?;
2286 (
2287 ComponentArtifactLocation::Remote,
2288 Some(normalize_digest(expected)),
2289 None,
2290 false,
2291 )
2292 };
2293 let info = ComponentSourceInfo {
2294 digest,
2295 source,
2296 artifact,
2297 expected_wasm_sha256,
2298 skip_digest_verification,
2299 };
2300 if let Some(component_id) = entry.component_id.as_ref() {
2301 let key = component_id.as_str().to_string();
2302 if table.contains_key(&key) {
2303 bail!(
2304 "pack.lock contains duplicate component id `{}`",
2305 component_id.as_str()
2306 );
2307 }
2308 table.insert(key, info.clone());
2309 }
2310 if entry.name
2311 != entry
2312 .component_id
2313 .as_ref()
2314 .map(|id| id.as_str())
2315 .unwrap_or("")
2316 {
2317 table.insert(entry.name.clone(), info);
2318 }
2319 }
2320 Ok(table)
2321}
2322
2323fn component_path_for_spec(root: &Path, spec: &ComponentSpec) -> PathBuf {
2324 if let Some(path) = &spec.legacy_path {
2325 return root.join(path);
2326 }
2327 root.join("components").join(format!("{}.wasm", spec.id))
2328}
2329
2330fn normalize_digest(digest: &str) -> String {
2331 if digest.starts_with("sha256:") || digest.starts_with("blake3:") {
2332 digest.to_string()
2333 } else {
2334 format!("sha256:{digest}")
2335 }
2336}
2337
2338fn compute_digest_for(bytes: &[u8], digest: &str) -> Result<String> {
2339 if digest.starts_with("blake3:") {
2340 let hash = blake3::hash(bytes);
2341 return Ok(format!("blake3:{}", hash.to_hex()));
2342 }
2343 let mut hasher = sha2::Sha256::new();
2344 hasher.update(bytes);
2345 Ok(format!("sha256:{:x}", hasher.finalize()))
2346}
2347
2348fn compute_sha256_digest_for(bytes: &[u8]) -> String {
2349 let mut hasher = sha2::Sha256::new();
2350 hasher.update(bytes);
2351 format!("sha256:{:x}", hasher.finalize())
2352}
2353
2354fn build_artifact_key(cache: &CacheManager, digest: Option<&str>, bytes: &[u8]) -> ArtifactKey {
2355 let wasm_digest = digest
2356 .map(normalize_digest)
2357 .unwrap_or_else(|| compute_sha256_digest_for(bytes));
2358 ArtifactKey::new(cache.engine_profile_id().to_string(), wasm_digest)
2359}
2360
2361async fn compile_component_with_cache(
2362 cache: &CacheManager,
2363 engine: &Engine,
2364 digest: Option<&str>,
2365 bytes: Vec<u8>,
2366) -> Result<Arc<Component>> {
2367 let key = build_artifact_key(cache, digest, &bytes);
2368 cache.get_component(engine, &key, || Ok(bytes)).await
2369}
2370
2371fn verify_component_digest(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
2372 let normalized_expected = normalize_digest(expected);
2373 let actual = compute_digest_for(bytes, &normalized_expected)?;
2374 if normalize_digest(&actual) != normalized_expected {
2375 bail!(
2376 "component {component_id} digest mismatch: expected {normalized_expected}, got {actual}"
2377 );
2378 }
2379 Ok(())
2380}
2381
2382fn verify_wasm_sha256(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
2383 let normalized_expected = normalize_sha256(expected)?;
2384 let actual = compute_sha256_digest_for(bytes);
2385 if actual != normalized_expected {
2386 bail!(
2387 "component {component_id} bundled digest mismatch: expected {normalized_expected}, got {actual}"
2388 );
2389 }
2390 Ok(())
2391}
2392
2393#[cfg(test)]
2394mod pack_lock_tests {
2395 use super::*;
2396 use tempfile::TempDir;
2397
2398 #[test]
2399 fn pack_lock_tag_ref_requires_bundle() {
2400 let lock = PackLockV1 {
2401 schema_version: 1,
2402 components: vec![PackLockComponent {
2403 name: "templates".to_string(),
2404 source_ref: Some("oci://registry.test/templates:latest".to_string()),
2405 legacy_ref: None,
2406 component_id: None,
2407 bundled: Some(false),
2408 bundled_path: None,
2409 legacy_path: None,
2410 wasm_sha256: None,
2411 legacy_sha256: None,
2412 resolved_digest: None,
2413 digest: None,
2414 }],
2415 };
2416 let err = component_sources_table_from_pack_lock(&lock, false).unwrap_err();
2417 assert!(
2418 err.to_string().contains("tag ref") && err.to_string().contains("rebuild the pack"),
2419 "unexpected error: {err}"
2420 );
2421 }
2422
2423 #[test]
2424 fn bundled_hash_mismatch_errors() {
2425 let rt = tokio::runtime::Runtime::new().expect("runtime");
2426 let temp = TempDir::new().expect("temp dir");
2427 let engine = Engine::default();
2428 let engine_profile =
2429 EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
2430 let cache_config = CacheConfig {
2431 root: temp.path().join("cache"),
2432 ..CacheConfig::default()
2433 };
2434 let cache = CacheManager::new(cache_config, engine_profile);
2435 let wasm_path = temp.path().join("component.wasm");
2436 let fixture_wasm = Path::new(env!("CARGO_MANIFEST_DIR"))
2437 .join("../../tests/fixtures/packs/secrets_store_smoke/components/echo_secret.wasm");
2438 let bytes = std::fs::read(&fixture_wasm).expect("read fixture wasm");
2439 std::fs::write(&wasm_path, &bytes).expect("write temp wasm");
2440
2441 let spec = ComponentSpec {
2442 id: "qa.process".to_string(),
2443 version: "0.0.0".to_string(),
2444 legacy_path: None,
2445 };
2446 let mut missing = HashSet::new();
2447 missing.insert(spec.id.clone());
2448
2449 let mut sources = HashMap::new();
2450 sources.insert(
2451 spec.id.clone(),
2452 ComponentSourceInfo {
2453 digest: Some("sha256:deadbeef".to_string()),
2454 source: ComponentSourceRef::Oci("registry.test/qa.process@sha256:deadbeef".into()),
2455 artifact: ComponentArtifactLocation::Inline {
2456 wasm_path: "component.wasm".to_string(),
2457 },
2458 expected_wasm_sha256: Some("sha256:deadbeef".to_string()),
2459 skip_digest_verification: false,
2460 },
2461 );
2462
2463 let mut loaded = HashMap::new();
2464 let result = rt.block_on(load_components_from_sources(
2465 &cache,
2466 &engine,
2467 &sources,
2468 &ComponentResolution::default(),
2469 &[spec],
2470 &mut missing,
2471 &mut loaded,
2472 Some(temp.path()),
2473 None,
2474 ));
2475 let err = result.unwrap_err();
2476 assert!(
2477 err.to_string().contains("bundled digest mismatch"),
2478 "unexpected error: {err}"
2479 );
2480 }
2481}
2482
2483fn dist_options_from(component_resolution: &ComponentResolution) -> DistOptions {
2484 let mut opts = DistOptions {
2485 allow_tags: true,
2486 ..DistOptions::default()
2487 };
2488 if let Some(cache_dir) = component_resolution.dist_cache_dir.clone() {
2489 opts.cache_dir = cache_dir;
2490 }
2491 if component_resolution.dist_offline {
2492 opts.offline = true;
2493 }
2494 opts
2495}
2496
2497#[allow(clippy::too_many_arguments)]
2498async fn load_components_from_sources(
2499 cache: &CacheManager,
2500 engine: &Engine,
2501 component_sources: &HashMap<String, ComponentSourceInfo>,
2502 component_resolution: &ComponentResolution,
2503 specs: &[ComponentSpec],
2504 missing: &mut HashSet<String>,
2505 into: &mut HashMap<String, PackComponent>,
2506 materialized_root: Option<&Path>,
2507 archive_hint: Option<&Path>,
2508) -> Result<()> {
2509 let mut archive = if let Some(path) = archive_hint {
2510 Some(
2511 ZipArchive::new(File::open(path)?)
2512 .with_context(|| format!("{} is not a valid gtpack", path.display()))?,
2513 )
2514 } else {
2515 None
2516 };
2517 let mut dist_client: Option<DistClient> = None;
2518
2519 for spec in specs {
2520 if !missing.contains(&spec.id) {
2521 continue;
2522 }
2523 let Some(source) = component_sources.get(&spec.id) else {
2524 continue;
2525 };
2526
2527 let bytes = match &source.artifact {
2528 ComponentArtifactLocation::Inline { wasm_path } => {
2529 if let Some(root) = materialized_root {
2530 let path = root.join(wasm_path);
2531 if path.exists() {
2532 std::fs::read(&path).with_context(|| {
2533 format!(
2534 "failed to read inline component {} from {}",
2535 spec.id,
2536 path.display()
2537 )
2538 })?
2539 } else if archive.is_none() {
2540 bail!("inline component {} missing at {}", spec.id, path.display());
2541 } else {
2542 read_entry(
2543 archive.as_mut().expect("archive present when needed"),
2544 wasm_path,
2545 )
2546 .with_context(|| {
2547 format!(
2548 "inline component {} missing at {} in pack archive",
2549 spec.id, wasm_path
2550 )
2551 })?
2552 }
2553 } else if let Some(archive) = archive.as_mut() {
2554 read_entry(archive, wasm_path).with_context(|| {
2555 format!(
2556 "inline component {} missing at {} in pack archive",
2557 spec.id, wasm_path
2558 )
2559 })?
2560 } else {
2561 bail!(
2562 "inline component {} missing and no pack source available",
2563 spec.id
2564 );
2565 }
2566 }
2567 ComponentArtifactLocation::Remote => {
2568 if source.source.is_tag() {
2569 bail!(
2570 "component {} uses tag ref {} but is not bundled; rebuild the pack",
2571 spec.id,
2572 source.source
2573 );
2574 }
2575 let client = dist_client.get_or_insert_with(|| {
2576 DistClient::new(dist_options_from(component_resolution))
2577 });
2578 let reference = source.source.to_string();
2579 fault::maybe_fail_asset(&reference)
2580 .await
2581 .with_context(|| format!("fault injection blocked asset {reference}"))?;
2582 let digest = source.digest.as_deref().ok_or_else(|| {
2583 anyhow!(
2584 "component {} missing expected digest for remote component",
2585 spec.id
2586 )
2587 })?;
2588 let cache_path = if component_resolution.dist_offline {
2589 client
2590 .fetch_digest(digest)
2591 .await
2592 .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?
2593 } else {
2594 let resolved = client
2595 .resolve_ref(&reference)
2596 .await
2597 .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?;
2598 let expected = normalize_digest(digest);
2599 let actual = normalize_digest(&resolved.digest);
2600 if expected != actual {
2601 bail!(
2602 "component {} digest mismatch after fetch: expected {}, got {}",
2603 spec.id,
2604 expected,
2605 actual
2606 );
2607 }
2608 resolved.cache_path.ok_or_else(|| {
2609 anyhow!(
2610 "component {} resolved from {} but cache path is missing",
2611 spec.id,
2612 reference
2613 )
2614 })?
2615 };
2616 std::fs::read(&cache_path).with_context(|| {
2617 format!(
2618 "failed to read cached component {} from {}",
2619 spec.id,
2620 cache_path.display()
2621 )
2622 })?
2623 }
2624 };
2625
2626 if let Some(expected) = source.expected_wasm_sha256.as_deref() {
2627 verify_wasm_sha256(&spec.id, expected, &bytes)?;
2628 } else if source.skip_digest_verification {
2629 let actual = compute_sha256_digest_for(&bytes);
2630 warn!(
2631 component_id = %spec.id,
2632 digest = %actual,
2633 "bundled component missing wasm_sha256; allowing due to flag"
2634 );
2635 } else {
2636 let expected = source.digest.as_deref().ok_or_else(|| {
2637 anyhow!(
2638 "component {} missing expected digest for verification",
2639 spec.id
2640 )
2641 })?;
2642 verify_component_digest(&spec.id, expected, &bytes)?;
2643 }
2644 let component =
2645 compile_component_with_cache(cache, engine, source.digest.as_deref(), bytes)
2646 .await
2647 .with_context(|| format!("failed to compile component {}", spec.id))?;
2648 into.insert(
2649 spec.id.clone(),
2650 PackComponent {
2651 name: spec.id.clone(),
2652 version: spec.version.clone(),
2653 component,
2654 },
2655 );
2656 missing.remove(&spec.id);
2657 }
2658
2659 Ok(())
2660}
2661
2662fn dist_error_for_component(err: DistError, component_id: &str, reference: &str) -> anyhow::Error {
2663 match err {
2664 DistError::CacheMiss { reference: missing } => anyhow!(
2665 "remote component {} is not cached for {}. Run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
2666 component_id,
2667 missing,
2668 reference
2669 ),
2670 DistError::Offline { reference: blocked } => anyhow!(
2671 "offline mode blocked fetching component {} from {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
2672 component_id,
2673 blocked,
2674 reference
2675 ),
2676 DistError::AuthRequired { target } => anyhow!(
2677 "component {} requires authenticated source {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
2678 component_id,
2679 target,
2680 reference
2681 ),
2682 other => anyhow!(
2683 "failed to resolve component {} from {}: {}",
2684 component_id,
2685 reference,
2686 other
2687 ),
2688 }
2689}
2690
2691async fn load_components_from_overrides(
2692 cache: &CacheManager,
2693 engine: &Engine,
2694 overrides: &HashMap<String, PathBuf>,
2695 specs: &[ComponentSpec],
2696 missing: &mut HashSet<String>,
2697 into: &mut HashMap<String, PackComponent>,
2698) -> Result<()> {
2699 for spec in specs {
2700 if !missing.contains(&spec.id) {
2701 continue;
2702 }
2703 let Some(path) = overrides.get(&spec.id) else {
2704 continue;
2705 };
2706 let bytes = std::fs::read(path)
2707 .with_context(|| format!("failed to read override component {}", path.display()))?;
2708 let component = compile_component_with_cache(cache, engine, None, bytes)
2709 .await
2710 .with_context(|| {
2711 format!(
2712 "failed to compile component {} from override {}",
2713 spec.id,
2714 path.display()
2715 )
2716 })?;
2717 into.insert(
2718 spec.id.clone(),
2719 PackComponent {
2720 name: spec.id.clone(),
2721 version: spec.version.clone(),
2722 component,
2723 },
2724 );
2725 missing.remove(&spec.id);
2726 }
2727 Ok(())
2728}
2729
2730async fn load_components_from_dir(
2731 cache: &CacheManager,
2732 engine: &Engine,
2733 root: &Path,
2734 specs: &[ComponentSpec],
2735 missing: &mut HashSet<String>,
2736 into: &mut HashMap<String, PackComponent>,
2737) -> Result<()> {
2738 for spec in specs {
2739 if !missing.contains(&spec.id) {
2740 continue;
2741 }
2742 let path = component_path_for_spec(root, spec);
2743 if !path.exists() {
2744 tracing::debug!(component = %spec.id, path = %path.display(), "materialized component missing; will try other sources");
2745 continue;
2746 }
2747 let bytes = std::fs::read(&path)
2748 .with_context(|| format!("failed to read component {}", path.display()))?;
2749 let component = compile_component_with_cache(cache, engine, None, bytes)
2750 .await
2751 .with_context(|| {
2752 format!(
2753 "failed to compile component {} from {}",
2754 spec.id,
2755 path.display()
2756 )
2757 })?;
2758 into.insert(
2759 spec.id.clone(),
2760 PackComponent {
2761 name: spec.id.clone(),
2762 version: spec.version.clone(),
2763 component,
2764 },
2765 );
2766 missing.remove(&spec.id);
2767 }
2768 Ok(())
2769}
2770
2771async fn load_components_from_archive(
2772 cache: &CacheManager,
2773 engine: &Engine,
2774 path: &Path,
2775 specs: &[ComponentSpec],
2776 missing: &mut HashSet<String>,
2777 into: &mut HashMap<String, PackComponent>,
2778) -> Result<()> {
2779 let mut archive = ZipArchive::new(File::open(path)?)
2780 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
2781 for spec in specs {
2782 if !missing.contains(&spec.id) {
2783 continue;
2784 }
2785 let file_name = spec
2786 .legacy_path
2787 .clone()
2788 .unwrap_or_else(|| format!("components/{}.wasm", spec.id));
2789 let bytes = match read_entry(&mut archive, &file_name) {
2790 Ok(bytes) => bytes,
2791 Err(err) => {
2792 warn!(component = %spec.id, pack = %path.display(), error = %err, "component entry missing in pack archive");
2793 continue;
2794 }
2795 };
2796 let component = compile_component_with_cache(cache, engine, None, bytes)
2797 .await
2798 .with_context(|| format!("failed to compile component {}", spec.id))?;
2799 into.insert(
2800 spec.id.clone(),
2801 PackComponent {
2802 name: spec.id.clone(),
2803 version: spec.version.clone(),
2804 component,
2805 },
2806 );
2807 missing.remove(&spec.id);
2808 }
2809 Ok(())
2810}
2811
2812#[cfg(test)]
2813mod tests {
2814 use super::*;
2815 use greentic_flow::model::{FlowDoc, NodeDoc};
2816 use indexmap::IndexMap;
2817 use serde_json::json;
2818
2819 #[test]
2820 fn normalizes_raw_component_to_component_exec() {
2821 let mut nodes = IndexMap::new();
2822 let mut raw = IndexMap::new();
2823 raw.insert(
2824 "templating.handlebars".into(),
2825 json!({ "template": "Hi {{name}}" }),
2826 );
2827 nodes.insert(
2828 "start".into(),
2829 NodeDoc {
2830 raw,
2831 routing: json!([{"out": true}]),
2832 ..Default::default()
2833 },
2834 );
2835 let doc = FlowDoc {
2836 id: "welcome".into(),
2837 title: None,
2838 description: None,
2839 flow_type: "messaging".into(),
2840 start: Some("start".into()),
2841 parameters: json!({}),
2842 tags: Vec::new(),
2843 schema_version: None,
2844 entrypoints: IndexMap::new(),
2845 nodes,
2846 };
2847
2848 let normalized = normalize_flow_doc(doc);
2849 let node = normalized.nodes.get("start").expect("node exists");
2850 assert_eq!(node.operation.as_deref(), Some("component.exec"));
2851 assert!(node.raw.is_empty());
2852 let payload = node.payload.as_object().expect("payload object");
2853 assert_eq!(
2854 payload.get("component"),
2855 Some(&Value::String("templating.handlebars".into()))
2856 );
2857 assert_eq!(
2858 payload.get("operation"),
2859 Some(&Value::String("render".into()))
2860 );
2861 let input = payload.get("input").unwrap();
2862 assert_eq!(input, &json!({ "template": "Hi {{name}}" }));
2863 }
2864}
2865
2866#[derive(Clone, Debug, Default, Serialize, Deserialize)]
2867pub struct PackMetadata {
2868 pub pack_id: String,
2869 pub version: String,
2870 #[serde(default)]
2871 pub entry_flows: Vec<String>,
2872 #[serde(default)]
2873 pub secret_requirements: Vec<greentic_types::SecretRequirement>,
2874}
2875
2876impl PackMetadata {
2877 fn from_wasm(bytes: &[u8]) -> Option<Self> {
2878 let parser = Parser::new(0);
2879 for payload in parser.parse_all(bytes) {
2880 let payload = payload.ok()?;
2881 match payload {
2882 Payload::CustomSection(section) => {
2883 if section.name() == "greentic.manifest"
2884 && let Ok(meta) = Self::from_bytes(section.data())
2885 {
2886 return Some(meta);
2887 }
2888 }
2889 Payload::DataSection(reader) => {
2890 for segment in reader.into_iter().flatten() {
2891 if let Ok(meta) = Self::from_bytes(segment.data) {
2892 return Some(meta);
2893 }
2894 }
2895 }
2896 _ => {}
2897 }
2898 }
2899 None
2900 }
2901
2902 fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
2903 #[derive(Deserialize)]
2904 struct RawManifest {
2905 pack_id: String,
2906 version: String,
2907 #[serde(default)]
2908 entry_flows: Vec<String>,
2909 #[serde(default)]
2910 flows: Vec<RawFlow>,
2911 #[serde(default)]
2912 secret_requirements: Vec<greentic_types::SecretRequirement>,
2913 }
2914
2915 #[derive(Deserialize)]
2916 struct RawFlow {
2917 id: String,
2918 }
2919
2920 let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
2921 let mut entry_flows = if manifest.entry_flows.is_empty() {
2922 manifest.flows.iter().map(|f| f.id.clone()).collect()
2923 } else {
2924 manifest.entry_flows.clone()
2925 };
2926 entry_flows.retain(|id| !id.is_empty());
2927 Ok(Self {
2928 pack_id: manifest.pack_id,
2929 version: manifest.version,
2930 entry_flows,
2931 secret_requirements: manifest.secret_requirements,
2932 })
2933 }
2934
2935 pub fn fallback(path: &Path) -> Self {
2936 let pack_id = path
2937 .file_stem()
2938 .map(|s| s.to_string_lossy().into_owned())
2939 .unwrap_or_else(|| "unknown-pack".to_string());
2940 Self {
2941 pack_id,
2942 version: "0.0.0".to_string(),
2943 entry_flows: Vec::new(),
2944 secret_requirements: Vec::new(),
2945 }
2946 }
2947
2948 pub fn from_manifest(manifest: &greentic_types::PackManifest) -> Self {
2949 let entry_flows = manifest
2950 .flows
2951 .iter()
2952 .map(|flow| flow.id.as_str().to_string())
2953 .collect::<Vec<_>>();
2954 Self {
2955 pack_id: manifest.pack_id.as_str().to_string(),
2956 version: manifest.version.to_string(),
2957 entry_flows,
2958 secret_requirements: manifest.secret_requirements.clone(),
2959 }
2960 }
2961}