use crate::config::Config;
use crate::monitor::{FeedStatus, LogStatus, Monitor, TranslationLog, TranslationStage};
use crate::rss::fetch;
use crate::storage::{Article, Enclosure, FeedData, FeedMeta};
use std::sync::Arc;
use tokio::sync::{RwLock, Semaphore};
use uuid::Uuid;
/// Drives feed processing: fetches feeds, filters out already-stored
/// articles, and runs the remaining ones through the translation pipeline.
///
/// Cloning is cheap: every field is an `Arc`, so clones share the same
/// configuration, monitor and concurrency limiter.
#[derive(Clone)]
pub struct Scheduler {
/// Shared application configuration, read on every processing pass.
config: Arc<RwLock<Config>>,
/// Shared monitor that receives status, log and token-usage updates.
monitor: Arc<RwLock<Monitor>>,
/// Limits how many LLM translation requests run at the same time.
semaphore: Arc<Semaphore>,
}
impl Scheduler {
    /// Creates a scheduler that shares the given configuration and monitor.
    ///
    /// The LLM concurrency limit is read once, here, from
    /// `llm.max_concurrent_requests`. If the config lock cannot be taken
    /// without waiting, the limit falls back to 3.
    pub fn new(config: Arc<RwLock<Config>>, monitor: Arc<RwLock<Monitor>>) -> Self {
        // A semaphore with zero permits would make every translation wait
        // forever on `acquire()`, so a configured 0 is clamped to 1.
        let max_concurrent = config
            .try_read()
            .map(|c| c.llm.max_concurrent_requests)
            .unwrap_or(3)
            .max(1);
        Scheduler {
            config,
            monitor,
            semaphore: Arc::new(Semaphore::new(max_concurrent)),
        }
    }

