Skip to main content

news_flash/
lib.rs

1#[macro_use]
2extern crate diesel;
3#[macro_use]
4extern crate diesel_migrations;
5
6mod action_cache;
7mod builder;
8mod config;
9mod database;
10mod default_portal;
11pub mod error;
12pub mod feed_api;
13mod feed_api_implementations;
14pub mod models;
15mod password_encryption;
16mod schema;
17pub mod util;
18
19#[cfg(test)]
20mod tests;
21
22use crate::action_cache::ActionCache;
23use crate::builder::NewsFlashBuilder;
24use crate::config::ConfigProxy;
25use crate::database::DatabaseExt;
26use crate::default_portal::DefaultPortal;
27use crate::error::NewsFlashError;
28use crate::feed_api::FeedApi;
29use crate::feed_api_implementations::FeedApiImplementations;
30use crate::models::{
31    Article, ArticleFilter, ArticleID, Category, CategoryID, DatabaseSize, Enclosure, FatArticle, FatFavIcon, FavIcon, Feed, FeedID, FeedMapping,
32    LoginData, Marked, NEWSFLASH_TOPLEVEL, PluginCapabilities, PluginID, PluginInfo, Read, Tag, TagID, Tagging, Url,
33};
34
35use crate::util::favicons::FavIconLoader;
36pub use crate::util::feed_parser::{self, ParsedUrl};
37use crate::util::opml;
38use chrono::{DateTime, Duration, Utc};
39use feed_api::FeedHeaderMap;
40use models::{CategoryMapping, OfflineAction, OfflineActionType, Thumbnail};
41use reqwest::Client;
42use reqwest::header::{HeaderMap, HeaderValue};
43use std::collections::hash_map::HashMap;
44use std::sync::Arc;
45use std::sync::atomic::{AtomicBool, Ordering};
46use tokio::sync::{Mutex, RwLock, Semaphore};
47
48#[cfg(feature = "article-scraper")]
49use article_scraper::ArticleScraper;
50#[cfg(feature = "article-scraper")]
51pub use article_scraper::DownloadProgress;
52
53#[cfg(feature = "image-downloader")]
54use crate::models::{Image, ImageMetadata};
55#[cfg(feature = "image-downloader")]
56use futures::channel::mpsc::Sender;
57#[cfg(feature = "image-downloader")]
58use std::collections::HashSet;
59#[cfg(feature = "image-downloader")]
60use tokio::io::AsyncReadExt;
61
62#[cfg(any(feature = "article-scraper", feature = "image-downloader"))]
63use std::path::PathBuf;
64
// Directory name used by the article scraper feature.
// NOTE(review): presumably resolved relative to `NewsFlash::data_dir` — confirm at the use site.
#[cfg(feature = "article-scraper")]
static SCRAPER_DATA_DIR: &str = "ftr-site-config";

// Directory name used by the image downloader feature.
// NOTE(review): presumably resolved relative to `NewsFlash::data_dir` — confirm at the use site.
#[cfg(feature = "image-downloader")]
static IMAGE_DATA_DIR: &str = "pictures";

