pub struct AgentPool { /* private fields */ }
Agent connection pool.
Manages multiple connections to multiple agents with load balancing, health tracking, automatic reconnection, and metrics collection.
§Performance
Uses DashMap for lock-free reads in the hot path. Agent lookup is O(1)
without contention. Connection selection uses cached health state to avoid
async I/O per request.
Implementations§
impl AgentPool
pub fn with_config(config: AgentPoolConfig) -> Self
Create a new agent pool with custom configuration.
pub fn protocol_metrics(&self) -> &ProtocolMetrics
Get the protocol metrics for accessing proxy-side instrumentation.
pub fn protocol_metrics_arc(&self) -> Arc<ProtocolMetrics>
Get an Arc to the protocol metrics.
pub fn metrics_collector(&self) -> &MetricsCollector
Get the metrics collector for accessing aggregated agent metrics.
pub fn metrics_collector_arc(&self) -> Arc<MetricsCollector>
Get an Arc to the metrics collector.
This is useful for registering the collector with a MetricsManager.
pub fn export_prometheus(&self) -> String
Export all agent metrics in Prometheus format.
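As a rough illustration of what a Prometheus text export looks like, the sketch below renders one counter family in the Prometheus text exposition format. The function `render_counter`, the metric name, and the `agent` label are hypothetical and are not taken from this crate; label values are assumed to need no escaping.

```rust
use std::fmt::Write;

/// Render one counter family in the Prometheus text exposition format.
/// Hypothetical helper: the real exporter's metric names are not shown here.
fn render_counter(name: &str, help: &str, samples: &[(&str, u64)]) -> String {
    let mut out = String::new();
    // Metadata lines come first, then one sample line per label set.
    writeln!(out, "# HELP {} {}", name, help).unwrap();
    writeln!(out, "# TYPE {} counter", name).unwrap();
    for (agent, value) in samples {
        // Assumes the label value contains no quotes, backslashes, or newlines.
        writeln!(out, "{}{{agent=\"{}\"}} {}", name, agent, value).unwrap();
    }
    out
}

fn main() {
    let text = render_counter(
        "agent_requests_total",
        "Requests sent to the agent.",
        &[("waf-agent", 42), ("auth-agent", 7)],
    );
    print!("{}", text);
    assert!(text.contains("agent_requests_total{agent=\"waf-agent\"} 42\n"));
}
```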
pub fn clear_correlation_affinity(&self, correlation_id: &str)
Clear connection affinity for a correlation ID.
Call this when a request is complete (after receiving the final response) to free the affinity mapping. This is not strictly necessary because affinities are cheap, but it is good practice for long-running proxies.
pub fn correlation_affinity_count(&self) -> usize
Get the number of active correlation affinities.
This is useful for monitoring and debugging.
pub fn create_sticky_session(
    &self,
    session_id: impl Into<String>,
    agent_id: &str,
) -> Result<(), AgentProtocolError>
Create a sticky session for a long-lived streaming connection.
Sticky sessions ensure that all requests for a given session use the same agent connection. This is essential for:
- WebSocket connections
- Server-Sent Events (SSE)
- Long-polling
- Any streaming scenario requiring connection affinity
§Arguments
- session_id: A unique identifier for this session (e.g., WebSocket connection ID)
- agent_id: The agent to bind this session to
§Returns
Returns Ok(()) if the session was created, or an error if the agent
is not found or has no healthy connections.
§Example
// When a WebSocket is established (create_sticky_session is not async)
pool.create_sticky_session("ws-12345", "waf-agent")?;
// All subsequent messages use the same connection
let (response, used_sticky) = pool
    .send_request_headers_with_sticky_session("ws-12345", "waf-agent", correlation_id, &event)
    .await?;
// When the WebSocket closes
pool.clear_sticky_session("ws-12345");
pub fn refresh_sticky_session(&self, session_id: &str) -> bool
Refresh a sticky session, updating its last-accessed time.
Returns true if the session exists and was refreshed, false otherwise.
pub fn has_sticky_session(&self, session_id: &str) -> bool
Check if a sticky session exists and is valid.
pub fn clear_sticky_session(&self, session_id: &str)
Clear a sticky session.
Call this when a long-lived stream ends (WebSocket closed, SSE ended, etc.)
pub fn sticky_session_count(&self) -> usize
Get the number of active sticky sessions.
Useful for monitoring and debugging.
pub fn sticky_session_agent(&self, session_id: &str) -> Option<String>
Get the agent ID bound to a sticky session.
pub async fn send_request_headers_with_sticky_session(
    &self,
    session_id: &str,
    agent_id: &str,
    correlation_id: &str,
    event: &RequestHeadersEvent,
) -> Result<(AgentResponse, bool), AgentProtocolError>
Send a request using a sticky session.
If the session exists and is valid, uses the bound connection. If the session doesn’t exist or has expired, falls back to normal connection selection.
§Returns
A tuple of (response, used_sticky_session).
pub fn cleanup_expired_sessions(&self) -> usize
Clean up expired sticky sessions.
Called automatically by the maintenance task, but can also be called manually to immediately reclaim resources.
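The sketch below illustrates one way idle-timeout cleanup of sticky sessions could work. It is not the pool's implementation: `StickySessions`, its fields, and the 60-second timeout are assumptions made for illustration, and time is passed in explicitly so the behaviour is deterministic.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical session record; the real pool's fields are private.
struct StickySession {
    agent_id: String,
    last_accessed: Instant,
}

/// Hypothetical session table with an idle timeout.
struct StickySessions {
    ttl: Duration,
    sessions: HashMap<String, StickySession>,
}

impl StickySessions {
    fn new(ttl: Duration) -> Self {
        Self { ttl, sessions: HashMap::new() }
    }

    /// Bind a session to an agent, recording the creation time.
    fn create(&mut self, session_id: &str, agent_id: &str, now: Instant) {
        let session = StickySession { agent_id: agent_id.to_string(), last_accessed: now };
        self.sessions.insert(session_id.to_string(), session);
    }

    /// Update the last-accessed time; returns false if the session is unknown.
    fn refresh(&mut self, session_id: &str, now: Instant) -> bool {
        match self.sessions.get_mut(session_id) {
            Some(session) => {
                session.last_accessed = now;
                true
            }
            None => false,
        }
    }

    /// Agent bound to a session, if the session exists.
    fn agent(&self, session_id: &str) -> Option<&str> {
        self.sessions.get(session_id).map(|s| s.agent_id.as_str())
    }

    /// Remove sessions idle for longer than `ttl`; returns how many were removed.
    fn cleanup_expired(&mut self, now: Instant) -> usize {
        let before = self.sessions.len();
        let ttl = self.ttl;
        self.sessions.retain(|_, s| now.duration_since(s.last_accessed) <= ttl);
        before - self.sessions.len()
    }
}

fn main() {
    let start = Instant::now();
    let mut table = StickySessions::new(Duration::from_secs(60));
    table.create("ws-1", "waf-agent", start);
    table.create("ws-2", "waf-agent", start);

    // ws-1 stays active; ws-2 goes idle.
    assert!(table.refresh("ws-1", start + Duration::from_secs(45)));

    // At t = 90 s only ws-2 has been idle for more than 60 s.
    assert_eq!(table.cleanup_expired(start + Duration::from_secs(90)), 1);
    assert_eq!(table.agent("ws-1"), Some("waf-agent"));
    assert_eq!(table.agent("ws-2"), None);
}
```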
pub fn config_pusher(&self) -> &ConfigPusher
Get the config pusher for pushing configuration updates to agents.
pub fn config_update_handler(&self) -> &ConfigUpdateHandler
Get the config update handler for processing agent config requests.
pub fn push_config_to_agent(
    &self,
    agent_id: &str,
    update_type: ConfigUpdateType,
) -> Option<String>
Push a configuration update to a specific agent.
Returns the push ID if the agent supports config push, None otherwise.
pub fn push_config_to_all(&self, update_type: ConfigUpdateType) -> Vec<String>
Push a configuration update to all agents that support config push.
Returns the push IDs for each agent that received the update.
pub fn acknowledge_config_push(
    &self,
    push_id: &str,
    accepted: bool,
    error: Option<String>,
)
Acknowledge a config push by its push ID.
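The push and acknowledge methods imply a small lifecycle: a push yields an ID only for agents that support config push, and a later acknowledgement resolves that ID. The sketch below models that lifecycle under stated assumptions. `PushTracker`, `PushState`, and the push-ID format are hypothetical; the actual ConfigPusher internals are not documented here.

```rust
use std::collections::HashMap;

/// Hypothetical outcome of a push, recorded on acknowledgement.
#[derive(Debug, PartialEq)]
enum PushState {
    Pending,
    Accepted,
    Rejected(String),
}

/// Hypothetical tracker for in-flight config pushes.
struct PushTracker {
    next_id: u64,
    pushes: HashMap<String, PushState>,
}

impl PushTracker {
    fn new() -> Self {
        Self { next_id: 0, pushes: HashMap::new() }
    }

    /// Push to one agent. Returns a push ID only if the agent supports config push.
    fn push(&mut self, agent_id: &str, supports_push: bool) -> Option<String> {
        if !supports_push {
            return None;
        }
        self.next_id += 1;
        // The ID format is an assumption made for this sketch.
        let push_id = format!("{}-push-{}", agent_id, self.next_id);
        self.pushes.insert(push_id.clone(), PushState::Pending);
        Some(push_id)
    }

    /// Push to every agent, collecting IDs for those that support it.
    fn push_all(&mut self, agents: &[(&str, bool)]) -> Vec<String> {
        agents.iter().filter_map(|(id, ok)| self.push(id, *ok)).collect()
    }

    /// Resolve a pending push; unknown IDs are ignored.
    fn acknowledge(&mut self, push_id: &str, accepted: bool, error: Option<String>) {
        if let Some(state) = self.pushes.get_mut(push_id) {
            *state = if accepted {
                PushState::Accepted
            } else {
                PushState::Rejected(error.unwrap_or_default())
            };
        }
    }
}

fn main() {
    let mut tracker = PushTracker::new();
    let ids = tracker.push_all(&[("waf-agent", true), ("legacy-agent", false)]);
    assert_eq!(ids, vec!["waf-agent-push-1".to_string()]);
    assert_eq!(tracker.pushes[&ids[0]], PushState::Pending);

    tracker.acknowledge(&ids[0], false, Some("invalid rule".into()));
    assert_eq!(tracker.pushes[&ids[0]], PushState::Rejected("invalid rule".into()));
}
```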
pub async fn add_agent(
    &self,
    agent_id: impl Into<String>,
    endpoint: impl Into<String>,
) -> Result<(), AgentProtocolError>
Add an agent to the pool.
This creates the configured number of connections to the agent.
pub async fn remove_agent(
    &self,
    agent_id: &str,
) -> Result<(), AgentProtocolError>
Remove an agent from the pool.
This gracefully closes all connections to the agent.
pub async fn add_reverse_connection(
    &self,
    agent_id: &str,
    client: ReverseConnectionClient,
    capabilities: AgentCapabilities,
) -> Result<(), AgentProtocolError>
Add a reverse connection to the pool.
This is called by the ReverseConnectionListener when an agent connects. The connection is wrapped in a V2Transport and added to the agent’s connection pool.
pub async fn send_request_headers(
    &self,
    agent_id: &str,
    correlation_id: &str,
    event: &RequestHeadersEvent,
) -> Result<AgentResponse, AgentProtocolError>
Send a request headers event to an agent.
The pool selects the best connection based on the load balancing strategy.
§Performance
This is the hot path. Uses:
- Lock-free agent lookup via DashMap
- Cached health state (no async I/O for health check)
- Atomic last_used tracking (no RwLock)
pub async fn send_request_body_chunk(
    &self,
    agent_id: &str,
    correlation_id: &str,
    event: &RequestBodyChunkEvent,
) -> Result<AgentResponse, AgentProtocolError>
Send a request body chunk to an agent.
The pool uses correlation_id to route the chunk to the same connection that received the request headers (connection affinity).
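Connection affinity of this kind can be pictured as a map from correlation ID to connection. The sketch below is an assumption-laden model, not the pool's code: `AffinityTable` is hypothetical, and it uses a `Mutex<HashMap>` from the standard library in place of the DashMap the pool is described as using.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical affinity table: correlation ID -> connection index.
struct AffinityTable {
    by_correlation: Mutex<HashMap<String, usize>>,
}

impl AffinityTable {
    fn new() -> Self {
        Self { by_correlation: Mutex::new(HashMap::new()) }
    }

    /// Return the connection already bound to `correlation_id`, or bind the
    /// one chosen by `pick` (standing in for the load balancer) and remember it.
    fn connection_for(&self, correlation_id: &str, pick: impl FnOnce() -> usize) -> usize {
        let mut map = self.by_correlation.lock().unwrap();
        *map.entry(correlation_id.to_string()).or_insert_with(pick)
    }

    /// Drop the binding once the request is complete.
    fn clear(&self, correlation_id: &str) {
        self.by_correlation.lock().unwrap().remove(correlation_id);
    }

    /// Number of active bindings.
    fn len(&self) -> usize {
        self.by_correlation.lock().unwrap().len()
    }
}

fn main() {
    let table = AffinityTable::new();
    // Headers event: the balancer picks connection 2.
    let headers_conn = table.connection_for("req-1", || 2);
    // Body chunk: the balancer would now pick 0, but the existing binding wins.
    let chunk_conn = table.connection_for("req-1", || 0);
    assert_eq!(headers_conn, chunk_conn);

    table.clear("req-1");
    assert_eq!(table.len(), 0);
}
```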
pub async fn send_response_headers(
    &self,
    agent_id: &str,
    correlation_id: &str,
    event: &ResponseHeadersEvent,
) -> Result<AgentResponse, AgentProtocolError>
Send response headers to an agent.
Called when upstream response headers are received, allowing the agent to inspect/modify response headers before they’re sent to the client.
pub async fn send_response_body_chunk(
    &self,
    agent_id: &str,
    correlation_id: &str,
    event: &ResponseBodyChunkEvent,
) -> Result<AgentResponse, AgentProtocolError>
Send a response body chunk to an agent.
For streaming response body inspection, chunks are sent sequentially. The agent can inspect and optionally modify response body data.
pub async fn cancel_request(
    &self,
    agent_id: &str,
    correlation_id: &str,
    reason: CancelReason,
) -> Result<(), AgentProtocolError>
Cancel a request on all connections for an agent.
pub async fn stats(&self) -> Vec<AgentPoolStats>
Get statistics for all agents in the pool.
pub async fn agent_stats(&self, agent_id: &str) -> Option<AgentPoolStats>
Get statistics for a specific agent.
pub async fn agent_capabilities(
    &self,
    agent_id: &str,
) -> Option<AgentCapabilities>
Get the capabilities of an agent.
pub fn is_agent_healthy(&self, agent_id: &str) -> bool
Check if an agent is healthy.
Uses cached health state for fast, lock-free access.
pub async fn shutdown(&self) -> Result<(), AgentProtocolError>
Gracefully shut down the pool.
This drains all connections and waits for in-flight requests to complete.
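Draining generally means two things: refusing new work and waiting for outstanding work to finish. The sketch below shows that pattern with a flag and a counter. It is a simplification under stated assumptions: `Drain` is hypothetical, it uses OS threads and polling where the async pool would presumably await a notification, and it does not model closing connections.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical drain state shared by request paths and shutdown.
struct Drain {
    draining: AtomicBool,
    in_flight: AtomicUsize,
}

impl Drain {
    fn new() -> Self {
        Self { draining: AtomicBool::new(false), in_flight: AtomicUsize::new(0) }
    }

    /// Try to start a request; refused once draining has begun.
    /// The counter is incremented before the flag is checked so that a
    /// concurrent shutdown either sees this request or this request sees the flag.
    fn try_begin(&self) -> bool {
        self.in_flight.fetch_add(1, Ordering::SeqCst);
        if self.draining.load(Ordering::SeqCst) {
            self.in_flight.fetch_sub(1, Ordering::SeqCst);
            return false;
        }
        true
    }

    /// Mark a request as finished.
    fn end(&self) {
        self.in_flight.fetch_sub(1, Ordering::SeqCst);
    }

    /// Stop accepting work, then wait until nothing is in flight.
    fn shutdown(&self) {
        self.draining.store(true, Ordering::SeqCst);
        while self.in_flight.load(Ordering::SeqCst) > 0 {
            thread::sleep(Duration::from_millis(1));
        }
    }
}

fn main() {
    let drain = Arc::new(Drain::new());
    assert!(drain.try_begin()); // one request in flight

    let worker = {
        let drain = Arc::clone(&drain);
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(20)); // simulated agent round trip
            drain.end();
        })
    };

    drain.shutdown(); // blocks until the worker calls end()
    assert_eq!(drain.in_flight.load(Ordering::SeqCst), 0);
    assert!(!drain.try_begin()); // new work is refused after shutdown
    worker.join().unwrap();
}
```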
pub async fn run_maintenance(&self)
Run background maintenance tasks.
This should be spawned as a background task. It handles:
- Health checking (updates cached health state)
- Reconnection of failed connections
- Cleanup of idle connections
§Health Check Strategy
Health is checked here (with I/O) and cached in PooledConnection::healthy_cached.
The hot path (select_connection) reads the cached value without I/O.
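The split between a slow writer and a fast reader can be sketched as follows. This is a model of the strategy described above, not the crate's code: `PooledConn` here is a stand-in with an assumed `name` field, and the `probe` closure takes the place of the real health-check I/O.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Stand-in for a pooled connection; only the cached flag matters here.
struct PooledConn {
    name: &'static str,
    healthy_cached: AtomicBool,
}

impl PooledConn {
    fn new(name: &'static str) -> Self {
        Self { name, healthy_cached: AtomicBool::new(true) }
    }
}

/// One maintenance pass: probe every connection (this is where I/O would
/// happen) and publish the result to the cached flag.
/// Returns the names of connections that need reconnecting.
fn maintenance_pass(conns: &[PooledConn], probe: impl Fn(&str) -> bool) -> Vec<&'static str> {
    let mut to_reconnect = Vec::new();
    for conn in conns {
        let ok = probe(conn.name);
        conn.healthy_cached.store(ok, Ordering::Release);
        if !ok {
            to_reconnect.push(conn.name);
        }
    }
    to_reconnect
}

/// Hot-path check: a single atomic load, no I/O.
fn is_healthy(conn: &PooledConn) -> bool {
    conn.healthy_cached.load(Ordering::Acquire)
}

fn main() {
    let conns = [PooledConn::new("conn-a"), PooledConn::new("conn-b")];

    // Simulated probe: conn-b fails its health check.
    let failed = maintenance_pass(&conns, |name| name != "conn-b");
    assert_eq!(failed, vec!["conn-b"]);

    // Request paths now see the cached result without probing again.
    assert!(is_healthy(&conns[0]));
    assert!(!is_healthy(&conns[1]));
}
```

The cost of this design is staleness: a connection that fails between passes is still reported healthy until the next pass updates the flag.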
Auto Trait Implementations§
impl !Freeze for AgentPool
impl !RefUnwindSafe for AgentPool
impl Send for AgentPool
impl Sync for AgentPool
impl Unpin for AgentPool
impl !UnwindSafe for AgentPool
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request