1use std::sync::atomic::{AtomicU32, Ordering};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use sentinel_agent_protocol::{AgentClient, AgentResponse, EventType};
8use sentinel_common::{errors::SentinelError, errors::SentinelResult, CircuitBreaker};
9use sentinel_config::{AgentConfig, AgentEvent, AgentTransport};
10use tokio::sync::RwLock;
11use tracing::{debug, error, info, trace, warn};
12
13use super::metrics::AgentMetrics;
14use super::pool::AgentConnectionPool;
15
/// A single configured external agent plus its connection and health-tracking state.
///
/// NOTE(review): field declaration order is also drop order; keep it stable.
pub struct Agent {
    /// Static configuration (id, transport, timeout, subscribed events, failure mode).
    pub(super) config: AgentConfig,
    /// Lazily-established client connection; `None` until connected (or after shutdown).
    pub(super) client: Arc<RwLock<Option<AgentClient>>>,
    /// Shared connection pool handed in by the caller (not used by the methods in this impl).
    pub(super) pool: Arc<AgentConnectionPool>,
    /// Circuit breaker notified on every recorded success / failure / timeout.
    pub(super) circuit_breaker: Arc<CircuitBreaker>,
    /// Call counters and accumulated call duration.
    pub(super) metrics: Arc<AgentMetrics>,
    /// Time of the most recently recorded successful call, if any.
    pub(super) last_success: Arc<RwLock<Option<Instant>>>,
    /// Failures/timeouts recorded since the last success; reset to 0 on success.
    pub(super) consecutive_failures: AtomicU32,
}
33
34impl Agent {
35 pub fn new(
37 config: AgentConfig,
38 pool: Arc<AgentConnectionPool>,
39 circuit_breaker: Arc<CircuitBreaker>,
40 ) -> Self {
41 trace!(
42 agent_id = %config.id,
43 agent_type = ?config.agent_type,
44 timeout_ms = config.timeout_ms,
45 events = ?config.events,
46 "Creating agent instance"
47 );
48 Self {
49 config,
50 client: Arc::new(RwLock::new(None)),
51 pool,
52 circuit_breaker,
53 metrics: Arc::new(AgentMetrics::default()),
54 last_success: Arc::new(RwLock::new(None)),
55 consecutive_failures: AtomicU32::new(0),
56 }
57 }
58
59 pub fn id(&self) -> &str {
61 &self.config.id
62 }
63
64 pub fn circuit_breaker(&self) -> &CircuitBreaker {
66 &self.circuit_breaker
67 }
68
69 pub fn failure_mode(&self) -> sentinel_config::FailureMode {
71 self.config.failure_mode.clone()
72 }
73
74 pub fn timeout_ms(&self) -> u64 {
76 self.config.timeout_ms
77 }
78
79 pub fn metrics(&self) -> &AgentMetrics {
81 &self.metrics
82 }
83
84 pub fn handles_event(&self, event_type: EventType) -> bool {
86 self.config.events.iter().any(|e| match (e, event_type) {
87 (AgentEvent::RequestHeaders, EventType::RequestHeaders) => true,
88 (AgentEvent::RequestBody, EventType::RequestBodyChunk) => true,
89 (AgentEvent::ResponseHeaders, EventType::ResponseHeaders) => true,
90 (AgentEvent::ResponseBody, EventType::ResponseBodyChunk) => true,
91 (AgentEvent::Log, EventType::RequestComplete) => true,
92 _ => false,
93 })
94 }
95
96 pub async fn initialize(&self) -> SentinelResult<()> {
98 let timeout = Duration::from_millis(self.config.timeout_ms);
99
100 debug!(
101 agent_id = %self.config.id,
102 transport = ?self.config.transport,
103 timeout_ms = self.config.timeout_ms,
104 "Initializing agent connection"
105 );
106
107 let start = Instant::now();
108
109 match &self.config.transport {
110 AgentTransport::UnixSocket { path } => {
111 trace!(
112 agent_id = %self.config.id,
113 socket_path = %path.display(),
114 "Connecting to agent via Unix socket"
115 );
116
117 let client = AgentClient::unix_socket(&self.config.id, path, timeout)
118 .await
119 .map_err(|e| {
120 error!(
121 agent_id = %self.config.id,
122 socket_path = %path.display(),
123 error = %e,
124 "Failed to connect to agent via Unix socket"
125 );
126 SentinelError::Agent {
127 agent: self.config.id.clone(),
128 message: format!("Failed to connect via Unix socket: {}", e),
129 event: "initialize".to_string(),
130 source: None,
131 }
132 })?;
133
134 *self.client.write().await = Some(client);
135
136 info!(
137 agent_id = %self.config.id,
138 socket_path = %path.display(),
139 connect_time_ms = start.elapsed().as_millis(),
140 "Agent connected via Unix socket"
141 );
142 Ok(())
143 }
144 AgentTransport::Grpc { address, tls: _ } => {
145 trace!(
146 agent_id = %self.config.id,
147 address = %address,
148 "Connecting to agent via gRPC"
149 );
150
151 let client = AgentClient::grpc(&self.config.id, address, timeout)
153 .await
154 .map_err(|e| {
155 error!(
156 agent_id = %self.config.id,
157 address = %address,
158 error = %e,
159 "Failed to connect to agent via gRPC"
160 );
161 SentinelError::Agent {
162 agent: self.config.id.clone(),
163 message: format!("Failed to connect via gRPC: {}", e),
164 event: "initialize".to_string(),
165 source: None,
166 }
167 })?;
168
169 *self.client.write().await = Some(client);
170
171 info!(
172 agent_id = %self.config.id,
173 address = %address,
174 connect_time_ms = start.elapsed().as_millis(),
175 "Agent connected via gRPC"
176 );
177 Ok(())
178 }
179 AgentTransport::Http { url, tls: _ } => {
180 warn!(
181 agent_id = %self.config.id,
182 url = %url,
183 "HTTP transport not yet implemented, agent will not be available"
184 );
185 Ok(())
186 }
187 }
188 }
189
190 pub async fn call_event<T: serde::Serialize>(
192 &self,
193 event_type: EventType,
194 event: &T,
195 ) -> SentinelResult<AgentResponse> {
196 trace!(
197 agent_id = %self.config.id,
198 event_type = ?event_type,
199 "Preparing to call agent"
200 );
201
202 let mut client_guard = self.client.write().await;
204
205 if client_guard.is_none() {
206 trace!(
207 agent_id = %self.config.id,
208 "No existing connection, initializing"
209 );
210 drop(client_guard);
211 self.initialize().await?;
212 client_guard = self.client.write().await;
213 }
214
215 let client = client_guard.as_mut().ok_or_else(|| {
216 error!(
217 agent_id = %self.config.id,
218 event_type = ?event_type,
219 "No client connection available after initialization"
220 );
221 SentinelError::Agent {
222 agent: self.config.id.clone(),
223 message: "No client connection".to_string(),
224 event: format!("{:?}", event_type),
225 source: None,
226 }
227 })?;
228
229 let call_num = self.metrics.calls_total.fetch_add(1, Ordering::Relaxed) + 1;
231
232 trace!(
233 agent_id = %self.config.id,
234 event_type = ?event_type,
235 call_num = call_num,
236 "Sending event to agent"
237 );
238
239 client.send_event(event_type, event).await.map_err(|e| {
240 error!(
241 agent_id = %self.config.id,
242 event_type = ?event_type,
243 error = %e,
244 "Agent call failed"
245 );
246 SentinelError::Agent {
247 agent: self.config.id.clone(),
248 message: e.to_string(),
249 event: format!("{:?}", event_type),
250 source: None,
251 }
252 })
253 }
254
255 pub async fn record_success(&self, duration: Duration) {
257 let success_count = self.metrics.calls_success.fetch_add(1, Ordering::Relaxed) + 1;
258 self.metrics
259 .duration_total_us
260 .fetch_add(duration.as_micros() as u64, Ordering::Relaxed);
261 self.consecutive_failures.store(0, Ordering::Relaxed);
262 *self.last_success.write().await = Some(Instant::now());
263
264 trace!(
265 agent_id = %self.config.id,
266 duration_ms = duration.as_millis(),
267 total_successes = success_count,
268 "Recorded agent call success"
269 );
270
271 self.circuit_breaker.record_success().await;
272 }
273
274 pub async fn record_failure(&self) {
276 let fail_count = self.metrics.calls_failed.fetch_add(1, Ordering::Relaxed) + 1;
277 let consecutive = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
278
279 debug!(
280 agent_id = %self.config.id,
281 total_failures = fail_count,
282 consecutive_failures = consecutive,
283 "Recorded agent call failure"
284 );
285
286 self.circuit_breaker.record_failure().await;
287 }
288
289 pub async fn record_timeout(&self) {
291 let timeout_count = self.metrics.calls_timeout.fetch_add(1, Ordering::Relaxed) + 1;
292 let consecutive = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
293
294 debug!(
295 agent_id = %self.config.id,
296 total_timeouts = timeout_count,
297 consecutive_failures = consecutive,
298 timeout_ms = self.config.timeout_ms,
299 "Recorded agent call timeout"
300 );
301
302 self.circuit_breaker.record_failure().await;
303 }
304
305 pub async fn shutdown(&self) {
307 debug!(
308 agent_id = %self.config.id,
309 "Shutting down agent"
310 );
311
312 if let Some(client) = self.client.write().await.take() {
313 trace!(
314 agent_id = %self.config.id,
315 "Closing agent client connection"
316 );
317 let _ = client.close().await;
318 }
319
320 let stats = (
321 self.metrics.calls_total.load(Ordering::Relaxed),
322 self.metrics.calls_success.load(Ordering::Relaxed),
323 self.metrics.calls_failed.load(Ordering::Relaxed),
324 self.metrics.calls_timeout.load(Ordering::Relaxed),
325 );
326
327 info!(
328 agent_id = %self.config.id,
329 total_calls = stats.0,
330 successes = stats.1,
331 failures = stats.2,
332 timeouts = stats.3,
333 "Agent shutdown complete"
334 );
335 }
336}