1use anyhow::{Context, Result, anyhow};
2use greentic_pack::reader::{PackLoad, open_pack};
3use greentic_runner_host::RunnerWasiPolicy;
4use greentic_runner_host::config::{
5 FlowRetryConfig, HostConfig, RateLimits, SecretsPolicy, WebhookPolicy,
6};
7use greentic_runner_host::pack::{FlowDescriptor, PackMetadata, PackRuntime};
8use greentic_runner_host::runner::engine::{ExecutionObserver, FlowContext, FlowEngine, NodeEvent};
9pub use greentic_runner_host::runner::mocks::{
10 HttpMock, HttpMockMode, KvMock, MocksConfig, SecretsMock, TelemetryMock, TimeMock, ToolsMock,
11};
12use greentic_runner_host::runner::mocks::{MockEventSink, MockLayer};
13use greentic_runner_host::secrets::default_manager;
14use greentic_runner_host::storage::{new_session_store, new_state_store};
15use parking_lot::Mutex;
16use runner_core::normalize_under_root;
17use serde::{Deserialize, Serialize};
18use serde_json::{Map as JsonMap, Value, json};
19use std::collections::{BTreeMap, HashMap};
20use std::fmt;
21use std::fs::{self, File};
22use std::io::{BufWriter, Write};
23use std::path::{Path, PathBuf};
24use std::sync::Arc;
25use std::time::Instant;
26use time::OffsetDateTime;
27use time::format_description::well_known::Rfc3339;
28use tokio::runtime::Runtime;
29use tracing::{info, warn};
30use uuid::Uuid;
31use zip::ZipArchive;
32
/// Provider identifier stamped onto every run executed under the dev profile.
const PROVIDER_ID_DEV: &str = "greentic-dev";

/// Callback invoked with each transcript event (one JSON value per JSONL line)
/// right after it has been written and flushed.
pub type TranscriptHook = Arc<dyn Fn(&Value) + Send + Sync>;
37
/// Request to export telemetry via OTLP.
///
/// NOTE(review): currently only logged by `apply_otlp_hook`; callers must set
/// the `OTEL_*` environment variables themselves before running.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct OtlpHook {
    /// OTLP collector endpoint.
    pub endpoint: String,
    /// Extra headers intended for export requests.
    #[serde(default)]
    pub headers: Vec<(String, String)>,
    /// Request that all traces be sampled.
    #[serde(default)]
    pub sample_all: bool,
}
47
/// Execution profile selecting tenancy defaults and wall-time limits.
///
/// Marked `#[non_exhaustive]` so additional profiles can be introduced without
/// breaking downstream matches.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum Profile {
    /// Local-development profile with permissive defaults.
    Dev(DevProfile),
}

impl Default for Profile {
    fn default() -> Self {
        Self::Dev(DevProfile::default())
    }
}
60
/// Defaults used when running locally as a developer.
#[derive(Clone, Debug)]
pub struct DevProfile {
    /// Tenant used when the caller's `TenantContext` does not supply one.
    pub tenant_id: String,
    /// Team used when the context does not supply one.
    pub team_id: String,
    /// User used when the context does not supply one.
    pub user_id: String,
    /// Per-node wall-clock budget in milliseconds.
    pub max_node_wall_time_ms: u64,
    /// Whole-run wall-clock budget in milliseconds.
    pub max_run_wall_time_ms: u64,
}

impl Default for DevProfile {
    fn default() -> Self {
        Self {
            tenant_id: "local-dev".to_string(),
            team_id: "default".to_string(),
            user_id: "developer".to_string(),
            // 30 s per node, 10 min per run.
            max_node_wall_time_ms: 30_000,
            max_run_wall_time_ms: 600_000,
        }
    }
}
82
/// Caller-supplied tenancy overrides; any `None` falls back to the profile's
/// defaults (see `resolve_profile`).
#[derive(Clone, Debug, Default)]
pub struct TenantContext {
    pub tenant_id: Option<String>,
    pub team_id: Option<String>,
    pub user_id: Option<String>,
    /// When `None`, a fresh UUID session id is minted per run.
    pub session_id: Option<String>,
}

impl TenantContext {
    /// Context matching the `DevProfile` defaults, with no pinned session.
    pub fn default_local() -> Self {
        Self {
            tenant_id: Some("local-dev".into()),
            team_id: Some("default".into()),
            user_id: Some("developer".into()),
            session_id: None,
        }
    }
}
102
/// Everything that configures a single pack run.
#[derive(Clone)]
pub struct RunOptions {
    /// Execution profile (tenancy defaults, wall-time limits).
    pub profile: Profile,
    /// Flow to start with; `None` means use the pack's declared entry flow.
    pub entry_flow: Option<String>,
    /// JSON payload handed to the entry flow.
    pub input: Value,
    /// Tenancy overrides applied on top of the profile.
    pub ctx: TenantContext,
    /// Optional live tap on transcript events.
    pub transcript: Option<TranscriptHook>,
    /// Optional OTLP export request (currently informational only).
    pub otlp: Option<OtlpHook>,
    /// Mock-layer configuration for host capabilities.
    pub mocks: MocksConfig,
    /// Where to place run artifacts; `None` → `.greentic/runs/<ts>_<id>`.
    pub artifacts_dir: Option<PathBuf>,
    /// Pack signature verification policy.
    pub signing: SigningPolicy,
}

