ruvector_data_framework/
api_clients.rs

1//! Real API client integrations for OpenAlex, NOAA, and SEC EDGAR
2//!
3//! This module provides async clients for fetching data from public APIs
4//! and converting responses into RuVector's DataRecord format with embeddings.
5
6use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::Duration;
9
10use async_trait::async_trait;
11use chrono::{NaiveDate, Utc};
12use reqwest::{Client, StatusCode};
13use serde::Deserialize;
14use tokio::time::sleep;
15
16use crate::{DataRecord, DataSource, FrameworkError, Relationship, Result};
17
/// Rate limiting configuration: default pause between requests, in milliseconds.
const DEFAULT_RATE_LIMIT_DELAY_MS: u64 = 100;
/// Maximum number of retries the `fetch_with_retry` helpers attempt before giving up.
const MAX_RETRIES: u32 = 3;
/// Base retry delay in milliseconds; the helpers multiply it by the current retry number.
const RETRY_DELAY_MS: u64 = 1000;
22
23// ============================================================================
24// Simple Embedding Generator
25// ============================================================================
26
27/// Simple bag-of-words embedding generator
28pub struct SimpleEmbedder {
29    dimension: usize,
30}
31
32impl SimpleEmbedder {
33    /// Create a new embedder with specified dimension
34    pub fn new(dimension: usize) -> Self {
35        Self { dimension }
36    }
37
38    /// Generate embedding from text using simple bag-of-words
39    pub fn embed_text(&self, text: &str) -> Vec<f32> {
40        let lowercase_text = text.to_lowercase();
41        let words: Vec<&str> = lowercase_text
42            .split_whitespace()
43            .filter(|w| w.len() > 2)
44            .collect();
45
46        let mut embedding = vec![0.0f32; self.dimension];
47
48        // Simple hash-based bag-of-words
49        for word in words {
50            let hash = self.hash_word(word);
51            let idx = (hash % self.dimension as u64) as usize;
52            embedding[idx] += 1.0;
53        }
54
55        // Normalize
56        let magnitude: f32 = embedding.iter().map(|x| x * x).sum::<f32>().sqrt();
57        if magnitude > 0.0 {
58            for val in &mut embedding {
59                *val /= magnitude;
60            }
61        }
62
63        embedding
64    }
65
66    /// Simple hash function for words
67    fn hash_word(&self, word: &str) -> u64 {
68        let mut hash = 5381u64;
69        for byte in word.bytes() {
70            hash = hash.wrapping_mul(33).wrapping_add(byte as u64);
71        }
72        hash
73    }
74
75    /// Generate embedding from JSON value
76    pub fn embed_json(&self, value: &serde_json::Value) -> Vec<f32> {
77        let text = self.extract_text_from_json(value);
78        self.embed_text(&text)
79    }
80
81    /// Extract text content from JSON
82    fn extract_text_from_json(&self, value: &serde_json::Value) -> String {
83        match value {
84            serde_json::Value::String(s) => s.clone(),
85            serde_json::Value::Object(map) => {
86                let mut text = String::new();
87                for (key, val) in map {
88                    text.push_str(key);
89                    text.push(' ');
90                    text.push_str(&self.extract_text_from_json(val));
91                    text.push(' ');
92                }
93                text
94            }
95            serde_json::Value::Array(arr) => {
96                arr.iter()
97                    .map(|v| self.extract_text_from_json(v))
98                    .collect::<Vec<_>>()
99                    .join(" ")
100            }
101            serde_json::Value::Number(n) => n.to_string(),
102            serde_json::Value::Bool(b) => b.to_string(),
103            serde_json::Value::Null => String::new(),
104        }
105    }
106}
107
108// ============================================================================
109// ONNX Semantic Embedder (Optional Feature)
110// ============================================================================
111
/// ONNX-based semantic embedder for high-quality embeddings
/// Requires the `onnx-embeddings` feature flag
#[cfg(feature = "onnx-embeddings")]
pub struct OnnxEmbedder {
    /// Underlying model; locked for writing whenever an embedding call is made.
    embedder: std::sync::RwLock<ruvector_onnx_embeddings::Embedder>,
}

#[cfg(feature = "onnx-embeddings")]
impl OnnxEmbedder {
    /// Create a new ONNX embedder with the default model (all-MiniLM-L6-v2)
    pub async fn new() -> std::result::Result<Self, Box<dyn std::error::Error + Send + Sync>> {
        let embedder = ruvector_onnx_embeddings::Embedder::default_model().await?;
        Ok(Self {
            embedder: std::sync::RwLock::new(embedder),
        })
    }

    /// Create with a specific pretrained model
    pub async fn with_model(
        model: ruvector_onnx_embeddings::PretrainedModel,
    ) -> std::result::Result<Self, Box<dyn std::error::Error + Send + Sync>> {
        let embedder = ruvector_onnx_embeddings::Embedder::pretrained(model).await?;
        Ok(Self {
            embedder: std::sync::RwLock::new(embedder),
        })
    }

