// greentic_runner_host/pack.rs

1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::fs::File;
3use std::io::Read;
4use std::path::{Path, PathBuf};
5use std::str::FromStr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::cache::{ArtifactKey, CacheConfig, CacheManager, CpuPolicy, EngineProfile};
10use crate::component_api::{
11    self, node::ExecCtx as ComponentExecCtx, node::InvokeResult, node::NodeError,
12};
13use crate::oauth::{OAuthBrokerConfig, OAuthBrokerHost, OAuthHostContext};
14use crate::provider::{ProviderBinding, ProviderRegistry};
15use crate::provider_core::{
16    schema_core::SchemaCorePre as LegacySchemaCorePre,
17    schema_core_schema::SchemaCorePre as SchemaSchemaCorePre,
18};
19use crate::provider_core_only;
20use crate::runtime_wasmtime::{Component, Engine, InstancePre, Linker, ResourceTable};
21use anyhow::{Context, Result, anyhow, bail};
22use futures::executor::block_on;
23use greentic_distributor_client::dist::{DistClient, DistError, DistOptions};
24use greentic_interfaces_wasmtime::host_helpers::v1::{
25    self as host_v1, HostFns, add_all_v1_to_linker,
26    runner_host_http::RunnerHostHttp,
27    runner_host_kv::RunnerHostKv,
28    secrets_store::{SecretsError, SecretsErrorV1_1, SecretsStoreHost, SecretsStoreHostV1_1},
29    state_store::{
30        OpAck as StateOpAck, StateKey as HostStateKey, StateStoreError as StateError,
31        StateStoreHost, TenantCtx as StateTenantCtx,
32    },
33    telemetry_logger::{
34        OpAck as TelemetryAck, SpanContext as TelemetrySpanContext,
35        TelemetryLoggerError as TelemetryError, TelemetryLoggerHost,
36        TenantCtx as TelemetryTenantCtx,
37    },
38};
39use greentic_interfaces_wasmtime::http_client_client_v1_0::greentic::interfaces_types::types::Impersonation as ImpersonationV1_0;
40use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::http::http_client as http_client_client_alias;
41use greentic_interfaces_wasmtime::http_client_client_v1_1::greentic::interfaces_types::types::Impersonation as ImpersonationV1_1;
42use greentic_pack::builder as legacy_pack;
43use greentic_types::flow::FlowHasher;
44use greentic_types::{
45    ArtifactLocationV1, ComponentId, ComponentManifest, ComponentSourceRef, ComponentSourcesV1,
46    EXT_COMPONENT_SOURCES_V1, EnvId, ExtensionRef, Flow, FlowComponentRef, FlowId, FlowKind,
47    FlowMetadata, InputMapping, Node, NodeId, OutputMapping, Routing, StateKey as StoreStateKey,
48    TeamId, TelemetryHints, TenantCtx as TypesTenantCtx, TenantId, UserId, decode_pack_manifest,
49    pack_manifest::ExtensionInline,
50};
51use host_v1::http_client as host_http_client;
52use host_v1::http_client::{
53    HttpClientError, HttpClientErrorV1_1, HttpClientHost, HttpClientHostV1_1,
54    Request as HttpRequest, RequestOptionsV1_1 as HttpRequestOptionsV1_1,
55    RequestV1_1 as HttpRequestV1_1, Response as HttpResponse, ResponseV1_1 as HttpResponseV1_1,
56    TenantCtx as HttpTenantCtx, TenantCtxV1_1 as HttpTenantCtxV1_1,
57};
58use indexmap::IndexMap;
59use once_cell::sync::Lazy;
60use parking_lot::{Mutex, RwLock};
61use reqwest::blocking::Client as BlockingClient;
62use runner_core::normalize_under_root;
63use serde::{Deserialize, Serialize};
64use serde_cbor;
65use serde_json::{self, Value};
66use sha2::Digest;
67use tempfile::TempDir;
68use tokio::fs;
69use wasmparser::{Parser, Payload};
70use wasmtime::{Store, StoreContextMut};
71use zip::ZipArchive;
72
73use crate::runner::engine::{FlowContext, FlowEngine, FlowStatus};
74use crate::runner::flow_adapter::{FlowIR, flow_doc_to_ir, flow_ir_to_flow};
75use crate::runner::mocks::{HttpDecision, HttpMockRequest, HttpMockResponse, MockLayer};
76#[cfg(feature = "fault-injection")]
77use crate::testing::fault_injection::{FaultContext, FaultPoint, maybe_fail};
78
79use crate::config::HostConfig;
80use crate::fault;
81use crate::secrets::{DynSecretsManager, read_secret_blocking, write_secret_blocking};
82use crate::storage::state::STATE_PREFIX;
83use crate::storage::{DynSessionStore, DynStateStore};
84use crate::verify;
85use crate::wasi::{PreopenSpec, RunnerWasiPolicy};
86use tracing::warn;
87use wasmtime_wasi::p2::add_to_linker_sync as add_wasi_to_linker;
88use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
89
90use greentic_flow::model::FlowDoc;
91
/// Runtime state for one loaded pack: artifact locations, decoded manifests,
/// compiled components, and the host services made available to them.
// NOTE(review): `dead_code` is allowed on the whole struct; the code that reads
// these fields is not visible in this part of the file.
#[allow(dead_code)]
pub struct PackRuntime {
    /// Component artifact path (wasm file).
    path: PathBuf,
    /// Optional archive (.gtpack) used to load flows/manifests.
    archive_path: Option<PathBuf>,
    /// Shared host configuration.
    config: Arc<HostConfig>,
    /// Wasmtime engine handle (re-exported through `runtime_wasmtime`).
    engine: Engine,
    /// Pack metadata (`PackMetadata` is declared elsewhere in this file).
    metadata: PackMetadata,
    /// Pack manifest in the `greentic_types` format, when present.
    manifest: Option<greentic_types::PackManifest>,
    /// Pack manifest in the legacy `greentic_pack::builder` format, when present.
    legacy_manifest: Option<Box<legacy_pack::PackManifest>>,
    /// Component manifests keyed by a string (presumably the component id; confirm).
    component_manifests: HashMap<String, ComponentManifest>,
    /// Optional mock layer.
    mocks: Option<Arc<MockLayer>>,
    /// Flows loaded for this pack, when present.
    flows: Option<PackFlows>,
    /// Compiled components keyed by a string (presumably the component reference; confirm).
    components: HashMap<String, PackComponent>,
    /// Blocking HTTP client shared via `Arc`.
    http_client: Arc<BlockingClient>,
    /// Mutex-guarded map of pre-instantiated components.
    pre_cache: Mutex<HashMap<String, InstancePre<ComponentState>>>,
    /// Optional session store handle.
    session_store: Option<DynSessionStore>,
    /// Optional state store handle.
    state_store: Option<DynStateStore>,
    /// WASI policy shared via `Arc`.
    wasi_policy: Arc<RunnerWasiPolicy>,
    /// Temporary directory for assets, when one was created; `TempDir` removes
    /// the directory when dropped.
    assets_tempdir: Option<TempDir>,
    /// Provider registry behind a read/write lock; `None` until populated.
    provider_registry: RwLock<Option<ProviderRegistry>>,
    /// Secrets manager handle.
    secrets: DynSecretsManager,
    /// Optional OAuth broker configuration.
    oauth_config: Option<OAuthBrokerConfig>,
    /// Cache manager (see `crate::cache`).
    cache: CacheManager,
}
118
/// A compiled component together with its name and version.
struct PackComponent {
    /// Component name; currently unread, hence the lint allowance.
    #[allow(dead_code)]
    name: String,
    /// Component version string; currently unread, hence the lint allowance.
    #[allow(dead_code)]
    version: String,
    /// Compiled component, shared via `Arc`.
    component: Arc<Component>,
}
126
127fn run_on_wasi_thread<F, T>(task_name: &'static str, task: F) -> Result<T>
128where
129    F: FnOnce() -> Result<T> + Send + 'static,
130    T: Send + 'static,
131{
132    let builder = std::thread::Builder::new().name(format!("greentic-wasmtime-{task_name}"));
133    let handle = builder
134        .spawn(move || {
135            let pid = std::process::id();
136            let thread_id = std::thread::current().id();
137            let tokio_handle_present = tokio::runtime::Handle::try_current().is_ok();
138            tracing::info!(
139                event = "wasmtime.thread.start",
140                task = task_name,
141                pid,
142                thread_id = ?thread_id,
143                tokio_handle_present,
144                "starting Wasmtime thread"
145            );
146            task()
147        })
148        .context("failed to spawn Wasmtime thread")?;
149    handle
150        .join()
151        .map_err(|err| {
152            let reason = if let Some(msg) = err.downcast_ref::<&str>() {
153                msg.to_string()
154            } else if let Some(msg) = err.downcast_ref::<String>() {
155                msg.clone()
156            } else {
157                "unknown panic".to_string()
158            };
159            anyhow!("Wasmtime thread panicked: {reason}")
160        })
161        .and_then(|res| res)
162}
163
/// Options describing where component wasm artifacts may be resolved from:
/// a materialized pack directory, explicit per-component overrides, and the
/// remote-distribution cache settings.
///
/// `Default` yields no root, no overrides, online resolution, no cache
/// directory, and hashes required.
#[derive(Debug, Default, Clone)]
pub struct ComponentResolution {
    /// Root of a materialized pack directory containing `manifest.cbor` and `components/`.
    pub materialized_root: Option<PathBuf>,
    /// Explicit overrides mapping component id -> wasm path.
    pub overrides: HashMap<String, PathBuf>,
    /// If true, do not fetch remote components; require cached artifacts.
    pub dist_offline: bool,
    /// Optional cache directory for resolved remote components.
    pub dist_cache_dir: Option<PathBuf>,
    /// Allow bundled components without wasm_sha256 (dev-only escape hatch).
    pub allow_missing_hash: bool,
}
177
178fn build_blocking_client() -> BlockingClient {
179    std::thread::spawn(|| {
180        BlockingClient::builder()
181            .no_proxy()
182            .build()
183            .expect("blocking client")
184    })
185    .join()
186    .expect("client build thread panicked")
187}
188
189fn normalize_pack_path(path: &Path) -> Result<(PathBuf, PathBuf)> {
190    let (root, candidate) = if path.is_absolute() {
191        let parent = path
192            .parent()
193            .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
194        let root = parent
195            .canonicalize()
196            .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
197        let file = path
198            .file_name()
199            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
200        (root, PathBuf::from(file))
201    } else {
202        let cwd = std::env::current_dir().context("failed to resolve current directory")?;
203        let base = if let Some(parent) = path.parent() {
204            cwd.join(parent)
205        } else {
206            cwd
207        };
208        let root = base
209            .canonicalize()
210            .with_context(|| format!("failed to canonicalize {}", base.display()))?;
211        let file = path
212            .file_name()
213            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
214        (root, PathBuf::from(file))
215    };
216    let safe = normalize_under_root(&root, &candidate)?;
217    Ok((root, safe))
218}
219
/// Process-wide blocking HTTP client, built by `build_blocking_client` on first
/// access and shared through an `Arc`.
static HTTP_CLIENT: Lazy<Arc<BlockingClient>> = Lazy::new(|| Arc::new(build_blocking_client()));
221
/// Serializable summary of a flow.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowDescriptor {
    /// Flow identifier.
    pub id: String,
    /// Flow type; serialized under the key `type`.
    #[serde(rename = "type")]
    pub flow_type: String,
    /// Identifier of the pack this descriptor refers to.
    pub pack_id: String,
    /// Profile name.
    pub profile: String,
    /// Version string.
    pub version: String,
    /// Optional description; absent input deserializes to `None`.
    #[serde(default)]
    pub description: Option<String>,
}
233
/// Host-side state backing the host interfaces implemented in this file
/// (secrets store, HTTP client, state store).
pub struct HostState {
    /// Pack identifier; passed to the secrets helpers as the pack scope.
    #[allow(dead_code)]
    pack_id: String,
    /// Shared host configuration (HTTP enablement, secrets policy, tenant).
    config: Arc<HostConfig>,
    /// Blocking HTTP client used for outbound requests.
    http_client: Arc<BlockingClient>,
    /// Environment name used when a call supplies no tenant context; taken
    /// from `GREENTIC_ENV` in `new`, falling back to `local`.
    default_env: String,
    /// Optional session store handle.
    #[allow(dead_code)]
    session_store: Option<DynSessionStore>,
    /// Optional state store; state calls report `unavailable` when it is `None`.
    state_store: Option<DynStateStore>,
    /// Optional mock layer consulted before real secrets and HTTP calls.
    mocks: Option<Arc<MockLayer>>,
    /// Secrets manager handle.
    secrets: DynSecretsManager,
    /// Optional OAuth broker configuration.
    oauth_config: Option<OAuthBrokerConfig>,
    /// OAuth broker host state, default-constructed in `new`.
    oauth_host: OAuthBrokerHost,
    /// Execution context of the current component invocation, if any.
    exec_ctx: Option<ComponentExecCtx>,
    /// Reference of the component this state belongs to; `None` is logged as `<pack>`.
    component_ref: Option<String>,
    /// Whether the component is a provider-core component; such components may
    /// still write secrets in provider-core-only mode.
    provider_core_component: bool,
}
251
252impl HostState {
253    #[allow(clippy::default_constructed_unit_structs)]
254    #[allow(clippy::too_many_arguments)]
255    pub fn new(
256        pack_id: String,
257        config: Arc<HostConfig>,
258        http_client: Arc<BlockingClient>,
259        mocks: Option<Arc<MockLayer>>,
260        session_store: Option<DynSessionStore>,
261        state_store: Option<DynStateStore>,
262        secrets: DynSecretsManager,
263        oauth_config: Option<OAuthBrokerConfig>,
264        exec_ctx: Option<ComponentExecCtx>,
265        component_ref: Option<String>,
266        provider_core_component: bool,
267    ) -> Result<Self> {
268        let default_env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
269        Ok(Self {
270            pack_id,
271            config,
272            http_client,
273            default_env,
274            session_store,
275            state_store,
276            mocks,
277            secrets,
278            oauth_config,
279            oauth_host: OAuthBrokerHost::default(),
280            exec_ctx,
281            component_ref,
282            provider_core_component,
283        })
284    }
285
286    fn instantiate_component_result(
287        linker: &mut Linker<ComponentState>,
288        store: &mut Store<ComponentState>,
289        component: &Component,
290        ctx: &ComponentExecCtx,
291        operation: &str,
292        input_json: &str,
293    ) -> Result<InvokeResult> {
294        let pre_instance = linker.instantiate_pre(component)?;
295        match component_api::v0_5::ComponentPre::new(pre_instance) {
296            Ok(pre) => {
297                let result = block_on(async {
298                    let bindings = pre.instantiate_async(&mut *store).await?;
299                    let node = bindings.greentic_component_node();
300                    let ctx_v05 = component_api::exec_ctx_v0_5(ctx);
301                    let operation_owned = operation.to_string();
302                    let input_owned = input_json.to_string();
303                    node.call_invoke(&mut *store, &ctx_v05, &operation_owned, &input_owned)
304                })?;
305                Ok(component_api::invoke_result_from_v0_5(result))
306            }
307            Err(err) => {
308                if is_missing_node_export(&err, "0.5.0") {
309                    let pre_instance = linker.instantiate_pre(component)?;
310                    let pre: component_api::v0_4::ComponentPre<ComponentState> =
311                        component_api::v0_4::ComponentPre::new(pre_instance)?;
312                    let result = block_on(async {
313                        let bindings = pre.instantiate_async(&mut *store).await?;
314                        let node = bindings.greentic_component_node();
315                        let ctx_v04 = component_api::exec_ctx_v0_4(ctx);
316                        let operation_owned = operation.to_string();
317                        let input_owned = input_json.to_string();
318                        node.call_invoke(&mut *store, &ctx_v04, &operation_owned, &input_owned)
319                    })?;
320                    Ok(component_api::invoke_result_from_v0_4(result))
321                } else {
322                    Err(err)
323                }
324            }
325        }
326    }
327
328    fn convert_invoke_result(result: InvokeResult) -> Result<Value> {
329        match result {
330            InvokeResult::Ok(body) => {
331                if body.is_empty() {
332                    return Ok(Value::Null);
333                }
334                serde_json::from_str(&body).or_else(|_| Ok(Value::String(body)))
335            }
336            InvokeResult::Err(NodeError {
337                code,
338                message,
339                retryable,
340                backoff_ms,
341                details,
342            }) => {
343                let mut obj = serde_json::Map::new();
344                obj.insert("ok".into(), Value::Bool(false));
345                let mut error = serde_json::Map::new();
346                error.insert("code".into(), Value::String(code));
347                error.insert("message".into(), Value::String(message));
348                error.insert("retryable".into(), Value::Bool(retryable));
349                if let Some(backoff) = backoff_ms {
350                    error.insert("backoff_ms".into(), Value::Number(backoff.into()));
351                }
352                if let Some(details) = details {
353                    error.insert(
354                        "details".into(),
355                        serde_json::from_str(&details).unwrap_or(Value::String(details)),
356                    );
357                }
358                obj.insert("error".into(), Value::Object(error));
359                Ok(Value::Object(obj))
360            }
361        }
362    }
363
364    pub fn get_secret(&self, key: &str) -> Result<String> {
365        if provider_core_only::is_enabled() {
366            bail!(provider_core_only::blocked_message("secrets"))
367        }
368        if !self.config.secrets_policy.is_allowed(key) {
369            bail!("secret {key} is not permitted by bindings policy");
370        }
371        if let Some(mock) = &self.mocks
372            && let Some(value) = mock.secrets_lookup(key)
373        {
374            return Ok(value);
375        }
376        let ctx = self.config.tenant_ctx();
377        let bytes = read_secret_blocking(&self.secrets, &ctx, &self.pack_id, key)
378            .context("failed to read secret from manager")?;
379        let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
380        Ok(value)
381    }
382
383    fn allows_secret_write_in_provider_core_only(&self) -> bool {
384        self.provider_core_component || self.component_ref.is_none()
385    }
386
387    fn tenant_ctx_from_v1(&self, ctx: Option<StateTenantCtx>) -> Result<TypesTenantCtx> {
388        let tenant_raw = ctx
389            .as_ref()
390            .map(|ctx| ctx.tenant.clone())
391            .or_else(|| self.exec_ctx.as_ref().map(|ctx| ctx.tenant.tenant.clone()))
392            .unwrap_or_else(|| self.config.tenant.clone());
393        let env_raw = ctx
394            .as_ref()
395            .map(|ctx| ctx.env.clone())
396            .unwrap_or_else(|| self.default_env.clone());
397        let tenant_id = TenantId::from_str(&tenant_raw)
398            .with_context(|| format!("invalid tenant id `{tenant_raw}`"))?;
399        let env_id = EnvId::from_str(&env_raw)
400            .unwrap_or_else(|_| EnvId::from_str("local").expect("default env must be valid"));
401        let mut tenant_ctx = TypesTenantCtx::new(env_id, tenant_id);
402        if let Some(exec_ctx) = self.exec_ctx.as_ref() {
403            if let Some(team) = exec_ctx.tenant.team.as_ref() {
404                let team_id =
405                    TeamId::from_str(team).with_context(|| format!("invalid team id `{team}`"))?;
406                tenant_ctx = tenant_ctx.with_team(Some(team_id));
407            }
408            if let Some(user) = exec_ctx.tenant.user.as_ref() {
409                let user_id =
410                    UserId::from_str(user).with_context(|| format!("invalid user id `{user}`"))?;
411                tenant_ctx = tenant_ctx.with_user(Some(user_id));
412            }
413            tenant_ctx = tenant_ctx.with_flow(exec_ctx.flow_id.clone());
414            if let Some(node) = exec_ctx.node_id.as_ref() {
415                tenant_ctx = tenant_ctx.with_node(node.clone());
416            }
417            if let Some(session) = exec_ctx.tenant.correlation_id.as_ref() {
418                tenant_ctx = tenant_ctx.with_session(session.clone());
419            }
420            tenant_ctx.trace_id = exec_ctx.tenant.trace_id.clone();
421        }
422
423        if let Some(ctx) = ctx {
424            if let Some(team) = ctx.team.or(ctx.team_id) {
425                let team_id =
426                    TeamId::from_str(&team).with_context(|| format!("invalid team id `{team}`"))?;
427                tenant_ctx = tenant_ctx.with_team(Some(team_id));
428            }
429            if let Some(user) = ctx.user.or(ctx.user_id) {
430                let user_id =
431                    UserId::from_str(&user).with_context(|| format!("invalid user id `{user}`"))?;
432                tenant_ctx = tenant_ctx.with_user(Some(user_id));
433            }
434            if let Some(flow) = ctx.flow_id {
435                tenant_ctx = tenant_ctx.with_flow(flow);
436            }
437            if let Some(node) = ctx.node_id {
438                tenant_ctx = tenant_ctx.with_node(node);
439            }
440            if let Some(provider) = ctx.provider_id {
441                tenant_ctx = tenant_ctx.with_provider(provider);
442            }
443            if let Some(session) = ctx.session_id {
444                tenant_ctx = tenant_ctx.with_session(session);
445            }
446            tenant_ctx.trace_id = ctx.trace_id;
447        }
448        Ok(tenant_ctx)
449    }
450
    /// Sends an outbound HTTP request on behalf of a component, honouring the
    /// host policy and the optional mock layer.
    ///
    /// Steps:
    /// 1. Reject with `denied` when HTTP is disabled in the host config.
    /// 2. If a mock layer is present and the request can be described as an
    ///    `HttpMockRequest`, let the mock answer, deny, or pass the call
    ///    through (optionally recording the real response).
    /// 3. Build and send the request with the shared blocking client.
    /// 4. Return status, headers and body; record the response when the mock
    ///    layer asked for it.
    ///
    /// `opts` may set a timeout. `allow_insecure` and `follow_redirects = false`
    /// are only logged as unsupported. `_ctx` is currently unused.
    ///
    /// # Errors
    /// Returns `denied` for policy or mock denials and `unavailable` when the
    /// request fails to send.
    fn send_http_request(
        &mut self,
        req: HttpRequest,
        opts: Option<HttpRequestOptionsV1_1>,
        _ctx: Option<HttpTenantCtx>,
    ) -> Result<HttpResponse, HttpClientError> {
        if !self.config.http_enabled {
            return Err(HttpClientError {
                code: "denied".into(),
                message: "http client disabled by policy".into(),
            });
        }

        // Holds `(request metadata, record flag)` when the mock layer lets the
        // request through to the network.
        let mut mock_state = None;
        let raw_body = req.body.clone();
        // When the mock request cannot be constructed the mock layer is skipped
        // and the real request is sent.
        if let Some(mock) = &self.mocks
            && let Ok(meta) = HttpMockRequest::new(&req.method, &req.url, raw_body.as_deref())
        {
            match mock.http_begin(&meta) {
                HttpDecision::Mock(response) => {
                    let headers = response
                        .headers
                        .iter()
                        .map(|(k, v)| (k.clone(), v.clone()))
                        .collect();
                    return Ok(HttpResponse {
                        status: response.status,
                        headers,
                        body: response.body.clone().map(|b| b.into_bytes()),
                    });
                }
                HttpDecision::Deny(reason) => {
                    return Err(HttpClientError {
                        code: "denied".into(),
                        message: reason,
                    });
                }
                HttpDecision::Passthrough { record } => {
                    mock_state = Some((meta, record));
                }
            }
        }

        // NOTE(review): a method string that fails to parse is silently sent as GET.
        let method = req.method.parse().unwrap_or(reqwest::Method::GET);
        let mut builder = self.http_client.request(method, &req.url);
        // Headers whose name or value is not valid are dropped without error.
        for (key, value) in req.headers {
            if let Ok(header) = reqwest::header::HeaderName::from_bytes(key.as_bytes())
                && let Ok(header_value) = reqwest::header::HeaderValue::from_str(&value)
            {
                builder = builder.header(header, header_value);
            }
        }

        if let Some(body) = raw_body.clone() {
            builder = builder.body(body);
        }

        if let Some(opts) = opts {
            if let Some(timeout_ms) = opts.timeout_ms {
                builder = builder.timeout(Duration::from_millis(timeout_ms as u64));
            }
            // The two options below are acknowledged but not applied.
            if opts.allow_insecure == Some(true) {
                warn!(url = %req.url, "allow-insecure not supported; using default TLS validation");
            }
            if let Some(follow_redirects) = opts.follow_redirects
                && !follow_redirects
            {
                warn!(url = %req.url, "follow-redirects=false not supported; using default client behaviour");
            }
        }

        let response = match builder.send() {
            Ok(resp) => resp,
            Err(err) => {
                warn!(url = %req.url, error = %err, "http client request failed");
                return Err(HttpClientError {
                    code: "unavailable".into(),
                    message: err.to_string(),
                });
            }
        };

        let status = response.status().as_u16();
        // Header values that are not valid visible ASCII are reported as empty strings.
        let headers_vec = response
            .headers()
            .iter()
            .map(|(k, v)| {
                (
                    k.as_str().to_string(),
                    v.to_str().unwrap_or_default().to_string(),
                )
            })
            .collect::<Vec<_>>();
        // A failure while reading the body yields `None` rather than an error.
        let body_bytes = response.bytes().ok().map(|b| b.to_vec());

        // Record the real response only when the mock layer requested it.
        if let Some((meta, true)) = mock_state.take()
            && let Some(mock) = &self.mocks
        {
            let recorded = HttpMockResponse::new(
                status,
                headers_vec.clone().into_iter().collect(),
                body_bytes
                    .as_ref()
                    .map(|b| String::from_utf8_lossy(b).into_owned()),
            );
            mock.http_record(&meta, &recorded);
        }

        Ok(HttpResponse {
            status,
            headers: headers_vec,
            body: body_bytes,
        })
    }
565}
566
/// Normalizes a secret key supplied by a wasm guest into lower snake form.
///
/// Surrounding whitespace is trimmed, ASCII letters are lowercased, and every
/// character other than `a`-`z`, `0`-`9` or `_` (including any non-ASCII
/// character) is replaced by a single `_`. Runs of underscores are kept as-is.
fn canonicalize_wasm_secret_key(raw: &str) -> String {
    let trimmed = raw.trim();
    let mut canonical = String::with_capacity(trimmed.len());
    for original in trimmed.chars() {
        let lowered = original.to_ascii_lowercase();
        let keep = lowered.is_ascii_lowercase() || lowered.is_ascii_digit() || lowered == '_';
        canonical.push(if keep { lowered } else { '_' });
    }
    canonical
}
579
/// Unit tests for `canonicalize_wasm_secret_key`.
#[cfg(test)]
mod canonicalize_tests {
    use super::canonicalize_wasm_secret_key;

