//! Desktop runner for Greentic packs (`greentic_runner_desktop` — `lib.rs`).
1use anyhow::{Context, Result, anyhow};
2use greentic_pack::reader::open_pack;
3use greentic_runner_host::RunnerWasiPolicy;
4use greentic_runner_host::config::{
5    FlowRetryConfig, HostConfig, RateLimits, SecretsPolicy, StateStorePolicy, WebhookPolicy,
6};
7use greentic_runner_host::pack::{ComponentResolution, FlowDescriptor, PackMetadata, PackRuntime};
8use greentic_runner_host::runner::engine::{ExecutionObserver, FlowContext, FlowEngine, NodeEvent};
9pub use greentic_runner_host::runner::mocks::{
10    HttpMock, HttpMockMode, KvMock, MocksConfig, SecretsMock, TelemetryMock, TimeMock, ToolsMock,
11};
12use greentic_runner_host::runner::mocks::{MockEventSink, MockLayer};
13use greentic_runner_host::secrets::default_manager;
14use greentic_runner_host::storage::{new_session_store, new_state_store};
15use greentic_runner_host::trace::TraceConfig;
16use greentic_runner_host::validate::ValidationConfig;
17use parking_lot::Mutex;
18use runner_core::normalize_under_root;
19use serde::{Deserialize, Serialize};
20use serde_json::{Map as JsonMap, Value, json};
21use std::collections::{BTreeMap, HashMap};
22use std::fmt;
23use std::fs::{self, File};
24use std::io::{BufWriter, Write};
25use std::path::{Path, PathBuf};
26use std::sync::Arc;
27use std::time::Instant;
28use time::OffsetDateTime;
29use time::format_description::well_known::Rfc3339;
30use tokio::runtime::Runtime;
31use tracing::{info, warn};
32use uuid::Uuid;
33
/// Provider identifier stamped onto every dev-profile run.
const PROVIDER_ID_DEV: &str = "greentic-dev";

