ruvector_data_framework/
realtime.rs

//! Real-Time Data Feed Integration
//!
//! RSS/Atom feed parsing, WebSocket streaming (currently a stub), and REST
//! API polling for continuous data ingestion into the RuVector discovery
//! framework.

use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;

use chrono::Utc;
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use tokio::time::interval;

use crate::ruvector_native::{Domain, SemanticVector};
use crate::{FrameworkError, Result};

/// Real-time engine for streaming data feeds
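///
/// # Example
///
/// A minimal usage sketch (marked `ignore`, so it is not compiled as a
/// doctest; it assumes a Tokio runtime and that this module is reachable as
/// `ruvector_data_framework::realtime`):
///
/// ```ignore
/// use std::time::Duration;
/// use ruvector_data_framework::realtime::{FeedSource, RealTimeEngine};
///
/// async fn demo() -> ruvector_data_framework::Result<()> {
///     let mut engine = RealTimeEngine::new(Duration::from_secs(60));
///     engine.add_feed(FeedSource::Rss {
///         url: "https://example.com/feed.rss".to_string(),
///         category: "climate".to_string(),
///     });
///     engine.set_callback(|vectors| {
///         println!("ingested {} new vectors", vectors.len());
///     });
///     engine.start().await?;
///     // ... later ...
///     engine.stop().await;
///     Ok(())
/// }
/// ```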
pub struct RealTimeEngine {
    /// Feed sources polled on every tick
    feeds: Vec<FeedSource>,
    /// How often the background task polls all feeds
    update_interval: Duration,
    /// Callback invoked with each batch of newly ingested vectors
    on_new_data: Option<Arc<dyn Fn(Vec<SemanticVector>) + Send + Sync>>,
    /// IDs of items already seen, used to drop duplicates across polls
    dedup_cache: Arc<RwLock<HashSet<String>>>,
    /// Shared flag the background task checks to know when to stop
    running: Arc<RwLock<bool>>,
}

/// Types of feed sources
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum FeedSource {
    /// RSS or Atom feed
    Rss { url: String, category: String },
    /// REST API with polling. The per-feed `interval` is currently unused;
    /// the engine polls every source on its global update interval.
    RestPolling { url: String, interval: Duration },
    /// WebSocket streaming endpoint (not yet implemented)
    WebSocket { url: String },
}

/// News aggregator for multiple RSS feeds
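///
/// # Example
///
/// A sketch of the polling flow (marked `ignore`, so it is not compiled as
/// a doctest; it assumes a Tokio runtime and the
/// `ruvector_data_framework::realtime` module path):
///
/// ```ignore
/// use ruvector_data_framework::realtime::NewsAggregator;
///
/// async fn demo() -> ruvector_data_framework::Result<()> {
///     let mut aggregator = NewsAggregator::new();
///     aggregator.add_default_sources();
///
///     // Fetch up to 50 deduplicated items, newest first.
///     let vectors = aggregator.fetch_latest(50).await?;
///     println!("fetched {} vectors", vectors.len());
///     Ok(())
/// }
/// ```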
pub struct NewsAggregator {
    sources: Vec<NewsSource>,
    client: reqwest::Client,
}

/// Individual news source configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NewsSource {
    pub name: String,
    pub feed_url: String,
    pub domain: Domain,
}

/// Parsed feed item
///
/// This is also the JSON shape expected from `FeedSource::RestPolling`
/// endpoints, which must return an array of these items.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FeedItem {
    pub id: String,
    pub title: String,
    pub description: String,
    pub link: String,
    pub published: Option<chrono::DateTime<Utc>>,
    pub author: Option<String>,
    pub categories: Vec<String>,
}

impl RealTimeEngine {
    /// Create a new real-time engine that polls on `update_interval`
    pub fn new(update_interval: Duration) -> Self {
        Self {
            feeds: Vec::new(),
            update_interval,
            on_new_data: None,
            dedup_cache: Arc::new(RwLock::new(HashSet::new())),
            running: Arc::new(RwLock::new(false)),
        }
    }

    /// Add a feed source to monitor
    pub fn add_feed(&mut self, source: FeedSource) {
        self.feeds.push(source);
    }

    /// Set the callback invoked with each batch of new vectors
    pub fn set_callback<F>(&mut self, callback: F)
    where
        F: Fn(Vec<SemanticVector>) + Send + Sync + 'static,
    {
        self.on_new_data = Some(Arc::new(callback));
    }

    /// Start the real-time engine.
    ///
    /// Spawns a background polling task and returns immediately. Returns an
    /// error if the engine is already running.
    pub async fn start(&mut self) -> Result<()> {
        {
            let mut running = self.running.write().await;
            if *running {
                return Err(FrameworkError::Config(
                    "Engine already running".to_string(),
                ));
            }
            *running = true;
        }

        // Snapshot state for the background task. Note that feeds added
        // after start() are not picked up by a running engine.
        let feeds = self.feeds.clone();
        let callback = self.on_new_data.clone();
        let dedup_cache = self.dedup_cache.clone();
        let update_interval = self.update_interval;
        let running = self.running.clone();

        tokio::spawn(async move {
            // The first tick of a tokio interval completes immediately,
            // so feeds are polled once right after startup.
            let mut ticker = interval(update_interval);

            loop {
                ticker.tick().await;

                // Check if we should stop
                {
                    let is_running = running.read().await;
                    if !*is_running {
                        break;
                    }
                }

                // Process all feeds; errors are logged, not propagated
                for feed in &feeds {
                    match Self::process_feed(feed, &dedup_cache).await {
                        Ok(vectors) => {
                            if !vectors.is_empty() {
                                if let Some(ref cb) = callback {
                                    cb(vectors);
                                }
                            }
                        }
                        Err(e) => {
                            tracing::error!("Feed processing error: {}", e);
                        }
                    }
                }
            }
        });

        Ok(())
    }

    /// Stop the real-time engine.
    ///
    /// Signals the background task, which exits after its current tick.
    pub async fn stop(&mut self) {
        let mut running = self.running.write().await;
        *running = false;
    }

    /// Process a single feed source
    async fn process_feed(
        feed: &FeedSource,
        dedup_cache: &Arc<RwLock<HashSet<String>>>,
    ) -> Result<Vec<SemanticVector>> {
        match feed {
            FeedSource::Rss { url, category } => {
                Self::process_rss_feed(url, category, dedup_cache).await
            }
            FeedSource::RestPolling { url, .. } => {
                Self::process_rest_feed(url, dedup_cache).await
            }
            FeedSource::WebSocket { url } => Self::process_websocket_feed(url, dedup_cache).await,
        }
    }

    /// Fetch and parse an RSS/Atom feed, returning only unseen items
    async fn process_rss_feed(
        url: &str,
        category: &str,
        dedup_cache: &Arc<RwLock<HashSet<String>>>,
    ) -> Result<Vec<SemanticVector>> {
        // A fresh client per poll is simple but wasteful; a shared client
        // (as NewsAggregator uses) would reuse connections.
        let client = reqwest::Client::new();
        let response = client.get(url).send().await?;
        let content = response.text().await?;

        // Parse RSS/Atom feed
        let items = Self::parse_rss(&content)?;

        let mut vectors = Vec::new();
        let mut cache = dedup_cache.write().await;

        for item in items {
            // Skip items we have already ingested
            if cache.contains(&item.id) {
                continue;
            }

            // Add to dedup cache
            cache.insert(item.id.clone());

            // Convert to SemanticVector
            let domain = Self::category_to_domain(category);
            let vector = Self::item_to_vector(item, domain);
            vectors.push(vector);
        }

        Ok(vectors)
    }

    /// Poll a REST endpoint that returns a JSON array of `FeedItem`s
    async fn process_rest_feed(
        url: &str,
        dedup_cache: &Arc<RwLock<HashSet<String>>>,
    ) -> Result<Vec<SemanticVector>> {
        let client = reqwest::Client::new();
        let response = client.get(url).send().await?;
        let items: Vec<FeedItem> = response.json().await?;

        let mut vectors = Vec::new();
        let mut cache = dedup_cache.write().await;

        for item in items {
            if cache.contains(&item.id) {
                continue;
            }

            cache.insert(item.id.clone());
            // REST items carry no category, so they default to Research
            let vector = Self::item_to_vector(item, Domain::Research);
            vectors.push(vector);
        }

        Ok(vectors)
    }

    /// Process a WebSocket stream (stub)
    async fn process_websocket_feed(
        _url: &str,
        _dedup_cache: &Arc<RwLock<HashSet<String>>>,
    ) -> Result<Vec<SemanticVector>> {
        // A real implementation would require a WebSocket client such as
        // tokio-tungstenite; for now this returns no items.
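        //
        // A possible shape for that extension (sketch only; assumes the
        // `tokio-tungstenite` and `futures-util` crates, which are not
        // dependencies of this module today):
        //
        //     use futures_util::StreamExt;
        //     use tokio_tungstenite::tungstenite::Message;
        //
        //     let (mut ws, _) = tokio_tungstenite::connect_async(url).await?;
        //     while let Some(Ok(Message::Text(txt))) = ws.next().await {
        //         // deserialize `txt` into a FeedItem, dedup by id,
        //         // then convert with Self::item_to_vector
        //     }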
        tracing::warn!("WebSocket feeds not yet implemented");
        Ok(Vec::new())
    }

    /// Parse RSS XML into feed items.
    ///
    /// Simple string-based parsing that handles RSS 2.0 `<item>` blocks
    /// only; Atom `<entry>` elements are not recognized. In production,
    /// use the feed-rs or rss crate.
    fn parse_rss(content: &str) -> Result<Vec<FeedItem>> {
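        // With the feed-rs crate, the body of this function would look
        // roughly like the following (sketch only, assuming the feed-rs
        // 1.x API; it is not a dependency of this module today):
        //
        //     let feed = feed_rs::parser::parse(content.as_bytes())
        //         .map_err(|e| FrameworkError::Config(e.to_string()))?;
        //     let items = feed.entries.into_iter().map(|entry| {
        //         // map entry.id, entry.title, entry.links, entry.published
        //         // into a FeedItem
        //     });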
        let mut items = Vec::new();

        // Split on <item> boundaries and parse each block individually
        for item_block in content.split("<item>").skip(1) {
            if let Some(end) = item_block.find("</item>") {
                let item_xml = &item_block[..end];
                if let Some(item) = Self::parse_rss_item(item_xml) {
                    items.push(item);
                }
            }
        }

        Ok(items)
    }

    /// Parse a single RSS item from its inner XML
    fn parse_rss_item(xml: &str) -> Option<FeedItem> {
        let title = Self::extract_tag(xml, "title")?;
        let description = Self::extract_tag(xml, "description").unwrap_or_default();
        let link = Self::extract_tag(xml, "link").unwrap_or_default();
        // Fall back to the link when no <guid> is present
        let guid = Self::extract_tag(xml, "guid").unwrap_or_else(|| link.clone());

        let published = Self::extract_tag(xml, "pubDate")
            .and_then(|date_str| chrono::DateTime::parse_from_rfc2822(&date_str).ok())
            .map(|dt| dt.with_timezone(&Utc));

        let author = Self::extract_tag(xml, "author");

        Some(FeedItem {
            id: guid,
            title,
            description,
            link,
            published,
            author,
            categories: Vec::new(),
        })
    }

    /// Extract content between XML tags.
    ///
    /// Only matches bare `<tag>` forms; tags carrying attributes
    /// (e.g. `<guid isPermaLink="false">`) are not found.
    fn extract_tag(xml: &str, tag: &str) -> Option<String> {
        let start_tag = format!("<{}>", tag);
        let end_tag = format!("</{}>", tag);

        let start = xml.find(&start_tag)? + start_tag.len();
        let end = xml.find(&end_tag)?;

        if start < end {
            let content = &xml[start..end];
            // Basic HTML entity decoding; `&amp;` must come last so that
            // double-encoded entities such as `&amp;lt;` are not decoded twice
            let decoded = content
                .replace("&lt;", "<")
                .replace("&gt;", ">")
                .replace("&quot;", "\"")
                .replace("&#39;", "'")
                .replace("&amp;", "&");
            Some(decoded.trim().to_string())
        } else {
            None
        }
    }

    /// Convert a category string to the Domain enum
    fn category_to_domain(category: &str) -> Domain {
        match category.to_lowercase().as_str() {
            "climate" | "weather" | "environment" => Domain::Climate,
            "finance" | "economy" | "market" | "stock" => Domain::Finance,
            "research" | "science" | "academic" | "medical" => Domain::Research,
            _ => Domain::CrossDomain,
        }
    }

    /// Convert a FeedItem to a SemanticVector
    fn item_to_vector(item: FeedItem, domain: Domain) -> SemanticVector {
        use std::collections::HashMap;

        // Create a simple embedding from title + description.
        // In production, use an actual embedding model.
        let text = format!("{} {}", item.title, item.description);
        let embedding = Self::simple_embedding(&text);

        let mut metadata = HashMap::new();
        metadata.insert("title".to_string(), item.title.clone());
        metadata.insert("link".to_string(), item.link.clone());
        if let Some(author) = item.author {
            metadata.insert("author".to_string(), author);
        }

        SemanticVector {
            id: item.id,
            embedding,
            domain,
            timestamp: item.published.unwrap_or_else(Utc::now),
            metadata,
        }
    }

    /// Simple embedding generation (hash-based, for demo purposes only)
    fn simple_embedding(text: &str) -> Vec<f32> {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};

        // Create a 384-dimensional embedding: one hashed value per word
        // position, for up to the first 384 words
        let mut embedding = vec![0.0f32; 384];

        for (i, word) in text.split_whitespace().take(384).enumerate() {
            let mut hasher = DefaultHasher::new();
            word.hash(&mut hasher);
            let hash = hasher.finish();
            // Map the hash onto [-1.0, 1.0]
            embedding[i] = (hash as f32 / u64::MAX as f32) * 2.0 - 1.0;
        }

        // Normalize to unit length
        let magnitude: f32 = embedding.iter().map(|x| x * x).sum::<f32>().sqrt();
        if magnitude > 0.0 {
            for val in &mut embedding {
                *val /= magnitude;
            }
        }

        embedding
    }
}

impl NewsAggregator {
    /// Create a new news aggregator with a shared HTTP client
    pub fn new() -> Self {
        Self {
            sources: Vec::new(),
            client: reqwest::Client::builder()
                .user_agent("RuVector/1.0")
                .timeout(Duration::from_secs(30))
                .build()
                .expect("failed to build reqwest client"),
        }
    }

    /// Add a news source
    pub fn add_source(&mut self, source: NewsSource) {
        self.sources.push(source);
    }

    /// Add default free news sources
    pub fn add_default_sources(&mut self) {
        // Climate sources
        self.add_source(NewsSource {
            name: "NASA Earth Observatory".to_string(),
            feed_url: "https://earthobservatory.nasa.gov/feeds/image-of-the-day.rss".to_string(),
            domain: Domain::Climate,
        });

        // Financial sources
        self.add_source(NewsSource {
            name: "Yahoo Finance - Top Stories".to_string(),
            feed_url: "https://finance.yahoo.com/news/rssindex".to_string(),
            domain: Domain::Finance,
        });

        // Medical/Research sources
        self.add_source(NewsSource {
            name: "PubMed Recent".to_string(),
            feed_url: "https://pubmed.ncbi.nlm.nih.gov/rss/search/1nKx2zx8g-9UCGpQD5qVmN6jTvSRRxYqjD3T_nA-pSMjDlXr4u/?limit=100&utm_campaign=pubmed-2&fc=20210421200858".to_string(),
            domain: Domain::Research,
        });

        // General news sources
        self.add_source(NewsSource {
            name: "Reuters Top News".to_string(),
            feed_url: "https://www.reutersagency.com/feed/?taxonomy=best-topics&post_type=best".to_string(),
            domain: Domain::CrossDomain,
        });

        self.add_source(NewsSource {
            name: "AP News Top Stories".to_string(),
            feed_url: "https://apnews.com/index.rss".to_string(),
            domain: Domain::CrossDomain,
        });
    }

    /// Fetch the latest items from all sources.
    ///
    /// Fetches up to `limit` items per source, deduplicates by item id,
    /// sorts newest first, and truncates the combined result to `limit`.
    pub async fn fetch_latest(&self, limit: usize) -> Result<Vec<SemanticVector>> {
        let mut all_vectors = Vec::new();
        let mut seen = HashSet::new();

        for source in &self.sources {
            match self.fetch_source(source, limit).await {
                Ok(vectors) => {
                    for vector in vectors {
                        if !seen.contains(&vector.id) {
                            seen.insert(vector.id.clone());
                            all_vectors.push(vector);
                        }
                    }
                }
                Err(e) => {
                    // A failing source should not abort the whole fetch
                    tracing::warn!("Failed to fetch {}: {}", source.name, e);
                }
            }
        }

        // Sort by timestamp, most recent first
        all_vectors.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));

        // Limit results
        all_vectors.truncate(limit);

        Ok(all_vectors)
    }

    /// Fetch from a single source
    async fn fetch_source(&self, source: &NewsSource, limit: usize) -> Result<Vec<SemanticVector>> {
        let response = self.client.get(&source.feed_url).send().await?;
        let content = response.text().await?;

        let items = RealTimeEngine::parse_rss(&content)?;
        let mut vectors = Vec::new();

        for item in items.into_iter().take(limit) {
            let vector = RealTimeEngine::item_to_vector(item, source.domain);
            vectors.push(vector);
        }

        Ok(vectors)
    }
}

impl Default for NewsAggregator {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_extract_tag() {
        let xml = "<title>Test Title</title><description>Test Description</description>";
        assert_eq!(
            RealTimeEngine::extract_tag(xml, "title"),
            Some("Test Title".to_string())
        );
        assert_eq!(
            RealTimeEngine::extract_tag(xml, "description"),
            Some("Test Description".to_string())
        );
        assert_eq!(RealTimeEngine::extract_tag(xml, "missing"), None);
    }
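
    // Added regression check for the entity-decoding order in extract_tag:
    // `&amp;` must be decoded last so double-encoded entities like
    // `&amp;lt;` come out as `&lt;` rather than `<`.
    #[test]
    fn test_extract_tag_entity_order() {
        let xml = "<title>R&amp;D &amp;lt;update&amp;gt;</title>";
        assert_eq!(
            RealTimeEngine::extract_tag(xml, "title"),
            Some("R&D &lt;update&gt;".to_string())
        );
    }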

    #[test]
    fn test_category_to_domain() {
        assert_eq!(
            RealTimeEngine::category_to_domain("climate"),
            Domain::Climate
        );
        assert_eq!(
            RealTimeEngine::category_to_domain("Finance"),
            Domain::Finance
        );
        assert_eq!(
            RealTimeEngine::category_to_domain("research"),
            Domain::Research
        );
        assert_eq!(
            RealTimeEngine::category_to_domain("other"),
            Domain::CrossDomain
        );
    }

    #[test]
    fn test_simple_embedding() {
        let embedding = RealTimeEngine::simple_embedding("climate change impacts");
        assert_eq!(embedding.len(), 384);

        // Check normalization
        let magnitude: f32 = embedding.iter().map(|x| x * x).sum::<f32>().sqrt();
        assert!((magnitude - 1.0).abs() < 0.01);
    }

    #[tokio::test]
    async fn test_realtime_engine_lifecycle() {
        let mut engine = RealTimeEngine::new(Duration::from_secs(1));

        engine.add_feed(FeedSource::Rss {
            url: "https://example.com/feed.rss".to_string(),
            category: "climate".to_string(),
        });

        // Start and stop
        assert!(engine.start().await.is_ok());
        engine.stop().await;
    }

    #[test]
    fn test_news_aggregator() {
        let mut aggregator = NewsAggregator::new();
        aggregator.add_default_sources();
        assert!(aggregator.sources.len() >= 5);
    }

    #[test]
    fn test_parse_rss_item() {
        let xml = r#"
            <title>Test Article</title>
            <description>This is a test article</description>
            <link>https://example.com/article</link>
            <guid>article-123</guid>
            <pubDate>Mon, 01 Jan 2024 12:00:00 GMT</pubDate>
        "#;

        let item = RealTimeEngine::parse_rss_item(xml);
        assert!(item.is_some());

        let item = item.unwrap();
        assert_eq!(item.title, "Test Article");
        assert_eq!(item.description, "This is a test article");
        assert_eq!(item.link, "https://example.com/article");
        assert_eq!(item.id, "article-123");
    }
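
    // Added coverage for the <item>-splitting logic in parse_rss: two
    // well-formed item blocks should yield two parsed items in order.
    #[test]
    fn test_parse_rss_multiple_items() {
        let xml = "<rss><channel>\
            <item><title>A</title><guid>a-1</guid></item>\
            <item><title>B</title><guid>b-2</guid></item>\
            </channel></rss>";

        let items = RealTimeEngine::parse_rss(xml).unwrap();
        assert_eq!(items.len(), 2);
        assert_eq!(items[0].title, "A");
        assert_eq!(items[0].id, "a-1");
        assert_eq!(items[1].title, "B");
        assert_eq!(items[1].id, "b-2");
    }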
}