1use std::collections::{BTreeMap, HashMap};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7
8use crate::component_api::component::greentic::component::control::Host as ComponentControlHost;
9use crate::component_api::{
10 ComponentPre, control, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
11};
12use crate::imports::register_all;
13use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
14use crate::runtime_wasmtime::{Component, Engine, Linker, ResourceTable, WasmResult};
15use anyhow::{Context, Result, anyhow, bail};
16use greentic_flow::ir::{FlowIR, NodeIR, RouteIR};
17use greentic_flow::model::FlowDoc;
18use greentic_interfaces_host::host_import::v0_2 as host_import_v0_2;
19use greentic_interfaces_host::host_import::v0_2::greentic::host_import::imports::{
20 HttpRequest as LegacyHttpRequest, HttpResponse as LegacyHttpResponse,
21 IfaceError as LegacyIfaceError, TenantCtx as LegacyTenantCtx,
22};
23use greentic_interfaces_host::host_import::v0_6::{
24 self as host_import_v0_6, iface_types, state, types,
25};
26use greentic_pack::reader::{SigningPolicy, open_pack};
27use greentic_session::SessionKey as StoreSessionKey;
28use greentic_types::{
29 EnvId, FlowId, SessionCursor as StoreSessionCursor, SessionData, StateKey as StoreStateKey,
30 TeamId, TenantCtx as TypesTenantCtx, TenantId, UserId,
31};
32use indexmap::IndexMap;
33use once_cell::sync::Lazy;
34use parking_lot::Mutex;
35use reqwest::blocking::Client as BlockingClient;
36use runner_core::normalize_under_root;
37use serde::{Deserialize, Serialize};
38use serde_cbor;
39use serde_json::{self, Value};
40use serde_yaml_bw as serde_yaml;
41use tokio::fs;
42use wasmparser::{Parser, Payload};
43use wasmtime::StoreContextMut;
44use zip::ZipArchive;
45
46use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
47
48use crate::config::HostConfig;
49use crate::secrets::{DynSecretsManager, read_secret_blocking};
50use crate::storage::state::STATE_PREFIX;
51use crate::storage::{DynSessionStore, DynStateStore};
52use crate::verify;
53use crate::wasi::RunnerWasiPolicy;
54use tracing::warn;
55use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
56
/// A loaded pack (or standalone component) plus the shared resources used to
/// instantiate and invoke its components.
#[allow(dead_code)]
pub struct PackRuntime {
    /// Normalized path the runtime was loaded from.
    path: PathBuf,
    /// Archive consulted for the manifest, flows and components, when one is known.
    archive_path: Option<PathBuf>,
    /// Host configuration handed to every `HostState` built for an invocation.
    config: Arc<HostConfig>,
    /// Engine used to compile components and create stores.
    engine: Engine,
    /// Pack metadata, taken from the manifest, the legacy flow cache, the wasm
    /// bytes, or a fallback derived from the path.
    metadata: PackMetadata,
    /// Parsed pack manifest, when the archive could be opened as a pack.
    manifest: Option<greentic_pack::builder::PackManifest>,
    /// Optional mock layer forwarded to the host state.
    mocks: Option<Arc<MockLayer>>,
    /// Flow cache; when present it takes precedence over the manifest for flow lookups.
    flows: Option<PackFlows>,
    /// Compiled components keyed by the name used in `invoke_component`.
    components: HashMap<String, PackComponent>,
    /// Blocking HTTP client shared with the host state.
    http_client: Arc<BlockingClient>,
    /// Pre-instantiated components, filled lazily on first invocation.
    pre_cache: Mutex<HashMap<String, ComponentPre<ComponentState>>>,
    /// Optional session store forwarded to the host state.
    session_store: Option<DynSessionStore>,
    /// Optional state store forwarded to the host state.
    state_store: Option<DynStateStore>,
    /// Policy used to build the WASI context of each invocation.
    wasi_policy: Arc<RunnerWasiPolicy>,
    /// Secrets manager forwarded to the host state.
    secrets: DynSecretsManager,
    /// OAuth broker configuration; its presence is also passed to `register_all`.
    oauth_config: Option<OAuthBrokerConfig>,
}
78
/// A compiled component together with the name and version it was registered under.
struct PackComponent {
    /// Name the component is keyed by in the runtime's component map.
    #[allow(dead_code)]
    name: String,
    /// Version string recorded when the component was loaded.
    #[allow(dead_code)]
    version: String,
    /// The compiled component itself.
    component: Component,
}
86
87fn build_blocking_client() -> BlockingClient {
88 std::thread::spawn(|| BlockingClient::builder().build().expect("blocking client"))
89 .join()
90 .expect("client build thread panicked")
91}
92
93fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
94 let (root, candidate) = if path.is_absolute() {
95 let parent = path
96 .parent()
97 .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
98 let root = parent
99 .canonicalize()
100 .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
101 let file = path
102 .file_name()
103 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
104 (root, PathBuf::from(file))
105 } else {
106 let cwd = std::env::current_dir().context("failed to resolve current directory")?;
107 let base = if let Some(parent) = path.parent() {
108 cwd.join(parent)
109 } else {
110 cwd
111 };
112 let root = base
113 .canonicalize()
114 .with_context(|| format!("failed to canonicalize {}", base.display()))?;
115 let file = path
116 .file_name()
117 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
118 (root, PathBuf::from(file))
119 };
120 let safe = normalize_under_root(&root, &candidate)?;
121 Ok((root, safe))
122}
123
/// Process-wide blocking HTTP client, built on first use via `build_blocking_client`
/// and shared by every runtime through `Arc::clone`.
static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
125
/// Serializable summary of a flow exposed by a pack.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowDescriptor {
    /// Flow identifier.
    pub id: String,
    /// Flow kind; serialized under the key `type`.
    #[serde(rename = "type")]
    pub flow_type: String,
    /// Profile name (the manifest name when built from a manifest).
    pub profile: String,
    /// Version string.
    pub version: String,
    /// Optional description; defaults to `None` when absent from the input.
    #[serde(default)]
    pub description: Option<String>,
}
136
/// Host-side state backing the imports offered to a component invocation.
pub struct HostState {
    /// Host configuration (tenant, secrets policy, HTTP switch).
    config: Arc<HostConfig>,
    /// Blocking HTTP client used by `http_fetch`.
    http_client: Arc<BlockingClient>,
    /// Environment name read from `GREENTIC_ENV`, or `"local"`.
    default_env: String,
    /// Session store; `session_update` reports `Unavailable` without it.
    session_store: Option<DynSessionStore>,
    /// State store; `state_get`/`state_set` report `Unavailable` without it.
    state_store: Option<DynStateStore>,
    /// Optional mock layer consulted for secrets, telemetry and HTTP.
    mocks: Option<Arc<MockLayer>>,
    /// Secrets manager used when no mock supplies the value.
    secrets: DynSecretsManager,
    /// OAuth broker configuration exposed through `OAuthHostContext`.
    oauth_config: Option<OAuthBrokerConfig>,
    /// OAuth broker host state exposed through `OAuthHostContext`.
    oauth_host: OAuthBrokerHost,
}
148
149impl HostState {
150 #[allow(clippy::default_constructed_unit_structs)]
151 pub fn new(
152 config: Arc<HostConfig>,
153 http_client: Arc<BlockingClient>,
154 mocks: Option<Arc<MockLayer>>,
155 session_store: Option<DynSessionStore>,
156 state_store: Option<DynStateStore>,
157 secrets: DynSecretsManager,
158 oauth_config: Option<OAuthBrokerConfig>,
159 ) -> Result<Self> {
160 let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
161 Ok(Self {
162 config,
163 http_client,
164 default_env,
165 session_store,
166 state_store,
167 mocks,
168 secrets,
169 oauth_config,
170 oauth_host: OAuthBrokerHost::default(),
171 })
172 }
173
174 pub fn get_secret(&self, key: &str) -> Result<String> {
175 if !self.config.secrets_policy.is_allowed(key) {
176 bail!("secret {key} is not permitted by bindings policy");
177 }
178 if let Some(mock) = &self.mocks
179 && let Some(value) = mock.secrets_lookup(key)
180 {
181 return Ok(value);
182 }
183 let bytes = read_secret_blocking(&self.secrets, key)
184 .context("failed to read secret from manager")?;
185 let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
186 Ok(value)
187 }
188
189 fn tenant_ctx_from_v6(&self, ctx: Option<types::TenantCtx>) -> Result<TypesTenantCtx> {
190 let tenant_raw = ctx
191 .as_ref()
192 .map(|ctx| ctx.tenant.clone())
193 .unwrap_or_else(|| self.config.tenant.clone());
194 let tenant_id = TenantId::from_str(&tenant_raw)
195 .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
196 let env_id = EnvId::from_str(&self.default_env)
197 .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
198 let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
199 if let Some(ctx) = ctx {
200 if let Some(team) = ctx.team {
201 let team_id =
202 TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
203 tenant_ctx = tenant_ctx.with_team(Some(team_id));
204 }
205 if let Some(user) = ctx.user {
206 let user_id =
207 UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
208 tenant_ctx = tenant_ctx.with_user(Some(user_id));
209 }
210 if let Some(flow) = ctx.flow_id {
211 tenant_ctx = tenant_ctx.with_flow(flow);
212 }
213 if let Some(node) = ctx.node_id {
214 tenant_ctx = tenant_ctx.with_node(node);
215 }
216 if let Some(provider) = ctx.provider_id {
217 tenant_ctx = tenant_ctx.with_provider(provider);
218 }
219 if let Some(session) = ctx.session_id {
220 tenant_ctx = tenant_ctx.with_session(session);
221 }
222 tenant_ctx.trace_id = ctx.trace_id;
223 }
224 Ok(tenant_ctx)
225 }
226
227 fn session_store_handle(&self) -> Result<DynSessionStore, types::IfaceError> {
228 self.session_store
229 .as_ref()
230 .cloned()
231 .ok_or(types::IfaceError::Unavailable)
232 }
233
234 fn state_store_handle(&self) -> Result<DynStateStore, types::IfaceError> {
235 self.state_store
236 .as_ref()
237 .cloned()
238 .ok_or(types::IfaceError::Unavailable)
239 }
240
241 fn ensure_user(ctx: &TypesTenantCtx) -> Result<UserId, types::IfaceError> {
242 ctx.user_id
243 .clone()
244 .or_else(|| ctx.user.clone())
245 .ok_or(types::IfaceError::InvalidArg)
246 }
247
248 fn ensure_flow(ctx: &TypesTenantCtx) -> Result<FlowId, types::IfaceError> {
249 let flow = ctx.flow_id().ok_or(types::IfaceError::InvalidArg)?;
250 FlowId::from_str(flow).map_err(|_| types::IfaceError::InvalidArg)
251 }
252
253 fn cursor_from_iface(cursor: iface_types::SessionCursor) -> StoreSessionCursor {
254 let mut store_cursor = StoreSessionCursor::new(cursor.node_pointer);
255 if let Some(reason) = cursor.wait_reason {
256 store_cursor = store_cursor.with_wait_reason(reason);
257 }
258 if let Some(marker) = cursor.outbox_marker {
259 store_cursor = store_cursor.with_outbox_marker(marker);
260 }
261 store_cursor
262 }
263}
264
/// Store data for a component instance: host imports state plus WASI plumbing.
pub struct ComponentState {
    /// Host state that the import implementations delegate to.
    host: HostState,
    /// WASI context built from the runner's WASI policy.
    wasi_ctx: WasiCtx,
    /// Resource table handed out together with the WASI context.
    resource_table: ResourceTable,
}
270
271impl ComponentState {
272 pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
273 let wasi_ctx = policy
274 .instantiate()
275 .context("failed to build WASI context")?;
276 Ok(Self {
277 host,
278 wasi_ctx,
279 resource_table: ResourceTable::new(),
280 })
281 }
282
283 fn host_mut(&mut self) -> &mut HostState {
284 &mut self.host
285 }
286}
287
/// Host side of the component `control` interface.
impl control::Host for ComponentState {
    /// Always answers `false`: this host never requests cancellation.
    fn should_cancel(&mut self) -> bool {
        false
    }

