sql_cli/web/
http_fetcher.rs

1// HTTP fetcher for WEB CTEs (now supports file:// URLs too)
2use anyhow::{Context, Result};
3use regex::Regex;
4use serde_json;
5use std::fs::File;
6use std::io::Cursor;
7use std::path::Path;
8use std::time::Duration;
9use tracing::{debug, info};
10
11use crate::data::datatable::DataTable;
12use crate::data::stream_loader::{load_csv_from_reader, load_json_from_reader};
13use crate::sql::parser::ast::{DataFormat, HttpMethod, WebCTESpec};
14
15#[cfg(feature = "redis-cache")]
16use crate::redis_cache_module::RedisCache;
17
18/// Fetches data from a URL and converts it to a DataTable
19pub struct WebDataFetcher {
20    client: reqwest::blocking::Client,
21}
22
23impl WebDataFetcher {
24    pub fn new() -> Result<Self> {
25        let client = reqwest::blocking::Client::builder()
26            .timeout(Duration::from_secs(30))
27            .user_agent("sql-cli/1.0")
28            .build()?;
29
30        Ok(Self { client })
31    }
32
33    /// Fetch data from a WEB CTE specification (supports http://, https://, and file:// URLs)
34    /// Each WEB CTE is cached independently based only on its own properties
35    pub fn fetch(
36        &self,
37        spec: &WebCTESpec,
38        table_name: &str,
39        _query_context: Option<&str>, // Kept for API compatibility but not used for caching
40    ) -> Result<DataTable> {
41        // Check if this is a file:// URL (no caching for local files)
42        if spec.url.starts_with("file://") {
43            return self.fetch_file(spec, table_name);
44        }
45
46        // Generate cache key from ALL Web CTE spec fields
47        // Each WEB CTE is independent - cache key depends only on the CTE itself
48        #[cfg(feature = "redis-cache")]
49        let cache_key = {
50            let method = format!("{:?}", spec.method.as_ref().unwrap_or(&HttpMethod::GET));
51
52            // Use the full cache key generation with all WebCTESpec fields
53            RedisCache::generate_key_full(
54                table_name, // CTE name
55                &spec.url,
56                Some(&method),
57                &spec.headers,
58                spec.body.as_deref(),
59                "",                        // Empty context - not used for independent caching
60                spec.json_path.as_deref(), // JSON extraction path
61                &spec.form_files,          // Multipart form files
62                &spec.form_fields,         // Multipart form fields
63            )
64        };
65
66        // Try cache first
67        #[cfg(feature = "redis-cache")]
68        {
69            let mut cache = RedisCache::new();
70            if cache.is_enabled() {
71                if let Some(cached_bytes) = cache.get(&cache_key) {
72                    // Try to deserialize from cached parquet
73                    match DataTable::from_parquet_bytes(&cached_bytes) {
74                        Ok(table) => {
75                            eprintln!(
76                                "Cache HIT for {} (key: {}...)",
77                                table_name,
78                                &cache_key[0..48.min(cache_key.len())]
79                            );
80                            return Ok(table);
81                        }
82                        Err(e) => {
83                            debug!("Failed to deserialize cached data: {}", e);
84                            // Continue to fetch from network
85                        }
86                    }
87                } else {
88                    eprintln!(
89                        "Cache MISS for {} (key: {}...)",
90                        table_name,
91                        &cache_key[0..48.min(cache_key.len())]
92                    );
93                }
94            }
95        }
96
97        info!("Fetching data from URL: {}", spec.url);
98
99        // Regular HTTP/HTTPS handling
100        // Build request based on method
101        let mut request = match spec.method.as_ref().unwrap_or(&HttpMethod::GET) {
102            HttpMethod::GET => self.client.get(&spec.url),
103            HttpMethod::POST => self.client.post(&spec.url),
104            HttpMethod::PUT => self.client.put(&spec.url),
105            HttpMethod::DELETE => self.client.delete(&spec.url),
106            HttpMethod::PATCH => self.client.patch(&spec.url),
107        };
108
109        // Add headers if provided
110        for (key, value) in &spec.headers {
111            let resolved_value = self.resolve_env_var(value)?;
112            request = request.header(key, resolved_value);
113        }
114
115        // Handle multipart form data if form_files are specified
116        if !spec.form_files.is_empty() || !spec.form_fields.is_empty() {
117            let mut form = reqwest::blocking::multipart::Form::new();
118
119            // Add files
120            for (field_name, file_path) in &spec.form_files {
121                let resolved_path = self.resolve_env_var(file_path)?;
122                let file = std::fs::File::open(&resolved_path)
123                    .with_context(|| format!("Failed to open file: {}", resolved_path))?;
124                let file_name = std::path::Path::new(&resolved_path)
125                    .file_name()
126                    .and_then(|n| n.to_str())
127                    .unwrap_or("file")
128                    .to_string();
129                let part = reqwest::blocking::multipart::Part::reader(file).file_name(file_name);
130                form = form.part(field_name.clone(), part);
131            }
132
133            // Add regular form fields
134            for (field_name, value) in &spec.form_fields {
135                let resolved_value = self.resolve_env_var(value)?;
136                form = form.text(field_name.clone(), resolved_value);
137            }
138
139            request = request.multipart(form);
140        }
141        // Add body if provided (typically for POST/PUT/PATCH) - only if not using multipart
142        else if let Some(body) = &spec.body {
143            let resolved_body = self.resolve_env_var(body)?;
144
145            // Debug: Always print the expanded body to help verify template expansion
146            eprintln!("\n=== WEB CTE Request Debug ===");
147            eprintln!("URL: {}", spec.url);
148            eprintln!(
149                "Method: {:?}",
150                spec.method.as_ref().unwrap_or(&HttpMethod::POST)
151            );
152            eprintln!("Body (after template expansion):");
153            eprintln!("{}", resolved_body);
154            eprintln!("=============================\n");
155
156            request = request.body(resolved_body);
157            // Set Content-Type to JSON if not already set and body looks like JSON
158            if spec.body.as_ref().unwrap().trim().starts_with('{') {
159                request = request.header("Content-Type", "application/json");
160            }
161        }
162
163        // Execute request
164        let response = request
165            .send()
166            .with_context(|| format!("Failed to fetch from URL: {}", spec.url))?;
167
168        // Check status
169        if !response.status().is_success() {
170            return Err(anyhow::anyhow!(
171                "HTTP request failed with status {}: {}",
172                response.status(),
173                spec.url
174            ));
175        }
176
177        // Get content type for format detection
178        let content_type = response
179            .headers()
180            .get("content-type")
181            .and_then(|v| v.to_str().ok())
182            .unwrap_or("")
183            .to_string();
184
185        debug!("Response content-type: {}", content_type);
186
187        // Read response body
188        let bytes = response.bytes()?;
189
190        // Determine format
191        let format = match &spec.format {
192            Some(fmt) => fmt.clone(),
193            None => self.detect_format(&spec.url, &content_type),
194        };
195
196        info!("Using format: {:?} for {}", format, spec.url);
197
198        // Parse the data based on format
199        let result = if let Some(json_path) = &spec.json_path {
200            if matches!(format, DataFormat::JSON | DataFormat::Auto) {
201                // Parse JSON and extract the path
202                let json_value: serde_json::Value = serde_json::from_slice(&bytes)
203                    .with_context(|| "Failed to parse JSON for path extraction")?;
204
205                // Navigate to the specified path
206                let extracted = self
207                    .navigate_json_path(&json_value, json_path)
208                    .with_context(|| format!("Failed to extract JSON path: {}", json_path))?;
209
210                // Convert extracted value to bytes and parse as table
211                let array_value = match extracted {
212                    serde_json::Value::Array(_) => extracted.clone(),
213                    _ => serde_json::Value::Array(vec![extracted.clone()]),
214                };
215
216                let extracted_bytes = serde_json::to_vec(&array_value)?;
217                self.parse_data(
218                    extracted_bytes,
219                    DataFormat::JSON,
220                    table_name,
221                    "web",
222                    &spec.url,
223                )?
224            } else {
225                self.parse_data(bytes.to_vec(), format, table_name, "web", &spec.url)?
226            }
227        } else {
228            self.parse_data(bytes.to_vec(), format, table_name, "web", &spec.url)?
229        };
230
231        // Cache the result if caching is enabled
232        #[cfg(feature = "redis-cache")]
233        {
234            let mut cache = RedisCache::new();
235            if cache.is_enabled() {
236                // Determine TTL based on spec or smart defaults
237                let ttl = spec.cache_seconds.unwrap_or_else(|| {
238                    // Smart defaults based on URL and body
239                    if spec.url.contains("prod") {
240                        3600 // 1 hour for production
241                    } else if spec.url.contains("staging") {
242                        300 // 5 minutes for staging
243                    } else {
244                        600 // 10 minutes default
245                    }
246                });
247
248                // Serialize to parquet and cache
249                match result.to_parquet_bytes() {
250                    Ok(parquet_bytes) => {
251                        if let Err(e) = cache.set(&cache_key, &parquet_bytes, ttl) {
252                            debug!("Failed to cache result: {}", e);
253                        } else {
254                            eprintln!("Cached {} for {} seconds", table_name, ttl);
255                        }
256                    }
257                    Err(e) => {
258                        debug!("Failed to serialize to parquet: {}", e);
259                    }
260                }
261            }
262        }
263
264        Ok(result)
265    }
266
267    /// Fetch data from a file:// URL
268    fn fetch_file(&self, spec: &WebCTESpec, table_name: &str) -> Result<DataTable> {
269        // Extract path from file:// URL
270        let file_path = if spec.url.starts_with("file://") {
271            &spec.url[7..] // Remove "file://" prefix
272        } else {
273            &spec.url
274        };
275
276        info!("Reading local file: {}", file_path);
277
278        // Check if file exists
279        let path = Path::new(file_path);
280        if !path.exists() {
281            return Err(anyhow::anyhow!("File not found: {}", file_path));
282        }
283
284        // Open file
285        let file =
286            File::open(path).with_context(|| format!("Failed to open file: {}", file_path))?;
287
288        // Get file size for memory checks
289        let metadata = file.metadata()?;
290        let file_size = metadata.len();
291        debug!("File size: {} bytes", file_size);
292
293        // Determine format from extension or spec
294        let format = match &spec.format {
295            Some(fmt) => fmt.clone(),
296            None => self.detect_format(file_path, ""),
297        };
298
299        info!("Using format: {:?} for {}", format, file_path);
300
301        // Parse based on format
302        match format {
303            DataFormat::CSV => load_csv_from_reader(file, table_name, "file", file_path)
304                .with_context(|| format!("Failed to parse CSV from {}", file_path)),
305            DataFormat::JSON => load_json_from_reader(file, table_name, "file", file_path)
306                .with_context(|| format!("Failed to parse JSON from {}", file_path)),
307            DataFormat::Auto => {
308                // For files, we can't retry with the same reader, so determine based on extension
309                if file_path.ends_with(".json") {
310                    let file = File::open(path)?;
311                    load_json_from_reader(file, table_name, "file", file_path)
312                        .with_context(|| format!("Failed to parse JSON from {}", file_path))
313                } else {
314                    // Default to CSV for auto-detect with files
315                    let file = File::open(path)?;
316                    load_csv_from_reader(file, table_name, "file", file_path)
317                        .with_context(|| format!("Failed to parse CSV from {}", file_path))
318                }
319            }
320        }
321    }
322
323    /// Parse data bytes based on format
324    fn parse_data(
325        &self,
326        bytes: Vec<u8>,
327        format: DataFormat,
328        table_name: &str,
329        source_type: &str,
330        source_path: &str,
331    ) -> Result<DataTable> {
332        match format {
333            DataFormat::CSV => {
334                let reader = Cursor::new(bytes);
335                load_csv_from_reader(reader, table_name, source_type, source_path)
336                    .with_context(|| format!("Failed to parse CSV from {}", source_path))
337            }
338            DataFormat::JSON => {
339                let reader = Cursor::new(bytes);
340                load_json_from_reader(reader, table_name, source_type, source_path)
341                    .with_context(|| format!("Failed to parse JSON from {}", source_path))
342            }
343            DataFormat::Auto => {
344                // Try CSV first, then JSON
345                let reader_csv = Cursor::new(bytes.clone());
346                match load_csv_from_reader(reader_csv, table_name, source_type, source_path) {
347                    Ok(table) => Ok(table),
348                    Err(_) => {
349                        debug!("CSV parsing failed, trying JSON");
350                        let reader_json = Cursor::new(bytes);
351                        load_json_from_reader(reader_json, table_name, source_type, source_path)
352                            .with_context(|| format!("Failed to parse data from {}", source_path))
353                    }
354                }
355            }
356        }
357    }
358
359    /// Detect format from URL extension or content type
360    fn detect_format(&self, url: &str, content_type: &str) -> DataFormat {
361        // Check content type first
362        if content_type.contains("json") {
363            return DataFormat::JSON;
364        }
365        if content_type.contains("csv") || content_type.contains("text/plain") {
366            return DataFormat::CSV;
367        }
368
369        // Check URL extension
370        if url.ends_with(".json") {
371            DataFormat::JSON
372        } else if url.ends_with(".csv") {
373            DataFormat::CSV
374        } else {
375            // Default to auto-detect
376            DataFormat::Auto
377        }
378    }
379
380    /// Extract data from a specific JSON path
381    fn extract_json_path(
382        &self,
383        _table: DataTable,
384        json_path: &str,
385        bytes: &[u8],
386    ) -> Result<DataTable> {
387        // Parse the JSON
388        let json_value: serde_json::Value = serde_json::from_slice(bytes)
389            .with_context(|| "Failed to parse JSON for path extraction")?;
390
391        // Navigate to the specified path
392        let extracted = self
393            .navigate_json_path(&json_value, json_path)
394            .with_context(|| format!("Failed to extract JSON path: {}", json_path))?;
395
396        // If the extracted value is already an array, use it directly
397        // Otherwise, wrap it in an array for consistent handling
398        let array_value = match extracted {
399            serde_json::Value::Array(_) => extracted.clone(),
400            _ => serde_json::Value::Array(vec![extracted.clone()]),
401        };
402
403        // Convert to bytes and parse as a table
404        let extracted_bytes = serde_json::to_vec(&array_value)?;
405
406        // Re-parse the extracted JSON as a DataTable
407        let reader = Cursor::new(extracted_bytes);
408        load_json_from_reader(reader, "extracted", "web", json_path)
409    }
410
411    /// Navigate to a specific path in JSON structure
412    fn navigate_json_path<'a>(
413        &self,
414        value: &'a serde_json::Value,
415        path: &str,
416    ) -> Result<&'a serde_json::Value> {
417        let mut current = value;
418
419        // Split path by dots (simple path navigation for now)
420        // Future enhancement: support array indexing like "Result[0]"
421        for part in path.split('.') {
422            if part.is_empty() {
423                continue;
424            }
425
426            current = current
427                .get(part)
428                .ok_or_else(|| anyhow::anyhow!("Path '{}' not found in JSON", part))?;
429        }
430
431        Ok(current)
432    }
433
434    /// Resolve environment variables in values (${VAR_NAME} or $VAR_NAME syntax)
435    fn resolve_env_var(&self, value: &str) -> Result<String> {
436        let mut result = value.to_string();
437
438        // Handle ${VAR} syntax - can be embedded in strings
439        // Use lazy_static for better performance, but for now just compile inline
440        let re = Regex::new(r"\$\{([^}]+)\}").unwrap();
441        for cap in re.captures_iter(value) {
442            let var_name = &cap[1];
443            match std::env::var(var_name) {
444                Ok(var_value) => {
445                    result = result.replace(&cap[0], &var_value);
446                }
447                Err(_) => {
448                    // For security, don't expose which env vars exist
449                    // Just log a debug message and keep the placeholder
450                    debug!(
451                        "Environment variable {} not found, keeping placeholder",
452                        var_name
453                    );
454                }
455            }
456        }
457
458        // Also handle simple $VAR syntax at the start of the string
459        if result.starts_with('$') && !result.starts_with("${") {
460            let var_name = &result[1..];
461            if let Ok(var_value) = std::env::var(var_name) {
462                return Ok(var_value);
463            }
464        }
465
466        Ok(result)
467    }
468}
469
#[cfg(test)]
mod tests {
    use super::*;

