Skip to main content

dynamo_runtime/component/
endpoint.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use anyhow::Result;
7use derive_builder::Builder;
8use derive_getters::Dissolve;
9use educe::Educe;
10use tokio_util::sync::CancellationToken;
11
12use crate::{
13    component::{DeviceType, Endpoint, Instance, TransportType},
14    distributed::RequestPlaneMode,
15    pipeline::network::{PushWorkHandler, ingress::push_endpoint::PushEndpoint},
16    protocols::EndpointId,
17    traits::DistributedRuntimeProvider,
18    transports::nats,
19};
20
21fn endpoint_device_type() -> Option<DeviceType> {
22    // Common CUDA masks that explicitly disable GPU visibility.
23    if std::env::var("CUDA_VISIBLE_DEVICES")
24        .ok()
25        .map(|v| {
26            let l = v.trim().to_ascii_lowercase();
27            l.is_empty() || l == "-1" || l == "none" || l == "void"
28        })
29        .unwrap_or(false)
30    {
31        return Some(DeviceType::Cpu);
32    }
33
34    // Container runtimes often use NVIDIA_VISIBLE_DEVICES to gate GPU visibility.
35    if std::env::var("NVIDIA_VISIBLE_DEVICES")
36        .ok()
37        .map(|v| {
38            let l = v.trim().to_ascii_lowercase();
39            l == "none" || l == "void"
40        })
41        .unwrap_or(false)
42    {
43        return Some(DeviceType::Cpu);
44    }
45
46    // Default: no explicit CPU override means this endpoint is CUDA-capable.
47    Some(DeviceType::Cuda)
48}
49
/// Configuration for serving an [`Endpoint`] with a request handler.
///
/// Constructed through the generated `EndpointConfigBuilder` (starting from
/// `EndpointConfigBuilder::from_endpoint`). The builder's build function is private
/// (`build_internal`); callers finish with `EndpointConfigBuilder::start`, which builds this
/// struct and consumes it.
///
/// NOTE: `start` destructures this struct with the `Dissolve`-generated `dissolve()`, whose
/// tuple follows field declaration order, so reordering fields here changes that tuple.
#[derive(Educe, Builder, Dissolve)]
#[educe(Debug)]
#[builder(pattern = "owned", build_fn(private, name = "build_internal"))]
pub struct EndpointConfig {
    /// The endpoint being served. Its builder setter is private; the endpoint is supplied
    /// via `EndpointConfigBuilder::from_endpoint`.
    #[builder(private)]
    endpoint: Endpoint,

    /// Endpoint handler
    ///
    /// Required (no builder default). Excluded from `Debug` output.
    #[educe(Debug(ignore))]
    handler: Arc<dyn PushWorkHandler>,

    /// Additional labels for metrics
    ///
    /// `(key, value)` pairs handed to the handler's `add_metrics` on start.
    /// Defaults to `None`; the setter accepts anything convertible via `Into`.
    #[builder(default, setter(into))]
    metrics_labels: Option<Vec<(String, String)>>,

    /// Whether to wait for inflight requests to complete during shutdown
    ///
    /// Defaults to `true`. When set, the endpoint is registered with the runtime's graceful
    /// shutdown tracker on start.
    #[builder(default = "true")]
    graceful_shutdown: bool,

