1use chrono::DateTime;
2use chrono::FixedOffset;
3use chrono::Utc;
4use std::collections::HashMap;
5
6use crate::error::Result;
7use crate::model::feed::FeedStatus;
8use crate::model::feed::FeedToUpdate;
9use crate::model::item::ItemOrder;
10use crate::model::item::ItemReadOption;
11use crate::model::item::ItemStatus;
12use crate::model::item::ItemToCreate;
13use crate::model::syndication::RawItem;
14use crate::repository::database::DbConnection;
15use crate::service::feed;
16use crate::service::item;
17
18pub struct Worker {
19 conn: DbConnection,
20 proxy: Option<String>,
21}
22
23impl Worker {
24 pub fn new(conn: DbConnection, proxy: Option<String>) -> Self {
25 Self { conn, proxy }
26 }
27
28 pub async fn execute(&self) -> Result<Vec<ItemToCreate>> {
29 let pairs = self.get_links_to_check();
30
31 let mut inserted = vec![];
32
33 let feed_ids_to_check: Vec<i32> = pairs
34 .iter()
35 .filter_map(|(id, _, fetch_old_items)| if !fetch_old_items { Some(*id) } else { None })
36 .collect();
37
38 let most_recent_items = if !feed_ids_to_check.is_empty() {
39 self.get_most_recent_items(&feed_ids_to_check)
40 .unwrap_or_default()
41 } else {
42 HashMap::new()
43 };
44
45 for (feed, link, fetch_old_items) in pairs {
46 let items = item::fetch(&link, self.proxy.as_deref()).await?;
47
48 let mut filtered_items = if !fetch_old_items && !most_recent_items.contains_key(&feed) {
49 items
50 .into_iter()
51 .max_by_key(|x| x.published_at)
52 .into_iter()
53 .collect()
54 } else {
55 items
56 .into_iter()
57 .filter(|item| {
58 most_recent_items.get(&feed).is_none_or(|most_recent| {
59 item.published_at
60 .is_some_and(|published_at| published_at > *most_recent)
61 }) || fetch_old_items
62 })
63 .collect::<Vec<_>>()
64 };
65
66 filtered_items.sort_by_key(|x| x.published_at);
67 inserted.extend(self.insert_new_items(feed, &filtered_items));
68 }
69
70 Ok(inserted)
71 }
72
73 fn get_links_to_check(&self) -> Vec<(i32, String, bool)> {
74 if let Ok(feeds) = feed::read_all(&self.conn) {
75 let current = Utc::now().fixed_offset();
76 let filtered = feeds.iter().filter(|x| x.status == FeedStatus::Subscribed);
77
78 filtered
79 .map(|x| {
80 let _ = feed::update(
81 &self.conn,
82 &(FeedToUpdate {
83 id: x.id,
84 title: None,
85 link: None,
86 status: None,
87 checked_at: Some(current),
88 fetch_old_items: None,
89 }),
90 );
91 (x.id, x.link.clone(), x.fetch_old_items)
92 })
93 .collect()
94 } else {
95 vec![]
96 }
97 }
98
99 fn insert_new_items(&self, feed: i32, items: &[RawItem]) -> Vec<ItemToCreate> {
100 let current = Utc::now().fixed_offset();
101
102 let args = items.iter().map(|x| ItemToCreate {
103 author: x.author.clone().map(|x| x.trim().to_string()),
104 title: x.title.trim().to_string(),
105 link: x.link.clone().unwrap_or("#".to_string()).trim().to_string(),
106 description: x.content.clone().unwrap_or_default().trim().to_string(),
107 status: ItemStatus::Unread,
108 published_at: x.published_at.unwrap_or(current),
109 feed,
110 });
111
112 let mut inserted = vec![];
113 for arg in args {
114 if item::create(&self.conn, &arg).is_ok() {
115 inserted.push(arg);
116 }
117 }
118
119 inserted
120 }
121
122 fn get_most_recent_items(
123 &self,
124 feed_ids: &[i32],
125 ) -> Result<HashMap<i32, DateTime<FixedOffset>>> {
126 let mut most_recent_items = HashMap::new();
127
128 for feed_id in feed_ids {
129 let opt = ItemReadOption {
130 ids: None,
131 feed: Some(*feed_id),
132 status: None,
133 is_saved: None,
134 order_by: Some(ItemOrder::PublishedDateDesc),
135 limit: Some(1),
136 offset: None,
137 };
138
139 if let Some(item) = item::read_all(&self.conn, &opt)?.first() {
140 most_recent_items.insert(item.feed.id, item.published_at);
141 }
142 }
143
144 Ok(most_recent_items)
145 }
146}