sentinel_proxy/agents/
agent.rs1use std::sync::atomic::{AtomicU32, Ordering};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use sentinel_agent_protocol::{AgentClient, AgentResponse, EventType};
8use sentinel_common::{errors::SentinelError, errors::SentinelResult, CircuitBreaker};
9use sentinel_config::{AgentConfig, AgentEvent, AgentTransport};
10use tokio::sync::RwLock;
11use tracing::warn;
12
13use super::metrics::AgentMetrics;
14use super::pool::AgentConnectionPool;
15
16pub struct Agent {
18 pub(super) config: AgentConfig,
20 pub(super) client: Arc<RwLock<Option<AgentClient>>>,
22 pub(super) pool: Arc<AgentConnectionPool>,
24 pub(super) circuit_breaker: Arc<CircuitBreaker>,
26 pub(super) metrics: Arc<AgentMetrics>,
28 pub(super) last_success: Arc<RwLock<Option<Instant>>>,
30 pub(super) consecutive_failures: AtomicU32,
32}
33
34impl Agent {
35 pub fn new(
37 config: AgentConfig,
38 pool: Arc<AgentConnectionPool>,
39 circuit_breaker: Arc<CircuitBreaker>,
40 ) -> Self {
41 Self {
42 config,
43 client: Arc::new(RwLock::new(None)),
44 pool,
45 circuit_breaker,
46 metrics: Arc::new(AgentMetrics::default()),
47 last_success: Arc::new(RwLock::new(None)),
48 consecutive_failures: AtomicU32::new(0),
49 }
50 }
51
52 pub fn id(&self) -> &str {
54 &self.config.id
55 }
56
57 pub fn circuit_breaker(&self) -> &CircuitBreaker {
59 &self.circuit_breaker
60 }
61
62 pub fn failure_mode(&self) -> sentinel_config::FailureMode {
64 self.config.failure_mode.clone()
65 }
66
67 pub fn timeout_ms(&self) -> u64 {
69 self.config.timeout_ms
70 }
71
72 pub fn metrics(&self) -> &AgentMetrics {
74 &self.metrics
75 }
76
77 pub fn handles_event(&self, event_type: EventType) -> bool {
79 self.config.events.iter().any(|e| match (e, event_type) {
80 (AgentEvent::RequestHeaders, EventType::RequestHeaders) => true,
81 (AgentEvent::RequestBody, EventType::RequestBodyChunk) => true,
82 (AgentEvent::ResponseHeaders, EventType::ResponseHeaders) => true,
83 (AgentEvent::ResponseBody, EventType::ResponseBodyChunk) => true,
84 (AgentEvent::Log, EventType::RequestComplete) => true,
85 _ => false,
86 })
87 }
88
89 pub async fn initialize(&self) -> SentinelResult<()> {
91 match &self.config.transport {
92 AgentTransport::UnixSocket { path } => {
93 let client = AgentClient::unix_socket(
94 &self.config.id,
95 path,
96 Duration::from_millis(self.config.timeout_ms),
97 )
98 .await
99 .map_err(|e| SentinelError::Agent {
100 agent: self.config.id.clone(),
101 message: format!("Failed to connect: {}", e),
102 event: "initialize".to_string(),
103 source: None,
104 })?;
105
106 *self.client.write().await = Some(client);
107 Ok(())
108 }
109 _ => {
110 warn!(
111 "Unsupported agent transport: {:?}",
112 self.config.transport
113 );
114 Ok(())
115 }
116 }
117 }
118
119 pub async fn call_event<T: serde::Serialize>(
121 &self,
122 event_type: EventType,
123 event: &T,
124 ) -> SentinelResult<AgentResponse> {
125 let mut client_guard = self.client.write().await;
127
128 if client_guard.is_none() {
129 drop(client_guard);
130 self.initialize().await?;
131 client_guard = self.client.write().await;
132 }
133
134 let client = client_guard.as_mut().ok_or_else(|| SentinelError::Agent {
135 agent: self.config.id.clone(),
136 message: "No client connection".to_string(),
137 event: format!("{:?}", event_type),
138 source: None,
139 })?;
140
141 self.metrics.calls_total.fetch_add(1, Ordering::Relaxed);
143
144 client.send_event(event_type, event).await.map_err(|e| {
145 SentinelError::Agent {
146 agent: self.config.id.clone(),
147 message: e.to_string(),
148 event: format!("{:?}", event_type),
149 source: None,
150 }
151 })
152 }
153
154 pub async fn record_success(&self, duration: Duration) {
156 self.metrics.calls_success.fetch_add(1, Ordering::Relaxed);
157 self.metrics
158 .duration_total_us
159 .fetch_add(duration.as_micros() as u64, Ordering::Relaxed);
160 self.consecutive_failures.store(0, Ordering::Relaxed);
161 *self.last_success.write().await = Some(Instant::now());
162
163 self.circuit_breaker.record_success().await;
164 }
165
166 pub async fn record_failure(&self) {
168 self.metrics.calls_failed.fetch_add(1, Ordering::Relaxed);
169 self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
170
171 self.circuit_breaker.record_failure().await;
172 }
173
174 pub async fn record_timeout(&self) {
176 self.metrics.calls_timeout.fetch_add(1, Ordering::Relaxed);
177 self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
178
179 self.circuit_breaker.record_failure().await;
180 }
181
182 pub async fn shutdown(&self) {
184 if let Some(client) = self.client.write().await.take() {
185 let _ = client.close().await;
186 }
187 }
188}