// ceylon_core/workspace/admin_agent.rs

/*
 * Copyright 2024-Present, Syigen Ltd. and Syigen Private Limited. All rights reserved.
 * Licensed under the Apache License, Version 2.0 (See LICENSE or http://www.apache.org/licenses/LICENSE-2.0).
 *
 */
6
7use std::collections::HashMap;
8
9use std::sync::Arc;
10use std::time::SystemTime;
11
12use futures::future::join_all;
13use tokio::io::AsyncReadExt;
14use tokio::sync::{mpsc, Mutex, RwLock};
15use tokio::{select, signal};
16use tokio_util::sync::CancellationToken;
17use tracing::{error, info};
18
19use crate::workspace::agent::{
20    AgentDetail, EventHandler, ENV_WORKSPACE_ID, ENV_WORKSPACE_IP, ENV_WORKSPACE_PEER,
21    ENV_WORKSPACE_PORT,
22};
23use crate::workspace::message::{AgentMessage, MessageType};
24use crate::{utils, MessageHandler, Processor, WorkerAgent};
25use sangedama::peer::message::data::{EventType, NodeMessage, NodeMessageTransporter};
26use sangedama::peer::node::{
27    create_key, create_key_from_bytes, get_peer_id, AdminPeer, AdminPeerConfig,
28};
29
/// Static configuration for an [`AdminAgent`].
///
/// The values are handed to the underlying admin peer when the agent starts and are also
/// written to the workspace environment file so that workers can locate the admin.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AdminAgentConfig {
    /// Workspace name; also used as the admin peer's name.
    pub name: String,
    /// Port passed to the admin peer configuration.
    pub port: u16,
    /// Buffer size forwarded to the admin peer configuration.
    pub buffer_size: u16,
}
36
37pub struct AdminAgent {
38    pub config: AdminAgentConfig,
39
40    _processor: Arc<Mutex<Arc<dyn Processor>>>,
41    _on_message: Arc<Mutex<Arc<dyn MessageHandler>>>,
42    _on_event: Arc<Mutex<Arc<dyn EventHandler>>>,
43
44    pub broadcast_emitter: mpsc::Sender<NodeMessageTransporter>,
45    pub broadcast_receiver: Arc<Mutex<mpsc::Receiver<NodeMessageTransporter>>>,
46
47    _peer_id: String,
48
49    _key: Vec<u8>,
50
51    pub shutdown_send: mpsc::UnboundedSender<String>,
52    pub shutdown_recv: Arc<Mutex<mpsc::UnboundedReceiver<String>>>,
53    _worker_details: Vec<AgentDetail>,
54}
55
56impl AdminAgent {
57    pub fn new(
58        config: AdminAgentConfig,
59        on_message: Arc<dyn MessageHandler>,
60        processor: Arc<dyn Processor>,
61        on_event: Arc<dyn EventHandler>,
62    ) -> Self {
63        let (broadcast_emitter, broadcast_receiver) = mpsc::channel::<NodeMessageTransporter>(2);
64
65        let admin_peer_key = create_key();
66        let id = get_peer_id(&admin_peer_key).to_string();
67
68        let (shutdown_send, shutdown_recv) = mpsc::unbounded_channel::<String>();
69
70        Self {
71            config,
72            _on_message: Arc::new(Mutex::new(on_message)),
73            _processor: Arc::new(Mutex::new(processor)),
74            _on_event: Arc::new(Mutex::new(on_event)),
75
76            broadcast_emitter,
77            broadcast_receiver: Arc::new(Mutex::new(broadcast_receiver)),
78            _peer_id: id,
79
80            _key: admin_peer_key.to_protobuf_encoding().unwrap(),
81
82            shutdown_send,
83            shutdown_recv: Arc::new(Mutex::new(shutdown_recv)),
84
85            _worker_details: Vec::new(),
86        }
87    }
88
89    pub async fn send_direct(&self, to_peer: String, message: Vec<u8>) {
90        let node_message = AgentMessage::create_direct_message(message, to_peer.clone());
91        info!("Sending direct message to {}", to_peer.clone());
92        match self
93            .broadcast_emitter
94            .send((self.details().id, node_message.to_bytes(), Some(to_peer)))
95            .await
96        {
97            Ok(_) => {}
98            Err(e) => {
99                error!("Failed to send direct message: {:?}", e);
100            }
101        }
102    }
103
104    pub async fn broadcast(&self, message: Vec<u8>) {
105        let node_message = AgentMessage::create_broadcast_message(message);
106        match self
107            .broadcast_emitter
108            .send((self.details().id, node_message.to_bytes(), None))
109            .await
110        {
111            Ok(_) => {}
112            Err(e) => {
113                error!("Failed to send broadcast message: {:?}", e);
114            }
115        }
116    }
117
118    pub async fn start(&self, inputs: Vec<u8>, agents: Vec<Arc<WorkerAgent>>) {
119        self.run_(inputs, agents).await;
120    }
121
122    pub async fn stop(&self) {
123        info!("Agent {} stop called", self.config.name);
124        self.shutdown_send.send(self._peer_id.clone()).unwrap();
125    }
126
127    pub fn details(&self) -> AgentDetail {
128        AgentDetail {
129            name: self.config.name.clone(),
130            id: self._peer_id.clone(),
131            role: "admin".to_string(),
132        }
133    }
134    async fn run_(&self, inputs: Vec<u8>, agents: Vec<Arc<WorkerAgent>>) {
135        info!("Agent {} running", self.config.name);
136
137        let runtime = tokio::runtime::Builder::new_multi_thread()
138            .enable_all()
139            .build()
140            .unwrap();
141
142        let cancel_token = CancellationToken::new();
143
144        let handle = runtime.handle().clone();
145
146        let worker_details: RwLock<HashMap<String, AgentDetail>> = RwLock::new(HashMap::new());
147
148        let config = self.config.clone();
149        let admin_config = AdminPeerConfig::new(config.port, config.name.clone(), Some(config.buffer_size as usize));
150
151        let peer_key = create_key_from_bytes(self._key.clone());
152
153        let (mut peer_, mut peer_listener_) =
154            AdminPeer::create(admin_config.clone(), peer_key).await;
155
156        let name = self.config.name.clone();
157        let port = self.config.port;
158
159        if peer_.id == self._peer_id {
160            info!("Admin peer created {}", peer_.id.clone());
161
162            let mut env_vars = HashMap::new();
163            env_vars.insert(ENV_WORKSPACE_PEER, peer_.id.clone());
164            env_vars.insert(ENV_WORKSPACE_PORT, port.clone().to_string());
165            env_vars.insert(ENV_WORKSPACE_ID, name.clone());
166            env_vars.insert(ENV_WORKSPACE_IP, "127.0.0.1".to_string());
167
168            if let Err(e) = utils::env::write_to_env_file(&env_vars) {
169                eprintln!("Failed to write to .ceylon_network file: {}", e);
170            } else {
171                println!("Successfully wrote variables to .ceylon_network file");
172            }
173
174            println!("------------------------------------------------------------------");
175            println!("| Important");
176            println!("| {}={}", ENV_WORKSPACE_ID, name.clone());
177            println!("| {}={}", ENV_WORKSPACE_PEER, peer_.id.clone());
178            println!("| {}={}", ENV_WORKSPACE_PORT, port);
179            println!("| {}={}", ENV_WORKSPACE_IP, "127.0.0.1");
180            println!("| Use this ServerAdmin peer ID to connect to the server");
181            println!("-------------------------------------------------------------------");
182        } else {
183            panic!("Id mismatch");
184        }
185        let admin_id = peer_.id.clone();
186        let admin_emitter = peer_.emitter();
187
188        let cancel_token_clone = cancel_token.clone();
189        let task_admin = handle.spawn(async move {
190            peer_.run(None, cancel_token_clone).await;
191        });
192
193        let mut worker_tasks = vec![];
194
195        let _inputs = inputs.clone();
196        let admin_id_ = admin_id.clone();
197
198        let cancel_token_clone = cancel_token.clone();
199        for agent in agents {
200            let _inputs_ = _inputs.clone();
201            let agent_ = agent.clone();
202            let _admin_id_ = admin_id_.clone();
203            let mut config = agent_.config.clone();
204            config.admin_peer = _admin_id_.clone();
205            let tasks = agent_
206                .run_with_config(
207                    _inputs_.clone(),
208                    config,
209                    handle.clone(),
210                    cancel_token_clone.clone(),
211                )
212                .await;
213            let agent_detail = agent_.details();
214
215            worker_details
216                .write()
217                .await
218                .insert(agent_detail.id.clone(), agent_detail);
219
220            for task in tasks {
221                worker_tasks.push(task);
222            }
223        }
224
225        info!("Worker tasks created");
226
227        let worker_tasks = join_all(worker_tasks);
228        let cancel_token_clone = cancel_token.clone();
229        let worker_task_runner = handle.spawn(async move {
230            worker_tasks.await;
231            loop {
232                if cancel_token_clone.is_cancelled() {
233                    break;
234                }
235            }
236        });
237
238        let name = self.config.name.clone();
239        let on_message = self._on_message.clone();
240        let on_event = self._on_event.clone();
241
242        let cancel_token_clone = cancel_token.clone();
243
244        let mut is_call_agent_on_connect_list: HashMap<String, bool> = HashMap::new();
245        let peer_id = self._peer_id.clone();
246        let task_admin_listener = handle.spawn(async move {
247            loop {
248                select! {
249                    _ = cancel_token_clone.cancelled() => {
250                        break;
251                    }
252                   event = peer_listener_.recv() => {
253                        if let Some(event) = event {
254                            match event {
255                                NodeMessage::Message{ data, created_by, time,..} => {
256                                    let agent_message = AgentMessage::from_bytes(data);
257
258                                    match agent_message {
259                                        AgentMessage::NodeMessage { message,message_type,.. } => {
260                                            match message_type {
261                                                MessageType::Direct { to_peer } => {
262                                                    // Only process if we're the intended recipient
263                                                    if to_peer == peer_id {
264                                                        on_message.lock().await.on_message(
265                                                            created_by,
266                                                            message,
267                                                            time,
268                                                        ).await;
269                                                    }
270                                                }
271                                                MessageType::Broadcast => {
272                                                    on_message.lock().await.on_message(
273                                                        created_by,
274                                                        message,
275                                                        time,
276                                                    ).await;
277                                                }
278                                            }
279                                        }
280                                        AgentMessage::AgentIntroduction { id, name,role,topic } => {
281                                            info!( "Agent introduction {:?}", id);
282                                            let peer_id = id.clone();
283                                            let id_key = id.clone();
284                                            let agent_detail = AgentDetail {
285                                               name,
286                                               id,
287                                               role
288                                            };
289                                            worker_details.write().await.insert(id_key, agent_detail);
290                                            let is_call_agent_on_connect = is_call_agent_on_connect_list.get( &peer_id).unwrap_or(&false).clone();
291                                            if !is_call_agent_on_connect{
292                                                if let Some(agent) = worker_details.read().await.get(&peer_id) {
293                                                    let agent = agent.clone();
294                                                    on_event.lock().await.on_agent_connected(topic,agent)
295                                                    .await;
296                                                    is_call_agent_on_connect_list.insert(peer_id, true);
297                                                }
298                                            }
299                                        }
300                                        _ => {
301                                            info!("Agent listener {:?}", agent_message);
302                                        }
303                                    }
304                                }
305                                NodeMessage::Event {
306                                    event,
307                                    ..
308                                }=>{
309                                   match event{
310                                        EventType::Subscribe{
311                                            peer_id,
312                                            topic,
313                                        }=>{
314                                            let is_call_agent_on_connect = is_call_agent_on_connect_list.get( &peer_id).unwrap_or(&false).clone();
315                                            if !is_call_agent_on_connect{
316                                                if let Some(agent) = worker_details.read().await.get(&peer_id) {
317                                                    let agent = agent.clone();
318                                                    on_event.lock().await.on_agent_connected(topic,agent)
319                                                    .await;
320                                                    is_call_agent_on_connect_list.insert(peer_id, true);
321                                                }
322                                            }
323                                        }
324                                        _ => {
325                                            info!("Admin Received Event {:?}", event);
326                                        }
327                                    }
328                                }
329                            }
330                        }
331                    }
332                }
333            }
334        });
335
336        let processor = self._processor.clone();
337        let processor_input_clone = inputs.clone();
338        let cancel_token_clone = cancel_token.clone();
339        let run_process = handle.spawn(async move {
340            processor.lock().await.run(processor_input_clone).await;
341            loop {
342                if cancel_token_clone.is_cancelled() {
343                    break;
344                }
345            }
346        });
347        let cancel_token_clone = cancel_token.clone();
348        let run_holder_process = handle.spawn(async move {
349            loop {
350                if cancel_token_clone.is_cancelled() {
351                    break;
352                }
353                tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
354            }
355        });
356
357        let broadcast_receiver = self.broadcast_receiver.clone();
358        let cancel_token_clone = cancel_token.clone();
359        let run_broadcast = handle.spawn(async move {
360            loop {
361                if cancel_token_clone.is_cancelled() {
362                    break;
363                } else if let Some(raw_data) = broadcast_receiver.lock().await.recv().await {
364                    // info!("Agent broadcast {:?}", raw_data);
365                    match admin_emitter.send(raw_data).await {
366                        Ok(_) => {
367                            continue;
368                        }
369                        Err(_) => continue,
370                    };
371                }
372            }
373        });
374        let shutdown_recv = self.shutdown_recv.clone();
375        let admin_id_clone = admin_id.clone();
376        let shutdown_task = handle.spawn(async move {
377            loop {
378                if let Some(raw_data) = shutdown_recv.lock().await.recv().await {
379                    if raw_data == admin_id_clone {
380                        info!("Received shutdown signal, shutting down ...");
381                        cancel_token.cancel();
382                        break;
383                    }
384                }
385            }
386        });
387
388        let shutdown_tx = Arc::new(self.shutdown_send.clone());
389        let shutdown_tx = shutdown_tx.clone();
390        let admin_id_clone = admin_id.clone();
391        handle
392            .spawn(async move {
393                select! {
394                    _ = run_holder_process => {
395                        info!("Agent {} run_holder_process done", name);
396                    }
397                   _ = worker_task_runner => {
398                        info!("Agent {} task_admin_listener done", name);
399                    }
400                    _ = task_admin => {
401                        info!("Agent {} task_admin done", name);
402                    }
403                    _ = task_admin_listener => {
404                        info!("Agent {} task_admin_listener done", name);
405                    }
406                    _ = run_process => {
407                        info!("Agent {} run_process done", name);
408                    }
409                    _ = run_broadcast => {
410                        info!("Agent {} run_broadcast done", name);
411                    }
412                    _ = shutdown_task => {
413                        info!("Agent {} run_broadcast done", name);
414                    }
415                    _ = signal::ctrl_c() => {
416                        println!("Agent {:?} received exit signal", name);
417                        shutdown_tx.send(admin_id_clone).unwrap();
418                        // Perform any necessary cleanup here
419                    }
420                }
421            })
422            .await
423            .unwrap();
424    }
425}