1use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::Duration;
9
10use async_trait::async_trait;
11use chrono::{NaiveDate, Utc};
12use reqwest::{Client, StatusCode};
13use serde::Deserialize;
14use tokio::time::sleep;
15
16use crate::{DataRecord, DataSource, FrameworkError, Relationship, Result};
17
/// Default rate-limit pause, in milliseconds, used by the OpenAlex client.
const DEFAULT_RATE_LIMIT_DELAY_MS: u64 = 100;

/// Maximum number of retries (after the first attempt) in the `fetch_with_retry` helpers.
const MAX_RETRIES: u32 = 3;

/// Base back-off in milliseconds; retry number `n` waits `n * RETRY_DELAY_MS`.
const RETRY_DELAY_MS: u64 = 1_000;
22
23pub struct SimpleEmbedder {
29 dimension: usize,
30}
31
32impl SimpleEmbedder {
33 pub fn new(dimension: usize) -> Self {
35 Self { dimension }
36 }
37
38 pub fn embed_text(&self, text: &str) -> Vec<f32> {
40 let lowercase_text = text.to_lowercase();
41 let words: Vec<&str> = lowercase_text
42 .split_whitespace()
43 .filter(|w| w.len() > 2)
44 .collect();
45
46 let mut embedding = vec![0.0f32; self.dimension];
47
48 for word in words {
50 let hash = self.hash_word(word);
51 let idx = (hash % self.dimension as u64) as usize;
52 embedding[idx] += 1.0;
53 }
54
55 let magnitude: f32 = embedding.iter().map(|x| x * x).sum::<f32>().sqrt();
57 if magnitude > 0.0 {
58 for val in &mut embedding {
59 *val /= magnitude;
60 }
61 }
62
63 embedding
64 }
65
66 fn hash_word(&self, word: &str) -> u64 {
68 let mut hash = 5381u64;
69 for byte in word.bytes() {
70 hash = hash.wrapping_mul(33).wrapping_add(byte as u64);
71 }
72 hash
73 }
74
75 pub fn embed_json(&self, value: &serde_json::Value) -> Vec<f32> {
77 let text = self.extract_text_from_json(value);
78 self.embed_text(&text)
79 }
80
81 fn extract_text_from_json(&self, value: &serde_json::Value) -> String {
83 match value {
84 serde_json::Value::String(s) => s.clone(),
85 serde_json::Value::Object(map) => {
86 let mut text = String::new();
87 for (key, val) in map {
88 text.push_str(key);
89 text.push(' ');
90 text.push_str(&self.extract_text_from_json(val));
91 text.push(' ');
92 }
93 text
94 }
95 serde_json::Value::Array(arr) => {
96 arr.iter()
97 .map(|v| self.extract_text_from_json(v))
98 .collect::<Vec<_>>()
99 .join(" ")
100 }
101 serde_json::Value::Number(n) => n.to_string(),
102 serde_json::Value::Bool(b) => b.to_string(),
103 serde_json::Value::Null => String::new(),
104 }
105 }
106}
107
#[cfg(feature = "onnx-embeddings")]
/// Transformer-based text embedder backed by `ruvector_onnx_embeddings`.
///
/// Inference failures never propagate to callers: they degrade to all-zero
/// vectors whose length matches the loaded model's dimension.
pub struct OnnxEmbedder {
    // Embedding calls go through a mutable (write) guard; only `dimension`
    // uses a shared read guard.
    embedder: std::sync::RwLock<ruvector_onnx_embeddings::Embedder>,
}

#[cfg(feature = "onnx-embeddings")]
impl OnnxEmbedder {
    /// Loads the library's default pretrained model.
    ///
    /// # Errors
    /// Propagates any error raised while loading the model.
    pub async fn new() -> std::result::Result<Self, Box<dyn std::error::Error + Send + Sync>> {
        let embedder = ruvector_onnx_embeddings::Embedder::default_model().await?;
        Ok(Self {
            embedder: std::sync::RwLock::new(embedder),
        })
    }

    /// Loads the given pretrained `model`.
    ///
    /// # Errors
    /// Propagates any error raised while loading the model.
    pub async fn with_model(
        model: ruvector_onnx_embeddings::PretrainedModel,
    ) -> std::result::Result<Self, Box<dyn std::error::Error + Send + Sync>> {
        let embedder = ruvector_onnx_embeddings::Embedder::pretrained(model).await?;
        Ok(Self {
            embedder: std::sync::RwLock::new(embedder),
        })
    }

