1use crate::proto::worker::common;
2use crate::publisher::Publisher;
3use async_nats::{service::ServiceExt, Client, HeaderMap};
4use bytes::Bytes;
5use futures::StreamExt;
6use prost::Message;
7use std::{collections::HashMap, time::Duration};
8use ulid::Ulid;
9use std::path::PathBuf;
10use std::fs;
11
12#[derive(Clone)]
13pub struct WorkerConfig {
14 pub worker_id: String,
15 pub worker_type: common::worker::Type,
16 pub heartbeat_interval: Duration,
17}
18
19const CONFIG_DIR: &str = "config";
20const WORKER_ID_FILE: &str = "worker_id";
21
22impl WorkerConfig {
23 fn get_worker_id_path() -> PathBuf {
24 let mut path = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
25 path.push(CONFIG_DIR);
26 path.push(WORKER_ID_FILE);
27 path
28 }
29
30 fn ensure_config_dir() -> std::io::Result<()> {
31 let mut path = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
32 path.push(CONFIG_DIR);
33 fs::create_dir_all(path)
34 }
35
36 fn load_or_generate_id() -> String {
37 let _ = Self::ensure_config_dir();
38 let path = Self::get_worker_id_path();
39
40 if let Ok(existing_id) = fs::read_to_string(&path) {
41 if !existing_id.trim().is_empty() {
42 return existing_id.trim().to_string();
43 }
44 }
45
46 let new_id = Ulid::new().to_string();
47 let _ = fs::write(&path, &new_id);
48 new_id
49 }
50
51 pub fn new(worker_type: common::worker::Type) -> Self {
52 Self {
53 worker_id: Self::load_or_generate_id(),
54 worker_type,
55 heartbeat_interval: Duration::from_secs(5),
56 }
57 }
58}
59
60pub struct Worker {
61 client: Client,
62 config: WorkerConfig,
63 publisher: Publisher,
64}
65
66impl Worker {
67 pub fn new(client: Client, config: WorkerConfig) -> Self {
68 let publisher = Publisher::new(client.clone(), config.worker_id.clone());
69 Self {
70 client,
71 config,
72 publisher,
73 }
74 }
75
76 pub fn get_publisher(&self) -> &Publisher {
77 &self.publisher
78 }
79
80 async fn setup_endpoint(&mut self) -> Result<(), String> {
81 let mut metadata = HashMap::new();
82 metadata.insert("worker_id".to_string(), self.config.worker_id.clone());
83
84 let service = self.client
85 .service_builder()
86 .description("HDP Worker Service")
87 .metadata(metadata)
88 .start(
89 "HDP", &String::from("1.0.0")
91 )
92 .await
93 .map_err(|e| e.to_string())?;
94
95 let service_group = service.group(format!("worker.{}", format!("{:?}", self.config.worker_type).to_lowercase()));
96 let config_endpoint = service_group.endpoint("config").await.map_err(|e| e.to_string())?;
97
98
99 tokio::spawn(async move {
100 let mut endpoint = config_endpoint;
101 while let Some(request) = endpoint.next().await {
102 println!("Received config update: {:?}", request);
103 let _ = request.respond(Ok(Bytes::new())).await;
104 }
105 });
106
107 Ok(())
108 }
109
110 async fn start_heartbeat(&self) {
111 let client = self.client.clone();
112 let worker_id = self.config.worker_id.clone();
113 let interval = self.config.heartbeat_interval;
114
115 tokio::spawn(async move {
116 let mut interval = tokio::time::interval(interval);
117 loop {
118 interval.tick().await;
119 let mut headers = HeaderMap::new();
120 headers.insert("worker_id", worker_id.as_str());
121
122 if let Err(e) = client
123 .request_with_headers("worker.core.heartbeat", headers, Bytes::new())
124 .await
125 {
126 println!("Failed to send heartbeat: {}", e);
127 }
128 }
129 });
130 }
131
132 async fn register(&self) -> Result<(), String> {
133 let registration = common::Worker {
134 id: self.config.worker_id.clone(),
135 name: None,
136 r#type: self.config.worker_type as i32,
137 inventory: None,
138 last_seen: None,
139 };
140 let data = registration.encode_to_vec();
141 self
142 .client
143 .request("worker.core.register", data.into())
144 .await
145 .map_err(|e| e.to_string())?;
146 println!("Registered with server");
147 Ok(())
148 }
149
150 pub async fn setup(&mut self) -> Result<(), String> {
151 println!("Setting up endpoints...");
152 self.setup_endpoint().await?;
153 println!("Registering worker...");
154 self.register().await?;
155 println!("Starting heartbeat...");
156 self.start_heartbeat().await;
157 Ok(())
158 }
159
160 pub fn get_client(&self) -> Client {
161 self.client.clone()
162 }
163
164 pub fn get_config(&self) -> &WorkerConfig {
165 &self.config
166 }
167}