greentic_runner_desktop/
lib.rs

1use anyhow::{Context, Result, anyhow};
2use greentic_pack::reader::{PackLoad, open_pack};
3use greentic_runner_host::config::{
4    HostConfig, McpConfig, McpRetryConfig, RateLimits, SecretsPolicy, WebhookPolicy,
5};
6use greentic_runner_host::pack::{FlowDescriptor, PackMetadata, PackRuntime};
7use greentic_runner_host::runner::engine::{ExecutionObserver, FlowContext, FlowEngine, NodeEvent};
8pub use greentic_runner_host::runner::mocks::{
9    HttpMock, HttpMockMode, KvMock, MocksConfig, SecretsMock, TelemetryMock, TimeMock, ToolsMock,
10};
11use greentic_runner_host::runner::mocks::{MockEventSink, MockLayer};
12use parking_lot::Mutex;
13use serde::{Deserialize, Serialize};
14use serde_json::{Map as JsonMap, Value, json};
15use serde_yaml_bw::{Mapping as YamlMapping, Value as YamlValue};
16use std::collections::{BTreeMap, HashMap};
17use std::fmt;
18use std::fs::{self, File};
19use std::io::{BufWriter, Write};
20use std::path::{Path, PathBuf};
21use std::sync::Arc;
22use std::time::Instant;
23use time::OffsetDateTime;
24use time::format_description::well_known::Rfc3339;
25use tokio::runtime::Runtime;
26use tracing::{info, warn};
27use uuid::Uuid;
28use zip::ZipArchive;
29
/// Provider identifier stamped on every run resolved under the dev profile
/// (see `resolve_profile`).
const PROVIDER_ID_DEV: &str = "greentic-dev";

/// Hook invoked for each transcript event.
pub type TranscriptHook = Arc<dyn Fn(&Value) + Send + Sync>;
34
/// Configuration for emitting OTLP telemetry.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct OtlpHook {
    /// OTLP collector endpoint (sourced from `OTLP_ENDPOINT` by default).
    pub endpoint: String,
    /// Header name/value pairs for the exporter.
    /// NOTE(review): in this file only the count is logged (`apply_otlp_hook`);
    /// confirm where these are actually applied.
    #[serde(default)]
    pub headers: Vec<(String, String)>,
    /// Request that every span is sampled.
    #[serde(default)]
    pub sample_all: bool,
}
44
/// Execution profile.
///
/// Marked `#[non_exhaustive]` so additional profiles can be added later
/// without breaking downstream matches.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum Profile {
    /// Local developer profile.
    Dev(DevProfile),
}
51
52impl Default for Profile {
53    fn default() -> Self {
54        Self::Dev(DevProfile::default())
55    }
56}
57
/// Tunables for the developer profile.
#[derive(Clone, Debug)]
pub struct DevProfile {
    /// Tenant id used when the caller's `TenantContext` supplies none.
    pub tenant_id: String,
    /// Team id used when the caller's `TenantContext` supplies none.
    pub team_id: String,
    /// User id used when the caller's `TenantContext` supplies none.
    pub user_id: String,
    /// Per-node wall-clock budget in milliseconds (feeds MCP `timeout_ms`
    /// in `build_host_config`).
    pub max_node_wall_time_ms: u64,
    /// Whole-run wall-clock budget in milliseconds.
    /// NOTE(review): carried into `ResolvedProfile` but not enforced in this
    /// file — confirm enforcement happens in the host/engine.
    pub max_run_wall_time_ms: u64,
}
67
68impl Default for DevProfile {
69    fn default() -> Self {
70        Self {
71            tenant_id: "local-dev".to_string(),
72            team_id: "default".to_string(),
73            user_id: "developer".to_string(),
74            max_node_wall_time_ms: 30_000,
75            max_run_wall_time_ms: 600_000,
76        }
77    }
78}
79
/// Tenant context overrides supplied by callers.
///
/// Any `None` field falls back to the profile's default in `resolve_profile`.
#[derive(Clone, Debug, Default)]
pub struct TenantContext {
    pub tenant_id: Option<String>,
    pub team_id: Option<String>,
    pub user_id: Option<String>,
    /// Session id; a fresh UUID is generated when absent.
    pub session_id: Option<String>,
}
88
89impl TenantContext {
90    pub fn default_local() -> Self {
91        Self {
92            tenant_id: Some("local-dev".into()),
93            team_id: Some("default".into()),
94            user_id: Some("developer".into()),
95            session_id: None,
96        }
97    }
98}
99
/// User supplied options for a run.
#[derive(Clone)]
pub struct RunOptions {
    /// Execution profile the tenant identities and limits come from.
    pub profile: Profile,
    /// Flow to start; `None` falls back to the pack's declared entry flows
    /// (see `resolve_entry_flow`).
    pub entry_flow: Option<String>,
    /// Initial payload handed to the flow engine.
    pub input: Value,
    /// Tenant/team/user/session overrides applied on top of the profile.
    pub ctx: TenantContext,
    /// Optional callback invoked for every transcript event.
    pub transcript: Option<TranscriptHook>,
    /// Optional OTLP telemetry request (see `apply_otlp_hook`).
    pub otlp: Option<OtlpHook>,
    /// Mock layer configuration.
    pub mocks: MocksConfig,
    /// Run artifact directory override; `None` uses `.greentic/runs/<ts>_<id>`.
    pub artifacts_dir: Option<PathBuf>,
    /// Pack signature verification policy.
    pub signing: SigningPolicy,
}
113
impl Default for RunOptions {
    /// Defaults mirror [`desktop_defaults`].
    fn default() -> Self {
        desktop_defaults()
    }
}
119
120impl fmt::Debug for RunOptions {
121    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
122        f.debug_struct("RunOptions")
123            .field("profile", &self.profile)
124            .field("entry_flow", &self.entry_flow)
125            .field("input", &self.input)
126            .field("ctx", &self.ctx)
127            .field("transcript", &self.transcript.is_some())
128            .field("otlp", &self.otlp)
129            .field("mocks", &self.mocks)
130            .field("artifacts_dir", &self.artifacts_dir)
131            .field("signing", &self.signing)
132            .finish()
133    }
134}
135
/// Pack signature verification policy.
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum SigningPolicy {
    /// Abort the run on any pack verification error.
    Strict,
    /// Tolerate signature errors; other verification failures still abort
    /// (see the `open_pack` handling in `run_pack_async`).
    DevOk,
}
141
/// Convenience builder that retains a baseline `RunOptions`.
#[derive(Clone, Debug)]
pub struct Runner {
    /// Baseline options cloned (and optionally tweaked) for each run.
    base: RunOptions,
}
147
impl Runner {
    /// Create a runner whose baseline options are [`desktop_defaults`].
    pub fn new() -> Self {
        Self {
            base: desktop_defaults(),
        }
    }

