dynamo_runtime/component/
client.rs1use std::sync::Arc;
5use std::{collections::HashMap, time::Duration};
6
7use anyhow::Result;
8use arc_swap::ArcSwap;
9use futures::StreamExt;
10use tokio::net::unix::pipe::Receiver;
11
12use crate::discovery::{DiscoveryEvent, DiscoveryInstance, DiscoveryInstanceId};
13use crate::{
14 component::{Endpoint, Instance},
15 pipeline::async_trait,
16 pipeline::{
17 AddressedPushRouter, AddressedRequest, AsyncEngine, Data, ManyOut, PushRouter, RouterMode,
18 SingleIn,
19 },
20 traits::DistributedRuntimeProvider,
21 transports::etcd::Client as EtcdClient,
22};
23
/// Default period after which a [`Client`]'s background monitor re-syncs its available/free
/// instance lists with the discovery-backed instance source, even if no change was observed.
const DEFAULT_RECONCILE_INTERVAL: Duration = Duration::from_millis(5_000);

27#[derive(Clone, Debug)]
28pub struct Client {
29 pub endpoint: Endpoint,
31 pub instance_source: Arc<tokio::sync::watch::Receiver<Vec<Instance>>>,
33 instance_avail: Arc<ArcSwap<Vec<u64>>>,
35 instance_free: Arc<ArcSwap<Vec<u64>>>,
37 instance_avail_tx: Arc<tokio::sync::watch::Sender<Vec<u64>>>,
39 instance_avail_rx: tokio::sync::watch::Receiver<Vec<u64>>,
41 reconcile_interval: Duration,
44}
45
46impl Client {
47 pub(crate) async fn new(endpoint: Endpoint) -> Result<Self> {
49 Self::with_reconcile_interval(endpoint, DEFAULT_RECONCILE_INTERVAL).await
50 }
51
52 pub(crate) async fn with_reconcile_interval(
56 endpoint: Endpoint,
57 reconcile_interval: Duration,
58 ) -> Result<Self> {
59 tracing::trace!(
60 "Client::new_dynamic: Creating dynamic client for endpoint: {}",
61 endpoint.id()
62 );
63 let instance_source = Self::get_or_create_dynamic_instance_source(&endpoint).await?;
64
65 let initial_ids: Vec<u64> = instance_source
70 .borrow()
71 .iter()
72 .map(|instance| instance.id())
73 .collect();
74 let (avail_tx, avail_rx) = tokio::sync::watch::channel(initial_ids.clone());
75 let client = Client {
76 endpoint: endpoint.clone(),
77 instance_source: instance_source.clone(),
78 instance_avail: Arc::new(ArcSwap::from(Arc::new(initial_ids.clone()))),
79 instance_free: Arc::new(ArcSwap::from(Arc::new(initial_ids))),
80 instance_avail_tx: Arc::new(avail_tx),
81 instance_avail_rx: avail_rx,
82 reconcile_interval,
83 };
84 client.monitor_instance_source();
85 Ok(client)
86 }
87
88 pub fn instances(&self) -> Vec<Instance> {
90 self.instance_source.borrow().clone()
91 }
92
93 pub fn instance_ids(&self) -> Vec<u64> {
94 self.instances().into_iter().map(|ep| ep.id()).collect()
95 }
96
97 pub fn instance_ids_avail(&self) -> arc_swap::Guard<Arc<Vec<u64>>> {
98 self.instance_avail.load()
99 }
100
101 pub fn instance_ids_free(&self) -> arc_swap::Guard<Arc<Vec<u64>>> {
102 self.instance_free.load()
103 }
104
105 pub fn instance_avail_watcher(&self) -> tokio::sync::watch::Receiver<Vec<u64>> {
107 self.instance_avail_rx.clone()
108 }
109
110 pub async fn wait_for_instances(&self) -> Result<Vec<Instance>> {
112 tracing::trace!(
113 "wait_for_instances: Starting wait for endpoint: {}",
114 self.endpoint.id()
115 );
116 let mut rx = self.instance_source.as_ref().clone();
117 let mut instances: Vec<Instance>;
119 loop {
120 instances = rx.borrow_and_update().to_vec();
121 if instances.is_empty() {
122 rx.changed().await?;
123 } else {
124 tracing::info!(
125 "wait_for_instances: Found {} instance(s) for endpoint: {}",
126 instances.len(),
127 self.endpoint.id()
128 );
129 break;
130 }
131 }
132 Ok(instances)
133 }
134
135 pub fn report_instance_down(&self, instance_id: u64) {
137 let filtered = self
138 .instance_ids_avail()
139 .iter()
140 .filter_map(|&id| if id == instance_id { None } else { Some(id) })
141 .collect::<Vec<_>>();
142 self.instance_avail.store(Arc::new(filtered.clone()));
143
144 let _ = self.instance_avail_tx.send(filtered);
146
147 tracing::debug!("inhibiting instance {instance_id}");
148 }
149
150 pub fn update_free_instances(&self, busy_instance_ids: &[u64]) {
152 let all_instance_ids = self.instance_ids();
153 let free_ids: Vec<u64> = all_instance_ids
154 .into_iter()
155 .filter(|id| !busy_instance_ids.contains(id))
156 .collect();
157 self.instance_free.store(Arc::new(free_ids));
158 }
159
160 fn monitor_instance_source(&self) {
167 let reconcile_interval = self.reconcile_interval;
168 let cancel_token = self.endpoint.drt().primary_token();
169 let client = self.clone();
170 let endpoint_id = self.endpoint.id();
171 tokio::task::spawn(async move {
172 let mut rx = client.instance_source.as_ref().clone();
173 while !cancel_token.is_cancelled() {
174 let instance_ids: Vec<u64> = rx
175 .borrow_and_update()
176 .iter()
177 .map(|instance| instance.id())
178 .collect();
179
180 client.instance_avail.store(Arc::new(instance_ids.clone()));
182 client.instance_free.store(Arc::new(instance_ids.clone()));
183
184 let _ = client.instance_avail_tx.send(instance_ids);
186
187 tokio::select! {
188 result = rx.changed() => {
189 if let Err(err) = result {
190 tracing::error!(
191 "monitor_instance_source: The Sender is dropped: {err}, endpoint={endpoint_id}",
192 );
193 cancel_token.cancel();
194 }
195 }
196 _ = tokio::time::sleep(reconcile_interval) => {
197 tracing::trace!(
198 "monitor_instance_source: periodic reconciliation for endpoint={endpoint_id}",
199 );
200 }
201 }
202 }
203 });
204 }
205
206 async fn get_or_create_dynamic_instance_source(
207 endpoint: &Endpoint,
208 ) -> Result<Arc<tokio::sync::watch::Receiver<Vec<Instance>>>> {
209 let drt = endpoint.drt();
210 let instance_sources = drt.instance_sources();
211 let mut instance_sources = instance_sources.lock().await;
212
213 if let Some(instance_source) = instance_sources.get(endpoint) {
214 if let Some(instance_source) = instance_source.upgrade() {
215 return Ok(instance_source);
216 } else {
217 instance_sources.remove(endpoint);
218 }
219 }
220
221 let discovery = drt.discovery();
222 let discovery_query = crate::discovery::DiscoveryQuery::Endpoint {
223 namespace: endpoint.component.namespace.name.clone(),
224 component: endpoint.component.name.clone(),
225 endpoint: endpoint.name.clone(),
226 };
227
228 let mut discovery_stream = discovery
229 .list_and_watch(discovery_query.clone(), None)
230 .await?;
231 let (watch_tx, watch_rx) = tokio::sync::watch::channel(vec![]);
232
233 let secondary = endpoint.component.drt.runtime().secondary().clone();
234
235 secondary.spawn(async move {
236 tracing::trace!("endpoint_watcher: Starting for discovery query: {:?}", discovery_query);
237 let mut map: HashMap<u64, Instance> = HashMap::new();
238
239 loop {
240 let discovery_event = tokio::select! {
241 _ = watch_tx.closed() => {
242 break;
243 }
244 discovery_event = discovery_stream.next() => {
245 match discovery_event {
246 Some(Ok(event)) => {
247 event
248 },
249 Some(Err(e)) => {
250 tracing::error!("endpoint_watcher: discovery stream error: {}; shutting down for discovery query: {:?}", e, discovery_query);
251 break;
252 }
253 None => {
254 break;
255 }
256 }
257 }
258 };
259
260 match discovery_event {
261 DiscoveryEvent::Added(discovery_instance) => {
262 if let DiscoveryInstance::Endpoint(instance) = discovery_instance {
263
264 map.insert(instance.instance_id, instance);
265 }
266 }
267 DiscoveryEvent::Removed(id) => {
268 map.remove(&id.instance_id());
269 }
270 }
271
272 let instances: Vec<Instance> = map.values().cloned().collect();
273 if watch_tx.send(instances).is_err() {
274 break;
275 }
276 }
277 let _ = watch_tx.send(vec![]);
278 });
279
280 let instance_source = Arc::new(watch_rx);
281 instance_sources.insert(endpoint.clone(), Arc::downgrade(&instance_source));
282 Ok(instance_source)
283 }
284}
285
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{DistributedRuntime, Runtime, distributed::DistributedConfig};

    /// Builds a process-local distributed runtime on top of the current tokio runtime.
    async fn process_local_runtime() -> (Runtime, DistributedRuntime) {
        let rt = Runtime::from_current().unwrap();
        let drt = DistributedRuntime::new(rt.clone(), DistributedConfig::process_local())
            .await
            .unwrap();
        (rt, drt)
    }

    /// A locally reported-down instance stays hidden only until the next reconciliation pass,
    /// which resets `instance_avail` from the (empty) instance source.
    #[tokio::test]
    async fn test_instance_reconciliation() {
        const TEST_RECONCILE_INTERVAL: Duration = Duration::from_millis(100);

        let (rt, drt) = process_local_runtime().await;
        let ns = drt.namespace("test_reconciliation".to_string()).unwrap();
        let component = ns.component("test_component".to_string()).unwrap();
        let endpoint = component.endpoint("test_endpoint".to_string());

        let client = Client::with_reconcile_interval(endpoint, TEST_RECONCILE_INTERVAL)
            .await
            .unwrap();

        // Nothing is registered for this endpoint, so nothing starts out available.
        assert!(client.instance_ids_avail().is_empty());

        // Inject a fake availability list, bypassing discovery.
        client.instance_avail.store(Arc::new(vec![1, 2, 3]));
        assert_eq!(**client.instance_ids_avail(), vec![1u64, 2, 3]);

        client.report_instance_down(2);
        assert_eq!(**client.instance_ids_avail(), vec![1u64, 3]);

        // Give the monitor a little more than one interval to resync.
        let grace = Duration::from_millis(50);
        tokio::time::sleep(TEST_RECONCILE_INTERVAL + grace).await;

        assert!(
            client.instance_ids_avail().is_empty(),
            "After reconciliation, instance_avail should match instance_source"
        );

        rt.shutdown();
    }

    /// `report_instance_down` removes exactly the reported id from the available list.
    #[tokio::test]
    async fn test_report_instance_down() {
        let (rt, drt) = process_local_runtime().await;
        let ns = drt.namespace("test_report_down".to_string()).unwrap();
        let component = ns.component("test_component".to_string()).unwrap();
        let endpoint = component.endpoint("test_endpoint".to_string());

        let client = endpoint.client().await.unwrap();

        client.instance_avail.store(Arc::new(vec![1, 2, 3]));
        assert_eq!(**client.instance_ids_avail(), vec![1u64, 2, 3]);

        client.report_instance_down(2);

        let avail = client.instance_ids_avail();
        assert!(avail.contains(&1), "Instance 1 should still be available");
        assert!(
            !avail.contains(&2),
            "Instance 2 should be removed after report_instance_down"
        );
        assert!(avail.contains(&3), "Instance 3 should still be available");

        rt.shutdown();
    }

    /// Watchers obtained before a down-report observe the filtered list.
    #[tokio::test]
    async fn test_instance_avail_watcher() {
        let (rt, drt) = process_local_runtime().await;
        let ns = drt.namespace("test_watcher".to_string()).unwrap();
        let component = ns.component("test_component".to_string()).unwrap();
        let endpoint = component.endpoint("test_endpoint".to_string());

        let client = endpoint.client().await.unwrap();
        let watcher = client.instance_avail_watcher();

        client.instance_avail.store(Arc::new(vec![1, 2, 3]));
        client.report_instance_down(2);

        assert_eq!(*watcher.borrow(), vec![1, 3]);

        rt.shutdown();
    }
}