use crate::workspace::agent::{
    AgentDetail, ENV_WORKSPACE_ID, ENV_WORKSPACE_IP, ENV_WORKSPACE_PEER, ENV_WORKSPACE_PORT,
};
use crate::workspace::message::{AgentMessage, MessageType};
use crate::{EventHandler, MessageHandler, Processor};
use futures::future::join_all;
use sangedama::peer::message::data::{EventType, NodeMessage, NodeMessageTransporter};
use sangedama::peer::node::{
    create_key, create_key_from_bytes, get_peer_id, MemberPeer, MemberPeerConfig,
};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fs;
use std::sync::Arc;
use tokio::runtime::Handle;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tokio::{select, signal};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerAgentConfig {
    pub name: String,
    pub role: String,
    pub conf_file: Option<String>,
    pub work_space_id: String,
    pub admin_peer: String,
    pub admin_port: u16,
    pub admin_ip: String,
    pub buffer_size: u16,
}

impl WorkerAgentConfig {
    /// Overrides the workspace/admin settings from a `KEY=VALUE` style config
    /// file. Panics if the file cannot be read.
    pub fn update_admin_config(&mut self, config_path: String) {
        let config_content = fs::read_to_string(config_path).unwrap();
        let config: HashMap<String, String> = config_content
            .lines()
            .filter_map(|line| {
                let parts: Vec<&str> = line.splitn(2, '=').collect();
                if parts.len() == 2 {
                    Some((parts[0].to_string(), parts[1].to_string()))
                } else {
                    None
                }
            })
            .collect();

        self.work_space_id = config.get(ENV_WORKSPACE_ID).cloned().unwrap_or_default();
        self.admin_peer = config.get(ENV_WORKSPACE_PEER).cloned().unwrap_or_default();
        self.admin_port = config
            .get(ENV_WORKSPACE_PORT)
            .and_then(|s| s.parse().ok())
            .unwrap_or_default();
        self.admin_ip = config.get(ENV_WORKSPACE_IP).cloned().unwrap_or_default();
    }

    fn to_str(&self) -> String {
        format!(
            "name: {}, role: {}, work_space_id: {}, admin_peer: {}, admin_port: {}, admin_ip: {}, conf_file: {:?}",
            self.name, self.role, self.work_space_id, self.admin_peer, self.admin_port, self.admin_ip, self.conf_file
        )
    }
}
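
// Illustrative sketch of the kind of file `update_admin_config` expects: plain
// `KEY=VALUE` pairs, one per line. The real key names are the values of the
// `ENV_WORKSPACE_*` constants; the names and values shown here are placeholders:
//
//     WORKSPACE_ID=my-workspace
//     WORKSPACE_PEER=12D3KooW...
//     WORKSPACE_PORT=8000
//     WORKSPACE_IP=127.0.0.1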

pub struct WorkerAgent {
    pub config: WorkerAgentConfig,

    _processor: Arc<Mutex<Arc<dyn Processor>>>,
    _on_message: Arc<Mutex<Arc<dyn MessageHandler>>>,
    _on_event: Arc<Mutex<Arc<dyn EventHandler>>>,

    pub broadcast_emitter: tokio::sync::mpsc::Sender<NodeMessageTransporter>,
    pub broadcast_receiver: Arc<Mutex<tokio::sync::mpsc::Receiver<NodeMessageTransporter>>>,

    _peer_id: String,
    _key: Vec<u8>,
}

impl WorkerAgent {
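    // Builds a worker agent: generates a fresh peer key, derives the peer id
    // from it, and, when `conf_file` points to an existing file, overrides the
    // workspace/admin settings from that file before the agent is started.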
    pub fn new(
        config: WorkerAgentConfig,
        on_message: Arc<dyn MessageHandler>,
        processor: Arc<dyn Processor>,
        on_event: Arc<dyn EventHandler>,
    ) -> Self {
        let (broadcast_emitter, broadcast_receiver) =
            tokio::sync::mpsc::channel::<NodeMessageTransporter>(2);
        let admin_peer_key = create_key();
        let id = get_peer_id(&admin_peer_key).to_string();

        let mut config = config;
        info!("Config: {}", config.to_str());
        if let Some(conf_file) = config.conf_file.clone() {
            let conf_file_exists = fs::metadata(&conf_file).is_ok();
            info!("Checking .ceylon_network config {}", conf_file_exists);
            if conf_file_exists {
                config.update_admin_config(conf_file);
                info!("--------------------------------");
                info!("Using .ceylon_network config");
                info!("{} = {}", ENV_WORKSPACE_ID, config.work_space_id);
                info!("{} = {}", ENV_WORKSPACE_PEER, config.admin_peer);
                info!("{} = {}", ENV_WORKSPACE_PORT, config.admin_port);
                info!("{} = {}", ENV_WORKSPACE_IP, config.admin_ip);
                info!("--------------------------------");
            }
        }

        Self {
            config,
            _processor: Arc::new(Mutex::new(processor)),
            _on_message: Arc::new(Mutex::new(on_message)),
            _on_event: Arc::new(Mutex::new(on_event)),

            broadcast_emitter,
            broadcast_receiver: Arc::new(Mutex::new(broadcast_receiver)),

            _peer_id: id,
            _key: admin_peer_key.to_protobuf_encoding().unwrap(),
        }
    }
    pub async fn start(&self, _inputs: Vec<u8>) {
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .unwrap();

        let config = self.config.clone();

        let handle = runtime.handle().clone();
        let cancel_token = CancellationToken::new();
        let cancel_token_clone = cancel_token.clone();

        let tasks = self
            .run_with_config(
                _inputs.clone(),
                config,
                handle.clone(),
                cancel_token_clone.clone(),
            )
            .await;

        let task_runner = join_all(tasks);

        let name = self.config.name.clone();
        select! {
            _ = task_runner => {
                info!("Agent {} task_runner done ", name);
            }
            _ = signal::ctrl_c() => {
                info!("Agent {} received exit signal", name);
                cancel_token_clone.cancel();
            }
        }
    }

    pub async fn stop(&self) {
        info!("Agent {} stop called", self.config.name);
    }

    pub fn details(&self) -> AgentDetail {
        AgentDetail {
            name: self.config.name.clone(),
            id: self._peer_id.clone(),
            role: self.config.role.clone(),
        }
    }
}

impl WorkerAgent {
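    // Outbound messaging. As used throughout this module, a `NodeMessageTransporter`
    // is the tuple (sender peer id, serialized `AgentMessage`, optional target peer
    // id); `None` as the target means the message is broadcast on the topic.
    //
    // A hedged usage sketch from application code (the peer id below is a
    // placeholder):
    //
    //     worker.broadcast(b"status: ready".to_vec()).await;
    //     worker.send_direct("12D3KooW...".to_string(), b"done".to_vec()).await;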
    pub async fn send_direct(&self, to_peer: String, message: Vec<u8>) {
        let node_message = AgentMessage::create_direct_message(message, to_peer.clone());
        info!("Sending direct message to {}", to_peer.clone());
        match self
            .broadcast_emitter
            .send((self.details().id, node_message.to_bytes(), Some(to_peer)))
            .await
        {
            Ok(_) => {}
            Err(e) => {
                error!("Failed to send direct message: {:?}", e);
            }
        }
    }

    pub async fn broadcast(&self, message: Vec<u8>) {
        let node_message = AgentMessage::create_broadcast_message(message);
        match self
            .broadcast_emitter
            .send((self.details().id, node_message.to_bytes(), None))
            .await
        {
            Ok(_) => {}
            Err(e) => {
                error!("Failed to send broadcast message: {:?}", e);
            }
        }
    }
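
    // Wires the agent up against a `MemberPeer` and spawns four long-running tasks
    // on the given runtime handle: the member peer's run loop, a listener that
    // routes incoming `NodeMessage`s to the message/event handlers, the user
    // `Processor`, and a forwarder that drains `broadcast_receiver` into the peer
    // emitter. All of them stop when the cancellation token fires; the returned
    // `JoinHandle`s let the caller await their completion.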
    pub async fn run_with_config(
        &self,
        inputs: Vec<u8>,
        worker_agent_config: WorkerAgentConfig,
        runtime: Handle,
        cancellation_token: CancellationToken,
    ) -> Vec<JoinHandle<()>> {
        info!("Agent {} running", self.config.name);

        let config = worker_agent_config.clone();

        info!("Config {:?}", config.to_str());

        let member_config = MemberPeerConfig::new(
            config.name.clone(),
            config.work_space_id.clone(),
            config.admin_peer.clone(),
            config.admin_port,
            config.admin_ip,
            Some(config.buffer_size as usize),
        );
        let peer_key = create_key_from_bytes(self._key.clone());
        let (mut peer_, mut peer_listener_) =
            MemberPeer::create(member_config.clone(), peer_key).await;
        if peer_.id == self._peer_id {
            info!("Worker peer created {}", peer_.id.clone());
        } else {
            panic!("Id mismatch");
        }
        let peer_emitter = peer_.emitter();

        let is_request_to_shutdown = false;
        let cancellation_token_clone = cancellation_token.clone();
        let task_admin = runtime.spawn(async move {
            peer_.run(cancellation_token_clone).await;
        });

        let on_message = self._on_message.clone();
        let cancellation_token_clone = cancellation_token.clone();
        let peer_emitter_clone = peer_emitter.clone();

        let agent_details = self.details().clone();

        let on_event = self._on_event.clone();
        let peer_id = self._peer_id.clone();
        // Listens for peer events and routes them to the registered handlers.
        let task_admin_listener = runtime.spawn(async move {
            loop {
                if is_request_to_shutdown {
                    break;
                }
                select! {
                    _ = cancellation_token_clone.cancelled() => {
                        break;
                    }
                    event = peer_listener_.recv() => {
                        if let Some(event) = event {
                            match event {
                                NodeMessage::Message { data, created_by, time, .. } => {
                                    let agent_message = AgentMessage::from_bytes(data);

                                    match agent_message {
                                        AgentMessage::NodeMessage { message, message_type, .. } => {
                                            match message_type {
                                                MessageType::Direct { to_peer } => {
                                                    // Direct messages are only delivered when addressed to this peer.
                                                    if to_peer == peer_id {
                                                        on_message.lock().await.on_message(
                                                            created_by,
                                                            message,
                                                            time,
                                                        ).await;
                                                    }
                                                }
                                                MessageType::Broadcast => {
                                                    on_message.lock().await.on_message(
                                                        created_by,
                                                        message,
                                                        time,
                                                    ).await;
                                                }
                                            }
                                        }
                                        AgentMessage::AgentIntroduction { id, name, role, topic } => {
                                            let agent_detail = AgentDetail {
                                                name,
                                                id,
                                                role
                                            };
                                            on_event.lock().await.on_agent_connected(
                                                topic,
                                                agent_detail
                                            ).await;
                                        }
                                        _ => {
                                            info!("Agent listener {:?}", agent_message);
                                        }
                                    }
                                }
                                NodeMessage::Event {
                                    event,
                                    ..
                                } => {
                                    match event {
                                        EventType::Subscribe {
                                            topic,
                                            ..
                                        } => {
                                            // Introduce this agent on the topic once the subscription is confirmed.
                                            info!("Worker {} Subscribed to topic {:?}", agent_details.id, topic);
                                            let agent_intro_message = AgentMessage::AgentIntroduction {
                                                id: agent_details.id.clone(),
                                                name: agent_details.name.clone(),
                                                role: agent_details.role.clone(),
                                                topic,
                                            };
                                            peer_emitter_clone.send(
                                                (agent_details.id.clone(), agent_intro_message.to_bytes(), None)
                                            ).await.unwrap();
                                        }
                                        _ => {
                                            info!("Worker received event {:?}", event);
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        });

        let processor = self._processor.clone();
        let run_process = runtime.spawn(async move {
            processor.lock().await.run(inputs).await;
        });

        let broadcast_receiver = self.broadcast_receiver.clone();
        let cancellation_token_clone = cancellation_token.clone();
        let run_broadcast = runtime.spawn(async move {
            loop {
                if is_request_to_shutdown {
                    break;
                }

                if cancellation_token_clone.is_cancelled() {
                    break;
                } else if let Some(raw_data) = broadcast_receiver.lock().await.recv().await {
                    peer_emitter.send(raw_data).await.unwrap();
                }
            }
        });

        vec![task_admin, task_admin_listener, run_process, run_broadcast]
    }
}
367}