1use std::collections::HashMap;
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::component_api::component::greentic::component::control::Host as ComponentControlHost;
10use crate::component_api::{
11 ComponentPre, control, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
12};
13use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
14use crate::provider::{ProviderBinding, ProviderRegistry};
15use crate::provider_core::SchemaCorePre as ProviderComponentPre;
16use crate::provider_core_only;
17use crate::runtime_wasmtime::{Component, Engine, Linker, ResourceTable};
18use anyhow::{Context, Result, anyhow, bail};
19use greentic_interfaces_wasmtime::host_helpers::v1::{
20 self as host_v1, HostFns, add_all_v1_to_linker,
21 runner_host_http::RunnerHostHttp,
22 runner_host_kv::RunnerHostKv,
23 secrets_store::{SecretsError, SecretsStoreHost},
24 state_store::{
25 OpAck as StateOpAck, StateKey as HostStateKey, StateStoreError as StateError,
26 StateStoreHost, TenantCtx as StateTenantCtx,
27 },
28 telemetry_logger::{
29 OpAck as TelemetryAck, SpanContext as TelemetrySpanContext,
30 TelemetryLoggerError as TelemetryError, TelemetryLoggerHost,
31 TenantCtx as TelemetryTenantCtx,
32 },
33};
34use greentic_interfaces_wasmtime::http_client_client_v1_0::greentic::interfaces_types::types::Impersonation as ImpersonationV1_0;
35use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::interfaces_types::types::Impersonation as ImpersonationV1_1;
36use greentic_pack::builder as legacy_pack;
37use greentic_types::{
38 EnvId, Flow, StateKey as StoreStateKey, TeamId, TenantCtx as TypesTenantCtx, TenantId, UserId,
39 decode_pack_manifest,
40};
41use host_v1::http_client::{
42 HttpClientError, HttpClientErrorV1_1, HttpClientHost, HttpClientHostV1_1,
43 Request as HttpRequest, RequestOptionsV1_1 as HttpRequestOptionsV1_1,
44 RequestV1_1 as HttpRequestV1_1, Response as HttpResponse, ResponseV1_1 as HttpResponseV1_1,
45 TenantCtx as HttpTenantCtx, TenantCtxV1_1 as HttpTenantCtxV1_1,
46};
47use once_cell::sync::Lazy;
48use parking_lot::{Mutex, RwLock};
49use reqwest::blocking::Client as BlockingClient;
50use runner_core::normalize_under_root;
51use serde::{Deserialize, Serialize};
52use serde_cbor;
53use serde_json::{self, Value};
54use tokio::fs;
55use wasmparser::{Parser, Payload};
56use wasmtime::StoreContextMut;
57use zip::ZipArchive;
58
59use crate::runner::engine::{FlowContext, FlowEngine, FlowStatus};
60use crate::runner::flow_adapter::{FlowIR, flow_doc_to_ir, flow_ir_to_flow};
61use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
62
63use crate::config::HostConfig;
64use crate::secrets::{DynSecretsManager, read_secret_blocking};
65use crate::storage::state::STATE_PREFIX;
66use crate::storage::{DynSessionStore, DynStateStore};
67use crate::verify;
68use crate::wasi::RunnerWasiPolicy;
69use tracing::warn;
70use wasmtime_wasi::p2::add_to_linker_sync as add_wasi_to_linker;
71use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
72
73use greentic_flow::model::FlowDoc;
74
75#[allow(dead_code)]
76pub struct PackRuntime {
77 path: PathBuf,
79 archive_path: Option<PathBuf>,
81 config: Arc<HostConfig>,
82 engine: Engine,
83 metadata: PackMetadata,
84 manifest: Option<greentic_types::PackManifest>,
85 legacy_manifest: Option<Box<legacy_pack::PackManifest>>,
86 mocks: Option<Arc<MockLayer>>,
87 flows: Option<PackFlows>,
88 components: HashMap<String, PackComponent>,
89 http_client: Arc<BlockingClient>,
90 pre_cache: Mutex<HashMap<String, ComponentPre<ComponentState>>>,
91 session_store: Option<DynSessionStore>,
92 state_store: Option<DynStateStore>,
93 wasi_policy: Arc<RunnerWasiPolicy>,
94 provider_registry: RwLock<Option<ProviderRegistry>>,
95 secrets: DynSecretsManager,
96 oauth_config: Option<OAuthBrokerConfig>,
97}
98
99struct PackComponent {
100 #[allow(dead_code)]
101 name: String,
102 #[allow(dead_code)]
103 version: String,
104 component: Component,
105}
106
107fn build_blocking_client() -> BlockingClient {
108 std::thread::spawn(|| {
109 BlockingClient::builder()
110 .no_proxy()
111 .build()
112 .expect("blocking client")
113 })
114 .join()
115 .expect("client build thread panicked")
116}
117
118fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
119 let (root, candidate) = if path.is_absolute() {
120 let parent = path
121 .parent()
122 .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
123 let root = parent
124 .canonicalize()
125 .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
126 let file = path
127 .file_name()
128 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
129 (root, PathBuf::from(file))
130 } else {
131 let cwd = std::env::current_dir().context("failed to resolve current directory")?;
132 let base = if let Some(parent) = path.parent() {
133 cwd.join(parent)
134 } else {
135 cwd
136 };
137 let root = base
138 .canonicalize()
139 .with_context(|| format!("failed to canonicalize {}", base.display()))?;
140 let file = path
141 .file_name()
142 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
143 (root, PathBuf::from(file))
144 };
145 let safe = normalize_under_root(&root, &candidate)?;
146 Ok((root, safe))
147}
148
149static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
150
151#[derive(Debug, Clone, Serialize, Deserialize)]
152pub struct FlowDescriptor {
153 pub id: String,
154 #[serde(rename = "type")]
155 pub flow_type: String,
156 pub profile: String,
157 pub version: String,
158 #[serde(default)]
159 pub description: Option<String>,
160}
161
162pub struct HostState {
163 config: Arc<HostConfig>,
164 http_client: Arc<BlockingClient>,
165 default_env: String,
166 #[allow(dead_code)]
167 session_store: Option<DynSessionStore>,
168 state_store: Option<DynStateStore>,
169 mocks: Option<Arc<MockLayer>>,
170 secrets: DynSecretsManager,
171 oauth_config: Option<OAuthBrokerConfig>,
172 oauth_host: OAuthBrokerHost,
173}
174
175impl HostState {
176 #[allow(clippy::default_constructed_unit_structs)]
177 pub fn new(
178 config: Arc<HostConfig>,
179 http_client: Arc<BlockingClient>,
180 mocks: Option<Arc<MockLayer>>,
181 session_store: Option<DynSessionStore>,
182 state_store: Option<DynStateStore>,
183 secrets: DynSecretsManager,
184 oauth_config: Option<OAuthBrokerConfig>,
185 ) -> Result<Self> {
186 let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
187 Ok(Self {
188 config,
189 http_client,
190 default_env,
191 session_store,
192 state_store,
193 mocks,
194 secrets,
195 oauth_config,
196 oauth_host: OAuthBrokerHost::default(),
197 })
198 }
199
200 pub fn get_secret(&self, key: &str) -> Result<String> {
201 if provider_core_only::is_enabled() {
202 bail!(provider_core_only::blocked_message("secrets"))
203 }
204 if !self.config.secrets_policy.is_allowed(key) {
205 bail!("secret {key} is not permitted by bindings policy");
206 }
207 if let Some(mock) = &self.mocks
208 && let Some(value) = mock.secrets_lookup(key)
209 {
210 return Ok(value);
211 }
212 let bytes = read_secret_blocking(&self.secrets, key)
213 .context("failed to read secret from manager")?;
214 let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
215 Ok(value)
216 }
217
218 fn tenant_ctx_from_v1(&self, ctx: Option<StateTenantCtx>) -> Result<TypesTenantCtx> {
219 let tenant_raw = ctx
220 .as_ref()
221 .map(|ctx| ctx.tenant.clone())
222 .unwrap_or_else(|| self.config.tenant.clone());
223 let env_raw = ctx
224 .as_ref()
225 .map(|ctx| ctx.env.clone())
226 .unwrap_or_else(|| self.default_env.clone());
227 let tenant_id = TenantId::from_str(&tenant_raw)
228 .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
229 let env_id = EnvId::from_str(&env_raw)
230 .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
231 let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
232 if let Some(ctx) = ctx {
233 if let Some(team) = ctx.team.or(ctx.team_id) {
234 let team_id =
235 TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
236 tenant_ctx = tenant_ctx.with_team(Some(team_id));
237 }
238 if let Some(user) = ctx.user.or(ctx.user_id) {
239 let user_id =
240 UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
241 tenant_ctx = tenant_ctx.with_user(Some(user_id));
242 }
243 if let Some(flow) = ctx.flow_id {
244 tenant_ctx = tenant_ctx.with_flow(flow);
245 }
246 if let Some(node) = ctx.node_id {
247 tenant_ctx = tenant_ctx.with_node(node);
248 }
249 if let Some(provider) = ctx.provider_id {
250 tenant_ctx = tenant_ctx.with_provider(provider);
251 }
252 if let Some(session) = ctx.session_id {
253 tenant_ctx = tenant_ctx.with_session(session);
254 }
255 tenant_ctx.trace_id = ctx.trace_id;
256 }
257 Ok(tenant_ctx)
258 }
259
260 fn send_http_request(
261 &mut self,
262 req: HttpRequest,
263 opts: Option<HttpRequestOptionsV1_1>,
264 _ctx: Option<HttpTenantCtx>,
265 ) -> Result<HttpResponse, HttpClientError> {
266 if !self.config.http_enabled {
267 return Err(HttpClientError {
268 code: "denied".into(),
269 message: "http client disabled by policy".into(),
270 });
271 }
272
273 let mut mock_state = None;
274 let raw_body = req.body.clone();
275 if let Some(mock) = &self.mocks
276 && let Ok(meta) = HttpMockRequest::new(&req.method, &req.url, raw_body.as_deref())
277 {
278 match mock.http_begin(&meta) {
279 HttpDecision::Mock(response) => {
280 let headers = response
281 .headers
282 .iter()
283 .map(|(k, v)| (k.clone(), v.clone()))
284 .collect();
285 return Ok(HttpResponse {
286 status: response.status,
287 headers,
288 body: response.body.clone().map(|b| b.into_bytes()),
289 });
290 }
291 HttpDecision::Deny(reason) => {
292 return Err(HttpClientError {
293 code: "denied".into(),
294 message: reason,
295 });
296 }
297 HttpDecision::Passthrough { record } => {
298 mock_state = Some((meta, record));
299 }
300 }
301 }
302
303 let method = req.method.parse().unwrap_or(reqwest::Method::GET);
304 let mut builder = self.http_client.request(method, &req.url);
305 for (key, value) in req.headers {
306 if let Ok(header) = reqwest::header::HeaderName::from_bytes(key.as_bytes())
307 && let Ok(header_value) = reqwest::header::HeaderValue::from_str(&value)
308 {
309 builder = builder.header(header, header_value);
310 }
311 }
312
313 if let Some(body) = raw_body.clone() {
314 builder = builder.body(body);
315 }
316
317 if let Some(opts) = opts {
318 if let Some(timeout_ms) = opts.timeout_ms {
319 builder = builder.timeout(Duration::from_millis(timeout_ms as u64));
320 }
321 if opts.allow_insecure == Some(true) {
322 warn!(url = %req.url, "allow-insecure not supported; using default TLS validation");
323 }
324 if let Some(follow_redirects) = opts.follow_redirects
325 && !follow_redirects
326 {
327 warn!(url = %req.url, "follow-redirects=false not supported; using default client behaviour");
328 }
329 }
330
331 let response = match builder.send() {
332 Ok(resp) => resp,
333 Err(err) => {
334 warn!(url = %req.url, error = %err, "http client request failed");
335 return Err(HttpClientError {
336 code: "unavailable".into(),
337 message: err.to_string(),
338 });
339 }
340 };
341
342 let status = response.status().as_u16();
343 let headers_vec = response
344 .headers()
345 .iter()
346 .map(|(k, v)| {
347 (
348 k.as_str().to_string(),
349 v.to_str().unwrap_or_default().to_string(),
350 )
351 })
352 .collect::<Vec<_>>();
353 let body_bytes = response.bytes().ok().map(|b| b.to_vec());
354
355 if let Some((meta, true)) = mock_state.take()
356 && let Some(mock) = &self.mocks
357 {
358 let recorded = HttpMockResponse::new(
359 status,
360 headers_vec.clone().into_iter().collect(),
361 body_bytes
362 .as_ref()
363 .map(|b| String::from_utf8_lossy(b).into_owned()),
364 );
365 mock.http_record(&meta, &recorded);
366 }
367
368 Ok(HttpResponse {
369 status,
370 headers: headers_vec,
371 body: body_bytes,
372 })
373 }
374}
375
376impl SecretsStoreHost for HostState {
377 fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsError> {
378 if provider_core_only::is_enabled() {
379 warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
380 return Err(SecretsError::Denied);
381 }
382 if !self.config.secrets_policy.is_allowed(&key) {
383 return Err(SecretsError::Denied);
384 }
385 if let Some(mock) = &self.mocks
386 && let Some(value) = mock.secrets_lookup(&key)
387 {
388 return Ok(Some(value.into_bytes()));
389 }
390 match read_secret_blocking(&self.secrets, &key) {
391 Ok(bytes) => Ok(Some(bytes)),
392 Err(err) => {
393 warn!(secret = %key, error = %err, "secret lookup failed");
394 Err(SecretsError::NotFound)
395 }
396 }
397 }
398}
399
400impl HttpClientHost for HostState {
401 fn send(
402 &mut self,
403 req: HttpRequest,
404 ctx: Option<HttpTenantCtx>,
405 ) -> Result<HttpResponse, HttpClientError> {
406 self.send_http_request(req, None, ctx)
407 }
408}
409
410impl HttpClientHostV1_1 for HostState {
411 fn send(
412 &mut self,
413 req: HttpRequestV1_1,
414 opts: Option<HttpRequestOptionsV1_1>,
415 ctx: Option<HttpTenantCtxV1_1>,
416 ) -> Result<HttpResponseV1_1, HttpClientErrorV1_1> {
417 let legacy_req = HttpRequest {
418 method: req.method,
419 url: req.url,
420 headers: req.headers,
421 body: req.body,
422 };
423 let legacy_ctx = ctx.map(|ctx| HttpTenantCtx {
424 env: ctx.env,
425 tenant: ctx.tenant,
426 tenant_id: ctx.tenant_id,
427 team: ctx.team,
428 team_id: ctx.team_id,
429 user: ctx.user,
430 user_id: ctx.user_id,
431 trace_id: ctx.trace_id,
432 correlation_id: ctx.correlation_id,
433 attributes: ctx.attributes,
434 session_id: ctx.session_id,
435 flow_id: ctx.flow_id,
436 node_id: ctx.node_id,
437 provider_id: ctx.provider_id,
438 deadline_ms: ctx.deadline_ms,
439 attempt: ctx.attempt,
440 idempotency_key: ctx.idempotency_key,
441 impersonation: ctx
442 .impersonation
443 .map(|ImpersonationV1_1 { actor_id, reason }| ImpersonationV1_0 {
444 actor_id,
445 reason,
446 }),
447 });
448
449 self.send_http_request(legacy_req, opts, legacy_ctx)
450 .map(|resp| HttpResponseV1_1 {
451 status: resp.status,
452 headers: resp.headers,
453 body: resp.body,
454 })
455 .map_err(|err| HttpClientErrorV1_1 {
456 code: err.code,
457 message: err.message,
458 })
459 }
460}
461
462impl StateStoreHost for HostState {
463 fn read(
464 &mut self,
465 key: HostStateKey,
466 ctx: Option<StateTenantCtx>,
467 ) -> Result<Vec<u8>, StateError> {
468 let store = match self.state_store.as_ref() {
469 Some(store) => store.clone(),
470 None => {
471 return Err(StateError {
472 code: "unavailable".into(),
473 message: "state store not configured".into(),
474 });
475 }
476 };
477 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
478 Ok(ctx) => ctx,
479 Err(err) => {
480 return Err(StateError {
481 code: "invalid-ctx".into(),
482 message: err.to_string(),
483 });
484 }
485 };
486 let key = StoreStateKey::from(key);
487 match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
488 Ok(Some(value)) => Ok(serde_json::to_vec(&value).unwrap_or_else(|_| Vec::new())),
489 Ok(None) => Err(StateError {
490 code: "not_found".into(),
491 message: "state key not found".into(),
492 }),
493 Err(err) => Err(StateError {
494 code: "internal".into(),
495 message: err.to_string(),
496 }),
497 }
498 }
499
500 fn write(
501 &mut self,
502 key: HostStateKey,
503 bytes: Vec<u8>,
504 ctx: Option<StateTenantCtx>,
505 ) -> Result<StateOpAck, StateError> {
506 let store = match self.state_store.as_ref() {
507 Some(store) => store.clone(),
508 None => {
509 return Err(StateError {
510 code: "unavailable".into(),
511 message: "state store not configured".into(),
512 });
513 }
514 };
515 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
516 Ok(ctx) => ctx,
517 Err(err) => {
518 return Err(StateError {
519 code: "invalid-ctx".into(),
520 message: err.to_string(),
521 });
522 }
523 };
524 let key = StoreStateKey::from(key);
525 let value = serde_json::from_slice(&bytes)
526 .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()));
527 match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
528 Ok(()) => Ok(StateOpAck::Ok),
529 Err(err) => Err(StateError {
530 code: "internal".into(),
531 message: err.to_string(),
532 }),
533 }
534 }
535
536 fn delete(
537 &mut self,
538 key: HostStateKey,
539 ctx: Option<StateTenantCtx>,
540 ) -> Result<StateOpAck, StateError> {
541 let store = match self.state_store.as_ref() {
542 Some(store) => store.clone(),
543 None => {
544 return Err(StateError {
545 code: "unavailable".into(),
546 message: "state store not configured".into(),
547 });
548 }
549 };
550 let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
551 Ok(ctx) => ctx,
552 Err(err) => {
553 return Err(StateError {
554 code: "invalid-ctx".into(),
555 message: err.to_string(),
556 });
557 }
558 };
559 let key = StoreStateKey::from(key);
560 match store.del(&tenant_ctx, STATE_PREFIX, &key) {
561 Ok(_) => Ok(StateOpAck::Ok),
562 Err(err) => Err(StateError {
563 code: "internal".into(),
564 message: err.to_string(),
565 }),
566 }
567 }
568}
569
570impl TelemetryLoggerHost for HostState {
571 fn log(
572 &mut self,
573 span: TelemetrySpanContext,
574 fields: Vec<(String, String)>,
575 _ctx: Option<TelemetryTenantCtx>,
576 ) -> Result<TelemetryAck, TelemetryError> {
577 if let Some(mock) = &self.mocks
578 && mock.telemetry_drain(&[("span_json", span.flow_id.as_str())])
579 {
580 return Ok(TelemetryAck::Ok);
581 }
582 let mut map = serde_json::Map::new();
583 for (k, v) in fields {
584 map.insert(k, Value::String(v));
585 }
586 tracing::info!(
587 tenant = %span.tenant,
588 flow_id = %span.flow_id,
589 node = ?span.node_id,
590 provider = %span.provider,
591 fields = %serde_json::Value::Object(map.clone()),
592 "telemetry log from pack"
593 );
594 Ok(TelemetryAck::Ok)
595 }
596}
597
598impl RunnerHostHttp for HostState {
599 fn request(
600 &mut self,
601 method: String,
602 url: String,
603 headers: Vec<String>,
604 body: Option<Vec<u8>>,
605 ) -> Result<Vec<u8>, String> {
606 let req = HttpRequest {
607 method,
608 url,
609 headers: headers
610 .chunks(2)
611 .filter_map(|chunk| {
612 if chunk.len() == 2 {
613 Some((chunk[0].clone(), chunk[1].clone()))
614 } else {
615 None
616 }
617 })
618 .collect(),
619 body,
620 };
621 match HttpClientHost::send(self, req, None) {
622 Ok(resp) => Ok(resp.body.unwrap_or_default()),
623 Err(err) => Err(err.message),
624 }
625 }
626}
627
628impl RunnerHostKv for HostState {
629 fn get(&mut self, _ns: String, _key: String) -> Option<String> {
630 None
631 }
632
633 fn put(&mut self, _ns: String, _key: String, _val: String) {}
634}
635
636enum ManifestLoad {
637 New {
638 manifest: Box<greentic_types::PackManifest>,
639 flows: PackFlows,
640 },
641 Legacy {
642 manifest: Box<legacy_pack::PackManifest>,
643 flows: PackFlows,
644 },
645}
646
647fn load_manifest_and_flows(path: &Path) -> Result<ManifestLoad> {
648 let mut archive = ZipArchive::new(File::open(path)?)
649 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
650 let bytes = read_entry(&mut archive, "manifest.cbor")
651 .with_context(|| format!("missing manifest.cbor in {}", path.display()))?;
652 match decode_pack_manifest(&bytes) {
653 Ok(manifest) => {
654 let cache = PackFlows::from_manifest(manifest.clone());
655 Ok(ManifestLoad::New {
656 manifest: Box::new(manifest),
657 flows: cache,
658 })
659 }
660 Err(err) => {
661 tracing::debug!(error = %err, pack = %path.display(), "decode_pack_manifest failed; trying legacy manifest");
662 let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
664 .context("failed to decode legacy pack manifest from manifest.cbor")?;
665 let flows = load_legacy_flows(&mut archive, &legacy)?;
666 Ok(ManifestLoad::Legacy {
667 manifest: Box::new(legacy),
668 flows,
669 })
670 }
671 }
672}
673
674fn load_legacy_flows(
675 archive: &mut ZipArchive<File>,
676 manifest: &legacy_pack::PackManifest,
677) -> Result<PackFlows> {
678 let mut flows = HashMap::new();
679 let mut descriptors = Vec::new();
680
681 for entry in &manifest.flows {
682 let bytes = read_entry(archive, &entry.file_json)
683 .with_context(|| format!("missing flow json {}", entry.file_json))?;
684 let doc: FlowDoc = serde_json::from_slice(&bytes)
685 .with_context(|| format!("failed to decode flow doc {}", entry.file_json))?;
686 let normalized = normalize_flow_doc(doc);
687 let flow_ir = flow_doc_to_ir(normalized)?;
688 let flow = flow_ir_to_flow(flow_ir)?;
689
690 descriptors.push(FlowDescriptor {
691 id: entry.id.clone(),
692 flow_type: entry.kind.clone(),
693 profile: manifest.meta.pack_id.clone(),
694 version: manifest.meta.version.to_string(),
695 description: None,
696 });
697 flows.insert(entry.id.clone(), flow);
698 }
699
700 let mut entry_flows = manifest.meta.entry_flows.clone();
701 if entry_flows.is_empty() {
702 entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
703 }
704 let metadata = PackMetadata {
705 pack_id: manifest.meta.pack_id.clone(),
706 version: manifest.meta.version.to_string(),
707 entry_flows,
708 secret_requirements: Vec::new(),
709 };
710
711 Ok(PackFlows {
712 descriptors,
713 flows,
714 metadata,
715 })
716}
717
718pub struct ComponentState {
719 pub host: HostState,
720 wasi_ctx: WasiCtx,
721 resource_table: ResourceTable,
722}
723
724impl ComponentState {
725 pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
726 let wasi_ctx = policy
727 .instantiate()
728 .context("failed to build WASI context")?;
729 Ok(Self {
730 host,
731 wasi_ctx,
732 resource_table: ResourceTable::new(),
733 })
734 }
735
736 fn host_mut(&mut self) -> &mut HostState {
737 &mut self.host
738 }
739}
740
741impl control::Host for ComponentState {
742 fn should_cancel(&mut self) -> bool {
743 false
744 }
745
746 fn yield_now(&mut self) {
747 }
749}
750
751fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
752 let mut inst = linker.instance("greentic:component/control@0.4.0")?;
753 inst.func_wrap(
754 "should-cancel",
755 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
756 let host = caller.data_mut();
757 Ok((ComponentControlHost::should_cancel(host),))
758 },
759 )?;
760 inst.func_wrap(
761 "yield-now",
762 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
763 let host = caller.data_mut();
764 ComponentControlHost::yield_now(host);
765 Ok(())
766 },
767 )?;
768 Ok(())
769}
770
771pub fn register_all(linker: &mut Linker<ComponentState>) -> Result<()> {
772 add_wasi_to_linker(linker)?;
773 add_all_v1_to_linker(
774 linker,
775 HostFns {
776 http_client_v1_1: Some(|state| state.host_mut()),
777 http_client: Some(|state| state.host_mut()),
778 oauth_broker: None,
779 runner_host_http: Some(|state| state.host_mut()),
780 runner_host_kv: Some(|state| state.host_mut()),
781 telemetry_logger: Some(|state| state.host_mut()),
782 state_store: Some(|state| state.host_mut()),
783 secrets_store: Some(|state| state.host_mut()),
784 },
785 )?;
786 Ok(())
787}
788
789impl OAuthHostContext for ComponentState {
790 fn tenant_id(&self) -> &str {
791 &self.host.config.tenant
792 }
793
794 fn env(&self) -> &str {
795 &self.host.default_env
796 }
797
798 fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
799 &mut self.host.oauth_host
800 }
801
802 fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
803 self.host.oauth_config.as_ref()
804 }
805}
806
807impl WasiView for ComponentState {
808 fn ctx(&mut self) -> WasiCtxView<'_> {
809 WasiCtxView {
810 ctx: &mut self.wasi_ctx,
811 table: &mut self.resource_table,
812 }
813 }
814}
815
816#[allow(unsafe_code)]
817unsafe impl Send for ComponentState {}
818#[allow(unsafe_code)]
819unsafe impl Sync for ComponentState {}
820
821impl PackRuntime {
822 #[allow(clippy::too_many_arguments)]
823 pub async fn load(
824 path: impl AsRef<Path>,
825 config: Arc<HostConfig>,
826 mocks: Option<Arc<MockLayer>>,
827 archive_source: Option<&Path>,
828 session_store: Option<DynSessionStore>,
829 state_store: Option<DynStateStore>,
830 wasi_policy: Arc<RunnerWasiPolicy>,
831 secrets: DynSecretsManager,
832 oauth_config: Option<OAuthBrokerConfig>,
833 verify_archive: bool,
834 ) -> Result<Self> {
835 let path = path.as_ref();
836 let (_pack_root, safe_path) = normalize_pack_path(path)?;
837 let is_component = safe_path
838 .extension()
839 .and_then(|ext| ext.to_str())
840 .map(|ext| ext.eq_ignore_ascii_case("wasm"))
841 .unwrap_or(false);
842 let archive_hint_path = if let Some(source) = archive_source {
843 let (_, normalized) = normalize_pack_path(source)?;
844 Some(normalized)
845 } else if is_component {
846 None
847 } else {
848 Some(safe_path.clone())
849 };
850 let archive_hint = archive_hint_path.as_deref();
851 if verify_archive {
852 let verify_target = archive_hint.unwrap_or(&safe_path);
853 verify::verify_pack(verify_target).await?;
854 tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
855 }
856 let engine = Engine::default();
857 let wasm_bytes = fs::read(&safe_path).await?;
858 let mut metadata = PackMetadata::from_wasm(&wasm_bytes)
859 .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
860 let mut manifest = None;
861 let mut legacy_manifest: Option<Box<legacy_pack::PackManifest>> = None;
862 let flows = if let Some(archive_path) = archive_hint {
863 match load_manifest_and_flows(archive_path) {
864 Ok(ManifestLoad::New {
865 manifest: m,
866 flows: cache,
867 }) => {
868 metadata = cache.metadata.clone();
869 manifest = Some(*m);
870 Some(cache)
871 }
872 Ok(ManifestLoad::Legacy {
873 manifest: m,
874 flows: cache,
875 }) => {
876 metadata = cache.metadata.clone();
877 legacy_manifest = Some(m);
878 Some(cache)
879 }
880 Err(err) => {
881 warn!(error = %err, pack = %archive_path.display(), "failed to parse pack manifest; skipping flows");
882 None
883 }
884 }
885 } else {
886 None
887 };
888 let components = if let Some(archive_path) = archive_hint {
889 if let Some(new_manifest) = manifest.as_ref() {
890 match load_components_from_archive(&engine, archive_path, Some(new_manifest)) {
891 Ok(map) => map,
892 Err(err) => {
893 warn!(error = %err, pack = %archive_path.display(), "failed to load components from archive");
894 HashMap::new()
895 }
896 }
897 } else if let Some(legacy) = legacy_manifest.as_ref() {
898 match load_legacy_components_from_archive(&engine, archive_path, legacy) {
899 Ok(map) => map,
900 Err(err) => {
901 warn!(error = %err, pack = %archive_path.display(), "failed to load components from archive");
902 HashMap::new()
903 }
904 }
905 } else {
906 HashMap::new()
907 }
908 } else if is_component {
909 let name = safe_path
910 .file_stem()
911 .map(|s| s.to_string_lossy().to_string())
912 .unwrap_or_else(|| "component".to_string());
913 let component = Component::from_binary(&engine, &wasm_bytes)?;
914 let mut map = HashMap::new();
915 map.insert(
916 name.clone(),
917 PackComponent {
918 name,
919 version: metadata.version.clone(),
920 component,
921 },
922 );
923 map
924 } else {
925 HashMap::new()
926 };
927 let http_client = Arc::clone(&HTTP_CLIENT);
928 Ok(Self {
929 path: safe_path,
930 archive_path: archive_hint.map(Path::to_path_buf),
931 config,
932 engine,
933 metadata,
934 manifest,
935 legacy_manifest,
936 mocks,
937 flows,
938 components,
939 http_client,
940 pre_cache: Mutex::new(HashMap::new()),
941 session_store,
942 state_store,
943 wasi_policy,
944 provider_registry: RwLock::new(None),
945 secrets,
946 oauth_config,
947 })
948 }
949
950 pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
951 if let Some(cache) = &self.flows {
952 return Ok(cache.descriptors.clone());
953 }
954 if let Some(manifest) = &self.manifest {
955 let descriptors = manifest
956 .flows
957 .iter()
958 .map(|flow| FlowDescriptor {
959 id: flow.id.as_str().to_string(),
960 flow_type: flow_kind_to_str(flow.kind).to_string(),
961 profile: manifest.pack_id.as_str().to_string(),
962 version: manifest.version.to_string(),
963 description: None,
964 })
965 .collect();
966 return Ok(descriptors);
967 }
968 Ok(Vec::new())
969 }
970
971 #[allow(dead_code)]
972 pub async fn run_flow(
973 &self,
974 flow_id: &str,
975 input: serde_json::Value,
976 ) -> Result<serde_json::Value> {
977 let pack = Arc::new(
978 PackRuntime::load(
979 &self.path,
980 Arc::clone(&self.config),
981 self.mocks.clone(),
982 self.archive_path.as_deref(),
983 self.session_store.clone(),
984 self.state_store.clone(),
985 Arc::clone(&self.wasi_policy),
986 self.secrets.clone(),
987 self.oauth_config.clone(),
988 false,
989 )
990 .await?,
991 );
992
993 let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&self.config)).await?;
994 let retry_config = self.config.retry_config().into();
995 let mocks = pack.mocks.as_deref();
996 let tenant = self.config.tenant.as_str();
997
998 let ctx = FlowContext {
999 tenant,
1000 flow_id,
1001 node_id: None,
1002 tool: None,
1003 action: None,
1004 session_id: None,
1005 provider_id: None,
1006 retry_config,
1007 observer: None,
1008 mocks,
1009 };
1010
1011 let execution = engine.execute(ctx, input).await?;
1012 match execution.status {
1013 FlowStatus::Completed => Ok(execution.output),
1014 FlowStatus::Waiting(wait) => Ok(serde_json::json!({
1015 "status": "pending",
1016 "reason": wait.reason,
1017 "resume": wait.snapshot,
1018 "response": execution.output,
1019 })),
1020 }
1021 }
1022
1023 pub async fn invoke_component(
1024 &self,
1025 component_ref: &str,
1026 ctx: ComponentExecCtx,
1027 operation: &str,
1028 _config_json: Option<String>,
1029 input_json: String,
1030 ) -> Result<Value> {
1031 let pack_component = self
1032 .components
1033 .get(component_ref)
1034 .with_context(|| format!("component '{component_ref}' not found in pack"))?;
1035
1036 let mut linker = Linker::new(&self.engine);
1037 register_all(&mut linker)?;
1038 add_component_control_to_linker(&mut linker)?;
1039 let pre_instance = linker.instantiate_pre(&pack_component.component)?;
1040 let pre: ComponentPre<ComponentState> = ComponentPre::new(pre_instance)?;
1041
1042 let host_state = HostState::new(
1043 Arc::clone(&self.config),
1044 Arc::clone(&self.http_client),
1045 self.mocks.clone(),
1046 self.session_store.clone(),
1047 self.state_store.clone(),
1048 Arc::clone(&self.secrets),
1049 self.oauth_config.clone(),
1050 )?;
1051 let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1052 let mut store = wasmtime::Store::new(&self.engine, store_state);
1053 let bindings: crate::component_api::Component = pre.instantiate_async(&mut store).await?;
1054 let node = bindings.greentic_component_node();
1055
1056 let result = node.call_invoke(&mut store, &ctx, operation, &input_json)?;
1057
1058 match result {
1059 InvokeResult::Ok(body) => {
1060 if body.is_empty() {
1061 return Ok(Value::Null);
1062 }
1063 serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
1064 }
1065 InvokeResult::Err(NodeError {
1066 code,
1067 message,
1068 retryable,
1069 backoff_ms,
1070 details,
1071 }) => {
1072 let mut obj = serde_json::Map::new();
1073 obj.insert("ok".into(), Value::Bool(false));
1074 let mut error = serde_json::Map::new();
1075 error.insert("code".into(), Value::String(code));
1076 error.insert("message".into(), Value::String(message));
1077 error.insert("retryable".into(), Value::Bool(retryable));
1078 if let Some(backoff) = backoff_ms {
1079 error.insert("backoff_ms".into(), Value::Number(backoff.into()));
1080 }
1081 if let Some(details) = details {
1082 error.insert(
1083 "details".into(),
1084 serde_json::from_str(&details).unwrap_or(Value::String(details)),
1085 );
1086 }
1087 obj.insert("error".into(), Value::Object(error));
1088 Ok(Value::Object(obj))
1089 }
1090 }
1091 }
1092
1093 pub fn resolve_provider(
1094 &self,
1095 provider_id: Option<&str>,
1096 provider_type: Option<&str>,
1097 ) -> Result<ProviderBinding> {
1098 let registry = self.provider_registry()?;
1099 registry.resolve(provider_id, provider_type)
1100 }
1101
1102 pub async fn invoke_provider(
1103 &self,
1104 binding: &ProviderBinding,
1105 _ctx: ComponentExecCtx,
1106 op: &str,
1107 input_json: Vec<u8>,
1108 ) -> Result<Value> {
1109 let component_ref = &binding.component_ref;
1110 let pack_component = self
1111 .components
1112 .get(component_ref)
1113 .with_context(|| format!("provider component '{component_ref}' not found in pack"))?;
1114
1115 let mut linker = Linker::new(&self.engine);
1116 register_all(&mut linker)?;
1117 add_component_control_to_linker(&mut linker)?;
1118 let pre_instance = linker.instantiate_pre(&pack_component.component)?;
1119 let pre: ProviderComponentPre<ComponentState> = ProviderComponentPre::new(pre_instance)?;
1120
1121 let host_state = HostState::new(
1122 Arc::clone(&self.config),
1123 Arc::clone(&self.http_client),
1124 self.mocks.clone(),
1125 self.session_store.clone(),
1126 self.state_store.clone(),
1127 Arc::clone(&self.secrets),
1128 self.oauth_config.clone(),
1129 )?;
1130 let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1131 let mut store = wasmtime::Store::new(&self.engine, store_state);
1132 let bindings: crate::provider_core::SchemaCore = pre.instantiate_async(&mut store).await?;
1133 let provider = bindings.greentic_provider_core_schema_core_api();
1134
1135 let result = provider.call_invoke(&mut store, op, &input_json)?;
1136 deserialize_json_bytes(result)
1137 }
1138
1139 fn provider_registry(&self) -> Result<ProviderRegistry> {
1140 if let Some(registry) = self.provider_registry.read().clone() {
1141 return Ok(registry);
1142 }
1143 let manifest = self
1144 .manifest
1145 .as_ref()
1146 .context("pack manifest required for provider resolution")?;
1147 let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
1148 let registry = ProviderRegistry::new(
1149 manifest,
1150 self.state_store.clone(),
1151 &self.config.tenant,
1152 &env,
1153 )?;
1154 *self.provider_registry.write() = Some(registry.clone());
1155 Ok(registry)
1156 }
1157
1158 pub fn load_flow(&self, flow_id: &str) -> Result<Flow> {
1159 if let Some(cache) = &self.flows {
1160 return cache
1161 .flows
1162 .get(flow_id)
1163 .cloned()
1164 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
1165 }
1166 if let Some(manifest) = &self.manifest {
1167 let entry = manifest
1168 .flows
1169 .iter()
1170 .find(|f| f.id.as_str() == flow_id)
1171 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
1172 return Ok(entry.flow.clone());
1173 }
1174 bail!("flow '{flow_id}' not available (pack exports disabled)")
1175 }
1176
1177 pub fn metadata(&self) -> &PackMetadata {
1178 &self.metadata
1179 }
1180
1181 pub fn required_secrets(&self) -> &[greentic_types::SecretRequirement] {
1182 &self.metadata.secret_requirements
1183 }
1184
1185 pub fn missing_secrets(
1186 &self,
1187 tenant_ctx: &TypesTenantCtx,
1188 ) -> Vec<greentic_types::SecretRequirement> {
1189 let env = tenant_ctx.env.as_str().to_string();
1190 let tenant = tenant_ctx.tenant.as_str().to_string();
1191 let team = tenant_ctx.team.as_ref().map(|t| t.as_str().to_string());
1192 self.required_secrets()
1193 .iter()
1194 .filter(|req| {
1195 if let Some(scope) = &req.scope {
1197 if scope.env != env {
1198 return false;
1199 }
1200 if scope.tenant != tenant {
1201 return false;
1202 }
1203 if let Some(ref team_req) = scope.team
1204 && team.as_ref() != Some(team_req)
1205 {
1206 return false;
1207 }
1208 }
1209 read_secret_blocking(&self.secrets, req.key.as_str()).is_err()
1210 })
1211 .cloned()
1212 .collect()
1213 }
1214
1215 pub fn for_component_test(
1216 components: Vec<(String, PathBuf)>,
1217 flows: HashMap<String, FlowIR>,
1218 config: Arc<HostConfig>,
1219 ) -> Result<Self> {
1220 let engine = Engine::default();
1221 let mut component_map = HashMap::new();
1222 for (name, path) in components {
1223 if !path.exists() {
1224 bail!("component artifact missing: {}", path.display());
1225 }
1226 let wasm_bytes = std::fs::read(&path)?;
1227 let component = Component::from_binary(&engine, &wasm_bytes)
1228 .with_context(|| format!("failed to compile component {}", path.display()))?;
1229 component_map.insert(
1230 name.clone(),
1231 PackComponent {
1232 name,
1233 version: "0.0.0".into(),
1234 component,
1235 },
1236 );
1237 }
1238
1239 let mut flow_map = HashMap::new();
1240 let mut descriptors = Vec::new();
1241 for (id, ir) in flows {
1242 let flow_type = ir.flow_type.clone();
1243 let flow = flow_ir_to_flow(ir)?;
1244 flow_map.insert(id.clone(), flow);
1245 descriptors.push(FlowDescriptor {
1246 id: id.clone(),
1247 flow_type,
1248 profile: "test".into(),
1249 version: "0.0.0".into(),
1250 description: None,
1251 });
1252 }
1253 let flows_cache = PackFlows {
1254 descriptors: descriptors.clone(),
1255 flows: flow_map,
1256 metadata: PackMetadata::fallback(Path::new("component-test")),
1257 };
1258
1259 Ok(Self {
1260 path: PathBuf::new(),
1261 archive_path: None,
1262 config,
1263 engine,
1264 metadata: PackMetadata::fallback(Path::new("component-test")),
1265 manifest: None,
1266 legacy_manifest: None,
1267 mocks: None,
1268 flows: Some(flows_cache),
1269 components: component_map,
1270 http_client: Arc::clone(&HTTP_CLIENT),
1271 pre_cache: Mutex::new(HashMap::new()),
1272 session_store: None,
1273 state_store: None,
1274 wasi_policy: Arc::new(RunnerWasiPolicy::new()),
1275 provider_registry: RwLock::new(None),
1276 secrets: crate::secrets::default_manager(),
1277 oauth_config: None,
1278 })
1279 }
1280}
1281
1282struct PackFlows {
1283 descriptors: Vec<FlowDescriptor>,
1284 flows: HashMap<String, Flow>,
1285 metadata: PackMetadata,
1286}
1287
1288fn deserialize_json_bytes(bytes: Vec<u8>) -> Result<Value> {
1289 if bytes.is_empty() {
1290 return Ok(Value::Null);
1291 }
1292 serde_json::from_slice(&bytes).or_else(|_| {
1293 String::from_utf8(bytes)
1294 .map(Value::String)
1295 .map_err(|err| anyhow!(err))
1296 })
1297}
1298
1299impl PackFlows {
1300 fn from_manifest(manifest: greentic_types::PackManifest) -> Self {
1301 let descriptors = manifest
1302 .flows
1303 .iter()
1304 .map(|entry| FlowDescriptor {
1305 id: entry.id.as_str().to_string(),
1306 flow_type: flow_kind_to_str(entry.kind).to_string(),
1307 profile: manifest.pack_id.as_str().to_string(),
1308 version: manifest.version.to_string(),
1309 description: None,
1310 })
1311 .collect();
1312 let mut flows = HashMap::new();
1313 for entry in &manifest.flows {
1314 flows.insert(entry.id.as_str().to_string(), entry.flow.clone());
1315 }
1316 Self {
1317 metadata: PackMetadata::from_manifest(&manifest),
1318 descriptors,
1319 flows,
1320 }
1321 }
1322}
1323
1324fn flow_kind_to_str(kind: greentic_types::FlowKind) -> &'static str {
1325 match kind {
1326 greentic_types::FlowKind::Messaging => "messaging",
1327 greentic_types::FlowKind::Event => "event",
1328 greentic_types::FlowKind::ComponentConfig => "component-config",
1329 greentic_types::FlowKind::Job => "job",
1330 greentic_types::FlowKind::Http => "http",
1331 }
1332}
1333
1334fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
1335 let mut file = archive
1336 .by_name(name)
1337 .with_context(|| format!("entry {name} missing from archive"))?;
1338 let mut buf = Vec::new();
1339 file.read_to_end(&mut buf)?;
1340 Ok(buf)
1341}
1342
1343fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
1344 for node in doc.nodes.values_mut() {
1345 if node.component.is_empty()
1346 && let Some((component_ref, payload)) = node.raw.iter().next()
1347 {
1348 if component_ref.starts_with("emit.") {
1349 node.component = component_ref.clone();
1350 node.payload = payload.clone();
1351 node.raw.clear();
1352 continue;
1353 }
1354 let (target_component, operation, input, config) =
1355 infer_component_exec(payload, component_ref);
1356 let mut payload_obj = serde_json::Map::new();
1357 payload_obj.insert("component".into(), Value::String(target_component));
1359 payload_obj.insert("operation".into(), Value::String(operation));
1360 payload_obj.insert("input".into(), input);
1361 if let Some(cfg) = config {
1362 payload_obj.insert("config".into(), cfg);
1363 }
1364 node.component = "component.exec".to_string();
1365 node.payload = Value::Object(payload_obj);
1366 }
1367 }
1368 doc
1369}
1370
1371fn infer_component_exec(
1372 payload: &Value,
1373 component_ref: &str,
1374) -> (String, String, Value, Option<Value>) {
1375 let default_op = if component_ref.starts_with("templating.") {
1376 "render"
1377 } else {
1378 "invoke"
1379 }
1380 .to_string();
1381
1382 if let Value::Object(map) = payload {
1383 let op = map
1384 .get("op")
1385 .or_else(|| map.get("operation"))
1386 .and_then(Value::as_str)
1387 .map(|s| s.to_string())
1388 .unwrap_or_else(|| default_op.clone());
1389
1390 let mut input = map.clone();
1391 let config = input.remove("config");
1392 let component = input
1393 .get("component")
1394 .or_else(|| input.get("component_ref"))
1395 .and_then(Value::as_str)
1396 .map(|s| s.to_string())
1397 .unwrap_or_else(|| component_ref.to_string());
1398 input.remove("component");
1399 input.remove("component_ref");
1400 input.remove("op");
1401 input.remove("operation");
1402 return (component, op, Value::Object(input), config);
1403 }
1404
1405 (component_ref.to_string(), default_op, payload.clone(), None)
1406}
1407
1408#[cfg(test)]
1409mod tests {
1410 use super::*;
1411 use greentic_flow::model::{FlowDoc, NodeDoc};
1412 use serde_json::json;
1413 use std::collections::BTreeMap;
1414
1415 #[test]
1416 fn normalizes_raw_component_to_component_exec() {
1417 let mut nodes = BTreeMap::new();
1418 let mut raw = BTreeMap::new();
1419 raw.insert(
1420 "templating.handlebars".into(),
1421 json!({ "template": "Hi {{name}}" }),
1422 );
1423 nodes.insert(
1424 "start".into(),
1425 NodeDoc {
1426 raw,
1427 routing: json!([{"out": true}]),
1428 ..Default::default()
1429 },
1430 );
1431 let doc = FlowDoc {
1432 id: "welcome".into(),
1433 title: None,
1434 description: None,
1435 flow_type: "messaging".into(),
1436 start: Some("start".into()),
1437 parameters: json!({}),
1438 tags: Vec::new(),
1439 entrypoints: BTreeMap::new(),
1440 nodes,
1441 };
1442
1443 let normalized = normalize_flow_doc(doc);
1444 let node = normalized.nodes.get("start").expect("node exists");
1445 assert_eq!(node.component, "component.exec");
1446 assert!(node.raw.is_empty() || node.raw.contains_key("templating.handlebars"));
1447 let payload = node.payload.as_object().expect("payload object");
1448 assert_eq!(
1449 payload.get("component"),
1450 Some(&Value::String("templating.handlebars".into()))
1451 );
1452 assert_eq!(
1453 payload.get("operation"),
1454 Some(&Value::String("render".into()))
1455 );
1456 let input = payload.get("input").unwrap();
1457 assert_eq!(input, &json!({ "template": "Hi {{name}}" }));
1458 }
1459}
1460
1461fn load_components_from_archive(
1462 engine: &Engine,
1463 path: &Path,
1464 manifest: Option<&greentic_types::PackManifest>,
1465) -> Result<HashMap<String, PackComponent>> {
1466 let mut archive = ZipArchive::new(File::open(path)?)
1467 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
1468 let mut components = HashMap::new();
1469 if let Some(manifest) = manifest {
1470 for entry in &manifest.components {
1471 let file_name = format!("components/{}.wasm", entry.id.as_str());
1472 let bytes = read_entry(&mut archive, &file_name)
1473 .with_context(|| format!("missing component {}", file_name))?;
1474 let component = Component::from_binary(engine, &bytes)
1475 .with_context(|| format!("failed to compile component {}", entry.id.as_str()))?;
1476 components.insert(
1477 entry.id.as_str().to_string(),
1478 PackComponent {
1479 name: entry.id.as_str().to_string(),
1480 version: entry.version.to_string(),
1481 component,
1482 },
1483 );
1484 }
1485 }
1486 Ok(components)
1487}
1488
1489fn load_legacy_components_from_archive(
1490 engine: &Engine,
1491 path: &Path,
1492 manifest: &legacy_pack::PackManifest,
1493) -> Result<HashMap<String, PackComponent>> {
1494 let mut archive = ZipArchive::new(File::open(path)?)
1495 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
1496 let mut components = HashMap::new();
1497 for entry in &manifest.components {
1498 let bytes = read_entry(&mut archive, &entry.file_wasm)
1499 .with_context(|| format!("missing component {}", entry.file_wasm))?;
1500 let component = Component::from_binary(engine, &bytes)
1501 .with_context(|| format!("failed to compile component {}", entry.name))?;
1502 components.insert(
1503 entry.name.clone(),
1504 PackComponent {
1505 name: entry.name.clone(),
1506 version: entry.version.to_string(),
1507 component,
1508 },
1509 );
1510 }
1511 Ok(components)
1512}
1513
1514#[derive(Clone, Debug, Default, Serialize, Deserialize)]
1515pub struct PackMetadata {
1516 pub pack_id: String,
1517 pub version: String,
1518 #[serde(default)]
1519 pub entry_flows: Vec<String>,
1520 #[serde(default)]
1521 pub secret_requirements: Vec<greentic_types::SecretRequirement>,
1522}
1523
1524impl PackMetadata {
1525 fn from_wasm(bytes: &[u8]) -> Option<Self> {
1526 let parser = Parser::new(0);
1527 for payload in parser.parse_all(bytes) {
1528 let payload = payload.ok()?;
1529 match payload {
1530 Payload::CustomSection(section) => {
1531 if section.name() == "greentic.manifest"
1532 && let Ok(meta) = Self::from_bytes(section.data())
1533 {
1534 return Some(meta);
1535 }
1536 }
1537 Payload::DataSection(reader) => {
1538 for segment in reader.into_iter().flatten() {
1539 if let Ok(meta) = Self::from_bytes(segment.data) {
1540 return Some(meta);
1541 }
1542 }
1543 }
1544 _ => {}
1545 }
1546 }
1547 None
1548 }
1549
1550 fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
1551 #[derive(Deserialize)]
1552 struct RawManifest {
1553 pack_id: String,
1554 version: String,
1555 #[serde(default)]
1556 entry_flows: Vec<String>,
1557 #[serde(default)]
1558 flows: Vec<RawFlow>,
1559 #[serde(default)]
1560 secret_requirements: Vec<greentic_types::SecretRequirement>,
1561 }
1562
1563 #[derive(Deserialize)]
1564 struct RawFlow {
1565 id: String,
1566 }
1567
1568 let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
1569 let mut entry_flows = if manifest.entry_flows.is_empty() {
1570 manifest.flows.iter().map(|f| f.id.clone()).collect()
1571 } else {
1572 manifest.entry_flows.clone()
1573 };
1574 entry_flows.retain(|id| !id.is_empty());
1575 Ok(Self {
1576 pack_id: manifest.pack_id,
1577 version: manifest.version,
1578 entry_flows,
1579 secret_requirements: manifest.secret_requirements,
1580 })
1581 }
1582
1583 pub fn fallback(path: &Path) -> Self {
1584 let pack_id = path
1585 .file_stem()
1586 .map(|s| s.to_string_lossy().into_owned())
1587 .unwrap_or_else(|| "unknown-pack".to_string());
1588 Self {
1589 pack_id,
1590 version: "0.0.0".to_string(),
1591 entry_flows: Vec::new(),
1592 secret_requirements: Vec::new(),
1593 }
1594 }
1595
1596 pub fn from_manifest(manifest: &greentic_types::PackManifest) -> Self {
1597 let entry_flows = manifest
1598 .flows
1599 .iter()
1600 .map(|flow| flow.id.as_str().to_string())
1601 .collect::<Vec<_>>();
1602 Self {
1603 pack_id: manifest.pack_id.as_str().to_string(),
1604 version: manifest.version.to_string(),
1605 entry_flows,
1606 secret_requirements: manifest.secret_requirements.clone(),
1607 }
1608 }
1609}