Skip to main content

ProxyManager

Struct ProxyManager 

Source
pub struct ProxyManager { /* private fields */ }
Expand description

Manages proxy routing for agent-controlled services

The ProxyManager coordinates between the agent’s service lifecycle and the proxy crate’s routing/load balancing infrastructure. It supports:

  • HTTP/HTTPS/WebSocket (L7): Multiple port listeners sharing the same ServiceRegistry for request matching and load balancing.
  • TCP/UDP (L4): Standalone stream proxy listeners that forward raw connections/datagrams to backends via the StreamRegistry.

Implementations§

Source§

impl ProxyManager

Source

pub fn new( config: ProxyManagerConfig, registry: Arc<ServiceRegistry>, cert_manager: Option<Arc<CertManager>>, ) -> Self

Create a new ProxyManager with the given configuration, service registry, and optional certificate manager.

Source

pub fn registry(&self) -> Arc<ServiceRegistry>

Get a reference to the service registry

Source

pub fn load_balancer(&self) -> Arc<LoadBalancer>

Get a reference to the load balancer

Source

pub fn active_connections(&self) -> u64

Get the number of currently active proxy connections.

Source

pub fn cert_manager(&self) -> Option<&Arc<CertManager>>

Get a reference to the certificate manager (if configured)

Source

pub fn set_stream_registry(&mut self, registry: Arc<StreamRegistry>)

Set the stream registry for L4 proxy integration (TCP/UDP)

Source

pub fn with_stream_registry(self, registry: Arc<StreamRegistry>) -> Self

Builder pattern: add stream registry for L4 proxy integration

Source

pub fn stream_registry(&self) -> Option<&Arc<StreamRegistry>>

Get the stream registry (if configured)

Source

pub fn set_network_policy_checker(&mut self, checker: NetworkPolicyChecker)

Set the network policy checker for access control enforcement

Source

pub fn with_network_policy_checker(self, checker: NetworkPolicyChecker) -> Self

Builder pattern: add network policy checker for access control enforcement

Source

pub async fn listen_on(&self, port: u16, bind_ip: IpAddr) -> Result<()>

Start listening on a specific port bound to the given address.

If already listening on this port, skip. All port listeners share the same ServiceRegistry for request matching.

§Errors

Returns an error if the proxy server cannot be started.

Source

pub async fn listen_on_tls(&self, port: u16, bind_ip: IpAddr) -> Result<()>

Start an HTTPS listener on the given port using SniCertResolver for dynamic cert selection.

If already listening on this port, skip. Requires a CertManager to be configured; logs a warning and returns Ok(()) if not.

§Errors

Returns an error if the HTTPS proxy server cannot be started.

Source

pub async fn stop(&self)

Stop all proxy servers on all ports.

After signalling each server to shut down, waits up to 30 seconds for active connections to drain before returning.

Source

pub async fn unbind(&self, port: u16)

Remove and shut down the listener on a specific port.

Source

pub async fn ensure_ports_for_service( &self, spec: &ServiceSpec, overlay_ip: Option<IpAddr>, ) -> Result<()>

Scan a service’s endpoints and ensure the proxy is listening on all required ports.

  • HTTP/HTTPS/WebSocket endpoints start an HTTP proxy listener.
  • TCP endpoints bind a TcpListener and spawn a TcpStreamService.
  • UDP endpoints bind a UdpSocket and spawn a UdpStreamService.

Bind address is determined by the expose type:

  • Public endpoints bind to 0.0.0.0 (all interfaces).
  • Internal endpoints bind to the overlay IP so they are only reachable from within the overlay network. If no overlay is available, internal endpoints bind to 127.0.0.1 (localhost only).
§Errors

Returns an error if an HTTP/HTTPS listener cannot be started.

Source

pub async fn publish_loopback_for_container( &self, service_name: &str, spec: &ServiceSpec, container_ip: IpAddr, port_override: Option<u16>, )

Publish a single container’s exposed ports on the node loopback (127.0.0.1:<endpoint.port>), forwarding to wherever the container actually listens.

This implements the GitHub-Actions “service published to localhost” convention so a consumer sharing the node loopback can reach the service at localhost:<port>. The published port is always endpoint.port; the backend the listener forwards to is (container_ip, port_override.unwrap_or(endpoint.target_port())), which is already runtime-resolved by the caller:

  • On the macOS seatbelt/libkrun runtimes every replica shares the host 127.0.0.1 and gets a unique port_override, so the container listens on 127.0.0.1:<port_override> and we forward there.
  • On Linux/VZ/HCS the container listens on its overlay IP, so container_ip is the overlay address and port_override is None, forwarding to overlay_ip:<target_port>.

Backends accumulate across replicas so multiple members round-robin behind the single loopback port. Public endpoints are skipped: they are already bound on 0.0.0.0 and therefore already reachable on loopback — binding 127.0.0.1:<port> again would fail with EADDRINUSE.

This NEVER rewrites a container’s own loopback: it only binds the NODE’s 127.0.0.1 and forwards to the container’s runtime-resolved address.