    /// Health check payload for this endpoint
    /// This payload will be sent to the endpoint during health checks
    /// to verify it's responding properly
    ///
    /// Defaults to `None`; the setter takes the value directly (`strip_option`).
    /// Excluded from `Debug` output.
    #[educe(Debug(ignore))]
    #[builder(default, setter(into, strip_option))]
    health_check_payload: Option<serde_json::Value>,
}
76
77impl EndpointConfigBuilder {
78    pub(crate) fn from_endpoint(endpoint: Endpoint) -> Self {
79        Self::default().endpoint(endpoint)
80    }
81
82    /// Register an async engine in the local endpoint registry for direct in-process calls
83    pub fn register_local_engine(
84        self,
85        engine: crate::local_endpoint_registry::LocalAsyncEngine,
86    ) -> Result<Self> {
87        if let Some(endpoint) = &self.endpoint {
88            let registry = endpoint.drt().local_endpoint_registry();
89            registry.register(endpoint.name.clone(), engine);
90            tracing::debug!(
91                "Registered engine for endpoint '{}' in local registry",
92                endpoint.name
93            );
94        }
95        Ok(self)
96    }
97
98    pub async fn start(self) -> Result<()> {
99        let (endpoint, handler, metrics_labels, graceful_shutdown, health_check_payload) =
100            self.build_internal()?.dissolve();
101        let connection_id = endpoint.drt().connection_id();
102        let endpoint_id = endpoint.id();
103
104        tracing::debug!("Starting endpoint: {endpoint_id}");
105
106        let metrics_labels: Option<Vec<(&str, &str)>> = metrics_labels
107            .as_ref()
108            .map(|v| v.iter().map(|(k, v)| (k.as_str(), v.as_str())).collect());
109        // Add metrics to the handler. The endpoint provides additional information to the handler.
110        handler.add_metrics(&endpoint, metrics_labels.as_deref())?;
111
112        // This creates a child token of the runtime's endpoint_shutdown_token. That token is
113        // cancelled first as part of graceful shutdown. See Runtime::shutdown.
114        let endpoint_shutdown_token = endpoint.drt().child_token();
115
116        let system_health = endpoint.drt().system_health();
117
118        // Register with graceful shutdown tracker if needed
119        if graceful_shutdown {
120            tracing::debug!(
121                "Registering endpoint '{}' with graceful shutdown tracker",
122                endpoint.name
123            );
124            let tracker = endpoint.drt().graceful_shutdown_tracker();
125            tracker.register_endpoint();
126        } else {
127            tracing::debug!("Endpoint '{}' has graceful_shutdown=false", endpoint.name);
128        }
129
130        // Launch endpoint based on request plane mode
131        let tracker_clone = if graceful_shutdown {
132            Some(endpoint.drt().graceful_shutdown_tracker())
133        } else {
134            None
135        };
136
137        // Create clones for the async closure
138        let namespace_name_for_task = endpoint_id.namespace.clone();
139        let component_name_for_task = endpoint_id.component.clone();
140        let endpoint_name_for_task = endpoint_id.name.clone();
141
142        // Get the unified request plane server
143        let server = endpoint.drt().request_plane_server().await?;
144
145        // Register health check target in SystemHealth if provided
146        if let Some(health_check_payload) = &health_check_payload {
147            if system_health.lock().health_check_enabled()
148                && endpoint
149                    .drt()
150                    .local_endpoint_registry()
151                    .get(&endpoint.name)
152                    .is_none()
153            {
154                anyhow::bail!(
155                    "Endpoint '{}' has a health_check_payload and canary is enabled, \
156                     but no local engine is registered. Call .register_local_engine() \
157                     before .start() so the canary health check can function.",
158                    endpoint.name
159                );
160            }
161
162            // Build transport based on request plane mode
163            let transport = build_transport_type(&endpoint, &endpoint_id, connection_id).await?;
164
165            let instance = Instance {
166                component: endpoint_id.component.clone(),
167                endpoint: endpoint_id.name.clone(),
168                namespace: endpoint_id.namespace.clone(),
169                instance_id: connection_id,
170                transport,
171                device_type: endpoint_device_type(),
172            };
173            tracing::debug!(endpoint_name = %endpoint.name, "Registering endpoint health check target");
174            let guard = system_health.lock();
175            guard.register_health_check_target(
176                &endpoint.name,
177                instance,
178                health_check_payload.clone(),
179            );
180            if let Some(notifier) = guard.get_endpoint_health_check_notifier(&endpoint.name) {
181                handler.set_endpoint_health_check_notifier(notifier)?;
182            }
183        }
184
185        tracing::debug!(
186            endpoint = %endpoint_name_for_task,
187            transport = server.transport_name(),
188            "Registering endpoint with request plane server"
189        );
190
191        // Register endpoint with the server (unified interface)
192        server
193            .register_endpoint(
194                endpoint_name_for_task.clone(),
195                handler,
196                connection_id,
197                namespace_name_for_task.clone(),
198                component_name_for_task.clone(),
199                system_health.clone(),
200            )
201            .await?;
202
203        // Create cleanup task that unregisters on cancellation
204        let endpoint_name_for_cleanup = endpoint_name_for_task.clone();
205        let server_for_cleanup = server.clone();
206        let cancel_token_for_cleanup = endpoint_shutdown_token.clone();
207
208        let task: tokio::task::JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
209            cancel_token_for_cleanup.cancelled().await;
210
211            tracing::debug!(
212                endpoint = %endpoint_name_for_cleanup,
213                "Unregistering endpoint from request plane server"
214            );
215
216            // Unregister from server
217            if let Err(e) = server_for_cleanup
218                .unregister_endpoint(&endpoint_name_for_cleanup)
219                .await
220            {
221                tracing::warn!(
222                    endpoint = %endpoint_name_for_cleanup,
223                    error = %e,
224                    "Failed to unregister endpoint"
225                );
226            }
227
228            // Unregister from graceful shutdown tracker
229            if let Some(tracker) = tracker_clone {
230                tracing::debug!("Unregister endpoint from graceful shutdown tracker");
231                tracker.unregister_endpoint();
232            }
233
234            anyhow::Ok(())
235        });
236
237        // Register this endpoint instance in the discovery plane
238        // The discovery interface abstracts storage backend (etcd, k8s, etc) and provides
239        // consistent registration/discovery across the system.
240        let discovery = endpoint.drt().discovery();
241
242        // Build transport for discovery service based on request plane mode
243        let transport = build_transport_type(&endpoint, &endpoint_id, connection_id).await?;
244
245        let discovery_spec = crate::discovery::DiscoverySpec::Endpoint {
246            namespace: endpoint_id.namespace.clone(),
247            component: endpoint_id.component.clone(),
248            endpoint: endpoint_id.name.clone(),
249            transport,
250            device_type: endpoint_device_type(),
251        };
252
253        if let Err(e) = discovery.register(discovery_spec).await {
254            tracing::error!(
255                %endpoint_id,
256                error = %e,
257                "Unable to register service for discovery"
258            );
259            endpoint_shutdown_token.cancel();
260            anyhow::bail!(
261                "Unable to register service for discovery. Check discovery service status"
262            );
263        }
264
265        task.await??;
266
267        Ok(())
268    }
269}
270
271/// Build transport type based on request plane mode
272///
273/// This function handles both health check and discovery transport building.
274/// All transport modes use consistent addressing:
275/// - HTTP: Uses full URL path including endpoint name (e.g., http://host:port/v1/rpc/endpoint_name)
276/// - TCP: Includes instance_id and endpoint name for routing (e.g., host:port/instance_id_hex/endpoint_name)
277/// - NATS: Uses subject-based addressing (unique per endpoint)
278///
279/// # Errors
280/// Returns an error if TCP mode is used but the TCP server hasn't been started yet.
281fn build_transport_type_inner(
282    mode: RequestPlaneMode,
283    endpoint_id: &EndpointId,
284    connection_id: u64,
285) -> Result<TransportType> {
286    match mode {
287        RequestPlaneMode::Http => {
288            let http_host = crate::utils::get_http_rpc_host_from_env();
289            // If a fixed port is explicitly configured, use it directly.
290            // Otherwise, use the actual bound port (set by HTTP server after binding when port 0 is used).
291            let http_port = std::env::var("DYN_HTTP_RPC_PORT")
292                .ok()
293                .and_then(|p| p.parse::<u16>().ok())
294                .filter(|&p| p != 0)
295                .unwrap_or(crate::pipeline::network::manager::get_actual_http_rpc_port()?);
296            let rpc_root =
297                std::env::var("DYN_HTTP_RPC_ROOT_PATH").unwrap_or_else(|_| "/v1/rpc".to_string());
298
299            let http_endpoint = format!(
300                "http://{http_host}:{http_port}{rpc_root}/{}",
301                endpoint_id.name
302            );
303
304            Ok(TransportType::Http(http_endpoint))
305        }
306        RequestPlaneMode::Tcp => {
307            let tcp_host = crate::utils::get_tcp_rpc_host_from_env();
308            // If a fixed port is explicitly configured, use it directly (no init ordering dependency).
309            // Otherwise, use the actual bound port (set by TCP server after binding when port 0 is used).
310            let tcp_port = std::env::var("DYN_TCP_RPC_PORT")
311                .ok()
312                .and_then(|p| p.parse::<u16>().ok())
313                .filter(|&p| p != 0)
314                .unwrap_or(crate::pipeline::network::manager::get_actual_tcp_rpc_port()?);
315
316            // Include instance_id and endpoint name for proper TCP routing.
317            // Format: host:port/instance_id_hex/endpoint_name
318            // This ensures each worker has a unique routing key when multiple workers
319            // share the same TCP server (e.g., --num-workers > 1).
320            let tcp_endpoint = format!(
321                "{}:{}/{:x}/{}",
322                tcp_host, tcp_port, connection_id, endpoint_id.name
323            );
324
325            Ok(TransportType::Tcp(tcp_endpoint))
326        }
327        RequestPlaneMode::Nats => Ok(TransportType::Nats(nats::instance_subject(
328            endpoint_id,
329            connection_id,
330        ))),
331    }
332}
333
334/// Build transport type, ensuring TCP server is initialized when needed.
335///
336/// In TCP mode with an OS-assigned port (`DYN_TCP_RPC_PORT` unset or invalid), the server must bind
337/// before we can construct a correct transport address. This helper ensures that initialization
338/// occurs, then delegates to the internal builder.
339pub async fn build_transport_type(
340    endpoint: &Endpoint,
341    endpoint_id: &EndpointId,
342    connection_id: u64,
343) -> Result<TransportType> {
344    let mode = endpoint.drt().request_plane();
345
346    // For TCP and HTTP with OS-assigned ports, we must ensure the server is initialized
347    // (bound to a port) before we can construct a correct transport address.
348    let has_fixed_port = match mode {
349        RequestPlaneMode::Tcp => std::env::var("DYN_TCP_RPC_PORT")
350            .ok()
351            .and_then(|p| p.parse::<u16>().ok())
352            .filter(|&p| p != 0)
353            .is_some(),
354        RequestPlaneMode::Http => std::env::var("DYN_HTTP_RPC_PORT")
355            .ok()
356            .and_then(|p| p.parse::<u16>().ok())
357            .filter(|&p| p != 0)
358            .is_some(),
359        RequestPlaneMode::Nats => true, // NATS doesn't need port init
360    };
361
362    if !has_fixed_port {
363        // Ensure request plane server is initialized before building transport.
364        let _ = endpoint.drt().request_plane_server().await?;
365    }
366
367    build_transport_type_inner(mode, endpoint_id, connection_id)
368}
369
370impl Endpoint {
371    /// Unregister this endpoint instance from discovery.
372    ///
373    /// This removes the endpoint from the instances bucket, preventing the router
374    /// from sending requests to this worker. Use this when a worker is sleeping
375    /// and should not receive any requests.
376    pub async fn unregister_endpoint_instance(&self) -> anyhow::Result<()> {
377        let drt = self.drt();
378        let instance_id = drt.connection_id();
379        let endpoint_id = self.id();
380
381        // Get the transport type for the endpoint
382        let transport = build_transport_type(self, &endpoint_id, instance_id).await?;
383
384        let instance = crate::discovery::DiscoveryInstance::Endpoint(Instance {
385            namespace: endpoint_id.namespace,
386            component: endpoint_id.component,
387            endpoint: endpoint_id.name,
388            instance_id,
389            transport,
390            device_type: endpoint_device_type(),
391        });
392
393        let discovery = drt.discovery();
394        if let Err(e) = discovery.unregister(instance).await {
395            let endpoint_id = self.id();
396            tracing::error!(
397                %endpoint_id,
398                error = %e,
399                "Unable to unregister endpoint instance from discovery"
400            );
401            anyhow::bail!(
402                "Unable to unregister endpoint instance from discovery. Check discovery service status"
403            );
404        }
405
406        tracing::info!(
407            instance_id = instance_id,
408            "Successfully unregistered endpoint instance from discovery - worker removed from routing pool"
409        );
410
411        Ok(())
412    }
413
414    /// Re-register this endpoint instance to discovery.
415    ///
416    /// This adds the endpoint back to the instances bucket, allowing the router
417    /// to send requests to this worker again. Use this when a worker wakes up
418    /// and should start receiving requests.
419    pub async fn register_endpoint_instance(&self) -> anyhow::Result<()> {
420        let drt = self.drt();
421        let instance_id = drt.connection_id();
422        let endpoint_id = self.id();
423
424        // Get the transport type for the endpoint
425        let transport = build_transport_type(self, &endpoint_id, instance_id).await?;
426
427        let spec = crate::discovery::DiscoverySpec::Endpoint {
428            namespace: endpoint_id.namespace,
429            component: endpoint_id.component,
430            endpoint: endpoint_id.name,
431            transport,
432            device_type: endpoint_device_type(),
433        };
434
435        let discovery = drt.discovery();
436        if let Err(e) = discovery.register(spec).await {
437            let endpoint_id = self.id();
438            tracing::error!(
439                %endpoint_id,
440                error = %e,
441                "Unable to re-register endpoint instance to discovery"
442            );
443            anyhow::bail!(
444                "Unable to re-register endpoint instance to discovery. Check discovery service status"
445            );
446        }
447
448        tracing::info!(
449            instance_id = instance_id,
450            "Successfully re-registered endpoint instance to discovery - worker added back to routing pool"
451        );
452
453        Ok(())
454    }
455}