subliminal 0.0.4

Base crate for subliminal microservices project
Documentation
use std::time::Instant;

use log::{error, info};
use tokio::sync::mpsc::{
    error::{SendError, TryRecvError},
    Receiver, Sender,
};
use ulid::Ulid;

use crate::message_queue::UpdateExecutionRecordQuery;

use crate::task::{TaskExecutionData, TaskResultData, TaskStatus};

#[derive(PartialEq, Debug)]
pub enum WorkerState {
    Idle,
    Running,
}

#[derive(Debug)]
pub struct WorkerStatusReport {
    pub id: Ulid,
    pub worker_state: WorkerState,
}

#[derive(Debug)]
pub struct WorkerStateMachine {
    pub task_tx: Sender<TaskExecutionData>,
    pub worker_state: WorkerState,
}


/// Represents a worker that takes tasks from the dispatcher and executes them, reporting relevant updates back to the dispatcher.
/// Can have one or more workers per node.
pub struct Worker {
    // ID of the worker
    id: Ulid,

    // Name of the worker
    pub name: String,

    // Channel to receive tasks from the dispatcher
    task_rx: Receiver<TaskExecutionData>,

    // Channel to send worker status updates to the dispatcher
    worker_state_tx: Sender<WorkerStatusReport>,

    // Channel to send task execution updates to the dispatcher
    task_update_tx: Sender<UpdateExecutionRecordQuery>,
}

impl Worker {
    /// Create a new worker
    pub fn new(
        name: String,
        task_rx: tokio::sync::mpsc::Receiver<TaskExecutionData>,
        status_report_tx: Sender<WorkerStatusReport>,
        task_update_tx: Sender<UpdateExecutionRecordQuery>,
    ) -> Worker {
        Worker {
            id: Ulid::new(),
            name: name,
            task_rx,
            worker_state_tx: status_report_tx,
            task_update_tx,
        }
    }

    /// Main worker loop. Waits for a job, executes it, and reports back to the dispatcher
    pub async fn work(&mut self) {
        loop {
            // Wait for a job
            let task_request = match self.task_rx.try_recv() {
                Ok(task_request) => {
                    info!(
                        "[{}] received task request: {}",
                        self.name().to_uppercase(),
                        task_request.execution_id
                    );
                    task_request
                }
                Err(e) => match e {
                    TryRecvError::Empty => continue,
                    TryRecvError::Disconnected => {
                        error!(
                            "[{}] Task request channel closed!",
                            self.name().to_uppercase()
                        );
                        break;
                    }
                },
            };

            // Update the worker status to RUNNING using the dispatcher's channel
            if let Err(e) = self.send_worker_update(WorkerState::Running).await {
                error!(
                    "Failed to update worker status: {}, stopping worker!",
                    e.to_string()
                );
                break;
            }

            // We update the task execution record independently of the worker status through a separate channel
            if let Err(e) = self
                .send_task_update(UpdateExecutionRecordQuery {
                    request_id: task_request.request_id.clone(),
                    execution_id: task_request.execution_id.clone(),
                    duration_seconds: 0,
                    is_complete: false,
                    status: TaskStatus::Running as i32,
                    result: String::from(""),
                })
                .await
            {
                error!("Failed to update task execution record: {:?}", e);
                break;
            };

            // Execute the job and get the result
            let now = Instant::now();
            let TaskResultData {
                result_status,
                result_data,
            } = task_request.task.execute();
            let elapsed = now.elapsed();
            info!(
                "[{}] Task execution {} complete in: {:.2?}",
                self.name().to_uppercase(),
                task_request.execution_id,
                elapsed
            );

            // Update the task execution record with the result
            // TODO: We could let the task execute() function update the record itself,
            //    but that would require the task to have access to the channel. TBD.
            if let Err(e) = self
                .send_task_update(UpdateExecutionRecordQuery {
                    request_id: task_request.request_id.clone(),
                    execution_id: task_request.execution_id.clone(),
                    duration_seconds: elapsed.as_secs() as u64,
                    is_complete: true,
                    status: result_status.clone() as i32,
                    result: result_data.unwrap_or_default().to_string(),
                })
                .await
            {
                error!("Failed to update task execution record: {:?}", e);
                break;
            };

            // Set the worker back to idle
            if let Err(e) = self.send_worker_update(WorkerState::Idle).await {
                error!(
                    "Failed to update worker status: {}, stopping worker!",
                    e.to_string()
                );
                break;
            }
        }
    }

    /// Send the new worker state report to the dispatcher
    async fn send_worker_update(&self, worker_state: WorkerState) -> Result<(), SendError<WorkerStatusReport>> {
        self.worker_state_tx
            .send(WorkerStatusReport {
                id: self.id(),
                worker_state: worker_state,
            })
            .await
    }

    /// Send a task update to the dispatcher
    async fn send_task_update(
        &self,
        task_update: UpdateExecutionRecordQuery,
    ) -> Result<(), SendError<UpdateExecutionRecordQuery>> {
        self.task_update_tx.send(task_update).await
    }

    /// Get the worker ID
    pub fn id(&self) -> Ulid {
        self.id.clone()
    }

    /// Get the worker name
    pub fn name(&self) -> String {
        self.name.clone()
    }
}