use super::{
Error,
db::{self, AddingNewsOutput, InputNews, SourceFeedbackSignal, StorageError, uuid::Uuid},
inform::{self, InformantError, InformantTrait as _},
ml,
};
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum Event {
NewsUpdated { source_id: Uuid, updates: AddingNewsOutput },
InformantError { source_id: Uuid, error: String },
}
impl super::Representative {
async fn trigger_informant(
&self,
network: super::net::InterfaceType,
informant_parameters: inform::Parameters,
) -> Result<Vec<InputNews>, InformantError> {
#[cfg(feature = "informant_feedrs")]
let network = self.network.new_client(network);
match informant_parameters {
#[cfg(any(test, not(feature = "_informant")))]
inform::Parameters::TestPlaceholder => {
unimplemented!()
}
#[cfg(feature = "informant_feedrs")]
inform::Parameters::FeedRs(parameters) => Ok(inform::feedrs::Informant::new(network).execute(parameters).await?),
}
}
#[tracing::instrument(skip(self), level = tracing::Level::DEBUG)]
pub async fn trigger_informant_by_source(&self, source: Uuid) -> Result<AddingNewsOutput, Error> {
let source = self.storage.get_source(source).await?;
let items = self
.trigger_informant(
super::net::InterfaceType::try_from(source.network)?,
serde_json::from_value::<inform::Parameters>(source.informant_parameters)
.map_err(inform::InformantError::from)?,
)
.await?;
Ok(self.storage.add_news(source.id, items).await?)
}
#[tracing::instrument(skip(self), level = tracing::Level::DEBUG)]
async fn trigger_informant_with_telemetry_by_source(&self, source: Uuid) -> Result<(), Error> {
let source = self.storage.get_source(source).await?;
let started_at = time::OffsetDateTime::now_utc();
let input_items = self
.trigger_informant(
super::net::InterfaceType::try_from(source.network)?,
serde_json::from_value::<inform::Parameters>(source.informant_parameters)
.map_err(inform::InformantError::from)?,
)
.await;
match (input_items, time::OffsetDateTime::now_utc() - started_at) {
(Ok(input_items), duration) => {
let addition_output = self.storage.add_news(source.id, input_items).await?;
self.storage
.store_source_feedback_signal(SourceFeedbackSignal::FetchSignal {
source: source.id,
done_at: started_at,
duration,
failure_code: None,
new_items_count: addition_output.new.len().try_into().map_err(StorageError::from)?,
latest_publish_at: addition_output.latest_publish,
oldest_publish_at: addition_output.oldest_publish,
})
.await?;
self.send_event(move || Event::NewsUpdated {
source_id: source.id,
updates: addition_output,
});
}
(Err(error), duration) => {
tracing::error!(?source.id, ?error, "Informant execution failure");
self.storage
.store_source_feedback_signal(SourceFeedbackSignal::FetchSignal {
source: source.id,
done_at: started_at,
duration,
failure_code: Some(error.kind()),
new_items_count: 0,
oldest_publish_at: None,
latest_publish_at: None,
})
.await?;
self.send_event(move || Event::InformantError {
source_id: source.id,
error: error.to_string(),
});
}
}
Ok(())
}
pub async fn start_scheduler(
&self,
mut storage_notify_receiver: tokio::sync::mpsc::Receiver<db::ScheduleNotify>,
ml_config: ml::scheduler::Config,
) -> Result<(), Error> {
let mut scheduler = ml::scheduler::Calender::new(ml_config);
let mut rng = rand::rng();
let notify_new_enabled_source = tokio::sync::Notify::new();
let (run_task_sender, mut run_task_receiver) = tokio::sync::mpsc::unbounded_channel::<Vec<Uuid>>();
let (end_task_sender, mut end_task_receiver) = tokio::sync::mpsc::unbounded_channel::<Vec<Uuid>>();
{
let initial_enabled_sources = self
.storage
.get_all_sources(Some(true))
.await?
.into_iter()
.map(|s| s.id)
.collect::<Vec<Uuid>>();
end_task_sender.send(initial_enabled_sources).unwrap();
}
tokio::select! {
result = async {
loop {
let tasks = run_task_receiver.recv().await.unwrap();
for source in &tasks {
self.trigger_informant_with_telemetry_by_source(*source).await?;
}
end_task_sender.send(tasks).unwrap();
}
#[expect(unused, reason = "for type inference")]
Ok::<(), Error>(())
} => { result? },
result = async {
loop {
tracing::debug!(?scheduler);
tokio::select! {
Some(notification) = storage_notify_receiver.recv() => {
match notification {
db::ScheduleNotify::SourceEnabled(source) => {
tracing::debug!(source.id=?source, "scheduling newly enabled source");
scheduler.schedule_source(source, &mut rng).await?;
notify_new_enabled_source.notify_one();
}
db::ScheduleNotify::SourceDisabled(source) => {
tracing::debug!(source.id=?source, "unscheduling disabled source");
scheduler.unschedule_fetch(source)?;
}
}
},
Some(tasks) = end_task_receiver.recv() => {
for source in tasks {
scheduler.schedule_source(source, &mut rng).await?;
}
},
result = async {
let now = time::OffsetDateTime::now_utc();
match scheduler.next_time() {
Some(next_time) => {
let sleep_duration = next_time - now;
if sleep_duration > time::Duration::ZERO {
tokio::time::sleep(std::time::Duration::from_millis(sleep_duration.unsigned_abs().as_millis() as u64))
.await;
}
let due_tasks = scheduler.pop_due(time::OffsetDateTime::now_utc());
run_task_sender.send(due_tasks).unwrap();
}
None => {
tracing::info!("waiting, no enabled source to schedule");
notify_new_enabled_source.notified().await;
}
}
Ok::<(), Error>(())
} => { result? }
}
}
#[expect(unused, reason = "for type inference")]
Ok::<(), Error>(())
} => { result? }
}
Ok(())
}
pub fn open_events_channel(&mut self) -> tokio::sync::mpsc::UnboundedReceiver<Event> {
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<Event>();
self.events_sender = Some(sender);
receiver
}
pub fn close_events_channel(&mut self) {
self.events_sender = None;
}
fn send_event<F>(&self, event: F)
where
F: FnOnce() -> Event,
{
if let Some(events_sender) = &self.events_sender {
let event = event();
tracing::debug!(?event, "sending to events channel");
if let Err(error) = events_sender.send(event) {
tracing::warn!(
event=?error.0,
"Failed to send event due to the receiver being closed. Please make sure to close the sender as well."
);
}
}
}
}