1use crate::cli::RunArgs;
7use crate::errors::CliError;
8use crate::output::terminal;
9use crate::output::{CommandResult, HumanRenderable, OutputContext};
10use serde::Serialize;
11use std::time::{Duration, Instant};
12
13#[derive(Debug, Serialize)]
15pub struct RunResult {
16 pub duration_secs: f64,
18 pub elements_processed: u64,
20 pub throughput_elem_per_sec: f64,
22 pub error_count: u64,
24 pub peak_memory_bytes: u64,
26 pub flow_name: String,
28 pub component_count: usize,
30 pub edge_count: usize,
32}
33
34impl HumanRenderable for RunResult {
35 fn render_human(&self, ctx: &OutputContext) {
36 terminal::print_header(ctx, "Summary");
37 terminal::print_kv(ctx, "Duration", &format!("{:.2}s", self.duration_secs));
38 terminal::print_kv(ctx, "Elements", &format!("{}", self.elements_processed));
39 terminal::print_kv(
40 ctx,
41 "Throughput",
42 &format!("{:.0} elem/s", self.throughput_elem_per_sec),
43 );
44 terminal::print_kv(ctx, "Errors", &format!("{}", self.error_count));
45 terminal::print_kv(
46 ctx,
47 "Peak memory",
48 &terminal::format_bytes(self.peak_memory_bytes),
49 );
50 }
51}
52
53pub async fn execute(
69 args: &RunArgs,
70 ctx: &OutputContext,
71) -> Result<CommandResult<RunResult>, CliError> {
72 let manifest_path = &args.manifest;
73
74 if !manifest_path.exists() {
75 return Err(CliError::Config {
76 detail: format!("Manifest not found: {}", manifest_path.display()),
77 file: Some(manifest_path.display().to_string()),
78 suggestion: "Run this command from a Torvyn project directory.".into(),
79 });
80 }
81
82 let manifest_content = std::fs::read_to_string(manifest_path).map_err(|e| CliError::Io {
84 detail: e.to_string(),
85 path: Some(manifest_path.display().to_string()),
86 })?;
87
88 let manifest = torvyn_config::ComponentManifest::from_toml_str(
89 &manifest_content,
90 manifest_path.to_str().unwrap_or("Torvyn.toml"),
91 )
92 .map_err(|errors| CliError::Config {
93 detail: format!("Manifest has {} error(s)", errors.len()),
94 file: Some(manifest_path.display().to_string()),
95 suggestion: "Run `torvyn check` first.".into(),
96 })?;
97
98 let flow_name = args
100 .flow
101 .clone()
102 .or_else(|| manifest.flow.keys().next().cloned())
103 .ok_or_else(|| CliError::Config {
104 detail: "No flow defined in manifest and no --flow specified".into(),
105 file: Some(manifest_path.display().to_string()),
106 suggestion: "Add a [flow.*] section or use --flow <name>.".into(),
107 })?;
108
109 let timeout = args
111 .timeout
112 .as_ref()
113 .map(|s| parse_duration(s))
114 .transpose()
115 .map_err(|e| CliError::Config {
116 detail: format!("Invalid timeout: {e}"),
117 file: None,
118 suggestion: "Use a duration like '30s', '5m', or '1h'.".into(),
119 })?;
120
121 let spinner = ctx.spinner(&format!("Starting flow \"{flow_name}\"..."));
122
123 let config_path = manifest_path.to_path_buf();
124 let mut host = torvyn_host::HostBuilder::new()
125 .with_config_file(&config_path)
126 .build()
127 .await
128 .map_err(|e| CliError::Runtime {
129 detail: format!("Failed to initialize host: {e}"),
130 context: None,
131 })?;
132
133 if let Some(sp) = &spinner {
134 sp.finish_and_clear();
135 }
136
137 let _flow_id = host
139 .start_flow(&flow_name)
140 .await
141 .map_err(|e| CliError::Runtime {
142 detail: format!("Failed to start flow \"{flow_name}\": {e}"),
143 context: Some(flow_name.clone()),
144 })?;
145
146 if !ctx.quiet && ctx.format == crate::cli::OutputFormat::Human {
147 eprintln!("▶ Running flow \"{}\"", flow_name);
148 eprintln!();
149 }
150
151 let start = Instant::now();
152
153 let ctrl_c = tokio::signal::ctrl_c();
155 let run_future = host.run();
156
157 let run_result = if let Some(timeout_dur) = timeout {
158 tokio::select! {
159 result = run_future => result,
160 _ = tokio::time::sleep(timeout_dur) => {
161 host.shutdown().await.ok();
162 Ok(())
163 },
164 _ = ctrl_c => {
165 eprintln!();
166 host.shutdown().await.ok();
167 Ok(())
168 }
169 }
170 } else {
171 tokio::select! {
172 result = run_future => result,
173 _ = ctrl_c => {
174 eprintln!();
175 host.shutdown().await.ok();
176 Ok(())
177 }
178 }
179 };
180
181 run_result.map_err(|e| CliError::Runtime {
182 detail: format!("Pipeline execution failed: {e}"),
183 context: Some(flow_name.clone()),
184 })?;
185
186 let elapsed = start.elapsed();
187 let elapsed_secs = elapsed.as_secs_f64();
188
189 let elements_processed = 0_u64;
191 let error_count = 0_u64;
192 let throughput = if elapsed_secs > 0.0 {
193 elements_processed as f64 / elapsed_secs
194 } else {
195 0.0
196 };
197
198 let result = RunResult {
199 duration_secs: elapsed_secs,
200 elements_processed,
201 throughput_elem_per_sec: throughput,
202 error_count,
203 peak_memory_bytes: 0,
204 flow_name,
205 component_count: 0,
206 edge_count: 0,
207 };
208
209 Ok(CommandResult {
210 success: true,
211 command: "run".into(),
212 data: result,
213 warnings: vec![],
214 })
215}
216
217pub fn parse_duration(s: &str) -> Result<Duration, String> {
225 let s = s.trim();
226 if s.is_empty() {
227 return Err("empty duration string".into());
228 }
229
230 let (num_str, unit) = if let Some(n) = s.strip_suffix("ms") {
231 (n, "ms")
232 } else if let Some(n) = s.strip_suffix('s') {
233 (n, "s")
234 } else if let Some(n) = s.strip_suffix('m') {
235 (n, "m")
236 } else if let Some(n) = s.strip_suffix('h') {
237 (n, "h")
238 } else {
239 return Err(format!(
240 "unrecognized duration unit in \"{s}\". Use s, m, h, or ms."
241 ));
242 };
243
244 let num: f64 = num_str
245 .parse()
246 .map_err(|_| format!("invalid number in duration: \"{num_str}\""))?;
247
248 if num < 0.0 {
249 return Err("duration must be non-negative".into());
250 }
251
252 let millis = match unit {
253 "ms" => num,
254 "s" => num * 1_000.0,
255 "m" => num * 60_000.0,
256 "h" => num * 3_600_000.0,
257 _ => unreachable!(),
258 };
259
260 Ok(Duration::from_millis(millis as u64))
261}
262
263#[cfg(test)]
264mod tests {
265 use super::*;
266
267 #[test]
268 fn test_parse_duration_seconds() {
269 assert_eq!(parse_duration("30s").unwrap(), Duration::from_secs(30));
270 }
271
272 #[test]
273 fn test_parse_duration_minutes() {
274 assert_eq!(parse_duration("5m").unwrap(), Duration::from_secs(300));
275 }
276
277 #[test]
278 fn test_parse_duration_hours() {
279 assert_eq!(parse_duration("1h").unwrap(), Duration::from_secs(3600));
280 }
281
282 #[test]
283 fn test_parse_duration_millis() {
284 assert_eq!(parse_duration("100ms").unwrap(), Duration::from_millis(100));
285 }
286
287 #[test]
288 fn test_parse_duration_invalid() {
289 assert!(parse_duration("abc").is_err());
290 assert!(parse_duration("").is_err());
291 assert!(parse_duration("-5s").is_err());
292 }
293}