1use std::collections::HashMap;
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::component_api::component::greentic::component::control::Host as ComponentControlHost;
10use crate::component_api::{
11 ComponentPre, control, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
12};
13use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
14use crate::provider::{ProviderBinding, ProviderRegistry};
15use crate::provider_core::SchemaCorePre as ProviderComponentPre;
16use crate::provider_core_only;
17use crate::runtime_wasmtime::{Component, Engine, Linker, ResourceTable};
18use anyhow::{Context, Result, anyhow, bail};
19use greentic_interfaces_wasmtime::host_helpers::v1::{
20 self as host_v1, HostFns, add_all_v1_to_linker,
21 messaging_session::{
22 MessagingSessionError as MessagingError, MessagingSessionHost, OpAck as MessagingAck,
23 OutboundMessage, TenantCtx as MessagingTenantCtx,
24 },
25 runner_host_http::RunnerHostHttp,
26 runner_host_kv::RunnerHostKv,
27 secrets_store::{SecretsError, SecretsStoreHost},
28 state_store::{
29 OpAck as StateOpAck, StateKey as HostStateKey, StateStoreError as StateError,
30 StateStoreHost, TenantCtx as StateTenantCtx,
31 },
32 telemetry_logger::{
33 OpAck as TelemetryAck, SpanContext as TelemetrySpanContext,
34 TelemetryLoggerError as TelemetryError, TelemetryLoggerHost,
35 TenantCtx as TelemetryTenantCtx,
36 },
37};
38use greentic_interfaces_wasmtime::http_client_client_v1_0::greentic::interfaces_types::types::Impersonation as ImpersonationV1_0;
39use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::interfaces_types::types::Impersonation as ImpersonationV1_1;
40use greentic_pack::builder as legacy_pack;
41use greentic_types::{
42 EnvId, Flow, StateKey as StoreStateKey, TeamId, TenantCtx as TypesTenantCtx, TenantId, UserId,
43 decode_pack_manifest,
44};
45use host_v1::http_client::{
46 HttpClientError, HttpClientErrorV1_1, HttpClientHost, HttpClientHostV1_1,
47 Request as HttpRequest, RequestOptionsV1_1 as HttpRequestOptionsV1_1,
48 RequestV1_1 as HttpRequestV1_1, Response as HttpResponse, ResponseV1_1 as HttpResponseV1_1,
49 TenantCtx as HttpTenantCtx, TenantCtxV1_1 as HttpTenantCtxV1_1,
50};
51use once_cell::sync::Lazy;
52use parking_lot::{Mutex, RwLock};
53use reqwest::blocking::Client as BlockingClient;
54use runner_core::normalize_under_root;
55use serde::{Deserialize, Serialize};
56use serde_cbor;
57use serde_json::{self, Value};
58use tokio::fs;
59use wasmparser::{Parser, Payload};
60use wasmtime::StoreContextMut;
61use zip::ZipArchive;
62
63use crate::runner::engine::{FlowContext, FlowEngine, FlowStatus};
64use crate::runner::flow_adapter::{FlowIR, flow_doc_to_ir, flow_ir_to_flow};
65use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
66
67use crate::config::HostConfig;
68use crate::secrets::{DynSecretsManager, read_secret_blocking};
69use crate::storage::state::STATE_PREFIX;
70use crate::storage::{DynSessionStore, DynStateStore};
71use crate::verify;
72use crate::wasi::RunnerWasiPolicy;
73use tracing::warn;
74use wasmtime_wasi::p2::add_to_linker_sync as add_wasi_to_linker;
75use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
76
77use greentic_flow::model::FlowDoc;
78
79#[allow(dead_code)]
80pub struct PackRuntime {
81 path: PathBuf,
83 archive_path: Option<PathBuf>,
85 config: Arc<HostConfig>,
86 engine: Engine,
87 metadata: PackMetadata,
88 manifest: Option<greentic_types::PackManifest>,
89 legacy_manifest: Option<Box<legacy_pack::PackManifest>>,
90 mocks: Option<Arc<MockLayer>>,
91 flows: Option<PackFlows>,
92 components: HashMap<String, PackComponent>,
93 http_client: Arc<BlockingClient>,
94 pre_cache: Mutex<HashMap<String, ComponentPre<ComponentState>>>,
95 session_store: Option<DynSessionStore>,
96 state_store: Option<DynStateStore>,
97 wasi_policy: Arc<RunnerWasiPolicy>,
98 provider_registry: RwLock<Option<ProviderRegistry>>,
99 secrets: DynSecretsManager,
100 oauth_config: Option<OAuthBrokerConfig>,
101}
102
103struct PackComponent {
104 #[allow(dead_code)]
105 name: String,
106 #[allow(dead_code)]
107 version: String,
108 component: Component,
109}
110
111fn build_blocking_client() -> BlockingClient {
112 std::thread::spawn(|| {
113 BlockingClient::builder()
114 .no_proxy()
115 .build()
116 .expect("blocking client")
117 })
118 .join()
119 .expect("client build thread panicked")
120}
121
122fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
123 let (root, candidate) = if path.is_absolute() {
124 let parent = path
125 .parent()
126 .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
127 let root = parent
128 .canonicalize()
129 .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
130 let file = path
131 .file_name()
132 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
133 (root, PathBuf::from(file))
134 } else {
135 let cwd = std::env::current_dir().context("failed to resolve current directory")?;
136 let base = if let Some(parent) = path.parent() {
137 cwd.join(parent)
138 } else {
139 cwd
140 };
141 let root = base
142 .canonicalize()
143 .with_context(|| format!("failed to canonicalize {}", base.display()))?;
144 let file = path
145 .file_name()
146 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
147 (root, PathBuf::from(file))
148 };
149 let safe = normalize_under_root(&root, &candidate)?;
150 Ok((root, safe))
151}
152
153static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
154
155#[derive(Debug, Clone, Serialize, Deserialize)]
156pub struct FlowDescriptor {
157 pub id: String,
158 #[serde(rename = "type")]
159 pub flow_type: String,
160 pub profile: String,
161 pub version: String,
162 #[serde(default)]
163 pub description: Option<String>,
164}
165
166pub struct HostState {
167 config: Arc<HostConfig>,
168 http_client: Arc<BlockingClient>,
169 default_env: String,
170 #[allow(dead_code)]
171 session_store: Option<DynSessionStore>,
172 state_store: Option<DynStateStore>,
173 mocks: Option<Arc<MockLayer>>,
174 secrets: DynSecretsManager,
175 oauth_config: Option<OAuthBrokerConfig>,
176 oauth_host: OAuthBrokerHost,
177}
178
179impl HostState {
180 #[allow(clippy::default_constructed_unit_structs)]
181 pub fn new(
182 config: Arc<HostConfig>,
183 http_client: Arc<BlockingClient>,
184 mocks: Option<Arc<MockLayer>>,
185 session_store: Option<DynSessionStore>,
186 state_store: Option<DynStateStore>,
187 secrets: DynSecretsManager,
188 oauth_config: Option<OAuthBrokerConfig>,
189 ) -> Result<Self> {
190 let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
191 Ok(Self {
192 config,
193 http_client,
194 default_env,
195 session_store,
196 state_store,
197 mocks,
198 secrets,
199 oauth_config,
200 oauth_host: OAuthBrokerHost::default(),
201 })
202 }
203
204 pub fn get_secret(&self, key: &str) -> Result<String> {
205 if provider_core_only::is_enabled() {
206 bail!(provider_core_only::blocked_message("secrets"))
207 }
208 if !self.config.secrets_policy.is_allowed(key) {
209 bail!("secret {key} is not permitted by bindings policy");
210 }
211 if let Some(mock) = &self.mocks
212 && let Some(value) = mock.secrets_lookup(key)
213 {
214 return Ok(value);
215 }
216 let bytes = read_secret_blocking(&self.secrets, key)
217 .context("failed to read secret from manager")?;
218 let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
219 Ok(value)
220 }
221
222 fn tenant_ctx_from_v1(&self, ctx: Option<StateTenantCtx>) -> Result<TypesTenantCtx> {
223 let tenant_raw = ctx
224 .as_ref()
225 .map(|ctx| ctx.tenant.clone())
226 .unwrap_or_else(|| self.config.tenant.clone());
227 let env_raw = ctx
228 .as_ref()
229 .map(|ctx| ctx.env.clone())
230 .unwrap_or_else(|| self.default_env.clone());
231 let tenant_id = TenantId::from_str(&tenant_raw)
232 .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
233 let env_id = EnvId::from_str(&env_raw)
234 .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
235 let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
236 if let Some(ctx) = ctx {
237 if let Some(team) = ctx.team.or(ctx.team_id) {
238 let team_id =
239 TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
240 tenant_ctx = tenant_ctx.with_team(Some(team_id));
241 }
242 if let Some(user) = ctx.user.or(ctx.user_id) {
243 let user_id =
244 UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
245 tenant_ctx = tenant_ctx.with_user(Some(user_id));
246 }
247 if let Some(flow) = ctx.flow_id {
248 tenant_ctx = tenant_ctx.with_flow(flow);
249 }
250 if let Some(node) = ctx.node_id {
251 tenant_ctx = tenant_ctx.with_node(node);
252 }
253 if let Some(provider) = ctx.provider_id {
254 tenant_ctx = tenant_ctx.with_provider(provider);
255 }
256 if let Some(session) = ctx.session_id {
257 tenant_ctx = tenant_ctx.with_session(session);
258 }
259 tenant_ctx.trace_id = ctx.trace_id;
260 }
261 Ok(tenant_ctx)
262 }
263
264 fn send_http_request(
265 &mut self,
266 req: HttpRequest,
267 opts: Option<HttpRequestOptionsV1_1>,
268 _ctx: Option<HttpTenantCtx>,
269 ) -> Result<HttpResponse, HttpClientError> {
270 if !self.config.http_enabled {
271 return Err(HttpClientError {
272 code: "denied".into(),
273 message: "http client disabled by policy".into(),
274 });
275 }
276
277 let mut mock_state = None;
278 let raw_body = req.body.clone();
279 if let Some(mock) = &self.mocks
280 && let Ok(meta) = HttpMockRequest::new(&req.method, &req.url, raw_body.as_deref())
281 {
282 match mock.http_begin(&meta) {
283 HttpDecision::Mock(response) => {
284 let headers = response
285 .headers
286 .iter()
287 .map(|(k, v)| (k.clone(), v.clone()))
288 .collect();
289 return Ok(HttpResponse {
290 status: response.status,
291 headers,
292 body: response.body.clone().map(|b| b.into_bytes()),
293 });
294 }
295 HttpDecision::Deny(reason) => {
296 return Err(HttpClientError {
297 code: "denied".into(),
298 message: reason,
299 });
300 }
301 HttpDecision::Passthrough { record } => {
302 mock_state = Some((meta, record));
303 }
304 }
305 }
306
307 let method = req.method.parse().unwrap_or(reqwest::Method::GET);
308 let mut builder = self.http_client.request(method, &req.url);
309 for (key, value) in req.headers {
310 if let Ok(header) = reqwest::header::HeaderName::from_bytes(key.as_bytes())
311 && let Ok(header_value) = reqwest::header::HeaderValue::from_str(&value)
312 {
313 builder = builder.header(header, header_value);
314 }
315 }
316
317 if let Some(body) = raw_body.clone() {
318 builder = builder.body(body);
319 }
320
321 if let Some(opts) = opts {
322 if let Some(timeout_ms) = opts.timeout_ms {
323 builder = builder.timeout(Duration::from_millis(timeout_ms as u64));
324 }
325 if opts.allow_insecure == Some(true) {
326 warn!(url = %req.url, "allow-insecure not supported; using default TLS validation");
327 }
328 if let Some(follow_redirects) = opts.follow_redirects
329 && !follow_redirects
330 {
331 warn!(url = %req.url, "follow-redirects=false not supported; using default client behaviour");
332 }
333 }
334
335 let response = match builder.send() {
336 Ok(resp) => resp,
337 Err(err) => {
338 warn!(url = %req.url, error = %err, "http client request failed");
339 return Err(HttpClientError {
340 code: "unavailable".into(),
341 message: err.to_string(),
342 });
343 }
344 };
345
346 let status = response.status().as_u16();
347 let headers_vec = response
348 .headers()
349 .iter()
350 .map(|(k, v)| {
351 (
352 k.as_str().to_string(),
353 v.to_str().unwrap_or_default().to_string(),
354 )
355 })
356 .collect::<Vec<_>>();
357 let body_bytes = response.bytes().ok().map(|b| b.to_vec());
358
359 if let Some((meta, true)) = mock_state.take()
360 && let Some(mock) = &self.mocks
361 {
362 let recorded = HttpMockResponse::new(
363 status,
364 headers_vec.clone().into_iter().collect(),
365 body_bytes
366 .as_ref()
367 .map(|b| String::from_utf8_lossy(b).into_owned()),
368 );
369 mock.http_record(&meta, &recorded);
370 }
371
372 Ok(HttpResponse {
373 status,
374 headers: headers_vec,
375 body: body_bytes,
376 })
377 }
378}
379
380impl SecretsStoreHost for HostState {
381 fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsError> {
382 if provider_core_only::is_enabled() {
383 warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
384 return Err(SecretsError::Denied);
385 }
386 if !self.config.secrets_policy.is_allowed(&key) {
387 return Err(SecretsError::Denied);
388 }
389 if let Some(mock) = &self.mocks
390 && let Some(value) = mock.secrets_lookup(&key)
391 {
392 return Ok(Some(value.into_bytes()));
393 }
394 match read_secret_blocking(&self.secrets, &key) {
395 Ok(bytes) => Ok(Some(bytes)),
396 Err(err) => {
397 warn!(secret = %key, error = %err, "secret lookup failed");
398 Err(SecretsError::NotFound)
399 }
400 }
401 }
402}
403
404impl HttpClientHost for HostState {
405 fn send(
406 &mut self,
407 req: HttpRequest,
408 ctx: Option<HttpTenantCtx>,
409 ) -> Result<HttpResponse, HttpClientError> {
410 self.send_http_request(req, None, ctx)
411 }
412}
413
414impl HttpClientHostV1_1 for HostState {
415 fn send(
416 &mut self,
417 req: HttpRequestV1_1,
418 opts: Option<HttpRequestOptionsV1_1>,
419 ctx: Option<HttpTenantCtxV1_1>,
420 ) -> Result<HttpResponseV1_1, HttpClientErrorV1_1> {
421 let legacy_req = HttpRequest {
422 method: req.method,
423 url: req.url,
424 headers: req.headers,
425 body: req.body,
426 };
427 let legacy_ctx = ctx.map(|ctx| HttpTenantCtx {
428 env: ctx.env,
429 tenant: ctx.tenant,
430 tenant_id: ctx.tenant_id,
431 team: ctx.team,
432 team_id: ctx.team_id,
433 user: ctx.user,
434 user_id: ctx.user_id,
435 trace_id: ctx.trace_id,
436 correlation_id: ctx.correlation_id,
437 attributes: ctx.attributes,
438 session_id: ctx.session_id,
439 flow_id: ctx.flow_id,
440 node_id: ctx.node_id,
441 provider_id: ctx.provider_id,
442 deadline_ms: ctx.deadline_ms,
443 attempt: ctx.attempt,
444 idempotency_key: ctx.idempotency_key,
445 impersonation: ctx
446 .impersonation
447 .map(|ImpersonationV1_1 { actor_id, reason }| ImpersonationV1_0 {
448 actor_id,
449 reason,
450 }),
451 });
452
453 self.send_http_request(legacy_req, opts, legacy_ctx)
454 .map(|resp| HttpResponseV1_1 {
455 status: resp.status,
456 headers: resp.headers,
457 body: resp.body,
458 })
459 .map_err(|err| HttpClientErrorV1_1 {
460 code: err.code,
461 message: err.message,
462 })
463 }
464}
465
466impl StateStoreHost for HostState {
467 fn read(
468 &mut self,
469 key: HostStateKey,
470 ctx: Option<StateTenantCtx>,
471 ) -> Result<Vec<u8>, StateError> {
472 let store = match self.state_store.as_ref() {
473 Some(store) => store.clone(),
474 None => {
475 return Err(StateError {
476 code: "unavailable".into(),
477 message: "state store not configured".into(),
478 });
479 }
480 };
481 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
482 Ok(ctx) => ctx,
483 Err(err) => {
484 return Err(StateError {
485 code: "invalid-ctx".into(),
486 message: err.to_string(),
487 });
488 }
489 };
490 let key = StoreStateKey::from(key);
491 match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
492 Ok(Some(value)) => Ok(serde_json::to_vec(&value).unwrap_or_else(|_| Vec::new())),
493 Ok(None) => Err(StateError {
494 code: "not_found".into(),
495 message: "state key not found".into(),
496 }),
497 Err(err) => Err(StateError {
498 code: "internal".into(),
499 message: err.to_string(),
500 }),
501 }
502 }
503
504 fn write(
505 &mut self,
506 key: HostStateKey,
507 bytes: Vec<u8>,
508 ctx: Option<StateTenantCtx>,
509 ) -> Result<StateOpAck, StateError> {
510 let store = match self.state_store.as_ref() {
511 Some(store) => store.clone(),
512 None => {
513 return Err(StateError {
514 code: "unavailable".into(),
515 message: "state store not configured".into(),
516 });
517 }
518 };
519 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
520 Ok(ctx) => ctx,
521 Err(err) => {
522 return Err(StateError {
523 code: "invalid-ctx".into(),
524 message: err.to_string(),
525 });
526 }
527 };
528 let key = StoreStateKey::from(key);
529 let value = serde_json::from_slice(&bytes)
530 .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()));
531 match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
532 Ok(()) => Ok(StateOpAck::Ok),
533 Err(err) => Err(StateError {
534 code: "internal".into(),
535 message: err.to_string(),
536 }),
537 }
538 }
539
540 fn delete(
541 &mut self,
542 key: HostStateKey,
543 ctx: Option<StateTenantCtx>,
544 ) -> Result<StateOpAck, StateError> {
545 let store = match self.state_store.as_ref() {
546 Some(store) => store.clone(),
547 None => {
548 return Err(StateError {
549 code: "unavailable".into(),
550 message: "state store not configured".into(),
551 });
552 }
553 };
554 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
555 Ok(ctx) => ctx,
556 Err(err) => {
557 return Err(StateError {
558 code: "invalid-ctx".into(),
559 message: err.to_string(),
560 });
561 }
562 };
563 let key = StoreStateKey::from(key);
564 match store.del(&tenant_ctx, STATE_PREFIX, &key) {
565 Ok(_) => Ok(StateOpAck::Ok),
566 Err(err) => Err(StateError {
567 code: "internal".into(),
568 message: err.to_string(),
569 }),
570 }
571 }
572}
573
574impl TelemetryLoggerHost for HostState {
575 fn log(
576 &mut self,
577 span: TelemetrySpanContext,
578 fields: Vec<(String, String)>,
579 _ctx: Option<TelemetryTenantCtx>,
580 ) -> Result<TelemetryAck, TelemetryError> {
581 if let Some(mock) = &self.mocks
582 && mock.telemetry_drain(&[("span_json", span.flow_id.as_str())])
583 {
584 return Ok(TelemetryAck::Ok);
585 }
586 let mut map = serde_json::Map::new();
587 for (k, v) in fields {
588 map.insert(k, Value::String(v));
589 }
590 tracing::info!(
591 tenant = %span.tenant,
592 flow_id = %span.flow_id,
593 node = ?span.node_id,
594 provider = %span.provider,
595 fields = %serde_json::Value::Object(map.clone()),
596 "telemetry log from pack"
597 );
598 Ok(TelemetryAck::Ok)
599 }
600}
601
602impl RunnerHostHttp for HostState {
603 fn request(
604 &mut self,
605 method: String,
606 url: String,
607 headers: Vec<String>,
608 body: Option<Vec<u8>>,
609 ) -> Result<Vec<u8>, String> {
610 let req = HttpRequest {
611 method,
612 url,
613 headers: headers
614 .chunks(2)
615 .filter_map(|chunk| {
616 if chunk.len() == 2 {
617 Some((chunk[0].clone(), chunk[1].clone()))
618 } else {
619 None
620 }
621 })
622 .collect(),
623 body,
624 };
625 match HttpClientHost::send(self, req, None) {
626 Ok(resp) => Ok(resp.body.unwrap_or_default()),
627 Err(err) => Err(err.message),
628 }
629 }
630}
631
632impl RunnerHostKv for HostState {
633 fn get(&mut self, _ns: String, _key: String) -> Option<String> {
634 None
635 }
636
637 fn put(&mut self, _ns: String, _key: String, _val: String) {}
638}
639
640impl MessagingSessionHost for HostState {
641 fn send(
642 &mut self,
643 _message: OutboundMessage,
644 _ctx: MessagingTenantCtx,
645 ) -> Result<MessagingAck, MessagingError> {
646 Err(MessagingError {
647 code: "unimplemented".into(),
648 message: "messaging session host not wired".into(),
649 })
650 }
651}
652
653enum ManifestLoad {
654 New {
655 manifest: Box<greentic_types::PackManifest>,
656 flows: PackFlows,
657 },
658 Legacy {
659 manifest: Box<legacy_pack::PackManifest>,
660 flows: PackFlows,
661 },
662}
663
664fn load_manifest_and_flows(path: &Path) -> Result<ManifestLoad> {
665 let mut archive = ZipArchive::new(File::open(path)?)
666 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
667 let bytes = read_entry(&mut archive, "manifest.cbor")
668 .with_context(|| format!("missing manifest.cbor in {}", path.display()))?;
669 match decode_pack_manifest(&bytes) {
670 Ok(manifest) => {
671 let cache = PackFlows::from_manifest(manifest.clone());
672 Ok(ManifestLoad::New {
673 manifest: Box::new(manifest),
674 flows: cache,
675 })
676 }
677 Err(err) => {
678 tracing::debug!(error = %err, pack = %path.display(), "decode_pack_manifest failed; trying legacy manifest");
679 let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
681 .context("failed to decode legacy pack manifest from manifest.cbor")?;
682 let flows = load_legacy_flows(&mut archive, &legacy)?;
683 Ok(ManifestLoad::Legacy {
684 manifest: Box::new(legacy),
685 flows,
686 })
687 }
688 }
689}
690
691fn load_legacy_flows(
692 archive: &mut ZipArchive<File>,
693 manifest: &legacy_pack::PackManifest,
694) -> Result<PackFlows> {
695 let mut flows = HashMap::new();
696 let mut descriptors = Vec::new();
697
698 for entry in &manifest.flows {
699 let bytes = read_entry(archive, &entry.file_json)
700 .with_context(|| format!("missing flow json {}", entry.file_json))?;
701 let doc: FlowDoc = serde_json::from_slice(&bytes)
702 .with_context(|| format!("failed to decode flow doc {}", entry.file_json))?;
703 let normalized = normalize_flow_doc(doc);
704 let flow_ir = flow_doc_to_ir(normalized)?;
705 let flow = flow_ir_to_flow(flow_ir)?;
706
707 descriptors.push(FlowDescriptor {
708 id: entry.id.clone(),
709 flow_type: entry.kind.clone(),
710 profile: manifest.meta.pack_id.clone(),
711 version: manifest.meta.version.to_string(),
712 description: None,
713 });
714 flows.insert(entry.id.clone(), flow);
715 }
716
717 let mut entry_flows = manifest.meta.entry_flows.clone();
718 if entry_flows.is_empty() {
719 entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
720 }
721 let metadata = PackMetadata {
722 pack_id: manifest.meta.pack_id.clone(),
723 version: manifest.meta.version.to_string(),
724 entry_flows,
725 secret_requirements: Vec::new(),
726 };
727
728 Ok(PackFlows {
729 descriptors,
730 flows,
731 metadata,
732 })
733}
734
735pub struct ComponentState {
736 pub host: HostState,
737 wasi_ctx: WasiCtx,
738 resource_table: ResourceTable,
739}
740
741impl ComponentState {
742 pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
743 let wasi_ctx = policy
744 .instantiate()
745 .context("failed to build WASI context")?;
746 Ok(Self {
747 host,
748 wasi_ctx,
749 resource_table: ResourceTable::new(),
750 })
751 }
752
753 fn host_mut(&mut self) -> &mut HostState {
754 &mut self.host
755 }
756}
757
758impl control::Host for ComponentState {
759 fn should_cancel(&mut self) -> bool {
760 false
761 }
762
763 fn yield_now(&mut self) {
764 }
766}
767
768fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
769 let mut inst = linker.instance("greentic:component/control@0.4.0")?;
770 inst.func_wrap(
771 "should-cancel",
772 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
773 let host = caller.data_mut();
774 Ok((ComponentControlHost::should_cancel(host),))
775 },
776 )?;
777 inst.func_wrap(
778 "yield-now",
779 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
780 let host = caller.data_mut();
781 ComponentControlHost::yield_now(host);
782 Ok(())
783 },
784 )?;
785 Ok(())
786}
787
788pub fn register_all(linker: &mut Linker<ComponentState>) -> Result<()> {
789 add_wasi_to_linker(linker)?;
790 add_all_v1_to_linker(
791 linker,
792 HostFns {
793 http_client_v1_1: Some(|state| state.host_mut()),
794 http_client: Some(|state| state.host_mut()),
795 oauth_broker: None,
796 runner_host_http: Some(|state| state.host_mut()),
797 runner_host_kv: Some(|state| state.host_mut()),
798 messaging_session: Some(|state| state.host_mut()),
799 telemetry_logger: Some(|state| state.host_mut()),
800 state_store: Some(|state| state.host_mut()),
801 secrets_store: Some(|state| state.host_mut()),
802 },
803 )?;
804 Ok(())
805}
806
807impl OAuthHostContext for ComponentState {
808 fn tenant_id(&self) -> &str {
809 &self.host.config.tenant
810 }
811
812 fn env(&self) -> &str {
813 &self.host.default_env
814 }
815
816 fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
817 &mut self.host.oauth_host
818 }
819
820 fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
821 self.host.oauth_config.as_ref()
822 }
823}
824
825impl WasiView for ComponentState {
826 fn ctx(&mut self) -> WasiCtxView<'_> {
827 WasiCtxView {
828 ctx: &mut self.wasi_ctx,
829 table: &mut self.resource_table,
830 }
831 }
832}
833
834#[allow(unsafe_code)]
835unsafe impl Send for ComponentState {}
836#[allow(unsafe_code)]
837unsafe impl Sync for ComponentState {}
838
839impl PackRuntime {
840 #[allow(clippy::too_many_arguments)]
841 pub async fn load(
842 path: impl AsRef<Path>,
843 config: Arc<HostConfig>,
844 mocks: Option<Arc<MockLayer>>,
845 archive_source: Option<&Path>,
846 session_store: Option<DynSessionStore>,
847 state_store: Option<DynStateStore>,
848 wasi_policy: Arc<RunnerWasiPolicy>,
849 secrets: DynSecretsManager,
850 oauth_config: Option<OAuthBrokerConfig>,
851 verify_archive: bool,
852 ) -> Result<Self> {
853 let path = path.as_ref();
854 let (_pack_root, safe_path) = normalize_pack_path(path)?;
855 let is_component = safe_path
856 .extension()
857 .and_then(|ext| ext.to_str())
858 .map(|ext| ext.eq_ignore_ascii_case("wasm"))
859 .unwrap_or(false);
860 let archive_hint_path = if let Some(source) = archive_source {
861 let (_, normalized) = normalize_pack_path(source)?;
862 Some(normalized)
863 } else if is_component {
864 None
865 } else {
866 Some(safe_path.clone())
867 };
868 let archive_hint = archive_hint_path.as_deref();
869 if verify_archive {
870 let verify_target = archive_hint.unwrap_or(&safe_path);
871 verify::verify_pack(verify_target).await?;
872 tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
873 }
874 let engine = Engine::default();
875 let wasm_bytes = fs::read(&safe_path).await?;
876 let mut metadata = PackMetadata::from_wasm(&wasm_bytes)
877 .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
878 let mut manifest = None;
879 let mut legacy_manifest: Option<Box<legacy_pack::PackManifest>> = None;
880 let flows = if let Some(archive_path) = archive_hint {
881 match load_manifest_and_flows(archive_path) {
882 Ok(ManifestLoad::New {
883 manifest: m,
884 flows: cache,
885 }) => {
886 metadata = cache.metadata.clone();
887 manifest = Some(*m);
888 Some(cache)
889 }
890 Ok(ManifestLoad::Legacy {
891 manifest: m,
892 flows: cache,
893 }) => {
894 metadata = cache.metadata.clone();
895 legacy_manifest = Some(m);
896 Some(cache)
897 }
898 Err(err) => {
899 warn!(error = %err, pack = %archive_path.display(), "failed to parse pack manifest; skipping flows");
900 None
901 }
902 }
903 } else {
904 None
905 };
906 let components = if let Some(archive_path) = archive_hint {
907 if let Some(new_manifest) = manifest.as_ref() {
908 match load_components_from_archive(&engine, archive_path, Some(new_manifest)) {
909 Ok(map) => map,
910 Err(err) => {
911 warn!(error = %err, pack = %archive_path.display(), "failed to load components from archive");
912 HashMap::new()
913 }
914 }
915 } else if let Some(legacy) = legacy_manifest.as_ref() {
916 match load_legacy_components_from_archive(&engine, archive_path, legacy) {
917 Ok(map) => map,
918 Err(err) => {
919 warn!(error = %err, pack = %archive_path.display(), "failed to load components from archive");
920 HashMap::new()
921 }
922 }
923 } else {
924 HashMap::new()
925 }
926 } else if is_component {
927 let name = safe_path
928 .file_stem()
929 .map(|s| s.to_string_lossy().to_string())
930 .unwrap_or_else(|| "component".to_string());
931 let component = Component::from_binary(&engine, &wasm_bytes)?;
932 let mut map = HashMap::new();
933 map.insert(
934 name.clone(),
935 PackComponent {
936 name,
937 version: metadata.version.clone(),
938 component,
939 },
940 );
941 map
942 } else {
943 HashMap::new()
944 };
945 let http_client = Arc::clone(&HTTP_CLIENT);
946 Ok(Self {
947 path: safe_path,
948 archive_path: archive_hint.map(Path::to_path_buf),
949 config,
950 engine,
951 metadata,
952 manifest,
953 legacy_manifest,
954 mocks,
955 flows,
956 components,
957 http_client,
958 pre_cache: Mutex::new(HashMap::new()),
959 session_store,
960 state_store,
961 wasi_policy,
962 provider_registry: RwLock::new(None),
963 secrets,
964 oauth_config,
965 })
966 }
967
968 pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
969 if let Some(cache) = &self.flows {
970 return Ok(cache.descriptors.clone());
971 }
972 if let Some(manifest) = &self.manifest {
973 let descriptors = manifest
974 .flows
975 .iter()
976 .map(|flow| FlowDescriptor {
977 id: flow.id.as_str().to_string(),
978 flow_type: flow_kind_to_str(flow.kind).to_string(),
979 profile: manifest.pack_id.as_str().to_string(),
980 version: manifest.version.to_string(),
981 description: None,
982 })
983 .collect();
984 return Ok(descriptors);
985 }
986 Ok(Vec::new())
987 }
988
989 #[allow(dead_code)]
990 pub async fn run_flow(
991 &self,
992 flow_id: &str,
993 input: serde_json::Value,
994 ) -> Result<serde_json::Value> {
995 let pack = Arc::new(
996 PackRuntime::load(
997 &self.path,
998 Arc::clone(&self.config),
999 self.mocks.clone(),
1000 self.archive_path.as_deref(),
1001 self.session_store.clone(),
1002 self.state_store.clone(),
1003 Arc::clone(&self.wasi_policy),
1004 self.secrets.clone(),
1005 self.oauth_config.clone(),
1006 false,
1007 )
1008 .await?,
1009 );
1010
1011 let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&self.config)).await?;
1012 let retry_config = self.config.retry_config().into();
1013 let mocks = pack.mocks.as_deref();
1014 let tenant = self.config.tenant.as_str();
1015
1016 let ctx = FlowContext {
1017 tenant,
1018 flow_id,
1019 node_id: None,
1020 tool: None,
1021 action: None,
1022 session_id: None,
1023 provider_id: None,
1024 retry_config,
1025 observer: None,
1026 mocks,
1027 };
1028
1029 let execution = engine.execute(ctx, input).await?;
1030 match execution.status {
1031 FlowStatus::Completed => Ok(execution.output),
1032 FlowStatus::Waiting(wait) => Ok(serde_json::json!({
1033 "status": "pending",
1034 "reason": wait.reason,
1035 "resume": wait.snapshot,
1036 "response": execution.output,
1037 })),
1038 }
1039 }
1040
1041 pub async fn invoke_component(
1042 &self,
1043 component_ref: &str,
1044 ctx: ComponentExecCtx,
1045 operation: &str,
1046 _config_json: Option<String>,
1047 input_json: String,
1048 ) -> Result<Value> {
1049 let pack_component = self
1050 .components
1051 .get(component_ref)
1052 .with_context(|| format!("component '{component_ref}' not found in pack"))?;
1053
1054 let mut linker = Linker::new(&self.engine);
1055 register_all(&mut linker)?;
1056 add_component_control_to_linker(&mut linker)?;
1057 let pre_instance = linker.instantiate_pre(&pack_component.component)?;
1058 let pre: ComponentPre<ComponentState> = ComponentPre::new(pre_instance)?;
1059
1060 let host_state = HostState::new(
1061 Arc::clone(&self.config),
1062 Arc::clone(&self.http_client),
1063 self.mocks.clone(),
1064 self.session_store.clone(),
1065 self.state_store.clone(),
1066 Arc::clone(&self.secrets),
1067 self.oauth_config.clone(),
1068 )?;
1069 let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1070 let mut store = wasmtime::Store::new(&self.engine, store_state);
1071 let bindings: crate::component_api::Component = pre.instantiate_async(&mut store).await?;
1072 let node = bindings.greentic_component_node();
1073
1074 let result = node.call_invoke(&mut store, &ctx, operation, &input_json)?;
1075
1076 match result {
1077 InvokeResult::Ok(body) => {
1078 if body.is_empty() {
1079 return Ok(Value::Null);
1080 }
1081 serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
1082 }
1083 InvokeResult::Err(NodeError {
1084 code,
1085 message,
1086 retryable,
1087 backoff_ms,
1088 details,
1089 }) => {
1090 let mut obj = serde_json::Map::new();
1091 obj.insert("ok".into(), Value::Bool(false));
1092 let mut error = serde_json::Map::new();
1093 error.insert("code".into(), Value::String(code));
1094 error.insert("message".into(), Value::String(message));
1095 error.insert("retryable".into(), Value::Bool(retryable));
1096 if let Some(backoff) = backoff_ms {
1097 error.insert("backoff_ms".into(), Value::Number(backoff.into()));
1098 }
1099 if let Some(details) = details {
1100 error.insert(
1101 "details".into(),
1102 serde_json::from_str(&details).unwrap_or(Value::String(details)),
1103 );
1104 }
1105 obj.insert("error".into(), Value::Object(error));
1106 Ok(Value::Object(obj))
1107 }
1108 }
1109 }
1110
1111 pub fn resolve_provider(
1112 &self,
1113 provider_id: Option<&str>,
1114 provider_type: Option<&str>,
1115 ) -> Result<ProviderBinding> {
1116 let registry = self.provider_registry()?;
1117 registry.resolve(provider_id, provider_type)
1118 }
1119
1120 pub async fn invoke_provider(
1121 &self,
1122 binding: &ProviderBinding,
1123 _ctx: ComponentExecCtx,
1124 op: &str,
1125 input_json: Vec<u8>,
1126 ) -> Result<Value> {
1127 let component_ref = &binding.component_ref;
1128 let pack_component = self
1129 .components
1130 .get(component_ref)
1131 .with_context(|| format!("provider component '{component_ref}' not found in pack"))?;
1132
1133 let mut linker = Linker::new(&self.engine);
1134 register_all(&mut linker)?;
1135 add_component_control_to_linker(&mut linker)?;
1136 let pre_instance = linker.instantiate_pre(&pack_component.component)?;
1137 let pre: ProviderComponentPre<ComponentState> = ProviderComponentPre::new(pre_instance)?;
1138
1139 let host_state = HostState::new(
1140 Arc::clone(&self.config),
1141 Arc::clone(&self.http_client),
1142 self.mocks.clone(),
1143 self.session_store.clone(),
1144 self.state_store.clone(),
1145 Arc::clone(&self.secrets),
1146 self.oauth_config.clone(),
1147 )?;
1148 let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1149 let mut store = wasmtime::Store::new(&self.engine, store_state);
1150 let bindings: crate::provider_core::SchemaCore = pre.instantiate_async(&mut store).await?;
1151 let provider = bindings.greentic_provider_core_schema_core_api();
1152
1153 let result = provider.call_invoke(&mut store, op, &input_json)?;
1154 deserialize_json_bytes(result)
1155 }
1156
1157 fn provider_registry(&self) -> Result<ProviderRegistry> {
1158 if let Some(registry) = self.provider_registry.read().clone() {
1159 return Ok(registry);
1160 }
1161 let manifest = self
1162 .manifest
1163 .as_ref()
1164 .context("pack manifest required for provider resolution")?;
1165 let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
1166 let registry = ProviderRegistry::new(
1167 manifest,
1168 self.state_store.clone(),
1169 &self.config.tenant,
1170 &env,
1171 )?;
1172 *self.provider_registry.write() = Some(registry.clone());
1173 Ok(registry)
1174 }
1175
1176 pub fn load_flow(&self, flow_id: &str) -> Result<Flow> {
1177 if let Some(cache) = &self.flows {
1178 return cache
1179 .flows
1180 .get(flow_id)
1181 .cloned()
1182 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
1183 }
1184 if let Some(manifest) = &self.manifest {
1185 let entry = manifest
1186 .flows
1187 .iter()
1188 .find(|f| f.id.as_str() == flow_id)
1189 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
1190 return Ok(entry.flow.clone());
1191 }
1192 bail!("flow '{flow_id}' not available (pack exports disabled)")
1193 }
1194
    /// Returns the metadata stored for this pack.
    pub fn metadata(&self) -> &PackMetadata {
        &self.metadata
    }
