use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use chrono::Utc;
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use tokio::time::interval;

use crate::ruvector_native::{Domain, SemanticVector};
use crate::{FrameworkError, Result};

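/// Polls the registered feed sources on a fixed interval, deduplicates items by
/// their id, converts new items into `SemanticVector`s, and forwards them to the
/// optional callback.
///
/// A minimal usage sketch (the feed URL is a placeholder):
///
/// ```ignore
/// let mut engine = RealTimeEngine::new(Duration::from_secs(60));
/// engine.add_feed(FeedSource::Rss {
///     url: "https://example.com/feed.rss".to_string(),
///     category: "climate".to_string(),
/// });
/// engine.set_callback(|vectors| println!("received {} new vectors", vectors.len()));
/// engine.start().await?;
/// // ... later
/// engine.stop().await;
/// ```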
pub struct RealTimeEngine {
    feeds: Vec<FeedSource>,
    update_interval: Duration,
    on_new_data: Option<Arc<dyn Fn(Vec<SemanticVector>) + Send + Sync>>,
    dedup_cache: Arc<RwLock<HashSet<String>>>,
    running: Arc<RwLock<bool>>,
}

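/// A source of incoming items: an RSS feed, a REST endpoint polled for JSON items,
/// or a WebSocket stream (not yet implemented).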
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum FeedSource {
    Rss { url: String, category: String },
    RestPolling { url: String, interval: Duration },
    WebSocket { url: String },
}

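/// Fetches and merges RSS items from a set of named news sources, converting them
/// into `SemanticVector`s via `RealTimeEngine`'s helpers.
///
/// A minimal usage sketch:
///
/// ```ignore
/// let mut aggregator = NewsAggregator::new();
/// aggregator.add_default_sources();
/// let latest = aggregator.fetch_latest(50).await?;
/// ```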
pub struct NewsAggregator {
    sources: Vec<NewsSource>,
    client: reqwest::Client,
}

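/// A named RSS feed together with the domain its items should be tagged with.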
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NewsSource {
    pub name: String,
    pub feed_url: String,
    pub domain: Domain,
}

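/// A single item parsed from a feed, before conversion into a `SemanticVector`.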
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FeedItem {
    pub id: String,
    pub title: String,
    pub description: String,
    pub link: String,
    pub published: Option<chrono::DateTime<Utc>>,
    pub author: Option<String>,
    pub categories: Vec<String>,
}

impl RealTimeEngine {
    pub fn new(update_interval: Duration) -> Self {
        Self {
            feeds: Vec::new(),
            update_interval,
            on_new_data: None,
            dedup_cache: Arc::new(RwLock::new(HashSet::new())),
            running: Arc::new(RwLock::new(false)),
        }
    }

    pub fn add_feed(&mut self, source: FeedSource) {
        self.feeds.push(source);
    }

    pub fn set_callback<F>(&mut self, callback: F)
    where
        F: Fn(Vec<SemanticVector>) + Send + Sync + 'static,
    {
        self.on_new_data = Some(Arc::new(callback));
    }

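    /// Spawns a background task that ticks every `update_interval`, processes each
    /// feed, and invokes the callback with any newly seen vectors. Returns an error
    /// if the engine is already running.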
    pub async fn start(&mut self) -> Result<()> {
        {
            let mut running = self.running.write().await;
            if *running {
                return Err(FrameworkError::Config(
                    "Engine already running".to_string(),
                ));
            }
            *running = true;
        }

        let feeds = self.feeds.clone();
        let callback = self.on_new_data.clone();
        let dedup_cache = self.dedup_cache.clone();
        let update_interval = self.update_interval;
        let running = self.running.clone();

        tokio::spawn(async move {
            let mut ticker = interval(update_interval);

            loop {
                ticker.tick().await;

                {
                    let is_running = running.read().await;
                    if !*is_running {
                        break;
                    }
                }

                for feed in &feeds {
                    match Self::process_feed(feed, &dedup_cache).await {
                        Ok(vectors) => {
                            if !vectors.is_empty() {
                                if let Some(ref cb) = callback {
                                    cb(vectors);
                                }
                            }
                        }
                        Err(e) => {
                            tracing::error!("Feed processing error: {}", e);
                        }
                    }
                }
            }
        });

        Ok(())
    }

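    /// Signals the background task to stop; it exits on its next tick.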
    pub async fn stop(&mut self) {
        let mut running = self.running.write().await;
        *running = false;
    }

    async fn process_feed(
        feed: &FeedSource,
        dedup_cache: &Arc<RwLock<HashSet<String>>>,
    ) -> Result<Vec<SemanticVector>> {
        match feed {
            FeedSource::Rss { url, category } => {
                Self::process_rss_feed(url, category, dedup_cache).await
            }
            FeedSource::RestPolling { url, .. } => {
                Self::process_rest_feed(url, dedup_cache).await
            }
            FeedSource::WebSocket { url } => Self::process_websocket_feed(url, dedup_cache).await,
        }
    }

    async fn process_rss_feed(
        url: &str,
        category: &str,
        dedup_cache: &Arc<RwLock<HashSet<String>>>,
    ) -> Result<Vec<SemanticVector>> {
        let client = reqwest::Client::new();
        let response = client.get(url).send().await?;
        let content = response.text().await?;

        let items = Self::parse_rss(&content)?;

        let mut vectors = Vec::new();
        let mut cache = dedup_cache.write().await;

        for item in items {
            if cache.contains(&item.id) {
                continue;
            }

            cache.insert(item.id.clone());

            let domain = Self::category_to_domain(category);
            let vector = Self::item_to_vector(item, domain);
            vectors.push(vector);
        }

        Ok(vectors)
    }

    async fn process_rest_feed(
        url: &str,
        dedup_cache: &Arc<RwLock<HashSet<String>>>,
    ) -> Result<Vec<SemanticVector>> {
        let client = reqwest::Client::new();
        let response = client.get(url).send().await?;
        let items: Vec<FeedItem> = response.json().await?;

        let mut vectors = Vec::new();
        let mut cache = dedup_cache.write().await;

        for item in items {
            if cache.contains(&item.id) {
                continue;
            }

            cache.insert(item.id.clone());
            let vector = Self::item_to_vector(item, Domain::Research);
            vectors.push(vector);
        }

        Ok(vectors)
    }

    async fn process_websocket_feed(
        _url: &str,
        _dedup_cache: &Arc<RwLock<HashSet<String>>>,
    ) -> Result<Vec<SemanticVector>> {
        tracing::warn!("WebSocket feeds not yet implemented");
        Ok(Vec::new())
    }

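    /// Minimal RSS parsing based on string splitting over `<item>` blocks; this does
    /// not handle attributes, namespaces, or CDATA sections.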
    fn parse_rss(content: &str) -> Result<Vec<FeedItem>> {
        let mut items = Vec::new();

        for item_block in content.split("<item>").skip(1) {
            if let Some(end) = item_block.find("</item>") {
                let item_xml = &item_block[..end];
                if let Some(item) = Self::parse_rss_item(item_xml) {
                    items.push(item);
                }
            }
        }

        Ok(items)
    }

    fn parse_rss_item(xml: &str) -> Option<FeedItem> {
        let title = Self::extract_tag(xml, "title")?;
        let description = Self::extract_tag(xml, "description").unwrap_or_default();
        let link = Self::extract_tag(xml, "link").unwrap_or_default();
        let guid = Self::extract_tag(xml, "guid").unwrap_or_else(|| link.clone());

        let published = Self::extract_tag(xml, "pubDate")
            .and_then(|date_str| chrono::DateTime::parse_from_rfc2822(&date_str).ok())
            .map(|dt| dt.with_timezone(&Utc));

        let author = Self::extract_tag(xml, "author");

        Some(FeedItem {
            id: guid,
            title,
            description,
            link,
            published,
            author,
            categories: Vec::new(),
        })
    }

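    /// Returns the trimmed text between `<tag>` and `</tag>`, with common XML
    /// entities decoded. Returns `None` if the tag is missing or malformed.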
    fn extract_tag(xml: &str, tag: &str) -> Option<String> {
        let start_tag = format!("<{}>", tag);
        let end_tag = format!("</{}>", tag);

        let start = xml.find(&start_tag)? + start_tag.len();
        let end = xml.find(&end_tag)?;

        if start < end {
            let content = &xml[start..end];
            // Decode the most common XML entities.
            let decoded = content
                .replace("&lt;", "<")
                .replace("&gt;", ">")
                .replace("&amp;", "&")
                .replace("&quot;", "\"")
                .replace("&apos;", "'");
            Some(decoded.trim().to_string())
        } else {
            None
        }
    }

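    /// Maps a free-form feed category string onto a known `Domain`, falling back to
    /// `Domain::CrossDomain` for anything unrecognized.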
    fn category_to_domain(category: &str) -> Domain {
        match category.to_lowercase().as_str() {
            "climate" | "weather" | "environment" => Domain::Climate,
            "finance" | "economy" | "market" | "stock" => Domain::Finance,
            "research" | "science" | "academic" | "medical" => Domain::Research,
            _ => Domain::CrossDomain,
        }
    }

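    /// Converts a `FeedItem` into a `SemanticVector`, embedding the title plus
    /// description and carrying the title, link, and author along as metadata.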
    fn item_to_vector(item: FeedItem, domain: Domain) -> SemanticVector {
        use std::collections::HashMap;

        let text = format!("{} {}", item.title, item.description);
        let embedding = Self::simple_embedding(&text);

        let mut metadata = HashMap::new();
        metadata.insert("title".to_string(), item.title.clone());
        metadata.insert("link".to_string(), item.link.clone());
        if let Some(author) = item.author {
            metadata.insert("author".to_string(), author);
        }

        SemanticVector {
            id: item.id,
            embedding,
            domain,
            timestamp: item.published.unwrap_or_else(Utc::now),
            metadata,
        }
    }

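    /// Produces a 384-dimensional, L2-normalized embedding by hashing each of the
    /// first 384 whitespace-separated words. This is a deterministic stand-in, not
    /// a learned text embedding.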
    fn simple_embedding(text: &str) -> Vec<f32> {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};

        let mut embedding = vec![0.0f32; 384];

        for (i, word) in text.split_whitespace().take(384).enumerate() {
            let mut hasher = DefaultHasher::new();
            word.hash(&mut hasher);
            let hash = hasher.finish();
            embedding[i] = (hash as f32 / u64::MAX as f32) * 2.0 - 1.0;
        }

        let magnitude: f32 = embedding.iter().map(|x| x * x).sum::<f32>().sqrt();
        if magnitude > 0.0 {
            for val in &mut embedding {
                *val /= magnitude;
            }
        }

        embedding
    }
}

impl NewsAggregator {
    pub fn new() -> Self {
        Self {
            sources: Vec::new(),
            client: reqwest::Client::builder()
                .user_agent("RuVector/1.0")
                .timeout(Duration::from_secs(30))
                .build()
                .unwrap(),
        }
    }

    pub fn add_source(&mut self, source: NewsSource) {
        self.sources.push(source);
    }

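    /// Registers a default set of publicly available RSS feeds covering climate,
    /// finance, research, and general news.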
    pub fn add_default_sources(&mut self) {
        self.add_source(NewsSource {
            name: "NASA Earth Observatory".to_string(),
            feed_url: "https://earthobservatory.nasa.gov/feeds/image-of-the-day.rss".to_string(),
            domain: Domain::Climate,
        });

        self.add_source(NewsSource {
            name: "Yahoo Finance - Top Stories".to_string(),
            feed_url: "https://finance.yahoo.com/news/rssindex".to_string(),
            domain: Domain::Finance,
        });

        self.add_source(NewsSource {
            name: "PubMed Recent".to_string(),
            feed_url: "https://pubmed.ncbi.nlm.nih.gov/rss/search/1nKx2zx8g-9UCGpQD5qVmN6jTvSRRxYqjD3T_nA-pSMjDlXr4u/?limit=100&utm_campaign=pubmed-2&fc=20210421200858".to_string(),
            domain: Domain::Research,
        });

        self.add_source(NewsSource {
            name: "Reuters Top News".to_string(),
            feed_url: "https://www.reutersagency.com/feed/?taxonomy=best-topics&post_type=best".to_string(),
            domain: Domain::CrossDomain,
        });

        self.add_source(NewsSource {
            name: "AP News Top Stories".to_string(),
            feed_url: "https://apnews.com/index.rss".to_string(),
            domain: Domain::CrossDomain,
        });
    }

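    /// Fetches up to `limit` items from every source, deduplicates them by id,
    /// sorts them newest first, and returns at most `limit` vectors overall.
    /// Sources that fail to fetch are logged and skipped.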
    pub async fn fetch_latest(&self, limit: usize) -> Result<Vec<SemanticVector>> {
        let mut all_vectors = Vec::new();
        let mut seen = HashSet::new();

        for source in &self.sources {
            match self.fetch_source(source, limit).await {
                Ok(vectors) => {
                    for vector in vectors {
                        if !seen.contains(&vector.id) {
                            seen.insert(vector.id.clone());
                            all_vectors.push(vector);
                        }
                    }
                }
                Err(e) => {
                    tracing::warn!("Failed to fetch {}: {}", source.name, e);
                }
            }
        }

        all_vectors.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));

        all_vectors.truncate(limit);

        Ok(all_vectors)
    }

    async fn fetch_source(&self, source: &NewsSource, limit: usize) -> Result<Vec<SemanticVector>> {
        let response = self.client.get(&source.feed_url).send().await?;
        let content = response.text().await?;

        let items = RealTimeEngine::parse_rss(&content)?;
        let mut vectors = Vec::new();

        for item in items.into_iter().take(limit) {
            let vector = RealTimeEngine::item_to_vector(item, source.domain);
            vectors.push(vector);
        }

        Ok(vectors)
    }
}

impl Default for NewsAggregator {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_extract_tag() {
        let xml = "<title>Test Title</title><description>Test Description</description>";
        assert_eq!(
            RealTimeEngine::extract_tag(xml, "title"),
            Some("Test Title".to_string())
        );
        assert_eq!(
            RealTimeEngine::extract_tag(xml, "description"),
            Some("Test Description".to_string())
        );
        assert_eq!(RealTimeEngine::extract_tag(xml, "missing"), None);
    }

    #[test]
    fn test_category_to_domain() {
        assert_eq!(
            RealTimeEngine::category_to_domain("climate"),
            Domain::Climate
        );
        assert_eq!(
            RealTimeEngine::category_to_domain("Finance"),
            Domain::Finance
        );
        assert_eq!(
            RealTimeEngine::category_to_domain("research"),
            Domain::Research
        );
        assert_eq!(
            RealTimeEngine::category_to_domain("other"),
            Domain::CrossDomain
        );
    }

    #[test]
    fn test_simple_embedding() {
        let embedding = RealTimeEngine::simple_embedding("climate change impacts");
        assert_eq!(embedding.len(), 384);

        let magnitude: f32 = embedding.iter().map(|x| x * x).sum::<f32>().sqrt();
        assert!((magnitude - 1.0).abs() < 0.01);
    }

    #[tokio::test]
    async fn test_realtime_engine_lifecycle() {
        let mut engine = RealTimeEngine::new(Duration::from_secs(1));

        engine.add_feed(FeedSource::Rss {
            url: "https://example.com/feed.rss".to_string(),
            category: "climate".to_string(),
        });

        assert!(engine.start().await.is_ok());
        engine.stop().await;
    }

    #[test]
    fn test_news_aggregator() {
        let mut aggregator = NewsAggregator::new();
        aggregator.add_default_sources();
        assert!(aggregator.sources.len() >= 5);
    }

    #[test]
    fn test_parse_rss_item() {
        let xml = r#"
            <title>Test Article</title>
            <description>This is a test article</description>
            <link>https://example.com/article</link>
            <guid>article-123</guid>
            <pubDate>Mon, 01 Jan 2024 12:00:00 GMT</pubDate>
        "#;

        let item = RealTimeEngine::parse_rss_item(xml);
        assert!(item.is_some());

        let item = item.unwrap();
        assert_eq!(item.title, "Test Article");
        assert_eq!(item.description, "This is a test article");
        assert_eq!(item.link, "https://example.com/article");
        assert_eq!(item.id, "article-123");
    }
}