AgentPool

Struct AgentPool 

pub struct AgentPool { /* private fields */ }

Agent connection pool.

Manages multiple connections to multiple agents with load balancing, health tracking, automatic reconnection, and metrics collection.

Performance

Uses DashMap for lock-free reads in the hot path. Agent lookup is O(1) without contention. Connection selection uses cached health state to avoid async I/O per request.

Implementations

impl AgentPool

pub fn new() -> Self

Create a new agent pool with default configuration.

pub fn with_config(config: AgentPoolConfig) -> Self

Create a new agent pool with custom configuration.

pub fn protocol_metrics(&self) -> &ProtocolMetrics

Get the protocol metrics for accessing proxy-side instrumentation.

pub fn protocol_metrics_arc(&self) -> Arc<ProtocolMetrics>

Get an Arc to the protocol metrics.

pub fn metrics_collector(&self) -> &MetricsCollector

Get the metrics collector for accessing aggregated agent metrics.

pub fn metrics_collector_arc(&self) -> Arc<MetricsCollector>

Get an Arc to the metrics collector.

This is useful for registering the collector with a MetricsManager.

pub fn export_prometheus(&self) -> String

Export all agent metrics in Prometheus format.
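
For orientation, the Prometheus text exposition format consists of `# HELP` and `# TYPE` comment lines followed by `name{label="value"} number` samples. The sketch below renders that shape from hypothetical per-agent counters. `AgentCounters`, `render_prometheus`, and the metric names are assumptions made for illustration; the metrics this pool actually exports are not listed on this page.

```rust
use std::collections::BTreeMap;
use std::fmt::Write;

/// Hypothetical per-agent counters; not the crate's actual metrics types.
pub struct AgentCounters {
    pub requests_total: u64,
    pub errors_total: u64,
}

/// Render counters in the Prometheus text exposition format.
/// Label values are written as-is; a real exporter would escape them.
pub fn render_prometheus(agents: &BTreeMap<String, AgentCounters>) -> String {
    let mut out = String::new();

    out.push_str("# HELP agent_requests_total Requests sent to the agent.\n");
    out.push_str("# TYPE agent_requests_total counter\n");
    for (id, c) in agents {
        // Writing into a String cannot fail, so the result is ignored.
        let _ = writeln!(out, "agent_requests_total{{agent=\"{}\"}} {}", id, c.requests_total);
    }

    out.push_str("# HELP agent_errors_total Failed requests per agent.\n");
    out.push_str("# TYPE agent_errors_total counter\n");
    for (id, c) in agents {
        let _ = writeln!(out, "agent_errors_total{{agent=\"{}\"}} {}", id, c.errors_total);
    }

    out
}
```

A scrape endpoint would typically return this string with the `text/plain` content type.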

pub fn clear_correlation_affinity(&self, correlation_id: &str)

Clear connection affinity for a correlation ID.

Call this once a request is complete (after the final response has been received) to free the affinity mapping. This is not strictly necessary, since affinities are cheap, but it is good practice for long-running proxies.
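
To make the affinity lifecycle concrete, here is a minimal sketch of a table that maps a correlation ID to a connection index, binds on first use, and frees the entry on completion. `AffinityTable` and its methods are hypothetical names, and the sketch uses a standard-library `Mutex<HashMap>` for self-containment rather than the DashMap the pool is described as using.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical affinity table: correlation ID -> connection index.
#[derive(Default)]
pub struct AffinityTable {
    inner: Mutex<HashMap<String, usize>>,
}

impl AffinityTable {
    /// Return the connection already bound to this correlation ID,
    /// or bind `fallback` and return it if no mapping exists yet.
    pub fn get_or_bind(&self, correlation_id: &str, fallback: usize) -> usize {
        let mut map = self.inner.lock().unwrap();
        *map.entry(correlation_id.to_string()).or_insert(fallback)
    }

    /// Drop the mapping once the final response has been received.
    pub fn clear(&self, correlation_id: &str) {
        self.inner.lock().unwrap().remove(correlation_id);
    }

    /// Number of live mappings, useful for monitoring.
    pub fn count(&self) -> usize {
        self.inner.lock().unwrap().len()
    }
}
```

Without the `clear` call the table only grows, which is why clearing matters more the longer the proxy runs.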

pub fn correlation_affinity_count(&self) -> usize

Get the number of active correlation affinities.

This is useful for monitoring and debugging.

pub fn create_sticky_session(
    &self,
    session_id: impl Into<String>,
    agent_id: &str,
) -> Result<(), AgentProtocolError>

Create a sticky session for a long-lived streaming connection.

Sticky sessions ensure that all requests for a given session use the same agent connection. This is essential for:

  • WebSocket connections
  • Server-Sent Events (SSE)
  • Long-polling
  • Any streaming scenario requiring connection affinity

Arguments

  • session_id - A unique identifier for this session (e.g., WebSocket connection ID)
  • agent_id - The agent to bind this session to

Returns

Returns Ok(()) if the session was created, or an error if the agent is not found or has no healthy connections.

Example

// When a WebSocket is established (create_sticky_session is not async)
pool.create_sticky_session("ws-12345", "waf-agent")?;

// All subsequent messages use the same connection
let (response, used_sticky) = pool
    .send_request_headers_with_sticky_session("ws-12345", "waf-agent", correlation_id, &event)
    .await?;

// When the WebSocket closes
pool.clear_sticky_session("ws-12345");

pub fn refresh_sticky_session(&self, session_id: &str) -> bool

Refresh a sticky session, updating its last-accessed time.

Returns true if the session exists and was refreshed, false otherwise.

pub fn has_sticky_session(&self, session_id: &str) -> bool

Check if a sticky session exists and is valid.

pub fn clear_sticky_session(&self, session_id: &str)

Clear a sticky session.

Call this when a long-lived stream ends (WebSocket closed, SSE ended, etc.).

pub fn sticky_session_count(&self) -> usize

Get the number of active sticky sessions.

Useful for monitoring and debugging.

pub fn sticky_session_agent(&self, session_id: &str) -> Option<String>

Get the agent ID bound to a sticky session.

pub async fn send_request_headers_with_sticky_session(
    &self,
    session_id: &str,
    agent_id: &str,
    correlation_id: &str,
    event: &RequestHeadersEvent,
) -> Result<(AgentResponse, bool), AgentProtocolError>

Send a request headers event using a sticky session.

If the session exists and is valid, uses the bound connection. If the session doesn’t exist or has expired, falls back to normal connection selection.

Returns

A tuple of (response, used_sticky_session).
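
The fallback behaviour and the boolean in the returned tuple can be illustrated with a small standard-library sketch. `StickySessions`, `bind`, `select`, and the fixed `ttl` are hypothetical; the pool's actual session expiry policy is not documented on this page.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical session record: bound connection plus last access time.
struct Session {
    conn: usize,
    last_accessed: Instant,
}

/// Hypothetical sticky-session table with a fixed time-to-live.
pub struct StickySessions {
    ttl: Duration,
    sessions: HashMap<String, Session>,
}

impl StickySessions {
    pub fn new(ttl: Duration) -> Self {
        Self { ttl, sessions: HashMap::new() }
    }

    /// Bind a session to a connection index.
    pub fn bind(&mut self, session_id: &str, conn: usize) {
        let session = Session { conn, last_accessed: Instant::now() };
        self.sessions.insert(session_id.to_string(), session);
    }

    /// Pick a connection: the bound one if the session exists and has not
    /// expired, otherwise `fallback`. The boolean plays the role of
    /// `used_sticky_session` in the tuple described above.
    pub fn select(&self, session_id: &str, fallback: usize) -> (usize, bool) {
        match self.sessions.get(session_id) {
            Some(s) if s.last_accessed.elapsed() < self.ttl => (s.conn, true),
            _ => (fallback, false),
        }
    }
}
```

Callers can use the flag to detect that a stream silently lost its binding, for example to log the event or re-create the session.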

pub fn cleanup_expired_sessions(&self) -> usize

Clean up expired sticky sessions.

Called automatically by the maintenance task, but can also be called manually to immediately reclaim resources.
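
A cleanup pass that reports how many entries it removed, matching the `usize` return value, might look like the sketch below. The function name, the map layout, and the explicit `ttl` parameter are assumptions for illustration only.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Remove entries whose last access is at least `ttl` old and
/// return how many were removed.
pub fn cleanup_expired(
    last_accessed: &mut HashMap<String, Instant>,
    now: Instant,
    ttl: Duration,
) -> usize {
    let before = last_accessed.len();
    // Keep only sessions that were touched less than `ttl` ago.
    last_accessed.retain(|_, seen| now.saturating_duration_since(*seen) < ttl);
    before - last_accessed.len()
}
```

Passing `now` in explicitly keeps the function deterministic, which makes it straightforward to test.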

pub fn config_pusher(&self) -> &ConfigPusher

Get the config pusher for pushing configuration updates to agents.

pub fn config_update_handler(&self) -> &ConfigUpdateHandler

Get the config update handler for processing agent config requests.

pub fn push_config_to_agent(
    &self,
    agent_id: &str,
    update_type: ConfigUpdateType,
) -> Option<String>

Push a configuration update to a specific agent.

Returns the push ID if the agent supports config push, None otherwise.

pub fn push_config_to_all(&self, update_type: ConfigUpdateType) -> Vec<String>

Push a configuration update to all agents that support config push.

Returns the push IDs for each agent that received the update.

pub fn acknowledge_config_push(
    &self,
    push_id: &str,
    accepted: bool,
    error: Option<String>,
)

Acknowledge a config push by its push ID.
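
The push and acknowledge calls form a small lifecycle: a push yields an ID only when the agent supports config push, and a later acknowledgement resolves that ID as accepted or rejected. The sketch below models this with hypothetical types (`PushTracker`, `PushState`) and a made-up ID scheme; it is not the crate's `ConfigPusher`.

```rust
use std::collections::HashMap;

/// Hypothetical state of a single config push.
#[derive(Debug, PartialEq)]
pub enum PushState {
    Pending,
    Accepted,
    Rejected(String),
}

/// Hypothetical tracker for outstanding pushes.
#[derive(Default)]
pub struct PushTracker {
    next_id: u64,
    pushes: HashMap<String, PushState>,
}

impl PushTracker {
    /// Record a push and return its ID, or `None` if the agent
    /// does not support config push.
    pub fn push(&mut self, agent_id: &str, supports_push: bool) -> Option<String> {
        if !supports_push {
            return None;
        }
        self.next_id += 1;
        let push_id = format!("{}-push-{}", agent_id, self.next_id);
        self.pushes.insert(push_id.clone(), PushState::Pending);
        Some(push_id)
    }

    /// Resolve a pending push; unknown IDs are ignored.
    pub fn acknowledge(&mut self, push_id: &str, accepted: bool, error: Option<String>) {
        if let Some(state) = self.pushes.get_mut(push_id) {
            *state = if accepted {
                PushState::Accepted
            } else {
                PushState::Rejected(error.unwrap_or_default())
            };
        }
    }

    /// Look up the current state of a push.
    pub fn state(&self, push_id: &str) -> Option<&PushState> {
        self.pushes.get(push_id)
    }
}
```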

pub async fn add_agent(
    &self,
    agent_id: impl Into<String>,
    endpoint: impl Into<String>,
) -> Result<(), AgentProtocolError>

Add an agent to the pool.

This creates the configured number of connections to the agent.

pub async fn remove_agent(
    &self,
    agent_id: &str,
) -> Result<(), AgentProtocolError>

Remove an agent from the pool.

This gracefully closes all connections to the agent.

pub async fn add_reverse_connection(
    &self,
    agent_id: &str,
    client: ReverseConnectionClient,
    capabilities: AgentCapabilities,
) -> Result<(), AgentProtocolError>

Add a reverse connection to the pool.

This is called by the ReverseConnectionListener when an agent connects. The connection is wrapped in a V2Transport and added to the agent’s connection pool.

pub async fn send_request_headers(
    &self,
    agent_id: &str,
    correlation_id: &str,
    event: &RequestHeadersEvent,
) -> Result<AgentResponse, AgentProtocolError>

Send a request headers event to an agent.

The pool selects the best connection based on the load balancing strategy.

Performance

This is the hot path. Uses:

  • Lock-free agent lookup via DashMap
  • Cached health state (no async I/O for health check)
  • Atomic last_used tracking (no RwLock)
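
The points above can be sketched with plain atomics. The example assumes a least-in-flight selection strategy, which this page does not confirm, and uses hypothetical names throughout (`Conn`, `pick_connection`, `in_flight`, `last_used_ms`). It shows only the shape of a selection step that performs no I/O and takes no lock.

```rust
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};

/// Hypothetical pooled connection; field names are illustrative.
pub struct Conn {
    pub healthy_cached: AtomicBool,
    pub in_flight: AtomicUsize,
    pub last_used_ms: AtomicU64,
}

impl Conn {
    pub fn new(healthy: bool, in_flight: usize) -> Self {
        Self {
            healthy_cached: AtomicBool::new(healthy),
            in_flight: AtomicUsize::new(in_flight),
            last_used_ms: AtomicU64::new(0),
        }
    }
}

/// Pick the healthy connection with the fewest in-flight requests
/// (assumed strategy) and stamp its last-used time atomically.
pub fn pick_connection(conns: &[Conn], now_ms: u64) -> Option<usize> {
    let (idx, conn) = conns
        .iter()
        .enumerate()
        // Cached flag only: no health probe is issued here.
        .filter(|(_, c)| c.healthy_cached.load(Ordering::Acquire))
        .min_by_key(|(_, c)| c.in_flight.load(Ordering::Relaxed))?;
    // A single atomic store replaces a write lock around a timestamp.
    conn.last_used_ms.store(now_ms, Ordering::Relaxed);
    Some(idx)
}
```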

pub async fn send_request_body_chunk(
    &self,
    agent_id: &str,
    correlation_id: &str,
    event: &RequestBodyChunkEvent,
) -> Result<AgentResponse, AgentProtocolError>

Send a request body chunk to an agent.

The pool uses correlation_id to route the chunk to the same connection that received the request headers (connection affinity).

pub async fn send_response_headers(
    &self,
    agent_id: &str,
    correlation_id: &str,
    event: &ResponseHeadersEvent,
) -> Result<AgentResponse, AgentProtocolError>

Send response headers to an agent.

Called when upstream response headers are received, allowing the agent to inspect/modify response headers before they’re sent to the client.

pub async fn send_response_body_chunk(
    &self,
    agent_id: &str,
    correlation_id: &str,
    event: &ResponseBodyChunkEvent,
) -> Result<AgentResponse, AgentProtocolError>

Send a response body chunk to an agent.

For streaming response body inspection, chunks are sent sequentially. The agent can inspect and optionally modify response body data.

pub async fn cancel_request(
    &self,
    agent_id: &str,
    correlation_id: &str,
    reason: CancelReason,
) -> Result<(), AgentProtocolError>

Cancel a request on all connections for an agent.

pub async fn stats(&self) -> Vec<AgentPoolStats>

Get statistics for all agents in the pool.

pub async fn agent_stats(&self, agent_id: &str) -> Option<AgentPoolStats>

Get statistics for a specific agent.

pub async fn agent_capabilities(
    &self,
    agent_id: &str,
) -> Option<AgentCapabilities>

Get the capabilities of an agent.

pub fn is_agent_healthy(&self, agent_id: &str) -> bool

Check if an agent is healthy.

Uses cached health state for fast, lock-free access.

pub fn agent_ids(&self) -> Vec<String>

Get all agent IDs in the pool.

pub async fn shutdown(&self) -> Result<(), AgentProtocolError>

Gracefully shut down the pool.

This drains all connections and waits for in-flight requests to complete.
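
One common way to implement this kind of drain is a gate that stops admitting new requests and reports when the in-flight count reaches zero. The sketch below shows that idea with hypothetical names (`DrainGate` and its methods); how the pool actually tracks in-flight requests is not described on this page.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

/// Hypothetical drain gate: refuses new work once shutdown starts.
#[derive(Default)]
pub struct DrainGate {
    draining: AtomicBool,
    in_flight: AtomicUsize,
}

impl DrainGate {
    /// Admit a request unless shutdown has started.
    /// Incrementing before checking the flag means a request that
    /// races with `start_drain` is either counted or rejected.
    pub fn try_begin(&self) -> bool {
        self.in_flight.fetch_add(1, Ordering::SeqCst);
        if self.draining.load(Ordering::SeqCst) {
            self.in_flight.fetch_sub(1, Ordering::SeqCst);
            return false;
        }
        true
    }

    /// Mark one admitted request as finished.
    pub fn finish(&self) {
        self.in_flight.fetch_sub(1, Ordering::SeqCst);
    }

    /// Begin shutdown: no further requests are admitted.
    pub fn start_drain(&self) {
        self.draining.store(true, Ordering::SeqCst);
    }

    /// True once shutdown has started and nothing is in flight.
    pub fn is_drained(&self) -> bool {
        self.draining.load(Ordering::SeqCst) && self.in_flight.load(Ordering::SeqCst) == 0
    }
}
```

A shutdown routine built on this would call `start_drain`, then wait until `is_drained` returns true before closing connections.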

pub async fn run_maintenance(&self)

Run background maintenance tasks.

This should be spawned as a background task. It handles:

  • Health checking (updates cached health state)
  • Reconnection of failed connections
  • Cleanup of idle connections

Health Check Strategy

Health is checked here (with I/O) and cached in PooledConnection::healthy_cached. The hot path (select_connection) reads the cached value without I/O.
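
The split between a slow writer and a fast reader can be sketched as follows. `CachedHealth`, `refresh`, and the closure-based probe are hypothetical stand-ins; the only detail taken from the text is that the maintenance task writes a cached flag and the hot path reads it without I/O. Starting in the healthy state is also an assumption.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical cached health flag for one connection.
pub struct CachedHealth {
    healthy: AtomicBool,
}

impl CachedHealth {
    /// Start out healthy (an assumption made for this sketch).
    pub fn new() -> Self {
        Self { healthy: AtomicBool::new(true) }
    }

    /// Maintenance side: run the possibly slow probe and cache its result.
    pub fn refresh(&self, probe: impl FnOnce() -> bool) {
        self.healthy.store(probe(), Ordering::Release);
    }

    /// Hot path: read the cached flag without performing any I/O.
    pub fn is_healthy(&self) -> bool {
        self.healthy.load(Ordering::Acquire)
    }
}
```

The trade-off is staleness: between two maintenance passes, the hot path may still select a connection that has just failed.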

Trait Implementations

impl Debug for AgentPool

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

impl Default for AgentPool

fn default() -> Self

Returns the “default value” for a type.

Blanket Implementations

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> IntoRequest<T> for T

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request.

impl<T> PolicyExt for T
where T: ?Sized,

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow.

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.