1use anyhow::{Context, Result};
3use regex::Regex;
4use serde_json;
5use std::fs::File;
6use std::io::Cursor;
7use std::path::Path;
8use std::time::Duration;
9use tracing::{debug, info};
10
11use crate::data::datatable::DataTable;
12use crate::data::stream_loader::{load_csv_from_reader, load_json_from_reader};
13use crate::sql::parser::ast::{DataFormat, HttpMethod, WebCTESpec};
14
15pub struct WebDataFetcher {
17 client: reqwest::blocking::Client,
18}
19
20impl WebDataFetcher {
21 pub fn new() -> Result<Self> {
22 let client = reqwest::blocking::Client::builder()
23 .timeout(Duration::from_secs(30))
24 .user_agent("sql-cli/1.0")
25 .build()?;
26
27 Ok(Self { client })
28 }
29
30 pub fn fetch(&self, spec: &WebCTESpec, table_name: &str) -> Result<DataTable> {
32 info!("Fetching data from URL: {}", spec.url);
33
34 if spec.url.starts_with("file://") {
36 return self.fetch_file(spec, table_name);
37 }
38
39 let mut request = match spec.method.as_ref().unwrap_or(&HttpMethod::GET) {
42 HttpMethod::GET => self.client.get(&spec.url),
43 HttpMethod::POST => self.client.post(&spec.url),
44 HttpMethod::PUT => self.client.put(&spec.url),
45 HttpMethod::DELETE => self.client.delete(&spec.url),
46 HttpMethod::PATCH => self.client.patch(&spec.url),
47 };
48
49 for (key, value) in &spec.headers {
51 let resolved_value = self.resolve_env_var(value)?;
52 request = request.header(key, resolved_value);
53 }
54
55 if !spec.form_files.is_empty() || !spec.form_fields.is_empty() {
57 let mut form = reqwest::blocking::multipart::Form::new();
58
59 for (field_name, file_path) in &spec.form_files {
61 let resolved_path = self.resolve_env_var(file_path)?;
62 let file = std::fs::File::open(&resolved_path)
63 .with_context(|| format!("Failed to open file: {}", resolved_path))?;
64 let file_name = std::path::Path::new(&resolved_path)
65 .file_name()
66 .and_then(|n| n.to_str())
67 .unwrap_or("file")
68 .to_string();
69 let part = reqwest::blocking::multipart::Part::reader(file).file_name(file_name);
70 form = form.part(field_name.clone(), part);
71 }
72
73 for (field_name, value) in &spec.form_fields {
75 let resolved_value = self.resolve_env_var(value)?;
76 form = form.text(field_name.clone(), resolved_value);
77 }
78
79 request = request.multipart(form);
80 }
81 else if let Some(body) = &spec.body {
83 let resolved_body = self.resolve_env_var(body)?;
84 request = request.body(resolved_body);
85 if spec.body.as_ref().unwrap().trim().starts_with('{') {
87 request = request.header("Content-Type", "application/json");
88 }
89 }
90
91 let response = request
93 .send()
94 .with_context(|| format!("Failed to fetch from URL: {}", spec.url))?;
95
96 if !response.status().is_success() {
98 return Err(anyhow::anyhow!(
99 "HTTP request failed with status {}: {}",
100 response.status(),
101 spec.url
102 ));
103 }
104
105 let content_type = response
107 .headers()
108 .get("content-type")
109 .and_then(|v| v.to_str().ok())
110 .unwrap_or("")
111 .to_string();
112
113 debug!("Response content-type: {}", content_type);
114
115 let bytes = response.bytes()?;
117
118 let format = match &spec.format {
120 Some(fmt) => fmt.clone(),
121 None => self.detect_format(&spec.url, &content_type),
122 };
123
124 info!("Using format: {:?} for {}", format, spec.url);
125
126 if let Some(json_path) = &spec.json_path {
128 if matches!(format, DataFormat::JSON | DataFormat::Auto) {
129 let json_value: serde_json::Value = serde_json::from_slice(&bytes)
131 .with_context(|| "Failed to parse JSON for path extraction")?;
132
133 let extracted = self
135 .navigate_json_path(&json_value, json_path)
136 .with_context(|| format!("Failed to extract JSON path: {}", json_path))?;
137
138 let array_value = match extracted {
140 serde_json::Value::Array(_) => extracted.clone(),
141 _ => serde_json::Value::Array(vec![extracted.clone()]),
142 };
143
144 let extracted_bytes = serde_json::to_vec(&array_value)?;
145 return self.parse_data(
146 extracted_bytes,
147 DataFormat::JSON,
148 table_name,
149 "web",
150 &spec.url,
151 );
152 }
153 }
154
155 self.parse_data(bytes.to_vec(), format, table_name, "web", &spec.url)
157 }
158
159 fn fetch_file(&self, spec: &WebCTESpec, table_name: &str) -> Result<DataTable> {
161 let file_path = if spec.url.starts_with("file://") {
163 &spec.url[7..] } else {
165 &spec.url
166 };
167
168 info!("Reading local file: {}", file_path);
169
170 let path = Path::new(file_path);
172 if !path.exists() {
173 return Err(anyhow::anyhow!("File not found: {}", file_path));
174 }
175
176 let file =
178 File::open(path).with_context(|| format!("Failed to open file: {}", file_path))?;
179
180 let metadata = file.metadata()?;
182 let file_size = metadata.len();
183 debug!("File size: {} bytes", file_size);
184
185 let format = match &spec.format {
187 Some(fmt) => fmt.clone(),
188 None => self.detect_format(file_path, ""),
189 };
190
191 info!("Using format: {:?} for {}", format, file_path);
192
193 match format {
195 DataFormat::CSV => load_csv_from_reader(file, table_name, "file", file_path)
196 .with_context(|| format!("Failed to parse CSV from {}", file_path)),
197 DataFormat::JSON => load_json_from_reader(file, table_name, "file", file_path)
198 .with_context(|| format!("Failed to parse JSON from {}", file_path)),
199 DataFormat::Auto => {
200 if file_path.ends_with(".json") {
202 let file = File::open(path)?;
203 load_json_from_reader(file, table_name, "file", file_path)
204 .with_context(|| format!("Failed to parse JSON from {}", file_path))
205 } else {
206 let file = File::open(path)?;
208 load_csv_from_reader(file, table_name, "file", file_path)
209 .with_context(|| format!("Failed to parse CSV from {}", file_path))
210 }
211 }
212 }
213 }
214
215 fn parse_data(
217 &self,
218 bytes: Vec<u8>,
219 format: DataFormat,
220 table_name: &str,
221 source_type: &str,
222 source_path: &str,
223 ) -> Result<DataTable> {
224 match format {
225 DataFormat::CSV => {
226 let reader = Cursor::new(bytes);
227 load_csv_from_reader(reader, table_name, source_type, source_path)
228 .with_context(|| format!("Failed to parse CSV from {}", source_path))
229 }
230 DataFormat::JSON => {
231 let reader = Cursor::new(bytes);
232 load_json_from_reader(reader, table_name, source_type, source_path)
233 .with_context(|| format!("Failed to parse JSON from {}", source_path))
234 }
235 DataFormat::Auto => {
236 let reader_csv = Cursor::new(bytes.clone());
238 match load_csv_from_reader(reader_csv, table_name, source_type, source_path) {
239 Ok(table) => Ok(table),
240 Err(_) => {
241 debug!("CSV parsing failed, trying JSON");
242 let reader_json = Cursor::new(bytes);
243 load_json_from_reader(reader_json, table_name, source_type, source_path)
244 .with_context(|| format!("Failed to parse data from {}", source_path))
245 }
246 }
247 }
248 }
249 }
250
251 fn detect_format(&self, url: &str, content_type: &str) -> DataFormat {
253 if content_type.contains("json") {
255 return DataFormat::JSON;
256 }
257 if content_type.contains("csv") || content_type.contains("text/plain") {
258 return DataFormat::CSV;
259 }
260
261 if url.ends_with(".json") {
263 DataFormat::JSON
264 } else if url.ends_with(".csv") {
265 DataFormat::CSV
266 } else {
267 DataFormat::Auto
269 }
270 }
271
272 fn extract_json_path(
274 &self,
275 _table: DataTable,
276 json_path: &str,
277 bytes: &[u8],
278 ) -> Result<DataTable> {
279 let json_value: serde_json::Value = serde_json::from_slice(bytes)
281 .with_context(|| "Failed to parse JSON for path extraction")?;
282
283 let extracted = self
285 .navigate_json_path(&json_value, json_path)
286 .with_context(|| format!("Failed to extract JSON path: {}", json_path))?;
287
288 let array_value = match extracted {
291 serde_json::Value::Array(_) => extracted.clone(),
292 _ => serde_json::Value::Array(vec![extracted.clone()]),
293 };
294
295 let extracted_bytes = serde_json::to_vec(&array_value)?;
297
298 let reader = Cursor::new(extracted_bytes);
300 load_json_from_reader(reader, "extracted", "web", json_path)
301 }
302
303 fn navigate_json_path<'a>(
305 &self,
306 value: &'a serde_json::Value,
307 path: &str,
308 ) -> Result<&'a serde_json::Value> {
309 let mut current = value;
310
311 for part in path.split('.') {
314 if part.is_empty() {
315 continue;
316 }
317
318 current = current
319 .get(part)
320 .ok_or_else(|| anyhow::anyhow!("Path '{}' not found in JSON", part))?;
321 }
322
323 Ok(current)
324 }
325
326 fn resolve_env_var(&self, value: &str) -> Result<String> {
328 let mut result = value.to_string();
329
330 let re = Regex::new(r"\$\{([^}]+)\}").unwrap();
333 for cap in re.captures_iter(value) {
334 let var_name = &cap[1];
335 match std::env::var(var_name) {
336 Ok(var_value) => {
337 result = result.replace(&cap[0], &var_value);
338 }
339 Err(_) => {
340 debug!(
343 "Environment variable {} not found, keeping placeholder",
344 var_name
345 );
346 }
347 }
348 }
349
350 if result.starts_with('$') && !result.starts_with("${") {
352 let var_name = &result[1..];
353 if let Ok(var_value) = std::env::var(var_name) {
354 return Ok(var_value);
355 }
356 }
357
358 Ok(result)
359 }
360}
361
#[cfg(test)]
mod tests {
    use super::*;

    // Checks that `detect_format` maps a (url, content type) pair to the
    // expected `DataFormat` variant.
    macro_rules! assert_detects {
        ($fetcher:expr, $url:expr, $content_type:expr, $expected:pat) => {
            assert!(matches!(
                $fetcher.detect_format($url, $content_type),
                $expected
            ))
        };
    }

    #[test]
    fn test_detect_format() {
        let fetcher = WebDataFetcher::new().unwrap();

        // The URL extension decides when no content type is given.
        assert_detects!(fetcher, "http://example.com/data.csv", "", DataFormat::CSV);
        assert_detects!(fetcher, "http://example.com/data.json", "", DataFormat::JSON);

        // The content type decides when the URL has no extension.
        assert_detects!(
            fetcher,
            "http://example.com/data",
            "application/json",
            DataFormat::JSON
        );
        assert_detects!(fetcher, "http://example.com/data", "text/csv", DataFormat::CSV);

        // With neither hint the format is left for the parser to work out.
        assert_detects!(fetcher, "http://example.com/data", "", DataFormat::Auto);
    }
}