dynamo_runtime/component/
endpoint.rs1use std::sync::Arc;
5
6use anyhow::Result;
7use derive_builder::Builder;
8use derive_getters::Dissolve;
9use educe::Educe;
10use tokio_util::sync::CancellationToken;
11
12use crate::{
13 component::{DeviceType, Endpoint, Instance, TransportType},
14 distributed::RequestPlaneMode,
15 pipeline::network::{PushWorkHandler, ingress::push_endpoint::PushEndpoint},
16 protocols::EndpointId,
17 traits::DistributedRuntimeProvider,
18 transports::nats,
19};
20
21fn endpoint_device_type() -> Option<DeviceType> {
22 if std::env::var("CUDA_VISIBLE_DEVICES")
24 .ok()
25 .map(|v| {
26 let l = v.trim().to_ascii_lowercase();
27 l.is_empty() || l == "-1" || l == "none" || l == "void"
28 })
29 .unwrap_or(false)
30 {
31 return Some(DeviceType::Cpu);
32 }
33
34 if std::env::var("NVIDIA_VISIBLE_DEVICES")
36 .ok()
37 .map(|v| {
38 let l = v.trim().to_ascii_lowercase();
39 l == "none" || l == "void"
40 })
41 .unwrap_or(false)
42 {
43 return Some(DeviceType::Cpu);
44 }
45
46 Some(DeviceType::Cuda)
48}
49
/// Configuration for serving a single [`Endpoint`].
///
/// Assembled through the generated [`EndpointConfigBuilder`] (owned-builder pattern). The
/// generated build function is private and renamed `build_internal`, so callers configure
/// the builder and call `EndpointConfigBuilder::start` instead of building this type
/// directly.
///
/// NOTE: `start` destructures the tuple returned by `dissolve()` positionally, so the field
/// order below is load-bearing. Keep `start` in sync when adding or reordering fields.
#[derive(Educe, Builder, Dissolve)]
#[educe(Debug)]
#[builder(pattern = "owned", build_fn(private, name = "build_internal"))]
pub struct EndpointConfig {
    /// The endpoint being served. The setter is private; the field is populated by
    /// `EndpointConfigBuilder::from_endpoint`.
    #[builder(private)]
    endpoint: Endpoint,

    /// Work handler that `start` registers with the request plane server.
    /// Excluded from `Debug` output.
    #[educe(Debug(ignore))]
    handler: Arc<dyn PushWorkHandler>,

    /// Optional extra `(key, value)` labels passed to `handler.add_metrics` when the
    /// endpoint starts. Defaults to `None`.
    #[builder(default, setter(into))]
    metrics_labels: Option<Vec<(String, String)>>,

    /// When `true` (the default), `start` registers the endpoint with the runtime's graceful
    /// shutdown tracker and unregisters it once the endpoint's shutdown token is cancelled.
    #[builder(default = "true")]
    graceful_shutdown: bool,