Bind failures are tolerated (logged at warn!); this never panics and never returns an error.

Source

pub async fn unpublish_loopback_for_container( &self, spec: &ServiceSpec, container_ip: IpAddr, port_override: Option<u16>, )

Remove a single container’s backend from the node-loopback publish path. Mirrors Self::publish_loopback_for_container: it recomputes the same (container_ip, port_override.unwrap_or(target_port)) backend per endpoint and drops it from the loopback registry.

When a published port’s backend set becomes empty, the registry entry is unregistered and the loopback listener is forgotten so the port is freed for the next bind. Public endpoints are skipped (they were never published here).

Source

pub async fn add_service(&self, name: &str, spec: &ServiceSpec)

Add routes for a service based on its specification

This creates proxy routes for each endpoint defined in the ServiceSpec. HTTP/HTTPS/WebSocket endpoints get L7 routes via the ServiceRegistry. TCP/UDP endpoints are tracked but their L4 registration is handled by the ServiceManager::register_service_routes() method.

Source

pub async fn remove_service(&self, name: &str)

Remove all routes, L4 listeners, and HTTP server handles for a service.

This performs a full cleanup of all proxy resources associated with the service:

  • Removes L7 (HTTP/HTTPS/WebSocket) routes from the ServiceRegistry
  • Unregisters TCP/UDP stream services from the StreamRegistry
  • Removes port tracking for TCP/UDP listeners
  • Shuts down HTTP proxy server handles that were exclusively owned by this service (only if no other service uses the same port)
Source

pub async fn add_backend(&self, service: &str, addr: SocketAddr)

Add a single backend to a service.

Adds to the service-level LB group and to every per-endpoint LB group tracked for service. Per-endpoint role filtering happens at collection time in the agent’s service manager, so any backend surfaced here is already eligible for every endpoint.

Source

pub async fn remove_backend(&self, service: &str, addr: SocketAddr)

Remove a backend from a service.

Removes from the service-level LB group and from every per-endpoint LB group.

Source

pub async fn update_backend_health( &self, service: &str, addr: SocketAddr, healthy: bool, )

Update the health status of a backend in the load balancer.

Delegates to LoadBalancer::mark_health so that unhealthy backends are skipped during selection. Health is tracked on both the service-level group and every per-endpoint group that contains this address.

Source

pub async fn update_backends(&self, service: &str, addrs: Vec<SocketAddr>)

Update the backends for every endpoint of a service with the same list.

Use this only when caller cannot distinguish per-endpoint backend sets (e.g., legacy paths that do not honor target_role). Prefer Self::update_endpoint_backends when per-endpoint filtering is possible.

Source

pub async fn update_endpoint_backends( &self, service: &str, endpoint_name: &str, addrs: Vec<SocketAddr>, )

Update backends for a single L7 endpoint of a service.

This honors [EndpointSpec::target_role] filtering: the caller supplies the role-filtered backend list and this method updates only the routes and LB group corresponding to (service, endpoint_name).

Source

pub async fn route_count(&self) -> usize

Get the number of registered routes

Source

pub async fn list_services(&self) -> Vec<String>

Get the list of registered service names

Source

pub async fn has_service(&self, name: &str) -> bool

Check if a service has any registered endpoints

Trait Implementations§

Source§

impl Drop for ProxyManager

Source§

fn drop(&mut self)

Executes the destructor for this type. Read more
Source§

fn pin_drop(self: Pin<&mut Self>)

🔬This is a nightly-only experimental API. (pin_ergonomics)
Execute the destructor for this type, but different to Drop::drop, it requires self to be pinned. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<'a, T, E> AsTaggedExplicit<'a, E> for T
where T: 'a,

Source§

fn explicit(self, class: Class, tag: u32) -> TaggedParser<'a, Explicit, Self, E>

Source§

impl<'a, T, E> AsTaggedExplicit<'a, E> for T
where T: 'a,

Source§

fn explicit(self, class: Class, tag: u32) -> TaggedParser<'a, Explicit, Self, E>

Source§

impl<'a, T, E> AsTaggedImplicit<'a, E> for T
where T: 'a,

Source§

fn implicit( self, class: Class, constructed: bool, tag: u32, ) -> TaggedParser<'a, Implicit, Self, E>

Source§

impl<'a, T, E> AsTaggedImplicit<'a, E> for T
where T: 'a,

Source§

fn implicit( self, class: Class, constructed: bool, tag: u32, ) -> TaggedParser<'a, Implicit, Self, E>

Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> FutureExt for T

Source§

fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper. Read more
Source§

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper. Read more
Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T> IntoRequest<T> for T

Source§

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request
Source§

impl<L> LayerExt<L> for L

Source§

fn named_layer<S>(&self, service: S) -> Layered<<L as Layer<S>>::Service, S>
where L: Layer<S>,

Applies the layer to a service and wraps it in Layered.
Source§

impl<T> OptionalSend for T
where T: Send + ?Sized,

