1use anyhow::{Context, Result, anyhow};
2use greentic_pack::reader::open_pack;
3use greentic_runner_host::RunnerWasiPolicy;
4use greentic_runner_host::config::{
5 FlowRetryConfig, HostConfig, OperatorPolicy, RateLimits, SecretsPolicy, StateStorePolicy,
6 WebhookPolicy,
7};
8use greentic_runner_host::pack::{ComponentResolution, FlowDescriptor, PackMetadata, PackRuntime};
9use greentic_runner_host::runner::engine::{ExecutionObserver, FlowContext, FlowEngine, NodeEvent};
10pub use greentic_runner_host::runner::mocks::{
11 HttpMock, HttpMockMode, KvMock, MocksConfig, SecretsMock, TelemetryMock, TimeMock, ToolsMock,
12};
13use greentic_runner_host::runner::mocks::{MockEventSink, MockLayer};
14use greentic_runner_host::secrets::default_manager;
15use greentic_runner_host::storage::{new_session_store, new_state_store};
16use greentic_runner_host::trace::TraceConfig;
17use greentic_runner_host::validate::ValidationConfig;
18use parking_lot::Mutex;
19use runner_core::normalize_under_root;
20use serde::{Deserialize, Serialize};
21use serde_json::{Map as JsonMap, Value, json};
22use std::collections::{BTreeMap, HashMap};
23use std::fmt;
24use std::fs::{self, File};
25use std::io::{BufWriter, Write};
26use std::path::{Path, PathBuf};
27use std::sync::Arc;
28use std::time::Instant;
29use time::OffsetDateTime;
30use time::format_description::well_known::Rfc3339;
31use tokio::runtime::Runtime;
32use tracing::{info, warn};
33use uuid::Uuid;
34
35const PROVIDER_ID_DEV: &str = "greentic-dev";
36
/// Callback invoked with every transcript event (as JSON) as it is written.
pub type TranscriptHook = Arc<dyn Fn(&Value) + Send + Sync>;
39
/// Requested OTLP export configuration.
///
/// NOTE(review): this is currently informational only — `apply_otlp_hook`
/// just logs it; actual exporters must be configured via `OTEL_*` env vars.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct OtlpHook {
    /// OTLP collector endpoint URL.
    pub endpoint: String,
    /// Extra headers to send with exports (e.g. auth); empty by default.
    #[serde(default)]
    pub headers: Vec<(String, String)>,
    /// Whether to sample every span.
    #[serde(default)]
    pub sample_all: bool,
}
49
/// Execution profile selecting tenant identity and wall-time limits.
///
/// `#[non_exhaustive]`: more profiles (e.g. production) may be added later.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum Profile {
    /// Local development profile.
    Dev(DevProfile),
}
56
impl Default for Profile {
    /// Defaults to the developer profile with stock settings.
    fn default() -> Self {
        Self::Dev(DevProfile::default())
    }
}
62
/// Settings for the local development profile.
#[derive(Clone, Debug)]
pub struct DevProfile {
    /// Tenant identity used when the caller's `TenantContext` leaves it unset.
    pub tenant_id: String,
    pub team_id: String,
    pub user_id: String,
    /// Per-node wall-clock budget in milliseconds.
    pub max_node_wall_time_ms: u64,
    /// Whole-run wall-clock budget in milliseconds.
    pub max_run_wall_time_ms: u64,
}
72
73impl Default for DevProfile {
74 fn default() -> Self {
75 Self {
76 tenant_id: "local-dev".to_string(),
77 team_id: "default".to_string(),
78 user_id: "developer".to_string(),
79 max_node_wall_time_ms: 30_000,
80 max_run_wall_time_ms: 600_000,
81 }
82 }
83}
84
/// Caller-supplied tenant identity; unset fields fall back to the profile's
/// values during `resolve_profile`.
#[derive(Clone, Debug, Default)]
pub struct TenantContext {
    pub tenant_id: Option<String>,
    pub team_id: Option<String>,
    pub user_id: Option<String>,
    /// When `None`, a fresh UUID session id is generated per run.
    pub session_id: Option<String>,
}
93
94impl TenantContext {
95 pub fn default_local() -> Self {
96 Self {
97 tenant_id: Some("local-dev".into()),
98 team_id: Some("default".into()),
99 user_id: Some("developer".into()),
100 session_id: None,
101 }
102 }
103}
104
/// Everything that configures a single pack run.
#[derive(Clone)]
pub struct RunOptions {
    pub profile: Profile,
    /// Flow to start; falls back to pack metadata / first listed flow.
    pub entry_flow: Option<String>,
    /// Initial payload handed to the entry flow.
    pub input: Value,
    pub ctx: TenantContext,
    /// Optional callback receiving each transcript event as it is written.
    pub transcript: Option<TranscriptHook>,
    pub otlp: Option<OtlpHook>,
    pub mocks: MocksConfig,
    /// When `None`, a timestamped directory under `.greentic/runs` is created.
    pub artifacts_dir: Option<PathBuf>,
    pub signing: SigningPolicy,
    /// Overrides the materialized component root for resolution.
    pub components_dir: Option<PathBuf>,
    /// Per-component path overrides.
    pub components_map: HashMap<String, PathBuf>,
    pub dist_offline: bool,
    pub dist_cache_dir: Option<PathBuf>,
    pub allow_missing_hash: bool,
}
123
impl Default for RunOptions {
    /// Same as [`desktop_defaults`].
    fn default() -> Self {
        desktop_defaults()
    }
}
129
impl fmt::Debug for RunOptions {
    /// Manual `Debug`: the transcript hook is a closure (not `Debug`), so it
    /// is rendered as a presence flag; `components_map` is summarized by size.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RunOptions")
            .field("profile", &self.profile)
            .field("entry_flow", &self.entry_flow)
            .field("input", &self.input)
            .field("ctx", &self.ctx)
            .field("transcript", &self.transcript.is_some())
            .field("otlp", &self.otlp)
            .field("mocks", &self.mocks)
            .field("artifacts_dir", &self.artifacts_dir)
            .field("signing", &self.signing)
            .field("components_dir", &self.components_dir)
            .field("components_map_len", &self.components_map.len())
            .field("dist_offline", &self.dist_offline)
            .field("dist_cache_dir", &self.dist_cache_dir)
            .field("allow_missing_hash", &self.allow_missing_hash)
            .finish()
    }
}
150
/// How pack signature verification failures are treated.
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum SigningPolicy {
    /// Any verification failure aborts the run.
    Strict,
    /// Signature errors are logged and the run continues (dev only).
    DevOk,
}
156
/// Builder-style entry point for running packs with reusable base options.
#[derive(Clone, Debug)]
pub struct Runner {
    // Base options cloned (and optionally tweaked) per run.
    base: RunOptions,
}
162
impl Runner {
    /// Creates a runner seeded with the desktop default options.
    pub fn new() -> Self {
        Self {
            base: desktop_defaults(),
        }
    }

