1use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::Duration;
9
10use async_trait::async_trait;
11use chrono::{NaiveDate, Utc};
12use reqwest::{Client, StatusCode};
13use serde::Deserialize;
14use tokio::time::sleep;
15
16use crate::{DataRecord, DataSource, FrameworkError, Relationship, Result};
17
/// Default rate-limit pause, in milliseconds, used by `OpenAlexClient`.
const DEFAULT_RATE_LIMIT_DELAY_MS: u64 = 100;
/// Number of retries `fetch_with_retry` allows after the initial attempt.
const MAX_RETRIES: u32 = 3;
/// Backoff step in milliseconds; the n-th retry waits `n * RETRY_DELAY_MS`.
const RETRY_DELAY_MS: u64 = 1_000;
22
23pub struct SimpleEmbedder {
29 dimension: usize,
30}
31
32impl SimpleEmbedder {
33 pub fn new(dimension: usize) -> Self {
35 Self { dimension }
36 }
37
38 pub fn embed_text(&self, text: &str) -> Vec<f32> {
40 let lowercase_text = text.to_lowercase();
41 let words: Vec<&str> = lowercase_text
42 .split_whitespace()
43 .filter(|w| w.len() > 2)
44 .collect();
45
46 let mut embedding = vec![0.0f32; self.dimension];
47
48 for word in words {
50 let hash = self.hash_word(word);
51 let idx = (hash % self.dimension as u64) as usize;
52 embedding[idx] += 1.0;
53 }
54
55 let magnitude: f32 = embedding.iter().map(|x| x * x).sum::<f32>().sqrt();
57 if magnitude > 0.0 {
58 for val in &mut embedding {
59 *val /= magnitude;
60 }
61 }
62
63 embedding
64 }
65
66 fn hash_word(&self, word: &str) -> u64 {
68 let mut hash = 5381u64;
69 for byte in word.bytes() {
70 hash = hash.wrapping_mul(33).wrapping_add(byte as u64);
71 }
72 hash
73 }
74
75 pub fn embed_json(&self, value: &serde_json::Value) -> Vec<f32> {
77 let text = self.extract_text_from_json(value);
78 self.embed_text(&text)
79 }
80
81 fn extract_text_from_json(&self, value: &serde_json::Value) -> String {
83 match value {
84 serde_json::Value::String(s) => s.clone(),
85 serde_json::Value::Object(map) => {
86 let mut text = String::new();
87 for (key, val) in map {
88 text.push_str(key);
89 text.push(' ');
90 text.push_str(&self.extract_text_from_json(val));
91 text.push(' ');
92 }
93 text
94 }
95 serde_json::Value::Array(arr) => {
96 arr.iter()
97 .map(|v| self.extract_text_from_json(v))
98 .collect::<Vec<_>>()
99 .join(" ")
100 }
101 serde_json::Value::Number(n) => n.to_string(),
102 serde_json::Value::Bool(b) => b.to_string(),
103 serde_json::Value::Null => String::new(),
104 }
105 }
106}
107
108#[derive(Debug, Deserialize)]
114struct OpenAlexWorksResponse {
115 results: Vec<OpenAlexWork>,
116 meta: OpenAlexMeta,
117}
118
119#[derive(Debug, Deserialize)]
120struct OpenAlexWork {
121 id: String,
122 title: Option<String>,
123 #[serde(rename = "display_name")]
124 display_name: Option<String>,
125 publication_date: Option<String>,
126 #[serde(rename = "authorships")]
127 authorships: Option<Vec<OpenAlexAuthorship>>,
128 #[serde(rename = "cited_by_count")]
129 cited_by_count: Option<i64>,
130 #[serde(rename = "concepts")]
131 concepts: Option<Vec<OpenAlexConcept>>,
132 #[serde(rename = "abstract_inverted_index")]
133 abstract_inverted_index: Option<HashMap<String, Vec<i32>>>,
134}
135
136#[derive(Debug, Deserialize)]
137struct OpenAlexAuthorship {
138 author: Option<OpenAlexAuthor>,
139}
140
141#[derive(Debug, Deserialize)]
142struct OpenAlexAuthor {
143 id: String,
144 #[serde(rename = "display_name")]
145 display_name: Option<String>,
146}
147
148#[derive(Debug, Deserialize)]
149struct OpenAlexConcept {
150 id: String,
151 #[serde(rename = "display_name")]
152 display_name: Option<String>,
153 score: Option<f64>,
154}
155
156#[derive(Debug, Deserialize)]
157struct OpenAlexMeta {
158 count: i64,
159}
160
161#[derive(Debug, Deserialize)]
163struct OpenAlexTopicsResponse {
164 results: Vec<OpenAlexTopic>,
165}
166
167#[derive(Debug, Deserialize)]
168struct OpenAlexTopic {
169 id: String,
170 #[serde(rename = "display_name")]
171 display_name: String,
172 description: Option<String>,
173 #[serde(rename = "works_count")]
174 works_count: Option<i64>,
175}
176
177pub struct OpenAlexClient {
179 client: Client,
180 base_url: String,
181 rate_limit_delay: Duration,
182 embedder: Arc<SimpleEmbedder>,
183 user_email: Option<String>,
184}
185
186impl OpenAlexClient {
187 pub fn new(user_email: Option<String>) -> Result<Self> {
192 let client = Client::builder()
193 .timeout(Duration::from_secs(30))
194 .build()
195 .map_err(|e| FrameworkError::Network(e))?;
196
197 Ok(Self {
198 client,
199 base_url: "https://api.openalex.org".to_string(),
200 rate_limit_delay: Duration::from_millis(DEFAULT_RATE_LIMIT_DELAY_MS),
201 embedder: Arc::new(SimpleEmbedder::new(128)),
202 user_email,
203 })
204 }
205
206 pub async fn fetch_works(&self, query: &str, limit: usize) -> Result<Vec<DataRecord>> {
212 let mut url = format!("{}/works?search={}", self.base_url, urlencoding::encode(query));
213 url.push_str(&format!("&per-page={}", limit.min(200)));
214
215 if let Some(email) = &self.user_email {
216 url.push_str(&format!("&mailto={}", email));
217 }
218
219 let response = self.fetch_with_retry(&url).await?;
220 let works_response: OpenAlexWorksResponse = response.json().await?;
221
222 let mut records = Vec::new();
223 for work in works_response.results {
224 let record = self.work_to_record(work)?;
225 records.push(record);
226 sleep(self.rate_limit_delay).await;
227 }
228
229 Ok(records)
230 }
231
232 pub async fn fetch_topics(&self, domain: &str) -> Result<Vec<DataRecord>> {
234 let mut url = format!(
235 "{}/topics?search={}",
236 self.base_url,
237 urlencoding::encode(domain)
238 );
239 url.push_str("&per-page=50");
240
241 if let Some(email) = &self.user_email {
242 url.push_str(&format!("&mailto={}", email));
243 }
244
245 let response = self.fetch_with_retry(&url).await?;
246 let topics_response: OpenAlexTopicsResponse = response.json().await?;
247
248 let mut records = Vec::new();
249 for topic in topics_response.results {
250 let record = self.topic_to_record(topic)?;
251 records.push(record);
252 sleep(self.rate_limit_delay).await;
253 }
254
255 Ok(records)
256 }
257
258 fn work_to_record(&self, work: OpenAlexWork) -> Result<DataRecord> {
260 let title = work
261 .display_name
262 .or(work.title)
263 .unwrap_or_else(|| "Untitled".to_string());
264
265 let abstract_text = work
267 .abstract_inverted_index
268 .as_ref()
269 .map(|index| self.reconstruct_abstract(index))
270 .unwrap_or_default();
271
272 let text = format!("{} {}", title, abstract_text);
274 let embedding = self.embedder.embed_text(&text);
275
276 let timestamp = work
278 .publication_date
279 .as_ref()
280 .and_then(|d| NaiveDate::parse_from_str(d, "%Y-%m-%d").ok())
281 .map(|d| d.and_hms_opt(0, 0, 0).unwrap().and_utc())
282 .unwrap_or_else(Utc::now);
283
284 let mut relationships = Vec::new();
286
287 if let Some(authorships) = work.authorships {
289 for authorship in authorships {
290 if let Some(author) = authorship.author {
291 relationships.push(Relationship {
292 target_id: author.id,
293 rel_type: "authored_by".to_string(),
294 weight: 1.0,
295 properties: {
296 let mut props = HashMap::new();
297 if let Some(name) = author.display_name {
298 props.insert("author_name".to_string(), serde_json::json!(name));
299 }
300 props
301 },
302 });
303 }
304 }
305 }
306
307 if let Some(concepts) = work.concepts {
309 for concept in concepts {
310 relationships.push(Relationship {
311 target_id: concept.id,
312 rel_type: "has_concept".to_string(),
313 weight: concept.score.unwrap_or(0.0),
314 properties: {
315 let mut props = HashMap::new();
316 if let Some(name) = concept.display_name {
317 props.insert("concept_name".to_string(), serde_json::json!(name));
318 }
319 props
320 },
321 });
322 }
323 }
324
325 let mut data_map = serde_json::Map::new();
326 data_map.insert("title".to_string(), serde_json::json!(title));
327 data_map.insert("abstract".to_string(), serde_json::json!(abstract_text));
328 if let Some(citations) = work.cited_by_count {
329 data_map.insert("citations".to_string(), serde_json::json!(citations));
330 }
331
332 Ok(DataRecord {
333 id: work.id,
334 source: "openalex".to_string(),
335 record_type: "work".to_string(),
336 timestamp,
337 data: serde_json::Value::Object(data_map),
338 embedding: Some(embedding),
339 relationships,
340 })
341 }
342
343 fn reconstruct_abstract(&self, inverted_index: &HashMap<String, Vec<i32>>) -> String {
345 let mut positions: Vec<(i32, String)> = Vec::new();
346 for (word, indices) in inverted_index {
347 for &pos in indices {
348 positions.push((pos, word.clone()));
349 }
350 }
351 positions.sort_by_key(|&(pos, _)| pos);
352 positions
353 .into_iter()
354 .map(|(_, word)| word)
355 .collect::<Vec<_>>()
356 .join(" ")
357 }
358
359 fn topic_to_record(&self, topic: OpenAlexTopic) -> Result<DataRecord> {
361 let text = format!(
362 "{} {}",
363 topic.display_name,
364 topic.description.as_deref().unwrap_or("")
365 );
366 let embedding = self.embedder.embed_text(&text);
367
368 let mut data_map = serde_json::Map::new();
369 data_map.insert(
370 "display_name".to_string(),
371 serde_json::json!(topic.display_name),
372 );
373 if let Some(desc) = topic.description {
374 data_map.insert("description".to_string(), serde_json::json!(desc));
375 }
376 if let Some(count) = topic.works_count {
377 data_map.insert("works_count".to_string(), serde_json::json!(count));
378 }
379
380 Ok(DataRecord {
381 id: topic.id,
382 source: "openalex".to_string(),
383 record_type: "topic".to_string(),
384 timestamp: Utc::now(),
385 data: serde_json::Value::Object(data_map),
386 embedding: Some(embedding),
387 relationships: Vec::new(),
388 })
389 }
390
391 async fn fetch_with_retry(&self, url: &str) -> Result<reqwest::Response> {
393 let mut retries = 0;
394 loop {
395 match self.client.get(url).send().await {
396 Ok(response) => {
397 if response.status() == StatusCode::TOO_MANY_REQUESTS && retries < MAX_RETRIES
398 {
399 retries += 1;
400 sleep(Duration::from_millis(RETRY_DELAY_MS * retries as u64)).await;
401 continue;
402 }
403 return Ok(response);
404 }
405 Err(_) if retries < MAX_RETRIES => {
406 retries += 1;
407 sleep(Duration::from_millis(RETRY_DELAY_MS * retries as u64)).await;
408 }
409 Err(e) => return Err(FrameworkError::Network(e)),
410 }
411 }
412 }
413}
414
415#[async_trait]
416impl DataSource for OpenAlexClient {
417 fn source_id(&self) -> &str {
418 "openalex"
419 }
420
421 async fn fetch_batch(
422 &self,
423 cursor: Option<String>,
424 batch_size: usize,
425 ) -> Result<(Vec<DataRecord>, Option<String>)> {
426 let query = cursor.as_deref().unwrap_or("machine learning");
428 let records = self.fetch_works(query, batch_size).await?;
429 Ok((records, None)) }
431
432 async fn total_count(&self) -> Result<Option<u64>> {
433 Ok(None)
434 }
435
436 async fn health_check(&self) -> Result<bool> {
437 let response = self.client.get(&self.base_url).send().await?;
438 Ok(response.status().is_success())
439 }
440}
441
442#[derive(Debug, Deserialize)]
448struct NoaaResponse {
449 results: Vec<NoaaObservation>,
450}
451
452#[derive(Debug, Deserialize)]
453struct NoaaObservation {
454 station: String,
455 date: String,
456 datatype: String,
457 value: f64,
458 #[serde(default)]
459 attributes: String,
460}
461
462pub struct NoaaClient {
464 client: Client,
465 base_url: String,
466 api_token: Option<String>,
467 rate_limit_delay: Duration,
468 embedder: Arc<SimpleEmbedder>,
469}
470
471impl NoaaClient {
472 pub fn new(api_token: Option<String>) -> Result<Self> {
477 let client = Client::builder()
478 .timeout(Duration::from_secs(30))
479 .build()
480 .map_err(|e| FrameworkError::Network(e))?;
481
482 Ok(Self {
483 client,
484 base_url: "https://www.ncei.noaa.gov/cdo-web/api/v2".to_string(),
485 api_token,
486 rate_limit_delay: Duration::from_millis(200), embedder: Arc::new(SimpleEmbedder::new(128)),
488 })
489 }
490
491 pub async fn fetch_climate_data(
498 &self,
499 station_id: &str,
500 start_date: &str,
501 end_date: &str,
502 ) -> Result<Vec<DataRecord>> {
503 if self.api_token.is_none() {
504 return Ok(self.generate_synthetic_climate_data(station_id, start_date, end_date)?);
506 }
507
508 let url = format!(
509 "{}/data?datasetid=GHCND&stationid={}&startdate={}&enddate={}&limit=1000",
510 self.base_url, station_id, start_date, end_date
511 );
512
513 let mut request = self.client.get(&url);
514 if let Some(token) = &self.api_token {
515 request = request.header("token", token);
516 }
517
518 let response = self.fetch_with_retry(request).await?;
519 let noaa_response: NoaaResponse = response.json().await?;
520
521 let mut records = Vec::new();
522 for observation in noaa_response.results {
523 let record = self.observation_to_record(observation)?;
524 records.push(record);
525 }
526
527 Ok(records)
528 }
529
530 fn generate_synthetic_climate_data(
532 &self,
533 station_id: &str,
534 start_date: &str,
535 _end_date: &str,
536 ) -> Result<Vec<DataRecord>> {
537 let mut records = Vec::new();
538 let datatypes = vec!["TMAX", "TMIN", "PRCP"];
539
540 for (i, datatype) in datatypes.iter().enumerate() {
542 let value = match *datatype {
543 "TMAX" => 250.0 + (i as f64 * 10.0),
544 "TMIN" => 150.0 + (i as f64 * 10.0),
545 "PRCP" => 5.0 + (i as f64),
546 _ => 0.0,
547 };
548
549 let text = format!("{} {} {}", station_id, datatype, value);
550 let embedding = self.embedder.embed_text(&text);
551
552 let mut data_map = serde_json::Map::new();
553 data_map.insert("station".to_string(), serde_json::json!(station_id));
554 data_map.insert("datatype".to_string(), serde_json::json!(datatype));
555 data_map.insert("value".to_string(), serde_json::json!(value));
556 data_map.insert("unit".to_string(), serde_json::json!("tenths"));
557
558 records.push(DataRecord {
559 id: format!("{}_{}_{}_{}", station_id, datatype, start_date, i),
560 source: "noaa".to_string(),
561 record_type: "observation".to_string(),
562 timestamp: Utc::now(),
563 data: serde_json::Value::Object(data_map),
564 embedding: Some(embedding),
565 relationships: Vec::new(),
566 });
567 }
568
569 Ok(records)
570 }
571
572 fn observation_to_record(&self, obs: NoaaObservation) -> Result<DataRecord> {
574 let text = format!("{} {} {}", obs.station, obs.datatype, obs.value);
575 let embedding = self.embedder.embed_text(&text);
576
577 let timestamp = NaiveDate::parse_from_str(&obs.date, "%Y-%m-%dT%H:%M:%S")
579 .or_else(|_| NaiveDate::parse_from_str(&obs.date, "%Y-%m-%d"))
580 .ok()
581 .and_then(|d| d.and_hms_opt(0, 0, 0))
582 .map(|dt| dt.and_utc())
583 .unwrap_or_else(Utc::now);
584
585 let mut data_map = serde_json::Map::new();
586 data_map.insert("station".to_string(), serde_json::json!(obs.station));
587 data_map.insert("datatype".to_string(), serde_json::json!(obs.datatype));
588 data_map.insert("value".to_string(), serde_json::json!(obs.value));
589 data_map.insert("attributes".to_string(), serde_json::json!(obs.attributes));
590
591 Ok(DataRecord {
592 id: format!("{}_{}", obs.station, obs.date),
593 source: "noaa".to_string(),
594 record_type: "observation".to_string(),
595 timestamp,
596 data: serde_json::Value::Object(data_map),
597 embedding: Some(embedding),
598 relationships: Vec::new(),
599 })
600 }
601
602 async fn fetch_with_retry(&self, request: reqwest::RequestBuilder) -> Result<reqwest::Response> {
604 let mut retries = 0;
605 loop {
606 let req = request
607 .try_clone()
608 .ok_or_else(|| FrameworkError::Config("Failed to clone request".to_string()))?;
609
610 match req.send().await {
611 Ok(response) => {
612 if response.status() == StatusCode::TOO_MANY_REQUESTS && retries < MAX_RETRIES
613 {
614 retries += 1;
615 sleep(Duration::from_millis(RETRY_DELAY_MS * retries as u64)).await;
616 continue;
617 }
618 return Ok(response);
619 }
620 Err(_) if retries < MAX_RETRIES => {
621 retries += 1;
622 sleep(Duration::from_millis(RETRY_DELAY_MS * retries as u64)).await;
623 }
624 Err(e) => return Err(FrameworkError::Network(e)),
625 }
626 }
627 }
628}
629
630#[async_trait]
631impl DataSource for NoaaClient {
632 fn source_id(&self) -> &str {
633 "noaa"
634 }
635
636 async fn fetch_batch(
637 &self,
638 _cursor: Option<String>,
639 _batch_size: usize,
640 ) -> Result<(Vec<DataRecord>, Option<String>)> {
641 let records = self
643 .fetch_climate_data("GHCND:USW00094728", "2024-01-01", "2024-01-31")
644 .await?;
645 Ok((records, None))
646 }
647
648 async fn total_count(&self) -> Result<Option<u64>> {
649 Ok(None)
650 }
651
652 async fn health_check(&self) -> Result<bool> {
653 Ok(true) }
655}
656
657#[derive(Debug, Deserialize)]
663struct EdgarFilingData {
664 #[serde(default)]
665 filings: EdgarFilings,
666}
667
668#[derive(Debug, Default, Deserialize)]
669struct EdgarFilings {
670 #[serde(default)]
671 recent: EdgarRecent,
672}
673
674#[derive(Debug, Default, Deserialize)]
675struct EdgarRecent {
676 #[serde(rename = "accessionNumber", default)]
677 accession_number: Vec<String>,
678 #[serde(rename = "filingDate", default)]
679 filing_date: Vec<String>,
680 #[serde(rename = "reportDate", default)]
681 report_date: Vec<String>,
682 #[serde(default)]
683 form: Vec<String>,
684 #[serde(rename = "primaryDocument", default)]
685 primary_document: Vec<String>,
686}
687
688pub struct EdgarClient {
690 client: Client,
691 base_url: String,
692 rate_limit_delay: Duration,
693 embedder: Arc<SimpleEmbedder>,
694 user_agent: String,
695}
696
697impl EdgarClient {
698 pub fn new(user_agent: String) -> Result<Self> {
703 let client = Client::builder()
704 .timeout(Duration::from_secs(30))
705 .user_agent(&user_agent)
706 .build()
707 .map_err(|e| FrameworkError::Network(e))?;
708
709 Ok(Self {
710 client,
711 base_url: "https://data.sec.gov".to_string(),
712 rate_limit_delay: Duration::from_millis(100), embedder: Arc::new(SimpleEmbedder::new(128)),
714 user_agent,
715 })
716 }
717
718 pub async fn fetch_filings(
724 &self,
725 cik: &str,
726 form_type: Option<&str>,
727 ) -> Result<Vec<DataRecord>> {
728 let padded_cik = format!("{:0>10}", cik);
730
731 let url = format!(
732 "{}/submissions/CIK{}.json",
733 self.base_url, padded_cik
734 );
735
736 let response = self.fetch_with_retry(&url).await?;
737 let filing_data: EdgarFilingData = response.json().await?;
738
739 let mut records = Vec::new();
740 let recent = filing_data.filings.recent;
741
742 let count = recent.accession_number.len();
743 for i in 0..count.min(50) {
744 if let Some(filter_form) = form_type {
747 if i < recent.form.len() && recent.form[i] != filter_form {
748 continue;
749 }
750 }
751
752 let filing = EdgarFiling {
753 cik: padded_cik.clone(),
754 accession_number: recent.accession_number.get(i).cloned().unwrap_or_default(),
755 filing_date: recent.filing_date.get(i).cloned().unwrap_or_default(),
756 report_date: recent.report_date.get(i).cloned().unwrap_or_default(),
757 form: recent.form.get(i).cloned().unwrap_or_default(),
758 primary_document: recent.primary_document.get(i).cloned().unwrap_or_default(),
759 };
760
761 let record = self.filing_to_record(filing)?;
762 records.push(record);
763 sleep(self.rate_limit_delay).await;
764 }
765
766 Ok(records)
767 }
768
769 fn filing_to_record(&self, filing: EdgarFiling) -> Result<DataRecord> {
771 let text = format!(
772 "CIK {} Form {} filed on {} report date {}",
773 filing.cik, filing.form, filing.filing_date, filing.report_date
774 );
775 let embedding = self.embedder.embed_text(&text);
776
777 let timestamp = NaiveDate::parse_from_str(&filing.filing_date, "%Y-%m-%d")
779 .ok()
780 .and_then(|d| d.and_hms_opt(0, 0, 0))
781 .map(|dt| dt.and_utc())
782 .unwrap_or_else(Utc::now);
783
784 let mut data_map = serde_json::Map::new();
785 data_map.insert("cik".to_string(), serde_json::json!(filing.cik));
786 data_map.insert(
787 "accession_number".to_string(),
788 serde_json::json!(filing.accession_number),
789 );
790 data_map.insert(
791 "filing_date".to_string(),
792 serde_json::json!(filing.filing_date),
793 );
794 data_map.insert(
795 "report_date".to_string(),
796 serde_json::json!(filing.report_date),
797 );
798 data_map.insert("form".to_string(), serde_json::json!(filing.form));
799 data_map.insert(
800 "primary_document".to_string(),
801 serde_json::json!(filing.primary_document),
802 );
803
804 let filing_url = format!(
806 "https://www.sec.gov/cgi-bin/viewer?action=view&cik={}&accession_number={}&xbrl_type=v",
807 filing.cik, filing.accession_number
808 );
809 data_map.insert("filing_url".to_string(), serde_json::json!(filing_url));
810
811 Ok(DataRecord {
812 id: format!("{}_{}", filing.cik, filing.accession_number),
813 source: "edgar".to_string(),
814 record_type: filing.form,
815 timestamp,
816 data: serde_json::Value::Object(data_map),
817 embedding: Some(embedding),
818 relationships: Vec::new(),
819 })
820 }
821
822 async fn fetch_with_retry(&self, url: &str) -> Result<reqwest::Response> {
824 let mut retries = 0;
825 loop {
826 match self.client.get(url).send().await {
827 Ok(response) => {
828 if response.status() == StatusCode::TOO_MANY_REQUESTS && retries < MAX_RETRIES
829 {
830 retries += 1;
831 sleep(Duration::from_millis(RETRY_DELAY_MS * retries as u64)).await;
832 continue;
833 }
834 return Ok(response);
835 }
836 Err(_) if retries < MAX_RETRIES => {
837 retries += 1;
838 sleep(Duration::from_millis(RETRY_DELAY_MS * retries as u64)).await;
839 }
840 Err(e) => return Err(FrameworkError::Network(e)),
841 }
842 }
843 }
844}
845
/// One filing assembled from the column-wise `EdgarRecent` arrays.
///
/// Derives `Debug` like every other data struct in this module.
#[derive(Debug)]
struct EdgarFiling {
    /// Zero-padded, 10-digit company identifier.
    cik: String,
    accession_number: String,
    /// Filing date, expected as `YYYY-MM-DD`.
    filing_date: String,
    report_date: String,
    /// Form name (e.g. "10-K"); also used as the record type.
    form: String,
    primary_document: String,
}
855
856#[async_trait]
857impl DataSource for EdgarClient {
858 fn source_id(&self) -> &str {
859 "edgar"
860 }
861
862 async fn fetch_batch(
863 &self,
864 cursor: Option<String>,
865 _batch_size: usize,
866 ) -> Result<(Vec<DataRecord>, Option<String>)> {
867 let cik = cursor.as_deref().unwrap_or("320193");
869 let records = self.fetch_filings(cik, None).await?;
870 Ok((records, None))
871 }
872
873 async fn total_count(&self) -> Result<Option<u64>> {
874 Ok(None)
875 }
876
877 async fn health_check(&self) -> Result<bool> {
878 Ok(true)
879 }
880}
881
#[cfg(test)]
mod tests {
    use super::*;

