Skip to main content

sql_cli/web/
http_fetcher.rs

1// HTTP fetcher for WEB CTEs (now supports file:// URLs too)
2use anyhow::{Context, Result};
3use regex::Regex;
4use serde_json;
5use std::fs::File;
6use std::io::Cursor;
7use std::path::Path;
8use std::time::Duration;
9use tracing::{debug, info};
10
11use crate::data::datatable::DataTable;
12use crate::data::stream_loader::{
13    load_csv_from_reader_with_opts, load_json_from_reader, CsvReadOptions,
14};
15use crate::sql::parser::ast::{DataFormat, HttpMethod, WebCTESpec};
16
17#[cfg(feature = "redis-cache")]
18use crate::redis_cache_module::RedisCache;
19
20/// Fetches data from a URL and converts it to a DataTable
21pub struct WebDataFetcher {
22    client: reqwest::blocking::Client,
23}
24
25impl WebDataFetcher {
26    pub fn new() -> Result<Self> {
27        let client = reqwest::blocking::Client::builder()
28            .timeout(Duration::from_secs(30))
29            .user_agent("sql-cli/1.0")
30            .build()?;
31
32        Ok(Self { client })
33    }
34
35    /// Fetch data from a WEB CTE specification (supports http://, https://, and file:// URLs)
36    /// Each WEB CTE is cached independently based only on its own properties
37    pub fn fetch(
38        &self,
39        spec: &WebCTESpec,
40        table_name: &str,
41        _query_context: Option<&str>, // Kept for API compatibility but not used for caching
42    ) -> Result<DataTable> {
43        // Check if this is a file:// URL (no caching for local files)
44        if spec.url.starts_with("file://") {
45            return self.fetch_file(spec, table_name);
46        }
47
48        // Generate cache key from ALL Web CTE spec fields
49        // Each WEB CTE is independent - cache key depends only on the CTE itself
50        #[cfg(feature = "redis-cache")]
51        let cache_key = {
52            let method = format!("{:?}", spec.method.as_ref().unwrap_or(&HttpMethod::GET));
53
54            // Use the full cache key generation with all WebCTESpec fields
55            RedisCache::generate_key_full(
56                table_name, // CTE name
57                &spec.url,
58                Some(&method),
59                &spec.headers,
60                spec.body.as_deref(),
61                "",                        // Empty context - not used for independent caching
62                spec.json_path.as_deref(), // JSON extraction path
63                &spec.form_files,          // Multipart form files
64                &spec.form_fields,         // Multipart form fields
65            )
66        };
67
68        // Try cache first
69        #[cfg(feature = "redis-cache")]
70        {
71            let mut cache = RedisCache::new();
72            if cache.is_enabled() {
73                if let Some(cached_bytes) = cache.get(&cache_key) {
74                    // Try to deserialize from cached parquet
75                    match DataTable::from_parquet_bytes(&cached_bytes) {
76                        Ok(table) => {
77                            eprintln!(
78                                "Cache HIT for {} (key: {}...)",
79                                table_name,
80                                &cache_key[0..48.min(cache_key.len())]
81                            );
82                            return Ok(table);
83                        }
84                        Err(e) => {
85                            debug!("Failed to deserialize cached data: {}", e);
86                            // Continue to fetch from network
87                        }
88                    }
89                } else {
90                    eprintln!(
91                        "Cache MISS for {} (key: {}...)",
92                        table_name,
93                        &cache_key[0..48.min(cache_key.len())]
94                    );
95                }
96            }
97        }
98
99        info!("Fetching data from URL: {}", spec.url);
100
101        // Regular HTTP/HTTPS handling
102        // Build request based on method
103        let mut request = match spec.method.as_ref().unwrap_or(&HttpMethod::GET) {
104            HttpMethod::GET => self.client.get(&spec.url),
105            HttpMethod::POST => self.client.post(&spec.url),
106            HttpMethod::PUT => self.client.put(&spec.url),
107            HttpMethod::DELETE => self.client.delete(&spec.url),
108            HttpMethod::PATCH => self.client.patch(&spec.url),
109        };
110
111        // Add headers if provided
112        for (key, value) in &spec.headers {
113            let resolved_value = self.resolve_env_var(value)?;
114            request = request.header(key, resolved_value);
115        }
116
117        // Handle multipart form data if form_files are specified
118        if !spec.form_files.is_empty() || !spec.form_fields.is_empty() {
119            let mut form = reqwest::blocking::multipart::Form::new();
120
121            // Add files
122            for (field_name, file_path) in &spec.form_files {
123                let resolved_path = self.resolve_env_var(file_path)?;
124                let file = std::fs::File::open(&resolved_path)
125                    .with_context(|| format!("Failed to open file: {}", resolved_path))?;
126                let file_name = std::path::Path::new(&resolved_path)
127                    .file_name()
128                    .and_then(|n| n.to_str())
129                    .unwrap_or("file")
130                    .to_string();
131                let part = reqwest::blocking::multipart::Part::reader(file).file_name(file_name);
132                form = form.part(field_name.clone(), part);
133            }
134
135            // Add regular form fields
136            for (field_name, value) in &spec.form_fields {
137                let resolved_value = self.resolve_env_var(value)?;
138                form = form.text(field_name.clone(), resolved_value);
139            }
140
141            request = request.multipart(form);
142        }
143        // Add body if provided (typically for POST/PUT/PATCH) - only if not using multipart
144        else if let Some(body) = &spec.body {
145            let resolved_body = self.resolve_env_var(body)?;
146
147            // Debug: Always print the expanded body to help verify template expansion
148            eprintln!("\n=== WEB CTE Request Debug ===");
149            eprintln!("URL: {}", spec.url);
150            eprintln!(
151                "Method: {:?}",
152                spec.method.as_ref().unwrap_or(&HttpMethod::POST)
153            );
154            eprintln!("Body (after template expansion):");
155            eprintln!("{}", resolved_body);
156            eprintln!("=============================\n");
157
158            request = request.body(resolved_body);
159            // Set Content-Type to JSON if not already set and body looks like JSON
160            if spec.body.as_ref().unwrap().trim().starts_with('{') {
161                request = request.header("Content-Type", "application/json");
162            }
163        }
164
165        // Execute request
166        let response = request
167            .send()
168            .with_context(|| format!("Failed to fetch from URL: {}", spec.url))?;
169
170        // Check status
171        if !response.status().is_success() {
172            return Err(anyhow::anyhow!(
173                "HTTP request failed with status {}: {}",
174                response.status(),
175                spec.url
176            ));
177        }
178
179        // Get content type for format detection
180        let content_type = response
181            .headers()
182            .get("content-type")
183            .and_then(|v| v.to_str().ok())
184            .unwrap_or("")
185            .to_string();
186
187        debug!("Response content-type: {}", content_type);
188
189        // Read response body
190        let bytes = response.bytes()?;
191
192        // Determine format
193        let format = match &spec.format {
194            Some(fmt) => fmt.clone(),
195            None => self.detect_format(&spec.url, &content_type),
196        };
197
198        info!("Using format: {:?} for {}", format, spec.url);
199
200        // Parse the data based on format
201        let result = if let Some(json_path) = &spec.json_path {
202            if matches!(format, DataFormat::JSON | DataFormat::Auto) {
203                // Parse JSON and extract the path
204                let json_value: serde_json::Value = serde_json::from_slice(&bytes)
205                    .with_context(|| "Failed to parse JSON for path extraction")?;
206
207                // Navigate to the specified path
208                let extracted = self
209                    .navigate_json_path(&json_value, json_path)
210                    .with_context(|| format!("Failed to extract JSON path: {}", json_path))?;
211
212                // Convert extracted value to bytes and parse as table
213                let array_value = match extracted {
214                    serde_json::Value::Array(_) => extracted.clone(),
215                    _ => serde_json::Value::Array(vec![extracted.clone()]),
216                };
217
218                let extracted_bytes = serde_json::to_vec(&array_value)?;
219                // JSON path: delimiter is irrelevant (output is JSON), but the
220                // arg is still required by the function signature.
221                self.parse_data(
222                    extracted_bytes,
223                    DataFormat::JSON,
224                    table_name,
225                    "web",
226                    &spec.url,
227                    spec.delimiter,
228                )?
229            } else {
230                self.parse_data(
231                    bytes.to_vec(),
232                    format,
233                    table_name,
234                    "web",
235                    &spec.url,
236                    spec.delimiter,
237                )?
238            }
239        } else {
240            self.parse_data(
241                bytes.to_vec(),
242                format,
243                table_name,
244                "web",
245                &spec.url,
246                spec.delimiter,
247            )?
248        };
249
250        // Cache the result if caching is enabled
251        #[cfg(feature = "redis-cache")]
252        {
253            let mut cache = RedisCache::new();
254            if cache.is_enabled() {
255                // Determine TTL based on spec or smart defaults
256                let ttl = spec.cache_seconds.unwrap_or_else(|| {
257                    // Smart defaults based on URL and body
258                    if spec.url.contains("prod") {
259                        3600 // 1 hour for production
260                    } else if spec.url.contains("staging") {
261                        300 // 5 minutes for staging
262                    } else {
263                        600 // 10 minutes default
264                    }
265                });
266
267                // Serialize to parquet and cache
268                match result.to_parquet_bytes() {
269                    Ok(parquet_bytes) => {
270                        if let Err(e) = cache.set(&cache_key, &parquet_bytes, ttl) {
271                            debug!("Failed to cache result: {}", e);
272                        } else {
273                            eprintln!("Cached {} for {} seconds", table_name, ttl);
274                        }
275                    }
276                    Err(e) => {
277                        debug!("Failed to serialize to parquet: {}", e);
278                    }
279                }
280            }
281        }
282
283        Ok(result)
284    }
285
286    /// Fetch data from a file:// URL
287    fn fetch_file(&self, spec: &WebCTESpec, table_name: &str) -> Result<DataTable> {
288        // Extract path from file:// URL
289        let file_path = if spec.url.starts_with("file://") {
290            &spec.url[7..] // Remove "file://" prefix
291        } else {
292            &spec.url
293        };
294
295        info!("Reading local file: {}", file_path);
296
297        // Check if file exists
298        let path = Path::new(file_path);
299        if !path.exists() {
300            return Err(anyhow::anyhow!("File not found: {}", file_path));
301        }
302
303        // Open file
304        let file =
305            File::open(path).with_context(|| format!("Failed to open file: {}", file_path))?;
306
307        // Get file size for memory checks
308        let metadata = file.metadata()?;
309        let file_size = metadata.len();
310        debug!("File size: {} bytes", file_size);
311
312        // Determine format from extension or spec
313        let format = match &spec.format {
314            Some(fmt) => fmt.clone(),
315            None => self.detect_format(file_path, ""),
316        };
317
318        info!("Using format: {:?} for {}", format, file_path);
319
320        // Build CSV options once; DELIMITER clause wins, else comma. URL paths
321        // are unreliable for extension auto-detect (query strings, redirects),
322        // so we don't probe them here.
323        let csv_opts = CsvReadOptions {
324            delimiter: spec.delimiter.unwrap_or(b','),
325            has_headers: true,
326        };
327
328        // Parse based on format
329        match format {
330            DataFormat::CSV => {
331                load_csv_from_reader_with_opts(file, table_name, "file", file_path, &csv_opts)
332                    .with_context(|| format!("Failed to parse CSV from {}", file_path))
333            }
334            DataFormat::JSON => load_json_from_reader(file, table_name, "file", file_path)
335                .with_context(|| format!("Failed to parse JSON from {}", file_path)),
336            DataFormat::Auto => {
337                // For files, we can't retry with the same reader, so determine based on extension
338                if file_path.ends_with(".json") {
339                    let file = File::open(path)?;
340                    load_json_from_reader(file, table_name, "file", file_path)
341                        .with_context(|| format!("Failed to parse JSON from {}", file_path))
342                } else {
343                    // Default to CSV for auto-detect with files
344                    let file = File::open(path)?;
345                    load_csv_from_reader_with_opts(file, table_name, "file", file_path, &csv_opts)
346                        .with_context(|| format!("Failed to parse CSV from {}", file_path))
347                }
348            }
349        }
350    }
351
352    /// Parse data bytes based on format. `delimiter` is honoured for CSV (and
353    /// Auto when it falls back to CSV); `None` means comma.
354    fn parse_data(
355        &self,
356        bytes: Vec<u8>,
357        format: DataFormat,
358        table_name: &str,
359        source_type: &str,
360        source_path: &str,
361        delimiter: Option<u8>,
362    ) -> Result<DataTable> {
363        let csv_opts = CsvReadOptions {
364            delimiter: delimiter.unwrap_or(b','),
365            has_headers: true,
366        };
367        match format {
368            DataFormat::CSV => {
369                let reader = Cursor::new(bytes);
370                load_csv_from_reader_with_opts(
371                    reader,
372                    table_name,
373                    source_type,
374                    source_path,
375                    &csv_opts,
376                )
377                .with_context(|| format!("Failed to parse CSV from {}", source_path))
378            }
379            DataFormat::JSON => {
380                let reader = Cursor::new(bytes);
381                load_json_from_reader(reader, table_name, source_type, source_path)
382                    .with_context(|| format!("Failed to parse JSON from {}", source_path))
383            }
384            DataFormat::Auto => {
385                // Try CSV first, then JSON
386                let reader_csv = Cursor::new(bytes.clone());
387                match load_csv_from_reader_with_opts(
388                    reader_csv,
389                    table_name,
390                    source_type,
391                    source_path,
392                    &csv_opts,
393                ) {
394                    Ok(table) => Ok(table),
395                    Err(_) => {
396                        debug!("CSV parsing failed, trying JSON");
397                        let reader_json = Cursor::new(bytes);
398                        load_json_from_reader(reader_json, table_name, source_type, source_path)
399                            .with_context(|| format!("Failed to parse data from {}", source_path))
400                    }
401                }
402            }
403        }
404    }
405
406    /// Detect format from URL extension or content type
407    fn detect_format(&self, url: &str, content_type: &str) -> DataFormat {
408        // Check content type first
409        if content_type.contains("json") {
410            return DataFormat::JSON;
411        }
412        if content_type.contains("csv") || content_type.contains("text/plain") {
413            return DataFormat::CSV;
414        }
415
416        // Check URL extension
417        if url.ends_with(".json") {
418            DataFormat::JSON
419        } else if url.ends_with(".csv") {
420            DataFormat::CSV
421        } else {
422            // Default to auto-detect
423            DataFormat::Auto
424        }
425    }
426
427    /// Extract data from a specific JSON path
428    fn extract_json_path(
429        &self,
430        _table: DataTable,
431        json_path: &str,
432        bytes: &[u8],
433    ) -> Result<DataTable> {
434        // Parse the JSON
435        let json_value: serde_json::Value = serde_json::from_slice(bytes)
436            .with_context(|| "Failed to parse JSON for path extraction")?;
437
438        // Navigate to the specified path
439        let extracted = self
440            .navigate_json_path(&json_value, json_path)
441            .with_context(|| format!("Failed to extract JSON path: {}", json_path))?;
442
443        // If the extracted value is already an array, use it directly
444        // Otherwise, wrap it in an array for consistent handling
445        let array_value = match extracted {
446            serde_json::Value::Array(_) => extracted.clone(),
447            _ => serde_json::Value::Array(vec![extracted.clone()]),
448        };
449
450        // Convert to bytes and parse as a table
451        let extracted_bytes = serde_json::to_vec(&array_value)?;
452
453        // Re-parse the extracted JSON as a DataTable
454        let reader = Cursor::new(extracted_bytes);
455        load_json_from_reader(reader, "extracted", "web", json_path)
456    }
457
458    /// Navigate to a specific path in JSON structure
459    fn navigate_json_path<'a>(
460        &self,
461        value: &'a serde_json::Value,
462        path: &str,
463    ) -> Result<&'a serde_json::Value> {
464        let mut current = value;
465
466        // Split path by dots (simple path navigation for now)
467        // Future enhancement: support array indexing like "Result[0]"
468        for part in path.split('.') {
469            if part.is_empty() {
470                continue;
471            }
472
473            current = current
474                .get(part)
475                .ok_or_else(|| anyhow::anyhow!("Path '{}' not found in JSON", part))?;
476        }
477
478        Ok(current)
479    }
480
481    /// Resolve environment variables in values (${VAR_NAME} or $VAR_NAME syntax)
482    fn resolve_env_var(&self, value: &str) -> Result<String> {
483        let mut result = value.to_string();
484
485        // Handle ${VAR} syntax - can be embedded in strings
486        // Use lazy_static for better performance, but for now just compile inline
487        let re = Regex::new(r"\$\{([^}]+)\}").unwrap();
488        for cap in re.captures_iter(value) {
489            let var_name = &cap[1];
490            match std::env::var(var_name) {
491                Ok(var_value) => {
492                    result = result.replace(&cap[0], &var_value);
493                }
494                Err(_) => {
495                    // For security, don't expose which env vars exist
496                    // Just log a debug message and keep the placeholder
497                    debug!(
498                        "Environment variable {} not found, keeping placeholder",
499                        var_name
500                    );
501                }
502            }
503        }
504
505        // Also handle simple $VAR syntax at the start of the string
506        if result.starts_with('$') && !result.starts_with("${") {
507            let var_name = &result[1..];
508            if let Ok(var_value) = std::env::var(var_name) {
509                return Ok(var_value);
510            }
511        }
512
513        Ok(result)
514    }
515}
516
#[cfg(test)]
mod tests {
    use super::*;

    /// `detect_format` picks the format from the URL extension, then from the
    /// content type, and falls back to `Auto` when neither gives a hint.
    #[test]
    fn test_detect_format() {
        fn is_csv(format: &DataFormat) -> bool {
            matches!(format, DataFormat::CSV)
        }
        fn is_json(format: &DataFormat) -> bool {
            matches!(format, DataFormat::JSON)
        }
        fn is_auto(format: &DataFormat) -> bool {
            matches!(format, DataFormat::Auto)
        }

        // (url, content type, predicate the detected format must satisfy)
        let cases: [(&str, &str, fn(&DataFormat) -> bool); 5] = [
            // URL-based detection
            ("http://example.com/data.csv", "", is_csv),
            ("http://example.com/data.json", "", is_json),
            // Content-type detection
            ("http://example.com/data", "application/json", is_json),
            ("http://example.com/data", "text/csv", is_csv),
            // Auto-detect fallback
            ("http://example.com/data", "", is_auto),
        ];

        let fetcher = WebDataFetcher::new().unwrap();
        for (url, content_type, expected) in cases.iter() {
            let detected = fetcher.detect_format(url, content_type);
            assert!(expected(&detected));
        }
    }
}