Skip to main content

dynamo_runtime/component/
endpoint.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use anyhow::Result;
7use derive_builder::Builder;
8use derive_getters::Dissolve;
9use educe::Educe;
10use tokio_util::sync::CancellationToken;
11
12use crate::{
13    component::{DeviceType, Endpoint, Instance, TransportType},
14    distributed::RequestPlaneMode,
15    pipeline::network::{PushWorkHandler, ingress::push_endpoint::PushEndpoint},
16    protocols::EndpointId,
17    traits::DistributedRuntimeProvider,
18    transports::nats,
19};
20
21fn endpoint_device_type() -> Option<DeviceType> {
22    // Common CUDA masks that explicitly disable GPU visibility.
23    if std::env::var("CUDA_VISIBLE_DEVICES")
24        .ok()
25        .map(|v| {
26            let l = v.trim().to_ascii_lowercase();
27            l.is_empty() || l == "-1" || l == "none" || l == "void"
28        })
29        .unwrap_or(false)
30    {
31        return Some(DeviceType::Cpu);
32    }
33
34    // Container runtimes often use NVIDIA_VISIBLE_DEVICES to gate GPU visibility.
35    if std::env::var("NVIDIA_VISIBLE_DEVICES")
36        .ok()
37        .map(|v| {
38            let l = v.trim().to_ascii_lowercase();
39            l == "none" || l == "void"
40        })
41        .unwrap_or(false)
42    {
43        return Some(DeviceType::Cpu);
44    }
45
46    // Default: no explicit CPU override means this endpoint is CUDA-capable.
47    Some(DeviceType::Cuda)
48}
49
50#[derive(Educe, Builder, Dissolve)]
51#[educe(Debug)]
52#[builder(pattern = "owned", build_fn(private, name = "build_internal"))]
53pub struct EndpointConfig {
54    #[builder(private)]
55    endpoint: Endpoint,
56
57    /// Endpoint handler
58    #[educe(Debug(ignore))]
59    handler: Arc<dyn PushWorkHandler>,
60
61    /// Additional labels for metrics
62    #[builder(default, setter(into))]
63    metrics_labels: Option<Vec<(String, String)>>,
64
65    /// Whether to wait for inflight requests to complete during shutdown
66    #[builder(default = "true")]
67    graceful_shutdown: bool,
68
69    /// Health check payload for this endpoint
70    /// This payload will be sent to the endpoint during health checks
71    /// to verify it's responding properly
72    #[educe(Debug(ignore))]
73    #[builder(default, setter(into, strip_option))]
74    health_check_payload: Option<serde_json::Value>,
75}
76
77impl EndpointConfigBuilder {
78    pub(crate) fn from_endpoint(endpoint: Endpoint) -> Self {
79        Self::default().endpoint(endpoint)
80    }
81
82    /// Register an async engine in the local endpoint registry for direct in-process calls
83    pub fn register_local_engine(
84        self,
85        engine: crate::local_endpoint_registry::LocalAsyncEngine,
86    ) -> Result<Self> {
87        if let Some(endpoint) = &self.endpoint {
88            let registry = endpoint.drt().local_endpoint_registry();
89            registry.register(endpoint.name.clone(), engine);
90            tracing::debug!(
91                "Registered engine for endpoint '{}' in local registry",
92                endpoint.name
93            );
94        }
95        Ok(self)
96    }
97
98    pub async fn start(self) -> Result<()> {
99        let (endpoint, handler, metrics_labels, graceful_shutdown, health_check_payload) =
100            self.build_internal()?.dissolve();
101        let connection_id = endpoint.drt().connection_id();
102        let endpoint_id = endpoint.id();
103
104        tracing::debug!("Starting endpoint: {endpoint_id}");
105
106        let metrics_labels: Option<Vec<(&str, &str)>> = metrics_labels
107            .as_ref()
108            .map(|v| v.iter().map(|(k, v)| (k.as_str(), v.as_str())).collect());
109        // Add metrics to the handler. The endpoint provides additional information to the handler.
110        handler.add_metrics(&endpoint, metrics_labels.as_deref())?;
111
112        // This creates a child token of the runtime's endpoint_shutdown_token. That token is
113        // cancelled first as part of graceful shutdown. See Runtime::shutdown.
114        let endpoint_shutdown_token = endpoint.drt().child_token();
115
116        let system_health = endpoint.drt().system_health();
117
118        // Register with graceful shutdown tracker if needed
119        if graceful_shutdown {
120            tracing::debug!(
121                "Registering endpoint '{}' with graceful shutdown tracker",
122                endpoint.name
123            );
124            let tracker = endpoint.drt().graceful_shutdown_tracker();
125            tracker.register_endpoint();
126        } else {
127            tracing::debug!("Endpoint '{}' has graceful_shutdown=false", endpoint.name);
128        }
129
130        // Launch endpoint based on request plane mode
131        let tracker_clone = if graceful_shutdown {
132            Some(endpoint.drt().graceful_shutdown_tracker())
133        } else {
134            None
135        };
136
137        // Create clones for the async closure
138        let namespace_name_for_task = endpoint_id.namespace.clone();
139        let component_name_for_task = endpoint_id.component.clone();
140        let endpoint_name_for_task = endpoint_id.name.clone();
141
142        // Get the unified request plane server
143        let server = endpoint.drt().request_plane_server().await?;
144
145        // Register health check target in SystemHealth if provided
146        if let Some(health_check_payload) = &health_check_payload {
147            if system_health.lock().health_check_enabled()
148                && endpoint
149                    .drt()
150                    .local_endpoint_registry()
151                    .get(&endpoint.name)
152                    .is_none()
153            {
154                anyhow::bail!(
155                    "Endpoint '{}' has a health_check_payload and canary is enabled, \
156                     but no local engine is registered. Call .register_local_engine() \
157                     before .start() so the canary health check can function.",
158                    endpoint.name
159                );
160            }
161
162            // Build transport based on request plane mode
163            let transport = build_transport_type(&endpoint, &endpoint_id, connection_id).await?;
164
165            let instance = Instance {
166                component: endpoint_id.component.clone(),
167                endpoint: endpoint_id.name.clone(),
168                namespace: endpoint_id.namespace.clone(),
169                instance_id: connection_id,
170                transport,
171                device_type: endpoint_device_type(),
172            };
173            tracing::debug!(endpoint_name = %endpoint.name, "Registering endpoint health check target");
174            let guard = system_health.lock();
175            guard.register_health_check_target(
176                &endpoint.name,
177                instance,
178                health_check_payload.clone(),
179            );
180            if let Some(notifier) = guard.get_endpoint_health_check_notifier(&endpoint.name) {
181                handler.set_endpoint_health_check_notifier(notifier)?;
182            }
183        }
184
185        tracing::debug!(
186            endpoint = %endpoint_name_for_task,
187            transport = server.transport_name(),
188            "Registering endpoint with request plane server"
189        );
190
191        // Register endpoint with the server (unified interface)
192        server
193            .register_endpoint(
194                endpoint_name_for_task.clone(),
195                handler,
196                connection_id,
197                namespace_name_for_task.clone(),
198                component_name_for_task.clone(),
199                system_health.clone(),
200            )
201            .await?;
202
203        // Create cleanup task that unregisters on cancellation
204        let endpoint_name_for_cleanup = endpoint_name_for_task.clone();
205        let server_for_cleanup = server.clone();
206        let cancel_token_for_cleanup = endpoint_shutdown_token.clone();
207
208        let task: tokio::task::JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
209            cancel_token_for_cleanup.cancelled().await;
210
211            tracing::debug!(
212                endpoint = %endpoint_name_for_cleanup,
213                "Unregistering endpoint from request plane server"
214            );
215
216            // Unregister from server
217            if let Err(e) = server_for_cleanup
218                .unregister_endpoint(&endpoint_name_for_cleanup)
219                .await
220            {
221                tracing::warn!(
222                    endpoint = %endpoint_name_for_cleanup,
223                    error = %e,
224                    "Failed to unregister endpoint"
225                );
226            }
227
228            // Unregister from graceful shutdown tracker
229            if let Some(tracker) = tracker_clone {
230                tracing::debug!("Unregister endpoint from graceful shutdown tracker");
231                tracker.unregister_endpoint();
232            }
233
234            anyhow::Ok(())
235        });
236
237        // Register this endpoint instance in the discovery plane
238        // The discovery interface abstracts storage backend (etcd, k8s, etc) and provides
239        // consistent registration/discovery across the system.
240        let discovery = endpoint.drt().discovery();
241
242        // Build transport for discovery service based on request plane mode
243        let transport = build_transport_type(&endpoint, &endpoint_id, connection_id).await?;
244
245        let discovery_spec = crate::discovery::DiscoverySpec::Endpoint {
246            namespace: endpoint_id.namespace.clone(),
247            component: endpoint_id.component.clone(),
248            endpoint: endpoint_id.name.clone(),
249            transport,
250            device_type: endpoint_device_type(),
251        };
252
253        if let Err(e) = discovery.register(discovery_spec).await {
254            tracing::error!(
255                %endpoint_id,
256                error = %e,
257                "Unable to register service for discovery"
258            );
259            endpoint_shutdown_token.cancel();
260            anyhow::bail!(
261                "Unable to register service for discovery. Check discovery service status"
262            );
263        }
264
265        task.await??;
266
267        Ok(())
268    }
269}
270
271/// Build transport type based on request plane mode
272///
273/// This function handles both health check and discovery transport building.
274/// All transport modes use consistent addressing:
275/// - TCP: Includes instance_id and endpoint name for routing (e.g., host:port/instance_id_hex/endpoint_name)
276/// - NATS: Uses subject-based addressing (unique per endpoint)
277///
278/// # Errors
279/// Returns an error if TCP mode is used but the TCP server hasn't been started yet.
280fn build_transport_type_inner(
281    mode: RequestPlaneMode,
282    endpoint_id: &EndpointId,
283    connection_id: u64,
284) -> Result<TransportType> {
285    match mode {
286        RequestPlaneMode::Tcp => {
287            let tcp_host = crate::utils::tcp_rpc_host_from_env();
288            // If a fixed port is explicitly configured, use it directly (no init ordering dependency).
289            // Otherwise, use the actual bound port (set by TCP server after binding when port 0 is used).
290            let tcp_port = std::env::var("DYN_TCP_RPC_PORT")
291                .ok()
292                .and_then(|p| p.parse::<u16>().ok())
293                .filter(|&p| p != 0)
294                .unwrap_or(crate::pipeline::network::manager::get_actual_tcp_rpc_port()?);
295
296            // Include instance_id and endpoint name for proper TCP routing.
297            // Format: host:port/instance_id_hex/endpoint_name
298            // This ensures each worker has a unique routing key when multiple workers
299            // share the same TCP server (e.g., --num-workers > 1).
300            let tcp_endpoint = format!(
301                "{}:{}/{:x}/{}",
302                tcp_host, tcp_port, connection_id, endpoint_id.name
303            );
304
305            Ok(TransportType::Tcp(tcp_endpoint))
306        }
307        RequestPlaneMode::Nats => Ok(TransportType::Nats(nats::instance_subject(
308            endpoint_id,
309            connection_id,
310        ))),
311    }
312}
313
314/// Build transport type, ensuring TCP server is initialized when needed.
315///
316/// In TCP mode with an OS-assigned port (`DYN_TCP_RPC_PORT` unset or invalid), the server must bind
317/// before we can construct a correct transport address. This helper ensures that initialization
318/// occurs, then delegates to the internal builder.
319pub async fn build_transport_type(
320    endpoint: &Endpoint,
321    endpoint_id: &EndpointId,
322    connection_id: u64,
323) -> Result<TransportType> {
324    let mode = endpoint.drt().request_plane();
325
326    // For TCP with OS-assigned ports, we must ensure the server is initialized
327    // (bound to a port) before we can construct a correct transport address.
328    let has_fixed_port = match mode {
329        RequestPlaneMode::Tcp => std::env::var("DYN_TCP_RPC_PORT")
330            .ok()
331            .and_then(|p| p.parse::<u16>().ok())
332            .filter(|&p| p != 0)
333            .is_some(),
334        RequestPlaneMode::Nats => true, // NATS doesn't need port init
335    };
336
337    if !has_fixed_port {
338        // Ensure request plane server is initialized before building transport.
339        let _ = endpoint.drt().request_plane_server().await?;
340    }
341
342    build_transport_type_inner(mode, endpoint_id, connection_id)
343}
344
345impl Endpoint {
346    /// Unregister this endpoint instance from discovery.
347    ///
348    /// This removes the endpoint from the instances bucket, preventing the router
349    /// from sending requests to this worker. Use this when a worker is sleeping
350    /// and should not receive any requests.
351    pub async fn unregister_endpoint_instance(&self) -> anyhow::Result<()> {
352        let drt = self.drt();
353        let instance_id = drt.connection_id();
354        let endpoint_id = self.id();
355
356        // Get the transport type for the endpoint
357        let transport = build_transport_type(self, &endpoint_id, instance_id).await?;
358
359        let instance = crate::discovery::DiscoveryInstance::Endpoint(Instance {
360            namespace: endpoint_id.namespace,
361            component: endpoint_id.component,
362            endpoint: endpoint_id.name,
363            instance_id,
364            transport,
365            device_type: endpoint_device_type(),
366        });
367
368        let discovery = drt.discovery();
369        if let Err(e) = discovery.unregister(instance).await {
370            let endpoint_id = self.id();
371            tracing::error!(
372                %endpoint_id,
373                error = %e,
374                "Unable to unregister endpoint instance from discovery"
375            );
376            anyhow::bail!(
377                "Unable to unregister endpoint instance from discovery. Check discovery service status"
378            );
379        }
380
381        tracing::info!(
382            instance_id = instance_id,
383            "Successfully unregistered endpoint instance from discovery - worker removed from routing pool"
384        );
385
386        Ok(())
387    }
388
389    /// Re-register this endpoint instance to discovery.
390    ///
391    /// This adds the endpoint back to the instances bucket, allowing the router
392    /// to send requests to this worker again. Use this when a worker wakes up
393    /// and should start receiving requests.
394    pub async fn register_endpoint_instance(&self) -> anyhow::Result<()> {
395        let drt = self.drt();
396        let instance_id = drt.connection_id();
397        let endpoint_id = self.id();
398
399        // Get the transport type for the endpoint
400        let transport = build_transport_type(self, &endpoint_id, instance_id).await?;
401
402        let spec = crate::discovery::DiscoverySpec::Endpoint {
403            namespace: endpoint_id.namespace,
404            component: endpoint_id.component,
405            endpoint: endpoint_id.name,
406            transport,
407            device_type: endpoint_device_type(),
408        };
409
410        let discovery = drt.discovery();
411        if let Err(e) = discovery.register(spec).await {
412            let endpoint_id = self.id();
413            tracing::error!(
414                %endpoint_id,
415                error = %e,
416                "Unable to re-register endpoint instance to discovery"
417            );
418            anyhow::bail!(
419                "Unable to re-register endpoint instance to discovery. Check discovery service status"
420            );
421        }
422
423        tracing::info!(
424            instance_id = instance_id,
425            "Successfully re-registered endpoint instance to discovery - worker added back to routing pool"
426        );
427
428        Ok(())
429    }
430}