    /// No-op: the host does nothing when a component yields.
    fn yield_now(&mut self) {
    }
}
297
298fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
299 let mut inst = linker.instance("greentic:component/control@0.4.0")?;
300 inst.func_wrap(
301 "should-cancel",
302 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
303 let host = caller.data_mut();
304 Ok((ComponentControlHost::should_cancel(host),))
305 },
306 )?;
307 inst.func_wrap(
308 "yield-now",
309 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
310 let host = caller.data_mut();
311 ComponentControlHost::yield_now(host);
312 Ok(())
313 },
314 )?;
315 Ok(())
316}
317
/// Exposes the pieces of host state the OAuth broker integration reads.
impl OAuthHostContext for ComponentState {
    /// Tenant taken from the host configuration.
    fn tenant_id(&self) -> &str {
        &self.host.config.tenant
    }

    /// The host's default environment name.
    fn env(&self) -> &str {
        &self.host.default_env
    }

    /// Mutable access to the OAuth broker host state.
    fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
        &mut self.host.oauth_host
    }

    /// OAuth broker configuration, if one was supplied.
    fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
        self.host.oauth_config.as_ref()
    }
}
335
/// Hands the WASI implementation its context and resource table.
impl WasiView for ComponentState {
    /// Borrows the WASI context and the resource table together as one view.
    fn ctx(&mut self) -> WasiCtxView<'_> {
        WasiCtxView {
            ctx: &mut self.wasi_ctx,
            table: &mut self.resource_table,
        }
    }
}
344
// NOTE(review): nothing visible in this file states why `ComponentState` may be
// sent to or shared between threads. The invariant that makes these impls
// sound should be confirmed against the field types and recorded here as a
// `SAFETY:` comment.
#[allow(unsafe_code)]
unsafe impl Send for ComponentState {}
#[allow(unsafe_code)]
unsafe impl Sync for ComponentState {}
349
350impl host_import_v0_6::HostImports for ComponentState {
351 fn secrets_get(
352 &mut self,
353 key: String,
354 ctx: Option<types::TenantCtx>,
355 ) -> WasmResult<Result<String, types::IfaceError>> {
356 host_import_v0_6::HostImports::secrets_get(self.host_mut(), key, ctx)
357 }
358
359 fn telemetry_emit(
360 &mut self,
361 span_json: String,
362 ctx: Option<types::TenantCtx>,
363 ) -> WasmResult<()> {
364 host_import_v0_6::HostImports::telemetry_emit(self.host_mut(), span_json, ctx)
365 }
366
367 fn http_fetch(
368 &mut self,
369 req: host_import_v0_6::http::HttpRequest,
370 ctx: Option<types::TenantCtx>,
371 ) -> WasmResult<Result<host_import_v0_6::http::HttpResponse, types::IfaceError>> {
372 host_import_v0_6::HostImports::http_fetch(self.host_mut(), req, ctx)
373 }
374
375 fn mcp_exec(
376 &mut self,
377 component: String,
378 action: String,
379 args_json: String,
380 ctx: Option<types::TenantCtx>,
381 ) -> WasmResult<Result<String, types::IfaceError>> {
382 host_import_v0_6::HostImports::mcp_exec(self.host_mut(), component, action, args_json, ctx)
383 }
384
385 fn state_get(
386 &mut self,
387 key: iface_types::StateKey,
388 ctx: Option<types::TenantCtx>,
389 ) -> WasmResult<Result<String, types::IfaceError>> {
390 host_import_v0_6::HostImports::state_get(self.host_mut(), key, ctx)
391 }
392
393 fn state_set(
394 &mut self,
395 key: iface_types::StateKey,
396 value_json: String,
397 ctx: Option<types::TenantCtx>,
398 ) -> WasmResult<Result<state::OpAck, types::IfaceError>> {
399 host_import_v0_6::HostImports::state_set(self.host_mut(), key, value_json, ctx)
400 }
401
402 fn session_update(
403 &mut self,
404 cursor: iface_types::SessionCursor,
405 ctx: Option<types::TenantCtx>,
406 ) -> WasmResult<Result<String, types::IfaceError>> {
407 host_import_v0_6::HostImports::session_update(self.host_mut(), cursor, ctx)
408 }
409}
410
411impl host_import_v0_2::HostImports for ComponentState {
412 fn secrets_get(
413 &mut self,
414 key: String,
415 ctx: Option<LegacyTenantCtx>,
416 ) -> WasmResult<Result<String, LegacyIfaceError>> {
417 host_import_v0_2::HostImports::secrets_get(self.host_mut(), key, ctx)
418 }
419
420 fn telemetry_emit(
421 &mut self,
422 span_json: String,
423 ctx: Option<LegacyTenantCtx>,
424 ) -> WasmResult<()> {
425 host_import_v0_2::HostImports::telemetry_emit(self.host_mut(), span_json, ctx)
426 }
427
428 fn tool_invoke(
429 &mut self,
430 tool: String,
431 action: String,
432 args_json: String,
433 ctx: Option<LegacyTenantCtx>,
434 ) -> WasmResult<Result<String, LegacyIfaceError>> {
435 host_import_v0_2::HostImports::tool_invoke(self.host_mut(), tool, action, args_json, ctx)
436 }
437
438 fn http_fetch(
439 &mut self,
440 req: host_import_v0_2::greentic::host_import::imports::HttpRequest,
441 ctx: Option<host_import_v0_2::greentic::host_import::imports::TenantCtx>,
442 ) -> WasmResult<
443 Result<
444 host_import_v0_2::greentic::host_import::imports::HttpResponse,
445 host_import_v0_2::greentic::host_import::imports::IfaceError,
446 >,
447 > {
448 host_import_v0_2::HostImports::http_fetch(self.host_mut(), req, ctx)
449 }
450}
451
452impl host_import_v0_6::HostImports for HostState {
453 fn secrets_get(
454 &mut self,
455 key: String,
456 _ctx: Option<types::TenantCtx>,
457 ) -> WasmResult<Result<String, types::IfaceError>> {
458 Ok(self.get_secret(&key).map_err(|err| {
459 tracing::warn!(secret = %key, error = %err, "secret lookup denied");
460 types::IfaceError::Denied
461 }))
462 }
463
464 fn telemetry_emit(
465 &mut self,
466 span_json: String,
467 _ctx: Option<types::TenantCtx>,
468 ) -> WasmResult<()> {
469 if let Some(mock) = &self.mocks
470 && mock.telemetry_drain(&[("span_json", span_json.as_str())])
471 {
472 return Ok(());
473 }
474 tracing::info!(span = %span_json, "telemetry emit from pack");
475 Ok(())
476 }
477
478 fn http_fetch(
479 &mut self,
480 req: host_import_v0_6::http::HttpRequest,
481 _ctx: Option<types::TenantCtx>,
482 ) -> WasmResult<Result<host_import_v0_6::http::HttpResponse, types::IfaceError>> {
483 let legacy_req = LegacyHttpRequest {
484 method: req.method,
485 url: req.url,
486 headers_json: req.headers_json,
487 body: req.body,
488 };
489 match host_import_v0_2::HostImports::http_fetch(self, legacy_req, None)? {
490 Ok(resp) => Ok(Ok(host_import_v0_6::http::HttpResponse {
491 status: resp.status,
492 headers_json: resp.headers_json,
493 body: resp.body,
494 })),
495 Err(err) => Ok(Err(map_legacy_error(err))),
496 }
497 }
498
499 fn mcp_exec(
500 &mut self,
501 component: String,
502 action: String,
503 args_json: String,
504 ctx: Option<types::TenantCtx>,
505 ) -> WasmResult<Result<String, types::IfaceError>> {
506 let _ = (component, action, args_json, ctx);
507 tracing::warn!("mcp.exec requested but MCP bridge is removed; returning unavailable");
508 Ok(Err(types::IfaceError::Unavailable))
509 }
510
511 fn state_get(
512 &mut self,
513 key: iface_types::StateKey,
514 ctx: Option<types::TenantCtx>,
515 ) -> WasmResult<Result<String, types::IfaceError>> {
516 let store = match self.state_store_handle() {
517 Ok(store) => store,
518 Err(err) => return Ok(Err(err)),
519 };
520 let tenant_ctx = match self.tenant_ctx_from_v6(ctx) {
521 Ok(ctx) => ctx,
522 Err(err) => {
523 tracing::warn!(error = %err, "invalid tenant context for state.get");
524 return Ok(Err(types::IfaceError::InvalidArg));
525 }
526 };
527 let key = StoreStateKey::from(key);
528 match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
529 Ok(Some(value)) => {
530 let result = if let Some(text) = value.as_str() {
531 text.to_string()
532 } else {
533 serde_json::to_string(&value).unwrap_or_else(|_| value.to_string())
534 };
535 Ok(Ok(result))
536 }
537 Ok(None) => Ok(Err(types::IfaceError::NotFound)),
538 Err(err) => {
539 tracing::warn!(error = %err, "state.get failed");
540 Ok(Err(types::IfaceError::Internal))
541 }
542 }
543 }
544
545 fn state_set(
546 &mut self,
547 key: iface_types::StateKey,
548 value_json: String,
549 ctx: Option<types::TenantCtx>,
550 ) -> WasmResult<Result<state::OpAck, types::IfaceError>> {
551 let store = match self.state_store_handle() {
552 Ok(store) => store,
553 Err(err) => return Ok(Err(err)),
554 };
555 let tenant_ctx = match self.tenant_ctx_from_v6(ctx) {
556 Ok(ctx) => ctx,
557 Err(err) => {
558 tracing::warn!(error = %err, "invalid tenant context for state.set");
559 return Ok(Err(types::IfaceError::InvalidArg));
560 }
561 };
562 let key = StoreStateKey::from(key);
563 let value = serde_json::from_str(&value_json).unwrap_or(Value::String(value_json));
564 match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
565 Ok(()) => Ok(Ok(state::OpAck::Ok)),
566 Err(err) => {
567 tracing::warn!(error = %err, "state.set failed");
568 Ok(Err(types::IfaceError::Internal))
569 }
570 }
571 }
572
    /// Persists a session cursor and returns the key of the session it was stored under.
    ///
    /// Resolution order: the session id carried by the tenant context, then an
    /// existing session found for the user, and finally a newly created session.
    /// Reports `Unavailable` without a session store, `InvalidArg` when the
    /// tenant context, user or flow is missing/invalid, and `Internal` when a
    /// store operation fails.
    fn session_update(
        &mut self,
        cursor: iface_types::SessionCursor,
        ctx: Option<types::TenantCtx>,
    ) -> WasmResult<Result<String, types::IfaceError>> {
        let store = match self.session_store_handle() {
            Ok(store) => store,
            Err(err) => return Ok(Err(err)),
        };
        let tenant_ctx = match self.tenant_ctx_from_v6(ctx) {
            Ok(ctx) => ctx,
            Err(err) => {
                tracing::warn!(error = %err, "invalid tenant context for session.update");
                return Ok(Err(types::IfaceError::InvalidArg));
            }
        };
        // Both a user and a flow id are required before anything is written.
        let user = match Self::ensure_user(&tenant_ctx) {
            Ok(user) => user,
            Err(err) => return Ok(Err(err)),
        };
        let flow_id = match Self::ensure_flow(&tenant_ctx) {
            Ok(flow) => flow,
            Err(err) => return Ok(Err(err)),
        };
        let cursor = Self::cursor_from_iface(cursor);
        // The context JSON mirrors the three cursor fields.
        let payload = SessionData {
            tenant_ctx: tenant_ctx.clone(),
            flow_id,
            cursor: cursor.clone(),
            context_json: serde_json::json!({
                "node_pointer": cursor.node_pointer,
                "wait_reason": cursor.wait_reason,
                "outbox_marker": cursor.outbox_marker,
            })
            .to_string(),
        };
        // 1. The caller named a session: update it in place.
        if let Some(existing) = tenant_ctx.session_id() {
            let key = StoreSessionKey::from(existing.to_string());
            if let Err(err) = store.update_session(&key, payload) {
                tracing::error!(error = %err, "failed to update session snapshot");
                return Ok(Err(types::IfaceError::Internal));
            }
            return Ok(Ok(existing.to_string()));
        }
        // 2. No session named: reuse the one already stored for this user, if any.
        match store.find_by_user(&tenant_ctx, &user) {
            Ok(Some((key, _))) => {
                if let Err(err) = store.update_session(&key, payload) {
                    tracing::error!(error = %err, "failed to update existing user session");
                    return Ok(Err(types::IfaceError::Internal));
                }
                return Ok(Ok(key.to_string()));
            }
            Ok(None) => {}
            Err(err) => {
                tracing::error!(error = %err, "session lookup failed");
                return Ok(Err(types::IfaceError::Internal));
            }
        }
        // 3. Otherwise create a session, then rewrite it so the stored tenant
        //    context carries the new session id.
        let key = match store.create_session(&tenant_ctx, payload.clone()) {
            Ok(key) => key,
            Err(err) => {
                tracing::error!(error = %err, "failed to create session");
                return Ok(Err(types::IfaceError::Internal));
            }
        };
        let ctx_with_session = tenant_ctx.with_session(key.to_string());
        let updated_payload = SessionData {
            tenant_ctx: ctx_with_session.clone(),
            ..payload
        };
        // Stamping is best-effort: a failure is logged and the new key is still returned.
        if let Err(err) = store.update_session(&key, updated_payload) {
            tracing::warn!(error = %err, "failed to stamp session id after create");
        }
        Ok(Ok(key.to_string()))
    }