1198
1199 pub fn required_secrets(&self) -> &[greentic_types::SecretRequirement] {
1200 &self.metadata.secret_requirements
1201 }
1202
1203 pub fn missing_secrets(
1204 &self,
1205 tenant_ctx: &TypesTenantCtx,
1206 ) -> Vec<greentic_types::SecretRequirement> {
1207 let env = tenant_ctx.env.as_str().to_string();
1208 let tenant = tenant_ctx.tenant.as_str().to_string();
1209 let team = tenant_ctx.team.as_ref().map(|t| t.as_str().to_string());
1210 self.required_secrets()
1211 .iter()
1212 .filter(|req| {
1213 if let Some(scope) = &req.scope {
1215 if scope.env != env {
1216 return false;
1217 }
1218 if scope.tenant != tenant {
1219 return false;
1220 }
1221 if let Some(ref team_req) = scope.team
1222 && team.as_ref() != Some(team_req)
1223 {
1224 return false;
1225 }
1226 }
1227 read_secret_blocking(&self.secrets, req.key.as_str()).is_err()
1228 })
1229 .cloned()
1230 .collect()
1231 }
1232
1233 pub fn for_component_test(
1234 components: Vec<(String, PathBuf)>,
1235 flows: HashMap<String, FlowIR>,
1236 config: Arc<HostConfig>,
1237 ) -> Result<Self> {
1238 let engine = Engine::default();
1239 let mut component_map = HashMap::new();
1240 for (name, path) in components {
1241 if !path.exists() {
1242 bail!("component artifact missing: {}", path.display());
1243 }
1244 let wasm_bytes = std::fs::read(&path)?;
1245 let component = Component::from_binary(&engine, &wasm_bytes)
1246 .with_context(|| format!("failed to compile component {}", path.display()))?;
1247 component_map.insert(
1248 name.clone(),
1249 PackComponent {
1250 name,
1251 version: "0.0.0".into(),
1252 component,
1253 },
1254 );
1255 }
1256
1257 let mut flow_map = HashMap::new();
1258 let mut descriptors = Vec::new();
1259 for (id, ir) in flows {
1260 let flow_type = ir.flow_type.clone();
1261 let flow = flow_ir_to_flow(ir)?;
1262 flow_map.insert(id.clone(), flow);
1263 descriptors.push(FlowDescriptor {
1264 id: id.clone(),
1265 flow_type,
1266 profile: "test".into(),
1267 version: "0.0.0".into(),
1268 description: None,
1269 });
1270 }
1271 let flows_cache = PackFlows {
1272 descriptors: descriptors.clone(),
1273 flows: flow_map,
1274 metadata: PackMetadata::fallback(Path::new("component-test")),
1275 };
1276
1277 Ok(Self {
1278 path: PathBuf::new(),
1279 archive_path: None,
1280 config,
1281 engine,
1282 metadata: PackMetadata::fallback(Path::new("component-test")),
1283 manifest: None,
1284 legacy_manifest: None,
1285 mocks: None,
1286 flows: Some(flows_cache),
1287 components: component_map,
1288 http_client: Arc::clone(&HTTP_CLIENT),
1289 pre_cache: Mutex::new(HashMap::new()),
1290 session_store: None,
1291 state_store: None,
1292 wasi_policy: Arc::new(RunnerWasiPolicy::new()),
1293 provider_registry: RwLock::new(None),
1294 secrets: crate::secrets::default_manager(),
1295 oauth_config: None,
1296 })
1297 }
1298}
1299
/// Flow definitions held in memory for a pack, together with their
/// descriptors and the pack-level metadata.
struct PackFlows {
    /// One descriptor per flow (id, flow type, profile, version, description).
    descriptors: Vec<FlowDescriptor>,
    /// Flow definitions keyed by flow id.
    flows: HashMap<String, Flow>,
    /// Metadata of the pack these flows belong to.
    metadata: PackMetadata,
}
1305
1306fn deserialize_json_bytes(bytes: Vec<u8>) -> Result<Value> {
1307 if bytes.is_empty() {
1308 return Ok(Value::Null);
1309 }
1310 serde_json::from_slice(&bytes).or_else(|_| {
1311 String::from_utf8(bytes)
1312 .map(Value::String)
1313 .map_err(|err| anyhow!(err))
1314 })
1315}
1316
1317impl PackFlows {
1318 fn from_manifest(manifest: greentic_types::PackManifest) -> Self {
1319 let descriptors = manifest
1320 .flows
1321 .iter()
1322 .map(|entry| FlowDescriptor {
1323 id: entry.id.as_str().to_string(),
1324 flow_type: flow_kind_to_str(entry.kind).to_string(),
1325 profile: manifest.pack_id.as_str().to_string(),
1326 version: manifest.version.to_string(),
1327 description: None,
1328 })
1329 .collect();
1330 let mut flows = HashMap::new();
1331 for entry in &manifest.flows {
1332 flows.insert(entry.id.as_str().to_string(), entry.flow.clone());
1333 }
1334 Self {
1335 metadata: PackMetadata::from_manifest(&manifest),
1336 descriptors,
1337 flows,
1338 }
1339 }
1340}
1341
1342fn flow_kind_to_str(kind: greentic_types::FlowKind) -> &'static str {
1343 match kind {
1344 greentic_types::FlowKind::Messaging => "messaging",
1345 greentic_types::FlowKind::Event => "event",
1346 greentic_types::FlowKind::ComponentConfig => "component-config",
1347 greentic_types::FlowKind::Job => "job",
1348 greentic_types::FlowKind::Http => "http",
1349 }
1350}
1351
1352fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
1353 let mut file = archive
1354 .by_name(name)
1355 .with_context(|| format!("entry {name} missing from archive"))?;
1356 let mut buf = Vec::new();
1357 file.read_to_end(&mut buf)?;
1358 Ok(buf)
1359}
1360
1361fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
1362 for node in doc.nodes.values_mut() {
1363 if node.component.is_empty()
1364 && let Some((component_ref, payload)) = node.raw.iter().next()
1365 {
1366 if component_ref.starts_with("emit.") {
1367 node.component = component_ref.clone();
1368 node.payload = payload.clone();
1369 node.raw.clear();
1370 continue;
1371 }
1372 let (target_component, operation, input, config) =
1373 infer_component_exec(payload, component_ref);
1374 let mut payload_obj = serde_json::Map::new();
1375 payload_obj.insert("component".into(), Value::String(target_component));
1377 payload_obj.insert("operation".into(), Value::String(operation));
1378 payload_obj.insert("input".into(), input);
1379 if let Some(cfg) = config {
1380 payload_obj.insert("config".into(), cfg);
1381 }
1382 node.component = "component.exec".to_string();
1383 node.payload = Value::Object(payload_obj);
1384 }
1385 }
1386 doc
1387}
1388
1389fn infer_component_exec(
1390 payload: &Value,
1391 component_ref: &str,
1392) -> (String, String, Value, Option<Value>) {
1393 let default_op = if component_ref.starts_with("templating.") {
1394 "render"
1395 } else {
1396 "invoke"
1397 }
1398 .to_string();
1399
1400 if let Value::Object(map) = payload {
1401 let op = map
1402 .get("op")
1403 .or_else(|| map.get("operation"))
1404 .and_then(Value::as_str)
1405 .map(|s| s.to_string())
1406 .unwrap_or_else(|| default_op.clone());
1407
1408 let mut input = map.clone();
1409 let config = input.remove("config");
1410 let component = input
1411 .get("component")
1412 .or_else(|| input.get("component_ref"))
1413 .and_then(Value::as_str)
1414 .map(|s| s.to_string())
1415 .unwrap_or_else(|| component_ref.to_string());
1416 input.remove("component");
1417 input.remove("component_ref");
1418 input.remove("op");
1419 input.remove("operation");
1420 return (component, op, Value::Object(input), config);
1421 }
1422
1423 (component_ref.to_string(), default_op, payload.clone(), None)
1424}
1425
#[cfg(test)]
mod tests {
    use super::*;
    use greentic_flow::model::{FlowDoc, NodeDoc};
    use serde_json::json;
    use std::collections::BTreeMap;

