1use std::collections::HashMap;
10use std::sync::Arc;
11use std::time::Duration;
12
13use async_trait::async_trait;
14use chrono::{NaiveDate, Utc};
15use reqwest::{Client, StatusCode};
16use serde::Deserialize;
17use tokio::time::sleep;
18
19use crate::{DataRecord, DataSource, FrameworkError, Relationship, Result, SimpleEmbedder};
20
/// Default delay, in milliseconds, used by the clients' rate limiting (`rate_limit_delay`).
const DEFAULT_RATE_LIMIT_DELAY_MS: u64 = 100;
/// How many times a failed request is retried after the first attempt.
const MAX_RETRIES: u32 = 3;
/// Base backoff in milliseconds; the wait doubles with every further retry.
const RETRY_DELAY_MS: u64 = 1_000;
/// Length of the vectors produced by the shared `SimpleEmbedder`.
const EMBEDDING_DIMENSION: usize = 128;
26
27#[derive(Debug, Deserialize)]
33struct OpenAlexWorksResponse {
34 results: Vec<OpenAlexWork>,
35 meta: OpenAlexMeta,
36}
37
38#[derive(Debug, Deserialize)]
39struct OpenAlexWork {
40 id: String,
41 #[serde(rename = "display_name")]
42 display_name: Option<String>,
43 publication_date: Option<String>,
44 #[serde(rename = "authorships")]
45 authorships: Option<Vec<OpenAlexAuthorship>>,
46 #[serde(rename = "cited_by_count")]
47 cited_by_count: Option<i64>,
48 #[serde(rename = "abstract_inverted_index")]
49 abstract_inverted_index: Option<HashMap<String, Vec<i32>>>,
50}
51
52#[derive(Debug, Deserialize)]
53struct OpenAlexAuthorship {
54 author: Option<OpenAlexAuthor>,
55}
56
57#[derive(Debug, Deserialize)]
58struct OpenAlexAuthor {
59 id: String,
60 #[serde(rename = "display_name")]
61 display_name: Option<String>,
62}
63
64#[derive(Debug, Deserialize)]
65struct OpenAlexMeta {
66 count: i64,
67}
68
69#[derive(Debug, Deserialize)]
70struct OpenAlexAuthorsResponse {
71 results: Vec<OpenAlexAuthorDetail>,
72}
73
74#[derive(Debug, Deserialize)]
75struct OpenAlexAuthorDetail {
76 id: String,
77 #[serde(rename = "display_name")]
78 display_name: Option<String>,
79 #[serde(rename = "works_count")]
80 works_count: Option<i64>,
81 #[serde(rename = "cited_by_count")]
82 cited_by_count: Option<i64>,
83}
84
85pub struct OpenAlexClient {
87 client: Client,
88 base_url: String,
89 rate_limit_delay: Duration,
90 embedder: Arc<SimpleEmbedder>,
91 user_email: Option<String>,
92}
93
94impl OpenAlexClient {
95 pub fn new(user_email: Option<String>) -> Result<Self> {
100 let client = Client::builder()
101 .timeout(Duration::from_secs(30))
102 .build()
103 .map_err(FrameworkError::Network)?;
104
105 Ok(Self {
106 client,
107 base_url: "https://api.openalex.org".to_string(),
108 rate_limit_delay: Duration::from_millis(DEFAULT_RATE_LIMIT_DELAY_MS),
109 embedder: Arc::new(SimpleEmbedder::new(EMBEDDING_DIMENSION)),
110 user_email,
111 })
112 }
113
114 pub async fn search_works(&self, query: &str, limit: usize) -> Result<Vec<DataRecord>> {
120 let mut url = format!(
121 "{}/works?search={}",
122 self.base_url,
123 urlencoding::encode(query)
124 );
125 url.push_str(&format!("&per-page={}", limit.min(200)));
126
127 if let Some(email) = &self.user_email {
128 url.push_str(&format!("&mailto={}", email));
129 }
130
131 let response = self.fetch_with_retry(&url).await?;
132 let works_response: OpenAlexWorksResponse = response.json().await?;
133
134 let mut records = Vec::new();
135 for work in works_response.results {
136 let record = self.work_to_record(work)?;
137 records.push(record);
138 sleep(self.rate_limit_delay).await;
139 }
140
141 Ok(records)
142 }
143
144 pub async fn get_work(&self, id: &str) -> Result<DataRecord> {
149 let url = format!("{}/works/{}", self.base_url, id);
150 let response = self.fetch_with_retry(&url).await?;
151 let work: OpenAlexWork = response.json().await?;
152 self.work_to_record(work)
153 }
154
155 pub async fn search_authors(&self, query: &str, limit: usize) -> Result<Vec<DataRecord>> {
161 let mut url = format!(
162 "{}/authors?search={}",
163 self.base_url,
164 urlencoding::encode(query)
165 );
166 url.push_str(&format!("&per-page={}", limit.min(200)));
167
168 if let Some(email) = &self.user_email {
169 url.push_str(&format!("&mailto={}", email));
170 }
171
172 let response = self.fetch_with_retry(&url).await?;
173 let authors_response: OpenAlexAuthorsResponse = response.json().await?;
174
175 let mut records = Vec::new();
176 for author in authors_response.results {
177 let record = self.author_to_record(author)?;
178 records.push(record);
179 sleep(self.rate_limit_delay).await;
180 }
181
182 Ok(records)
183 }
184
185 pub async fn get_citations(&self, work_id: &str) -> Result<Vec<DataRecord>> {
190 let url = format!("{}/works?filter=cites:{}", self.base_url, work_id);
191 let response = self.fetch_with_retry(&url).await?;
192 let works_response: OpenAlexWorksResponse = response.json().await?;
193
194 let mut records = Vec::new();
195 for work in works_response.results {
196 let record = self.work_to_record(work)?;
197 records.push(record);
198 sleep(self.rate_limit_delay).await;
199 }
200
201 Ok(records)
202 }
203
204 fn work_to_record(&self, work: OpenAlexWork) -> Result<DataRecord> {
206 let title = work
207 .display_name
208 .unwrap_or_else(|| "Untitled".to_string());
209
210 let abstract_text = work
211 .abstract_inverted_index
212 .as_ref()
213 .map(|index| self.reconstruct_abstract(index))
214 .unwrap_or_default();
215
216 let text = format!("{} {}", title, abstract_text);
217 let embedding = self.embedder.embed_text(&text);
218
219 let timestamp = work
220 .publication_date
221 .as_ref()
222 .and_then(|d| NaiveDate::parse_from_str(d, "%Y-%m-%d").ok())
223 .map(|d| d.and_hms_opt(0, 0, 0).unwrap().and_utc())
224 .unwrap_or_else(Utc::now);
225
226 let mut relationships = Vec::new();
227 if let Some(authorships) = work.authorships {
228 for authorship in authorships {
229 if let Some(author) = authorship.author {
230 relationships.push(Relationship {
231 target_id: author.id,
232 rel_type: "authored_by".to_string(),
233 weight: 1.0,
234 properties: {
235 let mut props = HashMap::new();
236 if let Some(name) = author.display_name {
237 props.insert("author_name".to_string(), serde_json::json!(name));
238 }
239 props
240 },
241 });
242 }
243 }
244 }
245
246 let mut data_map = serde_json::Map::new();
247 data_map.insert("title".to_string(), serde_json::json!(title));
248 data_map.insert("abstract".to_string(), serde_json::json!(abstract_text));
249 if let Some(citations) = work.cited_by_count {
250 data_map.insert("citations".to_string(), serde_json::json!(citations));
251 }
252
253 Ok(DataRecord {
254 id: work.id,
255 source: "openalex".to_string(),
256 record_type: "work".to_string(),
257 timestamp,
258 data: serde_json::Value::Object(data_map),
259 embedding: Some(embedding),
260 relationships,
261 })
262 }
263
264 fn author_to_record(&self, author: OpenAlexAuthorDetail) -> Result<DataRecord> {
266 let name = author
267 .display_name
268 .clone()
269 .unwrap_or_else(|| "Unknown".to_string());
270 let embedding = self.embedder.embed_text(&name);
271
272 let mut data_map = serde_json::Map::new();
273 data_map.insert("display_name".to_string(), serde_json::json!(name));
274 if let Some(works) = author.works_count {
275 data_map.insert("works_count".to_string(), serde_json::json!(works));
276 }
277 if let Some(citations) = author.cited_by_count {
278 data_map.insert("cited_by_count".to_string(), serde_json::json!(citations));
279 }
280
281 Ok(DataRecord {
282 id: author.id,
283 source: "openalex".to_string(),
284 record_type: "author".to_string(),
285 timestamp: Utc::now(),
286 data: serde_json::Value::Object(data_map),
287 embedding: Some(embedding),
288 relationships: Vec::new(),
289 })
290 }
291
292 fn reconstruct_abstract(&self, inverted_index: &HashMap<String, Vec<i32>>) -> String {
294 let mut positions: Vec<(i32, String)> = Vec::new();
295 for (word, indices) in inverted_index {
296 for &pos in indices {
297 positions.push((pos, word.clone()));
298 }
299 }
300 positions.sort_by_key(|&(pos, _)| pos);
301 positions
302 .into_iter()
303 .map(|(_, word)| word)
304 .collect::<Vec<_>>()
305 .join(" ")
306 }
307
308 async fn fetch_with_retry(&self, url: &str) -> Result<reqwest::Response> {
310 let mut retries = 0;
311 loop {
312 match self.client.get(url).send().await {
313 Ok(response) => {
314 if response.status() == StatusCode::TOO_MANY_REQUESTS && retries < MAX_RETRIES
315 {
316 retries += 1;
317 let delay = RETRY_DELAY_MS * 2_u64.pow(retries - 1);
318 sleep(Duration::from_millis(delay)).await;
319 continue;
320 }
321 return Ok(response);
322 }
323 Err(_) if retries < MAX_RETRIES => {
324 retries += 1;
325 let delay = RETRY_DELAY_MS * 2_u64.pow(retries - 1);
326 sleep(Duration::from_millis(delay)).await;
327 }
328 Err(e) => return Err(FrameworkError::Network(e)),
329 }
330 }
331 }
332}
333
334#[async_trait]
335impl DataSource for OpenAlexClient {
336 fn source_id(&self) -> &str {
337 "openalex"
338 }
339
340 async fn fetch_batch(
341 &self,
342 cursor: Option<String>,
343 batch_size: usize,
344 ) -> Result<(Vec<DataRecord>, Option<String>)> {
345 let query = cursor.as_deref().unwrap_or("machine learning");
346 let records = self.search_works(query, batch_size).await?;
347 Ok((records, None))
348 }
349
350 async fn total_count(&self) -> Result<Option<u64>> {
351 Ok(None)
352 }
353
354 async fn health_check(&self) -> Result<bool> {
355 let response = self.client.get(&self.base_url).send().await?;
356 Ok(response.status().is_success())
357 }
358}
359
360#[derive(Debug, Deserialize)]
366struct CoreSearchResponse {
367 results: Vec<CoreWork>,
368 #[serde(rename = "totalHits")]
369 total_hits: Option<i64>,
370}
371
372#[derive(Debug, Deserialize)]
373struct CoreWork {
374 id: String,
375 title: Option<String>,
376 #[serde(rename = "abstract")]
377 abstract_text: Option<String>,
378 authors: Option<Vec<String>>,
379 #[serde(rename = "publishedDate")]
380 published_date: Option<String>,
381 #[serde(rename = "downloadUrl")]
382 download_url: Option<String>,
383 doi: Option<String>,
384}
385
386pub struct CoreClient {
388 client: Client,
389 base_url: String,
390 api_key: Option<String>,
391 rate_limit_delay: Duration,
392 embedder: Arc<SimpleEmbedder>,
393}
394
395impl CoreClient {
396 pub fn new(api_key: Option<String>) -> Result<Self> {
401 let client = Client::builder()
402 .timeout(Duration::from_secs(30))
403 .build()
404 .map_err(FrameworkError::Network)?;
405
406 Ok(Self {
407 client,
408 base_url: "https://api.core.ac.uk/v3".to_string(),
409 api_key,
410 rate_limit_delay: Duration::from_millis(DEFAULT_RATE_LIMIT_DELAY_MS),
411 embedder: Arc::new(SimpleEmbedder::new(EMBEDDING_DIMENSION)),
412 })
413 }
414
415 pub async fn search_works(&self, query: &str, limit: usize) -> Result<Vec<DataRecord>> {
421 if self.api_key.is_none() {
422 return Ok(self.generate_mock_core_data(query, limit)?);
423 }
424
425 let url = format!("{}/search/works", self.base_url);
426 let body = serde_json::json!({
427 "q": query,
428 "limit": limit.min(100),
429 });
430
431 let mut request = self.client.post(&url).json(&body);
432 if let Some(key) = &self.api_key {
433 request = request.header("Authorization", format!("Bearer {}", key));
434 }
435
436 let response = self.fetch_with_retry(request).await?;
437 let search_response: CoreSearchResponse = response.json().await?;
438
439 let mut records = Vec::new();
440 for work in search_response.results {
441 let record = self.work_to_record(work)?;
442 records.push(record);
443 sleep(self.rate_limit_delay).await;
444 }
445
446 Ok(records)
447 }
448
449 pub async fn get_work(&self, id: &str) -> Result<DataRecord> {
454 if self.api_key.is_none() {
455 return Err(FrameworkError::Config(
456 "API key required for get_work".to_string(),
457 ));
458 }
459
460 let url = format!("{}/works/{}", self.base_url, id);
461 let mut request = self.client.get(&url);
462 if let Some(key) = &self.api_key {
463 request = request.header("Authorization", format!("Bearer {}", key));
464 }
465
466 let response = self.fetch_with_retry(request).await?;
467 let work: CoreWork = response.json().await?;
468 self.work_to_record(work)
469 }
470
471 pub async fn search_by_doi(&self, doi: &str) -> Result<Option<DataRecord>> {
476 let records = self.search_works(&format!("doi:{}", doi), 1).await?;
477 Ok(records.into_iter().next())
478 }
479
480 fn generate_mock_core_data(&self, query: &str, limit: usize) -> Result<Vec<DataRecord>> {
482 let mut records = Vec::new();
483 for i in 0..limit.min(5) {
484 let title = format!("Mock CORE paper about {}: Part {}", query, i + 1);
485 let abstract_text = format!(
486 "This is a mock abstract for demonstration. Topic: {}. ID: {}",
487 query,
488 i + 1
489 );
490 let text = format!("{} {}", title, abstract_text);
491 let embedding = self.embedder.embed_text(&text);
492
493 let mut data_map = serde_json::Map::new();
494 data_map.insert("title".to_string(), serde_json::json!(title));
495 data_map.insert("abstract".to_string(), serde_json::json!(abstract_text));
496 data_map.insert("mock".to_string(), serde_json::json!(true));
497
498 records.push(DataRecord {
499 id: format!("mock_core_{}", i),
500 source: "core".to_string(),
501 record_type: "work".to_string(),
502 timestamp: Utc::now(),
503 data: serde_json::Value::Object(data_map),
504 embedding: Some(embedding),
505 relationships: Vec::new(),
506 });
507 }
508 Ok(records)
509 }
510
511 fn work_to_record(&self, work: CoreWork) -> Result<DataRecord> {
513 let title = work.title.unwrap_or_else(|| "Untitled".to_string());
514 let abstract_text = work.abstract_text.unwrap_or_default();
515 let text = format!("{} {}", title, abstract_text);
516 let embedding = self.embedder.embed_text(&text);
517
518 let timestamp = work
519 .published_date
520 .as_ref()
521 .and_then(|d| NaiveDate::parse_from_str(d, "%Y-%m-%d").ok())
522 .map(|d| d.and_hms_opt(0, 0, 0).unwrap().and_utc())
523 .unwrap_or_else(Utc::now);
524
525 let mut data_map = serde_json::Map::new();
526 data_map.insert("title".to_string(), serde_json::json!(title));
527 data_map.insert("abstract".to_string(), serde_json::json!(abstract_text));
528 if let Some(authors) = work.authors {
529 data_map.insert("authors".to_string(), serde_json::json!(authors));
530 }
531 if let Some(doi) = work.doi {
532 data_map.insert("doi".to_string(), serde_json::json!(doi));
533 }
534 if let Some(url) = work.download_url {
535 data_map.insert("download_url".to_string(), serde_json::json!(url));
536 }
537
538 Ok(DataRecord {
539 id: work.id,
540 source: "core".to_string(),
541 record_type: "work".to_string(),
542 timestamp,
543 data: serde_json::Value::Object(data_map),
544 embedding: Some(embedding),
545 relationships: Vec::new(),
546 })
547 }
548
549 async fn fetch_with_retry(&self, request: reqwest::RequestBuilder) -> Result<reqwest::Response> {
551 let mut retries = 0;
552 loop {
553 let req = request
554 .try_clone()
555 .ok_or_else(|| FrameworkError::Config("Failed to clone request".to_string()))?;
556
557 match req.send().await {
558 Ok(response) => {
559 if response.status() == StatusCode::TOO_MANY_REQUESTS && retries < MAX_RETRIES
560 {
561 retries += 1;
562 let delay = RETRY_DELAY_MS * 2_u64.pow(retries - 1);
563 sleep(Duration::from_millis(delay)).await;
564 continue;
565 }
566 return Ok(response);
567 }
568 Err(_) if retries < MAX_RETRIES => {
569 retries += 1;
570 let delay = RETRY_DELAY_MS * 2_u64.pow(retries - 1);
571 sleep(Duration::from_millis(delay)).await;
572 }
573 Err(e) => return Err(FrameworkError::Network(e)),
574 }
575 }
576 }
577}
578
579#[async_trait]
580impl DataSource for CoreClient {
581 fn source_id(&self) -> &str {
582 "core"
583 }
584
585 async fn fetch_batch(
586 &self,
587 cursor: Option<String>,
588 batch_size: usize,
589 ) -> Result<(Vec<DataRecord>, Option<String>)> {
590 let query = cursor.as_deref().unwrap_or("open access");
591 let records = self.search_works(query, batch_size).await?;
592 Ok((records, None))
593 }
594
595 async fn total_count(&self) -> Result<Option<u64>> {
596 Ok(None)
597 }
598
599 async fn health_check(&self) -> Result<bool> {
600 Ok(true)
601 }
602}
603
604#[derive(Debug, Deserialize)]
610struct EricResponse {
611 response: EricResponseData,
612}
613
614#[derive(Debug, Deserialize)]
615struct EricResponseData {
616 docs: Vec<EricDocument>,
617 #[serde(rename = "numFound")]
618 num_found: Option<i64>,
619}
620
621#[derive(Debug, Deserialize)]
622struct EricDocument {
623 id: String,
624 title: Option<Vec<String>>,
625 #[serde(rename = "description")]
626 description: Option<Vec<String>>,
627 author: Option<Vec<String>>,
628 #[serde(rename = "publicationdateyear")]
629 publication_year: Option<i32>,
630 #[serde(rename = "publicationtype")]
631 publication_type: Option<Vec<String>>,
632}
633
634pub struct EricClient {
636 client: Client,
637 base_url: String,
638 rate_limit_delay: Duration,
639 embedder: Arc<SimpleEmbedder>,
640}
641
642impl EricClient {
643 pub fn new() -> Result<Self> {
645 let client = Client::builder()
646 .timeout(Duration::from_secs(30))
647 .build()
648 .map_err(FrameworkError::Network)?;
649
650 Ok(Self {
651 client,
652 base_url: "https://api.ies.ed.gov/eric".to_string(),
653 rate_limit_delay: Duration::from_millis(DEFAULT_RATE_LIMIT_DELAY_MS),
654 embedder: Arc::new(SimpleEmbedder::new(EMBEDDING_DIMENSION)),
655 })
656 }
657
658 pub async fn search(&self, query: &str, limit: usize) -> Result<Vec<DataRecord>> {
664 let url = format!(
665 "{}/?search={}&rows={}&format=json",
666 self.base_url,
667 urlencoding::encode(query),
668 limit.min(100)
669 );
670
671 let response = self.fetch_with_retry(&url).await?;
672 let eric_response: EricResponse = response.json().await?;
673
674 let mut records = Vec::new();
675 for doc in eric_response.response.docs {
676 let record = self.document_to_record(doc)?;
677 records.push(record);
678 sleep(self.rate_limit_delay).await;
679 }
680
681 Ok(records)
682 }
683
684 pub async fn get_document(&self, eric_id: &str) -> Result<DataRecord> {
689 let url = format!("{}/?id={}&format=json", self.base_url, eric_id);
690 let response = self.fetch_with_retry(&url).await?;
691 let eric_response: EricResponse = response.json().await?;
692
693 eric_response
694 .response
695 .docs
696 .into_iter()
697 .next()
698 .ok_or_else(|| FrameworkError::Discovery("Document not found".to_string()))
699 .and_then(|doc| self.document_to_record(doc))
700 }
701
702 fn document_to_record(&self, doc: EricDocument) -> Result<DataRecord> {
704 let title = doc
705 .title
706 .and_then(|t| t.into_iter().next())
707 .unwrap_or_else(|| "Untitled".to_string());
708
709 let description = doc
710 .description
711 .and_then(|d| d.into_iter().next())
712 .unwrap_or_default();
713
714 let text = format!("{} {}", title, description);
715 let embedding = self.embedder.embed_text(&text);
716
717 let timestamp = doc
719 .publication_year
720 .and_then(|year| {
721 NaiveDate::from_ymd_opt(year, 1, 1)
722 .and_then(|d| d.and_hms_opt(0, 0, 0))
723 .map(|dt| dt.and_utc())
724 })
725 .unwrap_or_else(Utc::now);
726
727 let mut data_map = serde_json::Map::new();
728 data_map.insert("title".to_string(), serde_json::json!(title));
729 data_map.insert("description".to_string(), serde_json::json!(description));
730 if let Some(authors) = doc.author {
731 data_map.insert("authors".to_string(), serde_json::json!(authors));
732 }
733 if let Some(year) = doc.publication_year {
734 data_map.insert("publication_year".to_string(), serde_json::json!(year));
735 }
736 if let Some(pub_type) = doc.publication_type {
737 data_map.insert("publication_type".to_string(), serde_json::json!(pub_type));
738 }
739
740 Ok(DataRecord {
741 id: doc.id,
742 source: "eric".to_string(),
743 record_type: "document".to_string(),
744 timestamp,
745 data: serde_json::Value::Object(data_map),
746 embedding: Some(embedding),
747 relationships: Vec::new(),
748 })
749 }
750
751 async fn fetch_with_retry(&self, url: &str) -> Result<reqwest::Response> {
753 let mut retries = 0;
754 loop {
755 match self.client.get(url).send().await {
756 Ok(response) => {
757 if response.status() == StatusCode::TOO_MANY_REQUESTS && retries < MAX_RETRIES
758 {
759 retries += 1;
760 let delay = RETRY_DELAY_MS * 2_u64.pow(retries - 1);
761 sleep(Duration::from_millis(delay)).await;
762 continue;
763 }
764 return Ok(response);
765 }
766 Err(_) if retries < MAX_RETRIES => {
767 retries += 1;
768 let delay = RETRY_DELAY_MS * 2_u64.pow(retries - 1);
769 sleep(Duration::from_millis(delay)).await;
770 }
771 Err(e) => return Err(FrameworkError::Network(e)),
772 }
773 }
774 }
775}
776
777impl Default for EricClient {
778 fn default() -> Self {
779 Self::new().unwrap()
780 }
781}
782
783#[async_trait]
784impl DataSource for EricClient {
785 fn source_id(&self) -> &str {
786 "eric"
787 }
788
789 async fn fetch_batch(
790 &self,
791 cursor: Option<String>,
792 batch_size: usize,
793 ) -> Result<(Vec<DataRecord>, Option<String>)> {
794 let query = cursor.as_deref().unwrap_or("education technology");
795 let records = self.search(query, batch_size).await?;
796 Ok((records, None))
797 }
798
799 async fn total_count(&self) -> Result<Option<u64>> {
800 Ok(None)
801 }
802
803 async fn health_check(&self) -> Result<bool> {
804 let response = self.client.get(&self.base_url).send().await?;
805 Ok(response.status().is_success())
806 }
807}
808
809#[derive(Debug, Deserialize)]
815struct UnpaywallResponse {
816 doi: String,
817 title: Option<String>,
818 #[serde(rename = "is_oa")]
819 is_oa: bool,
820 #[serde(rename = "best_oa_location")]
821 best_oa_location: Option<OaLocation>,
822 #[serde(rename = "published_date")]
823 published_date: Option<String>,
824 #[serde(rename = "journal_name")]
825 journal_name: Option<String>,
826 #[serde(rename = "z_authors")]
827 authors: Option<Vec<UnpaywallAuthor>>,
828}
829
830#[derive(Debug, Deserialize)]
831struct OaLocation {
832 url: Option<String>,
833 #[serde(rename = "url_for_pdf")]
834 url_for_pdf: Option<String>,
835 license: Option<String>,
836}
837
838#[derive(Debug, Deserialize)]
839struct UnpaywallAuthor {
840 family: Option<String>,
841 given: Option<String>,
842}
843
844pub struct UnpaywallClient {
846 client: Client,
847 base_url: String,
848 rate_limit_delay: Duration,
849 embedder: Arc<SimpleEmbedder>,
850}
851
852impl UnpaywallClient {
853 pub fn new() -> Result<Self> {
855 let client = Client::builder()
856 .timeout(Duration::from_secs(30))
857 .build()
858 .map_err(FrameworkError::Network)?;
859
860 Ok(Self {
861 client,
862 base_url: "https://api.unpaywall.org/v2".to_string(),
863 rate_limit_delay: Duration::from_millis(DEFAULT_RATE_LIMIT_DELAY_MS),
864 embedder: Arc::new(SimpleEmbedder::new(EMBEDDING_DIMENSION)),
865 })
866 }
867
868 pub async fn get_by_doi(&self, doi: &str, email: &str) -> Result<DataRecord> {
874 let url = format!("{}/{}?email={}", self.base_url, doi, email);
875 let response = self.fetch_with_retry(&url).await?;
876 let unpaywall_response: UnpaywallResponse = response.json().await?;
877 self.response_to_record(unpaywall_response)
878 }
879
880 pub async fn batch_lookup(&self, dois: &[&str], email: &str) -> Result<Vec<DataRecord>> {
886 let mut records = Vec::new();
887 for doi in dois {
888 match self.get_by_doi(doi, email).await {
889 Ok(record) => records.push(record),
890 Err(e) => {
891 tracing::warn!("Failed to fetch DOI {}: {}", doi, e);
892 continue;
893 }
894 }
895 sleep(self.rate_limit_delay).await;
896 }
897 Ok(records)
898 }
899
900 fn response_to_record(&self, response: UnpaywallResponse) -> Result<DataRecord> {
902 let title = response
903 .title
904 .unwrap_or_else(|| "Untitled".to_string());
905
906 let embedding = self.embedder.embed_text(&title);
907
908 let timestamp = response
909 .published_date
910 .as_ref()
911 .and_then(|d| NaiveDate::parse_from_str(d, "%Y-%m-%d").ok())
912 .map(|d| d.and_hms_opt(0, 0, 0).unwrap().and_utc())
913 .unwrap_or_else(Utc::now);
914
915 let mut data_map = serde_json::Map::new();
916 data_map.insert("doi".to_string(), serde_json::json!(response.doi));
917 data_map.insert("title".to_string(), serde_json::json!(title));
918 data_map.insert("is_oa".to_string(), serde_json::json!(response.is_oa));
919
920 if let Some(location) = response.best_oa_location {
921 if let Some(url) = location.url {
922 data_map.insert("oa_url".to_string(), serde_json::json!(url));
923 }
924 if let Some(pdf) = location.url_for_pdf {
925 data_map.insert("pdf_url".to_string(), serde_json::json!(pdf));
926 }
927 if let Some(license) = location.license {
928 data_map.insert("license".to_string(), serde_json::json!(license));
929 }
930 }
931
932 if let Some(journal) = response.journal_name {
933 data_map.insert("journal".to_string(), serde_json::json!(journal));
934 }
935
936 if let Some(authors) = response.authors {
937 let author_names: Vec<String> = authors
938 .iter()
939 .map(|a| {
940 format!(
941 "{} {}",
942 a.given.as_deref().unwrap_or(""),
943 a.family.as_deref().unwrap_or("")
944 )
945 .trim()
946 .to_string()
947 })
948 .collect();
949 data_map.insert("authors".to_string(), serde_json::json!(author_names));
950 }
951
952 Ok(DataRecord {
953 id: response.doi,
954 source: "unpaywall".to_string(),
955 record_type: "article".to_string(),
956 timestamp,
957 data: serde_json::Value::Object(data_map),
958 embedding: Some(embedding),
959 relationships: Vec::new(),
960 })
961 }
962
963 async fn fetch_with_retry(&self, url: &str) -> Result<reqwest::Response> {
965 let mut retries = 0;
966 loop {
967 match self.client.get(url).send().await {
968 Ok(response) => {
969 if response.status() == StatusCode::TOO_MANY_REQUESTS && retries < MAX_RETRIES
970 {
971 retries += 1;
972 let delay = RETRY_DELAY_MS * 2_u64.pow(retries - 1);
973 sleep(Duration::from_millis(delay)).await;
974 continue;
975 }
976 if response.status() == StatusCode::NOT_FOUND {
977 return Err(FrameworkError::Discovery("DOI not found".to_string()));
978 }
979 return Ok(response);
980 }
981 Err(_) if retries < MAX_RETRIES => {
982 retries += 1;
983 let delay = RETRY_DELAY_MS * 2_u64.pow(retries - 1);
984 sleep(Duration::from_millis(delay)).await;
985 }
986 Err(e) => return Err(FrameworkError::Network(e)),
987 }
988 }
989 }
990}
991
992impl Default for UnpaywallClient {
993 fn default() -> Self {
994 Self::new().unwrap()
995 }
996}
997
998#[async_trait]
999impl DataSource for UnpaywallClient {
1000 fn source_id(&self) -> &str {
1001 "unpaywall"
1002 }
1003
1004 async fn fetch_batch(
1005 &self,
1006 _cursor: Option<String>,
1007 _batch_size: usize,
1008 ) -> Result<(Vec<DataRecord>, Option<String>)> {
1009 Ok((Vec::new(), None))
1011 }
1012
1013 async fn total_count(&self) -> Result<Option<u64>> {
1014 Ok(None)
1015 }
1016
1017 async fn health_check(&self) -> Result<bool> {
1018 Ok(true)
1019 }
1020}
1021
#[cfg(test)]
mod tests {
    use super::*;