    /// Replace the execution profile on the baseline options.
    pub fn profile(mut self, profile: Profile) -> Self {
        self.base.profile = profile;
        self
    }

    /// Replace the mock configuration on the baseline options.
    pub fn with_mocks(mut self, mocks: MocksConfig) -> Self {
        self.base.mocks = mocks;
        self
    }

    /// Apply an arbitrary mutation to the baseline options.
    pub fn configure(mut self, f: impl FnOnce(&mut RunOptions)) -> Self {
        f(&mut self.base);
        self
    }

    /// Run a pack using the baseline options as-is.
    pub fn run_pack<P: AsRef<Path>>(&self, pack_path: P) -> Result<RunResult> {
        run_pack_with_options(pack_path, self.base.clone())
    }

    /// Run a pack with per-call tweaks applied to a clone of the baseline;
    /// the baseline itself is left untouched.
    pub fn run_pack_with<P: AsRef<Path>>(
        &self,
        pack_path: P,
        f: impl FnOnce(&mut RunOptions),
    ) -> Result<RunResult> {
        let mut opts = self.base.clone();
        f(&mut opts);
        run_pack_with_options(pack_path, opts)
    }
}
184
185impl Default for Runner {
186    fn default() -> Self {
187        Self::new()
188    }
189}
190
/// Execute a pack with the provided options.
///
/// Spins up a fresh Tokio runtime per invocation and blocks the calling
/// thread until the run completes.
pub fn run_pack_with_options<P: AsRef<Path>>(pack_path: P, opts: RunOptions) -> Result<RunResult> {
    let runtime = Runtime::new().context("failed to create tokio runtime")?;
    runtime.block_on(run_pack_async(pack_path.as_ref(), opts))
}
196
197/// Reasonable defaults for local desktop invocations.
198pub fn desktop_defaults() -> RunOptions {
199    let otlp = std::env::var("OTLP_ENDPOINT")
200        .ok()
201        .map(|endpoint| OtlpHook {
202            endpoint,
203            headers: Vec::new(),
204            sample_all: true,
205        });
206    RunOptions {
207        profile: Profile::Dev(DevProfile::default()),
208        entry_flow: None,
209        input: json!({}),
210        ctx: TenantContext::default_local(),
211        transcript: None,
212        otlp,
213        mocks: MocksConfig::default(),
214        artifacts_dir: None,
215        signing: SigningPolicy::DevOk,
216    }
217}
218
/// Orchestrate one full pack run: verify, extract, execute, and summarize.
///
/// Sequence matters here: the recorder and mock layer must exist before
/// verification so early events land in the transcript, and pack metadata is
/// refined twice (manifest, then loaded runtime) as better data appears.
async fn run_pack_async(pack_path: &Path, opts: RunOptions) -> Result<RunResult> {
    let resolved_profile = resolve_profile(&opts.profile, &opts.ctx);
    if let Some(otlp) = &opts.otlp {
        apply_otlp_hook(otlp);
    }

    // Run directory + mock layer + recorder come first so everything after
    // (including verification failures) can be recorded.
    let directories = prepare_run_dirs(opts.artifacts_dir.clone())?;
    info!(run_dir = %directories.root.display(), "prepared desktop run directory");

    let mock_layer = Arc::new(MockLayer::new(opts.mocks.clone(), &directories.root)?);

    let recorder = Arc::new(RunRecorder::new(
        directories.clone(),
        &resolved_profile,
        None,
        PackMetadata::fallback(pack_path),
        opts.transcript.clone(),
    )?);

    let mock_sink: Arc<dyn MockEventSink> = recorder.clone();
    mock_layer.register_sink(mock_sink);

    // Verify the pack. Under the DevOk policy a pure *signature* error is
    // logged and tolerated (pack_load stays None); anything else aborts.
    let mut pack_load: Option<PackLoad> = None;
    match open_pack(pack_path, to_reader_policy(opts.signing)) {
        Ok(load) => {
            recorder.update_pack_metadata(PackMetadata::from_manifest(&load.manifest));
            pack_load = Some(load);
        }
        Err(err) => {
            recorder.record_verify_event("error", &err.message)?;
            if opts.signing == SigningPolicy::DevOk && is_signature_error(&err.message) {
                warn!(error = %err.message, "continuing despite signature error (dev policy)");
            } else {
                return Err(anyhow!("pack verification failed: {}", err.message));
            }
        }
    }

    let host_config = Arc::new(build_host_config(&resolved_profile, &directories));
    let component_artifact =
        resolve_component_artifact(pack_path, pack_load.as_ref(), &directories)?;
    // Only .gtpack inputs double as an archive source for the runtime.
    let archive_source = if pack_path
        .extension()
        .and_then(|ext| ext.to_str())
        .map(|ext| ext.eq_ignore_ascii_case("gtpack"))
        .unwrap_or(false)
    {
        Some(pack_path)
    } else {
        None
    };

    let pack = Arc::new(
        PackRuntime::load(
            &component_artifact,
            Arc::clone(&host_config),
            Some(Arc::clone(&mock_layer)),
            archive_source,
        )
        .await
        .with_context(|| format!("failed to load pack {}", component_artifact.display()))?,
    );
    // Loaded runtime metadata supersedes the manifest/fallback metadata.
    recorder.update_pack_metadata(pack.metadata().clone());

    let flows = pack
        .list_flows()
        .await
        .context("failed to enumerate flows")?;
    let entry_flow_id = resolve_entry_flow(opts.entry_flow.clone(), pack.metadata(), &flows)?;
    recorder.set_flow_id(&entry_flow_id);

    let engine = FlowEngine::new(Arc::clone(&pack), Arc::clone(&host_config))
        .await
        .context("failed to prime flow engine")?;

    let started_at = OffsetDateTime::now_utc();
    // Owned copies outlive the borrowing FlowContext below.
    let tenant_str = host_config.tenant.clone();
    let session_id_owned = resolved_profile.session_id.clone();
    let provider_id_owned = resolved_profile.provider_id.clone();
    let recorder_ref: &RunRecorder = &recorder;
    let mock_ref: &MockLayer = &mock_layer;
    let ctx = FlowContext {
        tenant: &tenant_str,
        flow_id: &entry_flow_id,
        node_id: None,
        tool: None,
        action: Some("run_pack"),
        session_id: Some(session_id_owned.as_str()),
        provider_id: Some(provider_id_owned.as_str()),
        retry_config: host_config.mcp_retry_config().into(),
        observer: Some(recorder_ref),
        mocks: Some(mock_ref),
    };

    let execution = engine.execute(ctx, opts.input.clone()).await;
    let finished_at = OffsetDateTime::now_utc();

    // Engine errors are folded into the result rather than returned: the
    // recorder still produces a summary for failed runs.
    let status = match execution {
        Ok(_) => RunCompletion::Ok,
        Err(err) => RunCompletion::Err(err),
    };

    let result = recorder.finalise(status, started_at, finished_at)?;

    // Persist the machine-readable summary alongside the other artifacts.
    let run_json_path = directories.root.join("run.json");
    fs::write(&run_json_path, serde_json::to_vec_pretty(&result)?)
        .with_context(|| format!("failed to write run summary {}", run_json_path.display()))?;

    Ok(result)
}
329
/// Log the requested OTLP configuration.
///
/// NOTE(review): this only *announces* the hook — per the log message, actual
/// exporter wiring is expected to come from `OTEL_*` environment variables
/// set by the caller before invoking `run_pack`.
fn apply_otlp_hook(hook: &OtlpHook) {
    info!(
        endpoint = %hook.endpoint,
        sample_all = hook.sample_all,
        headers = %hook.headers.len(),
        "OTLP hook requested (set OTEL_* env vars before invoking run_pack)"
    );
}
338
339fn prepare_run_dirs(root_override: Option<PathBuf>) -> Result<RunDirectories> {
340    let root = if let Some(dir) = root_override {
341        dir
342    } else {
343        let timestamp = OffsetDateTime::now_utc()
344            .format(&Rfc3339)
345            .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
346            .replace(':', "-");
347        let short_id = Uuid::new_v4()
348            .to_string()
349            .chars()
350            .take(6)
351            .collect::<String>();
352        PathBuf::from(".greentic")
353            .join("runs")
354            .join(format!("{timestamp}_{short_id}"))
355    };
356
357    fs::create_dir_all(&root).with_context(|| format!("failed to create {}", root.display()))?;
358    let logs = root.join("logs");
359    let resolved = root.join("resolved_config");
360    fs::create_dir_all(&logs).with_context(|| format!("failed to create {}", logs.display()))?;
361    fs::create_dir_all(&resolved)
362        .with_context(|| format!("failed to create {}", resolved.display()))?;
363
364    Ok(RunDirectories {
365        root,
366        logs,
367        resolved,
368    })
369}
370
371fn resolve_entry_flow(
372    override_id: Option<String>,
373    metadata: &PackMetadata,
374    flows: &[FlowDescriptor],
375) -> Result<String> {
376    if let Some(flow) = override_id {
377        return Ok(flow);
378    }
379    if let Some(first) = metadata.entry_flows.first() {
380        return Ok(first.clone());
381    }
382    flows
383        .first()
384        .map(|f| f.id.clone())
385        .ok_or_else(|| anyhow!("pack does not declare any flows"))
386}
387
/// Heuristic: does the verifier error message mention a signature problem?
///
/// Scans for "signature" anywhere in the message, ignoring ASCII case,
/// without allocating a lowered copy of the whole string.
fn is_signature_error(message: &str) -> bool {
    const NEEDLE: &[u8] = b"signature";
    message
        .as_bytes()
        .windows(NEEDLE.len())
        .any(|window| window.eq_ignore_ascii_case(NEEDLE))
}
391
392fn to_reader_policy(policy: SigningPolicy) -> greentic_pack::reader::SigningPolicy {
393    match policy {
394        SigningPolicy::Strict => greentic_pack::reader::SigningPolicy::Strict,
395        SigningPolicy::DevOk => greentic_pack::reader::SigningPolicy::DevOk,
396    }
397}
398
/// Locate the wasm component to execute.
///
/// A bare `.wasm` path is used as-is; anything else is treated as a gtpack
/// (zip) archive and the component is extracted into the run directory as
/// `component.wasm`.
fn resolve_component_artifact(
    pack_path: &Path,
    load: Option<&PackLoad>,
    dirs: &RunDirectories,
) -> Result<PathBuf> {
    // Raw component file: nothing to extract.
    if pack_path
        .extension()
        .and_then(|ext| ext.to_str())
        .map(|ext| ext.eq_ignore_ascii_case("wasm"))
        .unwrap_or(false)
    {
        return Ok(pack_path.to_path_buf());
    }

    let entry = find_component_entry(pack_path, load)?;
    let file =
        File::open(pack_path).with_context(|| format!("failed to open {}", pack_path.display()))?;
    let mut archive = ZipArchive::new(file)
        .with_context(|| format!("{} is not a valid gtpack", pack_path.display()))?;
    let mut component = archive
        .by_name(&entry)
        .with_context(|| format!("component {entry} missing from pack"))?;
    // Extract next to the other run artifacts so the runtime can open it.
    let out_path = dirs.root.join("component.wasm");
    let mut out_file = File::create(&out_path)
        .with_context(|| format!("failed to create {}", out_path.display()))?;
    std::io::copy(&mut component, &mut out_file)
        .with_context(|| format!("failed to extract component {entry}"))?;
    Ok(out_path)
}
428
/// Name of the wasm entry inside the pack archive.
///
/// Prefers the first component declared in the verified manifest; when the
/// manifest is unavailable (e.g. a tolerated dev-policy signature failure),
/// falls back to scanning the zip for the first `*.wasm` entry.
fn find_component_entry(pack_path: &Path, load: Option<&PackLoad>) -> Result<String> {
    if let Some(load) = load
        && let Some(component) = load.manifest.components.first()
    {
        return Ok(component.file_wasm.clone());
    }

    let file =
        File::open(pack_path).with_context(|| format!("failed to open {}", pack_path.display()))?;
    let mut archive = ZipArchive::new(file)
        .with_context(|| format!("{} is not a valid gtpack", pack_path.display()))?;
    for index in 0..archive.len() {
        let entry = archive
            .by_index(index)
            .with_context(|| format!("failed to read entry #{index}"))?;
        if entry.name().ends_with(".wasm") {
            return Ok(entry.name().to_string());
        }
    }
    Err(anyhow!("pack does not contain a wasm component"))
}
450
451fn resolve_profile(profile: &Profile, ctx: &TenantContext) -> ResolvedProfile {
452    match profile {
453        Profile::Dev(dev) => ResolvedProfile {
454            tenant_id: ctx
455                .tenant_id
456                .clone()
457                .unwrap_or_else(|| dev.tenant_id.clone()),
458            team_id: ctx.team_id.clone().unwrap_or_else(|| dev.team_id.clone()),
459            user_id: ctx.user_id.clone().unwrap_or_else(|| dev.user_id.clone()),
460            session_id: ctx
461                .session_id
462                .clone()
463                .unwrap_or_else(|| Uuid::new_v4().to_string()),
464            provider_id: PROVIDER_ID_DEV.to_string(),
465            max_node_wall_time_ms: dev.max_node_wall_time_ms,
466            max_run_wall_time_ms: dev.max_run_wall_time_ms,
467        },
468    }
469}
470
/// Assemble the host configuration for a local, offline dev run.
///
/// Deliberately permissive: HTTP is disabled, tool signatures are not
/// required, and the secrets policy allows everything — suitable only for
/// the developer profile.
fn build_host_config(profile: &ResolvedProfile, dirs: &RunDirectories) -> HostConfig {
    // MCP tool store: a local directory of tools.
    let mut store = YamlMapping::new();
    store.insert(YamlValue::from("kind"), YamlValue::from("local-dir"));
    store.insert(
        YamlValue::from("path"),
        YamlValue::from("./.greentic/tools"),
    );
    // Per-tool runtime limits; timeout follows the profile's node budget.
    let mut runtime = YamlMapping::new();
    runtime.insert(YamlValue::from("max_memory_mb"), YamlValue::from(256u64));
    runtime.insert(
        YamlValue::from("timeout_ms"),
        YamlValue::from(profile.max_node_wall_time_ms),
    );
    runtime.insert(YamlValue::from("fuel"), YamlValue::from(50_000_000u64));
    let mut security = YamlMapping::new();
    security.insert(YamlValue::from("require_signature"), YamlValue::from(false));
    HostConfig {
        tenant: profile.tenant_id.clone(),
        bindings_path: dirs.resolved.join("dev.bindings.yaml"),
        flow_type_bindings: HashMap::new(),
        mcp: McpConfig {
            store: YamlValue::Mapping(store),
            security: YamlValue::Mapping(security),
            runtime: YamlValue::Mapping(runtime),
            http_enabled: Some(false),
            retry: Some(McpRetryConfig::default()),
        },
        rate_limits: RateLimits::default(),
        http_enabled: false,
        secrets_policy: SecretsPolicy::allow_all(),
        webhook_policy: WebhookPolicy::default(),
        timers: Vec::new(),
    }
}
505
/// Profile after applying `TenantContext` overrides (see `resolve_profile`);
/// feeds the host config and transcript events.
#[allow(dead_code)]
#[derive(Clone, Debug)]
struct ResolvedProfile {
    tenant_id: String,
    team_id: String,
    user_id: String,
    /// Always set: caller-supplied or a fresh UUID.
    session_id: String,
    /// Currently always `PROVIDER_ID_DEV`.
    provider_id: String,
    max_node_wall_time_ms: u64,
    max_run_wall_time_ms: u64,
}
517
/// Machine-readable summary of a completed run (also persisted as `run.json`).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RunResult {
    /// Session id the run executed under.
    pub session_id: String,
    /// Pack identifier from the best available metadata.
    pub pack_id: String,
    /// Pack version from the best available metadata.
    pub pack_version: String,
    /// Flow that was executed.
    pub flow_id: String,
    /// Run start time, RFC 3339.
    pub started_at_utc: String,
    /// Run end time, RFC 3339.
    pub finished_at_utc: String,
    /// Overall outcome; see [`RunStatus`].
    pub status: RunStatus,
    /// One entry per executed node, in execution order.
    pub node_summaries: Vec<NodeSummary>,
    /// Failure details keyed by node id.
    pub failures: BTreeMap<String, NodeFailure>,
    /// Root directory holding the transcript, logs, and extracted artifacts.
    pub artifacts_dir: PathBuf,
}
531
/// Overall run outcome.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum RunStatus {
    /// Engine succeeded and no node reported an error.
    Success,
    /// Engine succeeded but at least one node failed.
    PartialFailure,
    /// The engine itself returned an error.
    Failure,
}
538
/// Per-node execution summary.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NodeSummary {
    /// Node identifier within the flow.
    pub node_id: String,
    /// Component the node executed.
    pub component: String,
    /// Final node status.
    pub status: NodeStatus,
    /// Wall-clock duration; 0 when no duration was recorded.
    pub duration_ms: u64,
}
546
/// Final status of a single node.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum NodeStatus {
    Ok,
    /// NOTE(review): never assigned in this file — presumably set elsewhere
    /// or reserved for future use; confirm.
    Skipped,
    Error,
}
553
/// Details recorded for a failed node.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NodeFailure {
    /// Stable failure code (in this file always "component-failed").
    pub code: String,
    /// Human-readable error message.
    pub message: String,
    /// Structured error payload.
    pub details: Value,
    /// `(start, error)` byte offsets into `transcript.jsonl` for this node.
    pub transcript_offsets: (u64, u64),
    /// Log files captured for the failure (e.g. `<node>.stderr.log`).
    pub log_paths: Vec<PathBuf>,
}
562
/// Filesystem layout for a single run (see `prepare_run_dirs`).
#[derive(Clone)]
struct RunDirectories {
    /// Run root: holds `run.json`, `transcript.jsonl`, `component.wasm`.
    root: PathBuf,
    /// Per-node stderr log files.
    logs: PathBuf,
    /// Resolved configuration artifacts (e.g. the dev bindings path).
    resolved: PathBuf,
}
569
/// Engine-level outcome, before node-level failures are folded in
/// by `RunRecorder::finalise`.
enum RunCompletion {
    Ok,
    Err(anyhow::Error),
}
574
/// Collects transcript events and per-node state for one run.
///
/// Shared behind `Arc` with the engine and mock layer; all interior state is
/// guarded by `parking_lot::Mutex`.
struct RunRecorder {
    directories: RunDirectories,
    profile: ResolvedProfile,
    /// Current flow id; starts as "unknown" until `set_flow_id` is called.
    flow_id: Mutex<String>,
    /// Pack metadata; refined as better sources become available.
    pack_meta: Mutex<PackMetadata>,
    /// JSONL transcript writer for `transcript.jsonl`.
    transcript: Mutex<TranscriptWriter>,
    /// Per-node execution bookkeeping.
    state: Mutex<RunRecorderState>,
}
583
impl RunRecorder {
    /// Create the recorder, opening `transcript.jsonl` inside the run root.
    ///
    /// `flow_id` may be unknown at construction time; it is patched later via
    /// `set_flow_id`. Likewise `pack_meta` starts as a fallback and is
    /// refined by `update_pack_metadata`.
    fn new(
        dirs: RunDirectories,
        profile: &ResolvedProfile,
        flow_id: Option<String>,
        pack_meta: PackMetadata,
        hook: Option<TranscriptHook>,
    ) -> Result<Self> {
        let transcript_path = dirs.root.join("transcript.jsonl");
        let file = File::create(&transcript_path)
            .with_context(|| format!("failed to open {}", transcript_path.display()))?;
        Ok(Self {
            directories: dirs,
            profile: profile.clone(),
            flow_id: Mutex::new(flow_id.unwrap_or_else(|| "unknown".into())),
            pack_meta: Mutex::new(pack_meta),
            transcript: Mutex::new(TranscriptWriter::new(BufWriter::new(file), hook)),
            state: Mutex::new(RunRecorderState::default()),
        })
    }