impl Default for RunOptions {
    fn default() -> Self {
        desktop_defaults()
    }
}
122
// Manual Debug: `transcript` is an opaque closure, so only its presence is
// printed. Field order mirrors the struct declaration.
impl fmt::Debug for RunOptions {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RunOptions")
            .field("profile", &self.profile)
            .field("entry_flow", &self.entry_flow)
            .field("input", &self.input)
            .field("ctx", &self.ctx)
            .field("transcript", &self.transcript.is_some())
            .field("otlp", &self.otlp)
            .field("mocks", &self.mocks)
            .field("artifacts_dir", &self.artifacts_dir)
            .field("signing", &self.signing)
            .finish()
    }
}
138
/// Pack signature verification policy.
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum SigningPolicy {
    /// Any verification failure aborts the run.
    Strict,
    /// Signature failures are tolerated (dev workflow); other errors still abort.
    DevOk,
}
144
/// Builder-style front end over `run_pack_with_options`; accumulates a base
/// `RunOptions` that individual runs can further tweak.
#[derive(Clone, Debug)]
pub struct Runner {
    // Base options cloned for every run.
    base: RunOptions,
}
150
151impl Runner {
152 pub fn new() -> Self {
153 Self {
154 base: desktop_defaults(),
155 }
156 }
157
158 pub fn profile(mut self, profile: Profile) -> Self {
159 self.base.profile = profile;
160 self
161 }
162
163 pub fn with_mocks(mut self, mocks: MocksConfig) -> Self {
164 self.base.mocks = mocks;
165 self
166 }
167
168 pub fn configure(mut self, f: impl FnOnce(&mut RunOptions)) -> Self {
169 f(&mut self.base);
170 self
171 }
172
173 pub fn run_pack<P: AsRef<Path>>(&self, pack_path: P) -> Result<RunResult> {
174 run_pack_with_options(pack_path, self.base.clone())
175 }
176
177 pub fn run_pack_with<P: AsRef<Path>>(
178 &self,
179 pack_path: P,
180 f: impl FnOnce(&mut RunOptions),
181 ) -> Result<RunResult> {
182 let mut opts = self.base.clone();
183 f(&mut opts);
184 run_pack_with_options(pack_path, opts)
185 }
186}
187
impl Default for Runner {
    // Delegates to `new` so `Runner::default()` and `Runner::new()` agree.
    fn default() -> Self {
        Self::new()
    }
}
193
/// Blocking entry point: spins up a tokio runtime and drives `run_pack_async`
/// to completion.
///
/// NOTE(review): a fresh runtime is created per call — fine for one-shot
/// desktop runs, but this must not be invoked from within an async context.
pub fn run_pack_with_options<P: AsRef<Path>>(pack_path: P, opts: RunOptions) -> Result<RunResult> {
    let runtime = Runtime::new().context("failed to create tokio runtime")?;
    runtime.block_on(run_pack_async(pack_path.as_ref(), opts))
}
199
200pub fn desktop_defaults() -> RunOptions {
202 let otlp = std::env::var("OTLP_ENDPOINT")
203 .ok()
204 .map(|endpoint| OtlpHook {
205 endpoint,
206 headers: Vec::new(),
207 sample_all: true,
208 });
209 RunOptions {
210 profile: Profile::Dev(DevProfile::default()),
211 entry_flow: None,
212 input: json!({}),
213 ctx: TenantContext::default_local(),
214 transcript: None,
215 otlp,
216 mocks: MocksConfig::default(),
217 artifacts_dir: None,
218 signing: SigningPolicy::DevOk,
219 }
220}
221
/// Core async driver for one pack run: verifies and loads the pack, wires the
/// transcript recorder and mock layer, executes the entry flow, and writes
/// `run.json` plus a JSONL transcript under the run directory.
async fn run_pack_async(pack_path: &Path, opts: RunOptions) -> Result<RunResult> {
    let pack_path = normalize_pack_path(pack_path)?;
    let resolved_profile = resolve_profile(&opts.profile, &opts.ctx);
    if let Some(otlp) = &opts.otlp {
        // Informational only — see `apply_otlp_hook`.
        apply_otlp_hook(otlp);
    }

    let directories = prepare_run_dirs(opts.artifacts_dir.clone())?;
    info!(run_dir = %directories.root.display(), "prepared desktop run directory");

    let mock_layer = Arc::new(MockLayer::new(opts.mocks.clone(), &directories.root)?);

    // Created with fallback metadata; replaced once the manifest (and later
    // the loaded runtime) supplies the authoritative pack id/version.
    let recorder = Arc::new(RunRecorder::new(
        directories.clone(),
        &resolved_profile,
        None,
        PackMetadata::fallback(&pack_path),
        opts.transcript.clone(),
    )?);

    // Mirror mock-layer events into the transcript via the recorder.
    let mock_sink: Arc<dyn MockEventSink> = recorder.clone();
    mock_layer.register_sink(mock_sink);

    let mut pack_load: Option<PackLoad> = None;
    match open_pack(&pack_path, to_reader_policy(opts.signing)) {
        Ok(load) => {
            let meta = &load.manifest.meta;
            recorder.update_pack_metadata(PackMetadata {
                pack_id: meta.pack_id.clone(),
                version: meta.version.to_string(),
                entry_flows: meta.entry_flows.clone(),
                secret_requirements: Vec::new(),
            });
            pack_load = Some(load);
        }
        Err(err) => {
            // Record the verification failure in the transcript regardless of
            // whether we continue.
            recorder.record_verify_event("error", &err.message)?;
            // Dev policy tolerates signature failures only; anything else
            // (or strict policy) aborts the run.
            if opts.signing == SigningPolicy::DevOk && is_signature_error(&err.message) {
                warn!(error = %err.message, "continuing despite signature error (dev policy)");
            } else {
                return Err(anyhow!("pack verification failed: {}", err.message));
            }
        }
    }

    let host_config = Arc::new(build_host_config(&resolved_profile, &directories));
    let component_artifact =
        resolve_component_artifact(&pack_path, pack_load.as_ref(), &directories)?;
    // Hand the archive to the runtime only when the input really is a .gtpack
    // (a bare .wasm has no archive to expose).
    let archive_source = if pack_path
        .extension()
        .and_then(|ext| ext.to_str())
        .map(|ext| ext.eq_ignore_ascii_case("gtpack"))
        .unwrap_or(false)
    {
        Some(&pack_path)
    } else {
        None
    };

    let session_store = new_session_store();
    let state_store = new_state_store();
    let secrets_manager = default_manager();
    let pack = Arc::new(
        PackRuntime::load(
            &component_artifact,
            Arc::clone(&host_config),
            Some(Arc::clone(&mock_layer)),
            archive_source.map(|p| p as &Path),
            Some(Arc::clone(&session_store)),
            Some(Arc::clone(&state_store)),
            Arc::new(RunnerWasiPolicy::default()),
            secrets_manager,
            host_config.oauth_broker_config(),
            false,
        )
        .await
        .with_context(|| format!("failed to load pack {}", component_artifact.display()))?,
    );
    // The loaded runtime carries authoritative metadata; refresh the recorder.
    recorder.update_pack_metadata(pack.metadata().clone());

    let flows = pack
        .list_flows()
        .await
        .context("failed to enumerate flows")?;
    let entry_flow_id = resolve_entry_flow(opts.entry_flow.clone(), pack.metadata(), &flows)?;
    recorder.set_flow_id(&entry_flow_id);

    let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&host_config))
        .await
        .context("failed to prime flow engine")?;

    let started_at = OffsetDateTime::now_utc();
    // Owned copies kept alive on this stack frame so the borrowed FlowContext
    // below stays valid for the duration of execution.
    let tenant_str = host_config.tenant.clone();
    let session_id_owned = resolved_profile.session_id.clone();
    let provider_id_owned = resolved_profile.provider_id.clone();
    let recorder_ref: &RunRecorder = &recorder;
    let mock_ref: &MockLayer = &mock_layer;
    let ctx = FlowContext {
        tenant: &tenant_str,
        flow_id: &entry_flow_id,
        node_id: None,
        tool: None,
        action: Some("run_pack"),
        session_id: Some(session_id_owned.as_str()),
        provider_id: Some(provider_id_owned.as_str()),
        retry_config: host_config.retry_config().into(),
        observer: Some(recorder_ref),
        mocks: Some(mock_ref),
    };

    let execution = engine.execute(ctx, opts.input.clone()).await;
    let finished_at = OffsetDateTime::now_utc();

    // A flow left in `Waiting` counts as a failure for a one-shot desktop run.
    let status = match execution {
        Ok(result) => match result.status {
            greentic_runner_host::runner::engine::FlowStatus::Completed => RunCompletion::Ok,
            greentic_runner_host::runner::engine::FlowStatus::Waiting(wait) => {
                let reason = wait
                    .reason
                    .unwrap_or_else(|| "flow paused unexpectedly".to_string());
                RunCompletion::Err(anyhow::anyhow!(reason))
            }
        },
        Err(err) => RunCompletion::Err(err),
    };

    let result = recorder.finalise(status, started_at, finished_at)?;

    // Persist the machine-readable summary next to the transcript.
    let run_json_path = directories.root.join("run.json");
    fs::write(&run_json_path, serde_json::to_vec_pretty(&result)?)
        .with_context(|| format!("failed to write run summary {}", run_json_path.display()))?;

    Ok(result)
}
356
/// Logs the requested OTLP configuration. This does NOT configure an exporter;
/// callers must set the `OTEL_*` environment variables before invoking
/// `run_pack`.
fn apply_otlp_hook(hook: &OtlpHook) {
    info!(
        endpoint = %hook.endpoint,
        sample_all = hook.sample_all,
        headers = %hook.headers.len(),
        "OTLP hook requested (set OTEL_* env vars before invoking run_pack)"
    );
}
365
/// Resolves `path` to an absolute, canonical location while ensuring the file
/// name cannot escape its parent directory (delegated to
/// `normalize_under_root`). The parent directory must already exist because
/// `canonicalize` touches the filesystem.
fn normalize_pack_path(path: &Path) -> Result<PathBuf> {
    if path.is_absolute() {
        // Canonicalize the parent, then re-anchor just the file name under it.
        let parent = path
            .parent()
            .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
        let root = parent
            .canonicalize()
            .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
        let file = path
            .file_name()
            .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
        return normalize_under_root(&root, Path::new(file));
    }

    // Relative path: anchor it at the current working directory. A bare file
    // name has parent Some("") which joins to the cwd itself.
    let cwd = std::env::current_dir().context("failed to resolve current directory")?;
    let base = if let Some(parent) = path.parent() {
        cwd.join(parent)
    } else {
        cwd
    };
    let root = base
        .canonicalize()
        .with_context(|| format!("failed to canonicalize {}", base.display()))?;
    let file = path
        .file_name()
        .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
    normalize_under_root(&root, Path::new(file))
}
394
395fn prepare_run_dirs(root_override: Option<PathBuf>) -> Result<RunDirectories> {
396 let root = if let Some(dir) = root_override {
397 dir
398 } else {
399 let timestamp = OffsetDateTime::now_utc()
400 .format(&Rfc3339)
401 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
402 .replace(':', "-");
403 let short_id = Uuid::new_v4()
404 .to_string()
405 .chars()
406 .take(6)
407 .collect::<String>();
408 PathBuf::from(".greentic")
409 .join("runs")
410 .join(format!("{timestamp}_{short_id}"))
411 };
412
413 fs::create_dir_all(&root).with_context(|| format!("failed to create {}", root.display()))?;
414 let logs = root.join("logs");
415 let resolved = root.join("resolved_config");
416 fs::create_dir_all(&logs).with_context(|| format!("failed to create {}", logs.display()))?;
417 fs::create_dir_all(&resolved)
418 .with_context(|| format!("failed to create {}", resolved.display()))?;
419
420 Ok(RunDirectories {
421 root,
422 logs,
423 resolved,
424 })
425}
426
427fn resolve_entry_flow(
428 override_id: Option<String>,
429 metadata: &PackMetadata,
430 flows: &[FlowDescriptor],
431) -> Result<String> {
432 if let Some(flow) = override_id {
433 return Ok(flow);
434 }
435 if let Some(first) = metadata.entry_flows.first() {
436 return Ok(first.clone());
437 }
438 flows
439 .first()
440 .map(|f| f.id.clone())
441 .ok_or_else(|| anyhow!("pack does not declare any flows"))
442}
443
/// Returns `true` when a verification failure message mentions a signature
/// problem (ASCII case-insensitive substring match).
fn is_signature_error(message: &str) -> bool {
    const NEEDLE: &[u8] = b"signature";
    message
        .as_bytes()
        .windows(NEEDLE.len())
        .any(|window| window.eq_ignore_ascii_case(NEEDLE))
}
447
/// Maps this crate's public `SigningPolicy` onto the pack reader's equivalent.
fn to_reader_policy(policy: SigningPolicy) -> greentic_pack::reader::SigningPolicy {
    match policy {
        SigningPolicy::Strict => greentic_pack::reader::SigningPolicy::Strict,
        SigningPolicy::DevOk => greentic_pack::reader::SigningPolicy::DevOk,
    }
}
454
/// Produces a filesystem path to the wasm component to execute.
///
/// A bare `.wasm` input is used directly; otherwise the component entry is
/// extracted from the gtpack zip into `<run dir>/component.wasm`.
fn resolve_component_artifact(
    pack_path: &Path,
    load: Option<&PackLoad>,
    dirs: &RunDirectories,
) -> Result<PathBuf> {
    // Already a raw component — nothing to extract.
    if pack_path
        .extension()
        .and_then(|ext| ext.to_str())
        .map(|ext| ext.eq_ignore_ascii_case("wasm"))
        .unwrap_or(false)
    {
        return Ok(pack_path.to_path_buf());
    }

    let entry = find_component_entry(pack_path, load)?;
    let file =
        File::open(pack_path).with_context(|| format!("failed to open {}", pack_path.display()))?;
    let mut archive = ZipArchive::new(file)
        .with_context(|| format!("{} is not a valid gtpack", pack_path.display()))?;
    let mut component = archive
        .by_name(&entry)
        .with_context(|| format!("component {entry} missing from pack"))?;
    // Extract into the run directory so the artifact survives with the run.
    let out_path = dirs.root.join("component.wasm");
    let mut out_file = File::create(&out_path)
        .with_context(|| format!("failed to create {}", out_path.display()))?;
    std::io::copy(&mut component, &mut out_file)
        .with_context(|| format!("failed to extract component {entry}"))?;
    Ok(out_path)
}
484
/// Determines which archive entry holds the wasm component: the manifest's
/// first declared component when a verified load is available, otherwise the
/// first `*.wasm` entry found by scanning the zip.
fn find_component_entry(pack_path: &Path, load: Option<&PackLoad>) -> Result<String> {
    if let Some(load) = load
        && let Some(component) = load.manifest.components.first()
    {
        return Ok(component.file_wasm.clone());
    }

    // No manifest (e.g. verification was skipped under dev policy): fall back
    // to scanning the archive.
    let file =
        File::open(pack_path).with_context(|| format!("failed to open {}", pack_path.display()))?;
    let mut archive = ZipArchive::new(file)
        .with_context(|| format!("{} is not a valid gtpack", pack_path.display()))?;
    for index in 0..archive.len() {
        let entry = archive
            .by_index(index)
            .with_context(|| format!("failed to read entry #{index}"))?;
        if entry.name().ends_with(".wasm") {
            return Ok(entry.name().to_string());
        }
    }
    Err(anyhow!("pack does not contain a wasm component"))
}
506
507fn resolve_profile(profile: &Profile, ctx: &TenantContext) -> ResolvedProfile {
508 match profile {
509 Profile::Dev(dev) => ResolvedProfile {
510 tenant_id: ctx
511 .tenant_id
512 .clone()
513 .unwrap_or_else(|| dev.tenant_id.clone()),
514 team_id: ctx.team_id.clone().unwrap_or_else(|| dev.team_id.clone()),
515 user_id: ctx.user_id.clone().unwrap_or_else(|| dev.user_id.clone()),
516 session_id: ctx
517 .session_id
518 .clone()
519 .unwrap_or_else(|| Uuid::new_v4().to_string()),
520 provider_id: PROVIDER_ID_DEV.to_string(),
521 max_node_wall_time_ms: dev.max_node_wall_time_ms,
522 max_run_wall_time_ms: dev.max_run_wall_time_ms,
523 },
524 }
525}
526
/// Builds a permissive, network-disabled host configuration for a local run:
/// HTTP off, all secrets allowed, default rate limits/retries, no OAuth.
fn build_host_config(profile: &ResolvedProfile, dirs: &RunDirectories) -> HostConfig {
    HostConfig {
        tenant: profile.tenant_id.clone(),
        // Bindings are materialised inside the run's resolved_config dir.
        bindings_path: dirs.resolved.join("dev.bindings.yaml"),
        flow_type_bindings: HashMap::new(),
        rate_limits: RateLimits::default(),
        retry: FlowRetryConfig::default(),
        http_enabled: false,
        secrets_policy: SecretsPolicy::allow_all(),
        webhook_policy: WebhookPolicy::default(),
        timers: Vec::new(),
        oauth: None,
        mocks: None,
    }
}
542
/// Fully-resolved run identity produced by `resolve_profile`.
// NOTE(review): some fields (wall-time limits) are currently unused, hence
// the allow(dead_code).
#[allow(dead_code)]
#[derive(Clone, Debug)]
struct ResolvedProfile {
    tenant_id: String,
    team_id: String,
    user_id: String,
    /// Always populated — minted if the caller did not supply one.
    session_id: String,
    provider_id: String,
    max_node_wall_time_ms: u64,
    max_run_wall_time_ms: u64,
}
554
/// Machine-readable summary of a completed run; also serialized to
/// `<run dir>/run.json`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RunResult {
    pub session_id: String,
    pub pack_id: String,
    pub pack_version: String,
    pub flow_id: String,
    /// RFC 3339 timestamps (best effort; fall back to Display formatting).
    pub started_at_utc: String,
    pub finished_at_utc: String,
    pub status: RunStatus,
    /// Runtime error message when the run as a whole failed.
    pub error: Option<String>,
    /// Per-node outcomes in execution order.
    pub node_summaries: Vec<NodeSummary>,
    /// Failed nodes keyed by node id (plus a synthetic `_runtime` entry).
    pub failures: BTreeMap<String, NodeFailure>,
    pub artifacts_dir: PathBuf,
}

