1use anyhow::{Context, Result};
3use regex::Regex;
4use serde_json;
5use std::fs::File;
6use std::io::Cursor;
7use std::path::Path;
8use std::time::Duration;
9use tracing::{debug, info};
10
11use crate::data::datatable::DataTable;
12use crate::data::stream_loader::{load_csv_from_reader, load_json_from_reader};
13use crate::sql::parser::ast::{DataFormat, HttpMethod, WebCTESpec};
14
15pub struct WebDataFetcher {
17 client: reqwest::blocking::Client,
18}
19
20impl WebDataFetcher {
21 pub fn new() -> Result<Self> {
22 let client = reqwest::blocking::Client::builder()
23 .timeout(Duration::from_secs(30))
24 .user_agent("sql-cli/1.0")
25 .build()?;
26
27 Ok(Self { client })
28 }
29
30 pub fn fetch(&self, spec: &WebCTESpec, table_name: &str) -> Result<DataTable> {
32 info!("Fetching data from URL: {}", spec.url);
33
34 if spec.url.starts_with("file://") {
36 return self.fetch_file(spec, table_name);
37 }
38
39 let mut request = match spec.method.as_ref().unwrap_or(&HttpMethod::GET) {
42 HttpMethod::GET => self.client.get(&spec.url),
43 HttpMethod::POST => self.client.post(&spec.url),
44 HttpMethod::PUT => self.client.put(&spec.url),
45 HttpMethod::DELETE => self.client.delete(&spec.url),
46 HttpMethod::PATCH => self.client.patch(&spec.url),
47 };
48
49 for (key, value) in &spec.headers {
51 let resolved_value = self.resolve_env_var(value)?;
52 request = request.header(key, resolved_value);
53 }
54
55 if let Some(body) = &spec.body {
57 let resolved_body = self.resolve_env_var(body)?;
58 request = request.body(resolved_body);
59 if spec.body.as_ref().unwrap().trim().starts_with('{') {
61 request = request.header("Content-Type", "application/json");
62 }
63 }
64
65 let response = request
67 .send()
68 .with_context(|| format!("Failed to fetch from URL: {}", spec.url))?;
69
70 if !response.status().is_success() {
72 return Err(anyhow::anyhow!(
73 "HTTP request failed with status {}: {}",
74 response.status(),
75 spec.url
76 ));
77 }
78
79 let content_type = response
81 .headers()
82 .get("content-type")
83 .and_then(|v| v.to_str().ok())
84 .unwrap_or("")
85 .to_string();
86
87 debug!("Response content-type: {}", content_type);
88
89 let bytes = response.bytes()?;
91
92 let format = match &spec.format {
94 Some(fmt) => fmt.clone(),
95 None => self.detect_format(&spec.url, &content_type),
96 };
97
98 info!("Using format: {:?} for {}", format, spec.url);
99
100 if let Some(json_path) = &spec.json_path {
102 if matches!(format, DataFormat::JSON | DataFormat::Auto) {
103 let json_value: serde_json::Value = serde_json::from_slice(&bytes)
105 .with_context(|| "Failed to parse JSON for path extraction")?;
106
107 let extracted = self
109 .navigate_json_path(&json_value, json_path)
110 .with_context(|| format!("Failed to extract JSON path: {}", json_path))?;
111
112 let array_value = match extracted {
114 serde_json::Value::Array(_) => extracted.clone(),
115 _ => serde_json::Value::Array(vec![extracted.clone()]),
116 };
117
118 let extracted_bytes = serde_json::to_vec(&array_value)?;
119 return self.parse_data(
120 extracted_bytes,
121 DataFormat::JSON,
122 table_name,
123 "web",
124 &spec.url,
125 );
126 }
127 }
128
129 self.parse_data(bytes.to_vec(), format, table_name, "web", &spec.url)
131 }
132
133 fn fetch_file(&self, spec: &WebCTESpec, table_name: &str) -> Result<DataTable> {
135 let file_path = if spec.url.starts_with("file://") {
137 &spec.url[7..] } else {
139 &spec.url
140 };
141
142 info!("Reading local file: {}", file_path);
143
144 let path = Path::new(file_path);
146 if !path.exists() {
147 return Err(anyhow::anyhow!("File not found: {}", file_path));
148 }
149
150 let file =
152 File::open(path).with_context(|| format!("Failed to open file: {}", file_path))?;
153
154 let metadata = file.metadata()?;
156 let file_size = metadata.len();
157 debug!("File size: {} bytes", file_size);
158
159 let format = match &spec.format {
161 Some(fmt) => fmt.clone(),
162 None => self.detect_format(file_path, ""),
163 };
164
165 info!("Using format: {:?} for {}", format, file_path);
166
167 match format {
169 DataFormat::CSV => load_csv_from_reader(file, table_name, "file", file_path)
170 .with_context(|| format!("Failed to parse CSV from {}", file_path)),
171 DataFormat::JSON => load_json_from_reader(file, table_name, "file", file_path)
172 .with_context(|| format!("Failed to parse JSON from {}", file_path)),
173 DataFormat::Auto => {
174 if file_path.ends_with(".json") {
176 let file = File::open(path)?;
177 load_json_from_reader(file, table_name, "file", file_path)
178 .with_context(|| format!("Failed to parse JSON from {}", file_path))
179 } else {
180 let file = File::open(path)?;
182 load_csv_from_reader(file, table_name, "file", file_path)
183 .with_context(|| format!("Failed to parse CSV from {}", file_path))
184 }
185 }
186 }
187 }
188
189 fn parse_data(
191 &self,
192 bytes: Vec<u8>,
193 format: DataFormat,
194 table_name: &str,
195 source_type: &str,
196 source_path: &str,
197 ) -> Result<DataTable> {
198 match format {
199 DataFormat::CSV => {
200 let reader = Cursor::new(bytes);
201 load_csv_from_reader(reader, table_name, source_type, source_path)
202 .with_context(|| format!("Failed to parse CSV from {}", source_path))
203 }
204 DataFormat::JSON => {
205 let reader = Cursor::new(bytes);
206 load_json_from_reader(reader, table_name, source_type, source_path)
207 .with_context(|| format!("Failed to parse JSON from {}", source_path))
208 }
209 DataFormat::Auto => {
210 let reader_csv = Cursor::new(bytes.clone());
212 match load_csv_from_reader(reader_csv, table_name, source_type, source_path) {
213 Ok(table) => Ok(table),
214 Err(_) => {
215 debug!("CSV parsing failed, trying JSON");
216 let reader_json = Cursor::new(bytes);
217 load_json_from_reader(reader_json, table_name, source_type, source_path)
218 .with_context(|| format!("Failed to parse data from {}", source_path))
219 }
220 }
221 }
222 }
223 }
224
225 fn detect_format(&self, url: &str, content_type: &str) -> DataFormat {
227 if content_type.contains("json") {
229 return DataFormat::JSON;
230 }
231 if content_type.contains("csv") || content_type.contains("text/plain") {
232 return DataFormat::CSV;
233 }
234
235 if url.ends_with(".json") {
237 DataFormat::JSON
238 } else if url.ends_with(".csv") {
239 DataFormat::CSV
240 } else {
241 DataFormat::Auto
243 }
244 }
245
246 fn extract_json_path(
248 &self,
249 _table: DataTable,
250 json_path: &str,
251 bytes: &[u8],
252 ) -> Result<DataTable> {
253 let json_value: serde_json::Value = serde_json::from_slice(bytes)
255 .with_context(|| "Failed to parse JSON for path extraction")?;
256
257 let extracted = self
259 .navigate_json_path(&json_value, json_path)
260 .with_context(|| format!("Failed to extract JSON path: {}", json_path))?;
261
262 let array_value = match extracted {
265 serde_json::Value::Array(_) => extracted.clone(),
266 _ => serde_json::Value::Array(vec![extracted.clone()]),
267 };
268
269 let extracted_bytes = serde_json::to_vec(&array_value)?;
271
272 let reader = Cursor::new(extracted_bytes);
274 load_json_from_reader(reader, "extracted", "web", json_path)
275 }
276
277 fn navigate_json_path<'a>(
279 &self,
280 value: &'a serde_json::Value,
281 path: &str,
282 ) -> Result<&'a serde_json::Value> {
283 let mut current = value;
284
285 for part in path.split('.') {
288 if part.is_empty() {
289 continue;
290 }
291
292 current = current
293 .get(part)
294 .ok_or_else(|| anyhow::anyhow!("Path '{}' not found in JSON", part))?;
295 }
296
297 Ok(current)
298 }
299
300 fn resolve_env_var(&self, value: &str) -> Result<String> {
302 let mut result = value.to_string();
303
304 let re = Regex::new(r"\$\{([^}]+)\}").unwrap();
307 for cap in re.captures_iter(value) {
308 let var_name = &cap[1];
309 match std::env::var(var_name) {
310 Ok(var_value) => {
311 result = result.replace(&cap[0], &var_value);
312 }
313 Err(_) => {
314 debug!(
317 "Environment variable {} not found, keeping placeholder",
318 var_name
319 );
320 }
321 }
322 }
323
324 if result.starts_with('$') && !result.starts_with("${") {
326 let var_name = &result[1..];
327 if let Ok(var_value) = std::env::var(var_name) {
328 return Ok(var_value);
329 }
330 }
331
332 Ok(result)
333 }
334}
335
#[cfg(test)]
mod tests {
    use super::*;

