1use std::collections::{BTreeMap, HashMap};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5#[cfg(feature = "mcp")]
6use std::str::FromStr;
7use std::sync::Arc;
8
9use crate::runtime_wasmtime::{Component, Engine, Linker, Store, WasmResult};
10use anyhow::{Context, Result, anyhow, bail};
11use greentic_flow::ir::{FlowIR, NodeIR, RouteIR};
12use greentic_interfaces::host_import_v0_2::greentic::host_import::imports::{
13 HttpRequest, HttpResponse, IfaceError, TenantCtx,
14};
15use greentic_interfaces::pack_export_v0_2;
16use greentic_interfaces::pack_export_v0_2::exports::greentic::pack_export::exports::FlowInfo;
17#[cfg(feature = "mcp")]
18use greentic_mcp::{ExecConfig, ExecError, ExecRequest};
19#[cfg(feature = "mcp")]
20use greentic_types::{EnvId, TeamId, TenantCtx as TypesTenantCtx, TenantId, UserId};
21use indexmap::IndexMap;
22use reqwest::blocking::Client as BlockingClient;
23use serde::{Deserialize, Serialize};
24use serde_cbor;
25use serde_json::{self, Value, json};
26use serde_yaml_bw as serde_yaml;
27use tokio::fs;
28use wasmparser::{Parser, Payload};
29use zip::ZipArchive;
30
31use crate::imports;
32use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
33
34use crate::config::HostConfig;
35use crate::verify;
36
37pub struct PackRuntime {
38 path: PathBuf,
39 config: Arc<HostConfig>,
40 engine: Engine,
41 component: Option<Component>,
42 metadata: PackMetadata,
43 mocks: Option<Arc<MockLayer>>,
44 archive: Option<ArchiveFlows>,
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
48pub struct FlowDescriptor {
49 pub id: String,
50 #[serde(rename = "type")]
51 pub flow_type: String,
52 pub profile: String,
53 pub version: String,
54 #[serde(default)]
55 pub description: Option<String>,
56}
57
58pub struct HostState {
59 config: Arc<HostConfig>,
60 http_client: BlockingClient,
61 #[cfg(feature = "mcp")]
62 exec_config: Option<ExecConfig>,
63 #[cfg(feature = "mcp")]
64 default_env: String,
65 mocks: Option<Arc<MockLayer>>,
66}
67
68impl HostState {
69 pub fn new(config: Arc<HostConfig>, mocks: Option<Arc<MockLayer>>) -> Result<Self> {
70 let http_client = BlockingClient::builder().build()?;
71 #[cfg(feature = "mcp")]
72 let exec_config = config.mcp_exec_config().ok();
73 #[cfg(feature = "mcp")]
74 let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
75 Ok(Self {
76 config,
77 http_client,
78 #[cfg(feature = "mcp")]
79 exec_config,
80 #[cfg(feature = "mcp")]
81 default_env,
82 mocks,
83 })
84 }
85
86 pub fn get_secret(&self, key: &str) -> Result<String> {
87 if !self.config.secrets_policy.is_allowed(key) {
88 bail!("secret {key} is not permitted by bindings policy");
89 }
90 if let Some(mock) = &self.mocks
91 && let Some(value) = mock.secrets_lookup(key)
92 {
93 return Ok(value);
94 }
95 if let Ok(value) = std::env::var(key) {
96 return Ok(value);
97 }
98 bail!("secret {key} not found in environment");
99 }
100}
101
102impl greentic_interfaces::host_import_v0_2::HostImports for HostState {
103 fn secrets_get(
104 &mut self,
105 key: String,
106 _ctx: Option<TenantCtx>,
107 ) -> WasmResult<Result<String, IfaceError>> {
108 Ok(self.get_secret(&key).map_err(|err| {
109 tracing::warn!(secret = %key, error = %err, "secret lookup denied");
110 IfaceError::Denied
111 }))
112 }
113
114 fn telemetry_emit(&mut self, span_json: String, _ctx: Option<TenantCtx>) -> WasmResult<()> {
115 if let Some(mock) = &self.mocks
116 && mock.telemetry_drain(&[("span_json", span_json.as_str())])
117 {
118 return Ok(());
119 }
120 tracing::info!(span = %span_json, "telemetry emit from pack");
121 Ok(())
122 }
123
124 fn tool_invoke(
125 &mut self,
126 tool: String,
127 action: String,
128 args_json: String,
129 ctx: Option<TenantCtx>,
130 ) -> WasmResult<Result<String, IfaceError>> {
131 #[cfg(not(feature = "mcp"))]
132 {
133 let _ = (tool, action, args_json, ctx);
134 tracing::warn!("tool invoke requested but crate built without `mcp` feature");
135 return Ok(Err(IfaceError::Unavailable));
136 }
137
138 #[cfg(feature = "mcp")]
139 {
140 let exec_config = match &self.exec_config {
141 Some(cfg) => cfg.clone(),
142 None => {
143 tracing::warn!(%tool, %action, "exec config unavailable for tool invoke");
144 return Ok(Err(IfaceError::Unavailable));
145 }
146 };
147
148 let args: Value = match serde_json::from_str(&args_json) {
149 Ok(value) => value,
150 Err(err) => {
151 tracing::warn!(error = %err, "invalid args for tool invoke");
152 return Ok(Err(IfaceError::InvalidArg));
153 }
154 };
155
156 let tenant = ctx.map(|ctx| map_tenant_ctx(ctx, &self.default_env));
157
158 let request = ExecRequest {
159 component: tool.clone(),
160 action: action.clone(),
161 args,
162 tenant,
163 };
164
165 if let Some(mock) = &self.mocks
166 && let Some(result) = mock.tool_short_circuit(&tool, &action)
167 {
168 return match result.and_then(|value| {
169 serde_json::to_string(&value)
170 .map_err(|err| anyhow!("failed to serialise mock tool output: {err}"))
171 }) {
172 Ok(body) => Ok(Ok(body)),
173 Err(err) => {
174 tracing::error!(error = %err, "mock tool execution failed");
175 Ok(Err(IfaceError::Internal))
176 }
177 };
178 }
179
180 match greentic_mcp::exec(request, &exec_config) {
181 Ok(value) => match serde_json::to_string(&value) {
182 Ok(body) => Ok(Ok(body)),
183 Err(err) => {
184 tracing::error!(error = %err, "failed to serialise tool result");
185 Ok(Err(IfaceError::Internal))
186 }
187 },
188 Err(err) => {
189 tracing::warn!(%tool, %action, error = %err, "tool invoke failed");
190 let iface_err = match err {
191 ExecError::NotFound { .. } => IfaceError::NotFound,
192 ExecError::Tool { .. } => IfaceError::Denied,
193 _ => IfaceError::Unavailable,
194 };
195 Ok(Err(iface_err))
196 }
197 }
198 }
199 }
200
201 fn http_fetch(
202 &mut self,
203 req: HttpRequest,
204 _ctx: Option<TenantCtx>,
205 ) -> WasmResult<Result<HttpResponse, IfaceError>> {
206 if !self.config.http_enabled {
207 tracing::warn!(url = %req.url, "http fetch denied by policy");
208 return Ok(Err(IfaceError::Denied));
209 }
210
211 let mut mock_state = None;
212 let raw_body = req.body.clone();
213 if let Some(mock) = &self.mocks
214 && let Ok(meta) = HttpMockRequest::new(
215 &req.method,
216 &req.url,
217 raw_body.as_deref().map(|body| body.as_bytes()),
218 )
219 {
220 match mock.http_begin(&meta) {
221 HttpDecision::Mock(response) => {
222 let http = HttpResponse::from(&response);
223 return Ok(Ok(http));
224 }
225 HttpDecision::Deny(reason) => {
226 tracing::warn!(url = %req.url, reason = %reason, "http fetch blocked by mocks");
227 return Ok(Err(IfaceError::Denied));
228 }
229 HttpDecision::Passthrough { record } => {
230 mock_state = Some((meta, record));
231 }
232 }
233 }
234
235 let method = req.method.parse().unwrap_or(reqwest::Method::GET);
236 let mut builder = self.http_client.request(method, &req.url);
237
238 if let Some(headers_json) = req.headers_json.as_ref() {
239 match serde_json::from_str::<serde_json::Map<String, serde_json::Value>>(headers_json) {
240 Ok(map) => {
241 for (key, value) in map {
242 if let Some(val) = value.as_str()
243 && let Ok(header) =
244 reqwest::header::HeaderName::from_bytes(key.as_bytes())
245 && let Ok(header_value) = reqwest::header::HeaderValue::from_str(val)
246 {
247 builder = builder.header(header, header_value);
248 }
249 }
250 }
251 Err(err) => {
252 tracing::warn!(error = %err, "failed to parse headers for http.fetch");
253 }
254 }
255 }
256
257 if let Some(body) = raw_body.clone() {
258 builder = builder.body(body);
259 }
260
261 let response = match builder.send() {
262 Ok(resp) => resp,
263 Err(err) => {
264 tracing::error!(url = %req.url, error = %err, "http fetch failed");
265 return Ok(Err(IfaceError::Unavailable));
266 }
267 };
268
269 let status = response.status().as_u16();
270 let headers_map = response
271 .headers()
272 .iter()
273 .map(|(k, v)| {
274 (
275 k.as_str().to_string(),
276 v.to_str().unwrap_or_default().to_string(),
277 )
278 })
279 .collect::<BTreeMap<_, _>>();
280 let headers_json = serde_json::to_string(&headers_map).ok();
281 let body = response.text().ok();
282
283 if let Some((meta, true)) = mock_state.take()
284 && let Some(mock) = &self.mocks
285 {
286 let recorded = HttpMockResponse::new(status, headers_map.clone(), body.clone());
287 mock.http_record(&meta, &recorded);
288 }
289
290 Ok(Ok(HttpResponse {
291 status,
292 headers_json,
293 body,
294 }))
295 }
296}
297
298impl PackRuntime {
299 pub async fn load(
300 path: impl AsRef<Path>,
301 config: Arc<HostConfig>,
302 mocks: Option<Arc<MockLayer>>,
303 archive_source: Option<&Path>,
304 ) -> Result<Self> {
305 let path = path.as_ref();
306 verify::verify_pack(path).await?;
307 tracing::info!(pack_path = %path.display(), "pack verification complete");
308 let engine = Engine::default();
309 let wasm_bytes = fs::read(path).await?;
310 let mut metadata =
311 PackMetadata::from_wasm(&wasm_bytes).unwrap_or_else(|| PackMetadata::fallback(path));
312 let mut archive = None;
313 let component = match Component::from_file(&engine, path) {
314 Ok(component) => Some(component),
315 Err(err) => {
316 if let Some(archive_path) = archive_source {
317 tracing::warn!(
318 error = %err,
319 pack = %archive_path.display(),
320 "component load failed, using manifest archive"
321 );
322 let archive_data = ArchiveFlows::from_archive(archive_path)?;
323 metadata = archive_data.metadata.clone();
324 archive = Some(archive_data);
325 None
326 } else {
327 return Err(err);
328 }
329 }
330 };
331 Ok(Self {
332 path: path.to_path_buf(),
333 config,
334 engine,
335 component,
336 metadata,
337 mocks,
338 archive,
339 })
340 }
341
342 pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
343 if let Some(archive) = &self.archive {
344 return Ok(archive.descriptors.clone());
345 }
346 tracing::trace!(
347 tenant = %self.config.tenant,
348 pack_path = %self.path.display(),
349 "listing flows from pack"
350 );
351 let component = self
352 .component
353 .as_ref()
354 .ok_or_else(|| anyhow!("pack component unavailable"))?;
355 let mut store = Store::new(
356 &self.engine,
357 HostState::new(Arc::clone(&self.config), self.mocks.clone())?,
358 );
359 let mut linker = Linker::new(&self.engine);
360 imports::register_all(&mut linker)?;
361 let bindings = pack_export_v0_2::PackExports::instantiate(&mut store, component, &linker)?;
362 let exports = bindings.greentic_pack_export_exports();
363 let flows_raw = match exports.call_list_flows(&mut store)? {
364 Ok(flows) => flows,
365 Err(err) => {
366 bail!("pack list_flows failed: {err:?}");
367 }
368 };
369 let flows = flows_raw
370 .into_iter()
371 .map(|flow: FlowInfo| {
372 let profile = flow.profile.clone();
373 let version = flow.version.clone();
374 FlowDescriptor {
375 id: flow.id,
376 flow_type: flow.flow_type,
377 profile,
378 version,
379 description: Some(format!("{}@{}", flow.profile, flow.version)),
380 }
381 })
382 .collect();
383 Ok(flows)
384 }
385
386 #[allow(dead_code)]
387 pub async fn run_flow(
388 &self,
389 _flow_id: &str,
390 _input: serde_json::Value,
391 ) -> Result<serde_json::Value> {
392 Ok(serde_json::json!({}))
394 }
395
396 pub fn load_flow_ir(&self, flow_id: &str) -> Result<greentic_flow::ir::FlowIR> {
397 if let Some(archive) = &self.archive {
398 return archive
399 .flows
400 .get(flow_id)
401 .cloned()
402 .ok_or_else(|| anyhow!("flow '{flow_id}' not found in archive"));
403 }
404 let component = self
405 .component
406 .as_ref()
407 .ok_or_else(|| anyhow!("pack component unavailable"))?;
408 let mut store = Store::new(
409 &self.engine,
410 HostState::new(Arc::clone(&self.config), self.mocks.clone())?,
411 );
412 let mut linker = Linker::new(&self.engine);
413 imports::register_all(&mut linker)?;
414 let bindings = pack_export_v0_2::PackExports::instantiate(&mut store, component, &linker)?;
415 let exports = bindings.greentic_pack_export_exports();
416 let flow_name = flow_id.to_string();
417 let metadata = match exports.call_flow_metadata(&mut store, &flow_name)? {
418 Ok(doc) => doc,
419 Err(err) => bail!("pack flow_metadata({flow_id}) failed: {err:?}"),
420 };
421 let flow_doc: greentic_flow::model::FlowDoc = serde_json::from_str(&metadata)
422 .or_else(|_| serde_yaml::from_str(&metadata))
423 .with_context(|| format!("failed to parse flow metadata for {flow_id}"))?;
424 let ir = greentic_flow::to_ir(flow_doc)?;
425 Ok(ir)
426 }
427
428 pub fn metadata(&self) -> &PackMetadata {
429 &self.metadata
430 }
431}
432
433#[cfg(feature = "mcp")]
434fn map_tenant_ctx(ctx: TenantCtx, default_env: &str) -> TypesTenantCtx {
435 let env = ctx
436 .deployment
437 .runtime
438 .unwrap_or_else(|| default_env.to_string());
439
440 let env_id = EnvId::from_str(env.as_str()).expect("invalid env id");
441 let tenant_id = TenantId::from_str(ctx.tenant.as_str()).expect("invalid tenant id");
442 let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
443 tenant_ctx = tenant_ctx.with_team(
444 ctx.team
445 .as_ref()
446 .and_then(|team| TeamId::from_str(team.as_str()).ok()),
447 );
448 tenant_ctx = tenant_ctx.with_user(
449 ctx.user
450 .as_ref()
451 .and_then(|user| UserId::from_str(user.as_str()).ok()),
452 );
453 tenant_ctx.trace_id = ctx.trace_id;
454 tenant_ctx
455}
456
457struct ArchiveFlows {
458 descriptors: Vec<FlowDescriptor>,
459 flows: HashMap<String, FlowIR>,
460 metadata: PackMetadata,
461}
462
463impl ArchiveFlows {
464 fn from_archive(path: &Path) -> Result<Self> {
465 let file = File::open(path)
466 .with_context(|| format!("failed to open archive {}", path.display()))?;
467 let mut archive = ZipArchive::new(file)
468 .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
469 let manifest_bytes = read_entry(&mut archive, "manifest.cbor")?;
470 let manifest: greentic_pack::builder::PackManifest =
471 serde_cbor::from_slice(&manifest_bytes).context("manifest.cbor is invalid")?;
472
473 let mut flows = HashMap::new();
474 let mut descriptors = Vec::new();
475 for flow in &manifest.flows {
476 let ir = build_stub_flow_ir(&flow.id, &flow.kind);
477 flows.insert(flow.id.clone(), ir);
478 descriptors.push(FlowDescriptor {
479 id: flow.id.clone(),
480 flow_type: flow.kind.clone(),
481 profile: manifest.meta.name.clone(),
482 version: manifest.meta.version.to_string(),
483 description: Some(flow.kind.clone()),
484 });
485 }
486
487 Ok(Self {
488 metadata: PackMetadata::from_manifest(&manifest),
489 descriptors,
490 flows,
491 })
492 }
493}
494
495fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
496 let mut file = archive
497 .by_name(name)
498 .with_context(|| format!("entry {name} missing from archive"))?;
499 let mut buf = Vec::new();
500 file.read_to_end(&mut buf)?;
501 Ok(buf)
502}
503
504fn build_stub_flow_ir(flow_id: &str, flow_type: &str) -> FlowIR {
505 let mut nodes = IndexMap::new();
506 nodes.insert(
507 "complete".into(),
508 NodeIR {
509 component: "qa.process".into(),
510 payload_expr: json!({
511 "status": "done",
512 "flow_id": flow_id,
513 }),
514 routes: vec![RouteIR {
515 to: None,
516 out: true,
517 }],
518 },
519 );
520 FlowIR {
521 id: flow_id.to_string(),
522 flow_type: flow_type.to_string(),
523 start: Some("complete".into()),
524 parameters: Value::Object(Default::default()),
525 nodes,
526 }
527}
528
529#[derive(Clone, Debug, Default, Serialize, Deserialize)]
530pub struct PackMetadata {
531 pub pack_id: String,
532 pub version: String,
533 #[serde(default)]
534 pub entry_flows: Vec<String>,
535}
536
537impl PackMetadata {
538 fn from_wasm(bytes: &[u8]) -> Option<Self> {
539 let parser = Parser::new(0);
540 for payload in parser.parse_all(bytes) {
541 let payload = payload.ok()?;
542 match payload {
543 Payload::CustomSection(section) => {
544 if section.name() == "greentic.manifest"
545 && let Ok(meta) = Self::from_bytes(section.data())
546 {
547 return Some(meta);
548 }
549 }
550 Payload::DataSection(reader) => {
551 for segment in reader.into_iter().flatten() {
552 if let Ok(meta) = Self::from_bytes(segment.data) {
553 return Some(meta);
554 }
555 }
556 }
557 _ => {}
558 }
559 }
560 None
561 }
562
563 fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
564 #[derive(Deserialize)]
565 struct RawManifest {
566 pack_id: String,
567 version: String,
568 #[serde(default)]
569 entry_flows: Vec<String>,
570 #[serde(default)]
571 flows: Vec<RawFlow>,
572 }
573
574 #[derive(Deserialize)]
575 struct RawFlow {
576 id: String,
577 }
578
579 let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
580 let mut entry_flows = if manifest.entry_flows.is_empty() {
581 manifest.flows.iter().map(|f| f.id.clone()).collect()
582 } else {
583 manifest.entry_flows.clone()
584 };
585 entry_flows.retain(|id| !id.is_empty());
586 Ok(Self {
587 pack_id: manifest.pack_id,
588 version: manifest.version,
589 entry_flows,
590 })
591 }
592
593 pub fn fallback(path: &Path) -> Self {
594 let pack_id = path
595 .file_stem()
596 .map(|s| s.to_string_lossy().into_owned())
597 .unwrap_or_else(|| "unknown-pack".to_string());
598 Self {
599 pack_id,
600 version: "0.0.0".to_string(),
601 entry_flows: Vec::new(),
602 }
603 }
604
605 pub fn from_manifest(manifest: &greentic_pack::builder::PackManifest) -> Self {
606 let entry_flows = if manifest.meta.entry_flows.is_empty() {
607 manifest
608 .flows
609 .iter()
610 .map(|flow| flow.id.clone())
611 .collect::<Vec<_>>()
612 } else {
613 manifest.meta.entry_flows.clone()
614 };
615 Self {
616 pack_id: manifest.meta.pack_id.clone(),
617 version: manifest.meta.version.to_string(),
618 entry_flows,
619 }
620 }
621}
622
623impl From<&HttpMockResponse> for HttpResponse {
624 fn from(value: &HttpMockResponse) -> Self {
625 let headers_json = serde_json::to_string(&value.headers).ok();
626 Self {
627 status: value.status,
628 headers_json,
629 body: value.body.clone(),
630 }
631 }
632}