// subliminal 0.0.4
//
// Base crate for subliminal microservices project
// Documentation
use std::thread;

use log::{debug, error};
use serde::de::DeserializeOwned;
use tokio::sync::mpsc::Sender;

use crate::{
    message_queue::{errors::MessageQueueError, MessageQueue, TaskExecutionEnqueued},
    task::{Task, TaskExecutionData},
};

/// This function represents a consumer.
/// It's job is to pop serialized task requests from the message queue, deserialize them, and place them into the task channel.
/// From there, the dispatcher will pick them up and dispatch them to the workers as needed.
pub async fn consume<T>(mut message_queue: MessageQueue, task_request_tx: Sender<TaskExecutionData>)
where
    T: DeserializeOwned + Task + Send + Sync + 'static,
{
    loop {
        thread::sleep(std::time::Duration::from_secs(1));

        // Get a generic enqueued task execution from the message queue
        let message: Result<Option<TaskExecutionEnqueued>, MessageQueueError> =
            message_queue.pop_task_execution().await;

        debug!("Got message: {:?}", message);
        match message {
            Ok(Some(msg)) => {
                let TaskExecutionEnqueued {
                    topic: _,
                    record,
                    task_data,
                } = msg;

                // task_data contains serialized task data, so deserialize it into the desired task type
                match serde_json::from_str::<T>(&task_data) {
                    Ok(task) => {
                        debug!("Task deserialized successfully: {}", record.id);

                        // Send the deserialized task data to the dispatcher for execution
                        if let Err(e) = task_request_tx
                            .send(TaskExecutionData {
                                request_id: record.parent_request_id,
                                execution_id: record.id,
                                task: Box::new(task),
                            })
                            .await
                        {
                            error!("Failed to send task request: {}", e);
                            break;
                        }
                    }
                    Err(e) => {
                        // TODO: Dead letter queue or send an update to the execution record indicating failure
                        error!("Failed to deserialize task: {}", e);
                        continue;
                    }
                };
            }
            Ok(None) => {
                continue;
            }
            Err(e) => {
                error!("Error popping task execution: {:?}", e);
                continue;
            }
        }
    }
}