1use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use sentinel_agent_protocol::v2::{
11 AgentCapabilities, AgentPool, AgentPoolConfig as ProtocolPoolConfig, AgentPoolStats,
12 CancelReason, ConfigPusher, ConfigUpdateType, LoadBalanceStrategy as ProtocolLBStrategy,
13 MetricsCollector,
14};
15use sentinel_agent_protocol::{
16 AgentResponse, EventType, RequestBodyChunkEvent, RequestHeadersEvent, ResponseBodyChunkEvent,
17 ResponseHeadersEvent,
18};
19use sentinel_common::{
20 errors::{SentinelError, SentinelResult},
21 CircuitBreaker,
22};
23use sentinel_config::{AgentConfig, AgentEvent, FailureMode, LoadBalanceStrategy};
24use tracing::{debug, error, info, trace, warn};
25
26use super::metrics::AgentMetrics;
27
/// Marker stored in `AgentV2::last_success_ns` meaning "no successful call recorded yet".
const NO_TIMESTAMP: u64 = u64::MIN;
30
31pub struct AgentV2 {
33 config: AgentConfig,
35 pool: Arc<AgentPool>,
37 circuit_breaker: Arc<CircuitBreaker>,
39 metrics: Arc<AgentMetrics>,
41 base_instant: Instant,
43 last_success_ns: AtomicU64,
45 consecutive_failures: AtomicU32,
47}
48
49impl AgentV2 {
50 pub fn new(config: AgentConfig, circuit_breaker: Arc<CircuitBreaker>) -> Self {
52 trace!(
53 agent_id = %config.id,
54 agent_type = ?config.agent_type,
55 timeout_ms = config.timeout_ms,
56 events = ?config.events,
57 "Creating v2 agent instance"
58 );
59
60 let pool_config = config
62 .pool
63 .as_ref()
64 .map(|p| ProtocolPoolConfig {
65 connections_per_agent: p.connections_per_agent,
66 load_balance_strategy: convert_lb_strategy(p.load_balance_strategy),
67 connect_timeout: Duration::from_millis(p.connect_timeout_ms),
68 request_timeout: Duration::from_millis(config.timeout_ms),
69 reconnect_interval: Duration::from_millis(p.reconnect_interval_ms),
70 max_reconnect_attempts: p.max_reconnect_attempts,
71 drain_timeout: Duration::from_millis(p.drain_timeout_ms),
72 max_concurrent_per_connection: p.max_concurrent_per_connection,
73 health_check_interval: Duration::from_millis(p.health_check_interval_ms),
74 ..Default::default()
75 })
76 .unwrap_or_default();
77
78 let pool = Arc::new(AgentPool::with_config(pool_config));
79
80 Self {
81 config,
82 pool,
83 circuit_breaker,
84 metrics: Arc::new(AgentMetrics::default()),
85 base_instant: Instant::now(),
86 last_success_ns: AtomicU64::new(NO_TIMESTAMP),
87 consecutive_failures: AtomicU32::new(0),
88 }
89 }
90
91 pub fn id(&self) -> &str {
93 &self.config.id
94 }
95
96 pub fn circuit_breaker(&self) -> &CircuitBreaker {
98 &self.circuit_breaker
99 }
100
101 pub fn failure_mode(&self) -> FailureMode {
103 self.config.failure_mode
104 }
105
106 pub fn timeout_ms(&self) -> u64 {
108 self.config.timeout_ms
109 }
110
111 pub fn metrics(&self) -> &AgentMetrics {
113 &self.metrics
114 }
115
116 pub fn handles_event(&self, event_type: EventType) -> bool {
118 self.config.events.iter().any(|e| match (e, event_type) {
119 (AgentEvent::RequestHeaders, EventType::RequestHeaders) => true,
120 (AgentEvent::RequestBody, EventType::RequestBodyChunk) => true,
121 (AgentEvent::ResponseHeaders, EventType::ResponseHeaders) => true,
122 (AgentEvent::ResponseBody, EventType::ResponseBodyChunk) => true,
123 (AgentEvent::Log, EventType::RequestComplete) => true,
124 (AgentEvent::WebSocketFrame, EventType::WebSocketFrame) => true,
125 _ => false,
126 })
127 }
128
129 pub async fn initialize(&self) -> SentinelResult<()> {
131 let endpoint = self.get_endpoint()?;
132
133 debug!(
134 agent_id = %self.config.id,
135 endpoint = %endpoint,
136 "Initializing v2 agent pool"
137 );
138
139 let start = Instant::now();
140
141 self.pool
143 .add_agent(&self.config.id, &endpoint)
144 .await
145 .map_err(|e| {
146 error!(
147 agent_id = %self.config.id,
148 endpoint = %endpoint,
149 error = %e,
150 "Failed to add agent to v2 pool"
151 );
152 SentinelError::Agent {
153 agent: self.config.id.clone(),
154 message: format!("Failed to initialize v2 agent: {}", e),
155 event: "initialize".to_string(),
156 source: None,
157 }
158 })?;
159
160 info!(
161 agent_id = %self.config.id,
162 endpoint = %endpoint,
163 connect_time_ms = start.elapsed().as_millis(),
164 "V2 agent pool initialized"
165 );
166
167 if let Some(config_value) = &self.config.config {
169 self.send_configure(config_value.clone()).await?;
170 }
171
172 Ok(())
173 }
174
175 fn get_endpoint(&self) -> SentinelResult<String> {
177 use sentinel_config::AgentTransport;
178 match &self.config.transport {
179 AgentTransport::Grpc { address, .. } => Ok(address.clone()),
180 AgentTransport::UnixSocket { path } => {
181 Ok(format!("unix:{}", path.display()))
183 }
184 AgentTransport::Http { url, .. } => {
185 Err(SentinelError::Agent {
187 agent: self.config.id.clone(),
188 message: "HTTP transport not supported for v2 protocol".to_string(),
189 event: "initialize".to_string(),
190 source: None,
191 })
192 }
193 }
194 }
195
196 async fn send_configure(&self, _config: serde_json::Value) -> SentinelResult<()> {
201 debug!(
202 agent_id = %self.config.id,
203 "Configuration will be sent through control stream on connection"
204 );
205
206 info!(
211 agent_id = %self.config.id,
212 "V2 agent configuration noted"
213 );
214
215 Ok(())
216 }
217
218 pub async fn call_request_headers(
220 &self,
221 event: &RequestHeadersEvent,
222 ) -> SentinelResult<AgentResponse> {
223 let call_num = self.metrics.calls_total.fetch_add(1, Ordering::Relaxed) + 1;
224
225 let correlation_id = &event.metadata.correlation_id;
227
228 trace!(
229 agent_id = %self.config.id,
230 call_num = call_num,
231 correlation_id = %correlation_id,
232 "Sending request headers to v2 agent"
233 );
234
235 self.pool
236 .send_request_headers(&self.config.id, correlation_id, event)
237 .await
238 .map_err(|e| {
239 error!(
240 agent_id = %self.config.id,
241 correlation_id = %correlation_id,
242 error = %e,
243 "V2 agent request headers call failed"
244 );
245 SentinelError::Agent {
246 agent: self.config.id.clone(),
247 message: e.to_string(),
248 event: "request_headers".to_string(),
249 source: None,
250 }
251 })
252 }
253
254 pub async fn call_request_body_chunk(
259 &self,
260 event: &RequestBodyChunkEvent,
261 ) -> SentinelResult<AgentResponse> {
262 let correlation_id = &event.correlation_id;
263
264 trace!(
265 agent_id = %self.config.id,
266 correlation_id = %correlation_id,
267 chunk_index = event.chunk_index,
268 is_last = event.is_last,
269 "Sending request body chunk to v2 agent"
270 );
271
272 self.pool
273 .send_request_body_chunk(&self.config.id, correlation_id, event)
274 .await
275 .map_err(|e| {
276 error!(
277 agent_id = %self.config.id,
278 correlation_id = %correlation_id,
279 error = %e,
280 "V2 agent request body chunk call failed"
281 );
282 SentinelError::Agent {
283 agent: self.config.id.clone(),
284 message: e.to_string(),
285 event: "request_body_chunk".to_string(),
286 source: None,
287 }
288 })
289 }
290
291 pub async fn call_response_headers(
296 &self,
297 event: &ResponseHeadersEvent,
298 ) -> SentinelResult<AgentResponse> {
299 let correlation_id = &event.correlation_id;
300
301 trace!(
302 agent_id = %self.config.id,
303 correlation_id = %correlation_id,
304 status = event.status,
305 "Sending response headers to v2 agent"
306 );
307
308 self.pool
309 .send_response_headers(&self.config.id, correlation_id, event)
310 .await
311 .map_err(|e| {
312 error!(
313 agent_id = %self.config.id,
314 correlation_id = %correlation_id,
315 error = %e,
316 "V2 agent response headers call failed"
317 );
318 SentinelError::Agent {
319 agent: self.config.id.clone(),
320 message: e.to_string(),
321 event: "response_headers".to_string(),
322 source: None,
323 }
324 })
325 }
326
327 pub async fn call_response_body_chunk(
332 &self,
333 event: &ResponseBodyChunkEvent,
334 ) -> SentinelResult<AgentResponse> {
335 let correlation_id = &event.correlation_id;
336
337 trace!(
338 agent_id = %self.config.id,
339 correlation_id = %correlation_id,
340 chunk_index = event.chunk_index,
341 is_last = event.is_last,
342 "Sending response body chunk to v2 agent"
343 );
344
345 self.pool
346 .send_response_body_chunk(&self.config.id, correlation_id, event)
347 .await
348 .map_err(|e| {
349 error!(
350 agent_id = %self.config.id,
351 correlation_id = %correlation_id,
352 error = %e,
353 "V2 agent response body chunk call failed"
354 );
355 SentinelError::Agent {
356 agent: self.config.id.clone(),
357 message: e.to_string(),
358 event: "response_body_chunk".to_string(),
359 source: None,
360 }
361 })
362 }
363
364 pub async fn cancel_request(
366 &self,
367 correlation_id: &str,
368 reason: CancelReason,
369 ) -> SentinelResult<()> {
370 trace!(
371 agent_id = %self.config.id,
372 correlation_id = %correlation_id,
373 reason = ?reason,
374 "Cancelling request on v2 agent"
375 );
376
377 self.pool
378 .cancel_request(&self.config.id, correlation_id, reason)
379 .await
380 .map_err(|e| {
381 warn!(
382 agent_id = %self.config.id,
383 correlation_id = %correlation_id,
384 error = %e,
385 "Failed to cancel request on v2 agent"
386 );
387 SentinelError::Agent {
388 agent: self.config.id.clone(),
389 message: format!("Cancel failed: {}", e),
390 event: "cancel".to_string(),
391 source: None,
392 }
393 })
394 }
395
396 pub async fn capabilities(&self) -> Option<AgentCapabilities> {
398 self.pool.agent_capabilities(&self.config.id).await
399 }
400
401 pub async fn is_healthy(&self) -> bool {
403 self.pool.is_agent_healthy(&self.config.id)
404 }
405
406 pub fn record_success(&self, duration: Duration) {
408 let success_count = self.metrics.calls_success.fetch_add(1, Ordering::Relaxed) + 1;
409 self.metrics
410 .duration_total_us
411 .fetch_add(duration.as_micros() as u64, Ordering::Relaxed);
412 self.consecutive_failures.store(0, Ordering::Relaxed);
413 self.last_success_ns.store(
414 self.base_instant.elapsed().as_nanos() as u64,
415 Ordering::Relaxed,
416 );
417
418 trace!(
419 agent_id = %self.config.id,
420 duration_ms = duration.as_millis(),
421 total_successes = success_count,
422 "Recorded v2 agent call success"
423 );
424
425 self.circuit_breaker.record_success();
426 }
427
428 #[inline]
430 pub fn time_since_last_success(&self) -> Option<Duration> {
431 let last_ns = self.last_success_ns.load(Ordering::Relaxed);
432 if last_ns == NO_TIMESTAMP {
433 return None;
434 }
435 let current_ns = self.base_instant.elapsed().as_nanos() as u64;
436 Some(Duration::from_nanos(current_ns.saturating_sub(last_ns)))
437 }
438
439 pub fn record_failure(&self) {
441 let fail_count = self.metrics.calls_failed.fetch_add(1, Ordering::Relaxed) + 1;
442 let consecutive = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
443
444 debug!(
445 agent_id = %self.config.id,
446 total_failures = fail_count,
447 consecutive_failures = consecutive,
448 "Recorded v2 agent call failure"
449 );
450
451 self.circuit_breaker.record_failure();
452 }
453
454 pub fn record_timeout(&self) {
456 let timeout_count = self.metrics.calls_timeout.fetch_add(1, Ordering::Relaxed) + 1;
457 let consecutive = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
458
459 debug!(
460 agent_id = %self.config.id,
461 total_timeouts = timeout_count,
462 consecutive_failures = consecutive,
463 timeout_ms = self.config.timeout_ms,
464 "Recorded v2 agent call timeout"
465 );
466
467 self.circuit_breaker.record_failure();
468 }
469
470 pub async fn pool_stats(&self) -> Option<AgentPoolStats> {
472 self.pool.agent_stats(&self.config.id).await
473 }
474
475 pub fn pool_metrics_collector(&self) -> &MetricsCollector {
480 self.pool.metrics_collector()
481 }
482
483 pub fn pool_metrics_collector_arc(&self) -> Arc<MetricsCollector> {
487 self.pool.metrics_collector_arc()
488 }
489
490 pub fn export_prometheus(&self) -> String {
495 self.pool.export_prometheus()
496 }
497
498 pub fn config_pusher(&self) -> &ConfigPusher {
503 self.pool.config_pusher()
504 }
505
506 pub fn push_config(&self, update_type: ConfigUpdateType) -> Option<String> {
510 self.pool.push_config_to_agent(&self.config.id, update_type)
511 }
512
513 pub async fn send_configuration(&self, config: serde_json::Value) -> SentinelResult<()> {
517 if let Some(push_id) = self.push_config(ConfigUpdateType::RequestReload) {
521 debug!(
522 agent_id = %self.config.id,
523 push_id = %push_id,
524 "Configuration push initiated"
525 );
526 Ok(())
527 } else {
528 warn!(
529 agent_id = %self.config.id,
530 "Agent does not support config push"
531 );
532 Err(SentinelError::Agent {
533 agent: self.config.id.clone(),
534 message: "Agent does not support config push".to_string(),
535 event: "send_configuration".to_string(),
536 source: None,
537 })
538 }
539 }
540
541 pub async fn shutdown(&self) {
545 debug!(
546 agent_id = %self.config.id,
547 "Shutting down v2 agent"
548 );
549
550 if let Err(e) = self.pool.remove_agent(&self.config.id).await {
552 warn!(
553 agent_id = %self.config.id,
554 error = %e,
555 "Error removing agent from pool during shutdown"
556 );
557 }
558
559 let stats = (
560 self.metrics.calls_total.load(Ordering::Relaxed),
561 self.metrics.calls_success.load(Ordering::Relaxed),
562 self.metrics.calls_failed.load(Ordering::Relaxed),
563 self.metrics.calls_timeout.load(Ordering::Relaxed),
564 );
565
566 info!(
567 agent_id = %self.config.id,
568 total_calls = stats.0,
569 successes = stats.1,
570 failures = stats.2,
571 timeouts = stats.3,
572 "V2 agent shutdown complete"
573 );
574 }
575}
576
577fn convert_lb_strategy(strategy: LoadBalanceStrategy) -> ProtocolLBStrategy {
579 match strategy {
580 LoadBalanceStrategy::RoundRobin => ProtocolLBStrategy::RoundRobin,
581 LoadBalanceStrategy::LeastConnections => ProtocolLBStrategy::LeastConnections,
582 LoadBalanceStrategy::HealthBased => ProtocolLBStrategy::HealthBased,
583 LoadBalanceStrategy::Random => ProtocolLBStrategy::Random,
584 }
585}
586
#[cfg(test)]
mod tests {
    use super::*;

    /// Every configuration strategy must map to the protocol strategy of the same name.
    #[test]
    fn test_convert_lb_strategy() {
        let cases = [
            (LoadBalanceStrategy::RoundRobin, ProtocolLBStrategy::RoundRobin),
            (
                LoadBalanceStrategy::LeastConnections,
                ProtocolLBStrategy::LeastConnections,
            ),
            (LoadBalanceStrategy::HealthBased, ProtocolLBStrategy::HealthBased),
            (LoadBalanceStrategy::Random, ProtocolLBStrategy::Random),
        ];

        for (input, expected) in cases {
            assert_eq!(convert_lb_strategy(input), expected);
        }
    }
}