    /// Replaces the execution profile (builder-style).
    pub fn profile(mut self, profile: Profile) -> Self {
        self.base.profile = profile;
        self
    }

    /// Replaces the mock configuration (builder-style).
    pub fn with_mocks(mut self, mocks: MocksConfig) -> Self {
        self.base.mocks = mocks;
        self
    }

    /// Applies arbitrary mutations to the base options (builder-style).
    pub fn configure(mut self, f: impl FnOnce(&mut RunOptions)) -> Self {
        f(&mut self.base);
        self
    }

    /// Runs a pack synchronously with a clone of the base options.
    pub fn run_pack<P: AsRef<Path>>(&self, pack_path: P) -> Result<RunResult> {
        run_pack_with_options(pack_path, self.base.clone())
    }

    /// Runs a pack synchronously after `f` tweaks a copy of the base
    /// options; the stored base is left untouched.
    pub fn run_pack_with<P: AsRef<Path>>(
        &self,
        pack_path: P,
        f: impl FnOnce(&mut RunOptions),
    ) -> Result<RunResult> {
        let mut opts = self.base.clone();
        f(&mut opts);
        run_pack_with_options(pack_path, opts)
    }

    /// Async variant of [`Runner::run_pack`] for callers already inside a
    /// tokio runtime.
    pub async fn run_pack_async<P: AsRef<Path>>(&self, pack_path: P) -> Result<RunResult> {
        run_pack_with_options_async(pack_path, self.base.clone()).await
    }

