Skip to main content

torvyn_cli/commands/
run.rs

1//! `torvyn run` — execute a pipeline locally.
2//!
3//! Instantiates the Torvyn host runtime, loads components, and runs
4//! the pipeline. Displays real-time throughput and summary on exit.
5
6use crate::cli::RunArgs;
7use crate::errors::CliError;
8use crate::output::terminal;
9use crate::output::{CommandResult, HumanRenderable, OutputContext};
10use serde::Serialize;
11use std::time::{Duration, Instant};
12
13/// Result of `torvyn run`.
14#[derive(Debug, Serialize)]
15pub struct RunResult {
16    /// Total execution duration in seconds.
17    pub duration_secs: f64,
18    /// Total elements processed.
19    pub elements_processed: u64,
20    /// Average throughput in elements/second.
21    pub throughput_elem_per_sec: f64,
22    /// Total errors encountered.
23    pub error_count: u64,
24    /// Peak memory usage in bytes (across all components).
25    pub peak_memory_bytes: u64,
26    /// Flow name that was executed.
27    pub flow_name: String,
28    /// Number of components in the flow.
29    pub component_count: usize,
30    /// Number of edges in the flow.
31    pub edge_count: usize,
32}
33
34impl HumanRenderable for RunResult {
35    fn render_human(&self, ctx: &OutputContext) {
36        terminal::print_header(ctx, "Summary");
37        terminal::print_kv(ctx, "Duration", &format!("{:.2}s", self.duration_secs));
38        terminal::print_kv(ctx, "Elements", &format!("{}", self.elements_processed));
39        terminal::print_kv(
40            ctx,
41            "Throughput",
42            &format!("{:.0} elem/s", self.throughput_elem_per_sec),
43        );
44        terminal::print_kv(ctx, "Errors", &format!("{}", self.error_count));
45        terminal::print_kv(
46            ctx,
47            "Peak memory",
48            &terminal::format_bytes(self.peak_memory_bytes),
49        );
50    }
51}
52
53/// Execute the `torvyn run` command.
54///
55/// COLD PATH (setup), then delegates to HOT PATH runtime.
56///
57/// # Preconditions
58/// - Manifest file must exist.
59/// - Components must be compiled (or builds are triggered implicitly).
60///
61/// # Postconditions
62/// - Pipeline runs to completion (or until limit/timeout/Ctrl+C).
63/// - Returns summary statistics.
64///
65/// # Errors
66/// - [`CliError::Config`] if manifest is missing or invalid.
67/// - [`CliError::Runtime`] if pipeline execution fails.
68pub async fn execute(
69    args: &RunArgs,
70    ctx: &OutputContext,
71) -> Result<CommandResult<RunResult>, CliError> {
72    let manifest_path = &args.manifest;
73
74    if !manifest_path.exists() {
75        return Err(CliError::Config {
76            detail: format!("Manifest not found: {}", manifest_path.display()),
77            file: Some(manifest_path.display().to_string()),
78            suggestion: "Run this command from a Torvyn project directory.".into(),
79        });
80    }
81
82    // Parse manifest
83    let manifest_content = std::fs::read_to_string(manifest_path).map_err(|e| CliError::Io {
84        detail: e.to_string(),
85        path: Some(manifest_path.display().to_string()),
86    })?;
87
88    let manifest = torvyn_config::ComponentManifest::from_toml_str(
89        &manifest_content,
90        manifest_path.to_str().unwrap_or("Torvyn.toml"),
91    )
92    .map_err(|errors| CliError::Config {
93        detail: format!("Manifest has {} error(s)", errors.len()),
94        file: Some(manifest_path.display().to_string()),
95        suggestion: "Run `torvyn check` first.".into(),
96    })?;
97
98    // Determine which flow to run
99    let flow_name = args
100        .flow
101        .clone()
102        .or_else(|| manifest.flow.keys().next().cloned())
103        .ok_or_else(|| CliError::Config {
104            detail: "No flow defined in manifest and no --flow specified".into(),
105            file: Some(manifest_path.display().to_string()),
106            suggestion: "Add a [flow.*] section or use --flow <name>.".into(),
107        })?;
108
109    // Parse timeout
110    let timeout = args
111        .timeout
112        .as_ref()
113        .map(|s| parse_duration(s))
114        .transpose()
115        .map_err(|e| CliError::Config {
116            detail: format!("Invalid timeout: {e}"),
117            file: None,
118            suggestion: "Use a duration like '30s', '5m', or '1h'.".into(),
119        })?;
120
121    let spinner = ctx.spinner(&format!("Starting flow \"{flow_name}\"..."));
122
123    let config_path = manifest_path.to_path_buf();
124    let mut host = torvyn_host::HostBuilder::new()
125        .with_config_file(&config_path)
126        .build()
127        .await
128        .map_err(|e| CliError::Runtime {
129            detail: format!("Failed to initialize host: {e}"),
130            context: None,
131        })?;
132
133    if let Some(sp) = &spinner {
134        sp.finish_and_clear();
135    }
136
137    // Start the flow
138    let _flow_id = host
139        .start_flow(&flow_name)
140        .await
141        .map_err(|e| CliError::Runtime {
142            detail: format!("Failed to start flow \"{flow_name}\": {e}"),
143            context: Some(flow_name.clone()),
144        })?;
145
146    if !ctx.quiet && ctx.format == crate::cli::OutputFormat::Human {
147        eprintln!("▶ Running flow \"{}\"", flow_name);
148        eprintln!();
149    }
150
151    let start = Instant::now();
152
153    // Run until completion, limit, timeout, or Ctrl+C
154    let ctrl_c = tokio::signal::ctrl_c();
155    let run_future = host.run();
156
157    let run_result = if let Some(timeout_dur) = timeout {
158        tokio::select! {
159            result = run_future => result,
160            _ = tokio::time::sleep(timeout_dur) => {
161                host.shutdown().await.ok();
162                Ok(())
163            },
164            _ = ctrl_c => {
165                eprintln!();
166                host.shutdown().await.ok();
167                Ok(())
168            }
169        }
170    } else {
171        tokio::select! {
172            result = run_future => result,
173            _ = ctrl_c => {
174                eprintln!();
175                host.shutdown().await.ok();
176                Ok(())
177            }
178        }
179    };
180
181    run_result.map_err(|e| CliError::Runtime {
182        detail: format!("Pipeline execution failed: {e}"),
183        context: Some(flow_name.clone()),
184    })?;
185
186    let elapsed = start.elapsed();
187    let elapsed_secs = elapsed.as_secs_f64();
188
189    // Placeholder metrics — FlowSummary doesn't yet expose detailed statistics
190    let elements_processed = 0_u64;
191    let error_count = 0_u64;
192    let throughput = if elapsed_secs > 0.0 {
193        elements_processed as f64 / elapsed_secs
194    } else {
195        0.0
196    };
197
198    let result = RunResult {
199        duration_secs: elapsed_secs,
200        elements_processed,
201        throughput_elem_per_sec: throughput,
202        error_count,
203        peak_memory_bytes: 0,
204        flow_name,
205        component_count: 0,
206        edge_count: 0,
207    };
208
209    Ok(CommandResult {
210        success: true,
211        command: "run".into(),
212        data: result,
213        warnings: vec![],
214    })
215}
216
217/// Parse a duration string like "30s", "5m", "1h".
218///
219/// COLD PATH.
220///
221/// # Postconditions
222/// - Returns a [`Duration`] on success.
223/// - Returns an error string on invalid format.
224pub fn parse_duration(s: &str) -> Result<Duration, String> {
225    let s = s.trim();
226    if s.is_empty() {
227        return Err("empty duration string".into());
228    }
229
230    let (num_str, unit) = if let Some(n) = s.strip_suffix("ms") {
231        (n, "ms")
232    } else if let Some(n) = s.strip_suffix('s') {
233        (n, "s")
234    } else if let Some(n) = s.strip_suffix('m') {
235        (n, "m")
236    } else if let Some(n) = s.strip_suffix('h') {
237        (n, "h")
238    } else {
239        return Err(format!(
240            "unrecognized duration unit in \"{s}\". Use s, m, h, or ms."
241        ));
242    };
243
244    let num: f64 = num_str
245        .parse()
246        .map_err(|_| format!("invalid number in duration: \"{num_str}\""))?;
247
248    if num < 0.0 {
249        return Err("duration must be non-negative".into());
250    }
251
252    let millis = match unit {
253        "ms" => num,
254        "s" => num * 1_000.0,
255        "m" => num * 60_000.0,
256        "h" => num * 3_600_000.0,
257        _ => unreachable!(),
258    };
259
260    Ok(Duration::from_millis(millis as u64))
261}
262
263#[cfg(test)]
264mod tests {
265    use super::*;
266
267    #[test]
268    fn test_parse_duration_seconds() {
269        assert_eq!(parse_duration("30s").unwrap(), Duration::from_secs(30));
270    }
271
272    #[test]
273    fn test_parse_duration_minutes() {
274        assert_eq!(parse_duration("5m").unwrap(), Duration::from_secs(300));
275    }
276
277    #[test]
278    fn test_parse_duration_hours() {
279        assert_eq!(parse_duration("1h").unwrap(), Duration::from_secs(3600));
280    }
281
282    #[test]
283    fn test_parse_duration_millis() {
284        assert_eq!(parse_duration("100ms").unwrap(), Duration::from_millis(100));
285    }
286
287    #[test]
288    fn test_parse_duration_invalid() {
289        assert!(parse_duration("abc").is_err());
290        assert!(parse_duration("").is_err());
291        assert!(parse_duration("-5s").is_err());
292    }
293}