1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::component_api::{
10 self, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
11};
12use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
13use crate::provider::{ProviderBinding, ProviderRegistry};
14use crate::provider_core::SchemaCorePre as ProviderComponentPre;
15use crate::provider_core_only;
16use crate::runtime_wasmtime::{Component, Engine, InstancePre, Linker, ResourceTable};
17use anyhow::{Context, Result, anyhow, bail};
18use greentic_distributor_client::dist::{DistClient, DistError, DistOptions};
19use greentic_interfaces_wasmtime::host_helpers::v1::{
20 self as host_v1, HostFns, add_all_v1_to_linker,
21 runner_host_http::RunnerHostHttp,
22 runner_host_kv::RunnerHostKv,
23 secrets_store::{SecretsError, SecretsStoreHost},
24 state_store::{
25 OpAck as StateOpAck, StateKey as HostStateKey, StateStoreError as StateError,
26 StateStoreHost, TenantCtx as StateTenantCtx,
27 },
28 telemetry_logger::{
29 OpAck as TelemetryAck, SpanContext as TelemetrySpanContext,
30 TelemetryLoggerError as TelemetryError, TelemetryLoggerHost,
31 TenantCtx as TelemetryTenantCtx,
32 },
33};
34use greentic_interfaces_wasmtime::http_client_client_v1_0::greentic::interfaces_types::types::Impersonation as ImpersonationV1_0;
35use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::interfaces_types::types::Impersonation as ImpersonationV1_1;
36use greentic_pack::builder as legacy_pack;
37use greentic_types::flow::FlowHasher;
38use greentic_types::{
39 ArtifactLocationV1, ComponentId, ComponentManifest, ComponentSourceRef, ComponentSourcesV1,
40 EXT_COMPONENT_SOURCES_V1, EnvId, ExtensionRef, Flow, FlowComponentRef, FlowId, FlowKind,
41 FlowMetadata, InputMapping, Node, NodeId, OutputMapping, Routing, StateKey as StoreStateKey,
42 TeamId, TelemetryHints, TenantCtx as TypesTenantCtx, TenantId, UserId, decode_pack_manifest,
43 pack_manifest::ExtensionInline,
44};
45use host_v1::http_client::{
46 HttpClientError, HttpClientErrorV1_1, HttpClientHost, HttpClientHostV1_1,
47 Request as HttpRequest, RequestOptionsV1_1 as HttpRequestOptionsV1_1,
48 RequestV1_1 as HttpRequestV1_1, Response as HttpResponse, ResponseV1_1 as HttpResponseV1_1,
49 TenantCtx as HttpTenantCtx, TenantCtxV1_1 as HttpTenantCtxV1_1,
50};
51use indexmap::IndexMap;
52use once_cell::sync::Lazy;
53use parking_lot::{Mutex, RwLock};
54use reqwest::blocking::Client as BlockingClient;
55use runner_core::normalize_under_root;
56use serde::{Deserialize, Serialize};
57use serde_cbor;
58use serde_json::{self, Value};
59use sha2::Digest;
60use tokio::fs;
61use wasmparser::{Parser, Payload};
62use wasmtime::StoreContextMut;
63use zip::ZipArchive;
64
65use crate::runner::engine::{FlowContext, FlowEngine, FlowStatus};
66use crate::runner::flow_adapter::{FlowIR, flow_doc_to_ir, flow_ir_to_flow};
67use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
68
69use crate::config::HostConfig;
70use crate::secrets::{DynSecretsManager, read_secret_blocking};
71use crate::storage::state::STATE_PREFIX;
72use crate::storage::{DynSessionStore, DynStateStore};
73use crate::verify;
74use crate::wasi::RunnerWasiPolicy;
75use tracing::warn;
76use wasmtime_wasi::p2::add_to_linker_sync as add_wasi_to_linker;
77use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
78
79use greentic_flow::model::FlowDoc;
80
/// Runtime state for a single loaded pack, as produced by `PackRuntime::load`.
#[allow(dead_code)]
pub struct PackRuntime {
    /// Path of the pack this runtime was loaded from.
    path: PathBuf,
    /// Archive path associated with the pack, when there is one.
    archive_path: Option<PathBuf>,
    /// Shared host configuration (tenant, policies, ...).
    config: Arc<HostConfig>,
    /// Engine used to compile and instantiate components.
    engine: Engine,
    /// Pack metadata (id, version, entry flows, secret requirements).
    metadata: PackMetadata,
    /// Manifest decoded in the current format, if the pack provided one.
    manifest: Option<greentic_types::PackManifest>,
    /// Manifest decoded in the legacy format, if the pack provided one.
    legacy_manifest: Option<Box<legacy_pack::PackManifest>>,
    /// Component manifests keyed by component reference; consulted by
    /// `allows_state_store`.
    component_manifests: HashMap<String, ComponentManifest>,
    /// Optional mock layer.
    mocks: Option<Arc<MockLayer>>,
    /// Flows loaded from the pack manifest, when manifest parsing succeeded.
    flows: Option<PackFlows>,
    /// Compiled components keyed by name.
    components: HashMap<String, PackComponent>,
    /// Blocking HTTP client shared with host state.
    http_client: Arc<BlockingClient>,
    /// Cache of `InstancePre` values keyed by string.
    /// NOTE(review): presumably keyed by component reference — confirm.
    pre_cache: Mutex<HashMap<String, InstancePre<ComponentState>>>,
    /// Optional session store.
    session_store: Option<DynSessionStore>,
    /// Optional state store; `allows_state_store` returns `false` when absent.
    state_store: Option<DynStateStore>,
    /// WASI policy used to build per-instance WASI contexts.
    wasi_policy: Arc<RunnerWasiPolicy>,
    /// Provider registry, guarded by a read/write lock.
    provider_registry: RwLock<Option<ProviderRegistry>>,
    /// Secrets manager used for secret lookups.
    secrets: DynSecretsManager,
    /// Optional OAuth broker configuration.
    oauth_config: Option<OAuthBrokerConfig>,
}
105
/// A compiled component together with its identifying name and version.
struct PackComponent {
    /// Component name (the key it is stored under in the component map).
    #[allow(dead_code)]
    name: String,
    /// Component version string.
    #[allow(dead_code)]
    version: String,
    /// The compiled component itself.
    component: Component,
}
113
/// Options controlling how a pack's components are located when loading.
///
/// All fields default to "unset"/`false` via `Default`.
#[derive(Debug, Default, Clone)]
pub struct ComponentResolution {
    /// Directory holding an already-materialized pack; when set, `manifest.cbor`
    /// is read from it before falling back to the archive.
    pub materialized_root: Option<PathBuf>,
    /// Per-component path overrides.
    /// NOTE(review): presumably keyed by component id — confirm against the resolver.
    pub overrides: HashMap<String, PathBuf>,
    /// NOTE(review): presumably forbids network access to the distributor — confirm.
    pub dist_offline: bool,
    /// Optional cache directory for distributor downloads (usage not visible here).
    pub dist_cache_dir: Option<PathBuf>,
    /// Forwarded to the pack-lock component source table builder; controls whether
    /// entries without a hash are accepted.
    pub allow_missing_hash: bool,
}
127
128fn build_blocking_client() -> BlockingClient {
129 std::thread::spawn(|| {
130 BlockingClient::builder()
131 .no_proxy()
132 .build()
133 .expect("blocking client")
134 })
135 .join()
136 .expect("client build thread panicked")
137}
138
139fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
140 let (root, candidate) = if path.is_absolute() {
141 let parent = path
142 .parent()
143 .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
144 let root = parent
145 .canonicalize()
146 .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
147 let file = path
148 .file_name()
149 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
150 (root, PathBuf::from(file))
151 } else {
152 let cwd = std::env::current_dir().context("failed to resolve current directory")?;
153 let base = if let Some(parent) = path.parent() {
154 cwd.join(parent)
155 } else {
156 cwd
157 };
158 let root = base
159 .canonicalize()
160 .with_context(|| format!("failed to canonicalize {}", base.display()))?;
161 let file = path
162 .file_name()
163 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
164 (root, PathBuf::from(file))
165 };
166 let safe = normalize_under_root(&root, &candidate)?;
167 Ok((root, safe))
168}
169
/// Process-wide blocking HTTP client, built on first access via
/// `build_blocking_client` and shared behind an `Arc`.
static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
171
/// Serializable summary of a flow contained in a pack.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowDescriptor {
    /// Flow identifier.
    pub id: String,
    /// Flow kind; (de)serialized under the key `type`.
    #[serde(rename = "type")]
    pub flow_type: String,
    /// Profile string; legacy packs populate it with the pack id.
    pub profile: String,
    /// Version string; legacy packs populate it with the pack version.
    pub version: String,
    /// Optional description; defaults to `None` when missing from the input.
    #[serde(default)]
    pub description: Option<String>,
}
182
/// Host-side state backing the interfaces exposed to pack components
/// (secrets, HTTP client, state store, telemetry, runner host HTTP/KV).
pub struct HostState {
    /// Shared host configuration (tenant, HTTP and secrets policies, ...).
    config: Arc<HostConfig>,
    /// Blocking client used for outbound HTTP requests.
    http_client: Arc<BlockingClient>,
    /// Environment name read from `GREENTIC_ENV`, defaulting to `local`.
    default_env: String,
    /// Optional session store (currently unused by this type).
    #[allow(dead_code)]
    session_store: Option<DynSessionStore>,
    /// Optional state store backing the state-store host interface.
    state_store: Option<DynStateStore>,
    /// Optional mock layer consulted for secrets, HTTP and telemetry.
    mocks: Option<Arc<MockLayer>>,
    /// Secrets manager used when no mock supplies the secret.
    secrets: DynSecretsManager,
    /// Optional OAuth broker configuration.
    oauth_config: Option<OAuthBrokerConfig>,
    /// OAuth broker host state, default-initialized in `HostState::new`.
    oauth_host: OAuthBrokerHost,
    /// Execution context of the current component invocation, if any; used to
    /// derive tenant context for state-store calls.
    exec_ctx: Option<ComponentExecCtx>,
}
196
197impl HostState {
198 #[allow(clippy::default_constructed_unit_structs)]
199 #[allow(clippy::too_many_arguments)]
200 pub fn new(
201 config: Arc<HostConfig>,
202 http_client: Arc<BlockingClient>,
203 mocks: Option<Arc<MockLayer>>,
204 session_store: Option<DynSessionStore>,
205 state_store: Option<DynStateStore>,
206 secrets: DynSecretsManager,
207 oauth_config: Option<OAuthBrokerConfig>,
208 exec_ctx: Option<ComponentExecCtx>,
209 ) -> Result<Self> {
210 let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
211 Ok(Self {
212 config,
213 http_client,
214 default_env,
215 session_store,
216 state_store,
217 mocks,
218 secrets,
219 oauth_config,
220 oauth_host: OAuthBrokerHost::default(),
221 exec_ctx,
222 })
223 }
224
225 pub fn get_secret(&self, key: &str) -> Result<String> {
226 if provider_core_only::is_enabled() {
227 bail!(provider_core_only::blocked_message("secrets"))
228 }
229 if !self.config.secrets_policy.is_allowed(key) {
230 bail!("secret {key} is not permitted by bindings policy");
231 }
232 if let Some(mock) = &self.mocks
233 && let Some(value) = mock.secrets_lookup(key)
234 {
235 return Ok(value);
236 }
237 let bytes = read_secret_blocking(&self.secrets, key)
238 .context("failed to read secret from manager")?;
239 let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
240 Ok(value)
241 }
242
243 fn tenant_ctx_from_v1(&self, ctx: Option<StateTenantCtx>) -> Result<TypesTenantCtx> {
244 let tenant_raw = ctx
245 .as_ref()
246 .map(|ctx| ctx.tenant.clone())
247 .or_else(|| self.exec_ctx.as_ref().map(|ctx| ctx.tenant.tenant.clone()))
248 .unwrap_or_else(|| self.config.tenant.clone());
249 let env_raw = ctx
250 .as_ref()
251 .map(|ctx| ctx.env.clone())
252 .unwrap_or_else(|| self.default_env.clone());
253 let tenant_id = TenantId::from_str(&tenant_raw)
254 .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
255 let env_id = EnvId::from_str(&env_raw)
256 .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
257 let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
258 if let Some(exec_ctx) = self.exec_ctx.as_ref() {
259 if let Some(team) = exec_ctx.tenant.team.as_ref() {
260 let team_id =
261 TeamId::from_str(team).with_context(|| format!("invalid team id `{team}`"))?;
262 tenant_ctx = tenant_ctx.with_team(Some(team_id));
263 }
264 if let Some(user) = exec_ctx.tenant.user.as_ref() {
265 let user_id =
266 UserId::from_str(user).with_context(|| format!("invalid user id `{user}`"))?;
267 tenant_ctx = tenant_ctx.with_user(Some(user_id));
268 }
269 tenant_ctx = tenant_ctx.with_flow(exec_ctx.flow_id.clone());
270 if let Some(node) = exec_ctx.node_id.as_ref() {
271 tenant_ctx = tenant_ctx.with_node(node.clone());
272 }
273 if let Some(session) = exec_ctx.tenant.correlation_id.as_ref() {
274 tenant_ctx = tenant_ctx.with_session(session.clone());
275 }
276 tenant_ctx.trace_id = exec_ctx.tenant.trace_id.clone();
277 }
278
279 if let Some(ctx) = ctx {
280 if let Some(team) = ctx.team.or(ctx.team_id) {
281 let team_id =
282 TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
283 tenant_ctx = tenant_ctx.with_team(Some(team_id));
284 }
285 if let Some(user) = ctx.user.or(ctx.user_id) {
286 let user_id =
287 UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
288 tenant_ctx = tenant_ctx.with_user(Some(user_id));
289 }
290 if let Some(flow) = ctx.flow_id {
291 tenant_ctx = tenant_ctx.with_flow(flow);
292 }
293 if let Some(node) = ctx.node_id {
294 tenant_ctx = tenant_ctx.with_node(node);
295 }
296 if let Some(provider) = ctx.provider_id {
297 tenant_ctx = tenant_ctx.with_provider(provider);
298 }
299 if let Some(session) = ctx.session_id {
300 tenant_ctx = tenant_ctx.with_session(session);
301 }
302 tenant_ctx.trace_id = ctx.trace_id;
303 }
304 Ok(tenant_ctx)
305 }
306
307 fn send_http_request(
308 &mut self,
309 req: HttpRequest,
310 opts: Option<HttpRequestOptionsV1_1>,
311 _ctx: Option<HttpTenantCtx>,
312 ) -> Result<HttpResponse, HttpClientError> {
313 if !self.config.http_enabled {
314 return Err(HttpClientError {
315 code: "denied".into(),
316 message: "http client disabled by policy".into(),
317 });
318 }
319
320 let mut mock_state = None;
321 let raw_body = req.body.clone();
322 if let Some(mock) = &self.mocks
323 && let Ok(meta) = HttpMockRequest::new(&req.method, &req.url, raw_body.as_deref())
324 {
325 match mock.http_begin(&meta) {
326 HttpDecision::Mock(response) => {
327 let headers = response
328 .headers
329 .iter()
330 .map(|(k, v)| (k.clone(), v.clone()))
331 .collect();
332 return Ok(HttpResponse {
333 status: response.status,
334 headers,
335 body: response.body.clone().map(|b| b.into_bytes()),
336 });
337 }
338 HttpDecision::Deny(reason) => {
339 return Err(HttpClientError {
340 code: "denied".into(),
341 message: reason,
342 });
343 }
344 HttpDecision::Passthrough { record } => {
345 mock_state = Some((meta, record));
346 }
347 }
348 }
349
350 let method = req.method.parse().unwrap_or(reqwest::Method::GET);
351 let mut builder = self.http_client.request(method, &req.url);
352 for (key, value) in req.headers {
353 if let Ok(header) = reqwest::header::HeaderName::from_bytes(key.as_bytes())
354 && let Ok(header_value) = reqwest::header::HeaderValue::from_str(&value)
355 {
356 builder = builder.header(header, header_value);
357 }
358 }
359
360 if let Some(body) = raw_body.clone() {
361 builder = builder.body(body);
362 }
363
364 if let Some(opts) = opts {
365 if let Some(timeout_ms) = opts.timeout_ms {
366 builder = builder.timeout(Duration::from_millis(timeout_ms as u64));
367 }
368 if opts.allow_insecure == Some(true) {
369 warn!(url = %req.url, "allow-insecure not supported; using default TLS validation");
370 }
371 if let Some(follow_redirects) = opts.follow_redirects
372 && !follow_redirects
373 {
374 warn!(url = %req.url, "follow-redirects=false not supported; using default client behaviour");
375 }
376 }
377
378 let response = match builder.send() {
379 Ok(resp) => resp,
380 Err(err) => {
381 warn!(url = %req.url, error = %err, "http client request failed");
382 return Err(HttpClientError {
383 code: "unavailable".into(),
384 message: err.to_string(),
385 });
386 }
387 };
388
389 let status = response.status().as_u16();
390 let headers_vec = response
391 .headers()
392 .iter()
393 .map(|(k, v)| {
394 (
395 k.as_str().to_string(),
396 v.to_str().unwrap_or_default().to_string(),
397 )
398 })
399 .collect::<Vec<_>>();
400 let body_bytes = response.bytes().ok().map(|b| b.to_vec());
401
402 if let Some((meta, true)) = mock_state.take()
403 && let Some(mock) = &self.mocks
404 {
405 let recorded = HttpMockResponse::new(
406 status,
407 headers_vec.clone().into_iter().collect(),
408 body_bytes
409 .as_ref()
410 .map(|b| String::from_utf8_lossy(b).into_owned()),
411 );
412 mock.http_record(&meta, &recorded);
413 }
414
415 Ok(HttpResponse {
416 status,
417 headers: headers_vec,
418 body: body_bytes,
419 })
420 }
421}
422
423impl SecretsStoreHost for HostState {
424 fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsError> {
425 if provider_core_only::is_enabled() {
426 warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
427 return Err(SecretsError::Denied);
428 }
429 if !self.config.secrets_policy.is_allowed(&key) {
430 return Err(SecretsError::Denied);
431 }
432 if let Some(mock) = &self.mocks
433 && let Some(value) = mock.secrets_lookup(&key)
434 {
435 return Ok(Some(value.into_bytes()));
436 }
437 match read_secret_blocking(&self.secrets, &key) {
438 Ok(bytes) => Ok(Some(bytes)),
439 Err(err) => {
440 warn!(secret = %key, error = %err, "secret lookup failed");
441 Err(SecretsError::NotFound)
442 }
443 }
444 }
445}
446
447impl HttpClientHost for HostState {
448 fn send(
449 &mut self,
450 req: HttpRequest,
451 ctx: Option<HttpTenantCtx>,
452 ) -> Result<HttpResponse, HttpClientError> {
453 self.send_http_request(req, None, ctx)
454 }
455}
456
457impl HttpClientHostV1_1 for HostState {
458 fn send(
459 &mut self,
460 req: HttpRequestV1_1,
461 opts: Option<HttpRequestOptionsV1_1>,
462 ctx: Option<HttpTenantCtxV1_1>,
463 ) -> Result<HttpResponseV1_1, HttpClientErrorV1_1> {
464 let legacy_req = HttpRequest {
465 method: req.method,
466 url: req.url,
467 headers: req.headers,
468 body: req.body,
469 };
470 let legacy_ctx = ctx.map(|ctx| HttpTenantCtx {
471 env: ctx.env,
472 tenant: ctx.tenant,
473 tenant_id: ctx.tenant_id,
474 team: ctx.team,
475 team_id: ctx.team_id,
476 user: ctx.user,
477 user_id: ctx.user_id,
478 trace_id: ctx.trace_id,
479 correlation_id: ctx.correlation_id,
480 attributes: ctx.attributes,
481 session_id: ctx.session_id,
482 flow_id: ctx.flow_id,
483 node_id: ctx.node_id,
484 provider_id: ctx.provider_id,
485 deadline_ms: ctx.deadline_ms,
486 attempt: ctx.attempt,
487 idempotency_key: ctx.idempotency_key,
488 impersonation: ctx
489 .impersonation
490 .map(|ImpersonationV1_1 { actor_id, reason }| ImpersonationV1_0 {
491 actor_id,
492 reason,
493 }),
494 });
495
496 self.send_http_request(legacy_req, opts, legacy_ctx)
497 .map(|resp| HttpResponseV1_1 {
498 status: resp.status,
499 headers: resp.headers,
500 body: resp.body,
501 })
502 .map_err(|err| HttpClientErrorV1_1 {
503 code: err.code,
504 message: err.message,
505 })
506 }
507}
508
509impl StateStoreHost for HostState {
510 fn read(
511 &mut self,
512 key: HostStateKey,
513 ctx: Option<StateTenantCtx>,
514 ) -> Result<Vec<u8>, StateError> {
515 let store = match self.state_store.as_ref() {
516 Some(store) => store.clone(),
517 None => {
518 return Err(StateError {
519 code: "unavailable".into(),
520 message: "state store not configured".into(),
521 });
522 }
523 };
524 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
525 Ok(ctx) => ctx,
526 Err(err) => {
527 return Err(StateError {
528 code: "invalid-ctx".into(),
529 message: err.to_string(),
530 });
531 }
532 };
533 let key = StoreStateKey::from(key);
534 match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
535 Ok(Some(value)) => Ok(serde_json::to_vec(&value).unwrap_or_else(|_| Vec::new())),
536 Ok(None) => Err(StateError {
537 code: "not_found".into(),
538 message: "state key not found".into(),
539 }),
540 Err(err) => Err(StateError {
541 code: "internal".into(),
542 message: err.to_string(),
543 }),
544 }
545 }
546
547 fn write(
548 &mut self,
549 key: HostStateKey,
550 bytes: Vec<u8>,
551 ctx: Option<StateTenantCtx>,
552 ) -> Result<StateOpAck, StateError> {
553 let store = match self.state_store.as_ref() {
554 Some(store) => store.clone(),
555 None => {
556 return Err(StateError {
557 code: "unavailable".into(),
558 message: "state store not configured".into(),
559 });
560 }
561 };
562 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
563 Ok(ctx) => ctx,
564 Err(err) => {
565 return Err(StateError {
566 code: "invalid-ctx".into(),
567 message: err.to_string(),
568 });
569 }
570 };
571 let key = StoreStateKey::from(key);
572 let value = serde_json::from_slice(&bytes)
573 .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()));
574 match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
575 Ok(()) => Ok(StateOpAck::Ok),
576 Err(err) => Err(StateError {
577 code: "internal".into(),
578 message: err.to_string(),
579 }),
580 }
581 }
582
583 fn delete(
584 &mut self,
585 key: HostStateKey,
586 ctx: Option<StateTenantCtx>,
587 ) -> Result<StateOpAck, StateError> {
588 let store = match self.state_store.as_ref() {
589 Some(store) => store.clone(),
590 None => {
591 return Err(StateError {
592 code: "unavailable".into(),
593 message: "state store not configured".into(),
594 });
595 }
596 };
597 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
598 Ok(ctx) => ctx,
599 Err(err) => {
600 return Err(StateError {
601 code: "invalid-ctx".into(),
602 message: err.to_string(),
603 });
604 }
605 };
606 let key = StoreStateKey::from(key);
607 match store.del(&tenant_ctx, STATE_PREFIX, &key) {
608 Ok(_) => Ok(StateOpAck::Ok),
609 Err(err) => Err(StateError {
610 code: "internal".into(),
611 message: err.to_string(),
612 }),
613 }
614 }
615}
616
617impl TelemetryLoggerHost for HostState {
618 fn log(
619 &mut self,
620 span: TelemetrySpanContext,
621 fields: Vec<(String, String)>,
622 _ctx: Option<TelemetryTenantCtx>,
623 ) -> Result<TelemetryAck, TelemetryError> {
624 if let Some(mock) = &self.mocks
625 && mock.telemetry_drain(&[("span_json", span.flow_id.as_str())])
626 {
627 return Ok(TelemetryAck::Ok);
628 }
629 let mut map = serde_json::Map::new();
630 for (k, v) in fields {
631 map.insert(k, Value::String(v));
632 }
633 tracing::info!(
634 tenant = %span.tenant,
635 flow_id = %span.flow_id,
636 node = ?span.node_id,
637 provider = %span.provider,
638 fields = %serde_json::Value::Object(map.clone()),
639 "telemetry log from pack"
640 );
641 Ok(TelemetryAck::Ok)
642 }
643}
644
645impl RunnerHostHttp for HostState {
646 fn request(
647 &mut self,
648 method: String,
649 url: String,
650 headers: Vec<String>,
651 body: Option<Vec<u8>>,
652 ) -> Result<Vec<u8>, String> {
653 let req = HttpRequest {
654 method,
655 url,
656 headers: headers
657 .chunks(2)
658 .filter_map(|chunk| {
659 if chunk.len() == 2 {
660 Some((chunk[0].clone(), chunk[1].clone()))
661 } else {
662 None
663 }
664 })
665 .collect(),
666 body,
667 };
668 match HttpClientHost::send(self, req, None) {
669 Ok(resp) => Ok(resp.body.unwrap_or_default()),
670 Err(err) => Err(err.message),
671 }
672 }
673}
674
675impl RunnerHostKv for HostState {
676 fn get(&mut self, _ns: String, _key: String) -> Option<String> {
677 None
678 }
679
680 fn put(&mut self, _ns: String, _key: String, _val: String) {}
681}
682
/// Outcome of loading `manifest.cbor`, tagged by which manifest format decoded.
enum ManifestLoad {
    /// `decode_pack_manifest` succeeded; flows are derived from the manifest.
    New {
        manifest: Box<greentic_types::PackManifest>,
        flows: PackFlows,
    },
    /// Fallback: the bytes decoded as a legacy manifest; flows are loaded from
    /// the flow JSON files it lists.
    Legacy {
        manifest: Box<legacy_pack::PackManifest>,
        flows: PackFlows,
    },
}
693
694fn load_manifest_and_flows(path: &Path) -> Result<ManifestLoad> {
695 let mut archive = ZipArchive::new(File::open(path)?)
696 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
697 let bytes = read_entry(&mut archive, "manifest.cbor")
698 .with_context(|| format!("missing manifest.cbor in {}", path.display()))?;
699 match decode_pack_manifest(&bytes) {
700 Ok(manifest) => {
701 let cache = PackFlows::from_manifest(manifest.clone());
702 Ok(ManifestLoad::New {
703 manifest: Box::new(manifest),
704 flows: cache,
705 })
706 }
707 Err(err) => {
708 tracing::debug!(error = %err, pack = %path.display(), "decode_pack_manifest failed; trying legacy manifest");
709 let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
711 .context("failed to decode legacy pack manifest from manifest.cbor")?;
712 let flows = load_legacy_flows(&mut archive, &legacy)?;
713 Ok(ManifestLoad::Legacy {
714 manifest: Box::new(legacy),
715 flows,
716 })
717 }
718 }
719}
720
721fn load_manifest_and_flows_from_dir(root: &Path) -> Result<ManifestLoad> {
722 let manifest_path = root.join("manifest.cbor");
723 let bytes = std::fs::read(&manifest_path)
724 .with_context(|| format!("missing manifest.cbor in {}", root.display()))?;
725 match decode_pack_manifest(&bytes) {
726 Ok(manifest) => {
727 let cache = PackFlows::from_manifest(manifest.clone());
728 Ok(ManifestLoad::New {
729 manifest: Box::new(manifest),
730 flows: cache,
731 })
732 }
733 Err(err) => {
734 tracing::debug!(
735 error = %err,
736 pack = %root.display(),
737 "decode_pack_manifest failed for materialized pack; trying legacy manifest"
738 );
739 let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
740 .context("failed to decode legacy pack manifest from manifest.cbor")?;
741 let flows = load_legacy_flows_from_dir(root, &legacy)?;
742 Ok(ManifestLoad::Legacy {
743 manifest: Box::new(legacy),
744 flows,
745 })
746 }
747 }
748}
749
750fn load_legacy_flows(
751 archive: &mut ZipArchive<File>,
752 manifest: &legacy_pack::PackManifest,
753) -> Result<PackFlows> {
754 let mut flows = HashMap::new();
755 let mut descriptors = Vec::new();
756
757 for entry in &manifest.flows {
758 let bytes = read_entry(archive, &entry.file_json)
759 .with_context(|| format!("missing flow json {}", entry.file_json))?;
760 let doc: FlowDoc = serde_json::from_slice(&bytes)
761 .with_context(|| format!("failed to decode flow doc {}", entry.file_json))?;
762 let normalized = normalize_flow_doc(doc);
763 let flow_ir = flow_doc_to_ir(normalized)?;
764 let flow = flow_ir_to_flow(flow_ir)?;
765
766 descriptors.push(FlowDescriptor {
767 id: entry.id.clone(),
768 flow_type: entry.kind.clone(),
769 profile: manifest.meta.pack_id.clone(),
770 version: manifest.meta.version.to_string(),
771 description: None,
772 });
773 flows.insert(entry.id.clone(), flow);
774 }
775
776 let mut entry_flows = manifest.meta.entry_flows.clone();
777 if entry_flows.is_empty() {
778 entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
779 }
780 let metadata = PackMetadata {
781 pack_id: manifest.meta.pack_id.clone(),
782 version: manifest.meta.version.to_string(),
783 entry_flows,
784 secret_requirements: Vec::new(),
785 };
786
787 Ok(PackFlows {
788 descriptors,
789 flows,
790 metadata,
791 })
792}
793
794fn load_legacy_flows_from_dir(
795 root: &Path,
796 manifest: &legacy_pack::PackManifest,
797) -> Result<PackFlows> {
798 let mut flows = HashMap::new();
799 let mut descriptors = Vec::new();
800
801 for entry in &manifest.flows {
802 let path = root.join(&entry.file_json);
803 let bytes = std::fs::read(&path)
804 .with_context(|| format!("missing flow json {}", path.display()))?;
805 let doc: FlowDoc = serde_json::from_slice(&bytes)
806 .with_context(|| format!("failed to decode flow doc {}", path.display()))?;
807 let normalized = normalize_flow_doc(doc);
808 let flow_ir = flow_doc_to_ir(normalized)?;
809 let flow = flow_ir_to_flow(flow_ir)?;
810
811 descriptors.push(FlowDescriptor {
812 id: entry.id.clone(),
813 flow_type: entry.kind.clone(),
814 profile: manifest.meta.pack_id.clone(),
815 version: manifest.meta.version.to_string(),
816 description: None,
817 });
818 flows.insert(entry.id.clone(), flow);
819 }
820
821 let mut entry_flows = manifest.meta.entry_flows.clone();
822 if entry_flows.is_empty() {
823 entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
824 }
825 let metadata = PackMetadata {
826 pack_id: manifest.meta.pack_id.clone(),
827 version: manifest.meta.version.to_string(),
828 entry_flows,
829 secret_requirements: Vec::new(),
830 };
831
832 Ok(PackFlows {
833 descriptors,
834 flows,
835 metadata,
836 })
837}
838
/// Store data for a component instance: the host state plus the WASI context
/// and resource table handed out together through `WasiView::ctx`.
pub struct ComponentState {
    /// Host implementation backing the interfaces registered on the linker.
    pub host: HostState,
    /// WASI context built from the runner's WASI policy.
    wasi_ctx: WasiCtx,
    /// Resource table paired with the WASI context.
    resource_table: ResourceTable,
}
844
845impl ComponentState {
846 pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
847 let wasi_ctx = policy
848 .instantiate()
849 .context("failed to build WASI context")?;
850 Ok(Self {
851 host,
852 wasi_ctx,
853 resource_table: ResourceTable::new(),
854 })
855 }
856
857 fn host_mut(&mut self) -> &mut HostState {
858 &mut self.host
859 }
860
861 fn should_cancel_host(&mut self) -> bool {
862 false
863 }
864
865 fn yield_now_host(&mut self) {
866 }
868}
869
870impl component_api::v0_4::greentic::component::control::Host for ComponentState {
871 fn should_cancel(&mut self) -> bool {
872 self.should_cancel_host()
873 }
874
875 fn yield_now(&mut self) {
876 self.yield_now_host();
877 }
878}
879
880impl component_api::v0_5::greentic::component::control::Host for ComponentState {
881 fn should_cancel(&mut self) -> bool {
882 self.should_cancel_host()
883 }
884
885 fn yield_now(&mut self) {
886 self.yield_now_host();
887 }
888}
889
890fn add_component_control_instance(
891 linker: &mut Linker<ComponentState>,
892 name: &str,
893) -> wasmtime::Result<()> {
894 let mut inst = linker.instance(name)?;
895 inst.func_wrap(
896 "should-cancel",
897 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
898 let host = caller.data_mut();
899 Ok((host.should_cancel_host(),))
900 },
901 )?;
902 inst.func_wrap(
903 "yield-now",
904 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
905 let host = caller.data_mut();
906 host.yield_now_host();
907 Ok(())
908 },
909 )?;
910 Ok(())
911}
912
913fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
914 add_component_control_instance(linker, "greentic:component/control@0.5.0")?;
915 add_component_control_instance(linker, "greentic:component/control@0.4.0")?;
916 Ok(())
917}
918
/// Registers WASI and the v1 host interfaces on `linker`.
///
/// Every interface is backed by the `HostState` embedded in `ComponentState`.
/// The OAuth broker is not registered here, and the state store is registered
/// only when `allow_state_store` is `true`.
pub fn register_all(linker: &mut Linker<ComponentState>, allow_state_store: bool) -> Result<()> {
    add_wasi_to_linker(linker)?;
    add_all_v1_to_linker(
        linker,
        HostFns {
            http_client_v1_1: Some(|state| state.host_mut()),
            http_client: Some(|state| state.host_mut()),
            oauth_broker: None,
            runner_host_http: Some(|state| state.host_mut()),
            runner_host_kv: Some(|state| state.host_mut()),
            telemetry_logger: Some(|state| state.host_mut()),
            // Omitted from the linker entirely when state access is not allowed.
            state_store: allow_state_store.then_some(|state| state.host_mut()),
            secrets_store: Some(|state| state.host_mut()),
        },
    )?;
    Ok(())
}
936
937impl OAuthHostContext for ComponentState {
938 fn tenant_id(&self) -> &str {
939 &self.host.config.tenant
940 }
941
942 fn env(&self) -> &str {
943 &self.host.default_env
944 }
945
946 fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
947 &mut self.host.oauth_host
948 }
949
950 fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
951 self.host.oauth_config.as_ref()
952 }
953}
954
955impl WasiView for ComponentState {
956 fn ctx(&mut self) -> WasiCtxView<'_> {
957 WasiCtxView {
958 ctx: &mut self.wasi_ctx,
959 table: &mut self.resource_table,
960 }
961 }
962}
963
// SAFETY: NOTE(review): no justification for these impls is visible in this
// part of the file. They assert that every field of `ComponentState`
// (`HostState`, `WasiCtx`, `ResourceTable`) may be moved to and shared between
// threads — confirm that holds for all of them, in particular the store and
// secrets handles inside `HostState`.
#[allow(unsafe_code)]
unsafe impl Send for ComponentState {}
#[allow(unsafe_code)]
unsafe impl Sync for ComponentState {}
968
969impl PackRuntime {
970 fn allows_state_store(&self, component_ref: &str) -> bool {
971 if self.state_store.is_none() {
972 return false;
973 }
974 if !self.config.state_store_policy.allow {
975 return false;
976 }
977 let Some(manifest) = self.component_manifests.get(component_ref) else {
978 return false;
979 };
980 manifest
981 .capabilities
982 .host
983 .state
984 .as_ref()
985 .map(|caps| caps.read || caps.write)
986 .unwrap_or(false)
987 }
988
989 #[allow(clippy::too_many_arguments)]
990 pub async fn load(
991 path: impl AsRef<Path>,
992 config: Arc<HostConfig>,
993 mocks: Option<Arc<MockLayer>>,
994 archive_source: Option<&Path>,
995 session_store: Option<DynSessionStore>,
996 state_store: Option<DynStateStore>,
997 wasi_policy: Arc<RunnerWasiPolicy>,
998 secrets: DynSecretsManager,
999 oauth_config: Option<OAuthBrokerConfig>,
1000 verify_archive: bool,
1001 component_resolution: ComponentResolution,
1002 ) -> Result<Self> {
1003 let path = path.as_ref();
1004 let (_pack_root, safe_path) = normalize_pack_path(path)?;
1005 let path_meta = std::fs::metadata(&safe_path).ok();
1006 let is_dir = path_meta
1007 .as_ref()
1008 .map(|meta| meta.is_dir())
1009 .unwrap_or(false);
1010 let is_component = !is_dir
1011 && safe_path
1012 .extension()
1013 .and_then(|ext| ext.to_str())
1014 .map(|ext| ext.eq_ignore_ascii_case("wasm"))
1015 .unwrap_or(false);
1016 let archive_hint_path = if let Some(source) = archive_source {
1017 let (_, normalized) = normalize_pack_path(source)?;
1018 Some(normalized)
1019 } else if is_component || is_dir {
1020 None
1021 } else {
1022 Some(safe_path.clone())
1023 };
1024 let archive_hint = archive_hint_path.as_deref();
1025 if verify_archive {
1026 if let Some(verify_target) = archive_hint.and_then(|p| {
1027 std::fs::metadata(p)
1028 .ok()
1029 .filter(|meta| meta.is_file())
1030 .map(|_| p)
1031 }) {
1032 verify::verify_pack(verify_target).await?;
1033 tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
1034 } else {
1035 tracing::debug!("skipping archive verification (no archive source)");
1036 }
1037 }
1038 let engine = Engine::default();
1039 let mut metadata = PackMetadata::fallback(&safe_path);
1040 let mut manifest = None;
1041 let mut legacy_manifest: Option<Box<legacy_pack::PackManifest>> = None;
1042 let mut flows = None;
1043 let materialized_root = component_resolution.materialized_root.clone().or_else(|| {
1044 if is_dir {
1045 Some(safe_path.clone())
1046 } else {
1047 None
1048 }
1049 });
1050
1051 if let Some(root) = materialized_root.as_ref() {
1052 match load_manifest_and_flows_from_dir(root) {
1053 Ok(ManifestLoad::New {
1054 manifest: m,
1055 flows: cache,
1056 }) => {
1057 metadata = cache.metadata.clone();
1058 manifest = Some(*m);
1059 flows = Some(cache);
1060 }
1061 Ok(ManifestLoad::Legacy {
1062 manifest: m,
1063 flows: cache,
1064 }) => {
1065 metadata = cache.metadata.clone();
1066 legacy_manifest = Some(m);
1067 flows = Some(cache);
1068 }
1069 Err(err) => {
1070 warn!(error = %err, pack = %root.display(), "failed to parse materialized pack manifest");
1071 }
1072 }
1073 }
1074
1075 if manifest.is_none()
1076 && legacy_manifest.is_none()
1077 && let Some(archive_path) = archive_hint
1078 {
1079 match load_manifest_and_flows(archive_path) {
1080 Ok(ManifestLoad::New {
1081 manifest: m,
1082 flows: cache,
1083 }) => {
1084 metadata = cache.metadata.clone();
1085 manifest = Some(*m);
1086 flows = Some(cache);
1087 }
1088 Ok(ManifestLoad::Legacy {
1089 manifest: m,
1090 flows: cache,
1091 }) => {
1092 metadata = cache.metadata.clone();
1093 legacy_manifest = Some(m);
1094 flows = Some(cache);
1095 }
1096 Err(err) => {
1097 warn!(error = %err, pack = %archive_path.display(), "failed to parse pack manifest; skipping flows");
1098 }
1099 }
1100 }
1101 let mut pack_lock = None;
1102 for root in find_pack_lock_roots(&safe_path, is_dir, archive_hint) {
1103 pack_lock = load_pack_lock(&root)?;
1104 if pack_lock.is_some() {
1105 break;
1106 }
1107 }
1108 let component_sources_payload = if pack_lock.is_none() {
1109 if let Some(manifest) = manifest.as_ref() {
1110 manifest
1111 .get_component_sources_v1()
1112 .context("invalid component sources extension")?
1113 } else {
1114 None
1115 }
1116 } else {
1117 None
1118 };
1119 let component_sources = if let Some(lock) = pack_lock.as_ref() {
1120 Some(component_sources_table_from_pack_lock(
1121 lock,
1122 component_resolution.allow_missing_hash,
1123 )?)
1124 } else {
1125 component_sources_table(component_sources_payload.as_ref())?
1126 };
1127 let components = if is_component {
1128 let wasm_bytes = fs::read(&safe_path).await?;
1129 metadata = PackMetadata::from_wasm(&wasm_bytes)
1130 .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
1131 let name = safe_path
1132 .file_stem()
1133 .map(|s| s.to_string_lossy().to_string())
1134 .unwrap_or_else(|| "component".to_string());
1135 let component = Component::from_binary(&engine, &wasm_bytes)?;
1136 let mut map = HashMap::new();
1137 map.insert(
1138 name.clone(),
1139 PackComponent {
1140 name,
1141 version: metadata.version.clone(),
1142 component,
1143 },
1144 );
1145 map
1146 } else {
1147 let specs = component_specs(
1148 manifest.as_ref(),
1149 legacy_manifest.as_deref(),
1150 component_sources_payload.as_ref(),
1151 pack_lock.as_ref(),
1152 );
1153 if specs.is_empty() {
1154 HashMap::new()
1155 } else {
1156 let mut loaded = HashMap::new();
1157 let mut missing: HashSet<String> =
1158 specs.iter().map(|spec| spec.id.clone()).collect();
1159 let mut searched = Vec::new();
1160
1161 if !component_resolution.overrides.is_empty() {
1162 load_components_from_overrides(
1163 &engine,
1164 &component_resolution.overrides,
1165 &specs,
1166 &mut missing,
1167 &mut loaded,
1168 )?;
1169 searched.push("override map".to_string());
1170 }
1171
1172 if let Some(component_sources) = component_sources.as_ref() {
1173 load_components_from_sources(
1174 &engine,
1175 component_sources,
1176 &component_resolution,
1177 &specs,
1178 &mut missing,
1179 &mut loaded,
1180 materialized_root.as_deref(),
1181 archive_hint,
1182 )
1183 .await?;
1184 searched.push(format!("extension {}", EXT_COMPONENT_SOURCES_V1));
1185 }
1186
1187 if let Some(root) = materialized_root.as_ref() {
1188 load_components_from_dir(&engine, root, &specs, &mut missing, &mut loaded)?;
1189 searched.push(format!("components dir {}", root.display()));
1190 }
1191
1192 if let Some(archive_path) = archive_hint {
1193 load_components_from_archive(
1194 &engine,
1195 archive_path,
1196 &specs,
1197 &mut missing,
1198 &mut loaded,
1199 )?;
1200 searched.push(format!("archive {}", archive_path.display()));
1201 }
1202
1203 if !missing.is_empty() {
1204 let missing_list = missing.into_iter().collect::<Vec<_>>().join(", ");
1205 let sources = if searched.is_empty() {
1206 "no component sources".to_string()
1207 } else {
1208 searched.join(", ")
1209 };
1210 bail!(
1211 "components missing: {}; looked in {}",
1212 missing_list,
1213 sources
1214 );
1215 }
1216
1217 loaded
1218 }
1219 };
1220 let http_client = Arc::clone(&HTTP_CLIENT);
1221 let mut component_manifests = HashMap::new();
1222 if let Some(manifest) = manifest.as_ref() {
1223 for component in &manifest.components {
1224 component_manifests.insert(component.id.as_str().to_string(), component.clone());
1225 }
1226 }
1227 Ok(Self {
1228 path: safe_path,
1229 archive_path: archive_hint.map(Path::to_path_buf),
1230 config,
1231 engine,
1232 metadata,
1233 manifest,
1234 legacy_manifest,
1235 component_manifests,
1236 mocks,
1237 flows,
1238 components,
1239 http_client,
1240 pre_cache: Mutex::new(HashMap::new()),
1241 session_store,
1242 state_store,
1243 wasi_policy,
1244 provider_registry: RwLock::new(None),
1245 secrets,
1246 oauth_config,
1247 })
1248 }
1249
1250 pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
1251 if let Some(cache) = &self.flows {
1252 return Ok(cache.descriptors.clone());
1253 }
1254 if let Some(manifest) = &self.manifest {
1255 let descriptors = manifest
1256 .flows
1257 .iter()
1258 .map(|flow| FlowDescriptor {
1259 id: flow.id.as_str().to_string(),
1260 flow_type: flow_kind_to_str(flow.kind).to_string(),
1261 profile: manifest.pack_id.as_str().to_string(),
1262 version: manifest.version.to_string(),
1263 description: None,
1264 })
1265 .collect();
1266 return Ok(descriptors);
1267 }
1268 Ok(Vec::new())
1269 }
1270
1271 #[allow(dead_code)]
1272 pub async fn run_flow(
1273 &self,
1274 flow_id: &str,
1275 input: serde_json::Value,
1276 ) -> Result<serde_json::Value> {
1277 let pack = Arc::new(
1278 PackRuntime::load(
1279 &self.path,
1280 Arc::clone(&self.config),
1281 self.mocks.clone(),
1282 self.archive_path.as_deref(),
1283 self.session_store.clone(),
1284 self.state_store.clone(),
1285 Arc::clone(&self.wasi_policy),
1286 self.secrets.clone(),
1287 self.oauth_config.clone(),
1288 false,
1289 ComponentResolution::default(),
1290 )
1291 .await?,
1292 );
1293
1294 let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&self.config)).await?;
1295 let retry_config = self.config.retry_config().into();
1296 let mocks = pack.mocks.as_deref();
1297 let tenant = self.config.tenant.as_str();
1298
1299 let ctx = FlowContext {
1300 tenant,
1301 flow_id,
1302 node_id: None,
1303 tool: None,
1304 action: None,
1305 session_id: None,
1306 provider_id: None,
1307 retry_config,
1308 observer: None,
1309 mocks,
1310 };
1311
1312 let execution = engine.execute(ctx, input).await?;
1313 match execution.status {
1314 FlowStatus::Completed => Ok(execution.output),
1315 FlowStatus::Waiting(wait) => Ok(serde_json::json!({
1316 "status": "pending",
1317 "reason": wait.reason,
1318 "resume": wait.snapshot,
1319 "response": execution.output,
1320 })),
1321 }
1322 }
1323
1324 pub async fn invoke_component(
1325 &self,
1326 component_ref: &str,
1327 ctx: ComponentExecCtx,
1328 operation: &str,
1329 _config_json: Option<String>,
1330 input_json: String,
1331 ) -> Result<Value> {
1332 let pack_component = self
1333 .components
1334 .get(component_ref)
1335 .with_context(|| format!("component '{component_ref}' not found in pack"))?;
1336
1337 let mut linker = Linker::new(&self.engine);
1338 let allow_state_store = self.allows_state_store(component_ref);
1339 register_all(&mut linker, allow_state_store)?;
1340 add_component_control_to_linker(&mut linker)?;
1341
1342 let host_state = HostState::new(
1343 Arc::clone(&self.config),
1344 Arc::clone(&self.http_client),
1345 self.mocks.clone(),
1346 self.session_store.clone(),
1347 self.state_store.clone(),
1348 Arc::clone(&self.secrets),
1349 self.oauth_config.clone(),
1350 Some(ctx.clone()),
1351 )?;
1352 let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1353 let mut store = wasmtime::Store::new(&self.engine, store_state);
1354 let pre_instance = linker.instantiate_pre(&pack_component.component)?;
1355 let result = match component_api::v0_5::ComponentPre::new(pre_instance) {
1356 Ok(pre) => {
1357 let bindings = pre.instantiate_async(&mut store).await?;
1358 let node = bindings.greentic_component_node();
1359 let ctx_v05 = component_api::exec_ctx_v0_5(&ctx);
1360 let result = node.call_invoke(&mut store, &ctx_v05, operation, &input_json)?;
1361 component_api::invoke_result_from_v0_5(result)
1362 }
1363 Err(err) => {
1364 if is_missing_node_export(&err, "0.5.0") {
1365 let pre_instance = linker.instantiate_pre(&pack_component.component)?;
1366 let pre: component_api::v0_4::ComponentPre<ComponentState> =
1367 component_api::v0_4::ComponentPre::new(pre_instance)?;
1368 let bindings = pre.instantiate_async(&mut store).await?;
1369 let node = bindings.greentic_component_node();
1370 let ctx_v04 = component_api::exec_ctx_v0_4(&ctx);
1371 let result = node.call_invoke(&mut store, &ctx_v04, operation, &input_json)?;
1372 component_api::invoke_result_from_v0_4(result)
1373 } else {
1374 return Err(err);
1375 }
1376 }
1377 };
1378
1379 match result {
1380 InvokeResult::Ok(body) => {
1381 if body.is_empty() {
1382 return Ok(Value::Null);
1383 }
1384 serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
1385 }
1386 InvokeResult::Err(NodeError {
1387 code,
1388 message,
1389 retryable,
1390 backoff_ms,
1391 details,
1392 }) => {
1393 let mut obj = serde_json::Map::new();
1394 obj.insert("ok".into(), Value::Bool(false));
1395 let mut error = serde_json::Map::new();
1396 error.insert("code".into(), Value::String(code));
1397 error.insert("message".into(), Value::String(message));
1398 error.insert("retryable".into(), Value::Bool(retryable));
1399 if let Some(backoff) = backoff_ms {
1400 error.insert("backoff_ms".into(), Value::Number(backoff.into()));
1401 }
1402 if let Some(details) = details {
1403 error.insert(
1404 "details".into(),
1405 serde_json::from_str(&details).unwrap_or(Value::String(details)),
1406 );
1407 }
1408 obj.insert("error".into(), Value::Object(error));
1409 Ok(Value::Object(obj))
1410 }
1411 }
1412 }
1413
1414 pub fn resolve_provider(
1415 &self,
1416 provider_id: Option<&str>,
1417 provider_type: Option<&str>,
1418 ) -> Result<ProviderBinding> {
1419 let registry = self.provider_registry()?;
1420 registry.resolve(provider_id, provider_type)
1421 }
1422
1423 pub async fn invoke_provider(
1424 &self,
1425 binding: &ProviderBinding,
1426 ctx: ComponentExecCtx,
1427 op: &str,
1428 input_json: Vec<u8>,
1429 ) -> Result<Value> {
1430 let component_ref = &binding.component_ref;
1431 let pack_component = self
1432 .components
1433 .get(component_ref)
1434 .with_context(|| format!("provider component '{component_ref}' not found in pack"))?;
1435
1436 let mut linker = Linker::new(&self.engine);
1437 let allow_state_store = self.allows_state_store(component_ref);
1438 register_all(&mut linker, allow_state_store)?;
1439 add_component_control_to_linker(&mut linker)?;
1440 let pre_instance = linker.instantiate_pre(&pack_component.component)?;
1441 let pre: ProviderComponentPre<ComponentState> = ProviderComponentPre::new(pre_instance)?;
1442
1443 let host_state = HostState::new(
1444 Arc::clone(&self.config),
1445 Arc::clone(&self.http_client),
1446 self.mocks.clone(),
1447 self.session_store.clone(),
1448 self.state_store.clone(),
1449 Arc::clone(&self.secrets),
1450 self.oauth_config.clone(),
1451 Some(ctx),
1452 )?;
1453 let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1454 let mut store = wasmtime::Store::new(&self.engine, store_state);
1455 let bindings: crate::provider_core::SchemaCore = pre.instantiate_async(&mut store).await?;
1456 let provider = bindings.greentic_provider_core_schema_core_api();
1457
1458 let result = provider.call_invoke(&mut store, op, &input_json)?;
1459 deserialize_json_bytes(result)
1460 }
1461
1462 fn provider_registry(&self) -> Result<ProviderRegistry> {
1463 if let Some(registry) = self.provider_registry.read().clone() {
1464 return Ok(registry);
1465 }
1466 let manifest = self
1467 .manifest
1468 .as_ref()
1469 .context("pack manifest required for provider resolution")?;
1470 let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
1471 let registry = ProviderRegistry::new(
1472 manifest,
1473 self.state_store.clone(),
1474 &self.config.tenant,
1475 &env,
1476 )?;
1477 *self.provider_registry.write() = Some(registry.clone());
1478 Ok(registry)
1479 }
1480
1481 pub fn load_flow(&self, flow_id: &str) -> Result<Flow> {
1482 if let Some(cache) = &self.flows {
1483 return cache
1484 .flows
1485 .get(flow_id)
1486 .cloned()
1487 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
1488 }
1489 if let Some(manifest) = &self.manifest {
1490 let entry = manifest
1491 .flows
1492 .iter()
1493 .find(|f| f.id.as_str() == flow_id)
1494 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
1495 return Ok(entry.flow.clone());
1496 }
1497 bail!("flow '{flow_id}' not available (pack exports disabled)")
1498 }
1499
1500 pub fn metadata(&self) -> &PackMetadata {
1501 &self.metadata
1502 }
1503
1504 pub fn required_secrets(&self) -> &[greentic_types::SecretRequirement] {
1505 &self.metadata.secret_requirements
1506 }
1507
1508 pub fn missing_secrets(
1509 &self,
1510 tenant_ctx: &TypesTenantCtx,
1511 ) -> Vec<greentic_types::SecretRequirement> {
1512 let env = tenant_ctx.env.as_str().to_string();
1513 let tenant = tenant_ctx.tenant.as_str().to_string();
1514 let team = tenant_ctx.team.as_ref().map(|t| t.as_str().to_string());
1515 self.required_secrets()
1516 .iter()
1517 .filter(|req| {
1518 if let Some(scope) = &req.scope {
1520 if scope.env != env {
1521 return false;
1522 }
1523 if scope.tenant != tenant {
1524 return false;
1525 }
1526 if let Some(ref team_req) = scope.team
1527 && team.as_ref() != Some(team_req)
1528 {
1529 return false;
1530 }
1531 }
1532 read_secret_blocking(&self.secrets, req.key.as_str()).is_err()
1533 })
1534 .cloned()
1535 .collect()
1536 }
1537
1538 pub fn for_component_test(
1539 components: Vec<(String, PathBuf)>,
1540 flows: HashMap<String, FlowIR>,
1541 config: Arc<HostConfig>,
1542 ) -> Result<Self> {
1543 let engine = Engine::default();
1544 let mut component_map = HashMap::new();
1545 for (name, path) in components {
1546 if !path.exists() {
1547 bail!("component artifact missing: {}", path.display());
1548 }
1549 let wasm_bytes = std::fs::read(&path)?;
1550 let component = Component::from_binary(&engine, &wasm_bytes)
1551 .with_context(|| format!("failed to compile component {}", path.display()))?;
1552 component_map.insert(
1553 name.clone(),
1554 PackComponent {
1555 name,
1556 version: "0.0.0".into(),
1557 component,
1558 },
1559 );
1560 }
1561
1562 let mut flow_map = HashMap::new();
1563 let mut descriptors = Vec::new();
1564 for (id, ir) in flows {
1565 let flow_type = ir.flow_type.clone();
1566 let flow = flow_ir_to_flow(ir)?;
1567 flow_map.insert(id.clone(), flow);
1568 descriptors.push(FlowDescriptor {
1569 id: id.clone(),
1570 flow_type,
1571 profile: "test".into(),
1572 version: "0.0.0".into(),
1573 description: None,
1574 });
1575 }
1576 let flows_cache = PackFlows {
1577 descriptors: descriptors.clone(),
1578 flows: flow_map,
1579 metadata: PackMetadata::fallback(Path::new("component-test")),
1580 };
1581
1582 Ok(Self {
1583 path: PathBuf::new(),
1584 archive_path: None,
1585 config,
1586 engine,
1587 metadata: PackMetadata::fallback(Path::new("component-test")),
1588 manifest: None,
1589 legacy_manifest: None,
1590 component_manifests: HashMap::new(),
1591 mocks: None,
1592 flows: Some(flows_cache),
1593 components: component_map,
1594 http_client: Arc::clone(&HTTP_CLIENT),
1595 pre_cache: Mutex::new(HashMap::new()),
1596 session_store: None,
1597 state_store: None,
1598 wasi_policy: Arc::new(RunnerWasiPolicy::new()),
1599 provider_registry: RwLock::new(None),
1600 secrets: crate::secrets::default_manager(),
1601 oauth_config: None,
1602 })
1603 }
1604}
1605
1606fn is_missing_node_export(err: &wasmtime::Error, version: &str) -> bool {
1607 let message = err.to_string();
1608 message.contains("no exported instance named")
1609 && message.contains(&format!("greentic:component/node@{version}"))
1610}
1611
1612struct PackFlows {
1613 descriptors: Vec<FlowDescriptor>,
1614 flows: HashMap<String, Flow>,
1615 metadata: PackMetadata,
1616}
1617
/// Manifest extension keys that are recognised as carrying runtime flow
/// definitions.
const RUNTIME_FLOW_EXTENSION_IDS: [&str; 3] = [
    "greentic.pack.runtime_flow",
    "greentic.pack.flow_runtime",
    "greentic.pack.runtime_flows",
];
1623
1624#[derive(Debug, Deserialize)]
1625struct RuntimeFlowBundle {
1626 flows: Vec<RuntimeFlow>,
1627}
1628
1629#[derive(Debug, Deserialize)]
1630struct RuntimeFlow {
1631 id: String,
1632 #[serde(alias = "flow_type")]
1633 kind: FlowKind,
1634 #[serde(default)]
1635 schema_version: Option<String>,
1636 #[serde(default)]
1637 start: Option<String>,
1638 #[serde(default)]
1639 entrypoints: BTreeMap<String, Value>,
1640 nodes: BTreeMap<String, RuntimeNode>,
1641 #[serde(default)]
1642 metadata: Option<FlowMetadata>,
1643}
1644
1645#[derive(Debug, Deserialize)]
1646struct RuntimeNode {
1647 #[serde(alias = "component")]
1648 component_id: String,
1649 #[serde(default, alias = "operation")]
1650 operation_name: Option<String>,
1651 #[serde(default, alias = "payload", alias = "input")]
1652 operation_payload: Value,
1653 #[serde(default)]
1654 routing: Option<Routing>,
1655 #[serde(default)]
1656 telemetry: Option<TelemetryHints>,
1657}
1658
1659fn deserialize_json_bytes(bytes: Vec<u8>) -> Result<Value> {
1660 if bytes.is_empty() {
1661 return Ok(Value::Null);
1662 }
1663 serde_json::from_slice(&bytes).or_else(|_| {
1664 String::from_utf8(bytes)
1665 .map(Value::String)
1666 .map_err(|err| anyhow!(err))
1667 })
1668}
1669
1670impl PackFlows {
1671 fn from_manifest(manifest: greentic_types::PackManifest) -> Self {
1672 if let Some(flows) = flows_from_runtime_extension(&manifest) {
1673 return flows;
1674 }
1675 let descriptors = manifest
1676 .flows
1677 .iter()
1678 .map(|entry| FlowDescriptor {
1679 id: entry.id.as_str().to_string(),
1680 flow_type: flow_kind_to_str(entry.kind).to_string(),
1681 profile: manifest.pack_id.as_str().to_string(),
1682 version: manifest.version.to_string(),
1683 description: None,
1684 })
1685 .collect();
1686 let mut flows = HashMap::new();
1687 for entry in &manifest.flows {
1688 flows.insert(entry.id.as_str().to_string(), entry.flow.clone());
1689 }
1690 Self {
1691 metadata: PackMetadata::from_manifest(&manifest),
1692 descriptors,
1693 flows,
1694 }
1695 }
1696}
1697
1698fn flows_from_runtime_extension(manifest: &greentic_types::PackManifest) -> Option<PackFlows> {
1699 let extensions = manifest.extensions.as_ref()?;
1700 let extension = extensions.iter().find_map(|(key, ext)| {
1701 if RUNTIME_FLOW_EXTENSION_IDS
1702 .iter()
1703 .any(|candidate| candidate == key)
1704 {
1705 Some(ext)
1706 } else {
1707 None
1708 }
1709 })?;
1710 let runtime_flows = match decode_runtime_flow_extension(extension) {
1711 Some(flows) if !flows.is_empty() => flows,
1712 _ => return None,
1713 };
1714
1715 let descriptors = runtime_flows
1716 .iter()
1717 .map(|flow| FlowDescriptor {
1718 id: flow.id.as_str().to_string(),
1719 flow_type: flow_kind_to_str(flow.kind).to_string(),
1720 profile: manifest.pack_id.as_str().to_string(),
1721 version: manifest.version.to_string(),
1722 description: None,
1723 })
1724 .collect::<Vec<_>>();
1725 let flows = runtime_flows
1726 .into_iter()
1727 .map(|flow| (flow.id.as_str().to_string(), flow))
1728 .collect();
1729
1730 Some(PackFlows {
1731 metadata: PackMetadata::from_manifest(manifest),
1732 descriptors,
1733 flows,
1734 })
1735}
1736
1737fn decode_runtime_flow_extension(extension: &ExtensionRef) -> Option<Vec<Flow>> {
1738 let value = match extension.inline.as_ref()? {
1739 ExtensionInline::Other(value) => value.clone(),
1740 _ => return None,
1741 };
1742
1743 if let Ok(bundle) = serde_json::from_value::<RuntimeFlowBundle>(value.clone()) {
1744 return Some(collect_runtime_flows(bundle.flows));
1745 }
1746
1747 if let Ok(flows) = serde_json::from_value::<Vec<RuntimeFlow>>(value.clone()) {
1748 return Some(collect_runtime_flows(flows));
1749 }
1750
1751 if let Ok(flows) = serde_json::from_value::<Vec<Flow>>(value) {
1752 return Some(flows);
1753 }
1754
1755 warn!(
1756 extension = %extension.kind,
1757 version = %extension.version,
1758 "runtime flow extension present but could not be decoded"
1759 );
1760 None
1761}
1762
1763fn collect_runtime_flows(flows: Vec<RuntimeFlow>) -> Vec<Flow> {
1764 flows
1765 .into_iter()
1766 .filter_map(|flow| match runtime_flow_to_flow(flow) {
1767 Ok(flow) => Some(flow),
1768 Err(err) => {
1769 warn!(error = %err, "failed to decode runtime flow");
1770 None
1771 }
1772 })
1773 .collect()
1774}
1775
1776fn runtime_flow_to_flow(runtime: RuntimeFlow) -> Result<Flow> {
1777 let flow_id = FlowId::from_str(&runtime.id)
1778 .with_context(|| format!("invalid flow id `{}`", runtime.id))?;
1779 let mut entrypoints = runtime.entrypoints;
1780 if entrypoints.is_empty()
1781 && let Some(start) = &runtime.start
1782 {
1783 entrypoints.insert("default".into(), Value::String(start.clone()));
1784 }
1785
1786 let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
1787 for (id, node) in runtime.nodes {
1788 let node_id = NodeId::from_str(&id).with_context(|| format!("invalid node id `{id}`"))?;
1789 let component_id = ComponentId::from_str(&node.component_id)
1790 .with_context(|| format!("invalid component id `{}`", node.component_id))?;
1791 let component = FlowComponentRef {
1792 id: component_id,
1793 pack_alias: None,
1794 operation: node.operation_name,
1795 };
1796 let routing = node.routing.unwrap_or(Routing::End);
1797 let telemetry = node.telemetry.unwrap_or_default();
1798 nodes.insert(
1799 node_id.clone(),
1800 Node {
1801 id: node_id,
1802 component,
1803 input: InputMapping {
1804 mapping: node.operation_payload,
1805 },
1806 output: OutputMapping {
1807 mapping: Value::Null,
1808 },
1809 routing,
1810 telemetry,
1811 },
1812 );
1813 }
1814
1815 Ok(Flow {
1816 schema_version: runtime.schema_version.unwrap_or_else(|| "1.0".to_string()),
1817 id: flow_id,
1818 kind: runtime.kind,
1819 entrypoints,
1820 nodes,
1821 metadata: runtime.metadata.unwrap_or_default(),
1822 })
1823}
1824
1825fn flow_kind_to_str(kind: greentic_types::FlowKind) -> &'static str {
1826 match kind {
1827 greentic_types::FlowKind::Messaging => "messaging",
1828 greentic_types::FlowKind::Event => "event",
1829 greentic_types::FlowKind::ComponentConfig => "component-config",
1830 greentic_types::FlowKind::Job => "job",
1831 greentic_types::FlowKind::Http => "http",
1832 }
1833}
1834
1835fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
1836 let mut file = archive
1837 .by_name(name)
1838 .with_context(|| format!("entry {name} missing from archive"))?;
1839 let mut buf = Vec::new();
1840 file.read_to_end(&mut buf)?;
1841 Ok(buf)
1842}
1843
1844fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
1845 for node in doc.nodes.values_mut() {
1846 let Some((component_ref, payload)) = node
1847 .raw
1848 .iter()
1849 .next()
1850 .map(|(key, value)| (key.clone(), value.clone()))
1851 else {
1852 continue;
1853 };
1854 if component_ref.starts_with("emit.") {
1855 node.operation = Some(component_ref);
1856 node.payload = payload;
1857 node.raw.clear();
1858 continue;
1859 }
1860 let (target_component, operation, input, config) =
1861 infer_component_exec(&payload, &component_ref);
1862 let mut payload_obj = serde_json::Map::new();
1863 payload_obj.insert("component".into(), Value::String(target_component));
1865 payload_obj.insert("operation".into(), Value::String(operation));
1866 payload_obj.insert("input".into(), input);
1867 if let Some(cfg) = config {
1868 payload_obj.insert("config".into(), cfg);
1869 }
1870 node.operation = Some("component.exec".to_string());
1871 node.payload = Value::Object(payload_obj);
1872 node.raw.clear();
1873 }
1874 doc
1875}
1876
1877fn infer_component_exec(
1878 payload: &Value,
1879 component_ref: &str,
1880) -> (String, String, Value, Option<Value>) {
1881 let default_op = if component_ref.starts_with("templating.") {
1882 "render"
1883 } else {
1884 "invoke"
1885 }
1886 .to_string();
1887
1888 if let Value::Object(map) = payload {
1889 let op = map
1890 .get("op")
1891 .or_else(|| map.get("operation"))
1892 .and_then(Value::as_str)
1893 .map(|s| s.to_string())
1894 .unwrap_or_else(|| default_op.clone());
1895
1896 let mut input = map.clone();
1897 let config = input.remove("config");
1898 let component = input
1899 .get("component")
1900 .or_else(|| input.get("component_ref"))
1901 .and_then(Value::as_str)
1902 .map(|s| s.to_string())
1903 .unwrap_or_else(|| component_ref.to_string());
1904 input.remove("component");
1905 input.remove("component_ref");
1906 input.remove("op");
1907 input.remove("operation");
1908 return (component, op, Value::Object(input), config);
1909 }
1910
1911 (component_ref.to_string(), default_op, payload.clone(), None)
1912}
1913
/// A component the pack expects to be loadable.
#[derive(Clone, Debug)]
struct ComponentSpec {
    /// Identifier the component is looked up and registered under.
    id: String,
    /// Version string recorded for the component.
    version: String,
    /// Wasm file path taken from a legacy manifest entry, when there is one.
    legacy_path: Option<String>,
}
1920
1921#[derive(Clone, Debug)]
1922struct ComponentSourceInfo {
1923 digest: Option<String>,
1924 source: ComponentSourceRef,
1925 artifact: ComponentArtifactLocation,
1926 expected_wasm_sha256: Option<String>,
1927 skip_digest_verification: bool,
1928}
1929
/// Location of a component's wasm artifact.
#[derive(Clone, Debug)]
enum ComponentArtifactLocation {
    /// The artifact is shipped with the pack at `wasm_path`.
    Inline { wasm_path: String },
    /// The artifact is not shipped with the pack.
    Remote,
}
1935
1936#[derive(Clone, Debug, Deserialize)]
1937struct PackLockV1 {
1938 schema_version: u32,
1939 components: Vec<PackLockComponent>,
1940}
1941
1942#[derive(Clone, Debug, Deserialize)]
1943struct PackLockComponent {
1944 name: String,
1945 #[serde(default, rename = "source_ref")]
1946 source_ref: Option<String>,
1947 #[serde(default, rename = "ref")]
1948 legacy_ref: Option<String>,
1949 #[serde(default)]
1950 component_id: Option<ComponentId>,
1951 #[serde(default)]
1952 bundled: Option<bool>,
1953 #[serde(default, rename = "bundled_path")]
1954 bundled_path: Option<String>,
1955 #[serde(default, rename = "path")]
1956 legacy_path: Option<String>,
1957 #[serde(default)]
1958 wasm_sha256: Option<String>,
1959 #[serde(default, rename = "sha256")]
1960 legacy_sha256: Option<String>,
1961 #[serde(default)]
1962 resolved_digest: Option<String>,
1963 #[serde(default)]
1964 digest: Option<String>,
1965}
1966
1967fn component_specs(
1968 manifest: Option<&greentic_types::PackManifest>,
1969 legacy_manifest: Option<&legacy_pack::PackManifest>,
1970 component_sources: Option<&ComponentSourcesV1>,
1971 pack_lock: Option<&PackLockV1>,
1972) -> Vec<ComponentSpec> {
1973 if let Some(manifest) = manifest {
1974 if !manifest.components.is_empty() {
1975 return manifest
1976 .components
1977 .iter()
1978 .map(|entry| ComponentSpec {
1979 id: entry.id.as_str().to_string(),
1980 version: entry.version.to_string(),
1981 legacy_path: None,
1982 })
1983 .collect();
1984 }
1985 if let Some(lock) = pack_lock {
1986 let mut seen = HashSet::new();
1987 let mut specs = Vec::new();
1988 for entry in &lock.components {
1989 let id = entry
1990 .component_id
1991 .as_ref()
1992 .map(|id| id.as_str())
1993 .unwrap_or(entry.name.as_str());
1994 if seen.insert(id.to_string()) {
1995 specs.push(ComponentSpec {
1996 id: id.to_string(),
1997 version: "0.0.0".to_string(),
1998 legacy_path: None,
1999 });
2000 }
2001 }
2002 return specs;
2003 }
2004 if let Some(sources) = component_sources {
2005 let mut seen = HashSet::new();
2006 let mut specs = Vec::new();
2007 for entry in &sources.components {
2008 let id = entry
2009 .component_id
2010 .as_ref()
2011 .map(|id| id.as_str())
2012 .unwrap_or(entry.name.as_str());
2013 if seen.insert(id.to_string()) {
2014 specs.push(ComponentSpec {
2015 id: id.to_string(),
2016 version: "0.0.0".to_string(),
2017 legacy_path: None,
2018 });
2019 }
2020 }
2021 return specs;
2022 }
2023 }
2024 if let Some(legacy_manifest) = legacy_manifest {
2025 return legacy_manifest
2026 .components
2027 .iter()
2028 .map(|entry| ComponentSpec {
2029 id: entry.name.clone(),
2030 version: entry.version.to_string(),
2031 legacy_path: Some(entry.file_wasm.clone()),
2032 })
2033 .collect();
2034 }
2035 Vec::new()
2036}
2037
2038fn component_sources_table(
2039 sources: Option<&ComponentSourcesV1>,
2040) -> Result<Option<HashMap<String, ComponentSourceInfo>>> {
2041 let Some(sources) = sources else {
2042 return Ok(None);
2043 };
2044 let mut table = HashMap::new();
2045 for entry in &sources.components {
2046 let artifact = match &entry.artifact {
2047 ArtifactLocationV1::Inline { wasm_path, .. } => ComponentArtifactLocation::Inline {
2048 wasm_path: wasm_path.clone(),
2049 },
2050 ArtifactLocationV1::Remote => ComponentArtifactLocation::Remote,
2051 };
2052 let info = ComponentSourceInfo {
2053 digest: Some(entry.resolved.digest.clone()),
2054 source: entry.source.clone(),
2055 artifact,
2056 expected_wasm_sha256: None,
2057 skip_digest_verification: false,
2058 };
2059 if let Some(component_id) = entry.component_id.as_ref() {
2060 table.insert(component_id.as_str().to_string(), info.clone());
2061 }
2062 table.insert(entry.name.clone(), info);
2063 }
2064 Ok(Some(table))
2065}
2066
2067fn load_pack_lock(path: &Path) -> Result<Option<PackLockV1>> {
2068 let lock_path = if path.is_dir() {
2069 let candidate = path.join("pack.lock");
2070 if candidate.exists() {
2071 Some(candidate)
2072 } else {
2073 let candidate = path.join("pack.lock.json");
2074 candidate.exists().then_some(candidate)
2075 }
2076 } else {
2077 None
2078 };
2079 let Some(lock_path) = lock_path else {
2080 return Ok(None);
2081 };
2082 let raw = std::fs::read_to_string(&lock_path)
2083 .with_context(|| format!("failed to read {}", lock_path.display()))?;
2084 let lock: PackLockV1 = serde_json::from_str(&raw).context("failed to parse pack.lock")?;
2085 if lock.schema_version != 1 {
2086 bail!("pack.lock schema_version must be 1");
2087 }
2088 Ok(Some(lock))
2089}
2090
/// Lists the directories in which to look for a pack lock file.
///
/// * For a directory pack the only candidate is the pack path itself.
/// * Otherwise the candidates are the parent and then the grandparent of
///   `archive_hint` when it is given, or of `pack_path` when it is not.
///   Levels that do not exist are left out.
fn find_pack_lock_roots(
    pack_path: &Path,
    is_dir: bool,
    archive_hint: Option<&Path>,
) -> Vec<PathBuf> {
    if is_dir {
        return vec![pack_path.to_path_buf()];
    }
    // `ancestors` yields the path itself first, then parent, grandparent...
    archive_hint
        .unwrap_or(pack_path)
        .ancestors()
        .skip(1)
        .take(2)
        .map(Path::to_path_buf)
        .collect()
}
2115
2116fn normalize_sha256(digest: &str) -> Result<String> {
2117 let trimmed = digest.trim();
2118 if trimmed.is_empty() {
2119 bail!("sha256 digest cannot be empty");
2120 }
2121 if let Some(stripped) = trimmed.strip_prefix("sha256:") {
2122 if stripped.is_empty() {
2123 bail!("sha256 digest must include hex bytes after sha256:");
2124 }
2125 return Ok(trimmed.to_string());
2126 }
2127 if trimmed.chars().all(|c| c.is_ascii_hexdigit()) {
2128 return Ok(format!("sha256:{trimmed}"));
2129 }
2130 bail!("sha256 digest must be hex or sha256:<hex>");
2131}
2132
2133fn component_sources_table_from_pack_lock(
2134 lock: &PackLockV1,
2135 allow_missing_hash: bool,
2136) -> Result<HashMap<String, ComponentSourceInfo>> {
2137 let mut table = HashMap::new();
2138 let mut names = HashSet::new();
2139 for entry in &lock.components {
2140 if !names.insert(entry.name.clone()) {
2141 bail!(
2142 "pack.lock contains duplicate component name `{}`",
2143 entry.name
2144 );
2145 }
2146 let source_ref = match (&entry.source_ref, &entry.legacy_ref) {
2147 (Some(primary), Some(legacy)) => {
2148 if primary != legacy {
2149 bail!(
2150 "pack.lock component {} has conflicting refs: {} vs {}",
2151 entry.name,
2152 primary,
2153 legacy
2154 );
2155 }
2156 primary.as_str()
2157 }
2158 (Some(primary), None) => primary.as_str(),
2159 (None, Some(legacy)) => legacy.as_str(),
2160 (None, None) => {
2161 bail!("pack.lock component {} missing source_ref", entry.name);
2162 }
2163 };
2164 let source: ComponentSourceRef = source_ref
2165 .parse()
2166 .with_context(|| format!("invalid component ref `{}`", source_ref))?;
2167 let bundled_path = match (&entry.bundled_path, &entry.legacy_path) {
2168 (Some(primary), Some(legacy)) => {
2169 if primary != legacy {
2170 bail!(
2171 "pack.lock component {} has conflicting bundled paths: {} vs {}",
2172 entry.name,
2173 primary,
2174 legacy
2175 );
2176 }
2177 Some(primary.clone())
2178 }
2179 (Some(primary), None) => Some(primary.clone()),
2180 (None, Some(legacy)) => Some(legacy.clone()),
2181 (None, None) => None,
2182 };
2183 let bundled = entry.bundled.unwrap_or(false) || bundled_path.is_some();
2184 let (artifact, digest, expected_wasm_sha256, skip_digest_verification) = if bundled {
2185 let wasm_path = bundled_path.ok_or_else(|| {
2186 anyhow!(
2187 "pack.lock component {} marked bundled but bundled_path is missing",
2188 entry.name
2189 )
2190 })?;
2191 let expected_raw = match (&entry.wasm_sha256, &entry.legacy_sha256) {
2192 (Some(primary), Some(legacy)) => {
2193 if primary != legacy {
2194 bail!(
2195 "pack.lock component {} has conflicting wasm_sha256 values: {} vs {}",
2196 entry.name,
2197 primary,
2198 legacy
2199 );
2200 }
2201 Some(primary.as_str())
2202 }
2203 (Some(primary), None) => Some(primary.as_str()),
2204 (None, Some(legacy)) => Some(legacy.as_str()),
2205 (None, None) => None,
2206 };
2207 let expected = match expected_raw {
2208 Some(value) => Some(normalize_sha256(value)?),
2209 None => None,
2210 };
2211 if expected.is_none() && !allow_missing_hash {
2212 bail!(
2213 "pack.lock component {} missing wasm_sha256 for bundled component",
2214 entry.name
2215 );
2216 }
2217 (
2218 ComponentArtifactLocation::Inline { wasm_path },
2219 expected.clone(),
2220 expected,
2221 allow_missing_hash && expected_raw.is_none(),
2222 )
2223 } else {
2224 if source.is_tag() {
2225 bail!(
2226 "component {} uses tag ref {} but is not bundled; rebuild the pack",
2227 entry.name,
2228 source
2229 );
2230 }
2231 let expected = entry
2232 .resolved_digest
2233 .as_deref()
2234 .or(entry.digest.as_deref())
2235 .ok_or_else(|| {
2236 anyhow!(
2237 "pack.lock component {} missing resolved_digest for remote component",
2238 entry.name
2239 )
2240 })?;
2241 (
2242 ComponentArtifactLocation::Remote,
2243 Some(normalize_digest(expected)),
2244 None,
2245 false,
2246 )
2247 };
2248 let info = ComponentSourceInfo {
2249 digest,
2250 source,
2251 artifact,
2252 expected_wasm_sha256,
2253 skip_digest_verification,
2254 };
2255 if let Some(component_id) = entry.component_id.as_ref() {
2256 let key = component_id.as_str().to_string();
2257 if table.contains_key(&key) {
2258 bail!(
2259 "pack.lock contains duplicate component id `{}`",
2260 component_id.as_str()
2261 );
2262 }
2263 table.insert(key, info.clone());
2264 }
2265 if entry.name
2266 != entry
2267 .component_id
2268 .as_ref()
2269 .map(|id| id.as_str())
2270 .unwrap_or("")
2271 {
2272 table.insert(entry.name.clone(), info);
2273 }
2274 }
2275 Ok(table)
2276}
2277
2278fn component_path_for_spec(root: &Path, spec: &ComponentSpec) -> PathBuf {
2279 if let Some(path) = &spec.legacy_path {
2280 return root.join(path);
2281 }
2282 root.join("components").join(format!("{}.wasm", spec.id))
2283}
2284
/// Canonicalises a digest string to `<algorithm>:<lowercase hex>`.
///
/// Values already tagged `sha256:` or `blake3:` keep their algorithm; untagged
/// values are treated as SHA-256. Surrounding whitespace is dropped and the
/// whole value is lower-cased (prefix included) so that digests differing only
/// in letter case or padding compare equal to the lowercase digests this file
/// computes.
fn normalize_digest(digest: &str) -> String {
    let canonical = digest.trim().to_ascii_lowercase();
    if canonical.starts_with("sha256:") || canonical.starts_with("blake3:") {
        canonical
    } else {
        format!("sha256:{canonical}")
    }
}
2292
2293fn compute_digest_for(bytes: &[u8], digest: &str) -> Result<String> {
2294 if digest.starts_with("blake3:") {
2295 let hash = blake3::hash(bytes);
2296 return Ok(format!("blake3:{}", hash.to_hex()));
2297 }
2298 let mut hasher = sha2::Sha256::new();
2299 hasher.update(bytes);
2300 Ok(format!("sha256:{:x}", hasher.finalize()))
2301}
2302
2303fn compute_sha256_digest_for(bytes: &[u8]) -> String {
2304 let mut hasher = sha2::Sha256::new();
2305 hasher.update(bytes);
2306 format!("sha256:{:x}", hasher.finalize())
2307}
2308
2309fn verify_component_digest(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
2310 let normalized_expected = normalize_digest(expected);
2311 let actual = compute_digest_for(bytes, &normalized_expected)?;
2312 if normalize_digest(&actual) != normalized_expected {
2313 bail!(
2314 "component {component_id} digest mismatch: expected {normalized_expected}, got {actual}"
2315 );
2316 }
2317 Ok(())
2318}
2319
2320fn verify_wasm_sha256(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
2321 let normalized_expected = normalize_sha256(expected)?;
2322 let actual = compute_sha256_digest_for(bytes);
2323 if actual != normalized_expected {
2324 bail!(
2325 "component {component_id} bundled digest mismatch: expected {normalized_expected}, got {actual}"
2326 );
2327 }
2328 Ok(())
2329}
2330
#[cfg(test)]
mod pack_lock_tests {
    use super::*;
    use tempfile::TempDir;