648}
649
650impl host_import_v0_2::HostImports for HostState {
651 fn secrets_get(
652 &mut self,
653 key: String,
654 _ctx: Option<LegacyTenantCtx>,
655 ) -> WasmResult<Result<String, LegacyIfaceError>> {
656 Ok(self.get_secret(&key).map_err(|err| {
657 tracing::warn!(secret = %key, error = %err, "secret lookup denied");
658 LegacyIfaceError::Denied
659 }))
660 }
661
662 fn telemetry_emit(
663 &mut self,
664 span_json: String,
665 _ctx: Option<LegacyTenantCtx>,
666 ) -> WasmResult<()> {
667 if let Some(mock) = &self.mocks
668 && mock.telemetry_drain(&[("span_json", span_json.as_str())])
669 {
670 return Ok(());
671 }
672 tracing::info!(span = %span_json, "telemetry emit from pack");
673 Ok(())
674 }
675
676 fn tool_invoke(
677 &mut self,
678 tool: String,
679 action: String,
680 args_json: String,
681 ctx: Option<LegacyTenantCtx>,
682 ) -> WasmResult<Result<String, LegacyIfaceError>> {
683 let _ = (tool, action, args_json, ctx);
684 tracing::warn!("tool invoke requested but MCP bridge is removed; returning unavailable");
685 Ok(Err(LegacyIfaceError::Unavailable))
686 }
687
688 fn http_fetch(
689 &mut self,
690 req: LegacyHttpRequest,
691 _ctx: Option<LegacyTenantCtx>,
692 ) -> WasmResult<Result<LegacyHttpResponse, LegacyIfaceError>> {
693 if !self.config.http_enabled {
694 tracing::warn!(url = %req.url, "http fetch denied by policy");
695 return Ok(Err(LegacyIfaceError::Denied));
696 }
697
698 let mut mock_state = None;
699 let raw_body = req.body.clone();
700 if let Some(mock) = &self.mocks
701 && let Ok(meta) = HttpMockRequest::new(
702 &req.method,
703 &req.url,
704 raw_body.as_deref().map(|body| body.as_bytes()),
705 )
706 {
707 match mock.http_begin(&meta) {
708 HttpDecision::Mock(response) => {
709 let http = LegacyHttpResponse::from(&response);
710 return Ok(Ok(http));
711 }
712 HttpDecision::Deny(reason) => {
713 tracing::warn!(url = %req.url, reason = %reason, "http fetch blocked by mocks");
714 return Ok(Err(LegacyIfaceError::Denied));
715 }
716 HttpDecision::Passthrough { record } => {
717 mock_state = Some((meta, record));
718 }
719 }
720 }
721
722 let method = req.method.parse().unwrap_or(reqwest::Method::GET);
723 let mut builder = self.http_client.request(method, &req.url);
724
725 if let Some(headers_json) = req.headers_json.as_ref() {
726 match serde_json::from_str::<serde_json::Map<String, serde_json::Value>>(headers_json) {
727 Ok(map) => {
728 for (key, value) in map {
729 if let Some(val) = value.as_str()
730 && let Ok(header) =
731 reqwest::header::HeaderName::from_bytes(key.as_bytes())
732 && let Ok(header_value) = reqwest::header::HeaderValue::from_str(val)
733 {
734 builder = builder.header(header, header_value);
735 }
736 }
737 }
738 Err(err) => {
739 tracing::warn!(error = %err, "failed to parse headers for http.fetch");
740 }
741 }
742 }
743
744 if let Some(body) = raw_body.clone() {
745 builder = builder.body(body);
746 }
747
748 let response = match builder.send() {
749 Ok(resp) => resp,
750 Err(err) => {
751 tracing::error!(url = %req.url, error = %err, "http fetch failed");
752 return Ok(Err(LegacyIfaceError::Unavailable));
753 }
754 };
755
756 let status = response.status().as_u16();
757 let headers_map = response
758 .headers()
759 .iter()
760 .map(|(k, v)| {
761 (
762 k.as_str().to_string(),
763 v.to_str().unwrap_or_default().to_string(),
764 )
765 })
766 .collect::<BTreeMap<_, _>>();
767 let headers_json = serde_json::to_string(&headers_map).ok();
768 let body = response.text().ok();
769
770 if let Some((meta, true)) = mock_state.take()
771 && let Some(mock) = &self.mocks
772 {
773 let recorded = HttpMockResponse::new(status, headers_map.clone(), body.clone());
774 mock.http_record(&meta, &recorded);
775 }
776
777 Ok(Ok(LegacyHttpResponse {
778 status,
779 headers_json,
780 body,
781 }))
782 }
783}
784
785impl PackRuntime {
786 #[allow(clippy::too_many_arguments)]
787 pub async fn load(
788 path: impl AsRef<Path>,
789 config: Arc<HostConfig>,
790 mocks: Option<Arc<MockLayer>>,
791 archive_source: Option<&Path>,
792 session_store: Option<DynSessionStore>,
793 state_store: Option<DynStateStore>,
794 wasi_policy: Arc<RunnerWasiPolicy>,
795 secrets: DynSecretsManager,
796 oauth_config: Option<OAuthBrokerConfig>,
797 verify_archive: bool,
798 ) -> Result<Self> {
799 let path = path.as_ref();
800 let (_pack_root, safe_path) = normalize_pack_path(path)?;
801 let is_component = safe_path
802 .extension()
803 .and_then(|ext| ext.to_str())
804 .map(|ext| ext.eq_ignore_ascii_case("wasm"))
805 .unwrap_or(false);
806 let archive_hint_path = if let Some(source) = archive_source {
807 let (_, normalized) = normalize_pack_path(source)?;
808 Some(normalized)
809 } else if is_component {
810 None
811 } else {
812 Some(safe_path.clone())
813 };
814 let archive_hint = archive_hint_path.as_deref();
815 if verify_archive {
816 let verify_target = archive_hint.unwrap_or(&safe_path);
817 verify::verify_pack(verify_target).await?;
818 tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
819 }
820 let engine = Engine::default();
821 let wasm_bytes = fs::read(&safe_path).await?;
822 let mut metadata = PackMetadata::from_wasm(&wasm_bytes)
823 .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
824 let mut manifest = None;
825 let flows = if let Some(archive_path) = archive_hint {
826 match open_pack(archive_path, SigningPolicy::DevOk) {
827 Ok(pack) => {
828 metadata = PackMetadata::from_manifest(&pack.manifest);
829 manifest = Some(pack.manifest);
830 None
831 }
832 Err(err) => {
833 warn!(error = ?err, pack = %archive_path.display(), "failed to parse pack manifest; falling back to legacy flow cache");
834 match PackFlows::from_archive(archive_path) {
835 Ok(cache) => {
836 metadata = cache.metadata.clone();
837 Some(cache)
838 }
839 Err(err) => {
840 warn!(
841 error = %err,
842 pack = %archive_path.display(),
843 "failed to parse pack flows via greentic-pack; falling back to component exports"
844 );
845 None
846 }
847 }
848 }
849 }
850 } else {
851 None
852 };
853 let components = if let Some(archive_path) = archive_hint {
854 match load_components_from_archive(&engine, archive_path) {
855 Ok(map) => map,
856 Err(err) => {
857 warn!(error = %err, pack = %archive_path.display(), "failed to load components from archive");
858 HashMap::new()
859 }
860 }
861 } else if is_component {
862 let name = safe_path
863 .file_stem()
864 .map(|s| s.to_string_lossy().to_string())
865 .unwrap_or_else(|| "component".to_string());
866 let component = Component::from_binary(&engine, &wasm_bytes)?;
867 let mut map = HashMap::new();
868 map.insert(
869 name.clone(),
870 PackComponent {
871 name,
872 version: metadata.version.clone(),
873 component,
874 },
875 );
876 map
877 } else {
878 HashMap::new()
879 };
880 let http_client = Arc::clone(&HTTP_CLIENT);
881 Ok(Self {
882 path: safe_path,
883 archive_path: archive_hint.map(Path::to_path_buf),
884 config,
885 engine,
886 metadata,
887 manifest,
888 mocks,
889 flows,
890 components,
891 http_client,
892 pre_cache: Mutex::new(HashMap::new()),
893 session_store,
894 state_store,
895 wasi_policy,
896 secrets,
897 oauth_config,
898 })
899 }
900
901 pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
902 if let Some(cache) = &self.flows {
903 return Ok(cache.descriptors.clone());
904 }
905 if let Some(manifest) = &self.manifest {
906 let descriptors = manifest
907 .flows
908 .iter()
909 .map(|flow| FlowDescriptor {
910 id: flow.id.clone(),
911 flow_type: flow.kind.clone(),
912 profile: manifest.meta.name.clone(),
913 version: manifest.meta.version.to_string(),
914 description: Some(flow.kind.clone()),
915 })
916 .collect();
917 return Ok(descriptors);
918 }
919 Ok(Vec::new())
920 }
921
    /// Placeholder: ignores both arguments and always returns an empty JSON object.
    #[allow(dead_code)]
    pub async fn run_flow(
        &self,
        _flow_id: &str,
        _input: serde_json::Value,
    ) -> Result<serde_json::Value> {
        Ok(serde_json::json!({}))
    }
