greentic_runner_host/
pack.rs

1use std::collections::HashMap;
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::component_api::component::greentic::component::control::Host as ComponentControlHost;
10use crate::component_api::{
11    ComponentPre, control, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
12};
13use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
14use crate::provider::{ProviderBinding, ProviderRegistry};
15use crate::provider_core::SchemaCorePre as ProviderComponentPre;
16use crate::provider_core_only;
17use crate::runtime_wasmtime::{Component, Engine, Linker, ResourceTable};
18use anyhow::{Context, Result, anyhow, bail};
19use greentic_interfaces_wasmtime::host_helpers::v1::{
20    self as host_v1, HostFns, add_all_v1_to_linker,
21    messaging_session::{
22        MessagingSessionError as MessagingError, MessagingSessionHost, OpAck as MessagingAck,
23        OutboundMessage, TenantCtx as MessagingTenantCtx,
24    },
25    runner_host_http::RunnerHostHttp,
26    runner_host_kv::RunnerHostKv,
27    secrets_store::{SecretsError, SecretsStoreHost},
28    state_store::{
29        OpAck as StateOpAck, StateKey as HostStateKey, StateStoreError as StateError,
30        StateStoreHost, TenantCtx as StateTenantCtx,
31    },
32    telemetry_logger::{
33        OpAck as TelemetryAck, SpanContext as TelemetrySpanContext,
34        TelemetryLoggerError as TelemetryError, TelemetryLoggerHost,
35        TenantCtx as TelemetryTenantCtx,
36    },
37};
38use greentic_interfaces_wasmtime::http_client_client_v1_0::greentic::interfaces_types::types::Impersonation as ImpersonationV1_0;
39use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::interfaces_types::types::Impersonation as ImpersonationV1_1;
40use greentic_pack::builder as legacy_pack;
41use greentic_types::{
42    EnvId, Flow, StateKey as StoreStateKey, TeamId, TenantCtx as TypesTenantCtx, TenantId, UserId,
43    decode_pack_manifest,
44};
45use host_v1::http_client::{
46    HttpClientError, HttpClientErrorV1_1, HttpClientHost, HttpClientHostV1_1,
47    Request as HttpRequest, RequestOptionsV1_1 as HttpRequestOptionsV1_1,
48    RequestV1_1 as HttpRequestV1_1, Response as HttpResponse, ResponseV1_1 as HttpResponseV1_1,
49    TenantCtx as HttpTenantCtx, TenantCtxV1_1 as HttpTenantCtxV1_1,
50};
51use once_cell::sync::Lazy;
52use parking_lot::{Mutex, RwLock};
53use reqwest::blocking::Client as BlockingClient;
54use runner_core::normalize_under_root;
55use serde::{Deserialize, Serialize};
56use serde_cbor;
57use serde_json::{self, Value};
58use tokio::fs;
59use wasmparser::{Parser, Payload};
60use wasmtime::StoreContextMut;
61use zip::ZipArchive;
62
63use crate::runner::engine::{FlowContext, FlowEngine, FlowStatus};
64use crate::runner::flow_adapter::{FlowIR, flow_doc_to_ir, flow_ir_to_flow};
65use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
66
67use crate::config::HostConfig;
68use crate::secrets::{DynSecretsManager, read_secret_blocking};
69use crate::storage::state::STATE_PREFIX;
70use crate::storage::{DynSessionStore, DynStateStore};
71use crate::verify;
72use crate::wasi::RunnerWasiPolicy;
73use tracing::warn;
74use wasmtime_wasi::p2::add_to_linker_sync as add_wasi_to_linker;
75use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
76
77use greentic_flow::model::FlowDoc;
78
/// State held for one loaded pack: where its artifact lives, the parsed
/// manifest and flows (when an archive was available), the compiled
/// components, and the host-side services handed to component instances.
#[allow(dead_code)]
pub struct PackRuntime {
    /// Component artifact path (wasm file).
    path: PathBuf,
    /// Optional archive (.gtpack) used to load flows/manifests.
    archive_path: Option<PathBuf>,
    /// Shared host configuration.
    config: Arc<HostConfig>,
    /// Engine taken from `crate::runtime_wasmtime`.
    engine: Engine,
    /// Pack metadata (id, version, entry flows, secret requirements).
    metadata: PackMetadata,
    /// Manifest decoded with `decode_pack_manifest`; `None` when the archive
    /// only carried a legacy manifest or no manifest was loaded.
    manifest: Option<greentic_types::PackManifest>,
    /// Manifest decoded through the legacy `greentic_pack::builder` schema.
    legacy_manifest: Option<Box<legacy_pack::PackManifest>>,
    /// Optional mock layer consulted by the host services.
    mocks: Option<Arc<MockLayer>>,
    /// Flows loaded from the archive, if any.
    flows: Option<PackFlows>,
    /// Components keyed by a string identifier.
    /// NOTE(review): presumably the component id/name from the manifest — confirm.
    components: HashMap<String, PackComponent>,
    /// Blocking HTTP client shared with host state.
    http_client: Arc<BlockingClient>,
    /// Cache of pre-instantiated components, guarded by a mutex.
    pre_cache: Mutex<HashMap<String, ComponentPre<ComponentState>>>,
    /// Optional session store backend.
    session_store: Option<DynSessionStore>,
    /// Optional state store backend.
    state_store: Option<DynStateStore>,
    /// WASI policy used when building per-instance WASI contexts.
    wasi_policy: Arc<RunnerWasiPolicy>,
    /// Provider registry slot; starts empty or populated behind a read/write lock.
    provider_registry: RwLock<Option<ProviderRegistry>>,
    /// Secrets manager used for secret lookups.
    secrets: DynSecretsManager,
    /// Optional OAuth broker configuration.
    oauth_config: Option<OAuthBrokerConfig>,
}
102
/// A single compiled component belonging to a pack, together with the name
/// and version it was registered under.
struct PackComponent {
    /// Component name (currently unused, kept for diagnostics).
    #[allow(dead_code)]
    name: String,
    /// Component version string (currently unused, kept for diagnostics).
    #[allow(dead_code)]
    version: String,
    /// The compiled component itself.
    component: Component,
}
110
111fn build_blocking_client() -> BlockingClient {
112    std::thread::spawn(|| {
113        BlockingClient::builder()
114            .no_proxy()
115            .build()
116            .expect("blocking client")
117    })
118    .join()
119    .expect("client build thread panicked")
120}
121
122fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
123    let (root, candidate) = if path.is_absolute() {
124        let parent = path
125            .parent()
126            .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
127        let root = parent
128            .canonicalize()
129            .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
130        let file = path
131            .file_name()
132            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
133        (root, PathBuf::from(file))
134    } else {
135        let cwd = std::env::current_dir().context("failed to resolve current directory")?;
136        let base = if let Some(parent) = path.parent() {
137            cwd.join(parent)
138        } else {
139            cwd
140        };
141        let root = base
142            .canonicalize()
143            .with_context(|| format!("failed to canonicalize {}", base.display()))?;
144        let file = path
145            .file_name()
146            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
147        (root, PathBuf::from(file))
148    };
149    let safe = normalize_under_root(&root, &candidate)?;
150    Ok((root, safe))
151}
152
/// Process-wide blocking HTTP client, built lazily on first access via
/// `build_blocking_client` and shared behind an `Arc`.
static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
154
/// Serializable summary of a flow exposed by a pack.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowDescriptor {
    /// Flow identifier.
    pub id: String,
    /// Flow kind; serialized under the key `type`.
    #[serde(rename = "type")]
    pub flow_type: String,
    /// Profile name (legacy packs populate this with the pack id).
    pub profile: String,
    /// Version string of the pack the flow belongs to.
    pub version: String,
    /// Optional human-readable description; defaults to `None` when absent.
    #[serde(default)]
    pub description: Option<String>,
}
165
/// Host-side state backing the interfaces exposed to a component instance
/// (secrets, HTTP, state store, telemetry, messaging, OAuth).
pub struct HostState {
    /// Shared host configuration (tenant, HTTP and secrets policy).
    config: Arc<HostConfig>,
    /// Blocking HTTP client used for outbound requests.
    http_client: Arc<BlockingClient>,
    /// Environment name taken from `GREENTIC_ENV` at construction time,
    /// or `"local"` when the variable is unavailable.
    default_env: String,
    /// Optional session store backend (currently unused here).
    #[allow(dead_code)]
    session_store: Option<DynSessionStore>,
    /// Optional state store backend; state operations fail when absent.
    state_store: Option<DynStateStore>,
    /// Optional mock layer consulted before real secrets/HTTP/telemetry.
    mocks: Option<Arc<MockLayer>>,
    /// Secrets manager used for real secret lookups.
    secrets: DynSecretsManager,
    /// Optional OAuth broker configuration.
    oauth_config: Option<OAuthBrokerConfig>,
    /// OAuth broker host, default-constructed in `HostState::new`.
    oauth_host: OAuthBrokerHost,
}
178
179impl HostState {
180    #[allow(clippy::default_constructed_unit_structs)]
181    pub fn new(
182        config: Arc<HostConfig>,
183        http_client: Arc<BlockingClient>,
184        mocks: Option<Arc<MockLayer>>,
185        session_store: Option<DynSessionStore>,
186        state_store: Option<DynStateStore>,
187        secrets: DynSecretsManager,
188        oauth_config: Option<OAuthBrokerConfig>,
189    ) -> Result<Self> {
190        let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
191        Ok(Self {
192            config,
193            http_client,
194            default_env,
195            session_store,
196            state_store,
197            mocks,
198            secrets,
199            oauth_config,
200            oauth_host: OAuthBrokerHost::default(),
201        })
202    }
203
204    pub fn get_secret(&self, key: &str) -> Result<String> {
205        if provider_core_only::is_enabled() {
206            bail!(provider_core_only::blocked_message("secrets"))
207        }
208        if !self.config.secrets_policy.is_allowed(key) {
209            bail!("secret {key} is not permitted by bindings policy");
210        }
211        if let Some(mock) = &self.mocks
212            && let Some(value) = mock.secrets_lookup(key)
213        {
214            return Ok(value);
215        }
216        let bytes = read_secret_blocking(&self.secrets, key)
217            .context("failed to read secret from manager")?;
218        let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
219        Ok(value)
220    }
221
222    fn tenant_ctx_from_v1(&self, ctx: Option<StateTenantCtx>) -> Result<TypesTenantCtx> {
223        let tenant_raw = ctx
224            .as_ref()
225            .map(|ctx| ctx.tenant.clone())
226            .unwrap_or_else(|| self.config.tenant.clone());
227        let env_raw = ctx
228            .as_ref()
229            .map(|ctx| ctx.env.clone())
230            .unwrap_or_else(|| self.default_env.clone());
231        let tenant_id = TenantId::from_str(&tenant_raw)
232            .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
233        let env_id = EnvId::from_str(&env_raw)
234            .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
235        let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
236        if let Some(ctx) = ctx {
237            if let Some(team) = ctx.team.or(ctx.team_id) {
238                let team_id =
239                    TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
240                tenant_ctx = tenant_ctx.with_team(Some(team_id));
241            }
242            if let Some(user) = ctx.user.or(ctx.user_id) {
243                let user_id =
244                    UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
245                tenant_ctx = tenant_ctx.with_user(Some(user_id));
246            }
247            if let Some(flow) = ctx.flow_id {
248                tenant_ctx = tenant_ctx.with_flow(flow);
249            }
250            if let Some(node) = ctx.node_id {
251                tenant_ctx = tenant_ctx.with_node(node);
252            }
253            if let Some(provider) = ctx.provider_id {
254                tenant_ctx = tenant_ctx.with_provider(provider);
255            }
256            if let Some(session) = ctx.session_id {
257                tenant_ctx = tenant_ctx.with_session(session);
258            }
259            tenant_ctx.trace_id = ctx.trace_id;
260        }
261        Ok(tenant_ctx)
262    }
263
264    fn send_http_request(
265        &mut self,
266        req: HttpRequest,
267        opts: Option<HttpRequestOptionsV1_1>,
268        _ctx: Option<HttpTenantCtx>,
269    ) -> Result<HttpResponse, HttpClientError> {
270        if !self.config.http_enabled {
271            return Err(HttpClientError {
272                code: "denied".into(),
273                message: "http client disabled by policy".into(),
274            });
275        }
276
277        let mut mock_state = None;
278        let raw_body = req.body.clone();
279        if let Some(mock) = &self.mocks
280            && let Ok(meta) = HttpMockRequest::new(&req.method, &req.url, raw_body.as_deref())
281        {
282            match mock.http_begin(&meta) {
283                HttpDecision::Mock(response) => {
284                    let headers = response
285                        .headers
286                        .iter()
287                        .map(|(k, v)| (k.clone(), v.clone()))
288                        .collect();
289                    return Ok(HttpResponse {
290                        status: response.status,
291                        headers,
292                        body: response.body.clone().map(|b| b.into_bytes()),
293                    });
294                }
295                HttpDecision::Deny(reason) => {
296                    return Err(HttpClientError {
297                        code: "denied".into(),
298                        message: reason,
299                    });
300                }
301                HttpDecision::Passthrough { record } => {
302                    mock_state = Some((meta, record));
303                }
304            }
305        }
306
307        let method = req.method.parse().unwrap_or(reqwest::Method::GET);
308        let mut builder = self.http_client.request(method, &req.url);
309        for (key, value) in req.headers {
310            if let Ok(header) = reqwest::header::HeaderName::from_bytes(key.as_bytes())
311                && let Ok(header_value) = reqwest::header::HeaderValue::from_str(&value)
312            {
313                builder = builder.header(header, header_value);
314            }
315        }
316
317        if let Some(body) = raw_body.clone() {
318            builder = builder.body(body);
319        }
320
321        if let Some(opts) = opts {
322            if let Some(timeout_ms) = opts.timeout_ms {
323                builder = builder.timeout(Duration::from_millis(timeout_ms as u64));
324            }
325            if opts.allow_insecure == Some(true) {
326                warn!(url = %req.url, "allow-insecure not supported; using default TLS validation");
327            }
328            if let Some(follow_redirects) = opts.follow_redirects
329                && !follow_redirects
330            {
331                warn!(url = %req.url, "follow-redirects=false not supported; using default client behaviour");
332            }
333        }
334
335        let response = match builder.send() {
336            Ok(resp) => resp,
337            Err(err) => {
338                warn!(url = %req.url, error = %err, "http client request failed");
339                return Err(HttpClientError {
340                    code: "unavailable".into(),
341                    message: err.to_string(),
342                });
343            }
344        };
345
346        let status = response.status().as_u16();
347        let headers_vec = response
348            .headers()
349            .iter()
350            .map(|(k, v)| {
351                (
352                    k.as_str().to_string(),
353                    v.to_str().unwrap_or_default().to_string(),
354                )
355            })
356            .collect::<Vec<_>>();
357        let body_bytes = response.bytes().ok().map(|b| b.to_vec());
358
359        if let Some((meta, true)) = mock_state.take()
360            && let Some(mock) = &self.mocks
361        {
362            let recorded = HttpMockResponse::new(
363                status,
364                headers_vec.clone().into_iter().collect(),
365                body_bytes
366                    .as_ref()
367                    .map(|b| String::from_utf8_lossy(b).into_owned()),
368            );
369            mock.http_record(&meta, &recorded);
370        }
371
372        Ok(HttpResponse {
373            status,
374            headers: headers_vec,
375            body: body_bytes,
376        })
377    }
378}
379
380impl SecretsStoreHost for HostState {
381    fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsError> {
382        if provider_core_only::is_enabled() {
383            warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
384            return Err(SecretsError::Denied);
385        }
386        if !self.config.secrets_policy.is_allowed(&key) {
387            return Err(SecretsError::Denied);
388        }
389        if let Some(mock) = &self.mocks
390            && let Some(value) = mock.secrets_lookup(&key)
391        {
392            return Ok(Some(value.into_bytes()));
393        }
394        match read_secret_blocking(&self.secrets, &key) {
395            Ok(bytes) => Ok(Some(bytes)),
396            Err(err) => {
397                warn!(secret = %key, error = %err, "secret lookup failed");
398                Err(SecretsError::NotFound)
399            }
400        }
401    }
402}
403
404impl HttpClientHost for HostState {
405    fn send(
406        &mut self,
407        req: HttpRequest,
408        ctx: Option<HttpTenantCtx>,
409    ) -> Result<HttpResponse, HttpClientError> {
410        self.send_http_request(req, None, ctx)
411    }
412}
413
414impl HttpClientHostV1_1 for HostState {
415    fn send(
416        &mut self,
417        req: HttpRequestV1_1,
418        opts: Option<HttpRequestOptionsV1_1>,
419        ctx: Option<HttpTenantCtxV1_1>,
420    ) -> Result<HttpResponseV1_1, HttpClientErrorV1_1> {
421        let legacy_req = HttpRequest {
422            method: req.method,
423            url: req.url,
424            headers: req.headers,
425            body: req.body,
426        };
427        let legacy_ctx = ctx.map(|ctx| HttpTenantCtx {
428            env: ctx.env,
429            tenant: ctx.tenant,
430            tenant_id: ctx.tenant_id,
431            team: ctx.team,
432            team_id: ctx.team_id,
433            user: ctx.user,
434            user_id: ctx.user_id,
435            trace_id: ctx.trace_id,
436            correlation_id: ctx.correlation_id,
437            attributes: ctx.attributes,
438            session_id: ctx.session_id,
439            flow_id: ctx.flow_id,
440            node_id: ctx.node_id,
441            provider_id: ctx.provider_id,
442            deadline_ms: ctx.deadline_ms,
443            attempt: ctx.attempt,
444            idempotency_key: ctx.idempotency_key,
445            impersonation: ctx
446                .impersonation
447                .map(|ImpersonationV1_1 { actor_id, reason }| ImpersonationV1_0 {
448                    actor_id,
449                    reason,
450                }),
451        });
452
453        self.send_http_request(legacy_req, opts, legacy_ctx)
454            .map(|resp| HttpResponseV1_1 {
455                status: resp.status,
456                headers: resp.headers,
457                body: resp.body,
458            })
459            .map_err(|err| HttpClientErrorV1_1 {
460                code: err.code,
461                message: err.message,
462            })
463    }
464}
465
466impl StateStoreHost for HostState {
467    fn read(
468        &mut self,
469        key: HostStateKey,
470        ctx: Option<StateTenantCtx>,
471    ) -> Result<Vec<u8>, StateError> {
472        let store = match self.state_store.as_ref() {
473            Some(store) => store.clone(),
474            None => {
475                return Err(StateError {
476                    code: "unavailable".into(),
477                    message: "state store not configured".into(),
478                });
479            }
480        };
481        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
482            Ok(ctx) => ctx,
483            Err(err) => {
484                return Err(StateError {
485                    code: "invalid-ctx".into(),
486                    message: err.to_string(),
487                });
488            }
489        };
490        let key = StoreStateKey::from(key);
491        match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
492            Ok(Some(value)) => Ok(serde_json::to_vec(&value).unwrap_or_else(|_| Vec::new())),
493            Ok(None) => Err(StateError {
494                code: "not_found".into(),
495                message: "state key not found".into(),
496            }),
497            Err(err) => Err(StateError {
498                code: "internal".into(),
499                message: err.to_string(),
500            }),
501        }
502    }
503
504    fn write(
505        &mut self,
506        key: HostStateKey,
507        bytes: Vec<u8>,
508        ctx: Option<StateTenantCtx>,
509    ) -> Result<StateOpAck, StateError> {
510        let store = match self.state_store.as_ref() {
511            Some(store) => store.clone(),
512            None => {
513                return Err(StateError {
514                    code: "unavailable".into(),
515                    message: "state store not configured".into(),
516                });
517            }
518        };
519        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
520            Ok(ctx) => ctx,
521            Err(err) => {
522                return Err(StateError {
523                    code: "invalid-ctx".into(),
524                    message: err.to_string(),
525                });
526            }
527        };
528        let key = StoreStateKey::from(key);
529        let value = serde_json::from_slice(&bytes)
530            .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()));
531        match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
532            Ok(()) => Ok(StateOpAck::Ok),
533            Err(err) => Err(StateError {
534                code: "internal".into(),
535                message: err.to_string(),
536            }),
537        }
538    }
539
540    fn delete(
541        &mut self,
542        key: HostStateKey,
543        ctx: Option<StateTenantCtx>,
544    ) -> Result<StateOpAck, StateError> {
545        let store = match self.state_store.as_ref() {
546            Some(store) => store.clone(),
547            None => {
548                return Err(StateError {
549                    code: "unavailable".into(),
550                    message: "state store not configured".into(),
551                });
552            }
553        };
554        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
555            Ok(ctx) => ctx,
556            Err(err) => {
557                return Err(StateError {
558                    code: "invalid-ctx".into(),
559                    message: err.to_string(),
560                });
561            }
562        };
563        let key = StoreStateKey::from(key);
564        match store.del(&tenant_ctx, STATE_PREFIX, &key) {
565            Ok(_) => Ok(StateOpAck::Ok),
566            Err(err) => Err(StateError {
567                code: "internal".into(),
568                message: err.to_string(),
569            }),
570        }
571    }
572}
573
574impl TelemetryLoggerHost for HostState {
575    fn log(
576        &mut self,
577        span: TelemetrySpanContext,
578        fields: Vec<(String, String)>,
579        _ctx: Option<TelemetryTenantCtx>,
580    ) -> Result<TelemetryAck, TelemetryError> {
581        if let Some(mock) = &self.mocks
582            && mock.telemetry_drain(&[("span_json", span.flow_id.as_str())])
583        {
584            return Ok(TelemetryAck::Ok);
585        }
586        let mut map = serde_json::Map::new();
587        for (k, v) in fields {
588            map.insert(k, Value::String(v));
589        }
590        tracing::info!(
591            tenant = %span.tenant,
592            flow_id = %span.flow_id,
593            node = ?span.node_id,
594            provider = %span.provider,
595            fields = %serde_json::Value::Object(map.clone()),
596            "telemetry log from pack"
597        );
598        Ok(TelemetryAck::Ok)
599    }
600}
601
602impl RunnerHostHttp for HostState {
603    fn request(
604        &mut self,
605        method: String,
606        url: String,
607        headers: Vec<String>,
608        body: Option<Vec<u8>>,
609    ) -> Result<Vec<u8>, String> {
610        let req = HttpRequest {
611            method,
612            url,
613            headers: headers
614                .chunks(2)
615                .filter_map(|chunk| {
616                    if chunk.len() == 2 {
617                        Some((chunk[0].clone(), chunk[1].clone()))
618                    } else {
619                        None
620                    }
621                })
622                .collect(),
623            body,
624        };
625        match HttpClientHost::send(self, req, None) {
626            Ok(resp) => Ok(resp.body.unwrap_or_default()),
627            Err(err) => Err(err.message),
628        }
629    }
630}
631
/// Legacy runner-host key/value interface. This implementation is a stub:
/// nothing is stored and every lookup misses.
impl RunnerHostKv for HostState {
    /// Always returns `None`; the namespace and key are ignored.
    fn get(&mut self, _ns: String, _key: String) -> Option<String> {
        None
    }

