1use async_trait::async_trait;
2use ceylon_core::agent::{Agent, AgentContext};
3use ceylon_core::error::{Error, Result};
4use ceylon_core::mesh::Mesh;
5use ceylon_core::message::Message;
6use ceylon_core::request_queue::{MeshRequest, MeshResult, RequestQueue};
7use dashmap::DashMap;
8use std::sync::Arc;
9use std::time::Duration;
10use tokio::sync::mpsc::{self, Sender};
11use tokio::task::JoinHandle;
12use tracing::{error, trace};
13
14pub struct LocalMesh {
15 name: String,
16 agents: DashMap<String, Sender<Message>>,
17 tasks: DashMap<String, JoinHandle<()>>,
18 request_queue: Arc<RequestQueue>,
19}
20
21impl LocalMesh {
22 pub fn new(name: impl Into<String>) -> Self {
23 Self {
24 name: name.into(),
25 agents: DashMap::new(),
26 tasks: DashMap::new(),
27 request_queue: Arc::new(RequestQueue::new()),
28 }
29 }
30
31 pub fn request_queue(&self) -> Arc<RequestQueue> {
33 self.request_queue.clone()
34 }
35}
36
37#[async_trait]
38impl Mesh for LocalMesh {
39 async fn start(&self) -> Result<()> {
40 Ok(())
43 }
44
45 async fn stop(&self) -> Result<()> {
46 for entry in self.tasks.iter() {
47 entry.value().abort();
48 }
49 self.tasks.clear();
50 self.agents.clear();
51 Ok(())
52 }
53
54 async fn add_agent(&self, mut agent: Box<dyn Agent + 'static>) -> Result<()> {
55 let name = agent.name();
56 if self.agents.contains_key(&name) {
57 return Err(Error::MeshError(format!("Agent {} already exists", name)));
58 }
59
60 let (tx, mut rx) = mpsc::channel(100);
61 self.agents.insert(name.clone(), tx);
62
63 let mesh_name = self.name.clone();
64 let agent_name = name.clone();
65 let request_queue = self.request_queue.clone();
66
67 let handle = tokio::spawn(async move {
69 let mut ctx = AgentContext::new(mesh_name, Some(request_queue));
70 if let Err(e) = agent.on_start(&mut ctx).await {
71 error!(agent = %agent_name, error = ?e, "Error starting agent");
72 return;
73 }
74
75 while let Some(msg) = rx.recv().await {
76 let now = chrono::Utc::now().timestamp_micros();
77 let latency = (now - msg.created_at).max(0) as u64;
78 trace!(agent = %agent_name, latency_us = latency, "processing message");
79
80 let start = std::time::Instant::now();
81 if let Err(e) = agent.on_message(msg, &mut ctx).await {
82 error!(agent = %agent_name, error = ?e, "Error processing message");
83 }
84 let duration = start.elapsed().as_micros() as u64;
85 trace!(agent = %agent_name, duration_us = duration, "message processed");
86 }
87
88 if let Err(e) = agent.on_stop(&mut ctx).await {
89 error!(agent = %agent_name, error = ?e, "Error stopping agent");
90 }
91 });
92
93 self.tasks.insert(name, handle);
94 Ok(())
95 }
96
97 async fn send(&self, message: Message, target: &str) -> Result<()> {
98 if let Some(sender) = self.agents.get(target) {
99 sender
100 .send(message)
101 .await
102 .map_err(|_| Error::MeshError(format!("Failed to send to agent {}", target)))?;
103 Ok(())
104 } else {
105 Err(Error::AgentNotFound(target.to_string()))
106 }
107 }
108}
109
110impl LocalMesh {
111 pub async fn broadcast(
113 &self,
114 message: Message,
115 exclude: Option<&str>,
116 ) -> Result<Vec<Result<()>>> {
117 let mut results = Vec::new();
118
119 for entry in self.agents.iter() {
120 let agent_name = entry.key();
121
122 if let Some(excluded) = exclude {
124 if agent_name == excluded {
125 continue;
126 }
127 }
128
129 let msg = Message::new(
131 message.sender.clone(),
132 message.payload.clone(),
133 agent_name.clone(),
134 );
135
136 let result = entry.value().send(msg).await.map_err(|_| {
137 Error::MeshError(format!("Failed to broadcast to agent {}", agent_name))
138 });
139
140 results.push(result);
141 }
142
143 Ok(results)
144 }
145
146 pub async fn submit(&self, target: &str, payload: String) -> Result<String> {
148 let request_id = self.request_queue.submit(target, payload.clone());
149
150 let mut msg = Message::new("request", payload.into_bytes(), target);
151 msg.set_correlation_id(&request_id);
152
153 self.send(msg, target).await?;
154 Ok(request_id)
155 }
156
157 pub fn get_pending(&self) -> Vec<MeshRequest> {
159 self.request_queue.get_pending()
160 }
161
162 pub fn has_pending(&self) -> bool {
164 self.request_queue.has_pending()
165 }
166
167 pub fn get_results(&self) -> Vec<MeshResult> {
169 self.request_queue.take_results()
170 }
171
172 pub fn peek_results(&self) -> Vec<MeshResult> {
174 self.request_queue.peek_results()
175 }
176
177 pub async fn send_reminders(&self, older_than_secs: f64) -> Result<Vec<String>> {
179 let stale = self
180 .request_queue
181 .get_stale(Duration::from_secs_f64(older_than_secs));
182 let mut reminded = Vec::new();
183
184 for req in stale {
185 let mut reminder_msg = Message::new(
186 "reminder",
187 format!("REMINDER: Please complete request {}", req.id).into_bytes(),
188 &req.target,
189 );
190 reminder_msg.set_correlation_id(&req.id);
191
192 if self.send(reminder_msg, &req.target).await.is_ok() {
193 self.request_queue.increment_reminder(&req.id);
194 reminded.push(req.id);
195 }
196 }
197
198 Ok(reminded)
199 }
200
201 pub async fn wait_for(
203 &self,
204 request_id: &str,
205 timeout_secs: f64,
206 reminder_interval_secs: f64,
207 ) -> Result<MeshResult> {
208 let timeout = Duration::from_secs_f64(timeout_secs);
209 let reminder_interval = Duration::from_secs_f64(reminder_interval_secs);
210 let deadline = std::time::Instant::now() + timeout;
211 let mut last_reminder = std::time::Instant::now();
212
213 loop {
214 if let Some(result) = self.request_queue.take_result(request_id) {
216 return Ok(result);
217 }
218
219 let pending = self.request_queue.get_pending();
221 let request = pending.iter().find(|r| r.id == request_id);
222 if request.is_none() {
223 return Err(Error::MeshError(format!(
224 "Request {} not found",
225 request_id
226 )));
227 }
228
229 if std::time::Instant::now() >= deadline {
231 return Err(Error::MeshError(format!(
232 "Request {} timed out",
233 request_id
234 )));
235 }
236
237 if last_reminder.elapsed() >= reminder_interval {
239 let _ = self.send_reminders(reminder_interval_secs).await;
240 last_reminder = std::time::Instant::now();
241 }
242
243 tokio::time::sleep(Duration::from_millis(100)).await;
245 }
246 }
247
248 pub async fn collect_results(&self, reminder_interval_secs: f64) -> Vec<MeshResult> {
250 let reminder_interval = Duration::from_secs_f64(reminder_interval_secs);
251 let mut last_reminder = std::time::Instant::now();
252 let mut all_results = Vec::new();
253
254 while self.request_queue.has_pending() {
255 let results = self.request_queue.take_results();
257 all_results.extend(results);
258
259 if last_reminder.elapsed() >= reminder_interval {
261 let _ = self.send_reminders(reminder_interval_secs).await;
262 last_reminder = std::time::Instant::now();
263 }
264
265 tokio::time::sleep(Duration::from_millis(100)).await;
267 }
268
269 let results = self.request_queue.take_results();
271 all_results.extend(results);
272
273 all_results
274 }
275}