/// Hook invoked for each transcript event.
///
/// Receives every JSON transcript record as it is written. `Send + Sync`
/// because the hook is shared across the runtime behind an `Arc`.
pub type TranscriptHook = Arc<dyn Fn(&Value) + Send + Sync>;
38
/// Configuration for emitting OTLP telemetry.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct OtlpHook {
    /// Collector endpoint URL.
    pub endpoint: String,
    /// Extra headers to send with export requests.
    #[serde(default)]
    pub headers: Vec<(String, String)>,
    /// Request that all telemetry be sampled; currently only logged by
    /// `apply_otlp_hook` (see the note there).
    #[serde(default)]
    pub sample_all: bool,
}
48
/// Execution profile.
///
/// Marked `#[non_exhaustive]` so further profiles can be added without
/// breaking downstream `match` statements.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum Profile {
    /// Local development profile (the only variant today).
    Dev(DevProfile),
}
55
impl Default for Profile {
    /// Defaults to the developer profile with its own defaults.
    fn default() -> Self {
        Self::Dev(DevProfile::default())
    }
}
61
/// Tunables for the developer profile.
#[derive(Clone, Debug)]
pub struct DevProfile {
    /// Tenant id used when the caller supplies no override.
    pub tenant_id: String,
    /// Team id used when the caller supplies no override.
    pub team_id: String,
    /// User id used when the caller supplies no override.
    pub user_id: String,
    /// Per-node wall-clock budget, in milliseconds.
    pub max_node_wall_time_ms: u64,
    /// Whole-run wall-clock budget, in milliseconds.
    pub max_run_wall_time_ms: u64,
}
71
72impl Default for DevProfile {
73    fn default() -> Self {
74        Self {
75            tenant_id: "local-dev".to_string(),
76            team_id: "default".to_string(),
77            user_id: "developer".to_string(),
78            max_node_wall_time_ms: 30_000,
79            max_run_wall_time_ms: 600_000,
80        }
81    }
82}
83
/// Tenant context overrides supplied by callers.
///
/// Each `Some` field overrides the corresponding profile default; `None`
/// falls back to the active profile (see `resolve_profile`).
#[derive(Clone, Debug, Default)]
pub struct TenantContext {
    /// Overrides the profile's tenant id.
    pub tenant_id: Option<String>,
    /// Overrides the profile's team id.
    pub team_id: Option<String>,
    /// Overrides the profile's user id.
    pub user_id: Option<String>,
    /// Overrides the generated session id.
    pub session_id: Option<String>,
}
92
93impl TenantContext {
94    pub fn default_local() -> Self {
95        Self {
96            tenant_id: Some("local-dev".into()),
97            team_id: Some("default".into()),
98            user_id: Some("developer".into()),
99            session_id: None,
100        }
101    }
102}
103
/// User supplied options for a run.
#[derive(Clone)]
pub struct RunOptions {
    /// Execution profile (currently only `Profile::Dev`).
    pub profile: Profile,
    /// Entry flow id override; when `None`, the pack's declared entry flows
    /// (or the first listed flow) are used.
    pub entry_flow: Option<String>,
    /// Initial JSON payload handed to the flow engine.
    pub input: Value,
    /// Tenant identity overrides layered over the profile.
    pub ctx: TenantContext,
    /// Optional hook invoked for every transcript event.
    pub transcript: Option<TranscriptHook>,
    /// Optional OTLP telemetry request (logged, not wired up directly).
    pub otlp: Option<OtlpHook>,
    /// Mock configuration for host capabilities.
    pub mocks: MocksConfig,
    /// Overrides the auto-generated `.greentic/runs/...` artifact directory.
    pub artifacts_dir: Option<PathBuf>,
    /// Pack signature verification policy.
    pub signing: SigningPolicy,
    /// Overrides the materialized component root directory.
    pub components_dir: Option<PathBuf>,
    /// Per-component path overrides.
    pub components_map: HashMap<String, PathBuf>,
    /// Forbid network fetches during component resolution.
    pub dist_offline: bool,
    /// Cache directory for distributed component downloads.
    pub dist_cache_dir: Option<PathBuf>,
    /// Accept components without a pinned hash.
    pub allow_missing_hash: bool,
}
122
impl Default for RunOptions {
    /// Mirrors [`desktop_defaults`] so `RunOptions::default()` and the free
    /// function always agree.
    fn default() -> Self {
        desktop_defaults()
    }
}
128
impl fmt::Debug for RunOptions {
    /// Manual `Debug`: the `transcript` hook is a closure with no `Debug`
    /// impl, so it is rendered as a presence flag, and `components_map` is
    /// summarised by its length.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RunOptions")
            .field("profile", &self.profile)
            .field("entry_flow", &self.entry_flow)
            .field("input", &self.input)
            .field("ctx", &self.ctx)
            .field("transcript", &self.transcript.is_some())
            .field("otlp", &self.otlp)
            .field("mocks", &self.mocks)
            .field("artifacts_dir", &self.artifacts_dir)
            .field("signing", &self.signing)
            .field("components_dir", &self.components_dir)
            .field("components_map_len", &self.components_map.len())
            .field("dist_offline", &self.dist_offline)
            .field("dist_cache_dir", &self.dist_cache_dir)
            .field("allow_missing_hash", &self.allow_missing_hash)
            .finish()
    }
}
149
/// Pack signature verification policy.
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum SigningPolicy {
    /// Any verification failure aborts the run.
    Strict,
    /// Signature failures are logged and tolerated (dev convenience); other
    /// verification errors still abort.
    DevOk,
}
155
/// Convenience builder that retains a baseline `RunOptions`.
#[derive(Clone, Debug)]
pub struct Runner {
    // Baseline options cloned for each run; per-call tweaks are applied on
    // top of the clone (see `run_pack_with`).
    base: RunOptions,
}
161
162impl Runner {
163    pub fn new() -> Self {
164        Self {
165            base: desktop_defaults(),
166        }
167    }
168
169    pub fn profile(mut self, profile: Profile) -> Self {
170        self.base.profile = profile;
171        self
172    }
173
174    pub fn with_mocks(mut self, mocks: MocksConfig) -> Self {
175        self.base.mocks = mocks;
176        self
177    }
178
179    pub fn configure(mut self, f: impl FnOnce(&mut RunOptions)) -> Self {
180        f(&mut self.base);
181        self
182    }
183
184    pub fn run_pack<P: AsRef<Path>>(&self, pack_path: P) -> Result<RunResult> {
185        run_pack_with_options(pack_path, self.base.clone())
186    }
187
188    pub fn run_pack_with<P: AsRef<Path>>(
189        &self,
190        pack_path: P,
191        f: impl FnOnce(&mut RunOptions),
192    ) -> Result<RunResult> {
193        let mut opts = self.base.clone();
194        f(&mut opts);
195        run_pack_with_options(pack_path, opts)
196    }
197}
198
impl Default for Runner {
    /// Equivalent to [`Runner::new`].
    fn default() -> Self {
        Self::new()
    }
}
204
/// Execute a pack with the provided options.
///
/// Creates a fresh Tokio runtime per call and blocks the current thread
/// until the run completes. NOTE(review): `block_on` must not be called
/// from inside an existing async context — confirm callers are sync.
pub fn run_pack_with_options<P: AsRef<Path>>(pack_path: P, opts: RunOptions) -> Result<RunResult> {
    let runtime = Runtime::new().context("failed to create tokio runtime")?;
    runtime.block_on(run_pack_async(pack_path.as_ref(), opts))
}
210
211/// Reasonable defaults for local desktop invocations.
212pub fn desktop_defaults() -> RunOptions {
213    let otlp = std::env::var("OTLP_ENDPOINT")
214        .ok()
215        .map(|endpoint| OtlpHook {
216            endpoint,
217            headers: Vec::new(),
218            sample_all: true,
219        });
220    RunOptions {
221        profile: Profile::Dev(DevProfile::default()),
222        entry_flow: None,
223        input: json!({}),
224        ctx: TenantContext::default_local(),
225        transcript: None,
226        otlp,
227        mocks: MocksConfig::default(),
228        artifacts_dir: None,
229        signing: SigningPolicy::DevOk,
230        components_dir: None,
231        components_map: HashMap::new(),
232        dist_offline: false,
233        dist_cache_dir: None,
234        allow_missing_hash: false,
235    }
236}
237
/// Core async implementation behind [`run_pack_with_options`]: verify,
/// load, and execute a single pack, then persist the run artifacts.
async fn run_pack_async(pack_path: &Path, opts: RunOptions) -> Result<RunResult> {
    // Canonicalize the caller-supplied path before anything else touches it.
    let pack_path = normalize_pack_path(pack_path)?;
    // Merge profile defaults with explicit tenant-context overrides.
    let resolved_profile = resolve_profile(&opts.profile, &opts.ctx);
    if let Some(otlp) = &opts.otlp {
        apply_otlp_hook(otlp);
    }

    // Create the per-run artifact tree (root, logs/, resolved_config/).
    let directories = prepare_run_dirs(opts.artifacts_dir.clone())?;
    info!(run_dir = %directories.root.display(), "prepared desktop run directory");

    let mock_layer = Arc::new(MockLayer::new(opts.mocks.clone(), &directories.root)?);

    // The recorder starts with fallback pack metadata; it is refined after
    // verification and again once the pack runtime has actually loaded.
    let recorder = Arc::new(RunRecorder::new(
        directories.clone(),
        &resolved_profile,
        None,
        PackMetadata::fallback(&pack_path),
        opts.transcript.clone(),
    )?);

    // Mirror mock events into the transcript via the recorder.
    let mock_sink: Arc<dyn MockEventSink> = recorder.clone();
    mock_layer.register_sink(mock_sink);

    // Only archive files are verified; directory inputs skip verification.
    if pack_path.is_file() {
        match open_pack(&pack_path, to_reader_policy(opts.signing)) {
            Ok(load) => {
                let meta = &load.manifest.meta;
                recorder.update_pack_metadata(PackMetadata {
                    pack_id: meta.pack_id.clone(),
                    version: meta.version.to_string(),
                    entry_flows: meta.entry_flows.clone(),
                    secret_requirements: Vec::new(),
                });
            }
            Err(err) => {
                recorder.record_verify_event("error", &err.message)?;
                // The dev policy tolerates *signature* failures only; any
                // other verification error is always fatal.
                if opts.signing == SigningPolicy::DevOk && is_signature_error(&err.message) {
                    warn!(error = %err.message, "continuing despite signature error (dev policy)");
                } else {
                    return Err(anyhow!("pack verification failed: {}", err.message));
                }
            }
        }
    } else {
        tracing::debug!(
            path = %pack_path.display(),
            "skipping pack verification for directory input"
        );
    }

    let host_config = Arc::new(build_host_config(&resolved_profile, &directories));
    // Component resolution: an explicit components_dir override wins; a
    // directory pack doubles as its own materialized component root.
    let mut component_resolution = ComponentResolution::default();
    if let Some(dir) = opts.components_dir.clone() {
        component_resolution.materialized_root = Some(dir);
    } else if pack_path.is_dir() {
        component_resolution.materialized_root = Some(pack_path.clone());
    }
    component_resolution.overrides = opts.components_map.clone();
    component_resolution.dist_offline = opts.dist_offline;
    component_resolution.dist_cache_dir = opts.dist_cache_dir.clone();
    component_resolution.allow_missing_hash = opts.allow_missing_hash;
    // Only `.gtpack` archives (case-insensitive) pass as an archive source.
    let archive_source = if pack_path
        .extension()
        .and_then(|ext| ext.to_str())
        .map(|ext| ext.eq_ignore_ascii_case("gtpack"))
        .unwrap_or(false)
    {
        Some(&pack_path)
    } else {
        None
    };

    let session_store = new_session_store();
    let state_store = new_state_store();
    let secrets_manager = default_manager().context("failed to initialise secrets backend")?;
    let pack = Arc::new(
        PackRuntime::load(
            &pack_path,
            Arc::clone(&host_config),
            Some(Arc::clone(&mock_layer)),
            archive_source.map(|p| p as &Path),
            Some(Arc::clone(&session_store)),
            Some(Arc::clone(&state_store)),
            Arc::new(RunnerWasiPolicy::default()),
            secrets_manager,
            host_config.oauth_broker_config(),
            false,
            component_resolution,
        )
        .await
        .with_context(|| format!("failed to load pack {}", pack_path.display()))?,
    );
    // Replace the fallback metadata with what the runtime actually loaded.
    recorder.update_pack_metadata(pack.metadata().clone());

    let flows = pack
        .list_flows()
        .await
        .context("failed to enumerate flows")?;
    let entry_flow_id = resolve_entry_flow(opts.entry_flow.clone(), pack.metadata(), &flows)?;
    recorder.set_flow_id(&entry_flow_id);

    let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&host_config))
        .await
        .context("failed to prime flow engine")?;

    let started_at = OffsetDateTime::now_utc();
    // Owned copies so the borrowed FlowContext below has stable backing.
    let tenant_str = host_config.tenant.clone();
    let session_id_owned = resolved_profile.session_id.clone();
    let provider_id_owned = resolved_profile.provider_id.clone();
    let recorder_ref: &RunRecorder = &recorder;
    let mock_ref: &MockLayer = &mock_layer;
    let ctx = FlowContext {
        tenant: &tenant_str,
        pack_id: pack.metadata().pack_id.as_str(),
        flow_id: &entry_flow_id,
        node_id: None,
        tool: None,
        action: Some("run_pack"),
        session_id: Some(session_id_owned.as_str()),
        provider_id: Some(provider_id_owned.as_str()),
        retry_config: host_config.retry_config().into(),
        observer: Some(recorder_ref),
        mocks: Some(mock_ref),
    };

    let execution = engine.execute(ctx, opts.input.clone()).await;
    let finished_at = OffsetDateTime::now_utc();

    // A `Waiting` status is surfaced as an error for desktop runs; its
    // reason (if any) becomes the failure message.
    let status = match execution {
        Ok(result) => match result.status {
            greentic_runner_host::runner::engine::FlowStatus::Completed => RunCompletion::Ok,
            greentic_runner_host::runner::engine::FlowStatus::Waiting(wait) => {
                let reason = wait
                    .reason
                    .unwrap_or_else(|| "flow paused unexpectedly".to_string());
                RunCompletion::Err(anyhow::anyhow!(reason))
            }
        },
        Err(err) => RunCompletion::Err(err),
    };

    let result = recorder.finalise(status, started_at, finished_at)?;

    // Persist the machine-readable summary next to the transcript.
    let run_json_path = directories.root.join("run.json");
    fs::write(&run_json_path, serde_json::to_vec_pretty(&result)?)
        .with_context(|| format!("failed to write run summary {}", run_json_path.display()))?;

    Ok(result)
}
387
/// Log the requested OTLP configuration.
///
/// This only records intent — as the log message states, actual OTLP export
/// must be configured via `OTEL_*` environment variables before the run.
fn apply_otlp_hook(hook: &OtlpHook) {
    info!(
        endpoint = %hook.endpoint,
        sample_all = hook.sample_all,
        headers = %hook.headers.len(),
        "OTLP hook requested (set OTEL_* env vars before invoking run_pack)"
    );
}
396
397fn normalize_pack_path(path: &Path) -> Result<PathBuf> {
398    if path.is_absolute() {
399        let parent = path
400            .parent()
401            .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
402        let root = parent
403            .canonicalize()
404            .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
405        let file = path
406            .file_name()
407            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
408        return normalize_under_root(&root, Path::new(file));
409    }
410
411    let cwd = std::env::current_dir().context("failed to resolve current directory")?;
412    let base = if let Some(parent) = path.parent() {
413        cwd.join(parent)
414    } else {
415        cwd
416    };
417    let root = base
418        .canonicalize()
419        .with_context(|| format!("failed to canonicalize {}", base.display()))?;
420    let file = path
421        .file_name()
422        .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
423    normalize_under_root(&root, Path::new(file))
424}
425
426fn prepare_run_dirs(root_override: Option<PathBuf>) -> Result<RunDirectories> {
427    let root = if let Some(dir) = root_override {
428        dir
429    } else {
430        let timestamp = OffsetDateTime::now_utc()
431            .format(&Rfc3339)
432            .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
433            .replace(':', "-");
434        let short_id = Uuid::new_v4()
435            .to_string()
436            .chars()
437            .take(6)
438            .collect::<String>();
439        PathBuf::from(".greentic")
440            .join("runs")
441            .join(format!("{timestamp}_{short_id}"))
442    };
443
444    fs::create_dir_all(&root).with_context(|| format!("failed to create {}", root.display()))?;
445    let logs = root.join("logs");
446    let resolved = root.join("resolved_config");
447    fs::create_dir_all(&logs).with_context(|| format!("failed to create {}", logs.display()))?;
448    fs::create_dir_all(&resolved)
449        .with_context(|| format!("failed to create {}", resolved.display()))?;
450
451    Ok(RunDirectories {
452        root,
453        logs,
454        resolved,
455    })
456}
457
458fn resolve_entry_flow(
459    override_id: Option<String>,
460    metadata: &PackMetadata,
461    flows: &[FlowDescriptor],
462) -> Result<String> {
463    if let Some(flow) = override_id {
464        return Ok(flow);
465    }
466    if let Some(first) = metadata.entry_flows.first() {
467        return Ok(first.clone());
468    }
469    flows
470        .first()
471        .map(|f| f.id.clone())
472        .ok_or_else(|| anyhow!("pack does not declare any flows"))
473}
474
/// Heuristic: any message mentioning "signature" (ASCII case-insensitive)
/// is treated as a signature-verification failure.
fn is_signature_error(message: &str) -> bool {
    let lowered = message.to_ascii_lowercase();
    lowered.contains("signature")
}
478
479fn to_reader_policy(policy: SigningPolicy) -> greentic_pack::reader::SigningPolicy {
480    match policy {
481        SigningPolicy::Strict => greentic_pack::reader::SigningPolicy::Strict,
482        SigningPolicy::DevOk => greentic_pack::reader::SigningPolicy::DevOk,
483    }
484}
485
486fn resolve_profile(profile: &Profile, ctx: &TenantContext) -> ResolvedProfile {
487    match profile {
488        Profile::Dev(dev) => ResolvedProfile {
489            tenant_id: ctx
490                .tenant_id
491                .clone()
492                .unwrap_or_else(|| dev.tenant_id.clone()),
493            team_id: ctx.team_id.clone().unwrap_or_else(|| dev.team_id.clone()),
494            user_id: ctx.user_id.clone().unwrap_or_else(|| dev.user_id.clone()),
495            session_id: ctx
496                .session_id
497                .clone()
498                .unwrap_or_else(|| Uuid::new_v4().to_string()),
499            provider_id: PROVIDER_ID_DEV.to_string(),
500            max_node_wall_time_ms: dev.max_node_wall_time_ms,
501            max_run_wall_time_ms: dev.max_run_wall_time_ms,
502        },
503    }
504}
505
/// Assemble the host configuration for a desktop run: permissive secrets,
/// default policies, and host HTTP disabled.
fn build_host_config(profile: &ResolvedProfile, dirs: &RunDirectories) -> HostConfig {
    HostConfig {
        tenant: profile.tenant_id.clone(),
        // Resolved bindings live under the run's resolved_config directory.
        bindings_path: dirs.resolved.join("dev.bindings.yaml"),
        flow_type_bindings: HashMap::new(),
        rate_limits: RateLimits::default(),
        retry: FlowRetryConfig::default(),
        // Host-level HTTP is switched off for desktop runs.
        http_enabled: false,
        secrets_policy: SecretsPolicy::allow_all(),
        state_store_policy: StateStorePolicy::default(),
        webhook_policy: WebhookPolicy::default(),
        timers: Vec::new(),
        oauth: None,
        mocks: None,
        pack_bindings: Vec::new(),
        env_passthrough: Vec::new(),
        // Trace and validation settings are taken from the environment.
        trace: TraceConfig::from_env(),
        validation: ValidationConfig::from_env(),
    }
}
526
/// Fully resolved run identity after merging profile defaults with caller
/// overrides (see `resolve_profile`).
// dead_code allowed: some fields (e.g. wall-time budgets) are carried for
// completeness — presumably consumed by future enforcement; verify.
#[allow(dead_code)]
#[derive(Clone, Debug)]
struct ResolvedProfile {
    tenant_id: String,
    team_id: String,
    user_id: String,
    session_id: String,
    provider_id: String,
    max_node_wall_time_ms: u64,
    max_run_wall_time_ms: u64,
}
538
/// Machine-readable run summary, also persisted as `run.json` in the run's
/// artifact directory.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RunResult {
    /// Session id under which the run executed.
    pub session_id: String,
    /// Identifier of the executed pack.
    pub pack_id: String,
    /// Version string of the executed pack.
    pub pack_version: String,
    /// The flow that was executed.
    pub flow_id: String,
    /// Run start time (RFC 3339 when formattable).
    pub started_at_utc: String,
    /// Run finish time (RFC 3339 when formattable).
    pub finished_at_utc: String,
    /// Overall outcome.
    pub status: RunStatus,
    /// Engine-level error message, when the run failed outright.
    pub error: Option<String>,
    /// Per-node execution summaries in first-seen order.
    pub node_summaries: Vec<NodeSummary>,
    /// Failures keyed by node id (plus the synthetic `_runtime` entry).
    pub failures: BTreeMap<String, NodeFailure>,
    /// Root directory containing transcript, logs, and this summary.
    pub artifacts_dir: PathBuf,
}
553
/// Overall run outcome.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum RunStatus {
    /// Run completed with no recorded failures.
    Success,
    /// Run completed but individual nodes recorded errors.
    PartialFailure,
    /// Run failed at the engine level.
    Failure,
}
560
/// Per-node execution summary included in [`RunResult`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NodeSummary {
    /// Node identifier within the flow.
    pub node_id: String,
    /// Component that backed the node.
    pub component: String,
    /// Final recorded status for the node.
    pub status: NodeStatus,
    /// Wall-clock duration in milliseconds (0 when never completed).
    pub duration_ms: u64,
}
568
/// Final status recorded for a single node.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum NodeStatus {
    /// Node started (and, unless later overwritten, finished) cleanly.
    Ok,
    /// Node was not executed.
    Skipped,
    /// Node reported an error.
    Error,
}
575
/// Details of one node failure, referenced from [`RunResult::failures`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NodeFailure {
    /// Stable failure code (e.g. "component-failed", "runtime-error").
    pub code: String,
    /// Human-readable failure message.
    pub message: String,
    /// Structured failure payload.
    pub details: Value,
    /// (start, error) offsets into transcript.jsonl as reported by the
    /// transcript writer — unit (bytes vs records) defined by the writer.
    pub transcript_offsets: (u64, u64),
    /// Log files captured for this failure (e.g. per-node stderr logs).
    pub log_paths: Vec<PathBuf>,
}
584
/// Layout of a single run's artifact directory tree.
#[derive(Clone)]
struct RunDirectories {
    /// Run root; holds transcript.jsonl and run.json.
    root: PathBuf,
    /// `logs/` subdirectory for per-node log files.
    logs: PathBuf,
    /// `resolved_config/` subdirectory (e.g. dev.bindings.yaml).
    resolved: PathBuf,
}
591
/// Internal engine outcome, prior to being summarised into a [`RunStatus`].
enum RunCompletion {
    /// Flow completed.
    Ok,
    /// Flow failed (or paused, which desktop runs treat as failure).
    Err(anyhow::Error),
}
596
/// Records transcript events and per-node execution state for one run.
///
/// Shared across the engine via `Arc`; each field sits behind its own
/// `parking_lot::Mutex` so callbacks can update independent parts without
/// contending on a single lock.
struct RunRecorder {
    directories: RunDirectories,
    profile: ResolvedProfile,
    // Entry flow id; "unknown" until `set_flow_id` is called.
    flow_id: Mutex<String>,
    // Pack metadata; starts as a fallback and is refined as loading proceeds.
    pack_meta: Mutex<PackMetadata>,
    // JSONL transcript writer plus optional caller hook.
    transcript: Mutex<TranscriptWriter>,
    // Per-node execution records and their first-seen ordering.
    state: Mutex<RunRecorderState>,
}
605
impl RunRecorder {
    /// Create a recorder writing `transcript.jsonl` into the run root.
    fn new(
        dirs: RunDirectories,
        profile: &ResolvedProfile,
        flow_id: Option<String>,
        pack_meta: PackMetadata,
        hook: Option<TranscriptHook>,
    ) -> Result<Self> {
        let transcript_path = dirs.root.join("transcript.jsonl");
        let file = File::create(&transcript_path)
            .with_context(|| format!("failed to open {}", transcript_path.display()))?;
        Ok(Self {
            directories: dirs,
            profile: profile.clone(),
            // Flow id may be unknown until the entry flow is resolved.
            flow_id: Mutex::new(flow_id.unwrap_or_else(|| "unknown".into())),
            pack_meta: Mutex::new(pack_meta),
            transcript: Mutex::new(TranscriptWriter::new(BufWriter::new(file), hook)),
            state: Mutex::new(RunRecorderState::default()),
        })
    }

