greentic_runner_host/
pack.rs

1use std::collections::{BTreeMap, HashMap};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5#[cfg(feature = "mcp")]
6use std::str::FromStr;
7use std::sync::Arc;
8
9use crate::runtime_wasmtime::{Component, Engine, Linker, Store, WasmResult};
10use anyhow::{Context, Result, anyhow, bail};
11use greentic_flow::ir::{FlowIR, NodeIR, RouteIR};
12use greentic_interfaces::host_import_v0_2::greentic::host_import::imports::{
13    HttpRequest, HttpResponse, IfaceError, TenantCtx,
14};
15use greentic_interfaces::pack_export_v0_2;
16use greentic_interfaces::pack_export_v0_2::exports::greentic::pack_export::exports::FlowInfo;
17#[cfg(feature = "mcp")]
18use greentic_mcp::{ExecConfig, ExecError, ExecRequest};
19#[cfg(feature = "mcp")]
20use greentic_types::{EnvId, TeamId, TenantCtx as TypesTenantCtx, TenantId, UserId};
21use indexmap::IndexMap;
22use reqwest::blocking::Client as BlockingClient;
23use serde::{Deserialize, Serialize};
24use serde_cbor;
25use serde_json::{self, Value, json};
26use serde_yaml_bw as serde_yaml;
27use tokio::fs;
28use wasmparser::{Parser, Payload};
29use zip::ZipArchive;
30
31use crate::imports;
32use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
33
34use crate::config::HostConfig;
35use crate::verify;
36
37pub struct PackRuntime {
38    path: PathBuf,
39    config: Arc<HostConfig>,
40    engine: Engine,
41    component: Option<Component>,
42    metadata: PackMetadata,
43    mocks: Option<Arc<MockLayer>>,
44    archive: Option<ArchiveFlows>,
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
48pub struct FlowDescriptor {
49    pub id: String,
50    #[serde(rename = "type")]
51    pub flow_type: String,
52    pub profile: String,
53    pub version: String,
54    #[serde(default)]
55    pub description: Option<String>,
56}
57
58pub struct HostState {
59    config: Arc<HostConfig>,
60    http_client: BlockingClient,
61    #[cfg(feature = "mcp")]
62    exec_config: Option<ExecConfig>,
63    #[cfg(feature = "mcp")]
64    default_env: String,
65    mocks: Option<Arc<MockLayer>>,
66}
67
68impl HostState {
69    pub fn new(config: Arc<HostConfig>, mocks: Option<Arc<MockLayer>>) -> Result<Self> {
70        let http_client = BlockingClient::builder().build()?;
71        #[cfg(feature = "mcp")]
72        let exec_config = config.mcp_exec_config().ok();
73        #[cfg(feature = "mcp")]
74        let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
75        Ok(Self {
76            config,
77            http_client,
78            #[cfg(feature = "mcp")]
79            exec_config,
80            #[cfg(feature = "mcp")]
81            default_env,
82            mocks,
83        })
84    }
85
86    pub fn get_secret(&self, key: &str) -> Result<String> {
87        if !self.config.secrets_policy.is_allowed(key) {
88            bail!("secret {key} is not permitted by bindings policy");
89        }
90        if let Some(mock) = &self.mocks
91            && let Some(value) = mock.secrets_lookup(key)
92        {
93            return Ok(value);
94        }
95        if let Ok(value) = std::env::var(key) {
96            return Ok(value);
97        }
98        bail!("secret {key} not found in environment");
99    }
100}
101
102impl greentic_interfaces::host_import_v0_2::HostImports for HostState {
103    fn secrets_get(
104        &mut self,
105        key: String,
106        _ctx: Option<TenantCtx>,
107    ) -> WasmResult<Result<String, IfaceError>> {
108        Ok(self.get_secret(&key).map_err(|err| {
109            tracing::warn!(secret = %key, error = %err, "secret lookup denied");
110            IfaceError::Denied
111        }))
112    }
113
114    fn telemetry_emit(&mut self, span_json: String, _ctx: Option<TenantCtx>) -> WasmResult<()> {
115        if let Some(mock) = &self.mocks
116            && mock.telemetry_drain(&[("span_json", span_json.as_str())])
117        {
118            return Ok(());
119        }
120        tracing::info!(span = %span_json, "telemetry emit from pack");
121        Ok(())
122    }
123
124    fn tool_invoke(
125        &mut self,
126        tool: String,
127        action: String,
128        args_json: String,
129        ctx: Option<TenantCtx>,
130    ) -> WasmResult<Result<String, IfaceError>> {
131        #[cfg(not(feature = "mcp"))]
132        {
133            let _ = (tool, action, args_json, ctx);
134            tracing::warn!("tool invoke requested but crate built without `mcp` feature");
135            return Ok(Err(IfaceError::Unavailable));
136        }
137
138        #[cfg(feature = "mcp")]
139        {
140            let exec_config = match &self.exec_config {
141                Some(cfg) => cfg.clone(),
142                None => {
143                    tracing::warn!(%tool, %action, "exec config unavailable for tool invoke");
144                    return Ok(Err(IfaceError::Unavailable));
145                }
146            };
147
148            let args: Value = match serde_json::from_str(&args_json) {
149                Ok(value) => value,
150                Err(err) => {
151                    tracing::warn!(error = %err, "invalid args for tool invoke");
152                    return Ok(Err(IfaceError::InvalidArg));
153                }
154            };
155
156            let tenant = ctx.map(|ctx| map_tenant_ctx(ctx, &self.default_env));
157
158            let request = ExecRequest {
159                component: tool.clone(),
160                action: action.clone(),
161                args,
162                tenant,
163            };
164
165            if let Some(mock) = &self.mocks
166                && let Some(result) = mock.tool_short_circuit(&tool, &action)
167            {
168                return match result.and_then(|value| {
169                    serde_json::to_string(&value)
170                        .map_err(|err| anyhow!("failed to serialise mock tool output: {err}"))
171                }) {
172                    Ok(body) => Ok(Ok(body)),
173                    Err(err) => {
174                        tracing::error!(error = %err, "mock tool execution failed");
175                        Ok(Err(IfaceError::Internal))
176                    }
177                };
178            }
179
180            match greentic_mcp::exec(request, &exec_config) {
181                Ok(value) => match serde_json::to_string(&value) {
182                    Ok(body) => Ok(Ok(body)),
183                    Err(err) => {
184                        tracing::error!(error = %err, "failed to serialise tool result");
185                        Ok(Err(IfaceError::Internal))
186                    }
187                },
188                Err(err) => {
189                    tracing::warn!(%tool, %action, error = %err, "tool invoke failed");
190                    let iface_err = match err {
191                        ExecError::NotFound { .. } => IfaceError::NotFound,
192                        ExecError::Tool { .. } => IfaceError::Denied,
193                        _ => IfaceError::Unavailable,
194                    };
195                    Ok(Err(iface_err))
196                }
197            }
198        }
199    }
200
201    fn http_fetch(
202        &mut self,
203        req: HttpRequest,
204        _ctx: Option<TenantCtx>,
205    ) -> WasmResult<Result<HttpResponse, IfaceError>> {
206        if !self.config.http_enabled {
207            tracing::warn!(url = %req.url, "http fetch denied by policy");
208            return Ok(Err(IfaceError::Denied));
209        }
210
211        let mut mock_state = None;
212        let raw_body = req.body.clone();
213        if let Some(mock) = &self.mocks
214            && let Ok(meta) = HttpMockRequest::new(
215                &req.method,
216                &req.url,
217                raw_body.as_deref().map(|body| body.as_bytes()),
218            )
219        {
220            match mock.http_begin(&meta) {
221                HttpDecision::Mock(response) => {
222                    let http = HttpResponse::from(&response);
223                    return Ok(Ok(http));
224                }
225                HttpDecision::Deny(reason) => {
226                    tracing::warn!(url = %req.url, reason = %reason, "http fetch blocked by mocks");
227                    return Ok(Err(IfaceError::Denied));
228                }
229                HttpDecision::Passthrough { record } => {
230                    mock_state = Some((meta, record));
231                }
232            }
233        }
234
235        let method = req.method.parse().unwrap_or(reqwest::Method::GET);
236        let mut builder = self.http_client.request(method, &req.url);
237
238        if let Some(headers_json) = req.headers_json.as_ref() {
239            match serde_json::from_str::<serde_json::Map<String, serde_json::Value>>(headers_json) {
240                Ok(map) => {
241                    for (key, value) in map {
242                        if let Some(val) = value.as_str()
243                            && let Ok(header) =
244                                reqwest::header::HeaderName::from_bytes(key.as_bytes())
245                            && let Ok(header_value) = reqwest::header::HeaderValue::from_str(val)
246                        {
247                            builder = builder.header(header, header_value);
248                        }
249                    }
250                }
251                Err(err) => {
252                    tracing::warn!(error = %err, "failed to parse headers for http.fetch");
253                }
254            }
255        }
256
257        if let Some(body) = raw_body.clone() {
258            builder = builder.body(body);
259        }
260
261        let response = match builder.send() {
262            Ok(resp) => resp,
263            Err(err) => {
264                tracing::error!(url = %req.url, error = %err, "http fetch failed");
265                return Ok(Err(IfaceError::Unavailable));
266            }
267        };
268
269        let status = response.status().as_u16();
270        let headers_map = response
271            .headers()
272            .iter()
273            .map(|(k, v)| {
274                (
275                    k.as_str().to_string(),
276                    v.to_str().unwrap_or_default().to_string(),
277                )
278            })
279            .collect::<BTreeMap<_, _>>();
280        let headers_json = serde_json::to_string(&headers_map).ok();
281        let body = response.text().ok();
282
283        if let Some((meta, true)) = mock_state.take()
284            && let Some(mock) = &self.mocks
285        {
286            let recorded = HttpMockResponse::new(status, headers_map.clone(), body.clone());
287            mock.http_record(&meta, &recorded);
288        }
289
290        Ok(Ok(HttpResponse {
291            status,
292            headers_json,
293            body,
294        }))
295    }
296}
297
298impl PackRuntime {
299    pub async fn load(
300        path: impl AsRef<Path>,
301        config: Arc<HostConfig>,
302        mocks: Option<Arc<MockLayer>>,
303        archive_source: Option<&Path>,
304    ) -> Result<Self> {
305        let path = path.as_ref();
306        verify::verify_pack(path).await?;
307        tracing::info!(pack_path = %path.display(), "pack verification complete");
308        let engine = Engine::default();
309        let wasm_bytes = fs::read(path).await?;
310        let mut metadata =
311            PackMetadata::from_wasm(&wasm_bytes).unwrap_or_else(|| PackMetadata::fallback(path));
312        let mut archive = None;
313        let component = match Component::from_file(&engine, path) {
314            Ok(component) => Some(component),
315            Err(err) => {
316                if let Some(archive_path) = archive_source {
317                    tracing::warn!(
318                        error = %err,
319                        pack = %archive_path.display(),
320                        "component load failed, using manifest archive"
321                    );
322                    let archive_data = ArchiveFlows::from_archive(archive_path)?;
323                    metadata = archive_data.metadata.clone();
324                    archive = Some(archive_data);
325                    None
326                } else {
327                    return Err(err);
328                }
329            }
330        };
331        Ok(Self {
332            path: path.to_path_buf(),
333            config,
334            engine,
335            component,
336            metadata,
337            mocks,
338            archive,
339        })
340    }
341
342    pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
343        if let Some(archive) = &self.archive {
344            return Ok(archive.descriptors.clone());
345        }
346        tracing::trace!(
347            tenant = %self.config.tenant,
348            pack_path = %self.path.display(),
349            "listing flows from pack"
350        );
351        let component = self
352            .component
353            .as_ref()
354            .ok_or_else(|| anyhow!("pack component unavailable"))?;
355        let mut store = Store::new(
356            &self.engine,
357            HostState::new(Arc::clone(&self.config), self.mocks.clone())?,
358        );
359        let mut linker = Linker::new(&self.engine);
360        imports::register_all(&mut linker)?;
361        let bindings = pack_export_v0_2::PackExports::instantiate(&mut store, component, &linker)?;
362        let exports = bindings.greentic_pack_export_exports();
363        let flows_raw = match exports.call_list_flows(&mut store)? {
364            Ok(flows) => flows,
365            Err(err) => {
366                bail!("pack list_flows failed: {err:?}");
367            }
368        };
369        let flows = flows_raw
370            .into_iter()
371            .map(|flow: FlowInfo| {
372                let profile = flow.profile.clone();
373                let version = flow.version.clone();
374                FlowDescriptor {
375                    id: flow.id,
376                    flow_type: flow.flow_type,
377                    profile,
378                    version,
379                    description: Some(format!("{}@{}", flow.profile, flow.version)),
380                }
381            })
382            .collect();
383        Ok(flows)
384    }
385
386    #[allow(dead_code)]
387    pub async fn run_flow(
388        &self,
389        _flow_id: &str,
390        _input: serde_json::Value,
391    ) -> Result<serde_json::Value> {
392        // TODO: dispatch flow execution via Wasmtime
393        Ok(serde_json::json!({}))
394    }
395
396    pub fn load_flow_ir(&self, flow_id: &str) -> Result<greentic_flow::ir::FlowIR> {
397        if let Some(archive) = &self.archive {
398            return archive
399                .flows
400                .get(flow_id)
401                .cloned()
402                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in archive"));
403        }
404        let component = self
405            .component
406            .as_ref()
407            .ok_or_else(|| anyhow!("pack component unavailable"))?;
408        let mut store = Store::new(
409            &self.engine,
410            HostState::new(Arc::clone(&self.config), self.mocks.clone())?,
411        );
412        let mut linker = Linker::new(&self.engine);
413        imports::register_all(&mut linker)?;
414        let bindings = pack_export_v0_2::PackExports::instantiate(&mut store, component, &linker)?;
415        let exports = bindings.greentic_pack_export_exports();
416        let flow_name = flow_id.to_string();
417        let metadata = match exports.call_flow_metadata(&mut store, &flow_name)? {
418            Ok(doc) => doc,
419            Err(err) => bail!("pack flow_metadata({flow_id}) failed: {err:?}"),
420        };
421        let flow_doc: greentic_flow::model::FlowDoc = serde_json::from_str(&metadata)
422            .or_else(|_| serde_yaml::from_str(&metadata))
423            .with_context(|| format!("failed to parse flow metadata for {flow_id}"))?;
424        let ir = greentic_flow::to_ir(flow_doc)?;
425        Ok(ir)
426    }
427
428    pub fn metadata(&self) -> &PackMetadata {
429        &self.metadata
430    }
431}
432
433#[cfg(feature = "mcp")]
434fn map_tenant_ctx(ctx: TenantCtx, default_env: &str) -> TypesTenantCtx {
435    let env = ctx
436        .deployment
437        .runtime
438        .unwrap_or_else(|| default_env.to_string());
439
440    let env_id = EnvId::from_str(env.as_str()).expect("invalid env id");
441    let tenant_id = TenantId::from_str(ctx.tenant.as_str()).expect("invalid tenant id");
442    let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
443    tenant_ctx = tenant_ctx.with_team(
444        ctx.team
445            .as_ref()
446            .and_then(|team| TeamId::from_str(team.as_str()).ok()),
447    );
448    tenant_ctx = tenant_ctx.with_user(
449        ctx.user
450            .as_ref()
451            .and_then(|user| UserId::from_str(user.as_str()).ok()),
452    );
453    tenant_ctx.trace_id = ctx.trace_id;
454    tenant_ctx
455}
456
457struct ArchiveFlows {
458    descriptors: Vec<FlowDescriptor>,
459    flows: HashMap<String, FlowIR>,
460    metadata: PackMetadata,
461}
462
463impl ArchiveFlows {
464    fn from_archive(path: &Path) -> Result<Self> {
465        let file = File::open(path)
466            .with_context(|| format!("failed to open archive {}", path.display()))?;
467        let mut archive = ZipArchive::new(file)
468            .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
469        let manifest_bytes = read_entry(&mut archive, "manifest.cbor")?;
470        let manifest: greentic_pack::builder::PackManifest =
471            serde_cbor::from_slice(&manifest_bytes).context("manifest.cbor is invalid")?;
472
473        let mut flows = HashMap::new();
474        let mut descriptors = Vec::new();
475        for flow in &manifest.flows {
476            let ir = build_stub_flow_ir(&flow.id, &flow.kind);
477            flows.insert(flow.id.clone(), ir);
478            descriptors.push(FlowDescriptor {
479                id: flow.id.clone(),
480                flow_type: flow.kind.clone(),
481                profile: manifest.meta.name.clone(),
482                version: manifest.meta.version.to_string(),
483                description: Some(flow.kind.clone()),
484            });
485        }
486
487        Ok(Self {
488            metadata: PackMetadata::from_manifest(&manifest),
489            descriptors,
490            flows,
491        })
492    }
493}
494
495fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
496    let mut file = archive
497        .by_name(name)
498        .with_context(|| format!("entry {name} missing from archive"))?;
499    let mut buf = Vec::new();
500    file.read_to_end(&mut buf)?;
501    Ok(buf)
502}
503
504fn build_stub_flow_ir(flow_id: &str, flow_type: &str) -> FlowIR {
505    let mut nodes = IndexMap::new();
506    nodes.insert(
507        "complete".into(),
508        NodeIR {
509            component: "qa.process".into(),
510            payload_expr: json!({
511                "status": "done",
512                "flow_id": flow_id,
513            }),
514            routes: vec![RouteIR {
515                to: None,
516                out: true,
517            }],
518        },
519    );
520    FlowIR {
521        id: flow_id.to_string(),
522        flow_type: flow_type.to_string(),
523        start: Some("complete".into()),
524        parameters: Value::Object(Default::default()),
525        nodes,
526    }
527}
528
529#[derive(Clone, Debug, Default, Serialize, Deserialize)]
530pub struct PackMetadata {
531    pub pack_id: String,
532    pub version: String,
533    #[serde(default)]
534    pub entry_flows: Vec<String>,
535}
536
537impl PackMetadata {
538    fn from_wasm(bytes: &[u8]) -> Option<Self> {
539        let parser = Parser::new(0);
540        for payload in parser.parse_all(bytes) {
541            let payload = payload.ok()?;
542            match payload {
543                Payload::CustomSection(section) => {
544                    if section.name() == "greentic.manifest"
545                        && let Ok(meta) = Self::from_bytes(section.data())
546                    {
547                        return Some(meta);
548                    }
549                }
550                Payload::DataSection(reader) => {
551                    for segment in reader.into_iter().flatten() {
552                        if let Ok(meta) = Self::from_bytes(segment.data) {
553                            return Some(meta);
554                        }
555                    }
556                }
557                _ => {}
558            }
559        }
560        None
561    }
562
563    fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
564        #[derive(Deserialize)]
565        struct RawManifest {
566            pack_id: String,
567            version: String,
568            #[serde(default)]
569            entry_flows: Vec<String>,
570            #[serde(default)]
571            flows: Vec<RawFlow>,
572        }
573
574        #[derive(Deserialize)]
575        struct RawFlow {
576            id: String,
577        }
578
579        let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
580        let mut entry_flows = if manifest.entry_flows.is_empty() {
581            manifest.flows.iter().map(|f| f.id.clone()).collect()
582        } else {
583            manifest.entry_flows.clone()
584        };
585        entry_flows.retain(|id| !id.is_empty());
586        Ok(Self {
587            pack_id: manifest.pack_id,
588            version: manifest.version,
589            entry_flows,
590        })
591    }
592
593    pub fn fallback(path: &Path) -> Self {
594        let pack_id = path
595            .file_stem()
596            .map(|s| s.to_string_lossy().into_owned())
597            .unwrap_or_else(|| "unknown-pack".to_string());
598        Self {
599            pack_id,
600            version: "0.0.0".to_string(),
601            entry_flows: Vec::new(),
602        }
603    }
604
605    pub fn from_manifest(manifest: &greentic_pack::builder::PackManifest) -> Self {
606        let entry_flows = if manifest.meta.entry_flows.is_empty() {
607            manifest
608                .flows
609                .iter()
610                .map(|flow| flow.id.clone())
611                .collect::<Vec<_>>()
612        } else {
613            manifest.meta.entry_flows.clone()
614        };
615        Self {
616            pack_id: manifest.meta.pack_id.clone(),
617            version: manifest.meta.version.to_string(),
618            entry_flows,
619        }
620    }
621}
622
623impl From<&HttpMockResponse> for HttpResponse {
624    fn from(value: &HttpMockResponse) -> Self {
625        let headers_json = serde_json::to_string(&value.headers).ok();
626        Self {
627            status: value.status,
628            headers_json,
629            body: value.body.clone(),
630        }
631    }
632}