1use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::Duration;
9
10use chrono::{NaiveDate, Utc};
11use reqwest::{Client, StatusCode};
12use serde::Deserialize;
13use tokio::time::sleep;
14
15use crate::api_clients::SimpleEmbedder;
16use crate::ruvector_native::{Domain, SemanticVector};
17use crate::{FrameworkError, Result};
18
/// Delay applied before each FRED request, in milliseconds.
const FRED_RATE_LIMIT_MS: u64 = 100;
/// Delay applied before each World Bank request, in milliseconds.
const WORLDBANK_RATE_LIMIT_MS: u64 = 100;
/// Delay applied before each Alpha Vantage request, in milliseconds (12 seconds).
const ALPHAVANTAGE_RATE_LIMIT_MS: u64 = 12_000;
/// Maximum number of retries performed by the `fetch_with_retry` helpers.
const MAX_RETRIES: u32 = 3;
/// Base retry delay in milliseconds; multiplied by the attempt number before each retry.
const RETRY_DELAY_MS: u64 = 1_000;
25
26#[derive(Debug, Deserialize)]
32struct FredObservationsResponse {
33 #[serde(default)]
34 observations: Vec<FredObservation>,
35 #[serde(default)]
36 error_code: Option<i32>,
37 #[serde(default)]
38 error_message: Option<String>,
39}
40
41#[derive(Debug, Deserialize)]
42struct FredObservation {
43 #[serde(default)]
44 date: String,
45 #[serde(default)]
46 value: String,
47}
48
49#[derive(Debug, Deserialize)]
51struct FredSeriesSearchResponse {
52 seriess: Vec<FredSeries>,
53}
54
55#[derive(Debug, Deserialize)]
56struct FredSeries {
57 id: String,
58 title: String,
59 #[serde(default)]
60 units: String,
61 #[serde(default)]
62 frequency: String,
63 #[serde(default)]
64 seasonal_adjustment: String,
65 #[serde(default)]
66 notes: String,
67}
68
69pub struct FredClient {
86 client: Client,
87 base_url: String,
88 api_key: Option<String>,
89 rate_limit_delay: Duration,
90 embedder: Arc<SimpleEmbedder>,
91}
92
93impl FredClient {
94 pub fn new(api_key: Option<String>) -> Result<Self> {
100 let client = Client::builder()
101 .timeout(Duration::from_secs(30))
102 .build()
103 .map_err(FrameworkError::Network)?;
104
105 Ok(Self {
106 client,
107 base_url: "https://api.stlouisfed.org/fred".to_string(),
108 api_key,
109 rate_limit_delay: Duration::from_millis(FRED_RATE_LIMIT_MS),
110 embedder: Arc::new(SimpleEmbedder::new(256)),
111 })
112 }
113
114 pub async fn get_series(
125 &self,
126 series_id: &str,
127 limit: Option<usize>,
128 ) -> Result<Vec<SemanticVector>> {
129 let api_key = self.api_key.as_ref().ok_or_else(|| {
131 FrameworkError::Config(
132 "FRED API key required. Get one at https://fred.stlouisfed.org/docs/api/api_key.html".to_string()
133 )
134 })?;
135
136 let mut url = format!(
137 "{}/series/observations?series_id={}&file_type=json&api_key={}",
138 self.base_url, series_id, api_key
139 );
140
141 if let Some(lim) = limit {
142 url.push_str(&format!("&limit={}", lim));
143 }
144
145 sleep(self.rate_limit_delay).await;
146 let response = self.fetch_with_retry(&url).await?;
147 let obs_response: FredObservationsResponse = response.json().await?;
148
149 if let Some(error_msg) = obs_response.error_message {
151 return Err(FrameworkError::Ingestion(format!("FRED API error: {}", error_msg)));
152 }
153
154 let mut vectors = Vec::new();
155 for obs in obs_response.observations {
156 let value = match obs.value.parse::<f64>() {
158 Ok(v) => v,
159 Err(_) => continue, };
161
162 let date = NaiveDate::parse_from_str(&obs.date, "%Y-%m-%d")
164 .ok()
165 .and_then(|d| d.and_hms_opt(0, 0, 0))
166 .map(|dt| dt.and_utc())
167 .unwrap_or_else(Utc::now);
168
169 let text = format!("{} on {}: {}", series_id, obs.date, value);
171 let embedding = self.embedder.embed_text(&text);
172
173 let mut metadata = HashMap::new();
174 metadata.insert("series_id".to_string(), series_id.to_string());
175 metadata.insert("date".to_string(), obs.date.clone());
176 metadata.insert("value".to_string(), value.to_string());
177 metadata.insert("source".to_string(), "fred".to_string());
178
179 vectors.push(SemanticVector {
180 id: format!("FRED:{}:{}", series_id, obs.date),
181 embedding,
182 domain: Domain::Economic,
183 timestamp: date,
184 metadata,
185 });
186 }
187
188 Ok(vectors)
189 }
190
191 pub async fn search_series(&self, keywords: &str) -> Result<Vec<SemanticVector>> {
201 let mut url = format!(
202 "{}/series/search?search_text={}&file_type=json&limit=50",
203 self.base_url,
204 urlencoding::encode(keywords)
205 );
206
207 if let Some(key) = &self.api_key {
208 url.push_str(&format!("&api_key={}", key));
209 }
210
211 sleep(self.rate_limit_delay).await;
212 let response = self.fetch_with_retry(&url).await?;
213 let search_response: FredSeriesSearchResponse = response.json().await?;
214
215 let mut vectors = Vec::new();
216 for series in search_response.seriess {
217 let text = format!(
219 "{} {} {} {}",
220 series.title, series.units, series.frequency, series.notes
221 );
222 let embedding = self.embedder.embed_text(&text);
223
224 let mut metadata = HashMap::new();
225 metadata.insert("series_id".to_string(), series.id.clone());
226 metadata.insert("title".to_string(), series.title.clone());
227 metadata.insert("units".to_string(), series.units);
228 metadata.insert("frequency".to_string(), series.frequency);
229 metadata.insert("seasonal_adjustment".to_string(), series.seasonal_adjustment);
230 metadata.insert("source".to_string(), "fred_search".to_string());
231
232 vectors.push(SemanticVector {
233 id: format!("FRED_SERIES:{}", series.id),
234 embedding,
235 domain: Domain::Economic,
236 timestamp: Utc::now(),
237 metadata,
238 });
239 }
240
241 Ok(vectors)
242 }
243
244 pub async fn get_gdp(&self) -> Result<Vec<SemanticVector>> {
251 self.get_series("GDP", Some(100)).await
252 }
253
254 pub async fn get_unemployment(&self) -> Result<Vec<SemanticVector>> {
261 self.get_series("UNRATE", Some(100)).await
262 }
263
264 pub async fn get_cpi(&self) -> Result<Vec<SemanticVector>> {
271 self.get_series("CPIAUCSL", Some(100)).await
272 }
273
274 pub async fn get_interest_rate(&self) -> Result<Vec<SemanticVector>> {
281 self.get_series("DFF", Some(100)).await
282 }
283
284 pub async fn get_money_supply(&self) -> Result<Vec<SemanticVector>> {
291 self.get_series("M2SL", Some(100)).await
292 }
293
294 async fn fetch_with_retry(&self, url: &str) -> Result<reqwest::Response> {
296 let mut retries = 0;
297 loop {
298 match self.client.get(url).send().await {
299 Ok(response) => {
300 if response.status() == StatusCode::TOO_MANY_REQUESTS && retries < MAX_RETRIES {
301 retries += 1;
302 sleep(Duration::from_millis(RETRY_DELAY_MS * retries as u64)).await;
303 continue;
304 }
305 return Ok(response);
306 }
307 Err(_) if retries < MAX_RETRIES => {
308 retries += 1;
309 sleep(Duration::from_millis(RETRY_DELAY_MS * retries as u64)).await;
310 }
311 Err(e) => return Err(FrameworkError::Network(e)),
312 }
313 }
314 }
315}
316
317#[derive(Debug, Deserialize)]
323struct WorldBankResponse {
324 #[serde(default)]
325 page: u32,
326 #[serde(default)]
327 pages: u32,
328 #[serde(default)]
329 per_page: u32,
330 #[serde(default)]
331 total: u32,
332}
333
334#[derive(Debug, Deserialize)]
336struct WorldBankIndicator {
337 indicator: WorldBankIndicatorInfo,
338 country: WorldBankCountryInfo,
339 #[serde(default)]
340 countryiso3code: String,
341 #[serde(default)]
342 date: String,
343 #[serde(default)]
344 value: Option<f64>,
345 #[serde(default)]
346 unit: String,
347 #[serde(default)]
348 obs_status: String,
349}
350
351#[derive(Debug, Deserialize)]
352struct WorldBankIndicatorInfo {
353 id: String,
354 value: String,
355}
356
357#[derive(Debug, Deserialize)]
358struct WorldBankCountryInfo {
359 id: String,
360 value: String,
361}
362
363pub struct WorldBankClient {
381 client: Client,
382 base_url: String,
383 rate_limit_delay: Duration,
384 embedder: Arc<SimpleEmbedder>,
385}
386
387impl WorldBankClient {
388 pub fn new() -> Result<Self> {
390 let client = Client::builder()
391 .timeout(Duration::from_secs(30))
392 .build()
393 .map_err(FrameworkError::Network)?;
394
395 Ok(Self {
396 client,
397 base_url: "https://api.worldbank.org/v2".to_string(),
398 rate_limit_delay: Duration::from_millis(WORLDBANK_RATE_LIMIT_MS),
399 embedder: Arc::new(SimpleEmbedder::new(256)),
400 })
401 }
402
403 pub async fn get_indicator(
415 &self,
416 country: &str,
417 indicator: &str,
418 ) -> Result<Vec<SemanticVector>> {
419 let url = format!(
420 "{}/country/{}/indicator/{}?format=json&per_page=100",
421 self.base_url, country, indicator
422 );
423
424 sleep(self.rate_limit_delay).await;
425 let response = self.fetch_with_retry(&url).await?;
426 let text = response.text().await?;
427
428 let json_values: Vec<serde_json::Value> = serde_json::from_str(&text)?;
430
431 if json_values.len() < 2 {
432 return Ok(Vec::new());
433 }
434
435 let indicators: Vec<WorldBankIndicator> = serde_json::from_value(json_values[1].clone())?;
436
437 let mut vectors = Vec::new();
438 for ind in indicators {
439 let value = match ind.value {
441 Some(v) => v,
442 None => continue,
443 };
444
445 let year = ind.date.parse::<i32>().unwrap_or(2020);
447 let date = NaiveDate::from_ymd_opt(year, 1, 1)
448 .and_then(|d| d.and_hms_opt(0, 0, 0))
449 .map(|dt| dt.and_utc())
450 .unwrap_or_else(Utc::now);
451
452 let text = format!(
454 "{} {} in {}: {}",
455 ind.country.value, ind.indicator.value, ind.date, value
456 );
457 let embedding = self.embedder.embed_text(&text);
458
459 let mut metadata = HashMap::new();
460 metadata.insert("country".to_string(), ind.country.value);
461 metadata.insert("country_code".to_string(), ind.countryiso3code.clone());
462 metadata.insert("indicator_id".to_string(), ind.indicator.id.clone());
463 metadata.insert("indicator_name".to_string(), ind.indicator.value);
464 metadata.insert("date".to_string(), ind.date.clone());
465 metadata.insert("value".to_string(), value.to_string());
466 metadata.insert("source".to_string(), "worldbank".to_string());
467
468 vectors.push(SemanticVector {
469 id: format!("WB:{}:{}:{}", ind.countryiso3code, ind.indicator.id, ind.date),
470 embedding,
471 domain: Domain::Economic,
472 timestamp: date,
473 metadata,
474 });
475 }
476
477 Ok(vectors)
478 }
479
480 pub async fn get_gdp_global(&self) -> Result<Vec<SemanticVector>> {
487 self.get_indicator("all", "NY.GDP.PCAP.CD").await
489 }
490
491 pub async fn get_climate_indicators(&self) -> Result<Vec<SemanticVector>> {
498 let mut vectors = self.get_indicator("all", "EN.ATM.CO2E.PC").await?;
500
501 sleep(self.rate_limit_delay).await;
503 let renewable = self.get_indicator("all", "EG.FEC.RNEW.ZS").await?;
504 vectors.extend(renewable);
505
506 Ok(vectors)
507 }
508
509 pub async fn get_health_indicators(&self) -> Result<Vec<SemanticVector>> {
516 let mut vectors = self.get_indicator("all", "SH.XPD.CHEX.GD.ZS").await?;
518
519 sleep(self.rate_limit_delay).await;
521 let life_exp = self.get_indicator("all", "SP.DYN.LE00.IN").await?;
522 vectors.extend(life_exp);
523
524 Ok(vectors)
525 }
526
527 pub async fn get_population(&self) -> Result<Vec<SemanticVector>> {
534 self.get_indicator("all", "SP.POP.TOTL").await
535 }
536
537 async fn fetch_with_retry(&self, url: &str) -> Result<reqwest::Response> {
539 let mut retries = 0;
540 loop {
541 match self.client.get(url).send().await {
542 Ok(response) => {
543 if response.status() == StatusCode::TOO_MANY_REQUESTS && retries < MAX_RETRIES {
544 retries += 1;
545 sleep(Duration::from_millis(RETRY_DELAY_MS * retries as u64)).await;
546 continue;
547 }
548 return Ok(response);
549 }
550 Err(_) if retries < MAX_RETRIES => {
551 retries += 1;
552 sleep(Duration::from_millis(RETRY_DELAY_MS * retries as u64)).await;
553 }
554 Err(e) => return Err(FrameworkError::Network(e)),
555 }
556 }
557 }
558}
559
560impl Default for WorldBankClient {
561 fn default() -> Self {
562 Self::new().expect("Failed to create WorldBank client")
563 }
564}
565
566#[derive(Debug, Deserialize)]
572struct AlphaVantageTimeSeriesResponse {
573 #[serde(rename = "Meta Data", default)]
574 meta_data: Option<serde_json::Value>,
575 #[serde(rename = "Time Series (Daily)", default)]
576 time_series: Option<HashMap<String, AlphaVantageDailyData>>,
577}
578
579#[derive(Debug, Deserialize)]
580struct AlphaVantageDailyData {
581 #[serde(rename = "1. open")]
582 open: String,
583 #[serde(rename = "2. high")]
584 high: String,
585 #[serde(rename = "3. low")]
586 low: String,
587 #[serde(rename = "4. close")]
588 close: String,
589 #[serde(rename = "5. volume")]
590 volume: String,
591}
592
593pub struct AlphaVantageClient {
610 client: Client,
611 base_url: String,
612 api_key: String,
613 rate_limit_delay: Duration,
614 embedder: Arc<SimpleEmbedder>,
615}
616
617impl AlphaVantageClient {
618 pub fn new(api_key: String) -> Result<Self> {
623 let client = Client::builder()
624 .timeout(Duration::from_secs(30))
625 .build()
626 .map_err(FrameworkError::Network)?;
627
628 Ok(Self {
629 client,
630 base_url: "https://www.alphavantage.co/query".to_string(),
631 api_key,
632 rate_limit_delay: Duration::from_millis(ALPHAVANTAGE_RATE_LIMIT_MS),
633 embedder: Arc::new(SimpleEmbedder::new(256)),
634 })
635 }
636
637 pub async fn get_daily_stock(&self, symbol: &str) -> Result<Vec<SemanticVector>> {
647 let url = format!(
648 "{}?function=TIME_SERIES_DAILY&symbol={}&apikey={}",
649 self.base_url, symbol, self.api_key
650 );
651
652 sleep(self.rate_limit_delay).await;
653 let response = self.fetch_with_retry(&url).await?;
654 let ts_response: AlphaVantageTimeSeriesResponse = response.json().await?;
655
656 let time_series = match ts_response.time_series {
657 Some(ts) => ts,
658 None => return Ok(Vec::new()),
659 };
660
661 let mut vectors = Vec::new();
662 for (date_str, data) in time_series.iter().take(100) {
663 let close = data.close.parse::<f64>().unwrap_or(0.0);
665 let volume = data.volume.parse::<f64>().unwrap_or(0.0);
666
667 let date = NaiveDate::parse_from_str(date_str, "%Y-%m-%d")
669 .ok()
670 .and_then(|d| d.and_hms_opt(0, 0, 0))
671 .map(|dt| dt.and_utc())
672 .unwrap_or_else(Utc::now);
673
674 let text = format!(
676 "{} stock on {}: close ${}, volume {}",
677 symbol, date_str, close, volume
678 );
679 let embedding = self.embedder.embed_text(&text);
680
681 let mut metadata = HashMap::new();
682 metadata.insert("symbol".to_string(), symbol.to_string());
683 metadata.insert("date".to_string(), date_str.clone());
684 metadata.insert("open".to_string(), data.open.clone());
685 metadata.insert("high".to_string(), data.high.clone());
686 metadata.insert("low".to_string(), data.low.clone());
687 metadata.insert("close".to_string(), data.close.clone());
688 metadata.insert("volume".to_string(), data.volume.clone());
689 metadata.insert("source".to_string(), "alphavantage".to_string());
690
691 vectors.push(SemanticVector {
692 id: format!("AV:{}:{}", symbol, date_str),
693 embedding,
694 domain: Domain::Finance,
695 timestamp: date,
696 metadata,
697 });
698 }
699
700 Ok(vectors)
701 }
702
703 async fn fetch_with_retry(&self, url: &str) -> Result<reqwest::Response> {
705 let mut retries = 0;
706 loop {
707 match self.client.get(url).send().await {
708 Ok(response) => {
709 if response.status() == StatusCode::TOO_MANY_REQUESTS && retries < MAX_RETRIES {
710 retries += 1;
711 sleep(Duration::from_millis(RETRY_DELAY_MS * retries as u64)).await;
712 continue;
713 }
714 return Ok(response);
715 }
716 Err(_) if retries < MAX_RETRIES => {
717 retries += 1;
718 sleep(Duration::from_millis(RETRY_DELAY_MS * retries as u64)).await;
719 }
720 Err(e) => return Err(FrameworkError::Network(e)),
721 }
722 }
723 }
724}
725
#[cfg(test)]
mod tests {
    use super::*;