    /// Name of an environment variable that is never expected to be set.
    const UNSET_VAR_PLACEHOLDER: &str = "${SQL_CLI_TEST_SURELY_UNSET_VARIABLE_4F2A}";

    fn new_fetcher() -> WebDataFetcher {
        WebDataFetcher::new().expect("HTTP client should build")
    }

    #[test]
    fn test_detect_format() {
        let fetcher = new_fetcher();

        // URL-extension detection
        assert!(matches!(
            fetcher.detect_format("http://example.com/data.csv", ""),
            DataFormat::CSV
        ));
        assert!(matches!(
            fetcher.detect_format("http://example.com/data.json", ""),
            DataFormat::JSON
        ));

        // Content-type detection
        assert!(matches!(
            fetcher.detect_format("http://example.com/data", "application/json"),
            DataFormat::JSON
        ));
        assert!(matches!(
            fetcher.detect_format("http://example.com/data", "text/csv"),
            DataFormat::CSV
        ));
        assert!(matches!(
            fetcher.detect_format("http://example.com/data", "text/plain"),
            DataFormat::CSV
        ));

        // Auto-detect fallback
        assert!(matches!(
            fetcher.detect_format("http://example.com/data", ""),
            DataFormat::Auto
        ));
    }

    #[test]
    fn test_detect_format_prefers_content_type_over_extension() {
        let fetcher = new_fetcher();

        assert!(matches!(
            fetcher.detect_format("http://example.com/data.csv", "application/json"),
            DataFormat::JSON
        ));
        assert!(matches!(
            fetcher.detect_format("http://example.com/data.json", "text/csv"),
            DataFormat::CSV
        ));
    }

