1use anyhow::{Context, Result, anyhow};
2use greentic_pack::reader::{PackLoad, open_pack};
3use greentic_runner_host::config::{
4 HostConfig, McpConfig, McpRetryConfig, RateLimits, SecretsPolicy, WebhookPolicy,
5};
6use greentic_runner_host::pack::{FlowDescriptor, PackMetadata, PackRuntime};
7use greentic_runner_host::runner::engine::{ExecutionObserver, FlowContext, FlowEngine, NodeEvent};
8pub use greentic_runner_host::runner::mocks::{
9 HttpMock, HttpMockMode, KvMock, MocksConfig, SecretsMock, TelemetryMock, TimeMock, ToolsMock,
10};
11use greentic_runner_host::runner::mocks::{MockEventSink, MockLayer};
12use parking_lot::Mutex;
13use serde::{Deserialize, Serialize};
14use serde_json::{Map as JsonMap, Value, json};
15use serde_yaml_bw::{Mapping as YamlMapping, Value as YamlValue};
16use std::collections::{BTreeMap, HashMap};
17use std::fmt;
18use std::fs::{self, File};
19use std::io::{BufWriter, Write};
20use std::path::{Path, PathBuf};
21use std::sync::Arc;
22use std::time::Instant;
23use time::OffsetDateTime;
24use time::format_description::well_known::Rfc3339;
25use tokio::runtime::Runtime;
26use tracing::{info, warn};
27use uuid::Uuid;
28use zip::ZipArchive;
29
/// Provider identifier stamped on every run executed under the dev profile.
const PROVIDER_ID_DEV: &str = "greentic-dev";

