dynamo_runtime/component/
endpoint.rs1use std::sync::Arc;
5
6use anyhow::Result;
7use derive_builder::Builder;
8use derive_getters::Dissolve;
9use educe::Educe;
10use tokio_util::sync::CancellationToken;
11
12use crate::{
13 component::{Endpoint, Instance, TransportType},
14 distributed::RequestPlaneMode,
15 pipeline::network::{PushWorkHandler, ingress::push_endpoint::PushEndpoint},
16 protocols::EndpointId,
17 traits::DistributedRuntimeProvider,
18 transports::nats,
19};
20
21#[derive(Educe, Builder, Dissolve)]
22#[educe(Debug)]
23#[builder(pattern = "owned", build_fn(private, name = "build_internal"))]
24pub struct EndpointConfig {
25 #[builder(private)]
26 endpoint: Endpoint,
27
28 #[educe(Debug(ignore))]
30 handler: Arc<dyn PushWorkHandler>,
31
32 #[builder(default, setter(into))]
34 metrics_labels: Option<Vec<(String, String)>>,
35
36 #[builder(default = "true")]
38 graceful_shutdown: bool,
39
40 #[educe(Debug(ignore))]
44 #[builder(default, setter(into, strip_option))]
45 health_check_payload: Option<serde_json::Value>,
46}
47
48impl EndpointConfigBuilder {
49 pub(crate) fn from_endpoint(endpoint: Endpoint) -> Self {
50 Self::default().endpoint(endpoint)
51 }
52
53 pub fn register_local_engine(
55 self,
56 engine: crate::local_endpoint_registry::LocalAsyncEngine,
57 ) -> Result<Self> {
58 if let Some(endpoint) = &self.endpoint {
59 let registry = endpoint.drt().local_endpoint_registry();
60 registry.register(endpoint.name.clone(), engine);
61 tracing::debug!(
62 "Registered engine for endpoint '{}' in local registry",
63 endpoint.name
64 );
65 }
66 Ok(self)
67 }
68
69 pub async fn start(self) -> Result<()> {
70 let (endpoint, handler, metrics_labels, graceful_shutdown, health_check_payload) =
71 self.build_internal()?.dissolve();
72 let connection_id = endpoint.drt().connection_id();
73 let endpoint_id = endpoint.id();
74
75 tracing::debug!("Starting endpoint: {endpoint_id}");
76
77 let metrics_labels: Option<Vec<(&str, &str)>> = metrics_labels
78 .as_ref()
79 .map(|v| v.iter().map(|(k, v)| (k.as_str(), v.as_str())).collect());
80 handler.add_metrics(&endpoint, metrics_labels.as_deref())?;
82
83 let endpoint_shutdown_token = endpoint.drt().child_token();
86
87 let system_health = endpoint.drt().system_health();
88
89 if graceful_shutdown {
91 tracing::debug!(
92 "Registering endpoint '{}' with graceful shutdown tracker",
93 endpoint.name
94 );
95 let tracker = endpoint.drt().graceful_shutdown_tracker();
96 tracker.register_endpoint();
97 } else {
98 tracing::debug!("Endpoint '{}' has graceful_shutdown=false", endpoint.name);
99 }
100
101 let tracker_clone = if graceful_shutdown {
103 Some(endpoint.drt().graceful_shutdown_tracker())
104 } else {
105 None
106 };
107
108 let namespace_name_for_task = endpoint_id.namespace.clone();
110 let component_name_for_task = endpoint_id.component.clone();
111 let endpoint_name_for_task = endpoint_id.name.clone();
112
113 let server = endpoint.drt().request_plane_server().await?;
115
116 if let Some(health_check_payload) = &health_check_payload {
118 let transport = build_transport_type(&endpoint, &endpoint_id, connection_id).await?;
120
121 let instance = Instance {
122 component: endpoint_id.component.clone(),
123 endpoint: endpoint_id.name.clone(),
124 namespace: endpoint_id.namespace.clone(),
125 instance_id: connection_id,
126 transport,
127 };
128 tracing::debug!(endpoint_name = %endpoint.name, "Registering endpoint health check target");
129 let guard = system_health.lock();
130 guard.register_health_check_target(
131 &endpoint.name,
132 instance,
133 health_check_payload.clone(),
134 );
135 if let Some(notifier) = guard.get_endpoint_health_check_notifier(&endpoint.name) {
136 handler.set_endpoint_health_check_notifier(notifier)?;
137 }
138 }
139
140 tracing::debug!(
141 endpoint = %endpoint_name_for_task,
142 transport = server.transport_name(),
143 "Registering endpoint with request plane server"
144 );
145
146 server
148 .register_endpoint(
149 endpoint_name_for_task.clone(),
150 handler,
151 connection_id,
152 namespace_name_for_task.clone(),
153 component_name_for_task.clone(),
154 system_health.clone(),
155 )
156 .await?;
157
158 let endpoint_name_for_cleanup = endpoint_name_for_task.clone();
160 let server_for_cleanup = server.clone();
161 let cancel_token_for_cleanup = endpoint_shutdown_token.clone();
162
163 let task: tokio::task::JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
164 cancel_token_for_cleanup.cancelled().await;
165
166 tracing::debug!(
167 endpoint = %endpoint_name_for_cleanup,
168 "Unregistering endpoint from request plane server"
169 );
170
171 if let Err(e) = server_for_cleanup
173 .unregister_endpoint(&endpoint_name_for_cleanup)
174 .await
175 {
176 tracing::warn!(
177 endpoint = %endpoint_name_for_cleanup,
178 error = %e,
179 "Failed to unregister endpoint"
180 );
181 }
182
183 if let Some(tracker) = tracker_clone {
185 tracing::debug!("Unregister endpoint from graceful shutdown tracker");
186 tracker.unregister_endpoint();
187 }
188
189 anyhow::Ok(())
190 });
191
192 let discovery = endpoint.drt().discovery();
196
197 let transport = build_transport_type(&endpoint, &endpoint_id, connection_id).await?;
199
200 let discovery_spec = crate::discovery::DiscoverySpec::Endpoint {
201 namespace: endpoint_id.namespace.clone(),
202 component: endpoint_id.component.clone(),
203 endpoint: endpoint_id.name.clone(),
204 transport,
205 };
206
207 if let Err(e) = discovery.register(discovery_spec).await {
208 tracing::error!(
209 %endpoint_id,
210 error = %e,
211 "Unable to register service for discovery"
212 );
213 endpoint_shutdown_token.cancel();
214 anyhow::bail!(
215 "Unable to register service for discovery. Check discovery service status"
216 );
217 }
218
219 task.await??;
220
221 Ok(())
222 }
223}
224
225fn build_transport_type_inner(
236 mode: RequestPlaneMode,
237 endpoint_id: &EndpointId,
238 connection_id: u64,
239) -> Result<TransportType> {
240 match mode {
241 RequestPlaneMode::Http => {
242 let http_host = crate::utils::get_http_rpc_host_from_env();
243 let http_port = std::env::var("DYN_HTTP_RPC_PORT")
244 .ok()
245 .and_then(|p| p.parse::<u16>().ok())
246 .unwrap_or(8888);
247 let rpc_root =
248 std::env::var("DYN_HTTP_RPC_ROOT_PATH").unwrap_or_else(|_| "/v1/rpc".to_string());
249
250 let http_endpoint = format!(
251 "http://{http_host}:{http_port}{rpc_root}/{}",
252 endpoint_id.name
253 );
254
255 Ok(TransportType::Http(http_endpoint))
256 }
257 RequestPlaneMode::Tcp => {
258 let tcp_host = crate::utils::get_tcp_rpc_host_from_env();
259 let tcp_port = std::env::var("DYN_TCP_RPC_PORT")
262 .ok()
263 .and_then(|p| p.parse::<u16>().ok())
264 .unwrap_or(crate::pipeline::network::manager::get_actual_tcp_rpc_port()?);
265
266 let tcp_endpoint = format!(
271 "{}:{}/{:x}/{}",
272 tcp_host, tcp_port, connection_id, endpoint_id.name
273 );
274
275 Ok(TransportType::Tcp(tcp_endpoint))
276 }
277 RequestPlaneMode::Nats => Ok(TransportType::Nats(nats::instance_subject(
278 endpoint_id,
279 connection_id,
280 ))),
281 }
282}
283
284pub async fn build_transport_type(
290 endpoint: &Endpoint,
291 endpoint_id: &EndpointId,
292 connection_id: u64,
293) -> Result<TransportType> {
294 let mode = endpoint.drt().request_plane();
295
296 if mode == RequestPlaneMode::Tcp {
297 let has_fixed_port = std::env::var("DYN_TCP_RPC_PORT")
299 .ok()
300 .and_then(|p| p.parse::<u16>().ok())
301 .is_some();
302
303 if !has_fixed_port {
304 let _ = endpoint.drt().request_plane_server().await?;
306 }
307 }
308
309 build_transport_type_inner(mode, endpoint_id, connection_id)
310}
311
312impl Endpoint {
313 pub async fn unregister_endpoint_instance(&self) -> anyhow::Result<()> {
319 let drt = self.drt();
320 let instance_id = drt.connection_id();
321 let endpoint_id = self.id();
322
323 let transport = build_transport_type(self, &endpoint_id, instance_id).await?;
325
326 let instance = crate::discovery::DiscoveryInstance::Endpoint(Instance {
327 namespace: endpoint_id.namespace,
328 component: endpoint_id.component,
329 endpoint: endpoint_id.name,
330 instance_id,
331 transport,
332 });
333
334 let discovery = drt.discovery();
335 if let Err(e) = discovery.unregister(instance).await {
336 let endpoint_id = self.id();
337 tracing::error!(
338 %endpoint_id,
339 error = %e,
340 "Unable to unregister endpoint instance from discovery"
341 );
342 anyhow::bail!(
343 "Unable to unregister endpoint instance from discovery. Check discovery service status"
344 );
345 }
346
347 tracing::info!(
348 instance_id = instance_id,
349 "Successfully unregistered endpoint instance from discovery - worker removed from routing pool"
350 );
351
352 Ok(())
353 }
354
355 pub async fn register_endpoint_instance(&self) -> anyhow::Result<()> {
361 let drt = self.drt();
362 let instance_id = drt.connection_id();
363 let endpoint_id = self.id();
364
365 let transport = build_transport_type(self, &endpoint_id, instance_id).await?;
367
368 let spec = crate::discovery::DiscoverySpec::Endpoint {
369 namespace: endpoint_id.namespace,
370 component: endpoint_id.component,
371 endpoint: endpoint_id.name,
372 transport,
373 };
374
375 let discovery = drt.discovery();
376 if let Err(e) = discovery.register(spec).await {
377 let endpoint_id = self.id();
378 tracing::error!(
379 %endpoint_id,
380 error = %e,
381 "Unable to re-register endpoint instance to discovery"
382 );
383 anyhow::bail!(
384 "Unable to re-register endpoint instance to discovery. Check discovery service status"
385 );
386 }
387
388 tracing::info!(
389 instance_id = instance_id,
390 "Successfully re-registered endpoint instance to discovery - worker added back to routing pool"
391 );
392
393 Ok(())
394 }
395}