    /// A non-bundled lock entry that points at a tag must be rejected with a
    /// hint to rebuild the pack.
    #[test]
    fn pack_lock_tag_ref_requires_bundle() {
        let tagged = PackLockComponent {
            name: "templates".to_string(),
            source_ref: Some("oci://registry.test/templates:latest".to_string()),
            legacy_ref: None,
            component_id: None,
            bundled: Some(false),
            bundled_path: None,
            legacy_path: None,
            wasm_sha256: None,
            legacy_sha256: None,
            resolved_digest: None,
            digest: None,
        };
        let lock = PackLockV1 {
            schema_version: 1,
            components: vec![tagged],
        };

        let err = component_sources_table_from_pack_lock(&lock, false).unwrap_err();
        let message = err.to_string();
        assert!(
            message.contains("tag ref") && message.contains("rebuild the pack"),
            "unexpected error: {err}"
        );
    }

    /// Loading an inline component whose bytes do not hash to the recorded
    /// wasm SHA-256 must fail with a bundled digest mismatch.
    #[test]
    fn bundled_hash_mismatch_errors() {
        let rt = tokio::runtime::Runtime::new().expect("runtime");

        // Copy the fixture wasm into a scratch directory that acts as the materialized root.
        let temp = TempDir::new().expect("temp dir");
        let fixture_wasm = Path::new(env!("CARGO_MANIFEST_DIR"))
            .join("../../tests/fixtures/runner-components")
            .join("target/wasm32-wasip2/release/qa_process.wasm");
        let bytes = std::fs::read(&fixture_wasm).expect("read fixture wasm");
        std::fs::write(temp.path().join("component.wasm"), &bytes).expect("write temp wasm");

        let spec = ComponentSpec {
            id: "qa.process".to_string(),
            version: "0.0.0".to_string(),
            legacy_path: None,
        };
        let mut missing = HashSet::from([spec.id.clone()]);

        // The recorded digest is deliberately bogus so verification has to fail.
        let bogus = ComponentSourceInfo {
            digest: Some("sha256:deadbeef".to_string()),
            source: ComponentSourceRef::Oci("registry.test/qa.process@sha256:deadbeef".into()),
            artifact: ComponentArtifactLocation::Inline {
                wasm_path: "component.wasm".to_string(),
            },
            expected_wasm_sha256: Some("sha256:deadbeef".to_string()),
            skip_digest_verification: false,
        };
        let sources = HashMap::from([(spec.id.clone(), bogus)]);

        let mut loaded = HashMap::new();
        let outcome = rt.block_on(load_components_from_sources(
            &Engine::default(),
            &sources,
            &ComponentResolution::default(),
            &[spec],
            &mut missing,
            &mut loaded,
            Some(temp.path()),
            None,
        ));

        let err = outcome.unwrap_err();
        assert!(
            err.to_string().contains("bundled digest mismatch"),
            "unexpected error: {err}"
        );
    }
}
2412
2413fn dist_options_from(component_resolution: &ComponentResolution) -> DistOptions {
2414 let mut opts = DistOptions {
2415 allow_tags: true,
2416 ..DistOptions::default()
2417 };
2418 if let Some(cache_dir) = component_resolution.dist_cache_dir.clone() {
2419 opts.cache_dir = cache_dir;
2420 }
2421 if component_resolution.dist_offline {
2422 opts.offline = true;
2423 }
2424 opts
2425}
2426
2427#[allow(clippy::too_many_arguments)]
2428async fn load_components_from_sources(
2429 engine: &Engine,
2430 component_sources: &HashMap<String, ComponentSourceInfo>,
2431 component_resolution: &ComponentResolution,
2432 specs: &[ComponentSpec],
2433 missing: &mut HashSet<String>,
2434 into: &mut HashMap<String, PackComponent>,
2435 materialized_root: Option<&Path>,
2436 archive_hint: Option<&Path>,
2437) -> Result<()> {
2438 let mut archive = if let Some(path) = archive_hint {
2439 Some(
2440 ZipArchive::new(File::open(path)?)
2441 .with_context(|| format!("{} is not a valid gtpack", path.display()))?,
2442 )
2443 } else {
2444 None
2445 };
2446 let mut dist_client: Option<DistClient> = None;
2447
2448 for spec in specs {
2449 if !missing.contains(&spec.id) {
2450 continue;
2451 }
2452 let Some(source) = component_sources.get(&spec.id) else {
2453 continue;
2454 };
2455
2456 let bytes = match &source.artifact {
2457 ComponentArtifactLocation::Inline { wasm_path } => {
2458 if let Some(root) = materialized_root {
2459 let path = root.join(wasm_path);
2460 if path.exists() {
2461 std::fs::read(&path).with_context(|| {
2462 format!(
2463 "failed to read inline component {} from {}",
2464 spec.id,
2465 path.display()
2466 )
2467 })?
2468 } else if archive.is_none() {
2469 bail!("inline component {} missing at {}", spec.id, path.display());
2470 } else {
2471 read_entry(
2472 archive.as_mut().expect("archive present when needed"),
2473 wasm_path,
2474 )
2475 .with_context(|| {
2476 format!(
2477 "inline component {} missing at {} in pack archive",
2478 spec.id, wasm_path
2479 )
2480 })?
2481 }
2482 } else if let Some(archive) = archive.as_mut() {
2483 read_entry(archive, wasm_path).with_context(|| {
2484 format!(
2485 "inline component {} missing at {} in pack archive",
2486 spec.id, wasm_path
2487 )
2488 })?
2489 } else {
2490 bail!(
2491 "inline component {} missing and no pack source available",
2492 spec.id
2493 );
2494 }
2495 }
2496 ComponentArtifactLocation::Remote => {
2497 if source.source.is_tag() {
2498 bail!(
2499 "component {} uses tag ref {} but is not bundled; rebuild the pack",
2500 spec.id,
2501 source.source
2502 );
2503 }
2504 let client = dist_client.get_or_insert_with(|| {
2505 DistClient::new(dist_options_from(component_resolution))
2506 });
2507 let reference = source.source.to_string();
2508 let digest = source.digest.as_deref().ok_or_else(|| {
2509 anyhow!(
2510 "component {} missing expected digest for remote component",
2511 spec.id
2512 )
2513 })?;
2514 let cache_path = if component_resolution.dist_offline {
2515 client
2516 .fetch_digest(digest)
2517 .await
2518 .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?
2519 } else {
2520 let resolved = client
2521 .resolve_ref(&reference)
2522 .await
2523 .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?;
2524 let expected = normalize_digest(digest);
2525 let actual = normalize_digest(&resolved.digest);
2526 if expected != actual {
2527 bail!(
2528 "component {} digest mismatch after fetch: expected {}, got {}",
2529 spec.id,
2530 expected,
2531 actual
2532 );
2533 }
2534 resolved.cache_path.ok_or_else(|| {
2535 anyhow!(
2536 "component {} resolved from {} but cache path is missing",
2537 spec.id,
2538 reference
2539 )
2540 })?
2541 };
2542 std::fs::read(&cache_path).with_context(|| {
2543 format!(
2544 "failed to read cached component {} from {}",
2545 spec.id,
2546 cache_path.display()
2547 )
2548 })?
2549 }
2550 };
2551
2552 if let Some(expected) = source.expected_wasm_sha256.as_deref() {
2553 verify_wasm_sha256(&spec.id, expected, &bytes)?;
2554 } else if source.skip_digest_verification {
2555 let actual = compute_sha256_digest_for(&bytes);
2556 warn!(
2557 component_id = %spec.id,
2558 digest = %actual,
2559 "bundled component missing wasm_sha256; allowing due to flag"
2560 );
2561 } else {
2562 let expected = source.digest.as_deref().ok_or_else(|| {
2563 anyhow!(
2564 "component {} missing expected digest for verification",
2565 spec.id
2566 )
2567 })?;
2568 verify_component_digest(&spec.id, expected, &bytes)?;
2569 }
2570 let component = Component::from_binary(engine, &bytes)
2571 .with_context(|| format!("failed to compile component {}", spec.id))?;
2572 into.insert(
2573 spec.id.clone(),
2574 PackComponent {
2575 name: spec.id.clone(),
2576 version: spec.version.clone(),
2577 component,
2578 },
2579 );
2580 missing.remove(&spec.id);
2581 }
2582
2583 Ok(())
2584}
2585
2586fn dist_error_for_component(err: DistError, component_id: &str, reference: &str) -> anyhow::Error {
2587 match err {
2588 DistError::CacheMiss { reference: missing } => anyhow!(
2589 "remote component {} is not cached for {}. Run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
2590 component_id,
2591 missing,
2592 reference
2593 ),
2594 DistError::Offline { reference: blocked } => anyhow!(
2595 "offline mode blocked fetching component {} from {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
2596 component_id,
2597 blocked,
2598 reference
2599 ),
2600 DistError::AuthRequired { target } => anyhow!(
2601 "component {} requires authenticated source {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
2602 component_id,
2603 target,
2604 reference
2605 ),
2606 other => anyhow!(
2607 "failed to resolve component {} from {}: {}",
2608 component_id,
2609 reference,
2610 other
2611 ),
2612 }
2613}
2614
2615fn load_components_from_overrides(
2616 engine: &Engine,
2617 overrides: &HashMap<String, PathBuf>,
2618 specs: &[ComponentSpec],
2619 missing: &mut HashSet<String>,
2620 into: &mut HashMap<String, PackComponent>,
2621) -> Result<()> {
2622 for spec in specs {
2623 if !missing.contains(&spec.id) {
2624 continue;
2625 }
2626 let Some(path) = overrides.get(&spec.id) else {
2627 continue;
2628 };
2629 let bytes = std::fs::read(path)
2630 .with_context(|| format!("failed to read override component {}", path.display()))?;
2631 let component = Component::from_binary(engine, &bytes).with_context(|| {
2632 format!(
2633 "failed to compile component {} from override {}",
2634 spec.id,
2635 path.display()
2636 )
2637 })?;
2638 into.insert(
2639 spec.id.clone(),
2640 PackComponent {
2641 name: spec.id.clone(),
2642 version: spec.version.clone(),
2643 component,
2644 },
2645 );
2646 missing.remove(&spec.id);
2647 }
2648 Ok(())
2649}
2650
2651fn load_components_from_dir(
2652 engine: &Engine,
2653 root: &Path,
2654 specs: &[ComponentSpec],
2655 missing: &mut HashSet<String>,
2656 into: &mut HashMap<String, PackComponent>,
2657) -> Result<()> {
2658 for spec in specs {
2659 if !missing.contains(&spec.id) {
2660 continue;
2661 }
2662 let path = component_path_for_spec(root, spec);
2663 if !path.exists() {
2664 tracing::debug!(component = %spec.id, path = %path.display(), "materialized component missing; will try other sources");
2665 continue;
2666 }
2667 let bytes = std::fs::read(&path)
2668 .with_context(|| format!("failed to read component {}", path.display()))?;
2669 let component = Component::from_binary(engine, &bytes).with_context(|| {
2670 format!(
2671 "failed to compile component {} from {}",
2672 spec.id,
2673 path.display()
2674 )
2675 })?;
2676 into.insert(
2677 spec.id.clone(),
2678 PackComponent {
2679 name: spec.id.clone(),
2680 version: spec.version.clone(),
2681 component,
2682 },
2683 );
2684 missing.remove(&spec.id);
2685 }
2686 Ok(())
2687}
2688
2689fn load_components_from_archive(
2690 engine: &Engine,
2691 path: &Path,
2692 specs: &[ComponentSpec],
2693 missing: &mut HashSet<String>,
2694 into: &mut HashMap<String, PackComponent>,
2695) -> Result<()> {
2696 let mut archive = ZipArchive::new(File::open(path)?)
2697 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
2698 for spec in specs {
2699 if !missing.contains(&spec.id) {
2700 continue;
2701 }
2702 let file_name = spec
2703 .legacy_path
2704 .clone()
2705 .unwrap_or_else(|| format!("components/{}.wasm", spec.id));
2706 let bytes = match read_entry(&mut archive, &file_name) {
2707 Ok(bytes) => bytes,
2708 Err(err) => {
2709 warn!(component = %spec.id, pack = %path.display(), error = %err, "component entry missing in pack archive");
2710 continue;
2711 }
2712 };
2713 let component = Component::from_binary(engine, &bytes)
2714 .with_context(|| format!("failed to compile component {}", spec.id))?;
2715 into.insert(
2716 spec.id.clone(),
2717 PackComponent {
2718 name: spec.id.clone(),
2719 version: spec.version.clone(),
2720 component,
2721 },
2722 );
2723 missing.remove(&spec.id);
2724 }
2725 Ok(())
2726}
2727
#[cfg(test)]
mod tests {
    use super::*;
    use greentic_flow::model::{FlowDoc, NodeDoc};
    use indexmap::IndexMap;
    use serde_json::json;

