// aperture_cli/batch.rs
use std::num::NonZeroU32;
use std::path::Path;
use std::sync::Arc;

use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
use serde::{Deserialize, Serialize};
use tokio::sync::Semaphore;

use crate::cache::models::CachedSpec;
use crate::config::models::GlobalConfig;
use crate::duration::parse_duration;
use crate::engine::executor::RetryContext;
use crate::engine::generator;
use crate::error::Error;

/// Configuration for batch processing operations
#[derive(Debug, Clone)]
pub struct BatchConfig {
    /// Maximum number of concurrent requests (enforced via a semaphore)
    pub max_concurrency: usize,
    /// Rate limit: requests per second; `None` disables rate limiting
    pub rate_limit: Option<u32>,
    /// Whether to continue processing if a request fails
    pub continue_on_error: bool,
    /// Whether to show progress during processing
    pub show_progress: bool,
    /// Whether to suppress individual operation outputs (captured instead of printed)
    pub suppress_output: bool,
}
28
29impl Default for BatchConfig {
30    fn default() -> Self {
31        Self {
32            max_concurrency: 5,
33            rate_limit: None,
34            continue_on_error: true,
35            show_progress: true,
36            suppress_output: false,
37        }
38    }
39}
40
/// A single batch operation definition
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BatchOperation {
    /// Unique identifier for this operation (optional)
    pub id: Option<String>,
    /// The command arguments to execute (e.g., `["users", "get", "--user-id", "123"]`)
    pub args: Vec<String>,
    /// Optional description for this operation
    pub description: Option<String>,
    /// Custom headers for this specific operation
    #[serde(default)]
    pub headers: std::collections::HashMap<String, String>,
    /// Whether to use cache for this operation (overrides global cache setting);
    /// `None` means caching is disabled for this operation
    pub use_cache: Option<bool>,
    /// Maximum number of retry attempts for this operation (overrides global retry
    /// setting); an effective value of 0 disables retries entirely
    #[serde(default)]
    pub retry: Option<u32>,
    /// Initial delay between retries (e.g., "500ms", "1s")
    #[serde(default)]
    pub retry_delay: Option<String>,
    /// Maximum delay cap between retries (e.g., "30s", "1m")
    #[serde(default)]
    pub retry_max_delay: Option<String>,
    /// Force retry on non-idempotent requests without an idempotency key
    #[serde(default)]
    pub force_retry: bool,
}
68
/// Batch file format containing multiple operations.
///
/// May be serialized as either JSON or YAML; see `BatchProcessor::parse_batch_file`.
#[derive(Debug, Serialize, Deserialize)]
pub struct BatchFile {
    /// Metadata about this batch
    pub metadata: Option<BatchMetadata>,
    /// List of operations to execute, in order
    pub operations: Vec<BatchOperation>,
}
77
/// Metadata for a batch file
#[derive(Debug, Serialize, Deserialize)]
pub struct BatchMetadata {
    /// Name/description of this batch
    pub name: Option<String>,
    /// Version of the batch file format
    pub version: Option<String>,
    /// Description of what this batch does
    pub description: Option<String>,
    /// Default configuration for all operations in this batch
    // NOTE(review): `defaults` is not consumed anywhere in this module — confirm
    // whether callers apply it or whether it is currently informational only.
    pub defaults: Option<BatchDefaults>,
}
90
/// Default configuration for batch operations
#[derive(Debug, Serialize, Deserialize)]
pub struct BatchDefaults {
    /// Default headers to apply to all operations
    #[serde(default)]
    pub headers: std::collections::HashMap<String, String>,
    /// Default cache setting for all operations
    pub use_cache: Option<bool>,
}
100
/// Result of a single batch operation
#[derive(Debug)]
pub struct BatchOperationResult {
    /// The operation that was executed
    pub operation: BatchOperation,
    /// Whether the operation succeeded
    pub success: bool,
    /// Error message if the operation failed
    pub error: Option<String>,
    /// Response body if the operation succeeded
    pub response: Option<String>,
    /// Time taken to execute this operation (measured from after the
    /// concurrency permit and rate-limit wait, so queueing time is excluded)
    pub duration: std::time::Duration,
}
115
/// Result of an entire batch execution
#[derive(Debug)]
pub struct BatchResult {
    /// Results for each operation, in the same order as the batch file
    pub results: Vec<BatchOperationResult>,
    /// Total time taken for the entire batch
    pub total_duration: std::time::Duration,
    /// Number of successful operations (`success_count + failure_count == results.len()`)
    pub success_count: usize,
    /// Number of failed operations
    pub failure_count: usize,
}
128
/// Batch processor for executing multiple API operations
pub struct BatchProcessor {
    /// Settings controlling concurrency, rate limiting, and output behavior
    config: BatchConfig,
    /// Optional per-second rate limiter shared by all spawned operation tasks
    rate_limiter: Option<Arc<DefaultDirectRateLimiter>>,
    /// Bounds the number of operations executing concurrently
    semaphore: Arc<Semaphore>,
}
135
136impl BatchProcessor {
137    /// Creates a new batch processor with the given configuration
138    ///
139    /// # Panics
140    ///
141    /// Panics if the rate limit is configured as 0 (which would be invalid)
142    #[must_use]
143    pub fn new(config: BatchConfig) -> Self {
144        let rate_limiter = config.rate_limit.map(|limit| {
145            Arc::new(RateLimiter::direct(Quota::per_second(
146                NonZeroU32::new(limit).unwrap_or(NonZeroU32::new(1).expect("1 is non-zero")),
147            )))
148        });
149
150        let semaphore = Arc::new(Semaphore::new(config.max_concurrency));
151
152        Self {
153            config,
154            rate_limiter,
155            semaphore,
156        }
157    }
158
159    /// Parses a batch file from the given path
160    ///
161    /// # Errors
162    ///
163    /// Returns an error if:
164    /// - The file cannot be read
165    /// - The file is not valid JSON or YAML
166    /// - The file structure doesn't match the expected `BatchFile` format
167    pub async fn parse_batch_file(path: &Path) -> Result<BatchFile, Error> {
168        let content = tokio::fs::read_to_string(path)
169            .await
170            .map_err(|e| Error::io_error(format!("Failed to read batch file: {e}")))?;
171
172        // Try to parse as JSON first, then YAML
173        if let Ok(batch_file) = serde_json::from_str::<BatchFile>(&content) {
174            return Ok(batch_file);
175        }
176
177        if let Ok(batch_file) = serde_yaml::from_str::<BatchFile>(&content) {
178            return Ok(batch_file);
179        }
180
181        Err(Error::validation_error(format!(
182            "Failed to parse batch file as JSON or YAML: {}",
183            path.display()
184        )))
185    }
186
187    /// Executes a batch of operations
188    ///
189    /// # Errors
190    ///
191    /// Returns an error if:
192    /// - Any operation fails and `continue_on_error` is false
193    /// - Task spawning fails
194    /// - Network or API errors occur during operation execution
195    ///
196    /// # Panics
197    ///
198    /// Panics if the semaphore is poisoned (should not happen in normal operation)
199    #[allow(clippy::too_many_arguments)]
200    pub async fn execute_batch(
201        &self,
202        spec: &CachedSpec,
203        batch_file: BatchFile,
204        global_config: Option<&GlobalConfig>,
205        base_url: Option<&str>,
206        dry_run: bool,
207        output_format: &crate::cli::OutputFormat,
208        jq_filter: Option<&str>,
209    ) -> Result<BatchResult, Error> {
210        let start_time = std::time::Instant::now();
211        let total_operations = batch_file.operations.len();
212
213        if self.config.show_progress {
214            // ast-grep-ignore: no-println
215            println!("Starting batch execution: {total_operations} operations");
216        }
217
218        let mut results = Vec::with_capacity(total_operations);
219        let mut handles = Vec::new();
220
221        // Create tasks for each operation
222        for (index, operation) in batch_file.operations.into_iter().enumerate() {
223            let spec = spec.clone();
224            let global_config = global_config.cloned();
225            let base_url = base_url.map(String::from);
226            let output_format = output_format.clone();
227            let jq_filter = jq_filter.map(String::from);
228            let semaphore = Arc::clone(&self.semaphore);
229            let rate_limiter = self.rate_limiter.clone();
230            let show_progress = self.config.show_progress;
231            let suppress_output = self.config.suppress_output;
232
233            let handle = tokio::spawn(async move {
234                // Acquire semaphore permit for concurrency control
235                let _permit = semaphore
236                    .acquire()
237                    .await
238                    .expect("semaphore should not be closed");
239
240                // Apply rate limiting if configured
241                if let Some(limiter) = rate_limiter {
242                    limiter.until_ready().await;
243                }
244
245                let operation_start = std::time::Instant::now();
246
247                // Execute the operation
248                let result = Self::execute_single_operation(
249                    &spec,
250                    &operation,
251                    global_config.as_ref(),
252                    base_url.as_deref(),
253                    dry_run,
254                    &output_format,
255                    jq_filter.as_deref(),
256                    suppress_output,
257                )
258                .await;
259
260                let duration = operation_start.elapsed();
261
262                let (success, error, response) = match result {
263                    Ok(resp) => {
264                        if show_progress {
265                            // ast-grep-ignore: no-println
266                            println!("Operation {} completed", index + 1);
267                        }
268                        (true, None, Some(resp))
269                    }
270                    Err(e) => {
271                        if show_progress {
272                            // ast-grep-ignore: no-println
273                            println!("Operation {} failed: {}", index + 1, e);
274                        }
275                        (false, Some(e.to_string()), None)
276                    }
277                };
278
279                BatchOperationResult {
280                    operation,
281                    success,
282                    error,
283                    response,
284                    duration,
285                }
286            });
287
288            handles.push(handle);
289        }
290
291        // Collect all results
292        for handle in handles {
293            let result = handle
294                .await
295                .map_err(|e| Error::invalid_config(format!("Task failed: {e}")))?;
296            results.push(result);
297        }
298
299        let total_duration = start_time.elapsed();
300        let success_count = results.iter().filter(|r| r.success).count();
301        let failure_count = results.len() - success_count;
302
303        if self.config.show_progress {
304            // ast-grep-ignore: no-println
305            println!(
306                "Batch execution completed: {}/{} operations successful in {:.2}s",
307                success_count,
308                total_operations,
309                total_duration.as_secs_f64()
310            );
311        }
312
313        Ok(BatchResult {
314            results,
315            total_duration,
316            success_count,
317            failure_count,
318        })
319    }
320
    /// Executes a single operation from a batch.
    ///
    /// Parses the operation's CLI-style `args` against the generated command
    /// tree, then dispatches through the shared executor. Returns the captured
    /// output (when `suppress_output`), a dry-run message, or a success summary.
    #[allow(clippy::too_many_arguments)]
    async fn execute_single_operation(
        spec: &CachedSpec,
        operation: &BatchOperation,
        global_config: Option<&GlobalConfig>,
        base_url: Option<&str>,
        dry_run: bool,
        output_format: &crate::cli::OutputFormat,
        jq_filter: Option<&str>,
        suppress_output: bool,
    ) -> Result<String, Error> {
        // Generate the command tree (we don't use experimental flags for batch operations)
        let command = generator::generate_command_tree_with_flags(spec, false);

        // Parse the operation args into ArgMatches. The root command name is
        // prepended because the parser expects an argv[0]-style first element.
        let matches = command
            .try_get_matches_from(
                std::iter::once(crate::constants::CLI_ROOT_COMMAND.to_string())
                    .chain(operation.args.clone()),
            )
            .map_err(|e| Error::invalid_command(crate::constants::CONTEXT_BATCH, e.to_string()))?;

        // Create cache configuration - for batch operations, we use the operation's use_cache setting
        let cache_enabled = operation.use_cache.unwrap_or(false);
        let cache_config = if cache_enabled {
            Some(crate::response_cache::CacheConfig {
                // NOTE(review): the "~" in the fallback path is not expanded by
                // PathBuf — confirm a literal "~" directory is not created when
                // the config-dir env var is unset.
                cache_dir: std::env::var(crate::constants::ENV_APERTURE_CONFIG_DIR)
                    .map_or_else(
                        |_| std::path::PathBuf::from("~/.config/aperture"),
                        std::path::PathBuf::from,
                    )
                    .join(crate::constants::DIR_CACHE)
                    .join(crate::constants::DIR_RESPONSES),
                // 5-minute TTL and 1000-entry cap are fixed for batch runs.
                default_ttl: std::time::Duration::from_secs(300),
                max_entries: 1000,
                enabled: true,
            })
        } else {
            None
        };

        // Build retry context from operation settings and global config defaults
        let retry_context = build_batch_retry_context(operation, global_config)?;

        // When suppressing output, capture it and return early
        if suppress_output {
            let output = crate::engine::executor::execute_request(
                spec,
                &matches,
                base_url,
                dry_run,
                None, // idempotency_key
                global_config,
                output_format,
                jq_filter,
                cache_config.as_ref(),
                true, // capture_output
                retry_context.as_ref(),
            )
            .await?;

            // Return captured output (for debugging/logging if needed)
            return Ok(output.unwrap_or_default());
        }

        // Normal execution - output goes to stdout
        // Handle dry run case
        if dry_run {
            crate::engine::executor::execute_request(
                spec,
                &matches,
                base_url,
                true, // dry_run
                None, // idempotency_key
                global_config,
                output_format,
                jq_filter,
                cache_config.as_ref(),
                false, // capture_output
                retry_context.as_ref(),
            )
            .await?;

            // Return dry run message
            return Ok(format!(
                "DRY RUN: Would execute operation with args: {:?}",
                operation.args
            ));
        }

        // For actual execution, call execute_request normally
        // The output will go to stdout as expected for batch operations
        crate::engine::executor::execute_request(
            spec,
            &matches,
            base_url,
            false, // dry_run
            None,  // idempotency_key
            global_config,
            output_format,
            jq_filter,
            cache_config.as_ref(),
            false, // capture_output
            retry_context.as_ref(),
        )
        .await?;

        // Return success message
        Ok(format!(
            "Successfully executed operation: {}",
            operation
                .id
                .as_deref()
                .unwrap_or(crate::constants::DEFAULT_OPERATION_NAME)
        ))
    }
438}
439
440/// Builds a `RetryContext` from batch operation settings and global configuration.
441///
442/// Operation-level settings take precedence over global config defaults.
443#[allow(clippy::cast_possible_truncation)]
444fn build_batch_retry_context(
445    operation: &BatchOperation,
446    global_config: Option<&GlobalConfig>,
447) -> Result<Option<RetryContext>, Error> {
448    // Get retry defaults from global config
449    let defaults = global_config.map(|c| &c.retry_defaults);
450
451    // Determine max_attempts: operation > global config > 0 (disabled)
452    let max_attempts = operation
453        .retry
454        .or_else(|| defaults.map(|d| d.max_attempts))
455        .unwrap_or(0);
456
457    // If retries are disabled, return None
458    if max_attempts == 0 {
459        return Ok(None);
460    }
461
462    // Determine initial_delay_ms: operation > global config > 500ms default
463    // Truncation is safe: delay values in practice are well under u64::MAX milliseconds
464    let initial_delay_ms = if let Some(ref delay_str) = operation.retry_delay {
465        parse_duration(delay_str)?.as_millis() as u64
466    } else {
467        defaults.map_or(500, |d| d.initial_delay_ms)
468    };
469
470    // Determine max_delay_ms: operation > global config > 30000ms default
471    // Truncation is safe: delay values in practice are well under u64::MAX milliseconds
472    let max_delay_ms = if let Some(ref delay_str) = operation.retry_max_delay {
473        parse_duration(delay_str)?.as_millis() as u64
474    } else {
475        defaults.map_or(30_000, |d| d.max_delay_ms)
476    };
477
478    Ok(Some(RetryContext {
479        max_attempts,
480        initial_delay_ms,
481        max_delay_ms,
482        force_retry: operation.force_retry,
483        method: None,               // Will be determined in executor
484        has_idempotency_key: false, // Batch operations don't support idempotency keys yet
485    }))
486}
487
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// A JSON batch file round-trips through `parse_batch_file`.
    #[tokio::test]
    async fn test_parse_batch_file_json() {
        let batch_content = r#"{
            "metadata": {
                "name": "Test batch",
                "description": "A test batch file"
            },
            "operations": [
                {
                    "id": "op1",
                    "args": ["users", "list"],
                    "description": "List all users"
                },
                {
                    "id": "op2",
                    "args": ["users", "get", "--user-id", "123"],
                    "description": "Get user 123"
                }
            ]
        }"#;

        let mut temp_file = NamedTempFile::new().unwrap();
        temp_file.write_all(batch_content.as_bytes()).unwrap();

        let batch_file = BatchProcessor::parse_batch_file(temp_file.path())
            .await
            .unwrap();

        assert_eq!(batch_file.operations.len(), 2);
        assert_eq!(batch_file.operations[0].args, vec!["users", "list"]);
        assert_eq!(
            batch_file.operations[1].args,
            vec!["users", "get", "--user-id", "123"]
        );
    }

    /// The same batch expressed as YAML parses via the YAML fallback path.
    #[tokio::test]
    async fn test_parse_batch_file_yaml() {
        let batch_content = r#"
metadata:
  name: Test batch
  description: A test batch file
operations:
  - id: op1
    args: [users, list]
    description: List all users
  - id: op2
    args: [users, get, --user-id, "123"]
    description: Get user 123
"#;

        let mut temp_file = NamedTempFile::new().unwrap();
        temp_file.write_all(batch_content.as_bytes()).unwrap();

        let batch_file = BatchProcessor::parse_batch_file(temp_file.path())
            .await
            .unwrap();

        assert_eq!(batch_file.operations.len(), 2);
        assert_eq!(batch_file.operations[0].args, vec!["users", "list"]);
        assert_eq!(
            batch_file.operations[1].args,
            vec!["users", "get", "--user-id", "123"]
        );
    }

    /// Every field of the default config is pinned (suppress_output was
    /// previously unasserted).
    #[test]
    fn test_batch_config_default() {
        let config = BatchConfig::default();
        assert_eq!(config.max_concurrency, 5);
        assert_eq!(config.rate_limit, None);
        assert!(config.continue_on_error);
        assert!(config.show_progress);
        assert!(!config.suppress_output);
    }

    /// Constructing a processor wires up the semaphore and rate limiter.
    #[test]
    fn test_batch_processor_creation() {
        let config = BatchConfig {
            max_concurrency: 10,
            rate_limit: Some(5),
            continue_on_error: false,
            show_progress: false,
            suppress_output: false,
        };

        let processor = BatchProcessor::new(config);
        assert_eq!(processor.semaphore.available_permits(), 10);
        assert!(processor.rate_limiter.is_some());
    }
}
579        let processor = BatchProcessor::new(config);
580        assert_eq!(processor.semaphore.available_permits(), 10);
581        assert!(processor.rate_limiter.is_some());
582    }
583}