/// Overall run outcome.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum RunStatus {
    Success,
    /// Run completed but at least one node reported an error.
    PartialFailure,
    Failure,
}
576
/// One node's outcome within a run.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NodeSummary {
    pub node_id: String,
    pub component: String,
    pub status: NodeStatus,
    /// Wall time between node start and end/error; 0 when unknown.
    pub duration_ms: u64,
}

/// Per-node execution status.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum NodeStatus {
    Ok,
    Skipped,
    Error,
}

/// Details of a node failure, with pointers back into the transcript and logs.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NodeFailure {
    pub code: String,
    pub message: String,
    pub details: Value,
    /// (start, error) byte offsets of the node's events in transcript.jsonl.
    pub transcript_offsets: (u64, u64),
    /// Log files captured for this node (e.g. stderr dumps).
    pub log_paths: Vec<PathBuf>,
}
600
/// Layout of a single run's artifact directory.
#[derive(Clone)]
struct RunDirectories {
    root: PathBuf,
    logs: PathBuf,
    resolved: PathBuf,
}

/// Terminal outcome of flow execution, before summarisation.
enum RunCompletion {
    Ok,
    Err(anyhow::Error),
}

/// Collects transcript events and per-node state during a run and produces
/// the final `RunResult`. Shared across threads via `Arc`; interior state is
/// guarded by `parking_lot::Mutex`es.
struct RunRecorder {
    directories: RunDirectories,
    profile: ResolvedProfile,
    // Mutable because the entry flow is only known after the pack loads.
    flow_id: Mutex<String>,
    // Mutable because metadata is refined as verification/loading progresses.
    pack_meta: Mutex<PackMetadata>,
    transcript: Mutex<TranscriptWriter>,
    state: Mutex<RunRecorderState>,
}
621
impl RunRecorder {
    /// Creates a recorder writing to `<run dir>/transcript.jsonl`.
    /// `flow_id` may be unknown at this point ("unknown" placeholder).
    fn new(
        dirs: RunDirectories,
        profile: &ResolvedProfile,
        flow_id: Option<String>,
        pack_meta: PackMetadata,
        hook: Option<TranscriptHook>,
    ) -> Result<Self> {
        let transcript_path = dirs.root.join("transcript.jsonl");
        let file = File::create(&transcript_path)
            .with_context(|| format!("failed to open {}", transcript_path.display()))?;
        Ok(Self {
            directories: dirs,
            profile: profile.clone(),
            flow_id: Mutex::new(flow_id.unwrap_or_else(|| "unknown".into())),
            pack_meta: Mutex::new(pack_meta),
            transcript: Mutex::new(TranscriptWriter::new(BufWriter::new(file), hook)),
            state: Mutex::new(RunRecorderState::default()),
        })
    }