    /// A node whose only content is a raw `<component>: <input>` entry must be
    /// rewritten into a `component.exec` node with the input moved into the payload.
    #[test]
    fn normalizes_raw_component_to_component_exec() {
        let template = json!({ "template": "Hi {{name}}" });

        let mut shorthand = IndexMap::new();
        shorthand.insert("templating.handlebars".into(), template.clone());
        let start_node = NodeDoc {
            raw: shorthand,
            routing: json!([{"out": true}]),
            ..Default::default()
        };

        let mut nodes = IndexMap::new();
        nodes.insert("start".into(), start_node);

        let doc = FlowDoc {
            id: "welcome".into(),
            title: None,
            description: None,
            flow_type: "messaging".into(),
            start: Some("start".into()),
            parameters: json!({}),
            tags: Vec::new(),
            schema_version: None,
            entrypoints: IndexMap::new(),
            nodes,
        };

        let normalized = normalize_flow_doc(doc);

        let node = normalized.nodes.get("start").expect("node exists");
        assert_eq!(node.operation.as_deref(), Some("component.exec"));
        assert!(node.raw.is_empty());

        let payload = node.payload.as_object().expect("payload object");
        let component = Value::String("templating.handlebars".into());
        let operation = Value::String("render".into());
        assert_eq!(payload.get("component"), Some(&component));
        assert_eq!(payload.get("operation"), Some(&operation));
        assert_eq!(payload.get("input"), Some(&template));
    }
}
2781
/// Summary information about a pack: its identity, version, entry flows and
/// the secrets it declares as required.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct PackMetadata {
    /// Identifier of the pack.
    pub pack_id: String,
    /// Version of the pack, kept as a string.
    pub version: String,
    /// Ids of the pack's entry flows; empty when absent from the serialized form.
    #[serde(default)]
    pub entry_flows: Vec<String>,
    /// Secret requirements declared by the pack; empty when absent from the
    /// serialized form.
    #[serde(default)]
    pub secret_requirements: Vec<greentic_types::SecretRequirement>,
}
2791
2792impl PackMetadata {
2793 fn from_wasm(bytes: &[u8]) -> Option<Self> {
2794 let parser = Parser::new(0);
2795 for payload in parser.parse_all(bytes) {
2796 let payload = payload.ok()?;
2797 match payload {
2798 Payload::CustomSection(section) => {
2799 if section.name() == "greentic.manifest"
2800 && let Ok(meta) = Self::from_bytes(section.data())
2801 {
2802 return Some(meta);
2803 }
2804 }
2805 Payload::DataSection(reader) => {
2806 for segment in reader.into_iter().flatten() {
2807 if let Ok(meta) = Self::from_bytes(segment.data) {
2808 return Some(meta);
2809 }
2810 }
2811 }
2812 _ => {}
2813 }
2814 }
2815 None
2816 }
2817
2818 fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
2819 #[derive(Deserialize)]
2820 struct RawManifest {
2821 pack_id: String,
2822 version: String,
2823 #[serde(default)]
2824 entry_flows: Vec<String>,
2825 #[serde(default)]
2826 flows: Vec<RawFlow>,
2827 #[serde(default)]
2828 secret_requirements: Vec<greentic_types::SecretRequirement>,
2829 }
2830
2831 #[derive(Deserialize)]
2832 struct RawFlow {
2833 id: String,
2834 }
2835
2836 let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
2837 let mut entry_flows = if manifest.entry_flows.is_empty() {
2838 manifest.flows.iter().map(|f| f.id.clone()).collect()
2839 } else {
2840 manifest.entry_flows.clone()
2841 };
2842 entry_flows.retain(|id| !id.is_empty());
2843 Ok(Self {
2844 pack_id: manifest.pack_id,
2845 version: manifest.version,
2846 entry_flows,
2847 secret_requirements: manifest.secret_requirements,
2848 })
2849 }
2850
2851 pub fn fallback(path: &Path) -> Self {
2852 let pack_id = path
2853 .file_stem()
2854 .map(|s| s.to_string_lossy().into_owned())
2855 .unwrap_or_else(|| "unknown-pack".to_string());
2856 Self {
2857 pack_id,
2858 version: "0.0.0".to_string(),
2859 entry_flows: Vec::new(),
2860 secret_requirements: Vec::new(),
2861 }
2862 }
2863
2864 pub fn from_manifest(manifest: &greentic_types::PackManifest) -> Self {
2865 let entry_flows = manifest
2866 .flows
2867 .iter()
2868 .map(|flow| flow.id.as_str().to_string())
2869 .collect::<Vec<_>>();
2870 Self {
2871 pack_id: manifest.pack_id.as_str().to_string(),
2872 version: manifest.version.to_string(),
2873 entry_flows,
2874 secret_requirements: manifest.secret_requirements.clone(),
2875 }
2876 }
2877}