    /// Fetches one feed, processes every article not stored yet, and records
    /// progress in the monitor.
    ///
    /// Steps:
    /// 1. Fetch the feed; on failure the error is reported through
    ///    `finish_fetch` and the call returns.
    /// 2. Keep at most `max_articles` entries (25 when the feed is not found
    ///    in the configuration) and drop those whose link is already stored.
    /// 3. Spawn one task per new article; each task runs
    ///    `process_single_article`.
    /// 4. Save each resulting article and add it to the feed index. When
    ///    processing fails, an untranslated placeholder with empty content is
    ///    saved instead.
    ///
    /// Errors are logged rather than returned.
    pub async fn process_feed(&self, feed_name: &str, feed_url: &str) {
        tracing::info!("Processing feed: {} ({})", feed_name, feed_url);
        let start = std::time::Instant::now();
        self.monitor.write().await.ensure_feed(feed_name);
        self.monitor
            .write()
            .await
            .set_status(feed_name, FeedStatus::Fetching);

        let raw_articles = match fetch::fetch_feed(feed_url).await {
            Ok(articles) => articles,
            Err(e) => {
                let ms = start.elapsed().as_millis() as u64;
                tracing::error!("Fetch failed '{}': {}", feed_name, e);
                self.monitor
                    .write()
                    .await
                    .finish_fetch(feed_name, ms, Some(&e.to_string()));
                return;
            }
        };

        // Read everything needed from the configuration under a single read
        // lock, cloning only the pieces used below instead of the whole
        // `Config`.
        let (max_articles, tc, target) = {
            let cfg = self.config.read().await;
            let max_articles = cfg
                .feeds
                .iter()
                .find(|f| f.name == feed_name)
                .map(|f| f.max_articles)
                .unwrap_or(25);
            (
                max_articles,
                cfg.llm.translation.clone(),
                cfg.language.target.clone(),
            )
        };

        let raw_articles: Vec<_> = raw_articles.into_iter().take(max_articles).collect();
        self.monitor.write().await.finish_fetch(
            feed_name,
            start.elapsed().as_millis() as u64,
            None,
        );

        let feed_data = match FeedData::load(feed_name) {
            Ok(data) => data,
            Err(e) => {
                tracing::error!("Load failed: {}", e);
                return;
            }
        };

        let new_articles: Vec<_> = raw_articles
            .into_iter()
            .filter(|a| !feed_data.contains_link(&a.link))
            .collect();
        if new_articles.is_empty() {
            self.monitor
                .write()
                .await
                .set_status(feed_name, FeedStatus::Done);
            return;
        }

        let mut feed_meta = match FeedMeta::load(feed_name) {
            Ok(meta) => meta,
            Err(e) => {
                tracing::error!("Load meta failed '{}': {}", feed_name, e);
                return;
            }
        };

        // The count is bounded by `max_articles`, so the narrowing cast is
        // not expected to truncate in practice.
        let total = new_articles.len() as u32;
        self.monitor.write().await.set_status(
            feed_name,
            FeedStatus::Translating {
                completed: 0,
                total,
                in_progress: vec![],
            },
        );

        let semaphore = self.semaphore.clone();
        let monitor = self.monitor.clone();
        let feed_name_owned = feed_name.to_string();

        // One task per article. The identifying fields are cloned up front so
        // that a placeholder can still be built if processing fails after
        // `raw` has been moved into the task.
        let handles: Vec<_> = new_articles
            .into_iter()
            .map(|raw| {
                let tc = tc.clone();
                let target = target.clone();
                let semaphore = semaphore.clone();
                let monitor = monitor.clone();
                let feed_name = feed_name_owned.clone();
                let title = raw.title.clone();
                let link = raw.link.clone();
                let published_at = raw.published_at.clone();
                let guid = raw.guid.clone();
                tokio::spawn(async move {
                    let result = process_single_article(
                        &feed_name,
                        raw,
                        &tc,
                        &target,
                        semaphore,
                        monitor.clone(),
                    )
                    .await;
                    (feed_name, title, link, published_at, guid, result)
                })
            })
            .collect();

        for handle in handles {
            match handle.await {
                Ok((feed_name, title, _link, _published_at, _guid, Ok(article))) => {
                    if let Err(e) = article.save_to_file(&feed_name) {
                        tracing::error!("Failed to save article '{}': {}", title, e);
                        monitor.write().await.complete_article(&feed_name, &title);
                        continue;
                    }
                    feed_meta.add_article(&article.id, &article.published_at);
                    // Persist the index after every article so progress
                    // survives an interruption part-way through the batch.
                    if let Err(e) = feed_meta.save(&feed_name) {
                        tracing::error!("Failed to save feed meta '{}': {}", feed_name, e);
                    }
                    monitor.write().await.complete_article(&feed_name, &title);
                }
                Ok((feed_name, title, link, published_at, guid, Err(e))) => {
                    tracing::error!("Article processing failed '{}': {}", title, e);
                    // Store an untranslated placeholder carrying only the
                    // identifying fields captured before the task started.
                    let article = Article {
                        id: guid.unwrap_or_else(|| Uuid::new_v4().to_string()),
                        feed_name: feed_name.clone(),
                        title: title.clone(),
                        original_title: title.clone(),
                        link,
                        content: String::new(),
                        original_content: String::new(),
                        summary: None,
                        translated: false,
                        translated_title: false,
                        source_lang: None,
                        published_at,
                        published_at_rfc2822: None,
                        processed_at: chrono::Utc::now().to_rfc3339(),
                        author: None,
                        categories: vec![],
                        translation_model: None,
                        translation_tokens: None,
                        enclosure: None,
                    };
                    // Index the placeholder only when it was actually
                    // written, mirroring the success branch; otherwise the
                    // feed index would reference an article with no file.
                    if let Err(e) = article.save_to_file(&feed_name) {
                        tracing::error!("Failed to save failed article '{}': {}", title, e);
                    } else {
                        feed_meta.add_article(&article.id, &article.published_at);
                    }
                    monitor.write().await.complete_article(&feed_name, &title);
                }
                Err(e) => {
                    tracing::error!("Task join error: {}", e);
                }
            }
        }

        // Final save also covers placeholders added in the failure branch.
        if let Err(e) = feed_meta.save(feed_name) {
            tracing::error!("Failed to save feed meta '{}': {}", feed_name, e);
        }
        tracing::info!(
            "Feed '{}' processed: {} total",
            feed_name,
            feed_meta.article_ids.len()
        );
        self.monitor
            .write()
            .await
            .set_status(feed_name, FeedStatus::Done);
    }