931
932 pub async fn invoke_component(
933 &self,
934 component_ref: &str,
935 ctx: ComponentExecCtx,
936 operation: &str,
937 _config_json: Option<String>,
938 input_json: String,
939 ) -> Result<Value> {
940 let pack_component = self
941 .components
942 .get(component_ref)
943 .with_context(|| format!("component '{component_ref}' not found in pack"))?;
944
945 let pre = if let Some(pre) = self.pre_cache.lock().get(component_ref).cloned() {
946 pre
947 } else {
948 let mut linker = Linker::new(&self.engine);
949 register_all(&mut linker, self.oauth_config.is_some())?;
950 add_component_control_to_linker(&mut linker)?;
951 let pre = ComponentPre::new(
952 linker
953 .instantiate_pre(&pack_component.component)
954 .map_err(|err| anyhow!(err))?,
955 )
956 .map_err(|err| anyhow!(err))?;
957 self.pre_cache
958 .lock()
959 .insert(component_ref.to_string(), pre.clone());
960 pre
961 };
962
963 let host_state = HostState::new(
964 Arc::clone(&self.config),
965 Arc::clone(&self.http_client),
966 self.mocks.clone(),
967 self.session_store.clone(),
968 self.state_store.clone(),
969 Arc::clone(&self.secrets),
970 self.oauth_config.clone(),
971 )?;
972 let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
973 let mut store = wasmtime::Store::new(&self.engine, store_state);
974 let bindings = pre
975 .instantiate_async(&mut store)
976 .await
977 .map_err(|err| anyhow!(err))?;
978 let node = bindings.greentic_component_node();
979
980 let result = node.call_invoke(&mut store, &ctx, operation, &input_json)?;
981
982 match result {
983 InvokeResult::Ok(body) => {
984 if body.is_empty() {
985 return Ok(Value::Null);
986 }
987 serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
988 }
989 InvokeResult::Err(NodeError {
990 code,
991 message,
992 retryable,
993 backoff_ms,
994 details,
995 }) => {
996 let mut obj = serde_json::Map::new();
997 obj.insert("ok".into(), Value::Bool(false));
998 let mut error = serde_json::Map::new();
999 error.insert("code".into(), Value::String(code));
1000 error.insert("message".into(), Value::String(message));
1001 error.insert("retryable".into(), Value::Bool(retryable));
1002 if let Some(backoff) = backoff_ms {
1003 error.insert("backoff_ms".into(), Value::Number(backoff.into()));
1004 }
1005 if let Some(details) = details {
1006 error.insert(
1007 "details".into(),
1008 serde_json::from_str(&details).unwrap_or(Value::String(details)),
1009 );
1010 }
1011 obj.insert("error".into(), Value::Object(error));
1012 Ok(Value::Object(obj))
1013 }
1014 }
1015 }
1016
1017 pub fn load_flow_ir(&self, flow_id: &str) -> Result<greentic_flow::ir::FlowIR> {
1018 if let Some(cache) = &self.flows {
1019 return cache
1020 .flows
1021 .get(flow_id)
1022 .cloned()
1023 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
1024 }
1025 if let Some(manifest) = &self.manifest {
1026 let entry = manifest
1027 .flows
1028 .iter()
1029 .find(|f| f.id == flow_id)
1030 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
1031 let archive_path = self.archive_path.as_ref().unwrap_or(&self.path);
1032 let mut archive = ZipArchive::new(File::open(archive_path)?)?;
1033 match load_flow_doc(&mut archive, entry).and_then(|doc| {
1034 greentic_flow::to_ir(doc.clone())
1035 .with_context(|| format!("failed to build IR for flow {}", doc.id))
1036 }) {
1037 Ok(ir) => return Ok(ir),
1038 Err(err) => {
1039 warn!(
1040 error = %err,
1041 flow_id = flow_id,
1042 "failed to parse flow from archive; falling back to stub IR"
1043 );
1044 return Ok(build_stub_flow_ir(&entry.id, &entry.kind));
1045 }
1046 }
1047 }
1048 bail!("flow '{flow_id}' not available (pack exports disabled)")
1049 }
1050
    /// Borrows the pack metadata resolved when the runtime was built.
    pub fn metadata(&self) -> &PackMetadata {
        &self.metadata
    }