/// Callback invoked with every transcript event (one JSONL record) as it is written.
pub type TranscriptHook = Arc<dyn Fn(&Value) + Send + Sync>;
34
/// Request to forward traces to an OTLP collector.
///
/// NOTE: `apply_otlp_hook` only logs this request; actual exporter wiring is
/// expected to come from `OTEL_*` environment variables set by the caller.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct OtlpHook {
    /// Collector endpoint URL.
    pub endpoint: String,
    /// Extra request headers (e.g. auth) as name/value pairs.
    #[serde(default)]
    pub headers: Vec<(String, String)>,
    /// When true, request sampling of every span.
    #[serde(default)]
    pub sample_all: bool,
}
44
/// Execution profile selecting identity defaults and wall-time limits.
/// Non-exhaustive so further profiles can be added without breaking callers.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum Profile {
    /// Local development profile.
    Dev(DevProfile),
}
51
52impl Default for Profile {
53 fn default() -> Self {
54 Self::Dev(DevProfile::default())
55 }
56}
57
/// Settings for the local development profile.
#[derive(Clone, Debug)]
pub struct DevProfile {
    /// Fallback tenant id used when `TenantContext` does not supply one.
    pub tenant_id: String,
    /// Fallback team id.
    pub team_id: String,
    /// Fallback user id.
    pub user_id: String,
    /// Wall-clock budget for a single node, in milliseconds.
    pub max_node_wall_time_ms: u64,
    /// Wall-clock budget for the whole run, in milliseconds.
    pub max_run_wall_time_ms: u64,
}
67
68impl Default for DevProfile {
69 fn default() -> Self {
70 Self {
71 tenant_id: "local-dev".to_string(),
72 team_id: "default".to_string(),
73 user_id: "developer".to_string(),
74 max_node_wall_time_ms: 30_000,
75 max_run_wall_time_ms: 600_000,
76 }
77 }
78}
79
/// Per-run identity overrides; `None` fields fall back to the active
/// profile's defaults (see `resolve_profile`).
#[derive(Clone, Debug, Default)]
pub struct TenantContext {
    pub tenant_id: Option<String>,
    pub team_id: Option<String>,
    pub user_id: Option<String>,
    /// When `None`, a fresh UUID session id is generated for the run.
    pub session_id: Option<String>,
}
88
89impl TenantContext {
90 pub fn default_local() -> Self {
91 Self {
92 tenant_id: Some("local-dev".into()),
93 team_id: Some("default".into()),
94 user_id: Some("developer".into()),
95 session_id: None,
96 }
97 }
98}
99
/// All knobs for a single pack run.
#[derive(Clone)]
pub struct RunOptions {
    /// Profile supplying identity defaults and wall-time limits.
    pub profile: Profile,
    /// Flow to start; `None` resolves via pack metadata (see `resolve_entry_flow`).
    pub entry_flow: Option<String>,
    /// JSON payload handed to the entry flow.
    pub input: Value,
    /// Identity overrides applied on top of the profile.
    pub ctx: TenantContext,
    /// Optional callback receiving each transcript event as it is written.
    pub transcript: Option<TranscriptHook>,
    /// Optional OTLP trace-forwarding request (logged only; see `apply_otlp_hook`).
    pub otlp: Option<OtlpHook>,
    /// Mock configuration for host capabilities.
    pub mocks: MocksConfig,
    /// Artifact directory; `None` creates a timestamped dir under `.greentic/runs`.
    pub artifacts_dir: Option<PathBuf>,
    /// Pack signature verification policy.
    pub signing: SigningPolicy,
}
113
impl Default for RunOptions {
    /// Same defaults the desktop runner ships with.
    fn default() -> Self {
        desktop_defaults()
    }
}
119
120impl fmt::Debug for RunOptions {
121 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
122 f.debug_struct("RunOptions")
123 .field("profile", &self.profile)
124 .field("entry_flow", &self.entry_flow)
125 .field("input", &self.input)
126 .field("ctx", &self.ctx)
127 .field("transcript", &self.transcript.is_some())
128 .field("otlp", &self.otlp)
129 .field("mocks", &self.mocks)
130 .field("artifacts_dir", &self.artifacts_dir)
131 .field("signing", &self.signing)
132 .finish()
133 }
134}
135
/// Pack signature verification policy.
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum SigningPolicy {
    /// Any verification failure aborts the run.
    Strict,
    /// Signature errors are logged and tolerated (development use only).
    DevOk,
}
141
/// Reusable builder-style facade over `run_pack_with_options`.
#[derive(Clone, Debug)]
pub struct Runner {
    /// Base options cloned for every run.
    base: RunOptions,
}
147
148impl Runner {
149 pub fn new() -> Self {
150 Self {
151 base: desktop_defaults(),
152 }
153 }
154
155 pub fn profile(mut self, profile: Profile) -> Self {
156 self.base.profile = profile;
157 self
158 }
159
160 pub fn with_mocks(mut self, mocks: MocksConfig) -> Self {
161 self.base.mocks = mocks;
162 self
163 }
164
165 pub fn configure(mut self, f: impl FnOnce(&mut RunOptions)) -> Self {
166 f(&mut self.base);
167 self
168 }
169
170 pub fn run_pack<P: AsRef<Path>>(&self, pack_path: P) -> Result<RunResult> {
171 run_pack_with_options(pack_path, self.base.clone())
172 }
173
174 pub fn run_pack_with<P: AsRef<Path>>(
175 &self,
176 pack_path: P,
177 f: impl FnOnce(&mut RunOptions),
178 ) -> Result<RunResult> {
179 let mut opts = self.base.clone();
180 f(&mut opts);
181 run_pack_with_options(pack_path, opts)
182 }
183}
184
impl Default for Runner {
    /// Equivalent to `Runner::new()`.
    fn default() -> Self {
        Self::new()
    }
}
190
191pub fn run_pack_with_options<P: AsRef<Path>>(pack_path: P, opts: RunOptions) -> Result<RunResult> {
193 let runtime = Runtime::new().context("failed to create tokio runtime")?;
194 runtime.block_on(run_pack_async(pack_path.as_ref(), opts))
195}
196
197pub fn desktop_defaults() -> RunOptions {
199 let otlp = std::env::var("OTLP_ENDPOINT")
200 .ok()
201 .map(|endpoint| OtlpHook {
202 endpoint,
203 headers: Vec::new(),
204 sample_all: true,
205 });
206 RunOptions {
207 profile: Profile::Dev(DevProfile::default()),
208 entry_flow: None,
209 input: json!({}),
210 ctx: TenantContext::default_local(),
211 transcript: None,
212 otlp,
213 mocks: MocksConfig::default(),
214 artifacts_dir: None,
215 signing: SigningPolicy::DevOk,
216 }
217}
218
/// Core async implementation of a desktop pack run.
///
/// Pipeline: resolve the effective profile → prepare the artifact directory →
/// install mocks and the transcript recorder → verify/open the pack →
/// extract the wasm component → execute the entry flow → finalise and write
/// `run.json` into the run directory.
async fn run_pack_async(pack_path: &Path, opts: RunOptions) -> Result<RunResult> {
    let resolved_profile = resolve_profile(&opts.profile, &opts.ctx);
    if let Some(otlp) = &opts.otlp {
        // Logging-only acknowledgement; see `apply_otlp_hook`.
        apply_otlp_hook(otlp);
    }

    let directories = prepare_run_dirs(opts.artifacts_dir.clone())?;
    info!(run_dir = %directories.root.display(), "prepared desktop run directory");

    let mock_layer = Arc::new(MockLayer::new(opts.mocks.clone(), &directories.root)?);

    // Start with fallback metadata derived from the path; upgraded below once
    // the manifest (and later the runtime) provide richer information.
    let recorder = Arc::new(RunRecorder::new(
        directories.clone(),
        &resolved_profile,
        None,
        PackMetadata::fallback(pack_path),
        opts.transcript.clone(),
    )?);

    // Mock hits are mirrored into the transcript via the recorder.
    let mock_sink: Arc<dyn MockEventSink> = recorder.clone();
    mock_layer.register_sink(mock_sink);

    let mut pack_load: Option<PackLoad> = None;
    match open_pack(pack_path, to_reader_policy(opts.signing)) {
        Ok(load) => {
            recorder.update_pack_metadata(PackMetadata::from_manifest(&load.manifest));
            pack_load = Some(load);
        }
        Err(err) => {
            // Always record the verification failure in the transcript, then
            // either tolerate it (DevOk + signature-class error) or abort.
            recorder.record_verify_event("error", &err.message)?;
            if opts.signing == SigningPolicy::DevOk && is_signature_error(&err.message) {
                warn!(error = %err.message, "continuing despite signature error (dev policy)");
            } else {
                return Err(anyhow!("pack verification failed: {}", err.message));
            }
        }
    }

    let host_config = Arc::new(build_host_config(&resolved_profile, &directories));
    let component_artifact =
        resolve_component_artifact(pack_path, pack_load.as_ref(), &directories)?;
    // Only .gtpack archives carry auxiliary resources the runtime may need.
    let archive_source = if pack_path
        .extension()
        .and_then(|ext| ext.to_str())
        .map(|ext| ext.eq_ignore_ascii_case("gtpack"))
        .unwrap_or(false)
    {
        Some(pack_path)
    } else {
        None
    };

    let pack = Arc::new(
        PackRuntime::load(
            &component_artifact,
            Arc::clone(&host_config),
            Some(Arc::clone(&mock_layer)),
            archive_source,
        )
        .await
        .with_context(|| format!("failed to load pack {}", component_artifact.display()))?,
    );
    // Runtime metadata is the most authoritative; final upgrade.
    recorder.update_pack_metadata(pack.metadata().clone());

    let flows = pack
        .list_flows()
        .await
        .context("failed to enumerate flows")?;
    let entry_flow_id = resolve_entry_flow(opts.entry_flow.clone(), pack.metadata(), &flows)?;
    recorder.set_flow_id(&entry_flow_id);

    let engine = FlowEngine::new(Arc::clone(&pack), Arc::clone(&host_config))
        .await
        .context("failed to prime flow engine")?;

    let started_at = OffsetDateTime::now_utc();
    // Owned copies so the borrowed FlowContext fields outlive the call.
    let tenant_str = host_config.tenant.clone();
    let session_id_owned = resolved_profile.session_id.clone();
    let provider_id_owned = resolved_profile.provider_id.clone();
    let recorder_ref: &RunRecorder = &recorder;
    let mock_ref: &MockLayer = &mock_layer;
    let ctx = FlowContext {
        tenant: &tenant_str,
        flow_id: &entry_flow_id,
        node_id: None,
        tool: None,
        action: Some("run_pack"),
        session_id: Some(session_id_owned.as_str()),
        provider_id: Some(provider_id_owned.as_str()),
        retry_config: host_config.mcp_retry_config().into(),
        observer: Some(recorder_ref),
        mocks: Some(mock_ref),
    };

    let execution = engine.execute(ctx, opts.input.clone()).await;
    let finished_at = OffsetDateTime::now_utc();

    let status = match execution {
        Ok(_) => RunCompletion::Ok,
        Err(err) => RunCompletion::Err(err),
    };

    // Finalise even on failure so the transcript/summary reflect what ran.
    let result = recorder.finalise(status, started_at, finished_at)?;

    let run_json_path = directories.root.join("run.json");
    fs::write(&run_json_path, serde_json::to_vec_pretty(&result)?)
        .with_context(|| format!("failed to write run summary {}", run_json_path.display()))?;

    Ok(result)
}
329
/// Acknowledge an OTLP request by logging it. This does NOT configure an
/// exporter; callers are expected to set `OTEL_*` environment variables
/// themselves before invoking `run_pack`.
fn apply_otlp_hook(hook: &OtlpHook) {
    info!(
        endpoint = %hook.endpoint,
        sample_all = hook.sample_all,
        headers = %hook.headers.len(),
        "OTLP hook requested (set OTEL_* env vars before invoking run_pack)"
    );
}
338
339fn prepare_run_dirs(root_override: Option<PathBuf>) -> Result<RunDirectories> {
340 let root = if let Some(dir) = root_override {
341 dir
342 } else {
343 let timestamp = OffsetDateTime::now_utc()
344 .format(&Rfc3339)
345 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
346 .replace(':', "-");
347 let short_id = Uuid::new_v4()
348 .to_string()
349 .chars()
350 .take(6)
351 .collect::<String>();
352 PathBuf::from(".greentic")
353 .join("runs")
354 .join(format!("{timestamp}_{short_id}"))
355 };
356
357 fs::create_dir_all(&root).with_context(|| format!("failed to create {}", root.display()))?;
358 let logs = root.join("logs");
359 let resolved = root.join("resolved_config");
360 fs::create_dir_all(&logs).with_context(|| format!("failed to create {}", logs.display()))?;
361 fs::create_dir_all(&resolved)
362 .with_context(|| format!("failed to create {}", resolved.display()))?;
363
364 Ok(RunDirectories {
365 root,
366 logs,
367 resolved,
368 })
369}
370
371fn resolve_entry_flow(
372 override_id: Option<String>,
373 metadata: &PackMetadata,
374 flows: &[FlowDescriptor],
375) -> Result<String> {
376 if let Some(flow) = override_id {
377 return Ok(flow);
378 }
379 if let Some(first) = metadata.entry_flows.first() {
380 return Ok(first.clone());
381 }
382 flows
383 .first()
384 .map(|f| f.id.clone())
385 .ok_or_else(|| anyhow!("pack does not declare any flows"))
386}
387
/// Heuristic classifier: treat any reader error mentioning "signature"
/// (case-insensitively) as a signing failure, which the DevOk policy may
/// tolerate.
fn is_signature_error(message: &str) -> bool {
    let lowered = message.to_ascii_lowercase();
    lowered.contains("signature")
}
391
/// Map the public signing policy onto the pack reader's equivalent enum.
fn to_reader_policy(policy: SigningPolicy) -> greentic_pack::reader::SigningPolicy {
    match policy {
        SigningPolicy::Strict => greentic_pack::reader::SigningPolicy::Strict,
        SigningPolicy::DevOk => greentic_pack::reader::SigningPolicy::DevOk,
    }
}
398
/// Obtain a loadable wasm component path for `pack_path`.
///
/// A bare `.wasm` file is used in place; anything else is treated as a zip
/// archive (`.gtpack`) from which the component is extracted into the run
/// directory as `component.wasm`.
fn resolve_component_artifact(
    pack_path: &Path,
    load: Option<&PackLoad>,
    dirs: &RunDirectories,
) -> Result<PathBuf> {
    if pack_path
        .extension()
        .and_then(|ext| ext.to_str())
        .map(|ext| ext.eq_ignore_ascii_case("wasm"))
        .unwrap_or(false)
    {
        // Already a raw component; nothing to extract.
        return Ok(pack_path.to_path_buf());
    }

    let entry = find_component_entry(pack_path, load)?;
    let file =
        File::open(pack_path).with_context(|| format!("failed to open {}", pack_path.display()))?;
    let mut archive = ZipArchive::new(file)
        .with_context(|| format!("{} is not a valid gtpack", pack_path.display()))?;
    let mut component = archive
        .by_name(&entry)
        .with_context(|| format!("component {entry} missing from pack"))?;
    // Extract into the run directory so the artifact is self-contained.
    let out_path = dirs.root.join("component.wasm");
    let mut out_file = File::create(&out_path)
        .with_context(|| format!("failed to create {}", out_path.display()))?;
    std::io::copy(&mut component, &mut out_file)
        .with_context(|| format!("failed to extract component {entry}"))?;
    Ok(out_path)
}
428
/// Determine which archive entry holds the wasm component.
///
/// Prefers the verified manifest's first component when available (i.e. when
/// `open_pack` succeeded); otherwise scans the archive and returns the first
/// entry whose name ends in `.wasm`.
fn find_component_entry(pack_path: &Path, load: Option<&PackLoad>) -> Result<String> {
    if let Some(load) = load
        && let Some(component) = load.manifest.components.first()
    {
        return Ok(component.file_wasm.clone());
    }

    // Fallback scan: the pack may have failed verification under DevOk.
    let file =
        File::open(pack_path).with_context(|| format!("failed to open {}", pack_path.display()))?;
    let mut archive = ZipArchive::new(file)
        .with_context(|| format!("{} is not a valid gtpack", pack_path.display()))?;
    for index in 0..archive.len() {
        let entry = archive
            .by_index(index)
            .with_context(|| format!("failed to read entry #{index}"))?;
        if entry.name().ends_with(".wasm") {
            return Ok(entry.name().to_string());
        }
    }
    Err(anyhow!("pack does not contain a wasm component"))
}
450
451fn resolve_profile(profile: &Profile, ctx: &TenantContext) -> ResolvedProfile {
452 match profile {
453 Profile::Dev(dev) => ResolvedProfile {
454 tenant_id: ctx
455 .tenant_id
456 .clone()
457 .unwrap_or_else(|| dev.tenant_id.clone()),
458 team_id: ctx.team_id.clone().unwrap_or_else(|| dev.team_id.clone()),
459 user_id: ctx.user_id.clone().unwrap_or_else(|| dev.user_id.clone()),
460 session_id: ctx
461 .session_id
462 .clone()
463 .unwrap_or_else(|| Uuid::new_v4().to_string()),
464 provider_id: PROVIDER_ID_DEV.to_string(),
465 max_node_wall_time_ms: dev.max_node_wall_time_ms,
466 max_run_wall_time_ms: dev.max_run_wall_time_ms,
467 },
468 }
469}
470
/// Assemble a host configuration suitable for local development only.
///
/// The MCP store/runtime/security sections are built as YAML values because
/// that is the shape `McpConfig` consumes. HTTP is disabled, all secrets are
/// allowed, and unsigned tools are accepted.
fn build_host_config(profile: &ResolvedProfile, dirs: &RunDirectories) -> HostConfig {
    // Tool store: local directory under the working dir.
    let mut store = YamlMapping::new();
    store.insert(YamlValue::from("kind"), YamlValue::from("local-dir"));
    store.insert(
        YamlValue::from("path"),
        YamlValue::from("./.greentic/tools"),
    );
    // Wasm runtime limits; the node timeout comes from the resolved profile.
    let mut runtime = YamlMapping::new();
    runtime.insert(YamlValue::from("max_memory_mb"), YamlValue::from(256u64));
    runtime.insert(
        YamlValue::from("timeout_ms"),
        YamlValue::from(profile.max_node_wall_time_ms),
    );
    runtime.insert(YamlValue::from("fuel"), YamlValue::from(50_000_000u64));
    // Dev policy: tool signatures are not required.
    let mut security = YamlMapping::new();
    security.insert(YamlValue::from("require_signature"), YamlValue::from(false));
    HostConfig {
        tenant: profile.tenant_id.clone(),
        bindings_path: dirs.resolved.join("dev.bindings.yaml"),
        flow_type_bindings: HashMap::new(),
        mcp: McpConfig {
            store: YamlValue::Mapping(store),
            security: YamlValue::Mapping(security),
            runtime: YamlValue::Mapping(runtime),
            http_enabled: Some(false),
            retry: Some(McpRetryConfig::default()),
        },
        rate_limits: RateLimits::default(),
        http_enabled: false,
        secrets_policy: SecretsPolicy::allow_all(),
        webhook_policy: WebhookPolicy::default(),
        timers: Vec::new(),
    }
}
505
/// Fully resolved identity and limits after merging `TenantContext` over the
/// selected `Profile` (see `resolve_profile`).
#[allow(dead_code)]
#[derive(Clone, Debug)]
struct ResolvedProfile {
    tenant_id: String,
    team_id: String,
    user_id: String,
    /// Always present; generated when the context omits one.
    session_id: String,
    provider_id: String,
    /// Fed into the host runtime's per-node timeout.
    max_node_wall_time_ms: u64,
    /// NOTE(review): carried but not visibly enforced in this file — confirm
    /// where the whole-run budget is applied.
    max_run_wall_time_ms: u64,
}
517
/// Summary of a completed run; also serialised as `run.json` in the artifact dir.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RunResult {
    pub session_id: String,
    pub pack_id: String,
    pub pack_version: String,
    /// Entry flow that was executed.
    pub flow_id: String,
    /// RFC 3339 timestamps (UTC).
    pub started_at_utc: String,
    pub finished_at_utc: String,
    pub status: RunStatus,
    /// One entry per node, in first-start order.
    pub node_summaries: Vec<NodeSummary>,
    /// Failed nodes keyed by node id.
    pub failures: BTreeMap<String, NodeFailure>,
    /// Root of the run's artifact directory.
    pub artifacts_dir: PathBuf,
}
531
/// Overall outcome of a run.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum RunStatus {
    /// Engine completed and no node recorded an error.
    Success,
    /// Engine completed but at least one node recorded an error.
    PartialFailure,
    /// Engine execution itself returned an error.
    Failure,
}
538
/// Per-node execution summary.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NodeSummary {
    pub node_id: String,
    /// Component the node is bound to.
    pub component: String,
    pub status: NodeStatus,
    /// Wall-clock duration; 0 when the node never finished recording.
    pub duration_ms: u64,
}
546
/// Status of a single node execution.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum NodeStatus {
    Ok,
    Skipped,
    Error,
}
553
/// Details of a failed node, pointing back into the transcript and logs.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NodeFailure {
    /// Machine-readable failure code (e.g. "component-failed").
    pub code: String,
    pub message: String,
    /// Arbitrary JSON payload describing the failure.
    pub details: Value,
    /// Byte range in `transcript.jsonl`: (start of the node's start event,
    /// end of its error event).
    pub transcript_offsets: (u64, u64),
    /// Captured per-node log files (e.g. stderr).
    pub log_paths: Vec<PathBuf>,
}
562
/// Layout of a single run's artifact directory.
#[derive(Clone)]
struct RunDirectories {
    /// Run root (holds transcript.jsonl, run.json, component.wasm).
    root: PathBuf,
    /// Per-node log files.
    logs: PathBuf,
    /// Resolved configuration files (e.g. dev.bindings.yaml).
    resolved: PathBuf,
}
569
/// Engine outcome fed into `RunRecorder::finalise`.
enum RunCompletion {
    Ok,
    Err(anyhow::Error),
}
574
/// Thread-safe recorder that turns engine/mock callbacks into a JSONL
/// transcript and ultimately the `RunResult`. Shared via `Arc` between the
/// engine observer and the mock-layer event sink; each field has its own
/// mutex so callbacks stay cheap.
struct RunRecorder {
    directories: RunDirectories,
    profile: ResolvedProfile,
    /// Current flow id; starts as "unknown" until the entry flow is resolved.
    flow_id: Mutex<String>,
    /// Pack metadata; upgraded as better sources become available
    /// (path fallback → manifest → runtime).
    pack_meta: Mutex<PackMetadata>,
    transcript: Mutex<TranscriptWriter>,
    state: Mutex<RunRecorderState>,
}
583
impl RunRecorder {
    /// Create a recorder and open `transcript.jsonl` in the run root.
    /// `flow_id` may be unknown at construction time; it is set later via
    /// `set_flow_id` once the entry flow is resolved.
    fn new(
        dirs: RunDirectories,
        profile: &ResolvedProfile,
        flow_id: Option<String>,
        pack_meta: PackMetadata,
        hook: Option<TranscriptHook>,
    ) -> Result<Self> {
        let transcript_path = dirs.root.join("transcript.jsonl");
        let file = File::create(&transcript_path)
            .with_context(|| format!("failed to open {}", transcript_path.display()))?;
        Ok(Self {
            directories: dirs,
            profile: profile.clone(),
            flow_id: Mutex::new(flow_id.unwrap_or_else(|| "unknown".into())),
            pack_meta: Mutex::new(pack_meta),
            transcript: Mutex::new(TranscriptWriter::new(BufWriter::new(file), hook)),
            state: Mutex::new(RunRecorderState::default()),
        })
    }

