Skip to main content

dynamo_runtime/component/
endpoint.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use anyhow::Result;
7use derive_builder::Builder;
8use derive_getters::Dissolve;
9use educe::Educe;
10use tokio_util::sync::CancellationToken;
11
12use crate::{
13    component::{DeviceType, Endpoint, Instance, TransportType},
14    distributed::RequestPlaneMode,
15    pipeline::network::{PushWorkHandler, ingress::push_endpoint::PushEndpoint},
16    protocols::EndpointId,
17    traits::DistributedRuntimeProvider,
18    transports::nats,
19};
20
21fn endpoint_device_type() -> Option<DeviceType> {
22    // Common CUDA masks that explicitly disable GPU visibility.
23    if std::env::var("CUDA_VISIBLE_DEVICES")
24        .ok()
25        .map(|v| {
26            let l = v.trim().to_ascii_lowercase();
27            l.is_empty() || l == "-1" || l == "none" || l == "void"
28        })
29        .unwrap_or(false)
30    {
31        return Some(DeviceType::Cpu);
32    }
33
34    // Container runtimes often use NVIDIA_VISIBLE_DEVICES to gate GPU visibility.
35    if std::env::var("NVIDIA_VISIBLE_DEVICES")
36        .ok()
37        .map(|v| {
38            let l = v.trim().to_ascii_lowercase();
39            l == "none" || l == "void"
40        })
41        .unwrap_or(false)
42    {
43        return Some(DeviceType::Cpu);
44    }
45
46    // Default: no explicit CPU override means this endpoint is CUDA-capable.
47    Some(DeviceType::Cuda)
48}
49
/// Configuration for starting an endpoint, produced by `EndpointConfigBuilder`.
///
/// The builder uses the owned pattern, and its generated build function is private and
/// renamed to `build_internal`. `EndpointConfigBuilder::start` consumes the built value via
/// `dissolve()` and destructures the resulting tuple positionally in field declaration
/// order, so keep the field order in sync with that destructuring.
50#[derive(Educe, Builder, Dissolve)]
51#[educe(Debug)]
52#[builder(pattern = "owned", build_fn(private, name = "build_internal"))]
53pub struct EndpointConfig {
    /// The endpoint to start. Its setter is private; builders receive it through
    /// `EndpointConfigBuilder::from_endpoint`.
54    #[builder(private)]
55    endpoint: Endpoint,
56
57    /// Endpoint handler.
    /// `start` attaches metrics to it and registers it with the request plane server.
    /// Omitted from `Debug` output.
58    #[educe(Debug(ignore))]
59    handler: Arc<dyn PushWorkHandler>,
60
61    /// Additional labels for metrics.
    /// Defaults to `None`. `start` passes them to the handler's `add_metrics` as
    /// `(&str, &str)` pairs.
62    #[builder(default, setter(into))]
63    metrics_labels: Option<Vec<(String, String)>>,
64
65    /// Whether to wait for inflight requests to complete during shutdown.
    /// Defaults to `true`. When enabled, `start` registers the endpoint with the runtime's
    /// graceful shutdown tracker and unregisters it once the endpoint shuts down.
66    #[builder(default = "true")]
67    graceful_shutdown: bool,
68
69    /// Health check payload for this endpoint.
70    /// This payload will be sent to the endpoint during health checks
71    /// to verify it's responding properly.
    /// Defaults to `None`. When set, `start` registers a health check target for this
    /// endpoint with the runtime's system health state. Omitted from `Debug` output.
72    #[educe(Debug(ignore))]
73    #[builder(default, setter(into, strip_option))]
74    health_check_payload: Option<serde_json::Value>,
75}
76
77impl EndpointConfigBuilder {
78    pub(crate) fn from_endpoint(endpoint: Endpoint) -> Self {
79        Self::default().endpoint(endpoint)
80    }
81
82    /// Register an async engine in the local endpoint registry for direct in-process calls
83    pub fn register_local_engine(
84        self,
85        engine: crate::local_endpoint_registry::LocalAsyncEngine,
86    ) -> Result<Self> {
87        if let Some(endpoint) = &self.endpoint {
88            let registry = endpoint.drt().local_endpoint_registry();
89            registry.register(endpoint.name.clone(), engine);
90            tracing::debug!(
91                "Registered engine for endpoint '{}' in local registry",
92                endpoint.name
93            );
94        }
95        Ok(self)
96    }
97
98    pub async fn start(self) -> Result<()> {
99        let (endpoint, handler, metrics_labels, graceful_shutdown, health_check_payload) =
100            self.build_internal()?.dissolve();
101        let connection_id = endpoint.drt().connection_id();
102        let endpoint_id = endpoint.id();
103
104        tracing::debug!("Starting endpoint: {endpoint_id}");
105
106        let metrics_labels: Option<Vec<(&str, &str)>> = metrics_labels
107            .as_ref()
108            .map(|v| v.iter().map(|(k, v)| (k.as_str(), v.as_str())).collect());
109        // Add metrics to the handler. The endpoint provides additional information to the handler.
110        handler.add_metrics(&endpoint, metrics_labels.as_deref())?;
111
112        // This creates a child token of the runtime's endpoint_shutdown_token. That token is
113        // cancelled first as part of graceful shutdown. See Runtime::shutdown.
114        let endpoint_shutdown_token = endpoint.drt().child_token();
115
116        let system_health = endpoint.drt().system_health();
117
118        // Register with graceful shutdown tracker if needed
119        if graceful_shutdown {
120            tracing::debug!(
121                "Registering endpoint '{}' with graceful shutdown tracker",
122                endpoint.name
123            );
124            let tracker = endpoint.drt().graceful_shutdown_tracker();
125            tracker.register_endpoint();
126        } else {
127            tracing::debug!("Endpoint '{}' has graceful_shutdown=false", endpoint.name);
128        }
129
130        // Launch endpoint based on request plane mode
131        let tracker_clone = if graceful_shutdown {
132            Some(endpoint.drt().graceful_shutdown_tracker())
133        } else {
134            None
135        };
136
137        // Create clones for the async closure
138        let namespace_name_for_task = endpoint_id.namespace.clone();
139        let component_name_for_task = endpoint_id.component.clone();
140        let endpoint_name_for_task = endpoint_id.name.clone();
141
142        // Get the unified request plane server
143        let server = endpoint.drt().request_plane_server().await?;
144
145        // Register health check target in SystemHealth if provided
146        if let Some(health_check_payload) = &health_check_payload {
147            // Build transport based on request plane mode
148            let transport = build_transport_type(&endpoint, &endpoint_id, connection_id).await?;
149
150            let instance = Instance {
151                component: endpoint_id.component.clone(),
152                endpoint: endpoint_id.name.clone(),
153                namespace: endpoint_id.namespace.clone(),
154                instance_id: connection_id,
155                transport,
156                device_type: endpoint_device_type(),
157            };
158            tracing::debug!(endpoint_name = %endpoint.name, "Registering endpoint health check target");
159            let guard = system_health.lock();
160            guard.register_health_check_target(
161                &endpoint.name,
162                instance,
163                health_check_payload.clone(),
164            );
165            if let Some(notifier) = guard.get_endpoint_health_check_notifier(&endpoint.name) {
166                handler.set_endpoint_health_check_notifier(notifier)?;
167            }
168        }
169
170        tracing::debug!(
171            endpoint = %endpoint_name_for_task,
172            transport = server.transport_name(),
173            "Registering endpoint with request plane server"
174        );
175
176        // Register endpoint with the server (unified interface)
177        server
178            .register_endpoint(
179                endpoint_name_for_task.clone(),
180                handler,
181                connection_id,
182                namespace_name_for_task.clone(),
183                component_name_for_task.clone(),
184                system_health.clone(),
185            )
186            .await?;
187
188        // Create cleanup task that unregisters on cancellation
189        let endpoint_name_for_cleanup = endpoint_name_for_task.clone();
190        let server_for_cleanup = server.clone();
191        let cancel_token_for_cleanup = endpoint_shutdown_token.clone();
192
193        let task: tokio::task::JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
194            cancel_token_for_cleanup.cancelled().await;
195
196            tracing::debug!(
197                endpoint = %endpoint_name_for_cleanup,
198                "Unregistering endpoint from request plane server"
199            );
200
201            // Unregister from server
202            if let Err(e) = server_for_cleanup
203                .unregister_endpoint(&endpoint_name_for_cleanup)
204                .await
205            {
206                tracing::warn!(
207                    endpoint = %endpoint_name_for_cleanup,
208                    error = %e,
209                    "Failed to unregister endpoint"
210                );
211            }
212
213            // Unregister from graceful shutdown tracker
214            if let Some(tracker) = tracker_clone {
215                tracing::debug!("Unregister endpoint from graceful shutdown tracker");
216                tracker.unregister_endpoint();
217            }
218
219            anyhow::Ok(())
220        });
221
222        // Register this endpoint instance in the discovery plane
223        // The discovery interface abstracts storage backend (etcd, k8s, etc) and provides
224        // consistent registration/discovery across the system.
225        let discovery = endpoint.drt().discovery();
226
227        // Build transport for discovery service based on request plane mode
228        let transport = build_transport_type(&endpoint, &endpoint_id, connection_id).await?;
229
230        let discovery_spec = crate::discovery::DiscoverySpec::Endpoint {
231            namespace: endpoint_id.namespace.clone(),
232            component: endpoint_id.component.clone(),
233            endpoint: endpoint_id.name.clone(),
234            transport,
235            device_type: endpoint_device_type(),
236        };
237
238        if let Err(e) = discovery.register(discovery_spec).await {
239            tracing::error!(
240                %endpoint_id,
241                error = %e,
242                "Unable to register service for discovery"
243            );
244            endpoint_shutdown_token.cancel();
245            anyhow::bail!(
246                "Unable to register service for discovery. Check discovery service status"
247            );
248        }
249
250        task.await??;
251
252        Ok(())
253    }
254}
255
256/// Build transport type based on request plane mode
257///
258/// This function handles both health check and discovery transport building.
259/// All transport modes use consistent addressing:
260/// - HTTP: Uses full URL path including endpoint name (e.g., http://host:port/v1/rpc/endpoint_name)
261/// - TCP: Includes instance_id and endpoint name for routing (e.g., host:port/instance_id_hex/endpoint_name)
262/// - NATS: Uses subject-based addressing (unique per endpoint)
263///
264/// # Errors
265/// Returns an error if TCP mode is used but the TCP server hasn't been started yet.
266fn build_transport_type_inner(
267    mode: RequestPlaneMode,
268    endpoint_id: &EndpointId,
269    connection_id: u64,
270) -> Result<TransportType> {
271    match mode {
272        RequestPlaneMode::Http => {
273            let http_host = crate::utils::get_http_rpc_host_from_env();
274            // If a fixed port is explicitly configured, use it directly.
275            // Otherwise, use the actual bound port (set by HTTP server after binding when port 0 is used).
276            let http_port = std::env::var("DYN_HTTP_RPC_PORT")
277                .ok()
278                .and_then(|p| p.parse::<u16>().ok())
279                .filter(|&p| p != 0)
280                .unwrap_or(crate::pipeline::network::manager::get_actual_http_rpc_port()?);
281            let rpc_root =
282                std::env::var("DYN_HTTP_RPC_ROOT_PATH").unwrap_or_else(|_| "/v1/rpc".to_string());
283
284            let http_endpoint = format!(
285                "http://{http_host}:{http_port}{rpc_root}/{}",
286                endpoint_id.name
287            );
288
289            Ok(TransportType::Http(http_endpoint))
290        }
291        RequestPlaneMode::Tcp => {
292            let tcp_host = crate::utils::get_tcp_rpc_host_from_env();
293            // If a fixed port is explicitly configured, use it directly (no init ordering dependency).
294            // Otherwise, use the actual bound port (set by TCP server after binding when port 0 is used).
295            let tcp_port = std::env::var("DYN_TCP_RPC_PORT")
296                .ok()
297                .and_then(|p| p.parse::<u16>().ok())
298                .filter(|&p| p != 0)
299                .unwrap_or(crate::pipeline::network::manager::get_actual_tcp_rpc_port()?);
300
301            // Include instance_id and endpoint name for proper TCP routing.
302            // Format: host:port/instance_id_hex/endpoint_name
303            // This ensures each worker has a unique routing key when multiple workers
304            // share the same TCP server (e.g., --num-workers > 1).
305            let tcp_endpoint = format!(
306                "{}:{}/{:x}/{}",
307                tcp_host, tcp_port, connection_id, endpoint_id.name
308            );
309
310            Ok(TransportType::Tcp(tcp_endpoint))
311        }
312        RequestPlaneMode::Nats => Ok(TransportType::Nats(nats::instance_subject(
313            endpoint_id,
314            connection_id,
315        ))),
316    }
317}
318
319/// Build transport type, ensuring TCP server is initialized when needed.
320///
321/// In TCP mode with an OS-assigned port (`DYN_TCP_RPC_PORT` unset or invalid), the server must bind
322/// before we can construct a correct transport address. This helper ensures that initialization
323/// occurs, then delegates to the internal builder.
324pub async fn build_transport_type(
325    endpoint: &Endpoint,
326    endpoint_id: &EndpointId,
327    connection_id: u64,
328) -> Result<TransportType> {
329    let mode = endpoint.drt().request_plane();
330
331    // For TCP and HTTP with OS-assigned ports, we must ensure the server is initialized
332    // (bound to a port) before we can construct a correct transport address.
333    let has_fixed_port = match mode {
334        RequestPlaneMode::Tcp => std::env::var("DYN_TCP_RPC_PORT")
335            .ok()
336            .and_then(|p| p.parse::<u16>().ok())
337            .filter(|&p| p != 0)
338            .is_some(),
339        RequestPlaneMode::Http => std::env::var("DYN_HTTP_RPC_PORT")
340            .ok()
341            .and_then(|p| p.parse::<u16>().ok())
342            .filter(|&p| p != 0)
343            .is_some(),
344        RequestPlaneMode::Nats => true, // NATS doesn't need port init
345    };
346
347    if !has_fixed_port {
348        // Ensure request plane server is initialized before building transport.
349        let _ = endpoint.drt().request_plane_server().await?;
350    }
351
352    build_transport_type_inner(mode, endpoint_id, connection_id)
353}
354
355impl Endpoint {
356    /// Unregister this endpoint instance from discovery.
357    ///
358    /// This removes the endpoint from the instances bucket, preventing the router
359    /// from sending requests to this worker. Use this when a worker is sleeping
360    /// and should not receive any requests.
361    pub async fn unregister_endpoint_instance(&self) -> anyhow::Result<()> {
362        let drt = self.drt();
363        let instance_id = drt.connection_id();
364        let endpoint_id = self.id();
365
366        // Get the transport type for the endpoint
367        let transport = build_transport_type(self, &endpoint_id, instance_id).await?;
368
369        let instance = crate::discovery::DiscoveryInstance::Endpoint(Instance {
370            namespace: endpoint_id.namespace,
371            component: endpoint_id.component,
372            endpoint: endpoint_id.name,
373            instance_id,
374            transport,
375            device_type: endpoint_device_type(),
376        });
377
378        let discovery = drt.discovery();
379        if let Err(e) = discovery.unregister(instance).await {
380            let endpoint_id = self.id();
381            tracing::error!(
382                %endpoint_id,
383                error = %e,
384                "Unable to unregister endpoint instance from discovery"
385            );
386            anyhow::bail!(
387                "Unable to unregister endpoint instance from discovery. Check discovery service status"
388            );
389        }
390
391        tracing::info!(
392            instance_id = instance_id,
393            "Successfully unregistered endpoint instance from discovery - worker removed from routing pool"
394        );
395
396        Ok(())
397    }
398
399    /// Re-register this endpoint instance to discovery.
400    ///
401    /// This adds the endpoint back to the instances bucket, allowing the router
402    /// to send requests to this worker again. Use this when a worker wakes up
403    /// and should start receiving requests.
404    pub async fn register_endpoint_instance(&self) -> anyhow::Result<()> {
405        let drt = self.drt();
406        let instance_id = drt.connection_id();
407        let endpoint_id = self.id();
408
409        // Get the transport type for the endpoint
410        let transport = build_transport_type(self, &endpoint_id, instance_id).await?;
411
412        let spec = crate::discovery::DiscoverySpec::Endpoint {
413            namespace: endpoint_id.namespace,
414            component: endpoint_id.component,
415            endpoint: endpoint_id.name,
416            transport,
417            device_type: endpoint_device_type(),
418        };
419
420        let discovery = drt.discovery();
421        if let Err(e) = discovery.register(spec).await {
422            let endpoint_id = self.id();
423            tracing::error!(
424                %endpoint_id,
425                error = %e,
426                "Unable to re-register endpoint instance to discovery"
427            );
428            anyhow::bail!(
429                "Unable to re-register endpoint instance to discovery. Check discovery service status"
430            );
431        }
432
433        tracing::info!(
434            instance_id = instance_id,
435            "Successfully re-registered endpoint instance to discovery - worker added back to routing pool"
436        );
437
438        Ok(())
439    }
440}