1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::cache::{ArtifactKey, CacheConfig, CacheManager, CpuPolicy, EngineProfile};
10use crate::component_api::{
11 self, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
12};
13use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
14use crate::provider::{ProviderBinding, ProviderRegistry};
15use crate::provider_core::SchemaCorePre as ProviderComponentPre;
16use crate::provider_core_only;
17use crate::runtime_wasmtime::{Component, Engine, InstancePre, Linker, ResourceTable};
18use anyhow::{Context, Result, anyhow, bail};
19use greentic_distributor_client::dist::{DistClient, DistError, DistOptions};
20use greentic_interfaces_wasmtime::host_helpers::v1::{
21 self as host_v1, HostFns, add_all_v1_to_linker,
22 runner_host_http::RunnerHostHttp,
23 runner_host_kv::RunnerHostKv,
24 secrets_store::{SecretsError, SecretsStoreHost},
25 state_store::{
26 OpAck as StateOpAck, StateKey as HostStateKey, StateStoreError as StateError,
27 StateStoreHost, TenantCtx as StateTenantCtx,
28 },
29 telemetry_logger::{
30 OpAck as TelemetryAck, SpanContext as TelemetrySpanContext,
31 TelemetryLoggerError as TelemetryError, TelemetryLoggerHost,
32 TenantCtx as TelemetryTenantCtx,
33 },
34};
35use greentic_interfaces_wasmtime::http_client_client_v1_0::greentic::interfaces_types::types::Impersonation as ImpersonationV1_0;
36use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::interfaces_types::types::Impersonation as ImpersonationV1_1;
37use greentic_pack::builder as legacy_pack;
38use greentic_types::flow::FlowHasher;
39use greentic_types::{
40 ArtifactLocationV1, ComponentId, ComponentManifest, ComponentSourceRef, ComponentSourcesV1,
41 EXT_COMPONENT_SOURCES_V1, EnvId, ExtensionRef, Flow, FlowComponentRef, FlowId, FlowKind,
42 FlowMetadata, InputMapping, Node, NodeId, OutputMapping, Routing, StateKey as StoreStateKey,
43 TeamId, TelemetryHints, TenantCtx as TypesTenantCtx, TenantId, UserId, decode_pack_manifest,
44 pack_manifest::ExtensionInline,
45};
46use host_v1::http_client::{
47 HttpClientError, HttpClientErrorV1_1, HttpClientHost, HttpClientHostV1_1,
48 Request as HttpRequest, RequestOptionsV1_1 as HttpRequestOptionsV1_1,
49 RequestV1_1 as HttpRequestV1_1, Response as HttpResponse, ResponseV1_1 as HttpResponseV1_1,
50 TenantCtx as HttpTenantCtx, TenantCtxV1_1 as HttpTenantCtxV1_1,
51};
52use indexmap::IndexMap;
53use once_cell::sync::Lazy;
54use parking_lot::{Mutex, RwLock};
55use reqwest::blocking::Client as BlockingClient;
56use runner_core::normalize_under_root;
57use serde::{Deserialize, Serialize};
58use serde_cbor;
59use serde_json::{self, Value};
60use sha2::Digest;
61use tokio::fs;
62use wasmparser::{Parser, Payload};
63use wasmtime::StoreContextMut;
64use zip::ZipArchive;
65
66use crate::runner::engine::{FlowContext, FlowEngine, FlowStatus};
67use crate::runner::flow_adapter::{FlowIR, flow_doc_to_ir, flow_ir_to_flow};
68use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
69
70use crate::config::HostConfig;
71use crate::secrets::{DynSecretsManager, read_secret_blocking};
72use crate::storage::state::STATE_PREFIX;
73use crate::storage::{DynSessionStore, DynStateStore};
74use crate::verify;
75use crate::wasi::RunnerWasiPolicy;
76use tracing::warn;
77use wasmtime_wasi::p2::add_to_linker_sync as add_wasi_to_linker;
78use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
79
80use greentic_flow::model::FlowDoc;
81
/// Runtime state for one loaded pack: its manifests and flows, compiled
/// components, and the host services made available to those components.
#[allow(dead_code)]
pub struct PackRuntime {
    /// Filesystem path associated with the pack.
    /// NOTE(review): presumably the normalized path passed to `load` — confirm.
    path: PathBuf,
    /// Optional path to the pack archive.
    /// NOTE(review): presumably the archive the manifest was read from — confirm.
    archive_path: Option<PathBuf>,
    /// Shared host configuration (policies, tenant, feature switches).
    config: Arc<HostConfig>,
    /// Wasmtime engine used by this runtime.
    engine: Engine,
    /// Pack metadata (taken from the manifest when one was parsed).
    metadata: PackMetadata,
    /// Current-format manifest, when the pack provided one.
    manifest: Option<greentic_types::PackManifest>,
    /// Legacy-format manifest, when the pack only decoded as legacy.
    legacy_manifest: Option<Box<legacy_pack::PackManifest>>,
    /// Component manifests keyed by component reference (see `allows_state_store`).
    component_manifests: HashMap<String, ComponentManifest>,
    /// Optional mock layer.
    mocks: Option<Arc<MockLayer>>,
    /// Flows loaded from the manifest, if a manifest could be parsed.
    flows: Option<PackFlows>,
    /// Compiled components keyed by name.
    /// NOTE(review): exact key semantics are not visible here — confirm.
    components: HashMap<String, PackComponent>,
    /// Blocking HTTP client shared with host states.
    http_client: Arc<BlockingClient>,
    /// Cache of pre-instantiated components.
    /// NOTE(review): key is presumably the component reference — confirm.
    pre_cache: Mutex<HashMap<String, InstancePre<ComponentState>>>,
    /// Optional session store.
    session_store: Option<DynSessionStore>,
    /// Optional state store; state access is refused when this is `None`.
    state_store: Option<DynStateStore>,
    /// WASI policy used to build per-component WASI contexts.
    wasi_policy: Arc<RunnerWasiPolicy>,
    /// Lazily populated provider registry.
    provider_registry: RwLock<Option<ProviderRegistry>>,
    /// Secrets manager backing secret lookups.
    secrets: DynSecretsManager,
    /// Optional OAuth broker configuration.
    oauth_config: Option<OAuthBrokerConfig>,
    /// Artifact cache manager.
    cache: CacheManager,
}
107
/// A compiled component belonging to a pack, with its identifying name and version.
struct PackComponent {
    /// Component name (currently unused, hence the lint allowance).
    #[allow(dead_code)]
    name: String,
    /// Component version string (currently unused, hence the lint allowance).
    #[allow(dead_code)]
    version: String,
    /// Shared handle to the compiled component.
    component: Arc<Component>,
}
115
/// Options controlling how a pack's components are located and resolved.
#[derive(Debug, Default, Clone)]
pub struct ComponentResolution {
    /// Directory holding an already-materialized pack; when set, the manifest
    /// is read from this directory first.
    pub materialized_root: Option<PathBuf>,
    /// Per-component path overrides.
    /// NOTE(review): presumably keyed by component id/name — confirm against the resolver.
    pub overrides: HashMap<String, PathBuf>,
    /// NOTE(review): presumably disables network access in the distributor client — confirm.
    pub dist_offline: bool,
    /// NOTE(review): presumably the distributor client's cache directory — confirm.
    pub dist_cache_dir: Option<PathBuf>,
    /// Passed through when building the component-sources table from a pack lock;
    /// NOTE(review): presumably tolerates lock entries without a content hash — confirm.
    pub allow_missing_hash: bool,
}
129
130fn build_blocking_client() -> BlockingClient {
131 std::thread::spawn(|| {
132 BlockingClient::builder()
133 .no_proxy()
134 .build()
135 .expect("blocking client")
136 })
137 .join()
138 .expect("client build thread panicked")
139}
140
141fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
142 let (root, candidate) = if path.is_absolute() {
143 let parent = path
144 .parent()
145 .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
146 let root = parent
147 .canonicalize()
148 .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
149 let file = path
150 .file_name()
151 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
152 (root, PathBuf::from(file))
153 } else {
154 let cwd = std::env::current_dir().context("failed to resolve current directory")?;
155 let base = if let Some(parent) = path.parent() {
156 cwd.join(parent)
157 } else {
158 cwd
159 };
160 let root = base
161 .canonicalize()
162 .with_context(|| format!("failed to canonicalize {}", base.display()))?;
163 let file = path
164 .file_name()
165 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
166 (root, PathBuf::from(file))
167 };
168 let safe = normalize_under_root(&root, &candidate)?;
169 Ok((root, safe))
170}
171
/// Process-wide blocking HTTP client, built lazily on first access via
/// `build_blocking_client` and shared through an `Arc`.
static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
173
/// Serializable summary of a flow contained in a pack.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowDescriptor {
    /// Flow identifier.
    pub id: String,
    /// Flow kind; serialized under the key `type`.
    #[serde(rename = "type")]
    pub flow_type: String,
    /// Profile string (legacy loaders fill this with the pack id).
    pub profile: String,
    /// Version string (legacy loaders fill this with the pack version).
    pub version: String,
    /// Optional human-readable description; defaults to `None` when absent.
    #[serde(default)]
    pub description: Option<String>,
}
184
/// Host-side state exposed to a component: configuration, HTTP client, stores,
/// secrets and OAuth plumbing used by the host interface implementations.
pub struct HostState {
    /// Shared host configuration (HTTP switch, secrets policy, tenant, ...).
    config: Arc<HostConfig>,
    /// Blocking HTTP client used for outbound requests.
    http_client: Arc<BlockingClient>,
    /// Environment name used when a call supplies none; read from
    /// `GREENTIC_ENV` at construction, falling back to `local`.
    default_env: String,
    /// Optional session store (currently unused, hence the lint allowance).
    #[allow(dead_code)]
    session_store: Option<DynSessionStore>,
    /// Optional state store backing the state-store host interface.
    state_store: Option<DynStateStore>,
    /// Optional mock layer consulted for secrets, HTTP and telemetry.
    mocks: Option<Arc<MockLayer>>,
    /// Secrets manager used for real secret lookups.
    secrets: DynSecretsManager,
    /// Optional OAuth broker configuration.
    oauth_config: Option<OAuthBrokerConfig>,
    /// OAuth broker host state.
    oauth_host: OAuthBrokerHost,
    /// Optional execution context used to seed tenant contexts.
    exec_ctx: Option<ComponentExecCtx>,
}
198
199impl HostState {
200 #[allow(clippy::default_constructed_unit_structs)]
201 #[allow(clippy::too_many_arguments)]
202 pub fn new(
203 config: Arc<HostConfig>,
204 http_client: Arc<BlockingClient>,
205 mocks: Option<Arc<MockLayer>>,
206 session_store: Option<DynSessionStore>,
207 state_store: Option<DynStateStore>,
208 secrets: DynSecretsManager,
209 oauth_config: Option<OAuthBrokerConfig>,
210 exec_ctx: Option<ComponentExecCtx>,
211 ) -> Result<Self> {
212 let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
213 Ok(Self {
214 config,
215 http_client,
216 default_env,
217 session_store,
218 state_store,
219 mocks,
220 secrets,
221 oauth_config,
222 oauth_host: OAuthBrokerHost::default(),
223 exec_ctx,
224 })
225 }
226
227 pub fn get_secret(&self, key: &str) -> Result<String> {
228 if provider_core_only::is_enabled() {
229 bail!(provider_core_only::blocked_message("secrets"))
230 }
231 if !self.config.secrets_policy.is_allowed(key) {
232 bail!("secret {key} is not permitted by bindings policy");
233 }
234 if let Some(mock) = &self.mocks
235 && let Some(value) = mock.secrets_lookup(key)
236 {
237 return Ok(value);
238 }
239 let bytes = read_secret_blocking(&self.secrets, key)
240 .context("failed to read secret from manager")?;
241 let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
242 Ok(value)
243 }
244
245 fn tenant_ctx_from_v1(&self, ctx: Option<StateTenantCtx>) -> Result<TypesTenantCtx> {
246 let tenant_raw = ctx
247 .as_ref()
248 .map(|ctx| ctx.tenant.clone())
249 .or_else(|| self.exec_ctx.as_ref().map(|ctx| ctx.tenant.tenant.clone()))
250 .unwrap_or_else(|| self.config.tenant.clone());
251 let env_raw = ctx
252 .as_ref()
253 .map(|ctx| ctx.env.clone())
254 .unwrap_or_else(|| self.default_env.clone());
255 let tenant_id = TenantId::from_str(&tenant_raw)
256 .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
257 let env_id = EnvId::from_str(&env_raw)
258 .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
259 let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
260 if let Some(exec_ctx) = self.exec_ctx.as_ref() {
261 if let Some(team) = exec_ctx.tenant.team.as_ref() {
262 let team_id =
263 TeamId::from_str(team).with_context(|| format!("invalid team id `{team}`"))?;
264 tenant_ctx = tenant_ctx.with_team(Some(team_id));
265 }
266 if let Some(user) = exec_ctx.tenant.user.as_ref() {
267 let user_id =
268 UserId::from_str(user).with_context(|| format!("invalid user id `{user}`"))?;
269 tenant_ctx = tenant_ctx.with_user(Some(user_id));
270 }
271 tenant_ctx = tenant_ctx.with_flow(exec_ctx.flow_id.clone());
272 if let Some(node) = exec_ctx.node_id.as_ref() {
273 tenant_ctx = tenant_ctx.with_node(node.clone());
274 }
275 if let Some(session) = exec_ctx.tenant.correlation_id.as_ref() {
276 tenant_ctx = tenant_ctx.with_session(session.clone());
277 }
278 tenant_ctx.trace_id = exec_ctx.tenant.trace_id.clone();
279 }
280
281 if let Some(ctx) = ctx {
282 if let Some(team) = ctx.team.or(ctx.team_id) {
283 let team_id =
284 TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
285 tenant_ctx = tenant_ctx.with_team(Some(team_id));
286 }
287 if let Some(user) = ctx.user.or(ctx.user_id) {
288 let user_id =
289 UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
290 tenant_ctx = tenant_ctx.with_user(Some(user_id));
291 }
292 if let Some(flow) = ctx.flow_id {
293 tenant_ctx = tenant_ctx.with_flow(flow);
294 }
295 if let Some(node) = ctx.node_id {
296 tenant_ctx = tenant_ctx.with_node(node);
297 }
298 if let Some(provider) = ctx.provider_id {
299 tenant_ctx = tenant_ctx.with_provider(provider);
300 }
301 if let Some(session) = ctx.session_id {
302 tenant_ctx = tenant_ctx.with_session(session);
303 }
304 tenant_ctx.trace_id = ctx.trace_id;
305 }
306 Ok(tenant_ctx)
307 }
308
309 fn send_http_request(
310 &mut self,
311 req: HttpRequest,
312 opts: Option<HttpRequestOptionsV1_1>,
313 _ctx: Option<HttpTenantCtx>,
314 ) -> Result<HttpResponse, HttpClientError> {
315 if !self.config.http_enabled {
316 return Err(HttpClientError {
317 code: "denied".into(),
318 message: "http client disabled by policy".into(),
319 });
320 }
321
322 let mut mock_state = None;
323 let raw_body = req.body.clone();
324 if let Some(mock) = &self.mocks
325 && let Ok(meta) = HttpMockRequest::new(&req.method, &req.url, raw_body.as_deref())
326 {
327 match mock.http_begin(&meta) {
328 HttpDecision::Mock(response) => {
329 let headers = response
330 .headers
331 .iter()
332 .map(|(k, v)| (k.clone(), v.clone()))
333 .collect();
334 return Ok(HttpResponse {
335 status: response.status,
336 headers,
337 body: response.body.clone().map(|b| b.into_bytes()),
338 });
339 }
340 HttpDecision::Deny(reason) => {
341 return Err(HttpClientError {
342 code: "denied".into(),
343 message: reason,
344 });
345 }
346 HttpDecision::Passthrough { record } => {
347 mock_state = Some((meta, record));
348 }
349 }
350 }
351
352 let method = req.method.parse().unwrap_or(reqwest::Method::GET);
353 let mut builder = self.http_client.request(method, &req.url);
354 for (key, value) in req.headers {
355 if let Ok(header) = reqwest::header::HeaderName::from_bytes(key.as_bytes())
356 && let Ok(header_value) = reqwest::header::HeaderValue::from_str(&value)
357 {
358 builder = builder.header(header, header_value);
359 }
360 }
361
362 if let Some(body) = raw_body.clone() {
363 builder = builder.body(body);
364 }
365
366 if let Some(opts) = opts {
367 if let Some(timeout_ms) = opts.timeout_ms {
368 builder = builder.timeout(Duration::from_millis(timeout_ms as u64));
369 }
370 if opts.allow_insecure == Some(true) {
371 warn!(url = %req.url, "allow-insecure not supported; using default TLS validation");
372 }
373 if let Some(follow_redirects) = opts.follow_redirects
374 && !follow_redirects
375 {
376 warn!(url = %req.url, "follow-redirects=false not supported; using default client behaviour");
377 }
378 }
379
380 let response = match builder.send() {
381 Ok(resp) => resp,
382 Err(err) => {
383 warn!(url = %req.url, error = %err, "http client request failed");
384 return Err(HttpClientError {
385 code: "unavailable".into(),
386 message: err.to_string(),
387 });
388 }
389 };
390
391 let status = response.status().as_u16();
392 let headers_vec = response
393 .headers()
394 .iter()
395 .map(|(k, v)| {
396 (
397 k.as_str().to_string(),
398 v.to_str().unwrap_or_default().to_string(),
399 )
400 })
401 .collect::<Vec<_>>();
402 let body_bytes = response.bytes().ok().map(|b| b.to_vec());
403
404 if let Some((meta, true)) = mock_state.take()
405 && let Some(mock) = &self.mocks
406 {
407 let recorded = HttpMockResponse::new(
408 status,
409 headers_vec.clone().into_iter().collect(),
410 body_bytes
411 .as_ref()
412 .map(|b| String::from_utf8_lossy(b).into_owned()),
413 );
414 mock.http_record(&meta, &recorded);
415 }
416
417 Ok(HttpResponse {
418 status,
419 headers: headers_vec,
420 body: body_bytes,
421 })
422 }
423}
424
425impl SecretsStoreHost for HostState {
426 fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsError> {
427 if provider_core_only::is_enabled() {
428 warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
429 return Err(SecretsError::Denied);
430 }
431 if !self.config.secrets_policy.is_allowed(&key) {
432 return Err(SecretsError::Denied);
433 }
434 if let Some(mock) = &self.mocks
435 && let Some(value) = mock.secrets_lookup(&key)
436 {
437 return Ok(Some(value.into_bytes()));
438 }
439 match read_secret_blocking(&self.secrets, &key) {
440 Ok(bytes) => Ok(Some(bytes)),
441 Err(err) => {
442 warn!(secret = %key, error = %err, "secret lookup failed");
443 Err(SecretsError::NotFound)
444 }
445 }
446 }
447}
448
impl HttpClientHost for HostState {
    /// Sends a v1.0 HTTP request by delegating to `send_http_request` with no
    /// request options.
    fn send(
        &mut self,
        req: HttpRequest,
        ctx: Option<HttpTenantCtx>,
    ) -> Result<HttpResponse, HttpClientError> {
        self.send_http_request(req, None, ctx)
    }
}
458
459impl HttpClientHostV1_1 for HostState {
460 fn send(
461 &mut self,
462 req: HttpRequestV1_1,
463 opts: Option<HttpRequestOptionsV1_1>,
464 ctx: Option<HttpTenantCtxV1_1>,
465 ) -> Result<HttpResponseV1_1, HttpClientErrorV1_1> {
466 let legacy_req = HttpRequest {
467 method: req.method,
468 url: req.url,
469 headers: req.headers,
470 body: req.body,
471 };
472 let legacy_ctx = ctx.map(|ctx| HttpTenantCtx {
473 env: ctx.env,
474 tenant: ctx.tenant,
475 tenant_id: ctx.tenant_id,
476 team: ctx.team,
477 team_id: ctx.team_id,
478 user: ctx.user,
479 user_id: ctx.user_id,
480 trace_id: ctx.trace_id,
481 correlation_id: ctx.correlation_id,
482 attributes: ctx.attributes,
483 session_id: ctx.session_id,
484 flow_id: ctx.flow_id,
485 node_id: ctx.node_id,
486 provider_id: ctx.provider_id,
487 deadline_ms: ctx.deadline_ms,
488 attempt: ctx.attempt,
489 idempotency_key: ctx.idempotency_key,
490 impersonation: ctx
491 .impersonation
492 .map(|ImpersonationV1_1 { actor_id, reason }| ImpersonationV1_0 {
493 actor_id,
494 reason,
495 }),
496 });
497
498 self.send_http_request(legacy_req, opts, legacy_ctx)
499 .map(|resp| HttpResponseV1_1 {
500 status: resp.status,
501 headers: resp.headers,
502 body: resp.body,
503 })
504 .map_err(|err| HttpClientErrorV1_1 {
505 code: err.code,
506 message: err.message,
507 })
508 }
509}
510
511impl StateStoreHost for HostState {
512 fn read(
513 &mut self,
514 key: HostStateKey,
515 ctx: Option<StateTenantCtx>,
516 ) -> Result<Vec<u8>, StateError> {
517 let store = match self.state_store.as_ref() {
518 Some(store) => store.clone(),
519 None => {
520 return Err(StateError {
521 code: "unavailable".into(),
522 message: "state store not configured".into(),
523 });
524 }
525 };
526 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
527 Ok(ctx) => ctx,
528 Err(err) => {
529 return Err(StateError {
530 code: "invalid-ctx".into(),
531 message: err.to_string(),
532 });
533 }
534 };
535 let key = StoreStateKey::from(key);
536 match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
537 Ok(Some(value)) => Ok(serde_json::to_vec(&value).unwrap_or_else(|_| Vec::new())),
538 Ok(None) => Err(StateError {
539 code: "not_found".into(),
540 message: "state key not found".into(),
541 }),
542 Err(err) => Err(StateError {
543 code: "internal".into(),
544 message: err.to_string(),
545 }),
546 }
547 }
548
549 fn write(
550 &mut self,
551 key: HostStateKey,
552 bytes: Vec<u8>,
553 ctx: Option<StateTenantCtx>,
554 ) -> Result<StateOpAck, StateError> {
555 let store = match self.state_store.as_ref() {
556 Some(store) => store.clone(),
557 None => {
558 return Err(StateError {
559 code: "unavailable".into(),
560 message: "state store not configured".into(),
561 });
562 }
563 };
564 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
565 Ok(ctx) => ctx,
566 Err(err) => {
567 return Err(StateError {
568 code: "invalid-ctx".into(),
569 message: err.to_string(),
570 });
571 }
572 };
573 let key = StoreStateKey::from(key);
574 let value = serde_json::from_slice(&bytes)
575 .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()));
576 match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
577 Ok(()) => Ok(StateOpAck::Ok),
578 Err(err) => Err(StateError {
579 code: "internal".into(),
580 message: err.to_string(),
581 }),
582 }
583 }
584
585 fn delete(
586 &mut self,
587 key: HostStateKey,
588 ctx: Option<StateTenantCtx>,
589 ) -> Result<StateOpAck, StateError> {
590 let store = match self.state_store.as_ref() {
591 Some(store) => store.clone(),
592 None => {
593 return Err(StateError {
594 code: "unavailable".into(),
595 message: "state store not configured".into(),
596 });
597 }
598 };
599 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
600 Ok(ctx) => ctx,
601 Err(err) => {
602 return Err(StateError {
603 code: "invalid-ctx".into(),
604 message: err.to_string(),
605 });
606 }
607 };
608 let key = StoreStateKey::from(key);
609 match store.del(&tenant_ctx, STATE_PREFIX, &key) {
610 Ok(_) => Ok(StateOpAck::Ok),
611 Err(err) => Err(StateError {
612 code: "internal".into(),
613 message: err.to_string(),
614 }),
615 }
616 }
617}
618
619impl TelemetryLoggerHost for HostState {
620 fn log(
621 &mut self,
622 span: TelemetrySpanContext,
623 fields: Vec<(String, String)>,
624 _ctx: Option<TelemetryTenantCtx>,
625 ) -> Result<TelemetryAck, TelemetryError> {
626 if let Some(mock) = &self.mocks
627 && mock.telemetry_drain(&[("span_json", span.flow_id.as_str())])
628 {
629 return Ok(TelemetryAck::Ok);
630 }
631 let mut map = serde_json::Map::new();
632 for (k, v) in fields {
633 map.insert(k, Value::String(v));
634 }
635 tracing::info!(
636 tenant = %span.tenant,
637 flow_id = %span.flow_id,
638 node = ?span.node_id,
639 provider = %span.provider,
640 fields = %serde_json::Value::Object(map.clone()),
641 "telemetry log from pack"
642 );
643 Ok(TelemetryAck::Ok)
644 }
645}
646
647impl RunnerHostHttp for HostState {
648 fn request(
649 &mut self,
650 method: String,
651 url: String,
652 headers: Vec<String>,
653 body: Option<Vec<u8>>,
654 ) -> Result<Vec<u8>, String> {
655 let req = HttpRequest {
656 method,
657 url,
658 headers: headers
659 .chunks(2)
660 .filter_map(|chunk| {
661 if chunk.len() == 2 {
662 Some((chunk[0].clone(), chunk[1].clone()))
663 } else {
664 None
665 }
666 })
667 .collect(),
668 body,
669 };
670 match HttpClientHost::send(self, req, None) {
671 Ok(resp) => Ok(resp.body.unwrap_or_default()),
672 Err(err) => Err(err.message),
673 }
674 }
675}
676
/// Placeholder key/value host: nothing is stored and every lookup misses.
impl RunnerHostKv for HostState {
    /// Always returns `None`; the namespace and key are ignored.
    fn get(&mut self, _ns: String, _key: String) -> Option<String> {
        None
    }

