sentinel_proxy/agents/
agent.rs1use std::sync::atomic::{AtomicU32, Ordering};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use sentinel_agent_protocol::{AgentClient, AgentResponse, EventType};
8use sentinel_common::{errors::SentinelError, errors::SentinelResult, CircuitBreaker};
9use sentinel_config::{AgentConfig, AgentEvent, AgentTransport};
10use tokio::sync::RwLock;
11use tracing::warn;
12
13use super::metrics::AgentMetrics;
14use super::pool::AgentConnectionPool;
15
16pub struct Agent {
18 pub(super) config: AgentConfig,
20 pub(super) client: Arc<RwLock<Option<AgentClient>>>,
22 pub(super) pool: Arc<AgentConnectionPool>,
24 pub(super) circuit_breaker: Arc<CircuitBreaker>,
26 pub(super) metrics: Arc<AgentMetrics>,
28 pub(super) last_success: Arc<RwLock<Option<Instant>>>,
30 pub(super) consecutive_failures: AtomicU32,
32}
33
34impl Agent {
35 pub fn new(
37 config: AgentConfig,
38 pool: Arc<AgentConnectionPool>,
39 circuit_breaker: Arc<CircuitBreaker>,
40 ) -> Self {
41 Self {
42 config,
43 client: Arc::new(RwLock::new(None)),
44 pool,
45 circuit_breaker,
46 metrics: Arc::new(AgentMetrics::default()),
47 last_success: Arc::new(RwLock::new(None)),
48 consecutive_failures: AtomicU32::new(0),
49 }
50 }
51
52 pub fn id(&self) -> &str {
54 &self.config.id
55 }
56
57 pub fn circuit_breaker(&self) -> &CircuitBreaker {
59 &self.circuit_breaker
60 }
61
62 pub fn failure_mode(&self) -> sentinel_config::FailureMode {
64 self.config.failure_mode.clone()
65 }
66
67 pub fn timeout_ms(&self) -> u64 {
69 self.config.timeout_ms
70 }
71
72 pub fn metrics(&self) -> &AgentMetrics {
74 &self.metrics
75 }
76
77 pub fn handles_event(&self, event_type: EventType) -> bool {
79 self.config.events.iter().any(|e| match (e, event_type) {
80 (AgentEvent::RequestHeaders, EventType::RequestHeaders) => true,
81 (AgentEvent::RequestBody, EventType::RequestBodyChunk) => true,
82 (AgentEvent::ResponseHeaders, EventType::ResponseHeaders) => true,
83 (AgentEvent::ResponseBody, EventType::ResponseBodyChunk) => true,
84 (AgentEvent::Log, EventType::RequestComplete) => true,
85 _ => false,
86 })
87 }
88
89 pub async fn initialize(&self) -> SentinelResult<()> {
91 let timeout = Duration::from_millis(self.config.timeout_ms);
92
93 match &self.config.transport {
94 AgentTransport::UnixSocket { path } => {
95 let client = AgentClient::unix_socket(&self.config.id, path, timeout)
96 .await
97 .map_err(|e| SentinelError::Agent {
98 agent: self.config.id.clone(),
99 message: format!("Failed to connect via Unix socket: {}", e),
100 event: "initialize".to_string(),
101 source: None,
102 })?;
103
104 *self.client.write().await = Some(client);
105 Ok(())
106 }
107 AgentTransport::Grpc { address, tls: _ } => {
108 let client = AgentClient::grpc(&self.config.id, address, timeout)
110 .await
111 .map_err(|e| SentinelError::Agent {
112 agent: self.config.id.clone(),
113 message: format!("Failed to connect via gRPC: {}", e),
114 event: "initialize".to_string(),
115 source: None,
116 })?;
117
118 *self.client.write().await = Some(client);
119 Ok(())
120 }
121 AgentTransport::Http { url, tls: _ } => {
122 warn!(
123 agent = %self.config.id,
124 url = %url,
125 "HTTP transport not yet implemented, agent will not be available"
126 );
127 Ok(())
128 }
129 }
130 }
131
132 pub async fn call_event<T: serde::Serialize>(
134 &self,
135 event_type: EventType,
136 event: &T,
137 ) -> SentinelResult<AgentResponse> {
138 let mut client_guard = self.client.write().await;
140
141 if client_guard.is_none() {
142 drop(client_guard);
143 self.initialize().await?;
144 client_guard = self.client.write().await;
145 }
146
147 let client = client_guard.as_mut().ok_or_else(|| SentinelError::Agent {
148 agent: self.config.id.clone(),
149 message: "No client connection".to_string(),
150 event: format!("{:?}", event_type),
151 source: None,
152 })?;
153
154 self.metrics.calls_total.fetch_add(1, Ordering::Relaxed);
156
157 client.send_event(event_type, event).await.map_err(|e| {
158 SentinelError::Agent {
159 agent: self.config.id.clone(),
160 message: e.to_string(),
161 event: format!("{:?}", event_type),
162 source: None,
163 }
164 })
165 }
166
167 pub async fn record_success(&self, duration: Duration) {
169 self.metrics.calls_success.fetch_add(1, Ordering::Relaxed);
170 self.metrics
171 .duration_total_us
172 .fetch_add(duration.as_micros() as u64, Ordering::Relaxed);
173 self.consecutive_failures.store(0, Ordering::Relaxed);
174 *self.last_success.write().await = Some(Instant::now());
175
176 self.circuit_breaker.record_success().await;
177 }
178
179 pub async fn record_failure(&self) {
181 self.metrics.calls_failed.fetch_add(1, Ordering::Relaxed);
182 self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
183
184 self.circuit_breaker.record_failure().await;
185 }
186
187 pub async fn record_timeout(&self) {
189 self.metrics.calls_timeout.fetch_add(1, Ordering::Relaxed);
190 self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
191
192 self.circuit_breaker.record_failure().await;
193 }
194
195 pub async fn shutdown(&self) {
197 if let Some(client) = self.client.write().await.take() {
198 let _ = client.close().await;
199 }
200 }
201}