//! RSS / Atom feed [`ScrapingService`](crate::ports::ScrapingService) adapter
//!
//! Parses RSS 1.0, RSS 2.0, Atom, and JSON Feed formats via the `feed-rs`
//! crate, returning feed items as structured JSON for downstream pipeline nodes.
//!
//! # Example
//!
//! ```no_run
//! use stygian_graph::adapters::rss_feed::RssFeedAdapter;
//! use stygian_graph::ports::{ScrapingService, ServiceInput};
//! use serde_json::json;
//!
//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
//! let adapter = RssFeedAdapter::new(reqwest::Client::new());
//! let input = ServiceInput {
//!     url: "https://example.com/feed.xml".into(),
//!     params: json!({}),
//! };
//! let output = adapter.execute(input).await.unwrap();
//! println!("{}", output.data); // JSON array of feed items
//! # });
//! ```

use crate::domain::error::{Result, ServiceError, StygianError};
use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
use async_trait::async_trait;
use feed_rs::parser;
use serde::{Deserialize, Serialize};
use serde_json::json;

// ─── Domain types ─────────────────────────────────────────────────────────────

33/// A single feed item extracted from RSS/Atom.
34#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
35pub struct FeedItem {
36    /// Item title.
37    pub title: Option<String>,
38    /// Primary link URL.
39    pub link: Option<String>,
40    /// Published or updated timestamp (ISO 8601).
41    pub published: Option<String>,
42    /// Item summary / description.
43    pub summary: Option<String>,
44    /// Category labels.
45    pub categories: Vec<String>,
46    /// Author names.
47    pub authors: Vec<String>,
48    /// Unique identifier (guid / id).
49    pub id: String,
50}

// ─── Adapter ──────────────────────────────────────────────────────────────────

54/// RSS / Atom feed source adapter.
55///
56/// Fetches and parses feeds using the `feed-rs` crate which handles
57/// RSS 1.0, RSS 2.0, Atom, and JSON Feed formats transparently.
58pub struct RssFeedAdapter {
59    client: reqwest::Client,
60}
61
62impl RssFeedAdapter {
63    /// Create a new RSS feed adapter.
64    pub const fn new(client: reqwest::Client) -> Self {
65        Self { client }
66    }
67}
69#[async_trait]
70impl ScrapingService for RssFeedAdapter {
71    /// Fetch and parse a feed, returning items as JSON.
72    ///
73    /// # Params (optional)
74    ///
75    /// * `since` — ISO 8601 datetime string; exclude items published before this.
76    /// * `limit` — integer; maximum number of items to return.
77    /// * `categories` — array of strings; only include items matching any of these categories.
78    async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
79        let resp = self
80            .client
81            .get(&input.url)
82            .header(
83                reqwest::header::ACCEPT,
84                "application/rss+xml, application/atom+xml, application/xml, text/xml, */*",
85            )
86            .send()
87            .await
88            .map_err(|e| {
89                StygianError::Service(ServiceError::Unavailable(format!("feed fetch failed: {e}")))
90            })?;
91
92        if !resp.status().is_success() {
93            return Err(StygianError::Service(ServiceError::InvalidResponse(
94                format!("feed returned HTTP {}", resp.status()),
95            )));
96        }
97
98        let bytes = resp.bytes().await.map_err(|e| {
99            StygianError::Service(ServiceError::Unavailable(format!(
100                "feed body read failed: {e}"
101            )))
102        })?;
103
104        let feed = parser::parse(&bytes[..]).map_err(|e| {
105            StygianError::Service(ServiceError::InvalidResponse(format!(
106                "feed parse failed: {e}"
107            )))
108        })?;
109
110        // Convert feed entries into our domain type
111        let mut items: Vec<FeedItem> = feed
112            .entries
113            .iter()
114            .map(|entry| {
115                let title = entry.title.as_ref().map(|t| t.content.clone());
116                let link = entry.links.first().map(|l| l.href.clone());
117                let published = entry.published.or(entry.updated).map(|dt| dt.to_rfc3339());
118                let summary = entry.summary.as_ref().map(|s| s.content.clone());
119                let categories = entry.categories.iter().map(|c| c.term.clone()).collect();
120                let authors = entry.authors.iter().map(|a| a.name.clone()).collect();
121                let id = entry.id.clone();
122
123                FeedItem {
124                    title,
125                    link,
126                    published,
127                    summary,
128                    categories,
129                    authors,
130                    id,
131                }
132            })
133            .collect();
134
135        // Apply optional filters
136        if let Some(since) = input.params.get("since").and_then(|v| v.as_str()) {
137            items.retain(|item| {
138                item.published
139                    .as_deref()
140                    .is_some_and(|pub_date| pub_date >= since)
141            });
142        }
143
144        if let Some(cats) = input.params.get("categories").and_then(|v| v.as_array()) {
145            let filter_cats: Vec<&str> = cats.iter().filter_map(|c| c.as_str()).collect();
146            if !filter_cats.is_empty() {
147                items.retain(|item| {
148                    item.categories
149                        .iter()
150                        .any(|c| filter_cats.contains(&c.as_str()))
151                });
152            }
153        }
154
155        if let Some(limit) = input
156            .params
157            .get("limit")
158            .and_then(serde_json::Value::as_u64)
159            .and_then(|n| usize::try_from(n).ok())
160        {
161            items.truncate(limit);
162        }
163
164        let count = items.len();
165        let data = serde_json::to_string(&items).map_err(|e| {
166            StygianError::Service(ServiceError::InvalidResponse(format!(
167                "feed serialization failed: {e}"
168            )))
169        })?;
170
171        // Feed-level metadata
172        let feed_title = feed.title.map(|t| t.content);
173        let feed_description = feed.description.map(|d| d.content);
174        let feed_updated = feed.updated.map(|dt| dt.to_rfc3339());
175
176        Ok(ServiceOutput {
177            data,
178            metadata: json!({
179                "source": "rss_feed",
180                "feed_title": feed_title,
181                "feed_description": feed_description,
182                "last_updated": feed_updated,
183                "item_count": count,
184                "source_url": input.url,
185            }),
186        })
187    }
188
189    fn name(&self) -> &'static str {
190        "rss_feed"
191    }
192}

