1use async_trait::async_trait;
2use serde::Serialize;
3use std::path::{Path, PathBuf};
4use std::sync::atomic::Ordering;
5use std::time::Duration;
6
7use mabi_runtime::{RuntimeExtensions, RuntimeSession};
8use mabi_scenario::prelude::{
9 ExecutorConfig, ExecutorMetrics, ExecutorState, ScenarioExecutor, ScenarioValidator,
10};
11use mabi_scenario::{PlayerConfig, Scenario, ScenarioParser};
12
13use crate::context::CliContext;
14use crate::error::{CliError, CliResult};
15use crate::output::OutputFormat;
16use crate::runner::{Command, CommandOutput};
17use crate::runtime_registry::workspace_protocol_registry;
18
19#[derive(Debug, Clone, Serialize)]
20pub struct ScenarioRunSummary {
21 path: String,
22 name: String,
23 services: usize,
24 devices: usize,
25 state: String,
26 writes_attempted: u64,
27 writes_successful: u64,
28 writes_failed: u64,
29 success_rate: f64,
30}
31
32pub async fn run_scenario_on_session(
33 ctx: &mut CliContext,
34 path: &Path,
35 scenario: Scenario,
36 session: &RuntimeSession,
37 time_scale: f64,
38 duration: Option<Duration>,
39) -> CliResult<(ScenarioRunSummary, ExecutorMetrics, ExecutorState)> {
40 let scenario_name = scenario.name.clone();
41 let player_config = PlayerConfig {
42 time_scale: if (time_scale - 1.0).abs() > f64::EPSILON {
43 Some(time_scale)
44 } else {
45 None
46 },
47 max_duration: duration,
48 };
49
50 let mut executor = ScenarioExecutor::new_with_player_config(
51 scenario,
52 ExecutorConfig::default(),
53 player_config,
54 );
55 for (device_id, port) in session.devices().entries() {
56 executor.register_device(device_id, port);
57 }
58 executor
59 .validate_devices()
60 .map_err(|error| CliError::InvalidScenario {
61 message: error.to_string(),
62 })?;
63
64 let shutdown = ctx.shutdown_signal();
65 let stop_signal = executor.stop_signal();
66 let executor_task = tokio::spawn(async move {
67 let result = executor.run().await;
68 let metrics = executor.metrics();
69 let state = executor.state();
70 (result, metrics, state)
71 });
72 tokio::pin!(executor_task);
73
74 let (result, metrics, state) = tokio::select! {
75 result = &mut executor_task => result.map_err(|error| CliError::ExecutionFailed {
76 message: format!("scenario executor task failed: {}", error),
77 })?,
78 _ = shutdown.notified() => {
79 stop_signal.store(true, Ordering::SeqCst);
80 executor_task.await.map_err(|error| CliError::ExecutionFailed {
81 message: format!("scenario executor shutdown failed: {}", error),
82 })?
83 }
84 };
85
86 result.map_err(|error| CliError::ExecutionFailed {
87 message: error.to_string(),
88 })?;
89
90 let summary = ScenarioRunSummary {
91 path: path.display().to_string(),
92 name: scenario_name,
93 services: session.handles().len(),
94 devices: session.devices().len(),
95 state: format!("{:?}", state).to_lowercase(),
96 writes_attempted: metrics.writes_attempted,
97 writes_successful: metrics.writes_successful,
98 writes_failed: metrics.writes_failed,
99 success_rate: metrics.success_rate(),
100 };
101
102 Ok((summary, metrics, state))
103}
104
105pub struct RunCommand {
107 scenario_path: PathBuf,
108 time_scale: f64,
109 duration: Option<Duration>,
110 dry_run: bool,
111 readiness_timeout: Duration,
112}
113
114impl RunCommand {
115 pub fn new(scenario_path: PathBuf) -> Self {
116 Self {
117 scenario_path,
118 time_scale: 1.0,
119 duration: None,
120 dry_run: false,
121 readiness_timeout: Duration::from_secs(5),
122 }
123 }
124
125 pub fn with_time_scale(mut self, scale: f64) -> Self {
126 self.time_scale = scale;
127 self
128 }
129
130 pub fn with_duration(mut self, duration: Duration) -> Self {
131 self.duration = Some(duration);
132 self
133 }
134
135 pub fn with_dry_run(mut self, dry_run: bool) -> Self {
136 self.dry_run = dry_run;
137 self
138 }
139
140 pub fn with_readiness_timeout(mut self, timeout: Duration) -> Self {
141 self.readiness_timeout = timeout;
142 self
143 }
144
145 async fn load_scenario(&self, ctx: &CliContext) -> CliResult<(PathBuf, Scenario)> {
146 let path = ctx.resolve_path(&self.scenario_path);
147 if !path.exists() {
148 return Err(CliError::ScenarioNotFound { path });
149 }
150
151 let scenario =
152 ScenarioParser::load(&path)
153 .await
154 .map_err(|error| CliError::InvalidScenario {
155 message: error.to_string(),
156 })?;
157 ScenarioParser::validate(&scenario).map_err(|error| CliError::InvalidScenario {
158 message: error.to_string(),
159 })?;
160
161 let validation = ScenarioValidator::new().validate(&scenario);
162 if !validation.is_valid() {
163 return Err(CliError::validation_failed(validation.errors().iter().map(
164 |issue| format!("{} [{:?}] {}", issue.path, issue.code, issue.message),
165 )));
166 }
167
168 Ok((path, scenario))
169 }
170
171 fn render_summary(
172 &self,
173 ctx: &CliContext,
174 summary: &ScenarioRunSummary,
175 metrics: &ExecutorMetrics,
176 ) -> CliResult<()> {
177 if matches!(
178 ctx.output().format(),
179 OutputFormat::Json | OutputFormat::Yaml | OutputFormat::Compact
180 ) {
181 ctx.output().write(summary)?;
182 return Ok(());
183 }
184
185 ctx.output().header("Scenario Run");
186 ctx.output().kv("Path", &summary.path);
187 ctx.output().kv("Name", &summary.name);
188 ctx.output().kv("Services", summary.services);
189 ctx.output().kv("Devices", summary.devices);
190 ctx.output().kv("State", &summary.state);
191 ctx.output()
192 .kv("Writes Attempted", summary.writes_attempted);
193 ctx.output()
194 .kv("Writes Successful", summary.writes_successful);
195 ctx.output().kv("Writes Failed", summary.writes_failed);
196 ctx.output()
197 .kv("Success Rate", format!("{:.2}%", summary.success_rate));
198 if metrics.execution_time > Duration::ZERO {
199 ctx.output()
200 .kv("Execution Time", format!("{:?}", metrics.execution_time));
201 }
202 Ok(())
203 }
204
205 async fn run_loaded_scenario(
206 &self,
207 ctx: &mut CliContext,
208 path: &Path,
209 mut scenario: Scenario,
210 ) -> CliResult<CommandOutput> {
211 let session_spec = scenario
212 .runtime
213 .clone()
214 .ok_or_else(|| CliError::InvalidScenario {
215 message: "scenario execution requires a top-level runtime block".into(),
216 })?;
217 if session_spec.services.is_empty() {
218 return Err(CliError::InvalidScenario {
219 message: "scenario runtime.services must contain at least one service".into(),
220 });
221 }
222
223 let registry = workspace_protocol_registry();
224 let session = RuntimeSession::new(
225 session_spec.clone(),
226 ®istry,
227 RuntimeExtensions::default(),
228 )
229 .await?;
230 session.start(self.readiness_timeout).await?;
231
232 if !ctx.is_quiet() {
233 ctx.output().info(format!(
234 "Runtime session started with {} service(s)",
235 session_spec.services.len()
236 ));
237 }
238
239 scenario.runtime = Some(session_spec.clone());
240
241 let run_result = run_scenario_on_session(
242 ctx,
243 path,
244 scenario,
245 &session,
246 self.time_scale,
247 self.duration,
248 )
249 .await;
250 let stop_result = session.stop().await;
251 let (summary, metrics, state) = run_result?;
252 stop_result?;
253
254 self.render_summary(ctx, &summary, &metrics)?;
255
256 match state {
257 ExecutorState::Completed | ExecutorState::Idle => Ok(CommandOutput::quiet_success()),
258 ExecutorState::Error => Err(CliError::ExecutionFailed {
259 message: "scenario executor finished in error state".into(),
260 }),
261 ExecutorState::Running | ExecutorState::Paused => Err(CliError::ExecutionFailed {
262 message: "scenario executor did not settle into a terminal state".into(),
263 }),
264 }
265 }
266}
267
268#[async_trait]
269impl Command for RunCommand {
270 fn name(&self) -> &str {
271 "run"
272 }
273
274 fn description(&self) -> &str {
275 "Run a simulation scenario"
276 }
277
278 fn supports_shutdown(&self) -> bool {
279 true
280 }
281
282 fn validate(&self) -> CliResult<()> {
283 if self.time_scale <= 0.0 {
284 return Err(CliError::InvalidConfig {
285 message: "Time scale must be positive".into(),
286 });
287 }
288 Ok(())
289 }
290
291 async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
292 let (path, scenario) = self.load_scenario(ctx).await?;
293
294 if self.dry_run {
295 let summary = ScenarioRunSummary {
296 path: path.display().to_string(),
297 name: scenario.name.clone(),
298 services: scenario
299 .runtime
300 .as_ref()
301 .map(|spec| spec.services.len())
302 .unwrap_or(0),
303 devices: scenario.devices.len(),
304 state: "validated".into(),
305 writes_attempted: 0,
306 writes_successful: 0,
307 writes_failed: 0,
308 success_rate: 100.0,
309 };
310 self.render_summary(ctx, &summary, &ExecutorMetrics::default())?;
311 if !ctx.is_quiet() {
312 ctx.output().success("Scenario validation passed");
313 }
314 return Ok(CommandOutput::quiet_success());
315 }
316
317 self.run_loaded_scenario(ctx, &path, scenario).await
318 }
319}