Skip to main content

sql_cli/web/
http_fetcher.rs

1// HTTP fetcher for WEB CTEs (now supports file:// URLs too)
2use anyhow::{Context, Result};
3use regex::Regex;
4use serde_json;
5use std::fs::File;
6use std::io::Cursor;
7use std::path::Path;
8use std::time::Duration;
9use tracing::{debug, info};
10
11use crate::data::datatable::DataTable;
12use crate::data::stream_loader::{
13    load_csv_from_reader_with_opts, load_json_from_reader, navigate_json_path, CsvReadOptions,
14};
15use crate::sql::parser::ast::{DataFormat, HttpMethod, WebCTESpec};
16
17#[cfg(feature = "redis-cache")]
18use crate::redis_cache_module::RedisCache;
19
20/// Fetches data from a URL and converts it to a DataTable
21pub struct WebDataFetcher {
22    client: reqwest::blocking::Client,
23}
24
25impl WebDataFetcher {
26    pub fn new() -> Result<Self> {
27        let client = reqwest::blocking::Client::builder()
28            .timeout(Duration::from_secs(30))
29            .user_agent("sql-cli/1.0")
30            .build()?;
31
32        Ok(Self { client })
33    }
34
35    /// Fetch data from a WEB CTE specification (supports http://, https://, and file:// URLs)
36    /// Each WEB CTE is cached independently based only on its own properties
37    pub fn fetch(
38        &self,
39        spec: &WebCTESpec,
40        table_name: &str,
41        _query_context: Option<&str>, // Kept for API compatibility but not used for caching
42    ) -> Result<DataTable> {
43        // Check if this is a file:// URL (no caching for local files)
44        if spec.url.starts_with("file://") {
45            return self.fetch_file(spec, table_name);
46        }
47
48        // Generate cache key from ALL Web CTE spec fields
49        // Each WEB CTE is independent - cache key depends only on the CTE itself
50        #[cfg(feature = "redis-cache")]
51        let cache_key = {
52            let method = format!("{:?}", spec.method.as_ref().unwrap_or(&HttpMethod::GET));
53
54            // Use the full cache key generation with all WebCTESpec fields
55            RedisCache::generate_key_full(
56                table_name, // CTE name
57                &spec.url,
58                Some(&method),
59                &spec.headers,
60                spec.body.as_deref(),
61                "",                        // Empty context - not used for independent caching
62                spec.json_path.as_deref(), // JSON extraction path
63                &spec.form_files,          // Multipart form files
64                &spec.form_fields,         // Multipart form fields
65            )
66        };
67
68        // Try cache first
69        #[cfg(feature = "redis-cache")]
70        {
71            let mut cache = RedisCache::new();
72            if cache.is_enabled() {
73                if let Some(cached_bytes) = cache.get(&cache_key) {
74                    // Try to deserialize from cached parquet
75                    match DataTable::from_parquet_bytes(&cached_bytes) {
76                        Ok(table) => {
77                            eprintln!(
78                                "Cache HIT for {} (key: {}...)",
79                                table_name,
80                                &cache_key[0..48.min(cache_key.len())]
81                            );
82                            return Ok(table);
83                        }
84                        Err(e) => {
85                            debug!("Failed to deserialize cached data: {}", e);
86                            // Continue to fetch from network
87                        }
88                    }
89                } else {
90                    eprintln!(
91                        "Cache MISS for {} (key: {}...)",
92                        table_name,
93                        &cache_key[0..48.min(cache_key.len())]
94                    );
95                }
96            }
97        }
98
99        info!("Fetching data from URL: {}", spec.url);
100
101        // Regular HTTP/HTTPS handling
102        // Build request based on method
103        let mut request = match spec.method.as_ref().unwrap_or(&HttpMethod::GET) {
104            HttpMethod::GET => self.client.get(&spec.url),
105            HttpMethod::POST => self.client.post(&spec.url),
106            HttpMethod::PUT => self.client.put(&spec.url),
107            HttpMethod::DELETE => self.client.delete(&spec.url),
108            HttpMethod::PATCH => self.client.patch(&spec.url),
109        };
110
111        // Add headers if provided
112        for (key, value) in &spec.headers {
113            let resolved_value = self.resolve_env_var(value)?;
114            request = request.header(key, resolved_value);
115        }
116
117        // Handle multipart form data if form_files are specified
118        if !spec.form_files.is_empty() || !spec.form_fields.is_empty() {
119            let mut form = reqwest::blocking::multipart::Form::new();
120
121            // Add files
122            for (field_name, file_path) in &spec.form_files {
123                let resolved_path = self.resolve_env_var(file_path)?;
124                let file = std::fs::File::open(&resolved_path)
125                    .with_context(|| format!("Failed to open file: {}", resolved_path))?;
126                let file_name = std::path::Path::new(&resolved_path)
127                    .file_name()
128                    .and_then(|n| n.to_str())
129                    .unwrap_or("file")
130                    .to_string();
131                let part = reqwest::blocking::multipart::Part::reader(file).file_name(file_name);
132                form = form.part(field_name.clone(), part);
133            }
134
135            // Add regular form fields
136            for (field_name, value) in &spec.form_fields {
137                let resolved_value = self.resolve_env_var(value)?;
138                form = form.text(field_name.clone(), resolved_value);
139            }
140
141            request = request.multipart(form);
142        }
143        // Add body if provided (typically for POST/PUT/PATCH) - only if not using multipart
144        else if let Some(body) = &spec.body {
145            let resolved_body = self.resolve_env_var(body)?;
146
147            // Debug: Always print the expanded body to help verify template expansion
148            eprintln!("\n=== WEB CTE Request Debug ===");
149            eprintln!("URL: {}", spec.url);
150            eprintln!(
151                "Method: {:?}",
152                spec.method.as_ref().unwrap_or(&HttpMethod::POST)
153            );
154            eprintln!("Body (after template expansion):");
155            eprintln!("{}", resolved_body);
156            eprintln!("=============================\n");
157
158            request = request.body(resolved_body);
159            // Set Content-Type to JSON only when the user didn't set one themselves
160            // and the body looks like JSON. ES 8.x and other strict servers reject
161            // requests with two Content-Type headers.
162            let has_content_type = spec
163                .headers
164                .iter()
165                .any(|(k, _)| k.eq_ignore_ascii_case("content-type"));
166            if !has_content_type && spec.body.as_ref().unwrap().trim().starts_with('{') {
167                request = request.header("Content-Type", "application/json");
168            }
169        }
170
171        // Execute request
172        let response = request
173            .send()
174            .with_context(|| format!("Failed to fetch from URL: {}", spec.url))?;
175
176        // Check status
177        if !response.status().is_success() {
178            let status = response.status();
179            // Drain the body so callers see the server's diagnostic (ES, GraphQL,
180            // etc. put the real "why" here). Truncate so a giant HTML page can't
181            // flood the terminal.
182            let body = response.text().unwrap_or_default();
183            let snippet: String = body.chars().take(2000).collect();
184            return Err(anyhow::anyhow!(
185                "HTTP request failed with status {}: {}\nResponse body: {}",
186                status,
187                spec.url,
188                snippet
189            ));
190        }
191
192        // Get content type for format detection
193        let content_type = response
194            .headers()
195            .get("content-type")
196            .and_then(|v| v.to_str().ok())
197            .unwrap_or("")
198            .to_string();
199
200        debug!("Response content-type: {}", content_type);
201
202        // Read response body
203        let bytes = response.bytes()?;
204
205        // Determine format
206        let format = match &spec.format {
207            Some(fmt) => fmt.clone(),
208            None => self.detect_format(&spec.url, &content_type),
209        };
210
211        info!("Using format: {:?} for {}", format, spec.url);
212
213        // Parse the data based on format
214        let result = if let Some(json_path) = &spec.json_path {
215            if matches!(format, DataFormat::JSON | DataFormat::Auto) {
216                // Parse JSON and extract the path
217                let json_value: serde_json::Value = serde_json::from_slice(&bytes)
218                    .with_context(|| "Failed to parse JSON for path extraction")?;
219
220                // Navigate to the specified path
221                let extracted = navigate_json_path(&json_value, json_path)
222                    .with_context(|| format!("Failed to extract JSON path: {}", json_path))?;
223
224                // Convert extracted value to bytes and parse as table
225                let array_value = match extracted {
226                    serde_json::Value::Array(_) => extracted,
227                    other => serde_json::Value::Array(vec![other]),
228                };
229
230                let extracted_bytes = serde_json::to_vec(&array_value)?;
231                // JSON path: delimiter is irrelevant (output is JSON), but the
232                // arg is still required by the function signature.
233                self.parse_data(
234                    extracted_bytes,
235                    DataFormat::JSON,
236                    table_name,
237                    "web",
238                    &spec.url,
239                    spec.delimiter,
240                )?
241            } else {
242                self.parse_data(
243                    bytes.to_vec(),
244                    format,
245                    table_name,
246                    "web",
247                    &spec.url,
248                    spec.delimiter,
249                )?
250            }
251        } else {
252            self.parse_data(
253                bytes.to_vec(),
254                format,
255                table_name,
256                "web",
257                &spec.url,
258                spec.delimiter,
259            )?
260        };
261
262        // Cache the result if caching is enabled
263        #[cfg(feature = "redis-cache")]
264        {
265            let mut cache = RedisCache::new();
266            if cache.is_enabled() {
267                // Determine TTL based on spec or smart defaults
268                let ttl = spec.cache_seconds.unwrap_or_else(|| {
269                    // Smart defaults based on URL and body
270                    if spec.url.contains("prod") {
271                        3600 // 1 hour for production
272                    } else if spec.url.contains("staging") {
273                        300 // 5 minutes for staging
274                    } else {
275                        600 // 10 minutes default
276                    }
277                });
278
279                // Serialize to parquet and cache
280                match result.to_parquet_bytes() {
281                    Ok(parquet_bytes) => {
282                        if let Err(e) = cache.set(&cache_key, &parquet_bytes, ttl) {
283                            debug!("Failed to cache result: {}", e);
284                        } else {
285                            eprintln!("Cached {} for {} seconds", table_name, ttl);
286                        }
287                    }
288                    Err(e) => {
289                        debug!("Failed to serialize to parquet: {}", e);
290                    }
291                }
292            }
293        }
294
295        Ok(result)
296    }
297
298    /// Fetch data from a file:// URL
299    fn fetch_file(&self, spec: &WebCTESpec, table_name: &str) -> Result<DataTable> {
300        // Extract path from file:// URL
301        let file_path = if spec.url.starts_with("file://") {
302            &spec.url[7..] // Remove "file://" prefix
303        } else {
304            &spec.url
305        };
306
307        info!("Reading local file: {}", file_path);
308
309        // Check if file exists
310        let path = Path::new(file_path);
311        if !path.exists() {
312            return Err(anyhow::anyhow!("File not found: {}", file_path));
313        }
314
315        // Open file
316        let file =
317            File::open(path).with_context(|| format!("Failed to open file: {}", file_path))?;
318
319        // Get file size for memory checks
320        let metadata = file.metadata()?;
321        let file_size = metadata.len();
322        debug!("File size: {} bytes", file_size);
323
324        // Determine format from extension or spec
325        let format = match &spec.format {
326            Some(fmt) => fmt.clone(),
327            None => self.detect_format(file_path, ""),
328        };
329
330        info!("Using format: {:?} for {}", format, file_path);
331
332        // Build CSV options once; DELIMITER clause wins, else comma. URL paths
333        // are unreliable for extension auto-detect (query strings, redirects),
334        // so we don't probe them here.
335        let csv_opts = CsvReadOptions {
336            delimiter: spec.delimiter.unwrap_or(b','),
337            has_headers: true,
338        };
339
340        // Parse based on format
341        match format {
342            DataFormat::CSV => {
343                load_csv_from_reader_with_opts(file, table_name, "file", file_path, &csv_opts)
344                    .with_context(|| format!("Failed to parse CSV from {}", file_path))
345            }
346            DataFormat::JSON => load_json_from_reader(file, table_name, "file", file_path)
347                .with_context(|| format!("Failed to parse JSON from {}", file_path)),
348            DataFormat::Auto => {
349                // For files, we can't retry with the same reader, so determine based on extension
350                if file_path.ends_with(".json") {
351                    let file = File::open(path)?;
352                    load_json_from_reader(file, table_name, "file", file_path)
353                        .with_context(|| format!("Failed to parse JSON from {}", file_path))
354                } else {
355                    // Default to CSV for auto-detect with files
356                    let file = File::open(path)?;
357                    load_csv_from_reader_with_opts(file, table_name, "file", file_path, &csv_opts)
358                        .with_context(|| format!("Failed to parse CSV from {}", file_path))
359                }
360            }
361        }
362    }
363
364    /// Parse data bytes based on format. `delimiter` is honoured for CSV (and
365    /// Auto when it falls back to CSV); `None` means comma.
366    fn parse_data(
367        &self,
368        bytes: Vec<u8>,
369        format: DataFormat,
370        table_name: &str,
371        source_type: &str,
372        source_path: &str,
373        delimiter: Option<u8>,
374    ) -> Result<DataTable> {
375        let csv_opts = CsvReadOptions {
376            delimiter: delimiter.unwrap_or(b','),
377            has_headers: true,
378        };
379        match format {
380            DataFormat::CSV => {
381                let reader = Cursor::new(bytes);
382                load_csv_from_reader_with_opts(
383                    reader,
384                    table_name,
385                    source_type,
386                    source_path,
387                    &csv_opts,
388                )
389                .with_context(|| format!("Failed to parse CSV from {}", source_path))
390            }
391            DataFormat::JSON => {
392                let reader = Cursor::new(bytes);
393                load_json_from_reader(reader, table_name, source_type, source_path)
394                    .with_context(|| format!("Failed to parse JSON from {}", source_path))
395            }
396            DataFormat::Auto => {
397                // Try CSV first, then JSON
398                let reader_csv = Cursor::new(bytes.clone());
399                match load_csv_from_reader_with_opts(
400                    reader_csv,
401                    table_name,
402                    source_type,
403                    source_path,
404                    &csv_opts,
405                ) {
406                    Ok(table) => Ok(table),
407                    Err(_) => {
408                        debug!("CSV parsing failed, trying JSON");
409                        let reader_json = Cursor::new(bytes);
410                        load_json_from_reader(reader_json, table_name, source_type, source_path)
411                            .with_context(|| format!("Failed to parse data from {}", source_path))
412                    }
413                }
414            }
415        }
416    }
417
418    /// Detect format from URL extension or content type
419    fn detect_format(&self, url: &str, content_type: &str) -> DataFormat {
420        // Check content type first
421        if content_type.contains("json") {
422            return DataFormat::JSON;
423        }
424        if content_type.contains("csv") || content_type.contains("text/plain") {
425            return DataFormat::CSV;
426        }
427
428        // Check URL extension
429        if url.ends_with(".json") {
430            DataFormat::JSON
431        } else if url.ends_with(".csv") {
432            DataFormat::CSV
433        } else {
434            // Default to auto-detect
435            DataFormat::Auto
436        }
437    }
438
439    /// Extract data from a specific JSON path
440    fn extract_json_path(
441        &self,
442        _table: DataTable,
443        json_path: &str,
444        bytes: &[u8],
445    ) -> Result<DataTable> {
446        // Parse the JSON
447        let json_value: serde_json::Value = serde_json::from_slice(bytes)
448            .with_context(|| "Failed to parse JSON for path extraction")?;
449
450        // Navigate to the specified path
451        let extracted = navigate_json_path(&json_value, json_path)
452            .with_context(|| format!("Failed to extract JSON path: {}", json_path))?;
453
454        // If the extracted value is already an array, use it directly
455        // Otherwise, wrap it in an array for consistent handling
456        let array_value = match extracted {
457            serde_json::Value::Array(_) => extracted.clone(),
458            _ => serde_json::Value::Array(vec![extracted.clone()]),
459        };
460
461        // Convert to bytes and parse as a table
462        let extracted_bytes = serde_json::to_vec(&array_value)?;
463
464        // Re-parse the extracted JSON as a DataTable
465        let reader = Cursor::new(extracted_bytes);
466        load_json_from_reader(reader, "extracted", "web", json_path)
467    }
468
469    /// Resolve environment variables in values (${VAR_NAME} or $VAR_NAME syntax)
470    fn resolve_env_var(&self, value: &str) -> Result<String> {
471        let mut result = value.to_string();
472
473        // Handle ${VAR} syntax - can be embedded in strings
474        // Use lazy_static for better performance, but for now just compile inline
475        let re = Regex::new(r"\$\{([^}]+)\}").unwrap();
476        for cap in re.captures_iter(value) {
477            let var_name = &cap[1];
478            match std::env::var(var_name) {
479                Ok(var_value) => {
480                    result = result.replace(&cap[0], &var_value);
481                }
482                Err(_) => {
483                    // For security, don't expose which env vars exist
484                    // Just log a debug message and keep the placeholder
485                    debug!(
486                        "Environment variable {} not found, keeping placeholder",
487                        var_name
488                    );
489                }
490            }
491        }
492
493        // Also handle simple $VAR syntax at the start of the string
494        if result.starts_with('$') && !result.starts_with("${") {
495            let var_name = &result[1..];
496            if let Ok(var_value) = std::env::var(var_name) {
497                return Ok(var_value);
498            }
499        }
500
501        Ok(result)
502    }
503}
504
#[cfg(test)]
mod tests {
    use super::*;

    /// Short label for a `DataFormat`, so expectations can be tabulated.
    fn format_label(format: &DataFormat) -> &'static str {
        match format {
            DataFormat::CSV => "csv",
            DataFormat::JSON => "json",
            DataFormat::Auto => "auto",
        }
    }

    #[test]
    fn test_detect_format() {
        let fetcher = WebDataFetcher::new().unwrap();

        // (url, content type, expected format)
        let cases = [
            // The URL extension decides when no content type is given
            ("http://example.com/data.csv", "", "csv"),
            ("http://example.com/data.json", "", "json"),
            // The content type decides for extension-less URLs
            ("http://example.com/data", "application/json", "json"),
            ("http://example.com/data", "text/csv", "csv"),
            // Nothing to go on: fall back to auto-detection
            ("http://example.com/data", "", "auto"),
        ];

        for (url, content_type, expected) in cases {
            let detected = fetcher.detect_format(url, content_type);
            assert_eq!(
                format_label(&detected),
                expected,
                "url={:?} content_type={:?}",
                url,
                content_type
            );
        }
    }
}