    /// Folds the recorded per-node state plus the engine outcome into the
    /// final `RunResult`. A successful engine run with node errors is
    /// downgraded to `PartialFailure`; a runtime error adds a synthetic
    /// `_runtime` failure entry.
    fn finalise(
        &self,
        completion: RunCompletion,
        started_at: OffsetDateTime,
        finished_at: OffsetDateTime,
    ) -> Result<RunResult> {
        let status = match &completion {
            RunCompletion::Ok => RunStatus::Success,
            RunCompletion::Err(_) => RunStatus::Failure,
        };

        let mut runtime_error = None;
        if let RunCompletion::Err(err) = completion {
            // Surface the failure both via tracing and on stderr for CLI use.
            warn!(error = %err, "pack execution failed");
            eprintln!("pack execution failed: {err}");
            runtime_error = Some(err.to_string());
        }

        // Best-effort RFC 3339; fall back to Display on formatting failure.
        let started = started_at
            .format(&Rfc3339)
            .unwrap_or_else(|_| started_at.to_string());
        let finished = finished_at
            .format(&Rfc3339)
            .unwrap_or_else(|_| finished_at.to_string());

        let state = self.state.lock();
        let mut summaries = Vec::new();
        let mut failures = BTreeMap::new();
        // `order` preserves first-start order; `nodes` holds the details.
        for node_id in &state.order {
            if let Some(record) = state.nodes.get(node_id) {
                let duration = record.duration_ms.unwrap_or(0);
                summaries.push(NodeSummary {
                    node_id: node_id.clone(),
                    component: record.component.clone(),
                    status: record.status.clone(),
                    duration_ms: duration,
                });
                if record.status == NodeStatus::Error {
                    // Offsets point back into transcript.jsonl; the error
                    // offset defaults to the start offset when missing.
                    let start_offset = record.transcript_start.unwrap_or(0);
                    let err_offset = record.transcript_error.unwrap_or(start_offset);
                    failures.insert(
                        node_id.clone(),
                        NodeFailure {
                            code: record
                                .failure_code
                                .clone()
                                .unwrap_or_else(|| "component-failed".into()),
                            message: record
                                .failure_message
                                .clone()
                                .unwrap_or_else(|| "node failed".into()),
                            details: record
                                .failure_details
                                .clone()
                                .unwrap_or_else(|| json!({ "node": node_id })),
                            transcript_offsets: (start_offset, err_offset),
                            log_paths: record.log_paths.clone(),
                        },
                    );
                }
            }
        }

        // Node-level errors under a successful engine run → partial failure.
        let mut final_status = status;
        if final_status == RunStatus::Success && !failures.is_empty() {
            final_status = RunStatus::PartialFailure;
        }

        if let Some(error) = &runtime_error {
            failures.insert(
                "_runtime".into(),
                NodeFailure {
                    code: "runtime-error".into(),
                    message: error.clone(),
                    details: json!({ "stage": "execute" }),
                    transcript_offsets: (0, 0),
                    log_paths: Vec::new(),
                },
            );
        }

        let pack_meta = self.pack_meta.lock();
        let flow_id = self.flow_id.lock().clone();
        Ok(RunResult {
            session_id: self.profile.session_id.clone(),
            pack_id: pack_meta.pack_id.clone(),
            pack_version: pack_meta.version.clone(),
            flow_id,
            started_at_utc: started,
            finished_at_utc: finished,
            status: final_status,
            error: runtime_error,
            node_summaries: summaries,
            failures,
            artifacts_dir: self.directories.root.clone(),
        })
    }