    /// Generate a semantic embedding from text.
    ///
    /// If the model reports an error, a zero vector of the model's own
    /// dimension is returned, so the length is right for non-384 models too.
    ///
    /// # Panics
    /// Panics if the internal lock is poisoned.
    pub fn embed_text(&self, text: &str) -> Vec<f32> {
        let mut embedder = self.embedder.write().unwrap();
        // Read the dimension up front so the fallback matches the loaded model.
        let dim = embedder.dimension();
        embedder
            .embed_one(text)
            .unwrap_or_else(|_| vec![0.0; dim])
    }

    /// Generate embeddings for multiple texts (batch processing).
    ///
    /// The result always has one vector per input text. Missing outputs, or a
    /// failed batch, are replaced by zero vectors of the model's dimension.
    ///
    /// # Panics
    /// Panics if the internal lock is poisoned.
    pub fn embed_batch(&self, texts: &[&str]) -> Vec<Vec<f32>> {
        let mut embedder = self.embedder.write().unwrap();
        let dim = embedder.dimension();
        match embedder.embed(texts) {
            Ok(output) => (0..texts.len())
                // Build the fallback lazily, only for rows that are missing.
                .map(|i| output.get(i).cloned().unwrap_or_else(|| vec![0.0; dim]))
                .collect(),
            Err(_) => vec![vec![0.0; dim]; texts.len()],
        }
    }

    /// Get the embedding dimension (384 for MiniLM, 768 for larger models)
    pub fn dimension(&self) -> usize {
        let embedder = self.embedder.read().unwrap();
        embedder.dimension()
    }

    /// Compute cosine similarity between two texts; returns `0.0` if the model fails.
    pub fn similarity(&self, text1: &str, text2: &str) -> f32 {
        let mut embedder = self.embedder.write().unwrap();
        embedder.similarity(text1, text2).unwrap_or(0.0)
    }

    /// Generate embedding from JSON value by extracting text
    pub fn embed_json(&self, value: &serde_json::Value) -> Vec<f32> {
        let text = extract_text_from_json(value);
        self.embed_text(&text)
    }
}
174
175/// Helper to extract text from JSON (used by both embedders)
176fn extract_text_from_json(value: &serde_json::Value) -> String {
177    match value {
178        serde_json::Value::String(s) => s.clone(),
179        serde_json::Value::Object(map) => {
180            let mut text = String::new();
181            for (key, val) in map {
182                text.push_str(key);
183                text.push(' ');
184                text.push_str(&extract_text_from_json(val));
185                text.push(' ');
186            }
187            text
188        }
189        serde_json::Value::Array(arr) => arr
190            .iter()
191            .map(|v| extract_text_from_json(v))
192            .collect::<Vec<_>>()
193            .join(" "),
194        serde_json::Value::Number(n) => n.to_string(),
195        serde_json::Value::Bool(b) => b.to_string(),
196        serde_json::Value::Null => String::new(),
197    }
198}
199
/// Unified embedder trait for both SimpleEmbedder and OnnxEmbedder.
///
/// The `Send + Sync` supertraits are required of every implementor.
pub trait Embedder: Send + Sync {
    /// Generate an embedding vector from `text`.
    fn embed(&self, text: &str) -> Vec<f32>;
    /// Get the embedding dimension reported by the implementor.
    fn dim(&self) -> usize;
}
207
208impl Embedder for SimpleEmbedder {
209    fn embed(&self, text: &str) -> Vec<f32> {
210        self.embed_text(text)
211    }
212    fn dim(&self) -> usize {
213        self.dimension
214    }
215}
216
/// Adapts the ONNX embedder to the shared [`Embedder`] trait.
#[cfg(feature = "onnx-embeddings")]
impl Embedder for OnnxEmbedder {
    /// Forwards to the inherent `embed_text`.
    fn embed(&self, text: &str) -> Vec<f32> {
        OnnxEmbedder::embed_text(self, text)
    }

