//! `fuse_rule/agent_queue.rs`: background agent execution queue with retry,
//! circuit breaking, and a dead-letter queue.

use crate::agent::{Activation, Agent};
use anyhow::Result;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::{sleep, Instant};
use tracing::{debug, error, info, warn};

/// Lifecycle state of a [`CircuitBreaker`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum CircuitState {
    /// Calls pass straight through to the wrapped operation.
    Closed,
    /// Too many failures were seen; calls are rejected without running.
    Open,
    /// The cool-down has elapsed; calls are let through to test for recovery.
    HalfOpen,
}

17/// Circuit breaker for agent execution
18pub struct CircuitBreaker {
19    state: Arc<tokio::sync::RwLock<CircuitState>>,
20    failure_count: Arc<tokio::sync::RwLock<u32>>,
21    last_failure_time: Arc<tokio::sync::RwLock<Option<Instant>>>,
22    failure_threshold: u32,
23    #[allow(dead_code)]
24    timeout: Duration,
25    half_open_timeout: Duration,
26}
27
28impl CircuitBreaker {
29    pub fn new(failure_threshold: u32, timeout: Duration) -> Self {
30        Self {
31            state: Arc::new(tokio::sync::RwLock::new(CircuitState::Closed)),
32            failure_count: Arc::new(tokio::sync::RwLock::new(0)),
33            last_failure_time: Arc::new(tokio::sync::RwLock::new(None)),
34            failure_threshold,
35            timeout,
36            half_open_timeout: Duration::from_secs(60), // Try again after 60s
37        }
38    }
39
40    pub async fn call<F, Fut>(&self, f: F) -> Result<()>
41    where
42        F: FnOnce() -> Fut,
43        Fut: std::future::Future<Output = Result<()>>,
44    {
45        let state = *self.state.read().await;
46
47        match state {
48            CircuitState::Open => {
49                let last_failure = *self.last_failure_time.read().await;
50                if let Some(last) = last_failure {
51                    if last.elapsed() > self.half_open_timeout {
52                        // Transition to half-open
53                        *self.state.write().await = CircuitState::HalfOpen;
54                        *self.failure_count.write().await = 0;
55                        debug!("Circuit breaker transitioning to half-open");
56                    } else {
57                        return Err(anyhow::anyhow!("Circuit breaker is open"));
58                    }
59                } else {
60                    return Err(anyhow::anyhow!("Circuit breaker is open"));
61                }
62            }
63            CircuitState::HalfOpen => {
64                // Allow one attempt
65            }
66            CircuitState::Closed => {
67                // Normal operation
68            }
69        }
70
71        match f().await {
72            Ok(()) => {
73                // Success - reset failure count
74                *self.failure_count.write().await = 0;
75                if state == CircuitState::HalfOpen {
76                    *self.state.write().await = CircuitState::Closed;
77                    info!("Circuit breaker closed - service recovered");
78                }
79                Ok(())
80            }
81            Err(e) => {
82                // Failure - increment count
83                let mut count = self.failure_count.write().await;
84                *count += 1;
85                *self.last_failure_time.write().await = Some(Instant::now());
86
87                if *count >= self.failure_threshold {
88                    *self.state.write().await = CircuitState::Open;
89                    warn!("Circuit breaker opened after {} failures", *count);
90                }
91                Err(e)
92            }
93        }
94    }
95}
96
97/// Agent execution task with retry
98pub struct AgentTask {
99    pub activation: Activation,
100    pub agent: Arc<dyn Agent>,
101    pub max_retries: u32,
102    pub circuit_breaker: Option<Arc<CircuitBreaker>>,
103    pub dlq_sender: Option<mpsc::UnboundedSender<Activation>>,
104}
105
106impl AgentTask {
107    pub fn new(
108        activation: Activation,
109        agent: Arc<dyn Agent>,
110        max_retries: u32,
111        circuit_breaker: Option<Arc<CircuitBreaker>>,
112        dlq_sender: Option<mpsc::UnboundedSender<Activation>>,
113    ) -> Self {
114        Self {
115            activation,
116            agent,
117            max_retries,
118            circuit_breaker,
119            dlq_sender,
120        }
121    }
122
123    async fn execute_with_retry(&self) -> Result<()> {
124        let mut last_error = None;
125
126        for attempt in 0..=self.max_retries {
127            if attempt > 0 {
128                // Exponential backoff: 1s, 2s, 4s, 8s, ...
129                let delay = Duration::from_secs(2_u64.pow(attempt - 1));
130                debug!(
131                    rule_id = %self.activation.rule_id,
132                    attempt = attempt,
133                    delay_secs = delay.as_secs(),
134                    "Retrying agent execution"
135                );
136                sleep(delay).await;
137            }
138
139            let result = if let Some(cb) = &self.circuit_breaker {
140                cb.call(|| self.agent.execute(&self.activation)).await
141            } else {
142                self.agent.execute(&self.activation).await
143            };
144
145            match result {
146                Ok(()) => {
147                    if attempt > 0 {
148                        info!(
149                            rule_id = %self.activation.rule_id,
150                            attempt = attempt,
151                            "Agent execution succeeded after retry"
152                        );
153                    }
154                    return Ok(());
155                }
156                Err(e) => {
157                    last_error = Some(e);
158                    if attempt < self.max_retries {
159                        warn!(
160                            rule_id = %self.activation.rule_id,
161                            attempt = attempt,
162                            "Agent execution failed, will retry"
163                        );
164                    }
165                }
166            }
167        }
168
169        error!(
170            rule_id = %self.activation.rule_id,
171            max_retries = self.max_retries,
172            "Agent execution failed after all retries"
173        );
174        Err(last_error.unwrap_or_else(|| anyhow::anyhow!("Unknown error")))
175    }
176}
177
178/// Background agent execution queue with backpressure support
179pub struct AgentQueue {
180    sender: mpsc::Sender<AgentTask>, // Bounded channel for backpressure
181    pub dlq_sender: mpsc::UnboundedSender<Activation>, // Dead letter queue
182}
183
184impl AgentQueue {
185    pub fn new(capacity: Option<usize>) -> (Self, AgentQueueWorker) {
186        let capacity = capacity.unwrap_or(1000);
187        let (tx, rx) = mpsc::channel(capacity); // Bounded channel for backpressure
188        let (dlq_tx, dlq_rx) = mpsc::unbounded_channel();
189
190        let worker = AgentQueueWorker {
191            receiver: rx,
192            dlq_receiver: Some(dlq_rx),
193            concurrency: 1, // Default, will be set via with_concurrency
194        };
195
196        (
197            Self {
198                sender: tx,
199                dlq_sender: dlq_tx,
200            },
201            worker,
202        )
203    }
204
205    pub async fn enqueue(&self, task: AgentTask) -> Result<()> {
206        // This will block if the queue is full (backpressure)
207        self.sender
208            .send(task)
209            .await
210            .map_err(|e| anyhow::anyhow!("Failed to enqueue agent task: {}", e))?;
211        Ok(())
212    }
213
214    pub fn try_enqueue(&self, task: AgentTask) -> Result<()> {
215        // Non-blocking version - returns error if queue is full
216        self.sender
217            .try_send(task)
218            .map_err(|e| anyhow::anyhow!("Agent queue full (backpressure): {}", e))?;
219        Ok(())
220    }
221}
222
223/// Worker that processes agent tasks in the background with configurable concurrency
224pub struct AgentQueueWorker {
225    receiver: mpsc::Receiver<AgentTask>,
226    #[allow(dead_code)]
227    dlq_receiver: Option<mpsc::UnboundedReceiver<Activation>>, // Reserved for future DLQ processing
228    concurrency: usize,
229}
230
231impl AgentQueueWorker {
232    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
233        self.concurrency = concurrency;
234        self
235    }
236}
237
238/// Dead letter queue handler
239pub struct DeadLetterQueue {
240    receiver: mpsc::UnboundedReceiver<Activation>,
241}
242
243impl DeadLetterQueue {
244    pub fn new(receiver: mpsc::UnboundedReceiver<Activation>) -> Self {
245        Self { receiver }
246    }
247
248    pub async fn run(mut self) {
249        info!("Dead letter queue handler started");
250        let mut count = 0;
251        while let Some(activation) = self.receiver.recv().await {
252            count += 1;
253            warn!(
254                rule_id = %activation.rule_id,
255                rule_name = %activation.rule_name,
256                count = count,
257                "Activation sent to dead letter queue"
258            );
259            // In production, you'd store this to persistent storage
260        }
261        info!("Dead letter queue handler stopped");
262    }
263}
264
265impl AgentQueueWorker {
266    pub async fn run(mut self) {
267        let concurrency = self.concurrency.max(1); // At least 1 worker
268        info!(concurrency = concurrency, "Agent queue worker started");
269
270        // Spawn concurrent workers for parallel agent execution
271        // Use a shared receiver with Arc<Mutex> for concurrent access
272        let (task_tx, task_rx) = mpsc::unbounded_channel::<AgentTask>();
273        let shared_rx = Arc::new(tokio::sync::Mutex::new(task_rx));
274
275        // Spawn worker tasks
276        let mut worker_handles = Vec::new();
277        for worker_id in 0..concurrency {
278            let rx = Arc::clone(&shared_rx);
279            let handle = tokio::spawn(async move {
280                loop {
281                    let task = {
282                        let mut rx_guard = rx.lock().await;
283                        rx_guard.recv().await
284                    };
285                    let task = match task {
286                        Some(t) => t,
287                        None => break, // Channel closed
288                    };
289                    let start = Instant::now();
290                    match task.execute_with_retry().await {
291                        Ok(()) => {
292                            let duration = start.elapsed();
293                            debug!(
294                                worker_id = worker_id,
295                                rule_id = %task.activation.rule_id,
296                                agent = task.agent.name(),
297                                duration_ms = duration.as_millis(),
298                                "Agent execution completed"
299                            );
300                            // Record success metric
301                            crate::metrics::METRICS.record_agent_execution_duration(
302                                task.agent.name(),
303                                duration.as_secs_f64(),
304                            );
305                        }
306                        Err(e) => {
307                            error!(
308                                worker_id = worker_id,
309                                rule_id = %task.activation.rule_id,
310                                error = %e,
311                                "Agent execution failed permanently"
312                            );
313                            // Send to dead letter queue
314                            if let Some(dlq) = &task.dlq_sender {
315                                if let Err(dlq_err) = dlq.send(task.activation.clone()) {
316                                    warn!(
317                                        rule_id = %task.activation.rule_id,
318                                        error = %dlq_err,
319                                        "Failed to send to DLQ"
320                                    );
321                                }
322                            }
323                            crate::metrics::METRICS
324                                .agent_failures
325                                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
326                        }
327                    }
328                }
329            });
330            worker_handles.push(handle);
331        }
332
333        // Main loop: receive tasks and distribute to workers
334        while let Some(task) = self.receiver.recv().await {
335            if let Err(e) = task_tx.send(task) {
336                error!("Failed to distribute task to worker: {}", e);
337                break;
338            }
339        }
340
341        // Close the task channel and wait for workers
342        drop(task_tx);
343        for handle in worker_handles {
344            let _ = handle.await;
345        }
346
347        info!("Agent queue worker stopped");
348    }
349}