1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::component_api::{
10 self, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
11};
12use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
13use crate::provider::{ProviderBinding, ProviderRegistry};
14use crate::provider_core::SchemaCorePre as ProviderComponentPre;
15use crate::provider_core_only;
16use crate::runtime_wasmtime::{Component, Engine, InstancePre, Linker, ResourceTable};
17use anyhow::{Context, Result, anyhow, bail};
18use greentic_distributor_client::dist::{DistClient, DistError, DistOptions};
19use greentic_interfaces_wasmtime::host_helpers::v1::{
20 self as host_v1, HostFns, add_all_v1_to_linker,
21 runner_host_http::RunnerHostHttp,
22 runner_host_kv::RunnerHostKv,
23 secrets_store::{SecretsError, SecretsStoreHost},
24 state_store::{
25 OpAck as StateOpAck, StateKey as HostStateKey, StateStoreError as StateError,
26 StateStoreHost, TenantCtx as StateTenantCtx,
27 },
28 telemetry_logger::{
29 OpAck as TelemetryAck, SpanContext as TelemetrySpanContext,
30 TelemetryLoggerError as TelemetryError, TelemetryLoggerHost,
31 TenantCtx as TelemetryTenantCtx,
32 },
33};
34use greentic_interfaces_wasmtime::http_client_client_v1_0::greentic::interfaces_types::types::Impersonation as ImpersonationV1_0;
35use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::interfaces_types::types::Impersonation as ImpersonationV1_1;
36use greentic_pack::builder as legacy_pack;
37use greentic_types::flow::FlowHasher;
38use greentic_types::{
39 ArtifactLocationV1, ComponentId, ComponentManifest, ComponentSourceRef, ComponentSourcesV1,
40 EXT_COMPONENT_SOURCES_V1, EnvId, ExtensionRef, Flow, FlowComponentRef, FlowId, FlowKind,
41 FlowMetadata, InputMapping, Node, NodeId, OutputMapping, Routing, StateKey as StoreStateKey,
42 TeamId, TelemetryHints, TenantCtx as TypesTenantCtx, TenantId, UserId, decode_pack_manifest,
43 pack_manifest::ExtensionInline,
44};
45use host_v1::http_client::{
46 HttpClientError, HttpClientErrorV1_1, HttpClientHost, HttpClientHostV1_1,
47 Request as HttpRequest, RequestOptionsV1_1 as HttpRequestOptionsV1_1,
48 RequestV1_1 as HttpRequestV1_1, Response as HttpResponse, ResponseV1_1 as HttpResponseV1_1,
49 TenantCtx as HttpTenantCtx, TenantCtxV1_1 as HttpTenantCtxV1_1,
50};
51use indexmap::IndexMap;
52use once_cell::sync::Lazy;
53use parking_lot::{Mutex, RwLock};
54use reqwest::blocking::Client as BlockingClient;
55use runner_core::normalize_under_root;
56use serde::{Deserialize, Serialize};
57use serde_cbor;
58use serde_json::{self, Value};
59use sha2::Digest;
60use tokio::fs;
61use wasmparser::{Parser, Payload};
62use wasmtime::StoreContextMut;
63use zip::ZipArchive;
64
65use crate::runner::engine::{FlowContext, FlowEngine, FlowStatus};
66use crate::runner::flow_adapter::{FlowIR, flow_doc_to_ir, flow_ir_to_flow};
67use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
68
69use crate::config::HostConfig;
70use crate::secrets::{DynSecretsManager, read_secret_blocking};
71use crate::storage::state::STATE_PREFIX;
72use crate::storage::{DynSessionStore, DynStateStore};
73use crate::verify;
74use crate::wasi::RunnerWasiPolicy;
75use tracing::warn;
76use wasmtime_wasi::p2::add_to_linker_sync as add_wasi_to_linker;
77use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
78
79use greentic_flow::model::FlowDoc;
80
/// Runtime state for one loaded pack: where it came from, its decoded
/// manifest and flows, its compiled components, and the host services handed
/// to components while they run.
#[allow(dead_code)]
pub struct PackRuntime {
    /// Path of the pack this runtime was created for.
    path: PathBuf,
    /// Path of the pack archive, when one is known.
    archive_path: Option<PathBuf>,
    /// Shared host configuration (policies, tenant, feature switches).
    config: Arc<HostConfig>,
    /// Engine the pack's components are compiled against.
    engine: Engine,
    /// Pack-level metadata (id, version, entry flows, ...).
    metadata: PackMetadata,
    /// Decoded manifest in the current format, if one was loaded.
    manifest: Option<greentic_types::PackManifest>,
    /// Decoded manifest in the legacy format, if the pack uses it.
    legacy_manifest: Option<Box<legacy_pack::PackManifest>>,
    /// Component manifests, looked up by component reference.
    component_manifests: HashMap<String, ComponentManifest>,
    /// Optional mock layer.
    mocks: Option<Arc<MockLayer>>,
    /// Flows decoded from the pack, if any.
    flows: Option<PackFlows>,
    /// Compiled components keyed by name.
    components: HashMap<String, PackComponent>,
    /// Shared blocking HTTP client.
    http_client: Arc<BlockingClient>,
    /// Mutex-guarded cache of `InstancePre` values keyed by string.
    // NOTE(review): the key is presumably the component reference — confirm.
    pre_cache: Mutex<HashMap<String, InstancePre<ComponentState>>>,
    /// Optional session store.
    session_store: Option<DynSessionStore>,
    /// Optional state store; `allows_state_store` returns `false` without it.
    state_store: Option<DynStateStore>,
    /// WASI policy shared with every component instance.
    wasi_policy: Arc<RunnerWasiPolicy>,
    /// Optional provider registry behind a read/write lock.
    provider_registry: RwLock<Option<ProviderRegistry>>,
    /// Secrets manager used for secret lookups.
    secrets: DynSecretsManager,
    /// Optional OAuth broker configuration.
    oauth_config: Option<OAuthBrokerConfig>,
}
105
/// A compiled component together with the name and version it was loaded under.
struct PackComponent {
    /// Name the component is registered under.
    #[allow(dead_code)]
    name: String,
    /// Version string recorded for the component.
    #[allow(dead_code)]
    version: String,
    /// The compiled component itself.
    component: Component,
}
113
/// Options that control where `PackRuntime::load` looks for pack contents and
/// component binaries.
#[derive(Debug, Default, Clone)]
pub struct ComponentResolution {
    /// Directory holding an already materialized pack. When unset, `load`
    /// falls back to the pack path itself if that path is a directory.
    pub materialized_root: Option<PathBuf>,
    /// Per-component path overrides; consulted by `load` when non-empty.
    pub overrides: HashMap<String, PathBuf>,
    /// NOTE(review): presumably disables network access when resolving
    /// components through the distributor client — confirm against the loader.
    pub dist_offline: bool,
    /// NOTE(review): presumably the cache directory used by the distributor
    /// client — confirm against the loader.
    pub dist_cache_dir: Option<PathBuf>,
}
125
126fn build_blocking_client() -> BlockingClient {
127 std::thread::spawn(|| {
128 BlockingClient::builder()
129 .no_proxy()
130 .build()
131 .expect("blocking client")
132 })
133 .join()
134 .expect("client build thread panicked")
135}
136
137fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
138 let (root, candidate) = if path.is_absolute() {
139 let parent = path
140 .parent()
141 .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
142 let root = parent
143 .canonicalize()
144 .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
145 let file = path
146 .file_name()
147 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
148 (root, PathBuf::from(file))
149 } else {
150 let cwd = std::env::current_dir().context("failed to resolve current directory")?;
151 let base = if let Some(parent) = path.parent() {
152 cwd.join(parent)
153 } else {
154 cwd
155 };
156 let root = base
157 .canonicalize()
158 .with_context(|| format!("failed to canonicalize {}", base.display()))?;
159 let file = path
160 .file_name()
161 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
162 (root, PathBuf::from(file))
163 };
164 let safe = normalize_under_root(&root, &candidate)?;
165 Ok((root, safe))
166}
167
/// Process-wide blocking HTTP client, built lazily on first access via
/// `build_blocking_client` and shared through an `Arc`.
static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
169
/// Serializable summary of a flow contained in a pack.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowDescriptor {
    /// Flow identifier.
    pub id: String,
    /// Flow kind; serialized under the key `type`.
    #[serde(rename = "type")]
    pub flow_type: String,
    /// Profile name; the legacy loaders fill this with the pack id.
    pub profile: String,
    /// Version string; the legacy loaders fill this with the pack version.
    pub version: String,
    /// Optional human-readable description; defaults to `None` when absent.
    #[serde(default)]
    pub description: Option<String>,
}
180
/// Host-side state backing the interfaces exposed to a component (HTTP,
/// secrets, state store, telemetry, OAuth).
pub struct HostState {
    /// Shared host configuration (policies, tenant, feature switches).
    config: Arc<HostConfig>,
    /// Blocking HTTP client used for outbound requests.
    http_client: Arc<BlockingClient>,
    /// Environment name taken from `GREENTIC_ENV` at construction time,
    /// `"local"` when the variable is not set.
    default_env: String,
    /// Optional session store.
    #[allow(dead_code)]
    session_store: Option<DynSessionStore>,
    /// Optional state store; state operations fail with `unavailable` without it.
    state_store: Option<DynStateStore>,
    /// Optional mock layer consulted for secrets, HTTP and telemetry.
    mocks: Option<Arc<MockLayer>>,
    /// Secrets manager used when no mock supplies a value.
    secrets: DynSecretsManager,
    /// Optional OAuth broker configuration.
    oauth_config: Option<OAuthBrokerConfig>,
    /// OAuth broker host state, default-constructed in `new`.
    oauth_host: OAuthBrokerHost,
    /// Execution context of the current invocation, if any; supplies tenant,
    /// team, user, flow, node and trace defaults for state-store calls.
    exec_ctx: Option<ComponentExecCtx>,
}
194
195impl HostState {
196 #[allow(clippy::default_constructed_unit_structs)]
197 #[allow(clippy::too_many_arguments)]
198 pub fn new(
199 config: Arc<HostConfig>,
200 http_client: Arc<BlockingClient>,
201 mocks: Option<Arc<MockLayer>>,
202 session_store: Option<DynSessionStore>,
203 state_store: Option<DynStateStore>,
204 secrets: DynSecretsManager,
205 oauth_config: Option<OAuthBrokerConfig>,
206 exec_ctx: Option<ComponentExecCtx>,
207 ) -> Result<Self> {
208 let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
209 Ok(Self {
210 config,
211 http_client,
212 default_env,
213 session_store,
214 state_store,
215 mocks,
216 secrets,
217 oauth_config,
218 oauth_host: OAuthBrokerHost::default(),
219 exec_ctx,
220 })
221 }
222
223 pub fn get_secret(&self, key: &str) -> Result<String> {
224 if provider_core_only::is_enabled() {
225 bail!(provider_core_only::blocked_message("secrets"))
226 }
227 if !self.config.secrets_policy.is_allowed(key) {
228 bail!("secret {key} is not permitted by bindings policy");
229 }
230 if let Some(mock) = &self.mocks
231 && let Some(value) = mock.secrets_lookup(key)
232 {
233 return Ok(value);
234 }
235 let bytes = read_secret_blocking(&self.secrets, key)
236 .context("failed to read secret from manager")?;
237 let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
238 Ok(value)
239 }
240
241 fn tenant_ctx_from_v1(&self, ctx: Option<StateTenantCtx>) -> Result<TypesTenantCtx> {
242 let tenant_raw = ctx
243 .as_ref()
244 .map(|ctx| ctx.tenant.clone())
245 .or_else(|| self.exec_ctx.as_ref().map(|ctx| ctx.tenant.tenant.clone()))
246 .unwrap_or_else(|| self.config.tenant.clone());
247 let env_raw = ctx
248 .as_ref()
249 .map(|ctx| ctx.env.clone())
250 .unwrap_or_else(|| self.default_env.clone());
251 let tenant_id = TenantId::from_str(&tenant_raw)
252 .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
253 let env_id = EnvId::from_str(&env_raw)
254 .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
255 let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
256 if let Some(exec_ctx) = self.exec_ctx.as_ref() {
257 if let Some(team) = exec_ctx.tenant.team.as_ref() {
258 let team_id =
259 TeamId::from_str(team).with_context(|| format!("invalid team id `{team}`"))?;
260 tenant_ctx = tenant_ctx.with_team(Some(team_id));
261 }
262 if let Some(user) = exec_ctx.tenant.user.as_ref() {
263 let user_id =
264 UserId::from_str(user).with_context(|| format!("invalid user id `{user}`"))?;
265 tenant_ctx = tenant_ctx.with_user(Some(user_id));
266 }
267 tenant_ctx = tenant_ctx.with_flow(exec_ctx.flow_id.clone());
268 if let Some(node) = exec_ctx.node_id.as_ref() {
269 tenant_ctx = tenant_ctx.with_node(node.clone());
270 }
271 if let Some(session) = exec_ctx.tenant.correlation_id.as_ref() {
272 tenant_ctx = tenant_ctx.with_session(session.clone());
273 }
274 tenant_ctx.trace_id = exec_ctx.tenant.trace_id.clone();
275 }
276
277 if let Some(ctx) = ctx {
278 if let Some(team) = ctx.team.or(ctx.team_id) {
279 let team_id =
280 TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
281 tenant_ctx = tenant_ctx.with_team(Some(team_id));
282 }
283 if let Some(user) = ctx.user.or(ctx.user_id) {
284 let user_id =
285 UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
286 tenant_ctx = tenant_ctx.with_user(Some(user_id));
287 }
288 if let Some(flow) = ctx.flow_id {
289 tenant_ctx = tenant_ctx.with_flow(flow);
290 }
291 if let Some(node) = ctx.node_id {
292 tenant_ctx = tenant_ctx.with_node(node);
293 }
294 if let Some(provider) = ctx.provider_id {
295 tenant_ctx = tenant_ctx.with_provider(provider);
296 }
297 if let Some(session) = ctx.session_id {
298 tenant_ctx = tenant_ctx.with_session(session);
299 }
300 tenant_ctx.trace_id = ctx.trace_id;
301 }
302 Ok(tenant_ctx)
303 }
304
    /// Sends an HTTP request on behalf of a component.
    ///
    /// Steps, in order:
    /// 1. reject the call when `config.http_enabled` is false;
    /// 2. consult the mock layer, which may answer, deny, or let the request
    ///    through (optionally asking for the real response to be recorded);
    /// 3. perform the request with the shared blocking client;
    /// 4. record the response with the mock layer when requested.
    ///
    /// `opts` may carry a timeout; `allow_insecure` and
    /// `follow_redirects = false` are only logged as unsupported. The tenant
    /// context argument is currently unused.
    ///
    /// # Errors
    /// Returns code `denied` when HTTP is disabled or the mock layer denies
    /// the request, and `unavailable` when sending fails.
    fn send_http_request(
        &mut self,
        req: HttpRequest,
        opts: Option<HttpRequestOptionsV1_1>,
        _ctx: Option<HttpTenantCtx>,
    ) -> Result<HttpResponse, HttpClientError> {
        if !self.config.http_enabled {
            return Err(HttpClientError {
                code: "denied".into(),
                message: "http client disabled by policy".into(),
            });
        }

        // Holds (request metadata, record flag) when the mock layer lets the
        // request pass through.
        let mut mock_state = None;
        let raw_body = req.body.clone();
        // The mock layer is skipped when the request metadata cannot be built.
        if let Some(mock) = &self.mocks
            && let Ok(meta) = HttpMockRequest::new(&req.method, &req.url, raw_body.as_deref())
        {
            match mock.http_begin(&meta) {
                HttpDecision::Mock(response) => {
                    let headers = response
                        .headers
                        .iter()
                        .map(|(k, v)| (k.clone(), v.clone()))
                        .collect();
                    return Ok(HttpResponse {
                        status: response.status,
                        headers,
                        body: response.body.clone().map(|b| b.into_bytes()),
                    });
                }
                HttpDecision::Deny(reason) => {
                    return Err(HttpClientError {
                        code: "denied".into(),
                        message: reason,
                    });
                }
                HttpDecision::Passthrough { record } => {
                    mock_state = Some((meta, record));
                }
            }
        }

        // A method string that fails to parse falls back to GET.
        let method = req.method.parse().unwrap_or(reqwest::Method::GET);
        let mut builder = self.http_client.request(method, &req.url);
        // Headers whose name or value is not valid are silently dropped.
        for (key, value) in req.headers {
            if let Ok(header) = reqwest::header::HeaderName::from_bytes(key.as_bytes())
                && let Ok(header_value) = reqwest::header::HeaderValue::from_str(&value)
            {
                builder = builder.header(header, header_value);
            }
        }

        if let Some(body) = raw_body.clone() {
            builder = builder.body(body);
        }

        if let Some(opts) = opts {
            if let Some(timeout_ms) = opts.timeout_ms {
                builder = builder.timeout(Duration::from_millis(timeout_ms as u64));
            }
            // The two options below are accepted but not honoured; a warning
            // is logged and the default client behaviour applies.
            if opts.allow_insecure == Some(true) {
                warn!(url = %req.url, "allow-insecure not supported; using default TLS validation");
            }
            if let Some(follow_redirects) = opts.follow_redirects
                && !follow_redirects
            {
                warn!(url = %req.url, "follow-redirects=false not supported; using default client behaviour");
            }
        }

        let response = match builder.send() {
            Ok(resp) => resp,
            Err(err) => {
                warn!(url = %req.url, error = %err, "http client request failed");
                return Err(HttpClientError {
                    code: "unavailable".into(),
                    message: err.to_string(),
                });
            }
        };

        let status = response.status().as_u16();
        // Header values that are not representable as text become empty strings.
        let headers_vec = response
            .headers()
            .iter()
            .map(|(k, v)| {
                (
                    k.as_str().to_string(),
                    v.to_str().unwrap_or_default().to_string(),
                )
            })
            .collect::<Vec<_>>();
        // A failure while reading the body yields `None` rather than an error.
        let body_bytes = response.bytes().ok().map(|b| b.to_vec());

        // Record the real response only when the mock layer asked for it.
        if let Some((meta, true)) = mock_state.take()
            && let Some(mock) = &self.mocks
        {
            let recorded = HttpMockResponse::new(
                status,
                headers_vec.clone().into_iter().collect(),
                body_bytes
                    .as_ref()
                    .map(|b| String::from_utf8_lossy(b).into_owned()),
            );
            mock.http_record(&meta, &recorded);
        }

        Ok(HttpResponse {
            status,
            headers: headers_vec,
            body: body_bytes,
        })
    }
419}
420
421impl SecretsStoreHost for HostState {
422 fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsError> {
423 if provider_core_only::is_enabled() {
424 warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
425 return Err(SecretsError::Denied);
426 }
427 if !self.config.secrets_policy.is_allowed(&key) {
428 return Err(SecretsError::Denied);
429 }
430 if let Some(mock) = &self.mocks
431 && let Some(value) = mock.secrets_lookup(&key)
432 {
433 return Ok(Some(value.into_bytes()));
434 }
435 match read_secret_blocking(&self.secrets, &key) {
436 Ok(bytes) => Ok(Some(bytes)),
437 Err(err) => {
438 warn!(secret = %key, error = %err, "secret lookup failed");
439 Err(SecretsError::NotFound)
440 }
441 }
442 }
443}
444
/// HTTP client interface (v1.0) exposed to components.
impl HttpClientHost for HostState {
    /// Forwards the request to `send_http_request` without request options.
    fn send(
        &mut self,
        req: HttpRequest,
        ctx: Option<HttpTenantCtx>,
    ) -> Result<HttpResponse, HttpClientError> {
        self.send_http_request(req, None, ctx)
    }
}
454
455impl HttpClientHostV1_1 for HostState {
456 fn send(
457 &mut self,
458 req: HttpRequestV1_1,
459 opts: Option<HttpRequestOptionsV1_1>,
460 ctx: Option<HttpTenantCtxV1_1>,
461 ) -> Result<HttpResponseV1_1, HttpClientErrorV1_1> {
462 let legacy_req = HttpRequest {
463 method: req.method,
464 url: req.url,
465 headers: req.headers,
466 body: req.body,
467 };
468 let legacy_ctx = ctx.map(|ctx| HttpTenantCtx {
469 env: ctx.env,
470 tenant: ctx.tenant,
471 tenant_id: ctx.tenant_id,
472 team: ctx.team,
473 team_id: ctx.team_id,
474 user: ctx.user,
475 user_id: ctx.user_id,
476 trace_id: ctx.trace_id,
477 correlation_id: ctx.correlation_id,
478 attributes: ctx.attributes,
479 session_id: ctx.session_id,
480 flow_id: ctx.flow_id,
481 node_id: ctx.node_id,
482 provider_id: ctx.provider_id,
483 deadline_ms: ctx.deadline_ms,
484 attempt: ctx.attempt,
485 idempotency_key: ctx.idempotency_key,
486 impersonation: ctx
487 .impersonation
488 .map(|ImpersonationV1_1 { actor_id, reason }| ImpersonationV1_0 {
489 actor_id,
490 reason,
491 }),
492 });
493
494 self.send_http_request(legacy_req, opts, legacy_ctx)
495 .map(|resp| HttpResponseV1_1 {
496 status: resp.status,
497 headers: resp.headers,
498 body: resp.body,
499 })
500 .map_err(|err| HttpClientErrorV1_1 {
501 code: err.code,
502 message: err.message,
503 })
504 }
505}
506
507impl StateStoreHost for HostState {
508 fn read(
509 &mut self,
510 key: HostStateKey,
511 ctx: Option<StateTenantCtx>,
512 ) -> Result<Vec<u8>, StateError> {
513 let store = match self.state_store.as_ref() {
514 Some(store) => store.clone(),
515 None => {
516 return Err(StateError {
517 code: "unavailable".into(),
518 message: "state store not configured".into(),
519 });
520 }
521 };
522 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
523 Ok(ctx) => ctx,
524 Err(err) => {
525 return Err(StateError {
526 code: "invalid-ctx".into(),
527 message: err.to_string(),
528 });
529 }
530 };
531 let key = StoreStateKey::from(key);
532 match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
533 Ok(Some(value)) => Ok(serde_json::to_vec(&value).unwrap_or_else(|_| Vec::new())),
534 Ok(None) => Err(StateError {
535 code: "not_found".into(),
536 message: "state key not found".into(),
537 }),
538 Err(err) => Err(StateError {
539 code: "internal".into(),
540 message: err.to_string(),
541 }),
542 }
543 }
544
545 fn write(
546 &mut self,
547 key: HostStateKey,
548 bytes: Vec<u8>,
549 ctx: Option<StateTenantCtx>,
550 ) -> Result<StateOpAck, StateError> {
551 let store = match self.state_store.as_ref() {
552 Some(store) => store.clone(),
553 None => {
554 return Err(StateError {
555 code: "unavailable".into(),
556 message: "state store not configured".into(),
557 });
558 }
559 };
560 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
561 Ok(ctx) => ctx,
562 Err(err) => {
563 return Err(StateError {
564 code: "invalid-ctx".into(),
565 message: err.to_string(),
566 });
567 }
568 };
569 let key = StoreStateKey::from(key);
570 let value = serde_json::from_slice(&bytes)
571 .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()));
572 match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
573 Ok(()) => Ok(StateOpAck::Ok),
574 Err(err) => Err(StateError {
575 code: "internal".into(),
576 message: err.to_string(),
577 }),
578 }
579 }
580
581 fn delete(
582 &mut self,
583 key: HostStateKey,
584 ctx: Option<StateTenantCtx>,
585 ) -> Result<StateOpAck, StateError> {
586 let store = match self.state_store.as_ref() {
587 Some(store) => store.clone(),
588 None => {
589 return Err(StateError {
590 code: "unavailable".into(),
591 message: "state store not configured".into(),
592 });
593 }
594 };
595 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
596 Ok(ctx) => ctx,
597 Err(err) => {
598 return Err(StateError {
599 code: "invalid-ctx".into(),
600 message: err.to_string(),
601 });
602 }
603 };
604 let key = StoreStateKey::from(key);
605 match store.del(&tenant_ctx, STATE_PREFIX, &key) {
606 Ok(_) => Ok(StateOpAck::Ok),
607 Err(err) => Err(StateError {
608 code: "internal".into(),
609 message: err.to_string(),
610 }),
611 }
612 }
613}
614
615impl TelemetryLoggerHost for HostState {
616 fn log(
617 &mut self,
618 span: TelemetrySpanContext,
619 fields: Vec<(String, String)>,
620 _ctx: Option<TelemetryTenantCtx>,
621 ) -> Result<TelemetryAck, TelemetryError> {
622 if let Some(mock) = &self.mocks
623 && mock.telemetry_drain(&[("span_json", span.flow_id.as_str())])
624 {
625 return Ok(TelemetryAck::Ok);
626 }
627 let mut map = serde_json::Map::new();
628 for (k, v) in fields {
629 map.insert(k, Value::String(v));
630 }
631 tracing::info!(
632 tenant = %span.tenant,
633 flow_id = %span.flow_id,
634 node = ?span.node_id,
635 provider = %span.provider,
636 fields = %serde_json::Value::Object(map.clone()),
637 "telemetry log from pack"
638 );
639 Ok(TelemetryAck::Ok)
640 }
641}
642
643impl RunnerHostHttp for HostState {
644 fn request(
645 &mut self,
646 method: String,
647 url: String,
648 headers: Vec<String>,
649 body: Option<Vec<u8>>,
650 ) -> Result<Vec<u8>, String> {
651 let req = HttpRequest {
652 method,
653 url,
654 headers: headers
655 .chunks(2)
656 .filter_map(|chunk| {
657 if chunk.len() == 2 {
658 Some((chunk[0].clone(), chunk[1].clone()))
659 } else {
660 None
661 }
662 })
663 .collect(),
664 body,
665 };
666 match HttpClientHost::send(self, req, None) {
667 Ok(resp) => Ok(resp.body.unwrap_or_default()),
668 Err(err) => Err(err.message),
669 }
670 }
671}
672
/// Legacy runner key/value interface exposed to components.
///
/// Both operations are stubs: nothing is stored and nothing is ever found.
impl RunnerHostKv for HostState {
    /// Always returns `None`.
    fn get(&mut self, _ns: String, _key: String) -> Option<String> {
        None
    }

