ruvector_data_framework/realtime.rs

//! Real-Time Data Feed Integration
//!
//! RSS/Atom feed parsing, WebSocket streaming, and REST API polling
//! for continuous data ingestion into the RuVector discovery framework.
//!
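//! A minimal usage sketch (type paths are illustrative; adjust `use`
//! statements to your crate layout):
//!
//! ```ignore
//! use std::time::Duration;
//!
//! let mut engine = RealTimeEngine::new(Duration::from_secs(60));
//! engine.add_feed(FeedSource::Rss {
//!     url: "https://example.com/feed.rss".to_string(),
//!     category: "climate".to_string(),
//! });
//! engine.start().await?;
//! // ... later ...
//! engine.stop().await;
//! ```
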
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;

use chrono::Utc;
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use tokio::time::interval;

use crate::ruvector_native::{Domain, SemanticVector};
use crate::{FrameworkError, Result};

/// Real-time engine for streaming data feeds
pub struct RealTimeEngine {
    feeds: Vec<FeedSource>,
    update_interval: Duration,
    on_new_data: Option<Arc<dyn Fn(Vec<SemanticVector>) + Send + Sync>>,
    /// Ids of items already emitted; grows without bound, so long-running
    /// deployments may want to prune it periodically.
    dedup_cache: Arc<RwLock<HashSet<String>>>,
    running: Arc<RwLock<bool>>,
}

/// Types of feed sources
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum FeedSource {
    /// RSS or Atom feed
    Rss { url: String, category: String },
    /// REST API with polling
    RestPolling { url: String, interval: Duration },
    /// WebSocket streaming endpoint
    WebSocket { url: String },
}

/// News aggregator for multiple RSS feeds
pub struct NewsAggregator {
    sources: Vec<NewsSource>,
    client: reqwest::Client,
}

/// Individual news source configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NewsSource {
    pub name: String,
    pub feed_url: String,
    pub domain: Domain,
}

/// Parsed feed item
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FeedItem {
    pub id: String,
    pub title: String,
    pub description: String,
    pub link: String,
    pub published: Option<chrono::DateTime<Utc>>,
    pub author: Option<String>,
    pub categories: Vec<String>,
}

impl RealTimeEngine {
    /// Create a new real-time engine
    pub fn new(update_interval: Duration) -> Self {
        Self {
            feeds: Vec::new(),
            update_interval,
            on_new_data: None,
            dedup_cache: Arc::new(RwLock::new(HashSet::new())),
            running: Arc::new(RwLock::new(false)),
        }
    }

    /// Add a feed source to monitor
    pub fn add_feed(&mut self, source: FeedSource) {
        self.feeds.push(source);
    }

    /// Set callback for new data
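    ///
    /// A small sketch of registering a handler (illustrative only):
    ///
    /// ```ignore
    /// engine.set_callback(|vectors| {
    ///     for v in &vectors {
    ///         println!("new item: {}", v.id);
    ///     }
    /// });
    /// ```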
    pub fn set_callback<F>(&mut self, callback: F)
    where
        F: Fn(Vec<SemanticVector>) + Send + Sync + 'static,
    {
        self.on_new_data = Some(Arc::new(callback));
    }

    /// Start the real-time engine, spawning a background task that polls
    /// every registered feed on the configured interval (the first poll
    /// fires immediately)
    pub async fn start(&mut self) -> Result<()> {
        {
            let mut running = self.running.write().await;
            if *running {
                return Err(FrameworkError::Config(
                    "Engine already running".to_string(),
                ));
            }
            *running = true;
        }

        let feeds = self.feeds.clone();
        let callback = self.on_new_data.clone();
        let dedup_cache = self.dedup_cache.clone();
        let update_interval = self.update_interval;
        let running = self.running.clone();

        tokio::spawn(async move {
            let mut ticker = interval(update_interval);

            loop {
                ticker.tick().await;

                // Check if we should stop
                {
                    let is_running = running.read().await;
                    if !*is_running {
                        break;
                    }
                }

                // Process all feeds
                for feed in &feeds {
                    match Self::process_feed(feed, &dedup_cache).await {
                        Ok(vectors) => {
                            if !vectors.is_empty() {
                                if let Some(ref cb) = callback {
                                    cb(vectors);
                                }
                            }
                        }
                        Err(e) => {
                            tracing::error!("Feed processing error: {}", e);
                        }
                    }
                }
            }
        });

        Ok(())
    }

    /// Stop the real-time engine; the background task observes the flag
    /// and exits at its next tick
    pub async fn stop(&mut self) {
        let mut running = self.running.write().await;
        *running = false;
    }

    /// Process a single feed source
    async fn process_feed(
        feed: &FeedSource,
        dedup_cache: &Arc<RwLock<HashSet<String>>>,
    ) -> Result<Vec<SemanticVector>> {
        match feed {
            FeedSource::Rss { url, category } => {
                Self::process_rss_feed(url, category, dedup_cache).await
            }
            FeedSource::RestPolling { url, .. } => {
                Self::process_rest_feed(url, dedup_cache).await
            }
            FeedSource::WebSocket { url } => Self::process_websocket_feed(url, dedup_cache).await,
        }
    }

    /// Process RSS/Atom feed
    async fn process_rss_feed(
        url: &str,
        category: &str,
        dedup_cache: &Arc<RwLock<HashSet<String>>>,
    ) -> Result<Vec<SemanticVector>> {
        // A fresh client per poll keeps this function self-contained;
        // sharing one client across polls would be more efficient.
        let client = reqwest::Client::new();
        let response = client.get(url).send().await?;
        let content = response.text().await?;

        // Parse RSS/Atom feed
        let items = Self::parse_rss(&content)?;

        let mut vectors = Vec::new();
        let mut cache = dedup_cache.write().await;

        for item in items {
            // Check for duplicates
            if cache.contains(&item.id) {
                continue;
            }

            // Add to dedup cache
            cache.insert(item.id.clone());

            // Convert to SemanticVector
            let domain = Self::category_to_domain(category);
            let vector = Self::item_to_vector(item, domain);
            vectors.push(vector);
        }

        Ok(vectors)
    }

    /// Process REST API polling
    async fn process_rest_feed(
        url: &str,
        dedup_cache: &Arc<RwLock<HashSet<String>>>,
    ) -> Result<Vec<SemanticVector>> {
        let client = reqwest::Client::new();
        let response = client.get(url).send().await?;
        let items: Vec<FeedItem> = response.json().await?;

        let mut vectors = Vec::new();
        let mut cache = dedup_cache.write().await;

        for item in items {
            if cache.contains(&item.id) {
                continue;
            }

            cache.insert(item.id.clone());
            // REST feeds currently default to the Research domain
            let vector = Self::item_to_vector(item, Domain::Research);
            vectors.push(vector);
        }

        Ok(vectors)
    }

    /// Process WebSocket stream (currently a stub)
    async fn process_websocket_feed(
        _url: &str,
        _dedup_cache: &Arc<RwLock<HashSet<String>>>,
    ) -> Result<Vec<SemanticVector>> {
        // A full implementation would use a WebSocket client such as
        // tokio-tungstenite; for now this path returns no items.
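        //
        // A possible shape for that implementation, assuming the
        // tokio-tungstenite and futures-util crates (neither is a current
        // dependency of this module):
        //
        //     use futures_util::StreamExt;
        //     let (mut ws, _) = tokio_tungstenite::connect_async(url).await?;
        //     while let Some(msg) = ws.next().await {
        //         if let tokio_tungstenite::tungstenite::Message::Text(text) = msg? {
        //             // deserialize `text` into a FeedItem, dedup on its id,
        //             // and convert with Self::item_to_vector
        //         }
        //     }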
        tracing::warn!("WebSocket feeds not yet implemented");
        Ok(Vec::new())
    }

    /// Parse RSS XML into feed items
    ///
    /// Only RSS 2.0 `<item>` elements are handled; Atom `<entry>` elements
    /// are not yet supported. In production, use the feed-rs or rss crate.
    fn parse_rss(content: &str) -> Result<Vec<FeedItem>> {
        let mut items = Vec::new();

        // Basic string-splitting parse: take everything between each
        // <item> ... </item> pair
        for item_block in content.split("<item>").skip(1) {
            if let Some(end) = item_block.find("</item>") {
                let item_xml = &item_block[..end];
                if let Some(item) = Self::parse_rss_item(item_xml) {
                    items.push(item);
                }
            }
        }

        Ok(items)
    }
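
    // A sketch of the production path named above, using the `rss` crate
    // (an external dependency, not currently imported here):
    //
    //     let channel = rss::Channel::read_from(content.as_bytes())
    //         .map_err(|e| FrameworkError::Config(e.to_string()))?;
    //     for item in channel.items() {
    //         // map item.title(), item.link(), item.guid(), item.pub_date()
    //         // into a FeedItem, as parse_rss_item does below
    //     }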

    /// Parse a single RSS item from XML
    fn parse_rss_item(xml: &str) -> Option<FeedItem> {
        let title = Self::extract_tag(xml, "title")?;
        let description = Self::extract_tag(xml, "description").unwrap_or_default();
        let link = Self::extract_tag(xml, "link").unwrap_or_default();
        // Fall back to the link when no <guid> is present
        let guid = Self::extract_tag(xml, "guid").unwrap_or_else(|| link.clone());

        let published = Self::extract_tag(xml, "pubDate")
            .and_then(|date_str| chrono::DateTime::parse_from_rfc2822(&date_str).ok())
            .map(|dt| dt.with_timezone(&Utc));

        let author = Self::extract_tag(xml, "author");

        Some(FeedItem {
            id: guid,
            title,
            description,
            link,
            published,
            author,
            categories: Vec::new(),
        })
    }

    /// Extract content between XML tags
    ///
    /// Only bare `<tag>` forms are matched; tags carrying attributes
    /// (e.g. `<guid isPermaLink="false">`) are not found.
    fn extract_tag(xml: &str, tag: &str) -> Option<String> {
        let start_tag = format!("<{}>", tag);
        let end_tag = format!("</{}>", tag);

        let start = xml.find(&start_tag)? + start_tag.len();
        let end = xml.find(&end_tag)?;

        if start < end {
            let content = &xml[start..end];
            // Basic HTML entity decoding; "&amp;" is decoded last so that
            // double-escaped entities such as "&amp;quot;" decode only once
            let decoded = content
                .replace("&lt;", "<")
                .replace("&gt;", ">")
                .replace("&quot;", "\"")
                .replace("&#39;", "'")
                .replace("&amp;", "&");
            Some(decoded.trim().to_string())
        } else {
            None
        }
    }

    /// Convert category string to Domain enum
    fn category_to_domain(category: &str) -> Domain {
        match category.to_lowercase().as_str() {
            "climate" | "weather" | "environment" => Domain::Climate,
            "finance" | "economy" | "market" | "stock" => Domain::Finance,
            "research" | "science" | "academic" | "medical" => Domain::Research,
            _ => Domain::CrossDomain,
        }
    }

    /// Convert FeedItem to SemanticVector
    fn item_to_vector(item: FeedItem, domain: Domain) -> SemanticVector {
        use std::collections::HashMap;

        // Create a simple embedding from title + description.
        // In production, use an actual embedding model.
        let text = format!("{} {}", item.title, item.description);
        let embedding = Self::simple_embedding(&text);

        let mut metadata = HashMap::new();
        metadata.insert("title".to_string(), item.title.clone());
        metadata.insert("link".to_string(), item.link.clone());
        if let Some(author) = item.author {
            metadata.insert("author".to_string(), author);
        }

        SemanticVector {
            id: item.id,
            embedding,
            domain,
            timestamp: item.published.unwrap_or_else(Utc::now),
            metadata,
        }
    }

    /// Simple embedding generation (hash-based placeholder, not semantic)
    fn simple_embedding(text: &str) -> Vec<f32> {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};

        // Create a 384-dimensional vector: each of the first 384 words is
        // hashed into one dimension, mapped to the range [-1.0, 1.0]
        let mut embedding = vec![0.0f32; 384];

        for (i, word) in text.split_whitespace().take(384).enumerate() {
            let mut hasher = DefaultHasher::new();
            word.hash(&mut hasher);
            let hash = hasher.finish();
            embedding[i] = (hash as f32 / u64::MAX as f32) * 2.0 - 1.0;
        }

        // Normalize to unit length so cosine similarity reduces to a dot product
        let magnitude: f32 = embedding.iter().map(|x| x * x).sum::<f32>().sqrt();
        if magnitude > 0.0 {
            for val in &mut embedding {
                *val /= magnitude;
            }
        }

        embedding
    }
}

impl NewsAggregator {
    /// Create a new news aggregator
    pub fn new() -> Self {
        Self {
            sources: Vec::new(),
            client: reqwest::Client::builder()
                .user_agent("RuVector/1.0")
                .timeout(Duration::from_secs(30))
                .build()
                .expect("default reqwest client configuration should not fail"),
        }
    }

    /// Add a news source
    pub fn add_source(&mut self, source: NewsSource) {
        self.sources.push(source);
    }

    /// Add default free news sources
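    ///
    /// These third-party feed URLs are external services that may move,
    /// change format, or rate-limit; verify them before relying on this
    /// list in production.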
    pub fn add_default_sources(&mut self) {
        // Climate sources
        self.add_source(NewsSource {
            name: "NASA Earth Observatory".to_string(),
            feed_url: "https://earthobservatory.nasa.gov/feeds/image-of-the-day.rss".to_string(),
            domain: Domain::Climate,
        });

        // Financial sources
        self.add_source(NewsSource {
            name: "Yahoo Finance - Top Stories".to_string(),
            feed_url: "https://finance.yahoo.com/news/rssindex".to_string(),
            domain: Domain::Finance,
        });

        // Medical/Research sources
        self.add_source(NewsSource {
            name: "PubMed Recent".to_string(),
            feed_url: "https://pubmed.ncbi.nlm.nih.gov/rss/search/1nKx2zx8g-9UCGpQD5qVmN6jTvSRRxYqjD3T_nA-pSMjDlXr4u/?limit=100&utm_campaign=pubmed-2&fc=20210421200858".to_string(),
            domain: Domain::Research,
        });

        // General news sources
        self.add_source(NewsSource {
            name: "Reuters Top News".to_string(),
            feed_url: "https://www.reutersagency.com/feed/?taxonomy=best-topics&post_type=best".to_string(),
            domain: Domain::CrossDomain,
        });

        self.add_source(NewsSource {
            name: "AP News Top Stories".to_string(),
            feed_url: "https://apnews.com/index.rss".to_string(),
            domain: Domain::CrossDomain,
        });
    }

    /// Fetch latest items from all sources
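    ///
    /// Results are deduplicated by id, sorted newest first, and truncated
    /// to `limit`. A usage sketch (from an async context):
    ///
    /// ```ignore
    /// let mut aggregator = NewsAggregator::new();
    /// aggregator.add_default_sources();
    /// let latest = aggregator.fetch_latest(50).await?;
    /// ```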
    pub async fn fetch_latest(&self, limit: usize) -> Result<Vec<SemanticVector>> {
        let mut all_vectors = Vec::new();
        let mut seen = HashSet::new();

        for source in &self.sources {
            match self.fetch_source(source, limit).await {
                Ok(vectors) => {
                    for vector in vectors {
                        // HashSet::insert returns false for duplicates
                        if seen.insert(vector.id.clone()) {
                            all_vectors.push(vector);
                        }
                    }
                }
                Err(e) => {
                    tracing::warn!("Failed to fetch {}: {}", source.name, e);
                }
            }
        }

        // Sort by timestamp, most recent first
        all_vectors.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));

        // Limit results
        all_vectors.truncate(limit);

        Ok(all_vectors)
    }

    /// Fetch from a single source
    async fn fetch_source(&self, source: &NewsSource, limit: usize) -> Result<Vec<SemanticVector>> {
        let response = self.client.get(&source.feed_url).send().await?;
        let content = response.text().await?;

        let items = RealTimeEngine::parse_rss(&content)?;
        let mut vectors = Vec::new();

        for item in items.into_iter().take(limit) {
            let vector = RealTimeEngine::item_to_vector(item, source.domain.clone());
            vectors.push(vector);
        }

        Ok(vectors)
    }
}

impl Default for NewsAggregator {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_extract_tag() {
        let xml = "<title>Test Title</title><description>Test Description</description>";
        assert_eq!(
            RealTimeEngine::extract_tag(xml, "title"),
            Some("Test Title".to_string())
        );
        assert_eq!(
            RealTimeEngine::extract_tag(xml, "description"),
            Some("Test Description".to_string())
        );
        assert_eq!(RealTimeEngine::extract_tag(xml, "missing"), None);
    }

    #[test]
    fn test_category_to_domain() {
        assert_eq!(
            RealTimeEngine::category_to_domain("climate"),
            Domain::Climate
        );
        assert_eq!(
            RealTimeEngine::category_to_domain("Finance"),
            Domain::Finance
        );
        assert_eq!(
            RealTimeEngine::category_to_domain("research"),
            Domain::Research
        );
        assert_eq!(
            RealTimeEngine::category_to_domain("other"),
            Domain::CrossDomain
        );
    }

    #[test]
    fn test_simple_embedding() {
        let embedding = RealTimeEngine::simple_embedding("climate change impacts");
        assert_eq!(embedding.len(), 384);

        // Check normalization
        let magnitude: f32 = embedding.iter().map(|x| x * x).sum::<f32>().sqrt();
        assert!((magnitude - 1.0).abs() < 0.01);
    }

    #[tokio::test]
    async fn test_realtime_engine_lifecycle() {
        let mut engine = RealTimeEngine::new(Duration::from_secs(1));

        engine.add_feed(FeedSource::Rss {
            url: "https://example.com/feed.rss".to_string(),
            category: "climate".to_string(),
        });

        // Start and stop; the background fetch may fail without network
        // access, which these lifecycle assertions do not depend on
        assert!(engine.start().await.is_ok());
        engine.stop().await;
    }

    #[test]
    fn test_news_aggregator() {
        let mut aggregator = NewsAggregator::new();
        aggregator.add_default_sources();
        assert!(aggregator.sources.len() >= 5);
    }

    #[test]
    fn test_parse_rss_item() {
        let xml = r#"
            <title>Test Article</title>
            <description>This is a test article</description>
            <link>https://example.com/article</link>
            <guid>article-123</guid>
            <pubDate>Mon, 01 Jan 2024 12:00:00 GMT</pubDate>
        "#;

        let item = RealTimeEngine::parse_rss_item(xml);
        assert!(item.is_some());

        let item = item.unwrap();
        assert_eq!(item.title, "Test Article");
        assert_eq!(item.description, "This is a test article");
        assert_eq!(item.link, "https://example.com/article");
        assert_eq!(item.id, "article-123");
    }
}