pyth_lazer_client/
arc_swap.rs1use {
2 anyhow::Context as _,
3 arc_swap::ArcSwap,
4 futures::Stream,
5 futures_util::StreamExt as _,
6 std::sync::Arc,
7 tracing::{info, Instrument as _},
8};
9
10#[async_trait::async_trait]
11pub trait StreamIntoAutoUpdatedHandle: Stream + Unpin + Sized + 'static
12where
13 Self::Item: Send + Sync,
14{
15 async fn into_auto_updated_handle(mut self) -> anyhow::Result<Arc<ArcSwap<Self::Item>>> {
17 let first_value = self
18 .next()
19 .await
20 .context("cannot create auto updated handle from empty stream")?;
21 let handle = Arc::new(ArcSwap::new(Arc::new(first_value)));
22 let weak_handle = Arc::downgrade(&handle);
23 tokio::spawn(
24 async move {
25 while let Some(value) = self.next().await {
26 let Some(handle) = weak_handle.upgrade() else {
27 info!("handle dropped, stopping auto handle update task");
28 return;
29 };
30 handle.store(Arc::new(value));
31 }
32 }
33 .in_current_span(),
34 );
35 Ok(handle)
36 }
37}
38
39impl<T: Stream + Unpin + 'static> StreamIntoAutoUpdatedHandle for T where T::Item: Send + Sync {}