ceylon_local/
mesh.rs

1use async_trait::async_trait;
2use ceylon_core::agent::{Agent, AgentContext};
3use ceylon_core::error::{Error, Result};
4use ceylon_core::mesh::Mesh;
5use ceylon_core::message::Message;
6use ceylon_core::request_queue::{MeshRequest, MeshResult, RequestQueue};
7use dashmap::DashMap;
8use std::sync::Arc;
9use std::time::Duration;
10use tokio::sync::mpsc::{self, Sender};
11use tokio::task::JoinHandle;
12use tracing::{error, trace};
13
14pub struct LocalMesh {
15    name: String,
16    agents: DashMap<String, Sender<Message>>,
17    tasks: DashMap<String, JoinHandle<()>>,
18    request_queue: Arc<RequestQueue>,
19}
20
21impl LocalMesh {
22    pub fn new(name: impl Into<String>) -> Self {
23        Self {
24            name: name.into(),
25            agents: DashMap::new(),
26            tasks: DashMap::new(),
27            request_queue: Arc::new(RequestQueue::new()),
28        }
29    }
30
31    /// Get the request queue (for sharing with agent wrappers)
32    pub fn request_queue(&self) -> Arc<RequestQueue> {
33        self.request_queue.clone()
34    }
35}
36
37#[async_trait]
38impl Mesh for LocalMesh {
39    async fn start(&self) -> Result<()> {
40        // For local mesh, start might just be a signal,
41        // but agents are started when added for now.
42        Ok(())
43    }
44
45    async fn stop(&self) -> Result<()> {
46        for entry in self.tasks.iter() {
47            entry.value().abort();
48        }
49        self.tasks.clear();
50        self.agents.clear();
51        Ok(())
52    }
53
54    async fn add_agent(&self, mut agent: Box<dyn Agent + 'static>) -> Result<()> {
55        let name = agent.name();
56        if self.agents.contains_key(&name) {
57            return Err(Error::MeshError(format!("Agent {} already exists", name)));
58        }
59
60        let (tx, mut rx) = mpsc::channel(100);
61        self.agents.insert(name.clone(), tx);
62
63        let mesh_name = self.name.clone();
64        let agent_name = name.clone();
65        let request_queue = self.request_queue.clone();
66
67        // Spawn agent loop
68        let handle = tokio::spawn(async move {
69            let mut ctx = AgentContext::new(mesh_name, Some(request_queue));
70            if let Err(e) = agent.on_start(&mut ctx).await {
71                error!(agent = %agent_name, error = ?e, "Error starting agent");
72                return;
73            }
74
75            while let Some(msg) = rx.recv().await {
76                let now = chrono::Utc::now().timestamp_micros();
77                let latency = (now - msg.created_at).max(0) as u64;
78                trace!(agent = %agent_name, latency_us = latency, "processing message");
79
80                let start = std::time::Instant::now();
81                if let Err(e) = agent.on_message(msg, &mut ctx).await {
82                    error!(agent = %agent_name, error = ?e, "Error processing message");
83                }
84                let duration = start.elapsed().as_micros() as u64;
85                trace!(agent = %agent_name, duration_us = duration, "message processed");
86            }
87
88            if let Err(e) = agent.on_stop(&mut ctx).await {
89                error!(agent = %agent_name, error = ?e, "Error stopping agent");
90            }
91        });
92
93        self.tasks.insert(name, handle);
94        Ok(())
95    }
96
97    async fn send(&self, message: Message, target: &str) -> Result<()> {
98        if let Some(sender) = self.agents.get(target) {
99            sender
100                .send(message)
101                .await
102                .map_err(|_| Error::MeshError(format!("Failed to send to agent {}", target)))?;
103            Ok(())
104        } else {
105            Err(Error::AgentNotFound(target.to_string()))
106        }
107    }
108}
109
110impl LocalMesh {
111    /// Broadcast a message to all agents in the mesh
112    pub async fn broadcast(
113        &self,
114        message: Message,
115        exclude: Option<&str>,
116    ) -> Result<Vec<Result<()>>> {
117        let mut results = Vec::new();
118
119        for entry in self.agents.iter() {
120            let agent_name = entry.key();
121
122            // Skip excluded agent if specified
123            if let Some(excluded) = exclude {
124                if agent_name == excluded {
125                    continue;
126                }
127            }
128
129            // Clone message for each agent
130            let msg = Message::new(
131                message.sender.clone(),
132                message.payload.clone(),
133                agent_name.clone(),
134            );
135
136            let result = entry.value().send(msg).await.map_err(|_| {
137                Error::MeshError(format!("Failed to broadcast to agent {}", agent_name))
138            });
139
140            results.push(result);
141        }
142
143        Ok(results)
144    }
145
146    /// Submit a request (fire-and-forget). Returns request ID.
147    pub async fn submit(&self, target: &str, payload: String) -> Result<String> {
148        let request_id = self.request_queue.submit(target, payload.clone());
149
150        let mut msg = Message::new("request", payload.into_bytes(), target);
151        msg.set_correlation_id(&request_id);
152
153        self.send(msg, target).await?;
154        Ok(request_id)
155    }
156
157    /// Get pending requests
158    pub fn get_pending(&self) -> Vec<MeshRequest> {
159        self.request_queue.get_pending()
160    }
161
162    /// Check if there are pending requests
163    pub fn has_pending(&self) -> bool {
164        self.request_queue.has_pending()
165    }
166
167    /// Get available results (removes them from queue)
168    pub fn get_results(&self) -> Vec<MeshResult> {
169        self.request_queue.take_results()
170    }
171
172    /// Peek at results without removing
173    pub fn peek_results(&self) -> Vec<MeshResult> {
174        self.request_queue.peek_results()
175    }
176
177    /// Send reminders for stale requests
178    pub async fn send_reminders(&self, older_than_secs: f64) -> Result<Vec<String>> {
179        let stale = self
180            .request_queue
181            .get_stale(Duration::from_secs_f64(older_than_secs));
182        let mut reminded = Vec::new();
183
184        for req in stale {
185            let mut reminder_msg = Message::new(
186                "reminder",
187                format!("REMINDER: Please complete request {}", req.id).into_bytes(),
188                &req.target,
189            );
190            reminder_msg.set_correlation_id(&req.id);
191
192            if self.send(reminder_msg, &req.target).await.is_ok() {
193                self.request_queue.increment_reminder(&req.id);
194                reminded.push(req.id);
195            }
196        }
197
198        Ok(reminded)
199    }
200
201    /// Wait for a specific result with auto-reminders
202    pub async fn wait_for(
203        &self,
204        request_id: &str,
205        timeout_secs: f64,
206        reminder_interval_secs: f64,
207    ) -> Result<MeshResult> {
208        let timeout = Duration::from_secs_f64(timeout_secs);
209        let reminder_interval = Duration::from_secs_f64(reminder_interval_secs);
210        let deadline = std::time::Instant::now() + timeout;
211        let mut last_reminder = std::time::Instant::now();
212
213        loop {
214            // Check if result is available
215            if let Some(result) = self.request_queue.take_result(request_id) {
216                return Ok(result);
217            }
218
219            // Check if request still exists
220            let pending = self.request_queue.get_pending();
221            let request = pending.iter().find(|r| r.id == request_id);
222            if request.is_none() {
223                return Err(Error::MeshError(format!(
224                    "Request {} not found",
225                    request_id
226                )));
227            }
228
229            // Check timeout
230            if std::time::Instant::now() >= deadline {
231                return Err(Error::MeshError(format!(
232                    "Request {} timed out",
233                    request_id
234                )));
235            }
236
237            // Send reminder if interval passed
238            if last_reminder.elapsed() >= reminder_interval {
239                let _ = self.send_reminders(reminder_interval_secs).await;
240                last_reminder = std::time::Instant::now();
241            }
242
243            // Wait briefly
244            tokio::time::sleep(Duration::from_millis(100)).await;
245        }
246    }
247
248    /// Collect all results, blocking until all pending complete
249    pub async fn collect_results(&self, reminder_interval_secs: f64) -> Vec<MeshResult> {
250        let reminder_interval = Duration::from_secs_f64(reminder_interval_secs);
251        let mut last_reminder = std::time::Instant::now();
252        let mut all_results = Vec::new();
253
254        while self.request_queue.has_pending() {
255            // Collect any available results
256            let results = self.request_queue.take_results();
257            all_results.extend(results);
258
259            // Send reminders if interval passed
260            if last_reminder.elapsed() >= reminder_interval {
261                let _ = self.send_reminders(reminder_interval_secs).await;
262                last_reminder = std::time::Instant::now();
263            }
264
265            // Wait briefly before checking again
266            tokio::time::sleep(Duration::from_millis(100)).await;
267        }
268
269        // Get any final results
270        let results = self.request_queue.take_results();
271        all_results.extend(results);
272
273        all_results
274    }
275}