    /// Does nothing.
    fn put(&mut self, _ns: String, _key: String, _val: String) {}
}
680
/// Outcome of decoding `manifest.cbor`, together with the flows of the pack.
enum ManifestLoad {
    /// The manifest decoded with `decode_pack_manifest` (current format).
    New {
        manifest: Box<greentic_types::PackManifest>,
        flows: PackFlows,
    },
    /// The manifest decoded only as a legacy `PackManifest`.
    Legacy {
        manifest: Box<legacy_pack::PackManifest>,
        flows: PackFlows,
    },
}
691
692fn load_manifest_and_flows(path: &Path) -> Result<ManifestLoad> {
693 let mut archive = ZipArchive::new(File::open(path)?)
694 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
695 let bytes = read_entry(&mut archive, "manifest.cbor")
696 .with_context(|| format!("missing manifest.cbor in {}", path.display()))?;
697 match decode_pack_manifest(&bytes) {
698 Ok(manifest) => {
699 let cache = PackFlows::from_manifest(manifest.clone());
700 Ok(ManifestLoad::New {
701 manifest: Box::new(manifest),
702 flows: cache,
703 })
704 }
705 Err(err) => {
706 tracing::debug!(error = %err, pack = %path.display(), "decode_pack_manifest failed; trying legacy manifest");
707 let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
709 .context("failed to decode legacy pack manifest from manifest.cbor")?;
710 let flows = load_legacy_flows(&mut archive, &legacy)?;
711 Ok(ManifestLoad::Legacy {
712 manifest: Box::new(legacy),
713 flows,
714 })
715 }
716 }
717}
718
719fn load_manifest_and_flows_from_dir(root: &Path) -> Result<ManifestLoad> {
720 let manifest_path = root.join("manifest.cbor");
721 let bytes = std::fs::read(&manifest_path)
722 .with_context(|| format!("missing manifest.cbor in {}", root.display()))?;
723 match decode_pack_manifest(&bytes) {
724 Ok(manifest) => {
725 let cache = PackFlows::from_manifest(manifest.clone());
726 Ok(ManifestLoad::New {
727 manifest: Box::new(manifest),
728 flows: cache,
729 })
730 }
731 Err(err) => {
732 tracing::debug!(
733 error = %err,
734 pack = %root.display(),
735 "decode_pack_manifest failed for materialized pack; trying legacy manifest"
736 );
737 let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
738 .context("failed to decode legacy pack manifest from manifest.cbor")?;
739 let flows = load_legacy_flows_from_dir(root, &legacy)?;
740 Ok(ManifestLoad::Legacy {
741 manifest: Box::new(legacy),
742 flows,
743 })
744 }
745 }
746}
747
748fn load_legacy_flows(
749 archive: &mut ZipArchive<File>,
750 manifest: &legacy_pack::PackManifest,
751) -> Result<PackFlows> {
752 let mut flows = HashMap::new();
753 let mut descriptors = Vec::new();
754
755 for entry in &manifest.flows {
756 let bytes = read_entry(archive, &entry.file_json)
757 .with_context(|| format!("missing flow json {}", entry.file_json))?;
758 let doc: FlowDoc = serde_json::from_slice(&bytes)
759 .with_context(|| format!("failed to decode flow doc {}", entry.file_json))?;
760 let normalized = normalize_flow_doc(doc);
761 let flow_ir = flow_doc_to_ir(normalized)?;
762 let flow = flow_ir_to_flow(flow_ir)?;
763
764 descriptors.push(FlowDescriptor {
765 id: entry.id.clone(),
766 flow_type: entry.kind.clone(),
767 profile: manifest.meta.pack_id.clone(),
768 version: manifest.meta.version.to_string(),
769 description: None,
770 });
771 flows.insert(entry.id.clone(), flow);
772 }
773
774 let mut entry_flows = manifest.meta.entry_flows.clone();
775 if entry_flows.is_empty() {
776 entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
777 }
778 let metadata = PackMetadata {
779 pack_id: manifest.meta.pack_id.clone(),
780 version: manifest.meta.version.to_string(),
781 entry_flows,
782 secret_requirements: Vec::new(),
783 };
784
785 Ok(PackFlows {
786 descriptors,
787 flows,
788 metadata,
789 })
790}
791
792fn load_legacy_flows_from_dir(
793 root: &Path,
794 manifest: &legacy_pack::PackManifest,
795) -> Result<PackFlows> {
796 let mut flows = HashMap::new();
797 let mut descriptors = Vec::new();
798
799 for entry in &manifest.flows {
800 let path = root.join(&entry.file_json);
801 let bytes = std::fs::read(&path)
802 .with_context(|| format!("missing flow json {}", path.display()))?;
803 let doc: FlowDoc = serde_json::from_slice(&bytes)
804 .with_context(|| format!("failed to decode flow doc {}", path.display()))?;
805 let normalized = normalize_flow_doc(doc);
806 let flow_ir = flow_doc_to_ir(normalized)?;
807 let flow = flow_ir_to_flow(flow_ir)?;
808
809 descriptors.push(FlowDescriptor {
810 id: entry.id.clone(),
811 flow_type: entry.kind.clone(),
812 profile: manifest.meta.pack_id.clone(),
813 version: manifest.meta.version.to_string(),
814 description: None,
815 });
816 flows.insert(entry.id.clone(), flow);
817 }
818
819 let mut entry_flows = manifest.meta.entry_flows.clone();
820 if entry_flows.is_empty() {
821 entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
822 }
823 let metadata = PackMetadata {
824 pack_id: manifest.meta.pack_id.clone(),
825 version: manifest.meta.version.to_string(),
826 entry_flows,
827 secret_requirements: Vec::new(),
828 };
829
830 Ok(PackFlows {
831 descriptors,
832 flows,
833 metadata,
834 })
835}
836
/// Per-instance store data for a component: the host services plus the WASI
/// context and resource table returned by the `WasiView` implementation.
pub struct ComponentState {
    /// Host services reachable from the component.
    pub host: HostState,
    /// WASI context created from the runner's WASI policy.
    wasi_ctx: WasiCtx,
    /// Resource table handed out alongside the WASI context.
    resource_table: ResourceTable,
}
842
843impl ComponentState {
844 pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
845 let wasi_ctx = policy
846 .instantiate()
847 .context("failed to build WASI context")?;
848 Ok(Self {
849 host,
850 wasi_ctx,
851 resource_table: ResourceTable::new(),
852 })
853 }
854
855 fn host_mut(&mut self) -> &mut HostState {
856 &mut self.host
857 }
858
859 fn should_cancel_host(&mut self) -> bool {
860 false
861 }
862
863 fn yield_now_host(&mut self) {
864 }
866}
867
/// Control interface, component API v0.4; delegates to the shared helpers on
/// `ComponentState`.
impl component_api::v0_4::greentic::component::control::Host for ComponentState {
    /// Delegates to `should_cancel_host`.
    fn should_cancel(&mut self) -> bool {
        self.should_cancel_host()
    }

