dynamo_runtime/pipeline/network/egress/unified_client.rs
1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Unified Request Plane Client Interface
5//!
6//! This module defines a transport-agnostic interface for sending requests
7//! in the request plane. All transport implementations (TCP, HTTP, NATS)
8//! implement this trait to provide a consistent interface for the egress router.
9
10use anyhow::Result;
11use async_trait::async_trait;
12use bytes::Bytes;
13use std::collections::HashMap;
14
/// String key/value pairs attached to a request (e.g. tracing or authentication metadata).
pub type Headers = HashMap<String, String>;
17
18/// Unified interface for request plane clients
19///
20/// This trait abstracts over different transport mechanisms (TCP, HTTP, NATS)
21/// providing a consistent interface for sending requests and receiving acknowledgments.
22///
23/// # Design Principles
24///
25/// 1. **Transport Agnostic**: Implementations can be swapped without changing router logic
26/// 2. **Async by Default**: All operations are async to support high concurrency
27/// 3. **Headers Support**: All transports must support custom headers for tracing, etc.
28/// 4. **Health Checks**: Implementations should provide connection health information
29/// 5. **Error Handling**: All errors are wrapped in anyhow::Result for flexibility
30///
31/// # Example
32///
33/// ```ignore
34/// use dynamo_runtime::pipeline::network::egress::RequestPlaneClient;
35///
36/// async fn send_request(client: &dyn RequestPlaneClient) -> Result<()> {
37/// let mut headers = HashMap::new();
38/// headers.insert("x-request-id".to_string(), "123".to_string());
39///
40/// let response = client.send_request(
41/// "service-endpoint".to_string(),
42/// Bytes::from("payload"),
43/// headers,
44/// ).await?;
45///
46/// Ok(())
47/// }
48/// ```
49#[async_trait]
50pub trait RequestPlaneClient: Send + Sync {
51 /// Send a request to a specific address and wait for acknowledgment
52 ///
53 /// # Arguments
54 ///
55 /// * `address` - Transport-specific address:
56 /// - HTTP: `http://host:port/path`
57 /// - TCP: `host:port` or `tcp://host:port`
58 /// - NATS: `subject.name`
59 /// * `payload` - Request payload (encoded as bytes)
60 /// * `headers` - Custom headers for tracing, authentication, etc.
61 ///
62 /// # Returns
63 ///
64 /// Returns an acknowledgment response. Note that for streaming responses,
65 /// the actual response data comes over the TCP response plane, not through
66 /// this acknowledgment.
67 ///
68 /// # Errors
69 ///
70 /// Returns an error if:
71 /// - Connection to the endpoint fails
72 /// - Request times out
73 /// - Transport-specific errors occur (e.g., NATS server unavailable)
74 async fn send_request(
75 &self,
76 address: String,
77 payload: Bytes,
78 headers: Headers,
79 ) -> Result<Bytes>;
80
81 /// Get the transport name
82 ///
83 /// Returns a static string identifier for the transport type.
84 /// Used for logging and debugging.
85 ///
86 /// # Examples
87 ///
88 /// - `"tcp"` - Raw TCP transport
89 /// - `"http"` or `"http2"` - HTTP/2 transport
90 /// - `"nats"` - NATS messaging
91 fn transport_name(&self) -> &'static str;
92
93 /// Check connection health
94 ///
95 /// Returns `true` if the client is healthy and ready to send requests.
96 /// This is a lightweight check that doesn't perform actual network I/O.
97 ///
98 /// Implementations should return `false` if:
99 /// - Connection pool is exhausted
100 /// - Underlying transport is disconnected
101 /// - Client has been explicitly closed
102 fn is_healthy(&self) -> bool;
103
104 /// Get client statistics (optional)
105 ///
106 /// Returns runtime statistics about the client for monitoring and debugging.
107 /// Default implementation returns empty statistics.
108 fn stats(&self) -> ClientStats {
109 ClientStats::default()
110 }
111
112 /// Close the client gracefully (optional)
113 ///
114 /// Implementations should:
115 /// - Close all active connections
116 /// - Wait for in-flight requests to complete (or timeout)
117 /// - Release all resources
118 ///
119 /// Default implementation does nothing.
120 async fn close(&self) -> Result<()> {
121 Ok(())
122 }
123}
124
/// Runtime counters describing a transport client's activity.
///
/// Used for monitoring and debugging transport client performance. Every
/// field defaults to zero, which represents "nothing recorded".
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ClientStats {
    /// Total number of requests sent
    pub requests_sent: u64,

    /// Total number of successful responses
    pub responses_received: u64,

    /// Total number of errors
    pub errors: u64,

    /// Total bytes sent
    pub bytes_sent: u64,

    /// Total bytes received
    pub bytes_received: u64,

    /// Number of active connections (for connection-pooled transports)
    pub active_connections: usize,

    /// Number of idle connections in pool
    pub idle_connections: usize,

    /// Average request latency in microseconds (0 if not available)
    pub avg_latency_us: u64,
}

impl ClientStats {
    /// Create empty statistics with every counter set to zero.
    ///
    /// Equivalent to [`ClientStats::default`].
    pub fn new() -> Self {
        Self::default()
    }

    /// Check whether any statistic has been recorded (i.e. is non-zero).
    ///
    /// Returns `true` as soon as at least one field differs from its zero
    /// default, and `false` for a freshly created value.
    pub fn is_available(&self) -> bool {
        // Comparing against the all-zero default covers every field, so stats
        // that only carry e.g. `errors` or `idle_connections` are not
        // misreported as empty, and fields added later are included
        // automatically.
        *self != Self::default()
    }
}
166
#[cfg(test)]
mod tests {
    use super::*;

    /// A defaulted `ClientStats` has zeroed request/response counters and
    /// reports that no statistics are available.
    #[test]
    fn test_client_stats_default() {
        let fresh = ClientStats::default();

        assert!(!fresh.is_available());
        assert_eq!(fresh.requests_sent, 0);
        assert_eq!(fresh.responses_received, 0);
    }

    /// `is_available` becomes true once a request has been sent or a
    /// connection is active.
    #[test]
    fn test_client_stats_is_available() {
        let untouched = ClientStats::default();
        assert!(!untouched.is_available());

        let after_request = ClientStats {
            requests_sent: 1,
            ..ClientStats::default()
        };
        assert!(after_request.is_available());

        let with_connection = ClientStats {
            active_connections: 1,
            ..ClientStats::default()
        };
        assert!(with_connection.is_available());
    }
}