dynamo_runtime/component/
endpoint.rs1use std::sync::Arc;
5
6use anyhow::Result;
7use derive_builder::Builder;
8use derive_getters::Dissolve;
9use educe::Educe;
10use tokio_util::sync::CancellationToken;
11
12use crate::{
13 component::{DeviceType, Endpoint, Instance, TransportType},
14 distributed::RequestPlaneMode,
15 pipeline::network::{PushWorkHandler, ingress::push_endpoint::PushEndpoint},
16 protocols::EndpointId,
17 traits::DistributedRuntimeProvider,
18 transports::nats,
19};
20
21fn endpoint_device_type() -> Option<DeviceType> {
22 if std::env::var("CUDA_VISIBLE_DEVICES")
24 .ok()
25 .map(|v| {
26 let l = v.trim().to_ascii_lowercase();
27 l.is_empty() || l == "-1" || l == "none" || l == "void"
28 })
29 .unwrap_or(false)
30 {
31 return Some(DeviceType::Cpu);
32 }
33
34 if std::env::var("NVIDIA_VISIBLE_DEVICES")
36 .ok()
37 .map(|v| {
38 let l = v.trim().to_ascii_lowercase();
39 l == "none" || l == "void"
40 })
41 .unwrap_or(false)
42 {
43 return Some(DeviceType::Cpu);
44 }
45
46 Some(DeviceType::Cuda)
48}
49
50#[derive(Educe, Builder, Dissolve)]
51#[educe(Debug)]
52#[builder(pattern = "owned", build_fn(private, name = "build_internal"))]
53pub struct EndpointConfig {
54 #[builder(private)]
55 endpoint: Endpoint,
56
57 #[educe(Debug(ignore))]
59 handler: Arc<dyn PushWorkHandler>,
60
61 #[builder(default, setter(into))]
63 metrics_labels: Option<Vec<(String, String)>>,
64
65 #[builder(default = "true")]
67 graceful_shutdown: bool,
68
69 #[educe(Debug(ignore))]
73 #[builder(default, setter(into, strip_option))]
74 health_check_payload: Option<serde_json::Value>,
75}
76
77impl EndpointConfigBuilder {
78 pub(crate) fn from_endpoint(endpoint: Endpoint) -> Self {
79 Self::default().endpoint(endpoint)
80 }
81
82 pub fn register_local_engine(
84 self,
85 engine: crate::local_endpoint_registry::LocalAsyncEngine,
86 ) -> Result<Self> {
87 if let Some(endpoint) = &self.endpoint {
88 let registry = endpoint.drt().local_endpoint_registry();
89 registry.register(endpoint.name.clone(), engine);
90 tracing::debug!(
91 "Registered engine for endpoint '{}' in local registry",
92 endpoint.name
93 );
94 }
95 Ok(self)
96 }
97
98 pub async fn start(self) -> Result<()> {
99 let (endpoint, handler, metrics_labels, graceful_shutdown, health_check_payload) =
100 self.build_internal()?.dissolve();
101 let connection_id = endpoint.drt().connection_id();
102 let endpoint_id = endpoint.id();
103
104 tracing::debug!("Starting endpoint: {endpoint_id}");
105
106 let metrics_labels: Option<Vec<(&str, &str)>> = metrics_labels
107 .as_ref()
108 .map(|v| v.iter().map(|(k, v)| (k.as_str(), v.as_str())).collect());
109 handler.add_metrics(&endpoint, metrics_labels.as_deref())?;
111
112 let endpoint_shutdown_token = endpoint.drt().child_token();
115
116 let system_health = endpoint.drt().system_health();
117
118 if graceful_shutdown {
120 tracing::debug!(
121 "Registering endpoint '{}' with graceful shutdown tracker",
122 endpoint.name
123 );
124 let tracker = endpoint.drt().graceful_shutdown_tracker();
125 tracker.register_endpoint();
126 } else {
127 tracing::debug!("Endpoint '{}' has graceful_shutdown=false", endpoint.name);
128 }
129
130 let tracker_clone = if graceful_shutdown {
132 Some(endpoint.drt().graceful_shutdown_tracker())
133 } else {
134 None
135 };
136
137 let namespace_name_for_task = endpoint_id.namespace.clone();
139 let component_name_for_task = endpoint_id.component.clone();
140 let endpoint_name_for_task = endpoint_id.name.clone();
141
142 let server = endpoint.drt().request_plane_server().await?;
144
145 if let Some(health_check_payload) = &health_check_payload {
147 if system_health.lock().health_check_enabled()
148 && endpoint
149 .drt()
150 .local_endpoint_registry()
151 .get(&endpoint.name)
152 .is_none()
153 {
154 anyhow::bail!(
155 "Endpoint '{}' has a health_check_payload and canary is enabled, \
156 but no local engine is registered. Call .register_local_engine() \
157 before .start() so the canary health check can function.",
158 endpoint.name
159 );
160 }
161
162 let transport = build_transport_type(&endpoint, &endpoint_id, connection_id).await?;
164
165 let instance = Instance {
166 component: endpoint_id.component.clone(),
167 endpoint: endpoint_id.name.clone(),
168 namespace: endpoint_id.namespace.clone(),
169 instance_id: connection_id,
170 transport,
171 device_type: endpoint_device_type(),
172 };
173 tracing::debug!(endpoint_name = %endpoint.name, "Registering endpoint health check target");
174 let guard = system_health.lock();
175 guard.register_health_check_target(
176 &endpoint.name,
177 instance,
178 health_check_payload.clone(),
179 );
180 if let Some(notifier) = guard.get_endpoint_health_check_notifier(&endpoint.name) {
181 handler.set_endpoint_health_check_notifier(notifier)?;
182 }
183 }
184
185 tracing::debug!(
186 endpoint = %endpoint_name_for_task,
187 transport = server.transport_name(),
188 "Registering endpoint with request plane server"
189 );
190
191 server
193 .register_endpoint(
194 endpoint_name_for_task.clone(),
195 handler,
196 connection_id,
197 namespace_name_for_task.clone(),
198 component_name_for_task.clone(),
199 system_health.clone(),
200 )
201 .await?;
202
203 let endpoint_name_for_cleanup = endpoint_name_for_task.clone();
205 let server_for_cleanup = server.clone();
206 let cancel_token_for_cleanup = endpoint_shutdown_token.clone();
207
208 let task: tokio::task::JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
209 cancel_token_for_cleanup.cancelled().await;
210
211 tracing::debug!(
212 endpoint = %endpoint_name_for_cleanup,
213 "Unregistering endpoint from request plane server"
214 );
215
216 if let Err(e) = server_for_cleanup
218 .unregister_endpoint(&endpoint_name_for_cleanup)
219 .await
220 {
221 tracing::warn!(
222 endpoint = %endpoint_name_for_cleanup,
223 error = %e,
224 "Failed to unregister endpoint"
225 );
226 }
227
228 if let Some(tracker) = tracker_clone {
230 tracing::debug!("Unregister endpoint from graceful shutdown tracker");
231 tracker.unregister_endpoint();
232 }
233
234 anyhow::Ok(())
235 });
236
237 let discovery = endpoint.drt().discovery();
241
242 let transport = build_transport_type(&endpoint, &endpoint_id, connection_id).await?;
244
245 let discovery_spec = crate::discovery::DiscoverySpec::Endpoint {
246 namespace: endpoint_id.namespace.clone(),
247 component: endpoint_id.component.clone(),
248 endpoint: endpoint_id.name.clone(),
249 transport,
250 device_type: endpoint_device_type(),
251 };
252
253 if let Err(e) = discovery.register(discovery_spec).await {
254 tracing::error!(
255 %endpoint_id,
256 error = %e,
257 "Unable to register service for discovery"
258 );
259 endpoint_shutdown_token.cancel();
260 anyhow::bail!(
261 "Unable to register service for discovery. Check discovery service status"
262 );
263 }
264
265 task.await??;
266
267 Ok(())
268 }
269}
270
271fn build_transport_type_inner(
281 mode: RequestPlaneMode,
282 endpoint_id: &EndpointId,
283 connection_id: u64,
284) -> Result<TransportType> {
285 match mode {
286 RequestPlaneMode::Tcp => {
287 let tcp_host = crate::utils::tcp_rpc_host_from_env();
288 let tcp_port = std::env::var("DYN_TCP_RPC_PORT")
291 .ok()
292 .and_then(|p| p.parse::<u16>().ok())
293 .filter(|&p| p != 0)
294 .unwrap_or(crate::pipeline::network::manager::get_actual_tcp_rpc_port()?);
295
296 let tcp_endpoint = format!(
301 "{}:{}/{:x}/{}",
302 tcp_host, tcp_port, connection_id, endpoint_id.name
303 );
304
305 Ok(TransportType::Tcp(tcp_endpoint))
306 }
307 RequestPlaneMode::Nats => Ok(TransportType::Nats(nats::instance_subject(
308 endpoint_id,
309 connection_id,
310 ))),
311 }
312}
313
314pub async fn build_transport_type(
320 endpoint: &Endpoint,
321 endpoint_id: &EndpointId,
322 connection_id: u64,
323) -> Result<TransportType> {
324 let mode = endpoint.drt().request_plane();
325
326 let has_fixed_port = match mode {
329 RequestPlaneMode::Tcp => std::env::var("DYN_TCP_RPC_PORT")
330 .ok()
331 .and_then(|p| p.parse::<u16>().ok())
332 .filter(|&p| p != 0)
333 .is_some(),
334 RequestPlaneMode::Nats => true, };
336
337 if !has_fixed_port {
338 let _ = endpoint.drt().request_plane_server().await?;
340 }
341
342 build_transport_type_inner(mode, endpoint_id, connection_id)
343}
344
345impl Endpoint {
346 pub async fn unregister_endpoint_instance(&self) -> anyhow::Result<()> {
352 let drt = self.drt();
353 let instance_id = drt.connection_id();
354 let endpoint_id = self.id();
355
356 let transport = build_transport_type(self, &endpoint_id, instance_id).await?;
358
359 let instance = crate::discovery::DiscoveryInstance::Endpoint(Instance {
360 namespace: endpoint_id.namespace,
361 component: endpoint_id.component,
362 endpoint: endpoint_id.name,
363 instance_id,
364 transport,
365 device_type: endpoint_device_type(),
366 });
367
368 let discovery = drt.discovery();
369 if let Err(e) = discovery.unregister(instance).await {
370 let endpoint_id = self.id();
371 tracing::error!(
372 %endpoint_id,
373 error = %e,
374 "Unable to unregister endpoint instance from discovery"
375 );
376 anyhow::bail!(
377 "Unable to unregister endpoint instance from discovery. Check discovery service status"
378 );
379 }
380
381 tracing::info!(
382 instance_id = instance_id,
383 "Successfully unregistered endpoint instance from discovery - worker removed from routing pool"
384 );
385
386 Ok(())
387 }
388
389 pub async fn register_endpoint_instance(&self) -> anyhow::Result<()> {
395 let drt = self.drt();
396 let instance_id = drt.connection_id();
397 let endpoint_id = self.id();
398
399 let transport = build_transport_type(self, &endpoint_id, instance_id).await?;
401
402 let spec = crate::discovery::DiscoverySpec::Endpoint {
403 namespace: endpoint_id.namespace,
404 component: endpoint_id.component,
405 endpoint: endpoint_id.name,
406 transport,
407 device_type: endpoint_device_type(),
408 };
409
410 let discovery = drt.discovery();
411 if let Err(e) = discovery.register(spec).await {
412 let endpoint_id = self.id();
413 tracing::error!(
414 %endpoint_id,
415 error = %e,
416 "Unable to re-register endpoint instance to discovery"
417 );
418 anyhow::bail!(
419 "Unable to re-register endpoint instance to discovery. Check discovery service status"
420 );
421 }
422
423 tracing::info!(
424 instance_id = instance_id,
425 "Successfully re-registered endpoint instance to discovery - worker added back to routing pool"
426 );
427
428 Ok(())
429 }
430}