    /// Delegates to `yield_now_host`.
    fn yield_now(&mut self) {
        self.yield_now_host();
    }
}
877
/// Control interface, component API v0.5; delegates to the shared helpers on
/// `ComponentState`.
impl component_api::v0_5::greentic::component::control::Host for ComponentState {
    /// Delegates to `should_cancel_host`.
    fn should_cancel(&mut self) -> bool {
        self.should_cancel_host()
    }

    /// Delegates to `yield_now_host`.
    fn yield_now(&mut self) {
        self.yield_now_host();
    }
}
887
888fn add_component_control_instance(
889 linker: &mut Linker<ComponentState>,
890 name: &str,
891) -> wasmtime::Result<()> {
892 let mut inst = linker.instance(name)?;
893 inst.func_wrap(
894 "should-cancel",
895 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
896 let host = caller.data_mut();
897 Ok((host.should_cancel_host(),))
898 },
899 )?;
900 inst.func_wrap(
901 "yield-now",
902 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
903 let host = caller.data_mut();
904 host.yield_now_host();
905 Ok(())
906 },
907 )?;
908 Ok(())
909}
910
911fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
912 add_component_control_instance(linker, "greentic:component/control@0.5.0")?;
913 add_component_control_instance(linker, "greentic:component/control@0.4.0")?;
914 Ok(())
915}
916
/// Registers WASI and the v1 host interfaces on `linker`.
///
/// Every interface is served by the `HostState` inside `ComponentState`.
/// The state store is wired only when `allow_state_store` is true, and the
/// OAuth broker is left unset here.
///
/// # Errors
/// Propagates any error from either registration call.
pub fn register_all(linker: &mut Linker<ComponentState>, allow_state_store: bool) -> Result<()> {
    add_wasi_to_linker(linker)?;
    add_all_v1_to_linker(
        linker,
        HostFns {
            http_client_v1_1: Some(|state| state.host_mut()),
            http_client: Some(|state| state.host_mut()),
            oauth_broker: None,
            runner_host_http: Some(|state| state.host_mut()),
            runner_host_kv: Some(|state| state.host_mut()),
            telemetry_logger: Some(|state| state.host_mut()),
            // `then_some` yields `None` when state-store access is not allowed.
            state_store: allow_state_store.then_some(|state| state.host_mut()),
            secrets_store: Some(|state| state.host_mut()),
        },
    )?;
    Ok(())
}
934
/// Exposes the OAuth-related parts of the host state to the OAuth broker.
impl OAuthHostContext for ComponentState {
    /// Tenant taken from the host configuration.
    fn tenant_id(&self) -> &str {
        &self.host.config.tenant
    }

