use std::time::Instant;
use log::{error, info};
use tokio::sync::mpsc::{
error::{SendError, TryRecvError},
Receiver, Sender,
};
use ulid::Ulid;
use crate::message_queue::UpdateExecutionRecordQuery;
use crate::task::{TaskExecutionData, TaskResultData, TaskStatus};
/// Activity state a worker reports about itself.
///
/// The type is a plain fieldless enum, so it is `Copy` and fully comparable
/// (`Eq`), which lets callers store and compare states without borrowing.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkerState {
    /// The worker is not executing a task.
    Idle,
    /// The worker is executing a task.
    Running,
}
/// Message a worker sends on its status channel to announce its current state.
#[derive(Debug)]
pub struct WorkerStatusReport {
/// Identifier of the worker that produced this report.
pub id: Ulid,
/// State the worker is reporting.
pub worker_state: WorkerState,
}
/// Pairs a task sender with a [`WorkerState`].
///
/// NOTE(review): this type is not used anywhere in this file; presumably it is
/// the dispatcher-side view of one worker (the channel used to hand it tasks
/// plus its last known state) — confirm against the callers.
#[derive(Debug)]
pub struct WorkerStateMachine {
/// Sending half of a channel carrying task execution requests.
pub task_tx: Sender<TaskExecutionData>,
/// Worker state tracked alongside the sender.
pub worker_state: WorkerState,
}
/// A task worker: receives task execution requests on a channel, runs them,
/// and publishes status reports and execution-record updates on two other
/// channels.
pub struct Worker {
// Unique identifier, generated with `Ulid::new()` when the worker is built.
id: Ulid,
/// Human-readable name; used (upper-cased) as the prefix of log lines.
pub name: String,
// Receiving half of the channel that delivers tasks to execute.
task_rx: Receiver<TaskExecutionData>,
// Channel on which `WorkerStatusReport`s are sent.
worker_state_tx: Sender<WorkerStatusReport>,
// Channel on which task execution-record updates are sent.
task_update_tx: Sender<UpdateExecutionRecordQuery>,
}
impl Worker {
pub fn new(
name: String,
task_rx: tokio::sync::mpsc::Receiver<TaskExecutionData>,
status_report_tx: Sender<WorkerStatusReport>,
task_update_tx: Sender<UpdateExecutionRecordQuery>,
) -> Worker {
Worker {
id: Ulid::new(),
name: name,
task_rx,
worker_state_tx: status_report_tx,
task_update_tx,
}
}
pub async fn work(&mut self) {
loop {
let task_request = match self.task_rx.try_recv() {
Ok(task_request) => {
info!(
"[{}] received task request: {}",
self.name().to_uppercase(),
task_request.execution_id
);
task_request
}
Err(e) => match e {
TryRecvError::Empty => continue,
TryRecvError::Disconnected => {
error!(
"[{}] Task request channel closed!",
self.name().to_uppercase()
);
break;
}
},
};
if let Err(e) = self.send_worker_update(WorkerState::Running).await {
error!(
"Failed to update worker status: {}, stopping worker!",
e.to_string()
);
break;
}
if let Err(e) = self
.send_task_update(UpdateExecutionRecordQuery {
request_id: task_request.request_id.clone(),
execution_id: task_request.execution_id.clone(),
duration_seconds: 0,
is_complete: false,
status: TaskStatus::Running as i32,
result: String::from(""),
})
.await
{
error!("Failed to update task execution record: {:?}", e);
break;
};
let now = Instant::now();
let TaskResultData {
result_status,
result_data,
} = task_request.task.execute();
let elapsed = now.elapsed();
info!(
"[{}] Task execution {} complete in: {:.2?}",
self.name().to_uppercase(),
task_request.execution_id,
elapsed
);
if let Err(e) = self
.send_task_update(UpdateExecutionRecordQuery {
request_id: task_request.request_id.clone(),
execution_id: task_request.execution_id.clone(),
duration_seconds: elapsed.as_secs() as u64,
is_complete: true,
status: result_status.clone() as i32,
result: result_data.unwrap_or_default().to_string(),
})
.await
{
error!("Failed to update task execution record: {:?}", e);
break;
};
if let Err(e) = self.send_worker_update(WorkerState::Idle).await {
error!(
"Failed to update worker status: {}, stopping worker!",
e.to_string()
);
break;
}
}
}
async fn send_worker_update(&self, worker_state: WorkerState) -> Result<(), SendError<WorkerStatusReport>> {
self.worker_state_tx
.send(WorkerStatusReport {
id: self.id(),
worker_state: worker_state,
})
.await
}
async fn send_task_update(
&self,
task_update: UpdateExecutionRecordQuery,
) -> Result<(), SendError<UpdateExecutionRecordQuery>> {
self.task_update_tx.send(task_update).await
}
pub fn id(&self) -> Ulid {
self.id.clone()
}
pub fn name(&self) -> String {
self.name.clone()
}
}