Skip to main content

sql_cli/web/
http_fetcher.rs

1// HTTP fetcher for WEB CTEs (now supports file:// URLs too)
2use anyhow::{Context, Result};
3use regex::Regex;
4use serde_json;
5use std::fs::File;
6use std::io::Cursor;
7use std::path::Path;
8use std::time::Duration;
9use tracing::{debug, info};
10
11use crate::data::datatable::DataTable;
12use crate::data::stream_loader::{
13    load_csv_from_reader_with_opts, load_json_from_reader, CsvReadOptions,
14};
15use crate::sql::parser::ast::{DataFormat, HttpMethod, WebCTESpec};
16
17#[cfg(feature = "redis-cache")]
18use crate::redis_cache_module::RedisCache;
19
20/// Fetches data from a URL and converts it to a DataTable
21pub struct WebDataFetcher {
22    client: reqwest::blocking::Client,
23}
24
25impl WebDataFetcher {
26    pub fn new() -> Result<Self> {
27        let client = reqwest::blocking::Client::builder()
28            .timeout(Duration::from_secs(30))
29            .user_agent("sql-cli/1.0")
30            .build()?;
31
32        Ok(Self { client })
33    }
34
35    /// Fetch data from a WEB CTE specification (supports http://, https://, and file:// URLs)
36    /// Each WEB CTE is cached independently based only on its own properties
37    pub fn fetch(
38        &self,
39        spec: &WebCTESpec,
40        table_name: &str,
41        _query_context: Option<&str>, // Kept for API compatibility but not used for caching
42    ) -> Result<DataTable> {
43        // Check if this is a file:// URL (no caching for local files)
44        if spec.url.starts_with("file://") {
45            return self.fetch_file(spec, table_name);
46        }
47
48        // Generate cache key from ALL Web CTE spec fields
49        // Each WEB CTE is independent - cache key depends only on the CTE itself
50        #[cfg(feature = "redis-cache")]
51        let cache_key = {
52            let method = format!("{:?}", spec.method.as_ref().unwrap_or(&HttpMethod::GET));
53
54            // Use the full cache key generation with all WebCTESpec fields
55            RedisCache::generate_key_full(
56                table_name, // CTE name
57                &spec.url,
58                Some(&method),
59                &spec.headers,
60                spec.body.as_deref(),
61                "",                        // Empty context - not used for independent caching
62                spec.json_path.as_deref(), // JSON extraction path
63                &spec.form_files,          // Multipart form files
64                &spec.form_fields,         // Multipart form fields
65            )
66        };
67
68        // Try cache first
69        #[cfg(feature = "redis-cache")]
70        {
71            let mut cache = RedisCache::new();
72            if cache.is_enabled() {
73                if let Some(cached_bytes) = cache.get(&cache_key) {
74                    // Try to deserialize from cached parquet
75                    match DataTable::from_parquet_bytes(&cached_bytes) {
76                        Ok(table) => {
77                            eprintln!(
78                                "Cache HIT for {} (key: {}...)",
79                                table_name,
80                                &cache_key[0..48.min(cache_key.len())]
81                            );
82                            return Ok(table);
83                        }
84                        Err(e) => {
85                            debug!("Failed to deserialize cached data: {}", e);
86                            // Continue to fetch from network
87                        }
88                    }
89                } else {
90                    eprintln!(
91                        "Cache MISS for {} (key: {}...)",
92                        table_name,
93                        &cache_key[0..48.min(cache_key.len())]
94                    );
95                }
96            }
97        }
98
99        info!("Fetching data from URL: {}", spec.url);
100
101        // Regular HTTP/HTTPS handling
102        // Build request based on method
103        let mut request = match spec.method.as_ref().unwrap_or(&HttpMethod::GET) {
104            HttpMethod::GET => self.client.get(&spec.url),
105            HttpMethod::POST => self.client.post(&spec.url),
106            HttpMethod::PUT => self.client.put(&spec.url),
107            HttpMethod::DELETE => self.client.delete(&spec.url),
108            HttpMethod::PATCH => self.client.patch(&spec.url),
109        };
110
111        // Add headers if provided
112        for (key, value) in &spec.headers {
113            let resolved_value = self.resolve_env_var(value)?;
114            request = request.header(key, resolved_value);
115        }
116
117        // Handle multipart form data if form_files are specified
118        if !spec.form_files.is_empty() || !spec.form_fields.is_empty() {
119            let mut form = reqwest::blocking::multipart::Form::new();
120
121            // Add files
122            for (field_name, file_path) in &spec.form_files {
123                let resolved_path = self.resolve_env_var(file_path)?;
124                let file = std::fs::File::open(&resolved_path)
125                    .with_context(|| format!("Failed to open file: {}", resolved_path))?;
126                let file_name = std::path::Path::new(&resolved_path)
127                    .file_name()
128                    .and_then(|n| n.to_str())
129                    .unwrap_or("file")
130                    .to_string();
131                let part = reqwest::blocking::multipart::Part::reader(file).file_name(file_name);
132                form = form.part(field_name.clone(), part);
133            }
134
135            // Add regular form fields
136            for (field_name, value) in &spec.form_fields {
137                let resolved_value = self.resolve_env_var(value)?;
138                form = form.text(field_name.clone(), resolved_value);
139            }
140
141            request = request.multipart(form);
142        }
143        // Add body if provided (typically for POST/PUT/PATCH) - only if not using multipart
144        else if let Some(body) = &spec.body {
145            let resolved_body = self.resolve_env_var(body)?;
146
147            // Debug: Always print the expanded body to help verify template expansion
148            eprintln!("\n=== WEB CTE Request Debug ===");
149            eprintln!("URL: {}", spec.url);
150            eprintln!(
151                "Method: {:?}",
152                spec.method.as_ref().unwrap_or(&HttpMethod::POST)
153            );
154            eprintln!("Body (after template expansion):");
155            eprintln!("{}", resolved_body);
156            eprintln!("=============================\n");
157
158            request = request.body(resolved_body);
159            // Set Content-Type to JSON only when the user didn't set one themselves
160            // and the body looks like JSON. ES 8.x and other strict servers reject
161            // requests with two Content-Type headers.
162            let has_content_type = spec
163                .headers
164                .iter()
165                .any(|(k, _)| k.eq_ignore_ascii_case("content-type"));
166            if !has_content_type && spec.body.as_ref().unwrap().trim().starts_with('{') {
167                request = request.header("Content-Type", "application/json");
168            }
169        }
170
171        // Execute request
172        let response = request
173            .send()
174            .with_context(|| format!("Failed to fetch from URL: {}", spec.url))?;
175
176        // Check status
177        if !response.status().is_success() {
178            let status = response.status();
179            // Drain the body so callers see the server's diagnostic (ES, GraphQL,
180            // etc. put the real "why" here). Truncate so a giant HTML page can't
181            // flood the terminal.
182            let body = response.text().unwrap_or_default();
183            let snippet: String = body.chars().take(2000).collect();
184            return Err(anyhow::anyhow!(
185                "HTTP request failed with status {}: {}\nResponse body: {}",
186                status,
187                spec.url,
188                snippet
189            ));
190        }
191
192        // Get content type for format detection
193        let content_type = response
194            .headers()
195            .get("content-type")
196            .and_then(|v| v.to_str().ok())
197            .unwrap_or("")
198            .to_string();
199
200        debug!("Response content-type: {}", content_type);
201
202        // Read response body
203        let bytes = response.bytes()?;
204
205        // Determine format
206        let format = match &spec.format {
207            Some(fmt) => fmt.clone(),
208            None => self.detect_format(&spec.url, &content_type),
209        };
210
211        info!("Using format: {:?} for {}", format, spec.url);
212
213        // Parse the data based on format
214        let result = if let Some(json_path) = &spec.json_path {
215            if matches!(format, DataFormat::JSON | DataFormat::Auto) {
216                // Parse JSON and extract the path
217                let json_value: serde_json::Value = serde_json::from_slice(&bytes)
218                    .with_context(|| "Failed to parse JSON for path extraction")?;
219
220                // Navigate to the specified path
221                let extracted = self
222                    .navigate_json_path(&json_value, json_path)
223                    .with_context(|| format!("Failed to extract JSON path: {}", json_path))?;
224
225                // Convert extracted value to bytes and parse as table
226                let array_value = match extracted {
227                    serde_json::Value::Array(_) => extracted,
228                    other => serde_json::Value::Array(vec![other]),
229                };
230
231                let extracted_bytes = serde_json::to_vec(&array_value)?;
232                // JSON path: delimiter is irrelevant (output is JSON), but the
233                // arg is still required by the function signature.
234                self.parse_data(
235                    extracted_bytes,
236                    DataFormat::JSON,
237                    table_name,
238                    "web",
239                    &spec.url,
240                    spec.delimiter,
241                )?
242            } else {
243                self.parse_data(
244                    bytes.to_vec(),
245                    format,
246                    table_name,
247                    "web",
248                    &spec.url,
249                    spec.delimiter,
250                )?
251            }
252        } else {
253            self.parse_data(
254                bytes.to_vec(),
255                format,
256                table_name,
257                "web",
258                &spec.url,
259                spec.delimiter,
260            )?
261        };
262
263        // Cache the result if caching is enabled
264        #[cfg(feature = "redis-cache")]
265        {
266            let mut cache = RedisCache::new();
267            if cache.is_enabled() {
268                // Determine TTL based on spec or smart defaults
269                let ttl = spec.cache_seconds.unwrap_or_else(|| {
270                    // Smart defaults based on URL and body
271                    if spec.url.contains("prod") {
272                        3600 // 1 hour for production
273                    } else if spec.url.contains("staging") {
274                        300 // 5 minutes for staging
275                    } else {
276                        600 // 10 minutes default
277                    }
278                });
279
280                // Serialize to parquet and cache
281                match result.to_parquet_bytes() {
282                    Ok(parquet_bytes) => {
283                        if let Err(e) = cache.set(&cache_key, &parquet_bytes, ttl) {
284                            debug!("Failed to cache result: {}", e);
285                        } else {
286                            eprintln!("Cached {} for {} seconds", table_name, ttl);
287                        }
288                    }
289                    Err(e) => {
290                        debug!("Failed to serialize to parquet: {}", e);
291                    }
292                }
293            }
294        }
295
296        Ok(result)
297    }
298
299    /// Fetch data from a file:// URL
300    fn fetch_file(&self, spec: &WebCTESpec, table_name: &str) -> Result<DataTable> {
301        // Extract path from file:// URL
302        let file_path = if spec.url.starts_with("file://") {
303            &spec.url[7..] // Remove "file://" prefix
304        } else {
305            &spec.url
306        };
307
308        info!("Reading local file: {}", file_path);
309
310        // Check if file exists
311        let path = Path::new(file_path);
312        if !path.exists() {
313            return Err(anyhow::anyhow!("File not found: {}", file_path));
314        }
315
316        // Open file
317        let file =
318            File::open(path).with_context(|| format!("Failed to open file: {}", file_path))?;
319
320        // Get file size for memory checks
321        let metadata = file.metadata()?;
322        let file_size = metadata.len();
323        debug!("File size: {} bytes", file_size);
324
325        // Determine format from extension or spec
326        let format = match &spec.format {
327            Some(fmt) => fmt.clone(),
328            None => self.detect_format(file_path, ""),
329        };
330
331        info!("Using format: {:?} for {}", format, file_path);
332
333        // Build CSV options once; DELIMITER clause wins, else comma. URL paths
334        // are unreliable for extension auto-detect (query strings, redirects),
335        // so we don't probe them here.
336        let csv_opts = CsvReadOptions {
337            delimiter: spec.delimiter.unwrap_or(b','),
338            has_headers: true,
339        };
340
341        // Parse based on format
342        match format {
343            DataFormat::CSV => {
344                load_csv_from_reader_with_opts(file, table_name, "file", file_path, &csv_opts)
345                    .with_context(|| format!("Failed to parse CSV from {}", file_path))
346            }
347            DataFormat::JSON => load_json_from_reader(file, table_name, "file", file_path)
348                .with_context(|| format!("Failed to parse JSON from {}", file_path)),
349            DataFormat::Auto => {
350                // For files, we can't retry with the same reader, so determine based on extension
351                if file_path.ends_with(".json") {
352                    let file = File::open(path)?;
353                    load_json_from_reader(file, table_name, "file", file_path)
354                        .with_context(|| format!("Failed to parse JSON from {}", file_path))
355                } else {
356                    // Default to CSV for auto-detect with files
357                    let file = File::open(path)?;
358                    load_csv_from_reader_with_opts(file, table_name, "file", file_path, &csv_opts)
359                        .with_context(|| format!("Failed to parse CSV from {}", file_path))
360                }
361            }
362        }
363    }
364
365    /// Parse data bytes based on format. `delimiter` is honoured for CSV (and
366    /// Auto when it falls back to CSV); `None` means comma.
367    fn parse_data(
368        &self,
369        bytes: Vec<u8>,
370        format: DataFormat,
371        table_name: &str,
372        source_type: &str,
373        source_path: &str,
374        delimiter: Option<u8>,
375    ) -> Result<DataTable> {
376        let csv_opts = CsvReadOptions {
377            delimiter: delimiter.unwrap_or(b','),
378            has_headers: true,
379        };
380        match format {
381            DataFormat::CSV => {
382                let reader = Cursor::new(bytes);
383                load_csv_from_reader_with_opts(
384                    reader,
385                    table_name,
386                    source_type,
387                    source_path,
388                    &csv_opts,
389                )
390                .with_context(|| format!("Failed to parse CSV from {}", source_path))
391            }
392            DataFormat::JSON => {
393                let reader = Cursor::new(bytes);
394                load_json_from_reader(reader, table_name, source_type, source_path)
395                    .with_context(|| format!("Failed to parse JSON from {}", source_path))
396            }
397            DataFormat::Auto => {
398                // Try CSV first, then JSON
399                let reader_csv = Cursor::new(bytes.clone());
400                match load_csv_from_reader_with_opts(
401                    reader_csv,
402                    table_name,
403                    source_type,
404                    source_path,
405                    &csv_opts,
406                ) {
407                    Ok(table) => Ok(table),
408                    Err(_) => {
409                        debug!("CSV parsing failed, trying JSON");
410                        let reader_json = Cursor::new(bytes);
411                        load_json_from_reader(reader_json, table_name, source_type, source_path)
412                            .with_context(|| format!("Failed to parse data from {}", source_path))
413                    }
414                }
415            }
416        }
417    }
418
419    /// Detect format from URL extension or content type
420    fn detect_format(&self, url: &str, content_type: &str) -> DataFormat {
421        // Check content type first
422        if content_type.contains("json") {
423            return DataFormat::JSON;
424        }
425        if content_type.contains("csv") || content_type.contains("text/plain") {
426            return DataFormat::CSV;
427        }
428
429        // Check URL extension
430        if url.ends_with(".json") {
431            DataFormat::JSON
432        } else if url.ends_with(".csv") {
433            DataFormat::CSV
434        } else {
435            // Default to auto-detect
436            DataFormat::Auto
437        }
438    }
439
440    /// Extract data from a specific JSON path
441    fn extract_json_path(
442        &self,
443        _table: DataTable,
444        json_path: &str,
445        bytes: &[u8],
446    ) -> Result<DataTable> {
447        // Parse the JSON
448        let json_value: serde_json::Value = serde_json::from_slice(bytes)
449            .with_context(|| "Failed to parse JSON for path extraction")?;
450
451        // Navigate to the specified path
452        let extracted = self
453            .navigate_json_path(&json_value, json_path)
454            .with_context(|| format!("Failed to extract JSON path: {}", json_path))?;
455
456        // If the extracted value is already an array, use it directly
457        // Otherwise, wrap it in an array for consistent handling
458        let array_value = match extracted {
459            serde_json::Value::Array(_) => extracted.clone(),
460            _ => serde_json::Value::Array(vec![extracted.clone()]),
461        };
462
463        // Convert to bytes and parse as a table
464        let extracted_bytes = serde_json::to_vec(&array_value)?;
465
466        // Re-parse the extracted JSON as a DataTable
467        let reader = Cursor::new(extracted_bytes);
468        load_json_from_reader(reader, "extracted", "web", json_path)
469    }
470
471    /// Navigate to a specific path in JSON structure.
472    ///
473    /// Supports two forms per dotted segment:
474    ///   - `name`     — descend into an object key
475    ///   - `name[]`   — descend into `name` (must be an array), then map the
476    ///                  remainder of the path across every element
477    ///
478    /// Example for an ES response:
479    ///   `JSON_PATH 'hits.hits[]._source'`
480    /// returns an array of `_source` objects, one per hit — i.e. the
481    /// `_source` fields become the top-level row shape consumed by the loader.
482    fn navigate_json_path(
483        &self,
484        value: &serde_json::Value,
485        path: &str,
486    ) -> Result<serde_json::Value> {
487        let parts: Vec<&str> = path.split('.').filter(|p| !p.is_empty()).collect();
488        Self::walk_json_path(value, &parts)
489    }
490
491    fn walk_json_path(value: &serde_json::Value, parts: &[&str]) -> Result<serde_json::Value> {
492        let Some((head, tail)) = parts.split_first() else {
493            return Ok(value.clone());
494        };
495
496        // Array projection: `name[]` (or bare `[]`) maps the rest of the path
497        // across each element of an array.
498        if let Some(name) = head.strip_suffix("[]") {
499            let array_val = if name.is_empty() {
500                value
501            } else {
502                value
503                    .get(name)
504                    .ok_or_else(|| anyhow::anyhow!("Path '{}' not found in JSON", name))?
505            };
506            let arr = array_val.as_array().ok_or_else(|| {
507                let kind = match array_val {
508                    serde_json::Value::Null => "null",
509                    serde_json::Value::Bool(_) => "bool",
510                    serde_json::Value::Number(_) => "number",
511                    serde_json::Value::String(_) => "string",
512                    serde_json::Value::Array(_) => "array",
513                    serde_json::Value::Object(_) => "object",
514                };
515                anyhow::anyhow!(
516                    "Expected array at '{}' for [] projection, got {}",
517                    if name.is_empty() { "<root>" } else { name },
518                    kind
519                )
520            })?;
521            let mut projected = Vec::with_capacity(arr.len());
522            for el in arr {
523                projected.push(Self::walk_json_path(el, tail)?);
524            }
525            return Ok(serde_json::Value::Array(projected));
526        }
527
528        let next = value
529            .get(head)
530            .ok_or_else(|| anyhow::anyhow!("Path '{}' not found in JSON", head))?;
531        Self::walk_json_path(next, tail)
532    }
533
534    /// Resolve environment variables in values (${VAR_NAME} or $VAR_NAME syntax)
535    fn resolve_env_var(&self, value: &str) -> Result<String> {
536        let mut result = value.to_string();
537
538        // Handle ${VAR} syntax - can be embedded in strings
539        // Use lazy_static for better performance, but for now just compile inline
540        let re = Regex::new(r"\$\{([^}]+)\}").unwrap();
541        for cap in re.captures_iter(value) {
542            let var_name = &cap[1];
543            match std::env::var(var_name) {
544                Ok(var_value) => {
545                    result = result.replace(&cap[0], &var_value);
546                }
547                Err(_) => {
548                    // For security, don't expose which env vars exist
549                    // Just log a debug message and keep the placeholder
550                    debug!(
551                        "Environment variable {} not found, keeping placeholder",
552                        var_name
553                    );
554                }
555            }
556        }
557
558        // Also handle simple $VAR syntax at the start of the string
559        if result.starts_with('$') && !result.starts_with("${") {
560            let var_name = &result[1..];
561            if let Ok(var_value) = std::env::var(var_name) {
562                return Ok(var_value);
563            }
564        }
565
566        Ok(result)
567    }
568}
569
#[cfg(test)]
mod tests {
    use super::*;

    /// Format inference: the URL extension decides when no Content-Type is
    /// given, an informative Content-Type decides for extension-less URLs,
    /// and with neither the result is `Auto`.
    #[test]
    fn test_detect_format() {
        let fetcher = WebDataFetcher::new().unwrap();
        let detect = |url: &str, content_type: &str| fetcher.detect_format(url, content_type);

        // Content-Type driven, no extension on the URL.
        assert!(matches!(
            detect("http://example.com/data", "text/csv"),
            DataFormat::CSV
        ));
        assert!(matches!(
            detect("http://example.com/data", "application/json"),
            DataFormat::JSON
        ));

        // Extension driven, empty Content-Type.
        assert!(matches!(
            detect("http://example.com/data.json", ""),
            DataFormat::JSON
        ));
        assert!(matches!(
            detect("http://example.com/data.csv", ""),
            DataFormat::CSV
        ));

        // Nothing to go on: fall back to auto-detection.
        assert!(matches!(
            detect("http://example.com/data", ""),
            DataFormat::Auto
        ));
    }
}