    /// Default environment of the host state.
    fn env(&self) -> &str {
        &self.host.default_env
    }

    /// Mutable access to the OAuth broker host state.
    fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
        &mut self.host.oauth_host
    }

    /// OAuth broker configuration, if one was supplied.
    fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
        self.host.oauth_config.as_ref()
    }
}
952
/// Gives the WASI implementation access to this instance's context and
/// resource table.
impl WasiView for ComponentState {
    /// Returns a view borrowing both the WASI context and the resource table.
    fn ctx(&mut self) -> WasiCtxView<'_> {
        WasiCtxView {
            ctx: &mut self.wasi_ctx,
            table: &mut self.resource_table,
        }
    }
}
961
// NOTE(review): `Send` and `Sync` are asserted manually here. The invariant
// that makes this sound is not visible in this part of the file — confirm
// that every field of `ComponentState` (including `HostState`) is safe to
// move and share across threads, and record that as a `SAFETY:` comment.
#[allow(unsafe_code)]
unsafe impl Send for ComponentState {}
#[allow(unsafe_code)]
unsafe impl Sync for ComponentState {}
966
967impl PackRuntime {
968 fn allows_state_store(&self, component_ref: &str) -> bool {
969 if self.state_store.is_none() {
970 return false;
971 }
972 if !self.config.state_store_policy.allow {
973 return false;
974 }
975 let Some(manifest) = self.component_manifests.get(component_ref) else {
976 return false;
977 };
978 manifest
979 .capabilities
980 .host
981 .state
982 .as_ref()
983 .map(|caps| caps.read || caps.write)
984 .unwrap_or(false)
985 }
986
987 #[allow(clippy::too_many_arguments)]
988 pub async fn load(
989 path: impl AsRef<Path>,
990 config: Arc<HostConfig>,
991 mocks: Option<Arc<MockLayer>>,
992 archive_source: Option<&Path>,
993 session_store: Option<DynSessionStore>,
994 state_store: Option<DynStateStore>,
995 wasi_policy: Arc<RunnerWasiPolicy>,
996 secrets: DynSecretsManager,
997 oauth_config: Option<OAuthBrokerConfig>,
998 verify_archive: bool,
999 component_resolution: ComponentResolution,
1000 ) -> Result<Self> {
1001 let path = path.as_ref();
1002 let (_pack_root, safe_path) = normalize_pack_path(path)?;
1003 let path_meta = std::fs::metadata(&safe_path).ok();
1004 let is_dir = path_meta
1005 .as_ref()
1006 .map(|meta| meta.is_dir())
1007 .unwrap_or(false);
1008 let is_component = !is_dir
1009 && safe_path
1010 .extension()
1011 .and_then(|ext| ext.to_str())
1012 .map(|ext| ext.eq_ignore_ascii_case("wasm"))
1013 .unwrap_or(false);
1014 let archive_hint_path = if let Some(source) = archive_source {
1015 let (_, normalized) = normalize_pack_path(source)?;
1016 Some(normalized)
1017 } else if is_component || is_dir {
1018 None
1019 } else {
1020 Some(safe_path.clone())
1021 };
1022 let archive_hint = archive_hint_path.as_deref();
1023 if verify_archive {
1024 if let Some(verify_target) = archive_hint.and_then(|p| {
1025 std::fs::metadata(p)
1026 .ok()
1027 .filter(|meta| meta.is_file())
1028 .map(|_| p)
1029 }) {
1030 verify::verify_pack(verify_target).await?;
1031 tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
1032 } else {
1033 tracing::debug!("skipping archive verification (no archive source)");
1034 }
1035 }
1036 let engine = Engine::default();
1037 let mut metadata = PackMetadata::fallback(&safe_path);
1038 let mut manifest = None;
1039 let mut legacy_manifest: Option<Box<legacy_pack::PackManifest>> = None;
1040 let mut flows = None;
1041 let materialized_root = component_resolution.materialized_root.clone().or_else(|| {
1042 if is_dir {
1043 Some(safe_path.clone())
1044 } else {
1045 None
1046 }
1047 });
1048
1049 if let Some(root) = materialized_root.as_ref() {
1050 match load_manifest_and_flows_from_dir(root) {
1051 Ok(ManifestLoad::New {
1052 manifest: m,
1053 flows: cache,
1054 }) => {
1055 metadata = cache.metadata.clone();
1056 manifest = Some(*m);
1057 flows = Some(cache);
1058 }
1059 Ok(ManifestLoad::Legacy {
1060 manifest: m,
1061 flows: cache,
1062 }) => {
1063 metadata = cache.metadata.clone();
1064 legacy_manifest = Some(m);
1065 flows = Some(cache);
1066 }
1067 Err(err) => {
1068 warn!(error = %err, pack = %root.display(), "failed to parse materialized pack manifest");
1069 }
1070 }
1071 }
1072
1073 if manifest.is_none()
1074 && legacy_manifest.is_none()
1075 && let Some(archive_path) = archive_hint
1076 {
1077 match load_manifest_and_flows(archive_path) {
1078 Ok(ManifestLoad::New {
1079 manifest: m,
1080 flows: cache,
1081 }) => {
1082 metadata = cache.metadata.clone();
1083 manifest = Some(*m);
1084 flows = Some(cache);
1085 }
1086 Ok(ManifestLoad::Legacy {
1087 manifest: m,
1088 flows: cache,
1089 }) => {
1090 metadata = cache.metadata.clone();
1091 legacy_manifest = Some(m);
1092 flows = Some(cache);
1093 }
1094 Err(err) => {
1095 warn!(error = %err, pack = %archive_path.display(), "failed to parse pack manifest; skipping flows");
1096 }
1097 }
1098 }
1099 let component_sources_payload = if let Some(manifest) = manifest.as_ref() {
1100 manifest
1101 .get_component_sources_v1()
1102 .context("invalid component sources extension")?
1103 } else {
1104 None
1105 };
1106 let component_sources = component_sources_table(component_sources_payload.as_ref())?;
1107 let components = if is_component {
1108 let wasm_bytes = fs::read(&safe_path).await?;
1109 metadata = PackMetadata::from_wasm(&wasm_bytes)
1110 .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
1111 let name = safe_path
1112 .file_stem()
1113 .map(|s| s.to_string_lossy().to_string())
1114 .unwrap_or_else(|| "component".to_string());
1115 let component = Component::from_binary(&engine, &wasm_bytes)?;
1116 let mut map = HashMap::new();
1117 map.insert(
1118 name.clone(),
1119 PackComponent {
1120 name,
1121 version: metadata.version.clone(),
1122 component,
1123 },
1124 );
1125 map
1126 } else {
1127 let specs = component_specs(
1128 manifest.as_ref(),
1129 legacy_manifest.as_deref(),
1130 component_sources_payload.as_ref(),
1131 );
1132 if specs.is_empty() {
1133 HashMap::new()
1134 } else {
1135 let mut loaded = HashMap::new();
1136 let mut missing: HashSet<String> =
1137 specs.iter().map(|spec| spec.id.clone()).collect();
1138 let mut searched = Vec::new();
1139
1140 if !component_resolution.overrides.is_empty() {
1141 load_components_from_overrides(
1142 &engine,
1143 &component_resolution.overrides,
1144 &specs,
1145 &mut missing,
1146 &mut loaded,
1147 )?;
1148 searched.push("override map".to_string());
1149 }
1150
1151 if let Some(component_sources) = component_sources.as_ref() {
1152 load_components_from_sources(
1153 &engine,
1154 component_sources,
1155 &component_resolution,
1156 &specs,
1157 &mut missing,
1158 &mut loaded,
1159 materialized_root.as_deref(),
1160 archive_hint,
1161 )
1162 .await?;
1163 searched.push(format!("extension {}", EXT_COMPONENT_SOURCES_V1));
1164 }
1165
1166 if let Some(root) = materialized_root.as_ref() {
1167 load_components_from_dir(&engine, root, &specs, &mut missing, &mut loaded)?;
1168 searched.push(format!("components dir {}", root.display()));
1169 }
1170
1171 if let Some(archive_path) = archive_hint {
1172 load_components_from_archive(
1173 &engine,
1174 archive_path,
1175 &specs,
1176 &mut missing,
1177 &mut loaded,
1178 )?;
1179 searched.push(format!("archive {}", archive_path.display()));
1180 }
1181
1182 if !missing.is_empty() {
1183 let missing_list = missing.into_iter().collect::<Vec<_>>().join(", ");
1184 let sources = if searched.is_empty() {
1185 "no component sources".to_string()
1186 } else {
1187 searched.join(", ")
1188 };
1189 bail!(
1190 "components missing: {}; looked in {}",
1191 missing_list,
1192 sources
1193 );
1194 }
1195
1196 loaded
1197 }
1198 };
1199 let http_client = Arc::clone(&HTTP_CLIENT);
1200 let mut component_manifests = HashMap::new();
1201 if let Some(manifest) = manifest.as_ref() {
1202 for component in &manifest.components {
1203 component_manifests.insert(component.id.as_str().to_string(), component.clone());
1204 }
1205 }
1206 Ok(Self {
1207 path: safe_path,
1208 archive_path: archive_hint.map(Path::to_path_buf),
1209 config,
1210 engine,
1211 metadata,
1212 manifest,
1213 legacy_manifest,
1214 component_manifests,
1215 mocks,
1216 flows,
1217 components,
1218 http_client,
1219 pre_cache: Mutex::new(HashMap::new()),
1220 session_store,
1221 state_store,
1222 wasi_policy,
1223 provider_registry: RwLock::new(None),
1224 secrets,
1225 oauth_config,
1226 })
1227 }
1228
1229 pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
1230 if let Some(cache) = &self.flows {
1231 return Ok(cache.descriptors.clone());
1232 }
1233 if let Some(manifest) = &self.manifest {
1234 let descriptors = manifest
1235 .flows
1236 .iter()
1237 .map(|flow| FlowDescriptor {
1238 id: flow.id.as_str().to_string(),
1239 flow_type: flow_kind_to_str(flow.kind).to_string(),
1240 profile: manifest.pack_id.as_str().to_string(),
1241 version: manifest.version.to_string(),
1242 description: None,
1243 })
1244 .collect();
1245 return Ok(descriptors);
1246 }
1247 Ok(Vec::new())
1248 }
1249
1250 #[allow(dead_code)]
1251 pub async fn run_flow(
1252 &self,
1253 flow_id: &str,
1254 input: serde_json::Value,
1255 ) -> Result<serde_json::Value> {
1256 let pack = Arc::new(
1257 PackRuntime::load(
1258 &self.path,
1259 Arc::clone(&self.config),
1260 self.mocks.clone(),
1261 self.archive_path.as_deref(),
1262 self.session_store.clone(),
1263 self.state_store.clone(),
1264 Arc::clone(&self.wasi_policy),
1265 self.secrets.clone(),
1266 self.oauth_config.clone(),
1267 false,
1268 ComponentResolution::default(),
1269 )
1270 .await?,
1271 );
1272
1273 let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&self.config)).await?;
1274 let retry_config = self.config.retry_config().into();
1275 let mocks = pack.mocks.as_deref();
1276 let tenant = self.config.tenant.as_str();
1277
1278 let ctx = FlowContext {
1279 tenant,
1280 flow_id,
1281 node_id: None,
1282 tool: None,
1283 action: None,
1284 session_id: None,
1285 provider_id: None,
1286 retry_config,
1287 observer: None,
1288 mocks,
1289 };
1290
1291 let execution = engine.execute(ctx, input).await?;
1292 match execution.status {
1293 FlowStatus::Completed => Ok(execution.output),
1294 FlowStatus::Waiting(wait) => Ok(serde_json::json!({
1295 "status": "pending",
1296 "reason": wait.reason,
1297 "resume": wait.snapshot,
1298 "response": execution.output,
1299 })),
1300 }
1301 }
1302
1303 pub async fn invoke_component(
1304 &self,
1305 component_ref: &str,
1306 ctx: ComponentExecCtx,
1307 operation: &str,
1308 _config_json: Option<String>,
1309 input_json: String,
1310 ) -> Result<Value> {
1311 let pack_component = self
1312 .components
1313 .get(component_ref)
1314 .with_context(|| format!("component '{component_ref}' not found in pack"))?;
1315
1316 let mut linker = Linker::new(&self.engine);
1317 let allow_state_store = self.allows_state_store(component_ref);
1318 register_all(&mut linker, allow_state_store)?;
1319 add_component_control_to_linker(&mut linker)?;
1320
1321 let host_state = HostState::new(
1322 Arc::clone(&self.config),
1323 Arc::clone(&self.http_client),
1324 self.mocks.clone(),
1325 self.session_store.clone(),
1326 self.state_store.clone(),
1327 Arc::clone(&self.secrets),
1328 self.oauth_config.clone(),
1329 Some(ctx.clone()),
1330 )?;
1331 let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1332 let mut store = wasmtime::Store::new(&self.engine, store_state);
1333 let pre_instance = linker.instantiate_pre(&pack_component.component)?;
1334 let result = match component_api::v0_5::ComponentPre::new(pre_instance) {
1335 Ok(pre) => {
1336 let bindings = pre.instantiate_async(&mut store).await?;
1337 let node = bindings.greentic_component_node();
1338 let ctx_v05 = component_api::exec_ctx_v0_5(&ctx);
1339 let result = node.call_invoke(&mut store, &ctx_v05, operation, &input_json)?;
1340 component_api::invoke_result_from_v0_5(result)
1341 }
1342 Err(err) => {
1343 if is_missing_node_export(&err, "0.5.0") {
1344 let pre_instance = linker.instantiate_pre(&pack_component.component)?;
1345 let pre: component_api::v0_4::ComponentPre<ComponentState> =
1346 component_api::v0_4::ComponentPre::new(pre_instance)?;
1347 let bindings = pre.instantiate_async(&mut store).await?;
1348 let node = bindings.greentic_component_node();
1349 let ctx_v04 = component_api::exec_ctx_v0_4(&ctx);
1350 let result = node.call_invoke(&mut store, &ctx_v04, operation, &input_json)?;
1351 component_api::invoke_result_from_v0_4(result)
1352 } else {
1353 return Err(err);
1354 }
1355 }
1356 };
1357
1358 match result {
1359 InvokeResult::Ok(body) => {
1360 if body.is_empty() {
1361 return Ok(Value::Null);
1362 }
1363 serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
1364 }
1365 InvokeResult::Err(NodeError {
1366 code,
1367 message,
1368 retryable,
1369 backoff_ms,
1370 details,
1371 }) => {
1372 let mut obj = serde_json::Map::new();
1373 obj.insert("ok".into(), Value::Bool(false));
1374 let mut error = serde_json::Map::new();
1375 error.insert("code".into(), Value::String(code));
1376 error.insert("message".into(), Value::String(message));
1377 error.insert("retryable".into(), Value::Bool(retryable));
1378 if let Some(backoff) = backoff_ms {
1379 error.insert("backoff_ms".into(), Value::Number(backoff.into()));
1380 }
1381 if let Some(details) = details {
1382 error.insert(
1383 "details".into(),
1384 serde_json::from_str(&details).unwrap_or(Value::String(details)),
1385 );
1386 }
1387 obj.insert("error".into(), Value::Object(error));
1388 Ok(Value::Object(obj))
1389 }
1390 }
1391 }
1392
1393 pub fn resolve_provider(
1394 &self,
1395 provider_id: Option<&str>,
1396 provider_type: Option<&str>,
1397 ) -> Result<ProviderBinding> {
1398 let registry = self.provider_registry()?;
1399 registry.resolve(provider_id, provider_type)
1400 }
1401
1402 pub async fn invoke_provider(
1403 &self,
1404 binding: &ProviderBinding,
1405 ctx: ComponentExecCtx,
1406 op: &str,
1407 input_json: Vec<u8>,
1408 ) -> Result<Value> {
1409 let component_ref = &binding.component_ref;
1410 let pack_component = self
1411 .components
1412 .get(component_ref)
1413 .with_context(|| format!("provider component '{component_ref}' not found in pack"))?;
1414
1415 let mut linker = Linker::new(&self.engine);
1416 let allow_state_store = self.allows_state_store(component_ref);
1417 register_all(&mut linker, allow_state_store)?;
1418 add_component_control_to_linker(&mut linker)?;
1419 let pre_instance = linker.instantiate_pre(&pack_component.component)?;
1420 let pre: ProviderComponentPre<ComponentState> = ProviderComponentPre::new(pre_instance)?;
1421
1422 let host_state = HostState::new(
1423 Arc::clone(&self.config),
1424 Arc::clone(&self.http_client),
1425 self.mocks.clone(),
1426 self.session_store.clone(),
1427 self.state_store.clone(),
1428 Arc::clone(&self.secrets),
1429 self.oauth_config.clone(),
1430 Some(ctx),
1431 )?;
1432 let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1433 let mut store = wasmtime::Store::new(&self.engine, store_state);
1434 let bindings: crate::provider_core::SchemaCore = pre.instantiate_async(&mut store).await?;
1435 let provider = bindings.greentic_provider_core_schema_core_api();
1436
1437 let result = provider.call_invoke(&mut store, op, &input_json)?;
1438 deserialize_json_bytes(result)
1439 }
1440
1441 fn provider_registry(&self) -> Result<ProviderRegistry> {
1442 if let Some(registry) = self.provider_registry.read().clone() {
1443 return Ok(registry);
1444 }
1445 let manifest = self
1446 .manifest
1447 .as_ref()
1448 .context("pack manifest required for provider resolution")?;
1449 let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
1450 let registry = ProviderRegistry::new(
1451 manifest,
1452 self.state_store.clone(),
1453 &self.config.tenant,
1454 &env,
1455 )?;
1456 *self.provider_registry.write() = Some(registry.clone());
1457 Ok(registry)
1458 }
1459
1460 pub fn load_flow(&self, flow_id: &str) -> Result<Flow> {
1461 if let Some(cache) = &self.flows {
1462 return cache
1463 .flows
1464 .get(flow_id)
1465 .cloned()
1466 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
1467 }
1468 if let Some(manifest) = &self.manifest {
1469 let entry = manifest
1470 .flows
1471 .iter()
1472 .find(|f| f.id.as_str() == flow_id)
1473 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
1474 return Ok(entry.flow.clone());
1475 }
1476 bail!("flow '{flow_id}' not available (pack exports disabled)")
1477 }
1478
1479 pub fn metadata(&self) -> &PackMetadata {
1480 &self.metadata
1481 }
1482
1483 pub fn required_secrets(&self) -> &[greentic_types::SecretRequirement] {
1484 &self.metadata.secret_requirements
1485 }
1486
1487 pub fn missing_secrets(
1488 &self,
1489 tenant_ctx: &TypesTenantCtx,
1490 ) -> Vec<greentic_types::SecretRequirement> {
1491 let env = tenant_ctx.env.as_str().to_string();
1492 let tenant = tenant_ctx.tenant.as_str().to_string();
1493 let team = tenant_ctx.team.as_ref().map(|t| t.as_str().to_string());
1494 self.required_secrets()
1495 .iter()
1496 .filter(|req| {
1497 if let Some(scope) = &req.scope {
1499 if scope.env != env {
1500 return false;
1501 }
1502 if scope.tenant != tenant {
1503 return false;
1504 }
1505 if let Some(ref team_req) = scope.team
1506 && team.as_ref() != Some(team_req)
1507 {
1508 return false;
1509 }
1510 }
1511 read_secret_blocking(&self.secrets, req.key.as_str()).is_err()
1512 })
1513 .cloned()
1514 .collect()
1515 }
1516
1517 pub fn for_component_test(
1518 components: Vec<(String, PathBuf)>,
1519 flows: HashMap<String, FlowIR>,
1520 config: Arc<HostConfig>,
1521 ) -> Result<Self> {
1522 let engine = Engine::default();
1523 let mut component_map = HashMap::new();
1524 for (name, path) in components {
1525 if !path.exists() {
1526 bail!("component artifact missing: {}", path.display());
1527 }
1528 let wasm_bytes = std::fs::read(&path)?;
1529 let component = Component::from_binary(&engine, &wasm_bytes)
1530 .with_context(|| format!("failed to compile component {}", path.display()))?;
1531 component_map.insert(
1532 name.clone(),
1533 PackComponent {
1534 name,
1535 version: "0.0.0".into(),
1536 component,
1537 },
1538 );
1539 }
1540
1541 let mut flow_map = HashMap::new();
1542 let mut descriptors = Vec::new();
1543 for (id, ir) in flows {
1544 let flow_type = ir.flow_type.clone();
1545 let flow = flow_ir_to_flow(ir)?;
1546 flow_map.insert(id.clone(), flow);
1547 descriptors.push(FlowDescriptor {
1548 id: id.clone(),
1549 flow_type,
1550 profile: "test".into(),
1551 version: "0.0.0".into(),
1552 description: None,
1553 });
1554 }
1555 let flows_cache = PackFlows {
1556 descriptors: descriptors.clone(),
1557 flows: flow_map,
1558 metadata: PackMetadata::fallback(Path::new("component-test")),
1559 };
1560
1561 Ok(Self {
1562 path: PathBuf::new(),
1563 archive_path: None,
1564 config,
1565 engine,
1566 metadata: PackMetadata::fallback(Path::new("component-test")),
1567 manifest: None,
1568 legacy_manifest: None,
1569 component_manifests: HashMap::new(),
1570 mocks: None,
1571 flows: Some(flows_cache),
1572 components: component_map,
1573 http_client: Arc::clone(&HTTP_CLIENT),
1574 pre_cache: Mutex::new(HashMap::new()),
1575 session_store: None,
1576 state_store: None,
1577 wasi_policy: Arc::new(RunnerWasiPolicy::new()),
1578 provider_registry: RwLock::new(None),
1579 secrets: crate::secrets::default_manager(),
1580 oauth_config: None,
1581 })
1582 }
1583}
1584
1585fn is_missing_node_export(err: &wasmtime::Error, version: &str) -> bool {
1586 let message = err.to_string();
1587 message.contains("no exported instance named")
1588 && message.contains(&format!("greentic:component/node@{version}"))
1589}
1590
1591struct PackFlows {
1592 descriptors: Vec<FlowDescriptor>,
1593 flows: HashMap<String, Flow>,
1594 metadata: PackMetadata,
1595}
1596
/// Manifest extension keys that are recognised as carrying runtime flow
/// definitions.
const RUNTIME_FLOW_EXTENSION_IDS: [&str; 3] = [
    "greentic.pack.runtime_flow",
    "greentic.pack.flow_runtime",
    "greentic.pack.runtime_flows",
];
1602
1603#[derive(Debug, Deserialize)]
1604struct RuntimeFlowBundle {
1605 flows: Vec<RuntimeFlow>,
1606}
1607
1608#[derive(Debug, Deserialize)]
1609struct RuntimeFlow {
1610 id: String,
1611 #[serde(alias = "flow_type")]
1612 kind: FlowKind,
1613 #[serde(default)]
1614 schema_version: Option<String>,
1615 #[serde(default)]
1616 start: Option<String>,
1617 #[serde(default)]
1618 entrypoints: BTreeMap<String, Value>,
1619 nodes: BTreeMap<String, RuntimeNode>,
1620 #[serde(default)]
1621 metadata: Option<FlowMetadata>,
1622}
1623
1624#[derive(Debug, Deserialize)]
1625struct RuntimeNode {
1626 #[serde(alias = "component")]
1627 component_id: String,
1628 #[serde(default, alias = "operation")]
1629 operation_name: Option<String>,
1630 #[serde(default, alias = "payload", alias = "input")]
1631 operation_payload: Value,
1632 #[serde(default)]
1633 routing: Option<Routing>,
1634 #[serde(default)]
1635 telemetry: Option<TelemetryHints>,
1636}
1637
1638fn deserialize_json_bytes(bytes: Vec<u8>) -> Result<Value> {
1639 if bytes.is_empty() {
1640 return Ok(Value::Null);
1641 }
1642 serde_json::from_slice(&bytes).or_else(|_| {
1643 String::from_utf8(bytes)
1644 .map(Value::String)
1645 .map_err(|err| anyhow!(err))
1646 })
1647}
1648
1649impl PackFlows {
1650 fn from_manifest(manifest: greentic_types::PackManifest) -> Self {
1651 if let Some(flows) = flows_from_runtime_extension(&manifest) {
1652 return flows;
1653 }
1654 let descriptors = manifest
1655 .flows
1656 .iter()
1657 .map(|entry| FlowDescriptor {
1658 id: entry.id.as_str().to_string(),
1659 flow_type: flow_kind_to_str(entry.kind).to_string(),
1660 profile: manifest.pack_id.as_str().to_string(),
1661 version: manifest.version.to_string(),
1662 description: None,
1663 })
1664 .collect();
1665 let mut flows = HashMap::new();
1666 for entry in &manifest.flows {
1667 flows.insert(entry.id.as_str().to_string(), entry.flow.clone());
1668 }
1669 Self {
1670 metadata: PackMetadata::from_manifest(&manifest),
1671 descriptors,
1672 flows,
1673 }
1674 }
1675}
1676
1677fn flows_from_runtime_extension(manifest: &greentic_types::PackManifest) -> Option<PackFlows> {
1678 let extensions = manifest.extensions.as_ref()?;
1679 let extension = extensions.iter().find_map(|(key, ext)| {
1680 if RUNTIME_FLOW_EXTENSION_IDS
1681 .iter()
1682 .any(|candidate| candidate == key)
1683 {
1684 Some(ext)
1685 } else {
1686 None
1687 }
1688 })?;
1689 let runtime_flows = match decode_runtime_flow_extension(extension) {
1690 Some(flows) if !flows.is_empty() => flows,
1691 _ => return None,
1692 };
1693
1694 let descriptors = runtime_flows
1695 .iter()
1696 .map(|flow| FlowDescriptor {
1697 id: flow.id.as_str().to_string(),
1698 flow_type: flow_kind_to_str(flow.kind).to_string(),
1699 profile: manifest.pack_id.as_str().to_string(),
1700 version: manifest.version.to_string(),
1701 description: None,
1702 })
1703 .collect::<Vec<_>>();
1704 let flows = runtime_flows
1705 .into_iter()
1706 .map(|flow| (flow.id.as_str().to_string(), flow))
1707 .collect();
1708
1709 Some(PackFlows {
1710 metadata: PackMetadata::from_manifest(manifest),
1711 descriptors,
1712 flows,
1713 })
1714}
1715
1716fn decode_runtime_flow_extension(extension: &ExtensionRef) -> Option<Vec<Flow>> {
1717 let value = match extension.inline.as_ref()? {
1718 ExtensionInline::Other(value) => value.clone(),
1719 _ => return None,
1720 };
1721
1722 if let Ok(bundle) = serde_json::from_value::<RuntimeFlowBundle>(value.clone()) {
1723 return Some(collect_runtime_flows(bundle.flows));
1724 }
1725
1726 if let Ok(flows) = serde_json::from_value::<Vec<RuntimeFlow>>(value.clone()) {
1727 return Some(collect_runtime_flows(flows));
1728 }
1729
1730 if let Ok(flows) = serde_json::from_value::<Vec<Flow>>(value) {
1731 return Some(flows);
1732 }
1733
1734 warn!(
1735 extension = %extension.kind,
1736 version = %extension.version,
1737 "runtime flow extension present but could not be decoded"
1738 );
1739 None
1740}
1741
1742fn collect_runtime_flows(flows: Vec<RuntimeFlow>) -> Vec<Flow> {
1743 flows
1744 .into_iter()
1745 .filter_map(|flow| match runtime_flow_to_flow(flow) {
1746 Ok(flow) => Some(flow),
1747 Err(err) => {
1748 warn!(error = %err, "failed to decode runtime flow");
1749 None
1750 }
1751 })
1752 .collect()
1753}
1754
1755fn runtime_flow_to_flow(runtime: RuntimeFlow) -> Result<Flow> {
1756 let flow_id = FlowId::from_str(&runtime.id)
1757 .with_context(|| format!("invalid flow id `{}`", runtime.id))?;
1758 let mut entrypoints = runtime.entrypoints;
1759 if entrypoints.is_empty()
1760 && let Some(start) = &runtime.start
1761 {
1762 entrypoints.insert("default".into(), Value::String(start.clone()));
1763 }
1764
1765 let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
1766 for (id, node) in runtime.nodes {
1767 let node_id = NodeId::from_str(&id).with_context(|| format!("invalid node id `{id}`"))?;
1768 let component_id = ComponentId::from_str(&node.component_id)
1769 .with_context(|| format!("invalid component id `{}`", node.component_id))?;
1770 let component = FlowComponentRef {
1771 id: component_id,
1772 pack_alias: None,
1773 operation: node.operation_name,
1774 };
1775 let routing = node.routing.unwrap_or(Routing::End);
1776 let telemetry = node.telemetry.unwrap_or_default();
1777 nodes.insert(
1778 node_id.clone(),
1779 Node {
1780 id: node_id,
1781 component,
1782 input: InputMapping {
1783 mapping: node.operation_payload,
1784 },
1785 output: OutputMapping {
1786 mapping: Value::Null,
1787 },
1788 routing,
1789 telemetry,
1790 },
1791 );
1792 }
1793
1794 Ok(Flow {
1795 schema_version: runtime.schema_version.unwrap_or_else(|| "1.0".to_string()),
1796 id: flow_id,
1797 kind: runtime.kind,
1798 entrypoints,
1799 nodes,
1800 metadata: runtime.metadata.unwrap_or_default(),
1801 })
1802}
1803
1804fn flow_kind_to_str(kind: greentic_types::FlowKind) -> &'static str {
1805 match kind {
1806 greentic_types::FlowKind::Messaging => "messaging",
1807 greentic_types::FlowKind::Event => "event",
1808 greentic_types::FlowKind::ComponentConfig => "component-config",
1809 greentic_types::FlowKind::Job => "job",
1810 greentic_types::FlowKind::Http => "http",
1811 }
1812}
1813
1814fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
1815 let mut file = archive
1816 .by_name(name)
1817 .with_context(|| format!("entry {name} missing from archive"))?;
1818 let mut buf = Vec::new();
1819 file.read_to_end(&mut buf)?;
1820 Ok(buf)
1821}
1822
1823fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
1824 for node in doc.nodes.values_mut() {
1825 let Some((component_ref, payload)) = node
1826 .raw
1827 .iter()
1828 .next()
1829 .map(|(key, value)| (key.clone(), value.clone()))
1830 else {
1831 continue;
1832 };
1833 if component_ref.starts_with("emit.") {
1834 node.operation = Some(component_ref);
1835 node.payload = payload;
1836 node.raw.clear();
1837 continue;
1838 }
1839 let (target_component, operation, input, config) =
1840 infer_component_exec(&payload, &component_ref);
1841 let mut payload_obj = serde_json::Map::new();
1842 payload_obj.insert("component".into(), Value::String(target_component));
1844 payload_obj.insert("operation".into(), Value::String(operation));
1845 payload_obj.insert("input".into(), input);
1846 if let Some(cfg) = config {
1847 payload_obj.insert("config".into(), cfg);
1848 }
1849 node.operation = Some("component.exec".to_string());
1850 node.payload = Value::Object(payload_obj);
1851 node.raw.clear();
1852 }
1853 doc
1854}
1855
1856fn infer_component_exec(
1857 payload: &Value,
1858 component_ref: &str,
1859) -> (String, String, Value, Option<Value>) {
1860 let default_op = if component_ref.starts_with("templating.") {
1861 "render"
1862 } else {
1863 "invoke"
1864 }
1865 .to_string();
1866
1867 if let Value::Object(map) = payload {
1868 let op = map
1869 .get("op")
1870 .or_else(|| map.get("operation"))
1871 .and_then(Value::as_str)
1872 .map(|s| s.to_string())
1873 .unwrap_or_else(|| default_op.clone());
1874
1875 let mut input = map.clone();
1876 let config = input.remove("config");
1877 let component = input
1878 .get("component")
1879 .or_else(|| input.get("component_ref"))
1880 .and_then(Value::as_str)
1881 .map(|s| s.to_string())
1882 .unwrap_or_else(|| component_ref.to_string());
1883 input.remove("component");
1884 input.remove("component_ref");
1885 input.remove("op");
1886 input.remove("operation");
1887 return (component, op, Value::Object(input), config);
1888 }
1889
1890 (component_ref.to_string(), default_op, payload.clone(), None)
1891}
1892
/// A component the pack declares and the runtime therefore has to load.
#[derive(Debug, Clone)]
struct ComponentSpec {
    /// Component identifier; also the key under which it is loaded.
    id: String,
    /// Declared component version.
    version: String,
    /// Wasm file path taken from a legacy manifest, if the spec came from one.
    legacy_path: Option<String>,
}
1899
1900#[derive(Clone, Debug)]
1901struct ComponentSourceInfo {
1902 digest: String,
1903 source: ComponentSourceRef,
1904 artifact: ComponentArtifactLocation,
1905}
1906
/// Location of a component's wasm artifact.
#[derive(Debug, Clone)]
enum ComponentArtifactLocation {
    /// The artifact ships with the pack at `wasm_path`.
    Inline { wasm_path: String },
    /// The artifact is not shipped with the pack and is resolved remotely.
    Remote,
}
1912
1913fn component_specs(
1914 manifest: Option<&greentic_types::PackManifest>,
1915 legacy_manifest: Option<&legacy_pack::PackManifest>,
1916 component_sources: Option<&ComponentSourcesV1>,
1917) -> Vec<ComponentSpec> {
1918 if let Some(manifest) = manifest {
1919 if !manifest.components.is_empty() {
1920 return manifest
1921 .components
1922 .iter()
1923 .map(|entry| ComponentSpec {
1924 id: entry.id.as_str().to_string(),
1925 version: entry.version.to_string(),
1926 legacy_path: None,
1927 })
1928 .collect();
1929 }
1930 if let Some(sources) = component_sources {
1931 let mut seen = HashSet::new();
1932 let mut specs = Vec::new();
1933 for entry in &sources.components {
1934 let id = entry
1935 .component_id
1936 .as_ref()
1937 .map(|id| id.as_str())
1938 .unwrap_or(entry.name.as_str());
1939 if seen.insert(id.to_string()) {
1940 specs.push(ComponentSpec {
1941 id: id.to_string(),
1942 version: "0.0.0".to_string(),
1943 legacy_path: None,
1944 });
1945 }
1946 }
1947 return specs;
1948 }
1949 }
1950 if let Some(legacy_manifest) = legacy_manifest {
1951 return legacy_manifest
1952 .components
1953 .iter()
1954 .map(|entry| ComponentSpec {
1955 id: entry.name.clone(),
1956 version: entry.version.to_string(),
1957 legacy_path: Some(entry.file_wasm.clone()),
1958 })
1959 .collect();
1960 }
1961 Vec::new()
1962}
1963
1964fn component_sources_table(
1965 sources: Option<&ComponentSourcesV1>,
1966) -> Result<Option<HashMap<String, ComponentSourceInfo>>> {
1967 let Some(sources) = sources else {
1968 return Ok(None);
1969 };
1970 let mut table = HashMap::new();
1971 for entry in &sources.components {
1972 let artifact = match &entry.artifact {
1973 ArtifactLocationV1::Inline { wasm_path, .. } => ComponentArtifactLocation::Inline {
1974 wasm_path: wasm_path.clone(),
1975 },
1976 ArtifactLocationV1::Remote => ComponentArtifactLocation::Remote,
1977 };
1978 let info = ComponentSourceInfo {
1979 digest: entry.resolved.digest.clone(),
1980 source: entry.source.clone(),
1981 artifact,
1982 };
1983 if let Some(component_id) = entry.component_id.as_ref() {
1984 table.insert(component_id.as_str().to_string(), info.clone());
1985 }
1986 table.insert(entry.name.clone(), info);
1987 }
1988 Ok(Some(table))
1989}
1990
1991fn component_path_for_spec(root: &Path, spec: &ComponentSpec) -> PathBuf {
1992 if let Some(path) = &spec.legacy_path {
1993 return root.join(path);
1994 }
1995 root.join("components").join(format!("{}.wasm", spec.id))
1996}
1997
/// Ensures a digest string carries an algorithm prefix.
///
/// Digests that already start with `sha256:` or `blake3:` are returned
/// unchanged; anything else is prefixed with `sha256:`.
fn normalize_digest(digest: &str) -> String {
    const KNOWN_PREFIXES: [&str; 2] = ["sha256:", "blake3:"];

    let already_tagged = KNOWN_PREFIXES
        .iter()
        .any(|prefix| digest.starts_with(prefix));
    match already_tagged {
        true => digest.to_owned(),
        false => ["sha256:", digest].concat(),
    }
}
2005
2006fn compute_digest_for(bytes: &[u8], digest: &str) -> Result<String> {
2007 if digest.starts_with("blake3:") {
2008 let hash = blake3::hash(bytes);
2009 return Ok(format!("blake3:{}", hash.to_hex()));
2010 }
2011 let mut hasher = sha2::Sha256::new();
2012 hasher.update(bytes);
2013 Ok(format!("sha256:{:x}", hasher.finalize()))
2014}
2015
2016fn verify_component_digest(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
2017 let normalized_expected = normalize_digest(expected);
2018 let actual = compute_digest_for(bytes, &normalized_expected)?;
2019 if normalize_digest(&actual) != normalized_expected {
2020 bail!(
2021 "component {component_id} digest mismatch: expected {normalized_expected}, got {actual}"
2022 );
2023 }
2024 Ok(())
2025}
2026
2027fn dist_options_from(component_resolution: &ComponentResolution) -> DistOptions {
2028 let mut opts = DistOptions {
2029 allow_tags: true,
2030 ..DistOptions::default()
2031 };
2032 if let Some(cache_dir) = component_resolution.dist_cache_dir.clone() {
2033 opts.cache_dir = cache_dir;
2034 }
2035 if component_resolution.dist_offline {
2036 opts.offline = true;
2037 }
2038 opts
2039}
2040
2041#[allow(clippy::too_many_arguments)]
2042async fn load_components_from_sources(
2043 engine: &Engine,
2044 component_sources: &HashMap<String, ComponentSourceInfo>,
2045 component_resolution: &ComponentResolution,
2046 specs: &[ComponentSpec],
2047 missing: &mut HashSet<String>,
2048 into: &mut HashMap<String, PackComponent>,
2049 materialized_root: Option<&Path>,
2050 archive_hint: Option<&Path>,
2051) -> Result<()> {
2052 let mut archive = if let Some(path) = archive_hint {
2053 Some(
2054 ZipArchive::new(File::open(path)?)
2055 .with_context(|| format!("{} is not a valid gtpack", path.display()))?,
2056 )
2057 } else {
2058 None
2059 };
2060 let mut dist_client: Option<DistClient> = None;
2061
2062 for spec in specs {
2063 if !missing.contains(&spec.id) {
2064 continue;
2065 }
2066 let Some(source) = component_sources.get(&spec.id) else {
2067 continue;
2068 };
2069
2070 let bytes = match &source.artifact {
2071 ComponentArtifactLocation::Inline { wasm_path } => {
2072 if let Some(root) = materialized_root {
2073 let path = root.join(wasm_path);
2074 if path.exists() {
2075 std::fs::read(&path).with_context(|| {
2076 format!(
2077 "failed to read inline component {} from {}",
2078 spec.id,
2079 path.display()
2080 )
2081 })?
2082 } else if archive.is_none() {
2083 bail!("inline component {} missing at {}", spec.id, path.display());
2084 } else {
2085 read_entry(
2086 archive.as_mut().expect("archive present when needed"),
2087 wasm_path,
2088 )
2089 .with_context(|| {
2090 format!(
2091 "inline component {} missing at {} in pack archive",
2092 spec.id, wasm_path
2093 )
2094 })?
2095 }
2096 } else if let Some(archive) = archive.as_mut() {
2097 read_entry(archive, wasm_path).with_context(|| {
2098 format!(
2099 "inline component {} missing at {} in pack archive",
2100 spec.id, wasm_path
2101 )
2102 })?
2103 } else {
2104 bail!(
2105 "inline component {} missing and no pack source available",
2106 spec.id
2107 );
2108 }
2109 }
2110 ComponentArtifactLocation::Remote => {
2111 let client = dist_client.get_or_insert_with(|| {
2112 DistClient::new(dist_options_from(component_resolution))
2113 });
2114 let reference = source.source.to_string();
2115 let cache_path = if component_resolution.dist_offline {
2116 client
2117 .fetch_digest(&source.digest)
2118 .await
2119 .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?
2120 } else {
2121 let resolved = client
2122 .resolve_ref(&reference)
2123 .await
2124 .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?;
2125 let expected = normalize_digest(&source.digest);
2126 let actual = normalize_digest(&resolved.digest);
2127 if expected != actual {
2128 bail!(
2129 "component {} digest mismatch after fetch: expected {}, got {}",
2130 spec.id,
2131 expected,
2132 actual
2133 );
2134 }
2135 resolved.cache_path.ok_or_else(|| {
2136 anyhow!(
2137 "component {} resolved from {} but cache path is missing",
2138 spec.id,
2139 reference
2140 )
2141 })?
2142 };
2143 std::fs::read(&cache_path).with_context(|| {
2144 format!(
2145 "failed to read cached component {} from {}",
2146 spec.id,
2147 cache_path.display()
2148 )
2149 })?
2150 }
2151 };
2152
2153 verify_component_digest(&spec.id, &source.digest, &bytes)?;
2154 let component = Component::from_binary(engine, &bytes)
2155 .with_context(|| format!("failed to compile component {}", spec.id))?;
2156 into.insert(
2157 spec.id.clone(),
2158 PackComponent {
2159 name: spec.id.clone(),
2160 version: spec.version.clone(),
2161 component,
2162 },
2163 );
2164 missing.remove(&spec.id);
2165 }
2166
2167 Ok(())
2168}
2169
2170fn dist_error_for_component(err: DistError, component_id: &str, reference: &str) -> anyhow::Error {
2171 match err {
2172 DistError::CacheMiss { reference: missing } => anyhow!(
2173 "remote component {} is not cached for {}. Run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
2174 component_id,
2175 missing,
2176 reference
2177 ),
2178 DistError::Offline { reference: blocked } => anyhow!(
2179 "offline mode blocked fetching component {} from {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
2180 component_id,
2181 blocked,
2182 reference
2183 ),
2184 DistError::AuthRequired { target } => anyhow!(
2185 "component {} requires authenticated source {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
2186 component_id,
2187 target,
2188 reference
2189 ),
2190 other => anyhow!(
2191 "failed to resolve component {} from {}: {}",
2192 component_id,
2193 reference,
2194 other
2195 ),
2196 }
2197}
2198
2199fn load_components_from_overrides(
2200 engine: &Engine,
2201 overrides: &HashMap<String, PathBuf>,
2202 specs: &[ComponentSpec],
2203 missing: &mut HashSet<String>,
2204 into: &mut HashMap<String, PackComponent>,
2205) -> Result<()> {
2206 for spec in specs {
2207 if !missing.contains(&spec.id) {
2208 continue;
2209 }
2210 let Some(path) = overrides.get(&spec.id) else {
2211 continue;
2212 };
2213 let bytes = std::fs::read(path)
2214 .with_context(|| format!("failed to read override component {}", path.display()))?;
2215 let component = Component::from_binary(engine, &bytes).with_context(|| {
2216 format!(
2217 "failed to compile component {} from override {}",
2218 spec.id,
2219 path.display()
2220 )
2221 })?;
2222 into.insert(
2223 spec.id.clone(),
2224 PackComponent {
2225 name: spec.id.clone(),
2226 version: spec.version.clone(),
2227 component,
2228 },
2229 );
2230 missing.remove(&spec.id);
2231 }
2232 Ok(())
2233}
2234
2235fn load_components_from_dir(
2236 engine: &Engine,
2237 root: &Path,
2238 specs: &[ComponentSpec],
2239 missing: &mut HashSet<String>,
2240 into: &mut HashMap<String, PackComponent>,
2241) -> Result<()> {
2242 for spec in specs {
2243 if !missing.contains(&spec.id) {
2244 continue;
2245 }
2246 let path = component_path_for_spec(root, spec);
2247 if !path.exists() {
2248 tracing::debug!(component = %spec.id, path = %path.display(), "materialized component missing; will try other sources");
2249 continue;
2250 }
2251 let bytes = std::fs::read(&path)
2252 .with_context(|| format!("failed to read component {}", path.display()))?;
2253 let component = Component::from_binary(engine, &bytes).with_context(|| {
2254 format!(
2255 "failed to compile component {} from {}",
2256 spec.id,
2257 path.display()
2258 )
2259 })?;
2260 into.insert(
2261 spec.id.clone(),
2262 PackComponent {
2263 name: spec.id.clone(),
2264 version: spec.version.clone(),
2265 component,
2266 },
2267 );
2268 missing.remove(&spec.id);
2269 }
2270 Ok(())
2271}
2272
2273fn load_components_from_archive(
2274 engine: &Engine,
2275 path: &Path,
2276 specs: &[ComponentSpec],
2277 missing: &mut HashSet<String>,
2278 into: &mut HashMap<String, PackComponent>,
2279) -> Result<()> {
2280 let mut archive = ZipArchive::new(File::open(path)?)
2281 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
2282 for spec in specs {
2283 if !missing.contains(&spec.id) {
2284 continue;
2285 }
2286 let file_name = spec
2287 .legacy_path
2288 .clone()
2289 .unwrap_or_else(|| format!("components/{}.wasm", spec.id));
2290 let bytes = match read_entry(&mut archive, &file_name) {
2291 Ok(bytes) => bytes,
2292 Err(err) => {
2293 warn!(component = %spec.id, pack = %path.display(), error = %err, "component entry missing in pack archive");
2294 continue;
2295 }
2296 };
2297 let component = Component::from_binary(engine, &bytes)
2298 .with_context(|| format!("failed to compile component {}", spec.id))?;
2299 into.insert(
2300 spec.id.clone(),
2301 PackComponent {
2302 name: spec.id.clone(),
2303 version: spec.version.clone(),
2304 component,
2305 },
2306 );
2307 missing.remove(&spec.id);
2308 }
2309 Ok(())
2310}
2311
#[cfg(test)]
mod tests {
    use super::*;
    use greentic_flow::model::{FlowDoc, NodeDoc};
    use indexmap::IndexMap;
    use serde_json::json;

