ruvector_data_framework/
academic_clients.rs

1//! Academic & Research API clients for scholarly data discovery
2//!
3//! This module provides async clients for fetching data from academic databases:
4//! - OpenAlex: Scholarly works and citations
5//! - CORE: Open access research papers
6//! - ERIC: Education research database
7//! - Unpaywall: Open access paper discovery
8
9use std::collections::HashMap;
10use std::sync::Arc;
11use std::time::Duration;
12
13use async_trait::async_trait;
14use chrono::{NaiveDate, Utc};
15use reqwest::{Client, StatusCode};
16use serde::Deserialize;
17use tokio::time::sleep;
18
19use crate::{DataRecord, DataSource, FrameworkError, Relationship, Result, SimpleEmbedder};
20
/// Default value, in milliseconds, for each client's `rate_limit_delay`.
const DEFAULT_RATE_LIMIT_DELAY_MS: u64 = 100;
/// Maximum number of retries performed by the `fetch_with_retry` helpers.
const MAX_RETRIES: u32 = 3;
/// Base back-off delay in milliseconds; it is doubled on every further retry.
const RETRY_DELAY_MS: u64 = 1_000;
/// Dimension handed to `SimpleEmbedder::new` by every client constructor.
const EMBEDDING_DIMENSION: usize = 128;
26
27// ============================================================================
28// OpenAlex Client (Extended)
29// ============================================================================
30
/// Envelope deserialized from OpenAlex responses that list works
/// (used by `search_works` and `get_citations`).
#[derive(Debug, Deserialize)]
struct OpenAlexWorksResponse {
    /// Works contained in the response.
    results: Vec<OpenAlexWork>,
    /// Response metadata; must be present for deserialization to succeed.
    meta: OpenAlexMeta,
}
37
38#[derive(Debug, Deserialize)]
39struct OpenAlexWork {
40    id: String,
41    #[serde(rename = "display_name")]
42    display_name: Option<String>,
43    publication_date: Option<String>,
44    #[serde(rename = "authorships")]
45    authorships: Option<Vec<OpenAlexAuthorship>>,
46    #[serde(rename = "cited_by_count")]
47    cited_by_count: Option<i64>,
48    #[serde(rename = "abstract_inverted_index")]
49    abstract_inverted_index: Option<HashMap<String, Vec<i32>>>,
50}
51
/// One authorship entry of an OpenAlex work.
#[derive(Debug, Deserialize)]
struct OpenAlexAuthorship {
    /// The author referenced by this entry; entries without one are ignored
    /// when relationships are built.
    author: Option<OpenAlexAuthor>,
}
56
57#[derive(Debug, Deserialize)]
58struct OpenAlexAuthor {
59    id: String,
60    #[serde(rename = "display_name")]
61    display_name: Option<String>,
62}
63
/// `meta` object of an OpenAlex works response.
#[derive(Debug, Deserialize)]
struct OpenAlexMeta {
    /// Deserialized from the `count` key.
    // NOTE(review): presumably the total number of matches — confirm against the API docs.
    count: i64,
}
68
/// Envelope deserialized from the OpenAlex author search response
/// (used by `search_authors`).
#[derive(Debug, Deserialize)]
struct OpenAlexAuthorsResponse {
    /// Authors contained in the response.
    results: Vec<OpenAlexAuthorDetail>,
}
73
74#[derive(Debug, Deserialize)]
75struct OpenAlexAuthorDetail {
76    id: String,
77    #[serde(rename = "display_name")]
78    display_name: Option<String>,
79    #[serde(rename = "works_count")]
80    works_count: Option<i64>,
81    #[serde(rename = "cited_by_count")]
82    cited_by_count: Option<i64>,
83}
84
/// Client for OpenAlex scholarly database
pub struct OpenAlexClient {
    /// HTTP client used for every request.
    client: Client,
    /// Root URL that endpoint paths are appended to.
    base_url: String,
    /// Pause inserted by the fetching methods to throttle API usage.
    rate_limit_delay: Duration,
    /// Embedder used to turn record text into vectors.
    embedder: Arc<SimpleEmbedder>,
    /// Optional contact e-mail sent as the `mailto` query parameter.
    user_email: Option<String>,
}
93
94impl OpenAlexClient {
95    /// Create a new OpenAlex client
96    ///
97    /// # Arguments
98    /// * `user_email` - Email for polite API usage (optional but recommended)
99    pub fn new(user_email: Option<String>) -> Result<Self> {
100        let client = Client::builder()
101            .timeout(Duration::from_secs(30))
102            .build()
103            .map_err(FrameworkError::Network)?;
104
105        Ok(Self {
106            client,
107            base_url: "https://api.openalex.org".to_string(),
108            rate_limit_delay: Duration::from_millis(DEFAULT_RATE_LIMIT_DELAY_MS),
109            embedder: Arc::new(SimpleEmbedder::new(EMBEDDING_DIMENSION)),
110            user_email,
111        })
112    }
113
114    /// Search scholarly works
115    ///
116    /// # Arguments
117    /// * `query` - Search query string
118    /// * `limit` - Maximum number of results (max 200 per request)
119    pub async fn search_works(&self, query: &str, limit: usize) -> Result<Vec<DataRecord>> {
120        let mut url = format!(
121            "{}/works?search={}",
122            self.base_url,
123            urlencoding::encode(query)
124        );
125        url.push_str(&format!("&per-page={}", limit.min(200)));
126
127        if let Some(email) = &self.user_email {
128            url.push_str(&format!("&mailto={}", email));
129        }
130
131        let response = self.fetch_with_retry(&url).await?;
132        let works_response: OpenAlexWorksResponse = response.json().await?;
133
134        let mut records = Vec::new();
135        for work in works_response.results {
136            let record = self.work_to_record(work)?;
137            records.push(record);
138            sleep(self.rate_limit_delay).await;
139        }
140
141        Ok(records)
142    }
143
144    /// Get work by OpenAlex ID
145    ///
146    /// # Arguments
147    /// * `id` - OpenAlex work ID (e.g., "W2741809807")
148    pub async fn get_work(&self, id: &str) -> Result<DataRecord> {
149        let url = format!("{}/works/{}", self.base_url, id);
150        let response = self.fetch_with_retry(&url).await?;
151        let work: OpenAlexWork = response.json().await?;
152        self.work_to_record(work)
153    }
154
155    /// Search authors
156    ///
157    /// # Arguments
158    /// * `query` - Author name or affiliation
159    /// * `limit` - Maximum number of results
160    pub async fn search_authors(&self, query: &str, limit: usize) -> Result<Vec<DataRecord>> {
161        let mut url = format!(
162            "{}/authors?search={}",
163            self.base_url,
164            urlencoding::encode(query)
165        );
166        url.push_str(&format!("&per-page={}", limit.min(200)));
167
168        if let Some(email) = &self.user_email {
169            url.push_str(&format!("&mailto={}", email));
170        }
171
172        let response = self.fetch_with_retry(&url).await?;
173        let authors_response: OpenAlexAuthorsResponse = response.json().await?;
174
175        let mut records = Vec::new();
176        for author in authors_response.results {
177            let record = self.author_to_record(author)?;
178            records.push(record);
179            sleep(self.rate_limit_delay).await;
180        }
181
182        Ok(records)
183    }
184
185    /// Get citing works for a given work ID
186    ///
187    /// # Arguments
188    /// * `work_id` - OpenAlex work ID
189    pub async fn get_citations(&self, work_id: &str) -> Result<Vec<DataRecord>> {
190        let url = format!("{}/works?filter=cites:{}", self.base_url, work_id);
191        let response = self.fetch_with_retry(&url).await?;
192        let works_response: OpenAlexWorksResponse = response.json().await?;
193
194        let mut records = Vec::new();
195        for work in works_response.results {
196            let record = self.work_to_record(work)?;
197            records.push(record);
198            sleep(self.rate_limit_delay).await;
199        }
200
201        Ok(records)
202    }
203
204    /// Convert OpenAlex work to DataRecord
205    fn work_to_record(&self, work: OpenAlexWork) -> Result<DataRecord> {
206        let title = work
207            .display_name
208            .unwrap_or_else(|| "Untitled".to_string());
209
210        let abstract_text = work
211            .abstract_inverted_index
212            .as_ref()
213            .map(|index| self.reconstruct_abstract(index))
214            .unwrap_or_default();
215
216        let text = format!("{} {}", title, abstract_text);
217        let embedding = self.embedder.embed_text(&text);
218
219        let timestamp = work
220            .publication_date
221            .as_ref()
222            .and_then(|d| NaiveDate::parse_from_str(d, "%Y-%m-%d").ok())
223            .map(|d| d.and_hms_opt(0, 0, 0).unwrap().and_utc())
224            .unwrap_or_else(Utc::now);
225
226        let mut relationships = Vec::new();
227        if let Some(authorships) = work.authorships {
228            for authorship in authorships {
229                if let Some(author) = authorship.author {
230                    relationships.push(Relationship {
231                        target_id: author.id,
232                        rel_type: "authored_by".to_string(),
233                        weight: 1.0,
234                        properties: {
235                            let mut props = HashMap::new();
236                            if let Some(name) = author.display_name {
237                                props.insert("author_name".to_string(), serde_json::json!(name));
238                            }
239                            props
240                        },
241                    });
242                }
243            }
244        }
245
246        let mut data_map = serde_json::Map::new();
247        data_map.insert("title".to_string(), serde_json::json!(title));
248        data_map.insert("abstract".to_string(), serde_json::json!(abstract_text));
249        if let Some(citations) = work.cited_by_count {
250            data_map.insert("citations".to_string(), serde_json::json!(citations));
251        }
252
253        Ok(DataRecord {
254            id: work.id,
255            source: "openalex".to_string(),
256            record_type: "work".to_string(),
257            timestamp,
258            data: serde_json::Value::Object(data_map),
259            embedding: Some(embedding),
260            relationships,
261        })
262    }
263
264    /// Convert author to DataRecord
265    fn author_to_record(&self, author: OpenAlexAuthorDetail) -> Result<DataRecord> {
266        let name = author
267            .display_name
268            .clone()
269            .unwrap_or_else(|| "Unknown".to_string());
270        let embedding = self.embedder.embed_text(&name);
271
272        let mut data_map = serde_json::Map::new();
273        data_map.insert("display_name".to_string(), serde_json::json!(name));
274        if let Some(works) = author.works_count {
275            data_map.insert("works_count".to_string(), serde_json::json!(works));
276        }
277        if let Some(citations) = author.cited_by_count {
278            data_map.insert("cited_by_count".to_string(), serde_json::json!(citations));
279        }
280
281        Ok(DataRecord {
282            id: author.id,
283            source: "openalex".to_string(),
284            record_type: "author".to_string(),
285            timestamp: Utc::now(),
286            data: serde_json::Value::Object(data_map),
287            embedding: Some(embedding),
288            relationships: Vec::new(),
289        })
290    }
291
292    /// Reconstruct abstract from inverted index
293    fn reconstruct_abstract(&self, inverted_index: &HashMap<String, Vec<i32>>) -> String {
294        let mut positions: Vec<(i32, String)> = Vec::new();
295        for (word, indices) in inverted_index {
296            for &pos in indices {
297                positions.push((pos, word.clone()));
298            }
299        }
300        positions.sort_by_key(|&(pos, _)| pos);
301        positions
302            .into_iter()
303            .map(|(_, word)| word)
304            .collect::<Vec<_>>()
305            .join(" ")
306    }
307
308    /// Fetch with retry logic and exponential backoff
309    async fn fetch_with_retry(&self, url: &str) -> Result<reqwest::Response> {
310        let mut retries = 0;
311        loop {
312            match self.client.get(url).send().await {
313                Ok(response) => {
314                    if response.status() == StatusCode::TOO_MANY_REQUESTS && retries < MAX_RETRIES
315                    {
316                        retries += 1;
317                        let delay = RETRY_DELAY_MS * 2_u64.pow(retries - 1);
318                        sleep(Duration::from_millis(delay)).await;
319                        continue;
320                    }
321                    return Ok(response);
322                }
323                Err(_) if retries < MAX_RETRIES => {
324                    retries += 1;
325                    let delay = RETRY_DELAY_MS * 2_u64.pow(retries - 1);
326                    sleep(Duration::from_millis(delay)).await;
327                }
328                Err(e) => return Err(FrameworkError::Network(e)),
329            }
330        }
331    }
332}
333
334#[async_trait]
335impl DataSource for OpenAlexClient {
336    fn source_id(&self) -> &str {
337        "openalex"
338    }
339
340    async fn fetch_batch(
341        &self,
342        cursor: Option<String>,
343        batch_size: usize,
344    ) -> Result<(Vec<DataRecord>, Option<String>)> {
345        let query = cursor.as_deref().unwrap_or("machine learning");
346        let records = self.search_works(query, batch_size).await?;
347        Ok((records, None))
348    }
349
350    async fn total_count(&self) -> Result<Option<u64>> {
351        Ok(None)
352    }
353
354    async fn health_check(&self) -> Result<bool> {
355        let response = self.client.get(&self.base_url).send().await?;
356        Ok(response.status().is_success())
357    }
358}
359
360// ============================================================================
361// CORE Client
362// ============================================================================
363
364/// CORE API response structures
365#[derive(Debug, Deserialize)]
366struct CoreSearchResponse {
367    results: Vec<CoreWork>,
368    #[serde(rename = "totalHits")]
369    total_hits: Option<i64>,
370}
371
372#[derive(Debug, Deserialize)]
373struct CoreWork {
374    id: String,
375    title: Option<String>,
376    #[serde(rename = "abstract")]
377    abstract_text: Option<String>,
378    authors: Option<Vec<String>>,
379    #[serde(rename = "publishedDate")]
380    published_date: Option<String>,
381    #[serde(rename = "downloadUrl")]
382    download_url: Option<String>,
383    doi: Option<String>,
384}
385
/// Client for CORE open access papers
pub struct CoreClient {
    /// HTTP client used for every request.
    client: Client,
    /// Root URL that endpoint paths are appended to.
    base_url: String,
    /// API key sent as a bearer token. Without it `search_works` returns mock
    /// data and `get_work` fails.
    api_key: Option<String>,
    /// Pause inserted by the fetching methods to throttle API usage.
    rate_limit_delay: Duration,
    /// Embedder used to turn record text into vectors.
    embedder: Arc<SimpleEmbedder>,
}
394
395impl CoreClient {
396    /// Create a new CORE client
397    ///
398    /// # Arguments
399    /// * `api_key` - CORE API key (from https://core.ac.uk/services/api)
400    pub fn new(api_key: Option<String>) -> Result<Self> {
401        let client = Client::builder()
402            .timeout(Duration::from_secs(30))
403            .build()
404            .map_err(FrameworkError::Network)?;
405
406        Ok(Self {
407            client,
408            base_url: "https://api.core.ac.uk/v3".to_string(),
409            api_key,
410            rate_limit_delay: Duration::from_millis(DEFAULT_RATE_LIMIT_DELAY_MS),
411            embedder: Arc::new(SimpleEmbedder::new(EMBEDDING_DIMENSION)),
412        })
413    }
414
415    /// Search open access works
416    ///
417    /// # Arguments
418    /// * `query` - Search query string
419    /// * `limit` - Maximum number of results
420    pub async fn search_works(&self, query: &str, limit: usize) -> Result<Vec<DataRecord>> {
421        if self.api_key.is_none() {
422            return Ok(self.generate_mock_core_data(query, limit)?);
423        }
424
425        let url = format!("{}/search/works", self.base_url);
426        let body = serde_json::json!({
427            "q": query,
428            "limit": limit.min(100),
429        });
430
431        let mut request = self.client.post(&url).json(&body);
432        if let Some(key) = &self.api_key {
433            request = request.header("Authorization", format!("Bearer {}", key));
434        }
435
436        let response = self.fetch_with_retry(request).await?;
437        let search_response: CoreSearchResponse = response.json().await?;
438
439        let mut records = Vec::new();
440        for work in search_response.results {
441            let record = self.work_to_record(work)?;
442            records.push(record);
443            sleep(self.rate_limit_delay).await;
444        }
445
446        Ok(records)
447    }
448
449    /// Get work by CORE ID
450    ///
451    /// # Arguments
452    /// * `id` - CORE work ID
453    pub async fn get_work(&self, id: &str) -> Result<DataRecord> {
454        if self.api_key.is_none() {
455            return Err(FrameworkError::Config(
456                "API key required for get_work".to_string(),
457            ));
458        }
459
460        let url = format!("{}/works/{}", self.base_url, id);
461        let mut request = self.client.get(&url);
462        if let Some(key) = &self.api_key {
463            request = request.header("Authorization", format!("Bearer {}", key));
464        }
465
466        let response = self.fetch_with_retry(request).await?;
467        let work: CoreWork = response.json().await?;
468        self.work_to_record(work)
469    }
470
471    /// Search by DOI
472    ///
473    /// # Arguments
474    /// * `doi` - Digital Object Identifier
475    pub async fn search_by_doi(&self, doi: &str) -> Result<Option<DataRecord>> {
476        let records = self.search_works(&format!("doi:{}", doi), 1).await?;
477        Ok(records.into_iter().next())
478    }
479
480    /// Generate mock CORE data when API key is missing
481    fn generate_mock_core_data(&self, query: &str, limit: usize) -> Result<Vec<DataRecord>> {
482        let mut records = Vec::new();
483        for i in 0..limit.min(5) {
484            let title = format!("Mock CORE paper about {}: Part {}", query, i + 1);
485            let abstract_text = format!(
486                "This is a mock abstract for demonstration. Topic: {}. ID: {}",
487                query,
488                i + 1
489            );
490            let text = format!("{} {}", title, abstract_text);
491            let embedding = self.embedder.embed_text(&text);
492
493            let mut data_map = serde_json::Map::new();
494            data_map.insert("title".to_string(), serde_json::json!(title));
495            data_map.insert("abstract".to_string(), serde_json::json!(abstract_text));
496            data_map.insert("mock".to_string(), serde_json::json!(true));
497
498            records.push(DataRecord {
499                id: format!("mock_core_{}", i),
500                source: "core".to_string(),
501                record_type: "work".to_string(),
502                timestamp: Utc::now(),
503                data: serde_json::Value::Object(data_map),
504                embedding: Some(embedding),
505                relationships: Vec::new(),
506            });
507        }
508        Ok(records)
509    }
510
511    /// Convert CORE work to DataRecord
512    fn work_to_record(&self, work: CoreWork) -> Result<DataRecord> {
513        let title = work.title.unwrap_or_else(|| "Untitled".to_string());
514        let abstract_text = work.abstract_text.unwrap_or_default();
515        let text = format!("{} {}", title, abstract_text);
516        let embedding = self.embedder.embed_text(&text);
517
518        let timestamp = work
519            .published_date
520            .as_ref()
521            .and_then(|d| NaiveDate::parse_from_str(d, "%Y-%m-%d").ok())
522            .map(|d| d.and_hms_opt(0, 0, 0).unwrap().and_utc())
523            .unwrap_or_else(Utc::now);
524
525        let mut data_map = serde_json::Map::new();
526        data_map.insert("title".to_string(), serde_json::json!(title));
527        data_map.insert("abstract".to_string(), serde_json::json!(abstract_text));
528        if let Some(authors) = work.authors {
529            data_map.insert("authors".to_string(), serde_json::json!(authors));
530        }
531        if let Some(doi) = work.doi {
532            data_map.insert("doi".to_string(), serde_json::json!(doi));
533        }
534        if let Some(url) = work.download_url {
535            data_map.insert("download_url".to_string(), serde_json::json!(url));
536        }
537
538        Ok(DataRecord {
539            id: work.id,
540            source: "core".to_string(),
541            record_type: "work".to_string(),
542            timestamp,
543            data: serde_json::Value::Object(data_map),
544            embedding: Some(embedding),
545            relationships: Vec::new(),
546        })
547    }
548
549    /// Fetch with retry logic
550    async fn fetch_with_retry(&self, request: reqwest::RequestBuilder) -> Result<reqwest::Response> {
551        let mut retries = 0;
552        loop {
553            let req = request
554                .try_clone()
555                .ok_or_else(|| FrameworkError::Config("Failed to clone request".to_string()))?;
556
557            match req.send().await {
558                Ok(response) => {
559                    if response.status() == StatusCode::TOO_MANY_REQUESTS && retries < MAX_RETRIES
560                    {
561                        retries += 1;
562                        let delay = RETRY_DELAY_MS * 2_u64.pow(retries - 1);
563                        sleep(Duration::from_millis(delay)).await;
564                        continue;
565                    }
566                    return Ok(response);
567                }
568                Err(_) if retries < MAX_RETRIES => {
569                    retries += 1;
570                    let delay = RETRY_DELAY_MS * 2_u64.pow(retries - 1);
571                    sleep(Duration::from_millis(delay)).await;
572                }
573                Err(e) => return Err(FrameworkError::Network(e)),
574            }
575        }
576    }
577}
578
579#[async_trait]
580impl DataSource for CoreClient {
581    fn source_id(&self) -> &str {
582        "core"
583    }
584
585    async fn fetch_batch(
586        &self,
587        cursor: Option<String>,
588        batch_size: usize,
589    ) -> Result<(Vec<DataRecord>, Option<String>)> {
590        let query = cursor.as_deref().unwrap_or("open access");
591        let records = self.search_works(query, batch_size).await?;
592        Ok((records, None))
593    }
594
595    async fn total_count(&self) -> Result<Option<u64>> {
596        Ok(None)
597    }
598
599    async fn health_check(&self) -> Result<bool> {
600        Ok(true)
601    }
602}
603
604// ============================================================================
605// ERIC Client
606// ============================================================================
607
/// Top-level object of an ERIC API response.
#[derive(Debug, Deserialize)]
struct EricResponse {
    /// Payload found under the `response` key.
    response: EricResponseData,
}
613
614#[derive(Debug, Deserialize)]
615struct EricResponseData {
616    docs: Vec<EricDocument>,
617    #[serde(rename = "numFound")]
618    num_found: Option<i64>,
619}
620
621#[derive(Debug, Deserialize)]
622struct EricDocument {
623    id: String,
624    title: Option<Vec<String>>,
625    #[serde(rename = "description")]
626    description: Option<Vec<String>>,
627    author: Option<Vec<String>>,
628    #[serde(rename = "publicationdateyear")]
629    publication_year: Option<i32>,
630    #[serde(rename = "publicationtype")]
631    publication_type: Option<Vec<String>>,
632}
633
/// Client for ERIC education research database
pub struct EricClient {
    /// HTTP client used for every request.
    client: Client,
    /// Root URL that query strings are appended to.
    base_url: String,
    /// Pause inserted by the fetching methods to throttle API usage.
    rate_limit_delay: Duration,
    /// Embedder used to turn record text into vectors.
    embedder: Arc<SimpleEmbedder>,
}
641
642impl EricClient {
643    /// Create a new ERIC client (no auth required)
644    pub fn new() -> Result<Self> {
645        let client = Client::builder()
646            .timeout(Duration::from_secs(30))
647            .build()
648            .map_err(FrameworkError::Network)?;
649
650        Ok(Self {
651            client,
652            base_url: "https://api.ies.ed.gov/eric".to_string(),
653            rate_limit_delay: Duration::from_millis(DEFAULT_RATE_LIMIT_DELAY_MS),
654            embedder: Arc::new(SimpleEmbedder::new(EMBEDDING_DIMENSION)),
655        })
656    }
657
658    /// Search education research documents
659    ///
660    /// # Arguments
661    /// * `query` - Search query string
662    /// * `limit` - Maximum number of results
663    pub async fn search(&self, query: &str, limit: usize) -> Result<Vec<DataRecord>> {
664        let url = format!(
665            "{}/?search={}&rows={}&format=json",
666            self.base_url,
667            urlencoding::encode(query),
668            limit.min(100)
669        );
670
671        let response = self.fetch_with_retry(&url).await?;
672        let eric_response: EricResponse = response.json().await?;
673
674        let mut records = Vec::new();
675        for doc in eric_response.response.docs {
676            let record = self.document_to_record(doc)?;
677            records.push(record);
678            sleep(self.rate_limit_delay).await;
679        }
680
681        Ok(records)
682    }
683
684    /// Get document by ERIC ID
685    ///
686    /// # Arguments
687    /// * `eric_id` - ERIC document ID (e.g., "ED123456")
688    pub async fn get_document(&self, eric_id: &str) -> Result<DataRecord> {
689        let url = format!("{}/?id={}&format=json", self.base_url, eric_id);
690        let response = self.fetch_with_retry(&url).await?;
691        let eric_response: EricResponse = response.json().await?;
692
693        eric_response
694            .response
695            .docs
696            .into_iter()
697            .next()
698            .ok_or_else(|| FrameworkError::Discovery("Document not found".to_string()))
699            .and_then(|doc| self.document_to_record(doc))
700    }
701
702    /// Convert ERIC document to DataRecord
703    fn document_to_record(&self, doc: EricDocument) -> Result<DataRecord> {
704        let title = doc
705            .title
706            .and_then(|t| t.into_iter().next())
707            .unwrap_or_else(|| "Untitled".to_string());
708
709        let description = doc
710            .description
711            .and_then(|d| d.into_iter().next())
712            .unwrap_or_default();
713
714        let text = format!("{} {}", title, description);
715        let embedding = self.embedder.embed_text(&text);
716
717        // Use publication year to estimate timestamp
718        let timestamp = doc
719            .publication_year
720            .and_then(|year| {
721                NaiveDate::from_ymd_opt(year, 1, 1)
722                    .and_then(|d| d.and_hms_opt(0, 0, 0))
723                    .map(|dt| dt.and_utc())
724            })
725            .unwrap_or_else(Utc::now);
726
727        let mut data_map = serde_json::Map::new();
728        data_map.insert("title".to_string(), serde_json::json!(title));
729        data_map.insert("description".to_string(), serde_json::json!(description));
730        if let Some(authors) = doc.author {
731            data_map.insert("authors".to_string(), serde_json::json!(authors));
732        }
733        if let Some(year) = doc.publication_year {
734            data_map.insert("publication_year".to_string(), serde_json::json!(year));
735        }
736        if let Some(pub_type) = doc.publication_type {
737            data_map.insert("publication_type".to_string(), serde_json::json!(pub_type));
738        }
739
740        Ok(DataRecord {
741            id: doc.id,
742            source: "eric".to_string(),
743            record_type: "document".to_string(),
744            timestamp,
745            data: serde_json::Value::Object(data_map),
746            embedding: Some(embedding),
747            relationships: Vec::new(),
748        })
749    }
750
751    /// Fetch with retry logic
752    async fn fetch_with_retry(&self, url: &str) -> Result<reqwest::Response> {
753        let mut retries = 0;
754        loop {
755            match self.client.get(url).send().await {
756                Ok(response) => {
757                    if response.status() == StatusCode::TOO_MANY_REQUESTS && retries < MAX_RETRIES
758                    {
759                        retries += 1;
760                        let delay = RETRY_DELAY_MS * 2_u64.pow(retries - 1);
761                        sleep(Duration::from_millis(delay)).await;
762                        continue;
763                    }
764                    return Ok(response);
765                }
766                Err(_) if retries < MAX_RETRIES => {
767                    retries += 1;
768                    let delay = RETRY_DELAY_MS * 2_u64.pow(retries - 1);
769                    sleep(Duration::from_millis(delay)).await;
770                }
771                Err(e) => return Err(FrameworkError::Network(e)),
772            }
773        }
774    }
775}
776
777impl Default for EricClient {
778    fn default() -> Self {
779        Self::new().unwrap()
780    }
781}
782
783#[async_trait]
784impl DataSource for EricClient {
785    fn source_id(&self) -> &str {
786        "eric"
787    }
788
789    async fn fetch_batch(
790        &self,
791        cursor: Option<String>,
792        batch_size: usize,
793    ) -> Result<(Vec<DataRecord>, Option<String>)> {
794        let query = cursor.as_deref().unwrap_or("education technology");
795        let records = self.search(query, batch_size).await?;
796        Ok((records, None))
797    }
798
799    async fn total_count(&self) -> Result<Option<u64>> {
800        Ok(None)
801    }
802
803    async fn health_check(&self) -> Result<bool> {
804        let response = self.client.get(&self.base_url).send().await?;
805        Ok(response.status().is_success())
806    }
807}
808
809// ============================================================================
810// Unpaywall Client
811// ============================================================================
812
813/// Unpaywall API response structure
814#[derive(Debug, Deserialize)]
815struct UnpaywallResponse {
816    doi: String,
817    title: Option<String>,
818    #[serde(rename = "is_oa")]
819    is_oa: bool,
820    #[serde(rename = "best_oa_location")]
821    best_oa_location: Option<OaLocation>,
822    #[serde(rename = "published_date")]
823    published_date: Option<String>,
824    #[serde(rename = "journal_name")]
825    journal_name: Option<String>,
826    #[serde(rename = "z_authors")]
827    authors: Option<Vec<UnpaywallAuthor>>,
828}
829
830#[derive(Debug, Deserialize)]
831struct OaLocation {
832    url: Option<String>,
833    #[serde(rename = "url_for_pdf")]
834    url_for_pdf: Option<String>,
835    license: Option<String>,
836}
837
/// Author entry of an Unpaywall response.
#[derive(Debug, Deserialize)]
struct UnpaywallAuthor {
    /// Deserialized from the `family` key.
    // NOTE(review): presumably the family (last) name — confirm against the API docs.
    family: Option<String>,
    /// Deserialized from the `given` key.
    // NOTE(review): presumably the given (first) name — confirm against the API docs.
    given: Option<String>,
}
843
/// Client for Unpaywall open access discovery
pub struct UnpaywallClient {
    /// HTTP client used for every request.
    client: Client,
    /// Root URL that the DOI is appended to.
    base_url: String,
    /// Pause inserted between lookups in `batch_lookup`.
    rate_limit_delay: Duration,
    /// Embedder used to turn record text into vectors.
    embedder: Arc<SimpleEmbedder>,
}
851
852impl UnpaywallClient {
853    /// Create a new Unpaywall client
854    pub fn new() -> Result<Self> {
855        let client = Client::builder()
856            .timeout(Duration::from_secs(30))
857            .build()
858            .map_err(FrameworkError::Network)?;
859
860        Ok(Self {
861            client,
862            base_url: "https://api.unpaywall.org/v2".to_string(),
863            rate_limit_delay: Duration::from_millis(DEFAULT_RATE_LIMIT_DELAY_MS),
864            embedder: Arc::new(SimpleEmbedder::new(EMBEDDING_DIMENSION)),
865        })
866    }
867
868    /// Get open access status by DOI
869    ///
870    /// # Arguments
871    /// * `doi` - Digital Object Identifier
872    /// * `email` - Email address (required by Unpaywall)
873    pub async fn get_by_doi(&self, doi: &str, email: &str) -> Result<DataRecord> {
874        let url = format!("{}/{}?email={}", self.base_url, doi, email);
875        let response = self.fetch_with_retry(&url).await?;
876        let unpaywall_response: UnpaywallResponse = response.json().await?;
877        self.response_to_record(unpaywall_response)
878    }
879
880    /// Batch lookup multiple DOIs
881    ///
882    /// # Arguments
883    /// * `dois` - List of DOIs
884    /// * `email` - Email address (required)
885    pub async fn batch_lookup(&self, dois: &[&str], email: &str) -> Result<Vec<DataRecord>> {
886        let mut records = Vec::new();
887        for doi in dois {
888            match self.get_by_doi(doi, email).await {
889                Ok(record) => records.push(record),
890                Err(e) => {
891                    tracing::warn!("Failed to fetch DOI {}: {}", doi, e);
892                    continue;
893                }
894            }
895            sleep(self.rate_limit_delay).await;
896        }
897        Ok(records)
898    }
899
900    /// Convert Unpaywall response to DataRecord
901    fn response_to_record(&self, response: UnpaywallResponse) -> Result<DataRecord> {
902        let title = response
903            .title
904            .unwrap_or_else(|| "Untitled".to_string());
905
906        let embedding = self.embedder.embed_text(&title);
907
908        let timestamp = response
909            .published_date
910            .as_ref()
911            .and_then(|d| NaiveDate::parse_from_str(d, "%Y-%m-%d").ok())
912            .map(|d| d.and_hms_opt(0, 0, 0).unwrap().and_utc())
913            .unwrap_or_else(Utc::now);
914
915        let mut data_map = serde_json::Map::new();
916        data_map.insert("doi".to_string(), serde_json::json!(response.doi));
917        data_map.insert("title".to_string(), serde_json::json!(title));
918        data_map.insert("is_oa".to_string(), serde_json::json!(response.is_oa));
919
920        if let Some(location) = response.best_oa_location {
921            if let Some(url) = location.url {
922                data_map.insert("oa_url".to_string(), serde_json::json!(url));
923            }
924            if let Some(pdf) = location.url_for_pdf {
925                data_map.insert("pdf_url".to_string(), serde_json::json!(pdf));
926            }
927            if let Some(license) = location.license {
928                data_map.insert("license".to_string(), serde_json::json!(license));
929            }
930        }
931
932        if let Some(journal) = response.journal_name {
933            data_map.insert("journal".to_string(), serde_json::json!(journal));
934        }
935
936        if let Some(authors) = response.authors {
937            let author_names: Vec<String> = authors
938                .iter()
939                .map(|a| {
940                    format!(
941                        "{} {}",
942                        a.given.as_deref().unwrap_or(""),
943                        a.family.as_deref().unwrap_or("")
944                    )
945                    .trim()
946                    .to_string()
947                })
948                .collect();
949            data_map.insert("authors".to_string(), serde_json::json!(author_names));
950        }
951
952        Ok(DataRecord {
953            id: response.doi,
954            source: "unpaywall".to_string(),
955            record_type: "article".to_string(),
956            timestamp,
957            data: serde_json::Value::Object(data_map),
958            embedding: Some(embedding),
959            relationships: Vec::new(),
960        })
961    }
962
963    /// Fetch with retry logic
964    async fn fetch_with_retry(&self, url: &str) -> Result<reqwest::Response> {
965        let mut retries = 0;
966        loop {
967            match self.client.get(url).send().await {
968                Ok(response) => {
969                    if response.status() == StatusCode::TOO_MANY_REQUESTS && retries < MAX_RETRIES
970                    {
971                        retries += 1;
972                        let delay = RETRY_DELAY_MS * 2_u64.pow(retries - 1);
973                        sleep(Duration::from_millis(delay)).await;
974                        continue;
975                    }
976                    if response.status() == StatusCode::NOT_FOUND {
977                        return Err(FrameworkError::Discovery("DOI not found".to_string()));
978                    }
979                    return Ok(response);
980                }
981                Err(_) if retries < MAX_RETRIES => {
982                    retries += 1;
983                    let delay = RETRY_DELAY_MS * 2_u64.pow(retries - 1);
984                    sleep(Duration::from_millis(delay)).await;
985                }
986                Err(e) => return Err(FrameworkError::Network(e)),
987            }
988        }
989    }
990}
991
992impl Default for UnpaywallClient {
993    fn default() -> Self {
994        Self::new().unwrap()
995    }
996}
997
998#[async_trait]
999impl DataSource for UnpaywallClient {
1000    fn source_id(&self) -> &str {
1001        "unpaywall"
1002    }
1003
1004    async fn fetch_batch(
1005        &self,
1006        _cursor: Option<String>,
1007        _batch_size: usize,
1008    ) -> Result<(Vec<DataRecord>, Option<String>)> {
1009        // Unpaywall doesn't support bulk search, only DOI lookup
1010        Ok((Vec::new(), None))
1011    }
1012
1013    async fn total_count(&self) -> Result<Option<u64>> {
1014        Ok(None)
1015    }
1016
1017    async fn health_check(&self) -> Result<bool> {
1018        Ok(true)
1019    }
1020}
1021
1022// ============================================================================
1023// Tests
1024// ============================================================================
1025
1026#[cfg(test)]
1027mod tests {
1028    use super::*;
1029
1030    // ========================================================================
1031    // OpenAlex Tests
1032    // ========================================================================
1033
1034    #[test]
1035    fn test_openalex_client_creation() {
1036        let client = OpenAlexClient::new(Some("test@example.com".to_string()));
1037        assert!(client.is_ok());
1038        let client = client.unwrap();
1039        assert_eq!(client.source_id(), "openalex");
1040    }
1041
1042    #[tokio::test]
1043    async fn test_openalex_health_check() {
1044        let client = OpenAlexClient::new(None).unwrap();
1045        let health = client.health_check().await;
1046        assert!(health.is_ok());
1047    }
1048
1049    #[test]
1050    fn test_openalex_work_to_record() {
1051        let client = OpenAlexClient::new(None).unwrap();
1052        let work = OpenAlexWork {
1053            id: "W123456".to_string(),
1054            display_name: Some("Test Paper".to_string()),
1055            publication_date: Some("2024-01-01".to_string()),
1056            authorships: None,
1057            cited_by_count: Some(10),
1058            abstract_inverted_index: None,
1059        };
1060
1061        let record = client.work_to_record(work).unwrap();
1062        assert_eq!(record.id, "W123456");
1063        assert_eq!(record.source, "openalex");
1064        assert_eq!(record.record_type, "work");
1065        assert!(record.embedding.is_some());
1066        assert_eq!(record.embedding.as_ref().unwrap().len(), EMBEDDING_DIMENSION);
1067    }
1068
1069    #[test]
1070    fn test_openalex_author_to_record() {
1071        let client = OpenAlexClient::new(None).unwrap();
1072        let author = OpenAlexAuthorDetail {
1073            id: "A123456".to_string(),
1074            display_name: Some("Jane Doe".to_string()),
1075            works_count: Some(50),
1076            cited_by_count: Some(500),
1077        };
1078
1079        let record = client.author_to_record(author).unwrap();
1080        assert_eq!(record.id, "A123456");
1081        assert_eq!(record.source, "openalex");
1082        assert_eq!(record.record_type, "author");
1083        assert!(record.embedding.is_some());
1084    }
1085
1086    #[test]
1087    fn test_openalex_reconstruct_abstract() {
1088        let client = OpenAlexClient::new(None).unwrap();
1089        let mut inverted_index = HashMap::new();
1090        inverted_index.insert("machine".to_string(), vec![0]);
1091        inverted_index.insert("learning".to_string(), vec![1]);
1092        inverted_index.insert("is".to_string(), vec![2]);
1093        inverted_index.insert("awesome".to_string(), vec![3]);
1094
1095        let abstract_text = client.reconstruct_abstract(&inverted_index);
1096        assert_eq!(abstract_text, "machine learning is awesome");
1097    }
1098
1099    // ========================================================================
1100    // CORE Tests
1101    // ========================================================================
1102
1103    #[test]
1104    fn test_core_client_creation() {
1105        let client = CoreClient::new(None);
1106        assert!(client.is_ok());
1107        let client = client.unwrap();
1108        assert_eq!(client.source_id(), "core");
1109    }
1110
1111    #[tokio::test]
1112    async fn test_core_mock_data() {
1113        let client = CoreClient::new(None).unwrap();
1114        let records = client.search_works("test query", 3).await.unwrap();
1115        assert_eq!(records.len(), 3);
1116        assert_eq!(records[0].source, "core");
1117        assert!(records[0].embedding.is_some());
1118
1119        // Verify mock flag
1120        let mock_flag = records[0].data.get("mock");
1121        assert!(mock_flag.is_some());
1122        assert_eq!(mock_flag.unwrap(), &serde_json::json!(true));
1123    }
1124
1125    #[test]
1126    fn test_core_work_to_record() {
1127        let client = CoreClient::new(None).unwrap();
1128        let work = CoreWork {
1129            id: "123456".to_string(),
1130            title: Some("Test Paper".to_string()),
1131            abstract_text: Some("This is a test abstract".to_string()),
1132            authors: Some(vec!["John Doe".to_string(), "Jane Smith".to_string()]),
1133            published_date: Some("2024-01-15".to_string()),
1134            download_url: Some("https://example.com/paper.pdf".to_string()),
1135            doi: Some("10.1234/test".to_string()),
1136        };
1137
1138        let record = client.work_to_record(work).unwrap();
1139        assert_eq!(record.id, "123456");
1140        assert_eq!(record.source, "core");
1141        assert!(record.embedding.is_some());
1142        assert_eq!(record.embedding.as_ref().unwrap().len(), EMBEDDING_DIMENSION);
1143
1144        // Verify data fields
1145        assert_eq!(
1146            record.data.get("title").unwrap(),
1147            &serde_json::json!("Test Paper")
1148        );
1149        assert_eq!(
1150            record.data.get("doi").unwrap(),
1151            &serde_json::json!("10.1234/test")
1152        );
1153    }
1154
1155    #[tokio::test]
1156    async fn test_core_health_check() {
1157        let client = CoreClient::new(None).unwrap();
1158        let health = client.health_check().await;
1159        assert!(health.is_ok());
1160        assert!(health.unwrap());
1161    }
1162
1163    // ========================================================================
1164    // ERIC Tests
1165    // ========================================================================
1166
1167    #[test]
1168    fn test_eric_client_creation() {
1169        let client = EricClient::new();
1170        assert!(client.is_ok());
1171        let client = client.unwrap();
1172        assert_eq!(client.source_id(), "eric");
1173    }
1174
1175    #[test]
1176    fn test_eric_default() {
1177        let client = EricClient::default();
1178        assert_eq!(client.source_id(), "eric");
1179    }
1180
1181    #[test]
1182    fn test_eric_document_to_record() {
1183        let client = EricClient::new().unwrap();
1184        let doc = EricDocument {
1185            id: "ED123456".to_string(),
1186            title: Some(vec!["Educational Technology in Schools".to_string()]),
1187            description: Some(vec!["A study on technology adoption".to_string()]),
1188            author: Some(vec!["Smith, John".to_string()]),
1189            publication_year: Some(2023),
1190            publication_type: Some(vec!["Journal Article".to_string()]),
1191        };
1192
1193        let record = client.document_to_record(doc).unwrap();
1194        assert_eq!(record.id, "ED123456");
1195        assert_eq!(record.source, "eric");
1196        assert_eq!(record.record_type, "document");
1197        assert!(record.embedding.is_some());
1198
1199        // Verify year conversion
1200        assert_eq!(
1201            record.data.get("publication_year").unwrap(),
1202            &serde_json::json!(2023)
1203        );
1204    }
1205
1206    #[tokio::test]
1207    async fn test_eric_health_check() {
1208        let client = EricClient::new().unwrap();
1209        let health = client.health_check().await;
1210        assert!(health.is_ok());
1211    }
1212
1213    // ========================================================================
1214    // Unpaywall Tests
1215    // ========================================================================
1216
1217    #[test]
1218    fn test_unpaywall_client_creation() {
1219        let client = UnpaywallClient::new();
1220        assert!(client.is_ok());
1221        let client = client.unwrap();
1222        assert_eq!(client.source_id(), "unpaywall");
1223    }
1224
1225    #[test]
1226    fn test_unpaywall_default() {
1227        let client = UnpaywallClient::default();
1228        assert_eq!(client.source_id(), "unpaywall");
1229    }
1230
1231    #[test]
1232    fn test_unpaywall_response_to_record() {
1233        let client = UnpaywallClient::new().unwrap();
1234        let response = UnpaywallResponse {
1235            doi: "10.1234/test".to_string(),
1236            title: Some("Open Access Paper".to_string()),
1237            is_oa: true,
1238            best_oa_location: Some(OaLocation {
1239                url: Some("https://example.com/paper".to_string()),
1240                url_for_pdf: Some("https://example.com/paper.pdf".to_string()),
1241                license: Some("CC-BY".to_string()),
1242            }),
1243            published_date: Some("2024-01-01".to_string()),
1244            journal_name: Some("Test Journal".to_string()),
1245            authors: Some(vec![
1246                UnpaywallAuthor {
1247                    family: Some("Doe".to_string()),
1248                    given: Some("John".to_string()),
1249                },
1250                UnpaywallAuthor {
1251                    family: Some("Smith".to_string()),
1252                    given: Some("Jane".to_string()),
1253                },
1254            ]),
1255        };
1256
1257        let record = client.response_to_record(response).unwrap();
1258        assert_eq!(record.id, "10.1234/test");
1259        assert_eq!(record.source, "unpaywall");
1260        assert_eq!(record.record_type, "article");
1261        assert!(record.embedding.is_some());
1262
1263        // Verify OA fields
1264        assert_eq!(record.data.get("is_oa").unwrap(), &serde_json::json!(true));
1265        assert_eq!(
1266            record.data.get("license").unwrap(),
1267            &serde_json::json!("CC-BY")
1268        );
1269
1270        // Verify authors
1271        let authors = record.data.get("authors").unwrap();
1272        assert!(authors.is_array());
1273        let author_array = authors.as_array().unwrap();
1274        assert_eq!(author_array.len(), 2);
1275    }
1276
1277    #[tokio::test]
1278    async fn test_unpaywall_health_check() {
1279        let client = UnpaywallClient::new().unwrap();
1280        let health = client.health_check().await;
1281        assert!(health.is_ok());
1282        assert!(health.unwrap());
1283    }
1284
1285    #[tokio::test]
1286    async fn test_unpaywall_batch_lookup_empty() {
1287        let client = UnpaywallClient::new().unwrap();
1288        let records = client.batch_lookup(&[], "test@example.com").await.unwrap();
1289        assert_eq!(records.len(), 0);
1290    }
1291
1292    // ========================================================================
1293    // Integration Tests
1294    // ========================================================================
1295
1296    #[tokio::test]
1297    async fn test_all_clients_datasource_trait() {
1298        let openalex = OpenAlexClient::new(None).unwrap();
1299        let core = CoreClient::new(None).unwrap();
1300        let eric = EricClient::new().unwrap();
1301        let unpaywall = UnpaywallClient::new().unwrap();
1302
1303        assert_eq!(openalex.source_id(), "openalex");
1304        assert_eq!(core.source_id(), "core");
1305        assert_eq!(eric.source_id(), "eric");
1306        assert_eq!(unpaywall.source_id(), "unpaywall");
1307    }
1308
1309    #[test]
1310    fn test_embedding_dimensions() {
1311        let embedder = SimpleEmbedder::new(EMBEDDING_DIMENSION);
1312        let embedding = embedder.embed_text("test text");
1313        assert_eq!(embedding.len(), EMBEDDING_DIMENSION);
1314
1315        // Check normalization
1316        let magnitude: f32 = embedding.iter().map(|x| x * x).sum::<f32>().sqrt();
1317        assert!((magnitude - 1.0).abs() < 0.01);
1318    }
1319
1320    #[test]
1321    fn test_retry_exponential_backoff() {
1322        // Test that retry delays increase exponentially
1323        let base_delay = RETRY_DELAY_MS;
1324        assert_eq!(base_delay * 2_u64.pow(0), 1000); // First retry: 1s
1325        assert_eq!(base_delay * 2_u64.pow(1), 2000); // Second retry: 2s
1326        assert_eq!(base_delay * 2_u64.pow(2), 4000); // Third retry: 4s
1327    }
1328}