1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::component_api::{
10 self, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
11};
12use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
13use crate::provider::{ProviderBinding, ProviderRegistry};
14use crate::provider_core::SchemaCorePre as ProviderComponentPre;
15use crate::provider_core_only;
16use crate::runtime_wasmtime::{Component, Engine, InstancePre, Linker, ResourceTable};
17use anyhow::{Context, Result, anyhow, bail};
18use greentic_distributor_client::dist::{DistClient, DistError, DistOptions};
19use greentic_interfaces_wasmtime::host_helpers::v1::{
20 self as host_v1, HostFns, add_all_v1_to_linker,
21 runner_host_http::RunnerHostHttp,
22 runner_host_kv::RunnerHostKv,
23 secrets_store::{SecretsError, SecretsStoreHost},
24 state_store::{
25 OpAck as StateOpAck, StateKey as HostStateKey, StateStoreError as StateError,
26 StateStoreHost, TenantCtx as StateTenantCtx,
27 },
28 telemetry_logger::{
29 OpAck as TelemetryAck, SpanContext as TelemetrySpanContext,
30 TelemetryLoggerError as TelemetryError, TelemetryLoggerHost,
31 TenantCtx as TelemetryTenantCtx,
32 },
33};
34use greentic_interfaces_wasmtime::http_client_client_v1_0::greentic::interfaces_types::types::Impersonation as ImpersonationV1_0;
35use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::interfaces_types::types::Impersonation as ImpersonationV1_1;
36use greentic_pack::builder as legacy_pack;
37use greentic_types::flow::FlowHasher;
38use greentic_types::{
39 ArtifactLocationV1, ComponentId, ComponentSourceRef, EXT_COMPONENT_SOURCES_V1, EnvId,
40 ExtensionRef, Flow, FlowComponentRef, FlowId, FlowKind, FlowMetadata, InputMapping, Node,
41 NodeId, OutputMapping, Routing, StateKey as StoreStateKey, TeamId, TelemetryHints,
42 TenantCtx as TypesTenantCtx, TenantId, UserId, decode_pack_manifest,
43 pack_manifest::ExtensionInline,
44};
45use host_v1::http_client::{
46 HttpClientError, HttpClientErrorV1_1, HttpClientHost, HttpClientHostV1_1,
47 Request as HttpRequest, RequestOptionsV1_1 as HttpRequestOptionsV1_1,
48 RequestV1_1 as HttpRequestV1_1, Response as HttpResponse, ResponseV1_1 as HttpResponseV1_1,
49 TenantCtx as HttpTenantCtx, TenantCtxV1_1 as HttpTenantCtxV1_1,
50};
51use indexmap::IndexMap;
52use once_cell::sync::Lazy;
53use parking_lot::{Mutex, RwLock};
54use reqwest::blocking::Client as BlockingClient;
55use runner_core::normalize_under_root;
56use serde::{Deserialize, Serialize};
57use serde_cbor;
58use serde_json::{self, Value};
59use sha2::Digest;
60use tokio::fs;
61use wasmparser::{Parser, Payload};
62use wasmtime::StoreContextMut;
63use zip::ZipArchive;
64
65use crate::runner::engine::{FlowContext, FlowEngine, FlowStatus};
66use crate::runner::flow_adapter::{FlowIR, flow_doc_to_ir, flow_ir_to_flow};
67use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
68
69use crate::config::HostConfig;
70use crate::secrets::{DynSecretsManager, read_secret_blocking};
71use crate::storage::state::STATE_PREFIX;
72use crate::storage::{DynSessionStore, DynStateStore};
73use crate::verify;
74use crate::wasi::RunnerWasiPolicy;
75use tracing::warn;
76use wasmtime_wasi::p2::add_to_linker_sync as add_wasi_to_linker;
77use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
78
79use greentic_flow::model::FlowDoc;
80
/// A loaded pack plus the host-side resources handed to its components.
///
/// Built by `PackRuntime::load`, which fills every field below.
#[allow(dead_code)]
pub struct PackRuntime {
    /// Normalized path the pack was loaded from (archive, directory, or single `.wasm` file).
    path: PathBuf,
    /// Archive associated with the pack, when one was supplied or inferred during load.
    archive_path: Option<PathBuf>,
    /// Shared host configuration.
    config: Arc<HostConfig>,
    /// Engine used to compile the pack's components.
    engine: Engine,
    /// Pack metadata taken from the manifest, from the wasm binary, or from a path-based fallback.
    metadata: PackMetadata,
    /// Manifest in the current format, when it decoded successfully.
    manifest: Option<greentic_types::PackManifest>,
    /// Manifest in the legacy format, used when the current decoder rejected `manifest.cbor`.
    legacy_manifest: Option<Box<legacy_pack::PackManifest>>,
    /// Optional mock layer passed through from the caller of `load`.
    mocks: Option<Arc<MockLayer>>,
    /// Flows parsed from the manifest; `None` when no manifest could be parsed.
    flows: Option<PackFlows>,
    /// Compiled components keyed by string (the file stem for a single-component pack).
    components: HashMap<String, PackComponent>,
    /// Shared blocking HTTP client (a clone of the process-wide `HTTP_CLIENT`).
    http_client: Arc<BlockingClient>,
    /// Starts empty at load time.
    // NOTE(review): presumably caches pre-instantiated components by name — confirm against users.
    pre_cache: Mutex<HashMap<String, InstancePre<ComponentState>>>,
    /// Optional session store supplied by the caller of `load`.
    session_store: Option<DynSessionStore>,
    /// Optional state store supplied by the caller of `load`.
    state_store: Option<DynStateStore>,
    /// WASI policy supplied by the caller of `load`.
    wasi_policy: Arc<RunnerWasiPolicy>,
    /// Provider registry; `None` right after load.
    provider_registry: RwLock<Option<ProviderRegistry>>,
    /// Secrets manager supplied by the caller of `load`.
    secrets: DynSecretsManager,
    /// Optional OAuth broker configuration supplied by the caller of `load`.
    oauth_config: Option<OAuthBrokerConfig>,
}
104
/// A compiled component belonging to a pack, with its identifying name and version.
struct PackComponent {
    /// Component name (the key it is stored under in `PackRuntime::components`).
    #[allow(dead_code)]
    name: String,
    /// Component version string.
    #[allow(dead_code)]
    version: String,
    /// The compiled component itself.
    component: Component,
}
112
/// Caller-supplied hints describing where a pack's components may be found.
#[derive(Debug, Default, Clone)]
pub struct ComponentResolution {
    /// Directory holding an unpacked pack; when set, `manifest.cbor` and components are looked up
    /// under it before the archive is consulted.
    pub materialized_root: Option<PathBuf>,
    /// Explicit per-component file locations, consulted before any other source.
    pub overrides: HashMap<String, PathBuf>,
    // NOTE(review): presumably disables network fetches by the distributor client — confirm.
    pub dist_offline: bool,
    // NOTE(review): presumably the distributor client's cache directory — confirm.
    pub dist_cache_dir: Option<PathBuf>,
}
124
125fn build_blocking_client() -> BlockingClient {
126 std::thread::spawn(|| {
127 BlockingClient::builder()
128 .no_proxy()
129 .build()
130 .expect("blocking client")
131 })
132 .join()
133 .expect("client build thread panicked")
134}
135
136fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
137 let (root, candidate) = if path.is_absolute() {
138 let parent = path
139 .parent()
140 .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
141 let root = parent
142 .canonicalize()
143 .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
144 let file = path
145 .file_name()
146 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
147 (root, PathBuf::from(file))
148 } else {
149 let cwd = std::env::current_dir().context("failed to resolve current directory")?;
150 let base = if let Some(parent) = path.parent() {
151 cwd.join(parent)
152 } else {
153 cwd
154 };
155 let root = base
156 .canonicalize()
157 .with_context(|| format!("failed to canonicalize {}", base.display()))?;
158 let file = path
159 .file_name()
160 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
161 (root, PathBuf::from(file))
162 };
163 let safe = normalize_under_root(&root, &candidate)?;
164 Ok((root, safe))
165}
166
/// Process-wide blocking HTTP client, built lazily on first use via `build_blocking_client`.
static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
168
/// Serializable summary of a flow exposed by a pack.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowDescriptor {
    /// Flow identifier.
    pub id: String,
    /// Flow kind; serialized under the key `type`.
    #[serde(rename = "type")]
    pub flow_type: String,
    /// Profile the flow belongs to (the legacy loaders store the pack id here).
    pub profile: String,
    /// Version string (the legacy loaders store the pack version here).
    pub version: String,
    /// Optional human-readable description; defaults to `None` when absent on input.
    #[serde(default)]
    pub description: Option<String>,
}
179
/// Host-side state backing the interfaces exposed to pack components (HTTP, secrets, state
/// store, telemetry, and the runner host shims).
pub struct HostState {
    /// Shared host configuration (tenant, HTTP switch, secrets policy).
    config: Arc<HostConfig>,
    /// Blocking HTTP client used for outbound requests.
    http_client: Arc<BlockingClient>,
    /// Environment used when a call carries no tenant context; read from `GREENTIC_ENV`
    /// with `"local"` as the fallback.
    default_env: String,
    /// Optional session store; not read by the code in this part of the file.
    #[allow(dead_code)]
    session_store: Option<DynSessionStore>,
    /// Optional state store backing the state-store host interface.
    state_store: Option<DynStateStore>,
    /// Optional mock layer consulted before real secrets, HTTP, and telemetry handling.
    mocks: Option<Arc<MockLayer>>,
    /// Secrets manager used when no mock provides the value.
    secrets: DynSecretsManager,
    /// Optional OAuth broker configuration.
    oauth_config: Option<OAuthBrokerConfig>,
    /// OAuth broker host state, default-constructed in `HostState::new`.
    oauth_host: OAuthBrokerHost,
}
192
193impl HostState {
194 #[allow(clippy::default_constructed_unit_structs)]
195 pub fn new(
196 config: Arc<HostConfig>,
197 http_client: Arc<BlockingClient>,
198 mocks: Option<Arc<MockLayer>>,
199 session_store: Option<DynSessionStore>,
200 state_store: Option<DynStateStore>,
201 secrets: DynSecretsManager,
202 oauth_config: Option<OAuthBrokerConfig>,
203 ) -> Result<Self> {
204 let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
205 Ok(Self {
206 config,
207 http_client,
208 default_env,
209 session_store,
210 state_store,
211 mocks,
212 secrets,
213 oauth_config,
214 oauth_host: OAuthBrokerHost::default(),
215 })
216 }
217
218 pub fn get_secret(&self, key: &str) -> Result<String> {
219 if provider_core_only::is_enabled() {
220 bail!(provider_core_only::blocked_message("secrets"))
221 }
222 if !self.config.secrets_policy.is_allowed(key) {
223 bail!("secret {key} is not permitted by bindings policy");
224 }
225 if let Some(mock) = &self.mocks
226 && let Some(value) = mock.secrets_lookup(key)
227 {
228 return Ok(value);
229 }
230 let bytes = read_secret_blocking(&self.secrets, key)
231 .context("failed to read secret from manager")?;
232 let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
233 Ok(value)
234 }
235
236 fn tenant_ctx_from_v1(&self, ctx: Option<StateTenantCtx>) -> Result<TypesTenantCtx> {
237 let tenant_raw = ctx
238 .as_ref()
239 .map(|ctx| ctx.tenant.clone())
240 .unwrap_or_else(|| self.config.tenant.clone());
241 let env_raw = ctx
242 .as_ref()
243 .map(|ctx| ctx.env.clone())
244 .unwrap_or_else(|| self.default_env.clone());
245 let tenant_id = TenantId::from_str(&tenant_raw)
246 .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
247 let env_id = EnvId::from_str(&env_raw)
248 .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
249 let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
250 if let Some(ctx) = ctx {
251 if let Some(team) = ctx.team.or(ctx.team_id) {
252 let team_id =
253 TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
254 tenant_ctx = tenant_ctx.with_team(Some(team_id));
255 }
256 if let Some(user) = ctx.user.or(ctx.user_id) {
257 let user_id =
258 UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
259 tenant_ctx = tenant_ctx.with_user(Some(user_id));
260 }
261 if let Some(flow) = ctx.flow_id {
262 tenant_ctx = tenant_ctx.with_flow(flow);
263 }
264 if let Some(node) = ctx.node_id {
265 tenant_ctx = tenant_ctx.with_node(node);
266 }
267 if let Some(provider) = ctx.provider_id {
268 tenant_ctx = tenant_ctx.with_provider(provider);
269 }
270 if let Some(session) = ctx.session_id {
271 tenant_ctx = tenant_ctx.with_session(session);
272 }
273 tenant_ctx.trace_id = ctx.trace_id;
274 }
275 Ok(tenant_ctx)
276 }
277
278 fn send_http_request(
279 &mut self,
280 req: HttpRequest,
281 opts: Option<HttpRequestOptionsV1_1>,
282 _ctx: Option<HttpTenantCtx>,
283 ) -> Result<HttpResponse, HttpClientError> {
284 if !self.config.http_enabled {
285 return Err(HttpClientError {
286 code: "denied".into(),
287 message: "http client disabled by policy".into(),
288 });
289 }
290
291 let mut mock_state = None;
292 let raw_body = req.body.clone();
293 if let Some(mock) = &self.mocks
294 && let Ok(meta) = HttpMockRequest::new(&req.method, &req.url, raw_body.as_deref())
295 {
296 match mock.http_begin(&meta) {
297 HttpDecision::Mock(response) => {
298 let headers = response
299 .headers
300 .iter()
301 .map(|(k, v)| (k.clone(), v.clone()))
302 .collect();
303 return Ok(HttpResponse {
304 status: response.status,
305 headers,
306 body: response.body.clone().map(|b| b.into_bytes()),
307 });
308 }
309 HttpDecision::Deny(reason) => {
310 return Err(HttpClientError {
311 code: "denied".into(),
312 message: reason,
313 });
314 }
315 HttpDecision::Passthrough { record } => {
316 mock_state = Some((meta, record));
317 }
318 }
319 }
320
321 let method = req.method.parse().unwrap_or(reqwest::Method::GET);
322 let mut builder = self.http_client.request(method, &req.url);
323 for (key, value) in req.headers {
324 if let Ok(header) = reqwest::header::HeaderName::from_bytes(key.as_bytes())
325 && let Ok(header_value) = reqwest::header::HeaderValue::from_str(&value)
326 {
327 builder = builder.header(header, header_value);
328 }
329 }
330
331 if let Some(body) = raw_body.clone() {
332 builder = builder.body(body);
333 }
334
335 if let Some(opts) = opts {
336 if let Some(timeout_ms) = opts.timeout_ms {
337 builder = builder.timeout(Duration::from_millis(timeout_ms as u64));
338 }
339 if opts.allow_insecure == Some(true) {
340 warn!(url = %req.url, "allow-insecure not supported; using default TLS validation");
341 }
342 if let Some(follow_redirects) = opts.follow_redirects
343 && !follow_redirects
344 {
345 warn!(url = %req.url, "follow-redirects=false not supported; using default client behaviour");
346 }
347 }
348
349 let response = match builder.send() {
350 Ok(resp) => resp,
351 Err(err) => {
352 warn!(url = %req.url, error = %err, "http client request failed");
353 return Err(HttpClientError {
354 code: "unavailable".into(),
355 message: err.to_string(),
356 });
357 }
358 };
359
360 let status = response.status().as_u16();
361 let headers_vec = response
362 .headers()
363 .iter()
364 .map(|(k, v)| {
365 (
366 k.as_str().to_string(),
367 v.to_str().unwrap_or_default().to_string(),
368 )
369 })
370 .collect::<Vec<_>>();
371 let body_bytes = response.bytes().ok().map(|b| b.to_vec());
372
373 if let Some((meta, true)) = mock_state.take()
374 && let Some(mock) = &self.mocks
375 {
376 let recorded = HttpMockResponse::new(
377 status,
378 headers_vec.clone().into_iter().collect(),
379 body_bytes
380 .as_ref()
381 .map(|b| String::from_utf8_lossy(b).into_owned()),
382 );
383 mock.http_record(&meta, &recorded);
384 }
385
386 Ok(HttpResponse {
387 status,
388 headers: headers_vec,
389 body: body_bytes,
390 })
391 }
392}
393
394impl SecretsStoreHost for HostState {
395 fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsError> {
396 if provider_core_only::is_enabled() {
397 warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
398 return Err(SecretsError::Denied);
399 }
400 if !self.config.secrets_policy.is_allowed(&key) {
401 return Err(SecretsError::Denied);
402 }
403 if let Some(mock) = &self.mocks
404 && let Some(value) = mock.secrets_lookup(&key)
405 {
406 return Ok(Some(value.into_bytes()));
407 }
408 match read_secret_blocking(&self.secrets, &key) {
409 Ok(bytes) => Ok(Some(bytes)),
410 Err(err) => {
411 warn!(secret = %key, error = %err, "secret lookup failed");
412 Err(SecretsError::NotFound)
413 }
414 }
415 }
416}
417
418impl HttpClientHost for HostState {
419 fn send(
420 &mut self,
421 req: HttpRequest,
422 ctx: Option<HttpTenantCtx>,
423 ) -> Result<HttpResponse, HttpClientError> {
424 self.send_http_request(req, None, ctx)
425 }
426}
427
428impl HttpClientHostV1_1 for HostState {
429 fn send(
430 &mut self,
431 req: HttpRequestV1_1,
432 opts: Option<HttpRequestOptionsV1_1>,
433 ctx: Option<HttpTenantCtxV1_1>,
434 ) -> Result<HttpResponseV1_1, HttpClientErrorV1_1> {
435 let legacy_req = HttpRequest {
436 method: req.method,
437 url: req.url,
438 headers: req.headers,
439 body: req.body,
440 };
441 let legacy_ctx = ctx.map(|ctx| HttpTenantCtx {
442 env: ctx.env,
443 tenant: ctx.tenant,
444 tenant_id: ctx.tenant_id,
445 team: ctx.team,
446 team_id: ctx.team_id,
447 user: ctx.user,
448 user_id: ctx.user_id,
449 trace_id: ctx.trace_id,
450 correlation_id: ctx.correlation_id,
451 attributes: ctx.attributes,
452 session_id: ctx.session_id,
453 flow_id: ctx.flow_id,
454 node_id: ctx.node_id,
455 provider_id: ctx.provider_id,
456 deadline_ms: ctx.deadline_ms,
457 attempt: ctx.attempt,
458 idempotency_key: ctx.idempotency_key,
459 impersonation: ctx
460 .impersonation
461 .map(|ImpersonationV1_1 { actor_id, reason }| ImpersonationV1_0 {
462 actor_id,
463 reason,
464 }),
465 });
466
467 self.send_http_request(legacy_req, opts, legacy_ctx)
468 .map(|resp| HttpResponseV1_1 {
469 status: resp.status,
470 headers: resp.headers,
471 body: resp.body,
472 })
473 .map_err(|err| HttpClientErrorV1_1 {
474 code: err.code,
475 message: err.message,
476 })
477 }
478}
479
480impl StateStoreHost for HostState {
481 fn read(
482 &mut self,
483 key: HostStateKey,
484 ctx: Option<StateTenantCtx>,
485 ) -> Result<Vec<u8>, StateError> {
486 let store = match self.state_store.as_ref() {
487 Some(store) => store.clone(),
488 None => {
489 return Err(StateError {
490 code: "unavailable".into(),
491 message: "state store not configured".into(),
492 });
493 }
494 };
495 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
496 Ok(ctx) => ctx,
497 Err(err) => {
498 return Err(StateError {
499 code: "invalid-ctx".into(),
500 message: err.to_string(),
501 });
502 }
503 };
504 let key = StoreStateKey::from(key);
505 match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
506 Ok(Some(value)) => Ok(serde_json::to_vec(&value).unwrap_or_else(|_| Vec::new())),
507 Ok(None) => Err(StateError {
508 code: "not_found".into(),
509 message: "state key not found".into(),
510 }),
511 Err(err) => Err(StateError {
512 code: "internal".into(),
513 message: err.to_string(),
514 }),
515 }
516 }
517
518 fn write(
519 &mut self,
520 key: HostStateKey,
521 bytes: Vec<u8>,
522 ctx: Option<StateTenantCtx>,
523 ) -> Result<StateOpAck, StateError> {
524 let store = match self.state_store.as_ref() {
525 Some(store) => store.clone(),
526 None => {
527 return Err(StateError {
528 code: "unavailable".into(),
529 message: "state store not configured".into(),
530 });
531 }
532 };
533 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
534 Ok(ctx) => ctx,
535 Err(err) => {
536 return Err(StateError {
537 code: "invalid-ctx".into(),
538 message: err.to_string(),
539 });
540 }
541 };
542 let key = StoreStateKey::from(key);
543 let value = serde_json::from_slice(&bytes)
544 .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()));
545 match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
546 Ok(()) => Ok(StateOpAck::Ok),
547 Err(err) => Err(StateError {
548 code: "internal".into(),
549 message: err.to_string(),
550 }),
551 }
552 }
553
554 fn delete(
555 &mut self,
556 key: HostStateKey,
557 ctx: Option<StateTenantCtx>,
558 ) -> Result<StateOpAck, StateError> {
559 let store = match self.state_store.as_ref() {
560 Some(store) => store.clone(),
561 None => {
562 return Err(StateError {
563 code: "unavailable".into(),
564 message: "state store not configured".into(),
565 });
566 }
567 };
568 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
569 Ok(ctx) => ctx,
570 Err(err) => {
571 return Err(StateError {
572 code: "invalid-ctx".into(),
573 message: err.to_string(),
574 });
575 }
576 };
577 let key = StoreStateKey::from(key);
578 match store.del(&tenant_ctx, STATE_PREFIX, &key) {
579 Ok(_) => Ok(StateOpAck::Ok),
580 Err(err) => Err(StateError {
581 code: "internal".into(),
582 message: err.to_string(),
583 }),
584 }
585 }
586}
587
588impl TelemetryLoggerHost for HostState {
589 fn log(
590 &mut self,
591 span: TelemetrySpanContext,
592 fields: Vec<(String, String)>,
593 _ctx: Option<TelemetryTenantCtx>,
594 ) -> Result<TelemetryAck, TelemetryError> {
595 if let Some(mock) = &self.mocks
596 && mock.telemetry_drain(&[("span_json", span.flow_id.as_str())])
597 {
598 return Ok(TelemetryAck::Ok);
599 }
600 let mut map = serde_json::Map::new();
601 for (k, v) in fields {
602 map.insert(k, Value::String(v));
603 }
604 tracing::info!(
605 tenant = %span.tenant,
606 flow_id = %span.flow_id,
607 node = ?span.node_id,
608 provider = %span.provider,
609 fields = %serde_json::Value::Object(map.clone()),
610 "telemetry log from pack"
611 );
612 Ok(TelemetryAck::Ok)
613 }
614}
615
616impl RunnerHostHttp for HostState {
617 fn request(
618 &mut self,
619 method: String,
620 url: String,
621 headers: Vec<String>,
622 body: Option<Vec<u8>>,
623 ) -> Result<Vec<u8>, String> {
624 let req = HttpRequest {
625 method,
626 url,
627 headers: headers
628 .chunks(2)
629 .filter_map(|chunk| {
630 if chunk.len() == 2 {
631 Some((chunk[0].clone(), chunk[1].clone()))
632 } else {
633 None
634 }
635 })
636 .collect(),
637 body,
638 };
639 match HttpClientHost::send(self, req, None) {
640 Ok(resp) => Ok(resp.body.unwrap_or_default()),
641 Err(err) => Err(err.message),
642 }
643 }
644}
645
// NOTE(review): this key-value host is a stub — reads always miss and writes are discarded.
// Confirm that no component relies on values written through this interface.
impl RunnerHostKv for HostState {
    /// Always returns `None`; nothing is ever stored.
    fn get(&mut self, _ns: String, _key: String) -> Option<String> {
        None
    }

