Skip to main content

stygian_graph/adapters/
rss_feed.rs

1//! RSS / Atom feed [`ScrapingService`](crate::ports::ScrapingService) adapter
2//!
3//! Parses RSS 1.0, RSS 2.0, Atom, and JSON Feed formats via the `feed-rs`
4//! crate, returning feed items as structured JSON for downstream pipeline nodes.
5//!
6//! # Example
7//!
8//! ```no_run
9//! use stygian_graph::adapters::rss_feed::RssFeedAdapter;
10//! use stygian_graph::ports::{ScrapingService, ServiceInput};
11//! use serde_json::json;
12//!
13//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
14//! let adapter = RssFeedAdapter::new(reqwest::Client::new());
15//! let input = ServiceInput {
16//!     url: "https://example.com/feed.xml".into(),
17//!     params: json!({}),
18//! };
19//! let output = adapter.execute(input).await.unwrap();
20//! println!("{}", output.data); // JSON array of feed items
21//! # });
22//! ```
23
24use crate::domain::error::{Result, ServiceError, StygianError};
25use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
26use async_trait::async_trait;
27use feed_rs::parser;
28use serde::{Deserialize, Serialize};
29use serde_json::json;
30
31// ─── Domain types ─────────────────────────────────────────────────────────────
32
33/// A single feed item extracted from RSS/Atom.
34#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
35pub struct FeedItem {
36    /// Item title.
37    pub title: Option<String>,
38    /// Primary link URL.
39    pub link: Option<String>,
40    /// Published or updated timestamp (ISO 8601).
41    pub published: Option<String>,
42    /// Item summary / description.
43    pub summary: Option<String>,
44    /// Category labels.
45    pub categories: Vec<String>,
46    /// Author names.
47    pub authors: Vec<String>,
48    /// Unique identifier (guid / id).
49    pub id: String,
50}
51
52// ─── Adapter ──────────────────────────────────────────────────────────────────
53
54/// RSS / Atom feed source adapter.
55///
56/// Fetches and parses feeds using the `feed-rs` crate which handles
57/// RSS 1.0, RSS 2.0, Atom, and JSON Feed formats transparently.
58pub struct RssFeedAdapter {
59    client: reqwest::Client,
60}
61
62impl RssFeedAdapter {
63    /// Create a new RSS feed adapter.
64    #[must_use]
65    pub const fn new(client: reqwest::Client) -> Self {
66        Self { client }
67    }
68}
69
70#[async_trait]
71impl ScrapingService for RssFeedAdapter {
72    /// Fetch and parse a feed, returning items as JSON.
73    ///
74    /// # Params (optional)
75    ///
76    /// * `since` — ISO 8601 datetime string; exclude items published before this.
77    /// * `limit` — integer; maximum number of items to return.
78    /// * `categories` — array of strings; only include items matching any of these categories.
79    async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
80        let resp = self
81            .client
82            .get(&input.url)
83            .header(
84                reqwest::header::ACCEPT,
85                "application/rss+xml, application/atom+xml, application/xml, text/xml, */*",
86            )
87            .send()
88            .await
89            .map_err(|e| {
90                StygianError::Service(ServiceError::Unavailable(format!("feed fetch failed: {e}")))
91            })?;
92
93        if !resp.status().is_success() {
94            return Err(StygianError::Service(ServiceError::InvalidResponse(
95                format!("feed returned HTTP {}", resp.status()),
96            )));
97        }
98
99        let bytes = resp.bytes().await.map_err(|e| {
100            StygianError::Service(ServiceError::Unavailable(format!(
101                "feed body read failed: {e}"
102            )))
103        })?;
104
105        let feed = parser::parse(&bytes[..]).map_err(|e| {
106            StygianError::Service(ServiceError::InvalidResponse(format!(
107                "feed parse failed: {e}"
108            )))
109        })?;
110
111        // Convert feed entries into our domain type
112        let mut items: Vec<FeedItem> = feed
113            .entries
114            .iter()
115            .map(|entry| {
116                let title = entry.title.as_ref().map(|t| t.content.clone());
117                let link = entry.links.first().map(|l| l.href.clone());
118                let published = entry.published.or(entry.updated).map(|dt| dt.to_rfc3339());
119                let summary = entry.summary.as_ref().map(|s| s.content.clone());
120                let categories = entry.categories.iter().map(|c| c.term.clone()).collect();
121                let authors = entry.authors.iter().map(|a| a.name.clone()).collect();
122                let id = entry.id.clone();
123
124                FeedItem {
125                    title,
126                    link,
127                    published,
128                    summary,
129                    categories,
130                    authors,
131                    id,
132                }
133            })
134            .collect();
135
136        // Apply optional filters
137        if let Some(since) = input.params.get("since").and_then(|v| v.as_str()) {
138            items.retain(|item| {
139                item.published
140                    .as_deref()
141                    .is_some_and(|pub_date| pub_date >= since)
142            });
143        }
144
145        if let Some(cats) = input.params.get("categories").and_then(|v| v.as_array()) {
146            let filter_cats: Vec<&str> = cats.iter().filter_map(|c| c.as_str()).collect();
147            if !filter_cats.is_empty() {
148                items.retain(|item| {
149                    item.categories
150                        .iter()
151                        .any(|c| filter_cats.contains(&c.as_str()))
152                });
153            }
154        }
155
156        if let Some(limit) = input
157            .params
158            .get("limit")
159            .and_then(serde_json::Value::as_u64)
160            .and_then(|n| usize::try_from(n).ok())
161        {
162            items.truncate(limit);
163        }
164
165        let count = items.len();
166        let data = serde_json::to_string(&items).map_err(|e| {
167            StygianError::Service(ServiceError::InvalidResponse(format!(
168                "feed serialization failed: {e}"
169            )))
170        })?;
171
172        // Feed-level metadata
173        let feed_title = feed.title.map(|t| t.content);
174        let feed_description = feed.description.map(|d| d.content);
175        let feed_updated = feed.updated.map(|dt| dt.to_rfc3339());
176
177        Ok(ServiceOutput {
178            data,
179            metadata: json!({
180                "source": "rss_feed",
181                "feed_title": feed_title,
182                "feed_description": feed_description,
183                "last_updated": feed_updated,
184                "item_count": count,
185                "source_url": input.url,
186            }),
187        })
188    }
189
190    fn name(&self) -> &'static str {
191        "rss_feed"
192    }
193}
194
195// ─── Tests ────────────────────────────────────────────────────────────────────
196
#[cfg(test)]
mod tests {
    use super::*;

