greentic_runner_host/
pack.rs

1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::component_api::component::greentic::component::control::Host as ComponentControlHost;
10use crate::component_api::{
11    ComponentPre, control, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
12};
13use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
14use crate::provider::{ProviderBinding, ProviderRegistry};
15use crate::provider_core::SchemaCorePre as ProviderComponentPre;
16use crate::provider_core_only;
17use crate::runtime_wasmtime::{Component, Engine, Linker, ResourceTable};
18use anyhow::{Context, Result, anyhow, bail};
19use greentic_interfaces_wasmtime::host_helpers::v1::{
20    self as host_v1, HostFns, add_all_v1_to_linker,
21    runner_host_http::RunnerHostHttp,
22    runner_host_kv::RunnerHostKv,
23    secrets_store::{SecretsError, SecretsStoreHost},
24    state_store::{
25        OpAck as StateOpAck, StateKey as HostStateKey, StateStoreError as StateError,
26        StateStoreHost, TenantCtx as StateTenantCtx,
27    },
28    telemetry_logger::{
29        OpAck as TelemetryAck, SpanContext as TelemetrySpanContext,
30        TelemetryLoggerError as TelemetryError, TelemetryLoggerHost,
31        TenantCtx as TelemetryTenantCtx,
32    },
33};
34use greentic_interfaces_wasmtime::http_client_client_v1_0::greentic::interfaces_types::types::Impersonation as ImpersonationV1_0;
35use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::interfaces_types::types::Impersonation as ImpersonationV1_1;
36use greentic_pack::builder as legacy_pack;
37use greentic_types::flow::FlowHasher;
38use greentic_types::{
39    ComponentId, EnvId, ExtensionRef, Flow, FlowComponentRef, FlowId, FlowKind, FlowMetadata,
40    InputMapping, Node, NodeId, OutputMapping, Routing, StateKey as StoreStateKey, TeamId,
41    TelemetryHints, TenantCtx as TypesTenantCtx, TenantId, UserId, decode_pack_manifest,
42    pack_manifest::ExtensionInline,
43};
44use host_v1::http_client::{
45    HttpClientError, HttpClientErrorV1_1, HttpClientHost, HttpClientHostV1_1,
46    Request as HttpRequest, RequestOptionsV1_1 as HttpRequestOptionsV1_1,
47    RequestV1_1 as HttpRequestV1_1, Response as HttpResponse, ResponseV1_1 as HttpResponseV1_1,
48    TenantCtx as HttpTenantCtx, TenantCtxV1_1 as HttpTenantCtxV1_1,
49};
50use indexmap::IndexMap;
51use once_cell::sync::Lazy;
52use parking_lot::{Mutex, RwLock};
53use reqwest::blocking::Client as BlockingClient;
54use runner_core::normalize_under_root;
55use serde::{Deserialize, Serialize};
56use serde_cbor;
57use serde_json::{self, Value};
58use tokio::fs;
59use wasmparser::{Parser, Payload};
60use wasmtime::StoreContextMut;
61use zip::ZipArchive;
62
63use crate::runner::engine::{FlowContext, FlowEngine, FlowStatus};
64use crate::runner::flow_adapter::{FlowIR, flow_doc_to_ir, flow_ir_to_flow};
65use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
66
67use crate::config::HostConfig;
68use crate::secrets::{DynSecretsManager, read_secret_blocking};
69use crate::storage::state::STATE_PREFIX;
70use crate::storage::{DynSessionStore, DynStateStore};
71use crate::verify;
72use crate::wasi::RunnerWasiPolicy;
73use tracing::warn;
74use wasmtime_wasi::p2::add_to_linker_sync as add_wasi_to_linker;
75use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
76
77use greentic_flow::model::FlowDoc;
78
79#[allow(dead_code)]
80pub struct PackRuntime {
81    /// Component artifact path (wasm file).
82    path: PathBuf,
83    /// Optional archive (.gtpack) used to load flows/manifests.
84    archive_path: Option<PathBuf>,
85    config: Arc<HostConfig>,
86    engine: Engine,
87    metadata: PackMetadata,
88    manifest: Option<greentic_types::PackManifest>,
89    legacy_manifest: Option<Box<legacy_pack::PackManifest>>,
90    mocks: Option<Arc<MockLayer>>,
91    flows: Option<PackFlows>,
92    components: HashMap<String, PackComponent>,
93    http_client: Arc<BlockingClient>,
94    pre_cache: Mutex<HashMap<String, ComponentPre<ComponentState>>>,
95    session_store: Option<DynSessionStore>,
96    state_store: Option<DynStateStore>,
97    wasi_policy: Arc<RunnerWasiPolicy>,
98    provider_registry: RwLock<Option<ProviderRegistry>>,
99    secrets: DynSecretsManager,
100    oauth_config: Option<OAuthBrokerConfig>,
101}
102
103struct PackComponent {
104    #[allow(dead_code)]
105    name: String,
106    #[allow(dead_code)]
107    version: String,
108    component: Component,
109}
110
/// Hints describing where component wasm files can be found.
///
/// The `Default` value carries no materialized root and no overrides.
#[derive(Debug, Default, Clone)]
pub struct ComponentResolution {
    /// Root of a materialized pack directory containing `manifest.cbor` and `components/`.
    pub materialized_root: Option<PathBuf>,
    /// Explicit overrides mapping component id -> wasm path.
    pub overrides: HashMap<String, PathBuf>,
}
118
119fn build_blocking_client() -> BlockingClient {
120    std::thread::spawn(|| {
121        BlockingClient::builder()
122            .no_proxy()
123            .build()
124            .expect("blocking client")
125    })
126    .join()
127    .expect("client build thread panicked")
128}
129
130fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
131    let (root, candidate) = if path.is_absolute() {
132        let parent = path
133            .parent()
134            .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
135        let root = parent
136            .canonicalize()
137            .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
138        let file = path
139            .file_name()
140            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
141        (root, PathBuf::from(file))
142    } else {
143        let cwd = std::env::current_dir().context("failed to resolve current directory")?;
144        let base = if let Some(parent) = path.parent() {
145            cwd.join(parent)
146        } else {
147            cwd
148        };
149        let root = base
150            .canonicalize()
151            .with_context(|| format!("failed to canonicalize {}", base.display()))?;
152        let file = path
153            .file_name()
154            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
155        (root, PathBuf::from(file))
156    };
157    let safe = normalize_under_root(&root, &candidate)?;
158    Ok((root, safe))
159}
160
161static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
162
163#[derive(Debug, Clone, Serialize, Deserialize)]
164pub struct FlowDescriptor {
165    pub id: String,
166    #[serde(rename = "type")]
167    pub flow_type: String,
168    pub profile: String,
169    pub version: String,
170    #[serde(default)]
171    pub description: Option<String>,
172}
173
174pub struct HostState {
175    config: Arc<HostConfig>,
176    http_client: Arc<BlockingClient>,
177    default_env: String,
178    #[allow(dead_code)]
179    session_store: Option<DynSessionStore>,
180    state_store: Option<DynStateStore>,
181    mocks: Option<Arc<MockLayer>>,
182    secrets: DynSecretsManager,
183    oauth_config: Option<OAuthBrokerConfig>,
184    oauth_host: OAuthBrokerHost,
185}
186
187impl HostState {
188    #[allow(clippy::default_constructed_unit_structs)]
189    pub fn new(
190        config: Arc<HostConfig>,
191        http_client: Arc<BlockingClient>,
192        mocks: Option<Arc<MockLayer>>,
193        session_store: Option<DynSessionStore>,
194        state_store: Option<DynStateStore>,
195        secrets: DynSecretsManager,
196        oauth_config: Option<OAuthBrokerConfig>,
197    ) -> Result<Self> {
198        let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
199        Ok(Self {
200            config,
201            http_client,
202            default_env,
203            session_store,
204            state_store,
205            mocks,
206            secrets,
207            oauth_config,
208            oauth_host: OAuthBrokerHost::default(),
209        })
210    }
211
212    pub fn get_secret(&self, key: &str) -> Result<String> {
213        if provider_core_only::is_enabled() {
214            bail!(provider_core_only::blocked_message("secrets"))
215        }
216        if !self.config.secrets_policy.is_allowed(key) {
217            bail!("secret {key} is not permitted by bindings policy");
218        }
219        if let Some(mock) = &self.mocks
220            && let Some(value) = mock.secrets_lookup(key)
221        {
222            return Ok(value);
223        }
224        let bytes = read_secret_blocking(&self.secrets, key)
225            .context("failed to read secret from manager")?;
226        let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
227        Ok(value)
228    }
229
230    fn tenant_ctx_from_v1(&self, ctx: Option<StateTenantCtx>) -> Result<TypesTenantCtx> {
231        let tenant_raw = ctx
232            .as_ref()
233            .map(|ctx| ctx.tenant.clone())
234            .unwrap_or_else(|| self.config.tenant.clone());
235        let env_raw = ctx
236            .as_ref()
237            .map(|ctx| ctx.env.clone())
238            .unwrap_or_else(|| self.default_env.clone());
239        let tenant_id = TenantId::from_str(&tenant_raw)
240            .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
241        let env_id = EnvId::from_str(&env_raw)
242            .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
243        let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
244        if let Some(ctx) = ctx {
245            if let Some(team) = ctx.team.or(ctx.team_id) {
246                let team_id =
247                    TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
248                tenant_ctx = tenant_ctx.with_team(Some(team_id));
249            }
250            if let Some(user) = ctx.user.or(ctx.user_id) {
251                let user_id =
252                    UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
253                tenant_ctx = tenant_ctx.with_user(Some(user_id));
254            }
255            if let Some(flow) = ctx.flow_id {
256                tenant_ctx = tenant_ctx.with_flow(flow);
257            }
258            if let Some(node) = ctx.node_id {
259                tenant_ctx = tenant_ctx.with_node(node);
260            }
261            if let Some(provider) = ctx.provider_id {
262                tenant_ctx = tenant_ctx.with_provider(provider);
263            }
264            if let Some(session) = ctx.session_id {
265                tenant_ctx = tenant_ctx.with_session(session);
266            }
267            tenant_ctx.trace_id = ctx.trace_id;
268        }
269        Ok(tenant_ctx)
270    }
271
272    fn send_http_request(
273        &mut self,
274        req: HttpRequest,
275        opts: Option<HttpRequestOptionsV1_1>,
276        _ctx: Option<HttpTenantCtx>,
277    ) -> Result<HttpResponse, HttpClientError> {
278        if !self.config.http_enabled {
279            return Err(HttpClientError {
280                code: "denied".into(),
281                message: "http client disabled by policy".into(),
282            });
283        }
284
285        let mut mock_state = None;
286        let raw_body = req.body.clone();
287        if let Some(mock) = &self.mocks
288            && let Ok(meta) = HttpMockRequest::new(&req.method, &req.url, raw_body.as_deref())
289        {
290            match mock.http_begin(&meta) {
291                HttpDecision::Mock(response) => {
292                    let headers = response
293                        .headers
294                        .iter()
295                        .map(|(k, v)| (k.clone(), v.clone()))
296                        .collect();
297                    return Ok(HttpResponse {
298                        status: response.status,
299                        headers,
300                        body: response.body.clone().map(|b| b.into_bytes()),
301                    });
302                }
303                HttpDecision::Deny(reason) => {
304                    return Err(HttpClientError {
305                        code: "denied".into(),
306                        message: reason,
307                    });
308                }
309                HttpDecision::Passthrough { record } => {
310                    mock_state = Some((meta, record));
311                }
312            }
313        }
314
315        let method = req.method.parse().unwrap_or(reqwest::Method::GET);
316        let mut builder = self.http_client.request(method, &req.url);
317        for (key, value) in req.headers {
318            if let Ok(header) = reqwest::header::HeaderName::from_bytes(key.as_bytes())
319                && let Ok(header_value) = reqwest::header::HeaderValue::from_str(&value)
320            {
321                builder = builder.header(header, header_value);
322            }
323        }
324
325        if let Some(body) = raw_body.clone() {
326            builder = builder.body(body);
327        }
328
329        if let Some(opts) = opts {
330            if let Some(timeout_ms) = opts.timeout_ms {
331                builder = builder.timeout(Duration::from_millis(timeout_ms as u64));
332            }
333            if opts.allow_insecure == Some(true) {
334                warn!(url = %req.url, "allow-insecure not supported; using default TLS validation");
335            }
336            if let Some(follow_redirects) = opts.follow_redirects
337                && !follow_redirects
338            {
339                warn!(url = %req.url, "follow-redirects=false not supported; using default client behaviour");
340            }
341        }
342
343        let response = match builder.send() {
344            Ok(resp) => resp,
345            Err(err) => {
346                warn!(url = %req.url, error = %err, "http client request failed");
347                return Err(HttpClientError {
348                    code: "unavailable".into(),
349                    message: err.to_string(),
350                });
351            }
352        };
353
354        let status = response.status().as_u16();
355        let headers_vec = response
356            .headers()
357            .iter()
358            .map(|(k, v)| {
359                (
360                    k.as_str().to_string(),
361                    v.to_str().unwrap_or_default().to_string(),
362                )
363            })
364            .collect::<Vec<_>>();
365        let body_bytes = response.bytes().ok().map(|b| b.to_vec());
366
367        if let Some((meta, true)) = mock_state.take()
368            && let Some(mock) = &self.mocks
369        {
370            let recorded = HttpMockResponse::new(
371                status,
372                headers_vec.clone().into_iter().collect(),
373                body_bytes
374                    .as_ref()
375                    .map(|b| String::from_utf8_lossy(b).into_owned()),
376            );
377            mock.http_record(&meta, &recorded);
378        }
379
380        Ok(HttpResponse {
381            status,
382            headers: headers_vec,
383            body: body_bytes,
384        })
385    }
386}
387
388impl SecretsStoreHost for HostState {
389    fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsError> {
390        if provider_core_only::is_enabled() {
391            warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
392            return Err(SecretsError::Denied);
393        }
394        if !self.config.secrets_policy.is_allowed(&key) {
395            return Err(SecretsError::Denied);
396        }
397        if let Some(mock) = &self.mocks
398            && let Some(value) = mock.secrets_lookup(&key)
399        {
400            return Ok(Some(value.into_bytes()));
401        }
402        match read_secret_blocking(&self.secrets, &key) {
403            Ok(bytes) => Ok(Some(bytes)),
404            Err(err) => {
405                warn!(secret = %key, error = %err, "secret lookup failed");
406                Err(SecretsError::NotFound)
407            }
408        }
409    }
410}
411
412impl HttpClientHost for HostState {
413    fn send(
414        &mut self,
415        req: HttpRequest,
416        ctx: Option<HttpTenantCtx>,
417    ) -> Result<HttpResponse, HttpClientError> {
418        self.send_http_request(req, None, ctx)
419    }
420}
421
422impl HttpClientHostV1_1 for HostState {
423    fn send(
424        &mut self,
425        req: HttpRequestV1_1,
426        opts: Option<HttpRequestOptionsV1_1>,
427        ctx: Option<HttpTenantCtxV1_1>,
428    ) -> Result<HttpResponseV1_1, HttpClientErrorV1_1> {
429        let legacy_req = HttpRequest {
430            method: req.method,
431            url: req.url,
432            headers: req.headers,
433            body: req.body,
434        };
435        let legacy_ctx = ctx.map(|ctx| HttpTenantCtx {
436            env: ctx.env,
437            tenant: ctx.tenant,
438            tenant_id: ctx.tenant_id,
439            team: ctx.team,
440            team_id: ctx.team_id,
441            user: ctx.user,
442            user_id: ctx.user_id,
443            trace_id: ctx.trace_id,
444            correlation_id: ctx.correlation_id,
445            attributes: ctx.attributes,
446            session_id: ctx.session_id,
447            flow_id: ctx.flow_id,
448            node_id: ctx.node_id,
449            provider_id: ctx.provider_id,
450            deadline_ms: ctx.deadline_ms,
451            attempt: ctx.attempt,
452            idempotency_key: ctx.idempotency_key,
453            impersonation: ctx
454                .impersonation
455                .map(|ImpersonationV1_1 { actor_id, reason }| ImpersonationV1_0 {
456                    actor_id,
457                    reason,
458                }),
459        });
460
461        self.send_http_request(legacy_req, opts, legacy_ctx)
462            .map(|resp| HttpResponseV1_1 {
463                status: resp.status,
464                headers: resp.headers,
465                body: resp.body,
466            })
467            .map_err(|err| HttpClientErrorV1_1 {
468                code: err.code,
469                message: err.message,
470            })
471    }
472}
473
474impl StateStoreHost for HostState {
475    fn read(
476        &mut self,
477        key: HostStateKey,
478        ctx: Option<StateTenantCtx>,
479    ) -> Result<Vec<u8>, StateError> {
480        let store = match self.state_store.as_ref() {
481            Some(store) => store.clone(),
482            None => {
483                return Err(StateError {
484                    code: "unavailable".into(),
485                    message: "state store not configured".into(),
486                });
487            }
488        };
489        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
490            Ok(ctx) => ctx,
491            Err(err) => {
492                return Err(StateError {
493                    code: "invalid-ctx".into(),
494                    message: err.to_string(),
495                });
496            }
497        };
498        let key = StoreStateKey::from(key);
499        match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
500            Ok(Some(value)) => Ok(serde_json::to_vec(&value).unwrap_or_else(|_| Vec::new())),
501            Ok(None) => Err(StateError {
502                code: "not_found".into(),
503                message: "state key not found".into(),
504            }),
505            Err(err) => Err(StateError {
506                code: "internal".into(),
507                message: err.to_string(),
508            }),
509        }
510    }
511
512    fn write(
513        &mut self,
514        key: HostStateKey,
515        bytes: Vec<u8>,
516        ctx: Option<StateTenantCtx>,
517    ) -> Result<StateOpAck, StateError> {
518        let store = match self.state_store.as_ref() {
519            Some(store) => store.clone(),
520            None => {
521                return Err(StateError {
522                    code: "unavailable".into(),
523                    message: "state store not configured".into(),
524                });
525            }
526        };
527        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
528            Ok(ctx) => ctx,
529            Err(err) => {
530                return Err(StateError {
531                    code: "invalid-ctx".into(),
532                    message: err.to_string(),
533                });
534            }
535        };
536        let key = StoreStateKey::from(key);
537        let value = serde_json::from_slice(&bytes)
538            .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()));
539        match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
540            Ok(()) => Ok(StateOpAck::Ok),
541            Err(err) => Err(StateError {
542                code: "internal".into(),
543                message: err.to_string(),
544            }),
545        }
546    }
547
548    fn delete(
549        &mut self,
550        key: HostStateKey,
551        ctx: Option<StateTenantCtx>,
552    ) -> Result<StateOpAck, StateError> {
553        let store = match self.state_store.as_ref() {
554            Some(store) => store.clone(),
555            None => {
556                return Err(StateError {
557                    code: "unavailable".into(),
558                    message: "state store not configured".into(),
559                });
560            }
561        };
562        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
563            Ok(ctx) => ctx,
564            Err(err) => {
565                return Err(StateError {
566                    code: "invalid-ctx".into(),
567                    message: err.to_string(),
568                });
569            }
570        };
571        let key = StoreStateKey::from(key);
572        match store.del(&tenant_ctx, STATE_PREFIX, &key) {
573            Ok(_) => Ok(StateOpAck::Ok),
574            Err(err) => Err(StateError {
575                code: "internal".into(),
576                message: err.to_string(),
577            }),
578        }
579    }
580}
581
582impl TelemetryLoggerHost for HostState {
583    fn log(
584        &mut self,
585        span: TelemetrySpanContext,
586        fields: Vec<(String, String)>,
587        _ctx: Option<TelemetryTenantCtx>,
588    ) -> Result<TelemetryAck, TelemetryError> {
589        if let Some(mock) = &self.mocks
590            && mock.telemetry_drain(&[("span_json", span.flow_id.as_str())])
591        {
592            return Ok(TelemetryAck::Ok);
593        }
594        let mut map = serde_json::Map::new();
595        for (k, v) in fields {
596            map.insert(k, Value::String(v));
597        }
598        tracing::info!(
599            tenant = %span.tenant,
600            flow_id = %span.flow_id,
601            node = ?span.node_id,
602            provider = %span.provider,
603            fields = %serde_json::Value::Object(map.clone()),
604            "telemetry log from pack"
605        );
606        Ok(TelemetryAck::Ok)
607    }
608}
609
610impl RunnerHostHttp for HostState {
611    fn request(
612        &mut self,
613        method: String,
614        url: String,
615        headers: Vec<String>,
616        body: Option<Vec<u8>>,
617    ) -> Result<Vec<u8>, String> {
618        let req = HttpRequest {
619            method,
620            url,
621            headers: headers
622                .chunks(2)
623                .filter_map(|chunk| {
624                    if chunk.len() == 2 {
625                        Some((chunk[0].clone(), chunk[1].clone()))
626                    } else {
627                        None
628                    }
629                })
630                .collect(),
631            body,
632        };
633        match HttpClientHost::send(self, req, None) {
634            Ok(resp) => Ok(resp.body.unwrap_or_default()),
635            Err(err) => Err(err.message),
636        }
637    }
638}
639
640impl RunnerHostKv for HostState {
641    fn get(&mut self, _ns: String, _key: String) -> Option<String> {
642        None
643    }
644
645    fn put(&mut self, _ns: String, _key: String, _val: String) {}
646}
647
648enum ManifestLoad {
649    New {
650        manifest: Box<greentic_types::PackManifest>,
651        flows: PackFlows,
652    },
653    Legacy {
654        manifest: Box<legacy_pack::PackManifest>,
655        flows: PackFlows,
656    },
657}
658
659fn load_manifest_and_flows(path: &Path) -> Result<ManifestLoad> {
660    let mut archive = ZipArchive::new(File::open(path)?)
661        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
662    let bytes = read_entry(&mut archive, "manifest.cbor")
663        .with_context(|| format!("missing manifest.cbor in {}", path.display()))?;
664    match decode_pack_manifest(&bytes) {
665        Ok(manifest) => {
666            let cache = PackFlows::from_manifest(manifest.clone());
667            Ok(ManifestLoad::New {
668                manifest: Box::new(manifest),
669                flows: cache,
670            })
671        }
672        Err(err) => {
673            tracing::debug!(error = %err, pack = %path.display(), "decode_pack_manifest failed; trying legacy manifest");
674            // Fall back to legacy pack manifest
675            let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
676                .context("failed to decode legacy pack manifest from manifest.cbor")?;
677            let flows = load_legacy_flows(&mut archive, &legacy)?;
678            Ok(ManifestLoad::Legacy {
679                manifest: Box::new(legacy),
680                flows,
681            })
682        }
683    }
684}
685
686fn load_manifest_and_flows_from_dir(root: &Path) -> Result<ManifestLoad> {
687    let manifest_path = root.join("manifest.cbor");
688    let bytes = std::fs::read(&manifest_path)
689        .with_context(|| format!("missing manifest.cbor in {}", root.display()))?;
690    match decode_pack_manifest(&bytes) {
691        Ok(manifest) => {
692            let cache = PackFlows::from_manifest(manifest.clone());
693            Ok(ManifestLoad::New {
694                manifest: Box::new(manifest),
695                flows: cache,
696            })
697        }
698        Err(err) => {
699            tracing::debug!(
700                error = %err,
701                pack = %root.display(),
702                "decode_pack_manifest failed for materialized pack; trying legacy manifest"
703            );
704            let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
705                .context("failed to decode legacy pack manifest from manifest.cbor")?;
706            let flows = load_legacy_flows_from_dir(root, &legacy)?;
707            Ok(ManifestLoad::Legacy {
708                manifest: Box::new(legacy),
709                flows,
710            })
711        }
712    }
713}
714
715fn load_legacy_flows(
716    archive: &mut ZipArchive<File>,
717    manifest: &legacy_pack::PackManifest,
718) -> Result<PackFlows> {
719    let mut flows = HashMap::new();
720    let mut descriptors = Vec::new();
721
722    for entry in &manifest.flows {
723        let bytes = read_entry(archive, &entry.file_json)
724            .with_context(|| format!("missing flow json {}", entry.file_json))?;
725        let doc: FlowDoc = serde_json::from_slice(&bytes)
726            .with_context(|| format!("failed to decode flow doc {}", entry.file_json))?;
727        let normalized = normalize_flow_doc(doc);
728        let flow_ir = flow_doc_to_ir(normalized)?;
729        let flow = flow_ir_to_flow(flow_ir)?;
730
731        descriptors.push(FlowDescriptor {
732            id: entry.id.clone(),
733            flow_type: entry.kind.clone(),
734            profile: manifest.meta.pack_id.clone(),
735            version: manifest.meta.version.to_string(),
736            description: None,
737        });
738        flows.insert(entry.id.clone(), flow);
739    }
740
741    let mut entry_flows = manifest.meta.entry_flows.clone();
742    if entry_flows.is_empty() {
743        entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
744    }
745    let metadata = PackMetadata {
746        pack_id: manifest.meta.pack_id.clone(),
747        version: manifest.meta.version.to_string(),
748        entry_flows,
749        secret_requirements: Vec::new(),
750    };
751
752    Ok(PackFlows {
753        descriptors,
754        flows,
755        metadata,
756    })
757}
758
759fn load_legacy_flows_from_dir(
760    root: &Path,
761    manifest: &legacy_pack::PackManifest,
762) -> Result<PackFlows> {
763    let mut flows = HashMap::new();
764    let mut descriptors = Vec::new();
765
766    for entry in &manifest.flows {
767        let path = root.join(&entry.file_json);
768        let bytes = std::fs::read(&path)
769            .with_context(|| format!("missing flow json {}", path.display()))?;
770        let doc: FlowDoc = serde_json::from_slice(&bytes)
771            .with_context(|| format!("failed to decode flow doc {}", path.display()))?;
772        let normalized = normalize_flow_doc(doc);
773        let flow_ir = flow_doc_to_ir(normalized)?;
774        let flow = flow_ir_to_flow(flow_ir)?;
775
776        descriptors.push(FlowDescriptor {
777            id: entry.id.clone(),
778            flow_type: entry.kind.clone(),
779            profile: manifest.meta.pack_id.clone(),
780            version: manifest.meta.version.to_string(),
781            description: None,
782        });
783        flows.insert(entry.id.clone(), flow);
784    }
785
786    let mut entry_flows = manifest.meta.entry_flows.clone();
787    if entry_flows.is_empty() {
788        entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
789    }
790    let metadata = PackMetadata {
791        pack_id: manifest.meta.pack_id.clone(),
792        version: manifest.meta.version.to_string(),
793        entry_flows,
794        secret_requirements: Vec::new(),
795    };
796
797    Ok(PackFlows {
798        descriptors,
799        flows,
800        metadata,
801    })
802}
803
804pub struct ComponentState {
805    pub host: HostState,
806    wasi_ctx: WasiCtx,
807    resource_table: ResourceTable,
808}
809
810impl ComponentState {
811    pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
812        let wasi_ctx = policy
813            .instantiate()
814            .context("failed to build WASI context")?;
815        Ok(Self {
816            host,
817            wasi_ctx,
818            resource_table: ResourceTable::new(),
819        })
820    }
821
822    fn host_mut(&mut self) -> &mut HostState {
823        &mut self.host
824    }
825}
826
827impl control::Host for ComponentState {
828    fn should_cancel(&mut self) -> bool {
829        false
830    }
831
832    fn yield_now(&mut self) {
833        // no-op cooperative yield
834    }
835}
836
837fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
838    let mut inst = linker.instance("greentic:component/control@0.4.0")?;
839    inst.func_wrap(
840        "should-cancel",
841        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
842            let host = caller.data_mut();
843            Ok((ComponentControlHost::should_cancel(host),))
844        },
845    )?;
846    inst.func_wrap(
847        "yield-now",
848        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
849            let host = caller.data_mut();
850            ComponentControlHost::yield_now(host);
851            Ok(())
852        },
853    )?;
854    Ok(())
855}
856
/// Registers the host imports a pack component links against.
///
/// WASI is added first, followed by the v1 host interfaces. Every enabled
/// interface is served by the `HostState` stored in [`ComponentState`]; the
/// OAuth broker slot is left as `None` here.
///
/// # Errors
/// Propagates any failure reported while adding imports to the linker.
pub fn register_all(linker: &mut Linker<ComponentState>) -> Result<()> {
    add_wasi_to_linker(linker)?;
    add_all_v1_to_linker(
        linker,
        HostFns {
            // Each accessor projects the store state onto its `HostState`.
            http_client_v1_1: Some(|state| state.host_mut()),
            http_client: Some(|state| state.host_mut()),
            oauth_broker: None,
            runner_host_http: Some(|state| state.host_mut()),
            runner_host_kv: Some(|state| state.host_mut()),
            telemetry_logger: Some(|state| state.host_mut()),
            state_store: Some(|state| state.host_mut()),
            secrets_store: Some(|state| state.host_mut()),
        },
    )?;
    Ok(())
}
874
/// Exposes the OAuth-related parts of the embedded `HostState`.
impl OAuthHostContext for ComponentState {
    /// Tenant taken from the host configuration.
    fn tenant_id(&self) -> &str {
        &self.host.config.tenant
    }