    /// No-op; the value is discarded.
    fn put(&mut self, _ns: String, _key: String, _val: String) {}
}
639
640impl MessagingSessionHost for HostState {
641    fn send(
642        &mut self,
643        _message: OutboundMessage,
644        _ctx: MessagingTenantCtx,
645    ) -> Result<MessagingAck, MessagingError> {
646        Err(MessagingError {
647            code: "unimplemented".into(),
648            message: "messaging session host not wired".into(),
649        })
650    }
651}
652
/// Outcome of reading `manifest.cbor` from a pack archive, tagged by which
/// manifest schema decoded successfully.
enum ManifestLoad {
    /// The manifest decoded with `decode_pack_manifest`.
    New {
        /// The decoded manifest.
        manifest: Box<greentic_types::PackManifest>,
        /// Flows derived from the manifest.
        flows: PackFlows,
    },
    /// The manifest only decoded with the legacy `greentic_pack::builder` schema.
    Legacy {
        /// The decoded legacy manifest.
        manifest: Box<legacy_pack::PackManifest>,
        /// Flows loaded from the JSON files referenced by the legacy manifest.
        flows: PackFlows,
    },
}
663
664fn load_manifest_and_flows(path: &Path) -> Result<ManifestLoad> {
665    let mut archive = ZipArchive::new(File::open(path)?)
666        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
667    let bytes = read_entry(&mut archive, "manifest.cbor")
668        .with_context(|| format!("missing manifest.cbor in {}", path.display()))?;
669    match decode_pack_manifest(&bytes) {
670        Ok(manifest) => {
671            let cache = PackFlows::from_manifest(manifest.clone());
672            Ok(ManifestLoad::New {
673                manifest: Box::new(manifest),
674                flows: cache,
675            })
676        }
677        Err(err) => {
678            tracing::debug!(error = %err, pack = %path.display(), "decode_pack_manifest failed; trying legacy manifest");
679            // Fall back to legacy pack manifest
680            let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
681                .context("failed to decode legacy pack manifest from manifest.cbor")?;
682            let flows = load_legacy_flows(&mut archive, &legacy)?;
683            Ok(ManifestLoad::Legacy {
684                manifest: Box::new(legacy),
685                flows,
686            })
687        }
688    }
689}
690
691fn load_legacy_flows(
692    archive: &mut ZipArchive<File>,
693    manifest: &legacy_pack::PackManifest,
694) -> Result<PackFlows> {
695    let mut flows = HashMap::new();
696    let mut descriptors = Vec::new();
697
698    for entry in &manifest.flows {
699        let bytes = read_entry(archive, &entry.file_json)
700            .with_context(|| format!("missing flow json {}", entry.file_json))?;
701        let doc: FlowDoc = serde_json::from_slice(&bytes)
702            .with_context(|| format!("failed to decode flow doc {}", entry.file_json))?;
703        let normalized = normalize_flow_doc(doc);
704        let flow_ir = flow_doc_to_ir(normalized)?;
705        let flow = flow_ir_to_flow(flow_ir)?;
706
707        descriptors.push(FlowDescriptor {
708            id: entry.id.clone(),
709            flow_type: entry.kind.clone(),
710            profile: manifest.meta.pack_id.clone(),
711            version: manifest.meta.version.to_string(),
712            description: None,
713        });
714        flows.insert(entry.id.clone(), flow);
715    }
716
717    let mut entry_flows = manifest.meta.entry_flows.clone();
718    if entry_flows.is_empty() {
719        entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
720    }
721    let metadata = PackMetadata {
722        pack_id: manifest.meta.pack_id.clone(),
723        version: manifest.meta.version.to_string(),
724        entry_flows,
725        secret_requirements: Vec::new(),
726    };
727
728    Ok(PackFlows {
729        descriptors,
730        flows,
731        metadata,
732    })
733}
734
/// Per-instance store data for a component: the host services plus the WASI
/// context and resource table exposed through `WasiView`.
pub struct ComponentState {
    /// Host services reachable from the component.
    pub host: HostState,
    /// WASI context built from the runner's WASI policy.
    wasi_ctx: WasiCtx,
    /// Resource table handed out alongside the WASI context.
    resource_table: ResourceTable,
}
740
741impl ComponentState {
742    pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
743        let wasi_ctx = policy
744            .instantiate()
745            .context("failed to build WASI context")?;
746        Ok(Self {
747            host,
748            wasi_ctx,
749            resource_table: ResourceTable::new(),
750        })
751    }
752
753    fn host_mut(&mut self) -> &mut HostState {
754        &mut self.host
755    }
756}
757
/// Component control interface. Cancellation and yielding are not
/// implemented: the component is never asked to cancel and yields do nothing.
impl control::Host for ComponentState {
    /// Always reports that no cancellation was requested.
    fn should_cancel(&mut self) -> bool {
        false
    }

