1use crate::cache::models::CachedSpec;
2use crate::config::models::GlobalConfig;
3use crate::duration::parse_duration;
4use crate::engine::executor::RetryContext;
5use crate::engine::generator;
6use crate::error::Error;
7use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
8use serde::{Deserialize, Serialize};
9use std::num::NonZeroU32;
10use std::path::Path;
11use std::sync::Arc;
12use tokio::sync::Semaphore;
13
/// Configuration controlling how a batch of operations is executed.
#[derive(Debug, Clone)]
pub struct BatchConfig {
    /// Maximum number of operations allowed to run concurrently
    /// (used to size the internal semaphore).
    pub max_concurrency: usize,
    /// Optional cap on operations started per second; `None` disables
    /// rate limiting entirely.
    pub rate_limit: Option<u32>,
    /// Whether batch execution should continue after an operation fails.
    pub continue_on_error: bool,
    /// Whether progress messages are printed to stdout during execution.
    pub show_progress: bool,
    /// Whether per-operation output is captured and returned instead of
    /// being rendered directly to stdout.
    pub suppress_output: bool,
}
28
29impl Default for BatchConfig {
30 fn default() -> Self {
31 Self {
32 max_concurrency: 5,
33 rate_limit: None,
34 continue_on_error: true,
35 show_progress: true,
36 suppress_output: false,
37 }
38 }
39}
40
/// A single operation within a batch file, deserialized from JSON or YAML.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BatchOperation {
    /// Optional caller-supplied identifier, echoed in success messages.
    pub id: Option<String>,
    /// CLI arguments for the operation, as if typed after the root command.
    pub args: Vec<String>,
    /// Optional human-readable description of the operation.
    pub description: Option<String>,
    /// Extra HTTP headers for this operation; defaults to empty.
    /// NOTE(review): not consumed anywhere in this module — verify
    /// downstream use.
    #[serde(default)]
    pub headers: std::collections::HashMap<String, String>,
    /// Whether the response cache should be used for this operation.
    pub use_cache: Option<bool>,
    /// Maximum retry attempts; overrides global retry defaults when set.
    #[serde(default)]
    pub retry: Option<u32>,
    /// Initial retry delay as a human-readable duration string
    /// (parsed by `parse_duration`).
    #[serde(default)]
    pub retry_delay: Option<String>,
    /// Upper bound on the retry backoff delay, as a duration string.
    #[serde(default)]
    pub retry_max_delay: Option<String>,
    /// Force retries even when they would otherwise be skipped;
    /// defaults to `false`.
    #[serde(default)]
    pub force_retry: bool,
}
68
/// Top-level structure of a batch file: optional metadata plus the list
/// of operations to execute.
#[derive(Debug, Serialize, Deserialize)]
pub struct BatchFile {
    /// Optional descriptive metadata and batch-wide defaults.
    pub metadata: Option<BatchMetadata>,
    /// The operations to execute, in file order.
    pub operations: Vec<BatchOperation>,
}
77
/// Descriptive metadata attached to a batch file.
#[derive(Debug, Serialize, Deserialize)]
pub struct BatchMetadata {
    /// Optional display name of the batch.
    pub name: Option<String>,
    /// Optional batch version string.
    pub version: Option<String>,
    /// Optional free-form description.
    pub description: Option<String>,
    /// Optional defaults applied to operations.
    /// NOTE(review): not currently read by `execute_batch` in this
    /// module — confirm whether defaults are applied elsewhere.
    pub defaults: Option<BatchDefaults>,
}
90
/// Batch-wide default values for per-operation settings.
/// NOTE(review): these defaults are not consumed in this module — verify
/// they are wired through before relying on them.
#[derive(Debug, Serialize, Deserialize)]
pub struct BatchDefaults {
    /// Default HTTP headers; empty when omitted from the file.
    #[serde(default)]
    pub headers: std::collections::HashMap<String, String>,
    /// Default cache preference for operations that do not set their own.
    pub use_cache: Option<bool>,
}
100
/// Outcome of a single executed batch operation.
#[derive(Debug)]
pub struct BatchOperationResult {
    /// The operation that was executed.
    pub operation: BatchOperation,
    /// Whether the operation completed without error.
    pub success: bool,
    /// Error message when the operation failed.
    pub error: Option<String>,
    /// Rendered response output when the operation succeeded.
    pub response: Option<String>,
    /// Wall-clock execution time, measured after the concurrency permit
    /// and rate-limit waits (queueing time is excluded).
    pub duration: std::time::Duration,
}
115
/// Aggregate outcome of a whole batch run.
#[derive(Debug)]
pub struct BatchResult {
    /// Per-operation results, in the original operation order.
    pub results: Vec<BatchOperationResult>,
    /// Total wall-clock time for the batch, including queueing.
    pub total_duration: std::time::Duration,
    /// Number of operations that succeeded.
    pub success_count: usize,
    /// Number of operations that failed.
    pub failure_count: usize,
}
128
/// Executes batches of operations with bounded concurrency and optional
/// per-second rate limiting.
pub struct BatchProcessor {
    // Execution settings (concurrency, error policy, output behavior).
    config: BatchConfig,
    // Optional operations-per-second limiter shared across spawned tasks.
    rate_limiter: Option<Arc<DefaultDirectRateLimiter>>,
    // Bounds how many operations run at once.
    semaphore: Arc<Semaphore>,
}
135
136impl BatchProcessor {
137 #[must_use]
143 pub fn new(config: BatchConfig) -> Self {
144 let rate_limiter = config.rate_limit.map(|limit| {
145 Arc::new(RateLimiter::direct(Quota::per_second(
146 NonZeroU32::new(limit).unwrap_or(NonZeroU32::new(1).expect("1 is non-zero")),
147 )))
148 });
149
150 let semaphore = Arc::new(Semaphore::new(config.max_concurrency));
151
152 Self {
153 config,
154 rate_limiter,
155 semaphore,
156 }
157 }
158
159 pub async fn parse_batch_file(path: &Path) -> Result<BatchFile, Error> {
168 let content = tokio::fs::read_to_string(path)
169 .await
170 .map_err(|e| Error::io_error(format!("Failed to read batch file: {e}")))?;
171
172 if let Ok(batch_file) = serde_json::from_str::<BatchFile>(&content) {
174 return Ok(batch_file);
175 }
176
177 if let Ok(batch_file) = serde_yaml::from_str::<BatchFile>(&content) {
178 return Ok(batch_file);
179 }
180
181 Err(Error::validation_error(format!(
182 "Failed to parse batch file as JSON or YAML: {}",
183 path.display()
184 )))
185 }
186
187 #[allow(clippy::too_many_arguments)]
200 pub async fn execute_batch(
201 &self,
202 spec: &CachedSpec,
203 batch_file: BatchFile,
204 global_config: Option<&GlobalConfig>,
205 base_url: Option<&str>,
206 dry_run: bool,
207 output_format: &crate::cli::OutputFormat,
208 jq_filter: Option<&str>,
209 ) -> Result<BatchResult, Error> {
210 let start_time = std::time::Instant::now();
211 let total_operations = batch_file.operations.len();
212
213 if self.config.show_progress {
214 println!("Starting batch execution: {total_operations} operations");
216 }
217
218 let mut results = Vec::with_capacity(total_operations);
219 let mut handles = Vec::new();
220
221 for (index, operation) in batch_file.operations.into_iter().enumerate() {
223 let spec = spec.clone();
224 let global_config = global_config.cloned();
225 let base_url = base_url.map(String::from);
226 let output_format = output_format.clone();
227 let jq_filter = jq_filter.map(String::from);
228 let semaphore = Arc::clone(&self.semaphore);
229 let rate_limiter = self.rate_limiter.clone();
230 let show_progress = self.config.show_progress;
231 let suppress_output = self.config.suppress_output;
232
233 let handle = tokio::spawn(async move {
234 let _permit = semaphore
236 .acquire()
237 .await
238 .expect("semaphore should not be closed");
239
240 if let Some(limiter) = rate_limiter {
242 limiter.until_ready().await;
243 }
244
245 let operation_start = std::time::Instant::now();
246
247 let result = Self::execute_single_operation(
249 &spec,
250 &operation,
251 global_config.as_ref(),
252 base_url.as_deref(),
253 dry_run,
254 &output_format,
255 jq_filter.as_deref(),
256 suppress_output,
257 )
258 .await;
259
260 let duration = operation_start.elapsed();
261
262 let (success, error, response) = match result {
263 Ok(resp) => {
264 if show_progress {
265 println!("Operation {} completed", index + 1);
267 }
268 (true, None, Some(resp))
269 }
270 Err(e) => {
271 if show_progress {
272 println!("Operation {} failed: {}", index + 1, e);
274 }
275 (false, Some(e.to_string()), None)
276 }
277 };
278
279 BatchOperationResult {
280 operation,
281 success,
282 error,
283 response,
284 duration,
285 }
286 });
287
288 handles.push(handle);
289 }
290
291 for handle in handles {
293 let result = handle
294 .await
295 .map_err(|e| Error::invalid_config(format!("Task failed: {e}")))?;
296 results.push(result);
297 }
298
299 let total_duration = start_time.elapsed();
300 let success_count = results.iter().filter(|r| r.success).count();
301 let failure_count = results.len() - success_count;
302
303 if self.config.show_progress {
304 println!(
306 "Batch execution completed: {}/{} operations successful in {:.2}s",
307 success_count,
308 total_operations,
309 total_duration.as_secs_f64()
310 );
311 }
312
313 Ok(BatchResult {
314 results,
315 total_duration,
316 success_count,
317 failure_count,
318 })
319 }
320
321 #[allow(clippy::too_many_arguments)]
323 async fn execute_single_operation(
324 spec: &CachedSpec,
325 operation: &BatchOperation,
326 global_config: Option<&GlobalConfig>,
327 base_url: Option<&str>,
328 dry_run: bool,
329 output_format: &crate::cli::OutputFormat,
330 jq_filter: Option<&str>,
331 suppress_output: bool,
332 ) -> Result<String, Error> {
333 use crate::cli::translate;
334 use crate::invocation::ExecutionContext;
335
336 let command = generator::generate_command_tree_with_flags(spec, false);
338 let matches = command
339 .try_get_matches_from(
340 std::iter::once(crate::constants::CLI_ROOT_COMMAND.to_string())
341 .chain(operation.args.clone()),
342 )
343 .map_err(|e| Error::invalid_command(crate::constants::CONTEXT_BATCH, e.to_string()))?;
344
345 let call = translate::matches_to_operation_call(spec, &matches)?;
347
348 let cache_enabled = operation.use_cache.unwrap_or(false);
350 let cache_config = if cache_enabled {
351 let config_dir =
352 if let Ok(dir) = std::env::var(crate::constants::ENV_APERTURE_CONFIG_DIR) {
353 std::path::PathBuf::from(dir)
354 } else {
355 crate::config::manager::get_config_dir()?
356 };
357 Some(crate::response_cache::CacheConfig {
358 cache_dir: config_dir
359 .join(crate::constants::DIR_CACHE)
360 .join(crate::constants::DIR_RESPONSES),
361 default_ttl: std::time::Duration::from_secs(300),
362 max_entries: 1000,
363 enabled: true,
364 allow_authenticated: false,
365 })
366 } else {
367 None
368 };
369
370 let retry_context = build_batch_retry_context(operation, global_config)?;
372
373 let ctx = ExecutionContext {
375 dry_run,
376 idempotency_key: None,
377 cache_config,
378 retry_context,
379 base_url: base_url.map(String::from),
380 global_config: global_config.cloned(),
381 server_var_args: translate::extract_server_var_args(&matches),
382 };
383
384 let result = crate::engine::executor::execute(spec, call, ctx).await?;
386
387 if suppress_output {
389 let output =
390 crate::cli::render::render_result_to_string(&result, output_format, jq_filter)?;
391 return Ok(output.unwrap_or_default());
392 }
393
394 crate::cli::render::render_result(&result, output_format, jq_filter)?;
395
396 Ok(format!(
397 "Successfully executed operation: {}",
398 operation
399 .id
400 .as_deref()
401 .unwrap_or(crate::constants::DEFAULT_OPERATION_NAME)
402 ))
403 }
404}
405
406#[allow(clippy::cast_possible_truncation)]
410fn build_batch_retry_context(
411 operation: &BatchOperation,
412 global_config: Option<&GlobalConfig>,
413) -> Result<Option<RetryContext>, Error> {
414 let defaults = global_config.map(|c| &c.retry_defaults);
416
417 let max_attempts = operation
419 .retry
420 .or_else(|| defaults.map(|d| d.max_attempts))
421 .unwrap_or(0);
422
423 if max_attempts == 0 {
425 return Ok(None);
426 }
427
428 let initial_delay_ms = if let Some(ref delay_str) = operation.retry_delay {
431 parse_duration(delay_str)?.as_millis() as u64
432 } else {
433 defaults.map_or(500, |d| d.initial_delay_ms)
434 };
435
436 let max_delay_ms = if let Some(ref delay_str) = operation.retry_max_delay {
439 parse_duration(delay_str)?.as_millis() as u64
440 } else {
441 defaults.map_or(30_000, |d| d.max_delay_ms)
442 };
443
444 Ok(Some(RetryContext {
445 max_attempts,
446 initial_delay_ms,
447 max_delay_ms,
448 force_retry: operation.force_retry,
449 method: None, has_idempotency_key: false, }))
452}
453
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// A JSON batch file should parse into its metadata and operations.
    #[tokio::test]
    async fn test_parse_batch_file_json() {
        let batch_content = r#"{
            "metadata": {
                "name": "Test batch",
                "description": "A test batch file"
            },
            "operations": [
                {
                    "id": "op1",
                    "args": ["users", "list"],
                    "description": "List all users"
                },
                {
                    "id": "op2",
                    "args": ["users", "get", "--user-id", "123"],
                    "description": "Get user 123"
                }
            ]
        }"#;

        let mut temp_file = NamedTempFile::new().unwrap();
        temp_file.write_all(batch_content.as_bytes()).unwrap();

        let batch_file = BatchProcessor::parse_batch_file(temp_file.path())
            .await
            .unwrap();

        // Metadata should round-trip, not just the operations.
        let metadata = batch_file.metadata.expect("metadata should be parsed");
        assert_eq!(metadata.name.as_deref(), Some("Test batch"));

        assert_eq!(batch_file.operations.len(), 2);
        assert_eq!(batch_file.operations[0].args, vec!["users", "list"]);
        assert_eq!(
            batch_file.operations[1].args,
            vec!["users", "get", "--user-id", "123"]
        );
    }

    /// A YAML batch file should parse identically to the JSON form.
    #[tokio::test]
    async fn test_parse_batch_file_yaml() {
        let batch_content = r#"
metadata:
  name: Test batch
  description: A test batch file
operations:
  - id: op1
    args: [users, list]
    description: List all users
  - id: op2
    args: [users, get, --user-id, "123"]
    description: Get user 123
"#;

        let mut temp_file = NamedTempFile::new().unwrap();
        temp_file.write_all(batch_content.as_bytes()).unwrap();

        let batch_file = BatchProcessor::parse_batch_file(temp_file.path())
            .await
            .unwrap();

        assert_eq!(batch_file.operations.len(), 2);
        assert_eq!(batch_file.operations[0].args, vec!["users", "list"]);
        assert_eq!(
            batch_file.operations[1].args,
            vec!["users", "get", "--user-id", "123"]
        );
    }

    /// `Default` should produce the documented baseline configuration.
    #[test]
    fn test_batch_config_default() {
        let config = BatchConfig::default();
        assert_eq!(config.max_concurrency, 5);
        assert_eq!(config.rate_limit, None);
        assert!(config.continue_on_error);
        assert!(config.show_progress);
        // Previously unasserted: output suppression is off by default.
        assert!(!config.suppress_output);
    }

    /// Construction should wire the semaphore permits and rate limiter
    /// from the supplied configuration.
    #[test]
    fn test_batch_processor_creation() {
        let config = BatchConfig {
            max_concurrency: 10,
            rate_limit: Some(5),
            continue_on_error: false,
            show_progress: false,
            suppress_output: false,
        };

        let processor = BatchProcessor::new(config);
        assert_eq!(processor.semaphore.available_permits(), 10);
        assert!(processor.rate_limiter.is_some());
    }
}