    /// Discards the value.
    fn put(&mut self, _ns: String, _key: String, _val: String) {}
}
653
/// Outcome of decoding `manifest.cbor`: the manifest in whichever format decoded, paired with
/// the flows derived from it.
enum ManifestLoad {
    /// The manifest decoded with `decode_pack_manifest`.
    New {
        manifest: Box<greentic_types::PackManifest>,
        flows: PackFlows,
    },
    /// The manifest only decoded as the legacy `greentic_pack` format.
    Legacy {
        manifest: Box<legacy_pack::PackManifest>,
        flows: PackFlows,
    },
}
664
665fn load_manifest_and_flows(path: &Path) -> Result<ManifestLoad> {
666 let mut archive = ZipArchive::new(File::open(path)?)
667 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
668 let bytes = read_entry(&mut archive, "manifest.cbor")
669 .with_context(|| format!("missing manifest.cbor in {}", path.display()))?;
670 match decode_pack_manifest(&bytes) {
671 Ok(manifest) => {
672 let cache = PackFlows::from_manifest(manifest.clone());
673 Ok(ManifestLoad::New {
674 manifest: Box::new(manifest),
675 flows: cache,
676 })
677 }
678 Err(err) => {
679 tracing::debug!(error = %err, pack = %path.display(), "decode_pack_manifest failed; trying legacy manifest");
680 let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
682 .context("failed to decode legacy pack manifest from manifest.cbor")?;
683 let flows = load_legacy_flows(&mut archive, &legacy)?;
684 Ok(ManifestLoad::Legacy {
685 manifest: Box::new(legacy),
686 flows,
687 })
688 }
689 }
690}
691
692fn load_manifest_and_flows_from_dir(root: &Path) -> Result<ManifestLoad> {
693 let manifest_path = root.join("manifest.cbor");
694 let bytes = std::fs::read(&manifest_path)
695 .with_context(|| format!("missing manifest.cbor in {}", root.display()))?;
696 match decode_pack_manifest(&bytes) {
697 Ok(manifest) => {
698 let cache = PackFlows::from_manifest(manifest.clone());
699 Ok(ManifestLoad::New {
700 manifest: Box::new(manifest),
701 flows: cache,
702 })
703 }
704 Err(err) => {
705 tracing::debug!(
706 error = %err,
707 pack = %root.display(),
708 "decode_pack_manifest failed for materialized pack; trying legacy manifest"
709 );
710 let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
711 .context("failed to decode legacy pack manifest from manifest.cbor")?;
712 let flows = load_legacy_flows_from_dir(root, &legacy)?;
713 Ok(ManifestLoad::Legacy {
714 manifest: Box::new(legacy),
715 flows,
716 })
717 }
718 }
719}
720
721fn load_legacy_flows(
722 archive: &mut ZipArchive<File>,
723 manifest: &legacy_pack::PackManifest,
724) -> Result<PackFlows> {
725 let mut flows = HashMap::new();
726 let mut descriptors = Vec::new();
727
728 for entry in &manifest.flows {
729 let bytes = read_entry(archive, &entry.file_json)
730 .with_context(|| format!("missing flow json {}", entry.file_json))?;
731 let doc: FlowDoc = serde_json::from_slice(&bytes)
732 .with_context(|| format!("failed to decode flow doc {}", entry.file_json))?;
733 let normalized = normalize_flow_doc(doc);
734 let flow_ir = flow_doc_to_ir(normalized)?;
735 let flow = flow_ir_to_flow(flow_ir)?;
736
737 descriptors.push(FlowDescriptor {
738 id: entry.id.clone(),
739 flow_type: entry.kind.clone(),
740 profile: manifest.meta.pack_id.clone(),
741 version: manifest.meta.version.to_string(),
742 description: None,
743 });
744 flows.insert(entry.id.clone(), flow);
745 }
746
747 let mut entry_flows = manifest.meta.entry_flows.clone();
748 if entry_flows.is_empty() {
749 entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
750 }
751 let metadata = PackMetadata {
752 pack_id: manifest.meta.pack_id.clone(),
753 version: manifest.meta.version.to_string(),
754 entry_flows,
755 secret_requirements: Vec::new(),
756 };
757
758 Ok(PackFlows {
759 descriptors,
760 flows,
761 metadata,
762 })
763}
764
765fn load_legacy_flows_from_dir(
766 root: &Path,
767 manifest: &legacy_pack::PackManifest,
768) -> Result<PackFlows> {
769 let mut flows = HashMap::new();
770 let mut descriptors = Vec::new();
771
772 for entry in &manifest.flows {
773 let path = root.join(&entry.file_json);
774 let bytes = std::fs::read(&path)
775 .with_context(|| format!("missing flow json {}", path.display()))?;
776 let doc: FlowDoc = serde_json::from_slice(&bytes)
777 .with_context(|| format!("failed to decode flow doc {}", path.display()))?;
778 let normalized = normalize_flow_doc(doc);
779 let flow_ir = flow_doc_to_ir(normalized)?;
780 let flow = flow_ir_to_flow(flow_ir)?;
781
782 descriptors.push(FlowDescriptor {
783 id: entry.id.clone(),
784 flow_type: entry.kind.clone(),
785 profile: manifest.meta.pack_id.clone(),
786 version: manifest.meta.version.to_string(),
787 description: None,
788 });
789 flows.insert(entry.id.clone(), flow);
790 }
791
792 let mut entry_flows = manifest.meta.entry_flows.clone();
793 if entry_flows.is_empty() {
794 entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
795 }
796 let metadata = PackMetadata {
797 pack_id: manifest.meta.pack_id.clone(),
798 version: manifest.meta.version.to_string(),
799 entry_flows,
800 secret_requirements: Vec::new(),
801 };
802
803 Ok(PackFlows {
804 descriptors,
805 flows,
806 metadata,
807 })
808}
809
/// Per-store state for a component instance: the host interfaces plus the WASI context and
/// resource table handed out through `WasiView`.
pub struct ComponentState {
    /// Host interface implementations available to the component.
    pub host: HostState,
    /// WASI context built from the runner's WASI policy.
    wasi_ctx: WasiCtx,
    /// Resource table paired with the WASI context.
    resource_table: ResourceTable,
}
815
816impl ComponentState {
817 pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
818 let wasi_ctx = policy
819 .instantiate()
820 .context("failed to build WASI context")?;
821 Ok(Self {
822 host,
823 wasi_ctx,
824 resource_table: ResourceTable::new(),
825 })
826 }
827
828 fn host_mut(&mut self) -> &mut HostState {
829 &mut self.host
830 }
831
832 fn should_cancel_host(&mut self) -> bool {
833 false
834 }
835
836 fn yield_now_host(&mut self) {
837 }
839}
840
841impl component_api::v0_4::greentic::component::control::Host for ComponentState {
842 fn should_cancel(&mut self) -> bool {
843 self.should_cancel_host()
844 }
845
846 fn yield_now(&mut self) {
847 self.yield_now_host();
848 }
849}
850
851impl component_api::v0_5::greentic::component::control::Host for ComponentState {
852 fn should_cancel(&mut self) -> bool {
853 self.should_cancel_host()
854 }
855
856 fn yield_now(&mut self) {
857 self.yield_now_host();
858 }
859}
860
861fn add_component_control_instance(
862 linker: &mut Linker<ComponentState>,
863 name: &str,
864) -> wasmtime::Result<()> {
865 let mut inst = linker.instance(name)?;
866 inst.func_wrap(
867 "should-cancel",
868 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
869 let host = caller.data_mut();
870 Ok((host.should_cancel_host(),))
871 },
872 )?;
873 inst.func_wrap(
874 "yield-now",
875 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
876 let host = caller.data_mut();
877 host.yield_now_host();
878 Ok(())
879 },
880 )?;
881 Ok(())
882}
883
884fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
885 add_component_control_instance(linker, "greentic:component/control@0.5.0")?;
886 add_component_control_instance(linker, "greentic:component/control@0.4.0")?;
887 Ok(())
888}
889
/// Registers WASI and the v1 host interfaces on `linker`.
///
/// Every enabled interface resolves to the `HostState` inside `ComponentState`; the OAuth
/// broker is not registered here (`oauth_broker: None`).
///
/// # Errors
/// Propagates any error from either registration call.
pub fn register_all(linker: &mut Linker<ComponentState>) -> Result<()> {
    add_wasi_to_linker(linker)?;
    add_all_v1_to_linker(
        linker,
        HostFns {
            http_client_v1_1: Some(|state| state.host_mut()),
            http_client: Some(|state| state.host_mut()),
            oauth_broker: None,
            runner_host_http: Some(|state| state.host_mut()),
            runner_host_kv: Some(|state| state.host_mut()),
            telemetry_logger: Some(|state| state.host_mut()),
            state_store: Some(|state| state.host_mut()),
            secrets_store: Some(|state| state.host_mut()),
        },
    )?;
    Ok(())
}
907
908impl OAuthHostContext for ComponentState {
909 fn tenant_id(&self) -> &str {
910 &self.host.config.tenant
911 }
912
913 fn env(&self) -> &str {
914 &self.host.default_env
915 }
916
917 fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
918 &mut self.host.oauth_host
919 }
920
921 fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
922 self.host.oauth_config.as_ref()
923 }
924}
925
926impl WasiView for ComponentState {
927 fn ctx(&mut self) -> WasiCtxView<'_> {
928 WasiCtxView {
929 ctx: &mut self.wasi_ctx,
930 table: &mut self.resource_table,
931 }
932 }
933}
934
// SAFETY: NOTE(review) — no justification is recorded for these impls. They assert that every
// field of `ComponentState` (host state, WASI context, resource table) may be moved to and
// shared between threads; confirm which field is not automatically `Send`/`Sync` and why
// overriding the compiler is sound before relying on this.
#[allow(unsafe_code)]
unsafe impl Send for ComponentState {}
#[allow(unsafe_code)]
unsafe impl Sync for ComponentState {}
939
940impl PackRuntime {
    /// Loads a pack from an archive, an unpacked directory, or a single `.wasm` component.
    ///
    /// Steps, in order:
    /// 1. Normalize `path` and classify it as a directory, a single component (`.wasm`
    ///    extension, case-insensitive), or an archive.
    /// 2. Pick the archive to use: `archive_source` when given, otherwise the pack path itself
    ///    unless it is a directory or a single component.
    /// 3. When `verify_archive` is set and that archive exists as a file, verify it.
    /// 4. Parse the manifest and flows from the materialized directory
    ///    (`component_resolution.materialized_root`, or the pack path when it is a directory),
    ///    falling back to the archive. Parse failures are logged and leave the pack without
    ///    flows.
    /// 5. Compile components: the single wasm file, or every component named by the manifest,
    ///    searched in overrides, the component-sources extension, the materialized directory,
    ///    and the archive, in that order.
    ///
    /// # Errors
    /// Fails when a path cannot be normalized, verification fails, the component-sources
    /// extension is invalid, a component cannot be read or compiled, or any component named by
    /// the manifest is still missing after all sources were searched.
    #[allow(clippy::too_many_arguments)]
    pub async fn load(
        path: impl AsRef<Path>,
        config: Arc<HostConfig>,
        mocks: Option<Arc<MockLayer>>,
        archive_source: Option<&Path>,
        session_store: Option<DynSessionStore>,
        state_store: Option<DynStateStore>,
        wasi_policy: Arc<RunnerWasiPolicy>,
        secrets: DynSecretsManager,
        oauth_config: Option<OAuthBrokerConfig>,
        verify_archive: bool,
        component_resolution: ComponentResolution,
    ) -> Result<Self> {
        let path = path.as_ref();
        let (_pack_root, safe_path) = normalize_pack_path(path)?;
        // A path whose metadata cannot be read is treated as "not a directory".
        let path_meta = std::fs::metadata(&safe_path).ok();
        let is_dir = path_meta
            .as_ref()
            .map(|meta| meta.is_dir())
            .unwrap_or(false);
        let is_component = !is_dir
            && safe_path
                .extension()
                .and_then(|ext| ext.to_str())
                .map(|ext| ext.eq_ignore_ascii_case("wasm"))
                .unwrap_or(false);
        // An explicit archive source wins; otherwise the pack path doubles as the archive
        // unless it is a directory or a bare component.
        let archive_hint_path = if let Some(source) = archive_source {
            let (_, normalized) = normalize_pack_path(source)?;
            Some(normalized)
        } else if is_component || is_dir {
            None
        } else {
            Some(safe_path.clone())
        };
        let archive_hint = archive_hint_path.as_deref();
        if verify_archive {
            // Only an archive that exists as a regular file can be verified.
            if let Some(verify_target) = archive_hint.and_then(|p| {
                std::fs::metadata(p)
                    .ok()
                    .filter(|meta| meta.is_file())
                    .map(|_| p)
            }) {
                verify::verify_pack(verify_target).await?;
                tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
            } else {
                tracing::debug!("skipping archive verification (no archive source)");
            }
        }
        let engine = Engine::default();
        // Start from path-derived metadata; replaced below once a manifest or wasm is read.
        let mut metadata = PackMetadata::fallback(&safe_path);
        let mut manifest = None;
        let mut legacy_manifest: Option<Box<legacy_pack::PackManifest>> = None;
        let mut flows = None;
        let materialized_root = component_resolution.materialized_root.clone().or_else(|| {
            if is_dir {
                Some(safe_path.clone())
            } else {
                None
            }
        });

        // First choice: the unpacked directory.
        if let Some(root) = materialized_root.as_ref() {
            match load_manifest_and_flows_from_dir(root) {
                Ok(ManifestLoad::New {
                    manifest: m,
                    flows: cache,
                }) => {
                    metadata = cache.metadata.clone();
                    manifest = Some(*m);
                    flows = Some(cache);
                }
                Ok(ManifestLoad::Legacy {
                    manifest: m,
                    flows: cache,
                }) => {
                    metadata = cache.metadata.clone();
                    legacy_manifest = Some(m);
                    flows = Some(cache);
                }
                Err(err) => {
                    warn!(error = %err, pack = %root.display(), "failed to parse materialized pack manifest");
                }
            }
        }

        // Second choice: the archive, only when the directory yielded no manifest.
        if manifest.is_none()
            && legacy_manifest.is_none()
            && let Some(archive_path) = archive_hint
        {
            match load_manifest_and_flows(archive_path) {
                Ok(ManifestLoad::New {
                    manifest: m,
                    flows: cache,
                }) => {
                    metadata = cache.metadata.clone();
                    manifest = Some(*m);
                    flows = Some(cache);
                }
                Ok(ManifestLoad::Legacy {
                    manifest: m,
                    flows: cache,
                }) => {
                    metadata = cache.metadata.clone();
                    legacy_manifest = Some(m);
                    flows = Some(cache);
                }
                Err(err) => {
                    warn!(error = %err, pack = %archive_path.display(), "failed to parse pack manifest; skipping flows");
                }
            }
        }
        // The component-sources extension is only consulted for current-format manifests.
        let component_sources = if let Some(manifest) = manifest.as_ref() {
            component_sources_table(manifest).context("invalid component sources extension")?
        } else {
            None
        };
        let components = if is_component {
            // Single-component pack: the wasm file is the only component, keyed by file stem.
            let wasm_bytes = fs::read(&safe_path).await?;
            metadata = PackMetadata::from_wasm(&wasm_bytes)
                .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
            let name = safe_path
                .file_stem()
                .map(|s| s.to_string_lossy().to_string())
                .unwrap_or_else(|| "component".to_string());
            let component = Component::from_binary(&engine, &wasm_bytes)?;
            let mut map = HashMap::new();
            map.insert(
                name.clone(),
                PackComponent {
                    name,
                    version: metadata.version.clone(),
                    component,
                },
            );
            map
        } else {
            let specs = component_specs(manifest.as_ref(), legacy_manifest.as_deref());
            if specs.is_empty() {
                HashMap::new()
            } else {
                let mut loaded = HashMap::new();
                // Ids still unresolved; each loader below is handed this set.
                let mut missing: HashSet<String> =
                    specs.iter().map(|spec| spec.id.clone()).collect();
                // Human-readable list of the places searched, used in the error message.
                let mut searched = Vec::new();

                if !component_resolution.overrides.is_empty() {
                    load_components_from_overrides(
                        &engine,
                        &component_resolution.overrides,
                        &specs,
                        &mut missing,
                        &mut loaded,
                    )?;
                    searched.push("override map".to_string());
                }

                if let Some(component_sources) = component_sources.as_ref() {
                    load_components_from_sources(
                        &engine,
                        component_sources,
                        &component_resolution,
                        &specs,
                        &mut missing,
                        &mut loaded,
                        materialized_root.as_deref(),
                        archive_hint,
                    )
                    .await?;
                    searched.push(format!("extension {}", EXT_COMPONENT_SOURCES_V1));
                }

                if let Some(root) = materialized_root.as_ref() {
                    load_components_from_dir(&engine, root, &specs, &mut missing, &mut loaded)?;
                    searched.push(format!("components dir {}", root.display()));
                }

                if let Some(archive_path) = archive_hint {
                    load_components_from_archive(
                        &engine,
                        archive_path,
                        &specs,
                        &mut missing,
                        &mut loaded,
                    )?;
                    searched.push(format!("archive {}", archive_path.display()));
                }

                if !missing.is_empty() {
                    let missing_list = missing.into_iter().collect::<Vec<_>>().join(", ");
                    let sources = if searched.is_empty() {
                        "no component sources".to_string()
                    } else {
                        searched.join(", ")
                    };
                    bail!(
                        "components missing: {}; looked in {}",
                        missing_list,
                        sources
                    );
                }

                loaded
            }
        };
        let http_client = Arc::clone(&HTTP_CLIENT);
        Ok(Self {
            path: safe_path,
            archive_path: archive_hint.map(Path::to_path_buf),
            config,
            engine,
            metadata,
            manifest,
            legacy_manifest,
            mocks,
            flows,
            components,
            http_client,
            pre_cache: Mutex::new(HashMap::new()),
            session_store,
            state_store,
            wasi_policy,
            provider_registry: RwLock::new(None),
            secrets,
            oauth_config,
        })
    }
1168
1169 pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
1170 if let Some(cache) = &self.flows {
1171 return Ok(cache.descriptors.clone());
1172 }
1173 if let Some(manifest) = &self.manifest {
1174 let descriptors = manifest
1175 .flows
1176 .iter()
1177 .map(|flow| FlowDescriptor {
1178 id: flow.id.as_str().to_string(),
1179 flow_type: flow_kind_to_str(flow.kind).to_string(),
1180 profile: manifest.pack_id.as_str().to_string(),
1181 version: manifest.version.to_string(),
1182 description: None,
1183 })
1184 .collect();
1185 return Ok(descriptors);
1186 }
1187 Ok(Vec::new())
1188 }
1189
1190 #[allow(dead_code)]
1191 pub async fn run_flow(
1192 &self,
1193 flow_id: &str,
1194 input: serde_json::Value,
1195 ) -> Result<serde_json::Value> {
1196 let pack = Arc::new(
1197 PackRuntime::load(
1198 &self.path,
1199 Arc::clone(&self.config),
1200 self.mocks.clone(),
1201 self.archive_path.as_deref(),
1202 self.session_store.clone(),
1203 self.state_store.clone(),
1204 Arc::clone(&self.wasi_policy),
1205 self.secrets.clone(),
1206 self.oauth_config.clone(),
1207 false,
1208 ComponentResolution::default(),
1209 )
1210 .await?,
1211 );
1212
1213 let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&self.config)).await?;
1214 let retry_config = self.config.retry_config().into();
1215 let mocks = pack.mocks.as_deref();
1216 let tenant = self.config.tenant.as_str();
1217
1218 let ctx = FlowContext {
1219 tenant,
1220 flow_id,
1221 node_id: None,
1222 tool: None,
1223 action: None,
1224 session_id: None,
1225 provider_id: None,
1226 retry_config,
1227 observer: None,
1228 mocks,
1229 };
1230
1231 let execution = engine.execute(ctx, input).await?;
1232 match execution.status {
1233 FlowStatus::Completed => Ok(execution.output),
1234 FlowStatus::Waiting(wait) => Ok(serde_json::json!({
1235 "status": "pending",
1236 "reason": wait.reason,
1237 "resume": wait.snapshot,
1238 "response": execution.output,
1239 })),
1240 }
1241 }
1242
1243 pub async fn invoke_component(
1244 &self,
1245 component_ref: &str,
1246 ctx: ComponentExecCtx,
1247 operation: &str,
1248 _config_json: Option<String>,
1249 input_json: String,
1250 ) -> Result<Value> {
1251 let pack_component = self
1252 .components
1253 .get(component_ref)
1254 .with_context(|| format!("component '{component_ref}' not found in pack"))?;
1255
1256 let mut linker = Linker::new(&self.engine);
1257 register_all(&mut linker)?;
1258 add_component_control_to_linker(&mut linker)?;
1259
1260 let host_state = HostState::new(
1261 Arc::clone(&self.config),
1262 Arc::clone(&self.http_client),
1263 self.mocks.clone(),
1264 self.session_store.clone(),
1265 self.state_store.clone(),
1266 Arc::clone(&self.secrets),
1267 self.oauth_config.clone(),
1268 )?;
1269 let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1270 let mut store = wasmtime::Store::new(&self.engine, store_state);
1271 let pre_instance = linker.instantiate_pre(&pack_component.component)?;
1272 let result = match component_api::v0_5::ComponentPre::new(pre_instance) {
1273 Ok(pre) => {
1274 let bindings = pre.instantiate_async(&mut store).await?;
1275 let node = bindings.greentic_component_node();
1276 let ctx_v05 = component_api::exec_ctx_v0_5(&ctx);
1277 let result = node.call_invoke(&mut store, &ctx_v05, operation, &input_json)?;
1278 component_api::invoke_result_from_v0_5(result)
1279 }
1280 Err(err) => {
1281 if is_missing_node_export(&err, "0.5.0") {
1282 let pre_instance = linker.instantiate_pre(&pack_component.component)?;
1283 let pre: component_api::v0_4::ComponentPre<ComponentState> =
1284 component_api::v0_4::ComponentPre::new(pre_instance)?;
1285 let bindings = pre.instantiate_async(&mut store).await?;
1286 let node = bindings.greentic_component_node();
1287 let ctx_v04 = component_api::exec_ctx_v0_4(&ctx);
1288 let result = node.call_invoke(&mut store, &ctx_v04, operation, &input_json)?;
1289 component_api::invoke_result_from_v0_4(result)
1290 } else {
1291 return Err(err);
1292 }
1293 }
1294 };
1295
1296 match result {
1297 InvokeResult::Ok(body) => {
1298 if body.is_empty() {
1299 return Ok(Value::Null);
1300 }
1301 serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
1302 }
1303 InvokeResult::Err(NodeError {
1304 code,
1305 message,
1306 retryable,
1307 backoff_ms,
1308 details,
1309 }) => {
1310 let mut obj = serde_json::Map::new();
1311 obj.insert("ok".into(), Value::Bool(false));
1312 let mut error = serde_json::Map::new();
1313 error.insert("code".into(), Value::String(code));
1314 error.insert("message".into(), Value::String(message));
1315 error.insert("retryable".into(), Value::Bool(retryable));
1316 if let Some(backoff) = backoff_ms {
1317 error.insert("backoff_ms".into(), Value::Number(backoff.into()));
1318 }
1319 if let Some(details) = details {
1320 error.insert(
1321 "details".into(),
1322 serde_json::from_str(&details).unwrap_or(Value::String(details)),
1323 );
1324 }
1325 obj.insert("error".into(), Value::Object(error));
1326 Ok(Value::Object(obj))
1327 }
1328 }
1329 }
1330
1331 pub fn resolve_provider(
1332 &self,
1333 provider_id: Option<&str>,
1334 provider_type: Option<&str>,
1335 ) -> Result<ProviderBinding> {
1336 let registry = self.provider_registry()?;
1337 registry.resolve(provider_id, provider_type)
1338 }
1339
1340 pub async fn invoke_provider(
1341 &self,
1342 binding: &ProviderBinding,
1343 _ctx: ComponentExecCtx,
1344 op: &str,
1345 input_json: Vec<u8>,
1346 ) -> Result<Value> {
1347 let component_ref = &binding.component_ref;
1348 let pack_component = self
1349 .components
1350 .get(component_ref)
1351 .with_context(|| format!("provider component '{component_ref}' not found in pack"))?;
1352
1353 let mut linker = Linker::new(&self.engine);
1354 register_all(&mut linker)?;
1355 add_component_control_to_linker(&mut linker)?;
1356 let pre_instance = linker.instantiate_pre(&pack_component.component)?;
1357 let pre: ProviderComponentPre<ComponentState> = ProviderComponentPre::new(pre_instance)?;
1358
1359 let host_state = HostState::new(
1360 Arc::clone(&self.config),
1361 Arc::clone(&self.http_client),
1362 self.mocks.clone(),
1363 self.session_store.clone(),
1364 self.state_store.clone(),
1365 Arc::clone(&self.secrets),
1366 self.oauth_config.clone(),
1367 )?;
1368 let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1369 let mut store = wasmtime::Store::new(&self.engine, store_state);
1370 let bindings: crate::provider_core::SchemaCore = pre.instantiate_async(&mut store).await?;
1371 let provider = bindings.greentic_provider_core_schema_core_api();
1372
1373 let result = provider.call_invoke(&mut store, op, &input_json)?;
1374 deserialize_json_bytes(result)
1375 }
1376
1377 fn provider_registry(&self) -> Result<ProviderRegistry> {
1378 if let Some(registry) = self.provider_registry.read().clone() {
1379 return Ok(registry);
1380 }
1381 let manifest = self
1382 .manifest
1383 .as_ref()
1384 .context("pack manifest required for provider resolution")?;
1385 let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
1386 let registry = ProviderRegistry::new(
1387 manifest,
1388 self.state_store.clone(),
1389 &self.config.tenant,
1390 &env,
1391 )?;
1392 *self.provider_registry.write() = Some(registry.clone());
1393 Ok(registry)
1394 }
1395
1396 pub fn load_flow(&self, flow_id: &str) -> Result<Flow> {
1397 if let Some(cache) = &self.flows {
1398 return cache
1399 .flows
1400 .get(flow_id)
1401 .cloned()
1402 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
1403 }
1404 if let Some(manifest) = &self.manifest {
1405 let entry = manifest
1406 .flows
1407 .iter()
1408 .find(|f| f.id.as_str() == flow_id)
1409 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
1410 return Ok(entry.flow.clone());
1411 }
1412 bail!("flow '{flow_id}' not available (pack exports disabled)")
1413 }
1414
    /// Returns the pack metadata held by this runtime.
    pub fn metadata(&self) -> &PackMetadata {
        &self.metadata
    }