    /// Accepts the yield request without doing anything.
    fn yield_now(&mut self) {
        // no-op cooperative yield
    }
}
767
768fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
769    let mut inst = linker.instance("greentic:component/control@0.4.0")?;
770    inst.func_wrap(
771        "should-cancel",
772        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
773            let host = caller.data_mut();
774            Ok((ComponentControlHost::should_cancel(host),))
775        },
776    )?;
777    inst.func_wrap(
778        "yield-now",
779        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
780            let host = caller.data_mut();
781            ComponentControlHost::yield_now(host);
782            Ok(())
783        },
784    )?;
785    Ok(())
786}
787
/// Registers the host imports a component may use on `linker`: the
/// synchronous WASI bindings first, then every v1 host interface.
///
/// Each enabled interface is served by the `HostState` embedded in
/// `ComponentState`. The OAuth broker is left unregistered here
/// (`oauth_broker: None`).
///
/// # Errors
///
/// Propagates any failure from either registration step.
pub fn register_all(linker: &mut Linker<ComponentState>) -> Result<()> {
    add_wasi_to_linker(linker)?;
    add_all_v1_to_linker(
        linker,
        HostFns {
            http_client_v1_1: Some(|state| state.host_mut()),
            http_client: Some(|state| state.host_mut()),
            // Not registered through this path.
            oauth_broker: None,
            runner_host_http: Some(|state| state.host_mut()),
            runner_host_kv: Some(|state| state.host_mut()),
            messaging_session: Some(|state| state.host_mut()),
            telemetry_logger: Some(|state| state.host_mut()),
            state_store: Some(|state| state.host_mut()),
            secrets_store: Some(|state| state.host_mut()),
        },
    )?;
    Ok(())
}
806
/// Exposes the tenant, environment and OAuth broker pieces of the embedded
/// `HostState` to the OAuth host glue.
impl OAuthHostContext for ComponentState {
    /// Tenant taken from the host configuration.
    fn tenant_id(&self) -> &str {
        &self.host.config.tenant
    }