    /// Async variant of [`Runner::run_pack_with`].
    pub async fn run_pack_with_async<P: AsRef<Path>>(
        &self,
        pack_path: P,
        f: impl FnOnce(&mut RunOptions),
    ) -> Result<RunResult> {
        let mut opts = self.base.clone();
        f(&mut opts);
        run_pack_with_options_async(pack_path, opts).await
    }
}
213
impl Default for Runner {
    /// Same as [`Runner::new`].
    fn default() -> Self {
        Self::new()
    }
}
219
/// Synchronous wrapper: spins up a fresh tokio runtime and blocks on the run.
///
/// NOTE(review): creating/blocking on a runtime from within an existing
/// async context will panic — use the `_async` variants there instead.
pub fn run_pack_with_options<P: AsRef<Path>>(pack_path: P, opts: RunOptions) -> Result<RunResult> {
    let runtime = Runtime::new().context("failed to create tokio runtime")?;
    runtime.block_on(run_pack_async(pack_path.as_ref(), opts))
}
225
226pub async fn run_pack_with_options_async<P: AsRef<Path>>(
228 pack_path: P,
229 opts: RunOptions,
230) -> Result<RunResult> {
231 run_pack_async(pack_path.as_ref(), opts).await
232}
233
234pub fn desktop_defaults() -> RunOptions {
236 let otlp = std::env::var("OTLP_ENDPOINT")
237 .ok()
238 .map(|endpoint| OtlpHook {
239 endpoint,
240 headers: Vec::new(),
241 sample_all: true,
242 });
243 RunOptions {
244 profile: Profile::Dev(DevProfile::default()),
245 entry_flow: None,
246 input: json!({}),
247 ctx: TenantContext::default_local(),
248 transcript: None,
249 otlp,
250 mocks: MocksConfig::default(),
251 artifacts_dir: None,
252 signing: SigningPolicy::DevOk,
253 components_dir: None,
254 components_map: HashMap::new(),
255 dist_offline: false,
256 dist_cache_dir: None,
257 allow_missing_hash: false,
258 }
259}
260
/// Loads, verifies, and executes a pack end-to-end, writing artifacts
/// (transcript.jsonl, per-node logs, run.json) into a per-run directory.
///
/// Phases:
/// 1. normalize the pack path and resolve the effective profile/tenant,
/// 2. prepare run directories, the mock layer, and the transcript recorder,
/// 3. verify the pack (file inputs only; honours the signing policy),
/// 4. load the pack runtime with component-resolution overrides,
/// 5. resolve and execute the entry flow through the flow engine,
/// 6. finalise the transcript into a [`RunResult`] and persist `run.json`.
async fn run_pack_async(pack_path: &Path, opts: RunOptions) -> Result<RunResult> {
    let pack_path = normalize_pack_path(pack_path)?;
    let resolved_profile = resolve_profile(&opts.profile, &opts.ctx);
    if let Some(otlp) = &opts.otlp {
        // Currently log-only; see apply_otlp_hook.
        apply_otlp_hook(otlp);
    }

    let directories = prepare_run_dirs(opts.artifacts_dir.clone())?;
    info!(run_dir = %directories.root.display(), "prepared desktop run directory");

    let mock_layer = Arc::new(MockLayer::new(opts.mocks.clone(), &directories.root)?);

    // Recorder starts with fallback metadata; it is refined once the pack
    // manifest and runtime metadata become available below.
    let recorder = Arc::new(RunRecorder::new(
        directories.clone(),
        &resolved_profile,
        None,
        PackMetadata::fallback(&pack_path),
        opts.transcript.clone(),
    )?);

    // Mock invocations are mirrored into the transcript.
    let mock_sink: Arc<dyn MockEventSink> = recorder.clone();
    mock_layer.register_sink(mock_sink);

    if pack_path.is_file() {
        match open_pack(&pack_path, to_reader_policy(opts.signing)) {
            Ok(load) => {
                let meta = &load.manifest.meta;
                recorder.update_pack_metadata(PackMetadata {
                    pack_id: meta.pack_id.clone(),
                    version: meta.version.to_string(),
                    entry_flows: meta.entry_flows.clone(),
                    secret_requirements: Vec::new(),
                });
            }
            Err(err) => {
                // Record the failure in the transcript before deciding
                // whether the dev policy lets the run continue.
                recorder.record_verify_event("error", &err.message)?;
                if opts.signing == SigningPolicy::DevOk && is_signature_error(&err.message) {
                    warn!(error = %err.message, "continuing despite signature error (dev policy)");
                } else {
                    return Err(anyhow!("pack verification failed: {}", err.message));
                }
            }
        }
    } else {
        // Directory inputs (unpacked packs) are not signature-verified.
        tracing::debug!(
            path = %pack_path.display(),
            "skipping pack verification for directory input"
        );
    }

    let host_config = Arc::new(build_host_config(&resolved_profile, &directories));
    let mut component_resolution = ComponentResolution::default();
    if let Some(dir) = opts.components_dir.clone() {
        component_resolution.materialized_root = Some(dir);
    } else if pack_path.is_dir() {
        // An unpacked pack directory doubles as the component root.
        component_resolution.materialized_root = Some(pack_path.clone());
    }
    component_resolution.overrides = opts.components_map.clone();
    component_resolution.dist_offline = opts.dist_offline;
    component_resolution.dist_cache_dir = opts.dist_cache_dir.clone();
    component_resolution.allow_missing_hash = opts.allow_missing_hash;
    // Only `.gtpack` archives are passed through as an archive source.
    let archive_source = if pack_path
        .extension()
        .and_then(|ext| ext.to_str())
        .map(|ext| ext.eq_ignore_ascii_case("gtpack"))
        .unwrap_or(false)
    {
        Some(&pack_path)
    } else {
        None
    };

    let session_store = new_session_store();
    let state_store = new_state_store();
    let secrets_manager = default_manager().context("failed to initialise secrets backend")?;
    let pack = Arc::new(
        PackRuntime::load(
            &pack_path,
            Arc::clone(&host_config),
            Some(Arc::clone(&mock_layer)),
            archive_source.map(|p| p as &Path),
            Some(Arc::clone(&session_store)),
            Some(Arc::clone(&state_store)),
            Arc::new(RunnerWasiPolicy::default()),
            secrets_manager,
            host_config.oauth_broker_config(),
            false,
            component_resolution,
        )
        .await
        .with_context(|| format!("failed to load pack {}", pack_path.display()))?,
    );
    recorder.update_pack_metadata(pack.metadata().clone());

    let flows = pack
        .list_flows()
        .await
        .context("failed to enumerate flows")?;
    let entry_flow_id = resolve_entry_flow(opts.entry_flow.clone(), pack.metadata(), &flows)?;
    recorder.set_flow_id(&entry_flow_id);

    let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&host_config))
        .await
        .context("failed to prime flow engine")?;

    let started_at = OffsetDateTime::now_utc();
    // FlowContext borrows &str fields, so owned copies must outlive it.
    let tenant_str = host_config.tenant.clone();
    let session_id_owned = resolved_profile.session_id.clone();
    let provider_id_owned = resolved_profile.provider_id.clone();
    let recorder_ref: &RunRecorder = &recorder;
    let mock_ref: &MockLayer = &mock_layer;
    let ctx = FlowContext {
        tenant: &tenant_str,
        pack_id: pack.metadata().pack_id.as_str(),
        flow_id: &entry_flow_id,
        node_id: None,
        tool: None,
        action: Some("run_pack"),
        session_id: Some(session_id_owned.as_str()),
        provider_id: Some(provider_id_owned.as_str()),
        retry_config: host_config.retry_config().into(),
        attempt: 1,
        observer: Some(recorder_ref),
        mocks: Some(mock_ref),
    };

    let execution = engine.execute(ctx, opts.input.clone()).await;
    let finished_at = OffsetDateTime::now_utc();

    // A flow left in Waiting state is treated as a failure for a one-shot
    // desktop run (there is nothing to resume it).
    let status = match execution {
        Ok(result) => match result.status {
            greentic_runner_host::runner::engine::FlowStatus::Completed => RunCompletion::Ok,
            greentic_runner_host::runner::engine::FlowStatus::Waiting(wait) => {
                let reason = wait
                    .reason
                    .unwrap_or_else(|| "flow paused unexpectedly".to_string());
                RunCompletion::Err(anyhow::anyhow!(reason))
            }
        },
        Err(err) => RunCompletion::Err(err),
    };

    let result = recorder.finalise(status, started_at, finished_at)?;

    let run_json_path = directories.root.join("run.json");
    fs::write(&run_json_path, serde_json::to_vec_pretty(&result)?)
        .with_context(|| format!("failed to write run summary {}", run_json_path.display()))?;

    Ok(result)
}
411
/// Logs the requested OTLP settings. This does NOT configure an exporter —
/// as the message says, callers must set `OTEL_*` env vars beforehand.
fn apply_otlp_hook(hook: &OtlpHook) {
    info!(
        endpoint = %hook.endpoint,
        sample_all = hook.sample_all,
        headers = %hook.headers.len(),
        "OTLP hook requested (set OTEL_* env vars before invoking run_pack)"
    );
}
420
421fn normalize_pack_path(path: &Path) -> Result<PathBuf> {
422 if path.is_absolute() {
423 let parent = path
424 .parent()
425 .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
426 let root = parent
427 .canonicalize()
428 .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
429 let file = path
430 .file_name()
431 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
432 return normalize_under_root(&root, Path::new(file));
433 }
434
435 let cwd = std::env::current_dir().context("failed to resolve current directory")?;
436 let base = if let Some(parent) = path.parent() {
437 cwd.join(parent)
438 } else {
439 cwd
440 };
441 let root = base
442 .canonicalize()
443 .with_context(|| format!("failed to canonicalize {}", base.display()))?;
444 let file = path
445 .file_name()
446 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
447 normalize_under_root(&root, Path::new(file))
448}
449
450fn prepare_run_dirs(root_override: Option<PathBuf>) -> Result<RunDirectories> {
451 let root = if let Some(dir) = root_override {
452 dir
453 } else {
454 let timestamp = OffsetDateTime::now_utc()
455 .format(&Rfc3339)
456 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
457 .replace(':', "-");
458 let short_id = Uuid::new_v4()
459 .to_string()
460 .chars()
461 .take(6)
462 .collect::<String>();
463 PathBuf::from(".greentic")
464 .join("runs")
465 .join(format!("{timestamp}_{short_id}"))
466 };
467
468 fs::create_dir_all(&root).with_context(|| format!("failed to create {}", root.display()))?;
469 let logs = root.join("logs");
470 let resolved = root.join("resolved_config");
471 fs::create_dir_all(&logs).with_context(|| format!("failed to create {}", logs.display()))?;
472 fs::create_dir_all(&resolved)
473 .with_context(|| format!("failed to create {}", resolved.display()))?;
474
475 Ok(RunDirectories {
476 root,
477 logs,
478 resolved,
479 })
480}
481
482fn resolve_entry_flow(
483 override_id: Option<String>,
484 metadata: &PackMetadata,
485 flows: &[FlowDescriptor],
486) -> Result<String> {
487 if let Some(flow) = override_id {
488 return Ok(flow);
489 }
490 if let Some(first) = metadata.entry_flows.first() {
491 return Ok(first.clone());
492 }
493 flows
494 .first()
495 .map(|f| f.id.clone())
496 .ok_or_else(|| anyhow!("pack does not declare any flows"))
497}
498
/// Returns true when the verification error message mentions "signature"
/// (case-insensitively), which the dev signing policy treats as ignorable.
fn is_signature_error(message: &str) -> bool {
    const NEEDLE: &[u8] = b"signature";
    message
        .as_bytes()
        .windows(NEEDLE.len())
        .any(|window| window.eq_ignore_ascii_case(NEEDLE))
}
502
/// Maps this crate's signing policy onto the pack reader's equivalent enum.
fn to_reader_policy(policy: SigningPolicy) -> greentic_pack::reader::SigningPolicy {
    match policy {
        SigningPolicy::Strict => greentic_pack::reader::SigningPolicy::Strict,
        SigningPolicy::DevOk => greentic_pack::reader::SigningPolicy::DevOk,
    }
}
509
/// Merges the caller's tenant context over the profile defaults.
///
/// Context fields win when set; a missing session id gets a fresh UUID.
/// The provider id is always the fixed dev provider.
fn resolve_profile(profile: &Profile, ctx: &TenantContext) -> ResolvedProfile {
    match profile {
        Profile::Dev(dev) => ResolvedProfile {
            tenant_id: ctx
                .tenant_id
                .clone()
                .unwrap_or_else(|| dev.tenant_id.clone()),
            team_id: ctx.team_id.clone().unwrap_or_else(|| dev.team_id.clone()),
            user_id: ctx.user_id.clone().unwrap_or_else(|| dev.user_id.clone()),
            session_id: ctx
                .session_id
                .clone()
                .unwrap_or_else(|| Uuid::new_v4().to_string()),
            provider_id: PROVIDER_ID_DEV.to_string(),
            max_node_wall_time_ms: dev.max_node_wall_time_ms,
            max_run_wall_time_ms: dev.max_run_wall_time_ms,
        },
    }
}
529
/// Builds a permissive host configuration for local desktop runs:
/// HTTP disabled, allow-all secrets and operator policies, and all other
/// policies/limits at their defaults. Trace and validation settings are
/// read from the environment.
fn build_host_config(profile: &ResolvedProfile, dirs: &RunDirectories) -> HostConfig {
    HostConfig {
        tenant: profile.tenant_id.clone(),
        // Bindings are materialized under the run's resolved_config dir.
        bindings_path: dirs.resolved.join("dev.bindings.yaml"),
        flow_type_bindings: HashMap::new(),
        rate_limits: RateLimits::default(),
        retry: FlowRetryConfig::default(),
        http_enabled: false,
        secrets_policy: SecretsPolicy::allow_all(),
        state_store_policy: StateStorePolicy::default(),
        webhook_policy: WebhookPolicy::default(),
        timers: Vec::new(),
        oauth: None,
        mocks: None,
        pack_bindings: Vec::new(),
        env_passthrough: Vec::new(),
        trace: TraceConfig::from_env(),
        validation: ValidationConfig::from_env(),
        operator_policy: OperatorPolicy::allow_all(),
    }
}
551
/// Fully-resolved identity and limits after merging profile and context.
// NOTE(review): the wall-time fields are carried but not read anywhere in
// this file — hence the dead_code allowance; presumably enforced elsewhere.
#[allow(dead_code)]
#[derive(Clone, Debug)]
struct ResolvedProfile {
    tenant_id: String,
    team_id: String,
    user_id: String,
    session_id: String,
    provider_id: String,
    max_node_wall_time_ms: u64,
    max_run_wall_time_ms: u64,
}
563
/// Serializable summary of one pack run, also persisted as `run.json`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RunResult {
    pub session_id: String,
    pub pack_id: String,
    pub pack_version: String,
    pub flow_id: String,
    /// RFC 3339 timestamps (fall back to `Display` form if formatting fails).
    pub started_at_utc: String,
    pub finished_at_utc: String,
    pub status: RunStatus,
    /// Runtime-level error message, if the engine itself failed.
    pub error: Option<String>,
    /// Per-node outcomes in first-execution order.
    pub node_summaries: Vec<NodeSummary>,
    /// Failed nodes keyed by node id (plus a synthetic `_runtime` entry).
    pub failures: BTreeMap<String, NodeFailure>,
    pub artifacts_dir: PathBuf,
}
578
/// Overall run outcome.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum RunStatus {
    Success,
    /// The run completed but at least one node recorded an error.
    PartialFailure,
    Failure,
}
585
/// Per-node outcome line in the run summary.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NodeSummary {
    pub node_id: String,
    pub component: String,
    pub status: NodeStatus,
    /// Wall-clock duration; 0 when no end/error event was observed.
    pub duration_ms: u64,
}
593
/// Outcome of a single node execution.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum NodeStatus {
    Ok,
    Skipped,
    Error,
}
600
/// Failure details for an errored node.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NodeFailure {
    pub code: String,
    pub message: String,
    pub details: Value,
    /// (start, error) byte offsets into `transcript.jsonl` for this node.
    pub transcript_offsets: (u64, u64),
    /// Per-node log files (e.g. captured stderr).
    pub log_paths: Vec<PathBuf>,
}
609
/// Directory layout for one run (see `prepare_run_dirs`).
#[derive(Clone)]
struct RunDirectories {
    root: PathBuf,
    logs: PathBuf,
    resolved: PathBuf,
}
616
/// Engine-level outcome handed to `RunRecorder::finalise`.
enum RunCompletion {
    Ok,
    Err(anyhow::Error),
}
621
/// Observes flow execution, writing a JSONL transcript and accumulating
/// per-node state, then folds everything into the final `RunResult`.
///
/// Each field is independently locked so observer callbacks can record
/// events concurrently.
struct RunRecorder {
    directories: RunDirectories,
    profile: ResolvedProfile,
    // Flow id is mutable: it starts as a placeholder and is set once resolved.
    flow_id: Mutex<String>,
    pack_meta: Mutex<PackMetadata>,
    transcript: Mutex<TranscriptWriter>,
    state: Mutex<RunRecorderState>,
}
630
impl RunRecorder {
    /// Creates a recorder rooted at `dirs` and opens `transcript.jsonl`
    /// in the run root for JSONL event logging.
    ///
    /// `flow_id` may be unknown at this point; an "unknown" placeholder is
    /// used until [`RunRecorder::set_flow_id`] is called.
    fn new(
        dirs: RunDirectories,
        profile: &ResolvedProfile,
        flow_id: Option<String>,
        pack_meta: PackMetadata,
        hook: Option<TranscriptHook>,
    ) -> Result<Self> {
        let transcript_path = dirs.root.join("transcript.jsonl");
        let file = File::create(&transcript_path)
            .with_context(|| format!("failed to open {}", transcript_path.display()))?;
        Ok(Self {
            directories: dirs,
            profile: profile.clone(),
            flow_id: Mutex::new(flow_id.unwrap_or_else(|| "unknown".into())),
            pack_meta: Mutex::new(pack_meta),
            transcript: Mutex::new(TranscriptWriter::new(BufWriter::new(file), hook)),
            state: Mutex::new(RunRecorderState::default()),
        })
    }

