//! aperture_cli/batch.rs — batch processing for executing multiple API operations.

1use crate::cache::models::CachedSpec;
2use crate::config::models::GlobalConfig;
3use crate::error::Error;
4use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
5use serde::{Deserialize, Serialize};
6use std::num::NonZeroU32;
7use std::path::Path;
8use std::sync::Arc;
9use tokio::sync::Semaphore;
10
11/// Configuration for batch processing operations
12#[derive(Debug, Clone)]
/// Configuration for batch processing operations.
///
/// See [`BatchConfig::default`] for the default values.
#[derive(Debug, Clone)]
pub struct BatchConfig {
    /// Maximum number of concurrent requests (enforced via a semaphore)
    pub max_concurrency: usize,
    /// Rate limit: requests per second; `None` disables rate limiting
    pub rate_limit: Option<u32>,
    /// Whether to continue processing if a request fails
    pub continue_on_error: bool,
    /// Whether to show progress during processing
    pub show_progress: bool,
    /// Whether to suppress individual operation outputs (captures them instead)
    pub suppress_output: bool,
}
25
26impl Default for BatchConfig {
27    fn default() -> Self {
28        Self {
29            max_concurrency: 5,
30            rate_limit: None,
31            continue_on_error: true,
32            show_progress: true,
33            suppress_output: false,
34        }
35    }
36}
37
38/// A single batch operation definition
/// A single batch operation definition
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchOperation {
    /// Unique identifier for this operation (optional)
    pub id: Option<String>,
    /// The command arguments to execute (e.g., `["users", "get", "--user-id", "123"]`),
    /// without a leading binary name
    pub args: Vec<String>,
    /// Optional description for this operation
    pub description: Option<String>,
    /// Custom headers for this specific operation; empty when absent from
    /// the batch file
    #[serde(default)]
    pub headers: std::collections::HashMap<String, String>,
    /// Whether to use cache for this operation (overrides global cache setting)
    pub use_cache: Option<bool>,
}
53
54/// Batch file format containing multiple operations
/// Batch file format containing multiple operations.
///
/// Batch files may be written as either JSON or YAML.
#[derive(Debug, Serialize, Deserialize)]
pub struct BatchFile {
    /// Metadata about this batch
    pub metadata: Option<BatchMetadata>,
    /// List of operations to execute
    pub operations: Vec<BatchOperation>,
}
62
63/// Metadata for a batch file
/// Metadata for a batch file
#[derive(Debug, Serialize, Deserialize)]
pub struct BatchMetadata {
    /// Name/description of this batch
    pub name: Option<String>,
    /// Version of the batch file format
    pub version: Option<String>,
    /// Description of what this batch does
    pub description: Option<String>,
    /// Default configuration for all operations in this batch
    pub defaults: Option<BatchDefaults>,
}
75
76/// Default configuration for batch operations
/// Default configuration for batch operations
#[derive(Debug, Serialize, Deserialize)]
pub struct BatchDefaults {
    /// Default headers to apply to all operations; empty when absent from
    /// the batch file
    #[serde(default)]
    pub headers: std::collections::HashMap<String, String>,
    /// Default cache setting for all operations
    pub use_cache: Option<bool>,
}
85
86/// Result of a single batch operation
/// Result of a single batch operation
#[derive(Debug)]
pub struct BatchOperationResult {
    /// The operation that was executed
    pub operation: BatchOperation,
    /// Whether the operation succeeded
    pub success: bool,
    /// Error message if the operation failed
    pub error: Option<String>,
    /// Response body if the operation succeeded (only populated when output
    /// capture is active; otherwise a status message)
    pub response: Option<String>,
    /// Time taken to execute this operation, measured after concurrency and
    /// rate-limit permits were acquired (excludes queueing time)
    pub duration: std::time::Duration,
}
100
101/// Result of an entire batch execution
/// Result of an entire batch execution
#[derive(Debug)]
pub struct BatchResult {
    /// Results for each operation
    pub results: Vec<BatchOperationResult>,
    /// Total wall-clock time taken for the entire batch
    pub total_duration: std::time::Duration,
    /// Number of successful operations
    pub success_count: usize,
    /// Number of failed operations
    pub failure_count: usize,
}
113
114/// Batch processor for executing multiple API operations
/// Batch processor for executing multiple API operations
pub struct BatchProcessor {
    /// Configuration controlling concurrency, rate limiting, and output
    config: BatchConfig,
    /// Shared rate limiter; `None` when no rate limit is configured
    rate_limiter: Option<Arc<DefaultDirectRateLimiter>>,
    /// Semaphore bounding the number of concurrently running operations
    semaphore: Arc<Semaphore>,
}
120
121impl BatchProcessor {
122    /// Creates a new batch processor with the given configuration
123    ///
124    /// # Panics
125    ///
126    /// Panics if the rate limit is configured as 0 (which would be invalid)
127    #[must_use]
128    pub fn new(config: BatchConfig) -> Self {
129        let rate_limiter = config.rate_limit.map(|limit| {
130            Arc::new(RateLimiter::direct(Quota::per_second(
131                NonZeroU32::new(limit).unwrap_or(NonZeroU32::new(1).unwrap()),
132            )))
133        });
134
135        let semaphore = Arc::new(Semaphore::new(config.max_concurrency));
136
137        Self {
138            config,
139            rate_limiter,
140            semaphore,
141        }
142    }
143
144    /// Parses a batch file from the given path
145    ///
146    /// # Errors
147    ///
148    /// Returns an error if:
149    /// - The file cannot be read
150    /// - The file is not valid JSON or YAML
151    /// - The file structure doesn't match the expected `BatchFile` format
152    pub async fn parse_batch_file(path: &Path) -> Result<BatchFile, Error> {
153        let content = tokio::fs::read_to_string(path).await.map_err(Error::Io)?;
154
155        // Try to parse as JSON first, then YAML
156        if let Ok(batch_file) = serde_json::from_str::<BatchFile>(&content) {
157            return Ok(batch_file);
158        }
159
160        if let Ok(batch_file) = serde_yaml::from_str::<BatchFile>(&content) {
161            return Ok(batch_file);
162        }
163
164        Err(Error::Validation(format!(
165            "Failed to parse batch file as JSON or YAML: {}",
166            path.display()
167        )))
168    }
169
170    /// Executes a batch of operations
171    ///
172    /// # Errors
173    ///
174    /// Returns an error if:
175    /// - Any operation fails and `continue_on_error` is false
176    /// - Task spawning fails
177    /// - Network or API errors occur during operation execution
178    ///
179    /// # Panics
180    ///
181    /// Panics if the semaphore is poisoned (should not happen in normal operation)
182    #[allow(clippy::too_many_arguments)]
183    pub async fn execute_batch(
184        &self,
185        spec: &CachedSpec,
186        batch_file: BatchFile,
187        global_config: Option<&GlobalConfig>,
188        base_url: Option<&str>,
189        dry_run: bool,
190        output_format: &crate::cli::OutputFormat,
191        jq_filter: Option<&str>,
192    ) -> Result<BatchResult, Error> {
193        let start_time = std::time::Instant::now();
194        let total_operations = batch_file.operations.len();
195
196        if self.config.show_progress {
197            println!("Starting batch execution: {total_operations} operations");
198        }
199
200        let mut results = Vec::with_capacity(total_operations);
201        let mut handles = Vec::new();
202
203        // Create tasks for each operation
204        for (index, operation) in batch_file.operations.into_iter().enumerate() {
205            let spec = spec.clone();
206            let global_config = global_config.cloned();
207            let base_url = base_url.map(String::from);
208            let output_format = output_format.clone();
209            let jq_filter = jq_filter.map(String::from);
210            let semaphore = Arc::clone(&self.semaphore);
211            let rate_limiter = self.rate_limiter.clone();
212            let show_progress = self.config.show_progress;
213            let suppress_output = self.config.suppress_output;
214
215            let handle = tokio::spawn(async move {
216                // Acquire semaphore permit for concurrency control
217                let _permit = semaphore.acquire().await.unwrap();
218
219                // Apply rate limiting if configured
220                if let Some(limiter) = rate_limiter {
221                    limiter.until_ready().await;
222                }
223
224                let operation_start = std::time::Instant::now();
225
226                // Execute the operation
227                let result = Self::execute_single_operation(
228                    &spec,
229                    &operation,
230                    global_config.as_ref(),
231                    base_url.as_deref(),
232                    dry_run,
233                    &output_format,
234                    jq_filter.as_deref(),
235                    suppress_output,
236                )
237                .await;
238
239                let duration = operation_start.elapsed();
240
241                let (success, error, response) = match result {
242                    Ok(resp) => {
243                        if show_progress {
244                            println!("Operation {} completed", index + 1);
245                        }
246                        (true, None, Some(resp))
247                    }
248                    Err(e) => {
249                        if show_progress {
250                            println!("Operation {} failed: {}", index + 1, e);
251                        }
252                        (false, Some(e.to_string()), None)
253                    }
254                };
255
256                BatchOperationResult {
257                    operation,
258                    success,
259                    error,
260                    response,
261                    duration,
262                }
263            });
264
265            handles.push(handle);
266        }
267
268        // Collect all results
269        for handle in handles {
270            let result = handle
271                .await
272                .map_err(|e| Error::Config(format!("Task failed: {e}")))?;
273            results.push(result);
274        }
275
276        let total_duration = start_time.elapsed();
277        let success_count = results.iter().filter(|r| r.success).count();
278        let failure_count = results.len() - success_count;
279
280        if self.config.show_progress {
281            println!(
282                "Batch execution completed: {}/{} operations successful in {:.2}s",
283                success_count,
284                total_operations,
285                total_duration.as_secs_f64()
286            );
287        }
288
289        Ok(BatchResult {
290            results,
291            total_duration,
292            success_count,
293            failure_count,
294        })
295    }
296
297    /// Executes a single operation from a batch
298    #[allow(clippy::too_many_arguments)]
299    async fn execute_single_operation(
300        spec: &CachedSpec,
301        operation: &BatchOperation,
302        global_config: Option<&GlobalConfig>,
303        base_url: Option<&str>,
304        dry_run: bool,
305        output_format: &crate::cli::OutputFormat,
306        jq_filter: Option<&str>,
307        suppress_output: bool,
308    ) -> Result<String, Error> {
309        use crate::engine::generator;
310
311        // Generate the command tree (we don't use experimental flags for batch operations)
312        let command = generator::generate_command_tree_with_flags(spec, false);
313
314        // Parse the operation args into ArgMatches
315        let matches = command
316            .try_get_matches_from(std::iter::once("api".to_string()).chain(operation.args.clone()))
317            .map_err(|e| Error::InvalidCommand {
318                context: "batch".to_string(),
319                reason: e.to_string(),
320            })?;
321
322        // Create cache configuration - for batch operations, we use the operation's use_cache setting
323        let cache_config = if operation.use_cache.unwrap_or(false) {
324            Some(crate::response_cache::CacheConfig {
325                cache_dir: std::env::var("APERTURE_CONFIG_DIR")
326                    .map_or_else(
327                        |_| std::path::PathBuf::from("~/.config/aperture"),
328                        std::path::PathBuf::from,
329                    )
330                    .join(".cache")
331                    .join("responses"),
332                default_ttl: std::time::Duration::from_secs(300),
333                max_entries: 1000,
334                enabled: true,
335            })
336        } else {
337            None
338        };
339
340        if suppress_output {
341            // When suppressing output, capture it
342            let output = crate::engine::executor::execute_request(
343                spec,
344                &matches,
345                base_url,
346                dry_run,
347                None, // idempotency_key
348                global_config,
349                output_format,
350                jq_filter,
351                cache_config.as_ref(),
352                true, // capture_output
353            )
354            .await?;
355
356            // Return captured output (for debugging/logging if needed)
357            Ok(output.unwrap_or_default())
358        } else {
359            // Normal execution - output goes to stdout
360            if dry_run {
361                // For dry run, we still call execute_request but with dry_run=true
362                crate::engine::executor::execute_request(
363                    spec,
364                    &matches,
365                    base_url,
366                    true, // dry_run
367                    None, // idempotency_key
368                    global_config,
369                    output_format,
370                    jq_filter,
371                    cache_config.as_ref(),
372                    false, // capture_output
373                )
374                .await?;
375
376                // Return dry run message
377                Ok(format!(
378                    "DRY RUN: Would execute operation with args: {:?}",
379                    operation.args
380                ))
381            } else {
382                // For actual execution, call execute_request normally
383                // The output will go to stdout as expected for batch operations
384                crate::engine::executor::execute_request(
385                    spec,
386                    &matches,
387                    base_url,
388                    false, // dry_run
389                    None,  // idempotency_key
390                    global_config,
391                    output_format,
392                    jq_filter,
393                    cache_config.as_ref(),
394                    false, // capture_output
395                )
396                .await?;
397
398                // Return success message
399                Ok(format!(
400                    "Successfully executed operation: {}",
401                    operation.id.as_deref().unwrap_or("unnamed")
402                ))
403            }
404        }
405    }
406}
407
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// A JSON batch file parses into the expected operations.
    #[tokio::test]
    async fn test_parse_batch_file_json() {
        let batch_content = r#"{
            "metadata": {
                "name": "Test batch",
                "description": "A test batch file"
            },
            "operations": [
                {
                    "id": "op1",
                    "args": ["users", "list"],
                    "description": "List all users"
                },
                {
                    "id": "op2",
                    "args": ["users", "get", "--user-id", "123"],
                    "description": "Get user 123"
                }
            ]
        }"#;

        let mut temp_file = NamedTempFile::new().unwrap();
        temp_file.write_all(batch_content.as_bytes()).unwrap();

        let batch_file = BatchProcessor::parse_batch_file(temp_file.path())
            .await
            .unwrap();

        assert_eq!(batch_file.operations.len(), 2);
        assert_eq!(batch_file.operations[0].args, vec!["users", "list"]);
        assert_eq!(
            batch_file.operations[1].args,
            vec!["users", "get", "--user-id", "123"]
        );
    }

    /// A YAML batch file parses into the expected operations.
    #[tokio::test]
    async fn test_parse_batch_file_yaml() {
        let batch_content = r#"
metadata:
  name: Test batch
  description: A test batch file
operations:
  - id: op1
    args: [users, list]
    description: List all users
  - id: op2
    args: [users, get, --user-id, "123"]
    description: Get user 123
"#;

        let mut temp_file = NamedTempFile::new().unwrap();
        temp_file.write_all(batch_content.as_bytes()).unwrap();

        let batch_file = BatchProcessor::parse_batch_file(temp_file.path())
            .await
            .unwrap();

        assert_eq!(batch_file.operations.len(), 2);
        assert_eq!(batch_file.operations[0].args, vec!["users", "list"]);
        assert_eq!(
            batch_file.operations[1].args,
            vec!["users", "get", "--user-id", "123"]
        );
    }

    /// Content that is neither valid JSON nor valid YAML yields an error.
    #[tokio::test]
    async fn test_parse_batch_file_invalid() {
        let mut temp_file = NamedTempFile::new().unwrap();
        temp_file.write_all(b"not: [valid").unwrap();

        assert!(BatchProcessor::parse_batch_file(temp_file.path())
            .await
            .is_err());
    }

    /// Defaults cover every field, including `suppress_output`.
    #[test]
    fn test_batch_config_default() {
        let config = BatchConfig::default();
        assert_eq!(config.max_concurrency, 5);
        assert_eq!(config.rate_limit, None);
        assert!(config.continue_on_error);
        assert!(config.show_progress);
        // Previously unasserted: outputs are not suppressed by default.
        assert!(!config.suppress_output);
    }

    /// Construction wires the semaphore permits and rate limiter from config.
    #[test]
    fn test_batch_processor_creation() {
        let config = BatchConfig {
            max_concurrency: 10,
            rate_limit: Some(5),
            continue_on_error: false,
            show_progress: false,
            suppress_output: false,
        };

        let processor = BatchProcessor::new(config);
        assert_eq!(processor.semaphore.available_permits(), 10);
        assert!(processor.rate_limiter.is_some());
    }

    /// A configured rate limit of 0 is clamped (to 1 req/s), not a panic.
    #[test]
    fn test_batch_processor_zero_rate_limit() {
        let config = BatchConfig {
            rate_limit: Some(0),
            ..BatchConfig::default()
        };

        let processor = BatchProcessor::new(config);
        assert!(processor.rate_limiter.is_some());
    }
}