use serde::Deserialize;
use super::{DiscoveryEvent, DiscoveryInstance, DiscoveryStream};
pub fn watch_and_extract_field<T, V, F>(
stream: DiscoveryStream,
extractor: F,
) -> tokio::sync::watch::Receiver<std::collections::HashMap<u64, V>>
where
T: for<'de> Deserialize<'de> + 'static,
V: Clone + Send + Sync + 'static,
F: Fn(T) -> V + Send + 'static,
{
use futures::StreamExt;
use std::collections::HashMap;
let (tx, rx) = tokio::sync::watch::channel(HashMap::new());
tokio::spawn(async move {
let mut state: HashMap<u64, V> = HashMap::new();
let mut stream = stream;
while let Some(result) = stream.next().await {
match result {
Ok(DiscoveryEvent::Added(instance)) => {
let instance_id = instance.instance_id();
let deserialized: T = match instance.deserialize_model() {
Ok(d) => d,
Err(e) => {
tracing::warn!(
instance_id,
error = %e,
"Failed to deserialize discovery instance, skipping"
);
continue;
}
};
let value = extractor(deserialized);
state.insert(instance_id, value);
if tx.send(state.clone()).is_err() {
tracing::debug!("watch_and_extract_field receiver dropped, stopping");
break;
}
}
Ok(DiscoveryEvent::Removed(id)) => {
state.remove(&id.instance_id());
if tx.send(state.clone()).is_err() {
tracing::debug!("watch_and_extract_field receiver dropped, stopping");
break;
}
}
Err(e) => {
tracing::error!(error = %e, "Discovery event stream error in watch_and_extract_field");
}
}
}
tracing::debug!("watch_and_extract_field task stopped");
});
rx
}