pyth_lazer_client/
arc_swap.rs

1use {
2    anyhow::Context as _,
3    arc_swap::ArcSwap,
4    futures::Stream,
5    futures_util::StreamExt as _,
6    std::sync::Arc,
7    tracing::{info, Instrument as _},
8};
9
10#[async_trait::async_trait]
11pub trait StreamIntoAutoUpdatedHandle: Stream + Unpin + Sized + 'static
12where
13    Self::Item: Send + Sync,
14{
15    /// Create an `ArcSwap` that provides access to the most recent value produced by the stream.
16    async fn into_auto_updated_handle(mut self) -> anyhow::Result<Arc<ArcSwap<Self::Item>>> {
17        let first_value = self
18            .next()
19            .await
20            .context("cannot create auto updated handle from empty stream")?;
21        let handle = Arc::new(ArcSwap::new(Arc::new(first_value)));
22        let weak_handle = Arc::downgrade(&handle);
23        tokio::spawn(
24            async move {
25                while let Some(value) = self.next().await {
26                    let Some(handle) = weak_handle.upgrade() else {
27                        info!("handle dropped, stopping auto handle update task");
28                        return;
29                    };
30                    handle.store(Arc::new(value));
31                }
32            }
33            .in_current_span(),
34        );
35        Ok(handle)
36    }
37}
38
39impl<T: Stream + Unpin + 'static> StreamIntoAutoUpdatedHandle for T where T::Item: Send + Sync {}