    /// Default environment recorded on the host state.
    fn env(&self) -> &str {
        &self.host.default_env
    }

    /// Mutable access to the OAuth broker host.
    fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
        &mut self.host.oauth_host
    }

    /// OAuth broker configuration, if one was supplied.
    fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
        self.host.oauth_config.as_ref()
    }
}
824
/// Gives the WASI bindings access to this instance's WASI context and
/// resource table.
impl WasiView for ComponentState {
    /// Borrows the WASI context and resource table together as one view.
    fn ctx(&mut self) -> WasiCtxView<'_> {
        WasiCtxView {
            ctx: &mut self.wasi_ctx,
            table: &mut self.resource_table,
        }
    }
}
833
// SAFETY: NOTE(review) — no justification is recorded in this file. These
// impls assert that every field of `ComponentState` (the `HostState` with its
// stores, mock layer and OAuth host, plus `WasiCtx` and `ResourceTable`) may
// be moved to and shared between threads. Confirm that against the field
// types before relying on it.
#[allow(unsafe_code)]
unsafe impl Send for ComponentState {}
#[allow(unsafe_code)]
unsafe impl Sync for ComponentState {}
838
839impl PackRuntime {
840    #[allow(clippy::too_many_arguments)]
841    pub async fn load(
842        path: impl AsRef<Path>,
843        config: Arc<HostConfig>,
844        mocks: Option<Arc<MockLayer>>,
845        archive_source: Option<&Path>,
846        session_store: Option<DynSessionStore>,
847        state_store: Option<DynStateStore>,
848        wasi_policy: Arc<RunnerWasiPolicy>,
849        secrets: DynSecretsManager,
850        oauth_config: Option<OAuthBrokerConfig>,
851        verify_archive: bool,
852    ) -> Result<Self> {
853        let path = path.as_ref();
854        let (_pack_root, safe_path) = normalize_pack_path(path)?;
855        let is_component = safe_path
856            .extension()
857            .and_then(|ext| ext.to_str())
858            .map(|ext| ext.eq_ignore_ascii_case("wasm"))
859            .unwrap_or(false);
860        let archive_hint_path = if let Some(source) = archive_source {
861            let (_, normalized) = normalize_pack_path(source)?;
862            Some(normalized)
863        } else if is_component {
864            None
865        } else {
866            Some(safe_path.clone())
867        };
868        let archive_hint = archive_hint_path.as_deref();
869        if verify_archive {
870            let verify_target = archive_hint.unwrap_or(&safe_path);
871            verify::verify_pack(verify_target).await?;
872            tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
873        }
874        let engine = Engine::default();
875        let wasm_bytes = fs::read(&safe_path).await?;
876        let mut metadata = PackMetadata::from_wasm(&wasm_bytes)
877            .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
878        let mut manifest = None;
879        let mut legacy_manifest: Option<Box<legacy_pack::PackManifest>> = None;
880        let flows = if let Some(archive_path) = archive_hint {
881            match load_manifest_and_flows(archive_path) {
882                Ok(ManifestLoad::New {
883                    manifest: m,
884                    flows: cache,
885                }) => {
886                    metadata = cache.metadata.clone();
887                    manifest = Some(*m);
888                    Some(cache)
889                }
890                Ok(ManifestLoad::Legacy {
891                    manifest: m,
892                    flows: cache,
893                }) => {
894                    metadata = cache.metadata.clone();
895                    legacy_manifest = Some(m);
896                    Some(cache)
897                }
898                Err(err) => {
899                    warn!(error = %err, pack = %archive_path.display(), "failed to parse pack manifest; skipping flows");
900                    None
901                }
902            }
903        } else {
904            None
905        };
906        let components = if let Some(archive_path) = archive_hint {
907            if let Some(new_manifest) = manifest.as_ref() {
908                match load_components_from_archive(&engine, archive_path, Some(new_manifest)) {
909                    Ok(map) => map,
910                    Err(err) => {
911                        warn!(error = %err, pack = %archive_path.display(), "failed to load components from archive");
912                        HashMap::new()
913                    }
914                }
915            } else if let Some(legacy) = legacy_manifest.as_ref() {
916                match load_legacy_components_from_archive(&engine, archive_path, legacy) {
917                    Ok(map) => map,
918                    Err(err) => {
919                        warn!(error = %err, pack = %archive_path.display(), "failed to load components from archive");
920                        HashMap::new()
921                    }
922                }
923            } else {
924                HashMap::new()
925            }
926        } else if is_component {
927            let name = safe_path
928                .file_stem()
929                .map(|s| s.to_string_lossy().to_string())
930                .unwrap_or_else(|| "component".to_string());
931            let component = Component::from_binary(&engine, &wasm_bytes)?;
932            let mut map = HashMap::new();
933            map.insert(
934                name.clone(),
935                PackComponent {
936                    name,
937                    version: metadata.version.clone(),
938                    component,
939                },
940            );
941            map
942        } else {
943            HashMap::new()
944        };
945        let http_client = Arc::clone(&HTTP_CLIENT);
946        Ok(Self {
947            path: safe_path,
948            archive_path: archive_hint.map(Path::to_path_buf),
949            config,
950            engine,
951            metadata,
952            manifest,
953            legacy_manifest,
954            mocks,
955            flows,
956            components,
957            http_client,
958            pre_cache: Mutex::new(HashMap::new()),
959            session_store,
960            state_store,
961            wasi_policy,
962            provider_registry: RwLock::new(None),
963            secrets,
964            oauth_config,
965        })
966    }
967
968    pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
969        if let Some(cache) = &self.flows {
970            return Ok(cache.descriptors.clone());
971        }
972        if let Some(manifest) = &self.manifest {
973            let descriptors = manifest
974                .flows
975                .iter()
976                .map(|flow| FlowDescriptor {
977                    id: flow.id.as_str().to_string(),
978                    flow_type: flow_kind_to_str(flow.kind).to_string(),
979                    profile: manifest.pack_id.as_str().to_string(),
980                    version: manifest.version.to_string(),
981                    description: None,
982                })
983                .collect();
984            return Ok(descriptors);
985        }
986        Ok(Vec::new())
987    }
988
989    #[allow(dead_code)]
990    pub async fn run_flow(
991        &self,
992        flow_id: &str,
993        input: serde_json::Value,
994    ) -> Result<serde_json::Value> {
995        let pack = Arc::new(
996            PackRuntime::load(
997                &self.path,
998                Arc::clone(&self.config),
999                self.mocks.clone(),
1000                self.archive_path.as_deref(),
1001                self.session_store.clone(),
1002                self.state_store.clone(),
1003                Arc::clone(&self.wasi_policy),
1004                self.secrets.clone(),
1005                self.oauth_config.clone(),
1006                false,
1007            )
1008            .await?,
1009        );
1010
1011        let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&self.config)).await?;
1012        let retry_config = self.config.retry_config().into();
1013        let mocks = pack.mocks.as_deref();
1014        let tenant = self.config.tenant.as_str();
1015
1016        let ctx = FlowContext {
1017            tenant,
1018            flow_id,
1019            node_id: None,
1020            tool: None,
1021            action: None,
1022            session_id: None,
1023            provider_id: None,
1024            retry_config,
1025            observer: None,
1026            mocks,
1027        };
1028
1029        let execution = engine.execute(ctx, input).await?;
1030        match execution.status {
1031            FlowStatus::Completed => Ok(execution.output),
1032            FlowStatus::Waiting(wait) => Ok(serde_json::json!({
1033                "status": "pending",
1034                "reason": wait.reason,
1035                "resume": wait.snapshot,
1036                "response": execution.output,
1037            })),
1038        }
1039    }
1040
1041    pub async fn invoke_component(
1042        &self,
1043        component_ref: &str,
1044        ctx: ComponentExecCtx,
1045        operation: &str,
1046        _config_json: Option<String>,
1047        input_json: String,
1048    ) -> Result<Value> {
1049        let pack_component = self
1050            .components
1051            .get(component_ref)
1052            .with_context(|| format!("component '{component_ref}' not found in pack"))?;
1053
1054        let mut linker = Linker::new(&self.engine);
1055        register_all(&mut linker)?;
1056        add_component_control_to_linker(&mut linker)?;
1057        let pre_instance = linker.instantiate_pre(&pack_component.component)?;
1058        let pre: ComponentPre<ComponentState> = ComponentPre::new(pre_instance)?;
1059
1060        let host_state = HostState::new(
1061            Arc::clone(&self.config),
1062            Arc::clone(&self.http_client),
1063            self.mocks.clone(),
1064            self.session_store.clone(),
1065            self.state_store.clone(),
1066            Arc::clone(&self.secrets),
1067            self.oauth_config.clone(),
1068        )?;
1069        let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1070        let mut store = wasmtime::Store::new(&self.engine, store_state);
1071        let bindings: crate::component_api::Component = pre.instantiate_async(&mut store).await?;
1072        let node = bindings.greentic_component_node();
1073
1074        let result = node.call_invoke(&mut store, &ctx, operation, &input_json)?;
1075
1076        match result {
1077            InvokeResult::Ok(body) => {
1078                if body.is_empty() {
1079                    return Ok(Value::Null);
1080                }
1081                serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
1082            }
1083            InvokeResult::Err(NodeError {
1084                code,
1085                message,
1086                retryable,
1087                backoff_ms,
1088                details,
1089            }) => {
1090                let mut obj = serde_json::Map::new();
1091                obj.insert("ok".into(), Value::Bool(false));
1092                let mut error = serde_json::Map::new();
1093                error.insert("code".into(), Value::String(code));
1094                error.insert("message".into(), Value::String(message));
1095                error.insert("retryable".into(), Value::Bool(retryable));
1096                if let Some(backoff) = backoff_ms {
1097                    error.insert("backoff_ms".into(), Value::Number(backoff.into()));
1098                }
1099                if let Some(details) = details {
1100                    error.insert(
1101                        "details".into(),
1102                        serde_json::from_str(&details).unwrap_or(Value::String(details)),
1103                    );
1104                }
1105                obj.insert("error".into(), Value::Object(error));
1106                Ok(Value::Object(obj))
1107            }
1108        }
1109    }
1110
1111    pub fn resolve_provider(
1112        &self,
1113        provider_id: Option<&str>,
1114        provider_type: Option<&str>,
1115    ) -> Result<ProviderBinding> {
1116        let registry = self.provider_registry()?;
1117        registry.resolve(provider_id, provider_type)
1118    }
1119
1120    pub async fn invoke_provider(
1121        &self,
1122        binding: &ProviderBinding,
1123        _ctx: ComponentExecCtx,
1124        op: &str,
1125        input_json: Vec<u8>,
1126    ) -> Result<Value> {
1127        let component_ref = &binding.component_ref;
1128        let pack_component = self
1129            .components
1130            .get(component_ref)
1131            .with_context(|| format!("provider component '{component_ref}' not found in pack"))?;
1132
1133        let mut linker = Linker::new(&self.engine);
1134        register_all(&mut linker)?;
1135        add_component_control_to_linker(&mut linker)?;
1136        let pre_instance = linker.instantiate_pre(&pack_component.component)?;
1137        let pre: ProviderComponentPre<ComponentState> = ProviderComponentPre::new(pre_instance)?;
1138
1139        let host_state = HostState::new(
1140            Arc::clone(&self.config),
1141            Arc::clone(&self.http_client),
1142            self.mocks.clone(),
1143            self.session_store.clone(),
1144            self.state_store.clone(),
1145            Arc::clone(&self.secrets),
1146            self.oauth_config.clone(),
1147        )?;
1148        let store_state = ComponentState::new(host_state, Arc::clone(&self.wasi_policy))?;
1149        let mut store = wasmtime::Store::new(&self.engine, store_state);
1150        let bindings: crate::provider_core::SchemaCore = pre.instantiate_async(&mut store).await?;
1151        let provider = bindings.greentic_provider_core_schema_core_api();
1152
1153        let result = provider.call_invoke(&mut store, op, &input_json)?;
1154        deserialize_json_bytes(result)
1155    }
1156
1157    fn provider_registry(&self) -> Result<ProviderRegistry> {
1158        if let Some(registry) = self.provider_registry.read().clone() {
1159            return Ok(registry);
1160        }
1161        let manifest = self
1162            .manifest
1163            .as_ref()
1164            .context("pack manifest required for provider resolution")?;
1165        let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
1166        let registry = ProviderRegistry::new(
1167            manifest,
1168            self.state_store.clone(),
1169            &self.config.tenant,
1170            &env,
1171        )?;
1172        *self.provider_registry.write() = Some(registry.clone());
1173        Ok(registry)
1174    }
1175
1176    pub fn load_flow(&self, flow_id: &str) -> Result<Flow> {
1177        if let Some(cache) = &self.flows {
1178            return cache
1179                .flows
1180                .get(flow_id)
1181                .cloned()
1182                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
1183        }
1184        if let Some(manifest) = &self.manifest {
1185            let entry = manifest
1186                .flows
1187                .iter()
1188                .find(|f| f.id.as_str() == flow_id)
1189                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
1190            return Ok(entry.flow.clone());
1191        }
1192        bail!("flow '{flow_id}' not available (pack exports disabled)")
1193    }
1194
1195    pub fn metadata(&self) -> &PackMetadata {
1196        &self.metadata
1197    }
1198
1199    pub fn required_secrets(&self) -> &[greentic_types::SecretRequirement] {
1200        &self.metadata.secret_requirements
1201    }
1202
1203    pub fn missing_secrets(
1204        &self,
1205        tenant_ctx: &TypesTenantCtx,
1206    ) -> Vec<greentic_types::SecretRequirement> {
1207        let env = tenant_ctx.env.as_str().to_string();
1208        let tenant = tenant_ctx.tenant.as_str().to_string();
1209        let team = tenant_ctx.team.as_ref().map(|t| t.as_str().to_string());
1210        self.required_secrets()
1211            .iter()
1212            .filter(|req| {
1213                // scope must match current context if provided
1214                if let Some(scope) = &req.scope {
1215                    if scope.env != env {
1216                        return false;
1217                    }
1218                    if scope.tenant != tenant {
1219                        return false;
1220                    }
1221                    if let Some(ref team_req) = scope.team
1222                        && team.as_ref() != Some(team_req)
1223                    {
1224                        return false;
1225                    }
1226                }
1227                read_secret_blocking(&self.secrets, req.key.as_str()).is_err()
1228            })
1229            .cloned()
1230            .collect()
1231    }
1232
1233    pub fn for_component_test(
1234        components: Vec<(String, PathBuf)>,
1235        flows: HashMap<String, FlowIR>,
1236        config: Arc<HostConfig>,
1237    ) -> Result<Self> {
1238        let engine = Engine::default();
1239        let mut component_map = HashMap::new();
1240        for (name, path) in components {
1241            if !path.exists() {
1242                bail!("component artifact missing: {}", path.display());
1243            }
1244            let wasm_bytes = std::fs::read(&path)?;
1245            let component = Component::from_binary(&engine, &wasm_bytes)
1246                .with_context(|| format!("failed to compile component {}", path.display()))?;
1247            component_map.insert(
1248                name.clone(),
1249                PackComponent {
1250                    name,
1251                    version: "0.0.0".into(),
1252                    component,
1253                },
1254            );
1255        }
1256
1257        let mut flow_map = HashMap::new();
1258        let mut descriptors = Vec::new();
1259        for (id, ir) in flows {
1260            let flow_type = ir.flow_type.clone();
1261            let flow = flow_ir_to_flow(ir)?;
1262            flow_map.insert(id.clone(), flow);
1263            descriptors.push(FlowDescriptor {
1264                id: id.clone(),
1265                flow_type,
1266                profile: "test".into(),
1267                version: "0.0.0".into(),
1268                description: None,
1269            });
1270        }
1271        let flows_cache = PackFlows {
1272            descriptors: descriptors.clone(),
1273            flows: flow_map,
1274            metadata: PackMetadata::fallback(Path::new("component-test")),
1275        };
1276
1277        Ok(Self {
1278            path: PathBuf::new(),
1279            archive_path: None,
1280            config,
1281            engine,
1282            metadata: PackMetadata::fallback(Path::new("component-test")),
1283            manifest: None,
1284            legacy_manifest: None,
1285            mocks: None,
1286            flows: Some(flows_cache),
1287            components: component_map,
1288            http_client: Arc::clone(&HTTP_CLIENT),
1289            pre_cache: Mutex::new(HashMap::new()),
1290            session_store: None,
1291            state_store: None,
1292            wasi_policy: Arc::new(RunnerWasiPolicy::new()),
1293            provider_registry: RwLock::new(None),
1294            secrets: crate::secrets::default_manager(),
1295            oauth_config: None,
1296        })
1297    }
1298}
1299
/// Flow data held by a loaded pack: descriptors, flow definitions and pack metadata.
struct PackFlows {
    /// Descriptors handed out by `PackRuntime::list_flows`.
    descriptors: Vec<FlowDescriptor>,
    /// Flow definitions keyed by flow id; looked up by `PackRuntime::load_flow`.
    flows: HashMap<String, Flow>,
    /// Pack metadata associated with these flows.
    metadata: PackMetadata,
}
1305
1306fn deserialize_json_bytes(bytes: Vec<u8>) -> Result<Value> {
1307    if bytes.is_empty() {
1308        return Ok(Value::Null);
1309    }
1310    serde_json::from_slice(&bytes).or_else(|_| {
1311        String::from_utf8(bytes)
1312            .map(Value::String)
1313            .map_err(|err| anyhow!(err))
1314    })
1315}
1316
1317impl PackFlows {
1318    fn from_manifest(manifest: greentic_types::PackManifest) -> Self {
1319        let descriptors = manifest
1320            .flows
1321            .iter()
1322            .map(|entry| FlowDescriptor {
1323                id: entry.id.as_str().to_string(),
1324                flow_type: flow_kind_to_str(entry.kind).to_string(),
1325                profile: manifest.pack_id.as_str().to_string(),
1326                version: manifest.version.to_string(),
1327                description: None,
1328            })
1329            .collect();
1330        let mut flows = HashMap::new();
1331        for entry in &manifest.flows {
1332            flows.insert(entry.id.as_str().to_string(), entry.flow.clone());
1333        }
1334        Self {
1335            metadata: PackMetadata::from_manifest(&manifest),
1336            descriptors,
1337            flows,
1338        }
1339    }
1340}
1341
1342fn flow_kind_to_str(kind: greentic_types::FlowKind) -> &'static str {
1343    match kind {
1344        greentic_types::FlowKind::Messaging => "messaging",
1345        greentic_types::FlowKind::Event => "event",
1346        greentic_types::FlowKind::ComponentConfig => "component-config",
1347        greentic_types::FlowKind::Job => "job",
1348        greentic_types::FlowKind::Http => "http",
1349    }
1350}
1351
1352fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
1353    let mut file = archive
1354        .by_name(name)
1355        .with_context(|| format!("entry {name} missing from archive"))?;
1356    let mut buf = Vec::new();
1357    file.read_to_end(&mut buf)?;
1358    Ok(buf)
1359}
1360
1361fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
1362    for node in doc.nodes.values_mut() {
1363        if node.component.is_empty()
1364            && let Some((component_ref, payload)) = node.raw.iter().next()
1365        {
1366            if component_ref.starts_with("emit.") {
1367                node.component = component_ref.clone();
1368                node.payload = payload.clone();
1369                node.raw.clear();
1370                continue;
1371            }
1372            let (target_component, operation, input, config) =
1373                infer_component_exec(payload, component_ref);
1374            let mut payload_obj = serde_json::Map::new();
1375            // component.exec is meta; ensure the payload carries the actual target component.
1376            payload_obj.insert("component".into(), Value::String(target_component));
1377            payload_obj.insert("operation".into(), Value::String(operation));
1378            payload_obj.insert("input".into(), input);
1379            if let Some(cfg) = config {
1380                payload_obj.insert("config".into(), cfg);
1381            }
1382            node.component = "component.exec".to_string();
1383            node.payload = Value::Object(payload_obj);
1384        }
1385    }
1386    doc
1387}
1388
1389fn infer_component_exec(
1390    payload: &Value,
1391    component_ref: &str,
1392) -> (String, String, Value, Option<Value>) {
1393    let default_op = if component_ref.starts_with("templating.") {
1394        "render"
1395    } else {
1396        "invoke"
1397    }
1398    .to_string();
1399
1400    if let Value::Object(map) = payload {
1401        let op = map
1402            .get("op")
1403            .or_else(|| map.get("operation"))
1404            .and_then(Value::as_str)
1405            .map(|s| s.to_string())
1406            .unwrap_or_else(|| default_op.clone());
1407
1408        let mut input = map.clone();
1409        let config = input.remove("config");
1410        let component = input
1411            .get("component")
1412            .or_else(|| input.get("component_ref"))
1413            .and_then(Value::as_str)
1414            .map(|s| s.to_string())
1415            .unwrap_or_else(|| component_ref.to_string());
1416        input.remove("component");
1417        input.remove("component_ref");
1418        input.remove("op");
1419        input.remove("operation");
1420        return (component, op, Value::Object(input), config);
1421    }
1422
1423    (component_ref.to_string(), default_op, payload.clone(), None)
1424}
1425
#[cfg(test)]
mod tests {
    use super::*;
    use greentic_flow::model::{FlowDoc, NodeDoc};
    use serde_json::json;
    use std::collections::BTreeMap;