    /// Fold the engine outcome and accumulated per-node records into the
    /// final [`RunResult`].
    fn finalise(
        &self,
        completion: RunCompletion,
        started_at: OffsetDateTime,
        finished_at: OffsetDateTime,
    ) -> Result<RunResult> {
        let status = match &completion {
            RunCompletion::Ok => RunStatus::Success,
            RunCompletion::Err(_) => RunStatus::Failure,
        };

        // Surface an engine-level failure via log, stderr, and the summary.
        let mut runtime_error = None;
        if let RunCompletion::Err(err) = completion {
            warn!(error = %err, "pack execution failed");
            eprintln!("pack execution failed: {err}");
            runtime_error = Some(err.to_string());
        }

        let started = started_at
            .format(&Rfc3339)
            .unwrap_or_else(|_| started_at.to_string());
        let finished = finished_at
            .format(&Rfc3339)
            .unwrap_or_else(|_| finished_at.to_string());

        // Walk nodes in first-seen order, building summaries and collecting
        // failure details for any node that ended in Error.
        let state = self.state.lock();
        let mut summaries = Vec::new();
        let mut failures = BTreeMap::new();
        for node_id in &state.order {
            if let Some(record) = state.nodes.get(node_id) {
                let duration = record.duration_ms.unwrap_or(0);
                summaries.push(NodeSummary {
                    node_id: node_id.clone(),
                    component: record.component.clone(),
                    status: record.status.clone(),
                    duration_ms: duration,
                });
                if record.status == NodeStatus::Error {
                    // Offsets fall back to (0, start) when never recorded.
                    let start_offset = record.transcript_start.unwrap_or(0);
                    let err_offset = record.transcript_error.unwrap_or(start_offset);
                    failures.insert(
                        node_id.clone(),
                        NodeFailure {
                            code: record
                                .failure_code
                                .clone()
                                .unwrap_or_else(|| "component-failed".into()),
                            message: record
                                .failure_message
                                .clone()
                                .unwrap_or_else(|| "node failed".into()),
                            details: record
                                .failure_details
                                .clone()
                                .unwrap_or_else(|| json!({ "node": node_id })),
                            transcript_offsets: (start_offset, err_offset),
                            log_paths: record.log_paths.clone(),
                        },
                    );
                }
            }
        }

        // A nominally successful run with node failures is only partial.
        let mut final_status = status;
        if final_status == RunStatus::Success && !failures.is_empty() {
            final_status = RunStatus::PartialFailure;
        }

        // Synthetic "_runtime" entry captures engine-level failures that are
        // not attributable to a specific node.
        if let Some(error) = &runtime_error {
            failures.insert(
                "_runtime".into(),
                NodeFailure {
                    code: "runtime-error".into(),
                    message: error.clone(),
                    details: json!({ "stage": "execute" }),
                    transcript_offsets: (0, 0),
                    log_paths: Vec::new(),
                },
            );
        }

        let pack_meta = self.pack_meta.lock();
        let flow_id = self.flow_id.lock().clone();
        Ok(RunResult {
            session_id: self.profile.session_id.clone(),
            pack_id: pack_meta.pack_id.clone(),
            pack_version: pack_meta.version.clone(),
            flow_id,
            started_at_utc: started,
            finished_at_utc: finished,
            status: final_status,
            error: runtime_error,
            node_summaries: summaries,
            failures,
            artifacts_dir: self.directories.root.clone(),
        })
    }

