//! Batch processing support for `aperture_cli` (`batch.rs`): concurrent
//! execution of multiple API operations with rate limiting and retries.
1use crate::cache::models::CachedSpec;
2use crate::config::models::GlobalConfig;
3use crate::duration::parse_duration;
4use crate::engine::executor::RetryContext;
5use crate::engine::generator;
6use crate::error::Error;
7use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
8use serde::{Deserialize, Serialize};
9use std::num::NonZeroU32;
10use std::path::Path;
11use std::sync::Arc;
12use tokio::sync::Semaphore;
13
/// Configuration for batch processing operations
#[derive(Debug, Clone)]
pub struct BatchConfig {
    /// Maximum number of concurrent requests
    pub max_concurrency: usize,
    /// Rate limit: requests per second (`None` = unthrottled)
    pub rate_limit: Option<u32>,
    /// Whether to continue processing if a request fails
    pub continue_on_error: bool,
    /// Whether to show progress during processing (messages go to stdout)
    pub show_progress: bool,
    /// Whether to suppress individual operation outputs; when set, rendered
    /// output is captured into `BatchOperationResult::response` instead of
    /// being printed
    pub suppress_output: bool,
}
28
29impl Default for BatchConfig {
30    fn default() -> Self {
31        Self {
32            max_concurrency: 5,
33            rate_limit: None,
34            continue_on_error: true,
35            show_progress: true,
36            suppress_output: false,
37        }
38    }
39}
40
/// A single batch operation definition
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BatchOperation {
    /// Unique identifier for this operation (optional)
    pub id: Option<String>,
    /// The command arguments to execute (e.g., `["users", "get", "--user-id", "123"]`);
    /// parsed against the spec's generated command tree at execution time
    pub args: Vec<String>,
    /// Optional description for this operation
    pub description: Option<String>,
    /// Custom headers for this specific operation
    // NOTE(review): not consulted by `execute_single_operation` in this
    // module — confirm where (or whether) per-operation headers are applied.
    #[serde(default)]
    pub headers: std::collections::HashMap<String, String>,
    /// Whether to use cache for this operation (overrides global cache setting)
    pub use_cache: Option<bool>,
    /// Maximum number of retry attempts for this operation (overrides global retry setting);
    /// `Some(0)` explicitly disables retries
    #[serde(default)]
    pub retry: Option<u32>,
    /// Initial delay between retries (e.g., "500ms", "1s")
    #[serde(default)]
    pub retry_delay: Option<String>,
    /// Maximum delay cap between retries (e.g., "30s", "1m")
    #[serde(default)]
    pub retry_max_delay: Option<String>,
    /// Force retry on non-idempotent requests without an idempotency key
    #[serde(default)]
    pub force_retry: bool,
}
68
/// Batch file format containing multiple operations
///
/// Deserialized from JSON or YAML by [`BatchProcessor::parse_batch_file`].
#[derive(Debug, Serialize, Deserialize)]
pub struct BatchFile {
    /// Metadata about this batch (optional)
    pub metadata: Option<BatchMetadata>,
    /// List of operations to execute
    pub operations: Vec<BatchOperation>,
}
77
/// Metadata for a batch file
#[derive(Debug, Serialize, Deserialize)]
pub struct BatchMetadata {
    /// Name/description of this batch
    pub name: Option<String>,
    /// Version of the batch file format
    pub version: Option<String>,
    /// Description of what this batch does
    pub description: Option<String>,
    /// Default configuration for all operations in this batch
    pub defaults: Option<BatchDefaults>,
}
90
/// Default configuration for batch operations
// NOTE(review): these defaults are not read by `execute_single_operation`
// in this module (it uses only the per-operation settings) — confirm where
// batch-level defaults are merged in.
#[derive(Debug, Serialize, Deserialize)]
pub struct BatchDefaults {
    /// Default headers to apply to all operations
    #[serde(default)]
    pub headers: std::collections::HashMap<String, String>,
    /// Default cache setting for all operations
    pub use_cache: Option<bool>,
}
100
/// Result of a single batch operation
#[derive(Debug)]
pub struct BatchOperationResult {
    /// The operation that was executed
    pub operation: BatchOperation,
    /// Whether the operation succeeded
    pub success: bool,
    /// Error message if the operation failed (`Some` only when `success` is false)
    pub error: Option<String>,
    /// Response body if the operation succeeded (`Some` only when `success` is true)
    pub response: Option<String>,
    /// Time taken to execute this operation (excludes time spent waiting
    /// on the concurrency semaphore and rate limiter)
    pub duration: std::time::Duration,
}
115
/// Result of an entire batch execution
#[derive(Debug)]
pub struct BatchResult {
    /// Results for each operation, in submission order
    pub results: Vec<BatchOperationResult>,
    /// Total time taken for the entire batch
    pub total_duration: std::time::Duration,
    /// Number of successful operations (`success_count + failure_count == results.len()`)
    pub success_count: usize,
    /// Number of failed operations
    pub failure_count: usize,
}
128
/// Batch processor for executing multiple API operations
pub struct BatchProcessor {
    /// Behavioral settings (concurrency, rate limit, output handling)
    config: BatchConfig,
    /// Shared rate limiter; present only when `config.rate_limit` is set
    rate_limiter: Option<Arc<DefaultDirectRateLimiter>>,
    /// Bounds the number of concurrently executing operations to
    /// `config.max_concurrency`
    semaphore: Arc<Semaphore>,
}
135
136impl BatchProcessor {
137    /// Creates a new batch processor with the given configuration
138    ///
139    /// # Panics
140    ///
141    /// Panics if the rate limit is configured as 0 (which would be invalid)
142    #[must_use]
143    pub fn new(config: BatchConfig) -> Self {
144        let rate_limiter = config.rate_limit.map(|limit| {
145            Arc::new(RateLimiter::direct(Quota::per_second(
146                NonZeroU32::new(limit).unwrap_or(NonZeroU32::new(1).expect("1 is non-zero")),
147            )))
148        });
149
150        let semaphore = Arc::new(Semaphore::new(config.max_concurrency));
151
152        Self {
153            config,
154            rate_limiter,
155            semaphore,
156        }
157    }
158
159    /// Parses a batch file from the given path
160    ///
161    /// # Errors
162    ///
163    /// Returns an error if:
164    /// - The file cannot be read
165    /// - The file is not valid JSON or YAML
166    /// - The file structure doesn't match the expected `BatchFile` format
167    pub async fn parse_batch_file(path: &Path) -> Result<BatchFile, Error> {
168        let content = tokio::fs::read_to_string(path)
169            .await
170            .map_err(|e| Error::io_error(format!("Failed to read batch file: {e}")))?;
171
172        // Try to parse as JSON first, then YAML
173        if let Ok(batch_file) = serde_json::from_str::<BatchFile>(&content) {
174            return Ok(batch_file);
175        }
176
177        if let Ok(batch_file) = serde_yaml::from_str::<BatchFile>(&content) {
178            return Ok(batch_file);
179        }
180
181        Err(Error::validation_error(format!(
182            "Failed to parse batch file as JSON or YAML: {}",
183            path.display()
184        )))
185    }
186
187    /// Executes a batch of operations
188    ///
189    /// # Errors
190    ///
191    /// Returns an error if:
192    /// - Any operation fails and `continue_on_error` is false
193    /// - Task spawning fails
194    /// - Network or API errors occur during operation execution
195    ///
196    /// # Panics
197    ///
198    /// Panics if the semaphore is poisoned (should not happen in normal operation)
199    #[allow(clippy::too_many_arguments)]
200    pub async fn execute_batch(
201        &self,
202        spec: &CachedSpec,
203        batch_file: BatchFile,
204        global_config: Option<&GlobalConfig>,
205        base_url: Option<&str>,
206        dry_run: bool,
207        output_format: &crate::cli::OutputFormat,
208        jq_filter: Option<&str>,
209    ) -> Result<BatchResult, Error> {
210        let start_time = std::time::Instant::now();
211        let total_operations = batch_file.operations.len();
212
213        if self.config.show_progress {
214            // ast-grep-ignore: no-println
215            println!("Starting batch execution: {total_operations} operations");
216        }
217
218        let mut results = Vec::with_capacity(total_operations);
219        let mut handles = Vec::new();
220
221        // Create tasks for each operation
222        for (index, operation) in batch_file.operations.into_iter().enumerate() {
223            let spec = spec.clone();
224            let global_config = global_config.cloned();
225            let base_url = base_url.map(String::from);
226            let output_format = output_format.clone();
227            let jq_filter = jq_filter.map(String::from);
228            let semaphore = Arc::clone(&self.semaphore);
229            let rate_limiter = self.rate_limiter.clone();
230            let show_progress = self.config.show_progress;
231            let suppress_output = self.config.suppress_output;
232
233            let handle = tokio::spawn(async move {
234                // Acquire semaphore permit for concurrency control
235                let _permit = semaphore
236                    .acquire()
237                    .await
238                    .expect("semaphore should not be closed");
239
240                // Apply rate limiting if configured
241                if let Some(limiter) = rate_limiter {
242                    limiter.until_ready().await;
243                }
244
245                let operation_start = std::time::Instant::now();
246
247                // Execute the operation
248                let result = Self::execute_single_operation(
249                    &spec,
250                    &operation,
251                    global_config.as_ref(),
252                    base_url.as_deref(),
253                    dry_run,
254                    &output_format,
255                    jq_filter.as_deref(),
256                    suppress_output,
257                )
258                .await;
259
260                let duration = operation_start.elapsed();
261
262                let (success, error, response) = match result {
263                    Ok(resp) => {
264                        if show_progress {
265                            // ast-grep-ignore: no-println
266                            println!("Operation {} completed", index + 1);
267                        }
268                        (true, None, Some(resp))
269                    }
270                    Err(e) => {
271                        if show_progress {
272                            // ast-grep-ignore: no-println
273                            println!("Operation {} failed: {}", index + 1, e);
274                        }
275                        (false, Some(e.to_string()), None)
276                    }
277                };
278
279                BatchOperationResult {
280                    operation,
281                    success,
282                    error,
283                    response,
284                    duration,
285                }
286            });
287
288            handles.push(handle);
289        }
290
291        // Collect all results
292        for handle in handles {
293            let result = handle
294                .await
295                .map_err(|e| Error::invalid_config(format!("Task failed: {e}")))?;
296            results.push(result);
297        }
298
299        let total_duration = start_time.elapsed();
300        let success_count = results.iter().filter(|r| r.success).count();
301        let failure_count = results.len() - success_count;
302
303        if self.config.show_progress {
304            // ast-grep-ignore: no-println
305            println!(
306                "Batch execution completed: {}/{} operations successful in {:.2}s",
307                success_count,
308                total_operations,
309                total_duration.as_secs_f64()
310            );
311        }
312
313        Ok(BatchResult {
314            results,
315            total_duration,
316            success_count,
317            failure_count,
318        })
319    }
320
321    /// Executes a single operation from a batch
322    #[allow(clippy::too_many_arguments)]
323    async fn execute_single_operation(
324        spec: &CachedSpec,
325        operation: &BatchOperation,
326        global_config: Option<&GlobalConfig>,
327        base_url: Option<&str>,
328        dry_run: bool,
329        output_format: &crate::cli::OutputFormat,
330        jq_filter: Option<&str>,
331        suppress_output: bool,
332    ) -> Result<String, Error> {
333        use crate::cli::translate;
334        use crate::invocation::ExecutionContext;
335
336        // Generate the command tree and parse operation args into ArgMatches
337        let command = generator::generate_command_tree_with_flags(spec, false);
338        let matches = command
339            .try_get_matches_from(
340                std::iter::once(crate::constants::CLI_ROOT_COMMAND.to_string())
341                    .chain(operation.args.clone()),
342            )
343            .map_err(|e| Error::invalid_command(crate::constants::CONTEXT_BATCH, e.to_string()))?;
344
345        // Translate ArgMatches → OperationCall
346        let call = translate::matches_to_operation_call(spec, &matches)?;
347
348        // Build cache configuration
349        let cache_enabled = operation.use_cache.unwrap_or(false);
350        let cache_config = if cache_enabled {
351            let config_dir =
352                if let Ok(dir) = std::env::var(crate::constants::ENV_APERTURE_CONFIG_DIR) {
353                    std::path::PathBuf::from(dir)
354                } else {
355                    crate::config::manager::get_config_dir()?
356                };
357            Some(crate::response_cache::CacheConfig {
358                cache_dir: config_dir
359                    .join(crate::constants::DIR_CACHE)
360                    .join(crate::constants::DIR_RESPONSES),
361                default_ttl: std::time::Duration::from_secs(300),
362                max_entries: 1000,
363                enabled: true,
364                allow_authenticated: false,
365            })
366        } else {
367            None
368        };
369
370        // Build retry context from operation settings and global config defaults
371        let retry_context = build_batch_retry_context(operation, global_config)?;
372
373        // Build ExecutionContext
374        let ctx = ExecutionContext {
375            dry_run,
376            idempotency_key: None,
377            cache_config,
378            retry_context,
379            base_url: base_url.map(String::from),
380            global_config: global_config.cloned(),
381            server_var_args: translate::extract_server_var_args(&matches),
382        };
383
384        // Execute
385        let result = crate::engine::executor::execute(spec, call, ctx).await?;
386
387        // Render based on suppress_output flag
388        if suppress_output {
389            let output =
390                crate::cli::render::render_result_to_string(&result, output_format, jq_filter)?;
391            return Ok(output.unwrap_or_default());
392        }
393
394        crate::cli::render::render_result(&result, output_format, jq_filter)?;
395
396        Ok(format!(
397            "Successfully executed operation: {}",
398            operation
399                .id
400                .as_deref()
401                .unwrap_or(crate::constants::DEFAULT_OPERATION_NAME)
402        ))
403    }
404}
405
406/// Builds a `RetryContext` from batch operation settings and global configuration.
407///
408/// Operation-level settings take precedence over global config defaults.
409#[allow(clippy::cast_possible_truncation)]
410fn build_batch_retry_context(
411    operation: &BatchOperation,
412    global_config: Option<&GlobalConfig>,
413) -> Result<Option<RetryContext>, Error> {
414    // Get retry defaults from global config
415    let defaults = global_config.map(|c| &c.retry_defaults);
416
417    // Determine max_attempts: operation > global config > 0 (disabled)
418    let max_attempts = operation
419        .retry
420        .or_else(|| defaults.map(|d| d.max_attempts))
421        .unwrap_or(0);
422
423    // If retries are disabled, return None
424    if max_attempts == 0 {
425        return Ok(None);
426    }
427
428    // Determine initial_delay_ms: operation > global config > 500ms default
429    // Truncation is safe: delay values in practice are well under u64::MAX milliseconds
430    let initial_delay_ms = if let Some(ref delay_str) = operation.retry_delay {
431        parse_duration(delay_str)?.as_millis() as u64
432    } else {
433        defaults.map_or(500, |d| d.initial_delay_ms)
434    };
435
436    // Determine max_delay_ms: operation > global config > 30000ms default
437    // Truncation is safe: delay values in practice are well under u64::MAX milliseconds
438    let max_delay_ms = if let Some(ref delay_str) = operation.retry_max_delay {
439        parse_duration(delay_str)?.as_millis() as u64
440    } else {
441        defaults.map_or(30_000, |d| d.max_delay_ms)
442    };
443
444    Ok(Some(RetryContext {
445        max_attempts,
446        initial_delay_ms,
447        max_delay_ms,
448        force_retry: operation.force_retry,
449        method: None,               // Will be determined in executor
450        has_idempotency_key: false, // Batch operations don't support idempotency keys yet
451    }))
452}
453
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// A JSON batch file round-trips through `parse_batch_file`.
    #[tokio::test]
    async fn test_parse_batch_file_json() {
        let batch_content = r#"{
            "metadata": {
                "name": "Test batch",
                "description": "A test batch file"
            },
            "operations": [
                {
                    "id": "op1",
                    "args": ["users", "list"],
                    "description": "List all users"
                },
                {
                    "id": "op2",
                    "args": ["users", "get", "--user-id", "123"],
                    "description": "Get user 123"
                }
            ]
        }"#;

        let mut temp_file = NamedTempFile::new().unwrap();
        temp_file.write_all(batch_content.as_bytes()).unwrap();

        let batch_file = BatchProcessor::parse_batch_file(temp_file.path())
            .await
            .unwrap();

        assert_eq!(batch_file.operations.len(), 2);
        assert_eq!(batch_file.operations[0].args, vec!["users", "list"]);
        assert_eq!(
            batch_file.operations[1].args,
            vec!["users", "get", "--user-id", "123"]
        );
    }

    /// A YAML batch file parses via the YAML fallback path.
    #[tokio::test]
    async fn test_parse_batch_file_yaml() {
        let batch_content = r#"
metadata:
  name: Test batch
  description: A test batch file
operations:
  - id: op1
    args: [users, list]
    description: List all users
  - id: op2
    args: [users, get, --user-id, "123"]
    description: Get user 123
"#;

        let mut temp_file = NamedTempFile::new().unwrap();
        temp_file.write_all(batch_content.as_bytes()).unwrap();

        let batch_file = BatchProcessor::parse_batch_file(temp_file.path())
            .await
            .unwrap();

        assert_eq!(batch_file.operations.len(), 2);
        assert_eq!(batch_file.operations[0].args, vec!["users", "list"]);
        assert_eq!(
            batch_file.operations[1].args,
            vec!["users", "get", "--user-id", "123"]
        );
    }

    /// Content that is neither JSON nor a `BatchFile`-shaped YAML document
    /// yields a validation error instead of a panic.
    #[tokio::test]
    async fn test_parse_batch_file_invalid() {
        let mut temp_file = NamedTempFile::new().unwrap();
        temp_file
            .write_all(b"this is neither JSON nor a BatchFile mapping")
            .unwrap();

        assert!(BatchProcessor::parse_batch_file(temp_file.path())
            .await
            .is_err());
    }

    /// Default config matches documented defaults, including `suppress_output`.
    #[test]
    fn test_batch_config_default() {
        let config = BatchConfig::default();
        assert_eq!(config.max_concurrency, 5);
        assert_eq!(config.rate_limit, None);
        assert!(config.continue_on_error);
        assert!(config.show_progress);
        assert!(!config.suppress_output);
    }

    /// Construction wires the semaphore permits and rate limiter from config.
    #[test]
    fn test_batch_processor_creation() {
        let config = BatchConfig {
            max_concurrency: 10,
            rate_limit: Some(5),
            continue_on_error: false,
            show_progress: false,
            suppress_output: false,
        };

        let processor = BatchProcessor::new(config);
        assert_eq!(processor.semaphore.available_permits(), 10);
        assert!(processor.rate_limiter.is_some());
    }

    /// A (misconfigured) rate limit of 0 is clamped, not a panic.
    #[test]
    fn test_batch_processor_zero_rate_limit_does_not_panic() {
        let config = BatchConfig {
            rate_limit: Some(0),
            ..BatchConfig::default()
        };

        let processor = BatchProcessor::new(config);
        assert!(processor.rate_limiter.is_some());
    }
}