    /// No-op; the value is discarded.
    fn put(&mut self, _ns: String, _key: String, _val: String) {}
}
684
/// Result of reading `manifest.cbor`, tagged by which manifest format decoded.
enum ManifestLoad {
    /// The manifest decoded with `decode_pack_manifest` (current format).
    New {
        manifest: Box<greentic_types::PackManifest>,
        flows: PackFlows,
    },
    /// The manifest only decoded as the legacy CBOR format.
    Legacy {
        manifest: Box<legacy_pack::PackManifest>,
        flows: PackFlows,
    },
}
695
696fn load_manifest_and_flows(path: &Path) -> Result<ManifestLoad> {
697 let mut archive = ZipArchive::new(File::open(path)?)
698 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
699 let bytes = read_entry(&mut archive, "manifest.cbor")
700 .with_context(|| format!("missing manifest.cbor in {}", path.display()))?;
701 match decode_pack_manifest(&bytes) {
702 Ok(manifest) => {
703 let cache = PackFlows::from_manifest(manifest.clone());
704 Ok(ManifestLoad::New {
705 manifest: Box::new(manifest),
706 flows: cache,
707 })
708 }
709 Err(err) => {
710 tracing::debug!(error = %err, pack = %path.display(), "decode_pack_manifest failed; trying legacy manifest");
711 let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
713 .context("failed to decode legacy pack manifest from manifest.cbor")?;
714 let flows = load_legacy_flows(&mut archive, &legacy)?;
715 Ok(ManifestLoad::Legacy {
716 manifest: Box::new(legacy),
717 flows,
718 })
719 }
720 }
721}
722
723fn load_manifest_and_flows_from_dir(root: &Path) -> Result<ManifestLoad> {
724 let manifest_path = root.join("manifest.cbor");
725 let bytes = std::fs::read(&manifest_path)
726 .with_context(|| format!("missing manifest.cbor in {}", root.display()))?;
727 match decode_pack_manifest(&bytes) {
728 Ok(manifest) => {
729 let cache = PackFlows::from_manifest(manifest.clone());
730 Ok(ManifestLoad::New {
731 manifest: Box::new(manifest),
732 flows: cache,
733 })
734 }
735 Err(err) => {
736 tracing::debug!(
737 error = %err,
738 pack = %root.display(),
739 "decode_pack_manifest failed for materialized pack; trying legacy manifest"
740 );
741 let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
742 .context("failed to decode legacy pack manifest from manifest.cbor")?;
743 let flows = load_legacy_flows_from_dir(root, &legacy)?;
744 Ok(ManifestLoad::Legacy {
745 manifest: Box::new(legacy),
746 flows,
747 })
748 }
749 }
750}
751
752fn load_legacy_flows(
753 archive: &mut ZipArchive<File>,
754 manifest: &legacy_pack::PackManifest,
755) -> Result<PackFlows> {
756 let mut flows = HashMap::new();
757 let mut descriptors = Vec::new();
758
759 for entry in &manifest.flows {
760 let bytes = read_entry(archive, &entry.file_json)
761 .with_context(|| format!("missing flow json {}", entry.file_json))?;
762 let doc: FlowDoc = serde_json::from_slice(&bytes)
763 .with_context(|| format!("failed to decode flow doc {}", entry.file_json))?;
764 let normalized = normalize_flow_doc(doc);
765 let flow_ir = flow_doc_to_ir(normalized)?;
766 let flow = flow_ir_to_flow(flow_ir)?;
767
768 descriptors.push(FlowDescriptor {
769 id: entry.id.clone(),
770 flow_type: entry.kind.clone(),
771 profile: manifest.meta.pack_id.clone(),
772 version: manifest.meta.version.to_string(),
773 description: None,
774 });
775 flows.insert(entry.id.clone(), flow);
776 }
777
778 let mut entry_flows = manifest.meta.entry_flows.clone();
779 if entry_flows.is_empty() {
780 entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
781 }
782 let metadata = PackMetadata {
783 pack_id: manifest.meta.pack_id.clone(),
784 version: manifest.meta.version.to_string(),
785 entry_flows,
786 secret_requirements: Vec::new(),
787 };
788
789 Ok(PackFlows {
790 descriptors,
791 flows,
792 metadata,
793 })
794}
795
796fn load_legacy_flows_from_dir(
797 root: &Path,
798 manifest: &legacy_pack::PackManifest,
799) -> Result<PackFlows> {
800 let mut flows = HashMap::new();
801 let mut descriptors = Vec::new();
802
803 for entry in &manifest.flows {
804 let path = root.join(&entry.file_json);
805 let bytes = std::fs::read(&path)
806 .with_context(|| format!("missing flow json {}", path.display()))?;
807 let doc: FlowDoc = serde_json::from_slice(&bytes)
808 .with_context(|| format!("failed to decode flow doc {}", path.display()))?;
809 let normalized = normalize_flow_doc(doc);
810 let flow_ir = flow_doc_to_ir(normalized)?;
811 let flow = flow_ir_to_flow(flow_ir)?;
812
813 descriptors.push(FlowDescriptor {
814 id: entry.id.clone(),
815 flow_type: entry.kind.clone(),
816 profile: manifest.meta.pack_id.clone(),
817 version: manifest.meta.version.to_string(),
818 description: None,
819 });
820 flows.insert(entry.id.clone(), flow);
821 }
822
823 let mut entry_flows = manifest.meta.entry_flows.clone();
824 if entry_flows.is_empty() {
825 entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
826 }
827 let metadata = PackMetadata {
828 pack_id: manifest.meta.pack_id.clone(),
829 version: manifest.meta.version.to_string(),
830 entry_flows,
831 secret_requirements: Vec::new(),
832 };
833
834 Ok(PackFlows {
835 descriptors,
836 flows,
837 metadata,
838 })
839}
840
/// Per-store state for a component instance: the host services plus the WASI
/// context and resource table handed out through `WasiView`.
pub struct ComponentState {
    /// Host services exposed to the component.
    pub host: HostState,
    /// WASI context built from the runner's WASI policy.
    wasi_ctx: WasiCtx,
    /// Resource table paired with the WASI context.
    resource_table: ResourceTable,
}
846
847impl ComponentState {
848 pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
849 let wasi_ctx = policy
850 .instantiate()
851 .context("failed to build WASI context")?;
852 Ok(Self {
853 host,
854 wasi_ctx,
855 resource_table: ResourceTable::new(),
856 })
857 }
858
859 fn host_mut(&mut self) -> &mut HostState {
860 &mut self.host
861 }
862
863 fn should_cancel_host(&mut self) -> bool {
864 false
865 }
866
867 fn yield_now_host(&mut self) {
868 }
870}
871
/// Control interface (v0.4) backed by the shared `ComponentState` hooks.
impl component_api::v0_4::greentic::component::control::Host for ComponentState {
    /// Delegates to `should_cancel_host`.
    fn should_cancel(&mut self) -> bool {
        self.should_cancel_host()
    }