1054
1055 pub fn for_component_test(
1056 components: Vec<(String, PathBuf)>,
1057 flows: HashMap<String, FlowIR>,
1058 config: Arc<HostConfig>,
1059 ) -> Result<Self> {
1060 let engine = Engine::default();
1061 let mut component_map = HashMap::new();
1062 for (name, path) in components {
1063 if !path.exists() {
1064 bail!("component artifact missing: {}", path.display());
1065 }
1066 let wasm_bytes = std::fs::read(&path)?;
1067 let component = Component::from_binary(&engine, &wasm_bytes)
1068 .with_context(|| format!("failed to compile component {}", path.display()))?;
1069 component_map.insert(
1070 name.clone(),
1071 PackComponent {
1072 name,
1073 version: "0.0.0".into(),
1074 component,
1075 },
1076 );
1077 }
1078
1079 let descriptors = flows
1080 .iter()
1081 .map(|(id, ir)| FlowDescriptor {
1082 id: id.clone(),
1083 flow_type: ir.flow_type.clone(),
1084 profile: "test".into(),
1085 version: "0.0.0".into(),
1086 description: None,
1087 })
1088 .collect::<Vec<_>>();
1089 let flows_cache = PackFlows {
1090 descriptors: descriptors.clone(),
1091 flows,
1092 metadata: PackMetadata::fallback(Path::new("component-test")),
1093 };
1094
1095 Ok(Self {
1096 path: PathBuf::new(),
1097 archive_path: None,
1098 config,
1099 engine,
1100 metadata: PackMetadata::fallback(Path::new("component-test")),
1101 manifest: None,
1102 mocks: None,
1103 flows: Some(flows_cache),
1104 components: component_map,
1105 http_client: Arc::clone(&HTTP_CLIENT),
1106 pre_cache: Mutex::new(HashMap::new()),
1107 session_store: None,
1108 state_store: None,
1109 wasi_policy: Arc::new(RunnerWasiPolicy::new()),
1110 secrets: crate::secrets::default_manager(),
1111 oauth_config: None,
1112 })
1113 }
1114}
1115
1116fn map_legacy_error(err: LegacyIfaceError) -> types::IfaceError {
1117 match err {
1118 LegacyIfaceError::InvalidArg => types::IfaceError::InvalidArg,
1119 LegacyIfaceError::NotFound => types::IfaceError::NotFound,
1120 LegacyIfaceError::Denied => types::IfaceError::Denied,
1121 LegacyIfaceError::Unavailable => types::IfaceError::Unavailable,
1122 LegacyIfaceError::Internal => types::IfaceError::Internal,
1123 }
1124}
1125
1126struct PackFlows {
1127 descriptors: Vec<FlowDescriptor>,
1128 flows: HashMap<String, FlowIR>,
1129 metadata: PackMetadata,
1130}
1131
1132impl PackFlows {
1133 fn from_archive(path: &Path) -> Result<Self> {
1134 let pack = open_pack(path, SigningPolicy::DevOk)
1135 .map_err(|err| anyhow!("failed to open pack {}: {}", path.display(), err.message))?;
1136 let file = File::open(path)
1137 .with_context(|| format!("failed to open archive {}", path.display()))?;
1138 let mut archive = ZipArchive::new(file)
1139 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
1140 let mut flows = HashMap::new();
1141 let mut descriptors = Vec::new();
1142 for flow in &pack.manifest.flows {
1143 match load_flow_entry(&mut archive, flow, &pack.manifest.meta) {
1144 Ok((descriptor, ir)) => {
1145 flows.insert(descriptor.id.clone(), ir);
1146 descriptors.push(descriptor);
1147 }
1148 Err(err) => {
1149 warn!(
1150 error = %err,
1151 flow_id = flow.id,
1152 "failed to parse flow metadata from archive; using stub flow"
1153 );
1154 let stub = build_stub_flow_ir(&flow.id, &flow.kind);
1155 flows.insert(flow.id.clone(), stub.clone());
1156 descriptors.push(FlowDescriptor {
1157 id: flow.id.clone(),
1158 flow_type: flow.kind.clone(),
1159 profile: pack.manifest.meta.name.clone(),
1160 version: pack.manifest.meta.version.to_string(),
1161 description: Some(flow.kind.clone()),
1162 });
1163 }
1164 }
1165 }
1166
1167 Ok(Self {
1168 metadata: PackMetadata::from_manifest(&pack.manifest),
1169 descriptors,
1170 flows,
1171 })
1172 }
1173}
1174
1175fn load_flow_entry(
1176 archive: &mut ZipArchive<File>,
1177 entry: &greentic_pack::builder::FlowEntry,
1178 meta: &greentic_pack::builder::PackMeta,
1179) -> Result<(FlowDescriptor, FlowIR)> {
1180 let doc = load_flow_doc(archive, entry)?;
1181 let ir = greentic_flow::to_ir(doc.clone())
1182 .with_context(|| format!("failed to build IR for flow {}", doc.id))?;
1183 let descriptor = FlowDescriptor {
1184 id: doc.id.clone(),
1185 flow_type: doc.flow_type.clone(),
1186 profile: meta.name.clone(),
1187 version: meta.version.to_string(),
1188 description: doc
1189 .description
1190 .clone()
1191 .or(doc.title.clone())
1192 .or_else(|| Some(entry.kind.clone())),
1193 };
1194 Ok((descriptor, ir))
1195}
1196
1197fn load_flow_doc(
1198 archive: &mut ZipArchive<File>,
1199 entry: &greentic_pack::builder::FlowEntry,
1200) -> Result<greentic_flow::model::FlowDoc> {
1201 if let Ok(bytes) = read_entry(archive, &entry.file_yaml)
1202 && let Ok(doc) = serde_yaml::from_slice(&bytes)
1203 {
1204 return Ok(normalize_flow_doc(doc));
1205 }
1206 let json_bytes = read_entry(archive, &entry.file_json)
1207 .with_context(|| format!("missing flow document {}", entry.file_json))?;
1208 let doc = serde_json::from_slice(&json_bytes)
1209 .with_context(|| format!("failed to parse {}", entry.file_json))?;
1210 Ok(normalize_flow_doc(doc))
1211}
1212
1213fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
1214 let mut file = archive
1215 .by_name(name)
1216 .with_context(|| format!("entry {name} missing from archive"))?;
1217 let mut buf = Vec::new();
1218 file.read_to_end(&mut buf)?;
1219 Ok(buf)
1220}
1221
1222fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
1223 for node in doc.nodes.values_mut() {
1224 if node.component.is_empty()
1225 && let Some((component_ref, payload)) = node.raw.iter().next()
1226 {
1227 let (operation, input, config) = infer_component_exec(payload, component_ref);
1228 let mut payload_obj = serde_json::Map::new();
1229 payload_obj.insert("component".into(), Value::String(component_ref.clone()));
1230 payload_obj.insert("operation".into(), Value::String(operation));
1231 payload_obj.insert("input".into(), input);
1232 if let Some(cfg) = config {
1233 payload_obj.insert("config".into(), cfg);
1234 }
1235 node.component = "component.exec".to_string();
1236 node.payload = Value::Object(payload_obj);
1237 }
1238 }
1239 doc
1240}
1241
1242fn infer_component_exec(payload: &Value, component_ref: &str) -> (String, Value, Option<Value>) {
1243 let default_op = if component_ref.starts_with("templating.") {
1244 "render"
1245 } else {
1246 "invoke"
1247 }
1248 .to_string();
1249
1250 if let Value::Object(map) = payload {
1251 let op = map
1252 .get("op")
1253 .or_else(|| map.get("operation"))
1254 .and_then(Value::as_str)
1255 .map(|s| s.to_string())
1256 .unwrap_or_else(|| default_op.clone());
1257
1258 let mut input = map.clone();
1259 let config = input.remove("config");
1260 input.remove("op");
1261 input.remove("operation");
1262 return (op, Value::Object(input), config);
1263 }
1264
1265 (default_op, payload.clone(), None)
1266}
1267
#[cfg(test)]
mod tests {
    use super::*;
    use greentic_flow::model::{FlowDoc, Node, Route};
    use serde_json::json;
    use std::collections::BTreeMap;

