use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;

use chrono::Utc;
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use tokio::time::interval;

use crate::ruvector_native::{Domain, SemanticVector};
use crate::{FrameworkError, Result};

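/// Polls a set of [`FeedSource`]s on a fixed interval, de-duplicates items by
/// id, converts them to [`SemanticVector`]s, and hands each new batch to an
/// optional callback.
///
/// A minimal usage sketch (marked `ignore` because the surrounding crate
/// paths, async context, and feed URL are assumptions, not part of this
/// module):
///
/// ```ignore
/// let mut engine = RealTimeEngine::new(Duration::from_secs(60));
/// engine.add_feed(FeedSource::Rss {
///     url: "https://example.com/feed.rss".to_string(),
///     category: "climate".to_string(),
/// });
/// engine.set_callback(|vectors| println!("received {} new vectors", vectors.len()));
/// engine.start().await?;
/// // ... later ...
/// engine.stop().await;
/// ```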
pub struct RealTimeEngine {
    feeds: Vec<FeedSource>,
    update_interval: Duration,
    on_new_data: Option<Arc<dyn Fn(Vec<SemanticVector>) + Send + Sync>>,
    dedup_cache: Arc<RwLock<HashSet<String>>>,
    running: Arc<RwLock<bool>>,
}

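/// The kinds of upstream feeds the engine knows how to poll. Only `Rss` and
/// `RestPolling` are handled today; `WebSocket` is accepted but not yet
/// implemented (see `process_websocket_feed`).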
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum FeedSource {
    Rss { url: String, category: String },
    RestPolling { url: String, interval: Duration },
    WebSocket { url: String },
}

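/// One-shot aggregator that fetches a set of RSS news sources over HTTP and
/// returns the newest items as [`SemanticVector`]s.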
pub struct NewsAggregator {
    sources: Vec<NewsSource>,
    client: reqwest::Client,
}

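/// A named RSS feed plus the domain its items should be tagged with.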
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NewsSource {
    pub name: String,
    pub feed_url: String,
    pub domain: Domain,
}

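/// A single entry parsed out of an RSS `<item>` block.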
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FeedItem {
    pub id: String,
    pub title: String,
    pub description: String,
    pub link: String,
    pub published: Option<chrono::DateTime<Utc>>,
    pub author: Option<String>,
    pub categories: Vec<String>,
}

impl RealTimeEngine {
    pub fn new(update_interval: Duration) -> Self {
        Self {
            feeds: Vec::new(),
            update_interval,
            on_new_data: None,
            dedup_cache: Arc::new(RwLock::new(HashSet::new())),
            running: Arc::new(RwLock::new(false)),
        }
    }

    pub fn add_feed(&mut self, source: FeedSource) {
        self.feeds.push(source);
    }

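    /// Registers the closure that receives each batch of newly ingested
    /// vectors. Replaces any previously registered callback.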
    pub fn set_callback<F>(&mut self, callback: F)
    where
        F: Fn(Vec<SemanticVector>) + Send + Sync + 'static,
    {
        self.on_new_data = Some(Arc::new(callback));
    }

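    /// Spawns a background task that ticks on `update_interval`, processes
    /// every registered feed, and invokes the callback with any new vectors.
    /// Returns an error if the engine is already running; call
    /// [`stop`](Self::stop) to let the task exit on its next tick.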
    pub async fn start(&mut self) -> Result<()> {
        {
            let mut running = self.running.write().await;
            if *running {
                return Err(FrameworkError::Config(
                    "Engine already running".to_string(),
                ));
            }
            *running = true;
        }

        let feeds = self.feeds.clone();
        let callback = self.on_new_data.clone();
        let dedup_cache = self.dedup_cache.clone();
        let update_interval = self.update_interval;
        let running = self.running.clone();

        tokio::spawn(async move {
            let mut ticker = interval(update_interval);

            loop {
                ticker.tick().await;

                {
                    let is_running = running.read().await;
                    if !*is_running {
                        break;
                    }
                }

                for feed in &feeds {
                    match Self::process_feed(feed, &dedup_cache).await {
                        Ok(vectors) => {
                            if !vectors.is_empty() {
                                if let Some(ref cb) = callback {
                                    cb(vectors);
                                }
                            }
                        }
                        Err(e) => {
                            tracing::error!("Feed processing error: {}", e);
                        }
                    }
                }
            }
        });

        Ok(())
    }

    pub async fn stop(&mut self) {
        let mut running = self.running.write().await;
        *running = false;
    }

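    /// Dispatches a single feed to the handler for its variant. Items whose
    /// ids are already in the shared dedup cache are skipped; the cache is
    /// append-only for the lifetime of the engine.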
    async fn process_feed(
        feed: &FeedSource,
        dedup_cache: &Arc<RwLock<HashSet<String>>>,
    ) -> Result<Vec<SemanticVector>> {
        match feed {
            FeedSource::Rss { url, category } => {
                Self::process_rss_feed(url, category, dedup_cache).await
            }
            FeedSource::RestPolling { url, .. } => {
                Self::process_rest_feed(url, dedup_cache).await
            }
            FeedSource::WebSocket { url } => Self::process_websocket_feed(url, dedup_cache).await,
        }
    }

    async fn process_rss_feed(
        url: &str,
        category: &str,
        dedup_cache: &Arc<RwLock<HashSet<String>>>,
    ) -> Result<Vec<SemanticVector>> {
        let client = reqwest::Client::new();
        let response = client.get(url).send().await?;
        let content = response.text().await?;

        let items = Self::parse_rss(&content)?;

        let mut vectors = Vec::new();
        let mut cache = dedup_cache.write().await;

        for item in items {
            if cache.contains(&item.id) {
                continue;
            }

            cache.insert(item.id.clone());

            let domain = Self::category_to_domain(category);
            let vector = Self::item_to_vector(item, domain);
            vectors.push(vector);
        }

        Ok(vectors)
    }

    async fn process_rest_feed(
        url: &str,
        dedup_cache: &Arc<RwLock<HashSet<String>>>,
    ) -> Result<Vec<SemanticVector>> {
        let client = reqwest::Client::new();
        let response = client.get(url).send().await?;
        let items: Vec<FeedItem> = response.json().await?;

        let mut vectors = Vec::new();
        let mut cache = dedup_cache.write().await;

        for item in items {
            if cache.contains(&item.id) {
                continue;
            }

            cache.insert(item.id.clone());
            let vector = Self::item_to_vector(item, Domain::Research);
            vectors.push(vector);
        }

        Ok(vectors)
    }

    async fn process_websocket_feed(
        _url: &str,
        _dedup_cache: &Arc<RwLock<HashSet<String>>>,
    ) -> Result<Vec<SemanticVector>> {
        tracing::warn!("WebSocket feeds not yet implemented");
        Ok(Vec::new())
    }

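    /// Very small, string-splitting RSS parser: it scans for `<item>` blocks
    /// rather than using a full XML parser, so CDATA sections and attributes
    /// on tags are not handled.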
    fn parse_rss(content: &str) -> Result<Vec<FeedItem>> {
        let mut items = Vec::new();

        for item_block in content.split("<item>").skip(1) {
            if let Some(end) = item_block.find("</item>") {
                let item_xml = &item_block[..end];
                if let Some(item) = Self::parse_rss_item(item_xml) {
                    items.push(item);
                }
            }
        }

        Ok(items)
    }

    fn parse_rss_item(xml: &str) -> Option<FeedItem> {
        let title = Self::extract_tag(xml, "title")?;
        let description = Self::extract_tag(xml, "description").unwrap_or_default();
        let link = Self::extract_tag(xml, "link").unwrap_or_default();
        let guid = Self::extract_tag(xml, "guid").unwrap_or_else(|| link.clone());

        let published = Self::extract_tag(xml, "pubDate")
            .and_then(|date_str| chrono::DateTime::parse_from_rfc2822(&date_str).ok())
            .map(|dt| dt.with_timezone(&Utc));

        let author = Self::extract_tag(xml, "author");

        Some(FeedItem {
            id: guid,
            title,
            description,
            link,
            published,
            author,
            categories: Vec::new(),
        })
    }

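    /// Returns the text between `<tag>` and `</tag>`, trimmed and with the
    /// common XML entities decoded. Returns `None` if either tag is missing.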
    fn extract_tag(xml: &str, tag: &str) -> Option<String> {
        let start_tag = format!("<{}>", tag);
        let end_tag = format!("</{}>", tag);

        let start = xml.find(&start_tag)? + start_tag.len();
        let end = xml.find(&end_tag)?;

        if start < end {
            let content = &xml[start..end];
            let decoded = content
                .replace("&lt;", "<")
                .replace("&gt;", ">")
                .replace("&amp;", "&")
                .replace("&quot;", "\"")
                .replace("&apos;", "'");
            Some(decoded.trim().to_string())
        } else {
            None
        }
    }

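    /// Maps a free-form feed category string onto the crate's [`Domain`]
    /// enum, falling back to `Domain::CrossDomain` for anything unrecognized.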
    fn category_to_domain(category: &str) -> Domain {
        match category.to_lowercase().as_str() {
            "climate" | "weather" | "environment" => Domain::Climate,
            "finance" | "economy" | "market" | "stock" => Domain::Finance,
            "research" | "science" | "academic" | "medical" => Domain::Research,
            _ => Domain::CrossDomain,
        }
    }

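    /// Builds a [`SemanticVector`] from a feed item: the title and description
    /// are embedded, a few fields are copied into metadata, and the publish
    /// time (or "now" when absent) becomes the timestamp.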
    fn item_to_vector(item: FeedItem, domain: Domain) -> SemanticVector {
        use std::collections::HashMap;

        let text = format!("{} {}", item.title, item.description);
        let embedding = Self::simple_embedding(&text);

        let mut metadata = HashMap::new();
        metadata.insert("title".to_string(), item.title.clone());
        metadata.insert("link".to_string(), item.link.clone());
        if let Some(author) = item.author {
            metadata.insert("author".to_string(), author);
        }

        SemanticVector {
            id: item.id,
            embedding,
            domain,
            timestamp: item.published.unwrap_or_else(Utc::now),
            metadata,
        }
    }

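    /// Placeholder embedding: each of the first 384 whitespace-separated words
    /// hashes to one coordinate, and the vector is L2-normalized. This is a
    /// deterministic stand-in, not a semantic model; swap in a real encoder
    /// for meaningful similarity.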
    fn simple_embedding(text: &str) -> Vec<f32> {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};

        let mut embedding = vec![0.0f32; 384];

        for (i, word) in text.split_whitespace().take(384).enumerate() {
            let mut hasher = DefaultHasher::new();
            word.hash(&mut hasher);
            let hash = hasher.finish();
            embedding[i] = (hash as f32 / u64::MAX as f32) * 2.0 - 1.0;
        }

        let magnitude: f32 = embedding.iter().map(|x| x * x).sum::<f32>().sqrt();
        if magnitude > 0.0 {
            for val in &mut embedding {
                *val /= magnitude;
            }
        }

        embedding
    }
}

impl NewsAggregator {
    pub fn new() -> Self {
        Self {
            sources: Vec::new(),
            client: reqwest::Client::builder()
                .user_agent("RuVector/1.0")
                .timeout(Duration::from_secs(30))
                .build()
                .unwrap(),
        }
    }

    pub fn add_source(&mut self, source: NewsSource) {
        self.sources.push(source);
    }

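    /// Registers a handful of public feeds covering the climate, finance,
    /// research, and cross-domain categories. The URLs are external services
    /// and may change or rate-limit; failures are logged and skipped by
    /// `fetch_latest`.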
    pub fn add_default_sources(&mut self) {
        self.add_source(NewsSource {
            name: "NASA Earth Observatory".to_string(),
            feed_url: "https://earthobservatory.nasa.gov/feeds/image-of-the-day.rss".to_string(),
            domain: Domain::Climate,
        });

        self.add_source(NewsSource {
            name: "Yahoo Finance - Top Stories".to_string(),
            feed_url: "https://finance.yahoo.com/news/rssindex".to_string(),
            domain: Domain::Finance,
        });

        self.add_source(NewsSource {
            name: "PubMed Recent".to_string(),
            feed_url: "https://pubmed.ncbi.nlm.nih.gov/rss/search/1nKx2zx8g-9UCGpQD5qVmN6jTvSRRxYqjD3T_nA-pSMjDlXr4u/?limit=100&utm_campaign=pubmed-2&fc=20210421200858".to_string(),
            domain: Domain::Research,
        });

        self.add_source(NewsSource {
            name: "Reuters Top News".to_string(),
            feed_url: "https://www.reutersagency.com/feed/?taxonomy=best-topics&post_type=best".to_string(),
            domain: Domain::CrossDomain,
        });

        self.add_source(NewsSource {
            name: "AP News Top Stories".to_string(),
            feed_url: "https://apnews.com/index.rss".to_string(),
            domain: Domain::CrossDomain,
        });
    }

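    /// Fetches up to `limit` items from each registered source, de-duplicates
    /// by id, sorts newest first, and truncates the combined list to `limit`.
    ///
    /// A minimal usage sketch (marked `ignore`; the surrounding async context
    /// and crate paths are assumed):
    ///
    /// ```ignore
    /// let mut aggregator = NewsAggregator::new();
    /// aggregator.add_default_sources();
    /// let latest = aggregator.fetch_latest(50).await?;
    /// println!("fetched {} items", latest.len());
    /// ```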
    pub async fn fetch_latest(&self, limit: usize) -> Result<Vec<SemanticVector>> {
        let mut all_vectors = Vec::new();
        let mut seen = HashSet::new();

        for source in &self.sources {
            match self.fetch_source(source, limit).await {
                Ok(vectors) => {
                    for vector in vectors {
                        if !seen.contains(&vector.id) {
                            seen.insert(vector.id.clone());
                            all_vectors.push(vector);
                        }
                    }
                }
                Err(e) => {
                    tracing::warn!("Failed to fetch {}: {}", source.name, e);
                }
            }
        }

        all_vectors.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));

        all_vectors.truncate(limit);

        Ok(all_vectors)
    }

    async fn fetch_source(&self, source: &NewsSource, limit: usize) -> Result<Vec<SemanticVector>> {
        let response = self.client.get(&source.feed_url).send().await?;
        let content = response.text().await?;

        let items = RealTimeEngine::parse_rss(&content)?;
        let mut vectors = Vec::new();

        for item in items.into_iter().take(limit) {
            let vector = RealTimeEngine::item_to_vector(item, source.domain);
            vectors.push(vector);
        }

        Ok(vectors)
    }
}

impl Default for NewsAggregator {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_extract_tag() {
        let xml = "<title>Test Title</title><description>Test Description</description>";
        assert_eq!(
            RealTimeEngine::extract_tag(xml, "title"),
            Some("Test Title".to_string())
        );
        assert_eq!(
            RealTimeEngine::extract_tag(xml, "description"),
            Some("Test Description".to_string())
        );
        assert_eq!(RealTimeEngine::extract_tag(xml, "missing"), None);
    }

    #[test]
    fn test_category_to_domain() {
        assert_eq!(
            RealTimeEngine::category_to_domain("climate"),
            Domain::Climate
        );
        assert_eq!(
            RealTimeEngine::category_to_domain("Finance"),
            Domain::Finance
        );
        assert_eq!(
            RealTimeEngine::category_to_domain("research"),
            Domain::Research
        );
        assert_eq!(
            RealTimeEngine::category_to_domain("other"),
            Domain::CrossDomain
        );
    }

    #[test]
    fn test_simple_embedding() {
        let embedding = RealTimeEngine::simple_embedding("climate change impacts");
        assert_eq!(embedding.len(), 384);

        let magnitude: f32 = embedding.iter().map(|x| x * x).sum::<f32>().sqrt();
        assert!((magnitude - 1.0).abs() < 0.01);
    }

    #[tokio::test]
    async fn test_realtime_engine_lifecycle() {
        let mut engine = RealTimeEngine::new(Duration::from_secs(1));

        engine.add_feed(FeedSource::Rss {
            url: "https://example.com/feed.rss".to_string(),
            category: "climate".to_string(),
        });

        assert!(engine.start().await.is_ok());
        engine.stop().await;
    }

    #[test]
    fn test_news_aggregator() {
        let mut aggregator = NewsAggregator::new();
        aggregator.add_default_sources();
        assert!(aggregator.sources.len() >= 5);
    }

    #[test]
    fn test_parse_rss_item() {
        let xml = r#"
            <title>Test Article</title>
            <description>This is a test article</description>
            <link>https://example.com/article</link>
            <guid>article-123</guid>
            <pubDate>Mon, 01 Jan 2024 12:00:00 GMT</pubDate>
        "#;

        let item = RealTimeEngine::parse_rss_item(xml);
        assert!(item.is_some());

        let item = item.unwrap();
        assert_eq!(item.title, "Test Article");
        assert_eq!(item.description, "This is a test article");
        assert_eq!(item.link, "https://example.com/article");
        assert_eq!(item.id, "article-123");
    }
}