Skip to main content

surfpool_core/rpc/
ws.rs

1use std::{
2    collections::HashMap,
3    str::FromStr,
4    sync::{Arc, RwLock, atomic},
5};
6
7use crossbeam_channel::TryRecvError;
8use jsonrpc_core::{Error, ErrorCode, Result};
9use jsonrpc_derive::rpc;
10use jsonrpc_pubsub::{
11    SubscriptionId,
12    typed::{Sink, Subscriber},
13};
14use solana_account_decoder::{UiAccount, UiAccountEncoding};
15use solana_client::{
16    rpc_config::{RpcSignatureSubscribeConfig, RpcTransactionConfig, RpcTransactionLogsFilter},
17    rpc_filter::RpcFilterType,
18    rpc_response::{
19        ProcessedSignatureResult, ReceivedSignatureResult, RpcKeyedAccount, RpcLogsResponse,
20        RpcResponseContext, RpcSignatureResult,
21    },
22};
23use solana_commitment_config::{CommitmentConfig, CommitmentLevel};
24use solana_pubkey::Pubkey;
25use solana_rpc_client_api::response::{Response as RpcResponse, SlotInfo};
26use solana_signature::Signature;
27use solana_transaction_status::{TransactionConfirmationStatus, UiTransactionEncoding};
28
29use super::{State, SurfnetRpcContext, SurfpoolWebsocketMeta};
30use crate::surfnet::{GetTransactionResult, SignatureSubscriptionType};
31
32/// Configuration for account subscription requests.
33///
34/// This struct defines the parameters that clients can specify when subscribing
35/// to account change notifications through WebSocket connections. It allows customization
36/// of both the commitment level for updates and the encoding format for account data.
37///
38/// ## Fields
39/// - `commitment`: Optional commitment configuration specifying when to send notifications
40///   (processed, confirmed, or finalized). Defaults to the node's default commitment level.
41/// - `encoding`: Optional encoding format for account data serialization (base58, base64, jsonParsed, etc.).
42///   Defaults to base58 encoding if not specified.
43///
44/// ## Usage
45/// Clients can provide this configuration to customize their subscription behavior:
46/// - Set commitment level to control notification timing based on confirmation status
47/// - Set encoding to specify the preferred format for receiving account data
48///
49/// ## Example Usage
50/// ```json
51/// {
52///   "commitment": "confirmed",
53///   "encoding": "base64"
54/// }
55/// ```
56#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
57#[serde(rename_all = "camelCase")]
58pub struct RpcAccountSubscribeConfig {
59    #[serde(flatten)]
60    pub commitment: Option<CommitmentConfig>,
61    pub encoding: Option<UiAccountEncoding>,
62}
63
64#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize, Default)]
65#[serde(rename_all = "camelCase")]
66pub struct RpcProgramSubscribeConfig {
67    #[serde(flatten)]
68    pub commitment: Option<CommitmentConfig>,
69    pub encoding: Option<UiAccountEncoding>,
70    pub filters: Option<Vec<RpcFilterType>>,
71}
72
73#[rpc]
74pub trait Rpc {
75    type Metadata;
76
77    /// Subscribe to signature status notifications via WebSocket.
78    ///
79    /// This method allows clients to subscribe to status updates for a specific transaction signature.
80    /// The subscriber will receive notifications when the transaction reaches the desired confirmation level
81    /// or when it's initially received by the network (if configured).
82    ///
83    /// ## Parameters
84    /// - `meta`: WebSocket metadata containing RPC context and connection information.
85    /// - `subscriber`: The subscription sink for sending signature status notifications to the client.
86    /// - `signature_str`: The transaction signature to monitor, as a base-58 encoded string.
87    /// - `config`: Optional configuration specifying commitment level and notification preferences.
88    ///
89    /// ## Returns
90    /// This method does not return a value directly. Instead, it establishes a WebSocket subscription
91    /// that will send `RpcResponse<RpcSignatureResult>` notifications to the subscriber when the
92    /// transaction status changes.
93    ///
94    /// ## Example WebSocket Request
95    /// ```json
96    /// {
97    ///   "jsonrpc": "2.0",
98    ///   "id": 1,
99    ///   "method": "signatureSubscribe",
100    ///   "params": [
101    ///     "2id3YC2jK9G5Wo2phDx4gJVAew8DcY5NAojnVuao8rkxwPYPe8cSwE5GzhEgJA2y8fVjDEo6iR6ykBvDxrTQrtpb",
102    ///     {
103    ///       "commitment": "finalized",
104    ///       "enableReceivedNotification": false
105    ///     }
106    ///   ]
107    /// }
108    /// ```
109    ///
110    /// ## Example WebSocket Response (Subscription Confirmation)
111    /// ```json
112    /// {
113    ///   "jsonrpc": "2.0",
114    ///   "result": 0,
115    ///   "id": 1
116    /// }
117    /// ```
118    ///
119    /// ## Example WebSocket Notification
120    /// ```json
121    /// {
122    ///   "jsonrpc": "2.0",
123    ///   "method": "signatureNotification",
124    ///   "params": {
125    ///     "result": {
126    ///       "context": {
127    ///         "slot": 5207624
128    ///       },
129    ///       "value": {
130    ///         "err": null
131    ///       }
132    ///     },
133    ///     "subscription": 0
134    ///   }
135    /// }
136    /// ```
137    ///
138    /// ## Notes
139    /// - If the transaction already exists with the desired confirmation status, the subscriber
140    ///   will be notified immediately and the subscription will complete.
141    /// - The subscription automatically terminates after sending the first matching notification.
142    /// - Invalid signature formats will cause the subscription to be rejected with an error.
143    /// - Each subscription runs in its own async task for optimal performance.
144    ///
145    /// ## See Also
146    /// - `signatureUnsubscribe`: Remove an active signature subscription
147    /// - `getSignatureStatuses`: Get current status of multiple signatures
148    #[pubsub(
149        subscription = "signatureNotification",
150        subscribe,
151        name = "signatureSubscribe"
152    )]
153    fn signature_subscribe(
154        &self,
155        meta: Self::Metadata,
156        subscriber: Subscriber<RpcResponse<RpcSignatureResult>>,
157        signature_str: String,
158        config: Option<RpcSignatureSubscribeConfig>,
159    );
160
161    /// Unsubscribe from signature status notifications.
162    ///
163    /// This method removes an active signature subscription, stopping further notifications
164    /// for the specified subscription ID.
165    ///
166    /// ## Parameters
167    /// - `meta`: Optional WebSocket metadata containing connection information.
168    /// - `subscription`: The subscription ID to remove, as returned by `signatureSubscribe`.
169    ///
170    /// ## Returns
171    /// A `Result<bool>` indicating whether the unsubscription was successful:
172    /// - `Ok(true)` if the subscription was successfully removed
173    /// - `Err(Error)` with `InvalidParams` if the subscription ID doesn't exist
174    ///
175    /// ## Example WebSocket Request
176    /// ```json
177    /// {
178    ///   "jsonrpc": "2.0",
179    ///   "id": 1,
180    ///   "method": "signatureUnsubscribe",
181    ///   "params": [0]
182    /// }
183    /// ```
184    ///
185    /// ## Example WebSocket Response
186    /// ```json
187    /// {
188    ///   "jsonrpc": "2.0",
189    ///   "result": true,
190    ///   "id": 1
191    /// }
192    /// ```
193    ///
194    /// ## Notes
195    /// - Attempting to unsubscribe from a non-existent subscription will return an error.
196    /// - Successfully unsubscribed connections will no longer receive notifications.
197    /// - This method is thread-safe and can be called concurrently.
198    ///
199    /// ## See Also
200    /// - `signatureSubscribe`: Create a signature status subscription
201    #[pubsub(
202        subscription = "signatureNotification",
203        unsubscribe,
204        name = "signatureUnsubscribe"
205    )]
206    fn signature_unsubscribe(
207        &self,
208        meta: Option<Self::Metadata>,
209        subscription: SubscriptionId,
210    ) -> Result<bool>;
211
212    /// Subscribe to account change notifications via WebSocket.
213    ///
214    /// This method allows clients to subscribe to updates for a specific account.
215    /// The subscriber will receive notifications whenever the account's data, lamports balance,
216    /// ownership, or other properties change.
217    ///
218    /// ## Parameters
219    /// - `meta`: WebSocket metadata containing RPC context and connection information.
220    /// - `subscriber`: The subscription sink for sending account update notifications to the client.
221    /// - `pubkey_str`: The account public key to monitor, as a base-58 encoded string.
222    /// - `config`: Optional configuration specifying commitment level and encoding format for account data.
223    ///
224    /// ## Returns
225    /// This method does not return a value directly. Instead, it establishes a continuous WebSocket
226    /// subscription that will send `RpcResponse<UiAccount>` notifications to the subscriber whenever
227    /// the account state changes.
228    ///
229    /// ## Example WebSocket Request
230    /// ```json
231    /// {
232    ///   "jsonrpc": "2.0",
233    ///   "id": 1,
234    ///   "method": "accountSubscribe",
235    ///   "params": [
236    ///     "CM78CPUeXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNH12",
237    ///     {
238    ///       "commitment": "finalized",
239    ///       "encoding": "base64"
240    ///     }
241    ///   ]
242    /// }
243    /// ```
244    ///
245    /// ## Example WebSocket Response (Subscription Confirmation)
246    /// ```json
247    /// {
248    ///   "jsonrpc": "2.0",
249    ///   "result": 23784,
250    ///   "id": 1
251    /// }
252    /// ```
253    ///
254    /// ## Example WebSocket Notification
255    /// ```json
256    /// {
257    ///   "jsonrpc": "2.0",
258    ///   "method": "accountNotification",
259    ///   "params": {
260    ///     "result": {
261    ///       "context": {
262    ///         "slot": 5208469
263    ///       },
264    ///       "value": {
265    ///         "data": ["base64EncodedAccountData", "base64"],
266    ///         "executable": false,
267    ///         "lamports": 33594,
268    ///         "owner": "11111111111111111111111111111112",
269    ///         "rentEpoch": 636
270    ///       }
271    ///     },
272    ///     "subscription": 23784
273    ///   }
274    /// }
275    /// ```
276    ///
277    /// ## Notes
278    /// - The subscription remains active until explicitly unsubscribed or the connection is closed.
279    /// - Account notifications are sent whenever any aspect of the account changes.
280    /// - The encoding format specified in the config determines how account data is serialized.
281    /// - Invalid public key formats will cause the subscription to be rejected with an error.
282    /// - Each subscription runs in its own async task to ensure optimal performance.
283    ///
284    /// ## See Also
285    /// - `accountUnsubscribe`: Remove an active account subscription
286    /// - `getAccountInfo`: Get current account information
287    #[pubsub(
288        subscription = "accountNotification",
289        subscribe,
290        name = "accountSubscribe"
291    )]
292    fn account_subscribe(
293        &self,
294        meta: Self::Metadata,
295        subscriber: Subscriber<RpcResponse<UiAccount>>,
296        pubkey_str: String,
297        config: Option<RpcAccountSubscribeConfig>,
298    );
299
300    /// Unsubscribe from account change notifications.
301    ///
302    /// This method removes an active account subscription, stopping further notifications
303    /// for the specified subscription ID. The monitoring task will automatically terminate
304    /// when the subscription is removed.
305    ///
306    /// ## Parameters
307    /// - `meta`: Optional WebSocket metadata containing connection information.
308    /// - `subscription`: The subscription ID to remove, as returned by `accountSubscribe`.
309    ///
310    /// ## Returns
311    /// A `Result<bool>` indicating whether the unsubscription was successful:
312    /// - `Ok(true)` if the subscription was successfully removed
313    /// - `Err(Error)` with `InvalidParams` if the subscription ID doesn't exist
314    ///
315    /// ## Example WebSocket Request
316    /// ```json
317    /// {
318    ///   "jsonrpc": "2.0",
319    ///   "id": 1,
320    ///   "method": "accountUnsubscribe",
321    ///   "params": [23784]
322    /// }
323    /// ```
324    ///
325    /// ## Example WebSocket Response
326    /// ```json
327    /// {
328    ///   "jsonrpc": "2.0",
329    ///   "result": true,
330    ///   "id": 1
331    /// }
332    /// ```
333    ///
334    /// ## Notes
335    /// - Attempting to unsubscribe from a non-existent subscription will return an error.
336    /// - Successfully unsubscribed connections will no longer receive account notifications.
337    /// - The monitoring task automatically detects subscription removal and terminates gracefully.
338    /// - This method is thread-safe and can be called concurrently.
339    ///
340    /// ## See Also
341    /// - `accountSubscribe`: Create an account change subscription
342    #[pubsub(
343        subscription = "accountNotification",
344        unsubscribe,
345        name = "accountUnsubscribe"
346    )]
347    fn account_unsubscribe(
348        &self,
349        meta: Option<Self::Metadata>,
350        subscription: SubscriptionId,
351    ) -> Result<bool>;
352
353    /// Subscribe to slot notifications.
354    ///
355    /// This method allows clients to subscribe to updates for a specific slot.
356    /// The subscriber will receive notifications whenever the slot changes.
357    ///
358    /// ## Parameters
359    /// - `meta`: WebSocket metadata containing RPC context and connection information.
360    /// - `subscriber`: The subscription sink for sending slot update notifications to the client.
361    ///
362    /// ## Returns
363    /// This method does not return a value directly. Instead, it establishes a continuous WebSocket
364    /// subscription that will send `SlotInfo` notifications to the subscriber whenever
365    /// the slot changes.
366    ///
367    /// ## Example WebSocket Request
368    /// ```json
369    /// {
370    ///   "jsonrpc": "2.0",
371    ///   "id": 1,
372    ///   "method": "slotSubscribe",
373    ///   "params": [
374    ///     {
375    ///       "commitment": "finalized"
376    ///     }
377    ///   ]
378    /// }
379    /// ```
380    ///
381    /// ## Example WebSocket Response (Subscription Confirmation)
382    /// ```json
383    /// {
384    ///   "jsonrpc": "2.0",
385    ///   "result": 5207624,
386    ///   "id": 1
387    /// }
388    /// ```
389    ///
390    /// ## Example WebSocket Notification
391    /// ```json
392    /// {
393    ///   "jsonrpc": "2.0",
394    ///   "method": "slotNotification",
395    ///   "params": {
396    ///     "result": {
397    ///       "slot": 5207624
398    ///     },
399    ///     "subscription": 5207624
400    ///   }
401    /// }
402    /// ```
403    ///
404    /// ## Notes
405    /// - The subscription remains active until explicitly unsubscribed or the connection is closed.
406    /// - Slot notifications are sent whenever the slot changes.
407    /// - The subscription automatically terminates when the slot changes.
408    /// - Each subscription runs in its own async task for optimal performance.
409    ///
410    /// ## See Also
411    /// - `slotUnsubscribe`: Remove an active slot subscription
412    #[pubsub(subscription = "slotNotification", subscribe, name = "slotSubscribe")]
413    fn slot_subscribe(&self, meta: Self::Metadata, subscriber: Subscriber<SlotInfo>);
414
415    /// Unsubscribe from slot notifications.
416    ///
417    /// This method removes an active slot subscription, stopping further notifications
418    /// for the specified subscription ID.
419    ///
420    /// ## Parameters
421    /// - `meta`: Optional WebSocket metadata containing connection information.
422    /// - `subscription`: The subscription ID to remove, as returned by `slotSubscribe`.
423    ///
424    /// ## Returns
425    /// A `Result<bool>` indicating whether the unsubscription was successful:
426    /// - `Ok(true)` if the subscription was successfully removed
427    /// - `Err(Error)` with `InvalidParams` if the subscription ID doesn't exist
428    ///
429    /// ## Example WebSocket Request
430    /// ```json
431    /// {
432    ///   "jsonrpc": "2.0",
433    ///   "id": 1,
434    ///   "method": "slotUnsubscribe",
435    ///   "params": [0]
436    /// }
437    /// ```
438    ///
439    /// ## Example WebSocket Response
440    /// ```json
441    /// {
442    ///   "jsonrpc": "2.0",
443    ///   "result": true,
444    ///   "id": 1
445    /// }
446    /// ```
447    ///
448    /// ## Notes
449    /// - Attempting to unsubscribe from a non-existent subscription will return an error.
450    /// - Successfully unsubscribed connections will no longer receive notifications.
451    /// - This method is thread-safe and can be called concurrently.
452    ///
453    /// ## See Also
454    /// - `slotSubscribe`: Create a slot subscription
455    #[pubsub(
456        subscription = "slotNotification",
457        unsubscribe,
458        name = "slotUnsubscribe"
459    )]
460    fn slot_unsubscribe(
461        &self,
462        meta: Option<Self::Metadata>,
463        subscription: SubscriptionId,
464    ) -> Result<bool>;
465
466    /// Subscribe to logs notifications.
467    ///
468    /// This method allows clients to subscribe to transaction log messages
469    /// emitted during transaction execution. It supports filtering by signature,
470    /// account mentions, or all transactions.
471    ///
472    /// ## Parameters
473    /// - `meta`: WebSocket metadata containing RPC context and connection information.
474    /// - `subscriber`: The subscription sink for sending log notifications to the client.
475    /// - `mentions`: Optional filter for the subscription: can be a specific signature, account, or `"all"`.
476    /// - `commitment`: Optional commitment level for filtering logs by block finality.
477    ///
478    /// ## Returns
479    /// This method establishes a continuous WebSocket subscription that streams
480    /// `RpcLogsResponse` notifications as new transactions are processed.
481    ///
482    /// ## Example WebSocket Request
483    /// ```json
484    /// {
485    ///   "jsonrpc": "2.0",
486    ///   "id": 1,
487    ///   "method": "logsSubscribe",
488    ///   "params": [
489    ///     {
490    ///       "mentions": ["11111111111111111111111111111111"]
491    ///     },
492    ///     {
493    ///       "commitment": "finalized"
494    ///     }
495    ///   ]
496    /// }
497    /// ```
498    ///
499    /// ## Example WebSocket Response (Subscription Confirmation)
500    /// ```json
501    /// {
502    ///   "jsonrpc": "2.0",
503    ///   "result": 42,
504    ///   "id": 1
505    /// }
506    /// ```
507    ///
508    /// ## Example WebSocket Notification
509    /// ```json
510    /// {
511    ///   "jsonrpc": "2.0",
512    ///   "method": "logsNotification",
513    ///   "params": {
514    ///     "result": {
515    ///       "signature": "3s6n...",
516    ///       "err": null,
517    ///       "logs": ["Program 111111... invoke [1]", "Program 111111... success"]
518    ///     },
519    ///     "subscription": 42
520    ///   }
521    /// }
522    /// ```
523    ///
524    /// ## Notes
525    /// - The subscription remains active until explicitly unsubscribed or the connection is closed.
526    /// - Each log subscription runs independently and supports filtering.
527    /// - Log messages may be truncated depending on cluster configuration.
528    ///
529    /// ## See Also
530    /// - `logsUnsubscribe`: Remove an active logs subscription.
531    #[pubsub(subscription = "logsNotification", subscribe, name = "logsSubscribe")]
532    fn logs_subscribe(
533        &self,
534        meta: Self::Metadata,
535        subscriber: Subscriber<RpcResponse<RpcLogsResponse>>,
536        mentions: Option<RpcTransactionLogsFilter>,
537        commitment: Option<CommitmentConfig>,
538    );
539
540    /// Unsubscribe from logs notifications.
541    ///
542    /// This method removes an active logs subscription, stopping further notifications
543    /// for the specified subscription ID.
544    ///
545    /// ## Parameters
546    /// - `meta`: Optional WebSocket metadata containing connection information.
547    /// - `subscription`: The subscription ID to remove, as returned by `logsSubscribe`.
548    ///
549    /// ## Returns
550    /// A `Result<bool>` indicating whether the unsubscription was successful:
551    /// - `Ok(true)` if the subscription was successfully removed.
552    /// - `Err(Error)` with `InvalidParams` if the subscription ID is not recognized.
553    ///
554    /// ## Example WebSocket Request
555    /// ```json
556    /// {
557    ///   "jsonrpc": "2.0",
558    ///   "id": 1,
559    ///   "method": "logsUnsubscribe",
560    ///   "params": [42]
561    /// }
562    /// ```
563    ///
564    /// ## Example WebSocket Response
565    /// ```json
566    /// {
567    ///   "jsonrpc": "2.0",
568    ///   "result": true,
569    ///   "id": 1
570    /// }
571    /// ```
572    ///
573    /// ## Notes
574    /// - Unsubscribing from a non-existent subscription ID returns an error.
575    /// - Successfully unsubscribed clients will no longer receive logs notifications.
576    /// - This method is thread-safe and may be called concurrently.
577    ///
578    /// ## See Also
579    /// - `logsSubscribe`: Create a logs subscription.
580    #[pubsub(
581        subscription = "logsNotification",
582        unsubscribe,
583        name = "logsUnsubscribe"
584    )]
585    fn logs_unsubscribe(
586        &self,
587        meta: Option<Self::Metadata>,
588        subscription: SubscriptionId,
589    ) -> Result<bool>;
590
591    #[pubsub(subscription = "rootNotification", subscribe, name = "rootSubscribe")]
592    fn root_subscribe(&self, meta: Self::Metadata, subscriber: Subscriber<RpcResponse<()>>);
593
594    #[pubsub(
595        subscription = "rootNotification",
596        unsubscribe,
597        name = "rootUnsubscribe"
598    )]
599    fn root_unsubscribe(
600        &self,
601        meta: Option<Self::Metadata>,
602        subscription: SubscriptionId,
603    ) -> Result<bool>;
604
605    /// Subscribe to notifications for all accounts owned by a specific program via WebSocket.
606    ///
607    /// This method allows clients to subscribe to updates for any account whose `owner`
608    /// matches the given program ID. Notifications are sent whenever an account owned by
609    /// the program is created, updated, or deleted.
610    ///
611    /// ## Parameters
612    /// - `meta`: WebSocket metadata containing RPC context and connection information.
613    /// - `subscriber`: The subscription sink for sending program account notifications to the client.
614    /// - `pubkey_str`: The program public key to monitor, as a base-58 encoded string.
615    /// - `config`: Optional configuration specifying commitment level, encoding format, and filters.
616    ///
617    /// ## Returns
618    /// This method does not return a value directly. Instead, it establishes a continuous WebSocket
619    /// subscription that will send `RpcResponse<RpcKeyedAccount>` notifications to the subscriber
620    /// whenever an account owned by the program changes.
621    ///
622    /// ## Filters
623    /// The optional config may include filters to narrow which account updates trigger notifications:
624    /// - `dataSize`: Only notify for accounts with a specific data length (in bytes).
625    /// - `memcmp`: Only notify for accounts whose data matches specific bytes at a given offset.
626    ///
627    /// ## Example WebSocket Request
628    /// ```json
629    /// {
630    ///   "jsonrpc": "2.0",
631    ///   "id": 1,
632    ///   "method": "programSubscribe",
633    ///   "params": [
634    ///     "11111111111111111111111111111111",
635    ///     {
636    ///       "encoding": "base64",
637    ///       "filters": [
638    ///         { "dataSize": 80 }
639    ///       ]
640    ///     }
641    ///   ]
642    /// }
643    /// ```
644    ///
645    /// ## Example WebSocket Response (Subscription Confirmation)
646    /// ```json
647    /// {
648    ///   "jsonrpc": "2.0",
649    ///   "result": 24040,
650    ///   "id": 1
651    /// }
652    /// ```
653    ///
654    /// ## Example WebSocket Notification
655    /// ```json
656    /// {
657    ///   "jsonrpc": "2.0",
658    ///   "method": "programNotification",
659    ///   "params": {
660    ///     "result": {
661    ///       "context": { "slot": 5208469 },
662    ///       "value": {
663    ///         "pubkey": "H4vnBqifaSACnKa7acsxstsY1iV1bvJNxsCY7enrd1hq",
664    ///         "account": {
665    ///           "data": ["base64data", "base64"],
666    ///           "executable": false,
667    ///           "lamports": 33594,
668    ///           "owner": "11111111111111111111111111111111",
669    ///           "rentEpoch": 636,
670    ///           "space": 36
671    ///         }
672    ///       }
673    ///     },
674    ///     "subscription": 24040
675    ///   }
676    /// }
677    /// ```
678    ///
679    /// ## Notes
680    /// - The subscription remains active until explicitly unsubscribed or the connection is closed.
681    /// - Notifications include both the account pubkey and the full account data.
682    /// - Invalid public key formats will cause the subscription to be rejected with an error.
683    /// - Each subscription runs in its own async task for optimal performance.
684    ///
685    /// ## See Also
686    /// - `programUnsubscribe`: Remove an active program subscription
687    /// - `getProgramAccounts`: Get current accounts for a program
688    #[pubsub(
689        subscription = "programNotification",
690        subscribe,
691        name = "programSubscribe"
692    )]
693    fn program_subscribe(
694        &self,
695        meta: Self::Metadata,
696        subscriber: Subscriber<RpcResponse<RpcKeyedAccount>>,
697        pubkey_str: String,
698        config: Option<RpcProgramSubscribeConfig>,
699    );
700
701    /// Unsubscribe from program account change notifications.
702    ///
703    /// This method removes an active program subscription, stopping further notifications
704    /// for the specified subscription ID. The monitoring task will automatically terminate
705    /// when the subscription is removed.
706    ///
707    /// ## Parameters
708    /// - `meta`: Optional WebSocket metadata containing connection information.
709    /// - `subscription`: The subscription ID to remove, as returned by `programSubscribe`.
710    ///
711    /// ## Returns
712    /// A `Result<bool>` indicating whether the unsubscription was successful:
713    /// - `Ok(true)` if the subscription was successfully removed
714    /// - `Err(Error)` with `InternalError` if the subscription map lock could not be acquired
715    ///
716    /// ## Example WebSocket Request
717    /// ```json
718    /// {
719    ///   "jsonrpc": "2.0",
720    ///   "id": 1,
721    ///   "method": "programUnsubscribe",
722    ///   "params": [24040]
723    /// }
724    /// ```
725    ///
726    /// ## Example WebSocket Response
727    /// ```json
728    /// {
729    ///   "jsonrpc": "2.0",
730    ///   "result": true,
731    ///   "id": 1
732    /// }
733    /// ```
734    ///
735    /// ## Notes
736    /// - Successfully unsubscribed connections will no longer receive program account notifications.
737    /// - The monitoring task automatically detects subscription removal and terminates gracefully.
738    /// - This method is thread-safe and can be called concurrently.
739    ///
740    /// ## See Also
741    /// - `programSubscribe`: Create a program account change subscription
742    #[pubsub(
743        subscription = "programNotification",
744        unsubscribe,
745        name = "programUnsubscribe"
746    )]
747    fn program_unsubscribe(
748        &self,
749        meta: Option<Self::Metadata>,
750        subscription: SubscriptionId,
751    ) -> Result<bool>;
752
753    #[pubsub(
754        subscription = "slotsUpdatesNotification",
755        subscribe,
756        name = "slotsUpdatesSubscribe"
757    )]
758    fn slots_updates_subscribe(
759        &self,
760        meta: Self::Metadata,
761        subscriber: Subscriber<RpcResponse<()>>,
762    );
763
764    #[pubsub(
765        subscription = "slotsUpdatesNotification",
766        unsubscribe,
767        name = "slotsUpdatesUnsubscribe"
768    )]
769    fn slots_updates_unsubscribe(
770        &self,
771        meta: Option<Self::Metadata>,
772        subscription: SubscriptionId,
773    ) -> Result<bool>;
774
775    #[pubsub(subscription = "blockNotification", subscribe, name = "blockSubscribe")]
776    fn block_subscribe(&self, meta: Self::Metadata, subscriber: Subscriber<RpcResponse<()>>);
777
778    #[pubsub(
779        subscription = "blockNotification",
780        unsubscribe,
781        name = "blockUnsubscribe"
782    )]
783    fn block_unsubscribe(
784        &self,
785        meta: Option<Self::Metadata>,
786        subscription: SubscriptionId,
787    ) -> Result<bool>;
788
789    #[pubsub(subscription = "voteNotification", subscribe, name = "voteSubscribe")]
790    fn vote_subscribe(&self, meta: Self::Metadata, subscriber: Subscriber<RpcResponse<()>>);
791
792    #[pubsub(
793        subscription = "voteNotification",
794        unsubscribe,
795        name = "voteUnsubscribe"
796    )]
797    fn vote_unsubscribe(
798        &self,
799        meta: Option<Self::Metadata>,
800        subscription: SubscriptionId,
801    ) -> Result<bool>;
802
803    /// Subscribe to snapshot import notifications via WebSocket.
804    ///
805    /// This method allows clients to subscribe to real-time updates about snapshot import operations
806    /// from a specific snapshot URL. The subscriber will receive notifications when the snapshot
807    /// is being imported, including progress updates and completion status.
808    ///
809    /// ## Parameters
810    /// - `meta`: WebSocket metadata containing RPC context and connection information.
811    /// - `subscriber`: The subscription sink for sending snapshot import notifications to the client.
812    /// - `snapshot_url`: The URL of the snapshot to import and monitor.
813    ///
814    /// ## Returns
815    /// This method does not return a value directly. Instead, it establishes a continuous WebSocket
816    /// subscription that will send `SnapshotImportNotification` notifications to the subscriber whenever
817    /// the snapshot import operation status changes.
818    ///
819    /// ## Example WebSocket Request
820    /// ```json
821    /// {
822    ///   "jsonrpc": "2.0",
823    ///   "id": 1,
824    ///   "method": "snapshotSubscribe",
825    ///   "params": ["https://example.com/snapshot.json"]
826    /// }
827    /// ```
828    ///
829    /// ## Example WebSocket Response (Subscription Confirmation)
830    /// ```json
831    /// {
832    ///   "jsonrpc": "2.0",
833    ///   "result": 12345,
834    ///   "id": 1
835    /// }
836    /// ```
837    ///
838    /// ## Example WebSocket Notification
839    /// ```json
840    /// {
841    ///   "jsonrpc": "2.0",
842    ///   "method": "snapshotNotification",
843    ///   "params": {
844    ///     "result": {
845    ///       "snapshotId": "snapshot_20240107_123456",
846    ///       "status": "InProgress",
847    ///       "accountsLoaded": 1500,
848    ///       "totalAccounts": 3000,
849    ///       "error": null
850    ///     },
851    ///     "subscription": 12345
852    ///   }
853    /// }
854    /// ```
855    ///
856    /// ## Notes
857    /// - The subscription remains active until explicitly unsubscribed or the connection is closed.
858    /// - Multiple clients can subscribe to different snapshot notifications simultaneously.
859    /// - The snapshot URL must be accessible and contain a valid snapshot format.
860    /// - Each subscription runs in its own async task for optimal performance.
861    ///
862    /// ## See Also
863    /// - `snapshotUnsubscribe`: Remove an active snapshot subscription
864    #[pubsub(
865        subscription = "snapshotNotification",
866        subscribe,
867        name = "snapshotSubscribe"
868    )]
869    fn snapshot_subscribe(
870        &self,
871        meta: Self::Metadata,
872        subscriber: Subscriber<crate::surfnet::SnapshotImportNotification>,
873        snapshot_url: String,
874    );
875
876    /// Unsubscribe from snapshot import notifications.
877    ///
878    /// This method removes an active snapshot subscription, stopping further notifications
879    /// for the specified subscription ID.
880    ///
881    /// ## Parameters
882    /// - `meta`: Optional WebSocket metadata containing connection information.
883    /// - `subscription`: The subscription ID to remove, as returned by `snapshotSubscribe`.
884    ///
885    /// ## Returns
886    /// A `Result<bool>` indicating whether the unsubscription was successful:
887    /// - `Ok(true)` if the subscription was successfully removed
888    /// - `Err(Error)` with `InvalidParams` if the subscription ID doesn't exist
889    ///
890    /// ## Example WebSocket Request
891    /// ```json
892    /// {
893    ///   "jsonrpc": "2.0",
894    ///   "id": 1,
895    ///   "method": "snapshotUnsubscribe",
896    ///   "params": [12345]
897    /// }
898    /// ```
899    ///
900    /// ## Example WebSocket Response
901    /// ```json
902    /// {
903    ///   "jsonrpc": "2.0",
904    ///   "result": true,
905    ///   "id": 1
906    /// }
907    /// ```
908    ///
909    /// ## Notes
910    /// - Attempting to unsubscribe from a non-existent subscription will return an error.
911    /// - Successfully unsubscribed connections will no longer receive snapshot notifications.
912    /// - This method is thread-safe and can be called concurrently.
913    ///
914    /// ## See Also
915    /// - `snapshotSubscribe`: Create a snapshot import subscription
916    #[pubsub(
917        subscription = "snapshotNotification",
918        unsubscribe,
919        name = "snapshotUnsubscribe"
920    )]
921    fn snapshot_unsubscribe(
922        &self,
923        meta: Option<Self::Metadata>,
924        subscription: SubscriptionId,
925    ) -> Result<bool>;
926}
927
928/// WebSocket RPC server implementation for Surfpool.
929///
930/// This struct manages WebSocket subscriptions for both signature status updates
931/// and account change notifications in the Surfpool environment. It provides a complete
932/// WebSocket RPC interface that allows clients to subscribe to real-time updates
933/// from the Solana Virtual Machine (SVM) and handles the lifecycle of WebSocket connections.
934///
935/// ## Fields
936/// - `uid`: Atomic counter for generating unique subscription IDs across all subscription types.
937/// - `signature_subscription_map`: Thread-safe HashMap containing active signature subscriptions, mapping subscription IDs to their notification sinks.
938/// - `account_subscription_map`: Thread-safe HashMap containing active account subscriptions, mapping subscription IDs to their notification sinks.
939/// - `slot_subscription_map`: Thread-safe HashMap containing active slot subscriptions, mapping subscription IDs to their notification sinks.
940/// - `tokio_handle`: Runtime handle for spawning asynchronous subscription monitoring tasks.
941///
942/// ## Features
943/// - **Concurrent Subscriptions**: Supports multiple simultaneous subscriptions without blocking.
944/// - **Thread Safety**: All subscription management operations are thread-safe using RwLock.
945/// - **Automatic Cleanup**: Subscriptions are automatically cleaned up when completed or unsubscribed.
946/// - **Efficient Monitoring**: Each subscription runs in its own async task for optimal performance.
947/// - **Real-time Updates**: Provides immediate notifications when monitored conditions are met.
948///
949/// ## Usage
950/// This struct implements the `Rpc` trait and is typically used as part of a larger
951/// WebSocket server infrastructure to provide real-time blockchain data to clients.
952///
953/// ## Notes
954/// - Each subscription is assigned a unique numeric ID for tracking and management.
955/// - The struct maintains separate maps for different subscription types to optimize performance.
956/// - All async operations are managed through the provided Tokio runtime handle.
957///
958/// ## See Also
959/// - `Rpc`: The trait interface this struct implements
960/// - `RpcAccountSubscribeConfig`: Configuration options for account subscriptions
961pub struct SurfpoolWsRpc {
962    pub uid: atomic::AtomicUsize,
963    pub signature_subscription_map:
964        Arc<RwLock<HashMap<SubscriptionId, Sink<RpcResponse<RpcSignatureResult>>>>>,
965    pub account_subscription_map:
966        Arc<RwLock<HashMap<SubscriptionId, Sink<RpcResponse<UiAccount>>>>>,
967    pub program_subscription_map:
968        Arc<RwLock<HashMap<SubscriptionId, Sink<RpcResponse<RpcKeyedAccount>>>>>,
969    pub slot_subscription_map: Arc<RwLock<HashMap<SubscriptionId, Sink<SlotInfo>>>>,
970    pub logs_subscription_map:
971        Arc<RwLock<HashMap<SubscriptionId, Sink<RpcResponse<RpcLogsResponse>>>>>,
972    pub snapshot_subscription_map:
973        Arc<RwLock<HashMap<SubscriptionId, Sink<crate::surfnet::SnapshotImportNotification>>>>,
974    pub tokio_handle: tokio::runtime::Handle,
975}
976
977impl Rpc for SurfpoolWsRpc {
978    type Metadata = Option<SurfpoolWebsocketMeta>;
979
980    /// Implementation of signature subscription for WebSocket clients.
981    ///
982    /// This method handles the complete lifecycle of signature subscriptions:
983    /// 1. Validates the provided signature string format
984    /// 2. Determines the subscription type (received vs commitment-based)
985    /// 3. Checks if the transaction already exists in the desired state
986    /// 4. If found and confirmed, immediately notifies the subscriber
987    /// 5. Otherwise, sets up a continuous monitoring loop
988    /// 6. Spawns an async task to handle ongoing subscription management
989    ///
990    /// # Error Handling
991    /// - Rejects subscription with `InvalidParams` for malformed signatures
992    /// - Handles RPC context retrieval failures
993    /// - Manages subscription cleanup on completion or failure
994    ///
995    /// # Concurrency
996    /// Each subscription runs in its own async task, allowing multiple
997    /// concurrent subscriptions without blocking each other.
998    fn signature_subscribe(
999        &self,
1000        meta: Self::Metadata,
1001        subscriber: Subscriber<RpcResponse<RpcSignatureResult>>,
1002        signature_str: String,
1003        config: Option<RpcSignatureSubscribeConfig>,
1004    ) {
1005        let _ = meta
1006            .as_ref()
1007            .map(|m| m.log_debug("Websocket 'signature_subscribe' connection established"));
1008
1009        let signature = match Signature::from_str(&signature_str) {
1010            Ok(sig) => sig,
1011            Err(_) => {
1012                let error = Error {
1013                    code: ErrorCode::InvalidParams,
1014                    message: "Invalid signature format.".into(),
1015                    data: None,
1016                };
1017                if let Err(e) = subscriber.reject(error.clone()) {
1018                    log::error!("Failed to reject subscriber: {:?}", e);
1019                }
1020                return;
1021            }
1022        };
1023        let config = config.unwrap_or_default();
1024        let rpc_transaction_config = RpcTransactionConfig {
1025            encoding: Some(UiTransactionEncoding::Json),
1026            commitment: config.commitment,
1027            max_supported_transaction_version: Some(0),
1028        };
1029
1030        let subscription_type = if config.enable_received_notification.unwrap_or(false) {
1031            SignatureSubscriptionType::Received
1032        } else {
1033            SignatureSubscriptionType::Commitment(config.commitment.unwrap_or_default().commitment)
1034        };
1035
1036        let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst);
1037        let sub_id = SubscriptionId::Number(id as u64);
1038        let sink = match subscriber.assign_id(sub_id.clone()) {
1039            Ok(sink) => sink,
1040            Err(e) => {
1041                log::error!("Failed to assign subscription ID: {:?}", e);
1042                return;
1043            }
1044        };
1045        let active = Arc::clone(&self.signature_subscription_map);
1046        let meta = meta.clone();
1047        self.tokio_handle.spawn(async move {
1048            if let Ok(mut guard) = active.write() {
1049                guard.insert(sub_id.clone(), sink);
1050            } else {
1051                log::error!("Failed to acquire write lock on signature_subscription_map");
1052                return;
1053            }
1054
1055            let SurfnetRpcContext {
1056                svm_locker,
1057                remote_ctx,
1058            } = match meta.get_rpc_context(()) {
1059                Ok(res) => res,
1060                Err(e) => {
1061                    log::error!("Failed to get RPC context: {:?}", e);
1062                    if let Ok(mut guard) = active.write() {
1063                        if let Some(sink) = guard.remove(&sub_id) {
1064                            if let Err(e) = sink.notify(Err(e.into())) {
1065                                log::error!("Failed to notify client about RPC context error: {e}");
1066                            }
1067                        }
1068                    }
1069                    return;
1070                }
1071            };
1072            // get the signature from the SVM to see if it's already been processed
1073            let tx_result = match svm_locker
1074                .get_transaction(
1075                    &remote_ctx.map(|(r, _)| r),
1076                    &signature,
1077                    rpc_transaction_config,
1078                )
1079                .await
1080            {
1081                Ok(res) => res,
1082                Err(e) => {
1083                    if let Ok(mut guard) = active.write() {
1084                        if let Some(sink) = guard.remove(&sub_id) {
1085                            let _ = sink.notify(Err(e.into()));
1086                        }
1087                    }
1088                    return;
1089                }
1090            };
1091
1092            // if we already had the transaction, check if its confirmation status matches the desired status set by the subscription
1093            // if so, notify the user and complete the subscription
1094            // otherwise, subscribe to the transaction updates
1095            if let GetTransactionResult::FoundTransaction(_, _, tx) = tx_result {
1096                match (&subscription_type, tx.confirmation_status) {
1097                    (&SignatureSubscriptionType::Received, _)
1098                    | (
1099                        &SignatureSubscriptionType::Commitment(CommitmentLevel::Processed),
1100                        Some(TransactionConfirmationStatus::Processed),
1101                    )
1102                    | (
1103                        &SignatureSubscriptionType::Commitment(CommitmentLevel::Processed),
1104                        Some(TransactionConfirmationStatus::Confirmed),
1105                    )
1106                    | (
1107                        &SignatureSubscriptionType::Commitment(CommitmentLevel::Processed),
1108                        Some(TransactionConfirmationStatus::Finalized),
1109                    )
1110                    | (
1111                        &SignatureSubscriptionType::Commitment(CommitmentLevel::Confirmed),
1112                        Some(TransactionConfirmationStatus::Confirmed),
1113                    )
1114                    | (
1115                        &SignatureSubscriptionType::Commitment(CommitmentLevel::Confirmed),
1116                        Some(TransactionConfirmationStatus::Finalized),
1117                    )
1118                    | (
1119                        &SignatureSubscriptionType::Commitment(CommitmentLevel::Finalized),
1120                        Some(TransactionConfirmationStatus::Finalized),
1121                    ) => {
1122                        if let Ok(mut guard) = active.write() {
1123                            if let Some(sink) = guard.remove(&sub_id) {
1124                                let _ = sink.notify(Ok(RpcResponse {
1125                                    context: RpcResponseContext::new(tx.slot),
1126                                    value: RpcSignatureResult::ProcessedSignature(
1127                                        ProcessedSignatureResult {
1128                                            err: tx.err.map(|e| e.into()),
1129                                        },
1130                                    ),
1131                                }));
1132                            }
1133                        }
1134                        return;
1135                    }
1136                    _ => {}
1137                }
1138            }
1139
1140            // update our surfnet SVM to subscribe to the signature updates
1141            let rx =
1142                svm_locker.subscribe_for_signature_updates(&signature, subscription_type.clone());
1143
1144            loop {
1145                let (slot, some_err) = match rx.try_recv() {
1146                    Ok(msg) => msg,
1147                    Err(e) => {
1148                        match e {
1149                            TryRecvError::Empty => {
1150                                // no update yet, continue
1151                                tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1152                                continue;
1153                            }
1154                            TryRecvError::Disconnected => {
1155                                warn!(
1156                                    "Signature subscription channel closed for sub id {:?}",
1157                                    sub_id
1158                                );
1159                                // channel closed, exit loop
1160                                break;
1161                            }
1162                        }
1163                    }
1164                };
1165
1166                let Ok(mut guard) = active.write() else {
1167                    log::error!("Failed to acquire read lock on signature_subscription_map");
1168                    break;
1169                };
1170
1171                let Some(sink) = guard.remove(&sub_id) else {
1172                    log::error!("Failed to get sink for subscription ID");
1173                    break;
1174                };
1175
1176                let res = match subscription_type {
1177                    SignatureSubscriptionType::Received => sink.notify(Ok(RpcResponse {
1178                        context: RpcResponseContext::new(slot),
1179                        value: RpcSignatureResult::ReceivedSignature(
1180                            ReceivedSignatureResult::ReceivedSignature,
1181                        ),
1182                    })),
1183                    SignatureSubscriptionType::Commitment(_) => sink.notify(Ok(RpcResponse {
1184                        context: RpcResponseContext::new(slot),
1185                        value: RpcSignatureResult::ProcessedSignature(ProcessedSignatureResult {
1186                            err: some_err.map(|e| e.into()),
1187                        }),
1188                    })),
1189                };
1190
1191                if guard.is_empty() {
1192                    break;
1193                }
1194
1195                if let Err(e) = res {
1196                    log::error!("Failed to notify client about account update: {e}");
1197                    break;
1198                }
1199            }
1200        });
1201    }
1202
1203    /// Implementation of signature unsubscription for WebSocket clients.
1204    ///
1205    /// This method removes an active signature subscription from the internal
1206    /// tracking maps, effectively stopping further notifications for that subscription.
1207    ///
1208    /// # Implementation Details
1209    /// - Attempts to remove the subscription from the active subscriptions map
1210    /// - Returns success if the subscription existed and was removed
1211    /// - Returns an error if the subscription ID was not found
1212    ///
1213    /// # Thread Safety
1214    /// Uses write locks to ensure thread-safe removal from the subscription map.
1215    fn signature_unsubscribe(
1216        &self,
1217        _meta: Option<Self::Metadata>,
1218        subscription: SubscriptionId,
1219    ) -> Result<bool> {
1220        if let Ok(mut guard) = self.signature_subscription_map.write() {
1221            guard.remove(&subscription);
1222        } else {
1223            log::error!("Failed to acquire write lock on signature_subscription_map");
1224            return Err(Error {
1225                code: ErrorCode::InternalError,
1226                message: "Internal error.".into(),
1227                data: None,
1228            });
1229        };
1230        Ok(true)
1231    }
1232
1233    /// Implementation of account subscription for WebSocket clients.
1234    ///
1235    /// This method handles the complete lifecycle of account subscriptions:
1236    /// 1. Validates the provided public key string format
1237    /// 2. Parses the subscription configuration (commitment and encoding)
1238    /// 3. Generates a unique subscription ID and assigns it to the subscriber
1239    /// 4. Spawns an async task to continuously monitor account changes
1240    /// 5. Sends notifications whenever the account state changes
1241    ///
1242    /// # Monitoring Loop
1243    /// The spawned task runs a continuous loop that:
1244    /// - Checks if the subscription is still active (not unsubscribed)
1245    /// - Polls for account updates from the SVM
1246    /// - Sends notifications to the subscriber when changes occur
1247    /// - Automatically terminates when the subscription is removed
1248    ///
1249    /// # Error Handling
1250    /// - Rejects subscription with `InvalidParams` for malformed public keys
1251    /// - Handles encoding configuration for account data serialization
1252    /// - Manages subscription cleanup through the monitoring loop
1253    ///
1254    /// # Performance
1255    /// Uses efficient polling with minimal CPU overhead and automatic
1256    /// cleanup when subscriptions are no longer needed.
1257    fn account_subscribe(
1258        &self,
1259        meta: Self::Metadata,
1260        subscriber: Subscriber<RpcResponse<UiAccount>>,
1261        pubkey_str: String,
1262        config: Option<RpcAccountSubscribeConfig>,
1263    ) {
1264        let _ = meta
1265            .as_ref()
1266            .map(|m| m.log_debug("Websocket 'account_subscribe' connection established"));
1267
1268        let pubkey = match Pubkey::from_str(&pubkey_str) {
1269            Ok(pk) => pk,
1270            Err(_) => {
1271                let error = Error {
1272                    code: ErrorCode::InvalidParams,
1273                    message: "Invalid pubkey format.".into(),
1274                    data: None,
1275                };
1276                if subscriber.reject(error.clone()).is_err() {
1277                    log::error!("Failed to reject subscriber for invalid pubkey format.");
1278                }
1279                return;
1280            }
1281        };
1282
1283        let config = config.unwrap_or(RpcAccountSubscribeConfig {
1284            commitment: None,
1285            encoding: None,
1286        });
1287
1288        let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst);
1289        let sub_id = SubscriptionId::Number(id as u64);
1290        let sink = match subscriber.assign_id(sub_id.clone()) {
1291            Ok(sink) => sink,
1292            Err(e) => {
1293                log::error!("Failed to assign subscription ID: {:?}", e);
1294                return;
1295            }
1296        };
1297
1298        let account_active = Arc::clone(&self.account_subscription_map);
1299        let meta = meta.clone();
1300        let svm_locker = match meta.get_svm_locker() {
1301            Ok(locker) => locker,
1302            Err(e) => {
1303                log::error!("Failed to get SVM locker for account subscription: {e}");
1304                if let Err(e) = sink.notify(Err(e.into())) {
1305                    log::error!(
1306                        "Failed to send error notification to client for SVM locker failure: {e}"
1307                    );
1308                }
1309                return;
1310            }
1311        };
1312        let slot = svm_locker.with_svm_reader(|svm| svm.get_latest_absolute_slot());
1313
1314        self.tokio_handle.spawn(async move {
1315            if let Ok(mut guard) = account_active.write() {
1316                guard.insert(sub_id.clone(), sink);
1317            } else {
1318                log::error!("Failed to acquire write lock on account_subscription_map");
1319                return;
1320            }
1321
1322            // subscribe to account updates
1323            let rx = svm_locker.subscribe_for_account_updates(&pubkey, config.encoding);
1324
1325            loop {
1326                // if the subscription has been removed, break the loop
1327                if let Ok(guard) = account_active.read() {
1328                    if guard.get(&sub_id).is_none() {
1329                        break;
1330                    }
1331                } else {
1332                    log::error!("Failed to acquire read lock on account_subscription_map");
1333                    break;
1334                }
1335
1336                let Ok(ui_account) = rx.try_recv() else {
1337                    tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1338                    continue;
1339                };
1340
1341                let Ok(guard) = account_active.read() else {
1342                    log::error!("Failed to acquire read lock on account_subscription_map");
1343                    break;
1344                };
1345
1346                let Some(sink) = guard.get(&sub_id) else {
1347                    log::error!("Failed to get sink for subscription ID");
1348                    break;
1349                };
1350
1351                if let Err(e) = sink.notify(Ok(RpcResponse {
1352                    context: RpcResponseContext::new(slot),
1353                    value: ui_account,
1354                })) {
1355                    log::error!("Failed to notify client about account update: {e}");
1356                    break;
1357                }
1358            }
1359        });
1360    }
1361
1362    /// Implementation of account unsubscription for WebSocket clients.
1363    ///
1364    /// This method removes an active account subscription from the internal
1365    /// tracking maps, effectively stopping further notifications for that subscription.
1366    /// The monitoring loop in the corresponding subscription task will detect this
1367    /// removal and automatically terminate.
1368    ///
1369    /// # Implementation Details
1370    /// - Attempts to remove the subscription from the account subscriptions map
1371    /// - Returns success if the subscription existed and was removed
1372    /// - Returns an error if the subscription ID was not found
1373    /// - The removal triggers automatic cleanup of the monitoring task
1374    ///
1375    /// # Thread Safety
1376    /// Uses write locks to ensure thread-safe removal from the subscription map.
1377    /// The monitoring task uses read locks to check subscription status, creating
1378    /// a clean synchronization pattern.
1379    fn account_unsubscribe(
1380        &self,
1381        _meta: Option<Self::Metadata>,
1382        subscription: SubscriptionId,
1383    ) -> Result<bool> {
1384        if let Ok(mut guard) = self.account_subscription_map.write() {
1385            guard.remove(&subscription)
1386        } else {
1387            log::error!("Failed to acquire write lock on account_subscription_map");
1388            return Err(Error {
1389                code: ErrorCode::InternalError,
1390                message: "Internal error.".into(),
1391                data: None,
1392            });
1393        };
1394        Ok(true)
1395    }
1396
1397    fn slot_subscribe(&self, meta: Self::Metadata, subscriber: Subscriber<SlotInfo>) {
1398        let _ = meta
1399            .as_ref()
1400            .map(|m| m.log_debug("Websocket 'slot_subscribe' connection established"));
1401
1402        let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst);
1403        let sub_id = SubscriptionId::Number(id as u64);
1404        let sink = match subscriber.assign_id(sub_id.clone()) {
1405            Ok(sink) => sink,
1406            Err(e) => {
1407                log::error!("Failed to assign subscription ID: {:?}", e);
1408                return;
1409            }
1410        };
1411
1412        let slot_active = Arc::clone(&self.slot_subscription_map);
1413        let meta = meta.clone();
1414
1415        let svm_locker = match meta.get_svm_locker() {
1416            Ok(locker) => locker,
1417            Err(e) => {
1418                log::error!("Failed to get SVM locker for slot subscription: {e}");
1419                if let Err(e) = sink.notify(Err(e.into())) {
1420                    log::error!(
1421                        "Failed to send error notification to client for SVM locker failure: {e}"
1422                    );
1423                }
1424                return;
1425            }
1426        };
1427
1428        self.tokio_handle.spawn(async move {
1429            if let Ok(mut guard) = slot_active.write() {
1430                guard.insert(sub_id.clone(), sink);
1431            } else {
1432                log::error!("Failed to acquire write lock on slot_subscription_map");
1433                return;
1434            }
1435
1436            let rx = svm_locker.subscribe_for_slot_updates();
1437
1438            loop {
1439                // if the subscription has been removed, break the loop
1440                if let Ok(guard) = slot_active.read() {
1441                    if guard.get(&sub_id).is_none() {
1442                        break;
1443                    }
1444                } else {
1445                    log::error!("Failed to acquire read lock on slot_subscription_map");
1446                    break;
1447                }
1448
1449                let Ok(slot_info) = rx.try_recv() else {
1450                    tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1451                    continue;
1452                };
1453
1454                let Ok(guard) = slot_active.read() else {
1455                    log::error!("Failed to acquire read lock on slots_subscription_map");
1456                    break;
1457                };
1458
1459                let Some(sink) = guard.get(&sub_id) else {
1460                    log::error!("Failed to get sink for subscription ID");
1461                    break;
1462                };
1463
1464                if let Err(e) = sink.notify(Ok(slot_info)) {
1465                    log::error!("Failed to notify client about slots update: {e}");
1466                    break;
1467                }
1468            }
1469        });
1470    }
1471
1472    fn slot_unsubscribe(
1473        &self,
1474        _meta: Option<Self::Metadata>,
1475        subscription: SubscriptionId,
1476    ) -> Result<bool> {
1477        if let Ok(mut guard) = self.slot_subscription_map.write() {
1478            guard.remove(&subscription)
1479        } else {
1480            log::error!("Failed to acquire write lock on slot_subscription_map");
1481            return Err(Error {
1482                code: ErrorCode::InternalError,
1483                message: "Internal error.".into(),
1484                data: None,
1485            });
1486        };
1487        Ok(true)
1488    }
1489
1490    fn logs_subscribe(
1491        &self,
1492        meta: Self::Metadata,
1493        subscriber: Subscriber<RpcResponse<RpcLogsResponse>>,
1494        mentions: Option<RpcTransactionLogsFilter>,
1495        commitment: Option<CommitmentConfig>,
1496    ) {
1497        let _ = meta
1498            .as_ref()
1499            .map(|m| m.log_debug("Websocket 'logs_subscribe' connection established"));
1500
1501        let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst);
1502        let sub_id = SubscriptionId::Number(id as u64);
1503        let sink = match subscriber.assign_id(sub_id.clone()) {
1504            Ok(sink) => sink,
1505            Err(e) => {
1506                log::error!("Failed to assign subscription ID: {:?}", e);
1507                return;
1508            }
1509        };
1510
1511        let mentions = mentions.unwrap_or(RpcTransactionLogsFilter::All);
1512        let commitment = commitment.unwrap_or_default().commitment;
1513
1514        let logs_active = Arc::clone(&self.logs_subscription_map);
1515        let meta = meta.clone();
1516
1517        let svm_locker = match meta.get_svm_locker() {
1518            Ok(locker) => locker,
1519            Err(e) => {
1520                log::error!("Failed to get SVM locker for slot subscription: {e}");
1521                if let Err(e) = sink.notify(Err(e.into())) {
1522                    log::error!(
1523                        "Failed to send error notification to client for SVM locker failure: {e}"
1524                    );
1525                }
1526                return;
1527            }
1528        };
1529
1530        self.tokio_handle.spawn(async move {
1531            if let Ok(mut guard) = logs_active.write() {
1532                guard.insert(sub_id.clone(), sink);
1533            } else {
1534                log::error!("Failed to acquire write lock on slot_subscription_map");
1535                return;
1536            }
1537
1538            let rx = svm_locker.subscribe_for_logs_updates(&commitment, &mentions);
1539
1540            loop {
1541                // if the subscription has been removed, break the loop
1542                if let Ok(guard) = logs_active.read() {
1543                    if guard.get(&sub_id).is_none() {
1544                        break;
1545                    }
1546                } else {
1547                    log::error!("Failed to acquire read lock on slot_subscription_map");
1548                    break;
1549                }
1550
1551                let Ok((slot, value)) = rx.try_recv() else {
1552                    tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1553                    continue;
1554                };
1555
1556                let Ok(guard) = logs_active.read() else {
1557                    log::error!("Failed to acquire read lock on logs_subscription_map");
1558                    break;
1559                };
1560
1561                let Some(sink) = guard.get(&sub_id) else {
1562                    log::error!("Failed to get sink for subscription ID");
1563                    break;
1564                };
1565
1566                if let Err(e) = sink.notify(Ok(RpcResponse {
1567                    context: RpcResponseContext::new(slot),
1568                    value,
1569                })) {
1570                    log::error!("Failed to notify client about logs update: {e}");
1571                    break;
1572                }
1573            }
1574        });
1575    }
1576
1577    fn logs_unsubscribe(
1578        &self,
1579        _meta: Option<Self::Metadata>,
1580        subscription: SubscriptionId,
1581    ) -> Result<bool> {
1582        if let Ok(mut guard) = self.logs_subscription_map.write() {
1583            guard.remove(&subscription);
1584        } else {
1585            log::error!("Failed to acquire write lock on logs_subscription_map");
1586            return Err(Error {
1587                code: ErrorCode::InternalError,
1588                message: "Internal error.".into(),
1589                data: None,
1590            });
1591        };
1592        Ok(true)
1593    }
1594
1595    fn root_subscribe(&self, meta: Self::Metadata, _subscriber: Subscriber<RpcResponse<()>>) {
1596        let _ = meta
1597            .as_ref()
1598            .map(|m| m.log_warn("Websocket method 'root_subscribe' is uninmplemented"));
1599    }
1600
1601    fn root_unsubscribe(
1602        &self,
1603        _meta: Option<Self::Metadata>,
1604        _subscription: SubscriptionId,
1605    ) -> Result<bool> {
1606        Ok(true)
1607    }
1608
1609    /// Implementation of program subscription for WebSocket clients.
1610    ///
1611    /// This method handles the complete lifecycle of program subscriptions:
1612    /// 1. Validates the provided program public key string format
1613    /// 2. Parses the subscription configuration (commitment, encoding, and filters)
1614    /// 3. Generates a unique subscription ID and assigns it to the subscriber
1615    /// 4. Spawns an async task to continuously monitor account changes for the program
1616    /// 5. Sends notifications whenever an account owned by the program changes and matches filters
1617    ///
1618    /// # Monitoring Loop
1619    /// The spawned task runs a continuous loop that:
1620    /// - Checks if the subscription is still active (not unsubscribed)
1621    /// - Polls for program account updates from the SVM
1622    /// - Applies configured filters (dataSize, memcmp) before notifying
1623    /// - Sends `RpcKeyedAccount` notifications (including account pubkey) to the subscriber
1624    /// - Automatically terminates when the subscription is removed
1625    ///
1626    /// # Error Handling
1627    /// - Rejects subscription with `InvalidParams` for malformed public keys
1628    /// - Handles encoding configuration for account data serialization
1629    /// - Manages subscription cleanup through the monitoring loop
1630    ///
1631    /// # Performance
1632    /// Uses efficient polling with minimal CPU overhead and automatic
1633    /// cleanup when subscriptions are no longer needed.
1634    fn program_subscribe(
1635        &self,
1636        meta: Self::Metadata,
1637        subscriber: Subscriber<RpcResponse<RpcKeyedAccount>>,
1638        pubkey_str: String,
1639        config: Option<RpcProgramSubscribeConfig>,
1640    ) {
1641        let _ = meta
1642            .as_ref()
1643            .map(|m| m.log_debug("Websocket 'program_subscribe' connection established"));
1644
1645        let program_id = match Pubkey::from_str(&pubkey_str) {
1646            Ok(pk) => pk,
1647            Err(_) => {
1648                let error = Error {
1649                    code: ErrorCode::InvalidParams,
1650                    message: "Invalid pubkey format.".into(),
1651                    data: None,
1652                };
1653                if subscriber.reject(error.clone()).is_err() {
1654                    log::error!("Failed to reject subscriber for invalid pubkey format.");
1655                }
1656                return;
1657            }
1658        };
1659
1660        let config = config.unwrap_or_default();
1661
1662        let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst);
1663        let sub_id = SubscriptionId::Number(id as u64);
1664        let sink = match subscriber.assign_id(sub_id.clone()) {
1665            Ok(sink) => sink,
1666            Err(e) => {
1667                log::error!("Failed to assign subscription ID: {:?}", e);
1668                return;
1669            }
1670        };
1671
1672        let program_active = Arc::clone(&self.program_subscription_map);
1673        let meta = meta.clone();
1674        let svm_locker = match meta.get_svm_locker() {
1675            Ok(locker) => locker,
1676            Err(e) => {
1677                log::error!("Failed to get SVM locker for program subscription: {e}");
1678                if let Err(e) = sink.notify(Err(e.into())) {
1679                    log::error!(
1680                        "Failed to send error notification to client for SVM locker failure: {e}"
1681                    );
1682                }
1683                return;
1684            }
1685        };
1686        let slot = svm_locker.with_svm_reader(|svm| svm.get_latest_absolute_slot());
1687
1688        self.tokio_handle.spawn(async move {
1689            if let Ok(mut guard) = program_active.write() {
1690                guard.insert(sub_id.clone(), sink);
1691            } else {
1692                log::error!("Failed to acquire write lock on program_subscription_map");
1693                return;
1694            }
1695
1696            let rx = svm_locker.subscribe_for_program_updates(
1697                &program_id,
1698                config.encoding,
1699                config.filters,
1700            );
1701
1702            loop {
1703                // if the subscription has been removed, break the loop
1704                if let Ok(guard) = program_active.read() {
1705                    if guard.get(&sub_id).is_none() {
1706                        break;
1707                    }
1708                } else {
1709                    log::error!("Failed to acquire read lock on program_subscription_map");
1710                    break;
1711                }
1712
1713                let Ok(keyed_account) = rx.try_recv() else {
1714                    tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1715                    continue;
1716                };
1717
1718                let Ok(guard) = program_active.read() else {
1719                    log::error!("Failed to acquire read lock on program_subscription_map");
1720                    break;
1721                };
1722
1723                let Some(sink) = guard.get(&sub_id) else {
1724                    log::error!("Failed to get sink for subscription ID");
1725                    break;
1726                };
1727
1728                if let Err(e) = sink.notify(Ok(RpcResponse {
1729                    context: RpcResponseContext::new(slot),
1730                    value: keyed_account,
1731                })) {
1732                    log::error!("Failed to notify client about program account update: {e}");
1733                    break;
1734                }
1735            }
1736        });
1737    }
1738
1739    /// Implementation of program unsubscription for WebSocket clients.
1740    ///
1741    /// This method removes an active program subscription from the internal
1742    /// tracking maps, effectively stopping further notifications for that subscription.
1743    /// The monitoring loop in the corresponding subscription task will detect this
1744    /// removal and automatically terminate.
1745    ///
1746    /// # Implementation Details
1747    /// - Attempts to remove the subscription from the program subscriptions map
1748    /// - Returns success if the subscription existed and was removed
1749    /// - Returns an error if the lock could not be acquired
1750    /// - The removal triggers automatic cleanup of the monitoring task
1751    ///
1752    /// # Thread Safety
1753    /// Uses write locks to ensure thread-safe removal from the subscription map.
1754    /// The monitoring task uses read locks to check subscription status, creating
1755    /// a clean synchronization pattern.
1756    fn program_unsubscribe(
1757        &self,
1758        _meta: Option<Self::Metadata>,
1759        subscription: SubscriptionId,
1760    ) -> Result<bool> {
1761        if let Ok(mut guard) = self.program_subscription_map.write() {
1762            guard.remove(&subscription);
1763        } else {
1764            log::error!("Failed to acquire write lock on program_subscription_map");
1765            return Err(Error {
1766                code: ErrorCode::InternalError,
1767                message: "Internal error.".into(),
1768                data: None,
1769            });
1770        };
1771        Ok(true)
1772    }
1773
1774    fn slots_updates_subscribe(
1775        &self,
1776        meta: Self::Metadata,
1777        _subscriber: Subscriber<RpcResponse<()>>,
1778    ) {
1779        let _ = meta
1780            .as_ref()
1781            .map(|m| m.log_warn("Websocket method 'slots_updates_subscribe' is uninmplemented"));
1782    }
1783
1784    fn slots_updates_unsubscribe(
1785        &self,
1786        _meta: Option<Self::Metadata>,
1787        _subscription: SubscriptionId,
1788    ) -> Result<bool> {
1789        Ok(true)
1790    }
1791
1792    fn block_subscribe(&self, meta: Self::Metadata, _subscriber: Subscriber<RpcResponse<()>>) {
1793        let _ = meta
1794            .as_ref()
1795            .map(|m| m.log_warn("Websocket method 'block_subscribe' is uninmplemented"));
1796    }
1797
1798    fn block_unsubscribe(
1799        &self,
1800        _meta: Option<Self::Metadata>,
1801        _subscription: SubscriptionId,
1802    ) -> Result<bool> {
1803        Ok(true)
1804    }
1805
1806    fn vote_subscribe(&self, meta: Self::Metadata, _subscriber: Subscriber<RpcResponse<()>>) {
1807        let _ = meta
1808            .as_ref()
1809            .map(|m| m.log_warn("Websocket method 'vote_subscribe' is uninmplemented"));
1810    }
1811
1812    fn vote_unsubscribe(
1813        &self,
1814        _meta: Option<Self::Metadata>,
1815        _subscription: SubscriptionId,
1816    ) -> Result<bool> {
1817        Ok(true)
1818    }
1819
1820    fn snapshot_subscribe(
1821        &self,
1822        meta: Self::Metadata,
1823        subscriber: Subscriber<crate::surfnet::SnapshotImportNotification>,
1824        snapshot_url: String,
1825    ) {
1826        let _ = meta
1827            .as_ref()
1828            .map(|m| m.log_debug("Websocket 'snapshot_subscribe' connection established"));
1829
1830        // Validate snapshot URL format
1831        if snapshot_url.is_empty() {
1832            let error = Error {
1833                code: ErrorCode::InvalidParams,
1834                message: "Invalid snapshot URL: URL cannot be empty.".into(),
1835                data: None,
1836            };
1837            if let Err(e) = subscriber.reject(error.clone()) {
1838                log::error!("Failed to reject subscriber: {:?}", e);
1839            }
1840            return;
1841        }
1842
1843        let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst);
1844        let sub_id = SubscriptionId::Number(id as u64);
1845        let sink = match subscriber.assign_id(sub_id.clone()) {
1846            Ok(sink) => sink,
1847            Err(e) => {
1848                log::error!("Failed to assign subscription ID: {:?}", e);
1849                return;
1850            }
1851        };
1852
1853        let snapshot_active = Arc::clone(&self.snapshot_subscription_map);
1854        let meta = meta.clone();
1855
1856        let svm_locker = match meta.get_svm_locker() {
1857            Ok(locker) => locker,
1858            Err(e) => {
1859                log::error!("Failed to get SVM locker for snapshot subscription: {e}");
1860                if let Err(e) = sink.notify(Err(e.into())) {
1861                    log::error!(
1862                        "Failed to send error notification to client for SVM locker failure: {e}"
1863                    );
1864                }
1865                return;
1866            }
1867        };
1868
1869        self.tokio_handle.spawn(async move {
1870            if let Ok(mut guard) = snapshot_active.write() {
1871                guard.insert(sub_id.clone(), sink);
1872            } else {
1873                log::error!("Failed to acquire write lock on snapshot_subscription_map");
1874                return;
1875            }
1876
1877            // Generate a unique snapshot ID for this import operation
1878            let snapshot_id = format!(
1879                "snapshot_{}",
1880                chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0)
1881            );
1882
1883            // Subscribe to snapshot import updates
1884            // The locker will send the Started notification through the channel
1885            let rx = svm_locker.subscribe_for_snapshot_import_updates(&snapshot_url, &snapshot_id);
1886
1887            loop {
1888                // if the subscription has been removed, break the loop
1889                if let Ok(guard) = snapshot_active.read() {
1890                    if guard.get(&sub_id).is_none() {
1891                        break;
1892                    }
1893                } else {
1894                    log::error!("Failed to acquire read lock on snapshot_subscription_map");
1895                    break;
1896                }
1897
1898                let Ok(notification) = rx.try_recv() else {
1899                    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1900                    continue;
1901                };
1902
1903                let Ok(guard) = snapshot_active.read() else {
1904                    log::error!("Failed to acquire read lock on snapshot_subscription_map");
1905                    break;
1906                };
1907
1908                let Some(sink) = guard.get(&sub_id) else {
1909                    log::error!("Failed to get sink for subscription ID");
1910                    break;
1911                };
1912
1913                if let Err(e) = sink.notify(Ok(notification)) {
1914                    log::error!("Failed to notify client about snapshot import update: {e}");
1915                    break;
1916                }
1917
1918                // If the import is completed or failed, we can end the subscription
1919                if let Ok(guard) = snapshot_active.read() {
1920                    if let Some(_sink) = guard.get(&sub_id) {
1921                        // Check if this was the final notification
1922                        // We'll determine this by checking the status in the last notification
1923                        // For now, we'll keep the subscription alive in case of multiple imports
1924                    }
1925                }
1926            }
1927        });
1928    }
1929
1930    fn snapshot_unsubscribe(
1931        &self,
1932        _meta: Option<Self::Metadata>,
1933        subscription: SubscriptionId,
1934    ) -> Result<bool> {
1935        if let Ok(mut guard) = self.snapshot_subscription_map.write() {
1936            guard.remove(&subscription);
1937        } else {
1938            log::error!("Failed to acquire write lock on snapshot_subscription_map");
1939            return Err(Error {
1940                code: ErrorCode::InternalError,
1941                message: "Internal error.".into(),
1942                data: None,
1943            });
1944        };
1945        Ok(true)
1946    }
1947}