    /// The host's default environment name.
    fn env(&self) -> &str {
        &self.host.default_env
    }

    /// Mutable handle to the OAuth broker host held by the host state.
    fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
        &mut self.host.oauth_host
    }

    /// OAuth broker configuration, if the host state carries one.
    fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
        self.host.oauth_config.as_ref()
    }
}
892
893impl WasiView for ComponentState {
894    fn ctx(&mut self) -> WasiCtxView<'_> {
895        WasiCtxView {
896            ctx: &mut self.wasi_ctx,
897            table: &mut self.resource_table,
898        }
899    }
900}
901
// SAFETY(review): these impls assert that `ComponentState` may be sent to and
// shared between threads, overriding the compiler's auto-trait analysis of its
// fields (`HostState`, `WasiCtx`, `ResourceTable`). The invariant that makes
// this sound is not visible in this part of the file — TODO confirm and
// document it here.
#[allow(unsafe_code)]
unsafe impl Send for ComponentState {}
#[allow(unsafe_code)]
unsafe impl Sync for ComponentState {}
906
907impl PackRuntime {
908    #[allow(clippy::too_many_arguments)]
909    pub async fn load(
910        path: impl AsRef<Path>,
911        config: Arc<HostConfig>,
912        mocks: Option<Arc<MockLayer>>,
913        archive_source: Option<&Path>,
914        session_store: Option<DynSessionStore>,
915        state_store: Option<DynStateStore>,
916        wasi_policy: Arc<RunnerWasiPolicy>,
917        secrets: DynSecretsManager,
918        oauth_config: Option<OAuthBrokerConfig>,
919        verify_archive: bool,
920        component_resolution: ComponentResolution,
921    ) -> Result<Self> {
922        let path = path.as_ref();
923        let (_pack_root, safe_path) = normalize_pack_path(path)?;
924        let path_meta = std::fs::metadata(&safe_path).ok();
925        let is_dir = path_meta
926            .as_ref()
927            .map(|meta| meta.is_dir())
928            .unwrap_or(false);
929        let is_component = !is_dir
930            && safe_path
931                .extension()
932                .and_then(|ext| ext.to_str())
933                .map(|ext| ext.eq_ignore_ascii_case("wasm"))
934                .unwrap_or(false);
935        let archive_hint_path = if let Some(source) = archive_source {
936            let (_, normalized) = normalize_pack_path(source)?;
937            Some(normalized)
938        } else if is_component || is_dir {
939            None
940        } else {
941            Some(safe_path.clone())
942        };
943        let archive_hint = archive_hint_path.as_deref();
944        if verify_archive {
945            if let Some(verify_target) = archive_hint.and_then(|p| {
946                std::fs::metadata(p)
947                    .ok()
948                    .filter(|meta| meta.is_file())
949                    .map(|_| p)
950            }) {
951                verify::verify_pack(verify_target).await?;
952                tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
953            } else {
954                tracing::debug!("skipping archive verification (no archive source)");
955            }
956        }
957        let engine = Engine::default();
958        let mut metadata = PackMetadata::fallback(&safe_path);
959        let mut manifest = None;
960        let mut legacy_manifest: Option<Box<legacy_pack::PackManifest>> = None;
961        let mut flows = None;
962        let materialized_root = component_resolution.materialized_root.clone().or_else(|| {
963            if is_dir {
964                Some(safe_path.clone())
965            } else {
966                None
967            }
968        });
969
970        if let Some(root) = materialized_root.as_ref() {
971            match load_manifest_and_flows_from_dir(root) {
972                Ok(ManifestLoad::New {
973                    manifest: m,
974                    flows: cache,
975                }) => {
976                    metadata = cache.metadata.clone();
977                    manifest = Some(*m);
978                    flows = Some(cache);
979                }
980                Ok(ManifestLoad::Legacy {
981                    manifest: m,
982                    flows: cache,
983                }) => {
984                    metadata = cache.metadata.clone();
985                    legacy_manifest = Some(m);
986                    flows = Some(cache);
987                }
988                Err(err) => {
989                    warn!(error = %err, pack = %root.display(), "failed to parse materialized pack manifest");
990                }
991            }
992        }
993
994        if manifest.is_none()
995            && legacy_manifest.is_none()
996            && let Some(archive_path) = archive_hint
997        {
998            match load_manifest_and_flows(archive_path) {
999                Ok(ManifestLoad::New {
1000                    manifest: m,
1001                    flows: cache,
1002                }) => {
1003                    metadata = cache.metadata.clone();
1004                    manifest = Some(*m);
1005                    flows = Some(cache);
1006                }
1007                Ok(ManifestLoad::Legacy {
1008                    manifest: m,
1009                    flows: cache,
1010                }) => {
1011                    metadata = cache.metadata.clone();
1012                    legacy_manifest = Some(m);
1013                    flows = Some(cache);
1014                }
1015                Err(err) => {
1016                    warn!(error = %err, pack = %archive_path.display(), "failed to parse pack manifest; skipping flows");
1017                }
1018            }
1019        }
1020        let components = if is_component {
1021            let wasm_bytes = fs::read(&safe_path).await?;
1022            metadata = PackMetadata::from_wasm(&wasm_bytes)
1023                .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
1024            let name = safe_path
1025                .file_stem()
1026                .map(|s| s.to_string_lossy().to_string())
1027                .unwrap_or_else(|| "component".to_string());
1028            let component = Component::from_binary(&engine, &wasm_bytes)?;
1029            let mut map = HashMap::new();
1030            map.insert(
1031                name.clone(),
1032                PackComponent {
1033                    name,
1034                    version: metadata.version.clone(),
1035                    component,
1036                },
1037            );
1038            map
1039        } else {
1040            let specs = component_specs(manifest.as_ref(), legacy_manifest.as_deref());
1041            if specs.is_empty() {
1042                HashMap::new()
1043            } else {
1044                let mut loaded = HashMap::new();
1045                let mut missing: HashSet<String> =
1046                    specs.iter().map(|spec| spec.id.clone()).collect();
1047                let mut searched = Vec::new();
1048
1049                if !component_resolution.overrides.is_empty() {
1050                    load_components_from_overrides(
1051                        &engine,
1052                        &component_resolution.overrides,
1053                        &specs,
1054                        &mut missing,
1055                        &mut loaded,
1056                    )?;
1057                    searched.push("override map".to_string());
1058                }
1059
1060                if let Some(root) = materialized_root.as_ref() {
1061                    load_components_from_dir(&engine, root, &specs, &mut missing, &mut loaded)?;
1062                    searched.push(format!("components dir {}", root.display()));
1063                }
1064
1065                if let Some(archive_path) = archive_hint {
1066                    load_components_from_archive(
1067                        &engine,
1068                        archive_path,
1069                        &specs,
1070                        &mut missing,
1071                        &mut loaded,
1072                    )?;
1073                    searched.push(format!("archive {}", archive_path.display()));
1074                }
1075
1076                if !missing.is_empty() {
1077                    let missing_list = missing.into_iter().collect::<Vec<_>>().join(", ");
1078                    let sources = if searched.is_empty() {
1079                        "no component sources".to_string()
1080                    } else {
1081                        searched.join(", ")
1082                    };
1083                    bail!(
1084                        "components missing: {}; looked in {}",
1085                        missing_list,
1086                        sources
1087                    );
1088                }
1089
1090                loaded
1091            }
1092        };
1093        let http_client = Arc::clone(&HTTP_CLIENT);
1094        Ok(Self {
1095            path: safe_path,
1096            archive_path: archive_hint.map(Path::to_path_buf),
1097            config,
1098            engine,
1099            metadata,
1100            manifest,
1101            legacy_manifest,
1102            mocks,
1103            flows,
1104            components,
1105            http_client,
1106            pre_cache: Mutex::new(HashMap::new()),
1107            session_store,
1108            state_store,
1109            wasi_policy,
1110            provider_registry: RwLock::new(None),
1111            secrets,
1112            oauth_config,
1113        })
1114    }
1115
1116    pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
1117        if let Some(cache) = &self.flows {
1118            return Ok(cache.descriptors.clone());
1119        }
1120        if let Some(manifest) = &self.manifest {
1121            let descriptors = manifest
1122                .flows
1123                .iter()
1124                .map(|flow| FlowDescriptor {
1125                    id: flow.id.as_str().to_string(),
1126                    flow_type: flow_kind_to_str(flow.kind).to_string(),
1127                    profile: manifest.pack_id.as_str().to_string(),
1128                    version: manifest.version.to_string(),
1129                    description: None,
1130                })
1131                .collect();
1132            return Ok(descriptors);
1133        }
1134        Ok(Vec::new())
1135    }
1136
1137    #[allow(dead_code)]
1138    pub async fn run_flow(
1139        &self,
1140        flow_id: &str,
1141        input: serde_json::Value,
1142    ) -> Result<serde_json::Value> {
1143        let pack = Arc::new(
1144            PackRuntime::load(
1145                &self.path,
1146                Arc::clone(&self.config),
1147                self.mocks.clone(),
1148                self.archive_path.as_deref(),
1149                self.session_store.clone(),
1150                self.state_store.clone(),
1151                Arc::clone(&self.wasi_policy),
1152                self.secrets.clone(),
1153                self.oauth_config.clone(),
1154                false,
1155                ComponentResolution::default(),
1156            )
1157            .await?,
1158        );
1159
1160        let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&self.config)).await?;
1161        let retry_config = self.config.retry_config().into();
1162        let mocks = pack.mocks.as_deref();
1163        let tenant = self.config.tenant.as_str();
1164
1165        let ctx = FlowContext {
1166            tenant,
1167            flow_id,
1168            node_id: None,
1169            tool: None,
1170            action: None,
1171            session_id: None,
1172            provider_id: None,
1173            retry_config,
1174            observer: None,
1175            mocks,
1176        };
1177
1178        let execution = engine.execute(ctx, input).await?;
1179        match execution.status {
1180            FlowStatus::Completed => Ok(execution.output),
1181            FlowStatus::Waiting(wait) => Ok(serde_json::json!({
1182                "status": "pending",
1183                "reason": wait.reason,
1184                "resume": wait.snapshot,
1185                "response": execution.output,
1186            })),
1187        }
1188    }
1189
1190    pub async fn invoke_component(
1191        &self,
1192        component_ref: &str,
1193        ctx: ComponentExecCtx,
1194        operation: &str,
1195        _config_json: Option<String>,
1196        input_json: String,
1197    ) -> Result<Value> {
1198        let pack_component = self
1199            .components
1200            .get(component_ref)
1201            .with_context(|| format!("component '{component_ref}' not found in pack"))?;
1202
1203        let mut linker = Linker::new(&self.engine);
1204        register_all(&mut linker)?;
1205        add_component_control_to_linker(&mut linker)?;
1206        let pre_instance = linker.instantiate_pre(&pack_component.component)?;
1207        let pre: ComponentPre<ComponentState> = ComponentPre::new(pre_instance)?;
1208
1209        let host_state = HostState::new(
1210            Arc::clone(&self.config),
1211            Arc::clone(&self.http_client),
1212            self.mocks.clone(),
1213            self.session_store.clone(),
1214            self.state_store.clone(),
1215            Arc::clone(&self.secrets),
1216            self.oauth_config.clone(),
1217        )?;
1218        let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1219        let mut store = wasmtime::Store::new(&self.engine, store_state);
1220        let bindings: crate::component_api::Component = pre.instantiate_async(&mut store).await?;
1221        let node = bindings.greentic_component_node();
1222
1223        let result = node.call_invoke(&mut store, &ctx, operation, &input_json)?;
1224
1225        match result {
1226            InvokeResult::Ok(body) => {
1227                if body.is_empty() {
1228                    return Ok(Value::Null);
1229                }
1230                serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
1231            }
1232            InvokeResult::Err(NodeError {
1233                code,
1234                message,
1235                retryable,
1236                backoff_ms,
1237                details,
1238            }) => {
1239                let mut obj = serde_json::Map::new();
1240                obj.insert("ok".into(), Value::Bool(false));
1241                let mut error = serde_json::Map::new();
1242                error.insert("code".into(), Value::String(code));
1243                error.insert("message".into(), Value::String(message));
1244                error.insert("retryable".into(), Value::Bool(retryable));
1245                if let Some(backoff) = backoff_ms {
1246                    error.insert("backoff_ms".into(), Value::Number(backoff.into()));
1247                }
1248                if let Some(details) = details {
1249                    error.insert(
1250                        "details".into(),
1251                        serde_json::from_str(&details).unwrap_or(Value::String(details)),
1252                    );
1253                }
1254                obj.insert("error".into(), Value::Object(error));
1255                Ok(Value::Object(obj))
1256            }
1257        }
1258    }
1259
1260    pub fn resolve_provider(
1261        &self,
1262        provider_id: Option<&str>,
1263        provider_type: Option<&str>,
1264    ) -> Result<ProviderBinding> {
1265        let registry = self.provider_registry()?;
1266        registry.resolve(provider_id, provider_type)
1267    }
1268
1269    pub async fn invoke_provider(
1270        &self,
1271        binding: &ProviderBinding,
1272        _ctx: ComponentExecCtx,
1273        op: &str,
1274        input_json: Vec<u8>,
1275    ) -> Result<Value> {
1276        let component_ref = &binding.component_ref;
1277        let pack_component = self
1278            .components
1279            .get(component_ref)
1280            .with_context(|| format!("provider component '{component_ref}' not found in pack"))?;
1281
1282        let mut linker = Linker::new(&self.engine);
1283        register_all(&mut linker)?;
1284        add_component_control_to_linker(&mut linker)?;
1285        let pre_instance = linker.instantiate_pre(&pack_component.component)?;
1286        let pre: ProviderComponentPre<ComponentState> = ProviderComponentPre::new(pre_instance)?;
1287
1288        let host_state = HostState::new(
1289            Arc::clone(&self.config),
1290            Arc::clone(&self.http_client),
1291            self.mocks.clone(),
1292            self.session_store.clone(),
1293            self.state_store.clone(),
1294            Arc::clone(&self.secrets),
1295            self.oauth_config.clone(),
1296        )?;
1297        let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1298        let mut store = wasmtime::Store::new(&self.engine, store_state);
1299        let bindings: crate::provider_core::SchemaCore = pre.instantiate_async(&mut store).await?;
1300        let provider = bindings.greentic_provider_core_schema_core_api();
1301
1302        let result = provider.call_invoke(&mut store, op, &input_json)?;
1303        deserialize_json_bytes(result)
1304    }
1305
1306    fn provider_registry(&self) -> Result<ProviderRegistry> {
1307        if let Some(registry) = self.provider_registry.read().clone() {
1308            return Ok(registry);
1309        }
1310        let manifest = self
1311            .manifest
1312            .as_ref()
1313            .context("pack manifest required for provider resolution")?;
1314        let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
1315        let registry = ProviderRegistry::new(
1316            manifest,
1317            self.state_store.clone(),
1318            &self.config.tenant,
1319            &env,
1320        )?;
1321        *self.provider_registry.write() = Some(registry.clone());
1322        Ok(registry)
1323    }
1324
1325    pub fn load_flow(&self, flow_id: &str) -> Result<Flow> {
1326        if let Some(cache) = &self.flows {
1327            return cache
1328                .flows
1329                .get(flow_id)
1330                .cloned()
1331                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
1332        }
1333        if let Some(manifest) = &self.manifest {
1334            let entry = manifest
1335                .flows
1336                .iter()
1337                .find(|f| f.id.as_str() == flow_id)
1338                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
1339            return Ok(entry.flow.clone());
1340        }
1341        bail!("flow '{flow_id}' not available (pack exports disabled)")
1342    }
1343
    /// Metadata describing this pack.
    pub fn metadata(&self) -> &PackMetadata {
        &self.metadata
    }
