1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::component_api::component::greentic::component::control::Host as ComponentControlHost;
10use crate::component_api::{
11 ComponentPre, control, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
12};
13use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
14use crate::provider::{ProviderBinding, ProviderRegistry};
15use crate::provider_core::SchemaCorePre as ProviderComponentPre;
16use crate::provider_core_only;
17use crate::runtime_wasmtime::{Component, Engine, Linker, ResourceTable};
18use anyhow::{Context, Result, anyhow, bail};
19use greentic_distributor_client::dist::{DistClient, DistError, DistOptions};
20use greentic_interfaces_wasmtime::host_helpers::v1::{
21 self as host_v1, HostFns, add_all_v1_to_linker,
22 runner_host_http::RunnerHostHttp,
23 runner_host_kv::RunnerHostKv,
24 secrets_store::{SecretsError, SecretsStoreHost},
25 state_store::{
26 OpAck as StateOpAck, StateKey as HostStateKey, StateStoreError as StateError,
27 StateStoreHost, TenantCtx as StateTenantCtx,
28 },
29 telemetry_logger::{
30 OpAck as TelemetryAck, SpanContext as TelemetrySpanContext,
31 TelemetryLoggerError as TelemetryError, TelemetryLoggerHost,
32 TenantCtx as TelemetryTenantCtx,
33 },
34};
35use greentic_interfaces_wasmtime::http_client_client_v1_0::greentic::interfaces_types::types::Impersonation as ImpersonationV1_0;
36use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::interfaces_types::types::Impersonation as ImpersonationV1_1;
37use greentic_pack::builder as legacy_pack;
38use greentic_types::flow::FlowHasher;
39use greentic_types::{
40 ArtifactLocationV1, ComponentId, ComponentSourceRef, EXT_COMPONENT_SOURCES_V1, EnvId,
41 ExtensionRef, Flow, FlowComponentRef, FlowId, FlowKind, FlowMetadata, InputMapping, Node,
42 NodeId, OutputMapping, Routing, StateKey as StoreStateKey, TeamId, TelemetryHints,
43 TenantCtx as TypesTenantCtx, TenantId, UserId, decode_pack_manifest,
44 pack_manifest::ExtensionInline,
45};
46use host_v1::http_client::{
47 HttpClientError, HttpClientErrorV1_1, HttpClientHost, HttpClientHostV1_1,
48 Request as HttpRequest, RequestOptionsV1_1 as HttpRequestOptionsV1_1,
49 RequestV1_1 as HttpRequestV1_1, Response as HttpResponse, ResponseV1_1 as HttpResponseV1_1,
50 TenantCtx as HttpTenantCtx, TenantCtxV1_1 as HttpTenantCtxV1_1,
51};
52use indexmap::IndexMap;
53use once_cell::sync::Lazy;
54use parking_lot::{Mutex, RwLock};
55use reqwest::blocking::Client as BlockingClient;
56use runner_core::normalize_under_root;
57use serde::{Deserialize, Serialize};
58use serde_cbor;
59use serde_json::{self, Value};
60use sha2::Digest;
61use tokio::fs;
62use wasmparser::{Parser, Payload};
63use wasmtime::StoreContextMut;
64use zip::ZipArchive;
65
66use crate::runner::engine::{FlowContext, FlowEngine, FlowStatus};
67use crate::runner::flow_adapter::{FlowIR, flow_doc_to_ir, flow_ir_to_flow};
68use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
69
70use crate::config::HostConfig;
71use crate::secrets::{DynSecretsManager, read_secret_blocking};
72use crate::storage::state::STATE_PREFIX;
73use crate::storage::{DynSessionStore, DynStateStore};
74use crate::verify;
75use crate::wasi::RunnerWasiPolicy;
76use tracing::warn;
77use wasmtime_wasi::p2::add_to_linker_sync as add_wasi_to_linker;
78use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
79
80use greentic_flow::model::FlowDoc;
81
82#[allow(dead_code)]
83pub struct PackRuntime {
84 path: PathBuf,
86 archive_path: Option<PathBuf>,
88 config: Arc<HostConfig>,
89 engine: Engine,
90 metadata: PackMetadata,
91 manifest: Option<greentic_types::PackManifest>,
92 legacy_manifest: Option<Box<legacy_pack::PackManifest>>,
93 mocks: Option<Arc<MockLayer>>,
94 flows: Option<PackFlows>,
95 components: HashMap<String, PackComponent>,
96 http_client: Arc<BlockingClient>,
97 pre_cache: Mutex<HashMap<String, ComponentPre<ComponentState>>>,
98 session_store: Option<DynSessionStore>,
99 state_store: Option<DynStateStore>,
100 wasi_policy: Arc<RunnerWasiPolicy>,
101 provider_registry: RwLock<Option<ProviderRegistry>>,
102 secrets: DynSecretsManager,
103 oauth_config: Option<OAuthBrokerConfig>,
104}
105
106struct PackComponent {
107 #[allow(dead_code)]
108 name: String,
109 #[allow(dead_code)]
110 version: String,
111 component: Component,
112}
113
114#[derive(Debug, Default, Clone)]
115pub struct ComponentResolution {
116 pub materialized_root: Option<PathBuf>,
118 pub overrides: HashMap<String, PathBuf>,
120 pub dist_offline: bool,
122 pub dist_cache_dir: Option<PathBuf>,
124}
125
126fn build_blocking_client() -> BlockingClient {
127 std::thread::spawn(|| {
128 BlockingClient::builder()
129 .no_proxy()
130 .build()
131 .expect("blocking client")
132 })
133 .join()
134 .expect("client build thread panicked")
135}
136
137fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
138 let (root, candidate) = if path.is_absolute() {
139 let parent = path
140 .parent()
141 .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
142 let root = parent
143 .canonicalize()
144 .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
145 let file = path
146 .file_name()
147 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
148 (root, PathBuf::from(file))
149 } else {
150 let cwd = std::env::current_dir().context("failed to resolve current directory")?;
151 let base = if let Some(parent) = path.parent() {
152 cwd.join(parent)
153 } else {
154 cwd
155 };
156 let root = base
157 .canonicalize()
158 .with_context(|| format!("failed to canonicalize {}", base.display()))?;
159 let file = path
160 .file_name()
161 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
162 (root, PathBuf::from(file))
163 };
164 let safe = normalize_under_root(&root, &candidate)?;
165 Ok((root, safe))
166}
167
168static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
169
170#[derive(Debug, Clone, Serialize, Deserialize)]
171pub struct FlowDescriptor {
172 pub id: String,
173 #[serde(rename = "type")]
174 pub flow_type: String,
175 pub profile: String,
176 pub version: String,
177 #[serde(default)]
178 pub description: Option<String>,
179}
180
181pub struct HostState {
182 config: Arc<HostConfig>,
183 http_client: Arc<BlockingClient>,
184 default_env: String,
185 #[allow(dead_code)]
186 session_store: Option<DynSessionStore>,
187 state_store: Option<DynStateStore>,
188 mocks: Option<Arc<MockLayer>>,
189 secrets: DynSecretsManager,
190 oauth_config: Option<OAuthBrokerConfig>,
191 oauth_host: OAuthBrokerHost,
192}
193
194impl HostState {
195 #[allow(clippy::default_constructed_unit_structs)]
196 pub fn new(
197 config: Arc<HostConfig>,
198 http_client: Arc<BlockingClient>,
199 mocks: Option<Arc<MockLayer>>,
200 session_store: Option<DynSessionStore>,
201 state_store: Option<DynStateStore>,
202 secrets: DynSecretsManager,
203 oauth_config: Option<OAuthBrokerConfig>,
204 ) -> Result<Self> {
205 let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
206 Ok(Self {
207 config,
208 http_client,
209 default_env,
210 session_store,
211 state_store,
212 mocks,
213 secrets,
214 oauth_config,
215 oauth_host: OAuthBrokerHost::default(),
216 })
217 }
218
219 pub fn get_secret(&self, key: &str) -> Result<String> {
220 if provider_core_only::is_enabled() {
221 bail!(provider_core_only::blocked_message("secrets"))
222 }
223 if !self.config.secrets_policy.is_allowed(key) {
224 bail!("secret {key} is not permitted by bindings policy");
225 }
226 if let Some(mock) = &self.mocks
227 && let Some(value) = mock.secrets_lookup(key)
228 {
229 return Ok(value);
230 }
231 let bytes = read_secret_blocking(&self.secrets, key)
232 .context("failed to read secret from manager")?;
233 let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
234 Ok(value)
235 }
236
237 fn tenant_ctx_from_v1(&self, ctx: Option<StateTenantCtx>) -> Result<TypesTenantCtx> {
238 let tenant_raw = ctx
239 .as_ref()
240 .map(|ctx| ctx.tenant.clone())
241 .unwrap_or_else(|| self.config.tenant.clone());
242 let env_raw = ctx
243 .as_ref()
244 .map(|ctx| ctx.env.clone())
245 .unwrap_or_else(|| self.default_env.clone());
246 let tenant_id = TenantId::from_str(&tenant_raw)
247 .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
248 let env_id = EnvId::from_str(&env_raw)
249 .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
250 let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
251 if let Some(ctx) = ctx {
252 if let Some(team) = ctx.team.or(ctx.team_id) {
253 let team_id =
254 TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
255 tenant_ctx = tenant_ctx.with_team(Some(team_id));
256 }
257 if let Some(user) = ctx.user.or(ctx.user_id) {
258 let user_id =
259 UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
260 tenant_ctx = tenant_ctx.with_user(Some(user_id));
261 }
262 if let Some(flow) = ctx.flow_id {
263 tenant_ctx = tenant_ctx.with_flow(flow);
264 }
265 if let Some(node) = ctx.node_id {
266 tenant_ctx = tenant_ctx.with_node(node);
267 }
268 if let Some(provider) = ctx.provider_id {
269 tenant_ctx = tenant_ctx.with_provider(provider);
270 }
271 if let Some(session) = ctx.session_id {
272 tenant_ctx = tenant_ctx.with_session(session);
273 }
274 tenant_ctx.trace_id = ctx.trace_id;
275 }
276 Ok(tenant_ctx)
277 }
278
279 fn send_http_request(
280 &mut self,
281 req: HttpRequest,
282 opts: Option<HttpRequestOptionsV1_1>,
283 _ctx: Option<HttpTenantCtx>,
284 ) -> Result<HttpResponse, HttpClientError> {
285 if !self.config.http_enabled {
286 return Err(HttpClientError {
287 code: "denied".into(),
288 message: "http client disabled by policy".into(),
289 });
290 }
291
292 let mut mock_state = None;
293 let raw_body = req.body.clone();
294 if let Some(mock) = &self.mocks
295 && let Ok(meta) = HttpMockRequest::new(&req.method, &req.url, raw_body.as_deref())
296 {
297 match mock.http_begin(&meta) {
298 HttpDecision::Mock(response) => {
299 let headers = response
300 .headers
301 .iter()
302 .map(|(k, v)| (k.clone(), v.clone()))
303 .collect();
304 return Ok(HttpResponse {
305 status: response.status,
306 headers,
307 body: response.body.clone().map(|b| b.into_bytes()),
308 });
309 }
310 HttpDecision::Deny(reason) => {
311 return Err(HttpClientError {
312 code: "denied".into(),
313 message: reason,
314 });
315 }
316 HttpDecision::Passthrough { record } => {
317 mock_state = Some((meta, record));
318 }
319 }
320 }
321
322 let method = req.method.parse().unwrap_or(reqwest::Method::GET);
323 let mut builder = self.http_client.request(method, &req.url);
324 for (key, value) in req.headers {
325 if let Ok(header) = reqwest::header::HeaderName::from_bytes(key.as_bytes())
326 && let Ok(header_value) = reqwest::header::HeaderValue::from_str(&value)
327 {
328 builder = builder.header(header, header_value);
329 }
330 }
331
332 if let Some(body) = raw_body.clone() {
333 builder = builder.body(body);
334 }
335
336 if let Some(opts) = opts {
337 if let Some(timeout_ms) = opts.timeout_ms {
338 builder = builder.timeout(Duration::from_millis(timeout_ms as u64));
339 }
340 if opts.allow_insecure == Some(true) {
341 warn!(url = %req.url, "allow-insecure not supported; using default TLS validation");
342 }
343 if let Some(follow_redirects) = opts.follow_redirects
344 && !follow_redirects
345 {
346 warn!(url = %req.url, "follow-redirects=false not supported; using default client behaviour");
347 }
348 }
349
350 let response = match builder.send() {
351 Ok(resp) => resp,
352 Err(err) => {
353 warn!(url = %req.url, error = %err, "http client request failed");
354 return Err(HttpClientError {
355 code: "unavailable".into(),
356 message: err.to_string(),
357 });
358 }
359 };
360
361 let status = response.status().as_u16();
362 let headers_vec = response
363 .headers()
364 .iter()
365 .map(|(k, v)| {
366 (
367 k.as_str().to_string(),
368 v.to_str().unwrap_or_default().to_string(),
369 )
370 })
371 .collect::<Vec<_>>();
372 let body_bytes = response.bytes().ok().map(|b| b.to_vec());
373
374 if let Some((meta, true)) = mock_state.take()
375 && let Some(mock) = &self.mocks
376 {
377 let recorded = HttpMockResponse::new(
378 status,
379 headers_vec.clone().into_iter().collect(),
380 body_bytes
381 .as_ref()
382 .map(|b| String::from_utf8_lossy(b).into_owned()),
383 );
384 mock.http_record(&meta, &recorded);
385 }
386
387 Ok(HttpResponse {
388 status,
389 headers: headers_vec,
390 body: body_bytes,
391 })
392 }
393}
394
395impl SecretsStoreHost for HostState {
396 fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsError> {
397 if provider_core_only::is_enabled() {
398 warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
399 return Err(SecretsError::Denied);
400 }
401 if !self.config.secrets_policy.is_allowed(&key) {
402 return Err(SecretsError::Denied);
403 }
404 if let Some(mock) = &self.mocks
405 && let Some(value) = mock.secrets_lookup(&key)
406 {
407 return Ok(Some(value.into_bytes()));
408 }
409 match read_secret_blocking(&self.secrets, &key) {
410 Ok(bytes) => Ok(Some(bytes)),
411 Err(err) => {
412 warn!(secret = %key, error = %err, "secret lookup failed");
413 Err(SecretsError::NotFound)
414 }
415 }
416 }
417}
418
419impl HttpClientHost for HostState {
420 fn send(
421 &mut self,
422 req: HttpRequest,
423 ctx: Option<HttpTenantCtx>,
424 ) -> Result<HttpResponse, HttpClientError> {
425 self.send_http_request(req, None, ctx)
426 }
427}
428
429impl HttpClientHostV1_1 for HostState {
430 fn send(
431 &mut self,
432 req: HttpRequestV1_1,
433 opts: Option<HttpRequestOptionsV1_1>,
434 ctx: Option<HttpTenantCtxV1_1>,
435 ) -> Result<HttpResponseV1_1, HttpClientErrorV1_1> {
436 let legacy_req = HttpRequest {
437 method: req.method,
438 url: req.url,
439 headers: req.headers,
440 body: req.body,
441 };
442 let legacy_ctx = ctx.map(|ctx| HttpTenantCtx {
443 env: ctx.env,
444 tenant: ctx.tenant,
445 tenant_id: ctx.tenant_id,
446 team: ctx.team,
447 team_id: ctx.team_id,
448 user: ctx.user,
449 user_id: ctx.user_id,
450 trace_id: ctx.trace_id,
451 correlation_id: ctx.correlation_id,
452 attributes: ctx.attributes,
453 session_id: ctx.session_id,
454 flow_id: ctx.flow_id,
455 node_id: ctx.node_id,
456 provider_id: ctx.provider_id,
457 deadline_ms: ctx.deadline_ms,
458 attempt: ctx.attempt,
459 idempotency_key: ctx.idempotency_key,
460 impersonation: ctx
461 .impersonation
462 .map(|ImpersonationV1_1 { actor_id, reason }| ImpersonationV1_0 {
463 actor_id,
464 reason,
465 }),
466 });
467
468 self.send_http_request(legacy_req, opts, legacy_ctx)
469 .map(|resp| HttpResponseV1_1 {
470 status: resp.status,
471 headers: resp.headers,
472 body: resp.body,
473 })
474 .map_err(|err| HttpClientErrorV1_1 {
475 code: err.code,
476 message: err.message,
477 })
478 }
479}
480
481impl StateStoreHost for HostState {
482 fn read(
483 &mut self,
484 key: HostStateKey,
485 ctx: Option<StateTenantCtx>,
486 ) -> Result<Vec<u8>, StateError> {
487 let store = match self.state_store.as_ref() {
488 Some(store) => store.clone(),
489 None => {
490 return Err(StateError {
491 code: "unavailable".into(),
492 message: "state store not configured".into(),
493 });
494 }
495 };
496 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
497 Ok(ctx) => ctx,
498 Err(err) => {
499 return Err(StateError {
500 code: "invalid-ctx".into(),
501 message: err.to_string(),
502 });
503 }
504 };
505 let key = StoreStateKey::from(key);
506 match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
507 Ok(Some(value)) => Ok(serde_json::to_vec(&value).unwrap_or_else(|_| Vec::new())),
508 Ok(None) => Err(StateError {
509 code: "not_found".into(),
510 message: "state key not found".into(),
511 }),
512 Err(err) => Err(StateError {
513 code: "internal".into(),
514 message: err.to_string(),
515 }),
516 }
517 }
518
519 fn write(
520 &mut self,
521 key: HostStateKey,
522 bytes: Vec<u8>,
523 ctx: Option<StateTenantCtx>,
524 ) -> Result<StateOpAck, StateError> {
525 let store = match self.state_store.as_ref() {
526 Some(store) => store.clone(),
527 None => {
528 return Err(StateError {
529 code: "unavailable".into(),
530 message: "state store not configured".into(),
531 });
532 }
533 };
534 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
535 Ok(ctx) => ctx,
536 Err(err) => {
537 return Err(StateError {
538 code: "invalid-ctx".into(),
539 message: err.to_string(),
540 });
541 }
542 };
543 let key = StoreStateKey::from(key);
544 let value = serde_json::from_slice(&bytes)
545 .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()));
546 match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
547 Ok(()) => Ok(StateOpAck::Ok),
548 Err(err) => Err(StateError {
549 code: "internal".into(),
550 message: err.to_string(),
551 }),
552 }
553 }
554
555 fn delete(
556 &mut self,
557 key: HostStateKey,
558 ctx: Option<StateTenantCtx>,
559 ) -> Result<StateOpAck, StateError> {
560 let store = match self.state_store.as_ref() {
561 Some(store) => store.clone(),
562 None => {
563 return Err(StateError {
564 code: "unavailable".into(),
565 message: "state store not configured".into(),
566 });
567 }
568 };
569 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
570 Ok(ctx) => ctx,
571 Err(err) => {
572 return Err(StateError {
573 code: "invalid-ctx".into(),
574 message: err.to_string(),
575 });
576 }
577 };
578 let key = StoreStateKey::from(key);
579 match store.del(&tenant_ctx, STATE_PREFIX, &key) {
580 Ok(_) => Ok(StateOpAck::Ok),
581 Err(err) => Err(StateError {
582 code: "internal".into(),
583 message: err.to_string(),
584 }),
585 }
586 }
587}
588
589impl TelemetryLoggerHost for HostState {
590 fn log(
591 &mut self,
592 span: TelemetrySpanContext,
593 fields: Vec<(String, String)>,
594 _ctx: Option<TelemetryTenantCtx>,
595 ) -> Result<TelemetryAck, TelemetryError> {
596 if let Some(mock) = &self.mocks
597 && mock.telemetry_drain(&[("span_json", span.flow_id.as_str())])
598 {
599 return Ok(TelemetryAck::Ok);
600 }
601 let mut map = serde_json::Map::new();
602 for (k, v) in fields {
603 map.insert(k, Value::String(v));
604 }
605 tracing::info!(
606 tenant = %span.tenant,
607 flow_id = %span.flow_id,
608 node = ?span.node_id,
609 provider = %span.provider,
610 fields = %serde_json::Value::Object(map.clone()),
611 "telemetry log from pack"
612 );
613 Ok(TelemetryAck::Ok)
614 }
615}
616
617impl RunnerHostHttp for HostState {
618 fn request(
619 &mut self,
620 method: String,
621 url: String,
622 headers: Vec<String>,
623 body: Option<Vec<u8>>,
624 ) -> Result<Vec<u8>, String> {
625 let req = HttpRequest {
626 method,
627 url,
628 headers: headers
629 .chunks(2)
630 .filter_map(|chunk| {
631 if chunk.len() == 2 {
632 Some((chunk[0].clone(), chunk[1].clone()))
633 } else {
634 None
635 }
636 })
637 .collect(),
638 body,
639 };
640 match HttpClientHost::send(self, req, None) {
641 Ok(resp) => Ok(resp.body.unwrap_or_default()),
642 Err(err) => Err(err.message),
643 }
644 }
645}
646
647impl RunnerHostKv for HostState {
648 fn get(&mut self, _ns: String, _key: String) -> Option<String> {
649 None
650 }
651
652 fn put(&mut self, _ns: String, _key: String, _val: String) {}
653}
654
655enum ManifestLoad {
656 New {
657 manifest: Box<greentic_types::PackManifest>,
658 flows: PackFlows,
659 },
660 Legacy {
661 manifest: Box<legacy_pack::PackManifest>,
662 flows: PackFlows,
663 },
664}
665
666fn load_manifest_and_flows(path: &Path) -> Result<ManifestLoad> {
667 let mut archive = ZipArchive::new(File::open(path)?)
668 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
669 let bytes = read_entry(&mut archive, "manifest.cbor")
670 .with_context(|| format!("missing manifest.cbor in {}", path.display()))?;
671 match decode_pack_manifest(&bytes) {
672 Ok(manifest) => {
673 let cache = PackFlows::from_manifest(manifest.clone());
674 Ok(ManifestLoad::New {
675 manifest: Box::new(manifest),
676 flows: cache,
677 })
678 }
679 Err(err) => {
680 tracing::debug!(error = %err, pack = %path.display(), "decode_pack_manifest failed; trying legacy manifest");
681 let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
683 .context("failed to decode legacy pack manifest from manifest.cbor")?;
684 let flows = load_legacy_flows(&mut archive, &legacy)?;
685 Ok(ManifestLoad::Legacy {
686 manifest: Box::new(legacy),
687 flows,
688 })
689 }
690 }
691}
692
693fn load_manifest_and_flows_from_dir(root: &Path) -> Result<ManifestLoad> {
694 let manifest_path = root.join("manifest.cbor");
695 let bytes = std::fs::read(&manifest_path)
696 .with_context(|| format!("missing manifest.cbor in {}", root.display()))?;
697 match decode_pack_manifest(&bytes) {
698 Ok(manifest) => {
699 let cache = PackFlows::from_manifest(manifest.clone());
700 Ok(ManifestLoad::New {
701 manifest: Box::new(manifest),
702 flows: cache,
703 })
704 }
705 Err(err) => {
706 tracing::debug!(
707 error = %err,
708 pack = %root.display(),
709 "decode_pack_manifest failed for materialized pack; trying legacy manifest"
710 );
711 let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
712 .context("failed to decode legacy pack manifest from manifest.cbor")?;
713 let flows = load_legacy_flows_from_dir(root, &legacy)?;
714 Ok(ManifestLoad::Legacy {
715 manifest: Box::new(legacy),
716 flows,
717 })
718 }
719 }
720}
721
722fn load_legacy_flows(
723 archive: &mut ZipArchive<File>,
724 manifest: &legacy_pack::PackManifest,
725) -> Result<PackFlows> {
726 let mut flows = HashMap::new();
727 let mut descriptors = Vec::new();
728
729 for entry in &manifest.flows {
730 let bytes = read_entry(archive, &entry.file_json)
731 .with_context(|| format!("missing flow json {}", entry.file_json))?;
732 let doc: FlowDoc = serde_json::from_slice(&bytes)
733 .with_context(|| format!("failed to decode flow doc {}", entry.file_json))?;
734 let normalized = normalize_flow_doc(doc);
735 let flow_ir = flow_doc_to_ir(normalized)?;
736 let flow = flow_ir_to_flow(flow_ir)?;
737
738 descriptors.push(FlowDescriptor {
739 id: entry.id.clone(),
740 flow_type: entry.kind.clone(),
741 profile: manifest.meta.pack_id.clone(),
742 version: manifest.meta.version.to_string(),
743 description: None,
744 });
745 flows.insert(entry.id.clone(), flow);
746 }
747
748 let mut entry_flows = manifest.meta.entry_flows.clone();
749 if entry_flows.is_empty() {
750 entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
751 }
752 let metadata = PackMetadata {
753 pack_id: manifest.meta.pack_id.clone(),
754 version: manifest.meta.version.to_string(),
755 entry_flows,
756 secret_requirements: Vec::new(),
757 };
758
759 Ok(PackFlows {
760 descriptors,
761 flows,
762 metadata,
763 })
764}
765
766fn load_legacy_flows_from_dir(
767 root: &Path,
768 manifest: &legacy_pack::PackManifest,
769) -> Result<PackFlows> {
770 let mut flows = HashMap::new();
771 let mut descriptors = Vec::new();
772
773 for entry in &manifest.flows {
774 let path = root.join(&entry.file_json);
775 let bytes = std::fs::read(&path)
776 .with_context(|| format!("missing flow json {}", path.display()))?;
777 let doc: FlowDoc = serde_json::from_slice(&bytes)
778 .with_context(|| format!("failed to decode flow doc {}", path.display()))?;
779 let normalized = normalize_flow_doc(doc);
780 let flow_ir = flow_doc_to_ir(normalized)?;
781 let flow = flow_ir_to_flow(flow_ir)?;
782
783 descriptors.push(FlowDescriptor {
784 id: entry.id.clone(),
785 flow_type: entry.kind.clone(),
786 profile: manifest.meta.pack_id.clone(),
787 version: manifest.meta.version.to_string(),
788 description: None,
789 });
790 flows.insert(entry.id.clone(), flow);
791 }
792
793 let mut entry_flows = manifest.meta.entry_flows.clone();
794 if entry_flows.is_empty() {
795 entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
796 }
797 let metadata = PackMetadata {
798 pack_id: manifest.meta.pack_id.clone(),
799 version: manifest.meta.version.to_string(),
800 entry_flows,
801 secret_requirements: Vec::new(),
802 };
803
804 Ok(PackFlows {
805 descriptors,
806 flows,
807 metadata,
808 })
809}
810
811pub struct ComponentState {
812 pub host: HostState,
813 wasi_ctx: WasiCtx,
814 resource_table: ResourceTable,
815}
816
817impl ComponentState {
818 pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
819 let wasi_ctx = policy
820 .instantiate()
821 .context("failed to build WASI context")?;
822 Ok(Self {
823 host,
824 wasi_ctx,
825 resource_table: ResourceTable::new(),
826 })
827 }
828
829 fn host_mut(&mut self) -> &mut HostState {
830 &mut self.host
831 }
832}
833
834impl control::Host for ComponentState {
835 fn should_cancel(&mut self) -> bool {
836 false
837 }
838
839 fn yield_now(&mut self) {
840 }
842}
843
844fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
845 let mut inst = linker.instance("greentic:component/control@0.4.0")?;
846 inst.func_wrap(
847 "should-cancel",
848 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
849 let host = caller.data_mut();
850 Ok((ComponentControlHost::should_cancel(host),))
851 },
852 )?;
853 inst.func_wrap(
854 "yield-now",
855 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
856 let host = caller.data_mut();
857 ComponentControlHost::yield_now(host);
858 Ok(())
859 },
860 )?;
861 Ok(())
862}
863
864pub fn register_all(linker: &mut Linker<ComponentState>) -> Result<()> {
865 add_wasi_to_linker(linker)?;
866 add_all_v1_to_linker(
867 linker,
868 HostFns {
869 http_client_v1_1: Some(|state| state.host_mut()),
870 http_client: Some(|state| state.host_mut()),
871 oauth_broker: None,
872 runner_host_http: Some(|state| state.host_mut()),
873 runner_host_kv: Some(|state| state.host_mut()),
874 telemetry_logger: Some(|state| state.host_mut()),
875 state_store: Some(|state| state.host_mut()),
876 secrets_store: Some(|state| state.host_mut()),
877 },
878 )?;
879 Ok(())
880}
881
882impl OAuthHostContext for ComponentState {
883 fn tenant_id(&self) -> &str {
884 &self.host.config.tenant
885 }
886
887 fn env(&self) -> &str {
888 &self.host.default_env
889 }
890
891 fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
892 &mut self.host.oauth_host
893 }
894
895 fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
896 self.host.oauth_config.as_ref()
897 }
898}
899
900impl WasiView for ComponentState {
901 fn ctx(&mut self) -> WasiCtxView<'_> {
902 WasiCtxView {
903 ctx: &mut self.wasi_ctx,
904 table: &mut self.resource_table,
905 }
906 }
907}
908
909#[allow(unsafe_code)]
910unsafe impl Send for ComponentState {}
911#[allow(unsafe_code)]
912unsafe impl Sync for ComponentState {}
913
914impl PackRuntime {
915 #[allow(clippy::too_many_arguments)]
916 pub async fn load(
917 path: impl AsRef<Path>,
918 config: Arc<HostConfig>,
919 mocks: Option<Arc<MockLayer>>,
920 archive_source: Option<&Path>,
921 session_store: Option<DynSessionStore>,
922 state_store: Option<DynStateStore>,
923 wasi_policy: Arc<RunnerWasiPolicy>,
924 secrets: DynSecretsManager,
925 oauth_config: Option<OAuthBrokerConfig>,
926 verify_archive: bool,
927 component_resolution: ComponentResolution,
928 ) -> Result<Self> {
929 let path = path.as_ref();
930 let (_pack_root, safe_path) = normalize_pack_path(path)?;
931 let path_meta = std::fs::metadata(&safe_path).ok();
932 let is_dir = path_meta
933 .as_ref()
934 .map(|meta| meta.is_dir())
935 .unwrap_or(false);
936 let is_component = !is_dir
937 && safe_path
938 .extension()
939 .and_then(|ext| ext.to_str())
940 .map(|ext| ext.eq_ignore_ascii_case("wasm"))
941 .unwrap_or(false);
942 let archive_hint_path = if let Some(source) = archive_source {
943 let (_, normalized) = normalize_pack_path(source)?;
944 Some(normalized)
945 } else if is_component || is_dir {
946 None
947 } else {
948 Some(safe_path.clone())
949 };
950 let archive_hint = archive_hint_path.as_deref();
951 if verify_archive {
952 if let Some(verify_target) = archive_hint.and_then(|p| {
953 std::fs::metadata(p)
954 .ok()
955 .filter(|meta| meta.is_file())
956 .map(|_| p)
957 }) {
958 verify::verify_pack(verify_target).await?;
959 tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
960 } else {
961 tracing::debug!("skipping archive verification (no archive source)");
962 }
963 }
964 let engine = Engine::default();
965 let mut metadata = PackMetadata::fallback(&safe_path);
966 let mut manifest = None;
967 let mut legacy_manifest: Option<Box<legacy_pack::PackManifest>> = None;
968 let mut flows = None;
969 let materialized_root = component_resolution.materialized_root.clone().or_else(|| {
970 if is_dir {
971 Some(safe_path.clone())
972 } else {
973 None
974 }
975 });
976
977 if let Some(root) = materialized_root.as_ref() {
978 match load_manifest_and_flows_from_dir(root) {
979 Ok(ManifestLoad::New {
980 manifest: m,
981 flows: cache,
982 }) => {
983 metadata = cache.metadata.clone();
984 manifest = Some(*m);
985 flows = Some(cache);
986 }
987 Ok(ManifestLoad::Legacy {
988 manifest: m,
989 flows: cache,
990 }) => {
991 metadata = cache.metadata.clone();
992 legacy_manifest = Some(m);
993 flows = Some(cache);
994 }
995 Err(err) => {
996 warn!(error = %err, pack = %root.display(), "failed to parse materialized pack manifest");
997 }
998 }
999 }
1000
1001 if manifest.is_none()
1002 && legacy_manifest.is_none()
1003 && let Some(archive_path) = archive_hint
1004 {
1005 match load_manifest_and_flows(archive_path) {
1006 Ok(ManifestLoad::New {
1007 manifest: m,
1008 flows: cache,
1009 }) => {
1010 metadata = cache.metadata.clone();
1011 manifest = Some(*m);
1012 flows = Some(cache);
1013 }
1014 Ok(ManifestLoad::Legacy {
1015 manifest: m,
1016 flows: cache,
1017 }) => {
1018 metadata = cache.metadata.clone();
1019 legacy_manifest = Some(m);
1020 flows = Some(cache);
1021 }
1022 Err(err) => {
1023 warn!(error = %err, pack = %archive_path.display(), "failed to parse pack manifest; skipping flows");
1024 }
1025 }
1026 }
1027 let component_sources = if let Some(manifest) = manifest.as_ref() {
1028 component_sources_table(manifest).context("invalid component sources extension")?
1029 } else {
1030 None
1031 };
1032 let components = if is_component {
1033 let wasm_bytes = fs::read(&safe_path).await?;
1034 metadata = PackMetadata::from_wasm(&wasm_bytes)
1035 .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
1036 let name = safe_path
1037 .file_stem()
1038 .map(|s| s.to_string_lossy().to_string())
1039 .unwrap_or_else(|| "component".to_string());
1040 let component = Component::from_binary(&engine, &wasm_bytes)?;
1041 let mut map = HashMap::new();
1042 map.insert(
1043 name.clone(),
1044 PackComponent {
1045 name,
1046 version: metadata.version.clone(),
1047 component,
1048 },
1049 );
1050 map
1051 } else {
1052 let specs = component_specs(manifest.as_ref(), legacy_manifest.as_deref());
1053 if specs.is_empty() {
1054 HashMap::new()
1055 } else {
1056 let mut loaded = HashMap::new();
1057 let mut missing: HashSet<String> =
1058 specs.iter().map(|spec| spec.id.clone()).collect();
1059 let mut searched = Vec::new();
1060
1061 if !component_resolution.overrides.is_empty() {
1062 load_components_from_overrides(
1063 &engine,
1064 &component_resolution.overrides,
1065 &specs,
1066 &mut missing,
1067 &mut loaded,
1068 )?;
1069 searched.push("override map".to_string());
1070 }
1071
1072 if let Some(component_sources) = component_sources.as_ref() {
1073 load_components_from_sources(
1074 &engine,
1075 component_sources,
1076 &component_resolution,
1077 &specs,
1078 &mut missing,
1079 &mut loaded,
1080 materialized_root.as_deref(),
1081 archive_hint,
1082 )
1083 .await?;
1084 searched.push(format!("extension {}", EXT_COMPONENT_SOURCES_V1));
1085 }
1086
1087 if let Some(root) = materialized_root.as_ref() {
1088 load_components_from_dir(&engine, root, &specs, &mut missing, &mut loaded)?;
1089 searched.push(format!("components dir {}", root.display()));
1090 }
1091
1092 if let Some(archive_path) = archive_hint {
1093 load_components_from_archive(
1094 &engine,
1095 archive_path,
1096 &specs,
1097 &mut missing,
1098 &mut loaded,
1099 )?;
1100 searched.push(format!("archive {}", archive_path.display()));
1101 }
1102
1103 if !missing.is_empty() {
1104 let missing_list = missing.into_iter().collect::<Vec<_>>().join(", ");
1105 let sources = if searched.is_empty() {
1106 "no component sources".to_string()
1107 } else {
1108 searched.join(", ")
1109 };
1110 bail!(
1111 "components missing: {}; looked in {}",
1112 missing_list,
1113 sources
1114 );
1115 }
1116
1117 loaded
1118 }
1119 };
1120 let http_client = Arc::clone(&HTTP_CLIENT);
1121 Ok(Self {
1122 path: safe_path,
1123 archive_path: archive_hint.map(Path::to_path_buf),
1124 config,
1125 engine,
1126 metadata,
1127 manifest,
1128 legacy_manifest,
1129 mocks,
1130 flows,
1131 components,
1132 http_client,
1133 pre_cache: Mutex::new(HashMap::new()),
1134 session_store,
1135 state_store,
1136 wasi_policy,
1137 provider_registry: RwLock::new(None),
1138 secrets,
1139 oauth_config,
1140 })
1141 }
1142
1143 pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
1144 if let Some(cache) = &self.flows {
1145 return Ok(cache.descriptors.clone());
1146 }
1147 if let Some(manifest) = &self.manifest {
1148 let descriptors = manifest
1149 .flows
1150 .iter()
1151 .map(|flow| FlowDescriptor {
1152 id: flow.id.as_str().to_string(),
1153 flow_type: flow_kind_to_str(flow.kind).to_string(),
1154 profile: manifest.pack_id.as_str().to_string(),
1155 version: manifest.version.to_string(),
1156 description: None,
1157 })
1158 .collect();
1159 return Ok(descriptors);
1160 }
1161 Ok(Vec::new())
1162 }
1163
1164 #[allow(dead_code)]
1165 pub async fn run_flow(
1166 &self,
1167 flow_id: &str,
1168 input: serde_json::Value,
1169 ) -> Result<serde_json::Value> {
1170 let pack = Arc::new(
1171 PackRuntime::load(
1172 &self.path,
1173 Arc::clone(&self.config),
1174 self.mocks.clone(),
1175 self.archive_path.as_deref(),
1176 self.session_store.clone(),
1177 self.state_store.clone(),
1178 Arc::clone(&self.wasi_policy),
1179 self.secrets.clone(),
1180 self.oauth_config.clone(),
1181 false,
1182 ComponentResolution::default(),
1183 )
1184 .await?,
1185 );
1186
1187 let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&self.config)).await?;
1188 let retry_config = self.config.retry_config().into();
1189 let mocks = pack.mocks.as_deref();
1190 let tenant = self.config.tenant.as_str();
1191
1192 let ctx = FlowContext {
1193 tenant,
1194 flow_id,
1195 node_id: None,
1196 tool: None,
1197 action: None,
1198 session_id: None,
1199 provider_id: None,
1200 retry_config,
1201 observer: None,
1202 mocks,
1203 };
1204
1205 let execution = engine.execute(ctx, input).await?;
1206 match execution.status {
1207 FlowStatus::Completed => Ok(execution.output),
1208 FlowStatus::Waiting(wait) => Ok(serde_json::json!({
1209 "status": "pending",
1210 "reason": wait.reason,
1211 "resume": wait.snapshot,
1212 "response": execution.output,
1213 })),
1214 }
1215 }
1216
1217 pub async fn invoke_component(
1218 &self,
1219 component_ref: &str,
1220 ctx: ComponentExecCtx,
1221 operation: &str,
1222 _config_json: Option<String>,
1223 input_json: String,
1224 ) -> Result<Value> {
1225 let pack_component = self
1226 .components
1227 .get(component_ref)
1228 .with_context(|| format!("component '{component_ref}' not found in pack"))?;
1229
1230 let mut linker = Linker::new(&self.engine);
1231 register_all(&mut linker)?;
1232 add_component_control_to_linker(&mut linker)?;
1233 let pre_instance = linker.instantiate_pre(&pack_component.component)?;
1234 let pre: ComponentPre<ComponentState> = ComponentPre::new(pre_instance)?;
1235
1236 let host_state = HostState::new(
1237 Arc::clone(&self.config),
1238 Arc::clone(&self.http_client),
1239 self.mocks.clone(),
1240 self.session_store.clone(),
1241 self.state_store.clone(),
1242 Arc::clone(&self.secrets),
1243 self.oauth_config.clone(),
1244 )?;
1245 let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1246 let mut store = wasmtime::Store::new(&self.engine, store_state);
1247 let bindings: crate::component_api::Component = pre.instantiate_async(&mut store).await?;
1248 let node = bindings.greentic_component_node();
1249
1250 let result = node.call_invoke(&mut store, &ctx, operation, &input_json)?;
1251
1252 match result {
1253 InvokeResult::Ok(body) => {
1254 if body.is_empty() {
1255 return Ok(Value::Null);
1256 }
1257 serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
1258 }
1259 InvokeResult::Err(NodeError {
1260 code,
1261 message,
1262 retryable,
1263 backoff_ms,
1264 details,
1265 }) => {
1266 let mut obj = serde_json::Map::new();
1267 obj.insert("ok".into(), Value::Bool(false));
1268 let mut error = serde_json::Map::new();
1269 error.insert("code".into(), Value::String(code));
1270 error.insert("message".into(), Value::String(message));
1271 error.insert("retryable".into(), Value::Bool(retryable));
1272 if let Some(backoff) = backoff_ms {
1273 error.insert("backoff_ms".into(), Value::Number(backoff.into()));
1274 }
1275 if let Some(details) = details {
1276 error.insert(
1277 "details".into(),
1278 serde_json::from_str(&details).unwrap_or(Value::String(details)),
1279 );
1280 }
1281 obj.insert("error".into(), Value::Object(error));
1282 Ok(Value::Object(obj))
1283 }
1284 }
1285 }
1286
1287 pub fn resolve_provider(
1288 &self,
1289 provider_id: Option<&str>,
1290 provider_type: Option<&str>,
1291 ) -> Result<ProviderBinding> {
1292 let registry = self.provider_registry()?;
1293 registry.resolve(provider_id, provider_type)
1294 }
1295
1296 pub async fn invoke_provider(
1297 &self,
1298 binding: &ProviderBinding,
1299 _ctx: ComponentExecCtx,
1300 op: &str,
1301 input_json: Vec<u8>,
1302 ) -> Result<Value> {
1303 let component_ref = &binding.component_ref;
1304 let pack_component = self
1305 .components
1306 .get(component_ref)
1307 .with_context(|| format!("provider component '{component_ref}' not found in pack"))?;
1308
1309 let mut linker = Linker::new(&self.engine);
1310 register_all(&mut linker)?;
1311 add_component_control_to_linker(&mut linker)?;
1312 let pre_instance = linker.instantiate_pre(&pack_component.component)?;
1313 let pre: ProviderComponentPre<ComponentState> = ProviderComponentPre::new(pre_instance)?;
1314
1315 let host_state = HostState::new(
1316 Arc::clone(&self.config),
1317 Arc::clone(&self.http_client),
1318 self.mocks.clone(),
1319 self.session_store.clone(),
1320 self.state_store.clone(),
1321 Arc::clone(&self.secrets),
1322 self.oauth_config.clone(),
1323 )?;
1324 let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1325 let mut store = wasmtime::Store::new(&self.engine, store_state);
1326 let bindings: crate::provider_core::SchemaCore = pre.instantiate_async(&mut store).await?;
1327 let provider = bindings.greentic_provider_core_schema_core_api();
1328
1329 let result = provider.call_invoke(&mut store, op, &input_json)?;
1330 deserialize_json_bytes(result)
1331 }
1332
1333 fn provider_registry(&self) -> Result<ProviderRegistry> {
1334 if let Some(registry) = self.provider_registry.read().clone() {
1335 return Ok(registry);
1336 }
1337 let manifest = self
1338 .manifest
1339 .as_ref()
1340 .context("pack manifest required for provider resolution")?;
1341 let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
1342 let registry = ProviderRegistry::new(
1343 manifest,
1344 self.state_store.clone(),
1345 &self.config.tenant,
1346 &env,
1347 )?;
1348 *self.provider_registry.write() = Some(registry.clone());
1349 Ok(registry)
1350 }
1351
1352 pub fn load_flow(&self, flow_id: &str) -> Result<Flow> {
1353 if let Some(cache) = &self.flows {
1354 return cache
1355 .flows
1356 .get(flow_id)
1357 .cloned()
1358 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
1359 }
1360 if let Some(manifest) = &self.manifest {
1361 let entry = manifest
1362 .flows
1363 .iter()
1364 .find(|f| f.id.as_str() == flow_id)
1365 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
1366 return Ok(entry.flow.clone());
1367 }
1368 bail!("flow '{flow_id}' not available (pack exports disabled)")
1369 }
1370
1371 pub fn metadata(&self) -> &PackMetadata {
1372 &self.metadata
1373 }
1374
1375 pub fn required_secrets(&self) -> &[greentic_types::SecretRequirement] {
1376 &self.metadata.secret_requirements
1377 }
1378
1379 pub fn missing_secrets(
1380 &self,
1381 tenant_ctx: &TypesTenantCtx,
1382 ) -> Vec<greentic_types::SecretRequirement> {
1383 let env = tenant_ctx.env.as_str().to_string();
1384 let tenant = tenant_ctx.tenant.as_str().to_string();
1385 let team = tenant_ctx.team.as_ref().map(|t| t.as_str().to_string());
1386 self.required_secrets()
1387 .iter()
1388 .filter(|req| {
1389 if let Some(scope) = &req.scope {
1391 if scope.env != env {
1392 return false;
1393 }
1394 if scope.tenant != tenant {
1395 return false;
1396 }
1397 if let Some(ref team_req) = scope.team
1398 && team.as_ref() != Some(team_req)
1399 {
1400 return false;
1401 }
1402 }
1403 read_secret_blocking(&self.secrets, req.key.as_str()).is_err()
1404 })
1405 .cloned()
1406 .collect()
1407 }
1408
1409 pub fn for_component_test(
1410 components: Vec<(String, PathBuf)>,
1411 flows: HashMap<String, FlowIR>,
1412 config: Arc<HostConfig>,
1413 ) -> Result<Self> {
1414 let engine = Engine::default();
1415 let mut component_map = HashMap::new();
1416 for (name, path) in components {
1417 if !path.exists() {
1418 bail!("component artifact missing: {}", path.display());
1419 }
1420 let wasm_bytes = std::fs::read(&path)?;
1421 let component = Component::from_binary(&engine, &wasm_bytes)
1422 .with_context(|| format!("failed to compile component {}", path.display()))?;
1423 component_map.insert(
1424 name.clone(),
1425 PackComponent {
1426 name,
1427 version: "0.0.0".into(),
1428 component,
1429 },
1430 );
1431 }
1432
1433 let mut flow_map = HashMap::new();
1434 let mut descriptors = Vec::new();
1435 for (id, ir) in flows {
1436 let flow_type = ir.flow_type.clone();
1437 let flow = flow_ir_to_flow(ir)?;
1438 flow_map.insert(id.clone(), flow);
1439 descriptors.push(FlowDescriptor {
1440 id: id.clone(),
1441 flow_type,
1442 profile: "test".into(),
1443 version: "0.0.0".into(),
1444 description: None,
1445 });
1446 }
1447 let flows_cache = PackFlows {
1448 descriptors: descriptors.clone(),
1449 flows: flow_map,
1450 metadata: PackMetadata::fallback(Path::new("component-test")),
1451 };
1452
1453 Ok(Self {
1454 path: PathBuf::new(),
1455 archive_path: None,
1456 config,
1457 engine,
1458 metadata: PackMetadata::fallback(Path::new("component-test")),
1459 manifest: None,
1460 legacy_manifest: None,
1461 mocks: None,
1462 flows: Some(flows_cache),
1463 components: component_map,
1464 http_client: Arc::clone(&HTTP_CLIENT),
1465 pre_cache: Mutex::new(HashMap::new()),
1466 session_store: None,
1467 state_store: None,
1468 wasi_policy: Arc::new(RunnerWasiPolicy::new()),
1469 provider_registry: RwLock::new(None),
1470 secrets: crate::secrets::default_manager(),
1471 oauth_config: None,
1472 })
1473 }
1474}
1475
1476struct PackFlows {
1477 descriptors: Vec<FlowDescriptor>,
1478 flows: HashMap<String, Flow>,
1479 metadata: PackMetadata,
1480}
1481
1482const RUNTIME_FLOW_EXTENSION_IDS: [&str; 3] = [
1483 "greentic.pack.runtime_flow",
1484 "greentic.pack.flow_runtime",
1485 "greentic.pack.runtime_flows",
1486];
1487
1488#[derive(Debug, Deserialize)]
1489struct RuntimeFlowBundle {
1490 flows: Vec<RuntimeFlow>,
1491}
1492
1493#[derive(Debug, Deserialize)]
1494struct RuntimeFlow {
1495 id: String,
1496 #[serde(alias = "flow_type")]
1497 kind: FlowKind,
1498 #[serde(default)]
1499 schema_version: Option<String>,
1500 #[serde(default)]
1501 start: Option<String>,
1502 #[serde(default)]
1503 entrypoints: BTreeMap<String, Value>,
1504 nodes: BTreeMap<String, RuntimeNode>,
1505 #[serde(default)]
1506 metadata: Option<FlowMetadata>,
1507}
1508
1509#[derive(Debug, Deserialize)]
1510struct RuntimeNode {
1511 #[serde(alias = "component")]
1512 component_id: String,
1513 #[serde(default, alias = "operation")]
1514 operation_name: Option<String>,
1515 #[serde(default, alias = "payload", alias = "input")]
1516 operation_payload: Value,
1517 #[serde(default)]
1518 routing: Option<Routing>,
1519 #[serde(default)]
1520 telemetry: Option<TelemetryHints>,
1521}
1522
1523fn deserialize_json_bytes(bytes: Vec<u8>) -> Result<Value> {
1524 if bytes.is_empty() {
1525 return Ok(Value::Null);
1526 }
1527 serde_json::from_slice(&bytes).or_else(|_| {
1528 String::from_utf8(bytes)
1529 .map(Value::String)
1530 .map_err(|err| anyhow!(err))
1531 })
1532}
1533
1534impl PackFlows {
1535 fn from_manifest(manifest: greentic_types::PackManifest) -> Self {
1536 if let Some(flows) = flows_from_runtime_extension(&manifest) {
1537 return flows;
1538 }
1539 let descriptors = manifest
1540 .flows
1541 .iter()
1542 .map(|entry| FlowDescriptor {
1543 id: entry.id.as_str().to_string(),
1544 flow_type: flow_kind_to_str(entry.kind).to_string(),
1545 profile: manifest.pack_id.as_str().to_string(),
1546 version: manifest.version.to_string(),
1547 description: None,
1548 })
1549 .collect();
1550 let mut flows = HashMap::new();
1551 for entry in &manifest.flows {
1552 flows.insert(entry.id.as_str().to_string(), entry.flow.clone());
1553 }
1554 Self {
1555 metadata: PackMetadata::from_manifest(&manifest),
1556 descriptors,
1557 flows,
1558 }
1559 }
1560}
1561
1562fn flows_from_runtime_extension(manifest: &greentic_types::PackManifest) -> Option<PackFlows> {
1563 let extensions = manifest.extensions.as_ref()?;
1564 let extension = extensions.iter().find_map(|(key, ext)| {
1565 if RUNTIME_FLOW_EXTENSION_IDS
1566 .iter()
1567 .any(|candidate| candidate == key)
1568 {
1569 Some(ext)
1570 } else {
1571 None
1572 }
1573 })?;
1574 let runtime_flows = match decode_runtime_flow_extension(extension) {
1575 Some(flows) if !flows.is_empty() => flows,
1576 _ => return None,
1577 };
1578
1579 let descriptors = runtime_flows
1580 .iter()
1581 .map(|flow| FlowDescriptor {
1582 id: flow.id.as_str().to_string(),
1583 flow_type: flow_kind_to_str(flow.kind).to_string(),
1584 profile: manifest.pack_id.as_str().to_string(),
1585 version: manifest.version.to_string(),
1586 description: None,
1587 })
1588 .collect::<Vec<_>>();
1589 let flows = runtime_flows
1590 .into_iter()
1591 .map(|flow| (flow.id.as_str().to_string(), flow))
1592 .collect();
1593
1594 Some(PackFlows {
1595 metadata: PackMetadata::from_manifest(manifest),
1596 descriptors,
1597 flows,
1598 })
1599}
1600
1601fn decode_runtime_flow_extension(extension: &ExtensionRef) -> Option<Vec<Flow>> {
1602 let value = match extension.inline.as_ref()? {
1603 ExtensionInline::Other(value) => value.clone(),
1604 _ => return None,
1605 };
1606
1607 if let Ok(bundle) = serde_json::from_value::<RuntimeFlowBundle>(value.clone()) {
1608 return Some(collect_runtime_flows(bundle.flows));
1609 }
1610
1611 if let Ok(flows) = serde_json::from_value::<Vec<RuntimeFlow>>(value.clone()) {
1612 return Some(collect_runtime_flows(flows));
1613 }
1614
1615 if let Ok(flows) = serde_json::from_value::<Vec<Flow>>(value) {
1616 return Some(flows);
1617 }
1618
1619 warn!(
1620 extension = %extension.kind,
1621 version = %extension.version,
1622 "runtime flow extension present but could not be decoded"
1623 );
1624 None
1625}
1626
1627fn collect_runtime_flows(flows: Vec<RuntimeFlow>) -> Vec<Flow> {
1628 flows
1629 .into_iter()
1630 .filter_map(|flow| match runtime_flow_to_flow(flow) {
1631 Ok(flow) => Some(flow),
1632 Err(err) => {
1633 warn!(error = %err, "failed to decode runtime flow");
1634 None
1635 }
1636 })
1637 .collect()
1638}
1639
1640fn runtime_flow_to_flow(runtime: RuntimeFlow) -> Result<Flow> {
1641 let flow_id = FlowId::from_str(&runtime.id)
1642 .with_context(|| format!("invalid flow id `{}`", runtime.id))?;
1643 let mut entrypoints = runtime.entrypoints;
1644 if entrypoints.is_empty()
1645 && let Some(start) = &runtime.start
1646 {
1647 entrypoints.insert("default".into(), Value::String(start.clone()));
1648 }
1649
1650 let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
1651 for (id, node) in runtime.nodes {
1652 let node_id = NodeId::from_str(&id).with_context(|| format!("invalid node id `{id}`"))?;
1653 let component_id = ComponentId::from_str(&node.component_id)
1654 .with_context(|| format!("invalid component id `{}`", node.component_id))?;
1655 let component = FlowComponentRef {
1656 id: component_id,
1657 pack_alias: None,
1658 operation: node.operation_name,
1659 };
1660 let routing = node.routing.unwrap_or(Routing::End);
1661 let telemetry = node.telemetry.unwrap_or_default();
1662 nodes.insert(
1663 node_id.clone(),
1664 Node {
1665 id: node_id,
1666 component,
1667 input: InputMapping {
1668 mapping: node.operation_payload,
1669 },
1670 output: OutputMapping {
1671 mapping: Value::Null,
1672 },
1673 routing,
1674 telemetry,
1675 },
1676 );
1677 }
1678
1679 Ok(Flow {
1680 schema_version: runtime.schema_version.unwrap_or_else(|| "1.0".to_string()),
1681 id: flow_id,
1682 kind: runtime.kind,
1683 entrypoints,
1684 nodes,
1685 metadata: runtime.metadata.unwrap_or_default(),
1686 })
1687}
1688
1689fn flow_kind_to_str(kind: greentic_types::FlowKind) -> &'static str {
1690 match kind {
1691 greentic_types::FlowKind::Messaging => "messaging",
1692 greentic_types::FlowKind::Event => "event",
1693 greentic_types::FlowKind::ComponentConfig => "component-config",
1694 greentic_types::FlowKind::Job => "job",
1695 greentic_types::FlowKind::Http => "http",
1696 }
1697}
1698
1699fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
1700 let mut file = archive
1701 .by_name(name)
1702 .with_context(|| format!("entry {name} missing from archive"))?;
1703 let mut buf = Vec::new();
1704 file.read_to_end(&mut buf)?;
1705 Ok(buf)
1706}
1707
1708fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
1709 for node in doc.nodes.values_mut() {
1710 let Some((component_ref, payload)) = node
1711 .raw
1712 .iter()
1713 .next()
1714 .map(|(key, value)| (key.clone(), value.clone()))
1715 else {
1716 continue;
1717 };
1718 if component_ref.starts_with("emit.") {
1719 node.operation = Some(component_ref);
1720 node.payload = payload;
1721 node.raw.clear();
1722 continue;
1723 }
1724 let (target_component, operation, input, config) =
1725 infer_component_exec(&payload, &component_ref);
1726 let mut payload_obj = serde_json::Map::new();
1727 payload_obj.insert("component".into(), Value::String(target_component));
1729 payload_obj.insert("operation".into(), Value::String(operation));
1730 payload_obj.insert("input".into(), input);
1731 if let Some(cfg) = config {
1732 payload_obj.insert("config".into(), cfg);
1733 }
1734 node.operation = Some("component.exec".to_string());
1735 node.payload = Value::Object(payload_obj);
1736 node.raw.clear();
1737 }
1738 doc
1739}
1740
1741fn infer_component_exec(
1742 payload: &Value,
1743 component_ref: &str,
1744) -> (String, String, Value, Option<Value>) {
1745 let default_op = if component_ref.starts_with("templating.") {
1746 "render"
1747 } else {
1748 "invoke"
1749 }
1750 .to_string();
1751
1752 if let Value::Object(map) = payload {
1753 let op = map
1754 .get("op")
1755 .or_else(|| map.get("operation"))
1756 .and_then(Value::as_str)
1757 .map(|s| s.to_string())
1758 .unwrap_or_else(|| default_op.clone());
1759
1760 let mut input = map.clone();
1761 let config = input.remove("config");
1762 let component = input
1763 .get("component")
1764 .or_else(|| input.get("component_ref"))
1765 .and_then(Value::as_str)
1766 .map(|s| s.to_string())
1767 .unwrap_or_else(|| component_ref.to_string());
1768 input.remove("component");
1769 input.remove("component_ref");
1770 input.remove("op");
1771 input.remove("operation");
1772 return (component, op, Value::Object(input), config);
1773 }
1774
1775 (component_ref.to_string(), default_op, payload.clone(), None)
1776}
1777
1778#[derive(Clone, Debug)]
1779struct ComponentSpec {
1780 id: String,
1781 version: String,
1782 legacy_path: Option<String>,
1783}
1784
1785#[derive(Clone, Debug)]
1786struct ComponentSourceInfo {
1787 digest: String,
1788 source: ComponentSourceRef,
1789 artifact: ComponentArtifactLocation,
1790}
1791
1792#[derive(Clone, Debug)]
1793enum ComponentArtifactLocation {
1794 Inline { wasm_path: String },
1795 Remote,
1796}
1797
1798fn component_specs(
1799 manifest: Option<&greentic_types::PackManifest>,
1800 legacy_manifest: Option<&legacy_pack::PackManifest>,
1801) -> Vec<ComponentSpec> {
1802 if let Some(manifest) = manifest {
1803 return manifest
1804 .components
1805 .iter()
1806 .map(|entry| ComponentSpec {
1807 id: entry.id.as_str().to_string(),
1808 version: entry.version.to_string(),
1809 legacy_path: None,
1810 })
1811 .collect();
1812 }
1813 if let Some(legacy_manifest) = legacy_manifest {
1814 return legacy_manifest
1815 .components
1816 .iter()
1817 .map(|entry| ComponentSpec {
1818 id: entry.name.clone(),
1819 version: entry.version.to_string(),
1820 legacy_path: Some(entry.file_wasm.clone()),
1821 })
1822 .collect();
1823 }
1824 Vec::new()
1825}
1826
1827fn component_sources_table(
1828 manifest: &greentic_types::PackManifest,
1829) -> Result<Option<HashMap<String, ComponentSourceInfo>>> {
1830 let Some(sources) = manifest.get_component_sources_v1()? else {
1831 return Ok(None);
1832 };
1833 let mut table = HashMap::new();
1834 for entry in sources.components {
1835 let artifact = match entry.artifact {
1836 ArtifactLocationV1::Inline { wasm_path, .. } => {
1837 ComponentArtifactLocation::Inline { wasm_path }
1838 }
1839 ArtifactLocationV1::Remote => ComponentArtifactLocation::Remote,
1840 };
1841 let info = ComponentSourceInfo {
1842 digest: entry.resolved.digest,
1843 source: entry.source,
1844 artifact,
1845 };
1846 if let Some(component_id) = entry.component_id.as_ref() {
1847 table.insert(component_id.as_str().to_string(), info.clone());
1848 }
1849 table.insert(entry.name, info);
1850 }
1851 Ok(Some(table))
1852}
1853
1854fn component_path_for_spec(root: &Path, spec: &ComponentSpec) -> PathBuf {
1855 if let Some(path) = &spec.legacy_path {
1856 return root.join(path);
1857 }
1858 root.join("components").join(format!("{}.wasm", spec.id))
1859}
1860
1861fn normalize_digest(digest: &str) -> String {
1862 if digest.starts_with("sha256:") || digest.starts_with("blake3:") {
1863 digest.to_string()
1864 } else {
1865 format!("sha256:{digest}")
1866 }
1867}
1868
1869fn compute_digest_for(bytes: &[u8], digest: &str) -> Result<String> {
1870 if digest.starts_with("blake3:") {
1871 let hash = blake3::hash(bytes);
1872 return Ok(format!("blake3:{}", hash.to_hex()));
1873 }
1874 let mut hasher = sha2::Sha256::new();
1875 hasher.update(bytes);
1876 Ok(format!("sha256:{:x}", hasher.finalize()))
1877}
1878
1879fn verify_component_digest(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
1880 let normalized_expected = normalize_digest(expected);
1881 let actual = compute_digest_for(bytes, &normalized_expected)?;
1882 if normalize_digest(&actual) != normalized_expected {
1883 bail!(
1884 "component {component_id} digest mismatch: expected {normalized_expected}, got {actual}"
1885 );
1886 }
1887 Ok(())
1888}
1889
1890fn dist_options_from(component_resolution: &ComponentResolution) -> DistOptions {
1891 let mut opts = DistOptions {
1892 allow_tags: true,
1893 ..DistOptions::default()
1894 };
1895 if let Some(cache_dir) = component_resolution.dist_cache_dir.clone() {
1896 opts.cache_dir = cache_dir;
1897 }
1898 if component_resolution.dist_offline {
1899 opts.offline = true;
1900 }
1901 opts
1902}
1903
1904#[allow(clippy::too_many_arguments)]
1905async fn load_components_from_sources(
1906 engine: &Engine,
1907 component_sources: &HashMap<String, ComponentSourceInfo>,
1908 component_resolution: &ComponentResolution,
1909 specs: &[ComponentSpec],
1910 missing: &mut HashSet<String>,
1911 into: &mut HashMap<String, PackComponent>,
1912 materialized_root: Option<&Path>,
1913 archive_hint: Option<&Path>,
1914) -> Result<()> {
1915 let mut archive = if let Some(path) = archive_hint {
1916 Some(
1917 ZipArchive::new(File::open(path)?)
1918 .with_context(|| format!("{} is not a valid gtpack", path.display()))?,
1919 )
1920 } else {
1921 None
1922 };
1923 let mut dist_client: Option<DistClient> = None;
1924
1925 for spec in specs {
1926 if !missing.contains(&spec.id) {
1927 continue;
1928 }
1929 let Some(source) = component_sources.get(&spec.id) else {
1930 continue;
1931 };
1932
1933 let bytes = match &source.artifact {
1934 ComponentArtifactLocation::Inline { wasm_path } => {
1935 if let Some(root) = materialized_root {
1936 let path = root.join(wasm_path);
1937 if path.exists() {
1938 std::fs::read(&path).with_context(|| {
1939 format!(
1940 "failed to read inline component {} from {}",
1941 spec.id,
1942 path.display()
1943 )
1944 })?
1945 } else if archive.is_none() {
1946 bail!("inline component {} missing at {}", spec.id, path.display());
1947 } else {
1948 read_entry(
1949 archive.as_mut().expect("archive present when needed"),
1950 wasm_path,
1951 )
1952 .with_context(|| {
1953 format!(
1954 "inline component {} missing at {} in pack archive",
1955 spec.id, wasm_path
1956 )
1957 })?
1958 }
1959 } else if let Some(archive) = archive.as_mut() {
1960 read_entry(archive, wasm_path).with_context(|| {
1961 format!(
1962 "inline component {} missing at {} in pack archive",
1963 spec.id, wasm_path
1964 )
1965 })?
1966 } else {
1967 bail!(
1968 "inline component {} missing and no pack source available",
1969 spec.id
1970 );
1971 }
1972 }
1973 ComponentArtifactLocation::Remote => {
1974 let client = dist_client.get_or_insert_with(|| {
1975 DistClient::new(dist_options_from(component_resolution))
1976 });
1977 let reference = source.source.to_string();
1978 let cache_path = if component_resolution.dist_offline {
1979 client
1980 .fetch_digest(&source.digest)
1981 .await
1982 .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?
1983 } else {
1984 let resolved = client
1985 .resolve_ref(&reference)
1986 .await
1987 .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?;
1988 let expected = normalize_digest(&source.digest);
1989 let actual = normalize_digest(&resolved.digest);
1990 if expected != actual {
1991 bail!(
1992 "component {} digest mismatch after fetch: expected {}, got {}",
1993 spec.id,
1994 expected,
1995 actual
1996 );
1997 }
1998 resolved.cache_path.ok_or_else(|| {
1999 anyhow!(
2000 "component {} resolved from {} but cache path is missing",
2001 spec.id,
2002 reference
2003 )
2004 })?
2005 };
2006 std::fs::read(&cache_path).with_context(|| {
2007 format!(
2008 "failed to read cached component {} from {}",
2009 spec.id,
2010 cache_path.display()
2011 )
2012 })?
2013 }
2014 };
2015
2016 verify_component_digest(&spec.id, &source.digest, &bytes)?;
2017 let component = Component::from_binary(engine, &bytes)
2018 .with_context(|| format!("failed to compile component {}", spec.id))?;
2019 into.insert(
2020 spec.id.clone(),
2021 PackComponent {
2022 name: spec.id.clone(),
2023 version: spec.version.clone(),
2024 component,
2025 },
2026 );
2027 missing.remove(&spec.id);
2028 }
2029
2030 Ok(())
2031}
2032
2033fn dist_error_for_component(err: DistError, component_id: &str, reference: &str) -> anyhow::Error {
2034 match err {
2035 DistError::CacheMiss { reference: missing } => anyhow!(
2036 "remote component {} is not cached for {}. Run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
2037 component_id,
2038 missing,
2039 reference
2040 ),
2041 DistError::Offline { reference: blocked } => anyhow!(
2042 "offline mode blocked fetching component {} from {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
2043 component_id,
2044 blocked,
2045 reference
2046 ),
2047 DistError::AuthRequired { target } => anyhow!(
2048 "component {} requires authenticated source {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
2049 component_id,
2050 target,
2051 reference
2052 ),
2053 other => anyhow!(
2054 "failed to resolve component {} from {}: {}",
2055 component_id,
2056 reference,
2057 other
2058 ),
2059 }
2060}
2061
2062fn load_components_from_overrides(
2063 engine: &Engine,
2064 overrides: &HashMap<String, PathBuf>,
2065 specs: &[ComponentSpec],
2066 missing: &mut HashSet<String>,
2067 into: &mut HashMap<String, PackComponent>,
2068) -> Result<()> {
2069 for spec in specs {
2070 if !missing.contains(&spec.id) {
2071 continue;
2072 }
2073 let Some(path) = overrides.get(&spec.id) else {
2074 continue;
2075 };
2076 let bytes = std::fs::read(path)
2077 .with_context(|| format!("failed to read override component {}", path.display()))?;
2078 let component = Component::from_binary(engine, &bytes).with_context(|| {
2079 format!(
2080 "failed to compile component {} from override {}",
2081 spec.id,
2082 path.display()
2083 )
2084 })?;
2085 into.insert(
2086 spec.id.clone(),
2087 PackComponent {
2088 name: spec.id.clone(),
2089 version: spec.version.clone(),
2090 component,
2091 },
2092 );
2093 missing.remove(&spec.id);
2094 }
2095 Ok(())
2096}
2097
2098fn load_components_from_dir(
2099 engine: &Engine,
2100 root: &Path,
2101 specs: &[ComponentSpec],
2102 missing: &mut HashSet<String>,
2103 into: &mut HashMap<String, PackComponent>,
2104) -> Result<()> {
2105 for spec in specs {
2106 if !missing.contains(&spec.id) {
2107 continue;
2108 }
2109 let path = component_path_for_spec(root, spec);
2110 if !path.exists() {
2111 tracing::debug!(component = %spec.id, path = %path.display(), "materialized component missing; will try other sources");
2112 continue;
2113 }
2114 let bytes = std::fs::read(&path)
2115 .with_context(|| format!("failed to read component {}", path.display()))?;
2116 let component = Component::from_binary(engine, &bytes).with_context(|| {
2117 format!(
2118 "failed to compile component {} from {}",
2119 spec.id,
2120 path.display()
2121 )
2122 })?;
2123 into.insert(
2124 spec.id.clone(),
2125 PackComponent {
2126 name: spec.id.clone(),
2127 version: spec.version.clone(),
2128 component,
2129 },
2130 );
2131 missing.remove(&spec.id);
2132 }
2133 Ok(())
2134}
2135
2136fn load_components_from_archive(
2137 engine: &Engine,
2138 path: &Path,
2139 specs: &[ComponentSpec],
2140 missing: &mut HashSet<String>,
2141 into: &mut HashMap<String, PackComponent>,
2142) -> Result<()> {
2143 let mut archive = ZipArchive::new(File::open(path)?)
2144 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
2145 for spec in specs {
2146 if !missing.contains(&spec.id) {
2147 continue;
2148 }
2149 let file_name = spec
2150 .legacy_path
2151 .clone()
2152 .unwrap_or_else(|| format!("components/{}.wasm", spec.id));
2153 let bytes = match read_entry(&mut archive, &file_name) {
2154 Ok(bytes) => bytes,
2155 Err(err) => {
2156 warn!(component = %spec.id, pack = %path.display(), error = %err, "component entry missing in pack archive");
2157 continue;
2158 }
2159 };
2160 let component = Component::from_binary(engine, &bytes)
2161 .with_context(|| format!("failed to compile component {}", spec.id))?;
2162 into.insert(
2163 spec.id.clone(),
2164 PackComponent {
2165 name: spec.id.clone(),
2166 version: spec.version.clone(),
2167 component,
2168 },
2169 );
2170 missing.remove(&spec.id);
2171 }
2172 Ok(())
2173}
2174
2175#[cfg(test)]
2176mod tests {
2177 use super::*;
2178 use greentic_flow::model::{FlowDoc, NodeDoc};
2179 use serde_json::json;
2180 use std::collections::BTreeMap;
2181
2182 #[test]
2183 fn normalizes_raw_component_to_component_exec() {
2184 let mut nodes = BTreeMap::new();
2185 let mut raw = BTreeMap::new();
2186 raw.insert(
2187 "templating.handlebars".into(),
2188 json!({ "template": "Hi {{name}}" }),
2189 );
2190 nodes.insert(
2191 "start".into(),
2192 NodeDoc {
2193 raw,
2194 routing: json!([{"out": true}]),
2195 ..Default::default()
2196 },
2197 );
2198 let doc = FlowDoc {
2199 id: "welcome".into(),
2200 title: None,
2201 description: None,
2202 flow_type: "messaging".into(),
2203 start: Some("start".into()),
2204 parameters: json!({}),
2205 tags: Vec::new(),
2206 schema_version: None,
2207 entrypoints: BTreeMap::new(),
2208 nodes,
2209 };
2210
2211 let normalized = normalize_flow_doc(doc);
2212 let node = normalized.nodes.get("start").expect("node exists");
2213 assert_eq!(node.operation.as_deref(), Some("component.exec"));
2214 assert!(node.raw.is_empty());
2215 let payload = node.payload.as_object().expect("payload object");
2216 assert_eq!(
2217 payload.get("component"),
2218 Some(&Value::String("templating.handlebars".into()))
2219 );
2220 assert_eq!(
2221 payload.get("operation"),
2222 Some(&Value::String("render".into()))
2223 );
2224 let input = payload.get("input").unwrap();
2225 assert_eq!(input, &json!({ "template": "Hi {{name}}" }));
2226 }
2227}
2228
2229#[derive(Clone, Debug, Default, Serialize, Deserialize)]
2230pub struct PackMetadata {
2231 pub pack_id: String,
2232 pub version: String,
2233 #[serde(default)]
2234 pub entry_flows: Vec<String>,
2235 #[serde(default)]
2236 pub secret_requirements: Vec<greentic_types::SecretRequirement>,
2237}
2238
2239impl PackMetadata {
2240 fn from_wasm(bytes: &[u8]) -> Option<Self> {
2241 let parser = Parser::new(0);
2242 for payload in parser.parse_all(bytes) {
2243 let payload = payload.ok()?;
2244 match payload {
2245 Payload::CustomSection(section) => {
2246 if section.name() == "greentic.manifest"
2247 && let Ok(meta) = Self::from_bytes(section.data())
2248 {
2249 return Some(meta);
2250 }
2251 }
2252 Payload::DataSection(reader) => {
2253 for segment in reader.into_iter().flatten() {
2254 if let Ok(meta) = Self::from_bytes(segment.data) {
2255 return Some(meta);
2256 }
2257 }
2258 }
2259 _ => {}
2260 }
2261 }
2262 None
2263 }
2264
2265 fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
2266 #[derive(Deserialize)]
2267 struct RawManifest {
2268 pack_id: String,
2269 version: String,
2270 #[serde(default)]
2271 entry_flows: Vec<String>,
2272 #[serde(default)]
2273 flows: Vec<RawFlow>,
2274 #[serde(default)]
2275 secret_requirements: Vec<greentic_types::SecretRequirement>,
2276 }
2277
2278 #[derive(Deserialize)]
2279 struct RawFlow {
2280 id: String,
2281 }
2282
2283 let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
2284 let mut entry_flows = if manifest.entry_flows.is_empty() {
2285 manifest.flows.iter().map(|f| f.id.clone()).collect()
2286 } else {
2287 manifest.entry_flows.clone()
2288 };
2289 entry_flows.retain(|id| !id.is_empty());
2290 Ok(Self {
2291 pack_id: manifest.pack_id,
2292 version: manifest.version,
2293 entry_flows,
2294 secret_requirements: manifest.secret_requirements,
2295 })
2296 }
2297
2298 pub fn fallback(path: &Path) -> Self {
2299 let pack_id = path
2300 .file_stem()
2301 .map(|s| s.to_string_lossy().into_owned())
2302 .unwrap_or_else(|| "unknown-pack".to_string());
2303 Self {
2304 pack_id,
2305 version: "0.0.0".to_string(),
2306 entry_flows: Vec::new(),
2307 secret_requirements: Vec::new(),
2308 }
2309 }
2310
2311 pub fn from_manifest(manifest: &greentic_types::PackManifest) -> Self {
2312 let entry_flows = manifest
2313 .flows
2314 .iter()
2315 .map(|flow| flow.id.as_str().to_string())
2316 .collect::<Vec<_>>();
2317 Self {
2318 pack_id: manifest.pack_id.as_str().to_string(),
2319 version: manifest.version.to_string(),
2320 entry_flows,
2321 secret_requirements: manifest.secret_requirements.clone(),
2322 }
2323 }
2324}