    /// Fold the recorded node state into the final [`RunResult`].
    ///
    /// An engine-level error yields `Failure`; an engine success with at
    /// least one failed node is downgraded to `PartialFailure`.
    fn finalise(
        &self,
        completion: RunCompletion,
        started_at: OffsetDateTime,
        finished_at: OffsetDateTime,
    ) -> Result<RunResult> {
        let status = match &completion {
            RunCompletion::Ok => RunStatus::Success,
            RunCompletion::Err(_) => RunStatus::Failure,
        };

        if let RunCompletion::Err(err) = completion {
            warn!(error = %err, "pack execution failed");
        }

        let started = started_at
            .format(&Rfc3339)
            .unwrap_or_else(|_| started_at.to_string());
        let finished = finished_at
            .format(&Rfc3339)
            .unwrap_or_else(|_| finished_at.to_string());

        // Walk nodes in first-seen order so summaries are deterministic.
        let state = self.state.lock();
        let mut summaries = Vec::new();
        let mut failures = BTreeMap::new();
        for node_id in &state.order {
            if let Some(record) = state.nodes.get(node_id) {
                let duration = record.duration_ms.unwrap_or(0);
                summaries.push(NodeSummary {
                    node_id: node_id.clone(),
                    component: record.component.clone(),
                    status: record.status.clone(),
                    duration_ms: duration,
                });
                if record.status == NodeStatus::Error {
                    // Offsets point back into transcript.jsonl for debugging.
                    let start_offset = record.transcript_start.unwrap_or(0);
                    let err_offset = record.transcript_error.unwrap_or(start_offset);
                    failures.insert(
                        node_id.clone(),
                        NodeFailure {
                            code: record
                                .failure_code
                                .clone()
                                .unwrap_or_else(|| "component-failed".into()),
                            message: record
                                .failure_message
                                .clone()
                                .unwrap_or_else(|| "node failed".into()),
                            details: record
                                .failure_details
                                .clone()
                                .unwrap_or_else(|| json!({ "node": node_id })),
                            transcript_offsets: (start_offset, err_offset),
                            log_paths: record.log_paths.clone(),
                        },
                    );
                }
            }
        }

        // Engine success plus node failures is only a partial success.
        let mut final_status = status;
        if final_status == RunStatus::Success && !failures.is_empty() {
            final_status = RunStatus::PartialFailure;
        }

        let pack_meta = self.pack_meta.lock();
        let flow_id = self.flow_id.lock().clone();
        Ok(RunResult {
            session_id: self.profile.session_id.clone(),
            pack_id: pack_meta.pack_id.clone(),
            pack_version: pack_meta.version.clone(),
            flow_id,
            started_at_utc: started,
            finished_at_utc: finished,
            status: final_status,
            node_summaries: summaries,
            failures,
            artifacts_dir: self.directories.root.clone(),
        })
    }