    /// Forwards to the inherent `dimension`.
    fn dim(&self) -> usize {
        OnnxEmbedder::dimension(self)
    }
}
226
227// ============================================================================
228// OpenAlex API Client
229// ============================================================================
230
/// OpenAlex API response for works search
#[derive(Debug, Deserialize)]
struct OpenAlexWorksResponse {
    /// Works returned for the query.
    results: Vec<OpenAlexWork>,
    /// Response metadata; required for deserialization, not read in the code shown here.
    meta: OpenAlexMeta,
}
237
238#[derive(Debug, Deserialize)]
239struct OpenAlexWork {
240    id: String,
241    title: Option<String>,
242    #[serde(rename = "display_name")]
243    display_name: Option<String>,
244    publication_date: Option<String>,
245    #[serde(rename = "authorships")]
246    authorships: Option<Vec<OpenAlexAuthorship>>,
247    #[serde(rename = "cited_by_count")]
248    cited_by_count: Option<i64>,
249    #[serde(rename = "concepts")]
250    concepts: Option<Vec<OpenAlexConcept>>,
251    #[serde(rename = "abstract_inverted_index")]
252    abstract_inverted_index: Option<HashMap<String, Vec<i32>>>,
253}
254
/// One authorship entry of a work; only the nested author is deserialized.
#[derive(Debug, Deserialize)]
struct OpenAlexAuthorship {
    /// The author, if the entry carries one.
    author: Option<OpenAlexAuthor>,
}
259
260#[derive(Debug, Deserialize)]
261struct OpenAlexAuthor {
262    id: String,
263    #[serde(rename = "display_name")]
264    display_name: Option<String>,
265}
266
267#[derive(Debug, Deserialize)]
268struct OpenAlexConcept {
269    id: String,
270    #[serde(rename = "display_name")]
271    display_name: Option<String>,
272    score: Option<f64>,
273}
274
/// Metadata object of an OpenAlex list response.
#[derive(Debug, Deserialize)]
struct OpenAlexMeta {
    /// The `count` value from the response metadata; not read in the code shown here.
    count: i64,
}
279
/// OpenAlex topics response
#[derive(Debug, Deserialize)]
struct OpenAlexTopicsResponse {
    /// Topics returned for the search.
    results: Vec<OpenAlexTopic>,
}
285
286#[derive(Debug, Deserialize)]
287struct OpenAlexTopic {
288    id: String,
289    #[serde(rename = "display_name")]
290    display_name: String,
291    description: Option<String>,
292    #[serde(rename = "works_count")]
293    works_count: Option<i64>,
294}
295
296/// Client for OpenAlex academic database
297pub struct OpenAlexClient {
298    client: Client,
299    base_url: String,
300    rate_limit_delay: Duration,
301    embedder: Arc<SimpleEmbedder>,
302    user_email: Option<String>,
303}
304
305impl OpenAlexClient {
306    /// Create a new OpenAlex client
307    ///
308    /// # Arguments
309    /// * `user_email` - Email for polite API usage (optional but recommended)
310    pub fn new(user_email: Option<String>) -> Result<Self> {
311        let client = Client::builder()
312            .timeout(Duration::from_secs(30))
313            .build()
314            .map_err(|e| FrameworkError::Network(e))?;
315
316        Ok(Self {
317            client,
318            base_url: "https://api.openalex.org".to_string(),
319            rate_limit_delay: Duration::from_millis(DEFAULT_RATE_LIMIT_DELAY_MS),
320            embedder: Arc::new(SimpleEmbedder::new(128)),
321            user_email,
322        })
323    }
324
325    /// Fetch academic works by query
326    ///
327    /// # Arguments
328    /// * `query` - Search query (title, abstract, etc.)
329    /// * `limit` - Maximum number of results
330    pub async fn fetch_works(&self, query: &str, limit: usize) -> Result<Vec<DataRecord>> {
331        let mut url = format!("{}/works?search={}", self.base_url, urlencoding::encode(query));
332        url.push_str(&format!("&per-page={}", limit.min(200)));
333
334        if let Some(email) = &self.user_email {
335            url.push_str(&format!("&mailto={}", email));
336        }
337
338        let response = self.fetch_with_retry(&url).await?;
339        let works_response: OpenAlexWorksResponse = response.json().await?;
340
341        let mut records = Vec::new();
342        for work in works_response.results {
343            let record = self.work_to_record(work)?;
344            records.push(record);
345            sleep(self.rate_limit_delay).await;
346        }
347
348        Ok(records)
349    }
350
351    /// Fetch topics by domain
352    pub async fn fetch_topics(&self, domain: &str) -> Result<Vec<DataRecord>> {
353        let mut url = format!(
354            "{}/topics?search={}",
355            self.base_url,
356            urlencoding::encode(domain)
357        );
358        url.push_str("&per-page=50");
359
360        if let Some(email) = &self.user_email {
361            url.push_str(&format!("&mailto={}", email));
362        }
363
364        let response = self.fetch_with_retry(&url).await?;
365        let topics_response: OpenAlexTopicsResponse = response.json().await?;
366
367        let mut records = Vec::new();
368        for topic in topics_response.results {
369            let record = self.topic_to_record(topic)?;
370            records.push(record);
371            sleep(self.rate_limit_delay).await;
372        }
373
374        Ok(records)
375    }
376
377    /// Convert OpenAlex work to DataRecord
378    fn work_to_record(&self, work: OpenAlexWork) -> Result<DataRecord> {
379        let title = work
380            .display_name
381            .or(work.title)
382            .unwrap_or_else(|| "Untitled".to_string());
383
384        // Reconstruct abstract from inverted index
385        let abstract_text = work
386            .abstract_inverted_index
387            .as_ref()
388            .map(|index| self.reconstruct_abstract(index))
389            .unwrap_or_default();
390
391        // Create text for embedding
392        let text = format!("{} {}", title, abstract_text);
393        let embedding = self.embedder.embed_text(&text);
394
395        // Parse publication date
396        let timestamp = work
397            .publication_date
398            .as_ref()
399            .and_then(|d| NaiveDate::parse_from_str(d, "%Y-%m-%d").ok())
400            .map(|d| d.and_hms_opt(0, 0, 0).unwrap().and_utc())
401            .unwrap_or_else(Utc::now);
402
403        // Build relationships
404        let mut relationships = Vec::new();
405
406        // Author relationships
407        if let Some(authorships) = work.authorships {
408            for authorship in authorships {
409                if let Some(author) = authorship.author {
410                    relationships.push(Relationship {
411                        target_id: author.id,
412                        rel_type: "authored_by".to_string(),
413                        weight: 1.0,
414                        properties: {
415                            let mut props = HashMap::new();
416                            if let Some(name) = author.display_name {
417                                props.insert("author_name".to_string(), serde_json::json!(name));
418                            }
419                            props
420                        },
421                    });
422                }
423            }
424        }
425
426        // Concept relationships
427        if let Some(concepts) = work.concepts {
428            for concept in concepts {
429                relationships.push(Relationship {
430                    target_id: concept.id,
431                    rel_type: "has_concept".to_string(),
432                    weight: concept.score.unwrap_or(0.0),
433                    properties: {
434                        let mut props = HashMap::new();
435                        if let Some(name) = concept.display_name {
436                            props.insert("concept_name".to_string(), serde_json::json!(name));
437                        }
438                        props
439                    },
440                });
441            }
442        }
443
444        let mut data_map = serde_json::Map::new();
445        data_map.insert("title".to_string(), serde_json::json!(title));
446        data_map.insert("abstract".to_string(), serde_json::json!(abstract_text));
447        if let Some(citations) = work.cited_by_count {
448            data_map.insert("citations".to_string(), serde_json::json!(citations));
449        }
450
451        Ok(DataRecord {
452            id: work.id,
453            source: "openalex".to_string(),
454            record_type: "work".to_string(),
455            timestamp,
456            data: serde_json::Value::Object(data_map),
457            embedding: Some(embedding),
458            relationships,
459        })
460    }
461
462    /// Reconstruct abstract from inverted index
463    fn reconstruct_abstract(&self, inverted_index: &HashMap<String, Vec<i32>>) -> String {
464        let mut positions: Vec<(i32, String)> = Vec::new();
465        for (word, indices) in inverted_index {
466            for &pos in indices {
467                positions.push((pos, word.clone()));
468            }
469        }
470        positions.sort_by_key(|&(pos, _)| pos);
471        positions
472            .into_iter()
473            .map(|(_, word)| word)
474            .collect::<Vec<_>>()
475            .join(" ")
476    }
477
478    /// Convert topic to DataRecord
479    fn topic_to_record(&self, topic: OpenAlexTopic) -> Result<DataRecord> {
480        let text = format!(
481            "{} {}",
482            topic.display_name,
483            topic.description.as_deref().unwrap_or("")
484        );
485        let embedding = self.embedder.embed_text(&text);
486
487        let mut data_map = serde_json::Map::new();
488        data_map.insert(
489            "display_name".to_string(),
490            serde_json::json!(topic.display_name),
491        );
492        if let Some(desc) = topic.description {
493            data_map.insert("description".to_string(), serde_json::json!(desc));
494        }
495        if let Some(count) = topic.works_count {
496            data_map.insert("works_count".to_string(), serde_json::json!(count));
497        }
498
499        Ok(DataRecord {
500            id: topic.id,
501            source: "openalex".to_string(),
502            record_type: "topic".to_string(),
503            timestamp: Utc::now(),
504            data: serde_json::Value::Object(data_map),
505            embedding: Some(embedding),
506            relationships: Vec::new(),
507        })
508    }
509
510    /// Fetch with retry logic
511    async fn fetch_with_retry(&self, url: &str) -> Result<reqwest::Response> {
512        let mut retries = 0;
513        loop {
514            match self.client.get(url).send().await {
515                Ok(response) => {
516                    if response.status() == StatusCode::TOO_MANY_REQUESTS && retries < MAX_RETRIES
517                    {
518                        retries += 1;
519                        sleep(Duration::from_millis(RETRY_DELAY_MS * retries as u64)).await;
520                        continue;
521                    }
522                    return Ok(response);
523                }
524                Err(_) if retries < MAX_RETRIES => {
525                    retries += 1;
526                    sleep(Duration::from_millis(RETRY_DELAY_MS * retries as u64)).await;
527                }
528                Err(e) => return Err(FrameworkError::Network(e)),
529            }
530        }
531    }
532}
533
534#[async_trait]
535impl DataSource for OpenAlexClient {
536    fn source_id(&self) -> &str {
537        "openalex"
538    }
539
540    async fn fetch_batch(
541        &self,
542        cursor: Option<String>,
543        batch_size: usize,
544    ) -> Result<(Vec<DataRecord>, Option<String>)> {
545        // Default to fetching works about "machine learning"
546        let query = cursor.as_deref().unwrap_or("machine learning");
547        let records = self.fetch_works(query, batch_size).await?;
548        Ok((records, None)) // No pagination cursor in this simple impl
549    }
550
551    async fn total_count(&self) -> Result<Option<u64>> {
552        Ok(None)
553    }
554
555    async fn health_check(&self) -> Result<bool> {
556        let response = self.client.get(&self.base_url).send().await?;
557        Ok(response.status().is_success())
558    }
559}
560
561// ============================================================================
562// NOAA Climate Data Client
563// ============================================================================
564
565/// NOAA NCDC API response
566#[derive(Debug, Deserialize)]
567struct NoaaResponse {
568    results: Vec<NoaaObservation>,
569}
570
/// A single observation row from the NOAA data endpoint.
#[derive(Debug, Deserialize)]
struct NoaaObservation {
    /// Station identifier the observation belongs to.
    station: String,
    /// Observation date as a string; parsed later as `%Y-%m-%dT%H:%M:%S` or `%Y-%m-%d`.
    date: String,
    /// Data type code of the measurement.
    datatype: String,
    /// Measured value.
    value: f64,
    /// Attribute string; empty when the key is absent.
    #[serde(default)]
    attributes: String,
}
580
581/// Client for NOAA climate data
582pub struct NoaaClient {
583    client: Client,
584    base_url: String,
585    api_token: Option<String>,
586    rate_limit_delay: Duration,
587    embedder: Arc<SimpleEmbedder>,
588}
589
590impl NoaaClient {
591    /// Create a new NOAA client
592    ///
593    /// # Arguments
594    /// * `api_token` - NOAA API token (get from https://www.ncdc.noaa.gov/cdo-web/token)
595    pub fn new(api_token: Option<String>) -> Result<Self> {
596        let client = Client::builder()
597            .timeout(Duration::from_secs(30))
598            .build()
599            .map_err(|e| FrameworkError::Network(e))?;
600
601        Ok(Self {
602            client,
603            base_url: "https://www.ncei.noaa.gov/cdo-web/api/v2".to_string(),
604            api_token,
605            rate_limit_delay: Duration::from_millis(200), // NOAA has stricter limits
606            embedder: Arc::new(SimpleEmbedder::new(128)),
607        })
608    }
609
610    /// Fetch climate data for a station
611    ///
612    /// # Arguments
613    /// * `station_id` - GHCND station ID (e.g., "GHCND:USW00094728" for NYC)
614    /// * `start_date` - Start date (YYYY-MM-DD)
615    /// * `end_date` - End date (YYYY-MM-DD)
616    pub async fn fetch_climate_data(
617        &self,
618        station_id: &str,
619        start_date: &str,
620        end_date: &str,
621    ) -> Result<Vec<DataRecord>> {
622        if self.api_token.is_none() {
623            // If no API token, return synthetic data for demo
624            return Ok(self.generate_synthetic_climate_data(station_id, start_date, end_date)?);
625        }
626
627        let url = format!(
628            "{}/data?datasetid=GHCND&stationid={}&startdate={}&enddate={}&limit=1000",
629            self.base_url, station_id, start_date, end_date
630        );
631
632        let mut request = self.client.get(&url);
633        if let Some(token) = &self.api_token {
634            request = request.header("token", token);
635        }
636
637        let response = self.fetch_with_retry(request).await?;
638        let noaa_response: NoaaResponse = response.json().await?;
639
640        let mut records = Vec::new();
641        for observation in noaa_response.results {
642            let record = self.observation_to_record(observation)?;
643            records.push(record);
644        }
645
646        Ok(records)
647    }
648
649    /// Generate synthetic climate data for demo purposes
650    fn generate_synthetic_climate_data(
651        &self,
652        station_id: &str,
653        start_date: &str,
654        _end_date: &str,
655    ) -> Result<Vec<DataRecord>> {
656        let mut records = Vec::new();
657        let datatypes = vec!["TMAX", "TMIN", "PRCP"];
658
659        // Generate a few synthetic observations
660        for (i, datatype) in datatypes.iter().enumerate() {
661            let value = match *datatype {
662                "TMAX" => 250.0 + (i as f64 * 10.0),
663                "TMIN" => 150.0 + (i as f64 * 10.0),
664                "PRCP" => 5.0 + (i as f64),
665                _ => 0.0,
666            };
667
668            let text = format!("{} {} {}", station_id, datatype, value);
669            let embedding = self.embedder.embed_text(&text);
670
671            let mut data_map = serde_json::Map::new();
672            data_map.insert("station".to_string(), serde_json::json!(station_id));
673            data_map.insert("datatype".to_string(), serde_json::json!(datatype));
674            data_map.insert("value".to_string(), serde_json::json!(value));
675            data_map.insert("unit".to_string(), serde_json::json!("tenths"));
676
677            records.push(DataRecord {
678                id: format!("{}_{}_{}_{}", station_id, datatype, start_date, i),
679                source: "noaa".to_string(),
680                record_type: "observation".to_string(),
681                timestamp: Utc::now(),
682                data: serde_json::Value::Object(data_map),
683                embedding: Some(embedding),
684                relationships: Vec::new(),
685            });
686        }
687
688        Ok(records)
689    }
690
691    /// Convert NOAA observation to DataRecord
692    fn observation_to_record(&self, obs: NoaaObservation) -> Result<DataRecord> {
693        let text = format!("{} {} {}", obs.station, obs.datatype, obs.value);
694        let embedding = self.embedder.embed_text(&text);
695
696        // Parse date
697        let timestamp = NaiveDate::parse_from_str(&obs.date, "%Y-%m-%dT%H:%M:%S")
698            .or_else(|_| NaiveDate::parse_from_str(&obs.date, "%Y-%m-%d"))
699            .ok()
700            .and_then(|d| d.and_hms_opt(0, 0, 0))
701            .map(|dt| dt.and_utc())
702            .unwrap_or_else(Utc::now);
703
704        let mut data_map = serde_json::Map::new();
705        data_map.insert("station".to_string(), serde_json::json!(obs.station));
706        data_map.insert("datatype".to_string(), serde_json::json!(obs.datatype));
707        data_map.insert("value".to_string(), serde_json::json!(obs.value));
708        data_map.insert("attributes".to_string(), serde_json::json!(obs.attributes));
709
710        Ok(DataRecord {
711            id: format!("{}_{}", obs.station, obs.date),
712            source: "noaa".to_string(),
713            record_type: "observation".to_string(),
714            timestamp,
715            data: serde_json::Value::Object(data_map),
716            embedding: Some(embedding),
717            relationships: Vec::new(),
718        })
719    }
720
721    /// Fetch with retry logic
722    async fn fetch_with_retry(&self, request: reqwest::RequestBuilder) -> Result<reqwest::Response> {
723        let mut retries = 0;
724        loop {
725            let req = request
726                .try_clone()
727                .ok_or_else(|| FrameworkError::Config("Failed to clone request".to_string()))?;
728
729            match req.send().await {
730                Ok(response) => {
731                    if response.status() == StatusCode::TOO_MANY_REQUESTS && retries < MAX_RETRIES
732                    {
733                        retries += 1;
734                        sleep(Duration::from_millis(RETRY_DELAY_MS * retries as u64)).await;
735                        continue;
736                    }
737                    return Ok(response);
738                }
739                Err(_) if retries < MAX_RETRIES => {
740                    retries += 1;
741                    sleep(Duration::from_millis(RETRY_DELAY_MS * retries as u64)).await;
742                }
743                Err(e) => return Err(FrameworkError::Network(e)),
744            }
745        }
746    }
747}
748
749#[async_trait]
750impl DataSource for NoaaClient {
751    fn source_id(&self) -> &str {
752        "noaa"
753    }
754
755    async fn fetch_batch(
756        &self,
757        _cursor: Option<String>,
758        _batch_size: usize,
759    ) -> Result<(Vec<DataRecord>, Option<String>)> {
760        // Fetch sample climate data
761        let records = self
762            .fetch_climate_data("GHCND:USW00094728", "2024-01-01", "2024-01-31")
763            .await?;
764        Ok((records, None))
765    }
766
767    async fn total_count(&self) -> Result<Option<u64>> {
768        Ok(None)
769    }
770
771    async fn health_check(&self) -> Result<bool> {
772        Ok(true) // NOAA doesn't have a simple health endpoint
773    }
774}
775
776// ============================================================================
777// SEC EDGAR Client
778// ============================================================================
779
/// SEC EDGAR filing metadata (top level of a submissions document).
#[derive(Debug, Deserialize)]
struct EdgarFilingData {
    /// Filings section; defaults to empty when the key is absent.
    #[serde(default)]
    filings: EdgarFilings,
}
786
/// The `filings` object of a submissions document.
#[derive(Debug, Default, Deserialize)]
struct EdgarFilings {
    /// Recent filings in column form; defaults to empty when the key is absent.
    #[serde(default)]
    recent: EdgarRecent,
}
792
/// Recent filings stored as parallel columns: index `i` in every vector is
/// read together as one filing. Each column defaults to empty when absent.
#[derive(Debug, Default, Deserialize)]
struct EdgarRecent {
    /// Accession numbers; the length of this column drives iteration.
    #[serde(rename = "accessionNumber", default)]
    accession_number: Vec<String>,
    /// Filing dates, parsed later as `%Y-%m-%d`.
    #[serde(rename = "filingDate", default)]
    filing_date: Vec<String>,
    /// Report dates.
    #[serde(rename = "reportDate", default)]
    report_date: Vec<String>,
    /// Form types (compared against the optional form filter).
    #[serde(default)]
    form: Vec<String>,
    /// Primary document names.
    #[serde(rename = "primaryDocument", default)]
    primary_document: Vec<String>,
}
806
/// Client for SEC EDGAR filings
pub struct EdgarClient {
    /// HTTP client configured with a 30 second timeout and the user agent.
    client: Client,
    /// API root, without a trailing slash.
    base_url: String,
    /// Pause used for rate limiting.
    rate_limit_delay: Duration,
    /// Embedder used for filing summaries.
    embedder: Arc<SimpleEmbedder>,
    /// User agent string the HTTP client was built with.
    user_agent: String,
}
815
816impl EdgarClient {
817    /// Create a new SEC EDGAR client
818    ///
819    /// # Arguments
820    /// * `user_agent` - User agent string (required by SEC, should include email)
821    pub fn new(user_agent: String) -> Result<Self> {
822        let client = Client::builder()
823            .timeout(Duration::from_secs(30))
824            .user_agent(&user_agent)
825            .build()
826            .map_err(|e| FrameworkError::Network(e))?;
827
828        Ok(Self {
829            client,
830            base_url: "https://data.sec.gov".to_string(),
831            rate_limit_delay: Duration::from_millis(100), // SEC requires 10 requests/second max
832            embedder: Arc::new(SimpleEmbedder::new(128)),
833            user_agent,
834        })
835    }
836
837    /// Fetch company filings by CIK
838    ///
839    /// # Arguments
840    /// * `cik` - Central Index Key (company identifier, e.g., "0000320193" for Apple)
841    /// * `form_type` - Optional form type filter (e.g., "10-K", "10-Q", "8-K")
842    pub async fn fetch_filings(
843        &self,
844        cik: &str,
845        form_type: Option<&str>,
846    ) -> Result<Vec<DataRecord>> {
847        // Pad CIK to 10 digits
848        let padded_cik = format!("{:0>10}", cik);
849
850        let url = format!(
851            "{}/submissions/CIK{}.json",
852            self.base_url, padded_cik
853        );
854
855        let response = self.fetch_with_retry(&url).await?;
856        let filing_data: EdgarFilingData = response.json().await?;
857
858        let mut records = Vec::new();
859        let recent = filing_data.filings.recent;
860
861        let count = recent.accession_number.len();
862        for i in 0..count.min(50) {
863            // Limit to 50 most recent
864            // Filter by form type if specified
865            if let Some(filter_form) = form_type {
866                if i < recent.form.len() && recent.form[i] != filter_form {
867                    continue;
868                }
869            }
870
871            let filing = EdgarFiling {
872                cik: padded_cik.clone(),
873                accession_number: recent.accession_number.get(i).cloned().unwrap_or_default(),
874                filing_date: recent.filing_date.get(i).cloned().unwrap_or_default(),
875                report_date: recent.report_date.get(i).cloned().unwrap_or_default(),
876                form: recent.form.get(i).cloned().unwrap_or_default(),
877                primary_document: recent.primary_document.get(i).cloned().unwrap_or_default(),
878            };
879
880            let record = self.filing_to_record(filing)?;
881            records.push(record);
882            sleep(self.rate_limit_delay).await;
883        }
884
885        Ok(records)
886    }
887
888    /// Convert filing to DataRecord
889    fn filing_to_record(&self, filing: EdgarFiling) -> Result<DataRecord> {
890        let text = format!(
891            "CIK {} Form {} filed on {} report date {}",
892            filing.cik, filing.form, filing.filing_date, filing.report_date
893        );
894        let embedding = self.embedder.embed_text(&text);
895
896        // Parse filing date
897        let timestamp = NaiveDate::parse_from_str(&filing.filing_date, "%Y-%m-%d")
898            .ok()
899            .and_then(|d| d.and_hms_opt(0, 0, 0))
900            .map(|dt| dt.and_utc())
901            .unwrap_or_else(Utc::now);
902
903        let mut data_map = serde_json::Map::new();
904        data_map.insert("cik".to_string(), serde_json::json!(filing.cik));
905        data_map.insert(
906            "accession_number".to_string(),
907            serde_json::json!(filing.accession_number),
908        );
909        data_map.insert(
910            "filing_date".to_string(),
911            serde_json::json!(filing.filing_date),
912        );
913        data_map.insert(
914            "report_date".to_string(),
915            serde_json::json!(filing.report_date),
916        );
917        data_map.insert("form".to_string(), serde_json::json!(filing.form));
918        data_map.insert(
919            "primary_document".to_string(),
920            serde_json::json!(filing.primary_document),
921        );
922
923        // Build filing URL
924        let filing_url = format!(
925            "https://www.sec.gov/cgi-bin/viewer?action=view&cik={}&accession_number={}&xbrl_type=v",
926            filing.cik, filing.accession_number
927        );
928        data_map.insert("filing_url".to_string(), serde_json::json!(filing_url));
929
930        Ok(DataRecord {
931            id: format!("{}_{}", filing.cik, filing.accession_number),
932            source: "edgar".to_string(),
933            record_type: filing.form,
934            timestamp,
935            data: serde_json::Value::Object(data_map),
936            embedding: Some(embedding),
937            relationships: Vec::new(),
938        })
939    }
940
941    /// Fetch with retry logic
942    async fn fetch_with_retry(&self, url: &str) -> Result<reqwest::Response> {
943        let mut retries = 0;
944        loop {
945            match self.client.get(url).send().await {
946                Ok(response) => {
947                    if response.status() == StatusCode::TOO_MANY_REQUESTS && retries < MAX_RETRIES
948                    {
949                        retries += 1;
950                        sleep(Duration::from_millis(RETRY_DELAY_MS * retries as u64)).await;
951                        continue;
952                    }
953                    return Ok(response);
954                }
955                Err(_) if retries < MAX_RETRIES => {
956                    retries += 1;
957                    sleep(Duration::from_millis(RETRY_DELAY_MS * retries as u64)).await;
958                }
959                Err(e) => return Err(FrameworkError::Network(e)),
960            }
961        }
962    }
963}
964
/// Internal, flattened view of one SEC filing.
///
/// Built in `fetch_filings` from the parallel arrays of the recent-filings
/// listing and consumed by `filing_to_record`. Values that are missing in the
/// listing are stored as empty strings.
#[derive(Debug, Clone)]
struct EdgarFiling {
    /// Company identifier (CIK), left-padded with zeros to 10 characters.
    cik: String,
    /// Accession number of the filing; combined with `cik` to form the record id.
    accession_number: String,
    /// Filing date; parsed as `%Y-%m-%d` to derive the record timestamp.
    filing_date: String,
    /// Report date as given in the listing (stored verbatim).
    report_date: String,
    /// Form type (e.g. "10-K", "10-Q", "8-K"); becomes the record type.
    form: String,
    /// Primary document value as given in the listing (stored verbatim).
    primary_document: String,
}
974
975#[async_trait]
976impl DataSource for EdgarClient {
977    fn source_id(&self) -> &str {
978        "edgar"
979    }
980
981    async fn fetch_batch(
982        &self,
983        cursor: Option<String>,
984        _batch_size: usize,
985    ) -> Result<(Vec<DataRecord>, Option<String>)> {
986        // Default to Apple Inc (AAPL)
987        let cik = cursor.as_deref().unwrap_or("320193");
988        let records = self.fetch_filings(cik, None).await?;
989        Ok((records, None))
990    }
991
992    async fn total_count(&self) -> Result<Option<u64>> {
993        Ok(None)
994    }
995
996    async fn health_check(&self) -> Result<bool> {
997        Ok(true)
998    }
999}
1000
1001// ============================================================================
1002// Tests
1003// ============================================================================
1004
#[cfg(test)]
mod tests {
    use super::*;

