1use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use sentinel_agent_protocol::v2::{
11 AgentCapabilities, AgentPool, AgentPoolConfig as ProtocolPoolConfig,
12 AgentPoolStats, CancelReason, ConfigPusher, ConfigUpdateType,
13 LoadBalanceStrategy as ProtocolLBStrategy, MetricsCollector,
14};
15use sentinel_agent_protocol::{
16 AgentResponse, EventType, RequestBodyChunkEvent, RequestHeadersEvent,
17 ResponseBodyChunkEvent, ResponseHeadersEvent,
18};
19use sentinel_common::{
20 errors::{SentinelError, SentinelResult},
21 CircuitBreaker,
22};
23use sentinel_config::{AgentConfig, AgentEvent, FailureMode, LoadBalanceStrategy};
24use tracing::{debug, error, info, trace, warn};
25
26use super::metrics::AgentMetrics;
27
/// Value stored in `AgentV2::last_success_ns` meaning "no success recorded yet".
const NO_TIMESTAMP: u64 = u64::MIN;
30
/// A single configured agent spoken to over the v2 agent protocol via an
/// [`AgentPool`], plus the circuit breaker and local call metrics that the
/// `record_*` methods update.
pub struct AgentV2 {
    /// Static configuration for this agent (id, transport, subscribed events, timeouts, ...).
    config: AgentConfig,
    /// Protocol connection pool this agent is registered with.
    pool: Arc<AgentPool>,
    /// Breaker updated by `record_success` / `record_failure` / `record_timeout`.
    circuit_breaker: Arc<CircuitBreaker>,
    /// Local call counters and accumulated call duration.
    metrics: Arc<AgentMetrics>,
    /// Reference point for `last_success_ns`; captured when the agent is created.
    base_instant: Instant,
    /// Nanoseconds between `base_instant` and the most recent recorded success,
    /// or `NO_TIMESTAMP` if no success has been recorded yet.
    last_success_ns: AtomicU64,
    /// Failures and timeouts recorded since the last success (reset to 0 on success).
    consecutive_failures: AtomicU32,
}
48
49impl AgentV2 {
50 pub fn new(
52 config: AgentConfig,
53 circuit_breaker: Arc<CircuitBreaker>,
54 ) -> Self {
55 trace!(
56 agent_id = %config.id,
57 agent_type = ?config.agent_type,
58 timeout_ms = config.timeout_ms,
59 events = ?config.events,
60 "Creating v2 agent instance"
61 );
62
63 let pool_config = config.pool.as_ref().map(|p| ProtocolPoolConfig {
65 connections_per_agent: p.connections_per_agent,
66 load_balance_strategy: convert_lb_strategy(p.load_balance_strategy),
67 connect_timeout: Duration::from_millis(p.connect_timeout_ms),
68 request_timeout: Duration::from_millis(config.timeout_ms),
69 reconnect_interval: Duration::from_millis(p.reconnect_interval_ms),
70 max_reconnect_attempts: p.max_reconnect_attempts,
71 drain_timeout: Duration::from_millis(p.drain_timeout_ms),
72 max_concurrent_per_connection: p.max_concurrent_per_connection,
73 health_check_interval: Duration::from_millis(p.health_check_interval_ms),
74 ..Default::default()
75 }).unwrap_or_default();
76
77 let pool = Arc::new(AgentPool::with_config(pool_config));
78
79 Self {
80 config,
81 pool,
82 circuit_breaker,
83 metrics: Arc::new(AgentMetrics::default()),
84 base_instant: Instant::now(),
85 last_success_ns: AtomicU64::new(NO_TIMESTAMP),
86 consecutive_failures: AtomicU32::new(0),
87 }
88 }
89
90 pub fn id(&self) -> &str {
92 &self.config.id
93 }
94
95 pub fn circuit_breaker(&self) -> &CircuitBreaker {
97 &self.circuit_breaker
98 }
99
100 pub fn failure_mode(&self) -> FailureMode {
102 self.config.failure_mode
103 }
104
105 pub fn timeout_ms(&self) -> u64 {
107 self.config.timeout_ms
108 }
109
110 pub fn metrics(&self) -> &AgentMetrics {
112 &self.metrics
113 }
114
115 pub fn handles_event(&self, event_type: EventType) -> bool {
117 self.config.events.iter().any(|e| match (e, event_type) {
118 (AgentEvent::RequestHeaders, EventType::RequestHeaders) => true,
119 (AgentEvent::RequestBody, EventType::RequestBodyChunk) => true,
120 (AgentEvent::ResponseHeaders, EventType::ResponseHeaders) => true,
121 (AgentEvent::ResponseBody, EventType::ResponseBodyChunk) => true,
122 (AgentEvent::Log, EventType::RequestComplete) => true,
123 (AgentEvent::WebSocketFrame, EventType::WebSocketFrame) => true,
124 _ => false,
125 })
126 }
127
128 pub async fn initialize(&self) -> SentinelResult<()> {
130 let endpoint = self.get_endpoint()?;
131
132 debug!(
133 agent_id = %self.config.id,
134 endpoint = %endpoint,
135 "Initializing v2 agent pool"
136 );
137
138 let start = Instant::now();
139
140 self.pool.add_agent(&self.config.id, &endpoint).await
142 .map_err(|e| {
143 error!(
144 agent_id = %self.config.id,
145 endpoint = %endpoint,
146 error = %e,
147 "Failed to add agent to v2 pool"
148 );
149 SentinelError::Agent {
150 agent: self.config.id.clone(),
151 message: format!("Failed to initialize v2 agent: {}", e),
152 event: "initialize".to_string(),
153 source: None,
154 }
155 })?;
156
157 info!(
158 agent_id = %self.config.id,
159 endpoint = %endpoint,
160 connect_time_ms = start.elapsed().as_millis(),
161 "V2 agent pool initialized"
162 );
163
164 if let Some(config_value) = &self.config.config {
166 self.send_configure(config_value.clone()).await?;
167 }
168
169 Ok(())
170 }
171
172 fn get_endpoint(&self) -> SentinelResult<String> {
174 use sentinel_config::AgentTransport;
175 match &self.config.transport {
176 AgentTransport::Grpc { address, .. } => Ok(address.clone()),
177 AgentTransport::UnixSocket { path } => {
178 Ok(format!("unix:{}", path.display()))
180 }
181 AgentTransport::Http { url, .. } => {
182 Err(SentinelError::Agent {
184 agent: self.config.id.clone(),
185 message: "HTTP transport not supported for v2 protocol".to_string(),
186 event: "initialize".to_string(),
187 source: None,
188 })
189 }
190 }
191 }
192
193 async fn send_configure(&self, _config: serde_json::Value) -> SentinelResult<()> {
198 debug!(
199 agent_id = %self.config.id,
200 "Configuration will be sent through control stream on connection"
201 );
202
203 info!(
208 agent_id = %self.config.id,
209 "V2 agent configuration noted"
210 );
211
212 Ok(())
213 }
214
215 pub async fn call_request_headers(
217 &self,
218 event: &RequestHeadersEvent,
219 ) -> SentinelResult<AgentResponse> {
220 let call_num = self.metrics.calls_total.fetch_add(1, Ordering::Relaxed) + 1;
221
222 let correlation_id = &event.metadata.correlation_id;
224
225 trace!(
226 agent_id = %self.config.id,
227 call_num = call_num,
228 correlation_id = %correlation_id,
229 "Sending request headers to v2 agent"
230 );
231
232 self.pool
233 .send_request_headers(&self.config.id, correlation_id, event)
234 .await
235 .map_err(|e| {
236 error!(
237 agent_id = %self.config.id,
238 correlation_id = %correlation_id,
239 error = %e,
240 "V2 agent request headers call failed"
241 );
242 SentinelError::Agent {
243 agent: self.config.id.clone(),
244 message: e.to_string(),
245 event: "request_headers".to_string(),
246 source: None,
247 }
248 })
249 }
250
251 pub async fn call_request_body_chunk(
256 &self,
257 event: &RequestBodyChunkEvent,
258 ) -> SentinelResult<AgentResponse> {
259 let correlation_id = &event.correlation_id;
260
261 trace!(
262 agent_id = %self.config.id,
263 correlation_id = %correlation_id,
264 chunk_index = event.chunk_index,
265 is_last = event.is_last,
266 "Sending request body chunk to v2 agent"
267 );
268
269 self.pool
270 .send_request_body_chunk(&self.config.id, correlation_id, event)
271 .await
272 .map_err(|e| {
273 error!(
274 agent_id = %self.config.id,
275 correlation_id = %correlation_id,
276 error = %e,
277 "V2 agent request body chunk call failed"
278 );
279 SentinelError::Agent {
280 agent: self.config.id.clone(),
281 message: e.to_string(),
282 event: "request_body_chunk".to_string(),
283 source: None,
284 }
285 })
286 }
287
288 pub async fn call_response_headers(
293 &self,
294 event: &ResponseHeadersEvent,
295 ) -> SentinelResult<AgentResponse> {
296 let correlation_id = &event.correlation_id;
297
298 trace!(
299 agent_id = %self.config.id,
300 correlation_id = %correlation_id,
301 status = event.status,
302 "Sending response headers to v2 agent"
303 );
304
305 self.pool
306 .send_response_headers(&self.config.id, correlation_id, event)
307 .await
308 .map_err(|e| {
309 error!(
310 agent_id = %self.config.id,
311 correlation_id = %correlation_id,
312 error = %e,
313 "V2 agent response headers call failed"
314 );
315 SentinelError::Agent {
316 agent: self.config.id.clone(),
317 message: e.to_string(),
318 event: "response_headers".to_string(),
319 source: None,
320 }
321 })
322 }
323
324 pub async fn call_response_body_chunk(
329 &self,
330 event: &ResponseBodyChunkEvent,
331 ) -> SentinelResult<AgentResponse> {
332 let correlation_id = &event.correlation_id;
333
334 trace!(
335 agent_id = %self.config.id,
336 correlation_id = %correlation_id,
337 chunk_index = event.chunk_index,
338 is_last = event.is_last,
339 "Sending response body chunk to v2 agent"
340 );
341
342 self.pool
343 .send_response_body_chunk(&self.config.id, correlation_id, event)
344 .await
345 .map_err(|e| {
346 error!(
347 agent_id = %self.config.id,
348 correlation_id = %correlation_id,
349 error = %e,
350 "V2 agent response body chunk call failed"
351 );
352 SentinelError::Agent {
353 agent: self.config.id.clone(),
354 message: e.to_string(),
355 event: "response_body_chunk".to_string(),
356 source: None,
357 }
358 })
359 }
360
361 pub async fn cancel_request(
363 &self,
364 correlation_id: &str,
365 reason: CancelReason,
366 ) -> SentinelResult<()> {
367 trace!(
368 agent_id = %self.config.id,
369 correlation_id = %correlation_id,
370 reason = ?reason,
371 "Cancelling request on v2 agent"
372 );
373
374 self.pool
375 .cancel_request(&self.config.id, correlation_id, reason)
376 .await
377 .map_err(|e| {
378 warn!(
379 agent_id = %self.config.id,
380 correlation_id = %correlation_id,
381 error = %e,
382 "Failed to cancel request on v2 agent"
383 );
384 SentinelError::Agent {
385 agent: self.config.id.clone(),
386 message: format!("Cancel failed: {}", e),
387 event: "cancel".to_string(),
388 source: None,
389 }
390 })
391 }
392
393 pub async fn capabilities(&self) -> Option<AgentCapabilities> {
395 self.pool.agent_capabilities(&self.config.id).await
396 }
397
398 pub async fn is_healthy(&self) -> bool {
400 self.pool.is_agent_healthy(&self.config.id)
401 }
402
403 pub fn record_success(&self, duration: Duration) {
405 let success_count = self.metrics.calls_success.fetch_add(1, Ordering::Relaxed) + 1;
406 self.metrics
407 .duration_total_us
408 .fetch_add(duration.as_micros() as u64, Ordering::Relaxed);
409 self.consecutive_failures.store(0, Ordering::Relaxed);
410 self.last_success_ns.store(
411 self.base_instant.elapsed().as_nanos() as u64,
412 Ordering::Relaxed,
413 );
414
415 trace!(
416 agent_id = %self.config.id,
417 duration_ms = duration.as_millis(),
418 total_successes = success_count,
419 "Recorded v2 agent call success"
420 );
421
422 self.circuit_breaker.record_success();
423 }
424
425 #[inline]
427 pub fn time_since_last_success(&self) -> Option<Duration> {
428 let last_ns = self.last_success_ns.load(Ordering::Relaxed);
429 if last_ns == NO_TIMESTAMP {
430 return None;
431 }
432 let current_ns = self.base_instant.elapsed().as_nanos() as u64;
433 Some(Duration::from_nanos(current_ns.saturating_sub(last_ns)))
434 }
435
436 pub fn record_failure(&self) {
438 let fail_count = self.metrics.calls_failed.fetch_add(1, Ordering::Relaxed) + 1;
439 let consecutive = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
440
441 debug!(
442 agent_id = %self.config.id,
443 total_failures = fail_count,
444 consecutive_failures = consecutive,
445 "Recorded v2 agent call failure"
446 );
447
448 self.circuit_breaker.record_failure();
449 }
450
451 pub fn record_timeout(&self) {
453 let timeout_count = self.metrics.calls_timeout.fetch_add(1, Ordering::Relaxed) + 1;
454 let consecutive = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
455
456 debug!(
457 agent_id = %self.config.id,
458 total_timeouts = timeout_count,
459 consecutive_failures = consecutive,
460 timeout_ms = self.config.timeout_ms,
461 "Recorded v2 agent call timeout"
462 );
463
464 self.circuit_breaker.record_failure();
465 }
466
467 pub async fn pool_stats(&self) -> Option<AgentPoolStats> {
469 self.pool.agent_stats(&self.config.id).await
470 }
471
472 pub fn pool_metrics_collector(&self) -> &MetricsCollector {
477 self.pool.metrics_collector()
478 }
479
480 pub fn pool_metrics_collector_arc(&self) -> Arc<MetricsCollector> {
484 self.pool.metrics_collector_arc()
485 }
486
487 pub fn export_prometheus(&self) -> String {
492 self.pool.export_prometheus()
493 }
494
495 pub fn config_pusher(&self) -> &ConfigPusher {
500 self.pool.config_pusher()
501 }
502
503 pub fn push_config(&self, update_type: ConfigUpdateType) -> Option<String> {
507 self.pool.push_config_to_agent(&self.config.id, update_type)
508 }
509
510 pub async fn send_configuration(&self, config: serde_json::Value) -> SentinelResult<()> {
514 if let Some(push_id) = self.push_config(ConfigUpdateType::RequestReload) {
518 debug!(
519 agent_id = %self.config.id,
520 push_id = %push_id,
521 "Configuration push initiated"
522 );
523 Ok(())
524 } else {
525 warn!(
526 agent_id = %self.config.id,
527 "Agent does not support config push"
528 );
529 Err(SentinelError::Agent {
530 agent: self.config.id.clone(),
531 message: "Agent does not support config push".to_string(),
532 event: "send_configuration".to_string(),
533 source: None,
534 })
535 }
536 }
537
538 pub async fn shutdown(&self) {
542 debug!(
543 agent_id = %self.config.id,
544 "Shutting down v2 agent"
545 );
546
547 if let Err(e) = self.pool.remove_agent(&self.config.id).await {
549 warn!(
550 agent_id = %self.config.id,
551 error = %e,
552 "Error removing agent from pool during shutdown"
553 );
554 }
555
556 let stats = (
557 self.metrics.calls_total.load(Ordering::Relaxed),
558 self.metrics.calls_success.load(Ordering::Relaxed),
559 self.metrics.calls_failed.load(Ordering::Relaxed),
560 self.metrics.calls_timeout.load(Ordering::Relaxed),
561 );
562
563 info!(
564 agent_id = %self.config.id,
565 total_calls = stats.0,
566 successes = stats.1,
567 failures = stats.2,
568 timeouts = stats.3,
569 "V2 agent shutdown complete"
570 );
571 }
572}
573
574fn convert_lb_strategy(strategy: LoadBalanceStrategy) -> ProtocolLBStrategy {
576 match strategy {
577 LoadBalanceStrategy::RoundRobin => ProtocolLBStrategy::RoundRobin,
578 LoadBalanceStrategy::LeastConnections => ProtocolLBStrategy::LeastConnections,
579 LoadBalanceStrategy::HealthBased => ProtocolLBStrategy::HealthBased,
580 LoadBalanceStrategy::Random => ProtocolLBStrategy::Random,
581 }
582}
583
#[cfg(test)]
mod tests {
    use super::*;

    /// Every config strategy must map to the protocol variant of the same name.
    #[test]
    fn test_convert_lb_strategy() {
        let cases = [
            (LoadBalanceStrategy::RoundRobin, ProtocolLBStrategy::RoundRobin),
            (
                LoadBalanceStrategy::LeastConnections,
                ProtocolLBStrategy::LeastConnections,
            ),
            (LoadBalanceStrategy::HealthBased, ProtocolLBStrategy::HealthBased),
            (LoadBalanceStrategy::Random, ProtocolLBStrategy::Random),
        ];

        for (input, expected) in cases {
            assert_eq!(convert_lb_strategy(input), expected);
        }
    }
}