ceylon_core/workspace/
worker_agent.rs

1/*
2 * Copyright 2024-Present, Syigen Ltd. and Syigen Private Limited. All rights reserved.
3 * Licensed under the Apache License, Version 2.0 (See LICENSE or http://www.apache.org/licenses/LICENSE-2.0).
4 *
5 */
6
7use crate::workspace::agent::{
8    AgentDetail, ENV_WORKSPACE_ID, ENV_WORKSPACE_IP, ENV_WORKSPACE_PEER, ENV_WORKSPACE_PORT,
9};
10use crate::workspace::message::{AgentMessage, MessageType};
11use crate::{EventHandler, MessageHandler, Processor};
12use futures::future::join_all;
13use sangedama::peer::message::data::{EventType, NodeMessage, NodeMessageTransporter};
14use sangedama::peer::node::{
15    create_key, create_key_from_bytes, get_peer_id, MemberPeer, MemberPeerConfig,
16};
17use serde::{Deserialize, Serialize};
18use std::collections::HashMap;
19use std::fs;
20use std::sync::Arc;
21use tokio::runtime::Handle;
22use tokio::sync::Mutex;
23use tokio::task::JoinHandle;
24use tokio::{select, signal};
25use tokio_util::sync::CancellationToken;
26use tracing::{error, info};
27
28#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct WorkerAgentConfig {
30    pub name: String,
31    pub role: String,
32    pub conf_file: Option<String>,
33    pub work_space_id: String,
34    pub admin_peer: String,
35    pub admin_port: u16,
36    pub admin_ip: String,
37    pub buffer_size: u16,
38}
39
40impl WorkerAgentConfig {
41    pub fn update_admin_config(&mut self, config_path: String) {
42        let config_content = fs::read_to_string(config_path).unwrap();
43        let config: HashMap<String, String> = config_content
44            .lines()
45            .filter_map(|line| {
46                let parts: Vec<&str> = line.splitn(2, '=').collect();
47                if parts.len() == 2 {
48                    Some((parts[0].to_string(), parts[1].to_string()))
49                } else {
50                    None
51                }
52            })
53            .collect();
54
55        self.work_space_id = config.get(ENV_WORKSPACE_ID).cloned().unwrap_or_default();
56        self.admin_peer = config.get(ENV_WORKSPACE_PEER).cloned().unwrap_or_default();
57        self.admin_port = config
58            .get(ENV_WORKSPACE_PORT)
59            .and_then(|s| s.parse().ok())
60            .unwrap_or_default();
61        self.admin_ip = config.get(ENV_WORKSPACE_IP).cloned().unwrap_or_default();
62    }
63
64    fn to_str(&self) -> String {
65        format!(
66            "name: {}, role: {}, work_space_id: {}, admin_peer: {}, admin_port: {}, admin_ip: {}, config_file {:?} ",
67            self.name, self.role, self.work_space_id, self.admin_peer, self.admin_port, self.admin_ip, self.conf_file
68        )
69    }
70}
71
72pub struct WorkerAgent {
73    pub config: WorkerAgentConfig,
74
75    _processor: Arc<Mutex<Arc<dyn Processor>>>,
76    _on_message: Arc<Mutex<Arc<dyn MessageHandler>>>,
77    _on_event: Arc<Mutex<Arc<dyn EventHandler>>>,
78
79    pub broadcast_emitter: tokio::sync::mpsc::Sender<NodeMessageTransporter>,
80    pub broadcast_receiver: Arc<Mutex<tokio::sync::mpsc::Receiver<NodeMessageTransporter>>>,
81
82    _peer_id: String,
83    _key: Vec<u8>,
84}
85
86impl WorkerAgent {
87    pub fn new(
88        config: WorkerAgentConfig,
89        on_message: Arc<dyn MessageHandler>,
90        processor: Arc<dyn Processor>,
91        on_event: Arc<dyn EventHandler>,
92    ) -> Self {
93        let (broadcast_emitter, broadcast_receiver) =
94            tokio::sync::mpsc::channel::<NodeMessageTransporter>(2);
95        let admin_peer_key = create_key();
96        let id = get_peer_id(&admin_peer_key).to_string();
97
98        let mut config = config.clone();
99        info!("Config: {}", config.to_str());
100        if config.conf_file.is_some() {
101            let conf_file = config.clone().conf_file.unwrap().clone().to_string();
102            info!(
103                "Checking .ceylon_network config {}",
104                fs::metadata(conf_file.clone()).is_ok()
105            );
106            // check .ceylon_network exists
107            if fs::metadata(conf_file.clone()).is_ok() {
108                config.update_admin_config(conf_file.clone());
109                info!("--------------------------------");
110                info!("Using .ceylon_network config");
111                info!("{} = {}", ENV_WORKSPACE_ID, config.work_space_id);
112                info!("{} = {}", ENV_WORKSPACE_PEER, config.admin_peer);
113                info!("{} = {}", ENV_WORKSPACE_PORT, config.admin_port);
114                info!("{} = {}", ENV_WORKSPACE_IP, config.admin_ip);
115                info!("--------------------------------");
116            }
117        }
118
119        Self {
120            config,
121            _processor: Arc::new(Mutex::new(processor)),
122            _on_message: Arc::new(Mutex::new(on_message)),
123            _on_event: Arc::new(Mutex::new(on_event)),
124
125            broadcast_emitter,
126            broadcast_receiver: Arc::new(Mutex::new(broadcast_receiver)),
127
128            _peer_id: id,
129            _key: admin_peer_key.to_protobuf_encoding().unwrap(),
130        }
131    }
132    pub async fn start(&self, _inputs: Vec<u8>) {
133        let runtime = tokio::runtime::Builder::new_multi_thread()
134            .enable_all()
135            .build()
136            .unwrap();
137
138        let _inputs_ = _inputs.clone();
139        let config = self.config.clone();
140
141        let handle = runtime.handle().clone();
142        let cancel_token = CancellationToken::new();
143        let cancel_token_clone = cancel_token.clone();
144
145        let tasks = self
146            .run_with_config(
147                _inputs_.clone(),
148                config,
149                handle.clone(),
150                cancel_token_clone.clone(),
151            )
152            .await;
153
154        let task_runner = join_all(tasks);
155
156        let name = self.config.name.clone();
157        select! {
158           _ = task_runner => {
159                info!("Agent {} task_runner done ", name);
160            }
161            _ = signal::ctrl_c() => {
162                println!("Agent {:?} received exit signal", name);
163                cancel_token_clone.cancel();
164            }
165        }
166    }
167
168    pub async fn stop(&self) {
169        info!("Agent {} stop called", self.config.name);
170    }
171
172    pub fn details(&self) -> AgentDetail {
173        AgentDetail {
174            name: self.config.name.clone(),
175            id: self._peer_id.clone(),
176            role: self.config.role.clone(),
177        }
178    }
179}
180
181impl WorkerAgent {
182    pub async fn send_direct(&self, to_peer: String, message: Vec<u8>) {
183        let node_message = AgentMessage::create_direct_message(message, to_peer.clone());
184        info!("Sending direct message to {}", to_peer.clone());
185        match self
186            .broadcast_emitter
187            .send((self.details().id, node_message.to_bytes(), Some(to_peer)))
188            .await
189        {
190            Ok(_) => {}
191            Err(e) => {
192                error!("Failed to send direct message: {:?}", e);
193            }
194        }
195    }
196
197    pub async fn broadcast(&self, message: Vec<u8>) {
198        let node_message = AgentMessage::create_broadcast_message(message);
199        match self
200            .broadcast_emitter
201            .send((self.details().id, node_message.to_bytes(), None))
202            .await
203        {
204            Ok(_) => {}
205            Err(e) => {
206                error!("Failed to send broadcast message: {:?}", e);
207            }
208        }
209    }
210    pub async fn run_with_config(
211        &self,
212        inputs: Vec<u8>,
213        worker_agent_config: WorkerAgentConfig,
214        runtime: Handle,
215        cancellation_token: CancellationToken,
216    ) -> Vec<JoinHandle<()>> {
217        info!("Agent {} running", self.config.name);
218
219        let config = worker_agent_config.clone();
220
221        info!("Config {:?}", config.to_str());
222
223        let member_config = MemberPeerConfig::new(
224            config.name.clone(),
225            config.work_space_id.clone(),
226            config.admin_peer.clone(),
227            config.admin_port,
228            config.admin_ip,
229            Some(config.buffer_size as usize),
230        );
231        let peer_key = create_key_from_bytes(self._key.clone());
232        let (mut peer_, mut peer_listener_) =
233            MemberPeer::create(member_config.clone(), peer_key).await;
234        if peer_.id == self._peer_id {
235            info!("Worker peer created {}", peer_.id.clone());
236        } else {
237            panic!("Id mismatch");
238        }
239        let peer_emitter = peer_.emitter();
240
241        let is_request_to_shutdown = false;
242        let cancellation_token_clone = cancellation_token.clone();
243        let task_admin = runtime.spawn(async move {
244            peer_.run(cancellation_token_clone).await;
245        });
246
247        let on_message = self._on_message.clone();
248        let cancellation_token_clone = cancellation_token.clone();
249        let peer_emitter_clone = peer_emitter.clone();
250
251        let agent_details = self.details().clone();
252
253        let on_event = self._on_event.clone();
254        let peer_id = self._peer_id.clone();
255        let task_admin_listener = runtime.spawn(async move {
256            loop {
257                if is_request_to_shutdown {
258                    break;
259                }
260                select! {
261                    _ = cancellation_token_clone.cancelled() => {
262                        break;
263                    }
264                   event = peer_listener_.recv() => {
265                        if let Some(event) = event {
266                            match event {
267                                NodeMessage::Message{ data, created_by, time,..} => {
268                                   let agent_message = AgentMessage::from_bytes(data);
269
270                                    match agent_message {
271                                        AgentMessage::NodeMessage { message,message_type,.. } => {
272                                             match message_type {
273                                                MessageType::Direct { to_peer } => {
274                                                    // Only process if we're the intended recipient
275                                                    if to_peer == peer_id {
276                                                        on_message.lock().await.on_message(
277                                                            created_by,
278                                                            message,
279                                                            time,
280                                                        ).await;
281                                                    }
282                                                }
283                                                MessageType::Broadcast => {
284                                                    on_message.lock().await.on_message(
285                                                        created_by,
286                                                        message,
287                                                        time,
288                                                    ).await;
289                                                }
290                                            }
291                                        }
292                                        AgentMessage::AgentIntroduction { id, name, role, topic } => {
293                                            let agent_detail = AgentDetail{
294                                                name,
295                                                id,
296                                                role
297                                            };
298                                            on_event.lock().await.on_agent_connected(
299                                                topic,
300                                                agent_detail
301                                            ).await;
302                                        }
303                                        _ => {
304                                            info!("Agent listener {:?}", agent_message);
305                                        }
306                                    }
307                                }
308                                NodeMessage::Event {
309                                    event,
310                                    ..
311                                }=>{
312                                   match event{
313                                        EventType::Subscribe{
314                                            topic,
315                                            ..
316                                        }=>{
317                                            info!("Worker {} Subscribed to topic {:?}", agent_details.id, topic);
318                                            let agent_intro_message = AgentMessage::AgentIntroduction {
319                                                id: agent_details.id.clone(),
320                                                name: agent_details.name.clone(),
321                                                role:agent_details.role.clone(),
322                                                topic,
323                                            };
324                                            peer_emitter_clone.send(
325                                                (agent_details.id.clone(),agent_intro_message.to_bytes(),None)
326                                            ).await.unwrap();
327                                        }
328                                        _ => {
329                                            info!("Admin Received Event {:?}", event);
330                                        }
331                                    }
332                                }
333                            }
334                        }
335                    }
336                }
337            }
338        });
339
340        let processor = self._processor.clone();
341        let run_process = runtime.spawn(async move {
342            processor.lock().await.run(inputs).await;
343        });
344
345        let broadcast_receiver = self.broadcast_receiver.clone();
346        let cancellation_token_clone = cancellation_token.clone();
347        let agent_details = self.details().clone();
348        let run_broadcast = runtime.spawn(async move {
349            loop {
350                if is_request_to_shutdown {
351                    break;
352                }
353
354                if cancellation_token_clone.is_cancelled() {
355                    break;
356                } else if let Some(raw_data) = broadcast_receiver.lock().await.recv().await {
357                    peer_emitter
358                        .send(raw_data)
359                        .await
360                        .unwrap();
361                }
362            }
363        });
364
365        vec![task_admin, task_admin_listener, run_process, run_broadcast]
366    }
367}