1use std::collections::{BTreeMap, HashMap};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7
8use crate::component_api::component::greentic::component::control::Host as ComponentControlHost;
9use crate::component_api::{
10 ComponentPre, control, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
11};
12use crate::imports::register_all;
13use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
14use crate::runtime_wasmtime::{Component, Engine, Linker, ResourceTable, WasmResult};
15use anyhow::{Context, Result, anyhow, bail};
16use greentic_interfaces_host::host_import::v0_2 as host_import_v0_2;
17use greentic_interfaces_host::host_import::v0_2::greentic::host_import::imports::{
18 HttpRequest as LegacyHttpRequest, HttpResponse as LegacyHttpResponse,
19 IfaceError as LegacyIfaceError, TenantCtx as LegacyTenantCtx,
20};
21use greentic_interfaces_host::host_import::v0_6::{
22 self as host_import_v0_6, iface_types, state, types,
23};
24use greentic_pack::builder as legacy_pack;
25use greentic_session::SessionKey as StoreSessionKey;
26use greentic_types::{
27 EnvId, Flow, FlowId, SessionCursor as StoreSessionCursor, SessionData,
28 StateKey as StoreStateKey, TeamId, TenantCtx as TypesTenantCtx, TenantId, UserId,
29 decode_pack_manifest,
30};
31use once_cell::sync::Lazy;
32use parking_lot::Mutex;
33use reqwest::blocking::Client as BlockingClient;
34use runner_core::normalize_under_root;
35use serde::{Deserialize, Serialize};
36use serde_cbor;
37use serde_json::{self, Value};
38use tokio::fs;
39use wasmparser::{Parser, Payload};
40use wasmtime::StoreContextMut;
41use zip::ZipArchive;
42
43use crate::runner::flow_adapter::{FlowIR, flow_doc_to_ir, flow_ir_to_flow};
44use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
45
46use crate::config::HostConfig;
47use crate::secrets::{DynSecretsManager, read_secret_blocking};
48use crate::storage::state::STATE_PREFIX;
49use crate::storage::{DynSessionStore, DynStateStore};
50use crate::verify;
51use crate::wasi::RunnerWasiPolicy;
52use tracing::warn;
53use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
54
55use greentic_flow::model::FlowDoc;
56
57#[allow(dead_code)]
58pub struct PackRuntime {
59 path: PathBuf,
61 archive_path: Option<PathBuf>,
63 config: Arc<HostConfig>,
64 engine: Engine,
65 metadata: PackMetadata,
66 manifest: Option<greentic_types::PackManifest>,
67 legacy_manifest: Option<Box<legacy_pack::PackManifest>>,
68 mocks: Option<Arc<MockLayer>>,
69 flows: Option<PackFlows>,
70 components: HashMap<String, PackComponent>,
71 http_client: Arc<BlockingClient>,
72 pre_cache: Mutex<HashMap<String, ComponentPre<ComponentState>>>,
73 session_store: Option<DynSessionStore>,
74 state_store: Option<DynStateStore>,
75 wasi_policy: Arc<RunnerWasiPolicy>,
76 secrets: DynSecretsManager,
77 oauth_config: Option<OAuthBrokerConfig>,
78}
79
80struct PackComponent {
81 #[allow(dead_code)]
82 name: String,
83 #[allow(dead_code)]
84 version: String,
85 component: Component,
86}
87
88fn build_blocking_client() -> BlockingClient {
89 std::thread::spawn(|| {
90 BlockingClient::builder()
91 .no_proxy()
92 .build()
93 .expect("blocking client")
94 })
95 .join()
96 .expect("client build thread panicked")
97}
98
99fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
100 let (root, candidate) = if path.is_absolute() {
101 let parent = path
102 .parent()
103 .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
104 let root = parent
105 .canonicalize()
106 .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
107 let file = path
108 .file_name()
109 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
110 (root, PathBuf::from(file))
111 } else {
112 let cwd = std::env::current_dir().context("failed to resolve current directory")?;
113 let base = if let Some(parent) = path.parent() {
114 cwd.join(parent)
115 } else {
116 cwd
117 };
118 let root = base
119 .canonicalize()
120 .with_context(|| format!("failed to canonicalize {}", base.display()))?;
121 let file = path
122 .file_name()
123 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
124 (root, PathBuf::from(file))
125 };
126 let safe = normalize_under_root(&root, &candidate)?;
127 Ok((root, safe))
128}
129
130static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
131
132#[derive(Debug, Clone, Serialize, Deserialize)]
133pub struct FlowDescriptor {
134 pub id: String,
135 #[serde(rename = "type")]
136 pub flow_type: String,
137 pub profile: String,
138 pub version: String,
139 #[serde(default)]
140 pub description: Option<String>,
141}
142
143pub struct HostState {
144 config: Arc<HostConfig>,
145 http_client: Arc<BlockingClient>,
146 default_env: String,
147 session_store: Option<DynSessionStore>,
148 state_store: Option<DynStateStore>,
149 mocks: Option<Arc<MockLayer>>,
150 secrets: DynSecretsManager,
151 oauth_config: Option<OAuthBrokerConfig>,
152 oauth_host: OAuthBrokerHost,
153}
154
155impl HostState {
156 #[allow(clippy::default_constructed_unit_structs)]
157 pub fn new(
158 config: Arc<HostConfig>,
159 http_client: Arc<BlockingClient>,
160 mocks: Option<Arc<MockLayer>>,
161 session_store: Option<DynSessionStore>,
162 state_store: Option<DynStateStore>,
163 secrets: DynSecretsManager,
164 oauth_config: Option<OAuthBrokerConfig>,
165 ) -> Result<Self> {
166 let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
167 Ok(Self {
168 config,
169 http_client,
170 default_env,
171 session_store,
172 state_store,
173 mocks,
174 secrets,
175 oauth_config,
176 oauth_host: OAuthBrokerHost::default(),
177 })
178 }
179
180 pub fn get_secret(&self, key: &str) -> Result<String> {
181 if !self.config.secrets_policy.is_allowed(key) {
182 bail!("secret {key} is not permitted by bindings policy");
183 }
184 if let Some(mock) = &self.mocks
185 && let Some(value) = mock.secrets_lookup(key)
186 {
187 return Ok(value);
188 }
189 let bytes = read_secret_blocking(&self.secrets, key)
190 .context("failed to read secret from manager")?;
191 let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
192 Ok(value)
193 }
194
195 fn tenant_ctx_from_v6(&self, ctx: Option<types::TenantCtx>) -> Result<TypesTenantCtx> {
196 let tenant_raw = ctx
197 .as_ref()
198 .map(|ctx| ctx.tenant.clone())
199 .unwrap_or_else(|| self.config.tenant.clone());
200 let tenant_id = TenantId::from_str(&tenant_raw)
201 .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
202 let env_id = EnvId::from_str(&self.default_env)
203 .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
204 let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
205 if let Some(ctx) = ctx {
206 if let Some(team) = ctx.team {
207 let team_id =
208 TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
209 tenant_ctx = tenant_ctx.with_team(Some(team_id));
210 }
211 if let Some(user) = ctx.user {
212 let user_id =
213 UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
214 tenant_ctx = tenant_ctx.with_user(Some(user_id));
215 }
216 if let Some(flow) = ctx.flow_id {
217 tenant_ctx = tenant_ctx.with_flow(flow);
218 }
219 if let Some(node) = ctx.node_id {
220 tenant_ctx = tenant_ctx.with_node(node);
221 }
222 if let Some(provider) = ctx.provider_id {
223 tenant_ctx = tenant_ctx.with_provider(provider);
224 }
225 if let Some(session) = ctx.session_id {
226 tenant_ctx = tenant_ctx.with_session(session);
227 }
228 tenant_ctx.trace_id = ctx.trace_id;
229 }
230 Ok(tenant_ctx)
231 }
232
233 fn session_store_handle(&self) -> Result<DynSessionStore, types::IfaceError> {
234 self.session_store
235 .as_ref()
236 .cloned()
237 .ok_or(types::IfaceError::Unavailable)
238 }
239
240 fn state_store_handle(&self) -> Result<DynStateStore, types::IfaceError> {
241 self.state_store
242 .as_ref()
243 .cloned()
244 .ok_or(types::IfaceError::Unavailable)
245 }
246
247 fn ensure_user(ctx: &TypesTenantCtx) -> Result<UserId, types::IfaceError> {
248 ctx.user_id
249 .clone()
250 .or_else(|| ctx.user.clone())
251 .ok_or(types::IfaceError::InvalidArg)
252 }
253
254 fn ensure_flow(ctx: &TypesTenantCtx) -> Result<FlowId, types::IfaceError> {
255 let flow = ctx.flow_id().ok_or(types::IfaceError::InvalidArg)?;
256 FlowId::from_str(flow).map_err(|_| types::IfaceError::InvalidArg)
257 }
258
259 fn cursor_from_iface(cursor: iface_types::SessionCursor) -> StoreSessionCursor {
260 let mut store_cursor = StoreSessionCursor::new(cursor.node_pointer);
261 if let Some(reason) = cursor.wait_reason {
262 store_cursor = store_cursor.with_wait_reason(reason);
263 }
264 if let Some(marker) = cursor.outbox_marker {
265 store_cursor = store_cursor.with_outbox_marker(marker);
266 }
267 store_cursor
268 }
269}
270
271enum ManifestLoad {
272 New {
273 manifest: Box<greentic_types::PackManifest>,
274 flows: PackFlows,
275 },
276 Legacy {
277 manifest: Box<legacy_pack::PackManifest>,
278 flows: PackFlows,
279 },
280}
281
282fn load_manifest_and_flows(path: &Path) -> Result<ManifestLoad> {
283 let mut archive = ZipArchive::new(File::open(path)?)
284 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
285 let bytes = read_entry(&mut archive, "manifest.cbor")
286 .with_context(|| format!("missing manifest.cbor in {}", path.display()))?;
287 match decode_pack_manifest(&bytes) {
288 Ok(manifest) => {
289 let cache = PackFlows::from_manifest(manifest.clone());
290 Ok(ManifestLoad::New {
291 manifest: Box::new(manifest),
292 flows: cache,
293 })
294 }
295 Err(err) => {
296 tracing::debug!(error = %err, pack = %path.display(), "decode_pack_manifest failed; trying legacy manifest");
297 let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
299 .context("failed to decode legacy pack manifest from manifest.cbor")?;
300 let flows = load_legacy_flows(&mut archive, &legacy)?;
301 Ok(ManifestLoad::Legacy {
302 manifest: Box::new(legacy),
303 flows,
304 })
305 }
306 }
307}
308
309fn load_legacy_flows(
310 archive: &mut ZipArchive<File>,
311 manifest: &legacy_pack::PackManifest,
312) -> Result<PackFlows> {
313 let mut flows = HashMap::new();
314 let mut descriptors = Vec::new();
315
316 for entry in &manifest.flows {
317 let bytes = read_entry(archive, &entry.file_json)
318 .with_context(|| format!("missing flow json {}", entry.file_json))?;
319 let doc: FlowDoc = serde_json::from_slice(&bytes)
320 .with_context(|| format!("failed to decode flow doc {}", entry.file_json))?;
321 let normalized = normalize_flow_doc(doc);
322 let flow_ir = flow_doc_to_ir(normalized)?;
323 let flow = flow_ir_to_flow(flow_ir)?;
324
325 descriptors.push(FlowDescriptor {
326 id: entry.id.clone(),
327 flow_type: entry.kind.clone(),
328 profile: manifest.meta.pack_id.clone(),
329 version: manifest.meta.version.to_string(),
330 description: None,
331 });
332 flows.insert(entry.id.clone(), flow);
333 }
334
335 let mut entry_flows = manifest.meta.entry_flows.clone();
336 if entry_flows.is_empty() {
337 entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
338 }
339 let metadata = PackMetadata {
340 pack_id: manifest.meta.pack_id.clone(),
341 version: manifest.meta.version.to_string(),
342 entry_flows,
343 };
344
345 Ok(PackFlows {
346 descriptors,
347 flows,
348 metadata,
349 })
350}
351
352pub struct ComponentState {
353 host: HostState,
354 wasi_ctx: WasiCtx,
355 resource_table: ResourceTable,
356}
357
358impl ComponentState {
359 pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
360 let wasi_ctx = policy
361 .instantiate()
362 .context("failed to build WASI context")?;
363 Ok(Self {
364 host,
365 wasi_ctx,
366 resource_table: ResourceTable::new(),
367 })
368 }
369
370 fn host_mut(&mut self) -> &mut HostState {
371 &mut self.host
372 }
373}
374
375impl control::Host for ComponentState {
376 fn should_cancel(&mut self) -> bool {
377 false
378 }
379
380 fn yield_now(&mut self) {
381 }
383}
384
385fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
386 let mut inst = linker.instance("greentic:component/control@0.4.0")?;
387 inst.func_wrap(
388 "should-cancel",
389 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
390 let host = caller.data_mut();
391 Ok((ComponentControlHost::should_cancel(host),))
392 },
393 )?;
394 inst.func_wrap(
395 "yield-now",
396 |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
397 let host = caller.data_mut();
398 ComponentControlHost::yield_now(host);
399 Ok(())
400 },
401 )?;
402 Ok(())
403}
404
405impl OAuthHostContext for ComponentState {
406 fn tenant_id(&self) -> &str {
407 &self.host.config.tenant
408 }
409
410 fn env(&self) -> &str {
411 &self.host.default_env
412 }
413
414 fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
415 &mut self.host.oauth_host
416 }
417
418 fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
419 self.host.oauth_config.as_ref()
420 }
421}
422
423impl WasiView for ComponentState {
424 fn ctx(&mut self) -> WasiCtxView<'_> {
425 WasiCtxView {
426 ctx: &mut self.wasi_ctx,
427 table: &mut self.resource_table,
428 }
429 }
430}
431
432#[allow(unsafe_code)]
433unsafe impl Send for ComponentState {}
434#[allow(unsafe_code)]
435unsafe impl Sync for ComponentState {}
436
437impl host_import_v0_6::HostImports for ComponentState {
438 fn secrets_get(
439 &mut self,
440 key: String,
441 ctx: Option<types::TenantCtx>,
442 ) -> WasmResult<Result<String, types::IfaceError>> {
443 host_import_v0_6::HostImports::secrets_get(self.host_mut(), key, ctx)
444 }
445
446 fn telemetry_emit(
447 &mut self,
448 span_json: String,
449 ctx: Option<types::TenantCtx>,
450 ) -> WasmResult<()> {
451 host_import_v0_6::HostImports::telemetry_emit(self.host_mut(), span_json, ctx)
452 }
453
454 fn http_fetch(
455 &mut self,
456 req: host_import_v0_6::http::HttpRequest,
457 ctx: Option<types::TenantCtx>,
458 ) -> WasmResult<Result<host_import_v0_6::http::HttpResponse, types::IfaceError>> {
459 host_import_v0_6::HostImports::http_fetch(self.host_mut(), req, ctx)
460 }
461
462 fn mcp_exec(
463 &mut self,
464 component: String,
465 action: String,
466 args_json: String,
467 ctx: Option<types::TenantCtx>,
468 ) -> WasmResult<Result<String, types::IfaceError>> {
469 host_import_v0_6::HostImports::mcp_exec(self.host_mut(), component, action, args_json, ctx)
470 }
471
472 fn state_get(
473 &mut self,
474 key: iface_types::StateKey,
475 ctx: Option<types::TenantCtx>,
476 ) -> WasmResult<Result<String, types::IfaceError>> {
477 host_import_v0_6::HostImports::state_get(self.host_mut(), key, ctx)
478 }
479
480 fn state_set(
481 &mut self,
482 key: iface_types::StateKey,
483 value_json: String,
484 ctx: Option<types::TenantCtx>,
485 ) -> WasmResult<Result<state::OpAck, types::IfaceError>> {
486 host_import_v0_6::HostImports::state_set(self.host_mut(), key, value_json, ctx)
487 }
488
489 fn session_update(
490 &mut self,
491 cursor: iface_types::SessionCursor,
492 ctx: Option<types::TenantCtx>,
493 ) -> WasmResult<Result<String, types::IfaceError>> {
494 host_import_v0_6::HostImports::session_update(self.host_mut(), cursor, ctx)
495 }
496}
497
498impl host_import_v0_2::HostImports for ComponentState {
499 fn secrets_get(
500 &mut self,
501 key: String,
502 ctx: Option<LegacyTenantCtx>,
503 ) -> WasmResult<Result<String, LegacyIfaceError>> {
504 host_import_v0_2::HostImports::secrets_get(self.host_mut(), key, ctx)
505 }
506
507 fn telemetry_emit(
508 &mut self,
509 span_json: String,
510 ctx: Option<LegacyTenantCtx>,
511 ) -> WasmResult<()> {
512 host_import_v0_2::HostImports::telemetry_emit(self.host_mut(), span_json, ctx)
513 }
514
515 fn tool_invoke(
516 &mut self,
517 tool: String,
518 action: String,
519 args_json: String,
520 ctx: Option<LegacyTenantCtx>,
521 ) -> WasmResult<Result<String, LegacyIfaceError>> {
522 host_import_v0_2::HostImports::tool_invoke(self.host_mut(), tool, action, args_json, ctx)
523 }
524
525 fn http_fetch(
526 &mut self,
527 req: host_import_v0_2::greentic::host_import::imports::HttpRequest,
528 ctx: Option<host_import_v0_2::greentic::host_import::imports::TenantCtx>,
529 ) -> WasmResult<
530 Result<
531 host_import_v0_2::greentic::host_import::imports::HttpResponse,
532 host_import_v0_2::greentic::host_import::imports::IfaceError,
533 >,
534 > {
535 host_import_v0_2::HostImports::http_fetch(self.host_mut(), req, ctx)
536 }
537}
538
539impl host_import_v0_6::HostImports for HostState {
540 fn secrets_get(
541 &mut self,
542 key: String,
543 _ctx: Option<types::TenantCtx>,
544 ) -> WasmResult<Result<String, types::IfaceError>> {
545 Ok(self.get_secret(&key).map_err(|err| {
546 tracing::warn!(secret = %key, error = %err, "secret lookup denied");
547 types::IfaceError::Denied
548 }))
549 }
550
551 fn telemetry_emit(
552 &mut self,
553 span_json: String,
554 _ctx: Option<types::TenantCtx>,
555 ) -> WasmResult<()> {
556 if let Some(mock) = &self.mocks
557 && mock.telemetry_drain(&[("span_json", span_json.as_str())])
558 {
559 return Ok(());
560 }
561 tracing::info!(span = %span_json, "telemetry emit from pack");
562 Ok(())
563 }
564
565 fn http_fetch(
566 &mut self,
567 req: host_import_v0_6::http::HttpRequest,
568 _ctx: Option<types::TenantCtx>,
569 ) -> WasmResult<Result<host_import_v0_6::http::HttpResponse, types::IfaceError>> {
570 let legacy_req = LegacyHttpRequest {
571 method: req.method,
572 url: req.url,
573 headers_json: req.headers_json,
574 body: req.body,
575 };
576 match host_import_v0_2::HostImports::http_fetch(self, legacy_req, None)? {
577 Ok(resp) => Ok(Ok(host_import_v0_6::http::HttpResponse {
578 status: resp.status,
579 headers_json: resp.headers_json,
580 body: resp.body,
581 })),
582 Err(err) => Ok(Err(map_legacy_error(err))),
583 }
584 }
585
586 fn mcp_exec(
587 &mut self,
588 component: String,
589 action: String,
590 args_json: String,
591 ctx: Option<types::TenantCtx>,
592 ) -> WasmResult<Result<String, types::IfaceError>> {
593 let _ = (component, action, args_json, ctx);
594 tracing::warn!("mcp.exec requested but MCP bridge is removed; returning unavailable");
595 Ok(Err(types::IfaceError::Unavailable))
596 }
597
598 fn state_get(
599 &mut self,
600 key: iface_types::StateKey,
601 ctx: Option<types::TenantCtx>,
602 ) -> WasmResult<Result<String, types::IfaceError>> {
603 let store = match self.state_store_handle() {
604 Ok(store) => store,
605 Err(err) => return Ok(Err(err)),
606 };
607 let tenant_ctx = match self.tenant_ctx_from_v6(ctx) {
608 Ok(ctx) => ctx,
609 Err(err) => {
610 tracing::warn!(error = %err, "invalid tenant context for state.get");
611 return Ok(Err(types::IfaceError::InvalidArg));
612 }
613 };
614 let key = StoreStateKey::from(key);
615 match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
616 Ok(Some(value)) => {
617 let result = if let Some(text) = value.as_str() {
618 text.to_string()
619 } else {
620 serde_json::to_string(&value).unwrap_or_else(|_| value.to_string())
621 };
622 Ok(Ok(result))
623 }
624 Ok(None) => Ok(Err(types::IfaceError::NotFound)),
625 Err(err) => {
626 tracing::warn!(error = %err, "state.get failed");
627 Ok(Err(types::IfaceError::Internal))
628 }
629 }
630 }
631
632 fn state_set(
633 &mut self,
634 key: iface_types::StateKey,
635 value_json: String,
636 ctx: Option<types::TenantCtx>,
637 ) -> WasmResult<Result<state::OpAck, types::IfaceError>> {
638 let store = match self.state_store_handle() {
639 Ok(store) => store,
640 Err(err) => return Ok(Err(err)),
641 };
642 let tenant_ctx = match self.tenant_ctx_from_v6(ctx) {
643 Ok(ctx) => ctx,
644 Err(err) => {
645 tracing::warn!(error = %err, "invalid tenant context for state.set");
646 return Ok(Err(types::IfaceError::InvalidArg));
647 }
648 };
649 let key = StoreStateKey::from(key);
650 let value = serde_json::from_str(&value_json).unwrap_or(Value::String(value_json));
651 match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
652 Ok(()) => Ok(Ok(state::OpAck::Ok)),
653 Err(err) => {
654 tracing::warn!(error = %err, "state.set failed");
655 Ok(Err(types::IfaceError::Internal))
656 }
657 }
658 }
659
660 fn session_update(
661 &mut self,
662 cursor: iface_types::SessionCursor,
663 ctx: Option<types::TenantCtx>,
664 ) -> WasmResult<Result<String, types::IfaceError>> {
665 let store = match self.session_store_handle() {
666 Ok(store) => store,
667 Err(err) => return Ok(Err(err)),
668 };
669 let tenant_ctx = match self.tenant_ctx_from_v6(ctx) {
670 Ok(ctx) => ctx,
671 Err(err) => {
672 tracing::warn!(error = %err, "invalid tenant context for session.update");
673 return Ok(Err(types::IfaceError::InvalidArg));
674 }
675 };
676 let user = match Self::ensure_user(&tenant_ctx) {
677 Ok(user) => user,
678 Err(err) => return Ok(Err(err)),
679 };
680 let flow_id = match Self::ensure_flow(&tenant_ctx) {
681 Ok(flow) => flow,
682 Err(err) => return Ok(Err(err)),
683 };
684 let cursor = Self::cursor_from_iface(cursor);
685 let payload = SessionData {
686 tenant_ctx: tenant_ctx.clone(),
687 flow_id,
688 cursor: cursor.clone(),
689 context_json: serde_json::json!({
690 "node_pointer": cursor.node_pointer,
691 "wait_reason": cursor.wait_reason,
692 "outbox_marker": cursor.outbox_marker,
693 })
694 .to_string(),
695 };
696 if let Some(existing) = tenant_ctx.session_id() {
697 let key = StoreSessionKey::from(existing.to_string());
698 if let Err(err) = store.update_session(&key, payload) {
699 tracing::error!(error = %err, "failed to update session snapshot");
700 return Ok(Err(types::IfaceError::Internal));
701 }
702 return Ok(Ok(existing.to_string()));
703 }
704 match store.find_by_user(&tenant_ctx, &user) {
705 Ok(Some((key, _))) => {
706 if let Err(err) = store.update_session(&key, payload) {
707 tracing::error!(error = %err, "failed to update existing user session");
708 return Ok(Err(types::IfaceError::Internal));
709 }
710 return Ok(Ok(key.to_string()));
711 }
712 Ok(None) => {}
713 Err(err) => {
714 tracing::error!(error = %err, "session lookup failed");
715 return Ok(Err(types::IfaceError::Internal));
716 }
717 }
718 let key = match store.create_session(&tenant_ctx, payload.clone()) {
719 Ok(key) => key,
720 Err(err) => {
721 tracing::error!(error = %err, "failed to create session");
722 return Ok(Err(types::IfaceError::Internal));
723 }
724 };
725 let ctx_with_session = tenant_ctx.with_session(key.to_string());
726 let updated_payload = SessionData {
727 tenant_ctx: ctx_with_session.clone(),
728 ..payload
729 };
730 if let Err(err) = store.update_session(&key, updated_payload) {
731 tracing::warn!(error = %err, "failed to stamp session id after create");
732 }
733 Ok(Ok(key.to_string()))
734 }
735}
736
737impl host_import_v0_2::HostImports for HostState {
738 fn secrets_get(
739 &mut self,
740 key: String,
741 _ctx: Option<LegacyTenantCtx>,
742 ) -> WasmResult<Result<String, LegacyIfaceError>> {
743 Ok(self.get_secret(&key).map_err(|err| {
744 tracing::warn!(secret = %key, error = %err, "secret lookup denied");
745 LegacyIfaceError::Denied
746 }))
747 }
748
749 fn telemetry_emit(
750 &mut self,
751 span_json: String,
752 _ctx: Option<LegacyTenantCtx>,
753 ) -> WasmResult<()> {
754 if let Some(mock) = &self.mocks
755 && mock.telemetry_drain(&[("span_json", span_json.as_str())])
756 {
757 return Ok(());
758 }
759 tracing::info!(span = %span_json, "telemetry emit from pack");
760 Ok(())
761 }
762
763 fn tool_invoke(
764 &mut self,
765 tool: String,
766 action: String,
767 args_json: String,
768 ctx: Option<LegacyTenantCtx>,
769 ) -> WasmResult<Result<String, LegacyIfaceError>> {
770 let _ = (tool, action, args_json, ctx);
771 tracing::warn!("tool invoke requested but MCP bridge is removed; returning unavailable");
772 Ok(Err(LegacyIfaceError::Unavailable))
773 }
774
775 fn http_fetch(
776 &mut self,
777 req: LegacyHttpRequest,
778 _ctx: Option<LegacyTenantCtx>,
779 ) -> WasmResult<Result<LegacyHttpResponse, LegacyIfaceError>> {
780 if !self.config.http_enabled {
781 tracing::warn!(url = %req.url, "http fetch denied by policy");
782 return Ok(Err(LegacyIfaceError::Denied));
783 }
784
785 let mut mock_state = None;
786 let raw_body = req.body.clone();
787 if let Some(mock) = &self.mocks
788 && let Ok(meta) = HttpMockRequest::new(
789 &req.method,
790 &req.url,
791 raw_body.as_deref().map(|body| body.as_bytes()),
792 )
793 {
794 match mock.http_begin(&meta) {
795 HttpDecision::Mock(response) => {
796 let http = LegacyHttpResponse::from(&response);
797 return Ok(Ok(http));
798 }
799 HttpDecision::Deny(reason) => {
800 tracing::warn!(url = %req.url, reason = %reason, "http fetch blocked by mocks");
801 return Ok(Err(LegacyIfaceError::Denied));
802 }
803 HttpDecision::Passthrough { record } => {
804 mock_state = Some((meta, record));
805 }
806 }
807 }
808
809 let method = req.method.parse().unwrap_or(reqwest::Method::GET);
810 let mut builder = self.http_client.request(method, &req.url);
811
812 if let Some(headers_json) = req.headers_json.as_ref() {
813 match serde_json::from_str::<serde_json::Map<String, serde_json::Value>>(headers_json) {
814 Ok(map) => {
815 for (key, value) in map {
816 if let Some(val) = value.as_str()
817 && let Ok(header) =
818 reqwest::header::HeaderName::from_bytes(key.as_bytes())
819 && let Ok(header_value) = reqwest::header::HeaderValue::from_str(val)
820 {
821 builder = builder.header(header, header_value);
822 }
823 }
824 }
825 Err(err) => {
826 tracing::warn!(error = %err, "failed to parse headers for http.fetch");
827 }
828 }
829 }
830
831 if let Some(body) = raw_body.clone() {
832 builder = builder.body(body);
833 }
834
835 let response = match builder.send() {
836 Ok(resp) => resp,
837 Err(err) => {
838 tracing::error!(url = %req.url, error = %err, "http fetch failed");
839 return Ok(Err(LegacyIfaceError::Unavailable));
840 }
841 };
842
843 let status = response.status().as_u16();
844 let headers_map = response
845 .headers()
846 .iter()
847 .map(|(k, v)| {
848 (
849 k.as_str().to_string(),
850 v.to_str().unwrap_or_default().to_string(),
851 )
852 })
853 .collect::<BTreeMap<_, _>>();
854 let headers_json = serde_json::to_string(&headers_map).ok();
855 let body = response.text().ok();
856
857 if let Some((meta, true)) = mock_state.take()
858 && let Some(mock) = &self.mocks
859 {
860 let recorded = HttpMockResponse::new(status, headers_map.clone(), body.clone());
861 mock.http_record(&meta, &recorded);
862 }
863
864 Ok(Ok(LegacyHttpResponse {
865 status,
866 headers_json,
867 body,
868 }))
869 }
870}
871
872impl PackRuntime {
873 #[allow(clippy::too_many_arguments)]
874 pub async fn load(
875 path: impl AsRef<Path>,
876 config: Arc<HostConfig>,
877 mocks: Option<Arc<MockLayer>>,
878 archive_source: Option<&Path>,
879 session_store: Option<DynSessionStore>,
880 state_store: Option<DynStateStore>,
881 wasi_policy: Arc<RunnerWasiPolicy>,
882 secrets: DynSecretsManager,
883 oauth_config: Option<OAuthBrokerConfig>,
884 verify_archive: bool,
885 ) -> Result<Self> {
886 let path = path.as_ref();
887 let (_pack_root, safe_path) = normalize_pack_path(path)?;
888 let is_component = safe_path
889 .extension()
890 .and_then(|ext| ext.to_str())
891 .map(|ext| ext.eq_ignore_ascii_case("wasm"))
892 .unwrap_or(false);
893 let archive_hint_path = if let Some(source) = archive_source {
894 let (_, normalized) = normalize_pack_path(source)?;
895 Some(normalized)
896 } else if is_component {
897 None
898 } else {
899 Some(safe_path.clone())
900 };
901 let archive_hint = archive_hint_path.as_deref();
902 if verify_archive {
903 let verify_target = archive_hint.unwrap_or(&safe_path);
904 verify::verify_pack(verify_target).await?;
905 tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
906 }
907 let engine = Engine::default();
908 let wasm_bytes = fs::read(&safe_path).await?;
909 let mut metadata = PackMetadata::from_wasm(&wasm_bytes)
910 .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
911 let mut manifest = None;
912 let mut legacy_manifest: Option<Box<legacy_pack::PackManifest>> = None;
913 let flows = if let Some(archive_path) = archive_hint {
914 match load_manifest_and_flows(archive_path) {
915 Ok(ManifestLoad::New {
916 manifest: m,
917 flows: cache,
918 }) => {
919 metadata = cache.metadata.clone();
920 manifest = Some(*m);
921 Some(cache)
922 }
923 Ok(ManifestLoad::Legacy {
924 manifest: m,
925 flows: cache,
926 }) => {
927 metadata = cache.metadata.clone();
928 legacy_manifest = Some(m);
929 Some(cache)
930 }
931 Err(err) => {
932 warn!(error = %err, pack = %archive_path.display(), "failed to parse pack manifest; skipping flows");
933 None
934 }
935 }
936 } else {
937 None
938 };
939 let components = if let Some(archive_path) = archive_hint {
940 if let Some(new_manifest) = manifest.as_ref() {
941 match load_components_from_archive(&engine, archive_path, Some(new_manifest)) {
942 Ok(map) => map,
943 Err(err) => {
944 warn!(error = %err, pack = %archive_path.display(), "failed to load components from archive");
945 HashMap::new()
946 }
947 }
948 } else if let Some(legacy) = legacy_manifest.as_ref() {
949 match load_legacy_components_from_archive(&engine, archive_path, legacy) {
950 Ok(map) => map,
951 Err(err) => {
952 warn!(error = %err, pack = %archive_path.display(), "failed to load components from archive");
953 HashMap::new()
954 }
955 }
956 } else {
957 HashMap::new()
958 }
959 } else if is_component {
960 let name = safe_path
961 .file_stem()
962 .map(|s| s.to_string_lossy().to_string())
963 .unwrap_or_else(|| "component".to_string());
964 let component = Component::from_binary(&engine, &wasm_bytes)?;
965 let mut map = HashMap::new();
966 map.insert(
967 name.clone(),
968 PackComponent {
969 name,
970 version: metadata.version.clone(),
971 component,
972 },
973 );
974 map
975 } else {
976 HashMap::new()
977 };
978 let http_client = Arc::clone(&HTTP_CLIENT);
979 Ok(Self {
980 path: safe_path,
981 archive_path: archive_hint.map(Path::to_path_buf),
982 config,
983 engine,
984 metadata,
985 manifest,
986 legacy_manifest,
987 mocks,
988 flows,
989 components,
990 http_client,
991 pre_cache: Mutex::new(HashMap::new()),
992 session_store,
993 state_store,
994 wasi_policy,
995 secrets,
996 oauth_config,
997 })
998 }
999
1000 pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
1001 if let Some(cache) = &self.flows {
1002 return Ok(cache.descriptors.clone());
1003 }
1004 if let Some(manifest) = &self.manifest {
1005 let descriptors = manifest
1006 .flows
1007 .iter()
1008 .map(|flow| FlowDescriptor {
1009 id: flow.id.as_str().to_string(),
1010 flow_type: flow_kind_to_str(flow.kind).to_string(),
1011 profile: manifest.pack_id.as_str().to_string(),
1012 version: manifest.version.to_string(),
1013 description: None,
1014 })
1015 .collect();
1016 return Ok(descriptors);
1017 }
1018 Ok(Vec::new())
1019 }
1020
1021 #[allow(dead_code)]
1022 pub async fn run_flow(
1023 &self,
1024 _flow_id: &str,
1025 _input: serde_json::Value,
1026 ) -> Result<serde_json::Value> {
1027 Ok(serde_json::json!({}))
1029 }
1030
1031 pub async fn invoke_component(
1032 &self,
1033 component_ref: &str,
1034 ctx: ComponentExecCtx,
1035 operation: &str,
1036 _config_json: Option<String>,
1037 input_json: String,
1038 ) -> Result<Value> {
1039 let pack_component = self
1040 .components
1041 .get(component_ref)
1042 .with_context(|| format!("component '{component_ref}' not found in pack"))?;
1043
1044 let pre = if let Some(pre) = self.pre_cache.lock().get(component_ref).cloned() {
1045 pre
1046 } else {
1047 let mut linker = Linker::new(&self.engine);
1048 register_all(&mut linker, self.oauth_config.is_some())?;
1049 add_component_control_to_linker(&mut linker)?;
1050 let pre = ComponentPre::new(
1051 linker
1052 .instantiate_pre(&pack_component.component)
1053 .map_err(|err| anyhow!(err))?,
1054 )
1055 .map_err(|err| anyhow!(err))?;
1056 self.pre_cache
1057 .lock()
1058 .insert(component_ref.to_string(), pre.clone());
1059 pre
1060 };
1061
1062 let host_state = HostState::new(
1063 Arc::clone(&self.config),
1064 Arc::clone(&self.http_client),
1065 self.mocks.clone(),
1066 self.session_store.clone(),
1067 self.state_store.clone(),
1068 Arc::clone(&self.secrets),
1069 self.oauth_config.clone(),
1070 )?;
1071 let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1072 let mut store = wasmtime::Store::new(&self.engine, store_state);
1073 let bindings = pre
1074 .instantiate_async(&mut store)
1075 .await
1076 .map_err(|err| anyhow!(err))?;
1077 let node = bindings.greentic_component_node();
1078
1079 let result = node.call_invoke(&mut store, &ctx, operation, &input_json)?;
1080
1081 match result {
1082 InvokeResult::Ok(body) => {
1083 if body.is_empty() {
1084 return Ok(Value::Null);
1085 }
1086 serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
1087 }
1088 InvokeResult::Err(NodeError {
1089 code,
1090 message,
1091 retryable,
1092 backoff_ms,
1093 details,
1094 }) => {
1095 let mut obj = serde_json::Map::new();
1096 obj.insert("ok".into(), Value::Bool(false));
1097 let mut error = serde_json::Map::new();
1098 error.insert("code".into(), Value::String(code));
1099 error.insert("message".into(), Value::String(message));
1100 error.insert("retryable".into(), Value::Bool(retryable));
1101 if let Some(backoff) = backoff_ms {
1102 error.insert("backoff_ms".into(), Value::Number(backoff.into()));
1103 }
1104 if let Some(details) = details {
1105 error.insert(
1106 "details".into(),
1107 serde_json::from_str(&details).unwrap_or(Value::String(details)),
1108 );
1109 }
1110 obj.insert("error".into(), Value::Object(error));
1111 Ok(Value::Object(obj))
1112 }
1113 }
1114 }
1115
1116 pub fn load_flow(&self, flow_id: &str) -> Result<Flow> {
1117 if let Some(cache) = &self.flows {
1118 return cache
1119 .flows
1120 .get(flow_id)
1121 .cloned()
1122 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
1123 }
1124 if let Some(manifest) = &self.manifest {
1125 let entry = manifest
1126 .flows
1127 .iter()
1128 .find(|f| f.id.as_str() == flow_id)
1129 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
1130 return Ok(entry.flow.clone());
1131 }
1132 bail!("flow '{flow_id}' not available (pack exports disabled)")
1133 }
1134
1135 pub fn metadata(&self) -> &PackMetadata {
1136 &self.metadata
1137 }
1138
1139 pub fn for_component_test(
1140 components: Vec<(String, PathBuf)>,
1141 flows: HashMap<String, FlowIR>,
1142 config: Arc<HostConfig>,
1143 ) -> Result<Self> {
1144 let engine = Engine::default();
1145 let mut component_map = HashMap::new();
1146 for (name, path) in components {
1147 if !path.exists() {
1148 bail!("component artifact missing: {}", path.display());
1149 }
1150 let wasm_bytes = std::fs::read(&path)?;
1151 let component = Component::from_binary(&engine, &wasm_bytes)
1152 .with_context(|| format!("failed to compile component {}", path.display()))?;
1153 component_map.insert(
1154 name.clone(),
1155 PackComponent {
1156 name,
1157 version: "0.0.0".into(),
1158 component,
1159 },
1160 );
1161 }
1162
1163 let mut flow_map = HashMap::new();
1164 let mut descriptors = Vec::new();
1165 for (id, ir) in flows {
1166 let flow_type = ir.flow_type.clone();
1167 let flow = flow_ir_to_flow(ir)?;
1168 flow_map.insert(id.clone(), flow);
1169 descriptors.push(FlowDescriptor {
1170 id: id.clone(),
1171 flow_type,
1172 profile: "test".into(),
1173 version: "0.0.0".into(),
1174 description: None,
1175 });
1176 }
1177 let flows_cache = PackFlows {
1178 descriptors: descriptors.clone(),
1179 flows: flow_map,
1180 metadata: PackMetadata::fallback(Path::new("component-test")),
1181 };
1182
1183 Ok(Self {
1184 path: PathBuf::new(),
1185 archive_path: None,
1186 config,
1187 engine,
1188 metadata: PackMetadata::fallback(Path::new("component-test")),
1189 manifest: None,
1190 legacy_manifest: None,
1191 mocks: None,
1192 flows: Some(flows_cache),
1193 components: component_map,
1194 http_client: Arc::clone(&HTTP_CLIENT),
1195 pre_cache: Mutex::new(HashMap::new()),
1196 session_store: None,
1197 state_store: None,
1198 wasi_policy: Arc::new(RunnerWasiPolicy::new()),
1199 secrets: crate::secrets::default_manager(),
1200 oauth_config: None,
1201 })
1202 }
1203}
1204
1205fn map_legacy_error(err: LegacyIfaceError) -> types::IfaceError {
1206 match err {
1207 LegacyIfaceError::InvalidArg => types::IfaceError::InvalidArg,
1208 LegacyIfaceError::NotFound => types::IfaceError::NotFound,
1209 LegacyIfaceError::Denied => types::IfaceError::Denied,
1210 LegacyIfaceError::Unavailable => types::IfaceError::Unavailable,
1211 LegacyIfaceError::Internal => types::IfaceError::Internal,
1212 }
1213}
1214
1215struct PackFlows {
1216 descriptors: Vec<FlowDescriptor>,
1217 flows: HashMap<String, Flow>,
1218 metadata: PackMetadata,
1219}
1220
1221impl PackFlows {
1222 fn from_manifest(manifest: greentic_types::PackManifest) -> Self {
1223 let descriptors = manifest
1224 .flows
1225 .iter()
1226 .map(|entry| FlowDescriptor {
1227 id: entry.id.as_str().to_string(),
1228 flow_type: flow_kind_to_str(entry.kind).to_string(),
1229 profile: manifest.pack_id.as_str().to_string(),
1230 version: manifest.version.to_string(),
1231 description: None,
1232 })
1233 .collect();
1234 let mut flows = HashMap::new();
1235 for entry in &manifest.flows {
1236 flows.insert(entry.id.as_str().to_string(), entry.flow.clone());
1237 }
1238 Self {
1239 metadata: PackMetadata::from_manifest(&manifest),
1240 descriptors,
1241 flows,
1242 }
1243 }
1244}
1245
1246fn flow_kind_to_str(kind: greentic_types::FlowKind) -> &'static str {
1247 match kind {
1248 greentic_types::FlowKind::Messaging => "messaging",
1249 greentic_types::FlowKind::Event => "event",
1250 greentic_types::FlowKind::ComponentConfig => "component-config",
1251 greentic_types::FlowKind::Job => "job",
1252 greentic_types::FlowKind::Http => "http",
1253 }
1254}
1255
1256fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
1257 let mut file = archive
1258 .by_name(name)
1259 .with_context(|| format!("entry {name} missing from archive"))?;
1260 let mut buf = Vec::new();
1261 file.read_to_end(&mut buf)?;
1262 Ok(buf)
1263}
1264
1265fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
1266 for node in doc.nodes.values_mut() {
1267 if node.component.is_empty()
1268 && let Some((component_ref, payload)) = node.raw.iter().next()
1269 {
1270 if component_ref.starts_with("emit.") {
1271 node.component = component_ref.clone();
1272 node.payload = payload.clone();
1273 node.raw.clear();
1274 continue;
1275 }
1276 let (target_component, operation, input, config) =
1277 infer_component_exec(payload, component_ref);
1278 let mut payload_obj = serde_json::Map::new();
1279 payload_obj.insert("component".into(), Value::String(target_component));
1281 payload_obj.insert("operation".into(), Value::String(operation));
1282 payload_obj.insert("input".into(), input);
1283 if let Some(cfg) = config {
1284 payload_obj.insert("config".into(), cfg);
1285 }
1286 node.component = "component.exec".to_string();
1287 node.payload = Value::Object(payload_obj);
1288 }
1289 }
1290 doc
1291}
1292
1293fn infer_component_exec(
1294 payload: &Value,
1295 component_ref: &str,
1296) -> (String, String, Value, Option<Value>) {
1297 let default_op = if component_ref.starts_with("templating.") {
1298 "render"
1299 } else {
1300 "invoke"
1301 }
1302 .to_string();
1303
1304 if let Value::Object(map) = payload {
1305 let op = map
1306 .get("op")
1307 .or_else(|| map.get("operation"))
1308 .and_then(Value::as_str)
1309 .map(|s| s.to_string())
1310 .unwrap_or_else(|| default_op.clone());
1311
1312 let mut input = map.clone();
1313 let config = input.remove("config");
1314 let component = input
1315 .get("component")
1316 .or_else(|| input.get("component_ref"))
1317 .and_then(Value::as_str)
1318 .map(|s| s.to_string())
1319 .unwrap_or_else(|| component_ref.to_string());
1320 input.remove("component");
1321 input.remove("component_ref");
1322 input.remove("op");
1323 input.remove("operation");
1324 return (component, op, Value::Object(input), config);
1325 }
1326
1327 (component_ref.to_string(), default_op, payload.clone(), None)
1328}
1329
#[cfg(test)]
mod tests {
    use super::*;
    use greentic_flow::model::{FlowDoc, NodeDoc};
    use serde_json::json;
    use std::collections::BTreeMap;

