gmsol_sdk/client/
pubsub.rs

1use std::{
2    collections::{hash_map::Entry, HashMap},
3    num::NonZeroUsize,
4    ops::DerefMut,
5    sync::Arc,
6    time::Duration,
7};
8
9use futures_util::{Stream, StreamExt, TryStreamExt};
10use gmsol_solana_utils::{
11    cluster::Cluster, solana_client::rpc_response::RpcLogsResponse, utils::WithSlot,
12};
13use solana_client::{
14    nonblocking::pubsub_client::PubsubClient as SolanaPubsubClient,
15    rpc_config::{RpcTransactionLogsConfig, RpcTransactionLogsFilter},
16};
17use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey};
18use tokio::{
19    sync::{broadcast, oneshot, Mutex, RwLock},
20    task::{AbortHandle, JoinSet},
21};
22use tokio_stream::wrappers::BroadcastStream;
23use tracing::Instrument;
24
25/// A wrapper of [the solana version of pubsub client](SolanaPubsubClient)
26/// with shared subscription support.
27#[derive(Debug)]
28pub struct PubsubClient {
29    inner: RwLock<Option<Inner>>,
30    cluster: Cluster,
31    config: SubscriptionConfig,
32}
33
34impl PubsubClient {
35    /// Create a new [`PubsubClient`] with the given config.
36    pub async fn new(cluster: Cluster, config: SubscriptionConfig) -> crate::Result<Self> {
37        Ok(Self {
38            inner: RwLock::new(None),
39            cluster,
40            config,
41        })
42    }
43
44    async fn prepare(&self) -> crate::Result<()> {
45        if self.inner.read().await.is_some() {
46            return Ok(());
47        }
48        self.reset().await
49    }
50
51    /// Subscribe to transaction logs.
52    pub async fn logs_subscribe(
53        &self,
54        mention: &Pubkey,
55        commitment: Option<CommitmentConfig>,
56    ) -> crate::Result<impl Stream<Item = crate::Result<WithSlot<RpcLogsResponse>>>> {
57        self.prepare().await?;
58        let res = self
59            .inner
60            .read()
61            .await
62            .as_ref()
63            .ok_or_else(|| crate::Error::custom("the pubsub client has been closed"))?
64            .logs_subscribe(mention, commitment, &self.config)
65            .await;
66        match res {
67            Ok(stream) => Ok(stream),
68            Err(crate::Error::PubsubClosed) => {
69                self.reset().await?;
70                Err(crate::Error::PubsubClosed)
71            }
72            Err(err) => Err(err),
73        }
74    }
75
76    /// Reset the client.
77    pub async fn reset(&self) -> crate::Result<()> {
78        let client = SolanaPubsubClient::new(self.cluster.ws_url())
79            .await
80            .map_err(crate::Error::custom)?;
81        let mut inner = self.inner.write().await;
82        if let Some(previous) = inner.take() {
83            _ = previous.shutdown().await;
84        }
85        *inner = Some(Inner::new(client));
86        Ok(())
87    }
88
89    /// Shutdown gracefully.
90    pub async fn shutdown(&self) -> crate::Result<()> {
91        if let Some(inner) = self.inner.write().await.take() {
92            inner.shutdown().await?;
93        }
94        Ok(())
95    }
96}
97
98#[derive(Debug)]
99struct Inner {
100    tasks: Mutex<JoinSet<()>>,
101    client: Arc<SolanaPubsubClient>,
102    logs: LogsSubscriptions,
103}
104
105impl Inner {
106    fn new(client: SolanaPubsubClient) -> Self {
107        Self {
108            tasks: Default::default(),
109            client: Arc::new(client),
110            logs: Default::default(),
111        }
112    }
113
114    async fn logs_subscribe(
115        &self,
116        mention: &Pubkey,
117        commitment: Option<CommitmentConfig>,
118        config: &SubscriptionConfig,
119    ) -> crate::Result<impl Stream<Item = crate::Result<WithSlot<RpcLogsResponse>>>> {
120        let config = SubscriptionConfig {
121            commitment: commitment.unwrap_or(config.commitment),
122            ..*config
123        };
124        let receiver = self
125            .logs
126            .subscribe(
127                self.tasks.lock().await.deref_mut(),
128                &self.client,
129                mention,
130                config,
131            )
132            .await?;
133        Ok(BroadcastStream::new(receiver).map_err(crate::Error::custom))
134    }
135
136    async fn shutdown(self) -> crate::Result<()> {
137        self.tasks.lock().await.shutdown().await;
138        Arc::into_inner(self.client)
139            .ok_or_else(|| crate::Error::custom("the client should be unique here, but it is not"))?
140            .shutdown()
141            .await
142            .map_err(crate::Error::custom)?;
143        Ok(())
144    }
145}
146
147/// Config for subscription manager.
148#[derive(Debug, Clone)]
149pub struct SubscriptionConfig {
150    /// Commitment.
151    pub commitment: CommitmentConfig,
152    /// Cleanup interval.
153    pub cleanup_interval: Duration,
154    /// Capacity for the broadcast channel.
155    pub capacity: NonZeroUsize,
156}
157
158impl Default for SubscriptionConfig {
159    fn default() -> Self {
160        Self {
161            commitment: CommitmentConfig::finalized(),
162            cleanup_interval: Duration::from_secs(10),
163            capacity: NonZeroUsize::new(256).unwrap(),
164        }
165    }
166}
167
168#[derive(Debug)]
169struct LogsSubscription {
170    commitment: CommitmentConfig,
171    sender: ClosableSender<WithSlot<RpcLogsResponse>>,
172    abort: AbortHandle,
173}
174
175impl Drop for LogsSubscription {
176    fn drop(&mut self) {
177        self.abort.abort();
178    }
179}
180
181impl LogsSubscription {
182    async fn init(
183        join_set: &mut JoinSet<()>,
184        sender: ClosableSender<WithSlot<RpcLogsResponse>>,
185        client: &Arc<SolanaPubsubClient>,
186        mention: &Pubkey,
187        commitment: CommitmentConfig,
188        cleanup_interval: Duration,
189    ) -> crate::Result<Self> {
190        let (tx, rx) = oneshot::channel::<Result<_, _>>();
191        let abort = join_set.spawn({
192            let client = client.clone();
193            let mention = *mention;
194            let sender = sender.clone();
195            async move {
196                let res = client
197                    .logs_subscribe(
198                        RpcTransactionLogsFilter::Mentions(vec![mention.to_string()]),
199                        RpcTransactionLogsConfig { commitment: Some(commitment) },
200                    )
201                    .await
202                    .inspect_err(
203                        |err| tracing::error!(%err, %mention, "failed to subscribe transaction logs"),
204                    );
205                match res {
206                    Ok((mut stream, unsubscribe)) => {
207                        _ = tx.send(Ok(()));
208                        let mut interval = tokio::time::interval(cleanup_interval);
209                        loop {
210                            tokio::select! {
211                                _ = interval.tick() => {
212                                    if sender.receiver_count().unwrap_or(0) == 0 {
213                                        break;
214                                    }
215                                }
216                                res = stream.next() => {
217                                    match res {
218                                        Some(res) => {
219                                            if sender.send(WithSlot::new(res.context.slot, res.value)).unwrap_or(0) == 0 {
220                                                break;
221                                            }
222                                        }
223                                        None => break,
224                                    }
225                                }
226                            }
227                        }
228                        (unsubscribe)().await;
229                    },
230                    Err(err) => {
231                        _ = tx.send(Err(err));
232                    }
233                }
234                tracing::info!(%mention, "logs subscription end");
235            }
236            .in_current_span()
237        });
238        rx.await
239            .map_err(|_| crate::Error::custom("worker is dead"))?
240            .map_err(crate::Error::custom)?;
241        Ok(Self {
242            commitment,
243            abort,
244            sender,
245        })
246    }
247}
248
249#[derive(Debug, Default)]
250struct LogsSubscriptions(RwLock<HashMap<Pubkey, LogsSubscription>>);
251
252impl LogsSubscriptions {
253    async fn subscribe(
254        &self,
255        join_set: &mut JoinSet<()>,
256        client: &Arc<SolanaPubsubClient>,
257        mention: &Pubkey,
258        config: SubscriptionConfig,
259    ) -> crate::Result<broadcast::Receiver<WithSlot<RpcLogsResponse>>> {
260        let mut map = self.0.write().await;
261        loop {
262            match map.entry(*mention) {
263                Entry::Occupied(entry) => {
264                    let subscription = entry.get();
265                    if subscription.abort.is_finished() {
266                        entry.remove();
267                    } else {
268                        if config.commitment != subscription.commitment {
269                            return Err(crate::Error::custom(format!(
270                                "commitment mismatched, current: {}",
271                                subscription.commitment.commitment
272                            )));
273                        }
274                        if let Some(receiver) = subscription.sender.subscribe() {
275                            return Ok(receiver);
276                        } else {
277                            entry.remove();
278                        }
279                    }
280                }
281                Entry::Vacant(entry) => {
282                    let (sender, receiver) = broadcast::channel(config.capacity.get());
283                    let subscription = LogsSubscription::init(
284                        join_set,
285                        sender.into(),
286                        client,
287                        mention,
288                        config.commitment,
289                        config.cleanup_interval,
290                    )
291                    .await?;
292                    entry.insert(subscription);
293                    return Ok(receiver);
294                }
295            }
296        }
297    }
298}
299
300#[derive(Debug)]
301struct ClosableSender<T>(Arc<std::sync::RwLock<Option<broadcast::Sender<T>>>>);
302
303impl<T> From<broadcast::Sender<T>> for ClosableSender<T> {
304    fn from(sender: broadcast::Sender<T>) -> Self {
305        Self(Arc::new(std::sync::RwLock::new(Some(sender))))
306    }
307}
308
309impl<T> Clone for ClosableSender<T> {
310    fn clone(&self) -> Self {
311        Self(self.0.clone())
312    }
313}
314
315impl<T> ClosableSender<T> {
316    fn send(&self, value: T) -> Result<usize, broadcast::error::SendError<T>> {
317        match self.0.read().unwrap().as_ref() {
318            Some(sender) => sender.send(value),
319            None => Err(broadcast::error::SendError(value)),
320        }
321    }
322
323    fn receiver_count(&self) -> Option<usize> {
324        Some(self.0.read().unwrap().as_ref()?.receiver_count())
325    }
326
327    fn subscribe(&self) -> Option<broadcast::Receiver<T>> {
328        Some(self.0.read().unwrap().as_ref()?.subscribe())
329    }
330
331    fn close(&self) -> bool {
332        self.0.write().unwrap().take().is_some()
333    }
334}
335
336impl<T> Drop for ClosableSender<T> {
337    fn drop(&mut self) {
338        self.close();
339    }
340}