    /// Records the entry flow once it is known.
    fn set_flow_id(&self, flow_id: &str) {
        *self.flow_id.lock() = flow_id.to_string();
    }

    /// Replaces the pack metadata (called as verification/loading refine it).
    fn update_pack_metadata(&self, meta: PackMetadata) {
        *self.pack_meta.lock() = meta;
    }

    /// Snapshot of the current flow id for event stamping.
    fn current_flow_id(&self) -> String {
        self.flow_id.lock().clone()
    }

    /// Appends a pack-verification event to the transcript.
    fn record_verify_event(&self, status: &str, message: &str) -> Result<()> {
        let timestamp = OffsetDateTime::now_utc();
        let event = json!({
            "ts": timestamp
                .format(&Rfc3339)
                .unwrap_or_else(|_| timestamp.to_string()),
            "session_id": self.profile.session_id,
            "flow_id": self.current_flow_id(),
            "node_id": Value::Null,
            "component": "verify.pack",
            "phase": "verify",
            "status": status,
            "inputs": Value::Null,
            "outputs": Value::Null,
            "error": json!({ "message": message }),
            "metrics": Value::Null,
            "schema_id": Value::Null,
            "defaults_applied": Value::Null,
            "redactions": Value::Array(Vec::new()),
        });
        self.transcript.lock().write(&event).map(|_| ())
    }