    // RFC 5322 §3.3: a day-of-week, when present, must be the day implied by the
    // date, or strict parsers reject it. 1 Feb, 15 Feb and 1 Mar 2026 are all Sundays.
    const RSS_FEED: &str = r#"<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0">
  <channel>
    <title>Example Blog</title>
    <link>https://example.com</link>
    <description>An example RSS feed</description>
    <item>
      <title>First Post</title>
      <link>https://example.com/post/1</link>
      <pubDate>Sun, 01 Mar 2026 00:00:00 +0000</pubDate>
      <description>Summary of first post</description>
      <category>tech</category>
      <guid>post-1</guid>
    </item>
    <item>
      <title>Second Post</title>
      <link>https://example.com/post/2</link>
      <pubDate>Sun, 15 Feb 2026 00:00:00 +0000</pubDate>
      <description>Summary of second post</description>
      <category>science</category>
      <guid>post-2</guid>
    </item>
    <item>
      <title>Third Post</title>
      <link>https://example.com/post/3</link>
      <pubDate>Sun, 01 Feb 2026 00:00:00 +0000</pubDate>
      <description>Summary of third post</description>
      <category>tech</category>
      <category>news</category>
      <guid>post-3</guid>
    </item>
  </channel>
</rss>"#;

    const ATOM_FEED: &str = r#"<?xml version="1.0" encoding="UTF-8"?>
<feed xmlns="http://www.w3.org/2005/Atom">
  <title>Example Atom Feed</title>
  <link href="https://example.com"/>
  <updated>2026-03-01T00:00:00Z</updated>
  <entry>
    <title>Atom Entry One</title>
    <link href="https://example.com/atom/1"/>
    <id>urn:uuid:atom-1</id>
    <updated>2026-03-01T00:00:00Z</updated>
    <summary>First atom entry</summary>
    <author><name>Alice</name></author>
    <category term="rust"/>
  </entry>
  <entry>
    <title>Atom Entry Two</title>
    <link href="https://example.com/atom/2"/>
    <id>urn:uuid:atom-2</id>
    <updated>2026-02-15T00:00:00Z</updated>
    <summary>Second atom entry</summary>
    <author><name>Bob</name></author>
  </entry>
</feed>"#;

    /// Parse `xml` with `feed-rs` and convert each entry into a [`FeedItem`],
    /// without any network I/O.
    fn parse_test_feed(
        xml: &str,
    ) -> std::result::Result<Vec<FeedItem>, Box<dyn std::error::Error>> {
        let feed = parser::parse(xml.as_bytes())?;
        let items = feed
            .entries
            .iter()
            .map(|entry| {
                let title = entry.title.as_ref().map(|t| t.content.clone());
                let link = entry.links.first().map(|l| l.href.clone());
                let published = entry.published.or(entry.updated).map(|dt| dt.to_rfc3339());
                let summary = entry.summary.as_ref().map(|s| s.content.clone());
                let categories = entry.categories.iter().map(|c| c.term.clone()).collect();
                let authors = entry.authors.iter().map(|a| a.name.clone()).collect();
                let id = entry.id.clone();

                FeedItem {
                    title,
                    link,
                    published,
                    summary,
                    categories,
                    authors,
                    id,
                }
            })
            .collect();
        Ok(items)
    }