1347
    /// Secret requirements recorded in the pack metadata.
    pub fn required_secrets(&self) -> &[greentic_types::SecretRequirement] {
        &self.metadata.secret_requirements
    }
1351
1352    pub fn missing_secrets(
1353        &self,
1354        tenant_ctx: &TypesTenantCtx,
1355    ) -> Vec<greentic_types::SecretRequirement> {
1356        let env = tenant_ctx.env.as_str().to_string();
1357        let tenant = tenant_ctx.tenant.as_str().to_string();
1358        let team = tenant_ctx.team.as_ref().map(|t| t.as_str().to_string());
1359        self.required_secrets()
1360            .iter()
1361            .filter(|req| {
1362                // scope must match current context if provided
1363                if let Some(scope) = &req.scope {
1364                    if scope.env != env {
1365                        return false;
1366                    }
1367                    if scope.tenant != tenant {
1368                        return false;
1369                    }
1370                    if let Some(ref team_req) = scope.team
1371                        && team.as_ref() != Some(team_req)
1372                    {
1373                        return false;
1374                    }
1375                }
1376                read_secret_blocking(&self.secrets, req.key.as_str()).is_err()
1377            })
1378            .cloned()
1379            .collect()
1380    }
1381
1382    pub fn for_component_test(
1383        components: Vec<(String, PathBuf)>,
1384        flows: HashMap<String, FlowIR>,
1385        config: Arc<HostConfig>,
1386    ) -> Result<Self> {
1387        let engine = Engine::default();
1388        let mut component_map = HashMap::new();
1389        for (name, path) in components {
1390            if !path.exists() {
1391                bail!("component artifact missing: {}", path.display());
1392            }
1393            let wasm_bytes = std::fs::read(&path)?;
1394            let component = Component::from_binary(&engine, &wasm_bytes)
1395                .with_context(|| format!("failed to compile component {}", path.display()))?;
1396            component_map.insert(
1397                name.clone(),
1398                PackComponent {
1399                    name,
1400                    version: "0.0.0".into(),
1401                    component,
1402                },
1403            );
1404        }
1405
1406        let mut flow_map = HashMap::new();
1407        let mut descriptors = Vec::new();
1408        for (id, ir) in flows {
1409            let flow_type = ir.flow_type.clone();
1410            let flow = flow_ir_to_flow(ir)?;
1411            flow_map.insert(id.clone(), flow);
1412            descriptors.push(FlowDescriptor {
1413                id: id.clone(),
1414                flow_type,
1415                profile: "test".into(),
1416                version: "0.0.0".into(),
1417                description: None,
1418            });
1419        }
1420        let flows_cache = PackFlows {
1421            descriptors: descriptors.clone(),
1422            flows: flow_map,
1423            metadata: PackMetadata::fallback(Path::new("component-test")),
1424        };
1425
1426        Ok(Self {
1427            path: PathBuf::new(),
1428            archive_path: None,
1429            config,
1430            engine,
1431            metadata: PackMetadata::fallback(Path::new("component-test")),
1432            manifest: None,
1433            legacy_manifest: None,
1434            mocks: None,
1435            flows: Some(flows_cache),
1436            components: component_map,
1437            http_client: Arc::clone(&HTTP_CLIENT),
1438            pre_cache: Mutex::new(HashMap::new()),
1439            session_store: None,
1440            state_store: None,
1441            wasi_policy: Arc::new(RunnerWasiPolicy::new()),
1442            provider_registry: RwLock::new(None),
1443            secrets: crate::secrets::default_manager(),
1444            oauth_config: None,
1445        })
1446    }
1447}
1448
/// Flows of a pack together with their descriptors and the pack metadata.
struct PackFlows {
    /// One descriptor per flow, as returned by `PackRuntime::list_flows`.
    descriptors: Vec<FlowDescriptor>,
    /// Flow definitions keyed by flow id.
    flows: HashMap<String, Flow>,
    /// Metadata of the pack the flows were taken from.
    metadata: PackMetadata,
}
1454
/// Manifest extension keys that are recognised as carrying runtime flows.
const RUNTIME_FLOW_EXTENSION_IDS: [&str; 3] = [
    "greentic.pack.runtime_flow",
    "greentic.pack.flow_runtime",
    "greentic.pack.runtime_flows",
];
1460
/// Object-shaped runtime flow extension payload: `{ "flows": [...] }`.
#[derive(Debug, Deserialize)]
struct RuntimeFlowBundle {
    /// The runtime flows carried by the extension.
    flows: Vec<RuntimeFlow>,
}
1465
/// Serialized form of a flow as found in a runtime flow extension.
///
/// Converted into a `Flow` by `runtime_flow_to_flow`.
#[derive(Debug, Deserialize)]
struct RuntimeFlow {
    /// Flow id; parsed into a `FlowId` during conversion.
    id: String,
    /// Flow kind; also accepted under the key `flow_type`.
    #[serde(alias = "flow_type")]
    kind: FlowKind,
    /// Schema version; conversion falls back to `"1.0"` when absent.
    #[serde(default)]
    schema_version: Option<String>,
    /// Start node; becomes the `default` entrypoint when `entrypoints` is empty.
    #[serde(default)]
    start: Option<String>,
    /// Explicit entrypoints, copied into the converted flow.
    #[serde(default)]
    entrypoints: BTreeMap<String, Value>,
    /// Nodes keyed by node id.
    nodes: BTreeMap<String, RuntimeNode>,
    /// Optional flow metadata; conversion uses the default when absent.
    #[serde(default)]
    metadata: Option<FlowMetadata>,
}
1481
/// Serialized form of a single node inside a [`RuntimeFlow`].
#[derive(Debug, Deserialize)]
struct RuntimeNode {
    /// Component id; also accepted under the key `component`.
    #[serde(alias = "component")]
    component_id: String,
    /// Operation to call; also accepted under the key `operation`.
    #[serde(default, alias = "operation")]
    operation_name: Option<String>,
    /// Input for the operation; also accepted as `payload` or `input`.
    /// Becomes the node's input mapping during conversion.
    #[serde(default, alias = "payload", alias = "input")]
    operation_payload: Value,
    /// Routing; conversion uses `Routing::End` when absent.
    #[serde(default)]
    routing: Option<Routing>,
    /// Telemetry hints; conversion uses the default when absent.
    #[serde(default)]
    telemetry: Option<TelemetryHints>,
}
1495
1496fn deserialize_json_bytes(bytes: Vec<u8>) -> Result<Value> {
1497    if bytes.is_empty() {
1498        return Ok(Value::Null);
1499    }
1500    serde_json::from_slice(&bytes).or_else(|_| {
1501        String::from_utf8(bytes)
1502            .map(Value::String)
1503            .map_err(|err| anyhow!(err))
1504    })
1505}
1506
1507impl PackFlows {
1508    fn from_manifest(manifest: greentic_types::PackManifest) -> Self {
1509        if let Some(flows) = flows_from_runtime_extension(&manifest) {
1510            return flows;
1511        }
1512        let descriptors = manifest
1513            .flows
1514            .iter()
1515            .map(|entry| FlowDescriptor {
1516                id: entry.id.as_str().to_string(),
1517                flow_type: flow_kind_to_str(entry.kind).to_string(),
1518                profile: manifest.pack_id.as_str().to_string(),
1519                version: manifest.version.to_string(),
1520                description: None,
1521            })
1522            .collect();
1523        let mut flows = HashMap::new();
1524        for entry in &manifest.flows {
1525            flows.insert(entry.id.as_str().to_string(), entry.flow.clone());
1526        }
1527        Self {
1528            metadata: PackMetadata::from_manifest(&manifest),
1529            descriptors,
1530            flows,
1531        }
1532    }
1533}
1534
1535fn flows_from_runtime_extension(manifest: &greentic_types::PackManifest) -> Option<PackFlows> {
1536    let extensions = manifest.extensions.as_ref()?;
1537    let extension = extensions.iter().find_map(|(key, ext)| {
1538        if RUNTIME_FLOW_EXTENSION_IDS
1539            .iter()
1540            .any(|candidate| candidate == key)
1541        {
1542            Some(ext)
1543        } else {
1544            None
1545        }
1546    })?;
1547    let runtime_flows = match decode_runtime_flow_extension(extension) {
1548        Some(flows) if !flows.is_empty() => flows,
1549        _ => return None,
1550    };
1551
1552    let descriptors = runtime_flows
1553        .iter()
1554        .map(|flow| FlowDescriptor {
1555            id: flow.id.as_str().to_string(),
1556            flow_type: flow_kind_to_str(flow.kind).to_string(),
1557            profile: manifest.pack_id.as_str().to_string(),
1558            version: manifest.version.to_string(),
1559            description: None,
1560        })
1561        .collect::<Vec<_>>();
1562    let flows = runtime_flows
1563        .into_iter()
1564        .map(|flow| (flow.id.as_str().to_string(), flow))
1565        .collect();
1566
1567    Some(PackFlows {
1568        metadata: PackMetadata::from_manifest(manifest),
1569        descriptors,
1570        flows,
1571    })
1572}
1573
1574fn decode_runtime_flow_extension(extension: &ExtensionRef) -> Option<Vec<Flow>> {
1575    let value = match extension.inline.as_ref()? {
1576        ExtensionInline::Other(value) => value.clone(),
1577        _ => return None,
1578    };
1579
1580    if let Ok(bundle) = serde_json::from_value::<RuntimeFlowBundle>(value.clone()) {
1581        return Some(collect_runtime_flows(bundle.flows));
1582    }
1583
1584    if let Ok(flows) = serde_json::from_value::<Vec<RuntimeFlow>>(value.clone()) {
1585        return Some(collect_runtime_flows(flows));
1586    }
1587
1588    if let Ok(flows) = serde_json::from_value::<Vec<Flow>>(value) {
1589        return Some(flows);
1590    }
1591
1592    warn!(
1593        extension = %extension.kind,
1594        version = %extension.version,
1595        "runtime flow extension present but could not be decoded"
1596    );
1597    None
1598}
1599
1600fn collect_runtime_flows(flows: Vec<RuntimeFlow>) -> Vec<Flow> {
1601    flows
1602        .into_iter()
1603        .filter_map(|flow| match runtime_flow_to_flow(flow) {
1604            Ok(flow) => Some(flow),
1605            Err(err) => {
1606                warn!(error = %err, "failed to decode runtime flow");
1607                None
1608            }
1609        })
1610        .collect()
1611}
1612
1613fn runtime_flow_to_flow(runtime: RuntimeFlow) -> Result<Flow> {
1614    let flow_id = FlowId::from_str(&runtime.id)
1615        .with_context(|| format!("invalid flow id `{}`", runtime.id))?;
1616    let mut entrypoints = runtime.entrypoints;
1617    if entrypoints.is_empty()
1618        && let Some(start) = &runtime.start
1619    {
1620        entrypoints.insert("default".into(), Value::String(start.clone()));
1621    }
1622
1623    let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
1624    for (id, node) in runtime.nodes {
1625        let node_id = NodeId::from_str(&id).with_context(|| format!("invalid node id `{id}`"))?;
1626        let component_id = ComponentId::from_str(&node.component_id)
1627            .with_context(|| format!("invalid component id `{}`", node.component_id))?;
1628        let component = FlowComponentRef {
1629            id: component_id,
1630            pack_alias: None,
1631            operation: node.operation_name,
1632        };
1633        let routing = node.routing.unwrap_or(Routing::End);
1634        let telemetry = node.telemetry.unwrap_or_default();
1635        nodes.insert(
1636            node_id.clone(),
1637            Node {
1638                id: node_id,
1639                component,
1640                input: InputMapping {
1641                    mapping: node.operation_payload,
1642                },
1643                output: OutputMapping {
1644                    mapping: Value::Null,
1645                },
1646                routing,
1647                telemetry,
1648            },
1649        );
1650    }
1651
1652    Ok(Flow {
1653        schema_version: runtime.schema_version.unwrap_or_else(|| "1.0".to_string()),
1654        id: flow_id,
1655        kind: runtime.kind,
1656        entrypoints,
1657        nodes,
1658        metadata: runtime.metadata.unwrap_or_default(),
1659    })
1660}
1661
1662fn flow_kind_to_str(kind: greentic_types::FlowKind) -> &'static str {
1663    match kind {
1664        greentic_types::FlowKind::Messaging => "messaging",
1665        greentic_types::FlowKind::Event => "event",
1666        greentic_types::FlowKind::ComponentConfig => "component-config",
1667        greentic_types::FlowKind::Job => "job",
1668        greentic_types::FlowKind::Http => "http",
1669    }
1670}
1671
1672fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
1673    let mut file = archive
1674        .by_name(name)
1675        .with_context(|| format!("entry {name} missing from archive"))?;
1676    let mut buf = Vec::new();
1677    file.read_to_end(&mut buf)?;
1678    Ok(buf)
1679}
1680
1681fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
1682    for node in doc.nodes.values_mut() {
1683        let Some((component_ref, payload)) = node
1684            .raw
1685            .iter()
1686            .next()
1687            .map(|(key, value)| (key.clone(), value.clone()))
1688        else {
1689            continue;
1690        };
1691        if component_ref.starts_with("emit.") {
1692            node.operation = Some(component_ref);
1693            node.payload = payload;
1694            node.raw.clear();
1695            continue;
1696        }
1697        let (target_component, operation, input, config) =
1698            infer_component_exec(&payload, &component_ref);
1699        let mut payload_obj = serde_json::Map::new();
1700        // component.exec is meta; ensure the payload carries the actual target component.
1701        payload_obj.insert("component".into(), Value::String(target_component));
1702        payload_obj.insert("operation".into(), Value::String(operation));
1703        payload_obj.insert("input".into(), input);
1704        if let Some(cfg) = config {
1705            payload_obj.insert("config".into(), cfg);
1706        }
1707        node.operation = Some("component.exec".to_string());
1708        node.payload = Value::Object(payload_obj);
1709        node.raw.clear();
1710    }
1711    doc
1712}
1713
1714fn infer_component_exec(
1715    payload: &Value,
1716    component_ref: &str,
1717) -> (String, String, Value, Option<Value>) {
1718    let default_op = if component_ref.starts_with("templating.") {
1719        "render"
1720    } else {
1721        "invoke"
1722    }
1723    .to_string();
1724
1725    if let Value::Object(map) = payload {
1726        let op = map
1727            .get("op")
1728            .or_else(|| map.get("operation"))
1729            .and_then(Value::as_str)
1730            .map(|s| s.to_string())
1731            .unwrap_or_else(|| default_op.clone());
1732
1733        let mut input = map.clone();
1734        let config = input.remove("config");
1735        let component = input
1736            .get("component")
1737            .or_else(|| input.get("component_ref"))
1738            .and_then(Value::as_str)
1739            .map(|s| s.to_string())
1740            .unwrap_or_else(|| component_ref.to_string());
1741        input.remove("component");
1742        input.remove("component_ref");
1743        input.remove("op");
1744        input.remove("operation");
1745        return (component, op, Value::Object(input), config);
1746    }
1747
1748    (component_ref.to_string(), default_op, payload.clone(), None)
1749}
1750
/// One component a pack declares, in the form the component loaders consume.
#[derive(Clone, Debug)]
struct ComponentSpec {
    /// Component identifier: the manifest entry's `id`, or the legacy
    /// manifest entry's `name`.
    id: String,
    /// Component version rendered as a string.
    version: String,
    /// Wasm file location recorded by a legacy manifest (`file_wasm`);
    /// `None` for specs built from the current manifest format.
    legacy_path: Option<String>,
}
1757
1758fn component_specs(
1759    manifest: Option<&greentic_types::PackManifest>,
1760    legacy_manifest: Option<&legacy_pack::PackManifest>,
1761) -> Vec<ComponentSpec> {
1762    if let Some(manifest) = manifest {
1763        return manifest
1764            .components
1765            .iter()
1766            .map(|entry| ComponentSpec {
1767                id: entry.id.as_str().to_string(),
1768                version: entry.version.to_string(),
1769                legacy_path: None,
1770            })
1771            .collect();
1772    }
1773    if let Some(legacy_manifest) = legacy_manifest {
1774        return legacy_manifest
1775            .components
1776            .iter()
1777            .map(|entry| ComponentSpec {
1778                id: entry.name.clone(),
1779                version: entry.version.to_string(),
1780                legacy_path: Some(entry.file_wasm.clone()),
1781            })
1782            .collect();
1783    }
1784    Vec::new()
1785}
1786
1787fn component_path_for_spec(root: &Path, spec: &ComponentSpec) -> PathBuf {
1788    if let Some(path) = &spec.legacy_path {
1789        return root.join(path);
1790    }
1791    root.join("components").join(format!("{}.wasm", spec.id))
1792}
1793
1794fn load_components_from_overrides(
1795    engine: &Engine,
1796    overrides: &HashMap<String, PathBuf>,
1797    specs: &[ComponentSpec],
1798    missing: &mut HashSet<String>,
1799    into: &mut HashMap<String, PackComponent>,
1800) -> Result<()> {
1801    for spec in specs {
1802        if !missing.contains(&spec.id) {
1803            continue;
1804        }
1805        let Some(path) = overrides.get(&spec.id) else {
1806            continue;
1807        };
1808        let bytes = std::fs::read(path)
1809            .with_context(|| format!("failed to read override component {}", path.display()))?;
1810        let component = Component::from_binary(engine, &bytes).with_context(|| {
1811            format!(
1812                "failed to compile component {} from override {}",
1813                spec.id,
1814                path.display()
1815            )
1816        })?;
1817        into.insert(
1818            spec.id.clone(),
1819            PackComponent {
1820                name: spec.id.clone(),
1821                version: spec.version.clone(),
1822                component,
1823            },
1824        );
1825        missing.remove(&spec.id);
1826    }
1827    Ok(())
1828}
1829
1830fn load_components_from_dir(
1831    engine: &Engine,
1832    root: &Path,
1833    specs: &[ComponentSpec],
1834    missing: &mut HashSet<String>,
1835    into: &mut HashMap<String, PackComponent>,
1836) -> Result<()> {
1837    for spec in specs {
1838        if !missing.contains(&spec.id) {
1839            continue;
1840        }
1841        let path = component_path_for_spec(root, spec);
1842        if !path.exists() {
1843            tracing::debug!(component = %spec.id, path = %path.display(), "materialized component missing; will try other sources");
1844            continue;
1845        }
1846        let bytes = std::fs::read(&path)
1847            .with_context(|| format!("failed to read component {}", path.display()))?;
1848        let component = Component::from_binary(engine, &bytes).with_context(|| {
1849            format!(
1850                "failed to compile component {} from {}",
1851                spec.id,
1852                path.display()
1853            )
1854        })?;
1855        into.insert(
1856            spec.id.clone(),
1857            PackComponent {
1858                name: spec.id.clone(),
1859                version: spec.version.clone(),
1860                component,
1861            },
1862        );
1863        missing.remove(&spec.id);
1864    }
1865    Ok(())
1866}
1867
1868fn load_components_from_archive(
1869    engine: &Engine,
1870    path: &Path,
1871    specs: &[ComponentSpec],
1872    missing: &mut HashSet<String>,
1873    into: &mut HashMap<String, PackComponent>,
1874) -> Result<()> {
1875    let mut archive = ZipArchive::new(File::open(path)?)
1876        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
1877    for spec in specs {
1878        if !missing.contains(&spec.id) {
1879            continue;
1880        }
1881        let file_name = spec
1882            .legacy_path
1883            .clone()
1884            .unwrap_or_else(|| format!("components/{}.wasm", spec.id));
1885        let bytes = match read_entry(&mut archive, &file_name) {
1886            Ok(bytes) => bytes,
1887            Err(err) => {
1888                warn!(component = %spec.id, pack = %path.display(), error = %err, "component entry missing in pack archive");
1889                continue;
1890            }
1891        };
1892        let component = Component::from_binary(engine, &bytes)
1893            .with_context(|| format!("failed to compile component {}", spec.id))?;
1894        into.insert(
1895            spec.id.clone(),
1896            PackComponent {
1897                name: spec.id.clone(),
1898                version: spec.version.clone(),
1899                component,
1900            },
1901        );
1902        missing.remove(&spec.id);
1903    }
1904    Ok(())
1905}
1906
#[cfg(test)]
mod tests {
    use super::*;
    use greentic_flow::model::{FlowDoc, NodeDoc};
    use serde_json::json;
    use std::collections::BTreeMap;