/// Crate-internal shorthand for results whose error type is `NewsFlashError`.
type NewsFlashResult<T> = Result<T, NewsFlashError>;
72
/// Main handle of the library: bundles the database, the feed backend, the configuration
/// and the flags describing the current sync / offline state.
pub struct NewsFlash {
    // Only compiled in when the scraper or the image downloader feature is enabled.
    // NOTE(review): presumably the base directory for on-disk data — confirm.
    #[cfg(any(feature = "article-scraper", feature = "image-downloader"))]
    data_dir: PathBuf,
    // Shared handle to the local database.
    db: Arc<Box<dyn DatabaseExt>>,
    // The active backend implementation, guarded by an async read/write lock.
    api: RwLock<Box<dyn FeedApi>>,
    // Persistent settings (backend id, last sync time, keep-articles duration).
    config: Arc<RwLock<ConfigProxy>>,
    // Handed out to callers via `get_semaphore`.
    // NOTE(review): presumably limits concurrent downloads — confirm.
    download_semaphore: Arc<Semaphore>,
    // Loader used by `get_icon` to obtain feed favicons.
    icons: FavIconLoader,
    // Optional article scraper; only present with the `article-scraper` feature.
    #[cfg(feature = "article-scraper")]
    scraper: RwLock<Option<Arc<ArticleScraper>>>,
    // Collects user actions performed while a sync is running so they can be
    // applied to the sync result and pushed to the backend afterwards.
    sync_cache: Mutex<ActionCache>,
    // `true` while `initial_sync`, `sync` or `fetch_feed` is running.
    is_sync_ongoing: Arc<AtomicBool>,
    // `true` while offline mode is active (see `set_offline`).
    is_offline: Arc<AtomicBool>,
}
87
88impl NewsFlash {
89    pub fn list_backends() -> HashMap<PluginID, PluginInfo> {
90        let mut map: HashMap<PluginID, PluginInfo> = HashMap::new();
91        for api_meta in FeedApiImplementations::list() {
92            map.insert(api_meta.id(), api_meta.info().unwrap());
93        }
94        map
95    }
96
97    pub fn builder() -> NewsFlashBuilder {
98        NewsFlashBuilder::default()
99    }
100
101    pub async fn id(&self) -> Option<PluginID> {
102        self.config.read().await.get_backend()
103    }
104
105    pub async fn user_name(&self) -> Option<String> {
106        self.api.read().await.user_name().await
107    }
108
109    pub async fn features(&self) -> NewsFlashResult<PluginCapabilities> {
110        let features = self.api.read().await.features()?;
111        Ok(features)
112    }
113
114    pub async fn get_login_data(&self) -> Option<LoginData> {
115        self.api.read().await.get_login_data().await
116    }
117
118    pub async fn last_sync(&self) -> DateTime<Utc> {
119        self.config.read().await.get_last_sync()
120    }
121
122    pub fn is_sync_ongoing(&self) -> bool {
123        self.is_sync_ongoing.load(Ordering::Acquire)
124    }
125
126    pub fn is_offline(&self) -> bool {
127        self.is_offline.load(Ordering::Acquire)
128    }
129
130    pub fn get_semaphore(&self) -> Arc<Semaphore> {
131        self.download_semaphore.clone()
132    }
133
134    pub async fn set_offline(&self, offline: bool, client: &Client) -> NewsFlashResult<()> {
135        self.is_offline.store(offline, Ordering::Release);
136
137        if !offline {
138            let offline_actions = self.db.read_offline_actions()?;
139            self.db.drop_offline_actions()?;
140            let mut action_cache = ActionCache::new();
141
142            for offline_action in offline_actions {
143                match offline_action.action_type {
144                    OfflineActionType::Read => action_cache.add_article_marked_read(&offline_action.article_id),
145                    OfflineActionType::Unread => action_cache.add_article_marked_unread(&offline_action.article_id),
146                    OfflineActionType::Mark => action_cache.add_article_mark(&offline_action.article_id),
147                    OfflineActionType::Unmark => action_cache.add_article_unmark(&offline_action.article_id),
148                    OfflineActionType::Tag => {
149                        if let Some(tag_id) = offline_action.tag_id {
150                            action_cache.add_article_tagged(&offline_action.article_id, &tag_id);
151                        }
152                    }
153                    OfflineActionType::Untag => {
154                        if let Some(tag_id) = offline_action.tag_id {
155                            action_cache.add_article_untagged(&offline_action.article_id, &tag_id);
156                        }
157                    }
158                }
159            }
160
161            self.sync_cache.lock().await.execute_api_actions(&self.api, client).await?;
162        }
163
164        Ok(())
165    }
166
167    pub fn is_database_empty(&self) -> NewsFlashResult<bool> {
168        let is_empty = self.db.is_empty()?;
169        Ok(is_empty)
170    }
171
172    pub async fn set_keep_articles_duration(&self, keep_articles: Option<chrono::Duration>) -> NewsFlashResult<()> {
173        self.config.write().await.set_keep_articles_duration(keep_articles)?;
174        if let Some(keep_articles) = keep_articles {
175            self.db.drop_old_articles(keep_articles)?;
176
177            #[cfg(feature = "image-downloader")]
178            self.delete_orphaned_images().await?;
179        }
180
181        Ok(())
182    }
183
184    pub async fn get_keep_articles_duration(&self) -> Option<chrono::Duration> {
185        self.config.read().await.get_keep_articles_duration()
186    }
187
188    pub fn error_login_related(error: &NewsFlashError) -> bool {
189        matches!(error, NewsFlashError::LoadBackend | NewsFlashError::NotLoggedIn | NewsFlashError::API(_))
190    }
191
192    pub async fn is_reachable(&self, client: &Client) -> NewsFlashResult<bool> {
193        let reachable = self.api.read().await.is_reachable(client).await?;
194        Ok(reachable)
195    }
196
197    pub async fn get_icon(&self, feed_id: &FeedID, client: &Client, header: HeaderMap<HeaderValue>) -> NewsFlashResult<FavIcon> {
198        let info = self.icons.get_icon(feed_id, &self.api, client, header).await?;
199        Ok(info)
200    }
201
202    pub fn load_icon_from_db(&self, feed_id: &FeedID) -> NewsFlashResult<FatFavIcon> {
203        let icon = self.db.read_fatfavicon(feed_id)?;
204        Ok(icon)
205    }
206
207    pub async fn get_article_thumbnail(&self, article_id: &ArticleID, client: &Client) -> NewsFlashResult<Option<Thumbnail>> {
208        if let Ok(thumbnail) = self.db.read_thumbnail(article_id) {
209            if thumbnail.data.is_some() {
210                return Ok(Some(thumbnail));
211            }
212
213            if thumbnail.last_try + Duration::try_days(4).unwrap() > Utc::now() {
214                tracing::debug!(%article_id, "Tried to download thumbnail recently, will not attemt to download again.");
215                return Ok(None);
216            }
217        }
218
219        let Ok(article) = self.db.read_article(article_id) else {
220            tracing::warn!(%article_id, "Couldn't download thumbnail: article not found");
221            return Ok(None);
222        };
223
224        let Some(thumbnail_url) = article.thumbnail_url.as_deref() else {
225            tracing::debug!(%article_id, "Couldn't download thumbnail: article doesn't specify thumbnail url");
226            return Ok(None);
227        };
228
229        let thumbnail_result = Thumbnail::from_url(thumbnail_url, article_id, client).await;
230        match thumbnail_result {
231            Ok(thumbnail) => {
232                self.db.insert_thumbnail(&thumbnail)?;
233                Ok(Some(thumbnail))
234            }
235            Err(error) => {
236                let empty_thumbnail = Thumbnail::empty(article_id);
237                self.db.insert_thumbnail(&empty_thumbnail)?;
238                tracing::warn!(%thumbnail_url, %error, "downloading thumbnail failed");
239                Err(NewsFlashError::Thumbnail)
240            }
241        }
242    }
243
244    pub fn database_size(&self) -> NewsFlashResult<DatabaseSize> {
245        let size = self.db.size()?;
246        Ok(size)
247    }
248
249    pub async fn login(&self, data: LoginData, client: &Client) -> NewsFlashResult<()> {
250        let id = data.id();
251        self.api.write().await.login(data, client).await?;
252        self.config.write().await.set_backend(Some(&id))?;
253        Ok(())
254    }
255
256    pub async fn logout(&self, client: &Client) -> NewsFlashResult<()> {
257        self.config.write().await.set_backend(None)?;
258        self.api.write().await.logout(client).await?;
259        self.db.reset()?;
260        Ok(())
261    }
262
263    pub fn clean_db(&self) -> NewsFlashResult<()> {
264        self.db.clean()?;
265        Ok(())
266    }
267
268    pub async fn initial_sync(&self, client: &Client, header: FeedHeaderMap) -> NewsFlashResult<HashMap<FeedID, i64>> {
269        if self.is_offline.load(Ordering::Acquire) {
270            return Err(NewsFlashError::Offline);
271        }
272
273        if self.is_sync_ongoing.load(Ordering::Acquire) {
274            tracing::warn!("sync is already ongoing");
275            return Err(NewsFlashError::Syncing);
276        }
277
278        if self.api.read().await.is_logged_in(client).await? {
279            self.is_sync_ongoing.store(true, Ordering::Release);
280
281            let now = chrono::Utc::now();
282            let keep_articles_duration = self.config.read().await.get_keep_articles_duration();
283
284            let result = self.api.read().await.initial_sync(client, header).await;
285            if result.is_err() {
286                self.is_sync_ongoing.store(true, Ordering::Release);
287            }
288            let sync_result = result?;
289            // Filter out old articles that would be deleted right away afterwards
290            let sync_result = sync_result.remove_old_articles(keep_articles_duration);
291            let sync_result = sync_result.generate_tag_colors(&[]);
292            // Modify result with all changes that happened during sync
293            let sync_result = self.sync_cache.lock().await.process_sync_result(sync_result);
294            // push all changes that happend druing sync to the backend
295            let result = self.sync_cache.lock().await.execute_api_actions(&self.api, client).await;
296            if let Err(cache_execution_error) = result {
297                self.is_sync_ongoing.store(false, Ordering::Release);
298                return Err(cache_execution_error.into());
299            }
300            // reset the sync_cache for next sync
301            self.sync_cache.lock().await.reset();
302
303            let result = self.db.write_sync_result(sync_result, keep_articles_duration);
304            if result.is_err() {
305                self.is_sync_ongoing.store(false, Ordering::Release);
306            }
307            let new_article_count = result?;
308
309            #[cfg(feature = "image-downloader")]
310            self.delete_orphaned_images().await?;
311
312            if let Err(write_config_error) = self.config.write().await.set_last_sync(now) {
313                self.is_sync_ongoing.store(false, Ordering::Release);
314                return Err(write_config_error.into());
315            }
316
317            self.is_sync_ongoing.store(false, Ordering::Release);
318            return Ok(new_article_count);
319        }
320        Err(NewsFlashError::NotLoggedIn)
321    }
322
323    pub async fn sync(&self, client: &Client, header: FeedHeaderMap) -> NewsFlashResult<HashMap<FeedID, i64>> {
324        if self.is_offline.load(Ordering::Acquire) {
325            return Err(NewsFlashError::Offline);
326        }
327
328        if self.is_sync_ongoing.load(Ordering::Acquire) {
329            tracing::warn!("sync is already ongoing");
330            return Err(NewsFlashError::Syncing);
331        }
332
333        if self.api.read().await.is_logged_in(client).await? {
334            self.is_sync_ongoing.store(true, Ordering::Release);
335
336            let now = chrono::Utc::now();
337            let keep_articles_duration = self.config.read().await.get_keep_articles_duration();
338
339            let sync_result = self.api.read().await.sync(client, header).await;
340            if sync_result.is_err() {
341                self.is_sync_ongoing.store(false, Ordering::Release);
342            }
343            let sync_result = sync_result?;
344
345            // Filter out old articles that would be deleted right away afterwards
346            let sync_result = sync_result.remove_old_articles(keep_articles_duration);
347
348            let tags = self.db.read_tags()?;
349            let sync_result = sync_result.generate_tag_colors(&tags);
350            // Modify result with all changes that happened during sync
351            let sync_result = self.sync_cache.lock().await.process_sync_result(sync_result);
352            // push all changes that happend druing sync to the backend
353            let result = self.sync_cache.lock().await.execute_api_actions(&self.api, client).await;
354            if let Err(cache_execution_error) = result {
355                self.is_sync_ongoing.store(false, Ordering::Release);
356                return Err(cache_execution_error.into());
357            }
358            // reset the sync_cache for next sync
359            self.sync_cache.lock().await.reset();
360
361            let result = self.db.write_sync_result(sync_result, keep_articles_duration);
362            if result.is_err() {
363                self.is_sync_ongoing.store(false, Ordering::Release);
364            }
365            let new_article_count = result?;
366
367            #[cfg(feature = "image-downloader")]
368            self.delete_orphaned_images().await?;
369
370            if let Err(write_config_error) = self.config.write().await.set_last_sync(now) {
371                self.is_sync_ongoing.store(false, Ordering::Release);
372                return Err(write_config_error.into());
373            }
374
375            self.is_sync_ongoing.store(false, Ordering::Release);
376            return Ok(new_article_count);
377        }
378        Err(NewsFlashError::NotLoggedIn)
379    }
380
381    pub async fn fetch_feed(&self, feed_id: &FeedID, client: &Client, header: HeaderMap<HeaderValue>) -> NewsFlashResult<i64> {
382        if self.is_offline.load(Ordering::Acquire) {
383            return Err(NewsFlashError::Offline);
384        }
385
386        if self.is_sync_ongoing.load(Ordering::Acquire) {
387            tracing::warn!("sync is already ongoing");
388            return Err(NewsFlashError::Syncing);
389        }
390
391        if self.api.read().await.is_logged_in(client).await? {
392            self.is_sync_ongoing.store(true, Ordering::Release);
393
394            let keep_articles_duration = self.config.read().await.get_keep_articles_duration();
395
396            let result = self.api.read().await.fetch_feed(feed_id, client, header).await;
397            if result.is_err() {
398                self.is_sync_ongoing.store(false, Ordering::Release);
399            }
400            let update_result = result?;
401            // Filter out old articles that would be deleted right away afterwards
402            let update_result = update_result.remove_old_articles(keep_articles_duration);
403            // push all changes that happend druing sync to the backend
404            let result = self.sync_cache.lock().await.execute_api_actions(&self.api, client).await;
405            if let Err(cache_execution_error) = result {
406                self.is_sync_ongoing.store(false, Ordering::Release);
407                return Err(cache_execution_error.into());
408            }
409            // reset the sync_cache for next sync
410            self.sync_cache.lock().await.reset();
411
412            let result = self.db.write_feed_update_result(feed_id, update_result);
413            if result.is_err() {
414                self.is_sync_ongoing.store(false, Ordering::Release);
415            }
416            let new_article_count = result?;
417
418            self.is_sync_ongoing.store(false, Ordering::Release);
419            Ok(new_article_count)
420        } else {
421            Err(NewsFlashError::NotLoggedIn)
422        }
423    }
424
425    pub async fn set_article_read(&self, articles: &[ArticleID], read: Read, client: &Client) -> NewsFlashResult<()> {
426        if self.is_offline.load(Ordering::Acquire) {
427            let action_type = if read == Read::Read {
428                OfflineActionType::Read
429            } else {
430                OfflineActionType::Unread
431            };
432            let actions = articles
433                .iter()
434                .map(|article_id| OfflineAction {
435                    action_type,
436                    article_id: article_id.clone(),
437                    tag_id: None,
438                })
439                .collect::<Vec<_>>();
440            self.db.insert_offline_actions(&actions)?;
441            self.db.set_article_read(articles, read)?;
442
443            Ok(())
444        } else if self.api.read().await.is_logged_in(client).await? {
445            if self.is_sync_ongoing.load(Ordering::Acquire) {
446                for article_id in articles {
447                    match read {
448                        Read::Read => self.sync_cache.lock().await.add_article_marked_read(article_id),
449                        Read::Unread => self.sync_cache.lock().await.add_article_marked_unread(article_id),
450                    }
451                }
452                self.db.set_article_read(articles, read)?;
453            } else {
454                let articles_before = self.db.read_articles(ArticleFilter::read_ids(articles.into(), read.invert()))?;
455                self.db.set_article_read(articles, read)?;
456
457                let api_result = self.api.read().await.set_article_read(articles, read, client).await;
458
459                // in case of error, reset read state to what it was before
460                if api_result.is_err() {
461                    let ids_before = articles_before.iter().map(|a| a.article_id.clone()).collect::<Vec<_>>();
462                    self.db.set_article_read(&ids_before, read.invert())?;
463                }
464
465                api_result?;
466            }
467
468            Ok(())
469        } else {
470            Err(NewsFlashError::NotLoggedIn)
471        }
472    }
473
474    pub async fn set_article_marked(&self, articles: &[ArticleID], marked: Marked, client: &Client) -> NewsFlashResult<()> {
475        if self.is_offline.load(Ordering::Acquire) {
476            let action_type = if marked == Marked::Marked {
477                OfflineActionType::Mark
478            } else {
479                OfflineActionType::Unmark
480            };
481            let actions = articles
482                .iter()
483                .map(|article_id| OfflineAction {
484                    action_type,
485                    article_id: article_id.clone(),
486                    tag_id: None,
487                })
488                .collect::<Vec<_>>();
489            self.db.insert_offline_actions(&actions)?;
490            self.db.set_article_marked(articles, marked)?;
491
492            Ok(())
493        } else if self.api.read().await.is_logged_in(client).await? {
494            if self.is_sync_ongoing.load(Ordering::Acquire) {
495                for article_id in articles {
496                    match marked {
497                        Marked::Marked => self.sync_cache.lock().await.add_article_mark(article_id),
498                        Marked::Unmarked => self.sync_cache.lock().await.add_article_unmark(article_id),
499                    }
500                }
501
502                self.db.set_article_marked(articles, marked)?;
503            } else {
504                let articles_before = self.db.read_articles(ArticleFilter::marked_ids(articles.into(), marked.invert()))?;
505                self.db.set_article_marked(articles, marked)?;
506
507                let article_ids_to_update = articles_before.iter().map(|a| &a.article_id).cloned().collect::<Vec<_>>();
508                let api_result = self.api.read().await.set_article_marked(&article_ids_to_update, marked, client).await;
509
510                // in case of error, reset marked state to what it was before
511                if api_result.is_err() {
512                    let ids_before = articles_before.iter().map(|a| a.article_id.clone()).collect::<Vec<_>>();
513                    self.db.set_article_marked(&ids_before, marked.invert())?;
514                }
515
516                api_result?;
517            }
518
519            Ok(())
520        } else {
521            Err(NewsFlashError::NotLoggedIn)
522        }
523    }
524
525    pub async fn set_feed_read(&self, feeds: &[FeedID], client: &Client) -> NewsFlashResult<()> {
526        if self.is_offline.load(Ordering::Acquire) {
527            let mut actions = Vec::new();
528
529            for feed in feeds {
530                let filter = ArticleFilter::feed_unread(feed);
531                let articles = self.db.read_articles(filter)?;
532                actions.append(
533                    &mut articles
534                        .into_iter()
535                        .map(|article| OfflineAction {
536                            action_type: OfflineActionType::Read,
537                            article_id: article.article_id,
538                            tag_id: None,
539                        })
540                        .collect(),
541                );
542            }
543
544            self.db.insert_offline_actions(&actions)?;
545            self.db.set_feed_read(feeds)?;
546
547            Ok(())
548        } else if self.api.read().await.is_logged_in(client).await? {
549            if self.is_sync_ongoing.load(Ordering::Acquire) {
550                for feed_id in feeds {
551                    self.sync_cache.lock().await.add_feed_mark_read(feed_id);
552                }
553                self.db.set_feed_read(feeds)?;
554            } else {
555                let mut unread_articles_before: Vec<ArticleID> = Vec::new();
556                for feed_id in feeds {
557                    let articles_before = self.db.read_articles(ArticleFilter::feed_unread(feed_id))?;
558                    unread_articles_before.append(&mut articles_before.into_iter().map(|a| a.article_id).collect());
559                }
560
561                self.db.set_feed_read(feeds)?;
562
563                let api_result = self.api.read().await.set_feed_read(feeds, &unread_articles_before, client).await;
564
565                // in case of error, reset read state to what it was before
566                if api_result.is_err() {
567                    self.db.set_article_read(&unread_articles_before, Read::Unread)?;
568                }
569
570                api_result?;
571            }
572
573            Ok(())
574        } else {
575            Err(NewsFlashError::NotLoggedIn)
576        }
577    }
578
579    pub async fn set_category_read(&self, categories: &[CategoryID], client: &Client) -> NewsFlashResult<()> {
580        if self.is_offline.load(Ordering::Acquire) {
581            let mut actions = Vec::new();
582
583            for category in categories {
584                let filter = ArticleFilter::category_unread(category);
585                let articles = self.db.read_articles(filter)?;
586                actions.append(
587                    &mut articles
588                        .into_iter()
589                        .map(|article| OfflineAction {
590                            action_type: OfflineActionType::Read,
591                            article_id: article.article_id,
592                            tag_id: None,
593                        })
594                        .collect(),
595                );
596            }
597
598            self.db.insert_offline_actions(&actions)?;
599            self.db.set_category_read(categories)?;
600
601            Ok(())
602        } else if self.api.read().await.is_logged_in(client).await? {
603            if self.is_sync_ongoing.load(Ordering::Acquire) {
604                for category_id in categories {
605                    self.sync_cache.lock().await.add_category_mark_read(category_id);
606                }
607                self.db.set_category_read(categories)?;
608            } else {
609                let mut unread_articles_before: Vec<ArticleID> = Vec::new();
610                for category_id in categories {
611                    let articles_before = self.db.read_articles(ArticleFilter::category_unread(category_id))?;
612                    unread_articles_before.append(&mut articles_before.into_iter().map(|a| a.article_id).collect());
613                }
614
615                self.db.set_category_read(categories)?;
616                let api_result = self.api.read().await.set_category_read(categories, &unread_articles_before, client).await;
617
618                // in case of error, reset read state to what it was before
619                if api_result.is_err() {
620                    self.db.set_article_read(&unread_articles_before, Read::Unread)?;
621                }
622
623                api_result?;
624            }
625
626            Ok(())
627        } else {
628            Err(NewsFlashError::NotLoggedIn)
629        }
630    }
631
632    pub async fn set_tag_read(&self, tags: &[TagID], client: &Client) -> NewsFlashResult<()> {
633        if self.is_offline.load(Ordering::Acquire) {
634            let mut actions = Vec::new();
635
636            for tag in tags {
637                let filter = ArticleFilter::tag_unread(tag);
638                let articles = self.db.read_articles(filter)?;
639                actions.append(
640                    &mut articles
641                        .into_iter()
642                        .map(|article| OfflineAction {
643                            action_type: OfflineActionType::Read,
644                            article_id: article.article_id,
645                            tag_id: None,
646                        })
647                        .collect(),
648                );
649            }
650
651            self.db.insert_offline_actions(&actions)?;
652            self.db.set_tag_read(tags)?;
653
654            Ok(())
655        } else if self.api.read().await.is_logged_in(client).await? {
656            if self.is_sync_ongoing.load(Ordering::Acquire) {
657                for tag_id in tags {
658                    self.sync_cache.lock().await.add_tag_mark_read(tag_id);
659                }
660                self.db.set_tag_read(tags)?;
661            } else {
662                let mut unread_articles_before: Vec<ArticleID> = Vec::new();
663                for tag_id in tags {
664                    let articles_before = self.db.read_articles(ArticleFilter::tag_unread(tag_id))?;
665                    unread_articles_before.append(&mut articles_before.into_iter().map(|a| a.article_id).collect());
666                }
667
668                self.db.set_tag_read(tags)?;
669                let api_result = self.api.read().await.set_tag_read(tags, &unread_articles_before, client).await;
670
671                // in case of error, reset read state to what it was before
672                if api_result.is_err() {
673                    self.db.set_article_read(&unread_articles_before, Read::Unread)?;
674                }
675
676                api_result?;
677            }
678
679            Ok(())
680        } else {
681            Err(NewsFlashError::NotLoggedIn)
682        }
683    }
684
685    pub async fn set_all_read(&self, client: &Client) -> NewsFlashResult<()> {
686        if self.is_offline.load(Ordering::Acquire) {
687            let filter = ArticleFilter::all_unread();
688            let articles = self.db.read_articles(filter)?;
689            let actions = articles
690                .into_iter()
691                .map(|article| OfflineAction {
692                    action_type: OfflineActionType::Read,
693                    article_id: article.article_id,
694                    tag_id: None,
695                })
696                .collect::<Vec<_>>();
697
698            self.db.insert_offline_actions(&actions)?;
699            self.db.set_all_read()?;
700
701            Ok(())
702        } else if self.api.read().await.is_logged_in(client).await? {
703            if self.is_sync_ongoing.load(Ordering::Acquire) {
704                let categories = self.db.read_categories()?;
705                for category in categories {
706                    self.sync_cache.lock().await.add_category_mark_read(&category.category_id);
707                }
708                self.db.set_all_read()?;
709            } else {
710                let unread_articles_before = self
711                    .db
712                    .read_articles(ArticleFilter::all_unread())?
713                    .into_iter()
714                    .map(|a| a.article_id)
715                    .collect::<Vec<_>>();
716                self.db.set_all_read()?;
717
718                let api_result = self.api.read().await.set_all_read(&unread_articles_before, client).await;
719
720                // in case of error, reset read state to what it was before
721                if api_result.is_err() {
722                    self.db.set_article_read(&unread_articles_before, Read::Unread)?;
723                }
724
725                api_result?;
726            }
727
728            Ok(())
729        } else {
730            Err(NewsFlashError::NotLoggedIn)
731        }
732    }
733
734    pub async fn add_feed(
735        &self,
736        url: &Url,
737        title: Option<String>,
738        category_id: Option<CategoryID>,
739        client: &Client,
740    ) -> NewsFlashResult<(Feed, FeedMapping, Option<Category>, Option<CategoryMapping>)> {
741        if self.is_offline.load(Ordering::Acquire) {
742            return Err(NewsFlashError::Offline);
743        }
744
745        if self.api.read().await.is_logged_in(client).await? {
746            if self.is_sync_ongoing.load(Ordering::Acquire) {
747                return Err(NewsFlashError::Syncing);
748            }
749
750            let (feed, category) = self.api.read().await.add_feed(url, title, category_id.clone(), client).await?;
751
752            self.db.insert_feed(&feed)?;
753
754            let category_mapping = if let Some(category) = &category {
755                if self.db.read_category(&category.category_id).is_ok() {
756                    // category already exists -> no need to insert it or create a mapping
757                    None
758                } else {
759                    let category_mapping = CategoryMapping {
760                        parent_id: NEWSFLASH_TOPLEVEL.clone(),
761                        category_id: category.category_id.clone(),
762                        sort_index: Some(i32::MAX),
763                    };
764                    self.db.insert_category(category)?;
765                    self.db.insert_category_mapping(&category_mapping)?;
766                    Some(category_mapping)
767                }
768            } else {
769                None
770            };
771
772            let category_id = match category_id {
773                Some(category_id) => Some(category_id),
774                None => category.as_ref().map(|c| c.category_id.clone()),
775            };
776
777            let feed_mapping = FeedMapping {
778                feed_id: feed.feed_id.clone(),
779                category_id: category_id.unwrap_or(NEWSFLASH_TOPLEVEL.clone()),
780                sort_index: Some(i32::MAX),
781            };
782
783            self.db.insert_feed_mapping(&feed_mapping)?;
784
785            return Ok((feed, feed_mapping, category, category_mapping));
786        }
787        Err(NewsFlashError::NotLoggedIn)
788    }
789
790    pub async fn remove_feed(&self, feed_id: &FeedID, client: &Client) -> NewsFlashResult<()> {
791        if self.is_offline.load(Ordering::Acquire) {
792            return Err(NewsFlashError::Offline);
793        }
794
795        if self.api.read().await.is_logged_in(client).await? {
796            if self.is_sync_ongoing.load(Ordering::Acquire) {
797                return Err(NewsFlashError::Syncing);
798            }
799            self.api.read().await.remove_feed(feed_id, client).await?;
800
801            // remove feed from db
802            self.db.drop_feed(feed_id)?;
803
804            #[cfg(feature = "image-downloader")]
805            self.delete_orphaned_images().await?;
806
807            return Ok(());
808        }
809        Err(NewsFlashError::NotLoggedIn)
810    }
811
812    pub async fn move_feed(&self, from: &FeedMapping, to: &FeedMapping, client: &Client) -> NewsFlashResult<()> {
813        if self.is_offline.load(Ordering::Acquire) {
814            return Err(NewsFlashError::Offline);
815        }
816
817        // if feed is still in same category: only alter local data
818        if from.category_id == to.category_id {
819            // drop mapping 'from'
820            self.db.drop_feed_mapping(from)?;
821            // add mapping 'to'
822            self.db.insert_feed_mapping(to)?;
823            Ok(())
824        } else if self.api.read().await.is_logged_in(client).await? {
825            if self.is_sync_ongoing.load(Ordering::Acquire) {
826                return Err(NewsFlashError::Syncing);
827            }
828
829            self.api
830                .read()
831                .await
832                .move_feed(&from.feed_id, &from.category_id, &to.category_id, client)
833                .await?;
834
835            // drop mapping 'from'
836            self.db.drop_feed_mapping(from)?;
837
838            // add mapping 'to'
839            self.db.insert_feed_mapping(to)?;
840            Ok(())
841        } else {
842            Err(NewsFlashError::NotLoggedIn)
843        }
844    }
845
846    pub async fn rename_feed(&self, feed_id: &FeedID, new_title: &str, client: &Client) -> NewsFlashResult<Feed> {
847        if self.is_offline.load(Ordering::Acquire) {
848            return Err(NewsFlashError::Offline);
849        }
850
851        if self.api.read().await.is_logged_in(client).await? {
852            if self.is_sync_ongoing.load(Ordering::Acquire) {
853                return Err(NewsFlashError::Syncing);
854            }
855
856            let new_id = self.api.read().await.rename_feed(feed_id, new_title, client).await?;
857
858            let mut modified_feed = self.db.read_feed(feed_id)?;
859            new_title.clone_into(&mut modified_feed.label);
860            modified_feed.feed_id = new_id.clone();
861
862            self.db.insert_feed(&modified_feed)?;
863
864            if &new_id != feed_id {
865                self.db.drop_feed(feed_id)?;
866
867                // fix mappings
868                let mappings = self.db.read_feed_mappings(Some(feed_id), None)?;
869                let modified_mappings: Vec<FeedMapping> = mappings
870                    .into_iter()
871                    .map(|mut mapping| {
872                        mapping.feed_id = new_id.clone();
873                        mapping
874                    })
875                    .collect();
876                self.db.drop_mapping_of_feed(feed_id)?;
877                self.db.insert_feed_mappings(&modified_mappings)?;
878
879                // fix articles
880                let articles = self.db.read_articles(ArticleFilter {
881                    feeds: Some([feed_id.clone()].into()),
882                    ..ArticleFilter::default()
883                })?;
884                let mut modified_ids: Vec<ArticleID> = Vec::new();
885                let modified_articles: Vec<Article> = articles
886                    .into_iter()
887                    .map(|mut article| {
888                        modified_ids.push(article.article_id.clone());
889                        article.feed_id = new_id.clone();
890                        article
891                    })
892                    .collect();
893                self.db.drop_articles(&modified_ids)?;
894                self.db.write_articles(&modified_articles)?;
895            }
896            return Ok(modified_feed);
897        }
898        Err(NewsFlashError::NotLoggedIn)
899    }
900
901    pub async fn edit_feed_url(&self, feed_id: &FeedID, new_url: &str, client: &Client) -> NewsFlashResult<()> {
902        if self.is_offline.load(Ordering::Acquire) {
903            return Err(NewsFlashError::Offline);
904        }
905
906        if self.api.read().await.is_logged_in(client).await? {
907            if self.is_sync_ongoing.load(Ordering::Acquire) {
908                return Err(NewsFlashError::Syncing);
909            }
910
911            // validate url
912            let parsed_url = Url::parse(new_url)?;
913
914            self.api.read().await.edit_feed_url(feed_id, new_url, client).await?;
915
916            let mut modified_feed = self.db.read_feed(feed_id)?;
917            modified_feed.feed_url = Some(parsed_url);
918
919            self.db.insert_feed(&modified_feed)?;
920
921            Ok(())
922        } else {
923            Err(NewsFlashError::NotLoggedIn)
924        }
925    }
926
927    pub async fn sort_alphabetically(&self) -> NewsFlashResult<()> {
928        self.db.sort_alphabetically()?;
929        Ok(())
930    }
931
932    pub async fn add_category(&self, title: &str, parent: Option<&CategoryID>, client: &Client) -> NewsFlashResult<(Category, CategoryMapping)> {
933        if self.is_offline.load(Ordering::Acquire) {
934            return Err(NewsFlashError::Offline);
935        }
936
937        if self.api.read().await.is_logged_in(client).await? {
938            if self.is_sync_ongoing.load(Ordering::Acquire) {
939                return Err(NewsFlashError::Syncing);
940            }
941
942            let category_id = self.api.read().await.add_category(title, parent, client).await?;
943
944            let category = Category {
945                category_id: category_id.clone(),
946                label: title.to_owned(),
947            };
948
949            let category_mapping = CategoryMapping {
950                parent_id: match parent {
951                    Some(parent) => parent.clone(),
952                    None => NEWSFLASH_TOPLEVEL.clone(),
953                },
954                category_id,
955                sort_index: Some(i32::MAX),
956            };
957
958            self.db.insert_category(&category)?;
959            self.db.insert_category_mapping(&category_mapping)?;
960
961            return Ok((category, category_mapping));
962        }
963        Err(NewsFlashError::NotLoggedIn)
964    }
965
966    pub async fn remove_category(&self, category_id: &CategoryID, remove_children: bool, client: &Client) -> NewsFlashResult<()> {
967        if self.is_offline.load(Ordering::Acquire) {
968            return Err(NewsFlashError::Offline);
969        }
970
971        if self.api.read().await.is_logged_in(client).await? {
972            if self.is_sync_ongoing.load(Ordering::Acquire) {
973                return Err(NewsFlashError::Syncing);
974            }
975
976            self.api.read().await.remove_category(category_id, remove_children, client).await?;
977        } else {
978            return Err(NewsFlashError::NotLoggedIn);
979        }
980
981        if remove_children {
982            self.remove_category_from_db_recurse(category_id)?;
983        } else {
984            self.remove_category_from_db_move_children_up(category_id)?;
985        }
986
987        #[cfg(feature = "image-downloader")]
988        self.delete_orphaned_images().await?;
989
990        Ok(())
991    }
992
993    fn remove_category_from_db_move_children_up(&self, category_id: &CategoryID) -> NewsFlashResult<()> {
994        let parent_id = self
995            .db
996            .read_category_mappings(None, Some(category_id))?
997            .first()
998            .map(|m| m.parent_id.clone())
999            .unwrap_or_else(|| NEWSFLASH_TOPLEVEL.clone());
1000
1001        tracing::trace!(%parent_id);
1002
1003        // map feeds of category as children of parent
1004        let feed_mappings = self.db.read_feed_mappings(None, Some(category_id))?;
1005        tracing::trace!(%category_id, ?feed_mappings, "feeds of category");
1006
1007        for mut mapping in feed_mappings {
1008            self.db.drop_feed_mapping(&mapping)?;
1009            mapping.category_id = parent_id.clone();
1010            self.db.insert_feed_mapping(&mapping)?;
1011        }
1012
1013        // map child categories of category as children of parent
1014        let category_mappings = self.db.read_category_mappings(Some(category_id), None)?;
1015        tracing::trace!(%category_id, ?category_mappings, "child categories of category");
1016
1017        for mut mapping in category_mappings {
1018            self.db.drop_category_mapping(&mapping)?;
1019            mapping.parent_id = parent_id.clone();
1020            self.db.insert_category_mapping(&mapping)?;
1021        }
1022
1023        self.db.drop_category(category_id)?;
1024
1025        Ok(())
1026    }
1027
1028    fn remove_category_from_db_recurse(&self, category_id: &CategoryID) -> NewsFlashResult<()> {
1029        // remove childen feeds
1030        let mappings = self.db.read_feed_mappings(None, Some(category_id))?;
1031        tracing::trace!(%category_id, ?mappings, "drop feeds of category");
1032
1033        for mapping in mappings {
1034            self.db.drop_feed(&mapping.feed_id)?;
1035        }
1036
1037        // remove category
1038        self.db.drop_category(category_id)?;
1039
1040        // look for children categories and recurse
1041        let mappings = self.db.read_category_mappings(Some(category_id), None)?;
1042        tracing::trace!(%category_id, ?mappings, "drop child categories of category");
1043
1044        let categories = self.db.read_categories()?;
1045
1046        let child_categories: Vec<Category> = categories
1047            .into_iter()
1048            .filter(|category| mappings.iter().any(|mapping| mapping.category_id == category.category_id))
1049            .collect();
1050        for child_category in child_categories {
1051            self.remove_category_from_db_recurse(&child_category.category_id)?;
1052        }
1053
1054        Ok(())
1055    }
1056
1057    pub async fn rename_category(&self, category_id: &CategoryID, new_title: &str, client: &Client) -> NewsFlashResult<Category> {
1058        if self.is_offline.load(Ordering::Acquire) {
1059            return Err(NewsFlashError::Offline);
1060        }
1061
1062        if self.api.read().await.is_logged_in(client).await? {
1063            if self.is_sync_ongoing.load(Ordering::Acquire) {
1064                return Err(NewsFlashError::Syncing);
1065            }
1066
1067            let mut modified_category_mappings = self.db.read_category_mappings(None, Some(category_id))?;
1068            let mut modified_feed_mappings = self.db.read_feed_mappings(None, Some(category_id))?;
1069
1070            let new_id = self.api.read().await.rename_category(category_id, new_title, client).await?;
1071
1072            let mut modified_category = Category {
1073                category_id: category_id.clone(),
1074                label: new_title.to_owned(),
1075            };
1076
1077            if &new_id != category_id {
1078                self.db.drop_category(category_id)?;
1079                self.db.drop_feed_mappings_of_category(category_id)?;
1080                modified_category.category_id = new_id.clone();
1081
1082                // fix mappings
1083                modified_feed_mappings = modified_feed_mappings
1084                    .into_iter()
1085                    .map(|mut mapping| {
1086                        mapping.category_id = new_id.clone();
1087                        mapping
1088                    })
1089                    .collect();
1090
1091                modified_category_mappings = modified_category_mappings
1092                    .into_iter()
1093                    .map(|mut m| {
1094                        m.category_id = new_id.clone();
1095                        m
1096                    })
1097                    .collect();
1098            }
1099
1100            self.db.insert_category(&modified_category)?;
1101            self.db.insert_category_mappings(&modified_category_mappings)?;
1102            self.db.insert_feed_mappings(&modified_feed_mappings)?;
1103            return Ok(modified_category);
1104        }
1105        Err(NewsFlashError::NotLoggedIn)
1106    }
1107
1108    pub async fn move_category(&self, mapping: &CategoryMapping, client: &Client) -> NewsFlashResult<()> {
1109        if self.is_offline.load(Ordering::Acquire) {
1110            return Err(NewsFlashError::Offline);
1111        }
1112
1113        // if category was not moved to a subcategory: only alter local data
1114        if mapping.parent_id == *NEWSFLASH_TOPLEVEL {
1115            self.db.drop_mapping_of_category(&mapping.category_id)?;
1116            self.db.insert_category_mapping(mapping)?;
1117            Ok(())
1118        } else if self.api.read().await.is_logged_in(client).await? {
1119            if self.is_sync_ongoing.load(Ordering::Acquire) {
1120                return Err(NewsFlashError::Syncing);
1121            }
1122
1123            self.api
1124                .read()
1125                .await
1126                .move_category(&mapping.category_id, &mapping.parent_id, client)
1127                .await?;
1128
1129            self.db.drop_mapping_of_category(&mapping.category_id)?;
1130            self.db.insert_category_mapping(mapping)?;
1131            Ok(())
1132        } else {
1133            Err(NewsFlashError::NotLoggedIn)
1134        }
1135    }
1136
1137    pub async fn add_tag(&self, title: &str, color: Option<String>, client: &Client) -> NewsFlashResult<Tag> {
1138        if self.is_offline.load(Ordering::Acquire) {
1139            return Err(NewsFlashError::Offline);
1140        }
1141
1142        if self.api.read().await.is_logged_in(client).await? {
1143            if self.is_sync_ongoing.load(Ordering::Acquire) {
1144                return Err(NewsFlashError::Syncing);
1145            }
1146
1147            let tag_id = self.api.read().await.add_tag(title, client).await?;
1148            let tag = Tag {
1149                tag_id,
1150                label: title.to_owned(),
1151                color,
1152                sort_index: Some(i32::MAX),
1153            };
1154
1155            self.db.insert_tag(&tag)?;
1156            return Ok(tag);
1157        }
1158        Err(NewsFlashError::NotLoggedIn)
1159    }
1160
1161    pub async fn remove_tag(&self, tag_id: &TagID, client: &Client) -> NewsFlashResult<()> {
1162        if self.is_offline.load(Ordering::Acquire) {
1163            return Err(NewsFlashError::Offline);
1164        }
1165
1166        if self.api.read().await.is_logged_in(client).await? {
1167            if self.is_sync_ongoing.load(Ordering::Acquire) {
1168                return Err(NewsFlashError::Syncing);
1169            }
1170
1171            self.api.read().await.remove_tag(tag_id, client).await?;
1172
1173            self.db.drop_tag(tag_id)?;
1174            return Ok(());
1175        }
1176        Err(NewsFlashError::NotLoggedIn)
1177    }
1178
1179    pub async fn edit_tag(&self, tag_id: &TagID, new_title: &str, new_color: &Option<String>, client: &Client) -> NewsFlashResult<Tag> {
1180        if self.is_offline.load(Ordering::Acquire) {
1181            return Err(NewsFlashError::Offline);
1182        }
1183
1184        if self.api.read().await.is_logged_in(client).await? {
1185            if self.is_sync_ongoing.load(Ordering::Acquire) {
1186                return Err(NewsFlashError::Syncing);
1187            }
1188
1189            let new_id = self.api.read().await.rename_tag(tag_id, new_title, client).await?;
1190
1191            let taggings = self.db.read_taggings(None, Some(tag_id))?;
1192            self.db.drop_tag(tag_id)?;
1193            let sort_index = self
1194                .db
1195                .read_tags()?
1196                .into_iter()
1197                .find(|tag| &tag.tag_id == tag_id)
1198                .and_then(|tag| tag.sort_index);
1199            let mutated_tag = Tag {
1200                tag_id: new_id.clone(),
1201                label: new_title.to_owned(),
1202                color: new_color.clone(),
1203                sort_index,
1204            };
1205            self.db.insert_tag(&mutated_tag)?;
1206            self.db.insert_taggings(&taggings)?;
1207
1208            let taggings = if &new_id != tag_id {
1209                taggings
1210                    .into_iter()
1211                    .map(|mut tagging| {
1212                        tagging.tag_id = new_id.clone();
1213                        tagging
1214                    })
1215                    .collect()
1216            } else {
1217                taggings
1218            };
1219            self.db.insert_taggings(&taggings)?;
1220
1221            return Ok(mutated_tag);
1222        }
1223        Err(NewsFlashError::NotLoggedIn)
1224    }
1225
1226    pub async fn tag_article(&self, article_id: &ArticleID, tag_id: &TagID, client: &Client) -> NewsFlashResult<()> {
1227        let tagging = Tagging {
1228            article_id: article_id.clone(),
1229            tag_id: tag_id.clone(),
1230        };
1231
1232        if self.is_offline.load(Ordering::Acquire) {
1233            let action = OfflineAction {
1234                action_type: OfflineActionType::Tag,
1235                article_id: article_id.clone(),
1236                tag_id: Some(tag_id.clone()),
1237            };
1238            self.db.insert_offline_actions(&[action])?;
1239            self.db.insert_tagging(&tagging)?;
1240
1241            Ok(())
1242        } else if self.api.read().await.is_logged_in(client).await? {
1243            if self.is_sync_ongoing.load(Ordering::Acquire) {
1244                self.sync_cache.lock().await.add_article_tagged(article_id, tag_id);
1245            } else {
1246                self.api.read().await.tag_article(article_id, tag_id, client).await?;
1247                self.db.insert_tagging(&tagging)?;
1248            }
1249
1250            Ok(())
1251        } else {
1252            Err(NewsFlashError::NotLoggedIn)
1253        }
1254    }
1255
1256    pub async fn untag_article(&self, article_id: &ArticleID, tag_id: &TagID, client: &Client) -> NewsFlashResult<()> {
1257        let tagging = Tagging {
1258            article_id: article_id.clone(),
1259            tag_id: tag_id.clone(),
1260        };
1261
1262        if self.is_offline.load(Ordering::Acquire) {
1263            let action = OfflineAction {
1264                action_type: OfflineActionType::Untag,
1265                article_id: article_id.clone(),
1266                tag_id: Some(tag_id.clone()),
1267            };
1268            self.db.insert_offline_actions(&[action])?;
1269            self.db.drop_tagging(&tagging)?;
1270
1271            Ok(())
1272        } else if self.api.read().await.is_logged_in(client).await? {
1273            if self.is_sync_ongoing.load(Ordering::Acquire) {
1274                self.sync_cache.lock().await.add_article_untagged(article_id, tag_id);
1275            } else {
1276                self.api.read().await.untag_article(article_id, tag_id, client).await?;
1277                self.db.drop_tagging(&tagging)?;
1278            }
1279            Ok(())
1280        } else {
1281            Err(NewsFlashError::NotLoggedIn)
1282        }
1283    }
1284
1285    pub async fn import_opml(&self, opml: &str, parse_all_feeds: bool, client: &Client) -> NewsFlashResult<()> {
1286        if self.is_offline.load(Ordering::Acquire) {
1287            return Err(NewsFlashError::Offline);
1288        }
1289
1290        if self.api.read().await.is_logged_in(client).await? {
1291            if self.is_sync_ongoing.load(Ordering::Acquire) {
1292                return Err(NewsFlashError::Syncing);
1293            }
1294
1295            self.api.read().await.import_opml(opml, client).await?;
1296            let opml_result = opml::parse_opml(opml, parse_all_feeds, self.download_semaphore.clone(), client).await?;
1297
1298            self.db.insert_categories(&opml_result.categories)?;
1299            self.db.insert_feeds(&opml_result.feeds)?;
1300            self.db.insert_feed_mappings(&opml_result.feed_mappings)?;
1301            self.db.insert_category_mappings(&opml_result.category_mappings)?;
1302            return Ok(());
1303        }
1304        Err(NewsFlashError::NotLoggedIn)
1305    }
1306
1307    pub async fn export_opml(&self) -> NewsFlashResult<String> {
1308        if self.is_offline.load(Ordering::Acquire) {
1309            return Err(NewsFlashError::Offline);
1310        }
1311
1312        let categories = self.db.read_categories()?;
1313        let category_mappings = self.db.read_category_mappings(None, None)?;
1314        let feeds = self.db.read_feeds()?;
1315        let feed_mappings = self.db.read_feed_mappings(None, None)?;
1316
1317        let opml_string = opml::generate_opml(&categories, &category_mappings, &feeds, &feed_mappings)?;
1318        Ok(opml_string)
1319    }
1320
1321    pub fn get_categories(&self) -> NewsFlashResult<(Vec<Category>, Vec<CategoryMapping>)> {
1322        let categories = self.db.read_categories()?;
1323        let category_mappings = self.db.read_category_mappings(None, None)?;
1324        Ok((categories, category_mappings))
1325    }
1326
1327    pub fn get_feeds(&self) -> NewsFlashResult<(Vec<Feed>, Vec<FeedMapping>)> {
1328        let feeds = self.db.read_feeds()?;
1329        let mappings = self.db.read_feed_mappings(None, None)?;
1330        Ok((feeds, mappings))
1331    }
1332
1333    pub fn unread_count_feed_map(&self, exclude_future: bool) -> NewsFlashResult<HashMap<FeedID, i64>> {
1334        let map = self.db.unread_count_feed_map(exclude_future)?;
1335        Ok(map)
1336    }
1337
1338    pub fn marked_count_feed_map(&self) -> NewsFlashResult<HashMap<FeedID, i64>> {
1339        let mut count_vec = self.db.marked_count_feed_map()?;
1340        let mut map: HashMap<FeedID, i64> = HashMap::new();
1341        count_vec.drain(..).for_each(|c| {
1342            map.insert(c.feed_id, c.count);
1343        });
1344        Ok(map)
1345    }
1346
1347    pub fn today_unread_count(&self, exclude_future: bool) -> NewsFlashResult<i64> {
1348        let count = self.db.today_unread_count(exclude_future)?;
1349        Ok(count)
1350    }
1351
1352    pub fn today_marked_count(&self) -> NewsFlashResult<i64> {
1353        let count = self.db.today_marked_count()?;
1354        Ok(count)
1355    }
1356
1357    pub fn get_tags(&self) -> NewsFlashResult<(Vec<Tag>, Vec<Tagging>)> {
1358        let tags = self.db.read_tags()?;
1359        let taggings = self.db.read_taggings(None, None)?;
1360        Ok((tags, taggings))
1361    }
1362
1363    pub fn get_tags_of_article(&self, article_id: &ArticleID) -> NewsFlashResult<Vec<Tag>> {
1364        let tags = self.db.read_tags_for_article(article_id)?;
1365        Ok(tags)
1366    }
1367
1368    pub fn unread_count_all(&self) -> NewsFlashResult<i64> {
1369        let count = self.db.unread_count_all()?;
1370        Ok(count)
1371    }
1372
1373    pub fn get_articles(&self, filter: ArticleFilter) -> NewsFlashResult<Vec<Article>> {
1374        let articles = self.db.read_articles(filter)?;
1375        Ok(articles)
1376    }
1377
1378    pub fn get_article(&self, id: &ArticleID) -> NewsFlashResult<Article> {
1379        let article = self.db.read_article(id)?;
1380        Ok(article)
1381    }
1382
1383    pub fn get_fat_articles(&self, filter: ArticleFilter) -> NewsFlashResult<Vec<FatArticle>> {
1384        let articles = self.db.read_fat_articles(filter)?;
1385        Ok(articles)
1386    }
1387
1388    pub fn get_fat_article(&self, id: &ArticleID) -> NewsFlashResult<FatArticle> {
1389        let article = self.db.read_fat_article(id)?;
1390        Ok(article)
1391    }
1392
1393    pub fn get_enclosures(&self, id: &ArticleID) -> NewsFlashResult<Vec<Enclosure>> {
1394        let enclosures = self.db.read_enclosures(id)?;
1395        Ok(enclosures)
1396    }
1397
1398    pub fn update_enclosure(&self, enclosure: &Enclosure) -> NewsFlashResult<()> {
1399        self.db.write_enclosures(std::slice::from_ref(enclosure))?;
1400        Ok(())
1401    }
1402
/// Scrapes the content of articles from `feeds` that were synced after `synced_after` and
/// do not have scraped content yet.
///
/// One task is spawned per article. Articles whose task failed to join or whose scraping
/// returned an error are skipped silently; the successfully scraped ones are written to the
/// database in a single call.
///
/// # Errors
///
/// [`NewsFlashError::Syncing`] while a sync is ongoing, otherwise any database error.
#[cfg(feature = "article-scraper")]
pub async fn scrap_content_feeds(&self, synced_after: DateTime<Utc>, feeds: &[FeedID], client: &Client) -> NewsFlashResult<()> {
    if self.is_sync_ongoing.load(Ordering::Acquire) {
        return Err(NewsFlashError::Syncing);
    }

    let article_scraper = self.init_scraper().await;

    let filter = ArticleFilter {
        feeds: Some(feeds.to_owned()),
        synced_after: Some(synced_after),
        ..Default::default()
    };
    let unscraped = self
        .db
        .read_fat_articles(filter)?
        .into_iter()
        .filter(|article| article.scraped_content.is_none());

    let task_handles: Vec<_> = unscraped
        .map(|article| {
            let semaphore = self.download_semaphore.clone();
            let client = client.clone();
            let article_scraper = article_scraper.clone();

            tokio::spawn(async move { Self::article_scrap_content_impl(article, article_scraper, semaphore, &client).await })
        })
        .collect();

    // Keep only articles whose task joined *and* whose scraping succeeded.
    let scraped_articles: Vec<_> = futures::future::join_all(task_handles)
        .await
        .into_iter()
        .filter_map(|joined| joined.ok()?.ok())
        .collect();

    self.db.update_articles_grabbed_content(&scraped_articles)?;

    Ok(())
}
1445
/// Scrapes the content of the single article `id`, stores the result in the local database
/// and returns the updated article.
///
/// # Errors
///
/// Any database error, or the error returned by the scraping step.
#[cfg(feature = "article-scraper")]
pub async fn scrap_content_article(&self, id: &ArticleID, client: &Client) -> NewsFlashResult<FatArticle> {
    let article_scraper = self.init_scraper().await;
    let article = self.db.read_fat_article(id)?;
    let semaphore = self.download_semaphore.clone();

    let scraped_article = Self::article_scrap_content_impl(article, article_scraper, semaphore, client).await?;
    self.db.update_article_grabbed_content(&scraped_article)?;

    Ok(scraped_article)
}
1454
/// Runs the article scraper on the article's URL and merges the result into `article`.
///
/// A permit of `download_semaphore` is held while the scraper runs. Scraped HTML replaces
/// `scraped_content` and `plain_text`; title, author and thumbnail URL are only taken from
/// the scraper when the article does not have them yet.
///
/// # Errors
///
/// [`NewsFlashError::GrabContent`] when the article has no URL or the scraper fails; an
/// error from acquiring the semaphore permit is propagated as well.
#[cfg(feature = "article-scraper")]
async fn article_scrap_content_impl(
    mut article: FatArticle,
    article_scraper: Arc<ArticleScraper>,
    download_semaphore: Arc<Semaphore>,
    client: &Client,
) -> NewsFlashResult<FatArticle> {
    let url = match &article.url {
        Some(url) => url,
        None => {
            tracing::error!("Article doesn't contain source URL");
            return Err(NewsFlashError::GrabContent);
        }
    };

    let permit = download_semaphore.acquire().await?;
    #[cfg(feature = "image-downloader")]
    let result = article_scraper.parse(url, client, false, None);
    #[cfg(not(feature = "image-downloader"))]
    let result = article_scraper.parse(url, client);

    let processed_article = match result.await {
        Ok(processed) => processed,
        Err(error) => {
            tracing::error!(%url, %error, "Internal scraper failed");
            return Err(NewsFlashError::GrabContent);
        }
    };
    drop(permit);

    tracing::info!("Internal scraper: successfully scraped: '{url}'");

    if let Some(html) = processed_article.html {
        article.plain_text = Some(util::html2text::html2text(&html));
        article.scraped_content = Some(html);
    }

    // Metadata already present on the article always wins over scraped metadata.
    if article.title.is_none() {
        article.title = processed_article.title;
    }
    if article.author.is_none() {
        article.author = processed_article.author;
    }
    if article.thumbnail_url.is_none() {
        article.thumbnail_url = processed_article.thumbnail_url;
    }

    Ok(article)
}
1502
/// Returns the shared [`ArticleScraper`], creating it on first use.
///
/// An existing scraper is returned after taking only the read lock. Otherwise the write
/// lock is taken and the slot is checked again, so that concurrent callers cannot both
/// build a scraper. The freshly created handle is returned directly instead of being read
/// back from the slot, which removes the former `unreachable!()` panic path.
///
/// The scraper is given `SCRAPER_DATA_DIR` below the data directory when that directory
/// could be created, otherwise no directory.
#[cfg(feature = "article-scraper")]
async fn init_scraper(&self) -> Arc<ArticleScraper> {
    // Fast path: already initialized. The read guard is released at the end of this statement.
    if let Some(scraper) = self.scraper.read().await.as_ref() {
        return scraper.clone();
    }

    let mut slot = self.scraper.write().await;

    // Another task may have initialized the scraper while this one waited for the write lock.
    if let Some(scraper) = slot.as_ref() {
        return scraper.clone();
    }

    tracing::info!("Initialize ArticleScraper");
    let scraper_data_dir = self.data_dir.join(SCRAPER_DATA_DIR);

    // Failing to create the directory is not fatal: the scraper is then built without one.
    let scraper_data_dir = if std::fs::DirBuilder::new().recursive(true).create(&scraper_data_dir).is_ok() {
        Some(scraper_data_dir)
    } else {
        None
    };

    let scraper = Arc::new(ArticleScraper::new(scraper_data_dir.as_deref()).await);
    slot.replace(scraper.clone());
    scraper
}
1525
/// Returns the image at `url` for the article `article_id`, downloading it when necessary.
///
/// If the database knows the image and its file can be opened, the file content is returned
/// without any download. Otherwise the image is downloaded while holding a permit of the
/// download semaphore, written below `IMAGE_DATA_DIR` in the data directory, and recorded
/// in the database. When the article has an enclosure for `url`, its width and height are
/// updated as well.
///
/// Width and height are `None` when the dimensions cannot be determined or do not fit into
/// an `i32`.
///
/// # Errors
///
/// Errors from creating the image directory, parsing `url`, reading the cached file,
/// acquiring the semaphore, downloading, writing the file, or the database are propagated.
#[cfg(feature = "image-downloader")]
pub async fn get_image(
    &self,
    article_id: &ArticleID,
    url: &str,
    client: &Client,
    progress: Option<Sender<DownloadProgress>>,
) -> NewsFlashResult<Image> {
    let image_dir = self.data_dir.join(IMAGE_DATA_DIR);
    std::fs::DirBuilder::new().recursive(true).create(&image_dir)?;

    // check db if image exists
    let parsed_url = Url::parse(url)?;
    if let Ok(image) = self.db.read_image(&parsed_url) {
        if let Ok(mut file) = tokio::fs::File::open(&image.file_path).await {
            let mut contents = vec![];
            file.read_to_end(&mut contents).await?;
            return Ok(Image::from_metadata(image, contents));
        } else {
            // A database entry without a readable file falls through to a fresh download.
            tracing::warn!(%image.file_path, "Failed to open file");
        }
    }

    let permit = self.download_semaphore.acquire().await?;
    let res = article_scraper::images::ImageDownloader::single_from_url(url, client, progress).await?;
    drop(permit);

    let file_name = sanitize_filename::sanitize(format!("{article_id}_{url}"));
    let path = image_dir.join(file_name);
    tracing::debug!(?path, "writing image");
    tokio::fs::write(&path, &res)
        .await
        .inspect_err(|error| tracing::error!(%error, "Failed to write image"))?;

    // Checked conversion: a dimension that does not fit into `i32` becomes `None` instead of
    // silently wrapping to a negative value.
    let (width, height) = match image::image_dimensions(&path) {
        Ok((width, height)) => (i32::try_from(width).ok(), i32::try_from(height).ok()),
        Err(_) => (None, None),
    };

    if let Ok(mut enclosure) = self.db.read_enclosure(article_id, &parsed_url) {
        enclosure.width = width;
        enclosure.height = height;

        self.db.write_enclosures(&[enclosure])?;
    }

    let image = ImageMetadata {
        image_url: parsed_url,
        article_id: article_id.clone(),
        file_path: path.to_string_lossy().into(),
        width,
        height,
    };

    self.db.write_image(&image)?;

    Ok(Image::from_metadata(image, res))
}
1585
1586    #[cfg(feature = "image-downloader")]
1587    pub fn delete_all_images(&self) -> NewsFlashResult<()> {
1588        self.db.drop_all_images()?;
1589        std::fs::remove_dir_all(self.data_dir.join(IMAGE_DATA_DIR))?;
1590        Ok(())
1591    }
1592
1593    #[cfg(feature = "image-downloader")]
1594    async fn delete_orphaned_images(&self) -> NewsFlashResult<()> {
1595        let images = self.db.read_images()?;
1596        let db_images = images
1597            .into_iter()
1598            .map(|image| {
1599                let mut path = PathBuf::new();
1600                path.push(image.file_path);
1601                path
1602            })
1603            .collect::<HashSet<PathBuf>>();
1604
1605        let dir = match std::fs::read_dir(self.data_dir.join(IMAGE_DATA_DIR)) {
1606            Ok(dir) => dir,
1607            Err(error) => {
1608                tracing::warn!(%error, "failed to read image directory");
1609                return Ok(());
1610            }
1611        };
1612        let existing_images = dir
1613            .into_iter()
1614            .filter_map(|item| item.ok().map(|i| i.path()))
1615            .collect::<HashSet<PathBuf>>();
1616
1617        let difference = existing_images.difference(&db_images);
1618
1619        for file_path in difference {
1620            if let Err(error) = std::fs::remove_file(file_path) {
1621                tracing::warn!(?file_path, %error, "Failed to delete file");
1622            }
1623        }
1624
1625        Ok(())
1626    }
1627}