    /// Embeds a single text; returns a zero vector of the model's dimension on failure.
    ///
    /// # Panics
    /// Panics if the internal lock has been poisoned.
    pub fn embed_text(&self, text: &str) -> Vec<f32> {
        let mut embedder = self.embedder.write().unwrap();
        // Size the fallback from the model rather than assuming 384, so models
        // loaded via `with_model` always yield vectors of a consistent length.
        let dim = embedder.dimension();
        embedder.embed_one(text).unwrap_or_else(|_| vec![0.0; dim])
    }

    /// Embeds `texts`, returning exactly one vector per input.
    ///
    /// Missing outputs, or a failed batch, are replaced by zero vectors of the
    /// model's dimension.
    ///
    /// # Panics
    /// Panics if the internal lock has been poisoned.
    pub fn embed_batch(&self, texts: &[&str]) -> Vec<Vec<f32>> {
        let mut embedder = self.embedder.write().unwrap();
        let dim = embedder.dimension();
        match embedder.embed(texts) {
            Ok(output) => (0..texts.len())
                .map(|i| output.get(i).cloned().unwrap_or_else(|| vec![0.0; dim]))
                .collect(),
            Err(_) => vec![vec![0.0; dim]; texts.len()],
        }
    }

    /// Dimension of the vectors produced by the loaded model.
    ///
    /// # Panics
    /// Panics if the internal lock has been poisoned.
    pub fn dimension(&self) -> usize {
        let embedder = self.embedder.read().unwrap();
        embedder.dimension()
    }

    /// Model similarity score between two texts; `0.0` if scoring fails.
    ///
    /// # Panics
    /// Panics if the internal lock has been poisoned.
    pub fn similarity(&self, text1: &str, text2: &str) -> f32 {
        let mut embedder = self.embedder.write().unwrap();
        embedder.similarity(text1, text2).unwrap_or(0.0)
    }

    /// Embeds a JSON value after flattening it to text with `extract_text_from_json`.
    pub fn embed_json(&self, value: &serde_json::Value) -> Vec<f32> {
        let text = extract_text_from_json(value);
        self.embed_text(&text)
    }
}
174
175fn extract_text_from_json(value: &serde_json::Value) -> String {
177 match value {
178 serde_json::Value::String(s) => s.clone(),
179 serde_json::Value::Object(map) => {
180 let mut text = String::new();
181 for (key, val) in map {
182 text.push_str(key);
183 text.push(' ');
184 text.push_str(&extract_text_from_json(val));
185 text.push(' ');
186 }
187 text
188 }
189 serde_json::Value::Array(arr) => arr
190 .iter()
191 .map(|v| extract_text_from_json(v))
192 .collect::<Vec<_>>()
193 .join(" "),
194 serde_json::Value::Number(n) => n.to_string(),
195 serde_json::Value::Bool(b) => b.to_string(),
196 serde_json::Value::Null => String::new(),
197 }
198}
199
/// Common interface over the text-embedding backends in this module.
///
/// The `Send + Sync` bound lets one embedder be shared across threads,
/// e.g. behind an `Arc<dyn Embedder>`.
pub trait Embedder: Send + Sync {
    /// Vector length this embedder is expected to produce from [`Embedder::embed`].
    fn dim(&self) -> usize;

    /// Converts `text` into a dense embedding vector.
    fn embed(&self, text: &str) -> Vec<f32>;
}
207
208impl Embedder for SimpleEmbedder {
209 fn embed(&self, text: &str) -> Vec<f32> {
210 self.embed_text(text)
211 }
212 fn dim(&self) -> usize {
213 self.dimension
214 }
215}
216
#[cfg(feature = "onnx-embeddings")]
/// Exposes [`OnnxEmbedder`] through the backend-agnostic [`Embedder`] trait.
impl Embedder for OnnxEmbedder {
    fn dim(&self) -> usize {
        OnnxEmbedder::dimension(self)
    }