    // ---------------------------------------------------------------- OpenAlex

    #[test]
    fn test_openalex_client_creation() {
        let client = OpenAlexClient::new(Some("test@example.com".to_string()));
        assert!(client.is_ok());
        let client = client.unwrap();
        assert_eq!(client.source_id(), "openalex");
    }

    // Hits the live API; run explicitly with `cargo test -- --ignored`.
    #[tokio::test]
    #[ignore = "requires network access to api.openalex.org"]
    async fn test_openalex_health_check() {
        let client = OpenAlexClient::new(None).unwrap();
        let health = client.health_check().await;
        assert!(health.is_ok());
    }

    #[test]
    fn test_openalex_work_to_record() {
        let client = OpenAlexClient::new(None).unwrap();
        let work = OpenAlexWork {
            id: "W123456".to_string(),
            display_name: Some("Test Paper".to_string()),
            publication_date: Some("2024-01-01".to_string()),
            authorships: None,
            cited_by_count: Some(10),
            abstract_inverted_index: None,
        };

        let record = client.work_to_record(work).unwrap();
        assert_eq!(record.id, "W123456");
        assert_eq!(record.source, "openalex");
        assert_eq!(record.record_type, "work");
        assert!(record.embedding.is_some());
        assert_eq!(record.embedding.as_ref().unwrap().len(), EMBEDDING_DIMENSION);
    }

