1use std::collections::HashMap;
14use std::sync::Arc;
15use std::time::Duration;
16
17use async_trait::async_trait;
18use chrono::{DateTime, NaiveDateTime, Utc};
19use reqwest::{Client, StatusCode};
20use serde::Deserialize;
21use tokio::time::sleep;
22
23use crate::{DataRecord, DataSource, FrameworkError, Relationship, Result, SimpleEmbedder};
24
/// Default pause in milliseconds between consecutive Hacker News item fetches;
/// `HackerNewsClient::new` turns it into the client's `rate_limit_delay`.
const DEFAULT_RATE_LIMIT_DELAY_MS: u64 = 100;
/// Maximum number of retries the clients' `fetch_with_retry` helpers perform
/// after the initial attempt.
const MAX_RETRIES: u32 = 3;
/// Base retry back-off in milliseconds; retry `n` (1-based) sleeps `RETRY_DELAY_MS * n`.
const RETRY_DELAY_MS: u64 = 1000;
29
/// Raw item payload decoded from the Hacker News `/item/{id}.json` endpoint.
#[derive(Debug, Deserialize)]
struct HNItem {
    /// Numeric item id.
    id: i64,
    /// Item kind as sent in the JSON `type` key (copied into `DataRecord::record_type`).
    #[serde(rename = "type")]
    item_type: String,
    /// Author username, if present.
    by: Option<String>,
    /// Creation time, decoded as Unix seconds by `item_to_record`.
    time: i64,
    /// Body text, if present.
    text: Option<String>,
    /// Title, if present.
    title: Option<String>,
    /// Linked URL, if present.
    url: Option<String>,
    /// Score, if present.
    score: Option<i64>,
    /// Ids of direct child items; empty when the key is absent.
    #[serde(default)]
    kids: Vec<i64>,
    /// Descendant count as reported by the API, if present.
    descendants: Option<i64>,
}
50
/// Raw user payload decoded from the Hacker News `/user/{username}.json` endpoint.
#[derive(Debug, Deserialize)]
struct HNUser {
    /// Username (the API uses it as the user id).
    id: String,
    /// Account creation time, decoded as Unix seconds by `user_to_record`.
    created: i64,
    /// Karma score.
    karma: i64,
    /// Profile "about" text, if present.
    about: Option<String>,
    /// Ids of items the user submitted; empty when the key is absent.
    #[serde(default)]
    submitted: Vec<i64>,
}
61
/// Client for the public Hacker News Firebase API that converts items and
/// users into `DataRecord`s with embeddings.
pub struct HackerNewsClient {
    /// Shared HTTP client.
    client: Client,
    /// API root URL that endpoint paths are appended to.
    base_url: String,
    /// Pause between consecutive item fetches.
    rate_limit_delay: Duration,
    /// Embedder used to attach a vector to every produced record.
    embedder: Arc<SimpleEmbedder>,
}
69
70impl HackerNewsClient {
71 pub fn new() -> Result<Self> {
75 let client = Client::builder()
76 .timeout(Duration::from_secs(30))
77 .build()
78 .map_err(|e| FrameworkError::Network(e))?;
79
80 Ok(Self {
81 client,
82 base_url: "https://hacker-news.firebaseio.com/v0".to_string(),
83 rate_limit_delay: Duration::from_millis(DEFAULT_RATE_LIMIT_DELAY_MS),
84 embedder: Arc::new(SimpleEmbedder::new(128)),
85 })
86 }
87
88 pub async fn get_top_stories(&self, limit: usize) -> Result<Vec<DataRecord>> {
93 let url = format!("{}/topstories.json", self.base_url);
94 let response = self.fetch_with_retry(&url).await?;
95 let story_ids: Vec<i64> = response.json().await?;
96
97 self.fetch_items(&story_ids[..limit.min(story_ids.len())])
98 .await
99 }
100
101 pub async fn get_new_stories(&self, limit: usize) -> Result<Vec<DataRecord>> {
106 let url = format!("{}/newstories.json", self.base_url);
107 let response = self.fetch_with_retry(&url).await?;
108 let story_ids: Vec<i64> = response.json().await?;
109
110 self.fetch_items(&story_ids[..limit.min(story_ids.len())])
111 .await
112 }
113
114 pub async fn get_best_stories(&self, limit: usize) -> Result<Vec<DataRecord>> {
119 let url = format!("{}/beststories.json", self.base_url);
120 let response = self.fetch_with_retry(&url).await?;
121 let story_ids: Vec<i64> = response.json().await?;
122
123 self.fetch_items(&story_ids[..limit.min(story_ids.len())])
124 .await
125 }
126
127 pub async fn get_item(&self, id: i64) -> Result<DataRecord> {
132 let url = format!("{}/item/{}.json", self.base_url, id);
133 let response = self.fetch_with_retry(&url).await?;
134 let item: HNItem = response.json().await?;
135
136 self.item_to_record(item)
137 }
138
139 pub async fn get_user(&self, username: &str) -> Result<DataRecord> {
144 let url = format!("{}/user/{}.json", self.base_url, username);
145 let response = self.fetch_with_retry(&url).await?;
146 let user: HNUser = response.json().await?;
147
148 self.user_to_record(user)
149 }
150
151 async fn fetch_items(&self, ids: &[i64]) -> Result<Vec<DataRecord>> {
153 let mut records = Vec::new();
154
155 for &id in ids {
156 match self.get_item(id).await {
157 Ok(record) => records.push(record),
158 Err(e) => {
159 tracing::warn!("Failed to fetch HN item {}: {}", id, e);
160 }
161 }
162 sleep(self.rate_limit_delay).await;
163 }
164
165 Ok(records)
166 }
167
168 fn item_to_record(&self, item: HNItem) -> Result<DataRecord> {
170 let text_content = format!(
171 "{} {}",
172 item.title.as_deref().unwrap_or(""),
173 item.text.as_deref().unwrap_or("")
174 );
175 let embedding = self.embedder.embed_text(&text_content);
176
177 let timestamp = DateTime::from_timestamp(item.time, 0).unwrap_or_else(Utc::now);
179
180 let mut relationships = Vec::new();
182
183 if let Some(author) = &item.by {
185 relationships.push(Relationship {
186 target_id: format!("hn_user_{}", author),
187 rel_type: "authored_by".to_string(),
188 weight: 1.0,
189 properties: {
190 let mut props = HashMap::new();
191 props.insert("username".to_string(), serde_json::json!(author));
192 props
193 },
194 });
195 }
196
197 for &kid_id in &item.kids {
199 relationships.push(Relationship {
200 target_id: format!("hn_item_{}", kid_id),
201 rel_type: "has_comment".to_string(),
202 weight: 1.0,
203 properties: HashMap::new(),
204 });
205 }
206
207 let mut data_map = serde_json::Map::new();
208 data_map.insert("item_type".to_string(), serde_json::json!(item.item_type));
209 if let Some(title) = item.title {
210 data_map.insert("title".to_string(), serde_json::json!(title));
211 }
212 if let Some(url) = item.url {
213 data_map.insert("url".to_string(), serde_json::json!(url));
214 }
215 if let Some(text) = item.text {
216 data_map.insert("text".to_string(), serde_json::json!(text));
217 }
218 if let Some(score) = item.score {
219 data_map.insert("score".to_string(), serde_json::json!(score));
220 }
221 if let Some(descendants) = item.descendants {
222 data_map.insert("descendants".to_string(), serde_json::json!(descendants));
223 }
224 data_map.insert("comments_count".to_string(), serde_json::json!(item.kids.len()));
225
226 Ok(DataRecord {
227 id: format!("hn_item_{}", item.id),
228 source: "hackernews".to_string(),
229 record_type: item.item_type,
230 timestamp,
231 data: serde_json::Value::Object(data_map),
232 embedding: Some(embedding),
233 relationships,
234 })
235 }
236
237 fn user_to_record(&self, user: HNUser) -> Result<DataRecord> {
239 let text_content = format!(
240 "{} {}",
241 user.id,
242 user.about.as_deref().unwrap_or("")
243 );
244 let embedding = self.embedder.embed_text(&text_content);
245
246 let timestamp = DateTime::from_timestamp(user.created, 0).unwrap_or_else(Utc::now);
247
248 let mut data_map = serde_json::Map::new();
249 data_map.insert("username".to_string(), serde_json::json!(user.id));
250 data_map.insert("karma".to_string(), serde_json::json!(user.karma));
251 if let Some(about) = user.about {
252 data_map.insert("about".to_string(), serde_json::json!(about));
253 }
254 data_map.insert(
255 "submissions_count".to_string(),
256 serde_json::json!(user.submitted.len()),
257 );
258
259 Ok(DataRecord {
260 id: format!("hn_user_{}", user.id),
261 source: "hackernews".to_string(),
262 record_type: "user".to_string(),
263 timestamp,
264 data: serde_json::Value::Object(data_map),
265 embedding: Some(embedding),
266 relationships: Vec::new(),
267 })
268 }
269
270 async fn fetch_with_retry(&self, url: &str) -> Result<reqwest::Response> {
272 let mut retries = 0;
273 loop {
274 match self.client.get(url).send().await {
275 Ok(response) => {
276 if response.status() == StatusCode::TOO_MANY_REQUESTS && retries < MAX_RETRIES
277 {
278 retries += 1;
279 sleep(Duration::from_millis(RETRY_DELAY_MS * retries as u64)).await;
280 continue;
281 }
282 return Ok(response);
283 }
284 Err(_) if retries < MAX_RETRIES => {
285 retries += 1;
286 sleep(Duration::from_millis(RETRY_DELAY_MS * retries as u64)).await;
287 }
288 Err(e) => return Err(FrameworkError::Network(e)),
289 }
290 }
291 }
292}
293
294#[async_trait]
295impl DataSource for HackerNewsClient {
296 fn source_id(&self) -> &str {
297 "hackernews"
298 }
299
300 async fn fetch_batch(
301 &self,
302 _cursor: Option<String>,
303 batch_size: usize,
304 ) -> Result<(Vec<DataRecord>, Option<String>)> {
305 let records = self.get_top_stories(batch_size).await?;
306 Ok((records, None))
307 }
308
309 async fn total_count(&self) -> Result<Option<u64>> {
310 Ok(None)
311 }
312
313 async fn health_check(&self) -> Result<bool> {
314 let response = self
315 .client
316 .get(format!("{}/maxitem.json", self.base_url))
317 .send()
318 .await?;
319 Ok(response.status().is_success())
320 }
321}
322
323#[derive(Debug, Deserialize)]
329struct GuardianResponse {
330 response: GuardianResponseBody,
331}
332
333#[derive(Debug, Deserialize)]
334struct GuardianResponseBody {
335 status: String,
336 #[serde(default)]
337 results: Vec<GuardianArticle>,
338}
339
340#[derive(Debug, Deserialize)]
341struct GuardianArticle {
342 id: String,
343 #[serde(rename = "type")]
344 article_type: String,
345 #[serde(rename = "sectionId")]
346 section_id: Option<String>,
347 #[serde(rename = "sectionName")]
348 section_name: Option<String>,
349 #[serde(rename = "webPublicationDate")]
350 web_publication_date: String,
351 #[serde(rename = "webTitle")]
352 web_title: String,
353 #[serde(rename = "webUrl")]
354 web_url: String,
355 #[serde(rename = "apiUrl")]
356 api_url: String,
357 fields: Option<GuardianFields>,
358 tags: Option<Vec<GuardianTag>>,
359}
360
361#[derive(Debug, Deserialize)]
362struct GuardianFields {
363 body: Option<String>,
364 headline: Option<String>,
365 standfirst: Option<String>,
366 #[serde(rename = "bodyText")]
367 body_text: Option<String>,
368}
369
370#[derive(Debug, Deserialize)]
371struct GuardianTag {
372 id: String,
373 #[serde(rename = "type")]
374 tag_type: String,
375 #[serde(rename = "webTitle")]
376 web_title: String,
377}
378
379#[derive(Debug, Deserialize)]
381struct GuardianSectionsResponse {
382 response: GuardianSectionsBody,
383}
384
385#[derive(Debug, Deserialize)]
386struct GuardianSectionsBody {
387 #[serde(default)]
388 results: Vec<GuardianSection>,
389}
390
391#[derive(Debug, Deserialize)]
392struct GuardianSection {
393 id: String,
394 #[serde(rename = "webTitle")]
395 web_title: String,
396 #[serde(rename = "webUrl")]
397 web_url: String,
398}
399
/// Client for the Guardian Content API that converts articles and sections into
/// `DataRecord`s, with synthetic fallbacks when no API key is configured.
pub struct GuardianClient {
    /// Shared HTTP client.
    client: Client,
    /// API root URL that endpoint paths are appended to.
    base_url: String,
    /// Optional API key; `None` makes the search/section methods return synthetic data.
    api_key: Option<String>,
    /// Delay configured for throttling requests to this API.
    rate_limit_delay: Duration,
    /// Embedder used to attach a vector to every produced record.
    embedder: Arc<SimpleEmbedder>,
}
407}
408
409impl GuardianClient {
410 pub fn new(api_key: Option<String>) -> Result<Self> {
417 let client = Client::builder()
418 .timeout(Duration::from_secs(30))
419 .build()
420 .map_err(|e| FrameworkError::Network(e))?;
421
422 Ok(Self {
423 client,
424 base_url: "https://content.guardianapis.com".to_string(),
425 api_key,
426 rate_limit_delay: Duration::from_millis(100), embedder: Arc::new(SimpleEmbedder::new(128)),
428 })
429 }
430
431 pub async fn search(&self, query: &str, limit: usize) -> Result<Vec<DataRecord>> {
437 if self.api_key.is_none() {
438 return Ok(self.generate_synthetic_articles(query, limit)?);
439 }
440
441 let url = format!(
442 "{}/search?q={}&page-size={}&show-fields=all&show-tags=all&api-key={}",
443 self.base_url,
444 urlencoding::encode(query),
445 limit.min(200),
446 self.api_key.as_ref().unwrap()
447 );
448
449 let response = self.fetch_with_retry(&url).await?;
450 let guardian_response: GuardianResponse = response.json().await?;
451
452 let mut records = Vec::new();
453 for article in guardian_response.response.results {
454 let record = self.article_to_record(article)?;
455 records.push(record);
456 }
457
458 Ok(records)
459 }
460
461 pub async fn get_article(&self, id: &str) -> Result<DataRecord> {
466 if self.api_key.is_none() {
467 return Err(FrameworkError::Config(
468 "Guardian API key required".to_string(),
469 ));
470 }
471
472 let url = format!(
473 "{}/{}?show-fields=all&show-tags=all&api-key={}",
474 self.base_url,
475 id,
476 self.api_key.as_ref().unwrap()
477 );
478
479 let response = self.fetch_with_retry(&url).await?;
480 let guardian_response: GuardianResponse = response.json().await?;
481
482 if let Some(article) = guardian_response.response.results.into_iter().next() {
483 self.article_to_record(article)
484 } else {
485 Err(FrameworkError::Discovery("Article not found".to_string()))
486 }
487 }
488
489 pub async fn get_sections(&self) -> Result<Vec<DataRecord>> {
491 if self.api_key.is_none() {
492 return Ok(self.generate_synthetic_sections()?);
493 }
494
495 let url = format!("{}/sections?api-key={}", self.base_url, self.api_key.as_ref().unwrap());
496
497 let response = self.fetch_with_retry(&url).await?;
498 let sections_response: GuardianSectionsResponse = response.json().await?;
499
500 let mut records = Vec::new();
501 for section in sections_response.response.results {
502 let record = self.section_to_record(section)?;
503 records.push(record);
504 }
505
506 Ok(records)
507 }
508
509 pub async fn search_by_tag(&self, tag: &str, limit: usize) -> Result<Vec<DataRecord>> {
515 if self.api_key.is_none() {
516 return Ok(self.generate_synthetic_articles(tag, limit)?);
517 }
518
519 let url = format!(
520 "{}/search?tag={}&page-size={}&show-fields=all&api-key={}",
521 self.base_url,
522 urlencoding::encode(tag),
523 limit.min(200),
524 self.api_key.as_ref().unwrap()
525 );
526
527 let response = self.fetch_with_retry(&url).await?;
528 let guardian_response: GuardianResponse = response.json().await?;
529
530 let mut records = Vec::new();
531 for article in guardian_response.response.results {
532 let record = self.article_to_record(article)?;
533 records.push(record);
534 }
535
536 Ok(records)
537 }
538
539 fn generate_synthetic_articles(&self, query: &str, limit: usize) -> Result<Vec<DataRecord>> {
541 let mut records = Vec::new();
542
543 for i in 0..limit.min(10) {
544 let title = format!("Synthetic Guardian article about {}: Part {}", query, i + 1);
545 let text = format!(
546 "This is a synthetic article for demonstration. Query: {}. Content would appear here.",
547 query
548 );
549 let embedding = self.embedder.embed_text(&format!("{} {}", title, text));
550
551 let mut data_map = serde_json::Map::new();
552 data_map.insert("title".to_string(), serde_json::json!(title));
553 data_map.insert("body_text".to_string(), serde_json::json!(text));
554 data_map.insert("section".to_string(), serde_json::json!("world"));
555 data_map.insert(
556 "url".to_string(),
557 serde_json::json!(format!(
558 "https://www.theguardian.com/world/synthetic-{}",
559 i
560 )),
561 );
562
563 records.push(DataRecord {
564 id: format!("guardian_synthetic_{}", i),
565 source: "guardian".to_string(),
566 record_type: "article".to_string(),
567 timestamp: Utc::now(),
568 data: serde_json::Value::Object(data_map),
569 embedding: Some(embedding),
570 relationships: Vec::new(),
571 });
572 }
573
574 Ok(records)
575 }
576
577 fn generate_synthetic_sections(&self) -> Result<Vec<DataRecord>> {
579 let sections = vec!["world", "politics", "business", "technology", "science"];
580 let mut records = Vec::new();
581
582 for (_i, section) in sections.iter().enumerate() {
583 let embedding = self.embedder.embed_text(section);
584
585 let mut data_map = serde_json::Map::new();
586 data_map.insert("section_id".to_string(), serde_json::json!(section));
587 data_map.insert(
588 "title".to_string(),
589 serde_json::json!(format!("{} News", section)),
590 );
591
592 records.push(DataRecord {
593 id: format!("guardian_section_{}", section),
594 source: "guardian".to_string(),
595 record_type: "section".to_string(),
596 timestamp: Utc::now(),
597 data: serde_json::Value::Object(data_map),
598 embedding: Some(embedding),
599 relationships: Vec::new(),
600 });
601 }
602
603 Ok(records)
604 }
605
606 fn article_to_record(&self, article: GuardianArticle) -> Result<DataRecord> {
608 let body_text = article
609 .fields
610 .as_ref()
611 .and_then(|f| f.body_text.as_deref())
612 .unwrap_or("");
613 let text_content = format!("{} {}", article.web_title, body_text);
614 let embedding = self.embedder.embed_text(&text_content);
615
616 let timestamp = DateTime::parse_from_rfc3339(&article.web_publication_date)
618 .map(|dt| dt.with_timezone(&Utc))
619 .unwrap_or_else(|_| Utc::now());
620
621 let mut relationships = Vec::new();
623 if let Some(tags) = article.tags {
624 for tag in tags {
625 relationships.push(Relationship {
626 target_id: format!("guardian_tag_{}", tag.id),
627 rel_type: "has_tag".to_string(),
628 weight: 1.0,
629 properties: {
630 let mut props = HashMap::new();
631 props.insert("tag_type".to_string(), serde_json::json!(tag.tag_type));
632 props.insert("tag_title".to_string(), serde_json::json!(tag.web_title));
633 props
634 },
635 });
636 }
637 }
638
639 let mut data_map = serde_json::Map::new();
640 data_map.insert("title".to_string(), serde_json::json!(article.web_title));
641 data_map.insert("url".to_string(), serde_json::json!(article.web_url));
642 data_map.insert("api_url".to_string(), serde_json::json!(article.api_url));
643 if let Some(section_name) = article.section_name {
644 data_map.insert("section".to_string(), serde_json::json!(section_name));
645 }
646 if let Some(fields) = article.fields {
647 if let Some(headline) = fields.headline {
648 data_map.insert("headline".to_string(), serde_json::json!(headline));
649 }
650 if let Some(standfirst) = fields.standfirst {
651 data_map.insert("standfirst".to_string(), serde_json::json!(standfirst));
652 }
653 if let Some(body_text) = fields.body_text {
654 data_map.insert("body_text".to_string(), serde_json::json!(body_text));
655 }
656 }
657
658 Ok(DataRecord {
659 id: format!("guardian_{}", article.id.replace('/', "_")),
660 source: "guardian".to_string(),
661 record_type: article.article_type,
662 timestamp,
663 data: serde_json::Value::Object(data_map),
664 embedding: Some(embedding),
665 relationships,
666 })
667 }
668
669 fn section_to_record(&self, section: GuardianSection) -> Result<DataRecord> {
671 let embedding = self.embedder.embed_text(§ion.web_title);
672
673 let mut data_map = serde_json::Map::new();
674 data_map.insert("section_id".to_string(), serde_json::json!(section.id));
675 data_map.insert("title".to_string(), serde_json::json!(section.web_title));
676 data_map.insert("url".to_string(), serde_json::json!(section.web_url));
677
678 Ok(DataRecord {
679 id: format!("guardian_section_{}", section.id),
680 source: "guardian".to_string(),
681 record_type: "section".to_string(),
682 timestamp: Utc::now(),
683 data: serde_json::Value::Object(data_map),
684 embedding: Some(embedding),
685 relationships: Vec::new(),
686 })
687 }
688
689 async fn fetch_with_retry(&self, url: &str) -> Result<reqwest::Response> {
691 let mut retries = 0;
692 loop {
693 match self.client.get(url).send().await {
694 Ok(response) => {
695 if response.status() == StatusCode::TOO_MANY_REQUESTS && retries < MAX_RETRIES
696 {
697 retries += 1;
698 sleep(Duration::from_millis(RETRY_DELAY_MS * retries as u64)).await;
699 continue;
700 }
701 return Ok(response);
702 }
703 Err(_) if retries < MAX_RETRIES => {
704 retries += 1;
705 sleep(Duration::from_millis(RETRY_DELAY_MS * retries as u64)).await;
706 }
707 Err(e) => return Err(FrameworkError::Network(e)),
708 }
709 }
710 }
711}
712
713#[async_trait]
714impl DataSource for GuardianClient {
715 fn source_id(&self) -> &str {
716 "guardian"
717 }
718
719 async fn fetch_batch(
720 &self,
721 cursor: Option<String>,
722 batch_size: usize,
723 ) -> Result<(Vec<DataRecord>, Option<String>)> {
724 let query = cursor.as_deref().unwrap_or("technology");
725 let records = self.search(query, batch_size).await?;
726 Ok((records, None))
727 }
728
729 async fn total_count(&self) -> Result<Option<u64>> {
730 Ok(None)
731 }
732
733 async fn health_check(&self) -> Result<bool> {
734 Ok(true)
735 }
736}
737
738#[derive(Debug, Deserialize)]
744struct NewsDataResponse {
745 status: String,
746 #[serde(default)]
747 results: Vec<NewsDataArticle>,
748 #[serde(rename = "nextPage")]
749 next_page: Option<String>,
750}
751
752#[derive(Debug, Deserialize)]
753struct NewsDataArticle {
754 #[serde(rename = "article_id")]
755 article_id: String,
756 title: String,
757 link: String,
758 #[serde(default)]
759 keywords: Vec<String>,
760 creator: Option<Vec<String>>,
761 description: Option<String>,
762 content: Option<String>,
763 #[serde(rename = "pubDate")]
764 pub_date: Option<String>,
765 #[serde(rename = "image_url")]
766 image_url: Option<String>,
767 #[serde(rename = "source_id")]
768 source_id: String,
769 category: Option<Vec<String>>,
770 country: Option<Vec<String>>,
771 language: Option<String>,
772}
773
/// Client for the NewsData.io API that converts articles into `DataRecord`s,
/// with synthetic fallbacks when no API key is configured.
pub struct NewsDataClient {
    /// Shared HTTP client.
    client: Client,
    /// API root URL that endpoint paths are appended to.
    base_url: String,
    /// Optional API key; `None` makes `get_latest`/`get_archive` return synthetic data.
    api_key: Option<String>,
    /// Delay configured for throttling requests to this API.
    rate_limit_delay: Duration,
    /// Embedder used to attach a vector to every produced record.
    embedder: Arc<SimpleEmbedder>,
}
782
783impl NewsDataClient {
784 pub fn new(api_key: Option<String>) -> Result<Self> {
791 let client = Client::builder()
792 .timeout(Duration::from_secs(30))
793 .build()
794 .map_err(|e| FrameworkError::Network(e))?;
795
796 Ok(Self {
797 client,
798 base_url: "https://newsdata.io/api/1".to_string(),
799 api_key,
800 rate_limit_delay: Duration::from_millis(500), embedder: Arc::new(SimpleEmbedder::new(128)),
802 })
803 }
804
805 pub async fn get_latest(
812 &self,
813 query: Option<&str>,
814 country: Option<&str>,
815 category: Option<&str>,
816 ) -> Result<Vec<DataRecord>> {
817 if self.api_key.is_none() {
818 return Ok(self.generate_synthetic_news(
819 query.unwrap_or("technology"),
820 10,
821 )?);
822 }
823
824 let mut url = format!(
825 "{}/news?apikey={}",
826 self.base_url,
827 self.api_key.as_ref().unwrap()
828 );
829
830 if let Some(q) = query {
831 url.push_str(&format!("&q={}", urlencoding::encode(q)));
832 }
833 if let Some(c) = country {
834 url.push_str(&format!("&country={}", c));
835 }
836 if let Some(cat) = category {
837 url.push_str(&format!("&category={}", cat));
838 }
839
840 let response = self.fetch_with_retry(&url).await?;
841 let news_response: NewsDataResponse = response.json().await?;
842
843 let mut records = Vec::new();
844 for article in news_response.results {
845 let record = self.article_to_record(article)?;
846 records.push(record);
847 }
848
849 Ok(records)
850 }
851
852 pub async fn get_archive(
859 &self,
860 query: Option<&str>,
861 from_date: &str,
862 to_date: &str,
863 ) -> Result<Vec<DataRecord>> {
864 if self.api_key.is_none() {
865 return Ok(self.generate_synthetic_news(
866 query.unwrap_or("archive"),
867 10,
868 )?);
869 }
870
871 let mut url = format!(
872 "{}/archive?apikey={}&from_date={}&to_date={}",
873 self.base_url,
874 self.api_key.as_ref().unwrap(),
875 from_date,
876 to_date
877 );
878
879 if let Some(q) = query {
880 url.push_str(&format!("&q={}", urlencoding::encode(q)));
881 }
882
883 let response = self.fetch_with_retry(&url).await?;
884 let news_response: NewsDataResponse = response.json().await?;
885
886 let mut records = Vec::new();
887 for article in news_response.results {
888 let record = self.article_to_record(article)?;
889 records.push(record);
890 }
891
892 Ok(records)
893 }
894
895 fn generate_synthetic_news(&self, query: &str, limit: usize) -> Result<Vec<DataRecord>> {
897 let mut records = Vec::new();
898
899 for i in 0..limit {
900 let title = format!("Synthetic news article about {}: Story {}", query, i + 1);
901 let description = format!(
902 "This is synthetic news content for demonstration. Topic: {}",
903 query
904 );
905 let embedding = self.embedder.embed_text(&format!("{} {}", title, description));
906
907 let mut data_map = serde_json::Map::new();
908 data_map.insert("title".to_string(), serde_json::json!(title));
909 data_map.insert("description".to_string(), serde_json::json!(description));
910 data_map.insert(
911 "url".to_string(),
912 serde_json::json!(format!("https://example.com/news/{}", i)),
913 );
914 data_map.insert("source".to_string(), serde_json::json!("synthetic"));
915 data_map.insert("category".to_string(), serde_json::json!(["technology"]));
916
917 records.push(DataRecord {
918 id: format!("newsdata_synthetic_{}", i),
919 source: "newsdata".to_string(),
920 record_type: "article".to_string(),
921 timestamp: Utc::now(),
922 data: serde_json::Value::Object(data_map),
923 embedding: Some(embedding),
924 relationships: Vec::new(),
925 });
926 }
927
928 Ok(records)
929 }
930
931 fn article_to_record(&self, article: NewsDataArticle) -> Result<DataRecord> {
933 let description = article.description.as_deref().unwrap_or("");
934 let content = article.content.as_deref().unwrap_or("");
935 let text_content = format!("{} {} {}", article.title, description, content);
936 let embedding = self.embedder.embed_text(&text_content);
937
938 let timestamp = article
940 .pub_date
941 .as_ref()
942 .and_then(|d| {
943 DateTime::parse_from_rfc3339(d)
945 .ok()
946 .map(|dt| dt.with_timezone(&Utc))
947 .or_else(|| {
948 NaiveDateTime::parse_from_str(d, "%Y-%m-%d %H:%M:%S")
949 .ok()
950 .map(|ndt| ndt.and_utc())
951 })
952 })
953 .unwrap_or_else(Utc::now);
954
955 let mut data_map = serde_json::Map::new();
956 data_map.insert("title".to_string(), serde_json::json!(article.title));
957 data_map.insert("url".to_string(), serde_json::json!(article.link));
958 data_map.insert("source".to_string(), serde_json::json!(article.source_id));
959
960 if let Some(desc) = article.description {
961 data_map.insert("description".to_string(), serde_json::json!(desc));
962 }
963 if let Some(content) = article.content {
964 data_map.insert("content".to_string(), serde_json::json!(content));
965 }
966 if let Some(image) = article.image_url {
967 data_map.insert("image_url".to_string(), serde_json::json!(image));
968 }
969 if let Some(lang) = article.language {
970 data_map.insert("language".to_string(), serde_json::json!(lang));
971 }
972 if !article.keywords.is_empty() {
973 data_map.insert("keywords".to_string(), serde_json::json!(article.keywords));
974 }
975 if let Some(categories) = article.category {
976 data_map.insert("categories".to_string(), serde_json::json!(categories));
977 }
978 if let Some(countries) = article.country {
979 data_map.insert("countries".to_string(), serde_json::json!(countries));
980 }
981 if let Some(creators) = article.creator {
982 data_map.insert("creators".to_string(), serde_json::json!(creators));
983 }
984
985 Ok(DataRecord {
986 id: format!("newsdata_{}", article.article_id),
987 source: "newsdata".to_string(),
988 record_type: "article".to_string(),
989 timestamp,
990 data: serde_json::Value::Object(data_map),
991 embedding: Some(embedding),
992 relationships: Vec::new(),
993 })
994 }
995
996 async fn fetch_with_retry(&self, url: &str) -> Result<reqwest::Response> {
998 let mut retries = 0;
999 loop {
1000 match self.client.get(url).send().await {
1001 Ok(response) => {
1002 if response.status() == StatusCode::TOO_MANY_REQUESTS && retries < MAX_RETRIES
1003 {
1004 retries += 1;
1005 sleep(Duration::from_millis(RETRY_DELAY_MS * retries as u64)).await;
1006 continue;
1007 }
1008 return Ok(response);
1009 }
1010 Err(_) if retries < MAX_RETRIES => {
1011 retries += 1;
1012 sleep(Duration::from_millis(RETRY_DELAY_MS * retries as u64)).await;
1013 }
1014 Err(e) => return Err(FrameworkError::Network(e)),
1015 }
1016 }
1017 }
1018}
1019
1020#[async_trait]
1021impl DataSource for NewsDataClient {
1022 fn source_id(&self) -> &str {
1023 "newsdata"
1024 }
1025
1026 async fn fetch_batch(
1027 &self,
1028 _cursor: Option<String>,
1029 _batch_size: usize,
1030 ) -> Result<(Vec<DataRecord>, Option<String>)> {
1031 let records = self.get_latest(Some("technology"), None, None).await?;
1032 Ok((records, None))
1033 }
1034
1035 async fn total_count(&self) -> Result<Option<u64>> {
1036 Ok(None)
1037 }
1038
1039 async fn health_check(&self) -> Result<bool> {
1040 Ok(true)
1041 }
1042}
1043
/// Envelope of a Reddit listing response; only its `data` payload is decoded.
#[derive(Debug, Deserialize)]
struct RedditListing {
    data: RedditListingData,
}

