use std::thread;
use log::{debug, error};
use serde::de::DeserializeOwned;
use tokio::sync::mpsc::Sender;
use crate::{
message_queue::{errors::MessageQueueError, MessageQueue, TaskExecutionEnqueued},
task::{Task, TaskExecutionData},
};
pub async fn consume<T>(mut message_queue: MessageQueue, task_request_tx: Sender<TaskExecutionData>)
where
T: DeserializeOwned + Task + Send + Sync + 'static,
{
loop {
thread::sleep(std::time::Duration::from_secs(1));
let message: Result<Option<TaskExecutionEnqueued>, MessageQueueError> =
message_queue.pop_task_execution().await;
debug!("Got message: {:?}", message);
match message {
Ok(Some(msg)) => {
let TaskExecutionEnqueued {
topic: _,
record,
task_data,
} = msg;
match serde_json::from_str::<T>(&task_data) {
Ok(task) => {
debug!("Task deserialized successfully: {}", record.id);
if let Err(e) = task_request_tx
.send(TaskExecutionData {
request_id: record.parent_request_id,
execution_id: record.id,
task: Box::new(task),
})
.await
{
error!("Failed to send task request: {}", e);
break;
}
}
Err(e) => {
error!("Failed to deserialize task: {}", e);
continue;
}
};
}
Ok(None) => {
continue;
}
Err(e) => {
error!("Error popping task execution: {:?}", e);
continue;
}
}
}
}