    /// Record the flow actually being executed.
    fn set_flow_id(&self, flow_id: &str) {
        *self.flow_id.lock() = flow_id.to_string();
    }

    /// Replace the pack metadata with a more accurate version.
    fn update_pack_metadata(&self, meta: PackMetadata) {
        *self.pack_meta.lock() = meta;
    }

    /// Snapshot of the current flow id.
    fn current_flow_id(&self) -> String {
        self.flow_id.lock().clone()
    }

    /// Emit a transcript event describing the pack verification step.
    fn record_verify_event(&self, status: &str, message: &str) -> Result<()> {
        let timestamp = OffsetDateTime::now_utc();
        let event = json!({
            "ts": timestamp
                .format(&Rfc3339)
                .unwrap_or_else(|_| timestamp.to_string()),
            "session_id": self.profile.session_id,
            "flow_id": self.current_flow_id(),
            "node_id": Value::Null,
            "component": "verify.pack",
            "phase": "verify",
            "status": status,
            "inputs": Value::Null,
            "outputs": Value::Null,
            "error": json!({ "message": message }),
            "metrics": Value::Null,
            "schema_id": Value::Null,
            "defaults_applied": Value::Null,
            "redactions": Value::Array(Vec::new()),
        });
        self.transcript.lock().write(&event).map(|_| ())
    }