/// Listing payload: the entries plus pagination anchors.
#[derive(Debug, Deserialize)]
struct RedditListingData {
    /// Listing entries; empty when the key is absent.
    #[serde(default)]
    children: Vec<RedditChild>,
    /// Anchor for the following page, if any.
    after: Option<String>,
    /// Anchor for the preceding page, if any.
    before: Option<String>,
}

/// One listing entry: `kind` is the Reddit thing type (the client keeps "t3"
/// for posts and "t1" for comments) and `data` is its payload.
#[derive(Debug, Deserialize)]
struct RedditChild {
    kind: String,
    data: RedditPost,
}
1067
1068#[derive(Debug, Clone, Deserialize)]
1070struct RedditPost {
1071 id: String,
1072 name: String,
1073 title: Option<String>,
1074 selftext: Option<String>,
1075 body: Option<String>,
1076 author: String,
1077 subreddit: String,
1078 #[serde(rename = "subreddit_id")]
1079 subreddit_id: String,
1080 score: i64,
1081 #[serde(rename = "num_comments")]
1082 num_comments: Option<i64>,
1083 created_utc: f64,
1084 permalink: String,
1085 url: Option<String>,
1086 #[serde(default)]
1087 is_self: bool,
1088 domain: Option<String>,
1089}
1090
/// Client for Reddit's public `.json` endpoints that converts posts and
/// comments into `DataRecord`s.
pub struct RedditClient {
    /// Shared HTTP client (configured with a fixed User-Agent).
    client: Client,
    /// Site root URL; also prefixed to permalinks in produced records.
    base_url: String,
    /// Sleep applied before every request attempt in `fetch_with_retry`.
    rate_limit_delay: Duration,
    /// Embedder used to attach a vector to every produced record.
    embedder: Arc<SimpleEmbedder>,
}
1098
1099impl RedditClient {
1100 pub fn new() -> Result<Self> {
1105 let client = Client::builder()
1106 .timeout(Duration::from_secs(30))
1107 .user_agent("RuVector Data Framework/1.0")
1108 .build()
1109 .map_err(|e| FrameworkError::Network(e))?;
1110
1111 Ok(Self {
1112 client,
1113 base_url: "https://www.reddit.com".to_string(),
1114 rate_limit_delay: Duration::from_millis(1000), embedder: Arc::new(SimpleEmbedder::new(128)),
1116 })
1117 }
1118
1119 pub async fn get_subreddit_posts(
1126 &self,
1127 subreddit: &str,
1128 sort: &str,
1129 limit: usize,
1130 ) -> Result<Vec<DataRecord>> {
1131 let url = format!(
1132 "{}/r/{}/{}.json?limit={}",
1133 self.base_url,
1134 subreddit,
1135 sort,
1136 limit.min(100)
1137 );
1138
1139 let response = self.fetch_with_retry(&url).await?;
1140 let listing: RedditListing = response.json().await?;
1141
1142 let mut records = Vec::new();
1143 for child in &listing.data.children {
1144 if child.kind == "t3" {
1145 let record = self.post_to_record(&child.data, "post")?;
1147 records.push(record);
1148 }
1149 }
1150
1151 Ok(records)
1152 }
1153
1154 pub async fn get_post_comments(&self, post_id: &str) -> Result<Vec<DataRecord>> {
1159 let url = format!("{}/comments/{}.json", self.base_url, post_id);
1161
1162 let response = self.fetch_with_retry(&url).await?;
1163 let listings: Vec<RedditListing> = response.json().await?;
1164
1165 let mut records = Vec::new();
1166
1167 if listings.len() >= 2 {
1169 for child in &listings[1].data.children {
1170 if child.kind == "t1" {
1171 let record = self.post_to_record(&child.data, "comment")?;
1173 records.push(record);
1174 }
1175 }
1176 }
1177
1178 Ok(records)
1179 }
1180
1181 pub async fn search(
1188 &self,
1189 query: &str,
1190 subreddit: Option<&str>,
1191 limit: usize,
1192 ) -> Result<Vec<DataRecord>> {
1193 let url = if let Some(sub) = subreddit {
1194 format!(
1195 "{}/r/{}/search.json?q={}&restrict_sr=on&limit={}",
1196 self.base_url,
1197 sub,
1198 urlencoding::encode(query),
1199 limit.min(100)
1200 )
1201 } else {
1202 format!(
1203 "{}/search.json?q={}&limit={}",
1204 self.base_url,
1205 urlencoding::encode(query),
1206 limit.min(100)
1207 )
1208 };
1209
1210 let response = self.fetch_with_retry(&url).await?;
1211 let listing: RedditListing = response.json().await?;
1212
1213 let mut records = Vec::new();
1214 for child in &listing.data.children {
1215 if child.kind == "t3" {
1216 let record = self.post_to_record(&child.data, "post")?;
1217 records.push(record);
1218 }
1219 }
1220
1221 Ok(records)
1222 }
1223
1224 fn post_to_record(&self, post: &RedditPost, record_type: &str) -> Result<DataRecord> {
1226 let text_content = format!(
1227 "{} {} {}",
1228 post.title.as_deref().unwrap_or(""),
1229 post.selftext.as_deref().unwrap_or(""),
1230 post.body.as_deref().unwrap_or("")
1231 );
1232 let embedding = self.embedder.embed_text(&text_content);
1233
1234 let timestamp =
1236 DateTime::from_timestamp(post.created_utc as i64, 0).unwrap_or_else(Utc::now);
1237
1238 let mut relationships = Vec::new();
1240
1241 relationships.push(Relationship {
1243 target_id: format!("reddit_user_{}", post.author),
1244 rel_type: "authored_by".to_string(),
1245 weight: 1.0,
1246 properties: {
1247 let mut props = HashMap::new();
1248 props.insert("username".to_string(), serde_json::json!(post.author));
1249 props
1250 },
1251 });
1252
1253 relationships.push(Relationship {
1255 target_id: format!("reddit_sub_{}", post.subreddit),
1256 rel_type: "posted_in".to_string(),
1257 weight: 1.0,
1258 properties: {
1259 let mut props = HashMap::new();
1260 props.insert("subreddit".to_string(), serde_json::json!(post.subreddit));
1261 props
1262 },
1263 });
1264
1265 let mut data_map = serde_json::Map::new();
1266 data_map.insert("post_id".to_string(), serde_json::json!(post.id));
1267 data_map.insert("name".to_string(), serde_json::json!(post.name));
1268 data_map.insert("author".to_string(), serde_json::json!(post.author));
1269 data_map.insert("subreddit".to_string(), serde_json::json!(post.subreddit));
1270 data_map.insert("score".to_string(), serde_json::json!(post.score));
1271 data_map.insert(
1272 "permalink".to_string(),
1273 serde_json::json!(format!("{}{}", self.base_url, post.permalink)),
1274 );
1275
1276 if let Some(title) = &post.title {
1277 data_map.insert("title".to_string(), serde_json::json!(title));
1278 }
1279 if let Some(selftext) = &post.selftext {
1280 data_map.insert("selftext".to_string(), serde_json::json!(selftext));
1281 }
1282 if let Some(body) = &post.body {
1283 data_map.insert("body".to_string(), serde_json::json!(body));
1284 }
1285 if let Some(url) = &post.url {
1286 data_map.insert("url".to_string(), serde_json::json!(url));
1287 }
1288 if let Some(num_comments) = post.num_comments {
1289 data_map.insert("num_comments".to_string(), serde_json::json!(num_comments));
1290 }
1291 if let Some(domain) = &post.domain {
1292 data_map.insert("domain".to_string(), serde_json::json!(domain));
1293 }
1294 data_map.insert("is_self".to_string(), serde_json::json!(post.is_self));
1295
1296 Ok(DataRecord {
1297 id: format!("reddit_{}", post.name),
1298 source: "reddit".to_string(),
1299 record_type: record_type.to_string(),
1300 timestamp,
1301 data: serde_json::Value::Object(data_map),
1302 embedding: Some(embedding),
1303 relationships,
1304 })
1305 }
1306
1307 async fn fetch_with_retry(&self, url: &str) -> Result<reqwest::Response> {
1309 let mut retries = 0;
1310 loop {
1311 sleep(self.rate_limit_delay).await; match self.client.get(url).send().await {
1314 Ok(response) => {
1315 if response.status() == StatusCode::TOO_MANY_REQUESTS && retries < MAX_RETRIES
1316 {
1317 retries += 1;
1318 sleep(Duration::from_millis(RETRY_DELAY_MS * retries as u64)).await;
1319 continue;
1320 }
1321 return Ok(response);
1322 }
1323 Err(_) if retries < MAX_RETRIES => {
1324 retries += 1;
1325 sleep(Duration::from_millis(RETRY_DELAY_MS * retries as u64)).await;
1326 }
1327 Err(e) => return Err(FrameworkError::Network(e)),
1328 }
1329 }
1330 }
1331}
1332
1333#[async_trait]
1334impl DataSource for RedditClient {
1335 fn source_id(&self) -> &str {
1336 "reddit"
1337 }
1338
1339 async fn fetch_batch(
1340 &self,
1341 cursor: Option<String>,
1342 batch_size: usize,
1343 ) -> Result<(Vec<DataRecord>, Option<String>)> {
1344 let subreddit = cursor.as_deref().unwrap_or("technology");
1345 let records = self.get_subreddit_posts(subreddit, "hot", batch_size).await?;
1346 Ok((records, None))
1347 }
1348
1349 async fn total_count(&self) -> Result<Option<u64>> {
1350 Ok(None)
1351 }
1352
1353 async fn health_check(&self) -> Result<bool> {
1354 let response = self
1355 .client
1356 .get(format!("{}/r/technology/hot.json?limit=1", self.base_url))
1357 .send()
1358 .await?;
1359 Ok(response.status().is_success())
1360 }
1361}
1362
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Datelike;

    // ----------------------------------------------------------------------
    // Hacker News
    // ----------------------------------------------------------------------

    #[tokio::test]
    async fn test_hackernews_client_creation() {
        let client = HackerNewsClient::new();
        assert!(client.is_ok());
    }

    /// `HackerNewsClient::new` hard-codes the live API base URL, so this check depends on
    /// network access. It is skipped by default; run it with `cargo test -- --ignored`.
    #[tokio::test]
    #[ignore = "requires network access to the Hacker News API"]
    async fn test_hackernews_health_check() {
        let client = HackerNewsClient::new().unwrap();
        let health = client.health_check().await;
        assert!(health.is_ok());
    }

    #[test]
    fn test_hackernews_item_conversion() {
        let client = HackerNewsClient::new().unwrap();
        let item = HNItem {
            id: 123,
            item_type: "story".to_string(),
            by: Some("testuser".to_string()),
            time: 1609459200,
            text: None,
            title: Some("Test Story".to_string()),
            url: Some("https://example.com".to_string()),
            score: Some(100),
            kids: vec![456, 789],
            descendants: Some(2),
        };

        let record = client.item_to_record(item).unwrap();
        assert_eq!(record.source, "hackernews");
        assert_eq!(record.record_type, "story");
        assert!(record.embedding.is_some());
        assert_eq!(record.relationships.len(), 3);
    }

    #[test]
    fn test_hackernews_user_conversion() {
        let client = HackerNewsClient::new().unwrap();
        let user = HNUser {
            id: "testuser".to_string(),
            created: 1609459200,
            karma: 5000,
            about: Some("Test user bio".to_string()),
            submitted: vec![1, 2, 3],
        };

        let record = client.user_to_record(user).unwrap();
        assert_eq!(record.source, "hackernews");
        assert_eq!(record.record_type, "user");
        assert!(record.embedding.is_some());
    }

    // ----------------------------------------------------------------------
    // Guardian
    // ----------------------------------------------------------------------

    #[tokio::test]
    async fn test_guardian_client_creation() {
        let client = GuardianClient::new(None);
        assert!(client.is_ok());
    }

    #[tokio::test]
    async fn test_guardian_synthetic_articles() {
        let client = GuardianClient::new(None).unwrap();
        let records = client.search("climate", 5).await.unwrap();

        assert!(!records.is_empty());
        assert_eq!(records[0].source, "guardian");
        assert!(records[0].embedding.is_some());
    }

    #[tokio::test]
    async fn test_guardian_synthetic_sections() {
        let client = GuardianClient::new(None).unwrap();
        let records = client.get_sections().await.unwrap();

        assert!(!records.is_empty());
        assert_eq!(records[0].source, "guardian");
        assert_eq!(records[0].record_type, "section");
    }

    #[test]
    fn test_guardian_article_conversion() {
        let client = GuardianClient::new(None).unwrap();
        let article = GuardianArticle {
            id: "world/2024/jan/01/test".to_string(),
            article_type: "article".to_string(),
            section_id: Some("world".to_string()),
            section_name: Some("World news".to_string()),
            web_publication_date: "2024-01-01T12:00:00Z".to_string(),
            web_title: "Test Article".to_string(),
            web_url: "https://theguardian.com/test".to_string(),
            api_url: "https://content.guardianapis.com/test".to_string(),
            fields: Some(GuardianFields {
                body: None,
                headline: Some("Test Headline".to_string()),
                standfirst: Some("Test standfirst".to_string()),
                body_text: Some("Test body text".to_string()),
            }),
            tags: None,
        };

        let record = client.article_to_record(article).unwrap();
        assert_eq!(record.source, "guardian");
        assert!(record.embedding.is_some());
    }

    // ----------------------------------------------------------------------
    // NewsData
    // ----------------------------------------------------------------------

    #[tokio::test]
    async fn test_newsdata_client_creation() {
        let client = NewsDataClient::new(None);
        assert!(client.is_ok());
    }

    #[tokio::test]
    async fn test_newsdata_synthetic_news() {
        let client = NewsDataClient::new(None).unwrap();
        let records = client.get_latest(Some("technology"), None, None).await.unwrap();

        assert!(!records.is_empty());
        assert_eq!(records[0].source, "newsdata");
        assert!(records[0].embedding.is_some());
    }

    #[test]
    fn test_newsdata_article_conversion() {
        let client = NewsDataClient::new(None).unwrap();
        let article = NewsDataArticle {
            article_id: "test123".to_string(),
            title: "Test News".to_string(),
            link: "https://example.com/news".to_string(),
            keywords: vec!["tech".to_string(), "ai".to_string()],
            creator: Some(vec!["Author Name".to_string()]),
            description: Some("Test description".to_string()),
            content: Some("Test content".to_string()),
            pub_date: Some("2024-01-01 12:00:00".to_string()),
            image_url: Some("https://example.com/image.jpg".to_string()),
            source_id: "testsource".to_string(),
            category: Some(vec!["technology".to_string()]),
            country: Some(vec!["us".to_string()]),
            language: Some("en".to_string()),
        };

        let record = client.article_to_record(article).unwrap();
        assert_eq!(record.source, "newsdata");
        assert!(record.embedding.is_some());
    }

    // ----------------------------------------------------------------------
    // Reddit
    // ----------------------------------------------------------------------

    #[tokio::test]
    async fn test_reddit_client_creation() {
        let client = RedditClient::new();
        assert!(client.is_ok());
    }

    #[test]
    fn test_reddit_post_conversion() {
        let client = RedditClient::new().unwrap();
        let post = RedditPost {
            id: "abc123".to_string(),
            name: "t3_abc123".to_string(),
            title: Some("Test Post".to_string()),
            selftext: Some("Test content".to_string()),
            body: None,
            author: "testuser".to_string(),
            subreddit: "technology".to_string(),
            subreddit_id: "t5_2qh16".to_string(),
            score: 100,
            num_comments: Some(50),
            created_utc: 1609459200.0,
            permalink: "/r/technology/comments/abc123/test_post/".to_string(),
            url: Some("https://reddit.com/r/technology".to_string()),
            is_self: true,
            domain: Some("self.technology".to_string()),
        };

        let record = client.post_to_record(&post, "post").unwrap();
        assert_eq!(record.source, "reddit");
        assert_eq!(record.record_type, "post");
        assert_eq!(record.id, "reddit_t3_abc123");
        assert_eq!(record.timestamp.timestamp(), 1609459200);
        assert!(record.embedding.is_some());

        // Author edge first, then the subreddit edge.
        assert_eq!(record.relationships.len(), 2);
        assert_eq!(record.relationships[0].rel_type, "authored_by");
        assert_eq!(record.relationships[0].target_id, "reddit_user_testuser");
        assert_eq!(record.relationships[1].rel_type, "posted_in");
        assert_eq!(record.relationships[1].target_id, "reddit_sub_technology");

        assert_eq!(record.data["post_id"], "abc123");
        assert_eq!(record.data["is_self"], true);
        // Absent optional fields are omitted rather than written as null.
        assert!(record.data.get("body").is_none());
    }

    #[test]
    fn test_reddit_comment_conversion() {
        let client = RedditClient::new().unwrap();
        let comment = RedditPost {
            id: "def456".to_string(),
            name: "t1_def456".to_string(),
            title: None,
            selftext: None,
            body: Some("Test comment body".to_string()),
            author: "commenter".to_string(),
            subreddit: "technology".to_string(),
            subreddit_id: "t5_2qh16".to_string(),
            score: 10,
            num_comments: None,
            created_utc: 1609459200.0,
            permalink: "/r/technology/comments/abc123/test_post/def456/".to_string(),
            url: None,
            is_self: false,
            domain: None,
        };

        let record = client.post_to_record(&comment, "comment").unwrap();
        assert_eq!(record.source, "reddit");
        assert_eq!(record.record_type, "comment");
        assert_eq!(record.id, "reddit_t1_def456");
        assert!(record.embedding.is_some());
        assert_eq!(record.data["body"], "Test comment body");
        assert!(record.data.get("title").is_none());
    }

    // ----------------------------------------------------------------------
    // Shared utilities
    // ----------------------------------------------------------------------

    #[test]
    fn test_embedding_normalization() {
        let embedder = SimpleEmbedder::new(128);
        let embedding = embedder.embed_text("machine learning artificial intelligence");

        assert_eq!(embedding.len(), 128);

        let magnitude: f32 = embedding.iter().map(|x| x * x).sum::<f32>().sqrt();
        assert!((magnitude - 1.0).abs() < 0.01);
    }

    #[test]
    fn test_timestamp_parsing() {
        let ts = DateTime::from_timestamp(1609459200, 0).unwrap();
        assert_eq!(ts.year(), 2021);
        assert_eq!(ts.month(), 1);

        let rfc = DateTime::parse_from_rfc3339("2024-01-01T12:00:00Z").unwrap();
        assert_eq!(rfc.year(), 2024);
    }
}