    /// Record the resolved entry flow id.
    fn set_flow_id(&self, flow_id: &str) {
        *self.flow_id.lock() = flow_id.to_string();
    }

    /// Replace the (initially fallback) pack metadata.
    fn update_pack_metadata(&self, meta: PackMetadata) {
        *self.pack_meta.lock() = meta;
    }

    /// Snapshot the current flow id.
    fn current_flow_id(&self) -> String {
        self.flow_id.lock().clone()
    }

    /// Write a synthetic transcript record for the pack-verification phase.
    fn record_verify_event(&self, status: &str, message: &str) -> Result<()> {
        let timestamp = OffsetDateTime::now_utc();
        let event = json!({
            "ts": timestamp
                .format(&Rfc3339)
                .unwrap_or_else(|_| timestamp.to_string()),
            "session_id": self.profile.session_id,
            "flow_id": self.current_flow_id(),
            "node_id": Value::Null,
            "component": "verify.pack",
            "phase": "verify",
            "status": status,
            "inputs": Value::Null,
            "outputs": Value::Null,
            "error": json!({ "message": message }),
            "metrics": Value::Null,
            "schema_id": Value::Null,
            "defaults_applied": Value::Null,
            "redactions": Value::Array(Vec::new()),
        });
        // Offsets returned by write() are not needed here.
        self.transcript.lock().write(&event).map(|_| ())
    }