    /// Optional payload; when set, `start` registers a health-check target for this endpoint
    /// with the system health state using this payload. The setter takes the value directly
    /// (`strip_option`). Excluded from `Debug` output.
    #[educe(Debug(ignore))]
    #[builder(default, setter(into, strip_option))]
    health_check_payload: Option<serde_json::Value>,
}
76
77impl EndpointConfigBuilder {
78 pub(crate) fn from_endpoint(endpoint: Endpoint) -> Self {
79 Self::default().endpoint(endpoint)
80 }
81
82 pub fn register_local_engine(
84 self,
85 engine: crate::local_endpoint_registry::LocalAsyncEngine,
86 ) -> Result<Self> {
87 if let Some(endpoint) = &self.endpoint {
88 let registry = endpoint.drt().local_endpoint_registry();
89 registry.register(endpoint.name.clone(), engine);
90 tracing::debug!(
91 "Registered engine for endpoint '{}' in local registry",
92 endpoint.name
93 );
94 }
95 Ok(self)
96 }
97
98 pub async fn start(self) -> Result<()> {
99 let (endpoint, handler, metrics_labels, graceful_shutdown, health_check_payload) =
100 self.build_internal()?.dissolve();
101 let connection_id = endpoint.drt().connection_id();
102 let endpoint_id = endpoint.id();
103
104 tracing::debug!("Starting endpoint: {endpoint_id}");
105
106 let metrics_labels: Option<Vec<(&str, &str)>> = metrics_labels
107 .as_ref()
108 .map(|v| v.iter().map(|(k, v)| (k.as_str(), v.as_str())).collect());
109 handler.add_metrics(&endpoint, metrics_labels.as_deref())?;
111
112 let endpoint_shutdown_token = endpoint.drt().child_token();
115
116 let system_health = endpoint.drt().system_health();
117
118 if graceful_shutdown {
120 tracing::debug!(
121 "Registering endpoint '{}' with graceful shutdown tracker",
122 endpoint.name
123 );
124 let tracker = endpoint.drt().graceful_shutdown_tracker();
125 tracker.register_endpoint();
126 } else {
127 tracing::debug!("Endpoint '{}' has graceful_shutdown=false", endpoint.name);
128 }
129
130 let tracker_clone = if graceful_shutdown {
132 Some(endpoint.drt().graceful_shutdown_tracker())
133 } else {
134 None
135 };
136
137 let namespace_name_for_task = endpoint_id.namespace.clone();
139 let component_name_for_task = endpoint_id.component.clone();
140 let endpoint_name_for_task = endpoint_id.name.clone();
141
142 let server = endpoint.drt().request_plane_server().await?;
144
145 if let Some(health_check_payload) = &health_check_payload {
147 let transport = build_transport_type(&endpoint, &endpoint_id, connection_id).await?;
149
150 let instance = Instance {
151 component: endpoint_id.component.clone(),
152 endpoint: endpoint_id.name.clone(),
153 namespace: endpoint_id.namespace.clone(),
154 instance_id: connection_id,
155 transport,
156 device_type: endpoint_device_type(),
157 };
158 tracing::debug!(endpoint_name = %endpoint.name, "Registering endpoint health check target");
159 let guard = system_health.lock();
160 guard.register_health_check_target(
161 &endpoint.name,
162 instance,
163 health_check_payload.clone(),
164 );
165 if let Some(notifier) = guard.get_endpoint_health_check_notifier(&endpoint.name) {
166 handler.set_endpoint_health_check_notifier(notifier)?;
167 }
168 }
169
170 tracing::debug!(
171 endpoint = %endpoint_name_for_task,
172 transport = server.transport_name(),
173 "Registering endpoint with request plane server"
174 );
175
176 server
178 .register_endpoint(
179 endpoint_name_for_task.clone(),
180 handler,
181 connection_id,
182 namespace_name_for_task.clone(),
183 component_name_for_task.clone(),
184 system_health.clone(),
185 )
186 .await?;
187
188 let endpoint_name_for_cleanup = endpoint_name_for_task.clone();
190 let server_for_cleanup = server.clone();
191 let cancel_token_for_cleanup = endpoint_shutdown_token.clone();
192
193 let task: tokio::task::JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
194 cancel_token_for_cleanup.cancelled().await;
195
196 tracing::debug!(
197 endpoint = %endpoint_name_for_cleanup,
198 "Unregistering endpoint from request plane server"
199 );
200
201 if let Err(e) = server_for_cleanup
203 .unregister_endpoint(&endpoint_name_for_cleanup)
204 .await
205 {
206 tracing::warn!(
207 endpoint = %endpoint_name_for_cleanup,
208 error = %e,
209 "Failed to unregister endpoint"
210 );
211 }
212
213 if let Some(tracker) = tracker_clone {
215 tracing::debug!("Unregister endpoint from graceful shutdown tracker");
216 tracker.unregister_endpoint();
217 }
218
219 anyhow::Ok(())
220 });
221
222 let discovery = endpoint.drt().discovery();
226
227 let transport = build_transport_type(&endpoint, &endpoint_id, connection_id).await?;
229
230 let discovery_spec = crate::discovery::DiscoverySpec::Endpoint {
231 namespace: endpoint_id.namespace.clone(),
232 component: endpoint_id.component.clone(),
233 endpoint: endpoint_id.name.clone(),
234 transport,
235 device_type: endpoint_device_type(),
236 };
237
238 if let Err(e) = discovery.register(discovery_spec).await {
239 tracing::error!(
240 %endpoint_id,
241 error = %e,
242 "Unable to register service for discovery"
243 );
244 endpoint_shutdown_token.cancel();
245 anyhow::bail!(
246 "Unable to register service for discovery. Check discovery service status"
247 );
248 }
249
250 task.await??;
251
252 Ok(())
253 }
254}
255
256fn build_transport_type_inner(
267 mode: RequestPlaneMode,
268 endpoint_id: &EndpointId,
269 connection_id: u64,
270) -> Result<TransportType> {
271 match mode {
272 RequestPlaneMode::Http => {
273 let http_host = crate::utils::get_http_rpc_host_from_env();
274 let http_port = std::env::var("DYN_HTTP_RPC_PORT")
277 .ok()
278 .and_then(|p| p.parse::<u16>().ok())
279 .filter(|&p| p != 0)
280 .unwrap_or(crate::pipeline::network::manager::get_actual_http_rpc_port()?);
281 let rpc_root =
282 std::env::var("DYN_HTTP_RPC_ROOT_PATH").unwrap_or_else(|_| "/v1/rpc".to_string());
283
284 let http_endpoint = format!(
285 "http://{http_host}:{http_port}{rpc_root}/{}",
286 endpoint_id.name
287 );
288
289 Ok(TransportType::Http(http_endpoint))
290 }
291 RequestPlaneMode::Tcp => {
292 let tcp_host = crate::utils::get_tcp_rpc_host_from_env();
293 let tcp_port = std::env::var("DYN_TCP_RPC_PORT")
296 .ok()
297 .and_then(|p| p.parse::<u16>().ok())
298 .filter(|&p| p != 0)
299 .unwrap_or(crate::pipeline::network::manager::get_actual_tcp_rpc_port()?);
300
301 let tcp_endpoint = format!(
306 "{}:{}/{:x}/{}",
307 tcp_host, tcp_port, connection_id, endpoint_id.name
308 );
309
310 Ok(TransportType::Tcp(tcp_endpoint))
311 }
312 RequestPlaneMode::Nats => Ok(TransportType::Nats(nats::instance_subject(
313 endpoint_id,
314 connection_id,
315 ))),
316 }
317}
318
319pub async fn build_transport_type(
325 endpoint: &Endpoint,
326 endpoint_id: &EndpointId,
327 connection_id: u64,
328) -> Result<TransportType> {
329 let mode = endpoint.drt().request_plane();
330
331 let has_fixed_port = match mode {
334 RequestPlaneMode::Tcp => std::env::var("DYN_TCP_RPC_PORT")
335 .ok()
336 .and_then(|p| p.parse::<u16>().ok())
337 .filter(|&p| p != 0)
338 .is_some(),
339 RequestPlaneMode::Http => std::env::var("DYN_HTTP_RPC_PORT")
340 .ok()
341 .and_then(|p| p.parse::<u16>().ok())
342 .filter(|&p| p != 0)
343 .is_some(),
344 RequestPlaneMode::Nats => true, };
346
347 if !has_fixed_port {
348 let _ = endpoint.drt().request_plane_server().await?;
350 }
351
352 build_transport_type_inner(mode, endpoint_id, connection_id)
353}
354
355impl Endpoint {
356 pub async fn unregister_endpoint_instance(&self) -> anyhow::Result<()> {
362 let drt = self.drt();
363 let instance_id = drt.connection_id();
364 let endpoint_id = self.id();
365
366 let transport = build_transport_type(self, &endpoint_id, instance_id).await?;
368
369 let instance = crate::discovery::DiscoveryInstance::Endpoint(Instance {
370 namespace: endpoint_id.namespace,
371 component: endpoint_id.component,
372 endpoint: endpoint_id.name,
373 instance_id,
374 transport,
375 device_type: endpoint_device_type(),
376 });
377
378 let discovery = drt.discovery();
379 if let Err(e) = discovery.unregister(instance).await {
380 let endpoint_id = self.id();
381 tracing::error!(
382 %endpoint_id,
383 error = %e,
384 "Unable to unregister endpoint instance from discovery"
385 );
386 anyhow::bail!(
387 "Unable to unregister endpoint instance from discovery. Check discovery service status"
388 );
389 }
390
391 tracing::info!(
392 instance_id = instance_id,
393 "Successfully unregistered endpoint instance from discovery - worker removed from routing pool"
394 );
395
396 Ok(())
397 }
398
399 pub async fn register_endpoint_instance(&self) -> anyhow::Result<()> {
405 let drt = self.drt();
406 let instance_id = drt.connection_id();
407 let endpoint_id = self.id();
408
409 let transport = build_transport_type(self, &endpoint_id, instance_id).await?;
411
412 let spec = crate::discovery::DiscoverySpec::Endpoint {
413 namespace: endpoint_id.namespace,
414 component: endpoint_id.component,
415 endpoint: endpoint_id.name,
416 transport,
417 device_type: endpoint_device_type(),
418 };
419
420 let discovery = drt.discovery();
421 if let Err(e) = discovery.register(spec).await {
422 let endpoint_id = self.id();
423 tracing::error!(
424 %endpoint_id,
425 error = %e,
426 "Unable to re-register endpoint instance to discovery"
427 );
428 anyhow::bail!(
429 "Unable to re-register endpoint instance to discovery. Check discovery service status"
430 );
431 }
432
433 tracing::info!(
434 instance_id = instance_id,
435 "Successfully re-registered endpoint instance to discovery - worker added back to routing pool"
436 );
437
438 Ok(())
439 }
440}