    /// Collapses the recorded node events into the final [`RunResult`].
    ///
    /// Status rules: an engine error yields `Failure`; an otherwise
    /// successful run with any errored node becomes `PartialFailure`.
    /// A runtime-level error also gets a synthetic `_runtime` failure entry.
    fn finalise(
        &self,
        completion: RunCompletion,
        started_at: OffsetDateTime,
        finished_at: OffsetDateTime,
    ) -> Result<RunResult> {
        let status = match &completion {
            RunCompletion::Ok => RunStatus::Success,
            RunCompletion::Err(_) => RunStatus::Failure,
        };

        let mut runtime_error = None;
        if let RunCompletion::Err(err) = completion {
            warn!(error = %err, "pack execution failed");
            eprintln!("pack execution failed: {err}");
            runtime_error = Some(err.to_string());
        }

        let started = started_at
            .format(&Rfc3339)
            .unwrap_or_else(|_| started_at.to_string());
        let finished = finished_at
            .format(&Rfc3339)
            .unwrap_or_else(|_| finished_at.to_string());

        // Walk nodes in first-seen execution order, building summaries and
        // collecting failure details for errored nodes.
        let state = self.state.lock();
        let mut summaries = Vec::new();
        let mut failures = BTreeMap::new();
        for node_id in &state.order {
            if let Some(record) = state.nodes.get(node_id) {
                let duration = record.duration_ms.unwrap_or(0);
                summaries.push(NodeSummary {
                    node_id: node_id.clone(),
                    component: record.component.clone(),
                    status: record.status.clone(),
                    duration_ms: duration,
                });
                if record.status == NodeStatus::Error {
                    // Transcript byte offsets let tooling jump straight to
                    // the relevant JSONL records.
                    let start_offset = record.transcript_start.unwrap_or(0);
                    let err_offset = record.transcript_error.unwrap_or(start_offset);
                    failures.insert(
                        node_id.clone(),
                        NodeFailure {
                            code: record
                                .failure_code
                                .clone()
                                .unwrap_or_else(|| "component-failed".into()),
                            message: record
                                .failure_message
                                .clone()
                                .unwrap_or_else(|| "node failed".into()),
                            details: record
                                .failure_details
                                .clone()
                                .unwrap_or_else(|| json!({ "node": node_id })),
                            transcript_offsets: (start_offset, err_offset),
                            log_paths: record.log_paths.clone(),
                        },
                    );
                }
            }
        }

        let mut final_status = status;
        if final_status == RunStatus::Success && !failures.is_empty() {
            final_status = RunStatus::PartialFailure;
        }

        if let Some(error) = &runtime_error {
            failures.insert(
                "_runtime".into(),
                NodeFailure {
                    code: "runtime-error".into(),
                    message: error.clone(),
                    details: json!({ "stage": "execute" }),
                    transcript_offsets: (0, 0),
                    log_paths: Vec::new(),
                },
            );
        }

        let pack_meta = self.pack_meta.lock();
        let flow_id = self.flow_id.lock().clone();
        Ok(RunResult {
            session_id: self.profile.session_id.clone(),
            pack_id: pack_meta.pack_id.clone(),
            pack_version: pack_meta.version.clone(),
            flow_id,
            started_at_utc: started,
            finished_at_utc: finished,
            status: final_status,
            error: runtime_error,
            node_summaries: summaries,
            failures,
            artifacts_dir: self.directories.root.clone(),
        })
    }

