// aperture_cli/batch.rs

1use crate::cache::models::CachedSpec;
2use crate::config::models::GlobalConfig;
3use crate::error::Error;
4use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
5use serde::{Deserialize, Serialize};
6use std::num::NonZeroU32;
7use std::path::Path;
8use std::sync::Arc;
9use tokio::sync::Semaphore;
10
/// Configuration for batch processing operations.
#[derive(Debug, Clone)]
pub struct BatchConfig {
    /// Maximum number of requests that may be in flight at once
    /// (enforced via a semaphore in `BatchProcessor`).
    pub max_concurrency: usize,
    /// Optional rate limit in requests per second; `None` disables rate limiting.
    pub rate_limit: Option<u32>,
    /// Whether to continue executing remaining operations after one fails.
    pub continue_on_error: bool,
    /// Whether to print progress messages to stdout during processing.
    pub show_progress: bool,
}
23
24impl Default for BatchConfig {
25    fn default() -> Self {
26        Self {
27            max_concurrency: 5,
28            rate_limit: None,
29            continue_on_error: true,
30            show_progress: true,
31        }
32    }
33}
34
/// A single batch operation definition.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchOperation {
    /// Unique identifier for this operation (optional; echoed in success messages).
    pub id: Option<String>,
    /// The command arguments to execute (e.g., `["users", "get", "--user-id", "123"]`).
    /// These are re-parsed through the generated CLI command tree at execution time.
    pub args: Vec<String>,
    /// Optional human-readable description for this operation.
    pub description: Option<String>,
    /// Custom headers for this specific operation.
    // NOTE(review): headers are deserialized here but not visibly applied by
    // `execute_single_operation` — confirm the intended wiring.
    #[serde(default)]
    pub headers: std::collections::HashMap<String, String>,
    /// Whether to use the response cache for this operation (overrides the
    /// global cache setting; treated as `false` when absent).
    pub use_cache: Option<bool>,
}
50
/// Batch file format containing multiple operations.
///
/// May be serialized as either JSON or YAML; see `BatchProcessor::parse_batch_file`.
#[derive(Debug, Serialize, Deserialize)]
pub struct BatchFile {
    /// Optional metadata about this batch (name, version, defaults, ...).
    pub metadata: Option<BatchMetadata>,
    /// List of operations to execute, in submission order.
    pub operations: Vec<BatchOperation>,
}
59
/// Metadata for a batch file.
#[derive(Debug, Serialize, Deserialize)]
pub struct BatchMetadata {
    /// Name/description of this batch.
    pub name: Option<String>,
    /// Version of the batch file format.
    pub version: Option<String>,
    /// Description of what this batch does.
    pub description: Option<String>,
    /// Default configuration for all operations in this batch.
    pub defaults: Option<BatchDefaults>,
}
72
/// Default configuration for batch operations.
// NOTE(review): these defaults are parsed but not visibly merged into
// individual operations anywhere in this module — confirm intended use.
#[derive(Debug, Serialize, Deserialize)]
pub struct BatchDefaults {
    /// Default headers to apply to all operations.
    #[serde(default)]
    pub headers: std::collections::HashMap<String, String>,
    /// Default cache setting for all operations.
    pub use_cache: Option<bool>,
}
82
/// Result of a single batch operation.
#[derive(Debug)]
pub struct BatchOperationResult {
    /// The operation that was executed (moved in from the input batch).
    pub operation: BatchOperation,
    /// Whether the operation succeeded.
    pub success: bool,
    /// Error message if the operation failed (`None` on success).
    pub error: Option<String>,
    /// Status/response message if the operation succeeded (`None` on failure).
    pub response: Option<String>,
    /// Wall-clock time taken to execute this operation (excludes time spent
    /// waiting on the concurrency semaphore and rate limiter).
    pub duration: std::time::Duration,
}
97
/// Result of an entire batch execution.
#[derive(Debug)]
pub struct BatchResult {
    /// Results for each operation, in submission order.
    pub results: Vec<BatchOperationResult>,
    /// Total wall-clock time taken for the entire batch.
    pub total_duration: std::time::Duration,
    /// Number of successful operations.
    pub success_count: usize,
    /// Number of failed operations.
    pub failure_count: usize,
}
110
/// Batch processor for executing multiple API operations concurrently.
pub struct BatchProcessor {
    // Behavioral knobs (concurrency, rate limit, progress, error policy).
    config: BatchConfig,
    // Present only when `config.rate_limit` is set; shared across spawned tasks.
    rate_limiter: Option<Arc<DefaultDirectRateLimiter>>,
    // Bounds in-flight operations to `config.max_concurrency`.
    semaphore: Arc<Semaphore>,
}
117
118impl BatchProcessor {
119    /// Creates a new batch processor with the given configuration
120    ///
121    /// # Panics
122    ///
123    /// Panics if the rate limit is configured as 0 (which would be invalid)
124    #[must_use]
125    pub fn new(config: BatchConfig) -> Self {
126        let rate_limiter = config.rate_limit.map(|limit| {
127            Arc::new(RateLimiter::direct(Quota::per_second(
128                NonZeroU32::new(limit).unwrap_or(NonZeroU32::new(1).unwrap()),
129            )))
130        });
131
132        let semaphore = Arc::new(Semaphore::new(config.max_concurrency));
133
134        Self {
135            config,
136            rate_limiter,
137            semaphore,
138        }
139    }
140
141    /// Parses a batch file from the given path
142    ///
143    /// # Errors
144    ///
145    /// Returns an error if:
146    /// - The file cannot be read
147    /// - The file is not valid JSON or YAML
148    /// - The file structure doesn't match the expected `BatchFile` format
149    pub async fn parse_batch_file(path: &Path) -> Result<BatchFile, Error> {
150        let content = tokio::fs::read_to_string(path).await.map_err(Error::Io)?;
151
152        // Try to parse as JSON first, then YAML
153        if let Ok(batch_file) = serde_json::from_str::<BatchFile>(&content) {
154            return Ok(batch_file);
155        }
156
157        if let Ok(batch_file) = serde_yaml::from_str::<BatchFile>(&content) {
158            return Ok(batch_file);
159        }
160
161        Err(Error::Validation(format!(
162            "Failed to parse batch file as JSON or YAML: {}",
163            path.display()
164        )))
165    }
166
167    /// Executes a batch of operations
168    ///
169    /// # Errors
170    ///
171    /// Returns an error if:
172    /// - Any operation fails and `continue_on_error` is false
173    /// - Task spawning fails
174    /// - Network or API errors occur during operation execution
175    ///
176    /// # Panics
177    ///
178    /// Panics if the semaphore is poisoned (should not happen in normal operation)
179    #[allow(clippy::too_many_arguments)]
180    pub async fn execute_batch(
181        &self,
182        spec: &CachedSpec,
183        batch_file: BatchFile,
184        global_config: Option<&GlobalConfig>,
185        base_url: Option<&str>,
186        dry_run: bool,
187        output_format: &crate::cli::OutputFormat,
188        jq_filter: Option<&str>,
189    ) -> Result<BatchResult, Error> {
190        let start_time = std::time::Instant::now();
191        let total_operations = batch_file.operations.len();
192
193        if self.config.show_progress {
194            println!("Starting batch execution: {total_operations} operations");
195        }
196
197        let mut results = Vec::with_capacity(total_operations);
198        let mut handles = Vec::new();
199
200        // Create tasks for each operation
201        for (index, operation) in batch_file.operations.into_iter().enumerate() {
202            let spec = spec.clone();
203            let global_config = global_config.cloned();
204            let base_url = base_url.map(String::from);
205            let output_format = output_format.clone();
206            let jq_filter = jq_filter.map(String::from);
207            let semaphore = Arc::clone(&self.semaphore);
208            let rate_limiter = self.rate_limiter.clone();
209            let show_progress = self.config.show_progress;
210
211            let handle = tokio::spawn(async move {
212                // Acquire semaphore permit for concurrency control
213                let _permit = semaphore.acquire().await.unwrap();
214
215                // Apply rate limiting if configured
216                if let Some(limiter) = rate_limiter {
217                    limiter.until_ready().await;
218                }
219
220                let operation_start = std::time::Instant::now();
221
222                // Execute the operation
223                let result = Self::execute_single_operation(
224                    &spec,
225                    &operation,
226                    global_config.as_ref(),
227                    base_url.as_deref(),
228                    dry_run,
229                    &output_format,
230                    jq_filter.as_deref(),
231                )
232                .await;
233
234                let duration = operation_start.elapsed();
235
236                let (success, error, response) = match result {
237                    Ok(resp) => {
238                        if show_progress {
239                            println!("✓ Operation {} completed", index + 1);
240                        }
241                        (true, None, Some(resp))
242                    }
243                    Err(e) => {
244                        if show_progress {
245                            println!("✗ Operation {} failed: {}", index + 1, e);
246                        }
247                        (false, Some(e.to_string()), None)
248                    }
249                };
250
251                BatchOperationResult {
252                    operation,
253                    success,
254                    error,
255                    response,
256                    duration,
257                }
258            });
259
260            handles.push(handle);
261        }
262
263        // Collect all results
264        for handle in handles {
265            let result = handle
266                .await
267                .map_err(|e| Error::Config(format!("Task failed: {e}")))?;
268            results.push(result);
269        }
270
271        let total_duration = start_time.elapsed();
272        let success_count = results.iter().filter(|r| r.success).count();
273        let failure_count = results.len() - success_count;
274
275        if self.config.show_progress {
276            println!(
277                "Batch execution completed: {}/{} operations successful in {:.2}s",
278                success_count,
279                total_operations,
280                total_duration.as_secs_f64()
281            );
282        }
283
284        Ok(BatchResult {
285            results,
286            total_duration,
287            success_count,
288            failure_count,
289        })
290    }
291
292    /// Executes a single operation from a batch
293    async fn execute_single_operation(
294        spec: &CachedSpec,
295        operation: &BatchOperation,
296        global_config: Option<&GlobalConfig>,
297        base_url: Option<&str>,
298        dry_run: bool,
299        output_format: &crate::cli::OutputFormat,
300        jq_filter: Option<&str>,
301    ) -> Result<String, Error> {
302        use crate::engine::generator;
303
304        // Generate the command tree (we don't use experimental flags for batch operations)
305        let command = generator::generate_command_tree_with_flags(spec, false);
306
307        // Parse the operation args into ArgMatches
308        let matches = command
309            .try_get_matches_from(std::iter::once("api".to_string()).chain(operation.args.clone()))
310            .map_err(|e| Error::InvalidCommand {
311                context: "batch".to_string(),
312                reason: e.to_string(),
313            })?;
314
315        // Create cache configuration - for batch operations, we use the operation's use_cache setting
316        let cache_config = if operation.use_cache.unwrap_or(false) {
317            Some(crate::response_cache::CacheConfig {
318                cache_dir: std::env::var("APERTURE_CONFIG_DIR")
319                    .map_or_else(
320                        |_| std::path::PathBuf::from("~/.config/aperture"),
321                        std::path::PathBuf::from,
322                    )
323                    .join(".cache")
324                    .join("responses"),
325                default_ttl: std::time::Duration::from_secs(300),
326                max_entries: 1000,
327                enabled: true,
328            })
329        } else {
330            None
331        };
332
333        if dry_run {
334            // For dry run, we still call execute_request but with dry_run=true
335            crate::engine::executor::execute_request(
336                spec,
337                &matches,
338                base_url,
339                true, // dry_run
340                None, // idempotency_key
341                global_config,
342                output_format,
343                jq_filter,
344                cache_config.as_ref(),
345            )
346            .await?;
347
348            // Return dry run message
349            Ok(format!(
350                "DRY RUN: Would execute operation with args: {:?}",
351                operation.args
352            ))
353        } else {
354            // For actual execution, call execute_request normally
355            // The output will go to stdout as expected for batch operations
356            crate::engine::executor::execute_request(
357                spec,
358                &matches,
359                base_url,
360                false, // dry_run
361                None,  // idempotency_key
362                global_config,
363                output_format,
364                jq_filter,
365                cache_config.as_ref(),
366            )
367            .await?;
368
369            // Return success message
370            Ok(format!(
371                "Successfully executed operation: {}",
372                operation.id.as_deref().unwrap_or("unnamed")
373            ))
374        }
375    }
376}
377
#[cfg(test)]
mod tests {
    // Covers batch-file parsing (JSON and YAML), config defaults, and
    // processor construction. Execution paths need a live spec/executor and
    // are exercised elsewhere.
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    // A JSON batch file round-trips through `parse_batch_file`.
    #[tokio::test]
    async fn test_parse_batch_file_json() {
        let batch_content = r#"{
            "metadata": {
                "name": "Test batch",
                "description": "A test batch file"
            },
            "operations": [
                {
                    "id": "op1",
                    "args": ["users", "list"],
                    "description": "List all users"
                },
                {
                    "id": "op2", 
                    "args": ["users", "get", "--user-id", "123"],
                    "description": "Get user 123"
                }
            ]
        }"#;

