1use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use sentinel_agent_protocol::v2::{
11 AgentCapabilities, AgentPool, AgentPoolConfig as ProtocolPoolConfig,
12 AgentPoolStats, CancelReason, ConfigPusher, ConfigUpdateType,
13 LoadBalanceStrategy as ProtocolLBStrategy, MetricsCollector,
14};
15use sentinel_agent_protocol::{
16 AgentResponse, EventType, RequestBodyChunkEvent, RequestHeadersEvent,
17 ResponseBodyChunkEvent, ResponseHeadersEvent,
18};
19use sentinel_common::{
20 errors::{SentinelError, SentinelResult},
21 CircuitBreaker,
22};
23use sentinel_config::{AgentConfig, AgentEvent, FailureMode, LoadBalanceStrategy};
24use tracing::{debug, error, info, trace, warn};
25
26use super::metrics::AgentMetrics;
27
28const NO_TIMESTAMP: u64 = 0;
30
31pub struct AgentV2 {
33 config: AgentConfig,
35 pool: Arc<AgentPool>,
37 circuit_breaker: Arc<CircuitBreaker>,
39 metrics: Arc<AgentMetrics>,
41 base_instant: Instant,
43 last_success_ns: AtomicU64,
45 consecutive_failures: AtomicU32,
47}
48
49impl AgentV2 {
50 pub fn new(
52 config: AgentConfig,
53 circuit_breaker: Arc<CircuitBreaker>,
54 ) -> Self {
55 trace!(
56 agent_id = %config.id,
57 agent_type = ?config.agent_type,
58 timeout_ms = config.timeout_ms,
59 events = ?config.events,
60 "Creating v2 agent instance"
61 );
62
63 let pool_config = config.pool.as_ref().map(|p| ProtocolPoolConfig {
65 connections_per_agent: p.connections_per_agent,
66 load_balance_strategy: convert_lb_strategy(p.load_balance_strategy),
67 connect_timeout: Duration::from_millis(p.connect_timeout_ms),
68 request_timeout: Duration::from_millis(config.timeout_ms),
69 reconnect_interval: Duration::from_millis(p.reconnect_interval_ms),
70 max_reconnect_attempts: p.max_reconnect_attempts,
71 drain_timeout: Duration::from_millis(p.drain_timeout_ms),
72 max_concurrent_per_connection: p.max_concurrent_per_connection,
73 health_check_interval: Duration::from_millis(p.health_check_interval_ms),
74 }).unwrap_or_default();
75
76 let pool = Arc::new(AgentPool::with_config(pool_config));
77
78 Self {
79 config,
80 pool,
81 circuit_breaker,
82 metrics: Arc::new(AgentMetrics::default()),
83 base_instant: Instant::now(),
84 last_success_ns: AtomicU64::new(NO_TIMESTAMP),
85 consecutive_failures: AtomicU32::new(0),
86 }
87 }
88
89 pub fn id(&self) -> &str {
91 &self.config.id
92 }
93
94 pub fn circuit_breaker(&self) -> &CircuitBreaker {
96 &self.circuit_breaker
97 }
98
99 pub fn failure_mode(&self) -> FailureMode {
101 self.config.failure_mode
102 }
103
104 pub fn timeout_ms(&self) -> u64 {
106 self.config.timeout_ms
107 }
108
109 pub fn metrics(&self) -> &AgentMetrics {
111 &self.metrics
112 }
113
114 pub fn handles_event(&self, event_type: EventType) -> bool {
116 self.config.events.iter().any(|e| match (e, event_type) {
117 (AgentEvent::RequestHeaders, EventType::RequestHeaders) => true,
118 (AgentEvent::RequestBody, EventType::RequestBodyChunk) => true,
119 (AgentEvent::ResponseHeaders, EventType::ResponseHeaders) => true,
120 (AgentEvent::ResponseBody, EventType::ResponseBodyChunk) => true,
121 (AgentEvent::Log, EventType::RequestComplete) => true,
122 (AgentEvent::WebSocketFrame, EventType::WebSocketFrame) => true,
123 _ => false,
124 })
125 }
126
127 pub async fn initialize(&self) -> SentinelResult<()> {
129 let endpoint = self.get_endpoint()?;
130
131 debug!(
132 agent_id = %self.config.id,
133 endpoint = %endpoint,
134 "Initializing v2 agent pool"
135 );
136
137 let start = Instant::now();
138
139 self.pool.add_agent(&self.config.id, &endpoint).await
141 .map_err(|e| {
142 error!(
143 agent_id = %self.config.id,
144 endpoint = %endpoint,
145 error = %e,
146 "Failed to add agent to v2 pool"
147 );
148 SentinelError::Agent {
149 agent: self.config.id.clone(),
150 message: format!("Failed to initialize v2 agent: {}", e),
151 event: "initialize".to_string(),
152 source: None,
153 }
154 })?;
155
156 info!(
157 agent_id = %self.config.id,
158 endpoint = %endpoint,
159 connect_time_ms = start.elapsed().as_millis(),
160 "V2 agent pool initialized"
161 );
162
163 if let Some(config_value) = &self.config.config {
165 self.send_configure(config_value.clone()).await?;
166 }
167
168 Ok(())
169 }
170
171 fn get_endpoint(&self) -> SentinelResult<String> {
173 use sentinel_config::AgentTransport;
174 match &self.config.transport {
175 AgentTransport::Grpc { address, .. } => Ok(address.clone()),
176 AgentTransport::UnixSocket { path } => {
177 Ok(format!("unix:{}", path.display()))
179 }
180 AgentTransport::Http { url, .. } => {
181 Err(SentinelError::Agent {
183 agent: self.config.id.clone(),
184 message: "HTTP transport not supported for v2 protocol".to_string(),
185 event: "initialize".to_string(),
186 source: None,
187 })
188 }
189 }
190 }
191
192 async fn send_configure(&self, _config: serde_json::Value) -> SentinelResult<()> {
197 debug!(
198 agent_id = %self.config.id,
199 "Configuration will be sent through control stream on connection"
200 );
201
202 info!(
207 agent_id = %self.config.id,
208 "V2 agent configuration noted"
209 );
210
211 Ok(())
212 }
213
214 pub async fn call_request_headers(
216 &self,
217 event: &RequestHeadersEvent,
218 ) -> SentinelResult<AgentResponse> {
219 let call_num = self.metrics.calls_total.fetch_add(1, Ordering::Relaxed) + 1;
220
221 let correlation_id = &event.metadata.correlation_id;
223
224 trace!(
225 agent_id = %self.config.id,
226 call_num = call_num,
227 correlation_id = %correlation_id,
228 "Sending request headers to v2 agent"
229 );
230
231 self.pool
232 .send_request_headers(&self.config.id, correlation_id, event)
233 .await
234 .map_err(|e| {
235 error!(
236 agent_id = %self.config.id,
237 correlation_id = %correlation_id,
238 error = %e,
239 "V2 agent request headers call failed"
240 );
241 SentinelError::Agent {
242 agent: self.config.id.clone(),
243 message: e.to_string(),
244 event: "request_headers".to_string(),
245 source: None,
246 }
247 })
248 }
249
250 pub async fn call_request_body_chunk(
255 &self,
256 event: &RequestBodyChunkEvent,
257 ) -> SentinelResult<AgentResponse> {
258 let correlation_id = &event.correlation_id;
259
260 trace!(
261 agent_id = %self.config.id,
262 correlation_id = %correlation_id,
263 chunk_index = event.chunk_index,
264 is_last = event.is_last,
265 "Sending request body chunk to v2 agent"
266 );
267
268 self.pool
269 .send_request_body_chunk(&self.config.id, correlation_id, event)
270 .await
271 .map_err(|e| {
272 error!(
273 agent_id = %self.config.id,
274 correlation_id = %correlation_id,
275 error = %e,
276 "V2 agent request body chunk call failed"
277 );
278 SentinelError::Agent {
279 agent: self.config.id.clone(),
280 message: e.to_string(),
281 event: "request_body_chunk".to_string(),
282 source: None,
283 }
284 })
285 }
286
287 pub async fn call_response_headers(
292 &self,
293 event: &ResponseHeadersEvent,
294 ) -> SentinelResult<AgentResponse> {
295 let correlation_id = &event.correlation_id;
296
297 trace!(
298 agent_id = %self.config.id,
299 correlation_id = %correlation_id,
300 status = event.status,
301 "Sending response headers to v2 agent"
302 );
303
304 self.pool
305 .send_response_headers(&self.config.id, correlation_id, event)
306 .await
307 .map_err(|e| {
308 error!(
309 agent_id = %self.config.id,
310 correlation_id = %correlation_id,
311 error = %e,
312 "V2 agent response headers call failed"
313 );
314 SentinelError::Agent {
315 agent: self.config.id.clone(),
316 message: e.to_string(),
317 event: "response_headers".to_string(),
318 source: None,
319 }
320 })
321 }
322
323 pub async fn call_response_body_chunk(
328 &self,
329 event: &ResponseBodyChunkEvent,
330 ) -> SentinelResult<AgentResponse> {
331 let correlation_id = &event.correlation_id;
332
333 trace!(
334 agent_id = %self.config.id,
335 correlation_id = %correlation_id,
336 chunk_index = event.chunk_index,
337 is_last = event.is_last,
338 "Sending response body chunk to v2 agent"
339 );
340
341 self.pool
342 .send_response_body_chunk(&self.config.id, correlation_id, event)
343 .await
344 .map_err(|e| {
345 error!(
346 agent_id = %self.config.id,
347 correlation_id = %correlation_id,
348 error = %e,
349 "V2 agent response body chunk call failed"
350 );
351 SentinelError::Agent {
352 agent: self.config.id.clone(),
353 message: e.to_string(),
354 event: "response_body_chunk".to_string(),
355 source: None,
356 }
357 })
358 }
359
360 pub async fn cancel_request(
362 &self,
363 correlation_id: &str,
364 reason: CancelReason,
365 ) -> SentinelResult<()> {
366 trace!(
367 agent_id = %self.config.id,
368 correlation_id = %correlation_id,
369 reason = ?reason,
370 "Cancelling request on v2 agent"
371 );
372
373 self.pool
374 .cancel_request(&self.config.id, correlation_id, reason)
375 .await
376 .map_err(|e| {
377 warn!(
378 agent_id = %self.config.id,
379 correlation_id = %correlation_id,
380 error = %e,
381 "Failed to cancel request on v2 agent"
382 );
383 SentinelError::Agent {
384 agent: self.config.id.clone(),
385 message: format!("Cancel failed: {}", e),
386 event: "cancel".to_string(),
387 source: None,
388 }
389 })
390 }
391
392 pub async fn capabilities(&self) -> Option<AgentCapabilities> {
394 self.pool.agent_capabilities(&self.config.id).await
395 }
396
397 pub async fn is_healthy(&self) -> bool {
399 self.pool.is_agent_healthy(&self.config.id).await
400 }
401
402 pub fn record_success(&self, duration: Duration) {
404 let success_count = self.metrics.calls_success.fetch_add(1, Ordering::Relaxed) + 1;
405 self.metrics
406 .duration_total_us
407 .fetch_add(duration.as_micros() as u64, Ordering::Relaxed);
408 self.consecutive_failures.store(0, Ordering::Relaxed);
409 self.last_success_ns.store(
410 self.base_instant.elapsed().as_nanos() as u64,
411 Ordering::Relaxed,
412 );
413
414 trace!(
415 agent_id = %self.config.id,
416 duration_ms = duration.as_millis(),
417 total_successes = success_count,
418 "Recorded v2 agent call success"
419 );
420
421 self.circuit_breaker.record_success();
422 }
423
424 #[inline]
426 pub fn time_since_last_success(&self) -> Option<Duration> {
427 let last_ns = self.last_success_ns.load(Ordering::Relaxed);
428 if last_ns == NO_TIMESTAMP {
429 return None;
430 }
431 let current_ns = self.base_instant.elapsed().as_nanos() as u64;
432 Some(Duration::from_nanos(current_ns.saturating_sub(last_ns)))
433 }
434
435 pub fn record_failure(&self) {
437 let fail_count = self.metrics.calls_failed.fetch_add(1, Ordering::Relaxed) + 1;
438 let consecutive = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
439
440 debug!(
441 agent_id = %self.config.id,
442 total_failures = fail_count,
443 consecutive_failures = consecutive,
444 "Recorded v2 agent call failure"
445 );
446
447 self.circuit_breaker.record_failure();
448 }
449
450 pub fn record_timeout(&self) {
452 let timeout_count = self.metrics.calls_timeout.fetch_add(1, Ordering::Relaxed) + 1;
453 let consecutive = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
454
455 debug!(
456 agent_id = %self.config.id,
457 total_timeouts = timeout_count,
458 consecutive_failures = consecutive,
459 timeout_ms = self.config.timeout_ms,
460 "Recorded v2 agent call timeout"
461 );
462
463 self.circuit_breaker.record_failure();
464 }
465
466 pub async fn pool_stats(&self) -> Option<AgentPoolStats> {
468 self.pool.agent_stats(&self.config.id).await
469 }
470
471 pub fn pool_metrics_collector(&self) -> &MetricsCollector {
476 self.pool.metrics_collector()
477 }
478
479 pub fn pool_metrics_collector_arc(&self) -> Arc<MetricsCollector> {
483 self.pool.metrics_collector_arc()
484 }
485
486 pub fn export_prometheus(&self) -> String {
491 self.pool.export_prometheus()
492 }
493
494 pub fn config_pusher(&self) -> &ConfigPusher {
499 self.pool.config_pusher()
500 }
501
502 pub fn push_config(&self, update_type: ConfigUpdateType) -> Option<String> {
506 self.pool.push_config_to_agent(&self.config.id, update_type)
507 }
508
509 pub async fn send_configuration(&self, config: serde_json::Value) -> SentinelResult<()> {
513 if let Some(push_id) = self.push_config(ConfigUpdateType::RequestReload) {
517 debug!(
518 agent_id = %self.config.id,
519 push_id = %push_id,
520 "Configuration push initiated"
521 );
522 Ok(())
523 } else {
524 warn!(
525 agent_id = %self.config.id,
526 "Agent does not support config push"
527 );
528 Err(SentinelError::Agent {
529 agent: self.config.id.clone(),
530 message: "Agent does not support config push".to_string(),
531 event: "send_configuration".to_string(),
532 source: None,
533 })
534 }
535 }
536
537 pub async fn shutdown(&self) {
541 debug!(
542 agent_id = %self.config.id,
543 "Shutting down v2 agent"
544 );
545
546 if let Err(e) = self.pool.remove_agent(&self.config.id).await {
548 warn!(
549 agent_id = %self.config.id,
550 error = %e,
551 "Error removing agent from pool during shutdown"
552 );
553 }
554
555 let stats = (
556 self.metrics.calls_total.load(Ordering::Relaxed),
557 self.metrics.calls_success.load(Ordering::Relaxed),
558 self.metrics.calls_failed.load(Ordering::Relaxed),
559 self.metrics.calls_timeout.load(Ordering::Relaxed),
560 );
561
562 info!(
563 agent_id = %self.config.id,
564 total_calls = stats.0,
565 successes = stats.1,
566 failures = stats.2,
567 timeouts = stats.3,
568 "V2 agent shutdown complete"
569 );
570 }
571}
572
573fn convert_lb_strategy(strategy: LoadBalanceStrategy) -> ProtocolLBStrategy {
575 match strategy {
576 LoadBalanceStrategy::RoundRobin => ProtocolLBStrategy::RoundRobin,
577 LoadBalanceStrategy::LeastConnections => ProtocolLBStrategy::LeastConnections,
578 LoadBalanceStrategy::HealthBased => ProtocolLBStrategy::HealthBased,
579 LoadBalanceStrategy::Random => ProtocolLBStrategy::Random,
580 }
581}
582
#[cfg(test)]
mod tests {
    use super::*;

    /// Each config strategy must map to the protocol strategy of the same name.
    #[test]
    fn test_convert_lb_strategy() {
        let cases = [
            (LoadBalanceStrategy::RoundRobin, ProtocolLBStrategy::RoundRobin),
            (
                LoadBalanceStrategy::LeastConnections,
                ProtocolLBStrategy::LeastConnections,
            ),
            (LoadBalanceStrategy::HealthBased, ProtocolLBStrategy::HealthBased),
            (LoadBalanceStrategy::Random, ProtocolLBStrategy::Random),
        ];

        for (input, expected) in cases {
            assert_eq!(convert_lb_strategy(input), expected);
        }
    }
}