1use crate::cache::models::CachedSpec;
2use crate::config::models::GlobalConfig;
3use crate::duration::parse_duration;
4use crate::engine::executor::RetryContext;
5use crate::engine::generator;
6use crate::error::Error;
7use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
8use serde::{Deserialize, Serialize};
9use std::num::NonZeroU32;
10use std::path::Path;
11use std::sync::Arc;
12use tokio::sync::Semaphore;
13
14#[derive(Debug, Clone)]
16pub struct BatchConfig {
17 pub max_concurrency: usize,
19 pub rate_limit: Option<u32>,
21 pub continue_on_error: bool,
23 pub show_progress: bool,
25 pub suppress_output: bool,
27}
28
29impl Default for BatchConfig {
30 fn default() -> Self {
31 Self {
32 max_concurrency: 5,
33 rate_limit: None,
34 continue_on_error: true,
35 show_progress: true,
36 suppress_output: false,
37 }
38 }
39}
40
41#[derive(Debug, Clone, Serialize, Deserialize, Default)]
43pub struct BatchOperation {
44 pub id: Option<String>,
46 pub args: Vec<String>,
48 pub description: Option<String>,
50 #[serde(default)]
52 pub headers: std::collections::HashMap<String, String>,
53 pub use_cache: Option<bool>,
55 #[serde(default)]
57 pub retry: Option<u32>,
58 #[serde(default)]
60 pub retry_delay: Option<String>,
61 #[serde(default)]
63 pub retry_max_delay: Option<String>,
64 #[serde(default)]
66 pub force_retry: bool,
67}
68
69#[derive(Debug, Serialize, Deserialize)]
71pub struct BatchFile {
72 pub metadata: Option<BatchMetadata>,
74 pub operations: Vec<BatchOperation>,
76}
77
78#[derive(Debug, Serialize, Deserialize)]
80pub struct BatchMetadata {
81 pub name: Option<String>,
83 pub version: Option<String>,
85 pub description: Option<String>,
87 pub defaults: Option<BatchDefaults>,
89}
90
91#[derive(Debug, Serialize, Deserialize)]
93pub struct BatchDefaults {
94 #[serde(default)]
96 pub headers: std::collections::HashMap<String, String>,
97 pub use_cache: Option<bool>,
99}
100
101#[derive(Debug)]
103pub struct BatchOperationResult {
104 pub operation: BatchOperation,
106 pub success: bool,
108 pub error: Option<String>,
110 pub response: Option<String>,
112 pub duration: std::time::Duration,
114}
115
116#[derive(Debug)]
118pub struct BatchResult {
119 pub results: Vec<BatchOperationResult>,
121 pub total_duration: std::time::Duration,
123 pub success_count: usize,
125 pub failure_count: usize,
127}
128
129pub struct BatchProcessor {
131 config: BatchConfig,
132 rate_limiter: Option<Arc<DefaultDirectRateLimiter>>,
133 semaphore: Arc<Semaphore>,
134}
135
136impl BatchProcessor {
137 #[must_use]
143 pub fn new(config: BatchConfig) -> Self {
144 let rate_limiter = config.rate_limit.map(|limit| {
145 Arc::new(RateLimiter::direct(Quota::per_second(
146 NonZeroU32::new(limit).unwrap_or(NonZeroU32::new(1).expect("1 is non-zero")),
147 )))
148 });
149
150 let semaphore = Arc::new(Semaphore::new(config.max_concurrency));
151
152 Self {
153 config,
154 rate_limiter,
155 semaphore,
156 }
157 }
158
159 pub async fn parse_batch_file(path: &Path) -> Result<BatchFile, Error> {
168 let content = tokio::fs::read_to_string(path)
169 .await
170 .map_err(|e| Error::io_error(format!("Failed to read batch file: {e}")))?;
171
172 if let Ok(batch_file) = serde_json::from_str::<BatchFile>(&content) {
174 return Ok(batch_file);
175 }
176
177 if let Ok(batch_file) = serde_yaml::from_str::<BatchFile>(&content) {
178 return Ok(batch_file);
179 }
180
181 Err(Error::validation_error(format!(
182 "Failed to parse batch file as JSON or YAML: {}",
183 path.display()
184 )))
185 }
186
187 #[allow(clippy::too_many_arguments)]
200 pub async fn execute_batch(
201 &self,
202 spec: &CachedSpec,
203 batch_file: BatchFile,
204 global_config: Option<&GlobalConfig>,
205 base_url: Option<&str>,
206 dry_run: bool,
207 output_format: &crate::cli::OutputFormat,
208 jq_filter: Option<&str>,
209 ) -> Result<BatchResult, Error> {
210 let start_time = std::time::Instant::now();
211 let total_operations = batch_file.operations.len();
212
213 if self.config.show_progress {
214 println!("Starting batch execution: {total_operations} operations");
216 }
217
218 let mut results = Vec::with_capacity(total_operations);
219 let mut handles = Vec::new();
220
221 for (index, operation) in batch_file.operations.into_iter().enumerate() {
223 let spec = spec.clone();
224 let global_config = global_config.cloned();
225 let base_url = base_url.map(String::from);
226 let output_format = output_format.clone();
227 let jq_filter = jq_filter.map(String::from);
228 let semaphore = Arc::clone(&self.semaphore);
229 let rate_limiter = self.rate_limiter.clone();
230 let show_progress = self.config.show_progress;
231 let suppress_output = self.config.suppress_output;
232
233 let handle = tokio::spawn(async move {
234 let _permit = semaphore
236 .acquire()
237 .await
238 .expect("semaphore should not be closed");
239
240 if let Some(limiter) = rate_limiter {
242 limiter.until_ready().await;
243 }
244
245 let operation_start = std::time::Instant::now();
246
247 let result = Self::execute_single_operation(
249 &spec,
250 &operation,
251 global_config.as_ref(),
252 base_url.as_deref(),
253 dry_run,
254 &output_format,
255 jq_filter.as_deref(),
256 suppress_output,
257 )
258 .await;
259
260 let duration = operation_start.elapsed();
261
262 let (success, error, response) = match result {
263 Ok(resp) => {
264 if show_progress {
265 println!("Operation {} completed", index + 1);
267 }
268 (true, None, Some(resp))
269 }
270 Err(e) => {
271 if show_progress {
272 println!("Operation {} failed: {}", index + 1, e);
274 }
275 (false, Some(e.to_string()), None)
276 }
277 };
278
279 BatchOperationResult {
280 operation,
281 success,
282 error,
283 response,
284 duration,
285 }
286 });
287
288 handles.push(handle);
289 }
290
291 for handle in handles {
293 let result = handle
294 .await
295 .map_err(|e| Error::invalid_config(format!("Task failed: {e}")))?;
296 results.push(result);
297 }
298
299 let total_duration = start_time.elapsed();
300 let success_count = results.iter().filter(|r| r.success).count();
301 let failure_count = results.len() - success_count;
302
303 if self.config.show_progress {
304 println!(
306 "Batch execution completed: {}/{} operations successful in {:.2}s",
307 success_count,
308 total_operations,
309 total_duration.as_secs_f64()
310 );
311 }
312
313 Ok(BatchResult {
314 results,
315 total_duration,
316 success_count,
317 failure_count,
318 })
319 }
320
321 #[allow(clippy::too_many_arguments)]
323 async fn execute_single_operation(
324 spec: &CachedSpec,
325 operation: &BatchOperation,
326 global_config: Option<&GlobalConfig>,
327 base_url: Option<&str>,
328 dry_run: bool,
329 output_format: &crate::cli::OutputFormat,
330 jq_filter: Option<&str>,
331 suppress_output: bool,
332 ) -> Result<String, Error> {
333 let command = generator::generate_command_tree_with_flags(spec, false);
335
336 let matches = command
338 .try_get_matches_from(
339 std::iter::once(crate::constants::CLI_ROOT_COMMAND.to_string())
340 .chain(operation.args.clone()),
341 )
342 .map_err(|e| Error::invalid_command(crate::constants::CONTEXT_BATCH, e.to_string()))?;
343
344 let cache_enabled = operation.use_cache.unwrap_or(false);
346 let cache_config = if cache_enabled {
347 Some(crate::response_cache::CacheConfig {
348 cache_dir: std::env::var(crate::constants::ENV_APERTURE_CONFIG_DIR)
349 .map_or_else(
350 |_| std::path::PathBuf::from("~/.config/aperture"),
351 std::path::PathBuf::from,
352 )
353 .join(crate::constants::DIR_CACHE)
354 .join(crate::constants::DIR_RESPONSES),
355 default_ttl: std::time::Duration::from_secs(300),
356 max_entries: 1000,
357 enabled: true,
358 })
359 } else {
360 None
361 };
362
363 let retry_context = build_batch_retry_context(operation, global_config)?;
365
366 if suppress_output {
368 let output = crate::engine::executor::execute_request(
369 spec,
370 &matches,
371 base_url,
372 dry_run,
373 None, global_config,
375 output_format,
376 jq_filter,
377 cache_config.as_ref(),
378 true, retry_context.as_ref(),
380 )
381 .await?;
382
383 return Ok(output.unwrap_or_default());
385 }
386
387 if dry_run {
390 crate::engine::executor::execute_request(
391 spec,
392 &matches,
393 base_url,
394 true, None, global_config,
397 output_format,
398 jq_filter,
399 cache_config.as_ref(),
400 false, retry_context.as_ref(),
402 )
403 .await?;
404
405 return Ok(format!(
407 "DRY RUN: Would execute operation with args: {:?}",
408 operation.args
409 ));
410 }
411
412 crate::engine::executor::execute_request(
415 spec,
416 &matches,
417 base_url,
418 false, None, global_config,
421 output_format,
422 jq_filter,
423 cache_config.as_ref(),
424 false, retry_context.as_ref(),
426 )
427 .await?;
428
429 Ok(format!(
431 "Successfully executed operation: {}",
432 operation
433 .id
434 .as_deref()
435 .unwrap_or(crate::constants::DEFAULT_OPERATION_NAME)
436 ))
437 }
438}
439
440#[allow(clippy::cast_possible_truncation)]
444fn build_batch_retry_context(
445 operation: &BatchOperation,
446 global_config: Option<&GlobalConfig>,
447) -> Result<Option<RetryContext>, Error> {
448 let defaults = global_config.map(|c| &c.retry_defaults);
450
451 let max_attempts = operation
453 .retry
454 .or_else(|| defaults.map(|d| d.max_attempts))
455 .unwrap_or(0);
456
457 if max_attempts == 0 {
459 return Ok(None);
460 }
461
462 let initial_delay_ms = if let Some(ref delay_str) = operation.retry_delay {
465 parse_duration(delay_str)?.as_millis() as u64
466 } else {
467 defaults.map_or(500, |d| d.initial_delay_ms)
468 };
469
470 let max_delay_ms = if let Some(ref delay_str) = operation.retry_max_delay {
473 parse_duration(delay_str)?.as_millis() as u64
474 } else {
475 defaults.map_or(30_000, |d| d.max_delay_ms)
476 };
477
478 Ok(Some(RetryContext {
479 max_attempts,
480 initial_delay_ms,
481 max_delay_ms,
482 force_retry: operation.force_retry,
483 method: None, has_idempotency_key: false, }))
486}
487
488#[cfg(test)]
489mod tests {
490 use super::*;
491 use std::io::Write;
492 use tempfile::NamedTempFile;
493
494 #[tokio::test]
495 async fn test_parse_batch_file_json() {
496 let batch_content = r#"{
497 "metadata": {
498 "name": "Test batch",
499 "description": "A test batch file"
500 },
501 "operations": [
502 {
503 "id": "op1",
504 "args": ["users", "list"],
505 "description": "List all users"
506 },
507 {
508 "id": "op2",
509 "args": ["users", "get", "--user-id", "123"],
510 "description": "Get user 123"
511 }
512 ]
513 }"#;
514
515 let mut temp_file = NamedTempFile::new().unwrap();
516 temp_file.write_all(batch_content.as_bytes()).unwrap();
517
518 let batch_file = BatchProcessor::parse_batch_file(temp_file.path())
519 .await
520 .unwrap();
521
522 assert_eq!(batch_file.operations.len(), 2);
523 assert_eq!(batch_file.operations[0].args, vec!["users", "list"]);
524 assert_eq!(
525 batch_file.operations[1].args,
526 vec!["users", "get", "--user-id", "123"]
527 );
528 }
529
530 #[tokio::test]
531 async fn test_parse_batch_file_yaml() {
532 let batch_content = r#"
533metadata:
534 name: Test batch
535 description: A test batch file
536operations:
537 - id: op1
538 args: [users, list]
539 description: List all users
540 - id: op2
541 args: [users, get, --user-id, "123"]
542 description: Get user 123
543"#;
544
545 let mut temp_file = NamedTempFile::new().unwrap();
546 temp_file.write_all(batch_content.as_bytes()).unwrap();
547
548 let batch_file = BatchProcessor::parse_batch_file(temp_file.path())
549 .await
550 .unwrap();
551
552 assert_eq!(batch_file.operations.len(), 2);
553 assert_eq!(batch_file.operations[0].args, vec!["users", "list"]);
554 assert_eq!(
555 batch_file.operations[1].args,
556 vec!["users", "get", "--user-id", "123"]
557 );
558 }
559
560 #[test]
561 fn test_batch_config_default() {
562 let config = BatchConfig::default();
563 assert_eq!(config.max_concurrency, 5);
564 assert_eq!(config.rate_limit, None);
565 assert!(config.continue_on_error);
566 assert!(config.show_progress);
567 }
568
569 #[test]
570 fn test_batch_processor_creation() {
571 let config = BatchConfig {
572 max_concurrency: 10,
573 rate_limit: Some(5),
574 continue_on_error: false,
575 show_progress: false,
576 suppress_output: false,
577 };
578
579 let processor = BatchProcessor::new(config);
580 assert_eq!(processor.semaphore.available_permits(), 10);
581 assert!(processor.rate_limiter.is_some());
582 }
583}