1use crate::agent::{Activation, Agent};
2use anyhow::Result;
3use std::sync::Arc;
4use std::time::Duration;
5use tokio::sync::mpsc;
6use tokio::time::{sleep, Instant};
7use tracing::{debug, error, info, warn};
8
/// Lifecycle position of a [`CircuitBreaker`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum CircuitState {
    /// Calls run normally while consecutive failures are counted.
    Closed,
    /// Calls are rejected until the reset delay after the last failure has passed.
    Open,
    /// The reset delay has passed; calls are let through to probe the service.
    HalfOpen,
}
16
17pub struct CircuitBreaker {
19 state: Arc<tokio::sync::RwLock<CircuitState>>,
20 failure_count: Arc<tokio::sync::RwLock<u32>>,
21 last_failure_time: Arc<tokio::sync::RwLock<Option<Instant>>>,
22 failure_threshold: u32,
23 #[allow(dead_code)]
24 timeout: Duration,
25 half_open_timeout: Duration,
26}
27
28impl CircuitBreaker {
29 pub fn new(failure_threshold: u32, timeout: Duration) -> Self {
30 Self {
31 state: Arc::new(tokio::sync::RwLock::new(CircuitState::Closed)),
32 failure_count: Arc::new(tokio::sync::RwLock::new(0)),
33 last_failure_time: Arc::new(tokio::sync::RwLock::new(None)),
34 failure_threshold,
35 timeout,
36 half_open_timeout: Duration::from_secs(60), }
38 }
39
40 pub async fn call<F, Fut>(&self, f: F) -> Result<()>
41 where
42 F: FnOnce() -> Fut,
43 Fut: std::future::Future<Output = Result<()>>,
44 {
45 let state = *self.state.read().await;
46
47 match state {
48 CircuitState::Open => {
49 let last_failure = *self.last_failure_time.read().await;
50 if let Some(last) = last_failure {
51 if last.elapsed() > self.half_open_timeout {
52 *self.state.write().await = CircuitState::HalfOpen;
54 *self.failure_count.write().await = 0;
55 debug!("Circuit breaker transitioning to half-open");
56 } else {
57 return Err(anyhow::anyhow!("Circuit breaker is open"));
58 }
59 } else {
60 return Err(anyhow::anyhow!("Circuit breaker is open"));
61 }
62 }
63 CircuitState::HalfOpen => {
64 }
66 CircuitState::Closed => {
67 }
69 }
70
71 match f().await {
72 Ok(()) => {
73 *self.failure_count.write().await = 0;
75 if state == CircuitState::HalfOpen {
76 *self.state.write().await = CircuitState::Closed;
77 info!("Circuit breaker closed - service recovered");
78 }
79 Ok(())
80 }
81 Err(e) => {
82 let mut count = self.failure_count.write().await;
84 *count += 1;
85 *self.last_failure_time.write().await = Some(Instant::now());
86
87 if *count >= self.failure_threshold {
88 *self.state.write().await = CircuitState::Open;
89 warn!("Circuit breaker opened after {} failures", *count);
90 }
91 Err(e)
92 }
93 }
94 }
95}
96
97pub struct AgentTask {
99 pub activation: Activation,
100 pub agent: Arc<dyn Agent>,
101 pub max_retries: u32,
102 pub circuit_breaker: Option<Arc<CircuitBreaker>>,
103 pub dlq_sender: Option<mpsc::UnboundedSender<Activation>>,
104}
105
106impl AgentTask {
107 pub fn new(
108 activation: Activation,
109 agent: Arc<dyn Agent>,
110 max_retries: u32,
111 circuit_breaker: Option<Arc<CircuitBreaker>>,
112 dlq_sender: Option<mpsc::UnboundedSender<Activation>>,
113 ) -> Self {
114 Self {
115 activation,
116 agent,
117 max_retries,
118 circuit_breaker,
119 dlq_sender,
120 }
121 }
122
123 async fn execute_with_retry(&self) -> Result<()> {
124 let mut last_error = None;
125
126 for attempt in 0..=self.max_retries {
127 if attempt > 0 {
128 let delay = Duration::from_secs(2_u64.pow(attempt - 1));
130 debug!(
131 rule_id = %self.activation.rule_id,
132 attempt = attempt,
133 delay_secs = delay.as_secs(),
134 "Retrying agent execution"
135 );
136 sleep(delay).await;
137 }
138
139 let result = if let Some(cb) = &self.circuit_breaker {
140 cb.call(|| self.agent.execute(&self.activation)).await
141 } else {
142 self.agent.execute(&self.activation).await
143 };
144
145 match result {
146 Ok(()) => {
147 if attempt > 0 {
148 info!(
149 rule_id = %self.activation.rule_id,
150 attempt = attempt,
151 "Agent execution succeeded after retry"
152 );
153 }
154 return Ok(());
155 }
156 Err(e) => {
157 last_error = Some(e);
158 if attempt < self.max_retries {
159 warn!(
160 rule_id = %self.activation.rule_id,
161 attempt = attempt,
162 "Agent execution failed, will retry"
163 );
164 }
165 }
166 }
167 }
168
169 error!(
170 rule_id = %self.activation.rule_id,
171 max_retries = self.max_retries,
172 "Agent execution failed after all retries"
173 );
174 Err(last_error.unwrap_or_else(|| anyhow::anyhow!("Unknown error")))
175 }
176}
177
178pub struct AgentQueue {
180 sender: mpsc::Sender<AgentTask>, pub dlq_sender: mpsc::UnboundedSender<Activation>, }
183
184impl AgentQueue {
185 pub fn new(capacity: Option<usize>) -> (Self, AgentQueueWorker) {
186 let capacity = capacity.unwrap_or(1000);
187 let (tx, rx) = mpsc::channel(capacity); let (dlq_tx, dlq_rx) = mpsc::unbounded_channel();
189
190 let worker = AgentQueueWorker {
191 receiver: rx,
192 dlq_receiver: Some(dlq_rx),
193 concurrency: 1, };
195
196 (
197 Self {
198 sender: tx,
199 dlq_sender: dlq_tx,
200 },
201 worker,
202 )
203 }
204
205 pub async fn enqueue(&self, task: AgentTask) -> Result<()> {
206 self.sender
208 .send(task)
209 .await
210 .map_err(|e| anyhow::anyhow!("Failed to enqueue agent task: {}", e))?;
211 Ok(())
212 }
213
214 pub fn try_enqueue(&self, task: AgentTask) -> Result<()> {
215 self.sender
217 .try_send(task)
218 .map_err(|e| anyhow::anyhow!("Agent queue full (backpressure): {}", e))?;
219 Ok(())
220 }
221}
222
223pub struct AgentQueueWorker {
225 receiver: mpsc::Receiver<AgentTask>,
226 #[allow(dead_code)]
227 dlq_receiver: Option<mpsc::UnboundedReceiver<Activation>>, concurrency: usize,
229}
230
231impl AgentQueueWorker {
232 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
233 self.concurrency = concurrency;
234 self
235 }
236}
237
238pub struct DeadLetterQueue {
240 receiver: mpsc::UnboundedReceiver<Activation>,
241}
242
243impl DeadLetterQueue {
244 pub fn new(receiver: mpsc::UnboundedReceiver<Activation>) -> Self {
245 Self { receiver }
246 }
247
248 pub async fn run(mut self) {
249 info!("Dead letter queue handler started");
250 let mut count = 0;
251 while let Some(activation) = self.receiver.recv().await {
252 count += 1;
253 warn!(
254 rule_id = %activation.rule_id,
255 rule_name = %activation.rule_name,
256 count = count,
257 "Activation sent to dead letter queue"
258 );
259 }
261 info!("Dead letter queue handler stopped");
262 }
263}
264
265impl AgentQueueWorker {
266 pub async fn run(mut self) {
267 let concurrency = self.concurrency.max(1); info!(concurrency = concurrency, "Agent queue worker started");
269
270 let (task_tx, task_rx) = mpsc::unbounded_channel::<AgentTask>();
273 let shared_rx = Arc::new(tokio::sync::Mutex::new(task_rx));
274
275 let mut worker_handles = Vec::new();
277 for worker_id in 0..concurrency {
278 let rx = Arc::clone(&shared_rx);
279 let handle = tokio::spawn(async move {
280 loop {
281 let task = {
282 let mut rx_guard = rx.lock().await;
283 rx_guard.recv().await
284 };
285 let task = match task {
286 Some(t) => t,
287 None => break, };
289 let start = Instant::now();
290 match task.execute_with_retry().await {
291 Ok(()) => {
292 let duration = start.elapsed();
293 debug!(
294 worker_id = worker_id,
295 rule_id = %task.activation.rule_id,
296 agent = task.agent.name(),
297 duration_ms = duration.as_millis(),
298 "Agent execution completed"
299 );
300 crate::metrics::METRICS.record_agent_execution_duration(
302 task.agent.name(),
303 duration.as_secs_f64(),
304 );
305 }
306 Err(e) => {
307 error!(
308 worker_id = worker_id,
309 rule_id = %task.activation.rule_id,
310 error = %e,
311 "Agent execution failed permanently"
312 );
313 if let Some(dlq) = &task.dlq_sender {
315 if let Err(dlq_err) = dlq.send(task.activation.clone()) {
316 warn!(
317 rule_id = %task.activation.rule_id,
318 error = %dlq_err,
319 "Failed to send to DLQ"
320 );
321 }
322 }
323 crate::metrics::METRICS
324 .agent_failures
325 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
326 }
327 }
328 }
329 });
330 worker_handles.push(handle);
331 }
332
333 while let Some(task) = self.receiver.recv().await {
335 if let Err(e) = task_tx.send(task) {
336 error!("Failed to distribute task to worker: {}", e);
337 break;
338 }
339 }
340
341 drop(task_tx);
343 for handle in worker_handles {
344 let _ = handle.await;
345 }
346
347 info!("Agent queue worker stopped");
348 }
349}