sql_cli/web/
http_fetcher.rs

// HTTP fetcher for WEB CTEs; also loads local files referenced by file:// URLs
2use anyhow::{Context, Result};
3use regex::Regex;
4use serde_json;
5use std::fs::File;
6use std::io::Cursor;
7use std::path::Path;
8use std::time::Duration;
9use tracing::{debug, info};
10
11use crate::data::datatable::DataTable;
12use crate::data::stream_loader::{load_csv_from_reader, load_json_from_reader};
13use crate::sql::parser::ast::{DataFormat, HttpMethod, WebCTESpec};
14
15/// Fetches data from a URL and converts it to a DataTable
16pub struct WebDataFetcher {
17    client: reqwest::blocking::Client,
18}
19
20impl WebDataFetcher {
21    pub fn new() -> Result<Self> {
22        let client = reqwest::blocking::Client::builder()
23            .timeout(Duration::from_secs(30))
24            .user_agent("sql-cli/1.0")
25            .build()?;
26
27        Ok(Self { client })
28    }
29
30    /// Fetch data from a WEB CTE specification (supports http://, https://, and file:// URLs)
31    pub fn fetch(&self, spec: &WebCTESpec, table_name: &str) -> Result<DataTable> {
32        info!("Fetching data from URL: {}", spec.url);
33
34        // Check if this is a file:// URL
35        if spec.url.starts_with("file://") {
36            return self.fetch_file(spec, table_name);
37        }
38
39        // Regular HTTP/HTTPS handling
40        // Build request based on method
41        let mut request = match spec.method.as_ref().unwrap_or(&HttpMethod::GET) {
42            HttpMethod::GET => self.client.get(&spec.url),
43            HttpMethod::POST => self.client.post(&spec.url),
44            HttpMethod::PUT => self.client.put(&spec.url),
45            HttpMethod::DELETE => self.client.delete(&spec.url),
46            HttpMethod::PATCH => self.client.patch(&spec.url),
47        };
48
49        // Add headers if provided
50        for (key, value) in &spec.headers {
51            let resolved_value = self.resolve_env_var(value)?;
52            request = request.header(key, resolved_value);
53        }
54
55        // Handle multipart form data if form_files are specified
56        if !spec.form_files.is_empty() || !spec.form_fields.is_empty() {
57            let mut form = reqwest::blocking::multipart::Form::new();
58
59            // Add files
60            for (field_name, file_path) in &spec.form_files {
61                let resolved_path = self.resolve_env_var(file_path)?;
62                let file = std::fs::File::open(&resolved_path)
63                    .with_context(|| format!("Failed to open file: {}", resolved_path))?;
64                let file_name = std::path::Path::new(&resolved_path)
65                    .file_name()
66                    .and_then(|n| n.to_str())
67                    .unwrap_or("file")
68                    .to_string();
69                let part = reqwest::blocking::multipart::Part::reader(file).file_name(file_name);
70                form = form.part(field_name.clone(), part);
71            }
72
73            // Add regular form fields
74            for (field_name, value) in &spec.form_fields {
75                let resolved_value = self.resolve_env_var(value)?;
76                form = form.text(field_name.clone(), resolved_value);
77            }
78
79            request = request.multipart(form);
80        }
81        // Add body if provided (typically for POST/PUT/PATCH) - only if not using multipart
82        else if let Some(body) = &spec.body {
83            let resolved_body = self.resolve_env_var(body)?;
84            request = request.body(resolved_body);
85            // Set Content-Type to JSON if not already set and body looks like JSON
86            if spec.body.as_ref().unwrap().trim().starts_with('{') {
87                request = request.header("Content-Type", "application/json");
88            }
89        }
90
91        // Execute request
92        let response = request
93            .send()
94            .with_context(|| format!("Failed to fetch from URL: {}", spec.url))?;
95
96        // Check status
97        if !response.status().is_success() {
98            return Err(anyhow::anyhow!(
99                "HTTP request failed with status {}: {}",
100                response.status(),
101                spec.url
102            ));
103        }
104
105        // Get content type for format detection
106        let content_type = response
107            .headers()
108            .get("content-type")
109            .and_then(|v| v.to_str().ok())
110            .unwrap_or("")
111            .to_string();
112
113        debug!("Response content-type: {}", content_type);
114
115        // Read response body
116        let bytes = response.bytes()?;
117
118        // Determine format
119        let format = match &spec.format {
120            Some(fmt) => fmt.clone(),
121            None => self.detect_format(&spec.url, &content_type),
122        };
123
124        info!("Using format: {:?} for {}", format, spec.url);
125
126        // If JSON_PATH is specified and format is JSON, extract the nested data first
127        if let Some(json_path) = &spec.json_path {
128            if matches!(format, DataFormat::JSON | DataFormat::Auto) {
129                // Parse JSON and extract the path
130                let json_value: serde_json::Value = serde_json::from_slice(&bytes)
131                    .with_context(|| "Failed to parse JSON for path extraction")?;
132
133                // Navigate to the specified path
134                let extracted = self
135                    .navigate_json_path(&json_value, json_path)
136                    .with_context(|| format!("Failed to extract JSON path: {}", json_path))?;
137
138                // Convert extracted value to bytes and parse as table
139                let array_value = match extracted {
140                    serde_json::Value::Array(_) => extracted.clone(),
141                    _ => serde_json::Value::Array(vec![extracted.clone()]),
142                };
143
144                let extracted_bytes = serde_json::to_vec(&array_value)?;
145                return self.parse_data(
146                    extracted_bytes,
147                    DataFormat::JSON,
148                    table_name,
149                    "web",
150                    &spec.url,
151                );
152            }
153        }
154
155        // Parse based on format without JSON path extraction
156        self.parse_data(bytes.to_vec(), format, table_name, "web", &spec.url)
157    }
158
159    /// Fetch data from a file:// URL
160    fn fetch_file(&self, spec: &WebCTESpec, table_name: &str) -> Result<DataTable> {
161        // Extract path from file:// URL
162        let file_path = if spec.url.starts_with("file://") {
163            &spec.url[7..] // Remove "file://" prefix
164        } else {
165            &spec.url
166        };
167
168        info!("Reading local file: {}", file_path);
169
170        // Check if file exists
171        let path = Path::new(file_path);
172        if !path.exists() {
173            return Err(anyhow::anyhow!("File not found: {}", file_path));
174        }
175
176        // Open file
177        let file =
178            File::open(path).with_context(|| format!("Failed to open file: {}", file_path))?;
179
180        // Get file size for memory checks
181        let metadata = file.metadata()?;
182        let file_size = metadata.len();
183        debug!("File size: {} bytes", file_size);
184
185        // Determine format from extension or spec
186        let format = match &spec.format {
187            Some(fmt) => fmt.clone(),
188            None => self.detect_format(file_path, ""),
189        };
190
191        info!("Using format: {:?} for {}", format, file_path);
192
193        // Parse based on format
194        match format {
195            DataFormat::CSV => load_csv_from_reader(file, table_name, "file", file_path)
196                .with_context(|| format!("Failed to parse CSV from {}", file_path)),
197            DataFormat::JSON => load_json_from_reader(file, table_name, "file", file_path)
198                .with_context(|| format!("Failed to parse JSON from {}", file_path)),
199            DataFormat::Auto => {
200                // For files, we can't retry with the same reader, so determine based on extension
201                if file_path.ends_with(".json") {
202                    let file = File::open(path)?;
203                    load_json_from_reader(file, table_name, "file", file_path)
204                        .with_context(|| format!("Failed to parse JSON from {}", file_path))
205                } else {
206                    // Default to CSV for auto-detect with files
207                    let file = File::open(path)?;
208                    load_csv_from_reader(file, table_name, "file", file_path)
209                        .with_context(|| format!("Failed to parse CSV from {}", file_path))
210                }
211            }
212        }
213    }
214
215    /// Parse data bytes based on format
216    fn parse_data(
217        &self,
218        bytes: Vec<u8>,
219        format: DataFormat,
220        table_name: &str,
221        source_type: &str,
222        source_path: &str,
223    ) -> Result<DataTable> {
224        match format {
225            DataFormat::CSV => {
226                let reader = Cursor::new(bytes);
227                load_csv_from_reader(reader, table_name, source_type, source_path)
228                    .with_context(|| format!("Failed to parse CSV from {}", source_path))
229            }
230            DataFormat::JSON => {
231                let reader = Cursor::new(bytes);
232                load_json_from_reader(reader, table_name, source_type, source_path)
233                    .with_context(|| format!("Failed to parse JSON from {}", source_path))
234            }
235            DataFormat::Auto => {
236                // Try CSV first, then JSON
237                let reader_csv = Cursor::new(bytes.clone());
238                match load_csv_from_reader(reader_csv, table_name, source_type, source_path) {
239                    Ok(table) => Ok(table),
240                    Err(_) => {
241                        debug!("CSV parsing failed, trying JSON");
242                        let reader_json = Cursor::new(bytes);
243                        load_json_from_reader(reader_json, table_name, source_type, source_path)
244                            .with_context(|| format!("Failed to parse data from {}", source_path))
245                    }
246                }
247            }
248        }
249    }
250
251    /// Detect format from URL extension or content type
252    fn detect_format(&self, url: &str, content_type: &str) -> DataFormat {
253        // Check content type first
254        if content_type.contains("json") {
255            return DataFormat::JSON;
256        }
257        if content_type.contains("csv") || content_type.contains("text/plain") {
258            return DataFormat::CSV;
259        }
260
261        // Check URL extension
262        if url.ends_with(".json") {
263            DataFormat::JSON
264        } else if url.ends_with(".csv") {
265            DataFormat::CSV
266        } else {
267            // Default to auto-detect
268            DataFormat::Auto
269        }
270    }
271
272    /// Extract data from a specific JSON path
273    fn extract_json_path(
274        &self,
275        _table: DataTable,
276        json_path: &str,
277        bytes: &[u8],
278    ) -> Result<DataTable> {
279        // Parse the JSON
280        let json_value: serde_json::Value = serde_json::from_slice(bytes)
281            .with_context(|| "Failed to parse JSON for path extraction")?;
282
283        // Navigate to the specified path
284        let extracted = self
285            .navigate_json_path(&json_value, json_path)
286            .with_context(|| format!("Failed to extract JSON path: {}", json_path))?;
287
288        // If the extracted value is already an array, use it directly
289        // Otherwise, wrap it in an array for consistent handling
290        let array_value = match extracted {
291            serde_json::Value::Array(_) => extracted.clone(),
292            _ => serde_json::Value::Array(vec![extracted.clone()]),
293        };
294
295        // Convert to bytes and parse as a table
296        let extracted_bytes = serde_json::to_vec(&array_value)?;
297
298        // Re-parse the extracted JSON as a DataTable
299        let reader = Cursor::new(extracted_bytes);
300        load_json_from_reader(reader, "extracted", "web", json_path)
301    }
302
303    /// Navigate to a specific path in JSON structure
304    fn navigate_json_path<'a>(
305        &self,
306        value: &'a serde_json::Value,
307        path: &str,
308    ) -> Result<&'a serde_json::Value> {
309        let mut current = value;
310
311        // Split path by dots (simple path navigation for now)
312        // Future enhancement: support array indexing like "Result[0]"
313        for part in path.split('.') {
314            if part.is_empty() {
315                continue;
316            }
317
318            current = current
319                .get(part)
320                .ok_or_else(|| anyhow::anyhow!("Path '{}' not found in JSON", part))?;
321        }
322
323        Ok(current)
324    }
325
326    /// Resolve environment variables in values (${VAR_NAME} or $VAR_NAME syntax)
327    fn resolve_env_var(&self, value: &str) -> Result<String> {
328        let mut result = value.to_string();
329
330        // Handle ${VAR} syntax - can be embedded in strings
331        // Use lazy_static for better performance, but for now just compile inline
332        let re = Regex::new(r"\$\{([^}]+)\}").unwrap();
333        for cap in re.captures_iter(value) {
334            let var_name = &cap[1];
335            match std::env::var(var_name) {
336                Ok(var_value) => {
337                    result = result.replace(&cap[0], &var_value);
338                }
339                Err(_) => {
340                    // For security, don't expose which env vars exist
341                    // Just log a debug message and keep the placeholder
342                    debug!(
343                        "Environment variable {} not found, keeping placeholder",
344                        var_name
345                    );
346                }
347            }
348        }
349
350        // Also handle simple $VAR syntax at the start of the string
351        if result.starts_with('$') && !result.starts_with("${") {
352            let var_name = &result[1..];
353            if let Ok(var_value) = std::env::var(var_name) {
354                return Ok(var_value);
355            }
356        }
357
358        Ok(result)
359    }
360}
361
#[cfg(test)]
mod tests {
    use super::*;

    /// One detection case: URL, content type, and a predicate the detected format must
    /// satisfy.
    type Case = (&'static str, &'static str, fn(&DataFormat) -> bool);

    #[test]
    fn test_detect_format() {
        let fetcher = WebDataFetcher::new().unwrap();

        let cases: [Case; 5] = [
            // Extension drives detection when no content type is given.
            ("http://example.com/data.csv", "", |f| matches!(f, DataFormat::CSV)),
            ("http://example.com/data.json", "", |f| matches!(f, DataFormat::JSON)),
            // Content type drives detection for extension-less URLs.
            ("http://example.com/data", "application/json", |f| {
                matches!(f, DataFormat::JSON)
            }),
            ("http://example.com/data", "text/csv", |f| matches!(f, DataFormat::CSV)),
            // No hint at all: defer to auto-detection.
            ("http://example.com/data", "", |f| matches!(f, DataFormat::Auto)),
        ];

        for (url, content_type, is_expected) in cases.iter() {
            let detected = fetcher.detect_format(url, content_type);
            assert!(
                is_expected(&detected),
                "unexpected format {:?} for url={:?}, content_type={:?}",
                detected,
                url,
                content_type
            );
        }
    }
}