    /// Mirror a mock-layer event into the transcript.
    fn write_mock_event(&self, capability: &str, provider: &str, payload: Value) -> Result<()> {
        let timestamp = OffsetDateTime::now_utc();
        let event = json!({
            "ts": timestamp
                .format(&Rfc3339)
                .unwrap_or_else(|_| timestamp.to_string()),
            "session_id": self.profile.session_id,
            "flow_id": self.current_flow_id(),
            "node_id": Value::Null,
            "component": format!("mock::{capability}"),
            "phase": "mock",
            "status": "ok",
            "inputs": json!({ "capability": capability, "provider": provider }),
            "outputs": payload,
            "error": Value::Null,
            "metrics": Value::Null,
            "schema_id": Value::Null,
            "defaults_applied": Value::Null,
            "redactions": Value::Array(Vec::new()),
        });
        self.transcript.lock().write(&event).map(|_| ())
    }

    /// Record a node's "start" transcript event and open its state record.
    fn handle_node_start(&self, event: &NodeEvent<'_>) -> Result<()> {
        let timestamp = OffsetDateTime::now_utc();
        // Redact the inbound payload before it touches the transcript.
        let (redacted_payload, redactions) = redact_value(event.payload, "$.inputs.payload");
        let inputs = json!({
            "payload": redacted_payload,
            "context": {
                "tenant_id": self.profile.tenant_id.as_str(),
                "team_id": self.profile.team_id.as_str(),
                "user_id": self.profile.user_id.as_str(),
            }
        });
        let flow_id = self.current_flow_id();
        let event_json = build_transcript_event(TranscriptEventArgs {
            profile: &self.profile,
            flow_id: &flow_id,
            node_id: event.node_id,
            component: &event.node.component,
            phase: "start",
            status: "ok",
            timestamp,
            inputs,
            outputs: Value::Null,
            error: Value::Null,
            redactions,
        });
        // Keep the start offset so a later failure can reference the span.
        let (start_offset, _) = self.transcript.lock().write(&event_json)?;

        let mut state = self.state.lock();
        let node_key = event.node_id.to_string();
        // Preserve first-seen ordering; re-entries do not duplicate.
        if !state.order.iter().any(|id| id == &node_key) {
            state.order.push(node_key.clone());
        }
        let entry = state.nodes.entry(node_key).or_insert_with(|| {
            NodeExecutionRecord::new(event.node.component.clone(), &self.directories)
        });
        entry.start_instant = Some(Instant::now());
        // Optimistically Ok; overwritten by handle_node_error on failure.
        entry.status = NodeStatus::Ok;
        entry.transcript_start = Some(start_offset);
        Ok(())
    }