    #[test]
    fn test_openalex_author_to_record() {
        let client = OpenAlexClient::new(None).unwrap();
        let author = OpenAlexAuthorDetail {
            id: "A123456".to_string(),
            display_name: Some("Jane Doe".to_string()),
            works_count: Some(50),
            cited_by_count: Some(500),
        };

        let record = client.author_to_record(author).unwrap();
        assert_eq!(record.id, "A123456");
        assert_eq!(record.source, "openalex");
        assert_eq!(record.record_type, "author");
        assert!(record.embedding.is_some());
    }

    #[test]
    fn test_openalex_reconstruct_abstract() {
        let client = OpenAlexClient::new(None).unwrap();
        let mut inverted_index = HashMap::new();
        inverted_index.insert("machine".to_string(), vec![0]);
        inverted_index.insert("learning".to_string(), vec![1]);
        inverted_index.insert("is".to_string(), vec![2]);
        inverted_index.insert("awesome".to_string(), vec![3]);

        let abstract_text = client.reconstruct_abstract(&inverted_index);
        assert_eq!(abstract_text, "machine learning is awesome");
    }

    #[test]
    fn test_openalex_reconstruct_abstract_empty_and_repeated_words() {
        let client = OpenAlexClient::new(None).unwrap();
        assert_eq!(client.reconstruct_abstract(&HashMap::new()), "");

        // A word occurring at several positions must appear at each of them.
        let mut inverted_index = HashMap::new();
        inverted_index.insert("the".to_string(), vec![0, 3]);
        inverted_index.insert("cat".to_string(), vec![1]);
        inverted_index.insert("saw".to_string(), vec![2]);
        inverted_index.insert("dog".to_string(), vec![4]);
        assert_eq!(
            client.reconstruct_abstract(&inverted_index),
            "the cat saw the dog"
        );
    }

