1use anyhow::{Context, Result};
3use regex::Regex;
4use serde_json;
5use std::fs::File;
6use std::io::Cursor;
7use std::path::Path;
8use std::time::Duration;
9use tracing::{debug, info};
10
11use crate::data::datatable::DataTable;
12use crate::data::stream_loader::{
13 load_csv_from_reader_with_opts, load_json_from_reader, navigate_json_path, CsvReadOptions,
14};
15use crate::sql::parser::ast::{DataFormat, HttpMethod, WebCTESpec};
16
17#[cfg(feature = "redis-cache")]
18use crate::redis_cache_module::RedisCache;
19
20pub struct WebDataFetcher {
22 client: reqwest::blocking::Client,
23}
24
25impl WebDataFetcher {
26 pub fn new() -> Result<Self> {
27 let client = reqwest::blocking::Client::builder()
28 .timeout(Duration::from_secs(30))
29 .user_agent("sql-cli/1.0")
30 .build()?;
31
32 Ok(Self { client })
33 }
34
35 pub fn fetch(
38 &self,
39 spec: &WebCTESpec,
40 table_name: &str,
41 _query_context: Option<&str>, ) -> Result<DataTable> {
43 if spec.url.starts_with("file://") {
45 return self.fetch_file(spec, table_name);
46 }
47
48 #[cfg(feature = "redis-cache")]
51 let cache_key = {
52 let method = format!("{:?}", spec.method.as_ref().unwrap_or(&HttpMethod::GET));
53
54 RedisCache::generate_key_full(
56 table_name, &spec.url,
58 Some(&method),
59 &spec.headers,
60 spec.body.as_deref(),
61 "", spec.json_path.as_deref(), &spec.form_files, &spec.form_fields, )
66 };
67
68 #[cfg(feature = "redis-cache")]
70 {
71 let mut cache = RedisCache::new();
72 if cache.is_enabled() {
73 if let Some(cached_bytes) = cache.get(&cache_key) {
74 match DataTable::from_parquet_bytes(&cached_bytes) {
76 Ok(table) => {
77 eprintln!(
78 "Cache HIT for {} (key: {}...)",
79 table_name,
80 &cache_key[0..48.min(cache_key.len())]
81 );
82 return Ok(table);
83 }
84 Err(e) => {
85 debug!("Failed to deserialize cached data: {}", e);
86 }
88 }
89 } else {
90 eprintln!(
91 "Cache MISS for {} (key: {}...)",
92 table_name,
93 &cache_key[0..48.min(cache_key.len())]
94 );
95 }
96 }
97 }
98
99 info!("Fetching data from URL: {}", spec.url);
100
101 let mut request = match spec.method.as_ref().unwrap_or(&HttpMethod::GET) {
104 HttpMethod::GET => self.client.get(&spec.url),
105 HttpMethod::POST => self.client.post(&spec.url),
106 HttpMethod::PUT => self.client.put(&spec.url),
107 HttpMethod::DELETE => self.client.delete(&spec.url),
108 HttpMethod::PATCH => self.client.patch(&spec.url),
109 };
110
111 for (key, value) in &spec.headers {
113 let resolved_value = self.resolve_env_var(value)?;
114 request = request.header(key, resolved_value);
115 }
116
117 if !spec.form_files.is_empty() || !spec.form_fields.is_empty() {
119 let mut form = reqwest::blocking::multipart::Form::new();
120
121 for (field_name, file_path) in &spec.form_files {
123 let resolved_path = self.resolve_env_var(file_path)?;
124 let file = std::fs::File::open(&resolved_path)
125 .with_context(|| format!("Failed to open file: {}", resolved_path))?;
126 let file_name = std::path::Path::new(&resolved_path)
127 .file_name()
128 .and_then(|n| n.to_str())
129 .unwrap_or("file")
130 .to_string();
131 let part = reqwest::blocking::multipart::Part::reader(file).file_name(file_name);
132 form = form.part(field_name.clone(), part);
133 }
134
135 for (field_name, value) in &spec.form_fields {
137 let resolved_value = self.resolve_env_var(value)?;
138 form = form.text(field_name.clone(), resolved_value);
139 }
140
141 request = request.multipart(form);
142 }
143 else if let Some(body) = &spec.body {
145 let resolved_body = self.resolve_env_var(body)?;
146
147 eprintln!("\n=== WEB CTE Request Debug ===");
149 eprintln!("URL: {}", spec.url);
150 eprintln!(
151 "Method: {:?}",
152 spec.method.as_ref().unwrap_or(&HttpMethod::POST)
153 );
154 eprintln!("Body (after template expansion):");
155 eprintln!("{}", resolved_body);
156 eprintln!("=============================\n");
157
158 request = request.body(resolved_body);
159 let has_content_type = spec
163 .headers
164 .iter()
165 .any(|(k, _)| k.eq_ignore_ascii_case("content-type"));
166 if !has_content_type && spec.body.as_ref().unwrap().trim().starts_with('{') {
167 request = request.header("Content-Type", "application/json");
168 }
169 }
170
171 let response = request
173 .send()
174 .with_context(|| format!("Failed to fetch from URL: {}", spec.url))?;
175
176 if !response.status().is_success() {
178 let status = response.status();
179 let body = response.text().unwrap_or_default();
183 let snippet: String = body.chars().take(2000).collect();
184 return Err(anyhow::anyhow!(
185 "HTTP request failed with status {}: {}\nResponse body: {}",
186 status,
187 spec.url,
188 snippet
189 ));
190 }
191
192 let content_type = response
194 .headers()
195 .get("content-type")
196 .and_then(|v| v.to_str().ok())
197 .unwrap_or("")
198 .to_string();
199
200 debug!("Response content-type: {}", content_type);
201
202 let bytes = response.bytes()?;
204
205 let format = match &spec.format {
207 Some(fmt) => fmt.clone(),
208 None => self.detect_format(&spec.url, &content_type),
209 };
210
211 info!("Using format: {:?} for {}", format, spec.url);
212
213 let result = if let Some(json_path) = &spec.json_path {
215 if matches!(format, DataFormat::JSON | DataFormat::Auto) {
216 let json_value: serde_json::Value = serde_json::from_slice(&bytes)
218 .with_context(|| "Failed to parse JSON for path extraction")?;
219
220 let extracted = navigate_json_path(&json_value, json_path)
222 .with_context(|| format!("Failed to extract JSON path: {}", json_path))?;
223
224 let array_value = match extracted {
226 serde_json::Value::Array(_) => extracted,
227 other => serde_json::Value::Array(vec![other]),
228 };
229
230 let extracted_bytes = serde_json::to_vec(&array_value)?;
231 self.parse_data(
234 extracted_bytes,
235 DataFormat::JSON,
236 table_name,
237 "web",
238 &spec.url,
239 spec.delimiter,
240 )?
241 } else {
242 self.parse_data(
243 bytes.to_vec(),
244 format,
245 table_name,
246 "web",
247 &spec.url,
248 spec.delimiter,
249 )?
250 }
251 } else {
252 self.parse_data(
253 bytes.to_vec(),
254 format,
255 table_name,
256 "web",
257 &spec.url,
258 spec.delimiter,
259 )?
260 };
261
262 #[cfg(feature = "redis-cache")]
264 {
265 let mut cache = RedisCache::new();
266 if cache.is_enabled() {
267 let ttl = spec.cache_seconds.unwrap_or_else(|| {
269 if spec.url.contains("prod") {
271 3600 } else if spec.url.contains("staging") {
273 300 } else {
275 600 }
277 });
278
279 match result.to_parquet_bytes() {
281 Ok(parquet_bytes) => {
282 if let Err(e) = cache.set(&cache_key, &parquet_bytes, ttl) {
283 debug!("Failed to cache result: {}", e);
284 } else {
285 eprintln!("Cached {} for {} seconds", table_name, ttl);
286 }
287 }
288 Err(e) => {
289 debug!("Failed to serialize to parquet: {}", e);
290 }
291 }
292 }
293 }
294
295 Ok(result)
296 }
297
298 fn fetch_file(&self, spec: &WebCTESpec, table_name: &str) -> Result<DataTable> {
300 let file_path = if spec.url.starts_with("file://") {
302 &spec.url[7..] } else {
304 &spec.url
305 };
306
307 info!("Reading local file: {}", file_path);
308
309 let path = Path::new(file_path);
311 if !path.exists() {
312 return Err(anyhow::anyhow!("File not found: {}", file_path));
313 }
314
315 let file =
317 File::open(path).with_context(|| format!("Failed to open file: {}", file_path))?;
318
319 let metadata = file.metadata()?;
321 let file_size = metadata.len();
322 debug!("File size: {} bytes", file_size);
323
324 let format = match &spec.format {
326 Some(fmt) => fmt.clone(),
327 None => self.detect_format(file_path, ""),
328 };
329
330 info!("Using format: {:?} for {}", format, file_path);
331
332 let csv_opts = CsvReadOptions {
336 delimiter: spec.delimiter.unwrap_or(b','),
337 has_headers: true,
338 };
339
340 match format {
342 DataFormat::CSV => {
343 load_csv_from_reader_with_opts(file, table_name, "file", file_path, &csv_opts)
344 .with_context(|| format!("Failed to parse CSV from {}", file_path))
345 }
346 DataFormat::JSON => load_json_from_reader(file, table_name, "file", file_path)
347 .with_context(|| format!("Failed to parse JSON from {}", file_path)),
348 DataFormat::Auto => {
349 if file_path.ends_with(".json") {
351 let file = File::open(path)?;
352 load_json_from_reader(file, table_name, "file", file_path)
353 .with_context(|| format!("Failed to parse JSON from {}", file_path))
354 } else {
355 let file = File::open(path)?;
357 load_csv_from_reader_with_opts(file, table_name, "file", file_path, &csv_opts)
358 .with_context(|| format!("Failed to parse CSV from {}", file_path))
359 }
360 }
361 }
362 }
363
364 fn parse_data(
367 &self,
368 bytes: Vec<u8>,
369 format: DataFormat,
370 table_name: &str,
371 source_type: &str,
372 source_path: &str,
373 delimiter: Option<u8>,
374 ) -> Result<DataTable> {
375 let csv_opts = CsvReadOptions {
376 delimiter: delimiter.unwrap_or(b','),
377 has_headers: true,
378 };
379 match format {
380 DataFormat::CSV => {
381 let reader = Cursor::new(bytes);
382 load_csv_from_reader_with_opts(
383 reader,
384 table_name,
385 source_type,
386 source_path,
387 &csv_opts,
388 )
389 .with_context(|| format!("Failed to parse CSV from {}", source_path))
390 }
391 DataFormat::JSON => {
392 let reader = Cursor::new(bytes);
393 load_json_from_reader(reader, table_name, source_type, source_path)
394 .with_context(|| format!("Failed to parse JSON from {}", source_path))
395 }
396 DataFormat::Auto => {
397 let reader_csv = Cursor::new(bytes.clone());
399 match load_csv_from_reader_with_opts(
400 reader_csv,
401 table_name,
402 source_type,
403 source_path,
404 &csv_opts,
405 ) {
406 Ok(table) => Ok(table),
407 Err(_) => {
408 debug!("CSV parsing failed, trying JSON");
409 let reader_json = Cursor::new(bytes);
410 load_json_from_reader(reader_json, table_name, source_type, source_path)
411 .with_context(|| format!("Failed to parse data from {}", source_path))
412 }
413 }
414 }
415 }
416 }
417
418 fn detect_format(&self, url: &str, content_type: &str) -> DataFormat {
420 if content_type.contains("json") {
422 return DataFormat::JSON;
423 }
424 if content_type.contains("csv") || content_type.contains("text/plain") {
425 return DataFormat::CSV;
426 }
427
428 if url.ends_with(".json") {
430 DataFormat::JSON
431 } else if url.ends_with(".csv") {
432 DataFormat::CSV
433 } else {
434 DataFormat::Auto
436 }
437 }
438
439 fn extract_json_path(
441 &self,
442 _table: DataTable,
443 json_path: &str,
444 bytes: &[u8],
445 ) -> Result<DataTable> {
446 let json_value: serde_json::Value = serde_json::from_slice(bytes)
448 .with_context(|| "Failed to parse JSON for path extraction")?;
449
450 let extracted = navigate_json_path(&json_value, json_path)
452 .with_context(|| format!("Failed to extract JSON path: {}", json_path))?;
453
454 let array_value = match extracted {
457 serde_json::Value::Array(_) => extracted.clone(),
458 _ => serde_json::Value::Array(vec![extracted.clone()]),
459 };
460
461 let extracted_bytes = serde_json::to_vec(&array_value)?;
463
464 let reader = Cursor::new(extracted_bytes);
466 load_json_from_reader(reader, "extracted", "web", json_path)
467 }
468
469 fn resolve_env_var(&self, value: &str) -> Result<String> {
471 let mut result = value.to_string();
472
473 let re = Regex::new(r"\$\{([^}]+)\}").unwrap();
476 for cap in re.captures_iter(value) {
477 let var_name = &cap[1];
478 match std::env::var(var_name) {
479 Ok(var_value) => {
480 result = result.replace(&cap[0], &var_value);
481 }
482 Err(_) => {
483 debug!(
486 "Environment variable {} not found, keeping placeholder",
487 var_name
488 );
489 }
490 }
491 }
492
493 if result.starts_with('$') && !result.starts_with("${") {
495 let var_name = &result[1..];
496 if let Ok(var_value) = std::env::var(var_name) {
497 return Ok(var_value);
498 }
499 }
500
501 Ok(result)
502 }
503}
504
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks format detection from URL extensions, from content types, and the `Auto`
    /// fallback when neither gives a hint.
    #[test]
    fn test_detect_format() {
        let fetcher = WebDataFetcher::new().unwrap();

        let is_csv =
            |url: &str, ct: &str| matches!(fetcher.detect_format(url, ct), DataFormat::CSV);
        let is_json =
            |url: &str, ct: &str| matches!(fetcher.detect_format(url, ct), DataFormat::JSON);
        let is_auto =
            |url: &str, ct: &str| matches!(fetcher.detect_format(url, ct), DataFormat::Auto);

        // Extension-based detection.
        assert!(is_csv("http://example.com/data.csv", ""));
        assert!(is_json("http://example.com/data.json", ""));

        // Content-type-based detection.
        assert!(is_json("http://example.com/data", "application/json"));
        assert!(is_csv("http://example.com/data", "text/csv"));

        // No hint at all.
        assert!(is_auto("http://example.com/data", ""));
    }
}