ruvector_data_framework/
api_clients.rs

1//! Real API client integrations for OpenAlex, NOAA, and SEC EDGAR
2//!
3//! This module provides async clients for fetching data from public APIs
4//! and converting responses into RuVector's DataRecord format with embeddings.
5
6use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::Duration;
9
10use async_trait::async_trait;
11use chrono::{NaiveDate, Utc};
12use reqwest::{Client, StatusCode};
13use serde::Deserialize;
14use tokio::time::sleep;
15
16use crate::{DataRecord, DataSource, FrameworkError, Relationship, Result};
17
/// Rate limiting configuration
///
/// Default pause, in milliseconds, used as a client's rate-limit delay.
const DEFAULT_RATE_LIMIT_DELAY_MS: u64 = 100;
/// Maximum number of times a request is retried before giving up.
const MAX_RETRIES: u32 = 3;
/// Base retry delay in milliseconds; the retry loops multiply it by the
/// current retry count, so the wait grows linearly (1s, 2s, 3s).
const RETRY_DELAY_MS: u64 = 1000;
22
23// ============================================================================
24// Simple Embedding Generator
25// ============================================================================
26
27/// Simple bag-of-words embedding generator
28pub struct SimpleEmbedder {
29    dimension: usize,
30}
31
32impl SimpleEmbedder {
33    /// Create a new embedder with specified dimension
34    pub fn new(dimension: usize) -> Self {
35        Self { dimension }
36    }
37
38    /// Generate embedding from text using simple bag-of-words
39    pub fn embed_text(&self, text: &str) -> Vec<f32> {
40        let lowercase_text = text.to_lowercase();
41        let words: Vec<&str> = lowercase_text
42            .split_whitespace()
43            .filter(|w| w.len() > 2)
44            .collect();
45
46        let mut embedding = vec![0.0f32; self.dimension];
47
48        // Simple hash-based bag-of-words
49        for word in words {
50            let hash = self.hash_word(word);
51            let idx = (hash % self.dimension as u64) as usize;
52            embedding[idx] += 1.0;
53        }
54
55        // Normalize
56        let magnitude: f32 = embedding.iter().map(|x| x * x).sum::<f32>().sqrt();
57        if magnitude > 0.0 {
58            for val in &mut embedding {
59                *val /= magnitude;
60            }
61        }
62
63        embedding
64    }
65
66    /// Simple hash function for words
67    fn hash_word(&self, word: &str) -> u64 {
68        let mut hash = 5381u64;
69        for byte in word.bytes() {
70            hash = hash.wrapping_mul(33).wrapping_add(byte as u64);
71        }
72        hash
73    }
74
75    /// Generate embedding from JSON value
76    pub fn embed_json(&self, value: &serde_json::Value) -> Vec<f32> {
77        let text = self.extract_text_from_json(value);
78        self.embed_text(&text)
79    }
80
81    /// Extract text content from JSON
82    fn extract_text_from_json(&self, value: &serde_json::Value) -> String {
83        match value {
84            serde_json::Value::String(s) => s.clone(),
85            serde_json::Value::Object(map) => {
86                let mut text = String::new();
87                for (key, val) in map {
88                    text.push_str(key);
89                    text.push(' ');
90                    text.push_str(&self.extract_text_from_json(val));
91                    text.push(' ');
92                }
93                text
94            }
95            serde_json::Value::Array(arr) => {
96                arr.iter()
97                    .map(|v| self.extract_text_from_json(v))
98                    .collect::<Vec<_>>()
99                    .join(" ")
100            }
101            serde_json::Value::Number(n) => n.to_string(),
102            serde_json::Value::Bool(b) => b.to_string(),
103            serde_json::Value::Null => String::new(),
104        }
105    }
106}
107
108// ============================================================================
109// OpenAlex API Client
110// ============================================================================
111
/// OpenAlex API response for works search
///
/// Top-level JSON envelope that `fetch_works` deserializes.
#[derive(Debug, Deserialize)]
struct OpenAlexWorksResponse {
    /// Works returned for the query.
    results: Vec<OpenAlexWork>,
    /// Response metadata; deserialized but not read elsewhere in this file.
    meta: OpenAlexMeta,
}
118
119#[derive(Debug, Deserialize)]
120struct OpenAlexWork {
121    id: String,
122    title: Option<String>,
123    #[serde(rename = "display_name")]
124    display_name: Option<String>,
125    publication_date: Option<String>,
126    #[serde(rename = "authorships")]
127    authorships: Option<Vec<OpenAlexAuthorship>>,
128    #[serde(rename = "cited_by_count")]
129    cited_by_count: Option<i64>,
130    #[serde(rename = "concepts")]
131    concepts: Option<Vec<OpenAlexConcept>>,
132    #[serde(rename = "abstract_inverted_index")]
133    abstract_inverted_index: Option<HashMap<String, Vec<i32>>>,
134}
135
/// One authorship entry of an OpenAlex work.
#[derive(Debug, Deserialize)]
struct OpenAlexAuthorship {
    /// The author of this entry; entries without one are skipped when
    /// relationships are built.
    author: Option<OpenAlexAuthor>,
}
140
141#[derive(Debug, Deserialize)]
142struct OpenAlexAuthor {
143    id: String,
144    #[serde(rename = "display_name")]
145    display_name: Option<String>,
146}
147
148#[derive(Debug, Deserialize)]
149struct OpenAlexConcept {
150    id: String,
151    #[serde(rename = "display_name")]
152    display_name: Option<String>,
153    score: Option<f64>,
154}
155
/// `meta` object of an OpenAlex works response.
#[derive(Debug, Deserialize)]
struct OpenAlexMeta {
    /// Value of the `count` key; presumably the total number of matches —
    /// confirm against the OpenAlex API documentation.
    count: i64,
}
160
/// OpenAlex topics response
///
/// Top-level JSON envelope that `fetch_topics` deserializes.
#[derive(Debug, Deserialize)]
struct OpenAlexTopicsResponse {
    /// Topics returned for the query.
    results: Vec<OpenAlexTopic>,
}
166
167#[derive(Debug, Deserialize)]
168struct OpenAlexTopic {
169    id: String,
170    #[serde(rename = "display_name")]
171    display_name: String,
172    description: Option<String>,
173    #[serde(rename = "works_count")]
174    works_count: Option<i64>,
175}
176
/// Client for OpenAlex academic database
pub struct OpenAlexClient {
    /// Underlying HTTP client (built with a 30 second timeout).
    client: Client,
    /// Root URL of the API, without a trailing slash.
    base_url: String,
    /// Pause applied by the fetch methods for client-side rate limiting.
    rate_limit_delay: Duration,
    /// Embedder used to vectorize record text.
    embedder: Arc<SimpleEmbedder>,
    /// Optional contact email sent as the `mailto` query parameter.
    user_email: Option<String>,
}
185
186impl OpenAlexClient {
187    /// Create a new OpenAlex client
188    ///
189    /// # Arguments
190    /// * `user_email` - Email for polite API usage (optional but recommended)
191    pub fn new(user_email: Option<String>) -> Result<Self> {
192        let client = Client::builder()
193            .timeout(Duration::from_secs(30))
194            .build()
195            .map_err(|e| FrameworkError::Network(e))?;
196
197        Ok(Self {
198            client,
199            base_url: "https://api.openalex.org".to_string(),
200            rate_limit_delay: Duration::from_millis(DEFAULT_RATE_LIMIT_DELAY_MS),
201            embedder: Arc::new(SimpleEmbedder::new(128)),
202            user_email,
203        })
204    }
205
206    /// Fetch academic works by query
207    ///
208    /// # Arguments
209    /// * `query` - Search query (title, abstract, etc.)
210    /// * `limit` - Maximum number of results
211    pub async fn fetch_works(&self, query: &str, limit: usize) -> Result<Vec<DataRecord>> {
212        let mut url = format!("{}/works?search={}", self.base_url, urlencoding::encode(query));
213        url.push_str(&format!("&per-page={}", limit.min(200)));
214
215        if let Some(email) = &self.user_email {
216            url.push_str(&format!("&mailto={}", email));
217        }
218
219        let response = self.fetch_with_retry(&url).await?;
220        let works_response: OpenAlexWorksResponse = response.json().await?;
221
222        let mut records = Vec::new();
223        for work in works_response.results {
224            let record = self.work_to_record(work)?;
225            records.push(record);
226            sleep(self.rate_limit_delay).await;
227        }
228
229        Ok(records)
230    }
231
232    /// Fetch topics by domain
233    pub async fn fetch_topics(&self, domain: &str) -> Result<Vec<DataRecord>> {
234        let mut url = format!(
235            "{}/topics?search={}",
236            self.base_url,
237            urlencoding::encode(domain)
238        );
239        url.push_str("&per-page=50");
240
241        if let Some(email) = &self.user_email {
242            url.push_str(&format!("&mailto={}", email));
243        }
244
245        let response = self.fetch_with_retry(&url).await?;
246        let topics_response: OpenAlexTopicsResponse = response.json().await?;
247
248        let mut records = Vec::new();
249        for topic in topics_response.results {
250            let record = self.topic_to_record(topic)?;
251            records.push(record);
252            sleep(self.rate_limit_delay).await;
253        }
254
255        Ok(records)
256    }
257
258    /// Convert OpenAlex work to DataRecord
259    fn work_to_record(&self, work: OpenAlexWork) -> Result<DataRecord> {
260        let title = work
261            .display_name
262            .or(work.title)
263            .unwrap_or_else(|| "Untitled".to_string());
264
265        // Reconstruct abstract from inverted index
266        let abstract_text = work
267            .abstract_inverted_index
268            .as_ref()
269            .map(|index| self.reconstruct_abstract(index))
270            .unwrap_or_default();
271
272        // Create text for embedding
273        let text = format!("{} {}", title, abstract_text);
274        let embedding = self.embedder.embed_text(&text);
275
276        // Parse publication date
277        let timestamp = work
278            .publication_date
279            .as_ref()
280            .and_then(|d| NaiveDate::parse_from_str(d, "%Y-%m-%d").ok())
281            .map(|d| d.and_hms_opt(0, 0, 0).unwrap().and_utc())
282            .unwrap_or_else(Utc::now);
283
284        // Build relationships
285        let mut relationships = Vec::new();
286
287        // Author relationships
288        if let Some(authorships) = work.authorships {
289            for authorship in authorships {
290                if let Some(author) = authorship.author {
291                    relationships.push(Relationship {
292                        target_id: author.id,
293                        rel_type: "authored_by".to_string(),
294                        weight: 1.0,
295                        properties: {
296                            let mut props = HashMap::new();
297                            if let Some(name) = author.display_name {
298                                props.insert("author_name".to_string(), serde_json::json!(name));
299                            }
300                            props
301                        },
302                    });
303                }
304            }
305        }
306
307        // Concept relationships
308        if let Some(concepts) = work.concepts {
309            for concept in concepts {
310                relationships.push(Relationship {
311                    target_id: concept.id,
312                    rel_type: "has_concept".to_string(),
313                    weight: concept.score.unwrap_or(0.0),
314                    properties: {
315                        let mut props = HashMap::new();
316                        if let Some(name) = concept.display_name {
317                            props.insert("concept_name".to_string(), serde_json::json!(name));
318                        }
319                        props
320                    },
321                });
322            }
323        }
324
325        let mut data_map = serde_json::Map::new();
326        data_map.insert("title".to_string(), serde_json::json!(title));
327        data_map.insert("abstract".to_string(), serde_json::json!(abstract_text));
328        if let Some(citations) = work.cited_by_count {
329            data_map.insert("citations".to_string(), serde_json::json!(citations));
330        }
331
332        Ok(DataRecord {
333            id: work.id,
334            source: "openalex".to_string(),
335            record_type: "work".to_string(),
336            timestamp,
337            data: serde_json::Value::Object(data_map),
338            embedding: Some(embedding),
339            relationships,
340        })
341    }
342
343    /// Reconstruct abstract from inverted index
344    fn reconstruct_abstract(&self, inverted_index: &HashMap<String, Vec<i32>>) -> String {
345        let mut positions: Vec<(i32, String)> = Vec::new();
346        for (word, indices) in inverted_index {
347            for &pos in indices {
348                positions.push((pos, word.clone()));
349            }
350        }
351        positions.sort_by_key(|&(pos, _)| pos);
352        positions
353            .into_iter()
354            .map(|(_, word)| word)
355            .collect::<Vec<_>>()
356            .join(" ")
357    }
358
359    /// Convert topic to DataRecord
360    fn topic_to_record(&self, topic: OpenAlexTopic) -> Result<DataRecord> {
361        let text = format!(
362            "{} {}",
363            topic.display_name,
364            topic.description.as_deref().unwrap_or("")
365        );
366        let embedding = self.embedder.embed_text(&text);
367
368        let mut data_map = serde_json::Map::new();
369        data_map.insert(
370            "display_name".to_string(),
371            serde_json::json!(topic.display_name),
372        );
373        if let Some(desc) = topic.description {
374            data_map.insert("description".to_string(), serde_json::json!(desc));
375        }
376        if let Some(count) = topic.works_count {
377            data_map.insert("works_count".to_string(), serde_json::json!(count));
378        }
379
380        Ok(DataRecord {
381            id: topic.id,
382            source: "openalex".to_string(),
383            record_type: "topic".to_string(),
384            timestamp: Utc::now(),
385            data: serde_json::Value::Object(data_map),
386            embedding: Some(embedding),
387            relationships: Vec::new(),
388        })
389    }
390
391    /// Fetch with retry logic
392    async fn fetch_with_retry(&self, url: &str) -> Result<reqwest::Response> {
393        let mut retries = 0;
394        loop {
395            match self.client.get(url).send().await {
396                Ok(response) => {
397                    if response.status() == StatusCode::TOO_MANY_REQUESTS && retries < MAX_RETRIES
398                    {
399                        retries += 1;
400                        sleep(Duration::from_millis(RETRY_DELAY_MS * retries as u64)).await;
401                        continue;
402                    }
403                    return Ok(response);
404                }
405                Err(_) if retries < MAX_RETRIES => {
406                    retries += 1;
407                    sleep(Duration::from_millis(RETRY_DELAY_MS * retries as u64)).await;
408                }
409                Err(e) => return Err(FrameworkError::Network(e)),
410            }
411        }
412    }
413}
414
415#[async_trait]
416impl DataSource for OpenAlexClient {
417    fn source_id(&self) -> &str {
418        "openalex"
419    }
420
421    async fn fetch_batch(
422        &self,
423        cursor: Option<String>,
424        batch_size: usize,
425    ) -> Result<(Vec<DataRecord>, Option<String>)> {
426        // Default to fetching works about "machine learning"
427        let query = cursor.as_deref().unwrap_or("machine learning");
428        let records = self.fetch_works(query, batch_size).await?;
429        Ok((records, None)) // No pagination cursor in this simple impl
430    }
431
432    async fn total_count(&self) -> Result<Option<u64>> {
433        Ok(None)
434    }
435
436    async fn health_check(&self) -> Result<bool> {
437        let response = self.client.get(&self.base_url).send().await?;
438        Ok(response.status().is_success())
439    }
440}
441
442// ============================================================================
443// NOAA Climate Data Client
444// ============================================================================
445
446/// NOAA NCDC API response
447#[derive(Debug, Deserialize)]
448struct NoaaResponse {
449    results: Vec<NoaaObservation>,
450}
451
/// A single observation row from a NOAA data response.
#[derive(Debug, Deserialize)]
struct NoaaObservation {
    /// Identifier of the reporting station.
    station: String,
    /// Observation date string; parsed downstream with `%Y-%m-%dT%H:%M:%S`
    /// and, failing that, `%Y-%m-%d`.
    date: String,
    /// Data type code of the measurement (the synthetic generator in this
    /// file uses "TMAX", "TMIN" and "PRCP").
    datatype: String,
    /// Measured value.
    value: f64,
    /// Raw attribute string; empty when the key is absent.
    #[serde(default)]
    attributes: String,
}
461
/// Client for NOAA climate data
pub struct NoaaClient {
    /// Underlying HTTP client (built with a 30 second timeout).
    client: Client,
    /// Root URL of the API, without a trailing slash.
    base_url: String,
    /// API token sent in the `token` header; when `None`, synthetic data is
    /// returned instead of calling the API.
    api_token: Option<String>,
    /// Configured rate-limit pause (set to 200 ms by `new`).
    rate_limit_delay: Duration,
    /// Embedder used to vectorize record text.
    embedder: Arc<SimpleEmbedder>,
}
470
471impl NoaaClient {
472    /// Create a new NOAA client
473    ///
474    /// # Arguments
475    /// * `api_token` - NOAA API token (get from https://www.ncdc.noaa.gov/cdo-web/token)
476    pub fn new(api_token: Option<String>) -> Result<Self> {
477        let client = Client::builder()
478            .timeout(Duration::from_secs(30))
479            .build()
480            .map_err(|e| FrameworkError::Network(e))?;
481
482        Ok(Self {
483            client,
484            base_url: "https://www.ncei.noaa.gov/cdo-web/api/v2".to_string(),
485            api_token,
486            rate_limit_delay: Duration::from_millis(200), // NOAA has stricter limits
487            embedder: Arc::new(SimpleEmbedder::new(128)),
488        })
489    }
490
491    /// Fetch climate data for a station
492    ///
493    /// # Arguments
494    /// * `station_id` - GHCND station ID (e.g., "GHCND:USW00094728" for NYC)
495    /// * `start_date` - Start date (YYYY-MM-DD)
496    /// * `end_date` - End date (YYYY-MM-DD)
497    pub async fn fetch_climate_data(
498        &self,
499        station_id: &str,
500        start_date: &str,
501        end_date: &str,
502    ) -> Result<Vec<DataRecord>> {
503        if self.api_token.is_none() {
504            // If no API token, return synthetic data for demo
505            return Ok(self.generate_synthetic_climate_data(station_id, start_date, end_date)?);
506        }
507
508        let url = format!(
509            "{}/data?datasetid=GHCND&stationid={}&startdate={}&enddate={}&limit=1000",
510            self.base_url, station_id, start_date, end_date
511        );
512
513        let mut request = self.client.get(&url);
514        if let Some(token) = &self.api_token {
515            request = request.header("token", token);
516        }
517
518        let response = self.fetch_with_retry(request).await?;
519        let noaa_response: NoaaResponse = response.json().await?;
520
521        let mut records = Vec::new();
522        for observation in noaa_response.results {
523            let record = self.observation_to_record(observation)?;
524            records.push(record);
525        }
526
527        Ok(records)
528    }
529
530    /// Generate synthetic climate data for demo purposes
531    fn generate_synthetic_climate_data(
532        &self,
533        station_id: &str,
534        start_date: &str,
535        _end_date: &str,
536    ) -> Result<Vec<DataRecord>> {
537        let mut records = Vec::new();
538        let datatypes = vec!["TMAX", "TMIN", "PRCP"];
539
540        // Generate a few synthetic observations
541        for (i, datatype) in datatypes.iter().enumerate() {
542            let value = match *datatype {
543                "TMAX" => 250.0 + (i as f64 * 10.0),
544                "TMIN" => 150.0 + (i as f64 * 10.0),
545                "PRCP" => 5.0 + (i as f64),
546                _ => 0.0,
547            };
548
549            let text = format!("{} {} {}", station_id, datatype, value);
550            let embedding = self.embedder.embed_text(&text);
551
552            let mut data_map = serde_json::Map::new();
553            data_map.insert("station".to_string(), serde_json::json!(station_id));
554            data_map.insert("datatype".to_string(), serde_json::json!(datatype));
555            data_map.insert("value".to_string(), serde_json::json!(value));
556            data_map.insert("unit".to_string(), serde_json::json!("tenths"));
557
558            records.push(DataRecord {
559                id: format!("{}_{}_{}_{}", station_id, datatype, start_date, i),
560                source: "noaa".to_string(),
561                record_type: "observation".to_string(),
562                timestamp: Utc::now(),
563                data: serde_json::Value::Object(data_map),
564                embedding: Some(embedding),
565                relationships: Vec::new(),
566            });
567        }
568
569        Ok(records)
570    }
571
572    /// Convert NOAA observation to DataRecord
573    fn observation_to_record(&self, obs: NoaaObservation) -> Result<DataRecord> {
574        let text = format!("{} {} {}", obs.station, obs.datatype, obs.value);
575        let embedding = self.embedder.embed_text(&text);
576
577        // Parse date
578        let timestamp = NaiveDate::parse_from_str(&obs.date, "%Y-%m-%dT%H:%M:%S")
579            .or_else(|_| NaiveDate::parse_from_str(&obs.date, "%Y-%m-%d"))
580            .ok()
581            .and_then(|d| d.and_hms_opt(0, 0, 0))
582            .map(|dt| dt.and_utc())
583            .unwrap_or_else(Utc::now);
584
585        let mut data_map = serde_json::Map::new();
586        data_map.insert("station".to_string(), serde_json::json!(obs.station));
587        data_map.insert("datatype".to_string(), serde_json::json!(obs.datatype));
588        data_map.insert("value".to_string(), serde_json::json!(obs.value));
589        data_map.insert("attributes".to_string(), serde_json::json!(obs.attributes));
590
591        Ok(DataRecord {
592            id: format!("{}_{}", obs.station, obs.date),
593            source: "noaa".to_string(),
594            record_type: "observation".to_string(),
595            timestamp,
596            data: serde_json::Value::Object(data_map),
597            embedding: Some(embedding),
598            relationships: Vec::new(),
599        })
600    }
601
602    /// Fetch with retry logic
603    async fn fetch_with_retry(&self, request: reqwest::RequestBuilder) -> Result<reqwest::Response> {
604        let mut retries = 0;
605        loop {
606            let req = request
607                .try_clone()
608                .ok_or_else(|| FrameworkError::Config("Failed to clone request".to_string()))?;
609
610            match req.send().await {
611                Ok(response) => {
612                    if response.status() == StatusCode::TOO_MANY_REQUESTS && retries < MAX_RETRIES
613                    {
614                        retries += 1;
615                        sleep(Duration::from_millis(RETRY_DELAY_MS * retries as u64)).await;
616                        continue;
617                    }
618                    return Ok(response);
619                }
620                Err(_) if retries < MAX_RETRIES => {
621                    retries += 1;
622                    sleep(Duration::from_millis(RETRY_DELAY_MS * retries as u64)).await;
623                }
624                Err(e) => return Err(FrameworkError::Network(e)),
625            }
626        }
627    }
628}
629
630#[async_trait]
631impl DataSource for NoaaClient {
632    fn source_id(&self) -> &str {
633        "noaa"
634    }
635
636    async fn fetch_batch(
637        &self,
638        _cursor: Option<String>,
639        _batch_size: usize,
640    ) -> Result<(Vec<DataRecord>, Option<String>)> {
641        // Fetch sample climate data
642        let records = self
643            .fetch_climate_data("GHCND:USW00094728", "2024-01-01", "2024-01-31")
644            .await?;
645        Ok((records, None))
646    }
647
648    async fn total_count(&self) -> Result<Option<u64>> {
649        Ok(None)
650    }
651
652    async fn health_check(&self) -> Result<bool> {
653        Ok(true) // NOAA doesn't have a simple health endpoint
654    }
655}
656
657// ============================================================================
658// SEC EDGAR Client
659// ============================================================================
660
/// SEC EDGAR filing metadata
///
/// Top-level JSON document that `fetch_filings` deserializes.
#[derive(Debug, Deserialize)]
struct EdgarFilingData {
    /// Filing listings; defaults to an empty listing when the key is absent.
    #[serde(default)]
    filings: EdgarFilings,
}
667
/// `filings` object of an EDGAR submissions document.
#[derive(Debug, Default, Deserialize)]
struct EdgarFilings {
    /// Recent filings in column form; defaults to empty columns when absent.
    #[serde(default)]
    recent: EdgarRecent,
}
673
674#[derive(Debug, Default, Deserialize)]
675struct EdgarRecent {
676    #[serde(rename = "accessionNumber", default)]
677    accession_number: Vec<String>,
678    #[serde(rename = "filingDate", default)]
679    filing_date: Vec<String>,
680    #[serde(rename = "reportDate", default)]
681    report_date: Vec<String>,
682    #[serde(default)]
683    form: Vec<String>,
684    #[serde(rename = "primaryDocument", default)]
685    primary_document: Vec<String>,
686}
687
/// Client for SEC EDGAR filings
pub struct EdgarClient {
    /// Underlying HTTP client (30 second timeout, custom user agent).
    client: Client,
    /// Root URL of the data API, without a trailing slash.
    base_url: String,
    /// Pause applied by `fetch_filings` for client-side rate limiting.
    rate_limit_delay: Duration,
    /// Embedder used to vectorize record text.
    embedder: Arc<SimpleEmbedder>,
    /// User agent string the HTTP client was configured with.
    user_agent: String,
}
696
697impl EdgarClient {
698    /// Create a new SEC EDGAR client
699    ///
700    /// # Arguments
701    /// * `user_agent` - User agent string (required by SEC, should include email)
702    pub fn new(user_agent: String) -> Result<Self> {
703        let client = Client::builder()
704            .timeout(Duration::from_secs(30))
705            .user_agent(&user_agent)
706            .build()
707            .map_err(|e| FrameworkError::Network(e))?;
708
709        Ok(Self {
710            client,
711            base_url: "https://data.sec.gov".to_string(),
712            rate_limit_delay: Duration::from_millis(100), // SEC requires 10 requests/second max
713            embedder: Arc::new(SimpleEmbedder::new(128)),
714            user_agent,
715        })
716    }
717
718    /// Fetch company filings by CIK
719    ///
720    /// # Arguments
721    /// * `cik` - Central Index Key (company identifier, e.g., "0000320193" for Apple)
722    /// * `form_type` - Optional form type filter (e.g., "10-K", "10-Q", "8-K")
723    pub async fn fetch_filings(
724        &self,
725        cik: &str,
726        form_type: Option<&str>,
727    ) -> Result<Vec<DataRecord>> {
728        // Pad CIK to 10 digits
729        let padded_cik = format!("{:0>10}", cik);
730
731        let url = format!(
732            "{}/submissions/CIK{}.json",
733            self.base_url, padded_cik
734        );
735
736        let response = self.fetch_with_retry(&url).await?;
737        let filing_data: EdgarFilingData = response.json().await?;
738
739        let mut records = Vec::new();
740        let recent = filing_data.filings.recent;
741
742        let count = recent.accession_number.len();
743        for i in 0..count.min(50) {
744            // Limit to 50 most recent
745            // Filter by form type if specified
746            if let Some(filter_form) = form_type {
747                if i < recent.form.len() && recent.form[i] != filter_form {
748                    continue;
749                }
750            }
751
752            let filing = EdgarFiling {
753                cik: padded_cik.clone(),
754                accession_number: recent.accession_number.get(i).cloned().unwrap_or_default(),
755                filing_date: recent.filing_date.get(i).cloned().unwrap_or_default(),
756                report_date: recent.report_date.get(i).cloned().unwrap_or_default(),
757                form: recent.form.get(i).cloned().unwrap_or_default(),
758                primary_document: recent.primary_document.get(i).cloned().unwrap_or_default(),
759            };
760
761            let record = self.filing_to_record(filing)?;
762            records.push(record);
763            sleep(self.rate_limit_delay).await;
764        }
765
766        Ok(records)
767    }
768
769    /// Convert filing to DataRecord
770    fn filing_to_record(&self, filing: EdgarFiling) -> Result<DataRecord> {
771        let text = format!(
772            "CIK {} Form {} filed on {} report date {}",
773            filing.cik, filing.form, filing.filing_date, filing.report_date
774        );
775        let embedding = self.embedder.embed_text(&text);
776
777        // Parse filing date
778        let timestamp = NaiveDate::parse_from_str(&filing.filing_date, "%Y-%m-%d")
779            .ok()
780            .and_then(|d| d.and_hms_opt(0, 0, 0))
781            .map(|dt| dt.and_utc())
782            .unwrap_or_else(Utc::now);
783
784        let mut data_map = serde_json::Map::new();
785        data_map.insert("cik".to_string(), serde_json::json!(filing.cik));
786        data_map.insert(
787            "accession_number".to_string(),
788            serde_json::json!(filing.accession_number),
789        );
790        data_map.insert(
791            "filing_date".to_string(),
792            serde_json::json!(filing.filing_date),
793        );
794        data_map.insert(
795            "report_date".to_string(),
796            serde_json::json!(filing.report_date),
797        );
798        data_map.insert("form".to_string(), serde_json::json!(filing.form));
799        data_map.insert(
800            "primary_document".to_string(),
801            serde_json::json!(filing.primary_document),
802        );
803
804        // Build filing URL
805        let filing_url = format!(
806            "https://www.sec.gov/cgi-bin/viewer?action=view&cik={}&accession_number={}&xbrl_type=v",
807            filing.cik, filing.accession_number
808        );
809        data_map.insert("filing_url".to_string(), serde_json::json!(filing_url));
810
811        Ok(DataRecord {
812            id: format!("{}_{}", filing.cik, filing.accession_number),
813            source: "edgar".to_string(),
814            record_type: filing.form,
815            timestamp,
816            data: serde_json::Value::Object(data_map),
817            embedding: Some(embedding),
818            relationships: Vec::new(),
819        })
820    }
821
822    /// Fetch with retry logic
823    async fn fetch_with_retry(&self, url: &str) -> Result<reqwest::Response> {
824        let mut retries = 0;
825        loop {
826            match self.client.get(url).send().await {
827                Ok(response) => {
828                    if response.status() == StatusCode::TOO_MANY_REQUESTS && retries < MAX_RETRIES
829                    {
830                        retries += 1;
831                        sleep(Duration::from_millis(RETRY_DELAY_MS * retries as u64)).await;
832                        continue;
833                    }
834                    return Ok(response);
835                }
836                Err(_) if retries < MAX_RETRIES => {
837                    retries += 1;
838                    sleep(Duration::from_millis(RETRY_DELAY_MS * retries as u64)).await;
839                }
840                Err(e) => return Err(FrameworkError::Network(e)),
841            }
842        }
843    }
844}
845
/// Internal structure for SEC filing
///
/// Row-oriented view of one filing, assembled from the column-oriented
/// `EdgarRecent` data.
struct EdgarFiling {
    /// Company CIK, zero-padded to 10 characters by `fetch_filings`.
    cik: String,
    /// Accession number of the filing.
    accession_number: String,
    /// Filing date; parsed as `%Y-%m-%d` for the record timestamp.
    filing_date: String,
    /// Report date of the filing.
    report_date: String,
    /// Form type; becomes the record type.
    form: String,
    /// Primary document name of the filing.
    primary_document: String,
}
855
856#[async_trait]
857impl DataSource for EdgarClient {
858    fn source_id(&self) -> &str {
859        "edgar"
860    }
861
862    async fn fetch_batch(
863        &self,
864        cursor: Option<String>,
865        _batch_size: usize,
866    ) -> Result<(Vec<DataRecord>, Option<String>)> {
867        // Default to Apple Inc (AAPL)
868        let cik = cursor.as_deref().unwrap_or("320193");
869        let records = self.fetch_filings(cik, None).await?;
870        Ok((records, None))
871    }
872
873    async fn total_count(&self) -> Result<Option<u64>> {
874        Ok(None)
875    }
876
877    async fn health_check(&self) -> Result<bool> {
878        Ok(true)
879    }
880}
881
882// ============================================================================
883// Tests
884// ============================================================================
885
#[cfg(test)]
mod tests {
    use super::*;

