ceylon_core/
request_queue.rs1use dashmap::DashMap;
2use serde::{Deserialize, Serialize};
3use std::collections::VecDeque;
4use std::sync::Mutex;
5use std::time::{Duration, Instant};
6use tokio::sync::Notify;
7use uuid::Uuid;
8
9#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
11pub enum RequestStatus {
12 Pending,
13 Completed,
14 Failed(String),
15 TimedOut,
16}
17
18#[derive(Debug, Clone)]
20pub struct MeshRequest {
21 pub id: String,
22 pub target: String,
23 pub payload: String,
24 pub submitted_at: Instant,
25 pub status: RequestStatus,
26 pub reminder_count: u32,
27}
28
29impl MeshRequest {
30 pub fn new(target: impl Into<String>, payload: impl Into<String>) -> Self {
31 Self {
32 id: Uuid::new_v4().to_string(),
33 target: target.into(),
34 payload: payload.into(),
35 submitted_at: Instant::now(),
36 status: RequestStatus::Pending,
37 reminder_count: 0,
38 }
39 }
40
41 pub fn elapsed_seconds(&self) -> f64 {
43 self.submitted_at.elapsed().as_secs_f64()
44 }
45}
46
/// The response produced for a completed request.
#[derive(Debug, Clone)]
pub struct MeshResult {
    /// Identifier of the request this result answers.
    pub request_id: String,
    /// Target the original request was addressed to.
    pub target: String,
    /// Response body.
    pub response: String,
    /// Moment this result was created.
    pub completed_at: Instant,
    /// Time between submission and completion, in milliseconds.
    pub duration_ms: u64,
}
56
57impl MeshResult {
58 pub fn new(
59 request_id: impl Into<String>,
60 target: impl Into<String>,
61 response: impl Into<String>,
62 duration_ms: u64,
63 ) -> Self {
64 Self {
65 request_id: request_id.into(),
66 target: target.into(),
67 response: response.into(),
68 completed_at: Instant::now(),
69 duration_ms,
70 }
71 }
72}
73
74pub struct RequestQueue {
76 pending: DashMap<String, MeshRequest>,
77 results: Mutex<VecDeque<MeshResult>>,
78 result_notify: Notify,
79 max_results: usize,
80}
81
82impl Default for RequestQueue {
83 fn default() -> Self {
84 Self::new()
85 }
86}
87
88impl RequestQueue {
89 pub fn new() -> Self {
91 Self::with_max_results(1000)
92 }
93
94 pub fn with_max_results(max_results: usize) -> Self {
96 Self {
97 pending: DashMap::new(),
98 results: Mutex::new(VecDeque::new()),
99 result_notify: Notify::new(),
100 max_results,
101 }
102 }
103
104 pub fn submit(&self, target: &str, payload: String) -> String {
106 let request = MeshRequest::new(target, payload);
107 let id = request.id.clone();
108 self.pending.insert(id.clone(), request);
109 id
110 }
111
112 pub fn complete(&self, request_id: &str, response: String) {
114 if let Some((_, request)) = self.pending.remove(request_id) {
115 let duration_ms = request.submitted_at.elapsed().as_millis() as u64;
116 let result = MeshResult::new(request_id, &request.target, response, duration_ms);
117
118 let mut results = self.results.lock().unwrap();
119 results.push_back(result);
120
121 while results.len() > self.max_results {
123 results.pop_front();
124 }
125
126 self.result_notify.notify_waiters();
128 }
129 }
130
131 pub fn fail(&self, request_id: &str, error: String) {
133 if let Some(mut entry) = self.pending.get_mut(request_id) {
134 entry.status = RequestStatus::Failed(error);
135 }
136 self.pending.remove(request_id);
137 self.result_notify.notify_waiters();
138 }
139
140 pub fn get_pending(&self) -> Vec<MeshRequest> {
142 self.pending.iter().map(|e| e.value().clone()).collect()
143 }
144
145 pub fn get_stale(&self, older_than: Duration) -> Vec<MeshRequest> {
147 self.pending
148 .iter()
149 .filter(|e| e.value().submitted_at.elapsed() > older_than)
150 .map(|e| e.value().clone())
151 .collect()
152 }
153
154 pub fn has_pending(&self) -> bool {
156 !self.pending.is_empty()
157 }
158
159 pub fn pending_count(&self) -> usize {
161 self.pending.len()
162 }
163
164 pub fn increment_reminder(&self, request_id: &str) {
166 if let Some(mut entry) = self.pending.get_mut(request_id) {
167 entry.reminder_count += 1;
168 }
169 }
170
171 pub fn take_results(&self) -> Vec<MeshResult> {
173 let mut results = self.results.lock().unwrap();
174 results.drain(..).collect()
175 }
176
177 pub fn peek_results(&self) -> Vec<MeshResult> {
179 let results = self.results.lock().unwrap();
180 results.iter().cloned().collect()
181 }
182
183 pub fn take_result(&self, request_id: &str) -> Option<MeshResult> {
185 let mut results = self.results.lock().unwrap();
186 if let Some(pos) = results.iter().position(|r| r.request_id == request_id) {
187 results.remove(pos)
188 } else {
189 None
190 }
191 }
192
193 pub async fn wait_next(&self, timeout: Duration) -> Option<MeshResult> {
195 let deadline = Instant::now() + timeout;
196
197 loop {
198 {
200 let mut results = self.results.lock().unwrap();
201 if let Some(result) = results.pop_front() {
202 return Some(result);
203 }
204 }
205
206 if !self.has_pending() {
208 return None;
209 }
210
211 let remaining = deadline.saturating_duration_since(Instant::now());
213 if remaining.is_zero() {
214 return None; }
216
217 let _ = tokio::time::timeout(remaining, self.result_notify.notified()).await;
219 }
220 }
221
222 pub async fn wait_for(&self, request_id: &str, timeout: Duration) -> Option<MeshResult> {
224 let deadline = Instant::now() + timeout;
225
226 loop {
227 if let Some(result) = self.take_result(request_id) {
229 return Some(result);
230 }
231
232 if !self.pending.contains_key(request_id) {
234 return None; }
236
237 let remaining = deadline.saturating_duration_since(Instant::now());
239 if remaining.is_zero() {
240 return None; }
242
243 let _ = tokio::time::timeout(remaining, self.result_notify.notified()).await;
245 }
246 }
247}
248
#[cfg(test)]
mod tests {
    use super::*;

