1use crate::cache::models::CachedSpec;
2use crate::config::models::GlobalConfig;
3use crate::error::Error;
4use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
5use serde::{Deserialize, Serialize};
6use std::num::NonZeroU32;
7use std::path::Path;
8use std::sync::Arc;
9use tokio::sync::Semaphore;
10
/// Tunables controlling how a batch of operations is executed.
#[derive(Debug, Clone)]
pub struct BatchConfig {
    /// Maximum number of operations allowed to run concurrently.
    pub max_concurrency: usize,
    /// Optional cap on operations started per second; `None` disables rate limiting.
    pub rate_limit: Option<u32>,
    /// Keep executing remaining operations after one fails.
    /// NOTE(review): not consulted by the visible `execute_batch` path — confirm intent.
    pub continue_on_error: bool,
    /// Print progress messages to stdout during execution.
    pub show_progress: bool,
    /// Capture per-operation responses instead of printing them.
    pub suppress_output: bool,
}
25
26impl Default for BatchConfig {
27 fn default() -> Self {
28 Self {
29 max_concurrency: 5,
30 rate_limit: None,
31 continue_on_error: true,
32 show_progress: true,
33 suppress_output: false,
34 }
35 }
36}
37
/// A single CLI invocation within a batch file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchOperation {
    /// Optional caller-supplied identifier, used in result reporting.
    pub id: Option<String>,
    /// CLI arguments for the operation, as they would follow the root command.
    pub args: Vec<String>,
    /// Human-readable description of what the operation does.
    pub description: Option<String>,
    /// Extra HTTP headers for this operation; empty when omitted.
    /// NOTE(review): not referenced by the visible executor path — confirm usage.
    #[serde(default)]
    pub headers: std::collections::HashMap<String, String>,
    /// Whether to use the response cache for this operation (default: no).
    pub use_cache: Option<bool>,
}
53
/// Parsed contents of a batch file (accepted as either JSON or YAML).
#[derive(Debug, Serialize, Deserialize)]
pub struct BatchFile {
    /// Optional descriptive metadata about the batch.
    pub metadata: Option<BatchMetadata>,
    /// The operations to execute, in file order.
    pub operations: Vec<BatchOperation>,
}
62
/// Descriptive information attached to a batch file.
#[derive(Debug, Serialize, Deserialize)]
pub struct BatchMetadata {
    /// Display name of the batch.
    pub name: Option<String>,
    /// Version string for the batch file.
    pub version: Option<String>,
    /// Free-form description of the batch.
    pub description: Option<String>,
    /// Defaults intended to apply to all operations.
    /// NOTE(review): not merged into operations anywhere in this file — confirm
    /// where (or whether) these defaults are applied.
    pub defaults: Option<BatchDefaults>,
}
75
/// Default values a batch file can declare for all of its operations.
#[derive(Debug, Serialize, Deserialize)]
pub struct BatchDefaults {
    /// Headers to apply to every operation; empty when omitted.
    #[serde(default)]
    pub headers: std::collections::HashMap<String, String>,
    /// Default cache preference for operations that do not set `use_cache`.
    pub use_cache: Option<bool>,
}
85
/// Outcome of one executed batch operation.
#[derive(Debug)]
pub struct BatchOperationResult {
    /// The operation that was executed.
    pub operation: BatchOperation,
    /// Whether the operation completed without error.
    pub success: bool,
    /// Error message when `success` is `false`.
    pub error: Option<String>,
    /// Captured response or status text when `success` is `true`.
    pub response: Option<String>,
    /// Wall-clock time spent executing this operation.
    pub duration: std::time::Duration,
}
100
/// Aggregate outcome of a whole batch run.
#[derive(Debug)]
pub struct BatchResult {
    /// Per-operation results, in the same order as the input operations.
    pub results: Vec<BatchOperationResult>,
    /// Wall-clock time for the entire batch.
    pub total_duration: std::time::Duration,
    /// Number of operations that succeeded.
    pub success_count: usize,
    /// Number of operations that failed.
    pub failure_count: usize,
}
113
/// Executes batches of CLI operations with bounded concurrency and an
/// optional per-second rate limit.
pub struct BatchProcessor {
    /// Behavioral settings for this run (concurrency, progress, output).
    config: BatchConfig,
    /// Shared per-second limiter; `None` when no rate limit is configured.
    rate_limiter: Option<Arc<DefaultDirectRateLimiter>>,
    /// Bounds the number of operations executing at once.
    semaphore: Arc<Semaphore>,
}
120
121impl BatchProcessor {
122 #[must_use]
128 pub fn new(config: BatchConfig) -> Self {
129 let rate_limiter = config.rate_limit.map(|limit| {
130 Arc::new(RateLimiter::direct(Quota::per_second(
131 NonZeroU32::new(limit).unwrap_or(NonZeroU32::new(1).unwrap()),
132 )))
133 });
134
135 let semaphore = Arc::new(Semaphore::new(config.max_concurrency));
136
137 Self {
138 config,
139 rate_limiter,
140 semaphore,
141 }
142 }
143
144 pub async fn parse_batch_file(path: &Path) -> Result<BatchFile, Error> {
153 let content = tokio::fs::read_to_string(path)
154 .await
155 .map_err(|e| Error::io_error(format!("Failed to read batch file: {e}")))?;
156
157 if let Ok(batch_file) = serde_json::from_str::<BatchFile>(&content) {
159 return Ok(batch_file);
160 }
161
162 if let Ok(batch_file) = serde_yaml::from_str::<BatchFile>(&content) {
163 return Ok(batch_file);
164 }
165
166 Err(Error::validation_error(format!(
167 "Failed to parse batch file as JSON or YAML: {}",
168 path.display()
169 )))
170 }
171
172 #[allow(clippy::too_many_arguments)]
185 pub async fn execute_batch(
186 &self,
187 spec: &CachedSpec,
188 batch_file: BatchFile,
189 global_config: Option<&GlobalConfig>,
190 base_url: Option<&str>,
191 dry_run: bool,
192 output_format: &crate::cli::OutputFormat,
193 jq_filter: Option<&str>,
194 ) -> Result<BatchResult, Error> {
195 let start_time = std::time::Instant::now();
196 let total_operations = batch_file.operations.len();
197
198 if self.config.show_progress {
199 println!("Starting batch execution: {total_operations} operations");
200 }
201
202 let mut results = Vec::with_capacity(total_operations);
203 let mut handles = Vec::new();
204
205 for (index, operation) in batch_file.operations.into_iter().enumerate() {
207 let spec = spec.clone();
208 let global_config = global_config.cloned();
209 let base_url = base_url.map(String::from);
210 let output_format = output_format.clone();
211 let jq_filter = jq_filter.map(String::from);
212 let semaphore = Arc::clone(&self.semaphore);
213 let rate_limiter = self.rate_limiter.clone();
214 let show_progress = self.config.show_progress;
215 let suppress_output = self.config.suppress_output;
216
217 let handle = tokio::spawn(async move {
218 let _permit = semaphore.acquire().await.unwrap();
220
221 if let Some(limiter) = rate_limiter {
223 limiter.until_ready().await;
224 }
225
226 let operation_start = std::time::Instant::now();
227
228 let result = Self::execute_single_operation(
230 &spec,
231 &operation,
232 global_config.as_ref(),
233 base_url.as_deref(),
234 dry_run,
235 &output_format,
236 jq_filter.as_deref(),
237 suppress_output,
238 )
239 .await;
240
241 let duration = operation_start.elapsed();
242
243 let (success, error, response) = match result {
244 Ok(resp) => {
245 if show_progress {
246 println!("Operation {} completed", index + 1);
247 }
248 (true, None, Some(resp))
249 }
250 Err(e) => {
251 if show_progress {
252 println!("Operation {} failed: {}", index + 1, e);
253 }
254 (false, Some(e.to_string()), None)
255 }
256 };
257
258 BatchOperationResult {
259 operation,
260 success,
261 error,
262 response,
263 duration,
264 }
265 });
266
267 handles.push(handle);
268 }
269
270 for handle in handles {
272 let result = handle
273 .await
274 .map_err(|e| Error::invalid_config(format!("Task failed: {e}")))?;
275 results.push(result);
276 }
277
278 let total_duration = start_time.elapsed();
279 let success_count = results.iter().filter(|r| r.success).count();
280 let failure_count = results.len() - success_count;
281
282 if self.config.show_progress {
283 println!(
284 "Batch execution completed: {}/{} operations successful in {:.2}s",
285 success_count,
286 total_operations,
287 total_duration.as_secs_f64()
288 );
289 }
290
291 Ok(BatchResult {
292 results,
293 total_duration,
294 success_count,
295 failure_count,
296 })
297 }
298
299 #[allow(clippy::too_many_arguments)]
301 async fn execute_single_operation(
302 spec: &CachedSpec,
303 operation: &BatchOperation,
304 global_config: Option<&GlobalConfig>,
305 base_url: Option<&str>,
306 dry_run: bool,
307 output_format: &crate::cli::OutputFormat,
308 jq_filter: Option<&str>,
309 suppress_output: bool,
310 ) -> Result<String, Error> {
311 use crate::engine::generator;
312
313 let command = generator::generate_command_tree_with_flags(spec, false);
315
316 let matches = command
318 .try_get_matches_from(
319 std::iter::once(crate::constants::CLI_ROOT_COMMAND.to_string())
320 .chain(operation.args.clone()),
321 )
322 .map_err(|e| Error::invalid_command(crate::constants::CONTEXT_BATCH, e.to_string()))?;
323
324 let cache_config = if operation.use_cache.unwrap_or(false) {
326 Some(crate::response_cache::CacheConfig {
327 cache_dir: std::env::var(crate::constants::ENV_APERTURE_CONFIG_DIR)
328 .map_or_else(
329 |_| std::path::PathBuf::from("~/.config/aperture"),
330 std::path::PathBuf::from,
331 )
332 .join(crate::constants::DIR_CACHE)
333 .join(crate::constants::DIR_RESPONSES),
334 default_ttl: std::time::Duration::from_secs(300),
335 max_entries: 1000,
336 enabled: true,
337 })
338 } else {
339 None
340 };
341
342 if suppress_output {
343 let output = crate::engine::executor::execute_request(
345 spec,
346 &matches,
347 base_url,
348 dry_run,
349 None, global_config,
351 output_format,
352 jq_filter,
353 cache_config.as_ref(),
354 true, )
356 .await?;
357
358 Ok(output.unwrap_or_default())
360 } else {
361 if dry_run {
363 crate::engine::executor::execute_request(
365 spec,
366 &matches,
367 base_url,
368 true, None, global_config,
371 output_format,
372 jq_filter,
373 cache_config.as_ref(),
374 false, )
376 .await?;
377
378 Ok(format!(
380 "DRY RUN: Would execute operation with args: {:?}",
381 operation.args
382 ))
383 } else {
384 crate::engine::executor::execute_request(
387 spec,
388 &matches,
389 base_url,
390 false, None, global_config,
393 output_format,
394 jq_filter,
395 cache_config.as_ref(),
396 false, )
398 .await?;
399
400 Ok(format!(
402 "Successfully executed operation: {}",
403 operation
404 .id
405 .as_deref()
406 .unwrap_or(crate::constants::DEFAULT_OPERATION_NAME)
407 ))
408 }
409 }
410 }
411}
412
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// A JSON batch file round-trips through `parse_batch_file`, including metadata.
    #[tokio::test]
    async fn test_parse_batch_file_json() {
        let batch_content = r#"{
            "metadata": {
                "name": "Test batch",
                "description": "A test batch file"
            },
            "operations": [
                {
                    "id": "op1",
                    "args": ["users", "list"],
                    "description": "List all users"
                },
                {
                    "id": "op2",
                    "args": ["users", "get", "--user-id", "123"],
                    "description": "Get user 123"
                }
            ]
        }"#;

        let mut temp_file = NamedTempFile::new().unwrap();
        temp_file.write_all(batch_content.as_bytes()).unwrap();

        let batch_file = BatchProcessor::parse_batch_file(temp_file.path())
            .await
            .unwrap();

        // Metadata should survive parsing, not just the operations list.
        assert_eq!(
            batch_file.metadata.as_ref().and_then(|m| m.name.as_deref()),
            Some("Test batch")
        );
        assert_eq!(batch_file.operations.len(), 2);
        assert_eq!(batch_file.operations[0].args, vec!["users", "list"]);
        assert_eq!(
            batch_file.operations[1].args,
            vec!["users", "get", "--user-id", "123"]
        );
    }

    /// A YAML batch file parses via the fallback path.
    #[tokio::test]
    async fn test_parse_batch_file_yaml() {
        let batch_content = r#"
metadata:
  name: Test batch
  description: A test batch file
operations:
  - id: op1
    args: [users, list]
    description: List all users
  - id: op2
    args: [users, get, --user-id, "123"]
    description: Get user 123
"#;

        let mut temp_file = NamedTempFile::new().unwrap();
        temp_file.write_all(batch_content.as_bytes()).unwrap();

        let batch_file = BatchProcessor::parse_batch_file(temp_file.path())
            .await
            .unwrap();

        assert_eq!(
            batch_file.metadata.as_ref().and_then(|m| m.name.as_deref()),
            Some("Test batch")
        );
        assert_eq!(batch_file.operations.len(), 2);
        assert_eq!(batch_file.operations[0].args, vec!["users", "list"]);
        assert_eq!(
            batch_file.operations[1].args,
            vec!["users", "get", "--user-id", "123"]
        );
    }

    /// Every field of the default configuration is pinned.
    #[test]
    fn test_batch_config_default() {
        let config = BatchConfig::default();
        assert_eq!(config.max_concurrency, 5);
        assert_eq!(config.rate_limit, None);
        assert!(config.continue_on_error);
        assert!(config.show_progress);
        // Previously unasserted field of the default configuration.
        assert!(!config.suppress_output);
    }

    /// Construction wires the semaphore permits and rate limiter from config.
    #[test]
    fn test_batch_processor_creation() {
        let config = BatchConfig {
            max_concurrency: 10,
            rate_limit: Some(5),
            continue_on_error: false,
            show_progress: false,
            suppress_output: false,
        };

        let processor = BatchProcessor::new(config);
        assert_eq!(processor.semaphore.available_permits(), 10);
        assert!(processor.rate_limiter.is_some());
    }
}