    /// A node that carries only a raw `component -> config` entry is expected to be
    /// normalized into a `component.exec` node: the raw map is emptied and the
    /// payload names the component, the `render` operation, and the config as input.
    #[test]
    fn normalizes_raw_component_to_component_exec() {
        // Raw shorthand: the key names the component, the value is its config.
        let mut shorthand = IndexMap::new();
        shorthand.insert(
            "templating.handlebars".into(),
            json!({ "template": "Hi {{name}}" }),
        );
        let start_node = NodeDoc {
            raw: shorthand,
            routing: json!([{"out": true}]),
            ..Default::default()
        };

        let mut nodes = IndexMap::new();
        nodes.insert("start".into(), start_node);

        let doc = FlowDoc {
            id: "welcome".into(),
            flow_type: "messaging".into(),
            start: Some("start".into()),
            title: None,
            description: None,
            schema_version: None,
            parameters: json!({}),
            tags: Vec::new(),
            entrypoints: IndexMap::new(),
            nodes,
        };

        let normalized = normalize_flow_doc(doc);
        let start = normalized.nodes.get("start").expect("node exists");
        assert_eq!(start.operation.as_deref(), Some("component.exec"));
        assert!(start.raw.is_empty());

        let payload = start.payload.as_object().expect("payload object");
        let expected_component = Value::String("templating.handlebars".into());
        let expected_operation = Value::String("render".into());
        let expected_input = json!({ "template": "Hi {{name}}" });
        assert_eq!(payload.get("component"), Some(&expected_component));
        assert_eq!(payload.get("operation"), Some(&expected_operation));
        assert_eq!(payload.get("input"), Some(&expected_input));
    }
}
2365
2366#[derive(Clone, Debug, Default, Serialize, Deserialize)]
2367pub struct PackMetadata {
2368 pub pack_id: String,
2369 pub version: String,
2370 #[serde(default)]
2371 pub entry_flows: Vec<String>,
2372 #[serde(default)]
2373 pub secret_requirements: Vec<greentic_types::SecretRequirement>,
2374}
2375
2376impl PackMetadata {
2377 fn from_wasm(bytes: &[u8]) -> Option<Self> {
2378 let parser = Parser::new(0);
2379 for payload in parser.parse_all(bytes) {
2380 let payload = payload.ok()?;
2381 match payload {
2382 Payload::CustomSection(section) => {
2383 if section.name() == "greentic.manifest"
2384 && let Ok(meta) = Self::from_bytes(section.data())
2385 {
2386 return Some(meta);
2387 }
2388 }
2389 Payload::DataSection(reader) => {
2390 for segment in reader.into_iter().flatten() {
2391 if let Ok(meta) = Self::from_bytes(segment.data) {
2392 return Some(meta);
2393 }
2394 }
2395 }
2396 _ => {}
2397 }
2398 }
2399 None
2400 }
2401
2402 fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
2403 #[derive(Deserialize)]
2404 struct RawManifest {
2405 pack_id: String,
2406 version: String,
2407 #[serde(default)]
2408 entry_flows: Vec<String>,
2409 #[serde(default)]
2410 flows: Vec<RawFlow>,
2411 #[serde(default)]
2412 secret_requirements: Vec<greentic_types::SecretRequirement>,
2413 }
2414
2415 #[derive(Deserialize)]
2416 struct RawFlow {
2417 id: String,
2418 }
2419
2420 let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
2421 let mut entry_flows = if manifest.entry_flows.is_empty() {
2422 manifest.flows.iter().map(|f| f.id.clone()).collect()
2423 } else {
2424 manifest.entry_flows.clone()
2425 };
2426 entry_flows.retain(|id| !id.is_empty());
2427 Ok(Self {
2428 pack_id: manifest.pack_id,
2429 version: manifest.version,
2430 entry_flows,
2431 secret_requirements: manifest.secret_requirements,
2432 })
2433 }
2434
2435 pub fn fallback(path: &Path) -> Self {
2436 let pack_id = path
2437 .file_stem()
2438 .map(|s| s.to_string_lossy().into_owned())
2439 .unwrap_or_else(|| "unknown-pack".to_string());
2440 Self {
2441 pack_id,
2442 version: "0.0.0".to_string(),
2443 entry_flows: Vec::new(),
2444 secret_requirements: Vec::new(),
2445 }
2446 }
2447
2448 pub fn from_manifest(manifest: &greentic_types::PackManifest) -> Self {
2449 let entry_flows = manifest
2450 .flows
2451 .iter()
2452 .map(|flow| flow.id.as_str().to_string())
2453 .collect::<Vec<_>>();
2454 Self {
2455 pack_id: manifest.pack_id.as_str().to_string(),
2456 version: manifest.version.to_string(),
2457 entry_flows,
2458 secret_requirements: manifest.secret_requirements.clone(),
2459 }
2460 }
2461}