    /// A completed request leaves the pending set and shows up as a result.
    #[test]
    fn test_submit_and_complete() {
        let q = RequestQueue::new();

        let request_id = q.submit("agent1", String::from("hello"));
        assert!(q.has_pending());
        assert_eq!(q.pending_count(), 1);

        q.complete(&request_id, String::from("world"));
        assert!(!q.has_pending());
        assert_eq!(q.pending_count(), 0);

        let taken = q.take_results();
        assert_eq!(taken.len(), 1);
        let only = &taken[0];
        assert_eq!(only.request_id, request_id);
        assert_eq!(only.response, "world");
    }

    /// A request submitted just now is not older than one second.
    #[test]
    fn test_get_stale() {
        let q = RequestQueue::new();

        q.submit("agent1", String::from("msg1"));

        let threshold = Duration::from_secs(1);
        assert!(q.get_stale(threshold).is_empty());
    }

    /// Each call bumps the reminder counter of the pending request by one.
    #[test]
    fn test_increment_reminder() {
        let q = RequestQueue::new();

        let request_id = q.submit("agent1", String::from("msg"));

        for _ in 0..2 {
            q.increment_reminder(&request_id);
        }

        let snapshot = q.get_pending();
        assert_eq!(snapshot[0].reminder_count, 2);
    }
}