    fn embed(&self, text: &str) -> Vec<f32> {
        OnnxEmbedder::embed_text(self, text)
    }
}
226
227#[derive(Debug, Deserialize)]
233struct OpenAlexWorksResponse {
234 results: Vec<OpenAlexWork>,
235 meta: OpenAlexMeta,
236}
237
238#[derive(Debug, Deserialize)]
239struct OpenAlexWork {
240 id: String,
241 title: Option<String>,
242 #[serde(rename = "display_name")]
243 display_name: Option<String>,
244 publication_date: Option<String>,
245 #[serde(rename = "authorships")]
246 authorships: Option<Vec<OpenAlexAuthorship>>,
247 #[serde(rename = "cited_by_count")]
248 cited_by_count: Option<i64>,
249 #[serde(rename = "concepts")]
250 concepts: Option<Vec<OpenAlexConcept>>,
251 #[serde(rename = "abstract_inverted_index")]
252 abstract_inverted_index: Option<HashMap<String, Vec<i32>>>,
253}
254
255#[derive(Debug, Deserialize)]
256struct OpenAlexAuthorship {
257 author: Option<OpenAlexAuthor>,
258}
259
260#[derive(Debug, Deserialize)]
261struct OpenAlexAuthor {
262 id: String,
263 #[serde(rename = "display_name")]
264 display_name: Option<String>,
265}
266
267#[derive(Debug, Deserialize)]
268struct OpenAlexConcept {
269 id: String,
270 #[serde(rename = "display_name")]
271 display_name: Option<String>,
272 score: Option<f64>,
273}
274
275#[derive(Debug, Deserialize)]
276struct OpenAlexMeta {
277 count: i64,
278}
279
280#[derive(Debug, Deserialize)]
282struct OpenAlexTopicsResponse {
283 results: Vec<OpenAlexTopic>,
284}
285
286#[derive(Debug, Deserialize)]
287struct OpenAlexTopic {
288 id: String,
289 #[serde(rename = "display_name")]
290 display_name: String,
291 description: Option<String>,
292 #[serde(rename = "works_count")]
293 works_count: Option<i64>,
294}
295
296pub struct OpenAlexClient {
298 client: Client,
299 base_url: String,
300 rate_limit_delay: Duration,
301 embedder: Arc<SimpleEmbedder>,
302 user_email: Option<String>,
303}
304
305impl OpenAlexClient {
306 pub fn new(user_email: Option<String>) -> Result<Self> {
311 let client = Client::builder()
312 .timeout(Duration::from_secs(30))
313 .build()
314 .map_err(|e| FrameworkError::Network(e))?;
315
316 Ok(Self {
317 client,
318 base_url: "https://api.openalex.org".to_string(),
319 rate_limit_delay: Duration::from_millis(DEFAULT_RATE_LIMIT_DELAY_MS),
320 embedder: Arc::new(SimpleEmbedder::new(128)),
321 user_email,
322 })
323 }
324
325 pub async fn fetch_works(&self, query: &str, limit: usize) -> Result<Vec<DataRecord>> {
331 let mut url = format!("{}/works?search={}", self.base_url, urlencoding::encode(query));
332 url.push_str(&format!("&per-page={}", limit.min(200)));
333
334 if let Some(email) = &self.user_email {
335 url.push_str(&format!("&mailto={}", email));
336 }
337
338 let response = self.fetch_with_retry(&url).await?;
339 let works_response: OpenAlexWorksResponse = response.json().await?;
340
341 let mut records = Vec::new();
342 for work in works_response.results {
343 let record = self.work_to_record(work)?;
344 records.push(record);
345 sleep(self.rate_limit_delay).await;
346 }
347
348 Ok(records)
349 }
350
351 pub async fn fetch_topics(&self, domain: &str) -> Result<Vec<DataRecord>> {
353 let mut url = format!(
354 "{}/topics?search={}",
355 self.base_url,
356 urlencoding::encode(domain)
357 );
358 url.push_str("&per-page=50");
359
360 if let Some(email) = &self.user_email {
361 url.push_str(&format!("&mailto={}", email));
362 }
363
364 let response = self.fetch_with_retry(&url).await?;
365 let topics_response: OpenAlexTopicsResponse = response.json().await?;
366
367 let mut records = Vec::new();
368 for topic in topics_response.results {
369 let record = self.topic_to_record(topic)?;
370 records.push(record);
371 sleep(self.rate_limit_delay).await;
372 }
373
374 Ok(records)
375 }
376
377 fn work_to_record(&self, work: OpenAlexWork) -> Result<DataRecord> {
379 let title = work
380 .display_name
381 .or(work.title)
382 .unwrap_or_else(|| "Untitled".to_string());
383
384 let abstract_text = work
386 .abstract_inverted_index
387 .as_ref()
388 .map(|index| self.reconstruct_abstract(index))
389 .unwrap_or_default();
390
391 let text = format!("{} {}", title, abstract_text);
393 let embedding = self.embedder.embed_text(&text);
394
395 let timestamp = work
397 .publication_date
398 .as_ref()
399 .and_then(|d| NaiveDate::parse_from_str(d, "%Y-%m-%d").ok())
400 .map(|d| d.and_hms_opt(0, 0, 0).unwrap().and_utc())
401 .unwrap_or_else(Utc::now);
402
403 let mut relationships = Vec::new();
405
406 if let Some(authorships) = work.authorships {
408 for authorship in authorships {
409 if let Some(author) = authorship.author {
410 relationships.push(Relationship {
411 target_id: author.id,
412 rel_type: "authored_by".to_string(),
413 weight: 1.0,
414 properties: {
415 let mut props = HashMap::new();
416 if let Some(name) = author.display_name {
417 props.insert("author_name".to_string(), serde_json::json!(name));
418 }
419 props
420 },
421 });
422 }
423 }
424 }
425
426 if let Some(concepts) = work.concepts {
428 for concept in concepts {
429 relationships.push(Relationship {
430 target_id: concept.id,
431 rel_type: "has_concept".to_string(),
432 weight: concept.score.unwrap_or(0.0),
433 properties: {
434 let mut props = HashMap::new();
435 if let Some(name) = concept.display_name {
436 props.insert("concept_name".to_string(), serde_json::json!(name));
437 }
438 props
439 },
440 });
441 }
442 }
443
444 let mut data_map = serde_json::Map::new();
445 data_map.insert("title".to_string(), serde_json::json!(title));
446 data_map.insert("abstract".to_string(), serde_json::json!(abstract_text));
447 if let Some(citations) = work.cited_by_count {
448 data_map.insert("citations".to_string(), serde_json::json!(citations));
449 }
450
451 Ok(DataRecord {
452 id: work.id,
453 source: "openalex".to_string(),
454 record_type: "work".to_string(),
455 timestamp,
456 data: serde_json::Value::Object(data_map),
457 embedding: Some(embedding),
458 relationships,
459 })
460 }
461
462 fn reconstruct_abstract(&self, inverted_index: &HashMap<String, Vec<i32>>) -> String {
464 let mut positions: Vec<(i32, String)> = Vec::new();
465 for (word, indices) in inverted_index {
466 for &pos in indices {
467 positions.push((pos, word.clone()));
468 }
469 }
470 positions.sort_by_key(|&(pos, _)| pos);
471 positions
472 .into_iter()
473 .map(|(_, word)| word)
474 .collect::<Vec<_>>()
475 .join(" ")
476 }
477
478 fn topic_to_record(&self, topic: OpenAlexTopic) -> Result<DataRecord> {
480 let text = format!(
481 "{} {}",
482 topic.display_name,
483 topic.description.as_deref().unwrap_or("")
484 );
485 let embedding = self.embedder.embed_text(&text);
486
487 let mut data_map = serde_json::Map::new();
488 data_map.insert(
489 "display_name".to_string(),
490 serde_json::json!(topic.display_name),
491 );
492 if let Some(desc) = topic.description {
493 data_map.insert("description".to_string(), serde_json::json!(desc));
494 }
495 if let Some(count) = topic.works_count {
496 data_map.insert("works_count".to_string(), serde_json::json!(count));
497 }
498
499 Ok(DataRecord {
500 id: topic.id,
501 source: "openalex".to_string(),
502 record_type: "topic".to_string(),
503 timestamp: Utc::now(),
504 data: serde_json::Value::Object(data_map),
505 embedding: Some(embedding),
506 relationships: Vec::new(),
507 })
508 }
509
510 async fn fetch_with_retry(&self, url: &str) -> Result<reqwest::Response> {
512 let mut retries = 0;
513 loop {
514 match self.client.get(url).send().await {
515 Ok(response) => {
516 if response.status() == StatusCode::TOO_MANY_REQUESTS && retries < MAX_RETRIES
517 {
518 retries += 1;
519 sleep(Duration::from_millis(RETRY_DELAY_MS * retries as u64)).await;
520 continue;
521 }
522 return Ok(response);
523 }
524 Err(_) if retries < MAX_RETRIES => {
525 retries += 1;
526 sleep(Duration::from_millis(RETRY_DELAY_MS * retries as u64)).await;
527 }
528 Err(e) => return Err(FrameworkError::Network(e)),
529 }
530 }
531 }
532}
533
534#[async_trait]
535impl DataSource for OpenAlexClient {
536 fn source_id(&self) -> &str {
537 "openalex"
538 }
539
540 async fn fetch_batch(
541 &self,
542 cursor: Option<String>,
543 batch_size: usize,
544 ) -> Result<(Vec<DataRecord>, Option<String>)> {
545 let query = cursor.as_deref().unwrap_or("machine learning");
547 let records = self.fetch_works(query, batch_size).await?;
548 Ok((records, None)) }
550
551 async fn total_count(&self) -> Result<Option<u64>> {
552 Ok(None)
553 }
554
555 async fn health_check(&self) -> Result<bool> {
556 let response = self.client.get(&self.base_url).send().await?;
557 Ok(response.status().is_success())
558 }
559}
560
561#[derive(Debug, Deserialize)]
567struct NoaaResponse {
568 results: Vec<NoaaObservation>,
569}
570
571#[derive(Debug, Deserialize)]
572struct NoaaObservation {
573 station: String,
574 date: String,
575 datatype: String,
576 value: f64,
577 #[serde(default)]
578 attributes: String,
579}
580
581pub struct NoaaClient {
583 client: Client,
584 base_url: String,
585 api_token: Option<String>,
586 rate_limit_delay: Duration,
587 embedder: Arc<SimpleEmbedder>,
588}
589
590impl NoaaClient {
591 pub fn new(api_token: Option<String>) -> Result<Self> {
596 let client = Client::builder()
597 .timeout(Duration::from_secs(30))
598 .build()
599 .map_err(|e| FrameworkError::Network(e))?;
600
601 Ok(Self {
602 client,
603 base_url: "https://www.ncei.noaa.gov/cdo-web/api/v2".to_string(),
604 api_token,
605 rate_limit_delay: Duration::from_millis(200), embedder: Arc::new(SimpleEmbedder::new(128)),
607 })
608 }
609
610 pub async fn fetch_climate_data(
617 &self,
618 station_id: &str,
619 start_date: &str,
620 end_date: &str,
621 ) -> Result<Vec<DataRecord>> {
622 if self.api_token.is_none() {
623 return Ok(self.generate_synthetic_climate_data(station_id, start_date, end_date)?);
625 }
626
627 let url = format!(
628 "{}/data?datasetid=GHCND&stationid={}&startdate={}&enddate={}&limit=1000",
629 self.base_url, station_id, start_date, end_date
630 );
631
632 let mut request = self.client.get(&url);
633 if let Some(token) = &self.api_token {
634 request = request.header("token", token);
635 }
636
637 let response = self.fetch_with_retry(request).await?;
638 let noaa_response: NoaaResponse = response.json().await?;
639
640 let mut records = Vec::new();
641 for observation in noaa_response.results {
642 let record = self.observation_to_record(observation)?;
643 records.push(record);
644 }
645
646 Ok(records)
647 }
648
649 fn generate_synthetic_climate_data(
651 &self,
652 station_id: &str,
653 start_date: &str,
654 _end_date: &str,
655 ) -> Result<Vec<DataRecord>> {
656 let mut records = Vec::new();
657 let datatypes = vec!["TMAX", "TMIN", "PRCP"];
658
659 for (i, datatype) in datatypes.iter().enumerate() {
661 let value = match *datatype {
662 "TMAX" => 250.0 + (i as f64 * 10.0),
663 "TMIN" => 150.0 + (i as f64 * 10.0),
664 "PRCP" => 5.0 + (i as f64),
665 _ => 0.0,
666 };
667
668 let text = format!("{} {} {}", station_id, datatype, value);
669 let embedding = self.embedder.embed_text(&text);
670
671 let mut data_map = serde_json::Map::new();
672 data_map.insert("station".to_string(), serde_json::json!(station_id));
673 data_map.insert("datatype".to_string(), serde_json::json!(datatype));
674 data_map.insert("value".to_string(), serde_json::json!(value));
675 data_map.insert("unit".to_string(), serde_json::json!("tenths"));
676
677 records.push(DataRecord {
678 id: format!("{}_{}_{}_{}", station_id, datatype, start_date, i),
679 source: "noaa".to_string(),
680 record_type: "observation".to_string(),
681 timestamp: Utc::now(),
682 data: serde_json::Value::Object(data_map),
683 embedding: Some(embedding),
684 relationships: Vec::new(),
685 });
686 }
687
688 Ok(records)
689 }
690
691 fn observation_to_record(&self, obs: NoaaObservation) -> Result<DataRecord> {
693 let text = format!("{} {} {}", obs.station, obs.datatype, obs.value);
694 let embedding = self.embedder.embed_text(&text);
695
696 let timestamp = NaiveDate::parse_from_str(&obs.date, "%Y-%m-%dT%H:%M:%S")
698 .or_else(|_| NaiveDate::parse_from_str(&obs.date, "%Y-%m-%d"))
699 .ok()
700 .and_then(|d| d.and_hms_opt(0, 0, 0))
701 .map(|dt| dt.and_utc())
702 .unwrap_or_else(Utc::now);
703
704 let mut data_map = serde_json::Map::new();
705 data_map.insert("station".to_string(), serde_json::json!(obs.station));
706 data_map.insert("datatype".to_string(), serde_json::json!(obs.datatype));
707 data_map.insert("value".to_string(), serde_json::json!(obs.value));
708 data_map.insert("attributes".to_string(), serde_json::json!(obs.attributes));
709
710 Ok(DataRecord {
711 id: format!("{}_{}", obs.station, obs.date),
712 source: "noaa".to_string(),
713 record_type: "observation".to_string(),
714 timestamp,
715 data: serde_json::Value::Object(data_map),
716 embedding: Some(embedding),
717 relationships: Vec::new(),
718 })
719 }
720
721 async fn fetch_with_retry(&self, request: reqwest::RequestBuilder) -> Result<reqwest::Response> {
723 let mut retries = 0;
724 loop {
725 let req = request
726 .try_clone()
727 .ok_or_else(|| FrameworkError::Config("Failed to clone request".to_string()))?;
728
729 match req.send().await {
730 Ok(response) => {
731 if response.status() == StatusCode::TOO_MANY_REQUESTS && retries < MAX_RETRIES
732 {
733 retries += 1;
734 sleep(Duration::from_millis(RETRY_DELAY_MS * retries as u64)).await;
735 continue;
736 }
737 return Ok(response);
738 }
739 Err(_) if retries < MAX_RETRIES => {
740 retries += 1;
741 sleep(Duration::from_millis(RETRY_DELAY_MS * retries as u64)).await;
742 }
743 Err(e) => return Err(FrameworkError::Network(e)),
744 }
745 }
746 }
747}
748
749#[async_trait]
750impl DataSource for NoaaClient {
751 fn source_id(&self) -> &str {
752 "noaa"
753 }
754
755 async fn fetch_batch(
756 &self,
757 _cursor: Option<String>,
758 _batch_size: usize,
759 ) -> Result<(Vec<DataRecord>, Option<String>)> {
760 let records = self
762 .fetch_climate_data("GHCND:USW00094728", "2024-01-01", "2024-01-31")
763 .await?;
764 Ok((records, None))
765 }
766
767 async fn total_count(&self) -> Result<Option<u64>> {
768 Ok(None)
769 }
770
771 async fn health_check(&self) -> Result<bool> {
772 Ok(true) }
774}
775
776#[derive(Debug, Deserialize)]
782struct EdgarFilingData {
783 #[serde(default)]
784 filings: EdgarFilings,
785}
786
787#[derive(Debug, Default, Deserialize)]
788struct EdgarFilings {
789 #[serde(default)]
790 recent: EdgarRecent,
791}
792
793#[derive(Debug, Default, Deserialize)]
794struct EdgarRecent {
795 #[serde(rename = "accessionNumber", default)]
796 accession_number: Vec<String>,
797 #[serde(rename = "filingDate", default)]
798 filing_date: Vec<String>,
799 #[serde(rename = "reportDate", default)]
800 report_date: Vec<String>,
801 #[serde(default)]
802 form: Vec<String>,
803 #[serde(rename = "primaryDocument", default)]
804 primary_document: Vec<String>,
805}
806
807pub struct EdgarClient {
809 client: Client,
810 base_url: String,
811 rate_limit_delay: Duration,
812 embedder: Arc<SimpleEmbedder>,
813 user_agent: String,
814}
815
816impl EdgarClient {
817 pub fn new(user_agent: String) -> Result<Self> {
822 let client = Client::builder()
823 .timeout(Duration::from_secs(30))
824 .user_agent(&user_agent)
825 .build()
826 .map_err(|e| FrameworkError::Network(e))?;
827
828 Ok(Self {
829 client,
830 base_url: "https://data.sec.gov".to_string(),
831 rate_limit_delay: Duration::from_millis(100), embedder: Arc::new(SimpleEmbedder::new(128)),
833 user_agent,
834 })
835 }
836
837 pub async fn fetch_filings(
843 &self,
844 cik: &str,
845 form_type: Option<&str>,
846 ) -> Result<Vec<DataRecord>> {
847 let padded_cik = format!("{:0>10}", cik);
849
850 let url = format!(
851 "{}/submissions/CIK{}.json",
852 self.base_url, padded_cik
853 );
854
855 let response = self.fetch_with_retry(&url).await?;
856 let filing_data: EdgarFilingData = response.json().await?;
857
858 let mut records = Vec::new();
859 let recent = filing_data.filings.recent;
860
861 let count = recent.accession_number.len();
862 for i in 0..count.min(50) {
863 if let Some(filter_form) = form_type {
866 if i < recent.form.len() && recent.form[i] != filter_form {
867 continue;
868 }
869 }
870
871 let filing = EdgarFiling {
872 cik: padded_cik.clone(),
873 accession_number: recent.accession_number.get(i).cloned().unwrap_or_default(),
874 filing_date: recent.filing_date.get(i).cloned().unwrap_or_default(),
875 report_date: recent.report_date.get(i).cloned().unwrap_or_default(),
876 form: recent.form.get(i).cloned().unwrap_or_default(),
877 primary_document: recent.primary_document.get(i).cloned().unwrap_or_default(),
878 };
879
880 let record = self.filing_to_record(filing)?;
881 records.push(record);
882 sleep(self.rate_limit_delay).await;
883 }
884
885 Ok(records)
886 }
887
888 fn filing_to_record(&self, filing: EdgarFiling) -> Result<DataRecord> {
890 let text = format!(
891 "CIK {} Form {} filed on {} report date {}",
892 filing.cik, filing.form, filing.filing_date, filing.report_date
893 );
894 let embedding = self.embedder.embed_text(&text);
895
896 let timestamp = NaiveDate::parse_from_str(&filing.filing_date, "%Y-%m-%d")
898 .ok()
899 .and_then(|d| d.and_hms_opt(0, 0, 0))
900 .map(|dt| dt.and_utc())
901 .unwrap_or_else(Utc::now);
902
903 let mut data_map = serde_json::Map::new();
904 data_map.insert("cik".to_string(), serde_json::json!(filing.cik));
905 data_map.insert(
906 "accession_number".to_string(),
907 serde_json::json!(filing.accession_number),
908 );
909 data_map.insert(
910 "filing_date".to_string(),
911 serde_json::json!(filing.filing_date),
912 );
913 data_map.insert(
914 "report_date".to_string(),
915 serde_json::json!(filing.report_date),
916 );
917 data_map.insert("form".to_string(), serde_json::json!(filing.form));
918 data_map.insert(
919 "primary_document".to_string(),
920 serde_json::json!(filing.primary_document),
921 );
922
923 let filing_url = format!(
925 "https://www.sec.gov/cgi-bin/viewer?action=view&cik={}&accession_number={}&xbrl_type=v",
926 filing.cik, filing.accession_number
927 );
928 data_map.insert("filing_url".to_string(), serde_json::json!(filing_url));
929
930 Ok(DataRecord {
931 id: format!("{}_{}", filing.cik, filing.accession_number),
932 source: "edgar".to_string(),
933 record_type: filing.form,
934 timestamp,
935 data: serde_json::Value::Object(data_map),
936 embedding: Some(embedding),
937 relationships: Vec::new(),
938 })
939 }
940
941 async fn fetch_with_retry(&self, url: &str) -> Result<reqwest::Response> {
943 let mut retries = 0;
944 loop {
945 match self.client.get(url).send().await {
946 Ok(response) => {
947 if response.status() == StatusCode::TOO_MANY_REQUESTS && retries < MAX_RETRIES
948 {
949 retries += 1;
950 sleep(Duration::from_millis(RETRY_DELAY_MS * retries as u64)).await;
951 continue;
952 }
953 return Ok(response);
954 }
955 Err(_) if retries < MAX_RETRIES => {
956 retries += 1;
957 sleep(Duration::from_millis(RETRY_DELAY_MS * retries as u64)).await;
958 }
959 Err(e) => return Err(FrameworkError::Network(e)),
960 }
961 }
962 }
963}
964
/// One filing extracted from the column-oriented `recent` listing.
///
/// Missing columns are represented by empty strings.
struct EdgarFiling {
    /// Zero-padded 10-digit company identifier.
    cik: String,
    /// Form type, e.g. `10-K`.
    form: String,
    accession_number: String,
    filing_date: String,
    report_date: String,
    primary_document: String,
}
974
975#[async_trait]
976impl DataSource for EdgarClient {
977 fn source_id(&self) -> &str {
978 "edgar"
979 }
980
981 async fn fetch_batch(
982 &self,
983 cursor: Option<String>,
984 _batch_size: usize,
985 ) -> Result<(Vec<DataRecord>, Option<String>)> {
986 let cik = cursor.as_deref().unwrap_or("320193");
988 let records = self.fetch_filings(cik, None).await?;
989 Ok((records, None))
990 }
991
992 async fn total_count(&self) -> Result<Option<u64>> {
993 Ok(None)
994 }
995
996 async fn health_check(&self) -> Result<bool> {
997 Ok(true)
998 }
999}
1000
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_simple_embedder() {
        let embedder = SimpleEmbedder::new(128);
        let embedding = embedder.embed_text("machine learning artificial intelligence");

        assert_eq!(embedding.len(), 128);

        // Non-empty embeddings are L2-normalised.
        let magnitude: f32 = embedding.iter().map(|x| x * x).sum::<f32>().sqrt();
        assert!((magnitude - 1.0).abs() < 0.01);
    }

    #[test]
    fn test_simple_embedder_ignores_short_words() {
        // Words of two bytes or fewer are filtered out, leaving a zero vector.
        let embedder = SimpleEmbedder::new(16);
        assert_eq!(embedder.embed_text("a an to of"), vec![0.0; 16]);
    }

    #[test]
    fn test_simple_embedder_is_case_insensitive() {
        let embedder = SimpleEmbedder::new(32);
        assert_eq!(
            embedder.embed_text("Graph Neural Networks"),
            embedder.embed_text("graph neural networks")
        );
    }

    #[test]
    fn test_embedder_json() {
        let embedder = SimpleEmbedder::new(64);
        let json = serde_json::json!({
            "title": "Test Document",
            "content": "Some interesting content here"
        });

        let embedding = embedder.embed_json(&json);
        assert_eq!(embedding.len(), 64);
    }

    #[test]
    fn test_embed_json_matches_flattened_text() {
        // Objects flatten to "<key> <value>", so both paths must agree.
        let embedder = SimpleEmbedder::new(64);
        let json = serde_json::json!({ "title": "Deep Learning" });
        assert_eq!(
            embedder.embed_json(&json),
            embedder.embed_text("title Deep Learning")
        );
    }

    #[tokio::test]
    async fn test_openalex_client_creation() {
        let client = OpenAlexClient::new(Some("test@example.com".to_string()));
        assert!(client.is_ok());
    }

    #[tokio::test]
    async fn test_reconstruct_abstract_orders_by_position() {
        let client = OpenAlexClient::new(None).unwrap();
        let mut index = HashMap::new();
        index.insert("world".to_string(), vec![1]);
        index.insert("hello".to_string(), vec![0, 2]);
        assert_eq!(client.reconstruct_abstract(&index), "hello world hello");
        assert_eq!(client.reconstruct_abstract(&HashMap::new()), "");
    }

    #[tokio::test]
    async fn test_noaa_client_creation() {
        let client = NoaaClient::new(None);
        assert!(client.is_ok());
    }

    #[tokio::test]
    async fn test_noaa_synthetic_data() {
        // Without a token the client serves synthetic data and makes no request.
        let client = NoaaClient::new(None).unwrap();
        let records = client
            .fetch_climate_data("GHCND:TEST", "2024-01-01", "2024-01-31")
            .await
            .unwrap();

        assert!(!records.is_empty());
        assert_eq!(records[0].source, "noaa");
        assert!(records[0].embedding.is_some());
    }

    #[tokio::test]
    async fn test_edgar_client_creation() {
        let client = EdgarClient::new("test-agent test@example.com".to_string());
        assert!(client.is_ok());
    }
}