    // ---------------------------------------------------------------- CORE

    #[test]
    fn test_core_client_creation() {
        let client = CoreClient::new(None);
        assert!(client.is_ok());
        let client = client.unwrap();
        assert_eq!(client.source_id(), "core");
    }

    #[tokio::test]
    async fn test_core_mock_data() {
        let client = CoreClient::new(None).unwrap();
        let records = client.search_works("test query", 3).await.unwrap();
        assert_eq!(records.len(), 3);
        assert_eq!(records[0].source, "core");
        assert!(records[0].embedding.is_some());

        // Mock records must be distinguishable from real ones.
        let mock_flag = records[0].data.get("mock");
        assert!(mock_flag.is_some());
        assert_eq!(mock_flag.unwrap(), &serde_json::json!(true));
    }

    #[tokio::test]
    async fn test_core_mock_data_limits() {
        let client = CoreClient::new(None).unwrap();
        let capped = client.search_works("capped", 50).await.unwrap();
        assert_eq!(capped.len(), 5);
        let empty = client.search_works("empty", 0).await.unwrap();
        assert!(empty.is_empty());
    }

    #[test]
    fn test_core_work_to_record() {
        let client = CoreClient::new(None).unwrap();
        let work = CoreWork {
            id: "123456".to_string(),
            title: Some("Test Paper".to_string()),
            abstract_text: Some("This is a test abstract".to_string()),
            authors: Some(vec!["John Doe".to_string(), "Jane Smith".to_string()]),
            published_date: Some("2024-01-15".to_string()),
            download_url: Some("https://example.com/paper.pdf".to_string()),
            doi: Some("10.1234/test".to_string()),
        };

        let record = client.work_to_record(work).unwrap();
        assert_eq!(record.id, "123456");
        assert_eq!(record.source, "core");
        assert!(record.embedding.is_some());
        assert_eq!(record.embedding.as_ref().unwrap().len(), EMBEDDING_DIMENSION);

        assert_eq!(
            record.data.get("title").unwrap(),
            &serde_json::json!("Test Paper")
        );
        assert_eq!(
            record.data.get("doi").unwrap(),
            &serde_json::json!("10.1234/test")
        );
    }