    /// Records the flow id once it has been resolved.
    fn set_flow_id(&self, flow_id: &str) {
        *self.flow_id.lock() = flow_id.to_string();
    }

    /// Replaces the pack metadata (e.g. after the manifest has been read).
    fn update_pack_metadata(&self, meta: PackMetadata) {
        *self.pack_meta.lock() = meta;
    }

    /// Snapshot of the current flow id.
    fn current_flow_id(&self) -> String {
        self.flow_id.lock().clone()
    }

    /// Appends a pack-verification event to the transcript.
    fn record_verify_event(&self, status: &str, message: &str) -> Result<()> {
        let timestamp = OffsetDateTime::now_utc();
        let event = json!({
            "ts": timestamp
                .format(&Rfc3339)
                .unwrap_or_else(|_| timestamp.to_string()),
            "session_id": self.profile.session_id,
            "flow_id": self.current_flow_id(),
            "node_id": Value::Null,
            "component": "verify.pack",
            "phase": "verify",
            "status": status,
            "inputs": Value::Null,
            "outputs": Value::Null,
            "error": json!({ "message": message }),
            "metrics": Value::Null,
            "schema_id": Value::Null,
            "defaults_applied": Value::Null,
            "redactions": Value::Array(Vec::new()),
        });
        self.transcript.lock().write(&event).map(|_| ())
    }

