Skip to main content

simulator_client/
subscriptions.rs

1use std::{future::Future, time::Duration};
2
3use futures::StreamExt;
4use solana_client::{
5    nonblocking::pubsub_client::PubsubClient,
6    rpc_response::{Response, RpcLogsResponse},
7};
8use solana_commitment_config::CommitmentConfig;
9use solana_rpc_client_api::config::{RpcTransactionLogsConfig, RpcTransactionLogsFilter};
10use thiserror::Error;
11use tokio::{
12    sync::{oneshot, watch},
13    task::JoinHandle,
14};
15
16use crate::urls::{UrlError, http_to_ws_url};
17
18/// Error establishing a PubSub log subscription.
19#[derive(Debug, Error)]
20pub enum SubscriptionError {
21    #[error(transparent)]
22    InvalidUrl(#[from] UrlError),
23
24    #[error("pubsub connect to {url} failed: {source}")]
25    Connect {
26        url: String,
27        #[source]
28        source: Box<dyn std::error::Error + Send + Sync>,
29    },
30
31    #[error("logs_subscribe failed: {source}")]
32    Subscribe {
33        #[source]
34        source: Box<dyn std::error::Error + Send + Sync>,
35    },
36
37    #[error("subscription task exited unexpectedly before signaling ready")]
38    TaskDropped,
39
40    #[error("session has no rpc_endpoint (was the session created?)")]
41    NoRpcEndpoint,
42}
43
44/// Handle for a running log subscription background task.
45pub struct LogSubscriptionHandle {
46    /// Background task that drives the subscription and spawns per-notification callbacks.
47    ///
48    /// Resolves after `stop.send(true)` is called, remaining buffered
49    /// notifications are drained, and all spawned callback tasks complete.
50    pub join_handle: JoinHandle<()>,
51
52    /// Send `true` to signal the background task to stop accepting new
53    /// notifications, drain remaining buffered ones, and exit cleanly.
54    pub stop: watch::Sender<bool>,
55}
56
57/// Subscribe to program log notifications and invoke a callback for each one.
58///
59/// Spawns a background task that:
60/// 1. Connects to the PubSub endpoint derived from `rpc_endpoint`.
61/// 2. Subscribes to logs mentioning `program_id`.
62/// 3. For each notification, spawns `on_notification(notification)` as a Tokio task.
63/// 4. When `handle.stop.send(true)` is called, drains remaining buffered
64///    notifications (up to 1s), waits for all spawned tasks, then returns.
65///
66/// Returns after the subscription is established. If setup fails, an error is
67/// returned before any background task is left running.
68///
69/// ## Example
70///
71/// ```no_run
72/// use std::sync::{Arc, Mutex};
73/// use simulator_client::subscribe_program_logs;
74/// use solana_commitment_config::CommitmentConfig;
75///
76/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
77/// let handle = subscribe_program_logs(
78///     "https://api.mainnet-beta.solana.com",
79///     "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA",
80///     CommitmentConfig::confirmed(),
81///     |notification| async move {
82///         println!("sig: {}", notification.value.signature);
83///     },
84/// )
85/// .await?;
86///
87/// // ... do other work ...
88///
89/// handle.stop.send(true).ok();
90/// handle.join_handle.await.ok();
91/// # Ok(())
92/// # }
93/// ```
94pub async fn subscribe_program_logs<F, Fut>(
95    rpc_endpoint: &str,
96    program_id: &str,
97    commitment: CommitmentConfig,
98    on_notification: F,
99) -> Result<LogSubscriptionHandle, SubscriptionError>
100where
101    F: Fn(Response<RpcLogsResponse>) -> Fut + Send + Sync + 'static,
102    Fut: Future<Output = ()> + Send + 'static,
103{
104    let ws_url = http_to_ws_url(rpc_endpoint)?;
105    let program_id = program_id.to_string();
106
107    let (ready_tx, ready_rx) = oneshot::channel::<Result<(), SubscriptionError>>();
108    let (stop_tx, mut stop_rx) = watch::channel(false);
109
110    // PubsubClient::logs_subscribe borrows &self, so both the client and the
111    // stream must live inside the spawned task. We report setup success/failure
112    // back through a oneshot channel before entering the notification loop.
113    let join_handle = tokio::spawn(async move {
114        let client = match PubsubClient::new(&ws_url).await {
115            Ok(c) => c,
116            Err(e) => {
117                let _ = ready_tx.send(Err(SubscriptionError::Connect {
118                    url: ws_url,
119                    source: Box::new(e),
120                }));
121                return;
122            }
123        };
124
125        let (mut stream, _unsubscribe) = match client
126            .logs_subscribe(
127                RpcTransactionLogsFilter::Mentions(vec![program_id]),
128                RpcTransactionLogsConfig {
129                    commitment: Some(commitment),
130                },
131            )
132            .await
133        {
134            Ok(s) => s,
135            Err(e) => {
136                let _ = ready_tx.send(Err(SubscriptionError::Subscribe {
137                    source: Box::new(e),
138                }));
139                return;
140            }
141        };
142
143        let _ = ready_tx.send(Ok(()));
144
145        let mut tasks: Vec<JoinHandle<()>> = Vec::new();
146
147        loop {
148            if *stop_rx.borrow() {
149                // Drain notifications that arrived just before stop was signaled,
150                // bounded by a short total deadline to avoid running indefinitely
151                // for high-traffic programs.
152                let drain_until = tokio::time::Instant::now() + Duration::from_secs(1);
153                while let Ok(Some(notification)) =
154                    tokio::time::timeout_at(drain_until, stream.next()).await
155                {
156                    tasks.push(tokio::spawn(on_notification(notification)));
157                }
158                break;
159            }
160
161            let notification = tokio::select! {
162                n = stream.next() => n,
163                _ = stop_rx.changed() => continue,
164            };
165
166            match notification {
167                Some(n) => tasks.push(tokio::spawn(on_notification(n))),
168                None => break,
169            }
170        }
171
172        // Wait for all in-flight callback tasks to complete.
173        for task in tasks {
174            let _ = task.await;
175        }
176    });
177
178    match ready_rx.await {
179        Ok(Ok(())) => Ok(LogSubscriptionHandle {
180            join_handle,
181            stop: stop_tx,
182        }),
183        Ok(Err(e)) => {
184            join_handle.abort();
185            Err(e)
186        }
187        Err(_) => {
188            join_handle.abort();
189            Err(SubscriptionError::TaskDropped)
190        }
191    }
192}