1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::cache::{ArtifactKey, CacheConfig, CacheManager, CpuPolicy, EngineProfile};
10use crate::component_api::{
11 self, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
12};
13use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
14use crate::provider::{ProviderBinding, ProviderRegistry};
15use crate::provider_core::SchemaCorePre as ProviderComponentPre;
16use crate::provider_core_only;
17use crate::runtime_wasmtime::{Component, Engine, InstancePre, Linker, ResourceTable};
18use anyhow::{Context, Result, anyhow, bail};
19use greentic_distributor_client::dist::{DistClient, DistError, DistOptions};
20use greentic_interfaces_wasmtime::host_helpers::v1::{
21 self as host_v1, HostFns, add_all_v1_to_linker,
22 runner_host_http::RunnerHostHttp,
23 runner_host_kv::RunnerHostKv,
24 secrets_store::{SecretsError, SecretsStoreHost},
25 state_store::{
26 OpAck as StateOpAck, StateKey as HostStateKey, StateStoreError as StateError,
27 StateStoreHost, TenantCtx as StateTenantCtx,
28 },
29 telemetry_logger::{
30 OpAck as TelemetryAck, SpanContext as TelemetrySpanContext,
31 TelemetryLoggerError as TelemetryError, TelemetryLoggerHost,
32 TenantCtx as TelemetryTenantCtx,
33 },
34};
35use greentic_interfaces_wasmtime::http_client_client_v1_0::greentic::interfaces_types::types::Impersonation as ImpersonationV1_0;
36use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::interfaces_types::types::Impersonation as ImpersonationV1_1;
37use greentic_pack::builder as legacy_pack;
38use greentic_types::flow::FlowHasher;
39use greentic_types::{
40 ArtifactLocationV1, ComponentId, ComponentManifest, ComponentSourceRef, ComponentSourcesV1,
41 EXT_COMPONENT_SOURCES_V1, EnvId, ExtensionRef, Flow, FlowComponentRef, FlowId, FlowKind,
42 FlowMetadata, InputMapping, Node, NodeId, OutputMapping, Routing, StateKey as StoreStateKey,
43 TeamId, TelemetryHints, TenantCtx as TypesTenantCtx, TenantId, UserId, decode_pack_manifest,
44 pack_manifest::ExtensionInline,
45};
46use host_v1::http_client::{
47 HttpClientError, HttpClientErrorV1_1, HttpClientHost, HttpClientHostV1_1,
48 Request as HttpRequest, RequestOptionsV1_1 as HttpRequestOptionsV1_1,
49 RequestV1_1 as HttpRequestV1_1, Response as HttpResponse, ResponseV1_1 as HttpResponseV1_1,
50 TenantCtx as HttpTenantCtx, TenantCtxV1_1 as HttpTenantCtxV1_1,
51};
52use indexmap::IndexMap;
53use once_cell::sync::Lazy;
54use parking_lot::{Mutex, RwLock};
55use reqwest::blocking::Client as BlockingClient;
56use runner_core::normalize_under_root;
57use serde::{Deserialize, Serialize};
58use serde_cbor;
59use serde_json::{self, Value};
60use sha2::Digest;
61use tokio::fs;
62use wasmparser::{Parser, Payload};
63use wasmtime::StoreContextMut;
64use zip::ZipArchive;
65
66use crate::runner::engine::{FlowContext, FlowEngine, FlowStatus};
67use crate::runner::flow_adapter::{FlowIR, flow_doc_to_ir, flow_ir_to_flow};
68use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
69
70use crate::config::HostConfig;
71use crate::secrets::{DynSecretsManager, read_secret_blocking};
72use crate::storage::state::STATE_PREFIX;
73use crate::storage::{DynSessionStore, DynStateStore};
74use crate::verify;
75use crate::wasi::RunnerWasiPolicy;
76use tracing::warn;
77use wasmtime_wasi::p2::add_to_linker_sync as add_wasi_to_linker;
78use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
79
80use greentic_flow::model::FlowDoc;
81
82#[allow(dead_code)]
83pub struct PackRuntime {
84 path: PathBuf,
86 archive_path: Option<PathBuf>,
88 config: Arc<HostConfig>,
89 engine: Engine,
90 metadata: PackMetadata,
91 manifest: Option<greentic_types::PackManifest>,
92 legacy_manifest: Option<Box<legacy_pack::PackManifest>>,
93 component_manifests: HashMap<String, ComponentManifest>,
94 mocks: Option<Arc<MockLayer>>,
95 flows: Option<PackFlows>,
96 components: HashMap<String, PackComponent>,
97 http_client: Arc<BlockingClient>,
98 pre_cache: Mutex<HashMap<String, InstancePre<ComponentState>>>,
99 session_store: Option<DynSessionStore>,
100 state_store: Option<DynStateStore>,
101 wasi_policy: Arc<RunnerWasiPolicy>,
102 provider_registry: RwLock<Option<ProviderRegistry>>,
103 secrets: DynSecretsManager,
104 oauth_config: Option<OAuthBrokerConfig>,
105 cache: CacheManager,
106}
107
108struct PackComponent {
109 #[allow(dead_code)]
110 name: String,
111 #[allow(dead_code)]
112 version: String,
113 component: Arc<Component>,
114}
115
116#[derive(Debug, Default, Clone)]
117pub struct ComponentResolution {
118 pub materialized_root: Option<PathBuf>,
120 pub overrides: HashMap<String, PathBuf>,
122 pub dist_offline: bool,
124 pub dist_cache_dir: Option<PathBuf>,
126 pub allow_missing_hash: bool,
128}
129
130fn build_blocking_client() -> BlockingClient {
131 std::thread::spawn(|| {
132 BlockingClient::builder()
133 .no_proxy()
134 .build()
135 .expect("blocking client")
136 })
137 .join()
138 .expect("client build thread panicked")
139}
140
141fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
142 let (root, candidate) = if path.is_absolute() {
143 let parent = path
144 .parent()
145 .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
146 let root = parent
147 .canonicalize()
148 .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
149 let file = path
150 .file_name()
151 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
152 (root, PathBuf::from(file))
153 } else {
154 let cwd = std::env::current_dir().context("failed to resolve current directory")?;
155 let base = if let Some(parent) = path.parent() {
156 cwd.join(parent)
157 } else {
158 cwd
159 };
160 let root = base
161 .canonicalize()
162 .with_context(|| format!("failed to canonicalize {}", base.display()))?;
163 let file = path
164 .file_name()
165 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
166 (root, PathBuf::from(file))
167 };
168 let safe = normalize_under_root(&root, &candidate)?;
169 Ok((root, safe))
170}
171
172static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
173
174#[derive(Debug, Clone, Serialize, Deserialize)]
175pub struct FlowDescriptor {
176 pub id: String,
177 #[serde(rename = "type")]
178 pub flow_type: String,
179 pub pack_id: String,
180 pub profile: String,
181 pub version: String,
182 #[serde(default)]
183 pub description: Option<String>,
184}
185
186pub struct HostState {
187 config: Arc<HostConfig>,
188 http_client: Arc<BlockingClient>,
189 default_env: String,
190 #[allow(dead_code)]
191 session_store: Option<DynSessionStore>,
192 state_store: Option<DynStateStore>,
193 mocks: Option<Arc<MockLayer>>,
194 secrets: DynSecretsManager,
195 oauth_config: Option<OAuthBrokerConfig>,
196 oauth_host: OAuthBrokerHost,
197 exec_ctx: Option<ComponentExecCtx>,
198}
199
200impl HostState {
201 #[allow(clippy::default_constructed_unit_structs)]
202 #[allow(clippy::too_many_arguments)]
203 pub fn new(
204 config: Arc<HostConfig>,
205 http_client: Arc<BlockingClient>,
206 mocks: Option<Arc<MockLayer>>,
207 session_store: Option<DynSessionStore>,
208 state_store: Option<DynStateStore>,
209 secrets: DynSecretsManager,
210 oauth_config: Option<OAuthBrokerConfig>,
211 exec_ctx: Option<ComponentExecCtx>,
212 ) -> Result<Self> {
213 let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
214 Ok(Self {
215 config,
216 http_client,
217 default_env,
218 session_store,
219 state_store,
220 mocks,
221 secrets,
222 oauth_config,
223 oauth_host: OAuthBrokerHost::default(),
224 exec_ctx,
225 })
226 }
227
228 pub fn get_secret(&self, key: &str) -> Result<String> {
229 if provider_core_only::is_enabled() {
230 bail!(provider_core_only::blocked_message("secrets"))
231 }
232 if !self.config.secrets_policy.is_allowed(key) {
233 bail!("secret {key} is not permitted by bindings policy");
234 }
235 if let Some(mock) = &self.mocks
236 && let Some(value) = mock.secrets_lookup(key)
237 {
238 return Ok(value);
239 }
240 let ctx = self.config.tenant_ctx();
241 let bytes = read_secret_blocking(&self.secrets, &ctx, key)
242 .context("failed to read secret from manager")?;
243 let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
244 Ok(value)
245 }
246
247 fn tenant_ctx_from_v1(&self, ctx: Option<StateTenantCtx>) -> Result<TypesTenantCtx> {
248 let tenant_raw = ctx
249 .as_ref()
250 .map(|ctx| ctx.tenant.clone())
251 .or_else(|| self.exec_ctx.as_ref().map(|ctx| ctx.tenant.tenant.clone()))
252 .unwrap_or_else(|| self.config.tenant.clone());
253 let env_raw = ctx
254 .as_ref()
255 .map(|ctx| ctx.env.clone())
256 .unwrap_or_else(|| self.default_env.clone());
257 let tenant_id = TenantId::from_str(&tenant_raw)
258 .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
259 let env_id = EnvId::from_str(&env_raw)
260 .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
261 let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
262 if let Some(exec_ctx) = self.exec_ctx.as_ref() {
263 if let Some(team) = exec_ctx.tenant.team.as_ref() {
264 let team_id =
265 TeamId::from_str(team).with_context(|| format!("invalid team id `{team}`"))?;
266 tenant_ctx = tenant_ctx.with_team(Some(team_id));
267 }
268 if let Some(user) = exec_ctx.tenant.user.as_ref() {
269 let user_id =
270 UserId::from_str(user).with_context(|| format!("invalid user id `{user}`"))?;
271 tenant_ctx = tenant_ctx.with_user(Some(user_id));
272 }
273 tenant_ctx = tenant_ctx.with_flow(exec_ctx.flow_id.clone());
274 if let Some(node) = exec_ctx.node_id.as_ref() {
275 tenant_ctx = tenant_ctx.with_node(node.clone());
276 }
277 if let Some(session) = exec_ctx.tenant.correlation_id.as_ref() {
278 tenant_ctx = tenant_ctx.with_session(session.clone());
279 }
280 tenant_ctx.trace_id = exec_ctx.tenant.trace_id.clone();
281 }
282
283 if let Some(ctx) = ctx {
284 if let Some(team) = ctx.team.or(ctx.team_id) {
285 let team_id =
286 TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
287 tenant_ctx = tenant_ctx.with_team(Some(team_id));
288 }
289 if let Some(user) = ctx.user.or(ctx.user_id) {
290 let user_id =
291 UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
292 tenant_ctx = tenant_ctx.with_user(Some(user_id));
293 }
294 if let Some(flow) = ctx.flow_id {
295 tenant_ctx = tenant_ctx.with_flow(flow);
296 }
297 if let Some(node) = ctx.node_id {
298 tenant_ctx = tenant_ctx.with_node(node);
299 }
300 if let Some(provider) = ctx.provider_id {
301 tenant_ctx = tenant_ctx.with_provider(provider);
302 }
303 if let Some(session) = ctx.session_id {
304 tenant_ctx = tenant_ctx.with_session(session);
305 }
306 tenant_ctx.trace_id = ctx.trace_id;
307 }
308 Ok(tenant_ctx)
309 }
310
311 fn send_http_request(
312 &mut self,
313 req: HttpRequest,
314 opts: Option<HttpRequestOptionsV1_1>,
315 _ctx: Option<HttpTenantCtx>,
316 ) -> Result<HttpResponse, HttpClientError> {
317 if !self.config.http_enabled {
318 return Err(HttpClientError {
319 code: "denied".into(),
320 message: "http client disabled by policy".into(),
321 });
322 }
323
324 let mut mock_state = None;
325 let raw_body = req.body.clone();
326 if let Some(mock) = &self.mocks
327 && let Ok(meta) = HttpMockRequest::new(&req.method, &req.url, raw_body.as_deref())
328 {
329 match mock.http_begin(&meta) {
330 HttpDecision::Mock(response) => {
331 let headers = response
332 .headers
333 .iter()
334 .map(|(k, v)| (k.clone(), v.clone()))
335 .collect();
336 return Ok(HttpResponse {
337 status: response.status,
338 headers,
339 body: response.body.clone().map(|b| b.into_bytes()),
340 });
341 }
342 HttpDecision::Deny(reason) => {
343 return Err(HttpClientError {
344 code: "denied".into(),
345 message: reason,
346 });
347 }
348 HttpDecision::Passthrough { record } => {
349 mock_state = Some((meta, record));
350 }
351 }
352 }
353
354 let method = req.method.parse().unwrap_or(reqwest::Method::GET);
355 let mut builder = self.http_client.request(method, &req.url);
356 for (key, value) in req.headers {
357 if let Ok(header) = reqwest::header::HeaderName::from_bytes(key.as_bytes())
358 && let Ok(header_value) = reqwest::header::HeaderValue::from_str(&value)
359 {
360 builder = builder.header(header, header_value);
361 }
362 }
363
364 if let Some(body) = raw_body.clone() {
365 builder = builder.body(body);
366 }
367
368 if let Some(opts) = opts {
369 if let Some(timeout_ms) = opts.timeout_ms {
370 builder = builder.timeout(Duration::from_millis(timeout_ms as u64));
371 }
372 if opts.allow_insecure == Some(true) {
373 warn!(url = %req.url, "allow-insecure not supported; using default TLS validation");
374 }
375 if let Some(follow_redirects) = opts.follow_redirects
376 && !follow_redirects
377 {
378 warn!(url = %req.url, "follow-redirects=false not supported; using default client behaviour");
379 }
380 }
381
382 let response = match builder.send() {
383 Ok(resp) => resp,
384 Err(err) => {
385 warn!(url = %req.url, error = %err, "http client request failed");
386 return Err(HttpClientError {
387 code: "unavailable".into(),
388 message: err.to_string(),
389 });
390 }
391 };
392
393 let status = response.status().as_u16();
394 let headers_vec = response
395 .headers()
396 .iter()
397 .map(|(k, v)| {
398 (
399 k.as_str().to_string(),
400 v.to_str().unwrap_or_default().to_string(),
401 )
402 })
403 .collect::<Vec<_>>();
404 let body_bytes = response.bytes().ok().map(|b| b.to_vec());
405
406 if let Some((meta, true)) = mock_state.take()
407 && let Some(mock) = &self.mocks
408 {
409 let recorded = HttpMockResponse::new(
410 status,
411 headers_vec.clone().into_iter().collect(),
412 body_bytes
413 .as_ref()
414 .map(|b| String::from_utf8_lossy(b).into_owned()),
415 );
416 mock.http_record(&meta, &recorded);
417 }
418
419 Ok(HttpResponse {
420 status,
421 headers: headers_vec,
422 body: body_bytes,
423 })
424 }
425}
426
427impl SecretsStoreHost for HostState {
428 fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsError> {
429 if provider_core_only::is_enabled() {
430 warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
431 return Err(SecretsError::Denied);
432 }
433 if !self.config.secrets_policy.is_allowed(&key) {
434 return Err(SecretsError::Denied);
435 }
436 if let Some(mock) = &self.mocks
437 && let Some(value) = mock.secrets_lookup(&key)
438 {
439 return Ok(Some(value.into_bytes()));
440 }
441 let ctx = self.config.tenant_ctx();
442 match read_secret_blocking(&self.secrets, &ctx, &key) {
443 Ok(bytes) => Ok(Some(bytes)),
444 Err(err) => {
445 warn!(secret = %key, error = %err, "secret lookup failed");
446 Err(SecretsError::NotFound)
447 }
448 }
449 }
450}
451
452impl HttpClientHost for HostState {
453 fn send(
454 &mut self,
455 req: HttpRequest,
456 ctx: Option<HttpTenantCtx>,
457 ) -> Result<HttpResponse, HttpClientError> {
458 self.send_http_request(req, None, ctx)
459 }
460}
461
462impl HttpClientHostV1_1 for HostState {
463 fn send(
464 &mut self,
465 req: HttpRequestV1_1,
466 opts: Option<HttpRequestOptionsV1_1>,
467 ctx: Option<HttpTenantCtxV1_1>,
468 ) -> Result<HttpResponseV1_1, HttpClientErrorV1_1> {
469 let legacy_req = HttpRequest {
470 method: req.method,
471 url: req.url,
472 headers: req.headers,
473 body: req.body,
474 };
475 let legacy_ctx = ctx.map(|ctx| HttpTenantCtx {
476 env: ctx.env,
477 tenant: ctx.tenant,
478 tenant_id: ctx.tenant_id,
479 team: ctx.team,
480 team_id: ctx.team_id,
481 user: ctx.user,
482 user_id: ctx.user_id,
483 trace_id: ctx.trace_id,
484 correlation_id: ctx.correlation_id,
485 attributes: ctx.attributes,
486 session_id: ctx.session_id,
487 flow_id: ctx.flow_id,
488 node_id: ctx.node_id,
489 provider_id: ctx.provider_id,
490 deadline_ms: ctx.deadline_ms,
491 attempt: ctx.attempt,
492 idempotency_key: ctx.idempotency_key,
493 impersonation: ctx
494 .impersonation
495 .map(|ImpersonationV1_1 { actor_id, reason }| ImpersonationV1_0 {
496 actor_id,
497 reason,
498 }),
499 });
500
501 self.send_http_request(legacy_req, opts, legacy_ctx)
502 .map(|resp| HttpResponseV1_1 {
503 status: resp.status,
504 headers: resp.headers,
505 body: resp.body,
506 })
507 .map_err(|err| HttpClientErrorV1_1 {
508 code: err.code,
509 message: err.message,
510 })
511 }
512}
513
514impl StateStoreHost for HostState {
515 fn read(
516 &mut self,
517 key: HostStateKey,
518 ctx: Option<StateTenantCtx>,
519 ) -> Result<Vec<u8>, StateError> {
520 let store = match self.state_store.as_ref() {
521 Some(store) => store.clone(),
522 None => {
523 return Err(StateError {
524 code: "unavailable".into(),
525 message: "state store not configured".into(),
526 });
527 }
528 };
529 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
530 Ok(ctx) => ctx,
531 Err(err) => {
532 return Err(StateError {
533 code: "invalid-ctx".into(),
534 message: err.to_string(),
535 });
536 }
537 };
538 let key = StoreStateKey::from(key);
539 match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
540 Ok(Some(value)) => Ok(serde_json::to_vec(&value).unwrap_or_else(|_| Vec::new())),
541 Ok(None) => Err(StateError {
542 code: "not_found".into(),
543 message: "state key not found".into(),
544 }),
545 Err(err) => Err(StateError {
546 code: "internal".into(),
547 message: err.to_string(),
548 }),
549 }
550 }
551
552 fn write(
553 &mut self,
554 key: HostStateKey,
555 bytes: Vec<u8>,
556 ctx: Option<StateTenantCtx>,
557 ) -> Result<StateOpAck, StateError> {
558 let store = match self.state_store.as_ref() {
559 Some(store) => store.clone(),
560 None => {
561 return Err(StateError {
562 code: "unavailable".into(),
563 message: "state store not configured".into(),
564 });
565 }
566 };
567 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
568 Ok(ctx) => ctx,
569 Err(err) => {
570 return Err(StateError {
571 code: "invalid-ctx".into(),
572 message: err.to_string(),
573 });
574 }
575 };
576 let key = StoreStateKey::from(key);
577 let value = serde_json::from_slice(&bytes)
578 .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()));
579 match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
580 Ok(()) => Ok(StateOpAck::Ok),
581 Err(err) => Err(StateError {
582 code: "internal".into(),
583 message: err.to_string(),
584 }),
585 }
586 }
587
588 fn delete(
589 &mut self,
590 key: HostStateKey,
591 ctx: Option<StateTenantCtx>,
592 ) -> Result<StateOpAck, StateError> {
593 let store = match self.state_store.as_ref() {
594 Some(store) => store.clone(),
595 None => {
596 return Err(StateError {
597 code: "unavailable".into(),
598 message: "state store not configured".into(),
599 });
600 }
601 };
602 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
603 Ok(ctx) => ctx,
604 Err(err) => {
605 return Err(StateError {
606 code: "invalid-ctx".into(),
607 message: err.to_string(),
608 });
609 }
610 };
611 let key = StoreStateKey::from(key);
612 match store.del(&tenant_ctx, STATE_PREFIX, &key) {
613 Ok(_) => Ok(StateOpAck::Ok),
614 Err(err) => Err(StateError {
615 code: "internal".into(),
616 message: err.to_string(),
617 }),
618 }
619 }
620}
621
622impl TelemetryLoggerHost for HostState {
623 fn log(
624 &mut self,
625 span: TelemetrySpanContext,
626 fields: Vec<(String, String)>,
627 _ctx: Option<TelemetryTenantCtx>,
628 ) -> Result<TelemetryAck, TelemetryError> {
629 if let Some(mock) = &self.mocks
630 && mock.telemetry_drain(&[("span_json", span.flow_id.as_str())])
631 {
632 return Ok(TelemetryAck::Ok);
633 }
634 let mut map = serde_json::Map::new();
635 for (k, v) in fields {
636 map.insert(k, Value::String(v));
637 }
638 tracing::info!(
639 tenant = %span.tenant,
640 flow_id = %span.flow_id,
641 node = ?span.node_id,
642 provider = %span.provider,
643 fields = %serde_json::Value::Object(map.clone()),
644 "telemetry log from pack"
645 );
646 Ok(TelemetryAck::Ok)
647 }
648}
649
650impl RunnerHostHttp for HostState {
651 fn request(
652 &mut self,
653 method: String,
654 url: String,
655 headers: Vec<String>,
656 body: Option<Vec<u8>>,
657 ) -> Result<Vec<u8>, String> {
658 let req = HttpRequest {
659 method,
660 url,
661 headers: headers
662 .chunks(2)
663 .filter_map(|chunk| {
664 if chunk.len() == 2 {
665 Some((chunk[0].clone(), chunk[1].clone()))
666 } else {
667 None
668 }
669 })
670 .collect(),
671 body,
672 };
673 match HttpClientHost::send(self, req, None) {
674 Ok(resp) => Ok(resp.body.unwrap_or_default()),
675 Err(err) => Err(err.message),
676 }
677 }
678}
679
680impl RunnerHostKv for HostState {
681 fn get(&mut self, _ns: String, _key: String) -> Option<String> {
682 None
683 }
684
685 fn put(&mut self, _ns: String, _key: String, _val: String) {}
686}
687
688enum ManifestLoad {
689 New {
690 manifest: Box<greentic_types::PackManifest>,
691 flows: PackFlows,
692 },
693 Legacy {
694 manifest: Box<legacy_pack::PackManifest>,
695 flows: PackFlows,
696 },
697}
698
699fn load_manifest_and_flows(path: &Path) -> Result<ManifestLoad> {
700 let mut archive = ZipArchive::new(File::open(path)?)
701 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
702 let bytes = read_entry(&mut archive, "manifest.cbor")
703 .with_context(|| format!("missing manifest.cbor in {}", path.display()))?;
704 match decode_pack_manifest(&bytes) {
705 Ok(manifest) => {
706 let cache = PackFlows::from_manifest(manifest.clone());
707 Ok(ManifestLoad::New {
708 manifest: Box::new(manifest),
709 flows: cache,
710 })
711 }
712 Err(err) => {
713 tracing::debug!(error = %err, pack = %path.display(), "decode_pack_manifest failed; trying legacy manifest");
714 let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
716 .context("failed to decode legacy pack manifest from manifest.cbor")?;
717 let flows = load_legacy_flows(&mut archive, &legacy)?;
718 Ok(ManifestLoad::Legacy {
719 manifest: Box::new(legacy),
720 flows,
721 })
722 }
723 }
724}
725
726fn load_manifest_and_flows_from_dir(root: &Path) -> Result<ManifestLoad> {
727 let manifest_path = root.join("manifest.cbor");
728 let bytes = std::fs::read(&manifest_path)
729 .with_context(|| format!("missing manifest.cbor in {}", root.display()))?;
730 match decode_pack_manifest(&bytes) {
731 Ok(manifest) => {
732 let cache = PackFlows::from_manifest(manifest.clone());
733 Ok(ManifestLoad::New {
734 manifest: Box::new(manifest),
735 flows: cache,
736 })
737 }
738 Err(err) => {
739 tracing::debug!(
740 error = %err,
741 pack = %root.display(),
742 "decode_pack_manifest failed for materialized pack; trying legacy manifest"
743 );
744 let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
745 .context("failed to decode legacy pack manifest from manifest.cbor")?;
746 let flows = load_legacy_flows_from_dir(root, &legacy)?;
747 Ok(ManifestLoad::Legacy {
748 manifest: Box::new(legacy),
749 flows,
750 })
751 }
752 }
753}
754
755fn load_legacy_flows(
756 archive: &mut ZipArchive<File>,
757 manifest: &legacy_pack::PackManifest,
758) -> Result<PackFlows> {
759 let mut flows = HashMap::new();
760 let mut descriptors = Vec::new();
761
762 for entry in &manifest.flows {
763 let bytes = read_entry(archive, &entry.file_json)
764 .with_context(|| format!("missing flow json {}", entry.file_json))?;
765 let doc: FlowDoc = serde_json::from_slice(&bytes)
766 .with_context(|| format!("failed to decode flow doc {}", entry.file_json))?;
767 let normalized = normalize_flow_doc(doc);
768 let flow_ir = flow_doc_to_ir(normalized)?;
769 let flow = flow_ir_to_flow(flow_ir)?;
770
771 descriptors.push(FlowDescriptor {
772 id: entry.id.clone(),
773 flow_type: entry.kind.clone(),
774 pack_id: manifest.meta.pack_id.clone(),
775 profile: manifest.meta.pack_id.clone(),
776 version: manifest.meta.version.to_string(),
777 description: None,
778 });
779 flows.insert(entry.id.clone(), flow);
780 }
781
782 let mut entry_flows = manifest.meta.entry_flows.clone();
783 if entry_flows.is_empty() {
784 entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
785 }
786 let metadata = PackMetadata {
787 pack_id: manifest.meta.pack_id.clone(),
788 version: manifest.meta.version.to_string(),
789 entry_flows,
790 secret_requirements: Vec::new(),
791 };
792
793 Ok(PackFlows {
794 descriptors,
795 flows,
796 metadata,
797 })
798}
799
800fn load_legacy_flows_from_dir(
801 root: &Path,
802 manifest: &legacy_pack::PackManifest,
803) -> Result<PackFlows> {
804 let mut flows = HashMap::new();
805 let mut descriptors = Vec::new();
806
807 for entry in &manifest.flows {
808 let path = root.join(&entry.file_json);
809 let bytes = std::fs::read(&path)
810 .with_context(|| format!("missing flow json {}", path.display()))?;
811 let doc: FlowDoc = serde_json::from_slice(&bytes)
812 .with_context(|| format!("failed to decode flow doc {}", path.display()))?;
813 let normalized = normalize_flow_doc(doc);
814 let flow_ir = flow_doc_to_ir(normalized)?;
815 let flow = flow_ir_to_flow(flow_ir)?;
816
817 descriptors.push(FlowDescriptor {
818 id: entry.id.clone(),
819 flow_type: entry.kind.clone(),
820 pack_id: manifest.meta.pack_id.clone(),
821 profile: manifest.meta.pack_id.clone(),
822 version: manifest.meta.version.to_string(),
823 description: None,
824 });
825 flows.insert(entry.id.clone(), flow);
826 }
827
828 let mut entry_flows = manifest.meta.entry_flows.clone();
829 if entry_flows.is_empty() {
830 entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
831 }
832 let metadata = PackMetadata {
833 pack_id: manifest.meta.pack_id.clone(),
834 version: manifest.meta.version.to_string(),
835 entry_flows,
836 secret_requirements: Vec::new(),
837 };
838
839 Ok(PackFlows {
840 descriptors,
841 flows,
842 metadata,
843 })
844}
845
846pub struct ComponentState {
847 pub host: HostState,
848 wasi_ctx: WasiCtx,
849 resource_table: ResourceTable,
850}
851
852impl ComponentState {
853 pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
854 let wasi_ctx = policy
855 .instantiate()
856 .context("failed to build WASI context")?;
857 Ok(Self {
858 host,
859 wasi_ctx,
860 resource_table: ResourceTable::new(),
861 })
862 }
863
864 fn host_mut(&mut self) -> &mut HostState {
865 &mut self.host
866 }
867
868 fn should_cancel_host(&mut self) -> bool {
869 false
870 }
871
872 fn yield_now_host(&mut self) {
873 }
875}
876
877impl component_api::v0_4::greentic::component::control::Host for ComponentState {
878 fn should_cancel(&mut self) -> bool {
879 self.should_cancel_host()
880 }
881
882 fn yield_now(&mut self) {
883 self.yield_now_host();
884 }
885}
886
887impl component_api::v0_5::greentic::component::control::Host for ComponentState {
888 fn should_cancel(&mut self) -> bool {
889 self.should_cancel_host()
890 }
891
892 fn yield_now(&mut self) {
893 self.yield_now_host();
894 }
895}
896
897fn add_component_control_instance(
898 linker: &mut Linker<ComponentState>,
899 name: &str,
900) -> wasmtime::Result<()> {
901 let mut inst = linker.instance(name)?;
902 inst.func_wrap(
903 "should-cancel",
904 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
905 let host = caller.data_mut();
906 Ok((host.should_cancel_host(),))
907 },
908 )?;
909 inst.func_wrap(
910 "yield-now",
911 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
912 let host = caller.data_mut();
913 host.yield_now_host();
914 Ok(())
915 },
916 )?;
917 Ok(())
918}
919
920fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
921 add_component_control_instance(linker, "greentic:component/control@0.5.0")?;
922 add_component_control_instance(linker, "greentic:component/control@0.4.0")?;
923 Ok(())
924}
925
926pub fn register_all(linker: &mut Linker<ComponentState>, allow_state_store: bool) -> Result<()> {
927 add_wasi_to_linker(linker)?;
928 add_all_v1_to_linker(
929 linker,
930 HostFns {
931 http_client_v1_1: Some(|state| state.host_mut()),
932 http_client: Some(|state| state.host_mut()),
933 oauth_broker: None,
934 runner_host_http: Some(|state| state.host_mut()),
935 runner_host_kv: Some(|state| state.host_mut()),
936 telemetry_logger: Some(|state| state.host_mut()),
937 state_store: allow_state_store.then_some(|state| state.host_mut()),
938 secrets_store: Some(|state| state.host_mut()),
939 },
940 )?;
941 Ok(())
942}
943
944impl OAuthHostContext for ComponentState {
945 fn tenant_id(&self) -> &str {
946 &self.host.config.tenant
947 }
948
949 fn env(&self) -> &str {
950 &self.host.default_env
951 }
952
953 fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
954 &mut self.host.oauth_host
955 }
956
957 fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
958 self.host.oauth_config.as_ref()
959 }
960}
961
962impl WasiView for ComponentState {
963 fn ctx(&mut self) -> WasiCtxView<'_> {
964 WasiCtxView {
965 ctx: &mut self.wasi_ctx,
966 table: &mut self.resource_table,
967 }
968 }
969}
970
971#[allow(unsafe_code)]
972unsafe impl Send for ComponentState {}
973#[allow(unsafe_code)]
974unsafe impl Sync for ComponentState {}
975
976impl PackRuntime {
977 fn allows_state_store(&self, component_ref: &str) -> bool {
978 if self.state_store.is_none() {
979 return false;
980 }
981 if !self.config.state_store_policy.allow {
982 return false;
983 }
984 let Some(manifest) = self.component_manifests.get(component_ref) else {
985 return false;
986 };
987 manifest
988 .capabilities
989 .host
990 .state
991 .as_ref()
992 .map(|caps| caps.read || caps.write)
993 .unwrap_or(false)
994 }
995
996 #[allow(clippy::too_many_arguments)]
997 pub async fn load(
998 path: impl AsRef<Path>,
999 config: Arc<HostConfig>,
1000 mocks: Option<Arc<MockLayer>>,
1001 archive_source: Option<&Path>,
1002 session_store: Option<DynSessionStore>,
1003 state_store: Option<DynStateStore>,
1004 wasi_policy: Arc<RunnerWasiPolicy>,
1005 secrets: DynSecretsManager,
1006 oauth_config: Option<OAuthBrokerConfig>,
1007 verify_archive: bool,
1008 component_resolution: ComponentResolution,
1009 ) -> Result<Self> {
1010 let path = path.as_ref();
1011 let (_pack_root, safe_path) = normalize_pack_path(path)?;
1012 let path_meta = std::fs::metadata(&safe_path).ok();
1013 let is_dir = path_meta
1014 .as_ref()
1015 .map(|meta| meta.is_dir())
1016 .unwrap_or(false);
1017 let is_component = !is_dir
1018 && safe_path
1019 .extension()
1020 .and_then(|ext| ext.to_str())
1021 .map(|ext| ext.eq_ignore_ascii_case("wasm"))
1022 .unwrap_or(false);
1023 let archive_hint_path = if let Some(source) = archive_source {
1024 let (_, normalized) = normalize_pack_path(source)?;
1025 Some(normalized)
1026 } else if is_component || is_dir {
1027 None
1028 } else {
1029 Some(safe_path.clone())
1030 };
1031 let archive_hint = archive_hint_path.as_deref();
1032 if verify_archive {
1033 if let Some(verify_target) = archive_hint.and_then(|p| {
1034 std::fs::metadata(p)
1035 .ok()
1036 .filter(|meta| meta.is_file())
1037 .map(|_| p)
1038 }) {
1039 verify::verify_pack(verify_target).await?;
1040 tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
1041 } else {
1042 tracing::debug!("skipping archive verification (no archive source)");
1043 }
1044 }
1045 let engine = Engine::default();
1046 let engine_profile =
1047 EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
1048 let cache = CacheManager::new(CacheConfig::default(), engine_profile);
1049 let mut metadata = PackMetadata::fallback(&safe_path);
1050 let mut manifest = None;
1051 let mut legacy_manifest: Option<Box<legacy_pack::PackManifest>> = None;
1052 let mut flows = None;
1053 let materialized_root = component_resolution.materialized_root.clone().or_else(|| {
1054 if is_dir {
1055 Some(safe_path.clone())
1056 } else {
1057 None
1058 }
1059 });
1060
1061 if let Some(root) = materialized_root.as_ref() {
1062 match load_manifest_and_flows_from_dir(root) {
1063 Ok(ManifestLoad::New {
1064 manifest: m,
1065 flows: cache,
1066 }) => {
1067 metadata = cache.metadata.clone();
1068 manifest = Some(*m);
1069 flows = Some(cache);
1070 }
1071 Ok(ManifestLoad::Legacy {
1072 manifest: m,
1073 flows: cache,
1074 }) => {
1075 metadata = cache.metadata.clone();
1076 legacy_manifest = Some(m);
1077 flows = Some(cache);
1078 }
1079 Err(err) => {
1080 warn!(error = %err, pack = %root.display(), "failed to parse materialized pack manifest");
1081 }
1082 }
1083 }
1084
1085 if manifest.is_none()
1086 && legacy_manifest.is_none()
1087 && let Some(archive_path) = archive_hint
1088 {
1089 match load_manifest_and_flows(archive_path) {
1090 Ok(ManifestLoad::New {
1091 manifest: m,
1092 flows: cache,
1093 }) => {
1094 metadata = cache.metadata.clone();
1095 manifest = Some(*m);
1096 flows = Some(cache);
1097 }
1098 Ok(ManifestLoad::Legacy {
1099 manifest: m,
1100 flows: cache,
1101 }) => {
1102 metadata = cache.metadata.clone();
1103 legacy_manifest = Some(m);
1104 flows = Some(cache);
1105 }
1106 Err(err) => {
1107 warn!(error = %err, pack = %archive_path.display(), "failed to parse pack manifest; skipping flows");
1108 }
1109 }
1110 }
1111 let mut pack_lock = None;
1112 for root in find_pack_lock_roots(&safe_path, is_dir, archive_hint) {
1113 pack_lock = load_pack_lock(&root)?;
1114 if pack_lock.is_some() {
1115 break;
1116 }
1117 }
1118 let component_sources_payload = if pack_lock.is_none() {
1119 if let Some(manifest) = manifest.as_ref() {
1120 manifest
1121 .get_component_sources_v1()
1122 .context("invalid component sources extension")?
1123 } else {
1124 None
1125 }
1126 } else {
1127 None
1128 };
1129 let component_sources = if let Some(lock) = pack_lock.as_ref() {
1130 Some(component_sources_table_from_pack_lock(
1131 lock,
1132 component_resolution.allow_missing_hash,
1133 )?)
1134 } else {
1135 component_sources_table(component_sources_payload.as_ref())?
1136 };
1137 let components = if is_component {
1138 let wasm_bytes = fs::read(&safe_path).await?;
1139 metadata = PackMetadata::from_wasm(&wasm_bytes)
1140 .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
1141 let name = safe_path
1142 .file_stem()
1143 .map(|s| s.to_string_lossy().to_string())
1144 .unwrap_or_else(|| "component".to_string());
1145 let component = compile_component_with_cache(&cache, &engine, None, wasm_bytes).await?;
1146 let mut map = HashMap::new();
1147 map.insert(
1148 name.clone(),
1149 PackComponent {
1150 name,
1151 version: metadata.version.clone(),
1152 component,
1153 },
1154 );
1155 map
1156 } else {
1157 let specs = component_specs(
1158 manifest.as_ref(),
1159 legacy_manifest.as_deref(),
1160 component_sources_payload.as_ref(),
1161 pack_lock.as_ref(),
1162 );
1163 if specs.is_empty() {
1164 HashMap::new()
1165 } else {
1166 let mut loaded = HashMap::new();
1167 let mut missing: HashSet<String> =
1168 specs.iter().map(|spec| spec.id.clone()).collect();
1169 let mut searched = Vec::new();
1170
1171 if !component_resolution.overrides.is_empty() {
1172 load_components_from_overrides(
1173 &cache,
1174 &engine,
1175 &component_resolution.overrides,
1176 &specs,
1177 &mut missing,
1178 &mut loaded,
1179 )
1180 .await?;
1181 searched.push("override map".to_string());
1182 }
1183
1184 if let Some(component_sources) = component_sources.as_ref() {
1185 load_components_from_sources(
1186 &cache,
1187 &engine,
1188 component_sources,
1189 &component_resolution,
1190 &specs,
1191 &mut missing,
1192 &mut loaded,
1193 materialized_root.as_deref(),
1194 archive_hint,
1195 )
1196 .await?;
1197 searched.push(format!("extension {}", EXT_COMPONENT_SOURCES_V1));
1198 }
1199
1200 if let Some(root) = materialized_root.as_ref() {
1201 load_components_from_dir(
1202 &cache,
1203 &engine,
1204 root,
1205 &specs,
1206 &mut missing,
1207 &mut loaded,
1208 )
1209 .await?;
1210 searched.push(format!("components dir {}", root.display()));
1211 }
1212
1213 if let Some(archive_path) = archive_hint {
1214 load_components_from_archive(
1215 &cache,
1216 &engine,
1217 archive_path,
1218 &specs,
1219 &mut missing,
1220 &mut loaded,
1221 )
1222 .await?;
1223 searched.push(format!("archive {}", archive_path.display()));
1224 }
1225
1226 if !missing.is_empty() {
1227 let missing_list = missing.into_iter().collect::<Vec<_>>().join(", ");
1228 let sources = if searched.is_empty() {
1229 "no component sources".to_string()
1230 } else {
1231 searched.join(", ")
1232 };
1233 bail!(
1234 "components missing: {}; looked in {}",
1235 missing_list,
1236 sources
1237 );
1238 }
1239
1240 loaded
1241 }
1242 };
1243 let http_client = Arc::clone(&HTTP_CLIENT);
1244 let mut component_manifests = HashMap::new();
1245 if let Some(manifest) = manifest.as_ref() {
1246 for component in &manifest.components {
1247 component_manifests.insert(component.id.as_str().to_string(), component.clone());
1248 }
1249 }
1250 Ok(Self {
1251 path: safe_path,
1252 archive_path: archive_hint.map(Path::to_path_buf),
1253 config,
1254 engine,
1255 metadata,
1256 manifest,
1257 legacy_manifest,
1258 component_manifests,
1259 mocks,
1260 flows,
1261 components,
1262 http_client,
1263 pre_cache: Mutex::new(HashMap::new()),
1264 session_store,
1265 state_store,
1266 wasi_policy,
1267 provider_registry: RwLock::new(None),
1268 secrets,
1269 oauth_config,
1270 cache,
1271 })
1272 }
1273
1274 pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
1275 if let Some(cache) = &self.flows {
1276 return Ok(cache.descriptors.clone());
1277 }
1278 if let Some(manifest) = &self.manifest {
1279 let descriptors = manifest
1280 .flows
1281 .iter()
1282 .map(|flow| FlowDescriptor {
1283 id: flow.id.as_str().to_string(),
1284 flow_type: flow_kind_to_str(flow.kind).to_string(),
1285 pack_id: manifest.pack_id.as_str().to_string(),
1286 profile: manifest.pack_id.as_str().to_string(),
1287 version: manifest.version.to_string(),
1288 description: None,
1289 })
1290 .collect();
1291 return Ok(descriptors);
1292 }
1293 Ok(Vec::new())
1294 }
1295
1296 #[allow(dead_code)]
1297 pub async fn run_flow(
1298 &self,
1299 flow_id: &str,
1300 input: serde_json::Value,
1301 ) -> Result<serde_json::Value> {
1302 let pack = Arc::new(
1303 PackRuntime::load(
1304 &self.path,
1305 Arc::clone(&self.config),
1306 self.mocks.clone(),
1307 self.archive_path.as_deref(),
1308 self.session_store.clone(),
1309 self.state_store.clone(),
1310 Arc::clone(&self.wasi_policy),
1311 self.secrets.clone(),
1312 self.oauth_config.clone(),
1313 false,
1314 ComponentResolution::default(),
1315 )
1316 .await?,
1317 );
1318
1319 let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&self.config)).await?;
1320 let retry_config = self.config.retry_config().into();
1321 let mocks = pack.mocks.as_deref();
1322 let tenant = self.config.tenant.as_str();
1323
1324 let ctx = FlowContext {
1325 tenant,
1326 pack_id: pack.metadata().pack_id.as_str(),
1327 flow_id,
1328 node_id: None,
1329 tool: None,
1330 action: None,
1331 session_id: None,
1332 provider_id: None,
1333 retry_config,
1334 observer: None,
1335 mocks,
1336 };
1337
1338 let execution = engine.execute(ctx, input).await?;
1339 match execution.status {
1340 FlowStatus::Completed => Ok(execution.output),
1341 FlowStatus::Waiting(wait) => Ok(serde_json::json!({
1342 "status": "pending",
1343 "reason": wait.reason,
1344 "resume": wait.snapshot,
1345 "response": execution.output,
1346 })),
1347 }
1348 }
1349
1350 pub async fn invoke_component(
1351 &self,
1352 component_ref: &str,
1353 ctx: ComponentExecCtx,
1354 operation: &str,
1355 _config_json: Option<String>,
1356 input_json: String,
1357 ) -> Result<Value> {
1358 let pack_component = self
1359 .components
1360 .get(component_ref)
1361 .with_context(|| format!("component '{component_ref}' not found in pack"))?;
1362
1363 let mut linker = Linker::new(&self.engine);
1364 let allow_state_store = self.allows_state_store(component_ref);
1365 register_all(&mut linker, allow_state_store)?;
1366 add_component_control_to_linker(&mut linker)?;
1367
1368 let host_state = HostState::new(
1369 Arc::clone(&self.config),
1370 Arc::clone(&self.http_client),
1371 self.mocks.clone(),
1372 self.session_store.clone(),
1373 self.state_store.clone(),
1374 Arc::clone(&self.secrets),
1375 self.oauth_config.clone(),
1376 Some(ctx.clone()),
1377 )?;
1378 let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1379 let mut store = wasmtime::Store::new(&self.engine, store_state);
1380 let pre_instance = linker.instantiate_pre(pack_component.component.as_ref())?;
1381 let result = match component_api::v0_5::ComponentPre::new(pre_instance) {
1382 Ok(pre) => {
1383 let bindings = pre.instantiate_async(&mut store).await?;
1384 let node = bindings.greentic_component_node();
1385 let ctx_v05 = component_api::exec_ctx_v0_5(&ctx);
1386 let result = node.call_invoke(&mut store, &ctx_v05, operation, &input_json)?;
1387 component_api::invoke_result_from_v0_5(result)
1388 }
1389 Err(err) => {
1390 if is_missing_node_export(&err, "0.5.0") {
1391 let pre_instance = linker.instantiate_pre(pack_component.component.as_ref())?;
1392 let pre: component_api::v0_4::ComponentPre<ComponentState> =
1393 component_api::v0_4::ComponentPre::new(pre_instance)?;
1394 let bindings = pre.instantiate_async(&mut store).await?;
1395 let node = bindings.greentic_component_node();
1396 let ctx_v04 = component_api::exec_ctx_v0_4(&ctx);
1397 let result = node.call_invoke(&mut store, &ctx_v04, operation, &input_json)?;
1398 component_api::invoke_result_from_v0_4(result)
1399 } else {
1400 return Err(err);
1401 }
1402 }
1403 };
1404
1405 match result {
1406 InvokeResult::Ok(body) => {
1407 if body.is_empty() {
1408 return Ok(Value::Null);
1409 }
1410 serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
1411 }
1412 InvokeResult::Err(NodeError {
1413 code,
1414 message,
1415 retryable,
1416 backoff_ms,
1417 details,
1418 }) => {
1419 let mut obj = serde_json::Map::new();
1420 obj.insert("ok".into(), Value::Bool(false));
1421 let mut error = serde_json::Map::new();
1422 error.insert("code".into(), Value::String(code));
1423 error.insert("message".into(), Value::String(message));
1424 error.insert("retryable".into(), Value::Bool(retryable));
1425 if let Some(backoff) = backoff_ms {
1426 error.insert("backoff_ms".into(), Value::Number(backoff.into()));
1427 }
1428 if let Some(details) = details {
1429 error.insert(
1430 "details".into(),
1431 serde_json::from_str(&details).unwrap_or(Value::String(details)),
1432 );
1433 }
1434 obj.insert("error".into(), Value::Object(error));
1435 Ok(Value::Object(obj))
1436 }
1437 }
1438 }
1439
1440 pub fn resolve_provider(
1441 &self,
1442 provider_id: Option<&str>,
1443 provider_type: Option<&str>,
1444 ) -> Result<ProviderBinding> {
1445 let registry = self.provider_registry()?;
1446 registry.resolve(provider_id, provider_type)
1447 }
1448
1449 pub async fn invoke_provider(
1450 &self,
1451 binding: &ProviderBinding,
1452 ctx: ComponentExecCtx,
1453 op: &str,
1454 input_json: Vec<u8>,
1455 ) -> Result<Value> {
1456 let component_ref = &binding.component_ref;
1457 let pack_component = self
1458 .components
1459 .get(component_ref)
1460 .with_context(|| format!("provider component '{component_ref}' not found in pack"))?;
1461
1462 let mut linker = Linker::new(&self.engine);
1463 let allow_state_store = self.allows_state_store(component_ref);
1464 register_all(&mut linker, allow_state_store)?;
1465 add_component_control_to_linker(&mut linker)?;
1466 let pre_instance = linker.instantiate_pre(pack_component.component.as_ref())?;
1467 let pre: ProviderComponentPre<ComponentState> = ProviderComponentPre::new(pre_instance)?;
1468
1469 let host_state = HostState::new(
1470 Arc::clone(&self.config),
1471 Arc::clone(&self.http_client),
1472 self.mocks.clone(),
1473 self.session_store.clone(),
1474 self.state_store.clone(),
1475 Arc::clone(&self.secrets),
1476 self.oauth_config.clone(),
1477 Some(ctx),
1478 )?;
1479 let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1480 let mut store = wasmtime::Store::new(&self.engine, store_state);
1481 let bindings: crate::provider_core::SchemaCore = pre.instantiate_async(&mut store).await?;
1482 let provider = bindings.greentic_provider_core_schema_core_api();
1483
1484 let result = provider.call_invoke(&mut store, op, &input_json)?;
1485 deserialize_json_bytes(result)
1486 }
1487
1488 fn provider_registry(&self) -> Result<ProviderRegistry> {
1489 if let Some(registry) = self.provider_registry.read().clone() {
1490 return Ok(registry);
1491 }
1492 let manifest = self
1493 .manifest
1494 .as_ref()
1495 .context("pack manifest required for provider resolution")?;
1496 let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
1497 let registry = ProviderRegistry::new(
1498 manifest,
1499 self.state_store.clone(),
1500 &self.config.tenant,
1501 &env,
1502 )?;
1503 *self.provider_registry.write() = Some(registry.clone());
1504 Ok(registry)
1505 }
1506
1507 pub fn load_flow(&self, flow_id: &str) -> Result<Flow> {
1508 if let Some(cache) = &self.flows {
1509 return cache
1510 .flows
1511 .get(flow_id)
1512 .cloned()
1513 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
1514 }
1515 if let Some(manifest) = &self.manifest {
1516 let entry = manifest
1517 .flows
1518 .iter()
1519 .find(|f| f.id.as_str() == flow_id)
1520 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
1521 return Ok(entry.flow.clone());
1522 }
1523 bail!("flow '{flow_id}' not available (pack exports disabled)")
1524 }
1525
1526 pub fn metadata(&self) -> &PackMetadata {
1527 &self.metadata
1528 }
1529
1530 pub fn required_secrets(&self) -> &[greentic_types::SecretRequirement] {
1531 &self.metadata.secret_requirements
1532 }
1533
1534 pub fn missing_secrets(
1535 &self,
1536 tenant_ctx: &TypesTenantCtx,
1537 ) -> Vec<greentic_types::SecretRequirement> {
1538 let env = tenant_ctx.env.as_str().to_string();
1539 let tenant = tenant_ctx.tenant.as_str().to_string();
1540 let team = tenant_ctx.team.as_ref().map(|t| t.as_str().to_string());
1541 self.required_secrets()
1542 .iter()
1543 .filter(|req| {
1544 if let Some(scope) = &req.scope {
1546 if scope.env != env {
1547 return false;
1548 }
1549 if scope.tenant != tenant {
1550 return false;
1551 }
1552 if let Some(ref team_req) = scope.team
1553 && team.as_ref() != Some(team_req)
1554 {
1555 return false;
1556 }
1557 }
1558 let ctx = self.config.tenant_ctx();
1559 read_secret_blocking(&self.secrets, &ctx, req.key.as_str()).is_err()
1560 })
1561 .cloned()
1562 .collect()
1563 }
1564
1565 pub fn for_component_test(
1566 components: Vec<(String, PathBuf)>,
1567 flows: HashMap<String, FlowIR>,
1568 pack_id: &str,
1569 config: Arc<HostConfig>,
1570 ) -> Result<Self> {
1571 let engine = Engine::default();
1572 let engine_profile =
1573 EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
1574 let cache = CacheManager::new(CacheConfig::default(), engine_profile);
1575 let mut component_map = HashMap::new();
1576 for (name, path) in components {
1577 if !path.exists() {
1578 bail!("component artifact missing: {}", path.display());
1579 }
1580 let wasm_bytes = std::fs::read(&path)?;
1581 let component = Arc::new(
1582 Component::from_binary(&engine, &wasm_bytes)
1583 .with_context(|| format!("failed to compile component {}", path.display()))?,
1584 );
1585 component_map.insert(
1586 name.clone(),
1587 PackComponent {
1588 name,
1589 version: "0.0.0".into(),
1590 component,
1591 },
1592 );
1593 }
1594
1595 let mut flow_map = HashMap::new();
1596 let mut descriptors = Vec::new();
1597 for (id, ir) in flows {
1598 let flow_type = ir.flow_type.clone();
1599 let flow = flow_ir_to_flow(ir)?;
1600 flow_map.insert(id.clone(), flow);
1601 descriptors.push(FlowDescriptor {
1602 id: id.clone(),
1603 flow_type,
1604 pack_id: pack_id.to_string(),
1605 profile: "test".into(),
1606 version: "0.0.0".into(),
1607 description: None,
1608 });
1609 }
1610 let entry_flows = descriptors.iter().map(|flow| flow.id.clone()).collect();
1611 let metadata = PackMetadata {
1612 pack_id: pack_id.to_string(),
1613 version: "0.0.0".into(),
1614 entry_flows,
1615 secret_requirements: Vec::new(),
1616 };
1617 let flows_cache = PackFlows {
1618 descriptors: descriptors.clone(),
1619 flows: flow_map,
1620 metadata: metadata.clone(),
1621 };
1622
1623 Ok(Self {
1624 path: PathBuf::new(),
1625 archive_path: None,
1626 config,
1627 engine,
1628 metadata,
1629 manifest: None,
1630 legacy_manifest: None,
1631 component_manifests: HashMap::new(),
1632 mocks: None,
1633 flows: Some(flows_cache),
1634 components: component_map,
1635 http_client: Arc::clone(&HTTP_CLIENT),
1636 pre_cache: Mutex::new(HashMap::new()),
1637 session_store: None,
1638 state_store: None,
1639 wasi_policy: Arc::new(RunnerWasiPolicy::new()),
1640 provider_registry: RwLock::new(None),
1641 secrets: crate::secrets::default_manager()?,
1642 oauth_config: None,
1643 cache,
1644 })
1645 }
1646}
1647
1648fn is_missing_node_export(err: &wasmtime::Error, version: &str) -> bool {
1649 let message = err.to_string();
1650 message.contains("no exported instance named")
1651 && message.contains(&format!("greentic:component/node@{version}"))
1652}
1653
1654struct PackFlows {
1655 descriptors: Vec<FlowDescriptor>,
1656 flows: HashMap<String, Flow>,
1657 metadata: PackMetadata,
1658}
1659
1660const RUNTIME_FLOW_EXTENSION_IDS: [&str; 3] = [
1661 "greentic.pack.runtime_flow",
1662 "greentic.pack.flow_runtime",
1663 "greentic.pack.runtime_flows",
1664];
1665
1666#[derive(Debug, Deserialize)]
1667struct RuntimeFlowBundle {
1668 flows: Vec<RuntimeFlow>,
1669}
1670
1671#[derive(Debug, Deserialize)]
1672struct RuntimeFlow {
1673 id: String,
1674 #[serde(alias = "flow_type")]
1675 kind: FlowKind,
1676 #[serde(default)]
1677 schema_version: Option<String>,
1678 #[serde(default)]
1679 start: Option<String>,
1680 #[serde(default)]
1681 entrypoints: BTreeMap<String, Value>,
1682 nodes: BTreeMap<String, RuntimeNode>,
1683 #[serde(default)]
1684 metadata: Option<FlowMetadata>,
1685}
1686
1687#[derive(Debug, Deserialize)]
1688struct RuntimeNode {
1689 #[serde(alias = "component")]
1690 component_id: String,
1691 #[serde(default, alias = "operation")]
1692 operation_name: Option<String>,
1693 #[serde(default, alias = "payload", alias = "input")]
1694 operation_payload: Value,
1695 #[serde(default)]
1696 routing: Option<Routing>,
1697 #[serde(default)]
1698 telemetry: Option<TelemetryHints>,
1699}
1700
1701fn deserialize_json_bytes(bytes: Vec<u8>) -> Result<Value> {
1702 if bytes.is_empty() {
1703 return Ok(Value::Null);
1704 }
1705 serde_json::from_slice(&bytes).or_else(|_| {
1706 String::from_utf8(bytes)
1707 .map(Value::String)
1708 .map_err(|err| anyhow!(err))
1709 })
1710}
1711
1712impl PackFlows {
1713 fn from_manifest(manifest: greentic_types::PackManifest) -> Self {
1714 if let Some(flows) = flows_from_runtime_extension(&manifest) {
1715 return flows;
1716 }
1717 let descriptors = manifest
1718 .flows
1719 .iter()
1720 .map(|entry| FlowDescriptor {
1721 id: entry.id.as_str().to_string(),
1722 flow_type: flow_kind_to_str(entry.kind).to_string(),
1723 pack_id: manifest.pack_id.as_str().to_string(),
1724 profile: manifest.pack_id.as_str().to_string(),
1725 version: manifest.version.to_string(),
1726 description: None,
1727 })
1728 .collect();
1729 let mut flows = HashMap::new();
1730 for entry in &manifest.flows {
1731 flows.insert(entry.id.as_str().to_string(), entry.flow.clone());
1732 }
1733 Self {
1734 metadata: PackMetadata::from_manifest(&manifest),
1735 descriptors,
1736 flows,
1737 }
1738 }
1739}
1740
1741fn flows_from_runtime_extension(manifest: &greentic_types::PackManifest) -> Option<PackFlows> {
1742 let extensions = manifest.extensions.as_ref()?;
1743 let extension = extensions.iter().find_map(|(key, ext)| {
1744 if RUNTIME_FLOW_EXTENSION_IDS
1745 .iter()
1746 .any(|candidate| candidate == key)
1747 {
1748 Some(ext)
1749 } else {
1750 None
1751 }
1752 })?;
1753 let runtime_flows = match decode_runtime_flow_extension(extension) {
1754 Some(flows) if !flows.is_empty() => flows,
1755 _ => return None,
1756 };
1757
1758 let descriptors = runtime_flows
1759 .iter()
1760 .map(|flow| FlowDescriptor {
1761 id: flow.id.as_str().to_string(),
1762 flow_type: flow_kind_to_str(flow.kind).to_string(),
1763 pack_id: manifest.pack_id.as_str().to_string(),
1764 profile: manifest.pack_id.as_str().to_string(),
1765 version: manifest.version.to_string(),
1766 description: None,
1767 })
1768 .collect::<Vec<_>>();
1769 let flows = runtime_flows
1770 .into_iter()
1771 .map(|flow| (flow.id.as_str().to_string(), flow))
1772 .collect();
1773
1774 Some(PackFlows {
1775 metadata: PackMetadata::from_manifest(manifest),
1776 descriptors,
1777 flows,
1778 })
1779}
1780
1781fn decode_runtime_flow_extension(extension: &ExtensionRef) -> Option<Vec<Flow>> {
1782 let value = match extension.inline.as_ref()? {
1783 ExtensionInline::Other(value) => value.clone(),
1784 _ => return None,
1785 };
1786
1787 if let Ok(bundle) = serde_json::from_value::<RuntimeFlowBundle>(value.clone()) {
1788 return Some(collect_runtime_flows(bundle.flows));
1789 }
1790
1791 if let Ok(flows) = serde_json::from_value::<Vec<RuntimeFlow>>(value.clone()) {
1792 return Some(collect_runtime_flows(flows));
1793 }
1794
1795 if let Ok(flows) = serde_json::from_value::<Vec<Flow>>(value) {
1796 return Some(flows);
1797 }
1798
1799 warn!(
1800 extension = %extension.kind,
1801 version = %extension.version,
1802 "runtime flow extension present but could not be decoded"
1803 );
1804 None
1805}
1806
1807fn collect_runtime_flows(flows: Vec<RuntimeFlow>) -> Vec<Flow> {
1808 flows
1809 .into_iter()
1810 .filter_map(|flow| match runtime_flow_to_flow(flow) {
1811 Ok(flow) => Some(flow),
1812 Err(err) => {
1813 warn!(error = %err, "failed to decode runtime flow");
1814 None
1815 }
1816 })
1817 .collect()
1818}
1819
1820fn runtime_flow_to_flow(runtime: RuntimeFlow) -> Result<Flow> {
1821 let flow_id = FlowId::from_str(&runtime.id)
1822 .with_context(|| format!("invalid flow id `{}`", runtime.id))?;
1823 let mut entrypoints = runtime.entrypoints;
1824 if entrypoints.is_empty()
1825 && let Some(start) = &runtime.start
1826 {
1827 entrypoints.insert("default".into(), Value::String(start.clone()));
1828 }
1829
1830 let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
1831 for (id, node) in runtime.nodes {
1832 let node_id = NodeId::from_str(&id).with_context(|| format!("invalid node id `{id}`"))?;
1833 let component_id = ComponentId::from_str(&node.component_id)
1834 .with_context(|| format!("invalid component id `{}`", node.component_id))?;
1835 let component = FlowComponentRef {
1836 id: component_id,
1837 pack_alias: None,
1838 operation: node.operation_name,
1839 };
1840 let routing = node.routing.unwrap_or(Routing::End);
1841 let telemetry = node.telemetry.unwrap_or_default();
1842 nodes.insert(
1843 node_id.clone(),
1844 Node {
1845 id: node_id,
1846 component,
1847 input: InputMapping {
1848 mapping: node.operation_payload,
1849 },
1850 output: OutputMapping {
1851 mapping: Value::Null,
1852 },
1853 routing,
1854 telemetry,
1855 },
1856 );
1857 }
1858
1859 Ok(Flow {
1860 schema_version: runtime.schema_version.unwrap_or_else(|| "1.0".to_string()),
1861 id: flow_id,
1862 kind: runtime.kind,
1863 entrypoints,
1864 nodes,
1865 metadata: runtime.metadata.unwrap_or_default(),
1866 })
1867}
1868
1869fn flow_kind_to_str(kind: greentic_types::FlowKind) -> &'static str {
1870 match kind {
1871 greentic_types::FlowKind::Messaging => "messaging",
1872 greentic_types::FlowKind::Event => "event",
1873 greentic_types::FlowKind::ComponentConfig => "component-config",
1874 greentic_types::FlowKind::Job => "job",
1875 greentic_types::FlowKind::Http => "http",
1876 }
1877}
1878
1879fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
1880 let mut file = archive
1881 .by_name(name)
1882 .with_context(|| format!("entry {name} missing from archive"))?;
1883 let mut buf = Vec::new();
1884 file.read_to_end(&mut buf)?;
1885 Ok(buf)
1886}
1887
1888fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
1889 for node in doc.nodes.values_mut() {
1890 let Some((component_ref, payload)) = node
1891 .raw
1892 .iter()
1893 .next()
1894 .map(|(key, value)| (key.clone(), value.clone()))
1895 else {
1896 continue;
1897 };
1898 if component_ref.starts_with("emit.") {
1899 node.operation = Some(component_ref);
1900 node.payload = payload;
1901 node.raw.clear();
1902 continue;
1903 }
1904 let (target_component, operation, input, config) =
1905 infer_component_exec(&payload, &component_ref);
1906 let mut payload_obj = serde_json::Map::new();
1907 payload_obj.insert("component".into(), Value::String(target_component));
1909 payload_obj.insert("operation".into(), Value::String(operation));
1910 payload_obj.insert("input".into(), input);
1911 if let Some(cfg) = config {
1912 payload_obj.insert("config".into(), cfg);
1913 }
1914 node.operation = Some("component.exec".to_string());
1915 node.payload = Value::Object(payload_obj);
1916 node.raw.clear();
1917 }
1918 doc
1919}
1920
1921fn infer_component_exec(
1922 payload: &Value,
1923 component_ref: &str,
1924) -> (String, String, Value, Option<Value>) {
1925 let default_op = if component_ref.starts_with("templating.") {
1926 "render"
1927 } else {
1928 "invoke"
1929 }
1930 .to_string();
1931
1932 if let Value::Object(map) = payload {
1933 let op = map
1934 .get("op")
1935 .or_else(|| map.get("operation"))
1936 .and_then(Value::as_str)
1937 .map(|s| s.to_string())
1938 .unwrap_or_else(|| default_op.clone());
1939
1940 let mut input = map.clone();
1941 let config = input.remove("config");
1942 let component = input
1943 .get("component")
1944 .or_else(|| input.get("component_ref"))
1945 .and_then(Value::as_str)
1946 .map(|s| s.to_string())
1947 .unwrap_or_else(|| component_ref.to_string());
1948 input.remove("component");
1949 input.remove("component_ref");
1950 input.remove("op");
1951 input.remove("operation");
1952 return (component, op, Value::Object(input), config);
1953 }
1954
1955 (component_ref.to_string(), default_op, payload.clone(), None)
1956}
1957
1958#[derive(Clone, Debug)]
1959struct ComponentSpec {
1960 id: String,
1961 version: String,
1962 legacy_path: Option<String>,
1963}
1964
1965#[derive(Clone, Debug)]
1966struct ComponentSourceInfo {
1967 digest: Option<String>,
1968 source: ComponentSourceRef,
1969 artifact: ComponentArtifactLocation,
1970 expected_wasm_sha256: Option<String>,
1971 skip_digest_verification: bool,
1972}
1973
1974#[derive(Clone, Debug)]
1975enum ComponentArtifactLocation {
1976 Inline { wasm_path: String },
1977 Remote,
1978}
1979
1980#[derive(Clone, Debug, Deserialize)]
1981struct PackLockV1 {
1982 schema_version: u32,
1983 components: Vec<PackLockComponent>,
1984}
1985
1986#[derive(Clone, Debug, Deserialize)]
1987struct PackLockComponent {
1988 name: String,
1989 #[serde(default, rename = "source_ref")]
1990 source_ref: Option<String>,
1991 #[serde(default, rename = "ref")]
1992 legacy_ref: Option<String>,
1993 #[serde(default)]
1994 component_id: Option<ComponentId>,
1995 #[serde(default)]
1996 bundled: Option<bool>,
1997 #[serde(default, rename = "bundled_path")]
1998 bundled_path: Option<String>,
1999 #[serde(default, rename = "path")]
2000 legacy_path: Option<String>,
2001 #[serde(default)]
2002 wasm_sha256: Option<String>,
2003 #[serde(default, rename = "sha256")]
2004 legacy_sha256: Option<String>,
2005 #[serde(default)]
2006 resolved_digest: Option<String>,
2007 #[serde(default)]
2008 digest: Option<String>,
2009}
2010
2011fn component_specs(
2012 manifest: Option<&greentic_types::PackManifest>,
2013 legacy_manifest: Option<&legacy_pack::PackManifest>,
2014 component_sources: Option<&ComponentSourcesV1>,
2015 pack_lock: Option<&PackLockV1>,
2016) -> Vec<ComponentSpec> {
2017 if let Some(manifest) = manifest {
2018 if !manifest.components.is_empty() {
2019 return manifest
2020 .components
2021 .iter()
2022 .map(|entry| ComponentSpec {
2023 id: entry.id.as_str().to_string(),
2024 version: entry.version.to_string(),
2025 legacy_path: None,
2026 })
2027 .collect();
2028 }
2029 if let Some(lock) = pack_lock {
2030 let mut seen = HashSet::new();
2031 let mut specs = Vec::new();
2032 for entry in &lock.components {
2033 let id = entry
2034 .component_id
2035 .as_ref()
2036 .map(|id| id.as_str())
2037 .unwrap_or(entry.name.as_str());
2038 if seen.insert(id.to_string()) {
2039 specs.push(ComponentSpec {
2040 id: id.to_string(),
2041 version: "0.0.0".to_string(),
2042 legacy_path: None,
2043 });
2044 }
2045 }
2046 return specs;
2047 }
2048 if let Some(sources) = component_sources {
2049 let mut seen = HashSet::new();
2050 let mut specs = Vec::new();
2051 for entry in &sources.components {
2052 let id = entry
2053 .component_id
2054 .as_ref()
2055 .map(|id| id.as_str())
2056 .unwrap_or(entry.name.as_str());
2057 if seen.insert(id.to_string()) {
2058 specs.push(ComponentSpec {
2059 id: id.to_string(),
2060 version: "0.0.0".to_string(),
2061 legacy_path: None,
2062 });
2063 }
2064 }
2065 return specs;
2066 }
2067 }
2068 if let Some(legacy_manifest) = legacy_manifest {
2069 return legacy_manifest
2070 .components
2071 .iter()
2072 .map(|entry| ComponentSpec {
2073 id: entry.name.clone(),
2074 version: entry.version.to_string(),
2075 legacy_path: Some(entry.file_wasm.clone()),
2076 })
2077 .collect();
2078 }
2079 Vec::new()
2080}
2081
2082fn component_sources_table(
2083 sources: Option<&ComponentSourcesV1>,
2084) -> Result<Option<HashMap<String, ComponentSourceInfo>>> {
2085 let Some(sources) = sources else {
2086 return Ok(None);
2087 };
2088 let mut table = HashMap::new();
2089 for entry in &sources.components {
2090 let artifact = match &entry.artifact {
2091 ArtifactLocationV1::Inline { wasm_path, .. } => ComponentArtifactLocation::Inline {
2092 wasm_path: wasm_path.clone(),
2093 },
2094 ArtifactLocationV1::Remote => ComponentArtifactLocation::Remote,
2095 };
2096 let info = ComponentSourceInfo {
2097 digest: Some(entry.resolved.digest.clone()),
2098 source: entry.source.clone(),
2099 artifact,
2100 expected_wasm_sha256: None,
2101 skip_digest_verification: false,
2102 };
2103 if let Some(component_id) = entry.component_id.as_ref() {
2104 table.insert(component_id.as_str().to_string(), info.clone());
2105 }
2106 table.insert(entry.name.clone(), info);
2107 }
2108 Ok(Some(table))
2109}
2110
2111fn load_pack_lock(path: &Path) -> Result<Option<PackLockV1>> {
2112 let lock_path = if path.is_dir() {
2113 let candidate = path.join("pack.lock");
2114 if candidate.exists() {
2115 Some(candidate)
2116 } else {
2117 let candidate = path.join("pack.lock.json");
2118 candidate.exists().then_some(candidate)
2119 }
2120 } else {
2121 None
2122 };
2123 let Some(lock_path) = lock_path else {
2124 return Ok(None);
2125 };
2126 let raw = std::fs::read_to_string(&lock_path)
2127 .with_context(|| format!("failed to read {}", lock_path.display()))?;
2128 let lock: PackLockV1 = serde_json::from_str(&raw).context("failed to parse pack.lock")?;
2129 if lock.schema_version != 1 {
2130 bail!("pack.lock schema_version must be 1");
2131 }
2132 Ok(Some(lock))
2133}
2134
2135fn find_pack_lock_roots(
2136 pack_path: &Path,
2137 is_dir: bool,
2138 archive_hint: Option<&Path>,
2139) -> Vec<PathBuf> {
2140 if is_dir {
2141 return vec![pack_path.to_path_buf()];
2142 }
2143 let mut roots = Vec::new();
2144 if let Some(archive_path) = archive_hint {
2145 if let Some(parent) = archive_path.parent() {
2146 roots.push(parent.to_path_buf());
2147 if let Some(grandparent) = parent.parent() {
2148 roots.push(grandparent.to_path_buf());
2149 }
2150 }
2151 } else if let Some(parent) = pack_path.parent() {
2152 roots.push(parent.to_path_buf());
2153 if let Some(grandparent) = parent.parent() {
2154 roots.push(grandparent.to_path_buf());
2155 }
2156 }
2157 roots
2158}
2159
2160fn normalize_sha256(digest: &str) -> Result<String> {
2161 let trimmed = digest.trim();
2162 if trimmed.is_empty() {
2163 bail!("sha256 digest cannot be empty");
2164 }
2165 if let Some(stripped) = trimmed.strip_prefix("sha256:") {
2166 if stripped.is_empty() {
2167 bail!("sha256 digest must include hex bytes after sha256:");
2168 }
2169 return Ok(trimmed.to_string());
2170 }
2171 if trimmed.chars().all(|c| c.is_ascii_hexdigit()) {
2172 return Ok(format!("sha256:{trimmed}"));
2173 }
2174 bail!("sha256 digest must be hex or sha256:<hex>");
2175}
2176
2177fn component_sources_table_from_pack_lock(
2178 lock: &PackLockV1,
2179 allow_missing_hash: bool,
2180) -> Result<HashMap<String, ComponentSourceInfo>> {
2181 let mut table = HashMap::new();
2182 let mut names = HashSet::new();
2183 for entry in &lock.components {
2184 if !names.insert(entry.name.clone()) {
2185 bail!(
2186 "pack.lock contains duplicate component name `{}`",
2187 entry.name
2188 );
2189 }
2190 let source_ref = match (&entry.source_ref, &entry.legacy_ref) {
2191 (Some(primary), Some(legacy)) => {
2192 if primary != legacy {
2193 bail!(
2194 "pack.lock component {} has conflicting refs: {} vs {}",
2195 entry.name,
2196 primary,
2197 legacy
2198 );
2199 }
2200 primary.as_str()
2201 }
2202 (Some(primary), None) => primary.as_str(),
2203 (None, Some(legacy)) => legacy.as_str(),
2204 (None, None) => {
2205 bail!("pack.lock component {} missing source_ref", entry.name);
2206 }
2207 };
2208 let source: ComponentSourceRef = source_ref
2209 .parse()
2210 .with_context(|| format!("invalid component ref `{}`", source_ref))?;
2211 let bundled_path = match (&entry.bundled_path, &entry.legacy_path) {
2212 (Some(primary), Some(legacy)) => {
2213 if primary != legacy {
2214 bail!(
2215 "pack.lock component {} has conflicting bundled paths: {} vs {}",
2216 entry.name,
2217 primary,
2218 legacy
2219 );
2220 }
2221 Some(primary.clone())
2222 }
2223 (Some(primary), None) => Some(primary.clone()),
2224 (None, Some(legacy)) => Some(legacy.clone()),
2225 (None, None) => None,
2226 };
2227 let bundled = entry.bundled.unwrap_or(false) || bundled_path.is_some();
2228 let (artifact, digest, expected_wasm_sha256, skip_digest_verification) = if bundled {
2229 let wasm_path = bundled_path.ok_or_else(|| {
2230 anyhow!(
2231 "pack.lock component {} marked bundled but bundled_path is missing",
2232 entry.name
2233 )
2234 })?;
2235 let expected_raw = match (&entry.wasm_sha256, &entry.legacy_sha256) {
2236 (Some(primary), Some(legacy)) => {
2237 if primary != legacy {
2238 bail!(
2239 "pack.lock component {} has conflicting wasm_sha256 values: {} vs {}",
2240 entry.name,
2241 primary,
2242 legacy
2243 );
2244 }
2245 Some(primary.as_str())
2246 }
2247 (Some(primary), None) => Some(primary.as_str()),
2248 (None, Some(legacy)) => Some(legacy.as_str()),
2249 (None, None) => None,
2250 };
2251 let expected = match expected_raw {
2252 Some(value) => Some(normalize_sha256(value)?),
2253 None => None,
2254 };
2255 if expected.is_none() && !allow_missing_hash {
2256 bail!(
2257 "pack.lock component {} missing wasm_sha256 for bundled component",
2258 entry.name
2259 );
2260 }
2261 (
2262 ComponentArtifactLocation::Inline { wasm_path },
2263 expected.clone(),
2264 expected,
2265 allow_missing_hash && expected_raw.is_none(),
2266 )
2267 } else {
2268 if source.is_tag() {
2269 bail!(
2270 "component {} uses tag ref {} but is not bundled; rebuild the pack",
2271 entry.name,
2272 source
2273 );
2274 }
2275 let expected = entry
2276 .resolved_digest
2277 .as_deref()
2278 .or(entry.digest.as_deref())
2279 .ok_or_else(|| {
2280 anyhow!(
2281 "pack.lock component {} missing resolved_digest for remote component",
2282 entry.name
2283 )
2284 })?;
2285 (
2286 ComponentArtifactLocation::Remote,
2287 Some(normalize_digest(expected)),
2288 None,
2289 false,
2290 )
2291 };
2292 let info = ComponentSourceInfo {
2293 digest,
2294 source,
2295 artifact,
2296 expected_wasm_sha256,
2297 skip_digest_verification,
2298 };
2299 if let Some(component_id) = entry.component_id.as_ref() {
2300 let key = component_id.as_str().to_string();
2301 if table.contains_key(&key) {
2302 bail!(
2303 "pack.lock contains duplicate component id `{}`",
2304 component_id.as_str()
2305 );
2306 }
2307 table.insert(key, info.clone());
2308 }
2309 if entry.name
2310 != entry
2311 .component_id
2312 .as_ref()
2313 .map(|id| id.as_str())
2314 .unwrap_or("")
2315 {
2316 table.insert(entry.name.clone(), info);
2317 }
2318 }
2319 Ok(table)
2320}
2321
2322fn component_path_for_spec(root: &Path, spec: &ComponentSpec) -> PathBuf {
2323 if let Some(path) = &spec.legacy_path {
2324 return root.join(path);
2325 }
2326 root.join("components").join(format!("{}.wasm", spec.id))
2327}
2328
2329fn normalize_digest(digest: &str) -> String {
2330 if digest.starts_with("sha256:") || digest.starts_with("blake3:") {
2331 digest.to_string()
2332 } else {
2333 format!("sha256:{digest}")
2334 }
2335}
2336
2337fn compute_digest_for(bytes: &[u8], digest: &str) -> Result<String> {
2338 if digest.starts_with("blake3:") {
2339 let hash = blake3::hash(bytes);
2340 return Ok(format!("blake3:{}", hash.to_hex()));
2341 }
2342 let mut hasher = sha2::Sha256::new();
2343 hasher.update(bytes);
2344 Ok(format!("sha256:{:x}", hasher.finalize()))
2345}
2346
2347fn compute_sha256_digest_for(bytes: &[u8]) -> String {
2348 let mut hasher = sha2::Sha256::new();
2349 hasher.update(bytes);
2350 format!("sha256:{:x}", hasher.finalize())
2351}
2352
2353fn build_artifact_key(cache: &CacheManager, digest: Option<&str>, bytes: &[u8]) -> ArtifactKey {
2354 let wasm_digest = digest
2355 .map(normalize_digest)
2356 .unwrap_or_else(|| compute_sha256_digest_for(bytes));
2357 ArtifactKey::new(cache.engine_profile_id().to_string(), wasm_digest)
2358}
2359
2360async fn compile_component_with_cache(
2361 cache: &CacheManager,
2362 engine: &Engine,
2363 digest: Option<&str>,
2364 bytes: Vec<u8>,
2365) -> Result<Arc<Component>> {
2366 let key = build_artifact_key(cache, digest, &bytes);
2367 cache.get_component(engine, &key, || Ok(bytes)).await
2368}
2369
2370fn verify_component_digest(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
2371 let normalized_expected = normalize_digest(expected);
2372 let actual = compute_digest_for(bytes, &normalized_expected)?;
2373 if normalize_digest(&actual) != normalized_expected {
2374 bail!(
2375 "component {component_id} digest mismatch: expected {normalized_expected}, got {actual}"
2376 );
2377 }
2378 Ok(())
2379}
2380
2381fn verify_wasm_sha256(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
2382 let normalized_expected = normalize_sha256(expected)?;
2383 let actual = compute_sha256_digest_for(bytes);
2384 if actual != normalized_expected {
2385 bail!(
2386 "component {component_id} bundled digest mismatch: expected {normalized_expected}, got {actual}"
2387 );
2388 }
2389 Ok(())
2390}
2391
2392#[cfg(test)]
2393mod pack_lock_tests {
2394 use super::*;
2395 use tempfile::TempDir;
2396
2397 #[test]
2398 fn pack_lock_tag_ref_requires_bundle() {
2399 let lock = PackLockV1 {
2400 schema_version: 1,
2401 components: vec![PackLockComponent {
2402 name: "templates".to_string(),
2403 source_ref: Some("oci://registry.test/templates:latest".to_string()),
2404 legacy_ref: None,
2405 component_id: None,
2406 bundled: Some(false),
2407 bundled_path: None,
2408 legacy_path: None,
2409 wasm_sha256: None,
2410 legacy_sha256: None,
2411 resolved_digest: None,
2412 digest: None,
2413 }],
2414 };
2415 let err = component_sources_table_from_pack_lock(&lock, false).unwrap_err();
2416 assert!(
2417 err.to_string().contains("tag ref") && err.to_string().contains("rebuild the pack"),
2418 "unexpected error: {err}"
2419 );
2420 }
2421
2422 #[test]
2423 fn bundled_hash_mismatch_errors() {
2424 let rt = tokio::runtime::Runtime::new().expect("runtime");
2425 let temp = TempDir::new().expect("temp dir");
2426 let engine = Engine::default();
2427 let engine_profile =
2428 EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
2429 let cache_config = CacheConfig {
2430 root: temp.path().join("cache"),
2431 ..CacheConfig::default()
2432 };
2433 let cache = CacheManager::new(cache_config, engine_profile);
2434 let wasm_path = temp.path().join("component.wasm");
2435 let fixture_wasm = Path::new(env!("CARGO_MANIFEST_DIR"))
2436 .join("../../tests/fixtures/packs/secrets_store_smoke/components/echo_secret.wasm");
2437 let bytes = std::fs::read(&fixture_wasm).expect("read fixture wasm");
2438 std::fs::write(&wasm_path, &bytes).expect("write temp wasm");
2439
2440 let spec = ComponentSpec {
2441 id: "qa.process".to_string(),
2442 version: "0.0.0".to_string(),
2443 legacy_path: None,
2444 };
2445 let mut missing = HashSet::new();
2446 missing.insert(spec.id.clone());
2447
2448 let mut sources = HashMap::new();
2449 sources.insert(
2450 spec.id.clone(),
2451 ComponentSourceInfo {
2452 digest: Some("sha256:deadbeef".to_string()),
2453 source: ComponentSourceRef::Oci("registry.test/qa.process@sha256:deadbeef".into()),
2454 artifact: ComponentArtifactLocation::Inline {
2455 wasm_path: "component.wasm".to_string(),
2456 },
2457 expected_wasm_sha256: Some("sha256:deadbeef".to_string()),
2458 skip_digest_verification: false,
2459 },
2460 );
2461
2462 let mut loaded = HashMap::new();
2463 let result = rt.block_on(load_components_from_sources(
2464 &cache,
2465 &engine,
2466 &sources,
2467 &ComponentResolution::default(),
2468 &[spec],
2469 &mut missing,
2470 &mut loaded,
2471 Some(temp.path()),
2472 None,
2473 ));
2474 let err = result.unwrap_err();
2475 assert!(
2476 err.to_string().contains("bundled digest mismatch"),
2477 "unexpected error: {err}"
2478 );
2479 }
2480}
2481
2482fn dist_options_from(component_resolution: &ComponentResolution) -> DistOptions {
2483 let mut opts = DistOptions {
2484 allow_tags: true,
2485 ..DistOptions::default()
2486 };
2487 if let Some(cache_dir) = component_resolution.dist_cache_dir.clone() {
2488 opts.cache_dir = cache_dir;
2489 }
2490 if component_resolution.dist_offline {
2491 opts.offline = true;
2492 }
2493 opts
2494}
2495
2496#[allow(clippy::too_many_arguments)]
2497async fn load_components_from_sources(
2498 cache: &CacheManager,
2499 engine: &Engine,
2500 component_sources: &HashMap<String, ComponentSourceInfo>,
2501 component_resolution: &ComponentResolution,
2502 specs: &[ComponentSpec],
2503 missing: &mut HashSet<String>,
2504 into: &mut HashMap<String, PackComponent>,
2505 materialized_root: Option<&Path>,
2506 archive_hint: Option<&Path>,
2507) -> Result<()> {
2508 let mut archive = if let Some(path) = archive_hint {
2509 Some(
2510 ZipArchive::new(File::open(path)?)
2511 .with_context(|| format!("{} is not a valid gtpack", path.display()))?,
2512 )
2513 } else {
2514 None
2515 };
2516 let mut dist_client: Option<DistClient> = None;
2517
2518 for spec in specs {
2519 if !missing.contains(&spec.id) {
2520 continue;
2521 }
2522 let Some(source) = component_sources.get(&spec.id) else {
2523 continue;
2524 };
2525
2526 let bytes = match &source.artifact {
2527 ComponentArtifactLocation::Inline { wasm_path } => {
2528 if let Some(root) = materialized_root {
2529 let path = root.join(wasm_path);
2530 if path.exists() {
2531 std::fs::read(&path).with_context(|| {
2532 format!(
2533 "failed to read inline component {} from {}",
2534 spec.id,
2535 path.display()
2536 )
2537 })?
2538 } else if archive.is_none() {
2539 bail!("inline component {} missing at {}", spec.id, path.display());
2540 } else {
2541 read_entry(
2542 archive.as_mut().expect("archive present when needed"),
2543 wasm_path,
2544 )
2545 .with_context(|| {
2546 format!(
2547 "inline component {} missing at {} in pack archive",
2548 spec.id, wasm_path
2549 )
2550 })?
2551 }
2552 } else if let Some(archive) = archive.as_mut() {
2553 read_entry(archive, wasm_path).with_context(|| {
2554 format!(
2555 "inline component {} missing at {} in pack archive",
2556 spec.id, wasm_path
2557 )
2558 })?
2559 } else {
2560 bail!(
2561 "inline component {} missing and no pack source available",
2562 spec.id
2563 );
2564 }
2565 }
2566 ComponentArtifactLocation::Remote => {
2567 if source.source.is_tag() {
2568 bail!(
2569 "component {} uses tag ref {} but is not bundled; rebuild the pack",
2570 spec.id,
2571 source.source
2572 );
2573 }
2574 let client = dist_client.get_or_insert_with(|| {
2575 DistClient::new(dist_options_from(component_resolution))
2576 });
2577 let reference = source.source.to_string();
2578 let digest = source.digest.as_deref().ok_or_else(|| {
2579 anyhow!(
2580 "component {} missing expected digest for remote component",
2581 spec.id
2582 )
2583 })?;
2584 let cache_path = if component_resolution.dist_offline {
2585 client
2586 .fetch_digest(digest)
2587 .await
2588 .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?
2589 } else {
2590 let resolved = client
2591 .resolve_ref(&reference)
2592 .await
2593 .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?;
2594 let expected = normalize_digest(digest);
2595 let actual = normalize_digest(&resolved.digest);
2596 if expected != actual {
2597 bail!(
2598 "component {} digest mismatch after fetch: expected {}, got {}",
2599 spec.id,
2600 expected,
2601 actual
2602 );
2603 }
2604 resolved.cache_path.ok_or_else(|| {
2605 anyhow!(
2606 "component {} resolved from {} but cache path is missing",
2607 spec.id,
2608 reference
2609 )
2610 })?
2611 };
2612 std::fs::read(&cache_path).with_context(|| {
2613 format!(
2614 "failed to read cached component {} from {}",
2615 spec.id,
2616 cache_path.display()
2617 )
2618 })?
2619 }
2620 };
2621
2622 if let Some(expected) = source.expected_wasm_sha256.as_deref() {
2623 verify_wasm_sha256(&spec.id, expected, &bytes)?;
2624 } else if source.skip_digest_verification {
2625 let actual = compute_sha256_digest_for(&bytes);
2626 warn!(
2627 component_id = %spec.id,
2628 digest = %actual,
2629 "bundled component missing wasm_sha256; allowing due to flag"
2630 );
2631 } else {
2632 let expected = source.digest.as_deref().ok_or_else(|| {
2633 anyhow!(
2634 "component {} missing expected digest for verification",
2635 spec.id
2636 )
2637 })?;
2638 verify_component_digest(&spec.id, expected, &bytes)?;
2639 }
2640 let component =
2641 compile_component_with_cache(cache, engine, source.digest.as_deref(), bytes)
2642 .await
2643 .with_context(|| format!("failed to compile component {}", spec.id))?;
2644 into.insert(
2645 spec.id.clone(),
2646 PackComponent {
2647 name: spec.id.clone(),
2648 version: spec.version.clone(),
2649 component,
2650 },
2651 );
2652 missing.remove(&spec.id);
2653 }
2654
2655 Ok(())
2656}
2657
2658fn dist_error_for_component(err: DistError, component_id: &str, reference: &str) -> anyhow::Error {
2659 match err {
2660 DistError::CacheMiss { reference: missing } => anyhow!(
2661 "remote component {} is not cached for {}. Run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
2662 component_id,
2663 missing,
2664 reference
2665 ),
2666 DistError::Offline { reference: blocked } => anyhow!(
2667 "offline mode blocked fetching component {} from {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
2668 component_id,
2669 blocked,
2670 reference
2671 ),
2672 DistError::AuthRequired { target } => anyhow!(
2673 "component {} requires authenticated source {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
2674 component_id,
2675 target,
2676 reference
2677 ),
2678 other => anyhow!(
2679 "failed to resolve component {} from {}: {}",
2680 component_id,
2681 reference,
2682 other
2683 ),
2684 }
2685}
2686
2687async fn load_components_from_overrides(
2688 cache: &CacheManager,
2689 engine: &Engine,
2690 overrides: &HashMap<String, PathBuf>,
2691 specs: &[ComponentSpec],
2692 missing: &mut HashSet<String>,
2693 into: &mut HashMap<String, PackComponent>,
2694) -> Result<()> {
2695 for spec in specs {
2696 if !missing.contains(&spec.id) {
2697 continue;
2698 }
2699 let Some(path) = overrides.get(&spec.id) else {
2700 continue;
2701 };
2702 let bytes = std::fs::read(path)
2703 .with_context(|| format!("failed to read override component {}", path.display()))?;
2704 let component = compile_component_with_cache(cache, engine, None, bytes)
2705 .await
2706 .with_context(|| {
2707 format!(
2708 "failed to compile component {} from override {}",
2709 spec.id,
2710 path.display()
2711 )
2712 })?;
2713 into.insert(
2714 spec.id.clone(),
2715 PackComponent {
2716 name: spec.id.clone(),
2717 version: spec.version.clone(),
2718 component,
2719 },
2720 );
2721 missing.remove(&spec.id);
2722 }
2723 Ok(())
2724}
2725
2726async fn load_components_from_dir(
2727 cache: &CacheManager,
2728 engine: &Engine,
2729 root: &Path,
2730 specs: &[ComponentSpec],
2731 missing: &mut HashSet<String>,
2732 into: &mut HashMap<String, PackComponent>,
2733) -> Result<()> {
2734 for spec in specs {
2735 if !missing.contains(&spec.id) {
2736 continue;
2737 }
2738 let path = component_path_for_spec(root, spec);
2739 if !path.exists() {
2740 tracing::debug!(component = %spec.id, path = %path.display(), "materialized component missing; will try other sources");
2741 continue;
2742 }
2743 let bytes = std::fs::read(&path)
2744 .with_context(|| format!("failed to read component {}", path.display()))?;
2745 let component = compile_component_with_cache(cache, engine, None, bytes)
2746 .await
2747 .with_context(|| {
2748 format!(
2749 "failed to compile component {} from {}",
2750 spec.id,
2751 path.display()
2752 )
2753 })?;
2754 into.insert(
2755 spec.id.clone(),
2756 PackComponent {
2757 name: spec.id.clone(),
2758 version: spec.version.clone(),
2759 component,
2760 },
2761 );
2762 missing.remove(&spec.id);
2763 }
2764 Ok(())
2765}
2766
2767async fn load_components_from_archive(
2768 cache: &CacheManager,
2769 engine: &Engine,
2770 path: &Path,
2771 specs: &[ComponentSpec],
2772 missing: &mut HashSet<String>,
2773 into: &mut HashMap<String, PackComponent>,
2774) -> Result<()> {
2775 let mut archive = ZipArchive::new(File::open(path)?)
2776 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
2777 for spec in specs {
2778 if !missing.contains(&spec.id) {
2779 continue;
2780 }
2781 let file_name = spec
2782 .legacy_path
2783 .clone()
2784 .unwrap_or_else(|| format!("components/{}.wasm", spec.id));
2785 let bytes = match read_entry(&mut archive, &file_name) {
2786 Ok(bytes) => bytes,
2787 Err(err) => {
2788 warn!(component = %spec.id, pack = %path.display(), error = %err, "component entry missing in pack archive");
2789 continue;
2790 }
2791 };
2792 let component = compile_component_with_cache(cache, engine, None, bytes)
2793 .await
2794 .with_context(|| format!("failed to compile component {}", spec.id))?;
2795 into.insert(
2796 spec.id.clone(),
2797 PackComponent {
2798 name: spec.id.clone(),
2799 version: spec.version.clone(),
2800 component,
2801 },
2802 );
2803 missing.remove(&spec.id);
2804 }
2805 Ok(())
2806}
2807
2808#[cfg(test)]
2809mod tests {
2810 use super::*;
2811 use greentic_flow::model::{FlowDoc, NodeDoc};
2812 use indexmap::IndexMap;
2813 use serde_json::json;
2814
2815 #[test]
2816 fn normalizes_raw_component_to_component_exec() {
2817 let mut nodes = IndexMap::new();
2818 let mut raw = IndexMap::new();
2819 raw.insert(
2820 "templating.handlebars".into(),
2821 json!({ "template": "Hi {{name}}" }),
2822 );
2823 nodes.insert(
2824 "start".into(),
2825 NodeDoc {
2826 raw,
2827 routing: json!([{"out": true}]),
2828 ..Default::default()
2829 },
2830 );
2831 let doc = FlowDoc {
2832 id: "welcome".into(),
2833 title: None,
2834 description: None,
2835 flow_type: "messaging".into(),
2836 start: Some("start".into()),
2837 parameters: json!({}),
2838 tags: Vec::new(),
2839 schema_version: None,
2840 entrypoints: IndexMap::new(),
2841 nodes,
2842 };
2843
2844 let normalized = normalize_flow_doc(doc);
2845 let node = normalized.nodes.get("start").expect("node exists");
2846 assert_eq!(node.operation.as_deref(), Some("component.exec"));
2847 assert!(node.raw.is_empty());
2848 let payload = node.payload.as_object().expect("payload object");
2849 assert_eq!(
2850 payload.get("component"),
2851 Some(&Value::String("templating.handlebars".into()))
2852 );
2853 assert_eq!(
2854 payload.get("operation"),
2855 Some(&Value::String("render".into()))
2856 );
2857 let input = payload.get("input").unwrap();
2858 assert_eq!(input, &json!({ "template": "Hi {{name}}" }));
2859 }
2860}
2861
2862#[derive(Clone, Debug, Default, Serialize, Deserialize)]
2863pub struct PackMetadata {
2864 pub pack_id: String,
2865 pub version: String,
2866 #[serde(default)]
2867 pub entry_flows: Vec<String>,
2868 #[serde(default)]
2869 pub secret_requirements: Vec<greentic_types::SecretRequirement>,
2870}
2871
2872impl PackMetadata {
2873 fn from_wasm(bytes: &[u8]) -> Option<Self> {
2874 let parser = Parser::new(0);
2875 for payload in parser.parse_all(bytes) {
2876 let payload = payload.ok()?;
2877 match payload {
2878 Payload::CustomSection(section) => {
2879 if section.name() == "greentic.manifest"
2880 && let Ok(meta) = Self::from_bytes(section.data())
2881 {
2882 return Some(meta);
2883 }
2884 }
2885 Payload::DataSection(reader) => {
2886 for segment in reader.into_iter().flatten() {
2887 if let Ok(meta) = Self::from_bytes(segment.data) {
2888 return Some(meta);
2889 }
2890 }
2891 }
2892 _ => {}
2893 }
2894 }
2895 None
2896 }
2897
2898 fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
2899 #[derive(Deserialize)]
2900 struct RawManifest {
2901 pack_id: String,
2902 version: String,
2903 #[serde(default)]
2904 entry_flows: Vec<String>,
2905 #[serde(default)]
2906 flows: Vec<RawFlow>,
2907 #[serde(default)]
2908 secret_requirements: Vec<greentic_types::SecretRequirement>,
2909 }
2910
2911 #[derive(Deserialize)]
2912 struct RawFlow {
2913 id: String,
2914 }
2915
2916 let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
2917 let mut entry_flows = if manifest.entry_flows.is_empty() {
2918 manifest.flows.iter().map(|f| f.id.clone()).collect()
2919 } else {
2920 manifest.entry_flows.clone()
2921 };
2922 entry_flows.retain(|id| !id.is_empty());
2923 Ok(Self {
2924 pack_id: manifest.pack_id,
2925 version: manifest.version,
2926 entry_flows,
2927 secret_requirements: manifest.secret_requirements,
2928 })
2929 }
2930
2931 pub fn fallback(path: &Path) -> Self {
2932 let pack_id = path
2933 .file_stem()
2934 .map(|s| s.to_string_lossy().into_owned())
2935 .unwrap_or_else(|| "unknown-pack".to_string());
2936 Self {
2937 pack_id,
2938 version: "0.0.0".to_string(),
2939 entry_flows: Vec::new(),
2940 secret_requirements: Vec::new(),
2941 }
2942 }
2943
2944 pub fn from_manifest(manifest: &greentic_types::PackManifest) -> Self {
2945 let entry_flows = manifest
2946 .flows
2947 .iter()
2948 .map(|flow| flow.id.as_str().to_string())
2949 .collect::<Vec<_>>();
2950 Self {
2951 pack_id: manifest.pack_id.as_str().to_string(),
2952 version: manifest.version.to_string(),
2953 entry_flows,
2954 secret_requirements: manifest.secret_requirements.clone(),
2955 }
2956 }
2957}