    /// Appends a mock-invocation event to the transcript.
    fn write_mock_event(&self, capability: &str, provider: &str, payload: Value) -> Result<()> {
        let timestamp = OffsetDateTime::now_utc();
        let event = json!({
            "ts": timestamp
                .format(&Rfc3339)
                .unwrap_or_else(|_| timestamp.to_string()),
            "session_id": self.profile.session_id,
            "flow_id": self.current_flow_id(),
            "node_id": Value::Null,
            "component": format!("mock::{capability}"),
            "phase": "mock",
            "status": "ok",
            "inputs": json!({ "capability": capability, "provider": provider }),
            "outputs": payload,
            "error": Value::Null,
            "metrics": Value::Null,
            "schema_id": Value::Null,
            "defaults_applied": Value::Null,
            "redactions": Value::Array(Vec::new()),
        });
        self.transcript.lock().write(&event).map(|_| ())
    }

    /// Records a node-start event: writes a transcript record (with
    /// sensitive input fields redacted), registers the node in execution
    /// order, and starts its wall-clock.
    fn handle_node_start(&self, event: &NodeEvent<'_>) -> Result<()> {
        let timestamp = OffsetDateTime::now_utc();
        let (redacted_payload, redactions) = redact_value(event.payload, "$.inputs.payload");
        let inputs = json!({
            "payload": redacted_payload,
            "context": {
                "tenant_id": self.profile.tenant_id.as_str(),
                "team_id": self.profile.team_id.as_str(),
                "user_id": self.profile.user_id.as_str(),
            }
        });
        let flow_id = self.current_flow_id();
        let event_json = build_transcript_event(TranscriptEventArgs {
            profile: &self.profile,
            flow_id: &flow_id,
            node_id: event.node_id,
            component: &event.node.component,
            phase: "start",
            status: "ok",
            timestamp,
            inputs,
            outputs: Value::Null,
            error: Value::Null,
            redactions,
        });
        let (start_offset, _) = self.transcript.lock().write(&event_json)?;

        let mut state = self.state.lock();
        let node_key = event.node_id.to_string();
        // Track first-seen order for the final summary; avoid duplicates on
        // repeated starts (e.g. retries).
        if !state.order.iter().any(|id| id == &node_key) {
            state.order.push(node_key.clone());
        }
        let entry = state.nodes.entry(node_key).or_insert_with(|| {
            NodeExecutionRecord::new(event.node.component.clone(), &self.directories)
        });
        entry.start_instant = Some(Instant::now());
        entry.status = NodeStatus::Ok;
        entry.transcript_start = Some(start_offset);
        Ok(())
    }

