// hdp_worker/worker.rs

use async_nats::{Client, service::ServiceExt, HeaderMap};
use bytes::Bytes;
use futures::StreamExt;
use hdp_protobufs::hdp::worker::registration;
use prost::Message;
use std::{collections::HashMap, time::Duration};
use crate::error::WorkerError;
use crate::publisher::Publisher;  // Add this import

/// Static configuration for a [`Worker`].
#[derive(Debug, Clone)]
pub struct WorkerConfig {
    /// Identifier of this worker. It is used as the NATS service name, sent as
    /// the `worker_id` service metadata entry and heartbeat header, and as the
    /// `id` of the registration payload.
    pub worker_id: String,
    /// Kind of worker, sent as the `type` field of the registration payload.
    pub worker_type: registration::WorkerType,
    /// Period between two heartbeat requests. Must be non-zero:
    /// `tokio::time::interval` panics when given a zero period.
    pub heartbeat_interval: Duration,
}

impl Default for WorkerConfig {
    fn default() -> Self {
        Self {
            worker_id: String::new(),
            worker_type: registration::WorkerType::Weather,
            heartbeat_interval: Duration::from_secs(5),
        }
    }
}

/// A worker attached to a NATS connection.
///
/// Holds the connection handle, the worker's configuration and the
/// [`Publisher`] that `Worker::new` builds from them.
pub struct Worker {
    /// NATS client used for the service endpoint, registration and heartbeats.
    client: Client,
    /// Identity and timing settings supplied at construction.
    config: WorkerConfig,
    /// Publisher created from a clone of `client` and the configured worker id.
    publisher: Publisher,
}


impl Worker {
    /// Creates a worker on top of an already connected NATS `client`.
    ///
    /// A [`Publisher`] is built from a clone of the client and the configured
    /// worker id. No network traffic happens here; call [`Worker::setup`] to
    /// bring the worker online.
    pub fn new(client: Client, config: WorkerConfig) -> Self {
        let publisher = Publisher::new(client.clone(), config.worker_id.clone());
        Self {
            client,
            config,
            publisher,
        }
    }

    /// Returns the publisher that was created for this worker in [`Worker::new`].
    pub fn get_publisher(&self) -> &Publisher {
        &self.publisher
    }

    /// Starts the worker's NATS service and serves its `config` endpoint from a
    /// detached background task.
    ///
    /// The service is named after the worker id, reports version `1.0.0` and
    /// carries the worker id in its metadata. Every request received on the
    /// `config` endpoint is printed and answered with an empty payload.
    ///
    /// # Errors
    ///
    /// Returns [`WorkerError::SetupError`] when the service or the endpoint
    /// cannot be created.
    async fn setup_endpoint(&self) -> Result<(), WorkerError> {
        let mut metadata = HashMap::new();
        metadata.insert("worker_id".to_string(), self.config.worker_id.clone());

        let service = self
            .client
            .service_builder()
            .description("HDP Worker Service")
            .metadata(metadata)
            .start(self.config.worker_id.clone(), "1.0.0".to_string())
            .await
            .map_err(|e| WorkerError::SetupError(e.to_string()))?;

        let mut config_endpoint = service
            .endpoint("config")
            .await
            .map_err(|e| WorkerError::SetupError(e.to_string()))?;

        tokio::spawn(async move {
            // Own the service handle for as long as the endpoint is served.
            // Letting it drop when `setup_endpoint` returns would release the
            // service while its endpoint is still expected to answer requests.
            // NOTE(review): async-nats is expected to tear the service down on
            // drop; confirm against the pinned version.
            let _service = service;

            while let Some(request) = config_endpoint.next().await {
                println!("Received config update: {:?}", request);
                // A failed reply must not stop the endpoint, but it should not
                // vanish silently either.
                if let Err(e) = request.respond(Ok(Bytes::new())).await {
                    eprintln!("Failed to acknowledge config update: {}", e);
                }
            }
        });

        Ok(())
    }

    /// Spawns a detached task that sends a heartbeat request every
    /// `heartbeat_interval`.
    ///
    /// Each heartbeat is an empty request to `HDP.worker.heartbeat` carrying a
    /// `worker_id` header. The first one is sent immediately, because the first
    /// tick of a tokio interval completes at once. A failed heartbeat is
    /// reported on stderr and the loop carries on.
    ///
    /// The configured interval must be non-zero: `tokio::time::interval`
    /// panics on a zero period. [`Worker::setup`] checks this before calling.
    async fn start_heartbeat(&self) {
        let client = self.client.clone();
        let worker_id = self.config.worker_id.clone();
        let period = self.config.heartbeat_interval;

        tokio::spawn(async move {
            let mut ticker = tokio::time::interval(period);
            // When a request outlasts the period, wait a full period before the
            // next heartbeat instead of firing the missed ticks back-to-back.
            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

            loop {
                ticker.tick().await;

                let mut headers = HeaderMap::new();
                headers.insert("worker_id", worker_id.as_str());

                if let Err(e) = client
                    .request_with_headers("HDP.worker.heartbeat", headers, Bytes::new())
                    .await
                {
                    eprintln!("Failed to send heartbeat: {}", e);
                }
            }
        });
    }

    /// Announces this worker on `HDP.worker.register`.
    ///
    /// Sends a protobuf-encoded registration payload holding the worker id and
    /// worker type (no inventory), waits for the reply and prints it.
    ///
    /// # Errors
    ///
    /// Returns [`WorkerError::NatsError`] when the request fails.
    async fn register(&self) -> Result<(), WorkerError> {
        let payload = registration::Payload {
            id: self.config.worker_id.clone(),
            r#type: self.config.worker_type as i32,
            inventory: None,
        };

        let response = self
            .client
            .request("HDP.worker.register", payload.encode_to_vec().into())
            .await
            .map_err(|e| WorkerError::NatsError(e.to_string()))?;

        println!("Registration response: {:?}", response);
        Ok(())
    }

    /// Brings the worker online: starts the service endpoint, registers the
    /// worker and launches the heartbeat task, in that order.
    ///
    /// # Errors
    ///
    /// Returns [`WorkerError::SetupError`] when `heartbeat_interval` is zero or
    /// the service endpoint cannot be created, and [`WorkerError::NatsError`]
    /// when registration fails.
    pub async fn setup(&mut self) -> Result<(), WorkerError> {
        // Reject a zero period before touching the network: it would make the
        // heartbeat task panic inside a detached task, where nobody notices.
        if self.config.heartbeat_interval.is_zero() {
            return Err(WorkerError::SetupError(
                "heartbeat_interval must be greater than zero".to_string(),
            ));
        }

        println!("Setting up endpoints...");
        self.setup_endpoint().await?;

        println!("Registering worker...");
        self.register().await?;

        println!("Starting heartbeat...");
        self.start_heartbeat().await;

        println!("Worker base setup complete!");
        Ok(())
    }

    /// Returns a clone of the NATS client handle used by this worker.
    pub fn get_client(&self) -> Client {
        self.client.clone()
    }

    /// Returns the configuration this worker was created with.
    pub fn get_config(&self) -> &WorkerConfig {
        &self.config
    }
}