    /// Appends a mock-layer event (capability/provider plus payload) to the
    /// transcript.
    fn write_mock_event(&self, capability: &str, provider: &str, payload: Value) -> Result<()> {
        let timestamp = OffsetDateTime::now_utc();
        let event = json!({
            "ts": timestamp
                .format(&Rfc3339)
                .unwrap_or_else(|_| timestamp.to_string()),
            "session_id": self.profile.session_id,
            "flow_id": self.current_flow_id(),
            "node_id": Value::Null,
            "component": format!("mock::{capability}"),
            "phase": "mock",
            "status": "ok",
            "inputs": json!({ "capability": capability, "provider": provider }),
            "outputs": payload,
            "error": Value::Null,
            "metrics": Value::Null,
            "schema_id": Value::Null,
            "defaults_applied": Value::Null,
            "redactions": Value::Array(Vec::new()),
        });
        self.transcript.lock().write(&event).map(|_| ())
    }

    /// Records a node-start event: writes the (redacted) input payload to the
    /// transcript and opens a per-node execution record.
    fn handle_node_start(&self, event: &NodeEvent<'_>) -> Result<()> {
        let timestamp = OffsetDateTime::now_utc();
        // Sensitive keys in the payload are replaced before persisting.
        let (redacted_payload, redactions) = redact_value(event.payload, "$.inputs.payload");
        let inputs = json!({
            "payload": redacted_payload,
            "context": {
                "tenant_id": self.profile.tenant_id.as_str(),
                "team_id": self.profile.team_id.as_str(),
                "user_id": self.profile.user_id.as_str(),
            }
        });
        let flow_id = self.current_flow_id();
        let event_json = build_transcript_event(TranscriptEventArgs {
            profile: &self.profile,
            flow_id: &flow_id,
            node_id: event.node_id,
            component: &event.node.component,
            phase: "start",
            status: "ok",
            timestamp,
            inputs,
            outputs: Value::Null,
            error: Value::Null,
            redactions,
        });
        let (start_offset, _) = self.transcript.lock().write(&event_json)?;

        let mut state = self.state.lock();
        let node_key = event.node_id.to_string();
        // Preserve first-seen execution order without duplicates.
        if !state.order.iter().any(|id| id == &node_key) {
            state.order.push(node_key.clone());
        }
        let entry = state.nodes.entry(node_key).or_insert_with(|| {
            NodeExecutionRecord::new(event.node.component.clone(), &self.directories)
        });
        entry.start_instant = Some(Instant::now());
        // Re-starting a node resets it to Ok until proven otherwise.
        entry.status = NodeStatus::Ok;
        entry.transcript_start = Some(start_offset);
        Ok(())
    }