    /// A node given only as a raw `component -> payload` entry is rewritten into a
    /// `component.exec` node, with the templating default operation applied.
    #[test]
    fn normalizes_raw_component_to_component_exec() {
        let mut nodes = BTreeMap::new();
        let mut raw = BTreeMap::new();
        raw.insert(
            "templating.handlebars".into(),
            json!({ "template": "Hi {{name}}" }),
        );
        nodes.insert(
            "start".into(),
            Node {
                raw,
                routing: vec![Route {
                    to: None,
                    out: Some(true),
                }],
                ..Default::default()
            },
        );
        let doc = FlowDoc {
            id: "welcome".into(),
            title: None,
            description: None,
            flow_type: "messaging".into(),
            start: Some("start".into()),
            parameters: json!({}),
            nodes,
        };

        let normalized = normalize_flow_doc(doc);
        let node = normalized.nodes.get("start").expect("node exists");
        assert_eq!(node.component, "component.exec");
        assert!(node.raw.is_empty() || node.raw.contains_key("templating.handlebars"));
        let payload = node.payload.as_object().expect("payload object");
        assert_eq!(
            payload.get("component"),
            Some(&Value::String("templating.handlebars".into()))
        );
        assert_eq!(
            payload.get("operation"),
            Some(&Value::String("render".into()))
        );
        let input = payload.get("input").unwrap();
        assert_eq!(input, &json!({ "template": "Hi {{name}}" }));
    }

