greentic_runner_desktop/
lib.rs

use anyhow::{Context, Result, anyhow};
use greentic_pack::reader::{PackLoad, open_pack};
use greentic_runner_host::RunnerWasiPolicy;
use greentic_runner_host::config::{
    FlowRetryConfig, HostConfig, RateLimits, SecretsPolicy, WebhookPolicy,
};
use greentic_runner_host::pack::{FlowDescriptor, PackMetadata, PackRuntime};
use greentic_runner_host::runner::engine::{ExecutionObserver, FlowContext, FlowEngine, NodeEvent};
pub use greentic_runner_host::runner::mocks::{
    HttpMock, HttpMockMode, KvMock, MocksConfig, SecretsMock, TelemetryMock, TimeMock, ToolsMock,
};
use greentic_runner_host::runner::mocks::{MockEventSink, MockLayer};
use greentic_runner_host::secrets::default_manager;
use greentic_runner_host::storage::{new_session_store, new_state_store};
use parking_lot::Mutex;
use runner_core::normalize_under_root;
use serde::{Deserialize, Serialize};
use serde_json::{Map as JsonMap, Value, json};
use std::collections::{BTreeMap, HashMap};
use std::fmt;
use std::fs::{self, File};
use std::io::{BufWriter, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Instant;
use time::OffsetDateTime;
use time::format_description::well_known::Rfc3339;
use tokio::runtime::Runtime;
use tracing::{info, warn};
use uuid::Uuid;
use zip::ZipArchive;

const PROVIDER_ID_DEV: &str = "greentic-dev";

/// Hook invoked for each transcript event.
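///
/// A minimal sketch of a hook that pretty-prints every event; the closure body
/// is illustrative, not part of the runner's API:
///
/// ```ignore
/// use std::sync::Arc;
///
/// let hook: TranscriptHook = Arc::new(|event| {
///     // `event` is one transcript line as a `serde_json::Value`.
///     println!("{}", serde_json::to_string_pretty(event).unwrap_or_default());
/// });
/// ```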
pub type TranscriptHook = Arc<dyn Fn(&Value) + Send + Sync>;

/// Configuration for emitting OTLP telemetry.
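///
/// A hedged construction sketch; the endpoint and header values below are
/// illustrative only:
///
/// ```ignore
/// let otlp = OtlpHook {
///     endpoint: "http://localhost:4317".to_string(),
///     headers: vec![("x-api-key".to_string(), "dev".to_string())],
///     sample_all: true,
/// };
/// ```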
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct OtlpHook {
    pub endpoint: String,
    #[serde(default)]
    pub headers: Vec<(String, String)>,
    #[serde(default)]
    pub sample_all: bool,
}

/// Execution profile.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum Profile {
    Dev(DevProfile),
}

impl Default for Profile {
    fn default() -> Self {
        Self::Dev(DevProfile::default())
    }
}

/// Tunables for the developer profile.
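///
/// A hedged sketch of tightening the per-node budget while keeping the rest of
/// the defaults:
///
/// ```ignore
/// let profile = DevProfile {
///     max_node_wall_time_ms: 5_000,
///     ..DevProfile::default()
/// };
/// ```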
#[derive(Clone, Debug)]
pub struct DevProfile {
    pub tenant_id: String,
    pub team_id: String,
    pub user_id: String,
    pub max_node_wall_time_ms: u64,
    pub max_run_wall_time_ms: u64,
}

impl Default for DevProfile {
    fn default() -> Self {
        Self {
            tenant_id: "local-dev".to_string(),
            team_id: "default".to_string(),
            user_id: "developer".to_string(),
            max_node_wall_time_ms: 30_000,
            max_run_wall_time_ms: 600_000,
        }
    }
}

/// Tenant context overrides supplied by callers.
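///
/// Fields left as `None` fall back to the active [`Profile`]'s values. A
/// hedged sketch of overriding just the tenant:
///
/// ```ignore
/// let ctx = TenantContext {
///     tenant_id: Some("acme".into()),
///     ..TenantContext::default_local()
/// };
/// ```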
#[derive(Clone, Debug, Default)]
pub struct TenantContext {
    pub tenant_id: Option<String>,
    pub team_id: Option<String>,
    pub user_id: Option<String>,
    pub session_id: Option<String>,
}

impl TenantContext {
    pub fn default_local() -> Self {
        Self {
            tenant_id: Some("local-dev".into()),
            team_id: Some("default".into()),
            user_id: Some("developer".into()),
            session_id: None,
        }
    }
}

/// User-supplied options for a run.
#[derive(Clone)]
pub struct RunOptions {
    pub profile: Profile,
    pub entry_flow: Option<String>,
    pub input: Value,
    pub ctx: TenantContext,
    pub transcript: Option<TranscriptHook>,
    pub otlp: Option<OtlpHook>,
    pub mocks: MocksConfig,
    pub artifacts_dir: Option<PathBuf>,
    pub signing: SigningPolicy,
}

impl Default for RunOptions {
    fn default() -> Self {
        desktop_defaults()
    }
}

impl fmt::Debug for RunOptions {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RunOptions")
            .field("profile", &self.profile)
            .field("entry_flow", &self.entry_flow)
            .field("input", &self.input)
            .field("ctx", &self.ctx)
            .field("transcript", &self.transcript.is_some())
            .field("otlp", &self.otlp)
            .field("mocks", &self.mocks)
            .field("artifacts_dir", &self.artifacts_dir)
            .field("signing", &self.signing)
            .finish()
    }
}

/// How strictly pack signatures are verified before execution.
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum SigningPolicy {
    /// Fail the run if the pack signature cannot be verified.
    Strict,
    /// Warn on signature errors and keep running (development only).
    DevOk,
}

/// Convenience builder that retains a baseline `RunOptions`.
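///
/// A minimal usage sketch; the pack path, flow name, and input below are
/// hypothetical:
///
/// ```ignore
/// use greentic_runner_desktop::{Runner, RunStatus};
///
/// let result = Runner::new().run_pack_with("examples/hello.gtpack", |opts| {
///     opts.entry_flow = Some("main".to_string());
///     opts.input = serde_json::json!({ "name": "world" });
/// })?;
/// assert_eq!(result.status, RunStatus::Success);
/// ```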
#[derive(Clone, Debug)]
pub struct Runner {
    base: RunOptions,
}

impl Runner {
    pub fn new() -> Self {
        Self {
            base: desktop_defaults(),
        }
    }

    pub fn profile(mut self, profile: Profile) -> Self {
        self.base.profile = profile;
        self
    }

    pub fn with_mocks(mut self, mocks: MocksConfig) -> Self {
        self.base.mocks = mocks;
        self
    }

    pub fn configure(mut self, f: impl FnOnce(&mut RunOptions)) -> Self {
        f(&mut self.base);
        self
    }

    pub fn run_pack<P: AsRef<Path>>(&self, pack_path: P) -> Result<RunResult> {
        run_pack_with_options(pack_path, self.base.clone())
    }

    pub fn run_pack_with<P: AsRef<Path>>(
        &self,
        pack_path: P,
        f: impl FnOnce(&mut RunOptions),
    ) -> Result<RunResult> {
        let mut opts = self.base.clone();
        f(&mut opts);
        run_pack_with_options(pack_path, opts)
    }
}

impl Default for Runner {
    fn default() -> Self {
        Self::new()
    }
}

/// Execute a pack with the provided options.
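///
/// This blocks on a freshly created Tokio runtime, so call it from synchronous
/// code. A hedged sketch (the pack path is illustrative):
///
/// ```ignore
/// use greentic_runner_desktop::{desktop_defaults, run_pack_with_options, SigningPolicy};
///
/// let mut opts = desktop_defaults();
/// opts.signing = SigningPolicy::Strict;
/// let result = run_pack_with_options("dist/app.gtpack", opts)?;
/// println!("artifacts: {}", result.artifacts_dir.display());
/// ```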
pub fn run_pack_with_options<P: AsRef<Path>>(pack_path: P, opts: RunOptions) -> Result<RunResult> {
    let runtime = Runtime::new().context("failed to create tokio runtime")?;
    runtime.block_on(run_pack_async(pack_path.as_ref(), opts))
}

/// Reasonable defaults for local desktop invocations.
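///
/// If the `OTLP_ENDPOINT` environment variable is set, an [`OtlpHook`] is
/// pre-populated from it; all other fields use the dev-profile defaults. A
/// hedged sketch of adjusting one field afterwards:
///
/// ```ignore
/// let mut opts = desktop_defaults();
/// opts.artifacts_dir = Some(std::path::PathBuf::from("target/run-artifacts"));
/// ```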
pub fn desktop_defaults() -> RunOptions {
    let otlp = std::env::var("OTLP_ENDPOINT")
        .ok()
        .map(|endpoint| OtlpHook {
            endpoint,
            headers: Vec::new(),
            sample_all: true,
        });
    RunOptions {
        profile: Profile::Dev(DevProfile::default()),
        entry_flow: None,
        input: json!({}),
        ctx: TenantContext::default_local(),
        transcript: None,
        otlp,
        mocks: MocksConfig::default(),
        artifacts_dir: None,
        signing: SigningPolicy::DevOk,
    }
}

async fn run_pack_async(pack_path: &Path, opts: RunOptions) -> Result<RunResult> {
    let pack_path = normalize_pack_path(pack_path)?;
    let resolved_profile = resolve_profile(&opts.profile, &opts.ctx);
    if let Some(otlp) = &opts.otlp {
        apply_otlp_hook(otlp);
    }

    let directories = prepare_run_dirs(opts.artifacts_dir.clone())?;
    info!(run_dir = %directories.root.display(), "prepared desktop run directory");

    let mock_layer = Arc::new(MockLayer::new(opts.mocks.clone(), &directories.root)?);

    let recorder = Arc::new(RunRecorder::new(
        directories.clone(),
        &resolved_profile,
        None,
        PackMetadata::fallback(&pack_path),
        opts.transcript.clone(),
    )?);

    let mock_sink: Arc<dyn MockEventSink> = recorder.clone();
    mock_layer.register_sink(mock_sink);

    // Verify the pack; under the dev signing policy a signature failure is
    // downgraded to a warning so local iteration can continue.
    let mut pack_load: Option<PackLoad> = None;
    match open_pack(&pack_path, to_reader_policy(opts.signing)) {
        Ok(load) => {
            let meta = &load.manifest.meta;
            recorder.update_pack_metadata(PackMetadata {
                pack_id: meta.pack_id.clone(),
                version: meta.version.to_string(),
                entry_flows: meta.entry_flows.clone(),
                secret_requirements: Vec::new(),
            });
            pack_load = Some(load);
        }
        Err(err) => {
            recorder.record_verify_event("error", &err.message)?;
            if opts.signing == SigningPolicy::DevOk && is_signature_error(&err.message) {
                warn!(error = %err.message, "continuing despite signature error (dev policy)");
            } else {
                return Err(anyhow!("pack verification failed: {}", err.message));
            }
        }
    }

    let host_config = Arc::new(build_host_config(&resolved_profile, &directories));
    let component_artifact =
        resolve_component_artifact(&pack_path, pack_load.as_ref(), &directories)?;
    let archive_source = if pack_path
        .extension()
        .and_then(|ext| ext.to_str())
        .map(|ext| ext.eq_ignore_ascii_case("gtpack"))
        .unwrap_or(false)
    {
        Some(&pack_path)
    } else {
        None
    };

    let session_store = new_session_store();
    let state_store = new_state_store();
    let secrets_manager = default_manager();
    let pack = Arc::new(
        PackRuntime::load(
            &component_artifact,
            Arc::clone(&host_config),
            Some(Arc::clone(&mock_layer)),
            archive_source.map(|p| p as &Path),
            Some(Arc::clone(&session_store)),
            Some(Arc::clone(&state_store)),
            Arc::new(RunnerWasiPolicy::default()),
            secrets_manager,
            host_config.oauth_broker_config(),
            false,
        )
        .await
        .with_context(|| format!("failed to load pack {}", component_artifact.display()))?,
    );
    recorder.update_pack_metadata(pack.metadata().clone());

    let flows = pack
        .list_flows()
        .await
        .context("failed to enumerate flows")?;
    let entry_flow_id = resolve_entry_flow(opts.entry_flow.clone(), pack.metadata(), &flows)?;
    recorder.set_flow_id(&entry_flow_id);

    let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&host_config))
        .await
        .context("failed to prime flow engine")?;

    let started_at = OffsetDateTime::now_utc();
    let tenant_str = host_config.tenant.clone();
    let session_id_owned = resolved_profile.session_id.clone();
    let provider_id_owned = resolved_profile.provider_id.clone();
    let recorder_ref: &RunRecorder = &recorder;
    let mock_ref: &MockLayer = &mock_layer;
    let ctx = FlowContext {
        tenant: &tenant_str,
        flow_id: &entry_flow_id,
        node_id: None,
        tool: None,
        action: Some("run_pack"),
        session_id: Some(session_id_owned.as_str()),
        provider_id: Some(provider_id_owned.as_str()),
        retry_config: host_config.retry_config().into(),
        observer: Some(recorder_ref),
        mocks: Some(mock_ref),
    };

    let execution = engine.execute(ctx, opts.input.clone()).await;
    let finished_at = OffsetDateTime::now_utc();

    // The desktop runner does not resume paused flows, so a `Waiting` status is
    // treated as an error.
    let status = match execution {
        Ok(result) => match result.status {
            greentic_runner_host::runner::engine::FlowStatus::Completed => RunCompletion::Ok,
            greentic_runner_host::runner::engine::FlowStatus::Waiting(wait) => {
                let reason = wait
                    .reason
                    .unwrap_or_else(|| "flow paused unexpectedly".to_string());
                RunCompletion::Err(anyhow::anyhow!(reason))
            }
        },
        Err(err) => RunCompletion::Err(err),
    };

    let result = recorder.finalise(status, started_at, finished_at)?;

    let run_json_path = directories.root.join("run.json");
    fs::write(&run_json_path, serde_json::to_vec_pretty(&result)?)
        .with_context(|| format!("failed to write run summary {}", run_json_path.display()))?;

    Ok(result)
}

fn apply_otlp_hook(hook: &OtlpHook) {
    info!(
        endpoint = %hook.endpoint,
        sample_all = hook.sample_all,
        headers = %hook.headers.len(),
        "OTLP hook requested (set OTEL_* env vars before invoking run_pack)"
    );
}

fn normalize_pack_path(path: &Path) -> Result<PathBuf> {
    if path.is_absolute() {
        let parent = path
            .parent()
            .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
        let root = parent
            .canonicalize()
            .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
        let file = path
            .file_name()
            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
        return normalize_under_root(&root, Path::new(file));
    }

    let cwd = std::env::current_dir().context("failed to resolve current directory")?;
    let base = if let Some(parent) = path.parent() {
        cwd.join(parent)
    } else {
        cwd
    };
    let root = base
        .canonicalize()
        .with_context(|| format!("failed to canonicalize {}", base.display()))?;
    let file = path
        .file_name()
        .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
    normalize_under_root(&root, Path::new(file))
}

fn prepare_run_dirs(root_override: Option<PathBuf>) -> Result<RunDirectories> {
    let root = if let Some(dir) = root_override {
        dir
    } else {
        let timestamp = OffsetDateTime::now_utc()
            .format(&Rfc3339)
            .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
            .replace(':', "-");
        let short_id = Uuid::new_v4()
            .to_string()
            .chars()
            .take(6)
            .collect::<String>();
        PathBuf::from(".greentic")
            .join("runs")
            .join(format!("{timestamp}_{short_id}"))
    };

    fs::create_dir_all(&root).with_context(|| format!("failed to create {}", root.display()))?;
    let logs = root.join("logs");
    let resolved = root.join("resolved_config");
    fs::create_dir_all(&logs).with_context(|| format!("failed to create {}", logs.display()))?;
    fs::create_dir_all(&resolved)
        .with_context(|| format!("failed to create {}", resolved.display()))?;

    Ok(RunDirectories {
        root,
        logs,
        resolved,
    })
}

fn resolve_entry_flow(
    override_id: Option<String>,
    metadata: &PackMetadata,
    flows: &[FlowDescriptor],
) -> Result<String> {
    if let Some(flow) = override_id {
        return Ok(flow);
    }
    if let Some(first) = metadata.entry_flows.first() {
        return Ok(first.clone());
    }
    flows
        .first()
        .map(|f| f.id.clone())
        .ok_or_else(|| anyhow!("pack does not declare any flows"))
}

fn is_signature_error(message: &str) -> bool {
    message.to_ascii_lowercase().contains("signature")
}

fn to_reader_policy(policy: SigningPolicy) -> greentic_pack::reader::SigningPolicy {
    match policy {
        SigningPolicy::Strict => greentic_pack::reader::SigningPolicy::Strict,
        SigningPolicy::DevOk => greentic_pack::reader::SigningPolicy::DevOk,
    }
}

fn resolve_component_artifact(
    pack_path: &Path,
    load: Option<&PackLoad>,
    dirs: &RunDirectories,
) -> Result<PathBuf> {
    if pack_path
        .extension()
        .and_then(|ext| ext.to_str())
        .map(|ext| ext.eq_ignore_ascii_case("wasm"))
        .unwrap_or(false)
    {
        return Ok(pack_path.to_path_buf());
    }

    let entry = find_component_entry(pack_path, load)?;
    let file =
        File::open(pack_path).with_context(|| format!("failed to open {}", pack_path.display()))?;
    let mut archive = ZipArchive::new(file)
        .with_context(|| format!("{} is not a valid gtpack", pack_path.display()))?;
    let mut component = archive
        .by_name(&entry)
        .with_context(|| format!("component {entry} missing from pack"))?;
    let out_path = dirs.root.join("component.wasm");
    let mut out_file = File::create(&out_path)
        .with_context(|| format!("failed to create {}", out_path.display()))?;
    std::io::copy(&mut component, &mut out_file)
        .with_context(|| format!("failed to extract component {entry}"))?;
    Ok(out_path)
}

fn find_component_entry(pack_path: &Path, load: Option<&PackLoad>) -> Result<String> {
    if let Some(load) = load
        && let Some(component) = load.manifest.components.first()
    {
        return Ok(component.file_wasm.clone());
    }

    let file =
        File::open(pack_path).with_context(|| format!("failed to open {}", pack_path.display()))?;
    let mut archive = ZipArchive::new(file)
        .with_context(|| format!("{} is not a valid gtpack", pack_path.display()))?;
    for index in 0..archive.len() {
        let entry = archive
            .by_index(index)
            .with_context(|| format!("failed to read entry #{index}"))?;
        if entry.name().ends_with(".wasm") {
            return Ok(entry.name().to_string());
        }
    }
    Err(anyhow!("pack does not contain a wasm component"))
}

fn resolve_profile(profile: &Profile, ctx: &TenantContext) -> ResolvedProfile {
    match profile {
        Profile::Dev(dev) => ResolvedProfile {
            tenant_id: ctx
                .tenant_id
                .clone()
                .unwrap_or_else(|| dev.tenant_id.clone()),
            team_id: ctx.team_id.clone().unwrap_or_else(|| dev.team_id.clone()),
            user_id: ctx.user_id.clone().unwrap_or_else(|| dev.user_id.clone()),
            session_id: ctx
                .session_id
                .clone()
                .unwrap_or_else(|| Uuid::new_v4().to_string()),
            provider_id: PROVIDER_ID_DEV.to_string(),
            max_node_wall_time_ms: dev.max_node_wall_time_ms,
            max_run_wall_time_ms: dev.max_run_wall_time_ms,
        },
    }
}

fn build_host_config(profile: &ResolvedProfile, dirs: &RunDirectories) -> HostConfig {
    HostConfig {
        tenant: profile.tenant_id.clone(),
        bindings_path: dirs.resolved.join("dev.bindings.yaml"),
        flow_type_bindings: HashMap::new(),
        rate_limits: RateLimits::default(),
        retry: FlowRetryConfig::default(),
        http_enabled: false,
        secrets_policy: SecretsPolicy::allow_all(),
        webhook_policy: WebhookPolicy::default(),
        timers: Vec::new(),
        oauth: None,
        mocks: None,
    }
}

#[allow(dead_code)]
#[derive(Clone, Debug)]
struct ResolvedProfile {
    tenant_id: String,
    team_id: String,
    user_id: String,
    session_id: String,
    provider_id: String,
    max_node_wall_time_ms: u64,
    max_run_wall_time_ms: u64,
}

/// Summary of a single pack run, also persisted as `run.json` in the artifacts directory.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RunResult {
    pub session_id: String,
    pub pack_id: String,
    pub pack_version: String,
    pub flow_id: String,
    pub started_at_utc: String,
    pub finished_at_utc: String,
    pub status: RunStatus,
    pub error: Option<String>,
    pub node_summaries: Vec<NodeSummary>,
    pub failures: BTreeMap<String, NodeFailure>,
    pub artifacts_dir: PathBuf,
}

/// Overall outcome of a run.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum RunStatus {
    Success,
    PartialFailure,
    Failure,
}

/// Per-node execution summary.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NodeSummary {
    pub node_id: String,
    pub component: String,
    pub status: NodeStatus,
    pub duration_ms: u64,
}

/// Outcome of a single node.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum NodeStatus {
    Ok,
    Skipped,
    Error,
}

/// Details recorded for a failed node, keyed by node id in [`RunResult::failures`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NodeFailure {
    pub code: String,
    pub message: String,
    pub details: Value,
    pub transcript_offsets: (u64, u64),
    pub log_paths: Vec<PathBuf>,
}

#[derive(Clone)]
struct RunDirectories {
    root: PathBuf,
    logs: PathBuf,
    resolved: PathBuf,
}

enum RunCompletion {
    Ok,
    Err(anyhow::Error),
}

struct RunRecorder {
    directories: RunDirectories,
    profile: ResolvedProfile,
    flow_id: Mutex<String>,
    pack_meta: Mutex<PackMetadata>,
    transcript: Mutex<TranscriptWriter>,
    state: Mutex<RunRecorderState>,
}

impl RunRecorder {
    fn new(
        dirs: RunDirectories,
        profile: &ResolvedProfile,
        flow_id: Option<String>,
        pack_meta: PackMetadata,
        hook: Option<TranscriptHook>,
    ) -> Result<Self> {
        let transcript_path = dirs.root.join("transcript.jsonl");
        let file = File::create(&transcript_path)
            .with_context(|| format!("failed to create {}", transcript_path.display()))?;
        Ok(Self {
            directories: dirs,
            profile: profile.clone(),
            flow_id: Mutex::new(flow_id.unwrap_or_else(|| "unknown".into())),
            pack_meta: Mutex::new(pack_meta),
            transcript: Mutex::new(TranscriptWriter::new(BufWriter::new(file), hook)),
            state: Mutex::new(RunRecorderState::default()),
        })
    }

    fn finalise(
        &self,
        completion: RunCompletion,
        started_at: OffsetDateTime,
        finished_at: OffsetDateTime,
    ) -> Result<RunResult> {
        let status = match &completion {
            RunCompletion::Ok => RunStatus::Success,
            RunCompletion::Err(_) => RunStatus::Failure,
        };

        let mut runtime_error = None;
        if let RunCompletion::Err(err) = completion {
            warn!(error = %err, "pack execution failed");
            eprintln!("pack execution failed: {err}");
            runtime_error = Some(err.to_string());
        }

        let started = started_at
            .format(&Rfc3339)
            .unwrap_or_else(|_| started_at.to_string());
        let finished = finished_at
            .format(&Rfc3339)
            .unwrap_or_else(|_| finished_at.to_string());

        let state = self.state.lock();
        let mut summaries = Vec::new();
        let mut failures = BTreeMap::new();
        for node_id in &state.order {
            if let Some(record) = state.nodes.get(node_id) {
                let duration = record.duration_ms.unwrap_or(0);
                summaries.push(NodeSummary {
                    node_id: node_id.clone(),
                    component: record.component.clone(),
                    status: record.status.clone(),
                    duration_ms: duration,
                });
                if record.status == NodeStatus::Error {
                    let start_offset = record.transcript_start.unwrap_or(0);
                    let err_offset = record.transcript_error.unwrap_or(start_offset);
                    failures.insert(
                        node_id.clone(),
                        NodeFailure {
                            code: record
                                .failure_code
                                .clone()
                                .unwrap_or_else(|| "component-failed".into()),
                            message: record
                                .failure_message
                                .clone()
                                .unwrap_or_else(|| "node failed".into()),
                            details: record
                                .failure_details
                                .clone()
                                .unwrap_or_else(|| json!({ "node": node_id })),
                            transcript_offsets: (start_offset, err_offset),
                            log_paths: record.log_paths.clone(),
                        },
                    );
                }
            }
        }

        // Node-level failures downgrade an otherwise successful run.
        let mut final_status = status;
        if final_status == RunStatus::Success && !failures.is_empty() {
            final_status = RunStatus::PartialFailure;
        }

        // Engine-level failures are surfaced as a synthetic "_runtime" entry.
        if let Some(error) = &runtime_error {
            failures.insert(
                "_runtime".into(),
                NodeFailure {
                    code: "runtime-error".into(),
                    message: error.clone(),
                    details: json!({ "stage": "execute" }),
                    transcript_offsets: (0, 0),
                    log_paths: Vec::new(),
                },
            );
        }

        let pack_meta = self.pack_meta.lock();
        let flow_id = self.flow_id.lock().clone();
        Ok(RunResult {
            session_id: self.profile.session_id.clone(),
            pack_id: pack_meta.pack_id.clone(),
            pack_version: pack_meta.version.clone(),
            flow_id,
            started_at_utc: started,
            finished_at_utc: finished,
            status: final_status,
            error: runtime_error,
            node_summaries: summaries,
            failures,
            artifacts_dir: self.directories.root.clone(),
        })
    }

    fn set_flow_id(&self, flow_id: &str) {
        *self.flow_id.lock() = flow_id.to_string();
    }

    fn update_pack_metadata(&self, meta: PackMetadata) {
        *self.pack_meta.lock() = meta;
    }

    fn current_flow_id(&self) -> String {
        self.flow_id.lock().clone()
    }

    fn record_verify_event(&self, status: &str, message: &str) -> Result<()> {
        let timestamp = OffsetDateTime::now_utc();
        let event = json!({
            "ts": timestamp
                .format(&Rfc3339)
                .unwrap_or_else(|_| timestamp.to_string()),
            "session_id": self.profile.session_id,
            "flow_id": self.current_flow_id(),
            "node_id": Value::Null,
            "component": "verify.pack",
            "phase": "verify",
            "status": status,
            "inputs": Value::Null,
            "outputs": Value::Null,
            "error": json!({ "message": message }),
            "metrics": Value::Null,
            "schema_id": Value::Null,
            "defaults_applied": Value::Null,
            "redactions": Value::Array(Vec::new()),
        });
        self.transcript.lock().write(&event).map(|_| ())
    }

    fn write_mock_event(&self, capability: &str, provider: &str, payload: Value) -> Result<()> {
        let timestamp = OffsetDateTime::now_utc();
        let event = json!({
            "ts": timestamp
                .format(&Rfc3339)
                .unwrap_or_else(|_| timestamp.to_string()),
            "session_id": self.profile.session_id,
            "flow_id": self.current_flow_id(),
            "node_id": Value::Null,
            "component": format!("mock::{capability}"),
            "phase": "mock",
            "status": "ok",
            "inputs": json!({ "capability": capability, "provider": provider }),
            "outputs": payload,
            "error": Value::Null,
            "metrics": Value::Null,
            "schema_id": Value::Null,
            "defaults_applied": Value::Null,
            "redactions": Value::Array(Vec::new()),
        });
        self.transcript.lock().write(&event).map(|_| ())
    }

    fn handle_node_start(&self, event: &NodeEvent<'_>) -> Result<()> {
        let timestamp = OffsetDateTime::now_utc();
        let (redacted_payload, redactions) = redact_value(event.payload, "$.inputs.payload");
        let inputs = json!({
            "payload": redacted_payload,
            "context": {
                "tenant_id": self.profile.tenant_id.as_str(),
                "team_id": self.profile.team_id.as_str(),
                "user_id": self.profile.user_id.as_str(),
            }
        });
        let flow_id = self.current_flow_id();
        let event_json = build_transcript_event(TranscriptEventArgs {
            profile: &self.profile,
            flow_id: &flow_id,
            node_id: event.node_id,
            component: &event.node.component,
            phase: "start",
            status: "ok",
            timestamp,
            inputs,
            outputs: Value::Null,
            error: Value::Null,
            redactions,
        });
        let (start_offset, _) = self.transcript.lock().write(&event_json)?;

        let mut state = self.state.lock();
        let node_key = event.node_id.to_string();
        if !state.order.iter().any(|id| id == &node_key) {
            state.order.push(node_key.clone());
        }
        let entry = state.nodes.entry(node_key).or_insert_with(|| {
            NodeExecutionRecord::new(event.node.component.clone(), &self.directories)
        });
        entry.start_instant = Some(Instant::now());
        entry.status = NodeStatus::Ok;
        entry.transcript_start = Some(start_offset);
        Ok(())
    }

    fn handle_node_end(&self, event: &NodeEvent<'_>, output: &Value) -> Result<()> {
        let timestamp = OffsetDateTime::now_utc();
        let (redacted_output, redactions) = redact_value(output, "$.outputs");
        let flow_id = self.current_flow_id();
        let event_json = build_transcript_event(TranscriptEventArgs {
            profile: &self.profile,
            flow_id: &flow_id,
            node_id: event.node_id,
            component: &event.node.component,
            phase: "end",
            status: "ok",
            timestamp,
            inputs: Value::Null,
            outputs: redacted_output,
            error: Value::Null,
            redactions,
        });
        self.transcript.lock().write(&event_json)?;

        let mut state = self.state.lock();
        if let Some(entry) = state.nodes.get_mut(event.node_id)
            && let Some(started) = entry.start_instant.take()
        {
            entry.duration_ms = Some(started.elapsed().as_millis() as u64);
        }
        Ok(())
    }

    fn handle_node_error(
        &self,
        event: &NodeEvent<'_>,
        error: &dyn std::error::Error,
    ) -> Result<()> {
        let timestamp = OffsetDateTime::now_utc();
        let error_message = error.to_string();
        let error_json = json!({
            "code": "component-failed",
            "message": error_message,
            "details": {
                "node": event.node_id,
            }
        });
        let flow_id = self.current_flow_id();
        let event_json = build_transcript_event(TranscriptEventArgs {
            profile: &self.profile,
            flow_id: &flow_id,
            node_id: event.node_id,
            component: &event.node.component,
            phase: "error",
            status: "error",
            timestamp,
            inputs: Value::Null,
            outputs: Value::Null,
            error: error_json.clone(),
            redactions: Vec::new(),
        });
        let (_, end_offset) = self.transcript.lock().write(&event_json)?;

        let mut state = self.state.lock();
        if let Some(entry) = state.nodes.get_mut(event.node_id) {
            entry.status = NodeStatus::Error;
            if let Some(started) = entry.start_instant.take() {
                entry.duration_ms = Some(started.elapsed().as_millis() as u64);
            }
            entry.transcript_error = Some(end_offset);
            entry.failure_code = Some("component-failed".to_string());
            entry.failure_message = Some(error_message.clone());
            entry.failure_details = Some(error_json);
            let log_path = self
                .directories
                .logs
                .join(format!("{}.stderr.log", sanitize_id(event.node_id)));
            if let Ok(mut file) = File::create(&log_path) {
                let _ = writeln!(file, "{error_message}");
            }
            entry.log_paths.push(log_path);
        }
        Ok(())
    }
}

impl ExecutionObserver for RunRecorder {
    fn on_node_start(&self, event: &NodeEvent<'_>) {
        if let Err(err) = self.handle_node_start(event) {
            warn!(node = event.node_id, error = %err, "failed to record node start");
        }
    }

    fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value) {
        if let Err(err) = self.handle_node_end(event, output) {
            warn!(node = event.node_id, error = %err, "failed to record node end");
        }
    }

    fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn std::error::Error) {
        if let Err(err) = self.handle_node_error(event, error) {
            warn!(node = event.node_id, error = %err, "failed to record node error");
        }
    }
}

impl MockEventSink for RunRecorder {
    fn on_mock_event(&self, capability: &str, provider: &str, payload: &Value) {
        if let Err(err) = self.write_mock_event(capability, provider, payload.clone()) {
            warn!(?capability, ?provider, error = %err, "failed to record mock event");
        }
    }
}

#[derive(Default)]
struct RunRecorderState {
    nodes: BTreeMap<String, NodeExecutionRecord>,
    order: Vec<String>,
}

#[derive(Clone)]
struct NodeExecutionRecord {
    component: String,
    status: NodeStatus,
    duration_ms: Option<u64>,
    transcript_start: Option<u64>,
    transcript_error: Option<u64>,
    log_paths: Vec<PathBuf>,
    failure_code: Option<String>,
    failure_message: Option<String>,
    failure_details: Option<Value>,
    start_instant: Option<Instant>,
}

impl NodeExecutionRecord {
    fn new(component: String, _dirs: &RunDirectories) -> Self {
        Self {
            component,
            status: NodeStatus::Ok,
            duration_ms: None,
            transcript_start: None,
            transcript_error: None,
            log_paths: Vec::new(),
            failure_code: None,
            failure_message: None,
            failure_details: None,
            start_instant: None,
        }
    }
}

struct TranscriptWriter {
    writer: BufWriter<File>,
    offset: u64,
    hook: Option<TranscriptHook>,
}

impl TranscriptWriter {
    fn new(writer: BufWriter<File>, hook: Option<TranscriptHook>) -> Self {
        Self {
            writer,
            offset: 0,
            hook,
        }
    }

    /// Appends one JSONL event and returns its (start, end) byte offsets within the transcript file.
    fn write(&mut self, value: &Value) -> Result<(u64, u64)> {
        let line = serde_json::to_vec(value)?;
        let start = self.offset;
        self.writer.write_all(&line)?;
        self.writer.write_all(b"\n")?;
        self.writer.flush()?;
        self.offset += line.len() as u64 + 1;
        if let Some(hook) = &self.hook {
            hook(value);
        }
        Ok((start, self.offset))
    }
}

struct TranscriptEventArgs<'a> {
    profile: &'a ResolvedProfile,
    flow_id: &'a str,
    node_id: &'a str,
    component: &'a str,
    phase: &'a str,
    status: &'a str,
    timestamp: OffsetDateTime,
    inputs: Value,
    outputs: Value,
    error: Value,
    redactions: Vec<String>,
}

fn build_transcript_event(args: TranscriptEventArgs<'_>) -> Value {
    let ts = args
        .timestamp
        .format(&Rfc3339)
        .unwrap_or_else(|_| args.timestamp.to_string());
    json!({
        "ts": ts,
        "session_id": args.profile.session_id.as_str(),
        "flow_id": args.flow_id,
        "node_id": args.node_id,
        "component": args.component,
        "phase": args.phase,
        "status": args.status,
        "inputs": args.inputs,
        "outputs": args.outputs,
        "error": args.error,
        "metrics": {
            "duration_ms": null,
            "cpu_time_ms": null,
            "mem_peak_bytes": null,
        },
        "schema_id": Value::Null,
        "defaults_applied": Value::Array(Vec::new()),
        "redactions": args.redactions,
    })
}

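// Worked illustration (hypothetical data): redacting `{"payload": {"token": "abc"}}`
// under the base path "$.inputs" replaces the token with "__REDACTED__" and records
// the JSONPath "$.inputs.payload.token" in the returned path list.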
fn redact_value(value: &Value, base: &str) -> (Value, Vec<String>) {
    let mut paths = Vec::new();
    let redacted = redact_recursive(value, base, &mut paths);
    (redacted, paths)
}

fn redact_recursive(value: &Value, path: &str, acc: &mut Vec<String>) -> Value {
    match value {
        Value::Object(map) => {
            let mut new_map = JsonMap::new();
            for (key, val) in map {
                let child_path = format!("{path}.{key}");
                if is_sensitive_key(key) {
                    acc.push(child_path);
                    new_map.insert(key.clone(), Value::String("__REDACTED__".into()));
                } else {
                    new_map.insert(key.clone(), redact_recursive(val, &child_path, acc));
                }
            }
            Value::Object(new_map)
        }
        Value::Array(items) => {
            let mut new_items = Vec::new();
            for (idx, item) in items.iter().enumerate() {
                let child_path = format!("{path}[{idx}]");
                new_items.push(redact_recursive(item, &child_path, acc));
            }
            Value::Array(new_items)
        }
        other => other.clone(),
    }
}

fn is_sensitive_key(key: &str) -> bool {
    let lower = key.to_ascii_lowercase();
    const MARKERS: [&str; 5] = ["secret", "token", "password", "authorization", "cookie"];
    MARKERS.iter().any(|marker| lower.contains(marker))
}

fn sanitize_id(value: &str) -> String {
    value
        .chars()
        .map(|ch| if ch.is_ascii_alphanumeric() { ch } else { '-' })
        .collect()
}