surfpool_core/rpc/
ws.rs

1use std::{
2    collections::HashMap,
3    str::FromStr,
4    sync::{atomic, Arc, RwLock},
5};
6
7use jsonrpc_core::{Error, ErrorCode, Result};
8use jsonrpc_derive::rpc;
9use jsonrpc_pubsub::{
10    typed::{Sink, Subscriber},
11    SubscriptionId,
12};
13use solana_account_decoder::{UiAccount, UiAccountEncoding};
14use solana_client::{
15    rpc_config::RpcSignatureSubscribeConfig,
16    rpc_response::{
17        ProcessedSignatureResult, ReceivedSignatureResult, RpcResponseContext, RpcSignatureResult,
18    },
19};
20use solana_commitment_config::{CommitmentConfig, CommitmentLevel};
21use solana_pubkey::Pubkey;
22use solana_rpc_client_api::response::{Response as RpcResponse, SlotInfo};
23use solana_signature::Signature;
24use solana_transaction_status::TransactionConfirmationStatus;
25
26use super::{State, SurfnetRpcContext, SurfpoolWebsocketMeta};
27use crate::surfnet::{locker::SvmAccessContext, GetTransactionResult, SignatureSubscriptionType};
28
29/// Configuration for account subscription requests.
30///
31/// This struct defines the parameters that clients can specify when subscribing
32/// to account change notifications through WebSocket connections. It allows customization
33/// of both the commitment level for updates and the encoding format for account data.
34///
35/// ## Fields
36/// - `commitment`: Optional commitment configuration specifying when to send notifications
37///   (processed, confirmed, or finalized). Defaults to the node's default commitment level.
38/// - `encoding`: Optional encoding format for account data serialization (base58, base64, jsonParsed, etc.).
39///   Defaults to base58 encoding if not specified.
40///
41/// ## Usage
42/// Clients can provide this configuration to customize their subscription behavior:
43/// - Set commitment level to control notification timing based on confirmation status
44/// - Set encoding to specify the preferred format for receiving account data
45///
46/// ## Example Usage
47/// ```json
48/// {
49///   "commitment": "confirmed",
50///   "encoding": "base64"
51/// }
52/// ```
53#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
54#[serde(rename_all = "camelCase")]
55pub struct RpcAccountSubscribeConfig {
56    #[serde(flatten)]
57    pub commitment: Option<CommitmentConfig>,
58    pub encoding: Option<UiAccountEncoding>,
59}
60
61#[rpc]
62pub trait Rpc {
63    type Metadata;
64
65    /// Subscribe to signature status notifications via WebSocket.
66    ///
67    /// This method allows clients to subscribe to status updates for a specific transaction signature.
68    /// The subscriber will receive notifications when the transaction reaches the desired confirmation level
69    /// or when it's initially received by the network (if configured).
70    ///
71    /// ## Parameters
72    /// - `meta`: WebSocket metadata containing RPC context and connection information.
73    /// - `subscriber`: The subscription sink for sending signature status notifications to the client.
74    /// - `signature_str`: The transaction signature to monitor, as a base-58 encoded string.
75    /// - `config`: Optional configuration specifying commitment level and notification preferences.
76    ///
77    /// ## Returns
78    /// This method does not return a value directly. Instead, it establishes a WebSocket subscription
79    /// that will send `RpcResponse<RpcSignatureResult>` notifications to the subscriber when the
80    /// transaction status changes.
81    ///
82    /// ## Example WebSocket Request
83    /// ```json
84    /// {
85    ///   "jsonrpc": "2.0",
86    ///   "id": 1,
87    ///   "method": "signatureSubscribe",
88    ///   "params": [
89    ///     "2id3YC2jK9G5Wo2phDx4gJVAew8DcY5NAojnVuao8rkxwPYPe8cSwE5GzhEgJA2y8fVjDEo6iR6ykBvDxrTQrtpb",
90    ///     {
91    ///       "commitment": "finalized",
92    ///       "enableReceivedNotification": false
93    ///     }
94    ///   ]
95    /// }
96    /// ```
97    ///
98    /// ## Example WebSocket Response (Subscription Confirmation)
99    /// ```json
100    /// {
101    ///   "jsonrpc": "2.0",
102    ///   "result": 0,
103    ///   "id": 1
104    /// }
105    /// ```
106    ///
107    /// ## Example WebSocket Notification
108    /// ```json
109    /// {
110    ///   "jsonrpc": "2.0",
111    ///   "method": "signatureNotification",
112    ///   "params": {
113    ///     "result": {
114    ///       "context": {
115    ///         "slot": 5207624
116    ///       },
117    ///       "value": {
118    ///         "err": null
119    ///       }
120    ///     },
121    ///     "subscription": 0
122    ///   }
123    /// }
124    /// ```
125    ///
126    /// ## Notes
127    /// - If the transaction already exists with the desired confirmation status, the subscriber
128    ///   will be notified immediately and the subscription will complete.
129    /// - The subscription automatically terminates after sending the first matching notification.
130    /// - Invalid signature formats will cause the subscription to be rejected with an error.
131    /// - Each subscription runs in its own async task for optimal performance.
132    ///
133    /// ## See Also
134    /// - `signatureUnsubscribe`: Remove an active signature subscription
135    /// - `getSignatureStatuses`: Get current status of multiple signatures
136    #[pubsub(
137        subscription = "signatureNotification",
138        subscribe,
139        name = "signatureSubscribe"
140    )]
141    fn signature_subscribe(
142        &self,
143        meta: Self::Metadata,
144        subscriber: Subscriber<RpcResponse<RpcSignatureResult>>,
145        signature_str: String,
146        config: Option<RpcSignatureSubscribeConfig>,
147    );
148
149    /// Unsubscribe from signature status notifications.
150    ///
151    /// This method removes an active signature subscription, stopping further notifications
152    /// for the specified subscription ID.
153    ///
154    /// ## Parameters
155    /// - `meta`: Optional WebSocket metadata containing connection information.
156    /// - `subscription`: The subscription ID to remove, as returned by `signatureSubscribe`.
157    ///
158    /// ## Returns
159    /// A `Result<bool>` indicating whether the unsubscription was successful:
160    /// - `Ok(true)` if the subscription was successfully removed
161    /// - `Err(Error)` with `InvalidParams` if the subscription ID doesn't exist
162    ///
163    /// ## Example WebSocket Request
164    /// ```json
165    /// {
166    ///   "jsonrpc": "2.0",
167    ///   "id": 1,
168    ///   "method": "signatureUnsubscribe",
169    ///   "params": [0]
170    /// }
171    /// ```
172    ///
173    /// ## Example WebSocket Response
174    /// ```json
175    /// {
176    ///   "jsonrpc": "2.0",
177    ///   "result": true,
178    ///   "id": 1
179    /// }
180    /// ```
181    ///
182    /// ## Notes
183    /// - Attempting to unsubscribe from a non-existent subscription will return an error.
184    /// - Successfully unsubscribed connections will no longer receive notifications.
185    /// - This method is thread-safe and can be called concurrently.
186    ///
187    /// ## See Also
188    /// - `signatureSubscribe`: Create a signature status subscription
189    #[pubsub(
190        subscription = "signatureNotification",
191        unsubscribe,
192        name = "signatureUnsubscribe"
193    )]
194    fn signature_unsubscribe(
195        &self,
196        meta: Option<Self::Metadata>,
197        subscription: SubscriptionId,
198    ) -> Result<bool>;
199
200    /// Subscribe to account change notifications via WebSocket.
201    ///
202    /// This method allows clients to subscribe to updates for a specific account.
203    /// The subscriber will receive notifications whenever the account's data, lamports balance,
204    /// ownership, or other properties change.
205    ///
206    /// ## Parameters
207    /// - `meta`: WebSocket metadata containing RPC context and connection information.
208    /// - `subscriber`: The subscription sink for sending account update notifications to the client.
209    /// - `pubkey_str`: The account public key to monitor, as a base-58 encoded string.
210    /// - `config`: Optional configuration specifying commitment level and encoding format for account data.
211    ///
212    /// ## Returns
213    /// This method does not return a value directly. Instead, it establishes a continuous WebSocket
214    /// subscription that will send `RpcResponse<UiAccount>` notifications to the subscriber whenever
215    /// the account state changes.
216    ///
217    /// ## Example WebSocket Request
218    /// ```json
219    /// {
220    ///   "jsonrpc": "2.0",
221    ///   "id": 1,
222    ///   "method": "accountSubscribe",
223    ///   "params": [
224    ///     "CM78CPUeXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNH12",
225    ///     {
226    ///       "commitment": "finalized",
227    ///       "encoding": "base64"
228    ///     }
229    ///   ]
230    /// }
231    /// ```
232    ///
233    /// ## Example WebSocket Response (Subscription Confirmation)
234    /// ```json
235    /// {
236    ///   "jsonrpc": "2.0",
237    ///   "result": 23784,
238    ///   "id": 1
239    /// }
240    /// ```
241    ///
242    /// ## Example WebSocket Notification
243    /// ```json
244    /// {
245    ///   "jsonrpc": "2.0",
246    ///   "method": "accountNotification",
247    ///   "params": {
248    ///     "result": {
249    ///       "context": {
250    ///         "slot": 5208469
251    ///       },
252    ///       "value": {
253    ///         "data": ["base64EncodedAccountData", "base64"],
254    ///         "executable": false,
255    ///         "lamports": 33594,
256    ///         "owner": "11111111111111111111111111111112",
257    ///         "rentEpoch": 636
258    ///       }
259    ///     },
260    ///     "subscription": 23784
261    ///   }
262    /// }
263    /// ```
264    ///
265    /// ## Notes
266    /// - The subscription remains active until explicitly unsubscribed or the connection is closed.
267    /// - Account notifications are sent whenever any aspect of the account changes.
268    /// - The encoding format specified in the config determines how account data is serialized.
269    /// - Invalid public key formats will cause the subscription to be rejected with an error.
270    /// - Each subscription runs in its own async task to ensure optimal performance.
271    ///
272    /// ## See Also
273    /// - `accountUnsubscribe`: Remove an active account subscription
274    /// - `getAccountInfo`: Get current account information
275    #[pubsub(
276        subscription = "accountNotification",
277        subscribe,
278        name = "accountSubscribe"
279    )]
280    fn account_subscribe(
281        &self,
282        meta: Self::Metadata,
283        subscriber: Subscriber<RpcResponse<UiAccount>>,
284        pubkey_str: String,
285        config: Option<RpcAccountSubscribeConfig>,
286    );
287
288    /// Unsubscribe from account change notifications.
289    ///
290    /// This method removes an active account subscription, stopping further notifications
291    /// for the specified subscription ID. The monitoring task will automatically terminate
292    /// when the subscription is removed.
293    ///
294    /// ## Parameters
295    /// - `meta`: Optional WebSocket metadata containing connection information.
296    /// - `subscription`: The subscription ID to remove, as returned by `accountSubscribe`.
297    ///
298    /// ## Returns
299    /// A `Result<bool>` indicating whether the unsubscription was successful:
300    /// - `Ok(true)` if the subscription was successfully removed
301    /// - `Err(Error)` with `InvalidParams` if the subscription ID doesn't exist
302    ///
303    /// ## Example WebSocket Request
304    /// ```json
305    /// {
306    ///   "jsonrpc": "2.0",
307    ///   "id": 1,
308    ///   "method": "accountUnsubscribe",
309    ///   "params": [23784]
310    /// }
311    /// ```
312    ///
313    /// ## Example WebSocket Response
314    /// ```json
315    /// {
316    ///   "jsonrpc": "2.0",
317    ///   "result": true,
318    ///   "id": 1
319    /// }
320    /// ```
321    ///
322    /// ## Notes
323    /// - Attempting to unsubscribe from a non-existent subscription will return an error.
324    /// - Successfully unsubscribed connections will no longer receive account notifications.
325    /// - The monitoring task automatically detects subscription removal and terminates gracefully.
326    /// - This method is thread-safe and can be called concurrently.
327    ///
328    /// ## See Also
329    /// - `accountSubscribe`: Create an account change subscription
330    #[pubsub(
331        subscription = "accountNotification",
332        unsubscribe,
333        name = "accountUnsubscribe"
334    )]
335    fn account_unsubscribe(
336        &self,
337        meta: Option<Self::Metadata>,
338        subscription: SubscriptionId,
339    ) -> Result<bool>;
340
341    /// Subscribe to slot notifications.
342    ///
343    /// This method allows clients to subscribe to updates for a specific slot.
344    /// The subscriber will receive notifications whenever the slot changes.
345    ///
346    /// ## Parameters
347    /// - `meta`: WebSocket metadata containing RPC context and connection information.
348    /// - `subscriber`: The subscription sink for sending slot update notifications to the client.
349    ///
350    /// ## Returns
351    /// This method does not return a value directly. Instead, it establishes a continuous WebSocket
352    /// subscription that will send `SlotInfo` notifications to the subscriber whenever
353    /// the slot changes.
354    ///
355    /// ## Example WebSocket Request
356    /// ```json
357    /// {
358    ///   "jsonrpc": "2.0",
359    ///   "id": 1,
360    ///   "method": "slotSubscribe",
361    ///   "params": [
362    ///     {
363    ///       "commitment": "finalized"
364    ///     }
365    ///   ]
366    /// }
367    /// ```
368    ///
369    /// ## Example WebSocket Response (Subscription Confirmation)
370    /// ```json
371    /// {
372    ///   "jsonrpc": "2.0",
373    ///   "result": 5207624,
374    ///   "id": 1
375    /// }
376    /// ```
377    ///
378    /// ## Example WebSocket Notification
379    /// ```json
380    /// {
381    ///   "jsonrpc": "2.0",
382    ///   "method": "slotNotification",
383    ///   "params": {
384    ///     "result": {
385    ///       "slot": 5207624
386    ///     },
387    ///     "subscription": 5207624
388    ///   }
389    /// }
390    /// ```
391    ///
392    /// ## Notes
393    /// - The subscription remains active until explicitly unsubscribed or the connection is closed.
394    /// - Slot notifications are sent whenever the slot changes.
395    /// - The subscription automatically terminates when the slot changes.
396    /// - Each subscription runs in its own async task for optimal performance.
397    ///
398    /// ## See Also
399    /// - `slotUnsubscribe`: Remove an active slot subscription
400    #[pubsub(subscription = "slotNotification", subscribe, name = "slotSubscribe")]
401    fn slot_subscribe(&self, meta: Self::Metadata, subscriber: Subscriber<SlotInfo>);
402
403    /// Unsubscribe from slot notifications.
404    ///
405    /// This method removes an active slot subscription, stopping further notifications
406    /// for the specified subscription ID.
407    ///
408    /// ## Parameters
409    /// - `meta`: Optional WebSocket metadata containing connection information.
410    /// - `subscription`: The subscription ID to remove, as returned by `slotSubscribe`.
411    ///
412    /// ## Returns
413    /// A `Result<bool>` indicating whether the unsubscription was successful:
414    /// - `Ok(true)` if the subscription was successfully removed
415    /// - `Err(Error)` with `InvalidParams` if the subscription ID doesn't exist
416    ///
417    /// ## Example WebSocket Request
418    /// ```json
419    /// {
420    ///   "jsonrpc": "2.0",
421    ///   "id": 1,
422    ///   "method": "slotUnsubscribe",
423    ///   "params": [0]
424    /// }
425    /// ```
426    ///
427    /// ## Example WebSocket Response
428    /// ```json
429    /// {
430    ///   "jsonrpc": "2.0",
431    ///   "result": true,
432    ///   "id": 1
433    /// }
434    /// ```
435    ///
436    /// ## Notes
437    /// - Attempting to unsubscribe from a non-existent subscription will return an error.
438    /// - Successfully unsubscribed connections will no longer receive notifications.
439    /// - This method is thread-safe and can be called concurrently.
440    ///
441    /// ## See Also
442    /// - `slotSubscribe`: Create a slot subscription
443    #[pubsub(
444        subscription = "slotNotification",
445        unsubscribe,
446        name = "slotUnsubscribe"
447    )]
448    fn slot_unsubscribe(
449        &self,
450        meta: Option<Self::Metadata>,
451        subscription: SubscriptionId,
452    ) -> Result<bool>;
453}
454
455/// WebSocket RPC server implementation for Surfpool.
456///
457/// This struct manages WebSocket subscriptions for both signature status updates
458/// and account change notifications in the Surfpool environment. It provides a complete
459/// WebSocket RPC interface that allows clients to subscribe to real-time updates
460/// from the Solana Virtual Machine (SVM) and handles the lifecycle of WebSocket connections.
461///
462/// ## Fields
463/// - `uid`: Atomic counter for generating unique subscription IDs across all subscription types.
464/// - `signature_subscription_map`: Thread-safe HashMap containing active signature subscriptions, mapping subscription IDs to their notification sinks.
465/// - `account_subscription_map`: Thread-safe HashMap containing active account subscriptions, mapping subscription IDs to their notification sinks.
466/// - `slot_subscription_map`: Thread-safe HashMap containing active slot subscriptions, mapping subscription IDs to their notification sinks.
467/// - `tokio_handle`: Runtime handle for spawning asynchronous subscription monitoring tasks.
468///
469/// ## Features
470/// - **Concurrent Subscriptions**: Supports multiple simultaneous subscriptions without blocking.
471/// - **Thread Safety**: All subscription management operations are thread-safe using RwLock.
472/// - **Automatic Cleanup**: Subscriptions are automatically cleaned up when completed or unsubscribed.
473/// - **Efficient Monitoring**: Each subscription runs in its own async task for optimal performance.
474/// - **Real-time Updates**: Provides immediate notifications when monitored conditions are met.
475///
476/// ## Usage
477/// This struct implements the `Rpc` trait and is typically used as part of a larger
478/// WebSocket server infrastructure to provide real-time blockchain data to clients.
479///
480/// ## Notes
481/// - Each subscription is assigned a unique numeric ID for tracking and management.
482/// - The struct maintains separate maps for different subscription types to optimize performance.
483/// - All async operations are managed through the provided Tokio runtime handle.
484///
485/// ## See Also
486/// - `Rpc`: The trait interface this struct implements
487/// - `RpcAccountSubscribeConfig`: Configuration options for account subscriptions
488pub struct SurfpoolWsRpc {
489    pub uid: atomic::AtomicUsize,
490    pub signature_subscription_map:
491        Arc<RwLock<HashMap<SubscriptionId, Sink<RpcResponse<RpcSignatureResult>>>>>,
492    pub account_subscription_map:
493        Arc<RwLock<HashMap<SubscriptionId, Sink<RpcResponse<UiAccount>>>>>,
494    pub slot_subscription_map: Arc<RwLock<HashMap<SubscriptionId, Sink<SlotInfo>>>>,
495    pub tokio_handle: tokio::runtime::Handle,
496}
497
498impl Rpc for SurfpoolWsRpc {
499    type Metadata = Option<SurfpoolWebsocketMeta>;
500
501    /// Implementation of signature subscription for WebSocket clients.
502    ///
503    /// This method handles the complete lifecycle of signature subscriptions:
504    /// 1. Validates the provided signature string format
505    /// 2. Determines the subscription type (received vs commitment-based)
506    /// 3. Checks if the transaction already exists in the desired state
507    /// 4. If found and confirmed, immediately notifies the subscriber
508    /// 5. Otherwise, sets up a continuous monitoring loop
509    /// 6. Spawns an async task to handle ongoing subscription management
510    ///
511    /// # Error Handling
512    /// - Rejects subscription with `InvalidParams` for malformed signatures
513    /// - Handles RPC context retrieval failures
514    /// - Manages subscription cleanup on completion or failure
515    ///
516    /// # Concurrency
517    /// Each subscription runs in its own async task, allowing multiple
518    /// concurrent subscriptions without blocking each other.
519    fn signature_subscribe(
520        &self,
521        meta: Self::Metadata,
522        subscriber: Subscriber<RpcResponse<RpcSignatureResult>>,
523        signature_str: String,
524        config: Option<RpcSignatureSubscribeConfig>,
525    ) {
526        let signature = match Signature::from_str(&signature_str) {
527            Ok(sig) => sig,
528            Err(_) => {
529                let error = Error {
530                    code: ErrorCode::InvalidParams,
531                    message: "Invalid signature format.".into(),
532                    data: None,
533                };
534                if let Err(e) = subscriber.reject(error.clone()) {
535                    log::error!("Failed to reject subscriber: {:?}", e);
536                }
537                return;
538            }
539        };
540        let config = config.unwrap_or_default();
541        let subscription_type = if config.enable_received_notification.unwrap_or(false) {
542            SignatureSubscriptionType::Received
543        } else {
544            SignatureSubscriptionType::Commitment(config.commitment.unwrap_or_default().commitment)
545        };
546
547        let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst);
548        let sub_id = SubscriptionId::Number(id as u64);
549        let sink = match subscriber.assign_id(sub_id.clone()) {
550            Ok(sink) => sink,
551            Err(e) => {
552                log::error!("Failed to assign subscription ID: {:?}", e);
553                return;
554            }
555        };
556        let active = Arc::clone(&self.signature_subscription_map);
557        let meta = meta.clone();
558
559        self.tokio_handle.spawn(async move {
560            if let Ok(mut guard) = active.write() {
561                guard.insert(sub_id.clone(), sink);
562            } else {
563                log::error!("Failed to acquire write lock on signature_subscription_map");
564                return;
565            }
566
567            let SurfnetRpcContext {
568                svm_locker,
569                remote_ctx,
570            } = match meta.get_rpc_context(None) {
571                Ok(res) => res,
572                Err(e) => {
573                    log::error!("Failed to get RPC context: {:?}", e);
574                    if let Ok(mut guard) = active.write() {
575                        if let Some(sink) = guard.remove(&sub_id) {
576                            if let Err(e) = sink.notify(Err(e.into())) {
577                                log::error!("Failed to notify client about RPC context error: {e}");
578                            }
579                        }
580                    }
581                    return;
582                }
583            };
584            // get the signature from the SVM to see if it's already been processed
585            let SvmAccessContext {
586                inner: tx_result, ..
587            } = svm_locker.get_transaction(&remote_ctx, &signature).await;
588
589            // if we already had the transaction, check if its confirmation status matches the desired status set by the subscription
590            // if so, notify the user and complete the subscription
591            // otherwise, subscribe to the transaction updates
592            if let GetTransactionResult::FoundTransaction(_, _, tx) = tx_result {
593                match (&subscription_type, tx.confirmation_status) {
594                    (&SignatureSubscriptionType::Received, _)
595                    | (
596                        &SignatureSubscriptionType::Commitment(CommitmentLevel::Processed),
597                        Some(TransactionConfirmationStatus::Processed),
598                    )
599                    | (
600                        &SignatureSubscriptionType::Commitment(CommitmentLevel::Confirmed),
601                        Some(TransactionConfirmationStatus::Confirmed),
602                    )
603                    | (
604                        &SignatureSubscriptionType::Commitment(CommitmentLevel::Finalized),
605                        Some(TransactionConfirmationStatus::Finalized),
606                    ) => {
607                        if let Ok(mut guard) = active.write() {
608                            if let Some(sink) = guard.remove(&sub_id) {
609                                let _ = sink.notify(Ok(RpcResponse {
610                                    context: RpcResponseContext::new(tx.slot),
611                                    value: RpcSignatureResult::ProcessedSignature(
612                                        ProcessedSignatureResult { err: None },
613                                    ),
614                                }));
615                            }
616                        }
617                        return;
618                    }
619                    _ => {}
620                }
621            }
622
623            // update our surfnet SVM to subscribe to the signature updates
624            let rx =
625                svm_locker.subscribe_for_signature_updates(&signature, subscription_type.clone());
626
627            loop {
628                if let Ok((slot, some_err)) = rx.try_recv() {
629                    if let Ok(mut guard) = active.write() {
630                        if let Some(sink) = guard.remove(&sub_id) {
631                            match subscription_type {
632                                SignatureSubscriptionType::Received => {
633                                    let _ = sink.notify(Ok(RpcResponse {
634                                        context: RpcResponseContext::new(slot),
635                                        value: RpcSignatureResult::ReceivedSignature(
636                                            ReceivedSignatureResult::ReceivedSignature,
637                                        ),
638                                    }));
639                                }
640                                SignatureSubscriptionType::Commitment(_) => {
641                                    let _ = sink.notify(Ok(RpcResponse {
642                                        context: RpcResponseContext::new(slot),
643                                        value: RpcSignatureResult::ProcessedSignature(
644                                            ProcessedSignatureResult { err: some_err },
645                                        ),
646                                    }));
647                                }
648                            }
649                        }
650                    }
651                    return;
652                }
653                tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
654            }
655        });
656    }
657
658    /// Implementation of signature unsubscription for WebSocket clients.
659    ///
660    /// This method removes an active signature subscription from the internal
661    /// tracking maps, effectively stopping further notifications for that subscription.
662    ///
663    /// # Implementation Details
664    /// - Attempts to remove the subscription from the active subscriptions map
665    /// - Returns success if the subscription existed and was removed
666    /// - Returns an error if the subscription ID was not found
667    ///
668    /// # Thread Safety
669    /// Uses write locks to ensure thread-safe removal from the subscription map.
670    fn signature_unsubscribe(
671        &self,
672        _meta: Option<Self::Metadata>,
673        subscription: SubscriptionId,
674    ) -> Result<bool> {
675        let removed = if let Ok(mut guard) = self.signature_subscription_map.write() {
676            guard.remove(&subscription)
677        } else {
678            log::error!("Failed to acquire write lock on signature_subscription_map");
679            None
680        };
681        if removed.is_some() {
682            Ok(true)
683        } else {
684            Err(Error {
685                code: ErrorCode::InvalidParams,
686                message: "Invalid subscription.".into(),
687                data: None,
688            })
689        }
690    }
691
692    /// Implementation of account subscription for WebSocket clients.
693    ///
694    /// This method handles the complete lifecycle of account subscriptions:
695    /// 1. Validates the provided public key string format
696    /// 2. Parses the subscription configuration (commitment and encoding)
697    /// 3. Generates a unique subscription ID and assigns it to the subscriber
698    /// 4. Spawns an async task to continuously monitor account changes
699    /// 5. Sends notifications whenever the account state changes
700    ///
701    /// # Monitoring Loop
702    /// The spawned task runs a continuous loop that:
703    /// - Checks if the subscription is still active (not unsubscribed)
704    /// - Polls for account updates from the SVM
705    /// - Sends notifications to the subscriber when changes occur
706    /// - Automatically terminates when the subscription is removed
707    ///
708    /// # Error Handling
709    /// - Rejects subscription with `InvalidParams` for malformed public keys
710    /// - Handles encoding configuration for account data serialization
711    /// - Manages subscription cleanup through the monitoring loop
712    ///
713    /// # Performance
714    /// Uses efficient polling with minimal CPU overhead and automatic
715    /// cleanup when subscriptions are no longer needed.
716    fn account_subscribe(
717        &self,
718        meta: Self::Metadata,
719        subscriber: Subscriber<RpcResponse<UiAccount>>,
720        pubkey_str: String,
721        config: Option<RpcAccountSubscribeConfig>,
722    ) {
723        let pubkey = match Pubkey::from_str(&pubkey_str) {
724            Ok(pk) => pk,
725            Err(_) => {
726                let error = Error {
727                    code: ErrorCode::InvalidParams,
728                    message: "Invalid pubkey format.".into(),
729                    data: None,
730                };
731                if subscriber.reject(error.clone()).is_err() {
732                    log::error!("Failed to reject subscriber for invalid pubkey format.");
733                }
734                return;
735            }
736        };
737
738        let config = config.unwrap_or(RpcAccountSubscribeConfig {
739            commitment: None,
740            encoding: None,
741        });
742
743        let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst);
744        let sub_id = SubscriptionId::Number(id as u64);
745        let sink = match subscriber.assign_id(sub_id.clone()) {
746            Ok(sink) => sink,
747            Err(e) => {
748                log::error!("Failed to assign subscription ID: {:?}", e);
749                return;
750            }
751        };
752
753        let account_active = Arc::clone(&self.account_subscription_map);
754        let meta = meta.clone();
755        let svm_locker = match meta.get_svm_locker() {
756            Ok(locker) => locker,
757            Err(e) => {
758                log::error!("Failed to get SVM locker for account subscription: {e}");
759                if let Err(e) = sink.notify(Err(e.into())) {
760                    log::error!(
761                        "Failed to send error notification to client for SVM locker failure: {e}"
762                    );
763                }
764                return;
765            }
766        };
767        let slot = svm_locker.with_svm_reader(|svm| svm.get_latest_absolute_slot());
768
769        self.tokio_handle.spawn(async move {
770            if let Ok(mut guard) = account_active.write() {
771                guard.insert(sub_id.clone(), sink);
772            } else {
773                log::error!("Failed to acquire write lock on account_subscription_map");
774                return;
775            }
776
777            // subscribe to account updates
778            let rx = svm_locker.subscribe_for_account_updates(&pubkey, config.encoding);
779
780            loop {
781                // if the subscription has been removed, break the loop
782                if let Ok(guard) = account_active.read() {
783                    if guard.get(&sub_id).is_none() {
784                        break;
785                    }
786                } else {
787                    log::error!("Failed to acquire read lock on account_subscription_map");
788                    break;
789                }
790
791                if let Ok(ui_account) = rx.try_recv() {
792                    if let Ok(guard) = account_active.read() {
793                        if let Some(sink) = guard.get(&sub_id) {
794                            let _ = sink.notify(Ok(RpcResponse {
795                                context: RpcResponseContext::new(slot),
796                                value: ui_account,
797                            }));
798                        }
799                    }
800                }
801            }
802        });
803    }
804
805    /// Implementation of account unsubscription for WebSocket clients.
806    ///
807    /// This method removes an active account subscription from the internal
808    /// tracking maps, effectively stopping further notifications for that subscription.
809    /// The monitoring loop in the corresponding subscription task will detect this
810    /// removal and automatically terminate.
811    ///
812    /// # Implementation Details
813    /// - Attempts to remove the subscription from the account subscriptions map
814    /// - Returns success if the subscription existed and was removed
815    /// - Returns an error if the subscription ID was not found
816    /// - The removal triggers automatic cleanup of the monitoring task
817    ///
818    /// # Thread Safety
819    /// Uses write locks to ensure thread-safe removal from the subscription map.
820    /// The monitoring task uses read locks to check subscription status, creating
821    /// a clean synchronization pattern.
822    fn account_unsubscribe(
823        &self,
824        _meta: Option<Self::Metadata>,
825        subscription: SubscriptionId,
826    ) -> Result<bool> {
827        let removed = if let Ok(mut guard) = self.account_subscription_map.write() {
828            guard.remove(&subscription)
829        } else {
830            log::error!("Failed to acquire write lock on account_subscription_map");
831            None
832        };
833        if removed.is_some() {
834            Ok(true)
835        } else {
836            Err(Error {
837                code: ErrorCode::InvalidParams,
838                message: "Invalid subscription.".into(),
839                data: None,
840            })
841        }
842    }
843
844    fn slot_subscribe(&self, meta: Self::Metadata, subscriber: Subscriber<SlotInfo>) {
845        let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst);
846        let sub_id = SubscriptionId::Number(id as u64);
847        let sink = match subscriber.assign_id(sub_id.clone()) {
848            Ok(sink) => sink,
849            Err(e) => {
850                log::error!("Failed to assign subscription ID: {:?}", e);
851                return;
852            }
853        };
854
855        let slot_active = Arc::clone(&self.slot_subscription_map);
856        let meta = meta.clone();
857
858        let svm_locker = match meta.get_svm_locker() {
859            Ok(locker) => locker,
860            Err(e) => {
861                log::error!("Failed to get SVM locker for slot subscription: {e}");
862                if let Err(e) = sink.notify(Err(e.into())) {
863                    log::error!(
864                        "Failed to send error notification to client for SVM locker failure: {e}"
865                    );
866                }
867                return;
868            }
869        };
870
871        self.tokio_handle.spawn(async move {
872            if let Ok(mut guard) = slot_active.write() {
873                guard.insert(sub_id.clone(), sink);
874            } else {
875                log::error!("Failed to acquire write lock on slot_subscription_map");
876                return;
877            }
878
879            let rx = svm_locker.subscribe_for_slot_updates();
880
881            loop {
882                // if the subscription has been removed, break the loop
883                if let Ok(guard) = slot_active.read() {
884                    if guard.get(&sub_id).is_none() {
885                        break;
886                    }
887                } else {
888                    log::error!("Failed to acquire read lock on slot_subscription_map");
889                    break;
890                }
891
892                if let Ok(slot_info) = rx.try_recv() {
893                    if let Ok(guard) = slot_active.read() {
894                        if let Some(sink) = guard.get(&sub_id) {
895                            let _ = sink.notify(Ok(slot_info));
896                        }
897                    }
898                }
899            }
900        });
901    }
902
903    fn slot_unsubscribe(
904        &self,
905        _meta: Option<Self::Metadata>,
906        subscription: SubscriptionId,
907    ) -> Result<bool> {
908        let removed = if let Ok(mut guard) = self.slot_subscription_map.write() {
909            guard.remove(&subscription)
910        } else {
911            log::error!("Failed to acquire write lock on slot_subscription_map");
912            None
913        };
914        if removed.is_some() {
915            Ok(true)
916        } else {
917            Err(Error {
918                code: ErrorCode::InvalidParams,
919                message: "Invalid subscription.".into(),
920                data: None,
921            })
922        }
923    }
924}