    /// A FRED client can be constructed without an API key.
    #[tokio::test]
    async fn test_fred_client_creation() {
        assert!(FredClient::new(None).is_ok());
    }

    /// A FRED client can be constructed with an API key.
    #[tokio::test]
    async fn test_fred_client_with_key() {
        let key = Some("test_key".to_string());
        assert!(FredClient::new(key).is_ok());
    }

    /// A World Bank client can be constructed.
    #[tokio::test]
    async fn test_worldbank_client_creation() {
        assert!(WorldBankClient::new().is_ok());
    }

    /// An Alpha Vantage client can be constructed from a key.
    #[tokio::test]
    async fn test_alphavantage_client_creation() {
        let key = "test_key".to_string();
        assert!(AlphaVantageClient::new(key).is_ok());
    }

    /// Each client stores the delay given by its rate-limit constant.
    #[test]
    fn test_rate_limiting() {
        let fred_delay = FredClient::new(None).unwrap().rate_limit_delay;
        assert_eq!(fred_delay, Duration::from_millis(FRED_RATE_LIMIT_MS));

        let worldbank_delay = WorldBankClient::new().unwrap().rate_limit_delay;
        assert_eq!(worldbank_delay, Duration::from_millis(WORLDBANK_RATE_LIMIT_MS));

        let alphavantage_delay = AlphaVantageClient::new("test".to_string())
            .unwrap()
            .rate_limit_delay;
        assert_eq!(alphavantage_delay, Duration::from_millis(ALPHAVANTAGE_RATE_LIMIT_MS));
    }
}