greentic_runner_host/
pack.rs

1use std::collections::{HashMap, HashSet};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::component_api::component::greentic::component::control::Host as ComponentControlHost;
10use crate::component_api::{
11    ComponentPre, control, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
12};
13use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
14use crate::provider::{ProviderBinding, ProviderRegistry};
15use crate::provider_core::SchemaCorePre as ProviderComponentPre;
16use crate::provider_core_only;
17use crate::runtime_wasmtime::{Component, Engine, Linker, ResourceTable};
18use anyhow::{Context, Result, anyhow, bail};
19use greentic_interfaces_wasmtime::host_helpers::v1::{
20    self as host_v1, HostFns, add_all_v1_to_linker,
21    runner_host_http::RunnerHostHttp,
22    runner_host_kv::RunnerHostKv,
23    secrets_store::{SecretsError, SecretsStoreHost},
24    state_store::{
25        OpAck as StateOpAck, StateKey as HostStateKey, StateStoreError as StateError,
26        StateStoreHost, TenantCtx as StateTenantCtx,
27    },
28    telemetry_logger::{
29        OpAck as TelemetryAck, SpanContext as TelemetrySpanContext,
30        TelemetryLoggerError as TelemetryError, TelemetryLoggerHost,
31        TenantCtx as TelemetryTenantCtx,
32    },
33};
34use greentic_interfaces_wasmtime::http_client_client_v1_0::greentic::interfaces_types::types::Impersonation as ImpersonationV1_0;
35use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::interfaces_types::types::Impersonation as ImpersonationV1_1;
36use greentic_pack::builder as legacy_pack;
37use greentic_types::{
38    EnvId, Flow, StateKey as StoreStateKey, TeamId, TenantCtx as TypesTenantCtx, TenantId, UserId,
39    decode_pack_manifest,
40};
41use host_v1::http_client::{
42    HttpClientError, HttpClientErrorV1_1, HttpClientHost, HttpClientHostV1_1,
43    Request as HttpRequest, RequestOptionsV1_1 as HttpRequestOptionsV1_1,
44    RequestV1_1 as HttpRequestV1_1, Response as HttpResponse, ResponseV1_1 as HttpResponseV1_1,
45    TenantCtx as HttpTenantCtx, TenantCtxV1_1 as HttpTenantCtxV1_1,
46};
47use once_cell::sync::Lazy;
48use parking_lot::{Mutex, RwLock};
49use reqwest::blocking::Client as BlockingClient;
50use runner_core::normalize_under_root;
51use serde::{Deserialize, Serialize};
52use serde_cbor;
53use serde_json::{self, Value};
54use tokio::fs;
55use wasmparser::{Parser, Payload};
56use wasmtime::StoreContextMut;
57use zip::ZipArchive;
58
59use crate::runner::engine::{FlowContext, FlowEngine, FlowStatus};
60use crate::runner::flow_adapter::{FlowIR, flow_doc_to_ir, flow_ir_to_flow};
61use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
62
63use crate::config::HostConfig;
64use crate::secrets::{DynSecretsManager, read_secret_blocking};
65use crate::storage::state::STATE_PREFIX;
66use crate::storage::{DynSessionStore, DynStateStore};
67use crate::verify;
68use crate::wasi::RunnerWasiPolicy;
69use tracing::warn;
70use wasmtime_wasi::p2::add_to_linker_sync as add_wasi_to_linker;
71use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
72
73use greentic_flow::model::FlowDoc;
74
75#[allow(dead_code)]
76pub struct PackRuntime {
77    /// Component artifact path (wasm file).
78    path: PathBuf,
79    /// Optional archive (.gtpack) used to load flows/manifests.
80    archive_path: Option<PathBuf>,
81    config: Arc<HostConfig>,
82    engine: Engine,
83    metadata: PackMetadata,
84    manifest: Option<greentic_types::PackManifest>,
85    legacy_manifest: Option<Box<legacy_pack::PackManifest>>,
86    mocks: Option<Arc<MockLayer>>,
87    flows: Option<PackFlows>,
88    components: HashMap<String, PackComponent>,
89    http_client: Arc<BlockingClient>,
90    pre_cache: Mutex<HashMap<String, ComponentPre<ComponentState>>>,
91    session_store: Option<DynSessionStore>,
92    state_store: Option<DynStateStore>,
93    wasi_policy: Arc<RunnerWasiPolicy>,
94    provider_registry: RwLock<Option<ProviderRegistry>>,
95    secrets: DynSecretsManager,
96    oauth_config: Option<OAuthBrokerConfig>,
97}
98
99struct PackComponent {
100    #[allow(dead_code)]
101    name: String,
102    #[allow(dead_code)]
103    version: String,
104    component: Component,
105}
106
107#[derive(Debug, Default, Clone)]
108pub struct ComponentResolution {
109    /// Root of a materialized pack directory containing `manifest.cbor` and `components/`.
110    pub materialized_root: Option<PathBuf>,
111    /// Explicit overrides mapping component id -> wasm path.
112    pub overrides: HashMap<String, PathBuf>,
113}
114
115fn build_blocking_client() -> BlockingClient {
116    std::thread::spawn(|| {
117        BlockingClient::builder()
118            .no_proxy()
119            .build()
120            .expect("blocking client")
121    })
122    .join()
123    .expect("client build thread panicked")
124}
125
126fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
127    let (root, candidate) = if path.is_absolute() {
128        let parent = path
129            .parent()
130            .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
131        let root = parent
132            .canonicalize()
133            .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
134        let file = path
135            .file_name()
136            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
137        (root, PathBuf::from(file))
138    } else {
139        let cwd = std::env::current_dir().context("failed to resolve current directory")?;
140        let base = if let Some(parent) = path.parent() {
141            cwd.join(parent)
142        } else {
143            cwd
144        };
145        let root = base
146            .canonicalize()
147            .with_context(|| format!("failed to canonicalize {}", base.display()))?;
148        let file = path
149            .file_name()
150            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
151        (root, PathBuf::from(file))
152    };
153    let safe = normalize_under_root(&root, &candidate)?;
154    Ok((root, safe))
155}
156
157static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
158
159#[derive(Debug, Clone, Serialize, Deserialize)]
160pub struct FlowDescriptor {
161    pub id: String,
162    #[serde(rename = "type")]
163    pub flow_type: String,
164    pub profile: String,
165    pub version: String,
166    #[serde(default)]
167    pub description: Option<String>,
168}
169
170pub struct HostState {
171    config: Arc<HostConfig>,
172    http_client: Arc<BlockingClient>,
173    default_env: String,
174    #[allow(dead_code)]
175    session_store: Option<DynSessionStore>,
176    state_store: Option<DynStateStore>,
177    mocks: Option<Arc<MockLayer>>,
178    secrets: DynSecretsManager,
179    oauth_config: Option<OAuthBrokerConfig>,
180    oauth_host: OAuthBrokerHost,
181}
182
183impl HostState {
184    #[allow(clippy::default_constructed_unit_structs)]
185    pub fn new(
186        config: Arc<HostConfig>,
187        http_client: Arc<BlockingClient>,
188        mocks: Option<Arc<MockLayer>>,
189        session_store: Option<DynSessionStore>,
190        state_store: Option<DynStateStore>,
191        secrets: DynSecretsManager,
192        oauth_config: Option<OAuthBrokerConfig>,
193    ) -> Result<Self> {
194        let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
195        Ok(Self {
196            config,
197            http_client,
198            default_env,
199            session_store,
200            state_store,
201            mocks,
202            secrets,
203            oauth_config,
204            oauth_host: OAuthBrokerHost::default(),
205        })
206    }
207
208    pub fn get_secret(&self, key: &str) -> Result<String> {
209        if provider_core_only::is_enabled() {
210            bail!(provider_core_only::blocked_message("secrets"))
211        }
212        if !self.config.secrets_policy.is_allowed(key) {
213            bail!("secret {key} is not permitted by bindings policy");
214        }
215        if let Some(mock) = &self.mocks
216            && let Some(value) = mock.secrets_lookup(key)
217        {
218            return Ok(value);
219        }
220        let bytes = read_secret_blocking(&self.secrets, key)
221            .context("failed to read secret from manager")?;
222        let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
223        Ok(value)
224    }
225
226    fn tenant_ctx_from_v1(&self, ctx: Option<StateTenantCtx>) -> Result<TypesTenantCtx> {
227        let tenant_raw = ctx
228            .as_ref()
229            .map(|ctx| ctx.tenant.clone())
230            .unwrap_or_else(|| self.config.tenant.clone());
231        let env_raw = ctx
232            .as_ref()
233            .map(|ctx| ctx.env.clone())
234            .unwrap_or_else(|| self.default_env.clone());
235        let tenant_id = TenantId::from_str(&tenant_raw)
236            .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
237        let env_id = EnvId::from_str(&env_raw)
238            .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
239        let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
240        if let Some(ctx) = ctx {
241            if let Some(team) = ctx.team.or(ctx.team_id) {
242                let team_id =
243                    TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
244                tenant_ctx = tenant_ctx.with_team(Some(team_id));
245            }
246            if let Some(user) = ctx.user.or(ctx.user_id) {
247                let user_id =
248                    UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
249                tenant_ctx = tenant_ctx.with_user(Some(user_id));
250            }
251            if let Some(flow) = ctx.flow_id {
252                tenant_ctx = tenant_ctx.with_flow(flow);
253            }
254            if let Some(node) = ctx.node_id {
255                tenant_ctx = tenant_ctx.with_node(node);
256            }
257            if let Some(provider) = ctx.provider_id {
258                tenant_ctx = tenant_ctx.with_provider(provider);
259            }
260            if let Some(session) = ctx.session_id {
261                tenant_ctx = tenant_ctx.with_session(session);
262            }
263            tenant_ctx.trace_id = ctx.trace_id;
264        }
265        Ok(tenant_ctx)
266    }
267
268    fn send_http_request(
269        &mut self,
270        req: HttpRequest,
271        opts: Option<HttpRequestOptionsV1_1>,
272        _ctx: Option<HttpTenantCtx>,
273    ) -> Result<HttpResponse, HttpClientError> {
274        if !self.config.http_enabled {
275            return Err(HttpClientError {
276                code: "denied".into(),
277                message: "http client disabled by policy".into(),
278            });
279        }
280
281        let mut mock_state = None;
282        let raw_body = req.body.clone();
283        if let Some(mock) = &self.mocks
284            && let Ok(meta) = HttpMockRequest::new(&req.method, &req.url, raw_body.as_deref())
285        {
286            match mock.http_begin(&meta) {
287                HttpDecision::Mock(response) => {
288                    let headers = response
289                        .headers
290                        .iter()
291                        .map(|(k, v)| (k.clone(), v.clone()))
292                        .collect();
293                    return Ok(HttpResponse {
294                        status: response.status,
295                        headers,
296                        body: response.body.clone().map(|b| b.into_bytes()),
297                    });
298                }
299                HttpDecision::Deny(reason) => {
300                    return Err(HttpClientError {
301                        code: "denied".into(),
302                        message: reason,
303                    });
304                }
305                HttpDecision::Passthrough { record } => {
306                    mock_state = Some((meta, record));
307                }
308            }
309        }
310
311        let method = req.method.parse().unwrap_or(reqwest::Method::GET);
312        let mut builder = self.http_client.request(method, &req.url);
313        for (key, value) in req.headers {
314            if let Ok(header) = reqwest::header::HeaderName::from_bytes(key.as_bytes())
315                && let Ok(header_value) = reqwest::header::HeaderValue::from_str(&value)
316            {
317                builder = builder.header(header, header_value);
318            }
319        }
320
321        if let Some(body) = raw_body.clone() {
322            builder = builder.body(body);
323        }
324
325        if let Some(opts) = opts {
326            if let Some(timeout_ms) = opts.timeout_ms {
327                builder = builder.timeout(Duration::from_millis(timeout_ms as u64));
328            }
329            if opts.allow_insecure == Some(true) {
330                warn!(url = %req.url, "allow-insecure not supported; using default TLS validation");
331            }
332            if let Some(follow_redirects) = opts.follow_redirects
333                && !follow_redirects
334            {
335                warn!(url = %req.url, "follow-redirects=false not supported; using default client behaviour");
336            }
337        }
338
339        let response = match builder.send() {
340            Ok(resp) => resp,
341            Err(err) => {
342                warn!(url = %req.url, error = %err, "http client request failed");
343                return Err(HttpClientError {
344                    code: "unavailable".into(),
345                    message: err.to_string(),
346                });
347            }
348        };
349
350        let status = response.status().as_u16();
351        let headers_vec = response
352            .headers()
353            .iter()
354            .map(|(k, v)| {
355                (
356                    k.as_str().to_string(),
357                    v.to_str().unwrap_or_default().to_string(),
358                )
359            })
360            .collect::<Vec<_>>();
361        let body_bytes = response.bytes().ok().map(|b| b.to_vec());
362
363        if let Some((meta, true)) = mock_state.take()
364            && let Some(mock) = &self.mocks
365        {
366            let recorded = HttpMockResponse::new(
367                status,
368                headers_vec.clone().into_iter().collect(),
369                body_bytes
370                    .as_ref()
371                    .map(|b| String::from_utf8_lossy(b).into_owned()),
372            );
373            mock.http_record(&meta, &recorded);
374        }
375
376        Ok(HttpResponse {
377            status,
378            headers: headers_vec,
379            body: body_bytes,
380        })
381    }
382}
383
384impl SecretsStoreHost for HostState {
385    fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsError> {
386        if provider_core_only::is_enabled() {
387            warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
388            return Err(SecretsError::Denied);
389        }
390        if !self.config.secrets_policy.is_allowed(&key) {
391            return Err(SecretsError::Denied);
392        }
393        if let Some(mock) = &self.mocks
394            && let Some(value) = mock.secrets_lookup(&key)
395        {
396            return Ok(Some(value.into_bytes()));
397        }
398        match read_secret_blocking(&self.secrets, &key) {
399            Ok(bytes) => Ok(Some(bytes)),
400            Err(err) => {
401                warn!(secret = %key, error = %err, "secret lookup failed");
402                Err(SecretsError::NotFound)
403            }
404        }
405    }
406}
407
408impl HttpClientHost for HostState {
409    fn send(
410        &mut self,
411        req: HttpRequest,
412        ctx: Option<HttpTenantCtx>,
413    ) -> Result<HttpResponse, HttpClientError> {
414        self.send_http_request(req, None, ctx)
415    }
416}
417
418impl HttpClientHostV1_1 for HostState {
419    fn send(
420        &mut self,
421        req: HttpRequestV1_1,
422        opts: Option<HttpRequestOptionsV1_1>,
423        ctx: Option<HttpTenantCtxV1_1>,
424    ) -> Result<HttpResponseV1_1, HttpClientErrorV1_1> {
425        let legacy_req = HttpRequest {
426            method: req.method,
427            url: req.url,
428            headers: req.headers,
429            body: req.body,
430        };
431        let legacy_ctx = ctx.map(|ctx| HttpTenantCtx {
432            env: ctx.env,
433            tenant: ctx.tenant,
434            tenant_id: ctx.tenant_id,
435            team: ctx.team,
436            team_id: ctx.team_id,
437            user: ctx.user,
438            user_id: ctx.user_id,
439            trace_id: ctx.trace_id,
440            correlation_id: ctx.correlation_id,
441            attributes: ctx.attributes,
442            session_id: ctx.session_id,
443            flow_id: ctx.flow_id,
444            node_id: ctx.node_id,
445            provider_id: ctx.provider_id,
446            deadline_ms: ctx.deadline_ms,
447            attempt: ctx.attempt,
448            idempotency_key: ctx.idempotency_key,
449            impersonation: ctx
450                .impersonation
451                .map(|ImpersonationV1_1 { actor_id, reason }| ImpersonationV1_0 {
452                    actor_id,
453                    reason,
454                }),
455        });
456
457        self.send_http_request(legacy_req, opts, legacy_ctx)
458            .map(|resp| HttpResponseV1_1 {
459                status: resp.status,
460                headers: resp.headers,
461                body: resp.body,
462            })
463            .map_err(|err| HttpClientErrorV1_1 {
464                code: err.code,
465                message: err.message,
466            })
467    }
468}
469
470impl StateStoreHost for HostState {
471    fn read(
472        &mut self,
473        key: HostStateKey,
474        ctx: Option<StateTenantCtx>,
475    ) -> Result<Vec<u8>, StateError> {
476        let store = match self.state_store.as_ref() {
477            Some(store) => store.clone(),
478            None => {
479                return Err(StateError {
480                    code: "unavailable".into(),
481                    message: "state store not configured".into(),
482                });
483            }
484        };
485        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
486            Ok(ctx) => ctx,
487            Err(err) => {
488                return Err(StateError {
489                    code: "invalid-ctx".into(),
490                    message: err.to_string(),
491                });
492            }
493        };
494        let key = StoreStateKey::from(key);
495        match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
496            Ok(Some(value)) => Ok(serde_json::to_vec(&value).unwrap_or_else(|_| Vec::new())),
497            Ok(None) => Err(StateError {
498                code: "not_found".into(),
499                message: "state key not found".into(),
500            }),
501            Err(err) => Err(StateError {
502                code: "internal".into(),
503                message: err.to_string(),
504            }),
505        }
506    }
507
508    fn write(
509        &mut self,
510        key: HostStateKey,
511        bytes: Vec<u8>,
512        ctx: Option<StateTenantCtx>,
513    ) -> Result<StateOpAck, StateError> {
514        let store = match self.state_store.as_ref() {
515            Some(store) => store.clone(),
516            None => {
517                return Err(StateError {
518                    code: "unavailable".into(),
519                    message: "state store not configured".into(),
520                });
521            }
522        };
523        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
524            Ok(ctx) => ctx,
525            Err(err) => {
526                return Err(StateError {
527                    code: "invalid-ctx".into(),
528                    message: err.to_string(),
529                });
530            }
531        };
532        let key = StoreStateKey::from(key);
533        let value = serde_json::from_slice(&bytes)
534            .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()));
535        match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
536            Ok(()) => Ok(StateOpAck::Ok),
537            Err(err) => Err(StateError {
538                code: "internal".into(),
539                message: err.to_string(),
540            }),
541        }
542    }
543
544    fn delete(
545        &mut self,
546        key: HostStateKey,
547        ctx: Option<StateTenantCtx>,
548    ) -> Result<StateOpAck, StateError> {
549        let store = match self.state_store.as_ref() {
550            Some(store) => store.clone(),
551            None => {
552                return Err(StateError {
553                    code: "unavailable".into(),
554                    message: "state store not configured".into(),
555                });
556            }
557        };
558        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
559            Ok(ctx) => ctx,
560            Err(err) => {
561                return Err(StateError {
562                    code: "invalid-ctx".into(),
563                    message: err.to_string(),
564                });
565            }
566        };
567        let key = StoreStateKey::from(key);
568        match store.del(&tenant_ctx, STATE_PREFIX, &key) {
569            Ok(_) => Ok(StateOpAck::Ok),
570            Err(err) => Err(StateError {
571                code: "internal".into(),
572                message: err.to_string(),
573            }),
574        }
575    }
576}
577
578impl TelemetryLoggerHost for HostState {
579    fn log(
580        &mut self,
581        span: TelemetrySpanContext,
582        fields: Vec<(String, String)>,
583        _ctx: Option<TelemetryTenantCtx>,
584    ) -> Result<TelemetryAck, TelemetryError> {
585        if let Some(mock) = &self.mocks
586            && mock.telemetry_drain(&[("span_json", span.flow_id.as_str())])
587        {
588            return Ok(TelemetryAck::Ok);
589        }
590        let mut map = serde_json::Map::new();
591        for (k, v) in fields {
592            map.insert(k, Value::String(v));
593        }
594        tracing::info!(
595            tenant = %span.tenant,
596            flow_id = %span.flow_id,
597            node = ?span.node_id,
598            provider = %span.provider,
599            fields = %serde_json::Value::Object(map.clone()),
600            "telemetry log from pack"
601        );
602        Ok(TelemetryAck::Ok)
603    }
604}
605
606impl RunnerHostHttp for HostState {
607    fn request(
608        &mut self,
609        method: String,
610        url: String,
611        headers: Vec<String>,
612        body: Option<Vec<u8>>,
613    ) -> Result<Vec<u8>, String> {
614        let req = HttpRequest {
615            method,
616            url,
617            headers: headers
618                .chunks(2)
619                .filter_map(|chunk| {
620                    if chunk.len() == 2 {
621                        Some((chunk[0].clone(), chunk[1].clone()))
622                    } else {
623                        None
624                    }
625                })
626                .collect(),
627            body,
628        };
629        match HttpClientHost::send(self, req, None) {
630            Ok(resp) => Ok(resp.body.unwrap_or_default()),
631            Err(err) => Err(err.message),
632        }
633    }
634}
635
636impl RunnerHostKv for HostState {
637    fn get(&mut self, _ns: String, _key: String) -> Option<String> {
638        None
639    }
640
641    fn put(&mut self, _ns: String, _key: String, _val: String) {}
642}
643
644enum ManifestLoad {
645    New {
646        manifest: Box<greentic_types::PackManifest>,
647        flows: PackFlows,
648    },
649    Legacy {
650        manifest: Box<legacy_pack::PackManifest>,
651        flows: PackFlows,
652    },
653}
654
655fn load_manifest_and_flows(path: &Path) -> Result<ManifestLoad> {
656    let mut archive = ZipArchive::new(File::open(path)?)
657        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
658    let bytes = read_entry(&mut archive, "manifest.cbor")
659        .with_context(|| format!("missing manifest.cbor in {}", path.display()))?;
660    match decode_pack_manifest(&bytes) {
661        Ok(manifest) => {
662            let cache = PackFlows::from_manifest(manifest.clone());
663            Ok(ManifestLoad::New {
664                manifest: Box::new(manifest),
665                flows: cache,
666            })
667        }
668        Err(err) => {
669            tracing::debug!(error = %err, pack = %path.display(), "decode_pack_manifest failed; trying legacy manifest");
670            // Fall back to legacy pack manifest
671            let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
672                .context("failed to decode legacy pack manifest from manifest.cbor")?;
673            let flows = load_legacy_flows(&mut archive, &legacy)?;
674            Ok(ManifestLoad::Legacy {
675                manifest: Box::new(legacy),
676                flows,
677            })
678        }
679    }
680}
681
682fn load_manifest_and_flows_from_dir(root: &Path) -> Result<ManifestLoad> {
683    let manifest_path = root.join("manifest.cbor");
684    let bytes = std::fs::read(&manifest_path)
685        .with_context(|| format!("missing manifest.cbor in {}", root.display()))?;
686    match decode_pack_manifest(&bytes) {
687        Ok(manifest) => {
688            let cache = PackFlows::from_manifest(manifest.clone());
689            Ok(ManifestLoad::New {
690                manifest: Box::new(manifest),
691                flows: cache,
692            })
693        }
694        Err(err) => {
695            tracing::debug!(
696                error = %err,
697                pack = %root.display(),
698                "decode_pack_manifest failed for materialized pack; trying legacy manifest"
699            );
700            let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
701                .context("failed to decode legacy pack manifest from manifest.cbor")?;
702            let flows = load_legacy_flows_from_dir(root, &legacy)?;
703            Ok(ManifestLoad::Legacy {
704                manifest: Box::new(legacy),
705                flows,
706            })
707        }
708    }
709}
710
711fn load_legacy_flows(
712    archive: &mut ZipArchive<File>,
713    manifest: &legacy_pack::PackManifest,
714) -> Result<PackFlows> {
715    let mut flows = HashMap::new();
716    let mut descriptors = Vec::new();
717
718    for entry in &manifest.flows {
719        let bytes = read_entry(archive, &entry.file_json)
720            .with_context(|| format!("missing flow json {}", entry.file_json))?;
721        let doc: FlowDoc = serde_json::from_slice(&bytes)
722            .with_context(|| format!("failed to decode flow doc {}", entry.file_json))?;
723        let normalized = normalize_flow_doc(doc);
724        let flow_ir = flow_doc_to_ir(normalized)?;
725        let flow = flow_ir_to_flow(flow_ir)?;
726
727        descriptors.push(FlowDescriptor {
728            id: entry.id.clone(),
729            flow_type: entry.kind.clone(),
730            profile: manifest.meta.pack_id.clone(),
731            version: manifest.meta.version.to_string(),
732            description: None,
733        });
734        flows.insert(entry.id.clone(), flow);
735    }
736
737    let mut entry_flows = manifest.meta.entry_flows.clone();
738    if entry_flows.is_empty() {
739        entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
740    }
741    let metadata = PackMetadata {
742        pack_id: manifest.meta.pack_id.clone(),
743        version: manifest.meta.version.to_string(),
744        entry_flows,
745        secret_requirements: Vec::new(),
746    };
747
748    Ok(PackFlows {
749        descriptors,
750        flows,
751        metadata,
752    })
753}
754
755fn load_legacy_flows_from_dir(
756    root: &Path,
757    manifest: &legacy_pack::PackManifest,
758) -> Result<PackFlows> {
759    let mut flows = HashMap::new();
760    let mut descriptors = Vec::new();
761
762    for entry in &manifest.flows {
763        let path = root.join(&entry.file_json);
764        let bytes = std::fs::read(&path)
765            .with_context(|| format!("missing flow json {}", path.display()))?;
766        let doc: FlowDoc = serde_json::from_slice(&bytes)
767            .with_context(|| format!("failed to decode flow doc {}", path.display()))?;
768        let normalized = normalize_flow_doc(doc);
769        let flow_ir = flow_doc_to_ir(normalized)?;
770        let flow = flow_ir_to_flow(flow_ir)?;
771
772        descriptors.push(FlowDescriptor {
773            id: entry.id.clone(),
774            flow_type: entry.kind.clone(),
775            profile: manifest.meta.pack_id.clone(),
776            version: manifest.meta.version.to_string(),
777            description: None,
778        });
779        flows.insert(entry.id.clone(), flow);
780    }
781
782    let mut entry_flows = manifest.meta.entry_flows.clone();
783    if entry_flows.is_empty() {
784        entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
785    }
786    let metadata = PackMetadata {
787        pack_id: manifest.meta.pack_id.clone(),
788        version: manifest.meta.version.to_string(),
789        entry_flows,
790        secret_requirements: Vec::new(),
791    };
792
793    Ok(PackFlows {
794        descriptors,
795        flows,
796        metadata,
797    })
798}
799
800pub struct ComponentState {
801    pub host: HostState,
802    wasi_ctx: WasiCtx,
803    resource_table: ResourceTable,
804}
805
806impl ComponentState {
807    pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
808        let wasi_ctx = policy
809            .instantiate()
810            .context("failed to build WASI context")?;
811        Ok(Self {
812            host,
813            wasi_ctx,
814            resource_table: ResourceTable::new(),
815        })
816    }
817
818    fn host_mut(&mut self) -> &mut HostState {
819        &mut self.host
820    }
821}
822
823impl control::Host for ComponentState {
824    fn should_cancel(&mut self) -> bool {
825        false
826    }
827
828    fn yield_now(&mut self) {
829        // no-op cooperative yield
830    }
831}
832
833fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
834    let mut inst = linker.instance("greentic:component/control@0.4.0")?;
835    inst.func_wrap(
836        "should-cancel",
837        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
838            let host = caller.data_mut();
839            Ok((ComponentControlHost::should_cancel(host),))
840        },
841    )?;
842    inst.func_wrap(
843        "yield-now",
844        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
845            let host = caller.data_mut();
846            ComponentControlHost::yield_now(host);
847            Ok(())
848        },
849    )?;
850    Ok(())
851}
852
853pub fn register_all(linker: &mut Linker<ComponentState>) -> Result<()> {
854    add_wasi_to_linker(linker)?;
855    add_all_v1_to_linker(
856        linker,
857        HostFns {
858            http_client_v1_1: Some(|state| state.host_mut()),
859            http_client: Some(|state| state.host_mut()),
860            oauth_broker: None,
861            runner_host_http: Some(|state| state.host_mut()),
862            runner_host_kv: Some(|state| state.host_mut()),
863            telemetry_logger: Some(|state| state.host_mut()),
864            state_store: Some(|state| state.host_mut()),
865            secrets_store: Some(|state| state.host_mut()),
866        },
867    )?;
868    Ok(())
869}
870
871impl OAuthHostContext for ComponentState {
872    fn tenant_id(&self) -> &str {
873        &self.host.config.tenant
874    }
875
876    fn env(&self) -> &str {
877        &self.host.default_env
878    }
879
880    fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
881        &mut self.host.oauth_host
882    }
883
884    fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
885        self.host.oauth_config.as_ref()
886    }
887}
888
889impl WasiView for ComponentState {
890    fn ctx(&mut self) -> WasiCtxView<'_> {
891        WasiCtxView {
892            ctx: &mut self.wasi_ctx,
893            table: &mut self.resource_table,
894        }
895    }
896}
897
898#[allow(unsafe_code)]
899unsafe impl Send for ComponentState {}
900#[allow(unsafe_code)]
901unsafe impl Sync for ComponentState {}
902
903impl PackRuntime {
904    #[allow(clippy::too_many_arguments)]
905    pub async fn load(
906        path: impl AsRef<Path>,
907        config: Arc<HostConfig>,
908        mocks: Option<Arc<MockLayer>>,
909        archive_source: Option<&Path>,
910        session_store: Option<DynSessionStore>,
911        state_store: Option<DynStateStore>,
912        wasi_policy: Arc<RunnerWasiPolicy>,
913        secrets: DynSecretsManager,
914        oauth_config: Option<OAuthBrokerConfig>,
915        verify_archive: bool,
916        component_resolution: ComponentResolution,
917    ) -> Result<Self> {
918        let path = path.as_ref();
919        let (_pack_root, safe_path) = normalize_pack_path(path)?;
920        let path_meta = std::fs::metadata(&safe_path).ok();
921        let is_dir = path_meta
922            .as_ref()
923            .map(|meta| meta.is_dir())
924            .unwrap_or(false);
925        let is_component = !is_dir
926            && safe_path
927                .extension()
928                .and_then(|ext| ext.to_str())
929                .map(|ext| ext.eq_ignore_ascii_case("wasm"))
930                .unwrap_or(false);
931        let archive_hint_path = if let Some(source) = archive_source {
932            let (_, normalized) = normalize_pack_path(source)?;
933            Some(normalized)
934        } else if is_component || is_dir {
935            None
936        } else {
937            Some(safe_path.clone())
938        };
939        let archive_hint = archive_hint_path.as_deref();
940        if verify_archive {
941            if let Some(verify_target) = archive_hint.and_then(|p| {
942                std::fs::metadata(p)
943                    .ok()
944                    .filter(|meta| meta.is_file())
945                    .map(|_| p)
946            }) {
947                verify::verify_pack(verify_target).await?;
948                tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
949            } else {
950                tracing::debug!("skipping archive verification (no archive source)");
951            }
952        }
953        let engine = Engine::default();
954        let mut metadata = PackMetadata::fallback(&safe_path);
955        let mut manifest = None;
956        let mut legacy_manifest: Option<Box<legacy_pack::PackManifest>> = None;
957        let mut flows = None;
958        let materialized_root = component_resolution.materialized_root.clone().or_else(|| {
959            if is_dir {
960                Some(safe_path.clone())
961            } else {
962                None
963            }
964        });
965
966        if let Some(root) = materialized_root.as_ref() {
967            match load_manifest_and_flows_from_dir(root) {
968                Ok(ManifestLoad::New {
969                    manifest: m,
970                    flows: cache,
971                }) => {
972                    metadata = cache.metadata.clone();
973                    manifest = Some(*m);
974                    flows = Some(cache);
975                }
976                Ok(ManifestLoad::Legacy {
977                    manifest: m,
978                    flows: cache,
979                }) => {
980                    metadata = cache.metadata.clone();
981                    legacy_manifest = Some(m);
982                    flows = Some(cache);
983                }
984                Err(err) => {
985                    warn!(error = %err, pack = %root.display(), "failed to parse materialized pack manifest");
986                }
987            }
988        }
989
990        if manifest.is_none()
991            && legacy_manifest.is_none()
992            && let Some(archive_path) = archive_hint
993        {
994            match load_manifest_and_flows(archive_path) {
995                Ok(ManifestLoad::New {
996                    manifest: m,
997                    flows: cache,
998                }) => {
999                    metadata = cache.metadata.clone();
1000                    manifest = Some(*m);
1001                    flows = Some(cache);
1002                }
1003                Ok(ManifestLoad::Legacy {
1004                    manifest: m,
1005                    flows: cache,
1006                }) => {
1007                    metadata = cache.metadata.clone();
1008                    legacy_manifest = Some(m);
1009                    flows = Some(cache);
1010                }
1011                Err(err) => {
1012                    warn!(error = %err, pack = %archive_path.display(), "failed to parse pack manifest; skipping flows");
1013                }
1014            }
1015        }
1016        let components = if is_component {
1017            let wasm_bytes = fs::read(&safe_path).await?;
1018            metadata = PackMetadata::from_wasm(&wasm_bytes)
1019                .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
1020            let name = safe_path
1021                .file_stem()
1022                .map(|s| s.to_string_lossy().to_string())
1023                .unwrap_or_else(|| "component".to_string());
1024            let component = Component::from_binary(&engine, &wasm_bytes)?;
1025            let mut map = HashMap::new();
1026            map.insert(
1027                name.clone(),
1028                PackComponent {
1029                    name,
1030                    version: metadata.version.clone(),
1031                    component,
1032                },
1033            );
1034            map
1035        } else {
1036            let specs = component_specs(manifest.as_ref(), legacy_manifest.as_deref());
1037            if specs.is_empty() {
1038                HashMap::new()
1039            } else {
1040                let mut loaded = HashMap::new();
1041                let mut missing: HashSet<String> =
1042                    specs.iter().map(|spec| spec.id.clone()).collect();
1043                let mut searched = Vec::new();
1044
1045                if !component_resolution.overrides.is_empty() {
1046                    load_components_from_overrides(
1047                        &engine,
1048                        &component_resolution.overrides,
1049                        &specs,
1050                        &mut missing,
1051                        &mut loaded,
1052                    )?;
1053                    searched.push("override map".to_string());
1054                }
1055
1056                if let Some(root) = materialized_root.as_ref() {
1057                    load_components_from_dir(&engine, root, &specs, &mut missing, &mut loaded)?;
1058                    searched.push(format!("components dir {}", root.display()));
1059                }
1060
1061                if let Some(archive_path) = archive_hint {
1062                    load_components_from_archive(
1063                        &engine,
1064                        archive_path,
1065                        &specs,
1066                        &mut missing,
1067                        &mut loaded,
1068                    )?;
1069                    searched.push(format!("archive {}", archive_path.display()));
1070                }
1071
1072                if !missing.is_empty() {
1073                    let missing_list = missing.into_iter().collect::<Vec<_>>().join(", ");
1074                    let sources = if searched.is_empty() {
1075                        "no component sources".to_string()
1076                    } else {
1077                        searched.join(", ")
1078                    };
1079                    bail!(
1080                        "components missing: {}; looked in {}",
1081                        missing_list,
1082                        sources
1083                    );
1084                }
1085
1086                loaded
1087            }
1088        };
1089        let http_client = Arc::clone(&HTTP_CLIENT);
1090        Ok(Self {
1091            path: safe_path,
1092            archive_path: archive_hint.map(Path::to_path_buf),
1093            config,
1094            engine,
1095            metadata,
1096            manifest,
1097            legacy_manifest,
1098            mocks,
1099            flows,
1100            components,
1101            http_client,
1102            pre_cache: Mutex::new(HashMap::new()),
1103            session_store,
1104            state_store,
1105            wasi_policy,
1106            provider_registry: RwLock::new(None),
1107            secrets,
1108            oauth_config,
1109        })
1110    }
1111
1112    pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
1113        if let Some(cache) = &self.flows {
1114            return Ok(cache.descriptors.clone());
1115        }
1116        if let Some(manifest) = &self.manifest {
1117            let descriptors = manifest
1118                .flows
1119                .iter()
1120                .map(|flow| FlowDescriptor {
1121                    id: flow.id.as_str().to_string(),
1122                    flow_type: flow_kind_to_str(flow.kind).to_string(),
1123                    profile: manifest.pack_id.as_str().to_string(),
1124                    version: manifest.version.to_string(),
1125                    description: None,
1126                })
1127                .collect();
1128            return Ok(descriptors);
1129        }
1130        Ok(Vec::new())
1131    }
1132
1133    #[allow(dead_code)]
1134    pub async fn run_flow(
1135        &self,
1136        flow_id: &str,
1137        input: serde_json::Value,
1138    ) -> Result<serde_json::Value> {
1139        let pack = Arc::new(
1140            PackRuntime::load(
1141                &self.path,
1142                Arc::clone(&self.config),
1143                self.mocks.clone(),
1144                self.archive_path.as_deref(),
1145                self.session_store.clone(),
1146                self.state_store.clone(),
1147                Arc::clone(&self.wasi_policy),
1148                self.secrets.clone(),
1149                self.oauth_config.clone(),
1150                false,
1151                ComponentResolution::default(),
1152            )
1153            .await?,
1154        );
1155
1156        let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&self.config)).await?;
1157        let retry_config = self.config.retry_config().into();
1158        let mocks = pack.mocks.as_deref();
1159        let tenant = self.config.tenant.as_str();
1160
1161        let ctx = FlowContext {
1162            tenant,
1163            flow_id,
1164            node_id: None,
1165            tool: None,
1166            action: None,
1167            session_id: None,
1168            provider_id: None,
1169            retry_config,
1170            observer: None,
1171            mocks,
1172        };
1173
1174        let execution = engine.execute(ctx, input).await?;
1175        match execution.status {
1176            FlowStatus::Completed => Ok(execution.output),
1177            FlowStatus::Waiting(wait) => Ok(serde_json::json!({
1178                "status": "pending",
1179                "reason": wait.reason,
1180                "resume": wait.snapshot,
1181                "response": execution.output,
1182            })),
1183        }
1184    }
1185
1186    pub async fn invoke_component(
1187        &self,
1188        component_ref: &str,
1189        ctx: ComponentExecCtx,
1190        operation: &str,
1191        _config_json: Option<String>,
1192        input_json: String,
1193    ) -> Result<Value> {
1194        let pack_component = self
1195            .components
1196            .get(component_ref)
1197            .with_context(|| format!("component '{component_ref}' not found in pack"))?;
1198
1199        let mut linker = Linker::new(&self.engine);
1200        register_all(&mut linker)?;
1201        add_component_control_to_linker(&mut linker)?;
1202        let pre_instance = linker.instantiate_pre(&pack_component.component)?;
1203        let pre: ComponentPre<ComponentState> = ComponentPre::new(pre_instance)?;
1204
1205        let host_state = HostState::new(
1206            Arc::clone(&self.config),
1207            Arc::clone(&self.http_client),
1208            self.mocks.clone(),
1209            self.session_store.clone(),
1210            self.state_store.clone(),
1211            Arc::clone(&self.secrets),
1212            self.oauth_config.clone(),
1213        )?;
1214        let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1215        let mut store = wasmtime::Store::new(&self.engine, store_state);
1216        let bindings: crate::component_api::Component = pre.instantiate_async(&mut store).await?;
1217        let node = bindings.greentic_component_node();
1218
1219        let result = node.call_invoke(&mut store, &ctx, operation, &input_json)?;
1220
1221        match result {
1222            InvokeResult::Ok(body) => {
1223                if body.is_empty() {
1224                    return Ok(Value::Null);
1225                }
1226                serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
1227            }
1228            InvokeResult::Err(NodeError {
1229                code,
1230                message,
1231                retryable,
1232                backoff_ms,
1233                details,
1234            }) => {
1235                let mut obj = serde_json::Map::new();
1236                obj.insert("ok".into(), Value::Bool(false));
1237                let mut error = serde_json::Map::new();
1238                error.insert("code".into(), Value::String(code));
1239                error.insert("message".into(), Value::String(message));
1240                error.insert("retryable".into(), Value::Bool(retryable));
1241                if let Some(backoff) = backoff_ms {
1242                    error.insert("backoff_ms".into(), Value::Number(backoff.into()));
1243                }
1244                if let Some(details) = details {
1245                    error.insert(
1246                        "details".into(),
1247                        serde_json::from_str(&details).unwrap_or(Value::String(details)),
1248                    );
1249                }
1250                obj.insert("error".into(), Value::Object(error));
1251                Ok(Value::Object(obj))
1252            }
1253        }
1254    }
1255
1256    pub fn resolve_provider(
1257        &self,
1258        provider_id: Option<&str>,
1259        provider_type: Option<&str>,
1260    ) -> Result<ProviderBinding> {
1261        let registry = self.provider_registry()?;
1262        registry.resolve(provider_id, provider_type)
1263    }
1264
1265    pub async fn invoke_provider(
1266        &self,
1267        binding: &ProviderBinding,
1268        _ctx: ComponentExecCtx,
1269        op: &str,
1270        input_json: Vec<u8>,
1271    ) -> Result<Value> {
1272        let component_ref = &binding.component_ref;
1273        let pack_component = self
1274            .components
1275            .get(component_ref)
1276            .with_context(|| format!("provider component '{component_ref}' not found in pack"))?;
1277
1278        let mut linker = Linker::new(&self.engine);
1279        register_all(&mut linker)?;
1280        add_component_control_to_linker(&mut linker)?;
1281        let pre_instance = linker.instantiate_pre(&pack_component.component)?;
1282        let pre: ProviderComponentPre<ComponentState> = ProviderComponentPre::new(pre_instance)?;
1283
1284        let host_state = HostState::new(
1285            Arc::clone(&self.config),
1286            Arc::clone(&self.http_client),
1287            self.mocks.clone(),
1288            self.session_store.clone(),
1289            self.state_store.clone(),
1290            Arc::clone(&self.secrets),
1291            self.oauth_config.clone(),
1292        )?;
1293        let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1294        let mut store = wasmtime::Store::new(&self.engine, store_state);
1295        let bindings: crate::provider_core::SchemaCore = pre.instantiate_async(&mut store).await?;
1296        let provider = bindings.greentic_provider_core_schema_core_api();
1297
1298        let result = provider.call_invoke(&mut store, op, &input_json)?;
1299        deserialize_json_bytes(result)
1300    }
1301
1302    fn provider_registry(&self) -> Result<ProviderRegistry> {
1303        if let Some(registry) = self.provider_registry.read().clone() {
1304            return Ok(registry);
1305        }
1306        let manifest = self
1307            .manifest
1308            .as_ref()
1309            .context("pack manifest required for provider resolution")?;
1310        let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
1311        let registry = ProviderRegistry::new(
1312            manifest,
1313            self.state_store.clone(),
1314            &self.config.tenant,
1315            &env,
1316        )?;
1317        *self.provider_registry.write() = Some(registry.clone());
1318        Ok(registry)
1319    }
1320
1321    pub fn load_flow(&self, flow_id: &str) -> Result<Flow> {
1322        if let Some(cache) = &self.flows {
1323            return cache
1324                .flows
1325                .get(flow_id)
1326                .cloned()
1327                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
1328        }
1329        if let Some(manifest) = &self.manifest {
1330            let entry = manifest
1331                .flows
1332                .iter()
1333                .find(|f| f.id.as_str() == flow_id)
1334                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
1335            return Ok(entry.flow.clone());
1336        }
1337        bail!("flow '{flow_id}' not available (pack exports disabled)")
1338    }
1339
1340    pub fn metadata(&self) -> &PackMetadata {
1341        &self.metadata
1342    }
1343
1344    pub fn required_secrets(&self) -> &[greentic_types::SecretRequirement] {
1345        &self.metadata.secret_requirements
1346    }
1347
1348    pub fn missing_secrets(
1349        &self,
1350        tenant_ctx: &TypesTenantCtx,
1351    ) -> Vec<greentic_types::SecretRequirement> {
1352        let env = tenant_ctx.env.as_str().to_string();
1353        let tenant = tenant_ctx.tenant.as_str().to_string();
1354        let team = tenant_ctx.team.as_ref().map(|t| t.as_str().to_string());
1355        self.required_secrets()
1356            .iter()
1357            .filter(|req| {
1358                // scope must match current context if provided
1359                if let Some(scope) = &req.scope {
1360                    if scope.env != env {
1361                        return false;
1362                    }
1363                    if scope.tenant != tenant {
1364                        return false;
1365                    }
1366                    if let Some(ref team_req) = scope.team
1367                        && team.as_ref() != Some(team_req)
1368                    {
1369                        return false;
1370                    }
1371                }
1372                read_secret_blocking(&self.secrets, req.key.as_str()).is_err()
1373            })
1374            .cloned()
1375            .collect()
1376    }
1377
1378    pub fn for_component_test(
1379        components: Vec<(String, PathBuf)>,
1380        flows: HashMap<String, FlowIR>,
1381        config: Arc<HostConfig>,
1382    ) -> Result<Self> {
1383        let engine = Engine::default();
1384        let mut component_map = HashMap::new();
1385        for (name, path) in components {
1386            if !path.exists() {
1387                bail!("component artifact missing: {}", path.display());
1388            }
1389            let wasm_bytes = std::fs::read(&path)?;
1390            let component = Component::from_binary(&engine, &wasm_bytes)
1391                .with_context(|| format!("failed to compile component {}", path.display()))?;
1392            component_map.insert(
1393                name.clone(),
1394                PackComponent {
1395                    name,
1396                    version: "0.0.0".into(),
1397                    component,
1398                },
1399            );
1400        }
1401
1402        let mut flow_map = HashMap::new();
1403        let mut descriptors = Vec::new();
1404        for (id, ir) in flows {
1405            let flow_type = ir.flow_type.clone();
1406            let flow = flow_ir_to_flow(ir)?;
1407            flow_map.insert(id.clone(), flow);
1408            descriptors.push(FlowDescriptor {
1409                id: id.clone(),
1410                flow_type,
1411                profile: "test".into(),
1412                version: "0.0.0".into(),
1413                description: None,
1414            });
1415        }
1416        let flows_cache = PackFlows {
1417            descriptors: descriptors.clone(),
1418            flows: flow_map,
1419            metadata: PackMetadata::fallback(Path::new("component-test")),
1420        };
1421
1422        Ok(Self {
1423            path: PathBuf::new(),
1424            archive_path: None,
1425            config,
1426            engine,
1427            metadata: PackMetadata::fallback(Path::new("component-test")),
1428            manifest: None,
1429            legacy_manifest: None,
1430            mocks: None,
1431            flows: Some(flows_cache),
1432            components: component_map,
1433            http_client: Arc::clone(&HTTP_CLIENT),
1434            pre_cache: Mutex::new(HashMap::new()),
1435            session_store: None,
1436            state_store: None,
1437            wasi_policy: Arc::new(RunnerWasiPolicy::new()),
1438            provider_registry: RwLock::new(None),
1439            secrets: crate::secrets::default_manager(),
1440            oauth_config: None,
1441        })
1442    }
1443}
1444
1445struct PackFlows {
1446    descriptors: Vec<FlowDescriptor>,
1447    flows: HashMap<String, Flow>,
1448    metadata: PackMetadata,
1449}
1450
1451fn deserialize_json_bytes(bytes: Vec<u8>) -> Result<Value> {
1452    if bytes.is_empty() {
1453        return Ok(Value::Null);
1454    }
1455    serde_json::from_slice(&bytes).or_else(|_| {
1456        String::from_utf8(bytes)
1457            .map(Value::String)
1458            .map_err(|err| anyhow!(err))
1459    })
1460}
1461
1462impl PackFlows {
1463    fn from_manifest(manifest: greentic_types::PackManifest) -> Self {
1464        let descriptors = manifest
1465            .flows
1466            .iter()
1467            .map(|entry| FlowDescriptor {
1468                id: entry.id.as_str().to_string(),
1469                flow_type: flow_kind_to_str(entry.kind).to_string(),
1470                profile: manifest.pack_id.as_str().to_string(),
1471                version: manifest.version.to_string(),
1472                description: None,
1473            })
1474            .collect();
1475        let mut flows = HashMap::new();
1476        for entry in &manifest.flows {
1477            flows.insert(entry.id.as_str().to_string(), entry.flow.clone());
1478        }
1479        Self {
1480            metadata: PackMetadata::from_manifest(&manifest),
1481            descriptors,
1482            flows,
1483        }
1484    }
1485}
1486
1487fn flow_kind_to_str(kind: greentic_types::FlowKind) -> &'static str {
1488    match kind {
1489        greentic_types::FlowKind::Messaging => "messaging",
1490        greentic_types::FlowKind::Event => "event",
1491        greentic_types::FlowKind::ComponentConfig => "component-config",
1492        greentic_types::FlowKind::Job => "job",
1493        greentic_types::FlowKind::Http => "http",
1494    }
1495}
1496
1497fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
1498    let mut file = archive
1499        .by_name(name)
1500        .with_context(|| format!("entry {name} missing from archive"))?;
1501    let mut buf = Vec::new();
1502    file.read_to_end(&mut buf)?;
1503    Ok(buf)
1504}
1505
1506fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
1507    for node in doc.nodes.values_mut() {
1508        if node.component.is_empty()
1509            && let Some((component_ref, payload)) = node.raw.iter().next()
1510        {
1511            if component_ref.starts_with("emit.") {
1512                node.component = component_ref.clone();
1513                node.payload = payload.clone();
1514                node.raw.clear();
1515                continue;
1516            }
1517            let (target_component, operation, input, config) =
1518                infer_component_exec(payload, component_ref);
1519            let mut payload_obj = serde_json::Map::new();
1520            // component.exec is meta; ensure the payload carries the actual target component.
1521            payload_obj.insert("component".into(), Value::String(target_component));
1522            payload_obj.insert("operation".into(), Value::String(operation));
1523            payload_obj.insert("input".into(), input);
1524            if let Some(cfg) = config {
1525                payload_obj.insert("config".into(), cfg);
1526            }
1527            node.component = "component.exec".to_string();
1528            node.payload = Value::Object(payload_obj);
1529        }
1530    }
1531    doc
1532}
1533
1534fn infer_component_exec(
1535    payload: &Value,
1536    component_ref: &str,
1537) -> (String, String, Value, Option<Value>) {
1538    let default_op = if component_ref.starts_with("templating.") {
1539        "render"
1540    } else {
1541        "invoke"
1542    }
1543    .to_string();
1544
1545    if let Value::Object(map) = payload {
1546        let op = map
1547            .get("op")
1548            .or_else(|| map.get("operation"))
1549            .and_then(Value::as_str)
1550            .map(|s| s.to_string())
1551            .unwrap_or_else(|| default_op.clone());
1552
1553        let mut input = map.clone();
1554        let config = input.remove("config");
1555        let component = input
1556            .get("component")
1557            .or_else(|| input.get("component_ref"))
1558            .and_then(Value::as_str)
1559            .map(|s| s.to_string())
1560            .unwrap_or_else(|| component_ref.to_string());
1561        input.remove("component");
1562        input.remove("component_ref");
1563        input.remove("op");
1564        input.remove("operation");
1565        return (component, op, Value::Object(input), config);
1566    }
1567
1568    (component_ref.to_string(), default_op, payload.clone(), None)
1569}
1570
1571#[derive(Clone, Debug)]
1572struct ComponentSpec {
1573    id: String,
1574    version: String,
1575    legacy_path: Option<String>,
1576}
1577
1578fn component_specs(
1579    manifest: Option<&greentic_types::PackManifest>,
1580    legacy_manifest: Option<&legacy_pack::PackManifest>,
1581) -> Vec<ComponentSpec> {
1582    if let Some(manifest) = manifest {
1583        return manifest
1584            .components
1585            .iter()
1586            .map(|entry| ComponentSpec {
1587                id: entry.id.as_str().to_string(),
1588                version: entry.version.to_string(),
1589                legacy_path: None,
1590            })
1591            .collect();
1592    }
1593    if let Some(legacy_manifest) = legacy_manifest {
1594        return legacy_manifest
1595            .components
1596            .iter()
1597            .map(|entry| ComponentSpec {
1598                id: entry.name.clone(),
1599                version: entry.version.to_string(),
1600                legacy_path: Some(entry.file_wasm.clone()),
1601            })
1602            .collect();
1603    }
1604    Vec::new()
1605}
1606
1607fn component_path_for_spec(root: &Path, spec: &ComponentSpec) -> PathBuf {
1608    if let Some(path) = &spec.legacy_path {
1609        return root.join(path);
1610    }
1611    root.join("components").join(format!("{}.wasm", spec.id))
1612}
1613
1614fn load_components_from_overrides(
1615    engine: &Engine,
1616    overrides: &HashMap<String, PathBuf>,
1617    specs: &[ComponentSpec],
1618    missing: &mut HashSet<String>,
1619    into: &mut HashMap<String, PackComponent>,
1620) -> Result<()> {
1621    for spec in specs {
1622        if !missing.contains(&spec.id) {
1623            continue;
1624        }
1625        let Some(path) = overrides.get(&spec.id) else {
1626            continue;
1627        };
1628        let bytes = std::fs::read(path)
1629            .with_context(|| format!("failed to read override component {}", path.display()))?;
1630        let component = Component::from_binary(engine, &bytes).with_context(|| {
1631            format!(
1632                "failed to compile component {} from override {}",
1633                spec.id,
1634                path.display()
1635            )
1636        })?;
1637        into.insert(
1638            spec.id.clone(),
1639            PackComponent {
1640                name: spec.id.clone(),
1641                version: spec.version.clone(),
1642                component,
1643            },
1644        );
1645        missing.remove(&spec.id);
1646    }
1647    Ok(())
1648}
1649
1650fn load_components_from_dir(
1651    engine: &Engine,
1652    root: &Path,
1653    specs: &[ComponentSpec],
1654    missing: &mut HashSet<String>,
1655    into: &mut HashMap<String, PackComponent>,
1656) -> Result<()> {
1657    for spec in specs {
1658        if !missing.contains(&spec.id) {
1659            continue;
1660        }
1661        let path = component_path_for_spec(root, spec);
1662        if !path.exists() {
1663            tracing::debug!(component = %spec.id, path = %path.display(), "materialized component missing; will try other sources");
1664            continue;
1665        }
1666        let bytes = std::fs::read(&path)
1667            .with_context(|| format!("failed to read component {}", path.display()))?;
1668        let component = Component::from_binary(engine, &bytes).with_context(|| {
1669            format!(
1670                "failed to compile component {} from {}",
1671                spec.id,
1672                path.display()
1673            )
1674        })?;
1675        into.insert(
1676            spec.id.clone(),
1677            PackComponent {
1678                name: spec.id.clone(),
1679                version: spec.version.clone(),
1680                component,
1681            },
1682        );
1683        missing.remove(&spec.id);
1684    }
1685    Ok(())
1686}
1687
1688fn load_components_from_archive(
1689    engine: &Engine,
1690    path: &Path,
1691    specs: &[ComponentSpec],
1692    missing: &mut HashSet<String>,
1693    into: &mut HashMap<String, PackComponent>,
1694) -> Result<()> {
1695    let mut archive = ZipArchive::new(File::open(path)?)
1696        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
1697    for spec in specs {
1698        if !missing.contains(&spec.id) {
1699            continue;
1700        }
1701        let file_name = spec
1702            .legacy_path
1703            .clone()
1704            .unwrap_or_else(|| format!("components/{}.wasm", spec.id));
1705        let bytes = match read_entry(&mut archive, &file_name) {
1706            Ok(bytes) => bytes,
1707            Err(err) => {
1708                warn!(component = %spec.id, pack = %path.display(), error = %err, "component entry missing in pack archive");
1709                continue;
1710            }
1711        };
1712        let component = Component::from_binary(engine, &bytes)
1713            .with_context(|| format!("failed to compile component {}", spec.id))?;
1714        into.insert(
1715            spec.id.clone(),
1716            PackComponent {
1717                name: spec.id.clone(),
1718                version: spec.version.clone(),
1719                component,
1720            },
1721        );
1722        missing.remove(&spec.id);
1723    }
1724    Ok(())
1725}
1726
1727#[cfg(test)]
1728mod tests {
1729    use super::*;
1730    use greentic_flow::model::{FlowDoc, NodeDoc};
1731    use serde_json::json;
1732    use std::collections::BTreeMap;
1733
1734    #[test]
1735    fn normalizes_raw_component_to_component_exec() {
1736        let mut nodes = BTreeMap::new();
1737        let mut raw = BTreeMap::new();
1738        raw.insert(
1739            "templating.handlebars".into(),
1740            json!({ "template": "Hi {{name}}" }),
1741        );
1742        nodes.insert(
1743            "start".into(),
1744            NodeDoc {
1745                raw,
1746                routing: json!([{"out": true}]),
1747                ..Default::default()
1748            },
1749        );
1750        let doc = FlowDoc {
1751            id: "welcome".into(),
1752            title: None,
1753            description: None,
1754            flow_type: "messaging".into(),
1755            start: Some("start".into()),
1756            parameters: json!({}),
1757            tags: Vec::new(),
1758            entrypoints: BTreeMap::new(),
1759            nodes,
1760        };
1761
1762        let normalized = normalize_flow_doc(doc);
1763        let node = normalized.nodes.get("start").expect("node exists");
1764        assert_eq!(node.component, "component.exec");
1765        assert!(node.raw.is_empty() || node.raw.contains_key("templating.handlebars"));
1766        let payload = node.payload.as_object().expect("payload object");
1767        assert_eq!(
1768            payload.get("component"),
1769            Some(&Value::String("templating.handlebars".into()))
1770        );
1771        assert_eq!(
1772            payload.get("operation"),
1773            Some(&Value::String("render".into()))
1774        );
1775        let input = payload.get("input").unwrap();
1776        assert_eq!(input, &json!({ "template": "Hi {{name}}" }));
1777    }
1778}
1779
1780#[derive(Clone, Debug, Default, Serialize, Deserialize)]
1781pub struct PackMetadata {
1782    pub pack_id: String,
1783    pub version: String,
1784    #[serde(default)]
1785    pub entry_flows: Vec<String>,
1786    #[serde(default)]
1787    pub secret_requirements: Vec<greentic_types::SecretRequirement>,
1788}
1789
1790impl PackMetadata {
1791    fn from_wasm(bytes: &[u8]) -> Option<Self> {
1792        let parser = Parser::new(0);
1793        for payload in parser.parse_all(bytes) {
1794            let payload = payload.ok()?;
1795            match payload {
1796                Payload::CustomSection(section) => {
1797                    if section.name() == "greentic.manifest"
1798                        && let Ok(meta) = Self::from_bytes(section.data())
1799                    {
1800                        return Some(meta);
1801                    }
1802                }
1803                Payload::DataSection(reader) => {
1804                    for segment in reader.into_iter().flatten() {
1805                        if let Ok(meta) = Self::from_bytes(segment.data) {
1806                            return Some(meta);
1807                        }
1808                    }
1809                }
1810                _ => {}
1811            }
1812        }
1813        None
1814    }
1815
1816    fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
1817        #[derive(Deserialize)]
1818        struct RawManifest {
1819            pack_id: String,
1820            version: String,
1821            #[serde(default)]
1822            entry_flows: Vec<String>,
1823            #[serde(default)]
1824            flows: Vec<RawFlow>,
1825            #[serde(default)]
1826            secret_requirements: Vec<greentic_types::SecretRequirement>,
1827        }
1828
1829        #[derive(Deserialize)]
1830        struct RawFlow {
1831            id: String,
1832        }
1833
1834        let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
1835        let mut entry_flows = if manifest.entry_flows.is_empty() {
1836            manifest.flows.iter().map(|f| f.id.clone()).collect()
1837        } else {
1838            manifest.entry_flows.clone()
1839        };
1840        entry_flows.retain(|id| !id.is_empty());
1841        Ok(Self {
1842            pack_id: manifest.pack_id,
1843            version: manifest.version,
1844            entry_flows,
1845            secret_requirements: manifest.secret_requirements,
1846        })
1847    }
1848
1849    pub fn fallback(path: &Path) -> Self {
1850        let pack_id = path
1851            .file_stem()
1852            .map(|s| s.to_string_lossy().into_owned())
1853            .unwrap_or_else(|| "unknown-pack".to_string());
1854        Self {
1855            pack_id,
1856            version: "0.0.0".to_string(),
1857            entry_flows: Vec::new(),
1858            secret_requirements: Vec::new(),
1859        }
1860    }
1861
1862    pub fn from_manifest(manifest: &greentic_types::PackManifest) -> Self {
1863        let entry_flows = manifest
1864            .flows
1865            .iter()
1866            .map(|flow| flow.id.as_str().to_string())
1867            .collect::<Vec<_>>();
1868        Self {
1869            pack_id: manifest.pack_id.as_str().to_string(),
1870            version: manifest.version.to_string(),
1871            entry_flows,
1872            secret_requirements: manifest.secret_requirements.clone(),
1873        }
1874    }
1875}