use crate::workspace::agent::{
    AgentDetail, ENV_WORKSPACE_ID, ENV_WORKSPACE_IP, ENV_WORKSPACE_PEER, ENV_WORKSPACE_PORT,
};
use crate::workspace::agent::{EventHandler, MessageHandler, Processor};
use crate::workspace::message::{AgentMessage, MessageType};
use futures::future::join_all;
use sangedama::peer::message::data::{EventType, NodeMessage, NodeMessageTransporter};
use sangedama::peer::node::node::{UnifiedPeerConfig, UnifiedPeerImpl};
use sangedama::peer::node::peer_builder::{create_key, create_key_from_bytes, get_peer_id};
use sangedama::peer::PeerMode;
use std::collections::HashMap;
use std::fs;
use std::sync::Arc;
use tokio::runtime::Handle;
use tokio::sync::{mpsc, Mutex, RwLock};
use tokio::task::JoinHandle;
use tokio::{select, signal};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error};

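/// Static configuration for a [`UnifiedAgent`].
///
/// In `Admin` mode the agent owns the workspace and persists its connection
/// details to a `.ceylon_network` file; in `Client` mode the workspace fields
/// (`work_space_id`, `admin_peer`, `port`, `admin_ip`) are typically filled in
/// from that same file via [`UnifiedAgentConfig::update_from_file`].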
#[derive(Clone, Default, Debug)]
pub struct UnifiedAgentConfig {
    pub name: String,
    pub mode: PeerMode,
    pub role: Option<String>,
    pub work_space_id: Option<String>,
    pub port: Option<u16>,
    pub admin_peer: Option<String>,
    pub admin_ip: Option<String>,
    pub buffer_size: Option<u16>,
}

impl UnifiedAgentConfig {
    fn to_str(&self) -> String {
        format!(
            "name: {}, role: {:?}, work_space_id: {:?}, admin_peer: {:?}, port: {:?}, admin_ip: {:?}, buffer_size: {:?}",
            self.name, self.role, self.work_space_id, self.admin_peer, self.port, self.admin_ip, self.buffer_size
        )
    }

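    /// Overrides the workspace fields from a simple `KEY=VALUE` file (one
    /// pair per line, split on the first `=`); unknown keys are ignored.
    /// The keys are the `ENV_WORKSPACE_*` constant values. A file written by
    /// an admin agent looks roughly like this (key names and values shown
    /// here are illustrative):
    ///
    /// ```text
    /// WORKSPACE_ID=CEYLON-AI-AGENT-NETWORK
    /// WORKSPACE_PEER=12D3KooW...
    /// WORKSPACE_PORT=8888
    /// WORKSPACE_IP=127.0.0.1
    /// ```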
    pub fn update_from_file(&mut self, config_path: String) -> Result<(), std::io::Error> {
        let config_content = fs::read_to_string(config_path)?;
        let config: HashMap<String, String> = config_content
            .lines()
            .filter_map(|line| {
                let parts: Vec<&str> = line.splitn(2, '=').collect();
                if parts.len() == 2 {
                    Some((parts[0].to_string(), parts[1].to_string()))
                } else {
                    None
                }
            })
            .collect();

        if let Some(workspace_id) = config.get(ENV_WORKSPACE_ID) {
            self.work_space_id = Some(workspace_id.clone());
        }
        if let Some(admin_peer) = config.get(ENV_WORKSPACE_PEER) {
            self.admin_peer = Some(admin_peer.clone());
        }
        if let Some(port) = config.get(ENV_WORKSPACE_PORT) {
            self.port = port.parse().ok();
        }
        if let Some(ip) = config.get(ENV_WORKSPACE_IP) {
            self.admin_ip = Some(ip.clone());
        }

        Ok(())
    }

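    /// Persists the workspace connection details in the same `KEY=VALUE`
    /// format that [`UnifiedAgentConfig::update_from_file`] reads. Fields
    /// that are `None` are skipped rather than unwrapped, so a partially
    /// populated config can be written without panicking.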
    pub fn write_to_file(&self, config_path: String) -> Result<(), std::io::Error> {
        let mut config: HashMap<String, String> = HashMap::new();
        if let Some(work_space_id) = &self.work_space_id {
            config.insert(ENV_WORKSPACE_ID.to_string(), work_space_id.clone());
        }
        if let Some(admin_peer) = &self.admin_peer {
            config.insert(ENV_WORKSPACE_PEER.to_string(), admin_peer.clone());
        }
        if let Some(port) = self.port {
            config.insert(ENV_WORKSPACE_PORT.to_string(), port.to_string());
        }
        config.insert(
            ENV_WORKSPACE_IP.to_string(),
            self.admin_ip.clone().unwrap_or_else(|| "127.0.0.1".to_string()),
        );
        let config_content = config
            .iter()
            .map(|(k, v)| format!("{}={}", k, v))
            .collect::<Vec<String>>()
            .join("\n");
        fs::write(config_path, config_content)
    }

    pub fn copy_with(&mut self, conf: UnifiedAgentConfig) {
        self.name = conf.name;
        self.mode = conf.mode;
        self.role = conf.role;
        self.work_space_id = conf.work_space_id;
        self.port = conf.port;
        self.admin_peer = conf.admin_peer;
        self.admin_ip = conf.admin_ip;
        self.buffer_size = conf.buffer_size;
    }
}

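/// A peer that can run either as the workspace admin or as a client,
/// bridging a `sangedama` peer to user-supplied [`MessageHandler`],
/// [`EventHandler`], and [`Processor`] implementations.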
pub struct UnifiedAgent {
    _config: UnifiedAgentConfig,
    _config_path: Option<String>,
    _processor: Arc<Mutex<Arc<dyn Processor>>>,
    _on_message: Arc<Mutex<Arc<dyn MessageHandler>>>,
    _on_event: Arc<Mutex<Arc<dyn EventHandler>>>,

    pub broadcast_emitter: mpsc::Sender<NodeMessageTransporter>,
    pub broadcast_receiver: Arc<Mutex<mpsc::Receiver<NodeMessageTransporter>>>,

    _peer_id: String,
    _key: Vec<u8>,

    pub shutdown_send: mpsc::UnboundedSender<String>,
    pub shutdown_recv: Arc<Mutex<mpsc::UnboundedReceiver<String>>>,

    _connected_agents: Arc<RwLock<HashMap<String, AgentDetail>>>,

    _cancel_token: CancellationToken,
}

impl UnifiedAgent {
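    /// Builds an agent from an optional config and an optional config path
    /// (defaulting to `./.ceylon_network`). A fresh peer key is generated
    /// here, so the peer id is fixed for the lifetime of the agent; in
    /// `Admin` mode the connection details are written out immediately so
    /// that clients can join.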
    pub fn new(
        config: Option<UnifiedAgentConfig>,
        config_path: Option<String>,
        on_message: Arc<dyn MessageHandler>,
        processor: Arc<dyn Processor>,
        on_event: Arc<dyn EventHandler>,
    ) -> Self {
        let (broadcast_emitter, broadcast_receiver) = mpsc::channel(2);
        let admin_peer_key = create_key();
        let id = get_peer_id(&admin_peer_key).to_string();

        let (shutdown_send, shutdown_recv) = mpsc::unbounded_channel();

        let mut _config = UnifiedAgentConfig::default();
        if let Some(conf) = config {
            _config.copy_with(conf);
        }

        if _config.mode == PeerMode::Admin {
            _config.admin_peer = Some(id.clone());
            _config
                .write_to_file(".ceylon_network".to_string())
                .expect("Failed to write config");
        }

        Self {
            _config,
            _config_path: config_path.or_else(|| Some("./.ceylon_network".to_string())),
            _processor: Arc::new(Mutex::new(processor)),
            _on_message: Arc::new(Mutex::new(on_message)),
            _on_event: Arc::new(Mutex::new(on_event)),

            broadcast_emitter,
            broadcast_receiver: Arc::new(Mutex::new(broadcast_receiver)),

            _peer_id: id,
            _key: admin_peer_key.to_protobuf_encoding().unwrap(),

            shutdown_send,
            shutdown_recv: Arc::new(Mutex::new(shutdown_recv)),

            _connected_agents: Arc::new(RwLock::new(HashMap::new())),

            _cancel_token: CancellationToken::new(),
        }
    }

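    /// Sends `message` to a single peer identified by `to_peer`. The payload
    /// is wrapped in a direct [`AgentMessage`] and forwarded through the
    /// internal broadcast channel.
    ///
    /// ```ignore
    /// // Minimal usage sketch (the peer id here is illustrative):
    /// agent.send_direct("12D3KooW...".to_string(), b"ping".to_vec()).await;
    /// ```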
    pub async fn send_direct(&self, to_peer: String, message: Vec<u8>) {
        let node_message = AgentMessage::create_direct_message(message, to_peer.clone());
        debug!("Sending direct message to {}", to_peer);
        if let Err(e) = self
            .broadcast_emitter
            .send((self.details().id, node_message.to_bytes(), Some(to_peer)))
            .await
        {
            error!("Failed to send direct message: {:?}", e);
        }
    }

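    /// Broadcasts `message` to every peer in the workspace.
    ///
    /// ```ignore
    /// // Minimal usage sketch:
    /// agent.broadcast(b"hello everyone".to_vec()).await;
    /// ```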
    pub async fn broadcast(&self, message: Vec<u8>) {
        let node_message = AgentMessage::create_broadcast_message(message);
        if let Err(e) = self
            .broadcast_emitter
            .send((self.details().id, node_message.to_bytes(), None))
            .await
        {
            error!("Failed to send broadcast message: {:?}", e);
        }
    }

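    /// Returns this agent's name, peer id, and role as an [`AgentDetail`].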
    pub fn details(&self) -> AgentDetail {
        AgentDetail {
            name: self._config.name.clone(),
            id: self._peer_id.clone(),
            role: self._config.role.clone().unwrap_or_default(),
        }
    }

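    /// Lists the agents that have introduced themselves on this workspace.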
    pub async fn get_connected_agents(&self) -> Vec<AgentDetail> {
        let agents = self._connected_agents.read().await;
        agents.values().cloned().collect()
    }

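    /// Runs this agent (and any additional `agents`) on a dedicated Tokio
    /// runtime, blocking until every task finishes, a shutdown message
    /// carrying this agent's peer id arrives, or ctrl-c is received.
    ///
    /// ```ignore
    /// // Minimal sketch: run an agent until ctrl-c (handler setup elided).
    /// agent.start(b"input".to_vec(), None).await;
    /// ```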
    pub async fn start(&self, inputs: Vec<u8>, agents: Option<Vec<Arc<UnifiedAgent>>>) {
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .unwrap();

        let cancel_token = self._cancel_token.clone();

        let mut self_agent_handlers = self
            .run(inputs.clone(), None, cancel_token.clone(), runtime.handle())
            .await;

        // Start any companion agents on the same runtime and collect their tasks.
        if let Some(agents) = agents {
            for agent in agents {
                let agent_handlers = agent
                    .run(inputs.clone(), None, cancel_token.clone(), runtime.handle())
                    .await;
                self_agent_handlers.extend(agent_handlers);
            }
        }

        let all_tasks = join_all(self_agent_handlers);

        // Cancel everything when this agent's own peer id arrives on the
        // shutdown channel.
        let cancel_token_clone = cancel_token.clone();
        let shutdown_recv = self.shutdown_recv.clone();
        let admin_id = self._peer_id.clone();
        let task_shutdown = runtime.spawn(async move {
            let mut shutdown_recv_lock = shutdown_recv.lock().await;
            loop {
                select! {
                    _ = cancel_token_clone.cancelled() => {
                        debug!("Shutdown handler shutting down");
                        break;
                    }
                    msg = shutdown_recv_lock.recv() => {
                        if let Some(raw_data) = msg {
                            if raw_data == admin_id {
                                debug!("Received shutdown signal");
                                cancel_token_clone.cancel();
                                break;
                            }
                        }
                    }
                }
            }
        });

        runtime
            .spawn(async move {
                select! {
                    _ = task_shutdown => {
                        debug!("Shutdown handler completed");
                    }
                    _ = all_tasks => {
                        debug!("All agent tasks completed normally");
                    }
                    _ = signal::ctrl_c() => {
                        debug!("Received ctrl-c, initiating shutdown");
                        cancel_token.cancel();
                    }
                }
            })
            .await
            .unwrap();
    }

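    /// Spawns the per-agent tasks on `handle` and returns their join handles:
    /// the peer itself, the peer-event listener, the processor, the broadcast
    /// forwarder, and a keep-alive loop. All of them exit when `cancel_token`
    /// is cancelled.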
    async fn run(
        &self,
        inputs: Vec<u8>,
        _agents: Option<Vec<Arc<UnifiedAgent>>>,
        cancel_token: CancellationToken,
        handle: &Handle,
    ) -> Vec<JoinHandle<()>> {
        let mut config = self._config.clone();
        let config_path = self._config_path.clone();
        debug!("Config: {}", config.to_str());
        debug!("Config path: {:?}", config_path);
        if let Some(conf_file) = config_path {
            debug!(
                "Checking .ceylon_network config {}",
                fs::metadata(&conf_file).is_ok()
            );
            if fs::metadata(&conf_file).is_ok() {
                if let Err(e) = config.update_from_file(conf_file.clone()) {
                    error!("Failed to read config from {}: {:?}", conf_file, e);
                } else {
                    debug!("--------------------------------");
                    debug!("Using .ceylon_network config");
                    debug!("{} = {:?}", ENV_WORKSPACE_ID, config.work_space_id);
                    debug!("{} = {:?}", ENV_WORKSPACE_PEER, config.admin_peer);
                    debug!("{} = {:?}", ENV_WORKSPACE_PORT, config.port);
                    debug!("{} = {:?}", ENV_WORKSPACE_IP, config.admin_ip);
                    debug!("--------------------------------");
                }
            }
        }
        let peer_config = match config.mode {
            PeerMode::Admin => UnifiedPeerConfig::new_admin(
                config
                    .work_space_id
                    .clone()
                    .unwrap_or("CEYLON-AI-AGENT-NETWORK".to_string()),
                config.port.unwrap_or(0),
                config.buffer_size,
            ),
            PeerMode::Client => UnifiedPeerConfig::new_member(
                config.name.clone(),
                config
                    .work_space_id
                    .clone()
                    .unwrap_or("CEYLON-AI-AGENT-NETWORK".to_string()),
                config
                    .admin_peer
                    .clone()
                    .expect("Client mode requires an admin peer; set it in .ceylon_network"),
                config.port.unwrap_or(0),
                config.admin_ip.clone().unwrap_or_default(),
                config.buffer_size,
            ),
        };

        let peer_key = create_key_from_bytes(self._key.clone());
        let (mut peer, mut peer_listener) =
            UnifiedPeerImpl::create(peer_config.clone(), peer_key).await;

        let worker_details: Arc<RwLock<HashMap<String, AgentDetail>>> =
            self._connected_agents.clone();
        let peer_emitter_clone = peer.emitter().clone();
        let broadcast_emitter_clone = peer.emitter().clone();

        // Drive the peer until it finishes or the workspace is cancelled.
        let cancel_token_clone = cancel_token.clone();
        let task_peer = handle.spawn(async move {
            select! {
                _ = peer.run(cancel_token_clone.clone()) => {
                    debug!("Peer run completed");
                }
                _ = cancel_token_clone.cancelled() => {
                    debug!("Peer run cancelled");
                }
            }
        });

        // Cancelled once the admin acknowledges our introduction, which stops
        // the registration retry loop below.
        let registration_intro_send_cancel_token = CancellationToken::new();

        let on_message = self._on_message.clone();
        let on_event = self._on_event.clone();
        let peer_id = self._peer_id.clone();
        let cancel_token_clone = cancel_token.clone();

        let my_self_details = self.details().clone();
        let task_peer_listener = handle.spawn(async move {
            loop {
                select! {
                    _ = cancel_token_clone.cancelled() => {
                        debug!("Peer listener select shutting down");
                        break;
                    }
                    event = peer_listener.recv() => {
                        if let Some(node_message) = event {
                            if cancel_token_clone.is_cancelled() {
                                debug!("Peer listener shutting down");
                                break;
                            }
                            match node_message {
                                NodeMessage::Message { data, created_by, time, .. } => {
                                    let agent_message = AgentMessage::from_bytes(data);
                                    debug!("Agent message from data: {:#?}", agent_message);
                                    match agent_message {
                                        AgentMessage::NodeMessage { message, message_type, .. } => {
                                            debug!("Agent message: {:#?}", message);
                                            match message_type {
                                                MessageType::Direct { to_peer } => {
                                                    // Deliver direct messages only when we are the addressee.
                                                    if to_peer == peer_id {
                                                        on_message.lock().await.on_message(
                                                            created_by,
                                                            message,
                                                            time,
                                                        ).await;
                                                    }
                                                }
                                                MessageType::Broadcast => {
                                                    on_message.lock().await.on_message(
                                                        created_by,
                                                        message,
                                                        time,
                                                    ).await;
                                                }
                                            }
                                        }
                                        AgentMessage::AgentIntroduction { id, name, role, topic } => {
                                            debug!("Agent introduction {:?}", id);
                                            let intro_peer_id = id.clone();
                                            let agent_detail = AgentDetail { name, id, role };
                                            on_event.lock().await.on_agent_connected(
                                                topic.clone(),
                                                agent_detail.clone(),
                                            ).await;
                                            worker_details
                                                .write()
                                                .await
                                                .insert(intro_peer_id.clone(), agent_detail);
                                            // The admin acknowledges the introduction so the
                                            // new member stops re-sending it.
                                            if config.mode == PeerMode::Admin {
                                                let ack_message = AgentMessage::create_registration_ack_message(
                                                    intro_peer_id,
                                                    true,
                                                );
                                                if let Err(e) = peer_emitter_clone.send(
                                                    (my_self_details.id.clone(), ack_message.to_bytes(), None)
                                                ).await {
                                                    error!("Failed to send registration ack: {:?}", e);
                                                }
                                            }
                                            debug!("{:?} Worker details: {:#?}", my_self_details.id, worker_details.read().await);
                                        }
                                        AgentMessage::AgentRegistrationAck { status, .. } => {
                                            debug!("Agent registration ack: {:#?}", status);
                                            if status {
                                                registration_intro_send_cancel_token.cancel();
                                            }
                                        }
                                        _ => {}
                                    }
                                }
                                NodeMessage::Event { event, .. } => {
                                    debug!("Peer event: {:?}", event);
                                    match event {
                                        EventType::Subscribe { peer_id, topic } => {
                                            if worker_details.read().await.get(&peer_id).is_none() {
                                                let agent_intro_message = AgentMessage::create_introduction_message(
                                                    my_self_details.id.clone(),
                                                    my_self_details.name.clone(),
                                                    my_self_details.role.clone(),
                                                    topic.clone(),
                                                );
                                                let _cancel_token = registration_intro_send_cancel_token.clone();
                                                let _emitter = peer_emitter_clone.clone();
                                                let _id = my_self_details.id.clone();

                                                if config.mode == PeerMode::Admin {
                                                    // The admin introduces itself once.
                                                    if let Err(e) = _emitter.send(
                                                        (_id.clone(), agent_intro_message.to_bytes(), None)
                                                    ).await {
                                                        error!("Failed to send introduction: {:?}", e);
                                                    }
                                                } else {
                                                    // Members re-send their introduction every 100ms
                                                    // until the admin's registration ack cancels the loop.
                                                    tokio::spawn(async move {
                                                        loop {
                                                            if _cancel_token.is_cancelled() {
                                                                break;
                                                            }
                                                            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                                                            if _emitter.send(
                                                                (_id.clone(), agent_intro_message.to_bytes(), None)
                                                            ).await.is_err() {
                                                                break;
                                                            }
                                                        }
                                                    });
                                                }
                                            }
                                        }
                                        _ => {
                                            debug!("Received event {:?}", event);
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        });

        // Feed the initial inputs to the processor, then idle until cancelled
        // so the task handle stays alive for join_all.
        let processor = self._processor.clone();
        let cancel_token_clone = cancel_token.clone();
        let task_processor = handle.spawn(async move {
            processor.lock().await.run(inputs).await;
            loop {
                if cancel_token_clone.is_cancelled() {
                    debug!("Processor shutting down");
                    break;
                }
                tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
            }
        });

        // Forward locally queued messages to the peer emitter.
        let broadcast_receiver = self.broadcast_receiver.clone();
        let cancel_token_clone = cancel_token.clone();
        let task_broadcast = handle.spawn(async move {
            let mut broadcast_receiver_lock = broadcast_receiver.lock().await;
            loop {
                select! {
                    _ = cancel_token_clone.cancelled() => {
                        debug!("Broadcast handler shutting down");
                        break;
                    }
                    msg = broadcast_receiver_lock.recv() => {
                        if let Some(raw_data) = msg {
                            if let Err(e) = broadcast_emitter_clone.send(raw_data).await {
                                error!("Failed to forward message to peer: {:?}", e);
                                break;
                            }
                        }
                    }
                }
            }
        });

        // Keep-alive task: holds the runtime open until cancellation.
        let cancel_token_clone = cancel_token.clone();
        let run_holder_process = handle.spawn(async move {
            loop {
                if cancel_token_clone.is_cancelled() {
                    break;
                }
                tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
            }
        });
        vec![
            task_peer,
            task_peer_listener,
            task_processor,
            task_broadcast,
            run_holder_process,
        ]
    }

    async fn cleanup(&self) {
        debug!("Cleaning up agent resources");
        // Note: dropping a clone of the sender does not close the channel;
        // the original `broadcast_emitter` is only released when the agent
        // itself is dropped.
        drop(self.broadcast_emitter.clone());
    }

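    /// Cancels the agent's tasks and releases its resources. While `start`
    /// is running, this is roughly equivalent to sending this agent's own
    /// peer id on the shutdown channel:
    ///
    /// ```ignore
    /// // Sketch of the channel-based alternative:
    /// agent.shutdown_send.send(agent.details().id).ok();
    /// ```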
    pub async fn stop(&self) {
        debug!("Agent {} stop called", self._config.name);
        self._cancel_token.cancel();
        self.cleanup().await;
    }
}