// ─── Tests ────────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;

    const RSS_FEED: &str = r#"<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0">
  <channel>
    <title>Example Blog</title>
    <link>https://example.com</link>
    <description>An example RSS feed</description>
    <item>
      <title>First Post</title>
      <link>https://example.com/post/1</link>
      <pubDate>Mon, 01 Mar 2026 00:00:00 +0000</pubDate>
      <description>Summary of first post</description>
      <category>tech</category>
      <guid>post-1</guid>
    </item>
    <item>
      <title>Second Post</title>
      <link>https://example.com/post/2</link>
      <pubDate>Sun, 15 Feb 2026 00:00:00 +0000</pubDate>
      <description>Summary of second post</description>
      <category>science</category>
      <guid>post-2</guid>
    </item>
    <item>
      <title>Third Post</title>
      <link>https://example.com/post/3</link>
      <pubDate>Sat, 01 Feb 2026 00:00:00 +0000</pubDate>
      <description>Summary of third post</description>
      <category>tech</category>
      <category>news</category>
      <guid>post-3</guid>
    </item>
  </channel>
</rss>"#;

    const ATOM_FEED: &str = r#"<?xml version="1.0" encoding="UTF-8"?>
<feed xmlns="http://www.w3.org/2005/Atom">
  <title>Example Atom Feed</title>
  <link href="https://example.com"/>
  <updated>2026-03-01T00:00:00Z</updated>
  <entry>
    <title>Atom Entry One</title>
    <link href="https://example.com/atom/1"/>
    <id>urn:uuid:atom-1</id>
    <updated>2026-03-01T00:00:00Z</updated>
    <summary>First atom entry</summary>
    <author><name>Alice</name></author>
    <category term="rust"/>
  </entry>
  <entry>
    <title>Atom Entry Two</title>
    <link href="https://example.com/atom/2"/>
    <id>urn:uuid:atom-2</id>
    <updated>2026-02-15T00:00:00Z</updated>
    <summary>Second atom entry</summary>
    <author><name>Bob</name></author>
  </entry>