    /// An explicit `op` wins over the default, and `config` is split out of the input.
    #[test]
    fn infer_component_exec_extracts_op_and_config() {
        let payload = json!({ "op": "send", "config": { "retries": 2 }, "text": "hi" });
        let (operation, input, config) = infer_component_exec(&payload, "messaging.slack");
        assert_eq!(operation, "send");
        assert_eq!(input, json!({ "text": "hi" }));
        assert_eq!(config, Some(json!({ "retries": 2 })));
    }

    /// `operation` is honoured when `op` is absent and is removed from the input.
    #[test]
    fn infer_component_exec_accepts_operation_alias() {
        let payload = json!({ "operation": "render_md", "body": "x" });
        let (operation, input, config) = infer_component_exec(&payload, "templating.markdown");
        assert_eq!(operation, "render_md");
        assert_eq!(input, json!({ "body": "x" }));
        assert_eq!(config, None);
    }

    /// Non-object payloads are passed through unchanged with the default operation.
    #[test]
    fn infer_component_exec_passes_through_non_object_payload() {
        let payload = json!("plain");
        let (operation, input, config) = infer_component_exec(&payload, "qa.process");
        assert_eq!(operation, "invoke");
        assert_eq!(input, json!("plain"));
        assert_eq!(config, None);
    }
}
1321
1322fn load_components_from_archive(
1323 engine: &Engine,
1324 path: &Path,
1325) -> Result<HashMap<String, PackComponent>> {
1326 let pack = open_pack(path, SigningPolicy::DevOk)
1327 .map_err(|err| anyhow!("failed to open pack {}: {}", path.display(), err.message))?;
1328 let mut archive = ZipArchive::new(File::open(path)?)?;
1329 let mut components = HashMap::new();
1330 for entry in &pack.manifest.components {
1331 let bytes = read_entry(&mut archive, &entry.file_wasm)
1332 .with_context(|| format!("missing component {}", entry.file_wasm))?;
1333 let component = Component::from_binary(engine, &bytes)
1334 .with_context(|| format!("failed to compile component {}", entry.name))?;
1335 components.insert(
1336 entry.name.clone(),
1337 PackComponent {
1338 name: entry.name.clone(),
1339 version: entry.version.to_string(),
1340 component,
1341 },
1342 );
1343 }
1344 Ok(components)
1345}
1346
1347#[derive(Clone, Debug, Default, Serialize, Deserialize)]
1348pub struct PackMetadata {
1349 pub pack_id: String,
1350 pub version: String,
1351 #[serde(default)]
1352 pub entry_flows: Vec<String>,
1353}
1354
1355impl PackMetadata {
1356 fn from_wasm(bytes: &[u8]) -> Option<Self> {
1357 let parser = Parser::new(0);
1358 for payload in parser.parse_all(bytes) {
1359 let payload = payload.ok()?;
1360 match payload {
1361 Payload::CustomSection(section) => {
1362 if section.name() == "greentic.manifest"
1363 && let Ok(meta) = Self::from_bytes(section.data())
1364 {
1365 return Some(meta);
1366 }
1367 }
1368 Payload::DataSection(reader) => {
1369 for segment in reader.into_iter().flatten() {
1370 if let Ok(meta) = Self::from_bytes(segment.data) {
1371 return Some(meta);
1372 }
1373 }
1374 }
1375 _ => {}
1376 }
1377 }
1378 None
1379 }
1380
1381 fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
1382 #[derive(Deserialize)]
1383 struct RawManifest {
1384 pack_id: String,
1385 version: String,
1386 #[serde(default)]
1387 entry_flows: Vec<String>,
1388 #[serde(default)]
1389 flows: Vec<RawFlow>,
1390 }
1391
1392 #[derive(Deserialize)]
1393 struct RawFlow {
1394 id: String,
1395 }
1396
1397 let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
1398 let mut entry_flows = if manifest.entry_flows.is_empty() {
1399 manifest.flows.iter().map(|f| f.id.clone()).collect()
1400 } else {
1401 manifest.entry_flows.clone()
1402 };
1403 entry_flows.retain(|id| !id.is_empty());
1404 Ok(Self {
1405 pack_id: manifest.pack_id,
1406 version: manifest.version,
1407 entry_flows,
1408 })
1409 }
1410
1411 pub fn fallback(path: &Path) -> Self {
1412 let pack_id = path
1413 .file_stem()
1414 .map(|s| s.to_string_lossy().into_owned())
1415 .unwrap_or_else(|| "unknown-pack".to_string());
1416 Self {
1417 pack_id,
1418 version: "0.0.0".to_string(),
1419 entry_flows: Vec::new(),
1420 }
1421 }
1422
1423 pub fn from_manifest(manifest: &greentic_pack::builder::PackManifest) -> Self {
1424 let entry_flows = if manifest.meta.entry_flows.is_empty() {
1425 manifest
1426 .flows
1427 .iter()
1428 .map(|flow| flow.id.clone())
1429 .collect::<Vec<_>>()
1430 } else {
1431 manifest.meta.entry_flows.clone()
1432 };
1433 Self {
1434 pack_id: manifest.meta.pack_id.clone(),
1435 version: manifest.meta.version.to_string(),
1436 entry_flows,
1437 }
1438 }
1439}
1440
1441fn build_stub_flow_ir(flow_id: &str, flow_type: &str) -> FlowIR {
1442 let mut nodes = IndexMap::new();
1443 nodes.insert(
1444 "complete".into(),
1445 NodeIR {
1446 component: "component.exec".into(),
1447 payload_expr: serde_json::json!({
1448 "component": "qa.process",
1449 "operation": "process",
1450 "input": {
1451 "status": "done",
1452 "flow_id": flow_id,
1453 }
1454 }),
1455 routes: vec![RouteIR {
1456 to: None,
1457 out: true,
1458 }],
1459 },
1460 );
1461 FlowIR {
1462 id: flow_id.to_string(),
1463 flow_type: flow_type.to_string(),
1464 start: Some("complete".into()),
1465 parameters: Value::Object(Default::default()),
1466 nodes,
1467 }
1468}
1469
1470impl From<&HttpMockResponse> for LegacyHttpResponse {
1471 fn from(value: &HttpMockResponse) -> Self {
1472 let headers_json = serde_json::to_string(&value.headers).ok();
1473 Self {
1474 status: value.status,
1475 headers_json,
1476 body: value.body.clone(),
1477 }
1478 }
1479}