1use crate::cache::models::CachedSpec;
2use crate::config::models::GlobalConfig;
3use crate::error::Error;
4use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
5use serde::{Deserialize, Serialize};
6use std::num::NonZeroU32;
7use std::path::Path;
8use std::sync::Arc;
9use tokio::sync::Semaphore;
10
11#[derive(Debug, Clone)]
13pub struct BatchConfig {
14 pub max_concurrency: usize,
16 pub rate_limit: Option<u32>,
18 pub continue_on_error: bool,
20 pub show_progress: bool,
22 pub suppress_output: bool,
24}
25
26impl Default for BatchConfig {
27 fn default() -> Self {
28 Self {
29 max_concurrency: 5,
30 rate_limit: None,
31 continue_on_error: true,
32 show_progress: true,
33 suppress_output: false,
34 }
35 }
36}
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct BatchOperation {
41 pub id: Option<String>,
43 pub args: Vec<String>,
45 pub description: Option<String>,
47 #[serde(default)]
49 pub headers: std::collections::HashMap<String, String>,
50 pub use_cache: Option<bool>,
52}
53
54#[derive(Debug, Serialize, Deserialize)]
56pub struct BatchFile {
57 pub metadata: Option<BatchMetadata>,
59 pub operations: Vec<BatchOperation>,
61}
62
63#[derive(Debug, Serialize, Deserialize)]
65pub struct BatchMetadata {
66 pub name: Option<String>,
68 pub version: Option<String>,
70 pub description: Option<String>,
72 pub defaults: Option<BatchDefaults>,
74}
75
76#[derive(Debug, Serialize, Deserialize)]
78pub struct BatchDefaults {
79 #[serde(default)]
81 pub headers: std::collections::HashMap<String, String>,
82 pub use_cache: Option<bool>,
84}
85
86#[derive(Debug)]
88pub struct BatchOperationResult {
89 pub operation: BatchOperation,
91 pub success: bool,
93 pub error: Option<String>,
95 pub response: Option<String>,
97 pub duration: std::time::Duration,
99}
100
101#[derive(Debug)]
103pub struct BatchResult {
104 pub results: Vec<BatchOperationResult>,
106 pub total_duration: std::time::Duration,
108 pub success_count: usize,
110 pub failure_count: usize,
112}
113
114pub struct BatchProcessor {
116 config: BatchConfig,
117 rate_limiter: Option<Arc<DefaultDirectRateLimiter>>,
118 semaphore: Arc<Semaphore>,
119}
120
121impl BatchProcessor {
122 #[must_use]
128 pub fn new(config: BatchConfig) -> Self {
129 let rate_limiter = config.rate_limit.map(|limit| {
130 Arc::new(RateLimiter::direct(Quota::per_second(
131 NonZeroU32::new(limit).unwrap_or(NonZeroU32::new(1).unwrap()),
132 )))
133 });
134
135 let semaphore = Arc::new(Semaphore::new(config.max_concurrency));
136
137 Self {
138 config,
139 rate_limiter,
140 semaphore,
141 }
142 }
143
144 pub async fn parse_batch_file(path: &Path) -> Result<BatchFile, Error> {
153 let content = tokio::fs::read_to_string(path).await.map_err(Error::Io)?;
154
155 if let Ok(batch_file) = serde_json::from_str::<BatchFile>(&content) {
157 return Ok(batch_file);
158 }
159
160 if let Ok(batch_file) = serde_yaml::from_str::<BatchFile>(&content) {
161 return Ok(batch_file);
162 }
163
164 Err(Error::Validation(format!(
165 "Failed to parse batch file as JSON or YAML: {}",
166 path.display()
167 )))
168 }
169
170 #[allow(clippy::too_many_arguments)]
183 pub async fn execute_batch(
184 &self,
185 spec: &CachedSpec,
186 batch_file: BatchFile,
187 global_config: Option<&GlobalConfig>,
188 base_url: Option<&str>,
189 dry_run: bool,
190 output_format: &crate::cli::OutputFormat,
191 jq_filter: Option<&str>,
192 ) -> Result<BatchResult, Error> {
193 let start_time = std::time::Instant::now();
194 let total_operations = batch_file.operations.len();
195
196 if self.config.show_progress {
197 println!("Starting batch execution: {total_operations} operations");
198 }
199
200 let mut results = Vec::with_capacity(total_operations);
201 let mut handles = Vec::new();
202
203 for (index, operation) in batch_file.operations.into_iter().enumerate() {
205 let spec = spec.clone();
206 let global_config = global_config.cloned();
207 let base_url = base_url.map(String::from);
208 let output_format = output_format.clone();
209 let jq_filter = jq_filter.map(String::from);
210 let semaphore = Arc::clone(&self.semaphore);
211 let rate_limiter = self.rate_limiter.clone();
212 let show_progress = self.config.show_progress;
213 let suppress_output = self.config.suppress_output;
214
215 let handle = tokio::spawn(async move {
216 let _permit = semaphore.acquire().await.unwrap();
218
219 if let Some(limiter) = rate_limiter {
221 limiter.until_ready().await;
222 }
223
224 let operation_start = std::time::Instant::now();
225
226 let result = Self::execute_single_operation(
228 &spec,
229 &operation,
230 global_config.as_ref(),
231 base_url.as_deref(),
232 dry_run,
233 &output_format,
234 jq_filter.as_deref(),
235 suppress_output,
236 )
237 .await;
238
239 let duration = operation_start.elapsed();
240
241 let (success, error, response) = match result {
242 Ok(resp) => {
243 if show_progress {
244 println!("Operation {} completed", index + 1);
245 }
246 (true, None, Some(resp))
247 }
248 Err(e) => {
249 if show_progress {
250 println!("Operation {} failed: {}", index + 1, e);
251 }
252 (false, Some(e.to_string()), None)
253 }
254 };
255
256 BatchOperationResult {
257 operation,
258 success,
259 error,
260 response,
261 duration,
262 }
263 });
264
265 handles.push(handle);
266 }
267
268 for handle in handles {
270 let result = handle
271 .await
272 .map_err(|e| Error::Config(format!("Task failed: {e}")))?;
273 results.push(result);
274 }
275
276 let total_duration = start_time.elapsed();
277 let success_count = results.iter().filter(|r| r.success).count();
278 let failure_count = results.len() - success_count;
279
280 if self.config.show_progress {
281 println!(
282 "Batch execution completed: {}/{} operations successful in {:.2}s",
283 success_count,
284 total_operations,
285 total_duration.as_secs_f64()
286 );
287 }
288
289 Ok(BatchResult {
290 results,
291 total_duration,
292 success_count,
293 failure_count,
294 })
295 }
296
297 #[allow(clippy::too_many_arguments)]
299 async fn execute_single_operation(
300 spec: &CachedSpec,
301 operation: &BatchOperation,
302 global_config: Option<&GlobalConfig>,
303 base_url: Option<&str>,
304 dry_run: bool,
305 output_format: &crate::cli::OutputFormat,
306 jq_filter: Option<&str>,
307 suppress_output: bool,
308 ) -> Result<String, Error> {
309 use crate::engine::generator;
310
311 let command = generator::generate_command_tree_with_flags(spec, false);
313
314 let matches = command
316 .try_get_matches_from(std::iter::once("api".to_string()).chain(operation.args.clone()))
317 .map_err(|e| Error::InvalidCommand {
318 context: "batch".to_string(),
319 reason: e.to_string(),
320 })?;
321
322 let cache_config = if operation.use_cache.unwrap_or(false) {
324 Some(crate::response_cache::CacheConfig {
325 cache_dir: std::env::var("APERTURE_CONFIG_DIR")
326 .map_or_else(
327 |_| std::path::PathBuf::from("~/.config/aperture"),
328 std::path::PathBuf::from,
329 )
330 .join(".cache")
331 .join("responses"),
332 default_ttl: std::time::Duration::from_secs(300),
333 max_entries: 1000,
334 enabled: true,
335 })
336 } else {
337 None
338 };
339
340 if suppress_output {
341 let output = crate::engine::executor::execute_request(
343 spec,
344 &matches,
345 base_url,
346 dry_run,
347 None, global_config,
349 output_format,
350 jq_filter,
351 cache_config.as_ref(),
352 true, )
354 .await?;
355
356 Ok(output.unwrap_or_default())
358 } else {
359 if dry_run {
361 crate::engine::executor::execute_request(
363 spec,
364 &matches,
365 base_url,
366 true, None, global_config,
369 output_format,
370 jq_filter,
371 cache_config.as_ref(),
372 false, )
374 .await?;
375
376 Ok(format!(
378 "DRY RUN: Would execute operation with args: {:?}",
379 operation.args
380 ))
381 } else {
382 crate::engine::executor::execute_request(
385 spec,
386 &matches,
387 base_url,
388 false, None, global_config,
391 output_format,
392 jq_filter,
393 cache_config.as_ref(),
394 false, )
396 .await?;
397
398 Ok(format!(
400 "Successfully executed operation: {}",
401 operation.id.as_deref().unwrap_or("unnamed")
402 ))
403 }
404 }
405 }
406}
407
408#[cfg(test)]
409mod tests {
410 use super::*;
411 use std::io::Write;
412 use tempfile::NamedTempFile;
413
414 #[tokio::test]
415 async fn test_parse_batch_file_json() {
416 let batch_content = r#"{
417 "metadata": {
418 "name": "Test batch",
419 "description": "A test batch file"
420 },
421 "operations": [
422 {
423 "id": "op1",
424 "args": ["users", "list"],
425 "description": "List all users"
426 },
427 {
428 "id": "op2",
429 "args": ["users", "get", "--user-id", "123"],
430 "description": "Get user 123"
431 }
432 ]
433 }"#;
434
435 let mut temp_file = NamedTempFile::new().unwrap();
436 temp_file.write_all(batch_content.as_bytes()).unwrap();
437
438 let batch_file = BatchProcessor::parse_batch_file(temp_file.path())
439 .await
440 .unwrap();
441
442 assert_eq!(batch_file.operations.len(), 2);
443 assert_eq!(batch_file.operations[0].args, vec!["users", "list"]);
444 assert_eq!(
445 batch_file.operations[1].args,
446 vec!["users", "get", "--user-id", "123"]
447 );
448 }
449
450 #[tokio::test]
451 async fn test_parse_batch_file_yaml() {
452 let batch_content = r#"
453metadata:
454 name: Test batch
455 description: A test batch file
456operations:
457 - id: op1
458 args: [users, list]
459 description: List all users
460 - id: op2
461 args: [users, get, --user-id, "123"]
462 description: Get user 123
463"#;
464
465 let mut temp_file = NamedTempFile::new().unwrap();
466 temp_file.write_all(batch_content.as_bytes()).unwrap();
467
468 let batch_file = BatchProcessor::parse_batch_file(temp_file.path())
469 .await
470 .unwrap();
471
472 assert_eq!(batch_file.operations.len(), 2);
473 assert_eq!(batch_file.operations[0].args, vec!["users", "list"]);
474 assert_eq!(
475 batch_file.operations[1].args,
476 vec!["users", "get", "--user-id", "123"]
477 );
478 }
479
480 #[test]
481 fn test_batch_config_default() {
482 let config = BatchConfig::default();
483 assert_eq!(config.max_concurrency, 5);
484 assert_eq!(config.rate_limit, None);
485 assert!(config.continue_on_error);
486 assert!(config.show_progress);
487 }
488
489 #[test]
490 fn test_batch_processor_creation() {
491 let config = BatchConfig {
492 max_concurrency: 10,
493 rate_limit: Some(5),
494 continue_on_error: false,
495 show_progress: false,
496 suppress_output: false,
497 };
498
499 let processor = BatchProcessor::new(config);
500 assert_eq!(processor.semaphore.available_permits(), 10);
501 assert!(processor.rate_limiter.is_some());
502 }
503}