    #[tokio::test]
    async fn test_core_health_check() {
        let client = CoreClient::new(None).unwrap();
        let health = client.health_check().await;
        assert!(health.is_ok());
        assert!(health.unwrap());
    }

    // ---------------------------------------------------------------- ERIC

    #[test]
    fn test_eric_client_creation() {
        let client = EricClient::new();
        assert!(client.is_ok());
        let client = client.unwrap();
        assert_eq!(client.source_id(), "eric");
    }

    #[test]
    fn test_eric_default() {
        let client = EricClient::default();
        assert_eq!(client.source_id(), "eric");
    }

    #[test]
    fn test_eric_document_to_record() {
        let client = EricClient::new().unwrap();
        let doc = EricDocument {
            id: "ED123456".to_string(),
            title: Some(vec!["Educational Technology in Schools".to_string()]),
            description: Some(vec!["A study on technology adoption".to_string()]),
            author: Some(vec!["Smith, John".to_string()]),
            publication_year: Some(2023),
            publication_type: Some(vec!["Journal Article".to_string()]),
        };

        let record = client.document_to_record(doc).unwrap();
        assert_eq!(record.id, "ED123456");
        assert_eq!(record.source, "eric");
        assert_eq!(record.record_type, "document");
        assert!(record.embedding.is_some());

        assert_eq!(
            record.data.get("publication_year").unwrap(),
            &serde_json::json!(2023)
        );
    }