        let mut temp_file = NamedTempFile::new().unwrap();
        temp_file.write_all(batch_content.as_bytes()).unwrap();

        let batch_file = BatchProcessor::parse_batch_file(temp_file.path())
            .await
            .unwrap();

        assert_eq!(batch_file.operations.len(), 2);
        assert_eq!(batch_file.operations[0].args, vec!["users", "list"]);
        assert_eq!(
            batch_file.operations[1].args,
            vec!["users", "get", "--user-id", "123"]
        );
    }

    // The same batch expressed as YAML parses via the YAML fallback.
    #[tokio::test]
    async fn test_parse_batch_file_yaml() {
        let batch_content = r#"
metadata:
  name: Test batch
  description: A test batch file
operations:
  - id: op1
    args: [users, list]
    description: List all users
  - id: op2
    args: [users, get, --user-id, "123"]
    description: Get user 123
"#;

        let mut temp_file = NamedTempFile::new().unwrap();
        temp_file.write_all(batch_content.as_bytes()).unwrap();

        let batch_file = BatchProcessor::parse_batch_file(temp_file.path())
            .await
            .unwrap();

        assert_eq!(batch_file.operations.len(), 2);
        assert_eq!(batch_file.operations[0].args, vec!["users", "list"]);
        assert_eq!(
            batch_file.operations[1].args,
            vec!["users", "get", "--user-id", "123"]
        );
    }

    // Pins the documented defaults of `BatchConfig::default()`.
    #[test]
    fn test_batch_config_default() {
        let config = BatchConfig::default();
        assert_eq!(config.max_concurrency, 5);
        assert_eq!(config.rate_limit, None);
        assert!(config.continue_on_error);
        assert!(config.show_progress);
    }

    // Constructing a processor wires up the semaphore permits and the
    // rate limiter when a rate limit is configured.
    #[test]
    fn test_batch_processor_creation() {
        let config = BatchConfig {
            max_concurrency: 10,
            rate_limit: Some(5),
            continue_on_error: false,
            show_progress: false,
        };

        let processor = BatchProcessor::new(config);
        assert_eq!(processor.semaphore.available_permits(), 10);
        assert!(processor.rate_limiter.is_some());
    }
}
472}