    /// Emit a transcript event for a mock-layer interaction.
    fn write_mock_event(&self, capability: &str, provider: &str, payload: Value) -> Result<()> {
        let timestamp = OffsetDateTime::now_utc();
        let event = json!({
            "ts": timestamp
                .format(&Rfc3339)
                .unwrap_or_else(|_| timestamp.to_string()),
            "session_id": self.profile.session_id,
            "flow_id": self.current_flow_id(),
            "node_id": Value::Null,
            "component": format!("mock::{capability}"),
            "phase": "mock",
            "status": "ok",
            "inputs": json!({ "capability": capability, "provider": provider }),
            "outputs": payload,
            "error": Value::Null,
            "metrics": Value::Null,
            "schema_id": Value::Null,
            "defaults_applied": Value::Null,
            "redactions": Value::Array(Vec::new()),
        });
        self.transcript.lock().write(&event).map(|_| ())
    }

    /// Record a node starting: writes the redacted "start" transcript event
    /// and initialises the node's bookkeeping record.
    fn handle_node_start(&self, event: &NodeEvent<'_>) -> Result<()> {
        let timestamp = OffsetDateTime::now_utc();
        let (redacted_payload, redactions) = redact_value(event.payload, "$.inputs.payload");
        let inputs = json!({
            "payload": redacted_payload,
            "context": {
                "tenant_id": self.profile.tenant_id.as_str(),
                "team_id": self.profile.team_id.as_str(),
                "user_id": self.profile.user_id.as_str(),
            }
        });
        let flow_id = self.current_flow_id();
        let event_json = build_transcript_event(TranscriptEventArgs {
            profile: &self.profile,
            flow_id: &flow_id,
            node_id: event.node_id,
            component: &event.node.component,
            phase: "start",
            status: "ok",
            timestamp,
            inputs,
            outputs: Value::Null,
            error: Value::Null,
            redactions,
        });
        let (start_offset, _) = self.transcript.lock().write(&event_json)?;

        let mut state = self.state.lock();
        let node_key = event.node_id.to_string();
        if !state.order.iter().any(|id| id == &node_key) {
            state.order.push(node_key.clone());
        }
        let entry = state.nodes.entry(node_key).or_insert_with(|| {
            NodeExecutionRecord::new(event.node.component.clone(), &self.directories)
        });
        // NOTE(review): a re-start resets status to Ok even if a previous
        // attempt errored — presumably intentional for retries; confirm.
        entry.start_instant = Some(Instant::now());
        entry.status = NodeStatus::Ok;
        entry.transcript_start = Some(start_offset);
        Ok(())
    }

