dynamo_runtime/component/
client.rs1use std::sync::Arc;
5use std::{collections::HashMap, time::Duration};
6
7use anyhow::Result;
8use arc_swap::ArcSwap;
9use futures::StreamExt;
10use tokio::net::unix::pipe::Receiver;
11
12use crate::discovery::{DiscoveryEvent, DiscoveryInstance, DiscoveryInstanceId};
13use crate::{
14 component::{Endpoint, Instance},
15 pipeline::async_trait,
16 pipeline::{
17 AddressedPushRouter, AddressedRequest, AsyncEngine, Data, ManyOut, PushRouter, RouterMode,
18 SingleIn,
19 },
20 traits::DistributedRuntimeProvider,
21 transports::etcd::Client as EtcdClient,
22};
23
/// Reconciliation period that [`Client::new`] passes to
/// [`Client::with_reconcile_interval`].
const DEFAULT_RECONCILE_INTERVAL: Duration = Duration::from_secs(5);
26
/// Handle for one [`Endpoint`] that tracks the instances currently known for it.
///
/// Cloning is cheap for the shared parts: the instance source, the id lists and
/// the availability sender all sit behind `Arc`s.
#[derive(Clone, Debug)]
pub struct Client {
    /// Endpoint this client was created for.
    pub endpoint: Endpoint,
    /// Shared watch receiver carrying the current list of instances for the endpoint.
    pub instance_source: Arc<tokio::sync::watch::Receiver<Vec<Instance>>>,
    /// Instance ids currently considered available. Reset from `instance_source` by
    /// the background monitor and narrowed by `report_instance_down`.
    instance_avail: Arc<ArcSwap<Vec<u64>>>,
    /// Instance ids currently considered free. Reset from `instance_source` by the
    /// background monitor and recomputed by `update_free_instances`.
    instance_free: Arc<ArcSwap<Vec<u64>>>,
    /// Sender used to publish the available-id list whenever it is rewritten.
    instance_avail_tx: Arc<tokio::sync::watch::Sender<Vec<u64>>>,
    /// Receiver paired with `instance_avail_tx`; cloned out by `instance_avail_watcher`.
    instance_avail_rx: tokio::sync::watch::Receiver<Vec<u64>>,
    /// Longest time the background monitor waits before re-reading `instance_source`
    /// even when no change was signalled.
    reconcile_interval: Duration,
}
45
46impl Client {
47 pub(crate) async fn new(endpoint: Endpoint) -> Result<Self> {
49 Self::with_reconcile_interval(endpoint, DEFAULT_RECONCILE_INTERVAL).await
50 }
51
52 pub(crate) async fn with_reconcile_interval(
56 endpoint: Endpoint,
57 reconcile_interval: Duration,
58 ) -> Result<Self> {
59 tracing::trace!(
60 "Client::new_dynamic: Creating dynamic client for endpoint: {}",
61 endpoint.id()
62 );
63 let instance_source = Self::get_or_create_dynamic_instance_source(&endpoint).await?;
64
65 let (avail_tx, avail_rx) = tokio::sync::watch::channel(vec![]);
66 let client = Client {
67 endpoint: endpoint.clone(),
68 instance_source: instance_source.clone(),
69 instance_avail: Arc::new(ArcSwap::from(Arc::new(vec![]))),
70 instance_free: Arc::new(ArcSwap::from(Arc::new(vec![]))),
71 instance_avail_tx: Arc::new(avail_tx),
72 instance_avail_rx: avail_rx,
73 reconcile_interval,
74 };
75 client.monitor_instance_source();
76 Ok(client)
77 }
78
79 pub fn instances(&self) -> Vec<Instance> {
81 self.instance_source.borrow().clone()
82 }
83
84 pub fn instance_ids(&self) -> Vec<u64> {
85 self.instances().into_iter().map(|ep| ep.id()).collect()
86 }
87
88 pub fn instance_ids_avail(&self) -> arc_swap::Guard<Arc<Vec<u64>>> {
89 self.instance_avail.load()
90 }
91
92 pub fn instance_ids_free(&self) -> arc_swap::Guard<Arc<Vec<u64>>> {
93 self.instance_free.load()
94 }
95
96 pub fn instance_avail_watcher(&self) -> tokio::sync::watch::Receiver<Vec<u64>> {
98 self.instance_avail_rx.clone()
99 }
100
101 pub async fn wait_for_instances(&self) -> Result<Vec<Instance>> {
103 tracing::trace!(
104 "wait_for_instances: Starting wait for endpoint: {}",
105 self.endpoint.id()
106 );
107 let mut rx = self.instance_source.as_ref().clone();
108 let mut instances: Vec<Instance>;
110 loop {
111 instances = rx.borrow_and_update().to_vec();
112 if instances.is_empty() {
113 rx.changed().await?;
114 } else {
115 tracing::info!(
116 "wait_for_instances: Found {} instance(s) for endpoint: {}",
117 instances.len(),
118 self.endpoint.id()
119 );
120 break;
121 }
122 }
123 Ok(instances)
124 }
125
126 pub fn report_instance_down(&self, instance_id: u64) {
128 let filtered = self
129 .instance_ids_avail()
130 .iter()
131 .filter_map(|&id| if id == instance_id { None } else { Some(id) })
132 .collect::<Vec<_>>();
133 self.instance_avail.store(Arc::new(filtered.clone()));
134
135 let _ = self.instance_avail_tx.send(filtered);
137
138 tracing::debug!("inhibiting instance {instance_id}");
139 }
140
141 pub fn update_free_instances(&self, busy_instance_ids: &[u64]) {
143 let all_instance_ids = self.instance_ids();
144 let free_ids: Vec<u64> = all_instance_ids
145 .into_iter()
146 .filter(|id| !busy_instance_ids.contains(id))
147 .collect();
148 self.instance_free.store(Arc::new(free_ids));
149 }
150
151 fn monitor_instance_source(&self) {
158 let reconcile_interval = self.reconcile_interval;
159 let cancel_token = self.endpoint.drt().primary_token();
160 let client = self.clone();
161 let endpoint_id = self.endpoint.id();
162 tokio::task::spawn(async move {
163 let mut rx = client.instance_source.as_ref().clone();
164 while !cancel_token.is_cancelled() {
165 let instance_ids: Vec<u64> = rx
166 .borrow_and_update()
167 .iter()
168 .map(|instance| instance.id())
169 .collect();
170
171 client.instance_avail.store(Arc::new(instance_ids.clone()));
173 client.instance_free.store(Arc::new(instance_ids.clone()));
174
175 let _ = client.instance_avail_tx.send(instance_ids);
177
178 tokio::select! {
179 result = rx.changed() => {
180 if let Err(err) = result {
181 tracing::error!(
182 "monitor_instance_source: The Sender is dropped: {err}, endpoint={endpoint_id}",
183 );
184 cancel_token.cancel();
185 }
186 }
187 _ = tokio::time::sleep(reconcile_interval) => {
188 tracing::trace!(
189 "monitor_instance_source: periodic reconciliation for endpoint={endpoint_id}",
190 );
191 }
192 }
193 }
194 });
195 }
196
197 async fn get_or_create_dynamic_instance_source(
198 endpoint: &Endpoint,
199 ) -> Result<Arc<tokio::sync::watch::Receiver<Vec<Instance>>>> {
200 let drt = endpoint.drt();
201 let instance_sources = drt.instance_sources();
202 let mut instance_sources = instance_sources.lock().await;
203
204 if let Some(instance_source) = instance_sources.get(endpoint) {
205 if let Some(instance_source) = instance_source.upgrade() {
206 return Ok(instance_source);
207 } else {
208 instance_sources.remove(endpoint);
209 }
210 }
211
212 let discovery = drt.discovery();
213 let discovery_query = crate::discovery::DiscoveryQuery::Endpoint {
214 namespace: endpoint.component.namespace.name.clone(),
215 component: endpoint.component.name.clone(),
216 endpoint: endpoint.name.clone(),
217 };
218
219 let mut discovery_stream = discovery
220 .list_and_watch(discovery_query.clone(), None)
221 .await?;
222 let (watch_tx, watch_rx) = tokio::sync::watch::channel(vec![]);
223
224 let secondary = endpoint.component.drt.runtime().secondary().clone();
225
226 secondary.spawn(async move {
227 tracing::trace!("endpoint_watcher: Starting for discovery query: {:?}", discovery_query);
228 let mut map: HashMap<u64, Instance> = HashMap::new();
229
230 loop {
231 let discovery_event = tokio::select! {
232 _ = watch_tx.closed() => {
233 break;
234 }
235 discovery_event = discovery_stream.next() => {
236 match discovery_event {
237 Some(Ok(event)) => {
238 event
239 },
240 Some(Err(e)) => {
241 tracing::error!("endpoint_watcher: discovery stream error: {}; shutting down for discovery query: {:?}", e, discovery_query);
242 break;
243 }
244 None => {
245 break;
246 }
247 }
248 }
249 };
250
251 match discovery_event {
252 DiscoveryEvent::Added(discovery_instance) => {
253 if let DiscoveryInstance::Endpoint(instance) = discovery_instance {
254
255 map.insert(instance.instance_id, instance);
256 }
257 }
258 DiscoveryEvent::Removed(id) => {
259 map.remove(&id.instance_id());
260 }
261 }
262
263 let instances: Vec<Instance> = map.values().cloned().collect();
264 if watch_tx.send(instances).is_err() {
265 break;
266 }
267 }
268 let _ = watch_tx.send(vec![]);
269 });
270
271 let instance_source = Arc::new(watch_rx);
272 instance_sources.insert(endpoint.clone(), Arc::downgrade(&instance_source));
273 Ok(instance_source)
274 }
275}
276
277#[cfg(test)]
278mod tests {
279 use super::*;
280 use crate::{DistributedRuntime, Runtime, distributed::DistributedConfig};
281
282 #[tokio::test]
285 async fn test_instance_reconciliation() {
286 const TEST_RECONCILE_INTERVAL: Duration = Duration::from_millis(100);
287
288 let rt = Runtime::from_current().unwrap();
289 let drt = DistributedRuntime::new(rt.clone(), DistributedConfig::process_local())
291 .await
292 .unwrap();
293 let ns = drt.namespace("test_reconciliation".to_string()).unwrap();
294 let component = ns.component("test_component".to_string()).unwrap();
295 let endpoint = component.endpoint("test_endpoint".to_string());
296
297 let client = Client::with_reconcile_interval(endpoint, TEST_RECONCILE_INTERVAL)
299 .await
300 .unwrap();
301
302 assert!(client.instance_ids_avail().is_empty());
304
305 client.instance_avail.store(Arc::new(vec![1, 2, 3]));
308
309 assert_eq!(**client.instance_ids_avail(), vec![1u64, 2, 3]);
310
311 client.report_instance_down(2);
313 assert_eq!(**client.instance_ids_avail(), vec![1u64, 3]);
314
315 tokio::time::sleep(TEST_RECONCILE_INTERVAL + Duration::from_millis(50)).await;
319
320 assert!(
322 client.instance_ids_avail().is_empty(),
323 "After reconciliation, instance_avail should match instance_source"
324 );
325
326 rt.shutdown();
327 }
328
329 #[tokio::test]
331 async fn test_report_instance_down() {
332 let rt = Runtime::from_current().unwrap();
333 let drt = DistributedRuntime::new(rt.clone(), DistributedConfig::process_local())
335 .await
336 .unwrap();
337 let ns = drt.namespace("test_report_down".to_string()).unwrap();
338 let component = ns.component("test_component".to_string()).unwrap();
339 let endpoint = component.endpoint("test_endpoint".to_string());
340
341 let client = endpoint.client().await.unwrap();
342
343 client.instance_avail.store(Arc::new(vec![1, 2, 3]));
345 assert_eq!(**client.instance_ids_avail(), vec![1u64, 2, 3]);
346
347 client.report_instance_down(2);
349
350 let avail = client.instance_ids_avail();
352 assert!(avail.contains(&1), "Instance 1 should still be available");
353 assert!(
354 !avail.contains(&2),
355 "Instance 2 should be removed after report_instance_down"
356 );
357 assert!(avail.contains(&3), "Instance 3 should still be available");
358
359 rt.shutdown();
360 }
361
362 #[tokio::test]
364 async fn test_instance_avail_watcher() {
365 let rt = Runtime::from_current().unwrap();
366 let drt = DistributedRuntime::new(rt.clone(), DistributedConfig::process_local())
368 .await
369 .unwrap();
370 let ns = drt.namespace("test_watcher".to_string()).unwrap();
371 let component = ns.component("test_component".to_string()).unwrap();
372 let endpoint = component.endpoint("test_endpoint".to_string());
373
374 let client = endpoint.client().await.unwrap();
375 let watcher = client.instance_avail_watcher();
376
377 client.instance_avail.store(Arc::new(vec![1, 2, 3]));
379
380 client.report_instance_down(2);
382
383 let current = watcher.borrow().clone();
386 assert_eq!(current, vec![1, 3]);
387
388 rt.shutdown();
389 }
390}