use std::collections::HashMap;

use std::sync::Arc;
use std::time::SystemTime;

use futures::future::join_all;
use tokio::io::AsyncReadExt;
use tokio::sync::{mpsc, Mutex, RwLock};
use tokio::{select, signal};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};

use crate::workspace::agent::{
    AgentDetail, EventHandler, ENV_WORKSPACE_ID, ENV_WORKSPACE_IP, ENV_WORKSPACE_PEER,
    ENV_WORKSPACE_PORT,
};
use crate::workspace::message::{AgentMessage, MessageType};
use crate::{utils, MessageHandler, Processor, WorkerAgent};
use sangedama::peer::message::data::{EventType, NodeMessage, NodeMessageTransporter};
use sangedama::peer::node::{
    create_key, create_key_from_bytes, get_peer_id, AdminPeer, AdminPeerConfig,
};

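/// Configuration for an [`AdminAgent`]: the workspace name, the port the admin
/// peer listens on, and the buffer size used when creating the admin peer.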
#[derive(Clone)]
pub struct AdminAgentConfig {
    pub name: String,
    pub port: u16,
    pub buffer_size: u16,
}

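/// Coordinator agent for a workspace. It creates the admin peer, launches the
/// worker agents, relays direct and broadcast messages, and owns the shutdown
/// channel used to cancel all spawned tasks.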
pub struct AdminAgent {
    pub config: AdminAgentConfig,

    _processor: Arc<Mutex<Arc<dyn Processor>>>,
    _on_message: Arc<Mutex<Arc<dyn MessageHandler>>>,
    _on_event: Arc<Mutex<Arc<dyn EventHandler>>>,

    pub broadcast_emitter: mpsc::Sender<NodeMessageTransporter>,
    pub broadcast_receiver: Arc<Mutex<mpsc::Receiver<NodeMessageTransporter>>>,

    _peer_id: String,

    _key: Vec<u8>,

    pub shutdown_send: mpsc::UnboundedSender<String>,
    pub shutdown_recv: Arc<Mutex<mpsc::UnboundedReceiver<String>>>,
    _worker_details: Vec<AgentDetail>,
}

impl AdminAgent {
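    /// Creates a new `AdminAgent` with the given configuration and handler callbacks.
    ///
    /// A minimal construction sketch; `MyMessageHandler`, `MyProcessor`, and
    /// `MyEventHandler` are hypothetical implementations of the `MessageHandler`,
    /// `Processor`, and `EventHandler` traits, and the config values are example
    /// values only:
    ///
    /// ```ignore
    /// let admin = AdminAgent::new(
    ///     AdminAgentConfig {
    ///         name: "my_workspace".to_string(),
    ///         port: 8000,
    ///         buffer_size: 100,
    ///     },
    ///     Arc::new(MyMessageHandler),
    ///     Arc::new(MyProcessor),
    ///     Arc::new(MyEventHandler),
    /// );
    /// admin.start(vec![], vec![]).await;
    /// ```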
    pub fn new(
        config: AdminAgentConfig,
        on_message: Arc<dyn MessageHandler>,
        processor: Arc<dyn Processor>,
        on_event: Arc<dyn EventHandler>,
    ) -> Self {
        let (broadcast_emitter, broadcast_receiver) = mpsc::channel::<NodeMessageTransporter>(2);

        let admin_peer_key = create_key();
        let id = get_peer_id(&admin_peer_key).to_string();

        let (shutdown_send, shutdown_recv) = mpsc::unbounded_channel::<String>();

        Self {
            config,
            _on_message: Arc::new(Mutex::new(on_message)),
            _processor: Arc::new(Mutex::new(processor)),
            _on_event: Arc::new(Mutex::new(on_event)),

            broadcast_emitter,
            broadcast_receiver: Arc::new(Mutex::new(broadcast_receiver)),
            _peer_id: id,

            _key: admin_peer_key.to_protobuf_encoding().unwrap(),

            shutdown_send,
            shutdown_recv: Arc::new(Mutex::new(shutdown_recv)),

            _worker_details: Vec::new(),
        }
    }

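    /// Sends a point-to-point message to a single peer by wrapping it in a direct
    /// `AgentMessage` and queueing it on the broadcast emitter channel.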
    pub async fn send_direct(&self, to_peer: String, message: Vec<u8>) {
        let node_message = AgentMessage::create_direct_message(message, to_peer.clone());
        info!("Sending direct message to {}", to_peer);
        match self
            .broadcast_emitter
            .send((self.details().id, node_message.to_bytes(), Some(to_peer)))
            .await
        {
            Ok(_) => {}
            Err(e) => {
                error!("Failed to send direct message: {:?}", e);
            }
        }
    }

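    /// Broadcasts a message to all connected workers by wrapping it in a broadcast
    /// `AgentMessage` and queueing it on the broadcast emitter channel.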
    pub async fn broadcast(&self, message: Vec<u8>) {
        let node_message = AgentMessage::create_broadcast_message(message);
        match self
            .broadcast_emitter
            .send((self.details().id, node_message.to_bytes(), None))
            .await
        {
            Ok(_) => {}
            Err(e) => {
                error!("Failed to send broadcast message: {:?}", e);
            }
        }
    }

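    /// Starts the admin agent together with the given worker agents and runs until shutdown.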
    pub async fn start(&self, inputs: Vec<u8>, agents: Vec<Arc<WorkerAgent>>) {
        self.run_(inputs, agents).await;
    }

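    /// Requests shutdown by sending this agent's peer ID on the shutdown channel.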
    pub async fn stop(&self) {
        info!("Agent {} stop called", self.config.name);
        self.shutdown_send.send(self._peer_id.clone()).unwrap();
    }

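    /// Returns this agent's details: its configured name, its peer ID, and the "admin" role.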
    pub fn details(&self) -> AgentDetail {
        AgentDetail {
            name: self.config.name.clone(),
            id: self._peer_id.clone(),
            role: "admin".to_string(),
        }
    }
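
    /// Main run loop: creates the admin peer, launches the worker agents, and wires
    /// up the listener, processor, broadcast, and shutdown tasks until cancellation.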
    async fn run_(&self, inputs: Vec<u8>, agents: Vec<Arc<WorkerAgent>>) {
        info!("Agent {} running", self.config.name);

        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .unwrap();

        let cancel_token = CancellationToken::new();

        let handle = runtime.handle().clone();

        let worker_details: RwLock<HashMap<String, AgentDetail>> = RwLock::new(HashMap::new());

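        // Recreate the admin peer from the key stored in `new()` so its peer ID
        // matches the one handed out by `details()`.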
        let config = self.config.clone();
        let admin_config = AdminPeerConfig::new(
            config.port,
            config.name.clone(),
            Some(config.buffer_size as usize),
        );

        let peer_key = create_key_from_bytes(self._key.clone());

        let (mut peer_, mut peer_listener_) =
            AdminPeer::create(admin_config.clone(), peer_key).await;

        let name = self.config.name.clone();
        let port = self.config.port;

        if peer_.id == self._peer_id {
            info!("Admin peer created {}", peer_.id);

            let mut env_vars = HashMap::new();
            env_vars.insert(ENV_WORKSPACE_PEER, peer_.id.clone());
            env_vars.insert(ENV_WORKSPACE_PORT, port.to_string());
            env_vars.insert(ENV_WORKSPACE_ID, name.clone());
            env_vars.insert(ENV_WORKSPACE_IP, "127.0.0.1".to_string());

            if let Err(e) = utils::env::write_to_env_file(&env_vars) {
                eprintln!("Failed to write to .ceylon_network file: {}", e);
            } else {
                println!("Successfully wrote variables to .ceylon_network file");
            }

            println!("------------------------------------------------------------------");
            println!("| Important");
            println!("| {}={}", ENV_WORKSPACE_ID, name);
            println!("| {}={}", ENV_WORKSPACE_PEER, peer_.id);
            println!("| {}={}", ENV_WORKSPACE_PORT, port);
            println!("| {}={}", ENV_WORKSPACE_IP, "127.0.0.1");
            println!("| Use this admin peer ID to connect to the server");
            println!("------------------------------------------------------------------");
        } else {
            panic!("Peer ID mismatch: recreated admin peer does not match the ID generated in new()");
        }
        let admin_id = peer_.id.clone();
        let admin_emitter = peer_.emitter();

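        // Drive the admin peer's network loop until the cancellation token fires.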
        let cancel_token_clone = cancel_token.clone();
        let task_admin = handle.spawn(async move {
            peer_.run(None, cancel_token_clone).await;
        });

        let mut worker_tasks = vec![];

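        // Launch every worker agent with this admin's peer ID set as its `admin_peer`,
        // and record the worker's details for later event dispatch.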
        let _inputs = inputs.clone();
        let admin_id_ = admin_id.clone();

        let cancel_token_clone = cancel_token.clone();
        for agent in agents {
            let _inputs_ = _inputs.clone();
            let agent_ = agent.clone();
            let _admin_id_ = admin_id_.clone();
            let mut config = agent_.config.clone();
            config.admin_peer = _admin_id_.clone();
            let tasks = agent_
                .run_with_config(
                    _inputs_.clone(),
                    config,
                    handle.clone(),
                    cancel_token_clone.clone(),
                )
                .await;
            let agent_detail = agent_.details();

            worker_details
                .write()
                .await
                .insert(agent_detail.id.clone(), agent_detail);

            for task in tasks {
                worker_tasks.push(task);
            }
        }

        info!("Worker tasks created");

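        // Await all worker tasks, then hold this task open until cancellation.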
        let worker_tasks = join_all(worker_tasks);
        let cancel_token_clone = cancel_token.clone();
        let worker_task_runner = handle.spawn(async move {
            worker_tasks.await;
            // Wait for cancellation instead of spinning on `is_cancelled()`.
            cancel_token_clone.cancelled().await;
        });

        let name = self.config.name.clone();
        let on_message = self._on_message.clone();
        let on_event = self._on_event.clone();

        let cancel_token_clone = cancel_token.clone();

        let mut is_call_agent_on_connect_list: HashMap<String, bool> = HashMap::new();
        let peer_id = self._peer_id.clone();
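        // Listener task: decode incoming `NodeMessage`s and dispatch them to the
        // message handler, and fire `on_agent_connected` once per worker on
        // introduction or topic subscription.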
        let task_admin_listener = handle.spawn(async move {
            loop {
                select! {
                    _ = cancel_token_clone.cancelled() => {
                        break;
                    }
                    event = peer_listener_.recv() => {
                        if let Some(event) = event {
                            match event {
                                NodeMessage::Message { data, created_by, time, .. } => {
                                    let agent_message = AgentMessage::from_bytes(data);

                                    match agent_message {
                                        AgentMessage::NodeMessage { message, message_type, .. } => {
                                            match message_type {
                                                MessageType::Direct { to_peer } => {
                                                    // Direct messages are only handled when addressed to this admin peer.
                                                    if to_peer == peer_id {
                                                        on_message
                                                            .lock()
                                                            .await
                                                            .on_message(created_by, message, time)
                                                            .await;
                                                    }
                                                }
                                                MessageType::Broadcast => {
                                                    on_message
                                                        .lock()
                                                        .await
                                                        .on_message(created_by, message, time)
                                                        .await;
                                                }
                                            }
                                        }
                                        AgentMessage::AgentIntroduction { id, name, role, topic } => {
                                            info!("Agent introduction {:?}", id);
                                            let peer_id = id.clone();
                                            let id_key = id.clone();
                                            let agent_detail = AgentDetail { name, id, role };
                                            worker_details.write().await.insert(id_key, agent_detail);

                                            // Fire `on_agent_connected` only once per worker.
                                            let is_call_agent_on_connect = is_call_agent_on_connect_list
                                                .get(&peer_id)
                                                .copied()
                                                .unwrap_or(false);
                                            if !is_call_agent_on_connect {
                                                if let Some(agent) = worker_details.read().await.get(&peer_id) {
                                                    let agent = agent.clone();
                                                    on_event.lock().await.on_agent_connected(topic, agent).await;
                                                    is_call_agent_on_connect_list.insert(peer_id, true);
                                                }
                                            }
                                        }
                                        _ => {
                                            info!("Agent listener {:?}", agent_message);
                                        }
                                    }
                                }
                                NodeMessage::Event { event, .. } => {
                                    match event {
                                        EventType::Subscribe { peer_id, topic } => {
                                            let is_call_agent_on_connect = is_call_agent_on_connect_list
                                                .get(&peer_id)
                                                .copied()
                                                .unwrap_or(false);
                                            if !is_call_agent_on_connect {
                                                if let Some(agent) = worker_details.read().await.get(&peer_id) {
                                                    let agent = agent.clone();
                                                    on_event.lock().await.on_agent_connected(topic, agent).await;
                                                    is_call_agent_on_connect_list.insert(peer_id, true);
                                                }
                                            }
                                        }
                                        _ => {
                                            info!("Admin Received Event {:?}", event);
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        });

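        // Processor task: run the user-supplied processor once with the initial inputs.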
        let processor = self._processor.clone();
        let processor_input_clone = inputs.clone();
        let cancel_token_clone = cancel_token.clone();
        let run_process = handle.spawn(async move {
            processor.lock().await.run(processor_input_clone).await;
            // Keep the task alive until cancellation instead of spinning on `is_cancelled()`.
            cancel_token_clone.cancelled().await;
        });
        let cancel_token_clone = cancel_token.clone();
        let run_holder_process = handle.spawn(async move {
            // Keep-alive task that simply waits for cancellation.
            cancel_token_clone.cancelled().await;
        });

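        // Broadcast forwarder: drain the internal broadcast channel and push each
        // message to the admin peer's emitter.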
        let broadcast_receiver = self.broadcast_receiver.clone();
        let cancel_token_clone = cancel_token.clone();
        let run_broadcast = handle.spawn(async move {
            let mut receiver = broadcast_receiver.lock().await;
            loop {
                select! {
                    _ = cancel_token_clone.cancelled() => break,
                    next = receiver.recv() => {
                        match next {
                            Some(raw_data) => {
                                if let Err(e) = admin_emitter.send(raw_data).await {
                                    error!("Failed to forward message to admin peer: {:?}", e);
                                }
                            }
                            // Channel closed; no more messages will arrive.
                            None => break,
                        }
                    }
                }
            }
        });
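        // Shutdown task: cancel everything once this admin's own peer ID arrives on
        // the shutdown channel (see `stop()`).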
        let shutdown_recv = self.shutdown_recv.clone();
        let admin_id_clone = admin_id.clone();
        let shutdown_task = handle.spawn(async move {
            loop {
                match shutdown_recv.lock().await.recv().await {
                    Some(id) if id == admin_id_clone => {
                        info!("Received shutdown signal, shutting down ...");
                        cancel_token.cancel();
                        break;
                    }
                    // Ignore shutdown requests addressed to other peers.
                    Some(_) => continue,
                    // Channel closed; stop listening.
                    None => break,
                }
            }
        });

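        // Wait for the first of the spawned tasks to finish, or for Ctrl-C, which
        // requests a shutdown via the shutdown channel.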
        let shutdown_tx = self.shutdown_send.clone();
        let admin_id_clone = admin_id.clone();
        handle
            .spawn(async move {
                select! {
                    _ = run_holder_process => {
                        info!("Agent {} run_holder_process done", name);
                    }
                    _ = worker_task_runner => {
                        info!("Agent {} worker_task_runner done", name);
                    }
                    _ = task_admin => {
                        info!("Agent {} task_admin done", name);
                    }
                    _ = task_admin_listener => {
                        info!("Agent {} task_admin_listener done", name);
                    }
                    _ = run_process => {
                        info!("Agent {} run_process done", name);
                    }
                    _ = run_broadcast => {
                        info!("Agent {} run_broadcast done", name);
                    }
                    _ = shutdown_task => {
                        info!("Agent {} shutdown_task done", name);
                    }
                    _ = signal::ctrl_c() => {
                        println!("Agent {:?} received exit signal", name);
                        shutdown_tx.send(admin_id_clone).unwrap();
                    }
                }
            })
            .await
            .unwrap();
    }
}