</feed>"#;

    /// Parse a fixture feed and map its entries into `FeedItem`s, mirroring
    /// the conversion performed inside `RssFeedAdapter::execute`.
    fn parse_test_feed(
        xml: &str,
    ) -> std::result::Result<Vec<FeedItem>, Box<dyn std::error::Error>> {
        let parsed = parser::parse(xml.as_bytes())?;
        let mut collected = Vec::with_capacity(parsed.entries.len());
        for entry in &parsed.entries {
            collected.push(FeedItem {
                title: entry.title.as_ref().map(|t| t.content.clone()),
                link: entry.links.first().map(|l| l.href.clone()),
                published: entry.published.or(entry.updated).map(|dt| dt.to_rfc3339()),
                summary: entry.summary.as_ref().map(|s| s.content.clone()),
                categories: entry.categories.iter().map(|c| c.term.clone()).collect(),
                authors: entry.authors.iter().map(|a| a.name.clone()).collect(),
                id: entry.id.clone(),
            });
        }
        Ok(collected)
    }

    #[test]
    fn parse_rss_with_3_items() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let items = parse_test_feed(RSS_FEED)?;
        assert_eq!(items.len(), 3);
        let Some(first) = items.first() else {
            return Err(std::io::Error::other("expected first item").into());
        };
        assert_eq!(first.title.as_deref(), Some("First Post"));
        assert_eq!(first.link.as_deref(), Some("https://example.com/post/1"));
        assert!(first.published.is_some());
        assert_eq!(first.summary.as_deref(), Some("Summary of first post"));
        Ok(())
    }

    #[test]
    fn parse_atom_with_authors() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let items = parse_test_feed(ATOM_FEED)?;
        assert_eq!(items.len(), 2);
        let Some(first) = items.first() else {
            return Err(std::io::Error::other("expected first atom item").into());
        };
        assert_eq!(first.title.as_deref(), Some("Atom Entry One"));
        assert_eq!(first.authors, vec!["Alice".to_string()]);
        assert_eq!(first.categories, vec!["rust".to_string()]);
        assert_eq!(first.link.as_deref(), Some("https://example.com/atom/1"));
        Ok(())
    }

    #[test]
    fn filter_by_since_date() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let mut items = parse_test_feed(RSS_FEED)?;
        // Keep only items published in March 2026 or later
        let cutoff = "2026-03-01";
        items.retain(|item| matches!(item.published.as_deref(), Some(p) if p >= cutoff));
        assert_eq!(items.len(), 1);
        let Some(first) = items.first() else {
            return Err(std::io::Error::other("expected one filtered item").into());
        };
        assert_eq!(first.title.as_deref(), Some("First Post"));
        Ok(())
    }

    #[test]
    fn filter_by_categories() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let mut items = parse_test_feed(RSS_FEED)?;
        let wanted = ["tech"];
        items.retain(|item| item.categories.iter().any(|c| wanted.contains(&c.as_str())));
        assert_eq!(items.len(), 2);
        let first = items
            .first()
            .ok_or_else(|| std::io::Error::other("expected first filtered item"))?;
        let second = items
            .get(1)
            .ok_or_else(|| std::io::Error::other("expected second filtered item"))?;
        assert_eq!(first.title.as_deref(), Some("First Post"));
        assert_eq!(second.title.as_deref(), Some("Third Post"));
        Ok(())
    }

    #[test]
    fn empty_feed_returns_empty_array() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0">
  <channel>
    <title>Empty Feed</title>
  </channel>
</rss>"#;
        assert!(parse_test_feed(xml)?.is_empty());
        Ok(())
    }

    #[test]
    fn malformed_feed_returns_error() {
        // Not well-formed XML and not a recognized feed format.
        let garbage: &[u8] = b"<not-a-feed><broken";
        assert!(parser::parse(garbage).is_err());
    }

    #[test]
    fn limit_truncates_items() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let mut items = parse_test_feed(RSS_FEED)?;
        assert_eq!(items.len(), 3);
        items.truncate(2);
        assert_eq!(items.len(), 2);
        Ok(())
    }

    #[test]
    fn rss_items_have_ids() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let items = parse_test_feed(RSS_FEED)?;
        let first = items
            .first()
            .ok_or_else(|| std::io::Error::other("expected first rss item"))?;
        let second = items
            .get(1)
            .ok_or_else(|| std::io::Error::other("expected second rss item"))?;
        let third = items
            .get(2)
            .ok_or_else(|| std::io::Error::other("expected third rss item"))?;
        assert!(!first.id.is_empty());
        assert!(!second.id.is_empty());
        assert!(!third.id.is_empty());
        Ok(())
    }

    #[test]
    fn atom_feed_has_categories() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let items = parse_test_feed(ATOM_FEED)?;
        let first = items
            .first()
            .ok_or_else(|| std::io::Error::other("expected first atom item"))?;
        let second = items
            .get(1)
            .ok_or_else(|| std::io::Error::other("expected second atom item"))?;
        assert_eq!(first.categories, vec!["rust"]);
        assert!(second.categories.is_empty());
        Ok(())
    }
}