1use std::sync::atomic::{AtomicU32, Ordering};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use sentinel_agent_protocol::{AgentClient, AgentResponse, EventType};
8use sentinel_common::{errors::SentinelError, errors::SentinelResult, CircuitBreaker};
9use sentinel_config::{AgentConfig, AgentEvent, AgentTransport};
10use tokio::sync::RwLock;
11use tracing::{debug, error, info, trace, warn};
12
13use super::metrics::AgentMetrics;
14use super::pool::AgentConnectionPool;
15
/// A single agent: its configuration together with the connection, health
/// and metrics state tracked for it.
pub struct Agent {
    /// Configuration supplied to `Agent::new` (id, transport, timeout, events, ...).
    pub(super) config: AgentConfig,
    /// Client connection slot; created as `None` by `new` and filled by `initialize`.
    pub(super) client: Arc<RwLock<Option<AgentClient>>>,
    /// Connection pool handle passed to `new`.
    // NOTE(review): not read by any method in this part of the file — confirm where it is used.
    pub(super) pool: Arc<AgentConnectionPool>,
    /// Circuit breaker notified by `record_success`, `record_failure` and `record_timeout`.
    pub(super) circuit_breaker: Arc<CircuitBreaker>,
    /// Call counters and accumulated call duration for this agent.
    pub(super) metrics: Arc<AgentMetrics>,
    /// Instant of the most recent `record_success` call; `None` until the first one.
    pub(super) last_success: Arc<RwLock<Option<Instant>>>,
    /// Failures and timeouts recorded since the last recorded success.
    pub(super) consecutive_failures: AtomicU32,
}
33
34impl Agent {
35 pub fn new(
37 config: AgentConfig,
38 pool: Arc<AgentConnectionPool>,
39 circuit_breaker: Arc<CircuitBreaker>,
40 ) -> Self {
41 trace!(
42 agent_id = %config.id,
43 agent_type = ?config.agent_type,
44 timeout_ms = config.timeout_ms,
45 events = ?config.events,
46 "Creating agent instance"
47 );
48 Self {
49 config,
50 client: Arc::new(RwLock::new(None)),
51 pool,
52 circuit_breaker,
53 metrics: Arc::new(AgentMetrics::default()),
54 last_success: Arc::new(RwLock::new(None)),
55 consecutive_failures: AtomicU32::new(0),
56 }
57 }
58
59 pub fn id(&self) -> &str {
61 &self.config.id
62 }
63
64 pub fn circuit_breaker(&self) -> &CircuitBreaker {
66 &self.circuit_breaker
67 }
68
69 pub fn failure_mode(&self) -> sentinel_config::FailureMode {
71 self.config.failure_mode
72 }
73
74 pub fn timeout_ms(&self) -> u64 {
76 self.config.timeout_ms
77 }
78
79 pub fn metrics(&self) -> &AgentMetrics {
81 &self.metrics
82 }
83
84 pub fn handles_event(&self, event_type: EventType) -> bool {
86 self.config.events.iter().any(|e| match (e, event_type) {
87 (AgentEvent::RequestHeaders, EventType::RequestHeaders) => true,
88 (AgentEvent::RequestBody, EventType::RequestBodyChunk) => true,
89 (AgentEvent::ResponseHeaders, EventType::ResponseHeaders) => true,
90 (AgentEvent::ResponseBody, EventType::ResponseBodyChunk) => true,
91 (AgentEvent::Log, EventType::RequestComplete) => true,
92 (AgentEvent::WebSocketFrame, EventType::WebSocketFrame) => true,
93 _ => false,
94 })
95 }
96
97 pub async fn initialize(&self) -> SentinelResult<()> {
99 let timeout = Duration::from_millis(self.config.timeout_ms);
100
101 debug!(
102 agent_id = %self.config.id,
103 transport = ?self.config.transport,
104 timeout_ms = self.config.timeout_ms,
105 "Initializing agent connection"
106 );
107
108 let start = Instant::now();
109
110 match &self.config.transport {
111 AgentTransport::UnixSocket { path } => {
112 trace!(
113 agent_id = %self.config.id,
114 socket_path = %path.display(),
115 "Connecting to agent via Unix socket"
116 );
117
118 let client = AgentClient::unix_socket(&self.config.id, path, timeout)
119 .await
120 .map_err(|e| {
121 error!(
122 agent_id = %self.config.id,
123 socket_path = %path.display(),
124 error = %e,
125 "Failed to connect to agent via Unix socket"
126 );
127 SentinelError::Agent {
128 agent: self.config.id.clone(),
129 message: format!("Failed to connect via Unix socket: {}", e),
130 event: "initialize".to_string(),
131 source: None,
132 }
133 })?;
134
135 *self.client.write().await = Some(client);
136
137 info!(
138 agent_id = %self.config.id,
139 socket_path = %path.display(),
140 connect_time_ms = start.elapsed().as_millis(),
141 "Agent connected via Unix socket"
142 );
143 Ok(())
144 }
145 AgentTransport::Grpc { address, tls: _ } => {
146 trace!(
147 agent_id = %self.config.id,
148 address = %address,
149 "Connecting to agent via gRPC"
150 );
151
152 let client = AgentClient::grpc(&self.config.id, address, timeout)
154 .await
155 .map_err(|e| {
156 error!(
157 agent_id = %self.config.id,
158 address = %address,
159 error = %e,
160 "Failed to connect to agent via gRPC"
161 );
162 SentinelError::Agent {
163 agent: self.config.id.clone(),
164 message: format!("Failed to connect via gRPC: {}", e),
165 event: "initialize".to_string(),
166 source: None,
167 }
168 })?;
169
170 *self.client.write().await = Some(client);
171
172 info!(
173 agent_id = %self.config.id,
174 address = %address,
175 connect_time_ms = start.elapsed().as_millis(),
176 "Agent connected via gRPC"
177 );
178 Ok(())
179 }
180 AgentTransport::Http { url, tls: _ } => {
181 warn!(
182 agent_id = %self.config.id,
183 url = %url,
184 "HTTP transport not yet implemented, agent will not be available"
185 );
186 Ok(())
187 }
188 }
189 }
190
191 pub async fn call_event<T: serde::Serialize>(
193 &self,
194 event_type: EventType,
195 event: &T,
196 ) -> SentinelResult<AgentResponse> {
197 trace!(
198 agent_id = %self.config.id,
199 event_type = ?event_type,
200 "Preparing to call agent"
201 );
202
203 let mut client_guard = self.client.write().await;
205
206 if client_guard.is_none() {
207 trace!(
208 agent_id = %self.config.id,
209 "No existing connection, initializing"
210 );
211 drop(client_guard);
212 self.initialize().await?;
213 client_guard = self.client.write().await;
214 }
215
216 let client = client_guard.as_mut().ok_or_else(|| {
217 error!(
218 agent_id = %self.config.id,
219 event_type = ?event_type,
220 "No client connection available after initialization"
221 );
222 SentinelError::Agent {
223 agent: self.config.id.clone(),
224 message: "No client connection".to_string(),
225 event: format!("{:?}", event_type),
226 source: None,
227 }
228 })?;
229
230 let call_num = self.metrics.calls_total.fetch_add(1, Ordering::Relaxed) + 1;
232
233 trace!(
234 agent_id = %self.config.id,
235 event_type = ?event_type,
236 call_num = call_num,
237 "Sending event to agent"
238 );
239
240 client.send_event(event_type, event).await.map_err(|e| {
241 error!(
242 agent_id = %self.config.id,
243 event_type = ?event_type,
244 error = %e,
245 "Agent call failed"
246 );
247 SentinelError::Agent {
248 agent: self.config.id.clone(),
249 message: e.to_string(),
250 event: format!("{:?}", event_type),
251 source: None,
252 }
253 })
254 }
255
256 pub async fn record_success(&self, duration: Duration) {
258 let success_count = self.metrics.calls_success.fetch_add(1, Ordering::Relaxed) + 1;
259 self.metrics
260 .duration_total_us
261 .fetch_add(duration.as_micros() as u64, Ordering::Relaxed);
262 self.consecutive_failures.store(0, Ordering::Relaxed);
263 *self.last_success.write().await = Some(Instant::now());
264
265 trace!(
266 agent_id = %self.config.id,
267 duration_ms = duration.as_millis(),
268 total_successes = success_count,
269 "Recorded agent call success"
270 );
271
272 self.circuit_breaker.record_success().await;
273 }
274
275 pub async fn record_failure(&self) {
277 let fail_count = self.metrics.calls_failed.fetch_add(1, Ordering::Relaxed) + 1;
278 let consecutive = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
279
280 debug!(
281 agent_id = %self.config.id,
282 total_failures = fail_count,
283 consecutive_failures = consecutive,
284 "Recorded agent call failure"
285 );
286
287 self.circuit_breaker.record_failure().await;
288 }
289
290 pub async fn record_timeout(&self) {
292 let timeout_count = self.metrics.calls_timeout.fetch_add(1, Ordering::Relaxed) + 1;
293 let consecutive = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
294
295 debug!(
296 agent_id = %self.config.id,
297 total_timeouts = timeout_count,
298 consecutive_failures = consecutive,
299 timeout_ms = self.config.timeout_ms,
300 "Recorded agent call timeout"
301 );
302
303 self.circuit_breaker.record_failure().await;
304 }
305
306 pub async fn shutdown(&self) {
308 debug!(
309 agent_id = %self.config.id,
310 "Shutting down agent"
311 );
312
313 if let Some(client) = self.client.write().await.take() {
314 trace!(
315 agent_id = %self.config.id,
316 "Closing agent client connection"
317 );
318 let _ = client.close().await;
319 }
320
321 let stats = (
322 self.metrics.calls_total.load(Ordering::Relaxed),
323 self.metrics.calls_success.load(Ordering::Relaxed),
324 self.metrics.calls_failed.load(Ordering::Relaxed),
325 self.metrics.calls_timeout.load(Ordering::Relaxed),
326 );
327
328 info!(
329 agent_id = %self.config.id,
330 total_calls = stats.0,
331 successes = stats.1,
332 failures = stats.2,
333 timeouts = stats.3,
334 "Agent shutdown complete"
335 );
336 }
337}