    /// Embeddings have the requested length and unit magnitude.
    #[test]
    fn test_simple_embedder() {
        let vector = SimpleEmbedder::new(128).embed_text("machine learning artificial intelligence");

        assert_eq!(vector.len(), 128);

        // Check normalization
        let squared_sum: f32 = vector.iter().map(|x| x * x).sum();
        let magnitude = squared_sum.sqrt();
        assert!((magnitude - 1.0).abs() < 0.01);
    }

    /// JSON documents embed to a vector of the requested length.
    #[test]
    fn test_embedder_json() {
        let document = serde_json::json!({
            "title": "Test Document",
            "content": "Some interesting content here"
        });

        let vector = SimpleEmbedder::new(64).embed_json(&document);
        assert_eq!(vector.len(), 64);
    }

    /// The OpenAlex client can be constructed with a contact email.
    #[tokio::test]
    async fn test_openalex_client_creation() {
        let email = Some("test@example.com".to_string());
        assert!(OpenAlexClient::new(email).is_ok());
    }

    /// The NOAA client can be constructed without a token.
    #[tokio::test]
    async fn test_noaa_client_creation() {
        assert!(NoaaClient::new(None).is_ok());
    }

    /// Without a token, the NOAA client serves synthetic records.
    #[tokio::test]
    async fn test_noaa_synthetic_data() {
        let noaa = NoaaClient::new(None).unwrap();
        let records = noaa
            .fetch_climate_data("GHCND:TEST", "2024-01-01", "2024-01-31")
            .await
            .unwrap();

        assert!(!records.is_empty());
        let first = &records[0];
        assert_eq!(first.source, "noaa");
        assert!(first.embedding.is_some());
    }

    /// The EDGAR client can be constructed with a user agent.
    #[tokio::test]
    async fn test_edgar_client_creation() {
        let agent = "test-agent test@example.com".to_string();
        assert!(EdgarClient::new(agent).is_ok());
    }
}