1use std::collections::HashMap;
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::component_api::component::greentic::component::control::Host as ComponentControlHost;
10use crate::component_api::{
11 ComponentPre, control, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
12};
13use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
14use crate::runtime_wasmtime::{Component, Engine, Linker, ResourceTable};
15use anyhow::{Context, Result, anyhow, bail};
16use greentic_interfaces_wasmtime::host_helpers::v1::{
17 self as host_v1, HostFns, add_all_v1_to_linker,
18 messaging_session::{
19 MessagingSessionError as MessagingError, MessagingSessionHost, OpAck as MessagingAck,
20 OutboundMessage, TenantCtx as MessagingTenantCtx,
21 },
22 runner_host_http::RunnerHostHttp,
23 runner_host_kv::RunnerHostKv,
24 secrets_store::{SecretsError, SecretsStoreHost},
25 state_store::{
26 OpAck as StateOpAck, StateKey as HostStateKey, StateStoreError as StateError,
27 StateStoreHost, TenantCtx as StateTenantCtx,
28 },
29 telemetry_logger::{
30 OpAck as TelemetryAck, SpanContext as TelemetrySpanContext,
31 TelemetryLoggerError as TelemetryError, TelemetryLoggerHost,
32 TenantCtx as TelemetryTenantCtx,
33 },
34};
35use greentic_interfaces_wasmtime::http_client_client_v1_0::greentic::interfaces_types::types::Impersonation as ImpersonationV1_0;
36use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::interfaces_types::types::Impersonation as ImpersonationV1_1;
37use greentic_pack::builder as legacy_pack;
38use greentic_types::{
39 EnvId, Flow, StateKey as StoreStateKey, TeamId, TenantCtx as TypesTenantCtx, TenantId, UserId,
40 decode_pack_manifest,
41};
42use host_v1::http_client::{
43 HttpClientError, HttpClientErrorV1_1, HttpClientHost, HttpClientHostV1_1,
44 Request as HttpRequest, RequestOptionsV1_1 as HttpRequestOptionsV1_1,
45 RequestV1_1 as HttpRequestV1_1, Response as HttpResponse, ResponseV1_1 as HttpResponseV1_1,
46 TenantCtx as HttpTenantCtx, TenantCtxV1_1 as HttpTenantCtxV1_1,
47};
48use once_cell::sync::Lazy;
49use parking_lot::Mutex;
50use reqwest::blocking::Client as BlockingClient;
51use runner_core::normalize_under_root;
52use serde::{Deserialize, Serialize};
53use serde_cbor;
54use serde_json::{self, Value};
55use tokio::fs;
56use wasmparser::{Parser, Payload};
57use wasmtime::StoreContextMut;
58use zip::ZipArchive;
59
60use crate::runner::engine::{FlowContext, FlowEngine, FlowStatus};
61use crate::runner::flow_adapter::{FlowIR, flow_doc_to_ir, flow_ir_to_flow};
62use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
63
64use crate::config::HostConfig;
65use crate::secrets::{DynSecretsManager, read_secret_blocking};
66use crate::storage::state::STATE_PREFIX;
67use crate::storage::{DynSessionStore, DynStateStore};
68use crate::verify;
69use crate::wasi::RunnerWasiPolicy;
70use tracing::warn;
71use wasmtime_wasi::p2::add_to_linker_sync as add_wasi_to_linker;
72use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
73
74use greentic_flow::model::FlowDoc;
75
76#[allow(dead_code)]
77pub struct PackRuntime {
78 path: PathBuf,
80 archive_path: Option<PathBuf>,
82 config: Arc<HostConfig>,
83 engine: Engine,
84 metadata: PackMetadata,
85 manifest: Option<greentic_types::PackManifest>,
86 legacy_manifest: Option<Box<legacy_pack::PackManifest>>,
87 mocks: Option<Arc<MockLayer>>,
88 flows: Option<PackFlows>,
89 components: HashMap<String, PackComponent>,
90 http_client: Arc<BlockingClient>,
91 pre_cache: Mutex<HashMap<String, ComponentPre<ComponentState>>>,
92 session_store: Option<DynSessionStore>,
93 state_store: Option<DynStateStore>,
94 wasi_policy: Arc<RunnerWasiPolicy>,
95 secrets: DynSecretsManager,
96 oauth_config: Option<OAuthBrokerConfig>,
97}
98
99struct PackComponent {
100 #[allow(dead_code)]
101 name: String,
102 #[allow(dead_code)]
103 version: String,
104 component: Component,
105}
106
107fn build_blocking_client() -> BlockingClient {
108 std::thread::spawn(|| {
109 BlockingClient::builder()
110 .no_proxy()
111 .build()
112 .expect("blocking client")
113 })
114 .join()
115 .expect("client build thread panicked")
116}
117
118fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
119 let (root, candidate) = if path.is_absolute() {
120 let parent = path
121 .parent()
122 .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
123 let root = parent
124 .canonicalize()
125 .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
126 let file = path
127 .file_name()
128 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
129 (root, PathBuf::from(file))
130 } else {
131 let cwd = std::env::current_dir().context("failed to resolve current directory")?;
132 let base = if let Some(parent) = path.parent() {
133 cwd.join(parent)
134 } else {
135 cwd
136 };
137 let root = base
138 .canonicalize()
139 .with_context(|| format!("failed to canonicalize {}", base.display()))?;
140 let file = path
141 .file_name()
142 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
143 (root, PathBuf::from(file))
144 };
145 let safe = normalize_under_root(&root, &candidate)?;
146 Ok((root, safe))
147}
148
149static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
150
151#[derive(Debug, Clone, Serialize, Deserialize)]
152pub struct FlowDescriptor {
153 pub id: String,
154 #[serde(rename = "type")]
155 pub flow_type: String,
156 pub profile: String,
157 pub version: String,
158 #[serde(default)]
159 pub description: Option<String>,
160}
161
162pub struct HostState {
163 config: Arc<HostConfig>,
164 http_client: Arc<BlockingClient>,
165 default_env: String,
166 #[allow(dead_code)]
167 session_store: Option<DynSessionStore>,
168 state_store: Option<DynStateStore>,
169 mocks: Option<Arc<MockLayer>>,
170 secrets: DynSecretsManager,
171 oauth_config: Option<OAuthBrokerConfig>,
172 oauth_host: OAuthBrokerHost,
173}
174
175impl HostState {
176 #[allow(clippy::default_constructed_unit_structs)]
177 pub fn new(
178 config: Arc<HostConfig>,
179 http_client: Arc<BlockingClient>,
180 mocks: Option<Arc<MockLayer>>,
181 session_store: Option<DynSessionStore>,
182 state_store: Option<DynStateStore>,
183 secrets: DynSecretsManager,
184 oauth_config: Option<OAuthBrokerConfig>,
185 ) -> Result<Self> {
186 let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
187 Ok(Self {
188 config,
189 http_client,
190 default_env,
191 session_store,
192 state_store,
193 mocks,
194 secrets,
195 oauth_config,
196 oauth_host: OAuthBrokerHost::default(),
197 })
198 }
199
200 pub fn get_secret(&self, key: &str) -> Result<String> {
201 if !self.config.secrets_policy.is_allowed(key) {
202 bail!("secret {key} is not permitted by bindings policy");
203 }
204 if let Some(mock) = &self.mocks
205 && let Some(value) = mock.secrets_lookup(key)
206 {
207 return Ok(value);
208 }
209 let bytes = read_secret_blocking(&self.secrets, key)
210 .context("failed to read secret from manager")?;
211 let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
212 Ok(value)
213 }
214
215 fn tenant_ctx_from_v1(&self, ctx: Option<StateTenantCtx>) -> Result<TypesTenantCtx> {
216 let tenant_raw = ctx
217 .as_ref()
218 .map(|ctx| ctx.tenant.clone())
219 .unwrap_or_else(|| self.config.tenant.clone());
220 let env_raw = ctx
221 .as_ref()
222 .map(|ctx| ctx.env.clone())
223 .unwrap_or_else(|| self.default_env.clone());
224 let tenant_id = TenantId::from_str(&tenant_raw)
225 .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
226 let env_id = EnvId::from_str(&env_raw)
227 .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
228 let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
229 if let Some(ctx) = ctx {
230 if let Some(team) = ctx.team.or(ctx.team_id) {
231 let team_id =
232 TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
233 tenant_ctx = tenant_ctx.with_team(Some(team_id));
234 }
235 if let Some(user) = ctx.user.or(ctx.user_id) {
236 let user_id =
237 UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
238 tenant_ctx = tenant_ctx.with_user(Some(user_id));
239 }
240 if let Some(flow) = ctx.flow_id {
241 tenant_ctx = tenant_ctx.with_flow(flow);
242 }
243 if let Some(node) = ctx.node_id {
244 tenant_ctx = tenant_ctx.with_node(node);
245 }
246 if let Some(provider) = ctx.provider_id {
247 tenant_ctx = tenant_ctx.with_provider(provider);
248 }
249 if let Some(session) = ctx.session_id {
250 tenant_ctx = tenant_ctx.with_session(session);
251 }
252 tenant_ctx.trace_id = ctx.trace_id;
253 }
254 Ok(tenant_ctx)
255 }
256
257 fn send_http_request(
258 &mut self,
259 req: HttpRequest,
260 opts: Option<HttpRequestOptionsV1_1>,
261 _ctx: Option<HttpTenantCtx>,
262 ) -> Result<HttpResponse, HttpClientError> {
263 if !self.config.http_enabled {
264 return Err(HttpClientError {
265 code: "denied".into(),
266 message: "http client disabled by policy".into(),
267 });
268 }
269
270 let mut mock_state = None;
271 let raw_body = req.body.clone();
272 if let Some(mock) = &self.mocks
273 && let Ok(meta) = HttpMockRequest::new(&req.method, &req.url, raw_body.as_deref())
274 {
275 match mock.http_begin(&meta) {
276 HttpDecision::Mock(response) => {
277 let headers = response
278 .headers
279 .iter()
280 .map(|(k, v)| (k.clone(), v.clone()))
281 .collect();
282 return Ok(HttpResponse {
283 status: response.status,
284 headers,
285 body: response.body.clone().map(|b| b.into_bytes()),
286 });
287 }
288 HttpDecision::Deny(reason) => {
289 return Err(HttpClientError {
290 code: "denied".into(),
291 message: reason,
292 });
293 }
294 HttpDecision::Passthrough { record } => {
295 mock_state = Some((meta, record));
296 }
297 }
298 }
299
300 let method = req.method.parse().unwrap_or(reqwest::Method::GET);
301 let mut builder = self.http_client.request(method, &req.url);
302 for (key, value) in req.headers {
303 if let Ok(header) = reqwest::header::HeaderName::from_bytes(key.as_bytes())
304 && let Ok(header_value) = reqwest::header::HeaderValue::from_str(&value)
305 {
306 builder = builder.header(header, header_value);
307 }
308 }
309
310 if let Some(body) = raw_body.clone() {
311 builder = builder.body(body);
312 }
313
314 if let Some(opts) = opts {
315 if let Some(timeout_ms) = opts.timeout_ms {
316 builder = builder.timeout(Duration::from_millis(timeout_ms as u64));
317 }
318 if opts.allow_insecure == Some(true) {
319 warn!(url = %req.url, "allow-insecure not supported; using default TLS validation");
320 }
321 if let Some(follow_redirects) = opts.follow_redirects
322 && !follow_redirects
323 {
324 warn!(url = %req.url, "follow-redirects=false not supported; using default client behaviour");
325 }
326 }
327
328 let response = match builder.send() {
329 Ok(resp) => resp,
330 Err(err) => {
331 warn!(url = %req.url, error = %err, "http client request failed");
332 return Err(HttpClientError {
333 code: "unavailable".into(),
334 message: err.to_string(),
335 });
336 }
337 };
338
339 let status = response.status().as_u16();
340 let headers_vec = response
341 .headers()
342 .iter()
343 .map(|(k, v)| {
344 (
345 k.as_str().to_string(),
346 v.to_str().unwrap_or_default().to_string(),
347 )
348 })
349 .collect::<Vec<_>>();
350 let body_bytes = response.bytes().ok().map(|b| b.to_vec());
351
352 if let Some((meta, true)) = mock_state.take()
353 && let Some(mock) = &self.mocks
354 {
355 let recorded = HttpMockResponse::new(
356 status,
357 headers_vec.clone().into_iter().collect(),
358 body_bytes
359 .as_ref()
360 .map(|b| String::from_utf8_lossy(b).into_owned()),
361 );
362 mock.http_record(&meta, &recorded);
363 }
364
365 Ok(HttpResponse {
366 status,
367 headers: headers_vec,
368 body: body_bytes,
369 })
370 }
371}
372
373impl SecretsStoreHost for HostState {
374 fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsError> {
375 if !self.config.secrets_policy.is_allowed(&key) {
376 return Err(SecretsError::Denied);
377 }
378 if let Some(mock) = &self.mocks
379 && let Some(value) = mock.secrets_lookup(&key)
380 {
381 return Ok(Some(value.into_bytes()));
382 }
383 match read_secret_blocking(&self.secrets, &key) {
384 Ok(bytes) => Ok(Some(bytes)),
385 Err(err) => {
386 warn!(secret = %key, error = %err, "secret lookup failed");
387 Err(SecretsError::NotFound)
388 }
389 }
390 }
391}
392
393impl HttpClientHost for HostState {
394 fn send(
395 &mut self,
396 req: HttpRequest,
397 ctx: Option<HttpTenantCtx>,
398 ) -> Result<HttpResponse, HttpClientError> {
399 self.send_http_request(req, None, ctx)
400 }
401}
402
403impl HttpClientHostV1_1 for HostState {
404 fn send(
405 &mut self,
406 req: HttpRequestV1_1,
407 opts: Option<HttpRequestOptionsV1_1>,
408 ctx: Option<HttpTenantCtxV1_1>,
409 ) -> Result<HttpResponseV1_1, HttpClientErrorV1_1> {
410 let legacy_req = HttpRequest {
411 method: req.method,
412 url: req.url,
413 headers: req.headers,
414 body: req.body,
415 };
416 let legacy_ctx = ctx.map(|ctx| HttpTenantCtx {
417 env: ctx.env,
418 tenant: ctx.tenant,
419 tenant_id: ctx.tenant_id,
420 team: ctx.team,
421 team_id: ctx.team_id,
422 user: ctx.user,
423 user_id: ctx.user_id,
424 trace_id: ctx.trace_id,
425 correlation_id: ctx.correlation_id,
426 attributes: ctx.attributes,
427 session_id: ctx.session_id,
428 flow_id: ctx.flow_id,
429 node_id: ctx.node_id,
430 provider_id: ctx.provider_id,
431 deadline_ms: ctx.deadline_ms,
432 attempt: ctx.attempt,
433 idempotency_key: ctx.idempotency_key,
434 impersonation: ctx
435 .impersonation
436 .map(|ImpersonationV1_1 { actor_id, reason }| ImpersonationV1_0 {
437 actor_id,
438 reason,
439 }),
440 });
441
442 self.send_http_request(legacy_req, opts, legacy_ctx)
443 .map(|resp| HttpResponseV1_1 {
444 status: resp.status,
445 headers: resp.headers,
446 body: resp.body,
447 })
448 .map_err(|err| HttpClientErrorV1_1 {
449 code: err.code,
450 message: err.message,
451 })
452 }
453}
454
455impl StateStoreHost for HostState {
456 fn read(
457 &mut self,
458 key: HostStateKey,
459 ctx: Option<StateTenantCtx>,
460 ) -> Result<Vec<u8>, StateError> {
461 let store = match self.state_store.as_ref() {
462 Some(store) => store.clone(),
463 None => {
464 return Err(StateError {
465 code: "unavailable".into(),
466 message: "state store not configured".into(),
467 });
468 }
469 };
470 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
471 Ok(ctx) => ctx,
472 Err(err) => {
473 return Err(StateError {
474 code: "invalid-ctx".into(),
475 message: err.to_string(),
476 });
477 }
478 };
479 let key = StoreStateKey::from(key);
480 match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
481 Ok(Some(value)) => Ok(serde_json::to_vec(&value).unwrap_or_else(|_| Vec::new())),
482 Ok(None) => Err(StateError {
483 code: "not_found".into(),
484 message: "state key not found".into(),
485 }),
486 Err(err) => Err(StateError {
487 code: "internal".into(),
488 message: err.to_string(),
489 }),
490 }
491 }
492
493 fn write(
494 &mut self,
495 key: HostStateKey,
496 bytes: Vec<u8>,
497 ctx: Option<StateTenantCtx>,
498 ) -> Result<StateOpAck, StateError> {
499 let store = match self.state_store.as_ref() {
500 Some(store) => store.clone(),
501 None => {
502 return Err(StateError {
503 code: "unavailable".into(),
504 message: "state store not configured".into(),
505 });
506 }
507 };
508 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
509 Ok(ctx) => ctx,
510 Err(err) => {
511 return Err(StateError {
512 code: "invalid-ctx".into(),
513 message: err.to_string(),
514 });
515 }
516 };
517 let key = StoreStateKey::from(key);
518 let value = serde_json::from_slice(&bytes)
519 .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()));
520 match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
521 Ok(()) => Ok(StateOpAck::Ok),
522 Err(err) => Err(StateError {
523 code: "internal".into(),
524 message: err.to_string(),
525 }),
526 }
527 }
528
529 fn delete(
530 &mut self,
531 key: HostStateKey,
532 ctx: Option<StateTenantCtx>,
533 ) -> Result<StateOpAck, StateError> {
534 let store = match self.state_store.as_ref() {
535 Some(store) => store.clone(),
536 None => {
537 return Err(StateError {
538 code: "unavailable".into(),
539 message: "state store not configured".into(),
540 });
541 }
542 };
543 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
544 Ok(ctx) => ctx,
545 Err(err) => {
546 return Err(StateError {
547 code: "invalid-ctx".into(),
548 message: err.to_string(),
549 });
550 }
551 };
552 let key = StoreStateKey::from(key);
553 match store.del(&tenant_ctx, STATE_PREFIX, &key) {
554 Ok(_) => Ok(StateOpAck::Ok),
555 Err(err) => Err(StateError {
556 code: "internal".into(),
557 message: err.to_string(),
558 }),
559 }
560 }
561}
562
563impl TelemetryLoggerHost for HostState {
564 fn log(
565 &mut self,
566 span: TelemetrySpanContext,
567 fields: Vec<(String, String)>,
568 _ctx: Option<TelemetryTenantCtx>,
569 ) -> Result<TelemetryAck, TelemetryError> {
570 if let Some(mock) = &self.mocks
571 && mock.telemetry_drain(&[("span_json", span.flow_id.as_str())])
572 {
573 return Ok(TelemetryAck::Ok);
574 }
575 let mut map = serde_json::Map::new();
576 for (k, v) in fields {
577 map.insert(k, Value::String(v));
578 }
579 tracing::info!(
580 tenant = %span.tenant,
581 flow_id = %span.flow_id,
582 node = ?span.node_id,
583 provider = %span.provider,
584 fields = %serde_json::Value::Object(map.clone()),
585 "telemetry log from pack"
586 );
587 Ok(TelemetryAck::Ok)
588 }
589}
590
591impl RunnerHostHttp for HostState {
592 fn request(
593 &mut self,
594 method: String,
595 url: String,
596 headers: Vec<String>,
597 body: Option<Vec<u8>>,
598 ) -> Result<Vec<u8>, String> {
599 let req = HttpRequest {
600 method,
601 url,
602 headers: headers
603 .chunks(2)
604 .filter_map(|chunk| {
605 if chunk.len() == 2 {
606 Some((chunk[0].clone(), chunk[1].clone()))
607 } else {
608 None
609 }
610 })
611 .collect(),
612 body,
613 };
614 match HttpClientHost::send(self, req, None) {
615 Ok(resp) => Ok(resp.body.unwrap_or_default()),
616 Err(err) => Err(err.message),
617 }
618 }
619}
620
621impl RunnerHostKv for HostState {
622 fn get(&mut self, _ns: String, _key: String) -> Option<String> {
623 None
624 }
625
626 fn put(&mut self, _ns: String, _key: String, _val: String) {}
627}
628
629impl MessagingSessionHost for HostState {
630 fn send(
631 &mut self,
632 _message: OutboundMessage,
633 _ctx: MessagingTenantCtx,
634 ) -> Result<MessagingAck, MessagingError> {
635 Err(MessagingError {
636 code: "unimplemented".into(),
637 message: "messaging session host not wired".into(),
638 })
639 }
640}
641
642enum ManifestLoad {
643 New {
644 manifest: Box<greentic_types::PackManifest>,
645 flows: PackFlows,
646 },
647 Legacy {
648 manifest: Box<legacy_pack::PackManifest>,
649 flows: PackFlows,
650 },
651}
652
653fn load_manifest_and_flows(path: &Path) -> Result<ManifestLoad> {
654 let mut archive = ZipArchive::new(File::open(path)?)
655 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
656 let bytes = read_entry(&mut archive, "manifest.cbor")
657 .with_context(|| format!("missing manifest.cbor in {}", path.display()))?;
658 match decode_pack_manifest(&bytes) {
659 Ok(manifest) => {
660 let cache = PackFlows::from_manifest(manifest.clone());
661 Ok(ManifestLoad::New {
662 manifest: Box::new(manifest),
663 flows: cache,
664 })
665 }
666 Err(err) => {
667 tracing::debug!(error = %err, pack = %path.display(), "decode_pack_manifest failed; trying legacy manifest");
668 let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
670 .context("failed to decode legacy pack manifest from manifest.cbor")?;
671 let flows = load_legacy_flows(&mut archive, &legacy)?;
672 Ok(ManifestLoad::Legacy {
673 manifest: Box::new(legacy),
674 flows,
675 })
676 }
677 }
678}
679
680fn load_legacy_flows(
681 archive: &mut ZipArchive<File>,
682 manifest: &legacy_pack::PackManifest,
683) -> Result<PackFlows> {
684 let mut flows = HashMap::new();
685 let mut descriptors = Vec::new();
686
687 for entry in &manifest.flows {
688 let bytes = read_entry(archive, &entry.file_json)
689 .with_context(|| format!("missing flow json {}", entry.file_json))?;
690 let doc: FlowDoc = serde_json::from_slice(&bytes)
691 .with_context(|| format!("failed to decode flow doc {}", entry.file_json))?;
692 let normalized = normalize_flow_doc(doc);
693 let flow_ir = flow_doc_to_ir(normalized)?;
694 let flow = flow_ir_to_flow(flow_ir)?;
695
696 descriptors.push(FlowDescriptor {
697 id: entry.id.clone(),
698 flow_type: entry.kind.clone(),
699 profile: manifest.meta.pack_id.clone(),
700 version: manifest.meta.version.to_string(),
701 description: None,
702 });
703 flows.insert(entry.id.clone(), flow);
704 }
705
706 let mut entry_flows = manifest.meta.entry_flows.clone();
707 if entry_flows.is_empty() {
708 entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
709 }
710 let metadata = PackMetadata {
711 pack_id: manifest.meta.pack_id.clone(),
712 version: manifest.meta.version.to_string(),
713 entry_flows,
714 secret_requirements: Vec::new(),
715 };
716
717 Ok(PackFlows {
718 descriptors,
719 flows,
720 metadata,
721 })
722}
723
724pub struct ComponentState {
725 pub host: HostState,
726 wasi_ctx: WasiCtx,
727 resource_table: ResourceTable,
728}
729
730impl ComponentState {
731 pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
732 let wasi_ctx = policy
733 .instantiate()
734 .context("failed to build WASI context")?;
735 Ok(Self {
736 host,
737 wasi_ctx,
738 resource_table: ResourceTable::new(),
739 })
740 }
741
742 fn host_mut(&mut self) -> &mut HostState {
743 &mut self.host
744 }
745}
746
747impl control::Host for ComponentState {
748 fn should_cancel(&mut self) -> bool {
749 false
750 }
751
752 fn yield_now(&mut self) {
753 }
755}
756
757fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
758 let mut inst = linker.instance("greentic:component/control@0.4.0")?;
759 inst.func_wrap(
760 "should-cancel",
761 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
762 let host = caller.data_mut();
763 Ok((ComponentControlHost::should_cancel(host),))
764 },
765 )?;
766 inst.func_wrap(
767 "yield-now",
768 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
769 let host = caller.data_mut();
770 ComponentControlHost::yield_now(host);
771 Ok(())
772 },
773 )?;
774 Ok(())
775}
776
777pub fn register_all(linker: &mut Linker<ComponentState>) -> Result<()> {
778 add_wasi_to_linker(linker)?;
779 add_all_v1_to_linker(
780 linker,
781 HostFns {
782 http_client_v1_1: Some(|state| state.host_mut()),
783 http_client: Some(|state| state.host_mut()),
784 oauth_broker: None,
785 runner_host_http: Some(|state| state.host_mut()),
786 runner_host_kv: Some(|state| state.host_mut()),
787 messaging_session: Some(|state| state.host_mut()),
788 telemetry_logger: Some(|state| state.host_mut()),
789 state_store: Some(|state| state.host_mut()),
790 secrets_store: Some(|state| state.host_mut()),
791 },
792 )?;
793 Ok(())
794}
795
796impl OAuthHostContext for ComponentState {
797 fn tenant_id(&self) -> &str {
798 &self.host.config.tenant
799 }
800
801 fn env(&self) -> &str {
802 &self.host.default_env
803 }
804
805 fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
806 &mut self.host.oauth_host
807 }
808
809 fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
810 self.host.oauth_config.as_ref()
811 }
812}
813
814impl WasiView for ComponentState {
815 fn ctx(&mut self) -> WasiCtxView<'_> {
816 WasiCtxView {
817 ctx: &mut self.wasi_ctx,
818 table: &mut self.resource_table,
819 }
820 }
821}
822
823#[allow(unsafe_code)]
824unsafe impl Send for ComponentState {}
825#[allow(unsafe_code)]
826unsafe impl Sync for ComponentState {}
827
828impl PackRuntime {
829 #[allow(clippy::too_many_arguments)]
830 pub async fn load(
831 path: impl AsRef<Path>,
832 config: Arc<HostConfig>,
833 mocks: Option<Arc<MockLayer>>,
834 archive_source: Option<&Path>,
835 session_store: Option<DynSessionStore>,
836 state_store: Option<DynStateStore>,
837 wasi_policy: Arc<RunnerWasiPolicy>,
838 secrets: DynSecretsManager,
839 oauth_config: Option<OAuthBrokerConfig>,
840 verify_archive: bool,
841 ) -> Result<Self> {
842 let path = path.as_ref();
843 let (_pack_root, safe_path) = normalize_pack_path(path)?;
844 let is_component = safe_path
845 .extension()
846 .and_then(|ext| ext.to_str())
847 .map(|ext| ext.eq_ignore_ascii_case("wasm"))
848 .unwrap_or(false);
849 let archive_hint_path = if let Some(source) = archive_source {
850 let (_, normalized) = normalize_pack_path(source)?;
851 Some(normalized)
852 } else if is_component {
853 None
854 } else {
855 Some(safe_path.clone())
856 };
857 let archive_hint = archive_hint_path.as_deref();
858 if verify_archive {
859 let verify_target = archive_hint.unwrap_or(&safe_path);
860 verify::verify_pack(verify_target).await?;
861 tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
862 }
863 let engine = Engine::default();
864 let wasm_bytes = fs::read(&safe_path).await?;
865 let mut metadata = PackMetadata::from_wasm(&wasm_bytes)
866 .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
867 let mut manifest = None;
868 let mut legacy_manifest: Option<Box<legacy_pack::PackManifest>> = None;
869 let flows = if let Some(archive_path) = archive_hint {
870 match load_manifest_and_flows(archive_path) {
871 Ok(ManifestLoad::New {
872 manifest: m,
873 flows: cache,
874 }) => {
875 metadata = cache.metadata.clone();
876 manifest = Some(*m);
877 Some(cache)
878 }
879 Ok(ManifestLoad::Legacy {
880 manifest: m,
881 flows: cache,
882 }) => {
883 metadata = cache.metadata.clone();
884 legacy_manifest = Some(m);
885 Some(cache)
886 }
887 Err(err) => {
888 warn!(error = %err, pack = %archive_path.display(), "failed to parse pack manifest; skipping flows");
889 None
890 }
891 }
892 } else {
893 None
894 };
895 let components = if let Some(archive_path) = archive_hint {
896 if let Some(new_manifest) = manifest.as_ref() {
897 match load_components_from_archive(&engine, archive_path, Some(new_manifest)) {
898 Ok(map) => map,
899 Err(err) => {
900 warn!(error = %err, pack = %archive_path.display(), "failed to load components from archive");
901 HashMap::new()
902 }
903 }
904 } else if let Some(legacy) = legacy_manifest.as_ref() {
905 match load_legacy_components_from_archive(&engine, archive_path, legacy) {
906 Ok(map) => map,
907 Err(err) => {
908 warn!(error = %err, pack = %archive_path.display(), "failed to load components from archive");
909 HashMap::new()
910 }
911 }
912 } else {
913 HashMap::new()
914 }
915 } else if is_component {
916 let name = safe_path
917 .file_stem()
918 .map(|s| s.to_string_lossy().to_string())
919 .unwrap_or_else(|| "component".to_string());
920 let component = Component::from_binary(&engine, &wasm_bytes)?;
921 let mut map = HashMap::new();
922 map.insert(
923 name.clone(),
924 PackComponent {
925 name,
926 version: metadata.version.clone(),
927 component,
928 },
929 );
930 map
931 } else {
932 HashMap::new()
933 };
934 let http_client = Arc::clone(&HTTP_CLIENT);
935 Ok(Self {
936 path: safe_path,
937 archive_path: archive_hint.map(Path::to_path_buf),
938 config,
939 engine,
940 metadata,
941 manifest,
942 legacy_manifest,
943 mocks,
944 flows,
945 components,
946 http_client,
947 pre_cache: Mutex::new(HashMap::new()),
948 session_store,
949 state_store,
950 wasi_policy,
951 secrets,
952 oauth_config,
953 })
954 }
955
956 pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
957 if let Some(cache) = &self.flows {
958 return Ok(cache.descriptors.clone());
959 }
960 if let Some(manifest) = &self.manifest {
961 let descriptors = manifest
962 .flows
963 .iter()
964 .map(|flow| FlowDescriptor {
965 id: flow.id.as_str().to_string(),
966 flow_type: flow_kind_to_str(flow.kind).to_string(),
967 profile: manifest.pack_id.as_str().to_string(),
968 version: manifest.version.to_string(),
969 description: None,
970 })
971 .collect();
972 return Ok(descriptors);
973 }
974 Ok(Vec::new())
975 }
976
977 #[allow(dead_code)]
978 pub async fn run_flow(
979 &self,
980 flow_id: &str,
981 input: serde_json::Value,
982 ) -> Result<serde_json::Value> {
983 let pack = Arc::new(
984 PackRuntime::load(
985 &self.path,
986 Arc::clone(&self.config),
987 self.mocks.clone(),
988 self.archive_path.as_deref(),
989 self.session_store.clone(),
990 self.state_store.clone(),
991 Arc::clone(&self.wasi_policy),
992 self.secrets.clone(),
993 self.oauth_config.clone(),
994 false,
995 )
996 .await?,
997 );
998
999 let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&self.config)).await?;
1000 let retry_config = self.config.retry_config().into();
1001 let mocks = pack.mocks.as_deref();
1002 let tenant = self.config.tenant.as_str();
1003
1004 let ctx = FlowContext {
1005 tenant,
1006 flow_id,
1007 node_id: None,
1008 tool: None,
1009 action: None,
1010 session_id: None,
1011 provider_id: None,
1012 retry_config,
1013 observer: None,
1014 mocks,
1015 };
1016
1017 let execution = engine.execute(ctx, input).await?;
1018 match execution.status {
1019 FlowStatus::Completed => Ok(execution.output),
1020 FlowStatus::Waiting(wait) => Ok(serde_json::json!({
1021 "status": "pending",
1022 "reason": wait.reason,
1023 "resume": wait.snapshot,
1024 "response": execution.output,
1025 })),
1026 }
1027 }
1028
1029 pub async fn invoke_component(
1030 &self,
1031 component_ref: &str,
1032 ctx: ComponentExecCtx,
1033 operation: &str,
1034 _config_json: Option<String>,
1035 input_json: String,
1036 ) -> Result<Value> {
1037 let pack_component = self
1038 .components
1039 .get(component_ref)
1040 .with_context(|| format!("component '{component_ref}' not found in pack"))?;
1041
1042 let pre = if let Some(pre) = self.pre_cache.lock().get(component_ref).cloned() {
1043 pre
1044 } else {
1045 let mut linker = Linker::new(&self.engine);
1046 register_all(&mut linker)?;
1047 add_component_control_to_linker(&mut linker)?;
1048 let pre = ComponentPre::new(
1049 linker
1050 .instantiate_pre(&pack_component.component)
1051 .map_err(|err| anyhow!(err))?,
1052 )
1053 .map_err(|err| anyhow!(err))?;
1054 self.pre_cache
1055 .lock()
1056 .insert(component_ref.to_string(), pre.clone());
1057 pre
1058 };
1059
1060 let host_state = HostState::new(
1061 Arc::clone(&self.config),
1062 Arc::clone(&self.http_client),
1063 self.mocks.clone(),
1064 self.session_store.clone(),
1065 self.state_store.clone(),
1066 Arc::clone(&self.secrets),
1067 self.oauth_config.clone(),
1068 )?;
1069 let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1070 let mut store = wasmtime::Store::new(&self.engine, store_state);
1071 let bindings = pre
1072 .instantiate_async(&mut store)
1073 .await
1074 .map_err(|err| anyhow!(err))?;
1075 let node = bindings.greentic_component_node();
1076
1077 let result = node.call_invoke(&mut store, &ctx, operation, &input_json)?;
1078
1079 match result {
1080 InvokeResult::Ok(body) => {
1081 if body.is_empty() {
1082 return Ok(Value::Null);
1083 }
1084 serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
1085 }
1086 InvokeResult::Err(NodeError {
1087 code,
1088 message,
1089 retryable,
1090 backoff_ms,
1091 details,
1092 }) => {
1093 let mut obj = serde_json::Map::new();
1094 obj.insert("ok".into(), Value::Bool(false));
1095 let mut error = serde_json::Map::new();
1096 error.insert("code".into(), Value::String(code));
1097 error.insert("message".into(), Value::String(message));
1098 error.insert("retryable".into(), Value::Bool(retryable));
1099 if let Some(backoff) = backoff_ms {
1100 error.insert("backoff_ms".into(), Value::Number(backoff.into()));
1101 }
1102 if let Some(details) = details {
1103 error.insert(
1104 "details".into(),
1105 serde_json::from_str(&details).unwrap_or(Value::String(details)),
1106 );
1107 }
1108 obj.insert("error".into(), Value::Object(error));
1109 Ok(Value::Object(obj))
1110 }
1111 }
1112 }
1113
1114 pub fn load_flow(&self, flow_id: &str) -> Result<Flow> {
1115 if let Some(cache) = &self.flows {
1116 return cache
1117 .flows
1118 .get(flow_id)
1119 .cloned()
1120 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
1121 }
1122 if let Some(manifest) = &self.manifest {
1123 let entry = manifest
1124 .flows
1125 .iter()
1126 .find(|f| f.id.as_str() == flow_id)
1127 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
1128 return Ok(entry.flow.clone());
1129 }
1130 bail!("flow '{flow_id}' not available (pack exports disabled)")
1131 }
1132
1133 pub fn metadata(&self) -> &PackMetadata {
1134 &self.metadata
1135 }
1136
1137 pub fn required_secrets(&self) -> &[greentic_types::SecretRequirement] {
1138 &self.metadata.secret_requirements
1139 }
1140
1141 pub fn missing_secrets(
1142 &self,
1143 tenant_ctx: &TypesTenantCtx,
1144 ) -> Vec<greentic_types::SecretRequirement> {
1145 let env = tenant_ctx.env.as_str().to_string();
1146 let tenant = tenant_ctx.tenant.as_str().to_string();
1147 let team = tenant_ctx.team.as_ref().map(|t| t.as_str().to_string());
1148 self.required_secrets()
1149 .iter()
1150 .filter(|req| {
1151 if let Some(scope) = &req.scope {
1153 if scope.env != env {
1154 return false;
1155 }
1156 if scope.tenant != tenant {
1157 return false;
1158 }
1159 if let Some(ref team_req) = scope.team
1160 && team.as_ref() != Some(team_req)
1161 {
1162 return false;
1163 }
1164 }
1165 read_secret_blocking(&self.secrets, req.key.as_str()).is_err()
1166 })
1167 .cloned()
1168 .collect()
1169 }
1170
1171 pub fn for_component_test(
1172 components: Vec<(String, PathBuf)>,
1173 flows: HashMap<String, FlowIR>,
1174 config: Arc<HostConfig>,
1175 ) -> Result<Self> {
1176 let engine = Engine::default();
1177 let mut component_map = HashMap::new();
1178 for (name, path) in components {
1179 if !path.exists() {
1180 bail!("component artifact missing: {}", path.display());
1181 }
1182 let wasm_bytes = std::fs::read(&path)?;
1183 let component = Component::from_binary(&engine, &wasm_bytes)
1184 .with_context(|| format!("failed to compile component {}", path.display()))?;
1185 component_map.insert(
1186 name.clone(),
1187 PackComponent {
1188 name,
1189 version: "0.0.0".into(),
1190 component,
1191 },
1192 );
1193 }
1194
1195 let mut flow_map = HashMap::new();
1196 let mut descriptors = Vec::new();
1197 for (id, ir) in flows {
1198 let flow_type = ir.flow_type.clone();
1199 let flow = flow_ir_to_flow(ir)?;
1200 flow_map.insert(id.clone(), flow);
1201 descriptors.push(FlowDescriptor {
1202 id: id.clone(),
1203 flow_type,
1204 profile: "test".into(),
1205 version: "0.0.0".into(),
1206 description: None,
1207 });
1208 }
1209 let flows_cache = PackFlows {
1210 descriptors: descriptors.clone(),
1211 flows: flow_map,
1212 metadata: PackMetadata::fallback(Path::new("component-test")),
1213 };
1214
1215 Ok(Self {
1216 path: PathBuf::new(),
1217 archive_path: None,
1218 config,
1219 engine,
1220 metadata: PackMetadata::fallback(Path::new("component-test")),
1221 manifest: None,
1222 legacy_manifest: None,
1223 mocks: None,
1224 flows: Some(flows_cache),
1225 components: component_map,
1226 http_client: Arc::clone(&HTTP_CLIENT),
1227 pre_cache: Mutex::new(HashMap::new()),
1228 session_store: None,
1229 state_store: None,
1230 wasi_policy: Arc::new(RunnerWasiPolicy::new()),
1231 secrets: crate::secrets::default_manager(),
1232 oauth_config: None,
1233 })
1234 }
1235}
1236
1237struct PackFlows {
1238 descriptors: Vec<FlowDescriptor>,
1239 flows: HashMap<String, Flow>,
1240 metadata: PackMetadata,
1241}
1242
1243impl PackFlows {
1244 fn from_manifest(manifest: greentic_types::PackManifest) -> Self {
1245 let descriptors = manifest
1246 .flows
1247 .iter()
1248 .map(|entry| FlowDescriptor {
1249 id: entry.id.as_str().to_string(),
1250 flow_type: flow_kind_to_str(entry.kind).to_string(),
1251 profile: manifest.pack_id.as_str().to_string(),
1252 version: manifest.version.to_string(),
1253 description: None,
1254 })
1255 .collect();
1256 let mut flows = HashMap::new();
1257 for entry in &manifest.flows {
1258 flows.insert(entry.id.as_str().to_string(), entry.flow.clone());
1259 }
1260 Self {
1261 metadata: PackMetadata::from_manifest(&manifest),
1262 descriptors,
1263 flows,
1264 }
1265 }
1266}
1267
1268fn flow_kind_to_str(kind: greentic_types::FlowKind) -> &'static str {
1269 match kind {
1270 greentic_types::FlowKind::Messaging => "messaging",
1271 greentic_types::FlowKind::Event => "event",
1272 greentic_types::FlowKind::ComponentConfig => "component-config",
1273 greentic_types::FlowKind::Job => "job",
1274 greentic_types::FlowKind::Http => "http",
1275 }
1276}
1277
1278fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
1279 let mut file = archive
1280 .by_name(name)
1281 .with_context(|| format!("entry {name} missing from archive"))?;
1282 let mut buf = Vec::new();
1283 file.read_to_end(&mut buf)?;
1284 Ok(buf)
1285}
1286
1287fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
1288 for node in doc.nodes.values_mut() {
1289 if node.component.is_empty()
1290 && let Some((component_ref, payload)) = node.raw.iter().next()
1291 {
1292 if component_ref.starts_with("emit.") {
1293 node.component = component_ref.clone();
1294 node.payload = payload.clone();
1295 node.raw.clear();
1296 continue;
1297 }
1298 let (target_component, operation, input, config) =
1299 infer_component_exec(payload, component_ref);
1300 let mut payload_obj = serde_json::Map::new();
1301 payload_obj.insert("component".into(), Value::String(target_component));
1303 payload_obj.insert("operation".into(), Value::String(operation));
1304 payload_obj.insert("input".into(), input);
1305 if let Some(cfg) = config {
1306 payload_obj.insert("config".into(), cfg);
1307 }
1308 node.component = "component.exec".to_string();
1309 node.payload = Value::Object(payload_obj);
1310 }
1311 }
1312 doc
1313}
1314
1315fn infer_component_exec(
1316 payload: &Value,
1317 component_ref: &str,
1318) -> (String, String, Value, Option<Value>) {
1319 let default_op = if component_ref.starts_with("templating.") {
1320 "render"
1321 } else {
1322 "invoke"
1323 }
1324 .to_string();
1325
1326 if let Value::Object(map) = payload {
1327 let op = map
1328 .get("op")
1329 .or_else(|| map.get("operation"))
1330 .and_then(Value::as_str)
1331 .map(|s| s.to_string())
1332 .unwrap_or_else(|| default_op.clone());
1333
1334 let mut input = map.clone();
1335 let config = input.remove("config");
1336 let component = input
1337 .get("component")
1338 .or_else(|| input.get("component_ref"))
1339 .and_then(Value::as_str)
1340 .map(|s| s.to_string())
1341 .unwrap_or_else(|| component_ref.to_string());
1342 input.remove("component");
1343 input.remove("component_ref");
1344 input.remove("op");
1345 input.remove("operation");
1346 return (component, op, Value::Object(input), config);
1347 }
1348
1349 (component_ref.to_string(), default_op, payload.clone(), None)
1350}
1351
1352#[cfg(test)]
1353mod tests {
1354 use super::*;
1355 use greentic_flow::model::{FlowDoc, NodeDoc};
1356 use serde_json::json;
1357 use std::collections::BTreeMap;
1358
1359 #[test]
1360 fn normalizes_raw_component_to_component_exec() {
1361 let mut nodes = BTreeMap::new();
1362 let mut raw = BTreeMap::new();
1363 raw.insert(
1364 "templating.handlebars".into(),
1365 json!({ "template": "Hi {{name}}" }),
1366 );
1367 nodes.insert(
1368 "start".into(),
1369 NodeDoc {
1370 raw,
1371 routing: json!([{"out": true}]),
1372 ..Default::default()
1373 },
1374 );
1375 let doc = FlowDoc {
1376 id: "welcome".into(),
1377 title: None,
1378 description: None,
1379 flow_type: "messaging".into(),
1380 start: Some("start".into()),
1381 parameters: json!({}),
1382 tags: Vec::new(),
1383 entrypoints: BTreeMap::new(),
1384 nodes,
1385 };
1386
1387 let normalized = normalize_flow_doc(doc);
1388 let node = normalized.nodes.get("start").expect("node exists");
1389 assert_eq!(node.component, "component.exec");
1390 assert!(node.raw.is_empty() || node.raw.contains_key("templating.handlebars"));
1391 let payload = node.payload.as_object().expect("payload object");
1392 assert_eq!(
1393 payload.get("component"),
1394 Some(&Value::String("templating.handlebars".into()))
1395 );
1396 assert_eq!(
1397 payload.get("operation"),
1398 Some(&Value::String("render".into()))
1399 );
1400 let input = payload.get("input").unwrap();
1401 assert_eq!(input, &json!({ "template": "Hi {{name}}" }));
1402 }
1403}
1404
1405fn load_components_from_archive(
1406 engine: &Engine,
1407 path: &Path,
1408 manifest: Option<&greentic_types::PackManifest>,
1409) -> Result<HashMap<String, PackComponent>> {
1410 let mut archive = ZipArchive::new(File::open(path)?)
1411 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
1412 let mut components = HashMap::new();
1413 if let Some(manifest) = manifest {
1414 for entry in &manifest.components {
1415 let file_name = format!("components/{}.wasm", entry.id.as_str());
1416 let bytes = read_entry(&mut archive, &file_name)
1417 .with_context(|| format!("missing component {}", file_name))?;
1418 let component = Component::from_binary(engine, &bytes)
1419 .with_context(|| format!("failed to compile component {}", entry.id.as_str()))?;
1420 components.insert(
1421 entry.id.as_str().to_string(),
1422 PackComponent {
1423 name: entry.id.as_str().to_string(),
1424 version: entry.version.to_string(),
1425 component,
1426 },
1427 );
1428 }
1429 }
1430 Ok(components)
1431}
1432
1433fn load_legacy_components_from_archive(
1434 engine: &Engine,
1435 path: &Path,
1436 manifest: &legacy_pack::PackManifest,
1437) -> Result<HashMap<String, PackComponent>> {
1438 let mut archive = ZipArchive::new(File::open(path)?)
1439 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
1440 let mut components = HashMap::new();
1441 for entry in &manifest.components {
1442 let bytes = read_entry(&mut archive, &entry.file_wasm)
1443 .with_context(|| format!("missing component {}", entry.file_wasm))?;
1444 let component = Component::from_binary(engine, &bytes)
1445 .with_context(|| format!("failed to compile component {}", entry.name))?;
1446 components.insert(
1447 entry.name.clone(),
1448 PackComponent {
1449 name: entry.name.clone(),
1450 version: entry.version.to_string(),
1451 component,
1452 },
1453 );
1454 }
1455 Ok(components)
1456}
1457
1458#[derive(Clone, Debug, Default, Serialize, Deserialize)]
1459pub struct PackMetadata {
1460 pub pack_id: String,
1461 pub version: String,
1462 #[serde(default)]
1463 pub entry_flows: Vec<String>,
1464 #[serde(default)]
1465 pub secret_requirements: Vec<greentic_types::SecretRequirement>,
1466}
1467
1468impl PackMetadata {
1469 fn from_wasm(bytes: &[u8]) -> Option<Self> {
1470 let parser = Parser::new(0);
1471 for payload in parser.parse_all(bytes) {
1472 let payload = payload.ok()?;
1473 match payload {
1474 Payload::CustomSection(section) => {
1475 if section.name() == "greentic.manifest"
1476 && let Ok(meta) = Self::from_bytes(section.data())
1477 {
1478 return Some(meta);
1479 }
1480 }
1481 Payload::DataSection(reader) => {
1482 for segment in reader.into_iter().flatten() {
1483 if let Ok(meta) = Self::from_bytes(segment.data) {
1484 return Some(meta);
1485 }
1486 }
1487 }
1488 _ => {}
1489 }
1490 }
1491 None
1492 }
1493
1494 fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
1495 #[derive(Deserialize)]
1496 struct RawManifest {
1497 pack_id: String,
1498 version: String,
1499 #[serde(default)]
1500 entry_flows: Vec<String>,
1501 #[serde(default)]
1502 flows: Vec<RawFlow>,
1503 #[serde(default)]
1504 secret_requirements: Vec<greentic_types::SecretRequirement>,
1505 }
1506
1507 #[derive(Deserialize)]
1508 struct RawFlow {
1509 id: String,
1510 }
1511
1512 let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
1513 let mut entry_flows = if manifest.entry_flows.is_empty() {
1514 manifest.flows.iter().map(|f| f.id.clone()).collect()
1515 } else {
1516 manifest.entry_flows.clone()
1517 };
1518 entry_flows.retain(|id| !id.is_empty());
1519 Ok(Self {
1520 pack_id: manifest.pack_id,
1521 version: manifest.version,
1522 entry_flows,
1523 secret_requirements: manifest.secret_requirements,
1524 })
1525 }
1526
1527 pub fn fallback(path: &Path) -> Self {
1528 let pack_id = path
1529 .file_stem()
1530 .map(|s| s.to_string_lossy().into_owned())
1531 .unwrap_or_else(|| "unknown-pack".to_string());
1532 Self {
1533 pack_id,
1534 version: "0.0.0".to_string(),
1535 entry_flows: Vec::new(),
1536 secret_requirements: Vec::new(),
1537 }
1538 }
1539
1540 pub fn from_manifest(manifest: &greentic_types::PackManifest) -> Self {
1541 let entry_flows = manifest
1542 .flows
1543 .iter()
1544 .map(|flow| flow.id.as_str().to_string())
1545 .collect::<Vec<_>>();
1546 Self {
1547 pack_id: manifest.pack_id.as_str().to_string(),
1548 version: manifest.version.to_string(),
1549 entry_flows,
1550 secret_requirements: manifest.secret_requirements.clone(),
1551 }
1552 }
1553}