    /// A node given in raw `<component>: <payload>` form must come out as a
    /// `component.exec` operation that names the component, picks `render`
    /// for a `templating.*` ref, and forwards the original payload as input.
    #[test]
    fn normalizes_raw_component_to_component_exec() {
        // Single raw node keyed by the component reference.
        let mut raw = BTreeMap::new();
        raw.insert(
            "templating.handlebars".into(),
            json!({ "template": "Hi {{name}}" }),
        );
        let start_node = NodeDoc {
            raw,
            routing: json!([{"out": true}]),
            ..Default::default()
        };

        let mut nodes = BTreeMap::new();
        nodes.insert("start".into(), start_node);

        let normalized = normalize_flow_doc(FlowDoc {
            id: "welcome".into(),
            title: None,
            description: None,
            flow_type: "messaging".into(),
            start: Some("start".into()),
            parameters: json!({}),
            tags: Vec::new(),
            schema_version: None,
            entrypoints: BTreeMap::new(),
            nodes,
        });

        let node = normalized.nodes.get("start").expect("node exists");
        assert_eq!(node.operation.as_deref(), Some("component.exec"));
        assert!(node.raw.is_empty());

        let payload = node.payload.as_object().expect("payload object");
        let expected_component = Value::String("templating.handlebars".into());
        let expected_operation = Value::String("render".into());
        assert_eq!(payload.get("component"), Some(&expected_component));
        assert_eq!(payload.get("operation"), Some(&expected_operation));

        let input = payload.get("input").unwrap();
        assert_eq!(input, &json!({ "template": "Hi {{name}}" }));
    }
}
1960
/// Summary of a pack: its identity, version, entry flows and the secrets it
/// requires.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct PackMetadata {
    /// Identifier of the pack.
    pub pack_id: String,
    /// Pack version, kept as a string.
    pub version: String,
    /// Ids of the pack's entry flows; empty when absent from the serialized form.
    #[serde(default)]
    pub entry_flows: Vec<String>,
    /// Secrets the pack declares it needs; empty when absent from the serialized form.
    #[serde(default)]
    pub secret_requirements: Vec<greentic_types::SecretRequirement>,
}
1970
1971impl PackMetadata {
1972    fn from_wasm(bytes: &[u8]) -> Option<Self> {
1973        let parser = Parser::new(0);
1974        for payload in parser.parse_all(bytes) {
1975            let payload = payload.ok()?;
1976            match payload {
1977                Payload::CustomSection(section) => {
1978                    if section.name() == "greentic.manifest"
1979                        && let Ok(meta) = Self::from_bytes(section.data())
1980                    {
1981                        return Some(meta);
1982                    }
1983                }
1984                Payload::DataSection(reader) => {
1985                    for segment in reader.into_iter().flatten() {
1986                        if let Ok(meta) = Self::from_bytes(segment.data) {
1987                            return Some(meta);
1988                        }
1989                    }
1990                }
1991                _ => {}
1992            }
1993        }
1994        None
1995    }
1996
1997    fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
1998        #[derive(Deserialize)]
1999        struct RawManifest {
2000            pack_id: String,
2001            version: String,
2002            #[serde(default)]
2003            entry_flows: Vec<String>,
2004            #[serde(default)]
2005            flows: Vec<RawFlow>,
2006            #[serde(default)]
2007            secret_requirements: Vec<greentic_types::SecretRequirement>,
2008        }
2009
2010        #[derive(Deserialize)]
2011        struct RawFlow {
2012            id: String,
2013        }
2014
2015        let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
2016        let mut entry_flows = if manifest.entry_flows.is_empty() {
2017            manifest.flows.iter().map(|f| f.id.clone()).collect()
2018        } else {
2019            manifest.entry_flows.clone()
2020        };
2021        entry_flows.retain(|id| !id.is_empty());
2022        Ok(Self {
2023            pack_id: manifest.pack_id,
2024            version: manifest.version,
2025            entry_flows,
2026            secret_requirements: manifest.secret_requirements,
2027        })
2028    }
2029
2030    pub fn fallback(path: &Path) -> Self {
2031        let pack_id = path
2032            .file_stem()
2033            .map(|s| s.to_string_lossy().into_owned())
2034            .unwrap_or_else(|| "unknown-pack".to_string());
2035        Self {
2036            pack_id,
2037            version: "0.0.0".to_string(),
2038            entry_flows: Vec::new(),
2039            secret_requirements: Vec::new(),
2040        }
2041    }
2042
2043    pub fn from_manifest(manifest: &greentic_types::PackManifest) -> Self {
2044        let entry_flows = manifest
2045            .flows
2046            .iter()
2047            .map(|flow| flow.id.as_str().to_string())
2048            .collect::<Vec<_>>();
2049        Self {
2050            pack_id: manifest.pack_id.as_str().to_string(),
2051            version: manifest.version.to_string(),
2052            entry_flows,
2053            secret_requirements: manifest.secret_requirements.clone(),
2054        }
2055    }
2056}