    /// A node that only carries a raw `templating.*` entry is rewritten into a
    /// `component.exec` node with the `render` operation.
    #[test]
    fn normalizes_raw_component_to_component_exec() {
        let component_key = "templating.handlebars";
        let template = json!({ "template": "Hi {{name}}" });

        let mut raw = BTreeMap::new();
        raw.insert(component_key.into(), template.clone());
        let start_node = NodeDoc {
            raw,
            routing: json!([{"out": true}]),
            ..Default::default()
        };

        let mut nodes = BTreeMap::new();
        nodes.insert("start".into(), start_node);

        let normalized = normalize_flow_doc(FlowDoc {
            id: "welcome".into(),
            title: None,
            description: None,
            flow_type: "messaging".into(),
            start: Some("start".into()),
            parameters: json!({}),
            tags: Vec::new(),
            entrypoints: BTreeMap::new(),
            nodes,
        });

        let node = normalized.nodes.get("start").expect("node exists");
        assert_eq!(node.component, "component.exec");
        assert!(node.raw.is_empty() || node.raw.contains_key(component_key));

        let payload = node.payload.as_object().expect("payload object");
        assert_eq!(payload.get("component"), Some(&json!(component_key)));
        assert_eq!(payload.get("operation"), Some(&json!("render")));
        assert_eq!(payload.get("input"), Some(&template));
    }
}
1478
1479fn load_components_from_archive(
1480 engine: &Engine,
1481 path: &Path,
1482 manifest: Option<&greentic_types::PackManifest>,
1483) -> Result<HashMap<String, PackComponent>> {
1484 let mut archive = ZipArchive::new(File::open(path)?)
1485 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
1486 let mut components = HashMap::new();
1487 if let Some(manifest) = manifest {
1488 for entry in &manifest.components {
1489 let file_name = format!("components/{}.wasm", entry.id.as_str());
1490 let bytes = read_entry(&mut archive, &file_name)
1491 .with_context(|| format!("missing component {}", file_name))?;
1492 let component = Component::from_binary(engine, &bytes)
1493 .with_context(|| format!("failed to compile component {}", entry.id.as_str()))?;
1494 components.insert(
1495 entry.id.as_str().to_string(),
1496 PackComponent {
1497 name: entry.id.as_str().to_string(),
1498 version: entry.version.to_string(),
1499 component,
1500 },
1501 );
1502 }
1503 }
1504 Ok(components)
1505}
1506
1507fn load_legacy_components_from_archive(
1508 engine: &Engine,
1509 path: &Path,
1510 manifest: &legacy_pack::PackManifest,
1511) -> Result<HashMap<String, PackComponent>> {
1512 let mut archive = ZipArchive::new(File::open(path)?)
1513 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
1514 let mut components = HashMap::new();
1515 for entry in &manifest.components {
1516 let bytes = read_entry(&mut archive, &entry.file_wasm)
1517 .with_context(|| format!("missing component {}", entry.file_wasm))?;
1518 let component = Component::from_binary(engine, &bytes)
1519 .with_context(|| format!("failed to compile component {}", entry.name))?;
1520 components.insert(
1521 entry.name.clone(),
1522 PackComponent {
1523 name: entry.name.clone(),
1524 version: entry.version.to_string(),
1525 component,
1526 },
1527 );
1528 }
1529 Ok(components)
1530}
1531
/// Summary information about a pack: its identity, version, flow ids and the
/// secrets it declares as required.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct PackMetadata {
    /// Identifier of the pack.
    pub pack_id: String,
    /// Pack version, kept as a string.
    pub version: String,
    /// Flow ids recorded for the pack; empty when absent from the serialized form.
    #[serde(default)]
    pub entry_flows: Vec<String>,
    /// Secrets the pack declares it needs; empty when absent from the serialized form.
    #[serde(default)]
    pub secret_requirements: Vec<greentic_types::SecretRequirement>,
}
1541
1542impl PackMetadata {
1543 fn from_wasm(bytes: &[u8]) -> Option<Self> {
1544 let parser = Parser::new(0);
1545 for payload in parser.parse_all(bytes) {
1546 let payload = payload.ok()?;
1547 match payload {
1548 Payload::CustomSection(section) => {
1549 if section.name() == "greentic.manifest"
1550 && let Ok(meta) = Self::from_bytes(section.data())
1551 {
1552 return Some(meta);
1553 }
1554 }
1555 Payload::DataSection(reader) => {
1556 for segment in reader.into_iter().flatten() {
1557 if let Ok(meta) = Self::from_bytes(segment.data) {
1558 return Some(meta);
1559 }
1560 }
1561 }
1562 _ => {}
1563 }
1564 }
1565 None
1566 }
1567
1568 fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
1569 #[derive(Deserialize)]
1570 struct RawManifest {
1571 pack_id: String,
1572 version: String,
1573 #[serde(default)]
1574 entry_flows: Vec<String>,
1575 #[serde(default)]
1576 flows: Vec<RawFlow>,
1577 #[serde(default)]
1578 secret_requirements: Vec<greentic_types::SecretRequirement>,
1579 }
1580
1581 #[derive(Deserialize)]
1582 struct RawFlow {
1583 id: String,
1584 }
1585
1586 let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
1587 let mut entry_flows = if manifest.entry_flows.is_empty() {
1588 manifest.flows.iter().map(|f| f.id.clone()).collect()
1589 } else {
1590 manifest.entry_flows.clone()
1591 };
1592 entry_flows.retain(|id| !id.is_empty());
1593 Ok(Self {
1594 pack_id: manifest.pack_id,
1595 version: manifest.version,
1596 entry_flows,
1597 secret_requirements: manifest.secret_requirements,
1598 })
1599 }
1600
1601 pub fn fallback(path: &Path) -> Self {
1602 let pack_id = path
1603 .file_stem()
1604 .map(|s| s.to_string_lossy().into_owned())
1605 .unwrap_or_else(|| "unknown-pack".to_string());
1606 Self {
1607 pack_id,
1608 version: "0.0.0".to_string(),
1609 entry_flows: Vec::new(),
1610 secret_requirements: Vec::new(),
1611 }
1612 }
1613
1614 pub fn from_manifest(manifest: &greentic_types::PackManifest) -> Self {
1615 let entry_flows = manifest
1616 .flows
1617 .iter()
1618 .map(|flow| flow.id.as_str().to_string())
1619 .collect::<Vec<_>>();
1620 Self {
1621 pack_id: manifest.pack_id.as_str().to_string(),
1622 version: manifest.version.to_string(),
1623 entry_flows,
1624 secret_requirements: manifest.secret_requirements.clone(),
1625 }
1626 }
1627}