    /// Records a node-end event and computes the node's duration from the
    /// instant captured at start.
    fn handle_node_end(&self, event: &NodeEvent<'_>, output: &Value) -> Result<()> {
        let timestamp = OffsetDateTime::now_utc();
        let (redacted_output, redactions) = redact_value(output, "$.outputs");
        let flow_id = self.current_flow_id();
        let event_json = build_transcript_event(TranscriptEventArgs {
            profile: &self.profile,
            flow_id: &flow_id,
            node_id: event.node_id,
            component: &event.node.component,
            phase: "end",
            status: "ok",
            timestamp,
            inputs: Value::Null,
            outputs: redacted_output,
            error: Value::Null,
            redactions,
        });
        self.transcript.lock().write(&event_json)?;

        let mut state = self.state.lock();
        if let Some(entry) = state.nodes.get_mut(event.node_id)
            && let Some(started) = entry.start_instant.take()
        {
            entry.duration_ms = Some(started.elapsed().as_millis() as u64);
        }
        Ok(())
    }

    /// Records a node failure: transcript event, error metadata, duration,
    /// and a per-node stderr log file containing the error message.
    fn handle_node_error(
        &self,
        event: &NodeEvent<'_>,
        error: &dyn std::error::Error,
    ) -> Result<()> {
        let timestamp = OffsetDateTime::now_utc();
        let error_message = error.to_string();
        let error_json = json!({
            "code": "component-failed",
            "message": error_message,
            "details": {
                "node": event.node_id,
            }
        });
        let flow_id = self.current_flow_id();
        let event_json = build_transcript_event(TranscriptEventArgs {
            profile: &self.profile,
            flow_id: &flow_id,
            node_id: event.node_id,
            component: &event.node.component,
            phase: "error",
            status: "error",
            timestamp,
            inputs: Value::Null,
            outputs: Value::Null,
            error: error_json.clone(),
            redactions: Vec::new(),
        });
        let (_, end_offset) = self.transcript.lock().write(&event_json)?;

        let mut state = self.state.lock();
        if let Some(entry) = state.nodes.get_mut(event.node_id) {
            entry.status = NodeStatus::Error;
            if let Some(started) = entry.start_instant.take() {
                entry.duration_ms = Some(started.elapsed().as_millis() as u64);
            }
            entry.transcript_error = Some(end_offset);
            entry.failure_code = Some("component-failed".to_string());
            entry.failure_message = Some(error_message.clone());
            entry.failure_details = Some(error_json);
            let log_path = self
                .directories
                .logs
                .join(format!("{}.stderr.log", sanitize_id(event.node_id)));
            // Best-effort: a failed log write must not mask the node error.
            if let Ok(mut file) = File::create(&log_path) {
                let _ = writeln!(file, "{error_message}");
            }
            entry.log_paths.push(log_path);
        }
        Ok(())
    }
}
929
/// Observer callbacks are infallible by contract, so recording errors are
/// logged and swallowed rather than propagated into the engine.
impl ExecutionObserver for RunRecorder {
    fn on_node_start(&self, event: &NodeEvent<'_>) {
        if let Err(err) = self.handle_node_start(event) {
            warn!(node = event.node_id, error = %err, "failed to record node start");
        }
    }

    fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value) {
        if let Err(err) = self.handle_node_end(event, output) {
            warn!(node = event.node_id, error = %err, "failed to record node end");
        }
    }

    fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn std::error::Error) {
        if let Err(err) = self.handle_node_error(event, error) {
            warn!(node = event.node_id, error = %err, "failed to record node error");
        }
    }
}
949
/// Mirrors mock-layer invocations into the transcript; failures are logged
/// and swallowed (mock recording is best-effort).
impl MockEventSink for RunRecorder {
    fn on_mock_event(&self, capability: &str, provider: &str, payload: &Value) {
        if let Err(err) = self.write_mock_event(capability, provider, payload.clone()) {
            warn!(?capability, ?provider, error = %err, "failed to record mock event");
        }
    }
}
957
/// Mutable per-run node bookkeeping.
#[derive(Default)]
struct RunRecorderState {
    // Keyed by node id; BTreeMap keeps deterministic iteration for lookups.
    nodes: BTreeMap<String, NodeExecutionRecord>,
    // First-seen execution order, used to order the final summaries.
    order: Vec<String>,
}
963
/// Accumulated state for one node across start/end/error events.
#[derive(Clone)]
struct NodeExecutionRecord {
    component: String,
    status: NodeStatus,
    duration_ms: Option<u64>,
    // Byte offsets into transcript.jsonl for the start / error records.
    transcript_start: Option<u64>,
    transcript_error: Option<u64>,
    log_paths: Vec<PathBuf>,
    failure_code: Option<String>,
    failure_message: Option<String>,
    failure_details: Option<Value>,
    // Wall-clock anchor set at node start, taken at end/error.
    start_instant: Option<Instant>,
}
977
impl NodeExecutionRecord {
    /// Fresh record for a node that has just started (status defaults to Ok
    /// until an error event arrives).
    ///
    /// NOTE(review): `_dirs` is currently unused — presumably kept for a
    /// planned per-node artifact path; confirm before removing.
    fn new(component: String, _dirs: &RunDirectories) -> Self {
        Self {
            component,
            status: NodeStatus::Ok,
            duration_ms: None,
            transcript_start: None,
            transcript_error: None,
            log_paths: Vec::new(),
            failure_code: None,
            failure_message: None,
            failure_details: None,
            start_instant: None,
        }
    }
}
994
/// Appends JSONL events to the transcript file, tracking byte offsets and
/// optionally fanning events out to a caller-supplied hook.
struct TranscriptWriter {
    writer: BufWriter<File>,
    // Byte offset of the next record (includes the trailing newlines).
    offset: u64,
    hook: Option<TranscriptHook>,
}
1000
impl TranscriptWriter {
    /// Wraps a buffered file writer starting at offset 0.
    fn new(writer: BufWriter<File>, hook: Option<TranscriptHook>) -> Self {
        Self {
            writer,
            offset: 0,
            hook,
        }
    }

