use crate::config::Config;
use crate::llm::{summarize, translate};
use crate::monitor::{FeedStatus, LogStatus, Monitor, TranslationLog, TranslationStage};
use crate::rss::fetch;
use crate::storage::{Article, Enclosure, FeedData};
use std::sync::Arc;
use tokio::sync::{RwLock, Semaphore};
use uuid::Uuid;
/// Drives fetching, translation and summarization of the configured feeds.
///
/// Cloning is cheap: every field is an `Arc`, so all clones share the same
/// configuration, monitor and LLM concurrency limit.
#[derive(Clone)]
pub struct Scheduler {
    /// Limits how many LLM requests may be in flight at once; shared by every
    /// clone and every article task spawned from them.
    semaphore: Arc<Semaphore>,
    /// Shared application configuration (feeds, LLM providers, target language).
    config: Arc<RwLock<Config>>,
    /// Shared progress / log state updated while feeds are processed.
    monitor: Arc<RwLock<Monitor>>,
}
impl Scheduler {
pub fn new(config: Arc<RwLock<Config>>, monitor: Arc<RwLock<Monitor>>) -> Self {
let max_concurrent = config
.try_read()
.map(|c| c.llm.max_concurrent_requests)
.unwrap_or(3);
Scheduler {
config,
monitor,
semaphore: Arc::new(Semaphore::new(max_concurrent)),
}
}
pub async fn process_feed(&self, feed_name: &str, feed_url: &str) {
tracing::info!("Processing feed: {} ({})", feed_name, feed_url);
let start = std::time::Instant::now();
self.monitor.write().await.ensure_feed(feed_name);
self.monitor
.write()
.await
.set_status(feed_name, FeedStatus::Fetching);
let raw_articles = match fetch::fetch_feed(feed_url).await {
Ok(a) => a,
Err(e) => {
let ms = start.elapsed().as_millis() as u64;
tracing::error!("Fetch failed '{}': {}", feed_name, e);
self.monitor
.write()
.await
.finish_fetch(feed_name, ms, Some(&e.to_string()));
return;
}
};
self.monitor.write().await.finish_fetch(
feed_name,
start.elapsed().as_millis() as u64,
None,
);
let mut feed_data = match FeedData::load(feed_name) {
Ok(d) => d,
Err(e) => {
tracing::error!("Load failed: {}", e);
return;
}
};
let config = self.config.read().await.clone();
let new_articles: Vec<_> = raw_articles
.into_iter()
.filter(|a| !feed_data.contains_link(&a.link))
.collect();
if new_articles.is_empty() {
self.monitor
.write()
.await
.set_status(feed_name, FeedStatus::Done);
return;
}
let total = new_articles.len() as u32;
self.monitor.write().await.set_status(
feed_name,
FeedStatus::Translating {
completed: 0,
total,
in_progress: vec![],
},
);
let tc = config.llm.translation.clone();
let sc = config.llm.summary.clone();
let target = config.language.target.clone();
let semaphore = self.semaphore.clone();
let monitor = self.monitor.clone();
let feed_name_owned = feed_name.to_string();
let handles: Vec<_> = new_articles
.into_iter()
.map(|raw| {
let tc = tc.clone();
let sc = sc.clone();
let target = target.clone();
let semaphore = semaphore.clone();
let monitor = monitor.clone();
let feed_name = feed_name_owned.clone();
let title = raw.title.clone();
tokio::spawn(async move {
let result = process_single_article(
&feed_name,
raw,
&tc,
&sc,
&target,
semaphore,
monitor.clone(),
)
.await;
(feed_name, title, result)
})
})
.collect();
for handle in handles {
match handle.await {
Ok((feed_name, title, Ok(article))) => {
feed_data.articles.push(article);
monitor.write().await.complete_article(&feed_name, &title);
}
Ok((feed_name, title, Err(e))) => {
tracing::error!("Article processing failed '{}': {}", feed_name, e);
monitor.write().await.complete_article(&feed_name, &title);
}
Err(e) => {
tracing::error!("Task join error: {}", e);
}
}
}
feed_data
.articles
.sort_by(|a, b| b.published_at.cmp(&a.published_at));
if let Err(e) = feed_data.save(feed_name) {
tracing::error!("Save failed '{}': {}", feed_name, e);
}
tracing::info!(
"Feed '{}' processed: {} total",
feed_name,
feed_data.article_count()
);
self.monitor
.write()
.await
.set_status(feed_name, FeedStatus::Done);
}
pub async fn process_all(&self) {
let cfg = self.config.read().await;
let feeds: Vec<_> = cfg
.feeds
.iter()
.filter(|f| f.enabled)
.map(|f| (f.name.clone(), f.url.clone()))
.collect();
drop(cfg);
let handles: Vec<_> = feeds
.into_iter()
.map(|(name, url)| {
let scheduler = self.clone();
tokio::spawn(async move {
scheduler.process_feed(&name, &url).await;
})
})
.collect();
for handle in handles {
if let Err(e) = handle.await {
tracing::error!("Feed task join error: {}", e);
}
}
}
pub async fn run_loop(self: Arc<Self>) {
loop {
self.process_all().await;
let interval = {
let c = self.config.read().await;
c.feeds
.iter()
.filter(|f| f.enabled)
.map(|f| f.interval_secs)
.min()
.unwrap_or(300)
};
tokio::time::sleep(std::time::Duration::from_secs(interval)).await;
}
}
}
/// Builds a new monitor log entry, in the `Started` state, for one LLM call.
///
/// The entry gets a random v4 UUID and the current UTC time in RFC 3339 form. Token
/// counts are left empty and `streamed_text` starts blank.
fn mlog(title: &str, stage: TranslationStage, model: &str) -> TranslationLog {
    let id = Uuid::new_v4().to_string();
    let timestamp = chrono::Utc::now().to_rfc3339();
    TranslationLog {
        id,
        timestamp,
        stage,
        article_title: title.to_owned(),
        model: model.to_owned(),
        status: LogStatus::Started,
        streamed_text: String::new(),
        prompt_tokens: None,
        completion_tokens: None,
    }
}
/// Builds the per-token callback passed to the streaming LLM helpers.
///
/// Each streamed chunk is sent over an unbounded channel to a single background task.
/// That task appends the chunk to log `lid` of feed `feed` in the monitor. Using one
/// consumer, instead of one spawned task per token, keeps chunks in arrival order.
/// Coalescing whatever is already queued also cuts down on write-lock acquisitions.
///
/// The status is switched to `Streaming` only while the entry is still `Started` or
/// `Streaming`. Chunks that land late therefore cannot overwrite a `Completed` or
/// `Failed` status that the caller set after the request returned.
///
/// This must be called inside a Tokio runtime, because it spawns the consumer task. The
/// task ends once the returned callback is dropped and the queue has been drained.
fn mtok(monitor: Arc<RwLock<Monitor>>, feed: String, lid: String) -> impl FnMut(&str) {
    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<String>();
    tokio::task::spawn(async move {
        while let Some(first) = rx.recv().await {
            let mut chunk = first;
            // Merge everything already queued into a single lock acquisition.
            while let Ok(more) = rx.try_recv() {
                chunk.push_str(&more);
            }
            monitor.write().await.update_log(&feed, &lid, |log| {
                log.streamed_text.push_str(&chunk);
                if matches!(
                    log.status,
                    LogStatus::Started | LogStatus::Streaming { .. }
                ) {
                    log.status = LogStatus::Streaming {
                        tokens: log.streamed_text.clone(),
                    };
                }
            });
        }
    });
    move |t: &str| {
        // A send error means the consumer task is gone (e.g. the runtime is shutting
        // down). The streamed preview is best-effort, so the chunk is simply dropped.
        let _ = tx.send(t.to_string());
    }
}
async fn process_single_article(
feed_name: &str,
raw: crate::rss::fetch::RawArticle,
tc: &crate::config::LlmProviderConfig,
sc: &crate::config::LlmProviderConfig,
target: &str,
semaphore: Arc<Semaphore>,
monitor: Arc<RwLock<Monitor>>,
) -> Result<Article, crate::error::AppError> {
let title = raw.title.clone();
monitor.write().await.start_article(feed_name, &title, 1);
let source_lang = crate::lang::detect(&raw.content).or_else(|| crate::lang::detect(&raw.title));
let needs_ct = !raw.content.is_empty() && crate::lang::needs_translation(&raw.content, target);
let needs_tt = crate::lang::needs_translation(&raw.title, target);
let model = tc.model.clone();
let sum_model = sc.model.clone();
let mut total_translation_tokens: u32 = 0;
let (final_title, tt) = if needs_tt {
let _permit = semaphore.acquire().await.unwrap();
let log = mlog(&raw.title, TranslationStage::TranslatingTitle, &model);
let lid = log.id.clone();
monitor.write().await.add_log(feed_name, log);
let ot = mtok(monitor.clone(), feed_name.to_string(), lid.clone());
match translate::translate(tc, &raw.title, target, ot).await {
Ok(r) => {
let translated = r.text != raw.title;
monitor.write().await.update_log(feed_name, &lid, |l| {
if translated {
l.status = LogStatus::Completed;
l.prompt_tokens = Some(r.usage.prompt_tokens);
l.completion_tokens = Some(r.usage.completion_tokens);
} else {
l.status = LogStatus::Failed("model returned untranslated text".into());
}
});
if translated {
total_translation_tokens += r.usage.prompt_tokens + r.usage.completion_tokens;
monitor.write().await.add_token_usage(
feed_name,
&model,
r.usage.prompt_tokens,
r.usage.completion_tokens,
);
}
(
if translated {
r.text
} else {
raw.title.clone()
},
translated,
)
}
Err(e) => {
monitor.write().await.update_log(feed_name, &lid, |l| {
l.status = LogStatus::Failed(e.to_string());
});
(raw.title.clone(), false)
}
}
} else {
(raw.title.clone(), false)
};
let (final_content, ct) = if needs_ct {
let _permit = semaphore.acquire().await.unwrap();
let log = mlog(&raw.title, TranslationStage::TranslatingContent, &model);
let lid = log.id.clone();
monitor.write().await.add_log(feed_name, log);
let ot = mtok(monitor.clone(), feed_name.to_string(), lid.clone());
match translate::translate(tc, &raw.content, target, ot).await {
Ok(r) => {
let translated = r.text != raw.content;
monitor.write().await.update_log(feed_name, &lid, |l| {
if translated {
l.status = LogStatus::Completed;
l.prompt_tokens = Some(r.usage.prompt_tokens);
l.completion_tokens = Some(r.usage.completion_tokens);
} else {
l.status = LogStatus::Failed("model returned untranslated text".into());
}
});
if translated {
total_translation_tokens += r.usage.prompt_tokens + r.usage.completion_tokens;
monitor.write().await.add_token_usage(
feed_name,
&model,
r.usage.prompt_tokens,
r.usage.completion_tokens,
);
}
(
if translated {
r.text
} else {
raw.content.clone()
},
translated,
)
}
Err(e) => {
monitor.write().await.update_log(feed_name, &lid, |l| {
l.status = LogStatus::Failed(e.to_string());
});
(raw.content.clone(), false)
}
}
} else {
(raw.content.clone(), false)
};
let summary = {
let _permit = semaphore.acquire().await.unwrap();
let log = mlog(&final_title, TranslationStage::Summarizing, &sum_model);
let lid = log.id.clone();
monitor.write().await.add_log(feed_name, log);
let ot = mtok(monitor.clone(), feed_name.to_string(), lid.clone());
match summarize::summarize(sc, &final_title, &final_content, ot).await {
Ok(r) => {
monitor.write().await.update_log(feed_name, &lid, |l| {
l.status = LogStatus::Completed;
l.prompt_tokens = Some(r.usage.prompt_tokens);
l.completion_tokens = Some(r.usage.completion_tokens);
});
monitor.write().await.add_token_usage(
feed_name,
&sum_model,
r.usage.prompt_tokens,
r.usage.completion_tokens,
);
Some(r.text)
}
Err(e) => {
monitor.write().await.update_log(feed_name, &lid, |l| {
l.status = LogStatus::Failed(e.to_string());
});
None
}
}
};
let enclosure = raw.media_urls.first().map(|m| Enclosure {
url: m.url.clone(),
content_type: m.content_type.clone(),
length: m.length,
});
let translation_tokens = if total_translation_tokens > 0 {
Some(total_translation_tokens)
} else {
None
};
Ok(Article {
id: raw
.guid
.clone()
.unwrap_or_else(|| Uuid::new_v4().to_string()),
feed_name: feed_name.to_string(),
title: final_title,
original_title: raw.title,
link: raw.link,
content: final_content,
original_content: raw.content,
summary,
translated: ct,
translated_title: tt,
source_lang,
published_at: raw.published_at.clone(),
published_at_rfc2822: chrono::DateTime::parse_from_rfc2822(&raw.published_at)
.ok()
.map(|dt| dt.to_rfc2822()),
processed_at: chrono::Utc::now().to_rfc3339(),
author: raw.author,
categories: raw.categories,
translation_model: if ct || tt { Some(model.clone()) } else { None },
translation_tokens,
enclosure,
})
}