    /// Delegates to `yield_now_host`.
    fn yield_now(&mut self) {
        self.yield_now_host();
    }
}
881
/// Control interface (v0.5) backed by the shared `ComponentState` hooks.
impl component_api::v0_5::greentic::component::control::Host for ComponentState {
    /// Delegates to `should_cancel_host`.
    fn should_cancel(&mut self) -> bool {
        self.should_cancel_host()
    }

    /// Delegates to `yield_now_host`.
    fn yield_now(&mut self) {
        self.yield_now_host();
    }
}
891
892fn add_component_control_instance(
893 linker: &mut Linker<ComponentState>,
894 name: &str,
895) -> wasmtime::Result<()> {
896 let mut inst = linker.instance(name)?;
897 inst.func_wrap(
898 "should-cancel",
899 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
900 let host = caller.data_mut();
901 Ok((host.should_cancel_host(),))
902 },
903 )?;
904 inst.func_wrap(
905 "yield-now",
906 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
907 let host = caller.data_mut();
908 host.yield_now_host();
909 Ok(())
910 },
911 )?;
912 Ok(())
913}
914
915fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
916 add_component_control_instance(linker, "greentic:component/control@0.5.0")?;
917 add_component_control_instance(linker, "greentic:component/control@0.4.0")?;
918 Ok(())
919}
920
/// Registers WASI and the v1 host interfaces on `linker`.
///
/// Every host interface is served by the `HostState` inside `ComponentState`.
/// The state store is only wired up when `allow_state_store` is true, and the
/// OAuth broker is not registered here (`None`).
///
/// # Errors
/// Propagates any error from adding WASI or the host interfaces to the linker.
pub fn register_all(linker: &mut Linker<ComponentState>, allow_state_store: bool) -> Result<()> {
    add_wasi_to_linker(linker)?;
    add_all_v1_to_linker(
        linker,
        HostFns {
            http_client_v1_1: Some(|state| state.host_mut()),
            http_client: Some(|state| state.host_mut()),
            oauth_broker: None,
            runner_host_http: Some(|state| state.host_mut()),
            runner_host_kv: Some(|state| state.host_mut()),
            telemetry_logger: Some(|state| state.host_mut()),
            // `then_some` yields `None` when state access is not allowed.
            state_store: allow_state_store.then_some(|state| state.host_mut()),
            secrets_store: Some(|state| state.host_mut()),
        },
    )?;
    Ok(())
}
938
/// Exposes the tenant, environment and OAuth broker state held by the host.
impl OAuthHostContext for ComponentState {
    /// Tenant taken from the host configuration.
    fn tenant_id(&self) -> &str {
        &self.host.config.tenant
    }

    /// The host's default environment name.
    fn env(&self) -> &str {
        &self.host.default_env
    }