Source§

impl<T> OptionalSync for T
where T: Sync + ?Sized,

Source§

impl<T> PolicyExt for T
where T: ?Sized,

Source§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Sized + Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow. Read more
Source§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Sized + Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T> ServiceExt for T

Source§

fn propagate_header(self, header: HeaderName) -> PropagateHeader<Self>
where Self: Sized,

Propagate a header from the request to the response. Read more
Source§

fn add_extension<T>(self, value: T) -> AddExtension<Self, T>
where Self: Sized,

Add some shareable value to request extensions. Read more
Source§

fn map_request_body<F>(self, f: F) -> MapRequestBody<Self, F>
where Self: Sized,

Apply a transformation to the request body. Read more
Source§

fn map_response_body<F>(self, f: F) -> MapResponseBody<Self, F>
where Self: Sized,

Apply a transformation to the response body. Read more
Source§

fn compression(self) -> Compression<Self>
where Self: Sized,

Compresses response bodies. Read more
Source§

fn decompression(self) -> Decompression<Self>
where Self: Sized,

Decompress response bodies. Read more
Source§

fn trace_for_http(self) -> Trace<Self, SharedClassifier<ServerErrorsAsFailures>>
where Self: Sized,

High level tracing that classifies responses using HTTP status codes. Read more
Source§

fn trace_for_grpc(self) -> Trace<Self, SharedClassifier<GrpcErrorsAsFailures>>
where Self: Sized,

High level tracing that classifies responses using gRPC headers. Read more
Source§

fn follow_redirects(self) -> FollowRedirect<Self>
where Self: Sized,

Follow redirect resposes using the Standard policy. Read more
Source§

fn sensitive_headers( self, headers: impl IntoIterator<Item = HeaderName>, ) -> SetSensitiveRequestHeaders<SetSensitiveResponseHeaders<Self>>
where Self: Sized,

Mark headers as sensitive on both requests and responses. Read more
Source§

fn sensitive_request_headers( self, headers: impl IntoIterator<Item = HeaderName>, ) -> SetSensitiveRequestHeaders<Self>
where Self: Sized,

Mark headers as sensitive on requests. Read more
Source§

fn sensitive_response_headers( self, headers: impl IntoIterator<Item = HeaderName>, ) -> SetSensitiveResponseHeaders<Self>
where Self: Sized,

Mark headers as sensitive on responses. Read more
Source§

fn override_request_header<M>( self, header_name: HeaderName, make: M, ) -> SetRequestHeader<Self, M>
where Self: Sized,

Insert a header into the request. Read more
Source§

fn append_request_header<M>( self, header_name: HeaderName, make: M, ) -> SetRequestHeader<Self, M>
where Self: Sized,

Append a header into the request. Read more
Source§

fn insert_request_header_if_not_present<M>( self, header_name: HeaderName, make: M, ) -> SetRequestHeader<Self, M>
where Self: Sized,

Insert a header into the request, if the header is not already present. Read more
Source§

fn override_response_header<M>( self, header_name: HeaderName, make: M, ) -> SetResponseHeader<Self, M>
where Self: Sized,

Insert a header into the response. Read more
Source§

fn append_response_header<M>( self, header_name: HeaderName, make: M, ) -> SetResponseHeader<Self, M>
where Self: Sized,

Append a header into the response. Read more
Source§

fn insert_response_header_if_not_present<M>( self, header_name: HeaderName, make: M, ) -> SetResponseHeader<Self, M>
where Self: Sized,

Insert a header into the response, if the header is not already present. Read more
Source§

fn set_request_id<M>( self, header_name: HeaderName, make_request_id: M, ) -> SetRequestId<Self, M>
where Self: Sized, M: MakeRequestId,

Add request id header and extension. Read more
Source§

fn set_x_request_id<M>(self, make_request_id: M) -> SetRequestId<Self, M>
where Self: Sized, M: MakeRequestId,

Add request id header and extension, using x-request-id as the header name. Read more
Source§

fn propagate_request_id( self, header_name: HeaderName, ) -> PropagateRequestId<Self>
where Self: Sized,

Propgate request ids from requests to responses. Read more
Source§

fn propagate_x_request_id(self) -> PropagateRequestId<Self>
where Self: Sized,

Propgate request ids from requests to responses, using x-request-id as the header name. Read more
Source§

fn catch_panic(self) -> CatchPanic<Self, DefaultResponseForPanic>
where Self: Sized,

Catch panics and convert them into 500 Internal Server responses. Read more
Source§

fn request_body_limit(self, limit: usize) -> RequestBodyLimit<Self>
where Self: Sized,

Intercept requests with over-sized payloads and convert them into 413 Payload Too Large responses. Read more
Source§

fn trim_trailing_slash(self) -> NormalizePath<Self>
where Self: Sized,

Remove trailing slashes from paths. Read more
Source§

fn append_trailing_slash(self) -> NormalizePath<Self>
where Self: Sized,

Append trailing slash to paths. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more