Skip to main content

dynamo_runtime/pipeline/network/egress/
unified_client.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Unified Request Plane Client Interface
5//!
6//! This module defines a transport-agnostic interface for sending requests
7//! in the request plane. All transport implementations (TCP, HTTP, NATS)
8//! implement this trait to provide a consistent interface for the egress router.
9
10use anyhow::Result;
11use async_trait::async_trait;
12use bytes::Bytes;
13use std::collections::HashMap;
14
15/// Type alias for request headers
16pub type Headers = HashMap<String, String>;
17
18/// Unified interface for request plane clients
19///
20/// This trait abstracts over different transport mechanisms (TCP, HTTP, NATS)
21/// providing a consistent interface for sending requests and receiving acknowledgments.
22///
23/// # Design Principles
24///
25/// 1. **Transport Agnostic**: Implementations can be swapped without changing router logic
26/// 2. **Async by Default**: All operations are async to support high concurrency
27/// 3. **Headers Support**: All transports must support custom headers for tracing, etc.
28/// 4. **Health Checks**: Implementations should provide connection health information
29/// 5. **Error Handling**: All errors are wrapped in anyhow::Result for flexibility
30///
31/// # Example
32///
33/// ```ignore
34/// use dynamo_runtime::pipeline::network::egress::RequestPlaneClient;
35///
36/// async fn send_request(client: &dyn RequestPlaneClient) -> Result<()> {
37///     let mut headers = HashMap::new();
38///     headers.insert("x-request-id".to_string(), "123".to_string());
39///
40///     let response = client.send_request(
41///         "service-endpoint".to_string(),
42///         Bytes::from("payload"),
43///         headers,
44///     ).await?;
45///
46///     Ok(())
47/// }
48/// ```
49#[async_trait]
50pub trait RequestPlaneClient: Send + Sync {
51    /// Send a request to a specific address and wait for acknowledgment
52    ///
53    /// # Arguments
54    ///
55    /// * `address` - Transport-specific address:
56    ///   - HTTP: `http://host:port/path`
57    ///   - TCP: `host:port` or `tcp://host:port`
58    ///   - NATS: `subject.name`
59    /// * `payload` - Request payload (encoded as bytes)
60    /// * `headers` - Custom headers for tracing, authentication, etc.
61    ///
62    /// # Returns
63    ///
64    /// Returns an acknowledgment response. Note that for streaming responses,
65    /// the actual response data comes over the TCP response plane, not through
66    /// this acknowledgment.
67    ///
68    /// # Errors
69    ///
70    /// Returns an error if:
71    /// - Connection to the endpoint fails
72    /// - Request times out
73    /// - Transport-specific errors occur (e.g., NATS server unavailable)
74    async fn send_request(
75        &self,
76        address: String,
77        payload: Bytes,
78        headers: Headers,
79    ) -> Result<Bytes>;
80
81    /// Get the transport name
82    ///
83    /// Returns a static string identifier for the transport type.
84    /// Used for logging and debugging.
85    ///
86    /// # Examples
87    ///
88    /// - `"tcp"` - Raw TCP transport
89    /// - `"http"` or `"http2"` - HTTP/2 transport
90    /// - `"nats"` - NATS messaging
91    fn transport_name(&self) -> &'static str;
92
93    /// Check connection health
94    ///
95    /// Returns `true` if the client is healthy and ready to send requests.
96    /// This is a lightweight check that doesn't perform actual network I/O.
97    ///
98    /// Implementations should return `false` if:
99    /// - Connection pool is exhausted
100    /// - Underlying transport is disconnected
101    /// - Client has been explicitly closed
102    fn is_healthy(&self) -> bool;
103
104    /// Get client statistics (optional)
105    ///
106    /// Returns runtime statistics about the client for monitoring and debugging.
107    /// Default implementation returns empty statistics.
108    fn stats(&self) -> ClientStats {
109        ClientStats::default()
110    }
111
112    /// Close the client gracefully (optional)
113    ///
114    /// Implementations should:
115    /// - Close all active connections
116    /// - Wait for in-flight requests to complete (or timeout)
117    /// - Release all resources
118    ///
119    /// Default implementation does nothing.
120    async fn close(&self) -> Result<()> {
121        Ok(())
122    }
123}
124
125/// Client runtime statistics
126///
127/// Used for monitoring and debugging transport client performance.
128#[derive(Debug, Clone, Default)]
129pub struct ClientStats {
130    /// Total number of requests sent
131    pub requests_sent: u64,
132
133    /// Total number of successful responses
134    pub responses_received: u64,
135
136    /// Total number of errors
137    pub errors: u64,
138
139    /// Total bytes sent
140    pub bytes_sent: u64,
141
142    /// Total bytes received
143    pub bytes_received: u64,
144
145    /// Number of active connections (for connection-pooled transports)
146    pub active_connections: usize,
147
148    /// Number of idle connections in pool
149    pub idle_connections: usize,
150
151    /// Average request latency in microseconds (0 if not available)
152    pub avg_latency_us: u64,
153}
154
155impl ClientStats {
156    /// Create new empty statistics
157    pub fn new() -> Self {
158        Self::default()
159    }
160
161    /// Check if statistics are available (non-zero)
162    pub fn is_available(&self) -> bool {
163        self.requests_sent > 0 || self.active_connections > 0
164    }
165}
166
167#[cfg(test)]
168mod tests {
169    use super::*;
170
171    #[test]
172    fn test_client_stats_default() {
173        let stats = ClientStats::default();
174        assert_eq!(stats.requests_sent, 0);
175        assert_eq!(stats.responses_received, 0);
176        assert!(!stats.is_available());
177    }
178
179    #[test]
180    fn test_client_stats_is_available() {
181        let mut stats = ClientStats::default();
182        assert!(!stats.is_available());
183
184        stats.requests_sent = 1;
185        assert!(stats.is_available());
186
187        let stats2 = ClientStats {
188            active_connections: 1,
189            ..Default::default()
190        };
191        assert!(stats2.is_available());
192    }
193}