1use anyhow::{Context, Result};
3use regex::Regex;
4use serde_json;
5use std::fs::File;
6use std::io::Cursor;
7use std::path::Path;
8use std::time::Duration;
9use tracing::{debug, info};
10
11use crate::data::datatable::DataTable;
12use crate::data::stream_loader::{
13 load_csv_from_reader_with_opts, load_json_from_reader, CsvReadOptions,
14};
15use crate::sql::parser::ast::{DataFormat, HttpMethod, WebCTESpec};
16
17#[cfg(feature = "redis-cache")]
18use crate::redis_cache_module::RedisCache;
19
20pub struct WebDataFetcher {
22 client: reqwest::blocking::Client,
23}
24
25impl WebDataFetcher {
26 pub fn new() -> Result<Self> {
27 let client = reqwest::blocking::Client::builder()
28 .timeout(Duration::from_secs(30))
29 .user_agent("sql-cli/1.0")
30 .build()?;
31
32 Ok(Self { client })
33 }
34
35 pub fn fetch(
38 &self,
39 spec: &WebCTESpec,
40 table_name: &str,
41 _query_context: Option<&str>, ) -> Result<DataTable> {
43 if spec.url.starts_with("file://") {
45 return self.fetch_file(spec, table_name);
46 }
47
48 #[cfg(feature = "redis-cache")]
51 let cache_key = {
52 let method = format!("{:?}", spec.method.as_ref().unwrap_or(&HttpMethod::GET));
53
54 RedisCache::generate_key_full(
56 table_name, &spec.url,
58 Some(&method),
59 &spec.headers,
60 spec.body.as_deref(),
61 "", spec.json_path.as_deref(), &spec.form_files, &spec.form_fields, )
66 };
67
68 #[cfg(feature = "redis-cache")]
70 {
71 let mut cache = RedisCache::new();
72 if cache.is_enabled() {
73 if let Some(cached_bytes) = cache.get(&cache_key) {
74 match DataTable::from_parquet_bytes(&cached_bytes) {
76 Ok(table) => {
77 eprintln!(
78 "Cache HIT for {} (key: {}...)",
79 table_name,
80 &cache_key[0..48.min(cache_key.len())]
81 );
82 return Ok(table);
83 }
84 Err(e) => {
85 debug!("Failed to deserialize cached data: {}", e);
86 }
88 }
89 } else {
90 eprintln!(
91 "Cache MISS for {} (key: {}...)",
92 table_name,
93 &cache_key[0..48.min(cache_key.len())]
94 );
95 }
96 }
97 }
98
99 info!("Fetching data from URL: {}", spec.url);
100
101 let mut request = match spec.method.as_ref().unwrap_or(&HttpMethod::GET) {
104 HttpMethod::GET => self.client.get(&spec.url),
105 HttpMethod::POST => self.client.post(&spec.url),
106 HttpMethod::PUT => self.client.put(&spec.url),
107 HttpMethod::DELETE => self.client.delete(&spec.url),
108 HttpMethod::PATCH => self.client.patch(&spec.url),
109 };
110
111 for (key, value) in &spec.headers {
113 let resolved_value = self.resolve_env_var(value)?;
114 request = request.header(key, resolved_value);
115 }
116
117 if !spec.form_files.is_empty() || !spec.form_fields.is_empty() {
119 let mut form = reqwest::blocking::multipart::Form::new();
120
121 for (field_name, file_path) in &spec.form_files {
123 let resolved_path = self.resolve_env_var(file_path)?;
124 let file = std::fs::File::open(&resolved_path)
125 .with_context(|| format!("Failed to open file: {}", resolved_path))?;
126 let file_name = std::path::Path::new(&resolved_path)
127 .file_name()
128 .and_then(|n| n.to_str())
129 .unwrap_or("file")
130 .to_string();
131 let part = reqwest::blocking::multipart::Part::reader(file).file_name(file_name);
132 form = form.part(field_name.clone(), part);
133 }
134
135 for (field_name, value) in &spec.form_fields {
137 let resolved_value = self.resolve_env_var(value)?;
138 form = form.text(field_name.clone(), resolved_value);
139 }
140
141 request = request.multipart(form);
142 }
143 else if let Some(body) = &spec.body {
145 let resolved_body = self.resolve_env_var(body)?;
146
147 eprintln!("\n=== WEB CTE Request Debug ===");
149 eprintln!("URL: {}", spec.url);
150 eprintln!(
151 "Method: {:?}",
152 spec.method.as_ref().unwrap_or(&HttpMethod::POST)
153 );
154 eprintln!("Body (after template expansion):");
155 eprintln!("{}", resolved_body);
156 eprintln!("=============================\n");
157
158 request = request.body(resolved_body);
159 if spec.body.as_ref().unwrap().trim().starts_with('{') {
161 request = request.header("Content-Type", "application/json");
162 }
163 }
164
165 let response = request
167 .send()
168 .with_context(|| format!("Failed to fetch from URL: {}", spec.url))?;
169
170 if !response.status().is_success() {
172 return Err(anyhow::anyhow!(
173 "HTTP request failed with status {}: {}",
174 response.status(),
175 spec.url
176 ));
177 }
178
179 let content_type = response
181 .headers()
182 .get("content-type")
183 .and_then(|v| v.to_str().ok())
184 .unwrap_or("")
185 .to_string();
186
187 debug!("Response content-type: {}", content_type);
188
189 let bytes = response.bytes()?;
191
192 let format = match &spec.format {
194 Some(fmt) => fmt.clone(),
195 None => self.detect_format(&spec.url, &content_type),
196 };
197
198 info!("Using format: {:?} for {}", format, spec.url);
199
200 let result = if let Some(json_path) = &spec.json_path {
202 if matches!(format, DataFormat::JSON | DataFormat::Auto) {
203 let json_value: serde_json::Value = serde_json::from_slice(&bytes)
205 .with_context(|| "Failed to parse JSON for path extraction")?;
206
207 let extracted = self
209 .navigate_json_path(&json_value, json_path)
210 .with_context(|| format!("Failed to extract JSON path: {}", json_path))?;
211
212 let array_value = match extracted {
214 serde_json::Value::Array(_) => extracted.clone(),
215 _ => serde_json::Value::Array(vec![extracted.clone()]),
216 };
217
218 let extracted_bytes = serde_json::to_vec(&array_value)?;
219 self.parse_data(
222 extracted_bytes,
223 DataFormat::JSON,
224 table_name,
225 "web",
226 &spec.url,
227 spec.delimiter,
228 )?
229 } else {
230 self.parse_data(
231 bytes.to_vec(),
232 format,
233 table_name,
234 "web",
235 &spec.url,
236 spec.delimiter,
237 )?
238 }
239 } else {
240 self.parse_data(
241 bytes.to_vec(),
242 format,
243 table_name,
244 "web",
245 &spec.url,
246 spec.delimiter,
247 )?
248 };
249
250 #[cfg(feature = "redis-cache")]
252 {
253 let mut cache = RedisCache::new();
254 if cache.is_enabled() {
255 let ttl = spec.cache_seconds.unwrap_or_else(|| {
257 if spec.url.contains("prod") {
259 3600 } else if spec.url.contains("staging") {
261 300 } else {
263 600 }
265 });
266
267 match result.to_parquet_bytes() {
269 Ok(parquet_bytes) => {
270 if let Err(e) = cache.set(&cache_key, &parquet_bytes, ttl) {
271 debug!("Failed to cache result: {}", e);
272 } else {
273 eprintln!("Cached {} for {} seconds", table_name, ttl);
274 }
275 }
276 Err(e) => {
277 debug!("Failed to serialize to parquet: {}", e);
278 }
279 }
280 }
281 }
282
283 Ok(result)
284 }
285
286 fn fetch_file(&self, spec: &WebCTESpec, table_name: &str) -> Result<DataTable> {
288 let file_path = if spec.url.starts_with("file://") {
290 &spec.url[7..] } else {
292 &spec.url
293 };
294
295 info!("Reading local file: {}", file_path);
296
297 let path = Path::new(file_path);
299 if !path.exists() {
300 return Err(anyhow::anyhow!("File not found: {}", file_path));
301 }
302
303 let file =
305 File::open(path).with_context(|| format!("Failed to open file: {}", file_path))?;
306
307 let metadata = file.metadata()?;
309 let file_size = metadata.len();
310 debug!("File size: {} bytes", file_size);
311
312 let format = match &spec.format {
314 Some(fmt) => fmt.clone(),
315 None => self.detect_format(file_path, ""),
316 };
317
318 info!("Using format: {:?} for {}", format, file_path);
319
320 let csv_opts = CsvReadOptions {
324 delimiter: spec.delimiter.unwrap_or(b','),
325 has_headers: true,
326 };
327
328 match format {
330 DataFormat::CSV => {
331 load_csv_from_reader_with_opts(file, table_name, "file", file_path, &csv_opts)
332 .with_context(|| format!("Failed to parse CSV from {}", file_path))
333 }
334 DataFormat::JSON => load_json_from_reader(file, table_name, "file", file_path)
335 .with_context(|| format!("Failed to parse JSON from {}", file_path)),
336 DataFormat::Auto => {
337 if file_path.ends_with(".json") {
339 let file = File::open(path)?;
340 load_json_from_reader(file, table_name, "file", file_path)
341 .with_context(|| format!("Failed to parse JSON from {}", file_path))
342 } else {
343 let file = File::open(path)?;
345 load_csv_from_reader_with_opts(file, table_name, "file", file_path, &csv_opts)
346 .with_context(|| format!("Failed to parse CSV from {}", file_path))
347 }
348 }
349 }
350 }
351
352 fn parse_data(
355 &self,
356 bytes: Vec<u8>,
357 format: DataFormat,
358 table_name: &str,
359 source_type: &str,
360 source_path: &str,
361 delimiter: Option<u8>,
362 ) -> Result<DataTable> {
363 let csv_opts = CsvReadOptions {
364 delimiter: delimiter.unwrap_or(b','),
365 has_headers: true,
366 };
367 match format {
368 DataFormat::CSV => {
369 let reader = Cursor::new(bytes);
370 load_csv_from_reader_with_opts(
371 reader,
372 table_name,
373 source_type,
374 source_path,
375 &csv_opts,
376 )
377 .with_context(|| format!("Failed to parse CSV from {}", source_path))
378 }
379 DataFormat::JSON => {
380 let reader = Cursor::new(bytes);
381 load_json_from_reader(reader, table_name, source_type, source_path)
382 .with_context(|| format!("Failed to parse JSON from {}", source_path))
383 }
384 DataFormat::Auto => {
385 let reader_csv = Cursor::new(bytes.clone());
387 match load_csv_from_reader_with_opts(
388 reader_csv,
389 table_name,
390 source_type,
391 source_path,
392 &csv_opts,
393 ) {
394 Ok(table) => Ok(table),
395 Err(_) => {
396 debug!("CSV parsing failed, trying JSON");
397 let reader_json = Cursor::new(bytes);
398 load_json_from_reader(reader_json, table_name, source_type, source_path)
399 .with_context(|| format!("Failed to parse data from {}", source_path))
400 }
401 }
402 }
403 }
404 }
405
406 fn detect_format(&self, url: &str, content_type: &str) -> DataFormat {
408 if content_type.contains("json") {
410 return DataFormat::JSON;
411 }
412 if content_type.contains("csv") || content_type.contains("text/plain") {
413 return DataFormat::CSV;
414 }
415
416 if url.ends_with(".json") {
418 DataFormat::JSON
419 } else if url.ends_with(".csv") {
420 DataFormat::CSV
421 } else {
422 DataFormat::Auto
424 }
425 }
426
427 fn extract_json_path(
429 &self,
430 _table: DataTable,
431 json_path: &str,
432 bytes: &[u8],
433 ) -> Result<DataTable> {
434 let json_value: serde_json::Value = serde_json::from_slice(bytes)
436 .with_context(|| "Failed to parse JSON for path extraction")?;
437
438 let extracted = self
440 .navigate_json_path(&json_value, json_path)
441 .with_context(|| format!("Failed to extract JSON path: {}", json_path))?;
442
443 let array_value = match extracted {
446 serde_json::Value::Array(_) => extracted.clone(),
447 _ => serde_json::Value::Array(vec![extracted.clone()]),
448 };
449
450 let extracted_bytes = serde_json::to_vec(&array_value)?;
452
453 let reader = Cursor::new(extracted_bytes);
455 load_json_from_reader(reader, "extracted", "web", json_path)
456 }
457
458 fn navigate_json_path<'a>(
460 &self,
461 value: &'a serde_json::Value,
462 path: &str,
463 ) -> Result<&'a serde_json::Value> {
464 let mut current = value;
465
466 for part in path.split('.') {
469 if part.is_empty() {
470 continue;
471 }
472
473 current = current
474 .get(part)
475 .ok_or_else(|| anyhow::anyhow!("Path '{}' not found in JSON", part))?;
476 }
477
478 Ok(current)
479 }
480
481 fn resolve_env_var(&self, value: &str) -> Result<String> {
483 let mut result = value.to_string();
484
485 let re = Regex::new(r"\$\{([^}]+)\}").unwrap();
488 for cap in re.captures_iter(value) {
489 let var_name = &cap[1];
490 match std::env::var(var_name) {
491 Ok(var_value) => {
492 result = result.replace(&cap[0], &var_value);
493 }
494 Err(_) => {
495 debug!(
498 "Environment variable {} not found, keeping placeholder",
499 var_name
500 );
501 }
502 }
503 }
504
505 if result.starts_with('$') && !result.starts_with("${") {
507 let var_name = &result[1..];
508 if let Ok(var_value) = std::env::var(var_name) {
509 return Ok(var_value);
510 }
511 }
512
513 Ok(result)
514 }
515}
516
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_detect_format() {
        let fetcher = WebDataFetcher::new().unwrap();
        let detect = |url: &str, content_type: &str| fetcher.detect_format(url, content_type);

        // No content type: the URL's file extension decides.
        assert!(matches!(detect("http://example.com/data.csv", ""), DataFormat::CSV));
        assert!(matches!(detect("http://example.com/data.json", ""), DataFormat::JSON));

        // Extension-less URL: the content type decides.
        assert!(matches!(
            detect("http://example.com/data", "application/json"),
            DataFormat::JSON
        ));
        assert!(matches!(
            detect("http://example.com/data", "text/csv"),
            DataFormat::CSV
        ));

        // Neither signal available: fall back to auto-detection.
        assert!(matches!(detect("http://example.com/data", ""), DataFormat::Auto));
    }
}