1use std::collections::{HashMap, HashSet};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::component_api::component::greentic::component::control::Host as ComponentControlHost;
10use crate::component_api::{
11 ComponentPre, control, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
12};
13use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
14use crate::provider::{ProviderBinding, ProviderRegistry};
15use crate::provider_core::SchemaCorePre as ProviderComponentPre;
16use crate::provider_core_only;
17use crate::runtime_wasmtime::{Component, Engine, Linker, ResourceTable};
18use anyhow::{Context, Result, anyhow, bail};
19use greentic_interfaces_wasmtime::host_helpers::v1::{
20 self as host_v1, HostFns, add_all_v1_to_linker,
21 runner_host_http::RunnerHostHttp,
22 runner_host_kv::RunnerHostKv,
23 secrets_store::{SecretsError, SecretsStoreHost},
24 state_store::{
25 OpAck as StateOpAck, StateKey as HostStateKey, StateStoreError as StateError,
26 StateStoreHost, TenantCtx as StateTenantCtx,
27 },
28 telemetry_logger::{
29 OpAck as TelemetryAck, SpanContext as TelemetrySpanContext,
30 TelemetryLoggerError as TelemetryError, TelemetryLoggerHost,
31 TenantCtx as TelemetryTenantCtx,
32 },
33};
34use greentic_interfaces_wasmtime::http_client_client_v1_0::greentic::interfaces_types::types::Impersonation as ImpersonationV1_0;
35use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::interfaces_types::types::Impersonation as ImpersonationV1_1;
36use greentic_pack::builder as legacy_pack;
37use greentic_types::{
38 EnvId, Flow, StateKey as StoreStateKey, TeamId, TenantCtx as TypesTenantCtx, TenantId, UserId,
39 decode_pack_manifest,
40};
41use host_v1::http_client::{
42 HttpClientError, HttpClientErrorV1_1, HttpClientHost, HttpClientHostV1_1,
43 Request as HttpRequest, RequestOptionsV1_1 as HttpRequestOptionsV1_1,
44 RequestV1_1 as HttpRequestV1_1, Response as HttpResponse, ResponseV1_1 as HttpResponseV1_1,
45 TenantCtx as HttpTenantCtx, TenantCtxV1_1 as HttpTenantCtxV1_1,
46};
47use once_cell::sync::Lazy;
48use parking_lot::{Mutex, RwLock};
49use reqwest::blocking::Client as BlockingClient;
50use runner_core::normalize_under_root;
51use serde::{Deserialize, Serialize};
52use serde_cbor;
53use serde_json::{self, Value};
54use tokio::fs;
55use wasmparser::{Parser, Payload};
56use wasmtime::StoreContextMut;
57use zip::ZipArchive;
58
59use crate::runner::engine::{FlowContext, FlowEngine, FlowStatus};
60use crate::runner::flow_adapter::{FlowIR, flow_doc_to_ir, flow_ir_to_flow};
61use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
62
63use crate::config::HostConfig;
64use crate::secrets::{DynSecretsManager, read_secret_blocking};
65use crate::storage::state::STATE_PREFIX;
66use crate::storage::{DynSessionStore, DynStateStore};
67use crate::verify;
68use crate::wasi::RunnerWasiPolicy;
69use tracing::warn;
70use wasmtime_wasi::p2::add_to_linker_sync as add_wasi_to_linker;
71use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
72
73use greentic_flow::model::FlowDoc;
74
75#[allow(dead_code)]
76pub struct PackRuntime {
77 path: PathBuf,
79 archive_path: Option<PathBuf>,
81 config: Arc<HostConfig>,
82 engine: Engine,
83 metadata: PackMetadata,
84 manifest: Option<greentic_types::PackManifest>,
85 legacy_manifest: Option<Box<legacy_pack::PackManifest>>,
86 mocks: Option<Arc<MockLayer>>,
87 flows: Option<PackFlows>,
88 components: HashMap<String, PackComponent>,
89 http_client: Arc<BlockingClient>,
90 pre_cache: Mutex<HashMap<String, ComponentPre<ComponentState>>>,
91 session_store: Option<DynSessionStore>,
92 state_store: Option<DynStateStore>,
93 wasi_policy: Arc<RunnerWasiPolicy>,
94 provider_registry: RwLock<Option<ProviderRegistry>>,
95 secrets: DynSecretsManager,
96 oauth_config: Option<OAuthBrokerConfig>,
97}
98
99struct PackComponent {
100 #[allow(dead_code)]
101 name: String,
102 #[allow(dead_code)]
103 version: String,
104 component: Component,
105}
106
107#[derive(Debug, Default, Clone)]
108pub struct ComponentResolution {
109 pub materialized_root: Option<PathBuf>,
111 pub overrides: HashMap<String, PathBuf>,
113}
114
115fn build_blocking_client() -> BlockingClient {
116 std::thread::spawn(|| {
117 BlockingClient::builder()
118 .no_proxy()
119 .build()
120 .expect("blocking client")
121 })
122 .join()
123 .expect("client build thread panicked")
124}
125
126fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
127 let (root, candidate) = if path.is_absolute() {
128 let parent = path
129 .parent()
130 .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
131 let root = parent
132 .canonicalize()
133 .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
134 let file = path
135 .file_name()
136 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
137 (root, PathBuf::from(file))
138 } else {
139 let cwd = std::env::current_dir().context("failed to resolve current directory")?;
140 let base = if let Some(parent) = path.parent() {
141 cwd.join(parent)
142 } else {
143 cwd
144 };
145 let root = base
146 .canonicalize()
147 .with_context(|| format!("failed to canonicalize {}", base.display()))?;
148 let file = path
149 .file_name()
150 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
151 (root, PathBuf::from(file))
152 };
153 let safe = normalize_under_root(&root, &candidate)?;
154 Ok((root, safe))
155}
156
157static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
158
159#[derive(Debug, Clone, Serialize, Deserialize)]
160pub struct FlowDescriptor {
161 pub id: String,
162 #[serde(rename = "type")]
163 pub flow_type: String,
164 pub profile: String,
165 pub version: String,
166 #[serde(default)]
167 pub description: Option<String>,
168}
169
170pub struct HostState {
171 config: Arc<HostConfig>,
172 http_client: Arc<BlockingClient>,
173 default_env: String,
174 #[allow(dead_code)]
175 session_store: Option<DynSessionStore>,
176 state_store: Option<DynStateStore>,
177 mocks: Option<Arc<MockLayer>>,
178 secrets: DynSecretsManager,
179 oauth_config: Option<OAuthBrokerConfig>,
180 oauth_host: OAuthBrokerHost,
181}
182
183impl HostState {
184 #[allow(clippy::default_constructed_unit_structs)]
185 pub fn new(
186 config: Arc<HostConfig>,
187 http_client: Arc<BlockingClient>,
188 mocks: Option<Arc<MockLayer>>,
189 session_store: Option<DynSessionStore>,
190 state_store: Option<DynStateStore>,
191 secrets: DynSecretsManager,
192 oauth_config: Option<OAuthBrokerConfig>,
193 ) -> Result<Self> {
194 let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
195 Ok(Self {
196 config,
197 http_client,
198 default_env,
199 session_store,
200 state_store,
201 mocks,
202 secrets,
203 oauth_config,
204 oauth_host: OAuthBrokerHost::default(),
205 })
206 }
207
208 pub fn get_secret(&self, key: &str) -> Result<String> {
209 if provider_core_only::is_enabled() {
210 bail!(provider_core_only::blocked_message("secrets"))
211 }
212 if !self.config.secrets_policy.is_allowed(key) {
213 bail!("secret {key} is not permitted by bindings policy");
214 }
215 if let Some(mock) = &self.mocks
216 && let Some(value) = mock.secrets_lookup(key)
217 {
218 return Ok(value);
219 }
220 let bytes = read_secret_blocking(&self.secrets, key)
221 .context("failed to read secret from manager")?;
222 let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
223 Ok(value)
224 }
225
226 fn tenant_ctx_from_v1(&self, ctx: Option<StateTenantCtx>) -> Result<TypesTenantCtx> {
227 let tenant_raw = ctx
228 .as_ref()
229 .map(|ctx| ctx.tenant.clone())
230 .unwrap_or_else(|| self.config.tenant.clone());
231 let env_raw = ctx
232 .as_ref()
233 .map(|ctx| ctx.env.clone())
234 .unwrap_or_else(|| self.default_env.clone());
235 let tenant_id = TenantId::from_str(&tenant_raw)
236 .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
237 let env_id = EnvId::from_str(&env_raw)
238 .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
239 let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
240 if let Some(ctx) = ctx {
241 if let Some(team) = ctx.team.or(ctx.team_id) {
242 let team_id =
243 TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
244 tenant_ctx = tenant_ctx.with_team(Some(team_id));
245 }
246 if let Some(user) = ctx.user.or(ctx.user_id) {
247 let user_id =
248 UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
249 tenant_ctx = tenant_ctx.with_user(Some(user_id));
250 }
251 if let Some(flow) = ctx.flow_id {
252 tenant_ctx = tenant_ctx.with_flow(flow);
253 }
254 if let Some(node) = ctx.node_id {
255 tenant_ctx = tenant_ctx.with_node(node);
256 }
257 if let Some(provider) = ctx.provider_id {
258 tenant_ctx = tenant_ctx.with_provider(provider);
259 }
260 if let Some(session) = ctx.session_id {
261 tenant_ctx = tenant_ctx.with_session(session);
262 }
263 tenant_ctx.trace_id = ctx.trace_id;
264 }
265 Ok(tenant_ctx)
266 }
267
268 fn send_http_request(
269 &mut self,
270 req: HttpRequest,
271 opts: Option<HttpRequestOptionsV1_1>,
272 _ctx: Option<HttpTenantCtx>,
273 ) -> Result<HttpResponse, HttpClientError> {
274 if !self.config.http_enabled {
275 return Err(HttpClientError {
276 code: "denied".into(),
277 message: "http client disabled by policy".into(),
278 });
279 }
280
281 let mut mock_state = None;
282 let raw_body = req.body.clone();
283 if let Some(mock) = &self.mocks
284 && let Ok(meta) = HttpMockRequest::new(&req.method, &req.url, raw_body.as_deref())
285 {
286 match mock.http_begin(&meta) {
287 HttpDecision::Mock(response) => {
288 let headers = response
289 .headers
290 .iter()
291 .map(|(k, v)| (k.clone(), v.clone()))
292 .collect();
293 return Ok(HttpResponse {
294 status: response.status,
295 headers,
296 body: response.body.clone().map(|b| b.into_bytes()),
297 });
298 }
299 HttpDecision::Deny(reason) => {
300 return Err(HttpClientError {
301 code: "denied".into(),
302 message: reason,
303 });
304 }
305 HttpDecision::Passthrough { record } => {
306 mock_state = Some((meta, record));
307 }
308 }
309 }
310
311 let method = req.method.parse().unwrap_or(reqwest::Method::GET);
312 let mut builder = self.http_client.request(method, &req.url);
313 for (key, value) in req.headers {
314 if let Ok(header) = reqwest::header::HeaderName::from_bytes(key.as_bytes())
315 && let Ok(header_value) = reqwest::header::HeaderValue::from_str(&value)
316 {
317 builder = builder.header(header, header_value);
318 }
319 }
320
321 if let Some(body) = raw_body.clone() {
322 builder = builder.body(body);
323 }
324
325 if let Some(opts) = opts {
326 if let Some(timeout_ms) = opts.timeout_ms {
327 builder = builder.timeout(Duration::from_millis(timeout_ms as u64));
328 }
329 if opts.allow_insecure == Some(true) {
330 warn!(url = %req.url, "allow-insecure not supported; using default TLS validation");
331 }
332 if let Some(follow_redirects) = opts.follow_redirects
333 && !follow_redirects
334 {
335 warn!(url = %req.url, "follow-redirects=false not supported; using default client behaviour");
336 }
337 }
338
339 let response = match builder.send() {
340 Ok(resp) => resp,
341 Err(err) => {
342 warn!(url = %req.url, error = %err, "http client request failed");
343 return Err(HttpClientError {
344 code: "unavailable".into(),
345 message: err.to_string(),
346 });
347 }
348 };
349
350 let status = response.status().as_u16();
351 let headers_vec = response
352 .headers()
353 .iter()
354 .map(|(k, v)| {
355 (
356 k.as_str().to_string(),
357 v.to_str().unwrap_or_default().to_string(),
358 )
359 })
360 .collect::<Vec<_>>();
361 let body_bytes = response.bytes().ok().map(|b| b.to_vec());
362
363 if let Some((meta, true)) = mock_state.take()
364 && let Some(mock) = &self.mocks
365 {
366 let recorded = HttpMockResponse::new(
367 status,
368 headers_vec.clone().into_iter().collect(),
369 body_bytes
370 .as_ref()
371 .map(|b| String::from_utf8_lossy(b).into_owned()),
372 );
373 mock.http_record(&meta, &recorded);
374 }
375
376 Ok(HttpResponse {
377 status,
378 headers: headers_vec,
379 body: body_bytes,
380 })
381 }
382}
383
384impl SecretsStoreHost for HostState {
385 fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsError> {
386 if provider_core_only::is_enabled() {
387 warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
388 return Err(SecretsError::Denied);
389 }
390 if !self.config.secrets_policy.is_allowed(&key) {
391 return Err(SecretsError::Denied);
392 }
393 if let Some(mock) = &self.mocks
394 && let Some(value) = mock.secrets_lookup(&key)
395 {
396 return Ok(Some(value.into_bytes()));
397 }
398 match read_secret_blocking(&self.secrets, &key) {
399 Ok(bytes) => Ok(Some(bytes)),
400 Err(err) => {
401 warn!(secret = %key, error = %err, "secret lookup failed");
402 Err(SecretsError::NotFound)
403 }
404 }
405 }
406}
407
408impl HttpClientHost for HostState {
409 fn send(
410 &mut self,
411 req: HttpRequest,
412 ctx: Option<HttpTenantCtx>,
413 ) -> Result<HttpResponse, HttpClientError> {
414 self.send_http_request(req, None, ctx)
415 }
416}
417
418impl HttpClientHostV1_1 for HostState {
419 fn send(
420 &mut self,
421 req: HttpRequestV1_1,
422 opts: Option<HttpRequestOptionsV1_1>,
423 ctx: Option<HttpTenantCtxV1_1>,
424 ) -> Result<HttpResponseV1_1, HttpClientErrorV1_1> {
425 let legacy_req = HttpRequest {
426 method: req.method,
427 url: req.url,
428 headers: req.headers,
429 body: req.body,
430 };
431 let legacy_ctx = ctx.map(|ctx| HttpTenantCtx {
432 env: ctx.env,
433 tenant: ctx.tenant,
434 tenant_id: ctx.tenant_id,
435 team: ctx.team,
436 team_id: ctx.team_id,
437 user: ctx.user,
438 user_id: ctx.user_id,
439 trace_id: ctx.trace_id,
440 correlation_id: ctx.correlation_id,
441 attributes: ctx.attributes,
442 session_id: ctx.session_id,
443 flow_id: ctx.flow_id,
444 node_id: ctx.node_id,
445 provider_id: ctx.provider_id,
446 deadline_ms: ctx.deadline_ms,
447 attempt: ctx.attempt,
448 idempotency_key: ctx.idempotency_key,
449 impersonation: ctx
450 .impersonation
451 .map(|ImpersonationV1_1 { actor_id, reason }| ImpersonationV1_0 {
452 actor_id,
453 reason,
454 }),
455 });
456
457 self.send_http_request(legacy_req, opts, legacy_ctx)
458 .map(|resp| HttpResponseV1_1 {
459 status: resp.status,
460 headers: resp.headers,
461 body: resp.body,
462 })
463 .map_err(|err| HttpClientErrorV1_1 {
464 code: err.code,
465 message: err.message,
466 })
467 }
468}
469
470impl StateStoreHost for HostState {
471 fn read(
472 &mut self,
473 key: HostStateKey,
474 ctx: Option<StateTenantCtx>,
475 ) -> Result<Vec<u8>, StateError> {
476 let store = match self.state_store.as_ref() {
477 Some(store) => store.clone(),
478 None => {
479 return Err(StateError {
480 code: "unavailable".into(),
481 message: "state store not configured".into(),
482 });
483 }
484 };
485 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
486 Ok(ctx) => ctx,
487 Err(err) => {
488 return Err(StateError {
489 code: "invalid-ctx".into(),
490 message: err.to_string(),
491 });
492 }
493 };
494 let key = StoreStateKey::from(key);
495 match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
496 Ok(Some(value)) => Ok(serde_json::to_vec(&value).unwrap_or_else(|_| Vec::new())),
497 Ok(None) => Err(StateError {
498 code: "not_found".into(),
499 message: "state key not found".into(),
500 }),
501 Err(err) => Err(StateError {
502 code: "internal".into(),
503 message: err.to_string(),
504 }),
505 }
506 }
507
508 fn write(
509 &mut self,
510 key: HostStateKey,
511 bytes: Vec<u8>,
512 ctx: Option<StateTenantCtx>,
513 ) -> Result<StateOpAck, StateError> {
514 let store = match self.state_store.as_ref() {
515 Some(store) => store.clone(),
516 None => {
517 return Err(StateError {
518 code: "unavailable".into(),
519 message: "state store not configured".into(),
520 });
521 }
522 };
523 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
524 Ok(ctx) => ctx,
525 Err(err) => {
526 return Err(StateError {
527 code: "invalid-ctx".into(),
528 message: err.to_string(),
529 });
530 }
531 };
532 let key = StoreStateKey::from(key);
533 let value = serde_json::from_slice(&bytes)
534 .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()));
535 match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
536 Ok(()) => Ok(StateOpAck::Ok),
537 Err(err) => Err(StateError {
538 code: "internal".into(),
539 message: err.to_string(),
540 }),
541 }
542 }
543
544 fn delete(
545 &mut self,
546 key: HostStateKey,
547 ctx: Option<StateTenantCtx>,
548 ) -> Result<StateOpAck, StateError> {
549 let store = match self.state_store.as_ref() {
550 Some(store) => store.clone(),
551 None => {
552 return Err(StateError {
553 code: "unavailable".into(),
554 message: "state store not configured".into(),
555 });
556 }
557 };
558 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
559 Ok(ctx) => ctx,
560 Err(err) => {
561 return Err(StateError {
562 code: "invalid-ctx".into(),
563 message: err.to_string(),
564 });
565 }
566 };
567 let key = StoreStateKey::from(key);
568 match store.del(&tenant_ctx, STATE_PREFIX, &key) {
569 Ok(_) => Ok(StateOpAck::Ok),
570 Err(err) => Err(StateError {
571 code: "internal".into(),
572 message: err.to_string(),
573 }),
574 }
575 }
576}
577
578impl TelemetryLoggerHost for HostState {
579 fn log(
580 &mut self,
581 span: TelemetrySpanContext,
582 fields: Vec<(String, String)>,
583 _ctx: Option<TelemetryTenantCtx>,
584 ) -> Result<TelemetryAck, TelemetryError> {
585 if let Some(mock) = &self.mocks
586 && mock.telemetry_drain(&[("span_json", span.flow_id.as_str())])
587 {
588 return Ok(TelemetryAck::Ok);
589 }
590 let mut map = serde_json::Map::new();
591 for (k, v) in fields {
592 map.insert(k, Value::String(v));
593 }
594 tracing::info!(
595 tenant = %span.tenant,
596 flow_id = %span.flow_id,
597 node = ?span.node_id,
598 provider = %span.provider,
599 fields = %serde_json::Value::Object(map.clone()),
600 "telemetry log from pack"
601 );
602 Ok(TelemetryAck::Ok)
603 }
604}
605
606impl RunnerHostHttp for HostState {
607 fn request(
608 &mut self,
609 method: String,
610 url: String,
611 headers: Vec<String>,
612 body: Option<Vec<u8>>,
613 ) -> Result<Vec<u8>, String> {
614 let req = HttpRequest {
615 method,
616 url,
617 headers: headers
618 .chunks(2)
619 .filter_map(|chunk| {
620 if chunk.len() == 2 {
621 Some((chunk[0].clone(), chunk[1].clone()))
622 } else {
623 None
624 }
625 })
626 .collect(),
627 body,
628 };
629 match HttpClientHost::send(self, req, None) {
630 Ok(resp) => Ok(resp.body.unwrap_or_default()),
631 Err(err) => Err(err.message),
632 }
633 }
634}
635
636impl RunnerHostKv for HostState {
637 fn get(&mut self, _ns: String, _key: String) -> Option<String> {
638 None
639 }
640
641 fn put(&mut self, _ns: String, _key: String, _val: String) {}
642}
643
644enum ManifestLoad {
645 New {
646 manifest: Box<greentic_types::PackManifest>,
647 flows: PackFlows,
648 },
649 Legacy {
650 manifest: Box<legacy_pack::PackManifest>,
651 flows: PackFlows,
652 },
653}
654
655fn load_manifest_and_flows(path: &Path) -> Result<ManifestLoad> {
656 let mut archive = ZipArchive::new(File::open(path)?)
657 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
658 let bytes = read_entry(&mut archive, "manifest.cbor")
659 .with_context(|| format!("missing manifest.cbor in {}", path.display()))?;
660 match decode_pack_manifest(&bytes) {
661 Ok(manifest) => {
662 let cache = PackFlows::from_manifest(manifest.clone());
663 Ok(ManifestLoad::New {
664 manifest: Box::new(manifest),
665 flows: cache,
666 })
667 }
668 Err(err) => {
669 tracing::debug!(error = %err, pack = %path.display(), "decode_pack_manifest failed; trying legacy manifest");
670 let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
672 .context("failed to decode legacy pack manifest from manifest.cbor")?;
673 let flows = load_legacy_flows(&mut archive, &legacy)?;
674 Ok(ManifestLoad::Legacy {
675 manifest: Box::new(legacy),
676 flows,
677 })
678 }
679 }
680}
681
682fn load_manifest_and_flows_from_dir(root: &Path) -> Result<ManifestLoad> {
683 let manifest_path = root.join("manifest.cbor");
684 let bytes = std::fs::read(&manifest_path)
685 .with_context(|| format!("missing manifest.cbor in {}", root.display()))?;
686 match decode_pack_manifest(&bytes) {
687 Ok(manifest) => {
688 let cache = PackFlows::from_manifest(manifest.clone());
689 Ok(ManifestLoad::New {
690 manifest: Box::new(manifest),
691 flows: cache,
692 })
693 }
694 Err(err) => {
695 tracing::debug!(
696 error = %err,
697 pack = %root.display(),
698 "decode_pack_manifest failed for materialized pack; trying legacy manifest"
699 );
700 let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
701 .context("failed to decode legacy pack manifest from manifest.cbor")?;
702 let flows = load_legacy_flows_from_dir(root, &legacy)?;
703 Ok(ManifestLoad::Legacy {
704 manifest: Box::new(legacy),
705 flows,
706 })
707 }
708 }
709}
710
711fn load_legacy_flows(
712 archive: &mut ZipArchive<File>,
713 manifest: &legacy_pack::PackManifest,
714) -> Result<PackFlows> {
715 let mut flows = HashMap::new();
716 let mut descriptors = Vec::new();
717
718 for entry in &manifest.flows {
719 let bytes = read_entry(archive, &entry.file_json)
720 .with_context(|| format!("missing flow json {}", entry.file_json))?;
721 let doc: FlowDoc = serde_json::from_slice(&bytes)
722 .with_context(|| format!("failed to decode flow doc {}", entry.file_json))?;
723 let normalized = normalize_flow_doc(doc);
724 let flow_ir = flow_doc_to_ir(normalized)?;
725 let flow = flow_ir_to_flow(flow_ir)?;
726
727 descriptors.push(FlowDescriptor {
728 id: entry.id.clone(),
729 flow_type: entry.kind.clone(),
730 profile: manifest.meta.pack_id.clone(),
731 version: manifest.meta.version.to_string(),
732 description: None,
733 });
734 flows.insert(entry.id.clone(), flow);
735 }
736
737 let mut entry_flows = manifest.meta.entry_flows.clone();
738 if entry_flows.is_empty() {
739 entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
740 }
741 let metadata = PackMetadata {
742 pack_id: manifest.meta.pack_id.clone(),
743 version: manifest.meta.version.to_string(),
744 entry_flows,
745 secret_requirements: Vec::new(),
746 };
747
748 Ok(PackFlows {
749 descriptors,
750 flows,
751 metadata,
752 })
753}
754
755fn load_legacy_flows_from_dir(
756 root: &Path,
757 manifest: &legacy_pack::PackManifest,
758) -> Result<PackFlows> {
759 let mut flows = HashMap::new();
760 let mut descriptors = Vec::new();
761
762 for entry in &manifest.flows {
763 let path = root.join(&entry.file_json);
764 let bytes = std::fs::read(&path)
765 .with_context(|| format!("missing flow json {}", path.display()))?;
766 let doc: FlowDoc = serde_json::from_slice(&bytes)
767 .with_context(|| format!("failed to decode flow doc {}", path.display()))?;
768 let normalized = normalize_flow_doc(doc);
769 let flow_ir = flow_doc_to_ir(normalized)?;
770 let flow = flow_ir_to_flow(flow_ir)?;
771
772 descriptors.push(FlowDescriptor {
773 id: entry.id.clone(),
774 flow_type: entry.kind.clone(),
775 profile: manifest.meta.pack_id.clone(),
776 version: manifest.meta.version.to_string(),
777 description: None,
778 });
779 flows.insert(entry.id.clone(), flow);
780 }
781
782 let mut entry_flows = manifest.meta.entry_flows.clone();
783 if entry_flows.is_empty() {
784 entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
785 }
786 let metadata = PackMetadata {
787 pack_id: manifest.meta.pack_id.clone(),
788 version: manifest.meta.version.to_string(),
789 entry_flows,
790 secret_requirements: Vec::new(),
791 };
792
793 Ok(PackFlows {
794 descriptors,
795 flows,
796 metadata,
797 })
798}
799
800pub struct ComponentState {
801 pub host: HostState,
802 wasi_ctx: WasiCtx,
803 resource_table: ResourceTable,
804}
805
806impl ComponentState {
807 pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
808 let wasi_ctx = policy
809 .instantiate()
810 .context("failed to build WASI context")?;
811 Ok(Self {
812 host,
813 wasi_ctx,
814 resource_table: ResourceTable::new(),
815 })
816 }
817
818 fn host_mut(&mut self) -> &mut HostState {
819 &mut self.host
820 }
821}
822
823impl control::Host for ComponentState {
824 fn should_cancel(&mut self) -> bool {
825 false
826 }
827
828 fn yield_now(&mut self) {
829 }
831}
832
833fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
834 let mut inst = linker.instance("greentic:component/control@0.4.0")?;
835 inst.func_wrap(
836 "should-cancel",
837 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
838 let host = caller.data_mut();
839 Ok((ComponentControlHost::should_cancel(host),))
840 },
841 )?;
842 inst.func_wrap(
843 "yield-now",
844 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
845 let host = caller.data_mut();
846 ComponentControlHost::yield_now(host);
847 Ok(())
848 },
849 )?;
850 Ok(())
851}
852
853pub fn register_all(linker: &mut Linker<ComponentState>) -> Result<()> {
854 add_wasi_to_linker(linker)?;
855 add_all_v1_to_linker(
856 linker,
857 HostFns {
858 http_client_v1_1: Some(|state| state.host_mut()),
859 http_client: Some(|state| state.host_mut()),
860 oauth_broker: None,
861 runner_host_http: Some(|state| state.host_mut()),
862 runner_host_kv: Some(|state| state.host_mut()),
863 telemetry_logger: Some(|state| state.host_mut()),
864 state_store: Some(|state| state.host_mut()),
865 secrets_store: Some(|state| state.host_mut()),
866 },
867 )?;
868 Ok(())
869}
870
871impl OAuthHostContext for ComponentState {
872 fn tenant_id(&self) -> &str {
873 &self.host.config.tenant
874 }
875
876 fn env(&self) -> &str {
877 &self.host.default_env
878 }
879
880 fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
881 &mut self.host.oauth_host
882 }
883
884 fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
885 self.host.oauth_config.as_ref()
886 }
887}
888
889impl WasiView for ComponentState {
890 fn ctx(&mut self) -> WasiCtxView<'_> {
891 WasiCtxView {
892 ctx: &mut self.wasi_ctx,
893 table: &mut self.resource_table,
894 }
895 }
896}
897
898#[allow(unsafe_code)]
899unsafe impl Send for ComponentState {}
900#[allow(unsafe_code)]
901unsafe impl Sync for ComponentState {}
902
903impl PackRuntime {
904 #[allow(clippy::too_many_arguments)]
905 pub async fn load(
906 path: impl AsRef<Path>,
907 config: Arc<HostConfig>,
908 mocks: Option<Arc<MockLayer>>,
909 archive_source: Option<&Path>,
910 session_store: Option<DynSessionStore>,
911 state_store: Option<DynStateStore>,
912 wasi_policy: Arc<RunnerWasiPolicy>,
913 secrets: DynSecretsManager,
914 oauth_config: Option<OAuthBrokerConfig>,
915 verify_archive: bool,
916 component_resolution: ComponentResolution,
917 ) -> Result<Self> {
918 let path = path.as_ref();
919 let (_pack_root, safe_path) = normalize_pack_path(path)?;
920 let path_meta = std::fs::metadata(&safe_path).ok();
921 let is_dir = path_meta
922 .as_ref()
923 .map(|meta| meta.is_dir())
924 .unwrap_or(false);
925 let is_component = !is_dir
926 && safe_path
927 .extension()
928 .and_then(|ext| ext.to_str())
929 .map(|ext| ext.eq_ignore_ascii_case("wasm"))
930 .unwrap_or(false);
931 let archive_hint_path = if let Some(source) = archive_source {
932 let (_, normalized) = normalize_pack_path(source)?;
933 Some(normalized)
934 } else if is_component || is_dir {
935 None
936 } else {
937 Some(safe_path.clone())
938 };
939 let archive_hint = archive_hint_path.as_deref();
940 if verify_archive {
941 if let Some(verify_target) = archive_hint.and_then(|p| {
942 std::fs::metadata(p)
943 .ok()
944 .filter(|meta| meta.is_file())
945 .map(|_| p)
946 }) {
947 verify::verify_pack(verify_target).await?;
948 tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
949 } else {
950 tracing::debug!("skipping archive verification (no archive source)");
951 }
952 }
953 let engine = Engine::default();
954 let mut metadata = PackMetadata::fallback(&safe_path);
955 let mut manifest = None;
956 let mut legacy_manifest: Option<Box<legacy_pack::PackManifest>> = None;
957 let mut flows = None;
958 let materialized_root = component_resolution.materialized_root.clone().or_else(|| {
959 if is_dir {
960 Some(safe_path.clone())
961 } else {
962 None
963 }
964 });
965
966 if let Some(root) = materialized_root.as_ref() {
967 match load_manifest_and_flows_from_dir(root) {
968 Ok(ManifestLoad::New {
969 manifest: m,
970 flows: cache,
971 }) => {
972 metadata = cache.metadata.clone();
973 manifest = Some(*m);
974 flows = Some(cache);
975 }
976 Ok(ManifestLoad::Legacy {
977 manifest: m,
978 flows: cache,
979 }) => {
980 metadata = cache.metadata.clone();
981 legacy_manifest = Some(m);
982 flows = Some(cache);
983 }
984 Err(err) => {
985 warn!(error = %err, pack = %root.display(), "failed to parse materialized pack manifest");
986 }
987 }
988 }
989
990 if manifest.is_none()
991 && legacy_manifest.is_none()
992 && let Some(archive_path) = archive_hint
993 {
994 match load_manifest_and_flows(archive_path) {
995 Ok(ManifestLoad::New {
996 manifest: m,
997 flows: cache,
998 }) => {
999 metadata = cache.metadata.clone();
1000 manifest = Some(*m);
1001 flows = Some(cache);
1002 }
1003 Ok(ManifestLoad::Legacy {
1004 manifest: m,
1005 flows: cache,
1006 }) => {
1007 metadata = cache.metadata.clone();
1008 legacy_manifest = Some(m);
1009 flows = Some(cache);
1010 }
1011 Err(err) => {
1012 warn!(error = %err, pack = %archive_path.display(), "failed to parse pack manifest; skipping flows");
1013 }
1014 }
1015 }
1016 let components = if is_component {
1017 let wasm_bytes = fs::read(&safe_path).await?;
1018 metadata = PackMetadata::from_wasm(&wasm_bytes)
1019 .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
1020 let name = safe_path
1021 .file_stem()
1022 .map(|s| s.to_string_lossy().to_string())
1023 .unwrap_or_else(|| "component".to_string());
1024 let component = Component::from_binary(&engine, &wasm_bytes)?;
1025 let mut map = HashMap::new();
1026 map.insert(
1027 name.clone(),
1028 PackComponent {
1029 name,
1030 version: metadata.version.clone(),
1031 component,
1032 },
1033 );
1034 map
1035 } else {
1036 let specs = component_specs(manifest.as_ref(), legacy_manifest.as_deref());
1037 if specs.is_empty() {
1038 HashMap::new()
1039 } else {
1040 let mut loaded = HashMap::new();
1041 let mut missing: HashSet<String> =
1042 specs.iter().map(|spec| spec.id.clone()).collect();
1043 let mut searched = Vec::new();
1044
1045 if !component_resolution.overrides.is_empty() {
1046 load_components_from_overrides(
1047 &engine,
1048 &component_resolution.overrides,
1049 &specs,
1050 &mut missing,
1051 &mut loaded,
1052 )?;
1053 searched.push("override map".to_string());
1054 }
1055
1056 if let Some(root) = materialized_root.as_ref() {
1057 load_components_from_dir(&engine, root, &specs, &mut missing, &mut loaded)?;
1058 searched.push(format!("components dir {}", root.display()));
1059 }
1060
1061 if let Some(archive_path) = archive_hint {
1062 load_components_from_archive(
1063 &engine,
1064 archive_path,
1065 &specs,
1066 &mut missing,
1067 &mut loaded,
1068 )?;
1069 searched.push(format!("archive {}", archive_path.display()));
1070 }
1071
1072 if !missing.is_empty() {
1073 let missing_list = missing.into_iter().collect::<Vec<_>>().join(", ");
1074 let sources = if searched.is_empty() {
1075 "no component sources".to_string()
1076 } else {
1077 searched.join(", ")
1078 };
1079 bail!(
1080 "components missing: {}; looked in {}",
1081 missing_list,
1082 sources
1083 );
1084 }
1085
1086 loaded
1087 }
1088 };
1089 let http_client = Arc::clone(&HTTP_CLIENT);
1090 Ok(Self {
1091 path: safe_path,
1092 archive_path: archive_hint.map(Path::to_path_buf),
1093 config,
1094 engine,
1095 metadata,
1096 manifest,
1097 legacy_manifest,
1098 mocks,
1099 flows,
1100 components,
1101 http_client,
1102 pre_cache: Mutex::new(HashMap::new()),
1103 session_store,
1104 state_store,
1105 wasi_policy,
1106 provider_registry: RwLock::new(None),
1107 secrets,
1108 oauth_config,
1109 })
1110 }
1111
1112 pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
1113 if let Some(cache) = &self.flows {
1114 return Ok(cache.descriptors.clone());
1115 }
1116 if let Some(manifest) = &self.manifest {
1117 let descriptors = manifest
1118 .flows
1119 .iter()
1120 .map(|flow| FlowDescriptor {
1121 id: flow.id.as_str().to_string(),
1122 flow_type: flow_kind_to_str(flow.kind).to_string(),
1123 profile: manifest.pack_id.as_str().to_string(),
1124 version: manifest.version.to_string(),
1125 description: None,
1126 })
1127 .collect();
1128 return Ok(descriptors);
1129 }
1130 Ok(Vec::new())
1131 }
1132
1133 #[allow(dead_code)]
1134 pub async fn run_flow(
1135 &self,
1136 flow_id: &str,
1137 input: serde_json::Value,
1138 ) -> Result<serde_json::Value> {
1139 let pack = Arc::new(
1140 PackRuntime::load(
1141 &self.path,
1142 Arc::clone(&self.config),
1143 self.mocks.clone(),
1144 self.archive_path.as_deref(),
1145 self.session_store.clone(),
1146 self.state_store.clone(),
1147 Arc::clone(&self.wasi_policy),
1148 self.secrets.clone(),
1149 self.oauth_config.clone(),
1150 false,
1151 ComponentResolution::default(),
1152 )
1153 .await?,
1154 );
1155
1156 let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&self.config)).await?;
1157 let retry_config = self.config.retry_config().into();
1158 let mocks = pack.mocks.as_deref();
1159 let tenant = self.config.tenant.as_str();
1160
1161 let ctx = FlowContext {
1162 tenant,
1163 flow_id,
1164 node_id: None,
1165 tool: None,
1166 action: None,
1167 session_id: None,
1168 provider_id: None,
1169 retry_config,
1170 observer: None,
1171 mocks,
1172 };
1173
1174 let execution = engine.execute(ctx, input).await?;
1175 match execution.status {
1176 FlowStatus::Completed => Ok(execution.output),
1177 FlowStatus::Waiting(wait) => Ok(serde_json::json!({
1178 "status": "pending",
1179 "reason": wait.reason,
1180 "resume": wait.snapshot,
1181 "response": execution.output,
1182 })),
1183 }
1184 }
1185
1186 pub async fn invoke_component(
1187 &self,
1188 component_ref: &str,
1189 ctx: ComponentExecCtx,
1190 operation: &str,
1191 _config_json: Option<String>,
1192 input_json: String,
1193 ) -> Result<Value> {
1194 let pack_component = self
1195 .components
1196 .get(component_ref)
1197 .with_context(|| format!("component '{component_ref}' not found in pack"))?;
1198
1199 let mut linker = Linker::new(&self.engine);
1200 register_all(&mut linker)?;
1201 add_component_control_to_linker(&mut linker)?;
1202 let pre_instance = linker.instantiate_pre(&pack_component.component)?;
1203 let pre: ComponentPre<ComponentState> = ComponentPre::new(pre_instance)?;
1204
1205 let host_state = HostState::new(
1206 Arc::clone(&self.config),
1207 Arc::clone(&self.http_client),
1208 self.mocks.clone(),
1209 self.session_store.clone(),
1210 self.state_store.clone(),
1211 Arc::clone(&self.secrets),
1212 self.oauth_config.clone(),
1213 )?;
1214 let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1215 let mut store = wasmtime::Store::new(&self.engine, store_state);
1216 let bindings: crate::component_api::Component = pre.instantiate_async(&mut store).await?;
1217 let node = bindings.greentic_component_node();
1218
1219 let result = node.call_invoke(&mut store, &ctx, operation, &input_json)?;
1220
1221 match result {
1222 InvokeResult::Ok(body) => {
1223 if body.is_empty() {
1224 return Ok(Value::Null);
1225 }
1226 serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
1227 }
1228 InvokeResult::Err(NodeError {
1229 code,
1230 message,
1231 retryable,
1232 backoff_ms,
1233 details,
1234 }) => {
1235 let mut obj = serde_json::Map::new();
1236 obj.insert("ok".into(), Value::Bool(false));
1237 let mut error = serde_json::Map::new();
1238 error.insert("code".into(), Value::String(code));
1239 error.insert("message".into(), Value::String(message));
1240 error.insert("retryable".into(), Value::Bool(retryable));
1241 if let Some(backoff) = backoff_ms {
1242 error.insert("backoff_ms".into(), Value::Number(backoff.into()));
1243 }
1244 if let Some(details) = details {
1245 error.insert(
1246 "details".into(),
1247 serde_json::from_str(&details).unwrap_or(Value::String(details)),
1248 );
1249 }
1250 obj.insert("error".into(), Value::Object(error));
1251 Ok(Value::Object(obj))
1252 }
1253 }
1254 }
1255
1256 pub fn resolve_provider(
1257 &self,
1258 provider_id: Option<&str>,
1259 provider_type: Option<&str>,
1260 ) -> Result<ProviderBinding> {
1261 let registry = self.provider_registry()?;
1262 registry.resolve(provider_id, provider_type)
1263 }
1264
1265 pub async fn invoke_provider(
1266 &self,
1267 binding: &ProviderBinding,
1268 _ctx: ComponentExecCtx,
1269 op: &str,
1270 input_json: Vec<u8>,
1271 ) -> Result<Value> {
1272 let component_ref = &binding.component_ref;
1273 let pack_component = self
1274 .components
1275 .get(component_ref)
1276 .with_context(|| format!("provider component '{component_ref}' not found in pack"))?;
1277
1278 let mut linker = Linker::new(&self.engine);
1279 register_all(&mut linker)?;
1280 add_component_control_to_linker(&mut linker)?;
1281 let pre_instance = linker.instantiate_pre(&pack_component.component)?;
1282 let pre: ProviderComponentPre<ComponentState> = ProviderComponentPre::new(pre_instance)?;
1283
1284 let host_state = HostState::new(
1285 Arc::clone(&self.config),
1286 Arc::clone(&self.http_client),
1287 self.mocks.clone(),
1288 self.session_store.clone(),
1289 self.state_store.clone(),
1290 Arc::clone(&self.secrets),
1291 self.oauth_config.clone(),
1292 )?;
1293 let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1294 let mut store = wasmtime::Store::new(&self.engine, store_state);
1295 let bindings: crate::provider_core::SchemaCore = pre.instantiate_async(&mut store).await?;
1296 let provider = bindings.greentic_provider_core_schema_core_api();
1297
1298 let result = provider.call_invoke(&mut store, op, &input_json)?;
1299 deserialize_json_bytes(result)
1300 }
1301
1302 fn provider_registry(&self) -> Result<ProviderRegistry> {
1303 if let Some(registry) = self.provider_registry.read().clone() {
1304 return Ok(registry);
1305 }
1306 let manifest = self
1307 .manifest
1308 .as_ref()
1309 .context("pack manifest required for provider resolution")?;
1310 let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
1311 let registry = ProviderRegistry::new(
1312 manifest,
1313 self.state_store.clone(),
1314 &self.config.tenant,
1315 &env,
1316 )?;
1317 *self.provider_registry.write() = Some(registry.clone());
1318 Ok(registry)
1319 }
1320
1321 pub fn load_flow(&self, flow_id: &str) -> Result<Flow> {
1322 if let Some(cache) = &self.flows {
1323 return cache
1324 .flows
1325 .get(flow_id)
1326 .cloned()
1327 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
1328 }
1329 if let Some(manifest) = &self.manifest {
1330 let entry = manifest
1331 .flows
1332 .iter()
1333 .find(|f| f.id.as_str() == flow_id)
1334 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
1335 return Ok(entry.flow.clone());
1336 }
1337 bail!("flow '{flow_id}' not available (pack exports disabled)")
1338 }
1339
1340 pub fn metadata(&self) -> &PackMetadata {
1341 &self.metadata
1342 }
1343
1344 pub fn required_secrets(&self) -> &[greentic_types::SecretRequirement] {
1345 &self.metadata.secret_requirements
1346 }
1347
1348 pub fn missing_secrets(
1349 &self,
1350 tenant_ctx: &TypesTenantCtx,
1351 ) -> Vec<greentic_types::SecretRequirement> {
1352 let env = tenant_ctx.env.as_str().to_string();
1353 let tenant = tenant_ctx.tenant.as_str().to_string();
1354 let team = tenant_ctx.team.as_ref().map(|t| t.as_str().to_string());
1355 self.required_secrets()
1356 .iter()
1357 .filter(|req| {
1358 if let Some(scope) = &req.scope {
1360 if scope.env != env {
1361 return false;
1362 }
1363 if scope.tenant != tenant {
1364 return false;
1365 }
1366 if let Some(ref team_req) = scope.team
1367 && team.as_ref() != Some(team_req)
1368 {
1369 return false;
1370 }
1371 }
1372 read_secret_blocking(&self.secrets, req.key.as_str()).is_err()
1373 })
1374 .cloned()
1375 .collect()
1376 }
1377
1378 pub fn for_component_test(
1379 components: Vec<(String, PathBuf)>,
1380 flows: HashMap<String, FlowIR>,
1381 config: Arc<HostConfig>,
1382 ) -> Result<Self> {
1383 let engine = Engine::default();
1384 let mut component_map = HashMap::new();
1385 for (name, path) in components {
1386 if !path.exists() {
1387 bail!("component artifact missing: {}", path.display());
1388 }
1389 let wasm_bytes = std::fs::read(&path)?;
1390 let component = Component::from_binary(&engine, &wasm_bytes)
1391 .with_context(|| format!("failed to compile component {}", path.display()))?;
1392 component_map.insert(
1393 name.clone(),
1394 PackComponent {
1395 name,
1396 version: "0.0.0".into(),
1397 component,
1398 },
1399 );
1400 }
1401
1402 let mut flow_map = HashMap::new();
1403 let mut descriptors = Vec::new();
1404 for (id, ir) in flows {
1405 let flow_type = ir.flow_type.clone();
1406 let flow = flow_ir_to_flow(ir)?;
1407 flow_map.insert(id.clone(), flow);
1408 descriptors.push(FlowDescriptor {
1409 id: id.clone(),
1410 flow_type,
1411 profile: "test".into(),
1412 version: "0.0.0".into(),
1413 description: None,
1414 });
1415 }
1416 let flows_cache = PackFlows {
1417 descriptors: descriptors.clone(),
1418 flows: flow_map,
1419 metadata: PackMetadata::fallback(Path::new("component-test")),
1420 };
1421
1422 Ok(Self {
1423 path: PathBuf::new(),
1424 archive_path: None,
1425 config,
1426 engine,
1427 metadata: PackMetadata::fallback(Path::new("component-test")),
1428 manifest: None,
1429 legacy_manifest: None,
1430 mocks: None,
1431 flows: Some(flows_cache),
1432 components: component_map,
1433 http_client: Arc::clone(&HTTP_CLIENT),
1434 pre_cache: Mutex::new(HashMap::new()),
1435 session_store: None,
1436 state_store: None,
1437 wasi_policy: Arc::new(RunnerWasiPolicy::new()),
1438 provider_registry: RwLock::new(None),
1439 secrets: crate::secrets::default_manager(),
1440 oauth_config: None,
1441 })
1442 }
1443}
1444
1445struct PackFlows {
1446 descriptors: Vec<FlowDescriptor>,
1447 flows: HashMap<String, Flow>,
1448 metadata: PackMetadata,
1449}
1450
1451fn deserialize_json_bytes(bytes: Vec<u8>) -> Result<Value> {
1452 if bytes.is_empty() {
1453 return Ok(Value::Null);
1454 }
1455 serde_json::from_slice(&bytes).or_else(|_| {
1456 String::from_utf8(bytes)
1457 .map(Value::String)
1458 .map_err(|err| anyhow!(err))
1459 })
1460}
1461
1462impl PackFlows {
1463 fn from_manifest(manifest: greentic_types::PackManifest) -> Self {
1464 let descriptors = manifest
1465 .flows
1466 .iter()
1467 .map(|entry| FlowDescriptor {
1468 id: entry.id.as_str().to_string(),
1469 flow_type: flow_kind_to_str(entry.kind).to_string(),
1470 profile: manifest.pack_id.as_str().to_string(),
1471 version: manifest.version.to_string(),
1472 description: None,
1473 })
1474 .collect();
1475 let mut flows = HashMap::new();
1476 for entry in &manifest.flows {
1477 flows.insert(entry.id.as_str().to_string(), entry.flow.clone());
1478 }
1479 Self {
1480 metadata: PackMetadata::from_manifest(&manifest),
1481 descriptors,
1482 flows,
1483 }
1484 }
1485}
1486
1487fn flow_kind_to_str(kind: greentic_types::FlowKind) -> &'static str {
1488 match kind {
1489 greentic_types::FlowKind::Messaging => "messaging",
1490 greentic_types::FlowKind::Event => "event",
1491 greentic_types::FlowKind::ComponentConfig => "component-config",
1492 greentic_types::FlowKind::Job => "job",
1493 greentic_types::FlowKind::Http => "http",
1494 }
1495}
1496
1497fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
1498 let mut file = archive
1499 .by_name(name)
1500 .with_context(|| format!("entry {name} missing from archive"))?;
1501 let mut buf = Vec::new();
1502 file.read_to_end(&mut buf)?;
1503 Ok(buf)
1504}
1505
1506fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
1507 for node in doc.nodes.values_mut() {
1508 if node.component.is_empty()
1509 && let Some((component_ref, payload)) = node.raw.iter().next()
1510 {
1511 if component_ref.starts_with("emit.") {
1512 node.component = component_ref.clone();
1513 node.payload = payload.clone();
1514 node.raw.clear();
1515 continue;
1516 }
1517 let (target_component, operation, input, config) =
1518 infer_component_exec(payload, component_ref);
1519 let mut payload_obj = serde_json::Map::new();
1520 payload_obj.insert("component".into(), Value::String(target_component));
1522 payload_obj.insert("operation".into(), Value::String(operation));
1523 payload_obj.insert("input".into(), input);
1524 if let Some(cfg) = config {
1525 payload_obj.insert("config".into(), cfg);
1526 }
1527 node.component = "component.exec".to_string();
1528 node.payload = Value::Object(payload_obj);
1529 }
1530 }
1531 doc
1532}
1533
1534fn infer_component_exec(
1535 payload: &Value,
1536 component_ref: &str,
1537) -> (String, String, Value, Option<Value>) {
1538 let default_op = if component_ref.starts_with("templating.") {
1539 "render"
1540 } else {
1541 "invoke"
1542 }
1543 .to_string();
1544
1545 if let Value::Object(map) = payload {
1546 let op = map
1547 .get("op")
1548 .or_else(|| map.get("operation"))
1549 .and_then(Value::as_str)
1550 .map(|s| s.to_string())
1551 .unwrap_or_else(|| default_op.clone());
1552
1553 let mut input = map.clone();
1554 let config = input.remove("config");
1555 let component = input
1556 .get("component")
1557 .or_else(|| input.get("component_ref"))
1558 .and_then(Value::as_str)
1559 .map(|s| s.to_string())
1560 .unwrap_or_else(|| component_ref.to_string());
1561 input.remove("component");
1562 input.remove("component_ref");
1563 input.remove("op");
1564 input.remove("operation");
1565 return (component, op, Value::Object(input), config);
1566 }
1567
1568 (component_ref.to_string(), default_op, payload.clone(), None)
1569}
1570
1571#[derive(Clone, Debug)]
1572struct ComponentSpec {
1573 id: String,
1574 version: String,
1575 legacy_path: Option<String>,
1576}
1577
1578fn component_specs(
1579 manifest: Option<&greentic_types::PackManifest>,
1580 legacy_manifest: Option<&legacy_pack::PackManifest>,
1581) -> Vec<ComponentSpec> {
1582 if let Some(manifest) = manifest {
1583 return manifest
1584 .components
1585 .iter()
1586 .map(|entry| ComponentSpec {
1587 id: entry.id.as_str().to_string(),
1588 version: entry.version.to_string(),
1589 legacy_path: None,
1590 })
1591 .collect();
1592 }
1593 if let Some(legacy_manifest) = legacy_manifest {
1594 return legacy_manifest
1595 .components
1596 .iter()
1597 .map(|entry| ComponentSpec {
1598 id: entry.name.clone(),
1599 version: entry.version.to_string(),
1600 legacy_path: Some(entry.file_wasm.clone()),
1601 })
1602 .collect();
1603 }
1604 Vec::new()
1605}
1606
1607fn component_path_for_spec(root: &Path, spec: &ComponentSpec) -> PathBuf {
1608 if let Some(path) = &spec.legacy_path {
1609 return root.join(path);
1610 }
1611 root.join("components").join(format!("{}.wasm", spec.id))
1612}
1613
1614fn load_components_from_overrides(
1615 engine: &Engine,
1616 overrides: &HashMap<String, PathBuf>,
1617 specs: &[ComponentSpec],
1618 missing: &mut HashSet<String>,
1619 into: &mut HashMap<String, PackComponent>,
1620) -> Result<()> {
1621 for spec in specs {
1622 if !missing.contains(&spec.id) {
1623 continue;
1624 }
1625 let Some(path) = overrides.get(&spec.id) else {
1626 continue;
1627 };
1628 let bytes = std::fs::read(path)
1629 .with_context(|| format!("failed to read override component {}", path.display()))?;
1630 let component = Component::from_binary(engine, &bytes).with_context(|| {
1631 format!(
1632 "failed to compile component {} from override {}",
1633 spec.id,
1634 path.display()
1635 )
1636 })?;
1637 into.insert(
1638 spec.id.clone(),
1639 PackComponent {
1640 name: spec.id.clone(),
1641 version: spec.version.clone(),
1642 component,
1643 },
1644 );
1645 missing.remove(&spec.id);
1646 }
1647 Ok(())
1648}
1649
1650fn load_components_from_dir(
1651 engine: &Engine,
1652 root: &Path,
1653 specs: &[ComponentSpec],
1654 missing: &mut HashSet<String>,
1655 into: &mut HashMap<String, PackComponent>,
1656) -> Result<()> {
1657 for spec in specs {
1658 if !missing.contains(&spec.id) {
1659 continue;
1660 }
1661 let path = component_path_for_spec(root, spec);
1662 if !path.exists() {
1663 tracing::debug!(component = %spec.id, path = %path.display(), "materialized component missing; will try other sources");
1664 continue;
1665 }
1666 let bytes = std::fs::read(&path)
1667 .with_context(|| format!("failed to read component {}", path.display()))?;
1668 let component = Component::from_binary(engine, &bytes).with_context(|| {
1669 format!(
1670 "failed to compile component {} from {}",
1671 spec.id,
1672 path.display()
1673 )
1674 })?;
1675 into.insert(
1676 spec.id.clone(),
1677 PackComponent {
1678 name: spec.id.clone(),
1679 version: spec.version.clone(),
1680 component,
1681 },
1682 );
1683 missing.remove(&spec.id);
1684 }
1685 Ok(())
1686}
1687
1688fn load_components_from_archive(
1689 engine: &Engine,
1690 path: &Path,
1691 specs: &[ComponentSpec],
1692 missing: &mut HashSet<String>,
1693 into: &mut HashMap<String, PackComponent>,
1694) -> Result<()> {
1695 let mut archive = ZipArchive::new(File::open(path)?)
1696 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
1697 for spec in specs {
1698 if !missing.contains(&spec.id) {
1699 continue;
1700 }
1701 let file_name = spec
1702 .legacy_path
1703 .clone()
1704 .unwrap_or_else(|| format!("components/{}.wasm", spec.id));
1705 let bytes = match read_entry(&mut archive, &file_name) {
1706 Ok(bytes) => bytes,
1707 Err(err) => {
1708 warn!(component = %spec.id, pack = %path.display(), error = %err, "component entry missing in pack archive");
1709 continue;
1710 }
1711 };
1712 let component = Component::from_binary(engine, &bytes)
1713 .with_context(|| format!("failed to compile component {}", spec.id))?;
1714 into.insert(
1715 spec.id.clone(),
1716 PackComponent {
1717 name: spec.id.clone(),
1718 version: spec.version.clone(),
1719 component,
1720 },
1721 );
1722 missing.remove(&spec.id);
1723 }
1724 Ok(())
1725}
1726
1727#[cfg(test)]
1728mod tests {
1729 use super::*;
1730 use greentic_flow::model::{FlowDoc, NodeDoc};
1731 use serde_json::json;
1732 use std::collections::BTreeMap;
1733
1734 #[test]
1735 fn normalizes_raw_component_to_component_exec() {
1736 let mut nodes = BTreeMap::new();
1737 let mut raw = BTreeMap::new();
1738 raw.insert(
1739 "templating.handlebars".into(),
1740 json!({ "template": "Hi {{name}}" }),
1741 );
1742 nodes.insert(
1743 "start".into(),
1744 NodeDoc {
1745 raw,
1746 routing: json!([{"out": true}]),
1747 ..Default::default()
1748 },
1749 );
1750 let doc = FlowDoc {
1751 id: "welcome".into(),
1752 title: None,
1753 description: None,
1754 flow_type: "messaging".into(),
1755 start: Some("start".into()),
1756 parameters: json!({}),
1757 tags: Vec::new(),
1758 entrypoints: BTreeMap::new(),
1759 nodes,
1760 };
1761
1762 let normalized = normalize_flow_doc(doc);
1763 let node = normalized.nodes.get("start").expect("node exists");
1764 assert_eq!(node.component, "component.exec");
1765 assert!(node.raw.is_empty() || node.raw.contains_key("templating.handlebars"));
1766 let payload = node.payload.as_object().expect("payload object");
1767 assert_eq!(
1768 payload.get("component"),
1769 Some(&Value::String("templating.handlebars".into()))
1770 );
1771 assert_eq!(
1772 payload.get("operation"),
1773 Some(&Value::String("render".into()))
1774 );
1775 let input = payload.get("input").unwrap();
1776 assert_eq!(input, &json!({ "template": "Hi {{name}}" }));
1777 }
1778}
1779
1780#[derive(Clone, Debug, Default, Serialize, Deserialize)]
1781pub struct PackMetadata {
1782 pub pack_id: String,
1783 pub version: String,
1784 #[serde(default)]
1785 pub entry_flows: Vec<String>,
1786 #[serde(default)]
1787 pub secret_requirements: Vec<greentic_types::SecretRequirement>,
1788}
1789
1790impl PackMetadata {
1791 fn from_wasm(bytes: &[u8]) -> Option<Self> {
1792 let parser = Parser::new(0);
1793 for payload in parser.parse_all(bytes) {
1794 let payload = payload.ok()?;
1795 match payload {
1796 Payload::CustomSection(section) => {
1797 if section.name() == "greentic.manifest"
1798 && let Ok(meta) = Self::from_bytes(section.data())
1799 {
1800 return Some(meta);
1801 }
1802 }
1803 Payload::DataSection(reader) => {
1804 for segment in reader.into_iter().flatten() {
1805 if let Ok(meta) = Self::from_bytes(segment.data) {
1806 return Some(meta);
1807 }
1808 }
1809 }
1810 _ => {}
1811 }
1812 }
1813 None
1814 }
1815
1816 fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
1817 #[derive(Deserialize)]
1818 struct RawManifest {
1819 pack_id: String,
1820 version: String,
1821 #[serde(default)]
1822 entry_flows: Vec<String>,
1823 #[serde(default)]
1824 flows: Vec<RawFlow>,
1825 #[serde(default)]
1826 secret_requirements: Vec<greentic_types::SecretRequirement>,
1827 }
1828
1829 #[derive(Deserialize)]
1830 struct RawFlow {
1831 id: String,
1832 }
1833
1834 let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
1835 let mut entry_flows = if manifest.entry_flows.is_empty() {
1836 manifest.flows.iter().map(|f| f.id.clone()).collect()
1837 } else {
1838 manifest.entry_flows.clone()
1839 };
1840 entry_flows.retain(|id| !id.is_empty());
1841 Ok(Self {
1842 pack_id: manifest.pack_id,
1843 version: manifest.version,
1844 entry_flows,
1845 secret_requirements: manifest.secret_requirements,
1846 })
1847 }
1848
1849 pub fn fallback(path: &Path) -> Self {
1850 let pack_id = path
1851 .file_stem()
1852 .map(|s| s.to_string_lossy().into_owned())
1853 .unwrap_or_else(|| "unknown-pack".to_string());
1854 Self {
1855 pack_id,
1856 version: "0.0.0".to_string(),
1857 entry_flows: Vec::new(),
1858 secret_requirements: Vec::new(),
1859 }
1860 }
1861
1862 pub fn from_manifest(manifest: &greentic_types::PackManifest) -> Self {
1863 let entry_flows = manifest
1864 .flows
1865 .iter()
1866 .map(|flow| flow.id.as_str().to_string())
1867 .collect::<Vec<_>>();
1868 Self {
1869 pack_id: manifest.pack_id.as_str().to_string(),
1870 version: manifest.version.to_string(),
1871 entry_flows,
1872 secret_requirements: manifest.secret_requirements.clone(),
1873 }
1874 }
1875}