    /// Records a node-end event with the (redacted) output and closes out the
    /// node's duration measurement.
    fn handle_node_end(&self, event: &NodeEvent<'_>, output: &Value) -> Result<()> {
        let timestamp = OffsetDateTime::now_utc();
        let (redacted_output, redactions) = redact_value(output, "$.outputs");
        let flow_id = self.current_flow_id();
        let event_json = build_transcript_event(TranscriptEventArgs {
            profile: &self.profile,
            flow_id: &flow_id,
            node_id: event.node_id,
            component: &event.node.component,
            phase: "end",
            status: "ok",
            timestamp,
            inputs: Value::Null,
            outputs: redacted_output,
            error: Value::Null,
            redactions,
        });
        self.transcript.lock().write(&event_json)?;

        let mut state = self.state.lock();
        // `take()` ensures duration is only computed once per start.
        if let Some(entry) = state.nodes.get_mut(event.node_id)
            && let Some(started) = entry.start_instant.take()
        {
            entry.duration_ms = Some(started.elapsed().as_millis() as u64);
        }
        Ok(())
    }

    /// Records a node error: writes the error event, marks the node failed,
    /// and dumps the message to a per-node stderr log file.
    fn handle_node_error(
        &self,
        event: &NodeEvent<'_>,
        error: &dyn std::error::Error,
    ) -> Result<()> {
        let timestamp = OffsetDateTime::now_utc();
        let error_message = error.to_string();
        let error_json = json!({
            "code": "component-failed",
            "message": error_message,
            "details": {
                "node": event.node_id,
            }
        });
        let flow_id = self.current_flow_id();
        let event_json = build_transcript_event(TranscriptEventArgs {
            profile: &self.profile,
            flow_id: &flow_id,
            node_id: event.node_id,
            component: &event.node.component,
            phase: "error",
            status: "error",
            timestamp,
            inputs: Value::Null,
            outputs: Value::Null,
            error: error_json.clone(),
            redactions: Vec::new(),
        });
        let (_, end_offset) = self.transcript.lock().write(&event_json)?;

        let mut state = self.state.lock();
        if let Some(entry) = state.nodes.get_mut(event.node_id) {
            entry.status = NodeStatus::Error;
            if let Some(started) = entry.start_instant.take() {
                entry.duration_ms = Some(started.elapsed().as_millis() as u64);
            }
            entry.transcript_error = Some(end_offset);
            entry.failure_code = Some("component-failed".to_string());
            entry.failure_message = Some(error_message.clone());
            entry.failure_details = Some(error_json);
            // Best-effort stderr dump; a write failure is silently ignored.
            let log_path = self
                .directories
                .logs
                .join(format!("{}.stderr.log", sanitize_id(event.node_id)));
            if let Ok(mut file) = File::create(&log_path) {
                let _ = writeln!(file, "{error_message}");
            }
            entry.log_paths.push(log_path);
        }
        Ok(())
    }
}
920
// Observer callbacks must not propagate errors into the engine, so recording
// failures are logged via tracing and otherwise swallowed.
impl ExecutionObserver for RunRecorder {
    fn on_node_start(&self, event: &NodeEvent<'_>) {
        if let Err(err) = self.handle_node_start(event) {
            warn!(node = event.node_id, error = %err, "failed to record node start");
        }
    }

    fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value) {
        if let Err(err) = self.handle_node_end(event, output) {
            warn!(node = event.node_id, error = %err, "failed to record node end");
        }
    }

    fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn std::error::Error) {
        if let Err(err) = self.handle_node_error(event, error) {
            warn!(node = event.node_id, error = %err, "failed to record node error");
        }
    }
}

// Mock-layer events are mirrored into the transcript, again best-effort.
impl MockEventSink for RunRecorder {
    fn on_mock_event(&self, capability: &str, provider: &str, payload: &Value) {
        if let Err(err) = self.write_mock_event(capability, provider, payload.clone()) {
            warn!(?capability, ?provider, error = %err, "failed to record mock event");
        }
    }
}
948
/// Mutable per-run node bookkeeping, guarded by the recorder's state mutex.
#[derive(Default)]
struct RunRecorderState {
    /// Per-node records keyed by node id.
    nodes: BTreeMap<String, NodeExecutionRecord>,
    /// Node ids in first-start order (BTreeMap alone would sort them).
    order: Vec<String>,
}

