Skip to main content

greentic_runner_desktop/
lib.rs

1use anyhow::{Context, Result, anyhow};
2use greentic_pack::reader::open_pack;
3use greentic_runner_host::RunnerWasiPolicy;
4use greentic_runner_host::config::{
5    FlowRetryConfig, HostConfig, OperatorPolicy, RateLimits, SecretsPolicy, StateStorePolicy,
6    WebhookPolicy,
7};
8use greentic_runner_host::pack::{ComponentResolution, FlowDescriptor, PackMetadata, PackRuntime};
9use greentic_runner_host::runner::engine::{ExecutionObserver, FlowContext, FlowEngine, NodeEvent};
10pub use greentic_runner_host::runner::mocks::{
11    HttpMock, HttpMockMode, KvMock, MocksConfig, SecretsMock, TelemetryMock, TimeMock, ToolsMock,
12};
13use greentic_runner_host::runner::mocks::{MockEventSink, MockLayer};
14use greentic_runner_host::secrets::{DynSecretsManager, default_manager};
15use greentic_runner_host::storage::{new_session_store, new_state_store};
16use greentic_runner_host::trace::TraceConfig;
17use greentic_runner_host::validate::ValidationConfig;
18use parking_lot::Mutex;
19use runner_core::normalize_under_root;
20use serde::{Deserialize, Serialize};
21use serde_json::{Map as JsonMap, Value, json};
22use std::collections::{BTreeMap, HashMap};
23use std::fmt;
24use std::fs::{self, File};
25use std::io::{BufWriter, Write};
26use std::path::{Path, PathBuf};
27use std::sync::Arc;
28use std::time::Instant;
29use time::OffsetDateTime;
30use time::format_description::well_known::Rfc3339;
31use tokio::runtime::Runtime;
32use tracing::{info, warn};
33use uuid::Uuid;
34
35const PROVIDER_ID_DEV: &str = "greentic-dev";
36
37/// Hook invoked for each transcript event.
38pub type TranscriptHook = Arc<dyn Fn(&Value) + Send + Sync>;
39
40/// Configuration for emitting OTLP telemetry.
41#[derive(Clone, Debug, Serialize, Deserialize, Default)]
42pub struct OtlpHook {
43    pub endpoint: String,
44    #[serde(default)]
45    pub headers: Vec<(String, String)>,
46    #[serde(default)]
47    pub sample_all: bool,
48}
49
50/// Execution profile.
51#[derive(Clone, Debug)]
52#[non_exhaustive]
53pub enum Profile {
54    Dev(DevProfile),
55}
56
57impl Default for Profile {
58    fn default() -> Self {
59        Self::Dev(DevProfile::default())
60    }
61}
62
63/// Tunables for the developer profile.
64#[derive(Clone, Debug)]
65pub struct DevProfile {
66    pub tenant_id: String,
67    pub team_id: String,
68    pub user_id: String,
69    pub max_node_wall_time_ms: u64,
70    pub max_run_wall_time_ms: u64,
71}
72
73impl Default for DevProfile {
74    fn default() -> Self {
75        Self {
76            tenant_id: "local-dev".to_string(),
77            team_id: "default".to_string(),
78            user_id: "developer".to_string(),
79            max_node_wall_time_ms: 30_000,
80            max_run_wall_time_ms: 600_000,
81        }
82    }
83}
84
85/// Tenant context overrides supplied by callers.
86#[derive(Clone, Debug, Default)]
87pub struct TenantContext {
88    pub tenant_id: Option<String>,
89    pub team_id: Option<String>,
90    pub user_id: Option<String>,
91    pub session_id: Option<String>,
92}
93
94impl TenantContext {
95    pub fn default_local() -> Self {
96        Self {
97            tenant_id: Some("local-dev".into()),
98            team_id: Some("default".into()),
99            user_id: Some("developer".into()),
100            session_id: None,
101        }
102    }
103}
104
105/// User supplied options for a run.
106#[derive(Clone)]
107pub struct RunOptions {
108    pub profile: Profile,
109    pub entry_flow: Option<String>,
110    pub input: Value,
111    pub ctx: TenantContext,
112    pub transcript: Option<TranscriptHook>,
113    pub otlp: Option<OtlpHook>,
114    pub mocks: MocksConfig,
115    pub artifacts_dir: Option<PathBuf>,
116    pub signing: SigningPolicy,
117    pub components_dir: Option<PathBuf>,
118    pub components_map: HashMap<String, PathBuf>,
119    pub dist_offline: bool,
120    pub dist_cache_dir: Option<PathBuf>,
121    pub allow_missing_hash: bool,
122    /// Optional secrets manager override used by embedded runtimes that need
123    /// to share a caller-owned secrets backend with the runner.
124    pub secrets_manager: Option<DynSecretsManager>,
125    /// Optional cross-pack resolver for `provider.invoke` nodes that reference
126    /// providers in other packs (e.g., OAuth broker from a capability pack).
127    pub cross_pack_resolver:
128        Option<Arc<dyn greentic_runner_host::runner::engine::CrossPackResolver>>,
129    /// Directory where suspended `FlowSnapshot`s are persisted between
130    /// activities so a conversational flow can resume at the node the
131    /// previous run paused at (instead of restarting from the entry).
132    ///
133    /// Snapshots are keyed by `ctx.session_id`, so the caller must populate
134    /// `ctx.session_id` with a stable per-conversation identifier (e.g.
135    /// `{tenant}:{flow}:{conversation_id}`) for resume to engage. When
136    /// either this directory or `session_id` is missing, the runner
137    /// behaves exactly as before: a `FlowExecution::waiting` result is
138    /// surfaced as an `Err` and the snapshot is dropped.
139    pub session_state_dir: Option<PathBuf>,
140}
141
142impl Default for RunOptions {
143    fn default() -> Self {
144        desktop_defaults()
145    }
146}
147
148impl fmt::Debug for RunOptions {
149    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
150        f.debug_struct("RunOptions")
151            .field("profile", &self.profile)
152            .field("entry_flow", &self.entry_flow)
153            .field("input", &self.input)
154            .field("ctx", &self.ctx)
155            .field("transcript", &self.transcript.is_some())
156            .field("otlp", &self.otlp)
157            .field("mocks", &self.mocks)
158            .field("artifacts_dir", &self.artifacts_dir)
159            .field("signing", &self.signing)
160            .field("components_dir", &self.components_dir)
161            .field("components_map_len", &self.components_map.len())
162            .field("dist_offline", &self.dist_offline)
163            .field("dist_cache_dir", &self.dist_cache_dir)
164            .field("allow_missing_hash", &self.allow_missing_hash)
165            .field("secrets_manager", &self.secrets_manager.is_some())
166            .finish()
167    }
168}
169
170#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
171pub enum SigningPolicy {
172    Strict,
173    DevOk,
174}
175
176/// Convenience builder that retains a baseline `RunOptions`.
177#[derive(Clone, Debug)]
178pub struct Runner {
179    base: RunOptions,
180}
181
182impl Runner {
183    pub fn new() -> Self {
184        Self {
185            base: desktop_defaults(),
186        }
187    }
188
189    pub fn profile(mut self, profile: Profile) -> Self {
190        self.base.profile = profile;
191        self
192    }
193
194    pub fn with_mocks(mut self, mocks: MocksConfig) -> Self {
195        self.base.mocks = mocks;
196        self
197    }
198
199    pub fn configure(mut self, f: impl FnOnce(&mut RunOptions)) -> Self {
200        f(&mut self.base);
201        self
202    }
203
204    pub fn run_pack<P: AsRef<Path>>(&self, pack_path: P) -> Result<RunResult> {
205        run_pack_with_options(pack_path, self.base.clone())
206    }
207
208    pub fn run_pack_with<P: AsRef<Path>>(
209        &self,
210        pack_path: P,
211        f: impl FnOnce(&mut RunOptions),
212    ) -> Result<RunResult> {
213        let mut opts = self.base.clone();
214        f(&mut opts);
215        run_pack_with_options(pack_path, opts)
216    }
217
218    pub async fn run_pack_async<P: AsRef<Path>>(&self, pack_path: P) -> Result<RunResult> {
219        run_pack_with_options_async(pack_path, self.base.clone()).await
220    }
221
222    pub async fn run_pack_with_async<P: AsRef<Path>>(
223        &self,
224        pack_path: P,
225        f: impl FnOnce(&mut RunOptions),
226    ) -> Result<RunResult> {
227        let mut opts = self.base.clone();
228        f(&mut opts);
229        run_pack_with_options_async(pack_path, opts).await
230    }
231}
232
233impl Default for Runner {
234    fn default() -> Self {
235        Self::new()
236    }
237}
238
239/// Execute a pack with the provided options.
240pub fn run_pack_with_options<P: AsRef<Path>>(pack_path: P, opts: RunOptions) -> Result<RunResult> {
241    let runtime = Runtime::new().context("failed to create tokio runtime")?;
242    runtime.block_on(run_pack_async(pack_path.as_ref(), opts))
243}
244
245/// Execute a pack with the provided options using the caller's async runtime.
246pub async fn run_pack_with_options_async<P: AsRef<Path>>(
247    pack_path: P,
248    opts: RunOptions,
249) -> Result<RunResult> {
250    run_pack_async(pack_path.as_ref(), opts).await
251}
252
253/// Reasonable defaults for local desktop invocations.
254pub fn desktop_defaults() -> RunOptions {
255    let otlp = std::env::var("OTLP_ENDPOINT")
256        .ok()
257        .map(|endpoint| OtlpHook {
258            endpoint,
259            headers: Vec::new(),
260            sample_all: true,
261        });
262    RunOptions {
263        profile: Profile::Dev(DevProfile::default()),
264        entry_flow: None,
265        input: json!({}),
266        ctx: TenantContext::default_local(),
267        transcript: None,
268        otlp,
269        mocks: MocksConfig::default(),
270        artifacts_dir: None,
271        signing: SigningPolicy::DevOk,
272        components_dir: None,
273        components_map: HashMap::new(),
274        dist_offline: false,
275        dist_cache_dir: None,
276        allow_missing_hash: false,
277        secrets_manager: None,
278        cross_pack_resolver: None,
279        session_state_dir: None,
280    }
281}
282
283fn resolve_secrets_manager(opts: &RunOptions) -> Result<DynSecretsManager> {
284    if let Some(manager) = opts.secrets_manager.clone() {
285        Ok(manager)
286    } else {
287        default_manager().context("failed to initialise secrets backend")
288    }
289}
290
291async fn run_pack_async(pack_path: &Path, opts: RunOptions) -> Result<RunResult> {
292    let pack_path = normalize_pack_path(pack_path)?;
293    let resolved_profile = resolve_profile(&opts.profile, &opts.ctx);
294    if let Some(otlp) = &opts.otlp {
295        apply_otlp_hook(otlp);
296    }
297
298    let directories = prepare_run_dirs(opts.artifacts_dir.clone())?;
299    info!(run_dir = %directories.root.display(), "prepared desktop run directory");
300
301    let mock_layer = Arc::new(MockLayer::new(opts.mocks.clone(), &directories.root)?);
302
303    let recorder = Arc::new(RunRecorder::new(
304        directories.clone(),
305        &resolved_profile,
306        None,
307        PackMetadata::fallback(&pack_path),
308        opts.transcript.clone(),
309    )?);
310
311    let mock_sink: Arc<dyn MockEventSink> = recorder.clone();
312    mock_layer.register_sink(mock_sink);
313
314    // Collect only the `host.http.client` booleans from the component manifest
315    // rather than cloning full `ComponentManifest` values — we just need to know
316    // whether any component opted in.
317    let mut pack_http_flags: Vec<bool> = Vec::new();
318    if pack_path.is_file() {
319        match open_pack(&pack_path, to_reader_policy(opts.signing)) {
320            Ok(load) => {
321                let meta = &load.manifest.meta;
322                recorder.update_pack_metadata(PackMetadata {
323                    pack_id: meta.pack_id.clone(),
324                    version: meta.version.to_string(),
325                    entry_flows: meta.entry_flows.clone(),
326                    secret_requirements: Vec::new(),
327                });
328                if let Some(manifest) = load.gpack_manifest.as_ref() {
329                    pack_http_flags = manifest
330                        .components
331                        .iter()
332                        .map(|component| {
333                            component
334                                .capabilities
335                                .host
336                                .http
337                                .as_ref()
338                                .is_some_and(|http| http.client)
339                        })
340                        .collect();
341                }
342            }
343            Err(err) => {
344                recorder.record_verify_event("error", &err.message)?;
345                if opts.signing == SigningPolicy::DevOk && is_signature_error(&err.message) {
346                    warn!(error = %err.message, "continuing despite signature error (dev policy)");
347                } else {
348                    return Err(anyhow!("pack verification failed: {}", err.message));
349                }
350            }
351        }
352    } else {
353        tracing::debug!(
354            path = %pack_path.display(),
355            "skipping pack verification for directory input"
356        );
357    }
358
359    let kill_switch = desktop_http_kill_switch_active();
360    let http_enabled = derive_http_enabled(&pack_http_flags, kill_switch);
361    let http_component_count = pack_http_flags.iter().filter(|flag| **flag).count();
362    if http_enabled {
363        info!(
364            total_component_count = pack_http_flags.len(),
365            http_component_count,
366            "enabling HTTP client gate based on component manifest capabilities"
367        );
368    } else if kill_switch {
369        info!(
370            env_var = DESKTOP_HTTP_DISABLE_ENV,
371            "HTTP client gate forced off by desktop kill-switch"
372        );
373    } else {
374        tracing::debug!(
375            total_component_count = pack_http_flags.len(),
376            "HTTP client gate disabled: no component manifest declares host.http.client"
377        );
378    }
379    let host_config = Arc::new(build_host_config(
380        &resolved_profile,
381        &directories,
382        http_enabled,
383    ));
384    let mut component_resolution = ComponentResolution::default();
385    if let Some(dir) = opts.components_dir.clone() {
386        component_resolution.materialized_root = Some(dir);
387    } else if pack_path.is_dir() {
388        component_resolution.materialized_root = Some(pack_path.clone());
389    }
390    component_resolution.overrides = opts.components_map.clone();
391    component_resolution.dist_offline = opts.dist_offline;
392    component_resolution.dist_cache_dir = opts.dist_cache_dir.clone();
393    component_resolution.allow_missing_hash = opts.allow_missing_hash;
394    let archive_source = if pack_path
395        .extension()
396        .and_then(|ext| ext.to_str())
397        .map(|ext| ext.eq_ignore_ascii_case("gtpack"))
398        .unwrap_or(false)
399    {
400        Some(&pack_path)
401    } else {
402        None
403    };
404
405    let session_store = new_session_store();
406    let state_store = new_state_store();
407    let secrets_manager = resolve_secrets_manager(&opts)?;
408    let pack = Arc::new(
409        PackRuntime::load(
410            &pack_path,
411            Arc::clone(&host_config),
412            Some(Arc::clone(&mock_layer)),
413            archive_source.map(|p| p as &Path),
414            Some(Arc::clone(&session_store)),
415            Some(Arc::clone(&state_store)),
416            Arc::new(RunnerWasiPolicy::default()),
417            secrets_manager,
418            host_config.oauth_broker_config(),
419            false,
420            component_resolution,
421        )
422        .await
423        .with_context(|| format!("failed to load pack {}", pack_path.display()))?,
424    );
425    recorder.update_pack_metadata(pack.metadata().clone());
426
427    let flows = pack
428        .list_flows()
429        .await
430        .context("failed to enumerate flows")?;
431    let entry_flow_id = resolve_entry_flow(opts.entry_flow.clone(), pack.metadata(), &flows)?;
432    recorder.set_flow_id(&entry_flow_id);
433
434    let mut engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&host_config))
435        .await
436        .context("failed to prime flow engine")?;
437    if let Some(resolver) = opts.cross_pack_resolver.clone() {
438        engine.set_cross_pack_resolver(resolver);
439    }
440
441    let started_at = OffsetDateTime::now_utc();
442    let tenant_str = host_config.tenant.clone();
443    let session_id_owned = resolved_profile.session_id.clone();
444    let provider_id_owned = resolved_profile.provider_id.clone();
445    let recorder_ref: &RunRecorder = &recorder;
446    let mock_ref: &MockLayer = &mock_layer;
447    let ctx = FlowContext {
448        tenant: &tenant_str,
449        pack_id: pack.metadata().pack_id.as_str(),
450        flow_id: &entry_flow_id,
451        node_id: None,
452        tool: None,
453        action: Some("run_pack"),
454        session_id: Some(session_id_owned.as_str()),
455        provider_id: Some(provider_id_owned.as_str()),
456        retry_config: host_config.retry_config().into(),
457        attempt: 1,
458        observer: Some(recorder_ref),
459        mocks: Some(mock_ref),
460    };
461
462    // Resume support: if the caller wired up a session_state_dir AND ctx
463    // carries a session_id, load any persisted snapshot for that session
464    // and resume the engine from there. Without either signal we fall
465    // back to a fresh execution at the flow entry.
466    let session_snapshot_path = session_snapshot_file(
467        opts.session_state_dir.as_deref(),
468        resolved_profile.session_id.as_str(),
469    );
470    let resume_snapshot = session_snapshot_path
471        .as_deref()
472        .and_then(load_session_snapshot);
473
474    let execution = if let Some(snapshot) = resume_snapshot {
475        engine.resume(ctx, snapshot, opts.input.clone()).await
476    } else {
477        engine.execute(ctx, opts.input.clone()).await
478    };
479    let finished_at = OffsetDateTime::now_utc();
480
481    let status = match execution {
482        Ok(result) => match result.status {
483            greentic_runner_host::runner::engine::FlowStatus::Completed => {
484                // Flow ran to a real terminator — discard any stale resume
485                // snapshot so the next activity starts fresh at the entry.
486                if let Some(path) = session_snapshot_path.as_deref() {
487                    let _ = std::fs::remove_file(path);
488                }
489                RunCompletion::Ok
490            }
491            greentic_runner_host::runner::engine::FlowStatus::Waiting(wait) => {
492                let reason = wait
493                    .reason
494                    .clone()
495                    .unwrap_or_else(|| "flow paused unexpectedly".to_string());
496                if let Some(path) = session_snapshot_path.as_deref() {
497                    if let Err(err) = save_session_snapshot(path, &wait.snapshot) {
498                        // Persisting the snapshot is best-effort — losing
499                        // it just means the next activity restarts at the
500                        // entry, which is the pre-fix behaviour. Surface
501                        // the failure to the operator log via the run
502                        // result error and continue.
503                        RunCompletion::Err(anyhow::anyhow!(
504                            "{reason} (and failed to persist resume snapshot to {}: {err})",
505                            path.display()
506                        ))
507                    } else {
508                        // Successfully paused. Surface the run as Ok so
509                        // greentic-start can emit the rendered card and
510                        // wait for the user's next submission.
511                        RunCompletion::Ok
512                    }
513                } else {
514                    // No persistence configured — preserve the legacy
515                    // behaviour (treat pause as an error so the caller at
516                    // least sees that the run did not complete).
517                    RunCompletion::Err(anyhow::anyhow!(reason))
518                }
519            }
520        },
521        Err(err) => RunCompletion::Err(err),
522    };
523
524    let result = recorder.finalise(status, started_at, finished_at)?;
525
526    let run_json_path = directories.root.join("run.json");
527    fs::write(&run_json_path, serde_json::to_vec_pretty(&result)?)
528        .with_context(|| format!("failed to write run summary {}", run_json_path.display()))?;
529
530    Ok(result)
531}
532
533/// Resolve the snapshot file path for a given session id.
534///
535/// Returns `None` if the caller didn't wire up `session_state_dir` or the
536/// session_id is empty/non-resumable — both cases mean "no persistence,
537/// run with fresh state".
538fn session_snapshot_file(session_state_dir: Option<&Path>, session_id: &str) -> Option<PathBuf> {
539    let dir = session_state_dir?;
540    if session_id.is_empty() {
541        return None;
542    }
543    let safe_id: String = session_id
544        .chars()
545        .map(|c| {
546            if c.is_ascii_alphanumeric() || c == '-' || c == '_' || c == '.' || c == ':' {
547                c
548            } else {
549                '_'
550            }
551        })
552        .collect();
553    Some(dir.join(format!("{safe_id}.snapshot.json")))
554}
555
556/// Load a persisted `FlowSnapshot` from disk if one exists. Returns `None`
557/// when the file is missing or unreadable — the caller falls back to a
558/// fresh execution in that case (which is also the pre-resume behaviour).
559fn load_session_snapshot(
560    path: &Path,
561) -> Option<greentic_runner_host::runner::engine::FlowSnapshot> {
562    let bytes = fs::read(path).ok()?;
563    serde_json::from_slice(&bytes).ok()
564}
565
566/// Persist a `FlowSnapshot` to disk for later resume. Errors propagate to
567/// the caller so the run can surface "we paused but lost the snapshot" as
568/// a hard error rather than silently looping back to the flow entry.
569fn save_session_snapshot(
570    path: &Path,
571    snapshot: &greentic_runner_host::runner::engine::FlowSnapshot,
572) -> Result<()> {
573    if let Some(parent) = path.parent() {
574        fs::create_dir_all(parent)
575            .with_context(|| format!("create session snapshot dir {}", parent.display()))?;
576    }
577    let bytes = serde_json::to_vec_pretty(snapshot)
578        .with_context(|| "serialize session snapshot for persistence")?;
579    fs::write(path, bytes).with_context(|| format!("write session snapshot {}", path.display()))?;
580    Ok(())
581}
582
583fn apply_otlp_hook(hook: &OtlpHook) {
584    info!(
585        endpoint = %hook.endpoint,
586        sample_all = hook.sample_all,
587        headers = %hook.headers.len(),
588        "OTLP hook requested (set OTEL_* env vars before invoking run_pack)"
589    );
590}
591
592fn normalize_pack_path(path: &Path) -> Result<PathBuf> {
593    if path.is_absolute() {
594        let parent = path
595            .parent()
596            .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
597        let root = parent
598            .canonicalize()
599            .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
600        let file = path
601            .file_name()
602            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
603        return normalize_under_root(&root, Path::new(file));
604    }
605
606    let cwd = std::env::current_dir().context("failed to resolve current directory")?;
607    let base = if let Some(parent) = path.parent() {
608        cwd.join(parent)
609    } else {
610        cwd
611    };
612    let root = base
613        .canonicalize()
614        .with_context(|| format!("failed to canonicalize {}", base.display()))?;
615    let file = path
616        .file_name()
617        .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
618    normalize_under_root(&root, Path::new(file))
619}
620
621fn prepare_run_dirs(root_override: Option<PathBuf>) -> Result<RunDirectories> {
622    let root = if let Some(dir) = root_override {
623        dir
624    } else {
625        let timestamp = OffsetDateTime::now_utc()
626            .format(&Rfc3339)
627            .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
628            .replace(':', "-");
629        let short_id = Uuid::new_v4()
630            .to_string()
631            .chars()
632            .take(6)
633            .collect::<String>();
634        PathBuf::from(".greentic")
635            .join("runs")
636            .join(format!("{timestamp}_{short_id}"))
637    };
638
639    fs::create_dir_all(&root).with_context(|| format!("failed to create {}", root.display()))?;
640    let logs = root.join("logs");
641    let resolved = root.join("resolved_config");
642    fs::create_dir_all(&logs).with_context(|| format!("failed to create {}", logs.display()))?;
643    fs::create_dir_all(&resolved)
644        .with_context(|| format!("failed to create {}", resolved.display()))?;
645
646    Ok(RunDirectories {
647        root,
648        logs,
649        resolved,
650    })
651}
652
653fn resolve_entry_flow(
654    override_id: Option<String>,
655    metadata: &PackMetadata,
656    flows: &[FlowDescriptor],
657) -> Result<String> {
658    if let Some(flow) = override_id {
659        return Ok(flow);
660    }
661    if let Some(first) = metadata.entry_flows.first() {
662        return Ok(first.clone());
663    }
664    flows
665        .first()
666        .map(|f| f.id.clone())
667        .ok_or_else(|| anyhow!("pack does not declare any flows"))
668}
669
670fn is_signature_error(message: &str) -> bool {
671    message.to_ascii_lowercase().contains("signature")
672}
673
674fn to_reader_policy(policy: SigningPolicy) -> greentic_pack::reader::SigningPolicy {
675    match policy {
676        SigningPolicy::Strict => greentic_pack::reader::SigningPolicy::Strict,
677        SigningPolicy::DevOk => greentic_pack::reader::SigningPolicy::DevOk,
678    }
679}
680
681fn resolve_profile(profile: &Profile, ctx: &TenantContext) -> ResolvedProfile {
682    match profile {
683        Profile::Dev(dev) => ResolvedProfile {
684            tenant_id: ctx
685                .tenant_id
686                .clone()
687                .unwrap_or_else(|| dev.tenant_id.clone()),
688            team_id: ctx.team_id.clone().unwrap_or_else(|| dev.team_id.clone()),
689            user_id: ctx.user_id.clone().unwrap_or_else(|| dev.user_id.clone()),
690            session_id: ctx
691                .session_id
692                .clone()
693                .unwrap_or_else(|| Uuid::new_v4().to_string()),
694            provider_id: PROVIDER_ID_DEV.to_string(),
695            max_node_wall_time_ms: dev.max_node_wall_time_ms,
696            max_run_wall_time_ms: dev.max_run_wall_time_ms,
697        },
698    }
699}
700
701/// Environment variable that forces the HTTP client gate off regardless of
702/// what component manifests declare. Operators set this to `1` / `true` to
703/// disable outbound HTTP as a kill-switch when running the desktop profile.
704const DESKTOP_HTTP_DISABLE_ENV: &str = "GREENTIC_DESKTOP_HTTP_DISABLE";
705
706/// Derive whether the desktop profile must enable the HTTP client gate.
707///
708/// Returns `true` when at least one component flag is `true` and the
709/// kill-switch is not active. Packs opt in to HTTP by declaring
710/// `capabilities.host.http.client: true`; operators can force the gate off
711/// via [`DESKTOP_HTTP_DISABLE_ENV`] regardless of what the manifest says.
712fn derive_http_enabled(component_http_flags: &[bool], kill_switch: bool) -> bool {
713    !kill_switch && component_http_flags.iter().any(|flag| *flag)
714}
715
716/// Parse the raw value of [`DESKTOP_HTTP_DISABLE_ENV`] into a kill-switch bool.
717///
718/// Truthy values (`1`, `true`, `yes`, `on`, case- and whitespace-insensitive)
719/// activate the kill-switch. `None` or any other value leaves it inactive so
720/// the gate remains controlled by the component manifest.
721fn parse_kill_switch(raw: Option<&str>) -> bool {
722    matches!(
723        raw.map(|value| value.trim().to_ascii_lowercase())
724            .as_deref(),
725        Some("1" | "true" | "yes" | "on")
726    )
727}
728
729/// Read [`DESKTOP_HTTP_DISABLE_ENV`] from the process environment and
730/// interpret it via [`parse_kill_switch`].
731fn desktop_http_kill_switch_active() -> bool {
732    parse_kill_switch(std::env::var(DESKTOP_HTTP_DISABLE_ENV).ok().as_deref())
733}
734
735fn build_host_config(
736    profile: &ResolvedProfile,
737    dirs: &RunDirectories,
738    http_enabled: bool,
739) -> HostConfig {
740    HostConfig {
741        tenant: profile.tenant_id.clone(),
742        bindings_path: dirs.resolved.join("dev.bindings.yaml"),
743        flow_type_bindings: HashMap::new(),
744        rate_limits: RateLimits::default(),
745        retry: FlowRetryConfig::default(),
746        http_enabled,
747        secrets_policy: SecretsPolicy::allow_all(),
748        state_store_policy: StateStorePolicy::default(),
749        webhook_policy: WebhookPolicy::default(),
750        timers: Vec::new(),
751        oauth: None,
752        mocks: None,
753        pack_bindings: Vec::new(),
754        env_passthrough: Vec::new(),
755        trace: TraceConfig::from_env(),
756        validation: ValidationConfig::from_env(),
757        operator_policy: OperatorPolicy::allow_all(),
758        fast2flow: Default::default(),
759    }
760}
761
762#[allow(dead_code)]
763#[derive(Clone, Debug)]
764struct ResolvedProfile {
765    tenant_id: String,
766    team_id: String,
767    user_id: String,
768    session_id: String,
769    provider_id: String,
770    max_node_wall_time_ms: u64,
771    max_run_wall_time_ms: u64,
772}
773
774#[derive(Clone, Debug, Serialize, Deserialize)]
775pub struct RunResult {
776    pub session_id: String,
777    pub pack_id: String,
778    pub pack_version: String,
779    pub flow_id: String,
780    pub started_at_utc: String,
781    pub finished_at_utc: String,
782    pub status: RunStatus,
783    pub error: Option<String>,
784    pub node_summaries: Vec<NodeSummary>,
785    pub failures: BTreeMap<String, NodeFailure>,
786    pub artifacts_dir: PathBuf,
787}
788
789#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
790pub enum RunStatus {
791    Success,
792    PartialFailure,
793    Failure,
794}
795
796#[derive(Clone, Debug, Serialize, Deserialize)]
797pub struct NodeSummary {
798    pub node_id: String,
799    pub component: String,
800    pub status: NodeStatus,
801    pub duration_ms: u64,
802}
803
804#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
805pub enum NodeStatus {
806    Ok,
807    Skipped,
808    Error,
809}
810
811#[derive(Clone, Debug, Serialize, Deserialize)]
812pub struct NodeFailure {
813    pub code: String,
814    pub message: String,
815    pub details: Value,
816    pub transcript_offsets: (u64, u64),
817    pub log_paths: Vec<PathBuf>,
818}
819
820#[derive(Clone)]
821struct RunDirectories {
822    root: PathBuf,
823    logs: PathBuf,
824    resolved: PathBuf,
825}
826
827enum RunCompletion {
828    Ok,
829    Err(anyhow::Error),
830}
831
832struct RunRecorder {
833    directories: RunDirectories,
834    profile: ResolvedProfile,
835    flow_id: Mutex<String>,
836    pack_meta: Mutex<PackMetadata>,
837    transcript: Mutex<TranscriptWriter>,
838    state: Mutex<RunRecorderState>,
839}
840
841impl RunRecorder {
842    fn new(
843        dirs: RunDirectories,
844        profile: &ResolvedProfile,
845        flow_id: Option<String>,
846        pack_meta: PackMetadata,
847        hook: Option<TranscriptHook>,
848    ) -> Result<Self> {
849        let transcript_path = dirs.root.join("transcript.jsonl");
850        let file = File::create(&transcript_path)
851            .with_context(|| format!("failed to open {}", transcript_path.display()))?;
852        Ok(Self {
853            directories: dirs,
854            profile: profile.clone(),
855            flow_id: Mutex::new(flow_id.unwrap_or_else(|| "unknown".into())),
856            pack_meta: Mutex::new(pack_meta),
857            transcript: Mutex::new(TranscriptWriter::new(BufWriter::new(file), hook)),
858            state: Mutex::new(RunRecorderState::default()),
859        })
860    }
861
862    fn finalise(
863        &self,
864        completion: RunCompletion,
865        started_at: OffsetDateTime,
866        finished_at: OffsetDateTime,
867    ) -> Result<RunResult> {
868        let status = match &completion {
869            RunCompletion::Ok => RunStatus::Success,
870            RunCompletion::Err(_) => RunStatus::Failure,
871        };
872
873        let mut runtime_error = None;
874        if let RunCompletion::Err(err) = completion {
875            warn!(error = %err, "pack execution failed");
876            eprintln!("pack execution failed: {err}");
877            runtime_error = Some(err.to_string());
878        }
879
880        let started = started_at
881            .format(&Rfc3339)
882            .unwrap_or_else(|_| started_at.to_string());
883        let finished = finished_at
884            .format(&Rfc3339)
885            .unwrap_or_else(|_| finished_at.to_string());
886
887        let state = self.state.lock();
888        let mut summaries = Vec::new();
889        let mut failures = BTreeMap::new();
890        for node_id in &state.order {
891            if let Some(record) = state.nodes.get(node_id) {
892                let duration = record.duration_ms.unwrap_or(0);
893                summaries.push(NodeSummary {
894                    node_id: node_id.clone(),
895                    component: record.component.clone(),
896                    status: record.status.clone(),
897                    duration_ms: duration,
898                });
899                if record.status == NodeStatus::Error {
900                    let start_offset = record.transcript_start.unwrap_or(0);
901                    let err_offset = record.transcript_error.unwrap_or(start_offset);
902                    failures.insert(
903                        node_id.clone(),
904                        NodeFailure {
905                            code: record
906                                .failure_code
907                                .clone()
908                                .unwrap_or_else(|| "component-failed".into()),
909                            message: record
910                                .failure_message
911                                .clone()
912                                .unwrap_or_else(|| "node failed".into()),
913                            details: record
914                                .failure_details
915                                .clone()
916                                .unwrap_or_else(|| json!({ "node": node_id })),
917                            transcript_offsets: (start_offset, err_offset),
918                            log_paths: record.log_paths.clone(),
919                        },
920                    );
921                }
922            }
923        }
924
925        let mut final_status = status;
926        if final_status == RunStatus::Success && !failures.is_empty() {
927            final_status = RunStatus::PartialFailure;
928        }
929
930        if let Some(error) = &runtime_error {
931            failures.insert(
932                "_runtime".into(),
933                NodeFailure {
934                    code: "runtime-error".into(),
935                    message: error.clone(),
936                    details: json!({ "stage": "execute" }),
937                    transcript_offsets: (0, 0),
938                    log_paths: Vec::new(),
939                },
940            );
941        }
942
943        let pack_meta = self.pack_meta.lock();
944        let flow_id = self.flow_id.lock().clone();
945        Ok(RunResult {
946            session_id: self.profile.session_id.clone(),
947            pack_id: pack_meta.pack_id.clone(),
948            pack_version: pack_meta.version.clone(),
949            flow_id,
950            started_at_utc: started,
951            finished_at_utc: finished,
952            status: final_status,
953            error: runtime_error,
954            node_summaries: summaries,
955            failures,
956            artifacts_dir: self.directories.root.clone(),
957        })
958    }
959
960    fn set_flow_id(&self, flow_id: &str) {
961        *self.flow_id.lock() = flow_id.to_string();
962    }
963
964    fn update_pack_metadata(&self, meta: PackMetadata) {
965        *self.pack_meta.lock() = meta;
966    }
967
968    fn current_flow_id(&self) -> String {
969        self.flow_id.lock().clone()
970    }
971
972    fn record_verify_event(&self, status: &str, message: &str) -> Result<()> {
973        let timestamp = OffsetDateTime::now_utc();
974        let event = json!({
975            "ts": timestamp
976                .format(&Rfc3339)
977                .unwrap_or_else(|_| timestamp.to_string()),
978            "session_id": self.profile.session_id,
979            "flow_id": self.current_flow_id(),
980            "node_id": Value::Null,
981            "component": "verify.pack",
982            "phase": "verify",
983            "status": status,
984            "inputs": Value::Null,
985            "outputs": Value::Null,
986            "error": json!({ "message": message }),
987            "metrics": Value::Null,
988            "schema_id": Value::Null,
989            "defaults_applied": Value::Null,
990            "redactions": Value::Array(Vec::new()),
991        });
992        self.transcript.lock().write(&event).map(|_| ())
993    }
994
995    fn write_mock_event(&self, capability: &str, provider: &str, payload: Value) -> Result<()> {
996        let timestamp = OffsetDateTime::now_utc();
997        let event = json!({
998            "ts": timestamp
999                .format(&Rfc3339)
1000                .unwrap_or_else(|_| timestamp.to_string()),
1001            "session_id": self.profile.session_id,
1002            "flow_id": self.current_flow_id(),
1003            "node_id": Value::Null,
1004            "component": format!("mock::{capability}"),
1005            "phase": "mock",
1006            "status": "ok",
1007            "inputs": json!({ "capability": capability, "provider": provider }),
1008            "outputs": payload,
1009            "error": Value::Null,
1010            "metrics": Value::Null,
1011            "schema_id": Value::Null,
1012            "defaults_applied": Value::Null,
1013            "redactions": Value::Array(Vec::new()),
1014        });
1015        self.transcript.lock().write(&event).map(|_| ())
1016    }
1017
1018    fn handle_node_start(&self, event: &NodeEvent<'_>) -> Result<()> {
1019        let timestamp = OffsetDateTime::now_utc();
1020        let (redacted_payload, redactions) = redact_value(event.payload, "$.inputs.payload");
1021        let inputs = json!({
1022            "payload": redacted_payload,
1023            "context": {
1024                "tenant_id": self.profile.tenant_id.as_str(),
1025                "team_id": self.profile.team_id.as_str(),
1026                "user_id": self.profile.user_id.as_str(),
1027            }
1028        });
1029        let flow_id = self.current_flow_id();
1030        let event_json = build_transcript_event(TranscriptEventArgs {
1031            profile: &self.profile,
1032            flow_id: &flow_id,
1033            node_id: event.node_id,
1034            component: &event.node.component,
1035            phase: "start",
1036            status: "ok",
1037            timestamp,
1038            inputs,
1039            outputs: Value::Null,
1040            error: Value::Null,
1041            redactions,
1042        });
1043        let (start_offset, _) = self.transcript.lock().write(&event_json)?;
1044
1045        let mut state = self.state.lock();
1046        let node_key = event.node_id.to_string();
1047        if !state.order.iter().any(|id| id == &node_key) {
1048            state.order.push(node_key.clone());
1049        }
1050        let entry = state.nodes.entry(node_key).or_insert_with(|| {
1051            NodeExecutionRecord::new(event.node.component.clone(), &self.directories)
1052        });
1053        entry.start_instant = Some(Instant::now());
1054        entry.status = NodeStatus::Ok;
1055        entry.transcript_start = Some(start_offset);
1056        Ok(())
1057    }
1058
1059    fn handle_node_end(&self, event: &NodeEvent<'_>, output: &Value) -> Result<()> {
1060        let timestamp = OffsetDateTime::now_utc();
1061        let (redacted_output, redactions) = redact_value(output, "$.outputs");
1062        let flow_id = self.current_flow_id();
1063        let event_json = build_transcript_event(TranscriptEventArgs {
1064            profile: &self.profile,
1065            flow_id: &flow_id,
1066            node_id: event.node_id,
1067            component: &event.node.component,
1068            phase: "end",
1069            status: "ok",
1070            timestamp,
1071            inputs: Value::Null,
1072            outputs: redacted_output,
1073            error: Value::Null,
1074            redactions,
1075        });
1076        self.transcript.lock().write(&event_json)?;
1077
1078        let mut state = self.state.lock();
1079        if let Some(entry) = state.nodes.get_mut(event.node_id)
1080            && let Some(started) = entry.start_instant.take()
1081        {
1082            entry.duration_ms = Some(started.elapsed().as_millis() as u64);
1083        }
1084        Ok(())
1085    }
1086
1087    fn handle_node_error(
1088        &self,
1089        event: &NodeEvent<'_>,
1090        error: &dyn std::error::Error,
1091    ) -> Result<()> {
1092        let timestamp = OffsetDateTime::now_utc();
1093        let error_message = error.to_string();
1094        let error_json = json!({
1095            "code": "component-failed",
1096            "message": error_message,
1097            "details": {
1098                "node": event.node_id,
1099            }
1100        });
1101        let flow_id = self.current_flow_id();
1102        let event_json = build_transcript_event(TranscriptEventArgs {
1103            profile: &self.profile,
1104            flow_id: &flow_id,
1105            node_id: event.node_id,
1106            component: &event.node.component,
1107            phase: "error",
1108            status: "error",
1109            timestamp,
1110            inputs: Value::Null,
1111            outputs: Value::Null,
1112            error: error_json.clone(),
1113            redactions: Vec::new(),
1114        });
1115        let (_, end_offset) = self.transcript.lock().write(&event_json)?;
1116
1117        let mut state = self.state.lock();
1118        if let Some(entry) = state.nodes.get_mut(event.node_id) {
1119            entry.status = NodeStatus::Error;
1120            if let Some(started) = entry.start_instant.take() {
1121                entry.duration_ms = Some(started.elapsed().as_millis() as u64);
1122            }
1123            entry.transcript_error = Some(end_offset);
1124            entry.failure_code = Some("component-failed".to_string());
1125            entry.failure_message = Some(error_message.clone());
1126            entry.failure_details = Some(error_json);
1127            let log_path = self
1128                .directories
1129                .logs
1130                .join(format!("{}.stderr.log", sanitize_id(event.node_id)));
1131            if let Ok(mut file) = File::create(&log_path) {
1132                let _ = writeln!(file, "{error_message}");
1133            }
1134            entry.log_paths.push(log_path);
1135        }
1136        Ok(())
1137    }
1138}
1139
1140impl ExecutionObserver for RunRecorder {
1141    fn on_node_start(&self, event: &NodeEvent<'_>) {
1142        if let Err(err) = self.handle_node_start(event) {
1143            warn!(node = event.node_id, error = %err, "failed to record node start");
1144        }
1145    }
1146
1147    fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value) {
1148        if let Err(err) = self.handle_node_end(event, output) {
1149            warn!(node = event.node_id, error = %err, "failed to record node end");
1150        }
1151    }
1152
1153    fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn std::error::Error) {
1154        if let Err(err) = self.handle_node_error(event, error) {
1155            warn!(node = event.node_id, error = %err, "failed to record node error");
1156        }
1157    }
1158}
1159
1160impl MockEventSink for RunRecorder {
1161    fn on_mock_event(&self, capability: &str, provider: &str, payload: &Value) {
1162        if let Err(err) = self.write_mock_event(capability, provider, payload.clone()) {
1163            warn!(?capability, ?provider, error = %err, "failed to record mock event");
1164        }
1165    }
1166}
1167
1168#[derive(Default)]
1169struct RunRecorderState {
1170    nodes: BTreeMap<String, NodeExecutionRecord>,
1171    order: Vec<String>,
1172}
1173
1174#[derive(Clone)]
1175struct NodeExecutionRecord {
1176    component: String,
1177    status: NodeStatus,
1178    duration_ms: Option<u64>,
1179    transcript_start: Option<u64>,
1180    transcript_error: Option<u64>,
1181    log_paths: Vec<PathBuf>,
1182    failure_code: Option<String>,
1183    failure_message: Option<String>,
1184    failure_details: Option<Value>,
1185    start_instant: Option<Instant>,
1186}
1187
1188impl NodeExecutionRecord {
1189    fn new(component: String, _dirs: &RunDirectories) -> Self {
1190        Self {
1191            component,
1192            status: NodeStatus::Ok,
1193            duration_ms: None,
1194            transcript_start: None,
1195            transcript_error: None,
1196            log_paths: Vec::new(),
1197            failure_code: None,
1198            failure_message: None,
1199            failure_details: None,
1200            start_instant: None,
1201        }
1202    }
1203}
1204
1205struct TranscriptWriter {
1206    writer: BufWriter<File>,
1207    offset: u64,
1208    hook: Option<TranscriptHook>,
1209}
1210
1211impl TranscriptWriter {
1212    fn new(writer: BufWriter<File>, hook: Option<TranscriptHook>) -> Self {
1213        Self {
1214            writer,
1215            offset: 0,
1216            hook,
1217        }
1218    }
1219
1220    fn write(&mut self, value: &Value) -> Result<(u64, u64)> {
1221        let line = serde_json::to_vec(value)?;
1222        let start = self.offset;
1223        self.writer.write_all(&line)?;
1224        self.writer.write_all(b"\n")?;
1225        self.writer.flush()?;
1226        self.offset += line.len() as u64 + 1;
1227        if let Some(hook) = &self.hook {
1228            hook(value);
1229        }
1230        Ok((start, self.offset))
1231    }
1232}
1233
1234struct TranscriptEventArgs<'a> {
1235    profile: &'a ResolvedProfile,
1236    flow_id: &'a str,
1237    node_id: &'a str,
1238    component: &'a str,
1239    phase: &'a str,
1240    status: &'a str,
1241    timestamp: OffsetDateTime,
1242    inputs: Value,
1243    outputs: Value,
1244    error: Value,
1245    redactions: Vec<String>,
1246}
1247
1248fn build_transcript_event(args: TranscriptEventArgs<'_>) -> Value {
1249    let ts = args
1250        .timestamp
1251        .format(&Rfc3339)
1252        .unwrap_or_else(|_| args.timestamp.to_string());
1253    json!({
1254        "ts": ts,
1255        "session_id": args.profile.session_id.as_str(),
1256        "flow_id": args.flow_id,
1257        "node_id": args.node_id,
1258        "component": args.component,
1259        "phase": args.phase,
1260        "status": args.status,
1261        "inputs": args.inputs,
1262        "outputs": args.outputs,
1263        "error": args.error,
1264        "metrics": {
1265            "duration_ms": null,
1266            "cpu_time_ms": null,
1267            "mem_peak_bytes": null,
1268        },
1269        "schema_id": Value::Null,
1270        "defaults_applied": Value::Array(Vec::new()),
1271        "redactions": args.redactions,
1272    })
1273}
1274
1275fn redact_value(value: &Value, base: &str) -> (Value, Vec<String>) {
1276    let mut paths = Vec::new();
1277    let redacted = redact_recursive(value, base, &mut paths);
1278    (redacted, paths)
1279}
1280
1281fn redact_recursive(value: &Value, path: &str, acc: &mut Vec<String>) -> Value {
1282    match value {
1283        Value::Object(map) => {
1284            let mut new_map = JsonMap::new();
1285            for (key, val) in map {
1286                let child_path = format!("{path}.{key}");
1287                if is_sensitive_key(key) {
1288                    acc.push(child_path);
1289                    new_map.insert(key.clone(), Value::String("__REDACTED__".into()));
1290                } else {
1291                    new_map.insert(key.clone(), redact_recursive(val, &child_path, acc));
1292                }
1293            }
1294            Value::Object(new_map)
1295        }
1296        Value::Array(items) => {
1297            let mut new_items = Vec::new();
1298            for (idx, item) in items.iter().enumerate() {
1299                let child_path = format!("{path}[{idx}]");
1300                new_items.push(redact_recursive(item, &child_path, acc));
1301            }
1302            Value::Array(new_items)
1303        }
1304        other => other.clone(),
1305    }
1306}
1307
1308fn is_sensitive_key(key: &str) -> bool {
1309    let lower = key.to_ascii_lowercase();
1310    const MARKERS: [&str; 5] = ["secret", "token", "password", "authorization", "cookie"];
1311    MARKERS.iter().any(|marker| lower.contains(marker))
1312}
1313
1314fn sanitize_id(value: &str) -> String {
1315    value
1316        .chars()
1317        .map(|ch| if ch.is_ascii_alphanumeric() { ch } else { '-' })
1318        .collect()
1319}
1320
1321#[cfg(test)]
1322mod tests {
1323    use super::*;
1324    use std::sync::Arc;
1325
1326    use async_trait::async_trait;
1327    use greentic_runner_host::secrets::DynSecretsManager;
1328    use greentic_secrets_lib::{SecretError, SecretsManager};
1329    use serde_json::json;
1330    use tempfile::TempDir;
1331
1332    fn sample_metadata() -> PackMetadata {
1333        PackMetadata {
1334            pack_id: "pack.demo".into(),
1335            version: "1.0.0".into(),
1336            entry_flows: vec!["entry.flow".into()],
1337            secret_requirements: Vec::new(),
1338        }
1339    }
1340
1341    fn sample_profile() -> ResolvedProfile {
1342        ResolvedProfile {
1343            tenant_id: "tenant".into(),
1344            team_id: "team".into(),
1345            user_id: "user".into(),
1346            session_id: "session-1".into(),
1347            provider_id: PROVIDER_ID_DEV.into(),
1348            max_node_wall_time_ms: 1000,
1349            max_run_wall_time_ms: 2000,
1350        }
1351    }
1352
1353    struct NoopSecretsManager;
1354
1355    #[async_trait]
1356    impl SecretsManager for NoopSecretsManager {
1357        async fn read(&self, path: &str) -> Result<Vec<u8>, SecretError> {
1358            Err(SecretError::NotFound(path.to_string()))
1359        }
1360
1361        async fn write(&self, _path: &str, _bytes: &[u8]) -> Result<(), SecretError> {
1362            Ok(())
1363        }
1364
1365        async fn delete(&self, _path: &str) -> Result<(), SecretError> {
1366            Ok(())
1367        }
1368    }
1369
1370    #[test]
1371    fn resolve_entry_flow_prefers_override_then_metadata_then_flows() {
1372        let metadata = sample_metadata();
1373        let flows = vec![FlowDescriptor {
1374            id: "fallback.flow".into(),
1375            flow_type: "message".into(),
1376            pack_id: "pack.demo".into(),
1377            profile: "default".into(),
1378            version: "1.0.0".into(),
1379            description: None,
1380        }];
1381
1382        assert_eq!(
1383            resolve_entry_flow(Some("manual.flow".into()), &metadata, &flows).unwrap(),
1384            "manual.flow"
1385        );
1386        assert_eq!(
1387            resolve_entry_flow(None, &metadata, &flows).unwrap(),
1388            "entry.flow"
1389        );
1390        assert!(
1391            resolve_entry_flow(
1392                None,
1393                &PackMetadata {
1394                    entry_flows: Vec::new(),
1395                    ..metadata
1396                },
1397                &flows
1398            )
1399            .is_ok()
1400        );
1401    }
1402
1403    #[test]
1404    fn normalize_pack_path_accepts_relative_files_inside_cwd() {
1405        let temp = tempfile::tempdir_in(std::env::current_dir().expect("cwd")).expect("tempdir");
1406        let pack = temp.path().join("demo.gtpack");
1407        fs::write(&pack, b"pack").expect("pack file");
1408        let relative = pack
1409            .strip_prefix(std::env::current_dir().expect("cwd"))
1410            .expect("relative")
1411            .to_path_buf();
1412
1413        let normalized = normalize_pack_path(&relative).expect("normalized path");
1414
1415        assert_eq!(normalized, pack.canonicalize().expect("canonical"));
1416    }
1417
1418    #[test]
1419    fn prepare_run_dirs_creates_logs_and_resolved_subdirs() {
1420        let temp = TempDir::new().expect("tempdir");
1421        let root = temp.path().join("artifacts");
1422        let dirs = prepare_run_dirs(Some(root.clone())).expect("run dirs");
1423
1424        assert_eq!(dirs.root, root);
1425        assert!(dirs.logs.is_dir());
1426        assert!(dirs.resolved.is_dir());
1427    }
1428
1429    #[test]
1430    fn redact_value_masks_nested_sensitive_fields() {
1431        let (redacted, paths) = redact_value(
1432            &json!({
1433                "token": "abc",
1434                "nested": { "authorization_header": "secret" },
1435                "items": [{ "password_hint": "nope" }]
1436            }),
1437            "$",
1438        );
1439
1440        assert_eq!(redacted["token"], "__REDACTED__");
1441        assert_eq!(redacted["nested"]["authorization_header"], "__REDACTED__");
1442        assert_eq!(redacted["items"][0]["password_hint"], "__REDACTED__");
1443        assert!(paths.contains(&"$.token".to_string()));
1444        assert!(paths.contains(&"$.nested.authorization_header".to_string()));
1445    }
1446
1447    #[test]
1448    fn helper_functions_preserve_ids_and_transcript_shape() {
1449        assert_eq!(sanitize_id("flow demo/1"), "flow-demo-1");
1450        assert!(is_signature_error("Signature verification failed"));
1451        assert_eq!(
1452            to_reader_policy(SigningPolicy::DevOk),
1453            greentic_pack::reader::SigningPolicy::DevOk
1454        );
1455
1456        let profile = sample_profile();
1457        let event = build_transcript_event(TranscriptEventArgs {
1458            profile: &profile,
1459            flow_id: "flow.demo",
1460            node_id: "node.demo",
1461            component: "component.demo",
1462            phase: "invoke",
1463            status: "ok",
1464            timestamp: OffsetDateTime::now_utc(),
1465            inputs: json!({"input": true}),
1466            outputs: json!({"output": true}),
1467            error: Value::Null,
1468            redactions: vec!["$.token".into()],
1469        });
1470        assert_eq!(event["session_id"], "session-1");
1471        assert_eq!(event["flow_id"], "flow.demo");
1472        assert_eq!(event["redactions"][0], "$.token");
1473    }
1474
1475    #[test]
1476    fn resolve_profile_uses_context_overrides() {
1477        let profile = resolve_profile(
1478            &Profile::Dev(DevProfile::default()),
1479            &TenantContext {
1480                tenant_id: Some("tenant-x".into()),
1481                team_id: Some("team-x".into()),
1482                user_id: Some("user-x".into()),
1483                session_id: Some("session-x".into()),
1484            },
1485        );
1486
1487        assert_eq!(profile.tenant_id, "tenant-x");
1488        assert_eq!(profile.team_id, "team-x");
1489        assert_eq!(profile.user_id, "user-x");
1490        assert_eq!(profile.session_id, "session-x");
1491        assert_eq!(profile.provider_id, PROVIDER_ID_DEV);
1492    }
1493
1494    #[test]
1495    fn desktop_defaults_and_runner_builder_preserve_overrides() {
1496        let defaults = desktop_defaults();
1497        assert_eq!(defaults.signing, SigningPolicy::DevOk);
1498        assert_eq!(defaults.ctx.tenant_id.as_deref(), Some("local-dev"));
1499
1500        let runner = Runner::new()
1501            .with_mocks(MocksConfig {
1502                telemetry: Some(TelemetryMock),
1503                ..MocksConfig::default()
1504            })
1505            .configure(|opts| {
1506                opts.entry_flow = Some("entry.flow".into());
1507                opts.input = json!({"demo": true});
1508            });
1509
1510        let rendered = format!("{:?}", runner.base);
1511        assert!(rendered.contains("entry.flow"));
1512        assert!(runner.base.mocks.telemetry.is_some());
1513        assert_eq!(runner.base.input, json!({"demo": true}));
1514    }
1515
1516    #[test]
1517    fn resolve_entry_flow_errors_when_pack_has_no_flows() {
1518        let metadata = PackMetadata {
1519            entry_flows: Vec::new(),
1520            ..sample_metadata()
1521        };
1522        assert!(resolve_entry_flow(None, &metadata, &[]).is_err());
1523    }
1524
1525    #[test]
1526    fn build_host_config_enables_local_dev_defaults() {
1527        let temp = TempDir::new().expect("tempdir");
1528        let dirs = prepare_run_dirs(Some(temp.path().join("run"))).expect("dirs");
1529        let config = build_host_config(&sample_profile(), &dirs, false);
1530
1531        assert_eq!(config.tenant, "tenant");
1532        assert!(!config.http_enabled);
1533        assert!(config.secrets_policy.is_allowed("any.secret"));
1534        assert!(
1535            config
1536                .operator_policy
1537                .allows_provider(Some("provider"), "provider")
1538        );
1539    }
1540
1541    #[test]
1542    fn build_host_config_honours_http_enabled_flag() {
1543        let temp = TempDir::new().expect("tempdir");
1544        let dirs = prepare_run_dirs(Some(temp.path().join("run"))).expect("dirs");
1545        let enabled = build_host_config(&sample_profile(), &dirs, true);
1546        assert!(
1547            enabled.http_enabled,
1548            "http_enabled should propagate when derived from manifest"
1549        );
1550
1551        let disabled = build_host_config(&sample_profile(), &dirs, false);
1552        assert!(!disabled.http_enabled);
1553    }
1554
1555    #[test]
1556    fn derive_http_enabled_true_when_any_component_declares_http_client() {
1557        assert!(derive_http_enabled(&[false, true], false));
1558    }
1559
1560    #[test]
1561    fn derive_http_enabled_false_when_no_component_declares_http_client() {
1562        assert!(!derive_http_enabled(&[false, false], false));
1563    }
1564
1565    #[test]
1566    fn derive_http_enabled_false_for_empty_components() {
1567        assert!(!derive_http_enabled(&[], false));
1568    }
1569
1570    #[test]
1571    fn derive_http_enabled_kill_switch_forces_disable() {
1572        assert!(
1573            !derive_http_enabled(&[true], true),
1574            "kill-switch must override positive manifest declaration"
1575        );
1576    }
1577
1578    #[test]
1579    fn parse_kill_switch_matches_truthy_and_falsy_inputs() {
1580        // (raw input, expected kill-switch active)
1581        let cases: &[(Option<&str>, bool)] = &[
1582            (Some("1"), true),
1583            (Some("true"), true),
1584            (Some("YES"), true),
1585            (Some("  on  "), true),
1586            (Some("On"), true),
1587            (Some("0"), false),
1588            (Some("false"), false),
1589            (Some("bogus"), false),
1590            (Some(""), false),
1591            (None, false),
1592        ];
1593
1594        for (raw, expected) in cases {
1595            assert_eq!(
1596                parse_kill_switch(*raw),
1597                *expected,
1598                "parse_kill_switch({raw:?}) should be {expected}"
1599            );
1600        }
1601    }
1602
1603    #[test]
1604    fn redact_value_leaves_non_sensitive_payloads_unchanged() {
1605        let original = json!({"public": {"value": 1}});
1606        let (redacted, paths) = redact_value(&original, "$");
1607        assert_eq!(redacted, original);
1608        assert!(paths.is_empty());
1609    }
1610
1611    /// Regression: `save_session_snapshot` + `load_session_snapshot` must
1612    /// round-trip an arbitrary `FlowSnapshot` byte-for-byte through JSON so
1613    /// that resume sees exactly what was persisted at pause time. If
1614    /// serialization ever drops a field (e.g. `next_flow` or any
1615    /// `ExecutionState` member) the resumed run would silently restart at
1616    /// the entry — which is the exact bug this whole change set fixes.
1617    #[test]
1618    fn session_snapshot_round_trip_through_disk() {
1619        let tmp = tempfile::tempdir().expect("create temp dir");
1620        let snapshot_path = tmp.path().join("session-1.snapshot.json");
1621
1622        let snapshot_json = json!({
1623            "pack_id": "pack.demo",
1624            "flow_id": "flow.welcome",
1625            "next_node": "card-confirm",
1626            "state": {
1627                "entry": { "metadata": { "action": "confirm" } },
1628                "input": { "metadata": { "action": "confirm" } },
1629                "nodes": {},
1630                "egress": [],
1631                "redirect_count": 0
1632            }
1633        });
1634        let original: greentic_runner_host::runner::engine::FlowSnapshot =
1635            serde_json::from_value(snapshot_json.clone()).expect("deserialize seed snapshot");
1636
1637        save_session_snapshot(&snapshot_path, &original).expect("save snapshot");
1638        let reloaded = load_session_snapshot(&snapshot_path).expect("load snapshot");
1639
1640        let original_value = serde_json::to_value(&original).unwrap();
1641        let reloaded_value = serde_json::to_value(&reloaded).unwrap();
1642        assert_eq!(
1643            original_value, reloaded_value,
1644            "snapshot must round-trip byte-for-byte through disk"
1645        );
1646    }
1647
1648    #[test]
1649    fn session_snapshot_file_sanitizes_session_id() {
1650        let tmp = tempfile::tempdir().expect("create temp dir");
1651        let path = session_snapshot_file(Some(tmp.path()), "tenant/team:user-1.session")
1652            .expect("snapshot path");
1653        let file_name = path
1654            .file_name()
1655            .and_then(|n| n.to_str())
1656            .expect("file name");
1657        // `/` -> `_`, but `:`, `-`, `.` are preserved.
1658        assert_eq!(file_name, "tenant_team:user-1.session.snapshot.json");
1659        assert!(session_snapshot_file(None, "any").is_none());
1660        assert!(session_snapshot_file(Some(tmp.path()), "").is_none());
1661    }
1662
1663    #[test]
1664    fn resolve_secrets_manager_prefers_injected_override_even_in_prod() {
1665        let previous_env = std::env::var("GREENTIC_ENV").ok();
1666        unsafe {
1667            std::env::set_var("GREENTIC_ENV", "prod");
1668        }
1669
1670        let without_override = resolve_secrets_manager(&RunOptions::default());
1671        assert!(
1672            without_override.is_err(),
1673            "expected prod desktop default backend to be rejected"
1674        );
1675
1676        let injected: DynSecretsManager = Arc::new(NoopSecretsManager);
1677        let with_override = resolve_secrets_manager(&RunOptions {
1678            secrets_manager: Some(Arc::clone(&injected)),
1679            ..RunOptions::default()
1680        });
1681        assert!(
1682            with_override.is_ok(),
1683            "expected injected secrets manager to bypass desktop default backend"
1684        );
1685
1686        match previous_env {
1687            Some(value) => unsafe {
1688                std::env::set_var("GREENTIC_ENV", value);
1689            },
1690            None => unsafe {
1691                std::env::remove_var("GREENTIC_ENV");
1692            },
1693        }
1694    }
1695}