    /// Writes one event as a JSON line, flushing immediately so the
    /// transcript survives crashes, then invokes the hook (if any).
    ///
    /// Returns the `(start, end)` byte offsets of the written record
    /// (end includes the trailing newline).
    fn write(&mut self, value: &Value) -> Result<(u64, u64)> {
        let line = serde_json::to_vec(value)?;
        let start = self.offset;
        self.writer.write_all(&line)?;
        self.writer.write_all(b"\n")?;
        self.writer.flush()?;
        // +1 accounts for the newline terminator.
        self.offset += line.len() as u64 + 1;
        if let Some(hook) = &self.hook {
            hook(value);
        }
        Ok((start, self.offset))
    }
}
1023
/// Bundled arguments for `build_transcript_event` (keeps the call sites
/// readable instead of a 10-parameter function).
struct TranscriptEventArgs<'a> {
    profile: &'a ResolvedProfile,
    flow_id: &'a str,
    node_id: &'a str,
    component: &'a str,
    /// Lifecycle phase: "start", "end", "error", "verify", or "mock".
    phase: &'a str,
    status: &'a str,
    timestamp: OffsetDateTime,
    inputs: Value,
    outputs: Value,
    error: Value,
    /// JSONPath-like locations of redacted fields.
    redactions: Vec<String>,
}
1037
/// Assembles one transcript record in the shared JSONL schema.
///
/// Metrics are emitted as nulls here; duration is tracked separately in the
/// node records and surfaced via the run summary.
fn build_transcript_event(args: TranscriptEventArgs<'_>) -> Value {
    let ts = args
        .timestamp
        .format(&Rfc3339)
        .unwrap_or_else(|_| args.timestamp.to_string());
    json!({
        "ts": ts,
        "session_id": args.profile.session_id.as_str(),
        "flow_id": args.flow_id,
        "node_id": args.node_id,
        "component": args.component,
        "phase": args.phase,
        "status": args.status,
        "inputs": args.inputs,
        "outputs": args.outputs,
        "error": args.error,
        "metrics": {
            "duration_ms": null,
            "cpu_time_ms": null,
            "mem_peak_bytes": null,
        },
        "schema_id": Value::Null,
        "defaults_applied": Value::Array(Vec::new()),
        "redactions": args.redactions,
    })
}
1064
1065fn redact_value(value: &Value, base: &str) -> (Value, Vec<String>) {
1066 let mut paths = Vec::new();
1067 let redacted = redact_recursive(value, base, &mut paths);
1068 (redacted, paths)
1069}
1070
1071fn redact_recursive(value: &Value, path: &str, acc: &mut Vec<String>) -> Value {
1072 match value {
1073 Value::Object(map) => {
1074 let mut new_map = JsonMap::new();
1075 for (key, val) in map {
1076 let child_path = format!("{path}.{key}");
1077 if is_sensitive_key(key) {
1078 acc.push(child_path);
1079 new_map.insert(key.clone(), Value::String("__REDACTED__".into()));
1080 } else {
1081 new_map.insert(key.clone(), redact_recursive(val, &child_path, acc));
1082 }
1083 }
1084 Value::Object(new_map)
1085 }
1086 Value::Array(items) => {
1087 let mut new_items = Vec::new();
1088 for (idx, item) in items.iter().enumerate() {
1089 let child_path = format!("{path}[{idx}]");
1090 new_items.push(redact_recursive(item, &child_path, acc));
1091 }
1092 Value::Array(new_items)
1093 }
1094 other => other.clone(),
1095 }
1096}
1097
/// Heuristic: a key is sensitive when it contains any of the well-known
/// credential markers, compared case-insensitively (ASCII).
fn is_sensitive_key(key: &str) -> bool {
    const MARKERS: [&str; 5] = ["secret", "token", "password", "authorization", "cookie"];
    let needle = key.to_ascii_lowercase();
    MARKERS.into_iter().any(|marker| needle.contains(marker))
}
1103
/// Makes an identifier filesystem-safe by replacing every character that is
/// not ASCII-alphanumeric with '-'.
fn sanitize_id(value: &str) -> String {
    let mut sanitized = String::with_capacity(value.len());
    for ch in value.chars() {
        sanitized.push(if ch.is_ascii_alphanumeric() { ch } else { '-' });
    }
    sanitized
}