/// Everything recorded about one node's execution.
#[derive(Clone)]
struct NodeExecutionRecord {
    component: String,
    status: NodeStatus,
    duration_ms: Option<u64>,
    /// Byte offset of the node's start event in transcript.jsonl.
    transcript_start: Option<u64>,
    /// Byte offset just past the node's error event, when one occurred.
    transcript_error: Option<u64>,
    log_paths: Vec<PathBuf>,
    failure_code: Option<String>,
    failure_message: Option<String>,
    failure_details: Option<Value>,
    /// Set on start, taken on end/error to compute `duration_ms`.
    start_instant: Option<Instant>,
}
968
969impl NodeExecutionRecord {
970 fn new(component: String, _dirs: &RunDirectories) -> Self {
971 Self {
972 component,
973 status: NodeStatus::Ok,
974 duration_ms: None,
975 transcript_start: None,
976 transcript_error: None,
977 log_paths: Vec::new(),
978 failure_code: None,
979 failure_message: None,
980 failure_details: None,
981 start_instant: None,
982 }
983 }
984}
985
/// Append-only JSONL writer that tracks byte offsets so failures can point
/// back into the transcript, and optionally tees each event to a hook.
struct TranscriptWriter {
    writer: BufWriter<File>,
    /// Total bytes written so far (including trailing newlines).
    offset: u64,
    hook: Option<TranscriptHook>,
}
991
992impl TranscriptWriter {
993 fn new(writer: BufWriter<File>, hook: Option<TranscriptHook>) -> Self {
994 Self {
995 writer,
996 offset: 0,
997 hook,
998 }
999 }
1000
1001 fn write(&mut self, value: &Value) -> Result<(u64, u64)> {
1002 let line = serde_json::to_vec(value)?;
1003 let start = self.offset;
1004 self.writer.write_all(&line)?;
1005 self.writer.write_all(b"\n")?;
1006 self.writer.flush()?;
1007 self.offset += line.len() as u64 + 1;
1008 if let Some(hook) = &self.hook {
1009 hook(value);
1010 }
1011 Ok((start, self.offset))
1012 }
1013}
1014
/// Bundled arguments for `build_transcript_event` (avoids a long positional
/// parameter list).
struct TranscriptEventArgs<'a> {
    profile: &'a ResolvedProfile,
    flow_id: &'a str,
    node_id: &'a str,
    component: &'a str,
    /// Lifecycle phase: "start" | "end" | "error" (plus "verify"/"mock" events
    /// built elsewhere).
    phase: &'a str,
    status: &'a str,
    timestamp: OffsetDateTime,
    inputs: Value,
    outputs: Value,
    error: Value,
    /// JSONPath-style locations that were redacted in inputs/outputs.
    redactions: Vec<String>,
}
1028
/// Assembles one transcript JSON event in the shared schema used across
/// verify/mock/node events.
fn build_transcript_event(args: TranscriptEventArgs<'_>) -> Value {
    // Best-effort RFC 3339; fall back to Display formatting.
    let ts = args
        .timestamp
        .format(&Rfc3339)
        .unwrap_or_else(|_| args.timestamp.to_string());
    json!({
        "ts": ts,
        "session_id": args.profile.session_id.as_str(),
        "flow_id": args.flow_id,
        "node_id": args.node_id,
        "component": args.component,
        "phase": args.phase,
        "status": args.status,
        "inputs": args.inputs,
        "outputs": args.outputs,
        "error": args.error,
        // Metrics are placeholders; not currently populated per event.
        "metrics": {
            "duration_ms": null,
            "cpu_time_ms": null,
            "mem_peak_bytes": null,
        },
        "schema_id": Value::Null,
        "defaults_applied": Value::Array(Vec::new()),
        "redactions": args.redactions,
    })
}
1055
1056fn redact_value(value: &Value, base: &str) -> (Value, Vec<String>) {
1057 let mut paths = Vec::new();
1058 let redacted = redact_recursive(value, base, &mut paths);
1059 (redacted, paths)
1060}
1061
1062fn redact_recursive(value: &Value, path: &str, acc: &mut Vec<String>) -> Value {
1063 match value {
1064 Value::Object(map) => {
1065 let mut new_map = JsonMap::new();
1066 for (key, val) in map {
1067 let child_path = format!("{path}.{key}");
1068 if is_sensitive_key(key) {
1069 acc.push(child_path);
1070 new_map.insert(key.clone(), Value::String("__REDACTED__".into()));
1071 } else {
1072 new_map.insert(key.clone(), redact_recursive(val, &child_path, acc));
1073 }
1074 }
1075 Value::Object(new_map)
1076 }
1077 Value::Array(items) => {
1078 let mut new_items = Vec::new();
1079 for (idx, item) in items.iter().enumerate() {
1080 let child_path = format!("{path}[{idx}]");
1081 new_items.push(redact_recursive(item, &child_path, acc));
1082 }
1083 Value::Array(new_items)
1084 }
1085 other => other.clone(),
1086 }
1087}
1088
/// Returns `true` when a JSON key looks like it carries a credential
/// (case-insensitive substring match against a fixed marker list).
fn is_sensitive_key(key: &str) -> bool {
    let lowered = key.to_ascii_lowercase();
    ["secret", "token", "password", "authorization", "cookie"]
        .into_iter()
        .any(|marker| lowered.contains(marker))
}
1094
/// Maps an arbitrary node id onto a filesystem-safe name: every character
/// that is not ASCII alphanumeric becomes `-`.
fn sanitize_id(value: &str) -> String {
    let mut sanitized = String::with_capacity(value.len());
    for ch in value.chars() {
        sanitized.push(if ch.is_ascii_alphanumeric() { ch } else { '-' });
    }
    sanitized
}