Skip to main content

mabi_cli/commands/
run.rs

1use async_trait::async_trait;
2use serde::Serialize;
3use std::path::{Path, PathBuf};
4use std::sync::atomic::Ordering;
5use std::time::Duration;
6
7use mabi_runtime::{RuntimeExtensions, RuntimeSession};
8use mabi_scenario::prelude::{
9    ExecutorConfig, ExecutorMetrics, ExecutorState, ScenarioExecutor, ScenarioValidator,
10};
11use mabi_scenario::{PlayerConfig, Scenario, ScenarioParser};
12
13use crate::context::CliContext;
14use crate::error::{CliError, CliResult};
15use crate::output::OutputFormat;
16use crate::runner::{Command, CommandOutput};
17use crate::runtime_registry::workspace_protocol_registry;
18
19#[derive(Debug, Clone, Serialize)]
20pub struct ScenarioRunSummary {
21    path: String,
22    name: String,
23    services: usize,
24    devices: usize,
25    state: String,
26    writes_attempted: u64,
27    writes_successful: u64,
28    writes_failed: u64,
29    success_rate: f64,
30}
31
32pub async fn run_scenario_on_session(
33    ctx: &mut CliContext,
34    path: &Path,
35    scenario: Scenario,
36    session: &RuntimeSession,
37    time_scale: f64,
38    duration: Option<Duration>,
39) -> CliResult<(ScenarioRunSummary, ExecutorMetrics, ExecutorState)> {
40    let scenario_name = scenario.name.clone();
41    let player_config = PlayerConfig {
42        time_scale: if (time_scale - 1.0).abs() > f64::EPSILON {
43            Some(time_scale)
44        } else {
45            None
46        },
47        max_duration: duration,
48    };
49
50    let mut executor = ScenarioExecutor::new_with_player_config(
51        scenario,
52        ExecutorConfig::default(),
53        player_config,
54    );
55    for (device_id, port) in session.devices().entries() {
56        executor.register_device(device_id, port);
57    }
58    executor
59        .validate_devices()
60        .map_err(|error| CliError::InvalidScenario {
61            message: error.to_string(),
62        })?;
63
64    let shutdown = ctx.shutdown_signal();
65    let stop_signal = executor.stop_signal();
66    let executor_task = tokio::spawn(async move {
67        let result = executor.run().await;
68        let metrics = executor.metrics();
69        let state = executor.state();
70        (result, metrics, state)
71    });
72    tokio::pin!(executor_task);
73
74    let (result, metrics, state) = tokio::select! {
75        result = &mut executor_task => result.map_err(|error| CliError::ExecutionFailed {
76            message: format!("scenario executor task failed: {}", error),
77        })?,
78        _ = shutdown.notified() => {
79            stop_signal.store(true, Ordering::SeqCst);
80            executor_task.await.map_err(|error| CliError::ExecutionFailed {
81                message: format!("scenario executor shutdown failed: {}", error),
82            })?
83        }
84    };
85
86    result.map_err(|error| CliError::ExecutionFailed {
87        message: error.to_string(),
88    })?;
89
90    let summary = ScenarioRunSummary {
91        path: path.display().to_string(),
92        name: scenario_name,
93        services: session.handles().len(),
94        devices: session.devices().len(),
95        state: format!("{:?}", state).to_lowercase(),
96        writes_attempted: metrics.writes_attempted,
97        writes_successful: metrics.writes_successful,
98        writes_failed: metrics.writes_failed,
99        success_rate: metrics.success_rate(),
100    };
101
102    Ok((summary, metrics, state))
103}
104
105/// Run command for executing scenarios.
106pub struct RunCommand {
107    scenario_path: PathBuf,
108    time_scale: f64,
109    duration: Option<Duration>,
110    dry_run: bool,
111    readiness_timeout: Duration,
112}
113
114impl RunCommand {
115    pub fn new(scenario_path: PathBuf) -> Self {
116        Self {
117            scenario_path,
118            time_scale: 1.0,
119            duration: None,
120            dry_run: false,
121            readiness_timeout: Duration::from_secs(5),
122        }
123    }
124
125    pub fn with_time_scale(mut self, scale: f64) -> Self {
126        self.time_scale = scale;
127        self
128    }
129
130    pub fn with_duration(mut self, duration: Duration) -> Self {
131        self.duration = Some(duration);
132        self
133    }
134
135    pub fn with_dry_run(mut self, dry_run: bool) -> Self {
136        self.dry_run = dry_run;
137        self
138    }
139
140    pub fn with_readiness_timeout(mut self, timeout: Duration) -> Self {
141        self.readiness_timeout = timeout;
142        self
143    }
144
145    async fn load_scenario(&self, ctx: &CliContext) -> CliResult<(PathBuf, Scenario)> {
146        let path = ctx.resolve_path(&self.scenario_path);
147        if !path.exists() {
148            return Err(CliError::ScenarioNotFound { path });
149        }
150
151        let scenario =
152            ScenarioParser::load(&path)
153                .await
154                .map_err(|error| CliError::InvalidScenario {
155                    message: error.to_string(),
156                })?;
157        ScenarioParser::validate(&scenario).map_err(|error| CliError::InvalidScenario {
158            message: error.to_string(),
159        })?;
160
161        let validation = ScenarioValidator::new().validate(&scenario);
162        if !validation.is_valid() {
163            return Err(CliError::validation_failed(validation.errors().iter().map(
164                |issue| format!("{} [{:?}] {}", issue.path, issue.code, issue.message),
165            )));
166        }
167
168        Ok((path, scenario))
169    }
170
171    fn render_summary(
172        &self,
173        ctx: &CliContext,
174        summary: &ScenarioRunSummary,
175        metrics: &ExecutorMetrics,
176    ) -> CliResult<()> {
177        if matches!(
178            ctx.output().format(),
179            OutputFormat::Json | OutputFormat::Yaml | OutputFormat::Compact
180        ) {
181            ctx.output().write(summary)?;
182            return Ok(());
183        }
184
185        ctx.output().header("Scenario Run");
186        ctx.output().kv("Path", &summary.path);
187        ctx.output().kv("Name", &summary.name);
188        ctx.output().kv("Services", summary.services);
189        ctx.output().kv("Devices", summary.devices);
190        ctx.output().kv("State", &summary.state);
191        ctx.output()
192            .kv("Writes Attempted", summary.writes_attempted);
193        ctx.output()
194            .kv("Writes Successful", summary.writes_successful);
195        ctx.output().kv("Writes Failed", summary.writes_failed);
196        ctx.output()
197            .kv("Success Rate", format!("{:.2}%", summary.success_rate));
198        if metrics.execution_time > Duration::ZERO {
199            ctx.output()
200                .kv("Execution Time", format!("{:?}", metrics.execution_time));
201        }
202        Ok(())
203    }
204
205    async fn run_loaded_scenario(
206        &self,
207        ctx: &mut CliContext,
208        path: &Path,
209        mut scenario: Scenario,
210    ) -> CliResult<CommandOutput> {
211        let session_spec = scenario
212            .runtime
213            .clone()
214            .ok_or_else(|| CliError::InvalidScenario {
215                message: "scenario execution requires a top-level runtime block".into(),
216            })?;
217        if session_spec.services.is_empty() {
218            return Err(CliError::InvalidScenario {
219                message: "scenario runtime.services must contain at least one service".into(),
220            });
221        }
222
223        let registry = workspace_protocol_registry();
224        let session = RuntimeSession::new(
225            session_spec.clone(),
226            &registry,
227            RuntimeExtensions::default(),
228        )
229        .await?;
230        session.start(self.readiness_timeout).await?;
231
232        if !ctx.is_quiet() {
233            ctx.output().info(format!(
234                "Runtime session started with {} service(s)",
235                session_spec.services.len()
236            ));
237        }
238
239        scenario.runtime = Some(session_spec.clone());
240
241        let run_result = run_scenario_on_session(
242            ctx,
243            path,
244            scenario,
245            &session,
246            self.time_scale,
247            self.duration,
248        )
249        .await;
250        let stop_result = session.stop().await;
251        let (summary, metrics, state) = run_result?;
252        stop_result?;
253
254        self.render_summary(ctx, &summary, &metrics)?;
255
256        match state {
257            ExecutorState::Completed | ExecutorState::Idle => Ok(CommandOutput::quiet_success()),
258            ExecutorState::Error => Err(CliError::ExecutionFailed {
259                message: "scenario executor finished in error state".into(),
260            }),
261            ExecutorState::Running | ExecutorState::Paused => Err(CliError::ExecutionFailed {
262                message: "scenario executor did not settle into a terminal state".into(),
263            }),
264        }
265    }
266}
267
268#[async_trait]
269impl Command for RunCommand {
270    fn name(&self) -> &str {
271        "run"
272    }
273
274    fn description(&self) -> &str {
275        "Run a simulation scenario"
276    }
277
278    fn supports_shutdown(&self) -> bool {
279        true
280    }
281
282    fn validate(&self) -> CliResult<()> {
283        if self.time_scale <= 0.0 {
284            return Err(CliError::InvalidConfig {
285                message: "Time scale must be positive".into(),
286            });
287        }
288        Ok(())
289    }
290
291    async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
292        let (path, scenario) = self.load_scenario(ctx).await?;
293
294        if self.dry_run {
295            let summary = ScenarioRunSummary {
296                path: path.display().to_string(),
297                name: scenario.name.clone(),
298                services: scenario
299                    .runtime
300                    .as_ref()
301                    .map(|spec| spec.services.len())
302                    .unwrap_or(0),
303                devices: scenario.devices.len(),
304                state: "validated".into(),
305                writes_attempted: 0,
306                writes_successful: 0,
307                writes_failed: 0,
308                success_rate: 100.0,
309            };
310            self.render_summary(ctx, &summary, &ExecutorMetrics::default())?;
311            if !ctx.is_quiet() {
312                ctx.output().success("Scenario validation passed");
313            }
314            return Ok(CommandOutput::quiet_success());
315        }
316
317        self.run_loaded_scenario(ctx, &path, scenario).await
318    }
319}