    /// A node that only carries a raw `templating.*` entry is rewritten into a
    /// `component.exec` node whose payload names the real component, the `render` operation
    /// and the original payload as input.
    #[test]
    fn normalizes_raw_component_to_component_exec() {
        let component_ref = "templating.handlebars";
        let template = json!({ "template": "Hi {{name}}" });

        let mut raw = BTreeMap::new();
        raw.insert(component_ref.into(), template.clone());
        let start_node = NodeDoc {
            raw,
            routing: json!([{"out": true}]),
            ..Default::default()
        };
        let mut nodes = BTreeMap::new();
        nodes.insert("start".into(), start_node);

        let normalized = normalize_flow_doc(FlowDoc {
            id: "welcome".into(),
            title: None,
            description: None,
            flow_type: "messaging".into(),
            start: Some("start".into()),
            parameters: json!({}),
            tags: Vec::new(),
            entrypoints: BTreeMap::new(),
            nodes,
        });

        let node = normalized.nodes.get("start").expect("node exists");
        assert_eq!(node.component, "component.exec");
        // Either outcome for the raw map is accepted: cleared, or the entry left in place.
        assert!(node.raw.is_empty() || node.raw.contains_key(component_ref));

        let payload = node.payload.as_object().expect("payload object");
        assert_eq!(
            payload.get("component"),
            Some(&Value::String(component_ref.into()))
        );
        assert_eq!(
            payload.get("operation"),
            Some(&Value::String("render".into()))
        );
        assert_eq!(payload.get("input"), Some(&template));
    }
}
1478
1479fn load_components_from_archive(
1480    engine: &Engine,
1481    path: &Path,
1482    manifest: Option<&greentic_types::PackManifest>,
1483) -> Result<HashMap<String, PackComponent>> {
1484    let mut archive = ZipArchive::new(File::open(path)?)
1485        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
1486    let mut components = HashMap::new();
1487    if let Some(manifest) = manifest {
1488        for entry in &manifest.components {
1489            let file_name = format!("components/{}.wasm", entry.id.as_str());
1490            let bytes = read_entry(&mut archive, &file_name)
1491                .with_context(|| format!("missing component {}", file_name))?;
1492            let component = Component::from_binary(engine, &bytes)
1493                .with_context(|| format!("failed to compile component {}", entry.id.as_str()))?;
1494            components.insert(
1495                entry.id.as_str().to_string(),
1496                PackComponent {
1497                    name: entry.id.as_str().to_string(),
1498                    version: entry.version.to_string(),
1499                    component,
1500                },
1501            );
1502        }
1503    }
1504    Ok(components)
1505}
1506
1507fn load_legacy_components_from_archive(
1508    engine: &Engine,
1509    path: &Path,
1510    manifest: &legacy_pack::PackManifest,
1511) -> Result<HashMap<String, PackComponent>> {
1512    let mut archive = ZipArchive::new(File::open(path)?)
1513        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
1514    let mut components = HashMap::new();
1515    for entry in &manifest.components {
1516        let bytes = read_entry(&mut archive, &entry.file_wasm)
1517            .with_context(|| format!("missing component {}", entry.file_wasm))?;
1518        let component = Component::from_binary(engine, &bytes)
1519            .with_context(|| format!("failed to compile component {}", entry.name))?;
1520        components.insert(
1521            entry.name.clone(),
1522            PackComponent {
1523                name: entry.name.clone(),
1524                version: entry.version.to_string(),
1525                component,
1526            },
1527        );
1528    }
1529    Ok(components)
1530}
1531
/// Descriptive metadata for a pack: its id, version, entry flows and secret requirements.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct PackMetadata {
    /// Identifier of the pack.
    pub pack_id: String,
    /// Version of the pack, stored as a string.
    pub version: String,
    /// Ids of the pack's entry flows; empty when absent from the serialized form.
    #[serde(default)]
    pub entry_flows: Vec<String>,
    /// Secrets the pack declares it needs; empty when absent from the serialized form.
    #[serde(default)]
    pub secret_requirements: Vec<greentic_types::SecretRequirement>,
}
1541
1542impl PackMetadata {
1543    fn from_wasm(bytes: &[u8]) -> Option<Self> {
1544        let parser = Parser::new(0);
1545        for payload in parser.parse_all(bytes) {
1546            let payload = payload.ok()?;
1547            match payload {
1548                Payload::CustomSection(section) => {
1549                    if section.name() == "greentic.manifest"
1550                        && let Ok(meta) = Self::from_bytes(section.data())
1551                    {
1552                        return Some(meta);
1553                    }
1554                }
1555                Payload::DataSection(reader) => {
1556                    for segment in reader.into_iter().flatten() {
1557                        if let Ok(meta) = Self::from_bytes(segment.data) {
1558                            return Some(meta);
1559                        }
1560                    }
1561                }
1562                _ => {}
1563            }
1564        }
1565        None
1566    }
1567
1568    fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
1569        #[derive(Deserialize)]
1570        struct RawManifest {
1571            pack_id: String,
1572            version: String,
1573            #[serde(default)]
1574            entry_flows: Vec<String>,
1575            #[serde(default)]
1576            flows: Vec<RawFlow>,
1577            #[serde(default)]
1578            secret_requirements: Vec<greentic_types::SecretRequirement>,
1579        }
1580
1581        #[derive(Deserialize)]
1582        struct RawFlow {
1583            id: String,
1584        }
1585
1586        let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
1587        let mut entry_flows = if manifest.entry_flows.is_empty() {
1588            manifest.flows.iter().map(|f| f.id.clone()).collect()
1589        } else {
1590            manifest.entry_flows.clone()
1591        };
1592        entry_flows.retain(|id| !id.is_empty());
1593        Ok(Self {
1594            pack_id: manifest.pack_id,
1595            version: manifest.version,
1596            entry_flows,
1597            secret_requirements: manifest.secret_requirements,
1598        })
1599    }
1600
1601    pub fn fallback(path: &Path) -> Self {
1602        let pack_id = path
1603            .file_stem()
1604            .map(|s| s.to_string_lossy().into_owned())
1605            .unwrap_or_else(|| "unknown-pack".to_string());
1606        Self {
1607            pack_id,
1608            version: "0.0.0".to_string(),
1609            entry_flows: Vec::new(),
1610            secret_requirements: Vec::new(),
1611        }
1612    }
1613
1614    pub fn from_manifest(manifest: &greentic_types::PackManifest) -> Self {
1615        let entry_flows = manifest
1616            .flows
1617            .iter()
1618            .map(|flow| flow.id.as_str().to_string())
1619            .collect::<Vec<_>>();
1620        Self {
1621            pack_id: manifest.pack_id.as_str().to_string(),
1622            version: manifest.version.to_string(),
1623            entry_flows,
1624            secret_requirements: manifest.secret_requirements.clone(),
1625        }
1626    }
1627}