    #[test]
    fn test_navigate_json_path() {
        let fetcher = new_fetcher();
        let doc = serde_json::json!({ "Result": { "rows": [1, 2, 3] } });

        let rows = fetcher.navigate_json_path(&doc, "Result.rows").unwrap();
        assert_eq!(rows, &serde_json::json!([1, 2, 3]));

        // Empty segments (leading / doubled dots) are skipped.
        assert_eq!(
            fetcher.navigate_json_path(&doc, ".Result..rows").unwrap(),
            rows
        );

        // An empty path returns the whole document.
        assert_eq!(fetcher.navigate_json_path(&doc, "").unwrap(), &doc);

        // Missing keys are reported as errors.
        assert!(fetcher.navigate_json_path(&doc, "Result.missing").is_err());
    }

    #[test]
    fn test_resolve_env_var_passthrough() {
        let fetcher = new_fetcher();

        // Text without placeholders is returned unchanged.
        assert_eq!(fetcher.resolve_env_var("plain value").unwrap(), "plain value");

        // Unknown variables keep their placeholder, standalone or embedded.
        assert_eq!(
            fetcher.resolve_env_var(UNSET_VAR_PLACEHOLDER).unwrap(),
            UNSET_VAR_PLACEHOLDER
        );
        let embedded = format!("Bearer {}", UNSET_VAR_PLACEHOLDER);
        assert_eq!(fetcher.resolve_env_var(&embedded).unwrap(), embedded);
    }
}