dynamo_runtime/component/
endpoint.rs1use std::sync::Arc;
5
6use anyhow::Result;
7use derive_builder::Builder;
8use derive_getters::Dissolve;
9use educe::Educe;
10use tokio_util::sync::CancellationToken;
11
12use crate::{
13 component::{Endpoint, Instance, TransportType},
14 distributed::RequestPlaneMode,
15 pipeline::network::{PushWorkHandler, ingress::push_endpoint::PushEndpoint},
16 protocols::EndpointId,
17 traits::DistributedRuntimeProvider,
18 transports::nats,
19};
20
21#[derive(Educe, Builder, Dissolve)]
22#[educe(Debug)]
23#[builder(pattern = "owned", build_fn(private, name = "build_internal"))]
24pub struct EndpointConfig {
25 #[builder(private)]
26 endpoint: Endpoint,
27
28 #[educe(Debug(ignore))]
30 handler: Arc<dyn PushWorkHandler>,
31
32 #[builder(default, setter(into))]
34 metrics_labels: Option<Vec<(String, String)>>,
35
36 #[builder(default = "true")]
38 graceful_shutdown: bool,
39
40 #[educe(Debug(ignore))]
44 #[builder(default, setter(into, strip_option))]
45 health_check_payload: Option<serde_json::Value>,
46}
47
48impl EndpointConfigBuilder {
49 pub(crate) fn from_endpoint(endpoint: Endpoint) -> Self {
50 Self::default().endpoint(endpoint)
51 }
52
53 pub fn register_local_engine(
55 self,
56 engine: crate::local_endpoint_registry::LocalAsyncEngine,
57 ) -> Result<Self> {
58 if let Some(endpoint) = &self.endpoint {
59 let registry = endpoint.drt().local_endpoint_registry();
60 registry.register(endpoint.name.clone(), engine);
61 tracing::debug!(
62 "Registered engine for endpoint '{}' in local registry",
63 endpoint.name
64 );
65 }
66 Ok(self)
67 }
68
69 pub async fn start(self) -> Result<()> {
70 let (endpoint, handler, metrics_labels, graceful_shutdown, health_check_payload) =
71 self.build_internal()?.dissolve();
72 let connection_id = endpoint.drt().connection_id();
73 let endpoint_id = endpoint.id();
74
75 tracing::debug!("Starting endpoint: {endpoint_id}");
76
77 let metrics_labels: Option<Vec<(&str, &str)>> = metrics_labels
78 .as_ref()
79 .map(|v| v.iter().map(|(k, v)| (k.as_str(), v.as_str())).collect());
80 handler.add_metrics(&endpoint, metrics_labels.as_deref())?;
82
83 let endpoint_shutdown_token = endpoint.drt().child_token();
86
87 let system_health = endpoint.drt().system_health();
88
89 if graceful_shutdown {
91 tracing::debug!(
92 "Registering endpoint '{}' with graceful shutdown tracker",
93 endpoint.name
94 );
95 let tracker = endpoint.drt().graceful_shutdown_tracker();
96 tracker.register_endpoint();
97 } else {
98 tracing::debug!("Endpoint '{}' has graceful_shutdown=false", endpoint.name);
99 }
100
101 let tracker_clone = if graceful_shutdown {
103 Some(endpoint.drt().graceful_shutdown_tracker())
104 } else {
105 None
106 };
107
108 let namespace_name_for_task = endpoint_id.namespace.clone();
110 let component_name_for_task = endpoint_id.component.clone();
111 let endpoint_name_for_task = endpoint_id.name.clone();
112
113 let server = endpoint.drt().request_plane_server().await?;
115
116 if let Some(health_check_payload) = &health_check_payload {
118 let transport = build_transport_type(&endpoint, &endpoint_id, connection_id).await?;
120
121 let instance = Instance {
122 component: endpoint_id.component.clone(),
123 endpoint: endpoint_id.name.clone(),
124 namespace: endpoint_id.namespace.clone(),
125 instance_id: connection_id,
126 transport,
127 };
128 tracing::debug!(endpoint_name = %endpoint.name, "Registering endpoint health check target");
129 let guard = system_health.lock();
130 guard.register_health_check_target(
131 &endpoint.name,
132 instance,
133 health_check_payload.clone(),
134 );
135 if let Some(notifier) = guard.get_endpoint_health_check_notifier(&endpoint.name) {
136 handler.set_endpoint_health_check_notifier(notifier)?;
137 }
138 }
139
140 tracing::debug!(
141 endpoint = %endpoint_name_for_task,
142 transport = server.transport_name(),
143 "Registering endpoint with request plane server"
144 );
145
146 server
148 .register_endpoint(
149 endpoint_name_for_task.clone(),
150 handler,
151 connection_id,
152 namespace_name_for_task.clone(),
153 component_name_for_task.clone(),
154 system_health.clone(),
155 )
156 .await?;
157
158 let endpoint_name_for_cleanup = endpoint_name_for_task.clone();
160 let server_for_cleanup = server.clone();
161 let cancel_token_for_cleanup = endpoint_shutdown_token.clone();
162
163 let task: tokio::task::JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
164 cancel_token_for_cleanup.cancelled().await;
165
166 tracing::debug!(
167 endpoint = %endpoint_name_for_cleanup,
168 "Unregistering endpoint from request plane server"
169 );
170
171 if let Err(e) = server_for_cleanup
173 .unregister_endpoint(&endpoint_name_for_cleanup)
174 .await
175 {
176 tracing::warn!(
177 endpoint = %endpoint_name_for_cleanup,
178 error = %e,
179 "Failed to unregister endpoint"
180 );
181 }
182
183 if let Some(tracker) = tracker_clone {
185 tracing::debug!("Unregister endpoint from graceful shutdown tracker");
186 tracker.unregister_endpoint();
187 }
188
189 anyhow::Ok(())
190 });
191
192 let discovery = endpoint.drt().discovery();
196
197 let transport = build_transport_type(&endpoint, &endpoint_id, connection_id).await?;
199
200 let discovery_spec = crate::discovery::DiscoverySpec::Endpoint {
201 namespace: endpoint_id.namespace.clone(),
202 component: endpoint_id.component.clone(),
203 endpoint: endpoint_id.name.clone(),
204 transport,
205 };
206
207 if let Err(e) = discovery.register(discovery_spec).await {
208 tracing::error!(
209 %endpoint_id,
210 error = %e,
211 "Unable to register service for discovery"
212 );
213 endpoint_shutdown_token.cancel();
214 anyhow::bail!(
215 "Unable to register service for discovery. Check discovery service status"
216 );
217 }
218
219 task.await??;
220
221 Ok(())
222 }
223}
224
225fn build_transport_type_inner(
236 mode: RequestPlaneMode,
237 endpoint_id: &EndpointId,
238 connection_id: u64,
239) -> Result<TransportType> {
240 match mode {
241 RequestPlaneMode::Http => {
242 let http_host = crate::utils::get_http_rpc_host_from_env();
243 let http_port = std::env::var("DYN_HTTP_RPC_PORT")
246 .ok()
247 .and_then(|p| p.parse::<u16>().ok())
248 .filter(|&p| p != 0)
249 .unwrap_or(crate::pipeline::network::manager::get_actual_http_rpc_port()?);
250 let rpc_root =
251 std::env::var("DYN_HTTP_RPC_ROOT_PATH").unwrap_or_else(|_| "/v1/rpc".to_string());
252
253 let http_endpoint = format!(
254 "http://{http_host}:{http_port}{rpc_root}/{}",
255 endpoint_id.name
256 );
257
258 Ok(TransportType::Http(http_endpoint))
259 }
260 RequestPlaneMode::Tcp => {
261 let tcp_host = crate::utils::get_tcp_rpc_host_from_env();
262 let tcp_port = std::env::var("DYN_TCP_RPC_PORT")
265 .ok()
266 .and_then(|p| p.parse::<u16>().ok())
267 .filter(|&p| p != 0)
268 .unwrap_or(crate::pipeline::network::manager::get_actual_tcp_rpc_port()?);
269
270 let tcp_endpoint = format!(
275 "{}:{}/{:x}/{}",
276 tcp_host, tcp_port, connection_id, endpoint_id.name
277 );
278
279 Ok(TransportType::Tcp(tcp_endpoint))
280 }
281 RequestPlaneMode::Nats => Ok(TransportType::Nats(nats::instance_subject(
282 endpoint_id,
283 connection_id,
284 ))),
285 }
286}
287
288pub async fn build_transport_type(
294 endpoint: &Endpoint,
295 endpoint_id: &EndpointId,
296 connection_id: u64,
297) -> Result<TransportType> {
298 let mode = endpoint.drt().request_plane();
299
300 let has_fixed_port = match mode {
303 RequestPlaneMode::Tcp => std::env::var("DYN_TCP_RPC_PORT")
304 .ok()
305 .and_then(|p| p.parse::<u16>().ok())
306 .filter(|&p| p != 0)
307 .is_some(),
308 RequestPlaneMode::Http => std::env::var("DYN_HTTP_RPC_PORT")
309 .ok()
310 .and_then(|p| p.parse::<u16>().ok())
311 .filter(|&p| p != 0)
312 .is_some(),
313 RequestPlaneMode::Nats => true, };
315
316 if !has_fixed_port {
317 let _ = endpoint.drt().request_plane_server().await?;
319 }
320
321 build_transport_type_inner(mode, endpoint_id, connection_id)
322}
323
324impl Endpoint {
325 pub async fn unregister_endpoint_instance(&self) -> anyhow::Result<()> {
331 let drt = self.drt();
332 let instance_id = drt.connection_id();
333 let endpoint_id = self.id();
334
335 let transport = build_transport_type(self, &endpoint_id, instance_id).await?;
337
338 let instance = crate::discovery::DiscoveryInstance::Endpoint(Instance {
339 namespace: endpoint_id.namespace,
340 component: endpoint_id.component,
341 endpoint: endpoint_id.name,
342 instance_id,
343 transport,
344 });
345
346 let discovery = drt.discovery();
347 if let Err(e) = discovery.unregister(instance).await {
348 let endpoint_id = self.id();
349 tracing::error!(
350 %endpoint_id,
351 error = %e,
352 "Unable to unregister endpoint instance from discovery"
353 );
354 anyhow::bail!(
355 "Unable to unregister endpoint instance from discovery. Check discovery service status"
356 );
357 }
358
359 tracing::info!(
360 instance_id = instance_id,
361 "Successfully unregistered endpoint instance from discovery - worker removed from routing pool"
362 );
363
364 Ok(())
365 }
366
367 pub async fn register_endpoint_instance(&self) -> anyhow::Result<()> {
373 let drt = self.drt();
374 let instance_id = drt.connection_id();
375 let endpoint_id = self.id();
376
377 let transport = build_transport_type(self, &endpoint_id, instance_id).await?;
379
380 let spec = crate::discovery::DiscoverySpec::Endpoint {
381 namespace: endpoint_id.namespace,
382 component: endpoint_id.component,
383 endpoint: endpoint_id.name,
384 transport,
385 };
386
387 let discovery = drt.discovery();
388 if let Err(e) = discovery.register(spec).await {
389 let endpoint_id = self.id();
390 tracing::error!(
391 %endpoint_id,
392 error = %e,
393 "Unable to re-register endpoint instance to discovery"
394 );
395 anyhow::bail!(
396 "Unable to re-register endpoint instance to discovery. Check discovery service status"
397 );
398 }
399
400 tracing::info!(
401 instance_id = instance_id,
402 "Successfully re-registered endpoint instance to discovery - worker added back to routing pool"
403 );
404
405 Ok(())
406 }
407}