1use anyhow::{Context, Result};
3use regex::Regex;
4use serde_json;
5use std::fs::File;
6use std::io::Cursor;
7use std::path::Path;
8use std::time::Duration;
9use tracing::{debug, info};
10
11use crate::data::datatable::DataTable;
12use crate::data::stream_loader::{load_csv_from_reader, load_json_from_reader};
13use crate::sql::parser::ast::{DataFormat, HttpMethod, WebCTESpec};
14
15#[cfg(feature = "redis-cache")]
16use crate::redis_cache_module::RedisCache;
17
18pub struct WebDataFetcher {
20 client: reqwest::blocking::Client,
21}
22
23impl WebDataFetcher {
24 pub fn new() -> Result<Self> {
25 let client = reqwest::blocking::Client::builder()
26 .timeout(Duration::from_secs(30))
27 .user_agent("sql-cli/1.0")
28 .build()?;
29
30 Ok(Self { client })
31 }
32
33 pub fn fetch(
36 &self,
37 spec: &WebCTESpec,
38 table_name: &str,
39 _query_context: Option<&str>, ) -> Result<DataTable> {
41 if spec.url.starts_with("file://") {
43 return self.fetch_file(spec, table_name);
44 }
45
46 #[cfg(feature = "redis-cache")]
49 let cache_key = {
50 let method = format!("{:?}", spec.method.as_ref().unwrap_or(&HttpMethod::GET));
51
52 RedisCache::generate_key_full(
54 table_name, &spec.url,
56 Some(&method),
57 &spec.headers,
58 spec.body.as_deref(),
59 "", spec.json_path.as_deref(), &spec.form_files, &spec.form_fields, )
64 };
65
66 #[cfg(feature = "redis-cache")]
68 {
69 let mut cache = RedisCache::new();
70 if cache.is_enabled() {
71 if let Some(cached_bytes) = cache.get(&cache_key) {
72 match DataTable::from_parquet_bytes(&cached_bytes) {
74 Ok(table) => {
75 eprintln!(
76 "Cache HIT for {} (key: {}...)",
77 table_name,
78 &cache_key[0..48.min(cache_key.len())]
79 );
80 return Ok(table);
81 }
82 Err(e) => {
83 debug!("Failed to deserialize cached data: {}", e);
84 }
86 }
87 } else {
88 eprintln!(
89 "Cache MISS for {} (key: {}...)",
90 table_name,
91 &cache_key[0..48.min(cache_key.len())]
92 );
93 }
94 }
95 }
96
97 info!("Fetching data from URL: {}", spec.url);
98
99 let mut request = match spec.method.as_ref().unwrap_or(&HttpMethod::GET) {
102 HttpMethod::GET => self.client.get(&spec.url),
103 HttpMethod::POST => self.client.post(&spec.url),
104 HttpMethod::PUT => self.client.put(&spec.url),
105 HttpMethod::DELETE => self.client.delete(&spec.url),
106 HttpMethod::PATCH => self.client.patch(&spec.url),
107 };
108
109 for (key, value) in &spec.headers {
111 let resolved_value = self.resolve_env_var(value)?;
112 request = request.header(key, resolved_value);
113 }
114
115 if !spec.form_files.is_empty() || !spec.form_fields.is_empty() {
117 let mut form = reqwest::blocking::multipart::Form::new();
118
119 for (field_name, file_path) in &spec.form_files {
121 let resolved_path = self.resolve_env_var(file_path)?;
122 let file = std::fs::File::open(&resolved_path)
123 .with_context(|| format!("Failed to open file: {}", resolved_path))?;
124 let file_name = std::path::Path::new(&resolved_path)
125 .file_name()
126 .and_then(|n| n.to_str())
127 .unwrap_or("file")
128 .to_string();
129 let part = reqwest::blocking::multipart::Part::reader(file).file_name(file_name);
130 form = form.part(field_name.clone(), part);
131 }
132
133 for (field_name, value) in &spec.form_fields {
135 let resolved_value = self.resolve_env_var(value)?;
136 form = form.text(field_name.clone(), resolved_value);
137 }
138
139 request = request.multipart(form);
140 }
141 else if let Some(body) = &spec.body {
143 let resolved_body = self.resolve_env_var(body)?;
144 request = request.body(resolved_body);
145 if spec.body.as_ref().unwrap().trim().starts_with('{') {
147 request = request.header("Content-Type", "application/json");
148 }
149 }
150
151 let response = request
153 .send()
154 .with_context(|| format!("Failed to fetch from URL: {}", spec.url))?;
155
156 if !response.status().is_success() {
158 return Err(anyhow::anyhow!(
159 "HTTP request failed with status {}: {}",
160 response.status(),
161 spec.url
162 ));
163 }
164
165 let content_type = response
167 .headers()
168 .get("content-type")
169 .and_then(|v| v.to_str().ok())
170 .unwrap_or("")
171 .to_string();
172
173 debug!("Response content-type: {}", content_type);
174
175 let bytes = response.bytes()?;
177
178 let format = match &spec.format {
180 Some(fmt) => fmt.clone(),
181 None => self.detect_format(&spec.url, &content_type),
182 };
183
184 info!("Using format: {:?} for {}", format, spec.url);
185
186 let result = if let Some(json_path) = &spec.json_path {
188 if matches!(format, DataFormat::JSON | DataFormat::Auto) {
189 let json_value: serde_json::Value = serde_json::from_slice(&bytes)
191 .with_context(|| "Failed to parse JSON for path extraction")?;
192
193 let extracted = self
195 .navigate_json_path(&json_value, json_path)
196 .with_context(|| format!("Failed to extract JSON path: {}", json_path))?;
197
198 let array_value = match extracted {
200 serde_json::Value::Array(_) => extracted.clone(),
201 _ => serde_json::Value::Array(vec![extracted.clone()]),
202 };
203
204 let extracted_bytes = serde_json::to_vec(&array_value)?;
205 self.parse_data(
206 extracted_bytes,
207 DataFormat::JSON,
208 table_name,
209 "web",
210 &spec.url,
211 )?
212 } else {
213 self.parse_data(bytes.to_vec(), format, table_name, "web", &spec.url)?
214 }
215 } else {
216 self.parse_data(bytes.to_vec(), format, table_name, "web", &spec.url)?
217 };
218
219 #[cfg(feature = "redis-cache")]
221 {
222 let mut cache = RedisCache::new();
223 if cache.is_enabled() {
224 let ttl = spec.cache_seconds.unwrap_or_else(|| {
226 if spec.url.contains("prod") {
228 3600 } else if spec.url.contains("staging") {
230 300 } else {
232 600 }
234 });
235
236 match result.to_parquet_bytes() {
238 Ok(parquet_bytes) => {
239 if let Err(e) = cache.set(&cache_key, &parquet_bytes, ttl) {
240 debug!("Failed to cache result: {}", e);
241 } else {
242 eprintln!("Cached {} for {} seconds", table_name, ttl);
243 }
244 }
245 Err(e) => {
246 debug!("Failed to serialize to parquet: {}", e);
247 }
248 }
249 }
250 }
251
252 Ok(result)
253 }
254
255 fn fetch_file(&self, spec: &WebCTESpec, table_name: &str) -> Result<DataTable> {
257 let file_path = if spec.url.starts_with("file://") {
259 &spec.url[7..] } else {
261 &spec.url
262 };
263
264 info!("Reading local file: {}", file_path);
265
266 let path = Path::new(file_path);
268 if !path.exists() {
269 return Err(anyhow::anyhow!("File not found: {}", file_path));
270 }
271
272 let file =
274 File::open(path).with_context(|| format!("Failed to open file: {}", file_path))?;
275
276 let metadata = file.metadata()?;
278 let file_size = metadata.len();
279 debug!("File size: {} bytes", file_size);
280
281 let format = match &spec.format {
283 Some(fmt) => fmt.clone(),
284 None => self.detect_format(file_path, ""),
285 };
286
287 info!("Using format: {:?} for {}", format, file_path);
288
289 match format {
291 DataFormat::CSV => load_csv_from_reader(file, table_name, "file", file_path)
292 .with_context(|| format!("Failed to parse CSV from {}", file_path)),
293 DataFormat::JSON => load_json_from_reader(file, table_name, "file", file_path)
294 .with_context(|| format!("Failed to parse JSON from {}", file_path)),
295 DataFormat::Auto => {
296 if file_path.ends_with(".json") {
298 let file = File::open(path)?;
299 load_json_from_reader(file, table_name, "file", file_path)
300 .with_context(|| format!("Failed to parse JSON from {}", file_path))
301 } else {
302 let file = File::open(path)?;
304 load_csv_from_reader(file, table_name, "file", file_path)
305 .with_context(|| format!("Failed to parse CSV from {}", file_path))
306 }
307 }
308 }
309 }
310
311 fn parse_data(
313 &self,
314 bytes: Vec<u8>,
315 format: DataFormat,
316 table_name: &str,
317 source_type: &str,
318 source_path: &str,
319 ) -> Result<DataTable> {
320 match format {
321 DataFormat::CSV => {
322 let reader = Cursor::new(bytes);
323 load_csv_from_reader(reader, table_name, source_type, source_path)
324 .with_context(|| format!("Failed to parse CSV from {}", source_path))
325 }
326 DataFormat::JSON => {
327 let reader = Cursor::new(bytes);
328 load_json_from_reader(reader, table_name, source_type, source_path)
329 .with_context(|| format!("Failed to parse JSON from {}", source_path))
330 }
331 DataFormat::Auto => {
332 let reader_csv = Cursor::new(bytes.clone());
334 match load_csv_from_reader(reader_csv, table_name, source_type, source_path) {
335 Ok(table) => Ok(table),
336 Err(_) => {
337 debug!("CSV parsing failed, trying JSON");
338 let reader_json = Cursor::new(bytes);
339 load_json_from_reader(reader_json, table_name, source_type, source_path)
340 .with_context(|| format!("Failed to parse data from {}", source_path))
341 }
342 }
343 }
344 }
345 }
346
347 fn detect_format(&self, url: &str, content_type: &str) -> DataFormat {
349 if content_type.contains("json") {
351 return DataFormat::JSON;
352 }
353 if content_type.contains("csv") || content_type.contains("text/plain") {
354 return DataFormat::CSV;
355 }
356
357 if url.ends_with(".json") {
359 DataFormat::JSON
360 } else if url.ends_with(".csv") {
361 DataFormat::CSV
362 } else {
363 DataFormat::Auto
365 }
366 }
367
368 fn extract_json_path(
370 &self,
371 _table: DataTable,
372 json_path: &str,
373 bytes: &[u8],
374 ) -> Result<DataTable> {
375 let json_value: serde_json::Value = serde_json::from_slice(bytes)
377 .with_context(|| "Failed to parse JSON for path extraction")?;
378
379 let extracted = self
381 .navigate_json_path(&json_value, json_path)
382 .with_context(|| format!("Failed to extract JSON path: {}", json_path))?;
383
384 let array_value = match extracted {
387 serde_json::Value::Array(_) => extracted.clone(),
388 _ => serde_json::Value::Array(vec![extracted.clone()]),
389 };
390
391 let extracted_bytes = serde_json::to_vec(&array_value)?;
393
394 let reader = Cursor::new(extracted_bytes);
396 load_json_from_reader(reader, "extracted", "web", json_path)
397 }
398
399 fn navigate_json_path<'a>(
401 &self,
402 value: &'a serde_json::Value,
403 path: &str,
404 ) -> Result<&'a serde_json::Value> {
405 let mut current = value;
406
407 for part in path.split('.') {
410 if part.is_empty() {
411 continue;
412 }
413
414 current = current
415 .get(part)
416 .ok_or_else(|| anyhow::anyhow!("Path '{}' not found in JSON", part))?;
417 }
418
419 Ok(current)
420 }
421
422 fn resolve_env_var(&self, value: &str) -> Result<String> {
424 let mut result = value.to_string();
425
426 let re = Regex::new(r"\$\{([^}]+)\}").unwrap();
429 for cap in re.captures_iter(value) {
430 let var_name = &cap[1];
431 match std::env::var(var_name) {
432 Ok(var_value) => {
433 result = result.replace(&cap[0], &var_value);
434 }
435 Err(_) => {
436 debug!(
439 "Environment variable {} not found, keeping placeholder",
440 var_name
441 );
442 }
443 }
444 }
445
446 if result.starts_with('$') && !result.starts_with("${") {
448 let var_name = &result[1..];
449 if let Ok(var_value) = std::env::var(var_name) {
450 return Ok(var_value);
451 }
452 }
453
454 Ok(result)
455 }
456}
457
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_detect_format() {
        let fetcher = WebDataFetcher::new().unwrap();

        // With no content type, the URL extension decides.
        assert!(matches!(
            fetcher.detect_format("http://example.com/data.csv", ""),
            DataFormat::CSV
        ));
        assert!(matches!(
            fetcher.detect_format("http://example.com/data.json", ""),
            DataFormat::JSON
        ));

        // Without an extension, the content type decides.
        assert!(matches!(
            fetcher.detect_format("http://example.com/data", "application/json"),
            DataFormat::JSON
        ));
        assert!(matches!(
            fetcher.detect_format("http://example.com/data", "text/csv"),
            DataFormat::CSV
        ));

        // Neither source is conclusive: leave it to the parser.
        assert!(matches!(
            fetcher.detect_format("http://example.com/data", ""),
            DataFormat::Auto
        ));
    }

    #[test]
    fn test_detect_format_content_type_takes_precedence() {
        let fetcher = WebDataFetcher::new().unwrap();

        assert!(matches!(
            fetcher.detect_format(
                "http://example.com/data.csv",
                "application/json; charset=utf-8"
            ),
            DataFormat::JSON
        ));
        assert!(matches!(
            fetcher.detect_format("http://example.com/data.json", "text/plain"),
            DataFormat::CSV
        ));
    }

    #[test]
    fn test_navigate_json_path() {
        let fetcher = WebDataFetcher::new().unwrap();
        let doc = serde_json::json!({"data": {"items": [{"id": 1}, {"id": 2}]}});

        let items = fetcher.navigate_json_path(&doc, "data.items").unwrap();
        assert_eq!(items.as_array().map(|a| a.len()), Some(2));

        // Empty segments (leading, trailing or doubled dots) are skipped.
        let same = fetcher.navigate_json_path(&doc, ".data..items.").unwrap();
        assert_eq!(same, items);

        // An empty path yields the whole document.
        assert_eq!(fetcher.navigate_json_path(&doc, "").unwrap(), &doc);

        assert!(fetcher.navigate_json_path(&doc, "data.missing").is_err());
    }

    #[test]
    fn test_resolve_env_var_passthrough() {
        let fetcher = WebDataFetcher::new().unwrap();

        assert_eq!(fetcher.resolve_env_var("plain value").unwrap(), "plain value");

        // Unset variables keep their placeholder verbatim.
        let placeholder = "${SQL_CLI_TEST_SURELY_UNSET_VARIABLE}";
        assert_eq!(fetcher.resolve_env_var(placeholder).unwrap(), placeholder);
    }
}