simulator_client/
subscriptions.rs1use std::{future::Future, time::Duration};
2
3use futures::StreamExt;
4use solana_client::{
5 nonblocking::pubsub_client::PubsubClient,
6 rpc_response::{Response, RpcLogsResponse},
7};
8use solana_commitment_config::CommitmentConfig;
9use solana_rpc_client_api::config::{RpcTransactionLogsConfig, RpcTransactionLogsFilter};
10use thiserror::Error;
11use tokio::{
12 sync::{oneshot, watch},
13 task::JoinHandle,
14};
15
16use crate::urls::{UrlError, http_to_ws_url};
17
18#[derive(Debug, Error)]
20pub enum SubscriptionError {
21 #[error(transparent)]
22 InvalidUrl(#[from] UrlError),
23
24 #[error("pubsub connect to {url} failed: {source}")]
25 Connect {
26 url: String,
27 #[source]
28 source: Box<dyn std::error::Error + Send + Sync>,
29 },
30
31 #[error("logs_subscribe failed: {source}")]
32 Subscribe {
33 #[source]
34 source: Box<dyn std::error::Error + Send + Sync>,
35 },
36
37 #[error("subscription task exited unexpectedly before signaling ready")]
38 TaskDropped,
39
40 #[error("session has no rpc_endpoint (was the session created?)")]
41 NoRpcEndpoint,
42}
43
44pub struct LogSubscriptionHandle {
46 pub join_handle: JoinHandle<()>,
51
52 pub stop: watch::Sender<bool>,
55}
56
57pub async fn subscribe_program_logs<F, Fut>(
95 rpc_endpoint: &str,
96 program_id: &str,
97 commitment: CommitmentConfig,
98 on_notification: F,
99) -> Result<LogSubscriptionHandle, SubscriptionError>
100where
101 F: Fn(Response<RpcLogsResponse>) -> Fut + Send + Sync + 'static,
102 Fut: Future<Output = ()> + Send + 'static,
103{
104 let ws_url = http_to_ws_url(rpc_endpoint)?;
105 let program_id = program_id.to_string();
106
107 let (ready_tx, ready_rx) = oneshot::channel::<Result<(), SubscriptionError>>();
108 let (stop_tx, mut stop_rx) = watch::channel(false);
109
110 let join_handle = tokio::spawn(async move {
114 let client = match PubsubClient::new(&ws_url).await {
115 Ok(c) => c,
116 Err(e) => {
117 let _ = ready_tx.send(Err(SubscriptionError::Connect {
118 url: ws_url,
119 source: Box::new(e),
120 }));
121 return;
122 }
123 };
124
125 let (mut stream, _unsubscribe) = match client
126 .logs_subscribe(
127 RpcTransactionLogsFilter::Mentions(vec![program_id]),
128 RpcTransactionLogsConfig {
129 commitment: Some(commitment),
130 },
131 )
132 .await
133 {
134 Ok(s) => s,
135 Err(e) => {
136 let _ = ready_tx.send(Err(SubscriptionError::Subscribe {
137 source: Box::new(e),
138 }));
139 return;
140 }
141 };
142
143 let _ = ready_tx.send(Ok(()));
144
145 let mut tasks: Vec<JoinHandle<()>> = Vec::new();
146
147 loop {
148 if *stop_rx.borrow() {
149 let drain_until = tokio::time::Instant::now() + Duration::from_secs(1);
153 while let Ok(Some(notification)) =
154 tokio::time::timeout_at(drain_until, stream.next()).await
155 {
156 tasks.push(tokio::spawn(on_notification(notification)));
157 }
158 break;
159 }
160
161 let notification = tokio::select! {
162 n = stream.next() => n,
163 _ = stop_rx.changed() => continue,
164 };
165
166 match notification {
167 Some(n) => tasks.push(tokio::spawn(on_notification(n))),
168 None => break,
169 }
170 }
171
172 for task in tasks {
174 let _ = task.await;
175 }
176 });
177
178 match ready_rx.await {
179 Ok(Ok(())) => Ok(LogSubscriptionHandle {
180 join_handle,
181 stop: stop_tx,
182 }),
183 Ok(Err(e)) => {
184 join_handle.abort();
185 Err(e)
186 }
187 Err(_) => {
188 join_handle.abort();
189 Err(SubscriptionError::TaskDropped)
190 }
191 }
192}