    /// Fold the engine outcome plus per-node records into the final
    /// `RunResult`. A Success outcome is downgraded to PartialFailure when
    /// any node recorded an error.
    fn finalise(
        &self,
        completion: RunCompletion,
        started_at: OffsetDateTime,
        finished_at: OffsetDateTime,
    ) -> Result<RunResult> {
        let status = match &completion {
            RunCompletion::Ok => RunStatus::Success,
            RunCompletion::Err(_) => RunStatus::Failure,
        };

        if let RunCompletion::Err(err) = completion {
            warn!(error = %err, "pack execution failed");
        }

        // Prefer RFC 3339; fall back to Display if formatting ever fails.
        let started = started_at
            .format(&Rfc3339)
            .unwrap_or_else(|_| started_at.to_string());
        let finished = finished_at
            .format(&Rfc3339)
            .unwrap_or_else(|_| finished_at.to_string());

        let state = self.state.lock();
        let mut summaries = Vec::new();
        let mut failures = BTreeMap::new();
        // Iterate in first-start order so summaries are stable and readable.
        for node_id in &state.order {
            if let Some(record) = state.nodes.get(node_id) {
                let duration = record.duration_ms.unwrap_or(0);
                summaries.push(NodeSummary {
                    node_id: node_id.clone(),
                    component: record.component.clone(),
                    status: record.status.clone(),
                    duration_ms: duration,
                });
                if record.status == NodeStatus::Error {
                    // Transcript byte range: node-start event → error event.
                    let start_offset = record.transcript_start.unwrap_or(0);
                    let err_offset = record.transcript_error.unwrap_or(start_offset);
                    failures.insert(
                        node_id.clone(),
                        NodeFailure {
                            code: record
                                .failure_code
                                .clone()
                                .unwrap_or_else(|| "component-failed".into()),
                            message: record
                                .failure_message
                                .clone()
                                .unwrap_or_else(|| "node failed".into()),
                            details: record
                                .failure_details
                                .clone()
                                .unwrap_or_else(|| json!({ "node": node_id })),
                            transcript_offsets: (start_offset, err_offset),
                            log_paths: record.log_paths.clone(),
                        },
                    );
                }
            }
        }

        let mut final_status = status;
        if final_status == RunStatus::Success && !failures.is_empty() {
            final_status = RunStatus::PartialFailure;
        }

        let pack_meta = self.pack_meta.lock();
        let flow_id = self.flow_id.lock().clone();
        Ok(RunResult {
            session_id: self.profile.session_id.clone(),
            pack_id: pack_meta.pack_id.clone(),
            pack_version: pack_meta.version.clone(),
            flow_id,
            started_at_utc: started,
            finished_at_utc: finished,
            status: final_status,
            node_summaries: summaries,
            failures,
            artifacts_dir: self.directories.root.clone(),
        })
    }

