1pub mod capture;
2pub mod graph;
3pub mod interpolation;
4
5use crate::cache::models::CachedSpec;
6use crate::config::models::GlobalConfig;
7use crate::duration::parse_duration;
8use crate::engine::executor::RetryContext;
9use crate::engine::generator;
10use crate::error::Error;
11use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
12use serde::{Deserialize, Serialize};
13use std::num::NonZeroU32;
14use std::path::Path;
15use std::sync::Arc;
16use tokio::sync::Semaphore;
17
18#[derive(Debug, Clone)]
20pub struct BatchConfig {
21 pub max_concurrency: usize,
23 pub rate_limit: Option<u32>,
25 pub continue_on_error: bool,
27 pub show_progress: bool,
29 pub suppress_output: bool,
31}
32
33impl Default for BatchConfig {
34 fn default() -> Self {
35 Self {
36 max_concurrency: 5,
37 rate_limit: None,
38 continue_on_error: true,
39 show_progress: true,
40 suppress_output: false,
41 }
42 }
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize, Default)]
47pub struct BatchOperation {
48 pub id: Option<String>,
51 pub args: Vec<String>,
53 pub description: Option<String>,
55 #[serde(default)]
57 pub headers: std::collections::HashMap<String, String>,
58 pub use_cache: Option<bool>,
60 #[serde(default)]
62 pub retry: Option<u32>,
63 #[serde(default)]
65 pub retry_delay: Option<String>,
66 #[serde(default)]
68 pub retry_max_delay: Option<String>,
69 #[serde(default)]
71 pub force_retry: bool,
72
73 #[serde(default)]
77 pub capture: Option<std::collections::HashMap<String, String>>,
78
79 #[serde(default)]
83 pub capture_append: Option<std::collections::HashMap<String, String>>,
84
85 #[serde(default)]
89 pub depends_on: Option<Vec<String>>,
90
91 #[serde(default)]
96 pub body_file: Option<String>,
97}
98
99#[derive(Debug, Serialize, Deserialize)]
101pub struct BatchFile {
102 pub metadata: Option<BatchMetadata>,
104 pub operations: Vec<BatchOperation>,
106}
107
108#[derive(Debug, Serialize, Deserialize)]
110pub struct BatchMetadata {
111 pub name: Option<String>,
113 pub version: Option<String>,
115 pub description: Option<String>,
117 pub defaults: Option<BatchDefaults>,
119}
120
121#[derive(Debug, Serialize, Deserialize)]
123pub struct BatchDefaults {
124 #[serde(default)]
126 pub headers: std::collections::HashMap<String, String>,
127 pub use_cache: Option<bool>,
129}
130
131#[derive(Debug)]
133pub struct BatchOperationResult {
134 pub operation: BatchOperation,
136 pub success: bool,
138 pub error: Option<String>,
140 pub response: Option<String>,
142 pub duration: std::time::Duration,
144}
145
146#[derive(Debug)]
148pub struct BatchResult {
149 pub results: Vec<BatchOperationResult>,
151 pub total_duration: std::time::Duration,
153 pub success_count: usize,
155 pub failure_count: usize,
157}
158
159pub struct BatchProcessor {
161 config: BatchConfig,
162 rate_limiter: Option<Arc<DefaultDirectRateLimiter>>,
163 semaphore: Arc<Semaphore>,
164}
165
166impl BatchProcessor {
167 #[must_use]
173 pub fn new(config: BatchConfig) -> Self {
174 let rate_limiter = config.rate_limit.map(|limit| {
175 Arc::new(RateLimiter::direct(Quota::per_second(
176 NonZeroU32::new(limit).unwrap_or(NonZeroU32::new(1).expect("1 is non-zero")),
177 )))
178 });
179
180 let semaphore = Arc::new(Semaphore::new(config.max_concurrency));
181
182 Self {
183 config,
184 rate_limiter,
185 semaphore,
186 }
187 }
188
189 pub async fn parse_batch_file(path: &Path) -> Result<BatchFile, Error> {
198 let content = tokio::fs::read_to_string(path)
199 .await
200 .map_err(|e| Error::io_error(format!("Failed to read batch file: {e}")))?;
201
202 if let Ok(batch_file) = serde_json::from_str::<BatchFile>(&content) {
204 return Ok(batch_file);
205 }
206
207 if let Ok(batch_file) = serde_yaml::from_str::<BatchFile>(&content) {
208 return Ok(batch_file);
209 }
210
211 Err(Error::validation_error(format!(
212 "Failed to parse batch file as JSON or YAML: {}",
213 path.display()
214 )))
215 }
216
217 #[allow(clippy::too_many_arguments)]
237 pub async fn execute_batch(
238 &self,
239 spec: &CachedSpec,
240 batch_file: BatchFile,
241 global_config: Option<&GlobalConfig>,
242 base_url: Option<&str>,
243 dry_run: bool,
244 output_format: &crate::cli::OutputFormat,
245 jq_filter: Option<&str>,
246 ) -> Result<BatchResult, Error> {
247 if graph::has_dependencies(&batch_file.operations) {
248 self.execute_dependent_batch(
249 spec,
250 batch_file,
251 global_config,
252 base_url,
253 dry_run,
254 output_format,
255 jq_filter,
256 )
257 .await
258 } else {
259 self.execute_concurrent_batch(
260 spec,
261 batch_file,
262 global_config,
263 base_url,
264 dry_run,
265 output_format,
266 jq_filter,
267 )
268 .await
269 }
270 }
271
272 #[allow(clippy::too_many_arguments)]
275 async fn execute_dependent_batch(
276 &self,
277 spec: &CachedSpec,
278 batch_file: BatchFile,
279 global_config: Option<&GlobalConfig>,
280 base_url: Option<&str>,
281 dry_run: bool,
282 _output_format: &crate::cli::OutputFormat,
283 _jq_filter: Option<&str>,
284 ) -> Result<BatchResult, Error> {
285 let start_time = std::time::Instant::now();
286 let operations = batch_file.operations;
287 let total_operations = operations.len();
288
289 let execution_order = graph::resolve_execution_order(&operations)?;
290
291 if self.config.show_progress {
292 println!("Starting dependent batch execution: {total_operations} operations");
294 }
295
296 let mut store = interpolation::VariableStore::default();
297 let mut results: Vec<Option<BatchOperationResult>> =
298 (0..total_operations).map(|_| None).collect();
299
300 for &idx in &execution_order {
301 let operation = &operations[idx];
302
303 if let Some(limiter) = &self.rate_limiter {
304 limiter.until_ready().await;
305 }
306
307 let result = Self::run_dependent_operation(
308 spec,
309 operation,
310 &mut store,
311 global_config,
312 base_url,
313 dry_run,
314 self.config.show_progress,
315 )
316 .await;
317
318 let failed = !result.success;
319 results[idx] = Some(result);
320
321 if failed {
322 break; }
324 }
325
326 let final_results = Self::fill_skipped_results(results, &operations);
327 let total_duration = start_time.elapsed();
328 let success_count = final_results.iter().filter(|r| r.success).count();
329 let failure_count = final_results.len() - success_count;
330
331 if self.config.show_progress {
332 println!(
334 "Dependent batch completed: {success_count}/{total_operations} operations successful in {:.2}s",
335 total_duration.as_secs_f64()
336 );
337 }
338
339 Ok(BatchResult {
340 results: final_results,
341 total_duration,
342 success_count,
343 failure_count,
344 })
345 }
346
347 #[allow(clippy::too_many_arguments)]
352 async fn run_dependent_operation(
353 spec: &CachedSpec,
354 operation: &BatchOperation,
355 store: &mut interpolation::VariableStore,
356 global_config: Option<&GlobalConfig>,
357 base_url: Option<&str>,
358 dry_run: bool,
359 show_progress: bool,
360 ) -> BatchOperationResult {
361 let op_id = operation
362 .id
363 .as_deref()
364 .unwrap_or(crate::constants::DEFAULT_OPERATION_NAME);
365
366 let interpolated_args = match interpolation::interpolate_args(&operation.args, store, op_id)
369 {
370 Ok(args) => args,
371 Err(e) => {
372 return BatchOperationResult {
373 operation: operation.clone(),
374 success: false,
375 error: Some(e.to_string()),
376 response: None,
377 duration: std::time::Duration::ZERO,
378 };
379 }
380 };
381
382 let interpolated_body_file = match operation
383 .body_file
384 .as_deref()
385 .map(|p| interpolation::interpolate_string(p, store, op_id))
386 .transpose()
387 {
388 Ok(path) => path,
389 Err(e) => {
390 return BatchOperationResult {
391 operation: operation.clone(),
392 success: false,
393 error: Some(e.to_string()),
394 response: None,
395 duration: std::time::Duration::ZERO,
396 };
397 }
398 };
399
400 let mut exec_op = operation.clone();
401 exec_op.args = interpolated_args;
402 exec_op.body_file = interpolated_body_file;
403 let operation_start = std::time::Instant::now();
404
405 let result = Self::execute_single_operation(
408 spec,
409 &exec_op,
410 global_config,
411 base_url,
412 dry_run,
413 &crate::cli::OutputFormat::Json,
414 None,
415 true,
416 )
417 .await;
418
419 let duration = operation_start.elapsed();
420
421 let response = match result {
424 Ok(resp) => resp,
425 Err(e) => {
426 Self::log_progress(show_progress, || format!("Operation '{op_id}' failed: {e}"));
427 return BatchOperationResult {
428 operation: exec_op,
429 success: false,
430 error: Some(e.to_string()),
431 response: None,
432 duration,
433 };
434 }
435 };
436
437 let capture_result = capture::extract_captures(operation, &response, store);
440 let Err(capture_err) = capture_result else {
441 Self::log_progress(show_progress, || format!("Operation '{op_id}' completed"));
442 return BatchOperationResult {
443 operation: exec_op,
444 success: true,
445 error: None,
446 response: Some(response),
447 duration,
448 };
449 };
450
451 Self::log_progress(show_progress, || {
452 format!("Operation '{op_id}' capture failed: {capture_err}")
453 });
454 BatchOperationResult {
455 operation: exec_op,
456 success: false,
457 error: Some(capture_err.to_string()),
458 response: Some(response),
459 duration,
460 }
461 }
462
463 fn log_progress(show_progress: bool, msg: impl FnOnce() -> String) {
465 if show_progress {
466 println!("{}", msg());
468 }
469 }
470
471 fn fill_skipped_results(
473 results: Vec<Option<BatchOperationResult>>,
474 operations: &[BatchOperation],
475 ) -> Vec<BatchOperationResult> {
476 results
477 .into_iter()
478 .enumerate()
479 .map(|(i, r)| {
480 r.unwrap_or_else(|| BatchOperationResult {
481 operation: operations[i].clone(),
482 success: false,
483 error: Some("Skipped due to prior failure".into()),
484 response: None,
485 duration: std::time::Duration::ZERO,
486 })
487 })
488 .collect()
489 }
490
491 #[allow(clippy::too_many_arguments)]
493 async fn execute_concurrent_batch(
494 &self,
495 spec: &CachedSpec,
496 batch_file: BatchFile,
497 global_config: Option<&GlobalConfig>,
498 base_url: Option<&str>,
499 dry_run: bool,
500 output_format: &crate::cli::OutputFormat,
501 jq_filter: Option<&str>,
502 ) -> Result<BatchResult, Error> {
503 let start_time = std::time::Instant::now();
504 let total_operations = batch_file.operations.len();
505
506 if self.config.show_progress {
507 println!("Starting batch execution: {total_operations} operations");
509 }
510
511 let mut results = Vec::with_capacity(total_operations);
512 let mut handles = Vec::new();
513
514 for (index, operation) in batch_file.operations.into_iter().enumerate() {
516 let spec = spec.clone();
517 let global_config = global_config.cloned();
518 let base_url = base_url.map(String::from);
519 let output_format = output_format.clone();
520 let jq_filter = jq_filter.map(String::from);
521 let semaphore = Arc::clone(&self.semaphore);
522 let rate_limiter = self.rate_limiter.clone();
523 let show_progress = self.config.show_progress;
524 let suppress_output = self.config.suppress_output;
525
526 let handle = tokio::spawn(async move {
527 let _permit = semaphore
529 .acquire()
530 .await
531 .expect("semaphore should not be closed");
532
533 if let Some(limiter) = rate_limiter {
535 limiter.until_ready().await;
536 }
537
538 let operation_start = std::time::Instant::now();
539
540 let result = Self::execute_single_operation(
542 &spec,
543 &operation,
544 global_config.as_ref(),
545 base_url.as_deref(),
546 dry_run,
547 &output_format,
548 jq_filter.as_deref(),
549 suppress_output,
550 )
551 .await;
552
553 let duration = operation_start.elapsed();
554
555 let (success, error, response) = match result {
556 Ok(resp) => {
557 if show_progress {
558 println!("Operation {} completed", index + 1);
560 }
561 (true, None, Some(resp))
562 }
563 Err(e) => {
564 if show_progress {
565 println!("Operation {} failed: {}", index + 1, e);
567 }
568 (false, Some(e.to_string()), None)
569 }
570 };
571
572 BatchOperationResult {
573 operation,
574 success,
575 error,
576 response,
577 duration,
578 }
579 });
580
581 handles.push(handle);
582 }
583
584 for handle in handles {
586 let result = handle
587 .await
588 .map_err(|e| Error::invalid_config(format!("Task failed: {e}")))?;
589 results.push(result);
590 }
591
592 let total_duration = start_time.elapsed();
593 let success_count = results.iter().filter(|r| r.success).count();
594 let failure_count = results.len() - success_count;
595
596 if self.config.show_progress {
597 println!(
599 "Batch execution completed: {}/{} operations successful in {:.2}s",
600 success_count,
601 total_operations,
602 total_duration.as_secs_f64()
603 );
604 }
605
606 Ok(BatchResult {
607 results,
608 total_duration,
609 success_count,
610 failure_count,
611 })
612 }
613
614 #[allow(clippy::too_many_arguments)]
616 async fn execute_single_operation(
617 spec: &CachedSpec,
618 operation: &BatchOperation,
619 global_config: Option<&GlobalConfig>,
620 base_url: Option<&str>,
621 dry_run: bool,
622 output_format: &crate::cli::OutputFormat,
623 jq_filter: Option<&str>,
624 suppress_output: bool,
625 ) -> Result<String, Error> {
626 use crate::cli::translate;
627 use crate::invocation::ExecutionContext;
628
629 let body_field_conflicts_with_args = operation.body_file.is_some()
639 && operation.args.iter().any(|a| {
640 a == "--body-file"
641 || a.starts_with("--body-file=")
642 || a == "--body"
643 || a.starts_with("--body=")
644 });
645 if body_field_conflicts_with_args {
646 return Err(Error::invalid_config(
647 "body_file field conflicts with --body or --body-file in args; use one or the other",
648 ));
649 }
650
651 let command = generator::generate_command_tree_with_flags(spec, false);
652 let extra_body_file: Vec<String> = operation
653 .body_file
654 .as_deref()
655 .map(|p| vec!["--body-file".to_string(), p.to_string()])
656 .unwrap_or_default();
657 let matches = command
658 .try_get_matches_from(
659 std::iter::once(crate::constants::CLI_ROOT_COMMAND.to_string())
660 .chain(operation.args.clone())
661 .chain(extra_body_file),
662 )
663 .map_err(|e| Error::invalid_command(crate::constants::CONTEXT_BATCH, e.to_string()))?;
664
665 let call = translate::matches_to_operation_call(spec, &matches)?;
667
668 let cache_enabled = operation.use_cache.unwrap_or(false);
670 let cache_config = if cache_enabled {
671 let config_dir =
672 if let Ok(dir) = std::env::var(crate::constants::ENV_APERTURE_CONFIG_DIR) {
673 std::path::PathBuf::from(dir)
674 } else {
675 crate::config::manager::get_config_dir()?
676 };
677 Some(crate::response_cache::CacheConfig {
678 cache_dir: config_dir
679 .join(crate::constants::DIR_CACHE)
680 .join(crate::constants::DIR_RESPONSES),
681 default_ttl: std::time::Duration::from_secs(300),
682 max_entries: 1000,
683 enabled: true,
684 allow_authenticated: false,
685 })
686 } else {
687 None
688 };
689
690 let retry_context = build_batch_retry_context(operation, global_config)?;
692
693 let ctx = ExecutionContext {
695 dry_run,
696 idempotency_key: None,
697 cache_config,
698 retry_context,
699 base_url: base_url.map(String::from),
700 global_config: global_config.cloned(),
701 server_var_args: translate::extract_server_var_args(&matches),
702 auto_paginate: false,
703 };
704
705 let result = crate::engine::executor::execute(spec, call, ctx).await?;
707
708 if suppress_output {
710 let output =
711 crate::cli::render::render_result_to_string(&result, output_format, jq_filter)?;
712 return Ok(output.unwrap_or_default());
713 }
714
715 crate::cli::render::render_result(&result, output_format, jq_filter)?;
716
717 Ok(format!(
718 "Successfully executed operation: {}",
719 operation
720 .id
721 .as_deref()
722 .unwrap_or(crate::constants::DEFAULT_OPERATION_NAME)
723 ))
724 }
725}
726
727#[allow(clippy::cast_possible_truncation)]
731fn build_batch_retry_context(
732 operation: &BatchOperation,
733 global_config: Option<&GlobalConfig>,
734) -> Result<Option<RetryContext>, Error> {
735 let defaults = global_config.map(|c| &c.retry_defaults);
737
738 let max_attempts = operation
740 .retry
741 .or_else(|| defaults.map(|d| d.max_attempts))
742 .unwrap_or(0);
743
744 if max_attempts == 0 {
746 return Ok(None);
747 }
748
749 let initial_delay_ms = if let Some(ref delay_str) = operation.retry_delay {
752 parse_duration(delay_str)?.as_millis() as u64
753 } else {
754 defaults.map_or(500, |d| d.initial_delay_ms)
755 };
756
757 let max_delay_ms = if let Some(ref delay_str) = operation.retry_max_delay {
760 parse_duration(delay_str)?.as_millis() as u64
761 } else {
762 defaults.map_or(30_000, |d| d.max_delay_ms)
763 };
764
765 Ok(Some(RetryContext {
766 max_attempts,
767 initial_delay_ms,
768 max_delay_ms,
769 force_retry: operation.force_retry,
770 method: None, has_idempotency_key: false, }))
773}
774
775#[cfg(test)]
776mod tests {
777 use super::*;
778 use std::io::Write;
779 use tempfile::NamedTempFile;
780
781 #[tokio::test]
782 async fn test_parse_batch_file_json() {
783 let batch_content = r#"{
784 "metadata": {
785 "name": "Test batch",
786 "description": "A test batch file"
787 },
788 "operations": [
789 {
790 "id": "op1",
791 "args": ["users", "list"],
792 "description": "List all users"
793 },
794 {
795 "id": "op2",
796 "args": ["users", "get", "--user-id", "123"],
797 "description": "Get user 123"
798 }
799 ]
800 }"#;
801
802 let mut temp_file = NamedTempFile::new().unwrap();
803 temp_file.write_all(batch_content.as_bytes()).unwrap();
804
805 let batch_file = BatchProcessor::parse_batch_file(temp_file.path())
806 .await
807 .unwrap();
808
809 assert_eq!(batch_file.operations.len(), 2);
810 assert_eq!(batch_file.operations[0].args, vec!["users", "list"]);
811 assert_eq!(
812 batch_file.operations[1].args,
813 vec!["users", "get", "--user-id", "123"]
814 );
815 }
816
817 #[tokio::test]
818 async fn test_parse_batch_file_yaml() {
819 let batch_content = r#"
820metadata:
821 name: Test batch
822 description: A test batch file
823operations:
824 - id: op1
825 args: [users, list]
826 description: List all users
827 - id: op2
828 args: [users, get, --user-id, "123"]
829 description: Get user 123
830"#;
831
832 let mut temp_file = NamedTempFile::new().unwrap();
833 temp_file.write_all(batch_content.as_bytes()).unwrap();
834
835 let batch_file = BatchProcessor::parse_batch_file(temp_file.path())
836 .await
837 .unwrap();
838
839 assert_eq!(batch_file.operations.len(), 2);
840 assert_eq!(batch_file.operations[0].args, vec!["users", "list"]);
841 assert_eq!(
842 batch_file.operations[1].args,
843 vec!["users", "get", "--user-id", "123"]
844 );
845 }
846
847 #[test]
848 fn test_batch_config_default() {
849 let config = BatchConfig::default();
850 assert_eq!(config.max_concurrency, 5);
851 assert_eq!(config.rate_limit, None);
852 assert!(config.continue_on_error);
853 assert!(config.show_progress);
854 }
855
856 #[test]
857 fn test_batch_processor_creation() {
858 let config = BatchConfig {
859 max_concurrency: 10,
860 rate_limit: Some(5),
861 continue_on_error: false,
862 show_progress: false,
863 suppress_output: false,
864 };
865
866 let processor = BatchProcessor::new(config);
867 assert_eq!(processor.semaphore.available_permits(), 10);
868 assert!(processor.rate_limiter.is_some());
869 }
870}