    #[test]
    fn test_eric_document_to_record_defaults() {
        let client = EricClient::new().unwrap();
        let doc = EricDocument {
            id: "ED1".to_string(),
            title: None,
            description: None,
            author: None,
            publication_year: None,
            publication_type: None,
        };

        let record = client.document_to_record(doc).unwrap();
        assert_eq!(
            record.data.get("title").unwrap(),
            &serde_json::json!("Untitled")
        );
        assert_eq!(
            record.data.get("description").unwrap(),
            &serde_json::json!("")
        );
        assert!(record.data.get("authors").is_none());
        assert!(record.data.get("publication_year").is_none());
    }

    // Hits the live API; run explicitly with `cargo test -- --ignored`.
    #[tokio::test]
    #[ignore = "requires network access to api.ies.ed.gov"]
    async fn test_eric_health_check() {
        let client = EricClient::new().unwrap();
        let health = client.health_check().await;
        assert!(health.is_ok());
    }

    // ---------------------------------------------------------------- Unpaywall

    #[test]
    fn test_unpaywall_client_creation() {
        let client = UnpaywallClient::new();
        assert!(client.is_ok());
        let client = client.unwrap();
        assert_eq!(client.source_id(), "unpaywall");
    }

    #[test]
    fn test_unpaywall_default() {
        let client = UnpaywallClient::default();
        assert_eq!(client.source_id(), "unpaywall");
    }

