1#[macro_use]
2extern crate diesel;
3#[macro_use]
4extern crate diesel_migrations;
5
6mod action_cache;
7mod builder;
8mod config;
9mod database;
10mod default_portal;
11pub mod error;
12pub mod feed_api;
13mod feed_api_implementations;
14pub mod models;
15mod password_encryption;
16mod schema;
17pub mod util;
18
19#[cfg(test)]
20mod tests;
21
22use crate::action_cache::ActionCache;
23use crate::builder::NewsFlashBuilder;
24use crate::config::ConfigProxy;
25use crate::database::DatabaseExt;
26use crate::default_portal::DefaultPortal;
27use crate::error::NewsFlashError;
28use crate::feed_api::FeedApi;
29use crate::feed_api_implementations::FeedApiImplementations;
30use crate::models::{
31 Article, ArticleFilter, ArticleID, Category, CategoryID, DatabaseSize, Enclosure, FatArticle, FatFavIcon, FavIcon, Feed, FeedID, FeedMapping,
32 LoginData, Marked, NEWSFLASH_TOPLEVEL, PluginCapabilities, PluginID, PluginInfo, Read, Tag, TagID, Tagging, Url,
33};
34
35use crate::util::favicons::FavIconLoader;
36pub use crate::util::feed_parser::{self, ParsedUrl};
37use crate::util::opml;
38use chrono::{DateTime, Duration, Utc};
39use feed_api::FeedHeaderMap;
40use models::{CategoryMapping, OfflineAction, OfflineActionType, Thumbnail};
41use reqwest::Client;
42use reqwest::header::{HeaderMap, HeaderValue};
43use std::collections::hash_map::HashMap;
44use std::sync::Arc;
45use std::sync::atomic::{AtomicBool, Ordering};
46use tokio::sync::{Mutex, RwLock, Semaphore};
47
48#[cfg(feature = "article-scraper")]
49use article_scraper::ArticleScraper;
50#[cfg(feature = "article-scraper")]
51pub use article_scraper::DownloadProgress;
52
53#[cfg(feature = "image-downloader")]
54use crate::models::{Image, ImageMetadata};
55#[cfg(feature = "image-downloader")]
56use futures::channel::mpsc::Sender;
57#[cfg(feature = "image-downloader")]
58use std::collections::HashSet;
59#[cfg(feature = "image-downloader")]
60use tokio::io::AsyncReadExt;
61
62#[cfg(any(feature = "article-scraper", feature = "image-downloader"))]
63use std::path::PathBuf;
64
// Directory name for the "ftr" site-config files used by the article scraper
// (presumably resolved relative to `data_dir` — confirm against builder.rs).
#[cfg(feature = "article-scraper")]
static SCRAPER_DATA_DIR: &str = "ftr-site-config";

// Directory name for downloaded article images
// (presumably resolved relative to `data_dir` — confirm against builder.rs).
#[cfg(feature = "image-downloader")]
static IMAGE_DATA_DIR: &str = "pictures";