1418
    /// Returns the secret requirements recorded in the pack metadata.
    pub fn required_secrets(&self) -> &[greentic_types::SecretRequirement] {
        &self.metadata.secret_requirements
    }
1422
1423 pub fn missing_secrets(
1424 &self,
1425 tenant_ctx: &TypesTenantCtx,
1426 ) -> Vec<greentic_types::SecretRequirement> {
1427 let env = tenant_ctx.env.as_str().to_string();
1428 let tenant = tenant_ctx.tenant.as_str().to_string();
1429 let team = tenant_ctx.team.as_ref().map(|t| t.as_str().to_string());
1430 self.required_secrets()
1431 .iter()
1432 .filter(|req| {
1433 if let Some(scope) = &req.scope {
1435 if scope.env != env {
1436 return false;
1437 }
1438 if scope.tenant != tenant {
1439 return false;
1440 }
1441 if let Some(ref team_req) = scope.team
1442 && team.as_ref() != Some(team_req)
1443 {
1444 return false;
1445 }
1446 }
1447 read_secret_blocking(&self.secrets, req.key.as_str()).is_err()
1448 })
1449 .cloned()
1450 .collect()
1451 }
1452
1453 pub fn for_component_test(
1454 components: Vec<(String, PathBuf)>,
1455 flows: HashMap<String, FlowIR>,
1456 config: Arc<HostConfig>,
1457 ) -> Result<Self> {
1458 let engine = Engine::default();
1459 let mut component_map = HashMap::new();
1460 for (name, path) in components {
1461 if !path.exists() {
1462 bail!("component artifact missing: {}", path.display());
1463 }
1464 let wasm_bytes = std::fs::read(&path)?;
1465 let component = Component::from_binary(&engine, &wasm_bytes)
1466 .with_context(|| format!("failed to compile component {}", path.display()))?;
1467 component_map.insert(
1468 name.clone(),
1469 PackComponent {
1470 name,
1471 version: "0.0.0".into(),
1472 component,
1473 },
1474 );
1475 }
1476
1477 let mut flow_map = HashMap::new();
1478 let mut descriptors = Vec::new();
1479 for (id, ir) in flows {
1480 let flow_type = ir.flow_type.clone();
1481 let flow = flow_ir_to_flow(ir)?;
1482 flow_map.insert(id.clone(), flow);
1483 descriptors.push(FlowDescriptor {
1484 id: id.clone(),
1485 flow_type,
1486 profile: "test".into(),
1487 version: "0.0.0".into(),
1488 description: None,
1489 });
1490 }
1491 let flows_cache = PackFlows {
1492 descriptors: descriptors.clone(),
1493 flows: flow_map,
1494 metadata: PackMetadata::fallback(Path::new("component-test")),
1495 };
1496
1497 Ok(Self {
1498 path: PathBuf::new(),
1499 archive_path: None,
1500 config,
1501 engine,
1502 metadata: PackMetadata::fallback(Path::new("component-test")),
1503 manifest: None,
1504 legacy_manifest: None,
1505 mocks: None,
1506 flows: Some(flows_cache),
1507 components: component_map,
1508 http_client: Arc::clone(&HTTP_CLIENT),
1509 pre_cache: Mutex::new(HashMap::new()),
1510 session_store: None,
1511 state_store: None,
1512 wasi_policy: Arc::new(RunnerWasiPolicy::new()),
1513 provider_registry: RwLock::new(None),
1514 secrets: crate::secrets::default_manager(),
1515 oauth_config: None,
1516 })
1517 }
1518}
1519
1520fn is_missing_node_export(err: &wasmtime::Error, version: &str) -> bool {
1521 let message = err.to_string();
1522 message.contains("no exported instance named")
1523 && message.contains(&format!("greentic:component/node@{version}"))
1524}
1525
/// Flows decoded from a pack, together with their descriptors and the pack metadata.
struct PackFlows {
    /// One descriptor per flow; this is what `list_flows` returns when the cache is present.
    descriptors: Vec<FlowDescriptor>,
    /// Decoded flows keyed by flow id; `load_flow` looks flows up here.
    flows: HashMap<String, Flow>,
    /// Pack metadata captured when the flows were built.
    metadata: PackMetadata,
}
1531
/// Manifest extension keys recognised as carrying runtime flows.
///
/// `flows_from_runtime_extension` decodes the first manifest extension whose key
/// equals any of these.
const RUNTIME_FLOW_EXTENSION_IDS: [&str; 3] = [
    "greentic.pack.runtime_flow",
    "greentic.pack.flow_runtime",
    "greentic.pack.runtime_flows",
];
1537
/// Object-shaped runtime flow payload: a JSON object with a `flows` array.
#[derive(Debug, Deserialize)]
struct RuntimeFlowBundle {
    /// The runtime flows carried by the bundle.
    flows: Vec<RuntimeFlow>,
}
1542
/// Serialized form of a flow as found in a runtime flow extension.
///
/// Converted into a [`Flow`] by `runtime_flow_to_flow`.
#[derive(Debug, Deserialize)]
struct RuntimeFlow {
    /// Flow id as text; parsed into a `FlowId` during conversion.
    id: String,
    /// Flow kind; also accepted under the key `flow_type`.
    #[serde(alias = "flow_type")]
    kind: FlowKind,
    /// Schema version; conversion substitutes `"1.0"` when absent.
    #[serde(default)]
    schema_version: Option<String>,
    /// Start node; becomes the `default` entrypoint when `entrypoints` is empty.
    #[serde(default)]
    start: Option<String>,
    /// Named entrypoints, passed through to the converted flow.
    #[serde(default)]
    entrypoints: BTreeMap<String, Value>,
    /// Nodes keyed by node id text.
    nodes: BTreeMap<String, RuntimeNode>,
    /// Optional flow metadata; conversion uses the default when absent.
    #[serde(default)]
    metadata: Option<FlowMetadata>,
}
1558
/// Serialized form of a single node inside a [`RuntimeFlow`].
#[derive(Debug, Deserialize)]
struct RuntimeNode {
    /// Component id as text; also accepted under the key `component`.
    #[serde(alias = "component")]
    component_id: String,
    /// Operation to call on the component; also accepted under the key `operation`.
    #[serde(default, alias = "operation")]
    operation_name: Option<String>,
    /// Node input; also accepted under the keys `payload` and `input`.
    /// Conversion uses it as the node's input mapping.
    #[serde(default, alias = "payload", alias = "input")]
    operation_payload: Value,
    /// Routing for the node; conversion substitutes `Routing::End` when absent.
    #[serde(default)]
    routing: Option<Routing>,
    /// Telemetry hints; conversion uses the default when absent.
    #[serde(default)]
    telemetry: Option<TelemetryHints>,
}
1572
1573fn deserialize_json_bytes(bytes: Vec<u8>) -> Result<Value> {
1574 if bytes.is_empty() {
1575 return Ok(Value::Null);
1576 }
1577 serde_json::from_slice(&bytes).or_else(|_| {
1578 String::from_utf8(bytes)
1579 .map(Value::String)
1580 .map_err(|err| anyhow!(err))
1581 })
1582}
1583
1584impl PackFlows {
1585 fn from_manifest(manifest: greentic_types::PackManifest) -> Self {
1586 if let Some(flows) = flows_from_runtime_extension(&manifest) {
1587 return flows;
1588 }
1589 let descriptors = manifest
1590 .flows
1591 .iter()
1592 .map(|entry| FlowDescriptor {
1593 id: entry.id.as_str().to_string(),
1594 flow_type: flow_kind_to_str(entry.kind).to_string(),
1595 profile: manifest.pack_id.as_str().to_string(),
1596 version: manifest.version.to_string(),
1597 description: None,
1598 })
1599 .collect();
1600 let mut flows = HashMap::new();
1601 for entry in &manifest.flows {
1602 flows.insert(entry.id.as_str().to_string(), entry.flow.clone());
1603 }
1604 Self {
1605 metadata: PackMetadata::from_manifest(&manifest),
1606 descriptors,
1607 flows,
1608 }
1609 }
1610}
1611
1612fn flows_from_runtime_extension(manifest: &greentic_types::PackManifest) -> Option<PackFlows> {
1613 let extensions = manifest.extensions.as_ref()?;
1614 let extension = extensions.iter().find_map(|(key, ext)| {
1615 if RUNTIME_FLOW_EXTENSION_IDS
1616 .iter()
1617 .any(|candidate| candidate == key)
1618 {
1619 Some(ext)
1620 } else {
1621 None
1622 }
1623 })?;
1624 let runtime_flows = match decode_runtime_flow_extension(extension) {
1625 Some(flows) if !flows.is_empty() => flows,
1626 _ => return None,
1627 };
1628
1629 let descriptors = runtime_flows
1630 .iter()
1631 .map(|flow| FlowDescriptor {
1632 id: flow.id.as_str().to_string(),
1633 flow_type: flow_kind_to_str(flow.kind).to_string(),
1634 profile: manifest.pack_id.as_str().to_string(),
1635 version: manifest.version.to_string(),
1636 description: None,
1637 })
1638 .collect::<Vec<_>>();
1639 let flows = runtime_flows
1640 .into_iter()
1641 .map(|flow| (flow.id.as_str().to_string(), flow))
1642 .collect();
1643
1644 Some(PackFlows {
1645 metadata: PackMetadata::from_manifest(manifest),
1646 descriptors,
1647 flows,
1648 })
1649}
1650
1651fn decode_runtime_flow_extension(extension: &ExtensionRef) -> Option<Vec<Flow>> {
1652 let value = match extension.inline.as_ref()? {
1653 ExtensionInline::Other(value) => value.clone(),
1654 _ => return None,
1655 };
1656
1657 if let Ok(bundle) = serde_json::from_value::<RuntimeFlowBundle>(value.clone()) {
1658 return Some(collect_runtime_flows(bundle.flows));
1659 }
1660
1661 if let Ok(flows) = serde_json::from_value::<Vec<RuntimeFlow>>(value.clone()) {
1662 return Some(collect_runtime_flows(flows));
1663 }
1664
1665 if let Ok(flows) = serde_json::from_value::<Vec<Flow>>(value) {
1666 return Some(flows);
1667 }
1668
1669 warn!(
1670 extension = %extension.kind,
1671 version = %extension.version,
1672 "runtime flow extension present but could not be decoded"
1673 );
1674 None
1675}
1676
1677fn collect_runtime_flows(flows: Vec<RuntimeFlow>) -> Vec<Flow> {
1678 flows
1679 .into_iter()
1680 .filter_map(|flow| match runtime_flow_to_flow(flow) {
1681 Ok(flow) => Some(flow),
1682 Err(err) => {
1683 warn!(error = %err, "failed to decode runtime flow");
1684 None
1685 }
1686 })
1687 .collect()
1688}
1689
1690fn runtime_flow_to_flow(runtime: RuntimeFlow) -> Result<Flow> {
1691 let flow_id = FlowId::from_str(&runtime.id)
1692 .with_context(|| format!("invalid flow id `{}`", runtime.id))?;
1693 let mut entrypoints = runtime.entrypoints;
1694 if entrypoints.is_empty()
1695 && let Some(start) = &runtime.start
1696 {
1697 entrypoints.insert("default".into(), Value::String(start.clone()));
1698 }
1699
1700 let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
1701 for (id, node) in runtime.nodes {
1702 let node_id = NodeId::from_str(&id).with_context(|| format!("invalid node id `{id}`"))?;
1703 let component_id = ComponentId::from_str(&node.component_id)
1704 .with_context(|| format!("invalid component id `{}`", node.component_id))?;
1705 let component = FlowComponentRef {
1706 id: component_id,
1707 pack_alias: None,
1708 operation: node.operation_name,
1709 };
1710 let routing = node.routing.unwrap_or(Routing::End);
1711 let telemetry = node.telemetry.unwrap_or_default();
1712 nodes.insert(
1713 node_id.clone(),
1714 Node {
1715 id: node_id,
1716 component,
1717 input: InputMapping {
1718 mapping: node.operation_payload,
1719 },
1720 output: OutputMapping {
1721 mapping: Value::Null,
1722 },
1723 routing,
1724 telemetry,
1725 },
1726 );
1727 }
1728
1729 Ok(Flow {
1730 schema_version: runtime.schema_version.unwrap_or_else(|| "1.0".to_string()),
1731 id: flow_id,
1732 kind: runtime.kind,
1733 entrypoints,
1734 nodes,
1735 metadata: runtime.metadata.unwrap_or_default(),
1736 })
1737}
1738
1739fn flow_kind_to_str(kind: greentic_types::FlowKind) -> &'static str {
1740 match kind {
1741 greentic_types::FlowKind::Messaging => "messaging",
1742 greentic_types::FlowKind::Event => "event",
1743 greentic_types::FlowKind::ComponentConfig => "component-config",
1744 greentic_types::FlowKind::Job => "job",
1745 greentic_types::FlowKind::Http => "http",
1746 }
1747}
1748
1749fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
1750 let mut file = archive
1751 .by_name(name)
1752 .with_context(|| format!("entry {name} missing from archive"))?;
1753 let mut buf = Vec::new();
1754 file.read_to_end(&mut buf)?;
1755 Ok(buf)
1756}
1757
1758fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
1759 for node in doc.nodes.values_mut() {
1760 let Some((component_ref, payload)) = node
1761 .raw
1762 .iter()
1763 .next()
1764 .map(|(key, value)| (key.clone(), value.clone()))
1765 else {
1766 continue;
1767 };
1768 if component_ref.starts_with("emit.") {
1769 node.operation = Some(component_ref);
1770 node.payload = payload;
1771 node.raw.clear();
1772 continue;
1773 }
1774 let (target_component, operation, input, config) =
1775 infer_component_exec(&payload, &component_ref);
1776 let mut payload_obj = serde_json::Map::new();
1777 payload_obj.insert("component".into(), Value::String(target_component));
1779 payload_obj.insert("operation".into(), Value::String(operation));
1780 payload_obj.insert("input".into(), input);
1781 if let Some(cfg) = config {
1782 payload_obj.insert("config".into(), cfg);
1783 }
1784 node.operation = Some("component.exec".to_string());
1785 node.payload = Value::Object(payload_obj);
1786 node.raw.clear();
1787 }
1788 doc
1789}
1790
1791fn infer_component_exec(
1792 payload: &Value,
1793 component_ref: &str,
1794) -> (String, String, Value, Option<Value>) {
1795 let default_op = if component_ref.starts_with("templating.") {
1796 "render"
1797 } else {
1798 "invoke"
1799 }
1800 .to_string();
1801
1802 if let Value::Object(map) = payload {
1803 let op = map
1804 .get("op")
1805 .or_else(|| map.get("operation"))
1806 .and_then(Value::as_str)
1807 .map(|s| s.to_string())
1808 .unwrap_or_else(|| default_op.clone());
1809
1810 let mut input = map.clone();
1811 let config = input.remove("config");
1812 let component = input
1813 .get("component")
1814 .or_else(|| input.get("component_ref"))
1815 .and_then(Value::as_str)
1816 .map(|s| s.to_string())
1817 .unwrap_or_else(|| component_ref.to_string());
1818 input.remove("component");
1819 input.remove("component_ref");
1820 input.remove("op");
1821 input.remove("operation");
1822 return (component, op, Value::Object(input), config);
1823 }
1824
1825 (component_ref.to_string(), default_op, payload.clone(), None)
1826}
1827
/// A component a pack declares and that must be located and loaded.
#[derive(Clone, Debug)]
struct ComponentSpec {
    /// Component id (the component name for legacy manifests).
    id: String,
    /// Declared component version, rendered as text.
    version: String,
    /// Wasm file path from a legacy manifest; `None` for current manifests,
    /// in which case `components/<id>.wasm` is used instead.
    legacy_path: Option<String>,
}
1834
/// Source information for one component, taken from the component sources extension.
#[derive(Clone, Debug)]
struct ComponentSourceInfo {
    /// Resolved digest; loaded component bytes are verified against it.
    digest: String,
    /// Source reference; rendered to text and used to fetch remote components.
    source: ComponentSourceRef,
    /// Where the component's wasm artifact lives.
    artifact: ComponentArtifactLocation,
}
1841
/// Location of a component's wasm artifact.
#[derive(Clone, Debug)]
enum ComponentArtifactLocation {
    /// Shipped with the pack at `wasm_path`, looked up under the materialized
    /// root or inside the pack archive.
    Inline { wasm_path: String },
    /// Not shipped with the pack; obtained through the distribution client.
    Remote,
}
1847
1848fn component_specs(
1849 manifest: Option<&greentic_types::PackManifest>,
1850 legacy_manifest: Option<&legacy_pack::PackManifest>,
1851) -> Vec<ComponentSpec> {
1852 if let Some(manifest) = manifest {
1853 return manifest
1854 .components
1855 .iter()
1856 .map(|entry| ComponentSpec {
1857 id: entry.id.as_str().to_string(),
1858 version: entry.version.to_string(),
1859 legacy_path: None,
1860 })
1861 .collect();
1862 }
1863 if let Some(legacy_manifest) = legacy_manifest {
1864 return legacy_manifest
1865 .components
1866 .iter()
1867 .map(|entry| ComponentSpec {
1868 id: entry.name.clone(),
1869 version: entry.version.to_string(),
1870 legacy_path: Some(entry.file_wasm.clone()),
1871 })
1872 .collect();
1873 }
1874 Vec::new()
1875}
1876
1877fn component_sources_table(
1878 manifest: &greentic_types::PackManifest,
1879) -> Result<Option<HashMap<String, ComponentSourceInfo>>> {
1880 let Some(sources) = manifest.get_component_sources_v1()? else {
1881 return Ok(None);
1882 };
1883 let mut table = HashMap::new();
1884 for entry in sources.components {
1885 let artifact = match entry.artifact {
1886 ArtifactLocationV1::Inline { wasm_path, .. } => {
1887 ComponentArtifactLocation::Inline { wasm_path }
1888 }
1889 ArtifactLocationV1::Remote => ComponentArtifactLocation::Remote,
1890 };
1891 let info = ComponentSourceInfo {
1892 digest: entry.resolved.digest,
1893 source: entry.source,
1894 artifact,
1895 };
1896 if let Some(component_id) = entry.component_id.as_ref() {
1897 table.insert(component_id.as_str().to_string(), info.clone());
1898 }
1899 table.insert(entry.name, info);
1900 }
1901 Ok(Some(table))
1902}
1903
1904fn component_path_for_spec(root: &Path, spec: &ComponentSpec) -> PathBuf {
1905 if let Some(path) = &spec.legacy_path {
1906 return root.join(path);
1907 }
1908 root.join("components").join(format!("{}.wasm", spec.id))
1909}
1910
/// Ensures `digest` carries an algorithm prefix.
///
/// Digests that already start with `sha256:` or `blake3:` are returned
/// unchanged; anything else is treated as a bare SHA-256 digest and gets the
/// `sha256:` prefix.
fn normalize_digest(digest: &str) -> String {
    const KNOWN_PREFIXES: [&str; 2] = ["sha256:", "blake3:"];

    let already_prefixed = KNOWN_PREFIXES
        .iter()
        .any(|prefix| digest.starts_with(*prefix));
    if already_prefixed {
        digest.to_owned()
    } else {
        format!("sha256:{digest}")
    }
}
1918
1919fn compute_digest_for(bytes: &[u8], digest: &str) -> Result<String> {
1920 if digest.starts_with("blake3:") {
1921 let hash = blake3::hash(bytes);
1922 return Ok(format!("blake3:{}", hash.to_hex()));
1923 }
1924 let mut hasher = sha2::Sha256::new();
1925 hasher.update(bytes);
1926 Ok(format!("sha256:{:x}", hasher.finalize()))
1927}
1928
1929fn verify_component_digest(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
1930 let normalized_expected = normalize_digest(expected);
1931 let actual = compute_digest_for(bytes, &normalized_expected)?;
1932 if normalize_digest(&actual) != normalized_expected {
1933 bail!(
1934 "component {component_id} digest mismatch: expected {normalized_expected}, got {actual}"
1935 );
1936 }
1937 Ok(())
1938}
1939
1940fn dist_options_from(component_resolution: &ComponentResolution) -> DistOptions {
1941 let mut opts = DistOptions {
1942 allow_tags: true,
1943 ..DistOptions::default()
1944 };
1945 if let Some(cache_dir) = component_resolution.dist_cache_dir.clone() {
1946 opts.cache_dir = cache_dir;
1947 }
1948 if component_resolution.dist_offline {
1949 opts.offline = true;
1950 }
1951 opts
1952}
1953
1954#[allow(clippy::too_many_arguments)]
1955async fn load_components_from_sources(
1956 engine: &Engine,
1957 component_sources: &HashMap<String, ComponentSourceInfo>,
1958 component_resolution: &ComponentResolution,
1959 specs: &[ComponentSpec],
1960 missing: &mut HashSet<String>,
1961 into: &mut HashMap<String, PackComponent>,
1962 materialized_root: Option<&Path>,
1963 archive_hint: Option<&Path>,
1964) -> Result<()> {
1965 let mut archive = if let Some(path) = archive_hint {
1966 Some(
1967 ZipArchive::new(File::open(path)?)
1968 .with_context(|| format!("{} is not a valid gtpack", path.display()))?,
1969 )
1970 } else {
1971 None
1972 };
1973 let mut dist_client: Option<DistClient> = None;
1974
1975 for spec in specs {
1976 if !missing.contains(&spec.id) {
1977 continue;
1978 }
1979 let Some(source) = component_sources.get(&spec.id) else {
1980 continue;
1981 };
1982
1983 let bytes = match &source.artifact {
1984 ComponentArtifactLocation::Inline { wasm_path } => {
1985 if let Some(root) = materialized_root {
1986 let path = root.join(wasm_path);
1987 if path.exists() {
1988 std::fs::read(&path).with_context(|| {
1989 format!(
1990 "failed to read inline component {} from {}",
1991 spec.id,
1992 path.display()
1993 )
1994 })?
1995 } else if archive.is_none() {
1996 bail!("inline component {} missing at {}", spec.id, path.display());
1997 } else {
1998 read_entry(
1999 archive.as_mut().expect("archive present when needed"),
2000 wasm_path,
2001 )
2002 .with_context(|| {
2003 format!(
2004 "inline component {} missing at {} in pack archive",
2005 spec.id, wasm_path
2006 )
2007 })?
2008 }
2009 } else if let Some(archive) = archive.as_mut() {
2010 read_entry(archive, wasm_path).with_context(|| {
2011 format!(
2012 "inline component {} missing at {} in pack archive",
2013 spec.id, wasm_path
2014 )
2015 })?
2016 } else {
2017 bail!(
2018 "inline component {} missing and no pack source available",
2019 spec.id
2020 );
2021 }
2022 }
2023 ComponentArtifactLocation::Remote => {
2024 let client = dist_client.get_or_insert_with(|| {
2025 DistClient::new(dist_options_from(component_resolution))
2026 });
2027 let reference = source.source.to_string();
2028 let cache_path = if component_resolution.dist_offline {
2029 client
2030 .fetch_digest(&source.digest)
2031 .await
2032 .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?
2033 } else {
2034 let resolved = client
2035 .resolve_ref(&reference)
2036 .await
2037 .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?;
2038 let expected = normalize_digest(&source.digest);
2039 let actual = normalize_digest(&resolved.digest);
2040 if expected != actual {
2041 bail!(
2042 "component {} digest mismatch after fetch: expected {}, got {}",
2043 spec.id,
2044 expected,
2045 actual
2046 );
2047 }
2048 resolved.cache_path.ok_or_else(|| {
2049 anyhow!(
2050 "component {} resolved from {} but cache path is missing",
2051 spec.id,
2052 reference
2053 )
2054 })?
2055 };
2056 std::fs::read(&cache_path).with_context(|| {
2057 format!(
2058 "failed to read cached component {} from {}",
2059 spec.id,
2060 cache_path.display()
2061 )
2062 })?
2063 }
2064 };
2065
2066 verify_component_digest(&spec.id, &source.digest, &bytes)?;
2067 let component = Component::from_binary(engine, &bytes)
2068 .with_context(|| format!("failed to compile component {}", spec.id))?;
2069 into.insert(
2070 spec.id.clone(),
2071 PackComponent {
2072 name: spec.id.clone(),
2073 version: spec.version.clone(),
2074 component,
2075 },
2076 );
2077 missing.remove(&spec.id);
2078 }
2079
2080 Ok(())
2081}
2082
2083fn dist_error_for_component(err: DistError, component_id: &str, reference: &str) -> anyhow::Error {
2084 match err {
2085 DistError::CacheMiss { reference: missing } => anyhow!(
2086 "remote component {} is not cached for {}. Run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
2087 component_id,
2088 missing,
2089 reference
2090 ),
2091 DistError::Offline { reference: blocked } => anyhow!(
2092 "offline mode blocked fetching component {} from {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
2093 component_id,
2094 blocked,
2095 reference
2096 ),
2097 DistError::AuthRequired { target } => anyhow!(
2098 "component {} requires authenticated source {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
2099 component_id,
2100 target,
2101 reference
2102 ),
2103 other => anyhow!(
2104 "failed to resolve component {} from {}: {}",
2105 component_id,
2106 reference,
2107 other
2108 ),
2109 }
2110}
2111
2112fn load_components_from_overrides(
2113 engine: &Engine,
2114 overrides: &HashMap<String, PathBuf>,
2115 specs: &[ComponentSpec],
2116 missing: &mut HashSet<String>,
2117 into: &mut HashMap<String, PackComponent>,
2118) -> Result<()> {
2119 for spec in specs {
2120 if !missing.contains(&spec.id) {
2121 continue;
2122 }
2123 let Some(path) = overrides.get(&spec.id) else {
2124 continue;
2125 };
2126 let bytes = std::fs::read(path)
2127 .with_context(|| format!("failed to read override component {}", path.display()))?;
2128 let component = Component::from_binary(engine, &bytes).with_context(|| {
2129 format!(
2130 "failed to compile component {} from override {}",
2131 spec.id,
2132 path.display()
2133 )
2134 })?;
2135 into.insert(
2136 spec.id.clone(),
2137 PackComponent {
2138 name: spec.id.clone(),
2139 version: spec.version.clone(),
2140 component,
2141 },
2142 );
2143 missing.remove(&spec.id);
2144 }
2145 Ok(())
2146}
2147
2148fn load_components_from_dir(
2149 engine: &Engine,
2150 root: &Path,
2151 specs: &[ComponentSpec],
2152 missing: &mut HashSet<String>,
2153 into: &mut HashMap<String, PackComponent>,
2154) -> Result<()> {
2155 for spec in specs {
2156 if !missing.contains(&spec.id) {
2157 continue;
2158 }
2159 let path = component_path_for_spec(root, spec);
2160 if !path.exists() {
2161 tracing::debug!(component = %spec.id, path = %path.display(), "materialized component missing; will try other sources");
2162 continue;
2163 }
2164 let bytes = std::fs::read(&path)
2165 .with_context(|| format!("failed to read component {}", path.display()))?;
2166 let component = Component::from_binary(engine, &bytes).with_context(|| {
2167 format!(
2168 "failed to compile component {} from {}",
2169 spec.id,
2170 path.display()
2171 )
2172 })?;
2173 into.insert(
2174 spec.id.clone(),
2175 PackComponent {
2176 name: spec.id.clone(),
2177 version: spec.version.clone(),
2178 component,
2179 },
2180 );
2181 missing.remove(&spec.id);
2182 }
2183 Ok(())
2184}
2185
2186fn load_components_from_archive(
2187 engine: &Engine,
2188 path: &Path,
2189 specs: &[ComponentSpec],
2190 missing: &mut HashSet<String>,
2191 into: &mut HashMap<String, PackComponent>,
2192) -> Result<()> {
2193 let mut archive = ZipArchive::new(File::open(path)?)
2194 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
2195 for spec in specs {
2196 if !missing.contains(&spec.id) {
2197 continue;
2198 }
2199 let file_name = spec
2200 .legacy_path
2201 .clone()
2202 .unwrap_or_else(|| format!("components/{}.wasm", spec.id));
2203 let bytes = match read_entry(&mut archive, &file_name) {
2204 Ok(bytes) => bytes,
2205 Err(err) => {
2206 warn!(component = %spec.id, pack = %path.display(), error = %err, "component entry missing in pack archive");
2207 continue;
2208 }
2209 };
2210 let component = Component::from_binary(engine, &bytes)
2211 .with_context(|| format!("failed to compile component {}", spec.id))?;
2212 into.insert(
2213 spec.id.clone(),
2214 PackComponent {
2215 name: spec.id.clone(),
2216 version: spec.version.clone(),
2217 component,
2218 },
2219 );
2220 missing.remove(&spec.id);
2221 }
2222 Ok(())
2223}
2224
#[cfg(test)]
mod tests {
    use super::*;
    use greentic_flow::model::{FlowDoc, NodeDoc};
    use indexmap::IndexMap;
    use serde_json::json;

