1use anyhow::{Context, Result, anyhow};
2use greentic_pack::reader::open_pack;
3use greentic_runner_host::RunnerWasiPolicy;
4use greentic_runner_host::config::{
5 FlowRetryConfig, HostConfig, OperatorPolicy, RateLimits, SecretsPolicy, StateStorePolicy,
6 WebhookPolicy,
7};
8use greentic_runner_host::pack::{ComponentResolution, FlowDescriptor, PackMetadata, PackRuntime};
9use greentic_runner_host::runner::engine::{ExecutionObserver, FlowContext, FlowEngine, NodeEvent};
10pub use greentic_runner_host::runner::mocks::{
11 HttpMock, HttpMockMode, KvMock, MocksConfig, SecretsMock, TelemetryMock, TimeMock, ToolsMock,
12};
13use greentic_runner_host::runner::mocks::{MockEventSink, MockLayer};
14use greentic_runner_host::secrets::{DynSecretsManager, default_manager};
15use greentic_runner_host::storage::{new_session_store, new_state_store};
16use greentic_runner_host::trace::TraceConfig;
17use greentic_runner_host::validate::ValidationConfig;
18use parking_lot::Mutex;
19use runner_core::normalize_under_root;
20use serde::{Deserialize, Serialize};
21use serde_json::{Map as JsonMap, Value, json};
22use std::collections::{BTreeMap, HashMap};
23use std::fmt;
24use std::fs::{self, File};
25use std::io::{BufWriter, Write};
26use std::path::{Path, PathBuf};
27use std::sync::Arc;
28use std::time::Instant;
29use time::OffsetDateTime;
30use time::format_description::well_known::Rfc3339;
31use tokio::runtime::Runtime;
32use tracing::{info, warn};
33use uuid::Uuid;
34
/// Provider identifier stamped onto every profile resolved from [`Profile::Dev`].
const PROVIDER_ID_DEV: &str = "greentic-dev";
36
/// Shareable callback that receives a JSON value by reference.
///
/// It is handed to the transcript writer when the run recorder is created.
/// NOTE(review): presumably invoked once per transcript event — confirm against
/// `TranscriptWriter`, which is defined outside this section.
pub type TranscriptHook = Arc<dyn Fn(&Value) + Send + Sync>;
39
/// OTLP telemetry settings attached to a run.
///
/// In the code visible here the hook is only logged (see `apply_otlp_hook`);
/// it does not configure an exporter by itself.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct OtlpHook {
    /// Collector endpoint, taken verbatim from configuration or `OTLP_ENDPOINT`.
    pub endpoint: String,
    /// Extra header pairs; empty when absent from the serialized form.
    #[serde(default)]
    pub headers: Vec<(String, String)>,
    /// Sampling flag; `false` when absent from the serialized form.
    #[serde(default)]
    pub sample_all: bool,
}
49
/// Execution profile selecting identity defaults and wall-time limits for a run.
///
/// Marked `#[non_exhaustive]` so further variants can be added later.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum Profile {
    /// Local development profile.
    Dev(DevProfile),
}
56
57impl Default for Profile {
58 fn default() -> Self {
59 Self::Dev(DevProfile::default())
60 }
61}
62
/// Settings for the local development profile.
///
/// The identity fields act as fallbacks when the corresponding [`TenantContext`]
/// field is `None` (see `resolve_profile`).
#[derive(Clone, Debug)]
pub struct DevProfile {
    /// Fallback tenant identifier.
    pub tenant_id: String,
    /// Fallback team identifier.
    pub team_id: String,
    /// Fallback user identifier.
    pub user_id: String,
    /// Per-node wall-time budget in milliseconds.
    /// NOTE(review): copied into the resolved profile; enforcement is not visible in this section.
    pub max_node_wall_time_ms: u64,
    /// Whole-run wall-time budget in milliseconds.
    /// NOTE(review): copied into the resolved profile; enforcement is not visible in this section.
    pub max_run_wall_time_ms: u64,
}
72
73impl Default for DevProfile {
74 fn default() -> Self {
75 Self {
76 tenant_id: "local-dev".to_string(),
77 team_id: "default".to_string(),
78 user_id: "developer".to_string(),
79 max_node_wall_time_ms: 30_000,
80 max_run_wall_time_ms: 600_000,
81 }
82 }
83}
84
/// Optional per-run identity overrides.
///
/// Each `None` field falls back to the active profile's value when the profile
/// is resolved; a missing `session_id` is replaced by a freshly generated UUID.
#[derive(Clone, Debug, Default)]
pub struct TenantContext {
    /// Tenant override.
    pub tenant_id: Option<String>,
    /// Team override.
    pub team_id: Option<String>,
    /// User override.
    pub user_id: Option<String>,
    /// Session identifier; also used to name the resume snapshot file when
    /// `RunOptions::session_state_dir` is set.
    pub session_id: Option<String>,
}
93
94impl TenantContext {
95 pub fn default_local() -> Self {
96 Self {
97 tenant_id: Some("local-dev".into()),
98 team_id: Some("default".into()),
99 user_id: Some("developer".into()),
100 session_id: None,
101 }
102 }
103}
104
/// Everything a single pack run can be configured with.
///
/// `Debug` is implemented by hand because some fields hold trait objects.
#[derive(Clone)]
pub struct RunOptions {
    /// Execution profile supplying identity defaults and wall-time limits.
    pub profile: Profile,
    /// Explicit flow to run; when `None` the pack's first entry flow (or first
    /// listed flow) is used.
    pub entry_flow: Option<String>,
    /// JSON input handed to the flow engine.
    pub input: Value,
    /// Identity overrides applied on top of the profile.
    pub ctx: TenantContext,
    /// Optional transcript callback passed to the run recorder.
    pub transcript: Option<TranscriptHook>,
    /// Optional OTLP settings (only logged by the code in this section).
    pub otlp: Option<OtlpHook>,
    /// Mock configuration used to build the mock layer.
    pub mocks: MocksConfig,
    /// Run directory override; when `None` a timestamped directory under
    /// `.greentic/runs` is created.
    pub artifacts_dir: Option<PathBuf>,
    /// Signature policy applied when opening a pack archive.
    pub signing: SigningPolicy,
    /// Root directory of materialized components; when `None` and the pack path
    /// is a directory, the pack directory itself is used.
    pub components_dir: Option<PathBuf>,
    /// Component overrides forwarded to component resolution.
    pub components_map: HashMap<String, PathBuf>,
    /// Forwarded to component resolution as `dist_offline`.
    pub dist_offline: bool,
    /// Forwarded to component resolution as `dist_cache_dir`.
    pub dist_cache_dir: Option<PathBuf>,
    /// Forwarded to component resolution as `allow_missing_hash`.
    pub allow_missing_hash: bool,
    /// Secrets backend; when `None` the host's default manager is initialised.
    pub secrets_manager: Option<DynSecretsManager>,
    /// Optional resolver installed on the flow engine before execution.
    pub cross_pack_resolver:
        Option<Arc<dyn greentic_runner_host::runner::engine::CrossPackResolver>>,
    /// Directory for persisted session snapshots; enables pause/resume across
    /// runs that share a session id.
    pub session_state_dir: Option<PathBuf>,
}
141
impl Default for RunOptions {
    /// Same options as [`desktop_defaults`], including its `OTLP_ENDPOINT` lookup.
    fn default() -> Self {
        desktop_defaults()
    }
}
147
148impl fmt::Debug for RunOptions {
149 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
150 f.debug_struct("RunOptions")
151 .field("profile", &self.profile)
152 .field("entry_flow", &self.entry_flow)
153 .field("input", &self.input)
154 .field("ctx", &self.ctx)
155 .field("transcript", &self.transcript.is_some())
156 .field("otlp", &self.otlp)
157 .field("mocks", &self.mocks)
158 .field("artifacts_dir", &self.artifacts_dir)
159 .field("signing", &self.signing)
160 .field("components_dir", &self.components_dir)
161 .field("components_map_len", &self.components_map.len())
162 .field("dist_offline", &self.dist_offline)
163 .field("dist_cache_dir", &self.dist_cache_dir)
164 .field("allow_missing_hash", &self.allow_missing_hash)
165 .field("secrets_manager", &self.secrets_manager.is_some())
166 .finish()
167 }
168}
169
/// Signature policy for pack archives; mirrors the pack reader's own policy enum.
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum SigningPolicy {
    /// Any verification failure aborts the run.
    Strict,
    /// Verification failures whose message mentions a signature are logged and
    /// the run continues; other verification failures still abort.
    DevOk,
}
175
/// Builder-style front end holding a base set of [`RunOptions`].
///
/// Each run clones the base options, so one `Runner` can be reused.
#[derive(Clone, Debug)]
pub struct Runner {
    /// Options cloned for every run.
    base: RunOptions,
}
181
182impl Runner {
183 pub fn new() -> Self {
184 Self {
185 base: desktop_defaults(),
186 }
187 }
188
189 pub fn profile(mut self, profile: Profile) -> Self {
190 self.base.profile = profile;
191 self
192 }
193
194 pub fn with_mocks(mut self, mocks: MocksConfig) -> Self {
195 self.base.mocks = mocks;
196 self
197 }
198
199 pub fn configure(mut self, f: impl FnOnce(&mut RunOptions)) -> Self {
200 f(&mut self.base);
201 self
202 }
203
204 pub fn run_pack<P: AsRef<Path>>(&self, pack_path: P) -> Result<RunResult> {
205 run_pack_with_options(pack_path, self.base.clone())
206 }
207
208 pub fn run_pack_with<P: AsRef<Path>>(
209 &self,
210 pack_path: P,
211 f: impl FnOnce(&mut RunOptions),
212 ) -> Result<RunResult> {
213 let mut opts = self.base.clone();
214 f(&mut opts);
215 run_pack_with_options(pack_path, opts)
216 }
217
218 pub async fn run_pack_async<P: AsRef<Path>>(&self, pack_path: P) -> Result<RunResult> {
219 run_pack_with_options_async(pack_path, self.base.clone()).await
220 }
221
222 pub async fn run_pack_with_async<P: AsRef<Path>>(
223 &self,
224 pack_path: P,
225 f: impl FnOnce(&mut RunOptions),
226 ) -> Result<RunResult> {
227 let mut opts = self.base.clone();
228 f(&mut opts);
229 run_pack_with_options_async(pack_path, opts).await
230 }
231}
232
impl Default for Runner {
    /// Equivalent to [`Runner::new`].
    fn default() -> Self {
        Self::new()
    }
}
238
239pub fn run_pack_with_options<P: AsRef<Path>>(pack_path: P, opts: RunOptions) -> Result<RunResult> {
241 let runtime = Runtime::new().context("failed to create tokio runtime")?;
242 runtime.block_on(run_pack_async(pack_path.as_ref(), opts))
243}
244
245pub async fn run_pack_with_options_async<P: AsRef<Path>>(
247 pack_path: P,
248 opts: RunOptions,
249) -> Result<RunResult> {
250 run_pack_async(pack_path.as_ref(), opts).await
251}
252
253pub fn desktop_defaults() -> RunOptions {
255 let otlp = std::env::var("OTLP_ENDPOINT")
256 .ok()
257 .map(|endpoint| OtlpHook {
258 endpoint,
259 headers: Vec::new(),
260 sample_all: true,
261 });
262 RunOptions {
263 profile: Profile::Dev(DevProfile::default()),
264 entry_flow: None,
265 input: json!({}),
266 ctx: TenantContext::default_local(),
267 transcript: None,
268 otlp,
269 mocks: MocksConfig::default(),
270 artifacts_dir: None,
271 signing: SigningPolicy::DevOk,
272 components_dir: None,
273 components_map: HashMap::new(),
274 dist_offline: false,
275 dist_cache_dir: None,
276 allow_missing_hash: false,
277 secrets_manager: None,
278 cross_pack_resolver: None,
279 session_state_dir: None,
280 }
281}
282
283fn resolve_secrets_manager(opts: &RunOptions) -> Result<DynSecretsManager> {
284 if let Some(manager) = opts.secrets_manager.clone() {
285 Ok(manager)
286 } else {
287 default_manager().context("failed to initialise secrets backend")
288 }
289}
290
/// Executes one pack run end to end and returns its summary.
///
/// Steps, in order: normalise the pack path, resolve the profile, prepare the
/// run directory, wire the mock layer and recorder, verify the pack archive
/// (files only), derive the HTTP gate, load the pack, pick the entry flow,
/// execute (or resume) it, and finally write `run.json` into the run directory.
///
/// # Errors
/// Returns an error when setup fails (path, directories, recorder, pack
/// verification under the active signing policy, pack load, flow enumeration,
/// engine creation) or when the summary cannot be written. Failures of the
/// flow execution itself are reported inside the returned [`RunResult`].
async fn run_pack_async(pack_path: &Path, opts: RunOptions) -> Result<RunResult> {
    let pack_path = normalize_pack_path(pack_path)?;
    let resolved_profile = resolve_profile(&opts.profile, &opts.ctx);
    if let Some(otlp) = &opts.otlp {
        apply_otlp_hook(otlp);
    }

    let directories = prepare_run_dirs(opts.artifacts_dir.clone())?;
    info!(run_dir = %directories.root.display(), "prepared desktop run directory");

    let mock_layer = Arc::new(MockLayer::new(opts.mocks.clone(), &directories.root)?);

    // The recorder starts with fallback metadata and no flow id; both are
    // filled in once the pack has been opened/loaded.
    let recorder = Arc::new(RunRecorder::new(
        directories.clone(),
        &resolved_profile,
        None,
        PackMetadata::fallback(&pack_path),
        opts.transcript.clone(),
    )?);

    // Mock events are written into the same transcript as node events.
    let mock_sink: Arc<dyn MockEventSink> = recorder.clone();
    mock_layer.register_sink(mock_sink);

    // One flag per component in the pack manifest: does it declare host.http.client?
    let mut pack_http_flags: Vec<bool> = Vec::new();
    if pack_path.is_file() {
        match open_pack(&pack_path, to_reader_policy(opts.signing)) {
            Ok(load) => {
                let meta = &load.manifest.meta;
                recorder.update_pack_metadata(PackMetadata {
                    pack_id: meta.pack_id.clone(),
                    version: meta.version.to_string(),
                    entry_flows: meta.entry_flows.clone(),
                    secret_requirements: Vec::new(),
                });
                if let Some(manifest) = load.gpack_manifest.as_ref() {
                    pack_http_flags = manifest
                        .components
                        .iter()
                        .map(|component| {
                            component
                                .capabilities
                                .host
                                .http
                                .as_ref()
                                .is_some_and(|http| http.client)
                        })
                        .collect();
                }
            }
            Err(err) => {
                // The failure is always recorded in the transcript; only
                // signature-related failures are tolerated, and only under DevOk.
                recorder.record_verify_event("error", &err.message)?;
                if opts.signing == SigningPolicy::DevOk && is_signature_error(&err.message) {
                    warn!(error = %err.message, "continuing despite signature error (dev policy)");
                } else {
                    return Err(anyhow!("pack verification failed: {}", err.message));
                }
            }
        }
    } else {
        tracing::debug!(
            path = %pack_path.display(),
            "skipping pack verification for directory input"
        );
    }

    // HTTP is enabled only when some component asks for it and the kill-switch
    // environment variable is not set.
    let kill_switch = desktop_http_kill_switch_active();
    let http_enabled = derive_http_enabled(&pack_http_flags, kill_switch);
    let http_component_count = pack_http_flags.iter().filter(|flag| **flag).count();
    if http_enabled {
        info!(
            total_component_count = pack_http_flags.len(),
            http_component_count,
            "enabling HTTP client gate based on component manifest capabilities"
        );
    } else if kill_switch {
        info!(
            env_var = DESKTOP_HTTP_DISABLE_ENV,
            "HTTP client gate forced off by desktop kill-switch"
        );
    } else {
        tracing::debug!(
            total_component_count = pack_http_flags.len(),
            "HTTP client gate disabled: no component manifest declares host.http.client"
        );
    }
    let host_config = Arc::new(build_host_config(
        &resolved_profile,
        &directories,
        http_enabled,
    ));
    // An explicit components directory wins; otherwise a directory pack serves
    // as its own materialized root.
    let mut component_resolution = ComponentResolution::default();
    if let Some(dir) = opts.components_dir.clone() {
        component_resolution.materialized_root = Some(dir);
    } else if pack_path.is_dir() {
        component_resolution.materialized_root = Some(pack_path.clone());
    }
    component_resolution.overrides = opts.components_map.clone();
    component_resolution.dist_offline = opts.dist_offline;
    component_resolution.dist_cache_dir = opts.dist_cache_dir.clone();
    component_resolution.allow_missing_hash = opts.allow_missing_hash;
    // Only paths with a `.gtpack` extension (case-insensitive) are passed as an
    // archive source.
    let archive_source = if pack_path
        .extension()
        .and_then(|ext| ext.to_str())
        .map(|ext| ext.eq_ignore_ascii_case("gtpack"))
        .unwrap_or(false)
    {
        Some(&pack_path)
    } else {
        None
    };

    let session_store = new_session_store();
    let state_store = new_state_store();
    let secrets_manager = resolve_secrets_manager(&opts)?;
    let pack = Arc::new(
        PackRuntime::load(
            &pack_path,
            Arc::clone(&host_config),
            Some(Arc::clone(&mock_layer)),
            archive_source.map(|p| p as &Path),
            Some(Arc::clone(&session_store)),
            Some(Arc::clone(&state_store)),
            Arc::new(RunnerWasiPolicy::default()),
            secrets_manager,
            host_config.oauth_broker_config(),
            false,
            component_resolution,
        )
        .await
        .with_context(|| format!("failed to load pack {}", pack_path.display()))?,
    );
    // Metadata from the loaded runtime replaces whatever was recorded earlier.
    recorder.update_pack_metadata(pack.metadata().clone());

    let flows = pack
        .list_flows()
        .await
        .context("failed to enumerate flows")?;
    let entry_flow_id = resolve_entry_flow(opts.entry_flow.clone(), pack.metadata(), &flows)?;
    recorder.set_flow_id(&entry_flow_id);

    let mut engine = FlowEngine::new(vec![Arc::clone(&pack)], Arc::clone(&host_config))
        .await
        .context("failed to prime flow engine")?;
    if let Some(resolver) = opts.cross_pack_resolver.clone() {
        engine.set_cross_pack_resolver(resolver);
    }

    let started_at = OffsetDateTime::now_utc();
    // Owned copies and plain references that the borrowed FlowContext points at.
    let tenant_str = host_config.tenant.clone();
    let session_id_owned = resolved_profile.session_id.clone();
    let provider_id_owned = resolved_profile.provider_id.clone();
    let recorder_ref: &RunRecorder = &recorder;
    let mock_ref: &MockLayer = &mock_layer;
    let ctx = FlowContext {
        tenant: &tenant_str,
        pack_id: pack.metadata().pack_id.as_str(),
        flow_id: &entry_flow_id,
        node_id: None,
        tool: None,
        action: Some("run_pack"),
        session_id: Some(session_id_owned.as_str()),
        provider_id: Some(provider_id_owned.as_str()),
        retry_config: host_config.retry_config().into(),
        attempt: 1,
        observer: Some(recorder_ref),
        mocks: Some(mock_ref),
    };

    // A snapshot path exists only when a session state directory is configured.
    // A missing or unparsable snapshot file simply means "start fresh".
    let session_snapshot_path = session_snapshot_file(
        opts.session_state_dir.as_deref(),
        resolved_profile.session_id.as_str(),
    );
    let resume_snapshot = session_snapshot_path
        .as_deref()
        .and_then(load_session_snapshot);

    let execution = if let Some(snapshot) = resume_snapshot {
        engine.resume(ctx, snapshot, opts.input.clone()).await
    } else {
        engine.execute(ctx, opts.input.clone()).await
    };
    let finished_at = OffsetDateTime::now_utc();

    let status = match execution {
        Ok(result) => match result.status {
            greentic_runner_host::runner::engine::FlowStatus::Completed => {
                if let Some(path) = session_snapshot_path.as_deref() {
                    // Best-effort cleanup: a stale or absent snapshot file is not an error.
                    let _ = std::fs::remove_file(path);
                }
                RunCompletion::Ok
            }
            greentic_runner_host::runner::engine::FlowStatus::Waiting(wait) => {
                let reason = wait
                    .reason
                    .clone()
                    .unwrap_or_else(|| "flow paused unexpectedly".to_string());
                // A paused flow counts as success only when its snapshot could be
                // persisted for a later resume; otherwise the pause is an error.
                if let Some(path) = session_snapshot_path.as_deref() {
                    if let Err(err) = save_session_snapshot(path, &wait.snapshot) {
                        RunCompletion::Err(anyhow::anyhow!(
                            "{reason} (and failed to persist resume snapshot to {}: {err})",
                            path.display()
                        ))
                    } else {
                        RunCompletion::Ok
                    }
                } else {
                    RunCompletion::Err(anyhow::anyhow!(reason))
                }
            }
        },
        Err(err) => RunCompletion::Err(err),
    };

    let result = recorder.finalise(status, started_at, finished_at)?;

    let run_json_path = directories.root.join("run.json");
    fs::write(&run_json_path, serde_json::to_vec_pretty(&result)?)
        .with_context(|| format!("failed to write run summary {}", run_json_path.display()))?;

    Ok(result)
}
532
/// Computes the snapshot file path for a session.
///
/// Returns `None` when no state directory is configured or the session id is
/// empty. Otherwise every character outside ASCII alphanumerics and `- _ . :`
/// is replaced with `_`, and `.snapshot.json` is appended to form the file name.
fn session_snapshot_file(session_state_dir: Option<&Path>, session_id: &str) -> Option<PathBuf> {
    const SUFFIX: &str = ".snapshot.json";

    fn is_filename_safe(ch: char) -> bool {
        ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_' | '.' | ':')
    }

    match session_state_dir {
        Some(dir) if !session_id.is_empty() => {
            let mut file_name = String::with_capacity(session_id.len() + SUFFIX.len());
            for ch in session_id.chars() {
                file_name.push(if is_filename_safe(ch) { ch } else { '_' });
            }
            file_name.push_str(SUFFIX);
            Some(dir.join(file_name))
        }
        _ => None,
    }
}
555
556fn load_session_snapshot(
560 path: &Path,
561) -> Option<greentic_runner_host::runner::engine::FlowSnapshot> {
562 let bytes = fs::read(path).ok()?;
563 serde_json::from_slice(&bytes).ok()
564}
565
566fn save_session_snapshot(
570 path: &Path,
571 snapshot: &greentic_runner_host::runner::engine::FlowSnapshot,
572) -> Result<()> {
573 if let Some(parent) = path.parent() {
574 fs::create_dir_all(parent)
575 .with_context(|| format!("create session snapshot dir {}", parent.display()))?;
576 }
577 let bytes = serde_json::to_vec_pretty(snapshot)
578 .with_context(|| "serialize session snapshot for persistence")?;
579 fs::write(path, bytes).with_context(|| format!("write session snapshot {}", path.display()))?;
580 Ok(())
581}
582
583fn apply_otlp_hook(hook: &OtlpHook) {
584 info!(
585 endpoint = %hook.endpoint,
586 sample_all = hook.sample_all,
587 headers = %hook.headers.len(),
588 "OTLP hook requested (set OTEL_* env vars before invoking run_pack)"
589 );
590}
591
592fn normalize_pack_path(path: &Path) -> Result<PathBuf> {
593 if path.is_absolute() {
594 let parent = path
595 .parent()
596 .ok_or_else(|| anyhow!("pack path {} has no parent", path.display()))?;
597 let root = parent
598 .canonicalize()
599 .with_context(|| format!("failed to canonicalize {}", parent.display()))?;
600 let file = path
601 .file_name()
602 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
603 return normalize_under_root(&root, Path::new(file));
604 }
605
606 let cwd = std::env::current_dir().context("failed to resolve current directory")?;
607 let base = if let Some(parent) = path.parent() {
608 cwd.join(parent)
609 } else {
610 cwd
611 };
612 let root = base
613 .canonicalize()
614 .with_context(|| format!("failed to canonicalize {}", base.display()))?;
615 let file = path
616 .file_name()
617 .ok_or_else(|| anyhow!("pack path {} has no file name", path.display()))?;
618 normalize_under_root(&root, Path::new(file))
619}
620
621fn prepare_run_dirs(root_override: Option<PathBuf>) -> Result<RunDirectories> {
622 let root = if let Some(dir) = root_override {
623 dir
624 } else {
625 let timestamp = OffsetDateTime::now_utc()
626 .format(&Rfc3339)
627 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
628 .replace(':', "-");
629 let short_id = Uuid::new_v4()
630 .to_string()
631 .chars()
632 .take(6)
633 .collect::<String>();
634 PathBuf::from(".greentic")
635 .join("runs")
636 .join(format!("{timestamp}_{short_id}"))
637 };
638
639 fs::create_dir_all(&root).with_context(|| format!("failed to create {}", root.display()))?;
640 let logs = root.join("logs");
641 let resolved = root.join("resolved_config");
642 fs::create_dir_all(&logs).with_context(|| format!("failed to create {}", logs.display()))?;
643 fs::create_dir_all(&resolved)
644 .with_context(|| format!("failed to create {}", resolved.display()))?;
645
646 Ok(RunDirectories {
647 root,
648 logs,
649 resolved,
650 })
651}
652
653fn resolve_entry_flow(
654 override_id: Option<String>,
655 metadata: &PackMetadata,
656 flows: &[FlowDescriptor],
657) -> Result<String> {
658 if let Some(flow) = override_id {
659 return Ok(flow);
660 }
661 if let Some(first) = metadata.entry_flows.first() {
662 return Ok(first.clone());
663 }
664 flows
665 .first()
666 .map(|f| f.id.clone())
667 .ok_or_else(|| anyhow!("pack does not declare any flows"))
668}
669
/// Reports whether a verification error message mentions "signature",
/// ignoring ASCII case.
fn is_signature_error(message: &str) -> bool {
    const NEEDLE: &[u8] = b"signature";
    // Byte-wise scan is safe on UTF-8: ASCII bytes never occur inside a
    // multi-byte sequence, so this matches exactly what a lowercase-then-search would.
    message
        .as_bytes()
        .windows(NEEDLE.len())
        .any(|window| window.eq_ignore_ascii_case(NEEDLE))
}
673
674fn to_reader_policy(policy: SigningPolicy) -> greentic_pack::reader::SigningPolicy {
675 match policy {
676 SigningPolicy::Strict => greentic_pack::reader::SigningPolicy::Strict,
677 SigningPolicy::DevOk => greentic_pack::reader::SigningPolicy::DevOk,
678 }
679}
680
681fn resolve_profile(profile: &Profile, ctx: &TenantContext) -> ResolvedProfile {
682 match profile {
683 Profile::Dev(dev) => ResolvedProfile {
684 tenant_id: ctx
685 .tenant_id
686 .clone()
687 .unwrap_or_else(|| dev.tenant_id.clone()),
688 team_id: ctx.team_id.clone().unwrap_or_else(|| dev.team_id.clone()),
689 user_id: ctx.user_id.clone().unwrap_or_else(|| dev.user_id.clone()),
690 session_id: ctx
691 .session_id
692 .clone()
693 .unwrap_or_else(|| Uuid::new_v4().to_string()),
694 provider_id: PROVIDER_ID_DEV.to_string(),
695 max_node_wall_time_ms: dev.max_node_wall_time_ms,
696 max_run_wall_time_ms: dev.max_run_wall_time_ms,
697 },
698 }
699}
700
/// Environment variable acting as a kill-switch for the HTTP client gate.
/// Truthy values (`1`, `true`, `yes`, `on`, case-insensitive) force HTTP off.
const DESKTOP_HTTP_DISABLE_ENV: &str = "GREENTIC_DESKTOP_HTTP_DISABLE";
705
/// Decides whether the HTTP client gate is enabled.
///
/// The kill-switch always wins; otherwise HTTP is enabled when at least one
/// component flag is `true`.
fn derive_http_enabled(component_http_flags: &[bool], kill_switch: bool) -> bool {
    if kill_switch {
        return false;
    }
    component_http_flags.contains(&true)
}
715
/// Interprets a raw environment value as a boolean kill-switch.
///
/// After trimming whitespace, `1`, `true`, `yes` and `on` (ASCII
/// case-insensitive) are truthy; everything else, including `None`, is `false`.
fn parse_kill_switch(raw: Option<&str>) -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];

    let Some(value) = raw else {
        return false;
    };
    let candidate = value.trim();
    TRUTHY
        .iter()
        .any(|truthy| candidate.eq_ignore_ascii_case(truthy))
}
728
729fn desktop_http_kill_switch_active() -> bool {
732 parse_kill_switch(std::env::var(DESKTOP_HTTP_DISABLE_ENV).ok().as_deref())
733}
734
735fn build_host_config(
736 profile: &ResolvedProfile,
737 dirs: &RunDirectories,
738 http_enabled: bool,
739) -> HostConfig {
740 HostConfig {
741 tenant: profile.tenant_id.clone(),
742 bindings_path: dirs.resolved.join("dev.bindings.yaml"),
743 flow_type_bindings: HashMap::new(),
744 rate_limits: RateLimits::default(),
745 retry: FlowRetryConfig::default(),
746 http_enabled,
747 secrets_policy: SecretsPolicy::allow_all(),
748 state_store_policy: StateStorePolicy::default(),
749 webhook_policy: WebhookPolicy::default(),
750 timers: Vec::new(),
751 oauth: None,
752 mocks: None,
753 pack_bindings: Vec::new(),
754 env_passthrough: Vec::new(),
755 trace: TraceConfig::from_env(),
756 validation: ValidationConfig::from_env(),
757 operator_policy: OperatorPolicy::allow_all(),
758 fast2flow: Default::default(),
759 }
760}
761
/// Concrete identity and limits for one run, produced by `resolve_profile`.
// dead_code is allowed because not every field is read in this module
// (the wall-time limits are not read in the code visible here).
#[allow(dead_code)]
#[derive(Clone, Debug)]
struct ResolvedProfile {
    /// Tenant used for the host configuration and transcript context.
    tenant_id: String,
    /// Team recorded in the transcript context.
    team_id: String,
    /// User recorded in the transcript context.
    user_id: String,
    /// Session identifier for the run; also names the resume snapshot file.
    session_id: String,
    /// Provider identifier passed to the flow context.
    provider_id: String,
    /// Per-node wall-time budget in milliseconds.
    max_node_wall_time_ms: u64,
    /// Whole-run wall-time budget in milliseconds.
    max_run_wall_time_ms: u64,
}
773
/// Summary of a finished run; also serialised to `run.json` in the run directory.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RunResult {
    /// Session the run belonged to.
    pub session_id: String,
    /// Identifier of the executed pack.
    pub pack_id: String,
    /// Version string of the executed pack.
    pub pack_version: String,
    /// Flow that was executed.
    pub flow_id: String,
    /// Start time, formatted as RFC 3339 when formatting succeeds.
    pub started_at_utc: String,
    /// Finish time, formatted as RFC 3339 when formatting succeeds.
    pub finished_at_utc: String,
    /// Overall outcome.
    pub status: RunStatus,
    /// Message of the execution-level error, if any.
    pub error: Option<String>,
    /// One entry per recorded node, in first-start order.
    pub node_summaries: Vec<NodeSummary>,
    /// Failures keyed by node id; an execution-level error is stored under `_runtime`.
    pub failures: BTreeMap<String, NodeFailure>,
    /// Root directory holding the run's artifacts.
    pub artifacts_dir: PathBuf,
}
788
/// Overall outcome of a run.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum RunStatus {
    /// Execution finished without an error and no node recorded a failure.
    Success,
    /// Execution finished without an error, but at least one node recorded a failure.
    PartialFailure,
    /// Execution itself returned an error.
    Failure,
}
795
/// Per-node line in the run summary.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NodeSummary {
    /// Node identifier within the flow.
    pub node_id: String,
    /// Component the node executed.
    pub component: String,
    /// Final status recorded for the node.
    pub status: NodeStatus,
    /// Elapsed time between the node's start and its end/error event, in
    /// milliseconds; `0` when no end or error event was recorded.
    pub duration_ms: u64,
}
803
/// Final status of a single node.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum NodeStatus {
    /// The node started and no error was recorded for it.
    Ok,
    /// NOTE(review): never assigned by the recorder code in this section —
    /// confirm where (or whether) skipped nodes are reported.
    Skipped,
    /// An error event was recorded for the node.
    Error,
}
810
/// Details of one failure in the run summary.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NodeFailure {
    /// Short failure code, e.g. `component-failed` or `runtime-error`.
    pub code: String,
    /// Human-readable error message.
    pub message: String,
    /// Structured details as JSON.
    pub details: Value,
    /// Pair of transcript offsets: where the node's start event began and the
    /// offset recorded with its error event; `(0, 0)` for execution-level errors.
    pub transcript_offsets: (u64, u64),
    /// Log files written for this failure.
    pub log_paths: Vec<PathBuf>,
}
819
/// Directory layout of a single run, created by `prepare_run_dirs`.
#[derive(Clone)]
struct RunDirectories {
    /// Run root; holds `transcript.jsonl` and `run.json`.
    root: PathBuf,
    /// `<root>/logs`; holds per-node stderr logs.
    logs: PathBuf,
    /// `<root>/resolved_config`; the bindings path points into it.
    resolved: PathBuf,
}
826
/// Outcome of flow execution as handed to `RunRecorder::finalise`.
enum RunCompletion {
    /// The flow completed, or paused with its resume snapshot persisted.
    Ok,
    /// Execution failed, or the flow paused without a persisted snapshot.
    Err(anyhow::Error),
}
831
/// Collects transcript events and per-node state for one run.
///
/// All mutable parts sit behind mutexes because the recorder is shared (via
/// `Arc`) between the flow engine observer and the mock layer sink.
struct RunRecorder {
    /// Directory layout of the run.
    directories: RunDirectories,
    /// Identity used in transcript events and the final summary.
    profile: ResolvedProfile,
    /// Flow id stamped on events; `"unknown"` until set.
    flow_id: Mutex<String>,
    /// Pack metadata reported in the final summary.
    pack_meta: Mutex<PackMetadata>,
    /// Writer for `transcript.jsonl`.
    transcript: Mutex<TranscriptWriter>,
    /// Per-node execution records and their first-start order.
    state: Mutex<RunRecorderState>,
}
840
841impl RunRecorder {
842 fn new(
843 dirs: RunDirectories,
844 profile: &ResolvedProfile,
845 flow_id: Option<String>,
846 pack_meta: PackMetadata,
847 hook: Option<TranscriptHook>,
848 ) -> Result<Self> {
849 let transcript_path = dirs.root.join("transcript.jsonl");
850 let file = File::create(&transcript_path)
851 .with_context(|| format!("failed to open {}", transcript_path.display()))?;
852 Ok(Self {
853 directories: dirs,
854 profile: profile.clone(),
855 flow_id: Mutex::new(flow_id.unwrap_or_else(|| "unknown".into())),
856 pack_meta: Mutex::new(pack_meta),
857 transcript: Mutex::new(TranscriptWriter::new(BufWriter::new(file), hook)),
858 state: Mutex::new(RunRecorderState::default()),
859 })
860 }
861
862 fn finalise(
863 &self,
864 completion: RunCompletion,
865 started_at: OffsetDateTime,
866 finished_at: OffsetDateTime,
867 ) -> Result<RunResult> {
868 let status = match &completion {
869 RunCompletion::Ok => RunStatus::Success,
870 RunCompletion::Err(_) => RunStatus::Failure,
871 };
872
873 let mut runtime_error = None;
874 if let RunCompletion::Err(err) = completion {
875 warn!(error = %err, "pack execution failed");
876 eprintln!("pack execution failed: {err}");
877 runtime_error = Some(err.to_string());
878 }
879
880 let started = started_at
881 .format(&Rfc3339)
882 .unwrap_or_else(|_| started_at.to_string());
883 let finished = finished_at
884 .format(&Rfc3339)
885 .unwrap_or_else(|_| finished_at.to_string());
886
887 let state = self.state.lock();
888 let mut summaries = Vec::new();
889 let mut failures = BTreeMap::new();
890 for node_id in &state.order {
891 if let Some(record) = state.nodes.get(node_id) {
892 let duration = record.duration_ms.unwrap_or(0);
893 summaries.push(NodeSummary {
894 node_id: node_id.clone(),
895 component: record.component.clone(),
896 status: record.status.clone(),
897 duration_ms: duration,
898 });
899 if record.status == NodeStatus::Error {
900 let start_offset = record.transcript_start.unwrap_or(0);
901 let err_offset = record.transcript_error.unwrap_or(start_offset);
902 failures.insert(
903 node_id.clone(),
904 NodeFailure {
905 code: record
906 .failure_code
907 .clone()
908 .unwrap_or_else(|| "component-failed".into()),
909 message: record
910 .failure_message
911 .clone()
912 .unwrap_or_else(|| "node failed".into()),
913 details: record
914 .failure_details
915 .clone()
916 .unwrap_or_else(|| json!({ "node": node_id })),
917 transcript_offsets: (start_offset, err_offset),
918 log_paths: record.log_paths.clone(),
919 },
920 );
921 }
922 }
923 }
924
925 let mut final_status = status;
926 if final_status == RunStatus::Success && !failures.is_empty() {
927 final_status = RunStatus::PartialFailure;
928 }
929
930 if let Some(error) = &runtime_error {
931 failures.insert(
932 "_runtime".into(),
933 NodeFailure {
934 code: "runtime-error".into(),
935 message: error.clone(),
936 details: json!({ "stage": "execute" }),
937 transcript_offsets: (0, 0),
938 log_paths: Vec::new(),
939 },
940 );
941 }
942
943 let pack_meta = self.pack_meta.lock();
944 let flow_id = self.flow_id.lock().clone();
945 Ok(RunResult {
946 session_id: self.profile.session_id.clone(),
947 pack_id: pack_meta.pack_id.clone(),
948 pack_version: pack_meta.version.clone(),
949 flow_id,
950 started_at_utc: started,
951 finished_at_utc: finished,
952 status: final_status,
953 error: runtime_error,
954 node_summaries: summaries,
955 failures,
956 artifacts_dir: self.directories.root.clone(),
957 })
958 }
959
960 fn set_flow_id(&self, flow_id: &str) {
961 *self.flow_id.lock() = flow_id.to_string();
962 }
963
964 fn update_pack_metadata(&self, meta: PackMetadata) {
965 *self.pack_meta.lock() = meta;
966 }
967
968 fn current_flow_id(&self) -> String {
969 self.flow_id.lock().clone()
970 }
971
972 fn record_verify_event(&self, status: &str, message: &str) -> Result<()> {
973 let timestamp = OffsetDateTime::now_utc();
974 let event = json!({
975 "ts": timestamp
976 .format(&Rfc3339)
977 .unwrap_or_else(|_| timestamp.to_string()),
978 "session_id": self.profile.session_id,
979 "flow_id": self.current_flow_id(),
980 "node_id": Value::Null,
981 "component": "verify.pack",
982 "phase": "verify",
983 "status": status,
984 "inputs": Value::Null,
985 "outputs": Value::Null,
986 "error": json!({ "message": message }),
987 "metrics": Value::Null,
988 "schema_id": Value::Null,
989 "defaults_applied": Value::Null,
990 "redactions": Value::Array(Vec::new()),
991 });
992 self.transcript.lock().write(&event).map(|_| ())
993 }
994
995 fn write_mock_event(&self, capability: &str, provider: &str, payload: Value) -> Result<()> {
996 let timestamp = OffsetDateTime::now_utc();
997 let event = json!({
998 "ts": timestamp
999 .format(&Rfc3339)
1000 .unwrap_or_else(|_| timestamp.to_string()),
1001 "session_id": self.profile.session_id,
1002 "flow_id": self.current_flow_id(),
1003 "node_id": Value::Null,
1004 "component": format!("mock::{capability}"),
1005 "phase": "mock",
1006 "status": "ok",
1007 "inputs": json!({ "capability": capability, "provider": provider }),
1008 "outputs": payload,
1009 "error": Value::Null,
1010 "metrics": Value::Null,
1011 "schema_id": Value::Null,
1012 "defaults_applied": Value::Null,
1013 "redactions": Value::Array(Vec::new()),
1014 });
1015 self.transcript.lock().write(&event).map(|_| ())
1016 }
1017
1018 fn handle_node_start(&self, event: &NodeEvent<'_>) -> Result<()> {
1019 let timestamp = OffsetDateTime::now_utc();
1020 let (redacted_payload, redactions) = redact_value(event.payload, "$.inputs.payload");
1021 let inputs = json!({
1022 "payload": redacted_payload,
1023 "context": {
1024 "tenant_id": self.profile.tenant_id.as_str(),
1025 "team_id": self.profile.team_id.as_str(),
1026 "user_id": self.profile.user_id.as_str(),
1027 }
1028 });
1029 let flow_id = self.current_flow_id();
1030 let event_json = build_transcript_event(TranscriptEventArgs {
1031 profile: &self.profile,
1032 flow_id: &flow_id,
1033 node_id: event.node_id,
1034 component: &event.node.component,
1035 phase: "start",
1036 status: "ok",
1037 timestamp,
1038 inputs,
1039 outputs: Value::Null,
1040 error: Value::Null,
1041 redactions,
1042 });
1043 let (start_offset, _) = self.transcript.lock().write(&event_json)?;
1044
1045 let mut state = self.state.lock();
1046 let node_key = event.node_id.to_string();
1047 if !state.order.iter().any(|id| id == &node_key) {
1048 state.order.push(node_key.clone());
1049 }
1050 let entry = state.nodes.entry(node_key).or_insert_with(|| {
1051 NodeExecutionRecord::new(event.node.component.clone(), &self.directories)
1052 });
1053 entry.start_instant = Some(Instant::now());
1054 entry.status = NodeStatus::Ok;
1055 entry.transcript_start = Some(start_offset);
1056 Ok(())
1057 }
1058
1059 fn handle_node_end(&self, event: &NodeEvent<'_>, output: &Value) -> Result<()> {
1060 let timestamp = OffsetDateTime::now_utc();
1061 let (redacted_output, redactions) = redact_value(output, "$.outputs");
1062 let flow_id = self.current_flow_id();
1063 let event_json = build_transcript_event(TranscriptEventArgs {
1064 profile: &self.profile,
1065 flow_id: &flow_id,
1066 node_id: event.node_id,
1067 component: &event.node.component,
1068 phase: "end",
1069 status: "ok",
1070 timestamp,
1071 inputs: Value::Null,
1072 outputs: redacted_output,
1073 error: Value::Null,
1074 redactions,
1075 });
1076 self.transcript.lock().write(&event_json)?;
1077
1078 let mut state = self.state.lock();
1079 if let Some(entry) = state.nodes.get_mut(event.node_id)
1080 && let Some(started) = entry.start_instant.take()
1081 {
1082 entry.duration_ms = Some(started.elapsed().as_millis() as u64);
1083 }
1084 Ok(())
1085 }
1086
1087 fn handle_node_error(
1088 &self,
1089 event: &NodeEvent<'_>,
1090 error: &dyn std::error::Error,
1091 ) -> Result<()> {
1092 let timestamp = OffsetDateTime::now_utc();
1093 let error_message = error.to_string();
1094 let error_json = json!({
1095 "code": "component-failed",
1096 "message": error_message,
1097 "details": {
1098 "node": event.node_id,
1099 }
1100 });
1101 let flow_id = self.current_flow_id();
1102 let event_json = build_transcript_event(TranscriptEventArgs {
1103 profile: &self.profile,
1104 flow_id: &flow_id,
1105 node_id: event.node_id,
1106 component: &event.node.component,
1107 phase: "error",
1108 status: "error",
1109 timestamp,
1110 inputs: Value::Null,
1111 outputs: Value::Null,
1112 error: error_json.clone(),
1113 redactions: Vec::new(),
1114 });
1115 let (_, end_offset) = self.transcript.lock().write(&event_json)?;
1116
1117 let mut state = self.state.lock();
1118 if let Some(entry) = state.nodes.get_mut(event.node_id) {
1119 entry.status = NodeStatus::Error;
1120 if let Some(started) = entry.start_instant.take() {
1121 entry.duration_ms = Some(started.elapsed().as_millis() as u64);
1122 }
1123 entry.transcript_error = Some(end_offset);
1124 entry.failure_code = Some("component-failed".to_string());
1125 entry.failure_message = Some(error_message.clone());
1126 entry.failure_details = Some(error_json);
1127 let log_path = self
1128 .directories
1129 .logs
1130 .join(format!("{}.stderr.log", sanitize_id(event.node_id)));
1131 if let Ok(mut file) = File::create(&log_path) {
1132 let _ = writeln!(file, "{error_message}");
1133 }
1134 entry.log_paths.push(log_path);
1135 }
1136 Ok(())
1137 }
1138}
1139
1140impl ExecutionObserver for RunRecorder {
1141 fn on_node_start(&self, event: &NodeEvent<'_>) {
1142 if let Err(err) = self.handle_node_start(event) {
1143 warn!(node = event.node_id, error = %err, "failed to record node start");
1144 }
1145 }
1146
1147 fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value) {
1148 if let Err(err) = self.handle_node_end(event, output) {
1149 warn!(node = event.node_id, error = %err, "failed to record node end");
1150 }
1151 }
1152
1153 fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn std::error::Error) {
1154 if let Err(err) = self.handle_node_error(event, error) {
1155 warn!(node = event.node_id, error = %err, "failed to record node error");
1156 }
1157 }
1158}
1159
1160impl MockEventSink for RunRecorder {
1161 fn on_mock_event(&self, capability: &str, provider: &str, payload: &Value) {
1162 if let Err(err) = self.write_mock_event(capability, provider, payload.clone()) {
1163 warn!(?capability, ?provider, error = %err, "failed to record mock event");
1164 }
1165 }
1166}
1167
1168#[derive(Default)]
1169struct RunRecorderState {
1170 nodes: BTreeMap<String, NodeExecutionRecord>,
1171 order: Vec<String>,
1172}
1173
1174#[derive(Clone)]
1175struct NodeExecutionRecord {
1176 component: String,
1177 status: NodeStatus,
1178 duration_ms: Option<u64>,
1179 transcript_start: Option<u64>,
1180 transcript_error: Option<u64>,
1181 log_paths: Vec<PathBuf>,
1182 failure_code: Option<String>,
1183 failure_message: Option<String>,
1184 failure_details: Option<Value>,
1185 start_instant: Option<Instant>,
1186}
1187
1188impl NodeExecutionRecord {
1189 fn new(component: String, _dirs: &RunDirectories) -> Self {
1190 Self {
1191 component,
1192 status: NodeStatus::Ok,
1193 duration_ms: None,
1194 transcript_start: None,
1195 transcript_error: None,
1196 log_paths: Vec::new(),
1197 failure_code: None,
1198 failure_message: None,
1199 failure_details: None,
1200 start_instant: None,
1201 }
1202 }
1203}
1204
1205struct TranscriptWriter {
1206 writer: BufWriter<File>,
1207 offset: u64,
1208 hook: Option<TranscriptHook>,
1209}
1210
1211impl TranscriptWriter {
1212 fn new(writer: BufWriter<File>, hook: Option<TranscriptHook>) -> Self {
1213 Self {
1214 writer,
1215 offset: 0,
1216 hook,
1217 }
1218 }
1219
1220 fn write(&mut self, value: &Value) -> Result<(u64, u64)> {
1221 let line = serde_json::to_vec(value)?;
1222 let start = self.offset;
1223 self.writer.write_all(&line)?;
1224 self.writer.write_all(b"\n")?;
1225 self.writer.flush()?;
1226 self.offset += line.len() as u64 + 1;
1227 if let Some(hook) = &self.hook {
1228 hook(value);
1229 }
1230 Ok((start, self.offset))
1231 }
1232}
1233
1234struct TranscriptEventArgs<'a> {
1235 profile: &'a ResolvedProfile,
1236 flow_id: &'a str,
1237 node_id: &'a str,
1238 component: &'a str,
1239 phase: &'a str,
1240 status: &'a str,
1241 timestamp: OffsetDateTime,
1242 inputs: Value,
1243 outputs: Value,
1244 error: Value,
1245 redactions: Vec<String>,
1246}
1247
1248fn build_transcript_event(args: TranscriptEventArgs<'_>) -> Value {
1249 let ts = args
1250 .timestamp
1251 .format(&Rfc3339)
1252 .unwrap_or_else(|_| args.timestamp.to_string());
1253 json!({
1254 "ts": ts,
1255 "session_id": args.profile.session_id.as_str(),
1256 "flow_id": args.flow_id,
1257 "node_id": args.node_id,
1258 "component": args.component,
1259 "phase": args.phase,
1260 "status": args.status,
1261 "inputs": args.inputs,
1262 "outputs": args.outputs,
1263 "error": args.error,
1264 "metrics": {
1265 "duration_ms": null,
1266 "cpu_time_ms": null,
1267 "mem_peak_bytes": null,
1268 },
1269 "schema_id": Value::Null,
1270 "defaults_applied": Value::Array(Vec::new()),
1271 "redactions": args.redactions,
1272 })
1273}
1274
1275fn redact_value(value: &Value, base: &str) -> (Value, Vec<String>) {
1276 let mut paths = Vec::new();
1277 let redacted = redact_recursive(value, base, &mut paths);
1278 (redacted, paths)
1279}
1280
1281fn redact_recursive(value: &Value, path: &str, acc: &mut Vec<String>) -> Value {
1282 match value {
1283 Value::Object(map) => {
1284 let mut new_map = JsonMap::new();
1285 for (key, val) in map {
1286 let child_path = format!("{path}.{key}");
1287 if is_sensitive_key(key) {
1288 acc.push(child_path);
1289 new_map.insert(key.clone(), Value::String("__REDACTED__".into()));
1290 } else {
1291 new_map.insert(key.clone(), redact_recursive(val, &child_path, acc));
1292 }
1293 }
1294 Value::Object(new_map)
1295 }
1296 Value::Array(items) => {
1297 let mut new_items = Vec::new();
1298 for (idx, item) in items.iter().enumerate() {
1299 let child_path = format!("{path}[{idx}]");
1300 new_items.push(redact_recursive(item, &child_path, acc));
1301 }
1302 Value::Array(new_items)
1303 }
1304 other => other.clone(),
1305 }
1306}
1307
/// Reports whether `key` looks like it names a sensitive value.
///
/// The check is an ASCII-case-insensitive substring match against a fixed
/// list of fragments, so `"X-Auth-Token"` and `"password_hint"` both match.
fn is_sensitive_key(key: &str) -> bool {
    const SENSITIVE_FRAGMENTS: &[&str] =
        &["secret", "token", "password", "authorization", "cookie"];

    let folded = key.to_ascii_lowercase();
    for fragment in SENSITIVE_FRAGMENTS {
        if folded.contains(*fragment) {
            return true;
        }
    }
    false
}
1313
/// Turns an identifier into a string made only of ASCII letters, digits and
/// `-`.
///
/// Every character that is not ASCII alphanumeric (including `_`, `.`, `/`,
/// whitespace and non-ASCII characters) is replaced by a single `-`.
fn sanitize_id(value: &str) -> String {
    let mut cleaned = String::with_capacity(value.len());
    for ch in value.chars() {
        cleaned.push(if ch.is_ascii_alphanumeric() { ch } else { '-' });
    }
    cleaned
}
1320
1321#[cfg(test)]
1322mod tests {
1323 use super::*;
1324 use std::sync::Arc;
1325
1326 use async_trait::async_trait;
1327 use greentic_runner_host::secrets::DynSecretsManager;
1328 use greentic_secrets_lib::{SecretError, SecretsManager};
1329 use serde_json::json;
1330 use tempfile::TempDir;
1331
1332 fn sample_metadata() -> PackMetadata {
1333 PackMetadata {
1334 pack_id: "pack.demo".into(),
1335 version: "1.0.0".into(),
1336 entry_flows: vec!["entry.flow".into()],
1337 secret_requirements: Vec::new(),
1338 }
1339 }
1340
1341 fn sample_profile() -> ResolvedProfile {
1342 ResolvedProfile {
1343 tenant_id: "tenant".into(),
1344 team_id: "team".into(),
1345 user_id: "user".into(),
1346 session_id: "session-1".into(),
1347 provider_id: PROVIDER_ID_DEV.into(),
1348 max_node_wall_time_ms: 1000,
1349 max_run_wall_time_ms: 2000,
1350 }
1351 }
1352
1353 struct NoopSecretsManager;
1354
1355 #[async_trait]
1356 impl SecretsManager for NoopSecretsManager {
1357 async fn read(&self, path: &str) -> Result<Vec<u8>, SecretError> {
1358 Err(SecretError::NotFound(path.to_string()))
1359 }
1360
1361 async fn write(&self, _path: &str, _bytes: &[u8]) -> Result<(), SecretError> {
1362 Ok(())
1363 }
1364
1365 async fn delete(&self, _path: &str) -> Result<(), SecretError> {
1366 Ok(())
1367 }
1368 }
1369
/// An explicit override wins, then the metadata's entry flow; with neither,
/// resolution still succeeds as long as the pack has a flow.
#[test]
fn resolve_entry_flow_prefers_override_then_metadata_then_flows() {
    let metadata = sample_metadata();
    let flows = vec![FlowDescriptor {
        id: "fallback.flow".into(),
        flow_type: "message".into(),
        pack_id: "pack.demo".into(),
        profile: "default".into(),
        version: "1.0.0".into(),
        description: None,
    }];

    let from_override =
        resolve_entry_flow(Some("manual.flow".into()), &metadata, &flows).unwrap();
    assert_eq!(from_override, "manual.flow");

    let from_metadata = resolve_entry_flow(None, &metadata, &flows).unwrap();
    assert_eq!(from_metadata, "entry.flow");

    let without_entry_flows = PackMetadata {
        entry_flows: Vec::new(),
        ..metadata
    };
    let from_flows = resolve_entry_flow(None, &without_entry_flows, &flows);
    assert!(from_flows.is_ok());
}
1402
/// A pack path given relative to the current directory normalises to the
/// file's canonical absolute path.
#[test]
fn normalize_pack_path_accepts_relative_files_inside_cwd() {
    let cwd = std::env::current_dir().expect("cwd");
    // The temp dir lives under the cwd so the pack can be addressed relatively.
    let scratch = tempfile::tempdir_in(&cwd).expect("tempdir");
    let pack = scratch.path().join("demo.gtpack");
    fs::write(&pack, b"pack").expect("pack file");

    let relative = pack.strip_prefix(&cwd).expect("relative").to_path_buf();
    let normalized = normalize_pack_path(&relative).expect("normalized path");

    let canonical = pack.canonicalize().expect("canonical");
    assert_eq!(normalized, canonical);
}
1417
/// Preparing run directories under an explicit root keeps that root and
/// creates the `logs` and `resolved` directories on disk.
#[test]
fn prepare_run_dirs_creates_logs_and_resolved_subdirs() {
    let scratch = TempDir::new().expect("tempdir");
    let root = scratch.path().join("artifacts");

    let dirs = prepare_run_dirs(Some(root.clone())).expect("run dirs");

    assert_eq!(dirs.root, root);
    for created in [&dirs.logs, &dirs.resolved] {
        assert!(created.is_dir());
    }
}
1428
/// Sensitive keys are masked at the top level, inside nested objects, and
/// inside arrays, and each masked field's path is reported exactly once.
#[test]
fn redact_value_masks_nested_sensitive_fields() {
    let payload = json!({
        "token": "abc",
        "nested": { "authorization_header": "secret" },
        "items": [{ "password_hint": "nope" }]
    });

    let (redacted, paths) = redact_value(&payload, "$");

    assert_eq!(redacted["token"], "__REDACTED__");
    assert_eq!(redacted["nested"]["authorization_header"], "__REDACTED__");
    assert_eq!(redacted["items"][0]["password_hint"], "__REDACTED__");

    // Order depends on the JSON map's iteration order, so only membership and
    // the total count are checked. The array case covers `path[index]` paths.
    for expected in [
        "$.token",
        "$.nested.authorization_header",
        "$.items[0].password_hint",
    ] {
        assert!(
            paths.iter().any(|path| path == expected),
            "missing redaction path {expected}; got {paths:?}"
        );
    }
    assert_eq!(paths.len(), 3, "unexpected redaction paths: {paths:?}");
}
1446
/// Smoke-tests the small helpers: id sanitising, signature-error detection,
/// signing-policy conversion, and the basic shape of a transcript event.
#[test]
fn helper_functions_preserve_ids_and_transcript_shape() {
    let sanitized = sanitize_id("flow demo/1");
    assert_eq!(sanitized, "flow-demo-1");

    assert!(is_signature_error("Signature verification failed"));

    let reader_policy = to_reader_policy(SigningPolicy::DevOk);
    assert_eq!(reader_policy, greentic_pack::reader::SigningPolicy::DevOk);

    let profile = sample_profile();
    let args = TranscriptEventArgs {
        profile: &profile,
        flow_id: "flow.demo",
        node_id: "node.demo",
        component: "component.demo",
        phase: "invoke",
        status: "ok",
        timestamp: OffsetDateTime::now_utc(),
        inputs: json!({"input": true}),
        outputs: json!({"output": true}),
        error: Value::Null,
        redactions: vec!["$.token".into()],
    };
    let event = build_transcript_event(args);

    assert_eq!(event["session_id"], "session-1");
    assert_eq!(event["flow_id"], "flow.demo");
    assert_eq!(event["redactions"][0], "$.token");
}
1474
/// Identity fields set in the tenant context override the dev profile's
/// values, while the provider id stays the dev provider.
#[test]
fn resolve_profile_uses_context_overrides() {
    let base = Profile::Dev(DevProfile::default());
    let ctx = TenantContext {
        tenant_id: Some("tenant-x".into()),
        team_id: Some("team-x".into()),
        user_id: Some("user-x".into()),
        session_id: Some("session-x".into()),
    };

    let resolved = resolve_profile(&base, &ctx);

    assert_eq!(resolved.tenant_id, "tenant-x");
    assert_eq!(resolved.team_id, "team-x");
    assert_eq!(resolved.user_id, "user-x");
    assert_eq!(resolved.session_id, "session-x");
    assert_eq!(resolved.provider_id, PROVIDER_ID_DEV);
}
1493
/// Desktop defaults use dev signing and the `local-dev` tenant, and options
/// applied through the runner builder end up in the runner's base options.
#[test]
fn desktop_defaults_and_runner_builder_preserve_overrides() {
    let defaults = desktop_defaults();
    assert_eq!(defaults.signing, SigningPolicy::DevOk);
    assert_eq!(defaults.ctx.tenant_id.as_deref(), Some("local-dev"));

    let mocks = MocksConfig {
        telemetry: Some(TelemetryMock),
        ..MocksConfig::default()
    };
    let runner = Runner::new().with_mocks(mocks).configure(|opts| {
        opts.entry_flow = Some("entry.flow".into());
        opts.input = json!({"demo": true});
    });

    let base = &runner.base;
    let rendered = format!("{base:?}");
    assert!(rendered.contains("entry.flow"));
    assert!(base.mocks.telemetry.is_some());
    assert_eq!(base.input, json!({"demo": true}));
}
1515
/// With no override, no entry flows in the metadata, and no flows in the
/// pack, entry-flow resolution fails.
#[test]
fn resolve_entry_flow_errors_when_pack_has_no_flows() {
    let metadata = PackMetadata {
        entry_flows: Vec::new(),
        ..sample_metadata()
    };

    let outcome = resolve_entry_flow(None, &metadata, &[]);

    assert!(outcome.is_err());
}
1524
/// The host config built for the sample profile carries the tenant, keeps
/// HTTP off when asked to, and is permissive about secrets and providers.
#[test]
fn build_host_config_enables_local_dev_defaults() {
    let scratch = TempDir::new().expect("tempdir");
    let run_root = scratch.path().join("run");
    let dirs = prepare_run_dirs(Some(run_root)).expect("dirs");
    let profile = sample_profile();

    let config = build_host_config(&profile, &dirs, false);

    assert_eq!(config.tenant, "tenant");
    assert!(!config.http_enabled);
    assert!(config.secrets_policy.is_allowed("any.secret"));
    let provider_allowed = config
        .operator_policy
        .allows_provider(Some("provider"), "provider");
    assert!(provider_allowed);
}
1540
/// The `http_enabled` argument is copied verbatim into the host config.
#[test]
fn build_host_config_honours_http_enabled_flag() {
    let scratch = TempDir::new().expect("tempdir");
    let dirs = prepare_run_dirs(Some(scratch.path().join("run"))).expect("dirs");
    let profile = sample_profile();

    let enabled = build_host_config(&profile, &dirs, true);
    assert!(
        enabled.http_enabled,
        "http_enabled should propagate when derived from manifest"
    );

    let disabled = build_host_config(&profile, &dirs, false);
    assert!(!disabled.http_enabled);
}
1554
/// One declaring component is enough to enable HTTP when the kill switch is off.
#[test]
fn derive_http_enabled_true_when_any_component_declares_http_client() {
    let enabled = derive_http_enabled(&[false, true], false);
    assert!(enabled);
}
1559
/// HTTP stays disabled when no component declares an HTTP client.
#[test]
fn derive_http_enabled_false_when_no_component_declares_http_client() {
    let enabled = derive_http_enabled(&[false, false], false);
    assert!(!enabled);
}
1564
/// A pack without components never enables HTTP.
#[test]
fn derive_http_enabled_false_for_empty_components() {
    let enabled = derive_http_enabled(&[], false);
    assert!(!enabled);
}
1569
/// The kill switch wins even when a component declares an HTTP client.
#[test]
fn derive_http_enabled_kill_switch_forces_disable() {
    let enabled = derive_http_enabled(&[true], true);
    assert!(
        !enabled,
        "kill-switch must override positive manifest declaration"
    );
}
1577
/// Table-driven check of which raw values switch the kill switch on:
/// `1`/`true`/`yes`/`on` in any case and with surrounding whitespace do;
/// anything else, the empty string, and an absent value do not.
#[test]
fn parse_kill_switch_matches_truthy_and_falsy_inputs() {
    let cases = [
        (Some("1"), true),
        (Some("true"), true),
        (Some("YES"), true),
        (Some(" on "), true),
        (Some("On"), true),
        (Some("0"), false),
        (Some("false"), false),
        (Some("bogus"), false),
        (Some(""), false),
        (None, false),
    ];

    for (raw, expected) in cases {
        let parsed = parse_kill_switch(raw);
        assert_eq!(
            parsed, expected,
            "parse_kill_switch({raw:?}) should be {expected}"
        );
    }
}
1602
/// A payload without sensitive keys comes back unchanged and reports no
/// redaction paths.
#[test]
fn redact_value_leaves_non_sensitive_payloads_unchanged() {
    let payload = json!({"public": {"value": 1}});

    let (scrubbed, masked_paths) = redact_value(&payload, "$");

    assert_eq!(scrubbed, payload);
    assert!(masked_paths.is_empty());
}
1610
/// Saving a flow snapshot and loading it back yields a snapshot that
/// serialises to the same JSON value as the original.
#[test]
fn session_snapshot_round_trip_through_disk() {
    use greentic_runner_host::runner::engine::FlowSnapshot;

    let scratch = tempfile::tempdir().expect("create temp dir");
    let snapshot_path = scratch.path().join("session-1.snapshot.json");

    let original: FlowSnapshot = serde_json::from_value(json!({
        "pack_id": "pack.demo",
        "flow_id": "flow.welcome",
        "next_node": "card-confirm",
        "state": {
            "entry": { "metadata": { "action": "confirm" } },
            "input": { "metadata": { "action": "confirm" } },
            "nodes": {},
            "egress": [],
            "redirect_count": 0
        }
    }))
    .expect("deserialize seed snapshot");

    save_session_snapshot(&snapshot_path, &original).expect("save snapshot");
    let reloaded = load_session_snapshot(&snapshot_path).expect("load snapshot");

    // Compared as JSON values, i.e. after re-serialising both snapshots.
    assert_eq!(
        serde_json::to_value(&original).unwrap(),
        serde_json::to_value(&reloaded).unwrap(),
        "snapshot must round-trip byte-for-byte through disk"
    );
}
1647
/// The snapshot file name is derived from the session id with `/` replaced
/// by `_`; no path is produced without a directory or with an empty id.
#[test]
fn session_snapshot_file_sanitizes_session_id() {
    let scratch = tempfile::tempdir().expect("create temp dir");
    let dir = scratch.path();

    let snapshot = session_snapshot_file(Some(dir), "tenant/team:user-1.session")
        .expect("snapshot path");
    let file_name = snapshot
        .file_name()
        .and_then(std::ffi::OsStr::to_str)
        .expect("file name");
    assert_eq!(file_name, "tenant_team:user-1.session.snapshot.json");

    assert!(session_snapshot_file(None, "any").is_none());
    assert!(session_snapshot_file(Some(dir), "").is_none());
}
1662
/// With `GREENTIC_ENV=prod` the default desktop secrets backend is rejected,
/// but an injected secrets manager is still accepted.
///
/// The environment variable is restored by a drop guard, so the override does
/// not leak into other tests even when an assertion below panics.
#[test]
fn resolve_secrets_manager_prefers_injected_override_even_in_prod() {
    const ENV_KEY: &str = "GREENTIC_ENV";

    /// Puts `GREENTIC_ENV` back to the captured value (or removes it) on drop.
    struct RestoreEnv(Option<std::ffi::OsString>);

    impl Drop for RestoreEnv {
        fn drop(&mut self) {
            // SAFETY: sound only while no other thread reads or writes the
            // process environment concurrently.
            // NOTE(review): the default test harness runs tests on several
            // threads and does not guarantee that; consider serialising
            // env-mutating tests.
            unsafe {
                match self.0.take() {
                    Some(previous) => std::env::set_var(ENV_KEY, previous),
                    None => std::env::remove_var(ENV_KEY),
                }
            }
        }
    }

    // `var_os` keeps a non-Unicode previous value intact so it can be restored.
    let _restore = RestoreEnv(std::env::var_os(ENV_KEY));
    // SAFETY: same single-threaded-environment caveat as in `RestoreEnv::drop`.
    unsafe {
        std::env::set_var(ENV_KEY, "prod");
    }

    let without_override = resolve_secrets_manager(&RunOptions::default());
    assert!(
        without_override.is_err(),
        "expected prod desktop default backend to be rejected"
    );

    let injected: DynSecretsManager = Arc::new(NoopSecretsManager);
    let with_override = resolve_secrets_manager(&RunOptions {
        secrets_manager: Some(Arc::clone(&injected)),
        ..RunOptions::default()
    });
    assert!(
        with_override.is_ok(),
        "expected injected secrets manager to bypass desktop default backend"
    );
}
1695}