// Crate-wide result alias: fallible operations report a `NewsFlashError`.
type NewsFlashResult<T> = Result<T, NewsFlashError>;
72
/// Central handle of the NewsFlash library.
///
/// Owns the local database, the currently active backend API implementation
/// and the shared state used to coordinate syncs and offline handling.
pub struct NewsFlash {
    // Base directory for on-disk caches (scraper configs, images).
    #[cfg(any(feature = "article-scraper", feature = "image-downloader"))]
    data_dir: PathBuf,
    // Local article/feed/category database.
    db: Arc<Box<dyn DatabaseExt>>,
    // Currently active backend implementation.
    api: RwLock<Box<dyn FeedApi>>,
    // Persisted settings (backend id, last sync time, retention, ...).
    config: Arc<RwLock<ConfigProxy>>,
    // Shared semaphore handed out via `get_semaphore()` to bound concurrent downloads.
    download_semaphore: Arc<Semaphore>,
    // Loader/cache for feed favicons.
    icons: FavIconLoader,
    #[cfg(feature = "article-scraper")]
    scraper: RwLock<Option<Arc<ArticleScraper>>>,
    // Actions queued while a sync is running; executed against the API and reset afterwards.
    sync_cache: Mutex<ActionCache>,
    // True while a sync or single-feed fetch is in progress.
    is_sync_ongoing: Arc<AtomicBool>,
    // True while offline mode is active.
    is_offline: Arc<AtomicBool>,
}
87
88impl NewsFlash {
89 pub fn list_backends() -> HashMap<PluginID, PluginInfo> {
90 let mut map: HashMap<PluginID, PluginInfo> = HashMap::new();
91 for api_meta in FeedApiImplementations::list() {
92 map.insert(api_meta.id(), api_meta.info().unwrap());
93 }
94 map
95 }
96
    /// Create a [`NewsFlashBuilder`] used to construct a `NewsFlash` instance.
    pub fn builder() -> NewsFlashBuilder {
        NewsFlashBuilder::default()
    }
100
    /// The id of the currently configured backend plugin, if any is set.
    pub async fn id(&self) -> Option<PluginID> {
        self.config.read().await.get_backend()
    }
104
    /// The user name reported by the backend, if it knows one.
    pub async fn user_name(&self) -> Option<String> {
        self.api.read().await.user_name().await
    }
108
109 pub async fn features(&self) -> NewsFlashResult<PluginCapabilities> {
110 let features = self.api.read().await.features()?;
111 Ok(features)
112 }
113
    /// The login data of the current backend session, if available.
    pub async fn get_login_data(&self) -> Option<LoginData> {
        self.api.read().await.get_login_data().await
    }
117
    /// Timestamp of the last successfully completed sync, as stored in the config.
    pub async fn last_sync(&self) -> DateTime<Utc> {
        self.config.read().await.get_last_sync()
    }
121
    /// Whether a sync or single-feed fetch is currently in progress.
    pub fn is_sync_ongoing(&self) -> bool {
        self.is_sync_ongoing.load(Ordering::Acquire)
    }
125
    /// Whether offline mode is currently active.
    pub fn is_offline(&self) -> bool {
        self.is_offline.load(Ordering::Acquire)
    }
129
130 pub fn get_semaphore(&self) -> Arc<Semaphore> {
131 self.download_semaphore.clone()
132 }
133
134 pub async fn set_offline(&self, offline: bool, client: &Client) -> NewsFlashResult<()> {
135 self.is_offline.store(offline, Ordering::Release);
136
137 if !offline {
138 let offline_actions = self.db.read_offline_actions()?;
139 self.db.drop_offline_actions()?;
140 let mut action_cache = ActionCache::new();
141
142 for offline_action in offline_actions {
143 match offline_action.action_type {
144 OfflineActionType::Read => action_cache.add_article_marked_read(&offline_action.article_id),
145 OfflineActionType::Unread => action_cache.add_article_marked_unread(&offline_action.article_id),
146 OfflineActionType::Mark => action_cache.add_article_mark(&offline_action.article_id),
147 OfflineActionType::Unmark => action_cache.add_article_unmark(&offline_action.article_id),
148 OfflineActionType::Tag => {
149 if let Some(tag_id) = offline_action.tag_id {
150 action_cache.add_article_tagged(&offline_action.article_id, &tag_id);
151 }
152 }
153 OfflineActionType::Untag => {
154 if let Some(tag_id) = offline_action.tag_id {
155 action_cache.add_article_untagged(&offline_action.article_id, &tag_id);
156 }
157 }
158 }
159 }
160
161 self.sync_cache.lock().await.execute_api_actions(&self.api, client).await?;
162 }
163
164 Ok(())
165 }
166
167 pub fn is_database_empty(&self) -> NewsFlashResult<bool> {
168 let is_empty = self.db.is_empty()?;
169 Ok(is_empty)
170 }
171
    /// Persist how long articles should be kept before being purged.
    ///
    /// `None` disables purging. When a duration is set, articles older than
    /// it are dropped from the database immediately (and, with the
    /// `image-downloader` feature, now-orphaned images are deleted too).
    pub async fn set_keep_articles_duration(&self, keep_articles: Option<chrono::Duration>) -> NewsFlashResult<()> {
        self.config.write().await.set_keep_articles_duration(keep_articles)?;
        if let Some(keep_articles) = keep_articles {
            self.db.drop_old_articles(keep_articles)?;

            #[cfg(feature = "image-downloader")]
            self.delete_orphaned_images().await?;
        }

        Ok(())
    }
183
    /// The currently configured article retention duration, if any.
    pub async fn get_keep_articles_duration(&self) -> Option<chrono::Duration> {
        self.config.read().await.get_keep_articles_duration()
    }
187
    /// Whether `error` indicates a login/authentication problem (backend
    /// failed to load, not logged in, or an API-level error), i.e. the user
    /// should likely be asked to re-authenticate.
    pub fn error_login_related(error: &NewsFlashError) -> bool {
        matches!(error, NewsFlashError::LoadBackend | NewsFlashError::NotLoggedIn | NewsFlashError::API(_))
    }
191
192 pub async fn is_reachable(&self, client: &Client) -> NewsFlashResult<bool> {
193 let reachable = self.api.read().await.is_reachable(client).await?;
194 Ok(reachable)
195 }
196
197 pub async fn get_icon(&self, feed_id: &FeedID, client: &Client, header: HeaderMap<HeaderValue>) -> NewsFlashResult<FavIcon> {
198 let info = self.icons.get_icon(feed_id, &self.api, client, header).await?;
199 Ok(info)
200 }
201
202 pub fn load_icon_from_db(&self, feed_id: &FeedID) -> NewsFlashResult<FatFavIcon> {
203 let icon = self.db.read_fatfavicon(feed_id)?;
204 Ok(icon)
205 }
206
207 pub async fn get_article_thumbnail(&self, article_id: &ArticleID, client: &Client) -> NewsFlashResult<Option<Thumbnail>> {
208 if let Ok(thumbnail) = self.db.read_thumbnail(article_id) {
209 if thumbnail.data.is_some() {
210 return Ok(Some(thumbnail));
211 }
212
213 if thumbnail.last_try + Duration::try_days(4).unwrap() > Utc::now() {
214 tracing::debug!(%article_id, "Tried to download thumbnail recently, will not attemt to download again.");
215 return Ok(None);
216 }
217 }
218
219 let Ok(article) = self.db.read_article(article_id) else {
220 tracing::warn!(%article_id, "Couldn't download thumbnail: article not found");
221 return Ok(None);
222 };
223
224 let Some(thumbnail_url) = article.thumbnail_url.as_deref() else {
225 tracing::debug!(%article_id, "Couldn't download thumbnail: article doesn't specify thumbnail url");
226 return Ok(None);
227 };
228
229 let thumbnail_result = Thumbnail::from_url(thumbnail_url, article_id, client).await;
230 match thumbnail_result {
231 Ok(thumbnail) => {
232 self.db.insert_thumbnail(&thumbnail)?;
233 Ok(Some(thumbnail))
234 }
235 Err(error) => {
236 let empty_thumbnail = Thumbnail::empty(article_id);
237 self.db.insert_thumbnail(&empty_thumbnail)?;
238 tracing::warn!(%thumbnail_url, %error, "downloading thumbnail failed");
239 Err(NewsFlashError::Thumbnail)
240 }
241 }
242 }
243
244 pub fn database_size(&self) -> NewsFlashResult<DatabaseSize> {
245 let size = self.db.size()?;
246 Ok(size)
247 }
248
    /// Log in to the backend described by `data` and remember it as the
    /// configured backend.
    pub async fn login(&self, data: LoginData, client: &Client) -> NewsFlashResult<()> {
        let id = data.id();
        self.api.write().await.login(data, client).await?;
        // only persist the backend choice once the login itself succeeded
        self.config.write().await.set_backend(Some(&id))?;
        Ok(())
    }
255
    /// Log out from the backend and wipe all local state.
    ///
    /// The configured backend is cleared first, then the API session is
    /// terminated and finally the local database is reset.
    pub async fn logout(&self, client: &Client) -> NewsFlashResult<()> {
        self.config.write().await.set_backend(None)?;
        self.api.write().await.logout(client).await?;
        self.db.reset()?;
        Ok(())
    }
262
    /// Run cleanup/maintenance on the local database.
    pub fn clean_db(&self) -> NewsFlashResult<()> {
        self.db.clean()?;
        Ok(())
    }
267
268 pub async fn initial_sync(&self, client: &Client, header: FeedHeaderMap) -> NewsFlashResult<HashMap<FeedID, i64>> {
269 if self.is_offline.load(Ordering::Acquire) {
270 return Err(NewsFlashError::Offline);
271 }
272
273 if self.is_sync_ongoing.load(Ordering::Acquire) {
274 tracing::warn!("sync is already ongoing");
275 return Err(NewsFlashError::Syncing);
276 }
277
278 if self.api.read().await.is_logged_in(client).await? {
279 self.is_sync_ongoing.store(true, Ordering::Release);
280
281 let now = chrono::Utc::now();
282 let keep_articles_duration = self.config.read().await.get_keep_articles_duration();
283
284 let result = self.api.read().await.initial_sync(client, header).await;
285 if result.is_err() {
286 self.is_sync_ongoing.store(true, Ordering::Release);
287 }
288 let sync_result = result?;
289 let sync_result = sync_result.remove_old_articles(keep_articles_duration);
291 let sync_result = sync_result.generate_tag_colors(&[]);
292 let sync_result = self.sync_cache.lock().await.process_sync_result(sync_result);
294 let result = self.sync_cache.lock().await.execute_api_actions(&self.api, client).await;
296 if let Err(cache_execution_error) = result {
297 self.is_sync_ongoing.store(false, Ordering::Release);
298 return Err(cache_execution_error.into());
299 }
300 self.sync_cache.lock().await.reset();
302
303 let result = self.db.write_sync_result(sync_result, keep_articles_duration);
304 if result.is_err() {
305 self.is_sync_ongoing.store(false, Ordering::Release);
306 }
307 let new_article_count = result?;
308
309 #[cfg(feature = "image-downloader")]
310 self.delete_orphaned_images().await?;
311
312 if let Err(write_config_error) = self.config.write().await.set_last_sync(now) {
313 self.is_sync_ongoing.store(false, Ordering::Release);
314 return Err(write_config_error.into());
315 }
316
317 self.is_sync_ongoing.store(false, Ordering::Release);
318 return Ok(new_article_count);
319 }
320 Err(NewsFlashError::NotLoggedIn)
321 }
322
    /// Perform an incremental synchronization with the backend service.
    ///
    /// Fails early when offline, when another sync is running, or when not
    /// logged in. On success the per-feed count of new articles is returned
    /// and the last-sync timestamp is persisted.
    pub async fn sync(&self, client: &Client, header: FeedHeaderMap) -> NewsFlashResult<HashMap<FeedID, i64>> {
        if self.is_offline.load(Ordering::Acquire) {
            return Err(NewsFlashError::Offline);
        }

        if self.is_sync_ongoing.load(Ordering::Acquire) {
            tracing::warn!("sync is already ongoing");
            return Err(NewsFlashError::Syncing);
        }

        if self.api.read().await.is_logged_in(client).await? {
            self.is_sync_ongoing.store(true, Ordering::Release);

            // capture the start time so the stored "last sync" does not
            // include the time spent syncing itself
            let now = chrono::Utc::now();
            let keep_articles_duration = self.config.read().await.get_keep_articles_duration();

            let sync_result = self.api.read().await.sync(client, header).await;
            if sync_result.is_err() {
                // clear the in-progress flag before `?` propagates the error
                self.is_sync_ongoing.store(false, Ordering::Release);
            }
            let sync_result = sync_result?;

            let sync_result = sync_result.remove_old_articles(keep_articles_duration);

            // pass the existing tags so their colors stay stable
            let tags = self.db.read_tags()?;
            let sync_result = sync_result.generate_tag_colors(&tags);
            let sync_result = self.sync_cache.lock().await.process_sync_result(sync_result);
            // flush actions that were queued while the sync was running
            let result = self.sync_cache.lock().await.execute_api_actions(&self.api, client).await;
            if let Err(cache_execution_error) = result {
                self.is_sync_ongoing.store(false, Ordering::Release);
                return Err(cache_execution_error.into());
            }
            self.sync_cache.lock().await.reset();

            let result = self.db.write_sync_result(sync_result, keep_articles_duration);
            if result.is_err() {
                self.is_sync_ongoing.store(false, Ordering::Release);
            }
            let new_article_count = result?;

            #[cfg(feature = "image-downloader")]
            self.delete_orphaned_images().await?;

            if let Err(write_config_error) = self.config.write().await.set_last_sync(now) {
                self.is_sync_ongoing.store(false, Ordering::Release);
                return Err(write_config_error.into());
            }

            self.is_sync_ongoing.store(false, Ordering::Release);
            return Ok(new_article_count);
        }
        Err(NewsFlashError::NotLoggedIn)
    }
380
    /// Fetch new articles for a single feed from the backend.
    ///
    /// Behaves like a mini-sync for one feed: fails early when offline or
    /// while a sync is running, and returns the number of new articles.
    pub async fn fetch_feed(&self, feed_id: &FeedID, client: &Client, header: HeaderMap<HeaderValue>) -> NewsFlashResult<i64> {
        if self.is_offline.load(Ordering::Acquire) {
            return Err(NewsFlashError::Offline);
        }

        if self.is_sync_ongoing.load(Ordering::Acquire) {
            tracing::warn!("sync is already ongoing");
            return Err(NewsFlashError::Syncing);
        }

        if self.api.read().await.is_logged_in(client).await? {
            // fetching a feed counts as an ongoing sync for the rest of the crate
            self.is_sync_ongoing.store(true, Ordering::Release);

            let keep_articles_duration = self.config.read().await.get_keep_articles_duration();

            let result = self.api.read().await.fetch_feed(feed_id, client, header).await;
            if result.is_err() {
                // clear the in-progress flag before `?` propagates the error
                self.is_sync_ongoing.store(false, Ordering::Release);
            }
            let update_result = result?;
            let update_result = update_result.remove_old_articles(keep_articles_duration);
            // flush actions that were queued while the fetch was running
            let result = self.sync_cache.lock().await.execute_api_actions(&self.api, client).await;
            if let Err(cache_execution_error) = result {
                self.is_sync_ongoing.store(false, Ordering::Release);
                return Err(cache_execution_error.into());
            }
            self.sync_cache.lock().await.reset();

            let result = self.db.write_feed_update_result(feed_id, update_result);
            if result.is_err() {
                self.is_sync_ongoing.store(false, Ordering::Release);
            }
            let new_article_count = result?;

            self.is_sync_ongoing.store(false, Ordering::Release);
            Ok(new_article_count)
        } else {
            Err(NewsFlashError::NotLoggedIn)
        }
    }
424
    /// Set the read/unread state of the given articles.
    ///
    /// Three modes:
    /// * offline: record the change as `OfflineAction`s (replayed when going
    ///   online again) and update only the local database,
    /// * sync in progress: queue the change in the sync cache and update the
    ///   local database,
    /// * otherwise: update the local database first, push the change to the
    ///   backend, and roll the local change back if the backend call fails.
    pub async fn set_article_read(&self, articles: &[ArticleID], read: Read, client: &Client) -> NewsFlashResult<()> {
        if self.is_offline.load(Ordering::Acquire) {
            let action_type = if read == Read::Read {
                OfflineActionType::Read
            } else {
                OfflineActionType::Unread
            };
            let actions = articles
                .iter()
                .map(|article_id| OfflineAction {
                    action_type,
                    article_id: article_id.clone(),
                    tag_id: None,
                })
                .collect::<Vec<_>>();
            self.db.insert_offline_actions(&actions)?;
            self.db.set_article_read(articles, read)?;

            Ok(())
        } else if self.api.read().await.is_logged_in(client).await? {
            if self.is_sync_ongoing.load(Ordering::Acquire) {
                for article_id in articles {
                    match read {
                        Read::Read => self.sync_cache.lock().await.add_article_marked_read(article_id),
                        Read::Unread => self.sync_cache.lock().await.add_article_marked_unread(article_id),
                    }
                }
                self.db.set_article_read(articles, read)?;
            } else {
                // remember which articles actually change state so a failed
                // backend call can be rolled back precisely
                let articles_before = self.db.read_articles(ArticleFilter::read_ids(articles.into(), read.invert()))?;
                self.db.set_article_read(articles, read)?;

                let api_result = self.api.read().await.set_article_read(articles, read, client).await;

                if api_result.is_err() {
                    // restore the previous state of the affected articles
                    let ids_before = articles_before.iter().map(|a| a.article_id.clone()).collect::<Vec<_>>();
                    self.db.set_article_read(&ids_before, read.invert())?;
                }

                api_result?;
            }

            Ok(())
        } else {
            Err(NewsFlashError::NotLoggedIn)
        }
    }
473
    /// Set the starred/unstarred state of the given articles.
    ///
    /// Same three modes as [`NewsFlash::set_article_read`]: offline queues
    /// `OfflineAction`s, an ongoing sync queues into the sync cache, and the
    /// online path updates the database first and rolls back if the backend
    /// call fails.
    pub async fn set_article_marked(&self, articles: &[ArticleID], marked: Marked, client: &Client) -> NewsFlashResult<()> {
        if self.is_offline.load(Ordering::Acquire) {
            let action_type = if marked == Marked::Marked {
                OfflineActionType::Mark
            } else {
                OfflineActionType::Unmark
            };
            let actions = articles
                .iter()
                .map(|article_id| OfflineAction {
                    action_type,
                    article_id: article_id.clone(),
                    tag_id: None,
                })
                .collect::<Vec<_>>();
            self.db.insert_offline_actions(&actions)?;
            self.db.set_article_marked(articles, marked)?;

            Ok(())
        } else if self.api.read().await.is_logged_in(client).await? {
            if self.is_sync_ongoing.load(Ordering::Acquire) {
                for article_id in articles {
                    match marked {
                        Marked::Marked => self.sync_cache.lock().await.add_article_mark(article_id),
                        Marked::Unmarked => self.sync_cache.lock().await.add_article_unmark(article_id),
                    }
                }

                self.db.set_article_marked(articles, marked)?;
            } else {
                // remember which articles actually change state so a failed
                // backend call can be rolled back precisely
                let articles_before = self.db.read_articles(ArticleFilter::marked_ids(articles.into(), marked.invert()))?;
                self.db.set_article_marked(articles, marked)?;

                // only the articles whose state actually changed are sent to the backend
                let article_ids_to_update = articles_before.iter().map(|a| &a.article_id).cloned().collect::<Vec<_>>();
                let api_result = self.api.read().await.set_article_marked(&article_ids_to_update, marked, client).await;

                if api_result.is_err() {
                    // restore the previous state of the affected articles
                    let ids_before = articles_before.iter().map(|a| a.article_id.clone()).collect::<Vec<_>>();
                    self.db.set_article_marked(&ids_before, marked.invert())?;
                }

                api_result?;
            }

            Ok(())
        } else {
            Err(NewsFlashError::NotLoggedIn)
        }
    }
524
    /// Mark all articles of the given feeds as read.
    ///
    /// Offline: queues one `OfflineAction` per unread article and updates
    /// the local database. During a sync: queues per-feed mark-read actions
    /// in the sync cache. Otherwise: updates the database, forwards the
    /// change to the backend and sets the previously-unread articles back
    /// to unread if the backend call fails.
    pub async fn set_feed_read(&self, feeds: &[FeedID], client: &Client) -> NewsFlashResult<()> {
        if self.is_offline.load(Ordering::Acquire) {
            let mut actions = Vec::new();

            for feed in feeds {
                let filter = ArticleFilter::feed_unread(feed);
                let articles = self.db.read_articles(filter)?;
                actions.append(
                    &mut articles
                        .into_iter()
                        .map(|article| OfflineAction {
                            action_type: OfflineActionType::Read,
                            article_id: article.article_id,
                            tag_id: None,
                        })
                        .collect(),
                );
            }

            self.db.insert_offline_actions(&actions)?;
            self.db.set_feed_read(feeds)?;

            Ok(())
        } else if self.api.read().await.is_logged_in(client).await? {
            if self.is_sync_ongoing.load(Ordering::Acquire) {
                for feed_id in feeds {
                    self.sync_cache.lock().await.add_feed_mark_read(feed_id);
                }
                self.db.set_feed_read(feeds)?;
            } else {
                // collect the unread articles up front so a failed backend
                // call can be rolled back
                let mut unread_articles_before: Vec<ArticleID> = Vec::new();
                for feed_id in feeds {
                    let articles_before = self.db.read_articles(ArticleFilter::feed_unread(feed_id))?;
                    unread_articles_before.append(&mut articles_before.into_iter().map(|a| a.article_id).collect());
                }

                self.db.set_feed_read(feeds)?;

                let api_result = self.api.read().await.set_feed_read(feeds, &unread_articles_before, client).await;

                if api_result.is_err() {
                    // restore the previously-unread articles
                    self.db.set_article_read(&unread_articles_before, Read::Unread)?;
                }

                api_result?;
            }

            Ok(())
        } else {
            Err(NewsFlashError::NotLoggedIn)
        }
    }
578
    /// Mark all articles of the given categories as read.
    ///
    /// Same three modes as [`NewsFlash::set_feed_read`], operating on
    /// categories instead of feeds.
    pub async fn set_category_read(&self, categories: &[CategoryID], client: &Client) -> NewsFlashResult<()> {
        if self.is_offline.load(Ordering::Acquire) {
            let mut actions = Vec::new();

            for category in categories {
                let filter = ArticleFilter::category_unread(category);
                let articles = self.db.read_articles(filter)?;
                actions.append(
                    &mut articles
                        .into_iter()
                        .map(|article| OfflineAction {
                            action_type: OfflineActionType::Read,
                            article_id: article.article_id,
                            tag_id: None,
                        })
                        .collect(),
                );
            }

            self.db.insert_offline_actions(&actions)?;
            self.db.set_category_read(categories)?;

            Ok(())
        } else if self.api.read().await.is_logged_in(client).await? {
            if self.is_sync_ongoing.load(Ordering::Acquire) {
                for category_id in categories {
                    self.sync_cache.lock().await.add_category_mark_read(category_id);
                }
                self.db.set_category_read(categories)?;
            } else {
                // collect the unread articles up front so a failed backend
                // call can be rolled back
                let mut unread_articles_before: Vec<ArticleID> = Vec::new();
                for category_id in categories {
                    let articles_before = self.db.read_articles(ArticleFilter::category_unread(category_id))?;
                    unread_articles_before.append(&mut articles_before.into_iter().map(|a| a.article_id).collect());
                }

                self.db.set_category_read(categories)?;
                let api_result = self.api.read().await.set_category_read(categories, &unread_articles_before, client).await;

                if api_result.is_err() {
                    // restore the previously-unread articles
                    self.db.set_article_read(&unread_articles_before, Read::Unread)?;
                }

                api_result?;
            }

            Ok(())
        } else {
            Err(NewsFlashError::NotLoggedIn)
        }
    }
631
    /// Mark all articles carrying the given tags as read.
    ///
    /// Same three modes as [`NewsFlash::set_feed_read`], operating on tags
    /// instead of feeds.
    pub async fn set_tag_read(&self, tags: &[TagID], client: &Client) -> NewsFlashResult<()> {
        if self.is_offline.load(Ordering::Acquire) {
            let mut actions = Vec::new();

            for tag in tags {
                let filter = ArticleFilter::tag_unread(tag);
                let articles = self.db.read_articles(filter)?;
                actions.append(
                    &mut articles
                        .into_iter()
                        .map(|article| OfflineAction {
                            action_type: OfflineActionType::Read,
                            article_id: article.article_id,
                            tag_id: None,
                        })
                        .collect(),
                );
            }

            self.db.insert_offline_actions(&actions)?;
            self.db.set_tag_read(tags)?;

            Ok(())
        } else if self.api.read().await.is_logged_in(client).await? {
            if self.is_sync_ongoing.load(Ordering::Acquire) {
                for tag_id in tags {
                    self.sync_cache.lock().await.add_tag_mark_read(tag_id);
                }
                self.db.set_tag_read(tags)?;
            } else {
                // collect the unread articles up front so a failed backend
                // call can be rolled back
                let mut unread_articles_before: Vec<ArticleID> = Vec::new();
                for tag_id in tags {
                    let articles_before = self.db.read_articles(ArticleFilter::tag_unread(tag_id))?;
                    unread_articles_before.append(&mut articles_before.into_iter().map(|a| a.article_id).collect());
                }

                self.db.set_tag_read(tags)?;
                let api_result = self.api.read().await.set_tag_read(tags, &unread_articles_before, client).await;

                if api_result.is_err() {
                    // restore the previously-unread articles
                    self.db.set_article_read(&unread_articles_before, Read::Unread)?;
                }

                api_result?;
            }

            Ok(())
        } else {
            Err(NewsFlashError::NotLoggedIn)
        }
    }
684
    /// Mark every article as read.
    ///
    /// Offline: queues one `OfflineAction` per unread article. During a
    /// sync: queues a mark-read for every known category in the sync cache.
    /// Otherwise: updates the database, forwards the change to the backend
    /// and restores the previously-unread articles if the call fails.
    pub async fn set_all_read(&self, client: &Client) -> NewsFlashResult<()> {
        if self.is_offline.load(Ordering::Acquire) {
            let filter = ArticleFilter::all_unread();
            let articles = self.db.read_articles(filter)?;
            let actions = articles
                .into_iter()
                .map(|article| OfflineAction {
                    action_type: OfflineActionType::Read,
                    article_id: article.article_id,
                    tag_id: None,
                })
                .collect::<Vec<_>>();

            self.db.insert_offline_actions(&actions)?;
            self.db.set_all_read()?;

            Ok(())
        } else if self.api.read().await.is_logged_in(client).await? {
            if self.is_sync_ongoing.load(Ordering::Acquire) {
                // queue a mark-read per category; together they cover everything
                let categories = self.db.read_categories()?;
                for category in categories {
                    self.sync_cache.lock().await.add_category_mark_read(&category.category_id);
                }
                self.db.set_all_read()?;
            } else {
                // collect the unread articles up front so a failed backend
                // call can be rolled back
                let unread_articles_before = self
                    .db
                    .read_articles(ArticleFilter::all_unread())?
                    .into_iter()
                    .map(|a| a.article_id)
                    .collect::<Vec<_>>();
                self.db.set_all_read()?;

                let api_result = self.api.read().await.set_all_read(&unread_articles_before, client).await;

                if api_result.is_err() {
                    // restore the previously-unread articles
                    self.db.set_article_read(&unread_articles_before, Read::Unread)?;
                }

                api_result?;
            }

            Ok(())
        } else {
            Err(NewsFlashError::NotLoggedIn)
        }
    }
733
734 pub async fn add_feed(
735 &self,
736 url: &Url,
737 title: Option<String>,
738 category_id: Option<CategoryID>,
739 client: &Client,
740 ) -> NewsFlashResult<(Feed, FeedMapping, Option<Category>, Option<CategoryMapping>)> {
741 if self.is_offline.load(Ordering::Acquire) {
742 return Err(NewsFlashError::Offline);
743 }
744
745 if self.api.read().await.is_logged_in(client).await? {
746 if self.is_sync_ongoing.load(Ordering::Acquire) {
747 return Err(NewsFlashError::Syncing);
748 }
749
750 let (feed, category) = self.api.read().await.add_feed(url, title, category_id.clone(), client).await?;
751
752 self.db.insert_feed(&feed)?;
753
754 let category_mapping = if let Some(category) = &category {
755 if self.db.read_category(&category.category_id).is_ok() {
756 None
758 } else {
759 let category_mapping = CategoryMapping {
760 parent_id: NEWSFLASH_TOPLEVEL.clone(),
761 category_id: category.category_id.clone(),
762 sort_index: Some(i32::MAX),
763 };
764 self.db.insert_category(category)?;
765 self.db.insert_category_mapping(&category_mapping)?;
766 Some(category_mapping)
767 }
768 } else {
769 None
770 };
771
772 let category_id = match category_id {
773 Some(category_id) => Some(category_id),
774 None => category.as_ref().map(|c| c.category_id.clone()),
775 };
776
777 let feed_mapping = FeedMapping {
778 feed_id: feed.feed_id.clone(),
779 category_id: category_id.unwrap_or(NEWSFLASH_TOPLEVEL.clone()),
780 sort_index: Some(i32::MAX),
781 };
782
783 self.db.insert_feed_mapping(&feed_mapping)?;
784
785 return Ok((feed, feed_mapping, category, category_mapping));
786 }
787 Err(NewsFlashError::NotLoggedIn)
788 }
789
790 pub async fn remove_feed(&self, feed_id: &FeedID, client: &Client) -> NewsFlashResult<()> {
791 if self.is_offline.load(Ordering::Acquire) {
792 return Err(NewsFlashError::Offline);
793 }
794
795 if self.api.read().await.is_logged_in(client).await? {
796 if self.is_sync_ongoing.load(Ordering::Acquire) {
797 return Err(NewsFlashError::Syncing);
798 }
799 self.api.read().await.remove_feed(feed_id, client).await?;
800
801 self.db.drop_feed(feed_id)?;
803
804 #[cfg(feature = "image-downloader")]
805 self.delete_orphaned_images().await?;
806
807 return Ok(());
808 }
809 Err(NewsFlashError::NotLoggedIn)
810 }
811
    /// Move a feed from one category mapping to another.
    ///
    /// When source and target category are the same, only the local mapping
    /// (e.g. the sort position) changes and no backend call is made.
    /// Otherwise the move is performed on the backend first and mirrored
    /// locally on success.
    pub async fn move_feed(&self, from: &FeedMapping, to: &FeedMapping, client: &Client) -> NewsFlashResult<()> {
        if self.is_offline.load(Ordering::Acquire) {
            return Err(NewsFlashError::Offline);
        }

        if from.category_id == to.category_id {
            // purely local change: replace the mapping without talking to the backend
            self.db.drop_feed_mapping(from)?;
            self.db.insert_feed_mapping(to)?;
            Ok(())
        } else if self.api.read().await.is_logged_in(client).await? {
            if self.is_sync_ongoing.load(Ordering::Acquire) {
                return Err(NewsFlashError::Syncing);
            }

            self.api
                .read()
                .await
                .move_feed(&from.feed_id, &from.category_id, &to.category_id, client)
                .await?;

            self.db.drop_feed_mapping(from)?;

            self.db.insert_feed_mapping(to)?;
            Ok(())
        } else {
            Err(NewsFlashError::NotLoggedIn)
        }
    }
845
    /// Rename a feed on the backend and in the local database.
    ///
    /// The backend may return a *new* `FeedID` for the renamed feed. In that
    /// case the old feed entry, its mappings and all of its articles are
    /// rewritten to point at the new id.
    pub async fn rename_feed(&self, feed_id: &FeedID, new_title: &str, client: &Client) -> NewsFlashResult<Feed> {
        if self.is_offline.load(Ordering::Acquire) {
            return Err(NewsFlashError::Offline);
        }

        if self.api.read().await.is_logged_in(client).await? {
            if self.is_sync_ongoing.load(Ordering::Acquire) {
                return Err(NewsFlashError::Syncing);
            }

            let new_id = self.api.read().await.rename_feed(feed_id, new_title, client).await?;

            let mut modified_feed = self.db.read_feed(feed_id)?;
            new_title.clone_into(&mut modified_feed.label);
            modified_feed.feed_id = new_id.clone();

            self.db.insert_feed(&modified_feed)?;

            // the backend assigned a different id: migrate mappings and articles
            if &new_id != feed_id {
                self.db.drop_feed(feed_id)?;

                // rewrite every mapping of the old id to the new id
                let mappings = self.db.read_feed_mappings(Some(feed_id), None)?;
                let modified_mappings: Vec<FeedMapping> = mappings
                    .into_iter()
                    .map(|mut mapping| {
                        mapping.feed_id = new_id.clone();
                        mapping
                    })
                    .collect();
                self.db.drop_mapping_of_feed(feed_id)?;
                self.db.insert_feed_mappings(&modified_mappings)?;

                // re-home all articles of the old feed id under the new id
                let articles = self.db.read_articles(ArticleFilter {
                    feeds: Some([feed_id.clone()].into()),
                    ..ArticleFilter::default()
                })?;
                let mut modified_ids: Vec<ArticleID> = Vec::new();
                let modified_articles: Vec<Article> = articles
                    .into_iter()
                    .map(|mut article| {
                        modified_ids.push(article.article_id.clone());
                        article.feed_id = new_id.clone();
                        article
                    })
                    .collect();
                self.db.drop_articles(&modified_ids)?;
                self.db.write_articles(&modified_articles)?;
            }
            return Ok(modified_feed);
        }
        Err(NewsFlashError::NotLoggedIn)
    }
900
901 pub async fn edit_feed_url(&self, feed_id: &FeedID, new_url: &str, client: &Client) -> NewsFlashResult<()> {
902 if self.is_offline.load(Ordering::Acquire) {
903 return Err(NewsFlashError::Offline);
904 }
905
906 if self.api.read().await.is_logged_in(client).await? {
907 if self.is_sync_ongoing.load(Ordering::Acquire) {
908 return Err(NewsFlashError::Syncing);
909 }
910
911 let parsed_url = Url::parse(new_url)?;
913
914 self.api.read().await.edit_feed_url(feed_id, new_url, client).await?;
915
916 let mut modified_feed = self.db.read_feed(feed_id)?;
917 modified_feed.feed_url = Some(parsed_url);
918
919 self.db.insert_feed(&modified_feed)?;
920
921 Ok(())
922 } else {
923 Err(NewsFlashError::NotLoggedIn)
924 }
925 }
926
    /// Sort feeds and categories alphabetically in the local database.
    pub async fn sort_alphabetically(&self) -> NewsFlashResult<()> {
        self.db.sort_alphabetically()?;
        Ok(())
    }
931
932 pub async fn add_category(&self, title: &str, parent: Option<&CategoryID>, client: &Client) -> NewsFlashResult<(Category, CategoryMapping)> {
933 if self.is_offline.load(Ordering::Acquire) {
934 return Err(NewsFlashError::Offline);
935 }
936
937 if self.api.read().await.is_logged_in(client).await? {
938 if self.is_sync_ongoing.load(Ordering::Acquire) {
939 return Err(NewsFlashError::Syncing);
940 }
941
942 let category_id = self.api.read().await.add_category(title, parent, client).await?;
943
944 let category = Category {
945 category_id: category_id.clone(),
946 label: title.to_owned(),
947 };
948
949 let category_mapping = CategoryMapping {
950 parent_id: match parent {
951 Some(parent) => parent.clone(),
952 None => NEWSFLASH_TOPLEVEL.clone(),
953 },
954 category_id,
955 sort_index: Some(i32::MAX),
956 };
957
958 self.db.insert_category(&category)?;
959 self.db.insert_category_mapping(&category_mapping)?;
960
961 return Ok((category, category_mapping));
962 }
963 Err(NewsFlashError::NotLoggedIn)
964 }
965
    /// Remove a category on the backend and from the local database.
    ///
    /// With `remove_children` the whole subtree (child categories and their
    /// feeds) is deleted; otherwise children are re-attached to the removed
    /// category's parent.
    pub async fn remove_category(&self, category_id: &CategoryID, remove_children: bool, client: &Client) -> NewsFlashResult<()> {
        if self.is_offline.load(Ordering::Acquire) {
            return Err(NewsFlashError::Offline);
        }

        if self.api.read().await.is_logged_in(client).await? {
            if self.is_sync_ongoing.load(Ordering::Acquire) {
                return Err(NewsFlashError::Syncing);
            }

            // remove on the backend first; local cleanup only runs on success
            self.api.read().await.remove_category(category_id, remove_children, client).await?;
        } else {
            return Err(NewsFlashError::NotLoggedIn);
        }

        if remove_children {
            self.remove_category_from_db_recurse(category_id)?;
        } else {
            self.remove_category_from_db_move_children_up(category_id)?;
        }

        // deleted feeds/articles may leave cached images orphaned
        #[cfg(feature = "image-downloader")]
        self.delete_orphaned_images().await?;

        Ok(())
    }
992
    /// Delete `category_id` from the local database and re-attach its feeds
    /// and child categories to its parent (or to the virtual top level when
    /// no parent mapping exists).
    fn remove_category_from_db_move_children_up(&self, category_id: &CategoryID) -> NewsFlashResult<()> {
        // parent of the category being removed; fall back to the top level
        let parent_id = self
            .db
            .read_category_mappings(None, Some(category_id))?
            .first()
            .map(|m| m.parent_id.clone())
            .unwrap_or_else(|| NEWSFLASH_TOPLEVEL.clone());

        tracing::trace!(%parent_id);

        // re-home every feed of the removed category under its parent
        let feed_mappings = self.db.read_feed_mappings(None, Some(category_id))?;
        tracing::trace!(%category_id, ?feed_mappings, "feeds of category");

        for mut mapping in feed_mappings {
            self.db.drop_feed_mapping(&mapping)?;
            mapping.category_id = parent_id.clone();
            self.db.insert_feed_mapping(&mapping)?;
        }

        // re-home every child category under the removed category's parent
        let category_mappings = self.db.read_category_mappings(Some(category_id), None)?;
        tracing::trace!(%category_id, ?category_mappings, "child categories of category");

        for mut mapping in category_mappings {
            self.db.drop_category_mapping(&mapping)?;
            mapping.parent_id = parent_id.clone();
            self.db.insert_category_mapping(&mapping)?;
        }

        self.db.drop_category(category_id)?;

        Ok(())
    }
1027
    /// Delete a category locally together with everything inside it.
    ///
    /// Drops all feeds mapped to the category, then the category itself, and
    /// finally recurses into each child category.
    fn remove_category_from_db_recurse(&self, category_id: &CategoryID) -> NewsFlashResult<()> {
        let mappings = self.db.read_feed_mappings(None, Some(category_id))?;
        tracing::trace!(%category_id, ?mappings, "drop feeds of category");

        for mapping in mappings {
            self.db.drop_feed(&mapping.feed_id)?;
        }

        self.db.drop_category(category_id)?;

        // NOTE(review): child mappings are read *after* the category was
        // dropped — this presumes drop_category() does not cascade-delete the
        // child category mappings; confirm against the DatabaseExt impl.
        let mappings = self.db.read_category_mappings(Some(category_id), None)?;
        tracing::trace!(%category_id, ?mappings, "drop child categories of category");

        let categories = self.db.read_categories()?;

        // Resolve the child mappings to actual Category rows before recursing.
        let child_categories: Vec<Category> = categories
            .into_iter()
            .filter(|category| mappings.iter().any(|mapping| mapping.category_id == category.category_id))
            .collect();
        for child_category in child_categories {
            self.remove_category_from_db_recurse(&child_category.category_id)?;
        }

        Ok(())
    }
1056
    /// Rename a category on the backend and mirror the change locally.
    ///
    /// Some backends assign a fresh `CategoryID` on rename; in that case the
    /// old category row and its feed mappings are dropped and all mappings
    /// are rewritten to the new id before being re-inserted.
    ///
    /// # Errors
    /// `Offline`, `Syncing` or `NotLoggedIn` depending on the current state;
    /// otherwise any API or database error.
    pub async fn rename_category(&self, category_id: &CategoryID, new_title: &str, client: &Client) -> NewsFlashResult<Category> {
        if self.is_offline.load(Ordering::Acquire) {
            return Err(NewsFlashError::Offline);
        }

        if self.api.read().await.is_logged_in(client).await? {
            if self.is_sync_ongoing.load(Ordering::Acquire) {
                return Err(NewsFlashError::Syncing);
            }

            // Snapshot the mappings before the backend call, in case the
            // rename comes back with a different id.
            let mut modified_category_mappings = self.db.read_category_mappings(None, Some(category_id))?;
            let mut modified_feed_mappings = self.db.read_feed_mappings(None, Some(category_id))?;

            let new_id = self.api.read().await.rename_category(category_id, new_title, client).await?;

            let mut modified_category = Category {
                category_id: category_id.clone(),
                label: new_title.to_owned(),
            };

            // The backend handed back a different id: purge the old rows and
            // rewrite every mapping to point at the new id.
            if &new_id != category_id {
                self.db.drop_category(category_id)?;
                self.db.drop_feed_mappings_of_category(category_id)?;
                // NOTE(review): the parent mapping of the old id is not
                // explicitly dropped here — presumably drop_category() covers
                // it; confirm against the DatabaseExt impl.
                modified_category.category_id = new_id.clone();

                modified_feed_mappings = modified_feed_mappings
                    .into_iter()
                    .map(|mut mapping| {
                        mapping.category_id = new_id.clone();
                        mapping
                    })
                    .collect();

                modified_category_mappings = modified_category_mappings
                    .into_iter()
                    .map(|mut m| {
                        m.category_id = new_id.clone();
                        m
                    })
                    .collect();
            }

            self.db.insert_category(&modified_category)?;
            self.db.insert_category_mappings(&modified_category_mappings)?;
            self.db.insert_feed_mappings(&modified_feed_mappings)?;
            return Ok(modified_category);
        }
        Err(NewsFlashError::NotLoggedIn)
    }
1107
1108 pub async fn move_category(&self, mapping: &CategoryMapping, client: &Client) -> NewsFlashResult<()> {
1109 if self.is_offline.load(Ordering::Acquire) {
1110 return Err(NewsFlashError::Offline);
1111 }
1112
1113 if mapping.parent_id == *NEWSFLASH_TOPLEVEL {
1115 self.db.drop_mapping_of_category(&mapping.category_id)?;
1116 self.db.insert_category_mapping(mapping)?;
1117 Ok(())
1118 } else if self.api.read().await.is_logged_in(client).await? {
1119 if self.is_sync_ongoing.load(Ordering::Acquire) {
1120 return Err(NewsFlashError::Syncing);
1121 }
1122
1123 self.api
1124 .read()
1125 .await
1126 .move_category(&mapping.category_id, &mapping.parent_id, client)
1127 .await?;
1128
1129 self.db.drop_mapping_of_category(&mapping.category_id)?;
1130 self.db.insert_category_mapping(mapping)?;
1131 Ok(())
1132 } else {
1133 Err(NewsFlashError::NotLoggedIn)
1134 }
1135 }
1136
1137 pub async fn add_tag(&self, title: &str, color: Option<String>, client: &Client) -> NewsFlashResult<Tag> {
1138 if self.is_offline.load(Ordering::Acquire) {
1139 return Err(NewsFlashError::Offline);
1140 }
1141
1142 if self.api.read().await.is_logged_in(client).await? {
1143 if self.is_sync_ongoing.load(Ordering::Acquire) {
1144 return Err(NewsFlashError::Syncing);
1145 }
1146
1147 let tag_id = self.api.read().await.add_tag(title, client).await?;
1148 let tag = Tag {
1149 tag_id,
1150 label: title.to_owned(),
1151 color,
1152 sort_index: Some(i32::MAX),
1153 };
1154
1155 self.db.insert_tag(&tag)?;
1156 return Ok(tag);
1157 }
1158 Err(NewsFlashError::NotLoggedIn)
1159 }
1160
1161 pub async fn remove_tag(&self, tag_id: &TagID, client: &Client) -> NewsFlashResult<()> {
1162 if self.is_offline.load(Ordering::Acquire) {
1163 return Err(NewsFlashError::Offline);
1164 }
1165
1166 if self.api.read().await.is_logged_in(client).await? {
1167 if self.is_sync_ongoing.load(Ordering::Acquire) {
1168 return Err(NewsFlashError::Syncing);
1169 }
1170
1171 self.api.read().await.remove_tag(tag_id, client).await?;
1172
1173 self.db.drop_tag(tag_id)?;
1174 return Ok(());
1175 }
1176 Err(NewsFlashError::NotLoggedIn)
1177 }
1178
1179 pub async fn edit_tag(&self, tag_id: &TagID, new_title: &str, new_color: &Option<String>, client: &Client) -> NewsFlashResult<Tag> {
1180 if self.is_offline.load(Ordering::Acquire) {
1181 return Err(NewsFlashError::Offline);
1182 }
1183
1184 if self.api.read().await.is_logged_in(client).await? {
1185 if self.is_sync_ongoing.load(Ordering::Acquire) {
1186 return Err(NewsFlashError::Syncing);
1187 }
1188
1189 let new_id = self.api.read().await.rename_tag(tag_id, new_title, client).await?;
1190
1191 let taggings = self.db.read_taggings(None, Some(tag_id))?;
1192 self.db.drop_tag(tag_id)?;
1193 let sort_index = self
1194 .db
1195 .read_tags()?
1196 .into_iter()
1197 .find(|tag| &tag.tag_id == tag_id)
1198 .and_then(|tag| tag.sort_index);
1199 let mutated_tag = Tag {
1200 tag_id: new_id.clone(),
1201 label: new_title.to_owned(),
1202 color: new_color.clone(),
1203 sort_index,
1204 };
1205 self.db.insert_tag(&mutated_tag)?;
1206 self.db.insert_taggings(&taggings)?;
1207
1208 let taggings = if &new_id != tag_id {
1209 taggings
1210 .into_iter()
1211 .map(|mut tagging| {
1212 tagging.tag_id = new_id.clone();
1213 tagging
1214 })
1215 .collect()
1216 } else {
1217 taggings
1218 };
1219 self.db.insert_taggings(&taggings)?;
1220
1221 return Ok(mutated_tag);
1222 }
1223 Err(NewsFlashError::NotLoggedIn)
1224 }
1225
1226 pub async fn tag_article(&self, article_id: &ArticleID, tag_id: &TagID, client: &Client) -> NewsFlashResult<()> {
1227 let tagging = Tagging {
1228 article_id: article_id.clone(),
1229 tag_id: tag_id.clone(),
1230 };
1231
1232 if self.is_offline.load(Ordering::Acquire) {
1233 let action = OfflineAction {
1234 action_type: OfflineActionType::Tag,
1235 article_id: article_id.clone(),
1236 tag_id: Some(tag_id.clone()),
1237 };
1238 self.db.insert_offline_actions(&[action])?;
1239 self.db.insert_tagging(&tagging)?;
1240
1241 Ok(())
1242 } else if self.api.read().await.is_logged_in(client).await? {
1243 if self.is_sync_ongoing.load(Ordering::Acquire) {
1244 self.sync_cache.lock().await.add_article_tagged(article_id, tag_id);
1245 } else {
1246 self.api.read().await.tag_article(article_id, tag_id, client).await?;
1247 self.db.insert_tagging(&tagging)?;
1248 }
1249
1250 Ok(())
1251 } else {
1252 Err(NewsFlashError::NotLoggedIn)
1253 }
1254 }
1255
1256 pub async fn untag_article(&self, article_id: &ArticleID, tag_id: &TagID, client: &Client) -> NewsFlashResult<()> {
1257 let tagging = Tagging {
1258 article_id: article_id.clone(),
1259 tag_id: tag_id.clone(),
1260 };
1261
1262 if self.is_offline.load(Ordering::Acquire) {
1263 let action = OfflineAction {
1264 action_type: OfflineActionType::Untag,
1265 article_id: article_id.clone(),
1266 tag_id: Some(tag_id.clone()),
1267 };
1268 self.db.insert_offline_actions(&[action])?;
1269 self.db.drop_tagging(&tagging)?;
1270
1271 Ok(())
1272 } else if self.api.read().await.is_logged_in(client).await? {
1273 if self.is_sync_ongoing.load(Ordering::Acquire) {
1274 self.sync_cache.lock().await.add_article_untagged(article_id, tag_id);
1275 } else {
1276 self.api.read().await.untag_article(article_id, tag_id, client).await?;
1277 self.db.drop_tagging(&tagging)?;
1278 }
1279 Ok(())
1280 } else {
1281 Err(NewsFlashError::NotLoggedIn)
1282 }
1283 }
1284
1285 pub async fn import_opml(&self, opml: &str, parse_all_feeds: bool, client: &Client) -> NewsFlashResult<()> {
1286 if self.is_offline.load(Ordering::Acquire) {
1287 return Err(NewsFlashError::Offline);
1288 }
1289
1290 if self.api.read().await.is_logged_in(client).await? {
1291 if self.is_sync_ongoing.load(Ordering::Acquire) {
1292 return Err(NewsFlashError::Syncing);
1293 }
1294
1295 self.api.read().await.import_opml(opml, client).await?;
1296 let opml_result = opml::parse_opml(opml, parse_all_feeds, self.download_semaphore.clone(), client).await?;
1297
1298 self.db.insert_categories(&opml_result.categories)?;
1299 self.db.insert_feeds(&opml_result.feeds)?;
1300 self.db.insert_feed_mappings(&opml_result.feed_mappings)?;
1301 self.db.insert_category_mappings(&opml_result.category_mappings)?;
1302 return Ok(());
1303 }
1304 Err(NewsFlashError::NotLoggedIn)
1305 }
1306
1307 pub async fn export_opml(&self) -> NewsFlashResult<String> {
1308 if self.is_offline.load(Ordering::Acquire) {
1309 return Err(NewsFlashError::Offline);
1310 }
1311
1312 let categories = self.db.read_categories()?;
1313 let category_mappings = self.db.read_category_mappings(None, None)?;
1314 let feeds = self.db.read_feeds()?;
1315 let feed_mappings = self.db.read_feed_mappings(None, None)?;
1316
1317 let opml_string = opml::generate_opml(&categories, &category_mappings, &feeds, &feed_mappings)?;
1318 Ok(opml_string)
1319 }
1320
1321 pub fn get_categories(&self) -> NewsFlashResult<(Vec<Category>, Vec<CategoryMapping>)> {
1322 let categories = self.db.read_categories()?;
1323 let category_mappings = self.db.read_category_mappings(None, None)?;
1324 Ok((categories, category_mappings))
1325 }
1326
1327 pub fn get_feeds(&self) -> NewsFlashResult<(Vec<Feed>, Vec<FeedMapping>)> {
1328 let feeds = self.db.read_feeds()?;
1329 let mappings = self.db.read_feed_mappings(None, None)?;
1330 Ok((feeds, mappings))
1331 }
1332
1333 pub fn unread_count_feed_map(&self, exclude_future: bool) -> NewsFlashResult<HashMap<FeedID, i64>> {
1334 let map = self.db.unread_count_feed_map(exclude_future)?;
1335 Ok(map)
1336 }
1337
1338 pub fn marked_count_feed_map(&self) -> NewsFlashResult<HashMap<FeedID, i64>> {
1339 let mut count_vec = self.db.marked_count_feed_map()?;
1340 let mut map: HashMap<FeedID, i64> = HashMap::new();
1341 count_vec.drain(..).for_each(|c| {
1342 map.insert(c.feed_id, c.count);
1343 });
1344 Ok(map)
1345 }
1346
1347 pub fn today_unread_count(&self, exclude_future: bool) -> NewsFlashResult<i64> {
1348 let count = self.db.today_unread_count(exclude_future)?;
1349 Ok(count)
1350 }
1351
1352 pub fn today_marked_count(&self) -> NewsFlashResult<i64> {
1353 let count = self.db.today_marked_count()?;
1354 Ok(count)
1355 }
1356
1357 pub fn get_tags(&self) -> NewsFlashResult<(Vec<Tag>, Vec<Tagging>)> {
1358 let tags = self.db.read_tags()?;
1359 let taggings = self.db.read_taggings(None, None)?;
1360 Ok((tags, taggings))
1361 }
1362
1363 pub fn get_tags_of_article(&self, article_id: &ArticleID) -> NewsFlashResult<Vec<Tag>> {
1364 let tags = self.db.read_tags_for_article(article_id)?;
1365 Ok(tags)
1366 }
1367
1368 pub fn unread_count_all(&self) -> NewsFlashResult<i64> {
1369 let count = self.db.unread_count_all()?;
1370 Ok(count)
1371 }
1372
1373 pub fn get_articles(&self, filter: ArticleFilter) -> NewsFlashResult<Vec<Article>> {
1374 let articles = self.db.read_articles(filter)?;
1375 Ok(articles)
1376 }
1377
1378 pub fn get_article(&self, id: &ArticleID) -> NewsFlashResult<Article> {
1379 let article = self.db.read_article(id)?;
1380 Ok(article)
1381 }
1382
1383 pub fn get_fat_articles(&self, filter: ArticleFilter) -> NewsFlashResult<Vec<FatArticle>> {
1384 let articles = self.db.read_fat_articles(filter)?;
1385 Ok(articles)
1386 }
1387
1388 pub fn get_fat_article(&self, id: &ArticleID) -> NewsFlashResult<FatArticle> {
1389 let article = self.db.read_fat_article(id)?;
1390 Ok(article)
1391 }
1392
1393 pub fn get_enclosures(&self, id: &ArticleID) -> NewsFlashResult<Vec<Enclosure>> {
1394 let enclosures = self.db.read_enclosures(id)?;
1395 Ok(enclosures)
1396 }
1397
1398 pub fn update_enclosure(&self, enclosure: &Enclosure) -> NewsFlashResult<()> {
1399 self.db.write_enclosures(std::slice::from_ref(enclosure))?;
1400 Ok(())
1401 }
1402
    /// Scrape full article content for every article of the given feeds that
    /// was synced after `synced_after` and has no scraped content yet.
    ///
    /// Each article is scraped in its own tokio task, throttled by the shared
    /// download semaphore. This is a best-effort bulk operation: failed
    /// scrapes and panicked tasks are silently dropped, only database errors
    /// are reported.
    ///
    /// # Errors
    /// `NewsFlashError::Syncing` while a sync is running; otherwise database
    /// errors.
    #[cfg(feature = "article-scraper")]
    pub async fn scrap_content_feeds(&self, synced_after: DateTime<Utc>, feeds: &[FeedID], client: &Client) -> NewsFlashResult<()> {
        if self.is_sync_ongoing.load(Ordering::Acquire) {
            return Err(NewsFlashError::Syncing);
        }

        let article_scraper = self.init_scraper().await;

        // Only articles that don't already have scraped content.
        let articles = self
            .db
            .read_fat_articles(ArticleFilter {
                feeds: Some(feeds.to_owned()),
                synced_after: Some(synced_after),
                ..Default::default()
            })?
            .into_iter()
            .filter(|article| article.scraped_content.is_none())
            .collect::<Vec<_>>();

        let mut task_handles = Vec::new();

        // One task per article; each clones its own handles for the 'static
        // spawn requirement.
        for article in articles.into_iter() {
            let semaphore = self.download_semaphore.clone();
            let client = client.clone();
            let article_scraper = article_scraper.clone();

            task_handles.push(tokio::spawn(async move {
                Self::article_scrap_content_impl(article, article_scraper, semaphore, &client).await
            }));
        }

        // First flatten drops JoinErrors, second drops per-article scrape
        // errors: only successful results are written back.
        let result_vec = futures::future::join_all(task_handles)
            .await
            .into_iter()
            .flatten()
            .flatten()
            .collect::<Vec<_>>();

        self.db.update_articles_grabbed_content(&result_vec)?;

        Ok(())
    }
1445
1446 #[cfg(feature = "article-scraper")]
1447 pub async fn scrap_content_article(&self, id: &ArticleID, client: &Client) -> NewsFlashResult<FatArticle> {
1448 let article_scraper = self.init_scraper().await;
1449 let article = self.db.read_fat_article(id)?;
1450 let scraped_article = Self::article_scrap_content_impl(article, article_scraper, self.download_semaphore.clone(), client).await?;
1451 self.db.update_article_grabbed_content(&scraped_article)?;
1452 Ok(scraped_article)
1453 }
1454
    /// Run the scraper for a single article and merge the result into it.
    ///
    /// Holds a download-semaphore permit for the duration of the network
    /// request. Scraped HTML replaces the article's scraped content (plus a
    /// plain-text rendering); title, author and thumbnail are only filled in
    /// when the article didn't have them already.
    ///
    /// # Errors
    /// `NewsFlashError::GrabContent` when the article has no source URL or
    /// the scraper fails.
    #[cfg(feature = "article-scraper")]
    async fn article_scrap_content_impl(
        mut article: FatArticle,
        article_scraper: Arc<ArticleScraper>,
        download_semaphore: Arc<Semaphore>,
        client: &Client,
    ) -> NewsFlashResult<FatArticle> {
        let Some(url) = &article.url else {
            tracing::error!("Article doesn't contain source URL");
            return Err(NewsFlashError::GrabContent);
        };

        // Throttle concurrent downloads via the shared semaphore; the permit
        // is released right after the scrape completes.
        let permit = download_semaphore.acquire().await?;
        // The scraper's parse() signature differs depending on whether the
        // image-downloader feature is compiled in.
        #[cfg(feature = "image-downloader")]
        let result = article_scraper.parse(url, client, false, None);
        #[cfg(not(feature = "image-downloader"))]
        let result = article_scraper.parse(url, client);

        let processed_article = result.await.map_err(|error| {
            tracing::error!(%url, %error, "Internal scraper failed");
            NewsFlashError::GrabContent
        })?;
        drop(permit);

        tracing::info!("Internal scraper: successfully scraped: '{url}'");
        if let Some(html) = processed_article.html {
            article.plain_text = Some(util::html2text::html2text(&html));
            article.scraped_content = Some(html);
        }
        // Scraped metadata only fills gaps; existing article values win.
        if let Some(title) = processed_article.title
            && article.title.is_none()
        {
            article.title = Some(title);
        }
        if let Some(author) = processed_article.author
            && article.author.is_none()
        {
            article.author = Some(author);
        }
        if let Some(thumbnail_url) = processed_article.thumbnail_url
            && article.thumbnail_url.is_none()
        {
            article.thumbnail_url = Some(thumbnail_url);
        }

        Ok(article)
    }
1502
1503 #[cfg(feature = "article-scraper")]
1504 async fn init_scraper(&self) -> Arc<ArticleScraper> {
1505 if self.scraper.read().await.is_none() {
1506 tracing::info!("Initialize ArticleScraper");
1507 let scraper_data_dir = self.data_dir.join(SCRAPER_DATA_DIR);
1508
1509 let scraper_data_dir = if std::fs::DirBuilder::new().recursive(true).create(&scraper_data_dir).is_ok() {
1510 Some(scraper_data_dir)
1511 } else {
1512 None
1513 };
1514
1515 let scraper = ArticleScraper::new(scraper_data_dir.as_deref()).await;
1516 let scraper = Arc::new(scraper);
1517 self.scraper.write().await.replace(scraper.clone());
1518 }
1519
1520 match self.scraper.read().await.as_ref() {
1521 Some(scraper) => scraper.clone(),
1522 None => unreachable!(),
1523 }
1524 }
1525
    /// Fetch an image referenced by an article, serving from the on-disk
    /// cache when possible.
    ///
    /// On a cache miss the image is downloaded (throttled by the download
    /// semaphore, reporting `progress` if given), written into the image
    /// directory, measured, and recorded in the database. A matching
    /// enclosure row, if any, gets its width/height filled in.
    ///
    /// # Errors
    /// URL parse errors, I/O errors, download errors and database errors.
    #[cfg(feature = "image-downloader")]
    pub async fn get_image(
        &self,
        article_id: &ArticleID,
        url: &str,
        client: &Client,
        progress: Option<Sender<DownloadProgress>>,
    ) -> NewsFlashResult<Image> {
        let image_dir = self.data_dir.join(IMAGE_DATA_DIR);
        std::fs::DirBuilder::new().recursive(true).create(&image_dir)?;

        let parsed_url = Url::parse(url)?;
        // Cache hit: serve the bytes straight from disk.
        if let Ok(image) = self.db.read_image(&parsed_url) {
            if let Ok(mut file) = tokio::fs::File::open(&image.file_path).await {
                let mut contents = vec![];
                file.read_to_end(&mut contents).await?;
                return Ok(Image::from_metadata(image, contents));
            } else {
                // Stale db entry whose file vanished: fall through and
                // re-download.
                tracing::warn!(%image.file_path, "Failed to open file");
            }
        }

        // Throttle the download via the shared semaphore.
        let permit = self.download_semaphore.acquire().await?;
        let res = article_scraper::images::ImageDownloader::single_from_url(url, client, progress).await?;
        drop(permit);

        let file_name = sanitize_filename::sanitize(format!("{article_id}_{url}"));
        let path = image_dir.join(file_name);
        tracing::debug!(?path, "writing image");
        tokio::fs::write(&path, &res)
            .await
            .inspect_err(|error| tracing::error!(%error, "Failed to write image"))?;

        // Dimensions are optional metadata; a decode failure is not fatal.
        let (width, height) = if let Ok((width, height)) = image::image_dimensions(&path) {
            (Some(width as i32), Some(height as i32))
        } else {
            (None, None)
        };

        if let Ok(mut enclosure) = self.db.read_enclosure(article_id, &parsed_url) {
            enclosure.width = width;
            enclosure.height = height;

            self.db.write_enclosures(&[enclosure])?;
        }

        let image = ImageMetadata {
            image_url: parsed_url,
            article_id: article_id.clone(),
            file_path: path.to_string_lossy().into(),
            width,
            height,
        };

        self.db.write_image(&image)?;

        Ok(Image::from_metadata(image, res))
    }
1585
1586 #[cfg(feature = "image-downloader")]
1587 pub fn delete_all_images(&self) -> NewsFlashResult<()> {
1588 self.db.drop_all_images()?;
1589 std::fs::remove_dir_all(self.data_dir.join(IMAGE_DATA_DIR))?;
1590 Ok(())
1591 }
1592
1593 #[cfg(feature = "image-downloader")]
1594 async fn delete_orphaned_images(&self) -> NewsFlashResult<()> {
1595 let images = self.db.read_images()?;
1596 let db_images = images
1597 .into_iter()
1598 .map(|image| {
1599 let mut path = PathBuf::new();
1600 path.push(image.file_path);
1601 path
1602 })
1603 .collect::<HashSet<PathBuf>>();
1604
1605 let dir = match std::fs::read_dir(self.data_dir.join(IMAGE_DATA_DIR)) {
1606 Ok(dir) => dir,
1607 Err(error) => {
1608 tracing::warn!(%error, "failed to read image directory");
1609 return Ok(());
1610 }
1611 };
1612 let existing_images = dir
1613 .into_iter()
1614 .filter_map(|item| item.ok().map(|i| i.path()))
1615 .collect::<HashSet<PathBuf>>();
1616
1617 let difference = existing_images.difference(&db_images);
1618
1619 for file_path in difference {
1620 if let Err(error) = std::fs::remove_file(file_path) {
1621 tracing::warn!(?file_path, %error, "Failed to delete file");
1622 }
1623 }
1624
1625 Ok(())
1626 }
1627}