Skip to main content

sentinel_proxy/agents/
agent_v2.rs

1//! Protocol v2 agent implementation.
2//!
3//! This module provides v2 agent support using the bidirectional streaming
4//! protocol with capabilities, health reporting, and metrics export.
5
6use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use sentinel_agent_protocol::v2::{
11    AgentCapabilities, AgentPool, AgentPoolConfig as ProtocolPoolConfig, AgentPoolStats,
12    CancelReason, ConfigPusher, ConfigUpdateType, LoadBalanceStrategy as ProtocolLBStrategy,
13    MetricsCollector,
14};
15use sentinel_agent_protocol::{
16    AgentResponse, EventType, RequestBodyChunkEvent, RequestHeadersEvent, ResponseBodyChunkEvent,
17    ResponseHeadersEvent,
18};
19use sentinel_common::{
20    errors::{SentinelError, SentinelResult},
21    CircuitBreaker,
22};
23use sentinel_config::{AgentConfig, AgentEvent, FailureMode, LoadBalanceStrategy};
24use tracing::{debug, error, info, trace, warn};
25
26use super::metrics::AgentMetrics;
27
/// Value held by `last_success_ns` while no successful call has been recorded.
///
/// Zero is reserved for "never"; readers compare against this constant
/// before interpreting the stored value as an offset from `base_instant`.
const NO_TIMESTAMP: u64 = 0;
30
31/// Protocol v2 agent with connection pooling and bidirectional streaming.
32pub struct AgentV2 {
33    /// Agent configuration
34    config: AgentConfig,
35    /// V2 connection pool
36    pool: Arc<AgentPool>,
37    /// Circuit breaker
38    circuit_breaker: Arc<CircuitBreaker>,
39    /// Agent-specific metrics
40    metrics: Arc<AgentMetrics>,
41    /// Base instant for timestamp calculations
42    base_instant: Instant,
43    /// Last successful call (nanoseconds since base_instant, 0 = never)
44    last_success_ns: AtomicU64,
45    /// Consecutive failures
46    consecutive_failures: AtomicU32,
47}
48
49impl AgentV2 {
50    /// Create a new v2 agent.
51    pub fn new(config: AgentConfig, circuit_breaker: Arc<CircuitBreaker>) -> Self {
52        trace!(
53            agent_id = %config.id,
54            agent_type = ?config.agent_type,
55            timeout_ms = config.timeout_ms,
56            events = ?config.events,
57            "Creating v2 agent instance"
58        );
59
60        // Convert config pool settings to protocol pool config
61        let pool_config = config
62            .pool
63            .as_ref()
64            .map(|p| ProtocolPoolConfig {
65                connections_per_agent: p.connections_per_agent,
66                load_balance_strategy: convert_lb_strategy(p.load_balance_strategy),
67                connect_timeout: Duration::from_millis(p.connect_timeout_ms),
68                request_timeout: Duration::from_millis(config.timeout_ms),
69                reconnect_interval: Duration::from_millis(p.reconnect_interval_ms),
70                max_reconnect_attempts: p.max_reconnect_attempts,
71                drain_timeout: Duration::from_millis(p.drain_timeout_ms),
72                max_concurrent_per_connection: p.max_concurrent_per_connection,
73                health_check_interval: Duration::from_millis(p.health_check_interval_ms),
74                ..Default::default()
75            })
76            .unwrap_or_default();
77
78        let pool = Arc::new(AgentPool::with_config(pool_config));
79
80        Self {
81            config,
82            pool,
83            circuit_breaker,
84            metrics: Arc::new(AgentMetrics::default()),
85            base_instant: Instant::now(),
86            last_success_ns: AtomicU64::new(NO_TIMESTAMP),
87            consecutive_failures: AtomicU32::new(0),
88        }
89    }
90
91    /// Get the agent ID.
92    pub fn id(&self) -> &str {
93        &self.config.id
94    }
95
96    /// Get the agent's circuit breaker.
97    pub fn circuit_breaker(&self) -> &CircuitBreaker {
98        &self.circuit_breaker
99    }
100
101    /// Get the agent's failure mode.
102    pub fn failure_mode(&self) -> FailureMode {
103        self.config.failure_mode
104    }
105
106    /// Get the agent's timeout in milliseconds.
107    pub fn timeout_ms(&self) -> u64 {
108        self.config.timeout_ms
109    }
110
111    /// Get the agent's metrics.
112    pub fn metrics(&self) -> &AgentMetrics {
113        &self.metrics
114    }
115
116    /// Check if agent handles a specific event type.
117    pub fn handles_event(&self, event_type: EventType) -> bool {
118        self.config.events.iter().any(|e| match (e, event_type) {
119            (AgentEvent::RequestHeaders, EventType::RequestHeaders) => true,
120            (AgentEvent::RequestBody, EventType::RequestBodyChunk) => true,
121            (AgentEvent::ResponseHeaders, EventType::ResponseHeaders) => true,
122            (AgentEvent::ResponseBody, EventType::ResponseBodyChunk) => true,
123            (AgentEvent::Log, EventType::RequestComplete) => true,
124            (AgentEvent::WebSocketFrame, EventType::WebSocketFrame) => true,
125            _ => false,
126        })
127    }
128
129    /// Initialize agent connection(s).
130    pub async fn initialize(&self) -> SentinelResult<()> {
131        let endpoint = self.get_endpoint()?;
132
133        debug!(
134            agent_id = %self.config.id,
135            endpoint = %endpoint,
136            "Initializing v2 agent pool"
137        );
138
139        let start = Instant::now();
140
141        // Add agent to pool - pool will establish connections
142        self.pool
143            .add_agent(&self.config.id, &endpoint)
144            .await
145            .map_err(|e| {
146                error!(
147                    agent_id = %self.config.id,
148                    endpoint = %endpoint,
149                    error = %e,
150                    "Failed to add agent to v2 pool"
151                );
152                SentinelError::Agent {
153                    agent: self.config.id.clone(),
154                    message: format!("Failed to initialize v2 agent: {}", e),
155                    event: "initialize".to_string(),
156                    source: None,
157                }
158            })?;
159
160        info!(
161            agent_id = %self.config.id,
162            endpoint = %endpoint,
163            connect_time_ms = start.elapsed().as_millis(),
164            "V2 agent pool initialized"
165        );
166
167        // Send configuration if present
168        if let Some(config_value) = &self.config.config {
169            self.send_configure(config_value.clone()).await?;
170        }
171
172        Ok(())
173    }
174
175    /// Get endpoint from transport config.
176    fn get_endpoint(&self) -> SentinelResult<String> {
177        use sentinel_config::AgentTransport;
178        match &self.config.transport {
179            AgentTransport::Grpc { address, .. } => Ok(address.clone()),
180            AgentTransport::UnixSocket { path } => {
181                // For UDS, format as unix:path
182                Ok(format!("unix:{}", path.display()))
183            }
184            AgentTransport::Http { url, .. } => {
185                // V2 doesn't support HTTP transport
186                Err(SentinelError::Agent {
187                    agent: self.config.id.clone(),
188                    message: "HTTP transport not supported for v2 protocol".to_string(),
189                    event: "initialize".to_string(),
190                    source: None,
191                })
192            }
193        }
194    }
195
196    /// Send configuration to the agent.
197    ///
198    /// Note: Configuration is sent through the control stream when connections
199    /// are established. This is a placeholder for explicit config updates.
200    async fn send_configure(&self, _config: serde_json::Value) -> SentinelResult<()> {
201        debug!(
202            agent_id = %self.config.id,
203            "Configuration will be sent through control stream on connection"
204        );
205
206        // Configuration is handled by the pool's connections during initialization
207        // For explicit config updates, we'd need to iterate through connections
208        // and send configure through their control streams
209
210        info!(
211            agent_id = %self.config.id,
212            "V2 agent configuration noted"
213        );
214
215        Ok(())
216    }
217
218    /// Call agent with request headers event.
219    pub async fn call_request_headers(
220        &self,
221        event: &RequestHeadersEvent,
222    ) -> SentinelResult<AgentResponse> {
223        let call_num = self.metrics.calls_total.fetch_add(1, Ordering::Relaxed) + 1;
224
225        // Get correlation_id from event metadata
226        let correlation_id = &event.metadata.correlation_id;
227
228        trace!(
229            agent_id = %self.config.id,
230            call_num = call_num,
231            correlation_id = %correlation_id,
232            "Sending request headers to v2 agent"
233        );
234
235        self.pool
236            .send_request_headers(&self.config.id, correlation_id, event)
237            .await
238            .map_err(|e| {
239                error!(
240                    agent_id = %self.config.id,
241                    correlation_id = %correlation_id,
242                    error = %e,
243                    "V2 agent request headers call failed"
244                );
245                SentinelError::Agent {
246                    agent: self.config.id.clone(),
247                    message: e.to_string(),
248                    event: "request_headers".to_string(),
249                    source: None,
250                }
251            })
252    }
253
254    /// Call agent with request body chunk event.
255    ///
256    /// For streaming body inspection, chunks are sent sequentially with
257    /// increasing `chunk_index`. The agent responds after processing each chunk.
258    pub async fn call_request_body_chunk(
259        &self,
260        event: &RequestBodyChunkEvent,
261    ) -> SentinelResult<AgentResponse> {
262        let correlation_id = &event.correlation_id;
263
264        trace!(
265            agent_id = %self.config.id,
266            correlation_id = %correlation_id,
267            chunk_index = event.chunk_index,
268            is_last = event.is_last,
269            "Sending request body chunk to v2 agent"
270        );
271
272        self.pool
273            .send_request_body_chunk(&self.config.id, correlation_id, event)
274            .await
275            .map_err(|e| {
276                error!(
277                    agent_id = %self.config.id,
278                    correlation_id = %correlation_id,
279                    error = %e,
280                    "V2 agent request body chunk call failed"
281                );
282                SentinelError::Agent {
283                    agent: self.config.id.clone(),
284                    message: e.to_string(),
285                    event: "request_body_chunk".to_string(),
286                    source: None,
287                }
288            })
289    }
290
291    /// Call agent with response headers event.
292    ///
293    /// Called when upstream response headers are received, allowing the agent
294    /// to inspect/modify response headers before they're sent to the client.
295    pub async fn call_response_headers(
296        &self,
297        event: &ResponseHeadersEvent,
298    ) -> SentinelResult<AgentResponse> {
299        let correlation_id = &event.correlation_id;
300
301        trace!(
302            agent_id = %self.config.id,
303            correlation_id = %correlation_id,
304            status = event.status,
305            "Sending response headers to v2 agent"
306        );
307
308        self.pool
309            .send_response_headers(&self.config.id, correlation_id, event)
310            .await
311            .map_err(|e| {
312                error!(
313                    agent_id = %self.config.id,
314                    correlation_id = %correlation_id,
315                    error = %e,
316                    "V2 agent response headers call failed"
317                );
318                SentinelError::Agent {
319                    agent: self.config.id.clone(),
320                    message: e.to_string(),
321                    event: "response_headers".to_string(),
322                    source: None,
323                }
324            })
325    }
326
327    /// Call agent with response body chunk event.
328    ///
329    /// For streaming response body inspection, chunks are sent sequentially.
330    /// The agent can inspect and optionally modify response body data.
331    pub async fn call_response_body_chunk(
332        &self,
333        event: &ResponseBodyChunkEvent,
334    ) -> SentinelResult<AgentResponse> {
335        let correlation_id = &event.correlation_id;
336
337        trace!(
338            agent_id = %self.config.id,
339            correlation_id = %correlation_id,
340            chunk_index = event.chunk_index,
341            is_last = event.is_last,
342            "Sending response body chunk to v2 agent"
343        );
344
345        self.pool
346            .send_response_body_chunk(&self.config.id, correlation_id, event)
347            .await
348            .map_err(|e| {
349                error!(
350                    agent_id = %self.config.id,
351                    correlation_id = %correlation_id,
352                    error = %e,
353                    "V2 agent response body chunk call failed"
354                );
355                SentinelError::Agent {
356                    agent: self.config.id.clone(),
357                    message: e.to_string(),
358                    event: "response_body_chunk".to_string(),
359                    source: None,
360                }
361            })
362    }
363
364    /// Cancel an in-flight request.
365    pub async fn cancel_request(
366        &self,
367        correlation_id: &str,
368        reason: CancelReason,
369    ) -> SentinelResult<()> {
370        trace!(
371            agent_id = %self.config.id,
372            correlation_id = %correlation_id,
373            reason = ?reason,
374            "Cancelling request on v2 agent"
375        );
376
377        self.pool
378            .cancel_request(&self.config.id, correlation_id, reason)
379            .await
380            .map_err(|e| {
381                warn!(
382                    agent_id = %self.config.id,
383                    correlation_id = %correlation_id,
384                    error = %e,
385                    "Failed to cancel request on v2 agent"
386                );
387                SentinelError::Agent {
388                    agent: self.config.id.clone(),
389                    message: format!("Cancel failed: {}", e),
390                    event: "cancel".to_string(),
391                    source: None,
392                }
393            })
394    }
395
396    /// Get agent capabilities.
397    pub async fn capabilities(&self) -> Option<AgentCapabilities> {
398        self.pool.agent_capabilities(&self.config.id).await
399    }
400
401    /// Check if agent is healthy.
402    pub async fn is_healthy(&self) -> bool {
403        self.pool.is_agent_healthy(&self.config.id)
404    }
405
406    /// Record successful call (lock-free).
407    pub fn record_success(&self, duration: Duration) {
408        let success_count = self.metrics.calls_success.fetch_add(1, Ordering::Relaxed) + 1;
409        self.metrics
410            .duration_total_us
411            .fetch_add(duration.as_micros() as u64, Ordering::Relaxed);
412        self.consecutive_failures.store(0, Ordering::Relaxed);
413        self.last_success_ns.store(
414            self.base_instant.elapsed().as_nanos() as u64,
415            Ordering::Relaxed,
416        );
417
418        trace!(
419            agent_id = %self.config.id,
420            duration_ms = duration.as_millis(),
421            total_successes = success_count,
422            "Recorded v2 agent call success"
423        );
424
425        self.circuit_breaker.record_success();
426    }
427
428    /// Get the time since last successful call.
429    #[inline]
430    pub fn time_since_last_success(&self) -> Option<Duration> {
431        let last_ns = self.last_success_ns.load(Ordering::Relaxed);
432        if last_ns == NO_TIMESTAMP {
433            return None;
434        }
435        let current_ns = self.base_instant.elapsed().as_nanos() as u64;
436        Some(Duration::from_nanos(current_ns.saturating_sub(last_ns)))
437    }
438
439    /// Record failed call.
440    pub fn record_failure(&self) {
441        let fail_count = self.metrics.calls_failed.fetch_add(1, Ordering::Relaxed) + 1;
442        let consecutive = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
443
444        debug!(
445            agent_id = %self.config.id,
446            total_failures = fail_count,
447            consecutive_failures = consecutive,
448            "Recorded v2 agent call failure"
449        );
450
451        self.circuit_breaker.record_failure();
452    }
453
454    /// Record timeout.
455    pub fn record_timeout(&self) {
456        let timeout_count = self.metrics.calls_timeout.fetch_add(1, Ordering::Relaxed) + 1;
457        let consecutive = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
458
459        debug!(
460            agent_id = %self.config.id,
461            total_timeouts = timeout_count,
462            consecutive_failures = consecutive,
463            timeout_ms = self.config.timeout_ms,
464            "Recorded v2 agent call timeout"
465        );
466
467        self.circuit_breaker.record_failure();
468    }
469
470    /// Get pool statistics.
471    pub async fn pool_stats(&self) -> Option<AgentPoolStats> {
472        self.pool.agent_stats(&self.config.id).await
473    }
474
475    /// Get the pool's metrics collector.
476    ///
477    /// Returns a reference to the shared metrics collector that aggregates
478    /// metrics reports from all agents in this pool.
479    pub fn pool_metrics_collector(&self) -> &MetricsCollector {
480        self.pool.metrics_collector()
481    }
482
483    /// Get an Arc to the pool's metrics collector.
484    ///
485    /// This is useful for registering the collector with a MetricsManager.
486    pub fn pool_metrics_collector_arc(&self) -> Arc<MetricsCollector> {
487        self.pool.metrics_collector_arc()
488    }
489
490    /// Export agent metrics in Prometheus format.
491    ///
492    /// Returns a string containing all metrics collected from agents
493    /// in Prometheus exposition format.
494    pub fn export_prometheus(&self) -> String {
495        self.pool.export_prometheus()
496    }
497
498    /// Get the pool's config pusher.
499    ///
500    /// Returns a reference to the shared config pusher that distributes
501    /// configuration updates to agents.
502    pub fn config_pusher(&self) -> &ConfigPusher {
503        self.pool.config_pusher()
504    }
505
506    /// Push a configuration update to this agent.
507    ///
508    /// Returns the push ID if the agent supports config push, None otherwise.
509    pub fn push_config(&self, update_type: ConfigUpdateType) -> Option<String> {
510        self.pool.push_config_to_agent(&self.config.id, update_type)
511    }
512
513    /// Send a configuration update to this agent via the control stream.
514    ///
515    /// This is a direct config push using the `ConfigureEvent` message.
516    pub async fn send_configuration(&self, config: serde_json::Value) -> SentinelResult<()> {
517        // Get a connection and send the configure event
518        // For now, we rely on the pool's config push mechanism
519        // which tracks acknowledgments and retries
520        if let Some(push_id) = self.push_config(ConfigUpdateType::RequestReload) {
521            debug!(
522                agent_id = %self.config.id,
523                push_id = %push_id,
524                "Configuration push initiated"
525            );
526            Ok(())
527        } else {
528            warn!(
529                agent_id = %self.config.id,
530                "Agent does not support config push"
531            );
532            Err(SentinelError::Agent {
533                agent: self.config.id.clone(),
534                message: "Agent does not support config push".to_string(),
535                event: "send_configuration".to_string(),
536                source: None,
537            })
538        }
539    }
540
541    /// Shutdown agent.
542    ///
543    /// This removes the agent from the pool and closes all connections.
544    pub async fn shutdown(&self) {
545        debug!(
546            agent_id = %self.config.id,
547            "Shutting down v2 agent"
548        );
549
550        // Remove from pool - this gracefully closes connections
551        if let Err(e) = self.pool.remove_agent(&self.config.id).await {
552            warn!(
553                agent_id = %self.config.id,
554                error = %e,
555                "Error removing agent from pool during shutdown"
556            );
557        }
558
559        let stats = (
560            self.metrics.calls_total.load(Ordering::Relaxed),
561            self.metrics.calls_success.load(Ordering::Relaxed),
562            self.metrics.calls_failed.load(Ordering::Relaxed),
563            self.metrics.calls_timeout.load(Ordering::Relaxed),
564        );
565
566        info!(
567            agent_id = %self.config.id,
568            total_calls = stats.0,
569            successes = stats.1,
570            failures = stats.2,
571            timeouts = stats.3,
572            "V2 agent shutdown complete"
573        );
574    }
575}
576
577/// Convert config load balance strategy to protocol load balance strategy.
578fn convert_lb_strategy(strategy: LoadBalanceStrategy) -> ProtocolLBStrategy {
579    match strategy {
580        LoadBalanceStrategy::RoundRobin => ProtocolLBStrategy::RoundRobin,
581        LoadBalanceStrategy::LeastConnections => ProtocolLBStrategy::LeastConnections,
582        LoadBalanceStrategy::HealthBased => ProtocolLBStrategy::HealthBased,
583        LoadBalanceStrategy::Random => ProtocolLBStrategy::Random,
584    }
585}
586
#[cfg(test)]
mod tests {
    use super::*;

    /// Every config strategy must map to the protocol strategy of the same name.
    #[test]
    fn test_convert_lb_strategy() {
        let cases = [
            (
                LoadBalanceStrategy::RoundRobin,
                ProtocolLBStrategy::RoundRobin,
            ),
            (
                LoadBalanceStrategy::LeastConnections,
                ProtocolLBStrategy::LeastConnections,
            ),
            (
                LoadBalanceStrategy::HealthBased,
                ProtocolLBStrategy::HealthBased,
            ),
            (LoadBalanceStrategy::Random, ProtocolLBStrategy::Random),
        ];

        for (input, expected) in cases {
            assert_eq!(convert_lb_strategy(input), expected);
        }
    }
}