    /// A node carrying only a raw `templating.*` entry must be rewritten into
    /// a `component.exec` call that defaults to the `render` operation.
    #[test]
    fn normalizes_raw_component_to_component_exec() {
        let mut raw = BTreeMap::new();
        raw.insert(
            "templating.handlebars".into(),
            json!({ "template": "Hi {{name}}" }),
        );
        let start_node = NodeDoc {
            raw,
            routing: json!([{"out": true}]),
            ..Default::default()
        };

        let mut nodes = BTreeMap::new();
        nodes.insert("start".into(), start_node);

        let normalized = normalize_flow_doc(FlowDoc {
            id: "welcome".into(),
            title: None,
            description: None,
            flow_type: "messaging".into(),
            start: Some("start".into()),
            parameters: json!({}),
            tags: Vec::new(),
            entrypoints: BTreeMap::new(),
            nodes,
        });

        let node = normalized.nodes.get("start").expect("node exists");
        assert_eq!(node.component, "component.exec");
        assert!(node.raw.is_empty() || node.raw.contains_key("templating.handlebars"));

        let payload = node.payload.as_object().expect("payload object");
        assert_eq!(
            payload.get("component"),
            Some(&json!("templating.handlebars"))
        );
        assert_eq!(payload.get("operation"), Some(&json!("render")));
        assert_eq!(
            payload.get("input"),
            Some(&json!({ "template": "Hi {{name}}" }))
        );
    }
}
1382
1383fn load_components_from_archive(
1384 engine: &Engine,
1385 path: &Path,
1386 manifest: Option<&greentic_types::PackManifest>,
1387) -> Result<HashMap<String, PackComponent>> {
1388 let mut archive = ZipArchive::new(File::open(path)?)
1389 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
1390 let mut components = HashMap::new();
1391 if let Some(manifest) = manifest {
1392 for entry in &manifest.components {
1393 let file_name = format!("components/{}.wasm", entry.id.as_str());
1394 let bytes = read_entry(&mut archive, &file_name)
1395 .with_context(|| format!("missing component {}", file_name))?;
1396 let component = Component::from_binary(engine, &bytes)
1397 .with_context(|| format!("failed to compile component {}", entry.id.as_str()))?;
1398 components.insert(
1399 entry.id.as_str().to_string(),
1400 PackComponent {
1401 name: entry.id.as_str().to_string(),
1402 version: entry.version.to_string(),
1403 component,
1404 },
1405 );
1406 }
1407 }
1408 Ok(components)
1409}
1410
1411fn load_legacy_components_from_archive(
1412 engine: &Engine,
1413 path: &Path,
1414 manifest: &legacy_pack::PackManifest,
1415) -> Result<HashMap<String, PackComponent>> {
1416 let mut archive = ZipArchive::new(File::open(path)?)
1417 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
1418 let mut components = HashMap::new();
1419 for entry in &manifest.components {
1420 let bytes = read_entry(&mut archive, &entry.file_wasm)
1421 .with_context(|| format!("missing component {}", entry.file_wasm))?;
1422 let component = Component::from_binary(engine, &bytes)
1423 .with_context(|| format!("failed to compile component {}", entry.name))?;
1424 components.insert(
1425 entry.name.clone(),
1426 PackComponent {
1427 name: entry.name.clone(),
1428 version: entry.version.to_string(),
1429 component,
1430 },
1431 );
1432 }
1433 Ok(components)
1434}
1435
1436#[derive(Clone, Debug, Default, Serialize, Deserialize)]
1437pub struct PackMetadata {
1438 pub pack_id: String,
1439 pub version: String,
1440 #[serde(default)]
1441 pub entry_flows: Vec<String>,
1442}
1443
1444impl PackMetadata {
1445 fn from_wasm(bytes: &[u8]) -> Option<Self> {
1446 let parser = Parser::new(0);
1447 for payload in parser.parse_all(bytes) {
1448 let payload = payload.ok()?;
1449 match payload {
1450 Payload::CustomSection(section) => {
1451 if section.name() == "greentic.manifest"
1452 && let Ok(meta) = Self::from_bytes(section.data())
1453 {
1454 return Some(meta);
1455 }
1456 }
1457 Payload::DataSection(reader) => {
1458 for segment in reader.into_iter().flatten() {
1459 if let Ok(meta) = Self::from_bytes(segment.data) {
1460 return Some(meta);
1461 }
1462 }
1463 }
1464 _ => {}
1465 }
1466 }
1467 None
1468 }
1469
1470 fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
1471 #[derive(Deserialize)]
1472 struct RawManifest {
1473 pack_id: String,
1474 version: String,
1475 #[serde(default)]
1476 entry_flows: Vec<String>,
1477 #[serde(default)]
1478 flows: Vec<RawFlow>,
1479 }
1480
1481 #[derive(Deserialize)]
1482 struct RawFlow {
1483 id: String,
1484 }
1485
1486 let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
1487 let mut entry_flows = if manifest.entry_flows.is_empty() {
1488 manifest.flows.iter().map(|f| f.id.clone()).collect()
1489 } else {
1490 manifest.entry_flows.clone()
1491 };
1492 entry_flows.retain(|id| !id.is_empty());
1493 Ok(Self {
1494 pack_id: manifest.pack_id,
1495 version: manifest.version,
1496 entry_flows,
1497 })
1498 }
1499
1500 pub fn fallback(path: &Path) -> Self {
1501 let pack_id = path
1502 .file_stem()
1503 .map(|s| s.to_string_lossy().into_owned())
1504 .unwrap_or_else(|| "unknown-pack".to_string());
1505 Self {
1506 pack_id,
1507 version: "0.0.0".to_string(),
1508 entry_flows: Vec::new(),
1509 }
1510 }
1511
1512 pub fn from_manifest(manifest: &greentic_types::PackManifest) -> Self {
1513 let entry_flows = manifest
1514 .flows
1515 .iter()
1516 .map(|flow| flow.id.as_str().to_string())
1517 .collect::<Vec<_>>();
1518 Self {
1519 pack_id: manifest.pack_id.as_str().to_string(),
1520 version: manifest.version.to_string(),
1521 entry_flows,
1522 }
1523 }
1524}
1525
1526impl From<&HttpMockResponse> for LegacyHttpResponse {
1527 fn from(value: &HttpMockResponse) -> Self {
1528 let headers_json = serde_json::to_string(&value.headers).ok();
1529 Self {
1530 status: value.status,
1531 headers_json,
1532 body: value.body.clone(),
1533 }
1534 }
1535}