sentinel_proxy/agents/
agent_v2.rs

1//! Protocol v2 agent implementation.
2//!
3//! This module provides v2 agent support using the bidirectional streaming
4//! protocol with capabilities, health reporting, and metrics export.
5
6use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use sentinel_agent_protocol::v2::{
11    AgentCapabilities, AgentPool, AgentPoolConfig as ProtocolPoolConfig,
12    AgentPoolStats, CancelReason, ConfigPusher, ConfigUpdateType,
13    LoadBalanceStrategy as ProtocolLBStrategy, MetricsCollector,
14};
15use sentinel_agent_protocol::{
16    AgentResponse, EventType, RequestBodyChunkEvent, RequestHeadersEvent,
17    ResponseBodyChunkEvent, ResponseHeadersEvent,
18};
19use sentinel_common::{
20    errors::{SentinelError, SentinelResult},
21    CircuitBreaker,
22};
23use sentinel_config::{AgentConfig, AgentEvent, FailureMode, LoadBalanceStrategy};
24use tracing::{debug, error, info, trace, warn};
25
26use super::metrics::AgentMetrics;
27
/// Sentinel stored in `AgentV2::last_success_ns` meaning "no successful call
/// has been recorded yet".
///
/// `time_since_last_success` returns `None` while the stored value equals this
/// constant.
const NO_TIMESTAMP: u64 = 0;
30
/// Protocol v2 agent with connection pooling and bidirectional streaming.
///
/// Bundles the agent's configuration with its v2 connection pool, a shared
/// circuit breaker, and lock-free call bookkeeping (counters and the time of
/// the last successful call).
pub struct AgentV2 {
    /// Agent configuration (ID, transport, timeout, subscribed events, ...)
    config: AgentConfig,
    /// V2 connection pool through which all events are sent
    pool: Arc<AgentPool>,
    /// Circuit breaker notified on every recorded success, failure and timeout
    circuit_breaker: Arc<CircuitBreaker>,
    /// Agent-specific call counters and cumulative latency
    metrics: Arc<AgentMetrics>,
    /// Base instant for timestamp calculations (captured in `new`)
    base_instant: Instant,
    /// Last successful call (nanoseconds since base_instant, 0 = never)
    last_success_ns: AtomicU64,
    /// Consecutive failures (incremented by failures and timeouts, reset to 0
    /// on success)
    consecutive_failures: AtomicU32,
}
48
49impl AgentV2 {
50    /// Create a new v2 agent.
51    pub fn new(
52        config: AgentConfig,
53        circuit_breaker: Arc<CircuitBreaker>,
54    ) -> Self {
55        trace!(
56            agent_id = %config.id,
57            agent_type = ?config.agent_type,
58            timeout_ms = config.timeout_ms,
59            events = ?config.events,
60            "Creating v2 agent instance"
61        );
62
63        // Convert config pool settings to protocol pool config
64        let pool_config = config.pool.as_ref().map(|p| ProtocolPoolConfig {
65            connections_per_agent: p.connections_per_agent,
66            load_balance_strategy: convert_lb_strategy(p.load_balance_strategy),
67            connect_timeout: Duration::from_millis(p.connect_timeout_ms),
68            request_timeout: Duration::from_millis(config.timeout_ms),
69            reconnect_interval: Duration::from_millis(p.reconnect_interval_ms),
70            max_reconnect_attempts: p.max_reconnect_attempts,
71            drain_timeout: Duration::from_millis(p.drain_timeout_ms),
72            max_concurrent_per_connection: p.max_concurrent_per_connection,
73            health_check_interval: Duration::from_millis(p.health_check_interval_ms),
74            ..Default::default()
75        }).unwrap_or_default();
76
77        let pool = Arc::new(AgentPool::with_config(pool_config));
78
79        Self {
80            config,
81            pool,
82            circuit_breaker,
83            metrics: Arc::new(AgentMetrics::default()),
84            base_instant: Instant::now(),
85            last_success_ns: AtomicU64::new(NO_TIMESTAMP),
86            consecutive_failures: AtomicU32::new(0),
87        }
88    }
89
90    /// Get the agent ID.
91    pub fn id(&self) -> &str {
92        &self.config.id
93    }
94
95    /// Get the agent's circuit breaker.
96    pub fn circuit_breaker(&self) -> &CircuitBreaker {
97        &self.circuit_breaker
98    }
99
100    /// Get the agent's failure mode.
101    pub fn failure_mode(&self) -> FailureMode {
102        self.config.failure_mode
103    }
104
105    /// Get the agent's timeout in milliseconds.
106    pub fn timeout_ms(&self) -> u64 {
107        self.config.timeout_ms
108    }
109
110    /// Get the agent's metrics.
111    pub fn metrics(&self) -> &AgentMetrics {
112        &self.metrics
113    }
114
115    /// Check if agent handles a specific event type.
116    pub fn handles_event(&self, event_type: EventType) -> bool {
117        self.config.events.iter().any(|e| match (e, event_type) {
118            (AgentEvent::RequestHeaders, EventType::RequestHeaders) => true,
119            (AgentEvent::RequestBody, EventType::RequestBodyChunk) => true,
120            (AgentEvent::ResponseHeaders, EventType::ResponseHeaders) => true,
121            (AgentEvent::ResponseBody, EventType::ResponseBodyChunk) => true,
122            (AgentEvent::Log, EventType::RequestComplete) => true,
123            (AgentEvent::WebSocketFrame, EventType::WebSocketFrame) => true,
124            _ => false,
125        })
126    }
127
128    /// Initialize agent connection(s).
129    pub async fn initialize(&self) -> SentinelResult<()> {
130        let endpoint = self.get_endpoint()?;
131
132        debug!(
133            agent_id = %self.config.id,
134            endpoint = %endpoint,
135            "Initializing v2 agent pool"
136        );
137
138        let start = Instant::now();
139
140        // Add agent to pool - pool will establish connections
141        self.pool.add_agent(&self.config.id, &endpoint).await
142            .map_err(|e| {
143                error!(
144                    agent_id = %self.config.id,
145                    endpoint = %endpoint,
146                    error = %e,
147                    "Failed to add agent to v2 pool"
148                );
149                SentinelError::Agent {
150                    agent: self.config.id.clone(),
151                    message: format!("Failed to initialize v2 agent: {}", e),
152                    event: "initialize".to_string(),
153                    source: None,
154                }
155            })?;
156
157        info!(
158            agent_id = %self.config.id,
159            endpoint = %endpoint,
160            connect_time_ms = start.elapsed().as_millis(),
161            "V2 agent pool initialized"
162        );
163
164        // Send configuration if present
165        if let Some(config_value) = &self.config.config {
166            self.send_configure(config_value.clone()).await?;
167        }
168
169        Ok(())
170    }
171
172    /// Get endpoint from transport config.
173    fn get_endpoint(&self) -> SentinelResult<String> {
174        use sentinel_config::AgentTransport;
175        match &self.config.transport {
176            AgentTransport::Grpc { address, .. } => Ok(address.clone()),
177            AgentTransport::UnixSocket { path } => {
178                // For UDS, format as unix:path
179                Ok(format!("unix:{}", path.display()))
180            }
181            AgentTransport::Http { url, .. } => {
182                // V2 doesn't support HTTP transport
183                Err(SentinelError::Agent {
184                    agent: self.config.id.clone(),
185                    message: "HTTP transport not supported for v2 protocol".to_string(),
186                    event: "initialize".to_string(),
187                    source: None,
188                })
189            }
190        }
191    }
192
193    /// Send configuration to the agent.
194    ///
195    /// Note: Configuration is sent through the control stream when connections
196    /// are established. This is a placeholder for explicit config updates.
197    async fn send_configure(&self, _config: serde_json::Value) -> SentinelResult<()> {
198        debug!(
199            agent_id = %self.config.id,
200            "Configuration will be sent through control stream on connection"
201        );
202
203        // Configuration is handled by the pool's connections during initialization
204        // For explicit config updates, we'd need to iterate through connections
205        // and send configure through their control streams
206
207        info!(
208            agent_id = %self.config.id,
209            "V2 agent configuration noted"
210        );
211
212        Ok(())
213    }
214
215    /// Call agent with request headers event.
216    pub async fn call_request_headers(
217        &self,
218        event: &RequestHeadersEvent,
219    ) -> SentinelResult<AgentResponse> {
220        let call_num = self.metrics.calls_total.fetch_add(1, Ordering::Relaxed) + 1;
221
222        // Get correlation_id from event metadata
223        let correlation_id = &event.metadata.correlation_id;
224
225        trace!(
226            agent_id = %self.config.id,
227            call_num = call_num,
228            correlation_id = %correlation_id,
229            "Sending request headers to v2 agent"
230        );
231
232        self.pool
233            .send_request_headers(&self.config.id, correlation_id, event)
234            .await
235            .map_err(|e| {
236                error!(
237                    agent_id = %self.config.id,
238                    correlation_id = %correlation_id,
239                    error = %e,
240                    "V2 agent request headers call failed"
241                );
242                SentinelError::Agent {
243                    agent: self.config.id.clone(),
244                    message: e.to_string(),
245                    event: "request_headers".to_string(),
246                    source: None,
247                }
248            })
249    }
250
251    /// Call agent with request body chunk event.
252    ///
253    /// For streaming body inspection, chunks are sent sequentially with
254    /// increasing `chunk_index`. The agent responds after processing each chunk.
255    pub async fn call_request_body_chunk(
256        &self,
257        event: &RequestBodyChunkEvent,
258    ) -> SentinelResult<AgentResponse> {
259        let correlation_id = &event.correlation_id;
260
261        trace!(
262            agent_id = %self.config.id,
263            correlation_id = %correlation_id,
264            chunk_index = event.chunk_index,
265            is_last = event.is_last,
266            "Sending request body chunk to v2 agent"
267        );
268
269        self.pool
270            .send_request_body_chunk(&self.config.id, correlation_id, event)
271            .await
272            .map_err(|e| {
273                error!(
274                    agent_id = %self.config.id,
275                    correlation_id = %correlation_id,
276                    error = %e,
277                    "V2 agent request body chunk call failed"
278                );
279                SentinelError::Agent {
280                    agent: self.config.id.clone(),
281                    message: e.to_string(),
282                    event: "request_body_chunk".to_string(),
283                    source: None,
284                }
285            })
286    }
287
288    /// Call agent with response headers event.
289    ///
290    /// Called when upstream response headers are received, allowing the agent
291    /// to inspect/modify response headers before they're sent to the client.
292    pub async fn call_response_headers(
293        &self,
294        event: &ResponseHeadersEvent,
295    ) -> SentinelResult<AgentResponse> {
296        let correlation_id = &event.correlation_id;
297
298        trace!(
299            agent_id = %self.config.id,
300            correlation_id = %correlation_id,
301            status = event.status,
302            "Sending response headers to v2 agent"
303        );
304
305        self.pool
306            .send_response_headers(&self.config.id, correlation_id, event)
307            .await
308            .map_err(|e| {
309                error!(
310                    agent_id = %self.config.id,
311                    correlation_id = %correlation_id,
312                    error = %e,
313                    "V2 agent response headers call failed"
314                );
315                SentinelError::Agent {
316                    agent: self.config.id.clone(),
317                    message: e.to_string(),
318                    event: "response_headers".to_string(),
319                    source: None,
320                }
321            })
322    }
323
324    /// Call agent with response body chunk event.
325    ///
326    /// For streaming response body inspection, chunks are sent sequentially.
327    /// The agent can inspect and optionally modify response body data.
328    pub async fn call_response_body_chunk(
329        &self,
330        event: &ResponseBodyChunkEvent,
331    ) -> SentinelResult<AgentResponse> {
332        let correlation_id = &event.correlation_id;
333
334        trace!(
335            agent_id = %self.config.id,
336            correlation_id = %correlation_id,
337            chunk_index = event.chunk_index,
338            is_last = event.is_last,
339            "Sending response body chunk to v2 agent"
340        );
341
342        self.pool
343            .send_response_body_chunk(&self.config.id, correlation_id, event)
344            .await
345            .map_err(|e| {
346                error!(
347                    agent_id = %self.config.id,
348                    correlation_id = %correlation_id,
349                    error = %e,
350                    "V2 agent response body chunk call failed"
351                );
352                SentinelError::Agent {
353                    agent: self.config.id.clone(),
354                    message: e.to_string(),
355                    event: "response_body_chunk".to_string(),
356                    source: None,
357                }
358            })
359    }
360
361    /// Cancel an in-flight request.
362    pub async fn cancel_request(
363        &self,
364        correlation_id: &str,
365        reason: CancelReason,
366    ) -> SentinelResult<()> {
367        trace!(
368            agent_id = %self.config.id,
369            correlation_id = %correlation_id,
370            reason = ?reason,
371            "Cancelling request on v2 agent"
372        );
373
374        self.pool
375            .cancel_request(&self.config.id, correlation_id, reason)
376            .await
377            .map_err(|e| {
378                warn!(
379                    agent_id = %self.config.id,
380                    correlation_id = %correlation_id,
381                    error = %e,
382                    "Failed to cancel request on v2 agent"
383                );
384                SentinelError::Agent {
385                    agent: self.config.id.clone(),
386                    message: format!("Cancel failed: {}", e),
387                    event: "cancel".to_string(),
388                    source: None,
389                }
390            })
391    }
392
393    /// Get agent capabilities.
394    pub async fn capabilities(&self) -> Option<AgentCapabilities> {
395        self.pool.agent_capabilities(&self.config.id).await
396    }
397
398    /// Check if agent is healthy.
399    pub async fn is_healthy(&self) -> bool {
400        self.pool.is_agent_healthy(&self.config.id)
401    }
402
403    /// Record successful call (lock-free).
404    pub fn record_success(&self, duration: Duration) {
405        let success_count = self.metrics.calls_success.fetch_add(1, Ordering::Relaxed) + 1;
406        self.metrics
407            .duration_total_us
408            .fetch_add(duration.as_micros() as u64, Ordering::Relaxed);
409        self.consecutive_failures.store(0, Ordering::Relaxed);
410        self.last_success_ns.store(
411            self.base_instant.elapsed().as_nanos() as u64,
412            Ordering::Relaxed,
413        );
414
415        trace!(
416            agent_id = %self.config.id,
417            duration_ms = duration.as_millis(),
418            total_successes = success_count,
419            "Recorded v2 agent call success"
420        );
421
422        self.circuit_breaker.record_success();
423    }
424
425    /// Get the time since last successful call.
426    #[inline]
427    pub fn time_since_last_success(&self) -> Option<Duration> {
428        let last_ns = self.last_success_ns.load(Ordering::Relaxed);
429        if last_ns == NO_TIMESTAMP {
430            return None;
431        }
432        let current_ns = self.base_instant.elapsed().as_nanos() as u64;
433        Some(Duration::from_nanos(current_ns.saturating_sub(last_ns)))
434    }
435
436    /// Record failed call.
437    pub fn record_failure(&self) {
438        let fail_count = self.metrics.calls_failed.fetch_add(1, Ordering::Relaxed) + 1;
439        let consecutive = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
440
441        debug!(
442            agent_id = %self.config.id,
443            total_failures = fail_count,
444            consecutive_failures = consecutive,
445            "Recorded v2 agent call failure"
446        );
447
448        self.circuit_breaker.record_failure();
449    }
450
451    /// Record timeout.
452    pub fn record_timeout(&self) {
453        let timeout_count = self.metrics.calls_timeout.fetch_add(1, Ordering::Relaxed) + 1;
454        let consecutive = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
455
456        debug!(
457            agent_id = %self.config.id,
458            total_timeouts = timeout_count,
459            consecutive_failures = consecutive,
460            timeout_ms = self.config.timeout_ms,
461            "Recorded v2 agent call timeout"
462        );
463
464        self.circuit_breaker.record_failure();
465    }
466
467    /// Get pool statistics.
468    pub async fn pool_stats(&self) -> Option<AgentPoolStats> {
469        self.pool.agent_stats(&self.config.id).await
470    }
471
472    /// Get the pool's metrics collector.
473    ///
474    /// Returns a reference to the shared metrics collector that aggregates
475    /// metrics reports from all agents in this pool.
476    pub fn pool_metrics_collector(&self) -> &MetricsCollector {
477        self.pool.metrics_collector()
478    }
479
480    /// Get an Arc to the pool's metrics collector.
481    ///
482    /// This is useful for registering the collector with a MetricsManager.
483    pub fn pool_metrics_collector_arc(&self) -> Arc<MetricsCollector> {
484        self.pool.metrics_collector_arc()
485    }
486
487    /// Export agent metrics in Prometheus format.
488    ///
489    /// Returns a string containing all metrics collected from agents
490    /// in Prometheus exposition format.
491    pub fn export_prometheus(&self) -> String {
492        self.pool.export_prometheus()
493    }
494
495    /// Get the pool's config pusher.
496    ///
497    /// Returns a reference to the shared config pusher that distributes
498    /// configuration updates to agents.
499    pub fn config_pusher(&self) -> &ConfigPusher {
500        self.pool.config_pusher()
501    }
502
503    /// Push a configuration update to this agent.
504    ///
505    /// Returns the push ID if the agent supports config push, None otherwise.
506    pub fn push_config(&self, update_type: ConfigUpdateType) -> Option<String> {
507        self.pool.push_config_to_agent(&self.config.id, update_type)
508    }
509
510    /// Send a configuration update to this agent via the control stream.
511    ///
512    /// This is a direct config push using the `ConfigureEvent` message.
513    pub async fn send_configuration(&self, config: serde_json::Value) -> SentinelResult<()> {
514        // Get a connection and send the configure event
515        // For now, we rely on the pool's config push mechanism
516        // which tracks acknowledgments and retries
517        if let Some(push_id) = self.push_config(ConfigUpdateType::RequestReload) {
518            debug!(
519                agent_id = %self.config.id,
520                push_id = %push_id,
521                "Configuration push initiated"
522            );
523            Ok(())
524        } else {
525            warn!(
526                agent_id = %self.config.id,
527                "Agent does not support config push"
528            );
529            Err(SentinelError::Agent {
530                agent: self.config.id.clone(),
531                message: "Agent does not support config push".to_string(),
532                event: "send_configuration".to_string(),
533                source: None,
534            })
535        }
536    }
537
538    /// Shutdown agent.
539    ///
540    /// This removes the agent from the pool and closes all connections.
541    pub async fn shutdown(&self) {
542        debug!(
543            agent_id = %self.config.id,
544            "Shutting down v2 agent"
545        );
546
547        // Remove from pool - this gracefully closes connections
548        if let Err(e) = self.pool.remove_agent(&self.config.id).await {
549            warn!(
550                agent_id = %self.config.id,
551                error = %e,
552                "Error removing agent from pool during shutdown"
553            );
554        }
555
556        let stats = (
557            self.metrics.calls_total.load(Ordering::Relaxed),
558            self.metrics.calls_success.load(Ordering::Relaxed),
559            self.metrics.calls_failed.load(Ordering::Relaxed),
560            self.metrics.calls_timeout.load(Ordering::Relaxed),
561        );
562
563        info!(
564            agent_id = %self.config.id,
565            total_calls = stats.0,
566            successes = stats.1,
567            failures = stats.2,
568            timeouts = stats.3,
569            "V2 agent shutdown complete"
570        );
571    }
572}
573
574/// Convert config load balance strategy to protocol load balance strategy.
575fn convert_lb_strategy(strategy: LoadBalanceStrategy) -> ProtocolLBStrategy {
576    match strategy {
577        LoadBalanceStrategy::RoundRobin => ProtocolLBStrategy::RoundRobin,
578        LoadBalanceStrategy::LeastConnections => ProtocolLBStrategy::LeastConnections,
579        LoadBalanceStrategy::HealthBased => ProtocolLBStrategy::HealthBased,
580        LoadBalanceStrategy::Random => ProtocolLBStrategy::Random,
581    }
582}
583
#[cfg(test)]
mod tests {
    use super::*;

    /// Every config-level strategy must convert to the protocol variant of
    /// the same name.
    #[test]
    fn test_convert_lb_strategy() {
        let cases = vec![
            (LoadBalanceStrategy::RoundRobin, ProtocolLBStrategy::RoundRobin),
            (
                LoadBalanceStrategy::LeastConnections,
                ProtocolLBStrategy::LeastConnections,
            ),
            (LoadBalanceStrategy::HealthBased, ProtocolLBStrategy::HealthBased),
            (LoadBalanceStrategy::Random, ProtocolLBStrategy::Random),
        ];

        for (configured, expected) in cases {
            assert_eq!(convert_lb_strategy(configured), expected);
        }
    }
}