    // Uppercase ASCII letters are lowercased; underscores are kept.
    #[test]
    fn upper_snake_to_lower_snake() {
        assert_eq!(
            canonicalize_wasm_secret_key("TELEGRAM_BOT_TOKEN"),
            "telegram_bot_token"
        );
    }

    // Surrounding whitespace is trimmed and `-` becomes `_`.
    #[test]
    fn trim_and_replace_non_alphanumeric() {
        assert_eq!(
            canonicalize_wasm_secret_key("  webex-bot-token  "),
            "webex_bot_token"
        );
    }

    // Consecutive underscores are not collapsed.
    #[test]
    fn preserve_existing_lower_snake_with_extra_underscores() {
        assert_eq!(canonicalize_wasm_secret_key("MiXeD__Case"), "mixed__case");
    }
}
605
606impl SecretsStoreHost for HostState {
607    fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsError> {
608        if provider_core_only::is_enabled() {
609            warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
610            return Err(SecretsError::Denied);
611        }
612        if !self.config.secrets_policy.is_allowed(&key) {
613            return Err(SecretsError::Denied);
614        }
615        if let Some(mock) = &self.mocks
616            && let Some(value) = mock.secrets_lookup(&key)
617        {
618            return Ok(Some(value.into_bytes()));
619        }
620        let ctx = self.config.tenant_ctx();
621        let canonical_key = canonicalize_wasm_secret_key(&key);
622        match read_secret_blocking(&self.secrets, &ctx, &self.pack_id, &canonical_key) {
623            Ok(bytes) => Ok(Some(bytes)),
624            Err(err) => {
625                warn!(secret = %key, canonical = %canonical_key, error = %err, "secret lookup failed");
626                Err(SecretsError::NotFound)
627            }
628        }
629    }
630}
631
632impl SecretsStoreHostV1_1 for HostState {
633    fn get(&mut self, key: String) -> Result<Option<Vec<u8>>, SecretsErrorV1_1> {
634        if provider_core_only::is_enabled() {
635            warn!(secret = %key, "provider-core only mode enabled; blocking secrets store");
636            return Err(SecretsErrorV1_1::Denied);
637        }
638        if !self.config.secrets_policy.is_allowed(&key) {
639            return Err(SecretsErrorV1_1::Denied);
640        }
641        if let Some(mock) = &self.mocks
642            && let Some(value) = mock.secrets_lookup(&key)
643        {
644            return Ok(Some(value.into_bytes()));
645        }
646        let ctx = self.config.tenant_ctx();
647        let canonical_key = canonicalize_wasm_secret_key(&key);
648        match read_secret_blocking(&self.secrets, &ctx, &self.pack_id, &canonical_key) {
649            Ok(bytes) => Ok(Some(bytes)),
650            Err(err) => {
651                warn!(secret = %key, canonical = %canonical_key, error = %err, "secret lookup failed");
652                Err(SecretsErrorV1_1::NotFound)
653            }
654        }
655    }
656
657    fn put(&mut self, key: String, value: Vec<u8>) {
658        if key.trim().is_empty() {
659            warn!(secret = %key, "secret write blocked: empty key");
660            panic!("secret write denied for key {key}: invalid key");
661        }
662        if provider_core_only::is_enabled() && !self.allows_secret_write_in_provider_core_only() {
663            warn!(
664                secret = %key,
665                component = self.component_ref.as_deref().unwrap_or("<pack>"),
666                "provider-core only mode enabled; blocking secrets store write"
667            );
668            panic!("secret write denied for key {key}: provider-core-only mode");
669        }
670        if !self.config.secrets_policy.is_allowed(&key) {
671            warn!(secret = %key, "secret write denied by bindings policy");
672            panic!("secret write denied for key {key}: policy");
673        }
674        let ctx = self.config.tenant_ctx();
675        let canonical_key = canonicalize_wasm_secret_key(&key);
676        if let Err(err) =
677            write_secret_blocking(&self.secrets, &ctx, &self.pack_id, &canonical_key, &value)
678        {
679            warn!(secret = %key, canonical = %canonical_key, error = %err, "secret write failed");
680            panic!("secret write failed for key {key}");
681        }
682    }
683}
684
685impl HttpClientHost for HostState {
686    fn send(
687        &mut self,
688        req: HttpRequest,
689        ctx: Option<HttpTenantCtx>,
690    ) -> Result<HttpResponse, HttpClientError> {
691        self.send_http_request(req, None, ctx)
692    }
693}
694
695impl HttpClientHostV1_1 for HostState {
696    fn send(
697        &mut self,
698        req: HttpRequestV1_1,
699        opts: Option<HttpRequestOptionsV1_1>,
700        ctx: Option<HttpTenantCtxV1_1>,
701    ) -> Result<HttpResponseV1_1, HttpClientErrorV1_1> {
702        let legacy_req = HttpRequest {
703            method: req.method,
704            url: req.url,
705            headers: req.headers,
706            body: req.body,
707        };
708        let legacy_ctx = ctx.map(|ctx| HttpTenantCtx {
709            env: ctx.env,
710            tenant: ctx.tenant,
711            tenant_id: ctx.tenant_id,
712            team: ctx.team,
713            team_id: ctx.team_id,
714            user: ctx.user,
715            user_id: ctx.user_id,
716            trace_id: ctx.trace_id,
717            correlation_id: ctx.correlation_id,
718            attributes: ctx.attributes,
719            session_id: ctx.session_id,
720            flow_id: ctx.flow_id,
721            node_id: ctx.node_id,
722            provider_id: ctx.provider_id,
723            deadline_ms: ctx.deadline_ms,
724            attempt: ctx.attempt,
725            idempotency_key: ctx.idempotency_key,
726            impersonation: ctx
727                .impersonation
728                .map(|ImpersonationV1_1 { actor_id, reason }| ImpersonationV1_0 {
729                    actor_id,
730                    reason,
731                }),
732        });
733
734        self.send_http_request(legacy_req, opts, legacy_ctx)
735            .map(|resp| HttpResponseV1_1 {
736                status: resp.status,
737                headers: resp.headers,
738                body: resp.body,
739            })
740            .map_err(|err| HttpClientErrorV1_1 {
741                code: err.code,
742                message: err.message,
743            })
744    }
745}
746
747impl StateStoreHost for HostState {
748    fn read(
749        &mut self,
750        key: HostStateKey,
751        ctx: Option<StateTenantCtx>,
752    ) -> Result<Vec<u8>, StateError> {
753        let store = match self.state_store.as_ref() {
754            Some(store) => store.clone(),
755            None => {
756                return Err(StateError {
757                    code: "unavailable".into(),
758                    message: "state store not configured".into(),
759                });
760            }
761        };
762        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
763            Ok(ctx) => ctx,
764            Err(err) => {
765                return Err(StateError {
766                    code: "invalid-ctx".into(),
767                    message: err.to_string(),
768                });
769            }
770        };
771        #[cfg(feature = "fault-injection")]
772        {
773            let exec_ctx = self.exec_ctx.as_ref();
774            let flow_id = exec_ctx
775                .map(|ctx| ctx.flow_id.as_str())
776                .unwrap_or("unknown");
777            let node_id = exec_ctx.and_then(|ctx| ctx.node_id.as_deref());
778            let attempt = exec_ctx.map(|ctx| ctx.tenant.attempt).unwrap_or(1);
779            let fault_ctx = FaultContext {
780                pack_id: self.pack_id.as_str(),
781                flow_id,
782                node_id,
783                attempt,
784            };
785            if let Err(err) = maybe_fail(FaultPoint::StateRead, fault_ctx) {
786                return Err(StateError {
787                    code: "internal".into(),
788                    message: err.to_string(),
789                });
790            }
791        }
792        let key = StoreStateKey::from(key);
793        match store.get_json(&tenant_ctx, STATE_PREFIX, &key, None) {
794            Ok(Some(value)) => Ok(serde_json::to_vec(&value).unwrap_or_else(|_| Vec::new())),
795            Ok(None) => Err(StateError {
796                code: "not_found".into(),
797                message: "state key not found".into(),
798            }),
799            Err(err) => Err(StateError {
800                code: "internal".into(),
801                message: err.to_string(),
802            }),
803        }
804    }
805
806    fn write(
807        &mut self,
808        key: HostStateKey,
809        bytes: Vec<u8>,
810        ctx: Option<StateTenantCtx>,
811    ) -> Result<StateOpAck, StateError> {
812        let store = match self.state_store.as_ref() {
813            Some(store) => store.clone(),
814            None => {
815                return Err(StateError {
816                    code: "unavailable".into(),
817                    message: "state store not configured".into(),
818                });
819            }
820        };
821        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
822            Ok(ctx) => ctx,
823            Err(err) => {
824                return Err(StateError {
825                    code: "invalid-ctx".into(),
826                    message: err.to_string(),
827                });
828            }
829        };
830        #[cfg(feature = "fault-injection")]
831        {
832            let exec_ctx = self.exec_ctx.as_ref();
833            let flow_id = exec_ctx
834                .map(|ctx| ctx.flow_id.as_str())
835                .unwrap_or("unknown");
836            let node_id = exec_ctx.and_then(|ctx| ctx.node_id.as_deref());
837            let attempt = exec_ctx.map(|ctx| ctx.tenant.attempt).unwrap_or(1);
838            let fault_ctx = FaultContext {
839                pack_id: self.pack_id.as_str(),
840                flow_id,
841                node_id,
842                attempt,
843            };
844            if let Err(err) = maybe_fail(FaultPoint::StateWrite, fault_ctx) {
845                return Err(StateError {
846                    code: "internal".into(),
847                    message: err.to_string(),
848                });
849            }
850        }
851        let key = StoreStateKey::from(key);
852        let value = serde_json::from_slice(&bytes)
853            .unwrap_or_else(|_| Value::String(String::from_utf8_lossy(&bytes).to_string()));
854        match store.set_json(&tenant_ctx, STATE_PREFIX, &key, None, &value, None) {
855            Ok(()) => Ok(StateOpAck::Ok),
856            Err(err) => Err(StateError {
857                code: "internal".into(),
858                message: err.to_string(),
859            }),
860        }
861    }
862
863    fn delete(
864        &mut self,
865        key: HostStateKey,
866        ctx: Option<StateTenantCtx>,
867    ) -> Result<StateOpAck, StateError> {
868        let store = match self.state_store.as_ref() {
869            Some(store) => store.clone(),
870            None => {
871                return Err(StateError {
872                    code: "unavailable".into(),
873                    message: "state store not configured".into(),
874                });
875            }
876        };
877        let tenant_ctx = match self.tenant_ctx_from_v1(ctx) {
878            Ok(ctx) => ctx,
879            Err(err) => {
880                return Err(StateError {
881                    code: "invalid-ctx".into(),
882                    message: err.to_string(),
883                });
884            }
885        };
886        let key = StoreStateKey::from(key);
887        match store.del(&tenant_ctx, STATE_PREFIX, &key) {
888            Ok(_) => Ok(StateOpAck::Ok),
889            Err(err) => Err(StateError {
890                code: "internal".into(),
891                message: err.to_string(),
892            }),
893        }
894    }
895}
896
897impl TelemetryLoggerHost for HostState {
898    fn log(
899        &mut self,
900        span: TelemetrySpanContext,
901        fields: Vec<(String, String)>,
902        _ctx: Option<TelemetryTenantCtx>,
903    ) -> Result<TelemetryAck, TelemetryError> {
904        if let Some(mock) = &self.mocks
905            && mock.telemetry_drain(&[("span_json", span.flow_id.as_str())])
906        {
907            return Ok(TelemetryAck::Ok);
908        }
909        let mut map = serde_json::Map::new();
910        for (k, v) in fields {
911            map.insert(k, Value::String(v));
912        }
913        tracing::info!(
914            tenant = %span.tenant,
915            flow_id = %span.flow_id,
916            node = ?span.node_id,
917            provider = %span.provider,
918            fields = %serde_json::Value::Object(map.clone()),
919            "telemetry log from pack"
920        );
921        Ok(TelemetryAck::Ok)
922    }
923}
924
925impl RunnerHostHttp for HostState {
926    fn request(
927        &mut self,
928        method: String,
929        url: String,
930        headers: Vec<String>,
931        body: Option<Vec<u8>>,
932    ) -> Result<Vec<u8>, String> {
933        let req = HttpRequest {
934            method,
935            url,
936            headers: headers
937                .chunks(2)
938                .filter_map(|chunk| {
939                    if chunk.len() == 2 {
940                        Some((chunk[0].clone(), chunk[1].clone()))
941                    } else {
942                        None
943                    }
944                })
945                .collect(),
946            body,
947        };
948        match HttpClientHost::send(self, req, None) {
949            Ok(resp) => Ok(resp.body.unwrap_or_default()),
950            Err(err) => Err(err.message),
951        }
952    }
953}
954
955impl RunnerHostKv for HostState {
956    fn get(&mut self, _ns: String, _key: String) -> Option<String> {
957        None
958    }
959
960    fn put(&mut self, _ns: String, _key: String, _val: String) {}
961}
962
963enum ManifestLoad {
964    New {
965        manifest: Box<greentic_types::PackManifest>,
966        flows: PackFlows,
967    },
968    Legacy {
969        manifest: Box<legacy_pack::PackManifest>,
970        flows: PackFlows,
971    },
972}
973
974fn load_manifest_and_flows(path: &Path) -> Result<ManifestLoad> {
975    let mut archive = ZipArchive::new(File::open(path)?)
976        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
977    let bytes = read_entry(&mut archive, "manifest.cbor")
978        .with_context(|| format!("missing manifest.cbor in {}", path.display()))?;
979    match decode_pack_manifest(&bytes) {
980        Ok(manifest) => {
981            let cache = PackFlows::from_manifest(manifest.clone());
982            Ok(ManifestLoad::New {
983                manifest: Box::new(manifest),
984                flows: cache,
985            })
986        }
987        Err(err) => {
988            tracing::debug!(
989                error = %err,
990                pack = %path.display(),
991                "decode_pack_manifest failed for archive; falling back to legacy manifest"
992            );
993            let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
994                .context("failed to decode legacy pack manifest from manifest.cbor")?;
995            let flows = load_legacy_flows_from_archive(&mut archive, path, &legacy)?;
996            Ok(ManifestLoad::Legacy {
997                manifest: Box::new(legacy),
998                flows,
999            })
1000        }
1001    }
1002}
1003
1004fn load_manifest_and_flows_from_dir(root: &Path) -> Result<ManifestLoad> {
1005    let manifest_path = root.join("manifest.cbor");
1006    let bytes = std::fs::read(&manifest_path)
1007        .with_context(|| format!("missing manifest.cbor in {}", root.display()))?;
1008    match decode_pack_manifest(&bytes) {
1009        Ok(manifest) => {
1010            let cache = PackFlows::from_manifest(manifest.clone());
1011            Ok(ManifestLoad::New {
1012                manifest: Box::new(manifest),
1013                flows: cache,
1014            })
1015        }
1016        Err(err) => {
1017            tracing::debug!(
1018                error = %err,
1019                pack = %root.display(),
1020                "decode_pack_manifest failed for materialized pack; trying legacy manifest"
1021            );
1022            let legacy: legacy_pack::PackManifest = serde_cbor::from_slice(&bytes)
1023                .context("failed to decode legacy pack manifest from manifest.cbor")?;
1024            let flows = load_legacy_flows_from_dir(root, &legacy)?;
1025            Ok(ManifestLoad::Legacy {
1026                manifest: Box::new(legacy),
1027                flows,
1028            })
1029        }
1030    }
1031}
1032
1033fn load_legacy_flows_from_dir(
1034    root: &Path,
1035    manifest: &legacy_pack::PackManifest,
1036) -> Result<PackFlows> {
1037    build_legacy_flows(manifest, |rel_path| {
1038        let path = root.join(rel_path);
1039        std::fs::read(&path).with_context(|| format!("missing flow json {}", path.display()))
1040    })
1041}
1042
1043fn load_legacy_flows_from_archive(
1044    archive: &mut ZipArchive<File>,
1045    pack_path: &Path,
1046    manifest: &legacy_pack::PackManifest,
1047) -> Result<PackFlows> {
1048    build_legacy_flows(manifest, |rel_path| {
1049        read_entry(archive, rel_path)
1050            .with_context(|| format!("missing flow json {} in {}", rel_path, pack_path.display()))
1051    })
1052}
1053
1054fn build_legacy_flows(
1055    manifest: &legacy_pack::PackManifest,
1056    mut read_json: impl FnMut(&str) -> Result<Vec<u8>>,
1057) -> Result<PackFlows> {
1058    let mut flows = HashMap::new();
1059    let mut descriptors = Vec::new();
1060
1061    for entry in &manifest.flows {
1062        let bytes = read_json(&entry.file_json)
1063            .with_context(|| format!("missing flow json {}", entry.file_json))?;
1064        let doc = parse_flow_doc_with_legacy_aliases(&bytes, &entry.file_json)?;
1065        let normalized = normalize_flow_doc(doc);
1066        let flow_ir = flow_doc_to_ir(normalized)?;
1067        let flow = flow_ir_to_flow(flow_ir)?;
1068
1069        descriptors.push(FlowDescriptor {
1070            id: entry.id.clone(),
1071            flow_type: entry.kind.clone(),
1072            pack_id: manifest.meta.pack_id.clone(),
1073            profile: manifest.meta.pack_id.clone(),
1074            version: manifest.meta.version.to_string(),
1075            description: None,
1076        });
1077        flows.insert(entry.id.clone(), flow);
1078    }
1079
1080    let mut entry_flows = manifest.meta.entry_flows.clone();
1081    if entry_flows.is_empty() {
1082        entry_flows = manifest.flows.iter().map(|f| f.id.clone()).collect();
1083    }
1084    let metadata = PackMetadata {
1085        pack_id: manifest.meta.pack_id.clone(),
1086        version: manifest.meta.version.to_string(),
1087        entry_flows,
1088        secret_requirements: Vec::new(),
1089    };
1090
1091    Ok(PackFlows {
1092        descriptors,
1093        flows,
1094        metadata,
1095    })
1096}
1097
1098fn parse_flow_doc_with_legacy_aliases(bytes: &[u8], path: &str) -> Result<FlowDoc> {
1099    let mut value: Value = serde_json::from_slice(bytes)
1100        .with_context(|| format!("failed to decode flow doc {}", path))?;
1101    if let Some(map) = value.as_object_mut()
1102        && !map.contains_key("type")
1103        && let Some(flow_type) = map.remove("flow_type")
1104    {
1105        map.insert("type".to_string(), flow_type);
1106    }
1107    serde_json::from_value(value).with_context(|| format!("failed to decode flow doc {}", path))
1108}
1109
1110pub struct ComponentState {
1111    pub host: HostState,
1112    wasi_ctx: WasiCtx,
1113    resource_table: ResourceTable,
1114}
1115
1116impl ComponentState {
1117    pub fn new(host: HostState, policy: Arc<RunnerWasiPolicy>) -> Result<Self> {
1118        let wasi_ctx = policy
1119            .instantiate()
1120            .context("failed to build WASI context")?;
1121        Ok(Self {
1122            host,
1123            wasi_ctx,
1124            resource_table: ResourceTable::new(),
1125        })
1126    }
1127
1128    fn host_mut(&mut self) -> &mut HostState {
1129        &mut self.host
1130    }
1131
1132    fn should_cancel_host(&mut self) -> bool {
1133        false
1134    }
1135
1136    fn yield_now_host(&mut self) {
1137        // no-op cooperative yield
1138    }
1139}
1140
1141impl component_api::v0_4::greentic::component::control::Host for ComponentState {
1142    fn should_cancel(&mut self) -> bool {
1143        self.should_cancel_host()
1144    }
1145
1146    fn yield_now(&mut self) {
1147        self.yield_now_host();
1148    }
1149}
1150
1151impl component_api::v0_5::greentic::component::control::Host for ComponentState {
1152    fn should_cancel(&mut self) -> bool {
1153        self.should_cancel_host()
1154    }
1155
1156    fn yield_now(&mut self) {
1157        self.yield_now_host();
1158    }
1159}
1160
1161fn add_component_control_instance(
1162    linker: &mut Linker<ComponentState>,
1163    name: &str,
1164) -> wasmtime::Result<()> {
1165    let mut inst = linker.instance(name)?;
1166    inst.func_wrap(
1167        "should-cancel",
1168        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
1169            let host = caller.data_mut();
1170            Ok((host.should_cancel_host(),))
1171        },
1172    )?;
1173    inst.func_wrap(
1174        "yield-now",
1175        |mut caller: StoreContextMut<'_, ComponentState>, (): ()| {
1176            let host = caller.data_mut();
1177            host.yield_now_host();
1178            Ok(())
1179        },
1180    )?;
1181    Ok(())
1182}
1183
1184fn add_component_control_to_linker(linker: &mut Linker<ComponentState>) -> wasmtime::Result<()> {
1185    add_component_control_instance(linker, "greentic:component/control@0.5.0")?;
1186    add_component_control_instance(linker, "greentic:component/control@0.4.0")?;
1187    Ok(())
1188}
1189
1190pub fn register_all(linker: &mut Linker<ComponentState>, allow_state_store: bool) -> Result<()> {
1191    add_wasi_to_linker(linker)?;
1192    add_all_v1_to_linker(
1193        linker,
1194        HostFns {
1195            http_client_v1_1: Some(|state: &mut ComponentState| state.host_mut()),
1196            http_client: Some(|state: &mut ComponentState| state.host_mut()),
1197            oauth_broker: None,
1198            runner_host_http: Some(|state: &mut ComponentState| state.host_mut()),
1199            runner_host_kv: Some(|state: &mut ComponentState| state.host_mut()),
1200            telemetry_logger: Some(|state: &mut ComponentState| state.host_mut()),
1201            state_store: allow_state_store.then_some(|state: &mut ComponentState| state.host_mut()),
1202            secrets_store_v1_1: Some(|state: &mut ComponentState| state.host_mut()),
1203            secrets_store: None,
1204        },
1205    )?;
1206    add_http_client_client_world_aliases(linker)?;
1207    Ok(())
1208}
1209
1210fn add_http_client_client_world_aliases(linker: &mut Linker<ComponentState>) -> Result<()> {
1211    let mut inst_v1_1 = linker.instance("greentic:http/client@1.1.0")?;
1212    inst_v1_1.func_wrap(
1213        "send",
1214        move |mut caller: StoreContextMut<'_, ComponentState>,
1215              (req, opts, ctx): (
1216            http_client_client_alias::Request,
1217            Option<http_client_client_alias::RequestOptions>,
1218            Option<http_client_client_alias::TenantCtx>,
1219        )| {
1220            let host = caller.data_mut().host_mut();
1221            let result = HttpClientHostV1_1::send(
1222                host,
1223                alias_request_to_host(req),
1224                opts.map(alias_request_options_to_host),
1225                ctx.map(alias_tenant_ctx_to_host),
1226            );
1227            Ok((match result {
1228                Ok(resp) => Ok(alias_response_from_host(resp)),
1229                Err(err) => Err(alias_error_from_host(err)),
1230            },))
1231        },
1232    )?;
1233    let mut inst_v1_0 = linker.instance("greentic:http/client@1.0.0")?;
1234    inst_v1_0.func_wrap(
1235        "send",
1236        move |mut caller: StoreContextMut<'_, ComponentState>,
1237              (req, ctx): (
1238            host_http_client::Request,
1239            Option<host_http_client::TenantCtx>,
1240        )| {
1241            let host = caller.data_mut().host_mut();
1242            let result = HttpClientHost::send(host, req, ctx);
1243            Ok((result,))
1244        },
1245    )?;
1246    Ok(())
1247}
1248
1249fn alias_request_to_host(req: http_client_client_alias::Request) -> host_http_client::RequestV1_1 {
1250    host_http_client::RequestV1_1 {
1251        method: req.method,
1252        url: req.url,
1253        headers: req.headers,
1254        body: req.body,
1255    }
1256}
1257
1258fn alias_request_options_to_host(
1259    opts: http_client_client_alias::RequestOptions,
1260) -> host_http_client::RequestOptionsV1_1 {
1261    host_http_client::RequestOptionsV1_1 {
1262        timeout_ms: opts.timeout_ms,
1263        allow_insecure: opts.allow_insecure,
1264        follow_redirects: opts.follow_redirects,
1265    }
1266}
1267
1268fn alias_tenant_ctx_to_host(
1269    ctx: http_client_client_alias::TenantCtx,
1270) -> host_http_client::TenantCtxV1_1 {
1271    host_http_client::TenantCtxV1_1 {
1272        env: ctx.env,
1273        tenant: ctx.tenant,
1274        tenant_id: ctx.tenant_id,
1275        team: ctx.team,
1276        team_id: ctx.team_id,
1277        user: ctx.user,
1278        user_id: ctx.user_id,
1279        trace_id: ctx.trace_id,
1280        correlation_id: ctx.correlation_id,
1281        attributes: ctx.attributes,
1282        session_id: ctx.session_id,
1283        flow_id: ctx.flow_id,
1284        node_id: ctx.node_id,
1285        provider_id: ctx.provider_id,
1286        deadline_ms: ctx.deadline_ms,
1287        attempt: ctx.attempt,
1288        idempotency_key: ctx.idempotency_key,
1289        impersonation: ctx.impersonation.map(|imp| ImpersonationV1_1 {
1290            actor_id: imp.actor_id,
1291            reason: imp.reason,
1292        }),
1293    }
1294}
1295
1296fn alias_response_from_host(
1297    resp: host_http_client::ResponseV1_1,
1298) -> http_client_client_alias::Response {
1299    http_client_client_alias::Response {
1300        status: resp.status,
1301        headers: resp.headers,
1302        body: resp.body,
1303    }
1304}
1305
1306fn alias_error_from_host(
1307    err: host_http_client::HttpClientErrorV1_1,
1308) -> http_client_client_alias::HostError {
1309    http_client_client_alias::HostError {
1310        code: err.code,
1311        message: err.message,
1312    }
1313}
1314
1315impl OAuthHostContext for ComponentState {
1316    fn tenant_id(&self) -> &str {
1317        &self.host.config.tenant
1318    }
1319
1320    fn env(&self) -> &str {
1321        &self.host.default_env
1322    }
1323
1324    fn oauth_broker_host(&mut self) -> &mut OAuthBrokerHost {
1325        &mut self.host.oauth_host
1326    }
1327
1328    fn oauth_config(&self) -> Option<&OAuthBrokerConfig> {
1329        self.host.oauth_config.as_ref()
1330    }
1331}
1332
1333impl WasiView for ComponentState {
1334    fn ctx(&mut self) -> WasiCtxView<'_> {
1335        WasiCtxView {
1336            ctx: &mut self.wasi_ctx,
1337            table: &mut self.resource_table,
1338        }
1339    }
1340}
1341
1342#[allow(unsafe_code)]
1343unsafe impl Send for ComponentState {}
1344#[allow(unsafe_code)]
1345unsafe impl Sync for ComponentState {}
1346
1347impl PackRuntime {
1348    fn allows_state_store(&self, component_ref: &str) -> bool {
1349        if self.state_store.is_none() {
1350            return false;
1351        }
1352        if !self.config.state_store_policy.allow {
1353            return false;
1354        }
1355        let Some(manifest) = self.component_manifests.get(component_ref) else {
1356            return false;
1357        };
1358        manifest
1359            .capabilities
1360            .host
1361            .state
1362            .as_ref()
1363            .map(|caps| caps.read || caps.write)
1364            .unwrap_or(false)
1365    }
1366
1367    pub fn contains_component(&self, component_ref: &str) -> bool {
1368        self.components.contains_key(component_ref)
1369    }
1370
1371    #[allow(clippy::too_many_arguments)]
1372    pub async fn load(
1373        path: impl AsRef<Path>,
1374        config: Arc<HostConfig>,
1375        mocks: Option<Arc<MockLayer>>,
1376        archive_source: Option<&Path>,
1377        session_store: Option<DynSessionStore>,
1378        state_store: Option<DynStateStore>,
1379        wasi_policy: Arc<RunnerWasiPolicy>,
1380        secrets: DynSecretsManager,
1381        oauth_config: Option<OAuthBrokerConfig>,
1382        verify_archive: bool,
1383        component_resolution: ComponentResolution,
1384    ) -> Result<Self> {
1385        let path = path.as_ref();
1386        let (_pack_root, safe_path) = normalize_pack_path(path)?;
1387        let path_meta = std::fs::metadata(&safe_path).ok();
1388        let is_dir = path_meta
1389            .as_ref()
1390            .map(|meta| meta.is_dir())
1391            .unwrap_or(false);
1392        let is_component = !is_dir
1393            && safe_path
1394                .extension()
1395                .and_then(|ext| ext.to_str())
1396                .map(|ext| ext.eq_ignore_ascii_case("wasm"))
1397                .unwrap_or(false);
1398        let archive_hint_path = if let Some(source) = archive_source {
1399            let (_, normalized) = normalize_pack_path(source)?;
1400            Some(normalized)
1401        } else if is_component || is_dir {
1402            None
1403        } else {
1404            Some(safe_path.clone())
1405        };
1406        let archive_hint = archive_hint_path.as_deref();
1407        if verify_archive {
1408            if let Some(verify_target) = archive_hint.and_then(|p| {
1409                std::fs::metadata(p)
1410                    .ok()
1411                    .filter(|meta| meta.is_file())
1412                    .map(|_| p)
1413            }) {
1414                verify::verify_pack(verify_target).await?;
1415                tracing::info!(pack_path = %verify_target.display(), "pack verification complete");
1416            } else {
1417                tracing::debug!("skipping archive verification (no archive source)");
1418            }
1419        }
1420        let engine = Engine::default();
1421        let engine_profile =
1422            EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
1423        let cache = CacheManager::new(CacheConfig::default(), engine_profile);
1424        let mut metadata = PackMetadata::fallback(&safe_path);
1425        let mut manifest = None;
1426        let mut legacy_manifest: Option<Box<legacy_pack::PackManifest>> = None;
1427        let mut flows = None;
1428        let materialized_root = component_resolution.materialized_root.clone().or_else(|| {
1429            if is_dir {
1430                Some(safe_path.clone())
1431            } else {
1432                None
1433            }
1434        });
1435        let (pack_assets_dir, assets_tempdir) =
1436            locate_pack_assets(materialized_root.as_deref(), archive_hint)?;
1437        let setup_yaml_exists = pack_assets_dir
1438            .as_ref()
1439            .map(|dir| dir.join("setup.yaml").is_file())
1440            .unwrap_or(false);
1441        tracing::info!(
1442            pack_root = %safe_path.display(),
1443            assets_setup_yaml_exists = setup_yaml_exists,
1444            "pack unpack metadata"
1445        );
1446
1447        if let Some(root) = materialized_root.as_ref() {
1448            match load_manifest_and_flows_from_dir(root) {
1449                Ok(ManifestLoad::New {
1450                    manifest: m,
1451                    flows: cache,
1452                }) => {
1453                    metadata = cache.metadata.clone();
1454                    manifest = Some(*m);
1455                    flows = Some(cache);
1456                }
1457                Ok(ManifestLoad::Legacy {
1458                    manifest: m,
1459                    flows: cache,
1460                }) => {
1461                    metadata = cache.metadata.clone();
1462                    legacy_manifest = Some(m);
1463                    flows = Some(cache);
1464                }
1465                Err(err) => {
1466                    warn!(error = %err, pack = %root.display(), "failed to parse materialized pack manifest");
1467                }
1468            }
1469        }
1470
1471        if manifest.is_none()
1472            && legacy_manifest.is_none()
1473            && let Some(archive_path) = archive_hint
1474        {
1475            let manifest_load = load_manifest_and_flows(archive_path).with_context(|| {
1476                format!(
1477                    "failed to load manifest.cbor from {}",
1478                    archive_path.display()
1479                )
1480            })?;
1481            match manifest_load {
1482                ManifestLoad::New {
1483                    manifest: m,
1484                    flows: cache,
1485                } => {
1486                    metadata = cache.metadata.clone();
1487                    manifest = Some(*m);
1488                    flows = Some(cache);
1489                }
1490                ManifestLoad::Legacy {
1491                    manifest: m,
1492                    flows: cache,
1493                } => {
1494                    metadata = cache.metadata.clone();
1495                    legacy_manifest = Some(m);
1496                    flows = Some(cache);
1497                }
1498            }
1499        }
1500        #[cfg(feature = "fault-injection")]
1501        {
1502            let fault_ctx = FaultContext {
1503                pack_id: metadata.pack_id.as_str(),
1504                flow_id: "unknown",
1505                node_id: None,
1506                attempt: 1,
1507            };
1508            maybe_fail(FaultPoint::PackResolve, fault_ctx)
1509                .map_err(|err| anyhow!(err.to_string()))?;
1510        }
1511        let mut pack_lock = None;
1512        for root in find_pack_lock_roots(&safe_path, is_dir, archive_hint) {
1513            pack_lock = load_pack_lock(&root)?;
1514            if pack_lock.is_some() {
1515                break;
1516            }
1517        }
1518        let component_sources_payload = if pack_lock.is_none() {
1519            if let Some(manifest) = manifest.as_ref() {
1520                manifest
1521                    .get_component_sources_v1()
1522                    .context("invalid component sources extension")?
1523            } else {
1524                None
1525            }
1526        } else {
1527            None
1528        };
1529        let component_sources = if let Some(lock) = pack_lock.as_ref() {
1530            Some(component_sources_table_from_pack_lock(
1531                lock,
1532                component_resolution.allow_missing_hash,
1533            )?)
1534        } else {
1535            component_sources_table(component_sources_payload.as_ref())?
1536        };
1537        let components = if is_component {
1538            let wasm_bytes = fs::read(&safe_path).await?;
1539            metadata = PackMetadata::from_wasm(&wasm_bytes)
1540                .unwrap_or_else(|| PackMetadata::fallback(&safe_path));
1541            let name = safe_path
1542                .file_stem()
1543                .map(|s| s.to_string_lossy().to_string())
1544                .unwrap_or_else(|| "component".to_string());
1545            let component = compile_component_with_cache(&cache, &engine, None, wasm_bytes).await?;
1546            let mut map = HashMap::new();
1547            map.insert(
1548                name.clone(),
1549                PackComponent {
1550                    name,
1551                    version: metadata.version.clone(),
1552                    component,
1553                },
1554            );
1555            map
1556        } else {
1557            let specs = component_specs(
1558                manifest.as_ref(),
1559                legacy_manifest.as_deref(),
1560                component_sources_payload.as_ref(),
1561                pack_lock.as_ref(),
1562            );
1563            if specs.is_empty() {
1564                HashMap::new()
1565            } else {
1566                let mut loaded = HashMap::new();
1567                let mut missing: HashSet<String> =
1568                    specs.iter().map(|spec| spec.id.clone()).collect();
1569                let mut searched = Vec::new();
1570
1571                if !component_resolution.overrides.is_empty() {
1572                    load_components_from_overrides(
1573                        &cache,
1574                        &engine,
1575                        &component_resolution.overrides,
1576                        &specs,
1577                        &mut missing,
1578                        &mut loaded,
1579                    )
1580                    .await?;
1581                    searched.push("override map".to_string());
1582                }
1583
1584                if let Some(component_sources) = component_sources.as_ref() {
1585                    load_components_from_sources(
1586                        &cache,
1587                        &engine,
1588                        component_sources,
1589                        &component_resolution,
1590                        &specs,
1591                        &mut missing,
1592                        &mut loaded,
1593                        materialized_root.as_deref(),
1594                        archive_hint,
1595                    )
1596                    .await?;
1597                    searched.push(format!("extension {}", EXT_COMPONENT_SOURCES_V1));
1598                }
1599
1600                if let Some(root) = materialized_root.as_ref() {
1601                    load_components_from_dir(
1602                        &cache,
1603                        &engine,
1604                        root,
1605                        &specs,
1606                        &mut missing,
1607                        &mut loaded,
1608                    )
1609                    .await?;
1610                    searched.push(format!("components dir {}", root.display()));
1611                }
1612
1613                if let Some(archive_path) = archive_hint {
1614                    load_components_from_archive(
1615                        &cache,
1616                        &engine,
1617                        archive_path,
1618                        &specs,
1619                        &mut missing,
1620                        &mut loaded,
1621                    )
1622                    .await?;
1623                    searched.push(format!("archive {}", archive_path.display()));
1624                }
1625
1626                if !missing.is_empty() {
1627                    let missing_list = missing.into_iter().collect::<Vec<_>>().join(", ");
1628                    let sources = if searched.is_empty() {
1629                        "no component sources".to_string()
1630                    } else {
1631                        searched.join(", ")
1632                    };
1633                    bail!(
1634                        "components missing: {}; looked in {}",
1635                        missing_list,
1636                        sources
1637                    );
1638                }
1639
1640                loaded
1641            }
1642        };
1643        let http_client = Arc::clone(&HTTP_CLIENT);
1644        let mut component_manifests = HashMap::new();
1645        if let Some(manifest) = manifest.as_ref() {
1646            for component in &manifest.components {
1647                component_manifests.insert(component.id.as_str().to_string(), component.clone());
1648            }
1649        }
1650        let mut pack_policy = (*wasi_policy).clone();
1651        if let Some(dir) = pack_assets_dir {
1652            tracing::debug!(path = %dir.display(), "preopening pack assets directory for WASI /assets");
1653            pack_policy =
1654                pack_policy.with_preopen(PreopenSpec::new(dir, "/assets").read_only(true));
1655        }
1656        let wasi_policy = Arc::new(pack_policy);
1657        Ok(Self {
1658            path: safe_path,
1659            archive_path: archive_hint.map(Path::to_path_buf),
1660            config,
1661            engine,
1662            metadata,
1663            manifest,
1664            legacy_manifest,
1665            component_manifests,
1666            mocks,
1667            flows,
1668            components,
1669            http_client,
1670            pre_cache: Mutex::new(HashMap::new()),
1671            session_store,
1672            state_store,
1673            wasi_policy,
1674            assets_tempdir,
1675            provider_registry: RwLock::new(None),
1676            secrets,
1677            oauth_config,
1678            cache,
1679        })
1680    }
1681
1682    pub async fn list_flows(&self) -> Result<Vec<FlowDescriptor>> {
1683        if let Some(cache) = &self.flows {
1684            return Ok(cache.descriptors.clone());
1685        }
1686        if let Some(manifest) = &self.manifest {
1687            let descriptors = manifest
1688                .flows
1689                .iter()
1690                .map(|flow| FlowDescriptor {
1691                    id: flow.id.as_str().to_string(),
1692                    flow_type: flow_kind_to_str(flow.kind).to_string(),
1693                    pack_id: manifest.pack_id.as_str().to_string(),
1694                    profile: manifest.pack_id.as_str().to_string(),
1695                    version: manifest.version.to_string(),
1696                    description: None,
1697                })
1698                .collect();
1699            return Ok(descriptors);
1700        }
1701        Ok(Vec::new())
1702    }
1703
1704    #[allow(dead_code)]
1705    pub async fn run_flow(
1706        &self,
1707        flow_id: &str,
1708        input: serde_json::Value,
1709    ) -> Result<serde_json::Value> {
1710        let pack = Arc::new(
1711            PackRuntime::load(
1712                &self.path,
1713                Arc::clone(&self.config),
1714                self.mocks.clone(),
1715                self.archive_path.as_deref(),
1716                self.session_store.clone(),
1717                self.state_store.clone(),
1718                Arc::clone(&self.wasi_policy),
1719                self.secrets.clone(),
1720                self.oauth_config.clone(),
1721                false,
1722                ComponentResolution::default(),
1723            )
1724            .await?,
1725        );
1726
1727        let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&self.config)).await?;
1728        let retry_config = self.config.retry_config().into();
1729        let mocks = pack.mocks.as_deref();
1730        let tenant = self.config.tenant.as_str();
1731
1732        let ctx = FlowContext {
1733            tenant,
1734            pack_id: pack.metadata().pack_id.as_str(),
1735            flow_id,
1736            node_id: None,
1737            tool: None,
1738            action: None,
1739            session_id: None,
1740            provider_id: None,
1741            retry_config,
1742            attempt: 1,
1743            observer: None,
1744            mocks,
1745        };
1746
1747        let execution = engine.execute(ctx, input).await?;
1748        match execution.status {
1749            FlowStatus::Completed => Ok(execution.output),
1750            FlowStatus::Waiting(wait) => Ok(serde_json::json!({
1751                "status": "pending",
1752                "reason": wait.reason,
1753                "resume": wait.snapshot,
1754                "response": execution.output,
1755            })),
1756        }
1757    }
1758
1759    pub async fn invoke_component(
1760        &self,
1761        component_ref: &str,
1762        ctx: ComponentExecCtx,
1763        operation: &str,
1764        _config_json: Option<String>,
1765        input_json: String,
1766    ) -> Result<Value> {
1767        let pack_component = self
1768            .components
1769            .get(component_ref)
1770            .with_context(|| format!("component '{component_ref}' not found in pack"))?;
1771        let engine = self.engine.clone();
1772        let config = Arc::clone(&self.config);
1773        let http_client = Arc::clone(&self.http_client);
1774        let mocks = self.mocks.clone();
1775        let session_store = self.session_store.clone();
1776        let state_store = self.state_store.clone();
1777        let secrets = Arc::clone(&self.secrets);
1778        let oauth_config = self.oauth_config.clone();
1779        let wasi_policy = Arc::clone(&self.wasi_policy);
1780        let pack_id = self.metadata().pack_id.clone();
1781        let allow_state_store = self.allows_state_store(component_ref);
1782        let component = pack_component.component.clone();
1783        let component_ref_owned = component_ref.to_string();
1784        let operation_owned = operation.to_string();
1785        let input_owned = input_json;
1786        let ctx_owned = ctx;
1787
1788        run_on_wasi_thread("component.invoke", move || {
1789            let mut linker = Linker::new(&engine);
1790            register_all(&mut linker, allow_state_store)?;
1791            add_component_control_to_linker(&mut linker)?;
1792
1793            let host_state = HostState::new(
1794                pack_id.clone(),
1795                config,
1796                http_client,
1797                mocks,
1798                session_store,
1799                state_store,
1800                secrets,
1801                oauth_config,
1802                Some(ctx_owned.clone()),
1803                Some(component_ref_owned.clone()),
1804                false,
1805            )?;
1806            let store_state = ComponentState::new(host_state, wasi_policy)?;
1807            let mut store = wasmtime::Store::new(&engine, store_state);
1808
1809            let invoke_result = HostState::instantiate_component_result(
1810                &mut linker,
1811                &mut store,
1812                &component,
1813                &ctx_owned,
1814                &operation_owned,
1815                &input_owned,
1816            )?;
1817            HostState::convert_invoke_result(invoke_result)
1818        })
1819    }
1820
1821    pub fn resolve_provider(
1822        &self,
1823        provider_id: Option<&str>,
1824        provider_type: Option<&str>,
1825    ) -> Result<ProviderBinding> {
1826        let registry = self.provider_registry()?;
1827        registry.resolve(provider_id, provider_type)
1828    }
1829
1830    pub async fn invoke_provider(
1831        &self,
1832        binding: &ProviderBinding,
1833        ctx: ComponentExecCtx,
1834        op: &str,
1835        input_json: Vec<u8>,
1836    ) -> Result<Value> {
1837        let component_ref_owned = binding.component_ref.clone();
1838        let pack_component = self.components.get(&component_ref_owned).with_context(|| {
1839            format!("provider component '{component_ref_owned}' not found in pack")
1840        })?;
1841        let component = pack_component.component.clone();
1842
1843        let engine = self.engine.clone();
1844        let config = Arc::clone(&self.config);
1845        let http_client = Arc::clone(&self.http_client);
1846        let mocks = self.mocks.clone();
1847        let session_store = self.session_store.clone();
1848        let state_store = self.state_store.clone();
1849        let secrets = Arc::clone(&self.secrets);
1850        let oauth_config = self.oauth_config.clone();
1851        let wasi_policy = Arc::clone(&self.wasi_policy);
1852        let pack_id = self.metadata().pack_id.clone();
1853        let allow_state_store = self.allows_state_store(&component_ref_owned);
1854        let input_owned = input_json;
1855        let op_owned = op.to_string();
1856        let ctx_owned = ctx;
1857        let world = binding.world.clone();
1858
1859        run_on_wasi_thread("provider.invoke", move || {
1860            let mut linker = Linker::new(&engine);
1861            register_all(&mut linker, allow_state_store)?;
1862            add_component_control_to_linker(&mut linker)?;
1863            let mut pre_instance = Some(linker.instantiate_pre(component.as_ref())?);
1864            let host_state = HostState::new(
1865                pack_id.clone(),
1866                config,
1867                http_client,
1868                mocks,
1869                session_store,
1870                state_store,
1871                secrets,
1872                oauth_config,
1873                Some(ctx_owned.clone()),
1874                Some(component_ref_owned.clone()),
1875                true,
1876            )?;
1877            let store_state = ComponentState::new(host_state, wasi_policy)?;
1878            let mut store = wasmtime::Store::new(&engine, store_state);
1879            let use_schema_core =
1880                world.contains("provider-schema-core") || world.contains("provider/schema-core");
1881            let result = if use_schema_core {
1882                let pre_instance = pre_instance
1883                    .take()
1884                    .ok_or_else(|| anyhow!("provider pre_instance already consumed"))?;
1885                let pre: SchemaSchemaCorePre<ComponentState> =
1886                    SchemaSchemaCorePre::new(pre_instance)?;
1887                let bindings = block_on(async { pre.instantiate_async(&mut store).await })?;
1888                let provider = bindings.greentic_provider_schema_core_schema_core_api();
1889                provider.call_invoke(&mut store, &op_owned, &input_owned)?
1890            } else {
1891                let pre_instance = pre_instance
1892                    .take()
1893                    .ok_or_else(|| anyhow!("provider pre_instance already consumed"))?;
1894                let pre: LegacySchemaCorePre<ComponentState> =
1895                    LegacySchemaCorePre::new(pre_instance)?;
1896                let bindings = block_on(async { pre.instantiate_async(&mut store).await })?;
1897                let provider = bindings.greentic_provider_core_schema_core_api();
1898                provider.call_invoke(&mut store, &op_owned, &input_owned)?
1899            };
1900            deserialize_json_bytes(result)
1901        })
1902    }
1903
1904    pub(crate) fn provider_registry(&self) -> Result<ProviderRegistry> {
1905        if let Some(registry) = self.provider_registry.read().clone() {
1906            return Ok(registry);
1907        }
1908        let manifest = self
1909            .manifest
1910            .as_ref()
1911            .context("pack manifest required for provider resolution")?;
1912        let env = std::env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string());
1913        let registry = ProviderRegistry::new(
1914            manifest,
1915            self.state_store.clone(),
1916            &self.config.tenant,
1917            &env,
1918        )?;
1919        *self.provider_registry.write() = Some(registry.clone());
1920        Ok(registry)
1921    }
1922
1923    pub(crate) fn provider_registry_optional(&self) -> Result<Option<ProviderRegistry>> {
1924        if self.manifest.is_none() {
1925            return Ok(None);
1926        }
1927        Ok(Some(self.provider_registry()?))
1928    }
1929
1930    pub fn load_flow(&self, flow_id: &str) -> Result<Flow> {
1931        if let Some(cache) = &self.flows {
1932            return cache
1933                .flows
1934                .get(flow_id)
1935                .cloned()
1936                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in pack"));
1937        }
1938        if let Some(manifest) = &self.manifest {
1939            let entry = manifest
1940                .flows
1941                .iter()
1942                .find(|f| f.id.as_str() == flow_id)
1943                .ok_or_else(|| anyhow!("flow '{flow_id}' not found in manifest"))?;
1944            return Ok(entry.flow.clone());
1945        }
1946        bail!("flow '{flow_id}' not available (pack exports disabled)")
1947    }
1948
1949    pub fn metadata(&self) -> &PackMetadata {
1950        &self.metadata
1951    }
1952
1953    pub fn required_secrets(&self) -> &[greentic_types::SecretRequirement] {
1954        &self.metadata.secret_requirements
1955    }
1956
1957    pub fn missing_secrets(
1958        &self,
1959        tenant_ctx: &TypesTenantCtx,
1960    ) -> Vec<greentic_types::SecretRequirement> {
1961        let env = tenant_ctx.env.as_str().to_string();
1962        let tenant = tenant_ctx.tenant.as_str().to_string();
1963        let team = tenant_ctx.team.as_ref().map(|t| t.as_str().to_string());
1964        self.required_secrets()
1965            .iter()
1966            .filter(|req| {
1967                // scope must match current context if provided
1968                if let Some(scope) = &req.scope {
1969                    if scope.env != env {
1970                        return false;
1971                    }
1972                    if scope.tenant != tenant {
1973                        return false;
1974                    }
1975                    if let Some(ref team_req) = scope.team
1976                        && team.as_ref() != Some(team_req)
1977                    {
1978                        return false;
1979                    }
1980                }
1981                let ctx = self.config.tenant_ctx();
1982                read_secret_blocking(
1983                    &self.secrets,
1984                    &ctx,
1985                    &self.metadata.pack_id,
1986                    req.key.as_str(),
1987                )
1988                .is_err()
1989            })
1990            .cloned()
1991            .collect()
1992    }
1993
1994    pub fn for_component_test(
1995        components: Vec<(String, PathBuf)>,
1996        flows: HashMap<String, FlowIR>,
1997        pack_id: &str,
1998        config: Arc<HostConfig>,
1999    ) -> Result<Self> {
2000        let engine = Engine::default();
2001        let engine_profile =
2002            EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
2003        let cache = CacheManager::new(CacheConfig::default(), engine_profile);
2004        let mut component_map = HashMap::new();
2005        for (name, path) in components {
2006            if !path.exists() {
2007                bail!("component artifact missing: {}", path.display());
2008            }
2009            let wasm_bytes = std::fs::read(&path)?;
2010            let component = Arc::new(
2011                Component::from_binary(&engine, &wasm_bytes)
2012                    .with_context(|| format!("failed to compile component {}", path.display()))?,
2013            );
2014            component_map.insert(
2015                name.clone(),
2016                PackComponent {
2017                    name,
2018                    version: "0.0.0".into(),
2019                    component,
2020                },
2021            );
2022        }
2023
2024        let mut flow_map = HashMap::new();
2025        let mut descriptors = Vec::new();
2026        for (id, ir) in flows {
2027            let flow_type = ir.flow_type.clone();
2028            let flow = flow_ir_to_flow(ir)?;
2029            flow_map.insert(id.clone(), flow);
2030            descriptors.push(FlowDescriptor {
2031                id: id.clone(),
2032                flow_type,
2033                pack_id: pack_id.to_string(),
2034                profile: "test".into(),
2035                version: "0.0.0".into(),
2036                description: None,
2037            });
2038        }
2039        let entry_flows = descriptors.iter().map(|flow| flow.id.clone()).collect();
2040        let metadata = PackMetadata {
2041            pack_id: pack_id.to_string(),
2042            version: "0.0.0".into(),
2043            entry_flows,
2044            secret_requirements: Vec::new(),
2045        };
2046        let flows_cache = PackFlows {
2047            descriptors: descriptors.clone(),
2048            flows: flow_map,
2049            metadata: metadata.clone(),
2050        };
2051
2052        Ok(Self {
2053            path: PathBuf::new(),
2054            archive_path: None,
2055            config,
2056            engine,
2057            metadata,
2058            manifest: None,
2059            legacy_manifest: None,
2060            component_manifests: HashMap::new(),
2061            mocks: None,
2062            flows: Some(flows_cache),
2063            components: component_map,
2064            http_client: Arc::clone(&HTTP_CLIENT),
2065            pre_cache: Mutex::new(HashMap::new()),
2066            session_store: None,
2067            state_store: None,
2068            wasi_policy: Arc::new(RunnerWasiPolicy::new()),
2069            assets_tempdir: None,
2070            provider_registry: RwLock::new(None),
2071            secrets: crate::secrets::default_manager()?,
2072            oauth_config: None,
2073            cache,
2074        })
2075    }
2076}
2077
2078fn is_missing_node_export(err: &wasmtime::Error, version: &str) -> bool {
2079    let message = err.to_string();
2080    message.contains("no exported instance named")
2081        && message.contains(&format!("greentic:component/node@{version}"))
2082}
2083
/// Pre-resolved view of a pack's flows, kept so they can be listed and loaded
/// without going back to the manifest.
struct PackFlows {
    /// One descriptor per flow, as returned when listing flows.
    descriptors: Vec<FlowDescriptor>,
    /// Flow definitions keyed by flow id.
    flows: HashMap<String, Flow>,
    /// Metadata of the pack the flows belong to.
    metadata: PackMetadata,
}
2089
/// Manifest extension keys recognised as carrying runtime flow definitions.
/// `flows_from_runtime_extension` uses the first manifest extension whose key
/// equals one of these.
const RUNTIME_FLOW_EXTENSION_IDS: [&str; 3] = [
    "greentic.pack.runtime_flow",
    "greentic.pack.flow_runtime",
    "greentic.pack.runtime_flows",
];
2095
/// Object-shaped runtime flow extension payload: `{ "flows": [...] }`.
#[derive(Debug, Deserialize)]
struct RuntimeFlowBundle {
    /// The runtime flows carried by the bundle.
    flows: Vec<RuntimeFlow>,
}
2100
/// A flow as it appears inside a runtime flow extension, before it is
/// converted into a `Flow` by `runtime_flow_to_flow`.
#[derive(Debug, Deserialize)]
struct RuntimeFlow {
    /// Flow id; must parse as a `FlowId` during conversion.
    id: String,
    /// Flow kind; the key `flow_type` is accepted as an alias.
    #[serde(alias = "flow_type")]
    kind: FlowKind,
    /// Schema version; conversion falls back to `"1.0"` when absent.
    #[serde(default)]
    schema_version: Option<String>,
    /// Start node; used as the `"default"` entrypoint when `entrypoints` is empty.
    #[serde(default)]
    start: Option<String>,
    /// Named entrypoints, copied into the converted flow.
    #[serde(default)]
    entrypoints: BTreeMap<String, Value>,
    /// Nodes keyed by node id; each id must parse as a `NodeId`.
    nodes: BTreeMap<String, RuntimeNode>,
    /// Optional flow metadata; conversion uses the default when absent.
    #[serde(default)]
    metadata: Option<FlowMetadata>,
}
2116
/// A single node of a `RuntimeFlow`.
#[derive(Debug, Deserialize)]
struct RuntimeNode {
    /// Component the node runs (alias: `component`); must parse as a `ComponentId`.
    #[serde(alias = "component")]
    component_id: String,
    /// Operation to call on the component (alias: `operation`).
    #[serde(default, alias = "operation")]
    operation_name: Option<String>,
    /// Payload that becomes the node's input mapping (aliases: `payload`, `input`).
    #[serde(default, alias = "payload", alias = "input")]
    operation_payload: Value,
    /// Routing for the node; conversion uses `Routing::End` when absent.
    #[serde(default)]
    routing: Option<Routing>,
    /// Telemetry hints; conversion uses the default when absent.
    #[serde(default)]
    telemetry: Option<TelemetryHints>,
}
2130
2131fn deserialize_json_bytes(bytes: Vec<u8>) -> Result<Value> {
2132    if bytes.is_empty() {
2133        return Ok(Value::Null);
2134    }
2135    serde_json::from_slice(&bytes).or_else(|_| {
2136        String::from_utf8(bytes)
2137            .map(Value::String)
2138            .map_err(|err| anyhow!(err))
2139    })
2140}
2141
2142impl PackFlows {
2143    fn from_manifest(manifest: greentic_types::PackManifest) -> Self {
2144        if let Some(flows) = flows_from_runtime_extension(&manifest) {
2145            return flows;
2146        }
2147        let descriptors = manifest
2148            .flows
2149            .iter()
2150            .map(|entry| FlowDescriptor {
2151                id: entry.id.as_str().to_string(),
2152                flow_type: flow_kind_to_str(entry.kind).to_string(),
2153                pack_id: manifest.pack_id.as_str().to_string(),
2154                profile: manifest.pack_id.as_str().to_string(),
2155                version: manifest.version.to_string(),
2156                description: None,
2157            })
2158            .collect();
2159        let mut flows = HashMap::new();
2160        for entry in &manifest.flows {
2161            flows.insert(entry.id.as_str().to_string(), entry.flow.clone());
2162        }
2163        Self {
2164            metadata: PackMetadata::from_manifest(&manifest),
2165            descriptors,
2166            flows,
2167        }
2168    }
2169}
2170
2171fn flows_from_runtime_extension(manifest: &greentic_types::PackManifest) -> Option<PackFlows> {
2172    let extensions = manifest.extensions.as_ref()?;
2173    let extension = extensions.iter().find_map(|(key, ext)| {
2174        if RUNTIME_FLOW_EXTENSION_IDS
2175            .iter()
2176            .any(|candidate| candidate == key)
2177        {
2178            Some(ext)
2179        } else {
2180            None
2181        }
2182    })?;
2183    let runtime_flows = match decode_runtime_flow_extension(extension) {
2184        Some(flows) if !flows.is_empty() => flows,
2185        _ => return None,
2186    };
2187
2188    let descriptors = runtime_flows
2189        .iter()
2190        .map(|flow| FlowDescriptor {
2191            id: flow.id.as_str().to_string(),
2192            flow_type: flow_kind_to_str(flow.kind).to_string(),
2193            pack_id: manifest.pack_id.as_str().to_string(),
2194            profile: manifest.pack_id.as_str().to_string(),
2195            version: manifest.version.to_string(),
2196            description: None,
2197        })
2198        .collect::<Vec<_>>();
2199    let flows = runtime_flows
2200        .into_iter()
2201        .map(|flow| (flow.id.as_str().to_string(), flow))
2202        .collect();
2203
2204    Some(PackFlows {
2205        metadata: PackMetadata::from_manifest(manifest),
2206        descriptors,
2207        flows,
2208    })
2209}
2210
2211fn decode_runtime_flow_extension(extension: &ExtensionRef) -> Option<Vec<Flow>> {
2212    let value = match extension.inline.as_ref()? {
2213        ExtensionInline::Other(value) => value.clone(),
2214        _ => return None,
2215    };
2216
2217    if let Ok(bundle) = serde_json::from_value::<RuntimeFlowBundle>(value.clone()) {
2218        return Some(collect_runtime_flows(bundle.flows));
2219    }
2220
2221    if let Ok(flows) = serde_json::from_value::<Vec<RuntimeFlow>>(value.clone()) {
2222        return Some(collect_runtime_flows(flows));
2223    }
2224
2225    if let Ok(flows) = serde_json::from_value::<Vec<Flow>>(value) {
2226        return Some(flows);
2227    }
2228
2229    warn!(
2230        extension = %extension.kind,
2231        version = %extension.version,
2232        "runtime flow extension present but could not be decoded"
2233    );
2234    None
2235}
2236
2237fn collect_runtime_flows(flows: Vec<RuntimeFlow>) -> Vec<Flow> {
2238    flows
2239        .into_iter()
2240        .filter_map(|flow| match runtime_flow_to_flow(flow) {
2241            Ok(flow) => Some(flow),
2242            Err(err) => {
2243                warn!(error = %err, "failed to decode runtime flow");
2244                None
2245            }
2246        })
2247        .collect()
2248}
2249
2250fn runtime_flow_to_flow(runtime: RuntimeFlow) -> Result<Flow> {
2251    let flow_id = FlowId::from_str(&runtime.id)
2252        .with_context(|| format!("invalid flow id `{}`", runtime.id))?;
2253    let mut entrypoints = runtime.entrypoints;
2254    if entrypoints.is_empty()
2255        && let Some(start) = &runtime.start
2256    {
2257        entrypoints.insert("default".into(), Value::String(start.clone()));
2258    }
2259
2260    let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
2261    for (id, node) in runtime.nodes {
2262        let node_id = NodeId::from_str(&id).with_context(|| format!("invalid node id `{id}`"))?;
2263        let component_id = ComponentId::from_str(&node.component_id)
2264            .with_context(|| format!("invalid component id `{}`", node.component_id))?;
2265        let component = FlowComponentRef {
2266            id: component_id,
2267            pack_alias: None,
2268            operation: node.operation_name,
2269        };
2270        let routing = node.routing.unwrap_or(Routing::End);
2271        let telemetry = node.telemetry.unwrap_or_default();
2272        nodes.insert(
2273            node_id.clone(),
2274            Node {
2275                id: node_id,
2276                component,
2277                input: InputMapping {
2278                    mapping: node.operation_payload,
2279                },
2280                output: OutputMapping {
2281                    mapping: Value::Null,
2282                },
2283                routing,
2284                telemetry,
2285            },
2286        );
2287    }
2288
2289    Ok(Flow {
2290        schema_version: runtime.schema_version.unwrap_or_else(|| "1.0".to_string()),
2291        id: flow_id,
2292        kind: runtime.kind,
2293        entrypoints,
2294        nodes,
2295        metadata: runtime.metadata.unwrap_or_default(),
2296    })
2297}
2298
2299fn flow_kind_to_str(kind: greentic_types::FlowKind) -> &'static str {
2300    match kind {
2301        greentic_types::FlowKind::Messaging => "messaging",
2302        greentic_types::FlowKind::Event => "event",
2303        greentic_types::FlowKind::ComponentConfig => "component-config",
2304        greentic_types::FlowKind::Job => "job",
2305        greentic_types::FlowKind::Http => "http",
2306    }
2307}
2308
2309fn read_entry(archive: &mut ZipArchive<File>, name: &str) -> Result<Vec<u8>> {
2310    let mut file = archive
2311        .by_name(name)
2312        .with_context(|| format!("entry {name} missing from archive"))?;
2313    let mut buf = Vec::new();
2314    file.read_to_end(&mut buf)?;
2315    Ok(buf)
2316}
2317
2318fn normalize_flow_doc(mut doc: FlowDoc) -> FlowDoc {
2319    for node in doc.nodes.values_mut() {
2320        let Some((component_ref, payload)) = node
2321            .raw
2322            .iter()
2323            .next()
2324            .map(|(key, value)| (key.clone(), value.clone()))
2325        else {
2326            continue;
2327        };
2328        if component_ref.starts_with("emit.") {
2329            node.operation = Some(component_ref);
2330            node.payload = payload;
2331            node.raw.clear();
2332            continue;
2333        }
2334        let (target_component, operation, input, config) =
2335            infer_component_exec(&payload, &component_ref);
2336        let mut payload_obj = serde_json::Map::new();
2337        // component.exec is meta; ensure the payload carries the actual target component.
2338        payload_obj.insert("component".into(), Value::String(target_component));
2339        payload_obj.insert("operation".into(), Value::String(operation));
2340        payload_obj.insert("input".into(), input);
2341        if let Some(cfg) = config {
2342            payload_obj.insert("config".into(), cfg);
2343        }
2344        node.operation = Some("component.exec".to_string());
2345        node.payload = Value::Object(payload_obj);
2346        node.raw.clear();
2347    }
2348    doc
2349}
2350
2351fn infer_component_exec(
2352    payload: &Value,
2353    component_ref: &str,
2354) -> (String, String, Value, Option<Value>) {
2355    let default_op = if component_ref.starts_with("templating.") {
2356        "render"
2357    } else {
2358        "invoke"
2359    }
2360    .to_string();
2361
2362    if let Value::Object(map) = payload {
2363        let op = map
2364            .get("op")
2365            .or_else(|| map.get("operation"))
2366            .and_then(Value::as_str)
2367            .map(|s| s.to_string())
2368            .unwrap_or_else(|| default_op.clone());
2369
2370        let mut input = map.clone();
2371        let config = input.remove("config");
2372        let component = input
2373            .get("component")
2374            .or_else(|| input.get("component_ref"))
2375            .and_then(Value::as_str)
2376            .map(|s| s.to_string())
2377            .unwrap_or_else(|| component_ref.to_string());
2378        input.remove("component");
2379        input.remove("component_ref");
2380        input.remove("op");
2381        input.remove("operation");
2382        return (component, op, Value::Object(input), config);
2383    }
2384
2385    (component_ref.to_string(), default_op, payload.clone(), None)
2386}
2387
/// A component the pack expects to have loaded, as produced by `component_specs`.
#[derive(Clone, Debug)]
struct ComponentSpec {
    /// Component id (or the entry name when the source has no explicit id).
    id: String,
    /// Component version; `"0.0.0"` when derived from a pack lock or component sources.
    version: String,
    /// Wasm file path from a legacy manifest entry; `None` for every other source.
    legacy_path: Option<String>,
}
2394
/// Where a component's artifact comes from and how it should be checked.
#[derive(Clone, Debug)]
struct ComponentSourceInfo {
    /// Resolved digest of the component, when known.
    digest: Option<String>,
    /// Reference to the component's source.
    source: ComponentSourceRef,
    /// Whether the artifact lives inside the pack or has to be fetched.
    artifact: ComponentArtifactLocation,
    /// Expected SHA-256 of the wasm bytes - presumably compared after loading; TODO confirm.
    expected_wasm_sha256: Option<String>,
    /// Presumably disables digest verification for this entry; TODO confirm against the loader.
    skip_digest_verification: bool,
}
2403
/// Location of a component artifact.
#[derive(Clone, Debug)]
enum ComponentArtifactLocation {
    /// The wasm is stored at `wasm_path` (presumably relative to the pack; TODO confirm).
    Inline { wasm_path: String },
    /// The wasm is not stored inline and carries no path of its own.
    Remote,
}
2409
/// Deserialized pack lock document.
#[derive(Clone, Debug, Deserialize)]
struct PackLockV1 {
    /// Schema version declared by the lock document.
    schema_version: u32,
    /// Locked components.
    components: Vec<PackLockComponent>,
}
2415
/// One component entry of a pack lock.
///
/// Several fields exist in two spellings; the `legacy_*` fields read the
/// older key names given in their `rename` attributes.
#[derive(Clone, Debug, Deserialize)]
struct PackLockComponent {
    /// Entry name; used as the component id when `component_id` is absent.
    name: String,
    /// Source reference under the key `source_ref`.
    #[serde(default, rename = "source_ref")]
    source_ref: Option<String>,
    /// Source reference under the legacy key `ref`.
    #[serde(default, rename = "ref")]
    legacy_ref: Option<String>,
    /// Explicit component id; preferred over `name` when present.
    #[serde(default)]
    component_id: Option<ComponentId>,
    /// Presumably marks the component as shipped inside the pack; TODO confirm.
    #[serde(default)]
    bundled: Option<bool>,
    /// Path under the key `bundled_path`.
    #[serde(default, rename = "bundled_path")]
    bundled_path: Option<String>,
    /// Path under the legacy key `path`.
    #[serde(default, rename = "path")]
    legacy_path: Option<String>,
    /// Hash under the key `wasm_sha256`.
    #[serde(default)]
    wasm_sha256: Option<String>,
    /// Hash under the legacy key `sha256`.
    #[serde(default, rename = "sha256")]
    legacy_sha256: Option<String>,
    /// Digest under the key `resolved_digest`.
    #[serde(default)]
    resolved_digest: Option<String>,
    /// Digest under the key `digest`.
    #[serde(default)]
    digest: Option<String>,
}
2440
2441fn component_specs(
2442    manifest: Option<&greentic_types::PackManifest>,
2443    legacy_manifest: Option<&legacy_pack::PackManifest>,
2444    component_sources: Option<&ComponentSourcesV1>,
2445    pack_lock: Option<&PackLockV1>,
2446) -> Vec<ComponentSpec> {
2447    if let Some(manifest) = manifest {
2448        if !manifest.components.is_empty() {
2449            return manifest
2450                .components
2451                .iter()
2452                .map(|entry| ComponentSpec {
2453                    id: entry.id.as_str().to_string(),
2454                    version: entry.version.to_string(),
2455                    legacy_path: None,
2456                })
2457                .collect();
2458        }
2459        if let Some(lock) = pack_lock {
2460            let mut seen = HashSet::new();
2461            let mut specs = Vec::new();
2462            for entry in &lock.components {
2463                let id = entry
2464                    .component_id
2465                    .as_ref()
2466                    .map(|id| id.as_str())
2467                    .unwrap_or(entry.name.as_str());
2468                if seen.insert(id.to_string()) {
2469                    specs.push(ComponentSpec {
2470                        id: id.to_string(),
2471                        version: "0.0.0".to_string(),
2472                        legacy_path: None,
2473                    });
2474                }
2475            }
2476            return specs;
2477        }
2478        if let Some(sources) = component_sources {
2479            let mut seen = HashSet::new();
2480            let mut specs = Vec::new();
2481            for entry in &sources.components {
2482                let id = entry
2483                    .component_id
2484                    .as_ref()
2485                    .map(|id| id.as_str())
2486                    .unwrap_or(entry.name.as_str());
2487                if seen.insert(id.to_string()) {
2488                    specs.push(ComponentSpec {
2489                        id: id.to_string(),
2490                        version: "0.0.0".to_string(),
2491                        legacy_path: None,
2492                    });
2493                }
2494            }
2495            return specs;
2496        }
2497    }
2498    if let Some(legacy_manifest) = legacy_manifest {
2499        return legacy_manifest
2500            .components
2501            .iter()
2502            .map(|entry| ComponentSpec {
2503                id: entry.name.clone(),
2504                version: entry.version.to_string(),
2505                legacy_path: Some(entry.file_wasm.clone()),
2506            })
2507            .collect();
2508    }
2509    Vec::new()
2510}
2511
2512fn component_sources_table(
2513    sources: Option<&ComponentSourcesV1>,
2514) -> Result<Option<HashMap<String, ComponentSourceInfo>>> {
2515    let Some(sources) = sources else {
2516        return Ok(None);
2517    };
2518    let mut table = HashMap::new();
2519    for entry in &sources.components {
2520        let artifact = match &entry.artifact {
2521            ArtifactLocationV1::Inline { wasm_path, .. } => ComponentArtifactLocation::Inline {
2522                wasm_path: wasm_path.clone(),
2523            },
2524            ArtifactLocationV1::Remote => ComponentArtifactLocation::Remote,
2525        };
2526        let info = ComponentSourceInfo {
2527            digest: Some(entry.resolved.digest.clone()),
2528            source: entry.source.clone(),
2529            artifact,
2530            expected_wasm_sha256: None,
2531            skip_digest_verification: false,
2532        };
2533        if let Some(component_id) = entry.component_id.as_ref() {
2534            table.insert(component_id.as_str().to_string(), info.clone());
2535        }
2536        table.insert(entry.name.clone(), info);
2537    }
2538    Ok(Some(table))
2539}
2540
2541fn load_pack_lock(path: &Path) -> Result<Option<PackLockV1>> {
2542    let lock_path = if path.is_dir() {
2543        let candidate = path.join("pack.lock");
2544        if candidate.exists() {
2545            Some(candidate)
2546        } else {
2547            let candidate = path.join("pack.lock.json");
2548            candidate.exists().then_some(candidate)
2549        }
2550    } else {
2551        None
2552    };
2553    let Some(lock_path) = lock_path else {
2554        return Ok(None);
2555    };
2556    let raw = std::fs::read_to_string(&lock_path)
2557        .with_context(|| format!("failed to read {}", lock_path.display()))?;
2558    let lock: PackLockV1 = serde_json::from_str(&raw).context("failed to parse pack.lock")?;
2559    if lock.schema_version != 1 {
2560        bail!("pack.lock schema_version must be 1");
2561    }
2562    Ok(Some(lock))
2563}
2564
/// Lists the directories in which a pack lock file should be searched.
///
/// For a directory pack the only root is `pack_path` itself. Otherwise the
/// roots are the parent and grandparent (when they exist) of `archive_hint`
/// if given, or of `pack_path` if not.
fn find_pack_lock_roots(
    pack_path: &Path,
    is_dir: bool,
    archive_hint: Option<&Path>,
) -> Vec<PathBuf> {
    if is_dir {
        return vec![pack_path.to_path_buf()];
    }
    let anchor = archive_hint.unwrap_or(pack_path);
    // `ancestors` starts with the path itself; the next two items are its
    // parent and grandparent.
    anchor
        .ancestors()
        .skip(1)
        .take(2)
        .map(Path::to_path_buf)
        .collect()
}
2589
2590fn normalize_sha256(digest: &str) -> Result<String> {
2591    let trimmed = digest.trim();
2592    if trimmed.is_empty() {
2593        bail!("sha256 digest cannot be empty");
2594    }
2595    if let Some(stripped) = trimmed.strip_prefix("sha256:") {
2596        if stripped.is_empty() {
2597            bail!("sha256 digest must include hex bytes after sha256:");
2598        }
2599        return Ok(trimmed.to_string());
2600    }
2601    if trimmed.chars().all(|c| c.is_ascii_hexdigit()) {
2602        return Ok(format!("sha256:{trimmed}"));
2603    }
2604    bail!("sha256 digest must be hex or sha256:<hex>");
2605}
2606
2607fn component_sources_table_from_pack_lock(
2608    lock: &PackLockV1,
2609    allow_missing_hash: bool,
2610) -> Result<HashMap<String, ComponentSourceInfo>> {
2611    let mut table = HashMap::new();
2612    let mut names = HashSet::new();
2613    for entry in &lock.components {
2614        if !names.insert(entry.name.clone()) {
2615            bail!(
2616                "pack.lock contains duplicate component name `{}`",
2617                entry.name
2618            );
2619        }
2620        let source_ref = match (&entry.source_ref, &entry.legacy_ref) {
2621            (Some(primary), Some(legacy)) => {
2622                if primary != legacy {
2623                    bail!(
2624                        "pack.lock component {} has conflicting refs: {} vs {}",
2625                        entry.name,
2626                        primary,
2627                        legacy
2628                    );
2629                }
2630                primary.as_str()
2631            }
2632            (Some(primary), None) => primary.as_str(),
2633            (None, Some(legacy)) => legacy.as_str(),
2634            (None, None) => {
2635                bail!("pack.lock component {} missing source_ref", entry.name);
2636            }
2637        };
2638        let source: ComponentSourceRef = source_ref
2639            .parse()
2640            .with_context(|| format!("invalid component ref `{}`", source_ref))?;
2641        let bundled_path = match (&entry.bundled_path, &entry.legacy_path) {
2642            (Some(primary), Some(legacy)) => {
2643                if primary != legacy {
2644                    bail!(
2645                        "pack.lock component {} has conflicting bundled paths: {} vs {}",
2646                        entry.name,
2647                        primary,
2648                        legacy
2649                    );
2650                }
2651                Some(primary.clone())
2652            }
2653            (Some(primary), None) => Some(primary.clone()),
2654            (None, Some(legacy)) => Some(legacy.clone()),
2655            (None, None) => None,
2656        };
2657        let bundled = entry.bundled.unwrap_or(false) || bundled_path.is_some();
2658        let (artifact, digest, expected_wasm_sha256, skip_digest_verification) = if bundled {
2659            let wasm_path = bundled_path.ok_or_else(|| {
2660                anyhow!(
2661                    "pack.lock component {} marked bundled but bundled_path is missing",
2662                    entry.name
2663                )
2664            })?;
2665            let expected_raw = match (&entry.wasm_sha256, &entry.legacy_sha256) {
2666                (Some(primary), Some(legacy)) => {
2667                    if primary != legacy {
2668                        bail!(
2669                            "pack.lock component {} has conflicting wasm_sha256 values: {} vs {}",
2670                            entry.name,
2671                            primary,
2672                            legacy
2673                        );
2674                    }
2675                    Some(primary.as_str())
2676                }
2677                (Some(primary), None) => Some(primary.as_str()),
2678                (None, Some(legacy)) => Some(legacy.as_str()),
2679                (None, None) => None,
2680            };
2681            let expected = match expected_raw {
2682                Some(value) => Some(normalize_sha256(value)?),
2683                None => None,
2684            };
2685            if expected.is_none() && !allow_missing_hash {
2686                bail!(
2687                    "pack.lock component {} missing wasm_sha256 for bundled component",
2688                    entry.name
2689                );
2690            }
2691            (
2692                ComponentArtifactLocation::Inline { wasm_path },
2693                expected.clone(),
2694                expected,
2695                allow_missing_hash && expected_raw.is_none(),
2696            )
2697        } else {
2698            if source.is_tag() {
2699                bail!(
2700                    "component {} uses tag ref {} but is not bundled; rebuild the pack",
2701                    entry.name,
2702                    source
2703                );
2704            }
2705            let expected = entry
2706                .resolved_digest
2707                .as_deref()
2708                .or(entry.digest.as_deref())
2709                .ok_or_else(|| {
2710                    anyhow!(
2711                        "pack.lock component {} missing resolved_digest for remote component",
2712                        entry.name
2713                    )
2714                })?;
2715            (
2716                ComponentArtifactLocation::Remote,
2717                Some(normalize_digest(expected)),
2718                None,
2719                false,
2720            )
2721        };
2722        let info = ComponentSourceInfo {
2723            digest,
2724            source,
2725            artifact,
2726            expected_wasm_sha256,
2727            skip_digest_verification,
2728        };
2729        if let Some(component_id) = entry.component_id.as_ref() {
2730            let key = component_id.as_str().to_string();
2731            if table.contains_key(&key) {
2732                bail!(
2733                    "pack.lock contains duplicate component id `{}`",
2734                    component_id.as_str()
2735                );
2736            }
2737            table.insert(key, info.clone());
2738        }
2739        if entry.name
2740            != entry
2741                .component_id
2742                .as_ref()
2743                .map(|id| id.as_str())
2744                .unwrap_or("")
2745        {
2746            table.insert(entry.name.clone(), info);
2747        }
2748    }
2749    Ok(table)
2750}
2751
2752fn component_path_for_spec(root: &Path, spec: &ComponentSpec) -> PathBuf {
2753    if let Some(path) = &spec.legacy_path {
2754        return root.join(path);
2755    }
2756    root.join("components").join(format!("{}.wasm", spec.id))
2757}
2758
/// Ensures a digest string carries an algorithm prefix.
///
/// Values already starting with `sha256:` or `blake3:` are returned
/// unchanged; anything else is prefixed with `sha256:`.
fn normalize_digest(digest: &str) -> String {
    const KNOWN_PREFIXES: [&str; 2] = ["sha256:", "blake3:"];
    let already_prefixed = KNOWN_PREFIXES
        .iter()
        .any(|prefix| digest.starts_with(*prefix));
    match already_prefixed {
        true => digest.to_owned(),
        false => format!("sha256:{digest}"),
    }
}
2766
2767fn compute_digest_for(bytes: &[u8], digest: &str) -> Result<String> {
2768    if digest.starts_with("blake3:") {
2769        let hash = blake3::hash(bytes);
2770        return Ok(format!("blake3:{}", hash.to_hex()));
2771    }
2772    let mut hasher = sha2::Sha256::new();
2773    hasher.update(bytes);
2774    Ok(format!("sha256:{:x}", hasher.finalize()))
2775}
2776
2777fn compute_sha256_digest_for(bytes: &[u8]) -> String {
2778    let mut hasher = sha2::Sha256::new();
2779    hasher.update(bytes);
2780    format!("sha256:{:x}", hasher.finalize())
2781}
2782
2783fn build_artifact_key(cache: &CacheManager, digest: Option<&str>, bytes: &[u8]) -> ArtifactKey {
2784    let wasm_digest = digest
2785        .map(normalize_digest)
2786        .unwrap_or_else(|| compute_sha256_digest_for(bytes));
2787    ArtifactKey::new(cache.engine_profile_id().to_string(), wasm_digest)
2788}
2789
2790async fn compile_component_with_cache(
2791    cache: &CacheManager,
2792    engine: &Engine,
2793    digest: Option<&str>,
2794    bytes: Vec<u8>,
2795) -> Result<Arc<Component>> {
2796    let key = build_artifact_key(cache, digest, &bytes);
2797    cache.get_component(engine, &key, || Ok(bytes)).await
2798}
2799
2800fn verify_component_digest(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
2801    let normalized_expected = normalize_digest(expected);
2802    let actual = compute_digest_for(bytes, &normalized_expected)?;
2803    if normalize_digest(&actual) != normalized_expected {
2804        bail!(
2805            "component {component_id} digest mismatch: expected {normalized_expected}, got {actual}"
2806        );
2807    }
2808    Ok(())
2809}
2810
2811fn verify_wasm_sha256(component_id: &str, expected: &str, bytes: &[u8]) -> Result<()> {
2812    let normalized_expected = normalize_sha256(expected)?;
2813    let actual = compute_sha256_digest_for(bytes);
2814    if actual != normalized_expected {
2815        bail!(
2816            "component {component_id} bundled digest mismatch: expected {normalized_expected}, got {actual}"
2817        );
2818    }
2819    Ok(())
2820}
2821
2822#[cfg(test)]
2823mod pack_lock_tests {
2824    use super::*;
2825    use tempfile::TempDir;
2826
2827    #[test]
2828    fn pack_lock_tag_ref_requires_bundle() {
2829        let lock = PackLockV1 {
2830            schema_version: 1,
2831            components: vec![PackLockComponent {
2832                name: "templates".to_string(),
2833                source_ref: Some("oci://registry.test/templates:latest".to_string()),
2834                legacy_ref: None,
2835                component_id: None,
2836                bundled: Some(false),
2837                bundled_path: None,
2838                legacy_path: None,
2839                wasm_sha256: None,
2840                legacy_sha256: None,
2841                resolved_digest: None,
2842                digest: None,
2843            }],
2844        };
2845        let err = component_sources_table_from_pack_lock(&lock, false).unwrap_err();
2846        assert!(
2847            err.to_string().contains("tag ref") && err.to_string().contains("rebuild the pack"),
2848            "unexpected error: {err}"
2849        );
2850    }
2851
2852    #[test]
2853    fn bundled_hash_mismatch_errors() {
2854        let rt = tokio::runtime::Runtime::new().expect("runtime");
2855        let temp = TempDir::new().expect("temp dir");
2856        let engine = Engine::default();
2857        let engine_profile =
2858            EngineProfile::from_engine(&engine, CpuPolicy::Native, "default".to_string());
2859        let cache_config = CacheConfig {
2860            root: temp.path().join("cache"),
2861            ..CacheConfig::default()
2862        };
2863        let cache = CacheManager::new(cache_config, engine_profile);
2864        let wasm_path = temp.path().join("component.wasm");
2865        let fixture_wasm = Path::new(env!("CARGO_MANIFEST_DIR"))
2866            .join("../../tests/fixtures/packs/secrets_store_smoke/components/echo_secret.wasm");
2867        let bytes = std::fs::read(&fixture_wasm).expect("read fixture wasm");
2868        std::fs::write(&wasm_path, &bytes).expect("write temp wasm");
2869
2870        let spec = ComponentSpec {
2871            id: "qa.process".to_string(),
2872            version: "0.0.0".to_string(),
2873            legacy_path: None,
2874        };
2875        let mut missing = HashSet::new();
2876        missing.insert(spec.id.clone());
2877
2878        let mut sources = HashMap::new();
2879        sources.insert(
2880            spec.id.clone(),
2881            ComponentSourceInfo {
2882                digest: Some("sha256:deadbeef".to_string()),
2883                source: ComponentSourceRef::Oci("registry.test/qa.process@sha256:deadbeef".into()),
2884                artifact: ComponentArtifactLocation::Inline {
2885                    wasm_path: "component.wasm".to_string(),
2886                },
2887                expected_wasm_sha256: Some("sha256:deadbeef".to_string()),
2888                skip_digest_verification: false,
2889            },
2890        );
2891
2892        let mut loaded = HashMap::new();
2893        let result = rt.block_on(load_components_from_sources(
2894            &cache,
2895            &engine,
2896            &sources,
2897            &ComponentResolution::default(),
2898            &[spec],
2899            &mut missing,
2900            &mut loaded,
2901            Some(temp.path()),
2902            None,
2903        ));
2904        let err = result.unwrap_err();
2905        assert!(
2906            err.to_string().contains("bundled digest mismatch"),
2907            "unexpected error: {err}"
2908        );
2909    }
2910}
2911
2912#[cfg(test)]
2913mod pack_resolution_prop_tests {
2914    use super::*;
2915    use greentic_types::{ArtifactLocationV1, ComponentSourceEntryV1, ResolvedComponentV1};
2916    use proptest::prelude::*;
2917    use proptest::test_runner::{Config as ProptestConfig, RngAlgorithm, TestRng, TestRunner};
2918    use std::collections::BTreeSet;
2919    use std::path::Path;
2920    use std::str::FromStr;
2921
    /// How a generated scenario asks for a component. Both variants carry the
    /// string that is used as the lookup key in the resolved table.
    #[derive(Clone, Debug)]
    enum ResolveRequest {
        /// Look the component up under its component id.
        ById(String),
        /// Look the component up under its entry name.
        ByName(String),
    }
2927
    /// Comparable snapshot of a successful lookup, flattened to plain strings
    /// so two resolutions can be checked for equality.
    #[derive(Clone, Debug, PartialEq, Eq)]
    struct ResolvedComponent {
        /// Key the component was requested under.
        key: String,
        /// Source reference rendered with `to_string`.
        source: String,
        /// Artifact location, either `"inline"` or `"remote"`.
        artifact: String,
        /// Digest recorded for the component, if any.
        digest: Option<String>,
        /// Expected hash of the bundled wasm, if any.
        expected_wasm_sha256: Option<String>,
        /// Whether digest verification is skipped for this component.
        skip_digest_verification: bool,
    }
2937
    /// Comparable description of a failed lookup.
    #[derive(Clone, Debug, PartialEq, Eq)]
    struct ResolveError {
        /// Stable classification of the failure (see `known_error_codes`).
        code: String,
        /// Human-readable error text.
        message: String,
        /// Key that was being resolved when the failure occurred.
        context_key: String,
    }
2944
    /// One generated test case for the resolution property test.
    #[derive(Clone, Debug)]
    struct Scenario {
        /// Pack lock to resolve against; takes precedence when present.
        pack_lock: Option<PackLockV1>,
        /// Component sources document used when no lock is present.
        component_sources: Option<ComponentSourcesV1>,
        /// The lookup to perform.
        request: ResolveRequest,
        /// SHA-256 digest to verify `bytes` against, when the scenario has one.
        expected_sha256: Option<String>,
        /// Generated payload that is hashed during verification.
        bytes: Vec<u8>,
    }
2953
2954    fn resolve_component_test(
2955        sources: Option<&ComponentSourcesV1>,
2956        lock: Option<&PackLockV1>,
2957        request: &ResolveRequest,
2958    ) -> Result<ResolvedComponent, ResolveError> {
2959        let table = if let Some(lock) = lock {
2960            component_sources_table_from_pack_lock(lock, false).map_err(|err| ResolveError {
2961                code: classify_pack_lock_error(err.to_string().as_str()).to_string(),
2962                message: err.to_string(),
2963                context_key: request_key(request).to_string(),
2964            })?
2965        } else {
2966            let sources = component_sources_table(sources).map_err(|err| ResolveError {
2967                code: "component_sources_error".to_string(),
2968                message: err.to_string(),
2969                context_key: request_key(request).to_string(),
2970            })?;
2971            sources.ok_or_else(|| ResolveError {
2972                code: "missing_component_sources".to_string(),
2973                message: "component sources not provided".to_string(),
2974                context_key: request_key(request).to_string(),
2975            })?
2976        };
2977
2978        let key = request_key(request);
2979        let source = table.get(key).ok_or_else(|| ResolveError {
2980            code: "component_not_found".to_string(),
2981            message: format!("component {key} not found"),
2982            context_key: key.to_string(),
2983        })?;
2984
2985        Ok(ResolvedComponent {
2986            key: key.to_string(),
2987            source: source.source.to_string(),
2988            artifact: match source.artifact {
2989                ComponentArtifactLocation::Inline { .. } => "inline".to_string(),
2990                ComponentArtifactLocation::Remote => "remote".to_string(),
2991            },
2992            digest: source.digest.clone(),
2993            expected_wasm_sha256: source.expected_wasm_sha256.clone(),
2994            skip_digest_verification: source.skip_digest_verification,
2995        })
2996    }
2997
2998    fn request_key(request: &ResolveRequest) -> &str {
2999        match request {
3000            ResolveRequest::ById(value) => value.as_str(),
3001            ResolveRequest::ByName(value) => value.as_str(),
3002        }
3003    }
3004
3005    fn classify_pack_lock_error(message: &str) -> &'static str {
3006        if message.contains("duplicate component name") {
3007            "duplicate_name"
3008        } else if message.contains("duplicate component id") {
3009            "duplicate_id"
3010        } else if message.contains("conflicting refs") {
3011            "conflicting_ref"
3012        } else if message.contains("conflicting bundled paths") {
3013            "conflicting_bundled_path"
3014        } else if message.contains("conflicting wasm_sha256") {
3015            "conflicting_wasm_sha256"
3016        } else if message.contains("missing source_ref") {
3017            "missing_source_ref"
3018        } else if message.contains("marked bundled but bundled_path is missing") {
3019            "missing_bundled_path"
3020        } else if message.contains("missing wasm_sha256") {
3021            "missing_wasm_sha256"
3022        } else if message.contains("tag ref") && message.contains("not bundled") {
3023            "tag_ref_requires_bundle"
3024        } else if message.contains("missing resolved_digest") {
3025            "missing_resolved_digest"
3026        } else if message.contains("invalid component ref") {
3027            "invalid_component_ref"
3028        } else if message.contains("sha256 digest") {
3029            "invalid_sha256"
3030        } else {
3031            "unknown_error"
3032        }
3033    }
3034
3035    fn known_error_codes() -> BTreeSet<&'static str> {
3036        [
3037            "component_sources_error",
3038            "missing_component_sources",
3039            "component_not_found",
3040            "duplicate_name",
3041            "duplicate_id",
3042            "conflicting_ref",
3043            "conflicting_bundled_path",
3044            "conflicting_wasm_sha256",
3045            "missing_source_ref",
3046            "missing_bundled_path",
3047            "missing_wasm_sha256",
3048            "tag_ref_requires_bundle",
3049            "missing_resolved_digest",
3050            "invalid_component_ref",
3051            "invalid_sha256",
3052            "unknown_error",
3053        ]
3054        .into_iter()
3055        .collect()
3056    }
3057
3058    fn proptest_config() -> ProptestConfig {
3059        let cases = std::env::var("PROPTEST_CASES")
3060            .ok()
3061            .and_then(|value| value.parse::<u32>().ok())
3062            .unwrap_or(128);
3063        ProptestConfig {
3064            cases,
3065            failure_persistence: None,
3066            ..ProptestConfig::default()
3067        }
3068    }
3069
3070    fn proptest_seed() -> Option<[u8; 32]> {
3071        let seed = std::env::var("PROPTEST_SEED")
3072            .ok()
3073            .and_then(|value| value.parse::<u64>().ok())?;
3074        let mut bytes = [0u8; 32];
3075        bytes[..8].copy_from_slice(&seed.to_le_bytes());
3076        Some(bytes)
3077    }
3078
3079    fn run_cases(strategy: impl Strategy<Value = Scenario>, cases: u32, seed: Option<[u8; 32]>) {
3080        let config = ProptestConfig {
3081            cases,
3082            failure_persistence: None,
3083            ..ProptestConfig::default()
3084        };
3085        let mut runner = match seed {
3086            Some(bytes) => {
3087                TestRunner::new_with_rng(config, TestRng::from_seed(RngAlgorithm::ChaCha, &bytes))
3088            }
3089            None => TestRunner::new(config),
3090        };
3091        runner
3092            .run(&strategy, |scenario| {
3093                run_scenario(&scenario);
3094                Ok(())
3095            })
3096            .unwrap();
3097    }
3098
3099    fn run_scenario(scenario: &Scenario) {
3100        let known_codes = known_error_codes();
3101        let first = resolve_component_test(
3102            scenario.component_sources.as_ref(),
3103            scenario.pack_lock.as_ref(),
3104            &scenario.request,
3105        );
3106        let second = resolve_component_test(
3107            scenario.component_sources.as_ref(),
3108            scenario.pack_lock.as_ref(),
3109            &scenario.request,
3110        );
3111        assert_eq!(normalize_result(&first), normalize_result(&second));
3112
3113        if let Some(lock) = scenario.pack_lock.as_ref() {
3114            let lock_only = resolve_component_test(None, Some(lock), &scenario.request);
3115            assert_eq!(normalize_result(&first), normalize_result(&lock_only));
3116        }
3117
3118        if let Err(err) = first.as_ref() {
3119            assert!(
3120                known_codes.contains(err.code.as_str()),
3121                "unexpected error code {}: {}",
3122                err.code,
3123                err.message
3124            );
3125        }
3126
3127        if let Some(expected) = scenario.expected_sha256.as_deref() {
3128            let expected_ok =
3129                verify_wasm_sha256("test.component", expected, &scenario.bytes).is_ok();
3130            let actual = compute_sha256_digest_for(&scenario.bytes);
3131            if actual == normalize_sha256(expected).unwrap_or_default() {
3132                assert!(expected_ok, "expected sha256 match to succeed");
3133            } else {
3134                assert!(!expected_ok, "expected sha256 mismatch to fail");
3135            }
3136        }
3137    }
3138
3139    fn normalize_result(
3140        result: &Result<ResolvedComponent, ResolveError>,
3141    ) -> Result<ResolvedComponent, ResolveError> {
3142        match result {
3143            Ok(value) => Ok(value.clone()),
3144            Err(err) => Err(err.clone()),
3145        }
3146    }
3147
3148    fn scenario_strategy() -> impl Strategy<Value = Scenario> {
3149        let name = any::<u8>().prop_map(|n| format!("component{n}.core"));
3150        let alt_name = any::<u8>().prop_map(|n| format!("component_alt{n}.core"));
3151        let tag_ref = any::<bool>();
3152        let bundled = any::<bool>();
3153        let include_sha = any::<bool>();
3154        let include_component_id = any::<bool>();
3155        let request_by_id = any::<bool>();
3156        let use_lock = any::<bool>();
3157        let use_sources = any::<bool>();
3158        let bytes = prop::collection::vec(any::<u8>(), 1..64);
3159
3160        (
3161            name,
3162            alt_name,
3163            tag_ref,
3164            bundled,
3165            include_sha,
3166            include_component_id,
3167            request_by_id,
3168            use_lock,
3169            use_sources,
3170            bytes,
3171        )
3172            .prop_map(
3173                |(
3174                    name,
3175                    alt_name,
3176                    tag_ref,
3177                    bundled,
3178                    include_sha,
3179                    include_component_id,
3180                    request_by_id,
3181                    use_lock,
3182                    use_sources,
3183                    bytes,
3184                )| {
3185                    let component_id_str = if include_component_id {
3186                        alt_name.clone()
3187                    } else {
3188                        name.clone()
3189                    };
3190                    let component_id = ComponentId::from_str(&component_id_str).ok();
3191                    let source_ref = if tag_ref {
3192                        format!("oci://registry.test/{name}:v1")
3193                    } else {
3194                        format!(
3195                            "oci://registry.test/{name}@sha256:{}",
3196                            hex::encode([0x11u8; 32])
3197                        )
3198                    };
3199                    let expected_sha256 = if bundled && include_sha {
3200                        Some(compute_sha256_digest_for(&bytes))
3201                    } else {
3202                        None
3203                    };
3204
3205                    let lock_component = PackLockComponent {
3206                        name: name.clone(),
3207                        source_ref: Some(source_ref),
3208                        legacy_ref: None,
3209                        component_id,
3210                        bundled: Some(bundled),
3211                        bundled_path: if bundled {
3212                            Some(format!("components/{name}.wasm"))
3213                        } else {
3214                            None
3215                        },
3216                        legacy_path: None,
3217                        wasm_sha256: expected_sha256.clone(),
3218                        legacy_sha256: None,
3219                        resolved_digest: if bundled {
3220                            None
3221                        } else {
3222                            Some("sha256:deadbeef".to_string())
3223                        },
3224                        digest: None,
3225                    };
3226
3227                    let pack_lock = if use_lock {
3228                        Some(PackLockV1 {
3229                            schema_version: 1,
3230                            components: vec![lock_component],
3231                        })
3232                    } else {
3233                        None
3234                    };
3235
3236                    let component_sources = if use_sources {
3237                        Some(ComponentSourcesV1::new(vec![ComponentSourceEntryV1 {
3238                            name: name.clone(),
3239                            component_id: ComponentId::from_str(&name).ok(),
3240                            source: ComponentSourceRef::from_str(
3241                                "oci://registry.test/component@sha256:deadbeef",
3242                            )
3243                            .expect("component ref"),
3244                            resolved: ResolvedComponentV1 {
3245                                digest: "sha256:deadbeef".to_string(),
3246                                signature: None,
3247                                signed_by: None,
3248                            },
3249                            artifact: if bundled {
3250                                ArtifactLocationV1::Inline {
3251                                    wasm_path: format!("components/{name}.wasm"),
3252                                    manifest_path: None,
3253                                }
3254                            } else {
3255                                ArtifactLocationV1::Remote
3256                            },
3257                            licensing_hint: None,
3258                            metering_hint: None,
3259                        }]))
3260                    } else {
3261                        None
3262                    };
3263
3264                    let request = if request_by_id {
3265                        ResolveRequest::ById(component_id_str.clone())
3266                    } else {
3267                        ResolveRequest::ByName(name.clone())
3268                    };
3269
3270                    Scenario {
3271                        pack_lock,
3272                        component_sources,
3273                        request,
3274                        expected_sha256,
3275                        bytes,
3276                    }
3277                },
3278            )
3279    }
3280
3281    #[test]
3282    fn pack_resolution_proptest() {
3283        let seed = proptest_seed();
3284        run_cases(scenario_strategy(), proptest_config().cases, seed);
3285    }
3286
3287    #[test]
3288    fn pack_resolution_regression_seeds() {
3289        let seeds_path =
3290            Path::new(env!("CARGO_MANIFEST_DIR")).join("../../tests/fixtures/proptest-seeds.txt");
3291        let raw = std::fs::read_to_string(&seeds_path).expect("read proptest seeds");
3292        for line in raw.lines() {
3293            let line = line.trim();
3294            if line.is_empty() || line.starts_with('#') {
3295                continue;
3296            }
3297            let seed = line.parse::<u64>().expect("seed must be an integer");
3298            let mut bytes = [0u8; 32];
3299            bytes[..8].copy_from_slice(&seed.to_le_bytes());
3300            run_cases(scenario_strategy(), 1, Some(bytes));
3301        }
3302    }
3303}
3304
3305fn locate_pack_assets(
3306    materialized_root: Option<&Path>,
3307    archive_hint: Option<&Path>,
3308) -> Result<(Option<PathBuf>, Option<TempDir>)> {
3309    if let Some(root) = materialized_root {
3310        let assets = root.join("assets");
3311        if assets.is_dir() {
3312            return Ok((Some(assets), None));
3313        }
3314    }
3315    if let Some(path) = archive_hint
3316        && let Some((tempdir, assets)) = extract_assets_from_archive(path)?
3317    {
3318        return Ok((Some(assets), Some(tempdir)));
3319    }
3320    Ok((None, None))
3321}
3322
3323fn extract_assets_from_archive(path: &Path) -> Result<Option<(TempDir, PathBuf)>> {
3324    let file =
3325        File::open(path).with_context(|| format!("failed to open pack {}", path.display()))?;
3326    let mut archive =
3327        ZipArchive::new(file).with_context(|| format!("failed to read pack {}", path.display()))?;
3328    let temp = TempDir::new().context("failed to create temporary assets directory")?;
3329    let mut found = false;
3330    for idx in 0..archive.len() {
3331        let mut entry = archive.by_index(idx)?;
3332        let name = entry.name();
3333        if !name.starts_with("assets/") {
3334            continue;
3335        }
3336        let dest = temp.path().join(name);
3337        if name.ends_with('/') {
3338            std::fs::create_dir_all(&dest)?;
3339            found = true;
3340            continue;
3341        }
3342        if let Some(parent) = dest.parent() {
3343            std::fs::create_dir_all(parent)?;
3344        }
3345        let mut outfile = std::fs::File::create(&dest)?;
3346        std::io::copy(&mut entry, &mut outfile)?;
3347        found = true;
3348    }
3349    if found {
3350        let assets_path = temp.path().join("assets");
3351        Ok(Some((temp, assets_path)))
3352    } else {
3353        Ok(None)
3354    }
3355}
3356
3357fn dist_options_from(component_resolution: &ComponentResolution) -> DistOptions {
3358    let mut opts = DistOptions {
3359        allow_tags: true,
3360        ..DistOptions::default()
3361    };
3362    if let Some(cache_dir) = component_resolution.dist_cache_dir.clone() {
3363        opts.cache_dir = cache_dir;
3364    }
3365    if component_resolution.dist_offline {
3366        opts.offline = true;
3367    }
3368    opts
3369}
3370
/// Loads components that are still listed in `missing` from the sources
/// recorded in `component_sources`.
///
/// For every entry of `specs` whose id is in `missing` and has an entry in
/// `component_sources`, the wasm bytes are obtained according to the artifact
/// location:
///
/// * `Inline` — read from `materialized_root/<wasm_path>` when that file
///   exists, otherwise from the archive at `archive_hint`; it is an error when
///   neither can supply the file.
/// * `Remote` — tag references are rejected; otherwise the artifact is located
///   through a `DistClient` (by digest in offline mode, by reference with a
///   digest comparison otherwise) and read from the returned cache path.
///
/// The bytes are then verified (see the comments at the verification step),
/// compiled with `compile_component_with_cache`, inserted into `into` under
/// the spec id, and the id is removed from `missing`.
///
/// Specs that are not in `missing`, or that have no entry in
/// `component_sources`, are skipped without error.
///
/// # Errors
/// Returns an error on the first component that cannot be read, resolved,
/// verified, or compiled; components processed before that point remain in
/// `into`.
#[allow(clippy::too_many_arguments)]
async fn load_components_from_sources(
    cache: &CacheManager,
    engine: &Engine,
    component_sources: &HashMap<String, ComponentSourceInfo>,
    component_resolution: &ComponentResolution,
    specs: &[ComponentSpec],
    missing: &mut HashSet<String>,
    into: &mut HashMap<String, PackComponent>,
    materialized_root: Option<&Path>,
    archive_hint: Option<&Path>,
) -> Result<()> {
    // Open the pack archive once up front when a path was supplied; inline
    // artifacts may be read from it below.
    let mut archive = if let Some(path) = archive_hint {
        Some(
            ZipArchive::new(File::open(path)?)
                .with_context(|| format!("{} is not a valid gtpack", path.display()))?,
        )
    } else {
        None
    };
    // Constructed lazily, only when the first remote artifact is encountered.
    let mut dist_client: Option<DistClient> = None;

    for spec in specs {
        // Already loaded by an earlier step (or never requested).
        if !missing.contains(&spec.id) {
            continue;
        }
        // No recorded source for this component: nothing to do here.
        let Some(source) = component_sources.get(&spec.id) else {
            continue;
        };

        let bytes = match &source.artifact {
            ComponentArtifactLocation::Inline { wasm_path } => {
                if let Some(root) = materialized_root {
                    // Prefer the materialized file on disk when it exists.
                    let path = root.join(wasm_path);
                    if path.exists() {
                        std::fs::read(&path).with_context(|| {
                            format!(
                                "failed to read inline component {} from {}",
                                spec.id,
                                path.display()
                            )
                        })?
                    } else if archive.is_none() {
                        bail!("inline component {} missing at {}", spec.id, path.display());
                    } else {
                        // Not materialized on disk: fall back to the archive entry.
                        read_entry(
                            archive.as_mut().expect("archive present when needed"),
                            wasm_path,
                        )
                        .with_context(|| {
                            format!(
                                "inline component {} missing at {} in pack archive",
                                spec.id, wasm_path
                            )
                        })?
                    }
                } else if let Some(archive) = archive.as_mut() {
                    read_entry(archive, wasm_path).with_context(|| {
                        format!(
                            "inline component {} missing at {} in pack archive",
                            spec.id, wasm_path
                        )
                    })?
                } else {
                    bail!(
                        "inline component {} missing and no pack source available",
                        spec.id
                    );
                }
            }
            ComponentArtifactLocation::Remote => {
                // A tag reference that is not bundled is rejected outright.
                if source.source.is_tag() {
                    bail!(
                        "component {} uses tag ref {} but is not bundled; rebuild the pack",
                        spec.id,
                        source.source
                    );
                }
                let client = dist_client.get_or_insert_with(|| {
                    DistClient::new(dist_options_from(component_resolution))
                });
                let reference = source.source.to_string();
                // Fault-injection hook; an error returned here aborts loading.
                fault::maybe_fail_asset(&reference)
                    .await
                    .with_context(|| format!("fault injection blocked asset {reference}"))?;
                // A remote artifact must carry an expected digest.
                let digest = source.digest.as_deref().ok_or_else(|| {
                    anyhow!(
                        "component {} missing expected digest for remote component",
                        spec.id
                    )
                })?;
                let cache_path = if component_resolution.dist_offline {
                    // Offline: locate the artifact by digest only.
                    // NOTE(review): presumably served from the local cache — confirm
                    // against `DistClient::fetch_digest`.
                    client
                        .fetch_digest(digest)
                        .await
                        .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?
                } else {
                    // Online: resolve the reference, then require the resolved
                    // digest to match the expected one after normalization.
                    let resolved = client
                        .resolve_ref(&reference)
                        .await
                        .map_err(|err| dist_error_for_component(err, &spec.id, &reference))?;
                    let expected = normalize_digest(digest);
                    let actual = normalize_digest(&resolved.digest);
                    if expected != actual {
                        bail!(
                            "component {} digest mismatch after fetch: expected {}, got {}",
                            spec.id,
                            expected,
                            actual
                        );
                    }
                    resolved.cache_path.ok_or_else(|| {
                        anyhow!(
                            "component {} resolved from {} but cache path is missing",
                            spec.id,
                            reference
                        )
                    })?
                };
                std::fs::read(&cache_path).with_context(|| {
                    format!(
                        "failed to read cached component {} from {}",
                        spec.id,
                        cache_path.display()
                    )
                })?
            }
        };

        // Verification, in priority order:
        // 1. an explicit wasm sha256 is checked against the bytes;
        // 2. otherwise, when verification is explicitly skipped, the computed
        //    digest is only logged;
        // 3. otherwise the recorded component digest is required and checked.
        if let Some(expected) = source.expected_wasm_sha256.as_deref() {
            verify_wasm_sha256(&spec.id, expected, &bytes)?;
        } else if source.skip_digest_verification {
            let actual = compute_sha256_digest_for(&bytes);
            warn!(
                component_id = %spec.id,
                digest = %actual,
                "bundled component missing wasm_sha256; allowing due to flag"
            );
        } else {
            let expected = source.digest.as_deref().ok_or_else(|| {
                anyhow!(
                    "component {} missing expected digest for verification",
                    spec.id
                )
            })?;
            verify_component_digest(&spec.id, expected, &bytes)?;
        }
        // NOTE(review): the digest is handed to the compile cache, presumably as
        // a cache key hint — confirm against `compile_component_with_cache`.
        let component =
            compile_component_with_cache(cache, engine, source.digest.as_deref(), bytes)
                .await
                .with_context(|| format!("failed to compile component {}", spec.id))?;
        into.insert(
            spec.id.clone(),
            PackComponent {
                name: spec.id.clone(),
                version: spec.version.clone(),
                component,
            },
        );
        // Mark as loaded so later loaders skip this component.
        missing.remove(&spec.id);
    }

    Ok(())
}
3535
3536fn dist_error_for_component(err: DistError, component_id: &str, reference: &str) -> anyhow::Error {
3537    match err {
3538        DistError::CacheMiss { reference: missing } => anyhow!(
3539            "remote component {} is not cached for {}. Run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
3540            component_id,
3541            missing,
3542            reference
3543        ),
3544        DistError::Offline { reference: blocked } => anyhow!(
3545            "offline mode blocked fetching component {} from {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
3546            component_id,
3547            blocked,
3548            reference
3549        ),
3550        DistError::AuthRequired { target } => anyhow!(
3551            "component {} requires authenticated source {}; run `greentic-dist pull --lock <pack.lock>` or `greentic-dist pull {}`",
3552            component_id,
3553            target,
3554            reference
3555        ),
3556        other => anyhow!(
3557            "failed to resolve component {} from {}: {}",
3558            component_id,
3559            reference,
3560            other
3561        ),
3562    }
3563}
3564
3565async fn load_components_from_overrides(
3566    cache: &CacheManager,
3567    engine: &Engine,
3568    overrides: &HashMap<String, PathBuf>,
3569    specs: &[ComponentSpec],
3570    missing: &mut HashSet<String>,
3571    into: &mut HashMap<String, PackComponent>,
3572) -> Result<()> {
3573    for spec in specs {
3574        if !missing.contains(&spec.id) {
3575            continue;
3576        }
3577        let Some(path) = overrides.get(&spec.id) else {
3578            continue;
3579        };
3580        let bytes = std::fs::read(path)
3581            .with_context(|| format!("failed to read override component {}", path.display()))?;
3582        let component = compile_component_with_cache(cache, engine, None, bytes)
3583            .await
3584            .with_context(|| {
3585                format!(
3586                    "failed to compile component {} from override {}",
3587                    spec.id,
3588                    path.display()
3589                )
3590            })?;
3591        into.insert(
3592            spec.id.clone(),
3593            PackComponent {
3594                name: spec.id.clone(),
3595                version: spec.version.clone(),
3596                component,
3597            },
3598        );
3599        missing.remove(&spec.id);
3600    }
3601    Ok(())
3602}
3603
3604async fn load_components_from_dir(
3605    cache: &CacheManager,
3606    engine: &Engine,
3607    root: &Path,
3608    specs: &[ComponentSpec],
3609    missing: &mut HashSet<String>,
3610    into: &mut HashMap<String, PackComponent>,
3611) -> Result<()> {
3612    for spec in specs {
3613        if !missing.contains(&spec.id) {
3614            continue;
3615        }
3616        let path = component_path_for_spec(root, spec);
3617        if !path.exists() {
3618            tracing::debug!(component = %spec.id, path = %path.display(), "materialized component missing; will try other sources");
3619            continue;
3620        }
3621        let bytes = std::fs::read(&path)
3622            .with_context(|| format!("failed to read component {}", path.display()))?;
3623        let component = compile_component_with_cache(cache, engine, None, bytes)
3624            .await
3625            .with_context(|| {
3626                format!(
3627                    "failed to compile component {} from {}",
3628                    spec.id,
3629                    path.display()
3630                )
3631            })?;
3632        into.insert(
3633            spec.id.clone(),
3634            PackComponent {
3635                name: spec.id.clone(),
3636                version: spec.version.clone(),
3637                component,
3638            },
3639        );
3640        missing.remove(&spec.id);
3641    }
3642    Ok(())
3643}
3644
3645async fn load_components_from_archive(
3646    cache: &CacheManager,
3647    engine: &Engine,
3648    path: &Path,
3649    specs: &[ComponentSpec],
3650    missing: &mut HashSet<String>,
3651    into: &mut HashMap<String, PackComponent>,
3652) -> Result<()> {
3653    let mut archive = ZipArchive::new(File::open(path)?)
3654        .with_context(|| format!("{} is not a valid gtpack", path.display()))?;
3655    for spec in specs {
3656        if !missing.contains(&spec.id) {
3657            continue;
3658        }
3659        let file_name = spec
3660            .legacy_path
3661            .clone()
3662            .unwrap_or_else(|| format!("components/{}.wasm", spec.id));
3663        let bytes = match read_entry(&mut archive, &file_name) {
3664            Ok(bytes) => bytes,
3665            Err(err) => {
3666                warn!(component = %spec.id, pack = %path.display(), error = %err, "component entry missing in pack archive");
3667                continue;
3668            }
3669        };
3670        let component = compile_component_with_cache(cache, engine, None, bytes)
3671            .await
3672            .with_context(|| format!("failed to compile component {}", spec.id))?;
3673        into.insert(
3674            spec.id.clone(),
3675            PackComponent {
3676                name: spec.id.clone(),
3677                version: spec.version.clone(),
3678                component,
3679            },
3680        );
3681        missing.remove(&spec.id);
3682    }
3683    Ok(())
3684}
3685
#[cfg(test)]
mod tests {
    use super::*;
    use greentic_flow::model::{FlowDoc, NodeDoc};
    use indexmap::IndexMap;
    use serde_json::json;

