// aperture_cli/batch.rs

1use crate::cache::models::CachedSpec;
2use crate::config::models::GlobalConfig;
3use crate::engine::generator;
4use crate::error::Error;
5use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
6use serde::{Deserialize, Serialize};
7use std::num::NonZeroU32;
8use std::path::Path;
9use std::sync::Arc;
10use tokio::sync::Semaphore;
11
/// Configuration for batch processing operations.
///
/// Construct via [`Default`] for the standard settings (5 concurrent
/// requests, no rate limit, continue on error, progress shown, output
/// not suppressed).
#[derive(Debug, Clone)]
pub struct BatchConfig {
    /// Maximum number of concurrent requests (enforced via a semaphore
    /// in `BatchProcessor`).
    pub max_concurrency: usize,
    /// Rate limit in requests per second; `None` disables rate limiting.
    pub rate_limit: Option<u32>,
    /// Whether to continue processing remaining operations if a request fails.
    pub continue_on_error: bool,
    /// Whether to print progress messages during processing.
    pub show_progress: bool,
    /// Whether to suppress individual operation outputs (output is captured
    /// instead of written to stdout).
    pub suppress_output: bool,
}
26
27impl Default for BatchConfig {
28    fn default() -> Self {
29        Self {
30            max_concurrency: 5,
31            rate_limit: None,
32            continue_on_error: true,
33            show_progress: true,
34            suppress_output: false,
35        }
36    }
37}
38
/// A single batch operation definition.
///
/// Deserialized from one entry of a batch file's `operations` list.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchOperation {
    /// Unique identifier for this operation (optional)
    pub id: Option<String>,
    /// The command arguments to execute (e.g., `["users", "get", "--user-id", "123"]`)
    pub args: Vec<String>,
    /// Optional description for this operation
    pub description: Option<String>,
    /// Custom headers for this specific operation.
    /// `#[serde(default)]` makes the field optional in the file; when
    /// absent it deserializes to an empty map.
    #[serde(default)]
    pub headers: std::collections::HashMap<String, String>,
    /// Whether to use cache for this operation (overrides global cache setting).
    /// `None` means "not specified" and is treated as cache-disabled by
    /// `execute_single_operation`.
    pub use_cache: Option<bool>,
}
54
/// Batch file format containing multiple operations.
///
/// Accepted on disk as either JSON or YAML; see
/// `BatchProcessor::parse_batch_file`.
#[derive(Debug, Serialize, Deserialize)]
pub struct BatchFile {
    /// Metadata about this batch (optional in the file)
    pub metadata: Option<BatchMetadata>,
    /// List of operations to execute, in submission order
    pub operations: Vec<BatchOperation>,
}
63
/// Metadata for a batch file.
///
/// All fields are optional; this block is purely descriptive except for
/// `defaults`.
#[derive(Debug, Serialize, Deserialize)]
pub struct BatchMetadata {
    /// Name/description of this batch
    pub name: Option<String>,
    /// Version of the batch file format
    pub version: Option<String>,
    /// Description of what this batch does
    pub description: Option<String>,
    /// Default configuration for all operations in this batch
    pub defaults: Option<BatchDefaults>,
}
76
/// Default configuration for batch operations.
///
/// NOTE(review): nothing in this module applies these defaults to
/// individual operations — confirm where (or whether) they are merged
/// before relying on them.
#[derive(Debug, Serialize, Deserialize)]
pub struct BatchDefaults {
    /// Default headers to apply to all operations; missing in the file
    /// deserializes to an empty map via `#[serde(default)]`.
    #[serde(default)]
    pub headers: std::collections::HashMap<String, String>,
    /// Default cache setting for all operations
    pub use_cache: Option<bool>,
}
86
/// Result of a single batch operation.
#[derive(Debug)]
pub struct BatchOperationResult {
    /// The operation that was executed
    pub operation: BatchOperation,
    /// Whether the operation succeeded
    pub success: bool,
    /// Error message if the operation failed (`None` on success)
    pub error: Option<String>,
    /// Response body if the operation succeeded (`None` on failure)
    pub response: Option<String>,
    /// Wall-clock time taken to execute this operation (excludes time
    /// spent waiting on the semaphore/rate limiter)
    pub duration: std::time::Duration,
}
101
/// Result of an entire batch execution.
#[derive(Debug)]
pub struct BatchResult {
    /// Results for each operation, in submission order
    pub results: Vec<BatchOperationResult>,
    /// Total wall-clock time taken for the entire batch
    pub total_duration: std::time::Duration,
    /// Number of successful operations
    pub success_count: usize,
    /// Number of failed operations
    pub failure_count: usize,
}
114
/// Batch processor for executing multiple API operations.
pub struct BatchProcessor {
    /// Batch-level settings (concurrency, rate limit, output behavior).
    config: BatchConfig,
    /// Shared rate limiter; present only when `config.rate_limit` is `Some`.
    rate_limiter: Option<Arc<DefaultDirectRateLimiter>>,
    /// Semaphore bounding the number of operations executing concurrently.
    semaphore: Arc<Semaphore>,
}
121
122impl BatchProcessor {
123    /// Creates a new batch processor with the given configuration
124    ///
125    /// # Panics
126    ///
127    /// Panics if the rate limit is configured as 0 (which would be invalid)
128    #[must_use]
129    pub fn new(config: BatchConfig) -> Self {
130        let rate_limiter = config.rate_limit.map(|limit| {
131            Arc::new(RateLimiter::direct(Quota::per_second(
132                NonZeroU32::new(limit).unwrap_or(NonZeroU32::new(1).unwrap()),
133            )))
134        });
135
136        let semaphore = Arc::new(Semaphore::new(config.max_concurrency));
137
138        Self {
139            config,
140            rate_limiter,
141            semaphore,
142        }
143    }
144
145    /// Parses a batch file from the given path
146    ///
147    /// # Errors
148    ///
149    /// Returns an error if:
150    /// - The file cannot be read
151    /// - The file is not valid JSON or YAML
152    /// - The file structure doesn't match the expected `BatchFile` format
153    pub async fn parse_batch_file(path: &Path) -> Result<BatchFile, Error> {
154        let content = tokio::fs::read_to_string(path)
155            .await
156            .map_err(|e| Error::io_error(format!("Failed to read batch file: {e}")))?;
157
158        // Try to parse as JSON first, then YAML
159        if let Ok(batch_file) = serde_json::from_str::<BatchFile>(&content) {
160            return Ok(batch_file);
161        }
162
163        if let Ok(batch_file) = serde_yaml::from_str::<BatchFile>(&content) {
164            return Ok(batch_file);
165        }
166
167        Err(Error::validation_error(format!(
168            "Failed to parse batch file as JSON or YAML: {}",
169            path.display()
170        )))
171    }
172
173    /// Executes a batch of operations
174    ///
175    /// # Errors
176    ///
177    /// Returns an error if:
178    /// - Any operation fails and `continue_on_error` is false
179    /// - Task spawning fails
180    /// - Network or API errors occur during operation execution
181    ///
182    /// # Panics
183    ///
184    /// Panics if the semaphore is poisoned (should not happen in normal operation)
185    #[allow(clippy::too_many_arguments)]
186    pub async fn execute_batch(
187        &self,
188        spec: &CachedSpec,
189        batch_file: BatchFile,
190        global_config: Option<&GlobalConfig>,
191        base_url: Option<&str>,
192        dry_run: bool,
193        output_format: &crate::cli::OutputFormat,
194        jq_filter: Option<&str>,
195    ) -> Result<BatchResult, Error> {
196        let start_time = std::time::Instant::now();
197        let total_operations = batch_file.operations.len();
198
199        if self.config.show_progress {
200            println!("Starting batch execution: {total_operations} operations");
201        }
202
203        let mut results = Vec::with_capacity(total_operations);
204        let mut handles = Vec::new();
205
206        // Create tasks for each operation
207        for (index, operation) in batch_file.operations.into_iter().enumerate() {
208            let spec = spec.clone();
209            let global_config = global_config.cloned();
210            let base_url = base_url.map(String::from);
211            let output_format = output_format.clone();
212            let jq_filter = jq_filter.map(String::from);
213            let semaphore = Arc::clone(&self.semaphore);
214            let rate_limiter = self.rate_limiter.clone();
215            let show_progress = self.config.show_progress;
216            let suppress_output = self.config.suppress_output;
217
218            let handle = tokio::spawn(async move {
219                // Acquire semaphore permit for concurrency control
220                let _permit = semaphore.acquire().await.unwrap();
221
222                // Apply rate limiting if configured
223                if let Some(limiter) = rate_limiter {
224                    limiter.until_ready().await;
225                }
226
227                let operation_start = std::time::Instant::now();
228
229                // Execute the operation
230                let result = Self::execute_single_operation(
231                    &spec,
232                    &operation,
233                    global_config.as_ref(),
234                    base_url.as_deref(),
235                    dry_run,
236                    &output_format,
237                    jq_filter.as_deref(),
238                    suppress_output,
239                )
240                .await;
241
242                let duration = operation_start.elapsed();
243
244                let (success, error, response) = match result {
245                    Ok(resp) => {
246                        if show_progress {
247                            println!("Operation {} completed", index + 1);
248                        }
249                        (true, None, Some(resp))
250                    }
251                    Err(e) => {
252                        if show_progress {
253                            println!("Operation {} failed: {}", index + 1, e);
254                        }
255                        (false, Some(e.to_string()), None)
256                    }
257                };
258
259                BatchOperationResult {
260                    operation,
261                    success,
262                    error,
263                    response,
264                    duration,
265                }
266            });
267
268            handles.push(handle);
269        }
270
271        // Collect all results
272        for handle in handles {
273            let result = handle
274                .await
275                .map_err(|e| Error::invalid_config(format!("Task failed: {e}")))?;
276            results.push(result);
277        }
278
279        let total_duration = start_time.elapsed();
280        let success_count = results.iter().filter(|r| r.success).count();
281        let failure_count = results.len() - success_count;
282
283        if self.config.show_progress {
284            println!(
285                "Batch execution completed: {}/{} operations successful in {:.2}s",
286                success_count,
287                total_operations,
288                total_duration.as_secs_f64()
289            );
290        }
291
292        Ok(BatchResult {
293            results,
294            total_duration,
295            success_count,
296            failure_count,
297        })
298    }
299
300    /// Executes a single operation from a batch
301    #[allow(clippy::too_many_arguments)]
302    async fn execute_single_operation(
303        spec: &CachedSpec,
304        operation: &BatchOperation,
305        global_config: Option<&GlobalConfig>,
306        base_url: Option<&str>,
307        dry_run: bool,
308        output_format: &crate::cli::OutputFormat,
309        jq_filter: Option<&str>,
310        suppress_output: bool,
311    ) -> Result<String, Error> {
312        // Generate the command tree (we don't use experimental flags for batch operations)
313        let command = generator::generate_command_tree_with_flags(spec, false);
314
315        // Parse the operation args into ArgMatches
316        let matches = command
317            .try_get_matches_from(
318                std::iter::once(crate::constants::CLI_ROOT_COMMAND.to_string())
319                    .chain(operation.args.clone()),
320            )
321            .map_err(|e| Error::invalid_command(crate::constants::CONTEXT_BATCH, e.to_string()))?;
322
323        // Create cache configuration - for batch operations, we use the operation's use_cache setting
324        let cache_config = if operation.use_cache.unwrap_or(false) {
325            Some(crate::response_cache::CacheConfig {
326                cache_dir: std::env::var(crate::constants::ENV_APERTURE_CONFIG_DIR)
327                    .map_or_else(
328                        |_| std::path::PathBuf::from("~/.config/aperture"),
329                        std::path::PathBuf::from,
330                    )
331                    .join(crate::constants::DIR_CACHE)
332                    .join(crate::constants::DIR_RESPONSES),
333                default_ttl: std::time::Duration::from_secs(300),
334                max_entries: 1000,
335                enabled: true,
336            })
337        } else {
338            None
339        };
340
341        if suppress_output {
342            // When suppressing output, capture it
343            let output = crate::engine::executor::execute_request(
344                spec,
345                &matches,
346                base_url,
347                dry_run,
348                None, // idempotency_key
349                global_config,
350                output_format,
351                jq_filter,
352                cache_config.as_ref(),
353                true, // capture_output
354            )
355            .await?;
356
357            // Return captured output (for debugging/logging if needed)
358            Ok(output.unwrap_or_default())
359        } else {
360            // Normal execution - output goes to stdout
361            if dry_run {
362                // For dry run, we still call execute_request but with dry_run=true
363                crate::engine::executor::execute_request(
364                    spec,
365                    &matches,
366                    base_url,
367                    true, // dry_run
368                    None, // idempotency_key
369                    global_config,
370                    output_format,
371                    jq_filter,
372                    cache_config.as_ref(),
373                    false, // capture_output
374                )
375                .await?;
376
377                // Return dry run message
378                Ok(format!(
379                    "DRY RUN: Would execute operation with args: {:?}",
380                    operation.args
381                ))
382            } else {
383                // For actual execution, call execute_request normally
384                // The output will go to stdout as expected for batch operations
385                crate::engine::executor::execute_request(
386                    spec,
387                    &matches,
388                    base_url,
389                    false, // dry_run
390                    None,  // idempotency_key
391                    global_config,
392                    output_format,
393                    jq_filter,
394                    cache_config.as_ref(),
395                    false, // capture_output
396                )
397                .await?;
398
399                // Return success message
400                Ok(format!(
401                    "Successfully executed operation: {}",
402                    operation
403                        .id
404                        .as_deref()
405                        .unwrap_or(crate::constants::DEFAULT_OPERATION_NAME)
406                ))
407            }
408        }
409    }
410}
411
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    #[tokio::test]
    async fn test_parse_batch_file_json() {
        let batch_content = r#"{
            "metadata": {
                "name": "Test batch",
                "description": "A test batch file"
            },
            "operations": [
                {
                    "id": "op1",
                    "args": ["users", "list"],
                    "description": "List all users"
                },
                {
                    "id": "op2",
                    "args": ["users", "get", "--user-id", "123"],
                    "description": "Get user 123"
                }
            ]
        }"#;

        let mut temp_file = NamedTempFile::new().unwrap();
        temp_file.write_all(batch_content.as_bytes()).unwrap();

        let batch_file = BatchProcessor::parse_batch_file(temp_file.path())
            .await
            .unwrap();

        assert_eq!(batch_file.operations.len(), 2);
        assert_eq!(batch_file.operations[0].args, vec!["users", "list"]);
        assert_eq!(
            batch_file.operations[1].args,
            vec!["users", "get", "--user-id", "123"]
        );
    }

    #[tokio::test]
    async fn test_parse_batch_file_yaml() {
        let batch_content = r#"
metadata:
  name: Test batch
  description: A test batch file
operations:
  - id: op1
    args: [users, list]
    description: List all users
  - id: op2
    args: [users, get, --user-id, "123"]
    description: Get user 123
"#;

        let mut temp_file = NamedTempFile::new().unwrap();
        temp_file.write_all(batch_content.as_bytes()).unwrap();

        let batch_file = BatchProcessor::parse_batch_file(temp_file.path())
            .await
            .unwrap();

        assert_eq!(batch_file.operations.len(), 2);
        assert_eq!(batch_file.operations[0].args, vec!["users", "list"]);
        assert_eq!(
            batch_file.operations[1].args,
            vec!["users", "get", "--user-id", "123"]
        );
    }

    #[tokio::test]
    async fn test_parse_batch_file_invalid() {
        // Neither valid JSON nor valid YAML: the unclosed flow mapping
        // fails both parsers, so parse_batch_file must return an error.
        let mut temp_file = NamedTempFile::new().unwrap();
        temp_file
            .write_all(b"{ this is neither JSON nor [ valid YAML")
            .unwrap();

        let result = BatchProcessor::parse_batch_file(temp_file.path()).await;
        assert!(result.is_err());
    }

    #[test]
    fn test_batch_config_default() {
        let config = BatchConfig::default();
        assert_eq!(config.max_concurrency, 5);
        assert_eq!(config.rate_limit, None);
        assert!(config.continue_on_error);
        assert!(config.show_progress);
        // Previously untested: the default must not suppress output.
        assert!(!config.suppress_output);
    }

    #[test]
    fn test_batch_processor_creation() {
        let config = BatchConfig {
            max_concurrency: 10,
            rate_limit: Some(5),
            continue_on_error: false,
            show_progress: false,
            suppress_output: false,
        };

        let processor = BatchProcessor::new(config);
        assert_eq!(processor.semaphore.available_permits(), 10);
        assert!(processor.rate_limiter.is_some());
    }

    #[test]
    fn test_batch_processor_no_rate_limit() {
        // With rate_limit = None, no limiter should be constructed.
        let processor = BatchProcessor::new(BatchConfig::default());
        assert_eq!(processor.semaphore.available_permits(), 5);
        assert!(processor.rate_limiter.is_none());
    }
}