Skip to main content

dynamo_runtime/component/
endpoint.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use anyhow::Result;
7use derive_builder::Builder;
8use derive_getters::Dissolve;
9use educe::Educe;
10use tokio_util::sync::CancellationToken;
11
12use crate::{
13    component::{Endpoint, Instance, TransportType},
14    distributed::RequestPlaneMode,
15    pipeline::network::{PushWorkHandler, ingress::push_endpoint::PushEndpoint},
16    protocols::EndpointId,
17    traits::DistributedRuntimeProvider,
18    transports::nats,
19};
20
/// Configuration used to start serving an [`Endpoint`].
///
/// Built through the derived, owned-pattern [`EndpointConfigBuilder`]. The derived build
/// function is private and named `build_internal`; `EndpointConfigBuilder::start` is the
/// visible consumer that builds and uses it.
///
/// Field order is significant: `dissolve()` (derived via `Dissolve`) yields the fields as a
/// tuple in declaration order, and `EndpointConfigBuilder::start` destructures that tuple
/// positionally.
#[derive(Educe, Builder, Dissolve)]
#[educe(Debug)]
#[builder(pattern = "owned", build_fn(private, name = "build_internal"))]
pub struct EndpointConfig {
    /// Endpoint being served. Its builder setter is private; it is populated by
    /// `EndpointConfigBuilder::from_endpoint`.
    #[builder(private)]
    endpoint: Endpoint,

    /// Endpoint handler
    ///
    /// Excluded from the `Debug` output.
    #[educe(Debug(ignore))]
    handler: Arc<dyn PushWorkHandler>,

    /// Additional labels for metrics
    ///
    /// Defaults to `None`; the setter accepts any value convertible via `Into`.
    #[builder(default, setter(into))]
    metrics_labels: Option<Vec<(String, String)>>,

    /// Whether to wait for inflight requests to complete during shutdown
    ///
    /// Defaults to `true`.
    #[builder(default = "true")]
    graceful_shutdown: bool,