    /// Mutable access to the OAuth broker host state.
    fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
        &mut self.host.oauth_host
    }

    /// The OAuth broker configuration, when one was supplied.
    fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
        self.host.oauth_config.as_ref()
    }
}
956
impl WasiView for ComponentState {
    /// Returns a view borrowing this state's WASI context and resource table.
    fn ctx(&mut self) -> WasiCtxView<'_> {
        WasiCtxView {
            ctx: &mut self.wasi_ctx,
            table: &mut self.resource_table,
        }
    }
}
965
// SAFETY: NOTE(review): no justification is recorded in this file. These impls
// assert that every field of `ComponentState` (`HostState`, `WasiCtx`,
// `ResourceTable`) may be moved to and shared between threads — TODO confirm
// this holds for all of them and record the invariant here.
#[allow(unsafe_code)]
unsafe impl Send for ComponentState {}
#[allow(unsafe_code)]
unsafe impl Sync for ComponentState {}
970
971impl PackRuntime {
972 fn allows_state_store(&self, component_ref: &str) -> bool {
973 if self.state_store.is_none() {
974 return false;
975 }
976 if !self.config.state_store_policy.allow {
977 return false;
978 }
979 let Some(manifest) = self.component_manifests.get(component_ref) else {
980 return false;
981 };
982 manifest
983 .capabilities
984 .host
985 .state
986 .as_ref()
987 .map(|caps| caps.read || caps.write)
988 .unwrap_or(false)
989 }
990
991 #[allow(clippy::too_many_arguments)]
992 pub async fn load(
993 path: impl AsRef<Path>,
994 config: Arc<HostConfig>,
995 mocks: Option<Arc<MockLayer>>,
996 archive_source: Option<&Path>,
997 session_store: Option<DynSessionStore>,
998 state_store: Option<DynStateStore>,
999 wasi_policy: Arc<RunnerWasiPolicy>,
1000 secrets: DynSecretsManager,
1001 oauth_config: Option<OAuthBrokerConfig>,
1002 verify_archive: bool,
1003 component_resolution: ComponentResolution,
1004 ) -> Result<Self> {
1005 let path = path.as_ref();
1006 let (_pack_root, safe_path) = normalize_pack_path(path)?;
1007 let path_meta = std::fs::metadata(&safe_path).ok();
1008 let is_dir = path_meta
1009 .as_ref()
1010 .map(|meta| meta.is_dir())
1011 .unwrap_or(false);
1012 let is_component = !is_dir
1013 && safe_path
1014 .extension()
1015 .and_then(|ext| ext.to_str())
1016 .map(|ext| ext.eq_ignore_ascii_case("wasm"))
1017 .unwrap_or(false);
1018 let archive_hint_path = if let Some(source) = archive_source {
1019 let (_, normalized) = normalize_pack_path(source)?;
1020 Some(normalized)
1021 } else if is_component || is_dir {
1022 None
1023 } else {
1024 Some(safe_path.clone())
1025 };
1026 let archive_hint = archive_hint_path.as_deref();
1027 if verify_archive {
1028 if let Some(verify_target) = archive_hint.and_then(|p| {
1029 std::fs::metadata(p)
1030 .ok()
1031 .filter(|meta| meta.is_file())
1032 .map(|_| p)
1033 }) {
1034 verify::verify_pack(verify_target).await?;
1035 tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
1036 } else {
1037 tracing::debug!("skipping archive verification (no archive source)");
1038 }
1039 }
1040 let engine = Engine::default();
1041 let engine_profile =
1042 EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
1043 let cache = CacheManager::new(CacheConfig::default(), engine_profile);
1044 let mut metadata = PackMetadata::fallback(&safe_path);
1045 let mut manifest = None;
1046 let mut legacy_manifest: Option<Box<legacy_pack::PackManifest>> = None;
1047 let mut flows = None;
1048 let materialized_root = component_resolution.materialized_root.clone().or_else(|| {
1049 if is_dir {
1050 Some(safe_path.clone())
1051 } else {
1052 None
1053 }
1054 });
1055
1056 if let Some(root) = materialized_root.as_ref() {
1057 match load_manifest_and_flows_from_dir(root) {
1058 Ok(ManifestLoad::New {
1059 manifest: m,
1060 flows: cache,
1061 }) => {
1062 metadata = cache.metadata.clone();
1063 manifest = Some(*m);
1064 flows = Some(cache);
1065 }
1066 Ok(ManifestLoad::Legacy {
1067 manifest: m,
1068 flows: cache,
1069 }) => {
1070 metadata = cache.metadata.clone();
1071 legacy_manifest = Some(m);
1072 flows = Some(cache);
1073 }
1074 Err(err) => {
1075 warn!(error = %err, pack = %root.display(), "failed to parse materialized pack manifest");
1076 }
1077 }
1078 }
1079
1080 if manifest.is_none()
1081 && legacy_manifest.is_none()
1082 && let Some(archive_path) = archive_hint
1083 {
1084 match load_manifest_and_flows(archive_path) {
1085 Ok(ManifestLoad::New {
1086 manifest: m,
1087 flows: cache,
1088 }) => {
1089 metadata = cache.metadata.clone();
1090 manifest = Some(*m);
1091 flows = Some(cache);
1092 }
1093 Ok(ManifestLoad::Legacy {
1094 manifest: m,
1095 flows: cache,
1096 }) => {
1097 metadata = cache.metadata.clone();
1098 legacy_manifest = Some(m);
1099 flows = Some(cache);
1100 }
1101 Err(err) => {
1102 warn!(error = %err, pack = %archive_path.display(), "failed to parse pack manifest; skipping flows");
1103 }
1104 }
1105 }
1106 let mut pack_lock = None;
1107 for root in find_pack_lock_roots(&safe_path, is_dir, archive_hint) {
1108 pack_lock = load_pack_lock(&root)?;
1109 if pack_lock.is_some() {
1110 break;
1111 }
1112 }
1113 let component_sources_payload = if pack_lock.is_none() {
1114 if let Some(manifest) = manifest.as_ref() {
1115 manifest
1116 .get_component_sources_v1()
1117 .context("invalid component sources extension")?
1118 } else {
1119 None
1120 }
1121 } else {
1122 None
1123 };
1124 let component_sources = if let Some(lock) = pack_lock.as_ref() {
1125 Some(component_sources_table_from_pack_lock(
1126 lock,
1127 component_resolution.allow_missing_hash,
1128 )?)
1129 } else {
1130 component_sources_table(component_sources_payload.as_ref())?
1131 };
1132 let components = if is_component {
1133 let wasm_bytes = fs::read(&safe_path).await?;
1134 metadata = PackMetadata::from_wasm(&wasm_bytes)
1135 .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
1136 let name = safe_path
1137 .file_stem()
1138 .map(|s| s.to_string_lossy().to_string())
1139 .unwrap_or_else(|| "component".to_string());
1140 let component = compile_component_with_cache(&cache, &engine, None, wasm_bytes).await?;
1141 let mut map = HashMap::new();
1142 map.insert(
1143 name.clone(),
1144 PackComponent {
1145 name,
1146 version: metadata.version.clone(),
1147 component,
1148 },
1149 );
1150 map
1151 } else {
1152 let specs = component_specs(
1153 manifest.as_ref(),
1154 legacy_manifest.as_deref(),
1155 component_sources_payload.as_ref(),
1156 pack_lock.as_ref(),
1157 );
1158 if specs.is_empty() {
1159 HashMap::new()
1160 } else {
1161 let mut loaded = HashMap::new();
1162 let mut missing: HashSet<String> =
1163 specs.iter().map(|spec| spec.id.clone()).collect();
1164 let mut searched = Vec::new();
1165
1166 if !component_resolution.overrides.is_empty() {
1167 load_components_from_overrides(
1168 &cache,
1169 &engine,
1170 &component_resolution.overrides,
1171 &specs,
1172 &mut missing,
1173 &mut loaded,
1174 )
1175 .await?;
1176 searched.push("override map".to_string());
1177 }
1178
1179 if let Some(component_sources) = component_sources.as_ref() {
1180 load_components_from_sources(
1181 &cache,
1182 &engine,
1183 component_sources,
1184 &component_resolution,
1185 &specs,
1186 &mut missing,
1187 &mut loaded,
1188 materialized_root.as_deref(),
1189 archive_hint,
1190 )
1191 .await?;
1192 searched.push(format!("extension {}", EXT_COMPONENT_SOURCES_V1));
1193 }
1194
1195 if let Some(root) = materialized_root.as_ref() {
1196 load_components_from_dir(
1197 &cache,
1198 &engine,
1199 root,
1200 &specs,
1201 &mut missing,
1202 &mut loaded,
1203 )
1204 .await?;
1205 searched.push(format!("components dir {}", root.display()));
1206 }
1207
1208 if let Some(archive_path) = archive_hint {
1209 load_components_from_archive(
1210 &cache,
1211 &engine,
1212 archive_path,
1213 &specs,
1214 &mut missing,
1215 &mut loaded,
1216 )
1217 .await?;
1218 searched.push(format!("archive {}", archive_path.display()));
1219 }
1220
1221 if !missing.is_empty() {
1222 let missing_list = missing.into_iter().collect::<Vec<_>>().join(", ");
1223 let sources = if searched.is_empty() {
1224 "no component sources".to_string()
1225 } else {
1226 searched.join(", ")
1227 };
1228 bail!(
1229 "components missing: {}; looked in {}",
1230 missing_list,
1231 sources
1232 );
1233 }
1234
1235 loaded
1236 }
1237 };
1238 let http_client = Arc::clone(&HTTP_CLIENT);
1239 let mut component_manifests = HashMap::new();
1240 if let Some(manifest) = manifest.as_ref() {
1241 for component in &manifest.components {
1242 component_manifests.insert(component.id.as_str().to_string(), component.clone());
1243 }
1244 }
1245 Ok(Self {
1246 path: safe_path,
1247 archive_path: archive_hint.map(Path::to_path_buf),
1248 config,
1249 engine,
1250 metadata,
1251 manifest,
1252 legacy_manifest,
1253 component_manifests,
1254 mocks,
1255 flows,
1256 components,
1257 http_client,
1258 pre_cache: Mutex::new(HashMap::new()),
1259 session_store,
1260 state_store,
1261 wasi_policy,
1262 provider_registry: RwLock::new(None),
1263 secrets,
1264 oauth_config,
1265 cache,
1266 })
1267 }
1268
1269 pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
1270 if let Some(cache) = &self.flows {
1271 return Ok(cache.descriptors.clone());
1272 }
1273 if let Some(manifest) = &self.manifest {
1274 let descriptors = manifest
1275 .flows
1276 .iter()
1277 .map(|flow| FlowDescriptor {
1278 id: flow.id.as_str().to_string(),
1279 flow_type: flow_kind_to_str(flow.kind).to_string(),
1280 profile: manifest.pack_id.as_str().to_string(),
1281 version: manifest.version.to_string(),
1282 description: None,
1283 })
1284 .collect();
1285 return Ok(descriptors);
1286 }
1287 Ok(Vec::new())
1288 }
1289
/// Runs `flow_id` with `input` and returns the flow's JSON result.
///
/// A fresh `PackRuntime` is loaded from this runtime's `path` (reusing its
/// config, mocks, archive path, stores, WASI policy, secrets and OAuth
/// settings) and executed through a new `FlowEngine` holding only that pack.
///
/// # Returns
/// * The execution output when the flow completes.
/// * A JSON object with `status: "pending"`, `reason`, `resume` and
///   `response` keys when the flow is waiting.
///
/// # Errors
/// Propagates failures from loading the pack, building the engine, or
/// executing the flow.
#[allow(dead_code)]
pub async fn run_flow(
    &self,
    flow_id: &str,
    input: serde_json::Value,
) -> Result<serde_json::Value> {
    // NOTE(review): the literal `false` below presumably disables archive
    // verification for the reload — confirm against `PackRuntime::load`.
    let pack = Arc::new(
        PackRuntime::load(
            &self.path,
            Arc::clone(&self.config),
            self.mocks.clone(),
            self.archive_path.as_deref(),
            self.session_store.clone(),
            self.state_store.clone(),
            Arc::clone(&self.wasi_policy),
            self.secrets.clone(),
            self.oauth_config.clone(),
            false,
            ComponentResolution::default(),
        )
        .await?,
    );

    let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&self.config)).await?;
    let retry_config = self.config.retry_config().into();
    let mocks = pack.mocks.as_deref();
    let tenant = self.config.tenant.as_str();

    // Only tenant, flow id, retry settings and mocks are populated; every
    // other context field is left unset.
    let ctx = FlowContext {
        tenant,
        flow_id,
        node_id: None,
        tool: None,
        action: None,
        session_id: None,
        provider_id: None,
        retry_config,
        observer: None,
        mocks,
    };

    let execution = engine.execute(ctx, input).await?;
    match execution.status {
        FlowStatus::Completed => Ok(execution.output),
        // A waiting flow is reported as a successful "pending" envelope rather
        // than an error, carrying the snapshot under `resume`.
        FlowStatus::Waiting(wait) => Ok(serde_json::json!({
            "status": "pending",
            "reason": wait.reason,
            "resume": wait.snapshot,
            "response": execution.output,
        })),
    }
}
1342
/// Invokes `operation` on the pack component registered under `component_ref`.
///
/// The component is instantiated in a fresh store with newly built host state
/// for this call. The `greentic:component/node` 0.5 bindings are tried first;
/// when pre-instantiation reports that the 0.5.0 node export is missing, the
/// 0.4 bindings are used instead.
///
/// # Parameters
/// * `component_ref` - key into this pack's component map.
/// * `ctx` - execution context handed to the host state and the component.
/// * `operation` - operation name passed to the component's `invoke`.
/// * `_config_json` - not read by this function.
/// * `input_json` - JSON text passed to the component as its input.
///
/// # Returns
/// * `Value::Null` for an empty success body.
/// * The parsed JSON for a success body, or the raw body as a string when it
///   is not valid JSON.
/// * For a component-reported error, an object of the form
///   `{"ok": false, "error": {code, message, retryable, backoff_ms?, details?}}`
///   returned as `Ok`.
///
/// # Errors
/// Fails when the component is unknown, when linking/instantiation fails, or
/// when the call into the component itself fails.
pub async fn invoke_component(
    &self,
    component_ref: &str,
    ctx: ComponentExecCtx,
    operation: &str,
    _config_json: Option<String>,
    input_json: String,
) -> Result<Value> {
    let pack_component = self
        .components
        .get(component_ref)
        .with_context(|| format!("component '{component_ref}' not found in pack"))?;

    // A new linker is assembled for every invocation.
    let mut linker = Linker::new(&self.engine);
    let allow_state_store = self.allows_state_store(component_ref);
    register_all(&mut linker, allow_state_store)?;
    add_component_control_to_linker(&mut linker)?;

    let host_state = HostState::new(
        Arc::clone(&self.config),
        Arc::clone(&self.http_client),
        self.mocks.clone(),
        self.session_store.clone(),
        self.state_store.clone(),
        Arc::clone(&self.secrets),
        self.oauth_config.clone(),
        Some(ctx.clone()),
    )?;
    let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
    let mut store = wasmtime::Store::new(&self.engine, store_state);
    let pre_instance = linker.instantiate_pre(pack_component.component.as_ref())?;
    let result = match component_api::v0_5::ComponentPre::new(pre_instance) {
        Ok(pre) => {
            let bindings = pre.instantiate_async(&mut store).await?;
            let node = bindings.greentic_component_node();
            let ctx_v05 = component_api::exec_ctx_v0_5(&ctx);
            let result = node.call_invoke(&mut store, &ctx_v05, operation, &input_json)?;
            component_api::invoke_result_from_v0_5(result)
        }
        Err(err) => {
            // Only a missing 0.5.0 node export triggers the 0.4 fallback; any
            // other pre-instantiation failure is returned unchanged.
            if is_missing_node_export(&err, "0.5.0") {
                // `pre_instance` was consumed by the 0.5 attempt, so it is rebuilt.
                let pre_instance = linker.instantiate_pre(pack_component.component.as_ref())?;
                let pre: component_api::v0_4::ComponentPre<ComponentState> =
                    component_api::v0_4::ComponentPre::new(pre_instance)?;
                let bindings = pre.instantiate_async(&mut store).await?;
                let node = bindings.greentic_component_node();
                let ctx_v04 = component_api::exec_ctx_v0_4(&ctx);
                let result = node.call_invoke(&mut store, &ctx_v04, operation, &input_json)?;
                component_api::invoke_result_from_v0_4(result)
            } else {
                return Err(err);
            }
        }
    };

    match result {
        InvokeResult::Ok(body) => {
            if body.is_empty() {
                return Ok(Value::Null);
            }
            // Non-JSON bodies are passed through as a plain string.
            serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
        }
        InvokeResult::Err(NodeError {
            code,
            message,
            retryable,
            backoff_ms,
            details,
        }) => {
            // Component-level errors become a JSON error envelope, not an `Err`.
            let mut obj = serde_json::Map::new();
            obj.insert("ok".into(), Value::Bool(false));
            let mut error = serde_json::Map::new();
            error.insert("code".into(), Value::String(code));
            error.insert("message".into(), Value::String(message));
            error.insert("retryable".into(), Value::Bool(retryable));
            if let Some(backoff) = backoff_ms {
                error.insert("backoff_ms".into(), Value::Number(backoff.into()));
            }
            if let Some(details) = details {
                // `details` is embedded as JSON when it parses, else as a string.
                error.insert(
                    "details".into(),
                    serde_json::from_str(&details).unwrap_or(Value::String(details)),
                );
            }
            obj.insert("error".into(), Value::Object(error));
            Ok(Value::Object(obj))
        }
    }
}
1432
1433 pub fn resolve_provider(
1434 &self,
1435 provider_id: Option<&str>,
1436 provider_type: Option<&str>,
1437 ) -> Result<ProviderBinding> {
1438 let registry = self.provider_registry()?;
1439 registry.resolve(provider_id, provider_type)
1440 }
1441
/// Invokes operation `op` on the provider component named by `binding`.
///
/// The component referenced by `binding.component_ref` is linked against the
/// host functions, instantiated in a fresh store with newly built host state,
/// and called through the provider schema-core API.
///
/// # Parameters
/// * `binding` - provider binding whose `component_ref` selects the component.
/// * `ctx` - execution context stored in the host state for this call.
/// * `op` - operation name passed to the provider.
/// * `input_json` - raw input bytes passed to the provider.
///
/// # Returns
/// The provider's output bytes decoded by `deserialize_json_bytes`.
///
/// # Errors
/// Fails when the component is unknown, when linking/instantiation fails, when
/// the provider call fails, or when the output cannot be decoded.
pub async fn invoke_provider(
    &self,
    binding: &ProviderBinding,
    ctx: ComponentExecCtx,
    op: &str,
    input_json: Vec<u8>,
) -> Result<Value> {
    let component_ref = &binding.component_ref;
    let pack_component = self
        .components
        .get(component_ref)
        .with_context(|| format!("provider component '{component_ref}' not found in pack"))?;

    // A new linker is assembled for every invocation.
    let mut linker = Linker::new(&self.engine);
    let allow_state_store = self.allows_state_store(component_ref);
    register_all(&mut linker, allow_state_store)?;
    add_component_control_to_linker(&mut linker)?;
    let pre_instance = linker.instantiate_pre(pack_component.component.as_ref())?;
    let pre: ProviderComponentPre<ComponentState> = ProviderComponentPre::new(pre_instance)?;

    let host_state = HostState::new(
        Arc::clone(&self.config),
        Arc::clone(&self.http_client),
        self.mocks.clone(),
        self.session_store.clone(),
        self.state_store.clone(),
        Arc::clone(&self.secrets),
        self.oauth_config.clone(),
        Some(ctx),
    )?;
    let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
    let mut store = wasmtime::Store::new(&self.engine, store_state);
    let bindings: crate::provider_core::SchemaCore = pre.instantiate_async(&mut store).await?;
    let provider = bindings.greentic_provider_core_schema_core_api();

    let result = provider.call_invoke(&mut store, op, &input_json)?;
    deserialize_json_bytes(result)
}
1480
1481 fn provider_registry(&self) -> Result<ProviderRegistry> {
1482 if let Some(registry) = self.provider_registry.read().clone() {
1483 return Ok(registry);
1484 }
1485 let manifest = self
1486 .manifest
1487 .as_ref()
1488 .context("pack manifest required for provider resolution")?;
1489 let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
1490 let registry = ProviderRegistry::new(
1491 manifest,
1492 self.state_store.clone(),
1493 &self.config.tenant,
1494 &env,
1495 )?;
1496 *self.provider_registry.write() = Some(registry.clone());
1497 Ok(registry)
1498 }
1499
1500 pub fn load_flow(&self, flow_id: &str) -> Result<Flow> {
1501 if let Some(cache) = &self.flows {
1502 return cache
1503 .flows
1504 .get(flow_id)
1505 .cloned()
1506 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
1507 }
1508 if let Some(manifest) = &self.manifest {
1509 let entry = manifest
1510 .flows
1511 .iter()
1512 .find(|f| f.id.as_str() == flow_id)
1513 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
1514 return Ok(entry.flow.clone());
1515 }
1516 bail!("flow '{flow_id}' not available (pack exports disabled)")
1517 }
1518
/// Returns a reference to the pack metadata held by this runtime.
pub fn metadata(&self) -> &PackMetadata {
    &self.metadata
}
1522
/// Returns the secret requirements recorded in the pack metadata.
pub fn required_secrets(&self) -> &[greentic_types::SecretRequirement] {
    &self.metadata.secret_requirements
}
1526
1527 pub fn missing_secrets(
1528 &self,
1529 tenant_ctx: &TypesTenantCtx,
1530 ) -> Vec<greentic_types::SecretRequirement> {
1531 let env = tenant_ctx.env.as_str().to_string();
1532 let tenant = tenant_ctx.tenant.as_str().to_string();
1533 let team = tenant_ctx.team.as_ref().map(|t| t.as_str().to_string());
1534 self.required_secrets()
1535 .iter()
1536 .filter(|req| {
1537 if let Some(scope) = &req.scope {
1539 if scope.env != env {
1540 return false;
1541 }
1542 if scope.tenant != tenant {
1543 return false;
1544 }
1545 if let Some(ref team_req) = scope.team
1546 && team.as_ref() != Some(team_req)
1547 {
1548 return false;
1549 }
1550 }
1551 read_secret_blocking(&self.secrets, req.key.as_str()).is_err()
1552 })
1553 .cloned()
1554 .collect()
1555 }
1556
1557 pub fn for_component_test(
1558 components: Vec<(String, PathBuf)>,
1559 flows: HashMap<String, FlowIR>,
1560 config: Arc<HostConfig>,
1561 ) -> Result<Self> {
1562 let engine = Engine::default();
1563 let engine_profile =
1564 EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
1565 let cache = CacheManager::new(CacheConfig::default(), engine_profile);
1566 let mut component_map = HashMap::new();
1567 for (name, path) in components {
1568 if !path.exists() {
1569 bail!("component artifact missing: {}", path.display());
1570 }
1571 let wasm_bytes = std::fs::read(&path)?;
1572 let component = Arc::new(
1573 Component::from_binary(&engine, &wasm_bytes)
1574 .with_context(|| format!("failed to compile component {}", path.display()))?,
1575 );
1576 component_map.insert(
1577 name.clone(),
1578 PackComponent {
1579 name,
1580 version: "0.0.0".into(),
1581 component,
1582 },
1583 );
1584 }
1585
1586 let mut flow_map = HashMap::new();
1587 let mut descriptors = Vec::new();
1588 for (id, ir) in flows {
1589 let flow_type = ir.flow_type.clone();
1590 let flow = flow_ir_to_flow(ir)?;
1591 flow_map.insert(id.clone(), flow);
1592 descriptors.push(FlowDescriptor {
1593 id: id.clone(),
1594 flow_type,
1595 profile: "test".into(),
1596 version: "0.0.0".into(),
1597 description: None,
1598 });
1599 }
1600 let flows_cache = PackFlows {
1601 descriptors: descriptors.clone(),
1602 flows: flow_map,
1603 metadata: PackMetadata::fallback(Path::new("component-test")),
1604 };
1605
1606 Ok(Self {
1607 path: PathBuf::new(),
1608 archive_path: None,
1609 config,
1610 engine,
1611 metadata: PackMetadata::fallback(Path::new("component-test")),
1612 manifest: None,
1613 legacy_manifest: None,
1614 component_manifests: HashMap::new(),
1615 mocks: None,
1616 flows: Some(flows_cache),
1617 components: component_map,
1618 http_client: Arc::clone(&HTTP_CLIENT),
1619 pre_cache: Mutex::new(HashMap::new()),
1620 session_store: None,
1621 state_store: None,
1622 wasi_policy: Arc::new(RunnerWasiPolicy::new()),
1623 provider_registry: RwLock::new(None),
1624 secrets: crate::secrets::default_manager(),
1625 oauth_config: None,
1626 cache,
1627 })
1628 }
1629}
1630
1631fn is_missing_node_export(err: &wasmtime::Error, version: &str) -> bool {
1632 let message = err.to_string();
1633 message.contains("no exported instance named")
1634 && message.contains(&format!("greentic:component/node@{version}"))
1635}
1636
/// Flows loaded for a pack, together with their descriptors and the pack
/// metadata they were loaded with.
struct PackFlows {
    /// One descriptor per flow, as returned by flow listing.
    descriptors: Vec<FlowDescriptor>,
    /// Flow definitions keyed by flow id.
    flows: HashMap<String, Flow>,
    /// Metadata of the pack the flows belong to.
    metadata: PackMetadata,
}
1642
/// Manifest extension keys that are recognised as carrying runtime flow
/// definitions; a manifest extension whose key equals any of these is decoded
/// as a runtime flow bundle.
const RUNTIME_FLOW_EXTENSION_IDS: [&str; 3] = [
    "greentic.pack.runtime_flow",
    "greentic.pack.flow_runtime",
    "greentic.pack.runtime_flows",
];
1648
/// Object form of a runtime flow extension payload: a `flows` array of
/// runtime flows.
#[derive(Debug, Deserialize)]
struct RuntimeFlowBundle {
    /// The runtime flows carried by the extension.
    flows: Vec<RuntimeFlow>,
}
1653
/// Serialized runtime representation of a flow, converted into a `Flow` by
/// `runtime_flow_to_flow`.
#[derive(Debug, Deserialize)]
struct RuntimeFlow {
    /// Flow id; must parse as a `FlowId`.
    id: String,
    /// Flow kind; also accepted under the key `flow_type`.
    #[serde(alias = "flow_type")]
    kind: FlowKind,
    /// Schema version; `"1.0"` is used when absent.
    #[serde(default)]
    schema_version: Option<String>,
    /// Start node; becomes the `"default"` entrypoint when `entrypoints` is empty.
    #[serde(default)]
    start: Option<String>,
    /// Named entrypoints of the flow.
    #[serde(default)]
    entrypoints: BTreeMap<String, Value>,
    /// Nodes keyed by node id.
    nodes: BTreeMap<String, RuntimeNode>,
    /// Optional flow metadata; the default value is used when absent.
    #[serde(default)]
    metadata: Option<FlowMetadata>,
}
1669
/// Serialized runtime representation of a single flow node.
#[derive(Debug, Deserialize)]
struct RuntimeNode {
    /// Component id; also accepted under the key `component`. Must parse as a
    /// `ComponentId`.
    #[serde(alias = "component")]
    component_id: String,
    /// Optional operation name; also accepted under the key `operation`.
    #[serde(default, alias = "operation")]
    operation_name: Option<String>,
    /// Payload used as the node's input mapping; also accepted under the keys
    /// `payload` and `input`.
    #[serde(default, alias = "payload", alias = "input")]
    operation_payload: Value,
    /// Routing for the node; `Routing::End` is used when absent.
    #[serde(default)]
    routing: Option<Routing>,
    /// Telemetry hints; the default value is used when absent.
    #[serde(default)]
    telemetry: Option<TelemetryHints>,
}
1683
1684fn deserialize_json_bytes(bytes: Vec<u8>) -> Result<Value> {
1685 if bytes.is_empty() {
1686 return Ok(Value::Null);
1687 }
1688 serde_json::from_slice(&bytes).or_else(|_| {
1689 String::from_utf8(bytes)
1690 .map(Value::String)
1691 .map_err(|err| anyhow!(err))
1692 })
1693}
1694
1695impl PackFlows {
1696 fn from_manifest(manifest: greentic_types::PackManifest) -> Self {
1697 if let Some(flows) = flows_from_runtime_extension(&manifest) {
1698 return flows;
1699 }
1700 let descriptors = manifest
1701 .flows
1702 .iter()
1703 .map(|entry| FlowDescriptor {
1704 id: entry.id.as_str().to_string(),
1705 flow_type: flow_kind_to_str(entry.kind).to_string(),
1706 profile: manifest.pack_id.as_str().to_string(),
1707 version: manifest.version.to_string(),
1708 description: None,
1709 })
1710 .collect();
1711 let mut flows = HashMap::new();
1712 for entry in &manifest.flows {
1713 flows.insert(entry.id.as_str().to_string(), entry.flow.clone());
1714 }
1715 Self {
1716 metadata: PackMetadata::from_manifest(&manifest),
1717 descriptors,
1718 flows,
1719 }
1720 }
1721}
1722
1723fn flows_from_runtime_extension(manifest: &greentic_types::PackManifest) -> Option<PackFlows> {
1724 let extensions = manifest.extensions.as_ref()?;
1725 let extension = extensions.iter().find_map(|(key, ext)| {
1726 if RUNTIME_FLOW_EXTENSION_IDS
1727 .iter()
1728 .any(|candidate| candidate == key)
1729 {
1730 Some(ext)
1731 } else {
1732 None
1733 }
1734 })?;
1735 let runtime_flows = match decode_runtime_flow_extension(extension) {
1736 Some(flows) if !flows.is_empty() => flows,
1737 _ => return None,
1738 };
1739
1740 let descriptors = runtime_flows
1741 .iter()
1742 .map(|flow| FlowDescriptor {
1743 id: flow.id.as_str().to_string(),
1744 flow_type: flow_kind_to_str(flow.kind).to_string(),
1745 profile: manifest.pack_id.as_str().to_string(),
1746 version: manifest.version.to_string(),
1747 description: None,
1748 })
1749 .collect::<Vec<_>>();
1750 let flows = runtime_flows
1751 .into_iter()
1752 .map(|flow| (flow.id.as_str().to_string(), flow))
1753 .collect();
1754
1755 Some(PackFlows {
1756 metadata: PackMetadata::from_manifest(manifest),
1757 descriptors,
1758 flows,
1759 })
1760}
1761
1762fn decode_runtime_flow_extension(extension: &ExtensionRef) -> Option<Vec<Flow>> {
1763 let value = match extension.inline.as_ref()? {
1764 ExtensionInline::Other(value) => value.clone(),
1765 _ => return None,
1766 };
1767
1768 if let Ok(bundle) = serde_json::from_value::<RuntimeFlowBundle>(value.clone()) {
1769 return Some(collect_runtime_flows(bundle.flows));
1770 }
1771
1772 if let Ok(flows) = serde_json::from_value::<Vec<RuntimeFlow>>(value.clone()) {
1773 return Some(collect_runtime_flows(flows));
1774 }
1775
1776 if let Ok(flows) = serde_json::from_value::<Vec<Flow>>(value) {
1777 return Some(flows);
1778 }
1779
1780 warn!(
1781 extension = %extension.kind,
1782 version = %extension.version,
1783 "runtime flow extension present but could not be decoded"
1784 );
1785 None
1786}
1787
1788fn collect_runtime_flows(flows: Vec<RuntimeFlow>) -> Vec<Flow> {
1789 flows
1790 .into_iter()
1791 .filter_map(|flow| match runtime_flow_to_flow(flow) {
1792 Ok(flow) => Some(flow),
1793 Err(err) => {
1794 warn!(error = %err, "failed to decode runtime flow");
1795 None
1796 }
1797 })
1798 .collect()
1799}
1800
1801fn runtime_flow_to_flow(runtime: RuntimeFlow) -> Result<Flow> {
1802 let flow_id = FlowId::from_str(&runtime.id)
1803 .with_context(|| format!("invalid flow id `{}`", runtime.id))?;
1804 let mut entrypoints = runtime.entrypoints;
1805 if entrypoints.is_empty()
1806 && let Some(start) = &runtime.start
1807 {
1808 entrypoints.insert("default".into(), Value::String(start.clone()));
1809 }
1810
1811 let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
1812 for (id, node) in runtime.nodes {
1813 let node_id = NodeId::from_str(&id).with_context(|| format!("invalid node id `{id}`"))?;
1814 let component_id = ComponentId::from_str(&node.component_id)
1815 .with_context(|| format!("invalid component id `{}`", node.component_id))?;
1816 let component = FlowComponentRef {
1817 id: component_id,
1818 pack_alias: None,
1819 operation: node.operation_name,
1820 };
1821 let routing = node.routing.unwrap_or(Routing::End);
1822 let telemetry = node.telemetry.unwrap_or_default();
1823 nodes.insert(
1824 node_id.clone(),
1825 Node {
1826 id: node_id,
1827 component,
1828 input: InputMapping {
1829 mapping: node.operation_payload,
1830 },
1831 output: OutputMapping {
1832 mapping: Value::Null,
1833 },
1834 routing,
1835 telemetry,
1836 },
1837 );
1838 }
1839
1840 Ok(Flow {
1841 schema_version: runtime.schema_version.unwrap_or_else(|| "1.0".to_string()),
1842 id: flow_id,
1843 kind: runtime.kind,
1844 entrypoints,
1845 nodes,
1846 metadata: runtime.metadata.unwrap_or_default(),
1847 })
1848}
1849
/// Maps a flow kind to the lowercase label used for `FlowDescriptor::flow_type`.
///
/// The match is exhaustive over the variants listed here, so adding a variant
/// to `FlowKind` forces this mapping to be extended.
fn flow_kind_to_str(kind: greentic_types::FlowKind) -> &'static str {
    match kind {
        greentic_types::FlowKind::Messaging => "messaging",
        greentic_types::FlowKind::Event => "event",
        greentic_types::FlowKind::ComponentConfig => "component-config",
        greentic_types::FlowKind::Job => "job",
        greentic_types::FlowKind::Http => "http",
    }
}
1859
1860fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
1861 let mut file = archive
1862 .by_name(name)
1863 .with_context(|| format!("entry {name} missing from archive"))?;
1864 let mut buf = Vec::new();
1865 file.read_to_end(&mut buf)?;
1866 Ok(buf)
1867}
1868
1869fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
1870 for node in doc.nodes.values_mut() {
1871 let Some((component_ref, payload)) = node
1872 .raw
1873 .iter()
1874 .next()
1875 .map(|(key, value)| (key.clone(), value.clone()))
1876 else {
1877 continue;
1878 };
1879 if component_ref.starts_with("emit.") {
1880 node.operation = Some(component_ref);
1881 node.payload = payload;
1882 node.raw.clear();
1883 continue;
1884 }
1885 let (target_component, operation, input, config) =
1886 infer_component_exec(&payload, &component_ref);
1887 let mut payload_obj = serde_json::Map::new();
1888 payload_obj.insert("component".into(), Value::String(target_component));
1890 payload_obj.insert("operation".into(), Value::String(operation));
1891 payload_obj.insert("input".into(), input);
1892 if let Some(cfg) = config {
1893 payload_obj.insert("config".into(), cfg);
1894 }
1895 node.operation = Some("component.exec".to_string());
1896 node.payload = Value::Object(payload_obj);
1897 node.raw.clear();
1898 }
1899 doc
1900}
1901
1902fn infer_component_exec(
1903 payload: &Value,
1904 component_ref: &str,
1905) -> (String, String, Value, Option<Value>) {
1906 let default_op = if component_ref.starts_with("templating.") {
1907 "render"
1908 } else {
1909 "invoke"
1910 }
1911 .to_string();
1912
1913 if let Value::Object(map) = payload {
1914 let op = map
1915 .get("op")
1916 .or_else(|| map.get("operation"))
1917 .and_then(Value::as_str)
1918 .map(|s| s.to_string())
1919 .unwrap_or_else(|| default_op.clone());
1920
1921 let mut input = map.clone();
1922 let config = input.remove("config");
1923 let component = input
1924 .get("component")
1925 .or_else(|| input.get("component_ref"))
1926 .and_then(Value::as_str)
1927 .map(|s| s.to_string())
1928 .unwrap_or_else(|| component_ref.to_string());
1929 input.remove("component");
1930 input.remove("component_ref");
1931 input.remove("op");
1932 input.remove("operation");
1933 return (component, op, Value::Object(input), config);
1934 }
1935
1936 (component_ref.to_string(), default_op, payload.clone(), None)
1937}
1938
/// A component the pack expects to be loaded.
#[derive(Clone, Debug)]
struct ComponentSpec {
    /// Component identifier used as the lookup key while loading.
    id: String,
    /// Component version; `"0.0.0"` when derived from a pack lock or the
    /// component sources extension.
    version: String,
    /// Wasm file path taken from a legacy manifest entry; `None` otherwise.
    legacy_path: Option<String>,
}
1945
/// Where a component comes from and how its artifact is located.
#[derive(Clone, Debug)]
struct ComponentSourceInfo {
    /// Resolved digest of the component, when known.
    digest: Option<String>,
    /// Source reference the component was declared with.
    source: ComponentSourceRef,
    /// Whether the artifact is bundled inline or must be fetched remotely.
    artifact: ComponentArtifactLocation,
    /// Expected SHA-256 of the wasm file; `None` for entries built from the
    /// component sources extension.
    expected_wasm_sha256: Option<String>,
    /// NOTE(review): presumably disables digest checks for this entry —
    /// confirm against the loader; always `false` for extension entries.
    skip_digest_verification: bool,
}
1954
/// Location of a component's wasm artifact.
#[derive(Clone, Debug)]
enum ComponentArtifactLocation {
    /// The wasm file is shipped with the pack at `wasm_path`.
    Inline { wasm_path: String },
    /// The artifact is not bundled with the pack.
    Remote,
}
1960
/// Deserialized contents of a pack lock file.
#[derive(Clone, Debug, Deserialize)]
struct PackLockV1 {
    /// Lock file schema version; `load_pack_lock` rejects anything but `1`.
    schema_version: u32,
    /// Locked components.
    components: Vec<PackLockComponent>,
}
1966
/// One component entry in a pack lock file.
///
/// Several values can be supplied under either a current key or a legacy key;
/// each pair is stored in two separate fields.
#[derive(Clone, Debug, Deserialize)]
struct PackLockComponent {
    /// Component name; used as the component id when `component_id` is absent.
    name: String,
    /// Component source reference (key `source_ref`).
    #[serde(default, rename = "source_ref")]
    source_ref: Option<String>,
    /// Legacy spelling of the source reference (key `ref`).
    #[serde(default, rename = "ref")]
    legacy_ref: Option<String>,
    /// Explicit component id, when it differs from `name`.
    #[serde(default)]
    component_id: Option<ComponentId>,
    /// Whether the component is bundled with the pack.
    #[serde(default)]
    bundled: Option<bool>,
    /// Path of the bundled wasm file (key `bundled_path`).
    #[serde(default, rename = "bundled_path")]
    bundled_path: Option<String>,
    /// Legacy spelling of the bundled path (key `path`).
    #[serde(default, rename = "path")]
    legacy_path: Option<String>,
    /// Expected SHA-256 of the wasm file (key `wasm_sha256`).
    #[serde(default)]
    wasm_sha256: Option<String>,
    /// Legacy spelling of the wasm SHA-256 (key `sha256`).
    #[serde(default, rename = "sha256")]
    legacy_sha256: Option<String>,
    /// NOTE(review): presumably the digest the source reference resolved to —
    /// confirm against the lock-file consumer.
    #[serde(default)]
    resolved_digest: Option<String>,
    /// NOTE(review): presumably an alternative key for the resolved digest —
    /// confirm against the lock-file consumer.
    #[serde(default)]
    digest: Option<String>,
}
1991
1992fn component_specs(
1993 manifest: Option<&greentic_types::PackManifest>,
1994 legacy_manifest: Option<&legacy_pack::PackManifest>,
1995 component_sources: Option<&ComponentSourcesV1>,
1996 pack_lock: Option<&PackLockV1>,
1997) -> Vec<ComponentSpec> {
1998 if let Some(manifest) = manifest {
1999 if !manifest.components.is_empty() {
2000 return manifest
2001 .components
2002 .iter()
2003 .map(|entry| ComponentSpec {
2004 id: entry.id.as_str().to_string(),
2005 version: entry.version.to_string(),
2006 legacy_path: None,
2007 })
2008 .collect();
2009 }
2010 if let Some(lock) = pack_lock {
2011 let mut seen = HashSet::new();
2012 let mut specs = Vec::new();
2013 for entry in &lock.components {
2014 let id = entry
2015 .component_id
2016 .as_ref()
2017 .map(|id| id.as_str())
2018 .unwrap_or(entry.name.as_str());
2019 if seen.insert(id.to_string()) {
2020 specs.push(ComponentSpec {
2021 id: id.to_string(),
2022 version: "0.0.0".to_string(),
2023 legacy_path: None,
2024 });
2025 }
2026 }
2027 return specs;
2028 }
2029 if let Some(sources) = component_sources {
2030 let mut seen = HashSet::new();
2031 let mut specs = Vec::new();
2032 for entry in &sources.components {
2033 let id = entry
2034 .component_id
2035 .as_ref()
2036 .map(|id| id.as_str())
2037 .unwrap_or(entry.name.as_str());
2038 if seen.insert(id.to_string()) {
2039 specs.push(ComponentSpec {
2040 id: id.to_string(),
2041 version: "0.0.0".to_string(),
2042 legacy_path: None,
2043 });
2044 }
2045 }
2046 return specs;
2047 }
2048 }
2049 if let Some(legacy_manifest) = legacy_manifest {
2050 return legacy_manifest
2051 .components
2052 .iter()
2053 .map(|entry| ComponentSpec {
2054 id: entry.name.clone(),
2055 version: entry.version.to_string(),
2056 legacy_path: Some(entry.file_wasm.clone()),
2057 })
2058 .collect();
2059 }
2060 Vec::new()
2061}
2062
2063fn component_sources_table(
2064 sources: Option<&ComponentSourcesV1>,
2065) -> Result<Option<HashMap<String, ComponentSourceInfo>>> {
2066 let Some(sources) = sources else {
2067 return Ok(None);
2068 };
2069 let mut table = HashMap::new();
2070 for entry in &sources.components {
2071 let artifact = match &entry.artifact {
2072 ArtifactLocationV1::Inline { wasm_path, .. } => ComponentArtifactLocation::Inline {
2073 wasm_path: wasm_path.clone(),
2074 },
2075 ArtifactLocationV1::Remote => ComponentArtifactLocation::Remote,
2076 };
2077 let info = ComponentSourceInfo {
2078 digest: Some(entry.resolved.digest.clone()),
2079 source: entry.source.clone(),
2080 artifact,
2081 expected_wasm_sha256: None,
2082 skip_digest_verification: false,
2083 };
2084 if let Some(component_id) = entry.component_id.as_ref() {
2085 table.insert(component_id.as_str().to_string(), info.clone());
2086 }
2087 table.insert(entry.name.clone(), info);
2088 }
2089 Ok(Some(table))
2090}
2091
2092fn load_pack_lock(path: &Path) -> Result<Option<PackLockV1>> {
2093 let lock_path = if path.is_dir() {
2094 let candidate = path.join("pack.lock");
2095 if candidate.exists() {
2096 Some(candidate)
2097 } else {
2098 let candidate = path.join("pack.lock.json");
2099 candidate.exists().then_some(candidate)
2100 }
2101 } else {
2102 None
2103 };
2104 let Some(lock_path) = lock_path else {
2105 return Ok(None);
2106 };
2107 let raw = std::fs::read_to_string(&lock_path)
2108 .with_context(|| format!("failed to read {}", lock_path.display()))?;
2109 let lock: PackLockV1 = serde_json::from_str(&raw).context("failed to parse pack.lock")?;
2110 if lock.schema_version != 1 {
2111 bail!("pack.lock schema_version must be 1");
2112 }
2113 Ok(Some(lock))
2114}
2115
/// Lists the directories in which to look for a pack lock file.
///
/// * For a directory pack, only the pack directory itself.
/// * Otherwise the parent and grandparent of the archive hint, or of
///   `pack_path` when no hint is given. Ancestors that do not exist are
///   simply omitted, so the result may have fewer than two entries.
fn find_pack_lock_roots(
    pack_path: &Path,
    is_dir: bool,
    archive_hint: Option<&Path>,
) -> Vec<PathBuf> {
    if is_dir {
        return vec![pack_path.to_path_buf()];
    }
    // `ancestors()` yields the path itself first, then parent, grandparent, …
    archive_hint
        .unwrap_or(pack_path)
        .ancestors()
        .skip(1)
        .take(2)
        .map(Path::to_path_buf)
        .collect()
}
2140
2141fn normalize_sha256(digest: &str) -> Result<String> {
2142 let trimmed = digest.trim();
2143 if trimmed.is_empty() {
2144 bail!("sha256 digest cannot be empty");
2145 }
2146 if let Some(stripped) = trimmed.strip_prefix("sha256:") {
2147 if stripped.is_empty() {
2148 bail!("sha256 digest must include hex bytes after sha256:");
2149 }
2150 return Ok(trimmed.to_string());
2151 }
2152 if trimmed.chars().all(|c| c.is_ascii_hexdigit()) {
2153 return Ok(format!("sha256:{trimmed}"));
2154 }
2155 bail!("sha256 digest must be hex or sha256:<hex>");
2156}
2157
2158fn component_sources_table_from_pack_lock(
2159 lock: &PackLockV1,
2160 allow_missing_hash: bool,
2161) -> Result<HashMap<String, ComponentSourceInfo>> {
2162 let mut table = HashMap::new();
2163 let mut names = HashSet::new();
2164 for entry in &lock.components {
2165 if !names.insert(entry.name.clone()) {
2166 bail!(
2167 "pack.lock contains duplicate component name `{}`",
2168 entry.name
2169 );
2170 }
2171 let source_ref = match (&entry.source_ref, &entry.legacy_ref) {
2172 (Some(primary), Some(legacy)) => {
2173 if primary != legacy {
2174 bail!(
2175 "pack.lock component {} has conflicting refs: {} vs {}",
2176 entry.name,
2177 primary,
2178 legacy
2179 );
2180 }
2181 primary.as_str()
2182 }
2183 (Some(primary), None) => primary.as_str(),
2184 (None, Some(legacy)) => legacy.as_str(),
2185 (None, None) => {
2186 bail!("pack.lock component {} missing source_ref", entry.name);
2187 }
2188 };
2189 let source: ComponentSourceRef = source_ref
2190 .parse()
2191 .with_context(|| format!("invalid component ref `{}`", source_ref))?;
2192 let bundled_path = match (&entry.bundled_path, &entry.legacy_path) {
2193 (Some(primary), Some(legacy)) => {
2194 if primary != legacy {
2195 bail!(
2196 "pack.lock component {} has conflicting bundled paths: {} vs {}",
2197 entry.name,
2198 primary,
2199 legacy
2200 );
2201 }
2202 Some(primary.clone())
2203 }
2204 (Some(primary), None) => Some(primary.clone()),
2205 (None, Some(legacy)) => Some(legacy.clone()),
2206 (None, None) => None,
2207 };
2208 let bundled = entry.bundled.unwrap_or(false) || bundled_path.is_some();
2209 let (artifact, digest, expected_wasm_sha256, skip_digest_verification) = if bundled {
2210 let wasm_path = bundled_path.ok_or_else(|| {
2211 anyhow!(
2212 "pack.lock component {} marked bundled but bundled_path is missing",
2213 entry.name
2214 )
2215 })?;
2216 let expected_raw = match (&entry.wasm_sha256, &entry.legacy_sha256) {
2217 (Some(primary), Some(legacy)) => {
2218 if primary != legacy {
2219 bail!(
2220 "pack.lock component {} has conflicting wasm_sha256 values: {} vs {}",
2221 entry.name,
2222 primary,
2223 legacy
2224 );
2225 }
2226 Some(primary.as_str())
2227 }
2228 (Some(primary), None) => Some(primary.as_str()),
2229 (None, Some(legacy)) => Some(legacy.as_str()),
2230 (None, None) => None,
2231 };
2232 let expected = match expected_raw {
2233 Some(value) => Some(normalize_sha256(value)?),
2234 None => None,
2235 };
2236 if expected.is_none() && !allow_missing_hash {
2237 bail!(
2238 "pack.lock component {} missing wasm_sha256 for bundled component",
2239 entry.name
2240 );
2241 }
2242 (
2243 ComponentArtifactLocation::Inline { wasm_path },
2244 expected.clone(),
2245 expected,
2246 allow_missing_hash && expected_raw.is_none(),
2247 )
2248 } else {
2249 if source.is_tag() {
2250 bail!(
2251 "component {} uses tag ref {} but is not bundled; rebuild the pack",
2252 entry.name,
2253 source
2254 );
2255 }
2256 let expected = entry
2257 .resolved_digest
2258 .as_deref()
2259 .or(entry.digest.as_deref())
2260 .ok_or_else(|| {
2261 anyhow!(
2262 "pack.lock component {} missing resolved_digest for remote component",
2263 entry.name
2264 )
2265 })?;
2266 (
2267 ComponentArtifactLocation::Remote,
2268 Some(normalize_digest(expected)),
2269 None,
2270 false,
2271 )
2272 };
2273 let info = ComponentSourceInfo {
2274 digest,
2275 source,
2276 artifact,
2277 expected_wasm_sha256,
2278 skip_digest_verification,
2279 };
2280 if let Some(component_id) = entry.component_id.as_ref() {
2281 let key = component_id.as_str().to_string();
2282 if table.contains_key(&key) {
2283 bail!(
2284 "pack.lock contains duplicate component id `{}`",
2285 component_id.as_str()
2286 );
2287 }
2288 table.insert(key, info.clone());
2289 }
2290 if entry.name
2291 != entry
2292 .component_id
2293 .as_ref()
2294 .map(|id| id.as_str())
2295 .unwrap_or("")
2296 {
2297 table.insert(entry.name.clone(), info);
2298 }
2299 }
2300 Ok(table)
2301}
2302
2303fn component_path_for_spec(root: &Path, spec: &ComponentSpec) -> PathBuf {
2304 if let Some(path) = &spec.legacy_path {
2305 return root.join(path);
2306 }
2307 root.join("components").join(format!("{}.wasm", spec.id))
2308}
2309
/// Ensures a digest string carries an algorithm prefix.
///
/// Strings already starting with `sha256:` or `blake3:` are returned
/// unchanged; anything else is assumed to be SHA-256 and gets the `sha256:`
/// prefix prepended. No trimming or validation is performed.
fn normalize_digest(digest: &str) -> String {
    let has_algorithm = ["sha256:", "blake3:"]
        .iter()
        .any(|prefix| digest.starts_with(*prefix));
    if has_algorithm {
        digest.to_owned()
    } else {
        ["sha256:", digest].concat()
    }
}
2317
2318fn compute_digest_for(bytes: &[u8], digest: &str) -> Result<String> {
2319 if digest.starts_with("blake3:") {
2320 let hash = blake3::hash(bytes);
2321 return Ok(format!("blake3:{}", hash.to_hex()));
2322 }
2323 let mut hasher = sha2::Sha256::new();
2324 hasher.update(bytes);
2325 Ok(format!("sha256:{:x}", hasher.finalize()))
2326}
2327
2328fn compute_sha256_digest_for(bytes: &[u8]) -> String {
2329 let mut hasher = sha2::Sha256::new();
2330 hasher.update(bytes);
2331 format!("sha256:{:x}", hasher.finalize())
2332}
2333
2334fn build_artifact_key(cache: &CacheManager, digest: Option<&str>, bytes: &[u8]) -> ArtifactKey {
2335 let wasm_digest = digest
2336 .map(normalize_digest)
2337 .unwrap_or_else(|| compute_sha256_digest_for(bytes));
2338 ArtifactKey::new(cache.engine_profile_id().to_string(), wasm_digest)
2339}
2340
2341async fn compile_component_with_cache(
2342 cache: &CacheManager,
2343 engine: &Engine,
2344 digest: Option<&str>,
2345 bytes: Vec<u8>,
2346) -> Result<Arc<Component>> {
2347 let key = build_artifact_key(cache, digest, &bytes);
2348 cache.get_component(engine, &key, || Ok(bytes)).await
2349}
2350
2351fn verify_component_digest(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
2352 let normalized_expected = normalize_digest(expected);
2353 let actual = compute_digest_for(bytes, &normalized_expected)?;
2354 if normalize_digest(&actual) != normalized_expected {
2355 bail!(
2356 "component {component_id} digest mismatch: expected {normalized_expected}, got {actual}"
2357 );
2358 }
2359 Ok(())
2360}
2361
2362fn verify_wasm_sha256(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
2363 let normalized_expected = normalize_sha256(expected)?;
2364 let actual = compute_sha256_digest_for(bytes);
2365 if actual != normalized_expected {
2366 bail!(
2367 "component {component_id} bundled digest mismatch: expected {normalized_expected}, got {actual}"
2368 );
2369 }
2370 Ok(())
2371}
2372
#[cfg(test)]
mod pack_lock_tests {
    use super::*;
    use tempfile::TempDir;