    /// Processes every enabled feed, one spawned task per feed, and waits for
    /// all of them to finish. Join errors are logged.
    pub async fn process_all(&self) {
        // Copy out the feed list inside a scope so the config read lock is
        // released before any feed work starts.
        let feeds: Vec<_> = {
            let cfg = self.config.read().await;
            cfg.feeds
                .iter()
                .filter(|f| f.enabled)
                .map(|f| (f.name.clone(), f.url.clone()))
                .collect()
        };

        let handles: Vec<_> = feeds
            .into_iter()
            .map(|(name, url)| {
                let scheduler = self.clone();
                tokio::spawn(async move {
                    scheduler.process_feed(&name, &url).await;
                })
            })
            .collect();

        for handle in handles {
            if let Err(e) = handle.await {
                tracing::error!("Feed task join error: {}", e);
            }
        }
    }

    /// Runs `process_all` forever, sleeping between passes.
    ///
    /// The pause is the smallest `interval_secs` among the enabled feeds,
    /// re-read from the configuration after every pass, or 300 seconds when
    /// no feed is enabled.
    pub async fn run_loop(self: Arc<Self>) {
        loop {
            self.process_all().await;
            let interval = {
                let cfg = self.config.read().await;
                cfg.feeds
                    .iter()
                    .filter(|f| f.enabled)
                    .map(|f| f.interval_secs)
                    .min()
                    .unwrap_or(300)
            };
            tokio::time::sleep(std::time::Duration::from_secs(interval)).await;
        }
    }
}
/// Builds a fresh translation log entry for `title` in the `Started` state.
///
/// The entry gets a newly generated UUID as its id and the current UTC time
/// (RFC 3339) as its timestamp. Token counts start unset and the streamed
/// text starts empty.
fn mlog(title: &str, stage: TranslationStage, model: &str) -> TranslationLog {
    let id = Uuid::new_v4().to_string();
    let timestamp = chrono::Utc::now().to_rfc3339();
    let article_title = String::from(title);
    let model = String::from(model);
    TranslationLog {
        id,
        timestamp,
        article_title,
        stage,
        status: LogStatus::Started,
        model,
        prompt_tokens: None,
        completion_tokens: None,
        streamed_text: String::new(),
    }
}
/// Returns a callback that streams LLM output chunks into the log entry
/// `lid` of feed `feed`.
///
/// Each chunk handed to the callback is appended to the log's
/// `streamed_text`, and the log status is set to `Streaming` with the text
/// accumulated so far.
///
/// Chunks are queued on an unbounded channel and applied by one background
/// task, so they are appended in the order the callback received them.
/// Spawning a separate task per chunk would let the tasks race for the
/// monitor's write lock and interleave the text. The background task ends
/// once the returned callback is dropped and the queue has drained.
///
/// A chunk that is applied after the log was marked `Completed` or `Failed`
/// is still appended to the text but never reverts the status to `Streaming`.
///
/// Must be called from within a Tokio runtime, because the consumer task is
/// spawned immediately.
fn mtok(monitor: Arc<RwLock<Monitor>>, feed: String, lid: String) -> impl FnMut(&str) {
    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<String>();
    tokio::task::spawn(async move {
        while let Some(chunk) = rx.recv().await {
            monitor.write().await.update_log(&feed, &lid, |log| {
                log.streamed_text.push_str(&chunk);
                // Keep a terminal status: the caller may finish the log
                // before the last queued chunk has been applied.
                if !matches!(log.status, LogStatus::Completed | LogStatus::Failed(_)) {
                    log.status = LogStatus::Streaming {
                        tokens: log.streamed_text.clone(),
                    };
                }
            });
        }
    });
    move |t: &str| {
        // Sending only fails when the consumer task is gone (for example
        // during runtime shutdown); streaming updates are best-effort, so
        // the chunk is dropped in that case.
        let _ = tx.send(t.to_string());
    }
}
/// Turns one fetched article into a storable `Article`, translating and
/// summarizing it through the LLM when its language calls for it.
///
/// * `feed_name` - feed the article belongs to; used for monitor updates.
/// * `raw` - the fetched article; consumed.
/// * `tc` - LLM provider settings used for the translation request.
/// * `target` - target language passed to the language checks and the LLM.
/// * `semaphore` - limits concurrent LLM requests; a permit is held for the
///   duration of the request.
/// * `monitor` - receives the article start, log entries and token usage.
///
/// When neither the title nor the (non-empty) content needs translation the
/// article is returned as-is, without contacting the LLM.
///
/// # Errors
///
/// Returns the error from `translate_and_summarize` after marking the log
/// entry as failed.
///
/// # Panics
///
/// Panics if the semaphore has been closed.
async fn process_single_article(
    feed_name: &str,
    raw: crate::rss::fetch::RawArticle,
    tc: &crate::config::LlmProviderConfig,
    target: &str,
    semaphore: Arc<Semaphore>,
    monitor: Arc<RwLock<Monitor>>,
) -> Result<Article, crate::error::AppError> {
    // Take the article apart once; every field used below is moved into the
    // resulting `Article` at most once.
    let crate::rss::fetch::RawArticle {
        title,
        link,
        content,
        guid,
        published_at,
        author,
        categories,
        media_urls,
        ..
    } = raw;

    monitor.write().await.start_article(feed_name, &title, 1);

    // Prefer the language detected from the body; fall back to the title.
    let source_lang = match crate::lang::detect(&content) {
        Some(lang) => Some(lang),
        None => crate::lang::detect(&title),
    };
    let body_needs_translation =
        !content.is_empty() && crate::lang::needs_translation(&content, target);
    let title_needs_translation = crate::lang::needs_translation(&title, target);

    // Values that depend only on the raw article and are identical on both
    // the pass-through and the translated path.
    let enclosure = media_urls.first().map(|m| Enclosure {
        url: m.url.clone(),
        content_type: m.content_type.clone(),
        length: m.length,
    });
    let published_at_rfc2822 = chrono::DateTime::parse_from_rfc2822(&published_at)
        .ok()
        .map(|dt| dt.to_rfc2822());

    if !(body_needs_translation || title_needs_translation) {
        // Pass-through: the stored text equals the original text.
        let id = guid.unwrap_or_else(|| Uuid::new_v4().to_string());
        return Ok(Article {
            id,
            feed_name: feed_name.to_string(),
            title: title.clone(),
            original_title: title,
            link,
            content: content.clone(),
            original_content: content,
            summary: None,
            translated: false,
            translated_title: false,
            source_lang,
            published_at,
            published_at_rfc2822,
            processed_at: chrono::Utc::now().to_rfc3339(),
            author,
            categories,
            translation_model: None,
            translation_tokens: None,
            enclosure,
        });
    }

    let model = tc.model.clone();
    // Held until the function returns, bounding concurrent LLM requests.
    let _llm_permit = semaphore.acquire().await.unwrap();

    let log_entry = mlog(&title, TranslationStage::TranslateAndSummarize, &model);
    let lid = log_entry.id.clone();
    monitor.write().await.add_log(feed_name, log_entry);
    let on_token = mtok(monitor.clone(), feed_name.to_string(), lid.clone());

    let outcome = crate::llm::translate_summarize::translate_and_summarize(
        tc, &title, &content, target, on_token,
    )
    .await;
    let (result, parsed) = match outcome {
        Ok(pair) => pair,
        Err(e) => {
            monitor.write().await.update_log(feed_name, &lid, |l| {
                l.status = LogStatus::Failed(e.to_string());
            });
            return Err(e);
        }
    };

    let translated_title = parsed.title.is_some();
    let translated_content = parsed.content.is_some();
    let prompt_tokens = result.usage.prompt_tokens;
    let completion_tokens = result.usage.completion_tokens;

    monitor.write().await.update_log(feed_name, &lid, |l| {
        l.status = LogStatus::Completed;
        l.prompt_tokens = Some(prompt_tokens);
        l.completion_tokens = Some(completion_tokens);
    });
    monitor
        .write()
        .await
        .add_token_usage(feed_name, &model, prompt_tokens, completion_tokens);

    // Any part the LLM did not return keeps its original text.
    let final_title = match parsed.title {
        Some(text) => text,
        None => title.clone(),
    };
    let final_content = match parsed.content {
        Some(text) => text,
        None => content.clone(),
    };
    let translation_model = (translated_content || translated_title).then(|| model.clone());

    Ok(Article {
        id: guid.unwrap_or_else(|| Uuid::new_v4().to_string()),
        feed_name: feed_name.to_string(),
        title: final_title,
        original_title: title,
        link,
        content: final_content,
        original_content: content,
        summary: parsed.summary,
        translated: translated_content,
        translated_title,
        source_lang,
        published_at,
        published_at_rfc2822,
        processed_at: chrono::Utc::now().to_rfc3339(),
        author,
        categories,
        translation_model,
        translation_tokens: Some(prompt_tokens + completion_tokens),
        enclosure,
    })
}