Skip to main content

dynamo_runtime/component/
endpoint.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use anyhow::Result;
7use derive_builder::Builder;
8use derive_getters::Dissolve;
9use educe::Educe;
10use tokio_util::sync::CancellationToken;
11
12use crate::{
13    component::{Endpoint, Instance, TransportType},
14    distributed::RequestPlaneMode,
15    pipeline::network::{PushWorkHandler, ingress::push_endpoint::PushEndpoint},
16    protocols::EndpointId,
17    traits::DistributedRuntimeProvider,
18    transports::nats,
19};
20
21#[derive(Educe, Builder, Dissolve)]
22#[educe(Debug)]
23#[builder(pattern = "owned", build_fn(private, name = "build_internal"))]
24pub struct EndpointConfig {
25    #[builder(private)]
26    endpoint: Endpoint,
27
28    /// Endpoint handler
29    #[educe(Debug(ignore))]
30    handler: Arc<dyn PushWorkHandler>,
31
32    /// Additional labels for metrics
33    #[builder(default, setter(into))]
34    metrics_labels: Option<Vec<(String, String)>>,
35
36    /// Whether to wait for inflight requests to complete during shutdown
37    #[builder(default = "true")]
38    graceful_shutdown: bool,
39
40    /// Health check payload for this endpoint
41    /// This payload will be sent to the endpoint during health checks
42    /// to verify it's responding properly
43    #[educe(Debug(ignore))]
44    #[builder(default, setter(into, strip_option))]
45    health_check_payload: Option<serde_json::Value>,
46}
47
48impl EndpointConfigBuilder {
49    pub(crate) fn from_endpoint(endpoint: Endpoint) -> Self {
50        Self::default().endpoint(endpoint)
51    }
52
53    /// Register an async engine in the local endpoint registry for direct in-process calls
54    pub fn register_local_engine(
55        self,
56        engine: crate::local_endpoint_registry::LocalAsyncEngine,
57    ) -> Result<Self> {
58        if let Some(endpoint) = &self.endpoint {
59            let registry = endpoint.drt().local_endpoint_registry();
60            registry.register(endpoint.name.clone(), engine);
61            tracing::debug!(
62                "Registered engine for endpoint '{}' in local registry",
63                endpoint.name
64            );
65        }
66        Ok(self)
67    }
68
69    pub async fn start(self) -> Result<()> {
70        let (endpoint, handler, metrics_labels, graceful_shutdown, health_check_payload) =
71            self.build_internal()?.dissolve();
72        let connection_id = endpoint.drt().connection_id();
73        let endpoint_id = endpoint.id();
74
75        tracing::debug!("Starting endpoint: {endpoint_id}");
76
77        let metrics_labels: Option<Vec<(&str, &str)>> = metrics_labels
78            .as_ref()
79            .map(|v| v.iter().map(|(k, v)| (k.as_str(), v.as_str())).collect());
80        // Add metrics to the handler. The endpoint provides additional information to the handler.
81        handler.add_metrics(&endpoint, metrics_labels.as_deref())?;
82
83        // This creates a child token of the runtime's endpoint_shutdown_token. That token is
84        // cancelled first as part of graceful shutdown. See Runtime::shutdown.
85        let endpoint_shutdown_token = endpoint.drt().child_token();
86
87        let system_health = endpoint.drt().system_health();
88
89        // Register with graceful shutdown tracker if needed
90        if graceful_shutdown {
91            tracing::debug!(
92                "Registering endpoint '{}' with graceful shutdown tracker",
93                endpoint.name
94            );
95            let tracker = endpoint.drt().graceful_shutdown_tracker();
96            tracker.register_endpoint();
97        } else {
98            tracing::debug!("Endpoint '{}' has graceful_shutdown=false", endpoint.name);
99        }
100
101        // Launch endpoint based on request plane mode
102        let tracker_clone = if graceful_shutdown {
103            Some(endpoint.drt().graceful_shutdown_tracker())
104        } else {
105            None
106        };
107
108        // Create clones for the async closure
109        let namespace_name_for_task = endpoint_id.namespace.clone();
110        let component_name_for_task = endpoint_id.component.clone();
111        let endpoint_name_for_task = endpoint_id.name.clone();
112
113        // Get the unified request plane server
114        let server = endpoint.drt().request_plane_server().await?;
115
116        // Register health check target in SystemHealth if provided
117        if let Some(health_check_payload) = &health_check_payload {
118            // Build transport based on request plane mode
119            let transport = build_transport_type(&endpoint, &endpoint_id, connection_id).await?;
120
121            let instance = Instance {
122                component: endpoint_id.component.clone(),
123                endpoint: endpoint_id.name.clone(),
124                namespace: endpoint_id.namespace.clone(),
125                instance_id: connection_id,
126                transport,
127            };
128            tracing::debug!(endpoint_name = %endpoint.name, "Registering endpoint health check target");
129            let guard = system_health.lock();
130            guard.register_health_check_target(
131                &endpoint.name,
132                instance,
133                health_check_payload.clone(),
134            );
135            if let Some(notifier) = guard.get_endpoint_health_check_notifier(&endpoint.name) {
136                handler.set_endpoint_health_check_notifier(notifier)?;
137            }
138        }
139
140        tracing::debug!(
141            endpoint = %endpoint_name_for_task,
142            transport = server.transport_name(),
143            "Registering endpoint with request plane server"
144        );
145
146        // Register endpoint with the server (unified interface)
147        server
148            .register_endpoint(
149                endpoint_name_for_task.clone(),
150                handler,
151                connection_id,
152                namespace_name_for_task.clone(),
153                component_name_for_task.clone(),
154                system_health.clone(),
155            )
156            .await?;
157
158        // Create cleanup task that unregisters on cancellation
159        let endpoint_name_for_cleanup = endpoint_name_for_task.clone();
160        let server_for_cleanup = server.clone();
161        let cancel_token_for_cleanup = endpoint_shutdown_token.clone();
162
163        let task: tokio::task::JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
164            cancel_token_for_cleanup.cancelled().await;
165
166            tracing::debug!(
167                endpoint = %endpoint_name_for_cleanup,
168                "Unregistering endpoint from request plane server"
169            );
170
171            // Unregister from server
172            if let Err(e) = server_for_cleanup
173                .unregister_endpoint(&endpoint_name_for_cleanup)
174                .await
175            {
176                tracing::warn!(
177                    endpoint = %endpoint_name_for_cleanup,
178                    error = %e,
179                    "Failed to unregister endpoint"
180                );
181            }
182
183            // Unregister from graceful shutdown tracker
184            if let Some(tracker) = tracker_clone {
185                tracing::debug!("Unregister endpoint from graceful shutdown tracker");
186                tracker.unregister_endpoint();
187            }
188
189            anyhow::Ok(())
190        });
191
192        // Register this endpoint instance in the discovery plane
193        // The discovery interface abstracts storage backend (etcd, k8s, etc) and provides
194        // consistent registration/discovery across the system.
195        let discovery = endpoint.drt().discovery();
196
197        // Build transport for discovery service based on request plane mode
198        let transport = build_transport_type(&endpoint, &endpoint_id, connection_id).await?;
199
200        let discovery_spec = crate::discovery::DiscoverySpec::Endpoint {
201            namespace: endpoint_id.namespace.clone(),
202            component: endpoint_id.component.clone(),
203            endpoint: endpoint_id.name.clone(),
204            transport,
205        };
206
207        if let Err(e) = discovery.register(discovery_spec).await {
208            tracing::error!(
209                %endpoint_id,
210                error = %e,
211                "Unable to register service for discovery"
212            );
213            endpoint_shutdown_token.cancel();
214            anyhow::bail!(
215                "Unable to register service for discovery. Check discovery service status"
216            );
217        }
218
219        task.await??;
220
221        Ok(())
222    }
223}
224
225/// Build transport type based on request plane mode
226///
227/// This function handles both health check and discovery transport building.
228/// All transport modes use consistent addressing:
229/// - HTTP: Uses full URL path including endpoint name (e.g., http://host:port/v1/rpc/endpoint_name)
230/// - TCP: Includes instance_id and endpoint name for routing (e.g., host:port/instance_id_hex/endpoint_name)
231/// - NATS: Uses subject-based addressing (unique per endpoint)
232///
233/// # Errors
234/// Returns an error if TCP mode is used but the TCP server hasn't been started yet.
235fn build_transport_type_inner(
236    mode: RequestPlaneMode,
237    endpoint_id: &EndpointId,
238    connection_id: u64,
239) -> Result<TransportType> {
240    match mode {
241        RequestPlaneMode::Http => {
242            let http_host = crate::utils::get_http_rpc_host_from_env();
243            // If a fixed port is explicitly configured, use it directly.
244            // Otherwise, use the actual bound port (set by HTTP server after binding when port 0 is used).
245            let http_port = std::env::var("DYN_HTTP_RPC_PORT")
246                .ok()
247                .and_then(|p| p.parse::<u16>().ok())
248                .filter(|&p| p != 0)
249                .unwrap_or(crate::pipeline::network::manager::get_actual_http_rpc_port()?);
250            let rpc_root =
251                std::env::var("DYN_HTTP_RPC_ROOT_PATH").unwrap_or_else(|_| "/v1/rpc".to_string());
252
253            let http_endpoint = format!(
254                "http://{http_host}:{http_port}{rpc_root}/{}",
255                endpoint_id.name
256            );
257
258            Ok(TransportType::Http(http_endpoint))
259        }
260        RequestPlaneMode::Tcp => {
261            let tcp_host = crate::utils::get_tcp_rpc_host_from_env();
262            // If a fixed port is explicitly configured, use it directly (no init ordering dependency).
263            // Otherwise, use the actual bound port (set by TCP server after binding when port 0 is used).
264            let tcp_port = std::env::var("DYN_TCP_RPC_PORT")
265                .ok()
266                .and_then(|p| p.parse::<u16>().ok())
267                .filter(|&p| p != 0)
268                .unwrap_or(crate::pipeline::network::manager::get_actual_tcp_rpc_port()?);
269
270            // Include instance_id and endpoint name for proper TCP routing.
271            // Format: host:port/instance_id_hex/endpoint_name
272            // This ensures each worker has a unique routing key when multiple workers
273            // share the same TCP server (e.g., --num-workers > 1).
274            let tcp_endpoint = format!(
275                "{}:{}/{:x}/{}",
276                tcp_host, tcp_port, connection_id, endpoint_id.name
277            );
278
279            Ok(TransportType::Tcp(tcp_endpoint))
280        }
281        RequestPlaneMode::Nats => Ok(TransportType::Nats(nats::instance_subject(
282            endpoint_id,
283            connection_id,
284        ))),
285    }
286}
287
288/// Build transport type, ensuring TCP server is initialized when needed.
289///
290/// In TCP mode with an OS-assigned port (`DYN_TCP_RPC_PORT` unset or invalid), the server must bind
291/// before we can construct a correct transport address. This helper ensures that initialization
292/// occurs, then delegates to the internal builder.
293pub async fn build_transport_type(
294    endpoint: &Endpoint,
295    endpoint_id: &EndpointId,
296    connection_id: u64,
297) -> Result<TransportType> {
298    let mode = endpoint.drt().request_plane();
299
300    // For TCP and HTTP with OS-assigned ports, we must ensure the server is initialized
301    // (bound to a port) before we can construct a correct transport address.
302    let has_fixed_port = match mode {
303        RequestPlaneMode::Tcp => std::env::var("DYN_TCP_RPC_PORT")
304            .ok()
305            .and_then(|p| p.parse::<u16>().ok())
306            .filter(|&p| p != 0)
307            .is_some(),
308        RequestPlaneMode::Http => std::env::var("DYN_HTTP_RPC_PORT")
309            .ok()
310            .and_then(|p| p.parse::<u16>().ok())
311            .filter(|&p| p != 0)
312            .is_some(),
313        RequestPlaneMode::Nats => true, // NATS doesn't need port init
314    };
315
316    if !has_fixed_port {
317        // Ensure request plane server is initialized before building transport.
318        let _ = endpoint.drt().request_plane_server().await?;
319    }
320
321    build_transport_type_inner(mode, endpoint_id, connection_id)
322}
323
324impl Endpoint {
325    /// Unregister this endpoint instance from discovery.
326    ///
327    /// This removes the endpoint from the instances bucket, preventing the router
328    /// from sending requests to this worker. Use this when a worker is sleeping
329    /// and should not receive any requests.
330    pub async fn unregister_endpoint_instance(&self) -> anyhow::Result<()> {
331        let drt = self.drt();
332        let instance_id = drt.connection_id();
333        let endpoint_id = self.id();
334
335        // Get the transport type for the endpoint
336        let transport = build_transport_type(self, &endpoint_id, instance_id).await?;
337
338        let instance = crate::discovery::DiscoveryInstance::Endpoint(Instance {
339            namespace: endpoint_id.namespace,
340            component: endpoint_id.component,
341            endpoint: endpoint_id.name,
342            instance_id,
343            transport,
344        });
345
346        let discovery = drt.discovery();
347        if let Err(e) = discovery.unregister(instance).await {
348            let endpoint_id = self.id();
349            tracing::error!(
350                %endpoint_id,
351                error = %e,
352                "Unable to unregister endpoint instance from discovery"
353            );
354            anyhow::bail!(
355                "Unable to unregister endpoint instance from discovery. Check discovery service status"
356            );
357        }
358
359        tracing::info!(
360            instance_id = instance_id,
361            "Successfully unregistered endpoint instance from discovery - worker removed from routing pool"
362        );
363
364        Ok(())
365    }
366
367    /// Re-register this endpoint instance to discovery.
368    ///
369    /// This adds the endpoint back to the instances bucket, allowing the router
370    /// to send requests to this worker again. Use this when a worker wakes up
371    /// and should start receiving requests.
372    pub async fn register_endpoint_instance(&self) -> anyhow::Result<()> {
373        let drt = self.drt();
374        let instance_id = drt.connection_id();
375        let endpoint_id = self.id();
376
377        // Get the transport type for the endpoint
378        let transport = build_transport_type(self, &endpoint_id, instance_id).await?;
379
380        let spec = crate::discovery::DiscoverySpec::Endpoint {
381            namespace: endpoint_id.namespace,
382            component: endpoint_id.component,
383            endpoint: endpoint_id.name,
384            transport,
385        };
386
387        let discovery = drt.discovery();
388        if let Err(e) = discovery.register(spec).await {
389            let endpoint_id = self.id();
390            tracing::error!(
391                %endpoint_id,
392                error = %e,
393                "Unable to re-register endpoint instance to discovery"
394            );
395            anyhow::bail!(
396                "Unable to re-register endpoint instance to discovery. Check discovery service status"
397            );
398        }
399
400        tracing::info!(
401            instance_id = instance_id,
402            "Successfully re-registered endpoint instance to discovery - worker added back to routing pool"
403        );
404
405        Ok(())
406    }
407}