    /// A non-bundled lock entry that is pinned only by tag must be rejected.
    #[test]
    fn pack_lock_tag_ref_requires_bundle() {
        let tagged = PackLockComponent {
            name: "templates".to_string(),
            source_ref: Some("oci://registry.test/templates:latest".to_string()),
            legacy_ref: None,
            component_id: None,
            bundled: Some(false),
            bundled_path: None,
            legacy_path: None,
            wasm_sha256: None,
            legacy_sha256: None,
            resolved_digest: None,
            digest: None,
        };
        let lock = PackLockV1 {
            schema_version: 1,
            components: vec![tagged],
        };

        let err = component_sources_table_from_pack_lock(&lock, false).unwrap_err();
        let message = err.to_string();
        assert!(
            message.contains("tag ref") && message.contains("rebuild the pack"),
            "unexpected error: {err}"
        );
    }

    /// Loading an inline component whose bytes do not match the recorded
    /// wasm SHA-256 must fail with a bundled digest mismatch.
    #[test]
    fn bundled_hash_mismatch_errors() {
        let runtime = tokio::runtime::Runtime::new().expect("runtime");
        let workdir = TempDir::new().expect("temp dir");
        let engine = Engine::default();
        let profile =
            EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
        let cache = CacheManager::new(
            CacheConfig {
                root: workdir.path().join("cache"),
                ..CacheConfig::default()
            },
            profile,
        );

        // Materialize a real component under the temp root as `component.wasm`.
        let fixture = Path::new(env!("CARGO_MANIFEST_DIR"))
            .join("../../tests/fixtures/packs/secrets_store_smoke/components/echo_secret.wasm");
        let wasm = std::fs::read(&fixture).expect("read fixture wasm");
        std::fs::write(workdir.path().join("component.wasm"), &wasm).expect("write temp wasm");

        let spec = ComponentSpec {
            id: "qa.process".to_string(),
            version: "0.0.0".to_string(),
            legacy_path: None,
        };
        let mut missing = HashSet::from([spec.id.clone()]);

        // The recorded digest deliberately does not match the fixture bytes.
        let bogus = ComponentSourceInfo {
            digest: Some("sha256:deadbeef".to_string()),
            source: ComponentSourceRef::Oci("registry.test/qa.process@sha256:deadbeef".into()),
            artifact: ComponentArtifactLocation::Inline {
                wasm_path: "component.wasm".to_string(),
            },
            expected_wasm_sha256: Some("sha256:deadbeef".to_string()),
            skip_digest_verification: false,
        };
        let sources = HashMap::from([(spec.id.clone(), bogus)]);

        let mut loaded = HashMap::new();
        let outcome = runtime.block_on(load_components_from_sources(
            &cache,
            &engine,
            &sources,
            &ComponentResolution::default(),
            &[spec],
            &mut missing,
            &mut loaded,
            Some(workdir.path()),
            None,
        ));
        let err = outcome.unwrap_err();
        assert!(
            err.to_string().contains("bundled digest mismatch"),
            "unexpected error: {err}"
        );
    }
}
2462
2463fn dist_options_from(component_resolution: &ComponentResolution) -> DistOptions {
2464 let mut opts = DistOptions {
2465 allow_tags: true,
2466 ..DistOptions::default()
2467 };
2468 if let Some(cache_dir) = component_resolution.dist_cache_dir.clone() {
2469 opts.cache_dir = cache_dir;
2470 }
2471 if component_resolution.dist_offline {
2472 opts.offline = true;
2473 }
2474 opts
2475}
2476
2477#[allow(clippy::too_many_arguments)]
2478async fn load_components_from_sources(
2479 cache: &CacheManager,
2480 engine: &Engine,
2481 component_sources: &HashMap<String, ComponentSourceInfo>,
2482 component_resolution: &ComponentResolution,
2483 specs: &[ComponentSpec],
2484 missing: &mut HashSet<String>,
2485 into: &mut HashMap<String, PackComponent>,
2486 materialized_root: Option<&Path>,
2487 archive_hint: Option<&Path>,
2488) -> Result<()> {
2489 let mut archive = if let Some(path) = archive_hint {
2490 Some(
2491 ZipArchive::new(File::open(path)?)
2492 .with_context(|| format!("{} is not a valid gtpack", path.display()))?,
2493 )
2494 } else {
2495 None
2496 };
2497 let mut dist_client: Option<DistClient> = None;
2498
2499 for spec in specs {
2500 if !missing.contains(&spec.id) {
2501 continue;
2502 }
2503 let Some(source) = component_sources.get(&spec.id) else {
2504 continue;
2505 };
2506
2507 let bytes = match &source.artifact {
2508 ComponentArtifactLocation::Inline { wasm_path } => {
2509 if let Some(root) = materialized_root {
2510 let path = root.join(wasm_path);
2511 if path.exists() {
2512 std::fs::read(&path).with_context(|| {
2513 format!(
2514 "failed to read inline component {} from {}",
2515 spec.id,
2516 path.display()
2517 )
2518 })?
2519 } else if archive.is_none() {
2520 bail!("inline component {} missing at {}", spec.id, path.display());
2521 } else {
2522 read_entry(
2523 archive.as_mut().expect("archive present when needed"),
2524 wasm_path,
2525 )
2526 .with_context(|| {
2527 format!(
2528 "inline component {} missing at {} in pack archive",
2529 spec.id, wasm_path
2530 )
2531 })?
2532 }
2533 } else if let Some(archive) = archive.as_mut() {
2534 read_entry(archive, wasm_path).with_context(|| {
2535 format!(
2536 "inline component {} missing at {} in pack archive",
2537 spec.id, wasm_path
2538 )
2539 })?
2540 } else {
2541 bail!(
2542 "inline component {} missing and no pack source available",
2543 spec.id
2544 );
2545 }
2546 }
2547 ComponentArtifactLocation::Remote => {
2548 if source.source.is_tag() {
2549 bail!(
2550 "component {} uses tag ref {} but is not bundled; rebuild the pack",
2551 spec.id,
2552 source.source
2553 );
2554 }
2555 let client = dist_client.get_or_insert_with(|| {
2556 DistClient::new(dist_options_from(component_resolution))
2557 });
2558 let reference = source.source.to_string();
2559 let digest = source.digest.as_deref().ok_or_else(|| {
2560 anyhow!(
2561 "component {} missing expected digest for remote component",
2562 spec.id
2563 )
2564 })?;
2565 let cache_path = if component_resolution.dist_offline {
2566 client
2567 .fetch_digest(digest)
2568 .await
2569 .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?
2570 } else {
2571 let resolved = client
2572 .resolve_ref(&reference)
2573 .await
2574 .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?;
2575 let expected = normalize_digest(digest);
2576 let actual = normalize_digest(&resolved.digest);
2577 if expected != actual {
2578 bail!(
2579 "component {} digest mismatch after fetch: expected {}, got {}",
2580 spec.id,
2581 expected,
2582 actual
2583 );
2584 }
2585 resolved.cache_path.ok_or_else(|| {
2586 anyhow!(
2587 "component {} resolved from {} but cache path is missing",
2588 spec.id,
2589 reference
2590 )
2591 })?
2592 };
2593 std::fs::read(&cache_path).with_context(|| {
2594 format!(
2595 "failed to read cached component {} from {}",
2596 spec.id,
2597 cache_path.display()
2598 )
2599 })?
2600 }
2601 };
2602
2603 if let Some(expected) = source.expected_wasm_sha256.as_deref() {
2604 verify_wasm_sha256(&spec.id, expected, &bytes)?;
2605 } else if source.skip_digest_verification {
2606 let actual = compute_sha256_digest_for(&bytes);
2607 warn!(
2608 component_id = %spec.id,
2609 digest = %actual,
2610 "bundled component missing wasm_sha256; allowing due to flag"
2611 );
2612 } else {
2613 let expected = source.digest.as_deref().ok_or_else(|| {
2614 anyhow!(
2615 "component {} missing expected digest for verification",
2616 spec.id
2617 )
2618 })?;
2619 verify_component_digest(&spec.id, expected, &bytes)?;
2620 }
2621 let component =
2622 compile_component_with_cache(cache, engine, source.digest.as_deref(), bytes)
2623 .await
2624 .with_context(|| format!("failed to compile component {}", spec.id))?;
2625 into.insert(
2626 spec.id.clone(),
2627 PackComponent {
2628 name: spec.id.clone(),
2629 version: spec.version.clone(),
2630 component,
2631 },
2632 );
2633 missing.remove(&spec.id);
2634 }
2635
2636 Ok(())
2637}
2638
2639fn dist_error_for_component(err: DistError, component_id: &str, reference: &str) -> anyhow::Error {
2640 match err {
2641 DistError::CacheMiss { reference: missing } => anyhow!(
2642 "remote component {} is not cached for {}. Run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
2643 component_id,
2644 missing,
2645 reference
2646 ),
2647 DistError::Offline { reference: blocked } => anyhow!(
2648 "offline mode blocked fetching component {} from {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
2649 component_id,
2650 blocked,
2651 reference
2652 ),
2653 DistError::AuthRequired { target } => anyhow!(
2654 "component {} requires authenticated source {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
2655 component_id,
2656 target,
2657 reference
2658 ),
2659 other => anyhow!(
2660 "failed to resolve component {} from {}: {}",
2661 component_id,
2662 reference,
2663 other
2664 ),
2665 }
2666}
2667
2668async fn load_components_from_overrides(
2669 cache: &CacheManager,
2670 engine: &Engine,
2671 overrides: &HashMap<String, PathBuf>,
2672 specs: &[ComponentSpec],
2673 missing: &mut HashSet<String>,
2674 into: &mut HashMap<String, PackComponent>,
2675) -> Result<()> {
2676 for spec in specs {
2677 if !missing.contains(&spec.id) {
2678 continue;
2679 }
2680 let Some(path) = overrides.get(&spec.id) else {
2681 continue;
2682 };
2683 let bytes = std::fs::read(path)
2684 .with_context(|| format!("failed to read override component {}", path.display()))?;
2685 let component = compile_component_with_cache(cache, engine, None, bytes)
2686 .await
2687 .with_context(|| {
2688 format!(
2689 "failed to compile component {} from override {}",
2690 spec.id,
2691 path.display()
2692 )
2693 })?;
2694 into.insert(
2695 spec.id.clone(),
2696 PackComponent {
2697 name: spec.id.clone(),
2698 version: spec.version.clone(),
2699 component,
2700 },
2701 );
2702 missing.remove(&spec.id);
2703 }
2704 Ok(())
2705}
2706
2707async fn load_components_from_dir(
2708 cache: &CacheManager,
2709 engine: &Engine,
2710 root: &Path,
2711 specs: &[ComponentSpec],
2712 missing: &mut HashSet<String>,
2713 into: &mut HashMap<String, PackComponent>,
2714) -> Result<()> {
2715 for spec in specs {
2716 if !missing.contains(&spec.id) {
2717 continue;
2718 }
2719 let path = component_path_for_spec(root, spec);
2720 if !path.exists() {
2721 tracing::debug!(component = %spec.id, path = %path.display(), "materialized component missing; will try other sources");
2722 continue;
2723 }
2724 let bytes = std::fs::read(&path)
2725 .with_context(|| format!("failed to read component {}", path.display()))?;
2726 let component = compile_component_with_cache(cache, engine, None, bytes)
2727 .await
2728 .with_context(|| {
2729 format!(
2730 "failed to compile component {} from {}",
2731 spec.id,
2732 path.display()
2733 )
2734 })?;
2735 into.insert(
2736 spec.id.clone(),
2737 PackComponent {
2738 name: spec.id.clone(),
2739 version: spec.version.clone(),
2740 component,
2741 },
2742 );
2743 missing.remove(&spec.id);
2744 }
2745 Ok(())
2746}
2747
2748async fn load_components_from_archive(
2749 cache: &CacheManager,
2750 engine: &Engine,
2751 path: &Path,
2752 specs: &[ComponentSpec],
2753 missing: &mut HashSet<String>,
2754 into: &mut HashMap<String, PackComponent>,
2755) -> Result<()> {
2756 let mut archive = ZipArchive::new(File::open(path)?)
2757 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
2758 for spec in specs {
2759 if !missing.contains(&spec.id) {
2760 continue;
2761 }
2762 let file_name = spec
2763 .legacy_path
2764 .clone()
2765 .unwrap_or_else(|| format!("components/{}.wasm", spec.id));
2766 let bytes = match read_entry(&mut archive, &file_name) {
2767 Ok(bytes) => bytes,
2768 Err(err) => {
2769 warn!(component = %spec.id, pack = %path.display(), error = %err, "component entry missing in pack archive");
2770 continue;
2771 }
2772 };
2773 let component = compile_component_with_cache(cache, engine, None, bytes)
2774 .await
2775 .with_context(|| format!("failed to compile component {}", spec.id))?;
2776 into.insert(
2777 spec.id.clone(),
2778 PackComponent {
2779 name: spec.id.clone(),
2780 version: spec.version.clone(),
2781 component,
2782 },
2783 );
2784 missing.remove(&spec.id);
2785 }
2786 Ok(())
2787}
2788
#[cfg(test)]
mod tests {
    use super::*;
    use greentic_flow::model::{FlowDoc, NodeDoc};
    use indexmap::IndexMap;
    use serde_json::json;

