1use crate::cache::models::CachedSpec;
2use crate::config::models::GlobalConfig;
3use crate::engine::generator;
4use crate::error::Error;
5use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
6use serde::{Deserialize, Serialize};
7use std::num::NonZeroU32;
8use std::path::Path;
9use std::sync::Arc;
10use tokio::sync::Semaphore;
11
/// Runtime settings controlling how a batch of operations is executed.
#[derive(Debug, Clone)]
pub struct BatchConfig {
    /// Maximum number of operations that may run concurrently
    /// (used to size the internal semaphore).
    pub max_concurrency: usize,
    /// Optional rate limit in operations per second; `None` disables
    /// rate limiting entirely.
    pub rate_limit: Option<u32>,
    /// Whether to keep executing remaining operations after a failure.
    /// NOTE(review): not visibly consulted anywhere in this module —
    /// confirm whether callers honor it or whether it is dead config.
    pub continue_on_error: bool,
    /// Print progress messages to stdout as operations start and finish.
    pub show_progress: bool,
    /// When set, each operation's output is captured and returned in the
    /// result instead of being printed by the executor.
    pub suppress_output: bool,
}
26
27impl Default for BatchConfig {
28 fn default() -> Self {
29 Self {
30 max_concurrency: 5,
31 rate_limit: None,
32 continue_on_error: true,
33 show_progress: true,
34 suppress_output: false,
35 }
36 }
37}
38
/// A single operation within a batch file: the CLI arguments to run plus
/// optional per-operation settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchOperation {
    /// Optional identifier; used in the success message when present.
    pub id: Option<String>,
    /// CLI arguments for this operation, re-parsed against the generated
    /// command tree at execution time.
    pub args: Vec<String>,
    /// Human-readable description of what this operation does.
    pub description: Option<String>,
    /// Extra HTTP headers for this operation (defaults to empty when absent).
    /// NOTE(review): deserialized but not visibly consumed in this module —
    /// confirm downstream use.
    #[serde(default)]
    pub headers: std::collections::HashMap<String, String>,
    /// Enables the response cache for this operation when `Some(true)`;
    /// `None` is treated as disabled.
    pub use_cache: Option<bool>,
}
54
/// Top-level schema of a batch file (parsed from JSON or YAML).
#[derive(Debug, Serialize, Deserialize)]
pub struct BatchFile {
    /// Optional descriptive metadata about the batch as a whole.
    pub metadata: Option<BatchMetadata>,
    /// The ordered list of operations to execute.
    pub operations: Vec<BatchOperation>,
}
63
/// Descriptive metadata attached to a batch file.
#[derive(Debug, Serialize, Deserialize)]
pub struct BatchMetadata {
    /// Display name of the batch.
    pub name: Option<String>,
    /// Version string of the batch definition.
    pub version: Option<String>,
    /// Free-form description of the batch.
    pub description: Option<String>,
    /// Default settings applied to operations.
    pub defaults: Option<BatchDefaults>,
}
76
/// Default settings intended to apply to every operation in a batch.
/// NOTE(review): deserialized but not visibly applied to operations anywhere
/// in this module — confirm whether merging defaults into operations is
/// implemented elsewhere or is missing.
#[derive(Debug, Serialize, Deserialize)]
pub struct BatchDefaults {
    /// Default HTTP headers (empty when absent).
    #[serde(default)]
    pub headers: std::collections::HashMap<String, String>,
    /// Default response-cache setting.
    pub use_cache: Option<bool>,
}
86
/// Outcome of one executed batch operation.
#[derive(Debug)]
pub struct BatchOperationResult {
    /// The operation that was executed (moved into the result).
    pub operation: BatchOperation,
    /// Whether the operation completed without error.
    pub success: bool,
    /// Error message when the operation failed.
    pub error: Option<String>,
    /// Captured or summary output when the operation succeeded.
    pub response: Option<String>,
    /// Wall-clock time the operation took (measured after rate limiting
    /// and semaphore acquisition).
    pub duration: std::time::Duration,
}
101
/// Aggregate outcome of a batch run.
#[derive(Debug)]
pub struct BatchResult {
    /// Per-operation results, in the original operation order.
    pub results: Vec<BatchOperationResult>,
    /// Wall-clock time for the entire batch.
    pub total_duration: std::time::Duration,
    /// Number of operations that succeeded.
    pub success_count: usize,
    /// Number of operations that failed (`results.len() - success_count`).
    pub failure_count: usize,
}
114
/// Executes batches of operations with bounded concurrency and an optional
/// per-second rate limit.
pub struct BatchProcessor {
    /// Settings this processor was constructed with.
    config: BatchConfig,
    /// Shared rate limiter; present only when `config.rate_limit` is set.
    rate_limiter: Option<Arc<DefaultDirectRateLimiter>>,
    /// Semaphore bounding concurrent operations to `config.max_concurrency`.
    semaphore: Arc<Semaphore>,
}
121
122impl BatchProcessor {
123 #[must_use]
129 pub fn new(config: BatchConfig) -> Self {
130 let rate_limiter = config.rate_limit.map(|limit| {
131 Arc::new(RateLimiter::direct(Quota::per_second(
132 NonZeroU32::new(limit).unwrap_or(NonZeroU32::new(1).unwrap()),
133 )))
134 });
135
136 let semaphore = Arc::new(Semaphore::new(config.max_concurrency));
137
138 Self {
139 config,
140 rate_limiter,
141 semaphore,
142 }
143 }
144
145 pub async fn parse_batch_file(path: &Path) -> Result<BatchFile, Error> {
154 let content = tokio::fs::read_to_string(path)
155 .await
156 .map_err(|e| Error::io_error(format!("Failed to read batch file: {e}")))?;
157
158 if let Ok(batch_file) = serde_json::from_str::<BatchFile>(&content) {
160 return Ok(batch_file);
161 }
162
163 if let Ok(batch_file) = serde_yaml::from_str::<BatchFile>(&content) {
164 return Ok(batch_file);
165 }
166
167 Err(Error::validation_error(format!(
168 "Failed to parse batch file as JSON or YAML: {}",
169 path.display()
170 )))
171 }
172
173 #[allow(clippy::too_many_arguments)]
186 pub async fn execute_batch(
187 &self,
188 spec: &CachedSpec,
189 batch_file: BatchFile,
190 global_config: Option<&GlobalConfig>,
191 base_url: Option<&str>,
192 dry_run: bool,
193 output_format: &crate::cli::OutputFormat,
194 jq_filter: Option<&str>,
195 ) -> Result<BatchResult, Error> {
196 let start_time = std::time::Instant::now();
197 let total_operations = batch_file.operations.len();
198
199 if self.config.show_progress {
200 println!("Starting batch execution: {total_operations} operations");
201 }
202
203 let mut results = Vec::with_capacity(total_operations);
204 let mut handles = Vec::new();
205
206 for (index, operation) in batch_file.operations.into_iter().enumerate() {
208 let spec = spec.clone();
209 let global_config = global_config.cloned();
210 let base_url = base_url.map(String::from);
211 let output_format = output_format.clone();
212 let jq_filter = jq_filter.map(String::from);
213 let semaphore = Arc::clone(&self.semaphore);
214 let rate_limiter = self.rate_limiter.clone();
215 let show_progress = self.config.show_progress;
216 let suppress_output = self.config.suppress_output;
217
218 let handle = tokio::spawn(async move {
219 let _permit = semaphore.acquire().await.unwrap();
221
222 if let Some(limiter) = rate_limiter {
224 limiter.until_ready().await;
225 }
226
227 let operation_start = std::time::Instant::now();
228
229 let result = Self::execute_single_operation(
231 &spec,
232 &operation,
233 global_config.as_ref(),
234 base_url.as_deref(),
235 dry_run,
236 &output_format,
237 jq_filter.as_deref(),
238 suppress_output,
239 )
240 .await;
241
242 let duration = operation_start.elapsed();
243
244 let (success, error, response) = match result {
245 Ok(resp) => {
246 if show_progress {
247 println!("Operation {} completed", index + 1);
248 }
249 (true, None, Some(resp))
250 }
251 Err(e) => {
252 if show_progress {
253 println!("Operation {} failed: {}", index + 1, e);
254 }
255 (false, Some(e.to_string()), None)
256 }
257 };
258
259 BatchOperationResult {
260 operation,
261 success,
262 error,
263 response,
264 duration,
265 }
266 });
267
268 handles.push(handle);
269 }
270
271 for handle in handles {
273 let result = handle
274 .await
275 .map_err(|e| Error::invalid_config(format!("Task failed: {e}")))?;
276 results.push(result);
277 }
278
279 let total_duration = start_time.elapsed();
280 let success_count = results.iter().filter(|r| r.success).count();
281 let failure_count = results.len() - success_count;
282
283 if self.config.show_progress {
284 println!(
285 "Batch execution completed: {}/{} operations successful in {:.2}s",
286 success_count,
287 total_operations,
288 total_duration.as_secs_f64()
289 );
290 }
291
292 Ok(BatchResult {
293 results,
294 total_duration,
295 success_count,
296 failure_count,
297 })
298 }
299
300 #[allow(clippy::too_many_arguments)]
302 async fn execute_single_operation(
303 spec: &CachedSpec,
304 operation: &BatchOperation,
305 global_config: Option<&GlobalConfig>,
306 base_url: Option<&str>,
307 dry_run: bool,
308 output_format: &crate::cli::OutputFormat,
309 jq_filter: Option<&str>,
310 suppress_output: bool,
311 ) -> Result<String, Error> {
312 let command = generator::generate_command_tree_with_flags(spec, false);
314
315 let matches = command
317 .try_get_matches_from(
318 std::iter::once(crate::constants::CLI_ROOT_COMMAND.to_string())
319 .chain(operation.args.clone()),
320 )
321 .map_err(|e| Error::invalid_command(crate::constants::CONTEXT_BATCH, e.to_string()))?;
322
323 let cache_config = if operation.use_cache.unwrap_or(false) {
325 Some(crate::response_cache::CacheConfig {
326 cache_dir: std::env::var(crate::constants::ENV_APERTURE_CONFIG_DIR)
327 .map_or_else(
328 |_| std::path::PathBuf::from("~/.config/aperture"),
329 std::path::PathBuf::from,
330 )
331 .join(crate::constants::DIR_CACHE)
332 .join(crate::constants::DIR_RESPONSES),
333 default_ttl: std::time::Duration::from_secs(300),
334 max_entries: 1000,
335 enabled: true,
336 })
337 } else {
338 None
339 };
340
341 if suppress_output {
342 let output = crate::engine::executor::execute_request(
344 spec,
345 &matches,
346 base_url,
347 dry_run,
348 None, global_config,
350 output_format,
351 jq_filter,
352 cache_config.as_ref(),
353 true, )
355 .await?;
356
357 Ok(output.unwrap_or_default())
359 } else {
360 if dry_run {
362 crate::engine::executor::execute_request(
364 spec,
365 &matches,
366 base_url,
367 true, None, global_config,
370 output_format,
371 jq_filter,
372 cache_config.as_ref(),
373 false, )
375 .await?;
376
377 Ok(format!(
379 "DRY RUN: Would execute operation with args: {:?}",
380 operation.args
381 ))
382 } else {
383 crate::engine::executor::execute_request(
386 spec,
387 &matches,
388 base_url,
389 false, None, global_config,
392 output_format,
393 jq_filter,
394 cache_config.as_ref(),
395 false, )
397 .await?;
398
399 Ok(format!(
401 "Successfully executed operation: {}",
402 operation
403 .id
404 .as_deref()
405 .unwrap_or(crate::constants::DEFAULT_OPERATION_NAME)
406 ))
407 }
408 }
409 }
410}
411
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// JSON batch files parse, including metadata and operation args.
    #[tokio::test]
    async fn test_parse_batch_file_json() {
        let batch_content = r#"{
        "metadata": {
            "name": "Test batch",
            "description": "A test batch file"
        },
        "operations": [
            {
                "id": "op1",
                "args": ["users", "list"],
                "description": "List all users"
            },
            {
                "id": "op2",
                "args": ["users", "get", "--user-id", "123"],
                "description": "Get user 123"
            }
        ]
    }"#;

        let mut temp_file = NamedTempFile::new().unwrap();
        temp_file.write_all(batch_content.as_bytes()).unwrap();

        let batch_file = BatchProcessor::parse_batch_file(temp_file.path())
            .await
            .unwrap();

        // Verify metadata survived the round trip, not just operations.
        let metadata = batch_file.metadata.as_ref().unwrap();
        assert_eq!(metadata.name.as_deref(), Some("Test batch"));

        assert_eq!(batch_file.operations.len(), 2);
        assert_eq!(batch_file.operations[0].args, vec!["users", "list"]);
        assert_eq!(
            batch_file.operations[1].args,
            vec!["users", "get", "--user-id", "123"]
        );
    }

    /// YAML batch files parse identically to the JSON equivalent.
    #[tokio::test]
    async fn test_parse_batch_file_yaml() {
        let batch_content = r#"
metadata:
  name: Test batch
  description: A test batch file
operations:
  - id: op1
    args: [users, list]
    description: List all users
  - id: op2
    args: [users, get, --user-id, "123"]
    description: Get user 123
"#;

        let mut temp_file = NamedTempFile::new().unwrap();
        temp_file.write_all(batch_content.as_bytes()).unwrap();

        let batch_file = BatchProcessor::parse_batch_file(temp_file.path())
            .await
            .unwrap();

        let metadata = batch_file.metadata.as_ref().unwrap();
        assert_eq!(metadata.name.as_deref(), Some("Test batch"));

        assert_eq!(batch_file.operations.len(), 2);
        assert_eq!(batch_file.operations[0].args, vec!["users", "list"]);
        assert_eq!(
            batch_file.operations[1].args,
            vec!["users", "get", "--user-id", "123"]
        );
    }

    /// Every field of the default config is pinned (previously
    /// `suppress_output` was not covered).
    #[test]
    fn test_batch_config_default() {
        let config = BatchConfig::default();
        assert_eq!(config.max_concurrency, 5);
        assert_eq!(config.rate_limit, None);
        assert!(config.continue_on_error);
        assert!(config.show_progress);
        assert!(!config.suppress_output);
    }

    /// Construction wires the config into the semaphore and rate limiter.
    #[test]
    fn test_batch_processor_creation() {
        let config = BatchConfig {
            max_concurrency: 10,
            rate_limit: Some(5),
            continue_on_error: false,
            show_progress: false,
            suppress_output: false,
        };

        let processor = BatchProcessor::new(config);
        assert_eq!(processor.semaphore.available_permits(), 10);
        assert!(processor.rate_limiter.is_some());
    }
}