1use crate::cache::models::CachedSpec;
2use crate::config::models::GlobalConfig;
3use crate::error::Error;
4use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
5use serde::{Deserialize, Serialize};
6use std::num::NonZeroU32;
7use std::path::Path;
8use std::sync::Arc;
9use tokio::sync::Semaphore;
10
/// Tuning knobs for a batch run: concurrency, rate limiting, error policy,
/// and progress output.
#[derive(Debug, Clone)]
pub struct BatchConfig {
    /// Maximum number of operations allowed to run concurrently.
    pub max_concurrency: usize,
    /// Optional cap on operations started per second; `None` means unlimited.
    pub rate_limit: Option<u32>,
    /// Whether remaining operations keep running after one fails.
    pub continue_on_error: bool,
    /// Whether per-operation progress lines are printed to stdout.
    pub show_progress: bool,
}
23
24impl Default for BatchConfig {
25 fn default() -> Self {
26 Self {
27 max_concurrency: 5,
28 rate_limit: None,
29 continue_on_error: true,
30 show_progress: true,
31 }
32 }
33}
34
/// A single CLI invocation described in a batch file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchOperation {
    /// Optional identifier used in result summaries.
    pub id: Option<String>,
    /// CLI arguments for the operation (without the binary name).
    pub args: Vec<String>,
    /// Human-readable description of what the operation does.
    pub description: Option<String>,
    /// Extra HTTP headers for this operation.
    /// NOTE(review): parsed but not visibly applied in this module's
    /// execution path — confirm a downstream layer consumes it.
    #[serde(default)]
    pub headers: std::collections::HashMap<String, String>,
    /// Per-operation opt-in to response caching; `None` means "not specified".
    pub use_cache: Option<bool>,
}
50
/// Parsed contents of a batch file (accepted in JSON or YAML form).
#[derive(Debug, Serialize, Deserialize)]
pub struct BatchFile {
    /// Optional metadata describing the batch as a whole.
    pub metadata: Option<BatchMetadata>,
    /// The operations to execute, in file order.
    pub operations: Vec<BatchOperation>,
}
59
/// Descriptive metadata for a batch file; every field is optional.
#[derive(Debug, Serialize, Deserialize)]
pub struct BatchMetadata {
    /// Display name of the batch.
    pub name: Option<String>,
    /// Version string of the batch definition.
    pub version: Option<String>,
    /// Free-form description of the batch.
    pub description: Option<String>,
    /// Default settings applied to operations in this batch.
    pub defaults: Option<BatchDefaults>,
}
72
/// Batch-wide defaults that individual operations may override.
/// NOTE(review): these defaults are deserialized but not merged into
/// operations anywhere visible in this file — confirm where they are applied.
#[derive(Debug, Serialize, Deserialize)]
pub struct BatchDefaults {
    /// Default HTTP headers for all operations in the batch.
    #[serde(default)]
    pub headers: std::collections::HashMap<String, String>,
    /// Default response-caching behavior for all operations.
    pub use_cache: Option<bool>,
}
82
/// Outcome of a single batch operation, paired with the operation itself.
#[derive(Debug)]
pub struct BatchOperationResult {
    /// The operation that was executed.
    pub operation: BatchOperation,
    /// `true` if the operation completed without error.
    pub success: bool,
    /// Error text when the operation failed.
    pub error: Option<String>,
    /// Summary/response text when the operation succeeded.
    pub response: Option<String>,
    /// Wall-clock time of the operation itself (measured after any
    /// concurrency-permit and rate-limit waits).
    pub duration: std::time::Duration,
}
97
/// Aggregate outcome of an entire batch run.
#[derive(Debug)]
pub struct BatchResult {
    /// Per-operation results, collected in operation order.
    pub results: Vec<BatchOperationResult>,
    /// Wall-clock duration of the whole batch run.
    pub total_duration: std::time::Duration,
    /// Number of operations that succeeded.
    pub success_count: usize,
    /// Number of operations that failed.
    pub failure_count: usize,
}
110
/// Executes the operations of a batch file with bounded concurrency and
/// optional rate limiting.
pub struct BatchProcessor {
    config: BatchConfig,
    /// Shared limiter awaited before each operation; `None` when unlimited.
    rate_limiter: Option<Arc<DefaultDirectRateLimiter>>,
    /// Caps in-flight operations at `config.max_concurrency`.
    semaphore: Arc<Semaphore>,
}
117
118impl BatchProcessor {
119 #[must_use]
125 pub fn new(config: BatchConfig) -> Self {
126 let rate_limiter = config.rate_limit.map(|limit| {
127 Arc::new(RateLimiter::direct(Quota::per_second(
128 NonZeroU32::new(limit).unwrap_or(NonZeroU32::new(1).unwrap()),
129 )))
130 });
131
132 let semaphore = Arc::new(Semaphore::new(config.max_concurrency));
133
134 Self {
135 config,
136 rate_limiter,
137 semaphore,
138 }
139 }
140
141 pub async fn parse_batch_file(path: &Path) -> Result<BatchFile, Error> {
150 let content = tokio::fs::read_to_string(path).await.map_err(Error::Io)?;
151
152 if let Ok(batch_file) = serde_json::from_str::<BatchFile>(&content) {
154 return Ok(batch_file);
155 }
156
157 if let Ok(batch_file) = serde_yaml::from_str::<BatchFile>(&content) {
158 return Ok(batch_file);
159 }
160
161 Err(Error::Validation(format!(
162 "Failed to parse batch file as JSON or YAML: {}",
163 path.display()
164 )))
165 }
166
167 #[allow(clippy::too_many_arguments)]
180 pub async fn execute_batch(
181 &self,
182 spec: &CachedSpec,
183 batch_file: BatchFile,
184 global_config: Option<&GlobalConfig>,
185 base_url: Option<&str>,
186 dry_run: bool,
187 output_format: &crate::cli::OutputFormat,
188 jq_filter: Option<&str>,
189 ) -> Result<BatchResult, Error> {
190 let start_time = std::time::Instant::now();
191 let total_operations = batch_file.operations.len();
192
193 if self.config.show_progress {
194 println!("Starting batch execution: {total_operations} operations");
195 }
196
197 let mut results = Vec::with_capacity(total_operations);
198 let mut handles = Vec::new();
199
200 for (index, operation) in batch_file.operations.into_iter().enumerate() {
202 let spec = spec.clone();
203 let global_config = global_config.cloned();
204 let base_url = base_url.map(String::from);
205 let output_format = output_format.clone();
206 let jq_filter = jq_filter.map(String::from);
207 let semaphore = Arc::clone(&self.semaphore);
208 let rate_limiter = self.rate_limiter.clone();
209 let show_progress = self.config.show_progress;
210
211 let handle = tokio::spawn(async move {
212 let _permit = semaphore.acquire().await.unwrap();
214
215 if let Some(limiter) = rate_limiter {
217 limiter.until_ready().await;
218 }
219
220 let operation_start = std::time::Instant::now();
221
222 let result = Self::execute_single_operation(
224 &spec,
225 &operation,
226 global_config.as_ref(),
227 base_url.as_deref(),
228 dry_run,
229 &output_format,
230 jq_filter.as_deref(),
231 )
232 .await;
233
234 let duration = operation_start.elapsed();
235
236 let (success, error, response) = match result {
237 Ok(resp) => {
238 if show_progress {
239 println!("✓ Operation {} completed", index + 1);
240 }
241 (true, None, Some(resp))
242 }
243 Err(e) => {
244 if show_progress {
245 println!("✗ Operation {} failed: {}", index + 1, e);
246 }
247 (false, Some(e.to_string()), None)
248 }
249 };
250
251 BatchOperationResult {
252 operation,
253 success,
254 error,
255 response,
256 duration,
257 }
258 });
259
260 handles.push(handle);
261 }
262
263 for handle in handles {
265 let result = handle
266 .await
267 .map_err(|e| Error::Config(format!("Task failed: {e}")))?;
268 results.push(result);
269 }
270
271 let total_duration = start_time.elapsed();
272 let success_count = results.iter().filter(|r| r.success).count();
273 let failure_count = results.len() - success_count;
274
275 if self.config.show_progress {
276 println!(
277 "Batch execution completed: {}/{} operations successful in {:.2}s",
278 success_count,
279 total_operations,
280 total_duration.as_secs_f64()
281 );
282 }
283
284 Ok(BatchResult {
285 results,
286 total_duration,
287 success_count,
288 failure_count,
289 })
290 }
291
292 async fn execute_single_operation(
294 spec: &CachedSpec,
295 operation: &BatchOperation,
296 global_config: Option<&GlobalConfig>,
297 base_url: Option<&str>,
298 dry_run: bool,
299 output_format: &crate::cli::OutputFormat,
300 jq_filter: Option<&str>,
301 ) -> Result<String, Error> {
302 use crate::engine::generator;
303
304 let command = generator::generate_command_tree_with_flags(spec, false);
306
307 let matches = command
309 .try_get_matches_from(std::iter::once("api".to_string()).chain(operation.args.clone()))
310 .map_err(|e| Error::InvalidCommand {
311 context: "batch".to_string(),
312 reason: e.to_string(),
313 })?;
314
315 let cache_config = if operation.use_cache.unwrap_or(false) {
317 Some(crate::response_cache::CacheConfig {
318 cache_dir: std::env::var("APERTURE_CONFIG_DIR")
319 .map_or_else(
320 |_| std::path::PathBuf::from("~/.config/aperture"),
321 std::path::PathBuf::from,
322 )
323 .join(".cache")
324 .join("responses"),
325 default_ttl: std::time::Duration::from_secs(300),
326 max_entries: 1000,
327 enabled: true,
328 })
329 } else {
330 None
331 };
332
333 if dry_run {
334 crate::engine::executor::execute_request(
336 spec,
337 &matches,
338 base_url,
339 true, None, global_config,
342 output_format,
343 jq_filter,
344 cache_config.as_ref(),
345 )
346 .await?;
347
348 Ok(format!(
350 "DRY RUN: Would execute operation with args: {:?}",
351 operation.args
352 ))
353 } else {
354 crate::engine::executor::execute_request(
357 spec,
358 &matches,
359 base_url,
360 false, None, global_config,
363 output_format,
364 jq_filter,
365 cache_config.as_ref(),
366 )
367 .await?;
368
369 Ok(format!(
371 "Successfully executed operation: {}",
372 operation.id.as_deref().unwrap_or("unnamed")
373 ))
374 }
375 }
376}
377
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Writes `raw` to a fresh temp file and runs it through
    /// `BatchProcessor::parse_batch_file`, panicking on any failure.
    async fn parse_from_str(raw: &str) -> BatchFile {
        let mut file = NamedTempFile::new().unwrap();
        file.write_all(raw.as_bytes()).unwrap();
        BatchProcessor::parse_batch_file(file.path())
            .await
            .unwrap()
    }

    /// A JSON batch file parses into the expected two operations.
    #[tokio::test]
    async fn test_parse_batch_file_json() {
        let raw = r#"{
            "metadata": {
                "name": "Test batch",
                "description": "A test batch file"
            },
            "operations": [
                {
                    "id": "op1",
                    "args": ["users", "list"],
                    "description": "List all users"
                },
                {
                    "id": "op2",
                    "args": ["users", "get", "--user-id", "123"],
                    "description": "Get user 123"
                }
            ]
        }"#;

        let parsed = parse_from_str(raw).await;
        let ops = &parsed.operations;

        assert_eq!(ops.len(), 2);
        assert_eq!(ops[0].args, vec!["users", "list"]);
        assert_eq!(ops[1].args, vec!["users", "get", "--user-id", "123"]);
    }

    /// The same batch expressed as YAML parses identically.
    #[tokio::test]
    async fn test_parse_batch_file_yaml() {
        let raw = r#"
metadata:
  name: Test batch
  description: A test batch file
operations:
  - id: op1
    args: [users, list]
    description: List all users
  - id: op2
    args: [users, get, --user-id, "123"]
    description: Get user 123
"#;

        let parsed = parse_from_str(raw).await;
        let ops = &parsed.operations;

        assert_eq!(ops.len(), 2);
        assert_eq!(ops[0].args, vec!["users", "list"]);
        assert_eq!(ops[1].args, vec!["users", "get", "--user-id", "123"]);
    }

    /// Defaults: 5-way concurrency, no rate limit, keep going on errors,
    /// progress output on.
    #[test]
    fn test_batch_config_default() {
        let BatchConfig {
            max_concurrency,
            rate_limit,
            continue_on_error,
            show_progress,
        } = BatchConfig::default();

        assert_eq!(max_concurrency, 5);
        assert_eq!(rate_limit, None);
        assert!(continue_on_error);
        assert!(show_progress);
    }

    /// Construction wires the config into the semaphore and rate limiter.
    #[test]
    fn test_batch_processor_creation() {
        let processor = BatchProcessor::new(BatchConfig {
            max_concurrency: 10,
            rate_limit: Some(5),
            continue_on_error: false,
            show_progress: false,
        });

        assert_eq!(processor.semaphore.available_permits(), 10);
        assert!(processor.rate_limiter.is_some());
    }
}