Skip to main content

simulator_client/
subscriptions.rs

1use std::{future::Future, time::Duration};
2
3use futures::{SinkExt, StreamExt};
4use serde::Deserialize;
5use simulator_api::EncodedBinary;
6use solana_client::{
7    nonblocking::pubsub_client::PubsubClient,
8    rpc_response::{Response, RpcLogsResponse},
9};
10use solana_commitment_config::CommitmentConfig;
11use solana_rpc_client_api::config::{RpcTransactionLogsConfig, RpcTransactionLogsFilter};
12use thiserror::Error;
13use tokio::{
14    sync::{oneshot, watch},
15    task::JoinHandle,
16};
17use tokio_tungstenite::tungstenite::Message;
18
19use crate::urls::{UrlError, http_to_ws_url};
20
/// Error establishing a PubSub log subscription.
///
/// Returned by the `subscribe_*` entry points in this module when setup does
/// not complete; failures after setup are reported through
/// [`SubscriptionRuntimeError`] on the task's join handle instead.
#[derive(Debug, Error)]
pub enum SubscriptionError {
    /// The RPC endpoint could not be converted to a websocket URL.
    #[error(transparent)]
    InvalidUrl(#[from] UrlError),

    /// Opening the PubSub / websocket connection to `url` failed.
    #[error("pubsub connect to {url} failed: {source}")]
    Connect {
        url: String,
        #[source]
        source: Box<dyn std::error::Error + Send + Sync>,
    },

    /// The connection was opened but the subscribe request itself failed.
    #[error("logs_subscribe failed: {source}")]
    Subscribe {
        #[source]
        source: Box<dyn std::error::Error + Send + Sync>,
    },

    /// Setup ended without reporting success or a more specific failure.
    #[error("subscription task exited unexpectedly before signaling ready")]
    TaskDropped,

    /// NOTE(review): not produced anywhere in this file; presumably raised by
    /// session-level callers that have no endpoint to subscribe to — confirm.
    #[error("session has no rpc_endpoint (was the session created?)")]
    NoRpcEndpoint,
}
46
/// Failure of an already-established subscription, surfaced as the result of
/// the background task's join handle.
#[derive(Debug, Error)]
pub enum SubscriptionRuntimeError {
    /// The notification stream ended or errored while the subscription was
    /// still live (no stop had been requested).
    #[error("{kind} subscription for {target} closed unexpectedly")]
    Closed { kind: &'static str, target: String },

    /// Awaiting a task that runs the user callback returned a `JoinError`
    /// (for example because the callback panicked).
    #[error("{kind} subscription callback worker for {target} failed: {source}")]
    CallbackWorker {
        kind: &'static str,
        target: String,
        #[source]
        source: tokio::task::JoinError,
    },
}
60
/// Post-stop drain: give up as soon as no message arrives within this window.
const SUBSCRIPTION_DRAIN_IDLE_TIMEOUT: Duration = Duration::from_millis(250);
/// Post-stop drain: hard upper bound on the whole drain, however many
/// messages keep arriving.
const SUBSCRIPTION_DRAIN_MAX_DURATION: Duration = Duration::from_secs(5);

/// Join handle of a subscription's background task together with its final result.
type SubscriptionTaskHandle = JoinHandle<Result<(), SubscriptionRuntimeError>>;
/// Client-side websocket stream type used by the account-diff subscriptions.
type AccountDiffWs =
    tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>;
67
/// A unified subscription handle that can represent any subscription type.
///
/// Built from [`LogSubscriptionHandle`] or [`AccountDiffSubscriptionHandle`]
/// via `From`, so callers can store heterogeneous subscriptions together.
pub struct SubscriptionHandle {
    /// Background task driving the subscription; resolves with its final result.
    pub join_handle: SubscriptionTaskHandle,
    /// Send `true` to ask the background task to drain and shut down.
    pub stop: watch::Sender<bool>,
}
73
74impl From<LogSubscriptionHandle> for SubscriptionHandle {
75    fn from(h: LogSubscriptionHandle) -> Self {
76        Self {
77            join_handle: h.join_handle,
78            stop: h.stop,
79        }
80    }
81}
82
83impl From<AccountDiffSubscriptionHandle> for SubscriptionHandle {
84    fn from(h: AccountDiffSubscriptionHandle) -> Self {
85        Self {
86            join_handle: h.join_handle,
87            stop: h.stop,
88        }
89    }
90}
91
/// Handle for a running log subscription background task.
///
/// Returned by [`subscribe_program_logs`] once the subscription is established.
pub struct LogSubscriptionHandle {
    /// Background task that drives the subscription and spawns per-notification callbacks.
    ///
    /// Resolves with `Ok(())` after `stop.send(true)` is called, remaining
    /// buffered notifications are drained, and all spawned callback tasks
    /// complete. Resolves with a [`SubscriptionRuntimeError`] if the stream
    /// closes before a stop was requested or a callback task fails to join.
    pub join_handle: SubscriptionTaskHandle,