    #[test]
    fn test_unpaywall_response_to_record() {
        let client = UnpaywallClient::new().unwrap();
        let response = UnpaywallResponse {
            doi: "10.1234/test".to_string(),
            title: Some("Open Access Paper".to_string()),
            is_oa: true,
            best_oa_location: Some(OaLocation {
                url: Some("https://example.com/paper".to_string()),
                url_for_pdf: Some("https://example.com/paper.pdf".to_string()),
                license: Some("CC-BY".to_string()),
            }),
            published_date: Some("2024-01-01".to_string()),
            journal_name: Some("Test Journal".to_string()),
            authors: Some(vec![
                UnpaywallAuthor {
                    family: Some("Doe".to_string()),
                    given: Some("John".to_string()),
                },
                UnpaywallAuthor {
                    family: Some("Smith".to_string()),
                    given: Some("Jane".to_string()),
                },
            ]),
        };

        let record = client.response_to_record(response).unwrap();
        assert_eq!(record.id, "10.1234/test");
        assert_eq!(record.source, "unpaywall");
        assert_eq!(record.record_type, "article");
        assert!(record.embedding.is_some());

        assert_eq!(record.data.get("is_oa").unwrap(), &serde_json::json!(true));
        assert_eq!(
            record.data.get("license").unwrap(),
            &serde_json::json!("CC-BY")
        );

        let authors = record.data.get("authors").unwrap();
        assert!(authors.is_array());
        let author_array = authors.as_array().unwrap();
        assert_eq!(author_array.len(), 2);
    }

