use crate::proto::worker::registration;
use crate::publisher::Publisher;
use async_nats::{service::ServiceExt, Client, HeaderMap};
use bytes::Bytes;
use futures::StreamExt;
use prost::Message;
use std::{collections::HashMap, time::Duration};
use ulid::Ulid;
#[derive(Clone)]
pub struct WorkerConfig {
pub worker_id: String,
pub worker_type: registration::WorkerType,
pub heartbeat_interval: Duration,
}
impl WorkerConfig {
pub fn new(worker_type: registration::WorkerType) -> Self {
Self {
worker_id: Ulid::new().to_string(),
worker_type,
heartbeat_interval: Duration::from_secs(5),
}
}
}
pub struct Worker {
client: Client,
config: WorkerConfig,
publisher: Publisher,
}
impl Worker {
pub fn new(client: Client, config: WorkerConfig) -> Self {
let publisher = Publisher::new(client.clone(), config.worker_id.clone());
Self {
client,
config,
publisher,
}
}
pub fn get_publisher(&self) -> &Publisher {
&self.publisher
}
async fn setup_endpoint(&mut self) -> Result<(), String> {
let mut metadata = HashMap::new();
metadata.insert("worker_id".to_string(), self.config.worker_id.clone());
let service = self.client
.service_builder()
.description("HDP Worker Service")
.metadata(metadata)
.start(
"HDP", &String::from("1.0.0")
)
.await
.map_err(|e| e.to_string())?;
let service_group = service.group(format!("worker.{}", format!("{:?}", self.config.worker_type).to_lowercase()));
let config_endpoint = service_group.endpoint("config").await.map_err(|e| e.to_string())?;
tokio::spawn(async move {
let mut endpoint = config_endpoint;
while let Some(request) = endpoint.next().await {
println!("Received config update: {:?}", request);
let _ = request.respond(Ok(Bytes::new())).await;
}
});
Ok(())
}
async fn start_heartbeat(&self) {
let client = self.client.clone();
let worker_id = self.config.worker_id.clone();
let interval = self.config.heartbeat_interval;
tokio::spawn(async move {
let mut interval = tokio::time::interval(interval);
loop {
interval.tick().await;
let mut headers = HeaderMap::new();
headers.insert("worker_id", worker_id.as_str());
if let Err(e) = client
.request_with_headers("HDP.worker.core.heartbeat", headers, Bytes::new())
.await
{
println!("Failed to send heartbeat: {}", e);
}
}
});
}
async fn register(&self) -> Result<(), String> {
let registration = registration::Payload {
id: self.config.worker_id.clone(),
r#type: self.config.worker_type as i32,
inventory: None,
};
let data = registration.encode_to_vec();
self
.client
.request("HDP.worker.core.register", data.into())
.await
.map_err(|e| e.to_string())?;
println!("Registered with server");
Ok(())
}
pub async fn setup(&mut self) -> Result<(), String> {
println!("Setting up endpoints...");
self.setup_endpoint().await?;
println!("Registering worker...");
self.register().await?;
println!("Starting heartbeat...");
self.start_heartbeat().await;
Ok(())
}
pub fn get_client(&self) -> Client {
self.client.clone()
}
pub fn get_config(&self) -> &WorkerConfig {
&self.config
}
}