ceylon_core/workspace/
uniffied_agent.rs

1/*
2 * Copyright 2024-Present, Syigen Ltd. and Syigen Private Limited. All rights reserved.
3 * Licensed under the Apache License, Version 2.0 (See LICENSE.md or http://www.apache.org/licenses/LICENSE-2.0).
4 *
5 */
6
7use crate::workspace::agent::{
8    AgentDetail, ENV_WORKSPACE_ID, ENV_WORKSPACE_IP, ENV_WORKSPACE_PEER, ENV_WORKSPACE_PORT,
9};
10use crate::workspace::agent::{EventHandler, MessageHandler, Processor};
11use crate::workspace::message::{AgentMessage, MessageType};
12use crate::WorkerAgent;
13use futures::future::join_all;
14use log::log_enabled;
15use sangedama::peer::message::data::{EventType, NodeMessage, NodeMessageTransporter};
16use sangedama::peer::node::node::{UnifiedPeerConfig, UnifiedPeerImpl};
17use sangedama::peer::node::peer_builder::{create_key, create_key_from_bytes, get_peer_id};
18use sangedama::peer::PeerMode;
19use std::collections::HashMap;
20use std::fs;
21use std::sync::Arc;
22use std::time::Duration;
23use tokio::io::AsyncReadExt;
24use tokio::runtime::{Handle, Runtime};
25use tokio::sync::Mutex;
26use tokio::sync::{mpsc, RwLock};
27use tokio::task::JoinHandle;
28use tokio::{select, signal};
29use tokio_util::sync::CancellationToken;
30use tracing::{debug, error, info};
31
32#[derive(Clone, Default, Debug)]
33pub struct UnifiedAgentConfig {
34    pub name: String,
35    pub mode: PeerMode,
36    pub role: Option<String>,
37    pub work_space_id: Option<String>,
38    pub port: Option<u16>,
39    pub admin_peer: Option<String>,
40    pub admin_ip: Option<String>,
41    pub buffer_size: Option<u16>,
42}
43
44impl UnifiedAgentConfig {
45    fn to_str(&self) -> String {
46        format!(
47            "name: {}, role: {:?}, work_space_id: {:?}, admin_peer: {:?}, admin_port: {:?}, admin_ip: {:?}, config_file {:?} ",
48            self.name, self.role, self.work_space_id, self.admin_peer, self.port, self.admin_ip, self.buffer_size
49        )
50    }
51}
52
53impl UnifiedAgentConfig {
54    pub fn update_from_file(&mut self, config_path: String) -> Result<(), std::io::Error> {
55        let config_content = fs::read_to_string(config_path)?;
56        let config: HashMap<String, String> = config_content
57            .lines()
58            .filter_map(|line| {
59                let parts: Vec<&str> = line.splitn(2, '=').collect();
60                if parts.len() == 2 {
61                    Some((parts[0].to_string(), parts[1].to_string()))
62                } else {
63                    None
64                }
65            })
66            .collect();
67
68        if let Some(workspace_id) = config.get(ENV_WORKSPACE_ID) {
69            self.work_space_id = Some(workspace_id.clone());
70        }
71        if let Some(admin_peer) = config.get(ENV_WORKSPACE_PEER) {
72            self.admin_peer = Some(admin_peer.clone());
73        }
74        if let Some(port) = config.get(ENV_WORKSPACE_PORT) {
75            self.port = port.parse().ok();
76        }
77        if let Some(ip) = config.get(ENV_WORKSPACE_IP) {
78            self.admin_ip = Some(ip.clone());
79        }
80
81        Ok(())
82    }
83
84    pub fn write_to_file(&self, config_path: String) -> Result<(), std::io::Error> {
85        let mut config = HashMap::new();
86        config.insert(ENV_WORKSPACE_ID.to_string(), self.work_space_id.clone());
87        config.insert(
88            ENV_WORKSPACE_PEER.to_string(),
89            Option::from(self.admin_peer.clone().unwrap()),
90        );
91        config.insert(
92            ENV_WORKSPACE_PORT.to_string(),
93            Option::from(self.port.unwrap().to_string()),
94        );
95        config.insert(
96            ENV_WORKSPACE_IP.to_string(),
97            Option::from(self.admin_ip.clone().unwrap_or("127.0.0.1".to_string())),
98        );
99        let config_content = config
100            .iter()
101            .map(|(k, v)| format!("{}={}", k, v.clone().unwrap()))
102            .collect::<Vec<String>>()
103            .join("\n");
104        fs::write(config_path, config_content)
105    }
106
107    pub fn copy_with(&mut self, _conf: UnifiedAgentConfig) {
108        self.name = _conf.name.clone();
109        self.mode = _conf.mode;
110        self.role = _conf.role.clone();
111        self.work_space_id = _conf.work_space_id.clone();
112        self.port = _conf.port.clone();
113        self.admin_peer = _conf.admin_peer.clone();
114        self.admin_ip = _conf.admin_ip.clone();
115        self.buffer_size = _conf.buffer_size.clone();
116    }
117}
118
119pub struct UnifiedAgent {
120    _config: UnifiedAgentConfig,
121    _config_path: Option<String>,
122    _processor: Arc<Mutex<Arc<dyn Processor>>>,
123    _on_message: Arc<Mutex<Arc<dyn MessageHandler>>>,
124    _on_event: Arc<Mutex<Arc<dyn EventHandler>>>,
125
126    pub broadcast_emitter: mpsc::Sender<NodeMessageTransporter>,
127    pub broadcast_receiver: Arc<Mutex<mpsc::Receiver<NodeMessageTransporter>>>,
128
129    _peer_id: String,
130    _key: Vec<u8>,
131
132    pub shutdown_send: mpsc::UnboundedSender<String>,
133    pub shutdown_recv: Arc<Mutex<mpsc::UnboundedReceiver<String>>>,
134
135    _connected_agents: Arc<RwLock<HashMap<String, AgentDetail>>>,
136
137    _cancel_token: CancellationToken,
138}
139
140impl UnifiedAgent {
141    pub fn new(
142        config: Option<UnifiedAgentConfig>,
143        config_path: Option<String>,
144        on_message: Arc<dyn MessageHandler>,
145        processor: Arc<dyn Processor>,
146        on_event: Arc<dyn EventHandler>,
147    ) -> Self {
148        let (broadcast_emitter, broadcast_receiver) = mpsc::channel(2);
149        let admin_peer_key = create_key();
150        let id = get_peer_id(&admin_peer_key).to_string();
151
152        let (shutdown_send, shutdown_recv) = mpsc::unbounded_channel();
153
154        let mut _config = UnifiedAgentConfig::default();
155
156        let conf = config.clone();
157        if let Some(_conf) = conf {
158            _config.copy_with(_conf);
159        }
160
161        if _config.mode == PeerMode::Admin {
162            _config.admin_peer = Some(id.clone());
163            _config
164                .write_to_file(".ceylon_network".to_string())
165                .expect("Failed to write config");
166        }
167
168        Self {
169            _config,
170            _config_path: if config_path.is_some() {
171                Some(config_path.unwrap())
172            } else {
173                Some("./.ceylon_network".to_string())
174            },
175            _processor: Arc::new(Mutex::new(processor)),
176            _on_message: Arc::new(Mutex::new(on_message)),
177            _on_event: Arc::new(Mutex::new(on_event)),
178
179            broadcast_emitter,
180            broadcast_receiver: Arc::new(Mutex::new(broadcast_receiver)),
181
182            _peer_id: id,
183            _key: admin_peer_key.to_protobuf_encoding().unwrap(),
184
185            shutdown_send,
186            shutdown_recv: Arc::new(Mutex::new(shutdown_recv)),
187
188            _connected_agents: Arc::new(RwLock::new(HashMap::new())),
189
190            _cancel_token: CancellationToken::new(),
191        }
192    }
193
194    pub async fn send_direct(&self, to_peer: String, message: Vec<u8>) {
195        let node_message = AgentMessage::create_direct_message(message, to_peer.clone());
196        debug!("Sending direct message to {}", to_peer);
197        match self
198            .broadcast_emitter
199            .send((self.details().id, node_message.to_bytes(), Some(to_peer)))
200            .await
201        {
202            Ok(_) => {}
203            Err(e) => {
204                error!("Failed to send direct message: {:?}", e);
205            }
206        }
207    }
208
209    pub async fn broadcast(&self, message: Vec<u8>) {
210        let node_message = AgentMessage::create_broadcast_message(message);
211        match self
212            .broadcast_emitter
213            .send((self.details().id, node_message.to_bytes(), None))
214            .await
215        {
216            Ok(_) => {}
217            Err(e) => {
218                error!("Failed to send broadcast message: {:?}", e);
219            }
220        }
221    }
222
223    pub fn details(&self) -> AgentDetail {
224        AgentDetail {
225            name: self._config.name.clone(),
226            id: self._peer_id.clone(),
227            role: self._config.role.clone().unwrap_or("".to_string()),
228        }
229    }
230
231    pub async fn get_connected_agents(&self) -> Vec<AgentDetail> {
232        let agents = self._connected_agents.read().await;
233        agents.values().cloned().collect()
234    }
235
236    pub async fn start(&self, inputs: Vec<u8>, agents: Option<Vec<Arc<UnifiedAgent>>>) {
237        let runtime = tokio::runtime::Builder::new_multi_thread()
238            .enable_all()
239            .build()
240            .unwrap();
241
242        let cancel_token = self._cancel_token.clone();
243
244        // Get all task handlers
245        let mut self_agent_handlers = self
246            .run(inputs.clone(), None, cancel_token.clone(), runtime.handle())
247            .await;
248
249        // Create a single future that completes when all tasks are done
250
251        let _agent_list = agents.clone();
252        if agents.is_some() {
253            let agents = agents.unwrap();
254            for agent in agents {
255                let agent_handlers = agent
256                    .run(inputs.clone(), None, cancel_token.clone(), runtime.handle())
257                    .await;
258                self_agent_handlers.extend(agent_handlers);
259            }
260        }
261
262        let all_tasks = join_all(self_agent_handlers);
263
264        let cancel_token_clone = cancel_token.clone();
265        let shutdown_recv = self.shutdown_recv.clone();
266        let admin_id = self._peer_id.clone();
267        let task_shutdown = runtime.spawn(async move {
268            let mut shutdown_recv_lock = shutdown_recv.lock().await;
269            loop {
270                select! {
271                    _ = cancel_token_clone.cancelled() => {
272                        debug!("Shutdown handler shutting down");
273                        break;
274                    }
275                    msg = shutdown_recv_lock.recv() => {
276                        if let Some(raw_data) = msg {
277                            if raw_data == admin_id {
278                                debug!("Received shutdown signal");
279                                cancel_token_clone.cancel();
280                                break;
281                            }
282                        }
283                    }
284                }
285            }
286        });
287
288        // Use select to handle either ctrl-c or task completion
289        runtime
290            .spawn(async move {
291                select! {
292
293                    _ = task_shutdown => {
294                        debug!("Shutdown handler completed");
295                    }
296
297                    _ = all_tasks => {
298                        debug!("All agent tasks completed normally");
299                    }
300                    _ = signal::ctrl_c() => {
301                        debug!("Received ctrl-c, initiating shutdown");
302                        cancel_token .cancel();
303                        // Wait for tasks to complete after cancellation
304                        // all_tasks.await;
305                    }
306                }
307            })
308            .await
309            .unwrap();
310    }
311
312    async fn run(
313        &self,
314        inputs: Vec<u8>,
315        agents: Option<Vec<Arc<UnifiedAgent>>>,
316        cancel_token: CancellationToken,
317        handle: &Handle,
318    ) -> Vec<JoinHandle<()>> {
319        let mut config = self._config.clone();
320        let config_path = self._config_path.clone();
321        debug!("Config: {}", config.to_str());
322        debug!("Config path: {}", config_path.clone().unwrap());
323        if config_path.is_some() || config.mode == PeerMode::Client {
324            let conf_file = config_path.unwrap().clone();
325            debug!(
326                "Checking .ceylon_network config {}",
327                fs::metadata(conf_file.clone()).is_ok()
328            );
329            // check .ceylon_network exists
330            if fs::metadata(conf_file.clone()).is_ok() {
331                config.update_from_file(conf_file.clone());
332                debug!("--------------------------------");
333                debug!("Using .ceylon_network config");
334                debug!("{} = {:?}", ENV_WORKSPACE_ID, config.work_space_id);
335                debug!("{} = {:?}", ENV_WORKSPACE_PEER, config.admin_peer);
336                debug!("{} = {:?}", ENV_WORKSPACE_PORT, config.port);
337                debug!("{} = {:?}", ENV_WORKSPACE_IP, config.admin_ip);
338                debug!("--------------------------------");
339            }
340        }
341        let peer_config = match self._config.mode {
342            PeerMode::Admin => UnifiedPeerConfig::new_admin(
343                config
344                    .work_space_id
345                    .clone()
346                    .unwrap_or("CEYLON-AI-AGENT-NETWORK".to_string()),
347                config.port.unwrap_or(0),
348                config.buffer_size,
349            ),
350            PeerMode::Client => UnifiedPeerConfig::new_member(
351                config.name.clone(),
352                config
353                    .work_space_id
354                    .clone()
355                    .unwrap_or("CEYLON-AI-AGENT-NETWORK".to_string()),
356                config.admin_peer.clone().unwrap(),
357                config.port.unwrap_or(0),
358                config.admin_ip.clone().unwrap_or_default(),
359                config.buffer_size,
360            ),
361        };
362
363        // let worker_details: RwLock<HashMap<String, AgentDetail>> = RwLock::new(HashMap::new());
364        // Create peer and listener
365        let peer_key = create_key_from_bytes(self._key.clone());
366        let (mut peer, mut peer_listener) =
367            UnifiedPeerImpl::create(peer_config.clone(), peer_key).await;
368
369        let worker_details: Arc<RwLock<HashMap<String, AgentDetail>>> =
370            self._connected_agents.clone();
371        let peer_emitter_clone = peer.emitter().clone();
372        let broadcast_emitter_clone = peer.emitter().clone();
373
374        // Spawn peer runner with proper cancellation
375        let cancel_token_clone = cancel_token.clone();
376        let task_peer = handle.spawn(async move {
377            select! {
378                _ = peer.run(cancel_token_clone.clone()) => {
379                    debug!("Peer run completed");
380                }
381                _ = cancel_token_clone.cancelled() => {
382                    debug!("Peer run cancelled");
383                }
384            }
385        });
386
387        let registration_intro_send_cancel_token = CancellationToken::new();
388
389        let on_message = self._on_message.clone();
390        let on_event = self._on_event.clone();
391        let peer_id = self._peer_id.clone();
392        let cancel_token_clone = cancel_token.clone();
393
394        let my_self_details = self.details().clone();
395        // Handle peer events
396        let task_peer_listener = handle.spawn(async move {
397            let mut is_call_agent_on_connect_list: HashMap<String, bool> = HashMap::new();
398
399            loop {
400                select! {
401                     _ = cancel_token_clone.cancelled() => {
402                        debug!("Peer listener select shutting down");
403                        break;
404                    }
405                    event = peer_listener.recv() => {
406                        if let Some(node_message) = event {
407                            if cancel_token_clone.is_cancelled() {
408                                debug!( "Peer listener shutting down");
409                                break;
410                            }
411                            // debug!( "Node Message: {:?}", node_message);
412                            match node_message {
413                                NodeMessage::Message{ data, created_by, time, .. } => {
414                                    let agent_message = AgentMessage::from_bytes(data);
415                                    debug!( "Agent message from data: {:#?}", agent_message);
416                                    match agent_message {
417                                        AgentMessage::NodeMessage { message, message_type, .. } => {
418                                            debug!( "Agent message: {:#?}", message);
419                                            match message_type {
420                                                MessageType::Direct { to_peer } => {
421                                                    if to_peer == peer_id {
422                                                        on_message.lock().await.on_message(
423                                                            created_by,
424                                                            message,
425                                                            time,
426                                                        ).await;
427                                                    }
428                                                }
429                                                MessageType::Broadcast => {
430                                                    on_message.lock().await.on_message(
431                                                        created_by,
432                                                        message,
433                                                        time,
434                                                    ).await;
435                                                }
436                                            }
437                                        }
438                                        AgentMessage::AgentIntroduction { id, name, role, topic } => {
439                                            debug!( "Agent introduction {:?}", id);
440                                            let peer_id = id.clone();
441                                            let id_key = id.clone();
442                                            let _ag = AgentDetail{
443                                                name,
444                                                id,
445                                                role
446                                            };
447                                            on_event.lock().await.on_agent_connected(
448                                                topic.clone(),
449                                                _ag.clone()
450                                            ).await;
451                                            worker_details.write().await.insert(id_key, _ag.clone());
452                                            if config.mode == PeerMode::Admin {
453                                                 let agent_intro_message = AgentMessage::create_registration_ack_message(
454                                                    peer_id.clone(),
455                                                    true,
456                                                );
457                                                peer_emitter_clone.send(
458                                                    (my_self_details.id.clone(),agent_intro_message.to_bytes(),None)
459                                                ).await.unwrap();
460                                            }
461                                            debug!( "{:?} Worker details: {:#?}", my_self_details.clone().id, worker_details.read().await);
462                                        }
463                                        AgentMessage::AgentRegistrationAck { id,status } => {
464                                            debug!( "Agent registration ack: {:#?}", status);
465                                            if (status){
466                                                registration_intro_send_cancel_token.cancel();
467                                            }
468                                        }
469                                        _ => {}
470                                    }
471                                }
472                                NodeMessage::Event { event, .. } => {
473                                    debug!( "Peer event: {:?}", event);
474                                    match event{
475                                        EventType::Subscribe{
476                                            peer_id,
477                                            topic,
478                                        }=>{
479                                            if worker_details.read().await.get(&peer_id).is_none() {
480                                                let agent_intro_message = AgentMessage::create_introduction_message(
481                                                    my_self_details.clone().id,
482                                                    my_self_details.clone().name,
483                                                    my_self_details.clone().role,
484                                                    topic.clone(),
485                                                );
486                                                let _cancel_token = registration_intro_send_cancel_token.clone();
487                                                let _emitter = peer_emitter_clone.clone();
488                                                let _id = my_self_details.id.clone();
489
490
491                                                if config.mode == PeerMode::Admin {
492                                                    _emitter.send(
493                                                            (_id.clone(),agent_intro_message.to_bytes(),None)
494                                                        ).await.unwrap();
495                                                }else{
496                                                    tokio::spawn(async move {
497                                                    loop{
498                                                        if _cancel_token.is_cancelled() {
499                                                            break;
500                                                        }
501                                                        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
502                                                            _emitter.send(
503                                                            (_id.clone(),agent_intro_message.to_bytes(),None)
504                                                        ).await.unwrap();
505                                                    }
506                                                });
507                                                }
508                                            }
509                                        }
510                                        _ => {
511                                            debug!("Admin Received Event {:?}", event);
512                                        }
513                                    }
514                                }
515                            }
516                        }
517                    }
518                }
519            }
520        });
521
522        // Spawn processor with proper cancellation
523        let processor = self._processor.clone();
524        let cancel_token_clone = cancel_token.clone();
525        let task_processor = handle.spawn(async move {
526            processor.lock().await.run(inputs).await;
527            loop {
528                if cancel_token_clone.is_cancelled() {
529                    debug!("Processor shutting down");
530                    break;
531                }
532                tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
533            }
534        });
535
536        // Spawn broadcast handler with proper cancellation
537        let broadcast_receiver = self.broadcast_receiver.clone();
538        let cancel_token_clone = cancel_token.clone();
539        let task_broadcast = handle.spawn(async move {
540            let mut broadcast_receiver_lock = broadcast_receiver.lock().await;
541            loop {
542                select! {
543                    _ = cancel_token_clone.cancelled() => {
544                        debug!("Broadcast handler shutting down");
545                        break;
546                    }
547                    msg = broadcast_receiver_lock.recv() => {
548                        if let Some(raw_data) = msg {
549                            broadcast_emitter_clone.send(raw_data).await.unwrap();
550                        }
551                    }
552                }
553            }
554        });
555
556        // Spawn shutdown handler
557
558        let cancel_token_clone = cancel_token.clone();
559        let run_holder_process = handle.spawn(async move {
560            loop {
561                if cancel_token_clone.is_cancelled() {
562                    break;
563                }
564                tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
565            }
566        });
567        vec![
568            task_peer,
569            task_peer_listener,
570            task_processor,
571            task_broadcast,
572            run_holder_process,
573        ]
574    }
575
576    async fn cleanup(&self) {
577        // Release any resources that need explicit cleanup
578        debug!("Cleaning up agent resources");
579        // Close channels
580        drop(self.broadcast_emitter.clone());
581        // Any other cleanup...
582    }
583
584    pub async fn stop(&self) {
585        debug!("Agent {} stop called", self._config.name);
586        self._cancel_token.cancel();
587        // self.shutdown_send.send(self._peer_id.clone()).unwrap();
588        self.cleanup().await;
589    }
590}