    /// A node whose only content is a raw `templating.handlebars` entry must be
    /// normalized into a `component.exec` node whose payload names the
    /// component, the `render` operation, and the original input.
    #[test]
    fn normalizes_raw_component_to_component_exec() {
        let mut raw_entries = IndexMap::new();
        raw_entries.insert(
            "templating.handlebars".into(),
            json!({ "template": "Hi {{name}}" }),
        );
        let start_node = NodeDoc {
            raw: raw_entries,
            routing: json!([{"out": true}]),
            ..Default::default()
        };

        let mut nodes = IndexMap::new();
        nodes.insert("start".into(), start_node);

        let flow = FlowDoc {
            id: "welcome".into(),
            title: None,
            description: None,
            flow_type: "messaging".into(),
            start: Some("start".into()),
            parameters: json!({}),
            tags: Vec::new(),
            schema_version: None,
            entrypoints: IndexMap::new(),
            nodes,
        };

        let normalized = normalize_flow_doc(flow);
        let node = normalized.nodes.get("start").expect("node exists");

        // The raw entry is consumed and replaced by an explicit operation.
        assert_eq!(node.operation.as_deref(), Some("component.exec"));
        assert!(node.raw.is_empty());

        let payload = node.payload.as_object().expect("payload object");
        assert_eq!(
            payload.get("component").and_then(Value::as_str),
            Some("templating.handlebars")
        );
        assert_eq!(
            payload.get("operation").and_then(Value::as_str),
            Some("render")
        );
        assert_eq!(
            payload.get("input"),
            Some(&json!({ "template": "Hi {{name}}" }))
        );
    }
}
3739
/// Summary metadata describing a pack: its identity, version, entry flows and
/// declared secret requirements.
///
/// `entry_flows` and `secret_requirements` default to empty collections when
/// they are absent from deserialized input.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct PackMetadata {
    /// Identifier of the pack.
    pub pack_id: String,
    /// Version of the pack, kept as a string.
    pub version: String,
    /// Ids of the flows recorded as entry flows for this pack.
    #[serde(default)]
    pub entry_flows: Vec<String>,
    /// Secret requirements declared by the pack.
    #[serde(default)]
    pub secret_requirements: Vec<greentic_types::SecretRequirement>,
}
3749
3750impl PackMetadata {
3751    fn from_wasm(bytes: &[u8]) -> Option<Self> {
3752        let parser = Parser::new(0);
3753        for payload in parser.parse_all(bytes) {
3754            let payload = payload.ok()?;
3755            match payload {
3756                Payload::CustomSection(section) => {
3757                    if section.name() == "greentic.manifest"
3758                        && let Ok(meta) = Self::from_bytes(section.data())
3759                    {
3760                        return Some(meta);
3761                    }
3762                }
3763                Payload::DataSection(reader) => {
3764                    for segment in reader.into_iter().flatten() {
3765                        if let Ok(meta) = Self::from_bytes(segment.data) {
3766                            return Some(meta);
3767                        }
3768                    }
3769                }
3770                _ => {}
3771            }
3772        }
3773        None
3774    }
3775
3776    fn from_bytes(bytes: &[u8]) -> Result<Self, serde_cbor::Error> {
3777        #[derive(Deserialize)]
3778        struct RawManifest {
3779            pack_id: String,
3780            version: String,
3781            #[serde(default)]
3782            entry_flows: Vec<String>,
3783            #[serde(default)]
3784            flows: Vec<RawFlow>,
3785            #[serde(default)]
3786            secret_requirements: Vec<greentic_types::SecretRequirement>,
3787        }
3788
3789        #[derive(Deserialize)]
3790        struct RawFlow {
3791            id: String,
3792        }
3793
3794        let manifest: RawManifest = serde_cbor::from_slice(bytes)?;
3795        let mut entry_flows = if manifest.entry_flows.is_empty() {
3796            manifest.flows.iter().map(|f| f.id.clone()).collect()
3797        } else {
3798            manifest.entry_flows.clone()
3799        };
3800        entry_flows.retain(|id| !id.is_empty());
3801        Ok(Self {
3802            pack_id: manifest.pack_id,
3803            version: manifest.version,
3804            entry_flows,
3805            secret_requirements: manifest.secret_requirements,
3806        })
3807    }
3808
3809    pub fn fallback(path: &Path) -> Self {
3810        let pack_id = path
3811            .file_stem()
3812            .map(|s| s.to_string_lossy().into_owned())
3813            .unwrap_or_else(|| "unknown-pack".to_string());
3814        Self {
3815            pack_id,
3816            version: "0.0.0".to_string(),
3817            entry_flows: Vec::new(),
3818            secret_requirements: Vec::new(),
3819        }
3820    }
3821
3822    pub fn from_manifest(manifest: &greentic_types::PackManifest) -> Self {
3823        let entry_flows = manifest
3824            .flows
3825            .iter()
3826            .map(|flow| flow.id.as_str().to_string())
3827            .collect::<Vec<_>>();
3828        Self {
3829            pack_id: manifest.pack_id.as_str().to_string(),
3830            version: manifest.version.to_string(),
3831            entry_flows,
3832            secret_requirements: manifest.secret_requirements.clone(),
3833        }
3834    }
3835}