    /// A node declared only through a raw `<component>: <input>` entry is
    /// expected to come out of `normalize_flow_doc` as a `component.exec`
    /// node whose payload names the component, the `render` operation, and
    /// the original input, with the raw map emptied.
    #[test]
    fn normalizes_raw_component_to_component_exec() {
        let template_input = json!({ "template": "Hi {{name}}" });

        // Node that carries nothing but the raw component entry.
        let mut raw = IndexMap::new();
        raw.insert("templating.handlebars".into(), template_input.clone());
        let start_node = NodeDoc {
            raw,
            routing: json!([{"out": true}]),
            ..Default::default()
        };

        let mut nodes = IndexMap::new();
        nodes.insert("start".into(), start_node);

        let normalized = normalize_flow_doc(FlowDoc {
            id: "welcome".into(),
            title: None,
            description: None,
            flow_type: "messaging".into(),
            start: Some("start".into()),
            parameters: json!({}),
            tags: Vec::new(),
            schema_version: None,
            entrypoints: IndexMap::new(),
            nodes,
        });

        let start = normalized.nodes.get("start").expect("node exists");
        assert_eq!(start.operation.as_deref(), Some("component.exec"));
        assert!(start.raw.is_empty());

        let exec_payload = start.payload.as_object().expect("payload object");
        assert_eq!(
            exec_payload.get("component"),
            Some(&Value::String("templating.handlebars".into()))
        );
        assert_eq!(
            exec_payload.get("operation"),
            Some(&Value::String("render".into()))
        );
        let forwarded_input = exec_payload.get("input").unwrap();
        assert_eq!(forwarded_input, &template_input);
    }
}
2278
/// Summary information about a pack: its identity, version, entry flows and
/// declared secret requirements.
///
/// The type is serializable and deserializable through serde; the two list
/// fields may be absent from serialized input.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct PackMetadata {
    /// Identifier of the pack.
    pub pack_id: String,
    /// Pack version, stored as a plain string.
    pub version: String,
    /// Ids of the pack's entry flows; deserializes to an empty list when absent.
    #[serde(default)]
    pub entry_flows: Vec<String>,
    /// Secret requirements declared by the pack; deserializes to an empty
    /// list when absent.
    #[serde(default)]
    pub secret_requirements: Vec<greentic_types::SecretRequirement>,
}
2288
2289impl PackMetadata {
2290 fn from_wasm(bytes: &[u8]) -> Option<Self> {
2291 let parser = Parser::new(0);
2292 for payload in parser.parse_all(bytes) {
2293 let payload = payload.ok()?;
2294 match payload {
2295 Payload::CustomSection(section) => {
2296 if section.name() == "greentic.manifest"
2297 && let Ok(meta) = Self::from_bytes(section.data())
2298 {
2299 return Some(meta);
2300 }
2301 }
2302 Payload::DataSection(reader) => {
2303 for segment in reader.into_iter().flatten() {
2304 if let Ok(meta) = Self::from_bytes(segment.data) {
2305 return Some(meta);
2306 }
2307 }
2308 }
2309 _ => {}
2310 }
2311 }
2312 None
2313 }
2314
2315 fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
2316 #[derive(Deserialize)]
2317 struct RawManifest {
2318 pack_id: String,
2319 version: String,
2320 #[serde(default)]
2321 entry_flows: Vec<String>,
2322 #[serde(default)]
2323 flows: Vec<RawFlow>,
2324 #[serde(default)]
2325 secret_requirements: Vec<greentic_types::SecretRequirement>,
2326 }
2327
2328 #[derive(Deserialize)]
2329 struct RawFlow {
2330 id: String,
2331 }
2332
2333 let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
2334 let mut entry_flows = if manifest.entry_flows.is_empty() {
2335 manifest.flows.iter().map(|f| f.id.clone()).collect()
2336 } else {
2337 manifest.entry_flows.clone()
2338 };
2339 entry_flows.retain(|id| !id.is_empty());
2340 Ok(Self {
2341 pack_id: manifest.pack_id,
2342 version: manifest.version,
2343 entry_flows,
2344 secret_requirements: manifest.secret_requirements,
2345 })
2346 }
2347
2348 pub fn fallback(path: &Path) -> Self {
2349 let pack_id = path
2350 .file_stem()
2351 .map(|s| s.to_string_lossy().into_owned())
2352 .unwrap_or_else(|| "unknown-pack".to_string());
2353 Self {
2354 pack_id,
2355 version: "0.0.0".to_string(),
2356 entry_flows: Vec::new(),
2357 secret_requirements: Vec::new(),
2358 }
2359 }
2360
2361 pub fn from_manifest(manifest: &greentic_types::PackManifest) -> Self {
2362 let entry_flows = manifest
2363 .flows
2364 .iter()
2365 .map(|flow| flow.id.as_str().to_string())
2366 .collect::<Vec<_>>();
2367 Self {
2368 pack_id: manifest.pack_id.as_str().to_string(),
2369 version: manifest.version.to_string(),
2370 entry_flows,
2371 secret_requirements: manifest.secret_requirements.clone(),
2372 }
2373 }
2374}