    /// Health check payload for this endpoint
    /// This payload will be sent to the endpoint during health checks
    /// to verify it's responding properly
    ///
    /// Defaults to `None`. Because of `strip_option`, the setter takes the payload itself
    /// (anything convertible into `serde_json::Value`), not an `Option`. Excluded from the
    /// `Debug` output.
    #[educe(Debug(ignore))]
    #[builder(default, setter(into, strip_option))]
    health_check_payload: Option<serde_json::Value>,
}
47
48impl EndpointConfigBuilder {
49    pub(crate) fn from_endpoint(endpoint: Endpoint) -> Self {
50        Self::default().endpoint(endpoint)
51    }
52
53    /// Register an async engine in the local endpoint registry for direct in-process calls
54    pub fn register_local_engine(
55        self,
56        engine: crate::local_endpoint_registry::LocalAsyncEngine,
57    ) -> Result<Self> {
58        if let Some(endpoint) = &self.endpoint {
59            let registry = endpoint.drt().local_endpoint_registry();
60            registry.register(endpoint.name.clone(), engine);
61            tracing::debug!(
62                "Registered engine for endpoint '{}' in local registry",
63                endpoint.name
64            );
65        }
66        Ok(self)
67    }
68
69    pub async fn start(self) -> Result<()> {
70        let (endpoint, handler, metrics_labels, graceful_shutdown, health_check_payload) =
71            self.build_internal()?.dissolve();
72        let connection_id = endpoint.drt().connection_id();
73        let endpoint_id = endpoint.id();
74
75        tracing::debug!("Starting endpoint: {endpoint_id}");
76
77        let metrics_labels: Option<Vec<(&str, &str)>> = metrics_labels
78            .as_ref()
79            .map(|v| v.iter().map(|(k, v)| (k.as_str(), v.as_str())).collect());
80        // Add metrics to the handler. The endpoint provides additional information to the handler.
81        handler.add_metrics(&endpoint, metrics_labels.as_deref())?;
82
83        // This creates a child token of the runtime's endpoint_shutdown_token. That token is
84        // cancelled first as part of graceful shutdown. See Runtime::shutdown.
85        let endpoint_shutdown_token = endpoint.drt().child_token();
86
87        let system_health = endpoint.drt().system_health();
88
89        // Register with graceful shutdown tracker if needed
90        if graceful_shutdown {
91            tracing::debug!(
92                "Registering endpoint '{}' with graceful shutdown tracker",
93                endpoint.name
94            );
95            let tracker = endpoint.drt().graceful_shutdown_tracker();
96            tracker.register_endpoint();
97        } else {
98            tracing::debug!("Endpoint '{}' has graceful_shutdown=false", endpoint.name);
99        }
100
101        // Launch endpoint based on request plane mode
102        let tracker_clone = if graceful_shutdown {
103            Some(endpoint.drt().graceful_shutdown_tracker())
104        } else {
105            None
106        };
107
108        // Create clones for the async closure
109        let namespace_name_for_task = endpoint_id.namespace.clone();
110        let component_name_for_task = endpoint_id.component.clone();
111        let endpoint_name_for_task = endpoint_id.name.clone();
112
113        // Get the unified request plane server
114        let server = endpoint.drt().request_plane_server().await?;
115
116        // Register health check target in SystemHealth if provided
117        if let Some(health_check_payload) = &health_check_payload {
118            // Build transport based on request plane mode
119            let transport = build_transport_type(&endpoint, &endpoint_id, connection_id).await?;
120
121            let instance = Instance {
122                component: endpoint_id.component.clone(),
123                endpoint: endpoint_id.name.clone(),
124                namespace: endpoint_id.namespace.clone(),
125                instance_id: connection_id,
126                transport,
127            };
128            tracing::debug!(endpoint_name = %endpoint.name, "Registering endpoint health check target");
129            let guard = system_health.lock();
130            guard.register_health_check_target(
131                &endpoint.name,
132                instance,
133                health_check_payload.clone(),
134            );
135            if let Some(notifier) = guard.get_endpoint_health_check_notifier(&endpoint.name) {
136                handler.set_endpoint_health_check_notifier(notifier)?;
137            }
138        }
139
140        tracing::debug!(
141            endpoint = %endpoint_name_for_task,
142            transport = server.transport_name(),
143            "Registering endpoint with request plane server"
144        );
145
146        // Register endpoint with the server (unified interface)
147        server
148            .register_endpoint(
149                endpoint_name_for_task.clone(),
150                handler,
151                connection_id,
152                namespace_name_for_task.clone(),
153                component_name_for_task.clone(),
154                system_health.clone(),
155            )
156            .await?;
157
158        // Create cleanup task that unregisters on cancellation
159        let endpoint_name_for_cleanup = endpoint_name_for_task.clone();
160        let server_for_cleanup = server.clone();
161        let cancel_token_for_cleanup = endpoint_shutdown_token.clone();
162
163        let task: tokio::task::JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
164            cancel_token_for_cleanup.cancelled().await;
165
166            tracing::debug!(
167                endpoint = %endpoint_name_for_cleanup,
168                "Unregistering endpoint from request plane server"
169            );
170
171            // Unregister from server
172            if let Err(e) = server_for_cleanup
173                .unregister_endpoint(&endpoint_name_for_cleanup)
174                .await
175            {
176                tracing::warn!(
177                    endpoint = %endpoint_name_for_cleanup,
178                    error = %e,
179                    "Failed to unregister endpoint"
180                );
181            }
182
183            // Unregister from graceful shutdown tracker
184            if let Some(tracker) = tracker_clone {
185                tracing::debug!("Unregister endpoint from graceful shutdown tracker");
186                tracker.unregister_endpoint();
187            }
188
189            anyhow::Ok(())
190        });
191
192        // Register this endpoint instance in the discovery plane
193        // The discovery interface abstracts storage backend (etcd, k8s, etc) and provides
194        // consistent registration/discovery across the system.
195        let discovery = endpoint.drt().discovery();
196
197        // Build transport for discovery service based on request plane mode
198        let transport = build_transport_type(&endpoint, &endpoint_id, connection_id).await?;
199
200        let discovery_spec = crate::discovery::DiscoverySpec::Endpoint {
201            namespace: endpoint_id.namespace.clone(),
202            component: endpoint_id.component.clone(),
203            endpoint: endpoint_id.name.clone(),
204            transport,
205        };
206
207        if let Err(e) = discovery.register(discovery_spec).await {
208            tracing::error!(
209                %endpoint_id,
210                error = %e,
211                "Unable to register service for discovery"
212            );
213            endpoint_shutdown_token.cancel();
214            anyhow::bail!(
215                "Unable to register service for discovery. Check discovery service status"
216            );
217        }
218
219        task.await??;
220
221        Ok(())
222    }
223}
224
225/// Build transport type based on request plane mode
226///
227/// This function handles both health check and discovery transport building.
228/// All transport modes use consistent addressing:
229/// - HTTP: Uses full URL path including endpoint name (e.g., http://host:port/v1/rpc/endpoint_name)
230/// - TCP: Includes instance_id and endpoint name for routing (e.g., host:port/instance_id_hex/endpoint_name)
231/// - NATS: Uses subject-based addressing (unique per endpoint)
232///
233/// # Errors
234/// Returns an error if TCP mode is used but the TCP server hasn't been started yet.
235fn build_transport_type_inner(
236    mode: RequestPlaneMode,
237    endpoint_id: &EndpointId,
238    connection_id: u64,
239) -> Result<TransportType> {
240    match mode {
241        RequestPlaneMode::Http => {
242            let http_host = crate::utils::get_http_rpc_host_from_env();
243            let http_port = std::env::var("DYN_HTTP_RPC_PORT")
244                .ok()
245                .and_then(|p| p.parse::<u16>().ok())
246                .unwrap_or(8888);
247            let rpc_root =
248                std::env::var("DYN_HTTP_RPC_ROOT_PATH").unwrap_or_else(|_| "/v1/rpc".to_string());
249
250            let http_endpoint = format!(
251                "http://{http_host}:{http_port}{rpc_root}/{}",
252                endpoint_id.name
253            );
254
255            Ok(TransportType::Http(http_endpoint))
256        }
257        RequestPlaneMode::Tcp => {
258            let tcp_host = crate::utils::get_tcp_rpc_host_from_env();
259            // If a fixed port is explicitly configured, use it directly (no init ordering dependency).
260            // Otherwise, use the actual bound port (set by TCP server after binding when port 0 is used).
261            let tcp_port = std::env::var("DYN_TCP_RPC_PORT")
262                .ok()
263                .and_then(|p| p.parse::<u16>().ok())
264                .unwrap_or(crate::pipeline::network::manager::get_actual_tcp_rpc_port()?);
265
266            // Include instance_id and endpoint name for proper TCP routing.
267            // Format: host:port/instance_id_hex/endpoint_name
268            // This ensures each worker has a unique routing key when multiple workers
269            // share the same TCP server (e.g., --num-workers > 1).
270            let tcp_endpoint = format!(
271                "{}:{}/{:x}/{}",
272                tcp_host, tcp_port, connection_id, endpoint_id.name
273            );
274
275            Ok(TransportType::Tcp(tcp_endpoint))
276        }
277        RequestPlaneMode::Nats => Ok(TransportType::Nats(nats::instance_subject(
278            endpoint_id,
279            connection_id,
280        ))),
281    }
282}
283
284/// Build transport type, ensuring TCP server is initialized when needed.
285///
286/// In TCP mode with an OS-assigned port (`DYN_TCP_RPC_PORT` unset or invalid), the server must bind
287/// before we can construct a correct transport address. This helper ensures that initialization
288/// occurs, then delegates to the internal builder.
289pub async fn build_transport_type(
290    endpoint: &Endpoint,
291    endpoint_id: &EndpointId,
292    connection_id: u64,
293) -> Result<TransportType> {
294    let mode = endpoint.drt().request_plane();
295
296    if mode == RequestPlaneMode::Tcp {
297        // Only force server init when we *don't* have a valid explicit port.
298        let has_fixed_port = std::env::var("DYN_TCP_RPC_PORT")
299            .ok()
300            .and_then(|p| p.parse::<u16>().ok())
301            .is_some();
302
303        if !has_fixed_port {
304            // Ensure request plane server is initialized before building transport.
305            let _ = endpoint.drt().request_plane_server().await?;
306        }
307    }
308
309    build_transport_type_inner(mode, endpoint_id, connection_id)
310}
311
312impl Endpoint {
313    /// Unregister this endpoint instance from discovery.
314    ///
315    /// This removes the endpoint from the instances bucket, preventing the router
316    /// from sending requests to this worker. Use this when a worker is sleeping
317    /// and should not receive any requests.
318    pub async fn unregister_endpoint_instance(&self) -> anyhow::Result<()> {
319        let drt = self.drt();
320        let instance_id = drt.connection_id();
321        let endpoint_id = self.id();
322
323        // Get the transport type for the endpoint
324        let transport = build_transport_type(self, &endpoint_id, instance_id).await?;
325
326        let instance = crate::discovery::DiscoveryInstance::Endpoint(Instance {
327            namespace: endpoint_id.namespace,
328            component: endpoint_id.component,
329            endpoint: endpoint_id.name,
330            instance_id,
331            transport,
332        });
333
334        let discovery = drt.discovery();
335        if let Err(e) = discovery.unregister(instance).await {
336            let endpoint_id = self.id();
337            tracing::error!(
338                %endpoint_id,
339                error = %e,
340                "Unable to unregister endpoint instance from discovery"
341            );
342            anyhow::bail!(
343                "Unable to unregister endpoint instance from discovery. Check discovery service status"
344            );
345        }
346
347        tracing::info!(
348            instance_id = instance_id,
349            "Successfully unregistered endpoint instance from discovery - worker removed from routing pool"
350        );
351
352        Ok(())
353    }
354
355    /// Re-register this endpoint instance to discovery.
356    ///
357    /// This adds the endpoint back to the instances bucket, allowing the router
358    /// to send requests to this worker again. Use this when a worker wakes up
359    /// and should start receiving requests.
360    pub async fn register_endpoint_instance(&self) -> anyhow::Result<()> {
361        let drt = self.drt();
362        let instance_id = drt.connection_id();
363        let endpoint_id = self.id();
364
365        // Get the transport type for the endpoint
366        let transport = build_transport_type(self, &endpoint_id, instance_id).await?;
367
368        let spec = crate::discovery::DiscoverySpec::Endpoint {
369            namespace: endpoint_id.namespace,
370            component: endpoint_id.component,
371            endpoint: endpoint_id.name,
372            transport,
373        };
374
375        let discovery = drt.discovery();
376        if let Err(e) = discovery.register(spec).await {
377            let endpoint_id = self.id();
378            tracing::error!(
379                %endpoint_id,
380                error = %e,
381                "Unable to re-register endpoint instance to discovery"
382            );
383            anyhow::bail!(
384                "Unable to re-register endpoint instance to discovery. Check discovery service status"
385            );
386        }
387
388        tracing::info!(
389            instance_id = instance_id,
390            "Successfully re-registered endpoint instance to discovery - worker added back to routing pool"
391        );
392
393        Ok(())
394    }
395}