    /// Record the resolved entry flow id for subsequent transcript events.
    fn set_flow_id(&self, flow_id: &str) {
        *self.flow_id.lock() = flow_id.to_string();
    }

    /// Replace the pack metadata with a more authoritative source.
    fn update_pack_metadata(&self, meta: PackMetadata) {
        *self.pack_meta.lock() = meta;
    }

    /// Snapshot of the current flow id.
    fn current_flow_id(&self) -> String {
        self.flow_id.lock().clone()
    }

    /// Append a pack-verification event (phase "verify") to the transcript.
    fn record_verify_event(&self, status: &str, message: &str) -> Result<()> {
        let timestamp = OffsetDateTime::now_utc();
        let event = json!({
            "ts": timestamp
                .format(&Rfc3339)
                .unwrap_or_else(|_| timestamp.to_string()),
            "session_id": self.profile.session_id,
            "flow_id": self.current_flow_id(),
            "node_id": Value::Null,
            "component": "verify.pack",
            "phase": "verify",
            "status": status,
            "inputs": Value::Null,
            "outputs": Value::Null,
            "error": json!({ "message": message }),
            "metrics": Value::Null,
            "schema_id": Value::Null,
            "defaults_applied": Value::Null,
            "redactions": Value::Array(Vec::new()),
        });
        self.transcript.lock().write(&event).map(|_| ())
    }

