1use anyhow::{Context, Result};
3use regex::Regex;
4use serde_json;
5use std::fs::File;
6use std::io::Cursor;
7use std::path::Path;
8use std::time::Duration;
9use tracing::{debug, info};
10
11use crate::data::datatable::DataTable;
12use crate::data::stream_loader::{
13 load_csv_from_reader_with_opts, load_json_from_reader, CsvReadOptions,
14};
15use crate::sql::parser::ast::{DataFormat, HttpMethod, WebCTESpec};
16
17#[cfg(feature = "redis-cache")]
18use crate::redis_cache_module::RedisCache;
19
/// Loads tabular data for WEB CTEs, either over HTTP or from local `file://` URLs.
pub struct WebDataFetcher {
    /// Blocking HTTP client reused for every request issued by this fetcher.
    client: reqwest::blocking::Client,
}
24
25impl WebDataFetcher {
26 pub fn new() -> Result<Self> {
27 let client = reqwest::blocking::Client::builder()
28 .timeout(Duration::from_secs(30))
29 .user_agent("sql-cli/1.0")
30 .build()?;
31
32 Ok(Self { client })
33 }
34
35 pub fn fetch(
38 &self,
39 spec: &WebCTESpec,
40 table_name: &str,
41 _query_context: Option<&str>, ) -> Result<DataTable> {
43 if spec.url.starts_with("file://") {
45 return self.fetch_file(spec, table_name);
46 }
47
48 #[cfg(feature = "redis-cache")]
51 let cache_key = {
52 let method = format!("{:?}", spec.method.as_ref().unwrap_or(&HttpMethod::GET));
53
54 RedisCache::generate_key_full(
56 table_name, &spec.url,
58 Some(&method),
59 &spec.headers,
60 spec.body.as_deref(),
61 "", spec.json_path.as_deref(), &spec.form_files, &spec.form_fields, )
66 };
67
68 #[cfg(feature = "redis-cache")]
70 {
71 let mut cache = RedisCache::new();
72 if cache.is_enabled() {
73 if let Some(cached_bytes) = cache.get(&cache_key) {
74 match DataTable::from_parquet_bytes(&cached_bytes) {
76 Ok(table) => {
77 eprintln!(
78 "Cache HIT for {} (key: {}...)",
79 table_name,
80 &cache_key[0..48.min(cache_key.len())]
81 );
82 return Ok(table);
83 }
84 Err(e) => {
85 debug!("Failed to deserialize cached data: {}", e);
86 }
88 }
89 } else {
90 eprintln!(
91 "Cache MISS for {} (key: {}...)",
92 table_name,
93 &cache_key[0..48.min(cache_key.len())]
94 );
95 }
96 }
97 }
98
99 info!("Fetching data from URL: {}", spec.url);
100
101 let mut request = match spec.method.as_ref().unwrap_or(&HttpMethod::GET) {
104 HttpMethod::GET => self.client.get(&spec.url),
105 HttpMethod::POST => self.client.post(&spec.url),
106 HttpMethod::PUT => self.client.put(&spec.url),
107 HttpMethod::DELETE => self.client.delete(&spec.url),
108 HttpMethod::PATCH => self.client.patch(&spec.url),
109 };
110
111 for (key, value) in &spec.headers {
113 let resolved_value = self.resolve_env_var(value)?;
114 request = request.header(key, resolved_value);
115 }
116
117 if !spec.form_files.is_empty() || !spec.form_fields.is_empty() {
119 let mut form = reqwest::blocking::multipart::Form::new();
120
121 for (field_name, file_path) in &spec.form_files {
123 let resolved_path = self.resolve_env_var(file_path)?;
124 let file = std::fs::File::open(&resolved_path)
125 .with_context(|| format!("Failed to open file: {}", resolved_path))?;
126 let file_name = std::path::Path::new(&resolved_path)
127 .file_name()
128 .and_then(|n| n.to_str())
129 .unwrap_or("file")
130 .to_string();
131 let part = reqwest::blocking::multipart::Part::reader(file).file_name(file_name);
132 form = form.part(field_name.clone(), part);
133 }
134
135 for (field_name, value) in &spec.form_fields {
137 let resolved_value = self.resolve_env_var(value)?;
138 form = form.text(field_name.clone(), resolved_value);
139 }
140
141 request = request.multipart(form);
142 }
143 else if let Some(body) = &spec.body {
145 let resolved_body = self.resolve_env_var(body)?;
146
147 eprintln!("\n=== WEB CTE Request Debug ===");
149 eprintln!("URL: {}", spec.url);
150 eprintln!(
151 "Method: {:?}",
152 spec.method.as_ref().unwrap_or(&HttpMethod::POST)
153 );
154 eprintln!("Body (after template expansion):");
155 eprintln!("{}", resolved_body);
156 eprintln!("=============================\n");
157
158 request = request.body(resolved_body);
159 let has_content_type = spec
163 .headers
164 .iter()
165 .any(|(k, _)| k.eq_ignore_ascii_case("content-type"));
166 if !has_content_type && spec.body.as_ref().unwrap().trim().starts_with('{') {
167 request = request.header("Content-Type", "application/json");
168 }
169 }
170
171 let response = request
173 .send()
174 .with_context(|| format!("Failed to fetch from URL: {}", spec.url))?;
175
176 if !response.status().is_success() {
178 let status = response.status();
179 let body = response.text().unwrap_or_default();
183 let snippet: String = body.chars().take(2000).collect();
184 return Err(anyhow::anyhow!(
185 "HTTP request failed with status {}: {}\nResponse body: {}",
186 status,
187 spec.url,
188 snippet
189 ));
190 }
191
192 let content_type = response
194 .headers()
195 .get("content-type")
196 .and_then(|v| v.to_str().ok())
197 .unwrap_or("")
198 .to_string();
199
200 debug!("Response content-type: {}", content_type);
201
202 let bytes = response.bytes()?;
204
205 let format = match &spec.format {
207 Some(fmt) => fmt.clone(),
208 None => self.detect_format(&spec.url, &content_type),
209 };
210
211 info!("Using format: {:?} for {}", format, spec.url);
212
213 let result = if let Some(json_path) = &spec.json_path {
215 if matches!(format, DataFormat::JSON | DataFormat::Auto) {
216 let json_value: serde_json::Value = serde_json::from_slice(&bytes)
218 .with_context(|| "Failed to parse JSON for path extraction")?;
219
220 let extracted = self
222 .navigate_json_path(&json_value, json_path)
223 .with_context(|| format!("Failed to extract JSON path: {}", json_path))?;
224
225 let array_value = match extracted {
227 serde_json::Value::Array(_) => extracted,
228 other => serde_json::Value::Array(vec![other]),
229 };
230
231 let extracted_bytes = serde_json::to_vec(&array_value)?;
232 self.parse_data(
235 extracted_bytes,
236 DataFormat::JSON,
237 table_name,
238 "web",
239 &spec.url,
240 spec.delimiter,
241 )?
242 } else {
243 self.parse_data(
244 bytes.to_vec(),
245 format,
246 table_name,
247 "web",
248 &spec.url,
249 spec.delimiter,
250 )?
251 }
252 } else {
253 self.parse_data(
254 bytes.to_vec(),
255 format,
256 table_name,
257 "web",
258 &spec.url,
259 spec.delimiter,
260 )?
261 };
262
263 #[cfg(feature = "redis-cache")]
265 {
266 let mut cache = RedisCache::new();
267 if cache.is_enabled() {
268 let ttl = spec.cache_seconds.unwrap_or_else(|| {
270 if spec.url.contains("prod") {
272 3600 } else if spec.url.contains("staging") {
274 300 } else {
276 600 }
278 });
279
280 match result.to_parquet_bytes() {
282 Ok(parquet_bytes) => {
283 if let Err(e) = cache.set(&cache_key, &parquet_bytes, ttl) {
284 debug!("Failed to cache result: {}", e);
285 } else {
286 eprintln!("Cached {} for {} seconds", table_name, ttl);
287 }
288 }
289 Err(e) => {
290 debug!("Failed to serialize to parquet: {}", e);
291 }
292 }
293 }
294 }
295
296 Ok(result)
297 }
298
299 fn fetch_file(&self, spec: &WebCTESpec, table_name: &str) -> Result<DataTable> {
301 let file_path = if spec.url.starts_with("file://") {
303 &spec.url[7..] } else {
305 &spec.url
306 };
307
308 info!("Reading local file: {}", file_path);
309
310 let path = Path::new(file_path);
312 if !path.exists() {
313 return Err(anyhow::anyhow!("File not found: {}", file_path));
314 }
315
316 let file =
318 File::open(path).with_context(|| format!("Failed to open file: {}", file_path))?;
319
320 let metadata = file.metadata()?;
322 let file_size = metadata.len();
323 debug!("File size: {} bytes", file_size);
324
325 let format = match &spec.format {
327 Some(fmt) => fmt.clone(),
328 None => self.detect_format(file_path, ""),
329 };
330
331 info!("Using format: {:?} for {}", format, file_path);
332
333 let csv_opts = CsvReadOptions {
337 delimiter: spec.delimiter.unwrap_or(b','),
338 has_headers: true,
339 };
340
341 match format {
343 DataFormat::CSV => {
344 load_csv_from_reader_with_opts(file, table_name, "file", file_path, &csv_opts)
345 .with_context(|| format!("Failed to parse CSV from {}", file_path))
346 }
347 DataFormat::JSON => load_json_from_reader(file, table_name, "file", file_path)
348 .with_context(|| format!("Failed to parse JSON from {}", file_path)),
349 DataFormat::Auto => {
350 if file_path.ends_with(".json") {
352 let file = File::open(path)?;
353 load_json_from_reader(file, table_name, "file", file_path)
354 .with_context(|| format!("Failed to parse JSON from {}", file_path))
355 } else {
356 let file = File::open(path)?;
358 load_csv_from_reader_with_opts(file, table_name, "file", file_path, &csv_opts)
359 .with_context(|| format!("Failed to parse CSV from {}", file_path))
360 }
361 }
362 }
363 }
364
365 fn parse_data(
368 &self,
369 bytes: Vec<u8>,
370 format: DataFormat,
371 table_name: &str,
372 source_type: &str,
373 source_path: &str,
374 delimiter: Option<u8>,
375 ) -> Result<DataTable> {
376 let csv_opts = CsvReadOptions {
377 delimiter: delimiter.unwrap_or(b','),
378 has_headers: true,
379 };
380 match format {
381 DataFormat::CSV => {
382 let reader = Cursor::new(bytes);
383 load_csv_from_reader_with_opts(
384 reader,
385 table_name,
386 source_type,
387 source_path,
388 &csv_opts,
389 )
390 .with_context(|| format!("Failed to parse CSV from {}", source_path))
391 }
392 DataFormat::JSON => {
393 let reader = Cursor::new(bytes);
394 load_json_from_reader(reader, table_name, source_type, source_path)
395 .with_context(|| format!("Failed to parse JSON from {}", source_path))
396 }
397 DataFormat::Auto => {
398 let reader_csv = Cursor::new(bytes.clone());
400 match load_csv_from_reader_with_opts(
401 reader_csv,
402 table_name,
403 source_type,
404 source_path,
405 &csv_opts,
406 ) {
407 Ok(table) => Ok(table),
408 Err(_) => {
409 debug!("CSV parsing failed, trying JSON");
410 let reader_json = Cursor::new(bytes);
411 load_json_from_reader(reader_json, table_name, source_type, source_path)
412 .with_context(|| format!("Failed to parse data from {}", source_path))
413 }
414 }
415 }
416 }
417 }
418
419 fn detect_format(&self, url: &str, content_type: &str) -> DataFormat {
421 if content_type.contains("json") {
423 return DataFormat::JSON;
424 }
425 if content_type.contains("csv") || content_type.contains("text/plain") {
426 return DataFormat::CSV;
427 }
428
429 if url.ends_with(".json") {
431 DataFormat::JSON
432 } else if url.ends_with(".csv") {
433 DataFormat::CSV
434 } else {
435 DataFormat::Auto
437 }
438 }
439
440 fn extract_json_path(
442 &self,
443 _table: DataTable,
444 json_path: &str,
445 bytes: &[u8],
446 ) -> Result<DataTable> {
447 let json_value: serde_json::Value = serde_json::from_slice(bytes)
449 .with_context(|| "Failed to parse JSON for path extraction")?;
450
451 let extracted = self
453 .navigate_json_path(&json_value, json_path)
454 .with_context(|| format!("Failed to extract JSON path: {}", json_path))?;
455
456 let array_value = match extracted {
459 serde_json::Value::Array(_) => extracted.clone(),
460 _ => serde_json::Value::Array(vec![extracted.clone()]),
461 };
462
463 let extracted_bytes = serde_json::to_vec(&array_value)?;
465
466 let reader = Cursor::new(extracted_bytes);
468 load_json_from_reader(reader, "extracted", "web", json_path)
469 }
470
471 fn navigate_json_path(
483 &self,
484 value: &serde_json::Value,
485 path: &str,
486 ) -> Result<serde_json::Value> {
487 let parts: Vec<&str> = path.split('.').filter(|p| !p.is_empty()).collect();
488 Self::walk_json_path(value, &parts)
489 }
490
491 fn walk_json_path(value: &serde_json::Value, parts: &[&str]) -> Result<serde_json::Value> {
492 let Some((head, tail)) = parts.split_first() else {
493 return Ok(value.clone());
494 };
495
496 if let Some(name) = head.strip_suffix("[]") {
499 let array_val = if name.is_empty() {
500 value
501 } else {
502 value
503 .get(name)
504 .ok_or_else(|| anyhow::anyhow!("Path '{}' not found in JSON", name))?
505 };
506 let arr = array_val.as_array().ok_or_else(|| {
507 let kind = match array_val {
508 serde_json::Value::Null => "null",
509 serde_json::Value::Bool(_) => "bool",
510 serde_json::Value::Number(_) => "number",
511 serde_json::Value::String(_) => "string",
512 serde_json::Value::Array(_) => "array",
513 serde_json::Value::Object(_) => "object",
514 };
515 anyhow::anyhow!(
516 "Expected array at '{}' for [] projection, got {}",
517 if name.is_empty() { "<root>" } else { name },
518 kind
519 )
520 })?;
521 let mut projected = Vec::with_capacity(arr.len());
522 for el in arr {
523 projected.push(Self::walk_json_path(el, tail)?);
524 }
525 return Ok(serde_json::Value::Array(projected));
526 }
527
528 let next = value
529 .get(head)
530 .ok_or_else(|| anyhow::anyhow!("Path '{}' not found in JSON", head))?;
531 Self::walk_json_path(next, tail)
532 }
533
534 fn resolve_env_var(&self, value: &str) -> Result<String> {
536 let mut result = value.to_string();
537
538 let re = Regex::new(r"\$\{([^}]+)\}").unwrap();
541 for cap in re.captures_iter(value) {
542 let var_name = &cap[1];
543 match std::env::var(var_name) {
544 Ok(var_value) => {
545 result = result.replace(&cap[0], &var_value);
546 }
547 Err(_) => {
548 debug!(
551 "Environment variable {} not found, keeping placeholder",
552 var_name
553 );
554 }
555 }
556 }
557
558 if result.starts_with('$') && !result.starts_with("${") {
560 let var_name = &result[1..];
561 if let Ok(var_value) = std::env::var(var_name) {
562 return Ok(var_value);
563 }
564 }
565
566 Ok(result)
567 }
568}
569
#[cfg(test)]
mod tests {
    use super::*;

    /// Covers detection by URL extension, by content type, and the `Auto` fallback.
    #[test]
    fn test_detect_format() {
        let fetcher = WebDataFetcher::new().unwrap();
        let detect = |url: &str, content_type: &str| fetcher.detect_format(url, content_type);

        // Extension decides when no content type is available.
        let csv_by_extension = detect("http://example.com/data.csv", "");
        assert!(matches!(csv_by_extension, DataFormat::CSV));
        let json_by_extension = detect("http://example.com/data.json", "");
        assert!(matches!(json_by_extension, DataFormat::JSON));

        // Content type decides when the URL has no extension.
        let json_by_content_type = detect("http://example.com/data", "application/json");
        assert!(matches!(json_by_content_type, DataFormat::JSON));
        let csv_by_content_type = detect("http://example.com/data", "text/csv");
        assert!(matches!(csv_by_content_type, DataFormat::CSV));

        // Nothing to go on: defer the decision to the parser.
        let undetermined = detect("http://example.com/data", "");
        assert!(matches!(undetermined, DataFormat::Auto));
    }
}
604}