1use anyhow::{Context, Result};
3use regex::Regex;
4use serde_json;
5use std::fs::File;
6use std::io::Cursor;
7use std::path::Path;
8use std::time::Duration;
9use tracing::{debug, info};
10
11use crate::data::datatable::DataTable;
12use crate::data::stream_loader::{load_csv_from_reader, load_json_from_reader};
13use crate::sql::parser::ast::{DataFormat, HttpMethod, WebCTESpec};
14
15#[cfg(feature = "redis-cache")]
16use crate::redis_cache_module::RedisCache;
17
18pub struct WebDataFetcher {
20 client: reqwest::blocking::Client,
21}
22
23impl WebDataFetcher {
24 pub fn new() -> Result<Self> {
25 let client = reqwest::blocking::Client::builder()
26 .timeout(Duration::from_secs(30))
27 .user_agent("sql-cli/1.0")
28 .build()?;
29
30 Ok(Self { client })
31 }
32
33 pub fn fetch(
36 &self,
37 spec: &WebCTESpec,
38 table_name: &str,
39 _query_context: Option<&str>, ) -> Result<DataTable> {
41 if spec.url.starts_with("file://") {
43 return self.fetch_file(spec, table_name);
44 }
45
46 #[cfg(feature = "redis-cache")]
49 let cache_key = {
50 let method = format!("{:?}", spec.method.as_ref().unwrap_or(&HttpMethod::GET));
51
52 RedisCache::generate_key_full(
54 table_name, &spec.url,
56 Some(&method),
57 &spec.headers,
58 spec.body.as_deref(),
59 "", spec.json_path.as_deref(), &spec.form_files, &spec.form_fields, )
64 };
65
66 #[cfg(feature = "redis-cache")]
68 {
69 let mut cache = RedisCache::new();
70 if cache.is_enabled() {
71 if let Some(cached_bytes) = cache.get(&cache_key) {
72 match DataTable::from_parquet_bytes(&cached_bytes) {
74 Ok(table) => {
75 eprintln!(
76 "Cache HIT for {} (key: {}...)",
77 table_name,
78 &cache_key[0..48.min(cache_key.len())]
79 );
80 return Ok(table);
81 }
82 Err(e) => {
83 debug!("Failed to deserialize cached data: {}", e);
84 }
86 }
87 } else {
88 eprintln!(
89 "Cache MISS for {} (key: {}...)",
90 table_name,
91 &cache_key[0..48.min(cache_key.len())]
92 );
93 }
94 }
95 }
96
97 info!("Fetching data from URL: {}", spec.url);
98
99 let mut request = match spec.method.as_ref().unwrap_or(&HttpMethod::GET) {
102 HttpMethod::GET => self.client.get(&spec.url),
103 HttpMethod::POST => self.client.post(&spec.url),
104 HttpMethod::PUT => self.client.put(&spec.url),
105 HttpMethod::DELETE => self.client.delete(&spec.url),
106 HttpMethod::PATCH => self.client.patch(&spec.url),
107 };
108
109 for (key, value) in &spec.headers {
111 let resolved_value = self.resolve_env_var(value)?;
112 request = request.header(key, resolved_value);
113 }
114
115 if !spec.form_files.is_empty() || !spec.form_fields.is_empty() {
117 let mut form = reqwest::blocking::multipart::Form::new();
118
119 for (field_name, file_path) in &spec.form_files {
121 let resolved_path = self.resolve_env_var(file_path)?;
122 let file = std::fs::File::open(&resolved_path)
123 .with_context(|| format!("Failed to open file: {}", resolved_path))?;
124 let file_name = std::path::Path::new(&resolved_path)
125 .file_name()
126 .and_then(|n| n.to_str())
127 .unwrap_or("file")
128 .to_string();
129 let part = reqwest::blocking::multipart::Part::reader(file).file_name(file_name);
130 form = form.part(field_name.clone(), part);
131 }
132
133 for (field_name, value) in &spec.form_fields {
135 let resolved_value = self.resolve_env_var(value)?;
136 form = form.text(field_name.clone(), resolved_value);
137 }
138
139 request = request.multipart(form);
140 }
141 else if let Some(body) = &spec.body {
143 let resolved_body = self.resolve_env_var(body)?;
144
145 eprintln!("\n=== WEB CTE Request Debug ===");
147 eprintln!("URL: {}", spec.url);
148 eprintln!(
149 "Method: {:?}",
150 spec.method.as_ref().unwrap_or(&HttpMethod::POST)
151 );
152 eprintln!("Body (after template expansion):");
153 eprintln!("{}", resolved_body);
154 eprintln!("=============================\n");
155
156 request = request.body(resolved_body);
157 if spec.body.as_ref().unwrap().trim().starts_with('{') {
159 request = request.header("Content-Type", "application/json");
160 }
161 }
162
163 let response = request
165 .send()
166 .with_context(|| format!("Failed to fetch from URL: {}", spec.url))?;
167
168 if !response.status().is_success() {
170 return Err(anyhow::anyhow!(
171 "HTTP request failed with status {}: {}",
172 response.status(),
173 spec.url
174 ));
175 }
176
177 let content_type = response
179 .headers()
180 .get("content-type")
181 .and_then(|v| v.to_str().ok())
182 .unwrap_or("")
183 .to_string();
184
185 debug!("Response content-type: {}", content_type);
186
187 let bytes = response.bytes()?;
189
190 let format = match &spec.format {
192 Some(fmt) => fmt.clone(),
193 None => self.detect_format(&spec.url, &content_type),
194 };
195
196 info!("Using format: {:?} for {}", format, spec.url);
197
198 let result = if let Some(json_path) = &spec.json_path {
200 if matches!(format, DataFormat::JSON | DataFormat::Auto) {
201 let json_value: serde_json::Value = serde_json::from_slice(&bytes)
203 .with_context(|| "Failed to parse JSON for path extraction")?;
204
205 let extracted = self
207 .navigate_json_path(&json_value, json_path)
208 .with_context(|| format!("Failed to extract JSON path: {}", json_path))?;
209
210 let array_value = match extracted {
212 serde_json::Value::Array(_) => extracted.clone(),
213 _ => serde_json::Value::Array(vec![extracted.clone()]),
214 };
215
216 let extracted_bytes = serde_json::to_vec(&array_value)?;
217 self.parse_data(
218 extracted_bytes,
219 DataFormat::JSON,
220 table_name,
221 "web",
222 &spec.url,
223 )?
224 } else {
225 self.parse_data(bytes.to_vec(), format, table_name, "web", &spec.url)?
226 }
227 } else {
228 self.parse_data(bytes.to_vec(), format, table_name, "web", &spec.url)?
229 };
230
231 #[cfg(feature = "redis-cache")]
233 {
234 let mut cache = RedisCache::new();
235 if cache.is_enabled() {
236 let ttl = spec.cache_seconds.unwrap_or_else(|| {
238 if spec.url.contains("prod") {
240 3600 } else if spec.url.contains("staging") {
242 300 } else {
244 600 }
246 });
247
248 match result.to_parquet_bytes() {
250 Ok(parquet_bytes) => {
251 if let Err(e) = cache.set(&cache_key, &parquet_bytes, ttl) {
252 debug!("Failed to cache result: {}", e);
253 } else {
254 eprintln!("Cached {} for {} seconds", table_name, ttl);
255 }
256 }
257 Err(e) => {
258 debug!("Failed to serialize to parquet: {}", e);
259 }
260 }
261 }
262 }
263
264 Ok(result)
265 }
266
267 fn fetch_file(&self, spec: &WebCTESpec, table_name: &str) -> Result<DataTable> {
269 let file_path = if spec.url.starts_with("file://") {
271 &spec.url[7..] } else {
273 &spec.url
274 };
275
276 info!("Reading local file: {}", file_path);
277
278 let path = Path::new(file_path);
280 if !path.exists() {
281 return Err(anyhow::anyhow!("File not found: {}", file_path));
282 }
283
284 let file =
286 File::open(path).with_context(|| format!("Failed to open file: {}", file_path))?;
287
288 let metadata = file.metadata()?;
290 let file_size = metadata.len();
291 debug!("File size: {} bytes", file_size);
292
293 let format = match &spec.format {
295 Some(fmt) => fmt.clone(),
296 None => self.detect_format(file_path, ""),
297 };
298
299 info!("Using format: {:?} for {}", format, file_path);
300
301 match format {
303 DataFormat::CSV => load_csv_from_reader(file, table_name, "file", file_path)
304 .with_context(|| format!("Failed to parse CSV from {}", file_path)),
305 DataFormat::JSON => load_json_from_reader(file, table_name, "file", file_path)
306 .with_context(|| format!("Failed to parse JSON from {}", file_path)),
307 DataFormat::Auto => {
308 if file_path.ends_with(".json") {
310 let file = File::open(path)?;
311 load_json_from_reader(file, table_name, "file", file_path)
312 .with_context(|| format!("Failed to parse JSON from {}", file_path))
313 } else {
314 let file = File::open(path)?;
316 load_csv_from_reader(file, table_name, "file", file_path)
317 .with_context(|| format!("Failed to parse CSV from {}", file_path))
318 }
319 }
320 }
321 }
322
323 fn parse_data(
325 &self,
326 bytes: Vec<u8>,
327 format: DataFormat,
328 table_name: &str,
329 source_type: &str,
330 source_path: &str,
331 ) -> Result<DataTable> {
332 match format {
333 DataFormat::CSV => {
334 let reader = Cursor::new(bytes);
335 load_csv_from_reader(reader, table_name, source_type, source_path)
336 .with_context(|| format!("Failed to parse CSV from {}", source_path))
337 }
338 DataFormat::JSON => {
339 let reader = Cursor::new(bytes);
340 load_json_from_reader(reader, table_name, source_type, source_path)
341 .with_context(|| format!("Failed to parse JSON from {}", source_path))
342 }
343 DataFormat::Auto => {
344 let reader_csv = Cursor::new(bytes.clone());
346 match load_csv_from_reader(reader_csv, table_name, source_type, source_path) {
347 Ok(table) => Ok(table),
348 Err(_) => {
349 debug!("CSV parsing failed, trying JSON");
350 let reader_json = Cursor::new(bytes);
351 load_json_from_reader(reader_json, table_name, source_type, source_path)
352 .with_context(|| format!("Failed to parse data from {}", source_path))
353 }
354 }
355 }
356 }
357 }
358
359 fn detect_format(&self, url: &str, content_type: &str) -> DataFormat {
361 if content_type.contains("json") {
363 return DataFormat::JSON;
364 }
365 if content_type.contains("csv") || content_type.contains("text/plain") {
366 return DataFormat::CSV;
367 }
368
369 if url.ends_with(".json") {
371 DataFormat::JSON
372 } else if url.ends_with(".csv") {
373 DataFormat::CSV
374 } else {
375 DataFormat::Auto
377 }
378 }
379
380 fn extract_json_path(
382 &self,
383 _table: DataTable,
384 json_path: &str,
385 bytes: &[u8],
386 ) -> Result<DataTable> {
387 let json_value: serde_json::Value = serde_json::from_slice(bytes)
389 .with_context(|| "Failed to parse JSON for path extraction")?;
390
391 let extracted = self
393 .navigate_json_path(&json_value, json_path)
394 .with_context(|| format!("Failed to extract JSON path: {}", json_path))?;
395
396 let array_value = match extracted {
399 serde_json::Value::Array(_) => extracted.clone(),
400 _ => serde_json::Value::Array(vec![extracted.clone()]),
401 };
402
403 let extracted_bytes = serde_json::to_vec(&array_value)?;
405
406 let reader = Cursor::new(extracted_bytes);
408 load_json_from_reader(reader, "extracted", "web", json_path)
409 }
410
411 fn navigate_json_path<'a>(
413 &self,
414 value: &'a serde_json::Value,
415 path: &str,
416 ) -> Result<&'a serde_json::Value> {
417 let mut current = value;
418
419 for part in path.split('.') {
422 if part.is_empty() {
423 continue;
424 }
425
426 current = current
427 .get(part)
428 .ok_or_else(|| anyhow::anyhow!("Path '{}' not found in JSON", part))?;
429 }
430
431 Ok(current)
432 }
433
434 fn resolve_env_var(&self, value: &str) -> Result<String> {
436 let mut result = value.to_string();
437
438 let re = Regex::new(r"\$\{([^}]+)\}").unwrap();
441 for cap in re.captures_iter(value) {
442 let var_name = &cap[1];
443 match std::env::var(var_name) {
444 Ok(var_value) => {
445 result = result.replace(&cap[0], &var_value);
446 }
447 Err(_) => {
448 debug!(
451 "Environment variable {} not found, keeping placeholder",
452 var_name
453 );
454 }
455 }
456 }
457
458 if result.starts_with('$') && !result.starts_with("${") {
460 let var_name = &result[1..];
461 if let Ok(var_value) = std::env::var(var_name) {
462 return Ok(var_value);
463 }
464 }
465
466 Ok(result)
467 }
468}
469
#[cfg(test)]
mod tests {
    use super::*;

    /// `detect_format` picks the format from the URL extension, then from the
    /// content type, and falls back to `Auto` when neither gives a hint.
    #[test]
    fn test_detect_format() {
        let fetcher = WebDataFetcher::new().unwrap();

        // (url, content type, expected format)
        let cases = [
            ("http://example.com/data.csv", "", DataFormat::CSV),
            ("http://example.com/data.json", "", DataFormat::JSON),
            ("http://example.com/data", "application/json", DataFormat::JSON),
            ("http://example.com/data", "text/csv", DataFormat::CSV),
            ("http://example.com/data", "", DataFormat::Auto),
        ];

        for (url, content_type, expected) in cases.iter() {
            let detected = fetcher.detect_format(url, content_type);
            // Compare variants only; this does not require `PartialEq` on `DataFormat`.
            assert!(
                std::mem::discriminant(&detected) == std::mem::discriminant(expected),
                "detect_format({:?}, {:?}) returned {:?}, expected {:?}",
                url,
                content_type,
                detected,
                expected
            );
        }
    }
}