//! aperture_cli/batch.rs — batch processing of multiple API operations.

1use crate::cache::models::CachedSpec;
2use crate::config::models::GlobalConfig;
3use crate::error::Error;
4use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
5use serde::{Deserialize, Serialize};
6use std::num::NonZeroU32;
7use std::path::Path;
8use std::sync::Arc;
9use tokio::sync::Semaphore;
10
/// Configuration for batch processing operations
#[derive(Debug, Clone)]
pub struct BatchConfig {
    /// Maximum number of concurrent requests
    pub max_concurrency: usize,
    /// Rate limit: requests per second
    pub rate_limit: Option<u32>,
    /// Whether to continue processing if a request fails
    pub continue_on_error: bool,
    /// Whether to show progress during processing
    pub show_progress: bool,
    /// Whether to suppress individual operation outputs
    pub suppress_output: bool,
}

impl Default for BatchConfig {
    /// Sensible defaults: five concurrent requests, no rate limiting,
    /// keep going on failures, show progress, print each operation's output.
    fn default() -> Self {
        Self {
            suppress_output: false,
            show_progress: true,
            continue_on_error: true,
            rate_limit: None,
            max_concurrency: 5,
        }
    }
}
37
/// A single batch operation definition
///
/// Deserialized from one entry in a batch file's `operations` list.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchOperation {
    /// Unique identifier for this operation (optional)
    pub id: Option<String>,
    /// The command arguments to execute (e.g., `["users", "get", "--user-id", "123"]`)
    pub args: Vec<String>,
    /// Optional description for this operation
    pub description: Option<String>,
    /// Custom headers for this specific operation
    /// (defaults to an empty map when absent from the batch file)
    #[serde(default)]
    pub headers: std::collections::HashMap<String, String>,
    /// Whether to use cache for this operation (overrides global cache setting);
    /// `None` means "fall back to the batch/global default"
    pub use_cache: Option<bool>,
}
53
/// Batch file format containing multiple operations
///
/// Top-level structure of a batch file; may be serialized as JSON or YAML.
#[derive(Debug, Serialize, Deserialize)]
pub struct BatchFile {
    /// Metadata about this batch (optional)
    pub metadata: Option<BatchMetadata>,
    /// List of operations to execute
    pub operations: Vec<BatchOperation>,
}
62
/// Metadata for a batch file
///
/// All fields are optional; metadata is informational only.
#[derive(Debug, Serialize, Deserialize)]
pub struct BatchMetadata {
    /// Name/description of this batch
    pub name: Option<String>,
    /// Version of the batch file format
    pub version: Option<String>,
    /// Description of what this batch does
    pub description: Option<String>,
    /// Default configuration for all operations in this batch
    pub defaults: Option<BatchDefaults>,
}
75
/// Default configuration for batch operations
#[derive(Debug, Serialize, Deserialize)]
pub struct BatchDefaults {
    /// Default headers to apply to all operations
    /// (defaults to an empty map when absent from the batch file)
    #[serde(default)]
    pub headers: std::collections::HashMap<String, String>,
    /// Default cache setting for all operations
    pub use_cache: Option<bool>,
}
85
/// Result of a single batch operation
#[derive(Debug)]
pub struct BatchOperationResult {
    /// The operation that was executed
    pub operation: BatchOperation,
    /// Whether the operation succeeded
    pub success: bool,
    /// Error message if the operation failed (`None` on success)
    pub error: Option<String>,
    /// Response body if the operation succeeded (`None` on failure)
    pub response: Option<String>,
    /// Time taken to execute this operation
    /// (measured after semaphore/rate-limit gating, so queuing time is excluded)
    pub duration: std::time::Duration,
}
100
/// Result of an entire batch execution
#[derive(Debug)]
pub struct BatchResult {
    /// Results for each operation, in the same order as the batch file
    pub results: Vec<BatchOperationResult>,
    /// Total wall-clock time taken for the entire batch
    pub total_duration: std::time::Duration,
    /// Number of successful operations
    pub success_count: usize,
    /// Number of failed operations (`results.len() - success_count`)
    pub failure_count: usize,
}
113
/// Batch processor for executing multiple API operations
pub struct BatchProcessor {
    /// Execution settings (concurrency, rate limit, progress, output suppression)
    config: BatchConfig,
    /// Optional request pacer; present only when `config.rate_limit` is set
    rate_limiter: Option<Arc<DefaultDirectRateLimiter>>,
    /// Bounds the number of concurrently running operations to `config.max_concurrency`
    semaphore: Arc<Semaphore>,
}
120
121impl BatchProcessor {
122    /// Creates a new batch processor with the given configuration
123    ///
124    /// # Panics
125    ///
126    /// Panics if the rate limit is configured as 0 (which would be invalid)
127    #[must_use]
128    pub fn new(config: BatchConfig) -> Self {
129        let rate_limiter = config.rate_limit.map(|limit| {
130            Arc::new(RateLimiter::direct(Quota::per_second(
131                NonZeroU32::new(limit).unwrap_or(NonZeroU32::new(1).unwrap()),
132            )))
133        });
134
135        let semaphore = Arc::new(Semaphore::new(config.max_concurrency));
136
137        Self {
138            config,
139            rate_limiter,
140            semaphore,
141        }
142    }
143
144    /// Parses a batch file from the given path
145    ///
146    /// # Errors
147    ///
148    /// Returns an error if:
149    /// - The file cannot be read
150    /// - The file is not valid JSON or YAML
151    /// - The file structure doesn't match the expected `BatchFile` format
152    pub async fn parse_batch_file(path: &Path) -> Result<BatchFile, Error> {
153        let content = tokio::fs::read_to_string(path)
154            .await
155            .map_err(|e| Error::io_error(format!("Failed to read batch file: {e}")))?;
156
157        // Try to parse as JSON first, then YAML
158        if let Ok(batch_file) = serde_json::from_str::<BatchFile>(&content) {
159            return Ok(batch_file);
160        }
161
162        if let Ok(batch_file) = serde_yaml::from_str::<BatchFile>(&content) {
163            return Ok(batch_file);
164        }
165
166        Err(Error::validation_error(format!(
167            "Failed to parse batch file as JSON or YAML: {}",
168            path.display()
169        )))
170    }
171
172    /// Executes a batch of operations
173    ///
174    /// # Errors
175    ///
176    /// Returns an error if:
177    /// - Any operation fails and `continue_on_error` is false
178    /// - Task spawning fails
179    /// - Network or API errors occur during operation execution
180    ///
181    /// # Panics
182    ///
183    /// Panics if the semaphore is poisoned (should not happen in normal operation)
184    #[allow(clippy::too_many_arguments)]
185    pub async fn execute_batch(
186        &self,
187        spec: &CachedSpec,
188        batch_file: BatchFile,
189        global_config: Option<&GlobalConfig>,
190        base_url: Option<&str>,
191        dry_run: bool,
192        output_format: &crate::cli::OutputFormat,
193        jq_filter: Option<&str>,
194    ) -> Result<BatchResult, Error> {
195        let start_time = std::time::Instant::now();
196        let total_operations = batch_file.operations.len();
197
198        if self.config.show_progress {
199            println!("Starting batch execution: {total_operations} operations");
200        }
201
202        let mut results = Vec::with_capacity(total_operations);
203        let mut handles = Vec::new();
204
205        // Create tasks for each operation
206        for (index, operation) in batch_file.operations.into_iter().enumerate() {
207            let spec = spec.clone();
208            let global_config = global_config.cloned();
209            let base_url = base_url.map(String::from);
210            let output_format = output_format.clone();
211            let jq_filter = jq_filter.map(String::from);
212            let semaphore = Arc::clone(&self.semaphore);
213            let rate_limiter = self.rate_limiter.clone();
214            let show_progress = self.config.show_progress;
215            let suppress_output = self.config.suppress_output;
216
217            let handle = tokio::spawn(async move {
218                // Acquire semaphore permit for concurrency control
219                let _permit = semaphore.acquire().await.unwrap();
220
221                // Apply rate limiting if configured
222                if let Some(limiter) = rate_limiter {
223                    limiter.until_ready().await;
224                }
225
226                let operation_start = std::time::Instant::now();
227
228                // Execute the operation
229                let result = Self::execute_single_operation(
230                    &spec,
231                    &operation,
232                    global_config.as_ref(),
233                    base_url.as_deref(),
234                    dry_run,
235                    &output_format,
236                    jq_filter.as_deref(),
237                    suppress_output,
238                )
239                .await;
240
241                let duration = operation_start.elapsed();
242
243                let (success, error, response) = match result {
244                    Ok(resp) => {
245                        if show_progress {
246                            println!("Operation {} completed", index + 1);
247                        }
248                        (true, None, Some(resp))
249                    }
250                    Err(e) => {
251                        if show_progress {
252                            println!("Operation {} failed: {}", index + 1, e);
253                        }
254                        (false, Some(e.to_string()), None)
255                    }
256                };
257
258                BatchOperationResult {
259                    operation,
260                    success,
261                    error,
262                    response,
263                    duration,
264                }
265            });
266
267            handles.push(handle);
268        }
269
270        // Collect all results
271        for handle in handles {
272            let result = handle
273                .await
274                .map_err(|e| Error::invalid_config(format!("Task failed: {e}")))?;
275            results.push(result);
276        }
277
278        let total_duration = start_time.elapsed();
279        let success_count = results.iter().filter(|r| r.success).count();
280        let failure_count = results.len() - success_count;
281
282        if self.config.show_progress {
283            println!(
284                "Batch execution completed: {}/{} operations successful in {:.2}s",
285                success_count,
286                total_operations,
287                total_duration.as_secs_f64()
288            );
289        }
290
291        Ok(BatchResult {
292            results,
293            total_duration,
294            success_count,
295            failure_count,
296        })
297    }
298
299    /// Executes a single operation from a batch
300    #[allow(clippy::too_many_arguments)]
301    async fn execute_single_operation(
302        spec: &CachedSpec,
303        operation: &BatchOperation,
304        global_config: Option<&GlobalConfig>,
305        base_url: Option<&str>,
306        dry_run: bool,
307        output_format: &crate::cli::OutputFormat,
308        jq_filter: Option<&str>,
309        suppress_output: bool,
310    ) -> Result<String, Error> {
311        use crate::engine::generator;
312
313        // Generate the command tree (we don't use experimental flags for batch operations)
314        let command = generator::generate_command_tree_with_flags(spec, false);
315
316        // Parse the operation args into ArgMatches
317        let matches = command
318            .try_get_matches_from(
319                std::iter::once(crate::constants::CLI_ROOT_COMMAND.to_string())
320                    .chain(operation.args.clone()),
321            )
322            .map_err(|e| Error::invalid_command(crate::constants::CONTEXT_BATCH, e.to_string()))?;
323
324        // Create cache configuration - for batch operations, we use the operation's use_cache setting
325        let cache_config = if operation.use_cache.unwrap_or(false) {
326            Some(crate::response_cache::CacheConfig {
327                cache_dir: std::env::var(crate::constants::ENV_APERTURE_CONFIG_DIR)
328                    .map_or_else(
329                        |_| std::path::PathBuf::from("~/.config/aperture"),
330                        std::path::PathBuf::from,
331                    )
332                    .join(crate::constants::DIR_CACHE)
333                    .join(crate::constants::DIR_RESPONSES),
334                default_ttl: std::time::Duration::from_secs(300),
335                max_entries: 1000,
336                enabled: true,
337            })
338        } else {
339            None
340        };
341
342        if suppress_output {
343            // When suppressing output, capture it
344            let output = crate::engine::executor::execute_request(
345                spec,
346                &matches,
347                base_url,
348                dry_run,
349                None, // idempotency_key
350                global_config,
351                output_format,
352                jq_filter,
353                cache_config.as_ref(),
354                true, // capture_output
355            )
356            .await?;
357
358            // Return captured output (for debugging/logging if needed)
359            Ok(output.unwrap_or_default())
360        } else {
361            // Normal execution - output goes to stdout
362            if dry_run {
363                // For dry run, we still call execute_request but with dry_run=true
364                crate::engine::executor::execute_request(
365                    spec,
366                    &matches,
367                    base_url,
368                    true, // dry_run
369                    None, // idempotency_key
370                    global_config,
371                    output_format,
372                    jq_filter,
373                    cache_config.as_ref(),
374                    false, // capture_output
375                )
376                .await?;
377
378                // Return dry run message
379                Ok(format!(
380                    "DRY RUN: Would execute operation with args: {:?}",
381                    operation.args
382                ))
383            } else {
384                // For actual execution, call execute_request normally
385                // The output will go to stdout as expected for batch operations
386                crate::engine::executor::execute_request(
387                    spec,
388                    &matches,
389                    base_url,
390                    false, // dry_run
391                    None,  // idempotency_key
392                    global_config,
393                    output_format,
394                    jq_filter,
395                    cache_config.as_ref(),
396                    false, // capture_output
397                )
398                .await?;
399
400                // Return success message
401                Ok(format!(
402                    "Successfully executed operation: {}",
403                    operation
404                        .id
405                        .as_deref()
406                        .unwrap_or(crate::constants::DEFAULT_OPERATION_NAME)
407                ))
408            }
409        }
410    }
411}
412
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// A JSON batch file round-trips through `parse_batch_file`.
    #[tokio::test]
    async fn test_parse_batch_file_json() {
        let batch_content = r#"{
            "metadata": {
                "name": "Test batch",
                "description": "A test batch file"
            },
            "operations": [
                {
                    "id": "op1",
                    "args": ["users", "list"],
                    "description": "List all users"
                },
                {
                    "id": "op2", 
                    "args": ["users", "get", "--user-id", "123"],
                    "description": "Get user 123"
                }
            ]
        }"#;

        let mut temp_file = NamedTempFile::new().unwrap();
        temp_file.write_all(batch_content.as_bytes()).unwrap();

        let batch_file = BatchProcessor::parse_batch_file(temp_file.path())
            .await
            .unwrap();

        assert_eq!(batch_file.operations.len(), 2);
        assert_eq!(batch_file.operations[0].args, vec!["users", "list"]);
        assert_eq!(
            batch_file.operations[1].args,
            vec!["users", "get", "--user-id", "123"]
        );
    }

    /// A YAML batch file round-trips through `parse_batch_file` (YAML is the
    /// fallback after JSON parsing fails).
    #[tokio::test]
    async fn test_parse_batch_file_yaml() {
        let batch_content = r#"
metadata:
  name: Test batch
  description: A test batch file
operations:
  - id: op1
    args: [users, list]
    description: List all users
  - id: op2
    args: [users, get, --user-id, "123"]
    description: Get user 123
"#;

        let mut temp_file = NamedTempFile::new().unwrap();
        temp_file.write_all(batch_content.as_bytes()).unwrap();

        let batch_file = BatchProcessor::parse_batch_file(temp_file.path())
            .await
            .unwrap();

        assert_eq!(batch_file.operations.len(), 2);
        assert_eq!(batch_file.operations[0].args, vec!["users", "list"]);
        assert_eq!(
            batch_file.operations[1].args,
            vec!["users", "get", "--user-id", "123"]
        );
    }

    /// Defaults must cover every `BatchConfig` field.
    #[test]
    fn test_batch_config_default() {
        let config = BatchConfig::default();
        assert_eq!(config.max_concurrency, 5);
        assert_eq!(config.rate_limit, None);
        assert!(config.continue_on_error);
        assert!(config.show_progress);
        // Previously unasserted: output is not suppressed by default.
        assert!(!config.suppress_output);
    }

    /// Constructing a processor wires up the semaphore and rate limiter.
    #[test]
    fn test_batch_processor_creation() {
        let config = BatchConfig {
            max_concurrency: 10,
            rate_limit: Some(5),
            continue_on_error: false,
            show_progress: false,
            suppress_output: false,
        };

        let processor = BatchProcessor::new(config);
        assert_eq!(processor.semaphore.available_permits(), 10);
        assert!(processor.rate_limiter.is_some());
    }
}