    /// Shared constructor for the tests below.
    fn fetcher() -> WebDataFetcher {
        WebDataFetcher::new().unwrap()
    }

    #[test]
    fn test_detect_format() {
        let fetcher = fetcher();

        // Extension-based detection.
        assert!(matches!(
            fetcher.detect_format("http://example.com/data.csv", ""),
            DataFormat::CSV
        ));
        assert!(matches!(
            fetcher.detect_format("http://example.com/data.json", ""),
            DataFormat::JSON
        ));

        // Content-type-based detection.
        assert!(matches!(
            fetcher.detect_format("http://example.com/data", "application/json"),
            DataFormat::JSON
        ));
        assert!(matches!(
            fetcher.detect_format("http://example.com/data", "text/csv"),
            DataFormat::CSV
        ));

        // No hint at all.
        assert!(matches!(
            fetcher.detect_format("http://example.com/data", ""),
            DataFormat::Auto
        ));
    }

    #[test]
    fn test_navigate_json_path_nested_object() {
        let value = serde_json::json!({"data": {"items": [1, 2, 3]}});
        let found = fetcher().navigate_json_path(&value, "data.items").unwrap();
        assert_eq!(found, &serde_json::json!([1, 2, 3]));
    }

    #[test]
    fn test_navigate_json_path_skips_empty_segments() {
        let value = serde_json::json!({"data": {"items": {"id": 7}}});
        let found = fetcher().navigate_json_path(&value, ".data..items.").unwrap();
        assert_eq!(found, &serde_json::json!({"id": 7}));
    }

    #[test]
    fn test_navigate_json_path_missing_key_is_error() {
        let value = serde_json::json!({"data": {}});
        assert!(fetcher().navigate_json_path(&value, "data.missing").is_err());
    }

    #[test]
    fn test_resolve_env_var_without_placeholders_is_unchanged() {
        assert_eq!(fetcher().resolve_env_var("Bearer abc").unwrap(), "Bearer abc");
    }

    #[test]
    fn test_resolve_env_var_keeps_unset_placeholders() {
        let braced = "token=${SQL_CLI_TEST_VAR_THAT_IS_NEVER_SET}";
        assert_eq!(fetcher().resolve_env_var(braced).unwrap(), braced);

        let bare = "$SQL_CLI_TEST_VAR_THAT_IS_NEVER_SET";
        assert_eq!(fetcher().resolve_env_var(bare).unwrap(), bare);
    }
}
370}