ceylon_core/
request_queue.rs

1use dashmap::DashMap;
2use serde::{Deserialize, Serialize};
3use std::collections::VecDeque;
4use std::sync::Mutex;
5use std::time::{Duration, Instant};
6use tokio::sync::Notify;
7use uuid::Uuid;
8
9/// Status of a mesh request
10#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
11pub enum RequestStatus {
12    Pending,
13    Completed,
14    Failed(String),
15    TimedOut,
16}
17
18/// A request submitted to the mesh
19#[derive(Debug, Clone)]
20pub struct MeshRequest {
21    pub id: String,
22    pub target: String,
23    pub payload: String,
24    pub submitted_at: Instant,
25    pub status: RequestStatus,
26    pub reminder_count: u32,
27}
28
29impl MeshRequest {
30    pub fn new(target: impl Into<String>, payload: impl Into<String>) -> Self {
31        Self {
32            id: Uuid::new_v4().to_string(),
33            target: target.into(),
34            payload: payload.into(),
35            submitted_at: Instant::now(),
36            status: RequestStatus::Pending,
37            reminder_count: 0,
38        }
39    }
40
41    /// Get elapsed time since submission in seconds
42    pub fn elapsed_seconds(&self) -> f64 {
43        self.submitted_at.elapsed().as_secs_f64()
44    }
45}
46
47/// A result from processing a mesh request
48#[derive(Debug, Clone)]
49pub struct MeshResult {
50    pub request_id: String,
51    pub target: String,
52    pub response: String,
53    pub completed_at: Instant,
54    pub duration_ms: u64,
55}
56
57impl MeshResult {
58    pub fn new(
59        request_id: impl Into<String>,
60        target: impl Into<String>,
61        response: impl Into<String>,
62        duration_ms: u64,
63    ) -> Self {
64        Self {
65            request_id: request_id.into(),
66            target: target.into(),
67            response: response.into(),
68            completed_at: Instant::now(),
69            duration_ms,
70        }
71    }
72}
73
74/// Manages pending requests and completed results for a mesh
75pub struct RequestQueue {
76    pending: DashMap<String, MeshRequest>,
77    results: Mutex<VecDeque<MeshResult>>,
78    result_notify: Notify,
79    max_results: usize,
80}
81
82impl Default for RequestQueue {
83    fn default() -> Self {
84        Self::new()
85    }
86}
87
88impl RequestQueue {
89    /// Create a new request queue
90    pub fn new() -> Self {
91        Self::with_max_results(1000)
92    }
93
94    /// Create with a maximum result buffer size
95    pub fn with_max_results(max_results: usize) -> Self {
96        Self {
97            pending: DashMap::new(),
98            results: Mutex::new(VecDeque::new()),
99            result_notify: Notify::new(),
100            max_results,
101        }
102    }
103
104    /// Submit a new request, returns the request ID
105    pub fn submit(&self, target: &str, payload: String) -> String {
106        let request = MeshRequest::new(target, payload);
107        let id = request.id.clone();
108        self.pending.insert(id.clone(), request);
109        id
110    }
111
112    /// Mark a request as completed with a response
113    pub fn complete(&self, request_id: &str, response: String) {
114        if let Some((_, request)) = self.pending.remove(request_id) {
115            let duration_ms = request.submitted_at.elapsed().as_millis() as u64;
116            let result = MeshResult::new(request_id, &request.target, response, duration_ms);
117
118            let mut results = self.results.lock().unwrap();
119            results.push_back(result);
120
121            // Enforce max results limit
122            while results.len() > self.max_results {
123                results.pop_front();
124            }
125
126            // Wake up any waiters
127            self.result_notify.notify_waiters();
128        }
129    }
130
131    /// Mark a request as failed
132    pub fn fail(&self, request_id: &str, error: String) {
133        if let Some(mut entry) = self.pending.get_mut(request_id) {
134            entry.status = RequestStatus::Failed(error);
135        }
136        self.pending.remove(request_id);
137        self.result_notify.notify_waiters();
138    }
139
140    /// Get all pending requests
141    pub fn get_pending(&self) -> Vec<MeshRequest> {
142        self.pending.iter().map(|e| e.value().clone()).collect()
143    }
144
145    /// Get pending requests older than the specified duration
146    pub fn get_stale(&self, older_than: Duration) -> Vec<MeshRequest> {
147        self.pending
148            .iter()
149            .filter(|e| e.value().submitted_at.elapsed() > older_than)
150            .map(|e| e.value().clone())
151            .collect()
152    }
153
154    /// Check if there are any pending requests
155    pub fn has_pending(&self) -> bool {
156        !self.pending.is_empty()
157    }
158
159    /// Get number of pending requests
160    pub fn pending_count(&self) -> usize {
161        self.pending.len()
162    }
163
164    /// Increment reminder count for a request
165    pub fn increment_reminder(&self, request_id: &str) {
166        if let Some(mut entry) = self.pending.get_mut(request_id) {
167            entry.reminder_count += 1;
168        }
169    }
170
171    /// Get available results (removes them from the queue)
172    pub fn take_results(&self) -> Vec<MeshResult> {
173        let mut results = self.results.lock().unwrap();
174        results.drain(..).collect()
175    }
176
177    /// Peek at results without removing them
178    pub fn peek_results(&self) -> Vec<MeshResult> {
179        let results = self.results.lock().unwrap();
180        results.iter().cloned().collect()
181    }
182
183    /// Get result for a specific request (removes it if found)
184    pub fn take_result(&self, request_id: &str) -> Option<MeshResult> {
185        let mut results = self.results.lock().unwrap();
186        if let Some(pos) = results.iter().position(|r| r.request_id == request_id) {
187            results.remove(pos)
188        } else {
189            None
190        }
191    }
192
193    /// Wait for the next result to become available
194    pub async fn wait_next(&self, timeout: Duration) -> Option<MeshResult> {
195        let deadline = Instant::now() + timeout;
196
197        loop {
198            // Check for available results first
199            {
200                let mut results = self.results.lock().unwrap();
201                if let Some(result) = results.pop_front() {
202                    return Some(result);
203                }
204            }
205
206            // No pending requests means nothing to wait for
207            if !self.has_pending() {
208                return None;
209            }
210
211            // Calculate remaining time
212            let remaining = deadline.saturating_duration_since(Instant::now());
213            if remaining.is_zero() {
214                return None; // Timeout
215            }
216
217            // Wait for notification or timeout
218            let _ = tokio::time::timeout(remaining, self.result_notify.notified()).await;
219        }
220    }
221
222    /// Wait for a specific result
223    pub async fn wait_for(&self, request_id: &str, timeout: Duration) -> Option<MeshResult> {
224        let deadline = Instant::now() + timeout;
225
226        loop {
227            // Check if result is available
228            if let Some(result) = self.take_result(request_id) {
229                return Some(result);
230            }
231
232            // Check if request is still pending
233            if !self.pending.contains_key(request_id) {
234                return None; // Request doesn't exist or was removed
235            }
236
237            // Calculate remaining time
238            let remaining = deadline.saturating_duration_since(Instant::now());
239            if remaining.is_zero() {
240                return None; // Timeout
241            }
242
243            // Wait for notification or timeout
244            let _ = tokio::time::timeout(remaining, self.result_notify.notified()).await;
245        }
246    }
247}
248
249#[cfg(test)]
250mod tests {
251    use super::*;
252
253    #[test]
254    fn test_submit_and_complete() {
255        let queue = RequestQueue::new();
256
257        let id = queue.submit("agent1", "hello".to_string());
258        assert_eq!(queue.pending_count(), 1);
259        assert!(queue.has_pending());
260
261        queue.complete(&id, "world".to_string());
262        assert_eq!(queue.pending_count(), 0);
263        assert!(!queue.has_pending());
264
265        let results = queue.take_results();
266        assert_eq!(results.len(), 1);
267        assert_eq!(results[0].request_id, id);
268        assert_eq!(results[0].response, "world");
269    }
270
271    #[test]
272    fn test_get_stale() {
273        let queue = RequestQueue::new();
274
275        queue.submit("agent1", "msg1".to_string());
276
277        // Immediately, nothing is stale
278        let stale = queue.get_stale(Duration::from_secs(1));
279        assert!(stale.is_empty());
280    }
281
282    #[test]
283    fn test_increment_reminder() {
284        let queue = RequestQueue::new();
285
286        let id = queue.submit("agent1", "msg".to_string());
287
288        queue.increment_reminder(&id);
289        queue.increment_reminder(&id);
290
291        let pending = queue.get_pending();
292        assert_eq!(pending[0].reminder_count, 2);
293    }
294}