    #[test]
    fn test_unpaywall_response_to_record_minimal() {
        let client = UnpaywallClient::new().unwrap();
        let response = UnpaywallResponse {
            doi: "10.1/x".to_string(),
            title: None,
            is_oa: false,
            best_oa_location: None,
            published_date: None,
            journal_name: None,
            authors: None,
        };

        let record = client.response_to_record(response).unwrap();
        assert_eq!(record.id, "10.1/x");
        assert_eq!(
            record.data.get("title").unwrap(),
            &serde_json::json!("Untitled")
        );
        assert_eq!(record.data.get("is_oa").unwrap(), &serde_json::json!(false));
        assert!(record.data.get("license").is_none());
        assert!(record.data.get("authors").is_none());
    }

    #[tokio::test]
    async fn test_unpaywall_health_check() {
        let client = UnpaywallClient::new().unwrap();
        let health = client.health_check().await;
        assert!(health.is_ok());
        assert!(health.unwrap());
    }

    #[tokio::test]
    async fn test_unpaywall_batch_lookup_empty() {
        let client = UnpaywallClient::new().unwrap();
        let records = client.batch_lookup(&[], "test@example.com").await.unwrap();
        assert_eq!(records.len(), 0);
    }

    // ---------------------------------------------------------------- Shared

    #[tokio::test]
    async fn test_all_clients_datasource_trait() {
        let openalex = OpenAlexClient::new(None).unwrap();
        let core = CoreClient::new(None).unwrap();
        let eric = EricClient::new().unwrap();
        let unpaywall = UnpaywallClient::new().unwrap();

        assert_eq!(openalex.source_id(), "openalex");
        assert_eq!(core.source_id(), "core");
        assert_eq!(eric.source_id(), "eric");
        assert_eq!(unpaywall.source_id(), "unpaywall");
    }

    #[test]
    fn test_embedding_dimensions() {
        let embedder = SimpleEmbedder::new(EMBEDDING_DIMENSION);
        let embedding = embedder.embed_text("test text");
        assert_eq!(embedding.len(), EMBEDDING_DIMENSION);

        // Embeddings are expected to be unit-normalized.
        let magnitude: f32 = embedding.iter().map(|x| x * x).sum::<f32>().sqrt();
        assert!((magnitude - 1.0).abs() < 0.01);
    }

    #[test]
    fn test_retry_exponential_backoff() {
        let base_delay = RETRY_DELAY_MS;
        assert_eq!(base_delay * 2_u64.pow(0), 1000);
        assert_eq!(base_delay * 2_u64.pow(1), 2000);
        assert_eq!(base_delay * 2_u64.pow(2), 4000);
    }
}