    /// Record a node finishing successfully: writes the redacted "end"
    /// transcript event and captures the node's duration.
    fn handle_node_end(&self, event: &NodeEvent<'_>, output: &Value) -> Result<()> {
        let timestamp = OffsetDateTime::now_utc();
        let (redacted_output, redactions) = redact_value(output, "$.outputs");
        let flow_id = self.current_flow_id();
        let event_json = build_transcript_event(TranscriptEventArgs {
            profile: &self.profile,
            flow_id: &flow_id,
            node_id: event.node_id,
            component: &event.node.component,
            phase: "end",
            status: "ok",
            timestamp,
            inputs: Value::Null,
            outputs: redacted_output,
            error: Value::Null,
            redactions,
        });
        self.transcript.lock().write(&event_json)?;

        let mut state = self.state.lock();
        if let Some(entry) = state.nodes.get_mut(event.node_id)
            && let Some(started) = entry.start_instant.take()
        {
            entry.duration_ms = Some(started.elapsed().as_millis() as u64);
        }
        Ok(())
    }

    /// Record a node failure: writes the "error" transcript event, stores
    /// failure details, and captures the message in a per-node stderr log.
    fn handle_node_error(
        &self,
        event: &NodeEvent<'_>,
        error: &dyn std::error::Error,
    ) -> Result<()> {
        let timestamp = OffsetDateTime::now_utc();
        let error_message = error.to_string();
        let error_json = json!({
            "code": "component-failed",
            "message": error_message,
            "details": {
                "node": event.node_id,
            }
        });
        let flow_id = self.current_flow_id();
        let event_json = build_transcript_event(TranscriptEventArgs {
            profile: &self.profile,
            flow_id: &flow_id,
            node_id: event.node_id,
            component: &event.node.component,
            phase: "error",
            status: "error",
            timestamp,
            inputs: Value::Null,
            outputs: Value::Null,
            error: error_json.clone(),
            redactions: Vec::new(),
        });
        let (_, end_offset) = self.transcript.lock().write(&event_json)?;

        let mut state = self.state.lock();
        if let Some(entry) = state.nodes.get_mut(event.node_id) {
            entry.status = NodeStatus::Error;
            if let Some(started) = entry.start_instant.take() {
                entry.duration_ms = Some(started.elapsed().as_millis() as u64);
            }
            entry.transcript_error = Some(end_offset);
            entry.failure_code = Some("component-failed".to_string());
            entry.failure_message = Some(error_message.clone());
            entry.failure_details = Some(error_json);
            // Best-effort stderr capture: a log-file failure must not mask
            // the node error itself.
            let log_path = self
                .directories
                .logs
                .join(format!("{}.stderr.log", sanitize_id(event.node_id)));
            if let Ok(mut file) = File::create(&log_path) {
                let _ = writeln!(file, "{error_message}");
            }
            entry.log_paths.push(log_path);
        }
        Ok(())
    }
}
865
// Observer bridge: recording problems must never abort the run itself, so
// every recording error is logged and swallowed.
impl ExecutionObserver for RunRecorder {
    fn on_node_start(&self, event: &NodeEvent<'_>) {
        if let Err(err) = self.handle_node_start(event) {
            warn!(node = event.node_id, error = %err, "failed to record node start");
        }
    }

    fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value) {
        if let Err(err) = self.handle_node_end(event, output) {
            warn!(node = event.node_id, error = %err, "failed to record node end");
        }
    }

    fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn std::error::Error) {
        if let Err(err) = self.handle_node_error(event, error) {
            warn!(node = event.node_id, error = %err, "failed to record node error");
        }
    }
}
885
// Mock events flow into the same transcript; failures are logged, not raised.
impl MockEventSink for RunRecorder {
    fn on_mock_event(&self, capability: &str, provider: &str, payload: &Value) {
        if let Err(err) = self.write_mock_event(capability, provider, payload.clone()) {
            warn!(?capability, ?provider, error = %err, "failed to record mock event");
        }
    }
}
893
/// Mutable recorder state (held behind the recorder's mutex).
#[derive(Default)]
struct RunRecorderState {
    /// Per-node execution records, keyed by node id.
    nodes: BTreeMap<String, NodeExecutionRecord>,
    /// Node ids in the order they were first observed — presumably used to
    /// keep report output in execution order; confirm at the call sites.
    order: Vec<String>,
}
899
/// Everything recorded about a single node's execution within a run.
#[derive(Clone)]
struct NodeExecutionRecord {
    /// Component that implements the node.
    component: String,
    /// Outcome so far; starts as `Ok` and is set to `Error` on failure.
    status: NodeStatus,
    /// Wall-clock duration, computed from `start_instant` when the node finishes.
    duration_ms: Option<u64>,
    /// Transcript byte offset of the node's start event
    /// (NOTE(review): assigned outside this chunk — confirm).
    transcript_start: Option<u64>,
    /// Transcript byte offset recorded when the node errors.
    transcript_error: Option<u64>,
    /// Log files written for this node (e.g. the captured stderr log).
    log_paths: Vec<PathBuf>,
    /// Machine-readable failure code (e.g. "component-failed").
    failure_code: Option<String>,
    /// Human-readable failure message.
    failure_message: Option<String>,
    /// Structured failure payload, if any.
    failure_details: Option<Value>,
    /// Start timestamp taken when the node begins; consumed to fill `duration_ms`.
    start_instant: Option<Instant>,
}
913
914impl NodeExecutionRecord {
915    fn new(component: String, _dirs: &RunDirectories) -> Self {
916        Self {
917            component,
918            status: NodeStatus::Ok,
919            duration_ms: None,
920            transcript_start: None,
921            transcript_error: None,
922            log_paths: Vec::new(),
923            failure_code: None,
924            failure_message: None,
925            failure_details: None,
926            start_instant: None,
927        }
928    }
929}
930
/// Append-only JSONL writer for the run transcript.
struct TranscriptWriter {
    /// Buffered sink for the transcript file.
    writer: BufWriter<File>,
    /// Total bytes written so far; doubles as the start offset of the next event.
    offset: u64,
    /// Optional callback invoked with each event after it is persisted.
    hook: Option<TranscriptHook>,
}
936
937impl TranscriptWriter {
938    fn new(writer: BufWriter<File>, hook: Option<TranscriptHook>) -> Self {
939        Self {
940            writer,
941            offset: 0,
942            hook,
943        }
944    }
945
946    fn write(&mut self, value: &Value) -> Result<(u64, u64)> {
947        let line = serde_json::to_vec(value)?;
948        let start = self.offset;
949        self.writer.write_all(&line)?;
950        self.writer.write_all(b"\n")?;
951        self.writer.flush()?;
952        self.offset += line.len() as u64 + 1;
953        if let Some(hook) = &self.hook {
954            hook(value);
955        }
956        Ok((start, self.offset))
957    }
958}
959
/// Inputs for `build_transcript_event`: borrowed identifiers plus the JSON
/// payloads for one node lifecycle event.
struct TranscriptEventArgs<'a> {
    /// Resolved run profile; supplies the session id for the event.
    profile: &'a ResolvedProfile,
    flow_id: &'a str,
    node_id: &'a str,
    /// Component backing the node.
    component: &'a str,
    /// Lifecycle phase label — presumably start/end/error; confirm at call sites.
    phase: &'a str,
    status: &'a str,
    /// Event time; serialized as RFC 3339 when formatting succeeds.
    timestamp: OffsetDateTime,
    inputs: Value,
    outputs: Value,
    error: Value,
    /// JSON paths that were redacted from the payloads (see `redact_value`).
    redactions: Vec<String>,
}
973
974fn build_transcript_event(args: TranscriptEventArgs<'_>) -> Value {
975    let ts = args
976        .timestamp
977        .format(&Rfc3339)
978        .unwrap_or_else(|_| args.timestamp.to_string());
979    json!({
980        "ts": ts,
981        "session_id": args.profile.session_id.as_str(),
982        "flow_id": args.flow_id,
983        "node_id": args.node_id,
984        "component": args.component,
985        "phase": args.phase,
986        "status": args.status,
987        "inputs": args.inputs,
988        "outputs": args.outputs,
989        "error": args.error,
990        "metrics": {
991            "duration_ms": null,
992            "cpu_time_ms": null,
993            "mem_peak_bytes": null,
994        },
995        "schema_id": Value::Null,
996        "defaults_applied": Value::Array(Vec::new()),
997        "redactions": args.redactions,
998    })
999}
1000
1001fn redact_value(value: &Value, base: &str) -> (Value, Vec<String>) {
1002    let mut paths = Vec::new();
1003    let redacted = redact_recursive(value, base, &mut paths);
1004    (redacted, paths)
1005}
1006
1007fn redact_recursive(value: &Value, path: &str, acc: &mut Vec<String>) -> Value {
1008    match value {
1009        Value::Object(map) => {
1010            let mut new_map = JsonMap::new();
1011            for (key, val) in map {
1012                let child_path = format!("{path}.{key}");
1013                if is_sensitive_key(key) {
1014                    acc.push(child_path);
1015                    new_map.insert(key.clone(), Value::String("__REDACTED__".into()));
1016                } else {
1017                    new_map.insert(key.clone(), redact_recursive(val, &child_path, acc));
1018                }
1019            }
1020            Value::Object(new_map)
1021        }
1022        Value::Array(items) => {
1023            let mut new_items = Vec::new();
1024            for (idx, item) in items.iter().enumerate() {
1025                let child_path = format!("{path}[{idx}]");
1026                new_items.push(redact_recursive(item, &child_path, acc));
1027            }
1028            Value::Array(new_items)
1029        }
1030        other => other.clone(),
1031    }
1032}
1033
/// Case-insensitive substring test for keys whose values must be redacted.
fn is_sensitive_key(key: &str) -> bool {
    const MARKERS: [&str; 5] = ["secret", "token", "password", "authorization", "cookie"];
    let normalized = key.to_ascii_lowercase();
    MARKERS.into_iter().any(|marker| normalized.contains(marker))
}
1039
/// Maps an arbitrary identifier onto a filesystem-safe name: ASCII letters
/// and digits pass through, every other character becomes `-`.
fn sanitize_id(value: &str) -> String {
    let mut safe = String::with_capacity(value.len());
    for ch in value.chars() {
        safe.push(if ch.is_ascii_alphanumeric() { ch } else { '-' });
    }
    safe
}