    /// Append a mock-capability hit (phase "mock") to the transcript.
    fn write_mock_event(&self, capability: &str, provider: &str, payload: Value) -> Result<()> {
        let timestamp = OffsetDateTime::now_utc();
        let event = json!({
            "ts": timestamp
                .format(&Rfc3339)
                .unwrap_or_else(|_| timestamp.to_string()),
            "session_id": self.profile.session_id,
            "flow_id": self.current_flow_id(),
            "node_id": Value::Null,
            "component": format!("mock::{capability}"),
            "phase": "mock",
            "status": "ok",
            "inputs": json!({ "capability": capability, "provider": provider }),
            "outputs": payload,
            "error": Value::Null,
            "metrics": Value::Null,
            "schema_id": Value::Null,
            "defaults_applied": Value::Null,
            "redactions": Value::Array(Vec::new()),
        });
        self.transcript.lock().write(&event).map(|_| ())
    }

    /// Record a node's start: write a redacted "start" transcript event and
    /// open (or reset) the node's execution record.
    fn handle_node_start(&self, event: &NodeEvent<'_>) -> Result<()> {
        let timestamp = OffsetDateTime::now_utc();
        // Scrub sensitive keys from the payload before it hits the transcript.
        let (redacted_payload, redactions) = redact_value(event.payload, "$.inputs.payload");
        let inputs = json!({
            "payload": redacted_payload,
            "context": {
                "tenant_id": self.profile.tenant_id.as_str(),
                "team_id": self.profile.team_id.as_str(),
                "user_id": self.profile.user_id.as_str(),
            }
        });
        let flow_id = self.current_flow_id();
        let event_json = build_transcript_event(TranscriptEventArgs {
            profile: &self.profile,
            flow_id: &flow_id,
            node_id: event.node_id,
            component: &event.node.component,
            phase: "start",
            status: "ok",
            timestamp,
            inputs,
            outputs: Value::Null,
            error: Value::Null,
            redactions,
        });
        // Keep the start offset so failures can reference the byte range.
        let (start_offset, _) = self.transcript.lock().write(&event_json)?;

        let mut state = self.state.lock();
        let node_key = event.node_id.to_string();
        // Preserve first-start ordering even if the node restarts.
        if !state.order.iter().any(|id| id == &node_key) {
            state.order.push(node_key.clone());
        }
        let entry = state.nodes.entry(node_key).or_insert_with(|| {
            NodeExecutionRecord::new(event.node.component.clone(), &self.directories)
        });
        entry.start_instant = Some(Instant::now());
        entry.status = NodeStatus::Ok;
        entry.transcript_start = Some(start_offset);
        Ok(())
    }