    /// Text embeddings have the requested dimension and (approximately) unit length.
    #[test]
    fn test_simple_embedder() {
        const DIMENSION: usize = 128;
        let vector =
            SimpleEmbedder::new(DIMENSION).embed_text("machine learning artificial intelligence");

        assert_eq!(vector.len(), DIMENSION);

        // Check normalization
        let squared_norm: f32 = vector.iter().map(|component| component * component).sum();
        assert!((squared_norm.sqrt() - 1.0).abs() < 0.01);
    }

    /// JSON embeddings have the requested dimension.
    #[test]
    fn test_embedder_json() {
        const DIMENSION: usize = 64;
        let document = serde_json::json!({
            "title": "Test Document",
            "content": "Some interesting content here"
        });

        let vector = SimpleEmbedder::new(DIMENSION).embed_json(&document);
        assert_eq!(vector.len(), DIMENSION);
    }

    /// An OpenAlex client can be constructed with a contact e-mail.
    #[tokio::test]
    async fn test_openalex_client_creation() {
        assert!(OpenAlexClient::new(Some(String::from("test@example.com"))).is_ok());
    }

    /// A NOAA client can be constructed without an API token.
    #[tokio::test]
    async fn test_noaa_client_creation() {
        assert!(NoaaClient::new(None).is_ok());
    }

    /// A token-less NOAA client still yields embedded "noaa" records
    /// (presumably synthetic data, per the test name).
    #[tokio::test]
    async fn test_noaa_synthetic_data() {
        let noaa = NoaaClient::new(None).unwrap();
        let fetched = noaa
            .fetch_climate_data("GHCND:TEST", "2024-01-01", "2024-01-31")
            .await
            .unwrap();

        assert!(!fetched.is_empty());

        let first = &fetched[0];
        assert_eq!(first.source, "noaa");
        assert!(first.embedding.is_some());
    }

    /// An EDGAR client can be constructed from a user-agent string.
    #[tokio::test]
    async fn test_edgar_client_creation() {
        assert!(EdgarClient::new(String::from("test-agent test@example.com")).is_ok());
    }
}