    /// A node written in the raw `<component>: <input>` shorthand must be
    /// rewritten into an explicit `component.exec` node.
    #[test]
    fn normalizes_raw_component_to_component_exec() {
        let mut shorthand = IndexMap::new();
        shorthand.insert(
            "templating.handlebars".into(),
            json!({ "template": "Hi {{name}}" }),
        );
        let start_node = NodeDoc {
            raw: shorthand,
            routing: json!([{"out": true}]),
            ..Default::default()
        };
        let mut nodes = IndexMap::new();
        nodes.insert("start".into(), start_node);

        let doc = FlowDoc {
            id: "welcome".into(),
            title: None,
            description: None,
            flow_type: "messaging".into(),
            start: Some("start".into()),
            parameters: json!({}),
            tags: Vec::new(),
            schema_version: None,
            entrypoints: IndexMap::new(),
            nodes,
        };

        let normalized = normalize_flow_doc(doc);
        let node = normalized.nodes.get("start").expect("node exists");
        assert_eq!(node.operation.as_deref(), Some("component.exec"));
        assert!(node.raw.is_empty());

        let payload = node.payload.as_object().expect("payload object");
        let component = payload.get("component");
        assert_eq!(
            component,
            Some(&Value::String("templating.handlebars".into()))
        );
        let operation = payload.get("operation");
        assert_eq!(operation, Some(&Value::String("render".into())));
        let input = payload.get("input").unwrap();
        assert_eq!(input, &json!({ "template": "Hi {{name}}" }));
    }
}
2842
2843#[derive(Clone, Debug, Default, Serialize, Deserialize)]
2844pub struct PackMetadata {
2845 pub pack_id: String,
2846 pub version: String,
2847 #[serde(default)]
2848 pub entry_flows: Vec<String>,
2849 #[serde(default)]
2850 pub secret_requirements: Vec<greentic_types::SecretRequirement>,
2851}
2852
2853impl PackMetadata {
2854 fn from_wasm(bytes: &[u8]) -> Option<Self> {
2855 let parser = Parser::new(0);
2856 for payload in parser.parse_all(bytes) {
2857 let payload = payload.ok()?;
2858 match payload {
2859 Payload::CustomSection(section) => {
2860 if section.name() == "greentic.manifest"
2861 && let Ok(meta) = Self::from_bytes(section.data())
2862 {
2863 return Some(meta);
2864 }
2865 }
2866 Payload::DataSection(reader) => {
2867 for segment in reader.into_iter().flatten() {
2868 if let Ok(meta) = Self::from_bytes(segment.data) {
2869 return Some(meta);
2870 }
2871 }
2872 }
2873 _ => {}
2874 }
2875 }
2876 None
2877 }
2878
2879 fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
2880 #[derive(Deserialize)]
2881 struct RawManifest {
2882 pack_id: String,
2883 version: String,
2884 #[serde(default)]
2885 entry_flows: Vec<String>,
2886 #[serde(default)]
2887 flows: Vec<RawFlow>,
2888 #[serde(default)]
2889 secret_requirements: Vec<greentic_types::SecretRequirement>,
2890 }
2891
2892 #[derive(Deserialize)]
2893 struct RawFlow {
2894 id: String,
2895 }
2896
2897 let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
2898 let mut entry_flows = if manifest.entry_flows.is_empty() {
2899 manifest.flows.iter().map(|f| f.id.clone()).collect()
2900 } else {
2901 manifest.entry_flows.clone()
2902 };
2903 entry_flows.retain(|id| !id.is_empty());
2904 Ok(Self {
2905 pack_id: manifest.pack_id,
2906 version: manifest.version,
2907 entry_flows,
2908 secret_requirements: manifest.secret_requirements,
2909 })
2910 }
2911
2912 pub fn fallback(path: &Path) -> Self {
2913 let pack_id = path
2914 .file_stem()
2915 .map(|s| s.to_string_lossy().into_owned())
2916 .unwrap_or_else(|| "unknown-pack".to_string());
2917 Self {
2918 pack_id,
2919 version: "0.0.0".to_string(),
2920 entry_flows: Vec::new(),
2921 secret_requirements: Vec::new(),
2922 }
2923 }
2924
2925 pub fn from_manifest(manifest: &greentic_types::PackManifest) -> Self {
2926 let entry_flows = manifest
2927 .flows
2928 .iter()
2929 .map(|flow| flow.id.as_str().to_string())
2930 .collect::<Vec<_>>();
2931 Self {
2932 pack_id: manifest.pack_id.as_str().to_string(),
2933 version: manifest.version.to_string(),
2934 entry_flows,
2935 secret_requirements: manifest.secret_requirements.clone(),
2936 }
2937 }
2938}