1use anyhow::{Context, Result, anyhow};
2use greentic_pack::reader::open_pack;
3use greentic_runner_host::RunnerWasiPolicy;
4use greentic_runner_host::config::{
5 FlowRetryConfig, HostConfig, RateLimits, SecretsPolicy, StateStorePolicy, WebhookPolicy,
6};
7use greentic_runner_host::pack::{ComponentResolution, FlowDescriptor, PackMetadata, PackRuntime};
8use greentic_runner_host::runner::engine::{ExecutionObserver, FlowContext, FlowEngine, NodeEvent};
9pub use greentic_runner_host::runner::mocks::{
10 HttpMock, HttpMockMode, KvMock, MocksConfig, SecretsMock, TelemetryMock, TimeMock, ToolsMock,
11};
12use greentic_runner_host::runner::mocks::{MockEventSink, MockLayer};
13use greentic_runner_host::secrets::default_manager;
14use greentic_runner_host::storage::{new_session_store, new_state_store};
15use greentic_runner_host::trace::TraceConfig;
16use greentic_runner_host::validate::ValidationConfig;
17use parking_lot::Mutex;
18use runner_core::normalize_under_root;
19use serde::{Deserialize, Serialize};
20use serde_json::{Map as JsonMap, Value, json};
21use std::collections::{BTreeMap, HashMap};
22use std::fmt;
23use std::fs::{self, File};
24use std::io::{BufWriter, Write};
25use std::path::{Path, PathBuf};
26use std::sync::Arc;
27use std::time::Instant;
28use time::OffsetDateTime;
29use time::format_description::well_known::Rfc3339;
30use tokio::runtime::Runtime;
31use tracing::{info, warn};
32use uuid::Uuid;
33
34const PROVIDER_ID_DEV: &str = "greentic-dev";
35
36pub type TranscriptHook = Arc<dyn Fn(&Value) + Send + Sync>;
38
39#[derive(Clone, Debug, Serialize, Deserialize, Default)]
41pub struct OtlpHook {
42 pub endpoint: String,
43 #[serde(default)]
44 pub headers: Vec<(String, String)>,
45 #[serde(default)]
46 pub sample_all: bool,
47}
48
49#[derive(Clone, Debug)]
51#[non_exhaustive]
52pub enum Profile {
53 Dev(DevProfile),
54}
55
56impl Default for Profile {
57 fn default() -> Self {
58 Self::Dev(DevProfile::default())
59 }
60}
61
62#[derive(Clone, Debug)]
64pub struct DevProfile {
65 pub tenant_id: String,
66 pub team_id: String,
67 pub user_id: String,
68 pub max_node_wall_time_ms: u64,
69 pub max_run_wall_time_ms: u64,
70}
71
72impl Default for DevProfile {
73 fn default() -> Self {
74 Self {
75 tenant_id: "local-dev".to_string(),
76 team_id: "default".to_string(),
77 user_id: "developer".to_string(),
78 max_node_wall_time_ms: 30_000,
79 max_run_wall_time_ms: 600_000,
80 }
81 }
82}
83
84#[derive(Clone, Debug, Default)]
86pub struct TenantContext {
87 pub tenant_id: Option<String>,
88 pub team_id: Option<String>,
89 pub user_id: Option<String>,
90 pub session_id: Option<String>,
91}
92
93impl TenantContext {
94 pub fn default_local() -> Self {
95 Self {
96 tenant_id: Some("local-dev".into()),
97 team_id: Some("default".into()),
98 user_id: Some("developer".into()),
99 session_id: None,
100 }
101 }
102}
103
104#[derive(Clone)]
106pub struct RunOptions {
107 pub profile: Profile,
108 pub entry_flow: Option<String>,
109 pub input: Value,
110 pub ctx: TenantContext,
111 pub transcript: Option<TranscriptHook>,
112 pub otlp: Option<OtlpHook>,
113 pub mocks: MocksConfig,
114 pub artifacts_dir: Option<PathBuf>,
115 pub signing: SigningPolicy,
116 pub components_dir: Option<PathBuf>,
117 pub components_map: HashMap<String, PathBuf>,
118 pub dist_offline: bool,
119 pub dist_cache_dir: Option<PathBuf>,
120 pub allow_missing_hash: bool,
121}
122
123impl Default for RunOptions {
124 fn default() -> Self {
125 desktop_defaults()
126 }
127}
128
129impl fmt::Debug for RunOptions {
130 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
131 f.debug_struct("RunOptions")
132 .field("profile", &self.profile)
133 .field("entry_flow", &self.entry_flow)
134 .field("input", &self.input)
135 .field("ctx", &self.ctx)
136 .field("transcript", &self.transcript.is_some())
137 .field("otlp", &self.otlp)
138 .field("mocks", &self.mocks)
139 .field("artifacts_dir", &self.artifacts_dir)
140 .field("signing", &self.signing)
141 .field("components_dir", &self.components_dir)
142 .field("components_map_len", &self.components_map.len())
143 .field("dist_offline", &self.dist_offline)
144 .field("dist_cache_dir", &self.dist_cache_dir)
145 .field("allow_missing_hash", &self.allow_missing_hash)
146 .finish()
147 }
148}
149
150#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
151pub enum SigningPolicy {
152 Strict,
153 DevOk,
154}
155
156#[derive(Clone, Debug)]
158pub struct Runner {
159 base: RunOptions,
160}
161
162impl Runner {
163 pub fn new() -> Self {
164 Self {
165 base: desktop_defaults(),
166 }
167 }
168
169 pub fn profile(mut self, profile: Profile) -> Self {
170 self.base.profile = profile;
171 self
172 }
173
174 pub fn with_mocks(mut self, mocks: MocksConfig) -> Self {
175 self.base.mocks = mocks;
176 self
177 }
178
179 pub fn configure(mut self, f: impl FnOnce(&mut RunOptions)) -> Self {
180 f(&mut self.base);
181 self
182 }
183
184 pub fn run_pack<P: AsRef<Path>>(&self, pack_path: P) -> Result<RunResult> {
185 run_pack_with_options(pack_path, self.base.clone())
186 }
187
188 pub fn run_pack_with<P: AsRef<Path>>(
189 &self,
190 pack_path: P,
191 f: impl FnOnce(&mut RunOptions),
192 ) -> Result<RunResult> {
193 let mut opts = self.base.clone();
194 f(&mut opts);
195 run_pack_with_options(pack_path, opts)
196 }
197}
198
199impl Default for Runner {
200 fn default() -> Self {
201 Self::new()
202 }
203}
204
205pub fn run_pack_with_options<P: AsRef<Path>>(pack_path: P, opts: RunOptions) -> Result<RunResult> {
207 let runtime = Runtime::new().context("failed to create tokio runtime")?;
208 runtime.block_on(run_pack_async(pack_path.as_ref(), opts))
209}
210
211pub fn desktop_defaults() -> RunOptions {
213 let otlp = std::env::var("OTLP_ENDPOINT")
214 .ok()
215 .map(|endpoint| OtlpHook {
216 endpoint,
217 headers: Vec::new(),
218 sample_all: true,
219 });
220 RunOptions {
221 profile: Profile::Dev(DevProfile::default()),
222 entry_flow: None,
223 input: json!({}),
224 ctx: TenantContext::default_local(),
225 transcript: None,
226 otlp,
227 mocks: MocksConfig::default(),
228 artifacts_dir: None,
229 signing: SigningPolicy::DevOk,
230 components_dir: None,
231 components_map: HashMap::new(),
232 dist_offline: false,
233 dist_cache_dir: None,
234 allow_missing_hash: false,
235 }
236}
237
238async fn run_pack_async(pack_path: &Path, opts: RunOptions) -> Result<RunResult> {
239 let pack_path = normalize_pack_path(pack_path)?;
240 let resolved_profile = resolve_profile(&opts.profile, &opts.ctx);
241 if let Some(otlp) = &opts.otlp {
242 apply_otlp_hook(otlp);
243 }
244
245 let directories = prepare_run_dirs(opts.artifacts_dir.clone())?;
246 info!(run_dir = %directories.root.display(), "prepared desktop run directory");
247
248 let mock_layer = Arc::new(MockLayer::new(opts.mocks.clone(), &directories.root)?);
249
250 let recorder = Arc::new(RunRecorder::new(
251 directories.clone(),
252 &resolved_profile,
253 None,
254 PackMetadata::fallback(&pack_path),
255 opts.transcript.clone(),
256 )?);
257
258 let mock_sink: Arc<dyn MockEventSink> = recorder.clone();
259 mock_layer.register_sink(mock_sink);
260
261 if pack_path.is_file() {
262 match open_pack(&pack_path, to_reader_policy(opts.signing)) {
263 Ok(load) => {
264 let meta = &load.manifest.meta;
265 recorder.update_pack_metadata(PackMetadata {
266 pack_id: meta.pack_id.clone(),
267 version: meta.version.to_string(),
268 entry_flows: meta.entry_flows.clone(),
269 secret_requirements: Vec::new(),
270 });
271 }
272 Err(err) => {
273 recorder.record_verify_event("error", &err.message)?;
274 if opts.signing == SigningPolicy::DevOk && is_signature_error(&err.message) {
275 warn!(error = %err.message, "continuing despite signature error (dev policy)");
276 } else {
277 return Err(anyhow!("pack verification failed: {}", err.message));
278 }
279 }
280 }
281 } else {
282 tracing::debug!(
283 path = %pack_path.display(),
284 "skipping pack verification for directory input"
285 );
286 }
287
288 let host_config = Arc::new(build_host_config(&resolved_profile, &directories));
289 let mut component_resolution = ComponentResolution::default();
290 if let Some(dir) = opts.components_dir.clone() {
291 component_resolution.materialized_root = Some(dir);
292 } else if pack_path.is_dir() {
293 component_resolution.materialized_root = Some(pack_path.clone());
294 }
295 component_resolution.overrides = opts.components_map.clone();
296 component_resolution.dist_offline = opts.dist_offline;
297 component_resolution.dist_cache_dir = opts.dist_cache_dir.clone();
298 component_resolution.allow_missing_hash = opts.allow_missing_hash;
299 let archive_source = if pack_path
300 .extension()
301 .and_then(|ext| ext.to_str())
302 .map(|ext| ext.eq_ignore_ascii_case("gtpack"))
303 .unwrap_or(false)
304 {
305 Some(&pack_path)
306 } else {
307 None
308 };
309
310 let session_store = new_session_store();
311 let state_store = new_state_store();
312 let secrets_manager = default_manager().context("failed to initialise secrets backend")?;
313 let pack = Arc::new(
314 PackRuntime::load(
315 &pack_path,
316 Arc::clone(&host_config),
317 Some(Arc::clone(&mock_layer)),
318 archive_source.map(|p| p as &Path),
319 Some(Arc::clone(&session_store)),
320 Some(Arc::clone(&state_store)),
321 Arc::new(RunnerWasiPolicy::default()),
322 secrets_manager,
323 host_config.oauth_broker_config(),
324 false,
325 component_resolution,
326 )
327 .await
328 .with_context(|| format!("failed to load pack {}", pack_path.display()))?,
329 );
330 recorder.update_pack_metadata(pack.metadata().clone());
331
332 let flows = pack
333 .list_flows()
334 .await
335 .context("failed to enumerate flows")?;
336 let entry_flow_id = resolve_entry_flow(opts.entry_flow.clone(), pack.metadata(), &flows)?;
337 recorder.set_flow_id(&entry_flow_id);
338
339 let engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&host_config))
340 .await
341 .context("failed to prime flow engine")?;
342
343 let started_at = OffsetDateTime::now_utc();
344 let tenant_str = host_config.tenant.clone();
345 let session_id_owned = resolved_profile.session_id.clone();
346 let provider_id_owned = resolved_profile.provider_id.clone();
347 let recorder_ref: &RunRecorder = &recorder;
348 let mock_ref: &MockLayer = &mock_layer;
349 let ctx = FlowContext {
350 tenant: &tenant_str,
351 pack_id: pack.metadata().pack_id.as_str(),
352 flow_id: &entry_flow_id,
353 node_id: None,
354 tool: None,
355 action: Some("run_pack"),
356 session_id: Some(session_id_owned.as_str()),
357 provider_id: Some(provider_id_owned.as_str()),
358 retry_config: host_config.retry_config().into(),
359 observer: Some(recorder_ref),
360 mocks: Some(mock_ref),
361 };
362
363 let execution = engine.execute(ctx, opts.input.clone()).await;
364 let finished_at = OffsetDateTime::now_utc();
365
366 let status = match execution {
367 Ok(result) => match result.status {
368 greentic_runner_host::runner::engine::FlowStatus::Completed => RunCompletion::Ok,
369 greentic_runner_host::runner::engine::FlowStatus::Waiting(wait) => {
370 let reason = wait
371 .reason
372 .unwrap_or_else(|| "flow paused unexpectedly".to_string());
373 RunCompletion::Err(anyhow::anyhow!(reason))
374 }
375 },
376 Err(err) => RunCompletion::Err(err),
377 };
378
379 let result = recorder.finalise(status, started_at, finished_at)?;
380
381 let run_json_path = directories.root.join("run.json");
382 fs::write(&run_json_path, serde_json::to_vec_pretty(&result)?)
383 .with_context(|| format!("failed to write run summary {}", run_json_path.display()))?;
384
385 Ok(result)
386}
387
388fn apply_otlp_hook(hook: &OtlpHook) {
389 info!(
390 endpoint = %hook.endpoint,
391 sample_all = hook.sample_all,
392 headers = %hook.headers.len(),
393 "OTLP hook requested (set OTEL_* env vars before invoking run_pack)"
394 );
395}
396
397fn normalize_pack_path(path: &Path) -> Result<PathBuf> {
398 if path.is_absolute() {
399 let parent = path
400 .parent()
401 .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
402 let root = parent
403 .canonicalize()
404 .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
405 let file = path
406 .file_name()
407 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
408 return normalize_under_root(&root, Path::new(file));
409 }
410
411 let cwd = std::env::current_dir().context("failed to resolve current directory")?;
412 let base = if let Some(parent) = path.parent() {
413 cwd.join(parent)
414 } else {
415 cwd
416 };
417 let root = base
418 .canonicalize()
419 .with_context(|| format!("failed to canonicalize {}", base.display()))?;
420 let file = path
421 .file_name()
422 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
423 normalize_under_root(&root, Path::new(file))
424}
425
426fn prepare_run_dirs(root_override: Option<PathBuf>) -> Result<RunDirectories> {
427 let root = if let Some(dir) = root_override {
428 dir
429 } else {
430 let timestamp = OffsetDateTime::now_utc()
431 .format(&Rfc3339)
432 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
433 .replace(':', "-");
434 let short_id = Uuid::new_v4()
435 .to_string()
436 .chars()
437 .take(6)
438 .collect::<String>();
439 PathBuf::from(".greentic")
440 .join("runs")
441 .join(format!("{timestamp}_{short_id}"))
442 };
443
444 fs::create_dir_all(&root).with_context(|| format!("failed to create {}", root.display()))?;
445 let logs = root.join("logs");
446 let resolved = root.join("resolved_config");
447 fs::create_dir_all(&logs).with_context(|| format!("failed to create {}", logs.display()))?;
448 fs::create_dir_all(&resolved)
449 .with_context(|| format!("failed to create {}", resolved.display()))?;
450
451 Ok(RunDirectories {
452 root,
453 logs,
454 resolved,
455 })
456}
457
458fn resolve_entry_flow(
459 override_id: Option<String>,
460 metadata: &PackMetadata,
461 flows: &[FlowDescriptor],
462) -> Result<String> {
463 if let Some(flow) = override_id {
464 return Ok(flow);
465 }
466 if let Some(first) = metadata.entry_flows.first() {
467 return Ok(first.clone());
468 }
469 flows
470 .first()
471 .map(|f| f.id.clone())
472 .ok_or_else(|| anyhow!("pack does not declare any flows"))
473}
474
475fn is_signature_error(message: &str) -> bool {
476 message.to_ascii_lowercase().contains("signature")
477}
478
479fn to_reader_policy(policy: SigningPolicy) -> greentic_pack::reader::SigningPolicy {
480 match policy {
481 SigningPolicy::Strict => greentic_pack::reader::SigningPolicy::Strict,
482 SigningPolicy::DevOk => greentic_pack::reader::SigningPolicy::DevOk,
483 }
484}
485
486fn resolve_profile(profile: &Profile, ctx: &TenantContext) -> ResolvedProfile {
487 match profile {
488 Profile::Dev(dev) => ResolvedProfile {
489 tenant_id: ctx
490 .tenant_id
491 .clone()
492 .unwrap_or_else(|| dev.tenant_id.clone()),
493 team_id: ctx.team_id.clone().unwrap_or_else(|| dev.team_id.clone()),
494 user_id: ctx.user_id.clone().unwrap_or_else(|| dev.user_id.clone()),
495 session_id: ctx
496 .session_id
497 .clone()
498 .unwrap_or_else(|| Uuid::new_v4().to_string()),
499 provider_id: PROVIDER_ID_DEV.to_string(),
500 max_node_wall_time_ms: dev.max_node_wall_time_ms,
501 max_run_wall_time_ms: dev.max_run_wall_time_ms,
502 },
503 }
504}
505
506fn build_host_config(profile: &ResolvedProfile, dirs: &RunDirectories) -> HostConfig {
507 HostConfig {
508 tenant: profile.tenant_id.clone(),
509 bindings_path: dirs.resolved.join("dev.bindings.yaml"),
510 flow_type_bindings: HashMap::new(),
511 rate_limits: RateLimits::default(),
512 retry: FlowRetryConfig::default(),
513 http_enabled: false,
514 secrets_policy: SecretsPolicy::allow_all(),
515 state_store_policy: StateStorePolicy::default(),
516 webhook_policy: WebhookPolicy::default(),
517 timers: Vec::new(),
518 oauth: None,
519 mocks: None,
520 pack_bindings: Vec::new(),
521 env_passthrough: Vec::new(),
522 trace: TraceConfig::from_env(),
523 validation: ValidationConfig::from_env(),
524 }
525}
526
527#[allow(dead_code)]
528#[derive(Clone, Debug)]
529struct ResolvedProfile {
530 tenant_id: String,
531 team_id: String,
532 user_id: String,
533 session_id: String,
534 provider_id: String,
535 max_node_wall_time_ms: u64,
536 max_run_wall_time_ms: u64,
537}
538
539#[derive(Clone, Debug, Serialize, Deserialize)]
540pub struct RunResult {
541 pub session_id: String,
542 pub pack_id: String,
543 pub pack_version: String,
544 pub flow_id: String,
545 pub started_at_utc: String,
546 pub finished_at_utc: String,
547 pub status: RunStatus,
548 pub error: Option<String>,
549 pub node_summaries: Vec<NodeSummary>,
550 pub failures: BTreeMap<String, NodeFailure>,
551 pub artifacts_dir: PathBuf,
552}
553
554#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
555pub enum RunStatus {
556 Success,
557 PartialFailure,
558 Failure,
559}
560
561#[derive(Clone, Debug, Serialize, Deserialize)]
562pub struct NodeSummary {
563 pub node_id: String,
564 pub component: String,
565 pub status: NodeStatus,
566 pub duration_ms: u64,
567}
568
569#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
570pub enum NodeStatus {
571 Ok,
572 Skipped,
573 Error,
574}
575
576#[derive(Clone, Debug, Serialize, Deserialize)]
577pub struct NodeFailure {
578 pub code: String,
579 pub message: String,
580 pub details: Value,
581 pub transcript_offsets: (u64, u64),
582 pub log_paths: Vec<PathBuf>,
583}
584
585#[derive(Clone)]
586struct RunDirectories {
587 root: PathBuf,
588 logs: PathBuf,
589 resolved: PathBuf,
590}
591
592enum RunCompletion {
593 Ok,
594 Err(anyhow::Error),
595}
596
597struct RunRecorder {
598 directories: RunDirectories,
599 profile: ResolvedProfile,
600 flow_id: Mutex<String>,
601 pack_meta: Mutex<PackMetadata>,
602 transcript: Mutex<TranscriptWriter>,
603 state: Mutex<RunRecorderState>,
604}
605
606impl RunRecorder {
607 fn new(
608 dirs: RunDirectories,
609 profile: &ResolvedProfile,
610 flow_id: Option<String>,
611 pack_meta: PackMetadata,
612 hook: Option<TranscriptHook>,
613 ) -> Result<Self> {
614 let transcript_path = dirs.root.join("transcript.jsonl");
615 let file = File::create(&transcript_path)
616 .with_context(|| format!("failed to open {}", transcript_path.display()))?;
617 Ok(Self {
618 directories: dirs,
619 profile: profile.clone(),
620 flow_id: Mutex::new(flow_id.unwrap_or_else(|| "unknown".into())),
621 pack_meta: Mutex::new(pack_meta),
622 transcript: Mutex::new(TranscriptWriter::new(BufWriter::new(file), hook)),
623 state: Mutex::new(RunRecorderState::default()),
624 })
625 }
626
627 fn finalise(
628 &self,
629 completion: RunCompletion,
630 started_at: OffsetDateTime,
631 finished_at: OffsetDateTime,
632 ) -> Result<RunResult> {
633 let status = match &completion {
634 RunCompletion::Ok => RunStatus::Success,
635 RunCompletion::Err(_) => RunStatus::Failure,
636 };
637
638 let mut runtime_error = None;
639 if let RunCompletion::Err(err) = completion {
640 warn!(error = %err, "pack execution failed");
641 eprintln!("pack execution failed: {err}");
642 runtime_error = Some(err.to_string());
643 }
644
645 let started = started_at
646 .format(&Rfc3339)
647 .unwrap_or_else(|_| started_at.to_string());
648 let finished = finished_at
649 .format(&Rfc3339)
650 .unwrap_or_else(|_| finished_at.to_string());
651
652 let state = self.state.lock();
653 let mut summaries = Vec::new();
654 let mut failures = BTreeMap::new();
655 for node_id in &state.order {
656 if let Some(record) = state.nodes.get(node_id) {
657 let duration = record.duration_ms.unwrap_or(0);
658 summaries.push(NodeSummary {
659 node_id: node_id.clone(),
660 component: record.component.clone(),
661 status: record.status.clone(),
662 duration_ms: duration,
663 });
664 if record.status == NodeStatus::Error {
665 let start_offset = record.transcript_start.unwrap_or(0);
666 let err_offset = record.transcript_error.unwrap_or(start_offset);
667 failures.insert(
668 node_id.clone(),
669 NodeFailure {
670 code: record
671 .failure_code
672 .clone()
673 .unwrap_or_else(|| "component-failed".into()),
674 message: record
675 .failure_message
676 .clone()
677 .unwrap_or_else(|| "node failed".into()),
678 details: record
679 .failure_details
680 .clone()
681 .unwrap_or_else(|| json!({ "node": node_id })),
682 transcript_offsets: (start_offset, err_offset),
683 log_paths: record.log_paths.clone(),
684 },
685 );
686 }
687 }
688 }
689
690 let mut final_status = status;
691 if final_status == RunStatus::Success && !failures.is_empty() {
692 final_status = RunStatus::PartialFailure;
693 }
694
695 if let Some(error) = &runtime_error {
696 failures.insert(
697 "_runtime".into(),
698 NodeFailure {
699 code: "runtime-error".into(),
700 message: error.clone(),
701 details: json!({ "stage": "execute" }),
702 transcript_offsets: (0, 0),
703 log_paths: Vec::new(),
704 },
705 );
706 }
707
708 let pack_meta = self.pack_meta.lock();
709 let flow_id = self.flow_id.lock().clone();
710 Ok(RunResult {
711 session_id: self.profile.session_id.clone(),
712 pack_id: pack_meta.pack_id.clone(),
713 pack_version: pack_meta.version.clone(),
714 flow_id,
715 started_at_utc: started,
716 finished_at_utc: finished,
717 status: final_status,
718 error: runtime_error,
719 node_summaries: summaries,
720 failures,
721 artifacts_dir: self.directories.root.clone(),
722 })
723 }
724
725 fn set_flow_id(&self, flow_id: &str) {
726 *self.flow_id.lock() = flow_id.to_string();
727 }
728
729 fn update_pack_metadata(&self, meta: PackMetadata) {
730 *self.pack_meta.lock() = meta;
731 }
732
733 fn current_flow_id(&self) -> String {
734 self.flow_id.lock().clone()
735 }
736
737 fn record_verify_event(&self, status: &str, message: &str) -> Result<()> {
738 let timestamp = OffsetDateTime::now_utc();
739 let event = json!({
740 "ts": timestamp
741 .format(&Rfc3339)
742 .unwrap_or_else(|_| timestamp.to_string()),
743 "session_id": self.profile.session_id,
744 "flow_id": self.current_flow_id(),
745 "node_id": Value::Null,
746 "component": "verify.pack",
747 "phase": "verify",
748 "status": status,
749 "inputs": Value::Null,
750 "outputs": Value::Null,
751 "error": json!({ "message": message }),
752 "metrics": Value::Null,
753 "schema_id": Value::Null,
754 "defaults_applied": Value::Null,
755 "redactions": Value::Array(Vec::new()),
756 });
757 self.transcript.lock().write(&event).map(|_| ())
758 }
759
760 fn write_mock_event(&self, capability: &str, provider: &str, payload: Value) -> Result<()> {
761 let timestamp = OffsetDateTime::now_utc();
762 let event = json!({
763 "ts": timestamp
764 .format(&Rfc3339)
765 .unwrap_or_else(|_| timestamp.to_string()),
766 "session_id": self.profile.session_id,
767 "flow_id": self.current_flow_id(),
768 "node_id": Value::Null,
769 "component": format!("mock::{capability}"),
770 "phase": "mock",
771 "status": "ok",
772 "inputs": json!({ "capability": capability, "provider": provider }),
773 "outputs": payload,
774 "error": Value::Null,
775 "metrics": Value::Null,
776 "schema_id": Value::Null,
777 "defaults_applied": Value::Null,
778 "redactions": Value::Array(Vec::new()),
779 });
780 self.transcript.lock().write(&event).map(|_| ())
781 }
782
783 fn handle_node_start(&self, event: &NodeEvent<'_>) -> Result<()> {
784 let timestamp = OffsetDateTime::now_utc();
785 let (redacted_payload, redactions) = redact_value(event.payload, "$.inputs.payload");
786 let inputs = json!({
787 "payload": redacted_payload,
788 "context": {
789 "tenant_id": self.profile.tenant_id.as_str(),
790 "team_id": self.profile.team_id.as_str(),
791 "user_id": self.profile.user_id.as_str(),
792 }
793 });
794 let flow_id = self.current_flow_id();
795 let event_json = build_transcript_event(TranscriptEventArgs {
796 profile: &self.profile,
797 flow_id: &flow_id,
798 node_id: event.node_id,
799 component: &event.node.component,
800 phase: "start",
801 status: "ok",
802 timestamp,
803 inputs,
804 outputs: Value::Null,
805 error: Value::Null,
806 redactions,
807 });
808 let (start_offset, _) = self.transcript.lock().write(&event_json)?;
809
810 let mut state = self.state.lock();
811 let node_key = event.node_id.to_string();
812 if !state.order.iter().any(|id| id == &node_key) {
813 state.order.push(node_key.clone());
814 }
815 let entry = state.nodes.entry(node_key).or_insert_with(|| {
816 NodeExecutionRecord::new(event.node.component.clone(), &self.directories)
817 });
818 entry.start_instant = Some(Instant::now());
819 entry.status = NodeStatus::Ok;
820 entry.transcript_start = Some(start_offset);
821 Ok(())
822 }
823
824 fn handle_node_end(&self, event: &NodeEvent<'_>, output: &Value) -> Result<()> {
825 let timestamp = OffsetDateTime::now_utc();
826 let (redacted_output, redactions) = redact_value(output, "$.outputs");
827 let flow_id = self.current_flow_id();
828 let event_json = build_transcript_event(TranscriptEventArgs {
829 profile: &self.profile,
830 flow_id: &flow_id,
831 node_id: event.node_id,
832 component: &event.node.component,
833 phase: "end",
834 status: "ok",
835 timestamp,
836 inputs: Value::Null,
837 outputs: redacted_output,
838 error: Value::Null,
839 redactions,
840 });
841 self.transcript.lock().write(&event_json)?;
842
843 let mut state = self.state.lock();
844 if let Some(entry) = state.nodes.get_mut(event.node_id)
845 && let Some(started) = entry.start_instant.take()
846 {
847 entry.duration_ms = Some(started.elapsed().as_millis() as u64);
848 }
849 Ok(())
850 }
851
852 fn handle_node_error(
853 &self,
854 event: &NodeEvent<'_>,
855 error: &dyn std::error::Error,
856 ) -> Result<()> {
857 let timestamp = OffsetDateTime::now_utc();
858 let error_message = error.to_string();
859 let error_json = json!({
860 "code": "component-failed",
861 "message": error_message,
862 "details": {
863 "node": event.node_id,
864 }
865 });
866 let flow_id = self.current_flow_id();
867 let event_json = build_transcript_event(TranscriptEventArgs {
868 profile: &self.profile,
869 flow_id: &flow_id,
870 node_id: event.node_id,
871 component: &event.node.component,
872 phase: "error",
873 status: "error",
874 timestamp,
875 inputs: Value::Null,
876 outputs: Value::Null,
877 error: error_json.clone(),
878 redactions: Vec::new(),
879 });
880 let (_, end_offset) = self.transcript.lock().write(&event_json)?;
881
882 let mut state = self.state.lock();
883 if let Some(entry) = state.nodes.get_mut(event.node_id) {
884 entry.status = NodeStatus::Error;
885 if let Some(started) = entry.start_instant.take() {
886 entry.duration_ms = Some(started.elapsed().as_millis() as u64);
887 }
888 entry.transcript_error = Some(end_offset);
889 entry.failure_code = Some("component-failed".to_string());
890 entry.failure_message = Some(error_message.clone());
891 entry.failure_details = Some(error_json);
892 let log_path = self
893 .directories
894 .logs
895 .join(format!("{}.stderr.log", sanitize_id(event.node_id)));
896 if let Ok(mut file) = File::create(&log_path) {
897 let _ = writeln!(file, "{error_message}");
898 }
899 entry.log_paths.push(log_path);
900 }
901 Ok(())
902 }
903}
904
905impl ExecutionObserver for RunRecorder {
906 fn on_node_start(&self, event: &NodeEvent<'_>) {
907 if let Err(err) = self.handle_node_start(event) {
908 warn!(node = event.node_id, error = %err, "failed to record node start");
909 }
910 }
911
912 fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value) {
913 if let Err(err) = self.handle_node_end(event, output) {
914 warn!(node = event.node_id, error = %err, "failed to record node end");
915 }
916 }
917
918 fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn std::error::Error) {
919 if let Err(err) = self.handle_node_error(event, error) {
920 warn!(node = event.node_id, error = %err, "failed to record node error");
921 }
922 }
923}
924
925impl MockEventSink for RunRecorder {
926 fn on_mock_event(&self, capability: &str, provider: &str, payload: &Value) {
927 if let Err(err) = self.write_mock_event(capability, provider, payload.clone()) {
928 warn!(?capability, ?provider, error = %err, "failed to record mock event");
929 }
930 }
931}
932
933#[derive(Default)]
934struct RunRecorderState {
935 nodes: BTreeMap<String, NodeExecutionRecord>,
936 order: Vec<String>,
937}
938
939#[derive(Clone)]
940struct NodeExecutionRecord {
941 component: String,
942 status: NodeStatus,
943 duration_ms: Option<u64>,
944 transcript_start: Option<u64>,
945 transcript_error: Option<u64>,
946 log_paths: Vec<PathBuf>,
947 failure_code: Option<String>,
948 failure_message: Option<String>,
949 failure_details: Option<Value>,
950 start_instant: Option<Instant>,
951}
952
953impl NodeExecutionRecord {
954 fn new(component: String, _dirs: &RunDirectories) -> Self {
955 Self {
956 component,
957 status: NodeStatus::Ok,
958 duration_ms: None,
959 transcript_start: None,
960 transcript_error: None,
961 log_paths: Vec::new(),
962 failure_code: None,
963 failure_message: None,
964 failure_details: None,
965 start_instant: None,
966 }
967 }
968}
969
970struct TranscriptWriter {
971 writer: BufWriter<File>,
972 offset: u64,
973 hook: Option<TranscriptHook>,
974}
975
976impl TranscriptWriter {
977 fn new(writer: BufWriter<File>, hook: Option<TranscriptHook>) -> Self {
978 Self {
979 writer,
980 offset: 0,
981 hook,
982 }
983 }
984
985 fn write(&mut self, value: &Value) -> Result<(u64, u64)> {
986 let line = serde_json::to_vec(value)?;
987 let start = self.offset;
988 self.writer.write_all(&line)?;
989 self.writer.write_all(b"\n")?;
990 self.writer.flush()?;
991 self.offset += line.len() as u64 + 1;
992 if let Some(hook) = &self.hook {
993 hook(value);
994 }
995 Ok((start, self.offset))
996 }
997}
998
999struct TranscriptEventArgs<'a> {
1000 profile: &'a ResolvedProfile,
1001 flow_id: &'a str,
1002 node_id: &'a str,
1003 component: &'a str,
1004 phase: &'a str,
1005 status: &'a str,
1006 timestamp: OffsetDateTime,
1007 inputs: Value,
1008 outputs: Value,
1009 error: Value,
1010 redactions: Vec<String>,
1011}
1012
1013fn build_transcript_event(args: TranscriptEventArgs<'_>) -> Value {
1014 let ts = args
1015 .timestamp
1016 .format(&Rfc3339)
1017 .unwrap_or_else(|_| args.timestamp.to_string());
1018 json!({
1019 "ts": ts,
1020 "session_id": args.profile.session_id.as_str(),
1021 "flow_id": args.flow_id,
1022 "node_id": args.node_id,
1023 "component": args.component,
1024 "phase": args.phase,
1025 "status": args.status,
1026 "inputs": args.inputs,
1027 "outputs": args.outputs,
1028 "error": args.error,
1029 "metrics": {
1030 "duration_ms": null,
1031 "cpu_time_ms": null,
1032 "mem_peak_bytes": null,
1033 },
1034 "schema_id": Value::Null,
1035 "defaults_applied": Value::Array(Vec::new()),
1036 "redactions": args.redactions,
1037 })
1038}
1039
1040fn redact_value(value: &Value, base: &str) -> (Value, Vec<String>) {
1041 let mut paths = Vec::new();
1042 let redacted = redact_recursive(value, base, &mut paths);
1043 (redacted, paths)
1044}
1045
1046fn redact_recursive(value: &Value, path: &str, acc: &mut Vec<String>) -> Value {
1047 match value {
1048 Value::Object(map) => {
1049 let mut new_map = JsonMap::new();
1050 for (key, val) in map {
1051 let child_path = format!("{path}.{key}");
1052 if is_sensitive_key(key) {
1053 acc.push(child_path);
1054 new_map.insert(key.clone(), Value::String("__REDACTED__".into()));
1055 } else {
1056 new_map.insert(key.clone(), redact_recursive(val, &child_path, acc));
1057 }
1058 }
1059 Value::Object(new_map)
1060 }
1061 Value::Array(items) => {
1062 let mut new_items = Vec::new();
1063 for (idx, item) in items.iter().enumerate() {
1064 let child_path = format!("{path}[{idx}]");
1065 new_items.push(redact_recursive(item, &child_path, acc));
1066 }
1067 Value::Array(new_items)
1068 }
1069 other => other.clone(),
1070 }
1071}
1072
1073fn is_sensitive_key(key: &str) -> bool {
1074 let lower = key.to_ascii_lowercase();
1075 const MARKERS: [&str; 5] = ["secret", "token", "password", "authorization", "cookie"];
1076 MARKERS.iter().any(|marker| lower.contains(marker))
1077}
1078
1079fn sanitize_id(value: &str) -> String {
1080 value
1081 .chars()
1082 .map(|ch| if ch.is_ascii_alphanumeric() { ch } else { '-' })
1083 .collect()
1084}