    #[test]
    fn parse_rss_with_3_items() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let items = parse_test_feed(RSS_FEED)?;
        assert_eq!(items.len(), 3);
        let first = items
            .first()
            .ok_or_else(|| std::io::Error::other("expected first item"))?;
        assert_eq!(first.title.as_deref(), Some("First Post"));
        assert_eq!(first.link.as_deref(), Some("https://example.com/post/1"));
        // Pin the exact parsed instant, not just its presence.
        assert_eq!(first.published.as_deref(), Some("2026-03-01T00:00:00+00:00"));
        assert_eq!(first.summary.as_deref(), Some("Summary of first post"));
        Ok(())
    }

    #[test]
    fn parse_atom_with_authors() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let items = parse_test_feed(ATOM_FEED)?;
        assert_eq!(items.len(), 2);
        let first = items
            .first()
            .ok_or_else(|| std::io::Error::other("expected first atom item"))?;
        assert_eq!(first.title.as_deref(), Some("Atom Entry One"));
        assert_eq!(first.authors, vec!["Alice".to_string()]);
        assert_eq!(first.categories, vec!["rust".to_string()]);
        assert_eq!(first.link.as_deref(), Some("https://example.com/atom/1"));
        // No <published> element, so the <updated> timestamp is used.
        assert_eq!(first.published.as_deref(), Some("2026-03-01T00:00:00+00:00"));
        Ok(())
    }

    #[test]
    fn filter_by_since_date() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let mut items = parse_test_feed(RSS_FEED)?;
        // Keep only items published in March 2026 or later
        items.retain(|item| {
            item.published
                .as_deref()
                .is_some_and(|pub_date| pub_date >= "2026-03-01")
        });
        assert_eq!(items.len(), 1);
        let first = items
            .first()
            .ok_or_else(|| std::io::Error::other("expected one filtered item"))?;
        assert_eq!(first.title.as_deref(), Some("First Post"));
        Ok(())
    }

    #[test]
    fn filter_by_categories() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let mut items = parse_test_feed(RSS_FEED)?;
        let filter_cats = ["tech"];
        items.retain(|item| {
            item.categories
                .iter()
                .any(|c| filter_cats.contains(&c.as_str()))
        });
        assert_eq!(items.len(), 2);
        let first = items
            .first()
            .ok_or_else(|| std::io::Error::other("expected first filtered item"))?;
        let second = items
            .get(1)
            .ok_or_else(|| std::io::Error::other("expected second filtered item"))?;
        assert_eq!(first.title.as_deref(), Some("First Post"));
        assert_eq!(second.title.as_deref(), Some("Third Post"));
        Ok(())
    }

    #[test]
    fn empty_feed_returns_empty_array() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0">
  <channel>
    <title>Empty Feed</title>
  </channel>
</rss>"#;
        let items = parse_test_feed(xml)?;
        assert!(items.is_empty());
        Ok(())
    }

    #[test]
    fn malformed_feed_returns_error() {
        let bad = b"<not-a-feed><broken";
        let result = parser::parse(&bad[..]);
        assert!(result.is_err());
    }

    #[test]
    fn limit_truncates_items() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let mut items = parse_test_feed(RSS_FEED)?;
        assert_eq!(items.len(), 3);
        items.truncate(2);
        assert_eq!(items.len(), 2);
        Ok(())
    }

    #[test]
    fn rss_items_have_ids() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let items = parse_test_feed(RSS_FEED)?;
        let first = items
            .first()
            .ok_or_else(|| std::io::Error::other("expected first rss item"))?;
        let second = items
            .get(1)
            .ok_or_else(|| std::io::Error::other("expected second rss item"))?;
        let third = items
            .get(2)
            .ok_or_else(|| std::io::Error::other("expected third rss item"))?;
        assert!(!first.id.is_empty());
        assert!(!second.id.is_empty());
        assert!(!third.id.is_empty());
        Ok(())
    }

    #[test]
    fn atom_feed_has_categories() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let items = parse_test_feed(ATOM_FEED)?;
        let first = items
            .first()
            .ok_or_else(|| std::io::Error::other("expected first atom item"))?;
        let second = items
            .get(1)
            .ok_or_else(|| std::io::Error::other("expected second atom item"))?;
        assert_eq!(first.categories, vec!["rust"]);
        assert!(second.categories.is_empty());
        Ok(())
    }
}