greentic_runner_desktop/lib.rs
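
//! Desktop runner for Greentic packs: loads a `.gtpack` archive (or an unpacked
//! pack directory), executes its entry flow under a local developer profile,
//! records a JSONL transcript plus mock events, and writes a `run.json` summary
//! into the run's artifacts directory.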

use anyhow::{Context, Result, anyhow};
use greentic_pack::reader::open_pack;
use greentic_runner_host::RunnerWasiPolicy;
use greentic_runner_host::config::{
    FlowRetryConfig, HostConfig, OperatorPolicy, RateLimits, SecretsPolicy, StateStorePolicy,
    WebhookPolicy,
};
use greentic_runner_host::pack::{ComponentResolution, FlowDescriptor, PackMetadata, PackRuntime};
use greentic_runner_host::runner::engine::{ExecutionObserver, FlowContext, FlowEngine, NodeEvent};
pub use greentic_runner_host::runner::mocks::{
    HttpMock, HttpMockMode, KvMock, MocksConfig, SecretsMock, TelemetryMock, TimeMock, ToolsMock,
};
use greentic_runner_host::runner::mocks::{MockEventSink, MockLayer};
use greentic_runner_host::secrets::default_manager;
use greentic_runner_host::storage::{new_session_store, new_state_store};
use greentic_runner_host::trace::TraceConfig;
use greentic_runner_host::validate::ValidationConfig;
use parking_lot::Mutex;
use runner_core::normalize_under_root;
use serde::{Deserialize, Serialize};
use serde_json::{Map as JsonMap, Value, json};
use std::collections::{BTreeMap, HashMap};
use std::fmt;
use std::fs::{self, File};
use std::io::{BufWriter, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Instant;
use time::OffsetDateTime;
use time::format_description::well_known::Rfc3339;
use tokio::runtime::Runtime;
use tracing::{info, warn};
use uuid::Uuid;

const PROVIDER_ID_DEV: &str = "greentic-dev";

/// Hook invoked for each transcript event.
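///
/// A minimal sketch (assuming the crate is built as `greentic_runner_desktop`)
/// of a hook that simply prints each transcript event as it is written:
///
/// ```ignore
/// use std::sync::Arc;
/// use serde_json::Value;
/// use greentic_runner_desktop::TranscriptHook;
///
/// // Any `Arc<dyn Fn(&Value) + Send + Sync>` closure satisfies the alias.
/// let hook: TranscriptHook = Arc::new(|event: &Value| println!("{event}"));
/// ```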
pub type TranscriptHook = Arc<dyn Fn(&Value) + Send + Sync>;

/// Configuration for emitting OTLP telemetry.
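///
/// A sketch of constructing the hook by hand (the endpoint and header values
/// are illustrative; `desktop_defaults` also builds one from `OTLP_ENDPOINT`):
///
/// ```ignore
/// use greentic_runner_desktop::OtlpHook;
///
/// let otlp = OtlpHook {
///     endpoint: "http://localhost:4317".to_string(),
///     headers: vec![("x-api-key".to_string(), "dev-key".to_string())],
///     sample_all: true,
/// };
/// ```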
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct OtlpHook {
    pub endpoint: String,
    #[serde(default)]
    pub headers: Vec<(String, String)>,
    #[serde(default)]
    pub sample_all: bool,
}

/// Execution profile.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum Profile {
    Dev(DevProfile),
}

impl Default for Profile {
    fn default() -> Self {
        Self::Dev(DevProfile::default())
    }
}

/// Tunables for the developer profile.
#[derive(Clone, Debug)]
pub struct DevProfile {
    pub tenant_id: String,
    pub team_id: String,
    pub user_id: String,
    pub max_node_wall_time_ms: u64,
    pub max_run_wall_time_ms: u64,
}

impl Default for DevProfile {
    fn default() -> Self {
        Self {
            tenant_id: "local-dev".to_string(),
            team_id: "default".to_string(),
            user_id: "developer".to_string(),
            max_node_wall_time_ms: 30_000,
            max_run_wall_time_ms: 600_000,
        }
    }
}

/// Tenant context overrides supplied by callers.
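///
/// Fields left as `None` fall back to the active profile's defaults when the
/// run is resolved. A sketch of overriding only the tenant:
///
/// ```ignore
/// use greentic_runner_desktop::TenantContext;
///
/// let ctx = TenantContext {
///     tenant_id: Some("acme".to_string()),
///     ..TenantContext::default_local()
/// };
/// ```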
#[derive(Clone, Debug, Default)]
pub struct TenantContext {
    pub tenant_id: Option<String>,
    pub team_id: Option<String>,
    pub user_id: Option<String>,
    pub session_id: Option<String>,
}

impl TenantContext {
    pub fn default_local() -> Self {
        Self {
            tenant_id: Some("local-dev".into()),
            team_id: Some("default".into()),
            user_id: Some("developer".into()),
            session_id: None,
        }
    }
}

/// User-supplied options for a run.
#[derive(Clone)]
pub struct RunOptions {
    pub profile: Profile,
    pub entry_flow: Option<String>,
    pub input: Value,
    pub ctx: TenantContext,
    pub transcript: Option<TranscriptHook>,
    pub otlp: Option<OtlpHook>,
    pub mocks: MocksConfig,
    pub artifacts_dir: Option<PathBuf>,
    pub signing: SigningPolicy,
    pub components_dir: Option<PathBuf>,
    pub components_map: HashMap<String, PathBuf>,
    pub dist_offline: bool,
    pub dist_cache_dir: Option<PathBuf>,
    pub allow_missing_hash: bool,
}

impl Default for RunOptions {
    fn default() -> Self {
        desktop_defaults()
    }
}

impl fmt::Debug for RunOptions {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RunOptions")
            .field("profile", &self.profile)
            .field("entry_flow", &self.entry_flow)
            .field("input", &self.input)
            .field("ctx", &self.ctx)
            .field("transcript", &self.transcript.is_some())
            .field("otlp", &self.otlp)
            .field("mocks", &self.mocks)
            .field("artifacts_dir", &self.artifacts_dir)
            .field("signing", &self.signing)
            .field("components_dir", &self.components_dir)
            .field("components_map_len", &self.components_map.len())
            .field("dist_offline", &self.dist_offline)
            .field("dist_cache_dir", &self.dist_cache_dir)
            .field("allow_missing_hash", &self.allow_missing_hash)
            .finish()
    }
}

#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum SigningPolicy {
    Strict,
    DevOk,
}

/// Convenience builder that retains a baseline `RunOptions`.
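///
/// A sketch of a typical invocation (the pack path is illustrative):
///
/// ```ignore
/// use greentic_runner_desktop::{DevProfile, Profile, Runner};
///
/// let result = Runner::new()
///     .profile(Profile::Dev(DevProfile::default()))
///     .run_pack_with("packs/demo.gtpack", |opts| {
///         // Per-run overrides applied on top of the retained baseline.
///         opts.entry_flow = Some("main".to_string());
///         opts.input = serde_json::json!({ "greeting": "hello" });
///     })
///     .expect("pack run failed");
/// println!("status: {:?}", result.status);
/// ```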
#[derive(Clone, Debug)]
pub struct Runner {
    base: RunOptions,
}

impl Runner {
    pub fn new() -> Self {
        Self {
            base: desktop_defaults(),
        }
    }

    pub fn profile(mut self, profile: Profile) -> Self {
        self.base.profile = profile;
        self
    }

    pub fn with_mocks(mut self, mocks: MocksConfig) -> Self {
        self.base.mocks = mocks;
        self
    }

    pub fn configure(mut self, f: impl FnOnce(&mut RunOptions)) -> Self {
        f(&mut self.base);
        self
    }

    pub fn run_pack<P: AsRef<Path>>(&self, pack_path: P) -> Result<RunResult> {
        run_pack_with_options(pack_path, self.base.clone())
    }

    pub fn run_pack_with<P: AsRef<Path>>(
        &self,
        pack_path: P,
        f: impl FnOnce(&mut RunOptions),
    ) -> Result<RunResult> {
        let mut opts = self.base.clone();
        f(&mut opts);
        run_pack_with_options(pack_path, opts)
    }

    pub async fn run_pack_async<P: AsRef<Path>>(&self, pack_path: P) -> Result<RunResult> {
        run_pack_with_options_async(pack_path, self.base.clone()).await
    }

    pub async fn run_pack_with_async<P: AsRef<Path>>(
        &self,
        pack_path: P,
        f: impl FnOnce(&mut RunOptions),
    ) -> Result<RunResult> {
        let mut opts = self.base.clone();
        f(&mut opts);
        run_pack_with_options_async(pack_path, opts).await
    }
}

impl Default for Runner {
    fn default() -> Self {
        Self::new()
    }
}

/// Execute a pack with the provided options.
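///
/// Blocks on an internally created Tokio runtime; use the `_async` variant from
/// async code. A sketch of tweaking the desktop defaults first (the pack path is
/// illustrative):
///
/// ```ignore
/// use greentic_runner_desktop::{desktop_defaults, run_pack_with_options, SigningPolicy};
///
/// let mut opts = desktop_defaults();
/// opts.signing = SigningPolicy::Strict; // require a verifiable pack signature
/// opts.input = serde_json::json!({ "name": "world" });
/// let result = run_pack_with_options("packs/demo.gtpack", opts)
///     .expect("pack run failed");
/// assert!(result.failures.is_empty());
/// ```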
pub fn run_pack_with_options<P: AsRef<Path>>(pack_path: P, opts: RunOptions) -> Result<RunResult> {
    let runtime = Runtime::new().context("failed to create tokio runtime")?;
    runtime.block_on(run_pack_async(pack_path.as_ref(), opts))
}

/// Execute a pack with the provided options using the caller's async runtime.
pub async fn run_pack_with_options_async<P: AsRef<Path>>(
    pack_path: P,
    opts: RunOptions,
) -> Result<RunResult> {
    run_pack_async(pack_path.as_ref(), opts).await
}

/// Reasonable defaults for local desktop invocations.
pub fn desktop_defaults() -> RunOptions {
    let otlp = std::env::var("OTLP_ENDPOINT")
        .ok()
        .map(|endpoint| OtlpHook {
            endpoint,
            headers: Vec::new(),
            sample_all: true,
        });
    RunOptions {
        profile: Profile::Dev(DevProfile::default()),
        entry_flow: None,
        input: json!({}),
        ctx: TenantContext::default_local(),
        transcript: None,
        otlp,
        mocks: MocksConfig::default(),
        artifacts_dir: None,
        signing: SigningPolicy::DevOk,
        components_dir: None,
        components_map: HashMap::new(),
        dist_offline: false,
        dist_cache_dir: None,
        allow_missing_hash: false,
    }
}

async fn run_pack_async(pack_path: &Path, opts: RunOptions) -> Result<RunResult> {
    let pack_path = normalize_pack_path(pack_path)?;
    let resolved_profile = resolve_profile(&opts.profile, &opts.ctx);
    if let Some(otlp) = &opts.otlp {
        apply_otlp_hook(otlp);
    }

    let directories = prepare_run_dirs(opts.artifacts_dir.clone())?;
    info!(run_dir = %directories.root.display(), "prepared desktop run directory");

    let mock_layer = Arc::new(MockLayer::new(opts.mocks.clone(), &directories.root)?);

    let recorder = Arc::new(RunRecorder::new(
        directories.clone(),
        &resolved_profile,
        None,
        PackMetadata::fallback(&pack_path),
        opts.transcript.clone(),
    )?);

    let mock_sink: Arc<dyn MockEventSink> = recorder.clone();
    mock_layer.register_sink(mock_sink);

    if pack_path.is_file() {
        match open_pack(&pack_path, to_reader_policy(opts.signing)) {
            Ok(load) => {
                let meta = &load.manifest.meta;
                recorder.update_pack_metadata(PackMetadata {
                    pack_id: meta.pack_id.clone(),
                    version: meta.version.to_string(),
                    entry_flows: meta.entry_flows.clone(),
                    secret_requirements: Vec::new(),
                });
            }
            Err(err) => {
                recorder.record_verify_event("error", &err.message)?;
                if opts.signing == SigningPolicy::DevOk && is_signature_error(&err.message) {
                    warn!(error = %err.message, "continuing despite signature error (dev policy)");
                } else {
                    return Err(anyhow!("pack verification failed: {}", err.message));
                }
            }
        }
    } else {
        tracing::debug!(
            path = %pack_path.display(),
            "skipping pack verification for directory input"
        );
    }

    let host_config = Arc::new(build_host_config(&resolved_profile, &directories));
    let mut component_resolution = ComponentResolution::default();
    if let Some(dir) = opts.components_dir.clone() {
        component_resolution.materialized_root = Some(dir);
    } else if pack_path.is_dir() {
        component_resolution.materialized_root = Some(pack_path.clone());
    }
    component_resolution.overrides = opts.components_map.clone();
    component_resolution.dist_offline = opts.dist_offline;
    component_resolution.dist_cache_dir = opts.dist_cache_dir.clone();
    component_resolution.allow_missing_hash = opts.allow_missing_hash;
    let archive_source = if pack_path
        .extension()
        .and_then(|ext| ext.to_str())
        .map(|ext| ext.eq_ignore_ascii_case("gtpack"))
        .unwrap_or(false)
    {
        Some(&pack_path)
    } else {
        None
    };

    let session_store = new_session_store();
    let state_store = new_state_store();
    let secrets_manager = default_manager().context("failed to initialise secrets backend")?;
    let pack = Arc::new(
        PackRuntime::load(
            &pack_path,
            Arc::clone(&host_config),
            Some(Arc::clone(&mock_layer)),
            archive_source.map(|p| p as &Path),
            Some(Arc::clone(&session_store)),
            Some(Arc::clone(&state_store)),
            Arc::new(RunnerWasiPolicy::default()),
            secrets_manager,
            host_config.oauth_broker_config(),
            false,
            component_resolution,
        )
        .await
        .with_context(|| format!("failed to load pack {}", pack_path.display()))?,
    );
    recorder.update_pack_metadata(pack.metadata().clone());

    let flows = pack
        .list_flows()
        .await
        .context("failed to enumerate flows")?;
    let entry_flow_id = resolve_entry_flow(opts.entry_flow.clone(), pack.metadata(), &flows)?;
    recorder.set_flow_id(&entry_flow_id);

    let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&host_config))
        .await
        .context("failed to prime flow engine")?;

    let started_at = OffsetDateTime::now_utc();
    let tenant_str = host_config.tenant.clone();
    let session_id_owned = resolved_profile.session_id.clone();
    let provider_id_owned = resolved_profile.provider_id.clone();
    let recorder_ref: &RunRecorder = &recorder;
    let mock_ref: &MockLayer = &mock_layer;
    let ctx = FlowContext {
        tenant: &tenant_str,
        pack_id: pack.metadata().pack_id.as_str(),
        flow_id: &entry_flow_id,
        node_id: None,
        tool: None,
        action: Some("run_pack"),
        session_id: Some(session_id_owned.as_str()),
        provider_id: Some(provider_id_owned.as_str()),
        retry_config: host_config.retry_config().into(),
        attempt: 1,
        observer: Some(recorder_ref),
        mocks: Some(mock_ref),
    };

    let execution = engine.execute(ctx, opts.input.clone()).await;
    let finished_at = OffsetDateTime::now_utc();

    let status = match execution {
        Ok(result) => match result.status {
            greentic_runner_host::runner::engine::FlowStatus::Completed => RunCompletion::Ok,
            greentic_runner_host::runner::engine::FlowStatus::Waiting(wait) => {
                let reason = wait
                    .reason
                    .unwrap_or_else(|| "flow paused unexpectedly".to_string());
                RunCompletion::Err(anyhow::anyhow!(reason))
            }
        },
        Err(err) => RunCompletion::Err(err),
    };

    let result = recorder.finalise(status, started_at, finished_at)?;

    let run_json_path = directories.root.join("run.json");
    fs::write(&run_json_path, serde_json::to_vec_pretty(&result)?)
        .with_context(|| format!("failed to write run summary {}", run_json_path.display()))?;

    Ok(result)
}

fn apply_otlp_hook(hook: &OtlpHook) {
    info!(
        endpoint = %hook.endpoint,
        sample_all = hook.sample_all,
        headers = %hook.headers.len(),
        "OTLP hook requested (set OTEL_* env vars before invoking run_pack)"
    );
}

fn normalize_pack_path(path: &Path) -> Result<PathBuf> {
    if path.is_absolute() {
        let parent = path
            .parent()
            .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
        let root = parent
            .canonicalize()
            .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
        let file = path
            .file_name()
            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
        return normalize_under_root(&root, Path::new(file));
    }

    let cwd = std::env::current_dir().context("failed to resolve current directory")?;
    let base = if let Some(parent) = path.parent() {
        cwd.join(parent)
    } else {
        cwd
    };
    let root = base
        .canonicalize()
        .with_context(|| format!("failed to canonicalize {}", base.display()))?;
    let file = path
        .file_name()
        .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
    normalize_under_root(&root, Path::new(file))
}

fn prepare_run_dirs(root_override: Option<PathBuf>) -> Result<RunDirectories> {
    let root = if let Some(dir) = root_override {
        dir
    } else {
        let timestamp = OffsetDateTime::now_utc()
            .format(&Rfc3339)
            .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
            .replace(':', "-");
        let short_id = Uuid::new_v4()
            .to_string()
            .chars()
            .take(6)
            .collect::<String>();
        PathBuf::from(".greentic")
            .join("runs")
            .join(format!("{timestamp}_{short_id}"))
    };

    fs::create_dir_all(&root).with_context(|| format!("failed to create {}", root.display()))?;
    let logs = root.join("logs");
    let resolved = root.join("resolved_config");
    fs::create_dir_all(&logs).with_context(|| format!("failed to create {}", logs.display()))?;
    fs::create_dir_all(&resolved)
        .with_context(|| format!("failed to create {}", resolved.display()))?;

    Ok(RunDirectories {
        root,
        logs,
        resolved,
    })
}

fn resolve_entry_flow(
    override_id: Option<String>,
    metadata: &PackMetadata,
    flows: &[FlowDescriptor],
) -> Result<String> {
    if let Some(flow) = override_id {
        return Ok(flow);
    }
    if let Some(first) = metadata.entry_flows.first() {
        return Ok(first.clone());
    }
    flows
        .first()
        .map(|f| f.id.clone())
        .ok_or_else(|| anyhow!("pack does not declare any flows"))
}

fn is_signature_error(message: &str) -> bool {
    message.to_ascii_lowercase().contains("signature")
}

fn to_reader_policy(policy: SigningPolicy) -> greentic_pack::reader::SigningPolicy {
    match policy {
        SigningPolicy::Strict => greentic_pack::reader::SigningPolicy::Strict,
        SigningPolicy::DevOk => greentic_pack::reader::SigningPolicy::DevOk,
    }
}

fn resolve_profile(profile: &Profile, ctx: &TenantContext) -> ResolvedProfile {
    match profile {
        Profile::Dev(dev) => ResolvedProfile {
            tenant_id: ctx
                .tenant_id
                .clone()
                .unwrap_or_else(|| dev.tenant_id.clone()),
            team_id: ctx.team_id.clone().unwrap_or_else(|| dev.team_id.clone()),
            user_id: ctx.user_id.clone().unwrap_or_else(|| dev.user_id.clone()),
            session_id: ctx
                .session_id
                .clone()
                .unwrap_or_else(|| Uuid::new_v4().to_string()),
            provider_id: PROVIDER_ID_DEV.to_string(),
            max_node_wall_time_ms: dev.max_node_wall_time_ms,
            max_run_wall_time_ms: dev.max_run_wall_time_ms,
        },
    }
}

fn build_host_config(profile: &ResolvedProfile, dirs: &RunDirectories) -> HostConfig {
    HostConfig {
        tenant: profile.tenant_id.clone(),
        bindings_path: dirs.resolved.join("dev.bindings.yaml"),
        flow_type_bindings: HashMap::new(),
        rate_limits: RateLimits::default(),
        retry: FlowRetryConfig::default(),
        http_enabled: false,
        secrets_policy: SecretsPolicy::allow_all(),
        state_store_policy: StateStorePolicy::default(),
        webhook_policy: WebhookPolicy::default(),
        timers: Vec::new(),
        oauth: None,
        mocks: None,
        pack_bindings: Vec::new(),
        env_passthrough: Vec::new(),
        trace: TraceConfig::from_env(),
        validation: ValidationConfig::from_env(),
        operator_policy: OperatorPolicy::allow_all(),
    }
}

#[allow(dead_code)]
#[derive(Clone, Debug)]
struct ResolvedProfile {
    tenant_id: String,
    team_id: String,
    user_id: String,
    session_id: String,
    provider_id: String,
    max_node_wall_time_ms: u64,
    max_run_wall_time_ms: u64,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RunResult {
    pub session_id: String,
    pub pack_id: String,
    pub pack_version: String,
    pub flow_id: String,
    pub started_at_utc: String,
    pub finished_at_utc: String,
    pub status: RunStatus,
    pub error: Option<String>,
    pub node_summaries: Vec<NodeSummary>,
    pub failures: BTreeMap<String, NodeFailure>,
    pub artifacts_dir: PathBuf,
}

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum RunStatus {
    Success,
    PartialFailure,
    Failure,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NodeSummary {
    pub node_id: String,
    pub component: String,
    pub status: NodeStatus,
    pub duration_ms: u64,
}

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum NodeStatus {
    Ok,
    Skipped,
    Error,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NodeFailure {
    pub code: String,
    pub message: String,
    pub details: Value,
    pub transcript_offsets: (u64, u64),
    pub log_paths: Vec<PathBuf>,
}

#[derive(Clone)]
struct RunDirectories {
    root: PathBuf,
    logs: PathBuf,
    resolved: PathBuf,
}

enum RunCompletion {
    Ok,
    Err(anyhow::Error),
}

struct RunRecorder {
    directories: RunDirectories,
    profile: ResolvedProfile,
    flow_id: Mutex<String>,
    pack_meta: Mutex<PackMetadata>,
    transcript: Mutex<TranscriptWriter>,
    state: Mutex<RunRecorderState>,
}

impl RunRecorder {
    fn new(
        dirs: RunDirectories,
        profile: &ResolvedProfile,
        flow_id: Option<String>,
        pack_meta: PackMetadata,
        hook: Option<TranscriptHook>,
    ) -> Result<Self> {
        let transcript_path = dirs.root.join("transcript.jsonl");
        let file = File::create(&transcript_path)
            .with_context(|| format!("failed to open {}", transcript_path.display()))?;
        Ok(Self {
            directories: dirs,
            profile: profile.clone(),
            flow_id: Mutex::new(flow_id.unwrap_or_else(|| "unknown".into())),
            pack_meta: Mutex::new(pack_meta),
            transcript: Mutex::new(TranscriptWriter::new(BufWriter::new(file), hook)),
            state: Mutex::new(RunRecorderState::default()),
        })
    }

    fn finalise(
        &self,
        completion: RunCompletion,
        started_at: OffsetDateTime,
        finished_at: OffsetDateTime,
    ) -> Result<RunResult> {
        let status = match &completion {
            RunCompletion::Ok => RunStatus::Success,
            RunCompletion::Err(_) => RunStatus::Failure,
        };

        let mut runtime_error = None;
        if let RunCompletion::Err(err) = completion {
            warn!(error = %err, "pack execution failed");
            eprintln!("pack execution failed: {err}");
            runtime_error = Some(err.to_string());
        }

        let started = started_at
            .format(&Rfc3339)
            .unwrap_or_else(|_| started_at.to_string());
        let finished = finished_at
            .format(&Rfc3339)
            .unwrap_or_else(|_| finished_at.to_string());

        let state = self.state.lock();
        let mut summaries = Vec::new();
        let mut failures = BTreeMap::new();
        for node_id in &state.order {
            if let Some(record) = state.nodes.get(node_id) {
                let duration = record.duration_ms.unwrap_or(0);
                summaries.push(NodeSummary {
                    node_id: node_id.clone(),
                    component: record.component.clone(),
                    status: record.status.clone(),
                    duration_ms: duration,
                });
                if record.status == NodeStatus::Error {
                    let start_offset = record.transcript_start.unwrap_or(0);
                    let err_offset = record.transcript_error.unwrap_or(start_offset);
                    failures.insert(
                        node_id.clone(),
                        NodeFailure {
                            code: record
                                .failure_code
                                .clone()
                                .unwrap_or_else(|| "component-failed".into()),
                            message: record
                                .failure_message
                                .clone()
                                .unwrap_or_else(|| "node failed".into()),
                            details: record
                                .failure_details
                                .clone()
                                .unwrap_or_else(|| json!({ "node": node_id })),
                            transcript_offsets: (start_offset, err_offset),
                            log_paths: record.log_paths.clone(),
                        },
                    );
                }
            }
        }

        let mut final_status = status;
        if final_status == RunStatus::Success && !failures.is_empty() {
            final_status = RunStatus::PartialFailure;
        }

        if let Some(error) = &runtime_error {
            failures.insert(
                "_runtime".into(),
                NodeFailure {
                    code: "runtime-error".into(),
                    message: error.clone(),
                    details: json!({ "stage": "execute" }),
                    transcript_offsets: (0, 0),
                    log_paths: Vec::new(),
                },
            );
        }

        let pack_meta = self.pack_meta.lock();
        let flow_id = self.flow_id.lock().clone();
        Ok(RunResult {
            session_id: self.profile.session_id.clone(),
            pack_id: pack_meta.pack_id.clone(),
            pack_version: pack_meta.version.clone(),
            flow_id,
            started_at_utc: started,
            finished_at_utc: finished,
            status: final_status,
            error: runtime_error,
            node_summaries: summaries,
            failures,
            artifacts_dir: self.directories.root.clone(),
        })
    }

    fn set_flow_id(&self, flow_id: &str) {
        *self.flow_id.lock() = flow_id.to_string();
    }

    fn update_pack_metadata(&self, meta: PackMetadata) {
        *self.pack_meta.lock() = meta;
    }

    fn current_flow_id(&self) -> String {
        self.flow_id.lock().clone()
    }

    fn record_verify_event(&self, status: &str, message: &str) -> Result<()> {
        let timestamp = OffsetDateTime::now_utc();
        let event = json!({
            "ts": timestamp
                .format(&Rfc3339)
                .unwrap_or_else(|_| timestamp.to_string()),
            "session_id": self.profile.session_id,
            "flow_id": self.current_flow_id(),
            "node_id": Value::Null,
            "component": "verify.pack",
            "phase": "verify",
            "status": status,
            "inputs": Value::Null,
            "outputs": Value::Null,
            "error": json!({ "message": message }),
            "metrics": Value::Null,
            "schema_id": Value::Null,
            "defaults_applied": Value::Null,
            "redactions": Value::Array(Vec::new()),
        });
        self.transcript.lock().write(&event).map(|_| ())
    }

    fn write_mock_event(&self, capability: &str, provider: &str, payload: Value) -> Result<()> {
        let timestamp = OffsetDateTime::now_utc();
        let event = json!({
            "ts": timestamp
                .format(&Rfc3339)
                .unwrap_or_else(|_| timestamp.to_string()),
            "session_id": self.profile.session_id,
            "flow_id": self.current_flow_id(),
            "node_id": Value::Null,
            "component": format!("mock::{capability}"),
            "phase": "mock",
            "status": "ok",
            "inputs": json!({ "capability": capability, "provider": provider }),
            "outputs": payload,
            "error": Value::Null,
            "metrics": Value::Null,
            "schema_id": Value::Null,
            "defaults_applied": Value::Null,
            "redactions": Value::Array(Vec::new()),
        });
        self.transcript.lock().write(&event).map(|_| ())
    }

    fn handle_node_start(&self, event: &NodeEvent<'_>) -> Result<()> {
        let timestamp = OffsetDateTime::now_utc();
        let (redacted_payload, redactions) = redact_value(event.payload, "$.inputs.payload");
        let inputs = json!({
            "payload": redacted_payload,
            "context": {
                "tenant_id": self.profile.tenant_id.as_str(),
                "team_id": self.profile.team_id.as_str(),
                "user_id": self.profile.user_id.as_str(),
            }
        });
        let flow_id = self.current_flow_id();
        let event_json = build_transcript_event(TranscriptEventArgs {
            profile: &self.profile,
            flow_id: &flow_id,
            node_id: event.node_id,
            component: &event.node.component,
            phase: "start",
            status: "ok",
            timestamp,
            inputs,
            outputs: Value::Null,
            error: Value::Null,
            redactions,
        });
        let (start_offset, _) = self.transcript.lock().write(&event_json)?;

        let mut state = self.state.lock();
        let node_key = event.node_id.to_string();
        if !state.order.iter().any(|id| id == &node_key) {
            state.order.push(node_key.clone());
        }
        let entry = state.nodes.entry(node_key).or_insert_with(|| {
            NodeExecutionRecord::new(event.node.component.clone(), &self.directories)
        });
        entry.start_instant = Some(Instant::now());
        entry.status = NodeStatus::Ok;
        entry.transcript_start = Some(start_offset);
        Ok(())
    }

    fn handle_node_end(&self, event: &NodeEvent<'_>, output: &Value) -> Result<()> {
        let timestamp = OffsetDateTime::now_utc();
        let (redacted_output, redactions) = redact_value(output, "$.outputs");
        let flow_id = self.current_flow_id();
        let event_json = build_transcript_event(TranscriptEventArgs {
            profile: &self.profile,
            flow_id: &flow_id,
            node_id: event.node_id,
            component: &event.node.component,
            phase: "end",
            status: "ok",
            timestamp,
            inputs: Value::Null,
            outputs: redacted_output,
            error: Value::Null,
            redactions,
        });
        self.transcript.lock().write(&event_json)?;

        let mut state = self.state.lock();
        if let Some(entry) = state.nodes.get_mut(event.node_id)
            && let Some(started) = entry.start_instant.take()
        {
            entry.duration_ms = Some(started.elapsed().as_millis() as u64);
        }
        Ok(())
    }

    fn handle_node_error(
        &self,
        event: &NodeEvent<'_>,
        error: &dyn std::error::Error,
    ) -> Result<()> {
        let timestamp = OffsetDateTime::now_utc();
        let error_message = error.to_string();
        let error_json = json!({
            "code": "component-failed",
            "message": error_message,
            "details": {
                "node": event.node_id,
            }
        });
        let flow_id = self.current_flow_id();
        let event_json = build_transcript_event(TranscriptEventArgs {
            profile: &self.profile,
            flow_id: &flow_id,
            node_id: event.node_id,
            component: &event.node.component,
            phase: "error",
            status: "error",
            timestamp,
            inputs: Value::Null,
            outputs: Value::Null,
            error: error_json.clone(),
            redactions: Vec::new(),
        });
        let (_, end_offset) = self.transcript.lock().write(&event_json)?;

        let mut state = self.state.lock();
        if let Some(entry) = state.nodes.get_mut(event.node_id) {
            entry.status = NodeStatus::Error;
            if let Some(started) = entry.start_instant.take() {
                entry.duration_ms = Some(started.elapsed().as_millis() as u64);
            }
            entry.transcript_error = Some(end_offset);
            entry.failure_code = Some("component-failed".to_string());
            entry.failure_message = Some(error_message.clone());
            entry.failure_details = Some(error_json);
            let log_path = self
                .directories
                .logs
                .join(format!("{}.stderr.log", sanitize_id(event.node_id)));
            if let Ok(mut file) = File::create(&log_path) {
                let _ = writeln!(file, "{error_message}");
            }
            entry.log_paths.push(log_path);
        }
        Ok(())
    }
}

impl ExecutionObserver for RunRecorder {
    fn on_node_start(&self, event: &NodeEvent<'_>) {
        if let Err(err) = self.handle_node_start(event) {
            warn!(node = event.node_id, error = %err, "failed to record node start");
        }
    }

    fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value) {
        if let Err(err) = self.handle_node_end(event, output) {
            warn!(node = event.node_id, error = %err, "failed to record node end");
        }
    }

    fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn std::error::Error) {
        if let Err(err) = self.handle_node_error(event, error) {
            warn!(node = event.node_id, error = %err, "failed to record node error");
        }
    }
}

impl MockEventSink for RunRecorder {
    fn on_mock_event(&self, capability: &str, provider: &str, payload: &Value) {
        if let Err(err) = self.write_mock_event(capability, provider, payload.clone()) {
            warn!(?capability, ?provider, error = %err, "failed to record mock event");
        }
    }
}

#[derive(Default)]
struct RunRecorderState {
    nodes: BTreeMap<String, NodeExecutionRecord>,
    order: Vec<String>,
}

#[derive(Clone)]
struct NodeExecutionRecord {
    component: String,
    status: NodeStatus,
    duration_ms: Option<u64>,
    transcript_start: Option<u64>,
    transcript_error: Option<u64>,
    log_paths: Vec<PathBuf>,
    failure_code: Option<String>,
    failure_message: Option<String>,
    failure_details: Option<Value>,
    start_instant: Option<Instant>,
}

impl NodeExecutionRecord {
    fn new(component: String, _dirs: &RunDirectories) -> Self {
        Self {
            component,
            status: NodeStatus::Ok,
            duration_ms: None,
            transcript_start: None,
            transcript_error: None,
            log_paths: Vec::new(),
            failure_code: None,
            failure_message: None,
            failure_details: None,
            start_instant: None,
        }
    }
}

struct TranscriptWriter {
    writer: BufWriter<File>,
    offset: u64,
    hook: Option<TranscriptHook>,
}

impl TranscriptWriter {
    fn new(writer: BufWriter<File>, hook: Option<TranscriptHook>) -> Self {
        Self {
            writer,
            offset: 0,
            hook,
        }
    }

    fn write(&mut self, value: &Value) -> Result<(u64, u64)> {
        let line = serde_json::to_vec(value)?;
        let start = self.offset;
        self.writer.write_all(&line)?;
        self.writer.write_all(b"\n")?;
        self.writer.flush()?;
        self.offset += line.len() as u64 + 1;
        if let Some(hook) = &self.hook {
            hook(value);
        }
        Ok((start, self.offset))
    }
}

struct TranscriptEventArgs<'a> {
    profile: &'a ResolvedProfile,
    flow_id: &'a str,
    node_id: &'a str,
    component: &'a str,
    phase: &'a str,
    status: &'a str,
    timestamp: OffsetDateTime,
    inputs: Value,
    outputs: Value,
    error: Value,
    redactions: Vec<String>,
}

fn build_transcript_event(args: TranscriptEventArgs<'_>) -> Value {
    let ts = args
        .timestamp
        .format(&Rfc3339)
        .unwrap_or_else(|_| args.timestamp.to_string());
    json!({
        "ts": ts,
        "session_id": args.profile.session_id.as_str(),
        "flow_id": args.flow_id,
        "node_id": args.node_id,
        "component": args.component,
        "phase": args.phase,
        "status": args.status,
        "inputs": args.inputs,
        "outputs": args.outputs,
        "error": args.error,
        "metrics": {
            "duration_ms": null,
            "cpu_time_ms": null,
            "mem_peak_bytes": null,
        },
        "schema_id": Value::Null,
        "defaults_applied": Value::Array(Vec::new()),
        "redactions": args.redactions,
    })
}

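// Illustrative example (not part of the runtime path): redacting
// `{"token": "abc", "name": "flow"}` with base `$.inputs.payload` yields
// `{"token": "__REDACTED__", "name": "flow"}` and records the JSONPath
// `$.inputs.payload.token` in the returned redaction list.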
fn redact_value(value: &Value, base: &str) -> (Value, Vec<String>) {
    let mut paths = Vec::new();
    let redacted = redact_recursive(value, base, &mut paths);
    (redacted, paths)
}

fn redact_recursive(value: &Value, path: &str, acc: &mut Vec<String>) -> Value {
    match value {
        Value::Object(map) => {
            let mut new_map = JsonMap::new();
            for (key, val) in map {
                let child_path = format!("{path}.{key}");
                if is_sensitive_key(key) {
                    acc.push(child_path);
                    new_map.insert(key.clone(), Value::String("__REDACTED__".into()));
                } else {
                    new_map.insert(key.clone(), redact_recursive(val, &child_path, acc));
                }
            }
            Value::Object(new_map)
        }
        Value::Array(items) => {
            let mut new_items = Vec::new();
            for (idx, item) in items.iter().enumerate() {
                let child_path = format!("{path}[{idx}]");
                new_items.push(redact_recursive(item, &child_path, acc));
            }
            Value::Array(new_items)
        }
        other => other.clone(),
    }
}

fn is_sensitive_key(key: &str) -> bool {
    let lower = key.to_ascii_lowercase();
    const MARKERS: [&str; 5] = ["secret", "token", "password", "authorization", "cookie"];
    MARKERS.iter().any(|marker| lower.contains(marker))
}

fn sanitize_id(value: &str) -> String {
    value
        .chars()
        .map(|ch| if ch.is_ascii_alphanumeric() { ch } else { '-' })
        .collect()
}