    /// Record a node's "end" transcript event and close its duration.
    fn handle_node_end(&self, event: &NodeEvent<'_>, output: &Value) -> Result<()> {
        let timestamp = OffsetDateTime::now_utc();
        let (redacted_output, redactions) = redact_value(output, "$.outputs");
        let flow_id = self.current_flow_id();
        let event_json = build_transcript_event(TranscriptEventArgs {
            profile: &self.profile,
            flow_id: &flow_id,
            node_id: event.node_id,
            component: &event.node.component,
            phase: "end",
            status: "ok",
            timestamp,
            inputs: Value::Null,
            outputs: redacted_output,
            error: Value::Null,
            redactions,
        });
        self.transcript.lock().write(&event_json)?;

        // Duration derives from the Instant stashed at node start; take()
        // clears it so a duplicate "end" cannot double-report.
        let mut state = self.state.lock();
        if let Some(entry) = state.nodes.get_mut(event.node_id)
            && let Some(started) = entry.start_instant.take()
        {
            entry.duration_ms = Some(started.elapsed().as_millis() as u64);
        }
        Ok(())
    }

    /// Record a node failure: transcript event, state update, and a per-node
    /// stderr log file under the run's logs directory.
    fn handle_node_error(
        &self,
        event: &NodeEvent<'_>,
        error: &dyn std::error::Error,
    ) -> Result<()> {
        let timestamp = OffsetDateTime::now_utc();
        let error_message = error.to_string();
        let error_json = json!({
            "code": "component-failed",
            "message": error_message,
            "details": {
                "node": event.node_id,
            }
        });
        let flow_id = self.current_flow_id();
        let event_json = build_transcript_event(TranscriptEventArgs {
            profile: &self.profile,
            flow_id: &flow_id,
            node_id: event.node_id,
            component: &event.node.component,
            phase: "error",
            status: "error",
            timestamp,
            inputs: Value::Null,
            outputs: Value::Null,
            error: error_json.clone(),
            redactions: Vec::new(),
        });
        let (_, end_offset) = self.transcript.lock().write(&event_json)?;

        let mut state = self.state.lock();
        if let Some(entry) = state.nodes.get_mut(event.node_id) {
            entry.status = NodeStatus::Error;
            if let Some(started) = entry.start_instant.take() {
                entry.duration_ms = Some(started.elapsed().as_millis() as u64);
            }
            entry.transcript_error = Some(end_offset);
            entry.failure_code = Some("component-failed".to_string());
            entry.failure_message = Some(error_message.clone());
            entry.failure_details = Some(error_json);
            // Best-effort stderr log; creation failures are ignored but the
            // path is still advertised in the record.
            let log_path = self
                .directories
                .logs
                .join(format!("{}.stderr.log", sanitize_id(event.node_id)));
            if let Ok(mut file) = File::create(&log_path) {
                let _ = writeln!(file, "{error_message}");
            }
            entry.log_paths.push(log_path);
        }
        Ok(())
    }
}
904
905impl ExecutionObserver for RunRecorder {
906    fn on_node_start(&self, event: &NodeEvent<'_>) {
907        if let Err(err) = self.handle_node_start(event) {
908            warn!(node = event.node_id, error = %err, "failed to record node start");
909        }
910    }
911
912    fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value) {
913        if let Err(err) = self.handle_node_end(event, output) {
914            warn!(node = event.node_id, error = %err, "failed to record node end");
915        }
916    }
917
918    fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn std::error::Error) {
919        if let Err(err) = self.handle_node_error(event, error) {
920            warn!(node = event.node_id, error = %err, "failed to record node error");
921        }
922    }
923}
924
925impl MockEventSink for RunRecorder {
926    fn on_mock_event(&self, capability: &str, provider: &str, payload: &Value) {
927        if let Err(err) = self.write_mock_event(capability, provider, payload.clone()) {
928            warn!(?capability, ?provider, error = %err, "failed to record mock event");
929        }
930    }
931}
932
/// Mutable bookkeeping shared by the recorder (held behind a `Mutex`):
/// per-node execution records plus the order nodes were first observed in.
#[derive(Default)]
struct RunRecorderState {
    // Node id -> execution record. BTreeMap gives deterministic iteration.
    nodes: BTreeMap<String, NodeExecutionRecord>,
    // Node ids in first-seen order (the sorted map alone would lose it).
    order: Vec<String>,
}
938
/// Everything the recorder tracks about one node's execution: identity,
/// outcome, timing, transcript byte offsets, log files, and failure details.
#[derive(Clone)]
struct NodeExecutionRecord {
    // Component id the node resolves to (from the flow descriptor).
    component: String,
    // Current outcome; starts as `Ok` and flips to `Error` on failure.
    status: NodeStatus,
    // Wall-clock duration, filled in once the node ends or errors.
    duration_ms: Option<u64>,
    // Byte offset of the node's start event in the transcript file.
    transcript_start: Option<u64>,
    // Byte offset recorded when an error event is written for this node.
    transcript_error: Option<u64>,
    // Paths of log files written for this node (e.g. the stderr capture).
    log_paths: Vec<PathBuf>,
    // Machine-readable failure code (e.g. "component-failed").
    failure_code: Option<String>,
    // Human-readable failure message.
    failure_message: Option<String>,
    // Full structured error payload as written to the transcript.
    failure_details: Option<Value>,
    // Set when the node starts; consumed (`take`) to compute `duration_ms`.
    start_instant: Option<Instant>,
}
952
953impl NodeExecutionRecord {
954    fn new(component: String, _dirs: &RunDirectories) -> Self {
955        Self {
956            component,
957            status: NodeStatus::Ok,
958            duration_ms: None,
959            transcript_start: None,
960            transcript_error: None,
961            log_paths: Vec::new(),
962            failure_code: None,
963            failure_message: None,
964            failure_details: None,
965            start_instant: None,
966        }
967    }
968}
969
/// Appends JSONL transcript events to a file, tracking the byte offset of
/// each line and optionally notifying a hook after every write.
struct TranscriptWriter {
    // Buffered sink for the transcript file; flushed after every event.
    writer: BufWriter<File>,
    // Byte offset of the next write (i.e. current file length in bytes).
    offset: u64,
    // Optional observer invoked with each event after it is persisted.
    hook: Option<TranscriptHook>,
}
975
976impl TranscriptWriter {
977    fn new(writer: BufWriter<File>, hook: Option<TranscriptHook>) -> Self {
978        Self {
979            writer,
980            offset: 0,
981            hook,
982        }
983    }
984
985    fn write(&mut self, value: &Value) -> Result<(u64, u64)> {
986        let line = serde_json::to_vec(value)?;
987        let start = self.offset;
988        self.writer.write_all(&line)?;
989        self.writer.write_all(b"\n")?;
990        self.writer.flush()?;
991        self.offset += line.len() as u64 + 1;
992        if let Some(hook) = &self.hook {
993            hook(value);
994        }
995        Ok((start, self.offset))
996    }
997}
998
/// Borrowed inputs for `build_transcript_event`, grouped into a struct to
/// avoid a long positional parameter list at the call sites.
struct TranscriptEventArgs<'a> {
    // Run profile; only `session_id` is read when building the event.
    profile: &'a ResolvedProfile,
    flow_id: &'a str,
    node_id: &'a str,
    component: &'a str,
    // Lifecycle phase, e.g. "start", "end", "error".
    phase: &'a str,
    // Outcome label, e.g. "ok" or "error".
    status: &'a str,
    timestamp: OffsetDateTime,
    // Already-redacted payloads; `Value::Null` when not applicable.
    inputs: Value,
    outputs: Value,
    error: Value,
    // JSONPath-style locations that were redacted from inputs/outputs.
    redactions: Vec<String>,
}
1012
1013fn build_transcript_event(args: TranscriptEventArgs<'_>) -> Value {
1014    let ts = args
1015        .timestamp
1016        .format(&Rfc3339)
1017        .unwrap_or_else(|_| args.timestamp.to_string());
1018    json!({
1019        "ts": ts,
1020        "session_id": args.profile.session_id.as_str(),
1021        "flow_id": args.flow_id,
1022        "node_id": args.node_id,
1023        "component": args.component,
1024        "phase": args.phase,
1025        "status": args.status,
1026        "inputs": args.inputs,
1027        "outputs": args.outputs,
1028        "error": args.error,
1029        "metrics": {
1030            "duration_ms": null,
1031            "cpu_time_ms": null,
1032            "mem_peak_bytes": null,
1033        },
1034        "schema_id": Value::Null,
1035        "defaults_applied": Value::Array(Vec::new()),
1036        "redactions": args.redactions,
1037    })
1038}
1039
1040fn redact_value(value: &Value, base: &str) -> (Value, Vec<String>) {
1041    let mut paths = Vec::new();
1042    let redacted = redact_recursive(value, base, &mut paths);
1043    (redacted, paths)
1044}
1045
1046fn redact_recursive(value: &Value, path: &str, acc: &mut Vec<String>) -> Value {
1047    match value {
1048        Value::Object(map) => {
1049            let mut new_map = JsonMap::new();
1050            for (key, val) in map {
1051                let child_path = format!("{path}.{key}");
1052                if is_sensitive_key(key) {
1053                    acc.push(child_path);
1054                    new_map.insert(key.clone(), Value::String("__REDACTED__".into()));
1055                } else {
1056                    new_map.insert(key.clone(), redact_recursive(val, &child_path, acc));
1057                }
1058            }
1059            Value::Object(new_map)
1060        }
1061        Value::Array(items) => {
1062            let mut new_items = Vec::new();
1063            for (idx, item) in items.iter().enumerate() {
1064                let child_path = format!("{path}[{idx}]");
1065                new_items.push(redact_recursive(item, &child_path, acc));
1066            }
1067            Value::Array(new_items)
1068        }
1069        other => other.clone(),
1070    }
1071}
1072
/// Heuristic test for secret-bearing JSON keys: a case-insensitive substring
/// match against a fixed list of sensitive markers. Substring matching means
/// keys like `"api_token"` or `"Set-Cookie"` are caught too.
fn is_sensitive_key(key: &str) -> bool {
    let normalized = key.to_ascii_lowercase();
    ["secret", "token", "password", "authorization", "cookie"]
        .into_iter()
        .any(|marker| normalized.contains(marker))
}
1078
/// Maps `value` onto a filesystem-safe identifier by replacing every
/// character that is not ASCII-alphanumeric with `'-'`.
fn sanitize_id(value: &str) -> String {
    let mut safe = String::with_capacity(value.len());
    for ch in value.chars() {
        safe.push(if ch.is_ascii_alphanumeric() { ch } else { '-' });
    }
    safe
}