pub struct ProxyManager { /* private fields */ }
Manages proxy routing for agent-controlled services
The ProxyManager coordinates between the agent’s service lifecycle and
the proxy crate’s routing/load balancing infrastructure. It supports:
- HTTP/HTTPS/WebSocket (L7): Multiple port listeners sharing the same
  ServiceRegistry for request matching and load balancing.
- TCP/UDP (L4): Standalone stream proxy listeners that forward raw
  connections/datagrams to backends via the StreamRegistry.
Implementations
impl ProxyManager
pub fn new(
    config: ProxyManagerConfig,
    registry: Arc<ServiceRegistry>,
    cert_manager: Option<Arc<CertManager>>,
) -> Self
Create a new ProxyManager with the given configuration, service registry,
and optional certificate manager.
pub fn registry(&self) -> Arc<ServiceRegistry>
Get a reference to the service registry
pub fn load_balancer(&self) -> Arc<LoadBalancer>
Get a reference to the load balancer
pub fn active_connections(&self) -> u64
Get the number of currently active proxy connections.
pub fn cert_manager(&self) -> Option<&Arc<CertManager>>
Get a reference to the certificate manager (if configured)
pub fn set_stream_registry(&mut self, registry: Arc<StreamRegistry>)
Set the stream registry for L4 proxy integration (TCP/UDP)
pub fn with_stream_registry(self, registry: Arc<StreamRegistry>) -> Self
Builder pattern: add stream registry for L4 proxy integration
pub fn stream_registry(&self) -> Option<&Arc<StreamRegistry>>
Get the stream registry (if configured)
pub fn set_network_policy_checker(&mut self, checker: NetworkPolicyChecker)
Set the network policy checker for access control enforcement
pub fn with_network_policy_checker(self, checker: NetworkPolicyChecker) -> Self
Builder pattern: add network policy checker for access control enforcement
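The set_*/with_* pairs above follow a common Rust convention: the setter mutates an existing value in place, while the builder variant consumes self and returns it so calls can be chained after new. The sketch below illustrates that shape only; Manager, PolicyChecker, and the unit-struct StreamRegistry are hypothetical stand-ins, not the crate's real types.

```rust
use std::sync::Arc;

// Hypothetical stand-ins for the crate's StreamRegistry and NetworkPolicyChecker.
#[derive(Debug, Default)]
pub struct StreamRegistry;
#[derive(Debug, Default, Clone)]
pub struct PolicyChecker;

// Hypothetical, trimmed-down manager holding only the optional pieces.
#[derive(Debug, Default)]
pub struct Manager {
    stream_registry: Option<Arc<StreamRegistry>>,
    policy_checker: Option<PolicyChecker>,
}

impl Manager {
    pub fn new() -> Self {
        Self::default()
    }

    /// Setter form: mutates an existing manager in place.
    pub fn set_stream_registry(&mut self, registry: Arc<StreamRegistry>) {
        self.stream_registry = Some(registry);
    }

    /// Builder form: consumes `self` and returns it, so calls can be chained.
    pub fn with_stream_registry(mut self, registry: Arc<StreamRegistry>) -> Self {
        self.set_stream_registry(registry);
        self
    }

    /// Builder form for the policy checker.
    pub fn with_policy_checker(mut self, checker: PolicyChecker) -> Self {
        self.policy_checker = Some(checker);
        self
    }

    /// Accessor: `None` until a registry has been configured.
    pub fn stream_registry(&self) -> Option<&Arc<StreamRegistry>> {
        self.stream_registry.as_ref()
    }

    pub fn has_policy_checker(&self) -> bool {
        self.policy_checker.is_some()
    }
}
```

Having the builder delegate to the setter keeps the two forms from drifting apart if the field ever gains extra bookkeeping.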
pub async fn listen_on(&self, port: u16, bind_ip: IpAddr) -> Result<()>
Start listening on a specific port bound to the given address.
If the proxy is already listening on this port, the call is skipped.
All port listeners share the same ServiceRegistry for request matching.
§Errors
Returns an error if the proxy server cannot be started.
pub async fn listen_on_tls(&self, port: u16, bind_ip: IpAddr) -> Result<()>
Start an HTTPS listener on the given port using SniCertResolver for dynamic cert selection.
If the proxy is already listening on this port, the call is skipped.
Requires a CertManager to be configured; if none is configured, logs a warning and returns Ok(()).
§Errors
Returns an error if the HTTPS proxy server cannot be started.
pub async fn stop(&self)
Stop all proxy servers on all ports.
After signalling each server to shut down, waits up to 30 seconds for active connections to drain before returning.
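One way to picture the drain step is as a poll on the active-connection count that ends when the count reaches zero or a deadline passes. The sketch below uses only the standard library and a blocking loop for brevity; the real method is async, and wait_for_drain and its poll interval are assumptions made for illustration rather than the crate's implementation.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;
use std::time::{Duration, Instant};

/// Wait until `active` drops to zero or `timeout` elapses.
/// Returns `true` if every connection drained before the deadline.
pub fn wait_for_drain(active: &AtomicU64, timeout: Duration, poll: Duration) -> bool {
    let deadline = Instant::now() + timeout;
    loop {
        if active.load(Ordering::Acquire) == 0 {
            return true;
        }
        let now = Instant::now();
        if now >= deadline {
            return false;
        }
        // Sleep for the poll interval, but never past the deadline.
        thread::sleep(poll.min(deadline - now));
    }
}
```

Passing Duration::from_secs(30) as the timeout would match the upper bound documented above.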
pub async fn ensure_ports_for_service(
    &self,
    spec: &ServiceSpec,
    overlay_ip: Option<IpAddr>,
) -> Result<()>
Scan a service’s endpoints and ensure the proxy is listening on all required ports.
- HTTP/HTTPS/WebSocket endpoints start an HTTP proxy listener.
- TCP endpoints bind a TcpListener and spawn a TcpStreamService.
- UDP endpoints bind a UdpSocket and spawn a UdpStreamService.
Bind address is determined by the expose type:
- Public endpoints bind to 0.0.0.0 (all interfaces).
- Internal endpoints bind to the overlay IP so they are only
  reachable from within the overlay network. If no overlay is
  available, internal endpoints bind to 127.0.0.1 (localhost only).
§Errors
Returns an error if an HTTP/HTTPS listener cannot be started.
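The bind-address rules above reduce to a small decision function. The following is a minimal sketch under stated assumptions: the Expose enum and bind_address function are hypothetical names introduced here, and the real expose type on ServiceSpec may look different.

```rust
use std::net::{IpAddr, Ipv4Addr};

/// Hypothetical stand-in for the endpoint's expose type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Expose {
    Public,
    Internal,
}

/// Pick the bind address for an endpoint:
/// public -> 0.0.0.0, internal -> overlay IP, falling back to 127.0.0.1.
pub fn bind_address(expose: Expose, overlay_ip: Option<IpAddr>) -> IpAddr {
    match expose {
        Expose::Public => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
        Expose::Internal => overlay_ip.unwrap_or(IpAddr::V4(Ipv4Addr::LOCALHOST)),
    }
}
```

Note that a public endpoint ignores the overlay IP entirely, which is why the overlay argument can stay optional.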
pub async fn publish_loopback_for_container(
    &self,
    service_name: &str,
    spec: &ServiceSpec,
    container_ip: IpAddr,
    port_override: Option<u16>,
)
Publish a single container’s exposed ports on the node loopback
(127.0.0.1:<endpoint.port>), forwarding to wherever the container
actually listens.
This implements the GitHub-Actions “service published to localhost”
convention so a consumer sharing the node loopback can reach the
service at localhost:<port>. The published port is always
endpoint.port; the backend the listener forwards to is
(container_ip, port_override.unwrap_or(endpoint.target_port())),
which is already runtime-resolved by the caller:
- On the macOS seatbelt/libkrun runtimes every replica shares the host
  127.0.0.1 and gets a unique port_override, so the container listens on
  127.0.0.1:<port_override> and we forward there.
- On Linux/VZ/HCS the container listens on its overlay IP, so
  container_ip is the overlay address and port_override is None,
  forwarding to overlay_ip:<target_port>.
Backends accumulate across replicas so multiple members round-robin
behind the single loopback port. Public endpoints are skipped: they
are already bound on 0.0.0.0 and therefore already reachable on
loopback — binding 127.0.0.1:<port> again would fail with
EADDRINUSE.
This NEVER rewrites a container’s own loopback: it only binds the
NODE’s 127.0.0.1 and forwards to the container’s runtime-resolved
address.
Bind failures are tolerated (logged at warn!); this never panics and
never returns an error.
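The listen/forward pair described above can be computed per endpoint as follows. This is a sketch, not the crate's code: the Endpoint struct (with plain port, target_port, and public fields) and loopback_mapping are hypothetical simplifications of the real endpoint type and its target_port() accessor.

```rust
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

/// Hypothetical, trimmed-down endpoint: only the fields this sketch needs.
pub struct Endpoint {
    pub port: u16,
    pub target_port: u16,
    pub public: bool,
}

/// Returns `(listen_addr, backend_addr)` for an endpoint, or `None` for
/// public endpoints, which are already reachable on loopback via 0.0.0.0.
pub fn loopback_mapping(
    ep: &Endpoint,
    container_ip: IpAddr,
    port_override: Option<u16>,
) -> Option<(SocketAddr, SocketAddr)> {
    if ep.public {
        return None;
    }
    // The published port is always the endpoint's own port on the node loopback.
    let listen = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ep.port);
    // The backend uses the override when present, else the endpoint's target port.
    let backend = SocketAddr::new(container_ip, port_override.unwrap_or(ep.target_port));
    Some((listen, backend))
}
```

With a shared-loopback runtime the call would look like loopback_mapping(&ep, "127.0.0.1".parse().unwrap(), Some(41000)); with an overlay runtime the override is None and the backend port falls back to the target port.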
pub async fn unpublish_loopback_for_container(
    &self,
    spec: &ServiceSpec,
    container_ip: IpAddr,
    port_override: Option<u16>,
)
Remove a single container’s backend from the node-loopback publish
path. Mirrors Self::publish_loopback_for_container: it recomputes
the same (container_ip, port_override.unwrap_or(target_port)) backend
per endpoint and drops it from the loopback registry.
When a published port’s backend set becomes empty, the registry entry
is unregistered and the loopback listener is forgotten so the port is
freed for the next bind. Public endpoints are skipped (they were
never published here).
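The "drop the backend, free the port when the set is empty" bookkeeping can be sketched with a map from published port to backend set. LoopbackRegistry below is a hypothetical illustration of that idea; the crate's actual loopback registry and listener handling are not shown in this page.

```rust
use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;

/// Hypothetical loopback registry: published port -> set of backends.
#[derive(Default)]
pub struct LoopbackRegistry {
    backends: HashMap<u16, HashSet<SocketAddr>>,
}

impl LoopbackRegistry {
    /// Backends accumulate across replicas behind one published port.
    pub fn publish(&mut self, port: u16, backend: SocketAddr) {
        self.backends.entry(port).or_default().insert(backend);
    }

    /// Remove one backend. Returns `true` when the port's backend set
    /// became empty and its entry was dropped, i.e. the port is free again.
    pub fn unpublish(&mut self, port: u16, backend: SocketAddr) -> bool {
        let Some(set) = self.backends.get_mut(&port) else {
            return false;
        };
        set.remove(&backend);
        if set.is_empty() {
            self.backends.remove(&port);
            true
        } else {
            false
        }
    }

    pub fn is_published(&self, port: u16) -> bool {
        self.backends.contains_key(&port)
    }
}
```

In this sketch the boolean returned by unpublish is the signal a caller would use to forget the listener for that port.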
pub async fn add_service(&self, name: &str, spec: &ServiceSpec)
Add routes for a service based on its specification
This creates proxy routes for each endpoint defined in the ServiceSpec.
HTTP/HTTPS/WebSocket endpoints get L7 routes via the ServiceRegistry.
TCP/UDP endpoints are tracked but their L4 registration is handled
by the ServiceManager::register_service_routes() method.
pub async fn remove_service(&self, name: &str)
Remove all routes, L4 listeners, and HTTP server handles for a service.
This performs a full cleanup of all proxy resources associated with the service:
- Removes L7 (HTTP/HTTPS/WebSocket) routes from the ServiceRegistry
- Unregisters TCP/UDP stream services from the StreamRegistry
- Removes port tracking for TCP/UDP listeners
- Shuts down HTTP proxy server handles that were exclusively owned by this service (only if no other service uses the same port)
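The "exclusively owned" rule in the last bullet can be modelled by tracking which services use each listening port. The sketch below is an assumption about how such tracking might look; PortOwners, claim, and release are hypothetical names, not part of ProxyManager.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical port tracker: listening port -> names of services using it.
#[derive(Default)]
pub struct PortOwners {
    owners: HashMap<u16, HashSet<String>>,
}

impl PortOwners {
    /// Record that `service` uses `port`.
    pub fn claim(&mut self, port: u16, service: &str) {
        self.owners.entry(port).or_default().insert(service.to_string());
    }

    /// Drop `service` from every port and return the ports that now have
    /// no owner, i.e. the listeners that are safe to shut down.
    pub fn release(&mut self, service: &str) -> Vec<u16> {
        let mut freed = Vec::new();
        self.owners.retain(|port, services| {
            let was_owner = services.remove(service);
            if was_owner && services.is_empty() {
                freed.push(*port);
                false // nobody left on this port: forget it
            } else {
                true // still shared with another service
            }
        });
        freed.sort_unstable();
        freed
    }
}
```

A port shared by two services survives the removal of the first and is only returned once the second is released.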
pub async fn add_backend(&self, service: &str, addr: SocketAddr)
Add a single backend to a service.
Adds to the service-level LB group and to every per-endpoint LB
group tracked for service. Per-endpoint role filtering happens at
collection time in the agent’s service manager, so any backend
surfaced here is already eligible for every endpoint.
pub async fn remove_backend(&self, service: &str, addr: SocketAddr)
Remove a backend from a service.
Removes from the service-level LB group and from every per-endpoint LB group.
pub async fn update_backend_health(
    &self,
    service: &str,
    addr: SocketAddr,
    healthy: bool,
)
Update the health status of a backend in the load balancer.
Delegates to LoadBalancer::mark_health so that unhealthy backends
are skipped during selection. Health is tracked on both the
service-level group and every per-endpoint group that contains
this address.
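To show what "skipped during selection" means in practice, here is a small health-aware round-robin group. It is a sketch only: Group and its methods are hypothetical, and the real LoadBalancer may use a different selection strategy or data layout.

```rust
use std::net::SocketAddr;

/// Hypothetical load-balancer group; not the crate's LoadBalancer type.
pub struct Group {
    backends: Vec<(SocketAddr, bool)>, // (address, healthy)
    next: usize,
}

impl Group {
    /// All backends start out healthy.
    pub fn new(addrs: Vec<SocketAddr>) -> Self {
        Self {
            backends: addrs.into_iter().map(|a| (a, true)).collect(),
            next: 0,
        }
    }

    /// Record a health change; unknown addresses are ignored.
    pub fn mark_health(&mut self, addr: SocketAddr, healthy: bool) {
        for (a, h) in self.backends.iter_mut() {
            if *a == addr {
                *h = healthy;
            }
        }
    }

    /// Round-robin over the backends, skipping any marked unhealthy.
    /// Returns `None` when the group is empty or nothing is healthy.
    pub fn select(&mut self) -> Option<SocketAddr> {
        let n = self.backends.len();
        for offset in 0..n {
            let idx = (self.next + offset) % n;
            let (addr, healthy) = self.backends[idx];
            if healthy {
                self.next = (idx + 1) % n;
                return Some(addr);
            }
        }
        None
    }
}
```

Because health is a flag rather than a removal, a backend that recovers rejoins the rotation as soon as it is marked healthy again.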
pub async fn update_backends(&self, service: &str, addrs: Vec<SocketAddr>)
Update the backends for every endpoint of a service with the same list.
Use this only when the caller cannot distinguish per-endpoint backend
sets (e.g., legacy paths that do not honor target_role). Prefer
Self::update_endpoint_backends when per-endpoint filtering is
possible.
pub async fn update_endpoint_backends(
    &self,
    service: &str,
    endpoint_name: &str,
    addrs: Vec<SocketAddr>,
)
Update backends for a single L7 endpoint of a service.
This honors EndpointSpec::target_role filtering: the caller
supplies the role-filtered backend list and this method updates
only the routes and LB group corresponding to (service, endpoint_name).
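The difference between the service-wide and per-endpoint update paths can be illustrated with a table keyed by (service, endpoint name). This is a sketch under assumptions: EndpointBackends and its methods are hypothetical, and the role filtering itself is assumed to have happened in the caller, as described above.

```rust
use std::collections::HashMap;
use std::net::SocketAddr;

/// Hypothetical per-endpoint backend table keyed by (service, endpoint name).
#[derive(Default)]
pub struct EndpointBackends {
    groups: HashMap<(String, String), Vec<SocketAddr>>,
}

impl EndpointBackends {
    /// Create an empty group for an endpoint if it does not exist yet.
    pub fn register(&mut self, service: &str, endpoint: &str) {
        self.groups
            .entry((service.to_string(), endpoint.to_string()))
            .or_default();
    }

    /// Same list for every endpoint of `service` (the service-wide shape).
    pub fn update_all(&mut self, service: &str, addrs: &[SocketAddr]) {
        for ((svc, _), group) in self.groups.iter_mut() {
            if svc.as_str() == service {
                *group = addrs.to_vec();
            }
        }
    }

    /// Only the group for `(service, endpoint)` changes.
    pub fn update_endpoint(&mut self, service: &str, endpoint: &str, addrs: Vec<SocketAddr>) {
        let key = (service.to_string(), endpoint.to_string());
        if let Some(group) = self.groups.get_mut(&key) {
            *group = addrs;
        }
    }

    /// Current backends for one endpoint (empty if the endpoint is unknown).
    pub fn backends(&self, service: &str, endpoint: &str) -> Vec<SocketAddr> {
        let key = (service.to_string(), endpoint.to_string());
        self.groups.get(&key).cloned().unwrap_or_default()
    }
}
```

After a per-endpoint update, sibling endpoints of the same service keep whatever list they had, which is what lets role-filtered endpoints diverge.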
pub async fn route_count(&self) -> usize
Get the number of registered routes
pub async fn list_services(&self) -> Vec<String>
Get the list of registered service names
pub async fn has_service(&self, name: &str) -> bool
Check if a service has any registered endpoints
Trait Implementations
impl Drop for ProxyManager
Auto Trait Implementations
impl !Freeze for ProxyManager
impl !RefUnwindSafe for ProxyManager
impl !UnwindSafe for ProxyManager
impl Send for ProxyManager
impl Sync for ProxyManager
impl Unpin for ProxyManager
impl UnsafeUnpin for ProxyManager
Blanket Implementations
impl<'a, T, E> AsTaggedExplicit<'a, E> for T
where
    T: 'a,
impl<'a, T, E> AsTaggedImplicit<'a, E> for T
where
    T: 'a,
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> FutureExt for T
fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise.
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request
impl<T> OptionalSend for T
impl<T> OptionalSync for T
impl<T> PolicyExt for T
where
    T: ?Sized,
impl<T> ServiceExt for T
fn propagate_header(self, header: HeaderName) -> PropagateHeader<Self>
where
    Self: Sized,
fn add_extension<T>(self, value: T) -> AddExtension<Self, T>
where
    Self: Sized,
fn map_request_body<F>(self, f: F) -> MapRequestBody<Self, F>
where
    Self: Sized,
fn map_response_body<F>(self, f: F) -> MapResponseBody<Self, F>
where
    Self: Sized,
fn compression(self) -> Compression<Self>
where
    Self: Sized,
fn decompression(self) -> Decompression<Self>
where
    Self: Sized,
fn trace_for_http(self) -> Trace<Self, SharedClassifier<ServerErrorsAsFailures>>
where
    Self: Sized,
fn trace_for_grpc(self) -> Trace<Self, SharedClassifier<GrpcErrorsAsFailures>>
where
    Self: Sized,
fn follow_redirects(self) -> FollowRedirect<Self>
where
    Self: Sized,
fn sensitive_headers(
    self,
    headers: impl IntoIterator<Item = HeaderName>,
) -> SetSensitiveRequestHeaders<SetSensitiveResponseHeaders<Self>>
where
    Self: Sized,
fn sensitive_request_headers(
    self,
    headers: impl IntoIterator<Item = HeaderName>,
) -> SetSensitiveRequestHeaders<Self>
where
    Self: Sized,
fn sensitive_response_headers(
    self,
    headers: impl IntoIterator<Item = HeaderName>,
) -> SetSensitiveResponseHeaders<Self>
where
    Self: Sized,
fn override_request_header<M>(
    self,
    header_name: HeaderName,
    make: M,
) -> SetRequestHeader<Self, M>
where
    Self: Sized,
fn append_request_header<M>(
    self,
    header_name: HeaderName,
    make: M,
) -> SetRequestHeader<Self, M>
where
    Self: Sized,
fn insert_request_header_if_not_present<M>(
    self,
    header_name: HeaderName,
    make: M,
) -> SetRequestHeader<Self, M>
where
    Self: Sized,
fn override_response_header<M>(
    self,
    header_name: HeaderName,
    make: M,
) -> SetResponseHeader<Self, M>
where
    Self: Sized,
fn append_response_header<M>(
    self,
    header_name: HeaderName,
    make: M,
) -> SetResponseHeader<Self, M>
where
    Self: Sized,
fn insert_response_header_if_not_present<M>(
    self,
    header_name: HeaderName,
    make: M,
) -> SetResponseHeader<Self, M>
where
    Self: Sized,
fn set_request_id<M>(
    self,
    header_name: HeaderName,
    make_request_id: M,
) -> SetRequestId<Self, M>
where
    Self: Sized,
    M: MakeRequestId,
fn set_x_request_id<M>(self, make_request_id: M) -> SetRequestId<Self, M>
where
    Self: Sized,
    M: MakeRequestId,
Add request id header and extension, using x-request-id as the header name.
fn propagate_request_id(
    self,
    header_name: HeaderName,
) -> PropagateRequestId<Self>
where
    Self: Sized,
fn propagate_x_request_id(self) -> PropagateRequestId<Self>
where
    Self: Sized,
Propagate request ids from requests to responses, using x-request-id as the header name.
fn catch_panic(self) -> CatchPanic<Self, DefaultResponseForPanic>
where
    Self: Sized,
Catch panics and convert them into 500 Internal Server responses.
fn request_body_limit(self, limit: usize) -> RequestBodyLimit<Self>
where
    Self: Sized,
Intercept requests with over-sized payloads and convert them into 413 Payload Too Large responses.