    /// Send `true` to signal the background task to stop accepting new
    /// notifications, drain remaining buffered ones, and exit cleanly.
    pub stop: watch::Sender<bool>,
}
104
105/// Subscribe to program log notifications and invoke a callback for each one.
106///
107/// Spawns a background task that:
108/// 1. Connects to the PubSub endpoint derived from `rpc_endpoint`.
109/// 2. Subscribes to logs mentioning `program_id`.
110/// 3. For each notification, spawns `on_notification(notification)` as a Tokio task.
111/// 4. When `handle.stop.send(true)` is called, drains remaining buffered
112///    notifications (up to 1s), waits for all spawned tasks, then returns.
113///
114/// Returns after the subscription is established. If setup fails, an error is
115/// returned before any background task is left running.
116///
117/// ## Example
118///
119/// ```no_run
120/// use std::sync::{Arc, Mutex};
121/// use simulator_client::subscribe_program_logs;
122/// use solana_commitment_config::CommitmentConfig;
123///
124/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
125/// let handle = subscribe_program_logs(
126///     "https://api.mainnet-beta.solana.com",
127///     "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA",
128///     CommitmentConfig::confirmed(),
129///     |notification| async move {
130///         println!("sig: {}", notification.value.signature);
131///     },
132/// )
133/// .await?;
134///
135/// // ... do other work ...
136///
137/// handle.stop.send(true).ok();
138/// handle.join_handle.await.ok();
139/// # Ok(())
140/// # }
141/// ```
142pub async fn subscribe_program_logs<F, Fut>(
143    rpc_endpoint: &str,
144    program_id: &str,
145    commitment: CommitmentConfig,
146    on_notification: F,
147) -> Result<LogSubscriptionHandle, SubscriptionError>
148where
149    F: Fn(Response<RpcLogsResponse>) -> Fut + Send + Sync + 'static,
150    Fut: Future<Output = ()> + Send + 'static,
151{
152    let ws_url = http_to_ws_url(rpc_endpoint)?;
153    let program_id = program_id.to_string();
154
155    let (ready_tx, ready_rx) = oneshot::channel::<Result<(), SubscriptionError>>();
156    let (stop_tx, mut stop_rx) = watch::channel(false);
157
158    // PubsubClient::logs_subscribe borrows &self, so both the client and the
159    // stream must live inside the spawned task. We report setup success/failure
160    // back through a oneshot channel before entering the notification loop.
161    let join_handle = tokio::spawn(async move {
162        let client = match PubsubClient::new(&ws_url).await {
163            Ok(c) => c,
164            Err(e) => {
165                let _ = ready_tx.send(Err(SubscriptionError::Connect {
166                    url: ws_url,
167                    source: Box::new(e),
168                }));
169                return Ok(());
170            }
171        };
172
173        let (mut stream, _unsubscribe) = match client
174            .logs_subscribe(
175                RpcTransactionLogsFilter::Mentions(vec![program_id.clone()]),
176                RpcTransactionLogsConfig {
177                    commitment: Some(commitment),
178                },
179            )
180            .await
181        {
182            Ok(s) => s,
183            Err(e) => {
184                let _ = ready_tx.send(Err(SubscriptionError::Subscribe {
185                    source: Box::new(e),
186                }));
187                return Ok(());
188            }
189        };
190
191        let _ = ready_tx.send(Ok(()));
192
193        let mut tasks: Vec<JoinHandle<()>> = Vec::new();
194        let kind = "program logs";
195
196        loop {
197            if *stop_rx.borrow() {
198                let drain_deadline = tokio::time::Instant::now() + SUBSCRIPTION_DRAIN_MAX_DURATION;
199                while let Ok(Ok(Some(notification))) = tokio::time::timeout_at(
200                    drain_deadline,
201                    tokio::time::timeout(SUBSCRIPTION_DRAIN_IDLE_TIMEOUT, stream.next()),
202                )
203                .await
204                {
205                    tasks.push(tokio::spawn(on_notification(notification)));
206                }
207                break;
208            }
209
210            let notification = tokio::select! {
211                n = stream.next() => n,
212                _ = stop_rx.changed() => continue,
213            };
214
215            match notification {
216                Some(n) => tasks.push(tokio::spawn(on_notification(n))),
217                None => return Err(subscription_runtime_closed(kind, &program_id)),
218            }
219        }
220
221        // Wait for all in-flight callback tasks to complete.
222        for task in tasks {
223            if let Err(source) = task.await {
224                return Err(callback_worker_failed(kind, &program_id, source));
225            }
226        }
227
228        Ok(())
229    });
230
231    match ready_rx.await {
232        Ok(Ok(())) => Ok(LogSubscriptionHandle {
233            join_handle,
234            stop: stop_tx,
235        }),
236        Ok(Err(e)) => {
237            join_handle.abort();
238            Err(e)
239        }
240        Err(_) => {
241            join_handle.abort();
242            Err(SubscriptionError::TaskDropped)
243        }
244    }
245}
246
247// ── Account diff subscription ────────────────────────────────────────────────
248
/// Slot context included in every account diff notification.
#[derive(Debug, Clone, Deserialize)]
pub struct AccountDiffContext {
    /// Slot number carried in the notification's `context` object.
    pub slot: u64,
}
254
/// A single account diff notification delivered by `accountDiffSubscribe`.
///
/// Deserialized from the `params.result` object of an
/// `accountDiffNotification` message.
#[derive(Debug, Clone, Deserialize)]
pub struct AccountDiffNotification {
    /// Slot context of the notification.
    pub context: AccountDiffContext,
    /// The address of the account that changed.
    pub account: Option<String>,
    /// Signature of the transaction that triggered this change, if known.
    pub signature: Option<String>,
    /// Position of the transaction within its slot, if known.
    #[serde(default)]
    pub tx_index: Option<u32>,
    /// Unix-seconds block time of the slot, if known.
    #[serde(default)]
    pub block_time: Option<i64>,
    /// Account state before the change (absent for newly created accounts).
    pub pre: Option<serde_json::Value>,
    /// Account state after the change (absent for deleted accounts).
    pub post: Option<serde_json::Value>,
}
274
/// Slot context attached to an [`ActionResultNotification`].
#[derive(Debug, Clone, Deserialize)]
pub struct ActionResultContext {
    /// Slot number carried in the notification's `context` object.
    pub slot: u64,
}
279
/// One scheduled-action result delivered by `actionSubscribe`.
///
/// Field names are read from camelCase JSON keys.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ActionResultNotification {
    /// Slot context of the notification.
    pub context: ActionResultContext,
    /// NOTE(review): presumably the slot the action ran in; how it relates to
    /// `context.slot` is not visible here — confirm against the server.
    pub slot: u64,
    /// Batch the action fired at; `None` for slot-boundary actions.
    #[serde(default)]
    pub batch_index: Option<u32>,
    /// Index of the action in the session's `actions` list.
    pub action_index: u32,
    /// Optional label for the action; defaults to `None` when absent.
    #[serde(default)]
    pub label: Option<String>,
    /// NOTE(review): presumably whether the action's effects were committed —
    /// confirm against the server.
    pub committed: bool,
    /// One per transaction in the action's sequence, in order. A failing
    /// transaction is the last entry; later transactions don't run.
    #[serde(default)]
    pub transaction_outcomes: Vec<ActionTransactionOutcome>,
    /// Post-execution `UiAccount` JSON per `return_accounts` address, positional;
    /// cumulative state after the final transaction.
    #[serde(default)]
    pub accounts: Vec<Option<serde_json::Value>>,
    /// Encoded transaction whose discovery-filter match triggered this action;
    /// absent for slot-boundary actions. Decode with [`EncodedBinary::decode`]
    /// (base64 bincode of `TxWithMeta`) to inspect the matching transaction.
    #[serde(default)]
    pub matched: Option<EncodedBinary>,
}
308
/// One transaction's result within a scheduled action's sequence.
///
/// Field names are read from camelCase JSON keys.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ActionTransactionOutcome {
    /// Transaction failure message; `None` on success.
    #[serde(default)]
    pub err: Option<String>,
    /// Log lines of the transaction; empty when the key is absent.
    #[serde(default)]
    pub logs: Vec<String>,
    /// Value of the required `unitsConsumed` key (presumably compute units — confirm).
    pub units_consumed: u64,
    /// Fee reported for the transaction, if any.
    #[serde(default)]
    pub fee: Option<u64>,
    /// Program return data (`{programId, data}`), if any.
    #[serde(default)]
    pub return_data: Option<serde_json::Value>,
}
325
/// A routed account diff notification tied to the subscribed account that produced it.
#[derive(Debug, Clone)]
pub struct RoutedAccountDiffNotification {
    /// Account string the subscription was requested for, looked up from the
    /// notification's subscription id.
    pub account: String,
    /// The notification payload as received.
    pub notification: AccountDiffNotification,
}
332
/// Handle for a running account diff subscription background task.
///
/// Send `true` on `stop` to request a clean shutdown, then await `join_handle`.
pub struct AccountDiffSubscriptionHandle {
    /// Background task driving the websocket; resolves with `Ok(())` after a
    /// requested shutdown or with a [`SubscriptionRuntimeError`] otherwise.
    pub join_handle: SubscriptionTaskHandle,
    /// Send `true` to ask the background task to drain and exit.
    pub stop: watch::Sender<bool>,
}
340
341fn subscription_runtime_closed(
342    kind: &'static str,
343    target: impl Into<String>,
344) -> SubscriptionRuntimeError {
345    SubscriptionRuntimeError::Closed {
346        kind,
347        target: target.into(),
348    }
349}
350
351fn callback_worker_failed(
352    kind: &'static str,
353    target: impl Into<String>,
354    source: tokio::task::JoinError,
355) -> SubscriptionRuntimeError {
356    SubscriptionRuntimeError::CallbackWorker {
357        kind,
358        target: target.into(),
359        source,
360    }
361}
362
363/// Subscribe to account diff notifications and invoke a callback for each one.
364///
365/// Spawns a background task that:
366/// 1. Connects to the WebSocket endpoint derived from `rpc_endpoint`.
367/// 2. Subscribes to account diffs for the given filter (account or program).
368/// 3. For each notification, spawns `on_notification(notification)` as a Tokio task.
369/// 4. When `handle.stop.send(true)` is called, drains remaining buffered
370///    notifications (up to 1s), waits for all spawned tasks, then returns.
371///
372/// ## Example
373///
374/// ```no_run
375/// use simulator_client::subscribe_account_diffs;
376///
377/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
378/// let handle = subscribe_account_diffs(
379///     "http://localhost:8900/session/abc",
380///     "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA",
381///     |notification| async move {
382///         println!("slot={} sig={:?}", notification.context.slot, notification.signature);
383///     },
384/// )
385/// .await?;
386///
387/// handle.stop.send(true).ok();
388/// handle.join_handle.await.ok();
389/// # Ok(())
390/// # }
391/// ```
392pub async fn subscribe_account_diffs<F, Fut>(
393    rpc_endpoint: &str,
394    account: &str,
395    on_notification: F,
396) -> Result<AccountDiffSubscriptionHandle, SubscriptionError>
397where
398    F: Fn(AccountDiffNotification) -> Fut + Send + Sync + 'static,
399    Fut: Future<Output = ()> + Send + 'static,
400{
401    subscribe_account_diffs_many(rpc_endpoint, [account.to_string()], move |notification| {
402        on_notification(notification.notification)
403    })
404    .await
405}
406
407/// Subscribe to account diff notifications for many accounts over a single websocket.
408///
409/// All requested subscriptions must be acknowledged before this returns. Once the
410/// stream is live, any websocket disconnect is treated as a fatal completeness
411/// error instead of silently reconnecting and risking dropped notifications.
412pub async fn subscribe_account_diffs_many<F, Fut, I, S>(
413    rpc_endpoint: &str,
414    accounts: I,
415    on_notification: F,
416) -> Result<AccountDiffSubscriptionHandle, SubscriptionError>
417where
418    F: Fn(RoutedAccountDiffNotification) -> Fut + Send + Sync + 'static,
419    Fut: Future<Output = ()> + Send + 'static,
420    I: IntoIterator<Item = S>,
421    S: Into<String>,
422{
423    let ws_url = http_to_ws_url(rpc_endpoint)?;
424    let accounts = dedup_accounts(accounts);
425    if accounts.is_empty() {
426        let (stop_tx, stop_rx) = watch::channel(false);
427        return Ok(AccountDiffSubscriptionHandle {
428            join_handle: tokio::spawn(async move {
429                let _ = stop_rx;
430                Ok(())
431            }),
432            stop: stop_tx,
433        });
434    }
435
436    let (ready_tx, ready_rx) = oneshot::channel::<Result<(), SubscriptionError>>();
437    let (stop_tx, mut stop_rx) = watch::channel(false);
438    let target = format!("{} accounts", accounts.len());
439
440    let join_handle = tokio::spawn(async move {
441        let (notification_tx, mut notification_rx) = tokio::sync::mpsc::unbounded_channel();
442        let callback_handle = tokio::spawn(async move {
443            while let Some(notification) = notification_rx.recv().await {
444                on_notification(notification).await;
445            }
446        });
447
448        let (mut ws, _) = match tokio_tungstenite::connect_async(&ws_url).await {
449            Ok(connection) => connection,
450            Err(e) => {
451                let _ = ready_tx.send(Err(SubscriptionError::Connect {
452                    url: ws_url,
453                    source: Box::new(e),
454                }));
455                return Ok(());
456            }
457        };
458
459        let subscriptions =
460            match send_account_diff_subscribe_many(&mut ws, &accounts, &notification_tx).await {
461                Ok(subscriptions) => subscriptions,
462                Err(error) => {
463                    let _ = ready_tx.send(Err(error));
464                    return Ok(());
465                }
466            };
467
468        let _ = ready_tx.send(Ok(()));
469
470        if let Err(error) =
471            drive_account_diff_stream_many(&mut ws, &subscriptions, &notification_tx, &mut stop_rx)
472                .await
473        {
474            drop(notification_tx);
475            if let Err(source) = callback_handle.await {
476                return Err(callback_worker_failed("account diff", target, source));
477            }
478            return Err(error);
479        }
480
481        drop(notification_tx);
482        if let Err(source) = callback_handle.await {
483            return Err(callback_worker_failed("account diff", target, source));
484        }
485
486        Ok(())
487    });
488
489    match ready_rx.await {
490        Ok(Ok(())) => Ok(AccountDiffSubscriptionHandle {
491            join_handle,
492            stop: stop_tx,
493        }),
494        Ok(Err(e)) => {
495            join_handle.abort();
496            Err(e)
497        }
498        Err(_) => {
499            join_handle.abort();
500            Err(SubscriptionError::TaskDropped)
501        }
502    }
503}
504
/// Wire envelope of a server-pushed message that carries `method` and `params`.
#[derive(Deserialize)]
struct AccountDiffMessage {
    /// Method name; only `"accountDiffNotification"` is accepted by the parser.
    method: String,
    /// Subscription id plus the notification payload.
    params: AccountDiffParams,
}
510
/// `params` object of an account diff notification message.
#[derive(Deserialize)]
struct AccountDiffParams {
    /// Server-assigned subscription id used to route the notification.
    subscription: u64,
    /// The notification payload itself.
    result: AccountDiffNotification,
}
516
517async fn send_account_diff_subscribe_many(
518    ws: &mut AccountDiffWs,
519    accounts: &[String],
520    notification_tx: &tokio::sync::mpsc::UnboundedSender<RoutedAccountDiffNotification>,
521) -> Result<std::collections::HashMap<u64, String>, SubscriptionError> {
522    #[derive(Deserialize)]
523    struct SubscriptionConfirmation {
524        id: u64,
525        result: Option<u64>,
526    }
527
528    let mut pending: std::collections::HashMap<u64, String> = std::collections::HashMap::new();
529    let mut subscriptions = std::collections::HashMap::with_capacity(accounts.len());
530
531    for (index, account) in accounts.iter().enumerate() {
532        let request_id = (index + 1) as u64;
533        let req = serde_json::json!({
534            "jsonrpc": "2.0",
535            "id": request_id,
536            "method": "accountDiffSubscribe",
537            "params": [account]
538        });
539        ws.send(Message::Text(req.to_string()))
540            .await
541            .map_err(|source| SubscriptionError::Subscribe {
542                source: Box::new(source),
543            })?;
544        pending.insert(request_id, account.clone());
545    }
546
547    while !pending.is_empty() {
548        match ws.next().await {
549            Some(Ok(Message::Text(text))) => {
550                if let Ok(confirmation) = serde_json::from_str::<SubscriptionConfirmation>(&text) {
551                    let Some(account) = pending.remove(&confirmation.id) else {
552                        continue;
553                    };
554                    let Some(subscription_id) = confirmation.result else {
555                        return Err(SubscriptionError::TaskDropped);
556                    };
557                    subscriptions.insert(subscription_id, account);
558                    continue;
559                }
560
561                if let Some(notification) =
562                    parse_routed_account_diff_notification(&text, &subscriptions)
563                {
564                    let _ = notification_tx.send(notification);
565                }
566            }
567            Some(Ok(_)) => {}
568            _ => return Err(SubscriptionError::TaskDropped),
569        }
570    }
571
572    Ok(subscriptions)
573}
574
575async fn drive_account_diff_stream_many(
576    ws: &mut AccountDiffWs,
577    subscriptions: &std::collections::HashMap<u64, String>,
578    notification_tx: &tokio::sync::mpsc::UnboundedSender<RoutedAccountDiffNotification>,
579    stop_rx: &mut watch::Receiver<bool>,
580) -> Result<(), SubscriptionRuntimeError> {
581    loop {
582        if *stop_rx.borrow() {
583            let drain_deadline = tokio::time::Instant::now() + SUBSCRIPTION_DRAIN_MAX_DURATION;
584            loop {
585                match tokio::time::timeout_at(
586                    drain_deadline,
587                    tokio::time::timeout(SUBSCRIPTION_DRAIN_IDLE_TIMEOUT, ws.next()),
588                )
589                .await
590                {
591                    Ok(Ok(Some(Ok(Message::Text(text))))) => {
592                        if let Some(notification) =
593                            parse_routed_account_diff_notification(&text, subscriptions)
594                        {
595                            let _ = notification_tx.send(notification);
596                        }
597                    }
598                    _ => return Ok(()),
599                }
600            }
601        }
602
603        let msg = tokio::select! {
604            m = ws.next() => m,
605            _ = stop_rx.changed() => continue,
606        };
607
608        match msg {
609            Some(Ok(Message::Text(text))) => {
610                if let Some(notification) =
611                    parse_routed_account_diff_notification(&text, subscriptions)
612                {
613                    let _ = notification_tx.send(notification);
614                }
615            }
616            Some(Ok(_)) => {}
617            _ => {
618                return Err(subscription_runtime_closed(
619                    "account diff",
620                    format!("{} accounts", subscriptions.len()),
621                ));
622            }
623        }
624    }
625}
626
627fn parse_account_diff_message(text: &str) -> Option<AccountDiffMessage> {
628    let msg: AccountDiffMessage = serde_json::from_str(text).ok()?;
629    (msg.method == "accountDiffNotification").then_some(msg)
630}
631
632fn parse_routed_account_diff_notification(
633    text: &str,
634    subscriptions: &std::collections::HashMap<u64, String>,
635) -> Option<RoutedAccountDiffNotification> {
636    let msg = parse_account_diff_message(text)?;
637    let account = subscriptions.get(&msg.params.subscription)?.clone();
638    Some(RoutedAccountDiffNotification {
639        account,
640        notification: msg.params.result,
641    })
642}
643
/// Converts the inputs to owned strings and removes duplicates, keeping the
/// first occurrence of each value in its original position.
fn dedup_accounts<I, S>(accounts: I) -> Vec<String>
where
    I: IntoIterator<Item = S>,
    S: Into<String>,
{
    let mut seen = std::collections::BTreeSet::new();
    let mut ordered = Vec::new();
    for candidate in accounts {
        let candidate: String = candidate.into();
        // `insert` returns false for a value that was already recorded.
        if seen.insert(candidate.clone()) {
            ordered.push(candidate);
        }
    }
    ordered
}
656
657// ── Program account diff subscription ────────────────────────────────────────
658
659/// Subscribe to account diff notifications for all accounts owned by a program.
660///
661/// Uses the server-side program filter (`{"address_type": "program"}`), so no
662/// RPC prefetch of program accounts is required.  The callback receives one
663/// [`AccountDiffNotification`] per changed account.
664///
665/// A websocket disconnect is treated as a fatal error — the handle's
666/// `join_handle` resolves with a [`SubscriptionRuntimeError`].
667///
668/// ## Example
669///
670/// ```no_run
671/// use simulator_client::subscribe_program_diffs;
672///
673/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
674/// let handle = subscribe_program_diffs(
675///     "http://localhost:8900/session/abc",
676///     "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA",
677///     |notification| async move {
678///         let account = notification.account.unwrap_or_default();
679///         println!("account={account} slot={}", notification.context.slot);
680///     },
681/// )
682/// .await?;
683///
684/// handle.stop.send(true).ok();
685/// handle.join_handle.await.ok();
686/// # Ok(())
687/// # }
688/// ```
689pub async fn subscribe_program_diffs<F, Fut>(
690    rpc_endpoint: &str,
691    program_id: &str,
692    on_notification: F,
693) -> Result<AccountDiffSubscriptionHandle, SubscriptionError>
694where
695    F: Fn(AccountDiffNotification) -> Fut + Send + Sync + 'static,
696    Fut: Future<Output = ()> + Send + 'static,
697{
698    let ws_url = http_to_ws_url(rpc_endpoint)?;
699    let program_id = program_id.to_string();
700
701    let (ready_tx, ready_rx) = oneshot::channel::<Result<(), SubscriptionError>>();
702    let (stop_tx, mut stop_rx) = watch::channel(false);
703
704    let join_handle = tokio::spawn(async move {
705        let (notification_tx, mut notification_rx) = tokio::sync::mpsc::unbounded_channel();
706        let callback_handle = tokio::spawn(async move {
707            while let Some(notification) = notification_rx.recv().await {
708                on_notification(notification).await;
709            }
710        });
711
712        let (mut ws, _) = match tokio_tungstenite::connect_async(&ws_url).await {
713            Ok(connection) => connection,
714            Err(e) => {
715                let _ = ready_tx.send(Err(SubscriptionError::Connect {
716                    url: ws_url,
717                    source: Box::new(e),
718                }));
719                return Ok(());
720            }
721        };
722
723        if let Err(error) = send_program_diff_subscribe(&mut ws, &program_id).await {
724            let _ = ready_tx.send(Err(error));
725            return Ok(());
726        }
727
728        let _ = ready_tx.send(Ok(()));
729
730        if let Err(error) =
731            drive_program_diff_stream(&mut ws, &notification_tx, &mut stop_rx, &program_id).await
732        {
733            drop(notification_tx);
734            if let Err(source) = callback_handle.await {
735                return Err(callback_worker_failed(
736                    "program account diff",
737                    &program_id,
738                    source,
739                ));
740            }
741            return Err(error);
742        }
743
744        drop(notification_tx);
745        if let Err(source) = callback_handle.await {
746            return Err(callback_worker_failed(
747                "program account diff",
748                &program_id,
749                source,
750            ));
751        }
752
753        Ok(())
754    });
755
756    match ready_rx.await {
757        Ok(Ok(())) => Ok(AccountDiffSubscriptionHandle {
758            join_handle,
759            stop: stop_tx,
760        }),
761        Ok(Err(e)) => {
762            join_handle.abort();
763            Err(e)
764        }
765        Err(_) => {
766            join_handle.abort();
767            Err(SubscriptionError::TaskDropped)
768        }
769    }
770}
771
772async fn send_program_diff_subscribe(
773    ws: &mut AccountDiffWs,
774    program_id: &str,
775) -> Result<(), SubscriptionError> {
776    #[derive(Deserialize)]
777    struct SubscriptionConfirmation {
778        result: Option<u64>,
779    }
780
781    let req = serde_json::json!({
782        "jsonrpc": "2.0",
783        "id": 1,
784        "method": "accountDiffSubscribe",
785        "params": [program_id, {"address_type": "program"}]
786    });
787    ws.send(Message::Text(req.to_string()))
788        .await
789        .map_err(|source| SubscriptionError::Subscribe {
790            source: Box::new(source),
791        })?;
792
793    loop {
794        match ws.next().await {
795            Some(Ok(Message::Text(text))) => {
796                match serde_json::from_str::<SubscriptionConfirmation>(&text) {
797                    Ok(SubscriptionConfirmation { result: Some(_) }) => return Ok(()),
798                    Ok(_) => continue,
799                    Err(source) => {
800                        return Err(SubscriptionError::Subscribe {
801                            source: Box::new(source),
802                        });
803                    }
804                }
805            }
806            Some(Ok(_)) => continue,
807            _ => return Err(SubscriptionError::TaskDropped),
808        }
809    }
810}
811
812async fn drive_program_diff_stream(
813    ws: &mut AccountDiffWs,
814    notification_tx: &tokio::sync::mpsc::UnboundedSender<AccountDiffNotification>,
815    stop_rx: &mut watch::Receiver<bool>,
816    program_id: &str,
817) -> Result<(), SubscriptionRuntimeError> {
818    loop {
819        if *stop_rx.borrow() {
820            let drain_deadline = tokio::time::Instant::now() + SUBSCRIPTION_DRAIN_MAX_DURATION;
821            loop {
822                match tokio::time::timeout_at(
823                    drain_deadline,
824                    tokio::time::timeout(SUBSCRIPTION_DRAIN_IDLE_TIMEOUT, ws.next()),
825                )
826                .await
827                {
828                    Ok(Ok(Some(Ok(Message::Text(text))))) => {
829                        if let Some(msg) = parse_account_diff_message(&text) {
830                            let _ = notification_tx.send(msg.params.result);
831                        }
832                    }
833                    _ => return Ok(()),
834                }
835            }
836        }
837
838        let msg = tokio::select! {
839            m = ws.next() => m,
840            _ = stop_rx.changed() => continue,
841        };
842
843        match msg {
844            Some(Ok(Message::Text(text))) => {
845                if let Some(msg) = parse_account_diff_message(&text) {
846                    let _ = notification_tx.send(msg.params.result);
847                }
848            }
849            Some(Ok(_)) => {}
850            _ => {
851                return Err(subscription_runtime_closed(
852                    "program account diff",
853                    program_id,
854                ));
855            }
856        }
857    }
858}
859
/// Unit tests for the pure parsing and de-duplication helpers; nothing here
/// opens a network connection.
#[cfg(test)]
mod tests {
    use super::*;

