sentinel_proxy/agents/
agent_v2.rs

1//! Protocol v2 agent implementation.
2//!
3//! This module provides v2 agent support using the bidirectional streaming
4//! protocol with capabilities, health reporting, and metrics export.
5
6use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use sentinel_agent_protocol::v2::{
11    AgentCapabilities, AgentPool, AgentPoolConfig as ProtocolPoolConfig,
12    AgentPoolStats, CancelReason, ConfigPusher, ConfigUpdateType,
13    LoadBalanceStrategy as ProtocolLBStrategy, MetricsCollector,
14};
15use sentinel_agent_protocol::{
16    AgentResponse, EventType, RequestBodyChunkEvent, RequestHeadersEvent,
17    ResponseBodyChunkEvent, ResponseHeadersEvent,
18};
19use sentinel_common::{
20    errors::{SentinelError, SentinelResult},
21    CircuitBreaker,
22};
23use sentinel_config::{AgentConfig, AgentEvent, FailureMode, LoadBalanceStrategy};
24use tracing::{debug, error, info, trace, warn};
25
26use super::metrics::AgentMetrics;
27
/// Marker stored in `last_success_ns` meaning "no successful call recorded yet".
const NO_TIMESTAMP: u64 = u64::MIN;
30
/// Protocol v2 agent with connection pooling and bidirectional streaming.
///
/// Each instance owns its own [`AgentPool`] (created in `new`) for a single
/// configured agent. It also tracks per-agent call outcomes: success, failure
/// and timeout counters, consecutive failures, and last-success time. These
/// outcomes are tracked alongside the [`CircuitBreaker`] handed in by the caller.
pub struct AgentV2 {
    /// Agent configuration (id, transport, timeout, subscribed events, optional
    /// pool settings and optional agent-specific config payload).
    config: AgentConfig,
    /// V2 connection pool; the agent's endpoint is registered in `initialize`.
    pool: Arc<AgentPool>,
    /// Circuit breaker fed by `record_success` / `record_failure` / `record_timeout`.
    circuit_breaker: Arc<CircuitBreaker>,
    /// Agent-specific call counters (totals, successes, failures, timeouts, durations).
    metrics: Arc<AgentMetrics>,
    /// Reference point captured at construction for the monotonic timestamps
    /// stored in `last_success_ns`.
    base_instant: Instant,
    /// Last successful call (nanoseconds since base_instant, 0 = never).
    /// Kept in an atomic so it can be updated through `&self`.
    last_success_ns: AtomicU64,
    /// Failures and timeouts since the last success; reset to 0 on success.
    consecutive_failures: AtomicU32,
}
48
49impl AgentV2 {
50    /// Create a new v2 agent.
51    pub fn new(
52        config: AgentConfig,
53        circuit_breaker: Arc<CircuitBreaker>,
54    ) -> Self {
55        trace!(
56            agent_id = %config.id,
57            agent_type = ?config.agent_type,
58            timeout_ms = config.timeout_ms,
59            events = ?config.events,
60            "Creating v2 agent instance"
61        );
62
63        // Convert config pool settings to protocol pool config
64        let pool_config = config.pool.as_ref().map(|p| ProtocolPoolConfig {
65            connections_per_agent: p.connections_per_agent,
66            load_balance_strategy: convert_lb_strategy(p.load_balance_strategy),
67            connect_timeout: Duration::from_millis(p.connect_timeout_ms),
68            request_timeout: Duration::from_millis(config.timeout_ms),
69            reconnect_interval: Duration::from_millis(p.reconnect_interval_ms),
70            max_reconnect_attempts: p.max_reconnect_attempts,
71            drain_timeout: Duration::from_millis(p.drain_timeout_ms),
72            max_concurrent_per_connection: p.max_concurrent_per_connection,
73            health_check_interval: Duration::from_millis(p.health_check_interval_ms),
74        }).unwrap_or_default();
75
76        let pool = Arc::new(AgentPool::with_config(pool_config));
77
78        Self {
79            config,
80            pool,
81            circuit_breaker,
82            metrics: Arc::new(AgentMetrics::default()),
83            base_instant: Instant::now(),
84            last_success_ns: AtomicU64::new(NO_TIMESTAMP),
85            consecutive_failures: AtomicU32::new(0),
86        }
87    }
88
89    /// Get the agent ID.
90    pub fn id(&self) -> &str {
91        &self.config.id
92    }
93
94    /// Get the agent's circuit breaker.
95    pub fn circuit_breaker(&self) -> &CircuitBreaker {
96        &self.circuit_breaker
97    }
98
99    /// Get the agent's failure mode.
100    pub fn failure_mode(&self) -> FailureMode {
101        self.config.failure_mode
102    }
103
104    /// Get the agent's timeout in milliseconds.
105    pub fn timeout_ms(&self) -> u64 {
106        self.config.timeout_ms
107    }
108
109    /// Get the agent's metrics.
110    pub fn metrics(&self) -> &AgentMetrics {
111        &self.metrics
112    }
113
114    /// Check if agent handles a specific event type.
115    pub fn handles_event(&self, event_type: EventType) -> bool {
116        self.config.events.iter().any(|e| match (e, event_type) {
117            (AgentEvent::RequestHeaders, EventType::RequestHeaders) => true,
118            (AgentEvent::RequestBody, EventType::RequestBodyChunk) => true,
119            (AgentEvent::ResponseHeaders, EventType::ResponseHeaders) => true,
120            (AgentEvent::ResponseBody, EventType::ResponseBodyChunk) => true,
121            (AgentEvent::Log, EventType::RequestComplete) => true,
122            (AgentEvent::WebSocketFrame, EventType::WebSocketFrame) => true,
123            _ => false,
124        })
125    }
126
127    /// Initialize agent connection(s).
128    pub async fn initialize(&self) -> SentinelResult<()> {
129        let endpoint = self.get_endpoint()?;
130
131        debug!(
132            agent_id = %self.config.id,
133            endpoint = %endpoint,
134            "Initializing v2 agent pool"
135        );
136
137        let start = Instant::now();
138
139        // Add agent to pool - pool will establish connections
140        self.pool.add_agent(&self.config.id, &endpoint).await
141            .map_err(|e| {
142                error!(
143                    agent_id = %self.config.id,
144                    endpoint = %endpoint,
145                    error = %e,
146                    "Failed to add agent to v2 pool"
147                );
148                SentinelError::Agent {
149                    agent: self.config.id.clone(),
150                    message: format!("Failed to initialize v2 agent: {}", e),
151                    event: "initialize".to_string(),
152                    source: None,
153                }
154            })?;
155
156        info!(
157            agent_id = %self.config.id,
158            endpoint = %endpoint,
159            connect_time_ms = start.elapsed().as_millis(),
160            "V2 agent pool initialized"
161        );
162
163        // Send configuration if present
164        if let Some(config_value) = &self.config.config {
165            self.send_configure(config_value.clone()).await?;
166        }
167
168        Ok(())
169    }
170
171    /// Get endpoint from transport config.
172    fn get_endpoint(&self) -> SentinelResult<String> {
173        use sentinel_config::AgentTransport;
174        match &self.config.transport {
175            AgentTransport::Grpc { address, .. } => Ok(address.clone()),
176            AgentTransport::UnixSocket { path } => {
177                // For UDS, format as unix:path
178                Ok(format!("unix:{}", path.display()))
179            }
180            AgentTransport::Http { url, .. } => {
181                // V2 doesn't support HTTP transport
182                Err(SentinelError::Agent {
183                    agent: self.config.id.clone(),
184                    message: "HTTP transport not supported for v2 protocol".to_string(),
185                    event: "initialize".to_string(),
186                    source: None,
187                })
188            }
189        }
190    }
191
192    /// Send configuration to the agent.
193    ///
194    /// Note: Configuration is sent through the control stream when connections
195    /// are established. This is a placeholder for explicit config updates.
196    async fn send_configure(&self, _config: serde_json::Value) -> SentinelResult<()> {
197        debug!(
198            agent_id = %self.config.id,
199            "Configuration will be sent through control stream on connection"
200        );
201
202        // Configuration is handled by the pool's connections during initialization
203        // For explicit config updates, we'd need to iterate through connections
204        // and send configure through their control streams
205
206        info!(
207            agent_id = %self.config.id,
208            "V2 agent configuration noted"
209        );
210
211        Ok(())
212    }
213
214    /// Call agent with request headers event.
215    pub async fn call_request_headers(
216        &self,
217        event: &RequestHeadersEvent,
218    ) -> SentinelResult<AgentResponse> {
219        let call_num = self.metrics.calls_total.fetch_add(1, Ordering::Relaxed) + 1;
220
221        // Get correlation_id from event metadata
222        let correlation_id = &event.metadata.correlation_id;
223
224        trace!(
225            agent_id = %self.config.id,
226            call_num = call_num,
227            correlation_id = %correlation_id,
228            "Sending request headers to v2 agent"
229        );
230
231        self.pool
232            .send_request_headers(&self.config.id, correlation_id, event)
233            .await
234            .map_err(|e| {
235                error!(
236                    agent_id = %self.config.id,
237                    correlation_id = %correlation_id,
238                    error = %e,
239                    "V2 agent request headers call failed"
240                );
241                SentinelError::Agent {
242                    agent: self.config.id.clone(),
243                    message: e.to_string(),
244                    event: "request_headers".to_string(),
245                    source: None,
246                }
247            })
248    }
249
250    /// Call agent with request body chunk event.
251    ///
252    /// For streaming body inspection, chunks are sent sequentially with
253    /// increasing `chunk_index`. The agent responds after processing each chunk.
254    pub async fn call_request_body_chunk(
255        &self,
256        event: &RequestBodyChunkEvent,
257    ) -> SentinelResult<AgentResponse> {
258        let correlation_id = &event.correlation_id;
259
260        trace!(
261            agent_id = %self.config.id,
262            correlation_id = %correlation_id,
263            chunk_index = event.chunk_index,
264            is_last = event.is_last,
265            "Sending request body chunk to v2 agent"
266        );
267
268        self.pool
269            .send_request_body_chunk(&self.config.id, correlation_id, event)
270            .await
271            .map_err(|e| {
272                error!(
273                    agent_id = %self.config.id,
274                    correlation_id = %correlation_id,
275                    error = %e,
276                    "V2 agent request body chunk call failed"
277                );
278                SentinelError::Agent {
279                    agent: self.config.id.clone(),
280                    message: e.to_string(),
281                    event: "request_body_chunk".to_string(),
282                    source: None,
283                }
284            })
285    }
286
287    /// Call agent with response headers event.
288    ///
289    /// Called when upstream response headers are received, allowing the agent
290    /// to inspect/modify response headers before they're sent to the client.
291    pub async fn call_response_headers(
292        &self,
293        event: &ResponseHeadersEvent,
294    ) -> SentinelResult<AgentResponse> {
295        let correlation_id = &event.correlation_id;
296
297        trace!(
298            agent_id = %self.config.id,
299            correlation_id = %correlation_id,
300            status = event.status,
301            "Sending response headers to v2 agent"
302        );
303
304        self.pool
305            .send_response_headers(&self.config.id, correlation_id, event)
306            .await
307            .map_err(|e| {
308                error!(
309                    agent_id = %self.config.id,
310                    correlation_id = %correlation_id,
311                    error = %e,
312                    "V2 agent response headers call failed"
313                );
314                SentinelError::Agent {
315                    agent: self.config.id.clone(),
316                    message: e.to_string(),
317                    event: "response_headers".to_string(),
318                    source: None,
319                }
320            })
321    }
322
323    /// Call agent with response body chunk event.
324    ///
325    /// For streaming response body inspection, chunks are sent sequentially.
326    /// The agent can inspect and optionally modify response body data.
327    pub async fn call_response_body_chunk(
328        &self,
329        event: &ResponseBodyChunkEvent,
330    ) -> SentinelResult<AgentResponse> {
331        let correlation_id = &event.correlation_id;
332
333        trace!(
334            agent_id = %self.config.id,
335            correlation_id = %correlation_id,
336            chunk_index = event.chunk_index,
337            is_last = event.is_last,
338            "Sending response body chunk to v2 agent"
339        );
340
341        self.pool
342            .send_response_body_chunk(&self.config.id, correlation_id, event)
343            .await
344            .map_err(|e| {
345                error!(
346                    agent_id = %self.config.id,
347                    correlation_id = %correlation_id,
348                    error = %e,
349                    "V2 agent response body chunk call failed"
350                );
351                SentinelError::Agent {
352                    agent: self.config.id.clone(),
353                    message: e.to_string(),
354                    event: "response_body_chunk".to_string(),
355                    source: None,
356                }
357            })
358    }
359
360    /// Cancel an in-flight request.
361    pub async fn cancel_request(
362        &self,
363        correlation_id: &str,
364        reason: CancelReason,
365    ) -> SentinelResult<()> {
366        trace!(
367            agent_id = %self.config.id,
368            correlation_id = %correlation_id,
369            reason = ?reason,
370            "Cancelling request on v2 agent"
371        );
372
373        self.pool
374            .cancel_request(&self.config.id, correlation_id, reason)
375            .await
376            .map_err(|e| {
377                warn!(
378                    agent_id = %self.config.id,
379                    correlation_id = %correlation_id,
380                    error = %e,
381                    "Failed to cancel request on v2 agent"
382                );
383                SentinelError::Agent {
384                    agent: self.config.id.clone(),
385                    message: format!("Cancel failed: {}", e),
386                    event: "cancel".to_string(),
387                    source: None,
388                }
389            })
390    }
391
392    /// Get agent capabilities.
393    pub async fn capabilities(&self) -> Option<AgentCapabilities> {
394        self.pool.agent_capabilities(&self.config.id).await
395    }
396
397    /// Check if agent is healthy.
398    pub async fn is_healthy(&self) -> bool {
399        self.pool.is_agent_healthy(&self.config.id).await
400    }
401
402    /// Record successful call (lock-free).
403    pub fn record_success(&self, duration: Duration) {
404        let success_count = self.metrics.calls_success.fetch_add(1, Ordering::Relaxed) + 1;
405        self.metrics
406            .duration_total_us
407            .fetch_add(duration.as_micros() as u64, Ordering::Relaxed);
408        self.consecutive_failures.store(0, Ordering::Relaxed);
409        self.last_success_ns.store(
410            self.base_instant.elapsed().as_nanos() as u64,
411            Ordering::Relaxed,
412        );
413
414        trace!(
415            agent_id = %self.config.id,
416            duration_ms = duration.as_millis(),
417            total_successes = success_count,
418            "Recorded v2 agent call success"
419        );
420
421        self.circuit_breaker.record_success();
422    }
423
424    /// Get the time since last successful call.
425    #[inline]
426    pub fn time_since_last_success(&self) -> Option<Duration> {
427        let last_ns = self.last_success_ns.load(Ordering::Relaxed);
428        if last_ns == NO_TIMESTAMP {
429            return None;
430        }
431        let current_ns = self.base_instant.elapsed().as_nanos() as u64;
432        Some(Duration::from_nanos(current_ns.saturating_sub(last_ns)))
433    }
434
435    /// Record failed call.
436    pub fn record_failure(&self) {
437        let fail_count = self.metrics.calls_failed.fetch_add(1, Ordering::Relaxed) + 1;
438        let consecutive = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
439
440        debug!(
441            agent_id = %self.config.id,
442            total_failures = fail_count,
443            consecutive_failures = consecutive,
444            "Recorded v2 agent call failure"
445        );
446
447        self.circuit_breaker.record_failure();
448    }
449
450    /// Record timeout.
451    pub fn record_timeout(&self) {
452        let timeout_count = self.metrics.calls_timeout.fetch_add(1, Ordering::Relaxed) + 1;
453        let consecutive = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
454
455        debug!(
456            agent_id = %self.config.id,
457            total_timeouts = timeout_count,
458            consecutive_failures = consecutive,
459            timeout_ms = self.config.timeout_ms,
460            "Recorded v2 agent call timeout"
461        );
462
463        self.circuit_breaker.record_failure();
464    }
465
466    /// Get pool statistics.
467    pub async fn pool_stats(&self) -> Option<AgentPoolStats> {
468        self.pool.agent_stats(&self.config.id).await
469    }
470
471    /// Get the pool's metrics collector.
472    ///
473    /// Returns a reference to the shared metrics collector that aggregates
474    /// metrics reports from all agents in this pool.
475    pub fn pool_metrics_collector(&self) -> &MetricsCollector {
476        self.pool.metrics_collector()
477    }
478
479    /// Get an Arc to the pool's metrics collector.
480    ///
481    /// This is useful for registering the collector with a MetricsManager.
482    pub fn pool_metrics_collector_arc(&self) -> Arc<MetricsCollector> {
483        self.pool.metrics_collector_arc()
484    }
485
486    /// Export agent metrics in Prometheus format.
487    ///
488    /// Returns a string containing all metrics collected from agents
489    /// in Prometheus exposition format.
490    pub fn export_prometheus(&self) -> String {
491        self.pool.export_prometheus()
492    }
493
494    /// Get the pool's config pusher.
495    ///
496    /// Returns a reference to the shared config pusher that distributes
497    /// configuration updates to agents.
498    pub fn config_pusher(&self) -> &ConfigPusher {
499        self.pool.config_pusher()
500    }
501
502    /// Push a configuration update to this agent.
503    ///
504    /// Returns the push ID if the agent supports config push, None otherwise.
505    pub fn push_config(&self, update_type: ConfigUpdateType) -> Option<String> {
506        self.pool.push_config_to_agent(&self.config.id, update_type)
507    }
508
509    /// Send a configuration update to this agent via the control stream.
510    ///
511    /// This is a direct config push using the `ConfigureEvent` message.
512    pub async fn send_configuration(&self, config: serde_json::Value) -> SentinelResult<()> {
513        // Get a connection and send the configure event
514        // For now, we rely on the pool's config push mechanism
515        // which tracks acknowledgments and retries
516        if let Some(push_id) = self.push_config(ConfigUpdateType::RequestReload) {
517            debug!(
518                agent_id = %self.config.id,
519                push_id = %push_id,
520                "Configuration push initiated"
521            );
522            Ok(())
523        } else {
524            warn!(
525                agent_id = %self.config.id,
526                "Agent does not support config push"
527            );
528            Err(SentinelError::Agent {
529                agent: self.config.id.clone(),
530                message: "Agent does not support config push".to_string(),
531                event: "send_configuration".to_string(),
532                source: None,
533            })
534        }
535    }
536
537    /// Shutdown agent.
538    ///
539    /// This removes the agent from the pool and closes all connections.
540    pub async fn shutdown(&self) {
541        debug!(
542            agent_id = %self.config.id,
543            "Shutting down v2 agent"
544        );
545
546        // Remove from pool - this gracefully closes connections
547        if let Err(e) = self.pool.remove_agent(&self.config.id).await {
548            warn!(
549                agent_id = %self.config.id,
550                error = %e,
551                "Error removing agent from pool during shutdown"
552            );
553        }
554
555        let stats = (
556            self.metrics.calls_total.load(Ordering::Relaxed),
557            self.metrics.calls_success.load(Ordering::Relaxed),
558            self.metrics.calls_failed.load(Ordering::Relaxed),
559            self.metrics.calls_timeout.load(Ordering::Relaxed),
560        );
561
562        info!(
563            agent_id = %self.config.id,
564            total_calls = stats.0,
565            successes = stats.1,
566            failures = stats.2,
567            timeouts = stats.3,
568            "V2 agent shutdown complete"
569        );
570    }
571}
572
573/// Convert config load balance strategy to protocol load balance strategy.
574fn convert_lb_strategy(strategy: LoadBalanceStrategy) -> ProtocolLBStrategy {
575    match strategy {
576        LoadBalanceStrategy::RoundRobin => ProtocolLBStrategy::RoundRobin,
577        LoadBalanceStrategy::LeastConnections => ProtocolLBStrategy::LeastConnections,
578        LoadBalanceStrategy::HealthBased => ProtocolLBStrategy::HealthBased,
579        LoadBalanceStrategy::Random => ProtocolLBStrategy::Random,
580    }
581}
582
#[cfg(test)]
mod tests {
    use super::*;

    /// Every config strategy must map to the protocol variant of the same name.
    #[test]
    fn test_convert_lb_strategy() {
        let cases = [
            (LoadBalanceStrategy::RoundRobin, ProtocolLBStrategy::RoundRobin),
            (
                LoadBalanceStrategy::LeastConnections,
                ProtocolLBStrategy::LeastConnections,
            ),
            (LoadBalanceStrategy::HealthBased, ProtocolLBStrategy::HealthBased),
            (LoadBalanceStrategy::Random, ProtocolLBStrategy::Random),
        ];

        for (input, expected) in cases {
            assert_eq!(convert_lb_strategy(input), expected);
        }
    }
}