hdp_worker/
worker.rs

1use crate::proto::worker::common;
2use crate::publisher::Publisher;
3use async_nats::{service::ServiceExt, Client, HeaderMap};
4use bytes::Bytes;
5use futures::StreamExt;
6use prost::Message;
7use std::{collections::HashMap, time::Duration};
8use ulid::Ulid;
9use std::path::PathBuf;
10use std::fs;
11
12#[derive(Clone)]
13pub struct WorkerConfig {
14    pub worker_id: String,
15    pub worker_type: common::worker::Type,
16    pub heartbeat_interval: Duration,
17}
18
19const CONFIG_DIR: &str = "config";
20const WORKER_ID_FILE: &str = "worker_id";
21
22impl WorkerConfig {
23    fn get_worker_id_path() -> PathBuf {
24        let mut path = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
25        path.push(CONFIG_DIR);
26        path.push(WORKER_ID_FILE);
27        path
28    }
29
30    fn ensure_config_dir() -> std::io::Result<()> {
31        let mut path = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
32        path.push(CONFIG_DIR);
33        fs::create_dir_all(path)
34    }
35
36    fn load_or_generate_id() -> String {
37        let _ = Self::ensure_config_dir();
38        let path = Self::get_worker_id_path();
39
40        if let Ok(existing_id) = fs::read_to_string(&path) {
41            if !existing_id.trim().is_empty() {
42                return existing_id.trim().to_string();
43            }
44        }
45
46        let new_id = Ulid::new().to_string();
47        let _ = fs::write(&path, &new_id);
48        new_id
49    }
50
51    pub fn new(worker_type: common::worker::Type) -> Self {
52        Self {
53            worker_id: Self::load_or_generate_id(),
54            worker_type,
55            heartbeat_interval: Duration::from_secs(5),
56        }
57    }
58}
59
60pub struct Worker {
61    client: Client,
62    config: WorkerConfig,
63    publisher: Publisher,
64}
65
66impl Worker {
67    pub fn new(client: Client, config: WorkerConfig) -> Self {
68        let publisher = Publisher::new(client.clone(), config.worker_id.clone());
69        Self {
70            client,
71            config,
72            publisher,
73        }
74    }
75
76    pub fn get_publisher(&self) -> &Publisher {
77        &self.publisher
78    }
79
80    async fn setup_endpoint(&mut self) -> Result<(), String> {
81        let mut metadata = HashMap::new();
82        metadata.insert("worker_id".to_string(), self.config.worker_id.clone());
83
84        let service = self.client
85            .service_builder()
86            .description("HDP Worker Service")
87            .metadata(metadata)
88            .start(
89                "HDP",  // Simple service name with only allowed characters
90                &String::from("1.0.0")
91            )
92            .await
93            .map_err(|e| e.to_string())?;
94
95        let service_group = service.group(format!("worker.{}", format!("{:?}", self.config.worker_type).to_lowercase()));
96        let config_endpoint = service_group.endpoint("config").await.map_err(|e| e.to_string())?;
97
98
99        tokio::spawn(async move {
100            let mut endpoint = config_endpoint;
101            while let Some(request) = endpoint.next().await {
102                println!("Received config update: {:?}", request);
103                let _ = request.respond(Ok(Bytes::new())).await;
104            }
105        });
106
107        Ok(())
108    }
109
110    async fn start_heartbeat(&self) {
111        let client = self.client.clone();
112        let worker_id = self.config.worker_id.clone();
113        let interval = self.config.heartbeat_interval;
114
115        tokio::spawn(async move {
116            let mut interval = tokio::time::interval(interval);
117            loop {
118                interval.tick().await;
119                let mut headers = HeaderMap::new();
120                headers.insert("worker_id", worker_id.as_str());
121
122                if let Err(e) = client
123                    .request_with_headers("worker.core.heartbeat", headers, Bytes::new())
124                    .await
125                {
126                    println!("Failed to send heartbeat: {}", e);
127                }
128            }
129        });
130    }
131
132    async fn register(&self) -> Result<(), String> {
133        let registration = common::Worker {
134            id: self.config.worker_id.clone(),
135            name: None,
136            r#type: self.config.worker_type as i32,
137            inventory: None,
138            last_seen: None,
139        };
140        let data = registration.encode_to_vec();
141        self
142            .client
143            .request("worker.core.register", data.into())
144            .await
145            .map_err(|e| e.to_string())?;
146        println!("Registered with server");
147        Ok(())
148    }
149
150    pub async fn setup(&mut self) -> Result<(), String> {
151        println!("Setting up endpoints...");
152        self.setup_endpoint().await?;
153        println!("Registering worker...");
154        self.register().await?;
155        println!("Starting heartbeat...");
156        self.start_heartbeat().await;
157        Ok(())
158    }
159
160    pub fn get_client(&self) -> Client {
161        self.client.clone()
162    }
163
164    pub fn get_config(&self) -> &WorkerConfig {
165        &self.config
166    }
167}