sql_cli/web/
http_fetcher.rs

1// HTTP fetcher for WEB CTEs (now supports file:// URLs too)
2use anyhow::{Context, Result};
3use regex::Regex;
4use serde_json;
5use std::fs::File;
6use std::io::Cursor;
7use std::path::Path;
8use std::time::Duration;
9use tracing::{debug, info};
10
11use crate::data::datatable::DataTable;
12use crate::data::stream_loader::{load_csv_from_reader, load_json_from_reader};
13use crate::sql::parser::ast::{DataFormat, HttpMethod, WebCTESpec};
14
15#[cfg(feature = "redis-cache")]
16use crate::redis_cache_module::RedisCache;
17
18/// Fetches data from a URL and converts it to a DataTable
19pub struct WebDataFetcher {
20    client: reqwest::blocking::Client,
21}
22
23impl WebDataFetcher {
24    pub fn new() -> Result<Self> {
25        let client = reqwest::blocking::Client::builder()
26            .timeout(Duration::from_secs(30))
27            .user_agent("sql-cli/1.0")
28            .build()?;
29
30        Ok(Self { client })
31    }
32
33    /// Fetch data from a WEB CTE specification (supports http://, https://, and file:// URLs)
34    /// Each WEB CTE is cached independently based only on its own properties
35    pub fn fetch(
36        &self,
37        spec: &WebCTESpec,
38        table_name: &str,
39        _query_context: Option<&str>, // Kept for API compatibility but not used for caching
40    ) -> Result<DataTable> {
41        // Check if this is a file:// URL (no caching for local files)
42        if spec.url.starts_with("file://") {
43            return self.fetch_file(spec, table_name);
44        }
45
46        // Generate cache key from ALL Web CTE spec fields
47        // Each WEB CTE is independent - cache key depends only on the CTE itself
48        #[cfg(feature = "redis-cache")]
49        let cache_key = {
50            let method = format!("{:?}", spec.method.as_ref().unwrap_or(&HttpMethod::GET));
51
52            // Use the full cache key generation with all WebCTESpec fields
53            RedisCache::generate_key_full(
54                table_name, // CTE name
55                &spec.url,
56                Some(&method),
57                &spec.headers,
58                spec.body.as_deref(),
59                "",                        // Empty context - not used for independent caching
60                spec.json_path.as_deref(), // JSON extraction path
61                &spec.form_files,          // Multipart form files
62                &spec.form_fields,         // Multipart form fields
63            )
64        };
65
66        // Try cache first
67        #[cfg(feature = "redis-cache")]
68        {
69            let mut cache = RedisCache::new();
70            if cache.is_enabled() {
71                if let Some(cached_bytes) = cache.get(&cache_key) {
72                    // Try to deserialize from cached parquet
73                    match DataTable::from_parquet_bytes(&cached_bytes) {
74                        Ok(table) => {
75                            eprintln!(
76                                "Cache HIT for {} (key: {}...)",
77                                table_name,
78                                &cache_key[0..48.min(cache_key.len())]
79                            );
80                            return Ok(table);
81                        }
82                        Err(e) => {
83                            debug!("Failed to deserialize cached data: {}", e);
84                            // Continue to fetch from network
85                        }
86                    }
87                } else {
88                    eprintln!(
89                        "Cache MISS for {} (key: {}...)",
90                        table_name,
91                        &cache_key[0..48.min(cache_key.len())]
92                    );
93                }
94            }
95        }
96
97        info!("Fetching data from URL: {}", spec.url);
98
99        // Regular HTTP/HTTPS handling
100        // Build request based on method
101        let mut request = match spec.method.as_ref().unwrap_or(&HttpMethod::GET) {
102            HttpMethod::GET => self.client.get(&spec.url),
103            HttpMethod::POST => self.client.post(&spec.url),
104            HttpMethod::PUT => self.client.put(&spec.url),
105            HttpMethod::DELETE => self.client.delete(&spec.url),
106            HttpMethod::PATCH => self.client.patch(&spec.url),
107        };
108
109        // Add headers if provided
110        for (key, value) in &spec.headers {
111            let resolved_value = self.resolve_env_var(value)?;
112            request = request.header(key, resolved_value);
113        }
114
115        // Handle multipart form data if form_files are specified
116        if !spec.form_files.is_empty() || !spec.form_fields.is_empty() {
117            let mut form = reqwest::blocking::multipart::Form::new();
118
119            // Add files
120            for (field_name, file_path) in &spec.form_files {
121                let resolved_path = self.resolve_env_var(file_path)?;
122                let file = std::fs::File::open(&resolved_path)
123                    .with_context(|| format!("Failed to open file: {}", resolved_path))?;
124                let file_name = std::path::Path::new(&resolved_path)
125                    .file_name()
126                    .and_then(|n| n.to_str())
127                    .unwrap_or("file")
128                    .to_string();
129                let part = reqwest::blocking::multipart::Part::reader(file).file_name(file_name);
130                form = form.part(field_name.clone(), part);
131            }
132
133            // Add regular form fields
134            for (field_name, value) in &spec.form_fields {
135                let resolved_value = self.resolve_env_var(value)?;
136                form = form.text(field_name.clone(), resolved_value);
137            }
138
139            request = request.multipart(form);
140        }
141        // Add body if provided (typically for POST/PUT/PATCH) - only if not using multipart
142        else if let Some(body) = &spec.body {
143            let resolved_body = self.resolve_env_var(body)?;
144            request = request.body(resolved_body);
145            // Set Content-Type to JSON if not already set and body looks like JSON
146            if spec.body.as_ref().unwrap().trim().starts_with('{') {
147                request = request.header("Content-Type", "application/json");
148            }
149        }
150
151        // Execute request
152        let response = request
153            .send()
154            .with_context(|| format!("Failed to fetch from URL: {}", spec.url))?;
155
156        // Check status
157        if !response.status().is_success() {
158            return Err(anyhow::anyhow!(
159                "HTTP request failed with status {}: {}",
160                response.status(),
161                spec.url
162            ));
163        }
164
165        // Get content type for format detection
166        let content_type = response
167            .headers()
168            .get("content-type")
169            .and_then(|v| v.to_str().ok())
170            .unwrap_or("")
171            .to_string();
172
173        debug!("Response content-type: {}", content_type);
174
175        // Read response body
176        let bytes = response.bytes()?;
177
178        // Determine format
179        let format = match &spec.format {
180            Some(fmt) => fmt.clone(),
181            None => self.detect_format(&spec.url, &content_type),
182        };
183
184        info!("Using format: {:?} for {}", format, spec.url);
185
186        // Parse the data based on format
187        let result = if let Some(json_path) = &spec.json_path {
188            if matches!(format, DataFormat::JSON | DataFormat::Auto) {
189                // Parse JSON and extract the path
190                let json_value: serde_json::Value = serde_json::from_slice(&bytes)
191                    .with_context(|| "Failed to parse JSON for path extraction")?;
192
193                // Navigate to the specified path
194                let extracted = self
195                    .navigate_json_path(&json_value, json_path)
196                    .with_context(|| format!("Failed to extract JSON path: {}", json_path))?;
197
198                // Convert extracted value to bytes and parse as table
199                let array_value = match extracted {
200                    serde_json::Value::Array(_) => extracted.clone(),
201                    _ => serde_json::Value::Array(vec![extracted.clone()]),
202                };
203
204                let extracted_bytes = serde_json::to_vec(&array_value)?;
205                self.parse_data(
206                    extracted_bytes,
207                    DataFormat::JSON,
208                    table_name,
209                    "web",
210                    &spec.url,
211                )?
212            } else {
213                self.parse_data(bytes.to_vec(), format, table_name, "web", &spec.url)?
214            }
215        } else {
216            self.parse_data(bytes.to_vec(), format, table_name, "web", &spec.url)?
217        };
218
219        // Cache the result if caching is enabled
220        #[cfg(feature = "redis-cache")]
221        {
222            let mut cache = RedisCache::new();
223            if cache.is_enabled() {
224                // Determine TTL based on spec or smart defaults
225                let ttl = spec.cache_seconds.unwrap_or_else(|| {
226                    // Smart defaults based on URL and body
227                    if spec.url.contains("prod") {
228                        3600 // 1 hour for production
229                    } else if spec.url.contains("staging") {
230                        300 // 5 minutes for staging
231                    } else {
232                        600 // 10 minutes default
233                    }
234                });
235
236                // Serialize to parquet and cache
237                match result.to_parquet_bytes() {
238                    Ok(parquet_bytes) => {
239                        if let Err(e) = cache.set(&cache_key, &parquet_bytes, ttl) {
240                            debug!("Failed to cache result: {}", e);
241                        } else {
242                            eprintln!("Cached {} for {} seconds", table_name, ttl);
243                        }
244                    }
245                    Err(e) => {
246                        debug!("Failed to serialize to parquet: {}", e);
247                    }
248                }
249            }
250        }
251
252        Ok(result)
253    }
254
255    /// Fetch data from a file:// URL
256    fn fetch_file(&self, spec: &WebCTESpec, table_name: &str) -> Result<DataTable> {
257        // Extract path from file:// URL
258        let file_path = if spec.url.starts_with("file://") {
259            &spec.url[7..] // Remove "file://" prefix
260        } else {
261            &spec.url
262        };
263
264        info!("Reading local file: {}", file_path);
265
266        // Check if file exists
267        let path = Path::new(file_path);
268        if !path.exists() {
269            return Err(anyhow::anyhow!("File not found: {}", file_path));
270        }
271
272        // Open file
273        let file =
274            File::open(path).with_context(|| format!("Failed to open file: {}", file_path))?;
275
276        // Get file size for memory checks
277        let metadata = file.metadata()?;
278        let file_size = metadata.len();
279        debug!("File size: {} bytes", file_size);
280
281        // Determine format from extension or spec
282        let format = match &spec.format {
283            Some(fmt) => fmt.clone(),
284            None => self.detect_format(file_path, ""),
285        };
286
287        info!("Using format: {:?} for {}", format, file_path);
288
289        // Parse based on format
290        match format {
291            DataFormat::CSV => load_csv_from_reader(file, table_name, "file", file_path)
292                .with_context(|| format!("Failed to parse CSV from {}", file_path)),
293            DataFormat::JSON => load_json_from_reader(file, table_name, "file", file_path)
294                .with_context(|| format!("Failed to parse JSON from {}", file_path)),
295            DataFormat::Auto => {
296                // For files, we can't retry with the same reader, so determine based on extension
297                if file_path.ends_with(".json") {
298                    let file = File::open(path)?;
299                    load_json_from_reader(file, table_name, "file", file_path)
300                        .with_context(|| format!("Failed to parse JSON from {}", file_path))
301                } else {
302                    // Default to CSV for auto-detect with files
303                    let file = File::open(path)?;
304                    load_csv_from_reader(file, table_name, "file", file_path)
305                        .with_context(|| format!("Failed to parse CSV from {}", file_path))
306                }
307            }
308        }
309    }
310
311    /// Parse data bytes based on format
312    fn parse_data(
313        &self,
314        bytes: Vec<u8>,
315        format: DataFormat,
316        table_name: &str,
317        source_type: &str,
318        source_path: &str,
319    ) -> Result<DataTable> {
320        match format {
321            DataFormat::CSV => {
322                let reader = Cursor::new(bytes);
323                load_csv_from_reader(reader, table_name, source_type, source_path)
324                    .with_context(|| format!("Failed to parse CSV from {}", source_path))
325            }
326            DataFormat::JSON => {
327                let reader = Cursor::new(bytes);
328                load_json_from_reader(reader, table_name, source_type, source_path)
329                    .with_context(|| format!("Failed to parse JSON from {}", source_path))
330            }
331            DataFormat::Auto => {
332                // Try CSV first, then JSON
333                let reader_csv = Cursor::new(bytes.clone());
334                match load_csv_from_reader(reader_csv, table_name, source_type, source_path) {
335                    Ok(table) => Ok(table),
336                    Err(_) => {
337                        debug!("CSV parsing failed, trying JSON");
338                        let reader_json = Cursor::new(bytes);
339                        load_json_from_reader(reader_json, table_name, source_type, source_path)
340                            .with_context(|| format!("Failed to parse data from {}", source_path))
341                    }
342                }
343            }
344        }
345    }
346
347    /// Detect format from URL extension or content type
348    fn detect_format(&self, url: &str, content_type: &str) -> DataFormat {
349        // Check content type first
350        if content_type.contains("json") {
351            return DataFormat::JSON;
352        }
353        if content_type.contains("csv") || content_type.contains("text/plain") {
354            return DataFormat::CSV;
355        }
356
357        // Check URL extension
358        if url.ends_with(".json") {
359            DataFormat::JSON
360        } else if url.ends_with(".csv") {
361            DataFormat::CSV
362        } else {
363            // Default to auto-detect
364            DataFormat::Auto
365        }
366    }
367
368    /// Extract data from a specific JSON path
369    fn extract_json_path(
370        &self,
371        _table: DataTable,
372        json_path: &str,
373        bytes: &[u8],
374    ) -> Result<DataTable> {
375        // Parse the JSON
376        let json_value: serde_json::Value = serde_json::from_slice(bytes)
377            .with_context(|| "Failed to parse JSON for path extraction")?;
378
379        // Navigate to the specified path
380        let extracted = self
381            .navigate_json_path(&json_value, json_path)
382            .with_context(|| format!("Failed to extract JSON path: {}", json_path))?;
383
384        // If the extracted value is already an array, use it directly
385        // Otherwise, wrap it in an array for consistent handling
386        let array_value = match extracted {
387            serde_json::Value::Array(_) => extracted.clone(),
388            _ => serde_json::Value::Array(vec![extracted.clone()]),
389        };
390
391        // Convert to bytes and parse as a table
392        let extracted_bytes = serde_json::to_vec(&array_value)?;
393
394        // Re-parse the extracted JSON as a DataTable
395        let reader = Cursor::new(extracted_bytes);
396        load_json_from_reader(reader, "extracted", "web", json_path)
397    }
398
399    /// Navigate to a specific path in JSON structure
400    fn navigate_json_path<'a>(
401        &self,
402        value: &'a serde_json::Value,
403        path: &str,
404    ) -> Result<&'a serde_json::Value> {
405        let mut current = value;
406
407        // Split path by dots (simple path navigation for now)
408        // Future enhancement: support array indexing like "Result[0]"
409        for part in path.split('.') {
410            if part.is_empty() {
411                continue;
412            }
413
414            current = current
415                .get(part)
416                .ok_or_else(|| anyhow::anyhow!("Path '{}' not found in JSON", part))?;
417        }
418
419        Ok(current)
420    }
421
422    /// Resolve environment variables in values (${VAR_NAME} or $VAR_NAME syntax)
423    fn resolve_env_var(&self, value: &str) -> Result<String> {
424        let mut result = value.to_string();
425
426        // Handle ${VAR} syntax - can be embedded in strings
427        // Use lazy_static for better performance, but for now just compile inline
428        let re = Regex::new(r"\$\{([^}]+)\}").unwrap();
429        for cap in re.captures_iter(value) {
430            let var_name = &cap[1];
431            match std::env::var(var_name) {
432                Ok(var_value) => {
433                    result = result.replace(&cap[0], &var_value);
434                }
435                Err(_) => {
436                    // For security, don't expose which env vars exist
437                    // Just log a debug message and keep the placeholder
438                    debug!(
439                        "Environment variable {} not found, keeping placeholder",
440                        var_name
441                    );
442                }
443            }
444        }
445
446        // Also handle simple $VAR syntax at the start of the string
447        if result.starts_with('$') && !result.starts_with("${") {
448            let var_name = &result[1..];
449            if let Ok(var_value) = std::env::var(var_name) {
450                return Ok(var_value);
451            }
452        }
453
454        Ok(result)
455    }
456}
457
#[cfg(test)]
mod tests {
    use super::*;