    /// Ordinary text yields a unit-length vector of the requested size.
    #[test]
    fn test_simple_embedder() {
        let vector =
            SimpleEmbedder::new(128).embed_text("machine learning artificial intelligence");
        assert_eq!(vector.len(), 128);

        let norm = vector.iter().map(|x| x * x).sum::<f32>().sqrt();
        assert!((norm - 1.0).abs() < 0.01);
    }

    /// JSON input is flattened and embedded at the configured dimension.
    #[test]
    fn test_embedder_json() {
        let document = serde_json::json!({
            "title": "Test Document",
            "content": "Some interesting content here"
        });
        let vector = SimpleEmbedder::new(64).embed_json(&document);
        assert_eq!(vector.len(), 64);
    }

    #[tokio::test]
    async fn test_openalex_client_creation() {
        assert!(OpenAlexClient::new(Some("test@example.com".to_string())).is_ok());
    }

    #[tokio::test]
    async fn test_noaa_client_creation() {
        assert!(NoaaClient::new(None).is_ok());
    }

    /// Without a token the NOAA client returns synthetic data, so no network is used.
    #[tokio::test]
    async fn test_noaa_synthetic_data() {
        let client = NoaaClient::new(None).unwrap();
        let records = client
            .fetch_climate_data("GHCND:TEST", "2024-01-01", "2024-01-31")
            .await
            .unwrap();

        let first = records.first().expect("synthetic data should not be empty");
        assert_eq!(first.source, "noaa");
        assert!(first.embedding.is_some());
    }

    #[tokio::test]
    async fn test_edgar_client_creation() {
        assert!(EdgarClient::new("test-agent test@example.com".to_string()).is_ok());
    }
}