1use std::{
2 collections::{hash_map::Entry, HashMap},
3 num::NonZeroUsize,
4 ops::DerefMut,
5 sync::Arc,
6 time::Duration,
7};
8
9use futures_util::{Stream, StreamExt, TryStreamExt};
10use gmsol_solana_utils::{
11 cluster::Cluster, solana_client::rpc_response::RpcLogsResponse, utils::WithSlot,
12};
13use solana_client::{
14 nonblocking::pubsub_client::PubsubClient as SolanaPubsubClient,
15 rpc_config::{RpcTransactionLogsConfig, RpcTransactionLogsFilter},
16};
17use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey};
18use tokio::{
19 sync::{broadcast, oneshot, Mutex, RwLock},
20 task::{AbortHandle, JoinSet},
21};
22use tokio_stream::wrappers::BroadcastStream;
23use tracing::Instrument;
24
25#[derive(Debug)]
28pub struct PubsubClient {
29 inner: RwLock<Option<Inner>>,
30 cluster: Cluster,
31 config: SubscriptionConfig,
32}
33
34impl PubsubClient {
35 pub async fn new(cluster: Cluster, config: SubscriptionConfig) -> crate::Result<Self> {
37 Ok(Self {
38 inner: RwLock::new(None),
39 cluster,
40 config,
41 })
42 }
43
44 async fn prepare(&self) -> crate::Result<()> {
45 if self.inner.read().await.is_some() {
46 return Ok(());
47 }
48 self.reset().await
49 }
50
51 pub async fn logs_subscribe(
53 &self,
54 mention: &Pubkey,
55 commitment: Option<CommitmentConfig>,
56 ) -> crate::Result<impl Stream<Item = crate::Result<WithSlot<RpcLogsResponse>>>> {
57 self.prepare().await?;
58 let res = self
59 .inner
60 .read()
61 .await
62 .as_ref()
63 .ok_or_else(|| crate::Error::custom("the pubsub client has been closed"))?
64 .logs_subscribe(mention, commitment, &self.config)
65 .await;
66 match res {
67 Ok(stream) => Ok(stream),
68 Err(crate::Error::PubsubClosed) => {
69 self.reset().await?;
70 Err(crate::Error::PubsubClosed)
71 }
72 Err(err) => Err(err),
73 }
74 }
75
76 pub async fn reset(&self) -> crate::Result<()> {
78 let client = SolanaPubsubClient::new(self.cluster.ws_url())
79 .await
80 .map_err(crate::Error::custom)?;
81 let mut inner = self.inner.write().await;
82 if let Some(previous) = inner.take() {
83 _ = previous.shutdown().await;
84 }
85 *inner = Some(Inner::new(client));
86 Ok(())
87 }
88
89 pub async fn shutdown(&self) -> crate::Result<()> {
91 if let Some(inner) = self.inner.write().await.take() {
92 inner.shutdown().await?;
93 }
94 Ok(())
95 }
96}
97
98#[derive(Debug)]
99struct Inner {
100 tasks: Mutex<JoinSet<()>>,
101 client: Arc<SolanaPubsubClient>,
102 logs: LogsSubscriptions,
103}
104
105impl Inner {
106 fn new(client: SolanaPubsubClient) -> Self {
107 Self {
108 tasks: Default::default(),
109 client: Arc::new(client),
110 logs: Default::default(),
111 }
112 }
113
114 async fn logs_subscribe(
115 &self,
116 mention: &Pubkey,
117 commitment: Option<CommitmentConfig>,
118 config: &SubscriptionConfig,
119 ) -> crate::Result<impl Stream<Item = crate::Result<WithSlot<RpcLogsResponse>>>> {
120 let config = SubscriptionConfig {
121 commitment: commitment.unwrap_or(config.commitment),
122 ..*config
123 };
124 let receiver = self
125 .logs
126 .subscribe(
127 self.tasks.lock().await.deref_mut(),
128 &self.client,
129 mention,
130 config,
131 )
132 .await?;
133 Ok(BroadcastStream::new(receiver).map_err(crate::Error::custom))
134 }
135
136 async fn shutdown(self) -> crate::Result<()> {
137 self.tasks.lock().await.shutdown().await;
138 Arc::into_inner(self.client)
139 .ok_or_else(|| crate::Error::custom("the client should be unique here, but it is not"))?
140 .shutdown()
141 .await
142 .map_err(crate::Error::custom)?;
143 Ok(())
144 }
145}
146
147#[derive(Debug, Clone)]
149pub struct SubscriptionConfig {
150 pub commitment: CommitmentConfig,
152 pub cleanup_interval: Duration,
154 pub capacity: NonZeroUsize,
156}
157
158impl Default for SubscriptionConfig {
159 fn default() -> Self {
160 Self {
161 commitment: CommitmentConfig::finalized(),
162 cleanup_interval: Duration::from_secs(10),
163 capacity: NonZeroUsize::new(256).unwrap(),
164 }
165 }
166}
167
168#[derive(Debug)]
169struct LogsSubscription {
170 commitment: CommitmentConfig,
171 sender: ClosableSender<WithSlot<RpcLogsResponse>>,
172 abort: AbortHandle,
173}
174
175impl Drop for LogsSubscription {
176 fn drop(&mut self) {
177 self.abort.abort();
178 }
179}
180
181impl LogsSubscription {
182 async fn init(
183 join_set: &mut JoinSet<()>,
184 sender: ClosableSender<WithSlot<RpcLogsResponse>>,
185 client: &Arc<SolanaPubsubClient>,
186 mention: &Pubkey,
187 commitment: CommitmentConfig,
188 cleanup_interval: Duration,
189 ) -> crate::Result<Self> {
190 let (tx, rx) = oneshot::channel::<Result<_, _>>();
191 let abort = join_set.spawn({
192 let client = client.clone();
193 let mention = *mention;
194 let sender = sender.clone();
195 async move {
196 let res = client
197 .logs_subscribe(
198 RpcTransactionLogsFilter::Mentions(vec![mention.to_string()]),
199 RpcTransactionLogsConfig { commitment: Some(commitment) },
200 )
201 .await
202 .inspect_err(
203 |err| tracing::error!(%err, %mention, "failed to subscribe transaction logs"),
204 );
205 match res {
206 Ok((mut stream, unsubscribe)) => {
207 _ = tx.send(Ok(()));
208 let mut interval = tokio::time::interval(cleanup_interval);
209 loop {
210 tokio::select! {
211 _ = interval.tick() => {
212 if sender.receiver_count().unwrap_or(0) == 0 {
213 break;
214 }
215 }
216 res = stream.next() => {
217 match res {
218 Some(res) => {
219 if sender.send(WithSlot::new(res.context.slot, res.value)).unwrap_or(0) == 0 {
220 break;
221 }
222 }
223 None => break,
224 }
225 }
226 }
227 }
228 (unsubscribe)().await;
229 },
230 Err(err) => {
231 _ = tx.send(Err(err));
232 }
233 }
234 tracing::info!(%mention, "logs subscription end");
235 }
236 .in_current_span()
237 });
238 rx.await
239 .map_err(|_| crate::Error::custom("worker is dead"))?
240 .map_err(crate::Error::custom)?;
241 Ok(Self {
242 commitment,
243 abort,
244 sender,
245 })
246 }
247}
248
249#[derive(Debug, Default)]
250struct LogsSubscriptions(RwLock<HashMap<Pubkey, LogsSubscription>>);
251
252impl LogsSubscriptions {
253 async fn subscribe(
254 &self,
255 join_set: &mut JoinSet<()>,
256 client: &Arc<SolanaPubsubClient>,
257 mention: &Pubkey,
258 config: SubscriptionConfig,
259 ) -> crate::Result<broadcast::Receiver<WithSlot<RpcLogsResponse>>> {
260 let mut map = self.0.write().await;
261 loop {
262 match map.entry(*mention) {
263 Entry::Occupied(entry) => {
264 let subscription = entry.get();
265 if subscription.abort.is_finished() {
266 entry.remove();
267 } else {
268 if config.commitment != subscription.commitment {
269 return Err(crate::Error::custom(format!(
270 "commitment mismatched, current: {}",
271 subscription.commitment.commitment
272 )));
273 }
274 if let Some(receiver) = subscription.sender.subscribe() {
275 return Ok(receiver);
276 } else {
277 entry.remove();
278 }
279 }
280 }
281 Entry::Vacant(entry) => {
282 let (sender, receiver) = broadcast::channel(config.capacity.get());
283 let subscription = LogsSubscription::init(
284 join_set,
285 sender.into(),
286 client,
287 mention,
288 config.commitment,
289 config.cleanup_interval,
290 )
291 .await?;
292 entry.insert(subscription);
293 return Ok(receiver);
294 }
295 }
296 }
297 }
298}
299
300#[derive(Debug)]
301struct ClosableSender<T>(Arc<std::sync::RwLock<Option<broadcast::Sender<T>>>>);
302
303impl<T> From<broadcast::Sender<T>> for ClosableSender<T> {
304 fn from(sender: broadcast::Sender<T>) -> Self {
305 Self(Arc::new(std::sync::RwLock::new(Some(sender))))
306 }
307}
308
309impl<T> Clone for ClosableSender<T> {
310 fn clone(&self) -> Self {
311 Self(self.0.clone())
312 }
313}
314
315impl<T> ClosableSender<T> {
316 fn send(&self, value: T) -> Result<usize, broadcast::error::SendError<T>> {
317 match self.0.read().unwrap().as_ref() {
318 Some(sender) => sender.send(value),
319 None => Err(broadcast::error::SendError(value)),
320 }
321 }
322
323 fn receiver_count(&self) -> Option<usize> {
324 Some(self.0.read().unwrap().as_ref()?.receiver_count())
325 }
326
327 fn subscribe(&self) -> Option<broadcast::Receiver<T>> {
328 Some(self.0.read().unwrap().as_ref()?.subscribe())
329 }
330
331 fn close(&self) -> bool {
332 self.0.write().unwrap().take().is_some()
333 }
334}
335
336impl<T> Drop for ClosableSender<T> {
337 fn drop(&mut self) {
338 self.close();
339 }
340}