collie_core/
worker.rs

1use chrono::DateTime;
2use chrono::FixedOffset;
3use chrono::Utc;
4use std::collections::HashMap;
5
6use crate::error::Result;
7use crate::model::feed::FeedStatus;
8use crate::model::feed::FeedToUpdate;
9use crate::model::item::ItemOrder;
10use crate::model::item::ItemReadOption;
11use crate::model::item::ItemStatus;
12use crate::model::item::ItemToCreate;
13use crate::model::syndication::RawItem;
14use crate::repository::database::DbConnection;
15use crate::service::feed;
16use crate::service::item;
17
18pub struct Worker {
19    conn: DbConnection,
20    proxy: Option<String>,
21}
22
23impl Worker {
24    pub fn new(conn: DbConnection, proxy: Option<String>) -> Self {
25        Self { conn, proxy }
26    }
27
28    pub async fn execute(&self) -> Result<Vec<ItemToCreate>> {
29        let pairs = self.get_links_to_check();
30
31        let mut inserted = vec![];
32
33        let feed_ids_to_check: Vec<i32> = pairs
34            .iter()
35            .filter_map(|(id, _, fetch_old_items)| if !fetch_old_items { Some(*id) } else { None })
36            .collect();
37
38        let most_recent_items = if !feed_ids_to_check.is_empty() {
39            self.get_most_recent_items(&feed_ids_to_check)
40                .unwrap_or_default()
41        } else {
42            HashMap::new()
43        };
44
45        for (feed, link, fetch_old_items) in pairs {
46            let items = item::fetch(&link, self.proxy.as_deref()).await?;
47
48            let mut filtered_items = if !fetch_old_items && !most_recent_items.contains_key(&feed) {
49                items
50                    .into_iter()
51                    .max_by_key(|x| x.published_at)
52                    .into_iter()
53                    .collect()
54            } else {
55                items
56                    .into_iter()
57                    .filter(|item| {
58                        most_recent_items.get(&feed).is_none_or(|most_recent| {
59                            item.published_at
60                                .is_some_and(|published_at| published_at > *most_recent)
61                        }) || fetch_old_items
62                    })
63                    .collect::<Vec<_>>()
64            };
65
66            filtered_items.sort_by_key(|x| x.published_at);
67            inserted.extend(self.insert_new_items(feed, &filtered_items));
68        }
69
70        Ok(inserted)
71    }
72
73    fn get_links_to_check(&self) -> Vec<(i32, String, bool)> {
74        if let Ok(feeds) = feed::read_all(&self.conn) {
75            let current = Utc::now().fixed_offset();
76            let filtered = feeds.iter().filter(|x| x.status == FeedStatus::Subscribed);
77
78            filtered
79                .map(|x| {
80                    let _ = feed::update(
81                        &self.conn,
82                        &(FeedToUpdate {
83                            id: x.id,
84                            title: None,
85                            link: None,
86                            status: None,
87                            checked_at: Some(current),
88                            fetch_old_items: None,
89                        }),
90                    );
91                    (x.id, x.link.clone(), x.fetch_old_items)
92                })
93                .collect()
94        } else {
95            vec![]
96        }
97    }
98
99    fn insert_new_items(&self, feed: i32, items: &[RawItem]) -> Vec<ItemToCreate> {
100        let current = Utc::now().fixed_offset();
101
102        let args = items.iter().map(|x| ItemToCreate {
103            author: x.author.clone().map(|x| x.trim().to_string()),
104            title: x.title.trim().to_string(),
105            link: x.link.clone().unwrap_or("#".to_string()).trim().to_string(),
106            description: x.content.clone().unwrap_or_default().trim().to_string(),
107            status: ItemStatus::Unread,
108            published_at: x.published_at.unwrap_or(current),
109            feed,
110        });
111
112        let mut inserted = vec![];
113        for arg in args {
114            if item::create(&self.conn, &arg).is_ok() {
115                inserted.push(arg);
116            }
117        }
118
119        inserted
120    }
121
122    fn get_most_recent_items(
123        &self,
124        feed_ids: &[i32],
125    ) -> Result<HashMap<i32, DateTime<FixedOffset>>> {
126        let mut most_recent_items = HashMap::new();
127
128        for feed_id in feed_ids {
129            let opt = ItemReadOption {
130                ids: None,
131                feed: Some(*feed_id),
132                status: None,
133                is_saved: None,
134                order_by: Some(ItemOrder::PublishedDateDesc),
135                limit: Some(1),
136                offset: None,
137            };
138
139            if let Some(item) = item::read_all(&self.conn, &opt)?.first() {
140                most_recent_items.insert(item.feed.id, item.published_at);
141            }
142        }
143
144        Ok(most_recent_items)
145    }
146}