    /// Record a node's successful completion: write an "end" transcript
    /// event and capture the wall-clock duration.
    fn handle_node_end(&self, event: &NodeEvent<'_>, output: &Value) -> Result<()> {
        let timestamp = OffsetDateTime::now_utc();
        let (redacted_output, redactions) = redact_value(output, "$.outputs");
        let flow_id = self.current_flow_id();
        let event_json = build_transcript_event(TranscriptEventArgs {
            profile: &self.profile,
            flow_id: &flow_id,
            node_id: event.node_id,
            component: &event.node.component,
            phase: "end",
            status: "ok",
            timestamp,
            inputs: Value::Null,
            outputs: redacted_output,
            error: Value::Null,
            redactions,
        });
        self.transcript.lock().write(&event_json)?;

        let mut state = self.state.lock();
        // Duration is only recorded when a matching start was seen.
        if let Some(entry) = state.nodes.get_mut(event.node_id)
            && let Some(started) = entry.start_instant.take()
        {
            entry.duration_ms = Some(started.elapsed().as_millis() as u64);
        }
        Ok(())
    }

    /// Record a node failure: write an "error" transcript event, mark the
    /// node's record, and capture the error text into a per-node log file.
    fn handle_node_error(
        &self,
        event: &NodeEvent<'_>,
        error: &dyn std::error::Error,
    ) -> Result<()> {
        let timestamp = OffsetDateTime::now_utc();
        let error_message = error.to_string();
        let error_json = json!({
            "code": "component-failed",
            "message": error_message,
            "details": {
                "node": event.node_id,
            }
        });
        let flow_id = self.current_flow_id();
        let event_json = build_transcript_event(TranscriptEventArgs {
            profile: &self.profile,
            flow_id: &flow_id,
            node_id: event.node_id,
            component: &event.node.component,
            phase: "error",
            status: "error",
            timestamp,
            inputs: Value::Null,
            outputs: Value::Null,
            error: error_json.clone(),
            redactions: Vec::new(),
        });
        let (_, end_offset) = self.transcript.lock().write(&event_json)?;

        let mut state = self.state.lock();
        if let Some(entry) = state.nodes.get_mut(event.node_id) {
            entry.status = NodeStatus::Error;
            if let Some(started) = entry.start_instant.take() {
                entry.duration_ms = Some(started.elapsed().as_millis() as u64);
            }
            entry.transcript_error = Some(end_offset);
            entry.failure_code = Some("component-failed".to_string());
            entry.failure_message = Some(error_message.clone());
            entry.failure_details = Some(error_json);
            // Best-effort stderr log; failure to write it is not fatal.
            let log_path = self
                .directories
                .logs
                .join(format!("{}.stderr.log", sanitize_id(event.node_id)));
            if let Ok(mut file) = File::create(&log_path) {
                let _ = writeln!(file, "{error_message}");
            }
            entry.log_paths.push(log_path);
        }
        Ok(())
    }
}
865
866impl ExecutionObserver for RunRecorder {
867 fn on_node_start(&self, event: &NodeEvent<'_>) {
868 if let Err(err) = self.handle_node_start(event) {
869 warn!(node = event.node_id, error = %err, "failed to record node start");
870 }
871 }
872
873 fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value) {
874 if let Err(err) = self.handle_node_end(event, output) {
875 warn!(node = event.node_id, error = %err, "failed to record node end");
876 }
877 }
878
879 fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn std::error::Error) {
880 if let Err(err) = self.handle_node_error(event, error) {
881 warn!(node = event.node_id, error = %err, "failed to record node error");
882 }
883 }
884}
885
886impl MockEventSink for RunRecorder {
887 fn on_mock_event(&self, capability: &str, provider: &str, payload: &Value) {
888 if let Err(err) = self.write_mock_event(capability, provider, payload.clone()) {
889 warn!(?capability, ?provider, error = %err, "failed to record mock event");
890 }
891 }
892}
893
/// Mutable per-run bookkeeping guarded by a single mutex in `RunRecorder`.
#[derive(Default)]
struct RunRecorderState {
    /// Execution record per node id.
    nodes: BTreeMap<String, NodeExecutionRecord>,
    /// Node ids in first-start order, for stable summary ordering.
    order: Vec<String>,
}
899
/// In-flight bookkeeping for one node's execution.
#[derive(Clone)]
struct NodeExecutionRecord {
    /// Component the node is bound to.
    component: String,
    status: NodeStatus,
    /// Filled when a matching end/error follows a start.
    duration_ms: Option<u64>,
    /// Byte offset of the node's "start" event in transcript.jsonl.
    transcript_start: Option<u64>,
    /// Byte offset just past the node's "error" event, if any.
    transcript_error: Option<u64>,
    /// Per-node log files written on failure.
    log_paths: Vec<PathBuf>,
    failure_code: Option<String>,
    failure_message: Option<String>,
    failure_details: Option<Value>,
    /// Set on start, consumed on end/error to compute duration.
    start_instant: Option<Instant>,
}
913
impl NodeExecutionRecord {
    /// Fresh record in the optimistic `Ok` state; all timing/failure fields
    /// start empty. `_dirs` is currently unused but kept for call-site
    /// stability.
    fn new(component: String, _dirs: &RunDirectories) -> Self {
        Self {
            component,
            status: NodeStatus::Ok,
            duration_ms: None,
            transcript_start: None,
            transcript_error: None,
            log_paths: Vec::new(),
            failure_code: None,
            failure_message: None,
            failure_details: None,
            start_instant: None,
        }
    }
}
930
/// Append-only JSONL writer that tracks byte offsets and optionally mirrors
/// every event to a caller-provided hook.
struct TranscriptWriter {
    writer: BufWriter<File>,
    /// Byte offset at which the next line will begin.
    offset: u64,
    hook: Option<TranscriptHook>,
}
936
impl TranscriptWriter {
    fn new(writer: BufWriter<File>, hook: Option<TranscriptHook>) -> Self {
        Self {
            writer,
            offset: 0,
            hook,
        }
    }

