dynamo_runtime/pipeline/network/egress/unified_client.rs
1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Unified Request Plane Client Interface
5//!
6//! This module defines a transport-agnostic interface for sending requests
7//! in the request plane. All transport implementations (TCP, HTTP, NATS)
8//! implement this trait to provide a consistent interface for the egress router.
9
10use anyhow::Result;
11use async_trait::async_trait;
12use bytes::Bytes;
13use std::collections::HashMap;
14
/// Request headers: string keys mapped to string values.
///
/// Passed alongside the payload on every request (e.g. for tracing or authentication).
pub type Headers = HashMap<String, String>;
17
18/// Unified interface for request plane clients
19///
20/// This trait abstracts over different transport mechanisms (TCP, HTTP, NATS)
21/// providing a consistent interface for sending requests and receiving acknowledgments.
22///
23/// # Design Principles
24///
25/// 1. **Transport Agnostic**: Implementations can be swapped without changing router logic
26/// 2. **Async by Default**: All operations are async to support high concurrency
27/// 3. **Headers Support**: All transports must support custom headers for tracing, etc.
28/// 4. **Health Checks**: Implementations should provide connection health information
29/// 5. **Error Handling**: All errors are wrapped in anyhow::Result for flexibility
30///
31/// # Example
32///
33/// ```ignore
34/// use dynamo_runtime::pipeline::network::egress::RequestPlaneClient;
35///
36/// async fn send_request(client: &dyn RequestPlaneClient) -> Result<()> {
37/// let mut headers = HashMap::new();
38/// headers.insert("x-request-id".to_string(), "123".to_string());
39///
40/// let response = client.send_request(
41/// "service-endpoint".to_string(),
42/// Bytes::from("payload"),
43/// headers,
44/// ).await?;
45///
46/// Ok(())
47/// }
48/// ```
49#[async_trait]
50pub trait RequestPlaneClient: Send + Sync {
51 /// Send a request to a specific address and wait for acknowledgment
52 ///
53 /// # Arguments
54 ///
55 /// * `address` - Transport-specific address:
56 /// - HTTP: `http://host:port/path`
57 /// - TCP: `host:port` or `tcp://host:port`
58 /// - NATS: `subject.name`
59 /// * `payload` - Request payload (encoded as bytes)
60 /// * `headers` - Custom headers for tracing, authentication, etc.
61 ///
62 /// # Returns
63 ///
64 /// Returns an acknowledgment response. Note that for streaming responses,
65 /// the actual response data comes over the TCP response plane, not through
66 /// this acknowledgment.
67 ///
68 /// # Errors
69 ///
70 /// Returns an error if:
71 /// - Connection to the endpoint fails
72 /// - Request times out
73 /// - Transport-specific errors occur (e.g., NATS server unavailable)
74 async fn send_request(
75 &self,
76 address: String,
77 payload: Bytes,
78 headers: Headers,
79 ) -> Result<Bytes>;
80
81 /// Get the transport name
82 ///
83 /// Returns a static string identifier for the transport type.
84 /// Used for logging and debugging.
85 ///
86 /// # Examples
87 ///
88 /// - `"tcp"` - Raw TCP transport
89 /// - `"http"` or `"http2"` - HTTP/2 transport
90 /// - `"nats"` - NATS messaging
91 fn transport_name(&self) -> &'static str;
92
93 /// Check connection health
94 ///
95 /// Returns `true` if the client is healthy and ready to send requests.
96 /// This is a lightweight check that doesn't perform actual network I/O.
97 ///
98 /// Implementations should return `false` if:
99 /// - Connection pool is exhausted
100 /// - Underlying transport is disconnected
101 /// - Client has been explicitly closed
102 fn is_healthy(&self) -> bool;
103
104 /// Get client statistics (optional)
105 ///
106 /// Returns runtime statistics about the client for monitoring and debugging.
107 /// Default implementation returns empty statistics.
108 fn stats(&self) -> ClientStats {
109 ClientStats::default()
110 }
111
112 /// Start a background task that eagerly warms connections for newly-discovered backends.
113 /// Only TCP overrides this; HTTP and NATS clients inherit the no-op.
114 fn start_warmup(
115 &self,
116 _instance_rx: tokio::sync::watch::Receiver<Vec<crate::component::Instance>>,
117 _cancel_token: tokio_util::sync::CancellationToken,
118 ) {
119 // No-op default
120 }
121
122 /// Close the client gracefully (optional)
123 ///
124 /// Implementations should:
125 /// - Close all active connections
126 /// - Wait for in-flight requests to complete (or timeout)
127 /// - Release all resources
128 ///
129 /// Default implementation does nothing.
130 async fn close(&self) -> Result<()> {
131 Ok(())
132 }
133}
134
/// Client runtime statistics
///
/// A snapshot of counters used for monitoring and debugging transport client performance.
/// Every field defaults to zero; an all-zero value means "no statistics available"
/// (see [`ClientStats::is_available`]).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ClientStats {
    /// Total number of requests sent
    pub requests_sent: u64,

    /// Total number of successful responses
    pub responses_received: u64,

    /// Total number of errors
    pub errors: u64,

    /// Total bytes sent
    pub bytes_sent: u64,

    /// Total bytes received
    pub bytes_received: u64,

    /// Number of active connections (for connection-pooled transports)
    pub active_connections: usize,

    /// Number of idle connections in pool
    pub idle_connections: usize,

    /// Average request latency in microseconds (0 if not available)
    pub avg_latency_us: u64,
}

impl ClientStats {
    /// Create new empty statistics
    ///
    /// Equivalent to [`ClientStats::default`]: every counter starts at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Check if statistics are available (non-zero)
    ///
    /// Returns `true` as soon as *any* field differs from its zero default, and `false`
    /// only for an all-zero snapshot.
    ///
    /// Comparing against the default (rather than probing a couple of hand-picked fields)
    /// means a snapshot that so far only reports, say, idle pooled connections is still
    /// recognised as carrying data, and newly added fields are covered automatically.
    pub fn is_available(&self) -> bool {
        *self != Self::default()
    }
}
176
#[cfg(test)]
mod tests {
    use super::*;

    /// Default statistics have zeroed request/response counters and report as unavailable.
    #[test]
    fn test_client_stats_default() {
        let fresh = ClientStats::default();

        assert_eq!((fresh.requests_sent, fresh.responses_received), (0, 0));
        assert!(!fresh.is_available());
    }

    /// Statistics become available once a request was sent or a connection is active.
    #[test]
    fn test_client_stats_is_available() {
        // Nothing recorded yet.
        let untouched = ClientStats::default();
        assert!(!untouched.is_available());

        // A single sent request is enough.
        let after_request = ClientStats {
            requests_sent: 1,
            ..ClientStats::default()
        };
        assert!(after_request.is_available());

        // So is a single active connection.
        let with_connection = ClientStats {
            active_connections: 1,
            ..ClientStats::default()
        };
        assert!(with_connection.is_available());
    }
}