    /// Format detection from URL extension, content type, and the Auto fallback.
    #[test]
    fn test_detect_format() {
        let fetcher = WebDataFetcher::new().unwrap();

        // Test URL-based detection
        assert!(matches!(
            fetcher.detect_format("http://example.com/data.csv", ""),
            DataFormat::CSV
        ));
        assert!(matches!(
            fetcher.detect_format("http://example.com/data.json", ""),
            DataFormat::JSON
        ));

        // Test content-type detection
        assert!(matches!(
            fetcher.detect_format("http://example.com/data", "application/json"),
            DataFormat::JSON
        ));
        assert!(matches!(
            fetcher.detect_format("http://example.com/data", "text/csv"),
            DataFormat::CSV
        ));

        // Test auto-detect fallback
        assert!(matches!(
            fetcher.detect_format("http://example.com/data", ""),
            DataFormat::Auto
        ));
    }

    /// A specific content type takes precedence over the URL extension.
    #[test]
    fn test_detect_format_content_type_precedence() {
        let fetcher = WebDataFetcher::new().unwrap();

        assert!(matches!(
            fetcher.detect_format("http://example.com/data.csv", "application/json"),
            DataFormat::JSON
        ));
        assert!(matches!(
            fetcher.detect_format("http://example.com/data.json", "text/csv"),
            DataFormat::CSV
        ));
    }

    /// Dot-separated paths walk nested objects; unknown keys are reported as errors.
    #[test]
    fn test_navigate_json_path() {
        let fetcher = WebDataFetcher::new().unwrap();
        let doc = serde_json::json!({ "Result": { "items": [1, 2] } });

        let items = fetcher.navigate_json_path(&doc, "Result.items").unwrap();
        assert_eq!(items, &serde_json::json!([1, 2]));

        // An empty path selects the document itself.
        assert_eq!(fetcher.navigate_json_path(&doc, "").unwrap(), &doc);

        assert!(fetcher.navigate_json_path(&doc, "Result.missing").is_err());
    }

    /// Values without resolvable references come back unchanged.
    #[test]
    fn test_resolve_env_var_passthrough() {
        let fetcher = WebDataFetcher::new().unwrap();

        assert_eq!(fetcher.resolve_env_var("plain value").unwrap(), "plain value");

        // The variable name is chosen so that it is not expected to exist.
        let unresolved = "Bearer ${SQL_CLI_TEST_UNSET_VARIABLE_9F3A}";
        assert_eq!(fetcher.resolve_env_var(unresolved).unwrap(), unresolved);
    }
}
492}