    /// A subscribe confirmation has no `method`/`params`, so it must not be
    /// mistaken for a notification.
    #[test]
    fn parse_account_diff_notification_ignores_other_messages() {
        let confirmation = r#"{"jsonrpc":"2.0","result":1,"id":1}"#;
        assert!(parse_account_diff_message(confirmation).is_none());
    }

    /// A well-formed notification yields its payload fields unchanged.
    #[test]
    fn parse_account_diff_notification_extracts_payload() {
        let text = r#"{
            "jsonrpc":"2.0",
            "method":"accountDiffNotification",
            "params":{
                "subscription":7,
                "result":{
                    "context":{"slot":123},
                    "signature":"sig",
                    "pre":{"a":1},
                    "post":{"a":2}
                }
            }
        }"#;

        let notification = parse_account_diff_message(text)
            .expect("notification")
            .params
            .result;
        assert_eq!(notification.context.slot, 123);
        assert_eq!(notification.signature.as_deref(), Some("sig"));
        assert_eq!(notification.pre, Some(serde_json::json!({"a": 1})));
        assert_eq!(notification.post, Some(serde_json::json!({"a": 2})));
    }

    /// The subscription id in the message selects the account from the map.
    #[test]
    fn parse_routed_account_diff_notification_extracts_subscription_account() {
        let text = r#"{
            "jsonrpc":"2.0",
            "method":"accountDiffNotification",
            "params":{
                "subscription":42,
                "result":{
                    "context":{"slot":456},
                    "signature":"sig",
                    "pre":null,
                    "post":{"a":2}
                }
            }
        }"#;
        let subscriptions = std::collections::HashMap::from([(42_u64, "acct".to_string())]);

        let notification =
            parse_routed_account_diff_notification(text, &subscriptions).expect("notification");
        assert_eq!(notification.account, "acct");
        assert_eq!(notification.notification.context.slot, 456);
    }

    /// Duplicates are dropped without reordering the surviving entries.
    #[test]
    fn dedup_accounts_preserves_first_seen_order() {
        let accounts = dedup_accounts([
            "b".to_string(),
            "a".to_string(),
            "b".to_string(),
            "c".to_string(),
        ]);
        assert_eq!(accounts, vec!["b", "a", "c"]);
    }
}