    /// Serialise `value` as one JSONL line and flush immediately so the
    /// transcript survives a crash mid-run. Returns the byte range
    /// `(start, end)` the line occupies, where `end` includes the trailing
    /// newline. The hook, if any, is invoked after a successful write.
    fn write(&mut self, value: &Value) -> Result<(u64, u64)> {
        let line = serde_json::to_vec(value)?;
        let start = self.offset;
        self.writer.write_all(&line)?;
        self.writer.write_all(b"\n")?;
        self.writer.flush()?;
        self.offset += line.len() as u64 + 1;
        if let Some(hook) = &self.hook {
            hook(value);
        }
        Ok((start, self.offset))
    }
}
958}
959
/// Bundle of inputs for `build_transcript_event`, grouped to keep the call
/// sites readable.
struct TranscriptEventArgs<'a> {
    profile: &'a ResolvedProfile,
    flow_id: &'a str,
    node_id: &'a str,
    component: &'a str,
    /// Lifecycle phase: "start" | "end" | "error" (plus "verify"/"mock"
    /// events built elsewhere).
    phase: &'a str,
    status: &'a str,
    timestamp: OffsetDateTime,
    inputs: Value,
    outputs: Value,
    error: Value,
    /// JSONPath-style locations that were redacted in inputs/outputs.
    redactions: Vec<String>,
}
973
974fn build_transcript_event(args: TranscriptEventArgs<'_>) -> Value {
975 let ts = args
976 .timestamp
977 .format(&Rfc3339)
978 .unwrap_or_else(|_| args.timestamp.to_string());
979 json!({
980 "ts": ts,
981 "session_id": args.profile.session_id.as_str(),
982 "flow_id": args.flow_id,
983 "node_id": args.node_id,
984 "component": args.component,
985 "phase": args.phase,
986 "status": args.status,
987 "inputs": args.inputs,
988 "outputs": args.outputs,
989 "error": args.error,
990 "metrics": {
991 "duration_ms": null,
992 "cpu_time_ms": null,
993 "mem_peak_bytes": null,
994 },
995 "schema_id": Value::Null,
996 "defaults_applied": Value::Array(Vec::new()),
997 "redactions": args.redactions,
998 })
999}
1000
1001fn redact_value(value: &Value, base: &str) -> (Value, Vec<String>) {
1002 let mut paths = Vec::new();
1003 let redacted = redact_recursive(value, base, &mut paths);
1004 (redacted, paths)
1005}
1006
1007fn redact_recursive(value: &Value, path: &str, acc: &mut Vec<String>) -> Value {
1008 match value {
1009 Value::Object(map) => {
1010 let mut new_map = JsonMap::new();
1011 for (key, val) in map {
1012 let child_path = format!("{path}.{key}");
1013 if is_sensitive_key(key) {
1014 acc.push(child_path);
1015 new_map.insert(key.clone(), Value::String("__REDACTED__".into()));
1016 } else {
1017 new_map.insert(key.clone(), redact_recursive(val, &child_path, acc));
1018 }
1019 }
1020 Value::Object(new_map)
1021 }
1022 Value::Array(items) => {
1023 let mut new_items = Vec::new();
1024 for (idx, item) in items.iter().enumerate() {
1025 let child_path = format!("{path}[{idx}]");
1026 new_items.push(redact_recursive(item, &child_path, acc));
1027 }
1028 Value::Array(new_items)
1029 }
1030 other => other.clone(),
1031 }
1032}
1033
/// True when a JSON key looks like it carries a secret and must be redacted
/// from the transcript.
///
/// Matching is case-insensitive and substring-based, so e.g. "access_token",
/// "API_KEY" and "Set-Cookie" all match. The marker list previously missed
/// common secret-bearing names ("api_key"/"apikey", "passwd", "credential"),
/// letting those values reach the transcript unredacted; they are included
/// now. Extending the list only redacts more, never less.
fn is_sensitive_key(key: &str) -> bool {
    const MARKERS: [&str; 9] = [
        "secret",
        "token",
        "password",
        "passwd",
        "authorization",
        "cookie",
        "api_key",
        "apikey",
        "credential",
    ];
    let lower = key.to_ascii_lowercase();
    MARKERS.iter().any(|marker| lower.contains(marker))
}
1039
/// Make a node id safe for use in a file name by replacing every
/// non-ASCII-alphanumeric character with '-'.
fn sanitize_id(value: &str) -> String {
    let mut out = String::with_capacity(value.len());
    for ch in value.chars() {
        out.push(if ch.is_ascii_alphanumeric() { ch } else { '-' });
    }
    out
}
1045}